Akka Stream Metrics

If your application is written with Akka Streams and uses the Reactive Kafka connector for Akka Streams, the topology client can publish metrics for it. The process is slightly more involved than the one-line change required for Kafka Streams, but it is still relatively simple.

First, you will need the following Maven module:

<dependency>
    <groupId>com.landoop</groupId>
    <artifactId>lenses-topology-client-akka-streams-kafka_2.12</artifactId>
    <version>1.0.0</version>
</dependency>

Note: This dependency is for Scala 2.12. If you are using Scala 2.11, replace it with lenses-topology-client-akka-streams-kafka_2.11.
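
For reference, the Scala 2.11 variant would look roughly like the following. This is a sketch that assumes the 2.11 artifact is published under the same groupId and with the same 1.0.0 version as the 2.12 one; check your repository for the version that is actually available.

<dependency>
    <groupId>com.landoop</groupId>
    <artifactId>lenses-topology-client-akka-streams-kafka_2.11</artifactId>
    <!-- assumption: same version as the 2.12 artifact shown above -->
    <version>1.0.0</version>
</dependency>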

For producer metrics, create a KafkaProducer from your usual producer settings _before_ you build the sink, register that producer with the topology client, and then pass the same producer to the sink used by the Akka stream.

// set up your producer settings as normal
final ProducerSettings<String, String> producerSettings =
        ProducerSettings
                .create(system, new StringSerializer(), new StringSerializer())
                .withBootstrapServers("localhost:9092");

// using the producer settings, create a Kafka producer
KafkaProducer<String, String> producer = producerSettings.createKafkaProducer();

// register the producer with the topology client, along with the name of the topic you will publish to
client.register(topology.getAppName(), outputTopic, new KafkaMetricsBuilder(producer));

// create your sink as normal, but pass in the Kafka producer you created earlier
Sink<ProducerRecord<String, String>, CompletionStage<Done>> sink = Producer.plainSink(producerSettings, producer);

Reactive Kafka exposes consumer metrics on the Consumer.Control instance that the source returns as its materialized value. Once the stream has been materialized, you hold a handle to this control and can register it directly with the topology client.

// create your stream as normal, and retain a reference to the materialized value
Consumer.Control control = Consumer.plainSource(consumerSettings, Subscriptions.topics(inputTopic))
                            ...<stream-flow>...
                            .toMat(sink, Keep.left())
                            .run(materializer);

// register the control with the topology client
client.register(topology.getAppName(), inputTopic, new AkkaStreamsKafkaMetricBuilder(control));

Note: When you create the topology instance, set the app type to AppType.AkkaStreams so that the UI renders with the correct visualization.