1.1
REST API
Lenses provides a rich set of REST APIs for interacting with Apache Kafka, topics, offsets, and consumers, as well as the microservices of your data streaming platform. Lenses treats security as a first-class concern: it provides role-based access and auditing on its APIs and protects sensitive data such as passwords.
Note: We recommend using firewall rules to block direct access to the underlying REST APIs (e.g. Schema Registry, Kafka Connect, Kubernetes) and using the Lenses APIs instead, to take advantage of their fine-grained access policies.
To run the examples below, we recommend installing jq.
Headers
x-kafka-lenses-token:myToken
content-type:application/json
Error Codes
All APIs use standard HTTP status codes: any request that fails returns a status indicating an error (4xx or 5xx).
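Because failures are reported through the HTTP status, a script can read the status code and branch on it. The following is a minimal sketch, not part of Lenses itself: classify_status and lenses_get_status are hypothetical helper names, and the token is assumed to be in the TOKEN variable.

```shell
# Hypothetical helper: map an HTTP status code to a coarse outcome.
classify_status() {
  case "$1" in
    2??) echo "ok" ;;
    4??) echo "client-error" ;;
    5??) echo "server-error" ;;
    *)   echo "other" ;;
  esac
}

# Hypothetical helper: print only the HTTP status of a GET request.
# -o /dev/null discards the body; -w '%{http_code}' prints the status code.
lenses_get_status() {
  curl -s -o /dev/null -w '%{http_code}' \
    -H "X-Kafka-Lenses-Token:${TOKEN}" "$1"
}

# Usage (needs a running Lenses instance, so it is left commented out):
# classify_status "$(lenses_get_status "http://localhost:9991/api/topics/topicA")"
```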
Authentication
All REST APIs are protected by role-based authentication, which is either BASIC or LDAP based depending on how Lenses security has been set up. To use the APIs, first authenticate as an appropriate user to receive an access token, then send that token with every subsequent request.
POST /api/login
Data Params:
  user: string
  password: string
Error Code: 401 UNAUTHORIZED
Example Request
# login and receive the access token
HOST="http://localhost:9991"
TOKEN=$(curl -X POST -H "Content-Type:application/json" -d '{"user":"admin", "password":"XXXXX"}' "${HOST}/api/login" --compressed -s | jq -r '.token')
echo "$TOKEN"
Example Response
{ "success": true, "token": "a1f44cb8-0f37-4b96-828c-57bbd8d4934b", "user": { "id": "admin", "name": "Admin User", "email": null, "roles": ["admin", "read", "write", "nodata"] }, "schemaRegistryDelete": true }
Note: Once an access token has been retrieved, it must be sent as a header with every subsequent API request:
-H "X-Kafka-Lenses-Token:${TOKEN}"
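The login-then-reuse flow can be wrapped in a few shell functions. This is a sketch under stated assumptions: login_body, lenses_login and lenses_api are hypothetical helper names, and the credentials are assumed to contain no characters that need JSON escaping.

```shell
HOST="http://localhost:9991"

# Build the JSON login payload. Assumes the user name and password contain
# no characters that need JSON escaping (quotes, backslashes, control chars).
login_body() {
  printf '{"user":"%s","password":"%s"}' "$1" "$2"
}

# Hypothetical helper: log in and print the access token (needs curl and jq).
lenses_login() {
  curl -s -X POST -H "Content-Type:application/json" \
    -d "$(login_body "$1" "$2")" "${HOST}/api/login" | jq -r '.token'
}

# Hypothetical helper: call any endpoint with the token header attached.
# $1 = HTTP method, $2 = path, remaining arguments are passed to curl.
lenses_api() {
  method=$1
  path=$2
  shift 2
  curl -s -X "$method" \
    -H "Content-Type:application/json" \
    -H "X-Kafka-Lenses-Token:${TOKEN}" \
    "$@" "${HOST}${path}"
}

# Usage (requires a running Lenses instance):
# TOKEN=$(lenses_login admin XXXXX)
# lenses_api GET /api/topics/topicA | jq .
```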
Data API
The Data REST APIs let you retrieve data from topics by sending LSQL queries. You can also subscribe to and produce messages on the Kafka topic live stream via the WebSocket APIs.
LSQL URL Encoding
The LSQL statements must be URL-encoded before they are sent as the sql query parameter. Standard URL-encoding rules apply; see https://www.w3schools.com/tags/ref_urlencode.asp for a reference. Here is an example:
Assume the query below:
SELECT * FROM `topicA`
WHERE _vtype='AVRO'
AND _ktype='AVRO'
LIMIT 1000
The encoded version looks like this:
SELECT+*+FROM+%60topicA%60%0AWHERE+_vtype%3D%27AVRO%27%0AAND+_ktype%3D%27AVRO%27%0ALIMIT+1000
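The encoding step can also be done in the shell. The function below is a minimal sketch, with urlencode being a hypothetical helper name; it assumes ASCII input and percent-encodes every character outside the unreserved set, so a space becomes %20 rather than + and * becomes %2A. Standard URL decoding should map both forms back to the same query.

```shell
# Percent-encode a string for use as a query parameter value.
# Assumes ASCII input; the variables are global because POSIX sh has no "local".
urlencode() {
  s=$1
  out=
  while [ -n "$s" ]; do
    rest=${s#?}          # everything after the first character
    c=${s%"$rest"}       # the first character itself
    case $c in
      [A-Za-z0-9._~-]) out=$out$c ;;
      # A leading quote makes printf emit the character's numeric code.
      *) out=$out$(printf '%%%02X' "'$c") ;;
    esac
    s=$rest
  done
  printf '%s\n' "$out"
}

# Usage:
# urlencode "SELECT * FROM \`topicA\` LIMIT 1000"
```

curl can also perform the encoding itself when called with -G and --data-urlencode "sql=...", which avoids a separate helper.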
LSQL Validation
GET /api/sql/validation
URL Params:
  sql: string (URL-encoded LSQL query), Required
Example Request
# validate an LSQL query (requires the access token obtained from /api/login)
HOST="http://localhost:9991"
curl -X GET -H "Content-Type:application/json" -H "X-Kafka-Lenses-Token:${TOKEN}" "${HOST}/api/sql/validation?sql=SELECT+*+FROM+%60topicA%60%0AWHERE+_vtype%3D%27AVRO%27%0AAND+_ktype%3D%27AVRO%27%0ALIMIT+1000" --compressed -s | jq .
Example Response
{ "isValid": true, "line": 0, "column": 0, "message": null }
Example Error
{ "isValid": false, "line": 4, "column": 1, "message": "Invalid syntax.Encountered \"LIIT\" at line 4, column 1.\nWas expecting one of:\n... " }
LSQL Get Data
GET /api/sql/data
URL Params:
  sql: string (URL-encoded LSQL query), Required
Example Request
# run an LSQL query and fetch the data (requires the access token obtained from /api/login)
HOST="http://localhost:9991"
curl -X GET -H "Content-Type:application/json" -H "X-Kafka-Lenses-Token:${TOKEN}" "${HOST}/api/sql/data?sql=SELECT+*+FROM+%60topicA%60%0AWHERE+_vtype%3D%27AVRO%27%0AAND+_ktype%3D%27AVRO%27%0ALIMIT+1000" --compressed -s | jq .
Example Response
{ "messages": [ { "timestamp": 1510605052735, "partition": 0, "key": "my-key", "offset": 0, "topic": "myTopic", "value": "my-value" }, { ... } ], "offsets": [ { "partition": 0, "min": 0, "max": 10000000 }, { ... } ] }
Example Error
{ "isValid": false, "line": 4, "column": 1, "message": "Invalid syntax.Encountered \"LIIT\" at line 4, column 1.\nWas expecting one of:\n... " }
Kafka Topic API
Create Kafka Topic API
POST /api/topics
Data Params:
  topicName: string, Required
  replication: int
  partitions: int
  configs: topic config key-value pairs
Example Request
{ "topicName": "topicA", "replication": 1, "partitions": 1, "configs": { "cleanup.policy": "compact", "compression.type": "snappy" } }
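A request like the one above can be sent with curl. The sketch below uses hypothetical helper names (create_topic_body, create_topic), hard-codes a single cleanup.policy config entry for brevity, and assumes the token is already in the TOKEN variable.

```shell
HOST="http://localhost:9991"

# Print the JSON body for POST /api/topics.
# $1 = topic name, $2 = replication factor, $3 = partition count.
# Assumes the topic name needs no JSON escaping.
create_topic_body() {
  printf '{"topicName":"%s","replication":%d,"partitions":%d,"configs":{"cleanup.policy":"compact"}}' \
    "$1" "$2" "$3"
}

# Send the create request with the token header attached.
create_topic() {
  curl -s -X POST \
    -H "Content-Type:application/json" \
    -H "X-Kafka-Lenses-Token:${TOKEN}" \
    -d "$(create_topic_body "$1" "$2" "$3")" \
    "${HOST}/api/topics"
}

# Usage (requires a running Lenses instance):
# create_topic topicA 1 1
```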
Delete Kafka Topic API
DELETE /api/topics/(string: topicName)
Route Params:
  topicName: string
Update Kafka Topic config API
PUT /api/topics/config/(string: topicName)
Route Params:
  topicName: string
Data Params:
  configs: array of topic config key-values
Example Request
PUT /api/topics/config/topicA
{ "configs": [{ "key": "cleanup.policy", "value": "compact" }] }
Get Topic info API
GET /api/topics/(string: topicName)
Route Params:
  topicName: string
Example Response
{ "topicName": "topicA", "keyType": "AVRO", "valueType": "AVRO", "partitions": 1, "replication": 1, "isControlTopic": false, "keySchema": null, "valueSchema": null, "messagesPerSecond": 0, "totalMessages": 1737056563, "timestamp": 1515415557251, "isMarkedForDeletion": false, "config": [{ "configuration": "cleanup.policy", "value": "compact", "defaultValue": "delete", "documentation": "A string that is either \"delete\" or \"compact\". This string designates the retention policy to use on old log segments. The default policy (\"delete\") will discard old segments when their retention time or size limit has been reached. The \"compact\" setting will enable log compaction on the topic." }], "consumers": [], "messagesPerPartition": [{ "partition": 0, "messages": 1737056563, "begin": 0, "end": 1737056563 }] }
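The get, delete and config-update endpoints share the same URL shape, so they can be driven from a few small functions. This is a sketch only: all function names are hypothetical, topic names are assumed to be URL-safe, and the token is assumed to be in the TOKEN variable.

```shell
HOST="http://localhost:9991"

# URL builders; the topic name is used as-is, so it is assumed to be URL-safe.
topic_url()        { printf '%s/api/topics/%s' "$HOST" "$1"; }
topic_config_url() { printf '%s/api/topics/config/%s' "$HOST" "$1"; }

# JSON body for the config update: one key/value pair in the "configs" array.
config_body() { printf '{"configs":[{"key":"%s","value":"%s"}]}' "$1" "$2"; }

# Shared curl invocation with the token header.
lenses_curl() {
  curl -s -H "Content-Type:application/json" \
    -H "X-Kafka-Lenses-Token:${TOKEN}" "$@"
}

get_topic()    { lenses_curl -X GET "$(topic_url "$1")"; }
delete_topic() { lenses_curl -X DELETE "$(topic_url "$1")"; }

# $1 = topic name, $2 = config key, $3 = config value
update_topic_config() {
  lenses_curl -X PUT -d "$(config_body "$2" "$3")" "$(topic_config_url "$1")"
}

# Usage (requires a running Lenses instance):
# update_topic_config topicA cleanup.policy compact
# get_topic topicA | jq '.config'
```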
Kafka Streaming SQL API
Before using the Processor API, make sure you know which execution mode your Lenses instance uses to run processors.
Create processor
POST /api/streams
Data Params:
  name: string, Required
  sql: string, Required
  runners: int
  clusterName: string, applies for scale modes
  namespace: string, applies for Kubernetes mode
  pipeline: string, applies for Kubernetes mode
Example Request
{ "name": "myProcessor", "sql": "SET `autocreate`=true;INSERT INTO `topicB` SELECT * FROM `topicA` WHERE _ktype='BYTES' AND _vtype='AVRO'", "runners": 1, "clusterName": "myCluster", "namespace": "ns" }
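The SQL in the request body contains backticks and single quotes, which the shell would otherwise interpret. One way around this, sketched below with hypothetical helper names (processor_body, create_processor), is to keep the body in a quoted here-document and pipe it to curl. The mode-specific fields (clusterName, namespace, pipeline) are left out here and would need to be added for the mode in use.

```shell
HOST="http://localhost:9991"

# The quoted 'EOF' delimiter stops the shell from expanding the backticks
# and quotes inside the SQL, so the JSON reaches curl unchanged.
processor_body() {
  cat <<'EOF'
{
  "name": "myProcessor",
  "sql": "SET `autocreate`=true;INSERT INTO `topicB` SELECT * FROM `topicA` WHERE _ktype='BYTES' AND _vtype='AVRO'",
  "runners": 1
}
EOF
}

# --data-binary @- makes curl read the request body from standard input.
create_processor() {
  processor_body | curl -s -X POST \
    -H "Content-Type:application/json" \
    -H "X-Kafka-Lenses-Token:${TOKEN}" \
    --data-binary @- "${HOST}/api/streams"
}

# Usage (requires a running Lenses instance):
# create_processor
```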
Pause Processor
PUT /api/streams/(string: processorName)/pause
Route Params:
  processorName: string, Required
Resume Processor
PUT /api/streams/(string: processorName)/resume
Route Params:
  processorName: string, Required
Update Processor Runners
PUT /api/streams/(string: processorName)/scale/(int: numberOfRunners)
Route Params:
  processorName: string, Required
  numberOfRunners: int, Required
Delete Processor
DELETE /api/streams/(string: processorName)
Route Params:
  processorName: string, Required
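The pause, resume, scale and delete endpoints differ only in their path suffix and HTTP method. The sketch below captures that in two lookup functions; processor_url, processor_method and processor_action are hypothetical names, and processor names are assumed to be URL-safe.

```shell
HOST="http://localhost:9991"

# Build the URL for a processor action.
# $1 = processor name, $2 = action (pause | resume | scale | delete),
# $3 = number of runners (scale only)
processor_url() {
  case "$2" in
    pause|resume) printf '%s/api/streams/%s/%s' "$HOST" "$1" "$2" ;;
    scale)        printf '%s/api/streams/%s/scale/%d' "$HOST" "$1" "$3" ;;
    delete)       printf '%s/api/streams/%s' "$HOST" "$1" ;;
    *)            echo "unknown action: $2" >&2; return 1 ;;
  esac
}

# HTTP method per action: DELETE for delete, PUT for everything else.
processor_method() {
  if [ "$1" = "delete" ]; then echo DELETE; else echo PUT; fi
}

# Run the action against Lenses with the token header attached.
processor_action() {
  url=$(processor_url "$@") || return 1
  curl -s -X "$(processor_method "$2")" \
    -H "X-Kafka-Lenses-Token:${TOKEN}" "$url"
}

# Usage (requires a running Lenses instance):
# processor_action myProcessor scale 3
```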
Kafka Connect API
Kafka Connect APIs are proxied via Lenses. If Lenses manages multiple Connect clusters, include the alias of the target cluster, as defined in your Lenses configuration.
# List active connectors
GET /api/proxy-connect/(string: clusterAlias)/connectors
# Create new connector
POST /api/proxy-connect/(string: clusterAlias)/connectors [CONNECTOR_CONFIG]
# Get information about a specific connector
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)
# Get connector config
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/config
# Set connector config
PUT /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/config
# Get connector status
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/status
# Pause a connector
PUT /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/pause
# Resume a paused connector
PUT /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/resume
# Restart a connector
POST /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/restart
# Get list of connector tasks
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/tasks
# Get current status of a task
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/tasks/(string: task_id)/status
# Restart a connector task
POST /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/tasks/(string: task_id)/restart
# Remove a running connector
DELETE /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)
# List available connector plugins
GET /api/proxy-connect/(string: clusterAlias)/connector-plugins
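Every proxied Connect call has the same prefix followed by the cluster alias, so a small URL builder keeps scripts readable. This is a sketch: connect_url and connect_api are hypothetical helper names, and the alias "dev" and connector "my-sink" in the usage lines are made-up examples.

```shell
HOST="http://localhost:9991"

# Build a proxied Kafka Connect URL.
# $1 = cluster alias, $2 = optional path below the alias (defaults to "connectors")
connect_url() {
  printf '%s/api/proxy-connect/%s/%s' "$HOST" "$1" "${2:-connectors}"
}

# Hypothetical wrapper that adds the token header to a proxied call.
# $1 = HTTP method, $2 = cluster alias, $3 = path; the rest is passed to curl.
connect_api() {
  method=$1
  cluster=$2
  path=$3
  shift 3
  curl -s -X "$method" \
    -H "Content-Type:application/json" \
    -H "X-Kafka-Lenses-Token:${TOKEN}" \
    "$@" "$(connect_url "$cluster" "$path")"
}

# Usage (requires a running Lenses instance):
# connect_api GET dev connectors
# connect_api PUT dev connectors/my-sink/pause
# connect_api GET dev connectors/my-sink/status | jq .
```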
Kafka Schemas API
The Schema Registry proxy API interacts directly with the Schema Registry services. It iterates over the available hosts and, if a request fails, retries with the next available one.
# List all available subjects
GET /api/proxy-sr/subjects
# List all versions of a particular subject
GET /api/proxy-sr/subjects/(string: subject)/versions
# Delete a subject and associated compatibility level
DELETE /api/proxy-sr/subjects/(string: subject)
# Get the schema for a particular subject id
GET /api/proxy-sr/schemas/ids/(int: id)
# Get the schema at a particular version
# - versionId string "latest" or numeric values between 1 and 2^31-1
# The string "latest" refers to the last registered schema under the specified subject.
# Note that there may be a new latest schema that gets registered right after this request is served.
GET /api/proxy-sr/subjects/(string: subject)/versions/(versionId: version)
# Register a new schema under a particular subject
POST /api/proxy-sr/subjects/(string: subject)/versions
# Delete a particular version of a subject
DELETE /api/proxy-sr/subjects/(string: subject)/versions/(versionId: version)
# Update global compatibility level
PUT /api/proxy-sr/config
# Get global compatibility level
GET /api/proxy-sr/config
# Change compatibility level of a subject
PUT /api/proxy-sr/config/(string: subject)
# Get compatibility level of a subject
GET /api/proxy-sr/config/(string: subject)
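The version identifier accepts either the string "latest" or a number between 1 and 2^31-1, which a script can check before calling the API. The sketch below is one way to do that; valid_version and schema_version_url are hypothetical names, and subject names are assumed to be URL-safe.

```shell
HOST="http://localhost:9991"

# Succeed if $1 is "latest" or an integer between 1 and 2^31-1.
valid_version() {
  case "$1" in
    latest) return 0 ;;
    ''|*[!0-9]*) return 1 ;;   # empty or contains a non-digit
  esac
  # The length check keeps very long digit strings away from the integer tests.
  [ "${#1}" -le 10 ] && [ "$1" -ge 1 ] && [ "$1" -le 2147483647 ]
}

# Build the URL of one schema version, rejecting invalid version identifiers.
# $1 = subject, $2 = version
schema_version_url() {
  valid_version "$2" || { echo "invalid version: $2" >&2; return 1; }
  printf '%s/api/proxy-sr/subjects/%s/versions/%s' "$HOST" "$1" "$2"
}

# Usage (requires a running Lenses instance; "my-subject" is a made-up name):
# curl -s -H "X-Kafka-Lenses-Token:${TOKEN}" "$(schema_version_url my-subject latest)" | jq .
```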
Tip: You can still use the REST endpoints of Schema Registry and Kafka Connect directly. Lenses simply proxies the requests to the correct endpoint while also recording the relevant audit information.