# Websocket Spec

## Query Data via SQL  <a href="#data-access---query-data-via-sql" id="data-access---query-data-via-sql"></a>

A WebSocket allows the client to request data using SQL as input.

Use the following URL to open a WebSocket connection. Depending on the deployment, use a `ws` (insecure) or `wss` (secure) connection:

```
WS | WSS ${LENSES_URL}/environment/${YOUR_ENVIRONMENT_NAME}/proxy/api/ws/v2/sql/execute
```

{% hint style="info" %}
Queries run per environment, so the URL must include the environment name.
{% endhint %}

## Establishing a connection

To authenticate, create a [service account](https://docs.lenses.io/latest/devx/6.0/user-guide/iam/service-accounts) and pass its token as a Bearer token in the `Authorization` header.

Once the connection is established, the server expects the client to send a JSON message with the following structure:

```json
{
  "sql": String,
  "live": Optional[Boolean],
  "stats": Optional[Int]
}
```

For example, in Python:

```python
import asyncio
import json
import os
import ssl
from typing import Optional

import websockets

# Optional: only needed for wss with a self-signed certificate.
# Note that this disables certificate verification.
# ssl_context = ssl._create_unverified_context()

async def execute_SQL(environmentName: str, sql: str) -> Optional[list]:
    # For a secure WebSocket, use "wss://" instead of "ws://".
    url = "ws://" + os.getenv("HQ_HOST") + "/api/v1/environments/" + environmentName + "/proxy/api/ws/v2/sql/execute"
    headers = {"Authorization": "Bearer " + os.getenv("HQ_TOKEN")}

    request = {
        "sql": sql,
        "live": False,
        "stats": 2
    }

    data = []
    try:
        # For wss with a self-signed certificate, also pass ssl=ssl_context.
        # additional_headers is the keyword in recent websockets releases;
        # older releases may expect extra_headers instead.
        async with websockets.connect(url, additional_headers=headers) as websocket:
            await websocket.send(json.dumps(request))
            # The loop ends when the server closes the socket after END.
            async for message in websocket:
                response = json.loads(message)
                if response.get("type") == "RECORD":
                    data.append(response)
    except websockets.exceptions.WebSocketException as e:
        print(f"An error occurred: {e}")
        return None
    return data

print(asyncio.run(execute_SQL("demo", "SELECT * FROM input LIMIT 100;")))
```

## Query Request

| Field | Description                                                                               | Type               |
| ----- | ----------------------------------------------------------------------------------------- | ------------------ |
| sql   | The SQL query used to fetch data                                                          | string             |
| live  | If set, enables partial results for aggregation queries.                                  | optional\[boolean] |
| stats | Interval in milliseconds at which to receive query stats. If omitted, no stats are sent.  | optional\[int]     |
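
As a minimal sketch of assembling the request, the helper below includes `live` and `stats` only when the caller sets them, so leaving out `stats` keeps stats reporting off as described in the table. The name `build_query_request` is hypothetical and chosen for illustration; it is not part of the Lenses API.

```python
import json
from typing import Optional


def build_query_request(sql: str, live: Optional[bool] = None,
                        stats: Optional[int] = None) -> str:
    """Serialize a query request, leaving out optional fields that are unset."""
    request = {"sql": sql}
    if live is not None:
        request["live"] = live
    if stats is not None:
        # Interval, in milliseconds, at which STATS messages are requested.
        request["stats"] = stats
    return json.dumps(request)


print(build_query_request("SELECT * FROM input LIMIT 100", stats=2000))
```

The returned string can be passed directly to the WebSocket `send` call once the connection is open.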

## Protocol

The client should keep reading incoming messages and stop when EOF is received. The server sends JSON-encoded messages with the following payload structure:

```json
{
  "type": <Enumeration>,
  "data": <Dependent on the type field>
}
```

Since there are multiple types of messages the server sends, the `type` attribute determines the data structure for the `data` payload. Below is the list of possible `type` values:

| Value     | Description                                                                                                                                                           |
| --------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| HEARTBEAT | To keep the connection alive, the server injects an empty message.                                                                                                    |
| RECORD    | The message represents a data record.                                                                                                                                 |
| ERROR     | The message contains information about an error that occurred.                                                                                                        |
| STATS     | The message contains information about the current execution. Only activated if the `stats` field in the request is present.                                          |
| METADATA  | The message contains a list of metadata fields each record message will contain. For example: offset, partition, timestamp, \_\_keysize, \_\_valuesize, valueSchemaId. |
| SCHEMA    | Reserved.                                                                                                                                                             |
| PAGE\_END | Reserved.                                                                                                                                                             |
| END       | The message signals the end of the execution. The server will close the socket after this message is sent.                                                            |
| BADRECORD | The message contains information about the Kafka message which cannot be read. For example, if the topic expects Avro and the payload is not a valid Avro.            |
| SENTINEL  | The message contains information about the execution termination reason when query thresholds are reached.                                                            |
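
One way to act on the `type` field is a small dispatch loop. The sketch below works on an iterable of JSON strings rather than a live socket so that it can be run on its own; the names `consume_messages` and `QueryError` are hypothetical, and raising an exception on `ERROR` is a design choice made for this example rather than something the protocol requires.

```python
import json


class QueryError(Exception):
    """Raised when the server reports an ERROR message."""


def consume_messages(raw_messages):
    """Collect query output from an iterable of JSON-encoded server messages.

    Stops at END and raises QueryError on ERROR.
    """
    result = {"records": [], "bad_records": [], "stats": None,
              "fields": None, "sentinel": None, "ended": False}
    for raw in raw_messages:
        message = json.loads(raw)
        kind = message.get("type")
        data = message.get("data")
        if kind == "HEARTBEAT":
            continue  # keep-alive only, nothing to process
        elif kind == "RECORD":
            result["records"].append(data)
        elif kind == "BADRECORD":
            result["bad_records"].append(data)
        elif kind == "STATS":
            result["stats"] = data  # keep only the most recent snapshot
        elif kind == "METADATA":
            result["fields"] = data["fields"]
        elif kind == "SENTINEL":
            result["sentinel"] = data
        elif kind == "ERROR":
            raise QueryError(data)
        elif kind == "END":
            result["ended"] = True
            break
        # SCHEMA and PAGE_END are reserved, so they are skipped here.
    return result
```

With a live connection, the same branching can sit inside the `async for message in websocket` loop.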

### Heartbeat  <a href="#heartbeat" id="heartbeat"></a>

The `data` attribute is null. The client should ignore these messages.

```json
{
  "type": "HEARTBEAT"
}
```

### Error  <a href="#error" id="error"></a>

In this case, the payload structure is:

```json
{
  "type": "ERROR",
  "data": String
}
```

### Stats  <a href="#stats" id="stats"></a>

When stats are requested, the payload format is:

```json
{
  "type": "STATS",
  "data": {
    "id":              String,
    "bytesRead":       Long,
    "sources":         Map[UUID, String],
    "sourceConfigs":   Map[UUID, Map[String, Any]],
    "sourcesStats":    Map[UUID, Map[Int, Map[String, Long]]],
    "badRecordsTotal": Long,
    "recordsSkipped":  Long,
    "results":         Long,
    "recordsScanned":  Long,
    "duration":        Long,
    "startedAt":       Long
  }
}
```

A possible payload can be:

```json
{
  "type": "STATS",
  "data": {
    "id": "6a29ef99-3914-4640-b0dc-ec4a07c7ffc5",
    "bytesRead": 51972426,
    "sources": {
      "ad1afc73-3dec-41e7-b679-aa35c55e3328": "cc_payments"
    },
    "sourceConfigs": {
      "ad1afc73-3dec-41e7-b679-aa35c55e3328": {
        "live.aggs": true,
        "max.idle.time": 20000,
        "format.timestamp": true,
        "show.bad.records": true,
        "kafka.offset.timeout": 10000,
        "max.query.time": 3600000,
        "max.size": 209715200
      }
    },
    "sourcesStats": {
      "ad1afc73-3dec-41e7-b679-aa35c55e3328": {
        "0": {
          "begin": 0,
          "offsetBound": 541950,
          "bytesRead": 51972426,
          "recordsScanned": 541951,
          "end": 541950
        }
      }
    },
    "badRecordsTotal": 0,
    "recordsSkipped": 0,
    "results": 119,
    "recordsScanned": 541951,
    "duration": 18196,
    "startedAt": 1678901355694
  }
}
```
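
A STATS payload like the one above can be reduced to per-source totals. The following is a minimal sketch that assumes the inner keys of `sourcesStats` identify partitions, which the `Map[Int, ...]` type and the `"0"` key in the example suggest but the page does not state; `summarize_stats` is a hypothetical helper name.

```python
def summarize_stats(stats: dict) -> dict:
    """Map each source name to totals across its sourcesStats entries."""
    summary = {}
    for source_id, name in stats["sources"].items():
        # Assumption: one entry per partition under each source ID.
        partitions = stats["sourcesStats"].get(source_id, {})
        summary[name] = {
            "partitions": len(partitions),
            "recordsScanned": sum(p.get("recordsScanned", 0) for p in partitions.values()),
            "bytesRead": sum(p.get("bytesRead", 0) for p in partitions.values()),
        }
    return summary
```

The function takes the `data` object of a STATS message, not the whole message.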

### Metadata  <a href="#metadata" id="metadata"></a>

The payload format is:

```json
{
  "type": "METADATA",
  "data": {
    "fields": Array[String]
  }
}
```

### End  <a href="#end" id="end"></a>

The payload format is:

```json
{
  "type": "END"
}
```

### Bad Record  <a href="#bad-record" id="bad-record"></a>

The payload format is:

```json
{
  "type": "BADRECORD",
  "data": {
    "metadata": {
      "offset": Optional[Long],
      "partition": Optional[Long],
      "timestamp": Optional[Long],
      "table": String,
      "sourceId": String
    }
  }
}
```

### Sentinel  <a href="#sentinel" id="sentinel"></a>

The payload format is:

```json
{
  "type": "SENTINEL",
  "data": {
    "event": String,
    "reason": {
      "type": String,
      "timeout":  Optional[Long],
      "source":   String,
      "sourceId": String,
      "amount":Long        
    }
  }
}
```

### Record  <a href="#record" id="record"></a>

When the `type` is RECORD, the payload contains the data returned to the client, and its shape depends on the SQL input. The payload format is:

```json
{
  "type": "RECORD",
  "data": {
    "key": <dynamic>,
    "value": <dynamic>,
    "metadata": <dynamic>,
    "rownum": Optional[Long]
  }
}
```

| Value         | Description                                                                                                                                                                            |
| ------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| data.key      | Returns the underlying Kafka message key value. Only applied when `*` projection is used. When a key projection is used (i.e. \_key.MMSI), the value is returned as `data.value.MMSI`. |
| data.value    | Will contain the output generated by the SQL projections. If `*` is used for the SQL projection, it will return the Kafka message value structure.                                     |
| data.metadata | Returned when no aggregations are involved. It contains the Kafka message partition, offset, and byte size information.                                                                |
| data.rownum   | Reserved optional long value.                                                                                                                                                          |
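
Because `key` and `metadata` are only present for some queries, client code should not assume they exist. A minimal sketch of a tolerant accessor follows; the name `record_to_row` is hypothetical.

```python
def record_to_row(message: dict) -> dict:
    """Extract the parts of a RECORD message, using None for absent attributes."""
    if message.get("type") != "RECORD":
        raise ValueError("expected a RECORD message")
    data = message["data"]
    return {
        "key": data.get("key"),            # absent unless `*` is projected
        "value": data.get("value"),
        "metadata": data.get("metadata"),  # absent for aggregations
        "rownum": data.get("rownum"),
    }
```

Using `dict.get` keeps the same code path working for `*` projections, key projections, and aggregations.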

## Examples  <a href="#examples" id="examples"></a>

Consider a topic that contains nested data for both key and value:

```sql
SELECT * FROM sea_vessel_position_reports
```

The payload format is:

```json
{
  "type": "RECORD",
  "data": {
    "key": {
      "MMSI": 219005662
    },
    "value": {
      "Type": 1,
      "Repeat": 0,
      "MMSI": 219005662,
      "Speed": 0.1,
      "Accuracy": false,
      "Longitude": 12.570543333333333,
      "Latitude": 55.85040166666667,
      "location": "55.850402,12.570543",
      "Course": 177.9,
      "Heading": 511,
      "Second": 20,
      "RAIM": false,
      "Radio": 33577,
      "Status": 3,
      "Turn": -128.0,
      "Maneuver": 0,
      "Timestamp": 1491318144456507320
    },
    "metadata": {
      "offset": 71,
      "partition": 1,
      "timestamp": "2023-03-13T11:42:24.267Z",
      "__keysize": 10,
      "__valuesize": 79,
      "keySchemaId": 4,
      "valueSchemaId": 5
    },
    "rownum": 71
  }
}
```

If the Kafka message key is null, the response omits the `key` attribute:

```json
{
  "type": "RECORD",
  "data": {
    "value": {
      "Type": 1,
      "Repeat": 0,
      "MMSI": 219005662,
      "Speed": 0.1,
      "Accuracy": false,
      "Longitude": 12.570543333333333,
      "Latitude": 55.85040166666667,
      "location": "55.850402,12.570543",
      "Course": 177.9,
      "Heading": 511,
      "Second": 20,
      "RAIM": false,
      "Radio": 33577,
      "Status": 3,
      "Turn": -128.0,
      "Maneuver": 0,
      "Timestamp": 1491318144456507320
    },
    "metadata": {
      "offset": 71,
      "partition": 1,
      "timestamp": "2023-03-13T11:42:24.267Z",
      "__keysize": 10,
      "__valuesize": 79,
      "keySchemaId": 4,
      "valueSchemaId": 5
    },
    "rownum": 71
  }
}
```

Projecting the key will not return the `key` attribute:

```sql
SELECT _key.MMSI FROM sea_vessel_position_reports
```

produces the following output:

```json
{
  "type": "RECORD",
  "data": {
    "value": {
      "MMSI": 265527470
    },
    "metadata": {
      "offset": 86,
      "partition": 1,
      "timestamp": "2023-03-13T11:42:29.107Z",
      "__keysize": 10,
      "__valuesize": 79,
      "keySchemaId": 4,
      "valueSchemaId": 5
    },
    "rownum": 86
  }
}
```

Aggregating data will not return the `key` and `metadata` attributes:

```sql
SELECT count(*), currency
FROM cc_payments
GROUP BY currency
```

```json
{
  "type": "RECORD",
  "data": {
    "value": {
      "count": 20066,
      "currency": "NOR"
    },
    "rownum": 90
  },
  "rowId": "fc767952-97e5-46cf-868c-7cbfbcde21fb",
  "statementIndex": null
}
```

