前回は yield from (PEP 380) を用いたジェネレータベースのコルーチンを紹介しました。
今回は Python 3.5 で追加された async/await キーワード を用いたネイティブコルーチンや非同期イテレータ, 非同期コンテキストマネージャについてです。本記事の内容は以下です。
実行環境は macOS 10.13.6, Python 3.6.7, IPython 7.2.0 です。
async/await
async/await キーワード (PEP 492) によるコルーチンはネイティブコルーチン (Native coroutines) と呼ばれ, ジェネレータベースのコルーチンとは明確に区別される。
例として key が名前, value が list of floats のデータで構成される dict から, key ごとのデータのカウントと平均値を求める問題を考える。
In [1]: data = {
...: 'girls;kg':
...: [40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
...: 'girls;m':
...: [1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
...: 'boys;kg':
...: [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
...: 'boys;m':
...: [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
...: }
前回書いた yield from を用いたジェネレータベースのコルーチン averager をネイティブコルーチンで書き換える。
averager に __await__() を定義する。__await__() で awaitable プロトコルを実装している限り, 生成されたオブジェクトは awaitable となる。await 式で使えるオブジェクトの場合 inspect.isawaitable() は True を返す。inspect は活動中のオブジェクトの情報を取得する標準ライブラリである。
In [2]: class averager(object):
...: def __await__(self):
...: total = 0.0
...: count = 0
...: average = None
...: while True:
...: term = yield
...: if term is None:
...: break
...: total += term
...: count += 1
...: average = total/count
...: return (count, average)
...:
In [3]: from inspect import iscoroutinefunction, iscoroutine, isawaitable
In [4]: ave = averager()
In [5]: isawaitable(ave)
Out[5]: True
ネイティブコルーチン関数は async def 文で定義する。 コルーチン関数内で await 式を用いることで, 結果が利用可能になるまでコルーチンの実行を停止することが出来る。ネイティブコルーチン関数であるかは inspect.iscoroutinefunction() で確認でき, async def で定義された関数に対して True を返す。
In [6]: async def grouper(results, key):
...: while True:
...: results[key] = await averager()
...:
In [7]: iscoroutinefunction(grouper)
Out[7]: True
ネイティブコルーチンを起動する。 async def で生成されたコルーチンに対して inspect.iscoroutine() は True を返す。
In [8]: results = {}
In [9]: for key, values in data.items():
...: group = grouper(results, key)
...: group.send(None)
...: for value in values:
...: group.send(value)
...: group.send(None)
...:
In [10]: iscoroutine(group)
Out[10]: True
In [11]: isawaitable(group)
Out[11]: True
In [12]: results
Out[12]:
{'girls;kg': (10, 42.040000000000006),
'girls;m': (10, 1.4279999999999997),
'boys;kg': (9, 40.422222222222224),
'boys;m': (9, 1.3888888888888888)}
ネイティブコルーチンでもジェネレータベースのコルーチンと同様の結果が得られた。
interoperability
ネイティブコルーチンとジェネレータベースのコルーチンで相互運用性が求められる場合がある。その場合, 従来のジェネレータベースのコルーチンに @asyncio.coroutine デコレータを付けることで, awaitable となり async def 内で使用することができる。
In [13]: import types
...: @asyncio.coroutine
...: def averager():
...: total = 0.0
...: count = 0
...: average = None
...: while True:
...: term = yield
...: if term is None:
...: break
...: total += term
...: count += 1
...: average = total/count
...: return (count, average)
In [14]: ave = averager()
In [15]: isawaitable(ave)
Out[15]: True
続いて, async/await の導入により非同期的に使えるようにサポートされたイテレーションプロトコルとコンテキストマネージャについて。
async for
非同期イテレータ (asynchronous iterator) はイテレータ内で非同期のコードを呼ぶことができる。
非同期イテレータは __aiter__() と __anext__() を定義する必要がある。また, __aiter__() で非同期イテレータオブジェクトを返す必要があり, __anext__() で次の値となる awaitable を返し, さらにイテレートが終了した時に StopAsyncIteration を上げるように実装する必要がある。
class AsyncIterable:
def __aiter__(self):
return self
async def __anext__(self):
data = await self.fetch_data()
if data:
return data
else:
raise StopAsyncIteration
async for は非同期イテレータをより簡便に使うための構文。
async for TARGET in ITER:
BLOCK
else:
BLOCK2
意味的には以下と等価で, __anext__() でイテレータを進め, 最後に StopAsyncIteration を捕捉する。
iter = (ITER)
iter = type(iter).__aiter__(iter)
running = True
while running:
try:
TARGET = await type(iter).__anext__(iter)
except StopAsyncIteration:
running = False
else:
BLOCK
else:
BLOCK2
async with
非同期コンテキストマネージャの前にコンテキストマネージャの話をする。
コンテキストマネージャ (context manager) は with (PEP 343) 文の実行時にランタイムコンテキストを定義するオブジェクト。用途としては, リソースに関する環境を制御, 具体的にはリソースのロックとアンロック, ファイルのオープンとクローズなどがある。
with 文はブロックの実行をコンテキストマネージャによって定義されたメソッドでラップするために使われ, try/except/finally によるパターンをカプセル化し表面上のコードから除くことができる。with 文は評価されるとコンテキストマネージャを取得する。
with EXPR as VAR:
BLOCK
非同期コンテキストマネージャ (asynchronous context manager) は __aenter__() と __aexit__() の中で実行を一時停止することができるコンテキストマネージャ。
__aenter__() にはリソースのセットアップなどコンテキストに入る時の処理, __aexit__() にはリソースの解放などコンテキストから抜ける時の処理を実装する。また, 共に awaitable を返す必要がある。
class AsyncContextManager:
async def __aenter__(self):
await log('entering context')
async def __aexit__(self, exc_type, exc, tb):
await log('exiting context')
非同期コンテキストマネージャは async with 内で使える。async with の構文は以下となる。
async with EXPR as VAR:
BLOCK
意味的には以下と等価。
mgr = (EXPR)
aexit = type(mgr).__aexit__
aenter = type(mgr).__aenter__(mgr)
VAR = await aenter
try:
BLOCK
except:
if not await aexit(mgr, *sys.exc_info()):
raise
else:
await aexit(mgr, None, None, None)
以下が具体例。
In [16]: import asyncio
...:
...: async def coro(name, lock):
...: print('coro {}: waiting for lock'.format(name))
...: async with lock:
...: print('coro {}: holding the lock'.format(name))
...: await asyncio.sleep(1)
...: print('coro {}: releasing the lock'.format(name))
...:
In [17]: loop = asyncio.get_event_loop()
In [18]: lock = asyncio.Lock()
In [19]: coros = asyncio.gather(coro(1, lock), coro(2, lock))
In [20]: loop.run_until_complete(coros)
coro 1: waiting for lock
coro 1: holding the lock
coro 2: waiting for lock
coro 1: releasing the lock
coro 2: holding the lock
coro 2: releasing the lock
Out[21]: [None, None]
In [22]: loop.close()
2 は 1 が lock を解放するのを待ち, 解放されたタイミングで 2 は非同期に lock を獲得する。
loop.run_until_complete() にコルーチンを与えることでイベントループが開始される。 asyncio.gather() は与えられたコルーチンオブジェクトの結果を一つにまとめたフューチャを返す。
おわりに
Python の機能を調べるときは結局, 公式ドキュメントや PEP (Python Enhancement Proposal) に辿り着くことが多いです。
[1] タスクとコルーチン
[2] What is the difference between @types.coroutine and @asyncio.coroutine decorators?
[3] Pythonのジェネレータ、コルーチン、ネイティブコルーチン、そしてasync/await
[4] 3.4.4. 非同期コンテキストマネージャ (Asynchronous Context Manager)
[5] 3.3.8. with文とコンテキストマネージャ