Redis


Kafka Connect sink connector for writing data from Kafka to Redis.

KCQL support 

The following KCQL is supported:

[INSERT INTO <redis-cache>]
SELECT FIELD, ...
FROM <kafka-topic>
[PK FIELD]
[STOREAS SortedSet(key=FIELD)|GEOADD|STREAM]

Concepts 

Cache mode 

The purpose of this mode is to cache [Key-Value] pairs in Redis. Imagine a Kafka topic with currency foreign exchange rate messages:

{ "symbol": "USDGBP" , "price": 0.7943 }
{ "symbol": "EURGBP" , "price": 0.8597 }

You may want to store the symbol as the Key and the price as the Value in Redis. This effectively makes Redis a caching system that multiple other applications can access to get the latest value. To achieve this with the Redis sink connector, specify the KCQL as:

SELECT price from yahoo-fx PK symbol

This will update the keys USDGBP and EURGBP with the relevant price using the (default) JSON format:

Key=EURGBP  Value={ "price": 0.8597 }
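The mapping from message to cache entry can be sketched in a few lines of Python. This is only an illustration of the behaviour described above, not the connector's code: the helper to_cache_entry is hypothetical, a plain dict stands in for Redis, and the third message is an invented later update that shows how the latest value wins.

```python
import json


def to_cache_entry(record, pk_field, selected_fields):
    """Build the (key, value) pair that the cache-mode KCQL above describes."""
    key = str(record[pk_field])
    value = json.dumps({name: record[name] for name in selected_fields})
    return key, value


messages = [
    {"symbol": "USDGBP", "price": 0.7943},
    {"symbol": "EURGBP", "price": 0.8597},
    {"symbol": "USDGBP", "price": 0.7950},  # hypothetical later update
]

cache = {}  # plain dict standing in for Redis
for message in messages:
    key, value = to_cache_entry(message, pk_field="symbol", selected_fields=["price"])
    cache[key] = value  # overwrite, so readers always see the latest price

for key in sorted(cache):
    print(f"Key={key}  Value={cache[key]}")
```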

Composite keys are supported with the PK clause. A delimiter can be set with the optional configuration property connect.redis.pk.delimiter.
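As a rough illustration of what a composite key looks like, the sketch below joins the values of several primary-key fields with a delimiter. The function composite_key and the exchange field are hypothetical, and the "." default is taken from the Options table; the connector's exact formatting may differ.

```python
def composite_key(record, pk_fields, delimiter="."):
    """Join the values of the primary-key fields into a single Redis key."""
    return delimiter.join(str(record[name]) for name in pk_fields)


# Hypothetical record: "exchange" is not a field in the examples above.
record = {"symbol": "USDGBP", "exchange": "LSE", "price": 0.7943}

print(composite_key(record, ["symbol", "exchange"]))
print(composite_key(record, ["symbol", "exchange"], delimiter=":"))
```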

Sorted Sets 

To insert messages from a Kafka topic into a single Sorted Set, use the following KCQL syntax:

INSERT INTO cpu_stats SELECT * from cpuTopic STOREAS SortedSet(score=timestamp) TTL=60

This will create the sorted set named cpu_stats and add entries to it. The entries are ordered in the Redis set by their score, which is defined here as the value of the timestamp field of the Avro message from Kafka. In the above example, we are selecting and storing all the fields of the Kafka message.

The TTL statement allows setting a time to live on the sorted set. If not specified, no TTL is set.
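To make the ordering concrete, here is a small in-memory model of a sorted set. It is a sketch under assumptions: SortedSetModel is a hypothetical stand-in for Redis, the cpuTopic messages are invented, and storing each member as a JSON string is an assumption about the default format. TTL handling is left out.

```python
import json


class SortedSetModel:
    """Tiny in-memory model of a Redis sorted set (unique members, each with a score)."""

    def __init__(self):
        self._scores = {}

    def zadd(self, score, member):
        # Adding an existing member again only updates its score.
        self._scores[member] = score

    def zrange(self):
        # Ascending by score; ties are broken by member, as Redis does.
        ordered = sorted(self._scores.items(), key=lambda item: (item[1], item[0]))
        return [member for member, _ in ordered]


# Hypothetical messages from cpuTopic, arriving out of order.
messages = [
    {"host": "a", "usage": 0.71, "timestamp": 1700000030},
    {"host": "a", "usage": 0.42, "timestamp": 1700000010},
    {"host": "a", "usage": 0.55, "timestamp": 1700000020},
]

cpu_stats = SortedSetModel()
for message in messages:
    # SELECT * keeps every field; the score is the timestamp field.
    cpu_stats.zadd(message["timestamp"], json.dumps(message))

timestamps = [json.loads(member)["timestamp"] for member in cpu_stats.zrange()]
print(timestamps)
```

Whatever order the messages arrive in, reading the set back by score returns them in timestamp order.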

Multiple Sorted Sets 

The connector can create multiple sorted sets by promoting each value of one field from the Kafka message into its own Sorted Set and selecting which fields to store in the sorted sets. Use the PK (primary key) clause in KCQL to define that field:

SELECT temperature, humidity FROM sensorsTopic PK sensorID STOREAS SortedSet(score=timestamp)

The connector can also prefix the name of the Key using the INSERT statement for Multiple SortedSets:

INSERT INTO FX- SELECT price from yahoo-fx PK symbol STOREAS SortedSet(score=timestamp) TTL=60

This will create keys with names such as FX-USDGBP, FX-EURGBP, etc.

The TTL statement allows setting a time to live on the sorted set. If not specified, no TTL is set.
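The routing for multiple sorted sets can be pictured as grouping records by the PK field and prepending the prefix from the INSERT statement. The sketch below assumes this reading; route_to_sorted_sets is a hypothetical helper, the records (including their timestamp field) are invented, and nested dicts stand in for Redis.

```python
import json
from collections import defaultdict


def route_to_sorted_sets(records, pk_field, selected_fields, score_field, prefix=""):
    """Group records into one sorted set per primary-key value.

    Returns {set_name: {member: score}}.
    """
    sets = defaultdict(dict)
    for record in records:
        set_name = f"{prefix}{record[pk_field]}"
        member = json.dumps({name: record[name] for name in selected_fields})
        sets[set_name][member] = record[score_field]
    return dict(sets)


# Hypothetical messages; the timestamp field is assumed to be present.
records = [
    {"symbol": "USDGBP", "price": 0.7943, "timestamp": 1},
    {"symbol": "EURGBP", "price": 0.8597, "timestamp": 2},
    {"symbol": "USDGBP", "price": 0.7950, "timestamp": 3},
]

sets = route_to_sorted_sets(records, "symbol", ["price"], "timestamp", prefix="FX-")
for name in sorted(sets):
    print(name, sets[name])
```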

Geospatial add 

To insert messages from a Kafka topic with GEOADD use the following KCQL syntax:

INSERT INTO cpu_stats SELECT * from cpuTopic STOREAS GEOADD

Streams 

To insert messages from a Kafka topic to a Redis Stream use the following KCQL syntax:

INSERT INTO redis_stream_name SELECT * FROM my-kafka-topic STOREAS STREAM

PubSub 

To publish messages from a Kafka topic to Redis PubSub, use the following KCQL syntax:

SELECT * FROM topic STOREAS PubSub (channel=myfield)

The Redis channel to write to is determined by a field in the payload of the Kafka message, set in the KCQL statement. In this case it is a field called myfield.
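In other words, each record picks its own channel. A minimal sketch of that selection follows. The helper to_publish_call and the sample records are hypothetical, and publishing the whole record as JSON is an assumption based on SELECT * and the default JSON format.

```python
import json


def to_publish_call(record, channel_field):
    """Return the (channel, message) pair for one Kafka record."""
    channel = str(record[channel_field])
    message = json.dumps(record)
    return channel, message


# Hypothetical records; "myfield" matches the KCQL example.
records = [
    {"myfield": "alerts", "level": "high"},
    {"myfield": "metrics", "cpu": 0.42},
]

for record in records:
    channel, message = to_publish_call(record, "myfield")
    print(f"PUBLISH {channel} {message}")
```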

Kafka payload support 

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)
  • Schema.Struct and JSON
  • No Schema and JSON

See connect payloads for more information.

Error policies 

The connector supports Error policies.
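The NOOP, THROW and RETRY policies listed under connect.redis.error.policy in the Options section can be modelled roughly as below. This is a simplified sketch, not how the connector is implemented: with RETRY the real connector hands the retry back to the Connect framework, whereas here a hypothetical write_with_policy function loops locally. Its defaults mirror connect.redis.max.retries and connect.redis.retry.interval.

```python
import time


def write_with_policy(write, record, policy="THROW", max_retries=20,
                      retry_interval_ms=60000, sleep=time.sleep):
    """Apply a NOOP / THROW / RETRY policy around a write function.

    Returns True if the record was written, False if the error was swallowed.
    """
    retries = 0
    while True:
        try:
            write(record)
            return True
        except Exception:
            if policy == "NOOP":
                return False  # swallow the error and move on
            if policy == "THROW":
                raise  # let the error propagate
            if policy != "RETRY":
                raise ValueError(f"unknown policy: {policy}")
            if retries >= max_retries:
                raise  # retries exhausted, give up
            retries += 1
            sleep(retry_interval_ms / 1000)


calls = {"count": 0}


def flaky_write(record):
    # Hypothetical writer that fails twice before succeeding.
    calls["count"] += 1
    if calls["count"] <= 2:
        raise ConnectionError("redis unavailable")


ok = write_with_policy(flaky_write, {"symbol": "USDGBP"}, policy="RETRY",
                       max_retries=3, sleep=lambda seconds: None)
print(ok, calls["count"])
```

The sleep parameter is injected only so the example runs instantly; a real wait would use the configured interval.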

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=redis
docker-compose up -d redis

Start the connector 

If you are using Lenses, log in to Lenses, navigate to the connectors page, select Redis as the sink and paste the following:

name=redis
connector.class=io.lenses.streamreactor.connect.redis.sink.RedisSinkConnector
tasks.max=1
topics=redis
connect.redis.host=redis
connect.redis.port=6379
connect.redis.kcql=INSERT INTO lenses SELECT * FROM redis STOREAS STREAM
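If you want to submit the same configuration through the Kafka Connect REST API instead, the properties need to be wrapped in the JSON body that POST /connectors expects (a name plus a config map of string values). The sketch below only builds and prints that body. The helper properties_to_payload is hypothetical, and where to send the result (typically the Connect worker on port 8083) depends on your setup.

```python
import json

PROPERTIES = """\
name=redis
connector.class=io.lenses.streamreactor.connect.redis.sink.RedisSinkConnector
tasks.max=1
topics=redis
connect.redis.host=redis
connect.redis.port=6379
connect.redis.kcql=INSERT INTO lenses SELECT * FROM redis STOREAS STREAM
"""


def properties_to_payload(text):
    """Turn key=value lines into the {"name": ..., "config": {...}} body."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")  # split on the first "=" only
        config[key.strip()] = value.strip()
    name = config.pop("name")
    return {"name": name, "config": config}


payload = properties_to_payload(PROPERTIES)
print(json.dumps(payload, indent=2))
```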


To start the connector using the command line, log into the lenses-box container:


docker exec -ti lenses-box /bin/bash

and create a connector.properties file containing the properties above.

Create the connector with the connect-cli:

connect-cli create redis < connector.properties

Wait for the connector to start and check it’s running:

connect-cli status redis

Inserting test data 

In the lenses-box container, start the Kafka producer shell:


kafka-avro-console-producer \
  --broker-list localhost:9092 --topic redis \
  --property value.schema='{"type":"record","name":"User",
  "fields":[{"name":"firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"salary","type":"double"}]}'

The console is now waiting for your input. Enter the following:


{ "firstName": "John", "lastName": "Smith", "age": 30, "salary": 4830 }

Check for data in Redis 

docker exec -ti redis redis-cli xrange lenses - +

Clean up 

Bring down the stack:

docker-compose down

Options 

Name | Description | Type | Default Value
connect.redis.pk.delimiter | Specifies the redis primary key delimiter | string | .
ssl.provider | The name of the security provider used for SSL connections. Default value is the default security provider of the JVM. | string |
ssl.protocol | The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. | string | TLS
ssl.truststore.location | The location of the trust store file. | string |
ssl.keystore.password | The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured. | password |
ssl.keystore.location | The location of the key store file. This is optional for client and can be used for two-way authentication for client. | string |
ssl.truststore.password | The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled. | password |
ssl.keymanager.algorithm | The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine. | string | SunX509
ssl.trustmanager.algorithm | The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine. | string | PKIX
ssl.keystore.type | The file format of the key store file. This is optional for client. | string | JKS
ssl.cipher.suites | A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported. | list |
ssl.endpoint.identification.algorithm | The endpoint identification algorithm to validate server hostname using server certificate. | string | https
ssl.truststore.type | The file format of the trust store file. | string | JKS
ssl.enabled.protocols | The list of protocols enabled for SSL connections. | list | [TLSv1.2, TLSv1.1, TLSv1]
ssl.key.password | The password of the private key in the key store file. This is optional for client. | password |
ssl.secure.random.implementation | The SecureRandom PRNG implementation to use for SSL cryptography operations. | string |
connect.redis.kcql | KCQL expression describing field selection and routes. | string |
connect.redis.host | Specifies the redis server | string |
connect.redis.port | Specifies the redis connection port | int |
connect.redis.password | Provides the password for the redis connection. | password |
connect.redis.ssl.enabled | Enables ssl for the redis connection | boolean | false
connect.redis.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.redis.max.retries. The error will be logged automatically. | string | THROW
connect.redis.retry.interval | The time in milliseconds between retries. | int | 60000
connect.redis.max.retries | The maximum number of times to try the write again. | int | 20
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false
--
Last modified: September 13, 2024