Viewing topologies is a powerful feature of Lenses: all connectors and SQL-based KStream applications are depicted automatically.
Starting with Lenses 2.1, you can also bring your own consumer and producer applications into the topology view. All you have to do is use the Apache 2.0 licensed topology client, which provides a simple builder for topologies. The resulting topology is pushed to a Kafka topic and picked up by Lenses at runtime. In addition, Lenses supports metrics for Kafka Streams, Akka Streams and Spark Structured Streaming. You can find full working examples on our GitHub <https://github.com/Landoop/lenses-topology-example>.
This guide covers client installation, building your topology, and publishing metrics for each of the three streaming libraries.
The topology client is available on Maven Central. You will need at least the core module, which is used for building and submitting topologies. Users of Maven can use the following coordinates:
```xml
<dependency>
    <groupId>com.landoop</groupId>
    <artifactId>lenses-topology-client-core</artifactId>
    <version>1.0.0</version>
</dependency>
```
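Users of Gradle can declare the same coordinates, for example `implementation 'com.landoop:lenses-topology-client-core:1.0.0'`.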
In addition, if you plan to publish metrics, you will need an extra module that contains the metrics implementation. These modules are covered in the following sections.
Building a Topology
Topologies are represented as a series of nodes. The series begins with one or more Kafka topics and optionally ends with an output topic. In between are nodes such as splits, filters and groupings, which represent the operations that your streaming library performs. Each node has a name, which uniquely identifies it, and a type, which tells Lenses the purpose of that node: a topic, a transformation step, a filter, and so on. For topic nodes, the name of the node should be the name of the topic.
Nodes also need to declare their parent nodes, so that the Lenses UI knows how to render the relationships between them. For example, a node that reads from a topic must declare that topic's node as its parent.
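The naming and parent rules above can be checked before a topology is submitted. The following is a minimal, library-independent sketch: the `SketchNode` class and the `validate` method are hypothetical stand-ins for illustration, not part of the topology client API. It reports duplicate node names and any declared parent that does not refer to an existing node.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified node model: a name, a type label and the names of its parents.
final class SketchNode {
    final String name;
    final String type;
    final List<String> parents;

    SketchNode(String name, String type, List<String> parents) {
        this.name = name;
        this.type = type;
        this.parents = parents;
    }
}

final class TopologyRules {
    // Returns a list of problems; an empty list means names and parent links are consistent.
    static List<String> validate(List<SketchNode> nodes) {
        List<String> problems = new ArrayList<>();
        Set<String> names = new HashSet<>();
        for (SketchNode node : nodes) {
            // Names must uniquely identify nodes.
            if (!names.add(node.name)) {
                problems.add("duplicate node name: " + node.name);
            }
        }
        for (SketchNode node : nodes) {
            // Every declared parent must be the name of a node in this topology.
            for (String parent : node.parents) {
                if (!names.contains(parent)) {
                    problems.add(node.name + " declares unknown parent: " + parent);
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        List<SketchNode> nodes = List.of(
                // A topic node is named after its (hypothetical) Kafka topic and has no parent.
                new SketchNode("lines", "TOPIC", List.of()),
                new SketchNode("split", "SPLIT", List.of("lines")),
                // Misspelled parent name, so this node is reported.
                new SketchNode("count", "COUNT", List.of("splt")));
        System.out.println(validate(nodes)); // [count declares unknown parent: splt]
    }
}
```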
For example, consider the classic word count application, the original poster child for Hadoop, reworked for a streaming platform. It reads lines of data, splits each line into words, groups identical words together, counts each group, and publishes the resulting (word, count) tuples to an output topic.
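To make these steps concrete, here is a minimal in-memory sketch of the same pipeline using only `java.util.stream`, with no Kafka involved. The class and method names are illustrative only; in a real application each step would be an operator in your streaming library, and each one corresponds to a node in the topology.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

final class WordCountSketch {
    // Mirrors the topology: input lines -> split -> group by word -> count -> (word, count) pairs.
    static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                // "split" step: break each line into words on whitespace
                .flatMap(line -> Arrays.stream(line.split("\\s+")))
                // drop empty tokens produced by blank lines or leading whitespace
                .filter(word -> !word.isEmpty())
                // "groupby" and "count" steps: group identical words and count each group
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> input = List.of("the quick brown fox", "the lazy dog");
        // Stand-in for the output topic: print each (word, count) pair.
        count(input).forEach((word, n) -> System.out.println(word + " " + n));
    }
}
```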
Here is the full Java code for this five-node “word count app” topology: an input topic, a split operation, a group-by operation, a count operation and, finally, the output topic. The builder's entry method is `start`, and we complete the topology with `build`.
```java
// inputTopic and outputTopic are String variables holding the names of the Kafka topics.
Topology topology = TopologyBuilder.start(AppType.KafkaStreams, "kafka-streams-wordcount")
        // Input topic node: named after the topic, with no parent.
        .withTopic(inputTopic)
            .withDescription("Raw lines of text")
            .withRepresentation(Representation.TABLE)
            .endNode()
        .withNode("split", NodeType.SPLIT)
            .withDescription("Split lines into words")
            .withRepresentation(Representation.TABLE)
            .withParent(inputTopic)
            .endNode()
        .withNode("groupby", NodeType.GROUPBY)
            .withDescription("Group by word")
            .withRepresentation(Representation.TABLE)
            .withParent("split")
            .endNode()
        .withNode("count", NodeType.COUNT)
            .withDescription("Count instances of each word")
            .withRepresentation(Representation.TABLE)
            .withParent("groupby")
            .endNode()
        // Output topic node: its parent is the final processing step.
        .withTopic(outputTopic)
            .withParent("count")
            .withDescription("Write output to the output topic")
            .withRepresentation(Representation.TABLE)
            .endNode()
        .build();
```
Notice that when we first create the builder we supply the type of application we are building, as well as a name for this application. These are used for display in the UI and are useful when there are multiple topologies to be rendered.
Also note that the first node does not declare a parent: it is an input topic and therefore has no upstream dependencies. Every other node declares the previous node as its parent.
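Because every node except the input topic names its parent, the parent declarations alone are enough to recover the flow of data. The sketch below is hypothetical and independent of the client library, and it is not necessarily how Lenses renders topologies: given a map from node name to parent names, it uses Kahn's algorithm to list the nodes so that every parent appears before its children, and it rejects cycles or parents that do not exist.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ParentOrder {
    // parents: node name -> names of its parent nodes (empty for input topics).
    // Returns the nodes ordered so that each parent precedes its children (Kahn's algorithm).
    static List<String> order(Map<String, List<String>> parents) {
        Map<String, Integer> pending = new HashMap<>();         // parents not yet emitted
        Map<String, List<String>> children = new HashMap<>();
        for (Map.Entry<String, List<String>> e : parents.entrySet()) {
            pending.put(e.getKey(), e.getValue().size());
            for (String p : e.getValue()) {
                children.computeIfAbsent(p, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        for (String node : parents.keySet()) {
            if (pending.get(node) == 0) {
                ready.add(node);                                // source nodes, e.g. input topics
            }
        }
        List<String> result = new ArrayList<>();
        while (!ready.isEmpty()) {
            String node = ready.poll();
            result.add(node);
            for (String child : children.getOrDefault(node, List.of())) {
                // A child becomes ready once all of its parents have been emitted.
                if (pending.merge(child, -1, Integer::sum) == 0) {
                    ready.add(child);
                }
            }
        }
        if (result.size() != parents.size()) {
            throw new IllegalArgumentException("cycle or unknown parent in topology");
        }
        return result;
    }

    public static void main(String[] args) {
        // Word count topology; "input" and "output" are hypothetical topic names.
        Map<String, List<String>> wordCount = new LinkedHashMap<>();
        wordCount.put("input", List.of());
        wordCount.put("split", List.of("input"));
        wordCount.put("groupby", List.of("split"));
        wordCount.put("count", List.of("groupby"));
        wordCount.put("output", List.of("count"));
        System.out.println(order(wordCount)); // [input, split, groupby, count, output]
    }
}
```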
Submitting the Topology
To submit the topology to Lenses, we must create an instance of `TopologyClient`. To do this, we use the static `create` method on the `KafkaTopologyClient` class, which takes the usual Kafka producer properties. This particular implementation sends topologies via Kafka topics for persistence.
```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties topologyProps = new Properties();
topologyProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

/*
 Optionally customise the target topics. If you do, make sure
 `lenses.topics.external.topology` and `lenses.topics.external.metrics`
 are set accordingly in the Lenses configuration.

topologyProps.put("lenses.topics.topology", "_lenses_topology");
topologyProps.put("lenses.topics.metrics", "_lenses_topology_metrics");
*/

// Create a client that publishes topologies to Kafka, then register the topology built above.
TopologyClient client = KafkaTopologyClient.create(topologyProps);
client.register(topology);
```
After submission, the Lenses UI will display the topology. In the following screenshot you can see three topologies that have been submitted, each one reading from a shared input topic but writing to an individual output topic.
The next screenshot focuses on an individual app. Notice that each stage of the app is represented in the lower half of the screen.
To remove a topology, usually once the stream has been closed or the app has terminated, use the `delete()` method on the client.
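One way to make sure a topology is removed when the application stops is to pair registration with deletion in a `try`/`finally` block. The sketch below is library-independent: `TopologyRegistry` is a hypothetical stand-in for the real `TopologyClient`, whose exact `delete()` signature is not shown in this guide, and the `Runnable` stands in for your streaming application.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the topology client; the real client's signatures may differ.
interface TopologyRegistry {
    void register(String topologyName);
    void delete(String topologyName);
}

final class TopologyLifecycle {
    // Registers the topology, runs the stream, and always removes the topology afterwards,
    // even if the stream fails with an exception.
    static void runWithTopology(TopologyRegistry registry, String topologyName, Runnable stream) {
        registry.register(topologyName);
        try {
            stream.run();
        } finally {
            registry.delete(topologyName);
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        // Recording implementation used only to show the order of calls.
        TopologyRegistry recording = new TopologyRegistry() {
            public void register(String name) { calls.add("register " + name); }
            public void delete(String name) { calls.add("delete " + name); }
        };
        runWithTopology(recording, "wordcount", () -> calls.add("stream ran"));
        System.out.println(calls); // [register wordcount, stream ran, delete wordcount]
    }
}
```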
To change the target topics for topologies and metrics, set `lenses.topics.topology` and `lenses.topics.metrics` in the client properties, and make sure the Lenses configuration keys `lenses.topics.external.topology` and `lenses.topics.external.metrics` point at the same topics. If the values are not set, they default to `__topology` and `__topology__metrics`.
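The sketch below shows how the two topic names could be resolved on the client side, assuming the client reads the `lenses.topics.topology` and `lenses.topics.metrics` keys used in the commented-out lines of the submission example and falls back to the default topics named above when they are absent. The `TopicSettings` class and its methods are hypothetical; the final check simply compares the resolved names with the values you have configured in Lenses.

```java
import java.util.Properties;

final class TopicSettings {
    // Assumed client-side keys (from the commented-out example) and the documented defaults.
    static final String TOPOLOGY_KEY = "lenses.topics.topology";
    static final String METRICS_KEY = "lenses.topics.metrics";
    static final String DEFAULT_TOPOLOGY_TOPIC = "__topology";
    static final String DEFAULT_METRICS_TOPIC = "__topology__metrics";

    static String topologyTopic(Properties props) {
        return props.getProperty(TOPOLOGY_KEY, DEFAULT_TOPOLOGY_TOPIC);
    }

    static String metricsTopic(Properties props) {
        return props.getProperty(METRICS_KEY, DEFAULT_METRICS_TOPIC);
    }

    // True when the client targets the same topics that Lenses is configured to read.
    static boolean inSyncWithLenses(Properties clientProps, String lensesTopologyTopic, String lensesMetricsTopic) {
        return topologyTopic(clientProps).equals(lensesTopologyTopic)
                && metricsTopic(clientProps).equals(lensesMetricsTopic);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty(TOPOLOGY_KEY, "_lenses_topology");
        // The metrics topic is not set, so the default is used.
        System.out.println(topologyTopic(props) + " " + metricsTopic(props)); // _lenses_topology __topology__metrics
        System.out.println(inSyncWithLenses(props, "_lenses_topology", "__topology__metrics")); // true
    }
}
```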