This page provides an overview of the Lenses SMTs (Single Message Transforms) for Kafka Connect.
Lenses provides several SMTs designed for use with Stream Reactor connectors; you can also use them with other connectors or with your own.
These SMTs are designed for the Kafka Connect framework and write their output to record headers. Because the key and value are left untouched, using headers avoids the memory and CPU cost of rewriting the payload, a cost that payload-modifying transforms such as the Kafka Connect TimestampConverter incur. The headers can also be used by the Stream Reactor S3 sink partitioners, for scenarios such as:
Partitioning by the system clock (e.g. using the system clock as a partition key with a yyyy-MM-dd-HH format)
Partitioning by a rolling window (e.g. every 15 minutes, or one hour)
Partitioning by a custom timestamp (e.g. a timestamp field in the payload, such as in the record key or value)
Partitioning by a custom timestamp with a rolling window (e.g. a timestamp field in the payload, every 15 minutes, or one hour)
Add the plugin to a directory listed in the worker's plugin.path option, so that it is loaded with classloader isolation.
For MSK Connect, bundle the SMT with the connector you want to use and deploy them as a single plugin.
Package both JARs at the same level in one zip file and upload that zip as a custom plugin in MSK Connect.
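The packaging step can be scripted. The following Java sketch, which needs only the JDK, writes every given JAR as a top-level entry of one zip, so the connector JAR and the SMT JAR sit at the same level. The class name MskPluginBundler and the file names in the usage line are hypothetical; any zip tool that produces the same flat layout works equally well.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/** Hypothetical helper that packs JARs into one flat zip for an MSK Connect custom plugin. */
final class MskPluginBundler {

    /** Writes every JAR as a top-level entry, so all of them sit at the same level. */
    static void bundle(List<Path> jars, Path zipFile) throws IOException {
        try (ZipOutputStream zip = new ZipOutputStream(Files.newOutputStream(zipFile))) {
            for (Path jar : jars) {
                // Only the file name is used, so no directory prefix ends up in the entry name.
                zip.putNextEntry(new ZipEntry(jar.getFileName().toString()));
                Files.copy(jar, zip);
                zip.closeEntry();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.err.println("usage: MskPluginBundler <plugin.zip> <connector.jar> <smt.jar> ...");
            return;
        }
        List<Path> jars = new ArrayList<>();
        for (int i = 1; i < args.length; i++) {
            jars.add(Paths.get(args[i]));
        }
        bundle(jars, Paths.get(args[0]));
    }
}
```

With Java 11 or later it can be run directly, for example `java MskPluginBundler.java plugin.zip connector.jar smt.jar`.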
This SMT inserts the system clock value as a message header, aligned to the boundary of a specified time window, for example every 15 minutes or every hour.
The inserted value is stored as a STRING and holds either the epoch value or the date and time in the specified format.
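To make the window alignment concrete, here is a minimal Java sketch using java.time. It assumes that the clock is rounded down to the start of its window and that windows restart at each enclosing minute (for seconds), hour (for minutes) or day (for hours); the class RollingWindow and its method are hypothetical and not part of the SMT.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

/** Illustrative sketch: aligns a clock reading to the start of its fixed-size window. */
final class RollingWindow {

    /**
     * Floors {@code now} to a multiple of {@code size} units within the enclosing
     * minute (SECONDS), hour (MINUTES) or day (HOURS), evaluated in {@code zone}.
     */
    static Instant alignToWindow(Instant now, ChronoUnit unit, int size, ZoneId zone) {
        ZonedDateTime t = now.atZone(zone);
        switch (unit) {
            case SECONDS:
                return t.truncatedTo(ChronoUnit.SECONDS)
                        .withSecond(t.getSecond() / size * size).toInstant();
            case MINUTES:
                return t.truncatedTo(ChronoUnit.MINUTES)
                        .withMinute(t.getMinute() / size * size).toInstant();
            case HOURS:
                return t.truncatedTo(ChronoUnit.HOURS)
                        .withHour(t.getHour() / size * size).toInstant();
            default:
                throw new IllegalArgumentException("unsupported window unit: " + unit);
        }
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-03-05T10:37:42.123Z");
        // With a 15-minute window, 10:37:42.123 falls into the window starting at 10:30:00.
        System.out.println(alignToWindow(now, ChronoUnit.MINUTES, 15, ZoneId.of("UTC")));
    }
}
```

Aligning on wall-clock fields rather than on raw epoch milliseconds keeps hour windows anchored to local midnight in timezones with non-whole-hour offsets, such as Asia/Kolkata.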
| Name | Description | Type | Default | Valid values | Importance |
|---|---|---|---|---|---|
| header.name | The name of the header to insert the timestamp into. | String | | | High |
| value.type | Sets the type of the inserted header value. It can be epoch or format. If format is used, the format setting is required. | String | format | epoch, format | High |
| format | Sets the format of the inserted header value when value.type is set to format. It can be any valid Java date format. | String | | | High |
| rolling.window.type | Sets the window unit. It can be hours, minutes or seconds. | String | minutes | hours, minutes, seconds | High |
| rolling.window.size | Sets the window size. It can be any positive integer, with an upper bound that depends on rolling.window.type: 60 for seconds and minutes, and 24 for hours. | Int | 15 | | High |
| timezone | Sets the timezone. It can be any valid Java timezone. Override it only when value.type is set to format; otherwise it raises an exception. | String | UTC | | High |
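The rules in this configuration table can be summarised as a validation routine. The Java sketch below is a hypothetical illustration, not the SMT's own validation code: the class RollingWallclockConfig is invented for this example, the upper bounds for rolling.window.size are assumed to be inclusive, and the timezone restriction is not modelled.

```java
import java.time.ZoneId;
import java.util.Map;
import java.util.Set;

/** Hypothetical validator for the options of the rolling wallclock SMT. */
final class RollingWallclockConfig {
    final String headerName;
    final String windowType;
    final int windowSize;
    final String valueType;
    final String format;
    final ZoneId timezone;

    private RollingWallclockConfig(String headerName, String windowType, int windowSize,
                                   String valueType, String format, ZoneId timezone) {
        this.headerName = headerName;
        this.windowType = windowType;
        this.windowSize = windowSize;
        this.valueType = valueType;
        this.format = format;
        this.timezone = timezone;
    }

    static RollingWallclockConfig parse(Map<String, String> props) {
        String headerName = props.get("header.name");
        if (headerName == null || headerName.isEmpty()) {
            throw new IllegalArgumentException("header.name is required");
        }
        String windowType = props.getOrDefault("rolling.window.type", "minutes");
        if (!Set.of("hours", "minutes", "seconds").contains(windowType)) {
            throw new IllegalArgumentException("rolling.window.type must be hours, minutes or seconds");
        }
        // Integer.parseInt throws NumberFormatException, a subclass of IllegalArgumentException.
        int windowSize = Integer.parseInt(props.getOrDefault("rolling.window.size", "15"));
        // Assumed inclusive upper bounds: 24 for hours, 60 for minutes and seconds.
        int max = windowType.equals("hours") ? 24 : 60;
        if (windowSize <= 0 || windowSize > max) {
            throw new IllegalArgumentException("rolling.window.size must be in 1.." + max);
        }
        String valueType = props.getOrDefault("value.type", "format");
        if (!valueType.equals("epoch") && !valueType.equals("format")) {
            throw new IllegalArgumentException("value.type must be epoch or format");
        }
        String format = props.get("format");
        if (valueType.equals("format") && (format == null || format.isEmpty())) {
            throw new IllegalArgumentException("format is required when value.type is format");
        }
        // ZoneId.of throws a DateTimeException for unknown zone IDs.
        ZoneId timezone = ZoneId.of(props.getOrDefault("timezone", "UTC"));
        return new RollingWallclockConfig(headerName, windowType, windowSize, valueType, format, timezone);
    }

    public static void main(String[] args) {
        RollingWallclockConfig cfg = parse(Map.of(
                "header.name", "wallclock",
                "value.type", "format",
                "format", "yyyy-MM-dd HH:mm:ss.SSS",
                "timezone", "Asia/Kolkata"));
        System.out.println(cfg.windowSize + " " + cfg.windowType + " " + cfg.timezone);
    }
}
```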
To store the epoch value, use the following configuration:
To store a string representation of the date and time in the format yyyy-MM-dd HH:mm:ss.SSS, use the following:
To use the timezone Asia/Kolkata, use the following:
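The three variants above differ only in how the window start is rendered. This Java sketch shows the resulting header strings for one instant; it assumes the epoch value is expressed in milliseconds, and the class WallclockHeaderValue is hypothetical rather than part of the SMT.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Illustrative sketch of the header strings produced by the three configurations. */
final class WallclockHeaderValue {

    /** Returns the header string; epoch is assumed to be milliseconds since 1970-01-01T00:00Z. */
    static String render(Instant windowStart, String valueType, String pattern, ZoneId zone) {
        if ("epoch".equals(valueType)) {
            return Long.toString(windowStart.toEpochMilli());
        }
        // withZone lets the formatter render an Instant in the requested timezone.
        return DateTimeFormatter.ofPattern(pattern).withZone(zone).format(windowStart);
    }

    public static void main(String[] args) {
        Instant windowStart = Instant.parse("1970-01-01T00:15:00Z");
        String pattern = "yyyy-MM-dd HH:mm:ss.SSS";
        System.out.println(render(windowStart, "epoch", null, ZoneId.of("UTC")));       // 900000
        System.out.println(render(windowStart, "format", pattern, ZoneId.of("UTC")));   // 1970-01-01 00:15:00.000
        System.out.println(render(windowStart, "format", pattern,
                ZoneId.of("Asia/Kolkata")));                                             // 1970-01-01 05:45:00.000
    }
}
```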
This SMT inserts date, year, month, day, hour, minute and second headers derived from the system timestamp and a rolling time window configuration.
The inserted headers are of type STRING. With this single SMT you can, for example, partition the data by yyyy-MM-dd/HH or yyyy/MM/dd/HH.
The inserted headers are:
date
year
month
day
hour
minute
second
All headers can be prefixed with a custom prefix. For example, if the prefix is wallclock_, then the headers will be:
wallclock_date
wallclock_year
wallclock_month
wallclock_day
wallclock_hour
wallclock_minute
wallclock_second
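As an illustration of how the seven headers relate to one window start, the following Java sketch formats each component with the default patterns listed in this SMT's configuration table and prepends a prefix. It is a hypothetical sketch (the class WallclockHeaders is invented here), and it uses Locale.ENGLISH to stand in for the default locale en.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Illustrative sketch: derives the seven prefixed headers from one window start. */
final class WallclockHeaders {

    // Header names paired with their documented default patterns.
    private static final String[][] DEFAULTS = {
        {"date", "yyyy-MM-dd"}, {"year", "yyyy"}, {"month", "MM"}, {"day", "dd"},
        {"hour", "HH"}, {"minute", "mm"}, {"second", "ss"}
    };

    static Map<String, String> headers(Instant windowStart, String prefix, ZoneId zone) {
        ZonedDateTime t = windowStart.atZone(zone);
        Map<String, String> out = new LinkedHashMap<>();
        for (String[] entry : DEFAULTS) {
            DateTimeFormatter f = DateTimeFormatter.ofPattern(entry[1], Locale.ENGLISH);
            out.put(prefix + entry[0], f.format(t));
        }
        return out;
    }

    public static void main(String[] args) {
        Instant windowStart = Instant.parse("2024-03-05T10:30:00Z");
        // Prints one name=value line per header, e.g. wallclock_date=2024-03-05.
        headers(windowStart, "wallclock_", ZoneId.of("UTC"))
                .forEach((name, value) -> System.out.println(name + "=" + value));
    }
}
```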
When used with the Lenses connectors for S3, GCS or Azure Data Lake, the headers can be used to partition the data. Assuming the headers have been prefixed with _, here are a few KCQL examples:
| Name | Description | Type | Default | Valid values | Importance |
|---|---|---|---|---|---|
| header.prefix.name | Optional header prefix. | String | | | Low |
| date.format | Optional Java date-time formatter pattern for the date header. | String | yyyy-MM-dd | | Low |
| year.format | Optional Java date-time formatter pattern for the year component. | String | yyyy | | Low |
| month.format | Optional Java date-time formatter pattern for the month component. | String | MM | | Low |
| day.format | Optional Java date-time formatter pattern for the day component. | String | dd | | Low |
| hour.format | Optional Java date-time formatter pattern for the hour component. | String | HH | | Low |
| minute.format | Optional Java date-time formatter pattern for the minute component. | String | mm | | Low |
| second.format | Optional Java date-time formatter pattern for the second component. | String | ss | | Low |
| timezone | Optional. Sets the timezone. It can be any valid Java timezone. | String | UTC | | Low |
| locale | Optional. Sets the locale. It can be any valid Java locale. | String | en | | Low |
| rolling.window.type | Sets the window unit. It can be hours, minutes or seconds. | String | minutes | hours, minutes, seconds | |
| rolling.window.size | Sets the window size. It can be any positive integer, with an upper bound that depends on rolling.window.type: 60 for seconds and minutes, and 24 for hours. | Int | 15 | | |
To store the epoch value, use the following configuration:
To prefix the headers with wallclock_, use the following:
To change the date format, use the following:
To use the timezone Asia/Kolkata, use the following:
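Overriding date.format and timezone changes only how the window start is rendered. The Java sketch below resolves the date header from a map of SMT properties, falling back to the documented defaults; the class DateHeaderOverride is hypothetical and only mirrors the option names, not the SMT's internal code.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch: resolves the date header from SMT-style properties with defaults. */
final class DateHeaderOverride {

    static Map.Entry<String, String> dateHeader(Instant windowStart, Map<String, String> props) {
        String prefix = props.getOrDefault("header.prefix.name", "");
        String pattern = props.getOrDefault("date.format", "yyyy-MM-dd");
        ZoneId zone = ZoneId.of(props.getOrDefault("timezone", "UTC"));
        Locale locale = Locale.forLanguageTag(props.getOrDefault("locale", "en"));
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern, locale).withZone(zone);
        return Map.entry(prefix + "date", formatter.format(windowStart));
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of(
                "header.prefix.name", "wallclock_",
                "date.format", "yyyy/MM/dd",
                "timezone", "Asia/Kolkata");
        // 20:00 UTC on 5 March is 01:30 on 6 March in Asia/Kolkata (UTC+05:30).
        System.out.println(dateHeader(Instant.parse("2024-03-05T20:00:00Z"), props));
    }
}
```

Note that the timezone can move the date header to the next (or previous) day relative to UTC, which in turn changes the partition a record lands in.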
To partition S3, GCS or Azure Data Lake output using a Hive-like partition name format, such as date=yyyy-MM-dd/hour=HH, use the following SMT configuration:
Then, in the KCQL setting, use the headers as partitioning keys:
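The Java sketch below only illustrates the target layout: it builds a Hive-style path such as date=2024-03-05/hour=10 from headers prefixed with _. The class HivePartitionPath is hypothetical, the partition names are assumed to be the header names without the prefix, and the sink connector builds its own partition paths, so this is not its implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/** Illustrative sketch of a Hive-style partition path built from prefixed headers. */
final class HivePartitionPath {

    /** Joins name=value segments with '/', in the order given by {@code keys}. */
    static String path(Map<String, String> headers, List<String> keys, String prefix) {
        StringJoiner joiner = new StringJoiner("/");
        for (String key : keys) {
            String value = headers.get(prefix + key);
            if (value == null) {
                throw new IllegalArgumentException("missing header: " + prefix + key);
            }
            joiner.add(key + "=" + value);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = Map.of("_date", "2024-03-05", "_hour", "10");
        System.out.println(path(headers, List.of("date", "hour"), "_"));
    }
}
```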