Kafka Interface

BoilStream provides a Kafka-compatible protocol server that allows Kafka producers to stream data directly into BoilStream topics.

Overview

The Kafka interface enables:

  • Drop-in replacement for Kafka in data ingestion pipelines
  • Automatic format detection and conversion to Arrow
  • Zero-copy processing with SIMD optimizations
  • Built-in Schema Registry with Confluent-compatible API
  • SASL/PLAIN authentication using web tokens

Connection Details

properties
# Kafka producer configuration
bootstrap.servers=localhost:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<topic_name>" \
  password="<web_token>";

# Serializers
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

# BoilStream's built-in Schema Registry
schema.registry.url=https://localhost:443/schema-registry

Schema Registry URL

BoilStream includes a built-in Confluent-compatible Schema Registry at /schema-registry on the auth server (default port 443). See Schema Registry API for details.
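A quick way to confirm the registry is reachable is to hit its public GET endpoints directly. A minimal sketch in Python using requests; the CA bundle path is an assumption for a self-signed development certificate (point it at your real CA in production):

python
import requests

REGISTRY = "https://localhost:443/schema-registry"
CA_BUNDLE = "/path/to/ca.crt"  # assumption: CA for a self-signed dev certificate

# List all subjects (one {topic_name}-value subject per DuckLake table)
subjects = requests.get(f"{REGISTRY}/subjects", verify=CA_BUNDLE).json()
print(subjects)

# Fetch the latest Avro schema for one subject
latest = requests.get(
    f"{REGISTRY}/subjects/my_events-value/versions/latest",
    verify=CA_BUNDLE,
).json()
print(latest["id"], latest["schema"])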

Authentication

BoilStream's Kafka interface uses SASL/PLAIN authentication with web tokens.

Getting a Token

  1. Log in to the Web Auth GUI at https://localhost:443/auth
  2. Navigate to Topics and select your topic
  3. Generate an Ingest token for Kafka access

SASL Configuration

Parameter           Value
security.protocol   SASL_SSL
sasl.mechanism      PLAIN
username            Topic name (e.g., my_events)
password            Web token from dashboard

Token Expiry

Web tokens have a configurable TTL (default 8 hours). Generate new tokens before expiry or implement token refresh in your application.
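For long-running producers, a common pattern is to rebuild the client with a fresh token shortly before the TTL elapses. A minimal sketch with confluent-kafka; fetch_fresh_token() is a hypothetical helper standing in for however your deployment issues tokens (Web Auth GUI, secrets store, etc.), and the TTL values are placeholders to match to your configuration:

python
import time
from confluent_kafka import Producer

TOKEN_TTL_SECONDS = 8 * 3600      # default TTL; match your deployment
REFRESH_MARGIN_SECONDS = 15 * 60  # rebuild the producer well before expiry

def fetch_fresh_token() -> str:
    """Hypothetical helper: return a current ingest token for the topic."""
    raise NotImplementedError

def build_producer(token: str) -> Producer:
    return Producer({
        'bootstrap.servers': 'localhost:9092',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': 'my_events',
        'sasl.password': token,
    })

producer = build_producer(fetch_fresh_token())
token_acquired_at = time.time()

def get_producer() -> Producer:
    """Return a producer whose token is still valid, recreating it if needed."""
    global producer, token_acquired_at
    if time.time() - token_acquired_at > TOKEN_TTL_SECONDS - REFRESH_MARGIN_SECONDS:
        producer.flush()  # drain in-flight messages before swapping clients
        producer = build_producer(fetch_fresh_token())
        token_acquired_at = time.time()
    return producer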

Built-in Schema Registry

BoilStream includes a Confluent-compatible Schema Registry. Key characteristics:

  • Read-only: Schemas are created via DuckLake CREATE TABLE, not through the registry API
  • TopicNameStrategy: Subjects follow {topic_name}-value naming convention
  • Global Schema IDs: Full Confluent wire format support with global IDs
  • No authentication: GET endpoints are public for schema discovery

See Schema Registry API for complete documentation.

How Schemas are Created

  1. Create a table in DuckLake via PostgreSQL interface:

    sql
    CREATE TABLE my_events (
        event_id BIGINT,
        event_type VARCHAR,
        timestamp TIMESTAMP,
        payload JSON
    );
  2. BoilStream automatically:

    • Generates an Avro schema from the table definition
    • Registers it in the Schema Registry as my_events-value
    • Assigns a global schema ID
  3. Kafka producers can then discover the schema:

    bash
    curl https://localhost:443/schema-registry/subjects/my_events-value/versions/latest

Supported Message Formats

Fully Supported

  • Confluent Avro: Binary Avro with Schema Registry integration
    • Magic byte: 0x00
    • Global schema ID embedded in message (bytes 1-4)
    • Automatic schema resolution via built-in registry
    • Batch processing supported

Placeholder/Partial Support

  • Confluent JSON: JSON with Schema Registry (in development)
  • Raw JSON: Plain JSON messages
  • Raw Text: Plain text messages

Not Yet Supported

  • Confluent Protobuf: Protobuf with Schema Registry
  • Kafka RecordBatch V2: Native Kafka format
  • Raw Binary: Arbitrary binary data

Producer Examples

Java with Confluent Avro

java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"my_events\" password=\"<your_web_token>\";");

props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", KafkaAvroSerializer.class);
props.put("schema.registry.url", "https://localhost:443/schema-registry");

// For self-signed certificates in development
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "changeit");

Producer<String, GenericRecord> producer = new KafkaProducer<>(props);

// Create Avro record matching your DuckLake table schema.
// `schema` is the org.apache.avro.Schema for my_events, e.g. parsed from the JSON
// returned by /subjects/my_events-value/versions/latest.
GenericRecord record = new GenericData.Record(schema);
record.put("event_id", 12345L);
record.put("event_type", "page_view");
record.put("timestamp", System.currentTimeMillis() * 1000); // microseconds
record.put("payload", "{\"page\": \"/home\"}");

ProducerRecord<String, GenericRecord> producerRecord =
    new ProducerRecord<>("my_events", record);

producer.send(producerRecord, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.println("Sent to partition " + metadata.partition());
    }
});

Python with confluent-kafka

python
from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
import time

# Schema Registry configuration (BoilStream's built-in registry)
schema_registry_conf = {
    'url': 'https://localhost:443/schema-registry',
    # For self-signed certificates in development:
    # 'ssl.ca.location': '/path/to/ca.crt'
}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Get schema from registry (created via DuckLake CREATE TABLE)
# The schema is automatically available after table creation
subject = 'my_events-value'
schema_str = schema_registry_client.get_latest_version(subject).schema.schema_str

# Create Avro serializer
avro_serializer = AvroSerializer(
    schema_registry_client,
    schema_str
)

# Producer configuration with SASL authentication
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'my_events',  # Topic name
    'sasl.password': '<your_web_token>',  # Token from Web Auth GUI
    # For self-signed certificates:
    # 'ssl.ca.location': '/path/to/ca.crt'
}

producer = Producer(producer_conf)

# Send event
event = {
    'event_id': 12345,
    'event_type': 'page_view',
    'timestamp': int(time.time() * 1000000),  # microseconds
    'payload': '{"page": "/home"}'
}

producer.produce(
    topic='my_events',
    value=avro_serializer(
        event,
        SerializationContext('my_events', MessageField.VALUE)
    )
)

producer.flush()

Node.js with KafkaJS and Schema Registry

javascript
import { Kafka } from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import https from 'https';
import fs from 'fs';

// Read CA certificate for self-signed certs in development
const ca = fs.readFileSync('/path/to/ca.crt');

// Connect to BoilStream's built-in Schema Registry
const registry = new SchemaRegistry({
  host: 'https://localhost:443/schema-registry',
  agent: new https.Agent({ ca: [ca] })
});

// Create Kafka client with SASL authentication
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
  ssl: { ca: [ca] },
  sasl: {
    mechanism: 'plain',
    username: 'my_events',  // Topic name
    password: '<your_web_token>'  // Token from Web Auth GUI
  }
});

const producer = kafka.producer();

async function produceEvents() {
  await producer.connect();

  // Get schema ID from registry (schema created via DuckLake CREATE TABLE)
  const subject = 'my_events-value';
  const schemaId = await registry.getLatestSchemaId(subject);
  console.log(`Using schema ID: ${schemaId}`);

  // Create event matching your DuckLake table schema
  const event = {
    event_id: 12345,
    event_type: 'page_view',
    timestamp: Date.now() * 1000,  // microseconds
    payload: '{"page": "/home"}'
  };

  // Encode with Confluent wire format (adds magic byte + schema ID)
  const encodedValue = await registry.encode(schemaId, event);

  await producer.send({
    topic: 'my_events',
    messages: [{
      key: 'event-key',
      value: encodedValue
    }]
  });

  console.log('Message sent successfully');
  await producer.disconnect();
}

produceEvents().catch(console.error);

Schema Management

Automatic Schema Detection

BoilStream automatically detects the Confluent wire format:

  1. Checks for magic byte (0x00)
  2. Extracts global schema ID from bytes 1-4
  3. Resolves schema via built-in Schema Registry
  4. Converts Avro to Arrow format using SIMD
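The wire-format header is small enough to inspect by hand. A minimal sketch of the header layout described above (pure Python, no client libraries; the payload bytes are a placeholder):

python
import struct

def parse_confluent_header(message: bytes) -> tuple[int, bytes]:
    """Split a Confluent-framed message into (schema_id, avro_payload)."""
    if len(message) < 5 or message[0] != 0x00:
        raise ValueError("not Confluent wire format: missing 0x00 magic byte")
    # Bytes 1-4 hold the global schema ID as a big-endian unsigned 32-bit integer
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id, message[5:]

# Frame a payload the way KafkaAvroSerializer does, then parse it back
framed = b"\x00" + struct.pack(">I", 42) + b"<avro-encoded-bytes>"
schema_id, payload = parse_confluent_header(framed)
print(schema_id)  # 42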

Schema Evolution

Schema changes are made through DuckLake ALTER TABLE:

sql
-- Add a new optional column
ALTER TABLE my_events ADD COLUMN user_agent VARCHAR;

BoilStream registers the new schema version automatically. Compatibility modes:

  • Forward compatibility: Add optional fields (nullable or with defaults)
  • Backward compatibility: Remove optional fields
  • Full compatibility: Both forward and backward
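To confirm that a new version was registered after an ALTER TABLE, you can list the subject's versions. A minimal sketch using the same confluent-kafka registry client as the producer example above:

python
from confluent_kafka.schema_registry import SchemaRegistryClient

client = SchemaRegistryClient({'url': 'https://localhost:443/schema-registry'})

subject = 'my_events-value'
versions = client.get_versions(subject)      # e.g. [1, 2] after the ALTER TABLE
latest = client.get_latest_version(subject)
print(versions, latest.version, latest.schema_id)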

Schema Source of Truth

In BoilStream, DuckLake tables are the source of truth for schemas. The Schema Registry reflects table definitions - you cannot register schemas directly through the registry API.

Performance Optimization

SIMD Acceleration

The Kafka interface uses SIMD instructions for:

  • Avro to Arrow conversion
  • Schema validation
  • Data type conversions
  • Memory operations

Batching

Messages are automatically batched for efficiency:

  • Batch size: Up to 10,000 records
  • Reduces channel overhead by ~94%
  • Improves throughput significantly
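Server-side batching works best when the producer also sends reasonably sized batches. A minimal sketch of client-side batching settings with confluent-kafka; the values are illustrative starting points, not tuned recommendations:

python
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'my_events',
    'sasl.password': '<your_web_token>',
    # Client-side batching: wait a few milliseconds to accumulate larger batches
    'linger.ms': 20,
    'batch.num.messages': 10000,
})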

Zero-Copy Processing

  • Direct memory mapping from Kafka buffers
  • No intermediate serialization
  • Efficient buffer pooling

Configuration

Enable Kafka Interface

yaml
kafka:
  enabled: true
  port: 9092
  bind_address: "0.0.0.0"
  tls:
    enabled: true
    cert_path: "/path/to/kafka.crt"
    key_path: "/path/to/kafka.key"

Environment Variables

bash
export KAFKA_ENABLED=true
export KAFKA_PORT=9092
export KAFKA_BIND_ADDRESS="0.0.0.0"

Monitoring

Kafka-specific metrics available:

  • kafka_messages_received: Total messages processed
  • kafka_bytes_processed: Total bytes processed
  • kafka_schema_cache_hits: Schema cache efficiency
  • kafka_conversion_duration_seconds: Avro to Arrow conversion time
  • kafka_batch_size: Average batch size

Limitations

Current limitations of the Kafka interface:

  1. No consumer support: Only producer protocol implemented
  2. Single partition: All data goes to partition 0
  3. No transactions: Transactional produces not supported
  4. Limited formats: Only Confluent Avro fully supported
  5. No compression: Kafka compression not yet implemented
  6. Read-only Schema Registry: Schemas created via DuckLake only

Error Handling

Common errors and solutions:

Error                   Cause                        Solution
Schema not found        Schema ID not in registry    Create table in DuckLake first
Invalid magic byte      Unknown message format       Use Confluent Avro format
Topic not found         Topic doesn't exist          Create table via PostgreSQL interface
Authentication failed   Invalid token                Get new token from Web Auth GUI
Token expired           Token TTL exceeded           Generate fresh token
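Most of these errors surface through the producer's delivery report rather than as exceptions from produce(). A minimal sketch of a delivery callback with confluent-kafka (SASL/SSL settings omitted for brevity; use the same configuration as the producer example above):

python
from confluent_kafka import Producer

def on_delivery(err, msg):
    """Called from poll()/flush() with the broker's verdict for each message."""
    if err is not None:
        # e.g. topic/schema not found, authentication failure, expired token
        print(f"Delivery failed for {msg.topic()}: {err}")
    else:
        print(f"Delivered to {msg.topic()} partition {msg.partition()}")

producer = Producer({'bootstrap.servers': 'localhost:9092'})  # plus SASL/SSL settings
producer.produce('my_events', value=b'...', on_delivery=on_delivery)
producer.flush()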

Migration from Kafka

To migrate from Apache Kafka to BoilStream:

  1. Create tables in DuckLake matching your Kafka topic schemas
  2. Get authentication tokens from the Web Auth GUI
  3. Update producer config:
    • Change bootstrap.servers to BoilStream
    • Change schema.registry.url to BoilStream's registry
    • Add SASL authentication
  4. Test with small batch before full migration
  5. Monitor metrics during transition
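In practice, step 3 usually amounts to changing three groups of settings. A hedged before/after sketch in Python; the hostnames and token placeholder are examples, not defaults, and in confluent-kafka the schema.registry.url is passed to a SchemaRegistryClient rather than the Producer (shown together here only for comparison):

python
# Before: plain Apache Kafka
kafka_conf = {
    'bootstrap.servers': 'kafka-broker-1:9092',
    'schema.registry.url': 'https://schema-registry.internal:8081',
}

# After: BoilStream's Kafka interface
boilstream_conf = {
    'bootstrap.servers': 'localhost:9092',                           # BoilStream Kafka port
    'schema.registry.url': 'https://localhost:443/schema-registry',  # built-in registry
    'security.protocol': 'SASL_SSL',                                 # added SASL authentication
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'my_events',
    'sasl.password': '<your_web_token>',
}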

Best Practices

  1. Use Confluent Avro for best performance and schema management
  2. Create tables first in DuckLake before producing messages
  3. Enable batching in your producer for better throughput
  4. Reuse producer instances for connection pooling
  5. Monitor schema cache hit rates via metrics
  6. Implement token refresh before expiry in long-running producers
  7. Use TLS in production (SASL_SSL protocol)