抜粋翻訳 PEP 342: Coroutines via Enhanced Generators

概要

このPEPはジェネレータのAPIと構文を改善して、シンプルなコルーチンとして使えるようにすることを提案する。

動機

コルーチンはいろいろなアルゴリズムを表現する自然な方法である。たとえば、シミュレーション、ゲーム、非同期I/O、そしてイベントドリブンプログラミングや協調的マルチタスクの他の形だ。 Pythonのジェネレータ関数はほとんどコルーチンだ、しかし完全にではない、なぜなら値を作って処理を一時停止することは許すが、処理を再開するときに値や例外を渡す方法がないからだ。

実装例

1. ジェネレータ関数が最初に呼ばれたときに自動的に最初のyieldまで処理が進むようにする “consumer” デコレータ:

def consumer(func):
    def wrapper(*args,**kw):
        gen = func(*args, **kw)
        gen.next()
        return gen
    wrapper.__name__ = func.__name__
    wrapper.__dict__ = func.__dict__
    wrapper.__doc__  = func.__doc__
    return wrapper

2. “consumer”デコレータを使って”reverse generator”を作る。これは、複数の画像を受け取ってサムネイルの並んだページを作り、それを他のcomsumerに送る。このような関数は、それぞれは複雑な内部状態を持ちうるような”consumers”を、効率的な処理パイプラインを作るためにつなぎあわせることができる:

@consumer
def thumbnail_pager(pagesize, thumbsize, destination):
    while True:
        page = new_image(pagesize)
        rows, columns = pagesize / thumbsize
        pending = False
        try:
            for row in xrange(rows):
                for column in xrange(columns):
                    thumb = create_thumbnail((yield), thumbsize)
                    page.write(
                        thumb, col*thumbsize.x, row*thumbsize.y
                    )
                    pending = True
        except GeneratorExit:
            # close() was called, so flush any pending output
            if pending:
                destination.send(page)

            # then close the downstream consumer, and exit
            destination.close()
            return
        else:
            # we finished a page full of thumbnails, so send it
            # downstream and keep on looping
            destination.send(page)

@consumer
def jpeg_writer(dirname):
    fileno = 1
    while True:
        filename = os.path.join(dirname,"page%04d.jpg" % fileno)
        write_jpeg((yield), filename)
        fileno += 1


# Put them together to make a function that makes thumbnail
# pages from a list of images and other parameters.
#
def write_thumbnails(pagesize, thumbsize, images, output_dir):
    pipeline = thumbnail_pager(
        pagesize, thumbsize, jpeg_writer(output_dir)
    )

    for image in images:
        pipeline.send(image)

    pipeline.close()

3. シンプルなコルーチンのスケジューラ、またの名を「トランポリン」とも言う。これは呼びたいコルーチンをyieldすることで、コルーチンが他のコルーチンを”call”できるようにする。 yieldされた値がジェネレータではない場合は、そのコルーチンを”call”したコルーチンに返される同様にコルーチンが例外を投げた場合には、その例外は”call”したコルーチンに伝搬する。結果としてこの例は、yield式をルーチンを起動するために使っている限りにおいて、Stackless Pythonで使われているシンプルなタスクレットのように振舞う。さもなくば”block”する。これはとてもシンプルな例に過ぎない。もっと洗練されたスケジューラも可能である (たとえば既存のPython用GTaskletフレームワーク(http://www.gnome.org/~gjc/gtasklet/gtasklets.html) やpeak.eventsフレームワーク(http://peak.telecommunity.com/)はすでによく似た機能を実装している。しかし現在はジェネレータに値や例外を渡す方法がないために奇妙なワークアラウンドを使っている。):

import collections

class Trampoline:
    """Manage communications between coroutines"""

    running = False

    def __init__(self):
        self.queue = collections.deque()

    def add(self, coroutine):
        """Request that a coroutine be executed"""
        self.schedule(coroutine)

    def run(self):
        result = None
        self.running = True
        try:
            while self.running and self.queue:
                func = self.queue.popleft()
                result = func()
            return result
        finally:
            self.running = False

    def stop(self):
        self.running = False

    def schedule(self, coroutine, stack=(), value=None, *exc):
        def resume():
            try:
                if exc:
                    value = coroutine.throw(value,*exc)
                else:
                    value = coroutine.send(value)
            except:
                if stack:
                    # send the error back to the "caller"
                    self.schedule(
                        stack[0], stack[1], *sys.exc_info()
                    )
                else:
                    # Nothing left in this pseudothread to
                    # handle it, let it propagate to the
                    # run loop
                    raise

            if isinstance(value, types.GeneratorType):
                # Yielded to a specific coroutine, push the
                # current one on the stack, and call the new
                # one with no args
                self.schedule(value, (coroutine,stack))

            elif stack:
                # Yielded a result, pop the stack and send the
                # value to the caller
                self.schedule(stack[0], stack[1], value)

            # else: this pseudothread has ended

        self.queue.append(resume)

4. シンプルなechoサーバとトランポリンを使ってそれを走らせるコード (“nonblocking_read”, “nonblocking_write” と、たとえば接続が閉じられたときにはConnectionLostを投げるようなその他のI/Oコルーチンの存在を仮定している):

# coroutine function that echos data back on a connected
# socket
#
def echo_handler(sock):
    while True:
        try:
            data = yield nonblocking_read(sock)
            yield nonblocking_write(sock, data)
        except ConnectionLost:
            pass  # exit normally if connection lost

# coroutine function that listens for connections on a
# socket, and then launches a service "handler" coroutine
# to service the connection
#
def listen_on(trampoline, sock, handler):
    while True:
        # get the next incoming connection
        connected_socket = yield nonblocking_accept(sock)

        # start another coroutine to handle the connection
        trampoline.add( handler(connected_socket) )

# Create a scheduler to manage all our coroutines
t = Trampoline()

# Create a coroutine instance to run the echo_handler on
# incoming connections
#
server = listen_on(
    t, listening_socket("localhost","echo"), echo_handler
)

# Add the coroutine to the scheduler
t.add(server)

# loop forever, accepting connections and servicing them
# "in parallel"
#
t.run()