View the latest documentation 5.5
A projection represents the ability to project a given input value onto a target location in an output record. Projections are the main building block of SELECT statements.
SELECT
A projection is composed of several parts (see Syntax below).Projections have a source section that allows to select a specific value and to ensure that it will be present in the output, in the desired structure and with the desired name, as described by the target section.
In the below query:
INSERT INTO target-topic SELECT STREAM CONCAT('a', 'b') AS result1 , field4 , (1 + field1) AS _key.a , _key.field2 AS result3 , 5 + 7 AS constantField , CASE WHEN field3 = 'Robert' THEN 'It's bobby' WHEN field3 = 'William' THEN 'It's willy' ELSE 'Unknown' END AS who_is_it FROM input-topic;
These are projections:
It is worth highlighting that projections themselves are stateless. While the calculation of the source value could be stateful (depending on the type of expression being evaluated - see Syntax below), the act of making the value available in the output is not a stateful operation.
The precise syntax of a projection is:
<expression> [as [<facet>.]<alias>]|[<facet>]] |------------||-------------------------------| source target
In the above, [] indicates optional sections and | is to be read as OR.
[]
|
<expression>
_value
<alias>
as
[<facet>.]<alias>]|[<facet>]
[<facet>.]<alias>]
_facet_
_key
[<facet>]
The above syntax highlights something important about the relationship between projections and facets: a projection can have a source facet and will always have a target facet and while they are related, they might not always be the same.
In the above example query:
field4
(1 + field1) as _key.a
_key.field2 as result3
5 + 7 as constantField
This makes SQL projections very expressive, but it also means that attention needs to be paid when facets are manipulated explicitly. Details, edge cases and implications of this will be discussed below.
This is the default type of projection when a field is selected within the source expression. Unless otherwise specified, the source facet of a projection will always be _value.
In light of the above, the following query contains only projections that read from the _value facet of the source:
INSERT INTO target-topic SELECT STREAM CONCAT('--> ', _value.field2, ' <--') , field1 , LENGTH(field4) , _value.field1 AS aliased , 5 + field3 FROM input-topic;
Also, notice that field1 and _value.field1 as aliased are reading the exact same input field and in the former case the _value facet is implicit.
field1
_value.field1 as aliased
A projection can also access a selected field on the key facet.
INSERT INTO target-topic SELECT STREAM CONCAT('--> ', _key.field2, ' <--') , _key.field1 , LENGTH(_key.field4) , 5 + _key.field3 FROM input-topic;
The above query contains only projections that read from the _key facet of the source.
This kind of projections behave exactly the same as any other projection, but because of specific interactions with other mechanics in Lenses SQL Engine Streaming mode, they can’t be used in the same query with Aggregations or Joins.
All examples of projections described until now focused on selecting a field from either the key or the value of an input record. However, a projection can also read a whole facet and project it to the output.
INSERT INTO target-topic SELECT STREAM _key AS old-key , _value AS old-value FROM input-topic;
In the above query, there are two projections:
_key as old-key
input-topic
old-key
target-topic
_value as old-value
old-value
For more details about the rules around using aliases (as done in the above example), see below.
This can be useful when the input source uses a primitive storage format for either one or both facets, but it is desirable to map such facets to named fields within a more complex output structure, as it could be the case in the above query. This said, projections from whole facets are supported for all storage formats, not only the primitive ones.
As it should be clear by all examples in this page so far, projections can be freely mixed within a single SELECT statement; the same query can have many projections, some of which could be reading from the key of the input record, some others from the value and yet others returning literal constants.
Lenses SQL is designed to support this mixed usage and to calculate the appropriate resulting structure given the schemas of all the projections’ inputs.
Lenses SQL assigns a special meaning to * when used as a projection.
*
When * is used without any further qualification, it is interpreted as an instruction to project all fields from _key_ and _value_ to the output.
_key_
_value_
INSERT INTO target-topic SELECT STREAM * FROM input-topic;
The result of this query is that target-topic will have exactly the same fields, schema and data than input-topic.
When * is explicitly qualified, the meaning becomes more precise and it will limit the fields to be selected to only the ones belonging to the qualified source (and optionally facet).
INSERT INTO target-topic SELECT STREAM i1.* , i2.field1 FROM input-topic1 AS i1 JOIN input-topic2 AS i2 WITHIN 1h;
The above shows how a qualified wildcard projection can be used to target all the fields of a specific source. Additionally, a qualified wildcard can be used in addition to other normal projections (e.g. i2.field1).
i2.field1
The target of a projection is the location within the output record where the result of the projection’s expression is going to be mapped. As previously mentioned, Lenses SQL uses the keyword as to explicitly control this.
Using as, it is possible to:
field1 as aliased-field1
aliased-field1
INSERT INTO target-topic SELECT STREAM field1 AS x.a , field2 AS x.b, FROM input-topic;
The above query will result in a field x that is a structure and that contains two fields a and b.
x
a
b
field1 as _key.field1
field1 as _value
INSERT INTO target-topic SELECT STREAM field1 AS _value , field2 AS _value, FROM input-topic;
In order to avoid potential errors, Lenses defines the following rules for defining aliases:
an_int AS foo.bar, field1 AS foo.bar.field1
bar
INT
field1 AS foo.bar.field1, field2 as foo.bar
foo.bar
field2
field1 AS foo.bar, field2 as foo.bar
Projecting on Key is a feature that can be useful in situations where it is desirable to quickly change the key of a Table or a Stream, maybe in preparation for further operations (e.g. joins etc…). This feature is sometimes referred to as re-keying within the industry.
However, one important implication of using this feature is that Kafka uses the key to determine in what partition a record must be stored; by changing the key of the record, the resulting partitioning of the output topic might differ from the one of the input topic. While there is nothing wrong with this, it is something that must be understood clearly when using this feature.
Sometimes it is desirable to limit the input records to be projected based on some predicate.
For example, we might want to project field1 and field2 of input-topic onto output-topic, but only if field3 contains a specific value.
output-topic
field3
INSERT INTO target-topic SELECT STREAM field1 , field2, FROM input-topic WHERE field3 = 'select_me';
This is the WHERE clause is used: to filter the input dataset by some predicate, applying the rest of the query only to records that match the predicate.
WHERE
The syntax for this clause is simply WHERE <expression>, where <expression> is a valid arbitrarily nested Lenses SQL boolean expression.
WHERE <expression>
INSERT INTO target-topic SELECT STREAM field1 , field2, FROM input-topic WHERE (field3 = 'select_me' AND LENGTH(CONCAT(field1, field2)) >= 5) OR field4 = (field5 + 5);
Projections have a close relationship with the storage format of their target.
By default, if a query contains more than one projection for the same facet, then that facet’s storage format will be a structure (which type of structure exactly depends on other factors that are not relevant here).
INSERT INTO target-topic SELECT STREAM field1 as result1 , field2 FROM input-topic;
The above query will make target-topic’s value storage format a structure (e.g. AVRO or JSON) with two fields named: result1 and field2. The storage format for target-topic’s key will be the same as the input-topic’s, as there are no projections target that facet.
AVRO
JSON
result1
The storage format of the output can be explicitly changed by a projection, however. This will often be the case when a projection on a whole facet is used. Consider the following query:
INSERT INTO target-topic SELECT STREAM field1 AS result1 , field2 AS _key, FROM input-topic;
In this case, target-topic’s value storage format will still be a structure, but its key will depend on field2’s schema. For example, if field2 is a string, then target-topic’s key will be changed to STRING (assuming it was not STRING already). The same behavior applies to the _value_ facet.
STRING
One example where this can be relevant is when a projection is used to map the result of a single field in the target topic. Consider the following query:
INSERT INTO target-topic SELECT STREAM field1 AS _value FROM input-topic;
This query will project field1 on the whole value facet, and this will result in a change of storage format as well. This behavior is quite common when a single projection is used, because more often than not the desired output in such scenario will be the content of the field rather than a structure with a single field.
On this page