Skip to content

Windowing

Windowing groups stream records by time for aggregations. kafkats/flow supports three window types.

Time Windows

Fixed-size, non-overlapping time buckets:

typescript
import { flow, topic, TimeWindows } from '@kafkats/flow'
import { string, json } from '@kafkats/client'

// 5-minute tumbling windows
app.stream(clicks).groupByKey().windowedBy(TimeWindows.of('5m')).count()

Window Duration Syntax

FormatDuration
'100ms'100 milliseconds
'5s'5 seconds
'5m'5 minutes
'1h'1 hour
'1d'1 day
300000300000 ms (number)

Hopping Windows

Overlapping windows with custom advance:

typescript
// 5-minute windows, advancing every 1 minute
TimeWindows.of('5m').advanceBy('1m')
Window 1: [0:00 - 0:05)
Window 2: [0:01 - 0:06)
Window 3: [0:02 - 0:07)
...

Session Windows

Dynamic windows based on activity gaps:

typescript
import { SessionWindows } from '@kafkats/flow'

// Session closes after 30 minutes of inactivity
app.stream(userActivity).groupByKey().windowedBy(SessionWindows.withInactivityGap('30m')).count()

Sessions:

  • Start with first event for a key
  • Extend with each new event
  • Close after inactivity gap elapses

Sliding Windows

Continuous, overlapping windows:

typescript
import { SlidingWindows } from '@kafkats/flow'

// Look back 5 minutes from each event
app.stream(events).groupByKey().windowedBy(SlidingWindows.of('5m')).count()

Window Operations

Once windowed, use aggregation operations:

typescript
const windowed = stream.groupByKey().windowedBy(TimeWindows.of('1h'))

// Count per window
windowed.count()

// Sum per window
windowed.reduce((sum, value) => sum + value.amount)

// Custom aggregation
windowed.aggregate(
	() => ({ count: 0, total: 0 }),
	(key, value, agg) => ({
		count: agg.count + 1,
		total: agg.total + value.amount,
	})
)

Windowed Keys

Windowed aggregations produce Windowed<K> keys:

typescript
interface Windowed<K> {
	key: K
	window: {
		start: number // Window start timestamp (ms)
		end: number // Window end timestamp (ms)
	}
}

Access window boundaries in transformations:

typescript
stream
	.groupByKey()
	.windowedBy(TimeWindows.of('5m'))
	.count()
	.toStream()
	.map((windowedKey, count) => ({
		key: windowedKey.key,
		value: {
			key: windowedKey.key,
			windowStart: new Date(windowedKey.window.start),
			windowEnd: new Date(windowedKey.window.end),
			count,
		},
	}))

Grace Periods

Handle late-arriving data:

typescript
// Accept events up to 10 minutes late
TimeWindows.of('5m').gracePeriod('10m')

Without grace period, late events are dropped. With grace period:

  • Window remains open longer
  • Late events are included in aggregation
  • Higher memory usage

Window Comparison

TypeSizeOverlapUse Case
Time (Tumbling)FixedNoRegular metrics, hourly counts
Time (Hopping)FixedYesSmoothed averages, sliding stats
SessionDynamicNoUser sessions, activity tracking
SlidingFixedContinuousEvent-relative windows

Example: Hourly Metrics

typescript
interface MetricEvent {
	metricName: string
	value: number
	timestamp: number
}

interface HourlyMetric {
	metricName: string
	windowStart: Date
	windowEnd: Date
	sum: number
	count: number
	average: number
}

app.stream<string, MetricEvent>(metricsTopic)
	.groupByKey()
	.windowedBy(TimeWindows.of('1h'))
	.aggregate(
		() => ({ sum: 0, count: 0 }),
		(key, value, agg) => ({
			sum: agg.sum + value.value,
			count: agg.count + 1,
		})
	)
	.toStream()
	.map((windowedKey, agg) => ({
		key: windowedKey.key,
		value: {
			metricName: windowedKey.key,
			windowStart: new Date(windowedKey.window.start),
			windowEnd: new Date(windowedKey.window.end),
			sum: agg.sum,
			count: agg.count,
			average: agg.sum / agg.count,
		} as HourlyMetric,
	}))
	.to(hourlyMetricsTopic)

Example: User Sessions

typescript
interface UserAction {
	userId: string
	action: string
	timestamp: number
}

interface SessionSummary {
	userId: string
	sessionStart: Date
	sessionEnd: Date
	actionCount: number
	actions: string[]
}

app.stream<string, UserAction>(actionsTopic)
	.groupByKey()
	.windowedBy(SessionWindows.withInactivityGap('30m'))
	.aggregate(
		() => ({ actions: [] as string[] }),
		(key, action, agg) => ({
			actions: [...agg.actions, action.action],
		})
	)
	.toStream()
	.map((windowedKey, agg) => ({
		key: windowedKey.key,
		value: {
			userId: windowedKey.key,
			sessionStart: new Date(windowedKey.window.start),
			sessionEnd: new Date(windowedKey.window.end),
			actionCount: agg.actions.length,
			actions: agg.actions,
		} as SessionSummary,
	}))
	.to(sessionsTopic)

State Management

Windowed operations require state stores. Configure via stateStoreProvider:

typescript
import { inMemory } from '@kafkats/flow'
import { lmdb } from '@kafkats/flow-state-lmdb'

// In-memory (default)
const app = flow({
	applicationId: 'my-app',
	client: { clientId: 'my-app', brokers: ['localhost:9092'] },
	stateStoreProvider: inMemory(),
})

// Persistent (LMDB)
const app = flow({
	applicationId: 'my-app',
	client: { clientId: 'my-app', brokers: ['localhost:9092'] },
	stateStoreProvider: lmdb({ stateDir: './state' }),
})

Released under the MIT License.