1.0
WebSocket API
With Kafka adoption increasing, we now see clients wanting front-end web applications hooked into Apache Kafka. Those applications get the benefits of working with Apache Kafka while leveraging SQL capabilities.
Lenses comes with a REST API that allows JavaScript clients to work with Apache Kafka over a WebSocket. To speed up integration we provide an open source JavaScript client library; if you are using ReactJS (Angular to follow soon) you can get going fast and focus on your business requirements.
Below you can find the list of supported functionality:
- A client can and should AUTHENTICATE first to obtain the token that allows all other requests to be accepted.
- A client can SUBSCRIBE to a topic via SQL. Please check the Lenses Kafka SQL section for full details on how to set filters and use functions.
- A client can UNSUBSCRIBE from a topic.
- A client can PUBLISH messages to a topic. The current version supports only string/json; support for Avro will be added in the future.
- A client can COMMIT the (topic, partition) offsets.
- A client subscription must specify the decoder type. This allows the content of the Kafka message key/value parts to be read correctly.
The following decoders are supported:
- STRING - the byte[] payload received from Kafka is read as a String
- INT - the byte[] payload received from Kafka is read as an Int
- LONG - the byte[] payload received from Kafka is read as a Long
- AVRO - the byte[] payload received from Kafka is read as Avro
- JSON - the byte[] payload received from Kafka is read as Json
- BINARY - the byte[] payload received from Kafka is kept as is, in Binary format
Lenses WS client lib
Redux is currently supported via middleware. We are actively working on Angular support.
Install
npm i --save kafka-ws-js
If your browser doesn’t support Promises and fetch(), you will need the polyfills
import Promise from 'promise-polyfill';
import 'whatwg-fetch';
if (!window.Promise) {
window.Promise = Promise;
}
How to use
First set up the redux store with the Kafka middleware:
import { createStore, applyMiddleware } from 'redux';
import { createLogger } from 'redux-logger';
import { kafkaWsRedux } from 'kafka-ws-js';
import rootReducer from './rootReducer'; // your combined reducers (see below)

function configureStore() {
  const kafkaWsMiddleware = kafkaWsRedux.createWsMiddleware();
  // Any other middleware you might use
  const logger = createLogger();
  const middleware = [logger, kafkaWsMiddleware];
  const store = createStore(
    rootReducer,
    applyMiddleware(...middleware),
  );
  return store;
}
Alternatively, you can customise the middleware with options, in which case it will connect using them:
import { createStore, applyMiddleware } from 'redux';
import { kafkaWsRedux } from 'kafka-ws-js';
import rootReducer from './rootReducer'; // your combined reducers (see below)

function configureStore() {
  const kafkaOptions = {
    host: 'cloudera02.landoop.com:24006/api/kafka/ws',
    clientId: 'MyClient',
    // See the Options section for the full list
  };
  const kafkaWsMiddleware = kafkaWsRedux.createWsMiddleware(kafkaOptions);
  const middleware = [..., kafkaWsMiddleware]; // plus any other middleware you use
  const store = createStore(
    rootReducer,
    applyMiddleware(...middleware),
  );
  return store;
}
Then add the reducer:
import { combineReducers } from 'redux';
import { kafkaWsRedux } from 'kafka-ws-js';
import sessionReducer from './sessionReducer';

const rootReducer = combineReducers({
  kafka: kafkaWsRedux.reducer,
  // Add other application reducers where you listen to kafka actions
  session: sessionReducer,
});

export default rootReducer;
Now you are ready to dispatch Actions that the middleware will intercept:
import { kafkaWsRedux } from 'kafka-ws-js';
const Actions = kafkaWsRedux.Actions;
dispatch(Actions.connect(options));
dispatch(Actions.publish(payload));
dispatch(Actions.subscribe(payload));
...
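The exact payload shapes expected by the library are not spelled out here; as a rough sketch, assuming they mirror the wire protocol documented in the APIs section (check the library's own docs for the authoritative definitions):

```javascript
// Illustrative payloads -- field names are assumptions modelled on the
// wire protocol; topic name and values are placeholders.
const publishPayload = {
  topic: 'topicA',                       // target topic
  key: 'user-42',                        // optional string key
  value: JSON.stringify({ score: 100 }), // string/json only in this version
};

const subscribePayload = {
  sqls: ["SELECT * FROM topicA WHERE _ktype='STRING' AND _vtype='JSON'"],
};
```

These objects would then be handed to Actions.publish(publishPayload) and Actions.subscribe(subscribePayload) as above.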
You can also listen to the various action types dispatched by the middleware:
import { kafkaWsRedux } from 'kafka-ws-js';
const Type = kafkaWsRedux.Type;
The available types are:
export const Type = {
  KAFKA_MESSAGE,
  KAFKA_HEARTBEAT,
  CONNECT,
  CONNECT_SUCCESS,
  CONNECT_FAILURE,
  DISCONNECT,
  DISCONNECT_SUCCESS,
  DISCONNECT_FAILURE,
  PUBLISH,
  PUBLISH_SUCCESS,
  PUBLISH_FAILURE,
  SUBSCRIBE,
  SUBSCRIBE_SUCCESS,
  SUBSCRIBE_FAILURE,
  UNSUBSCRIBE,
  UNSUBSCRIBE_SUCCESS,
  UNSUBSCRIBE_FAILURE,
};
...
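An application reducer can then react to these types, for example to accumulate received Kafka records. A minimal sketch, assuming the middleware puts the records on action.payload (verify the actual field name against the library source):

```javascript
// The type string mirrors kafkaWsRedux.Type.KAFKA_MESSAGE; in a real app
// compare against the constant exported by the library instead.
const KAFKA_MESSAGE = 'KAFKA_MESSAGE';

function messagesReducer(state = [], action) {
  switch (action.type) {
    case KAFKA_MESSAGE:
      // Append the received record(s) to the list kept in the store.
      return state.concat(action.payload);
    default:
      return state;
  }
}
```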
Options Description
Passed when creating middleware or when dispatching connect actions.
Field | Type | Description |
---|---|---|
host | String | Web socket address, including port. If wss:// is not set, it will be added by the library. Example of address: test.landoop.com:21112/api/kafka/ws or wss://test.landoop.com:21112/api/kafka/ws |
clientId | String | Client Id. If a previous session is found, messages will be sent back based on its topic subscription. |
authToken | String | Token used in order to authenticate kafka publish/subscribe/unsubscribe messages. |
authUrl | String | If no token is provided, this address will be used in order to retrieve token. ( provided the user and password are given ) |
user | String | User for Http authentication. |
password | String | Password for Http authentication. |
APIs
We strongly encourage you to use the JavaScript library we provide, since it already implements the required protocol and is production ready. But should you choose not to, you can find below the details required to implement a client able to use the API.
The API is straightforward: there is only one REST endpoint to connect to and open a WebSocket connection:
HTTP method | Path | Parameter |
---|---|---|
GET | /api/kafka/ws/$clientId | clientId - the unique identifier for the Kafka Consumer created behind the scenes |
Once the connection has been opened, the client needs to make sure it follows the protocol. Here is the template for each message the client can send to the back end:
{
  "type": "SUBSCRIBE/UNSUBSCRIBE/PUBLISH/COMMIT/LOGIN",
  "content": "The json text for the specific request",
  "correlationId": 1000,
  "authToken": "Authorization token or empty"
}
Field | Type | Description |
---|---|---|
type | String | Describes the action the back end will take in response to the request. The available values are: LOGIN, SUBSCRIBE, UNSUBSCRIBE, PUBLISH, COMMIT |
content | String | Contains the Json content of the actual request. The content layout depends on the type, as described shortly. |
correlationId | Long | A unique identifier in order for the client to link the response with the request made. |
authToken | String | A unique token identifying the user making the request. This token can only be obtained once the LOGIN request has completed successfully. |
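If you implement the protocol yourself, every outgoing frame follows this envelope, so a small helper can build it. A sketch (note that content must itself be a JSON-encoded string):

```javascript
// Serialise one client->server frame. `content` is a plain object that is
// JSON-encoded a second time, because the protocol carries it as a string.
function buildRequest(type, content, correlationId, authToken) {
  return JSON.stringify({
    type,
    content: JSON.stringify(content),
    correlationId,
    authToken: authToken || '', // empty before LOGIN has completed
  });
}
```

For example, buildRequest('LOGIN', { user: 'alice', password: 'secret' }, 1000) produces a frame ready to be sent over the WebSocket (credentials are placeholders).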
The response received from the back end follows this template:
{
  "correlationId": Long,
  "type": "ERROR/INVALIDREQUEST/KAFKAMSG/HEARTBEAT/SUCCESS",
  "content": String
}
Field | Type | Description |
---|---|---|
correlationId | Long | The unique identifier the client has provided in the request associated with the response. |
type | String | Describes what response content the client has received. Available values are: ERROR, INVALIDREQUEST, KAFKAMSG, HEARTBEAT, SUCCESS |
content | String | Contains the actual response content. Each response type has its own content layout. |
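On the receiving side, the client can route each frame on its type field. A minimal sketch (handler names are illustrative, not part of the protocol):

```javascript
// Route one server->client frame to the appropriate handler.
function handleFrame(rawFrame, handlers) {
  const frame = JSON.parse(rawFrame);
  switch (frame.type) {
    case 'HEARTBEAT':
      return 'heartbeat'; // keep-alive only; nothing to process
    case 'KAFKAMSG':
      return handlers.onMessage(frame.content, frame.correlationId);
    case 'SUCCESS':
      return handlers.onSuccess(frame.content, frame.correlationId);
    case 'ERROR':
    case 'INVALIDREQUEST':
      return handlers.onError(frame.type, frame.content, frame.correlationId);
    default:
      throw new Error('Unknown frame type: ' + frame.type);
  }
}
```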
Protocol Definition
All requests are constrained by the user's permissions on the back end. If the user has only Read access, publishing a record to a topic will not be allowed. If the user has the No-Data user role, retrieving messages from Kafka will not be allowed either. See the security section for role definitions.
Login
The first thing to do once a WebSocket connection has been opened is to obtain an authorization token. To do so, the client has to send a LOGIN request with the following format:
{
  "type": "LOGIN",
  "content": "{ "user": String, "password": String }",
  "correlationId": Long,
  "authToken": String
}
Field | Type | Description |
---|---|---|
content | String | Contains a json with two fields user and password to obtain the token for. |
correlationId | Long | A unique number the back end will send back as part of the response. |
authToken | String | For this request type the authorization token is not validated. |
Note: The content field value is a string containing a JSON!
A successful login response will look like this:
{
  "correlationId": Long,
  "type": "SUCCESS",
  "content": String
}
Field | Type | Description |
---|---|---|
content | String | Contains the authorization token |
correlationId | Long | A unique number sent in the request |
If the user or password provided is not correct, the client will receive an error response. In this case, the response format looks like this:
{
  "correlationId": Long,
  "type": "ERROR",
  "content": String
}
Field | Type | Description |
---|---|---|
content | String | Contains the description error |
correlationId | Long | A unique number sent in the request |
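Putting the request and the two possible responses together, a login exchange over the raw protocol might look like this sketch (credentials are placeholders; note that content is itself a JSON string):

```javascript
// Build the LOGIN frame. authToken is not validated for this request type.
const loginFrame = JSON.stringify({
  type: 'LOGIN',
  content: JSON.stringify({ user: 'alice', password: 'secret' }),
  correlationId: 1,
  authToken: '',
});

// Extract the token from the matching response, or fail loudly.
function tokenFromLoginResponse(rawFrame) {
  const frame = JSON.parse(rawFrame);
  if (frame.type !== 'SUCCESS') {
    throw new Error('Login failed: ' + frame.content);
  }
  return frame.content; // the authorization token
}
```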
Publishing
In order to publish a message to a topic the client has to send the following request:
{
  "type": "PUBLISH",
  "content": "{ "topic": String, "key": String, "value": String }",
  "correlationId": Long,
  "authToken": String
}
Field | Type | Description |
---|---|---|
content | String | Contains a json with three fields: topic, key, and value. The last two fields are optional; omit a field to send a null value. |
correlationId | Long | A unique number the back end will send back as part of the response. |
authToken | String | The authorization token. The back end will check if the user roles allows such action. |
Note: Remember, the key and value content is sent to the target Kafka topic as a String! The content field value is a string containing a json!
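As a concrete sketch, a PUBLISH frame for a hypothetical topicA with a JSON value (topic, key, and token are placeholders):

```javascript
// Both key and value must be strings in the current protocol version, so a
// JSON value is serialised before being placed into the content object.
const publishFrame = JSON.stringify({
  type: 'PUBLISH',
  content: JSON.stringify({
    topic: 'topicA',
    key: 'user-42',
    value: JSON.stringify({ score: 100 }),
  }),
  correlationId: 2,
  authToken: 'TOKEN-FROM-LOGIN',
});
```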
Subscription
To receive messages from a Kafka topic the client has to send a SUBSCRIBE request.
{
  "type": "SUBSCRIBE",
  "content": "{ "sqls": [ String, String ] }",
  "correlationId": Long,
  "authToken": String
}
Field | Type | Description |
---|---|---|
content | String | Contains a json with one field: sqls. The field is an array of LSQL values. |
sqls | String[] | An array of LSQL values. The format is a SQL-like syntax that supports functions, filtering, and field selection. See the template below. |
correlationId | Long | A unique number the back end will send back as part of the response. |
authToken | String | The authorization token. The back end will check if the user roles allows such action. |
SELECT *
FROM $TOPIC
WHERE _ktype='INT/LONG/JSON/STRING/AVRO'
AND _vtype='INT/LONG/JSON/STRING/AVRO'
[AND ...]
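For example, a subscription reading a hypothetical topicA with string keys and JSON values, filtered to high scores (topic and field names are illustrative), could look like:

```sql
SELECT name, score
FROM topicA
WHERE _ktype='STRING'
  AND _vtype='JSON'
  AND score > 100
```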
You can provide more than one LSQL statement if you want to subscribe to more than one topic. Please visit the Lenses Kafka SQL section for full details on what it supports. The response from the back end can be a SUCCESS or an ERROR.
Once the subscription has been successful, messages arriving in the Kafka topic(-s) and matching the filter will be delivered. A message received by the client will have this structure:
{
  "content": [
    {
      "key": "...",
      "value": "{...}",
      "topic": "topicA",
      "partition": Int,
      "offset": Long,
      "timestamp": Long
    },
    ..
  ],
  "correlationId": Long,
  "type": "KAFKAMSG"
}
Field | Type | Description |
---|---|---|
content | Json[] | Contains an array of Kafka messages. Each message has six fields: key, value, topic, partition, offset and timestamp. |
content.key | String | Contains the Kafka message key value. If the key is null, the field will not be present. |
content.value | String | Contains the Kafka message value part. If the value is null, the field will not be present. |
content.topic | String | Contains Kafka message topic name. |
content.partition | Int | Contains Kafka message partition number. |
content.offset | Long | Contains Kafka message offset. |
content.timestamp | Long | Contains the Kafka message timestamp. |
correlationId | Long | The unique number identifying the subscription this message belongs to. |
Note: The timestamp field requires Kafka 0.10.2+ and the correct broker settings and/or the client publishing the timestamp.
Un-subscribe
A client can choose at any point to stop receiving messages from a given topic(-s). In order to do so it has to send the following message:
{
  "type": "UNSUBSCRIBE",
  "content": "{ "topics": [ String, .. ] }",
  "correlationId": Long,
  "authToken": String
}
Field | Type | Description |
---|---|---|
content | String | Contains a Json with one field: topics. The field should contain an array of strings representing the topics to unsubscribe from. |
correlationId | Long | A unique number the back end will send back as part of the response. |
authToken | String | The authorization token. The back end will check if the user roles allows such action. |
Although the subscription allows you to specify via LSQL the partitions to subscribe to, the unsubscribe does not support selective partition dropping from the subscription.
Note: Executing a subscribe call with a new LSQL statement for a topic already in the subscription will unsubscribe first and then subscribe again.
Offsets Commit
The JavaScript client can decide when to commit the offsets in Kafka. This way, when the client reopens a connection and resubscribes to the same Kafka topic, it will receive the Kafka messages from where it left off.
To commit offsets the client has to send the following message structure:
{
  "type": "COMMIT",
  "content": "{ "commits": [ { "topic": String, "partition": Int, "offset": Long }, ... ] }",
  "correlationId": Long,
  "authToken": String
}
Field | Type | Description |
---|---|---|
content | String | Contains a Json with one field: commits. The field should contain an array of elements with three fields: topic, partition and offset |
content.commits.topic | String | The Kafka topic to commit the offsets for |
content.commits.partition | Int | The Kafka topic partition to commit the offsets for |
content.commits.offset | Long | The offsets number to retain |
correlationId | Long | A unique number the back end will send back as part of the response |
authToken | String | The authorization token. The back end will check if the user roles allows such action |
Since the commits field is an array, more than one (topic, partition, offset) tuple can be provided at once.
Note: The content field value is a string containing a json!
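A client will typically commit the highest offset it has processed per (topic, partition). A sketch that derives a COMMIT frame from a batch of received messages (whether the back end expects the last-processed offset or last-processed + 1 should be verified; the last-processed offset is assumed here):

```javascript
// Reduce a batch of received messages to one commit per (topic, partition),
// keeping the highest offset seen, then wrap it in a COMMIT frame.
function buildCommitFrame(messages, correlationId, authToken) {
  const latest = {};
  for (const m of messages) {
    const key = m.topic + '/' + m.partition;
    if (!(key in latest) || m.offset > latest[key].offset) {
      latest[key] = { topic: m.topic, partition: m.partition, offset: m.offset };
    }
  }
  return JSON.stringify({
    type: 'COMMIT',
    content: JSON.stringify({ commits: Object.values(latest) }),
    correlationId,
    authToken,
  });
}
```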
Heartbeat
The back end keeps the connection open even when no data is flowing between the client and the server by sending heartbeat messages. As a result, the client should be able to handle messages with the following structure:
{ "type" : "HEARTBEAT" }
When such messages are received the client can discard them.
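In a WebSocket onmessage handler this can be as simple as the following sketch:

```javascript
// Drop heartbeats silently; pass every other frame to the data callback.
function onFrame(rawFrame, onData) {
  const frame = JSON.parse(rawFrame);
  if (frame.type === 'HEARTBEAT') {
    return null; // keep-alive only, safe to discard
  }
  return onData(frame);
}
```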
Roadmap
To be added in the near future:
- Finalize Angular implementation
- Support message batching
- Handle unexpected disconnects and reconnection attempts
- Add timeout and delay options
- Look into Rx.DOM.fromWebSocket for simplifying code