Usage
Basic Usage
typescript
import { zodCodec } from '@kafkats/flow-codec-zod'
import { z } from 'zod'
// Simple schema
const MessageSchema = z.object({
type: z.string(),
payload: z.unknown(),
timestamp: z.number(),
})
const codec = zodCodec(MessageSchema)

With Flow Topics
typescript
import { flow, topic } from '@kafkats/flow'
import { string } from '@kafkats/client'
import { zodCodec } from '@kafkats/flow-codec-zod'
import { z } from 'zod'
const OrderSchema = z.object({
orderId: z.string(),
userId: z.string(),
items: z.array(
z.object({
productId: z.string(),
quantity: z.number().int().positive(),
price: z.number().positive(),
})
),
total: z.number().positive(),
status: z.enum(['pending', 'confirmed', 'shipped', 'delivered']),
})
// Source topic: string keys, values encoded/decoded through zodCodec(OrderSchema)
const orders = topic('orders', {
  key: string(),
  value: zodCodec(OrderSchema),
})
// Destination topic used by the stream pipeline below (`.to(confirmedOrdersTopic)`).
// Confirmed orders keep the same shape, so the same codecs are reused.
// The topic name is illustrative.
const confirmedOrdersTopic = topic('confirmed-orders', {
  key: string(),
  value: zodCodec(OrderSchema),
})
const app = flow({
applicationId: 'order-processor',
client: { clientId: 'order-processor', brokers: ['localhost:9092'] },
})
// Types are fully inferred
app.stream(orders)
.filter((_, order) => order.status === 'pending')
.mapValues(order => ({ ...order, status: 'confirmed' as const }))
.to(confirmedOrdersTopic)

With Client Producer/Consumer
typescript
import { KafkaClient, topic, string } from '@kafkats/client'
import { zodCodec } from '@kafkats/codec-zod'
import { z } from 'zod'
const EventSchema = z.object({
type: z.string(),
data: z.record(z.unknown()),
})
const events = topic('events', {
key: string(),
value: zodCodec(EventSchema),
})
const client = new KafkaClient({
clientId: 'my-app',
brokers: ['localhost:9092'],
})
// Producer - validates on send
const producer = client.producer()
await producer.send(events, [
{
key: 'event-1',
value: { type: 'click', data: { page: '/home' } }, // Valid
},
])
// Consumer - validates on receive
const consumer = client.consumer({ groupId: 'my-group' })
await consumer.runEach(events, async message => {
// message.value is validated Event type
console.log(message.value.type)
})

Complex Schemas
Nested Objects
typescript
const AddressSchema = z.object({
street: z.string(),
city: z.string(),
country: z.string(),
zipCode: z.string(),
})
const CustomerSchema = z.object({
id: z.string().uuid(),
name: z.string().min(1),
email: z.string().email(),
addresses: z.array(AddressSchema),
primaryAddressIndex: z.number().int().min(0),
})

Unions and Discriminated Unions
typescript
// Simple union
const IdSchema = z.union([z.string(), z.number()])
// Discriminated union (recommended)
const EventSchema = z.discriminatedUnion('type', [
z.object({ type: z.literal('click'), page: z.string() }),
z.object({ type: z.literal('purchase'), amount: z.number() }),
z.object({ type: z.literal('signup'), email: z.string().email() }),
])
const eventCodec = zodCodec(EventSchema)

Optional and Nullable
typescript
const ProfileSchema = z.object({
username: z.string(),
bio: z.string().optional(),
avatarUrl: z.string().url().nullable(),
metadata: z.record(z.string()).default({}),
})

Transformations
typescript
const DateEventSchema = z.object({
type: z.string(),
// Transform string to Date on decode
timestamp: z
.string()
.datetime()
.transform(s => new Date(s)),
})
// Note: Transforms affect the inferred type
type DateEvent = z.infer<typeof DateEventSchema>
// { type: string; timestamp: Date }

Error Handling
Zod throws detailed errors on validation failure:
typescript
import { ZodError } from 'zod'
try {
codec.decode(invalidBuffer)
} catch (error) {
if (error instanceof ZodError) {
console.log('Validation errors:')
for (const issue of error.issues) {
console.log(` ${issue.path.join('.')}: ${issue.message}`)
}
}
}

Graceful Error Handling
typescript
// Create a codec that handles validation errors without throwing:
// encode returns an empty Buffer, decode returns null
const safeCodec = {
// Validate the outgoing value first; only a value that passes the schema is serialized.
// NOTE(review): `Order` is not declared in this snippet — presumably z.infer<typeof OrderSchema>; confirm.
encode: (value: Order) => {
  const parsed = OrderSchema.safeParse(value)
  if (parsed.success) {
    // Serialize the parsed data rather than the raw input
    return Buffer.from(JSON.stringify(parsed.data))
  }
  // Validation failed: log the Zod error and return an empty payload instead of throwing
  console.error('Encode failed:', parsed.error)
  return Buffer.alloc(0)
},
// Parse the buffer as JSON and validate it against OrderSchema.
// Returns the validated data, or null when the payload is malformed JSON or fails validation.
// Both failure paths are logged so a null result can always be traced to its cause.
decode: (buf: Buffer) => {
  try {
    const data: unknown = JSON.parse(buf.toString())
    const result = OrderSchema.safeParse(data)
    if (!result.success) {
      console.error('Decode failed:', result.error)
      return null
    }
    return result.data
  } catch (error) {
    // JSON.parse throws on malformed input; log it instead of swallowing it silently
    console.error('Decode failed:', error)
    return null
  }
},
}

Schema Evolution
Handle schema changes gracefully:
typescript
// V1 schema
const UserV1 = z.object({
id: z.string(),
name: z.string(),
})
// V2 schema with backward compatibility
const UserV2 = z.object({
id: z.string(),
name: z.string(),
email: z.string().email().optional(), // New optional field
})
// V2 can decode V1 messages because the new `email` field is optional

Performance Tips
- Reuse schemas - Define schemas once, reuse everywhere
- Use `.strict()` - Fail on extra properties
- Avoid heavy transforms - Keep decode lightweight
- Consider caching - For repeated validations
typescript
// Good: Define once
const UserSchema = z.object({...}).strict()
const userCodec = zodCodec(UserSchema)
// Use everywhere
const topic1 = topic('users-v1', { value: userCodec })
const topic2 = topic('users-v2', { value: userCodec })