Work in Progress: This page is under development. Use the feedback button on the bottom right to help us improve it.

API

The Laminar API is a REST-based HTTP server that provides programmatic access to all Laminar resources. Built with Axum, a Rust web framework, it serves as the backend for both the Console UI and the LMNR CLI.

Quick Start

# Health check
curl <laminar_backend>/api/v1/ping
 
# List pipelines
curl <laminar_backend>/api/v1/pipelines
 
# With authentication (if enabled)
curl -H "Authorization: Bearer <token>" <laminar_backend>/api/v1/pipelines

API Documentation

The OpenAPI specification is available at:

<laminar_backend>/api/v1/api-docs/openapi.json
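
The specification can also be fetched directly; for example, to list every documented path (assuming jq is installed):

# List every documented endpoint path from the OpenAPI spec
curl -s <laminar_backend>/api/v1/api-docs/openapi.json | jq '.paths | keys'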

Postman Collection

Download the Laminar API Postman Collection to explore and test the API endpoints.


Health & Config

Method  Endpoint         Description
GET     /api/v1/ping     Health check
GET     /api/v1/config   Server configuration
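
For example, to pretty-print the server configuration (the exact response shape depends on your deployment; jq is optional):

# Fetch and pretty-print the server configuration
curl -s <laminar_backend>/api/v1/config | jq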

Pipelines

Pipelines are stream processing jobs defined by SQL queries.

Endpoints

Method  Endpoint                          Description
GET     /api/v1/pipelines                 List pipelines (paginated)
POST    /api/v1/pipelines                 Create pipeline
GET     /api/v1/pipelines/{id}            Get pipeline details
PATCH   /api/v1/pipelines/{id}            Update pipeline
DELETE  /api/v1/pipelines/{id}            Delete pipeline
POST    /api/v1/pipelines/{id}/restart    Restart pipeline
GET     /api/v1/pipelines/{id}/jobs       List pipeline jobs
POST    /api/v1/pipelines/preview         Create preview pipeline
POST    /api/v1/pipelines/validate_query  Validate SQL query

Create Pipeline

Request Body (PipelinePost)

Field                       Type     Required  Description
name                        string   Yes       Pipeline name
query                       string   Yes       SQL query
parallelism                 integer  Yes       Number of parallel workers
checkpoint_interval_micros  integer  No        Checkpoint interval in microseconds

curl -X POST <laminar_backend>/api/v1/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-pipeline",
    "query": "INSERT INTO sink SELECT * FROM source",
    "parallelism": 4,
    "checkpoint_interval_micros": 10000000
  }'

Response (Pipeline)

{
  "id": "pl_abc123",
  "name": "my-pipeline",
  "query": "INSERT INTO sink SELECT * FROM source",
  "checkpoint_interval_micros": 10000000,
  "stop": "none",
  "createdAt": 1699900000000,
  "actionText": "Running",
  "actionInProgress": false,
  "preview": false,
  "graph": {
    "nodes": [...],
    "edges": [...]
  }
}

Update Pipeline

Request Body (PipelinePatch)

Field                       Type      Description
parallelism                 integer   Update parallelism
checkpoint_interval_micros  integer   Update checkpoint interval
stop                        StopType  Control pipeline execution

StopType Values:

Value       Description
none        Start/resume execution
graceful    Stop after processing current records
immediate   Stop immediately
checkpoint  Stop after next checkpoint
force       Force stop, ignoring errors

# Stop pipeline gracefully
curl -X PATCH <laminar_backend>/api/v1/pipelines/pl_abc123 \
  -H "Content-Type: application/json" \
  -d '{"stop": "graceful"}'
 
# Resume pipeline
curl -X PATCH <laminar_backend>/api/v1/pipelines/pl_abc123 \
  -H "Content-Type: application/json" \
  -d '{"stop": "none"}'
 
# Update parallelism
curl -X PATCH <laminar_backend>/api/v1/pipelines/pl_abc123 \
  -H "Content-Type: application/json" \
  -d '{"parallelism": 8}'

Validate Query

Validate SQL syntax and return the pipeline graph without creating a pipeline.

curl -X POST <laminar_backend>/api/v1/pipelines/validate_query \
  -H "Content-Type: application/json" \
  -d '{
    "query": "INSERT INTO sink SELECT * FROM source"
  }'
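
Create Preview Pipeline

Preview pipelines let you try a query and inspect its results via the job output stream (see Get Job Output below). A minimal sketch, assuming the request body mirrors PipelinePost (the exact fields are not confirmed by this page):

# Create a preview pipeline (assumed to accept a PipelinePost-style body)
curl -X POST <laminar_backend>/api/v1/pipelines/preview \
  -H "Content-Type: application/json" \
  -d '{
    "name": "preview-orders",
    "query": "SELECT * FROM orders",
    "parallelism": 1
  }'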

Jobs

Jobs are individual executions of pipelines.

Endpoints

Method  Endpoint                                                               Description
GET     /api/v1/jobs                                                           List all jobs
GET     /api/v1/pipelines/{id}/jobs                                            List jobs for a pipeline
GET     /api/v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints              Get job checkpoints
GET     /api/v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints/{epoch}/operator_checkpoint_groups  Get checkpoint operator details
GET     /api/v1/pipelines/{pipeline_id}/jobs/{job_id}/errors                   Get job errors
GET     /api/v1/pipelines/{pipeline_id}/jobs/{job_id}/operator_metric_groups   Get operator metrics
GET     /api/v1/pipelines/{pipeline_id}/jobs/{job_id}/output                   Stream job output (SSE)
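
For example, using the placeholder pipeline and job IDs from elsewhere on this page:

# List checkpoints for a job
curl <laminar_backend>/api/v1/pipelines/pl_abc123/jobs/job_xyz/checkpoints

# Get errors for a job
curl <laminar_backend>/api/v1/pipelines/pl_abc123/jobs/job_xyz/errors

# Get operator metrics for a job
curl <laminar_backend>/api/v1/pipelines/pl_abc123/jobs/job_xyz/operator_metric_groups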

Job Object

Field           Type     Description
id              string   Job ID
runId           integer  Run number
state           string   Current state
runningDesired  boolean  Whether running is desired
createdAt       integer  Creation timestamp (micros)
startTime       integer  Start timestamp (micros)
finishTime      integer  Finish timestamp (micros)
tasks           integer  Number of tasks
failureMessage  string   Error message if failed

Job States: Created, Compiling, Running, Stopping, CheckpointStopping, Finishing, Failed, Finished

Get Job Output (SSE Stream)

Stream real-time output from a preview pipeline:

curl -N <laminar_backend>/api/v1/pipelines/pl_abc123/jobs/job_xyz/output
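
The -N flag disables curl's output buffering so events print as they arrive. A minimal sketch for consuming the stream from a shell script, assuming standard SSE "data:" framing:

# Print each SSE payload as it arrives (assumes standard "data: ..." lines)
curl -sN <laminar_backend>/api/v1/pipelines/pl_abc123/jobs/job_xyz/output \
  | while IFS= read -r line; do
      case "$line" in
        data:*) echo "${line#data: }" ;;  # strip the SSE prefix, print payload
      esac
    done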

Connection Profiles

Connection profiles store reusable connector configurations such as credentials and endpoints.

Endpoints

Method  Endpoint                                       Description
GET     /api/v1/connection_profiles                    List all profiles
POST    /api/v1/connection_profiles                    Create profile (generic)
GET     /api/v1/connection_profiles/{id}               Get profile by ID
DELETE  /api/v1/connection_profiles/{id}               Delete profile
POST    /api/v1/connection_profiles/test               Test connection
GET     /api/v1/connection_profiles/{id}/autocomplete  Get autocomplete suggestions
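
For example:

# List all profiles
curl <laminar_backend>/api/v1/connection_profiles

# Get a profile by ID (prof_abc123 is used as a placeholder elsewhere on this page)
curl <laminar_backend>/api/v1/connection_profiles/prof_abc123

# Delete a profile
curl -X DELETE <laminar_backend>/api/v1/connection_profiles/prof_abc123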

Connector-Specific Profile Endpoints

Method  Endpoint                               Connector
POST    /api/v1/connectors/kafka/profiles      Kafka
POST    /api/v1/connectors/confluent/profiles  Confluent Cloud
POST    /api/v1/connectors/iceberg/profiles    Iceberg

Create Kafka Profile

Request Body

Field                     Type             Required  Description
name                      string           Yes       Profile name
config.bootstrap_servers  string or array  Yes       Kafka broker addresses
config.authentication     object           Yes       Authentication config
config.schema_registry    object           No        Schema registry config

Authentication Types:

  • none - No authentication
  • sasl - SASL authentication (username/password)
  • aws_msk_iam - AWS MSK IAM authentication

# No authentication
curl -X POST <laminar_backend>/api/v1/connectors/kafka/profiles \
  -H "Content-Type: application/json" \
  -d '{
    "name": "local-kafka",
    "config": {
      "bootstrap_servers": "localhost:9092",
      "authentication": {"type": "none"}
    }
  }'
 
# SASL authentication
curl -X POST <laminar_backend>/api/v1/connectors/kafka/profiles \
  -H "Content-Type: application/json" \
  -d '{
    "name": "prod-kafka",
    "config": {
      "bootstrap_servers": ["broker1:9092", "broker2:9092"],
      "authentication": {
        "type": "sasl",
        "sasl_config": {
          "mechanism": "PLAIN",
          "username": "user",
          "password": "pass",
          "protocol": "SASL_SSL"
        }
      },
      "schema_registry": {
        "endpoint": "https://schema-registry:8081",
        "api_key": "key",
        "api_secret": "secret"
      }
    }
  }'

Create Iceberg Profile

Catalog Types:

  • rest - Iceberg REST catalog
  • glue - AWS Glue catalog

# REST catalog
curl -X POST <laminar_backend>/api/v1/connectors/iceberg/profiles \
  -H "Content-Type: application/json" \
  -d '{
    "name": "iceberg-rest",
    "config": {
      "catalog": {
        "type": "rest",
        "rest_catalog_config": {
          "uri": "http://localhost:8181"
        }
      }
    }
  }'
 
# AWS Glue catalog
curl -X POST <laminar_backend>/api/v1/connectors/iceberg/profiles \
  -H "Content-Type: application/json" \
  -d '{
    "name": "iceberg-glue",
    "config": {
      "catalog": {
        "type": "glue",
        "glue_catalog_config": {
          "database_name": "my_database",
          "region": "us-east-1"
        }
      }
    }
  }'

Connection Tables

Connection tables define source and sink tables, optionally linked to connection profiles.

Endpoints

Method  Endpoint                                Description
GET     /api/v1/connection_tables               List tables (paginated)
POST    /api/v1/connection_tables               Create table (generic)
GET     /api/v1/connection_tables/{id}          Get table by ID
DELETE  /api/v1/connection_tables/{id}          Delete table
POST    /api/v1/connection_tables/test          Test table (SSE stream)
POST    /api/v1/connection_tables/schemas/test  Validate schema
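
For example, using a hypothetical table ID:

# List connection tables (paginated)
curl "<laminar_backend>/api/v1/connection_tables?limit=10"

# Get a table by ID (tbl_abc123 is a hypothetical ID)
curl <laminar_backend>/api/v1/connection_tables/tbl_abc123

# Delete a table
curl -X DELETE <laminar_backend>/api/v1/connection_tables/tbl_abc123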

Connector-Specific Table Endpoints

Method  Endpoint                               Connector
POST    /api/v1/connectors/kafka/tables        Kafka
POST    /api/v1/connectors/confluent/tables    Confluent Cloud
POST    /api/v1/connectors/iceberg/tables      Iceberg
POST    /api/v1/connectors/kinesis/tables      AWS Kinesis
POST    /api/v1/connectors/delta/tables        Delta Lake
POST    /api/v1/connectors/filesystem/tables   FileSystem (S3, GCS, etc.)
POST    /api/v1/connectors/single_file/tables  Single File
POST    /api/v1/connectors/mock/tables         Mock (testing)
POST    /api/v1/connectors/stdout/tables       StdOut (debugging)
POST    /api/v1/connectors/preview/tables      Preview

Create Kafka Table

Request Body

Field                  Type    Required  Description
name                   string  Yes       Table name
connection_profile_id  string  No        Profile ID to use
config.topic           string  Yes       Kafka topic
config.type            object  Yes       Source or sink config
schema                 object  No        Schema definition

Table Types:

  • source_config - For reading from Kafka
  • sink_config - For writing to Kafka

Format Types:

  • json - JSON format
  • avro - Avro with schema registry
  • protobuf - Protocol Buffers
  • parquet - Parquet format
  • raw_string - Raw string
  • raw_bytes - Raw bytes

# Kafka source table
curl -X POST <laminar_backend>/api/v1/connectors/kafka/tables \
  -H "Content-Type: application/json" \
  -d '{
    "name": "orders",
    "connection_profile_id": "prof_abc123",
    "config": {
      "topic": "orders",
      "type": {
        "source_config": {
          "offset": "latest"
        }
      }
    },
    "schema": {
      "format": {"json": {}},
      "fields": [
        {"field_name": "id", "field_type": {"primitive": "Int64"}, "nullable": false},
        {"field_name": "amount", "field_type": {"primitive": "Float64"}, "nullable": false},
        {"field_name": "customer", "field_type": {"primitive": "Utf8"}, "nullable": true}
      ]
    }
  }'
 
# Kafka sink table
curl -X POST <laminar_backend>/api/v1/connectors/kafka/tables \
  -H "Content-Type: application/json" \
  -d '{
    "name": "processed-orders",
    "connection_profile_id": "prof_abc123",
    "config": {
      "topic": "processed-orders",
      "type": {
        "sink_config": {}
      }
    },
    "schema": {
      "format": {"json": {}}
    }
  }'

Create Iceberg Table

curl -X POST <laminar_backend>/api/v1/connectors/iceberg/tables \
  -H "Content-Type: application/json" \
  -d '{
    "name": "events_iceberg",
    "connection_profile_id": "prof_iceberg123",
    "config": {
      "database": "analytics",
      "table": "events",
      "type": {
        "sink_config": {
          "commit_interval_micros": 60000000
        }
      }
    }
  }'

Connectors

List available connector types.

Method  Endpoint            Description
GET     /api/v1/connectors  List all connectors

curl <laminar_backend>/api/v1/connectors

Pagination

List endpoints support cursor-based pagination:

Parameter       Type     Description
starting_after  string   Cursor for next page (resource ID)
limit           integer  Maximum items to return (default: 100)

# First page
curl "<laminar_backend>/api/v1/pipelines?limit=10"
 
# Next page (using last item's ID)
curl "<laminar_backend>/api/v1/pipelines?limit=10&starting_after=pl_abc123"

Response Format:

{
  "data": [...],
  "has_more": true
}
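
To retrieve every page, keep passing the last item's ID as starting_after until has_more is false. A minimal sketch, assuming each list item exposes an "id" field and jq is installed:

# Walk all pipeline pages (assumes list items carry an "id" field)
CURSOR=""
while :; do
  PAGE=$(curl -s "<laminar_backend>/api/v1/pipelines?limit=100${CURSOR:+&starting_after=$CURSOR}")
  echo "$PAGE" | jq -c '.data[]'                                # emit each item
  [ "$(echo "$PAGE" | jq -r '.has_more')" = "true" ] || break   # stop on last page
  CURSOR=$(echo "$PAGE" | jq -r '.data[-1].id')                 # cursor = last item's ID
done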

Error Handling

Errors return appropriate HTTP status codes with a JSON body:

{
  "error": "Pipeline not found: pl_abc123"
}

Status  Meaning
200     Success
400     Bad request / validation error
404     Resource not found
500     Internal server error
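
When scripting against the API, the status code can be captured separately from the body; for example:

# Capture the HTTP status and branch on it
STATUS=$(curl -s -o /tmp/resp.json -w "%{http_code}" \
  <laminar_backend>/api/v1/pipelines/pl_abc123)
if [ "$STATUS" -ne 200 ]; then
  echo "Request failed ($STATUS): $(jq -r '.error' /tmp/resp.json)"
fi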

SDK Examples

Python

import requests
 
API_URL = "<laminar_backend>/api/v1"
 
# List pipelines
response = requests.get(f"{API_URL}/pipelines")
pipelines = response.json()["data"]
 
# Create pipeline
pipeline = requests.post(f"{API_URL}/pipelines", json={
    "name": "my-pipeline",
    "query": "INSERT INTO sink SELECT * FROM source",
    "parallelism": 4
}).json()
 
# Stop pipeline gracefully
requests.patch(f"{API_URL}/pipelines/{pipeline['id']}", json={
    "stop": "graceful"
})
 
# Get job status
jobs = requests.get(f"{API_URL}/pipelines/{pipeline['id']}/jobs").json()["data"]

TypeScript

const API_URL = "<laminar_backend>/api/v1";
 
// List pipelines
const pipelines = await fetch(`${API_URL}/pipelines`)
  .then(r => r.json())
  .then(d => d.data);
 
// Create pipeline
const pipeline = await fetch(`${API_URL}/pipelines`, {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({
    name: "my-pipeline",
    query: "INSERT INTO sink SELECT * FROM source",
    parallelism: 4
  })
}).then(r => r.json());
 
// Stop pipeline
await fetch(`${API_URL}/pipelines/${pipeline.id}`, {
  method: "PATCH",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ stop: "graceful" })
});

Complete Workflow (cURL)

# 1. Create a Kafka profile
PROFILE=$(curl -s -X POST <laminar_backend>/api/v1/connectors/kafka/profiles \
  -H "Content-Type: application/json" \
  -d '{
    "name": "local-kafka",
    "config": {
      "bootstrap_servers": "localhost:9092",
      "authentication": {"type": "none"}
    }
  }')
PROFILE_ID=$(echo "$PROFILE" | jq -r '.id')
 
# 2. Create source table
curl -s -X POST <laminar_backend>/api/v1/connectors/kafka/tables \
  -H "Content-Type: application/json" \
  -d "{
    \"name\": \"orders\",
    \"connection_profile_id\": \"$PROFILE_ID\",
    \"config\": {
      \"topic\": \"orders\",
      \"type\": {\"source_config\": {\"offset\": \"latest\"}}
    },
    \"schema\": {
      \"format\": {\"json\": {}},
      \"fields\": [
        {\"field_name\": \"id\", \"field_type\": {\"primitive\": \"Int64\"}, \"nullable\": false},
        {\"field_name\": \"amount\", \"field_type\": {\"primitive\": \"Float64\"}, \"nullable\": false}
      ]
    }
  }"
 
# 3. Create sink table
curl -s -X POST <laminar_backend>/api/v1/connectors/kafka/tables \
  -H "Content-Type: application/json" \
  -d "{
    \"name\": \"large-orders\",
    \"connection_profile_id\": \"$PROFILE_ID\",
    \"config\": {
      \"topic\": \"large-orders\",
      \"type\": {\"sink_config\": {}}
    },
    \"schema\": {\"format\": {\"json\": {}}}
  }"
 
# 4. Create pipeline
PIPELINE=$(curl -s -X POST <laminar_backend>/api/v1/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "name": "filter-large-orders",
    "query": "INSERT INTO `large-orders` SELECT * FROM orders WHERE amount > 100",
    "parallelism": 4
  }')
PIPELINE_ID=$(echo "$PIPELINE" | jq -r '.id')
 
# 5. Check status
curl -s <laminar_backend>/api/v1/pipelines/$PIPELINE_ID | jq
 
# 6. List jobs
curl -s <laminar_backend>/api/v1/pipelines/$PIPELINE_ID/jobs | jq
 
# 7. Stop pipeline
curl -s -X PATCH <laminar_backend>/api/v1/pipelines/$PIPELINE_ID \
  -H "Content-Type: application/json" \
  -d '{"stop": "graceful"}'