Work in Progress: This page is under development. Use the feedback button on the bottom right to help us improve it.

Kafka

Apache Kafka connector for reading from and writing to Kafka topics.

Connection Profile

  • bootstrap_servers (string, required): Comma-separated list of Kafka servers
  • authentication (required): One of:
    • {} (None)
    • SASL:
      • protocol (string, required): e.g., SASL_SSL, SASL_PLAINTEXT
      • mechanism (string, required): e.g., SCRAM-SHA-256, SCRAM-SHA-512
      • username (string, required)
      • password (string, required)
    • AWS MSK IAM:
      • region (string, required): AWS region
  • schema_registry (optional): Confluent Schema Registry
    • endpoint (string, required): Schema Registry endpoint
    • api_key (string, optional)
    • api_secret (string, optional)
  • connection_properties (object, optional): Additional rdkafka properties

Connection Table

  • topic (string, required): Kafka topic name
  • type (required): One of:
    • source_config:
      • offset (string, required): "latest" | "earliest" | "group"
      • read_mode (string, optional): "read_uncommitted" | "read_committed"
      • group_id (string, optional): Consumer group ID
      • group_id_prefix (string, optional): Prefix for group ID
    • sink_config:
      • commit_mode (string, required): "at_least_once" | "exactly_once"
      • key_field (string, optional): Field to use for message key
      • timestamp_field (string, optional): Field to use for message timestamp
  • client_configs (object, optional): Additional Kafka consumer/producer configs
  • value_subject (string, optional): Schema Registry subject (defaults to TOPIC-value)

JSON Schema Reference

Connection Profile Schema

{
  "type": "object",
  "properties": {
    "bootstrap_servers": {
      "type": "string",
      "description": "Comma-separated list of Kafka servers to connect to"
    },
    "authentication": {
      "oneOf": [
        {"type": "object", "title": "None"},
        {
          "type": "object",
          "title": "SASL",
          "required": ["protocol", "mechanism", "username", "password"],
          "properties": {
            "protocol": {"type": "string"},
            "mechanism": {"type": "string"},
            "username": {"type": "string"},
            "password": {"type": "string"}
          }
        },
        {
          "type": "object",
          "title": "AWS_MSK_IAM",
          "required": ["region"],
          "properties": {
            "region": {"type": "string"}
          }
        }
      ]
    },
    "schema_registry": {
      "oneOf": [
        {"type": "object", "title": "None"},
        {
          "type": "object",
          "title": "Confluent Schema Registry",
          "required": ["endpoint"],
          "properties": {
            "endpoint": {"type": "string"},
            "api_key": {"type": "string"},
            "api_secret": {"type": "string"}
          }
        }
      ]
    },
    "connection_properties": {"type": "object"}
  },
  "required": ["bootstrap_servers", "authentication"]
}

Connection Table Schema

{
  "type": "object",
  "properties": {
    "topic": {"type": "string"},
    "type": {
      "oneOf": [
        {
          "type": "object",
          "title": "Source",
          "required": ["source_config"],
          "properties": {
            "source_config": {
              "type": "object",
              "required": ["offset"],
              "properties": {
                "offset": {"type": "string", "enum": ["latest", "earliest", "group"]},
                "read_mode": {"type": "string", "enum": ["read_uncommitted", "read_committed"]},
                "group_id": {"type": "string"},
                "group_id_prefix": {"type": "string"}
              }
            }
          }
        },
        {
          "type": "object",
          "title": "Sink",
          "required": ["sink_config"],
          "properties": {
            "sink_config": {
              "type": "object",
              "required": ["commit_mode"],
              "properties": {
                "commit_mode": {"type": "string", "enum": ["at_least_once", "exactly_once"]},
                "key_field": {"type": "string"},
                "timestamp_field": {"type": "string"}
              }
            }
          }
        }
      ]
    },
    "client_configs": {"type": "object"},
    "value_subject": {"type": "string"}
  },
  "required": ["topic", "type"]
}