Rui:ThreadLibrary:Executor
http://gauche.svn.sourceforge.net/viewvc/gauche/Gauche-threadlib/trunk/concurrent/executor.scm
<thread-pool-executor>はスレッドプールを抽象化したクラスです。java.util.concurrent.ThreadPoolExecutorをGaucheで実装しました。
<thread-pool-executor>のインスタンスはmakeで作成します。インスタンスの作成時に引数を渡してスレッドプールの構成をカスタマイズすることができます。
(make <thread-pool-executor> :core-pool-size core-pool-size :maximum-pool-size maximum-pool-size :keep-alive-time keep-alive-time :work-queue work-queue :rejected-execution-handler rejected-execution-handler)
- スレッド数: core-pool-sizeがスレッドプールのスレッドの最小数、maximum-pool-sizeが最大数を指定します。<thread-pool-executor>のインスタンスが作成されたときはスレッド数は0です。新しいタスクがサブミットされた時、スレッドプールのスレッドの個数がcore-pool-sizeに達していないなら、必ず新しいスレッドが作成されます。core-pool-sizeに達している場合、すべてのスレッドがビジーでかつスレッド数がmaximum-pool-sizeより小さいなら、新規スレッドが作成されます。
- アイドルなスレッドの終了: スレッドがkeep-alive-timeで指定された時間より長くアイドルになっていて、かつcore-pool-sizeより多くのスレッドが動いている場合、そのスレッドは終了します。
- キューの指定: work-queueは、タスクを保持するブロッキングキューです。キューされるタスクの個数に制限をかけたい場合、保持できる要素の個数に限りのあるキューを与えることができます。
- 拒否されたタスクの処理: キューが一杯の場合、新規のタスクは実行されません。代わりに、rejected-execution-handlerで渡した手続きが、タスクをサブミットしようとしたスレッドで呼ばれます。rejected-execution-handlerはタスクとスレッドプールのインスタンスを受け取ります。
タスクのサブミットはexecutor-submitで行います。
(executor-submit executor thunk)
thunkをラップしたfutureオブジェクトが返ります。返り値にfuture-getを呼ぶと、thunkがスレッドプールで実行されたあと、その値が結果として返ります。
スレッドプールの終了はexecutor-shutdown!、終了処理の完了を待つのはexecutor-await-terminationで行います。
(executor-shutdown! executor) (executor-await-termination executor timeout)
スレッドプールを使って簡単なmapreduceを書いてみましょう。タスクの特性によって結果は異なりますが、マルチコアシステムではタスクを分割して並行に実行することで性能が向上することがあります。
(define (simple-mapreduce nthread mapper reducer lis) (let1 executor (make-thread-pool-executor nthread) (define (submit-task data) (executor-submit executor (lambda () (mapper data)))) (begin0 (reducer (map future-get (map submit-task lis))) (executor-shutdown! executor))))
このsimple-mapreduceは、nthread個のスレッドを持つスレッドプールを作成して、lisの各要素についてmapperを呼びます。mapperはスレッドプールで実行されます。最後にreducerが結果をまとめることが期待されています。