Azure Service Bus

This page describes the usage of the Stream Reactor Azure Service Bus Sink Connector.


The Stream Reactor Azure Service Bus Sink Connector translates Kafka records into your Azure Service Bus namespace. It leverages the Microsoft Azure API to transfer data to Service Bus, preserving both payloads and metadata (see Kafka payload support). It supports both types of Service Bus entities: Queues and Topics. The Azure Service Bus Sink Connector provides an AT-LEAST-ONCE guarantee: offsets are committed (marked as read) in the Kafka topic (for the assigned topic and partition) only once the connector verifies the data was successfully delivered to the designated Service Bus queue or topic.

Connector Class

io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkConnector

Full Config Example

For more examples see the tutorials.

The following example presents all the mandatory configuration properties for the Service Bus connector. Note that there are also optional parameters, listed in the Option Reference. Feel free to tweak the configuration to your requirements.

connector.class=io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkConnector
name=AzureServiceBusSinkConnector
tasks.max=1
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
connect.servicebus.connection.string=Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING
connect.servicebus.kcql=INSERT INTO output-servicebus SELECT * FROM input-topic PROPERTIES('servicebus.type'='QUEUE');
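If you manage connectors through the Kafka Connect REST API rather than a properties file, the same configuration can be expressed as JSON. A minimal sketch in Python (the worker URL is an assumption about your environment; the actual POST is left as a comment):

```python
import json

# Hypothetical Connect worker URL; adjust to your environment.
CONNECT_URL = "http://localhost:8083/connectors"

# The same settings as the properties example above, expressed as the
# JSON body expected by the Kafka Connect REST API.
config = {
    "name": "AzureServiceBusSinkConnector",
    "config": {
        "connector.class": "io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkConnector",
        "tasks.max": "1",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "connect.servicebus.connection.string": "Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING",
        "connect.servicebus.kcql": "INSERT INTO output-servicebus SELECT * FROM input-topic PROPERTIES('servicebus.type'='QUEUE');",
    },
}

body = json.dumps(config)
print(body[:60])
# To deploy, POST `body` with Content-Type: application/json to
# CONNECT_URL (e.g. with curl or urllib.request).
```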

KCQL support

You can specify multiple KCQL statements separated by ; to have the connector map between multiple topics.

The following KCQL is supported:

INSERT INTO <your-service-bus>
SELECT *
FROM <your-kafka-topic>
PROPERTIES(...); 

It allows you to map a Kafka topic named <your-kafka-topic> to a Service Bus entity named <your-service-bus> using the specified PROPERTIES (see QUEUE and TOPIC Mappings for the required properties).

The selection of fields from the Service Bus message is not supported.
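For example, two mappings separated by ; can be combined in a single setting (the topic and bus names here are hypothetical):

```properties
connect.servicebus.kcql=INSERT INTO bus-orders SELECT * FROM kafka-orders PROPERTIES('servicebus.type'='QUEUE');INSERT INTO bus-audit SELECT * FROM kafka-audit PROPERTIES('servicebus.type'='TOPIC')
```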

Authentication

You can connect to an Azure Service Bus by passing your connection string in the configuration. The connection string can be found in the Shared access policies section of the Azure Portal.

connect.servicebus.connection.string=Endpoint=sb://YOURNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=YOUR_KEYNAME;SharedAccessKey=YOUR_ACCESS_KEY=
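Rather than keeping the shared access key inline, the connection string can be externalized, for example with Kafka Connect's built-in FileConfigProvider (a sketch; the secrets file path and property name are assumptions):

```properties
# In the Connect worker configuration:
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# In the connector configuration, assuming /etc/secrets/servicebus.properties
# contains a line `connection.string=Endpoint=sb://...`:
connect.servicebus.connection.string=${file:/etc/secrets/servicebus.properties:connection.string}
```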

QUEUE and TOPIC Mappings

Writing to QUEUE ServiceBus

To write to a queue, you need to pass an additional parameter with your KCQL mapping in the PROPERTIES part. This parameter is servicebus.type, and it takes one of two values depending on the type of Service Bus entity: QUEUE or TOPIC. For a queue, set it to QUEUE.

connect.servicebus.kcql=INSERT INTO azure-queue SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='QUEUE');

This is sufficient to enable you to create the mapping with your queue.

Writing to TOPIC ServiceBus

To write to a topic, you need to pass an additional parameter with your KCQL mapping in the PROPERTIES part: servicebus.type, which takes one of two values depending on the type of Service Bus entity: QUEUE or TOPIC. For a topic, set it to TOPIC.

connect.servicebus.kcql=INSERT INTO azure-topic SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='TOPIC');

This is sufficient to enable you to create the mapping with your topic.

Disabling batching

If the connector has to transfer large messages (one megabyte and larger), Service Bus may reject a batch of such payloads, failing the connector task. To remediate this, set the batch.enabled parameter to false. This sacrifices the ability to send messages in batches (and may be slower) but should let you transfer them safely.

For most use cases, we recommend omitting it (it is set to true by default).
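Because batch.enabled is a KCQL property, it is set inside the PROPERTIES clause alongside servicebus.type (queue and topic names hypothetical):

```properties
connect.servicebus.kcql=INSERT INTO azure-queue SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='QUEUE', 'batch.enabled'='false');
```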

Kafka payload support

This sink supports the following Kafka payloads:

  • String Schema key and Binary payload (the Kafka key is then used as the MessageId in Service Bus)

  • any other key type (or no key) and Binary payload (the resulting Service Bus messages have no MessageId set)

  • No Schema and JSON
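The MessageId rule above can be sketched as a small illustrative function (this is not the connector's actual code, just the behaviour described):

```python
from typing import Optional


def message_id_for(key: Optional[object], key_is_string_schema: bool) -> Optional[str]:
    """Illustrative sketch: only a String-Schema key is carried over as the
    Service Bus MessageId; any other key (or no key) leaves MessageId unset."""
    if key is not None and key_is_string_schema:
        return str(key)
    return None


print(message_id_for("order-42", True))   # String key -> becomes the MessageId
print(message_id_for(b"\x00\x01", False)) # binary key -> no MessageId
print(message_id_for(None, False))        # keyless -> no MessageId
```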

Null Payload Transfer

Azure Service Bus does not allow sending messages with null content (payload).

A null payload (sometimes referred to as a Kafka tombstone) is a well-known concept in the Kafka world. However, because of this Service Bus limitation, the connector cannot send messages with a null payload and has to drop them instead.

Please keep this in mind when using Service Bus and designing business logic around null payloads!
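The drop behaviour can be sketched as a simple filter (illustrative only, not the connector's actual code):

```python
def records_to_forward(records):
    """Illustrative sketch: Service Bus rejects null-content messages,
    so tombstones (records with a None value) are dropped before sending."""
    return [r for r in records if r.get("value") is not None]


batch = [
    {"key": "k1", "value": b"payload"},
    {"key": "k2", "value": None},      # tombstone: will be dropped
    {"key": "k3", "value": b"more"},
]
print(len(records_to_forward(batch)))  # 2
```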

Option Reference

KCQL Properties

Please find below all the necessary KCQL properties:

| Name | Description | Type | Default Value |
| --- | --- | --- | --- |
| servicebus.type | Specifies the Service Bus type: QUEUE or TOPIC | string | |
| batch.enabled | Specifies whether the connector can send messages in batches (see Disabling batching) | boolean | true |

Configuration parameters

Please find below all the relevant configuration parameters:

| Name | Description | Type | Default Value |
| --- | --- | --- | --- |
| connect.servicebus.connection.string | Specifies the Connection String to connect to Service Bus | string | |
| connect.servicebus.kcql | Comma-separated output KCQL queries | string | |
| connect.servicebus.sink.retries.max | Number of retries if a message fails to be delivered to Service Bus | int | 3 |
| connect.servicebus.sink.retries.timeout | Timeout (in milliseconds) between retries if a message fails to be delivered to Service Bus | int | 500 |
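With the defaults above, a quick back-of-the-envelope bound on the extra delay retries can add (assuming a fixed interval between attempts, which is an assumption about the retry policy):

```python
# Defaults from the configuration table above.
retries_max = 3          # connect.servicebus.sink.retries.max
retry_timeout_ms = 500   # connect.servicebus.sink.retries.timeout

# Worst case, exhausting all retries adds roughly this much waiting time,
# on top of the time spent on the delivery attempts themselves.
worst_case_extra_delay_ms = retries_max * retry_timeout_ms
print(worst_case_extra_delay_ms)  # 1500
```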

Learn more about different methods of connecting to Service Bus on the Azure website.

The Azure Service Bus Connector connects to Service Bus via the Microsoft Azure API. To configure your mappings smoothly, pay attention to the PROPERTIES part of your KCQL mappings. There are two cases: writing to a Service Bus of type QUEUE and writing to one of type TOPIC; please refer to the relevant sections above. For further questions, consult the Azure Service Bus documentation to learn more about these mechanisms.