Web Socket API

Lenses provides a rich set of WebSocket APIs as well to interact with Apache Kafka data in real-time. It can be used to build powerful Front-End Web applications with full Lenses SQL support.

To speed up integration we provide a Javascript lib for Kafka developers. if you are using React/Redux you can get going fast and focus on your business requirements.

npm install rxjs redux-lenses-streaming --save

Introduction 

The API is straightforward. Once a WebSocket connection has been opened, the client needs to make sure it follows the protocol. Here is the template for each message the client can send to the back end:

    {
        "type":" SUBSCRIBE/UNSUBSCRIBE/PUBLISH/COMMIT/LOGIN",
        "content":"The json text for the specific request",
        "correlationId":1000,
        "authToken" : "Authorization token or empty"
    }

Below you can find the list of supported functionality:

  • A client can and should AUTHENTICATE first to obtain the token allowing all other requests to be accepted.
  • A client can SUBSCRIBE to a topic via SQL. Please check the Lenses Kafka SQL section for the full details on how to set filters and use functions.
  • A client can UNSUBSCRIBE from a topic.
  • A client can PUBLISH messages to a topic. The current version supports only string/json. In the future, we will add support for Avro.
  • A client can COMMIT the (topic, partition) offsets.
  • A client subscription must specify the decoder type. This allows reading correctly the content of the Kafka Message Key/Value parts.

The following decoders are supported

  • STRING - the byte[] payload received from Kafka is read as a String
  • INT - the byte[] payload received from Kafka is read as an Int
  • LONG - the byte[] payload received from Kafka is read as a Long
  • AVRO - the byte[] payload received from Kafka is read as an Avro
  • JSON - the byte[] payload received from Kafka is read as an Avro
  • BINARY - the byte[] payload received from Kafka is kept as it is in Binary format
FieldTypeDescription
typeStringDescribes the action the back end will take in response to the request. The available values are:LOGIN, SUBSCRIBE, UNSUBSCRIBE, PUBLISH, COMMIT
contentStringContains the Json content of the actual request. The content is strictly related to the type described shortly.
correlationIdLongA unique identifier in order for the client to link the response with the request made.
authTokenStringA unique token identifying the user making the request. This token can only be obtained once the LOGIN request has completed successfully.

The response received from the back end follows this template:

    {
        "correlationId": Long,
        "type"  : "ERROR/INVALIDREQUEST/KAFKAMSG/HEARTBEAT/SUCCESS",
        "content": String
    }
FieldTypeDescription
correlationIdLongThe unique identifier the client has provided in the request associated with the response.
typeStringDescribes what response content the client has received. Available values are: ERROR, INVALIDREQUEST, KAFKAMSG, HEARTBEAT,SUCCESS
contentStringContains the actual response content. Each response type has its own content layout.

Protocol Definition 

All requests made are constrained by user permissions on the back end. If the user has only Read access then publishing a record to a topic will not be allowed. If the user has only No-Data user role, then retrieving messages from Kafka will not be allowed either. See the security section for role definitions.

Login 

The first thing to do when a WebSocket connection has been opened is to obtain an authorization token. To do so the client will have to send the following LOGIN request format:

    {
       "type" : "LOGIN",
       "content" : "{
                     "user" : String,
                     "password" : String,
                    }",
       "correlationId": Long,
       "authToken": String
    }
FieldTypeDescription
contentStringContains a json with two fields user and password to obtain the token for.
correlationIdLongA unique number the back end will send back as part of the response.
authTokenStringFor this request type the authorization token is not validated.

Note: The content field value is a string containing a JSON!

A successful login response will look like this:

    {
      "correlationId" : Long,
      "type" : "SUCCESS",
      "content" : String
    }
FieldTypeDescription
contentStringContains the authorization token
correlationIdLongA unique number sent in the request

If the user or password provided is not correct, the client will receive an error response. In this case, the response format looks like this:

    {
      "correlationId": Long,
      "type" : "ERROR",
      "content": String
    }
FieldTypeDescription
contentStringContains the description error
correlationIdLongA unique number sent in the request

Publishing 

In order to publish a message to a topic the client has to send the following request:

    {
       "type" : "PUBLISH",
       "content" : "{
                     "topic" : String,
                     "key" : String,
                     "value" : String
                    }",
       "correlationId": Long,
       "authToken": String
    }
FieldTypeDescription
contentStringContains a json with three fields: topic, key, and value. The last two fields are optional. Do not set the field if you want to send null values.
correlationIdLongA unique number the back end will send back as part of the response.
authTokenStringThe authorization token. The back end will check if the user roles allows such action.

