Mock to Preview
Build your first Laminar pipeline using mock data and preview output. The example needs no external systems beyond the Laminar backend, which makes it a good way to learn the fundamentals.
Prerequisites
- Connectivity to Laminar backend
For the rest of this example, we will use <laminar_backend> as a placeholder for your Laminar backend domain.
What You'll Build
- Source: Mock connector generating simulated e-commerce order events
- Sink: Preview connector for real-time output viewing
- Pipeline: Filter high-value orders and calculate totals
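To make the data flow concrete before touching the API, the three pieces above can be sketched locally in plain Python. This is not Laminar code: `make_order` and `high_value` are hypothetical helpers that imitate the mock source's field generators and the pipeline's filter, and the uniform random distributions and inclusive ranges are assumptions about how the mock connector behaves.

```python
import random
import uuid
from datetime import datetime, timezone

PRODUCTS = ["laptop", "phone", "tablet", "headphones", "monitor"]


def make_order(rng):
    """Imitate one mock-source record (hypothetical helper, not Laminar's generator)."""
    return {
        "order_id": str(uuid.UUID(int=rng.getrandbits(128), version=4)),
        "customer_id": rng.randint(1000, 9999),    # assumed inclusive range
        "product": rng.choice(PRODUCTS),
        "quantity": rng.randint(1, 10),
        "unit_price": rng.uniform(10.0, 1500.0),   # assumed uniform distribution
        "order_time": datetime.now(timezone.utc).isoformat(),
    }


def high_value(orders, threshold=100):
    """Mirror the pipeline: keep orders whose total exceeds the threshold
    and emit total_amount in place of unit_price."""
    for order in orders:
        total = order["quantity"] * order["unit_price"]
        if total > threshold:
            yield {
                "order_id": order["order_id"],
                "customer_id": order["customer_id"],
                "product": order["product"],
                "quantity": order["quantity"],
                "total_amount": total,
                "order_time": order["order_time"],
            }


if __name__ == "__main__":
    rng = random.Random(42)
    # Ten records correspond to one second of output at 10 records per second.
    batch = [make_order(rng) for _ in range(10)]
    for row in high_value(batch):
        print(row)
```

The output rows have the same six fields as the preview sink defined later in this example, which is why the sink schema swaps `unit_price` for `total_amount`.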
Via API
Step 1: Create Source Table
Create a mock source that generates order events at 10 records per second:
curl -X POST <laminar_backend>/api/v1/connectors/mock/tables \
  -H "Content-Type: application/json" \
  -d '{
    "name": "orders",
    "config": {
      "source_type": "schema_driven",
      "schema_driven": {
        "name": "orders",
        "fields": [
          {
            "name": "order_id",
            "type": "string",
            "generator": {"generator_type": "uuid"}
          },
          {
            "name": "customer_id",
            "type": "int64",
            "generator": {"generator_type": "range", "min": 1000, "max": 9999}
          },
          {
            "name": "product",
            "type": "string",
            "generator": {"generator_type": "enum", "values": ["laptop", "phone", "tablet", "headphones", "monitor"]}
          },
          {
            "name": "quantity",
            "type": "int32",
            "generator": {"generator_type": "range", "min": 1, "max": 10}
          },
          {
            "name": "unit_price",
            "type": "float64",
            "generator": {"generator_type": "range", "min": 10.0, "max": 1500.0}
          },
          {
            "name": "order_time",
            "type": "timestamp",
            "generator": {"generator_type": "datetime"}
          }
        ],
        "generation": {
          "mode": "streaming",
          "rate": 10.0
        }
      }
    },
    "schema": {
      "format": {"json": {}},
      "fields": [
        {"field_name": "order_id", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "customer_id", "field_type": {"type": {"primitive": "Int64"}}, "nullable": false},
        {"field_name": "product", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "quantity", "field_type": {"type": {"primitive": "Int32"}}, "nullable": false},
        {"field_name": "unit_price", "field_type": {"type": {"primitive": "F64"}}, "nullable": false},
        {"field_name": "order_time", "field_type": {"type": {"primitive": "DateTime"}}, "nullable": false}
      ]
    }
  }'
Response:
{
  "id": "ct_xxx",
  "name": "orders",
  "connector": "mock",
  "tableType": "source"
}
Step 2: Create Sink Table
Create a preview sink to view the pipeline output:
curl -X POST <laminar_backend>/api/v1/connectors/preview/tables \
  -H "Content-Type: application/json" \
  -d '{
    "name": "high_value_orders",
    "config": {},
    "schema": {
      "format": {"json": {}},
      "fields": [
        {"field_name": "order_id", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "customer_id", "field_type": {"type": {"primitive": "Int64"}}, "nullable": false},
        {"field_name": "product", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "quantity", "field_type": {"type": {"primitive": "Int32"}}, "nullable": false},
        {"field_name": "total_amount", "field_type": {"type": {"primitive": "F64"}}, "nullable": false},
        {"field_name": "order_time", "field_type": {"type": {"primitive": "DateTime"}}, "nullable": false}
      ]
    }
  }'
Response:
{
  "id": "ct_xxx",
  "name": "high_value_orders",
  "connector": "preview",
  "tableType": "sink"
}
Step 3: Create Pipeline
Create a pipeline that keeps only orders whose total (quantity * unit_price) exceeds $100 and writes that total to the sink as total_amount:
curl -X POST <laminar_backend>/api/v1/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "name": "high-value-order-filter",
    "query": "INSERT INTO high_value_orders SELECT order_id, customer_id, product, quantity, (quantity * unit_price) as total_amount, order_time FROM orders WHERE (quantity * unit_price) > 100",
    "parallelism": 1
  }'
Response:
{
  "id": "pl_xxx",
  "name": "high-value-order-filter",
  "query": "INSERT INTO high_value_orders SELECT ...",
  "parallelism": 1,
  "actionText": "Running",
  "graph": {
    "nodes": [...],
    "edges": [...]
  }
}
Step 4: View Pipeline Output
Get the pipeline ID and job ID, then stream the output:
# List jobs for the pipeline
curl <laminar_backend>/api/v1/pipelines/{pipeline_id}/jobs
# Stream live output (SSE)
curl -N <laminar_backend>/api/v1/pipelines/{pipeline_id}/jobs/{job_id}/output
Sample output:
{"order_id":"a1b2c3d4-...","customer_id":5432,"product":"laptop","quantity":2,"total_amount":2150.50,"order_time":"2024-01-15T10:30:00Z"}
{"order_id":"e5f6g7h8-...","customer_id":7890,"product":"monitor","quantity":3,"total_amount":450.75,"order_time":"2024-01-15T10:30:01Z"}
Via CLI
Step 1: Create YAML Manifest
Create a file called mock-to-preview.yaml:
---
apiVersion: laminar.io/v1
kind: Table
spec:
  name: orders
  connector: mock
  config:
    source_type: schema_driven
    schema_driven:
      name: orders
      fields:
        - name: order_id
          type: string
          generator:
            generator_type: uuid
        - name: customer_id
          type: int64
          generator:
            generator_type: range
            min: 1000
            max: 9999
        - name: product
          type: string
          generator:
            generator_type: enum
            values:
              - laptop
              - phone
              - tablet
              - headphones
              - monitor
        - name: quantity
          type: int32
          generator:
            generator_type: range
            min: 1
            max: 10
        - name: unit_price
          type: float64
          generator:
            generator_type: range
            min: 10.0
            max: 1500.0
        - name: order_time
          type: timestamp
          generator:
            generator_type: datetime
      generation:
        mode: streaming
        rate: 10.0
  schema:
    format:
      json: {}
    fields:
      - field_name: order_id
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: customer_id
        field_type:
          type:
            primitive: Int64
        nullable: false
      - field_name: product
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: quantity
        field_type:
          type:
            primitive: Int32
        nullable: false
      - field_name: unit_price
        field_type:
          type:
            primitive: F64
        nullable: false
      - field_name: order_time
        field_type:
          type:
            primitive: DateTime
        nullable: false
---
apiVersion: laminar.io/v1
kind: Table
spec:
  name: high_value_orders
  connector: preview
  config: {}
  schema:
    format:
      json: {}
    fields:
      - field_name: order_id
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: customer_id
        field_type:
          type:
            primitive: Int64
        nullable: false
      - field_name: product
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: quantity
        field_type:
          type:
            primitive: Int32
        nullable: false
      - field_name: total_amount
        field_type:
          type:
            primitive: F64
        nullable: false
      - field_name: order_time
        field_type:
          type:
            primitive: DateTime
        nullable: false
---
apiVersion: laminar.io/v1
kind: Pipeline
spec:
  name: high-value-order-filter
  query: |
    INSERT INTO high_value_orders
    SELECT
      order_id,
      customer_id,
      product,
      quantity,
      (quantity * unit_price) as total_amount,
      order_time
    FROM orders
    WHERE (quantity * unit_price) > 100
  parallelism: 1
Step 2: Apply Manifest
lmnr apply -f mock-to-preview.yaml
Output:
Parsed 3 resource(s) from mock-to-preview.yaml
[1/3] Applying Table 'orders'...
✓ Created Table 'orders' (ID: ct_xxx)
[2/3] Applying Table 'high_value_orders'...
✓ Created Table 'high_value_orders' (ID: ct_xxx)
[3/3] Applying Pipeline 'high-value-order-filter'...
✓ Created Pipeline 'high-value-order-filter' (ID: pl_xxx)
All resources applied successfully
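The IDs printed by the apply step are the values that the later `lmnr get` and `lmnr delete` commands expect. If you want to capture them in a script rather than copy them by hand, a small parser is enough. The sketch below is an illustration only: `created_ids` is a hypothetical helper, and the pattern is inferred from the sample output above, so the real CLI format may differ.

```python
import re

# Pattern inferred from the sample output above; this is an assumption,
# not a documented format. Any leading status mark on the line is ignored.
CREATED = re.compile(r"Created (\w+) '([^']+)' \(ID: ([^)]+)\)")


def created_ids(apply_output):
    """Return {(kind, name): id} for each 'Created ...' line in the output."""
    return {
        (m.group(1), m.group(2)): m.group(3)
        for m in CREATED.finditer(apply_output)
    }


if __name__ == "__main__":
    # Placeholder IDs copied from the sample output, not real values.
    sample = (
        "[1/3] Applying Table 'orders'...\n"
        "Created Table 'orders' (ID: ct_xxx)\n"
        "[3/3] Applying Pipeline 'high-value-order-filter'...\n"
        "Created Pipeline 'high-value-order-filter' (ID: pl_xxx)\n"
    )
    for (kind, name), resource_id in created_ids(sample).items():
        print(kind, name, resource_id)
```

Keying the result by kind and name keeps the two tables distinguishable even though both IDs share the `ct_` prefix.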
Step 3: Verify Resources
# List tables
lmnr list tables
Output:
┌───────────────────┬───────────────┬───────────┬─────────┬─────────┐
│ NAME              │ ID            │ CONNECTOR │ PROFILE │ CREATED │
╞═══════════════════╪═══════════════╪═══════════╪═════════╪═════════╡
│ orders            │ ct_xxx        │ mock      │ -       │ 1m      │
├───────────────────┼───────────────┼───────────┼─────────┼─────────┤
│ high_value_orders │ ct_xxx        │ preview   │ -       │ 1m      │
└───────────────────┴───────────────┴───────────┴─────────┴─────────┘
# List pipelines
lmnr list pipelines
Output:
┌─────────────────────────┬───────────────┬─────────────┬────────────┬─────────┐
│ NAME                    │ ID            │ PARALLELISM │ CHECKPOINT │ STATUS  │
╞═════════════════════════╪═══════════════╪═════════════╪════════════╪═════════╡
│ high-value-order-filter │ pl_xxx        │ 1           │ 1s         │ Running │
└─────────────────────────┴───────────────┴─────────────┴────────────┴─────────┘
# Get pipeline details
lmnr get pipelines <pipeline_id>
Via UI
TBD
Cleanup
Via API
# Delete the pipeline first, while the tables it references still exist
curl -X DELETE <laminar_backend>/api/v1/pipelines/{pipeline_id}
# Delete tables
curl -X DELETE <laminar_backend>/api/v1/connection_tables/{source_table_id}
curl -X DELETE <laminar_backend>/api/v1/connection_tables/{sink_table_id}
Via CLI
# Delete resources individually, pipeline first
lmnr delete pipelines <pipeline_id>
lmnr delete tables <source_table_id>
lmnr delete tables <sink_table_id>
# Or use cleanup to remove everything
lmnr cleanup
Next Steps
- Kafka to Kafka - Connect to real Kafka clusters
- Core Concepts - Understand pipelines, tables, and profiles
- SQL Reference - Learn streaming SQL extensions