Java 8 Parallel Streams

Using Parallel Streams

Java 8 streams make parallel processing convenient: you just call parallel() on a stream (or parallelStream() on a collection) and all the magic happens behind the scenes. But this convenience comes at a price. If you are not careful about a few things, it can backfire: it can hurt performance, or even introduce concurrency problems such as race conditions.

A few things to take into consideration:

1. Are the operations in the stream stateful or stateless?

Consider the following example:

words.stream()
        .filter(word -> word.startsWith("a") || word.startsWith("e") || word.startsWith("i"))
        .forEach(System.out::println);

Here filter() is a stateless operation: it looks at one element at a time and does not depend on any other element to decide whether to keep it.

Now consider the following pipeline. In order to sort the elements, the stream has to keep track of the other elements so it can compare them. So sorted() does not operate on just one element at a time: it is a stateful operation.

words.stream()
        .filter(word -> word.startsWith("a") || word.startsWith("e") || word.startsWith("i"))
        .sorted()
        .forEach(System.out::println);

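Parallel execution pays off mainly when the pipeline is made of stateless operations. If we add parallel() to a pipeline that contains a stateful operation such as sorted(), the stream has to buffer elements and combine partial results across threads, so the parallel version can end up slower than the sequential one.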
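As a minimal sketch (the word list and the class name are made up for illustration), here is the stateful pipeline above run both sequentially and in parallel. Note that it prints with forEachOrdered() rather than forEach(): on a parallel stream, forEach() does not respect encounter order, so words printed after sorted() could come out in any order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StatefulPipelineDemo {

    // Hypothetical sample data; any List<String> works here.
    static final List<String> WORDS =
            Arrays.asList("igloo", "apple", "banana", "egg", "ant", "ink", "echo");

    // filter() is stateless; sorted() is stateful because it must see every
    // element before it can emit the first one.
    static List<String> vowelWords(boolean parallel) {
        return (parallel ? WORDS.parallelStream() : WORDS.stream())
                .filter(word -> word.startsWith("a") || word.startsWith("e") || word.startsWith("i"))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // forEachOrdered keeps the sorted order even on a parallel stream;
        // plain forEach could print the words in an unpredictable order.
        WORDS.parallelStream()
                .filter(word -> word.startsWith("a") || word.startsWith("e") || word.startsWith("i"))
                .sorted()
                .forEachOrdered(System.out::println);

        // Both versions produce the same result; the parallel one has to
        // buffer all the elements before sorting them.
        System.out.println(vowelWords(false).equals(vowelWords(true))); // true
    }
}
```

Getting the same answer in parallel is not the same as getting it faster: for a small list like this one, the sequential version is very likely the quicker of the two.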

How do you find out whether an operation is stateful or stateless?

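The easiest way is to check the Javadoc for java.util.stream.Stream. Stateful intermediate operations such as distinct(), sorted(), limit() and skip() are explicitly documented as stateful, while operations such as filter() and map() are stateless.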

2. Do I use an external object to store the results of the stream?

List<String> result = new ArrayList<>();

words.stream()
        .filter(word -> word.startsWith("a") || word.startsWith("e") || word.startsWith("i"))
        .forEach(result::add);

In the above case, adding parallel() would introduce a race condition: several threads would call add() on the same ArrayList at the same time, and ArrayList is not thread-safe, so elements can be lost or an exception such as ArrayIndexOutOfBoundsException can be thrown. One quick fix is to use a CopyOnWriteArrayList instead, as shown below. But this won't perform well for larger datasets, because every add() copies the entire backing array.

List<String> result = new CopyOnWriteArrayList<>();

words.stream()
        .parallel()
        .filter(word -> word.startsWith("a") || word.startsWith("e") || word.startsWith("i"))
        .forEach(result::add);
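A small sketch of the difference (the class name, method names and the 10,000-element input are all made up for illustration): the same parallel forEach is run once against a plain ArrayList and once against a CopyOnWriteArrayList. The ArrayList run often ends up with missing elements or fails with an exception, but this depends on thread timing and may not show up on every run. The CopyOnWriteArrayList run always ends up with every element, although in no particular order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.IntStream;

public class SharedListDemo {

    static final int N = 10_000;

    // Adds 0..N-1 to the given list from a parallel stream.
    static List<Integer> fillInParallel(List<Integer> target) {
        IntStream.range(0, N)
                .parallel()
                .boxed()
                .forEach(target::add); // several threads call add() at the same time
        return target;
    }

    public static void main(String[] args) {
        try {
            // Not thread-safe: elements may be lost or an exception may be thrown.
            System.out.println("ArrayList size: " + fillInParallel(new ArrayList<>()).size());
        } catch (RuntimeException e) {
            System.out.println("ArrayList failed: " + e);
        }

        // Thread-safe, but every add() copies the whole backing array.
        System.out.println("CopyOnWriteArrayList size: "
                + fillInParallel(new CopyOnWriteArrayList<>()).size()); // 10000
    }
}
```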

A better solution is to use the collect() method, which is a special type of reduce operation. The example above can be rewritten with collect() as follows:

List<String> result = words.stream()
        .parallel()
        .filter(word -> word.startsWith("a") || word.startsWith("e") || word.startsWith("i"))
        .collect(Collectors.toList());

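This is thread-safe without any extra synchronization, and the resulting list keeps the elements in their original encounter order. collect() is called a mutable reduction because it accumulates the results into a mutable container (here a List) instead of combining immutable values.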
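To make the mutable reduction idea concrete, here is a sketch (class name and sample data are hypothetical) that uses the three-argument form collect(supplier, accumulator, combiner). In a parallel run, each chunk of the stream gets its own ArrayList from the supplier, the accumulator adds elements to it, and the combiner merges the partial lists. No list is shared between threads, which is why no locking is needed. Collectors.toList() works along the same lines, so both methods should return the same list in encounter order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MutableReductionDemo {

    // Hypothetical sample data.
    static final List<String> WORDS =
            Arrays.asList("igloo", "apple", "banana", "egg", "ant", "ink", "echo");

    static boolean startsWithVowel(String word) {
        return word.startsWith("a") || word.startsWith("e") || word.startsWith("i");
    }

    // Explicit mutable reduction: one container per chunk, merged at the end.
    static ArrayList<String> withExplicitCollect() {
        return WORDS.parallelStream()
                .filter(MutableReductionDemo::startsWithVowel)
                .collect(ArrayList::new,      // supplier: a fresh list per chunk
                         ArrayList::add,      // accumulator: add one element
                         ArrayList::addAll);  // combiner: merge two partial lists
    }

    // Same result using the ready-made collector.
    static List<String> withToList() {
        return WORDS.parallelStream()
                .filter(MutableReductionDemo::startsWithVowel)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(withExplicitCollect()); // [igloo, apple, egg, ant, ink, echo]
        System.out.println(withToList());          // [igloo, apple, egg, ant, ink, echo]
    }
}
```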

How much can the thread pool grow?

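When a terminal operation runs on a parallel stream, the work is executed with the Fork/Join framework, in the shared ForkJoinPool.commonPool(). By default the common pool's parallelism is one less than the number of available processors (the calling thread also takes part in the work). Because this pool is shared by every parallel stream in the JVM, a single stream processing a large data set can keep all of its threads busy and slow down every other parallel stream, which can become a problem in an application server handling many requests at once.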
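A quick way to see this for yourself is the sketch below (class and method names are made up). It prints the common pool's parallelism next to the processor count, then records which threads actually process the elements of a parallel stream. On a typical JVM you would expect the calling thread (main) plus worker threads with names like ForkJoinPool.commonPool-worker-1, although the exact names and count vary with the JDK version and the machine.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CommonPoolDemo {

    // Runs a parallel stream and records the name of every thread that processed an element.
    static Set<String> threadsUsedByParallelStream() {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        IntStream.range(0, 10_000)
                .parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("Available processors:    " + Runtime.getRuntime().availableProcessors());
        // By default this is availableProcessors() - 1, but never less than 1.
        System.out.println("Common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("Threads used: " + threadsUsedByParallelStream());
    }
}
```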

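You can set the number of threads used by the common Fork/Join pool with the following system property. It is read only once, when the common pool is created, so it has to be set before the first parallel stream runs (or passed on the command line as -Djava.util.concurrent.ForkJoinPool.common.parallelism=4).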

// Must run before the common pool is first used; the value must be a String
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");
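Here is a minimal runnable sketch of setting the property from code (the class and method names are hypothetical). The important detail is ordering: setProperty() has to run before anything in the JVM touches the common pool, since the value is only read when the pool is created. If nothing has used the pool yet, the first line printed should show a parallelism of 4.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class ConfigureCommonPool {

    // Sets the property and then reads back the common pool's parallelism.
    // The property only takes effect if the common pool has not been created yet.
    static int configureParallelism(int threads) {
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",
                String.valueOf(threads)); // the value must be a String
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        // Do this first, before any parallel stream runs.
        System.out.println("Common pool parallelism: " + configureParallelism(4));

        // Parallel streams now run on a common pool sized by the property.
        long count = IntStream.range(0, 1_000).parallel().filter(i -> i % 2 == 0).count();
        System.out.println("Even numbers: " + count); // 500
    }
}
```

Setting the property on the command line avoids the ordering problem entirely, which is why it is usually the safer choice in an application server.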