User Defined Aggregate Functions
io.lenses:lenses-sql-udf:4.0.0
Implementing a streaming UDAF
public interface UserDefinedAggregateFunction extends UserDefinedFunction {

    String name();

    /**
     * Defines a mapping from input to output types.
     *
     * @param argType The data type of the single argument.
     * @return The data type of the result.
     * @throws UdfException if the given data type is not supported by this UDF.
     */
    DataType typer(DataType argType) throws UdfException;

    /**
     * The empty/initial aggregate value of this function.
     *
     * @return the initial value.
     */
    Value empty();

    /**
     * This method can be overridden by users to perform initialization logic.
     * It is guaranteed to be called before empty, add, merge, and in case of table-based aggregations, subtract.
     *
     * @param args The tail of the arguments passed to the UDAF.
     */
    default void init(Value[] args) {}

    /**
     * Handles adding a new value to the current aggregated value.
     *
     * @param aggregateKey The key of the field being aggregated on.
     * @param aggregatedValue The current aggregated value.
     * @param toBeAdded The value to be added to the current aggregated value.
     * @return The updated aggregated value.
     * @throws UdfException if adding the new value to the current aggregated value fails.
     */
    Value add(Value aggregateKey, Value aggregatedValue, Value toBeAdded) throws UdfException;

    /**
     * Merges two aggregated values in case of windowing.
     *
     * @param aggregateKey The key of the field being aggregated on.
     * @param first The first aggregated value.
     * @param second The second aggregated value.
     * @return The combined aggregated value.
     * @throws UdfException if the merge fails.
     */
    Value merge(Value aggregateKey, Value first, Value second) throws UdfException;

    /**
     * An optional final step that calculates and returns the result.
     *
     * @return An optional function that will be called with the aggregated value to calculate the final result.
     */
    default Optional<FinalStep> finalStep() {
        return Optional.empty();
    }
}
DataType typer(DataType argType) throws UdfException
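The typer contract in the interface above (map the argument type to a result type, and reject unsupported types with an exception) can be sketched roughly as follows. This is only an illustration: `SketchType`, `UnsupportedTypeException` and `TyperSketch` are hypothetical stand-ins written for this example, not the library's `DataType` or `UdfException`, whose APIs are not shown in this section.

```java
// Runnable demo of a typer-style mapping using hypothetical stand-in types.
class TyperDemo {
    public static void main(String[] args) {
        try {
            // A numeric input type maps to a DOUBLE result type.
            System.out.println(TyperSketch.typer(SketchType.INT));
            // A non-numeric input type is rejected.
            TyperSketch.typer(SketchType.STRING);
        } catch (UnsupportedTypeException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}

// Hypothetical stand-in for the library's DataType.
enum SketchType { INT, LONG, DOUBLE, STRING }

// Hypothetical stand-in for the library's UdfException (checked, like the throws clause suggests).
class UnsupportedTypeException extends Exception {
    UnsupportedTypeException(String message) { super(message); }
}

class TyperSketch {
    // Mirrors the shape of typer(DataType): input type in, result type out.
    static SketchType typer(SketchType argType) throws UnsupportedTypeException {
        switch (argType) {
            case INT:
            case LONG:
            case DOUBLE:
                // An average-like aggregation over any numeric input yields a DOUBLE.
                return SketchType.DOUBLE;
            default:
                throw new UnsupportedTypeException("unsupported argument type " + argType);
        }
    }
}
```

A real implementation would return and inspect the library's own `DataType` values instead of the enum used here.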
Value empty()
default void init(Value[] args)
Value add(Value aggregateKey, Value aggregatedValue, Value toBeAdded) throws UdfException
Value merge(Value aggregateKey, Value first, Value second) throws UdfException
default Optional<FinalStep> finalStep()
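To illustrate how empty, add and merge fit together in a streaming aggregation, here is a minimal sketch of a sum. It assumes a simplified, hypothetical interface (`AggregateSketch`) in which plain Java types stand in for the library's `Value`, and it omits name, typer, init and finalStep. It is not the library's API, only the same shape.

```java
import java.util.List;

// Runnable demo: folds values with add, then combines two partial aggregates with merge.
class StreamingSumDemo {
    public static void main(String[] args) {
        SumSketch sum = new SumSketch();
        double left = SumSketch.fold(sum, "k", List.of(1.0, 2.0));
        double right = SumSketch.fold(sum, "k", List.of(3.0, 4.5));
        System.out.println(sum.merge("k", left, right)); // prints 10.5
    }
}

// Hypothetical, simplified stand-in for UserDefinedAggregateFunction.
// A is the aggregate type, V is the type of each incoming value.
interface AggregateSketch<A, V> {
    A empty();
    A add(String aggregateKey, A aggregatedValue, V toBeAdded);
    A merge(String aggregateKey, A first, A second);
}

class SumSketch implements AggregateSketch<Double, Double> {
    // The initial aggregate: nothing has been added yet.
    @Override
    public Double empty() { return 0.0; }

    // Folds one new value into the current aggregate.
    @Override
    public Double add(String aggregateKey, Double aggregatedValue, Double toBeAdded) {
        return aggregatedValue + toBeAdded;
    }

    // Combines two partial aggregates, as needed when windows are merged.
    @Override
    public Double merge(String aggregateKey, Double first, Double second) {
        return first + second;
    }

    // Helper for the demo: starts from empty() and applies add() to each value in order.
    static <A, V> A fold(AggregateSketch<A, V> f, String key, List<V> values) {
        A acc = f.empty();
        for (V v : values) {
            acc = f.add(key, acc, v);
        }
        return acc;
    }
}
```

The property worth checking in any implementation is that merging two partial aggregates gives the same result as adding all of their values to a single aggregate.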
Implementing a Table UDAF
Value subtract(Value aggregateKey, Value aggregatedValue, Value toBeSubtracted) throws UdfException;
Example: A custom implementation of an “average” table aggregation function
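As a rough sketch of such an average, the aggregate below keeps a running sum and count, so that add and subtract are exact inverses and a final step can divide the two. It is written against hypothetical plain Java types (`SumCount`, `AverageSketch`) rather than the library's `Value` and `FinalStep`, whose APIs are not shown in this section, and the final step is modelled as a plain `java.util.function.Function` rather than an `Optional<FinalStep>`.

```java
import java.util.function.Function;

// Runnable demo: a table update is modelled as "subtract the old value, add the new one".
class TableAverageDemo {
    public static void main(String[] args) {
        AverageSketch avg = new AverageSketch();
        SumCount state = avg.empty();
        state = avg.add("k", state, 10.0);
        state = avg.add("k", state, 20.0);
        state = avg.add("k", state, 60.0);
        // The row holding 60.0 is updated to 30.0.
        state = avg.subtract("k", state, 60.0);
        state = avg.add("k", state, 30.0);
        System.out.println(avg.finalStep().apply(state)); // prints 20.0
    }
}

// Hypothetical aggregate state: the running sum and the number of values in it.
final class SumCount {
    final double sum;
    final long count;

    SumCount(double sum, long count) {
        this.sum = sum;
        this.count = count;
    }
}

// Hypothetical stand-in for a table UDAF; not the library's interface.
class AverageSketch {
    SumCount empty() {
        return new SumCount(0.0, 0L);
    }

    SumCount add(String aggregateKey, SumCount aggregatedValue, double toBeAdded) {
        return new SumCount(aggregatedValue.sum + toBeAdded, aggregatedValue.count + 1);
    }

    // The inverse of add: removes a value that was previously added.
    SumCount subtract(String aggregateKey, SumCount aggregatedValue, double toBeSubtracted) {
        return new SumCount(aggregatedValue.sum - toBeSubtracted, aggregatedValue.count - 1);
    }

    // Final step: turns the (sum, count) state into the average.
    // An empty aggregate has no average; this sketch returns NaN in that case.
    Function<SumCount, Double> finalStep() {
        return s -> s.count == 0 ? Double.NaN : s.sum / s.count;
    }
}
```

Keeping the count alongside the sum is what makes subtraction possible: an average alone does not carry enough information to remove a single value from it.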
Testing

