【yield】Python のジェネレータとコルーチン

今回は Python のジェネレータとコルーチンについて調べたので備忘録として残しておきます。本記事の内容は以下です。

実行環境は macOS 10.13.6, Python 3.5.1 です。

ジェネレータ

反復はデータ処理の基本となるが, メモリに収まりきらない規模のデータをスキャンする場合, 要素を怠惰に1つずつ取ってくる方法が必要となる。怠惰な実装では値の生成をギリギリまで引き伸ばすことでメモリを節約できる。これを Iterator パターンと言う。

Iterator パターンを抽象化するには Python 2.2 で追加された yield キーワードを使いジェネレータを構築する。yield には「生成する」, 「譲る」という2つの意味がある。イテレータがコレクションから要素を取得するのに対し, ジェネレータは要素を生成する。

ジェネレータはイテレータとして動作する。Python インタプリタはオブジェクト x に対して反復処理するときは組み込み関数 iter(x) を呼びイテレータを取得する。メソッド __iter__ を実装しイテレータを返すオブジェクトはイテラブルとなる。Python のコレクションはすべてイテラブルである。

呼び出されるとジェネレータオブジェクトを返す関数をジェネレータ関数 (PEP 255) と言う。yield キーワードを持つ Python の関数はすべてジェネレータ関数となる。

ジェネレータ関数を呼び出し, ジェネレータオブジェクト g を取得する。

In [1]: def gen_123():
   ...:     yield 1
   ...:     yield 2
   ...:     yield 3
   ...:

In [2]: g = gen_123()

In [3]: next(g)
Out[3]: 1

In [4]: next(g)
Out[4]: 2

In [5]: next(g)
Out[5]: 3

In [6]: next(g)
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-114-5f315c5de15b> in <module>()
----> 1 next(g)

StopIteration:

ジェネレータオブジェクト g に対して next() を呼ぶと gen_123() 内の次の yield まで処理が進む。ジェネレータ関数の末尾に到達すると Iterator プロトコルに従って StopIteration を上げる。

次はジェネレータを使ってフィボナッチ数列を計算する。

In [7]: def fibonacci():
   ...:     a, b = 0, 1
   ...:     while True:
   ...:         yield a
   ...:         a, b = b, a + b
   ...:

In [8]: fibonacci()
Out[8]: <generator object fibonacci at 0x1056e07d8>

In [9]: f = fibonacci()

In [10]: f
Out[10]: <generator object fibonacci at 0x1056e0bf8>

In [11]: next(f)
Out[11]: 0

In [12]: next(f)
Out[12]: 1

In [13]: next(f)
Out[13]: 1

In [14]: next(f)
Out[14]: 2

In [15]: next(f)
Out[15]: 3

In [16]: next(f)
Out[16]: 5

In [17]: next(f)
Out[17]: 8

コルーチン

Python 2.5 以降 yield キーワードを式で利用できるようになった。send() を使いジェネレータ関数内の yield 式の値となるデータを送信できる。
これにより, ジェネレータと呼び出し元の間で値を送受することで協調動作するプロシージャとしてのコルーチンとなる。(PEP 342)

In [18]: def simple_coroutine():
   ....:     print('-> coroutine started')
   ....:     x = yield
   ....:     print('-> coroutine received', x)
   ....:

In [19]: coro = simple_coroutine()

In [20]: coro
Out[20]: <generator object simple_coroutine at 0x1056e0ba0>

In [21]: next(coro)
-> coroutine started

In [22]: coro.send(42)
-> coroutine received 42
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-5-a195864a2426> in <module>()
----> 1 coro.send(42)

StopIteration:

ジェネレータオブジェクト coro を取得した時点では yield で待機していないので send() でデータを送信することができない。そのため, next() を呼び出しジェネレータを開始させる。次に send() で 42 という値を送信し, コルーチンにある yield は 42 と評価される。send() も next() 同様に次の yield までジェネレータを進める。コルーチンの末尾に到達すると StopIteration を上げる。

コルーチンの状態

inspect は活動中のオブジェクトの情報を取得する標準ライブラリ。ジェネレータおよびコルーチンは以下のいずれかの状態を取る。

  • GEN_CREATED: 実行開始を待機
  • GEN_RUNNING: 現在実行中
  • GEN_SUSPENDED: yield 式で現在一時停止
  • GEN_CLOSED: 実行完了

