SSE Consumer SDK
The @boilstream/consumer JavaScript SDK provides a high-level API for consuming real-time Arrow data from BoilStream's SSE streaming interface.
Open Source
The @boilstream/consumer SDK is open source on GitHub: dforsber/boilstream-consumer-js
Overview
- Connects to BoilStream's SSE endpoint and decodes Arrow IPC events automatically
- Works in browsers (native `EventSource`) and Node.js (with the `eventsource` polyfill)
- Automatic reconnection with `Last-Event-ID` for reliable delivery
- Uses flechette for fast Arrow IPC decoding
Installation
Once published, install via npm:
```bash
npm install @boilstream/consumer
```

The SDK depends on:
- `@uwdata/flechette` (Arrow IPC decoding)
- `eventsource` (Node.js only; browsers have native `EventSource`)
Quick Start
```javascript
import { BoilStreamConsumer } from "@boilstream/consumer";

const consumer = new BoilStreamConsumer("https://your-server:8443/stream/YOUR_TOKEN", {
  onSchema: (schema) => {
    console.log("Schema fields:", schema.fields.map((f) => f.name));
  },
  onBatch: (table, batchSeq) => {
    console.log(`Batch ${batchSeq}: ${table.numRows} rows`);
    // Access columns by name (flechette Table.getChild)
    const col = table.getChild("my_column");
  },
  onHeartbeat: () => {
    console.log("Connection alive");
  },
  onError: (err) => {
    console.error("Error:", err);
  },
});

// Later: disconnect
consumer.close();
```

API Reference
new BoilStreamConsumer(url, options)
Creates a consumer and immediately connects to the SSE endpoint.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `url` | `string` | SSE endpoint URL (`https://host:8443/stream/{token}`) |
| `options.onSchema` | `Function` | Called with a flechette `Schema` on schema events |
| `options.onBatch` | `Function` | Called with `(Table, batchSeq)` on batch events |
| `options.onHeartbeat` | `Function` | Called on heartbeat events |
| `options.onError` | `Function` | Called with an error on connection or decode failures |
| `options.EventSource` | class | `EventSource` implementation (required in Node.js) |
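Each `onBatch` call delivers one flechette `Table`, so an application that wants a running view across batches has to accumulate rows itself. A minimal sketch, assuming flechette's `Table.toArray()` (which returns one plain object per row); `createRowBuffer` is a hypothetical helper, not part of the SDK:

```javascript
// Hypothetical helper (not part of @boilstream/consumer): keeps the most
// recent `maxRows` rows seen across all batches, oldest first.
function createRowBuffer(maxRows) {
  let rows = [];
  return {
    // Pass as the onBatch option; it relies on closures, not `this`,
    // so it can be handed to the consumer detached from the object.
    onBatch(table) {
      // flechette's Table.toArray() materializes each row as a plain object.
      rows = rows.concat(table.toArray());
      if (rows.length > maxRows) {
        // Drop the oldest rows so memory use stays bounded.
        rows = rows.slice(rows.length - maxRows);
      }
    },
    // Returns a copy of the buffered rows.
    rows() {
      return rows.slice();
    },
  };
}
```

For example, create `const buffer = createRowBuffer(1000)` and pass `{ onBatch: buffer.onBatch }` to the constructor. Materializing row objects is convenient for display; for large batches, reading columns with `table.getChild(name)` avoids allocating one object per row.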
consumer.schema
Returns the current schema (flechette Schema), or null until the first schema event.
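Because `schema` is `null` until the first schema event, and a fresh schema event arrives on every reconnect, code that depends on particular columns can re-check them each time `onSchema` fires. A sketch with a hypothetical `missingFields` helper, using only the `schema.fields[].name` access already shown in the Quick Start:

```javascript
// Hypothetical helper (not part of the SDK): returns the names in `required`
// that are absent from a flechette Schema. If no schema has arrived yet
// (consumer.schema is still null), every required name counts as missing.
function missingFields(schema, required) {
  if (schema == null) return required.slice();
  const present = new Set(schema.fields.map((f) => f.name));
  return required.filter((name) => !present.has(name));
}
```

For instance, inside `onSchema`: `const missing = missingFields(schema, ["ts", "value"]); if (missing.length) console.warn("Missing columns:", missing);` (the column names are placeholders).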
consumer.close()
Disconnects from the SSE stream. After calling close(), the consumer will not reconnect.
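Since `close()` is final, it pairs naturally with the standard `AbortController`/`AbortSignal` pattern for tying a consumer's lifetime to something else, such as a timeout or a shutdown hook. A sketch; `closeOnAbort` is hypothetical and works with any object that has a `close()` method:

```javascript
// Hypothetical helper (not part of the SDK): calls consumer.close() once
// when `signal` aborts, or immediately if it has already aborted.
function closeOnAbort(consumer, signal) {
  if (signal.aborted) {
    consumer.close();
    return;
  }
  // { once: true } removes the listener after it fires.
  signal.addEventListener("abort", () => consumer.close(), { once: true });
}
```

For example, `closeOnAbort(consumer, AbortSignal.timeout(60_000))` stops the stream after one minute (`AbortSignal.timeout` is available in current browsers and recent Node.js versions).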
Browser Usage
Browsers have native EventSource support, so no polyfill is needed:
```html
<pre id="schema"></pre>
<pre id="rows"></pre>

<script type="module">
  // Bare specifiers like this one resolve only via a bundler or an import map.
  import { BoilStreamConsumer } from "@boilstream/consumer";

  const consumer = new BoilStreamConsumer(
    "https://your-server:8443/stream/YOUR_TOKEN",
    {
      onSchema: (schema) => {
        document.getElementById("schema").textContent =
          JSON.stringify(schema.fields.map((f) => f.name));
      },
      onBatch: (table, batchSeq) => {
        document.getElementById("rows").textContent =
          `Batch ${batchSeq}: ${table.numRows} rows`;
      },
      onError: (err) => console.error(err),
    }
  );
</script>
```

Node.js Usage
In Node.js, pass the eventsource package as the EventSource implementation:
```javascript
import { BoilStreamConsumer } from "@boilstream/consumer";
import { EventSource } from "eventsource";

const consumer = new BoilStreamConsumer("https://your-server:8443/stream/YOUR_TOKEN", {
  onSchema: (schema) => console.log("Schema:", schema),
  onBatch: (table, batchSeq) => console.log(`Batch ${batchSeq}: ${table.numRows} rows`),
  onError: (err) => console.error(err),
  EventSource, // Required in Node.js
});
```

TIP
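For Node.js with self-signed certificates, you can set `NODE_TLS_REJECT_UNAUTHORIZED=0` in your environment. This disables TLS certificate verification for the whole process, so reserve it for local development; pointing `NODE_EXTRA_CA_CERTS` at the self-signed CA certificate (a PEM file) is a narrower alternative.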
Event Flow
```text
Connect to SSE endpoint
    |
    v
[schema event] --> onSchema(schema)
    |
    v
[batch event] --> onBatch(table, batchSeq)
    |
    v
[heartbeat] --> onHeartbeat()
    |
    v
[connection drop]
    |
    v
EventSource auto-reconnects with Last-Event-ID
    |
    v
[schema event] --> onSchema(schema)                      (fresh schema on reconnect)
    |
    v
[catchup batch events] --> onBatch(table, batchSeq)      (missed batches)
    |
    v
[live batch events] --> onBatch(table, batchSeq)         (new data)
```

Arrow Decoding
The SDK uses flechette for Arrow IPC decoding. Each SSE event's data field contains base64-encoded Arrow IPC stream bytes. The SDK:
- Decodes base64 to `Uint8Array` (using `Buffer` in Node.js or `atob` in browsers)
- Passes the bytes to `tableFromIPC()` from flechette
- Returns a flechette `Table` with typed column access
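The base64 step in that list can be sketched as follows. This illustrates the described behavior and is not the SDK's source; `base64ToBytes` is a hypothetical name:

```javascript
// Sketch (not the SDK's actual code): converts an SSE event's base64 `data`
// field into the raw Arrow IPC stream bytes.
function base64ToBytes(b64) {
  if (typeof Buffer !== "undefined") {
    // Node.js: Buffer decodes base64 natively. Copying into a plain
    // Uint8Array avoids sharing memory with Node's internal Buffer pool.
    return new Uint8Array(Buffer.from(b64, "base64"));
  }
  // Browsers: atob returns a "binary string" with one character per byte.
  const binary = atob(b64);
  const bytes = new Uint8Array(binary.length);
  for (let i = 0; i < binary.length; i++) {
    bytes[i] = binary.charCodeAt(i);
  }
  return bytes;
}
```

The resulting bytes are what flechette's `tableFromIPC()` accepts, e.g. `tableFromIPC(base64ToBytes(event.data))`.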
Reconnection Behavior
Reconnection is handled entirely by the `EventSource` implementation (native in browsers, the `eventsource` package in Node.js):
- On connection drop, `EventSource` automatically reconnects
- The `Last-Event-ID` header is sent with the last received batch sequence number
- BoilStream replays missed batches, then resumes live streaming
- A fresh `schema` event is always sent before catchup batches
No client-side reconnection logic is needed.
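Reconnection itself needs no code, but an application that updates external state from `onBatch` may still want that update to be idempotent. A minimal sketch, assuming `batchSeq` increases monotonically within a stream (the protocol reference would confirm the exact guarantee); `createIdempotentHandlers` and its `applyBatch` callback are hypothetical:

```javascript
// Hypothetical helper (not part of the SDK). Assumes batchSeq increases
// monotonically, so a batch at or below the last applied sequence number
// has already been applied and can be skipped.
function createIdempotentHandlers(applyBatch) {
  let lastSeq = -Infinity;
  let schema = null;
  return {
    onSchema(nextSchema) {
      // A fresh schema arrives before catchup batches on every reconnect.
      schema = nextSchema;
    },
    onBatch(table, batchSeq) {
      // Number() normalizes the sequence value (its exact type is an assumption).
      const seq = Number(batchSeq);
      if (seq <= lastSeq) return false; // already applied: skip
      lastSeq = seq;
      applyBatch(table, seq, schema);
      return true;
    },
    lastSeq() {
      return lastSeq;
    },
  };
}
```

Pass only the callbacks to the consumer, e.g. `{ onSchema: handlers.onSchema, onBatch: handlers.onBatch }`; both rely on closures rather than `this`, so they work detached.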
See Also
- SSE Consumer API Reference — Protocol details and error codes
- HTTP Ingestion API — Sending data to BoilStream