Schema
Schemas define the structure and format of data in your tables. Every source and sink table requires a schema that specifies the data format and field definitions.
Schema Object Structure
A schema consists of:
| Property | Required | Description |
|---|---|---|
| format | Yes | Data serialization format (JSON, Avro, Parquet, etc.) |
| fields | Yes | Array of field definitions |
| primary_keys | No | Array of field names that form the primary key |
| bad_data | No | How to handle malformed records (Fail or Drop) |
| inferred | No | Whether the schema was auto-inferred |
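A minimal Python sketch of the structure rules in the table above, useful as a pre-flight check before submitting a schema. The function name `check_schema` is hypothetical, and the rule that every entry in `primary_keys` must name a declared field is an assumption rather than something this page states.

```python
from typing import Any

# Property names come from the table above; the checks are a hypothetical sketch,
# not the platform's own validation logic.
REQUIRED = ("format", "fields")
BAD_DATA_MODES = ("Fail", "Drop")


def check_schema(schema: dict[str, Any]) -> list[str]:
    """Return a list of problems found in a schema object (empty if none)."""
    problems = [f"missing required property '{key}'" for key in REQUIRED if key not in schema]

    field_names = {f.get("field_name") for f in schema.get("fields", [])}

    # Assumption: every primary key must name a top-level field.
    for key in schema.get("primary_keys", []):
        if key not in field_names:
            problems.append(f"primary key '{key}' is not a declared field")

    # bad_data is written as {"Drop": {}} or {"Fail": {}} in the examples on this page.
    bad_data = schema.get("bad_data")
    if bad_data is not None and (len(bad_data) != 1 or next(iter(bad_data)) not in BAD_DATA_MODES):
        problems.append("bad_data must be exactly one of Fail or Drop")

    return problems


print(check_schema({"fields": [], "primary_keys": ["id"]}))
```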
format
json
The most common format for streaming data.
| Option | Type | Default | Description |
|---|---|---|---|
| confluentSchemaRegistry | boolean | false | Use Confluent Schema Registry |
| schemaId | number | - | Schema version ID from the registry |
| includeSchema | boolean | false | Include the schema in each message |
| debezium | boolean | false | Enable Debezium CDC mode |
| unstructured | boolean | false | Treat the payload as a single unstructured field |
| timestampFormat | string | "rfc3339" | Timestamp encoding: rfc3339 or unix_millis |
| decimalEncoding | string | "number" | Decimal encoding: number, string, or bytes |
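To make the `timestampFormat` option concrete, the sketch below encodes one instant both ways. The helper `encode_timestamp` is hypothetical, and the exact RFC 3339 spelling the platform writes (for example `Z` versus `+00:00` for UTC) is an assumption; Python's `isoformat()` produces the `+00:00` form.

```python
from datetime import datetime, timedelta, timezone
from typing import Union

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def encode_timestamp(ts: datetime, timestamp_format: str = "rfc3339") -> Union[str, int]:
    """Encode a timezone-aware datetime using one of the two timestampFormat values."""
    if timestamp_format == "rfc3339":
        # isoformat() on an aware datetime yields an RFC 3339-compatible string.
        return ts.isoformat()
    if timestamp_format == "unix_millis":
        # Whole milliseconds since the Unix epoch, computed exactly with timedelta division.
        return (ts - EPOCH) // timedelta(milliseconds=1)
    raise ValueError(f"unknown timestampFormat: {timestamp_format!r}")


ts = datetime(2024, 1, 15, 12, 30, tzinfo=timezone.utc)
print(encode_timestamp(ts))                 # 2024-01-15T12:30:00+00:00
print(encode_timestamp(ts, "unix_millis"))  # 1705321800000
```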
```json
{
  "format": {
    "json": {
      "timestampFormat": "rfc3339"
    }
  }
}
```
avro
Apache Avro binary format.
| Option | Type | Default | Description |
|---|---|---|---|
| confluentSchemaRegistry | boolean | false | Use Confluent Schema Registry |
| rawDatums | boolean | false | Raw Avro datums without an object container file wrapper |
| intoUnstructuredJson | boolean | false | Convert to unstructured JSON |
| readerSchema | string | - | Override the reader schema |
| schemaId | number | - | Schema version ID |
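When `confluentSchemaRegistry` is enabled, messages are expected to carry Confluent's wire-format header: a zero magic byte, then the registry schema ID as a 4-byte big-endian integer, then the Avro-encoded datum. The sketch below splits that header off. The function name `split_confluent_frame` is hypothetical, and how the platform matches the header ID against `schemaId` is an assumption not covered here.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def split_confluent_frame(message: bytes) -> tuple[int, bytes]:
    """Split a Confluent Schema Registry framed message into (schema_id, payload)."""
    if len(message) < 5 or message[0] != MAGIC_BYTE:
        raise ValueError("not a Confluent Schema Registry framed message")
    # Bytes 1-4 hold the registry schema ID as a big-endian unsigned 32-bit integer.
    (schema_id,) = struct.unpack(">I", message[1:5])
    # The remainder is a raw Avro datum, not an Avro object container file.
    return schema_id, message[5:]


frame = b"\x00" + struct.pack(">I", 42) + b"<avro datum>"
print(split_confluent_frame(frame))  # (42, b'<avro datum>')
```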
```json
{
  "format": {
    "avro": {
      "confluentSchemaRegistry": true
    }
  }
}
```
parquet
Apache Parquet columnar format, typically used for sink tables.
| Option | Type | Default | Description |
|---|---|---|---|
| compression | string | "uncompressed" | Compression codec: uncompressed, snappy, gzip, zstd, or lz4 |
| rowGroupBytes | number | - | Target row group size in bytes |
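The `rowGroupBytes` value in the example that follows, 134217728, is 128 MiB. A quick estimate of how many rows that holds, assuming a fixed average row size, can be sketched as below. The helper `rows_per_row_group` is hypothetical, and whether the target is measured before or after compression is not stated on this page.

```python
# rowGroupBytes value used in the parquet examples, expressed in MiB.
ROW_GROUP_BYTES = 128 * 1024 * 1024  # 134217728 bytes = 128 MiB


def rows_per_row_group(avg_row_bytes: int, row_group_bytes: int = ROW_GROUP_BYTES) -> int:
    """Rough estimate of rows per row group for a fixed average row size.

    Real row groups vary because encoding and compression change the
    stored size of each row; treat this as an order-of-magnitude guide.
    """
    return row_group_bytes // avg_row_bytes


print(rows_per_row_group(64))  # 2097152
```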
```json
{
  "format": {
    "parquet": {
      "compression": "snappy",
      "rowGroupBytes": 134217728
    }
  }
}
```
protobuf
Protocol Buffers binary format.
| Option | Type | Default | Description |
|---|---|---|---|
| messageName | string | - | Protobuf message type name |
| compiledSchema | bytes | - | Compiled protobuf descriptor |
| confluentSchemaRegistry | boolean | false | Use Confluent Schema Registry |
| lengthDelimited | boolean | false | Use length-delimited framing |
| intoUnstructuredJson | boolean | false | Convert to unstructured JSON |
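Length-delimited protobuf framing conventionally prefixes each serialized message with its byte length encoded as a base-128 varint (the layout produced by, for example, Java's `writeDelimitedTo`). Assuming `lengthDelimited` uses that convention, the sketch below splits such a stream into individual message payloads. All function names are hypothetical.

```python
def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def decode_varint(buf: bytes, pos: int) -> tuple[int, int]:
    """Decode a varint starting at pos; return (value, position after it)."""
    result = shift = 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def split_length_delimited(buf: bytes) -> list[bytes]:
    """Split a buffer of varint-length-prefixed messages into message payloads."""
    messages, pos = [], 0
    while pos < len(buf):
        length, pos = decode_varint(buf, pos)
        if pos + length > len(buf):
            raise ValueError("truncated message")
        messages.append(buf[pos:pos + length])
        pos += length
    return messages


stream = encode_varint(3) + b"abc" + encode_varint(2) + b"xy"
print(split_length_delimited(stream))  # [b'abc', b'xy']
```

Each payload would then be decoded as the `messageName` type; that step needs a protobuf runtime and is omitted.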
```json
{
  "format": {
    "protobuf": {
      "confluentSchemaRegistry": true,
      "messageName": "MyMessage"
    }
  }
}
```
rawString
Single UTF-8 string field. Requires exactly one field of type Utf8.
```json
{
  "format": {
    "rawString": {}
  }
}
```
rawBytes
Single binary field. Requires exactly one field of type Bytes.
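Both raw formats share the same single-field rule, differing only in the required primitive type (Utf8 for rawString, Bytes for rawBytes). A hypothetical check for that rule, with `value` as an illustrative field name:

```python
# Required primitive type per raw format, as stated for rawString and rawBytes.
REQUIRED_PRIMITIVE = {"rawString": "Utf8", "rawBytes": "Bytes"}


def check_raw_format(schema: dict) -> None:
    """Raise ValueError if a rawString/rawBytes schema breaks the single-field rule."""
    fmt = next(iter(schema["format"]))
    expected = REQUIRED_PRIMITIVE.get(fmt)
    if expected is None:
        return  # not a raw format; nothing to check
    fields = schema["fields"]
    if len(fields) != 1:
        raise ValueError(f"{fmt} requires exactly one field, got {len(fields)}")
    actual = fields[0]["field_type"]["type"].get("primitive")
    if actual != expected:
        raise ValueError(f"{fmt} requires a {expected} field, got {actual}")


# Hypothetical single-field schema; "value" is an illustrative name.
raw_string_schema = {
    "format": {"rawString": {}},
    "fields": [{"field_name": "value", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": False}],
}
check_raw_format(raw_string_schema)  # passes silently
```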
```json
{
  "format": {
    "rawBytes": {}
  }
}
```
fields
Primitive Types
| Type | Description | SQL Equivalent |
|---|---|---|
| Int32 | 32-bit signed integer | INTEGER |
| Int64 | 64-bit signed integer | BIGINT |
| UInt32 | 32-bit unsigned integer | - |
| UInt64 | 64-bit unsigned integer | - |
| F32 | 32-bit floating point | FLOAT |
| F64 | 64-bit floating point | DOUBLE |
| Bool | Boolean | BOOLEAN |
| Utf8 | UTF-8 string | VARCHAR / TEXT |
| Bytes | Binary data | BYTEA |
| DateTime | RFC3339 datetime | TIMESTAMP |
| Date32 | Date (days since epoch) | DATE |
| UnixMillis | Unix timestamp (milliseconds) | TIMESTAMP |
| UnixMicros | Unix timestamp (microseconds) | TIMESTAMP |
| UnixNanos | Unix timestamp (nanoseconds) | TIMESTAMP |
| Json | JSON data | JSON |
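The SQL Equivalent column can be used to render a primitive field definition (in the `field_name` / `field_type` / `nullable` shape described next) as a SQL column. This is a hypothetical illustration, not the platform's own SQL generation; it picks TEXT where the table lists VARCHAR / TEXT and rejects the unsigned types, which have no listed equivalent.

```python
# SQL equivalents copied from the table above; None marks types with no listed equivalent.
SQL_TYPE = {
    "Int32": "INTEGER", "Int64": "BIGINT", "UInt32": None, "UInt64": None,
    "F32": "FLOAT", "F64": "DOUBLE", "Bool": "BOOLEAN", "Utf8": "TEXT",
    "Bytes": "BYTEA", "DateTime": "TIMESTAMP", "Date32": "DATE",
    "UnixMillis": "TIMESTAMP", "UnixMicros": "TIMESTAMP", "UnixNanos": "TIMESTAMP",
    "Json": "JSON",
}


def column_ddl(field: dict) -> str:
    """Render one primitive field definition as a SQL column definition."""
    primitive = field["field_type"]["type"]["primitive"]
    sql_type = SQL_TYPE[primitive]
    if sql_type is None:
        raise ValueError(f"{primitive} has no SQL equivalent listed")
    null_clause = "" if field["nullable"] else " NOT NULL"
    return f"{field['field_name']} {sql_type}{null_clause}"


user_id = {"field_name": "user_id", "field_type": {"type": {"primitive": "Int64"}}, "nullable": False}
print(column_ddl(user_id))  # user_id BIGINT NOT NULL
```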
Field Definition
Each field in the fields array has this structure:
```json
{
  "field_name": "user_id",
  "field_type": {
    "type": {
      "primitive": "Int64"
    }
  },
  "nullable": false
}
```
struct (Nested Objects)
For nested objects, use the struct type with nested fields:
```json
{
  "field_name": "address",
  "field_type": {
    "type": {
      "struct": {
        "fields": [
          {
            "field_name": "street",
            "field_type": { "type": { "primitive": "Utf8" } },
            "nullable": true
          },
          {
            "field_name": "city",
            "field_type": { "type": { "primitive": "Utf8" } },
            "nullable": false
          },
          {
            "field_name": "zip",
            "field_type": { "type": { "primitive": "Utf8" } },
            "nullable": true
          }
        ]
      }
    }
  },
  "nullable": true
}
```
list (Arrays)
For arrays, use the list type with an element definition:
```json
{
  "field_name": "tags",
  "field_type": {
    "type": {
      "list": {
        "field_name": "item",
        "field_type": { "type": { "primitive": "Utf8" } },
        "nullable": false
      }
    }
  },
  "nullable": true
}
```
Complete Examples
Simple JSON Schema
```json
{
  "format": { "json": {} },
  "fields": [
    {
      "field_name": "event_id",
      "field_type": { "type": { "primitive": "Utf8" } },
      "nullable": false
    },
    {
      "field_name": "user_id",
      "field_type": { "type": { "primitive": "Int64" } },
      "nullable": false
    },
    {
      "field_name": "event_type",
      "field_type": { "type": { "primitive": "Utf8" } },
      "nullable": false
    },
    {
      "field_name": "amount",
      "field_type": { "type": { "primitive": "F64" } },
      "nullable": true
    },
    {
      "field_name": "timestamp",
      "field_type": { "type": { "primitive": "DateTime" } },
      "nullable": false
    }
  ]
}
```
Schema with Nested Struct
```json
{
  "format": { "json": {} },
  "fields": [
    {
      "field_name": "order_id",
      "field_type": { "type": { "primitive": "Utf8" } },
      "nullable": false
    },
    {
      "field_name": "customer",
      "field_type": {
        "type": {
          "struct": {
            "fields": [
              {
                "field_name": "id",
                "field_type": { "type": { "primitive": "Int64" } },
                "nullable": false
              },
              {
                "field_name": "name",
                "field_type": { "type": { "primitive": "Utf8" } },
                "nullable": false
              },
              {
                "field_name": "email",
                "field_type": { "type": { "primitive": "Utf8" } },
                "nullable": true
              }
            ]
          }
        }
      },
      "nullable": false
    },
    {
      "field_name": "items",
      "field_type": {
        "type": {
          "list": {
            "field_name": "item",
            "field_type": {
              "type": {
                "struct": {
                  "fields": [
                    {
                      "field_name": "sku",
                      "field_type": { "type": { "primitive": "Utf8" } },
                      "nullable": false
                    },
                    {
                      "field_name": "quantity",
                      "field_type": { "type": { "primitive": "Int32" } },
                      "nullable": false
                    }
                  ]
                }
              }
            },
            "nullable": false
          }
        }
      },
      "nullable": false
    }
  ]
}
```
Parquet Sink Schema
```json
{
  "format": {
    "parquet": {
      "compression": "snappy",
      "rowGroupBytes": 134217728
    }
  },
  "fields": [
    {
      "field_name": "id",
      "field_type": { "type": { "primitive": "Utf8" } },
      "nullable": false
    },
    {
      "field_name": "value",
      "field_type": { "type": { "primitive": "F64" } },
      "nullable": false
    },
    {
      "field_name": "event_date",
      "field_type": { "type": { "primitive": "Date32" } },
      "nullable": false
    }
  ]
}
```
bad_data
Control how malformed records are handled:
| Mode | Description |
|---|---|
| Fail | Pipeline fails on bad records (default) |
| Drop | Skip bad records and continue processing |
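The difference between the two modes can be sketched with a toy JSON-lines parser. This is an illustration under simplifying assumptions, not the pipeline's implementation: `parse_records` is hypothetical, and it treats only JSON syntax errors as "bad data", whereas a real pipeline presumably also rejects records that do not match the declared fields.

```python
import json


def parse_records(lines: list[str], bad_data: str = "Fail") -> list[dict]:
    """Parse JSON lines, applying Fail or Drop semantics to malformed records."""
    records = []
    for line in lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            if bad_data == "Drop":
                continue  # Drop: skip the bad record and keep processing
            raise  # Fail: surface the error and stop
        records.append(record)
    return records


lines = ['{"id": 1}', 'not json', '{"id": 2}']
print(parse_records(lines, "Drop"))  # [{'id': 1}, {'id': 2}]
```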
```json
{
  "format": { "json": {} },
  "bad_data": { "Drop": {} },
  "fields": [...]
}
```
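Nested struct and list definitions are verbose to write by hand. A minimal sketch of hypothetical helper functions that emit the same field shapes used on this page, rebuilding the "Schema with Nested Struct" example; the helper names are illustrative and not part of any API.

```python
import json
from typing import Any

Field = dict[str, Any]


def primitive_field(name: str, type_name: str, nullable: bool = False) -> Field:
    """Field whose type is a primitive such as "Utf8" or "Int64"."""
    return {"field_name": name, "field_type": {"type": {"primitive": type_name}}, "nullable": nullable}


def struct_field(name: str, fields: list[Field], nullable: bool = False) -> Field:
    """Field whose type is a struct of nested field definitions."""
    return {"field_name": name, "field_type": {"type": {"struct": {"fields": fields}}}, "nullable": nullable}


def list_field(name: str, element: Field, nullable: bool = False) -> Field:
    """Field whose type is a list; element describes each list item."""
    return {"field_name": name, "field_type": {"type": {"list": element}}, "nullable": nullable}


# Rebuilds the "Schema with Nested Struct" example.
order_schema = {
    "format": {"json": {}},
    "fields": [
        primitive_field("order_id", "Utf8"),
        struct_field("customer", [
            primitive_field("id", "Int64"),
            primitive_field("name", "Utf8"),
            primitive_field("email", "Utf8", nullable=True),
        ]),
        list_field("items", struct_field("item", [
            primitive_field("sku", "Utf8"),
            primitive_field("quantity", "Int32"),
        ])),
    ],
}

print(json.dumps(order_schema, indent=2))
```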