Parallel execution in Python (process rabbitmq's messages in parallel)
I have a heavy function. Execution this func takes up a lot of computing resources (it's StableDiffusion model from keras framework, but i don't think it matters much). Also, I have message broker sending data for this function. I want to execute function with every message from broker in a separate process.
Multithreading and asyncio can't provide real parallelism in python. I tried multiprocessing, but programm still works sequentially.
Here is example using multiprocessing:
That heavy function:
def generate_image(message, id):
model = StableDiffusion(img_height=512, img_width=512, jit_compile=True)
img = model.text_to_image(
prompt=message,
batch_size=1, # How many images to generate at once
num_steps=10, # Number of iterations (controls image quality)
seed=1539, # Set this to always get the same image from the same prompt
)
image_name = 'images/' + id + '.png'
Image.fromarray(img[0]).save(image_name)
Function to read RabbitMQ (take it from aio_pika doc: https://aio-pika.readthedocs.io/en/latest/quick-start.html#simple-consumer)
async def main() -> None:
connection = await aio_pika.connect_robust("amqp://guest:guest@127.0.0.1/",)
queue_name = "requestQueue"
async with connection:
# Creating channel
channel = await connection.channel()
# Will take no more than 10 messages in advance
await channel.set_qos(prefetch_count=10)
# Declaring queue
queue = await channel.declare_queue(queue_name, auto_delete=False, durable=True)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
json_str = json.loads(message.body)
generate_image(json_str.get('message'), json_str.get('id'))
And my attempt to run the processing process in parallel:
def between_func():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(main())
loop.close()
if __name__ == "__main__":
t1 = Process(target=between_func())
t2 = Process(target=between_func())
t1.start()
t2.start()
How to execute these functions in parallel using multithreading? Or maybe using another instruments?
Comments
Post a Comment