Async Pyxis GQL Usage
In this page we will give some examples on how to use the asynchronous Pyxis GQL module
(pyxis_gql_async.py
).
General remarks about asynchronous code
To start, let’s take a good analogy from Miguel Grinberg’s 2017 PyCon talk about how asynchronous code works:
Chess master Judit Polgár hosts a chess exhibition in which she plays multiple amateur players. She has two ways of conducting the exhibition: synchronously and asynchronously.
- Assumptions:
24 opponents
Judit makes each chess move in 5 seconds
Opponents each take 55 seconds to make a move
Games average 30 pair-moves (60 moves total)
Synchronous version: Judit plays one game at a time, never two at the same time, until the game is complete. Each game takes (55 + 5) * 30 == 1800 seconds, or 30 minutes. The entire exhibition takes 24 * 30 == 720 minutes, or 12 hours.
Asynchronous version: Judit moves from table to table, making one move at each table. She leaves the table and lets the opponent make their next move during the wait time. One move on all 24 games takes Judit 24 * 5 == 120 seconds, or 2 minutes. The entire exhibition is now cut down to 120 * 30 == 3600 seconds, or just 1 hour.
Source: [RealPython] and [MiguelGrinberg]
Therefore, async code can have a speedup in IO-bound scenarios by “freeing” the execution runtime to do other things while a certain function waits for a response. This is different than parallellizing and threading.
In async code, it is fundamental that the the functions that are being alternated are non-blocking. Otherwise, the runtime will not be able to alternate between them.
When calling async functions, we have to create an async loop, which contains the functions that will be alternated. In the analogy, this is similar to setting up the set of tables and boards for the exhibition. The runtime will alternate between the functions in that context, and then resume usual synchronous execution.
Consequently, async functions cannot be run directly, as the usual synchronous python functions
can. Instead, they need to be awaited inside some async loop. Python reserves a few special
keywords (await
, async with
, async for
) to be used only inside asynchronous functions,
which are created with create async def
.
Finally, if you put only one function inside an async loop, you will make things work as if they were synchronous, and therefore you will lose the async speedup. This would be analogous to organizing an exhibition with just one board for Polgár – it is a standard match.
If you have never used asynchronous code in python, references [RealPython] and [AsyncIO] are good sources.
Usage examples
Basic calls
Usually, when we write async code, we will rely on other async libraries that implement lower-level
functionality. Our code will then have to await
the async functions of those libraries. For
example, we could await a get
request made with aiohttp
:
url = "https://docs.aiohttp.org/en/stable/index.html"
async with aiohttp.ClientSession() as session:
response = await session.get(url)
We can than make an async loop with asyncio.run
:
async def main():
url = "https://docs.aiohttp.org/en/stable/index.html"
async with aiohttp.ClientSession() as session:
response = await session.get(url)
return response
if __name__ == "__main__":
response = asyncio.run(main())
print(f"{response}")
Scheduling tasks and collecting results with gather
We can shedule a batch of tasks to run “together” (in the async sense) and then gather their results
with asyncio.create_task
and asyncio.gather
:
task1 = asyncio.create_task(async_function1(...))
task2 = asyncio.create_task(async_function2(...))
await asyncio.gather(task1, task2)
This is a simple option for when we just need to collect a set of results, without acting on each of them separately.
If you have a variable number of tasks that need to be gathered, you could use a generator:
async for obj in my_async_generator:
task = asyncio.create_task(process_obj(obj))
tasks.append(task)
await asyncio.gather(*tasks)
The discussions in [AsyncFor] are very good to check.
Chaining results
If, on the other hand, you want to process each result as they become available, you can use
asyncio.as_completed
. In this example, there are a few results that we want to ignore, so we
make a conditional aggregation after awaiting each execution:
x_values = [...]
results_to_skip = [...]
collected_results = []
for f in asyncio.as_completed(
[async_function(x) for x in x_values]
):
result = await f
if result not in results_to_skip:
collected_results.append(result)
Again, the discussions in [AsyncFor] are very good to check.
Usual structure of async code
We will usually have the following elements when we use async code:
lower level async functions that implement a given task
async orchestrators, which create an async loop and aggregates several lower level async functions (for example using
asyncio.gather
orasync for
)a synchronous wrapper function, that calls the async orchestrator and integrates it in the context of the synchronous flow (for example with
asyncio.run
)
It might be the case that a given module or package the latter two cases reside in
externally-calling code. For the asynchronous Pyxis GQL module in Freshmaker
(pyxis_gql_async.py
), this is precisely the case: the module itself provides the PyxisAsyncGQL
class with several async functions; it is up to the other modules that will use it
to build the async loop and aggregate those functions in them as needed.
Concrete example for using PyxisAsyncGQL
In this next example, we will call PyxisAsyncGQL.get_repository_by_path
several times, each for
a given path and registry, and aggregate each result conditionally.
async def aggregate_paths():
path_registry_vals:list[tuple[str, str]] = [...]
results_to_skip = [...]
collected_results = []
for f in asyncio.as_completed(
[PyxisAsyncGQL.get_repository_by_path(*x) for x in path_registry_vals]
):
result = await f
if result not in results_to_skip:
collected_results.append(result)
def main():
asyncio.run(aggregate_paths())