Pulsar Source¶
Download connector Pulsar Connector for Kafka 2.1.0
A Kafka Connector to write events from Apache Pulsar to Apache Kafka.
Prerequisites¶
- Apache Kafka 0.11.x or above
- Kafka Connect 0.11.x or above
- Apache Pulsar
- Java 1.8
Features¶
- KCQL routing querying - Pulsar and Kafka topic selection is supported
- Message converters.
KCQL Support¶
INSERT INTO kafka_topic SELECT { FIELDS, ... } FROM pulsar_topic
Tip
You can specify multiple KCQL statements separated by ; to have the connector source multiple topics (see the example after this tip).
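For instance, a minimal sketch of two statements in a single connect.pulsar.kcql entry, using hypothetical topic names:
INSERT INTO kafka-topicA SELECT * FROM persistent://landoop/standalone/connect/topicA;INSERT INTO kafka-topicB SELECT * FROM persistent://landoop/standalone/connect/topicB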
The Apache Pulsar source supports KCQL, Kafka Connect Query Language. The following KCQL support is available:
- Pulsar topic selection
- Target Kafka topic selection
- Message converter selection.
Example:
-- Insert mode, select all fields and write to kafka-topicA with message converter myclass
INSERT INTO kafka-topicA SELECT * FROM persistent://landoop/standalone/connect/kafka-topic [WITHCONVERTER=`myclass`]
Converters¶
We provide four converters out of the box but you can plug in your own. See an example here. The converter is set per topic in the connect.pulsar.kcql
statement via the WITHCONVERTER
keyword.
AvroConverter
com.datamountaineer.streamreactor.connect.converters.source.AvroConverter
The payload is an AVRO message. In this case you need to provide a path for the AVRO schema file to be able to decode it.
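As a sketch, assuming the Pulsar topic from the quickstart and a hypothetical schema file at /tmp/source.avsc, the relevant properties might look like:
connect.pulsar.kcql=INSERT INTO kafka-topicA SELECT * FROM persistent://landoop/standalone/connect/kafka-topic WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.AvroConverter`
connect.converter.avro.schemas=persistent://landoop/standalone/connect/kafka-topic=/tmp/source.avsc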
JsonSimpleConverter
com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter
The payload is a JSON message. This converter will parse the JSON and create an AVRO record for it which will be sent over to Kafka.
JsonConverterWithSchemaEvolution
com.datamountaineer.streamreactor.connect.converters.source.JsonConverterWithSchemaEvolution
An experimental converter for converting JSON messages to AVRO. The resulting AVRO schema remains fully compatible as new fields are added while the JSON payload evolves.
BytesConverter
com.datamountaineer.streamreactor.connect.converters.source.BytesConverter
This is the default implementation. The payload is taken as is, an array of bytes, and sent over to Kafka as an AVRO record with Schema.BYTES
. You don't have to provide a converter in the KCQL statement to get this behaviour!
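For instance, a sketch of selecting the JsonSimpleConverter instead for the quickstart topic:
INSERT INTO kafka-topicA SELECT * FROM persistent://landoop/standalone/connect/kafka-topic WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`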
Apache Pulsar Setup¶
The documentation for Pulsar is available here.
Download and extract the binary release:
wget http://www.apache.org/dist/incubator/pulsar/pulsar-1.21.0-incubating/apache-pulsar-1.21.0-incubating-bin.tar.gz
tar xvfz apache-pulsar-1.21.0-incubating-bin.tar.gz
cd apache-pulsar-1.21.0-incubating
Apache Pulsar requires Zookeeper, if you have Docker we recommend running Pulsar in a container.
docker run -it \
-p 6650:6650 \
-p 8080:8080 \
-v $PWD/data:/pulsar/data \
apachepulsar/pulsar:1.21.0-incubating \
bin/pulsar standalone --advertised-address 127.0.0.1
If you do not have Docker you can still run Pulsar locally and reuse the Zookeeper instance from your Kafka cluster.
Pulsar uses Apache BookKeeper for persistence, which stores ledger details under /ledgers
; this path is controlled via zkLedgersRootPath
in the bookies
config file. Using this approach you may see Zookeeper warnings in the Pulsar logs.
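A minimal sketch of the relevant entries in the bookies config file (conf/bookkeeper.conf), assuming the Kafka cluster's Zookeeper runs on localhost:2181:
# reuse the Kafka cluster's Zookeeper ensemble
zkServers=localhost:2181
# ledger metadata is kept under this Zookeeper path
zkLedgersRootPath=/ledgers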
# start
bin/pulsar standalone
Warning
We recommend separate Zookeeper quorums for Kafka and Pulsar and do not advise trying this in production!
If you wish to use a separate Zookeeper instance outside of Docker you will need to update the configuration files
of Apache Pulsar in conf
to start Zookeeper on a different port, e.g. 2182; please consult the Pulsar documentation.
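As a sketch, the Zookeeper port is set via clientPort in conf/zookeeper.conf:
# run Pulsar's Zookeeper on 2182 to avoid clashing with Kafka's on 2181
clientPort=2182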
Installing the Connector¶
Connect, in production should be run in distributed mode
- Install and configure a Kafka Connect cluster
- Create a folder on each server called plugins/lib
- Copy into the above folder the required connector jars from the Stream Reactor download
- Edit connect-avro-distributed.properties in the etc/schema-registry folder and uncomment the plugin.path option. Set it to the root directory, i.e. plugins, where you deployed the Stream Reactor connector jars in step 2 (see the snippet after these steps)
- Start Connect:
bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties
Connect Workers are long-running processes, so set up an init.d
or systemd
service accordingly.
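A sketch of the plugin.path entry, assuming the jars were deployed under the hypothetical path /opt/connectors/plugins/lib:
# root directory containing the plugins/lib folder with the connector jars
plugin.path=/opt/connectors/plugins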
Source Connector QuickStart¶
Start Kafka Connect in distributed mode (see install).
In this mode a REST endpoint on port 8083
is exposed to accept connector configurations.
We developed a Command Line Interface to make interacting with the Connect REST API easier. The CLI can be found in the Stream Reactor download under
the bin
folder. Alternatively the Jar can be pulled from our GitHub
releases page.
Starting the Connector¶
Download and install the Stream Reactor. Follow the instructions here if you haven't already done so. All paths in the quickstart are based on the location you installed the Stream Reactor.
Once Connect has started we can use the kafka-connect-tools CLI to post in our distributed properties file for Pulsar. For the CLI to work, including when using the Docker images, you will have to set the following environment variable to point at the Kafka Connect REST API.
export KAFKA_CONNECT_REST="http://myserver:myport"
➜ bin/connect-cli create pulsar-source < conf/pulsar-source.properties
name=pulsar-source
connector.class=com.datamountaineer.streamreactor.connect.pulsar.source.PulsarSourceConnector
tasks.max=1
connect.pulsar.kcql=INSERT INTO pulsar-source-topic SELECT * FROM persistent://landoop/standalone/connect/kafka-topic
connect.pulsar.hosts=pulsar://localhost:6650
connect.progress.enabled=true
This connector will use the default BytesConverter as we did not specify any WITHCONVERTER
syntax in the KCQL statement.
We can use the CLI to check if the connector is up, but you should be able to see this in the logs as well.
#check for running connectors with the CLI
➜ bin/connect-cli ps
pulsar-source
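Alternatively, the status can be queried through the standard Connect REST API; a sketch using curl against the endpoint set earlier:
➜ curl $KAFKA_CONNECT_REST/connectors/pulsar-source/status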
In the logs of Connect you should see this:
INFO
__ __
/ / ____ _____ ____/ /___ ____ ____
/ / / __ `/ __ \/ __ / __ \/ __ \/ __ \
/ /___/ /_/ / / / / /_/ / /_/ / /_/ / /_/ /
/_____/\__,_/_/ /_/\__,_/\____/\____/ .___/
/_/
____ __ _____
/ __ \__ __/ /________ ______ / ___/____ __ _______________
/ /_/ / / / / / ___/ __ `/ ___/ \__ \/ __ \/ / / / ___/ ___/ _ \
/ ____/ /_/ / (__ ) /_/ / / ___/ / /_/ / /_/ / / / /__/ __/
/_/ \__,_/_/____/\__,_/_/ /____/\____/\__,_/_/ \___/\___/
(com.datamountaineer.streamreactor.connect.pulsar.source.PulsarSourceTask:41)
Insert Test Records to Pulsar¶
We can insert records to Pulsar using the Pulsar CLI’s:
bin/pulsar-client produce \
persistent://landoop/standalone/connect/kafka-topic \
--messages 'hello-pulsar'
The source connector should pick this up and in the logs you should see:
INFO Delivered 1 records for pulsar-source-topic since
Now if we check the target Kafka topic:
➜ bin/kafka-console-consumer --topic pulsar-source-topic --from-beginning --bootstrap-server localhost:9092
hello-pulsar
Configurations¶
Config | Description | Type | Value |
---|---|---|---|
name | Name of the connector | string | This must be unique across the Connect cluster |
tasks.max | The number of tasks to scale output | int | 1 |
connector.class | Name of the connector class | string | com.datamountaineer.streamreactor.connect.pulsar.source.PulsarSourceConnector |
Connector Configurations¶
Config | Description | Type |
---|---|---|
connect.pulsar.kcql | Contains the Kafka Connect Query Language describing the flow from Apache Pulsar to Apache Kafka topics | string |
connect.pulsar.hosts | Contains the Pulsar connection end points | string |
Optional Configurations¶
Config | Description | Type | Default |
---|---|---|---|
connect.pulsar.polling.timeout | Provides the timeout to poll incoming messages | int | 1000 |
connect.pulsar.converter.throw.on.error | If set to false the conversion exception will be swallowed and everything carries on BUT the message is lost! | boolean | false |
connect.converter.avro.schemas | If the AvroConverter is used you need to provide an Avro schema to be able to read and translate the raw bytes to an Avro record. The format is $PULSAR_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE | string | |
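For instance, a sketch of a connector properties file exercising these optional settings (the schema file path is hypothetical):
name=pulsar-source
connector.class=com.datamountaineer.streamreactor.connect.pulsar.source.PulsarSourceConnector
tasks.max=1
connect.pulsar.hosts=pulsar://localhost:6650
connect.pulsar.kcql=INSERT INTO kafka-topicA SELECT * FROM persistent://landoop/standalone/connect/kafka-topic WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.AvroConverter`
connect.pulsar.polling.timeout=2000
connect.pulsar.converter.throw.on.error=true
connect.converter.avro.schemas=persistent://landoop/standalone/connect/kafka-topic=/tmp/source.avsc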
Kubernetes¶
Helm Charts are provided at our repo; add the repo to your Helm instance and install. We recommend using the Landscaper to manage Helm values since typically each Connector instance has its own deployment.
Add the Helm charts to your Helm instance:
helm repo add landoop https://lensesio.github.io/kafka-helm-charts/
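Then install the connector chart; the chart name below is an assumption, so check the repo for the exact name:
helm install landoop/kafka-connect-pulsar-source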