Kafka Interface
BoilStream provides a Kafka-compatible protocol server that allows Kafka producers to stream data directly into BoilStream topics.
Overview
The Kafka interface enables:
- Drop-in replacement for Kafka producers in data ingestion pipelines
- Automatic format detection and conversion to Arrow
- Zero-copy processing with SIMD optimizations
- Built-in Schema Registry with Confluent-compatible API
- SASL/PLAIN authentication using web tokens
Connection Details
```properties
# Kafka producer configuration
bootstrap.servers=localhost:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<topic_name>" \
  password="<web_token>";

# Serializers
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

# BoilStream's built-in Schema Registry
schema.registry.url=https://localhost:443/schema-registry
```

Schema Registry URL
BoilStream includes a built-in Confluent-compatible Schema Registry at /schema-registry on the auth server (default port 443). See Schema Registry API for details.
Authentication
BoilStream's Kafka interface uses SASL/PLAIN authentication with web tokens.
Getting a Token
- Log in to the Web Auth GUI at https://localhost:443/auth
- Navigate to Topics and select your topic
- Generate an Ingest token for Kafka access
SASL Configuration
| Parameter | Value |
|---|---|
| security.protocol | SASL_SSL |
| sasl.mechanism | PLAIN |
| username | Topic name (e.g., my_events) |
| password | Web token from dashboard |
Token Expiry
Web tokens have a configurable TTL (default 8 hours). Generate new tokens before expiry or implement token refresh in your application.
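Tokens are generated manually in the Web Auth GUI, so a refresh strategy usually means recreating the producer with a fresh token before the TTL elapses. A minimal Python sketch, assuming a hypothetical `fetch_ingest_token()` helper that returns a freshly generated token (for example, pulled from your own secrets store):

```python
import time
from confluent_kafka import Producer

TOKEN_TTL_SECONDS = 8 * 60 * 60   # default web token TTL
REFRESH_MARGIN_SECONDS = 15 * 60  # swap producers well before expiry

def fetch_ingest_token() -> str:
    """Hypothetical helper: return a freshly generated Ingest token
    for the topic (e.g., from your own secrets store)."""
    raise NotImplementedError

def build_producer(token: str) -> Producer:
    return Producer({
        'bootstrap.servers': 'localhost:9092',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': 'my_events',  # topic name
        'sasl.password': token,        # web token
    })

class RefreshingProducer:
    """Wraps a Producer and rebuilds it before the token TTL elapses."""

    def __init__(self):
        self._producer = build_producer(fetch_ingest_token())
        self._issued_at = time.time()

    def get(self) -> Producer:
        if time.time() - self._issued_at > TOKEN_TTL_SECONDS - REFRESH_MARGIN_SECONDS:
            self._producer.flush()  # drain in-flight messages first
            self._producer = build_producer(fetch_ingest_token())
            self._issued_at = time.time()
        return self._producer
```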
Built-in Schema Registry
BoilStream includes a Confluent-compatible Schema Registry. Key characteristics:
- Read-only: Schemas are created via DuckLake `CREATE TABLE`, not through the registry API
- TopicNameStrategy: Subjects follow the `{topic_name}-value` naming convention
- Global Schema IDs: Full Confluent wire format support with global IDs
- No authentication: GET endpoints are public for schema discovery
See Schema Registry API for complete documentation.
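Because the registry is Confluent-compatible and its GET endpoints are public, schemas can be discovered with plain HTTP. A minimal Python sketch, assuming the standard Confluent read endpoints (`/subjects`, `/subjects/{subject}/versions/latest`, `/schemas/ids/{id}`) and a development CA certificate path; see the Schema Registry API page for the exact surface:

```python
import requests

# BoilStream's built-in registry lives under /schema-registry on the auth server
REGISTRY = 'https://localhost:443/schema-registry'
CA_CERT = '/path/to/ca.crt'  # assumption: self-signed certificate in development

# List all subjects (TopicNameStrategy: one "<topic>-value" subject per table)
print(requests.get(f'{REGISTRY}/subjects', verify=CA_CERT).json())

# Fetch the latest schema version for a topic
latest = requests.get(
    f'{REGISTRY}/subjects/my_events-value/versions/latest', verify=CA_CERT
).json()
print(latest['id'], latest['version'])

# Resolve a schema by its global ID (the ID embedded in the Confluent wire format)
schema = requests.get(f'{REGISTRY}/schemas/ids/{latest["id"]}', verify=CA_CERT).json()
print(schema['schema'])
```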
How Schemas are Created
Create a table in DuckLake via PostgreSQL interface:
```sql
CREATE TABLE my_events (
    event_id BIGINT,
    event_type VARCHAR,
    timestamp TIMESTAMP,
    payload JSON
);
```

BoilStream automatically:
- Generates an Avro schema from the table definition
- Registers it in the Schema Registry as `my_events-value`
- Assigns a global schema ID
Kafka producers can then discover the schema:
```bash
curl https://localhost:443/schema-registry/subjects/my_events-value/versions/latest
```
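If you prefer to script table creation rather than use an interactive client, the same `CREATE TABLE` can be issued over the PostgreSQL interface. A sketch using psycopg2; the host, port, database name, and credentials below are placeholders for your deployment's actual values:

```python
import psycopg2

# Placeholders: use your deployment's PostgreSQL interface host, port, and credentials
conn = psycopg2.connect(host='localhost', port=5432, dbname='boilstream',
                        user='postgres', password='<password>')
conn.autocommit = True

with conn.cursor() as cur:
    cur.execute("""
        CREATE TABLE my_events (
            event_id BIGINT,
            event_type VARCHAR,
            timestamp TIMESTAMP,
            payload JSON
        )
    """)

# The my_events-value subject is now discoverable in the Schema Registry
conn.close()
```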
Supported Message Formats
Fully Supported
- Confluent Avro: Binary Avro with Schema Registry integration
  - Magic byte: `0x00`
  - Global schema ID embedded in message (bytes 1-4)
  - Automatic schema resolution via built-in registry
  - Batch processing supported
Placeholder/Partial Support
- Confluent JSON: JSON with Schema Registry (in development)
- Raw JSON: Plain JSON messages
- Raw Text: Plain text messages
Not Yet Supported
- Confluent Protobuf: Protobuf with Schema Registry
- Kafka RecordBatch V2: Native Kafka format
- Raw Binary: Arbitrary binary data
Producer Examples
Java with Confluent Avro
```java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"my_events\" password=\"<your_web_token>\";");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", KafkaAvroSerializer.class);
props.put("schema.registry.url", "https://localhost:443/schema-registry");
// For self-signed certificates in development
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "changeit");
Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
// Create Avro record matching your DuckLake table schema
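// ("schema" here is the org.apache.avro.Schema for the my_events-value subject,
//  e.g. fetched from the Schema Registry or parsed from the registered schema string)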
GenericRecord record = new GenericData.Record(schema);
record.put("event_id", 12345L);
record.put("event_type", "page_view");
record.put("timestamp", System.currentTimeMillis() * 1000); // microseconds
record.put("payload", "{\"page\": \"/home\"}");
ProducerRecord<String, GenericRecord> producerRecord =
new ProducerRecord<>("my_events", record);
producer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("Sent to partition " + metadata.partition());
}
});
```

Python with confluent-kafka
```python
from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
import time
# Schema Registry configuration (BoilStream's built-in registry)
schema_registry_conf = {
'url': 'https://localhost:443/schema-registry',
# For self-signed certificates in development:
# 'ssl.ca.location': '/path/to/ca.crt'
}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
# Get schema from registry (created via DuckLake CREATE TABLE)
# The schema is automatically available after table creation
subject = 'my_events-value'
schema_str = schema_registry_client.get_latest_version(subject).schema.schema_str
# Create Avro serializer
avro_serializer = AvroSerializer(
schema_registry_client,
schema_str
)
# Producer configuration with SASL authentication
producer_conf = {
'bootstrap.servers': 'localhost:9092',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username': 'my_events', # Topic name
'sasl.password': '<your_web_token>', # Token from Web Auth GUI
# For self-signed certificates:
# 'ssl.ca.location': '/path/to/ca.crt'
}
producer = Producer(producer_conf)
# Send event
event = {
'event_id': 12345,
'event_type': 'page_view',
'timestamp': int(time.time() * 1000000), # microseconds
'payload': '{"page": "/home"}'
}
producer.produce(
topic='my_events',
value=avro_serializer(
event,
SerializationContext('my_events', MessageField.VALUE)
)
)
producer.flush()
```

Node.js with KafkaJS and Schema Registry
```javascript
import { Kafka } from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import https from 'https';
import fs from 'fs';
// Read CA certificate for self-signed certs in development
const ca = fs.readFileSync('/path/to/ca.crt');
// Connect to BoilStream's built-in Schema Registry
const registry = new SchemaRegistry({
host: 'https://localhost:443/schema-registry',
agent: new https.Agent({ ca: [ca] })
});
// Create Kafka client with SASL authentication
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'],
ssl: { ca: [ca] },
sasl: {
mechanism: 'plain',
username: 'my_events', // Topic name
password: '<your_web_token>' // Token from Web Auth GUI
}
});
const producer = kafka.producer();
async function produceEvents() {
await producer.connect();
// Get schema ID from registry (schema created via DuckLake CREATE TABLE)
const subject = 'my_events-value';
const schemaId = await registry.getLatestSchemaId(subject);
console.log(`Using schema ID: ${schemaId}`);
// Create event matching your DuckLake table schema
const event = {
event_id: 12345,
event_type: 'page_view',
timestamp: Date.now() * 1000, // microseconds
payload: '{"page": "/home"}'
};
// Encode with Confluent wire format (adds magic byte + schema ID)
const encodedValue = await registry.encode(schemaId, event);
await producer.send({
topic: 'my_events',
messages: [{
key: 'event-key',
value: encodedValue
}]
});
console.log('Message sent successfully');
await producer.disconnect();
}
produceEvents().catch(console.error);
```

Schema Management
Automatic Schema Detection
BoilStream automatically detects the Confluent wire format:
- Checks for the magic byte (`0x00`)
- Extracts the global schema ID from bytes 1-4
- Resolves schema via built-in Schema Registry
- Converts Avro to Arrow format using SIMD
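For reference, the Confluent wire format that BoilStream detects is easy to inspect client-side as well. An illustrative Python sketch that splits a framed message into its schema ID and Avro payload:

```python
import struct

def parse_confluent_header(message: bytes) -> tuple[int, bytes]:
    """Split a Confluent-framed message into (global schema ID, Avro payload).

    Wire format: byte 0 is the magic byte 0x00, bytes 1-4 hold the
    big-endian global schema ID, and the Avro-encoded record follows.
    """
    if len(message) < 5 or message[0] != 0x00:
        raise ValueError('not Confluent wire format (missing 0x00 magic byte)')
    schema_id = struct.unpack('>I', message[1:5])[0]
    return schema_id, message[5:]

# Example: a frame for schema ID 42 with an empty Avro payload
schema_id, payload = parse_confluent_header(b'\x00' + struct.pack('>I', 42))
assert schema_id == 42 and payload == b''
```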
Schema Evolution
Schema changes are made through DuckLake ALTER TABLE:
```sql
-- Add a new optional column
ALTER TABLE my_events ADD COLUMN user_agent VARCHAR;
```

BoilStream registers the new schema version automatically. Compatibility modes:
- Forward compatibility: Add optional fields (nullable or with defaults)
- Backward compatibility: Remove optional fields
- Full compatibility: Both forward and backward
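After applying an ALTER TABLE, you can confirm that a new schema version was registered by listing the subject's versions. A short sketch, assuming the standard Confluent versions endpoints and a development CA certificate path:

```python
import requests

REGISTRY = 'https://localhost:443/schema-registry'
CA_CERT = '/path/to/ca.crt'  # assumption: self-signed certificate in development

# A new entry should appear here after the ALTER TABLE above takes effect
versions = requests.get(
    f'{REGISTRY}/subjects/my_events-value/versions', verify=CA_CERT
).json()
print('registered versions:', versions)

# Confirm the added column shows up in the newest Avro schema
latest = requests.get(
    f'{REGISTRY}/subjects/my_events-value/versions/latest', verify=CA_CERT
).json()
print('user_agent' in latest['schema'])
```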
Schema Source of Truth
In BoilStream, DuckLake tables are the source of truth for schemas. The Schema Registry reflects table definitions - you cannot register schemas directly through the registry API.
Performance Optimization
SIMD Acceleration
The Kafka interface uses SIMD instructions for:
- Avro to Arrow conversion
- Schema validation
- Data type conversions
- Memory operations
Batching
Messages are automatically batched for efficiency:
- Batch size: Up to 10,000 records
- Reduces channel overhead by ~94%
- Improves throughput significantly
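Server-side batching is automatic, but you can also tune client-side batching so producers send larger Kafka batches per request. A sketch using confluent-kafka's librdkafka settings; the values shown are illustrative, not tuned recommendations:

```python
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'my_events',
    'sasl.password': '<your_web_token>',
    # Client-side batching knobs (librdkafka); illustrative values only
    'linger.ms': 50,               # wait up to 50 ms to fill a batch
    'batch.num.messages': 10000,   # cap records per batch
    'batch.size': 1048576,         # cap batch size at 1 MiB
})
```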
Zero-Copy Processing
- Direct memory mapping from Kafka buffers
- No intermediate serialization
- Efficient buffer pooling
Configuration
Enable Kafka Interface
```yaml
kafka:
  enabled: true
  port: 9092
  bind_address: "0.0.0.0"
  tls:
    enabled: true
    cert_path: "/path/to/kafka.crt"
    key_path: "/path/to/kafka.key"
```

Environment Variables
```bash
export KAFKA_ENABLED=true
export KAFKA_PORT=9092
export KAFKA_BIND_ADDRESS="0.0.0.0"
```

Monitoring
Kafka-specific metrics available:
- `kafka_messages_received`: Total messages processed
- `kafka_bytes_processed`: Total bytes processed
- `kafka_schema_cache_hits`: Schema cache efficiency
- `kafka_conversion_duration_seconds`: Avro to Arrow conversion time
- `kafka_batch_size`: Average batch size
Limitations
Current limitations of the Kafka interface:
- No consumer support: Only producer protocol implemented
- Single partition: All data goes to partition 0
- No transactions: Transactional produces not supported
- Limited formats: Only Confluent Avro fully supported
- No compression: Kafka compression not yet implemented
- Read-only Schema Registry: Schemas created via DuckLake only
Error Handling
Common errors and solutions:
| Error | Cause | Solution |
|---|---|---|
| Schema not found | Schema ID not in registry | Create table in DuckLake first |
| Invalid magic byte | Unknown message format | Use Confluent Avro format |
| Topic not found | Topic doesn't exist | Create table via PostgreSQL interface |
| Authentication failed | Invalid token | Get new token from Web Auth GUI |
| Token expired | Token TTL exceeded | Generate fresh token |
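Most of these errors surface in the producer's delivery callback, so logging failed deliveries makes them easy to spot. A minimal confluent-kafka sketch (the payload here is a placeholder; in practice send the Avro-encoded value from the earlier producer example):

```python
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'my_events',
    'sasl.password': '<your_web_token>',
})

def on_delivery(err, msg):
    """Log failed deliveries so auth, topic, and format problems surface quickly."""
    if err is not None:
        print(f'delivery failed for topic {msg.topic()}: {err}')
    else:
        print(f'delivered to partition {msg.partition()} at offset {msg.offset()}')

# Placeholder payload -- in practice, send the Confluent-Avro-encoded value
producer.produce('my_events', value=b'\x00', on_delivery=on_delivery)
producer.flush()
```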
Migration from Kafka
To migrate from Apache Kafka to BoilStream:
- Create tables in DuckLake matching your Kafka topic schemas
- Get authentication tokens from the Web Auth GUI
- Update producer config:
  - Change `bootstrap.servers` to BoilStream
  - Change `schema.registry.url` to BoilStream's registry
  - Add SASL authentication (see the config sketch after this list)
- Test with small batch before full migration
- Monitor metrics during transition
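As a rough illustration of the "Update producer config" step, the settings that typically change are the broker address, the registry URL, and the SASL credentials; the values below are placeholders, and `schema.registry.url` may be configured on the serializer or registry client rather than the producer, depending on your client library:

```python
# Before: plain Apache Kafka + external Confluent Schema Registry
kafka_settings = {
    'bootstrap.servers': 'kafka-broker:9092',
    'schema.registry.url': 'https://schema-registry:8081',
}

# After: BoilStream's Kafka interface and built-in registry
boilstream_settings = {
    'bootstrap.servers': 'localhost:9092',                           # BoilStream
    'schema.registry.url': 'https://localhost:443/schema-registry',  # built-in registry
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'my_events',         # topic name
    'sasl.password': '<your_web_token>',  # token from the Web Auth GUI
}
```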
Best Practices
- Use Confluent Avro for best performance and schema management
- Create tables first in DuckLake before producing messages
- Enable batching in your producer for better throughput
- Reuse producer instances for connection pooling
- Monitor schema cache hit rates via metrics
- Implement token refresh before expiry in long-running producers
- Use TLS in production (`SASL_SSL` protocol)