Best practices

Does Apache Kafka have indexing?

No. Apache Kafka does not index the message payload (indexes come at a high cost even in an RDBMS or a system like Elasticsearch); it only indexes the record metadata.

The only filters Kafka supports are topic, partition, and offset or timestamp.

When querying Kafka topic data with SQL such as

SELECT *
FROM topicA
WHERE transaction_id=123

a full scan is executed: the query reads the entire topic to identify all records that match the transaction id.

If the Kafka topic contains a million 50KB messages, that means scanning 50 GB of data. Depending on your network capabilities, the brokers’ performance, any quotas on your account, and other parameters, fetching 50 GB of data can take some time, and even longer if the data is compressed: in that case, the client has to decompress it before parsing the raw bytes into a structure the query can be applied to.

Query by partition, offset or timestamp to avoid full scans.
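
For example, Lenses exposes record metadata to snapshot queries, so a bounded read can target a specific partition and offset range instead of the whole topic. The sketch below assumes the metadata is available under a _meta prefix (the exact field names may differ in your version):

SELECT *
FROM topicA
WHERE _meta.partition = 1
  AND _meta.offset >= 100000
  AND _meta.offset < 200000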

Understanding bad records

When Lenses can’t read (deserialize) your topic’s messages, it classifies them as “bad records”. This happens for one of the following reasons:

  • Kafka records are corrupted. On an AVRO topic, for example, a rogue producer might have published messages in a different format

  • Lenses topic settings do not match the payload data. Maybe a topic was incorrectly set to AVRO format when it is actually JSON, or vice versa

  • If an AVRO payload is involved, the Schema Registry may be down or not accessible from the machine running Lenses

By default, Lenses skips them and displays the records’ metadata in the Bad Records tab. If you want to force the query to stop in such a case, use:

SET skip.bad.records=false;
SELECT * FROM topicA LIMIT 100

Control execution

Querying a table can take a long time if it contains a lot of records. The underlying Kafka topic has to be read, the filter conditions applied, and the projections made.

Additionally, the SELECT statement could end up bringing a large amount of data to the client. To constrain the resources involved, Lenses allows the query context to be customized with SET statements, which drive the execution and give control to the user. The context parameters used throughout this section include skip.bad.records, format.timestamp, max.bytes, max.time, max.zero.polls, and LIMIT.

All of these values can be given a default via the configuration file. Using lenses.sql.settings as a prefix, format.timestamp can be set like this:

lenses.sql.settings.format.timestamp=true
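
The same prefix should cover the other context parameters; as a sketch (the exact keys below are assumptions, following the same pattern as above):

lenses.sql.settings.max.bytes=20000000
lenses.sql.settings.max.time=60000
lenses.sql.settings.skip.bad.records=true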

Query tuning

Lenses SQL uses the Kafka Consumer API to read the data. This means an advanced user with knowledge of Kafka can tweak the consumer properties to achieve better throughput; this should only be needed on rare occasions. The query context can receive Kafka consumer settings. For example, the max.poll.records consumer setting can be set as:

SET max.poll.records= 100000;

SELECT *
FROM payments
LIMIT 1000000
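
Other standard Kafka consumer properties can be passed the same way. As a sketch, fetch.max.bytes (a standard consumer property) could be raised to pull larger batches per request, assuming your brokers and network can sustain it:

SET fetch.max.bytes = 52428800;

SELECT *
FROM payments
LIMIT 1000000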

Example

Streaming SQL operates on unbounded streams of events: a query would normally never terminate. To bring query termination semantics into Apache Kafka, we introduced four controls:

  • LIMIT = 10000 - Force the query to terminate when 10,000 records are matched.

  • max.bytes = 20000000 - Force the query to terminate once 20 MBytes have been retrieved.

  • max.time = 60000 - Force the query to terminate after 60 seconds.

  • max.zero.polls = 8 - Force the query to terminate after 8 consecutive polls are empty, indicating we have exhausted a topic.

Thus, when retrieving data, you can cap the bytes retrieved at 1 GB and the query time at one hour like this:

SET max.bytes = 1000000000;
SET max.time = 3600000;

SELECT * 
FROM topicA 
WHERE customer.id = "XXX";
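
The record and empty-poll limits combine in the same way; as a sketch:

SET max.zero.polls = 8;

SELECT *
FROM topicA
WHERE customer.id = "XXX"
LIMIT 10000;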
