Building microservices through Event Driven Architecture part 13: Consume events from Apache Kafka and project streams into Elasticsearch.
Building microservices through Event Driven Architecture part 13: Read model projection.
This tutorial is the 13th part of a series: Building microservices through Event Driven Architecture.
The previous part is Building microservices through Event Driven Architecture part 12: Produce events to Apache Kafka.
In this tutorial, I will show how to read event streams from Kafka and project them into Elasticsearch.
The messages that I consume from Kafka are event streams, so I have to project these streams into a structured representation and then index the projections into Elasticsearch.
To do that, I will build a consumer that subscribes to Kafka and listens for events. When it receives an event, it uses projections to create a structured representation of the event, and finally stores it in the NoSQL database Elasticsearch.
Projecting events
In practice, rebuilding the state by reading thousands of events on every query takes too long. Instead, we can precalculate the current state and store it in a NoSQL database.
A projection can be defined as the current state derived from a sequence of events.
I define a generic base class Entity, so each projection derives from it.
I define an abstract generic class Projection that takes a list of events and applies them to the concrete class (SpeechProjection in our case).
SpeechProjection is the class that represents the entity whose state I want to rebuild from the events (SpeechCreatedEvent, SpeechTitleChangedEvent, SpeechDescriptionChangedEvent, SpeechUrlChangedEvent and SpeechTypeChangedEvent).
So each event related to the given entity (speech) has to be applied to that entity.
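The idea of folding events into a projection can be sketched in a few lines. This is a minimal, language-neutral illustration written in Python, not the code of the series (which lives in the repositories linked at the end). The event field names (aggregate_id, title, and so on), the when/project method names and the version counter are assumptions made for this sketch, and only three of the five events are shown for brevity.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class Entity:
    """Base class shared by every projection."""
    id: str = ""
    version: int = 0


# Event payloads: the field names below are assumptions made for this sketch.
@dataclass(frozen=True)
class SpeechCreatedEvent:
    aggregate_id: str
    title: str
    description: str
    url: str
    speech_type: str


@dataclass(frozen=True)
class SpeechTitleChangedEvent:
    aggregate_id: str
    title: str


@dataclass(frozen=True)
class SpeechDescriptionChangedEvent:
    aggregate_id: str
    description: str


class Projection(Entity):
    """Replays a list of events onto the concrete projection."""

    def when(self, event: object) -> None:
        raise NotImplementedError

    def project(self, events: Iterable[object]) -> "Projection":
        for event in events:
            self.when(event)
            self.version += 1  # assumption: one version bump per applied event
        return self


@dataclass
class SpeechProjection(Projection):
    title: str = ""
    description: str = ""
    url: str = ""
    speech_type: str = ""

    def when(self, event: object) -> None:
        # Each event type updates only the fields it is responsible for.
        if isinstance(event, SpeechCreatedEvent):
            self.id = event.aggregate_id
            self.title = event.title
            self.description = event.description
            self.url = event.url
            self.speech_type = event.speech_type
        elif isinstance(event, SpeechTitleChangedEvent):
            self.title = event.title
        elif isinstance(event, SpeechDescriptionChangedEvent):
            self.description = event.description
        else:
            raise ValueError(f"Unsupported event: {type(event).__name__}")
```

Calling SpeechProjection().project(events) with a SpeechCreatedEvent followed by a SpeechTitleChangedEvent returns a projection that holds the latest title, which is the document that would then be indexed.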
Introduction to ElasticSearch
Elasticsearch is a distributed, RESTful search and analytics engine capable of addressing a growing number of use cases. As the heart of the Elastic Stack, it centrally stores your data for lightning fast search, fine‑tuned relevancy, and powerful analytics that scale with ease. https://www.elastic.co/elasticsearch/
Go to the following link to install Elasticsearch: https://www.elastic.co/downloads/elasticsearch
You can verify that the installation is OK by running the following command:
curl http://localhost:9200/
or, with PowerShell:
Invoke-RestMethod http://localhost:9200
I create a generic repository that connects to Elasticsearch and performs CRUD operations.
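As a rough illustration of what such a repository does, here is a minimal Python sketch built on the Elasticsearch REST document endpoints (PUT, GET and DELETE on /index/_doc/id). It is not the repository of the series. The class name, the method names and the injectable transport parameter are assumptions for this sketch; the transport is injectable so the repository can be exercised without a running cluster.

```python
import json
import urllib.parse
import urllib.request
from typing import Callable, Optional

Transport = Callable[[str, str, Optional[dict]], dict]


def http_transport(method: str, url: str, body: Optional[dict]) -> dict:
    """Send one JSON request and return the decoded JSON response."""
    data = None if body is None else json.dumps(body).encode("utf-8")
    request = urllib.request.Request(
        url, data=data, method=method,
        headers={"Content-Type": "application/json"},
    )
    # Note: urlopen raises urllib.error.HTTPError on 4xx/5xx (e.g. a missing id).
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


class ElasticSearchRepository:
    """Hypothetical generic repository: one instance per index."""

    def __init__(self, index: str,
                 base_url: str = "http://localhost:9200",
                 transport: Transport = http_transport) -> None:
        self._index = index
        self._base_url = base_url.rstrip("/")
        self._transport = transport

    def _url(self, doc_id: str) -> str:
        quoted = urllib.parse.quote(doc_id, safe="")
        return f"{self._base_url}/{self._index}/_doc/{quoted}"

    def save(self, doc_id: str, document: dict) -> dict:
        # PUT creates the document or replaces an existing one with the same id.
        return self._transport("PUT", self._url(doc_id), document)

    def get(self, doc_id: str) -> dict:
        # The stored document is returned under the "_source" key.
        return self._transport("GET", self._url(doc_id), None)["_source"]

    def delete(self, doc_id: str) -> dict:
        return self._transport("DELETE", self._url(doc_id), None)
```

With the default transport, ElasticSearchRepository("speech").save("s1", {"title": "T"}) would send a PUT to http://localhost:9200/speech/_doc/s1; the index name "speech" is only an example.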
Create a worker service
The ASP.NET Core Worker Service template provides a starting point for writing long-running service apps.
We can use a worker service to build applications that do not require user interaction, or that perform periodic and long-running workloads.
I will use a Worker Service to build a consumer service that consumes events from Apache Kafka and indexes them into Elasticsearch.
ConsumerHostedService
ConsumerHostedService is a background service that hosts ConsumerService.
ConsumerService
ConsumerService calls a service bus that receives notifications from Kafka when a new event is produced.
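The relationship between the two classes can be sketched as follows: a host that owns the lifetime of a background thread, and a consumer that keeps asking the service bus for messages until it is told to stop. This is a Python illustration of the pattern only, not the Worker Service code of the series. InMemoryServiceBus is a hypothetical stand-in for the Kafka-backed bus, and the start, stop, run and receive names are assumptions.

```python
import queue
import threading
from typing import Callable, Optional


class InMemoryServiceBus:
    """Stand-in for the Kafka-backed service bus (assumption for this sketch)."""

    def __init__(self) -> None:
        self._messages: "queue.Queue[str]" = queue.Queue()

    def publish(self, message: str) -> None:
        self._messages.put(message)

    def receive(self, timeout: float) -> Optional[str]:
        # Return the next message, or None if nothing arrived in time.
        try:
            return self._messages.get(timeout=timeout)
        except queue.Empty:
            return None


class ConsumerService:
    """Polls the bus and hands every message to a callback."""

    def __init__(self, bus: InMemoryServiceBus,
                 on_message: Callable[[str], None]) -> None:
        self._bus = bus
        self._on_message = on_message

    def run(self, stop: threading.Event) -> None:
        while not stop.is_set():
            message = self._bus.receive(timeout=0.05)
            if message is not None:
                self._on_message(message)


class ConsumerHostedService:
    """Hosts the consumer on a background thread and controls its lifetime."""

    def __init__(self, consumer: ConsumerService) -> None:
        self._stop = threading.Event()
        self._thread = threading.Thread(
            target=consumer.run, args=(self._stop,), daemon=True
        )

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        # Signal the loop to exit, then wait for the thread to finish.
        self._stop.set()
        self._thread.join()
```

The short receive timeout is a design choice for the sketch: it lets the loop notice the stop signal quickly instead of blocking forever on an empty bus.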
Service Bus
KafkaClient
The KafkaClient implements ReceiveAsync of IServiceBusProvider. It subscribes to a Kafka topic, so when an event is published to that topic, it notifies a mediator service.
ElasticSearchNotifier implements INotificationHandler. The responsibility of this class is to deserialize the input event and index it into Elasticsearch.
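The flow described above (client receives a message, mediator fans it out, notifier deserializes and indexes) can be sketched as below. This is a simplified Python illustration, not the implementation of the series: the Mediator class, the synchronous receive method, the plain dictionary standing in for the Elasticsearch index and the JSON shape with an "aggregateId" key are all assumptions made for this sketch.

```python
import json
from typing import Callable, Dict, Iterable, List


class Mediator:
    """Minimal publish/subscribe mediator (hypothetical, in-process only)."""

    def __init__(self) -> None:
        self._handlers: List[Callable[[str], None]] = []

    def subscribe(self, handler: Callable[[str], None]) -> None:
        self._handlers.append(handler)

    def publish(self, notification: str) -> None:
        for handler in self._handlers:
            handler(notification)


class KafkaClient:
    """Forwards every raw message read from a topic to the mediator.

    `messages` stands in for the poll loop of a real Kafka consumer.
    """

    def __init__(self, mediator: Mediator) -> None:
        self._mediator = mediator

    def receive(self, messages: Iterable[str]) -> None:
        for raw in messages:
            self._mediator.publish(raw)


class ElasticSearchNotifier:
    """Deserializes the incoming event and indexes it by its aggregate id."""

    def __init__(self, index: Dict[str, dict]) -> None:
        self._index = index  # a dict stands in for the Elasticsearch index

    def handle(self, notification: str) -> None:
        event = json.loads(notification)
        # Assumption: the serialized event carries an "aggregateId" key.
        self._index[event["aggregateId"]] = event
```

Keeping the mediator between the client and the notifier means the Kafka side does not need to know what happens to a message after it is received; other handlers could be subscribed without touching KafkaClient.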
Testing
Start zookeeper
zookeeper-server-start.bat config\zookeeper.properties
Start Kafka
kafka-server-start.bat config\server.properties
Start ElasticSearch, then start Kibana
bin/kibana
and open Kibana at http://localhost:5601
Start the following projects:
- LogCorner.EduSync.SignalR.Server
- LogCorner.EduSync.Speech.Producer
- LogCorner.EduSync.Speech.Consumer
Then start the following project:
- LogCorner.EduSync.Speech.Presentation
Start Postman and post a new command.
You should see the following output on the consumer console, showing that it consumed the command posted from Postman.
Source code is available here:
- https://github.com/logcorner/LogCorner.EduSync.Speech.Command
- https://github.com/logcorner/LogCorner.EduSync.Speech.ServiceBus/tree/Feature/Task/IndexMessagesToElasticSearch
Thanks for reading. If you have any feedback, feel free to post it.
Regards