FlightRPC API
BoilStream implements the Apache Arrow Flight RPC protocol for high-performance data ingestion and retrieval.
Overview
FlightRPC provides a binary protocol optimized for transferring Arrow columnar data. BoilStream supports:
- Data Ingestion: Stream Arrow RecordBatches directly into topics
- FlightSQL: Query interface for BI tools (implemented)
- Schema Management: Automatic schema validation and evolution
- Authentication: JWT-based authentication via Admin API
Connection
DuckDB Airport Extension
sql
-- Install and load Airport extension
INSTALL airport FROM community;
LOAD airport;
-- Connect to BoilStream
ATTACH 'boilstream' AS bs (
TYPE AIRPORT,
location 'grpc://localhost:50051/'
);
-- Stream data to BoilStream
COPY (SELECT * FROM my_table)
TO 'bs://topic_name';
Protocol Details
- Port: 50051 (default)
- Protocol: gRPC with HTTP/2
- Encoding: Arrow IPC format
- TLS: Optional (recommended for production)
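As a small illustration of these settings, the sketch below builds a Flight location URI for a plaintext or TLS connection. The helper name `flight_location` and the example hostname are hypothetical. The `grpc://` and `grpc+tls://` schemes are the ones Arrow Flight clients such as pyarrow accept; whether a given BoilStream deployment has TLS enabled is an assumption to check against its configuration.

```python
DEFAULT_FLIGHT_PORT = 50051  # default port listed above


def flight_location(host: str, port: int = DEFAULT_FLIGHT_PORT, tls: bool = False) -> str:
    """Return an Arrow Flight location URI (hypothetical helper)."""
    scheme = "grpc+tls" if tls else "grpc"
    return f"{scheme}://{host}:{port}"


# Plaintext for local development, TLS for production (hostname is a placeholder)
print(flight_location("localhost"))
print(flight_location("boilstream.example.com", tls=True))
```

The resulting string can be passed to a Flight client constructor, for example `flight.FlightClient(flight_location("localhost"))` in pyarrow.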
Authentication
Authentication uses JWT tokens obtained from the Admin API:
bash
# Get authentication token
curl -X POST http://localhost:50052/auth \
-H "Content-Type: application/json" \
-d '{"username": "user", "password": "pass"}'
# Use token in Flight calls
# The token is passed in the Flight ticket
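The same token request can be prepared from Python using only the standard library. This is a minimal sketch: `build_auth_request` is a hypothetical helper, the URL and credentials mirror the curl example, and the shape of the response body is not documented here, so the code stops short of parsing it.

```python
import json
import urllib.request


def build_auth_request(base_url: str, username: str, password: str) -> urllib.request.Request:
    """Build the POST /auth request shown in the curl example (hypothetical helper)."""
    body = json.dumps({"username": username, "password": password}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/auth",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_auth_request("http://localhost:50052", "user", "pass")

# Sending the request needs a running Admin API:
# with urllib.request.urlopen(req) as resp:
#     payload = json.load(resp)  # response fields are an assumption; inspect them first
```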
Data Ingestion
do_exchange Method
The primary ingestion method for streaming data:
python
# Python example using pyarrow
import pyarrow as pa
import pyarrow.flight as flight
# Connect to BoilStream
client = flight.FlightClient("grpc://localhost:50051")
# Create Arrow table
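table = pa.table({
    'id': [1, 2, 3],
    'value': ['a', 'b', 'c']
})
# Stream to topic
writer, reader = client.do_exchange(
    flight.FlightDescriptor.for_path("topic_name")
)
writer.begin(table.schema)  # the schema must be sent before any batches
writer.write_table(table)
writer.done_writing()  # signal that no more data will be sent
writer.close()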
Schema Validation
- Schemas are validated on first write to a topic
- Subsequent writes must match the established schema
- Schema evolution supported through versioning
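The first two rules can be modeled client-side to catch mismatches before data is sent. The sketch below is a toy model, not BoilStream's implementation: `TopicSchemas` and `SchemaMismatch` are hypothetical names, schemas are represented as ordered `(field_name, type_name)` pairs, and schema evolution through versioning is not modeled.

```python
class SchemaMismatch(ValueError):
    """Raised when a write does not match the topic's established schema."""


class TopicSchemas:
    """Toy registry: the first write to a topic fixes its schema."""

    def __init__(self):
        self._schemas = {}

    def validate(self, topic, schema):
        # schema is an ordered list of (field_name, type_name) pairs
        fields = tuple(schema)
        # setdefault stores the schema on first write, else returns the stored one
        established = self._schemas.setdefault(topic, fields)
        if established != fields:
            raise SchemaMismatch(f"topic {topic!r} expects {established}, got {fields}")


registry = TopicSchemas()
registry.validate("topic_name", [("id", "int64"), ("value", "string")])  # establishes schema
registry.validate("topic_name", [("id", "int64"), ("value", "string")])  # matches: accepted

try:
    registry.validate("topic_name", [("id", "string")])
    rejected = False
except SchemaMismatch:
    rejected = True  # mismatching write is refused
```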
FlightSQL Interface
BoilStream implements FlightSQL for SQL query execution:
sql
-- Query via FlightSQL
SELECT * FROM topic_name
WHERE timestamp > now() - interval '1 hour';
Supported Operations
- SELECT: Full SQL query support via DuckDB
- INSERT: Stream data into topics
- Schema Discovery: List topics and schemas
- Metadata: Topic statistics and information
Performance Considerations
Batching
- Optimal batch size: 10,000-100,000 rows
- Use Arrow RecordBatch format for best performance
- Enable compression for network transfer
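To keep batches inside the suggested range, rows can be sliced before they are written. The following is a minimal standard-library sketch; `chunk_rows` is a hypothetical helper and the default of 50,000 rows is simply a value picked from within the range above.

```python
from typing import Iterator, List, Sequence


def chunk_rows(rows: Sequence, batch_size: int = 50_000) -> Iterator[List]:
    """Yield consecutive slices of at most batch_size rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        yield list(rows[start:start + batch_size])


sizes = [len(chunk) for chunk in chunk_rows(range(120_000))]
print(sizes)  # [50000, 50000, 20000]
```

When the data is already a pyarrow table, `Table.to_batches(max_chunksize=...)` performs comparable slicing directly on Arrow data.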
Connection Pooling
- Reuse FlightClient connections
- Use HTTP/2 multiplexing for concurrent streams
- Configure keep-alive for long-lived connections
Throughput
- Single connection: 500 MB/s+
- Multiple connections: 3 GB/s+ per node
- Horizontal scaling: Add more BoilStream nodes
Error Handling
Common error codes and their meanings:
| Code | Description | Resolution |
|---|---|---|
| UNAVAILABLE | Service temporarily unavailable | Retry with backoff |
| INVALID_ARGUMENT | Schema mismatch or invalid data | Check schema compatibility |
| UNAUTHENTICATED | Missing or invalid token | Refresh authentication token |
| RESOURCE_EXHAUSTED | Rate limit exceeded | Implement client-side throttling |
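One way to act on the table is a retry wrapper with exponential backoff and jitter. The sketch below assumes a hypothetical `FlightCallError` that carries the status code name; real clients surface these codes through their own exception types. Treating `RESOURCE_EXHAUSTED` as retryable is one simple form of client-side throttling and is a design assumption here, while `INVALID_ARGUMENT` and `UNAUTHENTICATED` are re-raised so the caller can fix the schema or refresh the token.

```python
import random
import time

RETRYABLE = {"UNAVAILABLE", "RESOURCE_EXHAUSTED"}


class FlightCallError(Exception):
    """Hypothetical error wrapper carrying a gRPC status code name."""

    def __init__(self, code):
        super().__init__(code)
        self.code = code


def call_with_backoff(call, max_attempts=5, base_delay=0.1, sleep=time.sleep):
    """Run call(), retrying retryable codes with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return call()
        except FlightCallError as err:
            # Give up on non-retryable codes or when attempts are exhausted
            if err.code not in RETRYABLE or attempt == max_attempts - 1:
                raise
            # Delay doubles each attempt; jitter spreads out concurrent clients
            sleep(base_delay * (2 ** attempt) * (1 + random.random()))
```

The `sleep` parameter is injectable so the wrapper can be exercised in tests without real delays.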
Code Examples
Java (with Arrow Flight)
java
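import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.Location;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
// The client needs a memory allocator as well as a location
BufferAllocator allocator = new RootAllocator();
FlightClient client = FlightClient.builder()
    .allocator(allocator)
    .location(Location.forGrpcInsecure("localhost", 50051))
    .build();
// Stream data
FlightDescriptor descriptor = FlightDescriptor.path("topic_name");
FlightClient.ExchangeReaderWriter exchange = client.doExchange(descriptor);
// Write batches
VectorSchemaRoot root = /* your data */;
exchange.getWriter().start(root);  // send the schema first
exchange.getWriter().putNext();    // send the rows currently loaded in root
exchange.getWriter().completed();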
Rust (with arrow-flight)
rust
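use arrow_flight::{encode::FlightDataEncoderBuilder, FlightClient, FlightDescriptor};
use tonic::transport::Channel;
// FlightClient wraps a tonic channel
let channel = Channel::from_static("http://localhost:50051").connect().await?;
let mut client = FlightClient::new(channel);
// Prepare data
let batch = RecordBatch::try_new(schema, columns)?;
// Encode the batch as FlightData addressed to the topic
let descriptor = FlightDescriptor::new_path(vec!["topic_name".to_string()]);
let flight_data = FlightDataEncoderBuilder::new()
    .with_flight_descriptor(Some(descriptor))
    .build(futures::stream::iter(vec![Ok(batch)]));
// Stream to BoilStream (the do_exchange signature varies across arrow-flight versions)
let mut responses = client.do_exchange(flight_data).await?;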
Monitoring
Monitor FlightRPC performance via Prometheus metrics:
- flight_requests_total: Total number of Flight requests
- flight_bytes_received: Bytes received via Flight
- flight_duration_seconds: Request duration histogram
- flight_active_connections: Current active connections
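For quick checks without a full Prometheus setup, the text exposition format can be filtered for the `flight_` metrics. This is a simplified sketch: `parse_flight_metrics` is a hypothetical helper, the sample values are invented purely for illustration, the scrape endpoint is not specified here, and lines with a trailing timestamp are not handled.

```python
def parse_flight_metrics(text):
    """Return {sample_name_with_labels: value} for flight_* samples."""
    metrics = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        # Split on the last space so label sets containing spaces stay intact
        name, _, value = line.rpartition(" ")
        if name.startswith("flight_"):
            metrics[name] = float(value)
    return metrics


# Illustrative scrape output; the numbers are made up
sample = """\
# TYPE flight_requests_total counter
flight_requests_total 42
# TYPE flight_active_connections gauge
flight_active_connections 3
process_cpu_seconds_total 1.5
"""
print(parse_flight_metrics(sample))
```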