FlightRPC API
BoilStream implements the Apache Arrow Flight RPC protocol for high-performance data ingestion.
Overview
FlightRPC provides a binary protocol optimized for transferring Arrow columnar data:
- Data Ingestion: Stream Arrow RecordBatches into DuckLake tables
- Schema Management: Automatic schema validation and evolution
- Zero-Copy: Direct memory transfer with Arrow IPC format
- Authentication: Bearer tokens issued through the Web Auth GUI
Connection
DuckDB Airport Extension
```sql
-- Install and load the Airport extension
INSTALL airport FROM community;
LOAD airport;

-- Connect to BoilStream without TLS
ATTACH 'my_ducklake' (TYPE AIRPORT, location 'grpc://localhost:50051/');

-- Or connect with TLS (use one ATTACH or the other, not both)
-- ATTACH 'my_ducklake' (TYPE AIRPORT, location 'grpc+tls://localhost:50051/');

-- Create a table and insert data
CREATE TABLE my_ducklake.main.events (
    event_id BIGINT,
    event_type VARCHAR,
    timestamp TIMESTAMP
);
INSERT INTO my_ducklake.main.events VALUES (1, 'click', NOW());
```
Protocol Details
| Setting | Value |
|---|---|
| Port | 50051 (default) |
| Protocol | gRPC with HTTP/2 |
| Encoding | Arrow IPC format |
| TLS | Optional (set FLIGHT_TLS_ENABLED=true) |
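On the client side, the only thing that changes between a plaintext and a TLS deployment is the URI scheme, as the two ATTACH statements show. A minimal sketch, assuming a hypothetical helper `flight_location` (not part of BoilStream or PyArrow) that picks the scheme from the settings in the table above:

```python
def flight_location(host: str = "localhost", port: int = 50051, tls: bool = False) -> str:
    """Build a Flight URI: grpc+tls:// when the server runs with FLIGHT_TLS_ENABLED=true, else grpc://."""
    scheme = "grpc+tls" if tls else "grpc"
    return f"{scheme}://{host}:{port}"
```

For example, `flight.FlightClient(flight_location(tls=True))` connects over TLS. PyArrow also offers `flight.Location.for_grpc_tcp(host, port)` and `flight.Location.for_grpc_tls(host, port)` if you prefer Location objects over strings.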
Authentication
When authentication is enabled, pass a Bearer token from the Web Auth GUI:
```python
import pyarrow.flight as flight

# Bootstrap token copied from the Web Auth GUI
token = "your_bootstrap_token"

client = flight.FlightClient("grpc://localhost:50051")
options = flight.FlightCallOptions(headers=[(b"authorization", f"Bearer {token}".encode())])

# Pass options to every Flight call
descriptor = flight.FlightDescriptor.for_path("my_ducklake", "main", "events")
info = client.get_flight_info(descriptor, options)
```
Token Types
Use Bootstrap tokens from the Web Auth GUI for FlightRPC authentication. These tokens are intended for access from the DuckDB Airport extension and other programmatic clients.
Data Ingestion
do_exchange Method
Stream data using Arrow Flight's bidirectional exchange:
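```python
import pyarrow as pa
import pyarrow.flight as flight

client = flight.FlightClient("grpc://localhost:50051")

# Build an Arrow table whose columns match my_ducklake.main.events
table = pa.table({
    'event_id': pa.array([1, 2, 3], type=pa.int64()),
    'event_type': ['click', 'view', 'purchase'],
    # Microsecond timestamps starting at 2024-01-01 00:00:00 UTC
    'timestamp': pa.array([1704067200000000, 1704067201000000, 1704067202000000],
                          type=pa.timestamp('us')),
})

# Target table as catalog / schema / table
descriptor = flight.FlightDescriptor.for_path("my_ducklake", "main", "events")
# When authentication is enabled, pass FlightCallOptions as the second argument
writer, reader = client.do_exchange(descriptor)
writer.begin(table.schema)  # send the schema before any data
writer.write_table(table)
writer.done_writing()       # signal the end of the client stream
writer.close()
```
Schema Validation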
- The first write establishes the table schema
- Subsequent writes must match that schema; mismatches fail with `INVALID_ARGUMENT`
- Schema evolution is done with `ALTER TABLE` through the PostgreSQL interface
Code Examples
Java
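```java
import org.apache.arrow.flight.*;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;

// FlightClient needs an allocator for Arrow buffers
BufferAllocator allocator = new RootAllocator();
FlightClient client = FlightClient.builder(allocator, Location.forGrpcInsecure("localhost", 50051))
    .build();

FlightDescriptor descriptor = FlightDescriptor.path("my_ducklake", "main", "events");
// ExchangeReaderWriter is AutoCloseable; close() may throw Exception
try (FlightClient.ExchangeReaderWriter exchange = client.doExchange(descriptor)) {
    VectorSchemaRoot root = /* your Arrow data */;
    exchange.getWriter().start(root);  // send the schema
    exchange.getWriter().putNext();    // send the current contents of root
    exchange.getWriter().completed();  // signal the end of the client stream
}
```
Rust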
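```rust
use arrow_array::RecordBatch;
use arrow_flight::{encode::FlightDataEncoderBuilder, FlightClient, FlightDescriptor};
use futures::{stream, TryStreamExt};
use tonic::transport::Channel;

// Inside an async fn returning Result<_, Box<dyn std::error::Error>>; schema and columns built elsewhere
let channel = Channel::from_static("http://localhost:50051").connect().await?;
let mut client = FlightClient::new(channel);
let descriptor = FlightDescriptor::new_path(vec!["my_ducklake".into(), "main".into(), "events".into()]);
let batch = RecordBatch::try_new(schema, columns)?;
// Encode the batch as FlightData tagged with the target descriptor, send it, and drain the response
let input = FlightDataEncoderBuilder::new()
    .with_flight_descriptor(Some(descriptor))
    .build(stream::iter(vec![Ok(batch)]));
let _response: Vec<RecordBatch> = client.do_exchange(input).await?.try_collect().await?;
```
Performance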
| Metric | Value |
|---|---|
| Single connection | 500+ MB/s |
| Multiple connections | 3+ GB/s per node |
| Optimal batch size | 10,000-100,000 rows |
Best Practices
- Reuse `FlightClient` connections
- Batch data into RecordBatches of 10,000-100,000 rows
- Use HTTP/2 multiplexing for concurrent streams
- Enable TLS in production
Error Handling
| Code | Description | Resolution |
|---|---|---|
| UNAVAILABLE | Service unavailable | Retry with exponential backoff |
| INVALID_ARGUMENT | Schema mismatch | Check schema compatibility |
| UNAUTHENTICATED | Missing/invalid token | Get fresh token from Web Auth GUI |
| RESOURCE_EXHAUSTED | Rate limit exceeded | Reduce request rate |
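For `UNAVAILABLE`, the table recommends retrying with exponential backoff. A minimal, library-agnostic sketch: `with_backoff` is a hypothetical helper, and the delays are illustrative defaults. With PyArrow, pass `(flight.FlightUnavailableError,)` as `retry_on`, for example `with_backoff(lambda: client.get_flight_info(descriptor, options), (flight.FlightUnavailableError,))`. Errors such as a schema mismatch or a missing token are not retried, because they will not succeed on a second attempt.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def with_backoff(
    call: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    max_attempts: int = 5,
    base_delay: float = 0.1,
    max_delay: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `call()`, retrying `retry_on` errors with capped exponential backoff and full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(max_attempts):
        try:
            return call()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: re-raise the last error
            # Ceiling doubles per attempt (0.1, 0.2, 0.4, ... s) and is capped at max_delay
            ceiling = min(max_delay, base_delay * 2 ** attempt)
            sleep(random.uniform(0, ceiling))  # full jitter spreads out concurrent retries
    raise RuntimeError("unreachable")
```

Be careful when wrapping a whole ingest call: if a stream failed partway through, retrying it may write some rows twice.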
Monitoring
Prometheus metrics are exposed on port 8081:
- `flight_requests_total`: total Flight requests
- `flight_bytes_received`: bytes received
- `flight_duration_seconds`: request duration histogram
- `flight_active_connections`: current connections
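To spot-check these metrics without running a Prometheus server, you can fetch the exposition text and pick out the `flight_` series. A minimal stdlib sketch: the port comes from above, but the `/metrics` path is an assumption based on the usual Prometheus convention. `parse_flight_metrics` handles only simple `name{labels} value [timestamp]` lines, not the full exposition format.

```python
import urllib.request
from typing import Dict


def parse_flight_metrics(text: str, prefix: str = "flight_") -> Dict[str, float]:
    """Map each series whose name starts with `prefix` (name plus label set, as written) to its value."""
    samples: Dict[str, float] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line.startswith(prefix):
            continue  # skips # HELP / # TYPE comments and unrelated metrics
        if "{" in line:
            end = line.rindex("}") + 1  # label values may contain spaces
            series, rest = line[:end], line[end:]
        else:
            series, _, rest = line.partition(" ")
        samples[series] = float(rest.split()[0])  # ignore an optional trailing timestamp
    return samples


def fetch_flight_metrics(host: str = "localhost", port: int = 8081,
                         path: str = "/metrics", timeout: float = 5.0) -> Dict[str, float]:
    """Scrape the metrics endpoint (the /metrics path is an assumption) and parse it."""
    url = f"http://{host}:{port}{path}"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return parse_flight_metrics(resp.read().decode("utf-8"))
```

The histogram appears as separate `flight_duration_seconds_bucket`, `_sum`, and `_count` series. To compute a mean request duration, divide `_sum` by `_count`.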