Java 8 Parallel Stream – Handling Non-Concurrent Unordered Collectors

Tags: java-8, java-stream

Suppose I have this custom collector:

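    import java.util.ArrayList;
    import java.util.EnumSet;
    import java.util.List;
    import java.util.Set;
    import java.util.function.BiConsumer;
    import java.util.function.BinaryOperator;
    import java.util.function.Function;
    import java.util.function.Supplier;
    import java.util.stream.Collector;

    public class CustomToListCollector<T> implements Collector<T, List<T>, List<T>> {

        @Override
        public Supplier<List<T>> supplier() {
            return ArrayList::new;
        }

        @Override
        public BiConsumer<List<T>, T> accumulator() {
            return List::add;
        }

        // Merges two partial results by appending the right list to the left one.
        @Override
        public BinaryOperator<List<T>> combiner() {
            return (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            };
        }

        @Override
        public Function<List<T>, List<T>> finisher() {
            return Function.identity();
        }

        // Same as Collectors#toList, plus UNORDERED.
        @Override
        public Set<Characteristics> characteristics() {
            return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
        }
    }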

This is exactly the Collectors#toList implementation, with one minor difference: the UNORDERED characteristic is added as well.

I would assume that running this code:

    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

    for (int i = 0; i < 100_000; i++) {
        List<Integer> result = list.parallelStream().collect(new CustomToListCollector<>());
        if (!result.equals(list)) {
            System.out.println(result);
            break;
        }
    }

should eventually print a list whose order differs from the source list. But it never does.

I've looked under the hood a bit. ReferencePipeline#collect first checks whether the stream is parallel, whether the collector is concurrent, and whether the collector (or the stream itself) is unordered. CONCURRENT is missing here, so it delegates to the evaluate method, passing a TerminalOp created from this collector. Under the hood, that is a ReduceOp backed by a ReducingSink, and it does care whether the collector is unordered:

    return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
        @Override
        public ReducingSink makeSink() {
            return new ReducingSink();
        }

        @Override
        public int getOpFlags() {
            return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                   ? StreamOpFlag.NOT_ORDERED
                   : 0;
        }
    };

I have not debugged further since it gets pretty complicated fast.
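To make that first check concrete, here is a rough paraphrase of it as a stand-alone predicate. This is only a sketch based on reading the JDK 8 source: the class CollectPathSketch and the method usesConcurrentPath are hypothetical names, not JDK API, and the real code may differ between versions.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.stream.Collector.Characteristics;

class CollectPathSketch {
    // Hypothetical helper (not a JDK method): paraphrases the condition under which
    // collect() accumulates into one shared container instead of running a reduction.
    static boolean usesConcurrentPath(boolean parallel, boolean streamOrdered,
                                      Set<Characteristics> ch) {
        return parallel
            && ch.contains(Characteristics.CONCURRENT)
            && (!streamOrdered || ch.contains(Characteristics.UNORDERED));
    }

    public static void main(String[] args) {
        // The collector from the question: UNORDERED but not CONCURRENT.
        Set<Characteristics> custom =
            EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
        System.out.println(usesConcurrentPath(true, true, custom)); // false
    }
}
```

With CONCURRENT absent, the predicate is false no matter what UNORDERED says, which matches the observation that the collector ends up in the ReduceOp path.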

So maybe there is a shortcut here, and someone could explain what I am missing. This is a parallel stream that collects its elements with a non-concurrent, unordered collector. Shouldn't the order in which the threads combine their results be arbitrary? If not, how is the order imposed here, and by whom?

Best Answer

Note that the result is the same when using list.parallelStream().unordered().collect(Collectors.toList()). In either case, the unordered property is not used by the current implementation.
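A quick way to try the unordered() variant yourself is sketched below. The class and method names are made up for this illustration, and the number of mismatches it reports depends on the JDK in use, so no particular output is promised.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class UnorderedStreamCheck {
    // Counts how many of the given runs produced a list that differs from the source.
    static int countMismatches(List<Integer> list, int runs) {
        int mismatches = 0;
        for (int i = 0; i < runs; i++) {
            List<Integer> result = list.parallelStream()
                .unordered()
                .collect(Collectors.toList());
            if (!result.equals(list)) {
                mismatches++;
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println("mismatches: " + countMismatches(list, 10_000));
    }
}
```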

But let’s change the setup a little bit:

    List<Integer> list = Collections.nCopies(10, null).stream()
        .flatMap(ig -> IntStream.range(0, 100).boxed())
        .collect(Collectors.toList());
    List<Integer> reference = new ArrayList<>(new LinkedHashSet<>(list));

    for (int i = 0; i < 100_000; i++) {
        List<Integer> result = list.parallelStream()
            .distinct()
            .collect(characteristics(Collectors.toList(), Collector.Characteristics.UNORDERED));
        if (!result.equals(reference)) {
            System.out.println(result);
            break;
        }
    }

(This uses the characteristics collector factory of this answer.)
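Since the referenced factory is not reproduced here, the following is one way such a helper could look. It is an assumption made for illustration, not necessarily the code from the linked answer: it copies the functions of an existing collector and reports the union of the old and the added characteristics via Collector.of.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class CollectorCharacteristics {
    // Hypothetical helper: wraps `base` so that it additionally reports `extra`.
    static <T, A, R> Collector<T, A, R> characteristics(
            Collector<T, A, R> base, Collector.Characteristics... extra) {
        Set<Collector.Characteristics> all =
            EnumSet.noneOf(Collector.Characteristics.class);
        all.addAll(base.characteristics());
        Collections.addAll(all, extra);
        return Collector.of(base.supplier(), base.accumulator(), base.combiner(),
            base.finisher(), all.toArray(new Collector.Characteristics[0]));
    }

    public static void main(String[] args) {
        Collector<Integer, ?, List<Integer>> c =
            characteristics(Collectors.toList(), Collector.Characteristics.UNORDERED);
        System.out.println(c.characteristics());
        System.out.println(Arrays.asList(3, 1, 2).stream().collect(c));
    }
}
```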
Interestingly, in Java 8 versions prior to 1.8.0_60, this yields a different result. If we use objects with distinct identities instead of canonical Integer instances, we can detect that in these earlier versions not only does the order of the list differ, but the objects in the result list are also not the first encountered instances.
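One way to observe identities is sketched below, assuming String objects created with new String(...) so that equal elements remain distinguishable by ==. The class and method names are made up for this illustration, and whether the loop ever reports anything depends on the JDK build.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;

class DistinctIdentityCheck {
    // Builds `copies` repetitions of the values 0..range-1, each as a fresh String
    // instance, so duplicates are equal but not identical.
    static List<String> buildSource(int copies, int range) {
        List<String> source = new ArrayList<>();
        for (int c = 0; c < copies; c++) {
            for (int v = 0; v < range; v++) {
                source.add(new String(Integer.toString(v)));
            }
        }
        return source;
    }

    // True if every element of `result` is the first instance in `source` equal to it.
    static boolean keepsFirstInstances(List<String> source, List<String> result) {
        for (String s : result) {
            String first = source.get(source.indexOf(s)); // indexOf compares with equals
            if (first != s) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> source = buildSource(10, 100);
        // An unordered list collector; Collector.of adds IDENTITY_FINISH itself.
        Collector<String, List<String>, List<String>> unorderedToList = Collector.of(
            ArrayList::new, List::add,
            (a, b) -> { a.addAll(b); return a; },
            Collector.Characteristics.UNORDERED);
        for (int i = 0; i < 1_000; i++) {
            List<String> result =
                source.parallelStream().distinct().collect(unorderedToList);
            if (!keepsFirstInstances(source, result)) {
                System.out.println("run " + i + ": a later duplicate was retained");
                break;
            }
        }
        System.out.println("done");
    }
}
```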

So the unordered characteristic of a terminal operation was propagated to the stream, affecting the behavior of distinct(), similar to that of skip and limit, as discussed here and here.

As discussed in the second linked thread, the back-propagation has been removed completely, which is reasonable on second thought. For distinct, skip and limit, the order of the source is relevant, and ignoring it just because the order will be ignored in subsequent stages is not right. So the only remaining stateful intermediate operation that could benefit from back-propagation would be sorted, which would be rendered obsolete when the order is ignored afterwards. But combining sorted with an unordered sink is more like a programming error anyway…

For stateless intermediate operations, the order is irrelevant anyway. The stream processing works by splitting the source into chunks, applying all stateless intermediate operations to their elements independently and collecting into a local container, before merging into the result container. So the merging step is the only place where respecting or ignoring the order (of the chunks) has an impact on the result and perhaps on the performance.
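The split, process and merge steps can be imitated by hand with a Spliterator. The sketch below is only a simplified, single-threaded illustration with made-up names; the real stream implementation splits recursively and runs the chunks on different threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

class ChunkedCollectSketch {
    // Splits the source once, maps each chunk into its own local container,
    // then merges the two containers in encounter order.
    static List<Integer> collectInTwoChunks(List<Integer> source) {
        Spliterator<Integer> suffix = source.spliterator();
        // For an ORDERED spliterator, trySplit() hands out a prefix of the elements;
        // it may return null if the source cannot be split any further.
        Spliterator<Integer> prefix = suffix.trySplit();

        List<Integer> left = new ArrayList<>();
        List<Integer> right = new ArrayList<>();
        if (prefix != null) {
            prefix.forEachRemaining(x -> left.add(x * 2)); // stateless "map" step
        }
        suffix.forEachRemaining(x -> right.add(x * 2));

        left.addAll(right); // the merge step: the only place where chunk order matters
        return left;
    }

    public static void main(String[] args) {
        System.out.println(collectInTwoChunks(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)));
        // prints [2, 4, 6, 8, 10, 12, 14, 16]
    }
}
```

Swapping the merge to right.addAll(left) would be the only change needed to ignore the chunk order, which is why the merge step is where an unordered collector could make a difference at all.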

But the impact isn’t very big. When you implement such an operation, e.g. via ForkJoinTasks, you simply split a task into two, wait for their completion and merge them. Alternatively, a task may split off a chunk into a sub-task, process its remaining chunk in-place, wait for the sub-task and merge. In either case, merging the results in order comes naturally, because the initiating task already holds references to the adjacent tasks. To merge with different chunks instead, the associated sub-tasks would first have to be found somehow.
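A minimal sketch of the second variant, using a RecursiveTask, shows why the ordered merge falls out for free. This is an illustration written for this answer, not the JDK's internal ReduceTask; the class name and the threshold are arbitrary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class OrderedMergeTask extends RecursiveTask<List<Integer>> {
    private static final int THRESHOLD = 2; // arbitrary chunk size for the demo
    private final List<Integer> source;
    private final int from;
    private final int to;

    OrderedMergeTask(List<Integer> source, int from, int to) {
        this.source = source;
        this.from = from;
        this.to = to;
    }

    @Override
    protected List<Integer> compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: collect the chunk into a local container.
            return new ArrayList<>(source.subList(from, to));
        }
        int mid = (from + to) >>> 1;
        // Split off the right half as a sub-task and process the left half in place.
        OrderedMergeTask right = new OrderedMergeTask(source, mid, to);
        right.fork();
        List<Integer> result = new OrderedMergeTask(source, from, mid).compute();
        // This task holds the reference to its right neighbour, so appending the
        // neighbour's result keeps the encounter order without any extra lookup.
        result.addAll(right.join());
        return result;
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(
            ForkJoinPool.commonPool().invoke(new OrderedMergeTask(list, 0, list.size())));
        // prints [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```

Regardless of which thread finishes first, each task only ever appends the result of the sub-task it forked itself, so the output order is fixed by the split structure rather than by timing.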

The only benefit of merging with a different task would be that you can merge with the first completed task, if the tasks take different amounts of time to complete. But when waiting for a sub-task in the Fork/Join framework, the thread won’t be idle; the framework will use the thread to work on other pending tasks in the meantime. So as long as the main task has been split into enough sub-tasks, there will be full CPU utilization. Also, the spliterators attempt to split into even chunks to reduce the differences between the computing times. It’s very likely that the benefit of an alternative unordered merging implementation doesn’t justify the code duplication, at least with the current implementation.

Still, reporting an unordered characteristic allows the implementation to utilize it when beneficial, and implementations can change.