Monday, 19 June 2023

How to create a kafka service using docker and python?

ref: https://towardsdatascience.com/kafka-docker-python-408baf0e1088


Apache Kafka: Docker Container and examples in Python

How to install Kafka using Docker and produce/consume messages in Python

Apache Kafka is a stream-processing software platform originally developed by LinkedIn, open sourced in early 2011 and currently developed by the Apache Software Foundation. It is written in Scala and Java.

Intro to Streams by Confluent

Key Concepts of Kafka

Kafka is a distributed system that consists of servers and clients.

  • Some servers are called brokers and they form the storage layer. Other servers run Kafka Connect to import and export data as event streams to integrate Kafka with your existing system continuously.
  • On the other hand, clients allow you to create applications that read, write and process streams of events. A client could be a producer or a consumer. A producer writes (produces) events to Kafka while a consumer read and process (consumes) events from Kafka.

Servers and clients communicate via a high-performance TCP network protocol and are fully decoupled and agnostic of each other.

But what is an event? In Kafka, an event is an object that has a key, a value and a timestamp. Optionally, it could have other metadata headers. You can think an event as a record or a message.

One or more events are organized in topics: producers can write messages/events on different topics and consumers can choose to read and process events of one or more topics. In Kafka, you can configure how long events of a topic should be retained, therefore, they can be read whenever needed and are not deleted after consumption.

A consumer cosumes the stream of events of a topic at its own pace and can commit its position (called offset). When we commit the offset we set a pointer to the last record that the consumer has consumed.

From the servers side, topics are partitioned and replicated.

  • A topic is partitioned for scalability reason. Its events are spread over different Kafka brokers. This allows clients to read/write from/to many brokers at the same time.
  • For availability and fault-tolerance every topic can also be replicated. It means that multiple brokers in different datacenters could have a copy of the same data.

For a detailed explanation on how Kafka works, check its official website.

Enough introduction! Let’s see how to install Kafka in order to test our sample Python scripts!

Install Kafka using Docker

As data scientists, we usually find Kafka already installed, configured and ready to be used. For the sake of completeness, in this tutorial, let’s see how to install an instance of Kafka for testing purpose. To this purpose, we are going to use Docker Compose and Git. Please, install them on your system if they are not installed.

Kafka docker repository.
Image by author. Kafka docker repository.

In your working directory, open a terminal and clone the GitHub repository of the docker image for Apache Kafka. Then change the current directory in the repository folder.

git clone https://github.com/wurstmeister/kafka-docker.git 
cd kafka-docker/

Inside kafka-docker, create a text file named docker-compose-expose.yml with the following content (you can use your favourite text editor):

version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper:3.4.6
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
expose:
- "9093"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "topic_test:1:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock

Now, you are ready to start the Kafka cluster with:

docker-compose -f docker-compose-expose.yml up

If everything is ok, you should see logs from zookeeper and kafka.

Logs from Zookeeper and Kafka.
Image by author. Logs from Zookeeper and Kafka.

In case you want to stop it, just run

docker-compose stop

in a separate terminal session inside kafka-docker folder.

For a complete guide on Kafka docker’s connectivity, check it’s wiki.

Producer and Consumer in Python

In order to create our first producer/consumer for Kafka in Python, we need to install the Python client.

pip install kafka-python

Then, create a Python file called producer.py with the code below.

from time import sleep
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
for j in range(9999):
print("Iteration", j)
data = {'counter': j}
producer.send('topic_test', value=data)
sleep(0.5)

In the code block above:

  • we have created a KafkaProducer object that connects of our local instance of Kafka;
  • we have defined a way to serialize the data we want to send by trasforming it into a json string and then encoding it to UTF-8;
  • we send an event every 0.5 seconds with topic named “topic_test and the counter of the iteration as data. Instead of the couter, you can send anything.

Now we are ready to start the producer:

python producer.py

The script should print the number of iteration every half second.

[...]
Iteration 2219
Iteration 2220
Iteration 2221
Iteration 2222
[...]

Let’s leave the producer terminal session running and define our consumer in a separate Python file named consumer.py with the following lines of code.

from kafka import KafkaConsumer
from json import loads
from time import sleep
consumer = KafkaConsumer(
'topic_test',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group-id',
value_deserializer=lambda x: loads(x.decode('utf-8'))
)
for event in consumer:
event_data = event.value
# Do whatever you want
print(event_data)
sleep(2)

In the script above we are defining a KafkaConsumer that contacts the server “localhost:9092 ” and is subscribed to the topic “topic_test”. Since in the producer script the message is jsonfied and encoded, here we decode it by using a lambda function in value_deserializer. In addition,

  • auto_offset_reset is a parameter that sets the policy for resetting offsets on OffsetOutOfRange errors; if we set “earliest” then it will move to the oldest available message, if “latest” is set then it will move to the most recent;
  • enable_auto_commit is a boolean parameter that states whether the offset will be periodically committed in the background;
  • group_id is the name of the consumer group to join.

In the loop we print the content of the event consumed every 2 seconds. Instead of printing, we can perfom any task like writing it to a database or performing some real time analysis.

At this point, if we run

python consumer.py

we should receive as output something like:

{'counter': 0}
{'counter': 1}
{'counter': 2}
{'counter': 3}
{'counter': 4}
{'counter': 5}
{'counter': 6}
[...]

The complete documentation of parameters of Python producer/consumer classes can be found here.

Now you are ready to use Kafka in Python!

No comments:

Post a Comment