4.3

You are viewing documentation for an older version of Lenses.io View latest documentation here

Examples

The Docker container has been setup to create and produce data to a handful of Kafka topics. The producers (data generators) are enabled by default. In order to disable the examples from being executed set the environment variable -e SAMPLEDATA=0 in the docker run command.

Build SQL Streaming Flows for Kafka 

Connectors 

The docker ships with a collection of source and sink Kafka Connectors. There is one Kafka Connect Worker with all the connectors pre-set in the classpath so they are ready to be used.

From the UI go to Connectors > New Connector

Follow the instructions of each connector to launch.

SQL Processors 

From the UI go to SQL Processors > New Processor

Here is few examples for SQL Processors for the generated data.

SET defaults.topic.autocreate = true;
INSERT INTO position_reports_Accurate
SELECT STREAM * FROM sea_vessel_position_reports
WHERE Accuracy = true
SET defaults.topic.autocreate = true;
INSERT INTO position_reports_latitude_filter
SELECT STREAM Speed, Heading, Latitude, Longitude, Radio
FROM sea_vessel_position_reports
WHERE Latitude > 58
SET defaults.topic.autocreate = true;
INSERT INTO position_reports_MMSI_large
SELECT STREAM *
FROM sea_vessel_position_reports
WHERE MMSI > 100000
SET defaults.topic.autocreate = true;
INSERT INTO backblaze_smart_result
SELECT STREAM (smart_1_normalized + smart_3_normalized) AS sum1_3_normalized, serial_number
FROM backblaze_smart
WHERE _key.serial_number LIKE 'Z%'
SET defaults.topic.autocreate=true;
SET auto.offset.reset='earliest';
SET commit.interval.ms='10000';

INSERT INTO cc_data_json
  STORE VALUE AS JSON
SELECT STREAM * FROM cc_data;

INSERT INTO cc_payments_json
  STORE VALUE AS JSON
SELECT STREAM * FROM cc_payments;

WITH tableCards AS (
  SELECT TABLE
    number,
    customerFirstName,
    customerLastName,
    blocked
  FROM cc_data_json
);

WITH joined AS (
  SELECT STREAM
    p.merchantId,
	p.creditCardId,
	p.currency
  FROM cc_payments_json AS p JOIN tableCards AS c
  WHERE c.blocked = false
);

INSERT INTO frauds_detection_5
  STORE VALUE AS JSON
SELECT STREAM
  count(*) as attempts
FROM joined
WINDOW BY TUMBLE 5s
GROUP BY joined.merchantId;