Azure Service Bus
This page describes the usage of the Stream Reactor Azure Service Bus Source Connector.
This Kafka connector is designed to effortlessly ingest records from Azure Service Bus into your Kafka cluster. It leverages Microsoft Azure API to seamlessly transfer data from Service Buses, allowing for their safe transition and safekeeping both payloads and metadata (see Payload support). It provides its user with AT-LEAST-ONCE guarantee as the data is committed (marked as read) in Service Bus once Connector verifies it was successfully committed to designated Kafka topic. Azure Service Bus Source Connector supports both types of Service Buses: Queues and Topics.
Connector Class
Full Config Example
For more examples see the tutorials.
The following example presents all the mandatory configuration properties for the Service Bus connector. Please note there are also optional parameters listed (link to option reference??). Feel free to tweak the configuration to your requirements.
KCQL support
You can specify multiple KCQL statements separated by ;
to have the connector map between multiple topics.
The following KCQL is supported:
It allows you to map Service Bus of name <your-service-bus>
to Kafka topic of name <your-kafka-topic>
using the PROPERTIES specified.
The selection of fields from the Service Bus message is not supported.
Payload support
Azure Service Bus Connector follows specific pattern (Schema) of messages. Please look below for the format of the data transferred to Kafka Topics specified in the KCQL config.
Key Format (Schema)
MessageId
String
The message identifier that uniquely identifies the message and its payload.
Payload Format (Schema)
Field Name
Schema Type
Description
deliveryCount
int64
The number of the times this message was delivered to clients.
enqueuedTimeUtc
int64
The time at which this message was enqueued in Azure Service Bus.
contentType
Optional String
The content type of this message.
label
Optional String
The application specific message label.
correlationId
Optional String
The correlation identifier.
messageProperties
Optional String
The map of user application properties of this message.
partitionKey
Optional String
The partition key for sending a message to a partitioned entity.
replyTo
Optional String
The address of an entity to send replies to.
replyToSessionId
Optional String
The session identifier augmenting the ReplyTo address.
deadLetterSource
Optional String
The name of the queue or subscription that this message was enqueued on, before it was deadlettered.
timeToLive
int64
The duration before this message expires.
lockedUntilUtc
Optional int64
The time when the lock of this message expires.
sequenceNumber
Optional int64
The unique number assigned to a message by Azure Service Bus.
sessionId
Optional String
The session identifier for a session-aware entity.
lockToken
Optional String
The lock token for the current message.
messageBody
Optional bytes
The body of this message as a byte array.
getTo
Optional String
The “to” address.
Authentication
You can connect to an Azure Service Bus by passing your connection string in configuration. The connection string can be found in the Shared access policies section of your Azure Portal.
Learn more about different methods of connecting to Service Bus on the Azure Website.
Queues & Topics
The Azure Service Bus Connector connects to Service Bus via Microsoft API. In order to smoothly configure your mappings you have to pay attention to PROPERTIES part of your KCQL mappings. There are two cases here: reading from Service Bus of type QUEUE and of type TOPIC. Please refer to the relevant sections below. In case of further questions check Azure Service Bus documentation to learn more about those mechanisms.
Reading from Queues
In order to be reading from the queue there's an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part. This parameter is servicebus.type
and it can take one of two values depending on the type of the service bus: QUEUE or TOPIC. Naturally for Queue we're interested in QUEUE
here and we need to pass it.
This is sufficient to enable you to create the mapping with your queue.
Reading from Topics
In order to be reading from the topic there are two additional parameters that you need to pass with your KCQL mapping in the PROPERTIES part:
Parameter
servicebus.type
which can take one of two values depending on the type of the service bus: QUEUE or TOPIC. For topic we're interested inTOPIC
here and we need to pass it.Parameter
subscription.name
which takes the (case-sensitive) value of a subscription name that you've created for this topic for the connector to use. Please use Azure Portal to create one.
This is sufficient to enable you to create the mapping with your topic.
Option Reference
KCQL Properties
Please find below all the necessary KCQL properties:
servicebus.type
Specifies Service Bus type: QUEUE
or TOPIC
string
subscription.name
Specifies subscription name if Service Bus type is TOPIC
string
Configuration parameters
Please find below all the relevant configuration parameters:
connect.servicebus.connection.string
Specifies the Connection String to connect to Service Bus
string
connect.servicebus.kcql
Comma-separated output KCQL queries
string
connect.servicebus.source.task.records.queue.size
Specifies the Queue size between Service Bus Receivers and Kafka
int
20
connect.servicebus.source.sleep.on.empty.poll.ms
The duration in milliseconds to sleep when no records are returned from the poll. This avoids a tight loop in Connect.
long
250
connect.servicebus.source.complete.retries.max
The maximum number of retries to complete a message.
int
3
connect.servicebus.source.complete.retries.min.backoff.ms
The minimum duration in milliseconds for the first backoff
long
1000
connect.servicebus.source.prefetch.count
The number of messages to prefetch from the Azure Service Bus.
int
2000
Last updated