User Defined Aggregate Functions

User-defined Aggregation Functions (UDAF) are plugins that allow users to extend the list of supported aggregation functions currently offered in Lenses.

To write such a function, add the following JVM dependency and implement the UserDefinedAggregateFunction interface:

io.lenses:lenses-sql-udf:4.0.0

Currently, any UDAF implementing the interfaces from the above dependency must be placed in the `io.lenses.sql.udf` package for Lenses to pick it up.

Implementing a streaming UDAF

When invoking a UDAF, only the first argument can reference a field, function or expression. The remaining arguments must be static, i.e. neither fields nor functions.

The following are valid examples of UDAF invocations:

  • udaf_function(table.foo)

  • udaf_function(1)

  • udaf_function("foo")

  • udaf_function(concat("bar", table.foo))

  • udaf_function(table.foo, 1, "bar")

  • udaf_function(table.foo, 1)

  • udaf_function(table.foo, 2.0)

  • udaf_function(table.foo, "bar", 2, "foo")

public interface UserDefinedAggregateFunction extends UserDefinedFunction {

    String name();
    /**
     * Defines a mapping from input to output types.
     *
     * @param argType The data type of the single argument.
     * @return The data type of the result.
     * @throws UdfException if the given data type is not supported by this UDF.
     */
    DataType typer(DataType argType) throws UdfException;

    /**
     * The empty/initial aggregate value of this function.
     *
     * @return the initial value.
     */
    Value empty();

    /**
     * This method can be overridden by users to perform initialization logic.
     * It is guaranteed to be called before empty, add, merge, and in case of table-based aggregations, subtract.
     *
     * @param args The tail of the arguments passed to the UDAF.
     */
    default void init(Value[] args) {}

    /**
     * Handles adding a new value to the current aggregated value.
     *
     * @param aggregateKey The key of the field being aggregated on.
     * @param aggregatedValue The current aggregated value.
     * @param toBeAdded The value to be added to the current aggregated value.
     * @return The updated aggregated value.
     * @throws UdfException if adding the new value to the current aggregated value fails.
     */
    Value add(Value aggregateKey, Value aggregatedValue, Value toBeAdded) throws UdfException;

    /**
     * Merges two aggregated values in case of windowing.
     *
     * @param aggregateKey The key of the field being aggregated on.
     * @param first The first aggregated value.
     * @param second The second aggregated value.
     * @return The combined aggregated value.
     * @throws UdfException if the merge fails.
     */
    Value merge(Value aggregateKey, Value first, Value second) throws UdfException;

    /**
     * An optional final step that calculates and returns the result.
     *
     * @return An optional function that will be called with the aggregated value to calculate the final result.
     */
    default Optional<FinalStep> finalStep() {
        return Optional.empty();
    }

}

String name()

When a query specifies a function name:

SELECT STREAM foo(bar1,bar2,bar3)
FROM source
GROUP BY foo.bar;

Lenses first checks the list of pre-defined functions for a matching name. If no match is found, Lenses then checks whether a user-defined function (UDF/UDAF) exists, by looking for an available UDF/UDAF whose name() method returns the requested function name. If one is found, that UDF/UDAF is used.
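The lookup order described above can be sketched as follows. This is only an illustration, not the Lenses implementation: NamedFunction and FunctionLookup are hypothetical stand-ins, and exact, case-sensitive name matching is an assumption.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for a UDF/UDAF: only the name() method matters here.
interface NamedFunction {
    String name();
}

final class FunctionLookup {
    // Pre-defined functions are checked first; user-defined ones are only
    // consulted when no pre-defined function has the requested name.
    static Optional<String> resolve(String requested,
                                    Set<String> builtIns,
                                    List<NamedFunction> userDefined) {
        if (builtIns.contains(requested)) {
            return Optional.of("built-in:" + requested);
        }
        for (NamedFunction function : userDefined) {
            // Assumption: names are compared exactly (case-sensitive).
            if (function.name().equals(requested)) {
                return Optional.of("user-defined:" + function.name());
            }
        }
        return Optional.empty();
    }
}
```

In this sketch, a user-defined function that reuses the name of a pre-defined one is never reached, which matches the lookup order described above.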

Package

Please make sure that the UDAF implementation class belongs to one of the packages specified by the lenses.sql.udf.packages configuration option.

DataType typer(DataType argType) throws UdfException

This function allows Lenses to know the type the aggregation will be returning.

Notice that only a single argument type is provided. The provided type will be the one for the first argument of the aggregation.

This is due to the limitation described above where only the first argument can be a field selection.

IMPORTANT: Currently Lenses will not support returning an Optional data type from this method. Doing so will result in undefined behaviour.
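As a rough illustration of what a typer does, the sketch below maps numeric input types to a double result and rejects everything else. SketchType and SketchUdfException are hypothetical stand-ins for the DataType and UdfException types shipped with the lenses-sql-udf dependency, so that the example runs on its own.

```java
// Hypothetical stand-ins; the real DataType and UdfException come from lenses-sql-udf.
enum SketchType { INT, LONG, DOUBLE, STRING }

class SketchUdfException extends Exception {
    SketchUdfException(String message) {
        super(message);
    }
}

final class AverageTyper {
    // Receives the type of the first argument only and returns the result type.
    static SketchType typer(SketchType argType) throws SketchUdfException {
        switch (argType) {
            case INT:
            case LONG:
            case DOUBLE:
                // An average of numeric input is reported as a double.
                return SketchType.DOUBLE;
            default:
                // Unsupported input types are rejected up front.
                throw new SketchUdfException("Unsupported argument type: " + argType);
        }
    }
}
```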

Value empty()

The empty aggregation value. This value has to share the same schema as the values returned from the add and subtract methods.

  • e.g. when calculating a COUNT, this method can return 0

  • e.g. when calculating the maximum value, this method can return Integer.MIN_VALUE
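The two cases above can be sketched with plain Java values standing in for the Lenses Value type (an assumption made so the example runs without the dependency). The point is that the empty value is a neutral starting point: aggregating over it leaves the first real value unchanged.

```java
import java.util.List;

final class EmptyValues {
    // Neutral starting points, as in the two cases above.
    static final long COUNT_EMPTY = 0L;
    static final int MAX_EMPTY = Integer.MIN_VALUE;

    // Counts the values, starting from the empty count.
    static long count(List<Integer> values) {
        long aggregated = COUNT_EMPTY;
        for (int ignored : values) {
            aggregated = aggregated + 1;
        }
        return aggregated;
    }

    // Finds the maximum, starting from the empty maximum.
    static int max(List<Integer> values) {
        int aggregated = MAX_EMPTY;
        for (int value : values) {
            aggregated = Math.max(aggregated, value);
        }
        return aggregated;
    }
}
```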

default void init(Value[] args)

This method will be called before any other method in the aggregation.

It can be used to set some config flags based on the provided configuration arguments (all the arguments except the first).
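A minimal sketch of this pattern, assuming a hypothetical count_above(table.foo, 10) function that counts values greater than a threshold. Object[] and long stand in for the Lenses Value[] and Value types, and the class and function names are illustrative only.

```java
final class CountAbove {
    // Default used when no extra argument is passed to the function.
    private long threshold = 0L;

    // Stand-in for init(Value[] args): receives every argument except the first,
    // e.g. [10] for count_above(table.foo, 10).
    void init(Object[] args) {
        if (args.length > 0) {
            threshold = ((Number) args[0]).longValue();
        }
    }

    long empty() {
        return 0L;
    }

    // Only values above the configured threshold increase the count.
    long add(long aggregatedValue, long toBeAdded) {
        return toBeAdded > threshold ? aggregatedValue + 1 : aggregatedValue;
    }
}
```

Because init is called before the other methods, add can rely on the threshold having been set by the time the first value arrives.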

Value add(Value aggregateKey, Value aggregatedValue, Value toBeAdded) throws UdfException;

This method allows users to specify how a new value is added to the current aggregation. It must return the new aggregated value:

  • when calculating a “minimum” type of function, one could use this method to compare the newly found value toBeAdded with the current minimum value (aggregatedValue). If lower, toBeAdded can be returned.

Note: If no finalStep is defined, the value returned by add has to be consistent with the DataType returned by the typer function.
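The "minimum" case can be sketched as below, with long standing in for the Lenses Value type and the aggregate key omitted for brevity (both are simplifications for illustration).

```java
final class MinAggregate {
    // Returns the new aggregated value: the smaller of the current minimum
    // and the newly seen value.
    static long add(long aggregatedValue, long toBeAdded) {
        return toBeAdded < aggregatedValue ? toBeAdded : aggregatedValue;
    }
}
```

For instance, adding 2 to a current minimum of 5 yields 2, while adding 9 leaves the minimum at 5.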

Value merge(Value aggregateKey, Value first, Value second) throws UdfException;

When session windows are used, it may be necessary to merge the aggregation results from two windows into one.

The merge method specifies how this is done.
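As an illustration, for a count aggregation the merged value is simply the sum of the two partial counts. The sketch uses long in place of the Lenses Value type and leaves out the aggregate key; both are simplifications.

```java
final class CountMerge {
    // Each new value increases the count of its own window by one.
    static long add(long aggregatedValue) {
        return aggregatedValue + 1;
    }

    // Two windows collapsing into one: their partial counts are summed.
    static long merge(long first, long second) {
        return first + second;
    }
}
```

A useful sanity check for any merge implementation is that merging two partial results gives the same answer as aggregating all the values in a single window.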

default Optional<FinalStep> finalStep()

In some situations (see custom average example) the intermediate aggregated value and the final one do not match. In such situations, Lenses allows users to provide a final step that maps the intermediate value into the final one.

Note: If defined, the result returned by the FinalStep calculate method has to be consistent with the DataType defined in typer.
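A minimal sketch of such a mapping for an average, where the intermediate value is a (count, sum) pair and the final value is a double. A long[] and java.util.function.Function stand in for the Lenses Value and FinalStep types, which is an assumption made to keep the example self-contained.

```java
import java.util.Optional;
import java.util.function.Function;

final class AverageFinalStep {
    // Intermediate aggregate: index 0 holds the count, index 1 holds the sum.
    static double calculate(long[] countAndSum) {
        long count = countAndSum[0];
        long sum = countAndSum[1];
        // Cast before dividing so the average is not truncated to a whole number.
        return count == 0 ? 0d : (double) sum / count;
    }

    // Stand-in for finalStep(): returns the function applied to the aggregate.
    static Optional<Function<long[], Double>> finalStep() {
        return Optional.of(AverageFinalStep::calculate);
    }
}
```

Here the result type of the final step (a double) is what typer would need to report, not the type of the intermediate pair.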

Implementing a Table UDAF

Table UDAFs are similar in all respects to streaming UDAFs, the only difference being the addition of a subtract method.

public interface UserDefinedTableAggregateFunction extends UserDefinedAggregateFunction {

    /**
     * Handles removal of a value from the aggregation.
     *
     * @param aggregateKey The key of the field being aggregated on.
     * @param aggregatedValue The current aggregated value.
     * @param toBeSubtracted The value to be subtracted from the current aggregated value.
     * @return The updated aggregated value.
     * @throws UdfException if the subtraction failed.
     */
    Value subtract(Value aggregateKey, Value aggregatedValue, Value toBeSubtracted) throws UdfException;

}

Value subtract(Value aggregateKey, Value aggregatedValue, Value toBeSubtracted) throws UdfException;

This method allows users to specify how a value should be removed from an aggregation. For example, when implementing a count aggregation function, one can use this method to subtract 1 from the aggregated value.
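The count case can be sketched as follows, with long standing in for the Lenses Value type and the key and value parameters omitted because a count does not need them (both simplifications are for illustration only).

```java
final class TableCount {
    static long empty() {
        return 0L;
    }

    // A value entering the aggregation increases the count by one.
    static long add(long aggregatedValue) {
        return aggregatedValue + 1;
    }

    // A value leaving the aggregation decreases the count by one.
    static long subtract(long aggregatedValue) {
        return aggregatedValue - 1;
    }
}
```

In this sketch subtract is the exact inverse of add, so adding a value and later removing it leaves the aggregate unchanged.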

Example: A custom implementation of an “average” aggregation function

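import java.util.Optional;
// The Lenses types used below (Value, LongValue, DoubleValue, RepeatedValue, DataType,
// UdfException, FinalStep, UserDefinedAggregateFunction) come from the lenses-sql-udf dependency.

public class custom_average implements UserDefinedAggregateFunction {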

    private Value calculateFinalResult(Value aggregatedValue) throws UdfException {
        long count = aggregatedValue.asRepeatedValue().get(0).toLongValue().get();
        long sum = aggregatedValue.asRepeatedValue().get(1).toLongValue().get();

        DoubleValue result;
        if (count == 0) {
            result = new DoubleValue(0d);
        } else {
            // Cast to double first; dividing two longs would truncate the average.
            result = new DoubleValue((double) sum / count);
        }
        return result;
    }

    @Override
    public String name() {
        return "custom_average";
    }

    @Override
    public DataType typer(DataType argType) {
        return DataType.ltDouble();
    }

    @Override
    public Value empty() {
        LongValue count = new LongValue(0);
        LongValue sum = new LongValue(0);
        return RepeatedValue.ofTwo(count, sum);
    }

    @Override
    public Value add(Value aggregateKey, Value aggregatedValue, Value toBeAdded) throws UdfException {
        long count = aggregatedValue.asRepeatedValue().get(0).toLongValue().get();
        long sum = aggregatedValue.asRepeatedValue().get(1).toLongValue().get();
        long added = toBeAdded.toLongValue().get();

        long updatedCount = count + 1;
        long updatedSum = sum + added;

        return RepeatedValue.ofTwo(new LongValue(updatedCount), new LongValue(updatedSum));
    }

    @Override
    public Value merge(Value aggregateKey, Value first, Value second) throws UdfException {
        // add up the two counts and the two sums
        long count1 = first.asRepeatedValue().get(0).toLongValue().get();
        long sum1 = first.asRepeatedValue().get(1).toLongValue().get();
        long count2 = second.asRepeatedValue().get(0).toLongValue().get();
        long sum2 = second.asRepeatedValue().get(1).toLongValue().get();

        long mergedCount = count1 + count2;
        long mergedSum = sum1 + sum2;
        return RepeatedValue.ofTwo(new LongValue(mergedCount), new LongValue(mergedSum));
    }

    @Override
    public Optional<FinalStep> finalStep() {
        return Optional.of(this::calculateFinalResult);
    }
}

Testing

Testing is an important part of any development. To test your UDF, we recommend following the example tests published in the Lenses UDF Example Repository.

2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.