5.0

Hive

Kafka Connect sink connector for writing data from Kafka to Hive.

Two versions of the Hive connector are available:

  • Hive (Hive 2.1+)
  • Hive 1.1 (Hive 1.1)

KCQL support 

The following KCQL is supported:

INSERT INTO <hive_table>
SELECT FIELD, ... 
FROM kafka_topic_name
[STOREAS PARQUET|ORC]
[WITH_FLUSH_INTERVAL=interval in milliseconds]
[WITH_FLUSH_SIZE=integer]
[WITH_FLUSH_COUNT=integer]
[WITH_SCHEMA_EVOLUTION=ADD|MATCH]
[WITH_OVERWRITE]
[PARTITIONBY col1, col2]
[WITH_PARTITIONING = DYNAMIC|STRICT]
[AUTOCREATE]
[WITH_TABLE_LOCATION="my/path"]

Examples:

INSERT INTO hive_tableA SELECT * FROM kafka_topicA WITH_FLUSH_INTERVAL = 10
INSERT INTO hive_tableA SELECT col1,col2 FROM kafka_topicA WITH_SCHEMA_EVOLUTION = ADD
INSERT INTO hive_tableA SELECT col1, col2 FROM kafka_topicA WITH_TABLE_LOCATION = "/magic/location/on/my/ssd"
INSERT INTO hive_tableA SELECT col1, col2 FROM kafka_topicA WITH_OVERWRITE
INSERT INTO hive_tableA SELECT col1, col2 FROM kafka_topicA PARTITIONBY col1, col2
INSERT INTO hive_tableA SELECT col1, col2 FROM kafka_topicA WITH_PARTITIONING = DYNAMIC
INSERT INTO hive_tableA SELECT f1 as col1, f2 as col2 FROM kafka_topicA AUTOCREATE

Concepts 

The connector writes to HDFS via Hive. The Hive metastore is used as a metadata reference lookup.

Auto create tables 

The connector can automatically create tables in Hive if the AUTOCREATE clause is set.

Stored As 

The connector supports writing Parquet and ORC files, controlled by the STOREAS clause.

Controlling commits to HDFS 

Records are flushed to HDFS based on three options:

  • WITH_FLUSH_INTERVAL - Time in milliseconds to accumulate records before committing
  • WITH_FLUSH_SIZE - Size of files in bytes to commit
  • WITH_FLUSH_COUNT - Number of files to commit.

The first threshold to be reached will trigger flushing and committing of the files.
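As a rough sketch of how the three thresholds combine, the snippet below builds a KCQL statement that sets all of them and prints it as a connect.hive.kcql property line. The table and topic names come from the examples above. The threshold values are illustrative assumptions, and combining all three clauses in one statement is inferred from the syntax listing rather than confirmed behaviour.

```shell
#!/bin/sh
# Illustrative thresholds (assumptions, not recommended values).
FLUSH_INTERVAL_MS=60000      # WITH_FLUSH_INTERVAL: 60 seconds
FLUSH_SIZE_BYTES=134217728   # WITH_FLUSH_SIZE: 128 MiB
FLUSH_COUNT=10               # WITH_FLUSH_COUNT threshold

# Build the statement clause by clause so each threshold is easy to change.
KCQL="INSERT INTO hive_tableA SELECT * FROM kafka_topicA"
KCQL="$KCQL WITH_FLUSH_INTERVAL = $FLUSH_INTERVAL_MS"
KCQL="$KCQL WITH_FLUSH_SIZE = $FLUSH_SIZE_BYTES"
KCQL="$KCQL WITH_FLUSH_COUNT = $FLUSH_COUNT"

# Print the property line to paste into the connector configuration.
echo "connect.hive.kcql=$KCQL"
```

Whichever threshold is reached first triggers the commit, so a short interval effectively caps how long records wait even when the size and count limits are generous.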

Partitioning 

Hive tables and the underlying HDFS files can be partitioned by listing the field names from the Kafka topic in the PARTITIONBY clause.

The connector can create partitions dynamically when the WITH_PARTITIONING = DYNAMIC clause is used. If STRICT partitioning is set, the partitions must be created beforehand in Hive and HDFS.
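The two partitioning clauses can appear together in one statement. The sketch below assembles such a statement and rejects any mode other than the two listed in the syntax. The column, table, and topic names are the placeholder names from the examples above, and the variable names are hypothetical.

```shell
#!/bin/sh
# Placeholder partition columns, as used in the KCQL examples.
PARTITION_COLS="col1, col2"
# DYNAMIC lets the connector create partitions; STRICT expects them to exist.
PARTITIONING_MODE="DYNAMIC"

# Guard against values the KCQL syntax does not list.
case "$PARTITIONING_MODE" in
  DYNAMIC|STRICT) ;;
  *) echo "unsupported partitioning mode: $PARTITIONING_MODE" >&2; exit 1 ;;
esac

KCQL="INSERT INTO hive_tableA SELECT col1, col2 FROM kafka_topicA"
KCQL="$KCQL PARTITIONBY $PARTITION_COLS WITH_PARTITIONING = $PARTITIONING_MODE"

echo "connect.hive.kcql=$KCQL"
```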

HIVE table location 

The Hive table location can be set using the WITH_TABLE_LOCATION clause.

Overwriting 

To overwrite records in a Hive table, use the WITH_OVERWRITE clause.

Kafka payload support 

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)
  • Schema.Struct and JSON
  • No Schema and JSON

See connect payloads for more information.

Kerberos 

For Hive clusters that use Kerberos for authentication, the connector supports two modes. The mode is selected with the connect.hive.security.kerberos.auth.mode configuration. The supported values are:

  • KEYTAB
  • USERPASSWORD

The connect.hive.security.kerberos.ticket.renew.ms configuration controls the interval (in milliseconds) at which the Kerberos ticket obtained during the login step is renewed.
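A minimal sketch of switching Kerberos on and selecting a mode is shown below. It appends three settings to a connector properties file. The property keys are the ones listed in the Options table. The file name connector.properties and the PROPS_FILE variable are assumptions, and the renewal interval shown is simply the documented default.

```shell
#!/bin/sh
# Assumed location of the connector configuration; override PROPS_FILE as needed.
PROPS_FILE="${PROPS_FILE:-connector.properties}"

# Append the Kerberos switches. KEYTAB is the default mode;
# use USERPASSWORD instead when no keytab is available.
cat >> "$PROPS_FILE" <<'EOF'
connect.hive.security.kerberos.enabled=true
connect.hive.security.kerberos.auth.mode=KEYTAB
connect.hive.security.kerberos.ticket.renew.ms=3600000
EOF
```

These settings only choose the mode. The mode-specific properties described in the following sections still have to be supplied.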

Keytab 

When this mode is configured, these extra configurations need to be set:

connect.hive.security.principal= ....
connect.hive.security.keytab = /path/to/the/keytab
connect.hive.security.namenode.principal=....

The keytab file must be available at the same path on all Connect cluster workers. If the file is missing, an error is raised. Details about these configurations are in the Options section.

User-password 

For setups where a keytab is not available, Kerberos authentication can be handled with a user name and password. In this case, the sink requires the following configurations:

connect.hive.security.kerberos.user = jsmith
connect.hive.security.kerberos.password=password123
connect.hive.security.kerberos.krb5=/path/to/the/krb5
connect.hive.security.kerberos.jaas=/path/to/the/jaas

Error policies 

The connector supports Error policies.

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=hive
docker-compose up -d hive

Preparing the target system 

Create the target database:

docker exec -ti hive hive -e "CREATE DATABASE lenses"

Start the connector 

If you are using Lenses, log in to Lenses and navigate to the connectors page, select Hive as the sink and paste the following:

name=hive
connector.class=com.landoop.streamreactor.connect.hive.sink.HiveSinkConnector
tasks.max=1
topics=orders
connect.hive.kcql=INSERT INTO orders SELECT * FROM orders AUTOCREATE PARTITIONBY state WITH_FLUSH_INTERVAL = 10
connect.hive.database.name=lenses
connect.hive.metastore=thrift
connect.hive.metastore.uris=thrift://hive-metastore:9083
connect.hive.fs.defaultFS=hdfs://namenode:8020

To start the connector without using Lenses, log into the fastdata container:


docker exec -ti fastdata /bin/bash

and create a connector.properties file containing the properties above.

Create the connector with the connect-cli:

connect-cli create hive < connector.properties

Wait for the connector to start and check that it is running:

connect-cli status hive
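If you would rather script the wait than re-run the status command by hand, one option is to poll the Kafka Connect REST status endpoint. The sketch below is an assumption-heavy example: it presumes the REST interface is reachable at http://localhost:8083 (Kafka Connect's default port), that the connector is named hive as in the properties above, and that the response is compact JSON. The function names are hypothetical.

```shell
#!/bin/sh
# Reads a Kafka Connect status JSON document on stdin.
# Succeeds only if it contains a RUNNING state and no FAILED state.
# Assumes compact JSON (no spaces around the colon).
connector_running() {
  status=$(cat)
  case "$status" in
    *'"state":"FAILED"'*) return 1 ;;
    *'"state":"RUNNING"'*) return 0 ;;
    *) return 1 ;;
  esac
}

# Polls the status URL up to 30 times, 2 seconds apart.
wait_for_connector() {
  url="${1:-http://localhost:8083/connectors/hive/status}"
  i=0
  while [ "$i" -lt 30 ]; do
    if curl -s "$url" | connector_running; then
      echo "connector is running"
      return 0
    fi
    i=$((i + 1))
    sleep 2
  done
  echo "connector did not reach RUNNING" >&2
  return 1
}
```

Calling wait_for_connector with no arguments uses the assumed default URL. Pass a different URL as the first argument if your Connect worker listens elsewhere.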

Inserting test data 

In the fastdata container, start the Kafka Avro console producer:


kafka-avro-console-producer \
 --broker-list localhost:9092 --topic orders \
 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'

The console is now waiting for your input. Enter the following:


{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}
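To send more than one record, the input lines can be generated rather than typed. The sketch below prints a few records that follow the schema above, varying only the id field. The gen_orders function is a hypothetical helper, and all other field values are copied from the sample record.

```shell
#!/bin/sh
# Print N order records, one JSON document per line.
# Only the id changes; the remaining values mirror the sample record.
gen_orders() {
  n="$1"
  i=1
  while [ "$i" -le "$n" ]; do
    printf '{"id": %d, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty": 100}\n' "$i"
    i=$((i + 1))
  done
}

gen_orders 3
```

The output can be piped into the kafka-avro-console-producer command shown above instead of entering records interactively.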

Check for data in Hive 


docker exec -ti hive hive -S -e "SELECT * FROM lenses.orders"

Clean up 

Bring down the stack:

docker-compose down

Options 

Name | Description | Type | Default Value
connect.hive.database.name | Sets the database name. | string |
connect.hive.metastore | Protocol used by the Hive metastore. | string |
connect.hive.kcql | Contains the Kafka Connect Query Language describing the flow from Apache Kafka topics to Apache Hive tables. | string |
connect.hive.fs.defaultFS | HDFS filesystem default URI. | string |
connect.hive.metastore.uris | URI pointing to the metastore. | string |
connect.hive.hdfs.conf.dir | The Hadoop configuration directory. | string |
connect.hive.conf.dir | The Hive configuration directory. | string |
connect.hive.security.principal | The principal to use when HDFS is using Kerberos for authentication. | string |
connect.hive.security.keytab | The path to the keytab file for the HDFS connector principal. This keytab file should only be readable by the connector user. | string |
connect.hive.namenode.principal | The principal for the HDFS Namenode. | string |
connect.hive.security.kerberos.ticket.renew.ms | The period in milliseconds to renew the Kerberos ticket. | long | 3600000
connect.hive.security.kerberos.user | The user name for logging in. Used when auth.mode is set to USERPASSWORD. | string |
connect.hive.security.kerberos.password | The user password for logging in to Kerberos. Used when auth.mode is set to USERPASSWORD. | password |
connect.hive.security.kerberos.krb5 | The path to the KRB5 file. | string |
connect.hive.security.kerberos.jaas | The path to the JAAS file. | string |
connect.hive.security.kerberos.jaas.entry.name | The entry in the JAAS file to consider. | string | com.sun.security.jgss.initiate
connect.progress.enabled | Enables the output for how many records have been processed. | boolean | false
connect.hive.security.kerberos.enabled | Indicates whether HDFS is using Kerberos for authentication. | boolean | false
connect.hive.security.kerberos.auth.mode | The authentication mode for Kerberos. It can be KEYTAB or USERPASSWORD. | string | KEYTAB