This page describes the usage of the Stream Reactor GCP Storage Source Connector.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by `;` to have the connector map multiple GCP Storage locations to multiple Kafka topics.
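For example, two statements can route two GCS prefixes to two topics. This is a sketch based on the INSERT INTO, FROM, and STOREAS clauses described below; the bucket, prefix, and topic names are illustrative:

```sql
INSERT INTO topic-orders SELECT * FROM my-bucket:orders STOREAS `JSON`;
INSERT INTO topic-invoices SELECT * FROM my-bucket:invoices STOREAS `AVRO`
```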
The connector uses a SQL-like syntax (KCQL) to configure its behaviour. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
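For instance, backticks can be used as the escape characters (a sketch; the topic name is hypothetical):

```sql
INSERT INTO `my-topic-with-hyphen` SELECT * FROM bucketAddress:pathPrefix
```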
The GCP Storage source location is defined within the FROM clause. The connector will read all files from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.
The FROM clause format is:
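A minimal sketch of the expected shape (`bucketAddress` and `pathPrefix` are placeholders, and the path prefix is optional; confirm the exact grammar against your connector version):

```sql
FROM bucketAddress[:pathPrefix]
```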
If your data in GCS was not written by the Lenses GCS sink, set the connector to traverse a folder hierarchy in a bucket and load files based on their last-modified timestamp:

```properties
connect.gcpstorage.source.partition.extractor.regex=none
connect.gcpstorage.source.ordering.type=LastModified
```

To load files in alphanumeric order instead, set the ordering type to `AlphaNumeric`.
The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:
The connector supports a range of storage formats, each with its own distinct functionality:
JSON: The connector will read files containing JSON content, each line representing a distinct record.
Avro: The connector will read Avro-stored messages from GCP Storage and translate them into Kafka’s native format.
Parquet: The connector will read Parquet-stored messages from GCP Storage and translate them into Kafka’s native format.
Text: The connector will read files containing lines of text, each line representing a distinct record.
CSV: The connector will read files containing comma-separated values, each line representing a distinct record.
CSV_WithHeaders: The connector will read files containing comma-separated values, skipping the header row; each remaining line represents a distinct record.
Bytes: The connector will read files containing raw bytes, with each file translated to a single Kafka message.
Use the STOREAS clause to configure the storage format. The following options are available: JSON, Avro, Parquet, Text, CSV, CSV_WithHeaders, and Bytes.
When using Text storage, the connector provides additional configuration options to finely control how text content is processed.
In Regex mode, the connector applies a regular expression pattern, and only when a line matches the pattern is it considered a record. For example, to include only lines that start with a number, you can use the following configuration:
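One possible configuration, keeping only lines that start with a digit (a sketch using the `read.text.*` properties listed later on this page; bucket and topic names are hypothetical):

```sql
INSERT INTO my-topic SELECT * FROM my-bucket:prefix STOREAS `Text`
PROPERTIES('read.text.mode'='Regex', 'read.text.regex'='^[0-9].*')
```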
In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is ‘SSM’ and the last line is an empty line (’’), you can configure it as follows:
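A sketch of this configuration, again using the `read.text.*` properties listed later (bucket and topic names are hypothetical):

```sql
INSERT INTO my-topic SELECT * FROM my-bucket:prefix STOREAS `Text`
PROPERTIES('read.text.mode'='StartEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='')
```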
To trim the start and end lines, set the read.text.trim property to true:
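Extending the previous sketch with trimming enabled:

```sql
PROPERTIES('read.text.mode'='StartEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='', 'read.text.trim'='true')
```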
In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single line of text in GCP Storage corresponds to multiple output Kafka messages. For example, to read XML records enclosed between a start tag and an end tag, configure it as follows:
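A sketch of this configuration (the `<SSM>`/`</SSM>` tag names are hypothetical; substitute the tags that delimit your records):

```sql
INSERT INTO my-topic SELECT * FROM my-bucket:prefix STOREAS `Text`
PROPERTIES('read.text.mode'='StartEndTag', 'read.text.start.tag'='<SSM>', 'read.text.end.tag'='</SSM>')
```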
Depending on the storage format of the Kafka topics' messages, whether replication to a different cluster is required, and the specific data analysis requirements, the following guidelines describe how to effectively use converters for both sink and source operations. They aim to optimize performance and minimize unnecessary CPU and memory usage.
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.
By default, the connector relies on the lexicographic order of GCS file names to ensure precise ordering, leveraging optimizations offered by the GCS API and guaranteeing the accurate sequence of files.
When using the GCS source alongside the GCS sink, the connector can adopt the same ordering method, ensuring data processing follows the correct chronological order. However, there are scenarios where GCS data is generated by applications that do not maintain lexical file name order.
In such cases, to process files in the correct sequence, the source needs to list all files in the bucket and sort them based on their last-modified timestamp. To enable this behaviour, set connect.gcpstorage.source.ordering.type to LastModified. This ensures that the source correctly arranges and processes the data based on the timestamps of the files.
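In connector configuration this is:

```properties
connect.gcpstorage.source.ordering.type=LastModified
```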
The number of file names the source reads from GCS in a single poll can also be limited; the default value, if not specified, is 1000.
To limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause. The default value, if not specified, is 10000.
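For example, to cap each poll at 5000 records (a sketch; bucket, prefix, and topic names are hypothetical):

```sql
INSERT INTO my-topic SELECT * FROM my-bucket:prefix LIMIT 5000
```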
The GCP Storage Source Connector allows you to filter the files to be processed based on their extensions. This is controlled by two properties: connect.gcpstorage.source.extension.excludes and connect.gcpstorage.source.extension.includes.
The connect.gcpstorage.source.extension.excludes property is a comma-separated list of file extensions to exclude from the source file search. If this property is not configured, all files are considered. For example, to exclude .txt and .csv files, you would set this property as follows:
The connect.gcpstorage.source.extension.includes property is a comma-separated list of file extensions to include in the source file search. If this property is not configured, all files are considered. For example, to include only .json and .xml files, you would set this property as follows:
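A sketch of this setting (the same caveat about the extension list format applies):

```properties
connect.gcpstorage.source.extension.includes=json,xml
```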
Note: If both connect.gcpstorage.source.extension.excludes and connect.gcpstorage.source.extension.includes are set, the connector first applies the exclusion filter and then the inclusion filter.
The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ','). The following properties are supported:
The connector offers three distinct authentication modes:
Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.
File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.
Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.
The simplest example to configure in the connector is the “Default” mode, as this requires no other configuration.
When selecting the “Credentials” mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.
Here’s an example configuration for the “Credentials” mode:
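A sketch using the documented property names (the placeholder values must be replaced with your own):

```properties
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=<your-gcp-credentials-json-string>
connect.gcpstorage.gcp.project.id=<your-project-id>
```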
And here is an example configuration using the “File” mode:
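A sketch using the documented property names (the file path and project ID are placeholders):

```properties
connect.gcpstorage.gcp.auth.mode=File
connect.gcpstorage.gcp.file=/path/to/gcp-credentials.json
connect.gcpstorage.gcp.project.id=<your-project-id>
```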
Remember that when using File mode, the file must exist on every worker node of your Kafka Connect cluster and be readable by the Kafka Connect process.
For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. This approach ensures robust security practices while handling access credentials.
When used in tandem with the GCP Storage Sink Connector, the GCP Storage Source Connector becomes a powerful tool for restoring Kafka topics from GCP Storage. To enable this behavior, you should set store.envelope to true. This configuration ensures that the source expects the following data structure in GCP Storage:
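A sketch of the envelope shape, based on the fields named below (the key, value, headers, timestamp, and partition fields are documented; the exact field names for topic and offset, and the placeholder values, are assumptions):

```json
{
  "key": "<record key>",
  "value": "<record value>",
  "headers": {
    "header1": "value1"
  },
  "metadata": {
    "timestamp": 1693000000000,
    "topic": "my-topic",
    "partition": 0,
    "offset": 123
  }
}
```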
When the messages are sent to Kafka, the GCP Storage Source Connector ensures that it correctly maps the key, value, headers, and metadata fields (including timestamp and partition) to their corresponding Kafka message fields. Please note that the envelope functionality can only be used with data stored in GCP Storage as Avro, JSON, or Parquet formats.
When the envelope feature is not in use, and data restoration is required, the responsibility falls on the connector to establish the original topic partition value. To ensure that the source correctly conveys the original partitions back to Kafka Connect during reads from the source, a partition extractor can be configured to extract this information from the GCP Storage object key.
To configure the partition extractor, use the connect.gcpstorage.source.partition.extractor.type property, which supports two options:
hierarchical: This option aligns with the default format used by the sink, topic/partition/offset.json.
regex: When selected, you can provide a custom regular expression to extract the partition information. Additionally, when using the regex option, you must also set the connect.gcpstorage.source.partition.extractor.regex property. It's important to note that only one lookup group is expected. For an example of a regular expression pattern, please refer to the pattern used for hierarchical, which is:
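As an illustration only (this exact pattern is an assumption, not taken from the connector source), a regex matching the hierarchical topic/partition/offset.extension layout, with a single capture group for the partition, could look like:

```properties
connect.gcpstorage.source.partition.extractor.regex=(?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:\w*)$
```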
| GCP Storage Format | Kafka Output Format | Restore or replicate cluster | Analytics | Sink Converter | Source Converter |
|---|---|---|---|---|---|
| JSON | STRING | Same, Other | Yes, No | StringConverter | StringConverter |
| AVRO, Parquet | STRING | Same, Other | Yes | StringConverter | StringConverter |
| AVRO, Parquet | STRING | Same, Other | No | ByteArrayConverter | ByteArrayConverter |
| JSON | JSON | Same, Other | Yes | JsonConverter | StringConverter |
| JSON | JSON | Same, Other | No | StringConverter | StringConverter |
| AVRO, Parquet | JSON | Same, Other | Yes, No | JsonConverter | JsonConverter or Avro Converter (Glue, Confluent) |
| AVRO, Parquet, JSON | BYTES | Same, Other | Yes, No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | AVRO | Same | Yes | Avro Converter (Glue, Confluent) | Avro Converter (Glue, Confluent) |
| AVRO, Parquet | AVRO | Same | No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | AVRO | Other | Yes, No | Avro Converter (Glue, Confluent) | Avro Converter (Glue, Confluent) |
| AVRO, Parquet | Protobuf | Same | Yes | Protobuf Converter (Glue, Confluent) | Protobuf Converter (Glue, Confluent) |
| AVRO, Parquet | Protobuf | Same | No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | Protobuf | Other | Yes, No | Protobuf Converter (Glue, Confluent) | Protobuf Converter (Glue, Confluent) |
| AVRO, Parquet, JSON | Other | Same, Other | Yes, No | ByteArrayConverter | ByteArrayConverter |
| Name | Description | Type | Available Values |
|---|---|---|---|
| read.text.mode | Controls how Text content is read | Enum | Regex, StartEndTag, StartEndLine |
| read.text.regex | Regular expression for Text reading (if applicable) | String | |
| read.text.start.tag | Start tag for Text reading (if applicable) | String | |
| read.text.end.tag | End tag for Text reading (if applicable) | String | |
| read.text.buffer.size | Text buffer size (for optimization) | Int | |
| read.text.start.line | Start line for Text reading (if applicable) | String | |
| read.text.end.line | End line for Text reading (if applicable) | String | |
| read.text.trim | Trim text during reading | Boolean | |
| store.envelope | Messages are stored as "Envelope" | Boolean | |
| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| connect.gcpstorage.gcp.auth.mode | Specifies the authentication mode for connecting to GCP. | string | "Credentials", "File" or "Default" | "Default" |
| connect.gcpstorage.gcp.credentials | For "auth.mode" Credentials: GCP authentication credentials string. | string | | (Empty) |
| connect.gcpstorage.gcp.file | For "auth.mode" File: local file path for the file containing GCP authentication credentials. | string | | (Empty) |
| connect.gcpstorage.gcp.project.id | GCP Project ID. | string | | (Empty) |
| connect.gcpstorage.gcp.quota.project.id | GCP Quota Project ID. | string | | (Empty) |
| connect.gcpstorage.endpoint | Endpoint for GCP Storage. | string | | |
| connect.gcpstorage.error.policy | Defines the error handling policy when errors occur during data transfer to or from GCP Storage. | string | "NOOP", "THROW", "RETRY" | "THROW" |
| connect.gcpstorage.max.retries | Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework. | int | | 20 |
| connect.gcpstorage.retry.interval | Specifies the interval (in milliseconds) between retry attempts by the connector. | int | | 60000 |
| connect.gcpstorage.http.max.retries | Sets the maximum number of retries for the underlying HTTP client when interacting with GCP Storage. | long | | 5 |
| connect.gcpstorage.http.retry.interval | Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed. | long | | 50 |
| connect.gcpstorage.kcql | Kafka Connect Query Language (KCQL) configuration to control the connector behaviour. | string | [kcql configuration]({{< relref "#kcql-support" >}}) | |
| connect.gcpstorage.source.extension.excludes | A comma-separated list of file extensions to exclude from the source file search. | string | [file extension filtering]({{< relref "#file-extension-filtering" >}}) | |
| connect.gcpstorage.source.extension.includes | A comma-separated list of file extensions to include in the source file search. | string | [file extension filtering]({{< relref "#file-extension-filtering" >}}) | |
| connect.gcpstorage.source.partition.extractor.type | Type of partition extractor (hierarchical or regex). | string | hierarchical, regex | |
| connect.gcpstorage.source.partition.extractor.regex | Regex pattern for partition extraction (if applicable). | string | | |
| connect.gcpstorage.source.partition.search.continuous | If set to true, the connector will continuously search for new partitions. | boolean | true, false | true |
| connect.gcpstorage.source.partition.search.interval | The interval in milliseconds between searches for new partitions. | long | | 300000 |
| connect.gcpstorage.source.partition.search.excludes | A comma-separated list of paths to exclude from the partition search. | string | | ".indexes" |
| connect.gcpstorage.source.partition.search.recurse.levels | Controls how many levels deep to recurse when searching for new partitions. | int | | 0 |
| connect.gcpstorage.source.ordering.type | Type of ordering for the GCS file names to ensure the processing order. | string | AlphaNumeric, LastModified | AlphaNumeric |