Kafka
Apache Kafka connector for reading from and writing to Kafka topics.
Connection Profile
bootstrap_servers(string, required): Comma-separated list of Kafka serversauthentication(required): One of:{}(None)- SASL:
protocol(string, required): e.g., SASL_SSL, SASL_PLAINTEXTmechanism(string, required): e.g., SCRAM-SHA-256, SCRAM-SHA-512username(string, required)password(string, required)
- AWS MSK IAM:
region(string, required): AWS region
schema_registry(optional): Confluent Schema Registryendpoint(string, required): Schema Registry endpointapi_key(string, optional)api_secret(string, optional)
connection_properties(object, optional): Additional rdkafka properties
Connection Table
topic(string, required): Kafka topic nametype(required): One of:source_config:offset(string, required): "latest" | "earliest" | "group"read_mode(string, optional): "read_uncommitted" | "read_committed"group_id(string, optional): Consumer group IDgroup_id_prefix(string, optional): Prefix for group ID
sink_config:commit_mode(string, required): "at_least_once" | "exactly_once"key_field(string, optional): Field to use for message keytimestamp_field(string, optional): Field to use for message timestamp
client_configs(object, optional): Additional Kafka consumer/producer configsvalue_subject(string, optional): Schema Registry subject (defaults to TOPIC-value)
JSON Schema Reference
Connection Profile Schema
{
"type": "object",
"properties": {
"bootstrap_servers": {
"type": "string",
"description": "Comma-separated list of Kafka servers to connect to"
},
"authentication": {
"oneOf": [
{"type": "object", "title": "None"},
{
"type": "object",
"title": "SASL",
"required": ["protocol", "mechanism", "username", "password"],
"properties": {
"protocol": {"type": "string"},
"mechanism": {"type": "string"},
"username": {"type": "string"},
"password": {"type": "string"}
}
},
{
"type": "object",
"title": "AWS_MSK_IAM",
"required": ["region"],
"properties": {
"region": {"type": "string"}
}
}
]
},
"schema_registry": {
"oneOf": [
{"type": "object", "title": "None"},
{
"type": "object",
"title": "Confluent Schema Registry",
"required": ["endpoint"],
"properties": {
"endpoint": {"type": "string"},
"api_key": {"type": "string"},
"api_secret": {"type": "string"}
}
}
]
},
"connection_properties": {"type": "object"}
},
"required": ["bootstrap_servers", "authentication"]
}Connection Table Schema
{
"type": "object",
"properties": {
"topic": {"type": "string"},
"type": {
"oneOf": [
{
"type": "object",
"title": "Source",
"required": ["source_config"],
"properties": {
"source_config": {
"type": "object",
"required": ["offset"],
"properties": {
"offset": {"type": "string", "enum": ["latest", "earliest", "group"]},
"read_mode": {"type": "string", "enum": ["read_uncommitted", "read_committed"]},
"group_id": {"type": "string"},
"group_id_prefix": {"type": "string"}
}
}
}
},
{
"type": "object",
"title": "Sink",
"required": ["sink_config"],
"properties": {
"sink_config": {
"type": "object",
"required": ["commit_mode"],
"properties": {
"commit_mode": {"type": "string", "enum": ["at_least_once", "exactly_once"]},
"key_field": {"type": "string"},
"timestamp_field": {"type": "string"}
}
}
}
}
]
},
"client_configs": {"type": "object"},
"value_subject": {"type": "string"}
},
"required": ["topic", "type"]
}