Sink Connector

A Connector application (or Data Reading Service) is a service that reads data from at least one Apache Kafka topic and writes it to a target system.

On this page you will learn how to create a Connector that saves data to a plain text file.

A Sink Connector is an application that reads data from Kafka; under the hood it creates and uses a Kafka consumer client. This page uses a File Sink Connector to read the desired data and save it to an external file.

The output will contain all the lines of the log file that are longer than 1000 characters, including spaces.

The final configuration of the Sink Connector is as follows:

connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
errors.retry.timeout=0
errors.log.include.messages=false
tasks.max=1
topics=topic_to_be_exported
errors.retry.delay.max.ms=60000
errors.deadletterqueue.context.headers.enable=false
file=/tmp/long_entries.log
name=sink-file
errors.tolerance=none
errors.deadletterqueue.topic.replication.factor=3
value.converter=org.apache.kafka.connect.storage.StringConverter
config.action.reload=RESTART
errors.log.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter

Several things are happening here. First, the file property defines where the output will be written, which in this case is /tmp/long_entries.log. Second, both the key and the value of the Kafka records are strings, so a string converter is needed for each of them; in this case that is org.apache.kafka.connect.storage.StringConverter.

You can find more information about the use of org.apache.kafka.connect.storage.StringConverter in the Apache Kafka Connect documentation.

Last, we define the Kafka topic that will be used as input using the topics property. Note that topics accepts multiple comma-separated values, as in the example below.
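
A hypothetical configuration reading from two topics (the second topic name is illustrative) would look like this:

topics=topic_to_be_exported,another_topic_to_be_exported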

It goes without saying that the selected Kafka topic or topics must already exist. For this particular example, the selected Kafka topic was generated using the following Lenses SQL query:

SET autocreate=true;

INSERT INTO topic_to_be_exported
SELECT the_text AS my_message
FROM log_files_as_json
WHERE LENGTH(the_text) > 1000;

The aforementioned query filters the message text based on its length. You can modify it to fit your needs, as in the variation sketched below.
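
For instance, a variation that keeps only the entries longer than 500 characters (the threshold is illustrative) would look like this:

SET autocreate=true;

INSERT INTO topic_to_be_exported
SELECT the_text AS my_message
FROM log_files_as_json
WHERE LENGTH(the_text) > 500;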