

Kafka Connect sink connector for writing data from Kafka to Pulsar.

KCQL support 

The following KCQL is supported:

INSERT INTO <pulsar-topic>
SELECT FIELD, ...
FROM <kafka-topic>


# Select all fields
INSERT INTO persistent://lenses/standalone/connect/kafka-topic SELECT * FROM kafka_topic
# Select individual fields
INSERT INTO persistent://lenses/standalone/connect/kafka-topic SELECT id, product_name FROM kafka_topic


The connector writes JSON messages to Pulsar topics.

Keyed Messages 

The connector can key messages in Pulsar by the value of fields in the Kafka message, using the WITHKEY clause.


The connector supports compression on Pulsar messages using the WITHCOMPRESSION clause. The available values are:

  • ZLIB
  • LZ4


The BATCH clause controls the batching of writes to Pulsar.
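
To make the WITHKEY, WITHCOMPRESSION and BATCH clauses concrete, the following is a minimal sketch of one KCQL statement that combines them. The clause order, the `= value` form, the `id` key field, and the batch size of 100 are assumptions made for illustration; check them against the KCQL reference for your connector version.

```shell
# Sketch only: clause order and the "= value" syntax are assumptions.
#   BATCH = 100            hypothetical batch size
#   WITHKEY(id)            key each Pulsar message by the `id` field of the Kafka value
#   WITHCOMPRESSION = LZ4  one of the compression values listed above
KCQL='INSERT INTO persistent://lenses/standalone/connect/orders SELECT * FROM orders BATCH = 100 WITHKEY(id) WITHCOMPRESSION = LZ4'

# Print the statement as a connector property line.
printf 'connect.pulsar.kcql=%s\n' "$KCQL"
```

The printed line can be pasted into the connector configuration in place of the plain `SELECT *` statement.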


The connector supports partitioning in Pulsar using the WITHPARTITIONER clause. The available values are:


Kafka payload support 

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)
  • Schema.Struct and JSON
  • No Schema and JSON

See connect payloads for more information.

Error policies

The connector supports Error policies.


Launch the stack 

  1. Copy the docker-compose file.
  2. Bring up the stack.

export CONNECTOR=pulsar
docker-compose up -d pulsar

Watch for messages in Pulsar

In the Pulsar container, start the consumer and wait for messages to arrive:

docker exec \
    -ti pulsar \
    bin/pulsar-client \
    consume \
    persistent://lenses/standalone/connect/orders \
    --subscription-name lenses

Start the connector 

If you are using Lenses, log in to Lenses, navigate to the connectors page, select Pulsar as the sink, and paste the following:

connect.pulsar.kcql=INSERT INTO persistent://lenses/standalone/connect/orders SELECT * FROM orders

To start the connector without using Lenses, log into the fastdata container:

docker exec -ti fastdata /bin/bash

and create a connector.properties file containing the properties above.
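
As a rough sketch, the file could be written from the shell as shown here. Only the `connect.pulsar.kcql` line comes from this page; the connector class name, the Pulsar service URL, and the `name`, `tasks.max` and `topics` values are assumptions that should be checked against your installation.

```shell
# Write connector.properties in the current directory.
# Assumptions (not taken from this page):
#   - connector.class: assumed class name, verify it for your connector version
#   - connect.pulsar.hosts: assumed service URL (6650 is Pulsar's default binary port)
#   - name, tasks.max, topics: standard Kafka Connect settings with example values
cat > connector.properties <<'EOF'
name=pulsar
connector.class=com.datamountaineer.streamreactor.connect.pulsar.sink.PulsarSinkConnector
tasks.max=1
topics=orders
connect.pulsar.hosts=pulsar://pulsar:6650
connect.pulsar.kcql=INSERT INTO persistent://lenses/standalone/connect/orders SELECT * FROM orders
EOF
```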

Create the connector with the connect-cli:

connect-cli create pulsar < connector.properties

Wait for the connector to start and check that it is running:

connect-cli status pulsar

Inserting test data 

In the fastdata container, start the Kafka producer shell:

kafka-avro-console-producer \
 --broker-list localhost:9092 --topic orders \
 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'

The console is now waiting for your input. Enter the following:

{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}

Check for data in Pulsar 

The data will arrive in your Pulsar container console.

Clean up 

Bring down the stack:

docker-compose down


Name | Description | Type | Default Value
connect.pulsar.hosts | Contains the Pulsar connection end points. | string |
connect.pulsar.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP, the error is swallowed; THROW, the error is allowed to propagate; RETRY, the exception causes the Connect framework to retry the message. The number of retries is set by connect.pulsar.max.retries. The error is logged automatically. | string | THROW
connect.pulsar.retry.interval | The time in milliseconds between retries. | int | 60000
connect.pulsar.max.retries | The maximum number of times to try the write again. | int | 20
connect.pulsar.kcql | Contains the Kafka Connect Query Language describing the flow from Apache Kafka topics to Apache Pulsar. | string |
connect.progress.enabled | Enables the output for how many records have been processed. | boolean | false
connect.pulsar.tls.ca.cert | Provides the path to the CA certificate file to use with the Pulsar connection. | string |
connect.pulsar.tls.cert | Provides the path to the certificate file to use with the Pulsar connection. | string |
connect.pulsar.tls.key | Provides the path to the certificate private key file. | string |
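
As an illustration of how the error-handling and TLS options fit together, the sketch here appends them to a connector.properties file. The retry count, the retry interval, and the `/etc/pulsar/tls/` paths are placeholder values chosen for the example, not defaults or recommendations.

```shell
# Append retry and TLS settings to connector.properties
# (the file is created if it does not exist yet).
# All values here are placeholders: adjust retries, interval and paths to your setup.
cat >> connector.properties <<'EOF'
connect.pulsar.error.policy=RETRY
connect.pulsar.max.retries=5
connect.pulsar.retry.interval=10000
connect.pulsar.tls.ca.cert=/etc/pulsar/tls/ca.pem
connect.pulsar.tls.cert=/etc/pulsar/tls/client.pem
connect.pulsar.tls.key=/etc/pulsar/tls/client.key
EOF
```

With these example values, a failed write would be retried up to 5 times, 10 seconds apart, before the error is allowed to propagate.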