Filtering data

This tutorial shows how to filter the records of a Kafka topic with Lenses SQL Processors.

Filtering messages and copying them to another topic can be achieved using the WHERE clause.
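
In its simplest form, a filtering processor only needs an INSERT INTO target, a SELECT STREAM projection and a WHERE clause. The sketch below is illustrative only; source_topic, filtered_topic and some_field are placeholder names, not topics used later in this tutorial:

SET defaults.topic.autocreate=true;

INSERT INTO filtered_topic
SELECT STREAM *
FROM source_topic
WHERE some_field = 'some_value'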

Setting up our example

In our example, we have a topic where our application registers bank transactions.

We have a topic called payments where records have this shape:


KEY: "6A461C60-02F3-4C01-94FB-092ECBDE0837"
VALUE: {
  "amount": "12.10",
  "currency": "EUR",
  "from_account": "xxx",
  "to_account": "yyy", 
  "time": 1591970345000
}

We can replicate such a structure by running the following query in SQL Studio:

CREATE TABLE payments(
    amount double
    , currency string
    , from_account string
    , to_account string
    , time datetime
)
FORMAT (string, avro);

Each event has a unique string key generated by the upstream system.

We can again use SQL Studio to insert some data to play with:

INSERT INTO payments(
    _key
    , amount
    , currency 
    , from_account
    , to_account 
    , time
)
VALUES
("6A461C60-02F3-4C01-94FB-092ECBDE0837", 10, "EUR", "account-1", "account-2", 1590970345000),
("E5DA60E8-F622-43B2-8A93-B958E01E8AB3", 100000, "EUR", "account-1", "account-3", 1591070346000),
("0516A309-FB2B-4F6D-A11F-3C06A5D64B68", 5300, "USD", "account-2", "account-3", 1591170347000),
("0871491A-C915-4163-9C4B-35DEA0373B41", 6500, "EUR", "account-3", "account-1", 1591270348000),
("2B557134-9314-4F96-A640-1BF90887D846", 300, "EUR", "account-1", "account-4", 1591370348000),
("F4EDAE35-45B4-4841-BAB7-6644E2BBC844", 3400, "EUR", "account-2", "account-1", 1591470349000),
("F51A912A-96E9-42B1-9AC4-42E923A0A6F8", 7500, "USD", "account-2", "account-3", 1591570352000),
("EC8A08F1-75F0-49C8-AA08-A5E57997D27A", 2500000, "USD", "account-1", "account-3", 1591670356000),
("9DDBACFF-D42B-4042-95AC-DCDD84F0AC32", 1000, "GBP", "account-2", "account-3", 1591870401000)
;

Find big transactions

Let’s assume we need to detect significant transactions that will then be fed into our anti-fraud system.

We want to copy those transactions into a new topic, maintaining the content of the records as it is.

For our first example, we will use a simple predicate to filter transactions with an amount larger than 5000, regardless of the currency.

Lenses SQL supports all the common comparison operators, so for our goal it is enough to use a WHERE clause with a >= condition:

SET defaults.topic.autocreate=true;

INSERT INTO big_payments
SELECT STREAM *
FROM payments
WHERE amount >= 5000

Checking the records emitted by the processor, we see that we got the transactions we were looking for.

Because of the * projection, the records’ content has not changed.

KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, ... }
------------------------------------
KEY:"0516A309-FB2B-4F6D-A11F-3C06A5D64B68"
VALUE: { amount:5300, ... }
------------------------------------
KEY:"0871491A-C915-4163-9C4B-35DEA0373B41"
VALUE: { amount:6500, ... }
------------------------------------
KEY:"F51A912A-96E9-42B1-9AC4-42E923A0A6F8"
VALUE: { amount:7500, ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, ... }
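
For comparison, if the downstream system only needed a few fields, the * projection could be replaced with an explicit field list. The variant below is a sketch rather than part of the tutorial; big_payments_slim is a placeholder topic name:

SET defaults.topic.autocreate=true;

INSERT INTO big_payments_slim
SELECT STREAM amount, currency
FROM payments
WHERE amount >= 5000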

Not all currencies are the same, so we would like to add a specific threshold for each currency. As a first cut, we combine multiple conditions with ANDs and ORs:

SET defaults.topic.autocreate=true;

INSERT INTO big_eur_usd_payments
SELECT STREAM *
FROM payments
WHERE
    (amount >= 5000 AND currency = 'EUR') OR
    (amount >= 5500 AND currency = 'USD')

As an improvement, we want to capture the threshold for each currency in a single expression. We will use a CASE statement for that:

SET defaults.topic.autocreate=true;

INSERT INTO big_payments_case
SELECT STREAM *
FROM payments
WHERE
    amount >= (CASE 
        WHEN currency = 'EUR' THEN 5000
        WHEN currency = 'USD' THEN 5500
        WHEN currency = 'GBP' THEN 4500
        ELSE 5000
    END)

which produces the following results:

KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, currency:"EUR", ... }
------------------------------------
KEY:"0871491A-C915-4163-9C4B-35DEA0373B41"
VALUE: { amount:6500, currency:"EUR", ... }
------------------------------------
KEY:"F51A912A-96E9-42B1-9AC4-42E923A0A6F8"
VALUE: { amount:7500, currency:"USD", ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, currency:"USD", ... }

Filtering by date

In this section, we will find all the transactions that happened during the (UTC) night. To do that, we can use one of the many date and time functions available in Lenses SQL.

You will also see how to use a CAST expression to convert from one type to another.

SET defaults.topic.autocreate=true;

INSERT INTO night_payments
SELECT STREAM *
FROM payments
WHERE
    CAST(HOUR(time) AS INT) >= 0 AND
    CAST(HOUR(time) AS INT) <= 5 

Checking the output, we can see that three transactions satisfied our predicate:

KEY:"6A461C60-02F3-4C01-94FB-092ECBDE0837"
VALUE: { amount:10, currency:"EUR", time:1590970345000, ... }
------------------------------------
KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, currency:"EUR", time:1591070346000, ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, currency:"USD", time:1591670356000, ... }

Sampling with random functions

Let’s imagine that we have to build some intelligence around all the payments we process, but we have neither the capacity nor the need to process every record.

We decide to build a reduced copy of the payments topic, containing only about 1% of the original records.

To do that, we are going to use the RANDINT function:

SET defaults.topic.autocreate=true;

INSERT INTO payments_sample
SELECT STREAM *
FROM payments
WHERE CAST(ABS(RANDINT()) AS DOUBLE) / 2147483647 < 0.01

RANDINT generates a random integer; we take its absolute value to make sure it is positive, and we then normalise the result by dividing it by the maximum integer value (2147483647), obtaining an (almost) uniform distribution of numbers between 0 and 1. Keeping only the records whose normalised value is below 0.01 leaves roughly 1% of them.

We have to CAST to double on the way; otherwise, the division would be between integers, and the result would always be 0.
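
To get a feel for the values being compared against the threshold, you can evaluate the same expression in SQL Studio before deploying the processor. This is just a quick sanity check; the alias sample_score is an arbitrary name chosen for this example:

SELECT CAST(ABS(RANDINT()) AS DOUBLE) / 2147483647 AS sample_score
FROM payments
LIMIT 10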
