Topology

The topology view is a powerful feature of Lenses: all Connectors and SQL-based KStream applications are depicted automatically. Starting with Lenses 2.1, you can also bring your own consumer and producer applications into the topologies. All you have to do is use the Apache 2.0 licensed topology-client, which provides a simple builder for topologies; the resulting topology is pushed to a Kafka topic and picked up by Lenses at runtime. In addition, Lenses supports metrics for Kafka Streams, Akka Streams and Spark Structured Streaming.

[Image: topology-apps.png - application topologies as rendered in Lenses]

This guide covers installation of the client, building your topology, and publishing metrics for each of the three streaming libraries.

Installation

The topology client is available on Maven Central. You will require at least the core builder, which is used for submitting topologies. Users of Maven can use the following coordinates:

<dependency>
    <groupId>com.landoop</groupId>
    <artifactId>lenses-topology-client-core</artifactId>
    <version>1.0.0</version>
</dependency>

Users of Gradle can use:

compile 'com.landoop:lenses-topology-client-core:1.0.0'

In addition, if you plan to publish metrics you will need an extra module containing the required implementations. Those modules are covered in the following sections.

Building a Topology

Topologies are represented as a series of nodes. The series begins with one or more Kafka topics and optionally ends with an output topic. In between sit nodes such as count, group, and so on, which represent the operations that your streaming library performs. Each node has a name, which uniquely identifies it, and a type, which informs Lenses of the purpose of that node - a topic, a transformation step, a filter, and so on. In the case of topic nodes, the name of the node should be the name of the topic.

Nodes also need to declare any parent nodes, so that the Lenses UI knows how to render the relationships between the nodes. For example, a node a that reads from a topic b would need to declare the topic node b as its parent.

For example, consider the classic word count application - the original poster child for Hadoop - reworked for a streaming platform: we would read lines of data; split each line into words; group common words together; count each group of words; then publish the (word, count) tuples to an output topic.

Here is the full Java code of this five-node “word count app” topology, with an input topic, a split operation, a groupby operation, a count operation, and finally the output topic. The builder entry method is start and we complete with build.

Topology topology = TopologyBuilder.start(AppType.KafkaStreams, "kafka-streams-wordcount")
        .withTopic(inputTopic)
        .withDescription("Raw lines of text")
        .withRepresentation(Representation.TABLE)
        .endNode()
        .withNode("split", NodeType.SPLIT)
        .withDescription("Split lines into words")
        .withRepresentation(Representation.TABLE)
        .withParent(inputTopic)
        .endNode()
        .withNode("groupby", NodeType.GROUPBY)
        .withDescription("Group by word")
        .withRepresentation(Representation.TABLE)
        .withParent("split")
        .endNode()
        .withNode("count", NodeType.COUNT)
        .withDescription("Count instances of each word")
        .withRepresentation(Representation.TABLE)
        .withParent("groupby")
        .endNode()
        .withTopic(outputTopic)
        .withParent("count")
        .withDescription("Write output to the output topic")
        .withRepresentation(Representation.TABLE)
        .endNode()
        .build();

Notice that when we first create the builder we supply the type of application we are building, as well as a name for this application. These are used for display in the UI and are useful when there are multiple topologies to be rendered.

Also, look at the first node. This does not declare a parent, which is because it is an input topic and therefore has no upstream dependencies. The rest of the nodes declare the previous node as a parent.

Submitting the Topology

To submit the topology to Lenses, we must create an instance of TopologyClient. To do this, we use the static create method on the KafkaTopologyClient class, which requires typical Kafka producer properties. This particular implementation sends topologies via Kafka topics for persistence.

Properties topologyProps = new Properties();
topologyProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
/* Optionally customise the target topics. If you do, make sure `lenses.topics.external.topology` and
`lenses.topics.external.metrics` are set accordingly in the Lenses configuration.

topologyProps.put("lenses.topics.topology", "_lenses_topology");
topologyProps.put("lenses.topics.metrics", "_lenses_topology_metrics");
*/

TopologyClient client = KafkaTopologyClient.create(topologyProps);
client.register(topology);

After submission, the Lenses UI will display the topology. In the following screenshot you can see three topologies that have been submitted, each one reading from a shared input topic but writing to an individual output topic.

And in the next screenshot you can see the focus on an individual app. Notice that each stage in the app is represented in the lower half of the screen.

To remove a topology - usually once the stream has been closed or the app terminated - we can use the delete() method on the client.

client.delete(topology);

Important

To configure the target topics for topology and metrics, set lenses.topics.external.topology and lenses.topics.external.metrics to the required topics and make sure the Lenses configuration is in sync with these values. If the values are not set, they default to __topology and __topology__metrics.
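
For example, a minimal sketch of the matching entries in the Lenses configuration, assuming the custom topic names from the commented snippet earlier:

# lenses.conf - illustrative values; they must match the topics the client publishes to
lenses.topics.external.topology = "_lenses_topology"
lenses.topics.external.metrics = "_lenses_topology_metrics"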

Kafka Streams Metrics

For more advanced applications, you may wish to display metrics in addition to the topology overlay. In the case of Kafka Streams this is as simple as providing an implementation of the KafkaClientSupplier interface when creating the KafkaStreams object. This interface is used by the streams library to instantiate instances of producers and consumers. By using a custom supplier, the topology client is able to access the metrics for each producer and consumer and dispatch these to Lenses.

Firstly, you will require this Maven module:

<dependency>
    <groupId>com.landoop</groupId>
    <artifactId>lenses-topology-client-kafka</artifactId>
    <version>1.0.0</version>
</dependency>

Then, you must create an instance of TopologyKafkaStreamsClientSupplier, passing in the topology object that describes the application, as well as the client from earlier. This instance is then provided to the KafkaStreams constructor.

For example:

KafkaStreams streams = new KafkaStreams(
        builder.build(),
        new StreamsConfig(streamProps),
        new TopologyKafkaStreamsClientSupplier(client, topology));

And that’s all that is required.
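
For context, here is a minimal sketch of how the supplier slots into the word count application. The stream flow, serdes and streamProps are illustrative assumptions - only the TopologyKafkaStreamsClientSupplier argument comes from the topology client:

Properties streamProps = new Properties();
streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-wordcount");
streamProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// an illustrative word count flow: split lines into words, group and count
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(inputTopic)
        .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
        .groupBy((key, word) -> word)
        .count()
        .toStream()
        .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

// the custom supplier gives the topology client access to producer and consumer metrics
KafkaStreams streams = new KafkaStreams(
        builder.build(),
        new StreamsConfig(streamProps),
        new TopologyKafkaStreamsClientSupplier(client, topology));
streams.start();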

Note: When you create the topology instance, set the app type to AppType.KafkaStreams so that the UI renders with the correct visualization.

Akka Stream Metrics

If you have an application written using Akka Streams, and you are also using the Reactive Kafka connector for Akka Streams, then the topology client can publish metrics. The process is slightly more involved than the one-line change required for Kafka Streams, but is still relatively simple.

Firstly, you will require the following Maven module:

<dependency>
    <groupId>com.landoop</groupId>
    <artifactId>lenses-topology-client-akka-streams-kafka_2.12</artifactId>
    <version>1.0.0</version>
</dependency>

Note: This dependency is for Scala 2.12. If you are using Scala 2.11 then replace this dependency with lenses-topology-client-akka-streams-kafka_2.11.

For metrics on a producer, you must create a KafkaProducer from the usual producer settings before it is passed to the Akka stream as a sink, and then register that producer with the topology client.

// setup your producer settings as normal
final ProducerSettings<String, String> producerSettings =
        ProducerSettings
                .create(system, new StringSerializer(), new StringSerializer())
                .withBootstrapServers("localhost:9092");

// using the producer settings, create a kafka producer
KafkaProducer<String, String> producer = producerSettings.createKafkaProducer();

// register the producer with the topology client along with the topic name you will publish to
client.register(topology.getAppName(), outputTopic, new KafkaMetricsBuilder(producer));

// create your sink as normal, except pass in the kafka producer you created earlier
Sink<ProducerRecord<String, String>, CompletionStage<Done>> sink =
        Producer.plainSink(producerSettings, producer);

For consumer metrics, Reactive Kafka exposes these on the Consumer.Control instance that is returned as the materialized value. Once the stream is materialized, you will have a handle to this control, which can be registered directly with the topology client.

// create your stream as normal, and retain a reference to the materialized value
Consumer.Control control = Consumer.plainSource(consumerSettings, Subscriptions.topics(inputTopic))
                            ...<stream-flow>...
                            .toMat(sink, Keep.left())
                            .run(materializer);

// register the control with the topology client
client.register(topology.getAppName(), inputTopic, new AkkaStreamsKafkaMetricBuilder(control));

Note: When you create the topology instance, set the app type to AppType.AkkaStreams so that the UI renders with the correct visualization.

Spark Structured Streaming Metrics

If you have an application written using Spark Structured Streaming that consumes messages from Kafka, then the topology client can publish consumer metrics using a custom Format.

You will require the following Maven module:

<dependency>
    <groupId>com.landoop</groupId>
    <artifactId>lenses-topology-client-kafka-spark</artifactId>
    <version>1.0.0</version>
</dependency>

Then you will need to make two small changes to the code that creates the streaming Dataset. The first is to use the lenses-kafka format rather than the default kafka format. This is a slightly modified version of the kafka source that retains a handle to the KafkaConsumer and publishes the metrics exposed by that consumer. The second change is to add an option that informs the metrics publisher of the topology name and the topics we are interested in.

Dataset<Row> words = spark
        .readStream()
        // the next line switches to the modified kafka format
        .format("lenses-kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        // the next line sets the topology name and topics so the metrics publisher knows which topics we are interested in
        .option("kafka.lenses.topology.description", topology.getDescription())
        .option("subscribe", inputTopic)
        .load();

And that’s all that’s required. The Dataset can now be used as you would in any other application.
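
For instance, here is a minimal sketch of a word count continuing from the Dataset above; the column expressions and console sink are standard Spark Structured Streaming constructs and are shown only for illustration:

// cast the raw Kafka value to a string and split it into words
Dataset<Row> wordCounts = words
        .selectExpr("CAST(value AS STRING) AS line")
        .select(functions.explode(functions.split(functions.col("line"), "\\s+")).alias("word"))
        .groupBy("word")
        .count();

// write the running counts to the console
StreamingQuery query = wordCounts
        .writeStream()
        .outputMode("complete")
        .format("console")
        .start();

query.awaitTermination();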

Note: When you create the topology instance, set the app type to AppType.SparkStreaming so that the UI renders with the correct visualization.