Pulsar Source

Download connector Pulsar Connector for Kafka 2.1.0

A Kafka Connector to write events from Apache Pulsar to Apache Kafka.

Prerequisites

  • Apache Kafka 0.11.x or above
  • Kafka Connect 0.11.x or above
  • Apache Pulsar
  • Java 1.8

Features

  1. The KCQL routing querying - Pulsar and Kafka topic selection is supported
  2. Message converters.

KCQL Support

INSERT INTO kafka_topic SELECT { FIELDS, ... } FROM pulsar_topic

Tip

You can specify multiple KCQL statements separated by ; to have a the connector sink multiple topics

The Apache Pulsar source supports KCQL, Kafka Connect Query Language. The following support KCQL is available:

  1. Pulsar topic selection
  2. Target Kafka topic selection
  3. Message converter selection.

Example:

-- Insert mode, select all fields and write to kafka-topicA with message converter myclass
INSERT INTO kafka-topicA SELECT * FROM persistent://landoop/standalone/connect/kafka-topic [WITHCONVERTER=`myclass`]

The Apache Pulsar source supports KCQL, Kafka Connect Query Language. The following support KCQL is available:

  1. Field selection
  2. Selection of target table
  3. Time to live on inserts.

Converters

We provide four converters out of the box but you can plug your own. See an example here. which and be set in connect.pulsar.kcql statement. The WITHCONVERTER keyword supports this option.

AvroConverter

com.datamountaineer.streamreactor.connect.converters.source.AvroConverter

The payload is an AVRO message. In this case you need to provide a path for the AVRO schema file to be able to decode it.

JsonSimpleConverter

com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter

The payload is a Json message. This converter will parse the JSON and create an AVRO record for it which will be sent over to Kafka.

JsonConverterWithSchemaEvolution

An experimental converter for converting JSON messages to AVRO. The resulting AVRO schema is fully compatible as new fields are added as the JSON payload evolves.

BytesConverter

com.datamountaineer.streamreactor.connect.converters.source.BytesConverter

This is the default implementation. The payload is taken as is: an array of bytes and sent over Kafka as an AVRO record with Schema.BYTES. You don’t have to provide a mapping for the source to get this converter!!

Apache Pulsar Setup

The documention for Pulasr is available here.

Download and extract the binary release:

wget http://www.apache.org/dist/incubator/pulsar/pulsar-1.21.0-incubating/apache-pulsar-1.21.0-incubating-bin.tar.gz
tar xvfz apache-pulsar-1.21.0-incubating-bin.tar.gz
cd apache-pulsar-1.21.0-incubating

Apache Pulsar requires Zookeeper, if you have Docker we recommend running Pulsar in a container.

docker run -it \
-p 6650:6650 \
-p 8080:8080 \
-v $PWD/data:/pulsar/data \
apachepulsar/pulsar:1.21.0-incubating \
bin/pulsar standalone --advertised-address 127.0.0.1

If you do not have Docker you can still run Pulsar locally and reuse the Zookeeper instances from you Kafka cluster. Pulsar uses Apache BookKeeper for persistence which stores Ledger details under /ledgers can is controlled via zkLedgersRootPath in the bookies config file. Using this approach you will may see Zookeeper warnings in the Pulsar logs.

# start
bin/pulsar standalone

Warning

We recommend separate Zookeeper quorums for Kafka and Pulsar and do not advise you try this on Production!

If you wish to use a separate Zookeeper instance outside of Docker you will need to update the configuration files of Apache Pulsar in conf to start Zookeeper on a different port, .i.e. 2182 please consult the Pulsar documentation.

Installing the Connector

Connect, in production should be run in distributed mode

  1. Install and configure a Kafka Connect cluster
  2. Create a folder on each server called plugins/lib
  3. Copy into the above folder the required connector jars from the stream reactor download
  4. Edit connect-avro-distributed.properties in the etc/schema-registry folder and uncomment the plugin.path option. Set it to the root directory i.e. plugins you deployed the stream reactor connector jars in step 2.
  5. Start Connect, bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties

Connect Workers are long running processes so set an init.d or systemctl service accordingly.

Source Connector QuickStart

