Kudu


Kafka Connect sink connector for writing data from Kafka to Kudu.

KCQL support 

The following KCQL is supported:

INSERT | UPSERT
INTO <kudu-table>
SELECT FIELDS, ...
FROM <kafka-topic>
[AUTOCREATE]
[DISTRIBUTEBY FIELD, ... INTO NBR_OF_BUCKETS BUCKETS]
[AUTOEVOLVE]

Examples:

-- Insert mode, select all fields from topicA and write to tableA
INSERT INTO tableA SELECT * FROM topicA

-- Insert mode, select 3 fields and rename from topicB and write to tableB
INSERT INTO tableB SELECT x AS a, y, z AS c FROM topicB

-- Upsert mode, select all fields from topicC,
-- auto create tableC and auto evolve, use field1 as the primary key
UPSERT INTO tableC
SELECT * FROM topicC AUTOCREATE DISTRIBUTEBY field1 INTO 10 BUCKETS AUTOEVOLVE
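
The clause order in the grammar above can be illustrated with a small Python helper that assembles a KCQL string. This is only a sketch: build_kcql and its parameters are hypothetical names, not part of the connector, and the helper concatenates text without validating it against the connector's parser. The DISTRIBUTEBY spelling follows the third example above.

```python
def build_kcql(mode, table, topic, fields="*", autocreate=False,
               distribute_by=None, buckets=None, autoevolve=False):
    """Assemble a KCQL statement (illustrative helper, not a connector API)."""
    if mode not in ("INSERT", "UPSERT"):
        raise ValueError("mode must be INSERT or UPSERT")
    parts = [f"{mode} INTO {table}", f"SELECT {fields}", f"FROM {topic}"]
    if autocreate:
        parts.append("AUTOCREATE")
    if distribute_by:
        if not buckets:
            raise ValueError("buckets is required when distribute_by is set")
        # Hash-partition on the listed fields into the given number of buckets.
        parts.append(f"DISTRIBUTEBY {', '.join(distribute_by)} INTO {buckets} BUCKETS")
    if autoevolve:
        parts.append("AUTOEVOLVE")
    return " ".join(parts)


print(build_kcql("INSERT", "tableA", "topicA"))
print(build_kcql("UPSERT", "tableC", "topicC", autocreate=True,
                 distribute_by=["field1"], buckets=10, autoevolve=True))
```

The resulting string is what would be assigned to connect.kudu.kcql in the connector configuration.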

Concepts 

Kudu currently has no concept of databases, unlike Impala. To write to a Kudu table that was created through Impala, set the target table to its Impala-qualified name:

impala::database.impala_table

Insert Mode 

Insert is the default write mode of the sink.

Kafka can currently provide exactly-once delivery semantics. However, to avoid errors when unique constraints exist on the target tables, the sink can run in UPSERT mode. If the error policy is set to NOOP, the sink discards the error and continues processing, but it currently makes no attempt to distinguish integrity-constraint violations from other exceptions such as casting issues.

Upsert Mode 

The connector supports Kudu upserts, which replace the existing row if a match is found on the primary keys. If a record arrives whose primary-key fields match an existing row but whose other values differ, the existing row in the target table is updated.
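
The difference between the two write modes can be sketched with an in-memory stand-in for a table. ToyTable and DuplicateKeyError are hypothetical names used only for illustration; they mimic the primary-key behaviour described above and do not call Kudu.

```python
class DuplicateKeyError(Exception):
    """Raised by the toy table when an insert hits an existing primary key."""


class ToyTable:
    """In-memory stand-in for a table with a single primary-key column."""

    def __init__(self, primary_key):
        self.primary_key = primary_key
        self.rows = {}

    def insert(self, record):
        key = record[self.primary_key]
        if key in self.rows:
            # Insert mode: a second record with the same key is an error.
            raise DuplicateKeyError(key)
        self.rows[key] = dict(record)

    def upsert(self, record):
        # Upsert mode: the existing row, if any, is replaced.
        self.rows[record[self.primary_key]] = dict(record)


table = ToyTable("id")
table.insert({"id": 1, "qty": 100})
table.upsert({"id": 1, "qty": 250})   # same key, new value: row is replaced
print(table.rows[1]["qty"])           # 250
```

With insert, the second write would raise instead, and the configured error policy would decide what happens next.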

Auto Create 

If you set AUTOCREATE, the sink uses the schema attached to the topic to create a table in Kudu. The primary keys are set by the PK keyword; the field values are concatenated and separated by a -. If no fields are set, the topic name, partition and message offset are used.
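
A minimal sketch of the key rule described above, in Python. derive_key is a hypothetical function, not the connector's code, and the use of - as the separator in the topic/partition/offset fallback is an assumption (the text only states it for field values).

```python
def derive_key(record, pk_fields, topic, partition, offset):
    """Build a key string from the PK fields, or from the record coordinates."""
    if pk_fields:
        # Field values are concatenated and separated by "-".
        return "-".join(str(record[field]) for field in pk_fields)
    # Assumption: the fallback uses the same "-" separator.
    return f"{topic}-{partition}-{offset}"


record = {"id": 1, "product": "abc", "qty": 100}
print(derive_key(record, ["id", "product"], "orders", 0, 42))  # 1-abc
print(derive_key(record, [], "orders", 0, 42))                 # orders-0-42
```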

Kudu has a number of partition strategies, but the sink supports only the HASH partition strategy. You can control which fields are hashed and the number of buckets to create. This behavior is controlled via the DISTRIBUTEBY clause.

DISTRIBUTEBY id INTO 10 BUCKETS
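
To illustrate what hash partitioning into buckets means, the sketch below assigns keys to one of ten buckets. It uses CRC32 as a stand-in hash; Kudu uses its own hash function internally, so the bucket numbers here will not match Kudu's. bucket_for is a hypothetical name.

```python
import zlib
from collections import Counter


def bucket_for(value, num_buckets):
    """Map a key to a bucket with a stand-in hash (not Kudu's real hash)."""
    digest = zlib.crc32(str(value).encode("utf-8"))
    return digest % num_buckets


# Spread 1000 ids over 10 buckets and count how many land in each.
counts = Counter(bucket_for(i, 10) for i in range(1000))
for bucket in sorted(counts):
    print(bucket, counts[bucket])
```

The same key always lands in the same bucket, which is what lets rows be located by their hashed fields.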

To query an existing Kudu table from Impala, map it as an external table:

CREATE EXTERNAL TABLE my_mapping_table
STORED AS KUDU
TBLPROPERTIES (
'kudu.table_name' = 'my_kudu_table'
);

Auto Evolution 

The connector supports auto evolution of tables for each topic. When AUTOEVOLVE is set, the connector identifies new schemas for each topic based on the schema version from the Schema Registry. New columns are identified and an ALTER TABLE DDL statement is issued against Kudu. All new columns are set as nullable.
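
The column comparison described above can be sketched as a simple diff between the table's current columns and the fields of a newer schema. new_nullable_columns and the sample schemas are hypothetical; the connector's actual logic is not shown in this document.

```python
def new_nullable_columns(table_columns, schema_fields):
    """Return (name, type, nullable) for schema fields missing from the table."""
    existing = set(table_columns)
    # Every newly discovered column is added as nullable.
    return [(name, ftype, True)
            for name, ftype in schema_fields
            if name not in existing]


table_columns = ["id", "created", "product"]
schema_v2 = [("id", "int"), ("created", "string"),
             ("product", "string"), ("qty", "int")]

print(new_nullable_columns(table_columns, schema_v2))  # [('qty', 'int', True)]
```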

Schema evolution can occur upstream, for example when new fields are added or a data type changes in the schema of the topic.

Fields cannot be deleted upstream. Fields should be of AVRO union type [null, ] with a default set. This allows the Connector to either retrieve the default value or null. The Connector is not aware that the field has been deleted as a value is always supplied to it.

If an upstream field is removed and the topic does not follow the Schema Registry’s evolution rules, i.e. it is not full or backwards compatible, any errors are handled according to the error policy.

Error policies 

The connector supports error policies, selected with the connect.kudu.error.policy option (see Options).
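
The three policies listed under Options (NOOP, THROW, RETRY) can be sketched as follows. This is a simplification under stated assumptions: write_with_policy is a hypothetical function, and the retry is modelled as a local loop, whereas in the connector a retry is driven by the Connect framework. The default values mirror connect.kudu.max.retries and connect.kudu.retry.interval.

```python
import time


def write_with_policy(write, policy="THROW", max_retries=20,
                      retry_interval_ms=60000, sleep=time.sleep):
    """Call write() and handle failures according to the error policy."""
    if policy not in ("NOOP", "THROW", "RETRY"):
        raise ValueError(f"unknown error policy: {policy}")
    attempts = 0
    while True:
        try:
            return write()
        except Exception:
            if policy == "NOOP":
                return None          # swallow the error and carry on
            if policy == "RETRY" and attempts < max_retries:
                attempts += 1
                sleep(retry_interval_ms / 1000.0)
                continue             # try the same write again
            raise                    # THROW, or retries exhausted


def always_fails():
    raise RuntimeError("simulated write failure")


# NOOP hides the failure; no sleeping is needed for this demonstration.
print(write_with_policy(always_fails, policy="NOOP", sleep=lambda _: None))
```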

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=kudu
docker-compose up -d kudu

Prepare the target system 

Log into the Kudu container and start the impala-shell:

docker exec -ti kudu impala-shell

CREATE TABLE orders (
    id INT
    , created VARCHAR
    , product VARCHAR
    , qty INT
    , price DOUBLE
    , PRIMARY KEY (id)
    )
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES (
    'kudu.num_tablet_replicas' = '1'
);

SELECT * FROM orders;

Start the connector 

If you are using Lenses, log into Lenses, navigate to the connectors page, select Kudu as the sink and paste the following:

name=kudu
connector.class=com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkConnector
tasks.max=1
topics=orders
connect.kudu.master=kudu
connect.kudu.kcql=INSERT INTO impala::default.orders SELECT * FROM orders
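
Kafka Connect also accepts connector definitions as JSON through its REST interface (POST /connectors, with a body of the form {"name": ..., "config": {...}}). The sketch below converts the properties above into that shape. properties_to_payload is a hypothetical helper; whether and where the REST endpoint is reachable in your environment is not covered here.

```python
import json

PROPERTIES = """\
name=kudu
connector.class=com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkConnector
tasks.max=1
topics=orders
connect.kudu.master=kudu
connect.kudu.kcql=INSERT INTO impala::default.orders SELECT * FROM orders
"""


def properties_to_payload(text):
    """Turn key=value lines into a Kafka Connect REST payload dictionary."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue                      # skip blanks and comments
        # Split on the first "=" only, so values may contain "=".
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return {"name": config["name"], "config": config}


payload = properties_to_payload(PROPERTIES)
print(json.dumps(payload, indent=2))
```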

To start the connector using the command line, log into the lenses-box container:


docker exec -ti lenses-box /bin/bash

and create a connector.properties file containing the properties above.

Create the connector, with the connect-cli:

connect-cli create kudu < connector.properties

Wait for the connector to start and check it’s running:

connect-cli status kudu

Inserting test data 

In the lenses-box container, start the Kafka Avro console producer:


kafka-avro-console-producer \
 --broker-list localhost:9092 --topic orders \
 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'

The console is now waiting for your input. The producer reads one record per line, so enter the following as a single line:


{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty": 100}
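
Because the console producer treats each input line as one record, it can help to generate test records programmatically. The sketch below checks a record against the field names and types of the value schema above and serialises it onto a single line. to_producer_line and FIELDS are hypothetical names for this example.

```python
import json

# Field names and Python types matching the Avro value schema above.
FIELDS = {"id": int, "created": str, "product": str, "price": float, "qty": int}


def to_producer_line(record):
    """Validate a record against FIELDS and return it as single-line JSON."""
    missing = FIELDS.keys() - record.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    for name, expected in FIELDS.items():
        if not isinstance(record[name], expected):
            raise TypeError(f"{name} must be {expected.__name__}")
    # json.dumps without indent never emits newlines.
    return json.dumps({name: record[name] for name in FIELDS})


line = to_producer_line({
    "id": 1,
    "created": "2016-05-06 13:53:00",
    "product": "OP-DAX-P-20150201-95.7",
    "price": 94.2,
    "qty": 100,
})
print(line)
```

The printed line can be pasted into the producer prompt as one record.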

Check for data in Kudu 

In the impala-shell on the Kudu container:

SELECT * FROM orders;

Clean up 

Bring down the stack:

docker-compose down

Options 

| Name | Description | Type | Default Value |
|---|---|---|---|
| connect.kudu.master | Kudu master address, comma-separated list. | string | localhost |
| connect.kudu.kcql | The KCQL statement(s) mapping Kafka topics to Kudu tables. | string | |
| connect.kudu.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed. THROW - the error is allowed to propagate. RETRY - the exception causes the Connect framework to retry the message; the number of retries is set by connect.kudu.max.retries. The error is logged automatically. | string | THROW |
| connect.kudu.retry.interval | The time in milliseconds between retries. | int | 60000 |
| connect.kudu.max.retries | The maximum number of times to try the write again. | int | 20 |
| connect.kudu.schema.registry.url | URL for the Schema Registry. | string | http://localhost:8081 |
| connect.kudu.write.flush.mode | Specifies the Kudu write mode: SYNC - flush each sink record; batching is disabled. BATCH_BACKGROUND - flush batches of sink records in a background thread. BATCH_SYNC - flush batches of sink records. | string | SYNC |
| connect.kudu.mutation.buffer.space | Kudu session mutation buffer space. | int | 1000 |
| connect.progress.enabled | Enables output of how many records have been processed. | boolean | false |
--
Last modified: September 15, 2024