Tuesday, 6 December 2022

How to use Kafka to send and receive image using python and localhost?

Prerequisites:-

https://hevodata.com/blog/how-to-install-kafka-on-ubuntu/


pip install kafka-python


Producer code

import time
import random
from datetime import datetime
from kafka import KafkaProducer
import cv2
import json
import time
import numpy as np

ktf_host = "localhost:9092"
# Kafka Producer
producer = KafkaProducer(
bootstrap_servers=[ktf_host],
api_version=(0,10,1)
)

if __name__ == '__main__':
# Infinite loop - runs until you kill the program
image = cv2.imread("small.png")
print("image.shape: ", image.shape)
ret, buffer = cv2.imencode('.jpg', image)
while True:
# Send it to our 'messages' topic
print(f'Producing image @ {image.shape}')
t1 = time.time()
jtmp2 = {"data2": "test2"}
jdata = b"!@#$".join([buffer.tobytes(), json.dumps(jtmp2).encode("utf-8")])
producer.send('messages', jdata)
jdata = ""
t2 = time.time()
print("elapsed time (ms): ", (t2-t1)*1000)
# Sleep for a random number of seconds
time_to_sleep = random.randint(1, 3)
time.sleep(time_to_sleep)

consumer code

from io import BytesIO
from PIL import Image
import numpy as np
import cv2
import time
from kafka import KafkaConsumer
import json

if __name__ == '__main__':
# Kafka Consumer
ktf_host = "localhost:9092"
consumer = KafkaConsumer(
'messages',
bootstrap_servers=[ktf_host],
api_version=(0,10,1)
)
for message in consumer:
begin = time.time()
strs = message.value.split(b'!@#$')
jdata = json.loads(strs[1])
print(jdata)
end = time.time()
print("elapsed time (ms): ", (end - begin)*1000)

idata = BytesIO(strs[0])
pil_image = Image.open(idata).convert("RGB")
np_image = np.array(pil_image)
bgr_image = cv2.cvtColor(np_image, cv2.COLOR_RGB2BGR)
rs_image = cv2.resize(bgr_image, (640,640))
print(rs_image.shape)
#cv2.imshow("image", rs_image)
#cv2.waitKey(3)



spawn code:-

import time
import random
from datetime import datetime
from kafka import KafkaProducer
import cv2
import json
import time
import numpy as np
from multiprocessing import Process

ktf_host = "localhost:9092"


def run(camid):
# Kafka Producer
producer = KafkaProducer(
bootstrap_servers=[ktf_host],
api_version=(0,10,1),
buffer_memory=320000000
)
image = cv2.imread("birds.png")
print("image.shape: ", image.shape)
ret, buffer = cv2.imencode('.jpg', image)
while True:
# Send it to our 'messages' topic
# print(f'Producing image @ {image.shape}, {camid}')
t1 = time.time()
jtmp2 = {"data2": camid, "time": str(time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(time.time())))}
print(jtmp2)
jdata = b"!@#$".join([buffer.tobytes(), json.dumps(jtmp2).encode("utf-8")])
producer.send('messages', jdata)
jdata = ""
t2 = time.time()
# print("elapsed time (ms): ", (t2-t1)*1000)
# Sleep for a random number of seconds
time_to_sleep = random.randint(1, 3)
time.sleep(time_to_sleep)

if __name__ == '__main__':
# Infinite loop - runs until you kill the program
# instantiating process with arguments
procs = []
for camid in range(30):
# print(name)
proc = Process(target=run, args=(camid,))
procs.append(proc)
proc.start()

# complete the processes
for proc in procs:
proc.join()

No comments:

Post a Comment