Hazelcast

A Kafka Connect sink connector for writing records from Kafka to Hazelcast.

KCQL support 

The following KCQL is supported:

INSERT INTO <hazelcast-namespace> 
SELECT FIELD, ... 
FROM <your-kafka-topic> 
[PK FIELD, ...] 
WITHFORMAT JSON|AVRO 
STOREAS RELIABLE_TOPIC|RING_BUFFER|QUEUE|SET|LIST|IMAP|MULTI_MAP|ICACHE

Example:

-- store into a ring buffer
INSERT INTO lenses SELECT x,y,z FROM topicB WITHFORMAT AVRO STOREAS RING_BUFFER
-- store into a reliable topic
INSERT INTO lenses SELECT x,y,z FROM topicB WITHFORMAT AVRO STOREAS RELIABLE_TOPIC
-- store into a queue
INSERT INTO lenses SELECT x,y,z FROM topicB WITHFORMAT AVRO STOREAS QUEUE
-- store into a set
INSERT INTO lenses SELECT x,y,z FROM topicB WITHFORMAT AVRO STOREAS SET
-- store into a list
INSERT INTO lenses SELECT x,y,z FROM topicB WITHFORMAT AVRO STOREAS LIST
-- store into an IMap with field1 used as the map key
INSERT INTO lenses SELECT x,y,z FROM topicB PK field1 WITHFORMAT AVRO STOREAS IMAP
-- store into a multi-map with field1 used as the map key
INSERT INTO lenses SELECT x,y,z FROM topicB PK field1 WITHFORMAT AVRO STOREAS MULTI_MAP
-- store into an ICache with field1 used as the cache key
INSERT INTO lenses SELECT x,y,z FROM topicB PK field1 WITHFORMAT AVRO STOREAS ICACHE
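To show which clause controls which behaviour, here is a minimal Python sketch that extracts the target, fields, topic, PK fields, format and storage type from a statement with a regular expression. It is a hypothetical illustration (`parse_kcql` and `KcqlStatement` are made-up names), not the KCQL parser the connector uses, and it assumes WITHFORMAT may be omitted, falling back to the documented JSON default.

```python
import re
from dataclasses import dataclass
from typing import List, Optional

STORE_TYPES = {"RELIABLE_TOPIC", "RING_BUFFER", "QUEUE", "SET",
               "LIST", "IMAP", "MULTI_MAP", "ICACHE"}

# Simplified regex reading of the grammar above; NOT the real KCQL parser.
KCQL_PATTERN = re.compile(
    r"INSERT\s+INTO\s+(?P<target>\S+)\s+"
    r"SELECT\s+(?P<fields>.+?)\s+"
    r"FROM\s+(?P<topic>\S+)"
    r"(?:\s+PK\s+(?P<pk>.+?))?"
    r"(?:\s+WITHFORMAT\s+(?P<fmt>JSON|AVRO))?"
    r"\s+STOREAS\s+(?P<store>\w+)\s*$",
    re.IGNORECASE,
)


@dataclass
class KcqlStatement:
    target: str        # Hazelcast structure name (INSERT INTO)
    fields: List[str]  # projected fields (SELECT)
    topic: str         # source Kafka topic (FROM)
    pk: List[str]      # key fields (PK), empty if absent
    fmt: str           # JSON or AVRO (WITHFORMAT)
    store_as: str      # target structure type (STOREAS)


def _split(csv: Optional[str]) -> List[str]:
    return [part.strip() for part in csv.split(",")] if csv else []


def parse_kcql(statement: str) -> KcqlStatement:
    match = KCQL_PATTERN.match(statement.strip())
    if match is None:
        raise ValueError(f"unsupported KCQL: {statement!r}")
    store_as = match["store"].upper()
    if store_as not in STORE_TYPES:
        raise ValueError(f"unknown STOREAS type: {store_as}")
    return KcqlStatement(
        target=match["target"],
        fields=_split(match["fields"]),
        topic=match["topic"],
        pk=_split(match["pk"]),
        fmt=(match["fmt"] or "JSON").upper(),  # JSON is the documented default
        store_as=store_as,
    )
```

For example, the ICACHE statement above yields target `lenses`, fields `x`, `y`, `z`, PK `field1`, format AVRO and type ICACHE.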

Concepts 

The connector takes the value of each Kafka Connect SinkRecord and inserts or updates an entry in Hazelcast. The Sink supports writing to:

  • reliable topics
  • ring buffers
  • queues
  • sets
  • lists
  • IMaps
  • multi-maps
  • ICaches

Primary Keys 

When inserting into an IMAP, MULTI_MAP or ICACHE, a key is needed. Use the PK keyword to specify the fields whose values make up the key; the field values are concatenated, separated by "-". If no PK fields are set, the topic name, partition and message offset are used instead.
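A minimal sketch of the key rule just described. It assumes values are rendered with `str()` and that the fallback key joins topic, partition and offset with the same "-" separator (the text does not say which separator the fallback uses). `build_key` is a hypothetical helper, not part of the connector.

```python
from typing import Any, Mapping, Sequence


def build_key(value: Mapping[str, Any], pk_fields: Sequence[str],
              topic: str, partition: int, offset: int) -> str:
    """Hypothetical helper mirroring the documented key rule."""
    if pk_fields:
        # Concatenate the PK field values, separated by "-".
        # ASSUMPTION: values are rendered with str().
        return "-".join(str(value[name]) for name in pk_fields)
    # No PK fields: fall back to topic name, partition and offset.
    # ASSUMPTION: the fallback uses the same "-" separator.
    return f"{topic}-{partition}-{offset}"
```

Because the separator is a plain "-", a field value that itself contains "-" (such as the product code used in the quickstart) can make different field combinations produce the same key.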

With Format 

Hazelcast requires that data stored in collections and topics is serializable. The Sink offers two modes to store data.

  • AVRO: the Sink converts the SinkRecords from Kafka to Avro-encoded byte arrays.
  • JSON: the Sink converts the SinkRecords from Kafka to JSON strings.

This is controlled by the WITHFORMAT clause of the KCQL statement in the connect.hazelcast.kcql option. The default is JSON.
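As a rough illustration of what JSON mode produces, the sketch below applies a KCQL field projection to a record value and serializes the result to a JSON string. The helper names are hypothetical and the compact separators are an assumption; the connector's exact JSON formatting may differ. AVRO mode would instead yield Avro-encoded bytes, which needs an Avro library and is not shown.

```python
import json
from typing import Any, Dict, List


def project(value: Dict[str, Any], fields: List[str]) -> Dict[str, Any]:
    """Keep only the selected fields; SELECT * keeps the whole record."""
    if fields == ["*"]:
        return dict(value)
    return {name: value[name] for name in fields}


def to_json_payload(value: Dict[str, Any], fields: List[str]) -> str:
    """JSON mode sketch: the projected record becomes a JSON string.

    ASSUMPTION: compact separators; the connector's formatting may differ.
    """
    return json.dumps(project(value, fields), separators=(",", ":"))
```

Applied to the quickstart order with a projection of `id` and `qty`, this returns the string `{"id":1,"qty":100}`.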

Stored As 

The Hazelcast Sink supports storing data in RingBuffers, ReliableTopics, Queues, Sets, Lists, IMaps, MultiMaps and ICaches. This is controlled by the STOREAS clause of the KCQL statement in the connect.hazelcast.kcql option.
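The storage types differ mainly in whether they need a key and how repeated writes behave. The following in-memory stand-in is a hypothetical sketch of those semantics and does not talk to Hazelcast: maps and caches replace the value for a key, a multi-map accumulates values per key (held in a set here, assuming Hazelcast's default MultiMap value collection), a set collapses duplicates, and a ring buffer drops its oldest item when full. Treating a reliable topic like a ring buffer is a simplification: Hazelcast backs reliable topics with a ring buffer, but listeners are ignored here. `ring_capacity` is a made-up parameter.

```python
from collections import defaultdict, deque
from typing import Any, Optional

KEYED = {"IMAP", "MULTI_MAP", "ICACHE"}


class InMemoryTarget:
    """Hypothetical stand-in for one Hazelcast structure; no Hazelcast involved."""

    def __init__(self, store_as: str, ring_capacity: int = 3):
        self.store_as = store_as.upper()
        if self.store_as in ("QUEUE", "LIST"):
            self.data = []
        elif self.store_as == "SET":
            self.data = set()
        elif self.store_as in ("IMAP", "ICACHE"):
            self.data = {}
        elif self.store_as == "MULTI_MAP":
            self.data = defaultdict(set)  # assumes the default SET value collection
        elif self.store_as in ("RING_BUFFER", "RELIABLE_TOPIC"):
            # Bounded: once full, appending drops the oldest item.
            self.data = deque(maxlen=ring_capacity)
        else:
            raise ValueError(f"unknown STOREAS type: {store_as}")

    def write(self, payload: Any, key: Optional[str] = None) -> None:
        if self.store_as in KEYED:
            if key is None:
                raise ValueError(f"{self.store_as} needs a key")
            if self.store_as == "MULTI_MAP":
                self.data[key].add(payload)  # values accumulate per key
            else:
                self.data[key] = payload     # a put replaces the previous value
        elif self.store_as == "SET":
            self.data.add(payload)           # duplicates collapse
        else:
            self.data.append(payload)        # queue, list, ring buffer keep arrival order
```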

Parallel Writes 

By default, each task in the Sink writes the records it receives sequentially. Optionally, the Sink supports parallel writes, where an executor thread pool is started and records are written in parallel. This can improve throughput, but the order of the writes is not guaranteed.

To enable parallel writes, set the connect.hazelcast.parallel.write configuration option to true. The size of the thread pool is set with connect.hazelcast.threadpool.size.
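A minimal Python sketch of the two write modes, assuming a caller-supplied `write` callable that stores a single record (hypothetical, not the connector's API). Sequential mode writes in order; parallel mode hands the batch to a thread pool sized by the rule given for connect.hazelcast.threadpool.size in the options below, so completion order is not guaranteed. For brevity the pool is created per batch; a long-lived task would more likely keep one pool for its lifetime.

```python
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable


def pool_size(configured: int) -> int:
    """connect.hazelcast.threadpool.size rule: 0 means 4 * number of CPUs."""
    if configured > 0:
        return configured
    return 4 * (os.cpu_count() or 1)  # cpu_count() may return None


def write_batch(records: Iterable[str], write: Callable[[str], None],
                parallel: bool = False, threadpool_size: int = 0) -> None:
    """Hypothetical sketch of one task writing the records from one poll."""
    if not parallel:
        for record in records:  # default: sequential, in the order received
            write(record)
        return
    with ThreadPoolExecutor(max_workers=pool_size(threadpool_size)) as pool:
        # Records are written concurrently, so completion order is not guaranteed.
        # Consuming the results re-raises the exception of a failed write, if any.
        list(pool.map(write, records))
```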

Error policies

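The connector supports error policies, set with connect.hazelcast.error.policy: NOOP swallows the error, THROW (the default) lets it propagate, and RETRY causes the Connect framework to retry the message up to connect.hazelcast.max.retries times, waiting connect.hazelcast.retry.interval milliseconds between retries.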
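The policy semantics can be sketched as a loop around a single write. This is a hypothetical simplification: in the connector, RETRY hands the failure back to the Kafka Connect framework, which retries the message, whereas here a local loop stands in for that. `write_with_policy` is a made-up name; its defaults mirror connect.hazelcast.max.retries (20) and connect.hazelcast.retry.interval (60000 ms), and `sleep` is injectable so the sketch can be exercised without waiting.

```python
import logging
import time
from typing import Callable

log = logging.getLogger("hazelcast-sink-sketch")


def write_with_policy(write: Callable[[], None], policy: str = "THROW",
                      max_retries: int = 20, retry_interval_ms: int = 60000,
                      sleep: Callable[[float], None] = time.sleep) -> bool:
    """Hypothetical helper. Returns True on success, False if NOOP swallowed an error."""
    retries_left = max_retries
    while True:
        try:
            write()
            return True
        except Exception as err:
            log.warning("write failed: %s", err)  # the error is logged under every policy
            if policy == "NOOP":
                return False  # swallow the error and carry on
            if policy == "RETRY" and retries_left > 0:
                retries_left -= 1
                sleep(retry_interval_ms / 1000)  # wait connect.hazelcast.retry.interval
                continue  # stands in for the framework retrying the message
            raise  # THROW, or RETRY with no retries left
```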

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=hazelcast
docker-compose up -d hazelcast

Next, configure and start Hazelcast so that the connector can connect to the cluster. Log in to the Hazelcast container and create a file called hazelcast.xml with the following content:

docker exec -ti hazelcast /bin/bash 
apt-get update && apt-get install -y vim

<hazelcast xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.hazelcast.com/schema/config
                               http://www.hazelcast.com/schema/config/hazelcast-config-3.8.xsd"
           xmlns="http://www.hazelcast.com/schema/config">
    <group>
        <name>dev</name>
        <password>dev-pass</password>
    </group>
</hazelcast>

Then start Hazelcast:

export JAVA_OPTS="-Dhazelcast.config=hazelcast.xml -Dgroup.name=dev -Dgroup.password=dev-pass"
./server.sh

Start the connector 

If you are using Lenses, log in to Lenses, navigate to the connectors page, select Hazelcast as the sink and paste the following:

name=hazelcast
connector.class=com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelCastSinkConnector
tasks.max=1
topics=orders
connect.hazelcast.cluster.members=hazelcast
connect.hazelcast.group.name=dev
connect.hazelcast.group.password=dev-pass
connect.hazelcast.kcql=INSERT INTO orders SELECT * FROM orders WITHFORMAT JSON STOREAS QUEUE

To start the connector without Lenses, log in to the fastdata container:


docker exec -ti fastdata /bin/bash

and create a connector.properties file containing the properties above.

Create the connector with connect-cli:

connect-cli create hazelcast < connector.properties

Wait for the connector to start and check that it is running:

connect-cli status hazelcast

Inserting test data 

In the fastdata container, start the Kafka Avro console producer:


kafka-avro-console-producer \
 --broker-list localhost:9092 --topic orders \
 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'

The console is now waiting for input. Enter the following:


{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}

Check for data in Hazelcast 

Log in to the Hazelcast container:

 docker exec -ti hazelcast /bin/bash

Start the console app:

 java -cp hazelcast-all-3.8.4.jar com.hazelcast.client.console.ClientConsoleApp

In the app, switch to the orders namespace (the queue name used in the KCQL statement) and list the queue:

ns orders

q.iterator

Clean up 

Bring down the stack:

docker-compose down

Options 

Name | Description | Type | Default Value
ssl.keymanager.algorithm | The algorithm used by the key manager factory for SSL connections. Defaults to the key manager factory algorithm configured for the Java Virtual Machine. | string | SunX509
ssl.keystore.type | The file format of the key store file. Optional for the client. | string | JKS
ssl.keystore.location | The location of the key store file. Optional for the client; can be used for two-way client authentication. | string |
ssl.truststore.location | The location of the trust store file. | string |
ssl.provider | The name of the security provider used for SSL connections. Defaults to the JVM's default security provider. | string |
ssl.secure.random.implementation | The SecureRandom PRNG implementation to use for SSL cryptography operations. | string |
ssl.key.password | The password of the private key in the key store file. Optional for the client. | password |
ssl.trustmanager.algorithm | The algorithm used by the trust manager factory for SSL connections. Defaults to the trust manager factory algorithm configured for the Java Virtual Machine. | string | PKIX
ssl.truststore.password | The password for the trust store file. If no password is set, the trust store is still accessible but integrity checking is disabled. | password |
ssl.endpoint.identification.algorithm | The endpoint identification algorithm used to validate the server hostname against the server certificate. | string | https
ssl.enabled.protocols | The list of protocols enabled for SSL connections. | list | [TLSv1.2, TLSv1.1, TLSv1]
ssl.protocol | The SSL protocol used to generate the SSLContext. The default, TLS, is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2; SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their use is discouraged due to known security vulnerabilities. | string | TLS
ssl.cipher.suites | A list of cipher suites. A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL protocol. By default all available cipher suites are supported. | list |
ssl.truststore.type | The file format of the trust store file. | string | JKS
ssl.keystore.password | The store password for the key store file. Optional for the client; only needed if ssl.keystore.location is configured. | password |
connect.hazelcast.cluster.members | The initial list of cluster addresses the client connects to; the client uses it to find a live node. Although one node address may be enough (all nodes communicate with each other), listing the addresses of all nodes is recommended. | list |
connect.hazelcast.timeout | Connection timeout, in milliseconds, for nodes to accept client connection requests. | long | 5000
connect.hazelcast.retries | Number of times the client retries the connection at startup. | int | 2
connect.hazelcast.keep.alive | Enables/disables the SO_KEEPALIVE socket option. | boolean | true
connect.hazelcast.tcp.no.delay | Enables/disables the TCP_NODELAY socket option. | boolean | true
connect.hazelcast.reuse.address | Enables/disables the SO_REUSEADDR socket option. | boolean | true
connect.hazelcast.ssl.enabled | Enables SSL. | boolean | false
connect.hazelcast.linger.seconds | Enables/disables SO_LINGER with the specified linger time in seconds. | int | 3
connect.hazelcast.buffer.size | Sets the SO_SNDBUF and SO_RCVBUF socket options to the specified value, in KB. | int | 32
connect.hazelcast.group.name | The group name of the connector in the target Hazelcast cluster. | string |
connect.hazelcast.group.password | The password for the group name. | password | dev-pass
connect.progress.enabled | Enables output of how many records have been processed. | boolean | false
connect.hazelcast.kcql | The KCQL statement that maps Kafka topics to Hazelcast targets, including the WITHFORMAT and STOREAS clauses. | string |
connect.hazelcast.error.policy | The action to take if an error occurs while inserting data. Three options are available: NOOP (the error is swallowed), THROW (the error is allowed to propagate) and RETRY (the exception causes the Connect framework to retry the message; the number of retries is set by connect.hazelcast.max.retries). The error is logged automatically. | string | THROW
connect.hazelcast.retry.interval | The time in milliseconds between retries. | int | 60000
connect.hazelcast.max.retries | The maximum number of times to retry the write. | int | 20
connect.hazelcast.threadpool.size | Size of the sink's own thread pool for concurrent writes, which lets it fail fast on an error. If 0, the pool size defaults to 4 * the number of CPUs; a value greater than 0 is used as the pool size. | int | 0
connect.hazelcast.parallel.write | Allows the sink to write the records received from Kafka on each poll in parallel. | boolean | false