You are viewing documentation for an older version of Lenses.io View latest documentation here

REST API

Lenses provides a rich set of REST APIs that can be used to interact with Apache Kafka, topics, offsets, consumers as well as the micro-services of your data streaming platform. Lenses takes security as a first-class citizen and provides role-based access and auditing on APIs and protects sensitive data such as passwords.

Note: We recommend protecting via firewall rules direct access to any REST APIs (i.e. Schema Registry, Kafka Connect, Kubernetes) and then use Lenses APIs to take advantage of refined secure access policies.

To run the examples below, we recommend to install jq.

Headers 

  • x-kafka-lenses-token:myToken
  • content-type:application/json

Error Codes 

All APIs use a standard HTTP error codes for any requests that return an HTTP status indicating an error (4xx or 5xx statuses).

Authentication 

All REST APIs are protected via role-based authentication that is either BASIC or LDAP based, depending on how Lenses security has been setup. In order to be able to use the APIs, you will need to first authenticate via an appropriate user, then receive an access token and use that token for any subsequent request.

    POST /api/login

    Data Params:
        user: string
        password: string

    Error Code:
        401 UNAUTHORIZED

Example Request

# login and receive the access token
HOST="http://localhost:9991"
TOKEN=$(curl -X POST -H "Content-Type:application/json" -d '{"user":"admin",  "password":"XXXXX"}' ${HOST}/api/login --compress -s | jq -r .'token')
echo $TOKEN

Example Response

    {
        "success": true,
        "token": "a1f44cb8-0f37-4b96-828c-57bbd8d4934b",
        "user": {
            "id": "admin",
            "name": "Admin User",
            "email": null,
            "roles": ["admin", "read", "write", "nodata"]
        },
        "schemaRegistryDelete": true
    }

Note: Once an Access Token has been retrieved, it will need to be used in every subsequent request to any API call -H "X-Kafka-Lenses-Token:${TOKEN}"

Data API 

The REST APIs for getting Data allows you to get data from topics by sending LSQL Queries. You can also subscribe and produce messages to the Kafka Topic Live Stream via the Web Socket APIs

LSQL URL Encoding 

The LSQL statements need to be encoded in order to fire the request. You may want to follow standard instructions here for the encoding: https://www.w3schools.com/tags/ref_urlencode.asp Here is an example:

Assume the query bellow:

    SELECT * FROM `topicA`
    WHERE _vtype='AVRO'
    AND _ktype='AVRO'
    LIMIT 1000

The Encoded version will look like this:

    SELECT+*+FROM+%60topicA%60%0AWHERE+_vtype%3D%27AVRO%27%0AAND+_ktype%3D%27AVRO%27%0ALIMIT+1000

LSQL Validation 

    GET /api/sql/validation

    URL Params:
        sql: string (Encoded URL LSQL query), Required

Example Request

# login and receive the access token
HOST="http://localhost:9991"
TOKEN=$(curl -X GET -H "Content-Type:application/json" ${HOST}/api/sql/validation?sql=SELECT+*+FROM+%60topicA%60%0AWHERE+_vtype%3D%27AVRO%27%0AAND+_ktype%3D%27AVRO%27%0ALIMIT+1000 --compress -s | jq -r .'token')
echo $TOKEN

Example Response

    {
        "isValid": true,
        "line": 0,
        "column": 0,
        "message": null
    }

Example Error

    {
        "isValid": false,
        "line": 4,
        "column": 1,
        "message": "Invalid syntax.Encountered \"LIIT\" at line 4, column 1.\nWas expecting one of:\n     ... "
    }

LSQL Get Data 

    GET /api/sql/data

    URL Params:
        sql: string (Encoded URL LSQL query), Required

Example Request

# login and receive the access token
HOST="http://localhost:9991"
TOKEN=$(curl -X GET -H "Content-Type:application/json" ${HOST}/api/sql/data?sql=SELECT+*+FROM+%60topicA%60%0AWHERE+_vtype%3D%27AVRO%27%0AAND+_ktype%3D%27AVRO%27%0ALIMIT+1000 --compress -s | jq -r .'token')
echo $TOKEN

Example Response

    {
        "messages": [
            {
                "timestamp": 1510605052735,
                "partition": 0,
                "key": "my-key",
                "offset": 0,
                "topic": "myTopic",
                "value": "my-value"
            },
            {
                ...
            }
        ],
        "offsets": [
            {
                "partition": 0,
                "min": 0,
                "max": 10000000
            },
            {
                ...
            }
        ]
    }

Example Error

    {
        "isValid": false,
        "line": 4,
        "column": 1,
        "message": "Invalid syntax.Encountered \"LIIT\" at line 4, column 1.\nWas expecting one of:\n     ... "
    }

Kafka Topic API 

Create Kafka Topic API 

    POST /api/topics

    Data Params
        topicName: string, Required
        replication: int
        partitions: int
        configs: topic key - value

Example Request

    {
        "topicName": "topicA",
        "replication": 1,
        "partitions": 1,
        "configs": {
            "cleanup.policy": "compact",
            "compression.type": "snappy"
        }
    }

Delete Kafka Topic API 

    DELETE /api/topics/(string: topicName)

    Route Params:
        topicName: string

