Automate Kafka topic creation


Requirement 

We want to automate the creation of Kafka topics and their schemas. How can we do that?

Solution 

Lenses manages Kafka topics together with their schemas. When a topic uses the Avro storage format, Lenses also registers the schema with the Schema Registry it is connected to. For the schemaless storage formats (JSON, CSV, or XML), Lenses attempts to infer the schema, and the Schema Registry is not involved. Knowing the schema improves the Explore screen experience and makes it possible to build stream analytics pipelines with SQL processors.

The following API call creates a topic and attaches a schema to it.

POST /api/v1/kafka/topic

Request body

{
  "name" : "$YOUR_TOPIC",
  "replication" : 1,
  "partitions" : 1,
  "configs" : {
    "$KAFKA_TOPIC_CONFIG_KEY" : "$VALUE"
  },
  "format" : {
    "key" : {
      "format" : "INT/STRING/LONG/AVRO/JSON/CSV/XML",
      "schema" : "$AVRO_SCHEMA"
    },
    "value" : {
      "format" : "INT/STRING/LONG/AVRO/JSON/CSV/XML",
      "schema" : "$AVRO_SCHEMA"
    }
  }
}

| Field | Description | Required | Remarks |
|---|---|---|---|
| name | The name of the Kafka topic to create | yes | |
| replication | The topic replication factor. If provided, it must be greater than zero and no greater than the number of brokers in the cluster | no | |
| partitions | The number of topic partitions. If provided, it must be greater than 0 | no | |
| configs | A key-value set, where each key is a Kafka topic configuration name | yes | The field must be present even when no properties are set; use an empty object in that case |
| format.key.format | The storage format of the Kafka message key | yes | Supported values: STRING, INT, LONG, AVRO, JSON, CSV, XML |
| format.key.schema | The schema of the message key, in Avro representation | yes, except for primitive formats (INT, LONG, STRING) | |
| format.value.format | The storage format of the Kafka message value | yes | Supported values: STRING, INT, LONG, AVRO, JSON, CSV, XML |
| format.value.schema | The schema of the message value, in Avro representation | yes, except for primitive formats (INT, LONG, STRING) | |
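
The field rules above can be checked on the client before the request is sent. The following is a minimal sketch, not part of Lenses: the function names `build_topic_request` and `_format_entry` are hypothetical, and the validation only mirrors the table (it does not know the broker count, so the upper bound on replication is left to the server).

```python
import json

PRIMITIVE_FORMATS = {"STRING", "INT", "LONG"}
SUPPORTED_FORMATS = PRIMITIVE_FORMATS | {"AVRO", "JSON", "CSV", "XML"}


def _format_entry(fmt, schema=None):
    """Build the "key" or "value" entry of the "format" object (hypothetical helper)."""
    if fmt not in SUPPORTED_FORMATS:
        raise ValueError(f"unsupported format: {fmt}")
    entry = {"format": fmt}
    if fmt not in PRIMITIVE_FORMATS:
        if schema is None:
            raise ValueError(f"format {fmt} requires a schema")
        # The request carries the Avro schema as a JSON-encoded string,
        # so a dict is serialized and a string is passed through unchanged.
        entry["schema"] = schema if isinstance(schema, str) else json.dumps(schema)
    return entry


def build_topic_request(name, key_format, value_format,
                        key_schema=None, value_schema=None,
                        replication=None, partitions=None, configs=None):
    """Assemble the request body for POST /api/v1/kafka/topic."""
    if not name:
        raise ValueError("name is required")
    body = {"name": name}
    if replication is not None:
        if replication <= 0:
            raise ValueError("replication must be greater than zero")
        body["replication"] = replication
    if partitions is not None:
        if partitions <= 0:
            raise ValueError("partitions must be greater than zero")
        body["partitions"] = partitions
    # "configs" is always emitted, even when it holds no properties.
    body["configs"] = dict(configs or {})
    body["format"] = {
        "key": _format_entry(key_format, key_schema),
        "value": _format_entry(value_format, value_schema),
    }
    return body
```

Calling `build_topic_request("input", "STRING", "AVRO", value_schema=my_schema, partitions=3)` would return a dictionary that can be serialized with `json.dumps` and used as the request body; `my_schema` stands for an Avro schema loaded as a Python dict.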

Setting up our example 

We have an input topic named 'input'. The message key is text (i.e., STRING), and the message value is Avro with this schema:

{
  "type": "record",
  "name": "ConnectDefault",
  "namespace": "io.lenses.connect.avro",
  "fields": [
    {
      "name": "EVENTID",
      "type": "string"
    },
    {
      "name": "EVENTDATETIME",
      "type": "string"
    },
    {
      "name": "CLIENTID",
      "type": "string"
    },
    {
      "name": "CUSTOMERCATEGORY",
      "type": "string"
    },
    {
      "name": "CUSTOMERID",
      "type": "string"
    },
    {
      "name": "ENTITYID",
      "type": "string"
    },
    {
      "name": "EVENTCORRELATIONID",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "EVENTSOURCE",
      "type": "string"
    },
    {
      "name": "SUPERPRODUCTID",
      "type": "string"
    },
    {
      "name": "TENANTID",
      "type": "string"
    },
    {
      "name": "TREATMENTCODE",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "SAPREASONCODE",
      "type": "string"
    },
    {
      "name": "TRXAMOUNT",
      "type": "string"
    },
    {
      "name": "TRXCODE",
      "type": "string"
    },
    {
      "name": "TRXDESCRIPTION",
      "type": "string"
    },
    {
      "name": "SourceType",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

Step 1 - Create the security group 

Navigate to the Admin section using the top navigation bar. From the left navigation menu, go to Groups and add this group.

[Screenshot: content of the Lenses group screen]

Step 2 - Create the service account 

From there, use the left navigation menu to go to the Service accounts page, and add a new entry:

[Screenshot: content of the service account screen]

Grab the token provided for the service account; step 3 will use it.

Step 3 - Create the topic and the schema

This is the request body for our example. The Avro schema is passed as an escaped JSON string:

{
  "name": "input",
  "replication": 1,
  "partitions": 3,
  "configs": {},
  "format": {
    "key": {
      "format": "STRING"
    },
    "value": {
      "format": "AVRO",
      "schema": "{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.lenses.connect.avro\",\"fields\":[{\"name\":\"EVENTID\",\"type\":\"string\"},{\"name\":\"EVENTDATETIME\",\"type\":\"string\"},{\"name\":\"CLIENTID\",\"type\":\"string\"},{\"name\":\"CUSTOMERCATEGORY\",\"type\":\"string\"},{\"name\":\"CUSTOMERID\",\"type\":\"string\"},{\"name\":\"ENTITYID\",\"type\":\"string\"},{\"name\":\"EVENTCORRELATIONID\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"EVENTSOURCE\",\"type\":\"string\"},{\"name\":\"SUPERPRODUCTID\",\"type\":\"string\"},{\"name\":\"TENANTID\",\"type\":\"string\"},{\"name\":\"TREATMENTCODE\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"SAPREASONCODE\",\"type\":\"string\"},{\"name\":\"TRXAMOUNT\",\"type\":\"string\"},{\"name\":\"TRXCODE\",\"type\":\"string\"},{\"name\":\"TRXDESCRIPTION\",\"type\":\"string\"},{\"name\":\"SourceType\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
    }
  }
}

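Save this content to a file named body.json: run the command below, paste the JSON above, then press Enter followed by CTRL+D to close the file.

cat > body.json
[Paste the JSON above]
[Hit Enter then CTRL+D]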
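
Escaping the Avro schema by hand is error-prone, so body.json could also be generated from the schema file. The sketch below assumes the schema from the example setup has been saved to a file; the function name `write_request_body` and the file name `value-schema.avsc` are illustrative and not part of Lenses.

```python
import json
from pathlib import Path


def write_request_body(schema_path, body_path, topic="input",
                       partitions=3, replication=1):
    """Read an Avro schema file and write the topic request body to body_path."""
    # Parsing first guarantees the schema file holds valid JSON.
    schema = json.loads(Path(schema_path).read_text(encoding="utf-8"))
    body = {
        "name": topic,
        "replication": replication,
        "partitions": partitions,
        "configs": {},
        "format": {
            "key": {"format": "STRING"},
            "value": {
                "format": "AVRO",
                # The schema is embedded as a compact JSON string; the outer
                # json.dumps below takes care of escaping the quotes.
                "schema": json.dumps(schema, separators=(",", ":")),
            },
        },
    }
    Path(body_path).write_text(json.dumps(body, indent=2), encoding="utf-8")
    return body


# Example call (file names are assumptions):
# write_request_body("value-schema.avsc", "body.json")
```

The resulting body.json has the same shape as the request body shown in this step and can be passed to curl with `--data @body.json`.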

Lenses authenticates and authorizes API requests with a token passed in the X-Kafka-Lenses-Token header. The service account created in Step 2 provides this token.

curl --header "Content-Type: application/json" \
  --header "X-Kafka-Lenses-Token: demo-sa:48cd653d-1889-42b6-9f38-6e5116db691a" \
  -i -X POST \
  --data @body.json \
  http://localhost:24015/api/v1/kafka/topic
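
When the call has to run from a script rather than a terminal, the same request could be sent with Python's standard library. This is a sketch under the same assumptions as the curl command (Lenses reachable at http://localhost:24015, token taken from the service account); the function names are hypothetical.

```python
import json
import urllib.request


def build_create_topic_request(base_url, token, body):
    """Prepare the POST request without sending it."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/kafka/topic",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "X-Kafka-Lenses-Token": token,
        },
        method="POST",
    )


def create_topic(base_url, token, body):
    """Send the request and return the HTTP status and response text."""
    request = build_create_topic_request(base_url, token, body)
    # urlopen raises urllib.error.HTTPError on 4xx and 5xx responses.
    with urllib.request.urlopen(request) as response:
        return response.status, response.read().decode("utf-8")


# Example usage (reads the body saved earlier; the token value is a placeholder):
# with open("body.json", encoding="utf-8") as f:
#     status, text = create_topic("http://localhost:24015", "<service-account-token>", json.load(f))
```

Keeping the request construction separate from the network call makes it possible to inspect the URL, headers, and payload before anything is sent.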

On a successful call, there will be a Kafka topic named input and a Schema Registry subject named input-value.
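
An automated pipeline may want to confirm that the subject exists. The sketch below assumes a Confluent-compatible Schema Registry, whose REST API lists subject names at GET /subjects; the registry address in the usage comment is an assumption about the environment, and the function names are hypothetical.

```python
import json
import urllib.request


def value_subject(topic):
    """Subject name for a topic's value schema, as described above."""
    return f"{topic}-value"


def has_value_subject(subjects, topic):
    """Check a list of subject names for the topic's value subject."""
    return value_subject(topic) in subjects


def list_subjects(registry_url):
    """Fetch all subject names from the Schema Registry."""
    url = f"{registry_url.rstrip('/')}/subjects"
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read().decode("utf-8"))


# Example usage (the registry URL is an assumption):
# subjects = list_subjects("http://localhost:8081")
# print(has_value_subject(subjects, "input"))
```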

--
Last modified: September 15, 2024