Coap

A Kafka Connect CoAP source connector to stream messages from a CoAP server and write them to a Kafka topic.

KCQL support 

The following KCQL is supported:

INSERT INTO <your-topic> 
SELECT * 
FROM <coap-resource>

Examples:

INSERT INTO topicA SELECT * FROM resourceA

Selection of fields from the CoAP message is not supported.
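
The supported statement has only two variable parts: the target topic and the CoAP resource. As a rough illustration, the sketch below checks a statement against that single form. It assumes a simple regular expression and a hypothetical `parse_kcql` helper; it is not the connector's own KCQL parser.

```python
import re
from typing import Tuple

# Hypothetical pattern for the only supported form:
#   INSERT INTO <your-topic> SELECT * FROM <coap-resource>
KCQL_PATTERN = re.compile(
    r"^\s*INSERT\s+INTO\s+(?P<topic>\S+)\s+SELECT\s+\*\s+FROM\s+(?P<resource>\S+)\s*$",
    re.IGNORECASE,
)


def parse_kcql(statement: str) -> Tuple[str, str]:
    """Return (topic, resource), or raise ValueError for unsupported statements."""
    match = KCQL_PATTERN.match(statement)
    if match is None:
        # Anything other than SELECT * (for example a field list) ends up here.
        raise ValueError(f"unsupported KCQL statement: {statement!r}")
    return match.group("topic"), match.group("resource")


if __name__ == "__main__":
    print(parse_kcql("INSERT INTO topicA SELECT * FROM resourceA"))
```

A statement that lists fields, such as `INSERT INTO topicA SELECT a, b FROM resourceA`, does not match the pattern and is rejected in this sketch.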

Concepts 

The Source Connector automatically converts the CoAP response into a Kafka Connect Struct to be stored in Kafka as AVRO or JSON, depending on the Converters used in Connect. The schema is fixed and is described under Data type conversion.

The key of the Struct message sent to Kafka is made from the source defined in the message, the resource on the CoAP server and the message id.
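
To make the key composition concrete, the sketch below models the key as a plain Python dictionary. The field names follow the key schema listed under Data type conversion; the `build_key` helper and the sample values are hypothetical, and how the connector derives the source value is not specified here.

```python
from typing import Optional, TypedDict


class CoapKey(TypedDict):
    """Mirror of the three optional key fields."""
    source: Optional[str]
    source_resource: Optional[str]
    message_id: Optional[int]


def build_key(source: Optional[str],
              resource: Optional[str],
              message_id: Optional[int]) -> CoapKey:
    """Combine message source, CoAP resource and message id into one key."""
    return {
        "source": source,
        "source_resource": resource,
        "message_id": message_id,
    }


if __name__ == "__main__":
    # Sample values are made up for illustration only.
    print(build_key("coap:5683", "obs-pumping", 42))
```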

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=coap
docker-compose up -d coap

Inserting test data 

The coap Docker container starts a CoAP server with an observable resource.

Start the connector 

If you are using Lenses, log in to Lenses and navigate to the connectors page, select Coap as the source and paste the following:

name=coap
connector.class=com.datamountaineer.streamreactor.connect.coap.source.CoapSourceConnector
tasks.max=1
connect.coap.uri=coap://coap
connect.coap.kcql=INSERT INTO coap SELECT * FROM obs-pumping

To start the connector without using Lenses, log into the fastdatadev container:


docker exec -ti fastdata /bin/bash

and create a connector.properties file containing the properties above.

Create the connector with the connect-cli:

connect-cli create coap < connector.properties

Wait for the connector to start and check that it is running:

connect-cli status coap

Check for records in Kafka 

Check the records in Lenses or via the console:

kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic coap \
    --from-beginning

Data type conversion 

The schema is fixed.

The following schema is used for the key:

| Name | Type |
|---|---|
| source | Optional String |
| source_resource | Optional String |
| message_id | Optional int32 |

The following schema is used for the payload:


| Name | Type |
|---|---|
| message_id | Optional int32 |
| type | Optional String |
| code | Optional String |
| raw_code | Optional int32 |
| rtt | Optional int64 |
| is_last | Optional boolean |
| is_notification | Optional boolean |
| source | Optional String |
| destination | Optional String |
| timestamp | Optional int64 |
| token | Optional String |
| is_duplicate | Optional boolean |
| is_confirmable | Optional boolean |
| is_rejected | Optional boolean |
| is_acknowledged | Optional boolean |
| is_canceled | Optional boolean |
| accept | Optional int32 |
| block1 | Optional String |
| block2 | Optional String |
| content_format | Optional int32 |
| etags | Array of Optional Strings |
| location_path | Optional String |
| location_query | Optional String |
| max_age | Optional int64 |
| observe | Optional int32 |
| proxy_uri | Optional String |
| size_1 | Optional String |
| size_2 | Optional String |
| uri_host | Optional String |
| uri_port | Optional int32 |
| uri_path | Optional String |
| uri_query | Optional String |
| payload | Optional String |

Clean up 

Bring down the stack:

docker-compose down

Options 

| Name | Description | Type | Default Value |
|---|---|---|---|
| connect.coap.uri | The CoAP server to connect to. | string | localhost |
| connect.coap.truststore.path | The path to the truststore. | string | |
| connect.coap.truststore.pass | The password of the trust store. | password | rootPass |
| connect.coap.certs | The certificates to load from the trust store. | list | [] |
| connect.coap.keystore.path | The path to the keystore. | string | |
| connect.coap.keystore.pass | The password of the key store. | password | rootPass |
| connect.coap.cert.chain.key | The key to use to get the certificate chain. | string | client |
| connect.coap.port | The port the DTLS connector will bind to on the Connector host. | int | 0 |
| connect.coap.host | The hostname the DTLS connector will bind to on the Connector host. | string | localhost |
| connect.coap.username | CoAP PSK identity. | string | |
| connect.coap.password | CoAP PSK secret. | password | |
| connect.coap.private.key.file | Path to the private key file for use with PSK credentials. The key must be in PKCS8 rather than PKCS1 format; use OpenSSL to convert: openssl pkcs8 -in privatekey.pem -topk8 -nocrypt -out privatekey-pkcs8.pem. Only cipher suites TLS_PSK_WITH_AES_128_CCM_8 and TLS_PSK_WITH_AES_128_CBC_SHA256 are currently supported. | string | |
| connect.coap.public.key.file | Path to the public key file for use with PSK credentials. | string | |
| connect.coap.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP (the error is swallowed), THROW (the error is allowed to propagate) and RETRY (the exception causes the Connect framework to retry the message). The number of retries is set by connect.coap.max.retries. All errors are logged automatically, even if the code swallows them. | string | THROW |
| connect.coap.max.retries | The maximum number of times to try the write again. | int | 20 |
| connect.coap.retry.interval | The time in milliseconds between retries. | int | 60000 |
| connect.progress.enabled | Enables the output for how many records have been processed. | boolean | false |
| connect.coap.batch.size | The number of events to take from the internal queue to batch together to send to Kafka. The records will be flushed if the linger period has expired first. | int | 100 |
| connect.source.linger.ms | The number of milliseconds to wait before flushing the received messages to Kafka. The records will be flushed if the batch size is reached before the linger period has expired. | int | 5000 |
| connect.coap.kcql | The KCQL statement to select and route resources to topics. | string | |
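
The interaction between connect.coap.batch.size and connect.source.linger.ms can be read as "flush on whichever limit is hit first". The sketch below illustrates that rule in Python. The `BatchBuffer` class, its method names and the injectable clock are assumptions made for illustration; this is not the connector's implementation.

```python
import time
from typing import Callable, List


class BatchBuffer:
    """Hypothetical buffer: flush when the batch size OR the linger period is reached."""

    def __init__(self, batch_size: int = 100, linger_ms: int = 5000,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.batch_size = batch_size          # mirrors connect.coap.batch.size
        self.linger_s = linger_ms / 1000.0    # mirrors connect.source.linger.ms
        self._clock = clock
        self._records: List[str] = []
        self._last_flush = clock()

    def add(self, record: str) -> None:
        """Queue one received message."""
        self._records.append(record)

    def poll(self) -> List[str]:
        """Return up to batch_size records if a flush condition holds, else []."""
        size_reached = len(self._records) >= self.batch_size
        linger_expired = (self._clock() - self._last_flush) >= self.linger_s
        if not self._records or not (size_reached or linger_expired):
            return []
        batch = self._records[: self.batch_size]
        self._records = self._records[self.batch_size:]
        self._last_flush = self._clock()
        return batch


if __name__ == "__main__":
    buf = BatchBuffer(batch_size=2, linger_ms=5000)
    buf.add("m1")
    buf.add("m2")
    print(buf.poll())  # batch size reached, so both records are returned
```

With a small batch size the buffer flushes often under load, while the linger period bounds how long a lone record waits when traffic is light.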