Google BigQuery
The Google BigQuery sink connector is an open-source connector imported from Confluent (originally developed by WePay) that enables you to export data from Apache Kafka® topics to Google BigQuery tables.
Overview
The BigQuery sink connector allows you to:
Stream data from Kafka topics to BigQuery tables
Automatically create tables based on topic data
Configure data delivery semantics (at-least-once or exactly-once)
Perform schema evolution when topic schemas change
Prerequisites
Before using the BigQuery sink connector, ensure you have:
A Google Cloud Platform (GCP) account
A BigQuery project with appropriate permissions
Service account credentials with access to BigQuery
Kafka topics with data to be exported
Configuration
Basic Configuration
Here's a basic configuration for the BigQuery sink connector:
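The exact values depend on your environment. The sketch below is illustrative only: the connector name, project, dataset, topic, and key path are placeholders, and it assumes the connector class com.wepay.kafka.connect.bigquery.BigQuerySinkConnector (consistent with the default schemaRetriever package listed later on this page).

```json
{
  "name": "bigquery-sink",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "project": "my-gcp-project",
    "defaultDataset": "my_dataset",
    "keySource": "FILE",
    "keyfile": "/path/to/service-account-key.json"
  }
}
```

A payload like this can be posted to the Kafka Connect REST API to create the connector.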
Features of the Google BigQuery Sink Connector
Multiple tasks support: Configure the tasks.max parameter for performance optimization when consuming from many topic partitions (see the snippet after this list)
InsertAll API features: Supports insert operations with built-in duplicate detection capabilities
Real-time streaming: Records are inserted one at a time and available immediately for querying
Multi-topic support: Can stream from multiple topics to corresponding BigQuery tables
Parallel processing: Uses an internal thread pool (default: 10 threads, configurable) for scalable record streaming
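To illustrate the scaling knobs mentioned above, the fragment below raises the task count and the per-task write thread pool and bounds the request queue. The values are arbitrary examples, not recommendations; tasks.max is ultimately capped by the number of topic partitions.

```json
{
  "tasks.max": "4",
  "threadPoolSize": "20",
  "queueSize": "10000"
}
```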
Important Configuration Properties
| Property | Description | Type | Default | Importance |
|---|---|---|---|---|
| defaultDataset | The default dataset to be used. Replaces the datasets parameter of older versions of this connector. | string | - | high |
| project | The BigQuery project to write to. | string | - | high |
| topics | A list of Kafka topics to read from. | list | - | high |
| autoCreateTables | Create BigQuery tables if they don't already exist. This property should only be enabled for Schema Registry-based inputs: Avro, Protobuf, or JSON Schema (JSON_SR). Table creation is not supported for JSON input. | boolean | false | high |
| gcsBucketName | The name of the bucket where Google Cloud Storage (GCS) blobs are located. These blobs are used to batch-load to BigQuery. Applicable only if enableBatchLoad is configured. | string | "" | high |
| queueSize | The maximum size (or -1 for no maximum) of the worker queue for BigQuery write requests before all topics are paused. This is a soft limit; the queue can grow beyond it before topics are paused. All topics resume once a flush is triggered or the queue size drops below half of the maximum. | long | -1 | high |
| bigQueryMessageTimePartitioning | Whether or not to use the message time when inserting records. By default, the connector processing time is used. | boolean | false | high |
| bigQueryPartitionDecorator | Whether or not to append a partition decorator to the BigQuery table name when inserting records. Setting this to true appends a partition decorator to the table name (for example, table$yyyyMMdd, depending on the configuration). Setting this to false bypasses that logic and uses the raw table name for inserts. | boolean | true | high |
| keySource | Determines whether the keyfile configuration is the path to the credentials JSON file or the JSON itself. Available values are FILE and JSON. Available in BigQuery sink connector version 1.3 and later. | string | FILE | medium |
| keyfile | Either a string representation of the Google credentials file or the path to the Google credentials file itself. The string representation is supported in BigQuery sink connector version 1.3 and later. | string | null | medium |
| bigQueryRetry | The number of retry attempts made for a BigQuery request that fails with a backend error or a quota exceeded error. | int | 0 | medium |
| bigQueryRetryWait | The minimum amount of time, in milliseconds, to wait between retry attempts for a BigQuery backend or quota exceeded error. | long | 1000 | medium |
| sanitizeTopics | Whether to automatically sanitize topic names before using them as table names. If not enabled, topic names are used as table names. | boolean | false | medium |
| schemaRetriever | A class that can be used for automatically creating tables and/or updating schemas. Note that in version 2.0.0, the SchemaRetriever API changed to retrieve the schema from each SinkRecord, which helps support multiple schemas per topic. | class | com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever | medium |
| threadPoolSize | The size of the BigQuery write thread pool. This establishes the maximum number of concurrent writes to BigQuery. | int | 10 | medium |
| autoCreateBucket | Whether to automatically create the given bucket if it does not exist. | boolean | true | medium |
| allowNewBigQueryFields | If true, new fields can be added to BigQuery tables during subsequent schema updates. | boolean | false | medium |
| allowBigQueryRequiredFieldRelaxation | If true, fields in the BigQuery schema can be changed from REQUIRED to NULLABLE. Note that allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation replace the autoUpdateSchemas parameter of older versions of this connector. | boolean | false | medium |
| allowSchemaUnionization | If true, the existing table schema (if one is present) is unionized with new record schemas during schema updates. If false, the record of the last schema in a batch is used for any necessary table creation and schema update attempts. Setting allowSchemaUnionization to false and allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation to true is equivalent to setting autoUpdateSchemas to true in older versions. | boolean | false | medium |
| auto.register.schemas | Specifies whether the serializer should attempt to register the schema with Schema Registry. | boolean | true | medium |
| use.latest.version | Only applies when auto.register.schemas is set to false. If use.latest.version is set to true, Schema Registry uses the latest version of the schema in the subject for serialization. | boolean | true | medium |
| timestampPartitionFieldName | The name of the field in the value that contains the timestamp to partition by in BigQuery, which enables timestamp partitioning for each table. Leave blank to enable ingestion-time partitioning for each table. | string | null | low |
| clusteringPartitionFieldNames | Comma-separated list of fields on which data is clustered in BigQuery. | list | null | low |
| timePartitioningType | The time partitioning type to use when creating tables. Existing tables are not altered to use this partitioning type. | string | DAY | low |
| allBQFieldsNullable | If true, no fields in any produced BigQuery schema are REQUIRED. All non-nullable Avro fields are translated as NULLABLE (or REPEATED, if arrays). | boolean | false | low |
| avroDataCacheSize | The size of the cache to use when converting schemas from Avro to Kafka Connect. | int | 100 | low |
| batchLoadIntervalSec | The interval, in seconds, at which to attempt to run GCS-to-BigQuery load jobs. Only relevant if enableBatchLoad is configured. | int | 120 | low |
| convertDoubleSpecialValues | Whether +Infinity is converted to Double.MAX_VALUE and -Infinity and NaN are converted to Double.MIN_VALUE to ensure successful delivery to BigQuery. | boolean | false | low |
| enableBatchLoad | Beta feature; use with caution. The sublist of topics to be batch loaded through GCS. | list | "" | low |
| includeKafkaData | Whether to include an extra block containing the Kafka source topic, offset, and partition information in the resulting BigQuery rows. | boolean | false | low |
| upsertEnabled | Enable upsert functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. Row matching is performed based on the contents of record keys. This feature does not work with SMTs that change the name of the topic and does not support JSON input. | boolean | false | low |
| deleteEnabled | Enable delete functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. A delete is performed when a record with a null value (that is, a tombstone record) is read. This feature does not work with SMTs that change the name of the topic and does not support JSON input. | boolean | false | low |
| intermediateTableSuffix | A suffix appended to the names of destination tables to create the names of the corresponding intermediate tables. Multiple intermediate tables may be created for a single destination table. | string | "tmp" | low |
| mergeIntervalMs | How often (in milliseconds) to perform a merge flush, if upsert/delete is enabled. Can be set to -1 to disable periodic flushing. | long | 60000 | low |
| mergeRecordsThreshold | How many records to write to an intermediate table before performing a merge flush, if upsert/delete is enabled. Can be set to -1 to disable record count-based flushing. | long | -1 | low |
| kafkaDataFieldName | The Kafka data field name. The default value is null, which means the Kafka data field is not included. | string | null | low |
| kafkaKeyFieldName | The Kafka key field name. The default value is null, which means the Kafka key field is not included. | string | null | low |
| topic2TableMap | Map of topics to tables (optional). Format: comma-separated tuples, for example <topic-1>:<table-1>,<topic-2>:<table-2>,... The topic name should not be modified using a regex SMT while using this option, and sanitizeTopics is ignored if this config is set. See the example after this table. | string | "" | low |
| csfle.enabled | If set to true, client-side field level encryption (CSFLE) is enabled for the connector. | boolean | false | low |
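To show how a few of these properties fit together, the fragment below passes the service-account credentials inline (keySource set to JSON) and maps two hypothetical topics to explicitly named tables. All values are placeholders; note that sanitizeTopics is ignored once topic2TableMap is set.

```json
{
  "keySource": "JSON",
  "keyfile": "{\"type\": \"service_account\", \"project_id\": \"my-gcp-project\", \"private_key\": \"...\"}",
  "topic2TableMap": "orders:orders_table,customers:customers_table"
}
```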
Data Mapping
Data Type Conversions
The connector maps Kafka Connect schema types to BigQuery data types as follows:
| Kafka Connect Schema Type | BigQuery Data Type |
|---|---|
| String | STRING |
| INT8 | INTEGER |
| INT16 | INTEGER |
| INT32 | INTEGER |
| INT64 | INTEGER |
| FLOAT32 | FLOAT |
| FLOAT64 | FLOAT |
| Boolean | BOOLEAN |
| Bytes | BYTES |
| Logical TIMESTAMP | TIMESTAMP |
| Logical TIME | TIME |
| Logical DATE | DATE |
| Logical Decimal | FLOAT |
| Debezium Date | DATE |
| Debezium MicroTime | TIME |
| Debezium Time | TIME |
| Debezium MicroTimestamp | TIMESTAMP |
| Debezium TIMESTAMP | TIMESTAMP |
| Debezium ZonedTimestamp | TIMESTAMP |
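As an illustration (the schema below is a hypothetical Avro value schema, not taken from this page), a record with string, long, double, boolean, and timestamp-millis fields would produce BigQuery columns of type STRING, INTEGER, FLOAT, BOOLEAN, and TIMESTAMP, per the table above.

```json
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "quantity", "type": "long"},
    {"name": "price", "type": "double"},
    {"name": "shipped", "type": "boolean"},
    {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
```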
Schema Evolution
When schema evolution is enabled (using allowNewBigQueryFields, allowBigQueryRequiredFieldRelaxation, and allowSchemaUnionization), the connector can handle schema changes:
New fields added to the Kafka topic can be added to the BigQuery table
Field constraints can be relaxed from REQUIRED to NULLABLE
Schemas can be unionized when records in the same batch have different schemas
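A sketch of the relevant settings for a common case: allowing new fields and REQUIRED-to-NULLABLE relaxation without unionizing schemas. Per the property table above, this combination is equivalent to autoUpdateSchemas=true in older versions.

```json
{
  "allowNewBigQueryFields": "true",
  "allowBigQueryRequiredFieldRelaxation": "true",
  "allowSchemaUnionization": "false"
}
```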
Usage Examples
Basic Example
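A complete, illustrative connector definition. The topic, project, dataset, and credentials path are placeholders, and the Avro converter and Schema Registry URL are assumptions about the surrounding setup rather than requirements stated on this page.

```json
{
  "name": "bigquery-sink-basic",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "project": "my-gcp-project",
    "defaultDataset": "my_dataset",
    "keySource": "FILE",
    "keyfile": "/etc/kafka-connect/gcp-key.json",
    "autoCreateTables": "true",
    "sanitizeTopics": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
```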
Example with Batch Loading
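A sketch that batch-loads a single topic through GCS instead of streaming it. Bucket, topic, and other values are placeholders; as noted in the property table, enableBatchLoad is a beta feature.

```json
{
  "name": "bigquery-sink-batch",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "topics": "clickstream",
    "project": "my-gcp-project",
    "defaultDataset": "my_dataset",
    "keyfile": "/etc/kafka-connect/gcp-key.json",
    "enableBatchLoad": "clickstream",
    "gcsBucketName": "my-connect-staging-bucket",
    "autoCreateBucket": "true",
    "batchLoadIntervalSec": "300"
  }
}
```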
Example with Upsert Functionality
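A sketch with upsert and delete enabled. This mode relies on record keys, intermediate tables, and periodic merge flushes; the key field name and merge settings below are arbitrary examples.

```json
{
  "name": "bigquery-sink-upsert",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "topics": "customers",
    "project": "my-gcp-project",
    "defaultDataset": "my_dataset",
    "keyfile": "/etc/kafka-connect/gcp-key.json",
    "upsertEnabled": "true",
    "deleteEnabled": "true",
    "kafkaKeyFieldName": "kafka_key",
    "mergeIntervalMs": "60000",
    "mergeRecordsThreshold": "-1",
    "intermediateTableSuffix": "tmp"
  }
}
```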
Troubleshooting
Common Issues
Authentication errors: Ensure your service account key file is correct and has appropriate permissions.
Schema compatibility issues: When schema updates are enabled, existing data might not be compatible with new schemas.
Quota limitations: BigQuery has quotas for API requests; consider adjusting threadPoolSize and queueSize.
Table creation failures: Ensure autoCreateTables is only used with Schema Registry-based inputs (Avro, Protobuf, or JSON Schema).
Performance issues: For high-volume data, consider using batch loading via GCS instead of streaming inserts.
Logging
To enable detailed logging for troubleshooting:
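One approach, assuming the connector classes live under the com.wepay.kafka.connect.bigquery package (consistent with the default schemaRetriever class above), is to raise that logger to DEBUG in the Connect worker's log4j configuration:

```properties
# In the Connect worker's log4j configuration (e.g. connect-log4j.properties)
log4j.logger.com.wepay.kafka.connect.bigquery=DEBUG
```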
Limitations
The BigQuery Sink connector has the following limitations:
The connector does not support schemas with recursion.
The connector does not support schemas having float fields with NaN or +Infinity values.
Auto schema update does not support removing columns.
Auto schema update does not support recursive schemas.
When the connector is configured with upsertEnabled or deleteEnabled, it does not support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:
io.debezium.transforms.ByLogicalTableRouter
io.debezium.transforms.outbox.EventRouter
org.apache.kafka.connect.transforms.RegexRouter
org.apache.kafka.connect.transforms.TimestampRouter
io.confluent.connect.transforms.MessageTimestampRouter
io.confluent.connect.transforms.ExtractTopic$Key
Upgrading to 2.x.x
The following changes aren’t backward compatible in the BigQuery connector:
datasets was removed and defaultDataset has been introduced. The connector now infers the dataset from the topic name if the topic is in the form <dataset>:<tableName>. If the topic name is in the form <tableName>, the connector defaults to defaultDataset.
topicsToTables was removed. Use the RegexRouter SMT to route topics to tables (see the example after this list).
autoUpdateSchemas was replaced by allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation.
value.converter.enhanced.avro.schema.support should be set to false or removed. If this property is not removed or set to false, the connector may fail with an error.
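For the removed topicsToTables option, a RegexRouter transform can provide equivalent topic-to-table routing. The sketch below strips a hypothetical prefix from topic names before they are used as table names; keep in mind that, per the Limitations section, RegexRouter cannot be combined with upsertEnabled or deleteEnabled.

```json
{
  "transforms": "regexTopicRename",
  "transforms.regexTopicRename.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.regexTopicRename.regex": "prod[.](.*)",
  "transforms.regexTopicRename.replacement": "$1"
}
```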