Prerequisites:-
https://hevodata.com/blog/how-to-install-kafka-on-ubuntu/
pip install kafka-python
Producer code
import time import random from datetime import datetimefrom kafka import KafkaProducerimport cv2import jsonimport timeimport numpy as np
ktf_host = "localhost:9092"# Kafka Producerproducer = KafkaProducer( bootstrap_servers=[ktf_host], api_version=(0,10,1))
if __name__ == '__main__': # Infinite loop - runs until you kill the program image = cv2.imread("small.png") print("image.shape: ", image.shape) ret, buffer = cv2.imencode('.jpg', image) while True: # Send it to our 'messages' topic print(f'Producing image @ {image.shape}') t1 = time.time() jtmp2 = {"data2": "test2"} jdata = b"!@#$".join([buffer.tobytes(), json.dumps(jtmp2).encode("utf-8")]) producer.send('messages', jdata) jdata = "" t2 = time.time() print("elapsed time (ms): ", (t2-t1)*1000) # Sleep for a random number of seconds time_to_sleep = random.randint(1, 3) time.sleep(time_to_sleep)
consumer code
from io import BytesIOfrom PIL import Imageimport numpy as npimport cv2import timefrom kafka import KafkaConsumerimport json
if __name__ == '__main__': # Kafka Consumer ktf_host = "localhost:9092" consumer = KafkaConsumer( 'messages', bootstrap_servers=[ktf_host], api_version=(0,10,1) ) for message in consumer: begin = time.time() strs = message.value.split(b'!@#$') jdata = json.loads(strs[1]) print(jdata) end = time.time() print("elapsed time (ms): ", (end - begin)*1000)
idata = BytesIO(strs[0]) pil_image = Image.open(idata).convert("RGB") np_image = np.array(pil_image) bgr_image = cv2.cvtColor(np_image, cv2.COLOR_RGB2BGR) rs_image = cv2.resize(bgr_image, (640,640)) print(rs_image.shape) #cv2.imshow("image", rs_image) #cv2.waitKey(3)
spawn code:-
import time
import random
from datetime import datetime
from kafka import KafkaProducer
import cv2
import json
import time
import numpy as np
from multiprocessing import Process
ktf_host = "localhost:9092"
def run(camid):
# Kafka Producer
producer = KafkaProducer(
bootstrap_servers=[ktf_host],
api_version=(0,10,1),
buffer_memory=320000000
)
image = cv2.imread("birds.png")
print("image.shape: ", image.shape)
ret, buffer = cv2.imencode('.jpg', image)
while True:
# Send it to our 'messages' topic
# print(f'Producing image @ {image.shape}, {camid}')
t1 = time.time()
jtmp2 = {"data2": camid, "time": str(time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(time.time())))}
print(jtmp2)
jdata = b"!@#$".join([buffer.tobytes(), json.dumps(jtmp2).encode("utf-8")])
producer.send('messages', jdata)
jdata = ""
t2 = time.time()
# print("elapsed time (ms): ", (t2-t1)*1000)
# Sleep for a random number of seconds
time_to_sleep = random.randint(1, 3)
time.sleep(time_to_sleep)
if __name__ == '__main__':
# Infinite loop - runs until you kill the program
# instantiating process with arguments
procs = []
for camid in range(30):
# print(name)
proc = Process(target=run, args=(camid,))
procs.append(proc)
proc.start()
# complete the processes
for proc in procs:
proc.join()
No comments:
Post a Comment