4.3

Snapshot Engine

Enhanced the information provided by the /api/ws/v2/sql/execute endpoint 

New fields and message types were added; existing messages and message formats remain unchanged.

Bad records are now streamed by the socket 

A new message type, BADRECORD, was added so that bad records (records that could not be read, for whatever reason) can be streamed to the client.

For Kafka sources, a bad record message includes the following information:

{
  "type": "BADRECORD",
  "data": {
    "metadata": {
      "timestamp": 1618843939222,
      "partition": 3,
      "offset": 5032,
      "sourceId": "4ceebad2-7c5d-4937-ab79-39c6be492aed",
      "table": "example_topic"
    }
  },
  "rowId": null,
  "statementIndex": null
}
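As an illustration, a client can recognise a bad record message by its type field and read the Kafka coordinates from data.metadata. This is a minimal Python sketch, assuming each socket frame carries exactly one JSON message in the shape shown above; describe_bad_record is a hypothetical helper, not part of any Lenses client library.

```python
import json
from typing import Optional


def describe_bad_record(frame: str) -> Optional[str]:
    """Summarise a BADRECORD message, or return None for any other message type.

    Hypothetical helper: assumes `frame` is one JSON text frame from the socket.
    """
    message = json.loads(frame)
    if message.get("type") != "BADRECORD":
        return None
    meta = message["data"]["metadata"]
    # Kafka coordinates of the record that could not be read
    return f"bad record in {meta['table']} partition {meta['partition']} offset {meta['offset']}"


if __name__ == "__main__":
    example = {
        "type": "BADRECORD",
        "data": {"metadata": {"timestamp": 1618843939222, "partition": 3, "offset": 5032,
                              "sourceId": "4ceebad2-7c5d-4937-ab79-39c6be492aed",
                              "table": "example_topic"}},
        "rowId": None,
        "statementIndex": None,
    }
    print(describe_bad_record(json.dumps(example)))
    # prints: bad record in example_topic partition 3 offset 5032
```

Returning None for other types lets the same function sit in a loop over every frame without extra type checks.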

Sentinel records added 

A SENTINEL record type was added. Currently, sentinel records are used to indicate that a query was stopped early.

Each early-termination reason has its own message shape:

  • The scanned size exceeded the allowed amount

    {
      "type": "SENTINEL",
      "data": {
        "reason": {
          "type": "SCANNEDSIZE",
          "sourceId": "4ceebad2-7c5d-4937-ab79-39c6be492aed",
          "source": "example_topic",
          "amount": 1000
        },
        "event": "SOURCETERMINATION"
      },
      "rowId": null,
      "statementIndex": null
    }
    
  • An idle timeout was reached while retrieving data from a source

    {
      "type": "SENTINEL",
      "data": {
        "reason": {
          "type": "IDLETIMEOUT",
          "timeout": 0,
          "sourceId": "4ceebad2-7c5d-4937-ab79-39c6be492aed",
          "source": "example_topic"
        },
        "event": "SOURCETERMINATION"
      },
      "rowId": null,
      "statementIndex": null
    }
    
  • The query time for a given source was exceeded

    {
      "type": "SENTINEL",
      "data": {
        "reason": {
          "type": "COMPLETIONTIMEOUT",
          "timeout": 0,
          "sourceId": "4ceebad2-7c5d-4937-ab79-39c6be492aed",
          "source": "example_topic"
        },
        "event": "SOURCETERMINATION"
      },
      "rowId": null,
      "statementIndex": null
    }
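A client can branch on data.reason.type to report why a source stopped early. The following minimal Python sketch assumes the three message shapes listed above; describe_sentinel is a hypothetical helper, and because the timeout unit is not stated here, the value is printed as-is.

```python
import json
from typing import Optional


def describe_sentinel(frame: str) -> Optional[str]:
    """Summarise a SENTINEL message, or return None for any other message type.

    Hypothetical helper: assumes `frame` is one JSON text frame from the socket.
    """
    message = json.loads(frame)
    if message.get("type") != "SENTINEL":
        return None
    data = message["data"]
    reason = data["reason"]
    source = reason.get("source", "<unknown source>")
    kind = reason["type"]
    if kind == "SCANNEDSIZE":
        detail = f"scanned size exceeded the allowed amount ({reason['amount']})"
    elif kind == "IDLETIMEOUT":
        detail = f"idle timeout reached (timeout={reason['timeout']})"
    elif kind == "COMPLETIONTIMEOUT":
        detail = f"query time exceeded (timeout={reason['timeout']})"
    else:
        # Reason types not listed above: report the raw type instead of failing
        detail = f"terminated early ({kind})"
    return f"{data['event']} for {source}: {detail}"
```

The fallback branch is a design choice: since sentinels are described as "currently" used for early termination, tolerating unknown reason types keeps an older client working if more are added later.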
    

New Stats fields 

STATS messages now include detailed information about the query run. The new message looks as follows:

{
  "type": "STATS",
  "data": {
    "id": "674de6ad-9fbd-478c-b9da-b050c5b2deec",
    "bytesRead": 81323,
    "sources": {
      "4ceebad2-7c5d-4937-ab79-39c6be492aed": "example_topic"
    },
    "sourceConfigs": {
      "4ceebad2-7c5d-4937-ab79-39c6be492aed": {
        "max.idle.time": 20000,
        "show.bad.records": true,
        "kafka.offset.timeout": 10000,
        "max.query.time": 3600000,
        "max.size": 209715200
      }
    },
    "sourcesStats": {
      "4ceebad2-7c5d-4937-ab79-39c6be492aed": {
        "0": {
          "badRecords": 1,
          "bytesRead": 92592,
          "recordsScanned": 41,
          "begin": 0,
          "end": 40,
          "offsetBound": 10
        }
      }
    },
    "badRecordsTotal": 1,
    "recordsSkipped": 0,
    "results": 1,
    "recordsScanned": 39,
    "duration": 1,
    "startedAt": 1618847917476
  },
  "rowId": null,
  "statementIndex": null
}
Property | Meaning
--- | ---
$.sources | Each topic reference in the query is assigned a unique sourceId; this object maps each sourceId to its topic name.
$.sourceConfigs | The config options used to instantiate each source.
$.sourcesStats | An object containing, for each source, the stats collected for it.
$.sourcesStats['sourceId'] | The stats for a specific source, per partition; for non-Kafka sources, partition -1 is used.
$.sourcesStats['sourceId']['partition'] | The stats collected for a given source partition; all fields are optional and not guaranteed to appear.
$.sourcesStats['sourceId']['partition'].badRecords | Count of bad records read for the given source and partition.
$.sourcesStats['sourceId']['partition'].bytesRead | Total bytes scanned while reading the given source and partition.
$.sourcesStats['sourceId']['partition'].recordsScanned | Count of all records scanned while reading the given source and partition.
$.sourcesStats['sourceId']['partition'].begin | Offset of the first record scanned for the given source and partition.
$.sourcesStats['sourceId']['partition'].end | Offset of the last record scanned for the given source and partition.
$.sourcesStats['sourceId']['partition'].offsetBound | The last offset the Snapshot engine expects to scan for the given source and partition.
$.badRecordsTotal | Total count of bad records read.
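To turn the per-partition breakdown into one line per source, a client can sum the partition entries and resolve each sourceId through $.sources. This is a minimal Python sketch, assuming the STATS shape shown above; summarise_sources is a hypothetical helper. Because every per-partition field is optional, a missing value is counted as 0.

```python
import json
from typing import Any, Dict


def summarise_sources(stats_frame: str) -> Dict[str, Dict[str, Any]]:
    """Sum the per-partition stats of a STATS message for each sourceId.

    Hypothetical helper: assumes `stats_frame` is one STATS message as JSON text.
    """
    data = json.loads(stats_frame)["data"]
    topics = data.get("sources", {})  # sourceId -> topic name
    summary: Dict[str, Dict[str, Any]] = {}
    for source_id, partitions in data.get("sourcesStats", {}).items():
        totals: Dict[str, Any] = {
            "topic": topics.get(source_id, source_id),
            "partitions": len(partitions),  # keys are strings such as "0" or "-1"
            "recordsScanned": 0,
            "bytesRead": 0,
            "badRecords": 0,
        }
        for stats in partitions.values():
            # Per-partition fields are optional, so a missing value counts as 0
            for field in ("recordsScanned", "bytesRead", "badRecords"):
                totals[field] += stats.get(field, 0)
        summary[source_id] = totals
    return summary
```

The result is keyed by sourceId rather than topic name: each topic reference gets its own sourceId, so a topic referenced twice in one query stays as two separate entries instead of being merged.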

Removed support for skip.bad.records

Since bad records can now be streamed, the behavior controlled by SET skip.bad.records=true|false is no longer useful, so this setting is no longer supported in the SNAPSHOT engine.
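A client that relied on skip.bad.records=true can get a similar effect on its own side by discarding BADRECORD messages as they arrive. This is a minimal Python sketch, assuming frames are JSON text messages as described above; without_bad_records is a hypothetical name, not a Lenses API.

```python
import json
from typing import Iterable, Iterator


def without_bad_records(frames: Iterable[str]) -> Iterator[dict]:
    """Decode socket frames and yield every message except BADRECORD ones."""
    for frame in frames:
        message = json.loads(frame)
        if message.get("type") == "BADRECORD":
            continue  # drop records the engine could not read
        yield message
```

Being a generator, it filters lazily, so it can wrap a live stream of frames without buffering the whole result.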

Added show.bad.records 

A new flag, show.bad.records, was added to control whether the socket streams the bad records read from the topic.
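The flag also appears in $.sourceConfigs of the STATS message (as in the example above), so a client can check which setting each source actually ran with. This is a minimal Python sketch, assuming that STATS shape; bad_record_streaming is a hypothetical helper, and None means the flag was not reported for that source.

```python
import json
from typing import Dict, Optional


def bad_record_streaming(stats_frame: str) -> Dict[str, Optional[bool]]:
    """Map each sourceId in a STATS message to its show.bad.records value."""
    data = json.loads(stats_frame)["data"]
    source_ids = data.get("sources", {})  # sourceId -> topic name
    configs = data.get("sourceConfigs", {})  # sourceId -> config options
    return {
        source_id: configs.get(source_id, {}).get("show.bad.records")
        for source_id in source_ids
    }
```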