Configuration

KafkaClient Configuration

typescript
import { KafkaClient } from '@kafkats/client'

const client = new KafkaClient({
	// Required
	clientId: 'my-app',
	brokers: ['broker1:9092', 'broker2:9092'],

	// Optional
	tls: { enabled: true },
	sasl: { mechanism: 'PLAIN', username: 'user', password: 'pass' },
	connectionTimeoutMs: 10000,
	requestTimeoutMs: 30000,
})

The shape of the sasl object depends on the mechanism. For example, OAUTHBEARER takes an async token provider instead of a username and password:

typescript
const client = new KafkaClient({
	clientId: 'my-app',
	brokers: ['broker1:9093'],
	tls: { enabled: true },
	sasl: {
		mechanism: 'OAUTHBEARER',
		oauthBearerProvider: async () => {
			// For Amazon MSK IAM, install:
			//   pnpm add aws-msk-iam-sasl-signer-js
			const { generateAuthToken } = await import('aws-msk-iam-sasl-signer-js')
			const { token } = await generateAuthToken({ region: process.env.AWS_REGION! })
			return { value: token }
		},
	},
})

Options Reference

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| clientId | string | - | Required. Client identifier shown in broker logs |
| brokers | string[] | - | Required. Bootstrap broker addresses |
| requestTimeoutMs | number | 30000 | Request timeout (ms) |
| connectionTimeoutMs | number | 10000 | Connection timeout (ms) |
| metadataRefreshIntervalMs | number | 300000 | How often to refresh cluster metadata (ms) |
| maxInFlightRequests | number | 5 | Max in-flight requests per broker connection |
| tls | TlsConfig | - | TLS configuration (omit for plaintext) |
| sasl | SaslConfig | - | SASL authentication configuration |
| logger | Logger | - | Custom logger implementation |
| logLevel | 'debug' \| 'info' \| 'warn' \| 'error' | 'info' | Log level for the built-in logger |

SASL Reauthentication

If the broker enables periodic SASL reauthentication (connections.max.reauth.ms), kafkats reauthenticates automatically. To tune how early it does so, set sasl.reauthenticationThresholdMs (default: 10000 ms).
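
To make the threshold concrete, here is a minimal sketch of the timing arithmetic. It assumes the client schedules reauthentication `reauthenticationThresholdMs` before the session lifetime reported by the broker runs out; this page does not confirm that exact rule. `reauthDelayMs` is a hypothetical helper, not part of kafkats.

```typescript
// Hypothetical helper, not part of kafkats: how long to wait before reauthenticating.
// Assumption: the threshold is subtracted from the broker-reported session lifetime.
function reauthDelayMs(sessionLifetimeMs: number, thresholdMs = 10000): number {
	// Never return a negative delay: very short sessions reauthenticate immediately.
	return Math.max(0, sessionLifetimeMs - thresholdMs)
}

console.log(reauthDelayMs(3600000)) // 3590000
console.log(reauthDelayMs(5000)) // 0
```

Under this assumption, a larger threshold leaves more headroom for a slow token provider, at the cost of reauthenticating slightly more often.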

Producer Configuration

typescript
const producer = client.producer({
	acks: 'all',
	compression: 'snappy',
	lingerMs: 5,
	maxBatchBytes: 16384,
	retries: 3,
	idempotent: false,
	transactionalId: undefined,
})
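
One way to read `lingerMs` and `maxBatchBytes` together is sketched below. It assumes kafkats follows the usual Kafka producer batching model, where a batch is sent once it is full or once it has waited `lingerMs`, whichever comes first; this page does not spell out the exact rule. `shouldFlush` and `BatchLimits` are hypothetical names used only for illustration.

```typescript
interface BatchLimits {
	lingerMs: number
	maxBatchBytes: number
}

// Hypothetical model: a batch is sent when it is full or has waited long enough.
function shouldFlush(batchBytes: number, ageMs: number, limits: BatchLimits): boolean {
	return batchBytes >= limits.maxBatchBytes || ageMs >= limits.lingerMs
}

const limits: BatchLimits = { lingerMs: 5, maxBatchBytes: 16384 }

console.log(shouldFlush(1024, 2, limits)) // false: small and still young
console.log(shouldFlush(1024, 5, limits)) // true: linger time reached
console.log(shouldFlush(16384, 0, limits)) // true: batch is full
```

In this model, raising `lingerMs` trades a little latency for larger batches, which tends to help throughput and compression.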

Options Reference

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| acks | 'all' \| 'leader' \| 'none' | 'all' | Acknowledgment mode |
| compression | 'none' \| 'gzip' \| 'snappy' \| 'lz4' \| 'zstd' | 'none' | Compression type |
| lingerMs | number | 5 | Batch wait time (ms) |
| maxBatchBytes | number | 16384 | Max batch size (bytes) |
| retries | number | 3 | Retry attempts |
| retryBackoffMs | number | 100 | Initial retry backoff (ms) |
| maxRetryBackoffMs | number | 1000 | Max retry backoff (ms) |
| partitioner | 'murmur2' \| 'round-robin' \| Function | 'murmur2' | Partitioning strategy |
| requestTimeoutMs | number | 30000 | Request timeout (ms) |
| idempotent | boolean | false | Enable idempotent producer |
| maxInFlight | number | 5 | Max in-flight requests |
| transactionalId | string | - | Enable transactions |
| transactionTimeoutMs | number | 60000 | Transaction timeout (ms) |
| maxBlockMs | number | 60000 | Max time to block acquiring a producer id during transactional/idempotent producer initialization (ms) |
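
The three retry options can be pictured as a schedule of waits. The sketch below assumes the backoff doubles on each attempt and is capped at `maxRetryBackoffMs`; the doubling factor is an assumption, and the library may also add jitter. `backoffSchedule` is a hypothetical helper, not a kafkats API.

```typescript
// Hypothetical helper: delay before each retry attempt.
// Assumption: the delay doubles from the initial backoff and is capped at the maximum.
function backoffSchedule(retries = 3, retryBackoffMs = 100, maxRetryBackoffMs = 1000): number[] {
	const delays: number[] = []
	for (let attempt = 0; attempt < retries; attempt++) {
		delays.push(Math.min(retryBackoffMs * 2 ** attempt, maxRetryBackoffMs))
	}
	return delays
}

console.log(backoffSchedule()) // [ 100, 200, 400 ]
console.log(backoffSchedule(6)) // [ 100, 200, 400, 800, 1000, 1000 ]
```

With the defaults, the cap only matters once `retries` is raised above 4.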

Acknowledgment Modes

| Value | Meaning | Durability |
| --- | --- | --- |
| 'none' | Don't wait for acknowledgment | Lowest - may lose messages |
| 'leader' | Wait for leader to write | Medium - may lose if leader fails |
| 'all' | Wait for all in-sync replicas | Highest - recommended |
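
For readers used to other Kafka clients, these named modes correspond to the integer `acks` field of Kafka's Produce request (0, 1, and -1). The mapping below is illustrative only; `acksToWireValue` is a hypothetical name, and how kafkats performs the conversion internally is not documented here.

```typescript
type Acks = 'all' | 'leader' | 'none'

// Illustrative mapping from the named modes to the integer carried in
// Kafka's Produce request. The function name is hypothetical.
function acksToWireValue(acks: Acks): number {
	if (acks === 'none') return 0 // do not wait for any acknowledgment
	if (acks === 'leader') return 1 // the leader has written the record
	return -1 // 'all': every in-sync replica has the record
}

console.log(acksToWireValue('none')) // 0
console.log(acksToWireValue('leader')) // 1
console.log(acksToWireValue('all')) // -1
```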

Compression Options

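| Type | Speed | Ratio | Notes |
| --- | --- | --- | --- |
| 'none' | Fastest | 1:1 | No compression |
| 'gzip' | Slow | Best | Good for text, built-in |
| 'snappy' | Fast | Good | Balanced choice |
| 'lz4' | Fastest | Good | Best for high throughput |
| 'zstd' | Medium | Best | Modern, efficient |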

GZIP is built in. Snappy, LZ4, and Zstd each require you to install and register a compression library; see Compression for the supported libraries.

Consumer Configuration

typescript
const consumer = client.consumer({
	groupId: 'my-group',
	sessionTimeoutMs: 30000,
	heartbeatIntervalMs: 3000,
	autoOffsetReset: 'latest',
	isolationLevel: 'read_committed',
})

Options Reference

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| groupId | string | - | Required. Consumer group ID |
| groupInstanceId | string | - | Static membership ID |
| sessionTimeoutMs | number | 30000 | Session timeout (ms) |
| rebalanceTimeoutMs | number | 60000 | Rebalance timeout (ms) |
| heartbeatIntervalMs | number | 3000 | Heartbeat interval (ms) |
| maxBytesPerPartition | number | 1048576 | Max fetch bytes per partition |
| minBytes | number | 1 | Min bytes to fetch |
| maxWaitMs | number | 5000 | Max fetch wait time (ms) |
| autoOffsetReset | 'earliest' \| 'latest' \| 'none' | 'latest' | Offset reset strategy |
| isolationLevel | 'read_committed' \| 'read_uncommitted' | 'read_committed' | Transaction isolation |
| partitionAssignmentStrategy | 'cooperative-sticky' \| 'sticky' \| 'range' | 'cooperative-sticky' | Assignment strategy |
| defaultApiTimeoutMs | number | 60000 | Max time to retry offset fetch/commit operations (ms) |
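
`sessionTimeoutMs` and `heartbeatIntervalMs` are usually tuned as a pair. Apache Kafka's own consumer documentation says the heartbeat interval must be lower than the session timeout and is typically no more than one third of it. The sketch below checks that guideline; `timeoutWarnings` is a hypothetical helper, and whether kafkats enforces any such rule itself is not stated on this page.

```typescript
interface ConsumerTimeouts {
	sessionTimeoutMs: number
	heartbeatIntervalMs: number
}

// Hypothetical pre-flight check based on Kafka's general guidance,
// not on any validation kafkats is known to perform.
function timeoutWarnings(t: ConsumerTimeouts): string[] {
	const warnings: string[] = []
	if (t.heartbeatIntervalMs >= t.sessionTimeoutMs) {
		warnings.push('heartbeatIntervalMs must be lower than sessionTimeoutMs')
	} else if (t.heartbeatIntervalMs > t.sessionTimeoutMs / 3) {
		warnings.push('heartbeatIntervalMs is usually at most one third of sessionTimeoutMs')
	}
	return warnings
}

// The defaults from the table above pass the check.
console.log(timeoutWarnings({ sessionTimeoutMs: 30000, heartbeatIntervalMs: 3000 })) // []
console.log(timeoutWarnings({ sessionTimeoutMs: 30000, heartbeatIntervalMs: 20000 }).length) // 1
```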

Offset Reset Strategies

| Value | Behavior |
| --- | --- |
| 'earliest' | Start from beginning of topic |
| 'latest' | Start from end of topic (new messages only) |
| 'none' | Throw error if no committed offset exists |
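
The table above can be read as a small decision function: the reset strategy only matters when the group has no committed offset for a partition. The sketch below models that decision. `resolveStartOffset` and `PartitionBounds` are hypothetical names for illustration, and offsets are plain numbers for simplicity.

```typescript
type AutoOffsetReset = 'earliest' | 'latest' | 'none'

interface PartitionBounds {
	earliest: number // first offset still available in the partition
	latest: number // offset the next produced message will receive
}

// Hypothetical model of where a consumer starts reading a partition.
function resolveStartOffset(
	committed: number | undefined,
	bounds: PartitionBounds,
	reset: AutoOffsetReset
): number {
	// A committed offset always wins; the reset strategy is only a fallback.
	if (committed !== undefined) return committed
	if (reset === 'earliest') return bounds.earliest
	if (reset === 'latest') return bounds.latest
	throw new Error('No committed offset and autoOffsetReset is "none"')
}

const bounds: PartitionBounds = { earliest: 0, latest: 42 }
console.log(resolveStartOffset(7, bounds, 'latest')) // 7
console.log(resolveStartOffset(undefined, bounds, 'earliest')) // 0
console.log(resolveStartOffset(undefined, bounds, 'latest')) // 42
```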

Isolation Levels

| Value | Behavior |
| --- | --- |
| 'read_committed' | Only see committed transactional messages |
| 'read_uncommitted' | See all messages including uncommitted |
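
The difference between the two levels is easiest to see on a small log. The sketch below is a simplified model of Kafka's documented semantics, not kafkats code: with 'read_committed', a consumer reads only up to the last stable offset (the start of the earliest still-open transaction) and skips records from aborted transactions. The record shape and `visibleRecords` are hypothetical.

```typescript
type IsolationLevel = 'read_committed' | 'read_uncommitted'

interface LogRecord {
	offset: number
	// True for records written inside a transaction that was later aborted.
	aborted?: boolean
}

// Simplified model. lastStableOffset is the offset of the first record that
// belongs to a still-open transaction (or the end of the log if none is open).
function visibleRecords(log: LogRecord[], level: IsolationLevel, lastStableOffset: number): LogRecord[] {
	if (level === 'read_uncommitted') return log
	return log.filter(r => r.offset < lastStableOffset && !r.aborted)
}

const log: LogRecord[] = [
	{ offset: 0 }, // non-transactional
	{ offset: 1, aborted: true }, // aborted transaction
	{ offset: 2 }, // committed transaction
	{ offset: 3 }, // open transaction, so the last stable offset is 3
	{ offset: 4 }, // non-transactional, but behind the open transaction
]

console.log(visibleRecords(log, 'read_committed', 3).map(r => r.offset)) // [ 0, 2 ]
console.log(visibleRecords(log, 'read_uncommitted', 3).map(r => r.offset)) // [ 0, 1, 2, 3, 4 ]
```

Note that in this model offset 4 is hidden under 'read_committed' even though it is not transactional, because it sits behind a transaction that has not finished yet.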

Environment-Based Configuration

typescript
const client = new KafkaClient({
	clientId: process.env.KAFKA_CLIENT_ID || 'my-app',
	brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
	tls: process.env.KAFKA_TLS_ENABLED === 'true' ? { enabled: true } : undefined,
	sasl:
		process.env.KAFKA_SASL_MECHANISM === 'OAUTHBEARER'
			? {
					mechanism: 'OAUTHBEARER',
					oauthBearerProvider: async () => {
						// For Amazon MSK IAM, see the "Authentication" docs for required dependency + setup.
						const { generateAuthToken } = await import('aws-msk-iam-sasl-signer-js')
						const { token } = await generateAuthToken({ region: process.env.AWS_REGION! })
						return { value: token }
					},
				}
			: process.env.KAFKA_SASL_USERNAME
				? {
						mechanism: (process.env.KAFKA_SASL_MECHANISM || 'SCRAM-SHA-256') as 'SCRAM-SHA-256',
						username: process.env.KAFKA_SASL_USERNAME,
						password: process.env.KAFKA_SASL_PASSWORD!,
					}
				: undefined,
})
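
The example above splits `KAFKA_BROKERS` on commas without trimming and casts the mechanism without checking it. A slightly more defensive variant is sketched below as two standalone helpers. The helper names are hypothetical, and the list of password-based mechanisms is limited to the two that appear on this page; it is an assumption, not the library's full list.

```typescript
// Assumption: only the password-based mechanisms shown on this page.
const PASSWORD_MECHANISMS = ['PLAIN', 'SCRAM-SHA-256'] as const
type PasswordMechanism = (typeof PASSWORD_MECHANISMS)[number]

// Split a comma-separated broker list, tolerating spaces and trailing commas.
function parseBrokers(raw: string | undefined, fallback = 'localhost:9092'): string[] {
	return (raw || fallback)
		.split(',')
		.map(s => s.trim())
		.filter(s => s.length > 0)
}

// Validate the mechanism instead of casting it, so typos fail at startup.
function parsePasswordMechanism(raw: string | undefined): PasswordMechanism {
	const value = raw || 'SCRAM-SHA-256'
	for (const mechanism of PASSWORD_MECHANISMS) {
		if (mechanism === value) return mechanism
	}
	throw new Error(`Unsupported KAFKA_SASL_MECHANISM: ${value}`)
}

console.log(parseBrokers(' broker1:9092, broker2:9092 ,')) // [ 'broker1:9092', 'broker2:9092' ]
console.log(parseBrokers(undefined)) // [ 'localhost:9092' ]
console.log(parsePasswordMechanism(undefined)) // SCRAM-SHA-256
```

These could replace the inline `split(',')` and the `as 'SCRAM-SHA-256'` cast, for example `brokers: parseBrokers(process.env.KAFKA_BROKERS)`.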

Released under the MIT License.