
# Getting Started with @kafkats/client

## Creating a Client

The KafkaClient is the main entry point for all Kafka operations:

```typescript
import { KafkaClient } from '@kafkats/client'

const client = new KafkaClient({
	clientId: 'my-application',
	brokers: ['localhost:9092'],
})
```

## Client Options

| Option | Type | Description |
| --- | --- | --- |
| `clientId` | `string` | Identifier for this client (appears in broker logs) |
| `brokers` | `string[]` | Bootstrap broker addresses |
| `tls` | `TlsConfig` | TLS configuration (omit for plaintext) |
| `sasl` | `SaslConfig` | SASL authentication configuration |
| `connectionTimeoutMs` | `number` | Connection timeout in ms (default: 10000) |
| `requestTimeoutMs` | `number` | Request timeout in ms (default: 30000) |
| `metadataRefreshIntervalMs` | `number` | Metadata refresh interval in ms (default: 300000) |
| `maxInFlightRequests` | `number` | Max in-flight requests per connection (default: 5) |
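The timeout and throughput options can be tuned together. As an illustrative sketch using only the options listed above (the values are examples, not recommendations):

```typescript
// Example values only; tune these to your network and broker setup.
const tunedClient = new KafkaClient({
	clientId: 'my-application',
	brokers: ['kafka-1.example.com:9092', 'kafka-2.example.com:9092'],
	connectionTimeoutMs: 5000, // give up on a broker connection after 5s
	requestTimeoutMs: 15000, // fail an individual request after 15s
	metadataRefreshIntervalMs: 60000, // refresh cluster metadata every minute
	maxInFlightRequests: 1, // at most one outstanding request per connection
})
```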

## TLS Configuration

```typescript
import fs from 'node:fs'

const client = new KafkaClient({
	clientId: 'my-app',
	brokers: ['kafka.example.com:9093'],
	tls: { enabled: true }, // Use system CA
})

// Or with custom certificates (mTLS)
const mtlsClient = new KafkaClient({
	clientId: 'my-app',
	brokers: ['kafka.example.com:9093'],
	tls: {
		enabled: true,
		ca: fs.readFileSync('ca.pem'),
		cert: fs.readFileSync('client.pem'),
		key: fs.readFileSync('client-key.pem'),
	},
})
```
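TLS is often combined with the `sasl` option from the table above. The exact `SaslConfig` shape isn't shown on this page, so the sketch below assumes a common mechanism/username/password form; check the `SaslConfig` type before relying on these field names.

```typescript
// Assumed SaslConfig fields (mechanism/username/password) for illustration only;
// verify against the SaslConfig type in @kafkats/client.
const saslClient = new KafkaClient({
	clientId: 'my-app',
	brokers: ['kafka.example.com:9093'],
	tls: { enabled: true },
	sasl: {
		mechanism: 'scram-sha-512',
		username: 'my-user',
		password: process.env.KAFKA_PASSWORD ?? '',
	},
})
```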

## Creating a Producer

```typescript
const producer = client.producer({
	acks: 'all', // Wait for all replicas
	compression: 'snappy', // Compress messages
	lingerMs: 5, // Batch for 5ms
})

// Send messages
await producer.send('my-topic', [{ value: 'Hello!' }])

// Close when done
await producer.disconnect()
```
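Messages can also carry a key, which Kafka uses to route records with the same key to the same partition. Assuming plain string topics accept the same `{ key, value }` message shape as the typed topics shown later, a keyed batch looks like this sketch:

```typescript
// Sketch: keyed batch to a plain string topic, assuming the { key, value }
// message shape from the typed-topic examples also applies here.
await producer.send('my-topic', [
	{ key: 'order-1', value: 'created' },
	{ key: 'order-1', value: 'paid' },
	{ key: 'order-2', value: 'created' },
])
```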

See Producer API for full documentation.

## Creating a Consumer

```typescript
const consumer = client.consumer({
	groupId: 'my-consumer-group',
	autoOffsetReset: 'earliest',
})

// Process messages
await consumer.runEach('my-topic', async (message, ctx) => {
	console.log(`${ctx.topic}[${ctx.partition}] @ ${ctx.offset}: ${message.value}`)
})
```

See Consumer API for full documentation.

## Creating a ShareConsumer (experimental)

Kafka Share Groups (KIP-932) provide queue-like consumption with per-record acknowledgements. They require Kafka 4.1+ with Share Groups enabled (see ShareConsumer API).

```typescript
const shareConsumer = client.shareConsumer({
	groupId: 'my-share-group',
})

await shareConsumer.runEach('my-topic', async message => {
	// For string topic subscriptions, key/value are raw Buffers (same as Consumer).
	await handleMessage(message.value.toString('utf-8')) // handleMessage is your application's handler

	// If you don't call ack/release/reject, the message is implicitly ack'd (ACCEPT) on success.
	// await message.release()
	// await message.reject()
})
```
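Building on the implicit-ack behaviour above, a handler can decide per record. A sketch that releases a record on a transient failure (so it becomes available for redelivery) and rejects it when the payload is unusable:

```typescript
// Sketch: explicit per-record decisions using the release/reject calls shown above.
await shareConsumer.runEach('my-topic', async message => {
	try {
		const event = JSON.parse(message.value.toString('utf-8')) // stand-in for real processing
		console.log('processed', event)
		// Returning normally acknowledges (ACCEPT) the record implicitly.
	} catch (error) {
		if (error instanceof SyntaxError) {
			await message.reject() // unusable payload: don't redeliver
		} else {
			await message.release() // transient failure: make it available again
		}
	}
})
```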

See ShareConsumer API for requirements and full documentation.

## Typed Topics

Define topics with type-safe codecs:

```typescript
import { topic, string, json } from '@kafkats/client'

interface UserEvent {
	userId: string
	action: 'login' | 'logout'
	timestamp: number
}

const userEvents = topic('user-events', {
	key: string(),
	value: json<UserEvent>(),
})

// Producer - type-checked
await producer.send(userEvents, [
	{
		key: 'user-123',
		value: { userId: 'user-123', action: 'login', timestamp: Date.now() },
	},
])

// Consumer - type-inferred
await consumer.runEach(userEvents, async message => {
	// message.key is string, message.value is UserEvent
	console.log(`User ${message.value.userId} performed ${message.value.action}`)
})
```
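Typed topics should also work with a ShareConsumer's `runEach` (the note about string subscriptions in the ShareConsumer example suggests decoding mirrors the regular Consumer); treat this as an unverified sketch:

```typescript
// Sketch: consuming the typed topic through a share group, assuming
// ShareConsumer.runEach decodes typed topics the same way Consumer does.
await shareConsumer.runEach(userEvents, async message => {
	// message.value is inferred as UserEvent
	console.log(`Share delivery: ${message.value.userId} ${message.value.action}`)
})
```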

## Error Handling

kafkats provides specific error types for different failure scenarios:

```typescript
import { KafkaError, ConnectionError, TimeoutError, isRetriable } from '@kafkats/client'

try {
	await producer.send('events', [{ value: 'data' }])
} catch (error) {
	if (error instanceof ConnectionError) {
		console.log('Connection failed:', error.message)
	} else if (error instanceof TimeoutError) {
		console.log('Request timed out')
	} else if (isRetriable(error)) {
		console.log('Retriable error, will retry automatically')
	}
}
```
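Because `isRetriable` distinguishes transient failures, you can also wrap a send in a small bounded retry of your own. A sketch (the attempt counts and delays are arbitrary, and the client may already retry retriable errors internally):

```typescript
// Sketch: bounded manual retries around a send, using only isRetriable from above.
async function sendWithRetry(topicName: string, value: string, attempts = 3): Promise<void> {
	for (let attempt = 1; attempt <= attempts; attempt++) {
		try {
			await producer.send(topicName, [{ value }])
			return
		} catch (error) {
			if (attempt === attempts || !isRetriable(error)) throw error
			await new Promise(resolve => setTimeout(resolve, 100 * attempt)) // simple linear backoff
		}
	}
}

await sendWithRetry('events', 'data')
```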

## Graceful Shutdown

Always close clients when shutting down:

```typescript
process.on('SIGTERM', async () => {
	consumer.stop()
	await producer.disconnect()
	process.exit(0)
})
```
