Introduction
In the previous post on Go concurrency patterns we took a look at fan-out with context cancellation. Results were collected in an unordered fashion, but what if we wanted to keep the same order as the input?
For users of the Python language, concurrent.futures.ThreadPoolExecutor is an invaluable tool for asynchronous data processing. Its map method runs a function over an array of values and returns the results in the same order as the input.
We’ll attempt to replicate this behavior through some changes to our previous fan-out example.
If you want more details on the channel mechanics involved, check out the previous post.
Implementation
We’ll walk through the biggest changes from the previous example.
Types
The Mappable interface keeps the architectural logic separate from the data being processed. A Mappable object needs to expose its Index in the array and a Process method to be applied to it.
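A minimal sketch of what that interface could look like follows; the method names come from this post, while the exact signatures are assumptions (Process returning a Mappable matches the description in the Processing section below):

```go
// Mappable keeps the fan-out machinery agnostic of the data being processed.
type Mappable interface {
	// Index returns the element's position in the input slice.
	Index() int
	// Process performs the work and returns a result that is itself Mappable,
	// so the collector can put it back in input order.
	Process() Mappable
}
```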
Instead of passing the value alone for processing, we pass a composite of value and index. This lets the collector know how to reassemble the slice.
We finish up by implementing the remainder of the Mappable interface.
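A sketch of such a composite type, assuming the Mappable interface above (the field names and the sleep duration are illustrative; the sleep is the simulated work mentioned in the Processing section):

```go
// value wraps the data together with its position in the input slice.
type value struct {
	index int
	data  string
}

// Index reports where the element sits in the original input.
func (v *value) Index() int { return v.index }

// Process simulates work with a sleep and returns the element itself,
// index included, so the collector can restore the original order.
// Assumes "time" is imported.
func (v *value) Process() Mappable {
	time.Sleep(10 * time.Millisecond)
	return v
}
```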
Processing
We declare the channels with the Mappable type. In this case, since Mappable is an interface and we’ve implemented our methods on value with pointer receivers, we’ll need to pass pointers through the channels.
Passing a pointer can be more efficient when the structure is large, but care must be taken not to overwrite the value while it is being processed.
Since the value structure is small here, passing it by value through the channel would not be an issue either.
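The channel declarations could then look roughly like this (numWorkers and the buffer sizes are illustrative choices, not taken from the post; the channel names inChan and outChanValues are the ones used below):

```go
const numWorkers = 4 // illustrative worker count

// Both channels carry the Mappable interface; since value's methods use
// pointer receivers, *value is what actually flows through them.
inChan := make(chan Mappable, numWorkers)
outChanValues := make(chan Mappable, numWorkers)
```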
We start a number of fan-out workers consuming from the input channel (inChan).
These workers will process the event as defined by the input value type (simulated here with a sleep).
The Process method returns a Mappable, which is compatible with our output channel.
The return type of value’s Process method could differ from value itself, as long as the Mappable interface is respected.
A possible advantage of using the Mappable interface here is being able to mix multiple types of processable events in the same mappable array.
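A worker along those lines might look like this; the name fanoutWorker matches the post, while the exact signature is an assumption (assumes "sync" is imported):

```go
// fanoutWorker drains inChan, runs Process on each element and forwards the
// Mappable result to outChan. It marks the waitgroup as done once inChan has
// been closed and fully drained.
func fanoutWorker(wg *sync.WaitGroup, inChan <-chan Mappable, outChan chan<- Mappable) {
	defer wg.Done()
	for m := range inChan {
		outChan <- m.Process()
	}
}
```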
The fanoutWorkers will mark the waitgroup as done once inChan is empty and the last value has been processed, since the inputData function closes inChan after the last input value has been sent.
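A sketch of the feeding side, under the same assumptions (inputData is named in the post; taking a []string is illustrative):

```go
// inputData sends every input element, wrapped with its index, into inChan
// and closes the channel once the last value has been sent.
func inputData(input []string, inChan chan<- Mappable) {
	for i, s := range input {
		inChan <- &value{index: i, data: s}
	}
	close(inChan)
}
```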
Collection
Output values are then collected from the output channel (outChanValues) until the channel is closed, which happens once the waitgroup associated with the workers is done processing all of the input.
Each collected value is then asserted back to its result type and the array is reconstructed in input order using the result’s index.
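The collection side could be sketched as follows, continuing with the names used above (closing outChanValues from a goroutine once the waitgroup is done is one way to end the range loop):

```go
// Close the output channel once every worker has finished, so the range
// loop below terminates.
go func() {
	wg.Wait()
	close(outChanValues)
}()

// Reassemble the results in input order using each element's Index.
results := make([]*value, len(input))
for m := range outChanValues {
	v := m.(*value) // assert back to the concrete result type
	results[v.Index()] = v
}
```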
Since, conveniently, the input values in our example are the same as the output values, we can verify that the order was kept by comparing the output array to the input string.
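Continuing the sketch, the check could look like this (it assumes the input was a string split into single characters, which is not spelled out here):

```go
// Rebuild a string from the ordered results and compare it to the input.
var sb strings.Builder
for _, v := range results {
	sb.WriteString(v.data)
}
fmt.Println(sb.String() == strings.Join(input, "")) // true if the order was kept
```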
For more real-life scenarios, implementing a Result interface could provide better abstraction over the results of the processing, much like the Mappable interface does for the input values.
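Purely as an illustration of that idea (the post only suggests it, so the methods here are hypothetical):

```go
// Result is a hypothetical output-side counterpart to Mappable; the exact
// methods would depend on the application.
type Result interface {
	Index() int
	Err() error
}
```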
Full Code
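Below is a self-contained sketch assembling the pieces above. Names mirror the ones mentioned in this post (Mappable, value, inputData, fanoutWorker, inChan, outChanValues); everything else, including the sample input, is illustrative rather than the original listing.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
	"time"
)

const numWorkers = 4 // illustrative worker count

// Mappable keeps the fan-out machinery agnostic of the data being processed.
type Mappable interface {
	Index() int
	Process() Mappable
}

// value is the composite of the data plus its position in the input.
type value struct {
	index int
	data  string
}

func (v *value) Index() int { return v.index }

func (v *value) Process() Mappable {
	time.Sleep(10 * time.Millisecond) // simulated work
	return v
}

// inputData feeds the input into inChan and closes it when done.
func inputData(input []string, inChan chan<- Mappable) {
	for i, s := range input {
		inChan <- &value{index: i, data: s}
	}
	close(inChan)
}

// fanoutWorker processes elements from inChan and forwards results to outChan.
func fanoutWorker(wg *sync.WaitGroup, inChan <-chan Mappable, outChan chan<- Mappable) {
	defer wg.Done()
	for m := range inChan {
		outChan <- m.Process()
	}
}

func main() {
	input := strings.Split("ordered", "")

	inChan := make(chan Mappable, numWorkers)
	outChanValues := make(chan Mappable, numWorkers)

	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go fanoutWorker(&wg, inChan, outChanValues)
	}

	go inputData(input, inChan)

	// Close the output channel once every worker is done.
	go func() {
		wg.Wait()
		close(outChanValues)
	}()

	// Collect and reorder by index.
	results := make([]*value, len(input))
	for m := range outChanValues {
		v := m.(*value)
		results[v.Index()] = v
	}

	var sb strings.Builder
	for _, v := range results {
		sb.WriteString(v.data)
	}
	fmt.Println(sb.String(), sb.String() == "ordered")
}
```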
Limitations
Unlike the previous example, which could be adapted to work with a constant input stream, this pattern relies on the input data having a fixed size. As such, it is not usually suitable for applications that serve as middleware with message brokers (in an ETL pipeline, for example).
Due to the fixed input and output size, extremely large inputs will need to be batched to avoid running out of memory.
Future work
If the processed values need to be consumed by another processing stage in order of arrival, we could keep track of the index currently being processed and achieve a pipeline-style architecture.
Conclusion
Although not as versatile as unordered result collection, it is feasible to implement map-style processing using Go channels, at the cost of input size flexibility and some extra context passing.