You are viewing documentation for an older version of the platform.

Stream Reactor install

The Kafka Connectors are bundled into the Stream Reactor open source project. The components are built around Apache Kafka. They rely on the Kafka Brokers, ZooKeeper and, optionally, the Schema Registry.

Download: Stream Reactor - Kafka 1.0, Stream Reactor - Kafka 0.11

The latest version can be downloaded on GitHub

Stream Reactor Docker Install 

The Stream Reactor is packaged into the Kafka Development Docker image, which is ideal for developers getting started. The image contains Lenses and allows you to deploy and monitor the connectors contained in the Stream Reactor. We recommend you start with this.

Kafka Connect Helm Charts 

Helm is a package manager for Kubernetes. Helm charts are available for Connectors at

Even if you use Docker, your landscape can still be complex: you have to handle multi-tenancy, inspect and manage Docker files, and deal with service discovery, environment variables and promotion to production. Helm and the Landscaper help you manage this.

To add the Stream Reactor Helm charts simply add our repository to your Helm instance:

helm repo add lenses

Kafka Connect Install 

To install Kafka Connect follow the Kafka documentation instructions.

You will also need the Kafka Brokers, Zookeeper and optionally the Schema Registry to be installed and running. If you are using the Schema Registry, you must set the key and value converters (key.converter and value.converter) in the worker properties.

If you require Schema Registry support, download it from Confluent.
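
As a minimal sketch of the worker settings for the Schema Registry, the helper below appends Avro converter entries to a worker properties file. The function name, the file path and the Schema Registry URL are placeholders chosen for illustration; io.confluent.connect.avro.AvroConverter is the converter class shipped by Confluent. The helper only appends, so remove any existing converter entries from the file first.

```shell
# Append Avro converter settings to a Connect worker properties file.
# $1: worker properties file (placeholder path)
# $2: Schema Registry URL (placeholder)
add_avro_converters() {
  worker_props="$1"
  registry_url="$2"
  cat >> "$worker_props" <<EOF
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=$registry_url
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=$registry_url
EOF
}

# Example with hypothetical values:
# add_avro_converters config/connect-distributed.properties http://localhost:8081
```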

Stream Reactor Install 

To install in a production environment or on Connect clusters, download the entire release, or manually pick only the connectors you are interested in.

Unpack the archive:

#Stream reactor release
mkdir stream-reactor
tar xvf stream-reactor-x.x.x-x.x.x.tar.gz -C stream-reactor

Within the unpacked directory you will find the following structure:

    |-- LICENSE
    |-- bin
    |-- conf
    `-- libs

The libs folder contains all the Stream Reactor Connector jars.

Each connector jar needs to be available to each worker in the Kafka Connect Cluster. The best way to achieve this is via the isolated classpath loader introduced into Connect in Kafka version 0.11.

  1. Create a folder called plugins/lib and place the Stream Reactor connector jars you want inside.
  2. Set plugin.path in the worker properties file to the parent plugins folder.
  3. Restart the Connect worker.

#  create folder
mkdir -p plugins/lib

# copy in the jar
cp kafka-connect-*.jar plugins/lib

# add plugins path to the worker properties file, ensure this is the only uncommented entry
echo $PWD/plugins >

# restart the workers
bin/ config/
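
The same steps can be sketched as a small helper. This is an illustration under stated assumptions, not the project's own script: the function name and all three arguments are hypothetical, and the restart command in the final comment assumes a distributed worker started with the scripts shipped in the Apache Kafka distribution.

```shell
# Copy connector jars into a plugin folder and point plugin.path at it.
# $1: libs folder of the unpacked Stream Reactor release
# $2: folder that plugin.path should point to
# $3: Connect worker properties file
install_connector_jars() {
  libs_dir="$1"
  plugin_root="$2"
  worker_props="$3"

  # Create the folder and copy in the connector jars.
  mkdir -p "$plugin_root/lib"
  cp "$libs_dir"/kafka-connect-*.jar "$plugin_root/lib/"

  # Drop any existing uncommented plugin.path entry so the new one is the only one.
  if [ -f "$worker_props" ]; then
    grep -v '^plugin\.path=' "$worker_props" > "$worker_props.tmp" || true
    mv "$worker_props.tmp" "$worker_props"
  fi
  printf '\nplugin.path=%s\n' "$plugin_root" >> "$worker_props"
}

# Example with hypothetical paths, followed by a worker restart:
# install_connector_jars stream-reactor/libs "$PWD/plugins" config/connect-distributed.properties
# bin/connect-distributed.sh config/connect-distributed.properties
```

Use an absolute path for the plugin folder so the workers resolve it regardless of the directory they are started from.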

If you are using Kafka 0.10.x, plugin.path classloader isolation is not available. In that case, put the connector jar first on the classpath:

    export CLASSPATH=lenses-sql-runners-x.x.x-all.jar
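
When several connector jars are needed on Kafka 0.10.x, the classpath can be built from a folder. The helper below is a sketch only: the function name and the folder are hypothetical, and it simply places the connector jars ahead of any existing CLASSPATH entries.

```shell
# Print a classpath with every jar in a folder first, then any existing CLASSPATH.
# $1: folder containing the connector jars
connector_classpath() {
  jar_dir="$1"
  result=""
  for jar in "$jar_dir"/*.jar; do
    # Skip the unexpanded pattern when the folder has no jars.
    [ -e "$jar" ] || continue
    result="${result:+$result:}$jar"
  done
  # Keep existing entries after the connector jars.
  if [ -n "${CLASSPATH:-}" ]; then
    result="${result:+$result:}$CLASSPATH"
  fi
  printf '%s\n' "$result"
}

# Example with a hypothetical folder:
# export CLASSPATH="$(connector_classpath /opt/connectors)"
```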

The more connector jars you add to the plugin.path, the longer the Connect workers will take to start. This is because Connect scans all the jars in this folder for classes implementing the Kafka Connect Source and Sink interfaces.

Tip: Only put the connectors you want in the plugin.path.

Latest Test Results 

The test suites running are available at and test results at

Release Notes 


  • Kafka 1.0.0 support


  • Add FTPS support to FTP connector, new configuration option ftp.protocol introduced, either ftp (default) or ftps.
  • Fix for high CPU on the MQTT source, thanks @masahirom!
  • Improve logging on Kudu
  • Added Helm Chart generator
  • DELETE functionality added to the Cassandra sink, deletion now possible for null payloads, thanks @sandonjacobs!
  • Fix in kafka-connect-common to handle primary keys with doc strings, thanks @medvekoma!
  • Fix writing multiple topics to the same table in Cassandra #284
  • Upgrade to Cassandra driver 3.3.0 and refactor Cassandra tests
  • Fix on JMS source transacted queues #285, thanks @matthedude!
  • Fix on Cassandra source, configurable timespan queries. You can now control the timespan the Connector will query for.
  • Allow setting initial query timestamp on Cassandra source
  • Allow multiple primary keys on the redis sink



  • Upgrade CoAP to 2.0.0-M4
  • Upgrade to Confluent 3.3 and Kafka
  • Added MQTT Sink.
  • Add MQTT wildcard support.
  • Added WITHCONVERTERS and WITHTYPE to JMS and MQTT connectors in KCQL to simplify configuration.
  • Add flush mode to Kudu sink, with thanks to @patsak for the PR.



  • Upgrade to Confluent 3.2.2
  • Upgrade to KCQL 2x
  • Add CQL generator to Cassandra source
  • Add KCQL INCREMENTALMODE support to the Cassandra source; bulk mode and the timestamp column type are now taken from KCQL
  • Support for setting key and truststore type on Cassandra connectors
  • Added token based paging support for Cassandra source
  • Added default bytes converter to JMS Source
  • Added default connection factory to JMS Source
  • Added support for SharedDurableConsumers to JMS Connectors
  • Upgraded JMS Connector to JMS 2.0
  • Moved to Elastic4s 2.4
  • Added Elastic5s with TCP, TCP+XPACK and HTTP client support
  • Upgrade Azure Documentdb to 1.11.0
  • Added optional progress counter to all connectors. Enable it with connect.progress.enabled to periodically log the number of messages processed
  • Added authentication and TLS to ReThink Connectors
  • Added TLS support for ReThinkDB, add batch size option to source for draining the internal queues.
  • Upgrade Kudu Client to 1.4.0
  • Support for dates in Elastic Indexes and custom document types
  • Upgrade Connect CLI to 1.0.2 (Renamed to connect-cli)

Bug Fixes

  • Fixes for high CPU on CoAP source
  • Fixes for high CPU on Cassandra source
  • Fixed Avro double fields mapping to Kudu columns
  • Fixes on JMS properties converter, Invalid schema when extracting properties


  • Refactored Cassandra Tests to use only one embedded instance
  • Removed unused batch size and bucket size options from Kudu, they are taken from KCQL
  • Removed unused batch size option from DocumentDb
  • Rename Azure DocumentDb connect.documentdb.db to connect.documentdb.db
  • Rename Azure DocumentDb connect.documentdb.database.create to connect.documentdb.db.create
  • Rename Cassandra Source connect.cassandra.source.kcql to connect.cassandra.kcql
  • Rename Cassandra Source connect.cassandra.source.timestamp.type to connect.cassandra.timestamp.type
  • Rename Cassandra Source connect.cassandra.source.import.poll.interval to connect.cassandra.import.poll.interval
  • Rename Cassandra Source connect.cassandra.source.error.policy to connect.cassandra.error.policy
  • Rename Cassandra Source connect.cassandra.source.max.retries to connect.cassandra.max.retries
  • Rename Cassandra Source connect.cassandra.source.retry.interval to connect.cassandra.retry.interval
  • Rename Cassandra Sink connect.cassandra.sink.kcql to connect.cassandra.kcql
  • Rename Cassandra Sink connect.cassandra.sink.error.policy to connect.cassandra.error.policy
  • Rename Cassandra Sink connect.cassandra.sink.max.retries to connect.cassandra.max.retries
  • Rename Cassandra Sink connect.cassandra.sink.retry.interval to connect.cassandra.retry.interval
  • Rename Coap Source connect.coap.bind.port to connect.coap.port
  • Rename Coap Sink connect.coap.bind.port to connect.coap.port
  • Rename Coap Source to
  • Rename Coap Sink to
  • Rename MongoDb connect.mongo.database to connect.mongo.db
  • Rename MongoDb connect.mongo.sink.batch.size to connect.mongo.batch.size
  • Rename Druid connect.druid.sink.kcql to connect.druid.kcql
  • Rename Druid connect.druid.sink.conf.file to connect.druid.kcql
  • Rename Druid connect.druid.sink.write.timeout to connect.druid.write.timeout
  • Rename Elastic connect.elastic.sink.kcql to connect.elastic.kcql
  • Rename HBase to
  • Rename HBase connect.hbase.sink.kcql to connect.hbase.kcql
  • Rename HBase connect.hbase.sink.error.policy to connect.hbase.error.policy
  • Rename HBase connect.hbase.sink.max.retries to connect.hbase.max.retries
  • Rename HBase connect.hbase.sink.retry.interval to connect.hbase.retry.interval
  • Rename Influx connect.influx.sink.kcql to connect.influx.kcql
  • Rename Influx connect.influx.connection.user to connect.influx.username
  • Rename Influx connect.influx.connection.password to connect.influx.password
  • Rename Influx connect.influx.connection.database to connect.influx.db
  • Rename Influx connect.influx.connection.url to connect.influx.url
  • Rename Kudu connect.kudu.sink.kcql to connect.kudu.kcql
  • Rename Kudu connect.kudu.sink.error.policy to connect.kudu.error.policy
  • Rename Kudu connect.kudu.sink.retry.interval to connect.kudu.retry.interval
  • Rename Kudu connect.kudu.sink.max.retries to connect.kudu.max.retries
  • Rename Kudu connect.kudu.sink.schema.registry.url to connect.kudu.schema.registry.url
  • Rename Redis connect.redis.connection.password to connect.redis.password
  • Rename Redis connect.redis.sink.kcql to connect.redis.kcql
  • Rename Redis to
  • Rename Redis connect.redis.connection.port to connect.redis.port
  • Rename ReThink to
  • Rename ReThink connect.rethink.source.port to connect.rethink.port
  • Rename ReThink connect.rethink.source.db to connect.rethink.db
  • Rename ReThink connect.rethink.source.kcql to connect.rethink.kcql
  • Rename ReThink Sink to
  • Rename ReThink Sink connect.rethink.sink.port to connect.rethink.port
  • Rename ReThink Sink connect.rethink.sink.db to connect.rethink.db
  • Rename ReThink Sink connect.rethink.sink.kcql to connect.rethink.kcql
  • Rename JMS connect.jms.user to connect.jms.username
  • Rename JMS connect.jms.converters.source to connect.jms.converters
  • Remove JMS connect.jms.converters and replace with KCQL withConverters
  • Remove JMS connect.jms.queues and replace with KCQL withType=QUEUE
  • Remove JMS connect.jms.topics and replace with KCQL withType=TOPIC
  • Rename Mqtt connect.mqtt.source.kcql to connect.mqtt.kcql
  • Rename Mqtt connect.mqtt.user to connect.mqtt.username
  • Rename Mqtt connect.mqtt.hosts to connect.mqtt.connection.hosts
  • Remove Mqtt connect.mqtt.converters and replace with KCQL withConverters
  • Remove Mqtt connect.mqtt.queues and replace with KCQL withType=QUEUE
  • Remove Mqtt connect.mqtt.topics and replace with KCQL withType=TOPIC
  • Rename Hazelcast connect.hazelcast.sink.kcql to connect.hazelcast.kcql
  • Rename Hazelcast to
  • Rename Hazelcast to
  • Rename Hazelcast connect.hazelcast.sink.cluster.members to connect.hazelcast.cluster.members
  • Rename Hazelcast connect.hazelcast.sink.batch.size to connect.hazelcast.batch.size
  • Rename Hazelcast connect.hazelcast.sink.error.policy to connect.hazelcast.error.policy
  • Rename Hazelcast connect.hazelcast.sink.max.retries to connect.hazelcast.max.retries
  • Rename Hazelcast connect.hazelcast.sink.retry.interval to connect.hazelcast.retry.interval
  • Rename VoltDB connect.volt.sink.kcql to connect.volt.kcql
  • Rename VoltDB connect.volt.sink.connection.servers to connect.volt.servers
  • Rename VoltDB connect.volt.sink.connection.user to connect.volt.username
  • Rename VoltDB connect.volt.sink.connection.password to connect.volt.password
  • Rename VoltDB connect.volt.sink.error.policy to connect.volt.error.policy
  • Rename VoltDB connect.volt.sink.max.retries to connect.volt.max.retries
  • Rename VoltDB connect.volt.sink.retry.interval to connect.volt.retry.interval
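
Existing connector property files need the renamed keys before they will work with this release. The helper below is a sketch of one way to migrate a file with sed. The function name is hypothetical, and only four of the renames listed above are included as examples; extend the list for the connectors you run.

```shell
# Write a copy of a connector properties file with renamed keys.
# $1: existing properties file
# $2: file to write the migrated copy to
migrate_connector_props() {
  sed \
    -e 's/^connect\.cassandra\.sink\.kcql=/connect.cassandra.kcql=/' \
    -e 's/^connect\.influx\.connection\.url=/connect.influx.url=/' \
    -e 's/^connect\.mongo\.database=/connect.mongo.db=/' \
    -e 's/^connect\.redis\.connection\.password=/connect.redis.password=/' \
    "$1" > "$2"
}

# Example with hypothetical file names:
# migrate_connector_props cassandra-sink.properties cassandra-sink.migrated.properties
```

Writing to a second file keeps the original untouched, so the two can be compared before the connector is redeployed.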


  • Adding Azure DocumentDb Sink
  • Adding UPSERT to Elastic Search
  • Cassandra improvements withunwrap
  • Upgrade to Kudu 1.0 and CLI 1.0
  • Add ingest_time to CoAP Source
  • Support Confluent 3.2 and Kafka 0.10.2.
  • Added Azure DocumentDB.
  • Added JMS Source.
  • Added Schemaless Json and Json with schema support to JMS Sink.
  • InfluxDB bug fixes for tags and field selection.
  • Support for Cassandra data type of timestamp in the Cassandra Source for timestamp tracking.

0.2.4 (26 Jan 2017)

  • Added FTP and HTTP Source.
  • Added InfluxDB tag support. KCQL: INSERT INTO target dimension SELECT * FROM influx-topic WITHTIMESTAMP sys_time() WITHTAG(field1, CONSTANT_KEY1=CONSTANT_VALUE1, field2,CONSTANT_KEY2=CONSTANT_VALUE1)
  • Added InfluxDb consistency level. Default is ALL. Use connect.influx.consistency.level to set it to ONE/QUORUM/ALL/ANY.
  • InfluxDb connect.influx.sink.route.query was renamed to connect.influx.sink.kcql.
  • Added support for multiple contact points in Cassandra.

0.2.3 (5 Jan 2017)

  • Added CoAP Source and Sink.
  • Added MongoDB Sink.
  • Added MQTT Source.
  • Hazelcast support for ring buffers, maps, sets, lists and cache.
  • Redis support for Sorted Sets.
  • Added start scripts.
  • Added Kafka Connect and Schema Registry CLI.
  • Kafka Connect CLI now supports pause/restart/resume; checking connectors on the classpath and validating configuration of connectors.
  • Support for Struct, Schema.STRING and Json with schema in the Cassandra, ReThinkDB, InfluxDB and MongoDB sinks.
  • Rename export.query.route to sink.kcql.
  • Rename import.query.route to source.kcql.
  • Upgrade to KCQL 0.9.5 - Add support for STOREAS to specify target sink types, e.g. Redis Sorted Sets, Hazelcast map, queues, ringbuffers.