# GCP BigQuery

The Google BigQuery sink connector is an open-source connector imported from Confluent (originally developed by WePay) that enables you to export data from Apache Kafka® topics to Google BigQuery tables.

### Overview

The BigQuery sink connector allows you to:

* Stream data from Kafka topics to BigQuery tables
* Automatically create tables based on topic data
* Configure data delivery semantics (at-least-once or exactly-once)
* Perform schema evolution when topic schemas change

### Prerequisites

Before using the BigQuery sink connector, ensure you have:

1. A Google Cloud Platform (GCP) account
2. A BigQuery project with appropriate permissions
3. Service account credentials with access to BigQuery
4. Kafka topics with data to be exported

### Configuration

#### Basic Configuration

Here's a basic configuration for the BigQuery sink connector:

```properties
name = kcbq-connect1
connector.class = com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max = 1
topics = quickstart
sanitizeTopics = true
autoCreateTables = true
allowNewBigQueryFields = true
allowBigQueryRequiredFieldRelaxation = true
schemaRetriever = com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
project = lenses-123
defaultDataset = ConfluentDataSet
keyfile = <path to json file>
transforms = RegexTransformation
transforms.RegexTransformation.type = org.apache.kafka.connect.transforms.RegexRouter
transforms.RegexTransformation.regex = (kcbq_)(.*)
transforms.RegexTransformation.replacement = $2
```
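The `keyfile` above is given as a path on disk (the default `keySource=FILE` behaviour). From connector version 1.3 onwards, the credentials JSON can instead be embedded directly in the configuration. A minimal sketch, with placeholder credential fields:

```properties
# Embed the Google credentials JSON directly instead of pointing to a file
# (keySource = JSON is supported in BigQuery sink connector 1.3 and later;
#  the credential values below are placeholders)
keySource = JSON
keyfile = {"type": "service_account", "project_id": "<gcp-project>", "private_key_id": "<key-id>", "private_key": "<key>", "client_email": "<service-account-email>"}
```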

### Features of Google BigQuery Sink Connector

* **Multiple tasks support**: Use the `tasks.max` setting to run several tasks in parallel and scale throughput across topic partitions (a tuning sketch follows this list)
* **InsertAll API features**: Supports insert operations with built-in duplicate detection capabilities
* **Real-time streaming**: Records are inserted one at a time and available immediately for querying
* **Multi-topic support**: Can stream from multiple topics to corresponding BigQuery tables
* **Parallel processing**: Uses an internal thread pool (default: 10 threads, configurable) for scalable record streaming
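As an illustration, the following sketch combines the parallelism-related settings mentioned above (`tasks.max`, `threadPoolSize`, `queueSize`). The values are arbitrary starting points, not recommendations:

```properties
# Run four connector tasks in parallel across the topic partitions
tasks.max = 4
# Allow up to 20 concurrent writes to BigQuery (default is 10)
threadPoolSize = 20
# Pause consumption once roughly 5000 write requests are queued (default -1 = unbounded)
queueSize = 5000
```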

#### Important Configuration Properties

<table><thead><tr><th width="183.0234375">Property</th><th width="213.96484375">Description</th><th>Type</th><th>Default</th><th>Importance</th></tr></thead><tbody><tr><td><code>defaultDataset</code></td><td>The default dataset to be used. Replaced the datasets parameter of older versions of this connector.</td><td>string</td><td>-</td><td>high</td></tr><tr><td><code>project</code></td><td>The BigQuery project to write to.</td><td>string</td><td>-</td><td>high</td></tr><tr><td><code>topics</code></td><td>A list of Kafka topics to read from.</td><td>list</td><td>-</td><td>high</td></tr><tr><td><code>autoCreateTables</code></td><td>Create BigQuery tables if they don't already exist. This property should only be enabled for Schema Registry-based inputs: Avro, Protobuf, or JSON Schema (JSON_SR). Table creation is not supported for JSON input.</td><td>boolean</td><td>false</td><td>high</td></tr><tr><td><code>gcsBucketName</code></td><td>The name of the bucket where Google Cloud Storage (GCS) blobs are located. These blobs are used to batch-load to BigQuery. This is applicable only if enableBatchLoad is configured.</td><td>string</td><td>""</td><td>high</td></tr><tr><td><code>queueSize</code></td><td>The maximum size (or -1 for no maximum size) of the worker queue for BigQuery write requests before all topics are paused. This is a soft limit; the size of the queue can go over this before topics are paused. All topics resume once a flush is triggered or the size of the queue drops under half of the maximum size.</td><td>long</td><td>-1</td><td>high</td></tr><tr><td><code>bigQueryMessageTimePartitioning</code></td><td>Whether or not to use the message time when inserting records. Default uses the connector processing time.</td><td>boolean</td><td>false</td><td>high</td></tr><tr><td><code>bigQueryPartitionDecorator</code></td><td>Whether or not to append partition decorator to BigQuery table name when inserting records. Setting this to true appends partition decorator to table name (e.g. table$yyyyMMdd depending on the configuration). Setting this to false bypasses the logic to append the partition decorator and uses raw table name for inserts.</td><td>boolean</td><td>true</td><td>high</td></tr><tr><td><code>keySource</code></td><td>Determines whether the keyfile configuration is the path to the credentials JSON file or to the JSON itself. Available values are FILE and JSON. This property is available in BigQuery sink connector version 1.3 (and later).</td><td>string</td><td>FILE</td><td>medium</td></tr><tr><td><code>keyfile</code></td><td>Keyfile can be either a string representation of the Google credentials file or the path to the Google credentials file itself. The string representation of the Google credentials file is supported in BigQuery sink connector version 1.3 (and later).</td><td>string</td><td>null</td><td>medium</td></tr><tr><td><code>bigQueryRetry</code></td><td>The number of retry attempts made for a BigQuery request that fails with a backend error or a quota exceeded error.</td><td>int</td><td>0</td><td>medium</td></tr><tr><td><code>bigQueryRetryWait</code></td><td>The minimum amount of time, in milliseconds, to wait between retry attempts for a BigQuery backend or quota exceeded error.</td><td>long</td><td>1000</td><td>medium</td></tr><tr><td><code>sanitizeTopics</code></td><td>Designates whether to automatically sanitize topic names before using them as table names. 
If not enabled, topic names are used as table names.</td><td>boolean</td><td>false</td><td>medium</td></tr><tr><td><code>schemaRetriever</code></td><td>A class that can be used for automatically creating tables and/or updating schemas. Note that in version 2.0.0, the SchemaRetriever API changed to retrieve the schema from each SinkRecord, which will help support multiple schemas per topic.</td><td>class</td><td>com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever</td><td>medium</td></tr><tr><td><code>threadPoolSize</code></td><td>The size of the BigQuery write thread pool. This establishes the maximum number of concurrent writes to BigQuery.</td><td>int</td><td>10</td><td>medium</td></tr><tr><td><code>autoCreateBucket</code></td><td>Whether to automatically create the given bucket, if it does not exist.</td><td>boolean</td><td>true</td><td>medium</td></tr><tr><td><code>allowNewBigQueryFields</code></td><td>If true, new fields can be added to BigQuery tables during subsequent schema updates.</td><td>boolean</td><td>false</td><td>medium</td></tr><tr><td><code>allowBigQueryRequiredFieldRelaxation</code></td><td>If true, fields in BigQuery Schema can be changed from REQUIRED to NULLABLE. Note that allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation replaced the autoUpdateSchemas parameter of older versions of this connector.</td><td>boolean</td><td>false</td><td>medium</td></tr><tr><td><code>allowSchemaUnionization</code></td><td>If true, the existing table schema (if one is present) will be unionized with new record schemas during schema updates. If false, the record of the last schema in a batch will be used for any necessary table creation and schema update attempts. Note that setting allowSchemaUnionization to false and allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation to true is equivalent to setting autoUpdateSchemas to true in older versions.</td><td>boolean</td><td>false</td><td>medium</td></tr><tr><td><code>auto.register.schemas</code></td><td>Specifies if the Serializer should attempt to register the Schema with Schema Registry.</td><td>boolean</td><td>true</td><td>medium</td></tr><tr><td><code>use.latest.version</code></td><td>Only applies when auto.register.schemas is set to false. If use.latest.version is set to true, then Schema Registry uses the latest version of the schema in the subject for serialization.</td><td>boolean</td><td>true</td><td>medium</td></tr><tr><td><code>timestampPartitionFieldName</code></td><td>The name of the field in the value that contains the timestamp to partition by in BigQuery and enable timestamp partitioning for each table. Leave blank to enable ingestion time partitioning for each table.</td><td>string</td><td>null</td><td>low</td></tr><tr><td><code>clusteringPartitionFieldNames</code></td><td>Comma-separated list of fields where data is clustered in BigQuery.</td><td>list</td><td>null</td><td>low</td></tr><tr><td><code>timePartitioningType</code></td><td>The time partitioning type to use when creating tables. Existing tables will not be altered to use this partitioning type.</td><td>string</td><td>DAY</td><td>low</td></tr><tr><td><code>allBQFieldsNullable</code></td><td>If true, no fields in any produced BigQuery schema are REQUIRED. 
All non-nullable Avro fields are translated as NULLABLE (or REPEATED, if arrays).</td><td>boolean</td><td>false</td><td>low</td></tr><tr><td><code>avroDataCacheSize</code></td><td>The size of the cache to use when converting schemas from Avro to Kafka Connect.</td><td>int</td><td>100</td><td>low</td></tr><tr><td><code>batchLoadIntervalSec</code></td><td>The interval, in seconds, in which to attempt to run GCS to BigQuery load jobs. Only relevant if enableBatchLoad is configured.</td><td>int</td><td>120</td><td>low</td></tr><tr><td><code>convertDoubleSpecialValues</code></td><td>Designates whether +Infinity is converted to Double.MAX_VALUE and whether -Infinity and NaN are converted to Double.MIN_VALUE to ensure successful delivery to BigQuery.</td><td>boolean</td><td>false</td><td>low</td></tr><tr><td><code>enableBatchLoad</code></td><td>Beta Feature - Use with caution. The sublist of topics to be batch loaded through GCS.</td><td>list</td><td>""</td><td>low</td></tr><tr><td><code>includeKafkaData</code></td><td>Whether to include an extra block containing the Kafka source topic, offset, and partition information in the resulting BigQuery rows.</td><td>boolean</td><td>false</td><td>low</td></tr><tr><td><code>upsertEnabled</code></td><td>Enable upsert functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. Row-matching will be performed based on the contents of record keys. This feature won't work with SMTs that change the name of the topic and doesn't support JSON input.</td><td>boolean</td><td>false</td><td>low</td></tr><tr><td><code>deleteEnabled</code></td><td>Enable delete functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. A delete will be performed when a record with a null value (that is–a tombstone record) is read. This feature will not work with SMTs that change the name of the topic and doesn't support JSON input.</td><td>boolean</td><td>false</td><td>low</td></tr><tr><td><code>intermediateTableSuffix</code></td><td>A suffix that will be appended to the names of destination tables to create the names for the corresponding intermediate tables. Multiple intermediate tables may be created for a single destination table.</td><td>string</td><td>"tmp"</td><td>low</td></tr><tr><td><code>mergeIntervalMs</code></td><td>How often (in milliseconds) to perform a merge flush, if upsert/delete is enabled. Can be set to -1 to disable periodic flushing.</td><td>long</td><td>60000</td><td>low</td></tr><tr><td><code>mergeRecordsThreshold</code></td><td>How many records to write to an intermediate table before performing a merge flush, if upsert/delete is enabled. Can be set to -1 to disable record count-based flushing.</td><td>long</td><td>-1</td><td>low</td></tr><tr><td><code>kafkaDataFieldName</code></td><td>The Kafka data field name. The default value is null, which means the Kafka Data field will not be included.</td><td>string</td><td>null</td><td>low</td></tr><tr><td><code>kafkaKeyFieldName</code></td><td>The Kafka key field name. The default value is null, which means the Kafka Key field will not be included.</td><td>string</td><td>null</td><td>low</td></tr><tr><td><code>topic2TableMap</code></td><td>Map of topics to tables (optional). Format: comma-separated tuples, e.g. &#x3C;topic-1>:&#x3C;table-1>,&#x3C;topic-2>:&#x3C;table-2>,... Note that topic name should not be modified using regex SMT while using this option. 
Also note that SANITIZE_TOPICS_CONFIG would be ignored if this config is set.</td><td>string</td><td>""</td><td>low</td></tr><tr><td><code>csfle.enabled</code></td><td>CSFLE is enabled for the connector if set to True.</td><td>boolean</td><td>False</td><td>low</td></tr></tbody></table>
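As a sketch of the `topic2TableMap` option from the table above, the mapping below routes each topic to an explicitly named table (the topic and table names are illustrative):

```properties
topics = orders,customers
# Comma-separated <topic>:<table> pairs; sanitizeTopics is ignored when this is set,
# and topic names must not be rewritten by a regex SMT
topic2TableMap = orders:orders_v1,customers:customers_v1
```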

### Data Mapping

#### Data Type Conversions

The connector maps Kafka Connect schema types to BigQuery data types as follows:

| Kafka Connect Schema Type | BigQuery Data Type |
| ------------------------- | ------------------ |
| `String`                  | `STRING`           |
| `INT8`                    | `INTEGER`          |
| `INT16`                   | `INTEGER`          |
| `INT32`                   | `INTEGER`          |
| `INT64`                   | `INTEGER`          |
| `FLOAT32`                 | `FLOAT`            |
| `FLOAT64`                 | `FLOAT`            |
| `Boolean`                 | `BOOLEAN`          |
| `Bytes`                   | `BYTES`            |
| `Logical TIMESTAMP`       | `TIMESTAMP`        |
| `Logical TIME`            | `TIME`             |
| `Logical DATE`            | `DATE`             |
| `Logical Decimal`         | `FLOAT`            |
| `Debezium Date`           | `DATE`             |
| `Debezium MicroTime`      | `TIME`             |
| `Debezium Time`           | `TIME`             |
| `Debezium MicroTimestamp` | `TIMESTAMP`        |
| `Debezium TIMESTAMP`      | `TIMESTAMP`        |
| `Debezium ZonedTimestamp` | `TIMESTAMP`        |

#### Schema Evolution

When schema evolution is enabled (using `allowNewBigQueryFields`, `allowBigQueryRequiredFieldRelaxation`, and `allowSchemaUnionization`), the connector can handle schema changes, as shown in the sketch after this list:

* New fields added to the Kafka topic schema can be added to the BigQuery table
* Field constraints can be relaxed from REQUIRED to NULLABLE
* The existing table schema can be unionized with new record schemas when they differ
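A minimal configuration sketch enabling all three schema-evolution settings (whether each should be enabled depends on your data and tables):

```properties
# Add new topic fields to the BigQuery table schema
allowNewBigQueryFields = true
# Allow REQUIRED columns to be relaxed to NULLABLE
allowBigQueryRequiredFieldRelaxation = true
# Unionize the existing table schema with new record schemas
allowSchemaUnionization = true
```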

### Usage Examples

#### Basic Example

```properties
name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=orders,customers
project=my-gcp-project
defaultDataset=kafka_data
keyfile=/path/to/keyfile.json
autoCreateTables=true
```

#### Example with Batch Loading

```properties
name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=orders,customers
project=my-gcp-project
defaultDataset=kafka_data
keyfile=/path/to/keyfile.json
enableBatchLoad=orders,customers
gcsBucketName=my-gcs-bucket
autoCreateBucket=true
```

#### Example with Upsert Functionality

```properties
name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=orders,customers
project=my-gcp-project
defaultDataset=kafka_data
keyfile=/path/to/keyfile.json
upsertEnabled=true
mergeIntervalMs=30000
mergeRecordsThreshold=1000
```

### Troubleshooting

#### Common Issues

1. **Authentication errors**: Ensure your service account key file is correct and has appropriate permissions.
2. **Schema compatibility issues**: When schema updates are enabled, existing data might not be compatible with new schemas.
3. **Quota limitations**: BigQuery has quotas for API requests; consider adjusting `threadPoolSize` and `queueSize`.
4. **Table creation failures**: Ensure `autoCreateTables` is only used with Schema Registry-based inputs (Avro, Protobuf, or JSON Schema).
5. **Performance issues**: For high-volume data, consider using batch loading via GCS instead of streaming inserts.

#### Logging

To enable detailed logging for troubleshooting:

```properties
log4j.logger.com.wepay.kafka.connect.bigquery=DEBUG
```

### Limitations

The BigQuery Sink connector has the following limitations:

* The connector does not support schemas with recursion.
* The connector does not support schemas having float fields with NaN or +Infinity values.
* Auto schema update does not support removing columns.
* Auto schema update does not support recursive schemas.
* When the connector is configured with `upsertEnabled` or `deleteEnabled`, it does not support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:
  * `io.debezium.transforms.ByLogicalTableRouter`
  * `io.debezium.transforms.outbox.EventRouter`
  * `org.apache.kafka.connect.transforms.RegexRouter`
  * `org.apache.kafka.connect.transforms.TimestampRouter`
  * `io.confluent.connect.transforms.MessageTimestampRouter`
  * `io.confluent.connect.transforms.ExtractTopic$Key`

### Upgrading to 2.x.x

The following changes aren’t backward compatible in the BigQuery connector:

* `datasets` was removed and `defaultDataset` has been introduced. The connector now infers the dataset from the topic name if the topic is in the form `<dataset>:<tableName>`. If the topic name is in the form `<tableName>`, the connector defaults to `defaultDataset`.
* `topicsToTables` was removed. Use the `RegexRouter` SMT to route topics to tables instead (a sketch follows at the end of this section).
* `autoUpdateSchemas` was replaced by `allowNewBigQueryFields` and `allowBigQueryRequiredFieldRelaxation`.
* `value.converter.enhanced.avro.schema.support` should be set to false or removed. If it is left enabled, you may receive the following error:

  ```
  Invalid field name
  "com.examples.project-super-important.v1.MyData". Fields must
  contain only letters, numbers, and underscores, start with a letter or
  underscore, and be at most 300 characters long.
  ```
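As a hedged sketch of replacing a `topicsToTables` mapping with the `RegexRouter` SMT, mirroring the transform used in the basic configuration above (the `kcbq_` prefix is illustrative):

```properties
# Strip the "kcbq_" prefix so topic "kcbq_orders" is written to table "orders"
transforms = RouteToTable
transforms.RouteToTable.type = org.apache.kafka.connect.transforms.RegexRouter
transforms.RouteToTable.regex = (kcbq_)(.*)
transforms.RouteToTable.replacement = $2
```

Note that, as listed under Limitations, `RegexRouter` cannot be combined with `upsertEnabled` or `deleteEnabled`, and topic names should not be rewritten by a regex SMT when `topic2TableMap` is used.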

