ServiceBus


This Kafka connector ingests records from Azure Service Bus into your Kafka cluster. It uses the Microsoft Azure API to transfer both message payloads and metadata (see Payload support). It provides an at-least-once guarantee: a message is committed (marked as read) in Service Bus only after the connector verifies that it was successfully committed to the designated Kafka topic. The Azure Service Bus Source Connector supports both Service Bus entity types: Queues and Topics.
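The at-least-once behaviour described above can be illustrated with a toy model. This is only a sketch of the general pattern (settle the source message after the sink confirms the write), not the connector's actual code; `FakeServiceBus`, `FlakyKafka`, and `run` are hypothetical names invented for the illustration.

```python
# Toy model of at-least-once delivery. All classes are hypothetical stand-ins;
# none of this is the connector's real implementation.

class FakeServiceBus:
    def __init__(self, messages):
        self.pending = list(messages)  # received but not yet marked as read
        self.completed = []            # marked as read

    def peek_lock(self):
        # Return the next unsettled message without removing it.
        return self.pending[0] if self.pending else None

    def complete(self, message):
        self.pending.remove(message)
        self.completed.append(message)


class FlakyKafka:
    """Writes every message, but loses the ack on the first attempt."""

    def __init__(self):
        self.topic = []
        self.seen = set()

    def send(self, message):
        self.topic.append(message)  # the write itself lands
        if message not in self.seen:
            self.seen.add(message)
            raise TimeoutError("ack lost")  # the caller cannot know it landed


def run(bus, kafka):
    while (message := bus.peek_lock()) is not None:
        try:
            kafka.send(message)
        except TimeoutError:
            continue  # not settled, so the same message is delivered again
        bus.complete(message)  # settle only after a confirmed write


bus = FakeServiceBus(["m1", "m2"])
kafka = FlakyKafka()
run(bus, kafka)
print(kafka.topic)    # duplicates are possible
print(bus.completed)  # every message is eventually settled
```

The practical consequence is that downstream consumers should tolerate duplicate records, for example by deduplicating on the record key.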

KCQL support 

The following KCQL is supported:

INSERT INTO <your-kafka-topic>
SELECT *
FROM <your-service-bus> 
PROPERTIES(...); 

The selection of fields from the Service Bus message is not supported.


Connector Class 

io.lenses.streamreactor.connect.azure.servicebus.source.AzureServiceBusSourceConnector

Configuration 

Full Config Example 

The following example shows all the mandatory configuration properties for the Service Bus connector. Optional parameters are listed in the Option Reference section. Adjust the configuration to your requirements.

connector.class=io.lenses.streamreactor.connect.azure.servicebus.source.AzureServiceBusSourceConnector
name=AzureServiceBusSourceConnector
tasks.max=1
connect.servicebus.connection.string=Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING
connect.servicebus.kcql=INSERT INTO output-topic SELECT * FROM servicebus-queue PROPERTIES('servicebus.type'='QUEUE');
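If the connector is deployed through the Kafka Connect REST API rather than a properties file, the same settings are sent as a JSON document with `name` and `config` keys (the body of `POST /connectors`). The sketch below converts properties text into that shape. The helper name `properties_to_connect_payload` is hypothetical, and the connection string uses placeholder values.

```python
import json

# Placeholder values only; substitute your own namespace, key name and key.
PROPERTIES = """\
connector.class=io.lenses.streamreactor.connect.azure.servicebus.source.AzureServiceBusSourceConnector
name=AzureServiceBusSourceConnector
tasks.max=1
connect.servicebus.connection.string=Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING
connect.servicebus.kcql=INSERT INTO output-topic SELECT * FROM servicebus-queue PROPERTIES('servicebus.type'='QUEUE');
"""


def properties_to_connect_payload(text):
    """Turn key=value lines into the JSON body expected by POST /connectors."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        # Split on the first '=' only: the values themselves contain '='.
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return {"name": config["name"], "config": config}


payload = properties_to_connect_payload(PROPERTIES)
print(json.dumps(payload, indent=2))
```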

Payload support 

The Azure Service Bus Connector writes messages using a fixed schema. The format of the data transferred to the Kafka topics specified in the KCQL config is described below.

Key Format (Schema) 

| Field Name | Schema Type | Description |
|------------|-------------|-------------|
| MessageId | String | The message identifier that uniquely identifies the message and its payload. |

Payload Format (Schema) 

| Field Name | Schema Type | Description |
|------------|-------------|-------------|
| deliveryCount | int64 | The number of times this message was delivered to clients. |
| enqueuedTimeUtc | int64 | The time at which this message was enqueued in Azure Service Bus. |
| contentType | String | The content type of this message. |
| label | String | The application-specific message label. |
| correlationId | Optional String | The correlation identifier. |
| messageProperties | Optional String | The map of user application properties of this message. |
| partitionKey | Optional String | The partition key for sending a message to a partitioned entity. |
| replyTo | Optional String | The address of an entity to send replies to. |
| replyToSessionId | Optional String | The session identifier augmenting the ReplyTo address. |
| deadLetterSource | Optional String | The name of the queue or subscription that this message was enqueued on before it was dead-lettered. |
| timeToLive | int64 | The duration before this message expires. |
| lockedUntilUtc | Optional int64 | The time when the lock of this message expires. |
| sequenceNumber | Optional int64 | The unique number assigned to a message by Azure Service Bus. |
| sessionId | Optional String | The session identifier for a session-aware entity. |
| lockToken | Optional String | The lock token for the current message. |
| messageBody | bytes | The body of this message as a byte array. |
| getTo | Optional String | The "to" address. |
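A consumer can sanity-check decoded record values against the payload schema above. The sketch below assumes the value has already been deserialized into a Python dict, and that `int64`, `String` and `bytes` map to Python `int`, `str` and `bytes`; both are assumptions about the consumer side, not something the connector guarantees. The `validate` helper and the sample record are hypothetical.

```python
# Field names and optionality follow the payload schema table above.
REQUIRED = {
    "deliveryCount": int, "enqueuedTimeUtc": int, "contentType": str,
    "label": str, "timeToLive": int, "messageBody": bytes,
}
OPTIONAL = {
    "correlationId": str, "messageProperties": str, "partitionKey": str,
    "replyTo": str, "replyToSessionId": str, "deadLetterSource": str,
    "lockedUntilUtc": int, "sequenceNumber": int, "sessionId": str,
    "lockToken": str, "getTo": str,
}


def validate(record):
    """Return a list of schema problems; an empty list means the record fits."""
    problems = []
    for name, expected in REQUIRED.items():
        if not isinstance(record.get(name), expected):
            problems.append(f"{name}: expected {expected.__name__}")
    for name, expected in OPTIONAL.items():
        value = record.get(name)
        if value is not None and not isinstance(value, expected):
            problems.append(f"{name}: expected {expected.__name__} or None")
    return problems


# Made-up sample values, for illustration only.
sample = {
    "deliveryCount": 1,
    "enqueuedTimeUtc": 1725235200000,
    "contentType": "application/json",
    "label": "orders",
    "timeToLive": 60000,
    "messageBody": b'{"orderId": 42}',
    "sequenceNumber": 7,
}

print(validate(sample))
print(validate({**sample, "messageBody": "not bytes", "sessionId": 5}))
```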

Authentication 

You can connect to an Azure Service Bus by passing your connection string in configuration. The connection string can be found in the Shared access policies section of your Azure Portal.

Example:

connect.servicebus.connection.string=Endpoint=sb://YOURNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=YOUR_KEYNAME;SharedAccessKey=YOUR_ACCESS_KEY=
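Before deploying, it can help to check that a connection string has the three parts shown in the example. The following is a minimal sketch; `parse_connection_string` is a hypothetical helper, not part of the connector or the Azure SDK. It splits on the first `=` of each segment because access keys are typically base64 and may themselves end in `=`.

```python
REQUIRED_KEYS = ("Endpoint", "SharedAccessKeyName", "SharedAccessKey")


def parse_connection_string(conn):
    """Split 'Key=Value;Key=Value' into a dict and check the expected keys."""
    parts = {}
    for segment in conn.strip().split(";"):
        if not segment:
            continue  # tolerate a trailing ';'
        key, sep, value = segment.partition("=")  # first '=' only
        if not sep:
            raise ValueError(f"malformed segment: {segment!r}")
        parts[key] = value
    missing = [k for k in REQUIRED_KEYS if k not in parts]
    if missing:
        raise ValueError(f"missing keys: {missing}")
    return parts


parts = parse_connection_string(
    "Endpoint=sb://YOURNAMESPACE.servicebus.windows.net/;"
    "SharedAccessKeyName=YOUR_KEYNAME;"
    "SharedAccessKey=YOUR_ACCESS_KEY="
)
print(parts["Endpoint"])
print(parts["SharedAccessKey"])
```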

Learn more about different methods of connecting to Service Bus on the Azure Website.

Option Reference 

KCQL Properties 

Please find below all the necessary KCQL properties:

| Name | Description | Type | Default Value |
|------|-------------|------|---------------|
| servicebus.type | Specifies Service Bus type: QUEUE or TOPIC | string | |
| subscription.name | Specifies subscription name if Service Bus type is TOPIC | string | |
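The two KCQL properties combine as follows for queues and topics. The sketch below generates the statement in the form shown in the KCQL support section. `build_kcql` is a hypothetical helper, and it assumes `subscription.name` must be present whenever the type is TOPIC; the entity and subscription names are placeholders.

```python
def build_kcql(kafka_topic, service_bus, bus_type, subscription_name=None):
    """Render one KCQL statement for a Service Bus queue or topic."""
    bus_type = bus_type.upper()
    if bus_type not in ("QUEUE", "TOPIC"):
        raise ValueError("servicebus.type must be QUEUE or TOPIC")
    props = {"servicebus.type": bus_type}
    if bus_type == "TOPIC":
        # Assumption: a topic source always needs a subscription to read from.
        if not subscription_name:
            raise ValueError("subscription.name is needed for TOPIC")
        props["subscription.name"] = subscription_name
    rendered = ",".join(f"'{key}'='{value}'" for key, value in props.items())
    return (
        f"INSERT INTO {kafka_topic} SELECT * FROM {service_bus} "
        f"PROPERTIES({rendered});"
    )


queue_kcql = build_kcql("output-topic", "servicebus-queue", "QUEUE")
topic_kcql = build_kcql("output-topic", "servicebus-topic", "TOPIC", "my-subscription")
print(queue_kcql)
print(topic_kcql)
```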

Configuration parameters 

Please find below all the relevant configuration parameters:

| Name | Description | Type | Default Value |
|------|-------------|------|---------------|
| connect.servicebus.connection.string | Specifies the Connection String to connect to Service Bus | string | |
| connect.servicebus.kcql | Comma-separated output KCQL queries | string | |
| connect.servicebus.source.task.records.queue.size | Specifies the Queue size between Service Bus Receivers and Kafka | int | 20 |
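To give an intuition for `connect.servicebus.source.task.records.queue.size`, the sketch below models a bounded buffer sitting between a receiver and a polling task. It illustrates the general idea of a fixed-size hand-off queue only; how the connector actually behaves when the queue is full is not described here, and `receiver_offer` and `task_poll` are hypothetical names.

```python
import queue

QUEUE_SIZE = 20  # mirrors the documented default value
buffer = queue.Queue(maxsize=QUEUE_SIZE)


def receiver_offer(message):
    """Receiver side: try to hand a message over; False means the buffer is full."""
    try:
        buffer.put_nowait(message)
        return True
    except queue.Full:
        return False


def task_poll(max_records):
    """Task side: drain up to max_records messages for writing to Kafka."""
    batch = []
    while len(batch) < max_records:
        try:
            batch.append(buffer.get_nowait())
        except queue.Empty:
            break
    return batch


accepted = [receiver_offer(f"msg-{i}") for i in range(25)]
print(accepted.count(True))       # only QUEUE_SIZE messages fit
batch = task_poll(5)
print(len(batch))                 # polling frees capacity
print(receiver_offer("msg-25"))   # so the receiver can enqueue again
```

In this model a larger queue lets the receiver run further ahead of the task at the cost of more messages held in memory.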
--
Last modified: September 2, 2024