4.3
Pulsar
Kafka Connect sink connector for writing data from Kafka to Pulsar.
KCQL support
The following KCQL is supported:
INSERT
INTO <pulsar-topic>
SELECT FIELD, ...
FROM <kafka-topic>
[WITHKEY(FIELD, ...)]
[BATCH=100]
Examples:
# Select all fields
INSERT INTO persistent://lenses/standalone/connect/kafka-topic SELECT * FROM kafka_topic
# Select individual fields
INSERT INTO persistent://lenses/standalone/connect/kafka-topic SELECT id, product_name FROM kafka_topic
Concepts
The connector writes JSON messages to Pulsar topics.
Keyed Messages
The connector key messages in Pulsar defined by value in the Kafka message using the WITHKEY clause.
Compression
The connector supports compression on Pulsar messages using the WITHCOMPRESSION clause. The available values are:
- ZLIB
- LZ4
Batching
The BATCH clause controls the batching of writes to Pulsar.
Partitioning
The connector supports partitioning in Puslar using the WITHPARTITIONER clause. The available values are:
- SINGLEPARTITION
- ROUNDROBINPARTITION
- CUSTOMPARTITION
Kafka payload support
This sink supports the following Kafka payloads:
- Schema.Struct and Struct (Avro)
- Schema.Struct and JSON
- No Schema and JSON
See connect payloads for more information.
Error polices
The connector supports Error polices .
Quickstart
Launch the stack
- Copy the docker-compose file.
- Bring up the stack.
export CONNECTOR=pulsar
docker-compose up -d pulsar
Watch for message in Pulsar
In the Pulsar contain start the consumer and wait for messages to arrive:
docker exec \
-ti pulsar \
bin/pulsar-client \
consume \
persistent://lenses/standalone/connect/orders \
--subscription-name lenses
Start the connector
If you are using Lenses, login into Lenses and navigate to the connectors page , select Pulsar as the sink and paste the following:
name=pulsar
connector.class=com.datamountaineer.streamreactor.connect.pulsar.sink.PulsarSinkConnector
tasks.max=1
topics=orders
connect.pulsar.kcql=INSERT INTO persistent://lenses/standalone/connect/orders SELECT * FROM orders
connect.pulsar.hosts=pulsar://pulsar:6650
To start the connector without using Lenses, log into the fastdatadev container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
Create the connector, with the connect-cli :
connect-cli create pulsar < connector.properties
connect-cli create pulsar < connector.properties
Wait a for the connector to start and check its running:
connect-cli status pulsar
Inserting test data
In the to fastdata container start the kafka producer shell:
kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'
the console is now waiting for your input, enter the following:
{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}
Check for data in Pulsar
In your Pulsar container console the data will arrive.
Clean up
Bring down the stack:
docker-compose down
Options
Name | Description | Type | Default Value |
---|---|---|---|
connect.pulsar.hosts | Contains the Pulsar connection end points. | string | |
connect.pulsar.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically | string | THROW |
connect.pulsar.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.pulsar.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.pulsar.kcql | Contains the Kafka Connect Query Language describing the flow from Apache Pulsar to Apache Kafka topics | string | |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
connect.pulsar.tls.ca.cert | Provides the path to the CA certificate file to use with the Pulsar connection | string | |
connect.pulsar.tls.cert | Provides the path to the certificate file to use with the Pulsar connection | string | |
connect.pulsar.tls.key | Certificate private [config] key file path. | string |