Parallel execution in Python (processing RabbitMQ messages in parallel)

I have a heavy function. Executing it takes a lot of computing resources (it's the StableDiffusion model from the Keras framework, but I don't think that matters much). I also have a message broker sending data to this function. I want to execute the function in a separate process for every message from the broker.

Multithreading and asyncio can't provide real parallelism in Python because of the GIL. I tried multiprocessing, but the program still runs sequentially.
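
For context, here's a minimal sketch (with a hypothetical CPU-bound stand-in, not the real model) showing that multiprocessing does give real parallelism where threads would not:

```python
from concurrent.futures import ProcessPoolExecutor

def cpu_bound(n: int) -> int:
    # Hypothetical CPU-heavy stand-in for the real model call
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as pool:
        # The two calls run in separate worker processes, in parallel
        results = list(pool.map(cpu_bound, [100_000, 100_000]))
        print(results)
```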

Here is example using multiprocessing:

The heavy function:

from keras_cv.models import StableDiffusion
from PIL import Image

def generate_image(message, id):
    model = StableDiffusion(img_height=512, img_width=512, jit_compile=True)
    img = model.text_to_image(
        prompt=message,
        batch_size=1,  # How many images to generate at once
        num_steps=10,  # Number of iterations (controls image quality)
        seed=1539,  # Set this to always get the same image from the same prompt
    )
    image_name = 'images/' + id + '.png'
    Image.fromarray(img[0]).save(image_name)
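
A side note on this function: it rebuilds the model on every call, so each message pays the full model-construction cost. With long-lived workers, a common pattern (sketched here with a hypothetical cheap loader instead of StableDiffusion) is to load the model once per worker process via an initializer:

```python
from concurrent.futures import ProcessPoolExecutor

_model = None  # per-process global, set once by the initializer

def init_worker():
    global _model
    # Hypothetical stand-in; the real code would build StableDiffusion here
    _model = str.upper

def work(prompt: str) -> str:
    # Uses the model loaded by init_worker; no per-call construction cost
    return _model(prompt)

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2, initializer=init_worker) as pool:
        print(list(pool.map(work, ["cat", "dog"])))
```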

Function to read from RabbitMQ (taken from the aio_pika docs: https://aio-pika.readthedocs.io/en/latest/quick-start.html#simple-consumer):

import asyncio
import json

import aio_pika


async def main() -> None:
    connection = await aio_pika.connect_robust("amqp://guest:guest@127.0.0.1/",)
    queue_name = "requestQueue"

    async with connection:
        # Creating channel
        channel = await connection.channel()
        # Will take no more than 10 messages in advance
        await channel.set_qos(prefetch_count=10)
        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=False, durable=True)
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    json_str = json.loads(message.body)
                    generate_image(json_str.get('message'), json_str.get('id'))
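
Note that calling generate_image directly inside the async for loop blocks the event loop until the image is finished. One direction is to offload the blocking call to a process pool with run_in_executor, so the consumer keeps receiving messages while images render in worker processes. A self-contained sketch (heavy_work is a hypothetical stand-in for generate_image; no broker involved):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def heavy_work(message: str, id: str) -> str:
    # Hypothetical stand-in for generate_image
    return f"{id}:{message.upper()}"

async def handle(pool, loop, message, id):
    # Runs heavy_work in a worker process without blocking the event loop
    return await loop.run_in_executor(pool, heavy_work, message, id)

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=2) as pool:
        # Both messages are processed concurrently in separate processes
        return await asyncio.gather(
            handle(pool, loop, "a cat", "1"),
            handle(pool, loop, "a dog", "2"),
        )

if __name__ == "__main__":
    print(asyncio.run(main()))
```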

And my attempt to run the processing in parallel:

from multiprocessing import Process

def between_func():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    loop.run_until_complete(main())
    loop.close()


if __name__ == "__main__":
    t1 = Process(target=between_func())
    t2 = Process(target=between_func())
    t1.start()
    t2.start()

How can I execute this function in parallel using multiprocessing? Or maybe with some other tool?


