Kafka Interface

BoilStream provides a Kafka-compatible protocol server that allows Kafka producers to stream data directly into BoilStream topics.

Overview

The Kafka interface enables:

  • Drop-in replacement for Kafka in data ingestion pipelines
  • Automatic format detection and conversion to Arrow
  • Zero-copy processing with SIMD optimizations
  • Schema Registry integration for Confluent Avro format

Connection Details

properties
# Kafka producer configuration
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

# For Confluent Schema Registry format
schema.registry.url=http://localhost:8081

Supported Message Formats

Fully Supported

  • Confluent Avro: Binary Avro with Schema Registry integration
    • Magic byte: 0x00
    • Schema ID embedded in message
    • Automatic schema resolution
    • Batch processing supported

Placeholder/Partial Support

  • Confluent JSON: JSON with Schema Registry (in development)
  • Raw JSON: Plain JSON messages
  • Raw Text: Plain text messages

Not Yet Supported

  • Confluent Protobuf: Protobuf with Schema Registry
  • Kafka RecordBatch V2: Native Kafka format
  • Raw Binary: Arbitrary binary data

Producer Examples

Java with Confluent Avro

java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");

Producer<String, GenericRecord> producer = new KafkaProducer<>(props);

// Parse the Avro schema for the value
Schema schema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
    + "{\"name\":\"timestamp\",\"type\":\"long\"},"
    + "{\"name\":\"event\",\"type\":\"string\"},"
    + "{\"name\":\"user_id\",\"type\":\"int\"}]}");

// Create Avro record
GenericRecord record = new GenericData.Record(schema);
record.put("timestamp", System.currentTimeMillis());
record.put("event", "page_view");
record.put("user_id", 12345);

// Send to BoilStream
ProducerRecord<String, GenericRecord> producerRecord = 
    new ProducerRecord<>("events", record);
    
producer.send(producerRecord, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.println("Sent to partition " + metadata.partition());
    }
});

// Flush outstanding sends and release resources before shutdown
producer.flush();
producer.close();

Python with confluent-kafka

python
import time

from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# Schema Registry configuration
schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Avro schema
schema_str = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "timestamp", "type": "long"},
    {"name": "event", "type": "string"},
    {"name": "user_id", "type": "int"}
  ]
}
"""

# Create Avro serializer
avro_serializer = AvroSerializer(
    schema_registry_client,
    schema_str
)

# Producer configuration
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
}

producer = Producer(producer_conf)

# Send event
event = {
    'timestamp': int(time.time() * 1000),
    'event': 'page_view',
    'user_id': 12345
}

producer.produce(
    topic='events',
    value=avro_serializer(
        event, 
        SerializationContext('events', MessageField.VALUE)
    )
)

producer.flush()

Node.js with Kafka.js

javascript
const { Kafka } = require('kafkajs');
const avro = require('avsc');

// Define Avro schema
const type = avro.Type.forSchema({
  type: 'record',
  name: 'Event',
  fields: [
    { name: 'timestamp', type: 'long' },
    { name: 'event', type: 'string' },
    { name: 'user_id', type: 'int' }
  ]
});

// Create Kafka client
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const producer = kafka.producer();

async function produceEvents() {
  await producer.connect();
  
  // Create event
  const event = {
    timestamp: Date.now(),
    event: 'page_view',
    user_id: 12345
  };
  
  // Serialize to Avro
  const buffer = type.toBuffer(event);
  
  // Add Confluent wire format header (if using Schema Registry)
  const message = Buffer.concat([
    Buffer.from([0x00]),                    // Confluent magic byte
    Buffer.from([0x00, 0x00, 0x00, 0x01]),  // 4-byte big-endian schema ID (use the ID assigned by Schema Registry)
    buffer
  ]);
  
  await producer.send({
    topic: 'events',
    messages: [
      { value: message }
    ]
  });
  
  await producer.disconnect();
}

produceEvents().catch(console.error);

Schema Management

Automatic Schema Detection

BoilStream automatically detects the message format (see the parsing sketch after this list):

  1. Checks for Confluent magic byte (0x00)
  2. Extracts schema ID from bytes 1-4
  3. Resolves schema from cache or registry
  4. Converts Avro to Arrow format
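
For illustration, here is a minimal sketch of the wire-format parsing in steps 1 and 2; the function name and the fallback behaviour are assumptions for this example, not BoilStream internals:

python
import struct

CONFLUENT_MAGIC_BYTE = 0x00

def parse_confluent_frame(payload: bytes):
    """Split a Confluent-framed message into (schema_id, avro_bytes)."""
    if len(payload) < 5 or payload[0] != CONFLUENT_MAGIC_BYTE:
        # Not Confluent-framed: fall back to raw handling
        return None, payload
    # Bytes 1-4 hold the schema ID as a big-endian unsigned 32-bit integer
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id, payload[5:]

schema_id, avro_bytes = parse_confluent_frame(b"\x00\x00\x00\x00\x01" + b"...")
print(schema_id)  # 1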

Schema Evolution

  • Backward compatibility: consumers on the new schema can read data written with the old schema (delete fields, add optional fields)
  • Forward compatibility: data written with the new schema can be read with the old schema (add fields, delete optional fields)
  • Full compatibility: both forward and backward; an example of a backward-compatible change follows this list
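
As a concrete illustration, the Event schema from the producer examples could gain an optional field with a null default, which is a backward-compatible change. The session_id field and the subject name below are illustrative:

python
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})

# Evolved Event schema: adds an optional "session_id" field with a default,
# so data written with the old schema can still be read.
evolved_schema_str = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "timestamp", "type": "long"},
    {"name": "event", "type": "string"},
    {"name": "user_id", "type": "int"},
    {"name": "session_id", "type": ["null", "string"], "default": null}
  ]
}
"""

# Register under the same subject so the registry can enforce compatibility
schema_id = schema_registry_client.register_schema(
    'events-value', Schema(evolved_schema_str, 'AVRO'))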

Performance Optimization

SIMD Acceleration

The Kafka interface uses SIMD instructions for:

  • Avro to Arrow conversion
  • Schema validation
  • Data type conversions
  • Memory operations

Batching

Messages are automatically batched for efficiency:

  • Batch size: Up to 10,000 records
  • Reduces channel overhead by ~94%
  • Improves throughput significantly

Zero-Copy Processing

  • Direct memory mapping from Kafka buffers
  • No intermediate serialization
  • Efficient buffer pooling

Configuration

Enable Kafka Interface

yaml
kafka:
  enabled: true
  port: 9092
  bind_address: "0.0.0.0"

Environment Variables

bash
export KAFKA_ENABLED=true
export KAFKA_PORT=9092
export KAFKA_BIND_ADDRESS="0.0.0.0"

Monitoring

Kafka-specific metrics available (see the example after this list):

  • kafka_messages_received: Total messages processed
  • kafka_bytes_processed: Total bytes processed
  • kafka_schema_cache_hits: Schema cache efficiency
  • kafka_conversion_duration_seconds: Avro to Arrow conversion time
  • kafka_batch_size: Average batch size
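
If your deployment exposes these counters over a Prometheus-style metrics endpoint (the URL below is an assumption; adjust it to your setup), they can be polled with a few lines of Python:

python
import urllib.request

# Hypothetical metrics endpoint; replace with your BoilStream metrics URL
METRICS_URL = "http://localhost:9090/metrics"

with urllib.request.urlopen(METRICS_URL) as resp:
    text = resp.read().decode("utf-8")

# Print only the Kafka interface metrics listed above
for line in text.splitlines():
    if line.startswith("kafka_"):
        print(line)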

Limitations

Current limitations of the Kafka interface:

  1. No consumer support: Only producer protocol implemented
  2. Single partition: All data goes to partition 0
  3. No transactions: Transactional produces not supported
  4. Limited formats: Only Confluent Avro fully supported
  5. No compression: Kafka compression not yet implemented

Error Handling

Common errors and solutions (a delivery-callback example follows the table):

| Error | Cause | Solution |
| --- | --- | --- |
| Schema not found | Schema ID not in cache | Ensure schema is registered |
| Invalid magic byte | Unknown message format | Use supported format |
| Topic not found | Topic doesn't exist in BoilStream | Create topic first |
| Rate limit exceeded | Too many requests | Implement client-side throttling |
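
On the client side, these errors surface through the producer's delivery callback. A minimal confluent-kafka sketch (topic name and payload are placeholders; in practice send the Avro-serialized value from the earlier example):

python
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    """Called once per message to report delivery success or failure."""
    if err is not None:
        # e.g. schema not found, unknown topic, or rate limiting
        print(f"Delivery failed for {msg.topic()}: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}")

producer.produce('events', value=b'...', on_delivery=delivery_report)
producer.flush()  # wait for outstanding deliveries and trigger callbacks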

Migration from Kafka

To migrate from Apache Kafka to BoilStream:

  1. Update broker address to point to BoilStream (see the sketch after these steps)
  2. Keep serializers unchanged (especially for Avro)
  3. Create topics in BoilStream matching Kafka topics
  4. Test with small batch before full migration
  5. Monitor metrics during transition
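
In client code the migration is typically just a broker-address change; the addresses below are illustrative:

python
# Before: producer pointed at an Apache Kafka cluster
kafka_conf = {'bootstrap.servers': 'kafka-1:9092,kafka-2:9092'}

# After: only the broker address changes; serializers and topic names stay the same
boilstream_conf = {'bootstrap.servers': 'localhost:9092'}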

Best Practices

  1. Use Confluent Avro for best performance
  2. Enable batching in your producer (see the tuning example after this list)
  3. Reuse producer instances for connection pooling
  4. Monitor schema cache hit rates
  5. Set appropriate buffer sizes for your message size
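
As a starting point, batching and buffering can be tuned in a confluent-kafka producer via librdkafka settings; the values below are illustrative, not BoilStream recommendations:

python
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    # Batching: wait up to 50 ms to fill larger batches before sending
    'linger.ms': 50,
    'batch.num.messages': 10000,
    # Buffering: cap the number and size of messages queued in the client
    'queue.buffering.max.messages': 100000,
    'queue.buffering.max.kbytes': 1048576,
})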