Schema Registry API

BoilStream includes a built-in Confluent-compatible Schema Registry that provides schema discovery for Kafka producers.

Overview

The Schema Registry is:

  • Confluent-compatible: Implements the standard Confluent Schema Registry REST API
  • Read-only: Schemas are created through DuckLake table definitions, not via the API
  • Automatically enabled: Available on the auth server at /schema-registry
  • No authentication required: All GET endpoints are public for schema discovery

Base URL

https://localhost:443/schema-registry

The Schema Registry runs on the same port as the auth server (default 443) under the /schema-registry path.
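
For a quick connectivity check from code, you can list subjects against this base URL. The following is a minimal sketch using Python's requests library; verify=False is an assumption for a local self-signed certificate and should be replaced with proper CA verification in production.

python
import requests

BASE_URL = "https://localhost:443/schema-registry"

# List all registered subjects (GET endpoints require no authentication).
# verify=False is only for a local self-signed certificate.
response = requests.get(f"{BASE_URL}/subjects", verify=False)
response.raise_for_status()
print(response.json())  # e.g. ["events-value", "users-value"]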

How Schemas are Managed

Unlike a traditional Confluent Schema Registry, where clients register schemas themselves, BoilStream takes a table-first approach:

  1. Create a table in DuckLake via the PostgreSQL interface:

    sql
    CREATE TABLE events (
        event_id BIGINT,
        event_type VARCHAR,
        timestamp TIMESTAMP,
        user_id INTEGER
    );
  2. BoilStream automatically:

    • Generates an Avro schema from the table definition (a sketch of such a schema follows this list)
    • Registers it in the Schema Registry with subject events-value
    • Assigns a globally unique schema ID
  3. Schema evolution happens through ALTER TABLE:

    sql
    ALTER TABLE events ADD COLUMN metadata JSON;

    This creates a new schema version automatically.
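
As an illustration, the Avro schema generated for the events table above could look roughly like the following. The exact type mapping is an assumption here (in particular the timestamp encoding, and nullable columns may map to Avro unions); fetch the authoritative schema from the registry.

json
{
  "type": "record",
  "name": "events",
  "fields": [
    {"name": "event_id", "type": "long"},
    {"name": "event_type", "type": "string"},
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-micros"}},
    {"name": "user_id", "type": "int"}
  ]
}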

TopicNameStrategy

BoilStream uses Confluent's TopicNameStrategy for subject naming: {table_name}-value. Key schemas are not currently supported.

API Reference

Health Check

GET /

Returns an empty JSON object indicating the service is healthy.

bash
curl https://localhost:443/schema-registry/

Response:

json
{}

Cluster Metadata

GET /v1/metadata/id

Returns the cluster ID.

bash
curl https://localhost:443/schema-registry/v1/metadata/id

Response:

json
{"id": "boilstream-schema-registry"}

Schema Types

GET /schemas/types

Returns supported schema types.

bash
curl https://localhost:443/schema-registry/schemas/types

Response:

json
["AVRO"]

Schema Operations

GET /schemas/ids/{id}

Get a schema by its global ID.

bash
curl https://localhost:443/schema-registry/schemas/ids/1

Response:

json
{
  "schema": "{\"type\":\"record\",\"name\":\"events\",\"fields\":[...]}",
  "schemaType": "AVRO",
  "references": []
}

GET /schemas/ids/{id}/schema

Get only the schema string (not wrapped in JSON).

bash
curl https://localhost:443/schema-registry/schemas/ids/1/schema

Response:

json
{"type":"record","name":"events","fields":[...]}

GET /schemas/ids/{id}/versions

Get all subject-version pairs for a schema ID.

bash
curl https://localhost:443/schema-registry/schemas/ids/1/versions

Response:

json
[
  {"subject": "events-value", "version": 1}
]

GET /schemas/ids/{id}/subjects

Get all subjects using a schema ID.

bash
curl https://localhost:443/schema-registry/schemas/ids/1/subjects

Response:

json
["events-value"]

Subject Operations

GET /subjects

List all registered subjects.

bash
curl https://localhost:443/schema-registry/subjects

Response:

json
["events-value", "users-value", "orders-value"]

GET /subjects/{subject}/versions

List all versions for a subject.

bash
curl https://localhost:443/schema-registry/subjects/events-value/versions

Response:

json
[1, 2, 3]

GET /subjects/{subject}/versions/{version}

Get a specific schema version. Use latest for the most recent version.

bash
curl https://localhost:443/schema-registry/subjects/events-value/versions/latest

Response:

json
{
  "subject": "events-value",
  "version": 3,
  "id": 42,
  "schema": "{\"type\":\"record\",\"name\":\"events\",\"fields\":[...]}"
}

GET /subjects/{subject}/versions/{version}/schema

Get only the schema string for a version.

bash
curl https://localhost:443/schema-registry/subjects/events-value/versions/1/schema

Response:

json
{"type":"record","name":"events","fields":[...]}

POST /subjects/{subject}

Check if a schema exists under a subject. Returns the schema info if found.

bash
curl -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",...}"}' \
  https://localhost:443/schema-registry/subjects/events-value

Response (if found):

json
{
  "subject": "events-value",
  "version": 1,
  "id": 42,
  "schema": "{\"type\":\"record\",...}"
}

Compatibility Operations

POST /compatibility/subjects/{subject}/versions/{version}

Check if a schema is compatible with a specific version.

bash
curl -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",...}"}' \
  https://localhost:443/schema-registry/compatibility/subjects/events-value/versions/latest

Response:

json
{"is_compatible": true}

Configuration Operations

GET /config

Get global compatibility configuration.

bash
curl https://localhost:443/schema-registry/config

Response:

json
{"compatibilityLevel": "BACKWARD"}

GET /config/{subject}

Get subject-specific configuration.

bash
curl https://localhost:443/schema-registry/config/events-value

Response:

json
{"compatibilityLevel": "BACKWARD"}

Mode Operations

GET /mode

Get the global registry mode.

bash
curl https://localhost:443/schema-registry/mode

Response:

json
{"mode": "READONLY"}

Read-Only Mode

BoilStream's Schema Registry always operates in READONLY mode. Schemas are managed through DuckLake table definitions, not through the registry API.

Read-Only Operations

The following operations return 403 Forbidden because BoilStream's Schema Registry is read-only:

Method   Endpoint                                 Description
POST     /subjects/{subject}/versions             Register new schema
DELETE   /subjects/{subject}                      Delete subject
DELETE   /subjects/{subject}/versions/{version}   Delete version
PUT      /config                                  Set global config
PUT      /config/{subject}                        Set subject config
DELETE   /config/{subject}                        Delete subject config
PUT      /mode                                    Set global mode
PUT      /mode/{subject}                          Set subject mode

To create or modify schemas, use DuckLake SQL:

sql
-- Create new schema
CREATE TABLE new_topic (
    id BIGINT,
    data VARCHAR
);

-- Evolve schema (adds new version)
ALTER TABLE new_topic ADD COLUMN timestamp TIMESTAMP;

Wire Format

BoilStream supports the standard Confluent wire format for Kafka messages:

[0x00][schema_id (4 bytes, big-endian)][avro_payload]
  • Magic byte: 0x00 indicates Confluent format
  • Schema ID: 4-byte big-endian integer (global schema ID from registry)
  • Payload: Avro-encoded data
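
As a minimal sketch (not part of any BoilStream client library), the following Python helpers show how this framing can be produced and parsed with the struct module:

python
import struct

MAGIC_BYTE = 0x00

def build_confluent_frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prepend the magic byte and 4-byte big-endian schema ID to an Avro payload."""
    return bytes([MAGIC_BYTE]) + struct.pack(">I", schema_id) + avro_payload

def parse_confluent_frame(message: bytes) -> tuple[int, bytes]:
    """Split a Confluent-framed message into (schema_id, avro_payload)."""
    if len(message) < 5 or message[0] != MAGIC_BYTE:
        raise ValueError("not a Confluent-framed message")
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id, message[5:]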

Client Examples

Java

java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

SchemaRegistryClient client = new CachedSchemaRegistryClient(
    "https://localhost:443/schema-registry",
    100  // cache capacity
);

// Get the latest schema string and ID for the subject
SchemaMetadata metadata = client.getLatestSchemaMetadata("events-value");
String schema = metadata.getSchema();
int schemaId = metadata.getId();

Python

python
from confluent_kafka.schema_registry import SchemaRegistryClient

client = SchemaRegistryClient({
    'url': 'https://localhost:443/schema-registry'
})

# Get latest schema
schema = client.get_latest_version('events-value')
print(f"Schema ID: {schema.schema_id}")
print(f"Schema: {schema.schema.schema_str}")

Node.js

javascript
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';

const registry = new SchemaRegistry({
  host: 'https://localhost:443/schema-registry'
});

// Get latest schema ID
const schemaId = await registry.getLatestSchemaId('events-value');

// Encode data with schema
const encoded = await registry.encode(schemaId, { event_id: 1, event_type: 'click' });

// Decode data
const decoded = await registry.decode(encoded);

curl

bash
# List all subjects
curl https://localhost:443/schema-registry/subjects

# Get latest schema for a subject
curl https://localhost:443/schema-registry/subjects/events-value/versions/latest

# Get schema by global ID
curl https://localhost:443/schema-registry/schemas/ids/42

Error Responses

The Schema Registry returns Confluent-compatible error responses:

json
{
  "error_code": 40401,
  "message": "Subject not found"
}

Common error codes:

Code     Description
40401    Subject not found
40402    Version not found
40403    Schema not found
42201    Invalid schema
42202    Invalid version
40301    Operation not permitted (read-only mode)
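
Clients can branch on error_code rather than matching message text. A minimal sketch, assuming the requests library and a hypothetical subject that has not been registered:

python
import requests

BASE_URL = "https://localhost:443/schema-registry"

resp = requests.get(f"{BASE_URL}/subjects/missing-value/versions/latest", verify=False)
if not resp.ok:
    error = resp.json()
    if error.get("error_code") == 40401:
        print("Subject not found - create the DuckLake table first")
    else:
        print(f"Registry error {error.get('error_code')}: {error.get('message')}")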

Best Practices

  1. Create tables before producing: Schemas must exist before Kafka producers send messages
  2. Use schema caching: Configure client-side caching to reduce registry lookups
  3. Handle schema evolution carefully: Test compatibility before ALTER TABLE
  4. Use latest version sparingly: Cache schema IDs instead of repeatedly fetching latest (see the caching sketch below)
  5. Monitor schema versions: Track version count per subject for evolution auditing
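
To illustrate points 2 and 4, a small client-side cache avoids repeated latest lookups. This helper is hypothetical (not part of BoilStream) and builds on the confluent_kafka client shown earlier:

python
from functools import lru_cache

from confluent_kafka.schema_registry import SchemaRegistryClient

client = SchemaRegistryClient({'url': 'https://localhost:443/schema-registry'})

@lru_cache(maxsize=128)
def cached_schema_id(subject: str) -> int:
    """Resolve the latest schema ID for a subject once, then serve it from cache."""
    return client.get_latest_version(subject).schema_id

schema_id = cached_schema_id('events-value')  # later calls hit the in-process cache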