Skip to content

Authentication

kafkats supports SASL authentication for secure Kafka connections.

Supported Mechanisms

MechanismDescription
PLAINUsername/password (use with TLS)
SCRAM-SHA-256Challenge-response, SHA-256
SCRAM-SHA-512Challenge-response, SHA-512
OAUTHBEARERBearer token (e.g. AWS MSK IAM)

SASL Options

SASL config is a discriminated union keyed by mechanism.

PLAIN / SCRAM

OptionTypeRequiredDescription
mechanism'PLAIN' | 'SCRAM-SHA-256' | 'SCRAM-SHA-512'YesSASL mechanism name
usernamestringYesSASL username
passwordstringYesSASL password

OAUTHBEARER

OptionTypeRequiredDescription
mechanism'OAUTHBEARER'YesSASL mechanism name
oauthBearerProvider(context) => ({ value, extensions? })YesReturns the bearer token (and optional extensions) for the broker
reauthenticationThresholdMsnumberNoReauthenticate when this many milliseconds remain of broker session lifetime (default: 10000)

context includes { host, port, clientId }.

SASL/PLAIN

Simple username/password authentication. Always use with TLS:

typescript
import { KafkaClient } from '@kafkats/client'

const client = new KafkaClient({
	clientId: 'my-app',
	brokers: ['kafka.example.com:9093'],
	tls: { enabled: true },
	sasl: {
		mechanism: 'PLAIN',
		username: 'my-username',
		password: 'my-password',
	},
})

Security

PLAIN mechanism sends credentials in base64 (not encrypted). Always use TLS (tls: { enabled: true }) to protect credentials in transit.

SCRAM-SHA-256

More secure challenge-response authentication:

typescript
const client = new KafkaClient({
	clientId: 'my-app',
	brokers: ['kafka.example.com:9093'],
	tls: { enabled: true },
	sasl: {
		mechanism: 'SCRAM-SHA-256',
		username: 'my-username',
		password: 'my-password',
	},
})

SCRAM-SHA-512

Strongest built-in authentication:

typescript
const client = new KafkaClient({
	clientId: 'my-app',
	brokers: ['kafka.example.com:9093'],
	tls: { enabled: true },
	sasl: {
		mechanism: 'SCRAM-SHA-512',
		username: 'my-username',
		password: 'my-password',
	},
})

OAUTHBEARER

Provide a bearer token per broker connection via oauthBearerProvider. Tokens are commonly short-lived, so generate them on demand and refresh when needed.

If the broker has periodic reauthentication enabled (connections.max.reauth.ms), kafkats will automatically reauthenticate on the existing connection using SaslAuthenticate before the session expires.

typescript
import { KafkaClient } from '@kafkats/client'

const client = new KafkaClient({
	clientId: 'my-app',
	brokers: ['kafka.example.com:9093'],
	tls: { enabled: true },
	sasl: {
		mechanism: 'OAUTHBEARER',
		oauthBearerProvider: async ({ host, port }) => {
			const value = await getTokenForBroker(`${host}:${port}`)
			return { value }
		},
	},
})

TLS Configuration

TLS Options

OptionTypeDefaultDescription
enabledbooleanfalseEnables TLS when true
castring | Buffer | Array<string | Buffer>-CA certificate(s)
certstring | Buffer-Client certificate (mTLS)
keystring | Buffer-Client private key (mTLS)
passphrasestring-Private key passphrase
rejectUnauthorizedbooleantrueValidate broker certificates
servernamestring-SNI server name

Basic TLS

Use system CA certificates:

typescript
const client = new KafkaClient({
	clientId: 'my-app',
	brokers: ['kafka.example.com:9093'],
	tls: { enabled: true },
})

Custom CA Certificate

typescript
import { readFileSync } from 'fs'

const client = new KafkaClient({
	clientId: 'my-app',
	brokers: ['kafka.example.com:9093'],
	tls: {
		enabled: true,
		ca: readFileSync('/path/to/ca.pem'),
	},
})

Mutual TLS (mTLS)

Client certificate authentication:

typescript
const client = new KafkaClient({
	clientId: 'my-app',
	brokers: ['kafka.example.com:9093'],
	tls: {
		enabled: true,
		ca: readFileSync('/path/to/ca.pem'),
		cert: readFileSync('/path/to/client.pem'),
		key: readFileSync('/path/to/client-key.pem'),
	},
})

