2023-04-19

Parallel execution in Python (processing RabbitMQ messages in parallel)

I have a heavy function. Executing it takes a lot of computing resources (it is a StableDiffusion model from the Keras framework, but I don't think that matters much). I also have a message broker that sends data for this function. I want to run the function for every message from the broker in a separate process.
Multithreading and asyncio can't provide real parallelism for CPU-bound work in Python (because of the GIL). I tried multiprocessing, but the program still runs sequentially.

Here is an example using multiprocessing:

That heavy function:

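from keras_cv.models import StableDiffusion  # assumption: the KerasCV implementation
from PIL import Image


def generate_image(message, id):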
    model = StableDiffusion(img_height=512, img_width=512, jit_compile=True)
    img = model.text_to_image(
        prompt=message,
        batch_size=1,  # How many images to generate at once
        num_steps=10,  # Number of iterations (controls image quality)
        seed=1539,  # Set this to always get the same image from the same prompt
    )
    image_name = 'images/' + id + '.png'
    Image.fromarray(img[0]).save(image_name)

Function to read from RabbitMQ (taken from the aio_pika docs: https://aio-pika.readthedocs.io/en/latest/quick-start.html#simple-consumer):

import asyncio
import json

import aio_pika


async def main() -> None:
    connection = await aio_pika.connect_robust("amqp://guest:guest@127.0.0.1/")
    queue_name = "requestQueue"

    async with connection:
        # Creating channel
        channel = await connection.channel()
        # Will take no more than 10 messages in advance
        await channel.set_qos(prefetch_count=10)
        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=False, durable=True)
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    json_str = json.loads(message.body)
                    generate_image(json_str.get('message'), json_str.get('id'))
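
A side note on the consumer above: `generate_image` is an ordinary blocking call, so while it runs the event loop cannot pick up the next message. One possible way around that, sketched under assumptions, is to hand the call to a process pool with `loop.run_in_executor`. In this sketch `heavy_task` is a hypothetical stand-in for `generate_image`, `handle_bodies` is a hypothetical helper, and the message bodies are a plain list instead of a real RabbitMQ queue.

```python
import asyncio
import json
import os
from concurrent.futures import Executor, ProcessPoolExecutor
from typing import Optional


def heavy_task(prompt: str, image_id: str) -> tuple[str, int]:
    # Hypothetical stand-in for generate_image(): returns the output path
    # and the pid of the process that handled the call.
    return f"images/{image_id}.png", os.getpid()


async def handle_bodies(bodies: list[bytes], pool: Optional[Executor]) -> list:
    # Submit every body to the executor first, then wait for all results.
    # Passing None falls back to the loop's default thread pool.
    loop = asyncio.get_running_loop()
    futures = []
    for body in bodies:
        data = json.loads(body)
        futures.append(
            loop.run_in_executor(pool, heavy_task, data["message"], data["id"])
        )
    return await asyncio.gather(*futures)


if __name__ == "__main__":
    # Simulated message bodies in the same JSON shape the consumer expects.
    bodies = [json.dumps({"message": "a cat", "id": str(n)}).encode() for n in range(4)]
    with ProcessPoolExecutor(max_workers=2) as pool:
        results = asyncio.run(handle_bodies(bodies, pool))
    for path, pid in results:
        print(path, "handled by pid", pid)
```

In the real consumer the awaited result would sit inside `message.process()`, so messages would still be acknowledged one at a time unless each message is handled in its own task; the sketch only shows the offloading step.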

And my attempt to run the processing in parallel:

from multiprocessing import Process


def between_func():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    loop.run_until_complete(main())
    loop.close()


if __name__ == "__main__":
    t1 = Process(target=between_func())
    t2 = Process(target=between_func())
    t1.start()
    t2.start()
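
One thing worth checking in the attempt above: `Process(target=between_func())` calls `between_func` immediately in the parent process and passes its return value as the target, so the first consumer loop blocks before either process is started. That alone would explain the sequential behaviour. A minimal sketch of passing the callable itself instead; `consume_forever` and `build_workers` are hypothetical names, and the short sleep stands in for the real consumer loop.

```python
import os
import time
from multiprocessing import Process


def consume_forever(worker_no: int) -> None:
    # Hypothetical stand-in for between_func(): the real version would run
    # the asyncio consumer loop inside this child process.
    print(f"worker {worker_no} running in pid {os.getpid()}")
    time.sleep(0.1)


def build_workers(count: int) -> list[Process]:
    # Pass the function object (no parentheses); arguments go in args=.
    return [Process(target=consume_forever, args=(n,)) for n in range(count)]


if __name__ == "__main__":
    workers = build_workers(2)
    for w in workers:
        w.start()  # the function body runs in the child from here on
    for w in workers:
        w.join()
```

With the original code the equivalent change would be `Process(target=between_func)`, without the parentheses.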

How can I execute these functions in parallel using multiprocessing? Or should I use a different tool?


