Building microservices through Event Driven Architecture part 12: Produce events to Apache Kafka.
This tutorial is the 12th part of a series: Building microservices through Event Driven Architecture.
The previous part is Building microservices through Event Driven Architecture part 11: Continuous Integration.
In this tutorial, I will show how to publish events to Apache Kafka.
When a command is issued on the client side, it produces an event (e.g. PlaceOrderCommand => OrderCreatedEvent). New events are registered as uncommitted events by the aggregate root and inserted into an append-only table (the event store).
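As a minimal sketch of that flow, an aggregate root can collect the events it raises until they are appended to the event store. The names Order, Place and InMemoryEventStore are hypothetical and are not taken from the actual project; only OrderCreatedEvent comes from the example above.

```csharp
using System;
using System.Collections.Generic;

// Event named after the example in the text; its properties are assumptions.
public sealed record OrderCreatedEvent(Guid OrderId, DateTime OccurredOn);

// Hypothetical aggregate root that registers new events as uncommitted.
public sealed class Order
{
    private readonly List<object> _uncommittedEvents = new();

    public Guid Id { get; private set; }

    // Handling the command (placing an order) raises an event.
    public void Place(Guid orderId)
    {
        Id = orderId;
        _uncommittedEvents.Add(new OrderCreatedEvent(orderId, DateTime.UtcNow));
    }

    public IReadOnlyList<object> GetUncommittedEvents() => _uncommittedEvents.AsReadOnly();

    // Called once the events have been appended to the event store.
    public void ClearUncommittedEvents() => _uncommittedEvents.Clear();
}

// Hypothetical in-memory stand-in for the append-only event store table.
public sealed class InMemoryEventStore
{
    private readonly List<object> _events = new();

    public int Count => _events.Count;

    // Append only: existing entries are never updated or removed.
    public void Append(IEnumerable<object> events) => _events.AddRange(events);
}
```

A caller would place the order, pass GetUncommittedEvents() to Append, and then call ClearUncommittedEvents().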
Now I must publish these events to a service bus so that applications subscribed to the service bus can pick up the events and process them.
In the next steps, I will add a consumer that picks up the events and indexes them into a high-performance NoSQL database, which the query side of my application will use as its backend database.
Introduction to Apache KAFKA
Apache Kafka is a community distributed event streaming platform capable of handling trillions of events a day. Initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. Since being created and open sourced by LinkedIn in 2011, Kafka has quickly evolved from messaging queue to a full-fledged event streaming platform.
Perform advanced streaming data transformations with Apache Spark and Kafka in Azure HDInsight
Installation
Installation of Java SE Development Kit
Go to the following URL, then download and install Java: https://www.oracle.com/fr/java/technologies/javase/javase-jdk8-downloads.html
Installation of Apache Kafka
Go to the following URL, then download and install Kafka: https://kafka.apache.org/downloads
Choose the latest stable release. In my case, I chose the Scala 2.13 build, kafka_2.13-2.6.0.
On the next screen, I chose the suggested mirror to download the binaries.
Download and extract the .tgz archive to an installation folder (in my case, the C:\KAFKADEMO folder on my workstation).
You should have the following on Windows
To verify that the installation works, go to C:\KAFKADEMO\kafka_2.13-2.6.0\bin\windows and run the following command:
kafka-topics.bat
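Add environment variables
This step is optional: you can edit your environment variables and add the bin\windows folder of your Kafka installation to the PATH, so that the .bat commands used below can be run from the installation folder.
In the Kafka installation folder, add a working_dir folder with two subfolders, zookeeper-data and kafka-data, as in the following image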
Start Zookeeper
To configure ZooKeeper, edit the zookeeper.properties file and update the dataDir directory as follows.
Edit C:\KAFKADEMO\kafka_2.13-2.6.0\config\zookeeper.properties
dataDir=C:/KAFKADEMO/kafka_2.13-2.6.0/working_dir/zookeeper-data
Run the following command to start zookeeper :
zookeeper-server-start.bat config\zookeeper.properties
Start Kafka
To configure Kafka, edit the server.properties file and update the log.dirs directory as follows.
Edit C:\KAFKADEMO\kafka_2.13-2.6.0\config\server.properties
log.dirs=C:/KAFKADEMO/kafka_2.13-2.6.0/working_dir/kafka-data
Run the following command to start kafka :
kafka-server-start.bat config\server.properties
Create topic
Run the following command to create a topic:
kafka-topics --zookeeper 127.0.0.1:2181 --topic eventstream --create --partitions 3 --replication-factor 1
Run the following command to list topics:
kafka-topics --zookeeper 127.0.0.1:2181 --list
Run the following command to describe a topic:
kafka-topics --zookeeper 127.0.0.1:2181 --topic eventstream --describe
Run the following command to delete a topic:
kafka-topics --zookeeper 127.0.0.1:2181 --topic eventstream --delete
Producer
To create a producer that produces events to an Apache Kafka topic (eventstream), run the following command:
kafka-console-producer --broker-list 127.0.0.1:9092 --topic eventstream
Consumer
To start consuming events produced on the topic (eventstream), run the following command:
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic eventstream
To consume all events produced on the topic (eventstream), starting from the first event, run the following command:
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic eventstream --from-beginning
Introduction to ASP.NET Core SignalR
ASP.NET Core SignalR is an open-source library that simplifies adding real-time web functionality to apps. Real-time web functionality enables server-side code to push content to clients instantly.
Good candidates for SignalR:
- Apps that require high frequency updates from the server. Examples are gaming, social networks, voting, auction, maps, and GPS apps.
- Dashboards and monitoring apps. Examples include company dashboards, instant sales updates, or travel alerts.
- Collaborative apps. Whiteboard apps and team meeting software are examples of collaborative apps.
- Apps that require notifications. Social networks, email, chat, games, travel alerts, and many other apps use notifications.
Introduction to ASP.NET Core SignalR
Tutorial: Get started with ASP.NET Core SignalR
What is Azure SignalR Service?
Create a SignalR Hub:
To create a SignalR hub, I define the following interface so that the hub is strongly typed
Hub Interfaces
- Task OnPublish(T payload);
To get notified when a message is published to the hub
- Task OnPublish(string topic, T payload);
To get notified when a message is published to a specific topic
- Task OnSubscribe(string connectionId, string topic);
To get notified when a client joins a specific topic
- Task OnUnSubscribe(string connectionId, string topic);
To get notified when a client leaves a specific topic
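Put together, the four methods above could form a contract like the one below. In ASP.NET Core SignalR, a strongly typed hub derives from Hub<T>, where T is such a client interface. The interface name IHubNotifier<T> and the RecordingNotifier<T> class are assumptions made for this sketch; the recorder only serves to exercise the contract without a running hub.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Client contract built from the four methods listed above.
// The name IHubNotifier<T> is an assumption for this sketch.
public interface IHubNotifier<T>
{
    Task OnPublish(T payload);
    Task OnPublish(string topic, T payload);
    Task OnSubscribe(string connectionId, string topic);
    Task OnUnSubscribe(string connectionId, string topic);
}

// Hypothetical in-memory implementation that records each notification.
public sealed class RecordingNotifier<T> : IHubNotifier<T>
{
    public List<string> Log { get; } = new();

    public Task OnPublish(T payload)
    {
        Log.Add($"publish:{payload}");
        return Task.CompletedTask;
    }

    public Task OnPublish(string topic, T payload)
    {
        Log.Add($"publish:{topic}:{payload}");
        return Task.CompletedTask;
    }

    public Task OnSubscribe(string connectionId, string topic)
    {
        Log.Add($"subscribe:{connectionId}:{topic}");
        return Task.CompletedTask;
    }

    public Task OnUnSubscribe(string connectionId, string topic)
    {
        Log.Add($"unsubscribe:{connectionId}:{topic}");
        return Task.CompletedTask;
    }
}
```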
The following interface is used to subscribe and publish events
Here is the hub
ISignalRNotifier is the interface that publishes and receives messages
Publish events to SignalR hub
When a command is handled, it is stored as an event in an event store; the producer could then pick the event from the event store and publish it to a service bus. I don't want it to work like that, because I would have to track which events have already been published (isPublished = true/false) and update that flag accordingly.
So, for more flexibility, I will introduce a SignalR hub. The scenario that I will implement is:
When a command is handled, it is stored as an event in an event store and then published to a SignalR hub topic. Clients interested in that topic get notified and can then process the event. The client can be a service bus, a mobile app, a single-page application, etc.
Let us go ahead and publish events to the SignalR hub from the command side of our system.
So I have to update the handle function in the LogCorner.EduSync.Speech.Application.UseCases.EventSourcingHandler.cs file and add the following:
_publisher.PublishAsync(Topics.Speech, eventStore);
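To show where that call sits, here is a rough sketch of a handler that stores the event and then publishes it. It is not the actual EventSourcingHandler: the types IEventStoreRepository<T>, IPublisher<T>, EventSourcingHandlerSketch<T> and CallRecorder<T>, as well as the value of Topics.Speech, are assumptions for illustration. The sketch awaits the publish call so that failures surface in the handler.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical topic value; the real value of Topics.Speech is not shown in the text.
public static class Topics
{
    public const string Speech = "speech";
}

// Hypothetical abstractions for the event store and the SignalR publisher.
public interface IEventStoreRepository<T>
{
    Task AppendAsync(T eventStore);
}

public interface IPublisher<T>
{
    Task PublishAsync(string topic, T payload);
}

public sealed class EventSourcingHandlerSketch<T>
{
    private readonly IEventStoreRepository<T> _repository;
    private readonly IPublisher<T> _publisher;

    public EventSourcingHandlerSketch(IEventStoreRepository<T> repository, IPublisher<T> publisher)
    {
        _repository = repository;
        _publisher = publisher;
    }

    public async Task Handle(T eventStore)
    {
        // Persist first: if the append throws, nothing is published.
        await _repository.AppendAsync(eventStore);
        // Then notify the clients subscribed to the hub topic.
        await _publisher.PublishAsync(Topics.Speech, eventStore);
    }
}

// Hypothetical fake that records the order of the calls.
public sealed class CallRecorder<T> : IEventStoreRepository<T>, IPublisher<T>
{
    public List<string> Calls { get; } = new();

    public Task AppendAsync(T eventStore)
    {
        Calls.Add($"append:{eventStore}");
        return Task.CompletedTask;
    }

    public Task PublishAsync(string topic, T payload)
    {
        Calls.Add($"publish:{topic}:{payload}");
        return Task.CompletedTask;
    }
}
```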
Create a worker service
Let us create a worker service and add the following classes
ProducerHostedService
ProducerHostedService is a background service that hosts ProducerService
BackgroundService is a base class for implementing a long-running IHostedService: https://docs.microsoft.com/en-us/dotnet/api/microsoft.extensions.hosting.backgroundservice?view=dotnet-plat-ext-3.1&WT.mc_id=DOP-MVP-5003013
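A hosted service of this kind could look roughly like the sketch below, which assumes the Microsoft.Extensions.Hosting package is referenced. IProducerService, its DoWorkAsync method, and the class names are hypothetical; only BackgroundService and its ExecuteAsync override come from the framework.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

// Hypothetical contract for the service hosted in the background.
public interface IProducerService
{
    Task DoWorkAsync(CancellationToken stoppingToken);
}

// Sketch of a hosted service that delegates its long-running work to IProducerService.
public sealed class ProducerHostedServiceSketch : BackgroundService
{
    private readonly IProducerService _producerService;

    public ProducerHostedServiceSketch(IProducerService producerService)
    {
        _producerService = producerService;
    }

    // Invoked when the host starts the service; the token is signalled on shutdown.
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        return _producerService.DoWorkAsync(stoppingToken);
    }
}

// Hypothetical fake used to check that the hosted service starts the work.
public sealed class FakeProducerService : IProducerService
{
    public bool Started { get; private set; }

    public Task DoWorkAsync(CancellationToken stoppingToken)
    {
        Started = true;
        return Task.CompletedTask;
    }
}
```

In a worker project, such a class is typically registered with services.AddHostedService<ProducerHostedServiceSketch>() so that the host starts and stops it.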
ProducerService
ProducerService subscribes to a SignalR topic and handles events published on that topic.
It uses IServiceBus to send received events to a service bus topic
ServiceBus
ServiceBus uses the IServiceBusProvider interface to send messages to a service bus provider, so that I can switch to another provider (e.g. RabbitMQ) without changing the ServiceBus implementation.
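The provider abstraction can be illustrated with the following sketch. The SendAsync signatures, the JSON serialization step and the InMemoryProvider class are assumptions, since the real interface members are not shown here; a Kafka or RabbitMQ provider would implement the same interface.

```csharp
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;

// Assumed shape of the provider contract: send a raw message to a topic.
public interface IServiceBusProvider
{
    Task SendAsync(string topic, string message);
}

// Sketch of a service bus that serializes events and delegates delivery to the provider.
public sealed class ServiceBusSketch
{
    private readonly IServiceBusProvider _provider;

    public ServiceBusSketch(IServiceBusProvider provider)
    {
        _provider = provider;
    }

    public Task SendAsync<T>(string topic, T @event)
    {
        // Serialize to JSON so that any provider only deals with strings.
        string message = JsonSerializer.Serialize(@event);
        return _provider.SendAsync(topic, message);
    }
}

// Hypothetical provider that keeps messages in memory instead of sending them.
public sealed class InMemoryProvider : IServiceBusProvider
{
    public List<(string Topic, string Message)> Sent { get; } = new();

    public Task SendAsync(string topic, string message)
    {
        Sent.Add((topic, message));
        return Task.CompletedTask;
    }
}
```

Because ServiceBusSketch depends only on the interface, swapping the provider is a change in dependency registration rather than in the bus itself.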
KafkaClient
KafkaClient sends messages to Kafka using Confluent.Kafka (https://www.nuget.org/packages/Confluent.Kafka/)
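For illustration, a minimal producer built on Confluent.Kafka might look like the sketch below. It is not the actual KafkaClient: the class name KafkaClientSketch, the BuildConfig helper and the choice of a null key with a string value are assumptions. It expects the Confluent.Kafka package and a broker reachable at the given address (127.0.0.1:9092 in this tutorial).

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical wrapper around a Confluent.Kafka producer (no key, string value).
public sealed class KafkaClientSketch : IDisposable
{
    private readonly IProducer<Null, string> _producer;

    public KafkaClientSketch(string bootstrapServers)
    {
        _producer = new ProducerBuilder<Null, string>(BuildConfig(bootstrapServers)).Build();
    }

    // Builds the producer configuration; only the broker address is set here.
    public static ProducerConfig BuildConfig(string bootstrapServers)
    {
        return new ProducerConfig { BootstrapServers = bootstrapServers };
    }

    // Sends one message and returns where the broker stored it (topic, partition, offset).
    public async Task<string> SendAsync(string topic, string message)
    {
        DeliveryResult<Null, string> result =
            await _producer.ProduceAsync(topic, new Message<Null, string> { Value = message });
        return result.TopicPartitionOffset.ToString();
    }

    public void Dispose()
    {
        // Wait up to 10 seconds for any in-flight messages before releasing the client.
        _producer.Flush(TimeSpan.FromSeconds(10));
        _producer.Dispose();
    }
}
```

With the broker from this tutorial running, a call such as SendAsync("eventstream", "hello") should make the message appear in the console consumer started earlier.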
Testing
Start zookeeper
zookeeper-server-start.bat config\zookeeper.properties
Start Kafka
kafka-server-start.bat config\server.properties
Start consuming
kafka-topics --zookeeper 127.0.0.1:2181 --topic eventstream --create --partitions 3 --replication-factor 1
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic eventstream
Start the following projects :
- LogCorner.EduSync.SignalR.Server
- LogCorner.EduSync.Speech.Producer
Start the following project :
- LogCorner.EduSync.Speech.Presentation
Start Postman and post a new command
You should see the following output on the consumer console, which consumes the command posted from Postman
Source code is available here:
- https://github.com/logcorner/LogCorner.EduSync.Speech.Command/tree/Feature/Task/AddSignalR
- https://github.com/logcorner/LogCorner.EduSync.Speech.ServiceBus/tree/ProduceMessagesTokafka
Thanks for reading. If you have any feedback, feel free to post it.
Regards