Disable Certificate Verification

Not for Production

Only use this for development/testing with self-signed certificates.

typescript
const client = new KafkaClient({
	clientId: 'my-app',
	brokers: ['localhost:9093'],
	tls: {
		enabled: true,
		rejectUnauthorized: false,
	},
})

Environment Variables

Common pattern for configuration:

typescript
const client = new KafkaClient({
	clientId: process.env.KAFKA_CLIENT_ID || 'my-app',
	brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
	tls: process.env.KAFKA_TLS_ENABLED === 'true' ? { enabled: true } : undefined,
	sasl:
		process.env.KAFKA_SASL_MECHANISM === 'OAUTHBEARER'
			? {
					mechanism: 'OAUTHBEARER',
					oauthBearerProvider: async () => {
						// For Amazon MSK IAM, install the AWS signer:
						//   pnpm add aws-msk-iam-sasl-signer-js
						const { generateAuthToken } = await import('aws-msk-iam-sasl-signer-js')
						const { token } = await generateAuthToken({ region: process.env.AWS_REGION! })
						return { value: token }
					},
				}
			: process.env.KAFKA_SASL_USERNAME
				? {
						mechanism: (process.env.KAFKA_SASL_MECHANISM || 'SCRAM-SHA-256') as 'SCRAM-SHA-256',
						username: process.env.KAFKA_SASL_USERNAME,
						password: process.env.KAFKA_SASL_PASSWORD!,
					}
				: undefined,
})

Broker Configuration

Confluent Cloud

typescript
const client = new KafkaClient({
	clientId: 'my-app',
	brokers: ['xxx.confluent.cloud:9092'],
	tls: { enabled: true },
	sasl: {
		mechanism: 'PLAIN',
		username: process.env.CONFLUENT_API_KEY!,
		password: process.env.CONFLUENT_API_SECRET!,
	},
})

Amazon MSK (IAM)

Amazon MSK IAM can be used via SASL OAUTHBEARER by returning a SigV4-based token from oauthBearerProvider.

References:

typescript
import { KafkaClient } from '@kafkats/client'

async function createMskIamToken(options: { region: string }): Promise<string> {
	// Recommended: use AWS' official MSK IAM SASL signer (Node.js)
	// Install:
	//   pnpm add aws-msk-iam-sasl-signer-js
	// (AWS also documents installing from GitHub:
	//   npm install https://github.com/aws/aws-msk-iam-sasl-signer-js)
	// It also supports fetching creds from a profile or role:
	//   generateAuthTokenFromProfile({ region, awsProfileName })
	//   generateAuthTokenFromRole({ region, awsRoleArn, awsRoleSessionName? })
	const { generateAuthToken } = await import('aws-msk-iam-sasl-signer-js')
	const { token, expiryTime } = await generateAuthToken({ region: options.region })
	// expiryTime is milliseconds since epoch (useful if you want to cache/refresh)
	return token
}

const client = new KafkaClient({
	clientId: 'my-app',
	brokers: ['b-1.msk.example.amazonaws.com:9098'],
	tls: { enabled: true },
	sasl: {
		mechanism: 'OAUTHBEARER',
		oauthBearerProvider: async () => ({ value: await createMskIamToken({ region: 'us-east-1' }) }),
	},
})

Token lifetime

MSK IAM tokens are short-lived. Generate them on demand inside oauthBearerProvider rather than hard-coding a static token.

How the token is built

The AWS signer generates a SigV4 presigned URL for the kafka-cluster service with Action=kafka-cluster:Connect, then base64url-encodes it for use as the OAUTHBEARER token.

Redpanda

typescript
const client = new KafkaClient({
	clientId: 'my-app',
	brokers: ['redpanda.example.com:9092'],
	tls: { enabled: true },
	sasl: {
		mechanism: 'SCRAM-SHA-256',
		username: 'user',
		password: 'password',
	},
})

Troubleshooting

Authentication Failed

Error: SASL authentication failed: Invalid credentials
  • Verify username and password
  • Check if the mechanism matches broker configuration
  • Ensure the user has proper ACLs

Connection Refused

Error: Connection refused
  • Check if broker address is correct
  • Verify TLS port (usually 9093) vs plaintext (9092)
  • Check firewall rules

Certificate Errors

Error: unable to verify the first certificate
  • Add the CA certificate to your configuration
  • Or set rejectUnauthorized: false for testing

Released under the MIT License.