Kafka to Iceberg
Build a streaming pipeline that reads from Kafka and writes to an Apache Iceberg table. This is a common pattern for streaming data into a data lakehouse for analytics and long-term storage.
Prerequisites
- Connectivity to the Laminar backend
- Kafka cluster with a topic containing events
- Iceberg REST catalog (e.g., Tabular, AWS Glue, or a local REST catalog; one local option is sketched below)
- Object storage (S3, GCS, or local filesystem)
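If you do not have a catalog available, a throwaway local one is enough for this walkthrough. A minimal sketch, assuming Docker and the community tabulario/iceberg-rest image (both are assumptions, not Laminar requirements):

# Hypothetical local catalog; the image listens on 8181, matching the
# catalog URL used in the connection profiles below.
docker run -d --name iceberg-rest -p 8181:8181 tabulario/iceberg-rest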
For the rest of this example, we will use <laminar_backend> as a placeholder for your Laminar backend domain.
What You'll Build
- Source: Kafka topic with clickstream events (a sample event is shown below)
- Sink: Iceberg table partitioned by date
- Pipeline: Transform and partition data for efficient querying
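Before creating any resources, it helps to seed the topic so the pipeline has something to read. A hypothetical sample event matching the source table schema registered below, produced with Kafka's console producer (assumes a broker on localhost:9092; the ISO-8601 timestamp format is an assumption about how the DateTime field is parsed):

# One hand-written clickstream event; field names mirror the source schema
echo '{"event_id":"e-001","user_id":42,"session_id":"s-100","page_url":"/home","referrer":null,"event_type":"page_view","event_time":"2024-01-01T12:00:00Z"}' | \
  kafka-console-producer --bootstrap-server localhost:9092 --topic clickstream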
Via API
Step 1: Create Kafka Connection Profile
curl -X POST <laminar_backend>/api/v1/connectors/kafka/profiles \
-H "Content-Type: application/json" \
-d '{
"name": "kafka-cluster",
"config": {
"bootstrap_servers": "localhost:9092",
"authentication": {"type": "none"}
}
}'

Response:
{
"id": "prof_kafka_xxx",
"name": "kafka-cluster",
"connector": "kafka"
}

Step 2: Create Iceberg Connection Profile
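Before registering the profile, you can sanity-check that the catalog is reachable. GET /v1/config is part of the Iceberg REST catalog specification, so a JSON response here means the endpoint is up:

# Query the catalog's advertised configuration
curl http://localhost:8181/v1/config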
curl -X POST <laminar_backend>/api/v1/connectors/iceberg/profiles \
-H "Content-Type: application/json" \
-d '{
"name": "iceberg-catalog",
"config": {
"catalog": {
"type": "rest",
"url": "http://localhost:8181",
"warehouse": "my-warehouse"
}
}
}'

Response:
{
"id": "prof_iceberg_xxx",
"name": "iceberg-catalog",
"connector": "iceberg"
}

Step 3: Create Kafka Source Table
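If you seeded the topic earlier, a quick read-back confirms the events are there (a sketch using Kafka's stock console consumer):

# Read one event from the beginning of the topic, then exit
kafka-console-consumer --bootstrap-server localhost:9092 --topic clickstream \
  --from-beginning --max-messages 1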
curl -X POST <laminar_backend>/api/v1/connectors/kafka/tables \
-H "Content-Type: application/json" \
-d '{
"name": "clickstream",
"connection_profile_id": "prof_kafka_xxx",
"config": {
"topic": "clickstream",
"type": {
"source_config": {
"offset": "earliest"
}
}
},
"schema": {
"format": {"json": {}},
"fields": [
{"field_name": "event_id", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
{"field_name": "user_id", "field_type": {"type": {"primitive": "Int64"}}, "nullable": false},
{"field_name": "session_id", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
{"field_name": "page_url", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
{"field_name": "referrer", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": true},
{"field_name": "event_type", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
{"field_name": "event_time", "field_type": {"type": {"primitive": "DateTime"}}, "nullable": false}
]
}
}'

Response:
{
"id": "ct_kafka_xxx",
"name": "clickstream",
"connector": "kafka",
"tableType": "source"
}

Step 4: Create Iceberg Sink Table
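The sink targets an Iceberg table analytics.clickstream whose shape corresponds roughly to the DDL below (illustrative only: it maps Laminar's Utf8/Int64/DateTime/Date32 primitives onto Iceberg SQL types; you do not need to run it):

-- Illustrative equivalent of the sink table configured below
CREATE TABLE analytics.clickstream (
  event_id   STRING    NOT NULL,
  user_id    BIGINT    NOT NULL,
  session_id STRING    NOT NULL,
  page_url   STRING    NOT NULL,
  referrer   STRING,
  event_type STRING    NOT NULL,
  event_time TIMESTAMP NOT NULL,
  event_date DATE      NOT NULL
) PARTITIONED BY (event_date);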
curl -X POST <laminar_backend>/api/v1/connectors/iceberg/tables \
-H "Content-Type: application/json" \
-d '{
"name": "clickstream_iceberg",
"connection_profile_id": "prof_iceberg_xxx",
"config": {
"type": "sink",
"sink_table_config": {
"namespace": "analytics",
"table_name": "clickstream",
"rolling_policy": {
"file_size_bytes": 134217728,
"interval_seconds": 300
},
"partitioning": {
"fields": [
{"name": "event_date", "transform": "identity"}
],
"shuffle_by_partition": {
"enabled": true
}
}
}
},
"schema": {
"format": {"parquet": {}},
"fields": [
{"field_name": "event_id", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
{"field_name": "user_id", "field_type": {"type": {"primitive": "Int64"}}, "nullable": false},
{"field_name": "session_id", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
{"field_name": "page_url", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
{"field_name": "referrer", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": true},
{"field_name": "event_type", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
{"field_name": "event_time", "field_type": {"type": {"primitive": "DateTime"}}, "nullable": false},
{"field_name": "event_date", "field_type": {"type": {"primitive": "Date32"}}, "nullable": false}
]
}
}'

Response:
{
"id": "ct_iceberg_xxx",
"name": "clickstream_iceberg",
"connector": "iceberg",
"tableType": "sink"
}

Step 5: Create Pipeline
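The pipeline body is a single SQL statement. It is packed onto one line in the JSON below; formatted, it reads:

-- Copy every field through and derive the partition column from event_time
INSERT INTO clickstream_iceberg
SELECT
  event_id, user_id, session_id, page_url,
  referrer, event_type, event_time,
  CAST(event_time AS DATE) AS event_date
FROM clickstream;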
curl -X POST <laminar_backend>/api/v1/pipelines \
-H "Content-Type: application/json" \
-d '{
"name": "kafka-to-iceberg-clickstream",
"query": "INSERT INTO clickstream_iceberg SELECT event_id, user_id, session_id, page_url, referrer, event_type, event_time, CAST(event_time AS DATE) as event_date FROM clickstream",
"parallelism": 4
}'

Response:
{
"id": "pl_xxx",
"name": "kafka-to-iceberg-clickstream",
"parallelism": 4,
"actionText": "Running"
}

Step 6: Monitor Pipeline
# Check pipeline status
curl <laminar_backend>/api/v1/pipelines/{pipeline_id}
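To poll just the status, pipe the response through jq (this assumes the GET response carries the same actionText field as the create response in Step 5):

# -r prints the raw string, e.g. Running
curl -s <laminar_backend>/api/v1/pipelines/{pipeline_id} | jq -r '.actionText'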
# View checkpoints (Iceberg commits)
curl <laminar_backend>/api/v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints

Via CLI
Step 1: Create YAML Manifest
Create a file called kafka-to-iceberg.yaml:
---
apiVersion: laminar.io/v1
kind: Profile
spec:
name: kafka-cluster
connector: kafka
config:
bootstrap_servers: localhost:9092
authentication:
type: none
---
apiVersion: laminar.io/v1
kind: Profile
spec:
name: iceberg-catalog
connector: iceberg
config:
catalog:
type: rest
url: http://localhost:8181
warehouse: my-warehouse
---
apiVersion: laminar.io/v1
kind: Table
spec:
name: clickstream
connector: kafka
connection_profile_id: kafka-cluster
config:
topic: clickstream
type:
source_config:
offset: earliest
schema:
format:
json: {}
fields:
- field_name: event_id
field_type:
type:
primitive: Utf8
nullable: false
- field_name: user_id
field_type:
type:
primitive: Int64
nullable: false
- field_name: session_id
field_type:
type:
primitive: Utf8
nullable: false
- field_name: page_url
field_type:
type:
primitive: Utf8
nullable: false
- field_name: referrer
field_type:
type:
primitive: Utf8
nullable: true
- field_name: event_type
field_type:
type:
primitive: Utf8
nullable: false
- field_name: event_time
field_type:
type:
primitive: DateTime
nullable: false
---
apiVersion: laminar.io/v1
kind: Table
spec:
name: clickstream_iceberg
connector: iceberg
connection_profile_id: iceberg-catalog
config:
type: sink
sink_table_config:
namespace: analytics
table_name: clickstream
rolling_policy:
file_size_bytes: 134217728
interval_seconds: 300
partitioning:
fields:
- name: event_date
transform: identity
shuffle_by_partition:
enabled: true
schema:
format:
parquet: {}
fields:
- field_name: event_id
field_type:
type:
primitive: Utf8
nullable: false
- field_name: user_id
field_type:
type:
primitive: Int64
nullable: false
- field_name: session_id
field_type:
type:
primitive: Utf8
nullable: false
- field_name: page_url
field_type:
type:
primitive: Utf8
nullable: false
- field_name: referrer
field_type:
type:
primitive: Utf8
nullable: true
- field_name: event_type
field_type:
type:
primitive: Utf8
nullable: false
- field_name: event_time
field_type:
type:
primitive: DateTime
nullable: false
- field_name: event_date
field_type:
type:
primitive: Date32
nullable: false
---
apiVersion: laminar.io/v1
kind: Pipeline
spec:
name: kafka-to-iceberg-clickstream
query: |
INSERT INTO clickstream_iceberg
SELECT
event_id,
user_id,
session_id,
page_url,
referrer,
event_type,
event_time,
CAST(event_time AS DATE) as event_date
FROM clickstream
  parallelism: 4

Step 2: Apply Manifest
lmnr apply -f kafka-to-iceberg.yaml

Output:
Parsed 5 resource(s) from kafka-to-iceberg.yaml
[1/5] Applying Profile 'kafka-cluster'...
✓ Created Profile 'kafka-cluster' (ID: prof_xxx)
[2/5] Applying Profile 'iceberg-catalog'...
✓ Created Profile 'iceberg-catalog' (ID: prof_xxx)
[3/5] Applying Table 'clickstream'...
✓ Created Table 'clickstream' (ID: ct_xxx)
[4/5] Applying Table 'clickstream_iceberg'...
✓ Created Table 'clickstream_iceberg' (ID: ct_xxx)
[5/5] Applying Pipeline 'kafka-to-iceberg-clickstream'...
✓ Created Pipeline 'kafka-to-iceberg-clickstream' (ID: pl_xxx)
All resources applied successfully
Step 3: Verify Resources
# List profiles
lmnr list profiles
# List tables
lmnr list tables
# Check pipeline status
lmnr list pipelines
# Get pipeline details
lmnr get pipelines <pipeline_id>

Via UI
TBD
Configuration Details
Rolling Policy
The rolling policy controls when Laminar closes the current Parquet file and starts a new one:
| Parameter | Description | Recommended |
|---|---|---|
| `file_size_bytes` | Start a new file once the current one reaches this size, in bytes | 128 MB to 256 MB |
| `interval_seconds` | Start a new file after this many seconds | 300 (5 min) |
| `inactivity_seconds` | Start a new file after this long with no incoming records | Optional |
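A reasonable reading is that whichever trigger fires first wins, though treat that as an assumption about the policy semantics. A hypothetical policy combining all three:

# 134217728 bytes = 128 * 1024 * 1024, the low end of the recommended range;
# inactivity_seconds is optional and closes files promptly on quiet topics
rolling_policy:
  file_size_bytes: 134217728
  interval_seconds: 300
  inactivity_seconds: 60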
Partitioning
Iceberg partitioning improves query performance by letting query engines skip data files outside the partitions a query filters on:
| Transform | Description | Use Case |
|---|---|---|
| `identity` | Use the field value as-is | Date fields |
| `hour` | Extract the hour from a timestamp | High-volume hourly data |
| `month` | Extract the month from a timestamp | Monthly aggregations |
| `year` | Extract the year from a timestamp | Historical data |
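For high-volume streams, a finer-grained transform on the raw timestamp can replace the derived date column entirely; a hypothetical variant of the sink configuration above:

# Partition by the hour of event_time instead of a separate event_date column
partitioning:
  fields:
    - name: event_time
      transform: hour
  shuffle_by_partition:
    enabled: true

Either way, the payoff comes at query time: a filter such as WHERE event_date = DATE '2024-06-01' lets an engine read only the files in that partition.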
Cleanup
Via API
# Stop and delete pipeline
curl -X PATCH <laminar_backend>/api/v1/pipelines/{pipeline_id} \
-H "Content-Type: application/json" \
-d '{"stop": "graceful"}'
curl -X DELETE <laminar_backend>/api/v1/pipelines/{pipeline_id}
# Delete tables
curl -X DELETE <laminar_backend>/api/v1/connection_tables/{kafka_table_id}
curl -X DELETE <laminar_backend>/api/v1/connection_tables/{iceberg_table_id}
# Delete profiles
curl -X DELETE <laminar_backend>/api/v1/connection_profiles/{kafka_profile_id}
curl -X DELETE <laminar_backend>/api/v1/connection_profiles/{iceberg_profile_id}

Via CLI
lmnr delete pipelines <pipeline_id>
lmnr delete tables <kafka_table_id>
lmnr delete tables <iceberg_table_id>
lmnr delete profiles <kafka_profile_id>
lmnr delete profiles <iceberg_profile_id>

Next Steps
- Iceberg Connector Reference - Full configuration options
- Observability - Monitor your pipelines