In the previous post on Go concurrency patterns we took a look at fan-out with context cancellation. Results were collected in an unordered fashion, but what if we wanted to keep the same order as the input?
For users of the Python language, concurrent.futures.ThreadPoolExecutor is an invaluable tool for asynchronous processing of data. Its map method lets you process an array of values through a function and collect the results in the same order as the input.
We’ll attempt to replicate this behavior through some changes to our previous fan-out example.
If you want more details on the channel mechanics involved, check out the previous post.
We’ll tour the biggest changes from the previous example.
The Mappable interface keeps the architectural logic separate from the data being processed. A Mappable object needs to expose an Index and a Process method to be applied on it. Instead of solely passing the value for processing, we pass a composite of value + index. This allows the collector to know how to reassemble the slice.
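A minimal sketch of what such an interface and composite could look like (the exact method set and type names here are assumptions, not the original code):

```go
package main

import "fmt"

// Mappable pairs a payload with its position in the input slice,
// so the collector can later reassemble results in order.
type Mappable interface {
	Index() int        // position in the original input
	Process() Mappable // work to apply; the result is itself Mappable
}

// indexedValue is an assumed composite of value + index.
type indexedValue struct {
	idx int
	val string
}

func (v *indexedValue) Index() int        { return v.idx }
func (v *indexedValue) Process() Mappable { return v }

func main() {
	var m Mappable = &indexedValue{idx: 3, val: "d"}
	fmt.Println(m.Index()) // 3
}
```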
We finish up by implementing the remainder of the example. We declare the channels with the Mappable type. Since Mappable is an interface and we've implemented our methods on the value type with pointer receivers, we'll need to pass it by reference. Passing by reference can be more efficient when the structure is large, though care must be taken not to overwrite the value during processing. Since our value structure is small, it would not be an issue to pass it by value through the channel.
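The pointer-receiver detail matters here: with pointer receivers, only the pointer type satisfies the interface, so pointers are what travel through the channels. A small illustration (type names assumed):

```go
package main

import "fmt"

type Mappable interface {
	Index() int
}

type value struct{ idx int }

// Pointer receiver: only *value satisfies Mappable,
// so we must send &value{...} through the channels.
func (v *value) Index() int { return v.idx }

func main() {
	inChan := make(chan Mappable, 2)
	inChan <- &value{idx: 0} // sending value{idx: 0} would not compile
	inChan <- &value{idx: 1}
	close(inChan)
	for m := range inChan {
		fmt.Println(m.Index())
	}
}
```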
We start a number of fan-out workers consuming from the input channel (inChan). These workers process the event as defined by the input value type (simulated here with a delay). The return of the Process method is a Mappable type, which is compatible with our output channel.
The return of the Process method of value could be of a different type than value itself, as long as the Mappable interface is respected.
A possible advantage of using the Mappable interface here is being able to have multiple types of processable events in the same mappable array.
The fanoutWorkers will mark the waitgroup as done once inChan is empty and the last value has been processed, since the inputData function will close the input channel once the last input value has been sent.
Output values are then collected from the output channel (outChanValues) until the channel is closed (which happens once the waitgroup associated with the workers is done processing all of the input). Each collected value is then asserted to its result type, and the array is reconstructed in order using each result's index.
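Putting the collection step together, the reassembly might look like this end to end (names assumed, with a string of letters standing in for the input):

```go
package main

import (
	"fmt"
	"sync"
)

type Mappable interface {
	Index() int
	Process() Mappable
}

type letter struct {
	idx int
	c   byte
}

func (l *letter) Index() int        { return l.idx }
func (l *letter) Process() Mappable { return l }

func main() {
	input := "ordered"
	in := make(chan Mappable)
	out := make(chan Mappable)
	var wg sync.WaitGroup

	wg.Add(4)
	for i := 0; i < 4; i++ {
		go func() {
			defer wg.Done()
			for m := range in {
				out <- m.Process()
			}
		}()
	}
	go func() { wg.Wait(); close(out) }()
	go func() {
		for i := 0; i < len(input); i++ {
			in <- &letter{idx: i, c: input[i]}
		}
		close(in)
	}()

	// Place each result at its original index, regardless of the
	// order in which the workers finish.
	results := make([]byte, len(input))
	for m := range out {
		results[m.Index()] = m.(*letter).c
	}
	fmt.Println(string(results) == input) // true
}
```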
Since, conveniently, the input values in our example are the same as the output values, we can verify the order was kept by comparing the output array to the input string.
For more real-life scenarios, implementing a Result interface could provide a better abstraction over the results of the processing, much as the Mappable interface did for the input values.
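One way such a Result interface could look (purely an assumption, since the post does not define it) is as a counterpart to Mappable that keeps the index for reordering and carries any processing error:

```go
package main

import "fmt"

// Result is a hypothetical output-side counterpart to Mappable:
// it retains the index for reordering and exposes the outcome,
// including any error encountered during processing.
type Result interface {
	Index() int
	Value() any
	Err() error
}

type processResult struct {
	idx int
	val string
	err error
}

func (r processResult) Index() int { return r.idx }
func (r processResult) Value() any { return r.val }
func (r processResult) Err() error { return r.err }

func main() {
	var r Result = processResult{idx: 1, val: "done"}
	fmt.Println(r.Index(), r.Value()) // 1 done
}
```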
Unlike the previous example, which could be adapted to work with a constant input stream, this pattern relies on the input data being of fixed size; as such, it is usually not suitable for applications that serve as middleware over message brokers (in an ETL pipeline, for example).
Due to its fixed input and output size, extremely large inputs will need to be batched to avoid running out of memory.
If the processed values need to be picked up by another processing task in order of arrival, we could keep track of the current index being processed and achieve a pipeline-style architecture.
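That idea could be sketched as a small reordering stage that buffers out-of-order results and releases them strictly by ascending index (this is an assumption, not code from the post):

```go
package main

import "fmt"

type Mappable interface{ Index() int }

type res struct{ idx int }

func (r *res) Index() int { return r.idx }

// reorder buffers out-of-order results and forwards them
// downstream strictly by ascending index, so the next stage
// of a pipeline sees ordered input as results arrive.
func reorder(in <-chan Mappable, out chan<- Mappable) {
	pending := make(map[int]Mappable)
	next := 0 // index currently awaited
	for m := range in {
		pending[m.Index()] = m
		for {
			r, ok := pending[next]
			if !ok {
				break
			}
			out <- r
			delete(pending, next)
			next++
		}
	}
	close(out)
}

func main() {
	in := make(chan Mappable, 3)
	out := make(chan Mappable, 3)
	// simulate out-of-order arrival
	in <- &res{2}
	in <- &res{0}
	in <- &res{1}
	close(in)
	reorder(in, out)
	for m := range out {
		fmt.Print(m.Index(), " ")
	}
	// prints: 0 1 2
}
```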
Although not as versatile as unordered result collection, it is feasible to achieve a map-style form of processing using Go channels, at the cost of input-size flexibility and extra context passing.