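Prerequisites:
- Install and start Kafka: https://hevodata.com/blog/how-to-install-kafka-on-ubuntu/
- pip install kafka-python
- The examples below also import cv2, numpy and PIL: pip install opencv-python numpy pillow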
Producer code:
import json
import random
import time
from datetime import datetime

import cv2
import numpy as np
from kafka import KafkaProducer

# Kafka bootstrap server address ("host:port").
ktf_host = "localhost:9092"

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=[ktf_host],
    api_version=(0, 10, 1),
)


def _report_send_error(exc):
    """Print a delivery failure reported asynchronously by the producer."""
    print("failed to deliver message: ", exc)


if __name__ == '__main__':
    image = cv2.imread("small.png")
    # cv2.imread does not raise on a missing/unreadable file; it returns None.
    if image is None:
        raise FileNotFoundError("could not read image file: small.png")
    print("image.shape: ", image.shape)

    ret, buffer = cv2.imencode('.jpg', image)
    if not ret:
        raise RuntimeError("could not JPEG-encode small.png")
    # The encoded image never changes, so convert it to bytes once
    # instead of on every loop iteration.
    image_bytes = buffer.tobytes()

    try:
        # Infinite loop - runs until you kill the program
        while True:
            print(f'Producing image @ {image.shape}')
            t1 = time.time()
            jtmp2 = {"data2": "test2"}
            # Payload layout: <jpeg bytes> + b"!@#$" + <utf-8 JSON>.
            # The JSON part is last, so a reader can recover it by splitting
            # on the final occurrence of the delimiter.
            jdata = b"!@#$".join([image_bytes, json.dumps(jtmp2).encode("utf-8")])
            # send() is asynchronous; without an errback a failed delivery
            # would go unnoticed.
            producer.send('messages', jdata).add_errback(_report_send_error)
            t2 = time.time()
            print("elapsed time (ms): ", (t2 - t1) * 1000)

            # Sleep for a random number of seconds
            time_to_sleep = random.randint(1, 3)
            time.sleep(time_to_sleep)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop this script.
        pass
    finally:
        # Give buffered records up to 10 s to be sent before shutting down.
        producer.close(timeout=10)
Consumer code:
from io import BytesIO
import json
import time

import cv2
import numpy as np
from kafka import KafkaConsumer
from PIL import Image

if __name__ == '__main__':
    # Kafka Consumer
    ktf_host = "localhost:9092"
    consumer = KafkaConsumer(
        'messages',
        bootstrap_servers=[ktf_host],
        api_version=(0, 10, 1),
    )
    for message in consumer:
        begin = time.time()
        payload = message.value
        # Each message is <jpeg bytes> + b'!@#$' + <utf-8 JSON>.  Split once
        # from the right: the binary JPEG data may itself contain the
        # delimiter bytes, in which case a plain split() would hand a piece
        # of the image to json.loads.
        parts = payload.rsplit(b'!@#$', 1) if payload is not None else []
        if len(parts) != 2:
            print("skipping message without delimiter, offset: ", message.offset)
            continue
        image_bytes, meta_bytes = parts

        try:
            jdata = json.loads(meta_bytes)
        except ValueError:
            # Covers both invalid JSON and invalid UTF-8 in the metadata.
            print("skipping message with invalid metadata, offset: ", message.offset)
            continue
        print(jdata)
        end = time.time()
        print("elapsed time (ms): ", (end - begin) * 1000)

        try:
            pil_image = Image.open(BytesIO(image_bytes)).convert("RGB")
        except OSError:
            # PIL raises an OSError subclass when the bytes are not a
            # decodable image; skip the message instead of stopping the loop.
            print("skipping message with undecodable image, offset: ", message.offset)
            continue
        np_image = np.array(pil_image)
        # PIL yields RGB channel order; OpenCV functions expect BGR.
        bgr_image = cv2.cvtColor(np_image, cv2.COLOR_RGB2BGR)
        rs_image = cv2.resize(bgr_image, (640, 640))
        print(rs_image.shape)
        # Uncomment to preview each frame in a window:
        #cv2.imshow("image", rs_image)
        #cv2.waitKey(3)
Spawn code (one producer process per camera):
import time 
import random 
from datetime import datetime
from kafka import KafkaProducer
import cv2
import json
import time
import numpy as np
from multiprocessing import Process
# Kafka bootstrap server address ("host:port"); run() passes it to
# KafkaProducer as its only bootstrap server.
ktf_host = "localhost:9092"
def _report_send_error(exc):
    """Print a delivery failure reported asynchronously by the producer."""
    print("failed to deliver message: ", exc)


def run(camid, image_path="birds.png", topic="messages"):
    """Publish the same JPEG-encoded image to Kafka forever, tagged with camid.

    Each message is ``<jpeg bytes> + b"!@#$" + <utf-8 JSON>`` where the JSON
    object holds ``data2`` (the camera id) and ``time`` (current UTC time as
    ``YYYY-MM-DD HH:MM:SS``).  Between messages the function sleeps for a
    random 1-3 seconds.

    Args:
        camid: Identifier embedded in every message's metadata; must be
            JSON-serializable.
        image_path: Image file to load and send.
        topic: Kafka topic to publish to.

    Raises:
        FileNotFoundError: If the image file cannot be read.
        RuntimeError: If the image cannot be JPEG-encoded.
    """
    # Load and validate the image before opening a Kafka connection, so a bad
    # path fails fast without leaving a producer behind.
    image = cv2.imread(image_path)
    # cv2.imread does not raise on a missing/unreadable file; it returns None.
    if image is None:
        raise FileNotFoundError(f"could not read image file: {image_path}")
    print("image.shape: ", image.shape)

    ret, buffer = cv2.imencode('.jpg', image)
    if not ret:
        raise RuntimeError(f"could not JPEG-encode {image_path}")
    # The encoded image never changes, so convert it to bytes once
    # instead of on every loop iteration.
    image_bytes = buffer.tobytes()

    # Kafka Producer
    producer = KafkaProducer(
        bootstrap_servers=[ktf_host],
        api_version=(0, 10, 1),
        buffer_memory=320000000,
    )
    try:
        while True:
            jtmp2 = {
                "data2": camid,
                "time": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()),
            }
            print(jtmp2)
            # The JSON part goes last so a reader can recover it by splitting
            # on the final occurrence of the delimiter.
            jdata = b"!@#$".join([image_bytes, json.dumps(jtmp2).encode("utf-8")])
            # send() is asynchronous; without an errback a failed delivery
            # would go unnoticed.
            producer.send(topic, jdata).add_errback(_report_send_error)

            # Sleep for a random number of seconds
            time.sleep(random.randint(1, 3))
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop a producer.
        pass
    finally:
        # Give buffered records up to 10 s to be sent before shutting down.
        producer.close(timeout=10)
if __name__ == '__main__':
    # Build one producer process per camera id (0 through 29); each one
    # executes run(camid) and loops until the program is killed.
    workers = [Process(target=run, args=(cam_index,)) for cam_index in range(30)]

    # Launch them all...
    for worker in workers:
        worker.start()

    # ...then block until every one of them has exited.
    for worker in workers:
        worker.join()
 
No comments:
Post a Comment