Akka Streams metrics

If you have an application written using Akka Streams and you are also using the Reactive Kafka connector for Akka Streams, then the topology client can publish metrics. The process is more complex than the one line change required for Kafka Streams but is still relatively simple.

First, you will require the following maven module:

<dependency>
    <groupId>com.landoop</groupId>
    <artifactId>lenses-topology-client-akka-streams-kafka_2.12</artifactId>
    <version>1.0.0</version>
</dependency>

For metrics on a producer, you must create a KafkaProducer from the usual producer settings before it is passed to the Akka stream as a sink and then register that producer with the topology client.

// setup your producer settings as normal
final ProducerSettings<String, String> producerSettings =
        ProducerSettings
                .create(system, 
                        new StringSerializer(), 
                        new StringSerializer())
                .withBootstrapServers("localhost:9092");

// using the producer settings, create a Kafka producer
KafkaProducer<String, String> producer = 
    producerSettings.createKafkaProducer();

// register the producer with the topology client along with the topic name you will publish to
client.register(
        topology.getAppName(), 
        outputTopic, 
        new KafkaMetricsBuilder(producer));

// create your sink as normal, except pass in the Kafka producer you created earlier
sink = Producer.plainSink(producerSettings, producer);

For consumer metrics, Reactive Kafka exposes these on the Consumer.Control instance that is returned as the materialized value. After the stream is materialized, you will have a handle to this control which can be registered directly with the topology.

// create your stream as normal, and retain a reference to the materialized value
Consumer.Control control = Consumer.plainSource(consumerSettings, Subscriptions.topics(inputTopic))
                            ...<stream-flow>...
                            .toMat(producer, Keep.left())
                            .run(materializer);

// register the control with the topology client
client.register(topology.getAppName(), inputTopic, new AkkaStreamsKafkaMetricBuilder(control));