Go: Asynchronous processing with ordered results
- Jo Guerreiro
Introduction
In the previous post on Go concurrency patterns we took a look at fan-out with context cancellation. Results were collected in an unordered fashion, but what if we wanted to keep the same order as the input?
For users of the Python language, concurrent.futures.ThreadPoolExecutor is an invaluable tool for asynchronous processing of data. Its map method lets you run an iterable of values through a function and collect the results in the same order as the input.
We'll attempt to replicate this behavior through some changes to our previous fan-out example.
If you want more details on the channel mechanics involved, check out the previous post.
Implementation
We'll tour the biggest changes from the previous example.
Types
type Mappable interface {
Index() int
Process() Mappable
}
The Mappable interface lets us keep the orchestration logic separate from the data being processed. A Mappable object needs to expose its Index in the input array and a Process method to be applied to it.
type value struct {
name string
index int
}
func (v *value) Index() int {
return v.index
}
// Process the value and return the result as the same type but copied
func (v *value) Process() Mappable {
// Simulate work with a random sleep between 1 and 4 seconds.
time.Sleep(time.Duration(rand.Intn(5-1)+1) * time.Second)
return &value{name: v.name, index: v.index}
}
Instead of passing only the value for processing, we pass a composite of value and index. This allows the collector to know how to reassemble the slice in order.
We finish up by implementing the remainder of the Mappable interface.
Processing
inChan = make(chan Mappable) // Input values
outChanValues = make(chan Mappable, 10) // Output values
We declare the channels with the Mappable type. In this case, since Mappable is an interface and we've implemented our methods on value with pointer receivers, we'll need to send pointers to value through the channels.
Passing a pointer can be more efficient when the structure is larger, but care must be taken not to mutate the value while it is being processed.
Since the value structure is small, it would not be an issue to pass it by value through the channel, provided its methods were implemented with value receivers.
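As an illustration of that alternative (the valueByCopy name is ours, not part of the original example), here is a minimal sketch of the same type implemented with value receivers, which satisfies Mappable without a pointer and can therefore be sent through the channels by value:
// Sketch only: same data as value, but with value receivers so the
// struct itself (not a pointer) satisfies Mappable.
type valueByCopy struct {
    name  string
    index int
}

func (v valueByCopy) Index() int { return v.index }

func (v valueByCopy) Process() Mappable {
    // Simulate work, as in the pointer-receiver version.
    time.Sleep(time.Duration(rand.Intn(4)+1) * time.Second)
    return valueByCopy{name: v.name, index: v.index}
}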
wg := &sync.WaitGroup{} // Waitgroup for workers
wg.Add(numOfWorkers)
for s := 0; s < numOfWorkers; s++ {
go fanoutWorker(wg, inChan, s, outChanValues)
}
func fanoutWorker(wg *sync.WaitGroup, inChan <-chan Mappable,
routineName int, valOut chan<- Mappable) {
defer wg.Done()
for name := range inChan {
valOut <- name.Process()
}
}
We start a number of fan-out workers consuming from the input channel (inChan). These workers will process the event as defined by the input value type (simulated here with a sleep).
The return of the Process method is a Mappable, which is compatible with our output channel. The return of value's Process method could even be of a different type than value, as long as it satisfies the Mappable interface.
A possible advantage of using the Mappable interface here is being able to mix multiple types of processable events in the same input slice, as sketched below.
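As a sketch of that idea (the urlCheck type is hypothetical and not part of the original example), a second event type can implement Mappable and share the same input slice and channels as value:
// Hypothetical second event type; anything implementing Mappable can
// flow through the same channels as value.
type urlCheck struct {
    url   string
    index int
}

func (u *urlCheck) Index() int { return u.index }

func (u *urlCheck) Process() Mappable {
    // A real implementation might perform an HTTP check here;
    // simulated with a short sleep.
    time.Sleep(time.Second)
    return &urlCheck{url: u.url, index: u.index}
}

// Mixed work items in a single slice.
var work = []Mappable{
    &value{name: "a", index: 0},
    &urlCheck{url: "https://example.com", index: 1},
}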
go func() {
// Once input data is treated and all workers have returned, close the output channel
wg.Wait()
close(outChanValues)
}()
The fanoutWorkers will mark the waitgroup as done once inChan has been drained and the last value processed, since the inputData function closes inChan after sending the last input value.
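The inputData function itself is not shown in these excerpts; a minimal sketch of what it could look like, assuming the input is a slice of strings:
// Sketch of the input feeder: send every value on inChan, then close the
// channel so the workers' range loops terminate.
func inputData(names []string, inChan chan<- Mappable) {
    go func() {
        for i, n := range names {
            inChan <- &value{name: n, index: i}
        }
        close(inChan)
    }()
}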
Collection
outputValues := make([]string, len(inputValues)) // Collected Output values
for v := range outChanValues {
fmt.Fprintf(os.Stdout, "Success: %v\n", v)
realValue := v.(*value)
outputValues[v.Index()] = realValue.name
}
Output values are then collected from the output channel (outChanValues) until the channel is closed, which happens once the waitgroup associated with the workers is done, i.e. all of the input has been processed.
Each collected value is then asserted back to its concrete type and placed into the output slice at its original index, reconstructing the input order.
if Names == strings.Join(outputValues, " ") {
fmt.Fprintln(os.Stdout, "Order was respected, input order is the same as output.")
} else {
fmt.Fprintln(os.Stdout, "Order was not respected, input order is not the same as output.")
}
Since, conveniently, the input values in our example are the same as the output values, we can verify that the order was kept by comparing the joined output slice to the input string.
For more real-life scenarios, implementing a Result interface could provide a better abstraction over the results of the processing, much like the Mappable interface did for the input values.
type Result interface {
Mappable
Value() string
}
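As a sketch of how that could look (the Value method on value is ours, for illustration), the existing value type only needs one extra method, and the collection loop no longer has to know the concrete type:
// Satisfy the Result interface on the existing value type.
func (v *value) Value() string {
    return v.name
}

// Collection can then assert to the interface instead of the struct.
for v := range outChanValues {
    res := v.(Result)
    outputValues[res.Index()] = res.Value()
}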
Full Code
Limitations
Unlike the previous example, which could be adapted to work with a continuous input stream, this pattern relies on the input data having a fixed size. As such it is not usually suitable for applications that serve as middleware over message brokers (in an ETL pipeline, for example).
Due to the fixed input and output size, extremely large inputs will need to be batched to avoid running out of memory; a sketch of such batching follows.
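One way to sketch that batching (the helper name batches is ours): split the input into fixed-size chunks and run the fan-out/collect cycle once per chunk, so only one chunk's worth of results is held in memory at a time.
// Split the input into fixed-size batches.
func batches(names []string, size int) [][]string {
    var out [][]string
    for start := 0; start < len(names); start += size {
        end := start + size
        if end > len(names) {
            end = len(names)
        }
        out = append(out, names[start:end])
    }
    return out
}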
Future work
If the processed values need to be handed to another processing stage in input order as they arrive, we could keep track of the next expected index and achieve a pipeline-style architecture, as sketched below.
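A minimal sketch of that idea, assuming results still arrive on outChanValues: buffer out-of-order results in a map keyed by index and hand them to the next stage as soon as the next expected index is available.
// Emit results to the next stage in input order, buffering any that
// arrive early.
func reorder(outChanValues <-chan Mappable, next chan<- Mappable) {
    pending := make(map[int]Mappable)
    want := 0
    for v := range outChanValues {
        pending[v.Index()] = v
        for {
            r, ok := pending[want]
            if !ok {
                break
            }
            next <- r
            delete(pending, want)
            want++
        }
    }
    close(next)
}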
Conclusion
Although not as versatile as unordered result collection, it is feasible to implement a map-style processing pattern using Go channels, at the cost of input-size flexibility and some extra context passing.