Websocket Spec

Query Data via SQL

A WebSocket allows the client to request data using SQL as input.

Use the following URL to open a WebSocket connection. Depending on the deployment, use a ws (insecure) or wss (secure) connection:

WS | WSS ${LENSES_URL}/environment/${YOUR_ENVIRONMENT_NAME}/proxy/api/ws/v2/sql/execute

Queries are scoped to an environment, so you must include the environment name in the URL.
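
For example, with Lenses reachable at lenses.example.com (a placeholder host) and an environment named demo, the secure URL would be:

wss://lenses.example.com/environment/demo/proxy/api/ws/v2/sql/execute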

Establishing a connection

To authenticate, create a service account and set its token as a Bearer token in the Authorization header.

Once the connection is established, the server expects the client to send a JSON message with the following structure:

{
  "sql": String,
  "live": Optional[Boolean],
  "stats": Optional[Int]
}

For example, in Python:

import asyncio
import json
import os
import ssl

import websockets

# Optional: if the WebSocket endpoint uses a self-signed certificate:
# ssl_context = ssl._create_unverified_context()

async def execute_SQL(environmentName: str, sql: str) -> list:

    # For a secure WebSocket (wss) use:
    # url = "wss://" + os.getenv("HQ_HOST") + "/api/v1/environments/" + environmentName + "/proxy/api/ws/v2/sql/execute"
    url = "ws://" + os.getenv("HQ_HOST") + "/api/v1/environments/" + environmentName + "/proxy/api/ws/v2/sql/execute"
    headers = {"Authorization": "Bearer " + os.getenv("HQ_TOKEN")}

    # The query request sent once the connection is established.
    request = {
        "sql": sql,
        "live": False,
        "stats": 2
    }

    # Collect RECORD messages until the server closes the socket after END.
    data = []
    try:
        # For wss with a self-signed certificate use:
        # async with websockets.connect(url, ssl=ssl_context, additional_headers=headers) as websocket:
        async with websockets.connect(url, additional_headers=headers) as websocket:
            await websocket.send(json.dumps(request))
            async for message in websocket:
                response = json.loads(message)
                if response.get("type") == "RECORD":
                    data.append(response)
    except websockets.exceptions.WebSocketException as e:
        print(f"An error occurred: {e}")
        return []
    return data

records = asyncio.run(execute_SQL("demo", "SELECT * FROM input LIMIT 100;"))
print(records)

Query Request

Field | Description | Type
sql | The SQL query used to fetch data. | string
live | If set, enables partial results for aggregation queries. | optional[boolean]
stats | Millis interval to receive query stats. If not provided, the stats information is not sent. | optional[int]

Protocol

The client should keep reading incoming messages until the END message is received, after which the server closes the socket. The server sends JSON-encoded messages; each payload has the following structure:

{
  "type": <Enumeration>,
  "data": <Dependent on the type field>
}

Since the server sends multiple types of messages, the type attribute determines the structure of the data payload. The possible type values are:

Value | Description
HEARTBEAT | To keep the connection alive, the server injects an empty message.
RECORD | The message represents a data record.
ERROR | The message contains information about an error that occurred.
STATS | The message contains information about the current execution. Only activated if the stats field in the request is present.
METADATA | The message contains a list of metadata fields each record message will contain. For example: offset, partition, timestamp, __keysize, __valueSize, valueSchemaId.
SCHEMA | Reserved.
PAGE_END | Reserved.
END | The message signals the end of the execution. The server will close the socket after this message is sent.
BADRECORD | The message contains information about the Kafka message which cannot be read. For example, if the topic expects Avro and the payload is not a valid Avro.
SENTINEL | The message contains information about the execution termination reason when query thresholds are reached.
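
The sketch below shows how a client might dispatch on these message types. It processes the messages read by the Python example above; handle_message is an illustrative helper, not part of the API:

import json

def handle_message(raw: str, records: list) -> bool:
    """Process one server message; return False once the END message arrives."""
    msg = json.loads(raw)
    msg_type = msg.get("type")

    if msg_type == "RECORD":
        records.append(msg["data"])        # actual query results
    elif msg_type == "METADATA":
        print("metadata fields:", msg["data"]["fields"])
    elif msg_type == "STATS":
        print("records scanned so far:", msg["data"]["recordsScanned"])
    elif msg_type in ("ERROR", "BADRECORD", "SENTINEL"):
        print(msg_type, msg.get("data"))   # surface errors and termination reasons
    elif msg_type == "END":
        return False                       # the server closes the socket after END
    # HEARTBEAT and the reserved types (SCHEMA, PAGE_END) are ignored
    return True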

Heartbeat

The data attribute is null. The client should ignore these messages.

{
  "type": "HEARTBEAT"
}

Error

In this case, the payload structure is:

{
  "type": "ERROR",
  "data": String
}

Stats

When stats are requested, the payload format is:

{
  "type": "STATS",
  "data": {
    id:              String,
    bytesRead:       Long,
    sources:         Map[UUID, String],
    sourceConfigs:   Map[UUID, Map[String, Any]],
    sourcesStats:    Map[UUID, Map[Int, Map[String, Long]]],
    badRecordsTotal: Long,
    recordsSkipped:  Long,
    results:         Long,
    recordsScanned:  Long,
    duration:        Long,
    startedAt:       Long
  }
}

A possible payload can be:

{
  "type": "STATS",
  "data": {
    "id": "6a29ef99-3914-4640-b0dc-ec4a07c7ffc5",
    "bytesRead": 51972426,
    "sources": {
      "ad1afc73-3dec-41e7-b679-aa35c55e3328": "cc_payments"
    },
    "sourceConfigs": {
      "ad1afc73-3dec-41e7-b679-aa35c55e3328": {
        "live.aggs": true,
        "max.idle.time": 20000,
        "format.timestamp": true,
        "show.bad.records": true,
        "kafka.offset.timeout": 10000,
        "max.query.time": 3600000,
        "max.size": 209715200
      }
    },
    "sourcesStats": {
      "ad1afc73-3dec-41e7-b679-aa35c55e3328": {
        "0": {
          "begin": 0,
          "offsetBound": 541950,
          "bytesRead": 51972426,
          "recordsScanned": 541951,
          "end": 541950
        }
      }
    },
    "badRecordsTotal": 0,
    "recordsSkipped": 0,
    "results": 119,
    "recordsScanned": 541951,
    "duration": 18196,
    "startedAt": 1678901355694
  }
}
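
For instance, a client that requested stats could summarize each STATS message as follows (a minimal sketch; print_stats is an illustrative helper, not part of the API):

def print_stats(stats_msg: dict) -> None:
    """Print a one-line summary of a STATS message using the fields shown above."""
    d = stats_msg["data"]
    print(
        f"query {d['id']}: scanned {d['recordsScanned']} records "
        f"({d['bytesRead']} bytes), {d['results']} results, "
        f"{d['badRecordsTotal']} bad records, {d['duration']} ms elapsed"
    )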

Metadata

The payload format is:

{
  "type": "METADATA",
  "data": {
    "fields": Array[String]
  }
}

End

The payload format is:

{
  "type": "END"
}

Bad Record

The payload format is:

{
  "type": "BADRECORD",
  "data": {
    "metadata": {
      "offset": Optional[Long],
      "partition": Optional[Long],
      "timestamp": Optional[Long],
      "table": String,
      "sourceId": String
    }
  }
}

Sentinel

The payload format is:

{
  "type": "SENTINEL",
  "data": {
    "event": String,
    "reason": {
      "type": String,
      "timeout":  Optional[Long],
      "source":   String,
      "sourceId": String,
      "amount":Long        
    }
  }
}

Record

For the RECORD type, the payload contains the data returned to the client and depends on the SQL input. The payload format is:

{
  "type": "RECORD",
  "data": {
    "fields": <dynamic>,
    "value": <dynamic>,
    "metadata": <dynamic>,
    "rownum": Optional[Long]
  }
}

Value | Description
data.key | The underlying Kafka message key value. Only present when the * projection is used. When a key projection is used (e.g. _key.MMSI), the value is returned as data.value.MMSI.
data.value | The output generated by the SQL projections. If * is used as the projection, it contains the Kafka message value structure.
data.metadata | Returned when no aggregations are involved. It contains the Kafka message partition, offset, and byte size information.
data.rownum | Reserved optional long value.
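
A client will often flatten these payloads into rows. The sketch below does so for the records collected by the execute_SQL example above; record_to_row is illustrative, and the key and metadata sections may be absent depending on the projection and on whether aggregations are involved:

def record_to_row(record: dict) -> dict:
    """Merge a RECORD message's key, value and metadata sections into one flat dict."""
    data = record.get("data", {})
    row = {}
    for prefix, section in (("key.", data.get("key")),
                            ("", data.get("value")),
                            ("meta.", data.get("metadata"))):
        if isinstance(section, dict):
            for field, value in section.items():
                row[prefix + field] = value
    return row

rows = [record_to_row(r) for r in records]   # 'records' as returned by execute_SQL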

Examples

For a topic that contains nested data for both the key and the value:

SELECT * FROM sea_vessel_position_reports

The returned payload is:

{
  "type": "RECORD",
  "data": {
    "key": {
      "MMSI": 219005662
    },
    "value": {
      "Type": 1,
      "Repeat": 0,
      "MMSI": 219005662,
      "Speed": 0.1,
      "Accuracy": false,
      "Longitude": 12.570543333333333,
      "Latitude": 55.85040166666667,
      "location": "55.850402,12.570543",
      "Course": 177.9,
      "Heading": 511,
      "Second": 20,
      "RAIM": false,
      "Radio": 33577,
      "Status": 3,
      "Turn": -128.0,
      "Maneuver": 0,
      "Timestamp": 1491318144456507320
    },
    "metadata": {
      "offset": 71,
      "partition": 1,
      "timestamp": "2023-03-13T11:42:24.267Z",
      "__keysize": 10,
      "__valuesize": 79,
      "keySchemaId": 4,
      "valueSchemaId": 5
    },
    "rownum": 71
  }
}

If the Kafka message key is null, the response would be:

{
  "type": "RECORD",
  "data": {
    "value": {
      "Type": 1,
      "Repeat": 0,
      "MMSI": 219005662,
      "Speed": 0.1,
      "Accuracy": false,
      "Longitude": 12.570543333333333,
      "Latitude": 55.85040166666667,
      "location": "55.850402,12.570543",
      "Course": 177.9,
      "Heading": 511,
      "Second": 20,
      "RAIM": false,
      "Radio": 33577,
      "Status": 3,
      "Turn": -128.0,
      "Maneuver": 0,
      "Timestamp": 1491318144456507320
    },
    "metadata": {
      "offset": 71,
      "partition": 1,
      "timestamp": "2023-03-13T11:42:24.267Z",
      "__keysize": 10,
      "__valuesize": 79,
      "keySchemaId": 4,
      "valueSchemaId": 5
    },
    "rownum": 71
  }
}

Projecting the key will not return the key attribute:

SELECT _key.MMSI FROM sea_vessel_position_reports

produces the following output:

{
  "type": "RECORD",
  "data": {
    "value": {
      "MMSI": 265527470
    },
    "metadata": {
      "offset": 86,
      "partition": 1,
      "timestamp": "2023-03-13T11:42:29.107Z",
      "__keysize": 10,
      "__valuesize": 79,
      "keySchemaId": 4,
      "valueSchemaId": 5
    },
    "rownum": 86
  }
}

Aggregating data will not return the key and metadata attributes:

SELECT count(*), currency 
FROM cc_payments
GROUP BY currency

produces the following output:
{
  "type": "RECORD",
  "data": {
    "value": {
      "count": 20066,
      "currency": "NOR"
    },
    "rownum": 90
  },
  "rowId": "fc767952-97e5-46cf-868c-7cbfbcde21fb",
  "statementIndex": null
}
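
Putting it together, the aggregation above could be run through the execute_SQL helper from the connection example; the environment name demo and the cc_payments topic are the ones used in the examples on this page:

agg_sql = """
SELECT count(*), currency
FROM cc_payments
GROUP BY currency
"""

records = asyncio.run(execute_SQL("demo", agg_sql))
for r in records:
    value = r["data"]["value"]
    print(value["currency"], value["count"])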