Update Kafka Topic config API 

    PUT /api/topics/config/(string: topicName)

    Route Params:
        topicName: string

    Data Params:
        configs: array of topic config key-values

Example Request

    PUT /api/topics/config/topicA

    {
        "configs": [{
            "key": "cleanup.policy",
            "value": "compact"
        }]
    }

Get Topic info API 

    GET api/topics/(string: topicName)

    Route Params:
        topicName: string

Example Response

    {
        "topicName": "topicA",
        "keyType": "AVRO",
        "valueType": "AVRO",
        "partitions": 1,
        "replication": 1,
        "isControlTopic": false,
        "keySchema": null
        "valueSchema": null,
        "messagesPerSecond": 0,
        "totalMessages": 1737056563,
        "timestamp": 1515415557251,
        "isMarkedForDeletion": false,
        "config": [{
            "configuration": "cleanup.policy",
            "value": "compact",
            "defaultValue": "delete",
            "documentation": "A string that is either \"delete\" or \"compact\". This string designates the retention policy to use on old log segments. The default policy (\"delete\") will discard old segments when their retention time or size limit has been reached. The \"compact\" setting will enable log compaction on the topic."
        }],
        "consumers": [],
        "messagesPerPartition": [{
            "partition": 0,
            "messages": 1737056563,
            "begin": 0,
            "end": 1737056563
        }]
    }

Kafka Streaming SQL API 

Before using the Processor API make sure you are aware of which mode your Lenses instance is running to execute the processors.

Create processor 

    POST /api/streams

    Data Params
        name: string, Required
        sql: string, Required
        runners: int
        clusterName: string, applies for scale modes
        namespace: string, applies for Kubernetes mode
        pipeline: string, applies for Kubernetes mode

Example Request

   {
        "name": "myProcessor",
        "sql": "SET `autocreate`=true;INSERT INTO `topicB` SELECT * FROM `topicA` WHERE  _ktype='BYTES' AND _vtype='AVRO'",
        "runners": 1,
        "clusterName": "myCluster",
        "namespace": "ns"
    }

Pause Processor 

    PUT /api/streams/(string: processorName)/pause

    Route Params
        processorName: string, Required

Resume Processor 

    PUT /api/streams/(string: processorName)/resume

    Route Params
        processorName: string, Required

Update Processor Runners 

    PUT /api/streams/(string: processorName)/scale/(int: numberOfRunners)

    Route Params
        processorName: string, Required
        numberOfRunners: int, Required

Delete Processor 

    DELETE /api/streams/(string: processorName)

    Route Params
        processorName: string, Required

Kafka Connect API 

Kafka Connect APIs are getting proxied via Lenses. In case multiple connect clusters are managed via Lenses you will need to include the alias used for this cluster as per your Lenses Configuration.

    # List active connectors
    GET /api/proxy-connect/(string: clusterAlias)/connectors

    # Create new connector
    POST /api/proxy-connect/(string: clusterAlias)/connectors [CONNECTOR_CONFIG]

    # Get information about a specific connector
    GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)

    # Get connector config
    GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/config

    # Set connector config
    PUT /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/config

    # Get connector status
    GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/status

    # Pause a connector
    PUT /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/pause

    # Resume a paused connector
    PUT /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/resume

    # Restart a connector
    POST /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/restart

    # Get list of connector tasks
    GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/tasks

    # Get current status of a task
    GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/tasks/(string: task_id)/status

    # Restart a connector task
    POST /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/tasks/(string: task_id)/restart

    # Remove a running connector
    DELETE /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)

    # List available connector plugins
    GET /api/proxy-connect/(string: clusterAlias)/connector-plugins

Kafka Schemas API 

The Schema Registry proxy API interact directly with the schema registry services and iterate over the available hosts and in case of a failure retries with the next available one

    # List all available subjects
    GET /api/proxy-sr/subjects

    # List all versions of a particular subject
    GET /api/proxy-sr/subjects/(string: subject)/versions

    # Delete a subject and associated compatibility level
    DELETE /api/proxy-sr/subjects/(string: subject)

    # Get the schema for a particular subject id
    GET /api/proxy-sr/schemas/ids/{int: id}

    # Get the schema at a particular version
    # - versionId string "latest" or numeric values between 1 and 2^31-1
    #   The string “latest” refers to the last registered schema under the specified subject.
    #   Note that there may be a new latest schema that gets registered right after this request is served.
    GET /api/proxy-sr/subjects/(string: subject)/versions/(versionId: version)

    # Register a new schema under a particular subject
    POST /api/proxy-sr/subjects/(string: subject)/versions

    # Delete a particular version of a subject
    DELETE /api/proxy-sr/subjects/(string: subject)/versions/(versionId: version)

    # Update global compatibility level
    PUT /api/proxy-sr/config

    # Get global compatibility level
    GET /api/proxy-sr/config

    # Change compatibility level of a subject
    PUT /api/proxy-sr/config/(string: subject)

    # Get compatibility level of a subject
    GET /api/proxy-sr/config/(string: subject)

Tip: You can still use the rest endpoints for Schema Registry and Kafka Connect directly. Lenses only proxies the queries to the correct endpoint by also tracking the relevant auditing information!