Microservices

Applications using new technologies and dynamic architectures are created to cope with the ever changing business and technology landscape. Because of their flexibility and maintainability, faster deliveries of business requirements are possible.

A lot of business data is produced as a sequence of events (or event stream). For example, web or mobile app interactions, IoT devices and bank transactions generate events continuously. Microservices are designed on top of an event-driven architecture, and a growing number of them are leveraging Apache Kafka for their event stream.

With these microservices powering mission-critical systems, it is important to have a good understanding of the entire data flow as well as metrics for each processing node involved. Lenses topology functionality has been built for such scenarios. The end result is full control over how the mission-critical systems are performing and a go-to place to see your entire processing landscape.

Lenses Microservices API

To appear in the topology, a microservice application needs to use the Lenses lightweight library. Its API allows developers to easily make their application visible in the topology screen. Here are the dependencies to reference:

 <dependency>
        <groupId>com.landoop</groupId>
        <artifactId>lenses-topology-client-core</artifactId>
        <version>${topology.client.version}</version>
    </dependency>
    <dependency>
        <groupId>com.landoop</groupId>
        <artifactId>lenses-topology-client-kafka</artifactId>
        <version>${topology.client.version}</version>
</dependency>

A microservice can consume and produce events at the same time, or it can only consume or it can only produce event records. All three application types are covered by the API detailed below:

  /**
 * Creates a TopologyClient which registers the application topology with Lenses and sends the metrics associated with each KafkaProducer and KafkaConsumer
 * instances provided
 *
 * @param appName           - The name of your microservice application
 * @param producerTopicsMap - A map of producers to the topics they send Kafka records to
 * @param consumerTopicsMap - A map of consumers and the topics they read message from
 * @param properties        - An instance of {@link Properties}
 * @return An instance of {@link TopologyClient} which will publish the metrics for the topology to Lenses
 * @throws IOException Throws IOException
 */
public static TopologyClient create(final String appName,
                                    final Map<KafkaProducer<?, ?>, List<String>> producerTopicsMap,
                                    final Map<KafkaConsumer<?, ?>, List<String>> consumerTopicsMap,
                                    final Properties properties)

/**
 * Creates a topology client which publishes the topology associated with a Kafka producer and the metrics for it to a Kafka topic which Lenses reads.
 *
 * @param appName    - The name of your application/microservice
 * @param producer   - The instance of the {@link KafkaProducer}
 * @param topics     - A list of topics the producer publishes records to
 * @param properties - An instance of {@link Properties} used to create the producer instance
 * @return An instance of {@link TopologyClient} which monitors the producer metrics and sends them for Lenses to pick up.
 * @throws IOException - Throws IOException
 */
public static TopologyClient fromProducer(final String appName,
                                          final KafkaProducer<?, ?> producer,
                                          final List<String> topics,
                                          final Properties properties)

/**
 * Creates a topology client which publishes the topology associated with a Kafka producer and the metrics for it to a Kafka topic
 * which Lenses reads.
 *
 * @param appName      - The name of your application/microservice
 * @param producer     - The instance of the {@link KafkaProducer}
 * @param topics       - A list of topics the producer publishes records to
 * @param properties   - An instance of {@link Properties} used to create the producer instance
 * @param keyEncoder   - Represents the format of a Kafka record key sent by producer
 * @param valueEncoder - Represents the format of a Kafka record value sent by producer
 * @return An instance of {@link TopologyClient} which monitors the producer metrics and sends them for Lenses to pick up.
 * @throws IOException - Throws IOException
 */
public static TopologyClient fromProducer(final String appName,
                                          final KafkaProducer<?, ?> producer,
                                          final List<String> topics,
                                          final Properties properties,
                                          final DecoderType keyEncoder,
                                          final DecoderType valueEncoder)

/**
 * Creates a topology client which publishes the topology associated with a Kafka producer and the metrics for it to a Kafka topic
 * which Lenses reads.
 *
 * @param appName    - The name of your application/microservice
 * @param consumer   - The instance of the {@link KafkaConsumer}
 * @param topics     - A list of topics the producer publishes records to
 * @param properties - An instance of {@link Properties} used to create the producer instance
 * @return An instance of {@link TopologyClient} which monitors the producer metrics and sends them for Lenses to pick up.
 * @throws IOException Throws IOException
 */
public static TopologyClient fromConsumer(final String appName,
                                          final KafkaConsumer<?, ?> consumer,
                                          final List<String> topics,
                                          final Properties properties) throws IOException

