FlightRPC API

BoilStream implements the Apache Arrow Flight RPC protocol for high-performance data ingestion and retrieval.

Overview

FlightRPC provides a binary protocol optimized for transferring Arrow columnar data. BoilStream supports:

  • Data Ingestion: Stream Arrow RecordBatches directly into topics
  • FlightSQL: Query interface for BI tools (implemented)
  • Schema Management: Automatic schema validation and evolution
  • Authentication: JWT-based authentication via Admin API

Connection

DuckDB Airport Extension

sql
-- Install and load Airport extension
INSTALL airport FROM community;
LOAD airport;

-- Connect to BoilStream
ATTACH 'boilstream' AS bs (
    TYPE AIRPORT,
    location 'grpc://localhost:50051/'
);

-- Stream data to BoilStream
COPY (SELECT * FROM my_table) 
TO 'bs://topic_name';

Protocol Details

  • Port: 50051 (default)
  • Protocol: gRPC with HTTP/2
  • Encoding: Arrow IPC format
  • TLS: Optional (recommended for production)

Authentication

Authentication uses JWT tokens obtained from the Admin API:

bash
# Get authentication token
curl -X POST http://localhost:50052/auth \
  -H "Content-Type: application/json" \
  -d '{"username": "user", "password": "pass"}'

# Use token in Flight calls
# The token is passed in the Flight ticket

Data Ingestion

do_exchange Method

The primary ingestion method for streaming data:

python
# Python example using pyarrow
import pyarrow as pa
import pyarrow.flight as flight

# Connect to BoilStream
client = flight.FlightClient("grpc://localhost:50051")

# Create Arrow table
table = pa.table({
    'id': [1, 2, 3],
    'value': ['a', 'b', 'c']
})

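# Stream to topic
writer, reader = client.do_exchange(
    flight.FlightDescriptor.for_path("topic_name")
)
writer.begin(table.schema)  # the schema must be sent before any data
writer.write_table(table)
writer.done_writing()       # signal that no more batches will follow
writer.close()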

Schema Validation

  • Schemas are validated on first write to a topic
  • Subsequent writes must match the established schema
  • Schema evolution supported through versioning
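
The matching rule above can also be checked on the client before a write, which tends to give a clearer message than a rejected stream. The following is a minimal sketch, not BoilStream's own validation logic. It assumes schemas are represented as ordered lists of (name, type) pairs, and `schema_mismatches` is a hypothetical helper name.

```python
def schema_mismatches(established, candidate):
    """Return human-readable differences between two schemas.

    Each schema is an ordered list of (column_name, type_name) pairs.
    An empty list means the candidate matches the established schema.
    """
    problems = []
    established_types = dict(established)
    candidate_types = dict(candidate)

    # Columns the topic expects but the candidate lacks or types differently
    for name, type_name in established:
        if name not in candidate_types:
            problems.append(f"missing column: {name}")
        elif candidate_types[name] != type_name:
            problems.append(
                f"type mismatch for {name}: "
                f"expected {type_name}, got {candidate_types[name]}"
            )

    # Columns the candidate carries that the topic does not know about
    for name, _ in candidate:
        if name not in established_types:
            problems.append(f"unexpected column: {name}")

    # Same columns and types, but in a different order
    if not problems and [n for n, _ in established] != [n for n, _ in candidate]:
        problems.append("column order differs")
    return problems
```

With pyarrow, the pairs for a table can be derived as `[(f.name, str(f.type)) for f in table.schema]`.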

FlightSQL Interface

BoilStream implements FlightSQL for SQL query execution:

sql
-- Query via FlightSQL
SELECT * FROM topic_name 
WHERE timestamp > now() - interval '1 hour';

Supported Operations

  • SELECT: Full SQL query support via DuckDB
  • INSERT: Stream data into topics
  • Schema Discovery: List topics and schemas
  • Metadata: Topic statistics and information

Performance Considerations

Batching

  • Optimal batch size: 10,000-100,000 rows
  • Use Arrow RecordBatch format for best performance
  • Enable compression for network transfer
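
To keep each write inside the recommended range, a large table can be split into fixed-size slices before streaming. The sketch below only computes the slice boundaries. `batch_ranges` is a hypothetical helper, and the default of 50,000 rows is an arbitrary value picked from within the range above.

```python
def batch_ranges(total_rows, batch_size=50_000):
    """Yield (offset, length) pairs that cover total_rows in order."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for offset in range(0, total_rows, batch_size):
        # The final slice may be shorter than batch_size
        yield offset, min(batch_size, total_rows - offset)
```

With pyarrow, each pair maps onto `table.slice(offset, length)`. Alternatively, `table.to_batches(max_chunksize=50_000)` yields RecordBatches capped at that size directly.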

Connection Pooling

  • Reuse FlightClient connections
  • Use HTTP/2 multiplexing for concurrent streams
  • Configure keep-alive for long-lived connections
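
One way to reuse connections is to cache a single client per server location and hand the same object to every caller. This is a minimal sketch under that assumption. `ClientCache` is a hypothetical name, and the factory is passed in so the cache does not depend on any particular client library.

```python
import threading


class ClientCache:
    """Create one client per location and return it on every later call."""

    def __init__(self, factory):
        self._factory = factory      # callable: location -> client
        self._clients = {}
        self._lock = threading.Lock()

    def get(self, location):
        # The lock keeps two threads from creating duplicate clients
        with self._lock:
            if location not in self._clients:
                self._clients[location] = self._factory(location)
            return self._clients[location]
```

With pyarrow, this could be used as `cache = ClientCache(flight.FlightClient)` followed by `cache.get("grpc://localhost:50051")` wherever a client is needed.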

Throughput

  • Single connection: 500 MB/s+
  • Multiple connections: 3 GB/s+ per node
  • Horizontal scaling: Add more BoilStream nodes
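
For rough capacity planning, the per-node figure above can be turned into a node count. The sketch below treats 3 GB/s per node as a planning assumption rather than a guarantee, and `nodes_needed` is a hypothetical helper.

```python
import math


def nodes_needed(target_gb_per_s, per_node_gb_per_s=3.0):
    """Estimate how many nodes are required to sustain a target ingest rate."""
    if target_gb_per_s <= 0:
        return 0
    if per_node_gb_per_s <= 0:
        raise ValueError("per_node_gb_per_s must be positive")
    # Round up: a partially used node is still a whole node
    return math.ceil(target_gb_per_s / per_node_gb_per_s)
```

Real throughput depends on schema width, compression, and network conditions, so the result is a starting point to validate with a load test.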

Error Handling

Common error codes and their meanings:

Code | Description | Resolution
UNAVAILABLE | Service temporarily unavailable | Retry with backoff
INVALID_ARGUMENT | Schema mismatch or invalid data | Check schema compatibility
UNAUTHENTICATED | Missing or invalid token | Refresh authentication token
RESOURCE_EXHAUSTED | Rate limit exceeded | Implement client-side throttling
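
The "retry with backoff" resolution for UNAVAILABLE can be sketched as exponential backoff with jitter. This is an illustrative pattern, not part of BoilStream. `retry_with_backoff` and its parameters are hypothetical, and the caller decides which exception types count as retryable.

```python
import random
import time


def retry_with_backoff(call, retryable, max_attempts=5,
                       base_delay=0.1, max_delay=5.0, sleep=time.sleep):
    """Run call(), retrying on the given exception types with growing delays."""
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except retryable:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Double the delay on each attempt, capped at max_delay
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Jitter spreads out retries from many clients
            sleep(delay * random.uniform(0.5, 1.0))
```

With pyarrow, a write could be wrapped as `retry_with_backoff(send, (flight.FlightUnavailableError,))`, where `send` is a zero-argument function that performs the exchange. Errors such as schema mismatches are not retryable and should propagate immediately.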

Code Examples

Java (with Arrow Flight)

java
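// The client needs an allocator for Arrow buffers
BufferAllocator allocator = new RootAllocator();
FlightClient client = FlightClient.builder()
    .allocator(allocator)
    .location(Location.forGrpcInsecure("localhost", 50051))
    .build();

// Stream data
FlightDescriptor descriptor = FlightDescriptor.path("topic_name");
FlightClient.ExchangeReaderWriter exchange = client.doExchange(descriptor);

// Write batches
VectorSchemaRoot root = /* your data */;
exchange.getWriter().start(root);  // send the schema first
exchange.getWriter().putNext();    // send the current contents of root
exchange.getWriter().completed();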

Rust (with arrow-flight)

rust
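use arrow_flight::{encode::FlightDataEncoderBuilder, FlightClient, FlightDescriptor};
use futures::stream;
use tonic::transport::Channel;

// FlightClient wraps a tonic channel (arrow-flight mid-level client API)
let channel = Channel::from_static("http://localhost:50051").connect().await?;
let mut client = FlightClient::new(channel);

// Prepare data
let batch = RecordBatch::try_new(schema, columns)?;

// Encode the batch as FlightData and tag the stream with the topic path
let descriptor = FlightDescriptor::new_path(vec!["topic_name".to_string()]);
let flight_data = FlightDataEncoderBuilder::new()
    .with_flight_descriptor(Some(descriptor))
    .build(stream::iter(vec![Ok(batch)]));

// Stream to BoilStream
let responses = client.do_exchange(flight_data).await?;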

Monitoring

Monitor FlightRPC performance via Prometheus metrics:

  • flight_requests_total: Total number of Flight requests
  • flight_bytes_received: Bytes received via Flight
  • flight_duration_seconds: Request duration histogram
  • flight_active_connections: Current active connections
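
For ad hoc checks without a full Prometheus setup, the text exposition format can be parsed directly. The sketch below sums all samples of one metric across label sets. The metrics endpoint URL and the label names are not specified here, so the function takes the scraped text as input, and `sum_metric` is a hypothetical helper.

```python
def sum_metric(exposition_text, metric_name):
    """Sum every sample of metric_name in Prometheus text exposition output."""
    total = 0.0
    for line in exposition_text.splitlines():
        line = line.strip()
        # Skip blank lines and "# HELP" / "# TYPE" comments
        if not line or line.startswith("#"):
            continue
        # Sample lines look like: name{labels} value [timestamp]
        if "{" in line:
            name = line[: line.index("{")]
            rest = line[line.rindex("}") + 1:]
        else:
            name, _, rest = line.partition(" ")
        if name != metric_name:
            continue
        fields = rest.split()
        if fields:
            total += float(fields[0])
    return total
```

Histogram metrics such as flight_duration_seconds are exposed as separate `_bucket`, `_sum`, and `_count` series, so they need to be queried under those suffixed names.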