Work in Progress: This page is under development.

Mock to Preview

Build your first Laminar pipeline using mock data and a preview output. This example requires no external data systems, so it is a good place to learn the fundamentals.

Prerequisites

  • Connectivity to the Laminar backend
  • The lmnr CLI, if you follow the Via CLI steps

For the rest of this example, we will use <laminar_backend> as a placeholder for your Laminar backend domain.

What You'll Build

  • Source: Mock connector generating simulated e-commerce order events
  • Sink: Preview connector for real-time output viewing
  • Pipeline: Filter high-value orders and calculate totals

Via API

Step 1: Create Source Table

Create a mock source that generates order events at 10 records per second:

curl -X POST <laminar_backend>/api/v1/connectors/mock/tables \
  -H "Content-Type: application/json" \
  -d '{
    "name": "orders",
    "config": {
      "source_type": "schema_driven",
      "schema_driven": {
        "name": "orders",
        "fields": [
          {
            "name": "order_id",
            "type": "string",
            "generator": {"generator_type": "uuid"}
          },
          {
            "name": "customer_id",
            "type": "int64",
            "generator": {"generator_type": "range", "min": 1000, "max": 9999}
          },
          {
            "name": "product",
            "type": "string",
            "generator": {"generator_type": "enum", "values": ["laptop", "phone", "tablet", "headphones", "monitor"]}
          },
          {
            "name": "quantity",
            "type": "int32",
            "generator": {"generator_type": "range", "min": 1, "max": 10}
          },
          {
            "name": "unit_price",
            "type": "float64",
            "generator": {"generator_type": "range", "min": 10.0, "max": 1500.0}
          },
          {
            "name": "order_time",
            "type": "timestamp",
            "generator": {"generator_type": "datetime"}
          }
        ],
        "generation": {
          "mode": "streaming",
          "rate": 10.0
        }
      }
    },
    "schema": {
      "format": {"json": {}},
      "fields": [
        {"field_name": "order_id", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "customer_id", "field_type": {"type": {"primitive": "Int64"}}, "nullable": false},
        {"field_name": "product", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "quantity", "field_type": {"type": {"primitive": "Int32"}}, "nullable": false},
        {"field_name": "unit_price", "field_type": {"type": {"primitive": "F64"}}, "nullable": false},
        {"field_name": "order_time", "field_type": {"type": {"primitive": "DateTime"}}, "nullable": false}
      ]
    }
  }'

Response:

{
  "id": "ct_xxx",
  "name": "orders",
  "connector": "mock",
  "tableType": "source"
}
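
The request above lists every column twice: once under config.schema_driven.fields (with its generator) and once under schema.fields (with its primitive type). As a minimal sketch, the Python below derives both lists from a single definition so they cannot drift apart. The type mapping is read off the example request only, and build_mock_source_payload is a hypothetical helper name, not part of any Laminar client.

```python
import json

# Mapping from mock-generator types to schema primitives, as read off the
# request above (an assumption, not an exhaustive Laminar type list).
PRIMITIVES = {
    "string": "Utf8",
    "int64": "Int64",
    "int32": "Int32",
    "float64": "F64",
    "timestamp": "DateTime",
}

# One entry per column: (name, mock type, generator settings).
ORDER_FIELDS = [
    ("order_id", "string", {"generator_type": "uuid"}),
    ("customer_id", "int64", {"generator_type": "range", "min": 1000, "max": 9999}),
    ("product", "string", {"generator_type": "enum",
                           "values": ["laptop", "phone", "tablet", "headphones", "monitor"]}),
    ("quantity", "int32", {"generator_type": "range", "min": 1, "max": 10}),
    ("unit_price", "float64", {"generator_type": "range", "min": 10.0, "max": 1500.0}),
    ("order_time", "timestamp", {"generator_type": "datetime"}),
]


def build_mock_source_payload(name, fields, rate):
    """Hypothetical helper: build the Step 1 request body from one field list."""
    return {
        "name": name,
        "config": {
            "source_type": "schema_driven",
            "schema_driven": {
                "name": name,
                "fields": [
                    {"name": n, "type": t, "generator": g} for n, t, g in fields
                ],
                "generation": {"mode": "streaming", "rate": rate},
            },
        },
        "schema": {
            "format": {"json": {}},
            "fields": [
                {
                    "field_name": n,
                    "field_type": {"type": {"primitive": PRIMITIVES[t]}},
                    "nullable": False,
                }
                for n, t, _ in fields
            ],
        },
    }


payload = build_mock_source_payload("orders", ORDER_FIELDS, rate=10.0)
print(json.dumps(payload, indent=2))
```

The printed JSON can be saved to a file and passed to the curl command above with -d @file.json.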

Step 2: Create Sink Table

Create a preview sink to view the pipeline output:

curl -X POST <laminar_backend>/api/v1/connectors/preview/tables \
  -H "Content-Type: application/json" \
  -d '{
    "name": "high_value_orders",
    "config": {},
    "schema": {
      "format": {"json": {}},
      "fields": [
        {"field_name": "order_id", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "customer_id", "field_type": {"type": {"primitive": "Int64"}}, "nullable": false},
        {"field_name": "product", "field_type": {"type": {"primitive": "Utf8"}}, "nullable": false},
        {"field_name": "quantity", "field_type": {"type": {"primitive": "Int32"}}, "nullable": false},
        {"field_name": "total_amount", "field_type": {"type": {"primitive": "F64"}}, "nullable": false},
        {"field_name": "order_time", "field_type": {"type": {"primitive": "DateTime"}}, "nullable": false}
      ]
    }
  }'

Response:

{
  "id": "ct_xxx",
  "name": "high_value_orders",
  "connector": "preview",
  "tableType": "sink"
}

Step 3: Create Pipeline

Create a pipeline that keeps only orders whose total (quantity * unit_price) exceeds $100 and writes that total to the sink as total_amount:

curl -X POST <laminar_backend>/api/v1/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "name": "high-value-order-filter",
    "query": "INSERT INTO high_value_orders SELECT order_id, customer_id, product, quantity, (quantity * unit_price) as total_amount, order_time FROM orders WHERE (quantity * unit_price) > 100",
    "parallelism": 1
  }'

Response:

{
  "id": "pl_xxx",
  "name": "high-value-order-filter",
  "query": "INSERT INTO high_value_orders SELECT ...",
  "parallelism": 1,
  "actionText": "Running",
  "graph": {
    "nodes": [...],
    "edges": [...]
  }
}
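
To make the query's effect concrete, the sketch below applies the same projection and filter to a few hand-written rows in plain Python. It only mirrors the SQL semantics for illustration: the sample rows are made up, apply_high_value_filter is a hypothetical name, and this is not how Laminar executes the query.

```python
def apply_high_value_filter(orders, threshold=100):
    """Keep orders whose quantity * unit_price exceeds threshold, like the WHERE clause."""
    selected = []
    for order in orders:
        total_amount = order["quantity"] * order["unit_price"]
        if total_amount > threshold:  # strict '>' as in the query
            # Project the same columns as the SELECT list; unit_price is dropped.
            selected.append({
                "order_id": order["order_id"],
                "customer_id": order["customer_id"],
                "product": order["product"],
                "quantity": order["quantity"],
                "total_amount": total_amount,
                "order_time": order["order_time"],
            })
    return selected


# Made-up input rows shaped like the 'orders' source table.
sample_orders = [
    {"order_id": "a", "customer_id": 5432, "product": "laptop",
     "quantity": 2, "unit_price": 1075.25, "order_time": "2024-01-15T10:30:00Z"},
    {"order_id": "b", "customer_id": 1200, "product": "headphones",
     "quantity": 1, "unit_price": 45.0, "order_time": "2024-01-15T10:30:01Z"},
    {"order_id": "c", "customer_id": 7890, "product": "phone",
     "quantity": 4, "unit_price": 25.0, "order_time": "2024-01-15T10:30:02Z"},
]

high_value = apply_high_value_filter(sample_orders)
for row in high_value:
    print(row)
```

Only order "a" is kept. Order "c" totals exactly 100 and is dropped because the comparison is strict.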

Step 4: View Pipeline Output

Use the pipeline ID returned in Step 3 to list the pipeline's jobs, then stream the output of a job by its job ID:

# List jobs for the pipeline
curl <laminar_backend>/api/v1/pipelines/{pipeline_id}/jobs
 
# Stream live output (SSE)
curl -N <laminar_backend>/api/v1/pipelines/{pipeline_id}/jobs/{job_id}/output

Sample output:

{"order_id":"a1b2c3d4-...","customer_id":5432,"product":"laptop","quantity":2,"total_amount":2150.50,"order_time":"2024-01-15T10:30:00Z"}
{"order_id":"e5f6g7h8-...","customer_id":7890,"product":"monitor","quantity":3,"total_amount":450.75,"order_time":"2024-01-15T10:30:01Z"}
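
The output endpoint is labeled SSE. In the Server-Sent Events format, each event's payload arrives on lines that begin with data:, and lines that begin with a colon are comments. The sketch below decodes such a stream into Python dictionaries. It assumes each event carries exactly one JSON record on a single data line, which is inferred from the sample output above rather than from a Laminar reference; parse_sse_records is a hypothetical name and the stream contents are made up.

```python
import json


def parse_sse_records(lines):
    """Yield one decoded JSON object per SSE 'data:' line.

    Assumes one JSON record per data line; comment lines (starting with ':'),
    blank event separators, and other SSE fields are skipped.
    """
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line.startswith("data:"):
            continue
        payload = line[len("data:"):].strip()
        if payload:
            yield json.loads(payload)


# Made-up stream in SSE framing, shaped like the sample output.
sample_stream = [
    ": keep-alive\n",
    'data: {"order_id":"a1b2","product":"laptop","quantity":2,"total_amount":2150.5}\n',
    "\n",
    'data: {"order_id":"e5f6","product":"monitor","quantity":3,"total_amount":450.75}\n',
    "\n",
]

records = list(parse_sse_records(sample_stream))
for record in records:
    print(record["product"], record["total_amount"])
```

With a live pipeline, the same function could be fed the lines of the curl -N output, for example by iterating over sys.stdin.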

Via CLI

Step 1: Create YAML Manifest

Create a file called mock-to-preview.yaml:

---
apiVersion: laminar.io/v1
kind: Table
spec:
  name: orders
  connector: mock
  config:
    source_type: schema_driven
    schema_driven:
      name: orders
      fields:
        - name: order_id
          type: string
          generator:
            generator_type: uuid
        - name: customer_id
          type: int64
          generator:
            generator_type: range
            min: 1000
            max: 9999
        - name: product
          type: string
          generator:
            generator_type: enum
            values:
              - laptop
              - phone
              - tablet
              - headphones
              - monitor
        - name: quantity
          type: int32
          generator:
            generator_type: range
            min: 1
            max: 10
        - name: unit_price
          type: float64
          generator:
            generator_type: range
            min: 10.0
            max: 1500.0
        - name: order_time
          type: timestamp
          generator:
            generator_type: datetime
      generation:
        mode: streaming
        rate: 10.0
  schema:
    format:
      json: {}
    fields:
      - field_name: order_id
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: customer_id
        field_type:
          type:
            primitive: Int64
        nullable: false
      - field_name: product
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: quantity
        field_type:
          type:
            primitive: Int32
        nullable: false
      - field_name: unit_price
        field_type:
          type:
            primitive: F64
        nullable: false
      - field_name: order_time
        field_type:
          type:
            primitive: DateTime
        nullable: false
 
---
apiVersion: laminar.io/v1
kind: Table
spec:
  name: high_value_orders
  connector: preview
  config: {}
  schema:
    format:
      json: {}
    fields:
      - field_name: order_id
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: customer_id
        field_type:
          type:
            primitive: Int64
        nullable: false
      - field_name: product
        field_type:
          type:
            primitive: Utf8
        nullable: false
      - field_name: quantity
        field_type:
          type:
            primitive: Int32
        nullable: false
      - field_name: total_amount
        field_type:
          type:
            primitive: F64
        nullable: false
      - field_name: order_time
        field_type:
          type:
            primitive: DateTime
        nullable: false
 
---
apiVersion: laminar.io/v1
kind: Pipeline
spec:
  name: high-value-order-filter
  query: |
    INSERT INTO high_value_orders
    SELECT
      order_id,
      customer_id,
      product,
      quantity,
      (quantity * unit_price) as total_amount,
      order_time
    FROM orders
    WHERE (quantity * unit_price) > 100
  parallelism: 1
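
The generator ranges in the manifest and the WHERE threshold together determine how much of the mock stream reaches the sink. The sketch below estimates that fraction under two assumptions that this page does not confirm: that the range generator draws uniformly, and that quantity takes integer values with both bounds included. pass_fraction is a hypothetical helper.

```python
def pass_fraction(q_min, q_max, p_min, p_max, threshold):
    """Estimate P(quantity * unit_price > threshold).

    Assumes quantity is a uniform integer in [q_min, q_max] and unit_price is
    uniform on [p_min, p_max]; both are assumptions about the mock generator.
    """
    quantities = range(q_min, q_max + 1)
    total = 0.0
    for q in quantities:
        cutoff = threshold / q  # unit_price must exceed this for the row to pass
        passing_width = max(p_max - max(p_min, cutoff), 0.0)
        total += passing_width / (p_max - p_min)
    return total / len(quantities)


# Ranges and threshold taken from the manifest above.
fraction = pass_fraction(q_min=1, q_max=10, p_min=10.0, p_max=1500.0, threshold=100)
print(f"estimated share of orders reaching the sink: {fraction:.3f}")
```

Under these assumptions almost all generated orders pass the filter. Raising the threshold in the query is a simple way to see the preview output thin out.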

Step 2: Apply Manifest

lmnr apply -f mock-to-preview.yaml

Output:

Parsed 3 resource(s) from mock-to-preview.yaml

[1/3] Applying Table 'orders'...
  ✓ Created Table 'orders' (ID: ct_xxx)

[2/3] Applying Table 'high_value_orders'...
  ✓ Created Table 'high_value_orders' (ID: ct_xxx)

[3/3] Applying Pipeline 'high-value-order-filter'...
  ✓ Created Pipeline 'high-value-order-filter' (ID: pl_xxx)

All resources applied successfully

Step 3: Verify Resources

# List tables
lmnr list tables

Output:

┌───────────────────┬───────────────┬───────────┬─────────┬─────────┐
│ NAME              ┆ ID            ┆ CONNECTOR ┆ PROFILE ┆ CREATED │
╞═══════════════════╪═══════════════╪═══════════╪═════════╪═════════╡
│ orders            ┆ ct_xxx        ┆ mock      ┆ -       ┆ 1m      │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ high_value_orders ┆ ct_xxx        ┆ preview   ┆ -       ┆ 1m      │
└───────────────────┴───────────────┴───────────┴─────────┴─────────┘

# List pipelines
lmnr list pipelines

Output:

┌─────────────────────────┬───────────────┬─────────────┬────────────┬─────────┐
│ NAME                    ┆ ID            ┆ PARALLELISM ┆ CHECKPOINT ┆ STATUS  │
╞═════════════════════════╪═══════════════╪═════════════╪════════════╪═════════╡
│ high-value-order-filter ┆ pl_xxx        ┆ 1           ┆ 1s         ┆ Running │
└─────────────────────────┴───────────────┴─────────────┴────────────┴─────────┘

# Get pipeline details
lmnr get pipelines <pipeline_id>

Via UI

TBD


Cleanup

Via API

# Delete the pipeline first, while the tables it references still exist
curl -X DELETE <laminar_backend>/api/v1/pipelines/{pipeline_id}
 
# Delete tables
curl -X DELETE <laminar_backend>/api/v1/connection_tables/{source_table_id}
curl -X DELETE <laminar_backend>/api/v1/connection_tables/{sink_table_id}
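
The deletion order matters: the pipeline goes first, then the tables. A minimal sketch of that ordering is shown below. It only builds the request list from the endpoints used above and sends nothing; the backend URL and the table IDs are placeholders, and cleanup_requests is a hypothetical helper.

```python
def cleanup_requests(backend, pipeline_id, table_ids):
    """Return (method, url) pairs in deletion order: pipeline first, then tables."""
    base = backend.rstrip("/")
    requests = [("DELETE", f"{base}/api/v1/pipelines/{pipeline_id}")]
    requests += [
        ("DELETE", f"{base}/api/v1/connection_tables/{table_id}")
        for table_id in table_ids
    ]
    return requests


# Placeholder values; substitute your backend domain and the IDs returned earlier.
plan = cleanup_requests(
    "https://laminar.example.com/",
    pipeline_id="pl_xxx",
    table_ids=["ct_source", "ct_sink"],
)
for method, url in plan:
    print(f"curl -X {method} {url}")
```

Printing the plan as curl commands makes it easy to review the order before running anything.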

Via CLI

# Delete resources one by one (pipeline first, then tables)
lmnr delete pipelines <pipeline_id>
lmnr delete tables <source_table_id>
lmnr delete tables <sink_table_id>
 
# Or use cleanup to remove everything
lmnr cleanup

Next Steps