In a previous post we looked at parallel reduction and gave a conceptual background on using the ForkJoin model to parallelise an embarrassingly parallel operation. Reduction is at the heart of building parallel algorithms. In this post, we will take this knowledge a step further and use it to build a custom Collector that works with a parallel pipeline.
Motivation: Why Would You Build a Collector?
- Curiosity.
- The collector you need is not provided by the JDK.
- You don’t want to use old-style loops, not because loops are bad but because they are hard to parallelise.
- The last thing you want to do is write a ForkJoin Task.
A SumAndCount Collector
The collector that we will be building works on a Stream of Integers, accumulates their sum in a BigDecimal and counts the elements in the source Stream.
import java.math.BigDecimal;

public class SumAndCount {
    BigDecimal sum = BigDecimal.ZERO;
    int count;
}
Ingredients for a Parallel Collector
- Supplier, a function that can be used to instantiate a partial result.
- BiConsumer, a function that takes a partial result and an element from the source stream and adds that element to the partial result.
- BinaryOperator, a merge function that takes two partial results, merges them and returns the result.
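To make the three ingredients concrete, here is a minimal sketch that writes each one as a typed value and then hand-simulates what a parallel run does with them: two partial results are filled independently and then merged. The class name IngredientsSketch, the constant names and the method simulateTwoWaySplit are made up for illustration, and the real Stream API decides for itself how to split the work.

```java
import java.math.BigDecimal;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

class IngredientsSketch {

    // Same shape as the SumAndCount class from this post.
    static class SumAndCount {
        BigDecimal sum = BigDecimal.ZERO;
        int count;
    }

    // Supplier: creates a fresh, empty partial result.
    static final Supplier<SumAndCount> SUPPLIER = SumAndCount::new;

    // BiConsumer: adds one stream element to a partial result.
    static final BiConsumer<SumAndCount, Integer> ACCUMULATOR = (partial, item) -> {
        partial.sum = partial.sum.add(BigDecimal.valueOf(item));
        partial.count++;
    };

    // BinaryOperator: merges the right partial result into the left one.
    static final BinaryOperator<SumAndCount> COMBINER = (left, right) -> {
        left.sum = left.sum.add(right.sum);
        left.count += right.count;
        return left;
    };

    // Hand-simulates a two-way split of the elements 1, 2, 3, 4.
    static SumAndCount simulateTwoWaySplit() {
        SumAndCount first = SUPPLIER.get();   // partial result for one half
        ACCUMULATOR.accept(first, 1);
        ACCUMULATOR.accept(first, 2);

        SumAndCount second = SUPPLIER.get();  // partial result for the other half
        ACCUMULATOR.accept(second, 3);
        ACCUMULATOR.accept(second, 4);

        return COMBINER.apply(first, second); // merge step
    }

    public static void main(String[] args) {
        SumAndCount result = simulateTwoWaySplit();
        System.out.println("sum=" + result.sum + ", count=" + result.count);
    }
}
```

Each partial result is only ever touched by the code that created it until the merge step, which is why the accumulator itself needs no locking.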
Putting it all together
Given the ingredients above, we are ready to go. Note that no synchronization is required here: each thread accumulates into its own partial result, and partial results only meet in the merge function. Parallelism is not concurrency.
Collector<Integer, SumAndCount, SumAndCount> collector = Collector.of(
    // Supplier: a fresh partial result
    SumAndCount::new,
    // BiConsumer: add one element to a partial result
    (sumAndCount, item) -> {
        sumAndCount.sum = sumAndCount.sum.add(BigDecimal.valueOf(item));
        sumAndCount.count++;
    },
    // BinaryOperator: merge two partial results
    (s1, s2) -> {
        s1.sum = s1.sum.add(s2.sum);
        s1.count += s2.count;
        return s1;
    });
That's a collector we can hand to the Stream API, and the API will take it from there and parallelise the job for us.
SumAndCount sumAndCount = Stream.iterate(0, i -> i + 10)
    .limit(8_000_000)
    .parallel()
    .collect(collector);
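As a sanity check, the following self-contained sketch runs the same collector over a sequential and a parallel stream and compares the two results. The class name SumAndCountDemo, the factory method sumAndCount() and the helper multiplesOfTen() are hypothetical names introduced here, and the stream is kept to 1,000 elements so that the expected values are easy to verify by hand.

```java
import java.math.BigDecimal;
import java.util.stream.Collector;
import java.util.stream.Stream;

class SumAndCountDemo {

    // Same shape as the SumAndCount class from this post.
    static class SumAndCount {
        BigDecimal sum = BigDecimal.ZERO;
        int count;
    }

    // Builds the collector from the three ingredients.
    static Collector<Integer, SumAndCount, SumAndCount> sumAndCount() {
        return Collector.of(
                SumAndCount::new,
                (partial, item) -> {
                    partial.sum = partial.sum.add(BigDecimal.valueOf(item));
                    partial.count++;
                },
                (left, right) -> {
                    left.sum = left.sum.add(right.sum);
                    left.count += right.count;
                    return left;
                });
    }

    // Hypothetical helper: the first `size` multiples of 10, starting at 0.
    static Stream<Integer> multiplesOfTen(long size) {
        return Stream.iterate(0, i -> i + 10).limit(size);
    }

    public static void main(String[] args) {
        SumAndCount sequential = multiplesOfTen(1_000).collect(sumAndCount());
        SumAndCount parallel = multiplesOfTen(1_000).parallel().collect(sumAndCount());

        // Both runs should agree regardless of how the parallel run was split.
        boolean same = sequential.sum.compareTo(parallel.sum) == 0
                && sequential.count == parallel.count;
        System.out.println("sum=" + parallel.sum + ", count=" + parallel.count
                + ", matches sequential=" + same);
    }
}
```

For 1,000 elements the sum is 10 × (0 + 1 + … + 999) = 4,995,000 and the count is 1,000, whether or not the stream is parallel.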