# GCP PubSub

This Kafka Connect source connector ingests records from GCP Pub/Sub subscriptions into your Kafka cluster, which makes it useful for backing up or streaming Pub/Sub data into your Kafka infrastructure. It provides at-least-once semantics: each record reaches the Kafka topic at least once.

## Connector Class

```text
io.lenses.streamreactor.connect.gcp.pubsub.source.GCPPubSubSourceConnector
```

## Example

{% hint style="success" %}
For more examples see the [tutorials](https://docs.lenses.io/latest/connectors/tutorials).
{% endhint %}

{% code fullWidth="false" %}

```properties
name=GcpPubSubSourceDemo
connector.class=io.lenses.streamreactor.connect.gcp.pubsub.source.GCPPubSubSourceConnector
topics=kafka_topic_to_write_to
tasks.max=1
connect.pubsub.gcp.auth.mode=File
connect.pubsub.gcp.file=/path/to/gcp-service-account-key.json
connect.pubsub.gcp.project.id=gcp-project-id
connect.pubsub.kcql=insert into `kafka_topic_to_write_to` select * from `gcp-subscription-id`
```

{% endcode %}

## KCQL Support

{% hint style="success" %}
You can specify multiple KCQL statements separated by `;` to have the connector write to multiple topics. However, you cannot route the same source to different topics; for this, use a separate connector instance.
{% endhint %}

The connector uses a SQL-like syntax (KCQL) to configure its behaviour. The full KCQL syntax is:

```sql
INSERT INTO kafka-topic
SELECT *
FROM subscriptionId
[PROPERTIES(
  'property.1'=x,
  'property.2'=x
)]
```

You can escape names in the `INSERT INTO` and `SELECT * FROM` clauses with backticks when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:

```sql
INSERT INTO `my-topic-with-hyphen`
SELECT *
FROM subscriptionId
```

{% hint style="warning" %}
The connector does not support multiple KCQL statements that reference the same source location; to use multiple statements, configure each one in a separate connector instance.
{% endhint %}
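
As a minimal sketch of the multi-statement form, the following setting maps two subscriptions to two topics in one connector instance. The topic names (`orders`, `payments`) and subscription IDs (`orders-subscription`, `payments-subscription`) are hypothetical placeholders; each statement references a different subscription, in line with the restriction above.

```properties
# Hypothetical topics and subscriptions; statements are separated by ';'
connect.pubsub.kcql=insert into `orders` select * from `orders-subscription`;insert into `payments` select * from `payments-subscription`
```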

### Source Subscription ID and Target Topic

The source and target of the data are specified via the `INSERT INTO... SELECT * FROM` clause. The connector will write all the records to the given topic, from the given subscription:

```sql
INSERT INTO `my-topic` SELECT * FROM subscriptionId;
```

### Properties

The `PROPERTIES` clause is optional and lets you tune the connector per KCQL mapping. Multiple properties can be set in one clause, separated by commas.

{% hint style="success" %}
Properties can be defined in any order.
{% endhint %}

The following properties are supported:

| Name       | Description                                                                                                                                                                        | Type | Default Value |
| ---------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---- | ------------- |
| batch.size | The maximum number of messages the connector will retrieve and process at one time per polling request (per KCQL mapping).                                                         | int  | 1000          |
| cache.ttl  | The maximum amount of time (in milliseconds) to store message data to allow acknowledgement of a message.                                                                          | long | 3600000 (1 hour) |
| queue.max  | Data is loaded into a queue asynchronously so that it stands ready when the `poll` call is activated. Control the maximum number of records to hold in the queue per KCQL mapping. | int  | 10000         |
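
As an illustration of the `PROPERTIES` clause, the KCQL statement below overrides all three settings for a single mapping. The values are arbitrary examples rather than recommendations, and the topic and subscription names are the placeholders from the example configuration.

```properties
# Illustrative values only: smaller batches, a smaller queue and a 30-minute cache TTL (in milliseconds)
connect.pubsub.kcql=insert into `kafka_topic_to_write_to` select * from `gcp-subscription-id` PROPERTIES('batch.size'=500, 'queue.max'=5000, 'cache.ttl'=1800000)
```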

## Auth Mode

The connector offers three distinct authentication modes:

* **Default**: This mode relies on the default GCP authentication chain, simplifying the authentication process.
* **File**: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.
* **Credentials**: In this mode, explicit configuration of a GCP Credentials string is required for authentication.

The simplest mode to configure is "Default", as it requires no other configuration.

```properties
connect.pubsub.gcp.auth.mode=Default
```

When selecting the "Credentials" mode, you must provide the credentials explicitly. Alternatively, if you prefer not to configure these properties, use the "Default" mode, in which the connector follows the credentials retrieval order described [here](https://cloud.google.com/docs/authentication/application-default-credentials).

Here's an example configuration for the "Credentials" mode:

```properties
connect.pubsub.gcp.auth.mode=Credentials
connect.pubsub.gcp.credentials=$GCP_CREDENTIALS
connect.pubsub.gcp.project.id=$GCP_PROJECT_ID
```

And here is an example configuration using the "File" mode:

```properties
connect.pubsub.gcp.auth.mode=File
connect.pubsub.gcp.file=/home/secure-stuff/gcp-read-credential.txt
```

Remember that when using the "File" mode, the file must exist on every worker node in your Kafka Connect cluster and be readable by the Kafka Connect process.

For enhanced security and flexibility when using the "Credentials" mode, it is highly advisable to use Connect Secret Providers.

## Output Modes

Two modes are available: **Default Mode** and **Compatibility Mode**.

**Compatibility Mode** is intended to ensure compatibility with existing tools, while **Default Mode** offers a simpler modern redesign of the functionality.

You can choose whichever suits your requirements.

### Default Mode

#### **Configuration**

```properties
connect.pubsub.output.mode=DEFAULT
```

#### **Record Schema**

Each Pub/Sub message is transformed into a single Kafka record, structured as follows:

* **Kafka Key**: A String of the Pub/Sub MessageID.
* **Kafka Value**: The Pub/Sub message value as BYTES.
* **Kafka Headers**: Includes the "PublishTimestamp" (in seconds) and all Pub/Sub message attributes mapped as separate headers.

#### **Key Schema**

The Kafka Key is mapped from the Pub/Sub MessageID, a unique ID for a Pub/Sub message.

#### **Value Schema**

The Kafka Value is mapped from the body of the Pub/Sub message.

#### **Headers Schema**

The Kafka Headers include:

* **PublishTimestamp**: Long value representing the time when the Pub/Sub message was published, in seconds.
* **GCPProjectID**: The GCP Project ID.
* **PubSubTopicID**: The Pub/Sub Topic ID.
* **PubSubSubscriptionID**: The Pub/Sub Subscription ID.
* **All Pub/Sub message attributes**: Each attribute from the Pub/Sub message is mapped as a separate header.

### Compatibility Mode

#### **Configuration**

```properties
connect.pubsub.output.mode=COMPATIBILITY
```

#### **Record Schema**

Each Pub/Sub message is transformed into a single Kafka record, structured as follows:

* **Kafka Key**: Comprises the project ID, topic ID, subscription ID, and message ID of the Pub/Sub message.
* **Kafka Value**: Contains the message data and attributes from the Pub/Sub message.

#### **Key Schema**

The Key is a structure with these fields:

| Field Name     | Schema Type | Description                                                              |
| -------------- | ----------- | ------------------------------------------------------------------------ |
| ProjectId      | String      | The Pub/Sub project containing the topic from which messages are polled. |
| TopicId        | String      | The Pub/Sub topic containing the messages.                               |
| SubscriptionId | String      | The Pub/Sub subscription of the Pub/Sub topic.                           |
| MessageId      | String      | A unique ID for a Pub/Sub message.                                       |

#### **Value Schema**

The Value is a structure with these fields:

| Field Name   | Schema Type     | Description                                            |
| ------------ | --------------- | ------------------------------------------------------ |
| MessageData  | Optional String | The body of the Pub/Sub message.                       |
| AttributeMap | Optional String | The attribute map associated with the Pub/Sub message. |

## Option Reference

| Name                           | Description                                                                               | Type   | Available Values                    | Default Value |
| ------------------------------ | ----------------------------------------------------------------------------------------- | ------ | ----------------------------------- | ------------- |
| connect.pubsub.gcp.auth.mode   | Specifies the authentication mode for connecting to GCP.                                  | string | Credentials, File or Default        | Default       |
| connect.pubsub.gcp.credentials | GCP authentication credentials string. Used when the auth mode is `Credentials`.          | string |                                     | (Empty)       |
| connect.pubsub.gcp.file        | Local path of the GCP credentials file. Used when the auth mode is `File`.                | string |                                     | (Empty)       |
| connect.pubsub.gcp.project.id  | GCP Project ID.                                                                           | string |                                     | (Empty)       |
| connect.pubsub.kcql            | Kafka Connect Query Language (KCQL) configuration that controls the connector behaviour.  | string | [kcql configuration](#kcql-support) |               |
| connect.pubsub.output.mode     | Output mode. See [Output Modes](#output-modes).                                           | string | Default or Compatibility            | Default       |
