Kafka to Kafka
Build a streaming pipeline that reads from one Kafka topic, transforms the data, and writes to another Kafka topic. This pattern is common for real-time filtering, enrichment, and routing.
Prerequisites
- Connectivity to the Laminar backend
- A Kafka cluster reachable from Laminar
- Kafka topics created: raw-events (source) and processed-events (sink); a sketch for creating them follows this list
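If the topics do not already exist, here is a minimal sketch for creating them with the standard Kafka CLI tools, assuming the Kafka scripts are on your PATH and a broker at localhost:9092 (adjust partitions and replication for your cluster):
# Create the source and sink topics on the target cluster
kafka-topics.sh --create --topic raw-events \
  --bootstrap-server localhost:9092 --partitions 4 --replication-factor 1
kafka-topics.sh --create --topic processed-events \
  --bootstrap-server localhost:9092 --partitions 4 --replication-factor 1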
For the rest of this example, we will use <laminar_backend> as a placeholder for your Laminar backend domain.
What You'll Build
- Source: Kafka topic containing raw user events
- Sink: Kafka topic for filtered and enriched events
- Pipeline: Filter out bot traffic (user agents matching %bot% or %crawler%) and add a processed_at timestamp
Via API
Step 1: Create Connection Profile
Create a Kafka connection profile with your cluster details:
curl -X POST <laminar_backend>/api/v1/connectors/kafka/profiles \
-H "Content-Type: application/json" \
-d '{
"name": "local-kafka",
"config": {
"bootstrap_servers": "localhost:9092",
"authentication": {"type": "none"}
}
}'
Response:
{
"id": "prof_xxx",
"name": "local-kafka",
"connector": "kafka"
}
For authenticated clusters (SASL):
curl -X POST <laminar_backend>/api/v1/connectors/kafka/profiles \
-H "Content-Type: application/json" \
-d '{
"name": "prod-kafka",
"config": {
"bootstrap_servers": "broker1:9092,broker2:9092",
"authentication": {
"type": "sasl",
"sasl_config": {
"mechanism": "SCRAM-SHA-256",
"username": "your-username",
"password": "your-password",
"protocol": "SASL_SSL"
}
}
}
}'
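Later steps reference the IDs returned by these calls (prof_xxx here, ct_xxx and pl_xxx below). If you are scripting the flow, you can capture them as you go; a minimal sketch, assuming jq is installed:
# Create the profile and capture its ID from the JSON response (requires jq)
PROFILE_ID=$(curl -s -X POST <laminar_backend>/api/v1/connectors/kafka/profiles \
  -H "Content-Type: application/json" \
  -d '{"name": "local-kafka", "config": {"bootstrap_servers": "localhost:9092", "authentication": {"type": "none"}}}' \
  | jq -r '.id')
echo "$PROFILE_ID"   # e.g. prof_xxx
The same pattern works for the table and pipeline requests below.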
Step 2: Create Source Table
Create a source table connected to the raw-events topic:
curl -X POST <laminar_backend>/api/v1/connectors/kafka/tables \
-H "Content-Type: application/json" \
-d '{
"name": "raw_events",
"connection_profile_id": "prof_xxx",
"config": {
"topic": "raw-events",
"type": {
"source_config": {
"offset": "latest"
}
}
},
"schema": {
"format": {"json": {}},
"fields": [
{"field_name": "event_id", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
{"field_name": "user_id", "field_type": {"type": {"primitive": "Int64"}}, "nullable": false},
{"field_name": "event_type", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
{"field_name": "user_agent", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": true},
{"field_name": "ip_address", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": true},
{"field_name": "timestamp", "field_type": {"type": {"primitive": "DateTime"}}, "nullable": false}
]
}
}'
Response:
{
"id": "ct_xxx",
"name": "raw_events",
"connector": "kafka",
"tableType": "source"
}
Step 3: Create Sink Table
Create a sink table for the processed-events topic:
curl -X POST <laminar_backend>/api/v1/connectors/kafka/tables \
-H "Content-Type: application/json" \
-d '{
"name": "processed_events",
"connection_profile_id": "prof_xxx",
"config": {
"topic": "processed-events",
"type": {
"sink_config": {
"commit_mode": "at_least_once"
}
}
},
"schema": {
"format": {"json": {}},
"fields": [
{"field_name": "event_id", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
{"field_name": "user_id", "field_type": {"type": {"primitive": "Int64"}}, "nullable": false},
{"field_name": "event_type", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
{"field_name": "ip_address", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": true},
{"field_name": "timestamp", "field_type": {"type": {"primitive": "DateTime"}}, "nullable": false},
{"field_name": "processed_at", "field_type": {"type": {"primitive": "DateTime"}}, "nullable": false}
]
}
}'
Response:
{
"id": "ct_xxx",
"name": "processed_events",
"connector": "kafka",
"tableType": "sink"
}
Step 4: Create Pipeline
Create a pipeline that filters out bot traffic and adds a processing timestamp (the '\'' sequences below escape single quotes inside the single-quoted shell string):
curl -X POST <laminar_backend>/api/v1/pipelines \
-H "Content-Type: application/json" \
-d '{
"name": "filter-bot-traffic",
"query": "INSERT INTO processed_events SELECT event_id, user_id, event_type, ip_address, timestamp, NOW() as processed_at FROM raw_events WHERE user_agent NOT LIKE '\''%bot%'\'' AND user_agent NOT LIKE '\''%crawler%'\''",
"parallelism": 4
}'
Response:
{
"id": "pl_xxx",
"name": "filter-bot-traffic",
"query": "INSERT INTO processed_events SELECT ...",
"parallelism": 4,
"actionText": "Running"
}
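For readability, the query embedded in the request above, unescaped:
INSERT INTO processed_events
SELECT event_id, user_id, event_type, ip_address, timestamp, NOW() as processed_at
FROM raw_events
WHERE user_agent NOT LIKE '%bot%'
  AND user_agent NOT LIKE '%crawler%'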
Step 5: Monitor Pipeline
# Check pipeline status
curl <laminar_backend>/api/v1/pipelines/{pipeline_id}
# List jobs
curl <laminar_backend>/api/v1/pipelines/{pipeline_id}/jobs
# Get job metrics
curl <laminar_backend>/api/v1/pipelines/{pipeline_id}/jobs/{job_id}/operator_metric_groups
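To exercise the filter end to end once the pipeline is running, publish one regular event and one bot event to raw-events and watch the output topic. Because the source uses "offset": "latest", only events published after the pipeline starts are processed. A sketch assuming the standard Kafka console tools; the payloads and the DateTime encoding shown here are hypothetical and may differ in your deployment:
# Publish an event that should pass the filter and one that should be dropped
echo '{"event_id": "evt_1", "user_id": 42, "event_type": "page_view", "user_agent": "Mozilla/5.0", "ip_address": "203.0.113.10", "timestamp": "2024-01-01T12:00:00Z"}' \
  | kafka-console-producer.sh --topic raw-events --bootstrap-server localhost:9092
echo '{"event_id": "evt_2", "user_id": 43, "event_type": "page_view", "user_agent": "Googlebot/2.1", "ip_address": "203.0.113.11", "timestamp": "2024-01-01T12:00:05Z"}' \
  | kafka-console-producer.sh --topic raw-events --bootstrap-server localhost:9092

# Only evt_1 should appear here, with a processed_at timestamp added by the pipeline
kafka-console-consumer.sh --topic processed-events \
  --bootstrap-server localhost:9092 --from-beginning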
Via CLI
Step 1: Create YAML Manifest
Create a file called kafka-to-kafka.yaml:
---
apiVersion: laminar.io/v1
kind: Profile
spec:
name: local-kafka
connector: kafka
config:
bootstrap_servers: localhost:9092
authentication:
type: none
---
apiVersion: laminar.io/v1
kind: Table
spec:
name: raw_events
connector: kafka
connection_profile_id: local-kafka
config:
topic: raw-events
type:
source_config:
offset: latest
schema:
format:
json: {}
fields:
- field_name: event_id
field_type:
type:
primitive: Utf8
nullable: false
- field_name: user_id
field_type:
type:
primitive: Int64
nullable: false
- field_name: event_type
field_type:
type:
primitive: Utf8
nullable: false
- field_name: user_agent
field_type:
type:
primitive: Utf8
nullable: true
- field_name: ip_address
field_type:
type:
primitive: Utf8
nullable: true
- field_name: timestamp
field_type:
type:
primitive: DateTime
nullable: false
---
apiVersion: laminar.io/v1
kind: Table
spec:
name: processed_events
connector: kafka
connection_profile_id: local-kafka
config:
topic: processed-events
type:
sink_config:
commit_mode: at_least_once
schema:
format:
json: {}
fields:
- field_name: event_id
field_type:
type:
primitive: Utf8
nullable: false
- field_name: user_id
field_type:
type:
primitive: Int64
nullable: false
- field_name: event_type
field_type:
type:
primitive: Utf8
nullable: false
- field_name: ip_address
field_type:
type:
primitive: Utf8
nullable: true
- field_name: timestamp
field_type:
type:
primitive: DateTime
nullable: false
- field_name: processed_at
field_type:
type:
primitive: DateTime
nullable: false
---
apiVersion: laminar.io/v1
kind: Pipeline
spec:
name: filter-bot-traffic
query: |
INSERT INTO processed_events
SELECT
event_id,
user_id,
event_type,
ip_address,
timestamp,
NOW() as processed_at
FROM raw_events
WHERE user_agent NOT LIKE '%bot%'
AND user_agent NOT LIKE '%crawler%'
  parallelism: 4
Step 2: Apply Manifest
lmnr apply -f kafka-to-kafka.yaml
Output:
Parsed 4 resource(s) from kafka-to-kafka.yaml
[1/4] Applying Profile 'local-kafka'...
✓ Created Profile 'local-kafka' (ID: prof_xxx)
[2/4] Applying Table 'raw_events'...
✓ Created Table 'raw_events' (ID: ct_xxx)
[3/4] Applying Table 'processed_events'...
✓ Created Table 'processed_events' (ID: ct_xxx)
[4/4] Applying Pipeline 'filter-bot-traffic'...
✓ Created Pipeline 'filter-bot-traffic' (ID: pl_xxx)
All resources applied successfully
Step 3: Verify Resources
# List profiles
lmnr list profiles
Output:
┌─────────────┬──────────┬───────────┬─────────┐
│ NAME        │ ID       │ CONNECTOR │ CREATED │
╞═════════════╪══════════╪═══════════╪═════════╡
│ local-kafka │ prof_xxx │ kafka     │ 1m      │
└─────────────┴──────────┴───────────┴─────────┘
# List tables
lmnr list tables
Output:
┌──────────────────┬────────┬───────────┬─────────────┬─────────┐
│ NAME             │ ID     │ CONNECTOR │ PROFILE     │ CREATED │
╞══════════════════╪════════╪═══════════╪═════════════╪═════════╡
│ raw_events       │ ct_xxx │ kafka     │ local-kafka │ 1m      │
├──────────────────┼────────┼───────────┼─────────────┼─────────┤
│ processed_events │ ct_xxx │ kafka     │ local-kafka │ 1m      │
└──────────────────┴────────┴───────────┴─────────────┴─────────┘
# Check pipeline status
lmnr list pipelines
Via UI
TBD
Cleanup
Via API
# Stop and delete pipeline
curl -X PATCH <laminar_backend>/api/v1/pipelines/{pipeline_id} \
-H "Content-Type: application/json" \
-d '{"stop": "graceful"}'
curl -X DELETE <laminar_backend>/api/v1/pipelines/{pipeline_id}
# Delete tables
curl -X DELETE <laminar_backend>/api/v1/connection_tables/{source_table_id}
curl -X DELETE <laminar_backend>/api/v1/connection_tables/{sink_table_id}
# Delete profile
curl -X DELETE <laminar_backend>/api/v1/connection_profiles/{profile_id}
Via CLI
lmnr delete pipelines <pipeline_id>
lmnr delete tables <source_table_id>
lmnr delete tables <sink_table_id>
lmnr delete profiles <profile_id>
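If you also want to remove the Kafka topics created for this example, a sketch using the standard Kafka CLI (assuming a broker at localhost:9092):
# Delete the example topics from the Kafka cluster
kafka-topics.sh --delete --topic raw-events --bootstrap-server localhost:9092
kafka-topics.sh --delete --topic processed-events --bootstrap-server localhost:9092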
Next Steps
- Kafka to Iceberg - Write streaming data to a data lakehouse
- Kafka Connector Reference - Full configuration options
- SQL Streaming Extensions - Windowing, joins, and more