Work in Progress: This page is under development.

Kafka to Iceberg

Build a streaming pipeline that reads from Kafka and writes to an Apache Iceberg table. This is a common pattern for streaming data into a data lakehouse for analytics and long-term storage.

Prerequisites

  • Connectivity to Laminar backend
  • Kafka cluster with a topic containing events
  • Iceberg REST catalog (e.g., Tabular, AWS Glue, or local REST catalog)
  • Object storage (S3, GCS, or local filesystem)

For the rest of this example, we will use <laminar_backend> as a placeholder for your Laminar backend base URL.

What You'll Build

  • Source: Kafka topic with clickstream events
  • Sink: Iceberg table partitioned by date
  • Pipeline: Transform and partition data for efficient querying

Via API

Step 1: Create Kafka Connection Profile

curl -X POST <laminar_backend>/api/v1/connectors/kafka/profiles \
  -H "Content-Type: application/json" \
  -d '{
    "name": "kafka-cluster",
    "config": {
      "bootstrap_servers": "localhost:9092",
      "authentication": {"type": "none"}
    }
  }'

Response:

{
  "id": "prof_kafka_xxx",
  "name": "kafka-cluster",
  "connector": "kafka"
}

Step 2: Create Iceberg Connection Profile

curl -X POST <laminar_backend>/api/v1/connectors/iceberg/profiles \
  -H "Content-Type: application/json" \
  -d '{
    "name": "iceberg-catalog",
    "config": {
      "catalog": {
        "type": "rest",
        "url": "http://localhost:8181",
        "warehouse": "my-warehouse"
      }
    }
  }'

Response:

{
  "id": "prof_iceberg_xxx",
  "name": "iceberg-catalog",
  "connector": "iceberg"
}

Step 3: Create Kafka Source Table

curl -X POST <laminar_backend>/api/v1/connectors/kafka/tables \
  -H "Content-Type: application/json" \
  -d '{
    "name": "clickstream",
    "connection_profile_id": "prof_kafka_xxx",
    "config": {
      "topic": "clickstream",
      "type": {
        "source_config": {
          "offset": "earliest"
        }
      }
    },
    "schema": {
      "format": {"json": {}},
      "fields": [
        {"field_name": "event_id", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "user_id", "field_type": {"type": {"primitive": "Int64"}}, "nullable": false},
        {"field_name": "session_id", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "page_url", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "referrer", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": true},
        {"field_name": "event_type", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "event_time", "field_type": {"type": {"primitive": "DateTime"}}, "nullable": false}
      ]
    }
  }'

Response:

{
  "id": "ct_kafka_xxx",
  "name": "clickstream",
  "connector": "kafka",
  "tableType": "source"
}
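
The source table above defines the JSON shape the pipeline expects on the clickstream topic. If the topic is still empty, one way to publish a test event is with the console producer that ships with Apache Kafka (a minimal sketch; the script may be named kafka-console-producer depending on your installation, the event values are illustrative, and the accepted timestamp format depends on your deserialization settings):

# Publish a single sample clickstream event matching the schema above
echo '{"event_id": "evt-0001", "user_id": 42, "session_id": "sess-abc", "page_url": "https://example.com/home", "referrer": null, "event_type": "page_view", "event_time": "2024-01-15T10:30:00Z"}' | \
  kafka-console-producer.sh --bootstrap-server localhost:9092 --topic clickstream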

Step 4: Create Iceberg Sink Table

curl -X POST <laminar_backend>/api/v1/connectors/iceberg/tables \
  -H "Content-Type: application/json" \
  -d '{
    "name": "clickstream_iceberg",
    "connection_profile_id": "prof_iceberg_xxx",
    "config": {
      "type": "sink",
      "sink_table_config": {
        "namespace": "analytics",
        "table_name": "clickstream",
        "rolling_policy": {
          "file_size_bytes": 134217728,
          "interval_seconds": 300
        },
        "partitioning": {
          "fields": [
            {"name": "event_date", "transform": "identity"}
          ],
          "shuffle_by_partition": {
            "enabled": true
          }
        }
      }
    },
    "schema": {
      "format": {"parquet": {}},
      "fields": [
        {"field_name": "event_id", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "user_id", "field_type": {"type": {"primitive": "Int64"}}, "nullable": false},
        {"field_name": "session_id", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "page_url", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "referrer", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": true},
        {"field_name": "event_type", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "event_time", "field_type": {"type": {"primitive": "DateTime"}}, "nullable": false},
        {"field_name": "event_date", "field_type": {"type": {"primitive": "Date32"}}, "nullable": false}
      ]
    }
  }'

Response:

{
  "id": "ct_iceberg_xxx",
  "name": "clickstream_iceberg",
  "connector": "iceberg",
  "tableType": "sink"
}

Step 5: Create Pipeline

curl -X POST <laminar_backend>/api/v1/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "name": "kafka-to-iceberg-clickstream",
    "query": "INSERT INTO clickstream_iceberg SELECT event_id, user_id, session_id, page_url, referrer, event_type, event_time, CAST(event_time AS DATE) as event_date FROM clickstream",
    "parallelism": 4
  }'

Response:

{
  "id": "pl_xxx",
  "name": "kafka-to-iceberg-clickstream",
  "parallelism": 4,
  "actionText": "Running"
}

Step 6: Monitor Pipeline

# Check pipeline status
curl <laminar_backend>/api/v1/pipelines/{pipeline_id}
 
# View checkpoints (Iceberg commits)
curl <laminar_backend>/api/v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints
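
If you only need the status in a script, you can filter the pipeline response with jq. This assumes the actionText field shown in the create response above is also returned by the GET endpoint:

# Print only the pipeline status (e.g. "Running")
curl -s <laminar_backend>/api/v1/pipelines/{pipeline_id} | jq -r '.actionText'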

Via CLI

Step 1: Create YAML Manifest

Create a file called kafka-to-iceberg.yaml:

---
apiVersion: laminar.io/v1
kind: Profile
spec:
  name: kafka-cluster
  connector: kafka
  config:
    bootstrap_servers: localhost:9092
    authentication:
      type: none
 
---
apiVersion: laminar.io/v1
kind: Profile
spec:
  name: iceberg-catalog
  connector: iceberg
  config:
    catalog:
      type: rest
      url: http://localhost:8181
      warehouse: my-warehouse
 
---
apiVersion: laminar.io/v1
kind: Table
spec:
  name: clickstream
  connector: kafka
  connection_profile_id: kafka-cluster
  config:
    topic: clickstream
    type:
      source_config:
        offset: earliest
  schema:
    format:
      json: {}
    fields:
      - field_name: event_id
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: user_id
        field_type:
          type:
            primitive: Int64
        nullable: false
      - field_name: session_id
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: page_url
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: referrer
        field_type:
          type:
            primitive: Utf8
        nullable: true
      - field_name: event_type
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: event_time
        field_type:
          type:
            primitive: DateTime
        nullable: false
 
---
apiVersion: laminar.io/v1
kind: Table
spec:
  name: clickstream_iceberg
  connector: iceberg
  connection_profile_id: iceberg-catalog
  config:
    type: sink
    sink_table_config:
      namespace: analytics
      table_name: clickstream
      rolling_policy:
        file_size_bytes: 134217728
        interval_seconds: 300
      partitioning:
        fields:
          - name: event_date
            transform: identity
        shuffle_by_partition:
          enabled: true
  schema:
    format:
      parquet: {}
    fields:
      - field_name: event_id
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: user_id
        field_type:
          type:
            primitive: Int64
        nullable: false
      - field_name: session_id
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: page_url
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: referrer
        field_type:
          type:
            primitive: Utf8
        nullable: true
      - field_name: event_type
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: event_time
        field_type:
          type:
            primitive: DateTime
        nullable: false
      - field_name: event_date
        field_type:
          type:
            primitive: Date32
        nullable: false
 
---
apiVersion: laminar.io/v1
kind: Pipeline
spec:
  name: kafka-to-iceberg-clickstream
  query: |
    INSERT INTO clickstream_iceberg
    SELECT
      event_id,
      user_id,
      session_id,
      page_url,
      referrer,
      event_type,
      event_time,
      CAST(event_time AS DATE) as event_date
    FROM clickstream
  parallelism: 4

Step 2: Apply Manifest

lmnr apply -f kafka-to-iceberg.yaml

Output:

Parsed 5 resource(s) from kafka-to-iceberg.yaml

[1/5] Applying Profile 'kafka-cluster'...
  ✓ Created Profile 'kafka-cluster' (ID: prof_xxx)

[2/5] Applying Profile 'iceberg-catalog'...
  ✓ Created Profile 'iceberg-catalog' (ID: prof_xxx)

[3/5] Applying Table 'clickstream'...
  ✓ Created Table 'clickstream' (ID: ct_xxx)

[4/5] Applying Table 'clickstream_iceberg'...
  ✓ Created Table 'clickstream_iceberg' (ID: ct_xxx)

[5/5] Applying Pipeline 'kafka-to-iceberg-clickstream'...
  ✓ Created Pipeline 'kafka-to-iceberg-clickstream' (ID: pl_xxx)

All resources applied successfully

Step 3: Verify Resources

# List profiles
lmnr list profiles
 
# List tables
lmnr list tables
 
# Check pipeline status
lmnr list pipelines
 
# Get pipeline details
lmnr get pipelines <pipeline_id>

Via UI

TBD


Configuration Details

Rolling Policy

Controls when Laminar creates new Parquet files:

Parameter          | Description                                     | Recommended
------------------ | ----------------------------------------------- | ----------------
file_size_bytes    | Create a new file once it reaches this size     | 128 MB - 256 MB
interval_seconds   | Create a new file after this many seconds       | 300 (5 min)
inactivity_seconds | Create a new file after a period of inactivity  | Optional
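
For example, a sink_table_config that rolls a file at 256 MB, at least every 5 minutes, and after a minute with no new records would look like this in the YAML manifest format used above (the values are illustrative, not recommendations for every workload):

sink_table_config:
  namespace: analytics
  table_name: clickstream
  rolling_policy:
    file_size_bytes: 268435456    # 256 MB
    interval_seconds: 300         # roll at least every 5 minutes
    inactivity_seconds: 60        # roll after 60 seconds of inactivity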

Partitioning

Iceberg partitioning improves query performance by letting query engines skip data files outside the partitions a query touches:

Transform | Description                        | Use Case
--------- | ---------------------------------- | ------------------------
identity  | Use the field value as-is          | Date fields
hour      | Extract the hour from a timestamp  | High-volume hourly data
month     | Extract the month from a timestamp | Monthly aggregations
year      | Extract the year from a timestamp  | Historical data
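
For example, a high-volume topic could be partitioned by the hour of the event timestamp instead of a separate date column. This is a sketch in the manifest format used above; whether the hour transform can be applied directly to a DateTime field is an assumption to verify against your setup:

partitioning:
  fields:
    - name: event_time
      transform: hour
  shuffle_by_partition:
    enabled: true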

Cleanup

Via API

# Stop and delete pipeline
curl -X PATCH <laminar_backend>/api/v1/pipelines/{pipeline_id} \
  -H "Content-Type: application/json" \
  -d '{"stop": "graceful"}'
 
curl -X DELETE <laminar_backend>/api/v1/pipelines/{pipeline_id}
 
# Delete tables
curl -X DELETE <laminar_backend>/api/v1/connection_tables/{kafka_table_id}
curl -X DELETE <laminar_backend>/api/v1/connection_tables/{iceberg_table_id}
 
# Delete profiles
curl -X DELETE <laminar_backend>/api/v1/connection_profiles/{kafka_profile_id}
curl -X DELETE <laminar_backend>/api/v1/connection_profiles/{iceberg_profile_id}

Via CLI

lmnr delete pipelines <pipeline_id>
lmnr delete tables <kafka_table_id>
lmnr delete tables <iceberg_table_id>
lmnr delete profiles <kafka_profile_id>
lmnr delete profiles <iceberg_profile_id>

Next Steps