2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.

Google BigQuery

The Google BigQuery sink connector is an open-source connector imported from Confluent (originally developed by WePay) that enables you to export data from Apache Kafka® topics to Google BigQuery tables.

Overview

The BigQuery sink connector allows you to:

  • Stream data from Kafka topics to BigQuery tables

  • Automatically create tables based on topic data

  • Configure data delivery semantics (at-least-once or exactly-once)

  • Perform schema evolution when topic schemas change

Prerequisites

Before using the BigQuery sink connector, ensure you have:

  1. A Google Cloud Platform (GCP) account

  2. A BigQuery project with appropriate permissions

  3. Service account credentials with access to BigQuery

  4. Kafka topics with data to be exported

Configuration

Basic Configuration

Here's a basic configuration for the BigQuery sink connector:

name = kcbq-connect1
connector.class = com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max = 1
topics = quickstart
sanitizeTopics = true
autoCreateTables = true
allowNewBigQueryFields = true
allowBigQueryRequiredFieldRelaxation = true
schemaRetriever = com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
project = lenses-123
defaultDataset = ConfluentDataSet
keyfile = <path to json file>
transforms = RegexTransformation
transforms.RegexTransformation.type = org.apache.kafka.connect.transforms.RegexRouter
transforms.RegexTransformation.regex = (kcbq_)(.*)
transforms.RegexTransformation.replacement = $2
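
The keyfile entry above is a file path, which matches the default keySource of FILE. As a minimal sketch of the alternative described under Important Configuration Properties (connector version 1.3 and later), the credentials can instead be passed inline by switching keySource to JSON. The placeholder is deliberately left unfilled; how the secret gets injected is an assumption left to your deployment.

```properties
# Sketch only: inline credentials instead of a file path.
keySource = JSON
# Placeholder: the service account credentials JSON as a single-line string.
keyfile = <service account JSON>
```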

Features of Google BigQuery Sink Connector

  • Multiple tasks support: Set the number of tasks with the tasks.max parameter; running more tasks can improve performance when multiple files need to be parsed

  • InsertAll API features: Supports insert operations with built-in duplicate detection capabilities

  • Real-time streaming: Records are inserted one at a time and available immediately for querying

  • Multi-topic support: Can stream from multiple topics to corresponding BigQuery tables

  • Parallel processing: Uses an internal thread pool (default: 10 threads, configurable) for scalable record streaming
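
The parallelism settings named in the list above could be combined roughly as follows. This is a sketch with illustrative values, not tuned recommendations; the topic names are placeholders.

```properties
# Illustrative values only; tune against your own throughput and BigQuery quotas.
# Number of Connect tasks for this connector.
tasks.max = 4
# Size of the BigQuery write thread pool (default: 10).
threadPoolSize = 10
# Multi-topic support: each topic streams to its corresponding table.
topics = orders,customers
```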

Important Configuration Properties

| Property | Description | Type | Default | Importance |
|---|---|---|---|---|
| defaultDataset | The default dataset to be used. Replaced the datasets parameter of older versions of this connector. | string | - | high |
| project | The BigQuery project to write to. | string | - | high |
| topics | A list of Kafka topics to read from. | list | - | high |
| autoCreateTables | Create BigQuery tables if they don't already exist. This property should only be enabled for Schema Registry-based inputs: Avro, Protobuf, or JSON Schema (JSON_SR). Table creation is not supported for JSON input. | boolean | false | high |
| gcsBucketName | The name of the bucket where Google Cloud Storage (GCS) blobs are located. These blobs are used to batch-load to BigQuery. This is applicable only if enableBatchLoad is configured. | string | "" | high |
| queueSize | The maximum size (or -1 for no maximum size) of the worker queue for BigQuery write requests before all topics are paused. This is a soft limit; the size of the queue can go over this before topics are paused. All topics resume once a flush is triggered or the size of the queue drops under half of the maximum size. | long | -1 | high |
| bigQueryMessageTimePartitioning | Whether or not to use the message time when inserting records. Default uses the connector processing time. | boolean | false | high |
| bigQueryPartitionDecorator | Whether or not to append partition decorator to BigQuery table name when inserting records. Setting this to true appends partition decorator to table name (e.g. table$yyyyMMdd depending on the configuration). Setting this to false bypasses the logic to append the partition decorator and uses raw table name for inserts. | boolean | true | high |
| keySource | Determines whether the keyfile configuration is the path to the credentials JSON file or to the JSON itself. Available values are FILE and JSON. This property is available in BigQuery sink connector version 1.3 (and later). | string | FILE | medium |
| keyfile | Keyfile can be either a string representation of the Google credentials file or the path to the Google credentials file itself. The string representation of the Google credentials file is supported in BigQuery sink connector version 1.3 (and later). | string | null | medium |
| bigQueryRetry | The number of retry attempts made for a BigQuery request that fails with a backend error or a quota exceeded error. | int | 0 | medium |
| bigQueryRetryWait | The minimum amount of time, in milliseconds, to wait between retry attempts for a BigQuery backend or quota exceeded error. | long | 1000 | medium |
| sanitizeTopics | Designates whether to automatically sanitize topic names before using them as table names. If not enabled, topic names are used as table names. | boolean | false | medium |
| schemaRetriever | A class that can be used for automatically creating tables and/or updating schemas. Note that in version 2.0.0, the SchemaRetriever API changed to retrieve the schema from each SinkRecord, which will help support multiple schemas per topic. | class | com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever | medium |
| threadPoolSize | The size of the BigQuery write thread pool. This establishes the maximum number of concurrent writes to BigQuery. | int | 10 | medium |
| autoCreateBucket | Whether to automatically create the given bucket, if it does not exist. | boolean | true | medium |
| allowNewBigQueryFields | If true, new fields can be added to BigQuery tables during subsequent schema updates. | boolean | false | medium |
| allowBigQueryRequiredFieldRelaxation | If true, fields in BigQuery Schema can be changed from REQUIRED to NULLABLE. Note that allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation replaced the autoUpdateSchemas parameter of older versions of this connector. | boolean | false | medium |
| allowSchemaUnionization | If true, the existing table schema (if one is present) will be unionized with new record schemas during schema updates. If false, the record of the last schema in a batch will be used for any necessary table creation and schema update attempts. Note that setting allowSchemaUnionization to false and allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation to true is equivalent to setting autoUpdateSchemas to true in older versions. | boolean | false | medium |
| auto.register.schemas | Specifies if the Serializer should attempt to register the Schema with Schema Registry. | boolean | true | medium |
| use.latest.version | Only applies when auto.register.schemas is set to false. If use.latest.version is set to true, then Schema Registry uses the latest version of the schema in the subject for serialization. | boolean | true | medium |
| timestampPartitionFieldName | The name of the field in the value that contains the timestamp to partition by in BigQuery and enable timestamp partitioning for each table. Leave blank to enable ingestion time partitioning for each table. | string | null | low |
| clusteringPartitionFieldNames | Comma-separated list of fields where data is clustered in BigQuery. | list | null | low |
| timePartitioningType | The time partitioning type to use when creating tables. Existing tables will not be altered to use this partitioning type. | string | DAY | low |
| allBQFieldsNullable | If true, no fields in any produced BigQuery schema are REQUIRED. All non-nullable Avro fields are translated as NULLABLE (or REPEATED, if arrays). | boolean | false | low |
| avroDataCacheSize | The size of the cache to use when converting schemas from Avro to Kafka Connect. | int | 100 | low |
| batchLoadIntervalSec | The interval, in seconds, in which to attempt to run GCS to BigQuery load jobs. Only relevant if enableBatchLoad is configured. | int | 120 | low |
| convertDoubleSpecialValues | Designates whether +Infinity is converted to Double.MAX_VALUE and whether -Infinity and NaN are converted to Double.MIN_VALUE to ensure successful delivery to BigQuery. | boolean | false | low |
| enableBatchLoad | Beta Feature - Use with caution. The sublist of topics to be batch loaded through GCS. | list | "" | low |
| includeKafkaData | Whether to include an extra block containing the Kafka source topic, offset, and partition information in the resulting BigQuery rows. | boolean | false | low |
| upsertEnabled | Enable upsert functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. Row-matching will be performed based on the contents of record keys. This feature won't work with SMTs that change the name of the topic and doesn't support JSON input. | boolean | false | low |
| deleteEnabled | Enable delete functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. A delete will be performed when a record with a null value (that is, a tombstone record) is read. This feature will not work with SMTs that change the name of the topic and doesn't support JSON input. | boolean | false | low |
| intermediateTableSuffix | A suffix that will be appended to the names of destination tables to create the names for the corresponding intermediate tables. Multiple intermediate tables may be created for a single destination table. | string | "tmp" | low |
| mergeIntervalMs | How often (in milliseconds) to perform a merge flush, if upsert/delete is enabled. Can be set to -1 to disable periodic flushing. | long | 60000 | low |
| mergeRecordsThreshold | How many records to write to an intermediate table before performing a merge flush, if upsert/delete is enabled. Can be set to -1 to disable record count-based flushing. | long | -1 | low |
| kafkaDataFieldName | The Kafka data field name. The default value is null, which means the Kafka Data field will not be included. | string | null | low |
| kafkaKeyFieldName | The Kafka key field name. The default value is null, which means the Kafka Key field will not be included. | string | null | low |
| topic2TableMap | Map of topics to tables (optional). Format: comma-separated tuples, e.g. <topic-1>:<table-1>,<topic-2>:<table-2>,... Note that topic name should not be modified using regex SMT while using this option. Also note that SANITIZE_TOPICS_CONFIG would be ignored if this config is set. | string | "" | low |
| csfle.enabled | CSFLE is enabled for the connector if set to True. | boolean | False | low |
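
As one way the partitioning properties above might be combined, the sketch below partitions new tables by a timestamp field in the record value and clusters on a second field. The field names event_time and customer_id are hypothetical. Disabling bigQueryPartitionDecorator here is a cautious assumption that decorator-based and field-based partitioning should not be mixed; check the behaviour of your connector version.

```properties
# Sketch: partition by a timestamp field carried in the record value.
# event_time and customer_id are hypothetical field names.
timestampPartitionFieldName = event_time
# Only applies to tables the connector creates; existing tables are not altered.
timePartitioningType = DAY
clusteringPartitionFieldNames = customer_id
# Assumption: skip the table$yyyyMMdd decorator when partitioning by a field.
bigQueryPartitionDecorator = false
autoCreateTables = true
```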

Data Mapping

Data Type Conversions

The connector maps Kafka Connect schema types to BigQuery data types as follows:

| BigQuery Data Type | Connector Mapping |
|---|---|
| STRING | String |
| INTEGER | INT8 |
| INTEGER | INT16 |
| INTEGER | INT32 |
| INTEGER | INT64 |
| FLOAT | FLOAT32 |
| FLOAT | FLOAT64 |
| BOOLEAN | Boolean |
| BYTES | Bytes |
| TIMESTAMP | Logical TIMESTAMP |
| TIME | Logical TIME |
| DATE | Logical DATE |
| FLOAT | Logical Decimal |
| DATE | Debezium Date |
| TIME | Debezium MicroTime |
| TIME | Debezium Time |
| TIMESTAMP | Debezium MicroTimestamp |
| TIMESTAMP | Debezium TIMESTAMP |
| TIMESTAMP | Debezium ZonedTimestamp |

Schema Evolution

When schema evolution is enabled (using allowNewBigQueryFields, allowBigQueryRequiredFieldRelaxation, and allowSchemaUnionization), the connector can handle schema changes:

  • New fields added to the Kafka topic can be added to the BigQuery table

  • Field constraints can be relaxed from REQUIRED to NULLABLE

  • Schemas can be unionized when records in the same batch have different schemas
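
A minimal fragment that turns on all three schema evolution behaviours listed above might look like this. All three properties default to false, so each has to be enabled explicitly.

```properties
# Allow new fields from the topic schema to be added to the table.
allowNewBigQueryFields = true
# Allow REQUIRED fields to be relaxed to NULLABLE.
allowBigQueryRequiredFieldRelaxation = true
# Union the existing table schema with new record schemas.
# Leaving this false while the two settings above are true matches
# autoUpdateSchemas=true in older connector versions.
allowSchemaUnionization = true
```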

Usage Examples

Basic Example

name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=orders,customers
project=my-gcp-project
defaultDataset=kafka_data
keyfile=/path/to/keyfile.json
autoCreateTables=true
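
If the table names should differ from the topic names, the topic2TableMap property can be added to the basic example. The sketch below uses hypothetical table names (orders_v1, customers_v1). Per the property description, do not combine this with a regex SMT that renames topics, and note that sanitizeTopics is ignored once the map is set.

```properties
# Sketch: explicit topic-to-table mapping for the two topics above.
# orders_v1 and customers_v1 are hypothetical table names.
topics=orders,customers
topic2TableMap=orders:orders_v1,customers:customers_v1
```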

Example with Batch Loading

name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=orders,customers
project=my-gcp-project
defaultDataset=kafka_data
keyfile=/path/to/keyfile.json
enableBatchLoad=orders,customers
gcsBucketName=my-gcs-bucket
autoCreateBucket=true

Example with Upsert Functionality

name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=orders,customers
project=my-gcp-project
defaultDataset=kafka_data
keyfile=/path/to/keyfile.json
upsertEnabled=true
mergeIntervalMs=30000
mergeRecordsThreshold=1000
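
Deletes can be handled alongside upserts by adding deleteEnabled, so that tombstone records (null values) remove the matching rows. The fragment below is a sketch of the extra lines only. Setting kafkaKeyFieldName rests on the assumption that the connector needs a named key column to match rows; the name kafka_key is hypothetical, so verify the requirement against your connector version.

```properties
# Sketch: extra settings on top of the upsert example above.
upsertEnabled=true
deleteEnabled=true
# Assumption: a key column is needed for row matching; kafka_key is a hypothetical name.
kafkaKeyFieldName=kafka_key
# Flush merges every 30 seconds; -1 disables the record-count trigger.
mergeIntervalMs=30000
mergeRecordsThreshold=-1
```

Neither feature works with SMTs that change the topic name, so a RegexRouter transform should not be part of this configuration.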

Troubleshooting

Common Issues

  1. Authentication errors: Ensure your service account key file is correct and has appropriate permissions.

  2. Schema compatibility issues: When schema updates are enabled, existing data might not be compatible with new schemas.

  3. Quota limitations: BigQuery has quotas for API requests; consider adjusting threadPoolSize and queueSize.

  4. Table creation failures: Ensure autoCreateTables is only used with Schema Registry-based inputs (Avro, Protobuf, or JSON Schema).

  5. Performance issues: For high-volume data, consider using batch loading via GCS instead of streaming inserts.
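
For the quota-related issues above, a starting point could be to enable retries and bound the write queue. The numbers below are illustrative assumptions rather than recommendations; appropriate values depend on your quotas and data volume.

```properties
# Illustrative values only.
# Retry backend and quota-exceeded errors up to 5 times (default: 0).
bigQueryRetry = 5
# Wait at least 2000 ms between retry attempts (default: 1000).
bigQueryRetryWait = 2000
# Reduce the maximum number of concurrent writes (default: 10).
threadPoolSize = 5
# Soft limit on queued write requests before topics are paused (default: -1, no limit).
queueSize = 10000
```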

Logging

To enable detailed logging for troubleshooting:

log4j.logger.com.wepay.kafka.connect.bigquery=DEBUG

Limitations

The BigQuery Sink connector has the following limitations:

  • The connector does not support schemas with recursion.

  • The connector does not support schemas having float fields with NaN or +Infinity values.

  • Auto schema update does not support removing columns.

  • Auto schema update does not support recursive schemas.

  • When the connector is configured with upsertEnabled or deleteEnabled, it does not support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:

    • io.debezium.transforms.ByLogicalTableRouter

    • io.debezium.transforms.outbox.EventRouter

    • org.apache.kafka.connect.transforms.RegexRouter

    • org.apache.kafka.connect.transforms.TimestampRouter

    • io.confluent.connect.transforms.MessageTimestampRouter

    • io.confluent.connect.transforms.ExtractTopic$Key
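
For the NaN and infinity limitation above, the property table lists convertDoubleSpecialValues as a way to substitute those values before delivery. A sketch, assuming it is acceptable for your data to have the special values replaced by Double.MAX_VALUE or Double.MIN_VALUE:

```properties
# Convert +Infinity to Double.MAX_VALUE, and -Infinity and NaN to Double.MIN_VALUE.
# This changes the stored values, so downstream queries must account for it.
convertDoubleSpecialValues = true
```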

Upgrading to 2.x.x

The following changes aren’t backward compatible in the BigQuery connector:

  • datasets was removed and defaultDataset has been introduced. The connector now infers the dataset from the topic name if the topic is in the form <dataset>:<tableName>. If the topic name is in the form <tablename>, the connector defaults to defaultDataset.

  • topicsToTables was removed. You should use SMT RegexRouter to route topics to tables.

  • autoUpdateSchemas was replaced by allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation.

  • value.converter.enhanced.avro.schema.support should be set to false or removed. If this property is not removed or set to false, you may receive the following error:

    Invalid field name
    "com.examples.project-super-important.v1.MyData". Fields must
    contain only letters, numbers, and underscores, start with a letter or
    underscore, and be at most 300 characters long.
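
Put together, the changes above might translate into a 2.x fragment like the following. This is a sketch: the dataset name is a placeholder, the transform alias route is arbitrary, and the regex is borrowed from the basic configuration earlier on this page.

```properties
# 2.x-style settings; comments name the older property each one replaces.
# Replaces: datasets
defaultDataset = kafka_data
# Replaces: topicsToTables (routing is now done with the RegexRouter SMT)
transforms = route
transforms.route.type = org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex = (kcbq_)(.*)
transforms.route.replacement = $2
# Replaces: autoUpdateSchemas
allowNewBigQueryFields = true
allowBigQueryRequiredFieldRelaxation = true
# Avoids the "Invalid field name" error quoted above
value.converter.enhanced.avro.schema.support = false
```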


Last updated 1 month ago
