Messages and Events: a look into Event Driven Development
You have probably wondered how a platform like YouTube works. The full answer is complex, but one part of it is event-driven development, where programs react to data as it arrives. In this article we will learn what message queues and event streams are, then walk through a demonstration of a simple producer/consumer that illustrates how things happen at large scale on the real internet.
We will be using Apache Kafka, which is a distributed event streaming platform. This article is not about Kafka itself but about the idea behind it. I will use C# for the demo, but any programming language will do.
Message Queues:
What is a message queue? Let’s say you have an ATM with a queue of 10 people lined up to withdraw or deposit cash. Each new person joins at the end of the queue and moves forward until they reach the front. The end of the queue is called the tail and the front is called the head. This is precisely how a message queue works: messages are added at the tail and taken from the head.
A producer application produces a message containing a payload and some metadata and places it into the queue, where it stays until a consuming application picks it up. RabbitMQ is a system that does this. For example, when a user signs up on your website, you publish a message to the queue; later, another program consumes the queue, finds your message and runs the required business logic. These 2 programs may be written in different languages, one in C# and another in TypeScript.
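To make the producer/consumer flow described above concrete, here is a minimal in-memory sketch. It is not how RabbitMQ or Kafka is implemented; the QueueMessage and InMemoryQueue names are hypothetical and a plain Queue<T> stands in for the broker.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message shape: a payload plus some metadata.
public class QueueMessage
{
    public string Id { get; set; }
    public DateTime CreatedAtUtc { get; set; }
    public string Payload { get; set; }
}

// Stand-in for a real broker: a plain in-memory FIFO queue.
public class InMemoryQueue
{
    private readonly Queue<QueueMessage> _messages = new Queue<QueueMessage>();

    // Producer side: the message joins at the tail of the queue.
    public void Publish(string payload)
    {
        _messages.Enqueue(new QueueMessage
        {
            Id = Guid.NewGuid().ToString(),
            CreatedAtUtc = DateTime.UtcNow,
            Payload = payload
        });
    }

    // Consumer side: take the message at the head, or null when the queue is empty.
    // Once taken, the message is gone from the queue.
    public QueueMessage ConsumeNext()
    {
        return _messages.Count > 0 ? _messages.Dequeue() : null;
    }
}

public static class InMemoryQueueDemo
{
    public static void Main()
    {
        var queue = new InMemoryQueue();

        // The "website" publishes sign-up messages.
        queue.Publish("{\"event\":\"UserSignedUp\",\"email\":\"a@example.com\"}");
        queue.Publish("{\"event\":\"UserSignedUp\",\"email\":\"b@example.com\"}");

        // Later, a separate "worker" drains the queue in arrival order.
        QueueMessage message;
        while ((message = queue.ConsumeNext()) != null)
        {
            Console.WriteLine($"Handling {message.Id}: {message.Payload}");
        }
    }
}
```

In a real system the producer and the consumer would be separate processes talking to a broker over the network, which is what lets them be written in different languages.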
Why use queues in the first place? If a user signs up, you might as well persist their details in the database and respond to the client right away. Yes, you could do that. However, let’s say you need to send an email congratulating the user on signing up, and you don’t want to do it in the same program because it takes time to hear back from the email service, so you publish to a queue. Another example is video compression: do you want a user who uploaded a video to wait, doing nothing, until the video is compressed, or would you rather do it in the background and notify the user when the compression is done? Plenty of real-world cases exist, from the way YouTube handles video uploads to the way banks handle transactions as messages.
OK, cool: a queue takes the pressure off one program and delegates it to another, the consumer to be exact. A consumer can even limit the number of messages it processes per second and let the queue fill up until each message gets its turn, so you get throughput control. You can also have multiple producers and/or multiple consumers. Maybe you want to batch process the messages once an hour or once a day and have no consumer running the rest of the time. It is a design decision.
[Figure: an example of a queue for an airline booking system]
When a message is handled it is deleted and that is it.
A topic is like a line that people stand in. Imagine having 4 ATMs, where each ATM has its own topic (line). We can have a topic named Potato where all messages about that topic end up, and in the same way we could have one for Tomato, and so on. In some platforms a topic can publish to subtopics, but here we’re getting into the specifics of particular message queue platforms.
Some message queues have the concept of an exchange, which is like a train track that splits into many. Others have just topics. This brings us to the concept of fan-out, which basically means distribute to all, i.e. an exchange can distribute a message to all topics subscribed to it.
Keep in mind that message queues are not databases but literally queues of data. They are intended for transferring data from one system to another.
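The fan-out idea can be sketched in a few lines. This is only an in-memory illustration under my own assumptions: FanOutExchange and BindNewQueue are hypothetical names and not the API of any real broker.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical fan-out exchange: every published message is copied
// to every queue that has been bound to the exchange.
public class FanOutExchange
{
    private readonly List<Queue<string>> _boundQueues = new List<Queue<string>>();

    // Create a new queue and subscribe it to this exchange.
    public Queue<string> BindNewQueue()
    {
        var queue = new Queue<string>();
        _boundQueues.Add(queue);
        return queue;
    }

    // Distribute to all: each bound queue receives its own copy.
    public void Publish(string message)
    {
        foreach (var queue in _boundQueues)
        {
            queue.Enqueue(message);
        }
    }
}

public static class FanOutDemo
{
    public static void Main()
    {
        var exchange = new FanOutExchange();
        var emailQueue = exchange.BindNewQueue();
        var analyticsQueue = exchange.BindNewQueue();

        exchange.Publish("UserSignedUp:42");

        // Both queues hold the same message and can be consumed independently.
        Console.WriteLine($"email queue got: {emailQueue.Dequeue()}");
        Console.WriteLine($"analytics queue got: {analyticsQueue.Dequeue()}");
    }
}
```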
Event Streams:
An event stream is a message queue plus more features. It is a specialized design philosophy and a reactive paradigm: events are messages whose meaning is tied to when they occurred. Kafka is an event streaming platform.
A stream is like a conveyor belt in a factory. Each unit passes through one consumer, then the next, and the next, until it reaches the end of the supply chain, where its processing is done. One caveat: a message here stays for a certain period of time until it expires or is removed. Some messages may get rerouted back to an earlier point in the supply chain, for example when processing them failed.
Event streams are fan-out by design. Unlike message queues, data is persisted here, which allows messages to be replayed (from first to last). The mindset is different: everything in the stream is an event that occurred at some time. Unlike message queues, multiple groups of different consumers can process the same events at the same time. An example is streaming video: each viewer is a consumer, some are watching from the start of the stream, some are 2 minutes behind and some are live with the streamer.
Real-time systems typically rely heavily on streams. Uber is one example; the following article describes how they use Kafka for disaster recovery: https://eng.uber.com/kafka/.
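The difference from a queue is easiest to see in code. The sketch below is a simplified in-memory model, not Kafka itself: EventLog and ReadFrom are hypothetical names. Reading never deletes anything, so each reader chooses the position (offset) it reads from.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical append-only log: events are kept after being read.
public class EventLog
{
    private readonly List<string> _events = new List<string>();

    public int Count => _events.Count;

    public void Append(string evt) => _events.Add(evt);

    // Return every event from the given offset onwards without removing any.
    public IReadOnlyList<string> ReadFrom(int offset)
    {
        if (offset < 0 || offset > _events.Count)
            throw new ArgumentOutOfRangeException(nameof(offset));
        return _events.GetRange(offset, _events.Count - offset);
    }
}

public static class EventLogDemo
{
    public static void Main()
    {
        var log = new EventLog();
        log.Append("frame-1");
        log.Append("frame-2");
        log.Append("frame-3");

        // A viewer who just joined replays the stream from the start.
        Console.WriteLine("From start: " + string.Join(", ", log.ReadFrom(0)));

        // A viewer who is almost live only reads the newest event.
        Console.WriteLine("Near live: " + string.Join(", ", log.ReadFrom(2)));

        // Nothing was deleted by either read.
        Console.WriteLine($"Events still stored: {log.Count}");
    }
}
```

A real stream would also expire old events after a retention period, which this sketch leaves out.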
When to use Messaging Queues and Event Streams:
If you are just building a blog, don’t use any of the above unless thousands of new posts are posted per second. Messaging queues and streams introduce complexity and cost.
However, modern serverless solutions excel with queues. For example, I run a serverless narration job when I want Amazon Polly to narrate an article on my blog. This is a background job that runs only occasionally, while the post itself is written directly to the database. Queues have their uses, but you need to know when and why to use them. In my case I believe in decoupling software functions: a narration job may take 1 second or 1 minute, and there may be a network error along the way, so I handle it using Amazon Simple Queue Service (SQS).
The example we will be building:
We will build a simple school API for student login/logout that produces a message to Kafka, and a consumer that picks up that message to process it. The code is just an illustration of the idea of a message queue. All the source code of the application is available at: https://github.com/Morr0/Kafka-Tutorial-Example
I need to use Apache Zookeeper to manage Kafka. At the time of writing Kafka can’t work without it, although in the near future you will be able to run Kafka without it. Zookeeper is essentially what coordinates Kafka when we are talking about more than 1 Kafka server or partition.
Setting up Kafka:
So there are 2 choices here, either run each program separately or run them together using Docker Compose. I will run them using Docker Compose.
Setting up the docker-compose.yml file:
version: "3"
networks:
  kafka:
    driver: bridge
Here I am creating a network so the Zookeeper and Kafka containers can easily reach each other.
Next, I will add the services, which are Kafka and Zookeeper:
services:
  zookeeper:
    image: bitnami/zookeeper
    ports:
      - "2181:2181"
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"
    networks:
      - kafka
  kafka:
    depends_on:
      - zookeeper
    image: bitnami/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"
    networks:
      - kafka
Here I am declaring the containers I am using and assigning both to the network we created above. Also, depends_on makes sure the kafka container is started after Zookeeper.
Docker Compose is really flexible. If I ran the same setup from the command line alone, it would be hard to debug and see what’s going on. Here there is a template that works the same way every time.
Let’s run the orchestration unit:
docker compose up
Now wait. I will look at the logs of each container and check that no errors have come up. Sweet.
To terminate, run the following (I will leave mine running):
docker compose down
Next, I will write a basic producer/consumer in .NET 5.
The producer/consumer:
The producer is 1 project, the consumer is another and, arguably, a common domain project is a third. Since all the code targets .NET 5, I can afford to use a domain project that shares objects. More on this in the remarks section.
I will create a common domain project so the consumer can deserialize the consumed message.
dotnet new classlib -n EventStreaming.Domain
I will add the following enum:
namespace EventStreaming.Domain.Models
{
    public enum StudentRecordState : byte
    {
        LoggedIn,
        LoggedOut
    }
}
And the following model class:
namespace EventStreaming.Domain.Models
{
    public class StudentRecordUpdate
    {
        public string StudentId { get; set; }
        public StudentRecordState State { get; set; }
    }
}
Since the domain project is going to be shared, I will install the official Kafka client NuGet package in it:
dotnet add package Confluent.Kafka
That’s it for the domain project.
I will create the producer as a Web API project so I can produce messages in response to requests:
dotnet new webapi -n EventStreaming.PublicApi
Then reference the domain project:
dotnet add reference ../EventStreaming.Domain/EventStreaming.Domain.csproj
In the Startup.cs file, I will register a Kafka producer in the ConfigureServices method:
var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092"
};
services.AddSingleton(_ => new ProducerBuilder<Null, string>(config).Build());
I added it as a singleton; your use case may vary. Note the generic arguments <Null, string>, which are crucial. The client is typed by key and value, so for each different key/value type combination you should register the respective producer. Null is the type of the message’s key. A key identifies a message and lets you make better use of Kafka features such as message ordering. The Null type is part of the Kafka client; you would use string instead if you needed a key. Next is the value type, which in this case is string. Unlike the key, the value is necessary.
I will create a controller named StudentRecordController to handle the base path Record:
using System;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;
using EventStreaming.Domain.Models;
using EventStreaming.PublicApi.Dtos;
using Microsoft.AspNetCore.Mvc;

namespace EventStreaming.PublicApi.Controllers
{
    [ApiController]
    [Route("Record")]
    public class StudentRecordController : ControllerBase
    {
        private readonly IProducer<Null, string> _producer;

        public StudentRecordController(IProducer<Null, string> producer)
        {
            _producer = producer;
        }

        [HttpPut("Update")]
        public async Task<IActionResult> UpdateStudentRecord([FromBody] StudentRecordUpdateRequest request)
        {
            var studentRecordUpdate = new StudentRecordUpdate
            {
                State = Enum.Parse<StudentRecordState>(request.State),
                StudentId = request.StudentId
            };
            string serializedObj = JsonSerializer.Serialize(studentRecordUpdate);
            await _producer.ProduceAsync("Student", new Message<Null, string>
            {
                Value = serializedObj,
                Timestamp = new Timestamp(DateTime.UtcNow)
            });
            return Ok();
        }
    }
}
This handles the endpoint /Record/Update with the PUT method. Note that I am serializing the object into a JSON string. This is important: it illustrates the fact that messages carry data and not objects. That is, if the consumer is written for another platform such as Node.js, it can parse the JSON string back into an object. Obviously you are not limited to JSON; send any string you wish. Note that I left the Key property of the Kafka message unset because I declared a Null key type above. Also, look at Student: this is the Kafka topic name we chose, and Kafka will create the topic if it doesn’t exist.
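The controller reads request.StudentId and request.State, but the StudentRecordUpdateRequest DTO itself is not shown here. The sketch below is my assumption of its shape, inferred only from how the controller uses it; check the linked repository for the real class. It also adds a hypothetical validator, because Enum.Parse throws on an unknown state, and a controller could return BadRequest() when this check fails instead.

```csharp
using System;

// Copy of the article's enum so this sketch compiles on its own.
public enum StudentRecordState : byte
{
    LoggedIn,
    LoggedOut
}

// Assumed shape of the request DTO (inferred from the controller, not taken from the repository).
public class StudentRecordUpdateRequest
{
    public string StudentId { get; set; }
    public string State { get; set; }
}

// Hypothetical helper: validate the request without throwing.
public static class StudentRecordRequestValidator
{
    public static bool TryGetState(StudentRecordUpdateRequest request, out StudentRecordState state)
    {
        state = default;
        if (request == null || string.IsNullOrWhiteSpace(request.StudentId))
            return false;

        // TryParse also accepts numeric strings such as "7",
        // so IsDefined is needed to reject values outside the enum.
        return Enum.TryParse(request.State, ignoreCase: true, out state)
               && Enum.IsDefined(typeof(StudentRecordState), state);
    }
}

public static class ValidatorDemo
{
    public static void Main()
    {
        var good = new StudentRecordUpdateRequest { StudentId = "s-1", State = "LoggedOut" };
        var bad = new StudentRecordUpdateRequest { StudentId = "s-1", State = "Sleeping" };

        Console.WriteLine(StudentRecordRequestValidator.TryGetState(good, out var parsed)); // True
        Console.WriteLine(parsed);                                                          // LoggedOut
        Console.WriteLine(StudentRecordRequestValidator.TryGetState(bad, out _));           // False
    }
}
```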
That’s it for my producer.
To create the consumer, I will use a console project that keeps polling:
dotnet new console -n EventStreaming.Consumer
dotnet add reference ../EventStreaming.Domain/EventStreaming.Domain.csproj
The Main method will just be:
static void Main(string[] args)
{
    Console.WriteLine("Running consumer");
    var consumerConfig = new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = "1"
    };
    using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
    consumer.Subscribe("Student");
    while (true)
    {
        var result = consumer.Consume(5);
        if (string.IsNullOrEmpty(result?.Message?.Value)) continue;
        var obj = JsonSerializer.Deserialize<StudentRecordUpdate>(result.Message.Value);
        Console.WriteLine("Received a message");
        Console.WriteLine($"Student Id: {obj.StudentId} updated state to {obj.State.ToString()}");
    }
}
I have an infinite loop that keeps polling for messages.
Now we are all done; any consumer logic can be applied to the received messages.
Run the producer and the consumer (order doesn’t matter) by executing the following in each project:
dotnet run
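I will issue an API call to the producer and expect to see a log in the consumer’s console immediately. I am using Postman to test the API.
You can always run only the producer and start the consumer later; Kafka persists the messages in the order they were received (within a partition). Keep in mind that a consumer group with no committed offsets starts reading from the latest offset by default, so set AutoOffsetReset = AutoOffsetReset.Earliest in the ConsumerConfig if a brand-new group should also pick up messages produced before it started.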
Multiple consumers:
There is an interesting Kafka feature that I want you to know about. Note the GroupId in the ConsumerConfig above: no two consumers with the same group id will get the same message. We don’t have multiple consumers here, but you could run consumers in different groups and see that each group receives every message.
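The group behaviour can be imitated without a broker. The following is a deliberately simplified model with hypothetical names (GroupedLog, Poll): it keeps one read position per group id, whereas real Kafka divides a topic’s partitions among the members of a group. The visible effect is the same for this purpose: consumers sharing a group id split the messages, while a different group sees all of them.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical log that tracks one read position (offset) per consumer group.
public class GroupedLog
{
    private readonly List<string> _events = new List<string>();
    private readonly Dictionary<string, int> _groupOffsets = new Dictionary<string, int>();

    public void Append(string evt) => _events.Add(evt);

    // Return the next unread event for the group, or null when the group has caught up.
    public string Poll(string groupId)
    {
        // A group that has never polled starts at offset 0.
        _groupOffsets.TryGetValue(groupId, out int offset);
        if (offset >= _events.Count) return null;

        _groupOffsets[groupId] = offset + 1;
        return _events[offset];
    }
}

public static class GroupedLogDemo
{
    public static void Main()
    {
        var log = new GroupedLog();
        log.Append("e1");
        log.Append("e2");
        log.Append("e3");

        // Two consumers in group "1" share the work: no event is delivered twice.
        Console.WriteLine($"group 1, consumer A: {log.Poll("1")}");
        Console.WriteLine($"group 1, consumer B: {log.Poll("1")}");
        Console.WriteLine($"group 1, consumer A: {log.Poll("1")}");

        // A consumer in group "2" still receives every event from the start.
        string evt;
        while ((evt = log.Poll("2")) != null)
        {
            Console.WriteLine($"group 2: {evt}");
        }
    }
}
```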
Use cases of streams:
Here I reference some text from the following: https://kafka.apache.org/uses
It outlines where you find streams used:
- Messaging: chatting between friends
- Website Activity Tracking: monitor user flow through a website and which pages were popular in the last 30 minutes
- Fitness data streams: people’s heartbeats, raising an alert if a suspicious pattern occurred in the last 10 minutes
Event Sourcing:
This is a fantastic use case of streams. Imagine a bank, where every single transaction has to be stored. Event sourcing basically means storing the changes of state over time, unlike database CRUD operations, which store and read only the current state of a bank account, for example. I encourage you to read about this further at the link below: https://docs.microsoft.com/en-us/azure/architecture/patterns/event-sourcing
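As a small illustration of the bank example, the sketch below stores only the events and derives the balance by replaying them. The AccountEvent type and its event names are hypothetical and chosen just for this example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event: something that happened to the account at some point.
public class AccountEvent
{
    public string Type { get; set; }    // "Deposited" or "Withdrew"
    public decimal Amount { get; set; }
}

public static class EventSourcingDemo
{
    // The current state is never stored; it is rebuilt from the full history.
    public static decimal Replay(IEnumerable<AccountEvent> events)
    {
        decimal balance = 0m;
        foreach (var e in events)
        {
            switch (e.Type)
            {
                case "Deposited":
                    balance += e.Amount;
                    break;
                case "Withdrew":
                    balance -= e.Amount;
                    break;
                default:
                    throw new InvalidOperationException($"Unknown event type: {e.Type}");
            }
        }
        return balance;
    }

    public static void Main()
    {
        var history = new List<AccountEvent>
        {
            new AccountEvent { Type = "Deposited", Amount = 100m },
            new AccountEvent { Type = "Withdrew", Amount = 30m },
            new AccountEvent { Type = "Deposited", Amount = 50m }
        };

        // 100 - 30 + 50
        Console.WriteLine(Replay(history)); // prints 120
    }
}
```

Because the history is kept, you can also replay only part of it to answer questions such as what the balance was after the second transaction, which a CRUD table holding just the current balance cannot answer.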
Remarks on code:
Above, I chose to have a domain project shared between the producer and consumer projects. I can afford to do that because all my code is in the same framework. You may instead be designing an application that consumes from a producer you have no control over, which is common. What is even more common is transferring a shared format like JSON across the wire, so the language doesn’t matter as long as you know how to produce or use the data.
General remarks:
Development at low scale is database driven: you can afford to write to the database immediately, with no complications. Then you scale. You get new users, users that don’t pay for services, notifications and plenty of other things, and you get into queues and streams. Data ends up everywhere and a lot of it needs care. Batch processing does some jobs, but you want real-time. Not only that, you want fraud detection, intrusion detection and real-time security. This is all complex. That is where streams and message queues are appreciated, because each part mentioned in this paragraph can be decoupled and have its own system. Handling of data then depends on the responsible system and not on the entire ecosystem. In fact, the more complex your ecosystem gets, the more message queues and event streams will be needed to divide responsibilities. What was covered here is one part of big distributed systems spanning one or many geographies.
You can query real-time data as they are streamed.
I did not discuss the implementation details. Note that one Kafka server is generally not sufficient; large systems on the scale of YouTube may deploy hundreds if not thousands of them, all coordinated by Zookeeper.
If a general streaming technology like Kafka doesn’t suit you, remember you can always look at the alternatives or roll your own.
Conclusion:
Simple systems expand into more autonomous, complex systems over time, the same way a company’s management structure changes as it grows from 10 employees to 100 to 1000. Events produce reactions.
Thanks for reading this.