WebSocket Spec
Query Data via SQL
A WebSocket allows the client to request data using SQL as input.
Use the following URL to open a WebSocket connection. Depending on the deployment, use a ws (insecure) or wss (secure) connection:
WS | WSS ${LENSES_URL}/environment/${YOUR_ENVIRONMENT_NAME}/proxy/api/ws/v2/sql/execute
Establishing a connection
To authenticate, create a service account and send its token in the Authorization header as a Bearer token (Authorization: Bearer <token>).
Once the connection is established, the server expects the client to send a JSON message with the following structure:
{
"sql": String,
"live": Optional[Boolean],
"stats": Optional[Int]
}
For example, in Python:
import asyncio
import json
import os
import ssl
from typing import List, Optional

import websockets

# Optional: for a secure WebSocket (wss) with a self-signed certificate,
# disable certificate verification (avoid this in production):
# ssl_context = ssl._create_unverified_context()


async def execute_sql(environment_name: str, sql: str) -> Optional[List[dict]]:
    # For a secure WebSocket, replace "ws://" with "wss://".
    url = (
        "ws://" + os.environ["HQ_HOST"]
        + "/api/v1/environments/" + environment_name
        + "/proxy/api/ws/v2/sql/execute"
    )
    headers = {"Authorization": "Bearer " + os.environ["HQ_TOKEN"]}
    request = {
        "sql": sql,
        "live": False,
        "stats": 2,  # send STATS messages every 2 ms
    }
    data = []
    try:
        # For wss with the unverified context above, also pass ssl=ssl_context.
        # additional_headers is the keyword in websockets 14+; older versions use extra_headers.
        async with websockets.connect(url, additional_headers=headers) as websocket:
            await websocket.send(json.dumps(request))
            async for message in websocket:
                response = json.loads(message)
                if response.get("type") == "RECORD":
                    data.append(response)
                elif response.get("type") == "END":
                    break  # end of execution; the server closes the socket
    except websockets.exceptions.WebSocketException as e:
        print(f"An error occurred: {e}")
        return None
    return data


records = asyncio.run(execute_sql("demo", "SELECT * FROM input LIMIT 100;"))
print(records)
Query Request
sql (string)
The SQL query used to fetch data.
live (optional[boolean])
If true, enables partial results for aggregation queries.
stats (optional[int])
Interval, in milliseconds, at which query stats are sent. If not provided, no stats are sent.
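A minimal sketch of building the request message, assuming that optional fields should simply be omitted when not set (the spec only states that stats are not sent when stats is absent). The helper name build_request is hypothetical.

```python
import json
from typing import Optional


def build_request(sql: str, live: Optional[bool] = None, stats: Optional[int] = None) -> str:
    """Serialize a query request; optional fields are left out when None (hypothetical helper)."""
    request = {"sql": sql}
    if live is not None:
        request["live"] = live
    if stats is not None:
        # Interval in milliseconds between STATS messages.
        request["stats"] = stats
    return json.dumps(request)


print(build_request("SELECT * FROM cc_payments LIMIT 10;", stats=1000))
# {"sql": "SELECT * FROM cc_payments LIMIT 10;", "stats": 1000}
```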
Protocol
The client should read incoming messages until it receives the END message or the connection is closed. The server sends JSON-encoded messages with the following structure:
{
"type": <Enumeration>,
"data": <Dependent on the type field>
}
Since the server sends multiple types of messages, the type attribute determines the structure of the data payload. The possible type values are:
HEARTBEAT
To keep the connection alive, the server sends a message with no data payload.
RECORD
The message represents a data record.
ERROR
The message contains information about an error that occurred.
STATS
The message contains information about the current execution. Only sent if the stats field is present in the request.
METADATA
The message contains the list of metadata fields that each record message will contain, for example: offset, partition, timestamp, __keysize, __valuesize, valueSchemaId.
SCHEMA
Reserved.
PAGE_END
Reserved.
END
The message signals the end of the execution. The server will close the socket after this message is sent.
BADRECORD
The message contains information about a Kafka message that cannot be read, for example when the topic expects Avro and the payload is not valid Avro.
SENTINEL
The message contains information about the execution termination reason when query thresholds are reached.
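The dispatch rules above can be sketched as a small consumer loop. This is a minimal sketch, not the reference client: it works on any iterable of raw JSON strings (here a canned list rather than a live socket), keeps only RECORD payloads, stops at END, and, as a design choice of this sketch, raises the hypothetical QueryError on ERROR. All other types are ignored.

```python
import json
from typing import Iterable, List


class QueryError(Exception):
    """Raised when the server sends an ERROR message (hypothetical exception type)."""


def collect_records(messages: Iterable[str]) -> List[dict]:
    """Consume raw JSON messages and return the RECORD payloads received before END."""
    records = []
    for raw in messages:
        message = json.loads(raw)
        kind = message.get("type")
        if kind == "RECORD":
            records.append(message["data"])
        elif kind == "ERROR":
            raise QueryError(message.get("data"))
        elif kind == "END":
            break  # the server closes the socket after END
        # HEARTBEAT, STATS, METADATA, BADRECORD, SENTINEL and the reserved types
        # are ignored here; a real client may want to log some of them.
    return records


sample = [
    '{"type": "HEARTBEAT"}',
    '{"type": "RECORD", "data": {"value": {"count": 1}}}',
    '{"type": "END"}',
]
print(collect_records(sample))  # [{'value': {'count': 1}}]
```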
Heartbeat
The data attribute is null or omitted. The client should ignore these messages.
{
"type": "HEARTBEAT"
}
Error
In this case, the payload structure is:
{
"type": "ERROR",
"data": String
}
Stats
When requested, the payload format is:
{
"type": "STATS",
"data": {
"id": String,
"bytesRead": Long,
"sources": Map[UUID, String],
"sourceConfigs": Map[UUID, Map[String, Any]],
"sourcesStats": Map[UUID, Map[Int, Map[String, Long]]],
"badRecordsTotal": Long,
"recordsSkipped": Long,
"results": Long,
"recordsScanned": Long,
"duration": Long,
"startedAt": Long
}
}
An example payload:
{
"type": "STATS",
"data": {
"id": "6a29ef99-3914-4640-b0dc-ec4a07c7ffc5",
"bytesRead": 51972426,
"sources": {
"ad1afc73-3dec-41e7-b679-aa35c55e3328": "cc_payments"
},
"sourceConfigs": {
"ad1afc73-3dec-41e7-b679-aa35c55e3328": {
"live.aggs": true,
"max.idle.time": 20000,
"format.timestamp": true,
"show.bad.records": true,
"kafka.offset.timeout": 10000,
"max.query.time": 3600000,
"max.size": 209715200
}
},
"sourcesStats": {
"ad1afc73-3dec-41e7-b679-aa35c55e3328": {
"0": {
"begin": 0,
"offsetBound": 541950,
"bytesRead": 51972426,
"recordsScanned": 541951,
"end": 541950
}
}
},
"badRecordsTotal": 0,
"recordsSkipped": 0,
"results": 119,
"recordsScanned": 541951,
"duration": 18196,
"startedAt": 1678901355694
}
}
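A STATS payload can be reduced to a few progress figures. This is a minimal sketch with hypothetical helper names: records_scanned_per_source resolves each source UUID to its name through sources and sums recordsScanned over the partitions in sourcesStats; scan_rate assumes that duration is in milliseconds, which the spec does not state. Applied to the example above, records_scanned_per_source returns {"cc_payments": 541951}.

```python
from typing import Dict


def records_scanned_per_source(stats: dict) -> Dict[str, int]:
    """Sum recordsScanned over all partitions, keyed by source (topic) name."""
    data = stats["data"]
    totals: Dict[str, int] = {}
    for source_id, partitions in data["sourcesStats"].items():
        # Fall back to the UUID if the source is missing from the name map.
        name = data["sources"].get(source_id, source_id)
        totals[name] = totals.get(name, 0) + sum(
            partition.get("recordsScanned", 0) for partition in partitions.values()
        )
    return totals


def scan_rate(stats: dict) -> float:
    """Records scanned per second, ASSUMING duration is in milliseconds."""
    data = stats["data"]
    seconds = data["duration"] / 1000
    return data["recordsScanned"] / seconds if seconds else 0.0
```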
Metadata
The payload format is:
{
"type": "METADATA",
"data": {
"fields": Array[String]
}
}
End
The payload format is:
{
"type": "END"
}
Bad Record
The payload format is:
{
"type": "BADRECORD",
"data": {
"metadata": {
"offset": Optional[Long],
"partition": Optional[Long],
"timestamp": Optional[Long],
"table": String,
"sourceId": String
}
}
}
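Because offset, partition and timestamp are optional in a BADRECORD message, a client that logs bad records has to cope with missing fields. A minimal sketch, with describe_bad_record as a hypothetical helper that formats only the fields actually present:

```python
def describe_bad_record(message: dict) -> str:
    """Return a short location string for a BADRECORD message, for logging."""
    meta = message["data"]["metadata"]
    # offset, partition and timestamp are optional, so include only those present.
    parts = [
        f"{field}={meta[field]}"
        for field in ("partition", "offset", "timestamp")
        if meta.get(field) is not None
    ]
    return f"{meta['table']}[{', '.join(parts)}]"


bad = {"type": "BADRECORD", "data": {"metadata": {"offset": 12, "partition": 0, "table": "cc_payments", "sourceId": "x"}}}
print(describe_bad_record(bad))  # cc_payments[partition=0, offset=12]
```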
Sentinel
The payload format is:
{
"type": "SENTINEL",
"data": {
"event": String,
"reason": {
"type": String,
"timeout": Optional[Long],
"source": String,
"sourceId": String,
"amount":Long
}
}
}
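The spec does not enumerate the possible event or reason type values, nor the units of timeout and amount, so a client can at most echo them when a query stops early. A minimal sketch with a hypothetical describe_sentinel helper; the values in the sample message are placeholders, not real server output.

```python
def describe_sentinel(message: dict) -> str:
    """Summarize a SENTINEL message as a single log line."""
    data = message["data"]
    reason = data.get("reason") or {}
    summary = f"{data.get('event')}: {reason.get('type')} (source={reason.get('source')}"
    # timeout is optional in the payload; amount is included when present.
    if reason.get("timeout") is not None:
        summary += f", timeout={reason['timeout']}"
    if reason.get("amount") is not None:
        summary += f", amount={reason['amount']}"
    return summary + ")"


# Placeholder values for illustration only.
sentinel = {"type": "SENTINEL", "data": {"event": "EV", "reason": {"type": "RT", "source": "cc_payments", "sourceId": "x", "amount": 5}}}
print(describe_sentinel(sentinel))  # EV: RT (source=cc_payments, amount=5)
```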
Record
For the RECORD type, the payload contains the data returned to the client, which depends on the SQL input. The payload format is:
{
"type": "RECORD",
"data": {
"key": <dynamic>,
"value": <dynamic>,
"metadata": <dynamic>,
"rownum": Optional[Long]
}
}
data.key
Contains the underlying Kafka message key. Only returned when the * projection is used and the key is not null. When a key field is projected (e.g. _key.MMSI), its value is returned as data.value.MMSI instead.
data.value
Contains the output generated by the SQL projections. If * is used as the projection, it holds the Kafka message value structure.
data.metadata
Returned only when no aggregations are involved. It contains Kafka message metadata such as partition, offset, timestamp, key and value byte sizes, and schema IDs.
data.rownum
Reserved optional long value.
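Since key and metadata may be absent (null key, key projections, aggregations), a client that wants one flat row per record has to handle every combination of the fields described above. A minimal sketch; record_to_row and the "_key.", "_meta.", "_key" and "_value" column names are hypothetical conventions of this sketch, not part of the protocol.

```python
from typing import Any, Dict


def record_to_row(message: dict) -> Dict[str, Any]:
    """Flatten a RECORD message into a single dict.

    Key fields are prefixed with "_key." and metadata fields with "_meta.";
    both sections may be missing (null key, aggregation queries).
    """
    data = message["data"]
    row: Dict[str, Any] = {}
    key = data.get("key")
    if isinstance(key, dict):
        row.update({f"_key.{k}": v for k, v in key.items()})
    elif key is not None:
        row["_key"] = key  # primitive (non-struct) key
    value = data.get("value")
    if isinstance(value, dict):
        row.update(value)
    elif value is not None:
        row["_value"] = value  # primitive (non-struct) value
    for k, v in (data.get("metadata") or {}).items():
        row[f"_meta.{k}"] = v
    return row


aggregate = {"type": "RECORD", "data": {"value": {"count": 20066, "currency": "NOR"}, "rownum": 90}}
print(record_to_row(aggregate))  # {'count': 20066, 'currency': 'NOR'}
```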
Examples
For a topic that contains nested data in both key and value:
SELECT * FROM sea_vessel_position_reports
the server returns records such as:
{
"type": "RECORD",
"data": {
"key": {
"MMSI": 219005662
},
"value": {
"Type": 1,
"Repeat": 0,
"MMSI": 219005662,
"Speed": 0.1,
"Accuracy": false,
"Longitude": 12.570543333333333,
"Latitude": 55.85040166666667,
"location": "55.850402,12.570543",
"Course": 177.9,
"Heading": 511,
"Second": 20,
"RAIM": false,
"Radio": 33577,
"Status": 3,
"Turn": -128.0,
"Maneuver": 0,
"Timestamp": 1491318144456507320
},
"metadata": {
"offset": 71,
"partition": 1,
"timestamp": "2023-03-13T11:42:24.267Z",
"__keysize": 10,
"__valuesize": 79,
"keySchemaId": 4,
"valueSchemaId": 5
},
"rownum": 71
}
}
If the Kafka message key is null, the response omits the key attribute:
{
"type": "RECORD",
"data": {
"value": {
"Type": 1,
"Repeat": 0,
"MMSI": 219005662,
"Speed": 0.1,
"Accuracy": false,
"Longitude": 12.570543333333333,
"Latitude": 55.85040166666667,
"location": "55.850402,12.570543",
"Course": 177.9,
"Heading": 511,
"Second": 20,
"RAIM": false,
"Radio": 33577,
"Status": 3,
"Turn": -128.0,
"Maneuver": 0,
"Timestamp": 1491318144456507320
},
"metadata": {
"offset": 71,
"partition": 1,
"timestamp": "2023-03-13T11:42:24.267Z",
"__keysize": 10,
"__valuesize": 79,
"keySchemaId": 4,
"valueSchemaId": 5
},
"rownum": 71
}
}
Projecting a key field does not return the key attribute:
SELECT _key.MMSI FROM sea_vessel_position_reports
produces the following output:
{
"type": "RECORD",
"data": {
"value": {
"MMSI": 265527470
},
"metadata": {
"offset": 86,
"partition": 1,
"timestamp": "2023-03-13T11:42:29.107Z",
"__keysize": 10,
"__valuesize": 79,
"keySchemaId": 4,
"valueSchemaId": 5
},
"rownum": 86
}
}
Aggregating data returns neither the key nor the metadata attribute:
SELECT count(*), currency
FROM cc_payments
GROUP BY currency
{
"type": "RECORD",
"data": {
"value": {
"count": 20066,
"currency": "NOR"
},
"rownum": 90
},
"rowId": "fc767952-97e5-46cf-868c-7cbfbcde21fb",
"statementIndex": null
}