Transformations

This is the most basic and most common use case: transforming incoming records to reshape the messages using any of the capabilities the SELECT statement provides. These include:

  • Selecting specific fields
  • Applying supported functions to compute new fields
  • Filtering records based on your criteria

Here is an example of sensor data sent over Apache Kafka:

{
  "device_id": 1,
  "ip": "191.35.83.75",
  "timestamp": 1447886791,
  "lat": 22,
  "long": 82,
  "scale": "Celsius",
  "temp": 22.0,
  "device_name": "sensor-AbC-193X",
  "humidity": 15,
  "zipcode": 95498
}

The requirement is to keep only the entries where the temperature is equal to or above 30 degrees Celsius. Furthermore, the temperature must be expressed in Fahrenheit, and the only other fields needed are ip, lat and long. This continuous query processes records as they arrive and pushes the new records downstream:

INSERT INTO hot_sensors
SELECT
      ip
    , lat
    , long
    , (temp * 1.8 + 32) AS metric
FROM  sensors
WHERE temp >= 30

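The sample record above, at 22 degrees Celsius, does not meet the filter condition and would be dropped. Ignoring the filter for illustration, the Value component of a new record built from that sample would look similar to this: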

{
  "ip": "191.35.83.75",
  "lat": 22,
  "long": 82,
  "metric": 71.6
}
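The following minimal Python sketch mimics what this query does to a single record Value, assuming the Value has already been deserialized into a dict. The helper name `transform_hot_sensor` is hypothetical and not part of the engine; it applies the stated requirement (30 degrees Celsius or more), keeps ip, lat and long, and computes the Fahrenheit value as metric.

```python
from typing import Any, Optional


def transform_hot_sensor(value: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Hypothetical per-record equivalent of the hot_sensors query."""
    # WHERE temp >= 30: records below the threshold produce no output record.
    if value["temp"] < 30:
        return None
    # SELECT ip, lat, long, (temp * 1.8 + 32) AS metric
    return {
        "ip": value["ip"],
        "lat": value["lat"],
        "long": value["long"],
        "metric": value["temp"] * 1.8 + 32,
    }


sample = {
    "device_id": 1, "ip": "191.35.83.75", "timestamp": 1447886791,
    "lat": 22, "long": 82, "scale": "Celsius", "temp": 22.0,
    "device_name": "sensor-AbC-193X", "humidity": 15, "zipcode": 95498,
}

print(transform_hot_sensor(sample))                    # None: 22.0 is below 30
print(transform_hot_sensor({**sample, "temp": 35.0}))  # metric is 95.0
```

A streaming engine applies this logic continuously to every record as it arrives; the sketch only captures the per-record semantics.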

In this example there was no reference to the Key component. As described elsewhere in this documentation, a record consists of a Key, a Value and Metadata. When working with SQL for streaming, the engine does not yet support changing the Key. For the example above, if a record received from the sensors topic has a Key, that Key is propagated unchanged to the record sent to the hot_sensors topic.
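The Key behaviour can be pictured with the following Python sketch. The `Record` class and the `apply_query` and `hot_sensor` helpers are hypothetical and model only the Key and Value (Metadata is left out): the query logic rewrites the Value, while the Key is copied unchanged into the output record.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class Record:
    """Hypothetical record model holding only a Key and a Value."""
    key: Any
    value: dict[str, Any]


ValueTransform = Callable[[dict[str, Any]], Optional[dict[str, Any]]]


def apply_query(record: Record, transform: ValueTransform) -> Optional[Record]:
    """Run a Value transformation; the Key is propagated as-is."""
    new_value = transform(record.value)
    if new_value is None:  # the record was filtered out
        return None
    return Record(key=record.key, value=new_value)


def hot_sensor(value: dict[str, Any]) -> Optional[dict[str, Any]]:
    # Same filter and computed field as the hot_sensors query (lat/long omitted).
    if value["temp"] < 30:
        return None
    return {"ip": value["ip"], "metric": value["temp"] * 1.8 + 32}


incoming = Record(key=1, value={"ip": "191.35.83.75", "temp": 35.0})
outgoing = apply_query(incoming, hot_sensor)
print(outgoing.key)  # 1
```

The Key never passes through `transform`, which mirrors the statement that the engine does not yet let a query change it.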

Accessing nested fields is fully supported by the engine. If the incoming message data looked like the sample below:

{
  "description":"Sensor embedded in exhaust pipes in the ceilings",
  "ip":"204.116.105.67",
  "id":5,
  "temp":22,
  "c02_level":1574,
  "geo":{
    "lat":35.93,
    "long":-85.46
  }
}

then the SQL code would have to be changed to:

INSERT INTO `new_sensors`
SELECT
    ip
    , geo.lat
    , geo.long
    , (temp * 1.8 + 32) AS metric
FROM  `sensors`
WHERE _ktype = 'LONG'
AND _vtype = AVRO

The result of applying this query will be AVRO records with the following format:

{
   "ip":"204.116.105.67",
   "lat":35.93,
   "long":-85.46,
   "metric":71.6
}
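Resolving a dotted path such as geo.lat amounts to walking the nested structure one field at a time. Below is a minimal Python sketch, assuming the Value is a decoded dict; the helpers `get_path` and `transform_nested` are hypothetical, and, following the output shown above, each nested field is emitted under the last segment of its path.

```python
from functools import reduce
from typing import Any


def get_path(value: dict[str, Any], path: str) -> Any:
    """Resolve a dotted path such as 'geo.lat' through nested dicts."""
    return reduce(lambda node, name: node[name], path.split("."), value)


def transform_nested(value: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical equivalent of: SELECT ip, geo.lat, geo.long, (temp * 1.8 + 32) AS metric."""
    out: dict[str, Any] = {}
    for path in ("ip", "geo.lat", "geo.long"):
        # Name each output field after the last segment of its path: geo.lat -> lat.
        out[path.split(".")[-1]] = get_path(value, path)
    out["metric"] = value["temp"] * 1.8 + 32
    return out


sensor = {
    "description": "Sensor embedded in exhaust pipes in the ceilings",
    "ip": "204.116.105.67",
    "id": 5,
    "temp": 22,
    "c02_level": 1574,
    "geo": {"lat": 35.93, "long": -85.46},
}

print(transform_nested(sensor))
```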

Selecting a complex field is also possible, in which case its entire structure is copied over to the result. Given the same input payload, consider this query:

INSERT INTO `new_sensors`
SELECT
    ip
    , geo
    , temp
FROM  `sensors`
WHERE _ktype = 'LONG'
AND _vtype = AVRO

The new record will then have a Value shaped like this:

{
  "ip":"204.116.105.67",
  "geo":{
    "lat":35.93,
    "long":-85.46
  },
  "temp":22
}
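Selecting a complex field carries the whole nested structure rather than a single leaf. In a Python sketch over a decoded dict (the helper `transform_complex` is hypothetical), that is a copy of the geo sub-dictionary. `copy.deepcopy` is used so the output never shares mutable state with the input; that is a choice of this sketch, not something the engine documents.

```python
import copy
from typing import Any


def transform_complex(value: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical equivalent of: SELECT ip, geo, temp."""
    return {
        "ip": value["ip"],
        # The whole geo structure (lat and long) is carried over as one field.
        "geo": copy.deepcopy(value["geo"]),
        "temp": value["temp"],
    }


sensor = {
    "description": "Sensor embedded in exhaust pipes in the ceilings",
    "ip": "204.116.105.67",
    "id": 5,
    "temp": 22,
    "c02_level": 1574,
    "geo": {"lat": 35.93, "long": -85.46},
}

result = transform_complex(sensor)
print(result)
```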

The examples above apply regardless of the topic's storage format, so working with an AVRO or a JSON payload does not change the user's code.