Skip to content

SSE Consumer SDK

The @boilstream/consumer JavaScript SDK provides a high-level API for consuming real-time Arrow data from BoilStream's SSE streaming interface.

Open Source

The @boilstream/consumer SDK is open source on GitHub: dforsber/boilstream-consumer-js

Overview

  • Connects to BoilStream's SSE endpoint and decodes Arrow IPC events automatically
  • Works in browsers (native EventSource) and Node.js (with eventsource polyfill)
  • Automatic reconnection with Last-Event-ID for reliable delivery
  • Uses flechette for fast Arrow IPC decoding

Installation

Once published, install via npm:

bash
npm install @boilstream/consumer

The SDK depends on:

  • @uwdata/flechette (Arrow IPC decoding)
  • eventsource (Node.js only — browsers have native EventSource)

Quick Start

javascript
import { BoilStreamConsumer } from "@boilstream/consumer";

const consumer = new BoilStreamConsumer("https://your-server:8443/stream/YOUR_TOKEN", {
  onSchema: (schema) => {
    console.log("Schema fields:", schema.fields.map(f => f.name));
  },
  onBatch: (table, batchSeq) => {
    console.log(`Batch ${batchSeq}: ${table.numRows} rows`);
    // Access columns by name
    const col = table.getChild("my_column");
  },
  onHeartbeat: () => {
    console.log("Connection alive");
  },
  onError: (err) => {
    console.error("Error:", err);
  },
});

// Later: disconnect
consumer.close();

API Reference

new BoilStreamConsumer(url, options)

Creates a consumer and immediately connects to the SSE endpoint.

Parameters:

ParameterTypeDescription
urlstringSSE endpoint URL (https://host:8443/stream/{token})
options.onSchemaFunctionCalled with flechette Schema on schema events
options.onBatchFunctionCalled with (Table, batchSeq) on batch events
options.onHeartbeatFunctionCalled on heartbeat events
options.onErrorFunctionCalled with error on connection/decode errors
options.EventSourceclassEventSource implementation (required in Node.js)

consumer.schema

Returns the current schema (flechette Schema), or null until the first schema event.

consumer.close()

Disconnects from the SSE stream. After calling close(), the consumer will not reconnect.

Browser Usage

Browsers have native EventSource support, so no polyfill is needed:

html
<script type="module">
  import { BoilStreamConsumer } from "@boilstream/consumer";

  const consumer = new BoilStreamConsumer(
    "https://your-server:8443/stream/YOUR_TOKEN",
    {
      onSchema: (schema) => {
        document.getElementById("schema").textContent =
          JSON.stringify(schema.fields.map(f => f.name));
      },
      onBatch: (table, batchSeq) => {
        document.getElementById("rows").textContent =
          `Batch ${batchSeq}: ${table.numRows} rows`;
      },
      onError: (err) => console.error(err),
    }
  );
</script>

Node.js Usage

In Node.js, pass the eventsource package as the EventSource implementation:

javascript
import { BoilStreamConsumer } from "@boilstream/consumer";
import { EventSource } from "eventsource";

const consumer = new BoilStreamConsumer("https://your-server:8443/stream/YOUR_TOKEN", {
  onSchema: (schema) => console.log("Schema:", schema),
  onBatch: (table, batchSeq) => console.log(`Batch ${batchSeq}: ${table.numRows} rows`),
  onError: (err) => console.error(err),
  EventSource, // Required in Node.js
});

TIP

For Node.js with self-signed certificates, set NODE_TLS_REJECT_UNAUTHORIZED=0 in your environment.

Event Flow

Connect to SSE endpoint
    |
    v
[schema event] --> onSchema(schema)
    |
    v
[batch event]  --> onBatch(table, batchSeq)
    |
    v
[heartbeat]    --> onHeartbeat()
    |
    v
[connection drop]
    |
    v
EventSource auto-reconnects with Last-Event-ID
    |
    v
[schema event] --> onSchema(schema)  (fresh schema on reconnect)
    |
    v
[catchup batch events] --> onBatch(table, batchSeq)  (missed batches)
    |
    v
[live batch events] --> onBatch(table, batchSeq)  (new data)

Arrow Decoding

The SDK uses flechette for Arrow IPC decoding. Each SSE event's data field contains base64-encoded Arrow IPC stream bytes. The SDK:

  1. Decodes base64 to Uint8Array (using Buffer in Node.js or atob in browsers)
  2. Passes bytes to tableFromIPC() from flechette
  3. Returns a flechette Table with typed column access

Reconnection Behavior

Reconnection is handled entirely by the browser's EventSource API:

  • On connection drop, EventSource automatically reconnects
  • The Last-Event-ID header is sent with the last received batch sequence number
  • BoilStream replays missed batches, then resumes live streaming
  • A fresh schema event is always sent before catchup batches

No client-side reconnection logic is needed.

See Also