Converting JSON to AVRO


JSON is a widely used way to represent and store data. However, on its own it is less than ideal for ensuring data consistency and data quality. JSON alone carries no description of the data shape: the fields and their types. Nothing stops the type of a field from changing between two consecutive messages. Imagine a first message with a field called tag holding a string value, followed by a second message where the same field holds a number.

{
    "tag" : "this-is-awesome"
}
{
    "tag" : 123
}
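JSON itself accepts both messages. As a minimal sketch in standard-library Python, with a hypothetical `field_types` helper that is not part of Lenses, this is how a consumer could detect that the tag field changed type between the two payloads:

```python
import json

# The two payloads from the example above, as they might arrive on a topic.
messages = ['{"tag": "this-is-awesome"}', '{"tag": 123}']

def field_types(raw: str) -> dict:
    """Map each top-level field of a JSON object to the Python type name of its value."""
    return {name: type(value).__name__ for name, value in json.loads(raw).items()}

types_per_message = [field_types(m) for m in messages]
print(types_per_message)  # [{'tag': 'str'}, {'tag': 'int'}]

# Collect every type observed per field; more than one type means the shape drifted.
observed = {}
for types in types_per_message:
    for name, type_name in types.items():
        observed.setdefault(name, set()).add(type_name)
drifting = sorted(name for name, seen in observed.items() if len(seen) > 1)
print(drifting)  # ['tag']
```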

Fields with null values add complexity. More often than not, a field whose value is null is simply left out of the payload.
When converting JSON to AVRO, fields that can be null need extra attention.

AVRO is a self-describing data representation: whether a given field exists, and whether it can be null, is known from its schema rather than by checking subsequent messages. Avoiding different resulting AVRO schemas just because the input JSON varies should be a well-established goal.
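In an AVRO schema, nullability is expressed with a union that includes "null", for example ["null","string"]. The sketch below is a deliberately tiny check covering only a few primitive types; the `accepts` helper and its Python type mapping are assumptions for illustration, not a full Avro validator. It shows why a plain "string" field rejects a null value while the union accepts it:

```python
from typing import Any

# Python types accepted for a few Avro primitive type names (a small subset of the spec).
PRIMITIVES = {
    "null": (type(None),),
    "string": (str,),
    "double": (float, int),  # JSON numbers without a fraction still fit a double
}

def accepts(avro_type: Any, value: Any) -> bool:
    """Return True if value fits avro_type; a JSON array of types is an Avro union."""
    if isinstance(avro_type, list):
        return any(accepts(branch, value) for branch in avro_type)
    if isinstance(value, bool):
        return False  # bool is an int subclass in Python, but none of these Avro types
    return isinstance(value, PRIMITIVES[avro_type])

print(accepts("string", None))              # False: a plain string cannot hold null
print(accepts(["null", "string"], None))    # True: the union declares nullability
print(accepts(["null", "string"], "tag1"))  # True
```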

Lenses schema inference 

Lenses learns about your topics and infers their schema. For performance reasons, this process is currently not continuous: repeatedly reading the topic data would impact the performance of the Kafka cluster. The schema is inferred from a sample of the data, not by reading the entire topic and merging all the schemas identified.
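The consequence of sampling can be sketched in a few lines of Python. `infer_fields` and its `sample_size` parameter are hypothetical and only mimic the idea of inferring from a sample; they are not how Lenses implements inference. Any field that first appears after the sample is simply not part of the result:

```python
import json
from itertools import islice

def infer_fields(messages, sample_size):
    """Infer top-level field names and JSON type names from the first sample_size messages.

    Messages after the sample are never read, so fields they introduce are missed.
    """
    fields = {}
    for raw in islice(messages, sample_size):
        for name, value in json.loads(raw).items():
            fields.setdefault(name, set()).add(type(value).__name__)
    return fields

topic = [
    '{"deviceid": "iot123", "temp": 54.98}',
    '{"deviceid": "iot124", "temp": 51.2}',
    '{"deviceid": "iot125", "temp": 50.1, "tags": "tag1,tag2"}',  # arrives later
]
schema = infer_fields(topic, sample_size=2)
print(sorted(schema))  # ['deviceid', 'temp']
```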

When it comes to JSON, inferring the full schema is never going to be perfect. Here are a few scenarios that highlight the limitations of JSON.

If I do not know about the field, it does not exist 

Imagine your topic currently contains messages with this payload structure:

{
    "deviceid" : "iot123",
    "temp" : 54.98,
    "humidity" : 32.43,
    "coords" : {
        "latitude" : 47.615694,
        "longitude" : -122.3359976
    }
}

A few days later, some of the messages received have the structure below. By then, Lenses has already inferred the schema from the data available, so the new field tags does not appear in it.

{
    "deviceid" : "iot123",
    "temp" : 54.98,
    "humidity" : 32.43,
    "coords" : {
        "latitude" : 47.615694,
        "longitude" : -122.3359976
    },
    "tags" : "tag1,tag2"
}

When processing the topic data with Lenses SQL streaming, the user needs to add the tags field to the schema manually. Later in the article, you will see how.
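A quick way to see the gap is to compare a new payload's top-level fields against the fields already known. This is a minimal sketch assuming the inferred schema can be represented as a plain set of top-level field names:

```python
import json

# Top-level field names known from the earlier payload structure.
known_fields = {"deviceid", "temp", "humidity", "coords"}

new_message = json.loads("""
{
    "deviceid": "iot123",
    "temp": 54.98,
    "humidity": 32.43,
    "coords": {"latitude": 47.615694, "longitude": -122.3359976},
    "tags": "tag1,tag2"
}
""")

# Fields present in the payload that the inferred schema has never seen.
unknown = sorted(set(new_message) - known_fields)
print(unknown)  # ['tags']
```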

Field can also be null in the future 

Another challenge arises when the sampled messages contain all the possible fields. Let us flip the earlier example: this time, all the messages contain the tags field. A few days later, some messages arrive without it. By then, the tags field has already been identified as a string that is not nullable.

When processing the topic data with SQL streaming, the topic schema in Lenses needs to be updated manually to mark the tags field as nullable. The examples below show how.
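Nullability suffers from the same sampling effect. In this sketch (the `nullable_fields` helper is hypothetical), a field counts as nullable only if some sampled message omits it or sets it to null, so a sample in which every message carries tags yields no nullable fields at all:

```python
import json

def nullable_fields(sample):
    """Return the fields that are absent or null in at least one sampled message.

    A field seen with a value in every sampled message looks non-nullable,
    even if later messages omit it.
    """
    parsed = [json.loads(raw) for raw in sample]
    all_fields = set().union(*parsed)  # union of the key sets of all messages
    return {name for name in all_fields if any(msg.get(name) is None for msg in parsed)}

sample = ['{"deviceid": "iot123", "tags": "tag1,tag2"}',
          '{"deviceid": "iot124", "tags": "tag3"}']
later = json.loads('{"deviceid": "iot125"}')  # arrives after inference

print(nullable_fields(sample))  # set(): tags looks non-nullable
print(later.get("tags"))        # None: a value the inferred schema does not allow
```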

Scenarios 

Scenario 1: Lenses does not know about the field, but it exists 

I have a topic with messages, some of which contain the field tags and some of which do not. When I try to use the field in a SQL processor, Lenses gives me this error:

Cannot create the Lenses SQL Processor.Unknown field [tags]. Available fields are:deviceid,temp,humidity,coords.

What can I do to get this working?

The tags field has not been identified by Lenses schema inference. As explained in the section on schema inference and its challenges, this is a realistic scenario. The fix is to update the schema Lenses holds for the topic so that it matches the latest changes.

First, let’s recreate the context.

Setting up 

We first create the topic and insert the data. To do so, navigate to the Lenses SQL Studio screen and run the queries below.

CREATE TABLE iot_1(
   _key string
  , deviceid string
  , temp double
  , humidity double
  , coords.latitude double
  , coords.longitude double
)
FORMAT(string, json);

INSERT INTO iot_1(
    _key
    , deviceid
    , temp
    , humidity
    , coords.latitude
    , coords.longitude
) VALUES
("iot123", "iot123", 54.98, 32.43, 47.615694, -122.3359976);
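The dotted column names in these statements, such as coords.latitude, correspond to the nested coords object seen in the JSON payloads earlier. Assuming that mapping, the row inserted above is equivalent to the nested JSON built by this small Python sketch (the `nest` helper is hypothetical and only illustrates the correspondence):

```python
import json

def nest(flat: dict) -> dict:
    """Turn dotted column names such as "coords.latitude" into nested JSON objects."""
    result = {}
    for path, value in flat.items():
        node = result
        *parents, leaf = path.split(".")
        for part in parents:
            node = node.setdefault(part, {})  # create intermediate objects on demand
        node[leaf] = value
    return result

# Column/value pairs from the INSERT statement above (value columns only, without _key).
row = {
    "deviceid": "iot123",
    "temp": 54.98,
    "humidity": 32.43,
    "coords.latitude": 47.615694,
    "coords.longitude": -122.3359976,
}
print(json.dumps(nest(row), indent=4))
```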

To check the data, run this query:

select * from iot_1; 

So far, we have replicated the scenario where at least one producer pushes data to iot_1 and Lenses has identified the schema. Next, let’s push a message containing a new field, tags. To do so, navigate to the iot_1 topic details, choose “Insert Messages” from the Actions drop-down, and paste the following in the dialog presented:

[
  {
    "key": "iot456",
    "value": {
      "deviceid": "iot456",
      "temp": 54.18,
      "humidity": 33.03,
      "coords": {
        "latitude": 47.615094,
        "longitude": -122.3356976
      },
      "tags": "tag1,tag2"
    }
  }
]
Figure: inserting a message with tags into the iot_1 topic.

Next, we want to create a processor using the field tags.

SET defaults.topic.autocreate=true;
INSERT INTO iot_1_processed
STORE VALUE AS AVRO
SELECT STREAM deviceid, tags
FROM iot_1

Running it reproduces the error reported above.

Solution 

To get your SQL processor working, you need to manually update the schema Lenses inferred so that it contains the tags field. Navigate to the iot_1 topic details and then to the schema tab. From the actions, choose Edit schemas and paste the following:

{
  "type": "record",
  "name": "record0",
  "fields": [
    {
      "name": "deviceid",
      "type": "string"
    },
    {
      "name": "temp",
      "type": "double"
    },
    {
      "name": "humidity",
      "type": "double"
    },
    {
      "name": "coords",
      "type": {
        "type": "record",
        "name": "record",
        "fields": [
          {
            "name": "latitude",
            "type": "double"
          },
          {
            "name": "longitude",
            "type": "double"
          }
        ]
      }
    },
    {
      "name": "tags",
      "type": ["null","string"]
    }
  ]
}

The change is this:

 {
    "name": "tags",
    "type": ["null","string"]
 }

Save the change and go back to the SQL processor. This time, Lenses allows you to reference the tags field, and the processor runs successfully.
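If you prefer to prepare the edited schema outside the UI before pasting it, the change amounts to appending one union-typed field to the record's field list. A minimal sketch with a hypothetical `add_nullable_field` helper, shown on a trimmed copy of the schema (coords omitted for brevity):

```python
import copy
import json

def add_nullable_field(schema: dict, name: str, avro_type: str) -> dict:
    """Return a copy of an Avro record schema with a ["null", avro_type] field appended.

    The schema is returned unchanged if a field with that name already exists.
    """
    updated = copy.deepcopy(schema)  # never mutate the caller's schema
    if all(field["name"] != name for field in updated["fields"]):
        updated["fields"].append({"name": name, "type": ["null", avro_type]})
    return updated

inferred = {
    "type": "record",
    "name": "record0",
    "fields": [
        {"name": "deviceid", "type": "string"},
        {"name": "temp", "type": "double"},
        {"name": "humidity", "type": "double"},
    ],
}
updated = add_nullable_field(inferred, "tags", "string")
print(json.dumps(updated["fields"][-1]))  # {"name": "tags", "type": ["null", "string"]}
```

The Avro specification also allows "default": null on such a field (a union default must match the first branch, here "null"), which helps when readers using the new schema process records written without the field. The schema in this article omits it, and so does the sketch.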

Scenario 2: I have messages with a null field 

I am trying to convert my data to AVRO, but I get this error on my SQL processor:

    Error serializing Avro message Caused by: java.lang.NullPointerException: null of ...

What can I do to get this working?

Lenses has identified the tags field (the field in question), but not as one that can contain null values. As explained already, schema inference cannot always pick this up. Before jumping to the solution, let’s recreate the context.

Setting up 

We start this tutorial by creating a topic and inserting data as it would come from producers. To do so, navigate to the Lenses SQL Studio screen and run the queries below.

CREATE TABLE iot_2(
   _key string
  , deviceid string
  , temp double
  , humidity double
  , coords.latitude double
  , coords.longitude double
  , tags string
)
FORMAT(string, json);

INSERT INTO iot_2(
    _key
    , deviceid
    , temp
    , humidity
    , coords.latitude
    , coords.longitude
    , tags
) VALUES
("iot123", "iot123", 54.98, 32.43, 47.615694, -122.3359976, "tag1,tag2");   

Let’s check the data by running this query:

select * from iot_2;

Next, let’s insert a message without the tags field. Navigate to the iot_2 topic details and, from the Actions options, choose Insert Messages. Paste the content below; notice that there is no field named tags:

[
  {
    "key": "iot456",
    "value": {
      "deviceid": "iot456",
      "temp": 54.18,
      "humidity": 33.03,
      "coords": {
        "latitude": 47.615094,
        "longitude": -122.3356976
      }
    }
  }
]

Now, let’s create a processor which converts to Avro:

SET defaults.topic.autocreate=true;
INSERT INTO iot_2_fails
STORE VALUE AS AVRO
SELECT STREAM 
    deviceid
    , tags
FROM iot_2

The processor fails when it processes the second message: the tags field is now null, but the schema does not allow null values.
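The failure can be reproduced outside Lenses with a minimal sketch. It assumes, as described above, that a field missing from the payload is read as null, and it uses trimmed copies of the two iot_2 messages (only deviceid and tags). The `project` and `allows_null` helpers are hypothetical:

```python
import json

# Trimmed iot_2 payloads: the second one omits tags entirely.
payloads = [
    '{"deviceid": "iot123", "tags": "tag1,tag2"}',
    '{"deviceid": "iot456"}',
]

def project(raw: str, fields: list) -> dict:
    """Select fields from a JSON payload; a missing field becomes None (null)."""
    doc = json.loads(raw)
    return {name: doc.get(name) for name in fields}

def allows_null(avro_type) -> bool:
    """An Avro field accepts null only if its type is "null" or a union containing "null"."""
    return avro_type == "null" or (isinstance(avro_type, list) and "null" in avro_type)

records = [project(p, ["deviceid", "tags"]) for p in payloads]
for tags_type in ("string", ["null", "string"]):
    ok = all(r["tags"] is not None or allows_null(tags_type) for r in records)
    # "string" fails on the second record; ["null", "string"] accepts both.
    print(tags_type, "->", "ok" if ok else "fails on a null tags value")
```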

Solution 

There are two ways to address this.

Field as nullable 

If you do not know whether the field will always be present in the future, you can write the query so that the field is nullable on the output topic. The solution is to mark the input field as nullable, regardless of its schema. See as_nullable(tags) in the SQL below:

SET defaults.topic.autocreate=true;
INSERT INTO iot_2_as_nullable
STORE VALUE AS AVRO
SELECT STREAM 
    deviceid
    , as_nullable(tags) as tags
FROM iot_2

The resulting topic schema will be:

{
  "type": "record",
  "name": "record",
  "fields": [
    {
      "name": "deviceid",
      "type": "string"
    },
    {
      "name": "tags",
      "type": [
        "null",
        "string"
      ]
    }
  ]
}

The output topic will contain two records, with the second one having a null value for tags.
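The effect of as_nullable(tags) on the output schema, "string" becoming ["null","string"], can be sketched as a type transformation. This only illustrates the result shown above, under the assumption that a type which already accepts null is left unchanged; it is not Lenses' implementation, and `as_nullable_type` is a hypothetical name:

```python
def as_nullable_type(avro_type):
    """Wrap an Avro type in a union with "null" unless it already accepts null."""
    if avro_type == "null":
        return avro_type
    if isinstance(avro_type, list):  # already a union: add "null" only if missing
        return avro_type if "null" in avro_type else ["null", *avro_type]
    return ["null", avro_type]

print(as_nullable_type("string"))            # ['null', 'string']
print(as_nullable_type(["null", "string"]))  # ['null', 'string'] (unchanged)
```

Putting "null" first in the union matters if a "default": null is ever added, because Avro requires a union's default to match its first branch.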

Update the schema to make the field nullable 

To fix the processor, you need to update the source topic schema to mark the tags field as nullable. Navigate to the iot_2 topic and then to the schema tab. From the Actions options, choose Edit schema and paste the following content for the value.

{
  "type": "record",
  "name": "record0",
  "fields": [
    {
      "name": "deviceid",
      "type": "string"
    },
    {
      "name": "temp",
      "type": "double"
    },
    {
      "name": "humidity",
      "type": "double"
    },
    {
      "name": "coords",
      "type": {
        "type": "record",
        "name": "record",
        "fields": [
          {
            "name": "latitude",
            "type": "double"
          },
          {
            "name": "longitude",
            "type": "double"
          }
        ]
      }
    },
    {
      "name": "tags",
      "type": ["null","string"]
    }
  ]
}

Here is how it looks from the Lenses UI:

Figure: updating the schema of the iot_2 topic to make the tags field nullable.

The change is this:

 {
    "name": "tags",
    "type": ["null","string"]
 }

Save the change and go back to the SQL processors page.

Use this code to create a new processor:

SET defaults.topic.autocreate=true;
INSERT INTO iot_2_works
STORE VALUE AS AVRO
SELECT STREAM 
    deviceid
    , tags
FROM iot_2

Scenario 3: I don’t control the JSON payload 

I receive JSON messages and don’t know the entire schema, but I only want to process the fields I care about. How can that work without a schema for the entire JSON?

For stream processing, Lenses only needs to know about the fields you target.

Consider a scenario where messages of this type are received from the upstream system:

{
    "InvoiceNumber": "GBPGB011",
    "TotalSalePrice": 89000,
    "SaleDate": "2015-04-30T00:00:00",
    "Customer": {
        "Name": "Wonderland Wheels",
        "CreditRisk": false,
        "Reseller": true,
        "..."
    },
    "Address": {
        "FirstLine" : "21A",
        "SecondLine" : "Buckingham Palace Road",
        "Town": "London",
        "PostCode": "E7 4BR",
        "CountryName": "United Kingdom",
        "CountryISO": "GBR       ",
        "..."
    },
    "CustomerComments": [
        "Excellent",
        "Wonderful",
        "Superb"
    ],
    "Salesdetails": [
        {
            "LineItem": 1,
            "Make": "Porsche",
            "Model": "944",
            "SellingPrice": 8500,
            "..."
        },
        {
            "LineItem": 2,
            "Make": "Bentley",
            "Model": "Flying Spur",
            "..."
        }
    ],
    "id": "f58a70dc-f107-d3ba-acda-02f39893eb44",
    "_rid": "molfALBK0z8BAAAAAAAAAA==",
    "_etag": "00000000-0000-0000-c2fa",
    "_attachments": "attachments/",
    "_ts": 1549993225,
    "..."
}

Imagine these fields are required:

  • InvoiceNumber
  • SaleDate
  • Customer.Name
  • Address.FirstLine (possibly null)
  • Address.SecondLine (possibly null)
  • Address.Town
  • Address.PostCode
  • Address.CountryName
  • Salesdetails.Make
  • Salesdetails.Model
  • Salesdetails.SellingPrice
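Only these paths matter to the processing; everything else in the payload (_ts, CreditRisk, LineItem and so on) can be ignored. As an illustration outside Lenses, the hypothetical `get_path` helper below follows dotted paths through a trimmed copy of the payload and, for arrays such as Salesdetails, returns one value per element:

```python
import json

def get_path(node, path):
    """Follow a dotted path through nested JSON; over an array, apply it to each element.

    Missing fields yield None, mirroring JSON payloads that omit null fields.
    """
    if isinstance(node, list):
        return [get_path(item, path) for item in node]
    head, _, rest = path.partition(".")
    if not isinstance(node, dict) or head not in node:
        return None
    value = node[head]
    return get_path(value, rest) if rest else value

# Trimmed copy of the invoice payload above; fields not listed are irrelevant here.
payload = json.loads("""
{
  "InvoiceNumber": "GBPGB011",
  "SaleDate": "2015-04-30T00:00:00",
  "Customer": {"Name": "Wonderland Wheels", "CreditRisk": false},
  "Address": {"Town": "London", "PostCode": "E7 4BR"},
  "Salesdetails": [
    {"LineItem": 1, "Make": "Porsche", "Model": "944"},
    {"LineItem": 2, "Make": "Bentley", "Model": "Flying Spur"}
  ],
  "_ts": 1549993225
}
""")
wanted = ["InvoiceNumber", "Customer.Name", "Address.Town", "Salesdetails.Make"]
projected = {path: get_path(payload, path) for path in wanted}
print(projected)
```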

To process the data, it is enough for the schema to cover the fields above rather than the entire set of possible fields. The topic schema should then be:

{
  "type" : "record",
  "name" : "Invoice",
  "namespace" : "io.lenses.core.sql.common.metrics",
  "fields" : [ {
    "name" : "InvoiceNumber",
    "type" : "string"
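  }, {
    "name" : "SaleDate",
    "type" : "string"
  }, {
    "name" : "Customer",
    "type" : {
      "type" : "record",
      "name" : "Customer",
      "fields" : [ {
        "name" : "Name",
        "type" : "string"
      } ]
    }
  }, {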
    "name" : "Address",
    "type" : {
      "type" : "record",
      "name" : "Address",
      "fields" : [ {
        "name" : "FirstLine",
        "type" : [ "null", "string" ]
      }, {
        "name" : "SecondLine",
        "type" : [ "null", "string" ]
      }, {
        "name" : "Town",
        "type" : "string"
      }, {
        "name" : "PostCode",
        "type" : "string"
      }, {
        "name" : "CountryName",
        "type" : "string"
      } ]
    }
  }, {
    "name" : "Salesdetails",
    "type" : {
      "type" : "array",
      "items" : {
        "type" : "record",
        "name" : "SalesDetails",
        "fields" : [ {
          "name" : "Make",
          "type" : "string"
        }, {
          "name" : "Model",
          "type" : "string"
        }, {
          "name" : "SellingPrice",
          "type" : "double"
        } ]
      }
    }
  } ]
}
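To double-check that a hand-written schema covers every required field, the schema can be flattened into dotted paths and compared with the list. A minimal sketch with a hypothetical `leaf_paths` helper, shown on a trimmed schema; any required path without a matching field, for instance because a whole nested record was left out, appears in `missing`:

```python
def leaf_paths(avro_type, prefix=""):
    """List dotted paths to the leaf fields of an Avro record schema.

    Records are descended into, arrays are followed through "items", and unions
    are searched for a complex (dict) branch. Anything else counts as a leaf.
    """
    if isinstance(avro_type, list):  # union, e.g. ["null", "string"]
        complex_branches = [t for t in avro_type if isinstance(t, dict)]
        return leaf_paths(complex_branches[0], prefix) if complex_branches else [prefix]
    if isinstance(avro_type, dict) and avro_type.get("type") == "array":
        return leaf_paths(avro_type["items"], prefix)
    if isinstance(avro_type, dict) and avro_type.get("type") == "record":
        paths = []
        for field in avro_type["fields"]:
            name = f"{prefix}.{field['name']}" if prefix else field["name"]
            paths.extend(leaf_paths(field["type"], name))
        return paths
    return [prefix]

schema = {
    "type": "record", "name": "Invoice",
    "fields": [
        {"name": "InvoiceNumber", "type": "string"},
        {"name": "Address", "type": {"type": "record", "name": "Address", "fields": [
            {"name": "FirstLine", "type": ["null", "string"]},
            {"name": "Town", "type": "string"}]}},
        {"name": "Salesdetails", "type": {"type": "array", "items": {
            "type": "record", "name": "SalesDetails",
            "fields": [{"name": "Make", "type": "string"}]}}},
    ],
}
required = ["InvoiceNumber", "Address.FirstLine", "Address.Town", "Salesdetails.Make"]
missing = sorted(set(required) - set(leaf_paths(schema)))
print(missing)  # []
```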
--
Last modified: September 26, 2024