Akka Stream Metrics
If your application is written with Akka Streams and uses the Reactive Kafka connector for Akka Streams, the topology client can publish its metrics. The process is slightly more involved than the one-line change required for Kafka Streams, but it is still relatively simple.
First, add the following Maven module:
<dependency>
    <groupId>com.landoop</groupId>
    <artifactId>lenses-topology-client-akka-streams-kafka_2.12</artifactId>
    <version>1.0.0</version>
</dependency>
Note: This dependency is for Scala 2.12. If you are using Scala 2.11 then replace this dependency with
For metrics on a producer, you must create a KafkaProducer from the usual producer settings _before_ it is passed to the Akka stream as a sink, and then register that producer with the topology client.
// set up your producer settings as normal
final ProducerSettings<String, String> producerSettings = ProducerSettings
    .create(system, new StringSerializer(), new StringSerializer())
    .withBootstrapServers("localhost:9092");

// using the producer settings, create a kafka producer
KafkaProducer<String, String> producer = producerSettings.createKafkaProducer();

// register the producer with the topology client along with the topic name you will publish to
client.register(topology.getAppName(), outputTopic, new KafkaMetricsBuilder(producer));

// create your sink as normal, except pass in the kafka producer you created earlier
sink = Producer.plainSink(producerSettings, producer);
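The reason for passing the same producer instance to both the topology client and the sink can be illustrated with a small plain-Java model. This is only a sketch under assumptions: StubProducer and StubRegistry are hypothetical stand-ins, not part of the topology client or Reactive Kafka, and the model assumes that metrics are read from whichever producer instance was registered.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class SharedProducerSketch {

    /** Hypothetical stand-in for a Kafka producer: it only counts the records it sends. */
    static final class StubProducer {
        private long sent = 0;

        void send(String record) {
            sent++;
        }

        long recordsSent() {
            return sent;
        }
    }

    /** Hypothetical stand-in for the topology client: one metrics supplier per topic. */
    static final class StubRegistry {
        private final Map<String, Supplier<Long>> metricsByTopic = new HashMap<>();

        void register(String topic, StubProducer producer) {
            // The registry reads metrics lazily from the instance it was given.
            metricsByTopic.put(topic, producer::recordsSent);
        }

        long recordsSent(String topic) {
            return metricsByTopic.get(topic).get();
        }
    }

    /**
     * Registers one producer, sends `count` records through the producer used by
     * the sink, and returns the count visible through the registry.
     */
    static long publishedCount(boolean shareInstance, int count) {
        StubRegistry registry = new StubRegistry();
        StubProducer registered = new StubProducer();
        registry.register("output-topic", registered);

        // The sink either reuses the registered producer or builds its own.
        StubProducer sinkProducer = shareInstance ? registered : new StubProducer();
        for (int i = 0; i < count; i++) {
            sinkProducer.send("record-" + i);
        }
        return registry.recordsSent("output-topic");
    }

    public static void main(String[] args) {
        System.out.println("shared instance:   " + publishedCount(true, 3));
        System.out.println("separate instance: " + publishedCount(false, 3));
    }
}
```

In this model, the registry only sees traffic when the sink uses the registered instance. That is presumably why the sink above is created with the producer that was registered, rather than being left to create its own from the settings.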
For consumer metrics, Reactive Kafka exposes these on the Consumer.Control instance that is returned as the materialized value. After the stream is materialized, you will have a handle to this control, which can be registered directly with the topology client.
// create your stream as normal, and retain a reference to the materialized value
Consumer.Control control = Consumer.plainSource(consumerSettings, Subscriptions.topics(inputTopic))
    ...<stream-flow>...
    .toMat(producer, Keep.left())
    .run(materializer);

// register the control with the topology client
client.register(topology.getAppName(), inputTopic, new AkkaStreamsKafkaMetricBuilder(control));
Note: When you create the topology instance, set the app type to AppType.AkkaStreams so that the UI renders with the correct visualization.