/**
 * Creates a topology client which publishes the topology associated with a Kafka producer and the metrics for it to a Kafka topic
 * which Lenses reads.
 *
 * @param appName      - The name of your application/microservice
 * @param consumer     - The instance of the {@link KafkaConsumer}
 * @param topics       - A list of topics the producer publishes records to
 * @param properties   - An instance of {@link Properties} used to create the producer instance
 * @param keyEncoder   - Represents the format of a Kafka record key sent by producer
 * @param valueEncoder - Represents the format of a Kafka record value sent by producer
 * @return An instance of {@link TopologyClient} which monitors the producer metrics and sends them for Lenses to pick up.
 * @throws IOException Throws IOException
 */
public static TopologyClient fromConsumer(final String appName,
                                          final KafkaConsumer<?, ?> consumer,
                                          final List<String> topics,
                                          final Properties properties,
                                          final DecoderType keyEncoder,
                                          final DecoderType valueEncoder)

Sample project

An example of microservices integrated with Lenses can be found on GitHub<https://github.com/Landoop/lenses-topology-example/tree/master/lenses-topology-example-microservice>. The end result of running the services can be seen in this screenshot taken from the Lenses Topology screen:

../../_images/lenses_topology_microservice.png

All three microservice PaymentsService, StorageAwsService and SuspiciousPaymentsService and the Kafka topics they use are rendered in the graph with their performance metrics. In the provided example, the PaymentsService processes all the incoming payments and translates each amount to the GBP currency and sends them to payments_xchg. The StorageAwsService takes the new records and stores them to S3 (this is a sample so the actual step of writing to S3 is omitted). If a payment record is suspicious, the PaymentsService will send it to suspicious_payments topic. These records will end up being processed by the SuspiciousPaymentsService; the actual implementation has been left out as well since is not the focus of this exercise.

Here is the command to build the project:

mvn clean compile assembly:single
mv target/lenses-topology-example-microservice-1.0.0-jar-with-dependencies.jar target/lenses_microservices.jar

Once the build has completed each service can be started individually. Here are the commands to do so:

#generate the data
java -cp lenses_microservices.jar  io.lenses.topology.example.microservice.PaymentsSimulatorApp PLAINTEXT://Kafka_Broker:Kafka_Broker_Port

#payments service
java -cp lenses_microservices.jar  io.lenses.topology.example.microservice.PaymentsServiceApp PLAINTEXT://Kafka_Broker:Kafka_Broker_Port

#storage service
java -cp lenses_microservices.jar  io.lenses.topology.example.microservice.S3StorageServiceApp PLAINTEXT://Kafka_Broker:Kafka_Broker_Port

#handle suspicious payments
java -cp lenses_microservices.jar  io.lenses.topology.example.microservice.SuspiciousPaymentsServiceApp PLAINTEXT://Kafka_Broker:Kafka_Broker_Port

PaymentsService is an application which consumes but also produces records. Below you can find the code utilized to make it visible in Lenses:

//https://github.com/Landoop/lenses-topology-example/blob/a82ab5fe00922d53203ef6afc31b60dc83998736/lenses-topology-example-microservice/src/main/java/io/lenses/topology/example/microservice/PaymentsService.java#L48
try (TopologyClient topologyClient = createTopology(properties, consumer, producer, suspiciousPaymentsProducer))

...

private TopologyClient createTopology(Properties properties,
                                KafkaConsumer<String, String> consumer,
                                KafkaProducer<String, String> producer,
                                KafkaProducer<String, String> suspiciousPaymentsProducer) throws IOException {

Map<KafkaProducer<?, ?>, List<String>> producersMap = new HashMap<>();
producersMap.put(producer, Collections.singletonList(convertedPaymentsTopic));
producersMap.put(suspiciousPaymentsProducer, Collections.singletonList(suspiciousPaymentsTopic));

Map<KafkaConsumer<?, ?>, List<String>> consumersMap = Collections.singletonMap(consumer, Collections.singletonList(PaymentsTopic));

return MicroserviceTopology.create("PaymentsService", producersMap, consumersMap, properties);
}

StorageAwsService service is an application which only consumes records. To integrate with Lenses, the code is less involved:

//https://github.com/Landoop/lenses-topology-example/blob/a82ab5fe00922d53203ef6afc31b60dc83998736/lenses-topology-example-microservice/src/main/java/io/lenses/topology/example/microservice/S3StorageService.java#L59
private TopologyClient createTopology(Properties properties, KafkaConsumer<String, String> consumer) throws IOException {
    return MicroserviceTopology.fromConsumer("StorageAwsService", consumer, Collections.singletonList(topic), properties);
}