This page describes how to write and use user-defined aggregation functions in Lenses SQL Processors.
User-defined aggregation functions (UDAFs) are plugins that let users extend the set of aggregation functions Lenses supports.
To write such a function, add the following JVM dependency to your project and implement the `UserDefinedAggregateFunction` interface:
io.lenses:lenses-sql-udf:4.0.0
Currently, any UDAF implementing the interfaces from the above artifact must be placed in the package `io.lenses.sql.udf` for Lenses to pick it up.
Implementing a streaming UDAF
When invoking a UDAF, only the first argument can reference a field, function or expression; the remaining arguments must be static literals, i.e., neither fields nor functions.
The following are valid examples of UDAF invocations:
```sql
udaf_function(table.foo)
udaf_function(1)
udaf_function("foo")
udaf_function(concat("bar", table.foo))
udaf_function(table.foo, 1, "bar")
udaf_function(table.foo, 1)
udaf_function(table.foo, 2.0)
udaf_function(table.foo, "bar", 2, "foo")
```
```java
public interface UserDefinedAggregateFunction extends UserDefinedFunction {

    String name();

    /**
     * Defines a mapping from input to output types.
     *
     * @param argType The data type of the single argument.
     * @return The data type of the result.
     * @throws UdfException if the given data type is not supported by this UDF.
     */
    DataType typer(DataType argType) throws UdfException;

    /**
     * The empty/initial aggregate value of this function.
     *
     * @return the initial value.
     */
    Value empty();

    /**
     * This method can be overridden by users to perform initialization logic.
     * It is guaranteed to be called before empty, add, merge, and in case of table-based aggregations, subtract.
     *
     * @param args The tail of the arguments passed to the UDAF.
     */
    default void init(Value[] args) {}

    /**
     * Handles adding a new value to the current aggregated value.
     *
     * @param aggregateKey The key of the field being aggregated on.
     * @param aggregatedValue The current aggregated value.
     * @param toBeAdded The value to be added to the current aggregated value.
     * @return The updated aggregated value.
     * @throws UdfException if adding the new value to the current aggregated value fails.
     */
    Value add(Value aggregateKey, Value aggregatedValue, Value toBeAdded) throws UdfException;

    /**
     * Merges two aggregated values in case of windowing.
     *
     * @param aggregateKey The key of the field being aggregated on.
     * @param first The first aggregated value.
     * @param second The second aggregated value.
     * @return The combined aggregated value.
     * @throws UdfException if the merge fails.
     */
    Value merge(Value aggregateKey, Value first, Value second) throws UdfException;

    /**
     * An optional final step that calculates and returns the result.
     *
     * @return An optional function that will be called with the aggregated value to calculate the final result.
     */
    default Optional<FinalStep> finalStep() {
        return Optional.empty();
    }
}
```
String name()
When a query specifies a function name:
```sql
SELECT STREAM foo(bar1, bar2, bar3)
FROM source
GROUP BY foo.bar;
```
Lenses first checks the list of pre-defined functions for a matching name. If no match is found, it then checks whether a user-defined function (UDF/UDAF) exists, by checking whether the `name()` method of any available UDF/UDAF returns a name that matches the requested function. If so, that UDF/UDAF is used.
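The lookup order described above can be modelled in a few lines of plain Java. This is a hypothetical sketch, not Lenses' actual resolver: `NamedFunction` and `FunctionResolver` are stand-ins invented for illustration, and the sketch assumes names are compared exactly (case-sensitively) and that the first matching UDF wins, neither of which the text above specifies.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for any UDF/UDAF exposing name().
interface NamedFunction {
    String name();
}

// Hypothetical model of the lookup order: pre-defined functions first, then UDFs/UDAFs.
class FunctionResolver {
    private final Set<String> builtIns;
    private final List<NamedFunction> userDefined;

    FunctionResolver(Set<String> builtIns, List<NamedFunction> userDefined) {
        this.builtIns = builtIns;
        this.userDefined = userDefined;
    }

    /** Returns "builtin:<name>", "udf:<name>", or empty if nothing matches. */
    Optional<String> resolve(String requested) {
        // 1. A pre-defined function with the requested name takes precedence.
        if (builtIns.contains(requested)) {
            return Optional.of("builtin:" + requested);
        }
        // 2. Otherwise use a user-defined function whose name() matches
        //    (assumption: exact match, first match wins).
        for (NamedFunction f : userDefined) {
            if (f.name().equals(requested)) {
                return Optional.of("udf:" + f.name());
            }
        }
        return Optional.empty();
    }
}
```

One consequence of this order is that a UDAF whose `name()` collides with a pre-defined function would never be selected.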
Package
Please make sure that the UDAF implementation class belongs to one of the packages specified by the lenses.sql.udf.packages configuration option.
DataType typer(DataType argType) throws UdfException;
This method tells Lenses which data type the aggregation returns.
Only a single argument type is provided: the type of the first argument of the aggregation.
This is a consequence of the restriction described above, where only the first argument can be a field selection.
IMPORTANT: Lenses currently does not support returning an Optional data type from this method. Doing so results in undefined behaviour.
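To illustrate the typer contract, here is a self-contained sketch for a hypothetical SUM-like UDAF. Lenses' real `DataType` and `UdfException` classes are not reproduced: `SimpleType` and `UnsupportedTypeException` are stand-ins, and the chosen type mapping is an assumption made for the example.

```java
// Hypothetical stand-in for Lenses' DataType.
enum SimpleType { INT, LONG, DOUBLE, STRING }

// Hypothetical stand-in for Lenses' UdfException.
class UnsupportedTypeException extends Exception {
    UnsupportedTypeException(String message) {
        super(message);
    }
}

// Sketch of typer() for a SUM-like aggregation: argType is the type of the
// first (field) argument; the return value is the type of the aggregation result.
class SumTyper {
    static SimpleType typer(SimpleType argType) throws UnsupportedTypeException {
        switch (argType) {
            case INT:
            case LONG:
                return SimpleType.LONG;   // widen integers so large sums do not overflow an int
            case DOUBLE:
                return SimpleType.DOUBLE;
            default:
                // Reject input types this aggregation cannot handle.
                throw new UnsupportedTypeException("Cannot sum values of type " + argType);
        }
    }
}
```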
Value empty()
The empty aggregation value. This value must have the same schema as the values returned by the add and subtract methods.
E.g., when calculating a COUNT, this method can return 0.
E.g., when calculating the maximum value, this method can return Integer.MIN_VALUE.
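The empty value is the seed from which every aggregation starts. The sketch below folds a list of inputs starting from `empty()` for the COUNT and MAX examples above. It uses plain Java `long`/`int` in place of Lenses' `Value`, and all helper names are hypothetical.

```java
import java.util.List;

// Simplified model (not the Lenses API): empty() seeds the fold, add() updates it.
class EmptyValueDemo {
    // COUNT: empty() = 0, add() increments regardless of the value.
    static long countEmpty() { return 0L; }
    static long countAdd(long aggregated, int ignored) { return aggregated + 1; }

    // MAX: empty() = Integer.MIN_VALUE, add() keeps the larger value.
    static int maxEmpty() { return Integer.MIN_VALUE; }
    static int maxAdd(int aggregated, int toBeAdded) { return Math.max(aggregated, toBeAdded); }

    static long count(List<Integer> inputs) {
        long acc = countEmpty();                 // start from the empty value
        for (int v : inputs) acc = countAdd(acc, v);
        return acc;
    }

    static int max(List<Integer> inputs) {
        int acc = maxEmpty();                    // any int input is >= this, so the first add replaces it
        for (int v : inputs) acc = maxAdd(acc, v);
        return acc;
    }
}
```

`Integer.MIN_VALUE` works as a neutral starting point only because the inputs here are 32-bit integers; a MAX over another type would need a different empty value.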
default void init(Value[] args)
This method will be called before any other method in the aggregation.
It can be used to set configuration flags from the static arguments passed to the UDAF (all the arguments except the first).
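A sketch of how `init` can turn static arguments into configuration, for a hypothetical `count_above(table.foo, 10)` UDAF that counts values greater than a threshold. `Object[]` stands in for Lenses' `Value[]`, and both the function and its argument handling are illustrative assumptions, not part of the Lenses API.

```java
// Hypothetical UDAF model: count_above(table.foo, 10) counts records whose
// first argument exceeds the static threshold given as the second argument.
// Object[] stands in for Lenses' Value[].
class CountAboveModel {
    private double threshold = 0.0;   // default used when no threshold argument is given

    // Receives the tail of the arguments (everything after the first), like init(Value[] args).
    void init(Object[] args) {
        if (args.length > 0) {
            threshold = ((Number) args[0]).doubleValue();
        }
    }

    long empty() {
        return 0L;
    }

    // Counts the value only if it exceeds the configured threshold.
    long add(long aggregated, double toBeAdded) {
        return toBeAdded > threshold ? aggregated + 1 : aggregated;
    }
}
```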
Value add(Value aggregateKey, Value aggregatedValue, Value toBeAdded) throws UdfException;
This method specifies how a new value is added to the current aggregation. It must return the new aggregated value.
For example, when implementing a “minimum” type of function, this method can compare the newly received value (toBeAdded) with the current minimum (aggregatedValue): if toBeAdded is lower, return it; otherwise, return aggregatedValue.
Note: If no finalStep is defined, the aggregated value returned by this method must be consistent with the DataType returned by the typer function.
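The “minimum” logic above, sketched with plain Java `int`s instead of Lenses' `Value` (a simplified model, not the real API). The method keeps the `aggregateKey` parameter only to mirror the shape of `add`.

```java
// Simplified model of add() for a MIN-style aggregation (not the Lenses API).
class MinModel {
    // Initial value: every int input is <= Integer.MAX_VALUE.
    static int empty() {
        return Integer.MAX_VALUE;
    }

    // Mirrors add(aggregateKey, aggregatedValue, toBeAdded); the key is not needed for MIN.
    static int add(String aggregateKey, int aggregatedValue, int toBeAdded) {
        // If the new value is lower, it becomes the current minimum.
        return toBeAdded < aggregatedValue ? toBeAdded : aggregatedValue;
    }
}
```

Since there is no final step here, the type of the aggregated value (`int`) is also the type of the result.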
Value merge(Value aggregateKey, Value first, Value second) throws UdfException;
When session windows are used, it may be necessary to merge the aggregation results of two windows into one.
This method specifies how the two aggregated values are combined.
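A minimal sketch of `merge` for COUNT and MIN, using plain Java types in place of Lenses' `Value` (a simplified model, not the real API). The merged value should equal what a single window would have produced had it received all the records of both.

```java
// Simplified model of merge() for two aggregations (not the Lenses API).
class MergeModel {
    // COUNT: the counts of the two windows add up.
    static long mergeCount(String aggregateKey, long first, long second) {
        return first + second;
    }

    // MIN: the overall minimum is the smaller of the two partial minimums.
    static int mergeMin(String aggregateKey, int first, int second) {
        return Math.min(first, second);
    }
}
```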
default Optional<FinalStep> finalStep()
In some situations (see the custom average example), the intermediate aggregated value differs from the final result. In such cases, Lenses allows users to provide a final step that maps the intermediate value to the final one.
Note: If defined, the result returned by the FinalStep `calculate` method must be consistent with the DataType returned by typer.
Implementing a Table UDAF
Table UDAFs are identical to streaming UDAFs in every respect except for the addition of a subtract method.
```java
public interface UserDefinedTableAggregateFunction extends UserDefinedAggregateFunction {

    /**
     * Handles removal of a value from the aggregation.
     *
     * @param aggregateKey The key of the field being aggregated on.
     * @param aggregatedValue The current aggregated value.
     * @param toBeSubtracted The value to be subtracted from the current aggregated value.
     * @return The updated aggregated value.
     * @throws UdfException if the subtraction failed.
     */
    Value subtract(Value aggregateKey, Value aggregatedValue, Value toBeSubtracted) throws UdfException;
}
```
Value subtract(Value aggregateKey, Value aggregatedValue, Value toBeSubtracted) throws UdfException;
This method specifies how a value is removed from an aggregation. For example, when implementing a count aggregation function, this method can subtract 1 from the aggregated value.
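A sketch of the count example with plain Java types (not the Lenses API). It assumes, as in Kafka Streams table aggregations, that when a table row is deleted its old value is passed to `subtract`, and when a row is updated the old value is subtracted and the new one added; the source does not spell this out.

```java
// Simplified model of a COUNT table aggregation (not the Lenses API).
// Assumption: deleting a row calls subtract() with its old value; updating a
// row calls subtract() for the old value and add() for the new one.
class TableCountModel {
    static long empty() {
        return 0L;
    }

    // A row entering the table increases the count.
    static long add(String aggregateKey, long aggregatedValue, Object toBeAdded) {
        return aggregatedValue + 1;
    }

    // A row leaving the table decreases the count.
    static long subtract(String aggregateKey, long aggregatedValue, Object toBeSubtracted) {
        return aggregatedValue - 1;
    }
}
```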
Example: A custom implementation of an “average” table aggregation function
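A minimal sketch of the logic such a function needs, modelled with plain Java types rather than Lenses' `Value`, `DataType` and `FinalStep` classes (whose constructors and helpers are not shown here). `SumCount` and `TableAverageModel` are hypothetical names; `java.util.function.Function` plays the role of `FinalStep`. The intermediate aggregate is a running (sum, count) pair, which is why a final step is needed to produce the average.

```java
import java.util.Optional;
import java.util.function.Function;

// Intermediate aggregate for AVG: running sum and count.
final class SumCount {
    final double sum;
    final long count;

    SumCount(double sum, long count) {
        this.sum = sum;
        this.count = count;
    }
}

// Plain-Java model of a table AVG aggregation (not the Lenses API).
class TableAverageModel {
    SumCount empty() {
        return new SumCount(0.0, 0L);
    }

    SumCount add(String key, SumCount agg, double toBeAdded) {
        return new SumCount(agg.sum + toBeAdded, agg.count + 1);
    }

    SumCount subtract(String key, SumCount agg, double toBeSubtracted) {
        return new SumCount(agg.sum - toBeSubtracted, agg.count - 1);
    }

    SumCount merge(String key, SumCount first, SumCount second) {
        return new SumCount(first.sum + second.sum, first.count + second.count);
    }

    // Maps the intermediate (sum, count) to the final average.
    // Returning NaN for an empty group is a choice made for this sketch.
    Optional<Function<SumCount, Double>> finalStep() {
        return Optional.of(agg -> agg.count == 0 ? Double.NaN : agg.sum / agg.count);
    }
}
```

Keeping the sum and count separately, rather than the running average, is what makes `subtract` and `merge` exact: an average alone cannot be un-averaged or combined without knowing how many values it covers.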
Testing is an important part of any development. To test your UDAF, we recommend following the example tests published in the Lenses UDF Example Repository.