Introduction

The fan-out pattern is particularly useful when having to process a batch of data that is not interdependent. Go’s concurrency model lends itself to this pattern very well.

Multiple functions can read from the same channel until that channel is closed; this is called fan-out. This provides a way to distribute work amongst a group of workers to parallelize CPU use and I/O. A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that’s closed when all the inputs are closed. This is called fan-in.

Sameer Ajmani1

What we’re implementing in this post doesn’t respect the last part of the fan-in pattern “by multiplexing the input channels onto a single channel” as it relies on a single channel already on the workers for result collection.

This should not be an issue when the collection phase is very fast and should prove simpler in terms of channel management.

Specifications

Here are some specifications for the implementation:

  1. The task itself is a syscall to a system command that can be long.
  2. If interrupted by signals it should gracefully exit and properly abort current calls.
  3. Errors should not interrupt other workers but accumulate for post-treatment.
  4. The tasks can be memory heavy -> goroutines should be limited.

Implementation

We’ll tour the main function for each component.

25
26
numOfWorkers := runtime.NumCPU()
fmt.Fprintln(os.Stdout, "Running on", numOfWorkers, "goroutines.")

First we define the number of workers for our fan-out, runtime.NumCPU() allows us to retrieve the current amount of threads in the system (this includes hyperthreading).

29
30
31
32
33
34
35
36
37
38
var (
  ctx, cancel   = context.WithCancel(context.Background())
  inChan        = make(chan string)                        // Input values
  outChanValues = make(chan string)                        // Output values
  outChanErrors = make(chan error)                         // Output errors
  succeeded     = []string{}                               // Collected Output values
  failed        = []string{}                               // Collected Input values that failed
)

defer cancel()

We declare the variables we’ll be using. Some optimizations here could include making the output channel buffered so it doesn’t lock the workers that finish tasks at the same time

Ex: outChanValues = make(chan string, 10)

The size of this buffer depends on machine memory available and size of the values outputted. To avoid locking a small number should be enough as the collector is orders of magnitude faster than the producer.

Finally we defer the cancel func of the context for cleanup purposes.

40
41
42
43
44
45
go func() {
  sigterm := make(chan os.Signal, 1)
  signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
  <-sigterm
  cancel()
}()

This goroutine overrides SIGINT and SIGTERM, proceeding to them lock until one of these signals are triggered (by a Ctrl+C possibly).

If one of these signals is triggered the context cancel is run. Sending a signal (context cancel) to the workers to shut down.

48
49
50
51
52
53
wg := &sync.WaitGroup{} // Waitgroup for workers
wg.Add(numOfWorkers)

for s := 0; s < numOfWorkers; s++ {
  go fanoutWorker(ctx, wg, inChan, s, outChanValues, outChanErrors)
}

We create a wait group for the workers to make sure we only close their output channel once all workers have returned. We then proceed to launch these workers.

105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
func fanoutWorker(ctx context.Context, wg *sync.WaitGroup, inChan <-chan string,
	routineName int, valOut chan<- string, errOut chan<- error) {
	defer wg.Done()

	for {
		select {
		case <-ctx.Done(): // Signal handling
			return
		case name, ok := <-inChan: // Process input
			if !ok {
				return
			}

			// Random sleep between 1-5s handled as syscalls.
			cmd := exec.CommandContext(ctx, "sleep", fmt.Sprint(rand.Intn(5-1)+1))
			if err := cmd.Run(); err == nil {
				valOut <- name
			} else {
				errOut <- &ErrProcessing{event: name, inner: err}
			}
		}
	}
}

The worker’s anatomy is simple. One input channel and two output channels with a possible context canceling.

The workers revolve in a for loop, at each time checking if the context is done, whereby the worker returns and signals the wait group it is done, or tries to consume the input channel.

113
114
115
116
case name, ok := <-inChan:
  if !ok {
    return
  }

If the input channel is empty and closed the channel will return ok=false, signaling there is no more input to be consumed and therefore triggering a return and wait group done().

56
57
// Input data
go inputData(inChan)
 97
 98
 99
100
101
102
103
104
// Insert data into the input channel and signal it's done
func inputData(inChan chan<- string) {
	for _, v := range strings.Fields(Names) {
		inChan <- v
	}

	close(inChan)
}

This code simulates the population of the input channel. In this example’s case we use strings.Fields to separate a large sentence by whitespace. This means our channel will be composed of a queue of individual words. (ex: [“dynasty”, “regret”, “appalling”, “creative”, “accessories”])

At the end we close the input channel. The workers will remain consuming until the input channel is closed and empty.

This snippet can be executed before or after launching the worker routines as the workers will loop waiting for either a context signal or an input value.

58
59
60
61
62
go func() {
  wg.Wait()
  close(outChanValues)
  close(outChanErrors)
}()

We start another go routine that will unblock once all the workers have returned (once the input channel is empty and closed). After this routine is unblocked the output channels are closed.

65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
for {
    select {
    case value, ok := <-outChanValues:
        if ok {
        fmt.Fprintf(os.Stdout, "Success: %s\n", value)
        succeeded = append(succeeded, value)
        } else {
            outChanValues = nil
        }
    case err, ok := <-outChanErrors:
        if ok {
            var errP *ErrProcessing
            if errors.As(err, &errP) {
                failed = append(failed, errP.event)
            } else {
                fmt.Fprintln(os.Stderr, "unhandled error:", err)
            }
        } else {
            outChanErrors = nil
        }
    }

    if outChanValues == nil && outChanErrors == nil {
        break
    }
}

On the main thread we enter a for loop that will cycle between the two output channels until they are both empty and closed.

It is not necessary to have a wg.Wait() in the main thread since the main thread is locked in the loop until the channels are closed by the go routine with the wg.Wait().

Implementation limits

There are several issues this pattern may face depending on the size of the input and output and the speed of the task compared to the collection. Is the content enough to

Large input data

On a large input data, the coalescing done by this snippet is not optimal. Specially if the tasks generate and send large structures.

In that case you’d want to do any final post treatment on the spot (loading to DB, output to file or stdout) and not gather the results.

Large output data

Be aware that if the post treatment is heavy it will stop workers that are done from sending their treated result as the channel will always be occupied.

If the size of the output values is predictable a buffered channel can be used according to the capacity of the instance outChanValues = make(chan string, 10).

Full Code

Conclusion

It is very fast to get a fan-out & collector pattern working in Go and it is a useful pattern for processing input values in tasks that are generally long serially but not interdependent.

For python users, this is a very similar pattern to ThreadPoolExecutor with submit and collecting through concurrent.futures.as_completed.


  1. Go blog post on go concurrency patterns, 13 March 2014 ↩︎