A projection represents the ability to project a given input value onto a
target location in an output record. Projections are the main building block of SELECT statements.
A projection is composed of several parts (see Syntax below). The source section selects a specific value, and the target section ensures that the value is present in the output, in the desired structure and with the desired name.
In the query below:
INSERT INTO target-topic
SELECT STREAM
    CONCAT('a', 'b') AS result1
  , field4
  , (1 + field1) AS _key.a
  , _key.field2 AS result3
  , 5 + 7 AS constantField
  , CASE
      WHEN field3 = 'Robert' THEN 'It is bobby'
      WHEN field3 = 'William' THEN 'It is willy'
      ELSE 'Unknown'
    END AS who_is_it
FROM input-topic;
These are projections:
- CONCAT('a', 'b') as result1
- field4
- (1 + field1) as _key.a
- _key.field2 as result3
- 5 + 7 as constantField
- CASE … as who_is_it
It is worth highlighting that projections themselves are stateless. While the calculation of the source value could be stateful (depending on the type of expression being evaluated, see Syntax below), the act of making the value available in the output is not a stateful operation.
The precise syntax of a projection is as follows:
<expression> [as [[<facet>.]<alias>]|[<facet>]]
|----------| |--------------------------------|
   source                  target
In the above, [ ] indicates optional sections and | is to be read as OR.
<expression>: the source section must consist of a valid SQL expression, which will be evaluated to the value to be projected.
[as [[<facet>.]<alias>]|[<facet>]]: this section is optional. When it is missing, the output target facet (see the Concepts section if this word is unfamiliar) will be _value, and <alias> will default to the string representation of the source expression.
as: this is the Lenses SQL keyword that makes specifying the target of the projection explicit.
[[<facet>.]<alias>]|[<facet>]: this nested section, which must be specified whenever as is used, is composed of two mutually exclusive sub-sections.
[<facet>.]<alias>: this section is a string that will be used as the field name in the output record. Optionally, the output facet where the field will be projected (e.g. _value) can also be specified.
[<facet>]: this section specifies that the result of the projection will make up the whole of the indicated facet of the output record (e.g. the whole key or the whole value). See Projections and Storage Formats below to understand the implications of this.
The above syntax highlights something important about the relationship between projections and facets: a projection can have a source facet and will always have a target facet. While the two are related, they are not always the same.
In the above example query:
- field4 is a projection from value facet to value facet
- (1 + field1) as _key.a is a projection from value facet to key facet
- _key.field2 as result3 is a projection from key facet to value facet
- 5 + 7 as constantField is a projection to value facet but with no source facet, as the expression does not depend on the input record
This makes SQL projections very expressive, but it also means that attention needs to be paid when facets are manipulated explicitly. Details, edge cases and implications of this will be discussed below.
This is the default type of projection when a field is selected within the source expression.
Unless otherwise specified, the source facet of a projection will always be _value.
In light of the above, the following query contains only projections that read from the _value facet of the source:
INSERT INTO target-topic
SELECT STREAM
    CONCAT('--> ', _value.field2, ' <--')
  , field1
  , LENGTH(field4)
  , _value.field1 AS aliased
  , 5 + field3
FROM input-topic;
Also, notice that field1 and _value.field1 as aliased are reading the exact same input field; in the former case the _value facet is implicit.
A projection can also access a selected field on the key facet.
INSERT INTO target-topic
SELECT STREAM
    CONCAT('--> ', _key.field2, ' <--')
  , _key.field1
  , LENGTH(_key.field4)
  , 5 + _key.field3
FROM input-topic;
The above query contains only projections that read from the
_key facet of the source.
Projections of this kind behave exactly the same as any other projection, but because of specific interactions with other mechanics in the LSQL Engine Streaming mode, they can’t be used in the same query as Aggregations or Joins.
All examples of projections described until now focused on selecting a field from either the key or the value of an input record. However, a projection can also read a whole facet and project it to the output.
INSERT INTO target-topic SELECT STREAM _key AS old-key , _value AS old-value FROM input-topic;
In the above query, there are two projections:
- _key as old-key: this is projecting the whole key of input-topic onto a field named old-key
- _value as old-value: this is projecting the whole value of input-topic onto a field named old-value
For more details about the rules around using aliases (as done in the above example), see below.
This can be useful when the input source uses a primitive storage format for one or both facets, but it is desirable to map such facets to named fields within a more complex output structure, as could be the case in the above query. That said, projections from whole facets are supported for all storage formats, not only the primitive ones.
As should be clear from all the examples on this page so far, projections can be freely mixed within a single SELECT statement: the same query can have many projections, some reading from the key of the input record, others from the value, and yet others returning literal constants.
Lenses SQL is designed to support this mixed usage and to calculate the appropriate resulting structure given the schemas of all the projections’ inputs.
Lenses SQL assigns a special meaning to * when used as a projection.
When * is used without any further qualification, it is interpreted as an instruction to project all fields from key and value to the output.
INSERT INTO target-topic SELECT STREAM * FROM input-topic;
The result of this query is that target-topic will have exactly the same fields, schema and data as input-topic.
When * is explicitly qualified, the meaning becomes more precise: it limits the fields to be selected to only the ones belonging to the qualified source (and optionally facet).
INSERT INTO target-topic
SELECT STREAM
    i1.*
  , i2.field1
FROM input-topic1 AS i1 JOIN input-topic2 AS i2 WITHIN 1h;
The above shows how a qualified wildcard projection can be used to target all the fields of a
specific source. Additionally, a qualified wildcard can be used alongside other normal projections (e.g. i2.field1).
The target of a projection is the location within the output record where the result of the projection’s expression is going to be mapped.
As previously mentioned, Lenses SQL uses the keyword as to explicitly control this.
Using as, it is possible to:
- Assign an alias to the projected field in the result. For example, field1 as aliased-field1 is reading field1 and projecting its value to a field called aliased-field1. Notice that, as no facet information is specified, this will be targeting the value of the output record.
- Project directly to nested fields within structures:
INSERT INTO target-topic SELECT STREAM field1 AS x.a , field2 AS x.b FROM input-topic;
The above query will result in a field x that is a structure and that contains two fields, a and b.
- Control the facet to which a field is projected. For example, field1 as _key.field1 is reading field1 (from the value facet) and projecting to a field with the same name on the key of the output. Depending on the source being projected, doing this might have important implications.
- Project over the whole target value or key. For example, field1 as _value is reading field1 and projecting it over the whole value of the output. One important thing about this is that Lenses SQL allows only one projection of this kind per facet per query. This means that the following query would be invalid, because only one projection can target _value within the same query:
INSERT INTO target-topic SELECT STREAM field1 AS _value , field2 AS _value FROM input-topic;
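The way dotted aliases such as field1 AS x.a and field2 AS x.b combine into a single nested structure can be modelled in a few lines of Python. This is only an illustrative sketch: the function name project_to_paths and the sample record are hypothetical, and the code says nothing about how Lenses implements projections internally.

```python
def project_to_paths(record, projections):
    """Build an output value from (source_field, target_path) pairs.

    Hypothetical helper: each target_path is a dotted alias such as "x.a".
    """
    output = {}
    for source_field, target_path in projections:
        *parents, leaf = target_path.split(".")
        node = output
        # Create (or reuse) one nested structure per parent segment.
        for name in parents:
            node = node.setdefault(name, {})
        node[leaf] = record[source_field]
    return output


# Sample input value (assumed for illustration only).
record = {"field1": 1, "field2": "two"}

# Mirrors: SELECT STREAM field1 AS x.a, field2 AS x.b
result = project_to_paths(record, [("field1", "x.a"), ("field2", "x.b")])
print(result)
```

Both aliases share the parent x, so the output holds one structure x with the fields a and b, as described for the nested-field query.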
In order to avoid potential errors, Lenses defines the following rules for defining aliases:
- Aliases can’t add new fields to non-struct properties. Example: an_int AS foo.bar, field1 AS foo.bar.field1 (since bar will be an INT, we can’t define a property field1 under it).
- Aliases cannot override previously defined properties. Example: field1 AS foo.bar.field1, field2 as foo.bar (setting foo.bar with the contents of field2 would override the value of foo.bar.field1).
- Fields cannot have duplicated names. Example: field1 AS foo.bar, field2 as foo.bar (setting foo.bar with the contents of field2 would override its previous content).
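The three alias rules can be read as checks on the dotted alias paths of a query, taken in order. The sketch below is a simplified model, not Lenses' actual validation: the function check_aliases and the is_struct flag are assumptions made for illustration. The text does not say whether extending a struct-typed alias is accepted, so the sketch simply does not flag that case.

```python
def check_aliases(aliases):
    """Return a list of rule violations for (dotted_path, is_struct) pairs.

    Hypothetical model: is_struct says whether the projected source value
    is a structure. Aliases are checked in query order.
    """
    errors = []
    seen = {}  # tuple of path segments -> is_struct
    for path, is_struct in aliases:
        parts = tuple(path.split("."))
        if parts in seen:
            errors.append(f"duplicate field name: {path}")
            continue
        for other, other_is_struct in seen.items():
            other_path = ".".join(other)
            if parts[:len(other)] == other and not other_is_struct:
                # e.g. foo.bar is an INT, then foo.bar.field1 is requested
                errors.append(f"{path} adds a field to non-struct {other_path}")
            elif other[:len(parts)] == parts:
                # e.g. foo.bar.field1 exists, then foo.bar is requested
                errors.append(f"{path} would override {other_path}")
        seen[parts] = is_struct
    return errors


print(check_aliases([("foo.bar", False), ("foo.bar.field1", False)]))
print(check_aliases([("foo.bar.field1", False), ("foo.bar", False)]))
print(check_aliases([("foo.bar", False), ("foo.bar", False)]))
print(check_aliases([("x.a", False), ("x.b", False)]))
```

The first three calls correspond to the three examples in the list above and each reports one violation; the last call uses sibling fields under x and reports none.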
Projecting on Key is a feature that can be useful in situations where it is desirable to quickly change the key of a Table or a Stream, for instance in preparation for further operations (e.g. joins). This feature is sometimes referred to as re-keying within the industry.
However, one important implication of using this feature is that Kafka uses the key to determine in what partition a record must be stored; by changing the key of the record, the resulting partitioning of the output topic might differ from the one of the input topic. While there is nothing wrong with this, it is something that must be understood clearly when using this feature.
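The effect of re-keying on partitioning can be illustrated with a toy partitioner. In this sketch the hash function is a stand-in (Kafka's default partitioner hashes the serialized key with murmur2, not CRC32), and the record layout, the country field and the partition count are all assumptions chosen for illustration.

```python
import zlib

NUM_PARTITIONS = 3  # assumed partition count for both topics


def partition_for(key: str, num_partitions: int) -> int:
    """Toy partitioner: hash the key bytes and take the remainder.

    Stand-in only: Kafka's default partitioner uses murmur2, not CRC32.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Hypothetical input records, keyed by user.
records = [
    {"key": "user-1", "value": {"country": "IT", "amount": 10}},
    {"key": "user-2", "value": {"country": "IT", "amount": 20}},
    {"key": "user-1", "value": {"country": "UK", "amount": 30}},
]

# Re-key by country, as a projection such as "country AS _key" would.
placements = []
for r in records:
    old_key = r["key"]
    new_key = r["value"]["country"]
    placements.append((
        old_key,
        new_key,
        partition_for(old_key, NUM_PARTITIONS),
        partition_for(new_key, NUM_PARTITIONS),
    ))

for old_key, new_key, before, after in placements:
    print(f"{old_key} -> partition {before}; re-keyed {new_key} -> partition {after}")
```

Records that shared a key before re-keying are only guaranteed to share a partition afterwards if they also share the new key, which is why the output topic's partitioning can differ from the input's.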
Sometimes it is desirable to limit the input records to be projected based on some predicate.
For example, we might want to project field1 and field2 to target-topic, but only if field3 contains a specific value.
INSERT INTO target-topic SELECT STREAM field1 , field2 FROM input-topic WHERE field3 = 'select_me';
This is what the WHERE clause is used for: filtering the input dataset by some predicate, so that the rest of the query is applied only to records that match the predicate.
The syntax for this clause is simply
WHERE <expression>, where
<expression> is a valid arbitrarily nested Lenses SQL boolean expression.
INSERT INTO target-topic
SELECT STREAM
    field1
  , field2
FROM input-topic
WHERE (field3 = 'select_me' AND LENGTH(CONCAT(field1, field2)) >= 5)
   OR field4 = (field5 + 5);
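To make the evaluation order concrete, the nested predicate above can be mirrored in Python as a filter followed by a projection. This is a sketch under assumptions: field1 and field2 are taken to be strings, field4 and field5 integers, CONCAT is modelled as string concatenation and LENGTH as len; the sample records are invented for illustration.

```python
def where_predicate(record):
    """Python equivalent of the WHERE expression in the query above."""
    return (
        record["field3"] == "select_me"
        and len(record["field1"] + record["field2"]) >= 5
    ) or record["field4"] == record["field5"] + 5


def select_fields(records):
    """Apply the predicate first, then project field1 and field2."""
    return [
        {"field1": r["field1"], "field2": r["field2"]}
        for r in records
        if where_predicate(r)
    ]


# Hypothetical input records.
input_records = [
    # Kept: field3 matches and the concatenation has 5 characters.
    {"field1": "abc", "field2": "de", "field3": "select_me", "field4": 0, "field5": 0},
    # Dropped: concatenation is too short and field4 != field5 + 5.
    {"field1": "a", "field2": "b", "field3": "select_me", "field4": 0, "field5": 0},
    # Kept: the second branch holds, since 7 == 2 + 5.
    {"field1": "a", "field2": "b", "field3": "other", "field4": 7, "field5": 2},
]

output_records = select_fields(input_records)
print(output_records)
```

Only the first and third records reach the projection; the fields used solely in the predicate (field3, field4, field5) do not appear in the output.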
Projections have a close relationship with the storage format of their target.
By default, if a query contains more than one projection for the same facet, then that facet’s storage format will be a structure (which type of structure exactly depends on other factors that are not relevant here).
INSERT INTO target-topic SELECT STREAM field1 as result1 , field2 FROM input-topic;
The above query will make target-topic’s value storage format a structure (e.g. JSON) with two fields named result1 and field2. The storage format for target-topic’s key will be the same as input-topic’s, as there are no projections targeting that facet.
The storage format of the output can be explicitly changed by a projection, however. This will often be the case when a projection on a whole facet is used. Consider the following query:
INSERT INTO target-topic SELECT STREAM field1 AS result1 , field2 AS _key FROM input-topic;
In this case, target-topic’s value storage format will still be a structure, but its key will depend on field2’s schema. For example, if field2 is a string, then target-topic’s key will be changed to STRING (assuming it was not STRING already). The same behaviour applies to the value facet.
One example where this can be relevant is when a projection is used to map the result of a single field in the target topic. Consider the following query:
INSERT INTO target-topic SELECT STREAM field1 AS _value FROM input-topic;
This query will project
field1 on the whole value facet, and this will result in a change of storage format as well.
This behaviour is quite common when a single projection is used, because more often than not the desired output in such a scenario will be the content of the field rather than a structure with a single field.
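The difference between projecting to named fields and projecting over the whole value can be sketched as follows. The helper build_value is hypothetical and deliberately narrow: it models only the two cases discussed in this section (named-field projections, or a single projection onto _value) and rejects a query with more than one projection targeting _value, in line with the one-per-facet rule. Mixing both kinds in one query is not modelled.

```python
def build_value(record, projections):
    """Model the output value for (source_field, target) pairs.

    Hypothetical helper: target is either a field name or "_value".
    """
    whole = [src for src, target in projections if target == "_value"]
    if len(whole) > 1:
        raise ValueError("only one projection may target _value")
    if whole:
        if len(projections) > 1:
            # Assumption: mixed queries are outside the scope of this sketch.
            raise ValueError("mixing _value with field projections is not modelled")
        # The field's content becomes the whole value (e.g. a plain string).
        return record[whole[0]]
    # Named targets produce a structure with one entry per projection.
    return {target: record[src] for src, target in projections}


record = {"field1": "abc", "field2": 2}  # sample input value (assumed)

# Mirrors: SELECT STREAM field1 AS _value
primitive_value = build_value(record, [("field1", "_value")])

# Mirrors: SELECT STREAM field1 AS result1, field2
struct_value = build_value(record, [("field1", "result1"), ("field2", "field2")])

print(primitive_value)
print(struct_value)
```

In the first case the output value is the content of field1 itself; in the second it is a structure with the fields result1 and field2.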