inspect.getgeneratorstate() はジェネレータおよびコルーチンの現在の状態を返す。

In [23]: from inspect import getgeneratorstate

In [24]: def simple_coroutine2(a):
   ....:     print('-> coroutine started: a=', a)
   ....:     b = yield a
   ....:     print('-> coroutine received: b=', b)
   ....:     c = yield a + b
   ....:     print('-> coroutine received: c=', c)
   ....:

In [25]: coro2 = simple_coroutine2(14)

In [26]: getgeneratorstate(coro2)
Out[26]: 'GEN_CREATED'

In [27]: next(coro2)
-> coroutine started: a= 14
Out[27]: 14

In [28]: getgeneratorstate(coro2)
Out[28]: 'GEN_SUSPENDED'

In [29]: coro2.send(28)
-> coroutine received: b= 28
Out[29]: 42

In [30]: getgeneratorstate(coro2)
Out[30]: 'GEN_SUSPENDED'

In [31]: coro2.send(99)
-> coroutine received: c= 99
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-39-770f78303c0d> in ()
----> 1 coro2.send(99)

StopIteration:

In [32]: getgeneratorstate(coro2)
Out[32]: 'GEN_CLOSED'

ジェネレータオブジェクトを生成した時点では GEN_CREATED の状態で next() を呼び出すと GEN_SUSPENDED となり, コルーチンの末尾に到達すると GEN_CLOSED となることが確認できた。

平均値を計算するコルーチン

その時点の平均値を計算するコルーチン averager() を書く。

In [33]: def averager():
   ....:     total = 0.0
   ....:     count = 0
   ....:     average = None
   ....:     while True:
   ....:         term = yield average
   ....:         total += term
   ....:         count += 1
   ....:         average = total/count
   ....:

In [34]: avg = averager()

In [35]: next(avg)

In [36]: avg.send(10)
Out[36]: 10.0

In [37]: avg.send(30)
Out[37]: 20.0

In [38]: avg.send(5)
Out[38]: 15.0

上記は while-loop から抜ける術がないのでコルーチンは実行完了状態 (GEN_CLOSED) とならない。
そこで, averager() を最終的な結果を取得できるように, データのカウントと平均値を返すように変更する。averager() に None を送信すると while-loop から抜け StopIteration を上げるが, この例外を捕捉してコルーチンが返す値を取得する。

In [39]: def averager():
   ....:         total = 0.0
   ....:         count = 0
   ....:         average = None
   ....:         while True:
   ....:                 term = yield
   ....:                 if term is None:
   ....:                         break
   ....:                 total += term
   ....:                 count += 1
   ....:                 average = total/count
   ....:         return (count, average)
   ....:

In [40]: avg2 = averager()

In [41]: next(avg2)

In [42]: avg2.send(10)

In [43]: avg2.send(30)

In [44]: avg2.send(5)

In [45]: try:
   ....:     avg2.send(None)
   ....: except StopIteration as e:
   ....:     result = e.value
   ....:

In [46]: result
Out[46]: (3, 15.0)

throw() と close()

Python 2.5 以降のジェネレータオブジェクトは呼び出し元 (caller) から明示的に例外を送信する throw() と close() の2つのメソッドが使える。

例として, DemoException という例外クラスと, この例外をハンドリングするコルーチンを実装する。

In [47]: class DemoException(Exception):
   ....:     """ An Exception type for the demonstration."""
   ....:

In [48]: def demo_exc_handling():
   ....:     print('-> coroutine started')
   ....:     while True:
   ....:         try:
   ....:             x = yield
   ....:         except DemoException:
   ....:             print('DemoException handled.')
   ....:         else:
   ....:             print('-> coroutine started: {!r}'.format(x))
   ....:     raise RuntimeError('This line should never run.')
   ....:

まずは close() を使い実行完了状態とする例。

In [49]: coro3 = demo_exc_handling()

In [50]: next(coro3)
-> coroutine started

In [51]: coro3.send(11)
-> coroutine started: 11

In [52]: coro3.send(22)
-> coroutine started: 22

In [53]: coro3.close()

In [54]: getgeneratorstate(coro3)
Out[54]: 'GEN_CLOSED'

