# Compression

kafkats supports multiple compression algorithms for reducing network bandwidth and storage. Compression is applied at the RecordBatch level: the producer compresses batches before sending, and consumers decompress them automatically.
## Quick Start

```ts
import { KafkaClient, CompressionType, compressionCodecs, createSnappyCodec } from '@kafkats/client'
import snappy from 'snappy'

// Register a compression codec
compressionCodecs.register(CompressionType.Snappy, createSnappyCodec(snappy))

// Use compression in the producer
const producer = client.producer({
  compression: 'snappy',
})
```

## Compression Types
| Type | Speed | Ratio | Built-in | Notes |
|---|---|---|---|---|
| `'none'` | Fastest | 1:1 | Yes | No compression |
| `'gzip'` | Slow | Best | Yes | Uses Node.js zlib |
| `'snappy'` | Fast | Good | No | Balanced choice |
| `'lz4'` | Very fast | Good | No | Best for high throughput |
| `'zstd'` | Medium | Best | No | Modern, efficient, versatile |
## Built-in Codecs

GZIP is built in and requires no additional setup:

```ts
const producer = client.producer({
  compression: 'gzip',
})
```

## Pluggable Compression Libraries
For Snappy, LZ4, and Zstd, you must install a compression library and register it. kafkats supports multiple libraries for each algorithm.
### Snappy

| Library | Type | Performance |
|---|---|---|
| `snappy` | Native | Fastest |
| `snappyjs` | Pure JS | Good |
#### snappy (Recommended)

```sh
npm install snappy
```

```ts
import snappy from 'snappy'
import { CompressionType, compressionCodecs, createSnappyCodec } from '@kafkats/client'

compressionCodecs.register(CompressionType.Snappy, createSnappyCodec(snappy))
```

#### snappyjs
```sh
npm install snappyjs
```

```ts
import * as SnappyJS from 'snappyjs'
import { CompressionType, compressionCodecs, createSnappyCodec } from '@kafkats/client'

compressionCodecs.register(CompressionType.Snappy, createSnappyCodec(SnappyJS))
```

### LZ4
| Library | Type | Performance |
|---|---|---|
| `lz4-napi` | Native | Fastest |
| `lz4` | Native | Fast |
| `lz4js` | Pure JS | Good |
#### lz4-napi (Recommended)

```sh
npm install lz4-napi
```

```ts
import * as lz4 from 'lz4-napi'
import { CompressionType, compressionCodecs, createLz4Codec } from '@kafkats/client'

compressionCodecs.register(CompressionType.Lz4, createLz4Codec(lz4))
```

#### node-lz4
```sh
npm install lz4
```

```ts
import lz4 from 'lz4'
import { CompressionType, compressionCodecs, createLz4Codec } from '@kafkats/client'

compressionCodecs.register(CompressionType.Lz4, createLz4Codec(lz4))
```

#### lz4js
```sh
npm install lz4js
```

```ts
import * as lz4js from 'lz4js'
import { CompressionType, compressionCodecs, createLz4Codec } from '@kafkats/client'

compressionCodecs.register(CompressionType.Lz4, createLz4Codec(lz4js))
```

### Zstd
| Library | Type | Performance |
|---|---|---|
| `@mongodb-js/zstd` | Native | Fastest |
| `zstd-napi` | Native | Fastest |
| `zstd-codec` | WASM | Good |
#### @mongodb-js/zstd (Recommended)

```sh
npm install @mongodb-js/zstd
```

```ts
import { compress, decompress } from '@mongodb-js/zstd'
import { CompressionType, compressionCodecs, createZstdCodec } from '@kafkats/client'

compressionCodecs.register(CompressionType.Zstd, createZstdCodec({ compress, decompress }))
```

#### zstd-napi
```sh
npm install zstd-napi
```

```ts
import { compress, decompress } from 'zstd-napi'
import { CompressionType, compressionCodecs, createZstdCodec } from '@kafkats/client'

compressionCodecs.register(CompressionType.Zstd, createZstdCodec({ compress, decompress }))
```

#### zstd-codec
```sh
npm install zstd-codec
```

```ts
import { ZstdCodec } from 'zstd-codec'
import { CompressionType, compressionCodecs, createZstdCodec } from '@kafkats/client'

// Initialize and register within the callback
ZstdCodec.run(zstd => {
  const simple = new zstd.Simple()
  compressionCodecs.register(CompressionType.Zstd, createZstdCodec(simple))
})
```

## Compression Options
### Zstd Compression Level

Zstd supports compression levels from 1 to 22 (default: 3). Lower levels are faster; higher levels achieve better compression:

```ts
import { compress, decompress } from '@mongodb-js/zstd'
import { CompressionType, compressionCodecs, createZstdCodec } from '@kafkats/client'

compressionCodecs.register(CompressionType.Zstd, createZstdCodec({ compress, decompress }, { level: 6 }))
```

## Transparent Decompression
Consumers automatically detect and decompress messages without any configuration. The compression type is stored in the RecordBatch header, so consumers can decode messages regardless of which compression was used by the producer.
```ts
// Producer uses gzip compression (built-in)
const producer = client.producer({ compression: 'gzip' })
await producer.send('my-topic', [{ value: Buffer.from('compressed data') }])

// Consumer automatically decompresses
const consumer = client.consumer({ groupId: 'my-group' })
for await (const { message } of consumer.stream('my-topic')) {
  console.log(message.value.toString()) // 'compressed data'
}
```

> **TIP**
> Make sure the compression codec is registered before consuming messages that use that compression type. GZIP works out of the box, but Snappy/LZ4/Zstd codecs must be registered.
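For example, a minimal sketch of a consumer entry point that registers the Snappy codec up front (assuming `client` is an already-configured kafkats client, as in the examples above):

```ts
import snappy from 'snappy'
import { CompressionType, compressionCodecs, createSnappyCodec } from '@kafkats/client'

// Register once at startup, before any consumer (or producer) is created,
// so snappy-compressed batches can be decoded
compressionCodecs.register(CompressionType.Snappy, createSnappyCodec(snappy))

const consumer = client.consumer({ groupId: 'my-group' })
for await (const { message } of consumer.stream('my-topic')) {
  console.log(message.value.toString())
}
```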
## Performance Considerations
Choose your compression strategy based on your use case:
| Use Case | Recommended | Why |
|---|---|---|
| High throughput, low CPU | LZ4 or None | Fastest compression/decompression |
| Network-constrained | Zstd or Gzip | Best compression ratio |
| Balanced workload | Snappy | Good mix of speed and compression |
| Log/text data | Gzip or Zstd | Text compresses well with these |
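Compression ratios depend heavily on your payloads, so it can be worth measuring before committing to a codec. Here is a quick, standalone sketch using Node's built-in zlib; it is independent of kafkats, uses a hypothetical sample payload, and only approximates what broker-bound batches will look like:

```ts
import { gzipSync } from 'node:zlib'

// Sample payload resembling repetitive log/event data (hypothetical shape)
const payload = Buffer.from(
  JSON.stringify(
    Array.from({ length: 1000 }, (_, i) => ({ id: i, level: 'info', msg: 'user signed in' }))
  )
)

const compressed = gzipSync(payload)
console.log(`original: ${payload.length} bytes, gzip: ${compressed.length} bytes`)
// Repetitive JSON/text like this usually compresses very well; binary or already-compressed
// data (images, encrypted blobs) will barely shrink and may not be worth the CPU cost.
```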
## Supported Libraries Summary

### Snappy

- Native: `snappy`
- Pure JS: `snappyjs`

### LZ4

- Native: `lz4-napi`
- Native: `lz4`
- Pure JS: `lz4js`

### Zstd

- Native: `@mongodb-js/zstd` - MongoDB's binding
- Native: `zstd-napi` - Node-API binding
- WASM: `zstd-codec` - Emscripten based
## Custom Codecs

You can also implement your own compression codec:

```ts
import { CompressionCodec, CompressionType, compressionCodecs } from '@kafkats/client'

const myCodec: CompressionCodec = {
  async compress(data: Buffer): Promise<Buffer> {
    // Your compression logic
    return compressedData
  },
  async decompress(data: Buffer): Promise<Buffer> {
    // Your decompression logic
    return decompressedData
  },
}

compressionCodecs.register(CompressionType.Snappy, myCodec)
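As a concrete sketch, a hand-rolled Snappy codec built directly on the `snappy` package might look like the following; it does roughly what `createSnappyCodec(snappy)` already does for you, and is shown only to illustrate the `CompressionCodec` shape:

```ts
import { compress, uncompress } from 'snappy'
import { CompressionCodec, CompressionType, compressionCodecs } from '@kafkats/client'

// Hand-rolled Snappy codec; equivalent in spirit to createSnappyCodec(snappy)
const snappyCodec: CompressionCodec = {
  async compress(data: Buffer): Promise<Buffer> {
    return compress(data)
  },
  async decompress(data: Buffer): Promise<Buffer> {
    // asBuffer ensures a Buffer is returned rather than a string
    return (await uncompress(data, { asBuffer: true })) as Buffer
  },
}

compressionCodecs.register(CompressionType.Snappy, snappyCodec)
```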
## Next Steps

- Producer API - Configure producer compression
- Configuration - Full configuration reference
- Codecs - Message serialization (different from compression)