# Azure Service Bus

This Kafka connector is designed to effortlessly ingest records from Azure Service Bus into your Kafka cluster. It leverages Microsoft Azure API to seamlessly transfer data from Service Buses, allowing for their safe transition and safekeeping both payloads and metadata (see **Payload support**). It provides its user with AT-LEAST-ONCE guarantee as the data is committed (marked as read) in Service Bus once Connector verifies it was successfully committed to designated Kafka topic. Azure Service Bus Source Connector supports both types of Service Buses: Queues and Topics.

## Connector Class

```
io.lenses.streamreactor.connect.azure.servicebus.source.AzureServiceBusSourceConnector
```

## Full Config Example

{% hint style="success" %}
For more examples see the [tutorials](https://docs.lenses.io/latest/connectors/tutorials).
{% endhint %}

The following example presents all the mandatory configuration properties for the Service Bus connector. Please note there are also optional parameters listed (link to option reference??). Feel free to tweak the configuration to your requirements.

{% code fullWidth="true" %}

```properties
connector.class=io.lenses.streamreactor.connect.azure.servicebus.source.AzureServiceBusSourceConnector
name=AzureServiceBusSourceConnector
tasks.max=1
connect.servicebus.connection.string="Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING";
connect.servicebus.kcql=INSERT INTO output-topic SELECT * FROM servicebus-queue PROPERTIES('servicebus.type'='QUEUE');
```

{% endcode %}

## KCQL support <a href="#kcql-support" id="kcql-support"></a>

{% hint style="success" %}
You can specify multiple KCQL statements separated by `;` to have the connector sink into multiple topics. However, you can not route the same source to different topics, for this use a separate connector instance.
{% endhint %}

The following KCQL is supported:

```sql
INSERT INTO <your-kafka-topic>
SELECT *
FROM <your-service-bus> 
PROPERTIES(...); 
```

It allows you to map Service Bus of name `<your-service-bus>` to Kafka topic of name `<your-kafka-topic>` using the PROPERTIES specified.

{% hint style="warning" %}
The selection of fields from the Service Bus message is **not** supported.
{% endhint %}

## Payload support

Azure Service Bus Connector follows specific pattern (Schema) of messages. Please look below for the format of the data transferred to Kafka Topics specified in the KCQL config.

### Key Format (Schema)

<table data-full-width="false"><thead><tr><th>Field Name</th><th>Schema Type</th><th>Description</th></tr></thead><tbody><tr><td>MessageId</td><td>String</td><td>The message identifier that uniquely identifies the message and its payload.</td></tr></tbody></table>

### Payload Format (Schema)

<table data-header-hidden data-full-width="true"><thead><tr><th width="265"></th><th></th><th></th></tr></thead><tbody><tr><td>Field Name</td><td>Schema Type</td><td>Description</td></tr><tr><td>deliveryCount</td><td>int64</td><td>The number of the times this message was delivered to clients.</td></tr><tr><td>enqueuedTimeUtc</td><td>int64</td><td>The time at which this message was enqueued in Azure Service Bus.</td></tr><tr><td>contentType</td><td>Optional String</td><td>The content type of this message.</td></tr><tr><td>label</td><td>Optional String</td><td>The application specific message label.</td></tr><tr><td>correlationId</td><td>Optional String</td><td>The correlation identifier.</td></tr><tr><td>messageProperties</td><td>Optional String</td><td>The map of user application properties of this message.</td></tr><tr><td>partitionKey</td><td>Optional String</td><td>The partition key for sending a message to a partitioned entity.</td></tr><tr><td>replyTo</td><td>Optional String</td><td>The address of an entity to send replies to.</td></tr><tr><td>replyToSessionId</td><td>Optional String</td><td>The session identifier augmenting the ReplyTo address.</td></tr><tr><td>deadLetterSource</td><td>Optional String</td><td>The name of the queue or subscription that this message was enqueued on, before it was deadlettered.</td></tr><tr><td>timeToLive</td><td>int64</td><td>The duration before this message expires.</td></tr><tr><td>lockedUntilUtc</td><td>Optional int64</td><td>The time when the lock of this message expires.</td></tr><tr><td>sequenceNumber</td><td>Optional int64</td><td>The unique number assigned to a message by Azure Service Bus.</td></tr><tr><td>sessionId</td><td>Optional String</td><td>The session identifier for a session-aware entity.</td></tr><tr><td>lockToken</td><td>Optional String</td><td>The lock token for the current message.</td></tr><tr><td>messageBody</td><td>Optional bytes</td><td>The body of this message as a byte array.</td></tr><tr><td>getTo</td><td>Optional String</td><td>The “to” address.</td></tr></tbody></table>

## Authentication

You can connect to an Azure Service Bus by passing your connection string in configuration. The connection string can be found in the **Shared access policies** section of your Azure Portal.

{% code fullWidth="true" %}

```properties
connect.servicebus.connection.string=Endpoint=sb://YOURNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=YOUR_KEYNAME;SharedAccessKey=YOUR_ACCESS_KEY=
```

{% endcode %}

Learn more about different methods of connecting to Service Bus on the [Azure Website](https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-java-how-to-use-queues?tabs=passwordless#authenticate-the-app-to-azure).

## Queues & Topics <a href="#keyed-json-format" id="keyed-json-format"></a>

The Azure Service Bus Connector connects to Service Bus via Microsoft API. In order to smoothly configure your mappings you have to pay attention to PROPERTIES part of your KCQL mappings. There are two cases here: reading from Service Bus of type **QUEUE** and of type **TOPIC.** Please refer to the relevant sections below. In case of further questions check [Azure Service Bus documentation](https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-queues-topics-subscriptions) to learn more about those mechanisms.

### Reading from Queues

In order to be reading from the queue there's an additional parameter that you need to pass with your KCQL mapping in the **PROPERTIES** part. This parameter is `servicebus.type` and it can take one of two values depending on the type of the service bus: QUEUE or TOPIC. Naturally for Queue we're interested in `QUEUE` here and we need to pass it.

{% code fullWidth="true" %}

```properties
connect.servicebus.kcql=INSERT INTO kafka-topic SELECT * FROM azure-queue PROPERTIES('servicebus.type'='QUEUE');
```

{% endcode %}

This is sufficient to enable you to create the mapping with your queue.

### Reading from Topics

In order to be reading from the topic there are two additional parameters that you need to pass with your KCQL mapping in the **PROPERTIES** part:

1. Parameter `servicebus.type` which can take one of two values depending on the type of the service bus: QUEUE or TOPIC. For topic we're interested in `TOPIC` here and we need to pass it.
2. Parameter `subscription.name` which takes the (case-sensitive) value of a subscription name that you've created for this topic for the connector to use. Please use Azure Portal to create one.

{% hint style="warning" %}
Make sure your subscription exists otherwise you will get a similar error to this

Caused by: com.azure.core.amqp.exception.AmqpException: The messaging entity 'streamreactor:Topic:my-topic|lenses' could not be found. To know more visit <https://aka.ms/sbResourceMgrExceptions>.

Create the subscription per topic in the Azure Portal.
{% endhint %}

{% code fullWidth="true" %}

```properties
connect.servicebus.kcql=INSERT INTO kafka-topic SELECT * FROM azure-topic PROPERTIES('servicebus.type'='TOPIC','subscription.name'='subscription1');
```

{% endcode %}

This is sufficient to enable you to create the mapping with your topic.

## Option Reference <a href="#storage-to-output-matrix" id="storage-to-output-matrix"></a>

### KCQL Properties

Please find below all the necessary KCQL properties:

<table data-full-width="false"><thead><tr><th width="192">Name</th><th width="319">Description</th><th width="90">Type</th><th>Default Value</th></tr></thead><tbody><tr><td>servicebus.type</td><td>Specifies Service Bus type: <code>QUEUE</code> or <code>TOPIC</code></td><td>string</td><td></td></tr><tr><td>subscription.name</td><td>Specifies subscription name if Service Bus type is <code>TOPIC</code></td><td>string</td><td></td></tr></tbody></table>

### Configuration parameters <a href="#storage-to-output-matrix" id="storage-to-output-matrix"></a>

Please find below all the relevant configuration parameters:

<table data-full-width="false"><thead><tr><th width="356">Name</th><th width="194">Description</th><th width="90">Type</th><th>Default Value</th></tr></thead><tbody><tr><td>connect.servicebus.connection.string</td><td>Specifies the Connection String to connect to Service Bus</td><td>string</td><td></td></tr><tr><td>connect.servicebus.kcql</td><td>Comma-separated output KCQL queries</td><td>string</td><td></td></tr><tr><td>connect.servicebus.source.task.records.queue.size</td><td>Specifies the Queue size between Service Bus Receivers and Kafka</td><td>int</td><td>5000</td></tr><tr><td>connect.servicebus.source.sleep.on.empty.poll.ms</td><td>The duration in milliseconds to sleep when no records are returned from the poll. This avoids a tight loop in Connect.</td><td>long</td><td>250</td></tr><tr><td>connect.servicebus.source.complete.retries.max</td><td>The maximum number of retries to complete a message.</td><td>int</td><td>3</td></tr><tr><td>connect.servicebus.source.complete.retries.min.backoff.ms</td><td>The minimum duration in milliseconds for the first backoff</td><td>long</td><td>1000</td></tr><tr><td>connect.servicebus.source.prefetch.count</td><td>The number of messages to prefetch from the Azure Service Bus.</td><td>int</td><td>2000</td></tr></tbody></table>
