There are several issues going on here in parallel, as it were.
The first is that solving a problem in parallel always involves performing more actual work than doing it sequentially. Overhead is involved in splitting the work among several threads and joining or merging the results. Problems like converting short strings to lower-case are small enough that they are in danger of being swamped by the parallel splitting overhead.
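For instance (a sketch of the issue, not code from the question), the per-element work below is a single cheap call, so the fork/join setup, splitting, and result merging can easily cost more than the work they distribute:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TinyWorkload {
    public static void main(String[] args) {
        List<String> words = Arrays.asList("Foo", "Bar", "Baz");
        // Distributing three cheap toLowerCase() calls across threads buys
        // nothing; the coordination overhead dominates the actual work.
        List<String> lower = words.parallelStream()
                                  .map(String::toLowerCase)
                                  .collect(Collectors.toList());
        System.out.println(lower);
    }
}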
The second issue is that benchmarking Java programs is very subtle, and it is very easy to get confusing results. Two common issues are JIT compilation and dead code elimination. Short benchmarks often finish before or during JIT compilation, so they're not measuring peak throughput; indeed, they might be measuring the JIT itself. When compilation occurs is somewhat non-deterministic, so it may cause results to vary wildly as well.
For small, synthetic benchmarks, the workload often computes results that are thrown away. JIT compilers are quite good at detecting this and eliminating code that doesn't produce results that are used anywhere. This probably isn't happening in this case, but if you tinker around with other synthetic workloads, it can certainly happen. Of course, if the JIT eliminates the benchmark workload, it renders the benchmark useless.
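For example (my sketch, using the JMH framework recommended below; recent JMH versions spell the annotation @Benchmark rather than the older @GenerateMicroBenchmark used in the code that follows), a benchmark must make its result observable to defeat dead code elimination, either by returning it or by sinking it into a Blackhole:

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.infra.Blackhole;

public class DeadCodeDemo {
    // Broken: the result is discarded, so the JIT may eliminate the call entirely.
    @Benchmark
    public void broken() {
        "AbabagalamagA".toLowerCase();
    }

    // Fixed: JMH treats returned values as consumed.
    @Benchmark
    public String byReturn() {
        return "AbabagalamagA".toLowerCase();
    }

    // Also fixed: explicitly sink the value into a Blackhole.
    @Benchmark
    public void byBlackhole(Blackhole bh) {
        bh.consume("AbabagalamagA".toLowerCase());
    }
}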
I strongly recommend using a well-developed benchmarking framework such as JMH instead of hand-rolling one of your own. JMH has facilities to help avoid common benchmarking pitfalls, including these, and it's pretty easy to set up and run. Here's your benchmark converted to use JMH:
package com.stackoverflow.questions;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.openjdk.jmh.annotations.*;

public class SO23170832 {
    // Input data shared by both benchmark methods; filled once, outside
    // the measured code paths.
    @State(Scope.Benchmark)
    public static class BenchmarkState {
        static String[] array;
        static {
            array = new String[1000000];
            Arrays.fill(array, "AbabagalamagA");
        }
    }

    @GenerateMicroBenchmark
    @OutputTimeUnit(TimeUnit.SECONDS)
    public List<String> sequential(BenchmarkState state) {
        // Returning the result keeps the JIT from eliminating the work.
        return Arrays.stream(state.array)
                     .map(x -> x.toLowerCase())
                     .collect(Collectors.toList());
    }

    @GenerateMicroBenchmark
    @OutputTimeUnit(TimeUnit.SECONDS)
    public List<String> parallel(BenchmarkState state) {
        return Arrays.stream(state.array)
                     .parallel()
                     .map(x -> x.toLowerCase())
                     .collect(Collectors.toList());
    }
}
I ran this using the command:
java -jar dist/microbenchmarks.jar ".*SO23170832.*" -wi 5 -i 5 -f 1
(The options indicate five warmup iterations, five benchmark iterations, and one forked JVM.) During its run, JMH emits lots of verbose messages, which I've elided. The summary results are as follows.
Benchmark                      Mode    Samples   Mean    Mean error   Units
c.s.q.SO23170832.parallel      thrpt         5   4.600        5.995   ops/s
c.s.q.SO23170832.sequential    thrpt         5   1.500        1.727   ops/s
Note that the results are in ops per second, so it looks like the parallel run was about three times faster than the sequential run. But my machine has only two cores. Hmmm. And the mean error is actually larger than the mean itself! WAT? Something fishy is going on here.
This brings us to a third issue. Looking more closely at the workload, we can see that it allocates a new String object for each input, and it also collects the results into a list, which involves lots of reallocation and copying. I'd guess that this will result in a fair amount of garbage collection. We can see this by rerunning the benchmark with GC messages enabled:
java -verbose:gc -jar dist/microbenchmarks.jar ".*SO23170832.*" -wi 5 -i 5 -f 1
This gives results like:
[GC (Allocation Failure) 512K->432K(130560K), 0.0024130 secs]
[GC (Allocation Failure) 944K->520K(131072K), 0.0015740 secs]
[GC (Allocation Failure) 1544K->777K(131072K), 0.0032490 secs]
[GC (Allocation Failure) 1801K->1027K(132096K), 0.0023940 secs]
# Run progress: 0.00% complete, ETA 00:00:20
# VM invoker: /Users/src/jdk/jdk8-b132.jdk/Contents/Home/jre/bin/java
# VM options: -verbose:gc
# Fork: 1 of 1
[GC (Allocation Failure) 512K->424K(130560K), 0.0015460 secs]
[GC (Allocation Failure) 933K->552K(131072K), 0.0014050 secs]
[GC (Allocation Failure) 1576K->850K(131072K), 0.0023050 secs]
[GC (Allocation Failure) 3075K->1561K(132096K), 0.0045140 secs]
[GC (Allocation Failure) 1874K->1059K(132096K), 0.0062330 secs]
# Warmup: 5 iterations, 1 s each
# Measurement: 5 iterations, 1 s each
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: com.stackoverflow.questions.SO23170832.parallel
# Warmup Iteration 1: [GC (Allocation Failure) 7014K->5445K(132096K), 0.0184680 secs]
[GC (Allocation Failure) 7493K->6346K(135168K), 0.0068380 secs]
[GC (Allocation Failure) 10442K->8663K(135168K), 0.0155600 secs]
[GC (Allocation Failure) 12759K->11051K(139776K), 0.0148190 secs]
[GC (Allocation Failure) 18219K->15067K(140800K), 0.0241780 secs]
[GC (Allocation Failure) 22167K->19214K(145920K), 0.0208510 secs]
[GC (Allocation Failure) 29454K->25065K(147456K), 0.0333080 secs]
[GC (Allocation Failure) 35305K->30729K(153600K), 0.0376610 secs]
[GC (Allocation Failure) 46089K->39406K(154624K), 0.0406060 secs]
[GC (Allocation Failure) 54766K->48299K(164352K), 0.0550140 secs]
[GC (Allocation Failure) 71851K->62725K(165376K), 0.0612780 secs]
[GC (Allocation Failure) 86277K->74864K(184320K), 0.0649210 secs]
[GC (Allocation Failure) 111216K->94203K(185856K), 0.0875710 secs]
[GC (Allocation Failure) 130555K->114932K(199680K), 0.1030540 secs]
[GC (Allocation Failure) 162548K->141952K(203264K), 0.1315720 secs]
[Full GC (Ergonomics) 141952K->59696K(159232K), 0.5150890 secs]
[GC (Allocation Failure) 105613K->85547K(184832K), 0.0738530 secs]
1.183 ops/s
Note: the lines beginning with # are normal JMH output lines. All the rest are GC messages. This is just the first of the five warmup iterations, which precedes five benchmark iterations. The GC messages continued in the same vein during the rest of the iterations. I think it's safe to say that the measured performance is dominated by GC overhead and that the results reported should not be believed.
At this point it's unclear what to do. This is purely a synthetic workload. It clearly involves very little CPU time doing actual work compared to allocation and copying. It's hard to say what you really are trying to measure here. One approach would be to come up with a different workload that is in some sense more "real." Another approach would be to change the heap and GC parameters to avoid GC during the benchmark run.
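For example (illustrative flag values, not a run from the original answer), pinning the benchmark JVM to a large fixed-size heap makes it less likely that a collection lands inside a measurement iteration; keeping -verbose:gc on lets you verify that it worked:

java -Xms4g -Xmx4g -verbose:gc -jar dist/microbenchmarks.jar ".*SO23170832.*" -wi 5 -i 5 -f 1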
The Javadocs for Collection.(parallelS|s)tream() and Stream itself don't answer the question, so it's off to the mailing lists for the rationale. I went through the lambda-libs-spec-observers archives and found one thread specifically about Collection.parallelStream() and another thread that touched on whether java.util.Arrays should provide parallelStream() to match (or actually, whether it should be removed). There was no once-and-for-all conclusion, so perhaps I've missed something from another list, or the matter was settled in private discussion. (Perhaps Brian Goetz, one of the principals of this discussion, can fill in anything missing.)
The participants made their points well, so this answer is mostly just an organization of the relevant quotes, with a few clarifications in [brackets], presented in order of importance (as I interpret it).
parallelStream() covers a very common case
Brian Goetz in the first thread, explaining why Collection.parallelStream() is valuable enough to keep even after other parallel stream factory methods have been removed:
We do not have explicit parallel versions of each of these [stream factories]; we did
originally, and to prune down the API surface area, we cut them on the
theory that dropping 20+ methods from the API was worth the tradeoff of
the surface yuckiness and performance cost of .intRange(...).parallel().
But we did not make that choice with Collection.
We could either remove the Collection.parallelStream(), or we could add
the parallel versions of all the generators, or we could do nothing and
leave it as is. I think all are justifiable on API design grounds.
I kind of like the status quo, despite its inconsistency. Instead of
having 2N stream construction methods, we have N+1 -- but that extra 1
covers a huge number of cases, because it is inherited by every
Collection. So I can justify to myself why having that extra 1 method
is worth it, and why accepting the inconsistency of going no further is
acceptable.
Do others disagree? Is N+1 [Collection.parallelStream() only] the practical choice here? Or should we go
for the purity of N [rely on Stream.parallel()]? Or the convenience and consistency of 2N [parallel versions of all factories]? Or is
there some even better N+3 [Collection.parallelStream() plus other special cases], for some other specially chosen cases we
want to give special support to?
Brian Goetz stands by this position in the later discussion about Arrays.parallelStream():
I still really like Collection.parallelStream; it has huge
discoverability advantages, and offers a pretty big return on API
surface area -- one more method, but provides value in a lot of places,
since Collection will be a really common case of a stream source.
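To make the N+1 tradeoff concrete (my illustration, not code from the thread): the one inherited method covers every Collection, while the N other stream sources reach parallelism through the single parallel() switch:

import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class NPlusOne {
    public static void main(String[] args) {
        // The "+1": inherited by every Collection, so it shows up in code
        // completion wherever you already have a collection in hand.
        List<String> list = Arrays.asList("a", "b", "c");
        Stream<String> fromCollection = list.parallelStream();

        // The "N": every other factory stays sequential-only; you opt in
        // with parallel() instead of a dedicated parallel factory.
        IntStream fromRange = IntStream.range(0, 100).parallel();

        System.out.println(fromCollection.count() + " " + fromRange.count());
    }
}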
parallelStream() is more performant
Brian Goetz:
Direct version [parallelStream()] is more performant, in that it requires less wrapping (to
turn a stream into a parallel stream, you have to first create the
sequential stream, then transfer ownership of its state into a new
Stream.)
In response to Kevin Bourrillion's skepticism about whether the effect is significant, Brian again:
Depends how seriously you are counting. Doug counts individual object
creations and virtual invocations on the way to a parallel operation,
because until you start forking, you're on the wrong side of Amdahl's
law -- this is all "serial fraction" that happens before you can fork
any work, which pushes your breakeven threshold further out. So getting
the setup path for parallel ops fast is valuable.
Doug Lea follows up, but hedges his position:
People dealing with parallel library support need some attitude
adjustment about such things. On a soon-to-be-typical machine,
every cycle you waste setting up parallelism costs you say 64 cycles.
You would probably have had a different reaction if it required 64
object creations to start a parallel computation.
That said, I'm always completely supportive of forcing implementors
to work harder for the sake of better APIs, so long as the
APIs do not rule out efficient implementation. So if killing
parallelStream is really important, we'll find some way to
turn stream().parallel() into a bit-flip or somesuch.
Indeed, the later discussion about Arrays.parallelStream() takes note of the lower cost of Stream.parallel().
stream().parallel() statefulness complicates the future
At the time of the discussion, switching a stream from sequential to parallel and back could be interleaved with other stream operations. Brian Goetz, on behalf of Doug Lea, explains why sequential/parallel mode switching may complicate future development of the Java platform:
I'll take my best stab at explaining why: because it (like the stateful
methods (sort, distinct, limit)) which you also don't like, move[s] us
incrementally farther from being able to express stream pipelines in
terms of traditional data-parallel constructs, which further constrains
our ability to map them directly to tomorrow's computing substrate,
whether that be vector processors, FPGAs, GPUs, or whatever we cook up.
Filter-map-reduce map[s] very cleanly to all sorts of parallel computing
substrates; filter-parallel-map-sequential-sorted-limit-parallel-map-uniq-reduce
does not.
So the whole API design here embodies many tensions between making it
easy to express things the user is likely to want to express, and doing
[it] in a manner that we can predictably make fast with transparent cost
models.
This mode switching was removed after further discussion. In the current version of the library, a stream pipeline is either sequential or parallel; the last call to sequential()/parallel() wins. Besides side-stepping the statefulness problem, this change also improved the performance of using parallel() to set up a parallel pipeline from a sequential stream factory.
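A quick sketch of the current rule (standard java.util.stream behavior): the position of the mode-switching calls in the chain doesn't matter, only the last one:

import java.util.stream.IntStream;

public class LastCallWins {
    public static void main(String[] args) {
        // The entire pipeline executes in parallel: parallel() comes last,
        // so it wins over the earlier sequential() call.
        long count = IntStream.range(0, 1_000)
                              .sequential()
                              .map(i -> i * 2)
                              .parallel()
                              .filter(i -> i % 3 == 0)
                              .count();
        System.out.println(count);
    }
}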
exposing parallelStream() as a first-class citizen improves programmer perception of the library, leading them to write better code
Brian Goetz again, in response to Tim Peierls's argument that Stream.parallel() allows programmers to understand streams sequentially before going parallel:
I have a slightly different viewpoint about the value of this sequential
intuition -- I view the pervasive "sequential expectation" as one of the
biggest challenges of this entire effort; people are constantly
bringing their incorrect sequential bias, which leads them to do stupid
things like using a one-element array as a way to "trick" the "stupid"
compiler into letting them capture a mutable local, or using lambdas as
arguments to map that mutate state that will be used during the
computation (in a non-thread-safe way), and then, when it's pointed out
that what they're doing [is broken], shrug it off and say "yeah, but I'm not doing
it in parallel."
We've made a lot of design tradeoffs to merge sequential and parallel
streams. The result, I believe, is a clean one and will add to the
library's chances of still being useful in 10+ years, but I don't
particularly like the idea of encouraging people to think this is a
sequential library with some parallel bags nailed on the side.
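The anti-patterns Goetz describes look something like this (a sketch of the mistake, not code from the thread); both "work" sequentially and silently break once someone adds parallel():

import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class SequentialBias {
    public static void main(String[] args) {
        // The "one-element array" trick to mutate a captured local...
        int[] sum = new int[1];
        // ...and non-thread-safe state mutated from inside map().
        List<Integer> seen = new ArrayList<>();

        IntStream.range(0, 10_000)
                 .parallel()                              // the sequential assumption breaks here
                 .map(i -> { seen.add(i); return i; })    // racy ArrayList mutation
                 .forEach(i -> sum[0] += i);              // racy read-modify-write

        // Neither value is reliable; the ArrayList may even throw during add().
        System.out.println(sum[0] + " " + seen.size());
    }
}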
Best Answer
The problem is that the current implementation of the Stream API, along with the current implementation of IteratorSpliterator for sources of unknown size, splits such sources into parallel tasks very badly. You were lucky to have more than 1024 files; otherwise you would have had no parallelization benefit at all. The current Stream API implementation takes into account the estimateSize() value returned from the Spliterator. The IteratorSpliterator of unknown size returns Long.MAX_VALUE before splitting, and its suffix always returns Long.MAX_VALUE as well. Its splitting strategy is the following:

- Start with a batch size of 1024 elements and increase it by 1024 on each split (2048, 3072, and so on) until the MAX_BATCH size is reached (which is 33554432 elements).
- Consume batch-size elements from the underlying iterator into an array.
- Split off an ArraySpliterator iterating over the created array as the prefix, leaving itself as the suffix.

Suppose you have 7000 files. The Stream API asks for the estimated size; IteratorSpliterator returns Long.MAX_VALUE. Ok, the Stream API asks the IteratorSpliterator to split; it collects 1024 elements from the underlying DirectoryStream into an array and splits into an ArraySpliterator (with estimated size 1024) and itself (with an estimated size that is still Long.MAX_VALUE). As Long.MAX_VALUE is much, much more than 1024, the Stream API decides to continue splitting the bigger part without even trying to split the smaller part. So the overall splitting tree goes like this:

IteratorSpliterator (est. MAX_VALUE)
 ├── ArraySpliterator (est. 1024 elements)
 └── IteratorSpliterator (est. MAX_VALUE)
      ├── ArraySpliterator (est. 2048 elements)
      └── IteratorSpliterator (est. MAX_VALUE)
           ├── ArraySpliterator (est. 3072 elements)
           └── IteratorSpliterator (est. MAX_VALUE)
                ├── ArraySpliterator (est. 856 elements; input exhausted)
                └── IteratorSpliterator (est. MAX_VALUE; 0 elements)

So after that you have five parallel tasks to be executed, actually containing 1024, 2048, 3072, 856, and 0 elements. Note that even though the last chunk has 0 elements, it still reports an estimated Long.MAX_VALUE elements, so the Stream API will send it to the ForkJoinPool as well. The bad thing is that the Stream API considers further splitting of the first four tasks useless, because their estimated sizes are much smaller. So what you get is a very uneven splitting of the input which utilizes at most four CPU cores (even if you have many more). If your per-element processing takes roughly the same time for any element, then the whole process waits for the biggest part (3072 elements) to complete. So the maximum speedup you can get is 7000/3072 = 2.28x. Thus, if sequential processing takes 41 seconds, the parallel stream will take around 41/2.28 = 18 seconds (which is close to your actual numbers).

Your work-around solution is completely fine. Note that with Files.list().parallel() you also have all the input Path elements stored in memory (in ArraySpliterator objects), so you will not waste more memory by manually dumping them into a List. Array-backed list implementations like ArrayList (which is what Collectors.toList() currently creates) can split evenly without any problems, which results in additional speed-up.

Why is such a case not optimized? It is surely not an impossible problem (though the implementation could be quite tricky); it just seems not to be a high-priority problem for the JDK developers. There were several discussions on this topic on the mailing lists. You may read Paul Sandoz's message here, where he comments on my optimization effort.
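A sketch of that work-around (the directory path and the process() helper are hypothetical): materialize the paths first, so the parallel stream runs over a sized, evenly splittable ArrayList:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class EvenSplit {
    public static void main(String[] args) throws IOException {
        List<Path> paths;
        // Files.list() must be closed; collect the entries eagerly.
        try (Stream<Path> s = Files.list(Paths.get("/some/dir"))) {  // hypothetical directory
            paths = s.collect(Collectors.toList());
        }
        // The ArrayList knows its size, so its spliterator halves it evenly
        // instead of peeling off uneven batches with MAX_VALUE estimates.
        paths.parallelStream().forEach(EvenSplit::process);
    }

    private static void process(Path p) { /* per-file work goes here */ }
}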