Note that the result is the same when using list.parallelStream().unordered().collect(Collectors.toList()); in either case, the unordered property is not used by the current implementation.
But let’s change the setup a little bit:
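```java
import java.util.*;
import java.util.stream.*;

// 10 rounds of 0..99, so every value occurs 10 times
List<Integer> list = Collections.nCopies(10, null).stream()
    .flatMap(ig -> IntStream.range(0, 100).boxed())
    .collect(Collectors.toList());
// expected result of distinct(): first occurrences, in encounter order
List<Integer> reference = new ArrayList<>(new LinkedHashSet<>(list));
for (int i = 0; i < 100_000; i++) {
    // characteristics(...) is the collector factory referenced below
    List<Integer> result = list.parallelStream()
        .distinct()
        .collect(characteristics(Collectors.toList(), Collector.Characteristics.UNORDERED));
    if (!result.equals(reference)) {
        System.out.println(result);
        break;
    }
}
```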
This uses the characteristics collector factory of this answer.
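The linked factory itself is not reproduced here. A minimal sketch of what such a factory could look like, assuming it only needs to re-wrap the given collector's four functions via Collector.of and report the original characteristics plus the extra ones (the class name CollectorCharacteristics is hypothetical, and this is not the linked answer's code):

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class CollectorCharacteristics {

    // Hypothetical helper (a sketch, not the linked answer's code): re-wraps a collector's
    // functions and reports its original characteristics plus the given extra ones.
    static <T, A, R> Collector<T, A, R> characteristics(
            Collector<T, A, R> collector, Collector.Characteristics... extra) {
        EnumSet<Collector.Characteristics> all = EnumSet.noneOf(Collector.Characteristics.class);
        all.addAll(collector.characteristics());
        Collections.addAll(all, extra);
        return Collector.of(collector.supplier(), collector.accumulator(),
                collector.combiner(), collector.finisher(),
                all.toArray(new Collector.Characteristics[0]));
    }

    public static void main(String[] args) {
        Collector<Integer, ?, List<Integer>> toList = Collectors.toList();
        Collector<Integer, ?, List<Integer>> unorderedToList =
                characteristics(toList, Collector.Characteristics.UNORDERED);
        System.out.println(unorderedToList.characteristics().contains(
                Collector.Characteristics.UNORDERED)); // true
        List<Integer> result = IntStream.range(0, 10).boxed().parallel().collect(unorderedToList);
        System.out.println(result.size()); // 10
    }
}
```

Keeping the original characteristics matters: dropping IDENTITY_FINISH would only cost a finisher call, but silently dropping something like CONCURRENT would change how the stream treats the collector.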
The interesting point is that in Java 8 versions prior to 1.8.0_60, this has a different outcome. If we use objects with distinct identities instead of the canonical Integer instances, we can detect that in these earlier versions not only does the order of the list differ, but the objects in the result list are not the first encountered instances.
So the unordered characteristic of a terminal operation was propagated to the stream, affecting the behavior of distinct(), similar to that of skip and limit, as discussed here and here.
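To observe the identity effect, the source can be built from equal but non-identical String instances. The sketch below (class and method names are hypothetical) uses == to check that every element returned by distinct() on an ordered parallel stream is the first encountered instance, which the distinct() specification guarantees for ordered streams; collecting with an unordered collector, as in the experiment above, is what let releases before 1.8.0_60 pick other instances.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class DistinctKeepsFirst {

    // 10 rounds of "v0".."v99": equal strings, but every element is a distinct object.
    static List<String> source() {
        return Collections.nCopies(10, null).stream()
                .flatMap(ig -> IntStream.range(0, 100).mapToObj(i -> new String("v" + i)))
                .collect(Collectors.toList());
    }

    // True if result holds exactly the first 100 source elements, compared by identity.
    static boolean keepsFirstInstances(List<String> source, List<String> result) {
        if (result.size() != 100) return false;
        for (int i = 0; i < result.size(); i++) {
            if (result.get(i) != source.get(i)) return false; // ==, not equals()
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> list = source();
        List<String> result = list.parallelStream().distinct().collect(Collectors.toList());
        System.out.println(keepsFirstInstances(list, result)); // true on a conforming runtime
    }
}
```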
As discussed in the second linked thread, the back-propagation has been removed completely, which is reasonable on second thought. For distinct, skip and limit, the order of the source is relevant, and ignoring it just because the order will be ignored in subsequent stages is not right. So the only remaining stateful intermediate operation that could benefit from back-propagation would be sorted, which would be rendered obsolete when the order is ignored afterwards. But combining sorted with an unordered sink is more like a programming error anyway…
For stateless intermediate operations, the order is irrelevant anyway. Stream processing works by splitting the source into chunks, applying all stateless intermediate operations to their elements independently and collecting them into a local container, before merging into the result container. So the merging step is the only place where respecting or ignoring the order (of the chunks) has an impact on the result and perhaps on the performance.
But the impact isn't very big. When you implement such an operation, e.g. via ForkJoinTasks, you simply split a task into two, wait for their completion and merge them. Alternatively, a task may split off a chunk into a sub-task, process its remaining chunk in-place, wait for the sub-task and merge. In either case, merging the results in order comes naturally, because the initiating task already holds references to the adjacent tasks. To merge with different chunks instead, the associated sub-tasks would first have to be found somehow.
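The split, process-in-place, join and merge pattern can be sketched with RecursiveTask. This is a simplified illustration with assumed names (OrderedForkJoinCollect, CollectTask, mapAndCollect) and an assumed leaf size, not the JDK's actual stream implementation; it shows that keeping encounter order costs nothing extra, since each task merges exactly the two chunks it created itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.Function;

final class OrderedForkJoinCollect {

    // Hypothetical sketch of an ordered parallel "map, then collect"; not the JDK's code.
    static final class CollectTask<T, R> extends RecursiveTask<List<R>> {
        private static final int THRESHOLD = 1_000; // assumed leaf size
        private final List<T> source;
        private final Function<? super T, ? extends R> mapper; // a stateless operation
        private final int from, to;

        CollectTask(List<T> source, Function<? super T, ? extends R> mapper, int from, int to) {
            this.source = source;
            this.mapper = mapper;
            this.from = from;
            this.to = to;
        }

        @Override
        protected List<R> compute() {
            if (to - from <= THRESHOLD) {
                // Leaf: apply the stateless operation and collect into a local container.
                List<R> local = new ArrayList<>(to - from);
                for (int i = from; i < to; i++) {
                    local.add(mapper.apply(source.get(i)));
                }
                return local;
            }
            int mid = (from + to) >>> 1;
            // Split off the left chunk as a sub-task ...
            CollectTask<T, R> left = new CollectTask<T, R>(source, mapper, from, mid);
            left.fork();
            // ... process the remaining right chunk in-place ...
            List<R> right = new CollectTask<T, R>(source, mapper, mid, to).compute();
            // ... then wait for the sub-task and merge. Encounter order comes for free,
            // because this task already holds references to both adjacent chunks.
            List<R> result = left.join();
            result.addAll(right);
            return result;
        }
    }

    static <T, R> List<R> mapAndCollect(List<T> source, Function<? super T, ? extends R> mapper) {
        return ForkJoinPool.commonPool().invoke(new CollectTask<T, R>(source, mapper, 0, source.size()));
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            input.add(i);
        }
        List<Integer> doubled = mapAndCollect(input, x -> x * 2);
        System.out.println(doubled.get(0) + " " + doubled.get(9_999)); // 0 19998
    }
}
```

An unordered variant would need some extra bookkeeping to find "whichever chunk finished first", which is exactly the additional machinery whose benefit is questioned below.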
The only benefit of merging with a different task would be that you could merge with the first completed task, if the tasks take different amounts of time to complete. But when waiting for a sub-task in the Fork/Join framework, the thread won't be idle; the framework uses the thread for working on other pending tasks in the meantime. So as long as the main task has been split into enough sub-tasks, there will be full CPU utilization. Also, the spliterators attempt to split into even chunks to reduce the differences in computing time. It's very likely that the benefit of an alternative unordered merging implementation doesn't justify the code duplication, at least with the current implementation.
Still, reporting an unordered characteristic allows the implementation to utilize it when beneficial, and implementations can change.
The Collection object used to receive the data being collected does not need to be concurrent. You can give it a simple ArrayList.
That is because the values of a parallel stream are not actually collected into a single Collection object. Each thread collects its own data, and then all sub-results are merged into a single final Collection object.
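For instance, the three-argument collect method of Stream accepts a plain, unsynchronized ArrayList as the container, in the same style as the example in the java.util.stream package documentation. A small sketch (class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.stream.IntStream;

final class ParallelCollectIntoArrayList {

    static ArrayList<Integer> squares(int n) {
        return IntStream.range(0, n)
                .map(i -> i * i)
                .boxed()
                .parallel()
                // supplier: one fresh ArrayList per partition of the input
                // accumulator: ArrayList::add fills only that partition's list
                // combiner: ArrayList::addAll merges two partial lists, left before right
                .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
    }

    public static void main(String[] args) {
        System.out.println(squares(10)); // [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    }
}
```

Because the stream is ordered and partial lists are always merged left into right, the result is in encounter order even though the work was done in parallel.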
This is all well documented in the Collector javadoc, and the Collector is the parameter you're giving to the collect() method:
```java
<R, A> R collect(Collector<? super T, A, R> collector)
```
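A Collector bundles a supplier, an accumulator, a combiner and an optional finisher. As an illustration, here is a hand-written, non-concurrent collector built with Collector.of that fills an unsynchronized HashMap; the word-length histogram and all names are made up for this sketch:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collector;

final class LengthHistogram {

    // Hypothetical non-concurrent collector: word length -> number of words.
    static Collector<String, ?, Map<Integer, Integer>> lengthHistogram() {
        return Collector.<String, HashMap<Integer, Integer>, Map<Integer, Integer>>of(
                HashMap::new,                                       // supplier: fresh, unsynchronized map
                (map, s) -> map.merge(s.length(), 1, Integer::sum), // accumulator: touches only its own map
                (left, right) -> {                                  // combiner: fold right into left
                    right.forEach((len, n) -> left.merge(len, n, Integer::sum));
                    return left;
                },
                Collections::unmodifiableMap);                      // finisher
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "bb", "cc", "ddd", "e", "fff", "gggg");
        Map<Integer, Integer> histogram = words.parallelStream().collect(lengthHistogram());
        System.out.println(new TreeMap<>(histogram)); // {1=2, 2=2, 3=2, 4=1}
    }
}
```

No locking is needed anywhere: the stream implementation never lets two threads touch the same HashMap at the same time.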
It is safe to use a non-concurrent collector in a collect operation of a parallel stream.
In the specification of the Collector interface, in the section with half a dozen bullet points, is this:
"For non-concurrent collectors, any result returned from the result supplier, accumulator, or combiner functions must be serially thread-confined. This enables collection to occur in parallel without the Collector needing to implement any additional synchronization. The reduction implementation must manage that the input is properly partitioned, that partitions are processed in isolation, and combining happens only after accumulation is complete."
This means that the various implementations provided by the Collectors class can be used with parallel streams, even though some of those implementations might not be concurrent collectors. This also applies to any non-concurrent collectors you implement yourself. They can be used safely with parallel streams, provided your collectors don't interfere with the stream source, are side-effect free, order independent, etc.
I also recommend reading the Mutable Reduction section of the java.util.stream package documentation. In the middle of this section is an example that is stated to be parallelizable, but which collects results into an ArrayList, which is not thread-safe.
The way this works is that a parallel stream ending in a non-concurrent collector makes sure that different threads always operate on different instances of the intermediate result collections. That's why a collector has a Supplier function: it creates a fresh intermediate collection for each partition of the input, so each thread accumulates only into collections of its own. When intermediate results are to be merged, they are handed off safely between threads, and at any given time only a single thread is merging any pair of intermediate results.
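The per-partition containers can be made visible by counting supplier calls. This hypothetical sketch (names are made up) collects into plain ArrayLists through a collector whose supplier increments an AtomicInteger; the count depends on how the implementation splits the input and on the number of cores, so no fixed output is shown:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.IntStream;

final class CountingSupplier {

    // Hypothetical instrumentation: a plain-ArrayList collector whose supplier counts
    // how many intermediate containers the stream implementation asks for.
    static List<Integer> collectCounting(int n, AtomicInteger created) {
        Collector<Integer, ArrayList<Integer>, ArrayList<Integer>> collector = Collector.of(
                () -> {
                    created.incrementAndGet();
                    return new ArrayList<>();
                },
                ArrayList::add,          // fills the container handed to this partition
                (left, right) -> {       // merges two finished partial results
                    left.addAll(right);
                    return left;
                });
        return IntStream.range(0, n).boxed().parallel().collect(collector);
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        List<Integer> result = collectCounting(100_000, created);
        System.out.println(result.size() + " elements, " + created.get()
                + " containers, " + Runtime.getRuntime().availableProcessors() + " processors");
    }
}
```

On a multi-core machine the container count is typically larger than one and need not match the processor count, since containers are created per partition rather than per thread.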