Coap

A Kafka Connect sink connector for writing records from Kafka to a Coap Server.

KCQL support 

The following KCQL is supported:

INSERT
INTO <your-coap-resource>
SELECT FIELD, ...
FROM kafka_topic 

Examples:

-- Insert mode, select all fields from topicA and write to resourceA
INSERT INTO resourceA SELECT * FROM topicA

-- Insert mode, select 3 fields and rename from topicB and write to resourceA
INSERT INTO resourceA SELECT x AS a, y,z FROM topicB
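
The effect of the two SELECT clauses above can be illustrated with a small Python sketch. It only models field selection and renaming on a single record; the function name project and the sample record are hypothetical and are not part of the connector.

```python
def project(record, fields):
    """Apply a KCQL-style projection to one record.

    `fields` is either the string "*" (keep every field unchanged) or a
    list of (source_name, alias) pairs.
    """
    if fields == "*":
        return dict(record)
    return {alias: record[source] for source, alias in fields}


# Hypothetical record read from the topic.
record = {"x": 1, "y": 2, "z": 3, "w": 4}

# INSERT INTO resourceA SELECT * FROM topicA
all_fields = project(record, "*")

# INSERT INTO resourceA SELECT x AS a, y, z FROM topicB
renamed = project(record, [("x", "a"), ("y", "y"), ("z", "z")])
```

In the second case the field w is dropped and x is written under the name a.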

Concepts 

The connector writes messages to CoAP with the Content-Format set to application/json.
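
As a rough illustration of what such a message body looks like, the Python sketch below encodes a record as UTF-8 JSON. The function name to_coap_payload is hypothetical, and the connector's exact serialization (whitespace, field order) is an assumption. The value 50 is the identifier registered for application/json in the CoAP Content-Formats registry (RFC 7252).

```python
import json

# Identifier for application/json in the CoAP Content-Formats registry.
CONTENT_FORMAT_JSON = 50


def to_coap_payload(record):
    """Encode a record as compact UTF-8 JSON bytes (assumed layout)."""
    return json.dumps(record, separators=(",", ":")).encode("utf-8")


payload = to_coap_payload({"id": "1", "qty": "100"})
```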

DTLS Secure connections 

The connector uses the Californium Java API and, for secure connections, the Scandium security module provided by Californium. Scandium (Sc) is an implementation of Datagram Transport Layer Security (DTLS) 1.2, specified in RFC 6347.

Please refer to the Californium documentation for more information.

The connector supports:

  • SSL trust and key stores
  • Public/Private PEM keys and PSK client/identity
  • PSK Client Identity

When the URI scheme of connect.coap.uri is secure, i.e. coaps, the sink attempts secure connections in the following order. If connect.coap.username is set, PSK client identity authentication is used; if connect.coap.private.key.file is also set, public/private key authentication is attempted as well. Otherwise, the SSL trust and key stores are used.

The private key must be in PKCS8 rather than PKCS1 format. Use OpenSSL to convert it:

openssl pkcs8 -in privatekey.pem -topk8 -nocrypt -out privatekey-pkcs8.pem

Only cipher suites TLS_PSK_WITH_AES_128_CCM_8 and TLS_PSK_WITH_AES_128_CBC_SHA256 are currently supported.
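
The selection order described in this section can be sketched in Python as follows. This is an illustration of the documented order only, not the connector's code: the function name select_security and the returned labels are hypothetical, the private key option uses the name listed in the Options table, and the host and port in the usage lines are assumptions.

```python
from urllib.parse import urlparse


def select_security(config):
    """Return the security mechanisms to attempt, in order."""
    # Only a coaps:// URI triggers a secure (DTLS) connection.
    if urlparse(config["connect.coap.uri"]).scheme != "coaps":
        return []
    # A PSK identity takes precedence over the trust and key stores.
    if config.get("connect.coap.username"):
        modes = ["psk"]
        # Key files are attempted in addition to PSK when configured.
        if config.get("connect.coap.private.key.file"):
            modes.append("public_private_keys")
        return modes
    return ["trust_and_key_store"]


# Hypothetical configurations; the URI and credentials are placeholders.
psk_only = select_security({
    "connect.coap.uri": "coaps://coap-server:5684",
    "connect.coap.username": "client-identity",
})
stores = select_security({"connect.coap.uri": "coaps://coap-server:5684"})
```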

Specific certificates can be loaded by providing a comma-separated list in the connect.coap.certs configuration option. The certificate chain can be set with the connect.coap.cert.chain.key configuration option.

Kafka payload support 

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)
  • Schema.Struct and JSON
  • No Schema and JSON

See connect payloads for more information.

Error policies

The connector supports Error policies.

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=coap
docker-compose up -d coap

Start the connector 

If you are using Lenses, log in to Lenses, navigate to the connectors page, select Coap as the sink and paste the following:

name=coap
connector.class=com.datamountaineer.streamreactor.connect.coap.sink.CoapSinkConnector
tasks.max=1
topics=orders
connect.coap.uri=coap://coap-server:5683
connect.coap.kcql=INSERT INTO create1 SELECT * FROM orders

To start the connector without using Lenses, log into the fastdatadev container:


docker exec -ti fastdata /bin/bash

and create a connector.properties file containing the properties above.

Create the connector with the connect-cli:

connect-cli create coap < connector.properties

Wait for the connector to start and check that it is running:

connect-cli status coap

Inserting test data 

In the fastdata container, start the Kafka producer shell:


kafka-avro-console-producer \
 --broker-list localhost:9092 --topic orders \
 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"string"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"string"}, {"name":"qty", "type":"string"}]}'

The console is now waiting for your input. Enter the following:


{"id": "1", "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": "94.2", "qty":"100"}
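
Before pasting a line into the producer, it can help to check that it matches the value schema. The Python sketch below is a minimal check under stated assumptions: it uses only the standard library rather than an Avro implementation, it handles only string fields (the only type in this schema), and the function name find_problems is hypothetical.

```python
import json

# Value schema copied from the kafka-avro-console-producer command above.
VALUE_SCHEMA = json.loads(
    '{"type":"record","name":"myrecord","fields":['
    '{"name":"id","type":"string"},{"name":"created","type":"string"},'
    '{"name":"product","type":"string"},{"name":"price","type":"string"},'
    '{"name":"qty","type":"string"}]}'
)


def find_problems(schema, line):
    """Return a list of problems; an empty list means the line fits."""
    record = json.loads(line)
    problems = []
    for field in schema["fields"]:
        name = field["name"]
        if name not in record:
            problems.append(f"missing field: {name}")
        elif field["type"] == "string" and not isinstance(record[name], str):
            problems.append(f"field {name} should be a string")
    return problems


line = ('{"id": "1", "created": "2016-05-06 13:53:00", '
        '"product": "OP-DAX-P-20150201-95.7", "price": "94.2", "qty":"100"}')
problems = find_problems(VALUE_SCHEMA, line)
```

An unquoted value such as "qty": 100 would be reported, because every field in the schema is declared as a string.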

Check for data in the Coap Server 


docker exec -ti coap coap get coap://coap-server:5683/create1

Clean up 

Bring down the stack:

docker-compose down

Options 

Name | Description | Type | Default Value
connect.coap.uri | The COAP server to connect to. | string | localhost
connect.coap.truststore.path | The path to the truststore. | string |
connect.coap.truststore.pass | The password of the trust store. | password | rootPass
connect.coap.certs | The certificates to load from the trust store. | list | []
connect.coap.keystore.path | The path to the keystore. | string |
connect.coap.keystore.pass | The password of the key store. | password | rootPass
connect.coap.cert.chain.key | The key to use to get the certificate chain. | string | client
connect.coap.port | The port the DTLS connector will bind to on the Connector host. | int | 0
connect.coap.host | The hostname the DTLS connector will bind to on the Connector host. | string | localhost
connect.coap.username | CoAP PSK identity. | string |
connect.coap.password | CoAP PSK secret. | password |
connect.coap.private.key.file | Path to the private key file for use with PSK credentials, in PKCS8 rather than PKCS1 format. Only cipher suites TLS_PSK_WITH_AES_128_CCM_8 and TLS_PSK_WITH_AES_128_CBC_SHA256 are currently supported. Use OpenSSL to convert: openssl pkcs8 -in privatekey.pem -topk8 -nocrypt -out privatekey-pkcs8.pem | string |
connect.coap.public.key.file | Path to the public key file for use with PSK credentials. | string |
connect.coap.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed. THROW - the error is allowed to propagate. RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.coap.max.retries. All errors will be logged automatically, even if the code swallows them. | string | THROW
connect.coap.max.retries | The maximum number of times to try the write again. | int | 20
connect.coap.retry.interval | The time in milliseconds between retries. | int | 60000
connect.progress.enabled | Enables the output for how many records have been processed. | boolean | false
connect.coap.batch.size | The number of events to take from the internal queue to batch together to send to Kafka. The records will be flushed if the linger period has expired first. | int | 100
connect.source.linger.ms | The number of milliseconds to wait before flushing the received messages to Kafka. The records will be flushed if the batch size is reached before the linger period has expired. | int | 5000
connect.coap.kcql | The KCQL statement to select and route resources to topics. | string |
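
The three values of connect.coap.error.policy can be sketched in Python as below. This is a simplified model, not the connector's implementation: the function name write_with_policy is hypothetical, and the retry loop runs locally here, whereas the connector hands the retry back to the Connect framework. The defaults mirror the table above.

```python
import time


def write_with_policy(write, policy="THROW", max_retries=20,
                      retry_interval_ms=60000, sleep=time.sleep, log=print):
    """Call `write()` and handle failures according to the error policy.

    Returns True when the write succeeded and False when a NOOP policy
    swallowed the error; otherwise the last error is re-raised.
    """
    attempts = 0
    while True:
        try:
            write()
            return True
        except Exception as err:
            # Errors are always logged, even when they are swallowed.
            log(f"write failed: {err}")
            if policy == "NOOP":
                return False
            if policy == "RETRY" and attempts < max_retries:
                attempts += 1
                sleep(retry_interval_ms / 1000)
                continue
            # THROW, or RETRY with no retries left: propagate the error.
            raise
```

Passing sleep and log as parameters keeps the sketch easy to exercise without waiting for the real retry interval.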