FlightRPC API

BoilStream implements the Apache Arrow Flight RPC protocol for high-performance data ingestion.

Overview

FlightRPC provides a binary protocol optimized for transferring Arrow columnar data:

  • Data Ingestion: Stream Arrow RecordBatches into DuckLake tables
  • Schema Management: Automatic schema validation and evolution
  • Zero-Copy: Direct memory transfer with Arrow IPC format
  • Authentication: Bearer token via Web Auth GUI

Connection

DuckDB Airport Extension

sql
-- Install and load Airport extension
INSTALL airport FROM community;
LOAD airport;

-- Connect to BoilStream (without TLS)
ATTACH 'my_ducklake' (TYPE AIRPORT, location 'grpc://localhost:50051/');

-- Or connect with TLS (use this instead of the plain gRPC ATTACH above)
ATTACH 'my_ducklake' (TYPE AIRPORT, location 'grpc+tls://localhost:50051/');

-- Create table and insert data
CREATE TABLE my_ducklake.main.events (
    event_id BIGINT,
    event_type VARCHAR,
    timestamp TIMESTAMP
);

INSERT INTO my_ducklake.main.events VALUES (1, 'click', NOW());

Protocol Details

| Setting  | Value                                   |
|----------|-----------------------------------------|
| Port     | 50051 (default)                         |
| Protocol | gRPC with HTTP/2                        |
| Encoding | Arrow IPC format                        |
| TLS      | Optional (set FLIGHT_TLS_ENABLED=true)  |
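
The settings above determine the location URI a client connects to. A minimal sketch, assuming the client knows out of band whether the server runs with TLS; `flight_location` is a hypothetical helper, not part of BoilStream or pyarrow:

```python
def flight_location(host: str = "localhost", port: int = 50051, tls: bool = False) -> str:
    """Build a Flight location URI: grpc+tls:// when TLS is on, grpc:// otherwise."""
    if not 0 < port < 65536:
        raise ValueError(f"port out of range: {port}")
    scheme = "grpc+tls" if tls else "grpc"
    return f"{scheme}://{host}:{port}"
```

The returned string can be passed to `pyarrow.flight.FlightClient(...)` or, with a trailing slash, used as the `location` in the Airport ATTACH statement.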

Authentication

When authentication is enabled, pass a Bearer token from the Web Auth GUI:

python
import pyarrow.flight as flight

# Get bootstrap token from Web Auth GUI
token = "your_bootstrap_token"

client = flight.FlightClient("grpc://localhost:50051")
options = flight.FlightCallOptions(headers=[(b"authorization", f"Bearer {token}".encode())])

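# Describe the target table, then pass the options with each Flight call
descriptor = flight.FlightDescriptor.for_path("my_ducklake", "main", "events")
info = client.get_flight_info(descriptor, options)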

Token Types

Use Bootstrap tokens from the Web Auth GUI for FlightRPC authentication. These tokens are designed for the DuckDB extension and for programmatic access.
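
To avoid repeating the header construction at every call site, the token handling can be wrapped once. This is a sketch only: `bearer_headers` and `call_options` are hypothetical helper names, and where the token is stored is left to the caller.

```python
def bearer_headers(token: str) -> list[tuple[bytes, bytes]]:
    """Return gRPC metadata carrying the token as an 'authorization: Bearer' header."""
    token = token.strip()
    if not token:
        raise ValueError("empty token")
    return [(b"authorization", f"Bearer {token}".encode("utf-8"))]


def call_options(token: str):
    """Wrap the headers in FlightCallOptions (pyarrow is imported lazily)."""
    import pyarrow.flight as flight
    return flight.FlightCallOptions(headers=bearer_headers(token))
```

The object returned by `call_options(token)` goes in the `options` argument of Flight calls such as `get_flight_info`.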

Data Ingestion

do_exchange Method

Stream data using Arrow Flight's bidirectional exchange:

python
import pyarrow as pa
import pyarrow.flight as flight

client = flight.FlightClient("grpc://localhost:50051")

# Create Arrow table
table = pa.table({
    'event_id': [1, 2, 3],
    'event_type': ['click', 'view', 'purchase'],
    # Microseconds since the Unix epoch (2024-01-01 00:00:00 UTC onward)
    'timestamp': pa.array([1704067200000000, 1704067201000000, 1704067202000000], type=pa.timestamp('us'))
})

# Stream to the target table via do_exchange
descriptor = flight.FlightDescriptor.for_path("my_ducklake", "main", "events")
writer, reader = client.do_exchange(descriptor)
writer.begin(table.schema)  # the schema must be sent before any data
writer.write_table(table)
writer.done_writing()       # signal the end of the client stream
writer.close()

Schema Validation

  • First write establishes the table schema
  • Subsequent writes must match the schema
  • Schema evolution is done with ALTER TABLE through the PostgreSQL interface
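
Because later writes must match the established schema, it can help to compare schemas on the client before streaming. The sketch below is an assumption about what "match" means (same field names, order, and types); the server's exact rules may differ. `schema_mismatches` is a hypothetical helper that accepts any iterable of objects with `.name` and `.type`, which includes `pyarrow.Schema`.

```python
from collections import namedtuple


def schema_mismatches(expected, actual):
    """Compare two schemas field by field and return a list of readable problems."""
    expected, actual = list(expected), list(actual)
    problems = []
    if len(expected) != len(actual):
        problems.append(f"field count: expected {len(expected)}, got {len(actual)}")
    for i, (e, a) in enumerate(zip(expected, actual)):
        if e.name != a.name:
            problems.append(f"field {i}: expected name {e.name!r}, got {a.name!r}")
        elif e.type != a.type:
            problems.append(f"field {e.name!r}: expected type {e.type}, got {a.type}")
    return problems


# Stand-in field type for illustration; with pyarrow, pass table.schema directly.
Field = namedtuple("Field", ["name", "type"])
established = [Field("event_id", "int64"), Field("event_type", "string")]
incoming = [Field("event_id", "int64"), Field("event_type", "int32")]
print(schema_mismatches(established, incoming))
```

An empty list means the two schemas agree under these assumptions; anything else is worth fixing before the write is rejected with INVALID_ARGUMENT.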

Code Examples

Java

java
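import org.apache.arrow.flight.*;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;

// FlightClient needs an allocator for Arrow buffers
BufferAllocator allocator = new RootAllocator();
FlightClient client = FlightClient.builder()
    .allocator(allocator)
    .location(Location.forGrpcInsecure("localhost", 50051))
    .build();

FlightDescriptor descriptor = FlightDescriptor.path("my_ducklake", "main", "events");
FlightClient.ExchangeReaderWriter exchange = client.doExchange(descriptor);

VectorSchemaRoot root = /* your Arrow data */;
exchange.getWriter().start(root);   // send the schema
exchange.getWriter().putNext();     // send the current contents of root
exchange.getWriter().completed();   // signal end of stream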

Rust

rust
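use arrow_array::RecordBatch;
use arrow_flight::{encode::FlightDataEncoderBuilder, FlightClient, FlightDescriptor};
use futures::{stream, TryStreamExt};
use tonic::transport::Channel;

let channel = Channel::from_static("http://localhost:50051").connect().await?;
let mut client = FlightClient::new(channel);
let path = ["my_ducklake", "main", "events"].map(String::from).to_vec();
let descriptor = FlightDescriptor::new_path(path);

// Encode the batch as FlightData; the descriptor travels with the first message
let batch = RecordBatch::try_new(schema, columns)?;
let flight_data = FlightDataEncoderBuilder::new()
    .with_flight_descriptor(Some(descriptor))
    .build(stream::iter([Ok(batch)]));
let mut responses = client.do_exchange(flight_data).await?;
while responses.try_next().await?.is_some() {} // drain server responses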

Performance

| Metric               | Value                |
|----------------------|----------------------|
| Single connection    | 500+ MB/s            |
| Multiple connections | 3+ GB/s per node     |
| Optimal batch size   | 10,000-100,000 rows  |

Best Practices

  • Reuse FlightClient connections
  • Batch data into 10k-100k row RecordBatches
  • Use HTTP/2 multiplexing for concurrent streams
  • Enable TLS in production
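
The batching advice above can be sketched as a small generator that splits a row count into slices within the recommended range. `batch_ranges` is a hypothetical helper, and the 50,000-row default is simply an assumed midpoint of the 10k-100k range, not a tuned value.

```python
def batch_ranges(num_rows: int, batch_rows: int = 50_000):
    """Yield (offset, length) pairs covering num_rows in chunks of at most batch_rows."""
    if batch_rows <= 0:
        raise ValueError("batch_rows must be positive")
    for offset in range(0, num_rows, batch_rows):
        yield offset, min(batch_rows, num_rows - offset)
```

With pyarrow, each pair maps onto `table.slice(offset, length)`; `Table.to_batches(max_chunksize=...)` gives a similar result without a helper.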

Error Handling

| Code               | Description           | Resolution                        |
|--------------------|-----------------------|-----------------------------------|
| UNAVAILABLE        | Service unavailable   | Retry with exponential backoff    |
| INVALID_ARGUMENT   | Schema mismatch       | Check schema compatibility        |
| UNAUTHENTICATED    | Missing/invalid token | Get fresh token from Web Auth GUI |
| RESOURCE_EXHAUSTED | Rate limit exceeded   | Reduce request rate               |
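
For UNAVAILABLE, the retry with exponential backoff might look like the sketch below. `retry_with_backoff` is a hypothetical helper; the attempt count, delays, and jitter are assumptions to adjust, not BoilStream recommendations.

```python
import random
import time


def retry_with_backoff(fn, retry_on, attempts=5, base_delay=0.5, max_delay=30.0, sleep=time.sleep):
    """Call fn(); on an exception in retry_on, wait and retry with exponential backoff.

    The delay before retry n (0-based) is min(max_delay, base_delay * 2**n),
    scaled by random jitter in [0.5, 1.0]. The last failure is re-raised.
    """
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(delay * random.uniform(0.5, 1.0))
```

With pyarrow, pass `pyarrow.flight.FlightUnavailableError` as `retry_on` and wrap the Flight call in a function or lambda. Errors such as INVALID_ARGUMENT or UNAUTHENTICATED should not be retried this way, since repeating the same request will not fix them.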

Monitoring

Prometheus metrics are exposed on port 8081:

  • flight_requests_total - Total Flight requests
  • flight_bytes_received - Bytes received
  • flight_duration_seconds - Request duration histogram
  • flight_active_connections - Current connections
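
A quick way to inspect these metrics is to parse the Prometheus text exposition format. The sketch below assumes samples carry no trailing timestamp; `parse_metrics` is a hypothetical helper, and the sample text is made up for illustration rather than captured from a real server.

```python
def parse_metrics(text: str, prefix: str = "flight_") -> dict[str, float]:
    """Parse Prometheus text lines into {series: value} for names starting with prefix."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):  # skip blanks and HELP/TYPE comments
            continue
        # Split on the last space so label values containing spaces stay intact
        series, _, value = line.rpartition(" ")
        if series.startswith(prefix):
            values[series] = float(value)
    return values


# Illustrative input only; real output depends on the server.
SAMPLE = """\
# HELP flight_requests_total Total Flight requests
# TYPE flight_requests_total counter
flight_requests_total 42
flight_active_connections 3
process_cpu_seconds_total 1.5
"""
print(parse_metrics(SAMPLE))
```

The text itself could be fetched with `urllib.request.urlopen` from port 8081; the conventional `/metrics` path is an assumption here, since the page does not state the endpoint path.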