# WebSocket

## Query Data via SQL <a href="#data-access---query-data-via-sql" id="data-access---query-data-via-sql"></a>

A WebSocket allows the client to request data using SQL as input.

Use the following URL to open a WebSocket connection. Depending on the deployment, use a `ws` (insecure) or `wss` (secure) connection:

```
WS | WSS ${LENSES_URL}/environment/${YOUR_ENVIRONMENT_NAME}/proxy/api/ws/v2/sql/execute
```

{% hint style="info" %}
Queries run per environment, so the URL must include the environment name.
{% endhint %}

## Establishing a connection

To authenticate, create a [service account](https://docs.lenses.io/latest/api-reference/broken-reference) and send its token as a Bearer token in the `Authorization` header.

Once the connection is established, the server expects the client to send a JSON message with the following structure:

```json
{
  "sql": String,
  "live": Optional[Boolean],
  "stats": Optional[Int]
}
```

For example, in Python:

```python
import asyncio
import json
import os
import ssl
from typing import Optional

import websockets

# Optional: disable certificate verification for a self-signed certificate (wss only).
# ssl_context = ssl._create_unverified_context()


async def execute_sql(environment_name: str, sql: str) -> Optional[list]:
    """Run a SQL query over the WebSocket and return the RECORD messages received."""
    # For a secure WebSocket, use the wss:// scheme instead of ws://.
    url = (
        f"ws://{os.environ['HQ_HOST']}/api/v1/environments/"
        f"{environment_name}/proxy/api/ws/v2/sql/execute"
    )
    headers = {"Authorization": f"Bearer {os.environ['HQ_TOKEN']}"}

    request = {"sql": sql, "live": False, "stats": 2}
    data = []
    try:
        # For wss with the context above, use:
        # websockets.connect(url, ssl=ssl_context, additional_headers=headers)
        async with websockets.connect(url, additional_headers=headers) as websocket:
            await websocket.send(json.dumps(request))
            # The loop ends when the server closes the socket after END.
            async for message in websocket:
                response = json.loads(message)
                if response.get("type") == "RECORD":
                    data.append(response)
    except websockets.exceptions.WebSocketException as e:
        print(f"An error occurred: {e}")
        return None
    return data


print(asyncio.run(execute_sql("demo", "SELECT * FROM input LIMIT 100;")))
```

## Query Request

| Field | Description                                                                               | Type               |
| ----- | ----------------------------------------------------------------------------------------- | ------------------ |
| sql   | The SQL query used to fetch data                                                          | string             |
| live  | If set, enables partial results for aggregation queries.                                  | optional\[boolean] |
| stats | Interval in milliseconds between query stats messages. If omitted, no stats are sent.     | optional\[int]     |
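
As an illustration of the request fields above, the following sketch builds the JSON message and leaves out optional fields that were not set. The helper name `build_query_request` is hypothetical, and omitting unset optional fields (rather than sending `null`) is an assumption based on the `optional` types in the table.

```python
import json
from typing import Optional


def build_query_request(sql: str, live: Optional[bool] = None,
                        stats: Optional[int] = None) -> str:
    """Serialize a query request, leaving out optional fields that are unset."""
    request = {"sql": sql}
    if live is not None:
        request["live"] = live
    if stats is not None:
        # Interval in milliseconds between STATS messages.
        request["stats"] = stats
    return json.dumps(request)


# Request stats every two seconds for a bounded query.
print(build_query_request("SELECT * FROM input LIMIT 100", stats=2000))
```

The returned string can be passed directly to the WebSocket `send` call.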

## Protocol

The client should keep reading incoming messages and stop when EOF is received. The server sends JSON-encoded messages with the following payload structure:

```json
{
  "type": <Enumeration>,
  "data": <Dependent on the type field>
}
```

Since there are multiple types of messages the server sends, the `type` attribute determines the data structure for the `data` payload. Below is the list of possible `type` values:

| Value     | Description                                                                                                                                                           |
| --------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| HEARTBEAT | To keep the connection alive, the server injects an empty message.                                                                                                    |
| RECORD    | The message represents a data record.                                                                                                                                 |
| ERROR     | The message contains information about an error that occurred.                                                                                                        |
| STATS     | The message contains information about the current execution. Only activated if the `stats` field in the request is present.                                          |
| METADATA  | The message contains a list of metadata fields each record message will contain. For example: offset, partition, timestamp, \_\_keysize, \_\_valuesize, valueSchemaId. |
| SCHEMA    | Reserved.                                                                                                                                                             |
| PAGE\_END | Reserved.                                                                                                                                                             |
| END       | The message signals the end of the execution. The server will close the socket after this message is sent.                                                            |
| BADRECORD | The message contains information about the Kafka message which cannot be read. For example, if the topic expects Avro and the payload is not a valid Avro.            |
| SENTINEL  | The message contains information about the execution termination reason when query thresholds are reached.                                                            |

### Heartbeat <a href="#heartbeat" id="heartbeat"></a>

The `data` attribute is null. The client should ignore these messages.

```json
{
  "type": "HEARTBEAT"
}
```

### Error <a href="#error" id="error"></a>

In this case, the payload structure is:

```json
{
  "type": "ERROR",
  "data": String
}
```

### Stats <a href="#stats" id="stats"></a>

When stats are requested, the payload format is:

```json
{
  "type": "STATS",
  "data": {
    "id":              String,
    "bytesRead":       Long,
    "sources":         Map[UUID, String],
    "sourceConfigs":   Map[UUID, Map[String, Any]],
    "sourcesStats":    Map[UUID, Map[Int, Map[String, Long]]],
    "badRecordsTotal": Long,
    "recordsSkipped":  Long,
    "results":         Long,
    "recordsScanned":  Long,
    "duration":        Long,
    "startedAt":       Long
  }
}
```

A possible payload can be:

```json
{
  "type": "STATS",
  "data": {
    "id": "6a29ef99-3914-4640-b0dc-ec4a07c7ffc5",
    "bytesRead": 51972426,
    "sources": {
      "ad1afc73-3dec-41e7-b679-aa35c55e3328": "cc_payments"
    },
    "sourceConfigs": {
      "ad1afc73-3dec-41e7-b679-aa35c55e3328": {
        "live.aggs": true,
        "max.idle.time": 20000,
        "format.timestamp": true,
        "show.bad.records": true,
        "kafka.offset.timeout": 10000,
        "max.query.time": 3600000,
        "max.size": 209715200
      }
    },
    "sourcesStats": {
      "ad1afc73-3dec-41e7-b679-aa35c55e3328": {
        "0": {
          "begin": 0,
          "offsetBound": 541950,
          "bytesRead": 51972426,
          "recordsScanned": 541951,
          "end": 541950
        }
      }
    },
    "badRecordsTotal": 0,
    "recordsSkipped": 0,
    "results": 119,
    "recordsScanned": 541951,
    "duration": 18196,
    "startedAt": 1678901355694
  }
}
```
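
To show how the nested maps relate, the sketch below totals `recordsScanned` per source name by joining `sources` with `sourcesStats`. The function name `scanned_per_source` is hypothetical, and treating the inner `sourcesStats` keys as Kafka partition numbers is an assumption inferred from the sample payload.

```python
def scanned_per_source(stats: dict) -> dict:
    """Map each source name to the records scanned across its partitions."""
    totals = {}
    for source_id, partitions in stats["sourcesStats"].items():
        # Fall back to the raw id if the source has no entry in `sources`.
        name = stats["sources"].get(source_id, source_id)
        totals[name] = sum(
            partition.get("recordsScanned", 0) for partition in partitions.values()
        )
    return totals


# Trimmed copy of the `data` object from a STATS message.
stats_data = {
    "sources": {"ad1afc73-3dec-41e7-b679-aa35c55e3328": "cc_payments"},
    "sourcesStats": {
        "ad1afc73-3dec-41e7-b679-aa35c55e3328": {
            "0": {"begin": 0, "end": 541950, "recordsScanned": 541951}
        }
    },
}
print(scanned_per_source(stats_data))
```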

### Metadata <a href="#metadata" id="metadata"></a>

The payload format is:

```json
{
  "type": "METADATA",
  "data": {
    "fields": Array[String]
  }
}
```

### End <a href="#end" id="end"></a>

The payload format is:

```json
{
  "type": "END"
}
```

### Bad Record <a href="#bad-record" id="bad-record"></a>

The payload format is:

```json
{
  "type": "BADRECORD",
  "data": {
    "metadata": {
      "offset": Optional[Long],
      "partition": Optional[Long],
      "timestamp": Optional[Long],
      "table": String,
      "sourceId": String
    }
  }
}
```
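
Because `offset`, `partition`, and `timestamp` are optional, a client should not assume they are present. The sketch below formats a bad-record message for logging; the helper name `describe_bad_record` and the wording of the log line are illustrative only.

```python
def describe_bad_record(message: dict) -> str:
    """Build a one-line description of a BADRECORD message."""
    meta = message["data"]["metadata"]

    def shown(field: str) -> str:
        # Optional fields may be missing or null; 0 is a valid value.
        value = meta.get(field)
        return "unknown" if value is None else str(value)

    return (
        f"unreadable record in {meta['table']} "
        f"(partition {shown('partition')}, offset {shown('offset')})"
    )


# Hand-written sample message with the offset missing.
sample = {
    "type": "BADRECORD",
    "data": {"metadata": {"partition": 0, "table": "cc_payments", "sourceId": "src-1"}},
}
print(describe_bad_record(sample))
```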

### Sentinel <a href="#sentinel" id="sentinel"></a>

The payload format is:

```json
{
  "type": "SENTINEL",
  "data": {
    "event": String,
    "reason": {
      "type": String,
      "timeout":  Optional[Long],
      "source":   String,
      "sourceId": String,
      "amount":   Long
    }
  }
}
```

### Record <a href="#record" id="record"></a>

For messages of `type` RECORD, the payload contains the data returned to the client, and its shape depends on the SQL input. The payload format is:

```json
{
  "type": "RECORD",
  "data": {
    "key": <dynamic>,
    "value": <dynamic>,
    "metadata": <dynamic>,
    "rownum": Optional[Long]
  }
}
```

| Field         | Description |
| ------------- | ----------- |
| data.key      | Returns the underlying Kafka message key value. Only present when the `*` projection is used. When a key projection is used (e.g. `_key.MMSI`), the value is returned as `data.value.MMSI`. |
| data.value    | Contains the output generated by the SQL projections. If `*` is used for the SQL projection, it returns the Kafka message value structure. |
| data.metadata | Returned when no aggregations are involved. It contains the Kafka message partition, offset, and byte size information. |
| data.rownum   | Reserved optional long value. |
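
Since `key` and `metadata` are not always present, a client typically reads them defensively. The following sketch flattens a RECORD message into a plain dictionary; the function name `record_to_row` and the chosen output keys are hypothetical.

```python
from typing import Any, Dict


def record_to_row(message: dict) -> Dict[str, Any]:
    """Flatten a RECORD message, tolerating absent key and metadata."""
    if message.get("type") != "RECORD":
        raise ValueError("expected a RECORD message")
    data = message["data"]
    # Aggregations carry no metadata, and key projections carry no key.
    metadata = data.get("metadata") or {}
    return {
        "key": data.get("key"),
        "value": data.get("value"),
        "partition": metadata.get("partition"),
        "offset": metadata.get("offset"),
    }


# Hand-written sample shaped like an aggregation result.
aggregate = {"type": "RECORD", "data": {"value": {"count": 20066, "currency": "NOR"}}}
print(record_to_row(aggregate))
```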

## Examples <a href="#examples" id="examples"></a>

Consider a topic that contains nested data for both key and value:

```sql
SELECT * FROM sea_vessel_position_reports
```

The payload format is:

```json
{
  "type": "RECORD",
  "data": {
    "key": {
      "MMSI": 219005662
    },
    "value": {
      "Type": 1,
      "Repeat": 0,
      "MMSI": 219005662,
      "Speed": 0.1,
      "Accuracy": false,
      "Longitude": 12.570543333333333,
      "Latitude": 55.85040166666667,
      "location": "55.850402,12.570543",
      "Course": 177.9,
      "Heading": 511,
      "Second": 20,
      "RAIM": false,
      "Radio": 33577,
      "Status": 3,
      "Turn": -128.0,
      "Maneuver": 0,
      "Timestamp": 1491318144456507320
    },
    "metadata": {
      "offset": 71,
      "partition": 1,
      "timestamp": "2023-03-13T11:42:24.267Z",
      "__keysize": 10,
      "__valuesize": 79,
      "keySchemaId": 4,
      "valueSchemaId": 5
    },
    "rownum": 71
  }
}
```

If the Kafka message key is null, the `key` attribute is omitted from the response:

```json
{
  "type": "RECORD",
  "data": {
    "value": {
      "Type": 1,
      "Repeat": 0,
      "MMSI": 219005662,
      "Speed": 0.1,
      "Accuracy": false,
      "Longitude": 12.570543333333333,
      "Latitude": 55.85040166666667,
      "location": "55.850402,12.570543",
      "Course": 177.9,
      "Heading": 511,
      "Second": 20,
      "RAIM": false,
      "Radio": 33577,
      "Status": 3,
      "Turn": -128.0,
      "Maneuver": 0,
      "Timestamp": 1491318144456507320
    },
    "metadata": {
      "offset": 71,
      "partition": 1,
      "timestamp": "2023-03-13T11:42:24.267Z",
      "__keysize": 10,
      "__valuesize": 79,
      "keySchemaId": 4,
      "valueSchemaId": 5
    },
    "rownum": 71
  }
}
```

Projecting the key will not return the `key` attribute:

```sql
SELECT _key.MMSI FROM sea_vessel_position_reports
```

produces the following output:

```json
{
  "type": "RECORD",
  "data": {
    "value": {
      "MMSI": 265527470
    },
    "metadata": {
      "offset": 86,
      "partition": 1,
      "timestamp": "2023-03-13T11:42:29.107Z",
      "__keysize": 10,
      "__valuesize": 79,
      "keySchemaId": 4,
      "valueSchemaId": 5
    },
    "rownum": 86
  }
}
```

Aggregating data will not return the `key` and `metadata` attributes:

```sql
SELECT count(*), currency 
FROM cc_payments
GROUP BY currency
```

```json
{
  "type": "RECORD",
  "data": {
    "value": {
      "count": 20066,
      "currency": "NOR"
    },
    "rownum": 90
  },
  "rowId": "fc767952-97e5-46cf-868c-7cbfbcde21fb",
  "statementIndex": null
}
```
