User Defined Aggregate Functions
User-defined aggregate functions (UDAFs) are plugins that let users extend the set of aggregation functions Lenses currently offers.
To write such a function, first add the Lenses UDF JVM dependency to your project, then implement the `UserDefinedAggregateFunction` interface.
Currently, any UDAF implementing the interfaces in the above repository must be placed in the `io.lenses.sql.udf` package for Lenses to pick it up.
Implementing a streaming UDAF
When invoking a UDAF, only the first argument can reference a field, function or expression; the remaining arguments must be static values, i.e. neither fields nor functions.
The following are valid examples of UDAF invocations:
udaf_function(table.foo)
udaf_function(1)
udaf_function("foo")
udaf_function(concat("bar", table.foo))
udaf_function(table.foo, 1, "bar")
udaf_function(table.foo, 1)
udaf_function(table.foo, 2.0)
udaf_function(table.foo, "bar", 2, "foo")
String name()
When a query specifies a function name, Lenses first checks its list of pre-defined functions for a match. If none is found, it then checks whether a user-defined function (UDF/UDAF) exists, by looking for one whose `name()` method returns the requested name. If such a function exists, it is used.
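The lookup order described above can be sketched as follows. This is a simplified model, not Lenses' actual resolver: `FunctionResolver` and `NamedFunction` are hypothetical names, and the exact string comparison is an assumption (the real matching rules, for example around letter case, may differ).

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for a UDF/UDAF: only the documented name() method is modelled.
interface NamedFunction {
    String name();
}

class FunctionResolver {
    private final Set<String> builtIns;            // names of pre-defined functions
    private final List<NamedFunction> userDefined; // loaded UDFs/UDAFs

    FunctionResolver(Set<String> builtIns, List<NamedFunction> userDefined) {
        this.builtIns = builtIns;
        this.userDefined = userDefined;
    }

    // Returns "builtin:<name>", "udf:<name>", or empty if nothing matches.
    Optional<String> resolve(String requested) {
        // 1. Pre-defined functions are checked first, so they win over a UDF with the same name.
        if (builtIns.contains(requested)) {
            return Optional.of("builtin:" + requested);
        }
        // 2. Otherwise, use the first user-defined function whose name() matches.
        //    Exact, case-sensitive comparison is an assumption of this sketch.
        for (NamedFunction f : userDefined) {
            if (f.name().equals(requested)) {
                return Optional.of("udf:" + f.name());
            }
        }
        return Optional.empty();
    }
}
```

One consequence of this order is that a UDAF whose `name()` collides with a pre-defined function is never selected.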
Package
Make sure that the UDAF implementation class belongs to one of the packages specified by the `lenses.sql.udf.packages` configuration option.
DataType typer(DataType argType) throws UdfException
This method tells Lenses the type the aggregation returns.
Only a single argument type is provided: the type of the aggregation's first argument. This follows from the limitation described above, where only the first argument can be a field selection.
IMPORTANT: Lenses currently does not support returning an Optional data type from this method. Doing so results in undefined behaviour.
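As an illustration, here are two sketched `typer` implementations. `DataType` and `UdfException` are simplified stand-ins defined locally, not the Lenses classes: a hypothetical max-style function returns its argument type and rejects unsupported input by throwing, while a count-style function returns the same type whatever it counts.

```java
// Hypothetical stand-ins for the Lenses DataType and UdfException types.
enum DataType { INT, LONG, DOUBLE, STRING }

class UdfException extends Exception {
    UdfException(String message) { super(message); }
}

class TyperExamples {
    // A max-style aggregation returns the same type as its first argument
    // and rejects argument types it cannot compare numerically.
    static DataType maxTyper(DataType argType) throws UdfException {
        switch (argType) {
            case INT:
            case LONG:
            case DOUBLE:
                return argType;
            default:
                throw new UdfException("max: unsupported argument type " + argType);
        }
    }

    // A count-style aggregation always returns a count, regardless of the input type.
    static DataType countTyper(DataType argType) {
        return DataType.LONG;
    }
}
```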
Value empty()
The empty aggregation value. It must share the same schema as the values returned by the `add` and `subtract` methods.
For example, when calculating a COUNT, this method can return 0.
When calculating a maximum, it can return `Integer.MIN_VALUE`.
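One way to see the role of `empty()` is as the starting value of a fold: every record is combined into it with `add`, so it must not distort the result. The sketch below uses plain `int`s in place of the Lenses `Value` type; the class and helper names are hypothetical.

```java
import java.util.List;

// Hypothetical sketch: plain ints stand in for the Lenses Value type.
class EmptyValueExamples {
    // COUNT: starts at 0 and grows by one per record.
    static int countEmpty() { return 0; }
    static int countAdd(int aggregated, int ignored) { return aggregated + 1; }

    // MAX: starts at the smallest int, so any real value replaces it.
    static int maxEmpty() { return Integer.MIN_VALUE; }
    static int maxAdd(int aggregated, int toBeAdded) { return Math.max(aggregated, toBeAdded); }

    // Folds the values starting from the empty value, as an aggregation would.
    static int foldCount(List<Integer> values) {
        int acc = countEmpty();
        for (int v : values) acc = countAdd(acc, v);
        return acc;
    }

    static int foldMax(List<Integer> values) {
        int acc = maxEmpty();
        for (int v : values) acc = maxAdd(acc, v);
        return acc;
    }
}
```

With `Integer.MIN_VALUE` as the empty value, a group that receives no records reports `Integer.MIN_VALUE` as its maximum.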
default void init(Value[] args)
This method is called before any other method of the aggregation.
It can be used to set configuration flags based on the static arguments provided to the function (all the arguments except the first).
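For example, a hypothetical `scaled_sum(table.foo, 100)` could read its multiplier once in `init`. This sketch assumes that `args` holds only the static arguments (everything after the first) and uses `Object` and `double` in place of the Lenses `Value` type; `ScaledSum` is an illustrative name.

```java
// Hypothetical sketch: Object/double stand in for the Lenses Value type.
class ScaledSum {
    private double factor = 1.0; // used when no static argument is supplied

    // Assumption: args holds only the static arguments, e.g. {100} for scaled_sum(table.foo, 100).
    void init(Object[] args) {
        if (args.length > 0) {
            // Static arguments are literals, so they can be read once, up front.
            factor = ((Number) args[0]).doubleValue();
        }
    }

    double empty() { return 0.0; }

    double add(Object aggregateKey, double aggregated, double toBeAdded) {
        return aggregated + toBeAdded * factor;
    }
}
```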
Value add(Value aggregateKey, Value aggregatedValue, Value toBeAdded) throws UdfException;
This method specifies how a new value is added to the current aggregation, and returns the new aggregated value.
For example, when calculating a “minimum” type of function, this method can compare the newly found value (`toBeAdded`) with the current minimum (`aggregatedValue`) and return `toBeAdded` if it is lower, or `aggregatedValue` otherwise.
Note: If no `finalStep` is defined, the aggregated value returned by `add` must be consistent with the `DataType` returned by the `typer` function.
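A minimal sketch of the minimum example, with `Double` standing in for the Lenses `Value` type (the class name is hypothetical). Because no final step is defined, the `Double` that `add` returns is also the final result, so `typer` would need to declare a matching numeric type.

```java
// Hypothetical sketch: Double stands in for the Lenses Value type.
class MinAggregation {
    // Positive infinity is a neutral starting point for a minimum.
    Double empty() { return Double.POSITIVE_INFINITY; }

    // Compares the new value with the current minimum and keeps the lower one.
    Double add(Object aggregateKey, Double aggregatedValue, Double toBeAdded) {
        return toBeAdded < aggregatedValue ? toBeAdded : aggregatedValue;
    }
}
```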
Value merge(Value aggregateKey, Value first, Value second) throws UdfException;
When session windows are used, the aggregation results from two windows may need to be merged into one. This method specifies how that merge is done.
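For a count, merging two session windows amounts to adding their counts. The sketch below uses `Long` in place of `Value`, and `aggregate` is a hypothetical helper for illustration only. A useful sanity check is that merging two partial results gives the same value as aggregating all their records together.

```java
import java.util.List;

// Hypothetical sketch: Long stands in for the Lenses Value type.
class CountAggregation {
    Long empty() { return 0L; }

    Long add(Object aggregateKey, Long aggregatedValue, Object toBeAdded) {
        return aggregatedValue + 1;
    }

    // Two session windows that become one: their counts simply add up.
    Long merge(Object aggregateKey, Long first, Long second) {
        return first + second;
    }

    // Illustrative helper: aggregates one window's records from scratch.
    Long aggregate(Object key, List<?> records) {
        Long acc = empty();
        for (Object r : records) acc = add(key, acc, r);
        return acc;
    }
}
```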
default Optional<FinalStep> finalStep()
In some situations (see the custom average example below), the intermediate aggregated value and the final one differ. In such cases, Lenses allows users to provide a final step that maps the intermediate value to the final one.
Note: If defined, the result returned by the `FinalStep` `calculate` method must be consistent with the `DataType` returned by `typer`.
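A hypothetical `range` function (maximum minus minimum) shows the pattern: the intermediate value is a `{min, max}` pair, and the final step reduces it to a single number, which is the type `typer` would have to declare. `FinalStep` is defined here as a local stand-in with a single `calculate` method; its exact signature in the Lenses library is an assumption.

```java
import java.util.Optional;

// Hypothetical stand-in for FinalStep; the real signature may differ.
interface FinalStep<I, R> {
    R calculate(I intermediate);
}

// Hypothetical "range" aggregation: the intermediate value is {min, max},
// while the final result is a single double.
class RangeAggregation {
    double[] empty() {
        return new double[] {Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY};
    }

    double[] add(Object aggregateKey, double[] agg, double toBeAdded) {
        // Return a new array rather than mutating the existing aggregate.
        return new double[] {Math.min(agg[0], toBeAdded), Math.max(agg[1], toBeAdded)};
    }

    // Maps the intermediate {min, max} pair to the final max - min value.
    Optional<FinalStep<double[], Double>> finalStep() {
        return Optional.of(agg -> agg[1] - agg[0]);
    }
}
```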
Implementing a Table UDAF
Table UDAFs are similar to streaming UDAFs in every respect, with one exception: the addition of a `subtract` method.
Value subtract(Value aggregateKey, Value aggregatedValue, Value toBeSubtracted) throws UdfException;
This method specifies how a value is removed from an aggregation. For example, when implementing a `count` aggregation function, it can subtract 1 from the aggregated value.
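For the count example, `subtract` simply inverts `add`, so adding a value and then subtracting it leaves the aggregate unchanged. A minimal sketch, with `Long` standing in for `Value` and a hypothetical class name:

```java
// Hypothetical sketch: Long stands in for the Lenses Value type.
class TableCount {
    Long empty() { return 0L; }

    Long add(Object aggregateKey, Long aggregatedValue, Object toBeAdded) {
        return aggregatedValue + 1;
    }

    // Called when a value leaves the aggregation: undoes what add() did.
    Long subtract(Object aggregateKey, Long aggregatedValue, Object toBeSubtracted) {
        return aggregatedValue - 1;
    }
}
```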
Example: A custom implementation of an “average” table aggregation function
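A minimal sketch of such a function follows. It is not the published example: plain Java types replace the Lenses `Value` type, `FinalStep` and `SumCount` are local stand-ins, and the name `my_avg` is hypothetical. The intermediate value is a running sum and count, which is why a final step is needed to produce the average.

```java
import java.util.Optional;

// Hypothetical stand-in for the Lenses FinalStep type.
interface FinalStep<I, R> {
    R calculate(I intermediate);
}

// Intermediate state for the average: a running sum and count.
final class SumCount {
    final double sum;
    final long count;

    SumCount(double sum, long count) { this.sum = sum; this.count = count; }
}

class TableAverage {
    String name() { return "my_avg"; } // hypothetical function name

    SumCount empty() { return new SumCount(0.0, 0L); }

    SumCount add(Object key, SumCount agg, double toBeAdded) {
        return new SumCount(agg.sum + toBeAdded, agg.count + 1);
    }

    // Undoes the contribution of a value that leaves the table.
    SumCount subtract(Object key, SumCount agg, double toBeSubtracted) {
        return new SumCount(agg.sum - toBeSubtracted, agg.count - 1);
    }

    SumCount merge(Object key, SumCount first, SumCount second) {
        return new SumCount(first.sum + second.sum, first.count + second.count);
    }

    // Maps {sum, count} to the average; an empty aggregate yields NaN (0.0 / 0).
    Optional<FinalStep<SumCount, Double>> finalStep() {
        return Optional.of(agg -> agg.sum / agg.count);
    }
}
```

Because `add`, `subtract` and `merge` all work on the same `{sum, count}` pair, only `finalStep` needs to know how the average is computed.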
Testing
Testing is an important part of any development. To test your UDAF, we recommend following the example tests published in the Lenses UDF Example Repository.
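Alongside those tests, two properties are cheap to check for a table UDAF: `subtract` should undo `add`, and `merge` of two partial aggregates should equal aggregating everything at once. The framework-free sketch below checks both against a simplified, hypothetical `TableAgg` interface (keys and the Lenses `Value` type are omitted).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical, simplified view of a table UDAF (keys and the Value type omitted).
interface TableAgg<A, V> {
    A empty();
    A add(A agg, V value);
    A subtract(A agg, V value);
    A merge(A first, A second);
}

class UdafProperties {
    static <A, V> A fold(TableAgg<A, V> f, List<V> values) {
        A acc = f.empty();
        for (V v : values) acc = f.add(acc, v);
        return acc;
    }

    // True if merging two halves matches folding everything, and subtract undoes add.
    static <A, V> boolean check(TableAgg<A, V> f, List<V> left, List<V> right) {
        List<V> all = new ArrayList<>(left);
        all.addAll(right);
        boolean mergeOk = Objects.equals(fold(f, all), f.merge(fold(f, left), fold(f, right)));

        boolean subtractOk = true;
        A base = fold(f, left);
        for (V v : right) {
            subtractOk &= Objects.equals(f.subtract(f.add(base, v), v), base);
        }
        return mergeOk && subtractOk;
    }
}

// A long-valued sum used to exercise the checks.
class LongSum implements TableAgg<Long, Long> {
    public Long empty() { return 0L; }
    public Long add(Long agg, Long v) { return agg + v; }
    public Long subtract(Long agg, Long v) { return agg - v; }
    public Long merge(Long a, Long b) { return a + b; }
}
```

For floating-point aggregates, exact equality can fail because of rounding, so a tolerance-based comparison would be more appropriate there.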