API
The Laminar API is a REST-based HTTP server that provides programmatic access to all Laminar resources. Built with Axum, a Rust web framework, it serves as the backend for both the Console UI and the LMNR CLI.
Quick Start
# Health check
curl <laminar_backend>/api/v1/ping
# List pipelines
curl <laminar_backend>/api/v1/pipelines
# With authentication (if enabled)
curl -H "Authorization: Bearer <token>" <laminar_backend>/api/v1/pipelinesAPI Documentation
Interactive OpenAPI documentation is available at:
<laminar_backend>/api/v1/api-docs/openapi.json
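The spec can also be fetched programmatically, for example to discover which endpoints a given deployment exposes. A minimal sketch using Python's requests library (<laminar_backend> is the same base-URL placeholder used throughout this page):
import requests

# Fetch the OpenAPI spec and list every documented path with its HTTP methods.
spec = requests.get("<laminar_backend>/api/v1/api-docs/openapi.json").json()
for path, ops in spec["paths"].items():
    print(path, "->", ", ".join(op.upper() for op in ops))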
Postman Collection
Download the Laminar API Postman Collection to explore and test the API endpoints.
Health & Config
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/ping | Health check |
| GET | /api/v1/config | Server configuration |
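A quick way to verify connectivity and inspect the server configuration from Python (a minimal sketch; the placeholder base URL must be replaced with your deployment's address):
import requests

API_URL = "<laminar_backend>/api/v1"

# Health check: a 2xx response means the server is reachable.
requests.get(f"{API_URL}/ping").raise_for_status()

# Fetch and print the server configuration.
print(requests.get(f"{API_URL}/config").json())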
Pipelines
Pipelines are stream processing jobs defined by SQL queries.
Endpoints
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/pipelines | List pipelines (paginated) |
| POST | /api/v1/pipelines | Create pipeline |
| GET | /api/v1/pipelines/{id} | Get pipeline details |
| PATCH | /api/v1/pipelines/{id} | Update pipeline |
| DELETE | /api/v1/pipelines/{id} | Delete pipeline |
| POST | /api/v1/pipelines/{id}/restart | Restart pipeline |
| GET | /api/v1/pipelines/{id}/jobs | List pipeline jobs |
| POST | /api/v1/pipelines/preview | Create preview pipeline |
| POST | /api/v1/pipelines/validate_query | Validate SQL query |
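The restart endpoint has no dedicated example below; a minimal sketch, assuming it requires no request body (check the OpenAPI spec for the exact contract):
import requests

API_URL = "<laminar_backend>/api/v1"

# Restart a pipeline by ID ("pl_abc123" is a placeholder).
resp = requests.post(f"{API_URL}/pipelines/pl_abc123/restart")
resp.raise_for_status()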
Create Pipeline
Request Body (PipelinePost)
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Pipeline name |
| query | string | Yes | SQL query |
| parallelism | integer | Yes | Number of parallel workers |
| checkpoint_interval_micros | integer | No | Checkpoint interval in microseconds |
curl -X POST <laminar_backend>/api/v1/pipelines \
-H "Content-Type: application/json" \
-d '{
"name": "my-pipeline",
"query": "INSERT INTO sink SELECT * FROM source",
"parallelism": 4,
"checkpoint_interval_micros": 10000000
}'
Response (Pipeline)
{
"id": "pl_abc123",
"name": "my-pipeline",
"query": "INSERT INTO sink SELECT * FROM source",
"checkpoint_interval_micros": 10000000,
"stop": "none",
"createdAt": 1699900000000,
"actionText": "Running",
"actionInProgress": false,
"preview": false,
"graph": {
"nodes": [...],
"edges": [...]
}
}
Update Pipeline
Request Body (PipelinePatch)
| Field | Type | Description |
|---|---|---|
| parallelism | integer | Update parallelism |
| checkpoint_interval_micros | integer | Update checkpoint interval |
| stop | StopType | Control pipeline execution |
StopType Values:
| Value | Description |
|---|---|
| none | Start/resume execution |
| graceful | Stop after processing current records |
| immediate | Stop immediately |
| checkpoint | Stop after next checkpoint |
| force | Force stop, ignoring errors |
# Stop pipeline gracefully
curl -X PATCH <laminar_backend>/api/v1/pipelines/pl_abc123 \
-H "Content-Type: application/json" \
-d '{"stop": "graceful"}'
# Resume pipeline
curl -X PATCH <laminar_backend>/api/v1/pipelines/pl_abc123 \
-H "Content-Type: application/json" \
-d '{"stop": "none"}'
# Update parallelism
curl -X PATCH <laminar_backend>/api/v1/pipelines/pl_abc123 \
-H "Content-Type: application/json" \
-d '{"parallelism": 8}'Validate Query
Validate SQL syntax and return the pipeline graph without creating a pipeline.
curl -X POST <laminar_backend>/api/v1/pipelines/validate_query \
-H "Content-Type: application/json" \
-d '{
"query": "INSERT INTO sink SELECT * FROM source"
}'
Jobs
Jobs are individual executions of pipelines.
Endpoints
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/jobs | List all jobs |
| GET | /api/v1/pipelines/{id}/jobs | List jobs for a pipeline |
| GET | /api/v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints | Get job checkpoints |
| GET | /api/v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints/{epoch}/operator_checkpoint_groups | Get checkpoint operator details |
| GET | /api/v1/pipelines/{pipeline_id}/jobs/{job_id}/errors | Get job errors |
| GET | /api/v1/pipelines/{pipeline_id}/jobs/{job_id}/operator_metric_groups | Get operator metrics |
| GET | /api/v1/pipelines/{pipeline_id}/jobs/{job_id}/output | Stream job output (SSE) |
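For debugging a run, the checkpoint and error endpoints can be combined; a sketch that pulls a job for a pipeline and prints its checkpoints and errors (IDs are placeholders, and taking the first listed job as the latest run is an assumption):
import requests

API_URL = "<laminar_backend>/api/v1"
PIPELINE_ID = "pl_abc123"  # placeholder

# Pick a job for the pipeline (assumes the first entry is the most recent run).
jobs = requests.get(f"{API_URL}/pipelines/{PIPELINE_ID}/jobs").json()["data"]
job_id = jobs[0]["id"]

# Fetch its checkpoints and any recorded errors.
checkpoints = requests.get(
    f"{API_URL}/pipelines/{PIPELINE_ID}/jobs/{job_id}/checkpoints").json()
errors = requests.get(
    f"{API_URL}/pipelines/{PIPELINE_ID}/jobs/{job_id}/errors").json()
print(checkpoints, errors)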
Job Object
| Field | Type | Description |
|---|---|---|
| id | string | Job ID |
| runId | integer | Run number |
| state | string | Current state |
| runningDesired | boolean | Whether running is desired |
| createdAt | integer | Creation timestamp (micros) |
| startTime | integer | Start timestamp (micros) |
| finishTime | integer | Finish timestamp (micros) |
| tasks | integer | Number of tasks |
| failureMessage | string | Error message if failed |
Job States: Created, Compiling, Running, Stopping, CheckpointStopping, Finishing, Failed, Finished
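Because a new pipeline passes through Created and Compiling before it runs, clients typically poll the job list until a running or terminal state is reached; a minimal sketch (IDs are placeholders, and treating the first listed job as the latest is an assumption):
import time
import requests

API_URL = "<laminar_backend>/api/v1"
PIPELINE_ID = "pl_abc123"  # placeholder

# Poll the pipeline's jobs until the latest job is Running or has stopped.
while True:
    jobs = requests.get(f"{API_URL}/pipelines/{PIPELINE_ID}/jobs").json()["data"]
    state = jobs[0]["state"] if jobs else "Created"
    print("job state:", state)
    if state in ("Running", "Failed", "Finished"):
        break
    time.sleep(2)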
Get Job Output (SSE Stream)
Stream real-time output from a preview pipeline:
curl -N <laminar_backend>/api/v1/pipelines/pl_abc123/jobs/job_xyz/output
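Programmatically, the same endpoint can be consumed as a Server-Sent Events stream; a sketch using requests (a dedicated SSE client library would also work):
import requests

# Placeholder pipeline and job IDs.
url = "<laminar_backend>/api/v1/pipelines/pl_abc123/jobs/job_xyz/output"

# Keep the connection open and read SSE frames line by line.
with requests.get(url, stream=True) as resp:
    resp.raise_for_status()
    for line in resp.iter_lines(decode_unicode=True):
        # Each event payload arrives on a "data: ..." line.
        if line and line.startswith("data:"):
            print(line[len("data:"):].strip())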
Connection Profiles
Connection profiles store connector configurations (credentials, endpoints).
Endpoints
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/connection_profiles | List all profiles |
| POST | /api/v1/connection_profiles | Create profile (generic) |
| GET | /api/v1/connection_profiles/{id} | Get profile by ID |
| DELETE | /api/v1/connection_profiles/{id} | Delete profile |
| POST | /api/v1/connection_profiles/test | Test connection |
| GET | /api/v1/connection_profiles/{id}/autocomplete | Get autocomplete suggestions |
Connector-Specific Profile Endpoints
| Method | Endpoint | Connector |
|---|---|---|
| POST | /api/v1/connectors/kafka/profiles | Kafka |
| POST | /api/v1/connectors/confluent/profiles | Confluent Cloud |
| POST | /api/v1/connectors/iceberg/profiles | Iceberg |
Create Kafka Profile
Request Body
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Profile name |
| config.bootstrap_servers | string or array | Yes | Kafka broker addresses |
| config.authentication | object | Yes | Authentication config |
| config.schema_registry | object | No | Schema registry config |
Authentication Types:
none - No authentication
sasl - SASL authentication (username/password)
aws_msk_iam - AWS MSK IAM authentication
# No authentication
curl -X POST <laminar_backend>/api/v1/connectors/kafka/profiles \
-H "Content-Type: application/json" \
-d '{
"name": "local-kafka",
"config": {
"bootstrap_servers": "localhost:9092",
"authentication": {"type": "none"}
}
}'
# SASL authentication
curl -X POST <laminar_backend>/api/v1/connectors/kafka/profiles \
-H "Content-Type: application/json" \
-d '{
"name": "prod-kafka",
"config": {
"bootstrap_servers": ["broker1:9092", "broker2:9092"],
"authentication": {
"type": "sasl",
"sasl_config": {
"mechanism": "PLAIN",
"username": "user",
"password": "pass",
"protocol": "SASL_SSL"
}
},
"schema_registry": {
"endpoint": "https://schema-registry:8081",
"api_key": "key",
"api_secret": "secret"
}
}
}'
Create Iceberg Profile
Catalog Types:
rest - Iceberg REST catalog
glue - AWS Glue catalog
# REST catalog
curl -X POST <laminar_backend>/api/v1/connectors/iceberg/profiles \
-H "Content-Type: application/json" \
-d '{
"name": "iceberg-rest",
"config": {
"catalog": {
"type": "rest",
"rest_catalog_config": {
"uri": "http://localhost:8181"
}
}
}
}'
# AWS Glue catalog
curl -X POST <laminar_backend>/api/v1/connectors/iceberg/profiles \
-H "Content-Type: application/json" \
-d '{
"name": "iceberg-glue",
"config": {
"catalog": {
"type": "glue",
"glue_catalog_config": {
"database_name": "my_database",
"region": "us-east-1"
}
}
}
}'
Connection Tables
Connection tables define the source and sink tables referenced in pipeline SQL, optionally linked to a connection profile.
Endpoints
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/connection_tables | List tables (paginated) |
| POST | /api/v1/connection_tables | Create table (generic) |
| GET | /api/v1/connection_tables/{id} | Get table by ID |
| DELETE | /api/v1/connection_tables/{id} | Delete table |
| POST | /api/v1/connection_tables/test | Test table (SSE stream) |
| POST | /api/v1/connection_tables/schemas/test | Validate schema |
Connector-Specific Table Endpoints
| Method | Endpoint | Connector |
|---|---|---|
| POST | /api/v1/connectors/kafka/tables | Kafka |
| POST | /api/v1/connectors/confluent/tables | Confluent Cloud |
| POST | /api/v1/connectors/iceberg/tables | Iceberg |
| POST | /api/v1/connectors/kinesis/tables | AWS Kinesis |
| POST | /api/v1/connectors/delta/tables | Delta Lake |
| POST | /api/v1/connectors/filesystem/tables | FileSystem (S3, GCS, etc.) |
| POST | /api/v1/connectors/single_file/tables | Single File |
| POST | /api/v1/connectors/mock/tables | Mock (testing) |
| POST | /api/v1/connectors/stdout/tables | StdOut (debugging) |
| POST | /api/v1/connectors/preview/tables | Preview |
Create Kafka Table
Request Body
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Table name |
| connection_profile_id | string | No | Profile ID to use |
| config.topic | string | Yes | Kafka topic |
| config.type | object | Yes | Source or sink config |
| schema | object | No | Schema definition |
Table Types:
source_config - For reading from Kafka
sink_config - For writing to Kafka
Format Types:
json - JSON format
avro - Avro with schema registry
protobuf - Protocol Buffers
parquet - Parquet format
raw_string - Raw string
raw_bytes - Raw bytes
# Kafka source table
curl -X POST <laminar_backend>/api/v1/connectors/kafka/tables \
-H "Content-Type: application/json" \
-d '{
"name": "orders",
"connection_profile_id": "prof_abc123",
"config": {
"topic": "orders",
"type": {
"source_config": {
"offset": "latest"
}
}
},
"schema": {
"format": {"json": {}},
"fields": [
{"field_name": "id", "field_type": {"primitive": "Int64"}, "nullable": false},
{"field_name": "amount", "field_type": {"primitive": "Float64"}, "nullable": false},
{"field_name": "customer", "field_type": {"primitive": "Utf8"}, "nullable": true}
]
}
}'
# Kafka sink table
curl -X POST <laminar_backend>/api/v1/connectors/kafka/tables \
-H "Content-Type: application/json" \
-d '{
"name": "processed-orders",
"connection_profile_id": "prof_abc123",
"config": {
"topic": "processed-orders",
"type": {
"sink_config": {}
}
},
"schema": {
"format": {"json": {}}
}
}'
Create Iceberg Table
curl -X POST <laminar_backend>/api/v1/connectors/iceberg/tables \
-H "Content-Type: application/json" \
-d '{
"name": "events_iceberg",
"connection_profile_id": "prof_iceberg123",
"config": {
"database": "analytics",
"table": "events",
"type": {
"sink_config": {
"commit_interval_micros": 60000000
}
}
}
}'
Connectors
List available connector types.
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/connectors | List all connectors |
curl <laminar_backend>/api/v1/connectors
Pagination
List endpoints support cursor-based pagination:
| Parameter | Type | Description |
|---|---|---|
| starting_after | string | Cursor for next page (resource ID) |
| limit | integer | Maximum items to return (default: 100) |
# First page
curl "<laminar_backend>/api/v1/pipelines?limit=10"
# Next page (using last item's ID)
curl "<laminar_backend>/api/v1/pipelines?limit=10&starting_after=pl_abc123"Response Format:
{
"data": [...],
"has_more": true
}
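Using has_more together with the last item's id, a client can walk every page; a minimal sketch for the pipelines list:
import requests

API_URL = "<laminar_backend>/api/v1"

# Page through /pipelines using cursor-based pagination.
items, cursor = [], None
while True:
    params = {"limit": 100}
    if cursor:
        params["starting_after"] = cursor
    page = requests.get(f"{API_URL}/pipelines", params=params).json()
    items.extend(page["data"])
    if not page["has_more"]:
        break
    cursor = page["data"][-1]["id"]
print(f"fetched {len(items)} pipelines")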
Error Handling
Errors return appropriate HTTP status codes with a JSON body:
{
"error": "Pipeline not found: pl_abc123"
}
| Status | Meaning |
|---|---|
| 200 | Success |
| 400 | Bad request / validation error |
| 404 | Resource not found |
| 500 | Internal server error |
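In client code, a non-2xx response can be surfaced by reading the error field from the JSON body; a minimal sketch (the pipeline ID is a placeholder chosen to trigger a 404):
import requests

API_URL = "<laminar_backend>/api/v1"

resp = requests.get(f"{API_URL}/pipelines/pl_missing")
if not resp.ok:
    # Error responses carry a JSON body with a single "error" message.
    print(resp.status_code, resp.json().get("error"))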
SDK Examples
Python
import requests
API_URL = "<laminar_backend>/api/v1"
# List pipelines
response = requests.get(f"{API_URL}/pipelines")
pipelines = response.json()["data"]
# Create pipeline
pipeline = requests.post(f"{API_URL}/pipelines", json={
"name": "my-pipeline",
"query": "INSERT INTO sink SELECT * FROM source",
"parallelism": 4
}).json()
# Stop pipeline gracefully
requests.patch(f"{API_URL}/pipelines/{pipeline['id']}", json={
"stop": "graceful"
})
# Get job status
jobs = requests.get(f"{API_URL}/pipelines/{pipeline['id']}/jobs").json()["data"]
TypeScript
const API_URL = "<laminar_backend>/api/v1";
// List pipelines
const pipelines = await fetch(`${API_URL}/pipelines`)
.then(r => r.json())
.then(d => d.data);
// Create pipeline
const pipeline = await fetch(`${API_URL}/pipelines`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
name: "my-pipeline",
query: "INSERT INTO sink SELECT * FROM source",
parallelism: 4
})
}).then(r => r.json());
// Stop pipeline
await fetch(`${API_URL}/pipelines/${pipeline.id}`, {
method: "PATCH",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ stop: "graceful" })
});
Complete Workflow (cURL)
# 1. Create a Kafka profile
PROFILE=$(curl -s -X POST <laminar_backend>/api/v1/connectors/kafka/profiles \
-H "Content-Type: application/json" \
-d '{
"name": "local-kafka",
"config": {
"bootstrap_servers": "localhost:9092",
"authentication": {"type": "none"}
}
}')
PROFILE_ID=$(echo $PROFILE | jq -r '.id')
# 2. Create source table
curl -s -X POST <laminar_backend>/api/v1/connectors/kafka/tables \
-H "Content-Type: application/json" \
-d "{
\"name\": \"orders\",
\"connection_profile_id\": \"$PROFILE_ID\",
\"config\": {
\"topic\": \"orders\",
\"type\": {\"source_config\": {\"offset\": \"latest\"}}
},
\"schema\": {
\"format\": {\"json\": {}},
\"fields\": [
{\"field_name\": \"id\", \"field_type\": {\"primitive\": \"Int64\"}, \"nullable\": false},
{\"field_name\": \"amount\", \"field_type\": {\"primitive\": \"Float64\"}, \"nullable\": false}
]
}
}"
# 3. Create sink table
curl -s -X POST <laminar_backend>/api/v1/connectors/kafka/tables \
-H "Content-Type: application/json" \
-d "{
\"name\": \"large-orders\",
\"connection_profile_id\": \"$PROFILE_ID\",
\"config\": {
\"topic\": \"large-orders\",
\"type\": {\"sink_config\": {}}
},
\"schema\": {\"format\": {\"json\": {}}}
}"
# 4. Create pipeline
PIPELINE=$(curl -s -X POST <laminar_backend>/api/v1/pipelines \
-H "Content-Type: application/json" \
-d '{
"name": "filter-large-orders",
"query": "INSERT INTO `large-orders` SELECT * FROM orders WHERE amount > 100",
"parallelism": 4
}')
PIPELINE_ID=$(echo $PIPELINE | jq -r '.id')
# 5. Check status
curl -s <laminar_backend>/api/v1/pipelines/$PIPELINE_ID | jq
# 6. List jobs
curl -s <laminar_backend>/api/v1/pipelines/$PIPELINE_ID/jobs | jq
# 7. Stop pipeline
curl -s -X PATCH <laminar_backend>/api/v1/pipelines/$PIPELINE_ID \
-H "Content-Type: application/json" \
-d '{"stop": "graceful"}'