laminar-storage
A unified, cloud-agnostic storage abstraction layer for the Laminar stream processing engine. This crate provides a consistent interface for interacting with various storage backends including S3, Google Cloud Storage, Azure Blob Storage, Cloudflare R2, and local filesystems.
Architecture Overview
```
┌──────────────────────────────────────────────────────────────────┐
│                        External API Layer                        │
│                                                                  │
│  StorageProvider                                                 │
│   • get() / put() / delete()                                     │
│   • get_as_stream() / buf_writer()                               │
│   • list() / exists() / head()                                   │
│   • multipart operations                                         │
└────────────────────────────────┬─────────────────────────────────┘
                                 │
                                 ▼
┌──────────────────────────────────────────────────────────────────┐
│                      URL Parsing & Routing                       │
│                                                                  │
│  BackendConfig ──parse_url()──▶ Regex matchers                   │
│   • Backend enum                • S3_PATH, S3_VIRTUAL            │
│   • Config structs              • GCS_PATH, GCS_VIRTUAL          │
│                                 • AZURE_HTTPS, ABFS_URL          │
│                                 • R2_URL, R2_ENDPOINT            │
│                                 • FILE_URI, FILE_PATH            │
│                                                                  │
│  Backend configurations (fields detailed per backend below):     │
│   S3Config · GCSConfig · AzureConfig · R2Config · LocalConfig    │
└────────────────────────────────┬─────────────────────────────────┘
                                 │
                                 ▼
┌──────────────────────────────────────────────────────────────────┐
│                    Backend Construction Layer                    │
│                                                                  │
│  ObjectStore implementations (from the object_store crate):      │
│  AmazonS3 · GoogleCloudStorage · MicrosoftAzure · LocalFileSystem│
└──────────────────────────────────────────────────────────────────┘
```
Core Components
StorageProvider
The main public interface that wraps an ObjectStore implementation. It provides high-level storage operations with automatic path qualification, retries, and both streaming and buffered I/O.
```rust
pub struct StorageProvider {
    config: BackendConfig,
    object_store: Arc<dyn ObjectStore>,
    multipart_store: Option<Arc<dyn MultipartStore>>,
    canonical_url: String,
    storage_options: HashMap<String, String>,
}
```

Key Methods:
| Method | Description |
|---|---|
| for_url(url) | Create a provider from a storage URL |
| get(path) | Read data from a path |
| put(path, bytes) | Write data to a path |
| delete_if_present(path) | Delete if exists (no error if missing) |
| exists(path) | Check if a path exists |
| list(include_subdirs) | List objects in the storage |
| get_as_stream(path) | Get data as an async stream |
| buf_writer(path) | Get a buffered writer for efficient writes |
| head(path) | Get object metadata |
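For example, head() can be used to inspect an object before reading it. A minimal sketch, assuming head() accepts a path string like the other methods and returns object_store's ObjectMeta:

```rust
// Sketch only: assumes head() returns object_store::ObjectMeta.
let meta = storage.head("file.parquet").await?;
println!("{} bytes, last modified {}", meta.size, meta.last_modified);
```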
BackendConfig
An enum representing the configuration for each supported storage backend. Includes URL parsing logic using regex patterns.
```rust
pub enum BackendConfig {
    S3(S3Config),
    R2(R2Config),
    GCS(GCSConfig),
    Azure(AzureConfig),
    Local(LocalConfig),
}
```
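Parsing can also be exercised on its own when only routing information is needed. A minimal sketch, assuming parse_url returns Result<BackendConfig, StorageError> and the enum derives Debug:

```rust
use laminar_storage::BackendConfig;

// Sketch: route a URL without constructing a full provider.
match BackendConfig::parse_url("s3://my-bucket/data/2024")? {
    BackendConfig::S3(s3) => println!("S3 bucket: {}", s3.bucket),
    other => println!("other backend: {:?}", other),
}
```

LaminarCredentialProvider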
Manages AWS credentials with automatic refresh and caching. Implements the CredentialProvider trait from object_store.
Features:
- Automatic token refresh before expiration
- 5-minute expiration buffer
- Background refresh task
- Thread-safe credential caching
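The buffer means a credential is treated as stale once it is within five minutes of expiring. A standalone sketch of that check (the names here are illustrative, not the crate's internals):

```rust
use std::time::{Duration, SystemTime};

// Illustrative only: refresh once a credential is within the
// five-minute buffer of its expiration time.
const EXPIRATION_BUFFER: Duration = Duration::from_secs(5 * 60);

fn needs_refresh(expires_at: SystemTime) -> bool {
    match expires_at.duration_since(SystemTime::now()) {
        Ok(remaining) => remaining <= EXPIRATION_BUFFER,
        Err(_) => true, // already expired
    }
}
```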
Supported Storage Backends
Amazon S3
URL Formats:
```
s3://bucket/key
https://bucket.s3.region.amazonaws.com/key
https://s3.region.amazonaws.com/bucket/key
s3::https://custom-endpoint:port/bucket/key
```
Environment Variables:
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- AWS_DEFAULT_REGION
- AWS_ENDPOINT (for custom endpoints)
Configuration:
```rust
pub struct S3Config {
    endpoint: Option<String>,
    region: Option<String>,
    pub bucket: String,
    key: Option<Path>,
}
```
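The s3:: prefix form is useful for S3-compatible services such as MinIO; a sketch, where the endpoint and bucket names are placeholders:

```rust
// Placeholder endpoint and bucket: routes through a custom
// S3-compatible endpoint rather than AWS itself.
let storage =
    StorageProvider::for_url("s3::https://minio.internal:9000/my-bucket/data").await?;
```

Google Cloud Storage (GCS)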
URL Formats:
```
gs://bucket/key
https://bucket.storage.googleapis.com/key
https://storage.googleapis.com/bucket/key
```
Environment Variables:
- GOOGLE_SERVICE_ACCOUNT_KEY - JSON service account key
Configuration:
```rust
pub struct GCSConfig {
    pub bucket: String,
    key: Option<Path>,
}
```

Azure Blob Storage
URL Formats:
```
abfs://container@account.dfs.core.windows.net/key
abfss://container@account.dfs.core.windows.net/key
https://account.blob.core.windows.net/container/key
https://account.dfs.core.windows.net/container/key
```
Configuration:
```rust
pub struct AzureConfig {
    pub account: String,
    pub container: String,
    key: Option<Path>,
}
```

Cloudflare R2
URL Formats:
```
r2://account@bucket/key
r2://bucket/key (with CLOUDFLARE_ACCOUNT_ID env var)
https://account.r2.cloudflarestorage.com/bucket/key
https://bucket.account.r2.cloudflarestorage.com/key
https://account.eu.r2.cloudflarestorage.com/bucket/key (with jurisdiction)
```
Environment Variables:
- CLOUDFLARE_ACCOUNT_ID
- R2_ACCESS_KEY_ID or AWS_ACCESS_KEY_ID
- R2_SECRET_ACCESS_KEY or AWS_SECRET_ACCESS_KEY
Configuration:
```rust
pub struct R2Config {
    account_id: String,
    pub bucket: String,
    jurisdiction: Option<String>, // e.g., "eu"
    key: Option<Path>,
}
```

Note: R2 requires same-size parts for multipart uploads. Use requires_same_part_sizes() to check, as sketched below.
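A sketch of guarding on that constraint before uploading, assuming requires_same_part_sizes() is exposed on the provider and data_chunks is a Vec<Bytes>:

```rust
if storage.requires_same_part_sizes() {
    // R2: every part except the last must have the same size.
    let first = data_chunks.first().map(|c| c.len()).unwrap_or(0);
    assert!(
        data_chunks.iter().rev().skip(1).all(|c| c.len() == first),
        "all parts except the last must be the same size"
    );
}
```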
Local Filesystem
URL Formats:
```
file:///path/to/directory
file:/path/to/directory
/absolute/path/to/directory
```
Configuration:
```rust
pub struct LocalConfig {
    pub path: String,
    pub key: Option<Path>,
}
```

Usage Examples
Basic Operations
```rust
use laminar_storage::StorageProvider;

// Create a storage provider from a URL
let storage = StorageProvider::for_url("s3://my-bucket/path").await?;

// Write data
let data = b"Hello, World!";
storage.put("file.txt", data.to_vec()).await?;

// Read data
let bytes = storage.get("file.txt").await?;

// Check existence
if storage.exists("file.txt").await? {
    println!("File exists!");
}

// Delete
storage.delete_if_present("file.txt").await?;
```

Streaming Large Files
```rust
// Stream read
let mut stream = storage.get_as_stream("large_file.parquet").await?;
let mut buffer = Vec::new();
tokio::io::AsyncReadExt::read_to_end(&mut stream, &mut buffer).await?;

// Buffered write
let mut writer = storage.buf_writer("output.parquet");
writer.put(large_data.into()).await?;
writer.shutdown().await?;
```

Multipart Upload
For uploading large files in parts (useful for files > 5GB on S3):
```rust
use object_store::path::Path;

let path = Path::from("huge_file.bin");
let multipart_id = storage.start_multipart(&path).await?;

let mut parts = Vec::new();
for (i, chunk) in data_chunks.enumerate() {
    // enumerate() is 0-based; part numbers here start at 1
    let part = storage.add_multipart(&path, &multipart_id, i + 1, chunk).await?;
    parts.push(part);
}

storage.close_multipart(&path, &multipart_id, parts).await?;
```

Listing Objects
```rust
use futures::StreamExt;

// List without subdirectories
let mut stream = storage.list(false).await?;
while let Some(path) = stream.next().await {
    println!("Found: {:?}", path?);
}

// List with subdirectories
let mut stream = storage.list(true).await?;
```

Using Storage Options
Pass provider-specific configuration options:
```rust
use std::collections::HashMap;

let mut options = HashMap::new();
options.insert("aws_access_key_id".to_string(), "AKIA...".to_string());
options.insert("aws_secret_access_key".to_string(), "secret".to_string());

let storage = StorageProvider::for_url_with_options(
    "s3://my-bucket/prefix",
    options
).await?;
```

Error Handling
The crate defines StorageError for all storage-related errors:
```rust
pub enum StorageError {
    InvalidUrl,                       // URL doesn't match any known pattern
    PathError(String),                // Invalid path or directory creation failed
    NoKeyInUrl,                       // URL doesn't contain an object key
    ObjectStore(object_store::Error), // Underlying storage error
    CredentialsError(String),         // Failed to load credentials
}
```
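A sketch of handling these variants at a call site, treating a missing object as a non-error (assumes get() returns Result<_, StorageError>):

```rust
use laminar_storage::StorageError;

match storage.get("maybe-missing.txt").await {
    Ok(bytes) => println!("read {} bytes", bytes.len()),
    // Unwrap the underlying object_store error to detect "not found".
    Err(StorageError::ObjectStore(object_store::Error::NotFound { .. })) => {
        println!("object does not exist");
    }
    Err(other) => return Err(other.into()),
}
```

Retry Logic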
The crate includes automatic retry with exponential backoff for transient errors:
- Max retries: 10
- Initial delay: 100ms
- Max delay: 10 seconds
- Smart retry: Only retries on transient errors (not 404s)
```rust
// Internal retry macro usage
storage_retry!(self.object_store.get(&path).await)?;
```
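The schedule amounts to a doubling delay capped at the maximum. A standalone sketch of the same parameters, not the crate's actual macro:

```rust
use std::time::Duration;

// 100ms, 200ms, 400ms, ... capped at 10s, for up to 10 attempts.
let mut delay = Duration::from_millis(100);
for attempt in 1..=10 {
    println!("attempt {attempt}: would wait {delay:?} before retrying");
    delay = (delay * 2).min(Duration::from_secs(10));
}
```

Path Qualification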
When a StorageProvider is created with a key prefix (e.g., s3://bucket/prefix), all paths are automatically qualified:
```rust
let storage = StorageProvider::for_url("s3://bucket/data/2024").await?;

// This actually accesses s3://bucket/data/2024/file.parquet
storage.get("file.parquet").await?;

// Use qualify_path to see the full path
let full_path = storage.qualify_path(&Path::from("file.parquet"));
// Returns: data/2024/file.parquet
```

Dependencies
| Dependency | Purpose |
|---|---|
| object_store | Core storage abstraction (AWS, GCP, Azure features) |
| aws-config | AWS SDK configuration |
| aws-credential-types | AWS credential management |
| bytes | Efficient byte buffer handling |
| regex | URL pattern matching |
| tokio | Async runtime |
| futures | Async stream utilities |
Use Cases in Laminar
The storage crate is used throughout Laminar for:
- Checkpoints - Storing pipeline state for fault tolerance
- Data files - Reading/writing Parquet, JSON, and other data formats
- Configuration - Loading remote configuration files
Adding New Storage Backends
To add support for a new storage backend:
- Add URL pattern constants and regex matchers to matchers()
- Create a config struct (e.g., HDFSConfig)
- Add a variant to the BackendConfig enum
- Implement a parse_* method for URL parsing
- Create a construct_* method for building the provider
- Either implement the ObjectStore trait or create an adapter
- Add tests for URL parsing and basic operations
See the existing implementations for S3, GCS, Azure, and R2 as examples.
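As a purely hypothetical illustration of the config-struct and enum-variant steps, an HDFS backend might start like this (HDFSConfig and its fields are invented for the example):

```rust
use object_store::path::Path;

// Hypothetical: a config struct for the new backend.
pub struct HDFSConfig {
    pub namenode: String,
    pub root: String,
    key: Option<Path>,
}

// Hypothetical: a new variant alongside the existing ones.
pub enum BackendConfig {
    S3(S3Config),
    R2(R2Config),
    GCS(GCSConfig),
    Azure(AzureConfig),
    Local(LocalConfig),
    HDFS(HDFSConfig),
}
```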