# Azure Service Bus

The Stream Reactor Azure Service Bus Sink Connector translates Kafka records into messages in your Azure Service Bus namespace. It leverages the Microsoft Azure API to transfer data to Service Bus seamlessly, preserving both payloads and metadata in transit (see **Kafka payload support**). It supports both kinds of Service Bus entity: Queues and Topics. The Sink Connector provides an *AT-LEAST-ONCE* guarantee: offsets are committed (marked as read) in the Kafka topic (for the assigned topic and partition) only once the Connector verifies the data was successfully delivered to the designated Service Bus queue or topic.

## Connector Class

```
io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkConnector
```

## Full Config Example

{% hint style="success" %}
For more examples see the [tutorials](https://docs.lenses.io/latest/connectors/tutorials).
{% endhint %}

The following example presents all the mandatory configuration properties for the Service Bus connector. Note that there are also optional parameters, listed in [#storage-to-output-matrix](#storage-to-output-matrix "mention"). Feel free to tweak the configuration to your requirements.

{% code fullWidth="true" %}

```bash
connector.class=io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkConnector
name=AzureServiceBusSinkConnector
tasks.max=1
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
connect.servicebus.connection.string="Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING"
connect.servicebus.kcql=INSERT INTO output-servicebus SELECT * FROM input-topic PROPERTIES('servicebus.type'='QUEUE');
```

{% endcode %}

## KCQL support <a href="#kcql-support" id="kcql-support"></a>

{% hint style="info" %}
You can specify multiple KCQL statements separated by `;` to have the connector map between multiple topics.
{% endhint %}

The following KCQL is supported:

```sql
INSERT INTO <your-service-bus>
SELECT *
FROM <your-kafka-topic>
PROPERTIES(...); 
```

This maps the Kafka topic named `<your-kafka-topic>` to the Service Bus entity named `<your-service-bus>` using the PROPERTIES specified (please check [#keyed-json-format](#keyed-json-format "mention") for more info on the necessary properties).
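
For example, a single connector instance can map two Kafka topics to two Service Bus entities, one queue and one topic, by joining two KCQL statements with `;` (topic and entity names below are hypothetical):

{% code fullWidth="true" %}

```properties
connect.servicebus.kcql=INSERT INTO orders-queue SELECT * FROM orders PROPERTIES('servicebus.type'='QUEUE');INSERT INTO audit-topic SELECT * FROM audit PROPERTIES('servicebus.type'='TOPIC')
```

{% endcode %}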

Selecting individual fields from the Kafka message is **not** supported; only `SELECT *` is allowed.

## Authentication

You can connect to an Azure Service Bus by passing your connection string in configuration. The connection string can be found in the **Shared access policies** section of your Azure Portal.

{% code fullWidth="true" %}

```bash
connect.servicebus.connection.string=Endpoint=sb://YOURNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=YOUR_KEYNAME;SharedAccessKey=YOUR_ACCESS_KEY=
```

{% endcode %}

Learn more about different methods of connecting to Service Bus on the [Azure Website](https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-java-how-to-use-queues?tabs=passwordless#authenticate-the-app-to-azure).
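
The connection string itself is a set of semicolon-separated `key=value` pairs. The sketch below (an illustrative helper, not something the connector exposes) shows its shape; all values are made up:

```python
def parse_connection_string(conn: str) -> dict:
    """Split 'Endpoint=...;SharedAccessKeyName=...;SharedAccessKey=...' into a dict."""
    parts = {}
    for segment in conn.strip().rstrip(";").split(";"):
        # Split on the FIRST '=' only: the SharedAccessKey value may itself end in '='.
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts

conn = ("Endpoint=sb://mynamespace.servicebus.windows.net/;"
        "SharedAccessKeyName=RootManageSharedAccessKey;"
        "SharedAccessKey=abc123=")
parsed = parse_connection_string(conn)
print(parsed["Endpoint"])  # sb://mynamespace.servicebus.windows.net/
```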

## QUEUE and TOPIC Mappings <a href="#keyed-json-format" id="keyed-json-format"></a>

The Azure Service Bus Connector connects to Service Bus via the Microsoft API. To configure your mappings smoothly, pay attention to the PROPERTIES part of your KCQL statements. There are two cases here: writing to a Service Bus of type **QUEUE** and of type **TOPIC**. Please refer to the relevant sections below. In case of further questions, check the [Azure Service Bus documentation](https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-queues-topics-subscriptions) to learn more about those mechanisms.

### Writing to QUEUE ServiceBus

To write to a queue, you need to pass an additional parameter in the **PROPERTIES** part of your KCQL mapping. This parameter is `servicebus.type`, and it takes one of two values depending on the type of the Service Bus entity: `QUEUE` or `TOPIC`. For a queue, set it to `QUEUE`.

{% code fullWidth="true" %}

```properties
connect.servicebus.kcql=INSERT INTO azure-queue SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='QUEUE');
```

{% endcode %}

This is sufficient to create the mapping to your queue.

### Writing to TOPIC ServiceBus

To write to a topic, you need to pass an additional parameter in the **PROPERTIES** part of your KCQL mapping:

1. Parameter `servicebus.type`, which takes one of two values depending on the type of the Service Bus entity: `QUEUE` or `TOPIC`. For a topic, set it to `TOPIC`.

{% code fullWidth="true" %}

```properties
connect.servicebus.kcql=INSERT INTO azure-topic SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='TOPIC');
```

{% endcode %}

This is sufficient to create the mapping to your topic.

### Disabling batching

If the Connector has to transfer large messages (one megabyte or more), Service Bus may reject a batch of such payloads, failing the Connector task. To remediate this, set the `batch.enabled` parameter to `false`. This sacrifices the ability to send messages in batches (possibly slowing the transfer down) but should let you transfer them safely.

For most use cases, we recommend omitting it (it defaults to `true`).
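
For example, to disable batching for a queue mapping (entity and topic names below are hypothetical):

{% code fullWidth="true" %}

```properties
connect.servicebus.kcql=INSERT INTO azure-queue SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='QUEUE', 'batch.enabled'='false')
```

{% endcode %}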

## Kafka payload support <a href="#kafka-payload-support" id="kafka-payload-support"></a>

This sink supports the following Kafka payloads:

* String Schema Key and Binary payload (the Kafka key is then used as the `MessageId` of the Service Bus message)
* any other key (or no key) and Binary payload (the resulting Service Bus messages have no `MessageId` set)
* No Schema and JSON
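
The key-to-`MessageId` rule above can be illustrated with a small sketch (a hypothetical helper, not the connector's actual code):

```python
from typing import Optional

def derive_message_id(kafka_key: object) -> Optional[str]:
    """A string key becomes the Service Bus MessageId; any other key (or no key) leaves it unset."""
    return kafka_key if isinstance(kafka_key, str) else None

print(derive_message_id("order-42"))   # order-42
print(derive_message_id(b"\x00\x01"))  # None (non-string key: no MessageId)
print(derive_message_id(None))         # None (keyless record: no MessageId)
```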

### Null Payload Transfer

{% hint style="warning" %}
Azure Service Bus does not allow sending messages with null content (payload).
{% endhint %}

A null payload (sometimes referred to as a **Kafka tombstone**) is a well-known concept in the Kafka world. However, because of the Service Bus limitation above, the Connector cannot send messages with a null payload and has to drop them instead.

Please keep this in mind when using Service Bus and designing business logic around null payloads!
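
The drop behaviour can be sketched as a simple filter (illustrative only, not the connector's actual implementation):

```python
def drop_tombstones(records):
    """Filter out tombstone records (null payload) that Service Bus would reject."""
    return [r for r in records if r["value"] is not None]

records = [
    {"key": "order-1", "value": b"created"},
    {"key": "order-1", "value": None},  # tombstone: dropped, never sent
]

print(len(drop_tombstones(records)))  # 1
```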

## Option Reference <a href="#storage-to-output-matrix" id="storage-to-output-matrix"></a>

### KCQL Properties

Please find below all the necessary KCQL properties:

<table data-full-width="false"><thead><tr><th width="233">Name</th><th width="217">Description</th><th width="104">Type</th><th>Default Value</th></tr></thead><tbody><tr><td>servicebus.type</td><td>Specifies Service Bus type: <code>QUEUE</code> or <code>TOPIC</code></td><td>string</td><td></td></tr><tr><td>batch.enabled</td><td>Specifies if the Connector can send messages in batches, see <a data-mention href="#disabling-batching">#disabling-batching</a></td><td>boolean</td><td>true</td></tr></tbody></table>

### Configuration parameters <a href="#storage-to-output-matrix" id="storage-to-output-matrix"></a>

Please find below all the relevant configuration parameters:

<table data-full-width="false"><thead><tr><th width="340">Name</th><th width="189">Description</th><th width="85">Type</th><th>Default Value</th></tr></thead><tbody><tr><td>connect.servicebus.connection.string</td><td>Specifies the Connection String to connect to Service Bus</td><td>string</td><td></td></tr><tr><td>connect.servicebus.kcql</td><td>Semicolon-separated output KCQL queries</td><td>string</td><td></td></tr><tr><td>connect.servicebus.sink.retries.max</td><td>Number of retries if message has failed to be delivered to Service Bus</td><td>int</td><td>3</td></tr><tr><td>connect.servicebus.sink.retries.timeout</td><td>Timeout (in milliseconds) between retries if message has failed to be delivered to Service Bus</td><td>int</td><td>500</td></tr></tbody></table>
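
For example, to retry failed deliveries five times with a one-second pause between attempts (the values below are hypothetical, chosen for illustration):

{% code fullWidth="true" %}

```properties
connect.servicebus.sink.retries.max=5
connect.servicebus.sink.retries.timeout=1000
```

{% endcode %}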

