Skip to content

Configuration

KafkaClient Configuration

typescript
import { KafkaClient } from '@kafkats/client'

const client = new KafkaClient({
	// Required
	clientId: 'my-app',
	brokers: ['broker1:9092', 'broker2:9092'],

	// Optional
	tls: { enabled: true },
	sasl: { mechanism: 'PLAIN', username: 'user', password: 'pass' },
	connectionTimeoutMs: 10000,
	requestTimeoutMs: 30000,
})

SASL is configured based on mechanism (for example, OAUTHBEARER uses an async provider):

typescript
const client = new KafkaClient({
	clientId: 'my-app',
	brokers: ['broker1:9093'],
	tls: { enabled: true },
	sasl: {
		mechanism: 'OAUTHBEARER',
		oauthBearerProvider: async () => {
			// For Amazon MSK IAM, install:
			//   pnpm add aws-msk-iam-sasl-signer-js
			const { generateAuthToken } = await import('aws-msk-iam-sasl-signer-js')
			const { token } = await generateAuthToken({ region: process.env.AWS_REGION! })
			return { value: token }
		},
	},
})

Options Reference

OptionTypeDefaultDescription
clientIdstring-Required. Client identifier shown in broker logs
brokersstring[]-Required. Bootstrap broker addresses
requestTimeoutMsnumber30000Request timeout (ms)
connectionTimeoutMsnumber10000Connection timeout (ms)
metadataRefreshIntervalMsnumber300000How often to refresh cluster metadata (ms)
maxInFlightRequestsnumber5Max in-flight requests per broker connection
tlsTlsConfig-TLS configuration (omit for plaintext)
saslSaslConfig-SASL authentication configuration
loggerLogger-Custom logger implementation
logLevel'debug' | 'info' | 'warn' | 'error''info'Log level for the built-in logger

SASL Reauthentication

If the broker enables periodic SASL reauthentication (connections.max.reauth.ms), kafkats will reauthenticate automatically. You can tune how early it refreshes via sasl.reauthenticationThresholdMs (default: 10000).

Producer Configuration

typescript
const producer = client.producer({
	acks: 'all',
	compression: 'snappy',
	lingerMs: 5,
	maxBatchBytes: 16384,
	retries: 3,
	idempotent: false,
	transactionalId: undefined,
})

Options Reference

OptionTypeDefaultDescription
acks'all' | 'leader' | 'none''all'Acknowledgment mode
compression'none' | 'gzip' | 'snappy' | 'lz4' | 'zstd''none'Compression type
lingerMsnumber5Batch wait time (ms)
maxBatchBytesnumber16384Max batch size (bytes)
retriesnumber3Retry attempts
retryBackoffMsnumber100Initial retry backoff (ms)
maxRetryBackoffMsnumber1000Max retry backoff (ms)
partitioner'murmur2' | 'round-robin' | Function'murmur2'Partitioning strategy
requestTimeoutMsnumber30000Request timeout (ms)
idempotentbooleanfalseEnable idempotent producer
maxInFlightnumber5Max in-flight requests
transactionalIdstring-Enable transactions
transactionTimeoutMsnumber60000Transaction timeout (ms)

Acknowledgment Modes

ValueMeaningDurability
'none'Don't wait for acknowledgmentLowest - may lose messages
'leader'Wait for leader to writeMedium - may lose if leader fails
'all'Wait for all in-sync replicasHighest - recommended

Compression Options

TypeSpeedRatioNotes
'none'Fastest1:1No compression
'gzip'SlowBestGood for text, built-in
'snappy'FastGoodBalanced choice
'lz4'FastestGoodBest for high throughput
'zstd'MediumBestModern, efficient

GZIP is built-in. For Snappy, LZ4, and Zstd, you need to install and register a compression library. See Compression for supported libraries.

Consumer Configuration

typescript
const consumer = client.consumer({
	groupId: 'my-group',
	sessionTimeoutMs: 30000,
	heartbeatIntervalMs: 3000,
	autoOffsetReset: 'latest',
	isolationLevel: 'read_committed',
})

Options Reference

OptionTypeDefaultDescription
groupIdstring-Required. Consumer group ID
groupInstanceIdstring-Static membership ID
sessionTimeoutMsnumber30000Session timeout (ms)
rebalanceTimeoutMsnumber60000Rebalance timeout (ms)
heartbeatIntervalMsnumber3000Heartbeat interval (ms)
maxBytesPerPartitionnumber1048576Max fetch bytes per partition
minBytesnumber1Min bytes to fetch
maxWaitMsnumber5000Max fetch wait time (ms)
autoOffsetReset'earliest' | 'latest' | 'none''latest'Offset reset strategy
isolationLevel'read_committed' | 'read_uncommitted''read_committed'Transaction isolation
partitionAssignmentStrategy'cooperative-sticky' | 'sticky' | 'range''cooperative-sticky'Assignment strategy

Offset Reset Strategies

ValueBehavior
'earliest'Start from beginning of topic
'latest'Start from end of topic (new messages only)
'none'Throw error if no committed offset exists

Isolation Levels

ValueBehavior
'read_committed'Only see committed transactional messages
'read_uncommitted'See all messages including uncommitted

Environment-Based Configuration

typescript
const client = new KafkaClient({
	clientId: process.env.KAFKA_CLIENT_ID || 'my-app',
	brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
	tls: process.env.KAFKA_TLS_ENABLED === 'true' ? { enabled: true } : undefined,
	sasl:
		process.env.KAFKA_SASL_MECHANISM === 'OAUTHBEARER'
			? {
					mechanism: 'OAUTHBEARER',
					oauthBearerProvider: async () => {
						// For Amazon MSK IAM, see the "Authentication" docs for required dependency + setup.
						const { generateAuthToken } = await import('aws-msk-iam-sasl-signer-js')
						const { token } = await generateAuthToken({ region: process.env.AWS_REGION! })
						return { value: token }
					},
				}
			: process.env.KAFKA_SASL_USERNAME
				? {
						mechanism: (process.env.KAFKA_SASL_MECHANISM || 'SCRAM-SHA-256') as 'SCRAM-SHA-256',
						username: process.env.KAFKA_SASL_USERNAME,
						password: process.env.KAFKA_SASL_PASSWORD!,
					}
				: undefined,
})

Released under the MIT License.