Kafka Connect sink connector for writing data from Kafka to Hive.
The following Hive connector versions are available:
The following KCQL is supported:
INSERT INTO <hive_table>
SELECT FIELD, ...
FROM kafka_topic_name
[STOREAS PARQUET|ORC]
[WITH_FLUSH_INTERVAL=interval in milliseconds]
[WITH_FLUSH_SIZE=integer]
[WITH_FLUSH_COUNT=integer]
[WITH_SCHEMA_EVOLUTION=ADD|MATCH]
[WITH_OVERWRITE]
[PARTITIONBY col1, col2]
[WITH_PARTITIONING = DYNAMIC|STRICT]
[AUTOCREATE]
[WITH_TABLE_LOCATION="my/path"]
Examples:
INSERT INTO hive_tableA SELECT * FROM kafka_topicA WITH_FLUSH_INTERVAL = 10
INSERT INTO hive_tableA SELECT col1, col2 FROM kafka_topicA WITH_SCHEMA_EVOLUTION = ADD
INSERT INTO hive_tableA SELECT col1, col2 FROM kafka_topicA WITH_TABLE_LOCATION = "/magic/location/on/my/ssd"
INSERT INTO hive_tableA SELECT col1, col2 FROM kafka_topicA WITH_OVERWRITE
INSERT INTO hive_tableA SELECT col1, col2 FROM kafka_topicA PARTITIONBY col1, col2
INSERT INTO hive_tableA SELECT col1, col2 FROM kafka_topicA WITH_PARTITIONING = DYNAMIC
INSERT INTO hive_tableA SELECT f1 as col1, f2 as col2 FROM kafka_topicA AUTOCREATE
The connector writes to HDFS via Hive. The Hive metastore is used as a metadata reference lookup.
The connector can autocreate tables in Hive if the AUTOCREATE clause is set.
The connector supports writing Parquet and ORC files, controlled by the STOREAS clause.
Records are flushed to HDFS based on three options: flush count (WITH_FLUSH_COUNT), flush size (WITH_FLUSH_SIZE) and flush interval (WITH_FLUSH_INTERVAL).
The first threshold to be reached will trigger flushing and committing of the files.
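As a sketch, all three thresholds can be set directly in the KCQL statement; the table, topic and values below are placeholders:
INSERT INTO hive_tableA SELECT * FROM kafka_topicA WITH_FLUSH_COUNT = 1000 WITH_FLUSH_SIZE = 1000000 WITH_FLUSH_INTERVAL = 30000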
Hive tables and the underlying HDFS files can be partitioned by listing the field names from the Kafka topic to partition by in the PARTITIONBY clause.
The partitions can be created dynamically by the connector using the WITH_PARTITIONING = DYNAMIC clause. If STRICT partitioning is set, the partitions must be created beforehand in Hive and HDFS, as in the sketch below.
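With STRICT partitioning, a partition can be added ahead of time with standard Hive DDL. The database, table and partition values below are hypothetical and reuse the dockerised Hive from later on this page:
docker exec -ti hive hive -e "ALTER TABLE lenses.hive_tableA ADD IF NOT EXISTS PARTITION (col1='a', col2='b')"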
The Hive table location can be set using the WITH_TABLE_LOCATION clause.
To overwrite records in the Hive table, use the WITH_OVERWRITE clause.
This sink supports the following Kafka payloads:
See connect payloads for more information.
For those Hive clusters using Kerberos for authentication, the connector supports two modes. The mode is controlled via the connect.hive.security.kerberos.auth.mode configuration. The supported values select between the two modes described below: keytab-based authentication and user/password-based authentication.
The connect.hive.security.kerberos.ticket.renew.ms configuration controls the interval (in milliseconds) to renew a previously obtained (during the login step) Kerberos token.
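A minimal sketch of these two settings, assuming the keytab mode is selected; the KEYTAB value and the one-hour renewal interval are illustrative assumptions:
# assumed values; check the Optional Configurations section for the exact ones
connect.hive.security.kerberos.auth.mode=KEYTAB
connect.hive.security.kerberos.ticket.renew.ms=3600000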
When the keytab mode is configured, these extra configurations need to be set:
connect.hive.security.principal=....
connect.hive.security.keytab=/path/to/the/keytab
connect.hive.security.namenode.principal=....
The keytab file needs to be available at the same path on all the Connect cluster workers. If the file is missing, an error will be raised. You can find the details about these configurations in the Optional Configurations section.
For those setups where a keytab is not available, the Kerberos authentication can be handled via user and password approach. In this case, the following configurations are required by the sink:
connect.hive.security.kerberos.user=jsmith
connect.hive.security.kerberos.password=password123
connect.hive.security.kerberos.krb5=/path/to/the/krb5
connect.hive.security.kerberos.jaas=/path/to/the/jaas
The connector supports Error policies.
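As a sketch, an error policy is usually set through properties like the following; the key names and the RETRY value are assumptions based on the common Stream Reactor error-policy options and should be verified against the Optional Configurations section:
# assumed keys and values; verify against the Optional Configurations section
connect.hive.error.policy=RETRY
connect.hive.max.retries=20
connect.hive.retry.interval=60000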
export CONNECTOR=hive
docker-compose up -d hive
Create the target database:
docker exec -ti hive hive -e "CREATE DATABASE lenses"
If you are using Lenses, login into Lenses and navigate to the connectors page, select Hive as the sink and paste the following:
name=hive
connector.class=com.landoop.streamreactor.connect.hive.sink.HiveSinkConnector
tasks.max=1
topics=orders
connect.hive.kcql=INSERT INTO orders SELECT * FROM orders AUTOCREATE PARTITIONBY state WITH_FLUSH_INTERVAL = 10
connect.hive.database.name=lenses
connect.hive.metastore=thrift
connect.hive.metastore.uris=thrift://hive-metastore:9083
connect.hive.fs.defaultFS=hdfs://namenode:8020
To start the connector using the command line, log into the lenses-box container:
docker exec -ti lenses-box /bin/bash
and create a connector.properties file containing the properties above.
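One way to create the file inside the container is with a shell heredoc; this sketch simply reuses the example properties shown above:
cat > connector.properties <<'EOF'
name=hive
connector.class=com.landoop.streamreactor.connect.hive.sink.HiveSinkConnector
tasks.max=1
topics=orders
connect.hive.kcql=INSERT INTO orders SELECT * FROM orders AUTOCREATE PARTITIONBY state WITH_FLUSH_INTERVAL = 10
connect.hive.database.name=lenses
connect.hive.metastore=thrift
connect.hive.metastore.uris=thrift://hive-metastore:9083
connect.hive.fs.defaultFS=hdfs://namenode:8020
EOF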
Create the connector with the connect-cli:
connect-cli create hive < connector.properties
Wait for the connector to start and check it’s running:
connect-cli status hive
In the lenses-box container start the kafka producer shell:
kafka-avro-console-producer \
  --broker-list localhost:9092 --topic orders \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"},{"name":"qty","type":"int"}]}'
The console is now waiting for your input; enter the following:
{ "id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty": 100 }
Check that the records have arrived in Hive:
docker exec -ti hive hive -S -e "SELECT * FROM lenses.orders"
Bring down the stack:
docker-compose down