For Development HEAD DRAFTSearch (procedure/syntax/module):

12.9 control.plumbing - Plumbing ports

Module: control.plumbing

This module provides utilities to use ports as communication channles between threads. Although some use cases don’t involve threads, the general mechanism implicitly use threads. For example, a pump connects an input port and an output port, making the data available from the input port to be written to the output port. It spawns a thread to read from the given input.

With this module, you can construct a system that takes data from ports, and delivers it to other ports. It is like you construct plumbing system with multiple inlets and outlets; hence we named it plumbing.

See make-plumbing below for the complete specification of plumbing.

Plumbing system

Function: make-plumbing

{control.plumbing} Creates and returns a new plumbing.

To function, a plumbing need to have one or more inlets and one or more outlets. Data flowing in from inlets will flow out to outlets.

Each inlet/outlet port can be either an input port or an output port; so we’ll have the following four cases:

Inlet input port

An input port that provides data from a producer. A dedicated thread is spanwed to read and handle the data from this port.

Inlet output port

An output port, to which a producer can write data. Data written to this port will be available to outlet ports.

Outlet input port

An input port from which a consumer can read data. It has an internal buffer to keep data flowed in from inlets.

Outlet output port

An output port to which the data will be written.

All read/write operations are mutexed, so you can use a plumbing as a communication channel between threads.

The following example connects subprocess’s stdout and stderr into the current output port. Both output of the subprocess is automatically read from the pipe and sent to the current output port as produced.

(use gauche.process)
(use control.plumbing)

