Introduction

In the previous post on Go concurrency patterns, we took a look at fan-out with context cancellation. Results were collected in an unordered fashion, but what if we wanted to keep the same order as the input?

For users of the Python language, concurrent.futures.ThreadPoolExecutor is an invaluable tool for asynchronous processing of data. Its map method processes an iterable of values through a function and returns the results in the same order as the input.

We’ll attempt to replicate this behavior through some changes to our previous fan-out example.

If you want more details on the channel mechanics involved, check out the previous post.

Implementation

We’ll tour the biggest changes from the previous example.

Types

type Mappable interface {
	Index() int
	Process() Mappable
}

The Mappable interface keeps the architectural logic separate from the data being processed. A Mappable object needs to expose its Index in the array and a Process method to be applied to it.

type value struct {
	name  string
	index int
}

func (v *value) Index() int {
	return v.index
}

// Process the value and return the result as the same type but copied
func (v *value) Process() Mappable {
	// Simulate work with a random sleep of 1 to 4 seconds.
	time.Sleep(time.Duration(rand.Intn(5-1)+1) * time.Second)

	return &value{name: v.name, index: v.index}
}

Instead of passing only the value for processing, we pass a composite of value and index. This allows the collector to know how to reassemble the slice.

We finish up by implementing the remainder of the Mappable interface.

Processing

inChan        = make(chan Mappable)     // Input values
outChanValues = make(chan Mappable, 10) // Output values

We declare the channels with the Mappable type. Since Mappable is an interface and we've implemented its methods on value with pointer receivers, we'll need to pass pointers through the channels.

Passing a pointer can be more efficient when the structure is large. Care must be taken not to mutate the value during processing.

Since the value structure is small, it would also not be an issue to pass it by value through the channel, as in the sketch below.
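As a minimal sketch (the valueCopy name is an assumption, not part of the original example), here is how a type could satisfy Mappable with value receivers so that copies travel through the channels instead of pointers:

type valueCopy struct {
	name  string
	index int
}

func (v valueCopy) Index() int { return v.index }

// Process returns a fresh copy; the receiver is itself a copy,
// so there is no shared state to mutate.
func (v valueCopy) Process() Mappable {
	return valueCopy{name: v.name, index: v.index}
}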

wg := &sync.WaitGroup{} // Waitgroup for workers
wg.Add(numOfWorkers)

for s := 0; s < numOfWorkers; s++ {
	go fanoutWorker(wg, inChan, s, outChanValues)
}

func fanoutWorker(wg *sync.WaitGroup, inChan <-chan Mappable,
	routineName int, valOut chan<- Mappable) {
	defer wg.Done()

	for name := range inChan {
		valOut <- name.Process()
	}
}

We start a number of fan-out workers consuming from the input channel (inChan).

These workers will process the event as defined by the input value type (simulated here with a sleep).

The Process method returns a Mappable, which is compatible with our output channel. It could even return a type different from value, as long as that type satisfies the Mappable interface.

A possible advantage of using the Mappable interface here is being able to mix multiple types of processable events in the same input slice, as sketched below.
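For illustration, a hypothetical second event type could satisfy Mappable and share the input slice with value (the slowValue name and behavior are assumptions, not part of the original example):

// slowValue is a second event type with a heavier treatment.
type slowValue struct {
	name  string
	index int
}

func (s *slowValue) Index() int { return s.index }

func (s *slowValue) Process() Mappable {
	time.Sleep(10 * time.Second) // Simulate a slower treatment.
	return &slowValue{name: s.name, index: s.index}
}

// Both types can then travel through the same channels:
// inputValues := []Mappable{&value{name: "a", index: 0}, &slowValue{name: "b", index: 1}}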

go func() {
	// Once all input has been processed and all workers have returned, close the output channel
	wg.Wait()
	close(outChanValues)
}()

Each fanoutWorker marks the waitgroup as done once inChan has been closed and its last value processed; the inputData function closes inChan after sending the final input value.
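The inputData function isn't shown in this section, but based on the description above it looks roughly like this (a sketch; the exact signature is an assumption):

// inputData sends every input value on inChan, then closes it so the
// workers' range loops can terminate.
func inputData(inputValues []Mappable, inChan chan<- Mappable) {
	for _, v := range inputValues {
		inChan <- v
	}
	close(inChan)
}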

Collection

outputValues := make([]string, len(inputValues)) // Collected Output values

for v := range outChanValues {
	fmt.Fprintf(os.Stdout, "Success: %v\n", v)
	realValue := v.(*value)
	outputValues[v.Index()] = realValue.name
}

Output values are then collected from the output channel (outChanValues) until the channel is closed, which happens once the waitgroup associated with the workers reports that all of the input has been processed.

The collected value is then asserted back to its concrete type, and the slice is reconstructed in input order using the Index.
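Note that the bare type assertion panics if a different Mappable implementation ever lands on the channel; a comma-ok assertion is a safer variant (a sketch of the same loop body):

realValue, ok := v.(*value)
if !ok {
	continue // Not a *value; skip or handle the other type explicitly.
}
outputValues[v.Index()] = realValue.name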

if Names == strings.Join(outputValues, " ") {
	fmt.Fprintln(os.Stdout, "Order was respected, input order is the same as output.")
} else {
	fmt.Fprintln(os.Stdout, "Order was not respected, input order is not the same as output.")
}

Since, conveniently, the input values in our example are the same as the output values, we can verify that the order was kept by comparing the output slice to the input string.

For more real-life scenarios, implementing a Result interface could provide a better abstraction over the results of the processing, just as the Mappable interface did for the input values.

type Result interface {
	Mappable
	Value() string
}
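As a sketch of how this could look (the Value method below is an assumption, not in the original code), the collector could then stay decoupled from the concrete type:

// Value satisfies the Result interface for our value type.
func (v *value) Value() string {
	return v.name
}

// Collection no longer needs to know about the concrete type:
for v := range outChanValues {
	result := v.(Result) // Assert to the interface instead.
	outputValues[result.Index()] = result.Value()
}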

Full Code

Limitations

Unlike the previous example, which could be adapted to work with a continuous input stream, this pattern relies on the input data being of a fixed size. As such, it is not usually suitable for applications that serve as middleware consuming from message brokers (in an ETL pipeline, for example).

Due to its fixed input and output size, extremely large inputs will need to be batched to avoid running out of memory.
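A minimal sketch of such batching, assuming a processBatch helper that wraps the fan-out and ordered collection above and returns the ordered results for one slice:

const batchSize = 1000 // Assumed size; tune to available memory.

var outputValues []string
for start := 0; start < len(inputValues); start += batchSize {
	end := start + batchSize
	if end > len(inputValues) {
		end = len(inputValues)
	}
	// Each batch keeps its internal order, so appending preserves
	// the overall input order.
	outputValues = append(outputValues, processBatch(inputValues[start:end])...)
}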

Future work

If the processed values need to be handed to another processing stage in order of arrival, we could keep track of the index currently expected and achieve a pipeline-style architecture, as sketched below.
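A minimal sketch of that idea: buffer out-of-order results in a map and emit them strictly by index (error handling and cancellation omitted):

// reorder emits results in index order, enabling a downstream
// pipeline stage to consume them as they complete, in order.
func reorder(in <-chan Mappable, out chan<- Mappable) {
	pending := make(map[int]Mappable)
	next := 0 // Index we are waiting to emit next.

	for v := range in {
		pending[v.Index()] = v
		for {
			buffered, ok := pending[next]
			if !ok {
				break
			}
			out <- buffered
			delete(pending, next)
			next++
		}
	}
	close(out)
}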

Conclusion

Although not as versatile as unordered result collection, a map-style form of processing can be implemented with Go channels, at the cost of input-size flexibility and some extra context passing.