Work in Progress: This page is under development. Use the feedback button on the bottom right to help us improve it.

Kafka to Kafka

Build a streaming pipeline that reads from one Kafka topic, transforms the data, and writes to another Kafka topic. This pattern is common for real-time filtering, enrichment, and routing.

Prerequisites

  • Network connectivity to your Laminar backend
  • A Kafka cluster reachable from the Laminar backend
  • Kafka topics created: raw-events (source) and processed-events (sink); an example of creating them follows this list
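
If the topics do not exist yet, you can create them with the standard Kafka CLI (the partition and replication settings below are illustrative only):

kafka-topics.sh --create --topic raw-events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
kafka-topics.sh --create --topic processed-events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1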

For the rest of this example, we will use <laminar_backend> as a placeholder for your Laminar backend domain.

What You'll Build

  • Source: Kafka topic containing raw user events
  • Sink: Kafka topic for filtered and enriched events
  • Pipeline: Filter bot traffic and add processing metadata

Via API

Step 1: Create Connection Profile

Create a Kafka connection profile with your cluster details:

curl -X POST <laminar_backend>/api/v1/connectors/kafka/profiles \
  -H "Content-Type: application/json" \
  -d '{
    "name": "local-kafka",
    "config": {
      "bootstrap_servers": "localhost:9092",
      "authentication": {"type": "none"}
    }
  }'

Response:

{
  "id": "prof_xxx",
  "name": "local-kafka",
  "connector": "kafka"
}

For authenticated clusters (SASL):

curl -X POST <laminar_backend>/api/v1/connectors/kafka/profiles \
  -H "Content-Type: application/json" \
  -d '{
    "name": "prod-kafka",
    "config": {
      "bootstrap_servers": "broker1:9092,broker2:9092",
      "authentication": {
        "type": "sasl",
        "sasl_config": {
          "mechanism": "SCRAM-SHA-256",
          "username": "your-username",
          "password": "your-password",
          "protocol": "SASL_SSL"
        }
      }
    }
  }'

Step 2: Create Source Table

Create a source table connected to the raw-events topic:

curl -X POST <laminar_backend>/api/v1/connectors/kafka/tables \
  -H "Content-Type: application/json" \
  -d '{
    "name": "raw_events",
    "connection_profile_id": "prof_xxx",
    "config": {
      "topic": "raw-events",
      "type": {
        "source_config": {
          "offset": "latest"
        }
      }
    },
    "schema": {
      "format": {"json": {}},
      "fields": [
        {"field_name": "event_id", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "user_id", "field_type": {"type": {"primitive": "Int64"}}, "nullable": false},
        {"field_name": "event_type", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "user_agent", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": true},
        {"field_name": "ip_address", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": true},
        {"field_name": "timestamp", "field_type": {"type": {"primitive": "DateTime"}}, "nullable": false}
      ]
    }
  }'

Response:

{
  "id": "ct_xxx",
  "name": "raw_events",
  "connector": "kafka",
  "tableType": "source"
}
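
An event on raw-events that matches this schema might look like the following (the values are illustrative, and the exact timestamp format accepted depends on your deserialization settings). You can publish it with the standard Kafka console producer to generate test input:

echo '{"event_id": "evt_001", "user_id": 42, "event_type": "page_view", "user_agent": "Mozilla/5.0", "ip_address": "203.0.113.10", "timestamp": "2024-01-01T12:00:00Z"}' | \
  kafka-console-producer.sh --topic raw-events --bootstrap-server localhost:9092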

Step 3: Create Sink Table

Create a sink table for the processed-events topic:

curl -X POST <laminar_backend>/api/v1/connectors/kafka/tables \
  -H "Content-Type: application/json" \
  -d '{
    "name": "processed_events",
    "connection_profile_id": "prof_xxx",
    "config": {
      "topic": "processed-events",
      "type": {
        "sink_config": {
          "commit_mode": "at_least_once"
        }
      }
    },
    "schema": {
      "format": {"json": {}},
      "fields": [
        {"field_name": "event_id", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "user_id", "field_type": {"type": {"primitive": "Int64"}}, "nullable": false},
        {"field_name": "event_type", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "ip_address", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": true},
        {"field_name": "timestamp", "field_type": {"type": {"primitive": "DateTime"}}, "nullable": false},
        {"field_name": "processed_at", "field_type": {"type": {"primitive": "DateTime"}}, "nullable": false}
      ]
    }
  }'

Response:

{
  "id": "ct_xxx",
  "name": "processed_events",
  "connector": "kafka",
  "tableType": "sink"
}

Step 4: Create Pipeline

Create a pipeline that filters out bot traffic and adds a processing timestamp:

curl -X POST <laminar_backend>/api/v1/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "name": "filter-bot-traffic",
    "query": "INSERT INTO processed_events SELECT event_id, user_id, event_type, ip_address, timestamp, NOW() as processed_at FROM raw_events WHERE user_agent NOT LIKE '\''%bot%'\'' AND user_agent NOT LIKE '\''%crawler%'\''",
    "parallelism": 4
  }'

Response:

{
  "id": "pl_xxx",
  "name": "filter-bot-traffic",
  "query": "INSERT INTO processed_events SELECT ...",
  "parallelism": 4,
  "actionText": "Running"
}
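
The single quotes inside the LIKE patterns are escaped for the shell in the request above; the query itself reads:

INSERT INTO processed_events
SELECT
  event_id,
  user_id,
  event_type,
  ip_address,
  timestamp,
  NOW() as processed_at
FROM raw_events
WHERE user_agent NOT LIKE '%bot%'
  AND user_agent NOT LIKE '%crawler%'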

Step 5: Monitor Pipeline

# Check pipeline status
curl <laminar_backend>/api/v1/pipelines/{pipeline_id}
 
# List jobs
curl <laminar_backend>/api/v1/pipelines/{pipeline_id}/jobs
 
# Get job metrics
curl <laminar_backend>/api/v1/pipelines/{pipeline_id}/jobs/{job_id}/operator_metric_groups
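
To pull out just the fields you care about, you can pipe the status call through jq (assuming the response carries the same fields as the creation response above):

# Extract key status fields
curl <laminar_backend>/api/v1/pipelines/{pipeline_id} | jq '{id, name, actionText}'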

Via CLI

Step 1: Create YAML Manifest

Create a file called kafka-to-kafka.yaml:

---
apiVersion: laminar.io/v1
kind: Profile
spec:
  name: local-kafka
  connector: kafka
  config:
    bootstrap_servers: localhost:9092
    authentication:
      type: none
 
---
apiVersion: laminar.io/v1
kind: Table
spec:
  name: raw_events
  connector: kafka
  connection_profile_id: local-kafka
  config:
    topic: raw-events
    type:
      source_config:
        offset: latest
  schema:
    format:
      json: {}
    fields:
      - field_name: event_id
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: user_id
        field_type:
          type:
            primitive: Int64
        nullable: false
      - field_name: event_type
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: user_agent
        field_type:
          type:
            primitive: Utf8
        nullable: true
      - field_name: ip_address
        field_type:
          type:
            primitive: Utf8
        nullable: true
      - field_name: timestamp
        field_type:
          type:
            primitive: DateTime
        nullable: false
 
---
apiVersion: laminar.io/v1
kind: Table
spec:
  name: processed_events
  connector: kafka
  connection_profile_id: local-kafka
  config:
    topic: processed-events
    type:
      sink_config:
        commit_mode: at_least_once
  schema:
    format:
      json: {}
    fields:
      - field_name: event_id
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: user_id
        field_type:
          type:
            primitive: Int64
        nullable: false
      - field_name: event_type
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: ip_address
        field_type:
          type:
            primitive: Utf8
        nullable: true
      - field_name: timestamp
        field_type:
          type:
            primitive: DateTime
        nullable: false
      - field_name: processed_at
        field_type:
          type:
            primitive: DateTime
        nullable: false
 
---
apiVersion: laminar.io/v1
kind: Pipeline
spec:
  name: filter-bot-traffic
  query: |
    INSERT INTO processed_events
    SELECT
      event_id,
      user_id,
      event_type,
      ip_address,
      timestamp,
      NOW() as processed_at
    FROM raw_events
    WHERE user_agent NOT LIKE '%bot%'
      AND user_agent NOT LIKE '%crawler%'
  parallelism: 4
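
The manifest above targets an unauthenticated local cluster. If your cluster uses SASL, the same authentication block shown in the API section should translate directly into the Profile spec; a sketch:

---
apiVersion: laminar.io/v1
kind: Profile
spec:
  name: prod-kafka
  connector: kafka
  config:
    bootstrap_servers: broker1:9092,broker2:9092
    authentication:
      type: sasl
      sasl_config:
        mechanism: SCRAM-SHA-256
        username: your-username
        password: your-password
        protocol: SASL_SSL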

Step 2: Apply Manifest

lmnr apply -f kafka-to-kafka.yaml

Output:

Parsed 4 resource(s) from kafka-to-kafka.yaml

[1/4] Applying Profile 'local-kafka'...
  āœ“ Created Profile 'local-kafka' (ID: prof_xxx)

[2/4] Applying Table 'raw_events'...
  āœ“ Created Table 'raw_events' (ID: ct_xxx)

[3/4] Applying Table 'processed_events'...
  āœ“ Created Table 'processed_events' (ID: ct_xxx)

[4/4] Applying Pipeline 'filter-bot-traffic'...
  āœ“ Created Pipeline 'filter-bot-traffic' (ID: pl_xxx)

All resources applied successfully

Step 3: Verify Resources

# List profiles
lmnr list profiles

Output:

ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
│ NAME        ┆ ID            ┆ CONNECTOR ┆ CREATED │
ā•žā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•Ŗā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•Ŗā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•Ŗā•ā•ā•ā•ā•ā•ā•ā•ā•ā•”
│ local-kafka ┆ prof_xxx      ┆ kafka     ┆ 1m      │
ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜

# List tables
lmnr list tables

Output:

ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
│ NAME             ┆ ID            ┆ CONNECTOR ┆ PROFILE     ┆ CREATED │
ā•žā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•Ŗā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•Ŗā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•Ŗā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•Ŗā•ā•ā•ā•ā•ā•ā•ā•ā•ā•”
│ raw_events       ┆ ct_xxx        ┆ kafka     ┆ local-kafka ┆ 1m      │
ā”œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā”¼ā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā”¼ā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā”¼ā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā”¼ā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā•Œā”¤
│ processed_events ┆ ct_xxx        ┆ kafka     ┆ local-kafka ┆ 1m      │
ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜

# Check pipeline status
lmnr list pipelines
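
To confirm events are flowing end to end, you can also read directly from the output topic with the standard Kafka console consumer:

# Consume processed events (Ctrl+C to stop)
kafka-console-consumer.sh --topic processed-events --bootstrap-server localhost:9092 --from-beginning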

Via UI

TBD


Cleanup

Via API

# Stop and delete pipeline
curl -X PATCH <laminar_backend>/api/v1/pipelines/{pipeline_id} \
  -H "Content-Type: application/json" \
  -d '{"stop": "graceful"}'
 
curl -X DELETE <laminar_backend>/api/v1/pipelines/{pipeline_id}
 
# Delete tables
curl -X DELETE <laminar_backend>/api/v1/connection_tables/{source_table_id}
curl -X DELETE <laminar_backend>/api/v1/connection_tables/{sink_table_id}
 
# Delete profile
curl -X DELETE <laminar_backend>/api/v1/connection_profiles/{profile_id}

Via CLI

lmnr delete pipelines <pipeline_id>
lmnr delete tables <source_table_id>
lmnr delete tables <sink_table_id>
lmnr delete profiles <profile_id>
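
If you also want to remove the example topics themselves, that happens on the Kafka side rather than through Laminar:

kafka-topics.sh --delete --topic raw-events --bootstrap-server localhost:9092
kafka-topics.sh --delete --topic processed-events --bootstrap-server localhost:9092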

Next Steps