REST API

Lenses provides a rich set of REST APIs for interacting with Apache Kafka: topics, offsets and consumers, as well as the micro-services of your data streaming platform. Lenses treats security as a first-class citizen, providing role-based access and auditing on the APIs and protecting sensitive data such as passwords.

Note

Lenses exposes a single port and serves both the UI and the APIs from it. For example, if Lenses is accessible at http://localhost:3030, then the APIs are available under http://localhost:3030/api/xx with Content-Type: application/json.

Error Codes

All APIs use standard HTTP status codes for any request that results in an error (4xx or 5xx).

Authentication API

All requests must be authenticated using the HTTP header x-kafka-lenses-token: myToken. You can obtain a token via the login API below, or use a service account.

All REST APIs are protected by role-based authentication, either BASIC or LDAP based, depending on how Lenses security has been set up. To use the APIs, first authenticate with an appropriate user, obtain an access token, and pass that token on every subsequent request.

POST /api/login

Data Params

  • user, string
  • password, string

Error Code

401 UNAUTHORIZED

To run the example below, we recommend installing the jq tool.

Example Request

# login and receive the access token
HOST="http://localhost:9991"
TOKEN=$(curl -X POST -H "Content-Type:application/json" -d '{"user":"admin", "password":"XXXXX"}' ${HOST}/api/login --compress -s | jq -r '.token')
echo $TOKEN

Example Response

{
    "success": true,
    "token": "a1f44cb8-0f37-4b96-828c-57bbd8d4934b",
    "user": {
        "id": "admin",
        "name": "Admin User",
        "email": null,
        "roles": ["admin", "read", "write", "nodata"]
    },
    "schemaRegistryDelete": true
}

Note

Once an access token has been retrieved, it needs to be included in every subsequent API call: -H "X-Kafka-Lenses-Token:${TOKEN}"
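
For example, once ${TOKEN} holds the value returned by the login call, any endpoint can be called like this (the topic endpoint is described later in this document):

# authenticated request reusing the token from the login example
HOST="http://localhost:9991"
curl -X GET -H "X-Kafka-Lenses-Token:${TOKEN}" -H "Content-Type:application/json" ${HOST}/api/topics/topicA --compress -s | jq .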

Topic API

Create Topic

POST /api/topics

Data Params

  • topicName, string, Required
  • replication, int
  • partitions, int
  • configs, key - value

Example Request

{
    "topicName": "topicA",
    "replication": 1,
    "partitions": 1,
    "configs": {
        "cleanup.policy": "compact",
        "compression.type": "snappy"
    }
}
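
A minimal curl sketch for the request above, reusing the HOST and TOKEN variables from the login example:

curl -X POST ${HOST}/api/topics \
     -H "X-Kafka-Lenses-Token:${TOKEN}" \
     -H "Content-Type:application/json" \
     -d '{"topicName":"topicA","replication":1,"partitions":1,"configs":{"cleanup.policy":"compact","compression.type":"snappy"}}'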

Delete Topic

DELETE /api/topics/(string: topicName)

Route Params

  • topicName, string

Update Topic Configuration

PUT /api/topics/config/(string: topicName)

Route Params

  • topicName, string

Data Params

  • configs, an array of topic config key-values

Example Request

PUT /api/topics/config/topicA

{
    "configs": [{
        "key": "cleanup.policy",
        "value": "compact"
    }]
}

Get Topic information

GET /api/topics/(string: topicName)

Route Params

  • topicName, string

Example Response

{
    "topicName": "topicA",
    "keyType": "AVRO",
    "valueType": "AVRO",
    "partitions": 1,
    "replication": 1,
    "isControlTopic": false,
    "keySchema": null
    "valueSchema": null,
    "messagesPerSecond": 0,
    "totalMessages": 1737056563,
    "timestamp": 1515415557251,
    "isMarkedForDeletion": false,
    "config": [{
        "configuration": "cleanup.policy",
        "value": "compact",
        "defaultValue": "delete",
        "documentation": "A string that is either \"delete\" or \"compact\". This string designates the retention policy to use on old log segments. The default policy (\"delete\") will discard old segments when their retention time or size limit has been reached. The \"compact\" setting will enable log compaction on the topic."
    }],
    "consumers": [],
    "messagesPerPartition": [{
        "partition": 0,
        "messages": 1737056563,
        "begin": 0,
        "end": 1737056563
    }]
}

Data API

The Data REST APIs allow you to get data from topics by sending LSQL queries. You can also subscribe to, and produce messages to, the Kafka topic live stream via the WebSocket APIs.

LSQL URL Encoding

LSQL statements must be URL-encoded before the request is sent. You can follow the standard encoding reference here: https://www.w3schools.com/tags/ref_urlencode.asp. Here is an example:

Assume the query below:

SELECT * FROM `topicA`
WHERE _vtype='AVRO'
AND _ktype='AVRO'
LIMIT 1000

The Encoded version will look like this:

SELECT+*+FROM+%60topicA%60%0AWHERE+_vtype%3D%27AVRO%27%0AAND+_ktype%3D%27AVRO%27%0ALIMIT+1000
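
If you use curl, you can let it perform the encoding for you: with -G, every --data-urlencode parameter is appended to the URL as an encoded query string. A minimal sketch against the validation endpoint described below, reusing the HOST and TOKEN variables from the login example:

# curl -G turns --data-urlencode parameters into an encoded query string
curl -G "${HOST}/api/sql/validation" \
     -H "X-Kafka-Lenses-Token:${TOKEN}" \
     --data-urlencode "sql=SELECT * FROM \`topicA\` WHERE _vtype='AVRO' AND _ktype='AVRO' LIMIT 1000" \
     --compress -s | jq .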

LSQL Validation

GET /api/sql/validation

URL Params

  • sql, string (Encoded URL LSQL query), Required

Example Request

# validate an LSQL statement (TOKEN comes from the login example above)
HOST="http://localhost:9991"
curl -X GET -H "Content-Type:application/json" -H "X-Kafka-Lenses-Token:${TOKEN}" "${HOST}/api/sql/validation?sql=SELECT+*+FROM+%60topicA%60%0AWHERE+_vtype%3D%27AVRO%27%0AAND+_ktype%3D%27AVRO%27%0ALIMIT+1000" --compress -s | jq .

Example Response

{
    "isValid": true,
    "line": 0,
    "column": 0,
    "message": null
}

Example Error

{
    "isValid": false,
    "line": 4,
    "column": 1,
    "message": "Invalid syntax.Encountered \"LIIT\" at line 4, column 1.\nWas expecting one of:\n    <EOF> ... "
}

LSQL Get Data

GET /api/sql/data

URL Params

  • sql, string (Encoded URL LSQL query), Required
  • stats, integer, when set, statistics are pushed every x milliseconds (stats=4000 means every 4 seconds), Optional
  • offsets, boolean, when true, the topic offsets are fetched and attached to the end (stop) record, Optional

How to make a Request

The client should be configured to expect Server-Sent Events:

"Accept": "application/json, text/event-stream"
GET /api/sql/data?sql=SELECT+*+FROM+%60topicA%60%0AWHERE+_vtype%3D%27AVRO%27%0AAND+_ktype%3D%27AVRO%27%0ALIMIT+1000&stats=4000&offsets=true

The client should keep waiting for incoming messages (an infinite loop) and stop at EOF.

Each message starts with "data:" followed by a number; the number indicates the type of the data received (see below). Based on the message type, you should handle each message differently. Here are the possible values for the message type:

  • 0 = heartbeat, sent just to keep the connection open; you must SKIP this (add a continue), e.g. data:0
  • 1 = the actual JSON record, which contains some metadata and the value string field holding the data coming from the topic, e.g. data:1{timestamp:..., partition:..., value:"{...}"}
  • 2 = the stop JSON message; at this point you can exit the loop. It contains some useful fields as well, e.g. data:2{isTimeRemaining:true/false, size:...., offsets:...}
  • 3 = the error JSON message (if any); report it back to the user or log it somewhere, e.g. data:3{fromLine:...,toLine:...,fromColumn:...,toColumn:...,error:...}

You can use the Lenses CLI code to see the response data structure:

Example Response for a record (message type 1)

data:1{"timestamp":1532460006300,"partition":2,"key":"{\"key\":\"value\"}","offset":560207858,"topic":"","value":"{the fetched data}"}

Example Response for stop (message type 2)

data:2{"size":2062,"isTimeRemaining":true,"offsets":[{"partition":2,"min":560207858,"max":568636162},{"partition":1,"min":338050641,"max":347448266},{"partition":0,"min":355312551,"max":364217747}],"isStopped":false,"totalRecords":3,"isTopicEnd":false,"skippedRecords":0,"recordsLimit":3,"totalSizeRead":2062}

Example Response for an error (message type 3)

data:3{"fromLine":4,"toLine":4,"fromColumn":1,"toColumn":1,"error":"Invalid syntax.Encountered \"LIIT\" at line 4, column 1.\nWas expecting one of:\n    <EOF> ... "}
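
As a rough illustration (not the official client), the stream can be consumed with curl and a shell loop that dispatches on the message type prefix; HOST, TOKEN and the encoded query are placeholders:

# consume the SSE stream and handle each message type
SQL="SELECT+*+FROM+%60topicA%60+WHERE+_vtype%3D%27AVRO%27+AND+_ktype%3D%27AVRO%27+LIMIT+10"
curl -N -s -H "X-Kafka-Lenses-Token:${TOKEN}" \
     -H "Accept: application/json, text/event-stream" \
     "${HOST}/api/sql/data?sql=${SQL}&stats=4000&offsets=true" |
while read -r line; do
    case "$line" in
        data:0*) continue ;;                                  # heartbeat, skip
        data:1*) echo "record: ${line#data:1}" ;;             # actual record
        data:2*) echo "stop:   ${line#data:2}"; break ;;      # end of stream
        data:3*) echo "error:  ${line#data:3}" >&2; break ;;  # error message
    esac
done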

Note

Follow the Go or Python implementations for more details.

Processor API

Before using the Processor API, make sure you know which execution mode your Lenses instance is configured with for running processors (see Lenses Configuration).

Get All Processors

GET /api/streams

Example response

[
    {
        "targets": {
            "cluster": "minikube",
            "version": null,
            "namespaces": [ "lenses-sql" ]
        },
        "streams": [
            {
                "id": "lsql_c7f177994db640cab95a99900c4fb7e7",
                "name": "cc_payments_freq_1min",
                "clusterName": "MINISHIFT",
                "user": "admin",
                "namespace": "lenses-sql",
                "uptime": 3387090,
                "sql": "SET autocreate=true;SET `commit.interval.ms`='120000';SET `auto.offset.reset`='latest'; INSERT INTO `cc_payments_1m_freq` SELECT STREAM COUNT(*) as `count` FROM `cc_payments` WHERE _ktype='BYTES' and _vtype='BYTES' GROUP BY tumble(1, m)",
                "runners": 1,
                "deploymentState": "RUNNING",
                "topicValueDecoder": "LONG",
                "pipeline": "lsql",
                "startTs": 1522180383208,
                "toTopic": "cc_payments_1m_freq",
                "runnerState": {
                    "e94433cf-cf99-4e46-9adc-407bb06b35c2": {
                        "id": "e94433cf-cf99-4e46-9adc-407bb06b35c2",
                        "worker": "worker1",
                        "state": "RUNNING",
                        "errorMsg": ""
                    }
                }
            }
        ]
    }
]

Get a Processor

Retrieve a specific processor.

GET /api/streams/(string: processorId)

Route Params

  • processorId is the id of the processor

The response is a SqlStream.

Example response

{
    "streams" : {
        "id": "lsql_c7f177994db640cab95a99900c4fb7e7",
        "name": "cc_payments_freq_1min",
        "clusterName": "MINISHIFT",
        "user": "admin",
        "namespace": "lenses-sql",
        "uptime": 3387090,
        "sql": "SET autocreate=true;SET `commit.interval.ms`='120000';SET `auto.offset.reset`='latest'; INSERT INTO `cc_payments_1m_freq` SELECT STREAM COUNT(*) as `count` FROM `cc_payments` WHERE _ktype='BYTES' and _vtype='BYTES' GROUP BY tumble(1, m)",
        "runners": 1,
        "deploymentState": "RUNNING",
        "topicValueDecoder": "LONG",
        "pipeline": "lsql",
        "startTs": 1522180383208,
        "toTopic": "cc_payments_1m_freq",
        "runnerState": {
            "e94433cf-cf99-4e46-9adc-407bb06b35c2": {
                "id": "e94433cf-cf99-4e46-9adc-407bb06b35c2",
                "worker": "worker1",
                "state": "RUNNING",
                "errorMsg": ""
            }
        }
    }
}

Create Processor

POST /api/streams

Data Params

  • name, string, Required
  • sql, string, Required
  • runners, int
  • clusterName, string, used for both Kubernetes and Connect modes
  • namespace, string, used only for Kubernetes mode
  • pipeline, string, used only for Kubernetes mode

Example Request

{
     "name": "myProcessor",
     "sql": "SET `autocreate`=true;INSERT INTO `topicB` SELECT * FROM `topicA` WHERE  _ktype='BYTES' AND _vtype='AVRO'",
     "runners": 1,
     "clusterName": "myCluster",
     "namespace": "ns"
 }

Response

The response includes the id for the processor.
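
A minimal curl sketch, assuming the example request above has been saved to a file named processor.json and reusing the HOST and TOKEN variables from the login example:

# create a SQL processor; the response contains the processor id
curl -X POST ${HOST}/api/streams \
     -H "X-Kafka-Lenses-Token:${TOKEN}" \
     -H "Content-Type:application/json" \
     -d @processor.json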

Pause Processor

PUT /api/streams/(string: processorId)/pause

Route Params

  • processorId, string, Required

Resume Processor

PUT /api/streams/(string: processorId)/resume

Route Params

  • processorId, string, Required

Update Processor Runners

PUT /api/streams/(string: processorId)/scale/(int: numberOfRunners)

Route Params

  • processorId, string, Required
  • numberOfRunners, int, Required

Delete Processor

DELETE /api/streams/(string: processorId)

Route Params

  • processorId, string, Required

Access Control Lists API

Kafka Access Control Lists can be viewed and set via the REST endpoints.

Create/Update

PUT /api/acl

Data Params

  • resourceType, string, required
  • resourceName, string, required
  • principal, string, required
  • permissionType, string, required (either Allow or Deny)
  • host, string, required
  • operation, string, required

ACL Operations

The following operations are valid (depending on the Kafka version)

Resource Type Operation
Topic Read
Topic Write
Topic Describe
Topic Delete
Topic DescribeConfigs
Topic AlterConfigs
Topic All
Group Read
Group Describe
Group All
Cluster Create
Cluster ClusterAction
Cluster DescribeConfigs
Cluster AlterConfigs
Cluster IdempotentWrite
Cluster Alter
Cluster Describe
Cluster All
TransactionalId Describe
TransactionalId Write
TransactionalId All

Example Request

Allow read from any host on topic transactions for principal principalType:principalName.

{
    "resourceType": "Topic",
    "resourceName": "transactions",
    "principal": "principalType:principalName" ,
    "permissionType": "Allow",
    "host": "*",
    "operation": "Read"
}
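
A minimal curl sketch for the request above, reusing the HOST and TOKEN variables from the login example:

curl -X PUT ${HOST}/api/acl \
     -H "X-Kafka-Lenses-Token:${TOKEN}" \
     -H "Content-Type:application/json" \
     -d '{"resourceType":"Topic","resourceName":"transactions","principal":"principalType:principalName","permissionType":"Allow","host":"*","operation":"Read"}'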

Get ACLs

GET /api/acl

Example Response

[{
    "resourceType": "Topic",
    "resourceName": "transactions",
    "principal": "principalType:principalName" ,
    "permissionType": "Allow",
    "host": "*",
    "operation": "Read"
}]

Delete ACLs

DELETE /api/acl

Example Request

{
    "resourceType": "Topic",
    "resourceName": "transactions",
    "principal": "principalType:principalName" ,
    "permissionType": "Allow",
    "host": "*",
    "operation": "Read"
}

Alert API

Alerts are divided into two categories:

  • Infrastructure - These are out of the box alerts that can be toggled on and off
  • Consumer group - These are user-defined alerts on consumer groups

Alert notifications are the result of an Alert Setting Condition being met on an Alert Setting.

Get All Alert Settings

Retrieves the configured alert settings

GET /api/alerts/settings

The response groups the alert settings by category. Each AlertSetting has the following structure:

  • conditions is the condition to be met for the alert to be fired. Type of a Map[string][string], can be empty
  • conditionRegex is the regular expression to validate the condition set based on the conditionTemplate. Type of string, can be empty
  • description is the description of the alert. Type of string, cannot be empty
  • enabled is a boolean flag to indicate if this alert is enabled
  • docs is a documentation link. Type of string, can be empty
  • id is the id of this alert. Type of int, must be positive
  • category is the category of the alert, either Infrastructure or Consumers
  • isAvailable indicates whether the alert is available; some alerts need JMX enabled (if JMX is not available, those alerts are disabled). Type of boolean, true/false
  • conditionTemplate is an optional string template for the condition, e.g. lag >= $Threshold-Number on group $Consumer-Group and topic $Topic-Name

Example Response

{
  "categories": {
    "Infrastructure": [
      {
        "id": 1,
        "description": "License is invalid",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1000,
        "description": "Kafka Broker is down",
        "category": "Infrastructure",
        "enabled": false,
        "isAvailable": true
      },
      {
        "id": 1001,
        "description": "Zookeeper Node is down",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1002,
        "description": "Connect Worker is down",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1003,
        "description": "Schema Registry is down",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1004,
        "description": "Alert Manager is down",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1005,
        "description": "Under replicated partitions",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1006,
        "description": "Partitions offline",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1007,
        "description": "Active Controllers",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1008,
        "description": "Multiple Broker Versions",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1009,
        "description": "File-open descriptors high capacity on Brokers",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1010,
        "description": "Average % the request handler is idle",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1011,
        "description": "Fetch requests failure",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1012,
        "description": "Produce requests failure",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1013,
        "description": "Broker disk usage is greater than the cluster average",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1014,
        "description": "Leader Imbalance",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      }
    ],
    "Consumers": [
      {
        "id": 2000,
        "description": "Consumer Lag exceeded",
        "category": "Consumers",
        "enabled": true,
        "docs": "Raises an alert when the consumer lag
exceeds the threshold",
        "conditionTemplate": "lag >= $Threshold-Number
on group $Consumer-Group and topic $Topic-Name",
        "conditionRegex": "lag >= ([1-9][0-9]*) on group (\\b\\S+\\b) and topic (\\b\\S+\\b)",
        "conditions": {
          "28bbad2b-69bb-4c01-8e37-28e2e7083aa9": "lag
>= 100000 on group group and topic topicA",
          "4318a5a7-32e4-43af-9f3f-438e64d14a11": "lag
>= 1000000 on group consumerA and topic topicA",
          "92dd89a1-83c0-4251-8610-36fce780a824": "lag
>= 1000000 on group minikube.default.london-keyword-count and topic reddit_posts",
          "bdb01792-b79f-44d0-a39e-7f3e0906a89b": "lag
>= 100000 on group connect-nullsink and topic topicA",
          "f4dd550f-bcb6-4e02-b28b-18bbd426aaf2": "lag
>= 100000 on group groupA and topic topicA"
        },
        "isAvailable": true
      }
    ]
  }
}

Get an Alert Setting

Retrieves the specified alert setting

GET /api/alerts/settings/{alert_setting_id: Int}

Route Params

  • alert_setting_id is the alert setting id to retrieve. Type of Int

For more information on available identifiers, see: Alert Identifiers .

The response is AlertSetting.

  • conditions is the condition to be met for the alert to be fired. Type of a Map[string][string], can be empty
  • conditionRegex is the regular expression to validate the condition set based on the conditionTemplate. Type of string, can be empty
  • description is the description of the alert. Type of string, cannot be empty
  • enabled is a boolean flag to indicate if this alert is enabled
  • docs is a documentation link. Type of string, can be empty
  • id is the id of this alert. Type of int, must be positive
  • category is the category of the alert, either Infrastructure or Consumers
  • isAvailable indicates whether the alert is available; some alerts need JMX enabled (if JMX is not available, those alerts are disabled). Type of boolean, true/false
  • conditionTemplate is an optional string template for the condition, e.g. lag >= $Threshold-Number on group $Consumer-Group and topic $Topic-Name

Example Response

{
  "id": 2000,
  "description": "Consumer Lag exceeded",
  "category": "Consumers",
  "enabled": true,
  "docs": "Raises an alert when the consumer lag exceeds the threshold",
  "conditionTemplate": "lag >= $Threshold-Number on group $Consumer-Group and topic $Topic-Name",
  "conditionRegex": "lag >= ([1-9][0-9]*) on group (\\b\\S+\\b) and topic (\\b\\S+\\b)",
  "conditions": {
    "28bbad2b-69bb-4c01-8e37-28e2e7083aa9": "lag >= 100000 on group group and topic topicA",
    "4318a5a7-32e4-43af-9f3f-438e64d14a11": "lag >= 1000000 on group consumerA and topic topicA",
    "92dd89a1-83c0-4251-8610-36fce780a824": "lag >= 1000000 on group minikube.default.london-keyword-count and topic reddit_posts",
    "bdb01792-b79f-44d0-a39e-7f3e0906a89b": "lag >= 100000 on group connect-nullsink and topic topicA",
    "f4dd550f-bcb6-4e02-b28b-18bbd426aaf2": "lag >= 100000 on group groupA and topic topicA"
  },
  "isAvailable": true
}

Enable an Alert Setting

Enables an alert setting

PUT /api/alerts/settings/{alert_setting_id: Int}

Route Params

  • alert_setting_id is the alert setting to enable. Type of Int

For more information on available identifiers, see: Alert Identifiers .

Get Alert Setting Conditions

Retrieves the conditions for an alert setting

GET /api/alerts/settings/{alert_setting_id: Int}/condition

Route Params

  • alert_setting_id the alert setting id to get the condition for. Type of Int

For more information on available identifiers, see: Alert Identifiers .

Example Response

{
    "28bbad2b-69bb-4c01-8e37-28e2e7083aa9": "lag >= 100000 on group group and topic topicA",
    "4318a5a7-32e4-43af-9f3f-438e64d14a11": "lag >= 1000000 on group consumerA and topic topicA",
    "92dd89a1-83c0-4251-8610-36fce780a824": "lag >= 1000000 on group minikube.default.london-keyword-count and topic reddit_posts",
    "bdb01792-b79f-44d0-a39e-7f3e0906a89b": "lag >= 100000 on group connect-nullsink and topic topicA",
    "f4dd550f-bcb6-4e02-b28b-18bbd426aaf2": "lag >= 100000 on group groupA and topic topicA"
}

Create/Update an Alert Setting Condition

POST /api/alerts/settings/{alert_setting_id: Int}/condition/{condition_uuid: String}

Route Params

  • alert_setting_id is the alert setting id to set the condition for. Type of Int
  • condition_uuid is the condition id to update. Type of String

The request body is a plain-text expression for the specific condition.

For more information on available identifiers, see: Alert Identifiers .

Example Request: lag >= 100000 on group group and topic topicA
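
A minimal curl sketch for the request above, assuming the condition is sent as a plain-text body; HOST and TOKEN come from the login example, and the setting id and condition UUID are placeholders taken from the earlier examples:

# create/update a consumer-lag condition for alert setting 2000
curl -X POST "${HOST}/api/alerts/settings/2000/condition/28bbad2b-69bb-4c01-8e37-28e2e7083aa9" \
     -H "X-Kafka-Lenses-Token:${TOKEN}" \
     -H "Content-Type:text/plain" \
     -d 'lag >= 100000 on group group and topic topicA'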

Delete an Alert Setting Condition

DELETE /api/alerts/settings/{alert_setting_id: Int}/condition/{condition_uuid: String}

Route Params

  • alert_setting_id is the alert setting id to delete the condition for. Type of Int
  • condition_uuid the condition id to delete. Type of String

For more information on available identifiers, see: Alert Identifiers .

Get All Alert Notifications

GET /api/alerts

The response is List of Alerts.

  • labels is a list of key-value pairs. It will contain at least severity
  • annotations is a list of key-value pairs; its keys include summary, source, and docs
  • startsAt is the time, in ISO format, for when the alert starts
  • endsAt is the time the alert ended at
  • generatorURL is a unique URL identifying the creator of this alert. It matches AlertManager requirements for providing this field
  • alertId is a unique identifier for the setting corresponding to this alert. See id in the alert settings API.

Example Response

[{
    "endsAt": null,
    "startsAt": "2018-03-27T21:23:23.634+02:00",
    "alertId": 1000,
    "annotations": {
        "source": "",
        "summary": "Broker on 1 is down"
    },
    "labels": {
        "category": "Infrastructure",
        "severity": "HIGH",
        "instance": "instance101"
    },
    "generatorURL": "http://lenses"
}]

Alert Identifiers

License

Alert Identifier Description
1 License is invalid

Infrastructure

Alert Identifier Description
1000 Kafka Broker is down
1001 Zookeeper Node is down
1002 Connect Worker is down
1003 Schema Registry is down
1004 Alert Manager is down
1005 Under replicated partitions
1006 Partitions offline
1007 Active Controllers
1008 Multiple Broker Versions
1009 File-open descriptors high capacity on Brokers
1010 Average % the request handler is idle
1011 Fetch requests failure
1012 Produce requests failure
1013 Broker disk usage is lower than the average of the cluster
1014 Leader Imbalance

Consumers

Alert Identifier Description
2000 Consumer Lag exceeded

Connect

Alert Identifier Description
3000 Connector deleted

Topics

Alert Identifier Description
4000 Topic has been created
4001 Topic has been deleted

Alert SSE API

Alert notifications can be pushed in real time to the client via a Server-Sent Events endpoint.

GET /api/sse/alerts

The response is a text/event-stream of Alerts. Keep-alive (empty) heartbeats are sent every 10 seconds. Each alert is sent as data:{alertId: …, labels: …}, i.e. the Alert follows the data: prefix. An empty data: message means stop; if a received message is empty or shorter than 5 characters, ignore it, it is the heartbeat.

  • labels is a list of key-value pairs. It will contain at least severity
  • annotations is a list of key-value pairs; its keys include summary, source, and docs
  • startsAt is the time, in ISO format, for when the alert starts
  • endsAt is the time the alert ended at
  • generatorURL is a unique URL identifying the creator of this alert. It matches AlertManager requirements for providing this field
  • alertId is a unique identifier for the setting corresponding to this alert. See id in the alert settings API.

Example Response

data:{
    "endsAt": null,
    "startsAt": "2018-03-27T21:23:23.634+02:00",
    "alertId": 1000,
    "annotations": {
        "source": "",
        "summary": "Broker on 1 is down"
    },
    "labels": {
        "category": "Infrastructure",
        "severity": "HIGH",
        "instance": "instance101"
    },
    "generatorURL": "http://lenses"
}
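
A rough way to watch this stream from the command line, reusing the HOST and TOKEN variables from the login example (curl -N disables buffering so events appear as they arrive):

curl -N -s "${HOST}/api/sse/alerts" \
     -H "X-Kafka-Lenses-Token:${TOKEN}" \
     -H "Accept: application/json, text/event-stream"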

Prometheus API

Prometheus can be used to poll metric information from infrastructure services.

Some important metrics, such as consumer lag, are not exposed by Kafka. Lenses provides an additional Prometheus endpoint, available at http://lenseshost:port/metrics, that can be added to the Prometheus targets in order to bring in this additional critical monitoring information. Authentication is not required, so that Prometheus can freely poll this API.

GET /metrics

The response is a list of Prometheus entries for the consumer lag per partition and the aggregated lag per topic.

lenses_partition_consumer_lag{topic="iot_data",partition="3",consumerGroup="my.group.a"} 537
lenses_topic_consumer_lag{topic="iot_data",consumerGroup="my.group.a"} 4176
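
Since authentication is not required, the endpoint can be checked directly; the host and port below are placeholders for your Lenses instance:

# fetch the Lenses metrics and keep only the consumer lag entries
curl -s http://lenseshost:port/metrics | grep consumer_lag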

Quota API

Quotas can be viewed and set via the REST endpoints.

Get Quotas

GET /api/quotas

The response is List of Quotas.

  • entityType can be either CLIENT, CLIENTS, CLIENTS DEFAULT, USER, USERS, USERCLIENT or USERS DEFAULT
  • entityName is the Kafka client id for CLIENT and CLIENTS, and the user name for USER, USERS and USERCLIENT
  • child is optional, is only present for entityType USERCLIENT, and holds the client id
  • properties is a map of the quota constraints: producer_byte_rate, consumer_byte_rate and request_percentage
  • url is the URL for this quota in Lenses

Example Response

[
    {
        "entityType": "CLIENT",
        "entityName" : "my-client-id",
        "properties": {
                        "producer_byte_rate" : "100000",
                        "consumer_byte_rate" : "200000",
                        "request_percentage" : "75"
                        },
        "url" : "/api/quotas/clients"
    }
]

Create/Update Quota - All Users

Default for all users.

PUT /api/quotas/users

Data Params

  • config The quota constraints

Example Request

{
    "producer_byte_rate" : "100000",
    "consumer_byte_rate" : "200000",
    "request_percentage" : "75"
}
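
A minimal curl sketch for the request above, reusing the HOST and TOKEN variables from the login example:

curl -X PUT ${HOST}/api/quotas/users \
     -H "X-Kafka-Lenses-Token:${TOKEN}" \
     -H "Content-Type:application/json" \
     -d '{"producer_byte_rate":"100000","consumer_byte_rate":"200000","request_percentage":"75"}'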

Create/Update Quota - User all Clients

Create/update for all client ids for a particular user.

PUT /api/quotas/users/(string: user)/clients

Route Params

  • user The user to set the quota for

Data Params

  • config The quota constraints

Example Request

{
    "producer_byte_rate" : "100000",
    "consumer_byte_rate" : "200000",
    "request_percentage" : "75"
}

Create/Update a Quota - User/Client pair

Quotas for a user and client id pair.

PUT /api/quotas/users/(string: user)/clients/(string: client-id)

Route Params

  • user The user to set the quota for
  • client-id The client id to set the quota for

Data Params

  • config The quota constraints

Example Request

{
    "producer_byte_rate" : "100000",
    "consumer_byte_rate" : "200000",
    "request_percentage" : "75"
}

Create/Update a Quota - User

Quota for a user.

PUT /api/quotas/users/(string: user)

Route Params

  • user The user to set the quota for

Data Params

  • config The quota constraints

Example Request

{
    "producer_byte_rate" : "100000",
    "consumer_byte_rate" : "200000",
    "request_percentage" : "75"
}

Create/Update Quota - All Clients

Default for all clients.

PUT /api/quotas/clients

Data Params

  • config The quota constraints

Example Request

{
    "producer_byte_rate" : "100000",
    "consumer_byte_rate" : "200000",
    "request_percentage" : "75"
}

Create/Update a Quota - Client

Quotas for a client id.

PUT /api/quotas/clients/(string: client-id)

Route Params

  • client-id The client id to set the quota for

Data Params

  • config The quota constraints

Example Request

{
    "producer_byte_rate" : "100000",
    "consumer_byte_rate" : "200000",
    "request_percentage" : "75"
}

Delete Quota - All Users

Delete default for all users.

DELETE /api/quotas/users

Data Params

  • list of properties to be removed from the quota’s config

Example Request

[
    "producer_byte_rate",
    "consumer_byte_rate",
    "request_percentage"
]

Delete Quota - User all Clients

Delete for all client ids for a user.

DELETE /api/quotas/users/(string: user)/clients

Route Params

  • user The user to delete the quota for

Data Params

  • list of properties to be removed from the quota’s config

Example Request

[
    "producer_byte_rate",
    "consumer_byte_rate",
    "request_percentage"
]

Delete a Quota - User/Client pair

Delete quotas for a user and client id pair.

DELETE /api/quotas/users/(string: user)/clients/(string: client-id)

Route Params

  • user The user to delete the quota for
  • client-id The client id to delete the quota for

Data Params

  • list of properties to be removed from the quota’s config

Example Request

[
    "producer_byte_rate",
    "consumer_byte_rate",
    "request_percentage"
]

Delete a Quota - User

Delete a quota for a user.

DELETE /api/quotas/users/(string: user)

Route Params

  • user The user to delete the quota for

Data Params

  • list of properties to be removed from the quota’s config

Example Request

[
    "producer_byte_rate",
    "consumer_byte_rate",
    "request_percentage"
]

Delete Quota - All Clients

Delete defaults for all clients.

DELETE /api/quotas/clients

Data Params

  • list of properties to be removed from the quota’s config

Example Request

[
    "producer_byte_rate",
    "consumer_byte_rate",
    "request_percentage"
]

Delete a Quota - Client

Delete quotas for a client id.

DELETE /api/quotas/clients/(string client-id)

Route Params

  • client-id The client id to delete the quota for

Data Params

  • list of properties to be removed from the quota’s config

Example Request

[
    "producer_byte_rate",
    "consumer_byte_rate",
    "request_percentage"
]

Quotas Precedence

Quotas specified using the above REST endpoints create entries in ZooKeeper. The ZooKeeper paths dictate the following order of precedence:

  1. /config/users/<user>/clients/<client-id>
  2. /config/users/<user>/clients/<default>
  3. /config/users/<user>
  4. /config/users/<default>/clients/<client-id>
  5. /config/users/<default>/clients/<default>
  6. /config/users/<default>
  7. /config/clients/<client-id>
  8. /config/clients/<default>

Tip

The highest-priority quota is always enforced. For example, if clientA has a client quota of 10MB and userA has a user quota of 20MB, and an application runs as userA with client id clientA, then the effective quota is 20MB because the user quota takes precedence.

Connector API

Kafka Connect APIs are proxied via Lenses. If multiple Connect clusters are managed by Lenses, you will need to include the alias used for the cluster, as per your Lenses Configuration.

# List active connectors
GET /api/proxy-connect/(string: clusterAlias)/connectors

# Create new connector
POST /api/proxy-connect/(string: clusterAlias)/connectors [CONNECTOR_CONFIG]

# Get information about a specific connector
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)

# Get connector config
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/config

# Set connector config
PUT /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/config

# Get connector status
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/status

# Pause a connector
PUT /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/pause

# Resume a paused connector
PUT /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/resume

# Restart a connector
POST /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/restart

# Get list of connector tasks
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/tasks

# Get current status of a task
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/tasks/(string: task_id)/status

# Restart a connector task
POST /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/tasks/(string: task_id)/restart

# Remove a running connector
DELETE /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)

# List available connector plugins
GET /api/proxy-connect/(string: clusterAlias)/connector-plugins
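
For instance, a new connector can be created through the proxy as shown below; dev is a placeholder cluster alias, the file source configuration is only an illustration, and HOST and TOKEN come from the login example:

# create a connector on the Connect cluster aliased "dev"
curl -X POST ${HOST}/api/proxy-connect/dev/connectors \
     -H "X-Kafka-Lenses-Token:${TOKEN}" \
     -H "Content-Type:application/json" \
     -d '{
           "name": "file-source-example",
           "config": {
             "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
             "tasks.max": "1",
             "file": "/tmp/input.txt",
             "topic": "topicA"
           }
         }'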

Schemas API

The Schema Registry proxy APIs interact directly with the Schema Registry services, iterating over the available hosts and, in case of a failure, retrying with the next available one.

# List all available subjects
GET /api/proxy-sr/subjects

# List all versions of a particular subject
GET /api/proxy-sr/subjects/(string: subject)/versions

# Delete a subject and associated compatibility level
DELETE /api/proxy-sr/subjects/(string: subject)

# Get the schema for a particular subject id
GET /api/proxy-sr/schemas/ids/{int: id}

# Get the schema at a particular version
# - versionId string "latest" or numeric values between 1 and 2^31-1
#   The string “latest” refers to the last registered schema under the specified subject.
#   Note that there may be a new latest schema that gets registered right after this request is served.
GET /api/proxy-sr/subjects/(string: subject)/versions/(versionId: version)

# Register a new schema under a particular subject
POST /api/proxy-sr/subjects/(string: subject)/versions

# Delete a particular version of a subject
DELETE /api/proxy-sr/subjects/(string: subject)/versions/(versionId: version)

# Update global compatibility level
PUT /api/proxy-sr/config

# Get global compatibility level
GET /api/proxy-sr/config

# Change compatibility level of a subject
PUT /api/proxy-sr/config/(string: subject)

# Get compatibility level of a subject
GET /api/proxy-sr/config/(string: subject)
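
For example, registering a new schema version under a subject goes through the same proxy; the subject name and schema below are illustrative only, the request body follows the standard Schema Registry format, and HOST and TOKEN come from the login example:

# register a new Avro schema version for subject "topicA-value"
curl -X POST ${HOST}/api/proxy-sr/subjects/topicA-value/versions \
     -H "X-Kafka-Lenses-Token:${TOKEN}" \
     -H "Content-Type:application/vnd.schemaregistry.v1+json" \
     -d '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}"}'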

Tip

You can still use the Schema Registry and Kafka Connect REST endpoints directly. Lenses simply proxies the queries to the correct endpoint, while also capturing the relevant auditing information.

Logs API

GET /api/logs/INFO
GET /api/logs/METRICS

Example Response

A list of Log lines.

[
    {
        "timestamp": 1532570466516,
        "logger": "akka.actor.ActorSystemImpl",
        "stacktrace": "",
        "thread": "default-akka.actor.default-dispatcher-3",
        "message": "Request: POST->http://localhost:24015/api/login returned 200 OK in 3ms",
        "time": "2018-07-26 04:01:06.516",
        "level": "INFO"
    },
    {
        "timestamp": 1532570466601,
        "logger": "akka.actor.ActorSystemImpl",
        "stacktrace": "",
        "thread": "default-akka.actor.default-dispatcher-3",
        "message": "Request: GET->http://localhost:24015/api/auth returned 200 OK in 2ms",
        "time": "2018-07-26 04:01:06.601",
        "level": "INFO"
    }
]

Note

The data structure is the same for both INFO and METRICS. Follow the Go implementation for more details.

Permissions Matrix

Data API

Method - Endpoint Admin Write Read NoData
GET /api/sql
GET /api/sql/data/ws
GET /api/sql/validation
GET /api/sql/active      
GET /api/sql/cancel?sql={id}      

Topic API

Method - Endpoint Admin Write Read NoData
GET /api/topics/defaultConfigs
GET /api/topics/{topicName}/brokerConfigs
POST /api/topics/    
PUT /api/topics/config    
DELETE /api/topics/{topicName}      
GET /api/topics/{topicName}/partitions
GET /api/topics/{topicName}
GET /api/topics/availableConfigKeys

Processor API

Method - Endpoint Admin Write Read NoData
GET /api/streams
GET /api/validation/streams
POST /api/streams    
GET /api/streams/{id}
GET /api/streams/{id}/metrics
DELETE /api/streams/{id}    
PUT /api/streams/{id}/pause    
PUT /api/streams/{id}/resume    
PUT /api/streams/{id}/scale/{scale}    

Connector API

Method - Endpoint Admin Write Read NoData
GET /api/proxy-connect/{clusterAlias}/connectors
POST /api/proxy-connect/{clusterAlias}/connectors [CONNECTOR_CONFIG]    
GET /api/proxy-connect/{clusterAlias}/connectors/{connectorName}
GET /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/config
PUT /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/config    
GET /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/status
PUT /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/pause    
PUT /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/resume    
POST /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/restart    
GET /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/tasks
GET /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/tasks/{task_id}/status
POST /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/tasks/{task_id}/restart    
DELETE /api/proxy-connect/{clusterAlias}/connectors/{connectorName}    
GET /api/proxy-connect/{clusterAlias}/connector-plugins

Schema API

Method - Endpoint Admin Write Read NoData
GET /api/proxy-sr/subjects
GET /api/proxy-sr/subjects/{subject}/versions
DELETE /api/proxy-sr/subjects/{subject}    
GET /api/proxy-sr/schemas/ids/{id}
GET /api/proxy-sr/subjects/{subject}/versions/{version}
POST /api/proxy-sr/subjects/{subject}/versions    
DELETE /api/proxy-sr/subjects/{subject}/versions/{version}    
PUT /api/proxy-sr/config    
GET /api/proxy-sr/config
PUT /api/proxy-sr/config/{subject}    
GET /api/proxy-sr/config/{subject}

ACL API

Method - Endpoint Admin Write Read NoData
GET /api/acl
PUT /api/acl    
DELETE /api/acl    

Quota API

Method - Endpoint Admin Write Read NoData
GET /api/quotas  
PUT /api/quotas/users    
DELETE /api/quotas/users    
PUT /api/quotas/clients    
DELETE /api/quotas/clients    
PUT /api/quotas/users/{userName}    
DELETE /api/quotas/users/{userName}    
PUT /api/quotas/clients/{clientId}    
DELETE /api/quotas/clients/{clientId}    
PUT /api/quotas/users/{userName}/clients    
DELETE /api/quotas/users/{userName}/clients    
PUT /api/quotas/users/{userName}/clients/{clientId}    
DELETE /api/quotas/users/{userName}/clients/{clientId}    

Alerts API

Method - Endpoint Admin Write Read NoData
GET /api/alerts
POST /api/alerts    
GET /api/alerts/settings    
GET /api/alerts/settings/{id}
PUT /api/alerts/settings/{id}    
GET /api/alerts/settings/{id}/condition
POST /api/alerts/settings/{id}/condition    
DELETE /api/alerts/settings/{id}/condition/{id}    

Security Concerns

We recommend protecting direct access to the underlying REST APIs (i.e. Schema Registry, Kafka Connect, Kubernetes) with firewall rules, and using the Lenses APIs instead to take advantage of fine-grained secure access policies.