4.3
Hive
Kafka Connect sink connector for writing data from Kafka to Hive.
Two versions of the Hive connector are available:
- Hive (Hive 2.1+)
- Hive 1.1 (Hive 1.1)
KCQL support
The following KCQL is supported:
INSERT INTO <hive_table>
SELECT FIELD, ...
FROM kafka_topic_name
[STOREDAS PARQUET|ORC]
[WITH_FLUSH_INTERVAL=interval in milliseconds]
[WITH_FLUSH_SIZE=integer]
[WITH_FLUZE]
[WITH_SCHEMA_EVOLUTION=ADD|MATCH]
[WITH_OVERWRITE]
[PARTITIONBY col1, col2]
[WITH_PARTITIONING = DYNAMIC|STRICT]
[AUTOCREATE]
[WITH_TABLE_LOCATION="my/path"]
Examples:
INSERT INTO hive_tableA SELECT * FROM kafka_topicA WITH_FLUSH_INTERVAL = 10
INSERT INTO hive_tableA SELECT col1,col2 FROM kafka_topicA WITH_SCHEMA_EVOLUTION = ADD
INSERT INTO hive_tableA SELECT col1, col2 FROM kafka_topicA WITH_TABLE_LOCATION = "/magic/location/on/my/ssd"
INSERT INTO hive_tableA SELECT col1, col2 FROM kafka_topicA WITH_OVERWRITE
INSERT INTO hive_tableA SELECT col1, col2 FROM kafka_topicA PARTITIONBY col1, col2
INSERT INTO hive_tableA SELECT col1, col2 FROM kafka_topicA WITH_PARTITIONING = DYNAMIC
INSERT INTO hive_tableA SELECT f1 as col1, f2 as col2 FROM kafka_topicA AUTOCREATE
Concepts
The Connector writes to HDFS via HIVE. The Hive metastore is used a metadata reference lookup.
Auto create tables
The connector can autocreate tables in HIVE is the AUTOCREATE clause is set.
Stored As
The Connector support writing Parquet and ORC files, controlled by the STORED AS clause
Controlling commits to HDFS
Records are flushed to HDFS based on three options:
- WITH_FLUSH_INTERVAL - Time in milliseconds to accumulate records before commiting
- WITH_FLUSH_SIZE - Size of files in bytes to commit
- WITH_FLUSH_COUNT - Number of files to commit.
The first threshold to be reached will trigger flushing and committing of the files.
Partitioning
HIVE tables and the underlying HDFS files can be partitioned by providing the fields names in the Kafka topic to partition by in the PARTITIONBY clause.
The partitions can be dynamically created by the connector using the WITH_PARTITIONING = DYNAMIC clause. If STRICT partitioning is set the partitions must be created beforehand in HIVE and HDFS.
HIVE table location
The HIVE table location can be set using the WITH_TABLE_LOCATION.
Overwriting
To overwrite records in HIVE table use the WITH_OVERWRITE clause.
Kafka payload support
This sink supports the following Kafka payloads:
- Schema.Struct and Struct (Avro)
- Schema.Struct and JSON
- No Schema and JSON
See connect payloads for more information.
Kerberos
For those Hive clusters using Kerberos for authentication, the connector supports two modes. Controlling the modes happens via connect.hive.security.kerberos.auth.mode configuration. The supported values are
- KEYTAB
- USERPASSWORD.
The connect.hive.security.kerberos.ticket.renew.ms configuration controls the interval (in milliseconds) to renew a previously obtained (during the login step) Kerberos token.
Keytab
When this mode is configured, these extra configurations need to be set:
connect.hive.security.principal= ....
connect.hive.security.keytab = /path/to/the/keytab
connect.hive.security.namenode.principal=....
The keytab file needs to be available on the same path on all the Connect cluster workers. In case the file is missing an error will be raised. You can find the details about the configurations in the Optional Configurations section.
User-password
For those setups where a keytab is not available, the Kerberos authentication can be handled via user and password approach. In this case, the following configurations are required by the sink:
connect.hive.security.kerberos.user = jsmith
connect.hive.security.kerberos.password=password123
connect.hive.security.kerberos.krb5=/path/to/the/krb5
connect.hive.security.kerberos.jaas=/path/to/the/jaas
Error polices
The connector supports Error polices .
Quickstart
Launch the stack
- Copy the docker-compose file.
- Bring up the stack.
export CONNECTOR=hive
docker-compose up -d hive
Peparing the target system
Create the target database:
docker exec -ti hive hive -e "CREATE DATABASE lenses"
Start the connector
If you are using Lenses, login into Lenses and navigate to the connectors page , select Hive as the sink and paste the following:
name=hive
connector.class=com.landoop.streamreactor.connect.hive.sink.HiveSinkConnector
tasks.max=1
topics=orders
connect.hive.kcql=INSERT INTO orders SELECT * FROM orders AUTOCREATE PARTITION_BY state WITH_FLUSH_INTERVAL = 10
connect.hive.database.name=lenses
connect.hive.metastore=thrift
connect.hive.metastore.uris=thrift://hive-metastore:9083
connect.hive.fs.defaultFS=hdfs://namenode:8020
To start the connector without using Lenses, log into the fastdatadev container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
Create the connector, with the connect-cli :
connect-cli create hive < connector.properties
connect-cli create hive < connector.properties
Wait a for the connector to start and check its running:
connect-cli status hive
Inserting test data
In the fastdata container start the kafka producer shell:
kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'
the console is now waiting for your input, enter the following:
{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}
Check for data in Hive
docker exec -ti hive hive -S -e "SELECT * FROM lenses.orders"
Clean up
Bring down the stack:
docker-compose down
Options
Name | Description | Type | Default Value |
---|---|---|---|
connect.hive.database.name | Sets the database name | string | |
connect.hive.metastore | Protocol used by the hive metastore | string | |
connect.hive.kcql | Contains the Kafka Connect Query Language describing the flow from Apache Kafka topics to Apache Hive tables. | string | |
connect.hive.fs.defaultFS | HDFS Filesystem default uri | string | |
connect.hive.metastore.uris | URI to point to the metastore | string | |
connect.hive.hdfs.conf.dir | The Hadoop configuration directory. | string | |
connect.hive.conf.dir | The Hive configuration directory. | string | |
connect.hive.security.principal | The principal to use when HDFS is using Kerberos to for authentication. | string | |
connect.hive.security.keytab | The path to the keytab file for the HDFS connector principal. This keytab file should only be readable by the connector user. | string | |
connect.hive.namenode.principal | The principal for HDFS Namenode. | string | |
connect.hive.security.kerberos.ticket.renew.ms | The period in milliseconds to renew the Kerberos ticket. | long | 3600000 |
connect.hive.security.kerberos.user | The user name for login in. Used when auth.mode is set to USERPASSWORD | string | |
connect.hive.security.kerberos.password | The user password to login to Kerberos. Used when auth.mode is set to USERPASSWORD | password | |
connect.hive.security.kerberos.krb5 | The path to the KRB5 file | string | |
connect.hive.security.kerberos.jaas | The path to the JAAS file | string | |
connect.hive.security.kerberos.jaas.entry.name | The entry in the jaas file to consider | string | com.sun.security.jgss.initiate |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
connect.hive.security.kerberos.enabled | Configuration indicating whether HDFS is using Kerberos for authentication. | boolean | false |
connect.hive.security.kerberos.auth.mode | The authentication mode for Kerberos. It can be KEYTAB or USERPASSWORD | string | KEYTAB |