(let ([p (run-process '(ping "127.0.0.1") :output :pipe :error :pipe)]
      [plumbing (make-plumbing)])
  (add-inlet-input-port! plumbing (process-output p))
  (add-inlet-input-port! plumbing (process-error p))
  (add-outlet-output-port! plumbing (current-output-port)))

For typical plumbing configurations, there are utility procedures defined so that you don’t need to connect pots manually; see “Preconfigured plumbings” section below.

Function: plumbing? obj

{control.plumbing} Returns #t if obj is a plumbing, #f if not.

Function: plumbing-inlet-ports plumbing
Function: plumbing-outlet-ports plumbing

{control.plumbing} Returns a list of ports connected to the plumbing’s inlets and outlets, respectively.

Function: plumbing-get-port plumbing name

{control.plumbing} The name argument must be a symbol. Returns a port connected to plumbing and with name. Returns #f if there’s no such ports.

Function: port-plumbing port

{control.plumbing} A port opened for a plumbing with open-outlet-input-port, open-inlet-output-port, or other utility procedures, keeps which plumbing it belongs to. This procedure returns the plumbing associated with port. If port is not made from a plumbing, #f is returned.

Note that an existing port added to a plumbing by add-inlet-output-port! etc. does not keep the plumbing it is attached to, and this procedure returns #f on such a port.

Creating and attaching ports to plumbing

Function: open-inlet-output-port plumbing :optional name

{control.plumbing} Creates an output port connected to plumbing’s inlet; that is, the data written to the port will flow to plumbing’s outlets.

The optional name should be a symbol or #f. If it is a symbol, the port will be retrieved from plumbing with plumbing-get-port.

Applying port-plumbing on the returned port returns plumbing.

Once you close the returned port, the port is removed from plumbing. Once all inlet ports are closed, the outlet ports of plumbing get EOF.

Function: add-inlet-input-port! plumbing iport :optional name

{control.plumbing} Connects an existing input port iport to plumbing’s inlet. A thread in plumbing reads from iport and feeds the data to its outlets.

The optional name should be a symbol or #f. If it is a symbol, the port will be retrieved from plumbing with plumbing-get-port.

Once EOF is read from iport, it is removed from plumbing. Once all inlet ports are closed, the outlet ports of plumbing get EOF.

Function: open-outlet-input-port plumbing :optional name

{control.plumbing} Creates an input port connected to plumbing’s outlet; that is, from that port you can read the data fed to plumbing’s inlets.

The optional name should be a symbol or #f. If it is a symbol, the port will be retrieved from plumbing with plumbing-get-port.

Applying port-plumbing on the returned port returns plumbing.

Once all inlets are closed, the returned port yields EOF. You can close the returned port at any time; then the port is removed from plumbing’s outlets and no data will be delivered to it.

Function: add-outlet-output-port! plumbing oport :optional name :key close-on-eof asynchronous

{control.plumbing} Connects an existing output port oport to plumbing’s outlet. Data flowed in from plumbing’s inlet will be written to oport.

The optional name should be a symbol or #f. If it is a symbol, the port will be retrieved from plumbing with plumbing-get-port.

You can close the returned port at any time; then the port is removed from plumbing’s outlets and no data will be delivered to it.

By default, oport is left as is after all inlets are closed; plumbing won’t write anything to oport but you can still write to it. If you specify a true value to the close-on-eof keyword argument, however, plumbing will close oport when all of its inlets are closed. It is useful, for example, when oport is a file output port and you want to close it once plumbing is done. However, if oport is a standard output port for logging purpose, you don’t want it to be automatically closed.

If a true value is given to the asynchronous keyword argument, a thread is spawn to feed the data to oport; it is useful if oport can be blocked, e.g. an OS pipe or a socket. Without asynchronous, blocked output to oport causes data flow to inlets to be blocked as well. With asynchronous, other threads can keep feeding to inlets.

Function: plumbing spec …

{control.plumbing} A utility plumbing constructor. It creates a new plumbing, and adds inlets and outlets as specified by specs, then returns the plumbing. Each spec must be one of the following forms. In the description, iport is an input port, oport is an output port, and name is a symbol.

(< iport)
(< iport name)

Adds iport to the plumbing’s inlet, optionally naming it with name.

(< name)

Creates an inlet output port with name name. The output port can be retrieved by plumbing-get-port.

(> oport)
(> oport name)
(> oport (option …))
(> oport name (option …))

Adds oport to the plumbing’s outlet, optional naming it with name. Options can be one of the keywords :close-on-eof, :coe, asynchronous, and :async. :close-on-eof and asynchronous has the effect of giving true to those keyword arguments to add-outlet-output-port!. :coe and :async are synonyms of :close-on-eof and :asynchronous, respectively.

(> name)

Creates an outlet input port with name name. The output port can be retrieved by plumbing-get-port.

Preconfigured plumbings

The following are convenience procedures to construct typical plumbings.

Function: make-pipe :key num-inlets num-outlets

{control.plumbing} A pipe is a passive plumbing device that has inlet output port(s) and outlet input port(s). Data written to the inlet output ports will be buffered and available to read from the outlet input ports. This procedure creates a pipe with num-inlets inlet output ports and num-outlets outlet input ports, and returns two values, a list of inlet ports and a list of outlet ports.

Both num-inlets and num-outlets should be a positive exact integer. If omitted, 1 is assumed.

If there are more than one outlets, data is duplicated; each consumer reads the same data. Buffers are attached to each outlet, so each consumer can read the outlet in its own pace.

The created plumbing can be obtained by applying port-plumbing to any of the returned ports.

This procedure is similar to sys-pipe, which creates a pair of ports using OS’s pipe mechanism (see Other file operations). However, the pipe device created by make-pipe is a user-land construct and only for inter-thread communication; it can’t be used for inter-process communication. On the other hand, you can have n-in m-out pipe using make-pipe.

Function: make-pump inlet-iports outlet-oports

{control.plumbing} A pump is an active plumbing device that reads data from the given inlet input ports, and write it out to the given outlet output ports.

The inlet-iports argument must be a list of input ports, and outlet-oports argument must be a list of output ports.

A dedicated thread is spawn to each inlet input port to read from it.

Function: open-broadcast-output-port oport …

{control.plumbing} All the arguments must be output ports. Returns an output port such that data written to it go to every oport. Internally, it creates a plumbing, adds oports to its outlet, and returns a new inlet output port. The plumbing can be retrieved by calling port-plumbing on the returned port.

This is Common Lisp’s make-broadcast-stream.

Function: open-tapping-input-port inlet-iport outlet-oport :key close-on-eof

{control.plumbing} It creates a pump, a plumbing that reads from inlet-iport and writes out the read data to outlet-oport. Additionally, it creates and returns an outlet input port, from which you can read the data flowing in the pump.

An example use case is to monitor process output. You spawn a process with its stdout connected to a pipe, then call (open-tapping-input-port (process-output subprocess) (current-output-port)). With this, the subprocess’s output automatically flows to your process’s output. However, that data is also available from the returned input port, and you can check it.

Note that the data passed from inlet-iport to outlet-oport are buffered for the returned input port until you read from it. If you’re done with monitoring, you can close the returned input port safely; the data will no longer be buffered, but the pump will keep running until the input reaches EOF.

The pump plumbing is available by calling port-plumbing on the returned input port.



For Development HEAD DRAFTSearch (procedure/syntax/module):
DRAFT