Automate SQL processor creation


In this tutorial, we will see how to automate the process for creating SQL processors.

Setting up our example 

The requirement is to automate the deployment of an SQL processor, including the creation of the topic and its associated data schema. We have an input topic, let’s call it ‘input’. The message key is text (i.e., STRING), and the message value is Avro with this schema:

{
  "type": "record",
  "name": "ConnectDefault",
  "namespace": "io.lenses.connect.avro",
  "fields": [
    {
      "name": "EVENTID",
      "type": "string"
    },
    {
      "name": "EVENTDATETIME",
      "type": "string"
    },
    {
      "name": "CLIENTID",
      "type": "string"
    },
    {
      "name": "CUSTOMERCATEGORY",
      "type": "string"
    },
    {
      "name": "CUSTOMERID",
      "type": "string"
    },
    {
      "name": "ENTITYID",
      "type": "string"
    },
    {
      "name": "EVENTCORRELATIONID",
      "type": ["null", "string"],
      "default": null
    },
    {
      "name": "EVENTSOURCE",
      "type": "string"
    },
    {
      "name": "SUPERPRODUCTID",
      "type": "string"
    },
    {
      "name": "TENANTID",
      "type": "string"
    },
    {
      "name": "TREATMENTCODE",
      "type": ["null", "string"],
      "default": null
    },
    {
      "name": "SAPREASONCODE",
      "type": "string"
    },
    {
      "name": "TRXAMOUNT",
      "type": "string"
    },
    {
      "name": "TRXCODE",
      "type": "string"
    },
    {
      "name": "TRXDESCRIPTION",
      "type": "string"
    },
    {
      "name": "SourceType",
      "type": ["null", "string"],
      "default": null
    }
  ]
}

Then we have the SQL processor code to rename and re-align the fields:

SET defaults.topic.autocreate=true;

INSERT INTO output1
SELECT STREAM
    EVENTID as EventID
    , EVENTDATETIME as EventDateTime
    , EVENTSOURCE as EventSource
    , TENANTID as TenantId
    , ENTITYID as EntityId
    , CUSTOMERID as CustomerID
    , CLIENTID as ClientID
    , SUPERPRODUCTID as SuperProductID
    , CUSTOMERCATEGORY as CustomerCategory
    , SAPREASONCODE as SAPReasonCode
    , TRXAMOUNT as TrxAmount
    , TRXDESCRIPTION as TrxDescription
    , TREATMENTCODE as TreatmentCode
    , SourceType
FROM input

Step 1 - Create the security group 

Navigate to the Admin section using the top bar navigation. From the left navigation system, navigate to Groups and add this group.

Content of Lenses group screen

Step 2 - Create the service account 

From there, use the left navigation to go to the Service accounts page and add a new entry:

Content of service account screen

Grab the token provided for the service account; the following steps will use it.

Step 3 - Create the topic and the schema 

Lenses SQL processors require a schema for the topics they read, even when the data is not Avro. When a topic uses AVRO for the message key or value, creating the topic first registers the schema with Schema Registry.

First, let’s create the request payload. Save this JSON to a file named body.json:

{
  "name": "input",
  "replication": 1,
  "partitions": 3,
  "configs": {},
  "format": {
    "key": {
      "format": "STRING"
    },
    "value": {
      "format": "AVRO",
      "schema": "{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.lenses.connect.avro\",\"fields\":[{\"name\":\"EVENTID\",\"type\":\"string\"},{\"name\":\"EVENTDATETIME\",\"type\":\"string\"},{\"name\":\"CLIENTID\",\"type\":\"string\"},{\"name\":\"CUSTOMERCATEGORY\",\"type\":\"string\"},{\"name\":\"CUSTOMERID\",\"type\":\"string\"},{\"name\":\"ENTITYID\",\"type\":\"string\"},{\"name\":\"EVENTCORRELATIONID\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"EVENTSOURCE\",\"type\":\"string\"},{\"name\":\"SUPERPRODUCTID\",\"type\":\"string\"},{\"name\":\"TENANTID\",\"type\":\"string\"},{\"name\":\"TREATMENTCODE\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"SAPREASONCODE\",\"type\":\"string\"},{\"name\":\"TRXAMOUNT\",\"type\":\"string\"},{\"name\":\"TRXCODE\",\"type\":\"string\"},{\"name\":\"TRXDESCRIPTION\",\"type\":\"string\"},{\"name\":\"SourceType\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
    }
  }
}

Paste the content after running this command:

cat > body.json
[Paste the JSON above]
[Hit Enter then CTRL+D]
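
Escaping the Avro schema into a JSON string by hand is easy to get wrong. As an alternative to pasting, the request body can be generated. The sketch below uses only the Python standard library; the helper name build_topic_payload is hypothetical, and the schema is abbreviated to two fields for readability, so substitute the full schema shown earlier.

```python
import json


def build_topic_payload(name, avro_schema, partitions=3, replication=1):
    """Return the topic-creation request body as a dict.

    The Avro schema is embedded as a JSON *string*, mirroring body.json.
    """
    return {
        "name": name,
        "replication": replication,
        "partitions": partitions,
        "configs": {},
        "format": {
            "key": {"format": "STRING"},
            "value": {
                "format": "AVRO",
                # json.dumps produces the compact schema text; the outer
                # json.dumps call below escapes its quotes automatically.
                "schema": json.dumps(avro_schema, separators=(",", ":")),
            },
        },
    }


# Abbreviated schema for illustration only; use the full schema in practice.
schema = {
    "type": "record",
    "name": "ConnectDefault",
    "namespace": "io.lenses.connect.avro",
    "fields": [
        {"name": "EVENTID", "type": "string"},
        {"name": "SourceType", "type": ["null", "string"], "default": None},
    ],
}

payload = build_topic_payload("input", schema)
body = json.dumps(payload, indent=2)  # text to save as body.json
print(body)
```

Redirecting the printed output to body.json gives a file with the same shape as the one above, with the schema escaping handled by the JSON encoder.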

Lenses authenticates and authorizes each request using a token passed in the X-Kafka-Lenses-Token header. The service account created in step 2 provides this token:

curl --header "Content-Type: application/json" \
  --header "X-Kafka-Lenses-Token: demo-sa:48cd653d-1889-42b6-9f38-6e5116db691a" \
  -i -X POST \
  --data @body.json \
  http://localhost:24015/api/v1/kafka/topic
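
In a CI/CD job, the same request can be issued from a script instead of curl. The following is a minimal sketch using Python's urllib.request. The helper names build_lenses_request and send are hypothetical, the token is a placeholder, and the payload is trimmed for brevity (in practice, pass the full content of body.json). The request is only built here, not sent.

```python
import json
import urllib.request


def build_lenses_request(base_url, path, token, payload):
    """Build (but do not send) a JSON POST request carrying the Lenses token."""
    return urllib.request.Request(
        url=base_url.rstrip("/") + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "X-Kafka-Lenses-Token": token,
        },
        method="POST",
    )


def send(request, timeout=30):
    """Send the request; urlopen raises urllib.error.HTTPError on 4xx/5xx."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status, response.read().decode("utf-8")


# Placeholder values; replace with your Lenses URL and service account token.
request = build_lenses_request(
    "http://localhost:24015",
    "/api/v1/kafka/topic",
    "demo-sa:<token>",
    {"name": "input", "replication": 1, "partitions": 3},
)
print(request.get_method(), request.full_url)
```

Calling send(request) against a running Lenses instance performs the actual POST. Keeping the build and send steps separate makes it possible to inspect or unit-test the request without network access.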

Step 4 - Create the SQL processor 

This tutorial assumes the SQL processor deployment mode is IN_PROC, which is the development mode. For production, Lenses supports the KUBERNETES mode. For more details about the other modes, see SQL details.

Before we issue the curl command, let’s create the request body JSON. Save it to a file named processor.json:

{
  "name": "myProcessor",
  "runnerCount": 1,
  "sql": "SET defaults.topic.autocreate=true;\n\nINSERT INTO output1\nSELECT STREAM \n    EVENTID as EventID\n    , EVENTDATETIME as EventDateTime\n    , EVENTSOURCE as EventSource\n    , TENANTID as TenantId\n    , ENTITYID as EntityId\n    , CUSTOMERID as CustomerID\n    , CLIENTID as ClientID\n    , SUPERPRODUCTID as SuperProductID\n    , CUSTOMERCATEGORY as CustomerCategory\n    , SAPREASONCODE as SAPReasonCode\n    , TRXAMOUNT as TrxAmount\n    , TRXDESCRIPTION as TrxDescription\n    , TREATMENTCODE as TreatmentCode\n    , SourceType\nFROM input",
  "settings": {}
}

Paste the content after running this command:

cat > processor.json
[Paste the JSON above]
[Hit Enter then CTRL+D]

Then submit the request:

curl --header "Content-Type: application/json" \
  --header "X-Kafka-Lenses-Token: demo-sa:48cd653d-1889-42b6-9f38-6e5116db691a" \
  -i -X POST \
  --data @processor.json \
  http://localhost:24015/api/v1/streams
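
The sql field in processor.json is the processor query with its line breaks escaped as \n. Rather than escaping it manually, the request body can be generated from the readable SQL text. This is a sketch under the assumption that the query is available as a plain string (for example, read from a version-controlled file); the helper name build_processor_payload is hypothetical.

```python
import json

# The processor query exactly as written earlier in this tutorial.
SQL = """SET defaults.topic.autocreate=true;

INSERT INTO output1
SELECT STREAM
    EVENTID as EventID
    , EVENTDATETIME as EventDateTime
    , EVENTSOURCE as EventSource
    , TENANTID as TenantId
    , ENTITYID as EntityId
    , CUSTOMERID as CustomerID
    , CLIENTID as ClientID
    , SUPERPRODUCTID as SuperProductID
    , CUSTOMERCATEGORY as CustomerCategory
    , SAPREASONCODE as SAPReasonCode
    , TRXAMOUNT as TrxAmount
    , TRXDESCRIPTION as TrxDescription
    , TREATMENTCODE as TreatmentCode
    , SourceType
FROM input"""


def build_processor_payload(name, sql, runner_count=1):
    """Return the processor-creation request body as a dict."""
    return {
        "name": name,
        "runnerCount": runner_count,
        "sql": sql,
        "settings": {},
    }


# json.dumps escapes the newlines in the query as \n.
processor_json = json.dumps(build_processor_payload("myProcessor", SQL), indent=2)
print(processor_json)
```

The printed text can be saved as processor.json and posted with the curl command above.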

Result and Conclusion 

Once the last curl command is executed, Lenses will have the processor ready for processing your data:

Content of Lenses SQL processor

In this tutorial, you learned how to automate the creation of SQL processors, which lets you integrate Lenses with your CI/CD pipeline. Specifically, you learned how to:

  • Create a topic and its associated schema via the Lenses HTTP endpoints
  • Create the SQL processor via the Lenses HTTP endpoints

Good luck and happy streaming!

--
Last modified: April 17, 2024