Clients / Python

The Lenses Python library is a client that enables Python developers and data scientists to take advantage of the REST and WebSocket endpoints Lenses exposes. Users can:

  • Manage topics
  • Manage schemas
  • Manage processors
  • Manage connectors
  • Browse topic data via SQL
  • Subscribe to live continuous SQL queries

Installation

Dependencies

Install Pip3

curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py

python3 get-pip.py
rm -f get-pip.py

Install Lenses Python

Python 3 must already be installed on your system. Clone the Python repo and execute the following command inside the repo:

python3 setup.py install

Alternatively, install with pip3:

pip3 install lenses_python

Getting started

Basic authentication

First, we import the Python 3 client:

from lenses_python.lenses import lenses
data = lenses(<url>, <username>, <password>)

Next, get the credentials and roles for our account:

data.GetCredentials()

Example code:

data=lenses("http://127.0.0.1:3030","admin","admin")
data.GetCredentials()
{
  'schemaRegistryDelete': True,
   'success': True,
   'token': 'c439f615-e511-4dfd-863b-76ad285b7572',
   'user': {
     'email': None,
     'id':'admin',
     'name': 'Lenses Admin',
     'roles': ['admin', 'read', 'write', 'nodata']
   }
}

Kerberos authentication

In this case we connect with Kerberos. First, install kinit and set up the Kerberos configuration in /etc/krb5.conf. Then run the kinit utility to create a Kerberos ticket, which the Python 3 library will later retrieve.

from lenses_python.lenses import lenses
data = lenses(url=<url>, kerberos_mode=1)

SQL Data Handlers

Using the Lenses SQL engine, we can call the Lenses REST endpoints to execute SQL queries on Kafka topics. This is achieved using the SqlHandler.

data.SqlHandler(<query>, <optional is_extract_pandas>, <optional stats>, <optional datetimelist>, <optional formatinglist>)

If the is_extract_pandas parameter is not supplied, the output is in JSON format; if it is set to True, the output is a pandas data frame. The stats parameter is an integer. When the output is a pandas data frame, we can use the two optional arguments datetimelist and formatinglist to convert datetime strings to datetime objects.

  • datetimelist - is a list which contains all the keys including datetime strings
  • formatinglist - is a list which contains the date format for each element in the list datetimelist

If all the dates share the same format, put only one entry in formatinglist.

For more information about the format codes, see http://strftime.org/.

Example using the datetimelist and formatinglist arguments; is_extract_pandas is set to True so the result is a pandas data frame, and the two lists are passed by keyword using the parameter names from the signature above:

data.SqlHandler(
  'SELECT * FROM `nyc_yellow_taxi_trip_data`',
  True,
  datetimelist=['tpep_dropoff_datetime'],
  formatinglist=['%Y-%m-%d %H:%M:%S'])

Managing Topics

Topics Information

To list topics:

data.TopicsNames()

To obtain the information for a specific topic, returned as a dictionary:

data.TopicInfo(<topic name>)

If we want to list all topic information, we execute the following command:

data.GetAllTopics()

The output is a list of JSON records containing the information for all topics.
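
For example, a minimal sketch that lists the topic names and prints the details of the first one (assuming at least one topic exists):

topics = data.TopicsNames()           # list of topic name strings
if topics:
    info = data.TopicInfo(topics[0])  # dictionary with the topic's metadata
    print(info)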

Update topic configuration

data.UpdateTopicConfig(<topic name>, <optional configuration>, <optional filename>)

Example configuration:

{
  "configs": [
    {
      "key": "cleanup.policy",
      "value": "compact"
    }
  ]
}

There are three options for setting the parameters:

  1. Topic name and configuration argument (second). This will update the topic with the provided configuration; a sketch follows after this list.
  2. Topic name and configuration file (third). This will update the topic with the configuration from the file.
  3. Provide only the configuration file, with the topic name and config in the default section. This will load and update the topics specified in the configuration file. For example:
[Default]
topicname: my-topic
config:{
  "configs": [
    {
      "key": "cleanup.policy",
      "value": "compact"
    }
  ]
}
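
A minimal sketch of option 1, passing the configuration inline (the topic name is illustrative):

config = {"configs": [{"key": "cleanup.policy", "value": "compact"}]}
data.UpdateTopicConfig("my-topic", config)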

Create new topic

data.CreateTopic(<topic name>, <replication>, <partitions>, <optional configuration>, <optional filename>)

Example of configuration:

{
  "cleanup.policy": "compact",
  "compression.type": "snappy"
}

There are three options for setting the parameters:

  1. Topic name, replication, partitions, and a configuration argument (fourth). This will create the topic with the provided configuration.
  2. Topic name, replication, partitions, and a configuration file (fifth). This will create the topic with the configuration from the file.
  3. Provide only the configuration file with the topic name and config in the default section. This will create the topics specified in the configuration file.

Example

config = {"cleanup.policy": "compact","compression.type": "snappy"}
data.CreateTopic("test-topic",1,1,config)

Delete topic

data.DeleteTopic(<topic name>)

Delete topic records

This deletes records from a given topic. Provide the topic name, the partition, and the offset up to which records should be deleted. For example, to delete the first 100 records of partition 0 of topicA:

data.DeleteTopicRecords("topicA", "0", "100")

Managing SQL Processors

Using Lenses we can deploy and manage SQL processors.

Create new Processor

data.CreateProcessor(<processor name>, <sql query>, <runners>, <cluster name>, <optional namespace>, <optional pipeline>)

The parameters are:

  • sql query - The Lenses SQL to run
  • runners - The number of runners to spawn
  • cluster name - The cluster name, either the Connect cluster name or the Kubernetes cluster name; use IN_PROC for in-process mode
  • optional namespace - Kubernetes namespace, only applicable in Kubernetes mode
  • optional pipeline - Kubernetes pipeline tag, only applicable in Kubernetes mode

Example

data.CreateProcessor("new-processor",
  "SET autocreate=true;INSERT INTO topicB SELECT * FROM topicA",
  1,
  "dev",
  "ns",
  "pipeline")

On successful registration, Lenses returns an id for the processor, for example lsql_818a158c50a24d71952652ab49e75637.

Deleting a Processor

data.DeleteProcessor(<processor name>)

Resume a Processor

data.ResumeProcessor(<processor name>)

Pause a Processor

data.PauseProcessor(<processor name>)

Scale a Processor

Scaling a processor involves changing the number of runners: threads in IN_PROC mode, Connect tasks in CONNECT mode, or pods in KUBERNETES mode.

data.UpdateProcessor(<processor name>, <number of runners>)
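
A minimal lifecycle sketch, reusing the processor name from the earlier example:

data.PauseProcessor("new-processor")      # pause the processor
data.ResumeProcessor("new-processor")     # resume it
data.UpdateProcessor("new-processor", 2)  # scale to 2 runners
data.DeleteProcessor("new-processor")     # remove it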

Managing Schemas

Best practice is to use AVRO as the message format for your data. Lenses supports schema registries from both Confluent and Hortonworks (compatibility mode 0).

List all Subjects

data.GetAllSubjects()

List Subject Versions

data.ListVersionsSubj(<name of subject>)

Get a Schema by ID

data.GetSchemaById(<schema id>)

Register new Schema

data.RegisterNewSchema(<name of schema>, <optional schema>, <optional filename>)

There are three options for setting the parameters:

  1. Subject name and schema argument (second). This will register the provided schema under the subject; a sketch follows after the examples below.
  2. Subject name and schema file (third). This will register the schema from the file under the subject.
  3. Provide the schema file only. This will register the schema specified in the configuration file.

Example of schema configuration; note that the schema itself is passed as a JSON-encoded string, so its inner quotes are escaped:

{"schema": "{
  "type":"record",
   "name":"reddit_post_key",
   "namespace":"com.landoop.social.reddit.post.key",
   "fields":[
     {
       "name":"subreddit_id",
       "type":"string"
     }
   ]
 }"
}

Example file configuration, no schema name:

[Default]
{
  "schema": "{\"type\":\"record\",\"name\":\"reddit_post_key\",\"namespace\":\"com.landoop.social.reddit.post.key\",\"fields\":[{\"name\":\"subreddit_id\",\"type\":\"string\"}]}"
}
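
A minimal sketch of option 1, building the payload in Python; json.dumps encodes the schema as the JSON string the payload expects, and the subject name is illustrative:

import json

schema = {
    "type": "record",
    "name": "reddit_post_key",
    "namespace": "com.landoop.social.reddit.post.key",
    "fields": [{"name": "subreddit_id", "type": "string"}],
}
data.RegisterNewSchema("reddit_post_key", {"schema": json.dumps(schema)})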

Get Global Compatibility

data.GetGlobalCompatibility()

Get Compatibility of a Subject

data.GetCompatibility(<subject name>)

Delete specific Subjects

data.DeleteSubj(<name of subject>)

Delete Schema by Version

data.DeleteSchemaByVersion(<name of subject>, <version of subject>)

Change Compatibility of Subject

data.ChangeCompatibility(<name of subject>, <optional compatibility>, <optional filename>)

Example of compatibility

{'compatibility': 'BACKWARD'}

There are three options for setting the parameters:

  1. Subject name and compatibility (second). This will update the subject with the provided compatibility level; a sketch follows below.
  2. Subject name and compatibility file (third). This will update the subject with the compatibility from the file.
  3. Provide only the compatibility file. This will set the subject to the compatibility specified in the configuration file.

For example:

[Default]
compatibility:{"compatibility": "BACKWARD"}
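
A minimal sketch of option 1, setting a subject's compatibility inline (the subject name is illustrative):

data.ChangeCompatibility("reddit_post_key", {"compatibility": "BACKWARD"})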

Update Global Compatibility

This command updates the compatibility on the Schema Registry servers:

data.UpdateGlobalCompatibility(<optional compatibility>, <optional filename>)

Example

{'compatibility': 'BACKWARD'}

There are two options for setting the parameters:

  1. Compatibility argument. This will update the schema registries with the provided compatibility level.
  2. Provide only the compatibility file. This will set the schema registry to the compatibility specified in the configuration file.

For example:

[Default]
compatibility:{"compatibility": "BACKWARD"}

Managing Connectors

Lenses allows you to manage Kafka Connect connectors to load and unload data from Kafka.

List Connectors

data.ListAllConnectors(<name of cluster>)

Get Connector Information

This command retrieves the status and configuration of a connector:

data.GetInfoConnector(<name of cluster>, <name of connector>)

Example

connectorsnames = data.ListAllConnectors("dev")
data.GetInfoConnector("dev", connectorsnames[0])

The output will be something like this:

{
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "file": "/var/log/broker.log",
    "name": "logs-broker",
    "tasks.max": "1",
    "topic": "logs_broker"
  },
  "name": "logs-broker",
  "tasks": [{"connector": "logs-broker", "task": 0}]
}

Get Connector Configuration

data.GetConnectorConfig(<name of cluster>, <name of connector>)

Example

connectors = data.ListAllConnectors("dev")
data.GetConnectorConfig("dev", connectors[0])

Example output:

{
  "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
  "file": "/var/log/broker.log",
  "name": "logs-broker",
  "tasks.max": "1",
  "topic": "logs_broker"
}

Get Connector Status

data.GetConnectorStatus(<name of cluster>, <name of connector>)

Example

connectors = data.ListAllConnectors("dev")
data.GetConnectorStatus("dev", connectors[0])

Example output:

{
  "connector" : { "state": "RUNNING", "worker_id": "172.17.0.3:8083" },
  "name" : "logs-broker",
  "tasks" : [{ "id": 0, "state": "RUNNING", "worker_id": "172.17.0.3:8083" }]
}

Get Connector Tasks

data.GetConnectorTasks(<name of cluster>, <name of connector>)

Example

connectors = data.ListAllConnectors("dev")
data.GetConnectorTasks("dev", connectors[0])

Example of output:

[
  {
    "config": {
      "file": "/var/log/broker.log",
      "task.class": "org.apache.kafka.connect.file.FileStreamSourceTask",
      "topic": "logs_broker"
    },
    "id": {"connector": "logs-broker", "task": 0}
  }
]

Get Status of specific Task

data.GetStatusTask(<name of cluster>, <name of connector>, <task id>)

Restart Connector Task

data.RestartConnectorTask(<name of cluster>, <name of connector>, <task id>)

Create new Connector

data.CreateConnector(<name of cluster>, <optional configuration>, <optional filename>)

Example configuration:

{
  "config": {
    "connect.coap.kcql": "1",
    "connector.class": "com.datamountaineer.streamreactor.connect.coap.sink.CoapSinkConnector"
  },
  "name": "name"
}

There are three options for setting the parameters:

  1. Set the cluster and the config; the connector name is part of the config. A sketch follows after this list.
  2. Set the cluster and the file to load the connector config from.
  3. Set the configuration file name only, specifying the cluster and connector name in the default section.
[Default]
cluster: my-cluster
connector: my-connector

config: {
  "config": {
    "connect.coap.kcql": "1",
    "connector.class": "com.datamountaineer.streamreactor.connect.coap.sink.coapsinkconnector"
  },
  "name": "name"
}
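
A minimal sketch of option 1, creating the connector with an inline configuration (the cluster and connector names are illustrative):

config = {
    "config": {
        "connect.coap.kcql": "1",
        "connector.class": "com.datamountaineer.streamreactor.connect.coap.sink.CoapSinkConnector",
    },
    "name": "coap-sink",
}
data.CreateConnector("dev", config)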

Set Connector Configuration

data.SetConnectorConfig(<name of cluster>, <name of connector>, <optional configuration>, <optional filename>)

Example configuration file:

{
  "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
  "task.max": 5,
  "topics": "nyc_yellow_taxi_trip_data,reddit_posts,sea_vessel_position_reports,telecom_italia_data",
  "file": "/dev/null",
  "name": "nullsink"
}

There are three options for setting the parameters:

  1. Set the cluster, name of the connector and the config; a sketch follows after this list.
  2. Set the cluster, name of the connector and the file to load the connector config from.
  3. Set the configuration file name only, specifying the cluster and connector name in the default section.
[Default]
cluster: my-cluster
connector: my-connector

config: {
  "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
  "task.max": 5,
  "topics": "nyc_yellow_taxi_trip_data,reddit_posts,sea_vessel_position_reports, telecom_italia_data",
  "file": "/dev/null",
  "name": "nullsink"
}
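
A minimal sketch of option 1, applying the configuration inline (the cluster name is illustrative):

config = {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": 5,
    "topics": "nyc_yellow_taxi_trip_data",
    "file": "/dev/null",
    "name": "nullsink",
}
data.SetConnectorConfig("dev", "nullsink", config)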

Delete a Connector

To delete a connector, call the DeleteConnector method, supplying the name of the cluster and the connector name.

data.DeleteConnector(<name of cluster>, <name of connector>)

Continuous Queries

Lenses allows clients to submit SQL and subscribe to the output of the query continuously via web sockets.

Subscribing

A client can SUBSCRIBE to a topic via SQL as follows:

data.SubscribeHandler(<url>, <client id>, <sql query>, <write>, <filename>, <print_results>, <optional datetimelist>, <optional formatinglist>)

Parameters:

  • url - The Lenses WebSocket endpoint to subscribe to
  • client id - A unique identifier of the client
  • sql - The Lenses SQL query to run
  • write - A boolean, defaulting to False. If True, incoming data is saved to a file whose name is given by the filename parameter
  • filename - Name of the file to store the output in
  • print_results - Whether to print the incoming data; defaults to False

The optional arguments datetimelist and formatinglist convert datetime strings to datetime objects:

  • datetimelist - is a list which contains all the keys including datetime strings
  • formatinglist - is a list which contains the date format for each element in the list datetimelist

If all the dates share the same format, put only one entry in formatinglist.

For more information about the format codes, see http://strftime.org/.
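
A minimal subscription sketch with the arguments passed positionally; the WebSocket URL, client id, and topic are illustrative assumptions:

data.SubscribeHandler(
    "ws://127.0.0.1:3030/api/kafka/ws",  # Lenses WebSocket endpoint (assumed)
    "python-client-1",                   # unique client id (illustrative)
    "SELECT * FROM `topicA`",            # continuous Lenses SQL query
    False,                               # write: do not persist to a file
    None,                                # filename: unused when write is False
    True)                                # print_results: print incoming records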

Publishing

A client can PUBLISH messages to a topic. The current version supports only string/JSON; support for AVRO will be added in the future.

data.Publish(<url>, <client id>, <topic>, <key>, <value>)

Unsubscribe

A client can UNSUBSCRIBE from a topic.

data.Unscribe(<url>, <client id>, <topic>)

Commit

A client can COMMIT the (topic, partition) offsets:

data.Commit(<url>, <client id>, <topic>, <partition>, <offset>)
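
For example, publishing a JSON message, committing offsets, and then unsubscribing (the URL, client id, and values are illustrative):

ws_url = "ws://127.0.0.1:3030/api/kafka/ws"  # Lenses WebSocket endpoint (assumed)
data.Publish(ws_url, "python-client-1", "topicA", "key1", '{"field": "value"}')
data.Commit(ws_url, "python-client-1", "topicA", "0", "100")
data.Unscribe(ws_url, "python-client-1", "topicA")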

ACLs Handler

Create/Update ACLs

data.SetAcl(<resourceType>, <resourceName>, <principal>, <permissionType>, <host>, <operation>)
  • resourceType, string, required
  • resourceName, string, required
  • principal, string, required
  • permissionType, string, required (either Allow or Deny)
  • host, string, required
  • operation, string, required

Example

data.SetAcl("Topic","transactions","GROUPA:UserA","Allow","*","Read")

The following operations are valid, depending on the Kafka version:

Resource Type    Operations
Topic            Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All
Group            Read, Describe, All
Cluster          Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All
TransactionalId  Describe, Write, All

Get ACLs

data.GetACL()

Returns a list of dictionaries.

Quota Handler

Get Quotas

data.GetQuotas()

Returns a list of dictionaries.

Create/Update Quota - All Users

data.SetQuotasAllUsers(config)
  • config The quota constraints.

Example of config

{
  "producer_byte_rate" : "100000",
  "consumer_byte_rate" : "200000",
  "request_percentage" : "75"
}
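
A minimal sketch applying this default quota to all users:

config = {
    "producer_byte_rate": "100000",
    "consumer_byte_rate": "200000",
    "request_percentage": "75",
}
data.SetQuotasAllUsers(config)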

Create/Update Quota - User all Clients

data.SetQuotaUserAllClients(user, config)

Where,

  • user The user to set the quota for
  • config The quota constraints

Example of config

{
  "producer_byte_rate" : "100000",
  "consumer_byte_rate" : "200000",
  "request_percentage" : "75"
}

Create/Update a Quota - User/Client pair

data.SetQuotaUserClient(user, clientid, config)

Where,

  • user The user to set the quota for
  • clientid The client id to set the quota for
  • config The quota constraints

Example of config

{
  "producer_byte_rate" : "100000",
  "consumer_byte_rate" : "200000",
  "request_percentage" : "75"
}

Create/Update a Quota - User

data.SetQuotaUser(user, config)

Where

  • user The user to set the quota for
  • config The quota constraints

Example of config

{
  "producer_byte_rate" : "100000",
  "consumer_byte_rate" : "200000",
  "request_percentage" : "75"
}

Create/Update Quota - All Clients

data.SetQuotaAllClient(config)

Where,

  • config The quota constraints

Example of config

{
  "producer_byte_rate" : "100000",
  "consumer_byte_rate" : "200000",
  "request_percentage" : "75"
}

Create/Update a Quota - Client

data.SetQuotaClient(clientid, config)
  • clientid The client id to set the quota for
  • config The quota constraints

Example of config

{
  "producer_byte_rate" : "100000",
  "consumer_byte_rate" : "200000",
  "request_percentage" : "75"
}

Delete Quota - All Users

data.DeleteQutaAllUsers(config)
  • config The list of quota settings to delete.

For example,

config=["producer_byte_rate","consumer_byte_rate"]

Delete Quota - User all Clients

data.DeleteQuotaUserAllClients(user, config)
  • user The user to set the quota for
  • config The list of quota settings to delete.

For example

config=["producer_byte_rate","consumer_byte_rate"]

Delete a Quota - User/Client pair

data.DeleteQuotaUserClient(user, clientid, config)
  • user The user to set the quota for
  • clientid The client id to set the quota for
  • config The list of quota settings to delete.

For example

config=["producer_byte_rate","consumer_byte_rate"]

Delete a Quota - User

data.DeleteQuotaUser(user, config)
  • user The user to set the quota for
  • config The list of quota settings to delete.

For example

config=["producer_byte_rate","consumer_byte_rate"]

Delete Quota - All Clients

data.DeleteQuotaAllClients(config)
  • config The list of quota settings to delete.

For example

config=["producer_byte_rate","consumer_byte_rate"]

Delete a Quota - Client

data.DeleteQuotaClient(clientid, config)
  • clientid The client id to set the quota for
  • config The list of quota settings to delete.

For example

config=["producer_byte_rate","consumer_byte_rate"]

Policy Handler

data.view_policy()

Returns all versions for a policy.

Troubleshooting

For troubleshooting or additional information, join our Slack channel.