Projections

This page describes using projections for data in Kafka with Lenses SQL Processors.

A projection represents the ability to project a given input value onto a target location in an output record. Projections are the main building block of SELECT statements.

A projection is composed of two parts. The source section selects a specific value; the target section ensures that the value is present in the output, in the desired structure and with the desired name.

In the below query:

INSERT INTO target-topic
SELECT STREAM
    CONCAT('a', 'b') AS result1
    , field4
    , (1 + field1) AS _key.a
    , _key.field2 AS result3
    ,  5 + 7 AS constantField
    ,  CASE
        WHEN field3 = 'Robert' THEN 'It is bobby'
        WHEN field3 = 'William' THEN 'It is willy'
        ELSE 'Unknown'
       END AS who_is_it
FROM input-topic;

These are projections:

  • CONCAT('a', 'b') as result1

  • field4

  • (1 + field1) as _key.a

  • _key.field2 as result3

  • 5 + 7 as constantField

  • CASE … as who_is_it

It is worth highlighting that projections themselves are stateless. While the calculation of the source value could be stateful (depending on the type of expression being evaluated), the act of making the value available in the output is not a stateful operation.

Syntax

The precise syntax of a projection is:

 <expression> [as [[<facet>.]<alias>]|[<facet>]]
|------------||--------------------------------|
    source                  target

In the above, [] indicates optional sections and | is to be read as OR.

  • Source:

    • <expression>: the source section must consist of a valid SQL expression, which will be evaluated to the value to be projected.

  • Target: this section is optional. When missing, the output target facet will be _value and the <alias> will be defaulted to the string representation of the source expression.

    • as: this is the Lenses SQL keyword that makes specifying the target of the projection explicit.

    • [[<facet>.]<alias>]|[<facet>]: this nested section, which must be specified whenever as is used, is composed of two mutually exclusive sub-sections.

      • [<facet>.]<alias>: this section is a string that will be used as the field name in the output record. Optionally, the output facet where the field will be projected (e.g. _key or _value) can also be specified.

      • [<facet>]: this optional section specifies that the result of the projection will make up the whole of the indicated facet of the output record (e.g. the whole key or the whole value).
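As a rough illustration of the target forms listed above, the following sketch uses each of them once. The topic and field names are hypothetical, and the -- comments are annotations only.

```sql
INSERT INTO target-topic
SELECT STREAM
    field1                        -- no target: lands in _value under a default alias
    , field2 AS renamed           -- <alias> only: field renamed in _value
    , field3 AS _value.explicit   -- <facet>.<alias>: field explicit in _value
    , field4 AS _key              -- <facet> only: field4 becomes the whole output key
FROM input-topic;
```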

The above syntax highlights something important about the relationship between projections and facets: a projection can have a source facet and will always have a target facet. While the two are related, they are not always the same.

In the above example query:

  • field4 is a projection from the value facet to the value facet

  • (1 + field1) as _key.a is a projection from the value facet to the key facet

  • _key.field2 as result3 is a projection from the key facet to the value facet

  • 5 + 7 as constantField is a projection to the value facet but with no source facet, as the expression does not depend on the input record

This makes SQL projections very expressive, but it also means that attention needs to be paid when facets are manipulated explicitly. Details, edge cases and implications of this will be discussed below.

Source of a Projection

Projection from a _value field

This is the default type of projection when a field is selected within the source expression. Unless otherwise specified, the source facet of a projection will always be _value.

In light of the above, the following query contains only projections that read from the _value facet of the source:

INSERT INTO target-topic
SELECT STREAM
    CONCAT('--> ', _value.field2, ' <--')
    , field1
    , LENGTH(field4)
    , _value.field1 AS aliased
    , 5 + field3
FROM input-topic;

Also, notice that field1 and _value.field1 as aliased are reading the exact same input field and in the former case the _value facet is implicit.

Projection from a key field

A projection can also access a selected field on the key facet.

INSERT INTO target-topic
SELECT STREAM
    CONCAT('--> ', _key.field2, ' <--')
    , _key.field1
    , LENGTH(_key.field4)
    , 5 + _key.field3
FROM input-topic;

The above query contains only projections that read from the _key facet of the source.

This kind of projection behaves exactly like any other projection. However, because of specific interactions with other mechanics of the Lenses SQL Engine Streaming mode, projections from key fields can’t be used in the same query as Aggregations or Joins.

Projection from whole facets

All examples of projections described until now focused on selecting a field from either the key or the value of an input record. However, a projection can also read a whole facet and project it to the output.

INSERT INTO target-topic
SELECT STREAM
    _key AS old-key
    , _value AS old-value
FROM input-topic;

In the above query, there are two projections:

  • _key as old-key: This is projecting the whole key of input-topic onto a field old-key on target-topic

  • _value as old-value: This is projecting the whole value of input-topic onto a field old-value on target-topic

For more details about the rules around using aliases (as done in the above example), see the Target of a Projection section.

This can be useful when the input source uses a primitive storage format for either one or both facets, but it is desirable to map such facets to named fields within a more complex output structure, as could be the case in the above query. That said, projections from whole facets are supported for all storage formats, not only the primitive ones.
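To make the primitive-format case concrete, the sketch below assumes a hypothetical input-topic whose key is a STRING and whose value is an INT. The sample values in the comments are made up for illustration.

```sql
-- Assumed input record (hypothetical): key = 'sensor-7', value = 42
INSERT INTO target-topic
SELECT STREAM
    _key AS old-key        -- whole primitive key, mapped to a named field
    , _value AS old-value  -- whole primitive value, mapped to a named field
FROM input-topic;
-- Per the description above, the output value would be a structure
-- with two fields: old-key = 'sensor-7' and old-value = 42
```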

Mix and match

As should be clear from the examples on this page so far, projections can be freely mixed within a single SELECT statement. The same query can have many projections, some reading from the key of the input record, some from the value, and others returning literal constants.

Lenses SQL is designed to support this mixed usage and to calculate the appropriate resulting structure given the schemas of all the projections’ inputs.
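A minimal sketch of such a mixed query, using hypothetical field names, might combine one projection per kind of source:

```sql
INSERT INTO target-topic
SELECT STREAM
    _key.field1 AS from_key          -- reads a field of the input key
    , field2 AS from_value           -- reads a field of the input value (facet implicit)
    , LENGTH(field3) AS field3_len   -- expression over a value field
    , 5 + 7 AS constant_field        -- literal constant, no source facet
FROM input-topic;
```

All four projections target the value facet, so the output value would be a structure with four fields.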

Wildcard projections

Lenses SQL assigns a special meaning to * when used as a projection.

Unqualified Wildcard projection

When * is used without any further qualification, it is interpreted as an instruction to project all fields from the key and the value to the output.

INSERT INTO target-topic
SELECT STREAM *
FROM input-topic;

The result of this query is that target-topic will have exactly the same fields, schema and data as input-topic.

Qualified wildcard projection

When * is explicitly qualified, the meaning becomes more precise and it will limit the fields to be selected to only the ones belonging to the qualified source (and optionally facet).

INSERT INTO target-topic
SELECT STREAM
    i1.*
    , i2.field1
FROM input-topic1 AS i1 JOIN input-topic2 AS i2
WITHIN 1h;

The above shows how a qualified wildcard projection can be used to target all the fields of a specific source. Additionally, a qualified wildcard can be used in addition to other normal projections (e.g. i2.field1).

Target of a Projection

The target of a projection is the location within the output record where the result of the projection’s expression is going to be mapped. As previously mentioned, Lenses SQL uses the keyword as to explicitly control this.

Using as, it is possible to:

  • Assign an alias to the projected field in the result. For example, field1 as aliased-field1 is reading field1 and projecting its value to a field called aliased-field1. Notice that, as no facet information is specified, this will be targeting the value of the output record.

  • Project directly to nested fields within structures

INSERT INTO target-topic
SELECT STREAM
    field1 AS x.a
    , field2 AS x.b
FROM input-topic;

The above query will result in a field x that is a structure that contains two fields a and b.

  • Control the facet to which the field is projected. For example, field1 as _key.field1 is reading field1 (from the value facet) and projecting to a field with the same name on the key of the output. Depending on the source being projected, doing this might have important implications.

  • Project over the whole target value or key. For example, field1 as _value is reading field1 and projecting it over the whole value of the output. One important thing about this is that Lenses SQL allows only one projection of this kind per facet per query. This means that the following query would be invalid, because only one projection can target _value within the same query:

INSERT INTO target-topic
SELECT STREAM
    field1 AS _value
    , field2 AS _value
FROM input-topic;
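One way to make the query above valid, sketched here under the assumption that both values are still needed in the output, is to target two named fields of the value instead of the whole facet. The aliases a and b are hypothetical.

```sql
INSERT INTO target-topic
SELECT STREAM
    field1 AS _value.a   -- named field within the output value
    , field2 AS _value.b -- a second named field; no clash on _value
FROM input-topic;
```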

Rules for nested aliases

In order to avoid potential errors, Lenses defines the following rules for defining aliases:

  • Aliases can’t add new fields to non-struct properties.

    • e.g: an_int AS foo.bar, field1 AS foo.bar.field1 (since bar will be an INT we can’t define a property field1 under it)

  • Aliases cannot override previously defined properties.

    • e.g: field1 AS foo.bar.field1, field2 as foo.bar (setting foo.bar with the contents of field2 would override the value of field1)

  • Fields cannot have duplicated names.

    • e.g: field1 AS foo.bar, field2 as foo.bar (setting foo.bar with the contents of field2 would override its previous content)
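For contrast, the following sketch shows a set of nested aliases that respects all three rules. Field names are hypothetical, and an_int is assumed to be an INT field as in the first rule's example.

```sql
INSERT INTO target-topic
SELECT STREAM
    field1 AS foo.bar.field1     -- foo.bar is created as a structure
    , field2 AS foo.bar.field2   -- adds a sibling under the same structure
    , an_int AS foo.count        -- primitive leaf; nothing is nested under it
FROM input-topic;
```

Each alias path ends at a distinct leaf, and no path passes through a leaf defined by another projection.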

Implications of projecting on key

Projecting on the key is a feature that can be useful when it is desirable to quickly change the key of a Table or a Stream, for example in preparation for further operations such as joins. This feature is sometimes referred to as re-keying within the industry.

However, one important implication of using this feature is that Kafka uses the key to determine in what partition a record must be stored; by changing the key of the record, the resulting partitioning of the output topic might differ from that of the input topic. While there is nothing wrong with this, it is something that must be understood clearly when using this feature.
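As a sketch of re-keying, assume a hypothetical orders topic whose value contains customer_id, order_id and amount. The query below promotes customer_id to the key of the output; all names are illustrative.

```sql
INSERT INTO orders-by-customer
SELECT STREAM
    customer_id AS _key   -- whole output key is now the customer id
    , order_id
    , amount
FROM orders;
```

With Kafka's default key-based partitioning, records that share a key are written to the same partition (as long as the partition count does not change), so orders-by-customer would be partitioned by customer id, however orders itself was partitioned.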

Filtering

Sometimes it is desirable to limit the input records to be projected based on some predicate.

For example, we might want to project field1 and field2 of input-topic onto output-topic, but only if field3 contains a specific value.

INSERT INTO target-topic
SELECT STREAM
    field1
    , field2
FROM input-topic
WHERE field3 = 'select_me';

This is what the WHERE clause is used for: to filter the input dataset by some predicate, applying the rest of the query only to records that match the predicate.

The syntax for this clause is simply WHERE <expression>, where <expression> is a valid arbitrarily nested Lenses SQL boolean expression.

INSERT INTO target-topic
SELECT STREAM
    field1
    , field2
FROM input-topic
WHERE (field3 = 'select_me' AND LENGTH(CONCAT(field1, field2)) >= 5)
    OR field4 = (field5 + 5);

Projections and Storage Formats

Projections have a close relationship with the storage format of their target.

By default, if a query contains more than one projection for the same facet, then that facet’s storage format will be a structure (which type of structure exactly depends on other factors that are not relevant here).

INSERT INTO target-topic
SELECT STREAM
    field1 AS result1
    , field2
FROM input-topic;

The above query will make target-topic’s value storage format a structure (e.g. AVRO or JSON) with two fields named: result1 and field2. The storage format for target-topic’s key will be the same as the input-topic’s, as there are no projections targeting that facet.

The storage format of the output can be explicitly changed by a projection, however. This will often be the case when a projection on a whole facet is used. Consider the following query:

INSERT INTO target-topic
SELECT STREAM
    field1 AS result1
    , field2 AS _key
FROM input-topic;

In this case, target-topic’s value storage format will still be a structure, but its key will depend on field2’s schema. For example, if field2 is a string, then target-topic’s key will be changed to STRING (assuming it was not STRING already). The same behavior applies to the value facet.

One example where this can be relevant is when a projection is used to map the result of a single field in the target topic. Consider the following query:

INSERT INTO target-topic
SELECT STREAM field1 AS _value
FROM input-topic;

This query will project field1 on the whole value facet, and this will result in a change of storage format as well. This behaviour is quite common when a single projection is used, because more often than not the desired output in such a scenario will be the content of the field rather than a structure with a single field.

2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.