Publish Data to an Apache Kafka topic


Scenario 

I need to publish a Kafka record to a Kafka topic.

Solution 

Sometimes you need to publish a single record to a topic. Perhaps a previous record contained incorrect or missing data and downstream systems were affected; inserting one record with the correct information would fix the problem. Or perhaps a single control message needs to trigger a data workflow. The default Kafka command-line tools can also be too much for your data stewards, business analysts, developers, or data scientists, and you may want a flexible RBAC system and auditing out of the box.

Writing SQL to handle the insert in SQL Studio is one option for publishing the data. However, if a JSON representation of the data is already available (a common scenario) and it is a complex structure, the functionality described here can speed up the task.

Records 

We assume a topic named myTopic already exists and that you have permission to publish data to it. Furthermore, the topic is configured to store records with JSON keys and Avro values. The Avro schema is set to:

{
  "type": "record",
  "name": "milanoRecord",
  "namespace": "io.lenses.telecom.telecomitalia.grid",
  "doc": "Schema for Grid for Telecommunications Data from Telecom Italia.",
  "fields": [
    {
      "name": "SquareId",
      "type": "int",
      "doc": " The id of the square that is part of the Milano GRID"
    },
    {
      "name": "Polygon",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "coordinates",
          "fields": [
            {
              "name": "longitude",
              "type": "double"
            },
            {
              "name": "latitude",
              "type": "double"
            }
          ]
        }
      }
    }
  ]
}

We now want to insert a record into this topic with this content:

  • key
  { "SquareId": 51 }
  • value
  {
    "SquareId": 51,
    "Polygon": [
      { "longitude": 9.161512740279838, "latitude": 45.35868769924141 },
      { "longitude": 9.164513162265347, "latitude": 45.358683417093154 },
      { "longitude": 9.164507032776845, "latitude": 45.3565681059862 },
      { "longitude": 9.161506722580903, "latitude": 45.35657238782037 },
      { "longitude": 9.161512740279838, "latitude": 45.35868769924141 }
    ]
  }

To do so, navigate to the Explore screen and then to the details page for the topic myTopic. Click the Actions button and choose the option Insert Messages. A dialog appears, prompting for the data to insert.

Dialog to publish data to Apache Kafka

The input accepts an array of records, but for now we will insert just one. Use the following JSON as the data content and click the Insert Messages button:

[
  {
    "key": { "SquareId": 51 },
    "value": {
      "SquareId": 51,
      "Polygon": [
        { "longitude": 9.161512740279838, "latitude": 45.35868769924141 },
        { "longitude": 9.164513162265347, "latitude": 45.358683417093154 },
        { "longitude": 9.164507032776845, "latitude": 45.3565681059862 },
        { "longitude": 9.161506722580903, "latitude": 45.35657238782037 },
        { "longitude": 9.161512740279838, "latitude": 45.35868769924141 }
      ]
    }
  }
]

Reload the page, and the following record appears on the Data tab:

Kafka record with the data published earlier

Lenses has taken the JSON input and checked that the key content is valid. For the record value, it used the Avro schema to transform the input to Avro before publishing it. Any misalignment with the Avro schema returns an error to the user, and the data is not published to the Kafka topic.
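To make the idea of a misalignment with the schema concrete, here is a minimal Python sketch that walks a value against the Avro schema shown earlier and lists where it departs from it. This is a simplified model written for illustration, not the validation logic Lenses uses: the names mismatches, PRIMITIVES, and SCHEMA are hypothetical, only the int, double, string, record, and array types are handled, and accepting a whole number for a double field is an assumption.

```python
# Simplified model of checking a JSON value against an Avro schema.
# NOT Lenses' implementation; only the Avro types used in this article
# are covered, and any other primitive name raises a KeyError.
PRIMITIVES = {"int": (int,), "double": (int, float), "string": (str,)}

# The schema from this article, with the doc attributes left out.
SCHEMA = {
    "type": "record",
    "name": "milanoRecord",
    "fields": [
        {"name": "SquareId", "type": "int"},
        {"name": "Polygon", "type": {
            "type": "array",
            "items": {
                "type": "record",
                "name": "coordinates",
                "fields": [
                    {"name": "longitude", "type": "double"},
                    {"name": "latitude", "type": "double"},
                ],
            },
        }},
    ],
}


def mismatches(schema, datum, path="value"):
    """Return a list of messages describing where datum departs from schema."""
    if isinstance(schema, str):  # primitive type name such as "int"
        expected = PRIMITIVES[schema]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(datum, bool) or not isinstance(datum, expected):
            return [f"{path}: expected {schema}, got {type(datum).__name__}"]
        return []
    kind = schema["type"]
    if kind == "record":
        if not isinstance(datum, dict):
            return [f"{path}: expected record, got {type(datum).__name__}"]
        errors = []
        for field in schema["fields"]:
            child = f"{path}.{field['name']}"
            if field["name"] not in datum:
                errors.append(f"{child}: missing field")
            else:
                errors.extend(mismatches(field["type"], datum[field["name"]], child))
        return errors
    if kind == "array":
        if not isinstance(datum, list):
            return [f"{path}: expected array, got {type(datum).__name__}"]
        errors = []
        for i, item in enumerate(datum):
            errors.extend(mismatches(schema["items"], item, f"{path}[{i}]"))
        return errors
    # A wrapper such as {"type": "double"}: check against the inner type.
    return mismatches(kind, datum, path)


good = {"SquareId": 51, "Polygon": [{"longitude": 9.16, "latitude": 45.35}]}
bad = {"SquareId": 51, "Polygon": [{"longitude": 9.16}]}
print(mismatches(SCHEMA, good))  # no mismatches
print(mismatches(SCHEMA, bad))   # reports the missing latitude field
```

Running a check like this before pasting a large payload into the dialog can catch a missing field early; the error text Lenses itself returns will differ.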

Publish a record with a null value/key 

To insert records with null keys or values, omit the key or value field from the input JSON. The following content inserts a record with a null value.

[
  {
    "key": { "SquareId": 51 }
  }
]

Inserting a record with a null key requires the input JSON to omit the key field. If the target topic is compacted, records with null keys are rejected.

[
  {
    "value": {
      "SquareId": 51,
      "Polygon": [
        { "longitude": 9.161512740279838, "latitude": 45.35868769924141 },
        { "longitude": 9.164513162265347, "latitude": 45.358683417093154 },
        { "longitude": 9.164507032776845, "latitude": 45.3565681059862 },
        { "longitude": 9.161506722580903, "latitude": 45.35657238782037 },
        { "longitude": 9.161512740279838, "latitude": 45.35868769924141 }
      ]
    }
  }
]
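When the input for the dialog is generated by a script rather than typed by hand, the omit-the-field convention can be captured in a small helper. The following Python sketch is only an illustration: make_record and the _OMIT sentinel are hypothetical names, and the output is simply the JSON text you would paste into the Insert Messages dialog.

```python
import json

# Sentinel meaning "leave this field out of the record entirely".
# Omitting the field is what produces a null key or value on insert.
_OMIT = object()


def make_record(key=_OMIT, value=_OMIT):
    """Build one entry of the input array, omitting fields not supplied."""
    record = {}
    if key is not _OMIT:
        record["key"] = key
    if value is not _OMIT:
        record["value"] = value
    return record


# A record with a key and no value field (inserted with a null value).
null_value = make_record(key={"SquareId": 51})

# A record with a value and no key field (inserted with a null key).
null_key = make_record(value={"SquareId": 51, "Polygon": []})

# The dialog expects an array of records.
payload = json.dumps([null_value, null_key], indent=2)
print(payload)
```

A sentinel is used instead of None as the default so that "field not supplied" stays distinct from any value the caller passes in.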

Lenses allows inserting primitive types for a Kafka record, such as String, Integer, and Long. It does not support inserting records when the target topic storage involves Bytes, XML, or Time-window types (which result from aggregation).

Records with headers (available since version 4.2) 

At times, records need to be published with headers. Adding them to a record requires a headers field in the input provided to the Insert Messages dialog.

Publishing headers is optional and is triggered by the presence of the headers field.
Here are the rules for inserting headers:

  • The headers field must be a complex type (a JSON object)
[
  {
    "headers": {
       "h1": "...",
       "h2": "..."
    },
    "key" : "...",
    "value": "..."
  }
]
  • Each nested field name under headers becomes a header name
  • All non-floating-point numbers are stored as Long
[
  {
    "headers": {
      "h1": 1
    }
  }
]
  • Floating-point numbers are stored as Double
[
  {
    "headers": {
      "h1": 1.1
    },
    "..."
  }
]
  • A header field set to null will insert null content for the header
[
  {
    "headers": {
      "h1": null
    },
    "..."
  }
]
  • Text, Boolean and decimal fields are stored as String
  • Nested JSON at the second level, whether an object or an array, is stored as String. Here, both the h1 and h2 headers will contain their inner content
[
  {
    "headers": {
      "h1": {
        "a": "..."
      },
      "h2": [
        {
          "x": "..."
        }
      ]
    },
    "..."
  }
]
  • Storing headers as raw bytes is not supported.

If the rules above are not met, an error is returned.
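The type rules above can be summarised as a small mapping from the JSON value of a header to the type it is stored as. The Python sketch below models those rules only; it is not how Lenses implements them. The function name header_storage is hypothetical, and the exact text used for Boolean and nested values (here, whatever json.dumps produces) is an assumption, since the rules only state that they are stored as String.

```python
import json


def header_storage(value):
    """Return (stored_type, stored_value) for one parsed header value.

    Models the documented rules only; the string form chosen for
    booleans and nested JSON is an assumption of this sketch.
    """
    if value is None:
        return ("null", None)                 # null header content
    if isinstance(value, bool):               # test before int: bool is an int subclass
        return ("String", json.dumps(value))  # "true" / "false" (assumed form)
    if isinstance(value, int):
        return ("Long", value)                # non-floating-point number
    if isinstance(value, float):
        return ("Double", value)              # floating-point number
    if isinstance(value, str):
        return ("String", value)              # text
    if isinstance(value, (dict, list)):
        return ("String", json.dumps(value))  # nested JSON kept as text
    raise TypeError(f"unsupported header value: {value!r}")


# Parse a headers object exactly as it would appear in the dialog input.
headers = json.loads('{"a": 1, "b": 1.1, "c": null, "d": "text", "e": {"x": 1}}')
for name, value in headers.items():
    print(name, header_storage(value))
```

Note that the distinction between Long and Double depends on how the number is written in the JSON: 1 parses as an integer, while 1.0 parses as a floating-point number.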

Here is an example covering all the supported header features:

[
  {
    "headers":{
      "nullable": null,
      "int": 1,
      "double": 1.1,
      "text" : "example text",
      "boolean": true,
      "nested1": {
        "x": 1,
        "y": "example test",
        "z": {
          "a": 2
        }
      },
      "nested2": [
        {
          "x": 1,
          "y": "example test"
        },
        {
          "x": 2,
          "y": "example test"
        }
      ]  
    },
    "key": { "SquareId": 51 },
    "value": {
      "SquareId": 51,
      "Polygon": [
        { "longitude": 9.161512740279838, "latitude": 45.35868769924141 },
        { "longitude": 9.164513162265347, "latitude": 45.358683417093154 },
        { "longitude": 9.164507032776845, "latitude": 45.3565681059862 },
        { "longitude": 9.161506722580903, "latitude": 45.35657238782037 },
        { "longitude": 9.161512740279838, "latitude": 45.35868769924141 }
      ]
    }
  }
]

At the moment, headers of type Double and Long are rendered as String, a functionality gap that will be closed soon.

--
Last modified: September 13, 2024