抜粋翻訳 PEP 342: Coroutines via Enhanced Generators
概要
このPEPはジェネレータのAPIと構文を改善して、シンプルなコルーチンとして使えるようにすることを提案する。
動機
コルーチンはいろいろなアルゴリズムを表現する自然な方法である。たとえば、シミュレーション、ゲーム、非同期I/O、そしてイベントドリブンプログラミングや協調的マルチタスクの他の形だ。 Pythonのジェネレータ関数はほとんどコルーチンだ、しかし完全にではない、なぜなら値を作って処理を一時停止することは許すが、処理を再開するときに値や例外を渡す方法がないからだ。
実装例
1. ジェネレータ関数が最初に呼ばれたときに自動的に最初のyieldまで処理が進むようにする “consumer” デコレータ:
def consumer(func): def wrapper(*args,**kw): gen = func(*args, **kw) gen.next() return gen wrapper.__name__ = func.__name__ wrapper.__dict__ = func.__dict__ wrapper.__doc__ = func.__doc__ return wrapper
2. “consumer”デコレータを使って”reverse generator”を作る。これは、複数の画像を受け取ってサムネイルの並んだページを作り、それを他のcomsumerに送る。このような関数は、それぞれは複雑な内部状態を持ちうるような”consumers”を、効率的な処理パイプラインを作るためにつなぎあわせることができる:
@consumer def thumbnail_pager(pagesize, thumbsize, destination): while True: page = new_image(pagesize) rows, columns = pagesize / thumbsize pending = False try: for row in xrange(rows): for column in xrange(columns): thumb = create_thumbnail((yield), thumbsize) page.write( thumb, col*thumbsize.x, row*thumbsize.y ) pending = True except GeneratorExit: # close() was called, so flush any pending output if pending: destination.send(page) # then close the downstream consumer, and exit destination.close() return else: # we finished a page full of thumbnails, so send it # downstream and keep on looping destination.send(page) @consumer def jpeg_writer(dirname): fileno = 1 while True: filename = os.path.join(dirname,"page%04d.jpg" % fileno) write_jpeg((yield), filename) fileno += 1 # Put them together to make a function that makes thumbnail # pages from a list of images and other parameters. # def write_thumbnails(pagesize, thumbsize, images, output_dir): pipeline = thumbnail_pager( pagesize, thumbsize, jpeg_writer(output_dir) ) for image in images: pipeline.send(image) pipeline.close()
3. シンプルなコルーチンのスケジューラ、またの名を「トランポリン」とも言う。これは呼びたいコルーチンをyieldすることで、コルーチンが他のコルーチンを”call”できるようにする。 yieldされた値がジェネレータではない場合は、そのコルーチンを”call”したコルーチンに返される同様にコルーチンが例外を投げた場合には、その例外は”call”したコルーチンに伝搬する。結果としてこの例は、yield式をルーチンを起動するために使っている限りにおいて、Stackless Pythonで使われているシンプルなタスクレットのように振舞う。さもなくば”block”する。これはとてもシンプルな例に過ぎない。もっと洗練されたスケジューラも可能である (たとえば既存のPython用GTaskletフレームワーク(http://www.gnome.org/~gjc/gtasklet/gtasklets.html) やpeak.eventsフレームワーク(http://peak.telecommunity.com/)はすでによく似た機能を実装している。しかし現在はジェネレータに値や例外を渡す方法がないために奇妙なワークアラウンドを使っている。):
import collections class Trampoline: """Manage communications between coroutines""" running = False def __init__(self): self.queue = collections.deque() def add(self, coroutine): """Request that a coroutine be executed""" self.schedule(coroutine) def run(self): result = None self.running = True try: while self.running and self.queue: func = self.queue.popleft() result = func() return result finally: self.running = False def stop(self): self.running = False def schedule(self, coroutine, stack=(), value=None, *exc): def resume(): try: if exc: value = coroutine.throw(value,*exc) else: value = coroutine.send(value) except: if stack: # send the error back to the "caller" self.schedule( stack[0], stack[1], *sys.exc_info() ) else: # Nothing left in this pseudothread to # handle it, let it propagate to the # run loop raise if isinstance(value, types.GeneratorType): # Yielded to a specific coroutine, push the # current one on the stack, and call the new # one with no args self.schedule(value, (coroutine,stack)) elif stack: # Yielded a result, pop the stack and send the # value to the caller self.schedule(stack[0], stack[1], value) # else: this pseudothread has ended self.queue.append(resume)
4. シンプルなechoサーバとトランポリンを使ってそれを走らせるコード (“nonblocking_read”, “nonblocking_write” と、たとえば接続が閉じられたときにはConnectionLostを投げるようなその他のI/Oコルーチンの存在を仮定している):
# coroutine function that echos data back on a connected # socket # def echo_handler(sock): while True: try: data = yield nonblocking_read(sock) yield nonblocking_write(sock, data) except ConnectionLost: pass # exit normally if connection lost # coroutine function that listens for connections on a # socket, and then launches a service "handler" coroutine # to service the connection # def listen_on(trampoline, sock, handler): while True: # get the next incoming connection connected_socket = yield nonblocking_accept(sock) # start another coroutine to handle the connection trampoline.add( handler(connected_socket) ) # Create a scheduler to manage all our coroutines t = Trampoline() # Create a coroutine instance to run the echo_handler on # incoming connections # server = listen_on( t, listening_socket("localhost","echo"), echo_handler ) # Add the coroutine to the scheduler t.add(server) # loop forever, accepting connections and servicing them # "in parallel" # t.run()