WebSocket Spec

Data Access - Query Data via SQL

A WebSocket allows the client to request data using SQL as input.

Use the following URL to open a WebSocket connection. Depending on the deployment, use either ws (unencrypted) or wss (TLS-secured):

WS | WSS ${LENSES_URL}/api/ws/v2/sql/execute
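
As an illustration, the endpoint URL could be assembled as follows. This is a minimal sketch that assumes LENSES_URL is a bare host:port value without a scheme; the function name sql_ws_url and the sample host are hypothetical.

```python
def sql_ws_url(lenses_host: str, secure: bool = True) -> str:
    """Build the SQL WebSocket endpoint URL.

    Assumption: lenses_host is a bare "host:port" string with no scheme.
    """
    scheme = "wss" if secure else "ws"
    # Drop a trailing slash so the path is not doubled up.
    return f"{scheme}://{lenses_host.rstrip('/')}/api/ws/v2/sql/execute"


# Hypothetical host, for illustration only.
print(sql_ws_url("lenses.example.com:9991", secure=False))
# prints: ws://lenses.example.com:9991/api/ws/v2/sql/execute
```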

Once the connection is established, the server expects the client to send a JSON message with the following structure:

{
  "token": String,
  "sql": String,
  "live": Optional[Boolean],
  "stats": Optional[Int]
}
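
The request message can be built with any JSON library. The sketch below uses Python's standard json module; the helper name build_request is hypothetical, and leaving unset optional fields out of the message (rather than sending null) is an assumption, since the structure above only marks them as optional. The stats value shown is a placeholder: only its type is given here.

```python
import json
from typing import Optional


def build_request(token: str, sql: str,
                  live: Optional[bool] = None,
                  stats: Optional[int] = None) -> str:
    """Serialise the first message sent after the connection opens."""
    message = {"token": token, "sql": sql}
    # Assumption: optional fields are omitted when unset instead of sent as null.
    if live is not None:
        message["live"] = live
    if stats is not None:
        message["stats"] = stats
    return json.dumps(message)


# Placeholder token and stats value, for illustration only.
print(build_request("<token>", "SELECT * FROM cc_payments", stats=2))
```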

The client should then wait for incoming messages and stop when EOF is received. The server sends JSON-encoded messages with the following payload structure:

{
  "type": <Enumeration>,
  "data": <Dependent on the type field>
}
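
A client might decode each incoming message into its type and data before deciding what to do with it. The sketch below is one way to do that; the names Envelope and parse_envelope are hypothetical, and treating a missing data attribute as None is an assumption.

```python
import json
from typing import Any, NamedTuple


class Envelope(NamedTuple):
    type: str
    data: Any  # None when the message carries no "data" attribute


def parse_envelope(raw: str) -> Envelope:
    """Decode one server message into its type and payload."""
    message = json.loads(raw)
    # "type" is always expected; "data" may be absent for some types.
    return Envelope(type=message["type"], data=message.get("data"))


print(parse_envelope('{"type": "HEARTBEAT"}'))
# prints: Envelope(type='HEARTBEAT', data=None)
```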

Because the server sends several kinds of messages, the type attribute determines the structure of the data payload. The possible type values are listed below.

Message types

Heartbeat

The data attribute is null. The client should ignore these messages.

{
  "type": "HEARTBEAT"
}

Error

In this case, the payload structure is:

{
  "type": "ERROR",
  "data": String
}

Stats

When stats are requested, the payload format is:

{
  "type": "STATS",
  "data": {
    "id":              String,
    "bytesRead":       Long,
    "sources":         Map[UUID, String],
    "sourceConfigs":   Map[UUID, Map[String, Any]],
    "sourcesStats":    Map[UUID, Map[Int, Map[String, Long]]],
    "badRecordsTotal": Long,
    "recordsSkipped":  Long,
    "results":         Long,
    "recordsScanned":  Long,
    "duration":        Long,
    "startedAt":       Long
  }
}

A possible payload can be:

{
  "type": "STATS",
  "data": {
    "id": "6a29ef99-3914-4640-b0dc-ec4a07c7ffc5",
    "bytesRead": 51972426,
    "sources": {
      "ad1afc73-3dec-41e7-b679-aa35c55e3328": "cc_payments"
    },
    "sourceConfigs": {
      "ad1afc73-3dec-41e7-b679-aa35c55e3328": {
        "live.aggs": true,
        "max.idle.time": 20000,
        "format.timestamp": true,
        "show.bad.records": true,
        "kafka.offset.timeout": 10000,
        "max.query.time": 3600000,
        "max.size": 209715200
      }
    },
    "sourcesStats": {
      "ad1afc73-3dec-41e7-b679-aa35c55e3328": {
        "0": {
          "begin": 0,
          "offsetBound": 541950,
          "bytesRead": 51972426,
          "recordsScanned": 541951,
          "end": 541950
        }
      }
    },
    "badRecordsTotal": 0,
    "recordsSkipped": 0,
    "results": 119,
    "recordsScanned": 541951,
    "duration": 18196,
    "startedAt": 1678901355694
  }
}
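
In the sample above, the per-partition counters under sourcesStats add up to the top-level totals. The sketch below sums one counter across all sources and partitions; the helper name partition_totals is hypothetical, and whether the totals always match is an assumption drawn from this single sample.

```python
def partition_totals(stats: dict, counter: str) -> int:
    """Sum one per-partition counter across all sources in a STATS data object."""
    return sum(
        partition.get(counter, 0)
        for source in stats["sourcesStats"].values()
        for partition in source.values()
    )


# Trimmed copy of the "data" object from the sample payload.
sample = {
    "bytesRead": 51972426,
    "recordsScanned": 541951,
    "sourcesStats": {
        "ad1afc73-3dec-41e7-b679-aa35c55e3328": {
            "0": {"begin": 0, "offsetBound": 541950, "bytesRead": 51972426,
                  "recordsScanned": 541951, "end": 541950}
        }
    },
}

print(partition_totals(sample, "recordsScanned") == sample["recordsScanned"])
# prints: True
```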

Metadata

The payload format is:

{
  "type": "METADATA",
  "data": {
    "fields": Array[String]
  }
}

End

The payload format is:

{
  "type": "END"
}

Bad Record

The payload format is:

{
  "type": "BADRECORD",
  "data": {
    "metadata": {
      "offset": Optional[Long],
      "partition": Optional[Long],
      "timestamp": Optional[Long],
      "table": String,
      "sourceId": String
    }
  }
}

Sentinel

The payload format is:

{
  "type": "SENTINEL",
  "data": {
    "event": String,
    "reason": {
      "type": String,
      "timeout":  Optional[Long],
      "source":   String,
      "sourceId": String,
      "amount":   Long
    }
  }
}

Record

For the RECORD type, the payload contains the data returned to the client and depends on the SQL input. The payload format is:

{
  "type": "RECORD",
  "data": {
    "key": <dynamic>,
    "value": <dynamic>,
    "metadata": <dynamic>,
    "rownum": Optional[Long]
  }
}
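
With the message types described, a client's receive loop can be sketched as below. It works on any iterable of raw JSON strings, so it is independent of the WebSocket library in use. The names QueryError and collect_records are hypothetical, and the sketch assumes that error messages carry the type ERROR and that END marks the last message of interest.

```python
import json
from typing import Iterable, List


class QueryError(Exception):
    """Raised when the server reports an error (hypothetical name)."""


def collect_records(raw_messages: Iterable[str]) -> List[dict]:
    """Collect the data of RECORD messages until END is seen."""
    records = []
    for raw in raw_messages:
        message = json.loads(raw)
        kind = message["type"]
        if kind == "HEARTBEAT":
            continue  # heartbeats carry no data and are ignored
        if kind == "ERROR":  # assumption: errors use the type ERROR
            raise QueryError(message.get("data"))
        if kind == "END":
            break
        if kind == "RECORD":
            records.append(message["data"])
        # STATS, METADATA, BADRECORD and SENTINEL are skipped in this sketch.
    return records


incoming = [
    '{"type": "HEARTBEAT"}',
    '{"type": "RECORD", "data": {"value": {"MMSI": 265527470}, "rownum": 86}}',
    '{"type": "END"}',
]
print(collect_records(incoming))
# prints: [{'value': {'MMSI': 265527470}, 'rownum': 86}]
```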

Examples

For a topic that contains nested data in both the key and the value, the query:

SELECT * FROM sea_vessel_position_reports

returns payloads such as:

{
  "type": "RECORD",
  "data": {
    "key": {
      "MMSI": 219005662
    },
    "value": {
      "Type": 1,
      "Repeat": 0,
      "MMSI": 219005662,
      "Speed": 0.1,
      "Accuracy": false,
      "Longitude": 12.570543333333333,
      "Latitude": 55.85040166666667,
      "location": "55.850402,12.570543",
      "Course": 177.9,
      "Heading": 511,
      "Second": 20,
      "RAIM": false,
      "Radio": 33577,
      "Status": 3,
      "Turn": -128.0,
      "Maneuver": 0,
      "Timestamp": 1491318144456507320
    },
    "metadata": {
      "offset": 71,
      "partition": 1,
      "timestamp": "2023-03-13T11:42:24.267Z",
      "__keysize": 10,
      "__valuesize": 79,
      "keySchemaId": 4,
      "valueSchemaId": 5
    },
    "rownum": 71
  }
}

If the Kafka message key is null, the key attribute is omitted from the response:

{
  "type": "RECORD",
  "data": {
    "value": {
      "Type": 1,
      "Repeat": 0,
      "MMSI": 219005662,
      "Speed": 0.1,
      "Accuracy": false,
      "Longitude": 12.570543333333333,
      "Latitude": 55.85040166666667,
      "location": "55.850402,12.570543",
      "Course": 177.9,
      "Heading": 511,
      "Second": 20,
      "RAIM": false,
      "Radio": 33577,
      "Status": 3,
      "Turn": -128.0,
      "Maneuver": 0,
      "Timestamp": 1491318144456507320
    },
    "metadata": {
      "offset": 71,
      "partition": 1,
      "timestamp": "2023-03-13T11:42:24.267Z",
      "__keysize": 10,
      "__valuesize": 79,
      "keySchemaId": 4,
      "valueSchemaId": 5
    },
    "rownum": 71
  }
}

Projecting a field of the key returns it under value, without a key attribute:

SELECT _key.MMSI FROM sea_vessel_position_reports

produces the following output:

{
  "type": "RECORD",
  "data": {
    "value": {
      "MMSI": 265527470
    },
    "metadata": {
      "offset": 86,
      "partition": 1,
      "timestamp": "2023-03-13T11:42:29.107Z",
      "__keysize": 10,
      "__valuesize": 79,
      "keySchemaId": 4,
      "valueSchemaId": 5
    },
    "rownum": 86
  }
}

Aggregating data will not return the key and metadata attributes:

SELECT count(*), currency
FROM cc_payments
GROUP BY currency

produces the following output:

{
  "type": "RECORD",
  "data": {
    "value": {
      "count": 20066,
      "currency": "NOR"
    },
    "rownum": 90
  },
  "rowId": "fc767952-97e5-46cf-868c-7cbfbcde21fb",
  "statementIndex": null
}
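
Because key and metadata can be absent (null key, key projection, aggregation), a client may want to read them defensively. A minimal sketch, with the hypothetical helper name split_record and an empty dict standing in for missing metadata:

```python
from typing import Any, Optional, Tuple


def split_record(data: dict) -> Tuple[Optional[Any], Any, dict]:
    """Return (key, value, metadata) from the data object of a RECORD message."""
    # key is None and metadata is {} when the server leaves them out.
    return data.get("key"), data.get("value"), data.get("metadata", {})


# The "data" object from the aggregation example.
aggregate = {"value": {"count": 20066, "currency": "NOR"}, "rownum": 90}
key, value, metadata = split_record(aggregate)
print(key, value["currency"], metadata)
# prints: None NOR {}
```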

2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.