Asyncify string joining in Python

Time:01-28

I have the following code snippet which I want to transform into asynchronous code (data tends to be a large Iterable):

transformed_data = (do_some_transformation(d) for d in data)
stacked_jsons = "\n\n".join(json.dumps(t, separators=(",", ":")) for t in transformed_data)

I managed to rewrite the do_some_transformation-function to be async so I can do the following:

transformed_data = (await do_some_transformation(d) for d in data)
async_generator = (json.dumps(t, separators=(",", ":")) async for t in transformed_data)
stacked_jsons = ???

What's the best way to incrementally join the JSONs produced by the async generator so that the joining process is also asynchronous? This snippet is part of a larger I/O-bound application which has many asynchronous components and thus would profit from asyncifying everything.

CodePudding user response:

A more in-depth explanation of my comment:

Asyncio is a great tool if your processor has a lot of waiting to do. For example, when you make a request to a database over the network, your CPU does nothing after the request is sent until it gets an answer.

Using the async/await syntax, you can have your processor execute other tasks while "waiting" for the current one to finish. This does not mean it runs them in parallel: there is only one task running at a time.
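A minimal sketch of that interleaving, assuming asyncio.sleep as a stand-in for a network wait (the worker and demo names are hypothetical and not part of the question):

```python
import asyncio

async def worker(name: str, delay: float, log: list) -> None:
    log.append(f"{name} start")
    # While this coroutine waits, the event loop is free to run other tasks.
    await asyncio.sleep(delay)
    log.append(f"{name} done")

async def demo() -> list:
    log = []
    # Both workers share one thread; they only take turns at each await.
    await asyncio.gather(worker("a", 0.2, log), worker("b", 0.05, log))
    return log

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Both workers start before either one finishes, and "b" completes first because its wait is shorter. The gain comes purely from overlapping the waits, not from running code on two cores.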

In your case (from what I can see) the CPU never waits for anything; it is constantly running string operations.

If you want to run these operations in parallel, you might want to take a look at process pools (ProcessPoolExecutor). A process pool is not bound to a single process and core; it spreads the work over several cores so that it runs in parallel.

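import json
from concurrent.futures import ProcessPoolExecutor

def main():
    # do_some_transformation and data are the names from the question
    with ProcessPoolExecutor() as executor:
        # map() returns an iterator over the results, in input order
        transformed_data = executor.map(do_some_transformation, data)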

    stacked_jsons = "\n\n".join(json.dumps(t, separators=(",", ":")) for t in transformed_data)

if __name__ == '__main__':
    main()

I hope the provided code can help you.

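P.S. The if __name__ == '__main__' guard is required: on platforms that start worker processes by spawning a fresh interpreter (Windows, for example), each worker re-imports the main module, and without the guard it would call main() again.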

Edit: I saw your comment about 10k dicts. Assume you have 8 cores (ignoring multithreading); then each process only has to transform about 1,250 dicts instead of the 10k your main thread handles now. These processes run simultaneously, and although the performance increase is not linear, the data should be processed a lot faster.
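As a rough sketch of that split, Executor.map accepts a chunksize argument that hands the items to the workers in batches. The transformation below is a hypothetical placeholder, the stack_jsons helper is an illustrative name, and the figures 8 and 1250 are simply the numbers from the estimate above, not tuned values:

```python
import json
from concurrent.futures import Executor, ProcessPoolExecutor

def do_some_transformation(d: dict) -> dict:
    # Hypothetical placeholder for the CPU-bound transformation in the question.
    return {**d, "squared": d["id"] ** 2}

def stack_jsons(data, executor: Executor, chunksize: int = 1) -> str:
    # chunksize > 1 submits the items to a process pool in batches, which
    # reduces inter-process overhead for many small items. Thread pools
    # accept the argument but ignore it.
    transformed = executor.map(do_some_transformation, data, chunksize=chunksize)
    return "\n\n".join(json.dumps(t, separators=(",", ":")) for t in transformed)

if __name__ == "__main__":
    data = [{"id": i} for i in range(10_000)]
    with ProcessPoolExecutor(max_workers=8) as executor:
        stacked_jsons = stack_jsons(data, executor, chunksize=1250)
    print(stacked_jsons.count("\n\n") + 1)  # number of stacked documents
```

Passing the executor in keeps the helper independent of the pool type, so the same function can be tried with a thread pool or a process pool. Whether batching actually helps depends on how expensive one transformation is compared to pickling its input and output.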

CodePudding user response:

The point of str.join is to transform an entire list at once.¹ If items arrive incrementally, it can be advantageous to accumulate them one by one.

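async def join(by: str, _items: 'AsyncIterable[str]') -> str:
    """Asynchronously join the items of an async iterable, separated by `by`."""
    result = ""
    async for item in _items:
        # once something has been accumulated, put the separator before the next item
        if result and by:
            result += by
        result += item
    return result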

This utility can directly digest the async generator:

stacked_jsons = await join("\n\n", (json.dumps(t, separators=(",", ":")) async for t in transformed_data))

The async for loop is sufficient to let the async iterable suspend between items so that other tasks may run. The primary advantage of this approach is that even for very many items, this never stalls the event loop for longer than adding the next item.
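To make the "does not stall the event loop" point concrete, here is a small end-to-end sketch. It assumes a placeholder do_some_transformation that awaits asyncio.sleep(0) in place of real I/O, and a hypothetical ticker task that counts how often it gets to run while the join is in progress:

```python
import asyncio
import json
from typing import AsyncIterable, Tuple

async def do_some_transformation(d: dict) -> dict:
    # Hypothetical placeholder: sleep(0) stands in for real awaited I/O.
    await asyncio.sleep(0)
    return {"value": d["value"] * 2}

async def join(by: str, items: AsyncIterable[str]) -> str:
    result = ""
    async for item in items:
        if result and by:
            result += by
        result += item
    return result

async def main() -> Tuple[str, int]:
    ticks = 0

    async def ticker() -> None:
        # Counts how many turns this task gets while the join is running.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0)

    task = asyncio.create_task(ticker())
    data = [{"value": i} for i in range(3)]
    transformed_data = (await do_some_transformation(d) for d in data)
    stacked = await join(
        "\n\n",
        (json.dumps(t, separators=(",", ":")) async for t in transformed_data),
    )
    task.cancel()
    return stacked, ticks

if __name__ == "__main__":
    print(asyncio.run(main()))
```

The ticker only gets a turn because the transformation really suspends at an await. If the async generator never awaited anything that yields to the event loop, the async for loop would run through all items without giving other tasks a chance.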


When it is known that the data is small enough for str.join to run in adequate time, one can instead convert the data directly to a list and use str.join:

stacked_jsons = "\n\n".join([json.dumps(t, separators=(",", ":")) async for t in transformed_data])
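A self-contained sketch of this variant, assuming a hypothetical async generator named numbers in place of transformed_data. Note that an async comprehension is only valid inside a coroutine function:

```python
import asyncio
import json

async def numbers(n: int):
    # Hypothetical async source standing in for transformed_data.
    for i in range(n):
        await asyncio.sleep(0)
        yield {"n": i}

async def stack() -> str:
    # The comprehension awaits every item first; str.join then runs as one
    # uninterrupted step on the finished list.
    parts = [json.dumps(t, separators=(",", ":")) async for t in numbers(3)]
    return "\n\n".join(parts)

if __name__ == "__main__":
    print(asyncio.run(stack()))
```

Other tasks can still run while the list is being collected; only the final join blocks the event loop, which is why this fits small inputs best.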

¹ Even when joining an arbitrary iterable, str.join will internally turn it into a list first.
