WebSocket API


Data Access - Query Data via SQL 

This WebSocket endpoint lets the client request data using SQL as input. For details on the underlying engine, see the Lenses SQL Snapshot engine documentation.

Use the following URL to open a WebSocket connection. Depending on your deployment, use ws (not secure) or wss (secure):

WS | WSS ${LENSES_URL}/api/ws/v2/sql/execute
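As a minimal sketch, assuming ${LENSES_URL} is the http:// or https:// base address of your Lenses instance, the WebSocket URL can be derived by mapping http to ws and https to wss and appending the path above. The helper name sql_ws_url is hypothetical.

```python
from urllib.parse import urlsplit, urlunsplit

# Assumption: the Lenses base URL uses http/https (ws/wss are passed through).
_SCHEMES = {"http": "ws", "https": "wss", "ws": "ws", "wss": "wss"}


def sql_ws_url(lenses_url: str) -> str:
    """Return the SQL execution WebSocket URL for a Lenses base URL (hypothetical helper)."""
    parts = urlsplit(lenses_url)
    scheme = _SCHEMES.get(parts.scheme)
    if scheme is None:
        raise ValueError(f"unsupported URL scheme: {parts.scheme!r}")
    # https deployments map to wss (secure), http deployments to ws (not secure)
    path = parts.path.rstrip("/") + "/api/ws/v2/sql/execute"
    return urlunsplit((scheme, parts.netloc, path, "", ""))
```

For example, sql_ws_url("https://lenses.example.com") returns "wss://lenses.example.com/api/ws/v2/sql/execute".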

Once the connection is established, the server expects the client to send a JSON message with the following structure:

{
  "token": String,
  "sql": String,
  "live": Optional[Boolean],
  "stats": Optional[Int]
}

Field | Description | Type
token | The authentication token used to authorize the request | string
sql | The SQL query used to fetch data | string
live | If set, enables partial results for aggregation queries | optional[boolean]
stats | Interval, in milliseconds, at which query stats are sent. If not provided, no stats information is sent | optional[int]
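A sketch of building the request message in Python follows. It simply omits the optional live and stats fields when they are not set, so the server applies its defaults (for example, no STATS messages). The function name build_sql_request is hypothetical.

```python
import json
from typing import Optional


def build_sql_request(token: str, sql: str, live: Optional[bool] = None,
                      stats: Optional[int] = None) -> str:
    """Serialize the first message sent after the socket opens (hypothetical helper)."""
    request = {"token": token, "sql": sql}
    if live is not None:
        request["live"] = live    # partial results for aggregation queries
    if stats is not None:
        request["stats"] = stats  # interval in milliseconds for STATS messages
    return json.dumps(request)
```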

The client should keep reading incoming messages until an END message is received, after which the server closes the socket. The server sends JSON-encoded messages with the following structure:

{
  "type": <Enumeration>,
  "data": <Dependent on the type field>
}

The server sends several kinds of messages, and the type attribute determines the structure of the data payload. The possible type values are:

Value | Description
HEARTBEAT | To keep the connection alive, the server injects an empty message.
RECORD | The message represents a data record.
ERROR | The message contains information about an error that occurred.
STATS | The message contains information about the current execution. Only sent if the stats field is present in the request.
METADATA | The message contains the list of metadata fields each record message will contain, for example: offset, partition, timestamp, __keysize, __valuesize, valueSchemaId.
SCHEMA | Reserved.
PAGE_END | Reserved.
END | The message signals the end of the execution. The server closes the socket after this message is sent.
BADRECORD | The message contains information about a Kafka message that cannot be read, for example when the topic expects Avro and the payload is not valid Avro.
SENTINEL | The message contains the reason the execution terminated when query thresholds are reached.
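One way to model the type values in a client is an enumeration, so that unexpected values fail loudly instead of being silently ignored. The following Python sketch (names MessageType and parse_message are hypothetical) decodes one text frame into its type and its data, which is None when the message carries no data, as with HEARTBEAT and END.

```python
import json
from enum import Enum
from typing import Any, Optional, Tuple


class MessageType(str, Enum):
    """Possible values of the type attribute (hypothetical client-side model)."""
    HEARTBEAT = "HEARTBEAT"
    RECORD = "RECORD"
    ERROR = "ERROR"
    STATS = "STATS"
    METADATA = "METADATA"
    SCHEMA = "SCHEMA"      # reserved
    PAGE_END = "PAGE_END"  # reserved
    END = "END"
    BADRECORD = "BADRECORD"
    SENTINEL = "SENTINEL"


def parse_message(frame: str) -> Tuple[MessageType, Optional[Any]]:
    """Decode one JSON text frame into its type and its (possibly absent) data."""
    message = json.loads(frame)
    # MessageType(...) raises ValueError for a type value not listed above
    return MessageType(message["type"]), message.get("data")
```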

Message types 

Heartbeat 

The data attribute is null. The client should ignore these messages.

{
  "type": "HEARTBEAT"
}

Error 

In this case the payload structure is:

{
  "type": "ERROR",
  "data": String
}

Stats 

When requested through the stats field of the request, the payload format is:

{
  "type": "STATS",
  "data": {
    "id":              String,
    "bytesRead":       Long,
    "sources":         Map[UUID, String],
    "sourceConfigs":   Map[UUID, Map[String, Any]],
    "sourcesStats":    Map[UUID, Map[Int, Map[String, Long]]],
    "badRecordsTotal": Long,
    "recordsSkipped":  Long,
    "results":         Long,
    "recordsScanned":  Long,
    "duration":        Long,
    "startedAt":       Long
  }
}

An example payload:

{
  "type": "STATS",
  "data": {
    "id": "6a29ef99-3914-4640-b0dc-ec4a07c7ffc5",
    "bytesRead": 51972426,
    "sources": {
      "ad1afc73-3dec-41e7-b679-aa35c55e3328": "cc_payments"
    },
    "sourceConfigs": {
      "ad1afc73-3dec-41e7-b679-aa35c55e3328": {
        "live.aggs": true,
        "max.idle.time": 20000,
        "format.timestamp": true,
        "show.bad.records": true,
        "kafka.offset.timeout": 10000,
        "max.query.time": 3600000,
        "max.size": 209715200
      }
    },
    "sourcesStats": {
      "ad1afc73-3dec-41e7-b679-aa35c55e3328": {
        "0": {
          "begin": 0,
          "offsetBound": 541950,
          "bytesRead": 51972426,
          "recordsScanned": 541951,
          "end": 541950
        }
      }
    },
    "badRecordsTotal": 0,
    "recordsSkipped": 0,
    "results": 119,
    "recordsScanned": 541951,
    "duration": 18196,
    "startedAt": 1678901355694
  }
}
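The per-partition counters in sourcesStats are nested three levels deep: source UUID, then partition, then counter name. As a sketch, the hypothetical Python function below sums recordsScanned per topic by resolving each source UUID through sources. It relies only on the fields shown in the example above.

```python
from typing import Any, Dict


def records_scanned_per_topic(stats: Dict[str, Any]) -> Dict[str, int]:
    """Sum per-partition recordsScanned for each source, keyed by topic name.

    stats is the data object of a STATS message: sources maps source UUIDs to
    topic names, and sourcesStats maps the same UUIDs to per-partition counters.
    """
    names = stats.get("sources", {})
    totals: Dict[str, int] = {}
    for source_id, partitions in stats.get("sourcesStats", {}).items():
        topic = names.get(source_id, source_id)  # fall back to the UUID
        scanned = sum(p.get("recordsScanned", 0) for p in partitions.values())
        totals[topic] = totals.get(topic, 0) + scanned
    return totals
```

Applied to the example payload above, this yields {"cc_payments": 541951}, which matches the top-level recordsScanned value.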

Metadata 

The payload format is:

{
  "type": "METADATA",
  "data": {
    "fields": Array[String]
  }
}

End 

The payload format is:

{
  "type": "END"
}
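Putting the message types together, a client reads messages until END arrives. The Python sketch below is transport-agnostic: frames is assumed to be any iterable of raw JSON text frames, for example the messages received from a WebSocket client library, which is not shown. Treating ERROR as fatal is an assumption of this sketch, and QueryError and collect_records are hypothetical names.

```python
import json
from typing import Any, Dict, Iterable, List


class QueryError(Exception):
    """Hypothetical exception raised when the server reports an ERROR."""


def collect_records(frames: Iterable[str]) -> List[Dict[str, Any]]:
    """Consume server messages until END and return the data of every RECORD."""
    records: List[Dict[str, Any]] = []
    for frame in frames:
        message = json.loads(frame)
        kind = message["type"]
        if kind == "END":
            break  # the server closes the socket after END
        if kind == "ERROR":
            raise QueryError(message.get("data"))  # assumption: abort on error
        if kind == "RECORD":
            records.append(message["data"])
        # HEARTBEAT, STATS, METADATA, BADRECORD and SENTINEL are skipped here
    return records
```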

Bad Record 

The payload format is:

{
  "type": "BADRECORD",
  "data": {
    "metadata": {
      "offset": Optional[Long],
      "partition": Optional[Long],
      "timestamp": Optional[Long],
      "table": String,
      "sourceId": String
    }
  }
}
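Because offset, partition and timestamp are optional, code that logs bad records should not assume they are present. A minimal Python sketch, with the hypothetical name describe_bad_record, builds a log line from whichever location fields are available.

```python
from typing import Any, Dict


def describe_bad_record(data: Dict[str, Any]) -> str:
    """Format the location of an unreadable Kafka message (hypothetical helper)."""
    meta = data["metadata"]
    parts = [f"table={meta['table']}"]  # table is always present
    for field in ("partition", "offset", "timestamp"):
        if meta.get(field) is not None:  # optional fields may be missing or null
            parts.append(f"{field}={meta[field]}")
    return " ".join(parts)
```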

Sentinel 

The payload format is:

{
  "type": "SENTINEL",
  "data": {
    "event": String,
    "reason": {
      "type": String,
      "timeout":  Optional[Long],
      "source":   String,
      "sourceId": String,
      "amount":   Long
    }
  }
}

Record 

For the RECORD type, the payload contains the data returned to the client, and its shape depends on the SQL query. The payload format is:

{
  "type": "RECORD",
  "data": {
    "key": <dynamic>,
    "value": <dynamic>,
    "metadata": <dynamic>,
    "rownum": Optional[Long]
  }
}

Field | Description
data.key | Returns the underlying Kafka message key. Only present when the * projection is used. When a key field is projected (e.g. _key.MMSI), the value is returned as data.value.MMSI.
data.value | Contains the output generated by the SQL projections. If * is used for the SQL projection, it contains the Kafka message value structure.
data.metadata | Returned when no aggregations are involved. It contains the Kafka message partition, offset, and byte size information.
data.rownum | Reserved optional long value.
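Since key and metadata are not always present, a client can normalize each RECORD payload into a fixed shape. The following Python sketch (Row and to_row are hypothetical names) uses None for any attribute the server omitted, such as the key for null keys, key projections and aggregations, or the metadata for aggregations, as described in the table above and the examples below.

```python
from typing import Any, Dict, NamedTuple, Optional


class Row(NamedTuple):
    key: Optional[Any]                  # omitted for null keys, key projections, aggregations
    value: Any                          # output of the SQL projections
    metadata: Optional[Dict[str, Any]]  # omitted for aggregation queries
    rownum: Optional[int]               # reserved, optional


def to_row(data: Dict[str, Any]) -> Row:
    """Normalize the data object of a RECORD message (hypothetical helper)."""
    return Row(data.get("key"), data.get("value"), data.get("metadata"), data.get("rownum"))
```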

Examples 

Consider a topic that contains nested data in both the key and the value:

SELECT * FROM sea_vessel_position_reports

A record returned by this query looks like:

{
  "type": "RECORD",
  "data": {
    "key": {
      "MMSI": 219005662
    },
    "value": {
      "Type": 1,
      "Repeat": 0,
      "MMSI": 219005662,
      "Speed": 0.1,
      "Accuracy": false,
      "Longitude": 12.570543333333333,
      "Latitude": 55.85040166666667,
      "location": "55.850402,12.570543",
      "Course": 177.9,
      "Heading": 511,
      "Second": 20,
      "RAIM": false,
      "Radio": 33577,
      "Status": 3,
      "Turn": -128.0,
      "Maneuver": 0,
      "Timestamp": 1491318144456507320
    },
    "metadata": {
      "offset": 71,
      "partition": 1,
      "timestamp": "2023-03-13T11:42:24.267Z",
      "__keysize": 10,
      "__valuesize": 79,
      "keySchemaId": 4,
      "valueSchemaId": 5
    },
    "rownum": 71
  }
}

If the Kafka message key is null, the key attribute is omitted:

{
  "type": "RECORD",
  "data": {
    "value": {
      "Type": 1,
      "Repeat": 0,
      "MMSI": 219005662,
      "Speed": 0.1,
      "Accuracy": false,
      "Longitude": 12.570543333333333,
      "Latitude": 55.85040166666667,
      "location": "55.850402,12.570543",
      "Course": 177.9,
      "Heading": 511,
      "Second": 20,
      "RAIM": false,
      "Radio": 33577,
      "Status": 3,
      "Turn": -128.0,
      "Maneuver": 0,
      "Timestamp": 1491318144456507320
    },
    "metadata": {
      "offset": 71,
      "partition": 1,
      "timestamp": "2023-03-13T11:42:24.267Z",
      "__keysize": 10,
      "__valuesize": 79,
      "keySchemaId": 4,
      "valueSchemaId": 5
    },
    "rownum": 71
  }
}

Projecting a key field does not return the key attribute; the projected field is returned under value instead:

SELECT _key.MMSI FROM sea_vessel_position_reports

produces the following output:

{
  "type": "RECORD",
  "data": {
    "value": {
      "MMSI": 265527470
    },
    "metadata": {
      "offset": 86,
      "partition": 1,
      "timestamp": "2023-03-13T11:42:29.107Z",
      "__keysize": 10,
      "__valuesize": 79,
      "keySchemaId": 4,
      "valueSchemaId": 5
    },
    "rownum": 86
  }
}

Aggregating data will not return the key and metadata attributes:

SELECT count(*), currency
FROM cc_payments
GROUP BY currency

produces the following output:

{
  "type": "RECORD",
  "data": {
    "value": {
      "count": 20066,
      "currency": "NOR"
    },
    "rownum": 90
  },
  "rowId": "fc767952-97e5-46cf-868c-7cbfbcde21fb",
  "statementIndex": null
}
--
Last modified: September 26, 2024