Transformations

This is the basic and most common use case: transforming an incoming record to reshape the message using any of the capabilities provided by the SELECT statement. That includes the following, each illustrated in the short sketch after the list:

  • Selecting specific fields.
  • Applying supported functions to derive new fields.
  • Filtering the records based on your criteria.
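
Each capability maps onto a clause of the same streaming query. The sketch below is illustrative only: the filtered_readings target topic and the humidity threshold are made up, and it assumes the sensors payload shown next:

INSERT INTO filtered_readings
SELECT STREAM
      device_id
    , (temp * 1.8 + 32) AS temp_fahrenheit
FROM  sensors
WHERE humidity > 10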

Here is an example of sensor data sent over Apache Kafka:

{
  "device_id": 1,
  "ip": "191.35.83.75",
  "timestamp": 1447886791,
  "lat": 22,
  "long": 82,
  "scale": "Celsius",
  "temp": 22.0,
  "device_name": "sensor-AbC-193X",
  "humidity": 15,
  "zipcode": 95498
}

The requirement is to consider only those entries where the temperature is equal to or above 30 degrees Celsius. Furthermore, the temperature value should be expressed in Fahrenheit. Finally, the only other fields needed are ip, lat and long. This continuous query processes the records as they arrive and pushes new records downstream:

INSERT INTO hot_sensors
SELECT STREAM
      ip
    , lat
    , long
    , (temp * 1.8 + 32) AS metric
FROM  sensors
WHERE temp >= 30

The Value component of the new record would look similar to this:

{
  "ip": "191.35.83.75",
  "lat": 22,
  "long": 82,
  "metric": 71.6
}

In this example there was no reference to the Key component. As detailed here, a record consists of Key, Value and Metadata. When working with SQL for streaming, the engine does not yet support changing the key. For the example above, if the record received from the sensors topic contains a Key, its value is propagated unchanged to the record sent to the hot_sensors topic.
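
As an illustration, suppose the incoming record carried the device identifier as its Key (the value 1 below is purely hypothetical); the outgoing record keeps it as-is:

Incoming record (topic: sensors)
  Key:   1
  Value: { "device_id": 1, "ip": "191.35.83.75", ... }

Outgoing record (topic: hot_sensors)
  Key:   1          (propagated unchanged)
  Value: { "ip": "191.35.83.75", "lat": 22, "long": 82, "metric": ... }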

Accessing nested fields is fully supported by the engine. If the incoming message data were to look like the following sample:

{
  "description":"Sensor embedded in exhaust pipes in the ceilings",
  "ip":"204.116.105.67",
  "id":5,
  "temp":22,
  "c02_level":1574,
  "geo":{
    "lat":35.93,
    "long":-85.46
    }
}

then the SQL code would have to be changed to:

INSERT INTO `new_sensors`
SELECT STREAM
    ip
    , geo.lat
    , geo.long
    , (temp * 1.8 + 32) AS metric
FROM  `sensors`

The result of applying this query will be AVRO records with the following format:

{
   "ip":"204.116.105.67",
   "lat":35.93,
   "long":-85.46,
   "temp":71.6
}
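
The flattened field names above are taken from the selected paths (geo.lat becomes lat). If different names are preferred, the same AS aliasing used for metric should work on nested fields as well (assuming the engine accepts an alias on a field projection); latitude and longitude below are illustrative names only:

INSERT INTO `new_sensors`
SELECT STREAM
    ip
    , geo.lat  AS latitude
    , geo.long AS longitude
    , (temp * 1.8 + 32) AS metric
FROM  `sensors`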

Selecting a complex field is also possible; its entire structure is copied over to the result. Given a query like this:

INSERT INTO `new_sensors`
SELECT STREAM
    ip
    , geo
    , temp
FROM  `sensors`

then the new record will have its Value shaped like this:

{
  "ip":"204.116.105.67",
  "geo":{
    "lat":35.93,
    "long":-85.46
    },
  "temp":40
}

These examples are applicable regardless of the topic storage format. Therefore, working with an AVRO or JSON payload does not change the SQL the user has to write.