【async/await】Python のネイティブコルーチン, 非同期イテレータ, 非同期コンテキストマネージャ

前回は yield from (PEP 380) を用いたジェネレータベースのコルーチンを紹介しました。
今回は Python 3.5 で追加された async/await キーワード を用いたネイティブコルーチンや非同期イテレータ, 非同期コンテキストマネージャについてです。本記事の内容は以下です。

実行環境は macOS 10.13.6, Python 3.6.7, IPython 7.2.0 です。

async/await

async/await キーワード (PEP 492) によるコルーチンはネイティブコルーチン (Native coroutines) と呼ばれ, ジェネレータベースのコルーチンとは明確に区別される。

例として key が名前, value が list of floats のデータで構成される dict から, key ごとのデータのカウントと平均値を求める問題を考える。

In [1]: data = {
    ...:     'girls;kg':
    ...:         [40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
    ...:     'girls;m':
    ...:         [1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
    ...:     'boys;kg':
    ...:         [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
    ...:     'boys;m':
    ...:         [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
    ...: }

前回書いた yield from を用いたジェネレータベースのコルーチン averager をネイティブコルーチンで書き換える。

averager に __await__() を定義する。__await__() で awaitable プロトコルを実装している限り, 生成されたオブジェクトは awaitable となる。await 式で使えるオブジェクトの場合 inspect.isawaitable() は True を返す。inspect は活動中のオブジェクトの情報を取得する標準ライブラリである。

In [2]: class averager(object):
   ...:     def __await__(self):
   ...:         total = 0.0
   ...:         count = 0
   ...:         average = None
   ...:         while True:
   ...:             term = yield
   ...:             if term is None:
   ...:                 break
   ...:             total += term
   ...:             count += 1
   ...:             average = total/count
   ...:         return (count, average)
   ...:

In [3]: from inspect import iscoroutinefunction, iscoroutine, isawaitable

In [4]: ave = averager()

In [5]: isawaitable(ave)
Out[5]: True

ネイティブコルーチン関数は async def 文で定義する。 コルーチン関数内で await 式を用いることで, 結果が利用可能になるまでコルーチンの実行を停止することが出来る。ネイティブコルーチン関数であるかは inspect.iscoroutinefunction() で確認でき, async def で定義された関数に対して True を返す。

In [6]: async def grouper(results, key):
    ...:     while True:
    ...:         results[key] = await averager()
    ...:

In [7]: iscoroutinefunction(grouper)
Out[7]: True

ネイティブコルーチンを起動する。 async def で生成されたコルーチンに対して inspect.iscoroutine() は True を返す。

In [8]: results = {}

In [9]: for key, values in data.items():
    ...:     group = grouper(results, key)
    ...:     group.send(None)
    ...:     for value in values:
    ...:         group.send(value)
    ...:     group.send(None)
    ...:

In [10]: iscoroutine(group)
Out[10]: True

In [11]: isawaitable(group)
Out[11]: True

In [12]: results
Out[12]:
{'girls;kg': (10, 42.040000000000006),
 'girls;m': (10, 1.4279999999999997),
 'boys;kg': (9, 40.422222222222224),
 'boys;m': (9, 1.3888888888888888)}

ネイティブコルーチンでもジェネレータベースのコルーチンと同様の結果が得られた。

interoperability

ネイティブコルーチンとジェネレータベースのコルーチンで相互運用性が求められる場合がある。その場合, 従来のジェネレータベースのコルーチンに @asyncio.coroutine デコレータを付けることで, awaitable となり async def 内で使用することができる。

In [13]: import types
   ...: @asyncio.coroutine
   ...: def averager():
   ...:     total = 0.0
   ...:     count = 0
   ...:     average = None
   ...:     while True:
   ...:         term = yield
   ...:         if term is None:
   ...:             break
   ...:         total += term
   ...:         count += 1
   ...:         average = total/count
   ...:     return (count, average)

In [14]: ave = averager()

In [15]: isawaitable(ave)
Out[15]: True

続いて, async/await の導入により非同期的に使えるようにサポートされたイテレーションプロトコルとコンテキストマネージャについて。

async for

非同期イテレータ (asynchronous iterator) はイテレータ内で非同期のコードを呼ぶことができる。
非同期イテレータは __aiter__() と __anext__() を定義する必要がある。また, __aiter__() で非同期イテレータオブジェクトを返す必要があり, __anext__() で次の値となる awaitable を返し, さらにイテレートが終了した時に StopAsyncIteration を上げるように実装する必要がある。

class AsyncIterable:
    def __aiter__(self):
        return self

    async def __anext__(self):
        data = await self.fetch_data()
        if data:
            return data
        else:
            raise StopAsyncIteration

async for は非同期イテレータをより簡便に使うための構文。

async for TARGET in ITER:
    BLOCK
else:
    BLOCK2

意味的には以下と等価で, __anext__() でイテレータを進め, 最後に StopAsyncIteration を捕捉する。

iter = (ITER)
iter = type(iter).__aiter__(iter)
running = True
while running:
    try:
        TARGET = await type(iter).__anext__(iter)
    except StopAsyncIteration:
        running = False
    else:
        BLOCK
else:
    BLOCK2

async with

非同期コンテキストマネージャの前にコンテキストマネージャの話をする。

コンテキストマネージャ (context manager) は with (PEP 343) 文の実行時にランタイムコンテキストを定義するオブジェクト。用途としては, リソースに関する環境を制御, 具体的にはリソースのロックとアンロック, ファイルのオープンとクローズなどがある。
with 文はブロックの実行をコンテキストマネージャによって定義されたメソッドでラップするために使われ, try/except/finally によるパターンをカプセル化し表面上のコードから除くことができる。with 文は評価されるとコンテキストマネージャを取得する。

with EXPR as VAR:
    BLOCK

非同期コンテキストマネージャ (asynchronous context manager) は __aenter__() と __aexit__() の中で実行を一時停止することができるコンテキストマネージャ。
__aenter__() にはリソースのセットアップなどコンテキストに入る時の処理, __aexit__() にはリソースの解放などコンテキストから抜ける時の処理を実装する。また, 共に awaitable を返す必要がある。

class AsyncContextManager:
    async def __aenter__(self):
        await log('entering context')

    async def __aexit__(self, exc_type, exc, tb):
        await log('exiting context')

非同期コンテキストマネージャは async with 内で使える。async with の構文は以下となる。

async with EXPR as VAR:
    BLOCK

意味的には以下と等価。

mgr = (EXPR)
aexit = type(mgr).__aexit__
aenter = type(mgr).__aenter__(mgr)

VAR = await aenter
try:
    BLOCK
except:
    if not await aexit(mgr, *sys.exc_info()):
        raise
else:
    await aexit(mgr, None, None, None)

以下が具体例。

In [16]: import asyncio
    ...:
    ...: async def coro(name, lock):
    ...:     print('coro {}: waiting for lock'.format(name))
    ...:     async with lock:
    ...:         print('coro {}: holding the lock'.format(name))
    ...:         await asyncio.sleep(1)
    ...:         print('coro {}: releasing the lock'.format(name))
    ...:

In [17]: loop = asyncio.get_event_loop()

In [18]: lock = asyncio.Lock()

In [19]: coros = asyncio.gather(coro(1, lock), coro(2, lock))

In [20]: loop.run_until_complete(coros)
coro 1: waiting for lock
coro 1: holding the lock
coro 2: waiting for lock
coro 1: releasing the lock
coro 2: holding the lock
coro 2: releasing the lock
Out[21]: [None, None]

In [22]: loop.close()

2 は 1 が lock を解放するのを待ち, 解放されたタイミングで 2 は非同期に lock を獲得する。
loop.run_until_complete() にコルーチンを与えることでイベントループが開始される。 asyncio.gather() は与えられたコルーチンオブジェクトの結果を一つにまとめたフューチャを返す。

おわりに

Python の機能を調べるときは結局, 公式ドキュメントPEP (Python Enhancement Proposal) に辿り着くことが多いです。

[1] タスクとコルーチン
[2] What is the difference between @types.coroutine and @asyncio.coroutine decorators?
[3] Pythonのジェネレータ、コルーチン、ネイティブコルーチン、そしてasync/await
[4] 3.4.4. 非同期コンテキストマネージャ (Asynchronous Context Manager)
[5] 3.3.8. with文とコンテキストマネージャ