7.6. AsyncIO

7.6.1. Rationale

  • Running asynchronously: 2s + 1s + 3s = a bit over 3s [execution time]

import asyncio


async def a():
    print('A: started')
    await asyncio.sleep(2)
    print('A: finished')
    return 'a'

async def b():
    print('B: started')
    await asyncio.sleep(1)
    print('B: finished')
    return 'b'

async def c():
    print('C: started')
    await asyncio.sleep(3)
    print('C: finished')
    return 'c'


async def main():
    result = await asyncio.gather(
        a(),
        b(),
        c(),
    )
    print(f'Result: {result}')


if __name__ == '__main__':
    asyncio.run(main())

# A: started
# B: started
# C: started
# B: finished
# A: finished
# C: finished
# Result: ['a', 'b', 'c']

7.6.2. Running Program

  • asyncio.run(coro, *, debug=False)

  • Execute the coroutine coro and return the result

  • Takes care of managing the asyncio event loop, finalizing asynchronous generators, and closing the threadpool.

  • Cannot be called when another asyncio event loop is running in the same thread.

  • Always creates a new event loop and closes it at the end.

  • It should be used as a main entry point for asyncio programs, and should ideally only be called once.

import asyncio


async def main():
    await asyncio.sleep(1)
    print('hello')


asyncio.run(main())

7.6.3. Awaitables

  • Object is an awaitable if it can be used in an await expression

  • There are three main types of awaitable objects:

    • coroutines,

    • Tasks,

    • Futures.
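
The three kinds of awaitables can be used interchangeably in an await expression. A minimal side-by-side sketch (all names here are illustrative):

```python
import asyncio


async def work():
    return 42


async def main():
    # 1. Coroutine: awaiting it runs it to completion.
    print(await work())

    # 2. Task: wraps a coroutine and schedules it on the event loop.
    task = asyncio.create_task(work())
    print(await task)

    # 3. Future: a low-level container for an eventual result.
    future = asyncio.get_running_loop().create_future()
    future.set_result(42)
    print(await future)


asyncio.run(main())
```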

7.6.4. Sleeping

  • coroutine asyncio.sleep(delay, result=None)

  • Block for delay seconds.

  • If result is provided, it is returned to the caller when the coroutine completes

import asyncio


async def main():
    result = await asyncio.sleep(1, 'done')
    print(result)


asyncio.run(main())
# done

7.6.5. Coroutines

  • Python coroutines are awaitables

  • Coroutines declared with the async/await syntax are the preferred way of writing asyncio applications. 1

  • Term 'coroutine' can be used for two closely related concepts 1:

    • a coroutine function: an async def function;

    • a coroutine object: an object returned by calling a coroutine function.

  • Python distinguishes between a coroutine function and a coroutine object

  • Write a coroutine function by putting async in front of the def

  • Only a coroutine function can use await, non-coroutine functions cannot.

  • Calling a coroutine function does not execute it, but rather returns a coroutine object. (This is analogous to generator functions - calling them doesn't execute the function, it returns a generator object, which we then use later.)

  • To execute a coroutine object, either:

    • use it in an expression with await in front of it, or

    • pass it to asyncio.run(), or

    • schedule it with ensure_future() or create_task().

import asyncio


async def work():
    return 'done'


async def main():
    result = await work()
    print(result)


asyncio.run(main())
# done

7.6.6. Tasks

  • asyncio.create_task(coro, *, name=None)

  • Tasks are used to schedule coroutines concurrently

  • Wrap the coro coroutine into a Task and schedule its execution.

  • Return the Task object:

    • can be used to cancel execution

    • can be awaited until it is complete

  • The task is executed in the loop returned by get_running_loop()

  • RuntimeError is raised if there is no running loop in current thread.

  • Tasks are used to run coroutines in event loops.

  • If a coroutine awaits on a Future, the Task suspends the execution of the coroutine and waits for the completion of the Future.

  • When the Future is done, the execution of the wrapped coroutine resumes.

  • Use the high-level asyncio.create_task() function to create Tasks.

  • Manual instantiation of Tasks is discouraged.

import asyncio


async def work():
    return 'done'


async def main():
    task = asyncio.create_task(work())
    result = await task
    print(result)


asyncio.run(main())
# done

import asyncio


async def a():
    print('A: started')
    await asyncio.sleep(2)
    print('A: finished')


async def b():
    print('B: started')
    await asyncio.sleep(1)
    print('B: finished')


async def c():
    print('C: started')
    await asyncio.sleep(3)
    print('C: finished')


async def main():
    t1 = asyncio.create_task(a())
    t2 = asyncio.create_task(b())
    t3 = asyncio.create_task(c())
    await t1
    await t2
    await t3


if __name__ == '__main__':
    asyncio.run(main())

# A: started
# B: started
# C: started
# B: finished
# A: finished
# C: finished

Selected Task methods:

  • class asyncio.Task(coro, *, loop=None, name=None) - A Future-like object that runs a Python coroutine. Not thread-safe.

  • method asyncio.Task.cancel(msg=None) - Request the Task to be cancelled. This arranges for a CancelledError exception to be thrown into the wrapped coroutine on the next cycle of the event loop.

  • method asyncio.Task.cancelled() - Return True if the Task is cancelled.

  • method asyncio.Task.done() - Return True if the Task is done.

  • method asyncio.Task.result() - Return the result of the Task. If the result isn't yet available, raise InvalidStateError.

  • method asyncio.Task.exception() - Return the exception of the Task

  • method asyncio.Task.add_done_callback(callback, *, context=None) - Add a callback to be run when the Task is done.

  • method asyncio.Task.remove_done_callback(callback) - Remove callback from the callbacks list.

  • method asyncio.Task.set_name(value) - Set the name of the Task.

  • method asyncio.Task.get_name() - Return the name of the Task.
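
A few of these methods in action; a minimal sketch showing cancellation and state inspection (task and function names are illustrative):

```python
import asyncio


async def work():
    await asyncio.sleep(1)
    return 'done'


async def main():
    task = asyncio.create_task(work(), name='worker')
    print(task.get_name())   # worker
    print(task.done())       # False -- still scheduled

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    print(task.cancelled())  # True
    print(task.done())       # True -- cancelled counts as done


asyncio.run(main())
```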

7.6.7. Futures

  • Low-level awaitable object

  • Represents an eventual result of an asynchronous operation

  • When a Future object is awaited it means that the coroutine will wait until the Future is resolved in some other place

  • Future objects in asyncio are needed to allow callback-based code to be used with async/await.

  • Normally there is no need to create Future objects at the application level code.
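
For illustration only (application code rarely needs this), a sketch of a Future being resolved "in some other place" by a plain callback:

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    # Schedule a plain (non-async) callback that resolves the Future.
    loop.call_soon(future.set_result, 'resolved')

    # Awaiting the Future suspends main() until set_result() is called.
    result = await future
    print(result)  # resolved
    return result


asyncio.run(main())
```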

7.6.8. Running Tasks Concurrently

  • awaitable asyncio.gather(*aws, return_exceptions=False)

  • Run awaitable objects in the aws sequence concurrently.

  • If any awaitable in aws is a coroutine, it is automatically scheduled as a Task.

  • If all awaitables are completed successfully, the result is an aggregate list of returned values.

  • The order of result values corresponds to the order of awaitables in aws.

  • If return_exceptions is:

    • False (default): the first raised exception is immediately propagated to the task that awaits on gather(). Other awaitables in the aws sequence won't be cancelled and will continue to run.

    • True: exceptions are treated the same as successful results, and aggregated in the result list.

  • If gather() is cancelled, all submitted awaitables (that have not completed yet) are also cancelled.

  • If any Task or Future from the aws sequence is cancelled, it is treated as if it raised CancelledError – the gather() call is not cancelled in this case.

  • This is to prevent the cancellation of one submitted Task/Future to cause other Tasks/Futures to be cancelled.

import asyncio


async def a():
    print('A: started')
    await asyncio.sleep(2)
    print('A: finished')
    return 'a'

async def b():
    print('B: started')
    await asyncio.sleep(1)
    print('B: finished')
    return 'b'

async def c():
    print('C: started')
    await asyncio.sleep(3)
    print('C: finished')
    return 'c'


async def main():
    result = await asyncio.gather(
        a(),
        b(),
        c(),
    )
    print(f'Result: {result}')


if __name__ == '__main__':
    asyncio.run(main())

# A: started
# B: started
# C: started
# B: finished
# A: finished
# C: finished
# Result: ['a', 'b', 'c']
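
With return_exceptions=True, a failing awaitable does not abort the others; its exception simply shows up in the result list. A short sketch (function names are illustrative):

```python
import asyncio


async def ok():
    return 'ok'


async def fail():
    raise ValueError('boom')


async def main():
    # The ValueError is collected instead of being propagated.
    results = await asyncio.gather(ok(), fail(), return_exceptions=True)
    print(results)


asyncio.run(main())
```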

7.6.9. Shielding from Cancellation

  • awaitable asyncio.shield(aw)

  • Protect an awaitable object from being cancelled.

import asyncio

async def work():
    return 'done'


async def main():
    try:
        res = await asyncio.shield(work())
    except asyncio.CancelledError:
        res = None
    print(res)


asyncio.run(main())

7.6.10. Timeouts

  • coroutine asyncio.wait_for(aw, timeout)

  • Wait for the aw awaitable to complete with a timeout.

  • Timeout can either be None or a float or int number of seconds to wait for.

  • If timeout is None, block until the future completes.

  • If a timeout occurs, it cancels the task and raises asyncio.TimeoutError

  • If the wait is cancelled, the future aw is also cancelled.

import asyncio

HOUR = 3600


async def work():
    await asyncio.sleep(HOUR)
    return 'done'


async def main():
    try:
        await asyncio.wait_for(work(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())
# timeout!

7.6.11. Wait

  • coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)

  • Run awaitable objects in the aws iterable concurrently and block until the condition specified by return_when.

  • The aws iterable must not be empty.

  • timeout: float|int if specified, maximum number of seconds to wait before returning.

  • wait() does not cancel the futures when a timeout occurs.

  • return_when indicates when this function should return. It must be one of the following constants:

    • FIRST_COMPLETED - The function will return when any future finishes or is cancelled.

    • FIRST_EXCEPTION - The function will return when any future finishes by raising an exception. If no future raises an exception then it is equivalent to ALL_COMPLETED.

    • ALL_COMPLETED - The function will return when all futures finish or are cancelled.

done, pending = await asyncio.wait(aws)

  • Does not raise asyncio.TimeoutError

  • Futures or Tasks that aren’t done when the timeout occurs are simply returned in the second set (pending).

import asyncio


async def work():
    return 'done'


async def main():
    task = asyncio.create_task(work())
    done, pending = await asyncio.wait({task})

    if task in done:
        print('work is done')

asyncio.run(main())
# work is done
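
FIRST_COMPLETED returns as soon as one awaitable finishes; since wait() never cancels anything itself, the leftovers must be handled explicitly. A minimal sketch (function names are illustrative):

```python
import asyncio


async def fast():
    await asyncio.sleep(0.1)
    return 'fast'


async def slow():
    await asyncio.sleep(10)
    return 'slow'


async def main():
    tasks = {asyncio.create_task(fast()), asyncio.create_task(slow())}
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    for task in done:
        print(task.result())  # fast

    # wait() does not cancel pending awaitables; do it ourselves.
    for task in pending:
        task.cancel()


asyncio.run(main())
```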

7.6.12. As Completed

  • asyncio.as_completed(aws, *, timeout=None)

  • Run awaitable objects in the aws iterable concurrently.

  • Return an iterator of coroutines.

  • Each coroutine returned can be awaited to get the earliest next result from the iterable of the remaining awaitables.

  • Raises asyncio.TimeoutError if the timeout occurs before all Futures are done.

import asyncio


async def a():
    print('A: started')
    await asyncio.sleep(2)
    print('A: finished')
    return 'a'


async def b():
    print('B: started')
    await asyncio.sleep(1)
    print('B: finished')
    return 'b'


async def c():
    print('C: started')
    await asyncio.sleep(3)
    print('C: finished')
    return 'c'


async def main():
    work = [a(), b(), c()]
    for coro in asyncio.as_completed(work):
        result = await coro
        print(result)


if __name__ == '__main__':
    asyncio.run(main())

# C: started
# B: started
# A: started
# B: finished
# b
# A: finished
# a
# C: finished
# c

7.6.13. Running in Threads

  • coroutine asyncio.to_thread(func, /, *args, **kwargs)

  • Asynchronously run function func in a separate thread.

  • Any *args and **kwargs supplied for this function are directly passed to func.

  • Return a coroutine that can be awaited to get the eventual result of func.

  • This coroutine function is intended to be used for executing IO-bound functions/methods that would otherwise block the event loop if they were run in the main thread.

import asyncio
import time


def work():
    print(f'Work started {time.strftime("%X")}')
    time.sleep(2)  # Blocking
    print(f'Work done at {time.strftime("%X")}')


async def main():
    print(f'Started main at {time.strftime("%X")}')

    await asyncio.gather(
        asyncio.to_thread(work),
        asyncio.sleep(1))

    print(f'Finished main at {time.strftime("%X")}')


asyncio.run(main())
# Started main at 02:42:45
# Work started 02:42:45
# Work done at 02:42:47
# Finished main at 02:42:47

Due to the GIL, asyncio.to_thread() can typically only be used to make IO-bound functions non-blocking. However, for extension modules that release the GIL or alternative Python implementations that don’t have one, asyncio.to_thread() can also be used for CPU-bound functions.

7.6.14. Introspection

  • asyncio.current_task(loop=None) - Return the currently running Task instance, or None if no task is running.

  • asyncio.all_tasks(loop=None) - Return a set of not yet finished Task objects run by the loop.

  • If loop is None, get_running_loop() is used for getting current loop.
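
A quick sketch of both introspection functions, called from inside a running task:

```python
import asyncio


async def main():
    current = asyncio.current_task()
    print(current.get_name())        # the default task name, e.g. 'Task-1'
    print(len(asyncio.all_tasks()))  # 1 -- only main() is running


asyncio.run(main())
```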

7.6.15. Event loops

Async code can only run inside an event loop. The event loop is the driver code that manages the cooperative multitasking. You can create multiple threads and run different event loops in each of them. For example, Django uses the main thread to wait for incoming requests, so we can’t run an asyncio event loop there, but we can start a separate worker thread for our event loop. 2

An event loop runs in a thread (typically the main thread) and executes all callbacks and Tasks in its thread. While a Task is running in the event loop, no other Tasks can run in the same thread. When a Task executes an await expression, the running Task gets suspended, and the event loop executes the next Task. 3

import asyncio


async def work(*args, **kwargs):
    # do stuff...
    return 'done'


result = asyncio.run(work(1, 2, 3))

Since Python 3.7 there is asyncio.run(). Before, you had to get the event loop with get_event_loop() and then call run_until_complete():

import asyncio


async def a():
    print('A: started')
    await asyncio.sleep(2)
    print('A: finished')


async def b():
    print('B: started')
    await asyncio.sleep(1)
    print('B: finished')


async def c():
    print('C: started')
    await asyncio.sleep(3)
    print('C: finished')


async def main():
    await asyncio.gather(
        a(),
        b(),
        c(),
    )


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

# A: started
# B: started
# C: started
# B: finished
# A: finished
# C: finished

7.6.16. Queue

  • asyncio queues are designed to be similar to classes of the queue module.

  • Although asyncio queues are not thread-safe, they are designed to be used specifically in async/await code.

  • Note that methods of asyncio queues don’t have a timeout parameter; use the asyncio.wait_for() function to do queue operations with a timeout.

FIFO Queue (first in, first out):

  • class asyncio.Queue(maxsize=0)

  • If maxsize is less than or equal to zero, the queue size is infinite.

  • Unlike the standard library threading queue, the size of the queue is always known and can be returned by calling the qsize() method.

  • maxsize - Number of items allowed in the queue.

  • empty() - Return True if the queue is empty, False otherwise.

  • full() - Return True if there are maxsize items in the queue.

  • coroutine get() - Remove and return an item from the queue. If queue is empty, wait until an item is available.

  • get_nowait() - Return an item if one is immediately available, else raise QueueEmpty.

  • coroutine join() - Block until all items in the queue have been received and processed.

  • coroutine put(item) - Put an item into the queue. If the queue is full, wait until a free slot is available before adding the item.

  • put_nowait(item) - Put an item into the queue without blocking. If no free slot is immediately available, raise QueueFull.

  • qsize() - Return the number of items in the queue.

  • task_done() - Indicate that a formerly enqueued task is complete.

Priority Queue:

  • class asyncio.PriorityQueue

  • Retrieves entries in priority order (lowest first).

  • Entries are typically tuples of the form (priority_number, data).

LIFO Queue (last in, first out):

  • class asyncio.LifoQueue

  • Retrieves most recently added entries first.

Exceptions:

  • exception asyncio.QueueEmpty - Raised when get_nowait() method is called on an empty queue.

  • exception asyncio.QueueFull - Raised when put_nowait() method is called on a queue that has reached its maxsize.

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())
# worker-0 has slept for 0.26 seconds
# worker-0 has slept for 0.41 seconds
# worker-1 has slept for 0.89 seconds
# worker-2 has slept for 0.98 seconds
# worker-0 has slept for 0.59 seconds
# worker-0 has slept for 0.09 seconds
# worker-0 has slept for 0.11 seconds
# worker-2 has slept for 0.53 seconds
# worker-1 has slept for 0.91 seconds
# worker-1 has slept for 0.21 seconds
# worker-0 has slept for 0.87 seconds
# worker-2 has slept for 0.86 seconds
# worker-2 has slept for 0.11 seconds
# worker-2 has slept for 0.23 seconds
# worker-0 has slept for 0.53 seconds
# worker-1 has slept for 0.89 seconds
# worker-0 has slept for 0.53 seconds
# worker-0 has slept for 0.10 seconds
# worker-2 has slept for 0.86 seconds
# worker-1 has slept for 0.82 seconds
# ====
# 3 workers slept in parallel for 3.74 seconds
# total expected sleep time: 10.79 seconds

7.6.17. Streams

TCP echo client:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()
    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')
    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

TCP echo server:

import asyncio


async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print(f"Received {message!r} from {addr!r}")
    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()
    print("Close the connection")
    writer.close()


async def main():
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')
    async with server:
        await server.serve_forever()

asyncio.run(main())

7.6.18. Synchronization Primitives

Mutex Lock:

  • Class asyncio.Lock()

  • Can be used to guarantee exclusive access to a shared resource

  • Not thread-safe.

lock = asyncio.Lock()

async with lock:
    ...  # access shared state

Condition object:

  • class asyncio.Condition(lock=None)

  • Not thread-safe.

cond = asyncio.Condition()

async with cond:
    await cond.wait()
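
A task blocked in cond.wait() stays suspended until some other task calls notify() while holding the lock. A minimal sketch (function names are illustrative):

```python
import asyncio


async def waiter(cond):
    async with cond:
        await cond.wait()   # releases the lock and suspends
        print('woken up')


async def main():
    cond = asyncio.Condition()
    task = asyncio.create_task(waiter(cond))

    await asyncio.sleep(0)  # let the waiter reach cond.wait()
    async with cond:
        cond.notify()       # the lock must be held to notify

    await task


asyncio.run(main())
```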

Semaphore:

  • class asyncio.Semaphore(value=1)

  • Manages an internal counter which is decremented by each acquire() call and incremented by each release() call.

  • The counter can never go below zero.

  • When acquire() finds that it is zero, it blocks, waiting until some task calls release().

sem = asyncio.Semaphore(10)

async with sem:
    ...  # work with shared resource
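
A semaphore is typically used to cap concurrency, e.g. limiting how many tasks touch a resource at once. A minimal sketch (the limit of 2 is arbitrary):

```python
import asyncio


async def worker(sem, i):
    async with sem:
        # At most two workers are inside this block at any moment.
        await asyncio.sleep(0.1)
        return i


async def main():
    sem = asyncio.Semaphore(2)
    results = await asyncio.gather(*(worker(sem, i) for i in range(5)))
    print(results)  # [0, 1, 2, 3, 4]


asyncio.run(main())
```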

Event:

  • class asyncio.Event()

  • Can be used to notify multiple asyncio tasks that some event has happened.

  • coroutine wait() - Wait until the event is set. If the event is set, return True immediately. Otherwise block until another task calls set().

  • set() - Set the event. All tasks waiting for event to be set will be immediately awakened.

  • clear() - Clear (unset) the event. Tasks awaiting on wait() will now block until the set() method is called again.

  • is_set() - Return True if the event is set.

import asyncio


async def listener(event):
    print('Waiting for event')
    await event.wait()
    print('Event processed')


async def main():
    myevent = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    handler = asyncio.create_task(listener(myevent))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    myevent.set()

    # Wait until processing is complete
    await handler


asyncio.run(main())
# Waiting for event
# Event processed

7.6.19. Debug

  • By default asyncio runs in production mode.

  • Asyncio has a debug mode which can be enabled by:

    • Setting the PYTHONASYNCIODEBUG environment variable to 1.

    • Using the Python Development Mode.

    • Passing debug=True to asyncio.run().

    • Calling loop.set_debug().

  • In addition to enabling the debug mode, consider also:

    • setting the log level of the asyncio logger to logging.DEBUG, e.g. with logging.basicConfig(level=logging.DEBUG)

    • configuring the warnings module to display ResourceWarning warnings. One way of doing that is by using the -W default command line option.

  • When the debug mode is enabled:

    • asyncio checks for coroutines that were not awaited and logs them; this mitigates the 'forgotten await' pitfall.

    • Many non-threadsafe asyncio APIs (such as loop.call_soon() and loop.call_at() methods) raise an exception if they are called from a wrong thread.

    • The execution time of the I/O selector is logged if it takes too long to perform an I/O operation.

    • Callbacks taking longer than 100ms are logged.

    • The loop.slow_callback_duration attribute can be used to set the minimum execution duration in seconds that is considered 'slow'.
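
Putting the two recommended settings together; a minimal sketch enabling debug mode via asyncio.run() alongside DEBUG-level logging:

```python
import asyncio
import logging


async def main():
    await asyncio.sleep(0.1)


# Verbose asyncio logging plus the debug-mode checks described above
# (un-awaited coroutine detection, slow-callback warnings, ...).
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main(), debug=True)
```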

7.6.20. References

1. https://docs.python.org/3/library/asyncio-task.html

2. https://cheat.readthedocs.io/en/latest/python/asyncio.html

3. https://docs.python.org/3/library/asyncio-dev.html#concurrency-and-multithreading