Note: Remember, the content for key/value are sent to the target Kafka topic are sent as String! The content field value is a string containing a json!

Subscription 

To receive messages from a Kafka topic the client has to send a SUBSCRIBE request.

    {
       "type" : "SUBSCRIBE",
       "content" : "{
           "sqls" : [
               String,
               String
            ]
       }",
       "correlationId" : Long,
       "authToken" : String
    }
FieldTypeDescription
contentStringContains a json with one field: SQLs. The field is and array of LSQL values.
sqlsString[]An array of LSQL values. The format is a SQL like syntax allowing you to use functions, filter and allows for field selection. See template below.
correlationIdLongA unique number the back end will send back as part of the response.
authTokenStringThe authorization token. The back end will check if the user roles allows such action.
SELECT *
FROM $TOPIC
WHERE  _ktype='INT/LONG/JSON/STRING/AVRO'
       AND _vtype='INT/LONG/JSON/STRING/AVRO'
       [AND ...]

You can provide more than one LSQL statement if you want to subscribe to more than 1 topic. Please visit the Lenses Kafka SQL section for full details on what it supports. The response from the back end can be a SUCCESS or an ERROR.

Once the subscription has been successful, messages arriving in the Kafka topic(-s) and matching the filter will be delivered. A message received by the client will have this structure:

    {
      "content": [
        {
           "key" : "...",
           "value" : "{...}",
           "topic" : "topicA",
           "partition" : Int,
           "offset" : Long,
           "timestamp" : Long
        },
        ..
      ],
      "correlationId": Long,
      "type" : "KAFKAMSG"
    }
FieldTypeDescription
contentStringContains a Json with six fields: key, value, topic, partition, offset and timestamp.
content.keyStringContains the Kafka message key value. If the key is null, the field will not be present.
content.valueStringContains the Kafka message value part. If the value is null, the field will not be present.
content.topicStringContains Kafka message topic name.
content.partitionIntContains Kafka message partition number.
content.offsetLongContains Kafka message offset.
content.timestampLongContains the Kafka message timestamp.
correlationIdLongA unique number the back end will send back as part of the response.
authTokenStringThe authorization token. The back end will check if the user roles allows such action.

Note: The timestamp field requires Kafka 0.10.2+ and correct broker settings/or client publishing the timestamp.

Un-subscribe 

A client can choose at any point to stop receiving messages from a given topic(-s). In order to do so it has to send the following message:

    {
       "type" :"UNSUBSCRIBE",
       "content": "{
          "topics": [
             "topic":String,
             ..
          ]
       }",
       "correlationId": Long,
       "authToken" : String,
    }
FieldTypeDescription
contentStringContains a Json with one field: topics. The field should contain an array of strings representing the topics to unsubscribe from.
correlationIdLongA unique number the back end will send back as part of the response.
authTokenStringThe authorization token. The back end will check if the user roles allows such action.

Although the subscription allows you to specify via LSQL the partitions to subscribe to, the unsubscribe does not support selective partition dropping from the subscription.

Note: Executing a subscribe call with a new LSQL for a topic already in the subscription, will unsubscribe first and subscribe again.

Offsets Commit 

The JavaScript client can decide when to commit the offset in Kafka. This way, when the client reopens a connection and resubscribes to the same Kafka topic it will receive the Kafka messages from where it left it.

To commit offsets the client has to send the following message structure:

    {
       "type" :"COMMIT",
       "content": "{
          "commits": [
             {
                "topic": String,
                "partition": Int,
                "offset" : Long
             },
             ...
          ]
       }",
       "correlationId": Long,
       "authToken" : String
    }
FieldTypeDescription
contentStringContains a Json with one field: commits. The field should contain an array of elements with three fields: topic, partition and offset
content.commits.topicStringThe Kafka topic to commit the offsets for
content.commits.partitionIntThe Kafka topic partition to commit the offsets for
content.commits.offsetLongThe offsets number to retain
correlationIdLongA unique number the back end will send back as part of the response
authTokenStringThe authorization token. The back end will check if the user roles allows such action

Since the commits field is an array, more than one (topic, partition, offset) tuple can be provided at once.

Note: The content field value is a string containing a json!

Heartbeat 

The REST API makes sure it keeps the connection open in case there is no data going back and forth between the client and the back end. As a result the client should be able to handle messages with the following structure:

    {
      "type" : "HEARTBEAT"
    }

When such messages are received the client can discard them.