# Kafka Interface
BoilStream provides a Kafka-compatible protocol server that allows Kafka producers to stream data directly into BoilStream topics.
## Overview
The Kafka interface enables:
- Drop-in replacement for Kafka in data ingestion pipelines
- Automatic format detection and conversion to Arrow
- Zero-copy processing with SIMD optimizations
- Schema Registry integration for Confluent Avro format
## Connection Details

```properties
# Kafka producer configuration
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

# For Confluent Schema Registry format
schema.registry.url=http://localhost:8081
```
## Supported Message Formats

### Fully Supported
- Confluent Avro: Binary Avro with Schema Registry integration
  - Magic byte: `0x00`
  - Schema ID embedded in message
  - Automatic schema resolution
  - Batch processing supported
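The Confluent wire format is simple to assemble by hand if your client library doesn't do it for you: a single `0x00` magic byte, the 4-byte big-endian schema ID, then the Avro-encoded body. A minimal Python sketch:

```python
import struct

def frame_confluent_avro(schema_id: int, avro_payload: bytes) -> bytes:
    """Wrap an Avro-encoded payload in the Confluent wire format:
    magic byte 0x00, 4-byte big-endian schema ID, then the body."""
    return b"\x00" + struct.pack(">I", schema_id) + avro_payload

# Example: frame a payload under schema ID 1
message = frame_confluent_avro(1, b"<avro-encoded bytes>")
```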
### Placeholder/Partial Support
- Confluent JSON: JSON with Schema Registry (in development)
- Raw JSON: Plain JSON messages
- Raw Text: Plain text messages
### Not Yet Supported
- Confluent Protobuf: Protobuf with Schema Registry
- Kafka RecordBatch V2: Native Kafka format
- Raw Binary: Arbitrary binary data
## Producer Examples

### Java with Confluent Avro

```java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");

Producer<String, GenericRecord> producer = new KafkaProducer<>(props);

// Parse the Avro schema (same Event schema as the other examples)
Schema schema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
    + "{\"name\":\"timestamp\",\"type\":\"long\"},"
    + "{\"name\":\"event\",\"type\":\"string\"},"
    + "{\"name\":\"user_id\",\"type\":\"int\"}]}");

// Create Avro record
GenericRecord record = new GenericData.Record(schema);
record.put("timestamp", System.currentTimeMillis());
record.put("event", "page_view");
record.put("user_id", 12345);

// Send to BoilStream
ProducerRecord<String, GenericRecord> producerRecord =
    new ProducerRecord<>("events", record);

producer.send(producerRecord, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.println("Sent to partition " + metadata.partition());
    }
});
```
### Python with confluent-kafka

```python
import time

from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# Schema Registry configuration
schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Avro schema
schema_str = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "timestamp", "type": "long"},
    {"name": "event", "type": "string"},
    {"name": "user_id", "type": "int"}
  ]
}
"""

# Create Avro serializer
avro_serializer = AvroSerializer(schema_registry_client, schema_str)

# Producer configuration
producer_conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(producer_conf)

# Send event
event = {
    'timestamp': int(time.time() * 1000),
    'event': 'page_view',
    'user_id': 12345,
}

producer.produce(
    topic='events',
    value=avro_serializer(event, SerializationContext('events', MessageField.VALUE)),
)
producer.flush()
```
### Node.js with KafkaJS

```javascript
const { Kafka } = require('kafkajs');
const avro = require('avsc');

// Define Avro schema
const type = avro.Type.forSchema({
  type: 'record',
  name: 'Event',
  fields: [
    { name: 'timestamp', type: 'long' },
    { name: 'event', type: 'string' },
    { name: 'user_id', type: 'int' }
  ]
});

// Create Kafka client
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const producer = kafka.producer();

async function produceEvents() {
  await producer.connect();

  // Create event
  const event = {
    timestamp: Date.now(),
    event: 'page_view',
    user_id: 12345
  };

  // Serialize to Avro
  const buffer = type.toBuffer(event);

  // Add Confluent wire format header (if using Schema Registry)
  const message = Buffer.concat([
    Buffer.from([0x00]),                   // Magic byte
    Buffer.from([0x00, 0x00, 0x00, 0x01]), // Schema ID 1 as big-endian 32-bit int (example)
    buffer
  ]);

  await producer.send({
    topic: 'events',
    messages: [{ value: message }]
  });

  await producer.disconnect();
}

produceEvents().catch(console.error);
```
## Schema Management

### Automatic Schema Detection
BoilStream automatically detects the message format:
- Checks for the Confluent magic byte (`0x00`)
- Extracts the schema ID from bytes 1-4
- Resolves the schema from cache or registry
- Converts Avro to Arrow format
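The detection logic is internal to BoilStream, but conceptually the header check amounts to something like the following sketch (not BoilStream's actual code):

```python
import struct

def parse_confluent_header(message: bytes):
    """Return (schema_id, avro_payload) if the message uses the
    Confluent wire format, or None so other format checks can run."""
    if len(message) >= 5 and message[0] == 0x00:
        (schema_id,) = struct.unpack(">I", message[1:5])
        return schema_id, message[5:]
    return None  # not Confluent-framed; fall through to JSON/text detection
```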
### Schema Evolution
- Forward compatibility: Add optional fields
- Backward compatibility: Remove optional fields
- Full compatibility: Both forward and backward
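For example, adding an optional field with a default to the `Event` schema used in the examples above is a compatible change; consumers on the old schema simply skip the new field, and new consumers fill in the default when reading old data. (The `session_id` field here is illustrative.)

```json
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "timestamp", "type": "long"},
    {"name": "event", "type": "string"},
    {"name": "user_id", "type": "int"},
    {"name": "session_id", "type": ["null", "string"], "default": null}
  ]
}
```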
## Performance Optimization

### SIMD Acceleration
The Kafka interface uses SIMD instructions for:
- Avro to Arrow conversion
- Schema validation
- Data type conversions
- Memory operations
### Batching
Messages are automatically batched for efficiency:
- Batch size: Up to 10,000 records
- Reduces channel overhead by ~94%
- Improves throughput significantly
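Server-side batching is automatic; on the producer side, the standard Kafka batching settings control how records are grouped before they are sent. The values below are starting points, not BoilStream requirements:

```properties
# Standard Kafka producer batching settings:
# accumulate up to 64 KiB per batch, waiting up to 10 ms to fill it
batch.size=65536
linger.ms=10
```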
### Zero-Copy Processing
- Direct memory mapping from Kafka buffers
- No intermediate serialization
- Efficient buffer pooling
## Configuration

### Enable Kafka Interface

```yaml
kafka:
  enabled: true
  port: 9092
  bind_address: "0.0.0.0"
```
### Environment Variables

```bash
export KAFKA_ENABLED=true
export KAFKA_PORT=9092
export KAFKA_BIND_ADDRESS="0.0.0.0"
```
## Monitoring
Kafka-specific metrics available:
- `kafka_messages_received`: Total messages processed
- `kafka_bytes_processed`: Total bytes processed
- `kafka_schema_cache_hits`: Schema cache efficiency
- `kafka_conversion_duration_seconds`: Avro to Arrow conversion time
- `kafka_batch_size`: Average batch size
## Limitations
Current limitations of the Kafka interface:
- No consumer support: Only producer protocol implemented
- Single partition: All data goes to partition 0
- No transactions: Transactional produces not supported
- Limited formats: Only Confluent Avro fully supported
- No compression: Kafka compression not yet implemented
## Error Handling
Common errors and solutions:
| Error | Cause | Solution |
|---|---|---|
| Schema not found | Schema ID not in cache | Ensure schema is registered |
| Invalid magic byte | Unknown message format | Use supported format |
| Topic not found | Topic doesn't exist in BoilStream | Create topic first |
| Rate limit exceeded | Too many requests | Implement client-side throttling |
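On the client, surface these errors through the producer's delivery callback rather than sending fire-and-forget. A sketch with confluent-kafka:

```python
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def on_delivery(err, msg):
    # Invoked from poll()/flush() once the broker responds
    if err is not None:
        # e.g. unknown topic, schema problems, rate limiting
        print(f"Delivery failed for {msg.topic()}: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}]")

producer.produce('events', value=b'...', on_delivery=on_delivery)
producer.flush()
```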
## Migration from Kafka
To migrate from Apache Kafka to BoilStream:
- Update broker address to point to BoilStream
- Keep serializers unchanged (especially for Avro)
- Create topics in BoilStream matching Kafka topics
- Test with small batch before full migration
- Monitor metrics during transition
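In most cases the only producer change is the broker address; `boilstream-host` below is a placeholder for your deployment:

```properties
# Before: Apache Kafka
# bootstrap.servers=kafka-broker:9092

# After: BoilStream (serializers and everything else unchanged)
bootstrap.servers=boilstream-host:9092
```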
## Best Practices
- Use Confluent Avro for best performance
- Enable batching in your producer
- Reuse producer instances for connection pooling
- Monitor schema cache hit rates
- Set appropriate buffer sizes for your message size