次に throw() を使い DemoException を投げ捕捉する例。 while-loop からは抜けないため一時停止状態である。

In [55]: coro4 = demo_exc_handling()

In [56]: next(coro4)
-> coroutine started

In [57]: coro4.send(11)
-> coroutine started: 11

In [58]: coro4.throw(DemoException)
DemoException handled.

In [59]: getgeneratorstate(coro4)
Out[59]: 'GEN_SUSPENDED'

最後に ZeroDivisionError を投げる例。この例外はハンドリングされず RuntimeError を上げるためコルーチンは実行完了状態となる。

In [60]: coro5 = demo_exc_handling()

In [61]: next(coro5)
-> coroutine started

In [62]: coro5.send(11)
-> coroutine started: 11

In [63]: coro5.throw(ZeroDivisionError)
---------------------------------------------------------------------------
ZeroDivisionError                         Traceback (most recent call last)
<ipython-input-65-2670fe693047> in <module>()
----> 1 coro5.throw(ZeroDivisionError)

 in demo_exc_handling()
      3     while True:
      4         try:
----> 5             x = yield
      6         except DemoException:
      7             print('DemoException handled.')

ZeroDivisionError:

In [64]: getgeneratorstate(coro5)
Out[64]: 'GEN_CLOSED'

yield from

yield from (PEP 380) は Python 3.3 で追加されたコルーチンの拡張。

  • デリゲーションジェネレータ (delegating generator): yield from <iterable> 式を含むジェネレータ関数
  • サブジェネレータ (subgenerator): yield from 式の <iterable> の部分から取得したジェネレータ

最も外側に位置する呼び出し元 (caller) から最も内側にあるサブジェネレータに対して双方向チャネルを開き, 直接的に値を送信したり返したりできる。yield from に似た構造は他の言語では await と呼ばれることがある。

例として, key が名前, value が list of floats のデータで構成される dict から, key ごとのデータのカウントと平均値を yield from を使い求める。

In [65]: data = {
    'girls;kg':
        [40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
    'girls;m':
        [1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
    'boys;kg':
        [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
    'boys;m':
        [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}

以下の grouper() はパイプとして機能するデリゲーションジェネレータである。デリゲーションジェネレータは yield from を使ってサブジェネレータを呼び出す。(そのサブジェネレータがデリゲーションジェネレータとなり連結することもできる。)

In [66]: def grouper(results, key):
   ....:     while True:
   ....:          results[key] = yield from averager()
   ....:

yield from x は任意のイテラブルであるオブジェクト x に対して iter(x) を呼び出し, イテレータを取得する。averager コルーチンの実装は先ほどと同じだが, ここではサブジェネレータとして使われる。

In [67]: results = {}

In [68]: for key, values in data.items():
   ....:     group = grouper(results, key)
   ....:     next(group)
   ....:     for value in values:
   ....:          group.send(value)
   ....:     group.send(None)
   ....:

In [69]: results
Out[69]:
{'boys;kg': (9, 40.422222222222224),
 'boys;m': (9, 1.3888888888888888),
 'girls;kg': (10, 42.040000000000006),
 'girls;m': (10, 1.4279999999999997)}

group は grouper を呼び出すことで得られるジェネレータオブジェクトでコルーチンとして動作する。next(group) を呼ぶとサブジェネレータの averager を呼び出し yield from により一時停止する。group.send(value) で grouper に値を送信するとサブジェネレータの averager の term = yield に行き着く。data の key ごとに全ての値を送信後, group.send(None) で grouper に None を送信することで, その時点の averager をひとつ終了させ一時停止状態 (GEN_SUSPENDED) となり grouper は実行を再開できる状態となる。サブジェネレータ averager が終了しないとデリゲーションジェネレータは yield from で一時停止したままとなる。

このように, yield from は内側の for-loop を置き換える以外にも, 内側のジェネレータを外側のジェネレータの呼び出し元に直接結びつけるチャネルを構築できる。

おわりに

参考書籍は『Fluent Python ―Pythonicな思考とコーディング手法 』(14章 イテラブル、イテレータ、ジェネレータ, 16章 コルーチン) です。

次回は Python 3.5 で追加された async/await キーワード (PEP 492) について紹介する予定です。