# Schema Registry API
BoilStream includes a built-in Confluent-compatible Schema Registry that provides schema discovery for Kafka producers.
## Overview

The Schema Registry is:

- Confluent-compatible: Implements the standard Confluent Schema Registry REST API
- Read-only: Schemas are created through DuckLake table definitions, not via the API
- Automatically enabled: Available on the auth server at `/schema-registry`
- No authentication required: All GET endpoints are public for schema discovery
## Base URL

```
https://localhost:443/schema-registry
```

The Schema Registry runs on the same port as the auth server (default 443) under the `/schema-registry` path.
## How Schemas are Managed
Unlike traditional Confluent Schema Registry where clients register schemas, BoilStream takes a table-first approach:
1. Create a table in DuckLake via the PostgreSQL interface:

   ```sql
   CREATE TABLE events (
       event_id BIGINT,
       event_type VARCHAR,
       timestamp TIMESTAMP,
       user_id INTEGER
   );
   ```

2. BoilStream automatically:
   - Generates an Avro schema from the table definition
   - Registers it in the Schema Registry with subject `events-value`
   - Assigns a globally unique schema ID

3. Schema evolution happens through `ALTER TABLE`:

   ```sql
   ALTER TABLE events ADD COLUMN metadata JSON;
   ```

   This creates a new schema version automatically.
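To illustrate the schema-generation step, here is a minimal sketch of how a table definition could translate into an Avro record schema. The `SQL_TO_AVRO` mapping and `table_to_avro` helper are hypothetical; BoilStream's actual internal mapping may differ (for example, in how `TIMESTAMP` logical types or nullability are represented).

```python
import json

# Hypothetical mapping from SQL column types to Avro types;
# BoilStream's real mapping is internal and may differ.
SQL_TO_AVRO = {
    "BIGINT": "long",
    "INTEGER": "int",
    "VARCHAR": "string",
    "TIMESTAMP": {"type": "long", "logicalType": "timestamp-micros"},
}

def table_to_avro(table_name, columns):
    """Build an Avro record schema from (name, sql_type) column pairs."""
    return {
        "type": "record",
        "name": table_name,
        "fields": [
            {"name": col, "type": SQL_TO_AVRO[sql_type]}
            for col, sql_type in columns
        ],
    }

schema = table_to_avro("events", [
    ("event_id", "BIGINT"),
    ("event_type", "VARCHAR"),
    ("timestamp", "TIMESTAMP"),
    ("user_id", "INTEGER"),
])
print(json.dumps(schema, indent=2))
```

The resulting record would be what the registry serves under the `events-value` subject.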
### TopicNameStrategy
BoilStream uses Confluent's `TopicNameStrategy` for subject naming: `{table_name}-value`. Key schemas are not currently supported.
## API Reference
### Health Check

`GET /`

Returns an empty JSON object indicating the service is healthy.

```bash
curl https://localhost:443/schema-registry/
```

Response:

```json
{}
```

### Cluster Metadata

`GET /v1/metadata/id`

Returns the cluster ID.

```bash
curl https://localhost:443/schema-registry/v1/metadata/id
```

Response:

```json
{"id": "boilstream-schema-registry"}
```

### Schema Types

`GET /schemas/types`

Returns supported schema types.

```bash
curl https://localhost:443/schema-registry/schemas/types
```

Response:

```json
["AVRO"]
```

### Schema Operations
`GET /schemas/ids/{id}`

Get a schema by its global ID.

```bash
curl https://localhost:443/schema-registry/schemas/ids/1
```

Response:

```json
{
  "schema": "{\"type\":\"record\",\"name\":\"events\",\"fields\":[...]}",
  "schemaType": "AVRO",
  "references": []
}
```

`GET /schemas/ids/{id}/schema`

Get only the schema string (not wrapped in JSON).

```bash
curl https://localhost:443/schema-registry/schemas/ids/1/schema
```

Response:

```json
{"type":"record","name":"events","fields":[...]}
```

`GET /schemas/ids/{id}/versions`

Get all subject-version pairs for a schema ID.

```bash
curl https://localhost:443/schema-registry/schemas/ids/1/versions
```

Response:

```json
[
  {"subject": "events-value", "version": 1}
]
```

`GET /schemas/ids/{id}/subjects`

Get all subjects using a schema ID.

```bash
curl https://localhost:443/schema-registry/schemas/ids/1/subjects
```

Response:

```json
["events-value"]
```

### Subject Operations
`GET /subjects`

List all registered subjects.

```bash
curl https://localhost:443/schema-registry/subjects
```

Response:

```json
["events-value", "users-value", "orders-value"]
```

`GET /subjects/{subject}/versions`

List all versions for a subject.

```bash
curl https://localhost:443/schema-registry/subjects/events-value/versions
```

Response:

```json
[1, 2, 3]
```

`GET /subjects/{subject}/versions/{version}`

Get a specific schema version. Use `latest` for the most recent version.

```bash
curl https://localhost:443/schema-registry/subjects/events-value/versions/latest
```

Response:

```json
{
  "subject": "events-value",
  "version": 3,
  "id": 42,
  "schema": "{\"type\":\"record\",\"name\":\"events\",\"fields\":[...]}"
}
```

`GET /subjects/{subject}/versions/{version}/schema`

Get only the schema string for a version.

```bash
curl https://localhost:443/schema-registry/subjects/events-value/versions/1/schema
```

Response:

```json
{"type":"record","name":"events","fields":[...]}
```

`POST /subjects/{subject}`

Check if a schema exists under a subject. Returns the schema info if found.

```bash
curl -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",...}"}' \
  https://localhost:443/schema-registry/subjects/events-value
```

Response (if found):

```json
{
  "subject": "events-value",
  "version": 1,
  "id": 42,
  "schema": "{\"type\":\"record\",...}"
}
```

### Compatibility Operations
`POST /compatibility/subjects/{subject}/versions/{version}`

Check if a schema is compatible with a specific version.

```bash
curl -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",...}"}' \
  https://localhost:443/schema-registry/compatibility/subjects/events-value/versions/latest
```

Response:

```json
{"is_compatible": true}
```

### Configuration Operations
`GET /config`

Get global compatibility configuration.

```bash
curl https://localhost:443/schema-registry/config
```

Response:

```json
{"compatibilityLevel": "BACKWARD"}
```

`GET /config/{subject}`

Get subject-specific configuration.

```bash
curl https://localhost:443/schema-registry/config/events-value
```

Response:

```json
{"compatibilityLevel": "BACKWARD"}
```

### Mode Operations

`GET /mode`

Get the global registry mode.

```bash
curl https://localhost:443/schema-registry/mode
```

Response:

```json
{"mode": "READONLY"}
```

## Read-Only Mode
BoilStream's Schema Registry always operates in READONLY mode. Schemas are managed through DuckLake table definitions, not through the registry API.
### Read-Only Operations
The following operations return 403 Forbidden because BoilStream's Schema Registry is read-only:
| Method | Endpoint | Description |
|---|---|---|
| POST | `/subjects/{subject}/versions` | Register new schema |
| DELETE | `/subjects/{subject}` | Delete subject |
| DELETE | `/subjects/{subject}/versions/{version}` | Delete version |
| PUT | `/config` | Set global config |
| PUT | `/config/{subject}` | Set subject config |
| DELETE | `/config/{subject}` | Delete subject config |
| PUT | `/mode` | Set global mode |
| PUT | `/mode/{subject}` | Set subject mode |
To create or modify schemas, use DuckLake SQL:

```sql
-- Create new schema
CREATE TABLE new_topic (
    id BIGINT,
    data VARCHAR
);

-- Evolve schema (adds new version)
ALTER TABLE new_topic ADD COLUMN timestamp TIMESTAMP;
```

## Wire Format

BoilStream supports the standard Confluent wire format for Kafka messages:

```
[0x00][schema_id (4 bytes, big-endian)][avro_payload]
```

- Magic byte: `0x00` indicates Confluent format
- Schema ID: 4-byte big-endian integer (global schema ID from registry)
- Payload: Avro-encoded data
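The framing above can be sketched in Python with the standard `struct` module. The payload bytes here are arbitrary placeholders standing in for real Avro-encoded data:

```python
import struct

MAGIC_BYTE = 0x00

def frame_message(schema_id: int, avro_payload: bytes) -> bytes:
    """Prepend the Confluent wire-format header to an Avro payload."""
    # ">bI" = big-endian: 1-byte magic, 4-byte unsigned schema ID
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload

def parse_message(message: bytes) -> tuple[int, bytes]:
    """Split a framed message back into (schema_id, avro_payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte: {magic}")
    return schema_id, message[5:]

framed = frame_message(42, b"\x02\x06foo")
assert framed[:5] == b"\x00\x00\x00\x00\x2a"
assert parse_message(framed) == (42, b"\x02\x06foo")
```

Confluent-compatible serializers (including the client libraries shown below) produce this framing automatically, so hand-rolling it is only needed for custom producers.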
## Client Examples

### Java
```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;

SchemaRegistryClient client = new CachedSchemaRegistryClient(
    "https://localhost:443/schema-registry",
    100  // cache capacity
);

// Get latest schema
SchemaMetadata metadata = client.getLatestSchemaMetadata("events-value");
String schema = metadata.getSchema();
int schemaId = metadata.getId();
```

### Python
```python
from confluent_kafka.schema_registry import SchemaRegistryClient

client = SchemaRegistryClient({
    'url': 'https://localhost:443/schema-registry'
})

# Get latest schema
schema = client.get_latest_version('events-value')
print(f"Schema ID: {schema.schema_id}")
print(f"Schema: {schema.schema.schema_str}")
```

### Node.js
```javascript
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';

const registry = new SchemaRegistry({
  host: 'https://localhost:443/schema-registry'
});

// Get latest schema ID
const schemaId = await registry.getLatestSchemaId('events-value');

// Encode data with schema
const encoded = await registry.encode(schemaId, { event_id: 1, event_type: 'click' });

// Decode data
const decoded = await registry.decode(encoded);
```

### curl

```bash
# List all subjects
curl https://localhost:443/schema-registry/subjects

# Get latest schema for a subject
curl https://localhost:443/schema-registry/subjects/events-value/versions/latest

# Get schema by global ID
curl https://localhost:443/schema-registry/schemas/ids/42
```

## Error Responses
The Schema Registry returns Confluent-compatible error responses:
```json
{
  "error_code": 40401,
  "message": "Subject not found"
}
```

Common error codes:
| Code | Description |
|---|---|
| 40401 | Subject not found |
| 40402 | Version not found |
| 40403 | Schema not found |
| 42201 | Invalid schema |
| 42202 | Invalid version |
| 40301 | Operation not permitted (read-only mode) |
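A client can branch on `error_code` rather than on HTTP status alone. The following is an illustrative sketch, not part of any BoilStream client library; the function name and exception choices are the author's own:

```python
import json

def raise_for_registry_error(status: int, body: str) -> None:
    """Turn a Confluent-style error body into a Python exception."""
    if status < 400:
        return
    err = json.loads(body)
    code, msg = err["error_code"], err["message"]
    if code in (40401, 40402, 40403):
        raise LookupError(f"{code}: {msg}")
    if code == 40301:
        raise PermissionError(f"{code}: {msg} (registry is read-only)")
    raise RuntimeError(f"{code}: {msg}")

try:
    raise_for_registry_error(404, '{"error_code": 40401, "message": "Subject not found"}')
except LookupError as e:
    print(e)  # → 40401: Subject not found
```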
## Best Practices
- Create tables before producing: Schemas must exist before Kafka producers send messages
- Use schema caching: Configure client-side caching to reduce registry lookups
- Handle schema evolution carefully: Test compatibility before `ALTER TABLE`
- Use `latest` sparingly: Cache schema IDs instead of repeatedly fetching the latest version
- Monitor schema versions: Track version count per subject for evolution auditing
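The caching advice can be sketched with a simple memoized lookup. Here `fetch_latest_schema_id` is a stand-in for a real registry call (e.g. via `confluent_kafka`'s `SchemaRegistryClient`); the call counter just makes the caching effect visible:

```python
from functools import lru_cache

calls = 0  # counts simulated registry round-trips

def fetch_latest_schema_id(subject: str) -> int:
    """Stand-in for a real GET /subjects/{subject}/versions/latest call."""
    global calls
    calls += 1
    return {"events-value": 42}[subject]

@lru_cache(maxsize=128)
def cached_schema_id(subject: str) -> int:
    return fetch_latest_schema_id(subject)

for _ in range(1000):
    cached_schema_id("events-value")

assert calls == 1  # one round-trip instead of 1000
```

One caveat: caching the result of `latest` means a new version created by `ALTER TABLE` will not be picked up until the cache entry is invalidated, so pair this with a TTL or explicit invalidation in long-running producers.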