Kafka Connect Query Language

Kafka Connect is the tool to use for moving data in and out of Apache Kafka, and we provide 25+ connectors. Each source or sink has its own specific connection details. Working with our clients, we realized that the way Connect expects configuration does not allow for easy connector management. To give you an example, let’s take a JMS sink: we want to push data from Kafka topics to JMS topics, so we need a mapping between the Kafka source topics and the JMS target topics. Add a secure JMS connection on top, and suppose each JMS payload should carry only a few fields from the Kafka record. The way the majority of connectors are written makes it a nightmare to configure and manage such instances. Keeping things simple and making Kafka accessible is our motto, so we provide a SQL-like syntax to describe the above. It is part of the Lenses SQL engine, and we call it KCQL (SQL for Kafka Connect).

Here is an example of the KCQL syntax for moving data from Kafka to JMS:

INSERT INTO /sensors
SELECT sensorId AS id,
       minTemperature,
       maxTemperature,
       avgTemperature
FROM sensors_data
STOREAS JSON
WITHTYPE TOPIC

This SQL-like syntax describes and drives the work the JMS sink will perform: it picks the listed fields from the Kafka topic sensors_data and pushes them to the JMS topic /sensors. The resulting JMS message payload will be JSON. Easy!
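
In a connector configuration, the statement above is passed through the connector’s KCQL property. Below is a minimal sketch of a standalone JMS sink configuration; the connect.jms.kcql property name and the connector class are assumptions taken from the JMS sink documentation, so check your connector’s docs for the exact keys:

# Sketch only: property names other than the standard Connect keys may differ per connector version
name=jms-sink-sensors
connector.class=com.datamountaineer.streamreactor.connect.jms.sink.JMSSinkConnector
topics=sensors_data
connect.jms.kcql=INSERT INTO /sensors SELECT sensorId AS id, minTemperature, maxTemperature, avgTemperature FROM sensors_data STOREAS JSON WITHTYPE TOPIC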

“Keep things simple and make Kafka accessible is our motto.”

All our connectors support the following:

  • AVRO and JSON support
  • Multiple Kafka topic to target mappings (for sinks), as shown in the example after this list
  • Multiple source to target Kafka topic mappings (for sources)
  • Field selection, extraction, and filtering
  • Auto creation & auto evolution for the target storage
  • Error handling policies
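
Multiple mappings are typically expressed by chaining KCQL statements, separated by semicolons, inside a single configuration property. For example, the following sketch (topic and target names are hypothetical) maps two Kafka topics to two targets:

INSERT INTO ordersTable SELECT * FROM orders;
INSERT INTO customersTable SELECT id, name FROM customers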

KCQL Syntax

Here is the full syntax our connectors share:

[INSERT|UPSERT]
 INTO   $TARGET
SELECT *|columns           (i.e col1,col2 | col1 AS column1,col2, field1.field2.field3, field1.field2.field3 as f3)
FROM   $TOPIC_NAME
  [ IGNORE field1, field2,.. ]
  [ AUTOCREATE ]
  [ WITHSTRUCTURE ]
  [ PK columns ]
  [ WITHTARGET = target]
  [ AUTOEVOLVE ]
  [ BATCH = N ]
  [ CAPITALIZE ]
  [ INITIALIZE ]
  [ PROJECTTO versionNumber]
  [ PARTITIONBY cola[,colb] ]
  [ DISTRIBUTEBY cola[,colb] ]
  [ CLUSTERBY value1[,value2] INTO bucketsNumber BUCKETS ]
  [ TIMESTAMP cola|sys_current ]
  [ TIMESTAMPUNIT = timestampUnit]
  [ WITHFORMAT TEXT|AVRO|JSON|BINARY|OBJECT|MAP ]
  [ WITHUNWRAP ]
  [ STOREAS   $YOUR_TYPE([key=value, .....]) ]
  [ WITHTAG (field1, tag2= constant, field2 as tag2) ]
  [ INCREMENTALMODE = incrementalMode ]
  [ WITHTYPE type ]
  [ WITHDOCTYPE = docType  ]
  [ WITHINDEXSUFFIX = suffix ]
  [ TTL = ttlType ]
  [ WITHCONVERTER = converterClass ]
  [ WITHJMSSELECTOR = jmsSelector ]
  [ WITHKEY (field1, field2.field3) ]
  [ KEYDELIM = value ]

Of course, the non-SQL-specific keywords are not all applicable at the same time. Each connector’s documentation details the keywords it supports.
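
As an illustration, a single statement usually combines only a handful of these keywords. The sketch below uses hypothetical topic, table, and column names, together with a primary key and auto-creation/auto-evolution of the target table:

INSERT INTO orders
SELECT orderId, customerId, total
FROM orders_topic
AUTOCREATE
PK orderId
AUTOEVOLVE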

The SELECT mode is useful for target systems that do not support the concept of namespaces (key-value stores such as Hazelcast or Redis):

SELECT *|columns
FROM   $TOPIC_NAME
  [ IGNORE field1, field2,.. ]
  [ PK columns ]
  [ WITHSTRUCTURE ]
  [ WITHFORMAT  JSON|AVRO|BINARY ]
  [ WITHUNWRAP ]
  [ WITHGROUP theConsumerGroup]
  [ WITHOFFSET (offset1)(offset2) ]
  [ WITHPARTITION (partition),[(partition, offset)] ]
  [ SAMPLE $RECORDS_NUMBER EVERY $SLIDE_WINDOW ]
  [ LIMIT limitValue ]
  [ STOREAS $YOUR_TYPE([key=value, .....]) ]
  [ WITHTAG (field1, tag2= constant, field2 as tag2) ]
  [ INCREMENTALMODE = incrementalMode ]
  [ WITHDOCTYPE = docType  ]
  [ WITHINDEXSUFFIX = suffix ]
  [ WITHCONVERTER = converterClass ]

Here is a Redis Connector example:

"connect.redis.sink.kcql": "INSERT INTO sensorTimeseries SELECT sensorID, temperature, humidity FROM sensorsTopic STOREAS SortedSet (score=ts)",

The above KCQL instructs the Redis Kafka sink connector to select the three fields and store the time-series data in a Redis Sorted Set, using the incoming field ts as the score for each message.
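
For completeness, that KCQL statement sits alongside the rest of the connector’s configuration. Below is a sketch of a standalone Redis sink configuration; apart from connect.redis.sink.kcql, shown above, the property names and connector class are assumptions, so verify them against the Redis connector documentation:

# Sketch only: host/port property names and the connector class may differ per connector version
name=redis-sink-sensors
connector.class=com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector
topics=sensorsTopic
connect.redis.host=localhost
connect.redis.port=6379
connect.redis.sink.kcql=INSERT INTO sensorTimeseries SELECT sensorID, temperature, humidity FROM sensorsTopic STOREAS SortedSet (score=ts)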