Gauche:イテレータの反転の汎用化

Gauche:イテレータの反転の汎用化

yamasushi(2013/03/26 04:23:30 UTC)イテレータの反転を汎用的にしてみました。

; イテレータ反転
(define-module inverter
  (use gauche.generator)
  (use util.stream)
  ;
  (use util.queue)
  (use gauche.threads)
  ;
  (export
    generator-inverter
    stream-inverter
    mtgenerator-inverter
    mtstream-inverter
    ;
    pipeline
  )
)
(select-module inverter)

;イテレータを反転してgeneratorにする手続きを返す
; produce     ..... yield手続きを引数にもつ手続き
; consume     ..... ジェネレータを引数にもつ手続き
(define (generator-inverter) 
  (^[produce consume] (consume (generate produce) ) ) )

;イテレータを反転してstreamににする手続きを返す
; produce     ..... yield手続きを引数にもつ手続き
; consume     ..... ジェネレータを引数にもつ手続き
(define (stream-inverter) 
  (^[produce consume]
    (consume (iterator->stream 
               (^[next end]
                 (produce (^x (if (eof-object? x) (end) (next x) ) ) ) ) 
               ) ) ) )

(define (mtqueue->generator mtq)
  (generate (^[yield]
              (let loop [(xc (dequeue/wait! mtq))]
                (if (eof-object? xc)
                  (yield (eof-object))
                  (begin
                    (yield xc)
                    (loop (dequeue/wait! mtq) ) ) ) ) ) )
  )

(define (mtqueue->stream mtq)
  (iterator->stream (^[next end]
                      (let loop [(xc (dequeue/wait! mtq))]
                        (if (eof-object? xc)
                          (end)
                          (begin
                            (next xc)
                            (loop (dequeue/wait! mtq) ) ) ) ) ) )
  )

;イテレータを反転してgeneratorなどににする手続き(スレッドを使用)
; mtquere->x  ..... mtqueureからgeneratorなどにする手続き
; produce     ..... yield手続きを引数にもつ手続き
; consume     ..... ジェネレータを引数にもつ手続き
(define (mt-inverter mtqueue->x produce consume :key (queue-size 100))
  (define (make-producer mtq)
    (^ []
      (guard (e [else 
                  (enqueue/wait! mtq (eof-object)) 
                  (print (standard-error-port) (ref e 'message) ) 
                  (raise e) ] )
        (produce (cut enqueue/wait! mtq <>))
        (enqueue/wait! mtq (eof-object))
        ) ) )

  (define (make-consumer mtq producer-thread)
    (^ []
      (guard (e [else 
                  (print (standard-error-port) (ref e 'message) ) 
                  (thread-terminate! producer-thread)
                  (raise e) ]) 
        (consume (mtqueue->x mtq) ) ) ) )    

  (and-let* [ ( mtq (make-mtqueue :max-length queue-size))
             ( p (make-thread (make-producer mtq ) ) )
             ( c (make-thread (make-consumer mtq p ) ) )
             (tc (thread-start! c))
             (tp (thread-start! p)) ]
      (thread-join! tp)
      (thread-join! tc) ; <----consumerの値を返す 
      ) ) 

;イテレータを反転してgeneratorにする手続きを返す(スレッドを使用)
; produce     ..... yield手続きを引数にもつ手続き
; consume     ..... ジェネレータを引数にもつ手続き
(define (mtgenerator-inverter :key (queue-size 100))
  (^[produce consume]
    (mt-inverter mtqueue->generator produce consume :queue-size queue-size) ) )

;イテレータを反転してstreamににする手続きを返す(スレッドを使用)
; produce     ..... yield手続きを引数にもつ手続き
; consume     ..... ジェネレータを引数にもつ手続き
(define (mtstream-inverter :key (queue-size 100))
  (^[produce consume] 
    (mt-inverter mtqueue->stream produce consume :queue-size queue-size) ) )


; cmd-x .... ( g yield ) : g ... source generator , yield .... output
(define (pipeline inverter cmd-in cmd-out)
  (^[ g yield]
    (inverter
      (^[yield]
        (cmd-in g yield) )
      (^g
        (cmd-out g yield) ) ) ) ) 

これを使い、ポートを「反転」します。

; ポートの反転
(define-module port-inverter
  (use gauche.vport)
  (use gauche.generator)
  (use gauche.uvector)
  ;
  (use inverter)
  (export
    mtport-inverter
    ))
(select-module port-inverter)

; :key
;    queue-size  ..... スレッド同期につかうキューサイズ
;    buffer-size ..... データ転送バッファサイズ
; --->
;    [^ (produce consume) ... ]
;       produce .... 出力ポートを引数にもつ手続き(producer)
;       consume .... 入力ポートを引数にもつ手続き(consumer)
(define (mtport-inverter :key (queue-size 100) (buffer-size 8000) )
  (let1 inverter (mtgenerator-inverter :queue-size queue-size)
    (^[produce consume]
      (inverter
        (^[yield]
          (let1 outp (make <buffered-output-port> 
                           :buffer-size buffer-size
                           :flush (^ (u8v flag)
                                    (yield (u8vector-copy u8v))
                                    (uvector-length u8v) ) )
            (produce outp)
            (yield (eof-object))
            ))
        (^g
          (let1 vp (make <buffered-input-port> 
                         :buffer-size buffer-size
                         :fill (^(u8v-dst)
                                 (let1 u8v-src (g)
                                   (if (eof-object? u8v-src)
                                     0
                                     (let [(ndst (uvector-length u8v-dst))
                                           (nsrc (uvector-length u8v-src ))]
                                       (if (>= ndst nsrc )
                                         (begin 
                                           (u8vector-copy! u8v-dst 0 u8v-src ) 
                                           ;#?= u8v-dst
                                           nsrc )
                                         (begin 
                                           ; ここには来ないはずだが念の為
                                           (error "ndst < nsrc") ) ) ) ) ) ) ) 
            (consume vp)
            ) ) ) ) ) )

すると、http-getをcall-with系のようにアクセスできます。

; http-getのパラメタ指定してhttp-get、procに仮想ポートを渡す。
(define (call-with-input-http-get http-param proc :key (queue-size 100) (buffer-size 8000) )
  (let1 inverter (mtport-inverter :queue-size queue-size :buffer-size buffer-size)
    (inverter
      (^[outp] (apply http-get (append http-param `(:sink ,outp :flusher ,(^ _ (flush outp) #t) ))))
      (^[inp]  (proc inp) ) ) ) )
(define call-with-input-http call-with-input-http-get)

; http-postのパラメタ指定してhttp-post、procに仮想ポートを渡す。
(define (call-with-input-http-post http-param proc :key (queue-size 100) (buffer-size 8000) )
  (let1 inverter (mtport-inverter :queue-size queue-size :buffer-size buffer-size)
    (inverter
      (^[outp] (apply http-post (append http-param `(:sink ,outp :flusher ,(^ _ (flush outp) #t) ))))
      (^[inp]  (proc inp) ) ) ) )

Tags: gauche.generator, util.stream

More ...