
laminar-storage

A unified, cloud-agnostic storage abstraction layer for the Laminar stream processing engine. This crate provides a consistent interface for interacting with various storage backends including S3, Google Cloud Storage, Azure Blob Storage, Cloudflare R2, and local filesystems.

Architecture Overview

┌────────────────────────────────────────────────────────────────────────────────────────┐
│                                   External API Layer                                    │
│                                                                                         │
│  ┌──────────────────────────────────────────────────────────────────────────────────┐  │
│  │                                 StorageProvider                                  │  │
│  │  • get() / put() / delete()                                                      │  │
│  │  • get_as_stream() / buf_writer()                                                │  │
│  │  • list() / exists() / head()                                                    │  │
│  │  • multipart operations                                                          │  │
│  └──────────────────────────────────────────────────────────────────────────────────┘  │
└────────────────────────────┬───────────────────────────────────────────────────────────┘
                             │
                             ▼
┌────────────────────────────────────────────────────────────────────────────────────────┐
│                                 URL Parsing & Routing                                   │
│                                                                                         │
│  ┌──────────────────────┐                                 ┌──────────────────────────┐ │
│  │   BackendConfig      │ ◄──── parse_url() ──────────────│    Regex Matchers        │ │
│  │   • parse_url()      │                                 │  • S3_PATH, S3_VIRTUAL   │ │
│  │   • Backend enum     │                                 │  • GCS_PATH, GCS_VIRTUAL │ │
│  │   • Config structs   │                                 │  • AZURE_HTTPS, ABFS_URL │ │
│  └──────────┬───────────┘                                 │  • R2_URL, R2_ENDPOINT   │ │
│             │                                             │  • FILE_URI, FILE_PATH   │ │
│             ▼                                             └──────────────────────────┘ │
│  ┌──────────────────────────────────────────────────────────────────────────────────┐  │
│  │                              Backend Configurations                              │  │
│  │  ┌──────────┐ ┌─────────┐ ┌───────────┐ ┌──────────────┐ ┌───────────────────┐   │  │
│  │  │S3Config  │ │GCSConfig│ │AzureConfig│ │R2Config      │ │LocalConfig        │   │  │
│  │  │ •bucket  │ │ •bucket │ │ •account  │ │ •account_id  │ │ •path             │   │  │
│  │  │ •region  │ │ •key    │ │ •container│ │ •bucket      │ │ •key              │   │  │
│  │  │ •endpoint│ │         │ │ •key      │ │ •jurisdiction│ │                   │   │  │
│  │  │ •key     │ │         │ │           │ │ •key         │ │                   │   │  │
│  │  └──────────┘ └─────────┘ └───────────┘ └──────────────┘ └───────────────────┘   │  │
│  └──────────────────────────────────────────────────────────────────────────────────┘  │
└────────────────────────────┬───────────────────────────────────────────────────────────┘
                             │
                             ▼
┌────────────────────────────────────────────────────────────────────────────────────────┐
│                              Backend Construction Layer                                 │
│                                                                                         │
│  ┌──────────────────────────────────────────────────────────────────────────────────┐  │
│  │                           ObjectStore Implementations                            │  │
│  │  ┌──────────────┐ ┌──────────────────┐ ┌───────────────────┐ ┌───────────────┐   │  │
│  │  │AmazonS3      │ │GoogleCloudStorage│ │MicrosoftAzure     │ │LocalFileSystem│   │  │
│  │  │(object_store)│ │(object_store)    │ │(object_store)     │ │(object_store) │   │  │
│  │  └──────────────┘ └──────────────────┘ └───────────────────┘ └───────────────┘   │  │
│  └──────────────────────────────────────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────────────────────────────────────┘

Core Components

StorageProvider

The main public interface that wraps an ObjectStore implementation. It provides high-level storage operations with automatic path qualification, retries, and both streaming and buffered I/O.

pub struct StorageProvider {
    config: BackendConfig,
    object_store: Arc<dyn ObjectStore>,
    multipart_store: Option<Arc<dyn MultipartStore>>,
    canonical_url: String,
    storage_options: HashMap<String, String>,
}

Key Methods:

Method                  | Description
----------------------- | -------------------------------------------
for_url(url)            | Create a provider from a storage URL
get(path)               | Read data from a path
put(path, bytes)        | Write data to a path
delete_if_present(path) | Delete if exists (no error if missing)
exists(path)            | Check if a path exists
list(include_subdirs)   | List objects in the storage
get_as_stream(path)     | Get data as an async stream
buf_writer(path)        | Get a buffered writer for efficient writes
head(path)              | Get object metadata

BackendConfig

An enum representing the configuration for each supported storage backend. Includes URL parsing logic using regex patterns.

pub enum BackendConfig {
    S3(S3Config),
    R2(R2Config),
    GCS(GCSConfig),
    Azure(AzureConfig),
    Local(LocalConfig),
}
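
For illustration, a round trip through parse_url() might look like the following. This is a sketch only: the exact signature of parse_url() is not shown on this page, so the argument list and return type here are assumptions.

// Sketch: assumes parse_url() takes the URL string and returns
// Result<BackendConfig, StorageError>; the real signature may differ.
let config = BackendConfig::parse_url("s3://my-bucket/data/file.parquet")?;

match config {
    BackendConfig::S3(s3) => println!("S3 bucket: {}", s3.bucket),
    _ => println!("some other backend"),
}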

LaminarCredentialProvider

Manages AWS credentials with automatic refresh and caching. Implements the CredentialProvider trait from object_store.

Features:

  • Automatic token refresh before expiration
  • 5-minute expiration buffer (see the sketch below)
  • Background refresh task
  • Thread-safe credential caching
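
For illustration, the expiration buffer boils down to a check like the one below. This is a hypothetical sketch, not the crate's actual code; the names are invented.

use std::time::{Duration, SystemTime};

// Hypothetical: refresh once credentials are within five minutes of
// expiring, mirroring the buffer described above.
const EXPIRATION_BUFFER: Duration = Duration::from_secs(5 * 60);

fn needs_refresh(expires_at: SystemTime) -> bool {
    SystemTime::now() + EXPIRATION_BUFFER >= expires_at
}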

Supported Storage Backends

Amazon S3

URL Formats:

s3://bucket/key
https://bucket.s3.region.amazonaws.com/key
https://s3.region.amazonaws.com/bucket/key
s3::https://custom-endpoint:port/bucket/key

Environment Variables:

  • AWS_ACCESS_KEY_ID
  • AWS_SECRET_ACCESS_KEY
  • AWS_DEFAULT_REGION
  • AWS_ENDPOINT (for custom endpoints)

Configuration:

pub struct S3Config {
    endpoint: Option<String>,
    region: Option<String>,
    pub bucket: String,
    key: Option<Path>,
}
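
For example, the s3:: prefix form above lets you target an S3-compatible service such as MinIO; the host, port, and bucket below are placeholders:

// Custom endpoint form: s3::https://custom-endpoint:port/bucket/key
let storage = StorageProvider::for_url(
    "s3::https://localhost:9000/my-bucket/data"
).await?;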

Google Cloud Storage (GCS)

URL Formats:

gs://bucket/key
https://bucket.storage.googleapis.com/key
https://storage.googleapis.com/bucket/key

Environment Variables:

  • GOOGLE_SERVICE_ACCOUNT_KEY - JSON service account key

Configuration:

pub struct GCSConfig {
    pub bucket: String,
    key: Option<Path>,
}
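
A minimal example, assuming GOOGLE_SERVICE_ACCOUNT_KEY is set in the environment (the bucket and key names are placeholders):

// Credentials come from GOOGLE_SERVICE_ACCOUNT_KEY (see above)
let storage = StorageProvider::for_url("gs://my-bucket/events").await?;
let bytes = storage.get("part-0.parquet").await?;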

Azure Blob Storage

URL Formats:

abfs://container@account.dfs.core.windows.net/key
abfss://container@account.dfs.core.windows.net/key
https://account.blob.core.windows.net/container/key
https://account.dfs.core.windows.net/container/key

Configuration:

pub struct AzureConfig {
    pub account: String,
    pub container: String,
    key: Option<Path>,
}
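
For example, the abfss form maps the container and account URL segments onto AzureConfig (the names below are placeholders):

let storage = StorageProvider::for_url(
    "abfss://my-container@myaccount.dfs.core.windows.net/checkpoints"
).await?;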

Cloudflare R2

URL Formats:

r2://account@bucket/key
r2://bucket/key (with CLOUDFLARE_ACCOUNT_ID env var)
https://account.r2.cloudflarestorage.com/bucket/key
https://bucket.account.r2.cloudflarestorage.com/key
https://account.eu.r2.cloudflarestorage.com/bucket/key (with jurisdiction)

Environment Variables:

  • CLOUDFLARE_ACCOUNT_ID
  • R2_ACCESS_KEY_ID or AWS_ACCESS_KEY_ID
  • R2_SECRET_ACCESS_KEY or AWS_SECRET_ACCESS_KEY

Configuration:

pub struct R2Config {
    account_id: String,
    pub bucket: String,
    jurisdiction: Option<String>,  // e.g., "eu"
    key: Option<Path>,
}

Note: R2 requires that all parts of a multipart upload (except the last) be the same size. Use requires_same_part_sizes() to check.
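
A sketch of respecting that constraint when preparing chunks (the part size is an arbitrary illustrative choice, and payload() is a hypothetical data source):

// Fix the chunk size up front so every part except the last is the same
// size; slice::chunks() guarantees exactly that.
const PART_SIZE: usize = 16 * 1024 * 1024;

let data: Vec<u8> = payload();
let data_chunks = data.chunks(PART_SIZE); // feed into the multipart API shown below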

Local Filesystem

URL Formats:

file:///path/to/directory
file:/path/to/directory
/absolute/path/to/directory

Configuration:

pub struct LocalConfig {
    pub path: String,
    pub key: Option<Path>,
}
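
The local backend is handy for tests, since the same code paths run against a plain directory (the path below is a placeholder):

let storage = StorageProvider::for_url("file:///tmp/laminar-test").await?;
storage.put("probe.txt", b"ok".to_vec()).await?;
assert!(storage.exists("probe.txt").await?);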

Usage Examples

Basic Operations

use laminar_storage::StorageProvider;
 
// Create a storage provider from URL
let storage = StorageProvider::for_url("s3://my-bucket/path").await?;
 
// Write data
let data = b"Hello, World!";
storage.put("file.txt", data.to_vec()).await?;
 
// Read data
let bytes = storage.get("file.txt").await?;
 
// Check existence
if storage.exists("file.txt").await? {
    println!("File exists!");
}
 
// Delete
storage.delete_if_present("file.txt").await?;

Streaming Large Files

// Stream read
let mut stream = storage.get_as_stream("large_file.parquet").await?;
let mut buffer = Vec::new();
tokio::io::AsyncReadExt::read_to_end(&mut stream, &mut buffer).await?;
 
// Buffered write
let mut writer = storage.buf_writer("output.parquet");
writer.put(large_data.into()).await?;
writer.shutdown().await?;

Multipart Upload

For uploading large files in parts (required on S3 for objects larger than 5 GB, the single-request upload limit):

use object_store::path::Path;

let path = Path::from("huge_file.bin");
let multipart_id = storage.start_multipart(&path).await?;
 
let mut parts = Vec::new();
for (i, chunk) in data_chunks.enumerate() {
    let part = storage.add_multipart(&path, &multipart_id, i + 1, chunk).await?;
    parts.push(part);
}
 
storage.close_multipart(&path, &multipart_id, parts).await?;

Listing Objects

use futures::StreamExt;

// List without subdirectories
let mut stream = storage.list(false).await?;
while let Some(path) = stream.next().await {
    println!("Found: {:?}", path?);
}
 
// List with subdirectories
let mut stream = storage.list(true).await?;

Using Storage Options

Pass provider-specific configuration options:

use std::collections::HashMap;
 
let mut options = HashMap::new();
options.insert("aws_access_key_id".to_string(), "AKIA...".to_string());
options.insert("aws_secret_access_key".to_string(), "secret".to_string());
 
let storage = StorageProvider::for_url_with_options(
    "s3://my-bucket/prefix",
    options
).await?;

Error Handling

The crate defines StorageError for all storage-related errors:

pub enum StorageError {
    InvalidUrl,                    // URL doesn't match any known pattern
    PathError(String),             // Invalid path or directory creation failed
    NoKeyInUrl,                    // URL doesn't contain an object key
    ObjectStore(object_store::Error),  // Underlying storage error
    CredentialsError(String),      // Failed to load credentials
}
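
Callers can match on the variants to decide how to react. A sketch, assuming provider methods return Result<_, StorageError>:

use laminar_storage::StorageError;

match storage.get("maybe-missing.txt").await {
    Ok(bytes) => println!("read {} bytes", bytes.len()),
    // A missing object surfaces as the wrapped object_store error
    Err(StorageError::ObjectStore(object_store::Error::NotFound { .. })) => {
        println!("object not found");
    }
    Err(e) => return Err(e),
}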

Retry Logic

The crate includes automatic retry with exponential backoff for transient errors:

  • Max retries: 10
  • Initial delay: 100ms
  • Max delay: 10 seconds
  • Smart retry: Only retries on transient errors (not 404s)

// Internal retry macro usage
storage_retry!(self.object_store.get(&path).await)?;
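
A standalone sketch of the schedule those parameters imply (the real logic lives inside the internal storage_retry! macro; this helper and its names are illustrative only):

async fn with_retries<T, F, Fut>(mut op: F) -> object_store::Result<T>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = object_store::Result<T>>,
{
    // Initial attempt plus up to 10 retries, per the parameters above
    let mut delay = std::time::Duration::from_millis(100);
    for attempt in 0..=10 {
        match op().await {
            Ok(v) => return Ok(v),
            // Not transient: a missing object will not appear on retry
            Err(e @ object_store::Error::NotFound { .. }) => return Err(e),
            Err(e) if attempt == 10 => return Err(e),
            Err(_) => {
                tokio::time::sleep(delay).await;
                delay = std::cmp::min(delay * 2, std::time::Duration::from_secs(10));
            }
        }
    }
    unreachable!("the final attempt always returns")
}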

Path Qualification

When a StorageProvider is created with a key prefix (e.g., s3://bucket/prefix), all paths are automatically qualified:

let storage = StorageProvider::for_url("s3://bucket/data/2024").await?;
 
// This actually accesses s3://bucket/data/2024/file.parquet
storage.get("file.parquet").await?;
 
// Use qualify_path to see the full path
let full_path = storage.qualify_path(&Path::from("file.parquet"));
// Returns: data/2024/file.parquet

Dependencies

Dependency           | Purpose
-------------------- | ----------------------------------------------------
object_store         | Core storage abstraction (AWS, GCP, Azure features)
aws-config           | AWS SDK configuration
aws-credential-types | AWS credential management
bytes                | Efficient byte buffer handling
regex                | URL pattern matching
tokio                | Async runtime
futures              | Async stream utilities

Use Cases in Laminar

The storage crate is used throughout Laminar for:

  1. Checkpoints - Storing pipeline state for fault tolerance
  2. Data files - Reading/writing Parquet, JSON, and other data formats
  3. Configuration - Loading remote configuration files

Adding New Storage Backends

To add support for a new storage backend:

  1. Add URL pattern constants and regex matchers to matchers()
  2. Create a config struct (e.g., HDFSConfig; see the sketch after this list)
  3. Add variant to BackendConfig enum
  4. Implement parse_* method for URL parsing
  5. Create construct_* method for building the provider
  6. Either implement ObjectStore trait or create an adapter
  7. Add tests for URL parsing and basic operations
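
For example, steps 2 and 3 for a hypothetical HDFS backend might look like this (HDFSConfig does not exist in the crate; the fields are invented):

// Step 2: a config struct for the new backend (hypothetical fields)
pub struct HDFSConfig {
    pub namenode: String,
    pub path: String,
    key: Option<Path>,
}

// Step 3: a new variant alongside the existing backends
pub enum BackendConfig {
    S3(S3Config),
    R2(R2Config),
    GCS(GCSConfig),
    Azure(AzureConfig),
    Local(LocalConfig),
    HDFS(HDFSConfig),
}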

See the existing implementations for S3, GCS, Azure, and R2 as examples.