Use asyncio coroutine to run functions in parallel?
I have the following code which read data from database (read_db
) and write the data to parquet file (data.to_parquet
). Both I/O operations take a while to run.
def main():
while id < 1000:
logging.info(f'reading - id: {id}')
data = read_db(id) # returns a dataframe
logging.info(f'saving - id: {id}')
data.to_parquet(f'{id}.parquet')
logging.info(f'saved - id: {id}')
id += 1
It's slow so I want read_db(n+1)
and to_parquet(n)
running concurrently. I need to keep each step of id
finishing sequentially though. Here is the asynchronous version
def async_wrap(f):
@wraps(f)
async def run(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
p = partial(f, *args, **kwargs)
return await loop.run_in_executor(executor, p)
return run
async def main():
read_db_async = async_wrap(read_db)
while id < 1000:
logging.info(f'reading - id: {id}')
data = await read_db_async(id) # returns a dataframe
logging.info(f'saving - id: {id}')
to_parquet_async = async_wrap(data.to_parquet)
await data.to_parquet(f'{id}.parquet')
logging.info(f'saved - id: {id}')
id += 1
asyncio.get_event_loop().run_until_complete(main())
I excepted to see the some out of order of logs:
reading - id: 1
saving - id: 1 (saving 1 and reading 2 run in parallel)
reading - id: 2
saved - id: 1
saving - id: 2
reading - id: 3
saved - id: 2
.....
But, the actually logs are the same of synchronous code?
reading - id: 1
saving - id: 1
saved - id: 1
reading - id: 2
saving - id: 2
saved - id: 2
reading - id: 3
.....
from Recent Questions - Stack Overflow https://ift.tt/2KRubuR
https://ift.tt/eA8V8J
Comments
Post a Comment