
Google BigQuery

The Google BigQuery sink connector is an open-source connector imported from Confluent (originally developed by WePay) that enables you to export data from Apache Kafka® topics to Google BigQuery tables.

Overview

The BigQuery sink connector allows you to:

  • Stream data from Kafka topics to BigQuery tables

  • Automatically create tables based on topic data

  • Configure data delivery semantics (at-least-once or exactly-once)

  • Perform schema evolution when topic schemas change

Prerequisites

Before using the BigQuery sink connector, ensure you have:

  1. A Google Cloud Platform (GCP) account

  2. A BigQuery project with appropriate permissions

  3. Service account credentials with access to BigQuery

  4. Kafka topics with data to be exported
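The service account credentials from the prerequisites are supplied to the connector through the keyfile property (and, in version 1.3 and later, keySource). A minimal sketch, where the file path and the inline JSON are placeholders for your own credentials:

    # Option 1: point the connector at the downloaded credentials file (keySource defaults to FILE)
    keySource=FILE
    keyfile=/path/to/service-account.json

    # Option 2 (version 1.3 and later): pass the credentials JSON itself as a string
    # keySource=JSON
    # keyfile={"type": "service_account", "project_id": "...", ...}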

Configuration

Basic Configuration

Here's a basic configuration for the BigQuery sink connector:

    name = kcbq-connect1
    connector.class = com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
    tasks.max = 1
    topics = quickstart
    sanitizeTopics = true
    autoCreateTables = true
    allowNewBigQueryFields = true
    allowBigQueryRequiredFieldRelaxation = true
    schemaRetriever = com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
    project = lenses-123
    defaultDataset = ConfluentDataSet
    keyfile = <path to json file>
    transforms = RegexTransformation
    transforms.RegexTransformation.type = org.apache.kafka.connect.transforms.RegexRouter
    transforms.RegexTransformation.regex = (kcbq_)(.*)
    transforms.RegexTransformation.replacement = $2

Features of Google BigQuery Sink Connector

  • Multiple tasks support: Configure the tasks.max parameter to run multiple tasks in parallel and scale throughput (see the sketch after this list)

  • InsertAll API features: Supports insert operations with built-in duplicate detection capabilities

  • Real-time streaming: Records are inserted one at a time and available immediately for querying

  • Multi-topic support: Can stream from multiple topics to corresponding BigQuery tables

  • Parallel processing: Uses an internal thread pool (default: 10 threads, configurable) for scalable record streaming
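As a rough sketch of the parallelism settings mentioned above (the values are illustrative placeholders, not recommendations):

    # Standard Kafka Connect setting: number of tasks the connector may run in parallel
    tasks.max=3
    # Connector setting: size of the BigQuery write thread pool (default 10)
    threadPoolSize=20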

Important Configuration Properties

Each property below is listed with its type, default value, and importance.

topics (list, default: -, importance: high)
    A list of Kafka topics to read from.

project (string, default: -, importance: high)
    The BigQuery project to write to.

defaultDataset (string, default: -, importance: high)
    The default dataset to be used. Replaced the datasets parameter of older versions of this connector.

autoCreateTables (boolean, default: false, importance: high)
    Create BigQuery tables if they don't already exist. This property should only be enabled for Schema Registry-based inputs: Avro, Protobuf, or JSON Schema (JSON_SR). Table creation is not supported for JSON input.

gcsBucketName (string, default: "", importance: high)
    The name of the bucket where Google Cloud Storage (GCS) blobs are located. These blobs are used to batch-load to BigQuery. This is applicable only if enableBatchLoad is configured.

queueSize (long, default: -1, importance: high)
    The maximum size (or -1 for no maximum size) of the worker queue for BigQuery write requests before all topics are paused. This is a soft limit; the size of the queue can go over this before topics are paused. All topics resume once a flush is triggered or the size of the queue drops under half of the maximum size.

bigQueryMessageTimePartitioning (boolean, default: false, importance: high)
    Whether or not to use the message time when inserting records. Default uses the connector processing time.

bigQueryPartitionDecorator (boolean, default: true, importance: high)
    Whether or not to append the partition decorator to the BigQuery table name when inserting records. Setting this to true appends the partition decorator to the table name (e.g. table$yyyyMMdd depending on the configuration). Setting this to false bypasses the logic to append the partition decorator and uses the raw table name for inserts.

keySource (string, default: FILE, importance: medium)
    Determines whether the keyfile configuration is the path to the credentials JSON file or the JSON itself. Available values are FILE and JSON. This property is available in BigQuery sink connector version 1.3 (and later).

keyfile (string, default: null, importance: medium)
    Keyfile can be either a string representation of the Google credentials file or the path to the Google credentials file itself. The string representation of the Google credentials file is supported in BigQuery sink connector version 1.3 (and later).

bigQueryRetry (int, default: 0, importance: medium)
    The number of retry attempts made for a BigQuery request that fails with a backend error or a quota exceeded error.

bigQueryRetryWait (long, default: 1000, importance: medium)
    The minimum amount of time, in milliseconds, to wait between retry attempts for a BigQuery backend or quota exceeded error.

sanitizeTopics (boolean, default: false, importance: medium)
    Designates whether to automatically sanitize topic names before using them as table names. If not enabled, topic names are used as table names.

schemaRetriever (class, default: com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever, importance: medium)
    A class that can be used for automatically creating tables and/or updating schemas. Note that in version 2.0.0, the SchemaRetriever API changed to retrieve the schema from each SinkRecord, which will help support multiple schemas per topic.

threadPoolSize (int, default: 10, importance: medium)
    The size of the BigQuery write thread pool. This establishes the maximum number of concurrent writes to BigQuery.

autoCreateBucket (boolean, default: true, importance: medium)
    Whether to automatically create the given bucket if it does not exist.

allowNewBigQueryFields (boolean, default: false, importance: medium)
    If true, new fields can be added to BigQuery tables during subsequent schema updates.

allowBigQueryRequiredFieldRelaxation (boolean, default: false, importance: medium)
    If true, fields in the BigQuery schema can be changed from REQUIRED to NULLABLE. Note that allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation replaced the autoUpdateSchemas parameter of older versions of this connector.

allowSchemaUnionization (boolean, default: false, importance: medium)
    If true, the existing table schema (if one is present) will be unionized with new record schemas during schema updates. If false, the record of the last schema in a batch will be used for any necessary table creation and schema update attempts. Note that setting allowSchemaUnionization to false and allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation to true is equivalent to setting autoUpdateSchemas to true in older versions.

auto.register.schemas (boolean, default: true, importance: medium)
    Specifies if the Serializer should attempt to register the schema with Schema Registry.

use.latest.version (boolean, default: true, importance: medium)
    Only applies when auto.register.schemas is set to false. If use.latest.version is set to true, Schema Registry uses the latest version of the schema in the subject for serialization.

timestampPartitionFieldName (string, default: null, importance: low)
    The name of the field in the value that contains the timestamp to partition by in BigQuery, which enables timestamp partitioning for each table. Leave blank to enable ingestion-time partitioning for each table.

clusteringPartitionFieldNames (list, default: null, importance: low)
    Comma-separated list of fields where data is clustered in BigQuery.

timePartitioningType (string, default: DAY, importance: low)
    The time partitioning type to use when creating tables. Existing tables will not be altered to use this partitioning type.

allBQFieldsNullable (boolean, default: false, importance: low)
    If true, no fields in any produced BigQuery schema are REQUIRED. All non-nullable Avro fields are translated as NULLABLE (or REPEATED, if arrays).

avroDataCacheSize (int, default: 100, importance: low)
    The size of the cache to use when converting schemas from Avro to Kafka Connect.

batchLoadIntervalSec (int, default: 120, importance: low)
    The interval, in seconds, in which to attempt to run GCS to BigQuery load jobs. Only relevant if enableBatchLoad is configured.

convertDoubleSpecialValues (boolean, default: false, importance: low)
    Designates whether +Infinity is converted to Double.MAX_VALUE and whether -Infinity and NaN are converted to Double.MIN_VALUE to ensure successful delivery to BigQuery.

enableBatchLoad (list, default: "", importance: low)
    Beta feature, use with caution. The sublist of topics to be batch loaded through GCS.

includeKafkaData (boolean, default: false, importance: low)
    Whether to include an extra block containing the Kafka source topic, offset, and partition information in the resulting BigQuery rows.

upsertEnabled (boolean, default: false, importance: low)
    Enable upsert functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. Row-matching will be performed based on the contents of record keys. This feature won't work with SMTs that change the name of the topic and doesn't support JSON input.

deleteEnabled (boolean, default: false, importance: low)
    Enable delete functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. A delete will be performed when a record with a null value (that is, a tombstone record) is read. This feature won't work with SMTs that change the name of the topic and doesn't support JSON input.

intermediateTableSuffix (string, default: "tmp", importance: low)
    A suffix that will be appended to the names of destination tables to create the names for the corresponding intermediate tables. Multiple intermediate tables may be created for a single destination table.

mergeIntervalMs (long, default: 60000, importance: low)
    How often (in milliseconds) to perform a merge flush, if upsert/delete is enabled. Can be set to -1 to disable periodic flushing.

mergeRecordsThreshold (long, default: -1, importance: low)
    How many records to write to an intermediate table before performing a merge flush, if upsert/delete is enabled. Can be set to -1 to disable record count-based flushing.

kafkaDataFieldName (string, default: null, importance: low)
    The Kafka data field name. The default value is null, which means the Kafka data field will not be included.

kafkaKeyFieldName (string, default: null, importance: low)
    The Kafka key field name. The default value is null, which means the Kafka key field will not be included.

topic2TableMap (string, default: "", importance: low)
    Map of topics to tables (optional). Format: comma-separated tuples, e.g. <topic-1>:<table-1>,<topic-2>:<table-2>,... Note that the topic name should not be modified using a regex SMT while using this option. Also note that SANITIZE_TOPICS_CONFIG would be ignored if this config is set.

csfle.enabled (boolean, default: false, importance: low)
    CSFLE is enabled for the connector if set to true.

Data Mapping

Data Type Conversions

The connector maps Kafka Connect schema types to BigQuery data types as follows:

    BigQuery Data Type    Connector Mapping
    STRING                String
    INTEGER               INT8
    INTEGER               INT16
    INTEGER               INT32
    INTEGER               INT64
    FLOAT                 FLOAT32
    FLOAT                 FLOAT64
    BOOLEAN               Boolean
    BYTES                 Bytes
    TIMESTAMP             Logical TIMESTAMP
    TIME                  Logical TIME
    DATE                  Logical DATE
    FLOAT                 Logical Decimal
    DATE                  Debezium Date
    TIME                  Debezium MicroTime
    TIME                  Debezium Time
    TIMESTAMP             Debezium MicroTimestamp
    TIMESTAMP             Debezium TIMESTAMP
    TIMESTAMP             Debezium ZonedTimestamp

Schema Evolution

When schema evolution is enabled (using allowNewBigQueryFields, allowBigQueryRequiredFieldRelaxation, and allowSchemaUnionization), the connector can handle schema changes:

  • New fields added to the Kafka topic can be added to the BigQuery table

  • Field constraints can be relaxed from REQUIRED to NULLABLE

  • Schemas can be unionized when records in the same batch have different schemas
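A minimal sketch of the corresponding settings; whether allowSchemaUnionization should also be set to true depends on whether records in the same batch can carry different schemas:

    # Add new fields from the topic schema to the BigQuery table
    allowNewBigQueryFields=true
    # Allow REQUIRED columns to be relaxed to NULLABLE
    allowBigQueryRequiredFieldRelaxation=true
    # Optionally unionize the existing table schema with new record schemas
    allowSchemaUnionization=false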

Usage Examples

Basic Example

    name=bigquery-sink
    connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
    tasks.max=1
    topics=orders,customers
    project=my-gcp-project
    defaultDataset=kafka_data
    keyfile=/path/to/keyfile.json
    autoCreateTables=true

Example with Batch Loading

    name=bigquery-sink
    connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
    tasks.max=1
    topics=orders,customers
    project=my-gcp-project
    defaultDataset=kafka_data
    keyfile=/path/to/keyfile.json
    enableBatchLoad=orders,customers
    gcsBucketName=my-gcs-bucket
    autoCreateBucket=true

Example with Upsert Functionality

    name=bigquery-sink
    connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
    tasks.max=1
    topics=orders,customers
    project=my-gcp-project
    defaultDataset=kafka_data
    keyfile=/path/to/keyfile.json
    upsertEnabled=true
    mergeIntervalMs=30000
    mergeRecordsThreshold=1000

Troubleshooting

Common Issues

  1. Authentication errors: Ensure your service account key file is correct and has appropriate permissions.

  2. Schema compatibility issues: When schema updates are enabled, existing data might not be compatible with new schemas.

  3. Quota limitations: BigQuery has quotas for API requests; consider adjusting threadPoolSize and queueSize (see the sketch after this list).

  4. Table creation failures: Ensure autoCreateTables is only used with Schema Registry-based inputs (Avro, Protobuf, or JSON Schema).

  5. Performance issues: For high-volume data, consider using batch loading via GCS instead of streaming inserts.
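For the quota-related issues above, one option is to retry quota-exceeded errors and reduce write concurrency. The values below are illustrative placeholders, not tuned recommendations:

    # Retry requests that fail with a backend or quota-exceeded error
    bigQueryRetry=3
    bigQueryRetryWait=2000
    # Lower the number of concurrent writes and cap the in-flight request queue
    threadPoolSize=5
    queueSize=10000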

Logging

To enable detailed logging for troubleshooting:

    log4j.logger.com.wepay.kafka.connect.bigquery=DEBUG

Limitations

The BigQuery Sink connector has the following limitations:

  • The connector does not support schemas with recursion.

  • The connector does not support schemas having float fields with NaN or +Infinity values (see the configuration note after this list).

  • Auto schema update does not support removing columns.

  • Auto schema update does not support recursive schemas.

  • When the connector is configured with upsertEnabled or deleteEnabled, it does not support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:

    • io.debezium.transforms.ByLogicalTableRouter

    • io.debezium.transforms.outbox.EventRouter

    • org.apache.kafka.connect.transforms.RegexRouter

    • org.apache.kafka.connect.transforms.TimestampRouter

    • io.confluent.connect.transforms.MessageTimestampRouter

    • io.confluent.connect.transforms.ExtractTopic$Key
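For the NaN and +Infinity limitation above, the configuration reference documents convertDoubleSpecialValues, which rewrites these values (to Double.MAX_VALUE and Double.MIN_VALUE) so the affected rows can still be delivered; note that the original values are not preserved:

    # Rewrite +Infinity, -Infinity and NaN so rows can be delivered to BigQuery
    convertDoubleSpecialValues=true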

Upgrading to 2.x.x

The following changes aren’t backward compatible in the BigQuery connector:

  • datasets was removed and defaultDataset has been introduced. The connector now infers the dataset from the topic name if the topic is in the form <dataset>:<tableName>. If the topic name is in the form <tableName>, the connector defaults to defaultDataset.

  • topicsToTables was removed. You should use the RegexRouter SMT to route topics to tables (see the sketch after this list).

  • autoUpdateSchemas was replaced by allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation.

  • value.converter.enhanced.avro.schema.support should be set to false or removed. If this property is not removed or set to false, you may receive the following error:

    Invalid field name
    "com.examples.project-super-important.v1.MyData". Fields must
    contain only letters, numbers, and underscores, start with a letter or
    underscore, and be at most 300 characters long.
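As a sketch of replacing topicsToTables with RegexRouter, the transform below strips a hypothetical kcbq_ prefix so that, for example, topic kcbq_orders is written to table orders; the transform name, regex, and topic names are placeholders:

    transforms=RouteToTable
    transforms.RouteToTable.type=org.apache.kafka.connect.transforms.RegexRouter
    # Strip the "kcbq_" prefix from the topic name used for the destination table
    transforms.RouteToTable.regex=(kcbq_)(.*)
    transforms.RouteToTable.replacement=$2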