Start Kafka Connect in distributed mode (see install). In this mode a Rest Endpoint on port 8083 is exposed to accept connector configurations. We developed Command Line Interface to make interacting with the Connect Rest API easier. The CLI can be found in the Stream Reactor download under the bin folder. Alternatively the Jar can be pulled from our GitHub releases page.

Starting the Connector

Download, and install Stream Reactor. Follow the instructions here if you haven’t already done so. All paths in the quickstart are based on the location you installed the Stream Reactor.

Once the Connect has started we can now use the kafka-connect-tools cli to post in our distributed properties file for MQTT. For the CLI to work including when using the dockers you will have to set the following environment variable to point the Kafka Connect Rest API.

export KAFKA_CONNECT_REST="http://myserver:myport"
➜  bin/connect-cli create pulsar-source < conf/pulsar-source.properties

name=pulsar-sink
connector.class=com.datamountaineer.streamreactor.connect.pulsar.source.PulsarSourceConnector
tasks.max=1
connect.pulsar.kcql=INSERT INTO pulsar-source-topic SELECT * FROM persistent://landoop/standalone/connect/kafka-topic
connect.pulsar.hosts=pulsar://localhost:6650
connect.progress.enabled=true

This connector will use the default BytesConverter as we did not specify any WITHCONVERTER syntax in the KCQL statement. We can use the CLI to check if the connector is up but you should be able to see this in logs as well.

#check for running connectors with the CLI
➜ bin/connect-cli ps
pulsar-source

In the logs of Connect you should see this:

   INFO
    __                    __
   / /   ____ _____  ____/ /___  ____  ____
  / /   / __ `/ __ \/ __  / __ \/ __ \/ __ \
 / /___/ /_/ / / / / /_/ / /_/ / /_/ / /_/ /
/_____/\__,_/_/ /_/\__,_/\____/\____/ .___/
                                    /_/
       ____        __                  _____
      / __ \__  __/ /________ ______ / ___/____  __  _______________
     / /_/ / / / / / ___/ __ `/ ___/ \__ \/ __ \/ / / / ___/ ___/ _ \
    / ____/ /_/ / (__  ) /_/ / /    ___/ / /_/ / /_/ / /  / /__/  __/
   /_/    \__,_/_/____/\__,_/_/    /____/\____/\__,_/_/   \___/\___/

   (com.datamountaineer.streamreactor.connect.pulsar.source.PulsarSourceTask:41)

Insert Test Records to Pulsar

We can insert records to Pulsar using the Pulsar CLI’s:

bin/pulsar-client produce \
    persistent://landoop/standalone/connect/kafka-topic \
    --messages 'hello-pulsar'

The source connector should pick this us and in the logs you should see:

INFO Delivered 1 records for pulsar-source-topic since

Now if we check the target Kafka topic:

|⇒ bin/kafka-console-consumer --topic pulsar-source-topic --from-beginning --bootstrap-server localhost:9092

*hello-pulsar

Configurations

Config Description Type Value
name Name of the connector string This must be unique across the Connect cluster
tasks.max The number of tasks to scale output int 1
connector.class Name of the connector class string com.datamountaineer.streamreactor.connect.pulsar.source.PulsarSourceConnector

Connector Configurations

Config Description Type
connect.pulsar.kcql
Contains the Kafka Connect Query Language
describing the flow from Apache Kafka to Apache Pulsar topics
string
connect.pulsar.hosts Contains the Pulsar connection end points string

Optional Configurations

Config Description Type Default
connect.pulsar.polling.timeout
Provides the timeout to poll
incoming messages
int 1000
connect.pulsar.converter.throw.on.error
If set to false the conversion
exception will be swallowed
and everything carries on
BUT the message is lost!
boolean false
connect.converter.avro.schemas
If the AvroConverter is used you
need to provide an avro Schema
to be able to read and translate
the raw bytes to an avro record.
The format is $PULSAR_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE
string  

Kubernetes

Helm Charts are provided at our repo, add the repo to your Helm instance and install. We recommend using the Landscaper to manage Helm Values since typically each Connector instance has its own deployment.

Add the Helm charts to your Helm instance:

helm repo add landoop https://lensesio.github.io/kafka-helm-charts/