Word Count
The classic word count example, implemented with @kafkats/flow: read lines of text, split them into words, and count each word in 1-minute windows.
Code
typescript
import { flow, topic, TimeWindows } from '@kafkats/flow'
import { string, json } from '@kafkats/client'

// Input: lines of text
const lines = topic('lines', {
  key: string(),
  value: string(),
})

// Output: word counts
interface WordCount {
  word: string
  count: number
  windowStart: number
  windowEnd: number
}

const wordCounts = topic('word-counts', {
  key: string(),
  value: json<WordCount>(),
})

async function main() {
  const app = flow({
    applicationId: 'word-count',
    client: { clientId: 'word-count', brokers: ['localhost:9092'] },
  })

  app.stream(lines)
    // Split lines into words
    .flatMapValues(line =>
      line
        .toLowerCase()
        .split(/\s+/)
        .filter(word => word.length > 0)
    )
    // Rekey by word
    .selectKey((_, word) => word)
    // Group by word
    .groupByKey()
    // Count in 1-minute windows
    .windowedBy(TimeWindows.of('1m'))
    .count()
    // Convert to output format
    .toStream()
    .map((windowedKey, count) => ({
      key: windowedKey.key,
      value: {
        word: windowedKey.key,
        count,
        windowStart: windowedKey.window.start,
        windowEnd: windowedKey.window.end,
      },
    }))
    // Write results
    .to(wordCounts)

  // Handle shutdown
  process.on('SIGTERM', async () => {
    await app.close()
  })

  console.log('Word count processor started')
  await app.start()
}

main().catch(console.error)
How It Works
- Stream - Read lines of text from the input topic
- FlatMapValues - Split each line into lowercase words and drop empty strings
- SelectKey - Rekey each record by the word itself
- GroupByKey - Group all occurrences of each word
- WindowedBy - Apply 1-minute time windows
- Count - Count occurrences per word per window
- ToStream - Convert the windowed counts back into a stream of updates
- Map - Transform each update to the output format
- To - Write to the output topic
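To make these steps concrete without a running broker, here is a minimal in-memory sketch of the same logic in plain TypeScript. It does not use @kafkats/flow: `countWords`, `TimedLine`, and the epoch-aligned window calculation are assumptions made for illustration, and the library itself may align windows or batch updates differently.

```typescript
// In-memory sketch of the word count steps. Not @kafkats/flow code:
// every name here is hypothetical and exists only for illustration.
interface TimedLine {
  value: string
  timestamp: number // event time in milliseconds
}

interface WordCount {
  word: string
  count: number
  windowStart: number
  windowEnd: number
}

const WINDOW_MS = 60_000 // 1-minute tumbling windows

function countWords(lines: TimedLine[]): WordCount[] {
  const counts = new Map<string, number>() // state keyed by "windowStart:word"
  const updates: WordCount[] = []

  for (const { value, timestamp } of lines) {
    // Assumption: windows are aligned to the Unix epoch
    const windowStart = timestamp - (timestamp % WINDOW_MS)

    // flatMapValues: split the line into lowercase words
    const words = value.toLowerCase().split(/\s+/).filter(w => w.length > 0)

    for (const word of words) {
      // selectKey + groupByKey + windowedBy: the state key is (window, word)
      const stateKey = `${windowStart}:${word}`
      // count: increment the running total for that key
      const count = (counts.get(stateKey) ?? 0) + 1
      counts.set(stateKey, count)
      // toStream + map: emit one update per change, in the output format
      updates.push({ word, count, windowStart, windowEnd: windowStart + WINDOW_MS })
    }
  }
  return updates
}

const updates = countWords([
  { value: 'hello world hello', timestamp: 120_000 },
  { value: 'hello kafka', timestamp: 125_000 },
  { value: 'hello again', timestamp: 185_000 }, // falls into the next window
])
console.log(updates.map(u => `${u.word}=${u.count}@${u.windowStart}`).join(' '))
```

In this sketch every occurrence emits an updated running count, so a word seen three times in one window produces three records, and the count starts again from 1 when a new window begins.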
Topology Visualization
lines
│
├──► flatMapValues (split into words)
│
├──► selectKey (key by word)
│
├──► groupByKey
│
├──► windowedBy (1 minute)
│
├──► count
│
├──► toStream
│
├──► map (format output)
│
└──► word-counts
Testing
- Start the word count processor:
bash
npx tsx word-count.ts
- Send some text:
typescript
import { KafkaClient } from '@kafkats/client'

const client = new KafkaClient({
  clientId: 'producer',
  brokers: ['localhost:9092'],
})
const producer = client.producer()

await producer.send('lines', [
  { key: 'doc1', value: 'hello world hello' },
  { key: 'doc2', value: 'hello kafka streams' },
  { key: 'doc3', value: 'kafka is great kafka' },
])
await producer.disconnect()
- Check results:
bash
kafka-console-consumer --bootstrap-server localhost:9092 --topic word-counts --from-beginning
Expected output (window timestamps will differ):
json
{"word":"hello","count":1,"windowStart":1703001200000,"windowEnd":1703001260000}
{"word":"world","count":1,"windowStart":1703001200000,"windowEnd":1703001260000}
{"word":"hello","count":2,"windowStart":1703001200000,"windowEnd":1703001260000}
{"word":"hello","count":3,"windowStart":1703001200000,"windowEnd":1703001260000}
{"word":"kafka","count":1,"windowStart":1703001200000,"windowEnd":1703001260000}
{"word":"streams","count":1,"windowStart":1703001200000,"windowEnd":1703001260000}
{"word":"kafka","count":2,"windowStart":1703001200000,"windowEnd":1703001260000}
{"word":"is","count":1,"windowStart":1703001200000,"windowEnd":1703001260000}
{"word":"great","count":1,"windowStart":1703001200000,"windowEnd":1703001260000}
{"word":"kafka","count":3,"windowStart":1703001200000,"windowEnd":1703001260000}
Variations
Global Count (No Window)
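Count each word across all time instead of per window:
typescript
app.stream(lines)
  // Drop empty strings, as in the main example
  .flatMapValues(line => line.toLowerCase().split(/\s+/).filter(word => word.length > 0))
  .selectKey((_, word) => word)
  .groupByKey()
  .count() // No windowing
  .toStream()
  // Values are plain numbers here, so the output topic needs a matching value type
  .to(wordCounts)
Top N Words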
Find top 10 words per window:
typescript
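// Would require additional processing:
// 1. Collect all word counts for a window
// 2. Sort by count, descending
// 3. Take the top 10
// Steps 2 and 3 as a plain helper (hypothetical, not part of @kafkats/flow):
const topN = (counts: Map<string, number>, n = 10) =>
  [...counts.entries()].sort((a, b) => b[1] - a[1]).slice(0, n)
Stop Words Filtering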
Filter out common words before counting:
typescript
const stopWords = new Set(['the', 'a', 'an', 'is', 'are', 'was', 'were'])

app.stream(lines).flatMapValues(line =>
  line
    .toLowerCase()
    .split(/\s+/)
    .filter(word => word.length > 0 && !stopWords.has(word))
)
// ... rest of pipeline
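Because the splitting and filtering logic is an ordinary function of a string, it can be pulled out and checked without a broker. The sketch below is one way to do that; `tokenize` is a hypothetical helper name, not part of @kafkats/flow.

```typescript
// Hypothetical helper; the name `tokenize` is not part of @kafkats/flow.
const stopWords = new Set(['the', 'a', 'an', 'is', 'are', 'was', 'were'])

function tokenize(line: string, excluded: ReadonlySet<string> = stopWords): string[] {
  return line
    .toLowerCase()
    .split(/\s+/)
    // Leading or trailing whitespace yields empty strings, so drop those too
    .filter(word => word.length > 0 && !excluded.has(word))
}

console.log(tokenize('  Kafka is great   kafka '))
```

The helper could then be passed to the pipeline as `.flatMapValues(line => tokenize(line))`, which keeps the stop-word list in one place.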