Java Concurrency

Create worker threads using Runnable, Callable and use an ExecutorService to concurrently execute tasks

Runnable

We can create a thread by passing an implementation of Runnable to a Thread constructor. There are three ways to write that implementation. With a named class:

class Task implements Runnable {
  @Override
  public void run() {
      System.out.println("Running");
  }
}

public class Test {
  public static void main(String args[]) {
    Runnable r = new Task();
    Thread thread = new Thread(r);
    thread.start();
  }
}

Or with an anonymous class:

public class Test {
  public static void main(String args[]) {
    Runnable r = new Runnable() {
      @Override
      public void run() {
          System.out.println("Running");
      }
    };
    Thread thread = new Thread(r);
    thread.start();
  }
}

Or with a lambda expression:

public class Test {
  public static void main(String args[]) {
    Runnable r = () -> System.out.println("Running");
    Thread thread = new Thread(r);
    thread.start();
  }
}

Callable

The Callable interface is similar to Runnable: both are designed to be executed by another thread. A Runnable, however, does not return a result and cannot throw a checked exception, while a Callable can do both. We can create a Callable in three ways:

class Task implements Callable<Integer> {
  @Override
  public Integer call() {
      int n = 0;
      for(int i = 0; i < 100; i++) { n += i; }
      return n;
  }
}

public class Test {
  public static void main(String args[]) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Callable<Integer> c = new Task();
    Future<Integer> future = executor.submit(c);
    try {
        Integer result = future.get(); // waits for the task to complete
        System.out.println(result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    executor.shutdown();
  }
}

Or with an anonymous class:

public class Test {
  public static void main(String args[]) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Callable<Integer> c = new Callable<Integer>() {
      @Override
      public Integer call() {
          int n = 0;
          for(int i = 0; i < 100; i++) { n += i; }
          return n;
      }
    };
    Future<Integer> future = executor.submit(c);
    try {
        Integer result = future.get(); // waits for the task to complete
        System.out.println(result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    executor.shutdown();
  }
}

Or with a lambda expression:

public class Test {
  public static void main(String args[]) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Callable<Integer> c = () -> {
          int n = 0;
          for(int i = 0; i < 100; i++) { n += i; }
          return n;
      };
    Future<Integer> future = executor.submit(c);
    try {
        Integer result = future.get(); // waits for the task to complete
        System.out.println(result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    executor.shutdown();
  }
}

ExecutorService

The ExecutorService interface represents a mechanism that executes tasks in the background.

You can use the Executors factory class to create ExecutorService implementations. Some examples are:

// Creates an Executor that uses a single worker thread operating off an unbounded queue.
ExecutorService es1 = Executors.newSingleThreadExecutor();
// Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.
ExecutorService es2 = Executors.newFixedThreadPool(10);
// Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.
ExecutorService es3 = Executors.newScheduledThreadPool(10);

There are a few methods to execute tasks with an ExecutorService:

execute(Runnable)

The execute method takes a Runnable, and executes it asynchronously:

executorService.execute(new Runnable() {
    public void run() {
        System.out.println("A task");
    }
});
executorService.shutdown();

submit(Runnable)

This method takes a Runnable but returns a Future object. The Future's get() method returns null once the Runnable has finished executing:

Future<?> future = executorService.submit(new Runnable() {
    public void run() {
        System.out.println("A task");
    }
});
future.get(); //Blocks until the Runnable has finished
executorService.shutdown();

submit(Callable)

This version of the method takes a Callable, and returns a Future object with a result when it has finished executing:

Future<String> future = executorService.submit(new Callable<String>() {
    public String call() {
        return "A callable";
    }
});
System.out.println(future.get()); //Blocks until the Callable has finished
executorService.shutdown();

invokeAny(Collection<? extends Callable<T>>)

This method executes the given tasks and returns the result of one that has completed successfully (that is, without throwing an exception). You have no guarantee about which Callable's result you'll get, only that it comes from one that finished. When invokeAny returns, whether normally or with an exception, the tasks that have not completed are cancelled. For example:

List<Callable<String>> callables = new ArrayList<Callable<String>>();
callables.add(new Callable<String>() {
    public String call() {
        return "Callable 1";
    }
});
callables.add(new Callable<String>() {
    public String call() {
        return "Callable 2";
    }
});
callables.add(new Callable<String>() {
    public String call() {
        return "Callable 3";
    }
});
String result = executorService.invokeAny(callables);
System.out.println(result);
executorService.shutdown();

Sometimes it will print "Callable 1", sometimes "Callable 2", and other times "Callable 3".

invokeAll(Collection<? extends Callable<T>>)

This method executes the given tasks, returning a list of Futures holding their status and results when all complete. Future.isDone() is true for each element of the returned list. A completed task could have terminated either normally or by throwing an exception:

List<Callable<String>> callables = new ArrayList<Callable<String>>();
callables.add(new Callable<String>() {
    public String call() {
        return "Callable 1";
    }
});
callables.add(new Callable<String>() {
    public String call() {
        return "Callable 2";
    }
});
callables.add(new Callable<String>() {
    public String call() {
        return "Callable 3";
    }
});
List<Future<String>> futures = executorService.invokeAll(callables);
for(Future<String> f : futures){
    System.out.println(f.get());
}
executorService.shutdown();

When you are done using the ExecutorService you should shut it down, so its threads do not keep running. To do that, call its shutdown() method. The ExecutorService will not shut down immediately, but it will no longer accept new tasks, and once all previously submitted tasks have finished, it terminates. If you want to shut down the ExecutorService immediately, you can call the shutdownNow() method. It attempts to stop all executing tasks right away (typically by interrupting their threads) and returns the list of tasks that were waiting and never started.
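
The two shutdown methods are often combined: ask for an orderly shutdown first and fall back to shutdownNow() only if the tasks do not finish in time. The following is a minimal sketch of that pattern; the class ShutdownDemo, the helper shutdownGracefully, and the five-second timeout are illustrative assumptions, not part of the ExecutorService API.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {

    // Hypothetical helper: returns true if the executor terminated within the timeout.
    static boolean shutdownGracefully(ExecutorService executor, long timeoutSeconds) {
        executor.shutdown(); // no new tasks accepted; already submitted tasks still run
        try {
            if (executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                return true;
            }
            // Timed out: interrupt running tasks and collect the ones that never started.
            List<Runnable> neverStarted = executor.shutdownNow();
            System.out.println("Tasks never started: " + neverStarted.size());
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // Submits a few short tasks, shuts the executor down, and returns how many tasks ran.
    static int runAndShutdown(int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.execute(completed::incrementAndGet);
        }
        shutdownGracefully(executor, 5);
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed tasks: " + runAndShutdown(10));
    }
}
```

Because shutdown() lets queued tasks run, all ten short tasks complete before awaitTermination returns.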

 

Identify potential threading problems among deadlock, starvation, livelock, and race conditions

Deadlock

Deadlock describes a situation where two or more threads are blocked forever, waiting for each other. Example:

public class TestThread {
   public static Object lock1 = new Object();
   public static Object lock2 = new Object();

   public static void main(String args[]) {

      Thread t1 = new Thread(new Task1());
      Thread t2 = new Thread(new Task2());
      t1.start();
      t2.start();
   }

   private static class Task1 implements Runnable {
      public void run() {
         synchronized (lock1) {
            System.out.println("Task 1: Holding lock 1...");
            try {
              Thread.sleep(10);
            }
            catch (InterruptedException e) {}
            System.out.println("Task 1: Waiting for lock 2...");
            synchronized (lock2) {
               System.out.println("Task 1: Holding lock 1 & 2...");
            }
         }
      }
   }

   private static class Task2 implements Runnable {
      public void run() {
         synchronized (lock2) {
            System.out.println("Task 2: Holding lock 2...");
            try {
              Thread.sleep(10);
            }
            catch (InterruptedException e) {}
            System.out.println("Task 2: Waiting for lock 1...");
            synchronized (lock1) {
               System.out.println("Task 2: Holding lock 1 & 2...");
            }
         }
      }
   } 
}
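
One common way to avoid the deadlock above is to make every thread acquire the locks in the same order, so that a circular wait cannot form. The following is a minimal sketch of that idea and is not part of the original example; the names LockOrderingDemo, doWork, and runBoth are illustrative.

```java
class LockOrderingDemo {
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();

    // Every caller takes lock1 first and lock2 second.
    static void doWork(String name) {
        synchronized (lock1) {
            synchronized (lock2) {
                System.out.println(name + ": holding lock 1 & 2");
            }
        }
    }

    // Starts two threads and returns true if both finished within the timeout.
    static boolean runBoth() {
        Thread t1 = new Thread(() -> doWork("Task 1"));
        Thread t2 = new Thread(() -> doWork("Task 2"));
        t1.start();
        t2.start();
        try {
            t1.join(2000);
            t2.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !t1.isAlive() && !t2.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Finished without deadlock: " + runBoth());
    }
}
```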

Starvation

Lock starvation occurs when a thread with lower priority than the others is constantly waiting for a lock and is never able to take it, because threads with higher priority keep acquiring it.

Livelock

A livelock is like a deadlock in the sense that two (or more) threads keep each other from making progress. But in a livelock the threads are not blocked: each one is "actively" trying to resolve the problem on its own. A livelock occurs when the combination of these efforts makes it impossible for any of them to ever finish. For example, if two threads detect a possible deadlock and try to "step aside" for each other without proper care, they end up stuck in a loop, always "stepping aside" and never managing to move forward.

Race conditions

A race condition is a situation where two threads compete for the same resource and try to change it at the same time, in a way that causes unexpected results. The problem happens when, for example, one thread checks whether a value is X and then does something that depends on that value, while another thread changes the value between the check and the action.

 

Use synchronized keyword and java.util.concurrent.atomic package to control the order of thread execution

synchronized

You have to be careful when multiple threads access shared variables, since it can result in a race condition. For example, in a method like this:

int n = 0;

void m() {
    this.n = this.n + 1;
}

The increment of variable n is not atomic: it reads n, adds one, and writes the result back, so two threads can interleave and lose an update. We can use the synchronized keyword to fix this. We can, for example, synchronize the method:

int n = 0;

synchronized void m() {
    this.n = this.n + 1;
}

Or just a block of code:

int n = 0;

void m() {
  synchronized (this) {
    this.n = this.n + 1;
  }
}

Internally, Java uses a so-called monitor (or intrinsic lock) to manage synchronization. This monitor is bound to an object. For synchronized instance methods, the lock is the instance on which the method is called. For static methods, it's the Class object. For synchronized blocks, the object is specified explicitly (the example uses this to refer to the instance the method belongs to). Only the thread that holds the lock can execute the method (or block); other threads that try to enter code guarded by the same lock wait until it is released.
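
To see the effect with several threads, here is a small sketch that guards a counter with a synchronized block. The dedicated lock object and the names SynchronizedCounterDemo and countWithThreads are illustrative choices; synchronizing on this, as in the examples above, would work the same way.

```java
class SynchronizedCounterDemo {
    private final Object lock = new Object(); // dedicated lock object (an illustrative choice)
    private int n = 0;                        // only accessed while holding lock

    void increment() {
        synchronized (lock) {
            n = n + 1;
        }
    }

    int get() {
        synchronized (lock) {
            return n;
        }
    }

    // Starts the given number of threads, each incrementing the same counter.
    static int countWithThreads(int threads, int incrementsPerThread) {
        SynchronizedCounterDemo counter = new SynchronizedCounterDemo();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // wait for every worker before reading the counter
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 10_000)); // prints 40000
    }
}
```

Without the synchronized blocks, the same program could print a smaller number because of lost updates.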

java.util.concurrent.atomic

The java.util.concurrent.atomic package contains classes to perform atomic operations. With an atomic operation, you can safely perform the operation in parallel on multiple threads without using the synchronized keyword or locks.

We have for example:

  • AtomicBoolean, AtomicInteger, AtomicLong, and AtomicReference<V> to update a value of the corresponding type (or object reference) atomically.
  • AtomicIntegerArray, AtomicLongArray, and AtomicReferenceArray<E> to update the elements of the corresponding array type (or object reference) atomically.
  • DoubleAdder and LongAdder, where one or more variables together maintain an initially zero sum of the corresponding type.
  • DoubleAccumulator and LongAccumulator, where one or more variables together maintain a running value of the corresponding type updated using a supplied binary operator.

The synchronized example above can be changed to use an AtomicInteger in this way:

AtomicInteger n = new AtomicInteger(); // creates an AtomicInteger with the initial value 0.

void m() {
    n.incrementAndGet();
}

Here are some common operations for the atomic classes:

AtomicInteger ai = new AtomicInteger(10);
int val = ai.get(); // Get the value
ai.set(15); // Set the value

int expectedValue = 15;
int newValue      = 20;
// If the value of ai equals expectedValue, ai is set to newValue
ai.compareAndSet(expectedValue, newValue);

ai = new AtomicInteger(10);
val = ai.getAndAdd(10); // val contains 10 but ai contains 20
val = ai.addAndGet(10); // val and ai contain 30

val = ai.getAndDecrement(); // val contains 30 but ai contains 29
val = ai.decrementAndGet(); // val and ai contain 28

val = ai.getAndIncrement(); // val contains 28 but ai contains 29
val = ai.incrementAndGet(); // val and ai contain 30

The methods updateAndGet() and getAndUpdate() accept a lambda expression in order to perform a function (an IntUnaryOperator in the case of AtomicInteger) upon the value, for example:

AtomicInteger ai = new AtomicInteger(10);
ai.updateAndGet(i -> i * 5); // ai contains 50

The methods accumulateAndGet() and getAndAccumulate() accept a lambda expression of type IntBinaryOperator (in the case of AtomicInteger) that updates the current value with the results of applying the given function to the current and given values. The function is applied with the current value as its first argument, and the given value as the second argument. For example:

AtomicInteger ai = new AtomicInteger(10);
ai.accumulateAndGet(5, (a, b) -> a + b); // ai contains 15
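
The "getAnd..." and "...AndGet" variants differ only in whether they return the value before or after the change. A short sketch, assuming an illustrative class named AtomicUpdateDemo, makes the difference visible:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class AtomicUpdateDemo {

    // Returns the values observed at each step, followed by the final value.
    static int[] demo() {
        AtomicInteger ai = new AtomicInteger(10);
        int before = ai.getAndUpdate(i -> i * 5);        // returns 10, ai becomes 50
        int after = ai.updateAndGet(i -> i + 1);         // ai becomes 51, returns 51
        int previous = ai.getAndAccumulate(9, Math::max);   // returns 51, ai stays max(51, 9) = 51
        int current = ai.accumulateAndGet(100, Math::max);  // ai becomes max(51, 100) = 100, returns 100
        return new int[] {before, after, previous, current, ai.get()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [10, 51, 51, 100, 100]
    }
}
```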

In the case of LongAdder and DoubleAdder, they can be used to consecutively add values to a number. For example:

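ExecutorService executor = Executors.newFixedThreadPool(3);
LongAdder la = new LongAdder();
IntStream.range(0, 1000)
    .forEach(i -> executor.submit(la::increment)); // adds one to la, 1000 times
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES); // wait for the submitted tasks (throws InterruptedException)
System.out.println(la.sum());   // prints 1000 once all tasks have finished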

LongAdder provides the methods add(long) (adds the given value) and increment() (adds one) and is thread-safe. But instead of updating a single variable, this class maintains a set of variables internally to reduce contention between threads. The result can be retrieved by calling sum() or sumThenReset() (gets the value and resets the sum to zero). This class is preferred over the atomic classes when updates from multiple threads are more common than reads.

LongAccumulator and DoubleAccumulator are a more generalized version of the previous classes. Here's an example:

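LongAccumulator acc = new LongAccumulator((a, b) -> a + b , 1L);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 5)
    .forEach(i -> executor.submit(() -> acc.accumulate(i)));
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES); // wait for the submitted tasks (throws InterruptedException)
System.out.println(acc.getThenReset()); // prints 11: the initial 1 plus 0 + 1 + 2 + 3 + 4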

In the example, a LongAccumulator is created with the function a + b and an initial value of one. With every call to accumulate(i), both the current result and the value i are passed as parameters to the lambda expression. A LongAccumulator also maintains a set of variables internally to reduce contention between threads. The result can be retrieved by calling get() or getThenReset() (gets the value and resets the variables to the identity value passed to the constructor).
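
Because the function is supplied by the caller, an accumulator is not limited to sums. As a sketch, the following tracks a running maximum across threads; the class MaxAccumulatorDemo and the method maxOf are illustrative names, and Long.MIN_VALUE is used as the identity because it never wins a max comparison.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;

class MaxAccumulatorDemo {

    // Feeds every value to the accumulator from a small thread pool and returns the maximum.
    static long maxOf(long[] values) {
        LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        for (long v : values) {
            executor.submit(() -> max.accumulate(v));
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS); // wait before reading the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(maxOf(new long[] {3, 42, 7, -5})); // prints 42
    }
}
```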

 

Use java.util.concurrent collections and classes including CyclicBarrier and CopyOnWriteArrayList

BlockingQueue

The BlockingQueue interface represents a thread-safe queue. Generally, a thread produces objects, while another thread consumes them.

A BlockingQueue has four forms of methods:

          Throws Exception   Special Value   Blocks           Times Out
Insert    add(e)             offer(e)        put(e)           offer(e, timeout, timeunit)
Remove    remove()           poll()          take()           poll(timeout, timeunit)
Examine   element()          peek()          not applicable   not applicable

Throws Exception: If the attempted operation is not possible immediately, an exception is thrown.
Special Value: If the attempted operation is not possible immediately, a special value is returned (null or false, depending on the operation).
Blocks: If the attempted operation is not possible immediately, the method call blocks until it is.
Times Out: If the attempted operation is not possible immediately, the method call blocks until it is, but waits no longer than the given timeout.

It is not possible to insert null into a BlockingQueue. If you try to insert null, the BlockingQueue will throw a NullPointerException.
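
The four forms are easiest to compare on a queue that is already full or empty. The sketch below uses an ArrayBlockingQueue with capacity 1; the class QueueFormsDemo, the method describe, and the 100-millisecond timeouts are illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueFormsDemo {

    // Records what each form does on a full queue and then on an empty one.
    static String describe() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1); // capacity of one element
        StringBuilder out = new StringBuilder();
        queue.add("a");                          // succeeds, the queue is now full
        out.append(queue.offer("b"));            // special value: false
        try {
            queue.add("b");                      // throws because the queue is full
        } catch (IllegalStateException e) {
            out.append(",full");
        }
        try {
            out.append(",").append(queue.offer("b", 100, TimeUnit.MILLISECONDS)); // times out: false
            out.append(",").append(queue.take());                                 // returns "a" at once
            out.append(",").append(queue.poll(100, TimeUnit.MILLISECONDS));       // times out: null
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        out.append(",").append(queue.poll());    // special value: null
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints false,full,false,a,null,null
    }
}
```

Calling put("b") on the full queue, or take() on the empty one, would block until another thread made room or supplied an element, which is why those calls are left out of this single-threaded sketch.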

The implementations of the BlockingQueue are:

  • ArrayBlockingQueue. A bounded blocking queue backed by an array. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.
  • DelayQueue. An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired.
  • LinkedBlockingQueue. An optionally-bounded blocking queue based on linked nodes. This queue orders elements FIFO (first-in-first-out).
  • LinkedTransferQueue. An unbounded TransferQueue based on linked nodes. This queue orders elements FIFO (first-in-first-out) with respect to any given producer.
  • PriorityBlockingQueue. An unbounded blocking queue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations.
  • SynchronousQueue. A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate.

Here's an example using ArrayBlockingQueue:

class Producer implements Runnable {
    private BlockingQueue<String> queue = null;
    public Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }
    public void run() {
        try {
            // The sleep calls will cause the Consumer to block while waiting for objects in the queue.
            queue.put("1");
            Thread.sleep(1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {
    private BlockingQueue<String> queue = null;
    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }
    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Test {
    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        new Thread(producer).start();
        new Thread(consumer).start();
        Thread.sleep(4000);
    }
}

Blocking Deque

The BlockingDeque interface in the java.util.concurrent package represents a thread-safe deque. A deque is a "Double Ended Queue", a queue in which you can insert and take elements at both ends.

The implementation of the BlockingDeque is:

  • LinkedBlockingDeque. An optionally-bounded blocking deque based on linked nodes.

Like a BlockingQueue, a BlockingDeque has four forms of methods.

For the First Element:

          Throws Exception   Special Value   Blocks           Times Out
Insert    addFirst(e)        offerFirst(e)   putFirst(e)      offerFirst(e, timeout, timeunit)
Remove    removeFirst()      pollFirst()     takeFirst()      pollFirst(timeout, timeunit)
Examine   getFirst()         peekFirst()     not applicable   not applicable

For the Last Element:

          Throws Exception   Special Value   Blocks           Times Out
Insert    addLast(e)         offerLast(e)    putLast(e)       offerLast(e, timeout, timeunit)
Remove    removeLast()       pollLast()      takeLast()       pollLast(timeout, timeunit)
Examine   getLast()          peekLast()      not applicable   not applicable

For example:

BlockingDeque<String> deque = new LinkedBlockingDeque<>();
deque.addFirst("a");
deque.addLast("b");
String b = deque.takeLast();
String a = deque.takeFirst();

ConcurrentMap

The ConcurrentMap interface represents a Map that can handle concurrent access.

The implementation of the ConcurrentMap is:

  • ConcurrentHashMap. A hash table supporting full concurrency of retrievals and high expected concurrency for updates.

Since it extends Map, it has the same methods as a normal map, plus some others for concurrent access:

ConcurrentMap<String, String> concurrentMap = new ConcurrentHashMap<>();
concurrentMap.put("key", "value");
String value = concurrentMap.get("key");

// Puts a new value into the map only if no value exists for the given key; returns the previous value, or null if there was none
String val = concurrentMap.putIfAbsent("key2", "value2");

// Returns the value for the given key. If it doesn't exist, the passed default value is returned
value = concurrentMap.getOrDefault("hi", "or not");

Java 8 also adds new methods that support functional programming.

The method forEach() accepts a BiConsumer lambda expression with both the key and value of the map passed as parameters. It replaces for-each loops:

concurrentMap.forEach((key, value) -> System.out.println(key + "=" + value));

The method replaceAll() accepts a BiFunction lambda expression. The function is called with the key and the value of each map entry returning a new value to be assigned for the current key:

concurrentMap.replaceAll((key, value) -> value.toUpperCase());

To transform a single entry, use compute(). The method accepts both the key to be computed and a bi-function. There are two variations, computeIfAbsent() and computeIfPresent(), that work only if the key is absent or present respectively:

concurrentMap.compute("key", (key, value) -> value.toUpperCase());
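
The difference between the three compute variants shows up when a key is missing. The following sketch uses a hypothetical stock map; the class ComputeDemo and its keys are made up for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ComputeDemo {

    static ConcurrentMap<String, Integer> demo() {
        ConcurrentMap<String, Integer> stock = new ConcurrentHashMap<>();
        stock.computeIfAbsent("apples", key -> 10);            // key absent: inserts 10
        stock.computeIfAbsent("apples", key -> 99);            // key present: function not called
        stock.computeIfPresent("apples", (key, v) -> v + 5);   // key present: 10 becomes 15
        stock.computeIfPresent("pears", (key, v) -> v + 5);    // key absent: nothing happens
        // compute() always runs; the value argument is null when the key is absent
        stock.compute("plums", (key, v) -> v == null ? 1 : v + 1);
        return stock;
    }

    public static void main(String[] args) {
        System.out.println(demo().get("apples")); // prints 15
        System.out.println(demo().get("plums"));  // prints 1
    }
}
```

Note that the compute() call above guards against a null value, which the one-line example with toUpperCase() does not need only because "key" is known to be present.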

The method merge() can be used to unify a new value with an existing value in the map. It accepts a key, the new value to be merged into the existing entry and a bi-function to specify the merging behavior of both values:

concurrentMap.merge("key", "newVal", (oldVal, newVal) -> oldVal + " merged with " + newVal);
System.out.println(concurrentMap.get("key")); // Prints "value merged with newVal" if the entry held "value" before the merge

ConcurrentNavigableMap

The ConcurrentNavigableMap interface is a java.util.NavigableMap with support for concurrent access and for its submaps. The submaps are the maps returned by various methods like headMap(), subMap() and tailMap().

The implementation of ConcurrentNavigableMap is:

  • ConcurrentSkipListMap. A scalable concurrent ConcurrentNavigableMap implementation. The map is sorted according to the natural ordering of its keys, or by a Comparator provided at map creation time, depending on which constructor is used.

The headMap(K toKey) method returns a view of the map containing the keys which are strictly less than the given key. Changes to the original map are reflected in the head map:

ConcurrentNavigableMap<String, String> map = new ConcurrentSkipListMap<>();
map.put("1", "one");
map.put("2", "two");
map.put("3", "three");
ConcurrentNavigableMap<String, String> headMap = map.headMap("2");

headMap contains a ConcurrentNavigableMap with the key "1", since only this key is strictly less than "2".

The tailMap(K fromKey) method returns a view of the map containing the keys which are greater than or equal to the given fromKey. Changes to the original map are reflected in the tail map:

ConcurrentNavigableMap<String, String> map = new ConcurrentSkipListMap<>();
map.put("1", "one");
map.put("2", "two");
map.put("3", "three");
ConcurrentNavigableMap<String, String> tailMap = map.tailMap("2");

tailMap contains the keys "2" and "3" because these two keys are greater than or equal to "2".

The subMap() method returns a view of the original map containing all keys from the first key given (inclusive) to the second key given (exclusive):

ConcurrentNavigableMap<String, String> map = new ConcurrentSkipListMap<>();
map.put("1", "one");
map.put("2", "two");
map.put("3", "three");
ConcurrentNavigableMap<String, String> subMap = map.subMap("2", "3");

subMap contains only the key "2", because only this key is greater than or equal to "2" and smaller than "3".
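
Since these submaps are views rather than copies, later changes show up on both sides. A brief sketch (the class SubMapViewDemo and the extra keys "0" and "4" are illustrative):

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class SubMapViewDemo {

    static String demo() {
        ConcurrentNavigableMap<String, String> map = new ConcurrentSkipListMap<>();
        map.put("1", "one");
        map.put("2", "two");
        map.put("3", "three");
        ConcurrentNavigableMap<String, String> head = map.headMap("2"); // keys < "2"
        ConcurrentNavigableMap<String, String> tail = map.tailMap("2"); // keys >= "2"

        map.put("0", "zero"); // added to the original map, visible through head
        map.put("4", "four"); // added to the original map, visible through tail
        tail.remove("3");     // removed through the view, also gone from the original map

        return head.keySet() + " " + tail.keySet() + " " + map.keySet();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [0, 1] [2, 4] [0, 1, 2, 4]
    }
}
```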

CyclicBarrier

The CyclicBarrier class is a synchronization mechanism that allows a set of threads to all wait for each other to reach a common barrier point. The barrier is called cyclic because it can be re-used after the waiting threads are released.

The waiting threads wait at the CyclicBarrier until either:

  • The last thread arrives (calls await())
  • The thread is interrupted by another thread (another thread calls its interrupt() method)
  • Another waiting thread is interrupted
  • Another waiting thread times out while waiting at the CyclicBarrier
  • The CyclicBarrier.reset() method is called by some external thread.

When you create a CyclicBarrier you specify how many threads wait for it, before releasing them:

CyclicBarrier barrier = new CyclicBarrier(2);

The CyclicBarrier supports a barrier action, which is a Runnable that is executed once the last thread arrives. You pass it to the constructor:

Runnable barrierAction = () -> System.out.println("Barrier Action") ;
CyclicBarrier barrier = new CyclicBarrier(2, barrierAction);

And here is how a thread waits at a CyclicBarrier:

barrier.await();
// Or wait with a 10-second timeout: if the other threads have not arrived by then, await throws a TimeoutException and the barrier is broken
barrier.await(10, TimeUnit.SECONDS);

You can see a complete example in the javadoc.
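
As a compact illustration of the cyclic behaviour, the sketch below sends a few threads through the same barrier several times and counts how often the barrier action runs. The class BarrierDemo and the method run are illustrative names.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierDemo {

    // Runs `parties` threads through `rounds` barrier cycles and
    // returns how many times the barrier action was executed.
    static int run(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time the last thread arrives.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // blocks until all parties have arrived
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 2)); // prints 2: the barrier tripped once per round
    }
}
```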

CopyOnWriteArrayList

The CopyOnWriteArrayList class is a thread-safe variant of ArrayList in which all mutative operations (add, set, and so on) are implemented by making a fresh copy of the underlying array.

The iterator of CopyOnWriteArrayList is fail-safe: it doesn't throw a ConcurrentModificationException even if the underlying CopyOnWriteArrayList is modified after the iteration begins, because the iterator operates on a snapshot of the array taken when the iterator was created. For that reason, updates made to the CopyOnWriteArrayList after that point are not visible to the iterator. Element-changing operations on the iterators themselves (remove, set, and add) are not supported; these methods throw an UnsupportedOperationException.

With CopyOnWriteArrayList there is no lock on reads, so reading is fast, while every write pays for copying the array. Because of this, CopyOnWriteArrayList is most useful when you have few updates and inserts and many concurrent reads; in that situation it is usually a better fit than, for example, Collections.synchronizedList(arrayList).
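
The snapshot behaviour of the iterator can be observed even from a single thread. In this sketch (the class SnapshotIteratorDemo is an illustrative name), the list is modified after the iterator has been created:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotIteratorDemo {

    static String demo() {
        List<String> list = new CopyOnWriteArrayList<>(Arrays.asList("a", "b"));
        Iterator<String> it = list.iterator(); // snapshot of [a, b]
        list.add("c");                         // copies the array; no ConcurrentModificationException later

        StringBuilder seen = new StringBuilder();
        while (it.hasNext()) {
            seen.append(it.next());            // iterates over the snapshot only
        }

        boolean unsupported = false;
        try {
            it.remove();                       // iterators cannot modify the list
        } catch (UnsupportedOperationException e) {
            unsupported = true;
        }
        return seen + " " + list + " " + unsupported;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints ab [a, b, c] true
    }
}
```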

 

Use parallel Fork/Join Framework

The Fork/Join Framework is designed for work that can be broken down into smaller tasks, whose results are combined to produce the final result. One important concept is that ideally no worker thread is idle: idle workers steal work from workers that are busy. This is known as work-stealing.

It follows this algorithm:

if (problem is small)
    directly solve problem
else {
    split problem into independent parts
    fork new subtasks to solve each part
    join all subtasks
    compose result from subresults
}

The core classes of the Fork/Join Framework are ForkJoinPool and ForkJoinTask.

ForkJoinPool

ForkJoinPool is an implementation of ExecutorService that employs the work-stealing algorithm. It can be created like this:

ForkJoinPool pool1 = new ForkJoinPool(4); // creates a ForkJoinPool with the given parallelism level (here 4, the target number of worker threads)
ForkJoinPool pool2 = new ForkJoinPool(); // equivalent to new ForkJoinPool(Runtime.getRuntime().availableProcessors())

There are different ways of submitting a task to the ForkJoinPool:

void execute(ForkJoinTask<?> task)

Arranges for (asynchronous) execution of the given task.

void execute(Runnable task)

Executes the given command at some time in the future.

<T> T invoke(ForkJoinTask<T> task)

Performs the given task, returning its result upon completion.

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

Executes the given tasks, returning a list of Futures holding their status and results when all complete.

<T> ForkJoinTask<T> submit(Callable<T> task)

Submits a value-returning task for execution and returns a Future representing the pending results of the task.

<T> ForkJoinTask<T> submit(ForkJoinTask<T> task)

Submits a ForkJoinTask for execution.

ForkJoinTask<?> submit(Runnable task)

Submits a Runnable task for execution and returns a Future representing that task.

<T> ForkJoinTask<T> submit(Runnable task, T result)

Submits a Runnable task for execution and returns a Future representing that task. The Future's get method returns the given result upon successful completion.

ForkJoinTask

ForkJoinTask is an abstract class for creating tasks that run within a ForkJoinPool. RecursiveAction and RecursiveTask are its subclasses. The only difference between these two classes is that the RecursiveAction does not return a value while RecursiveTask does.

The main methods of ForkJoinTask are:

  • The fork() method arranges for a ForkJoinTask to be executed asynchronously, in the pool the current task is running in.
  • The join() method waits for a forked task to complete and returns its result.

In this example, the program finds the minimum number from a large array:

public class MinimumTaskFinder extends RecursiveTask<Integer> {

  private static final int SEQUENTIAL_THRESHOLD = 5;

  private final int[] data;
  private final int start;
  private final int end;

  public MinimumTaskFinder(int[] data, int start, int end) {
    this.data = data;
    this.start = start;
    this.end = end;
  }

  public MinimumTaskFinder(int[] data) {
    this(data, 0, data.length);
  }

  @Override
  protected Integer compute() {
    final int length = end - start;
    if (length < SEQUENTIAL_THRESHOLD) {
      return computeDirectly();
    }
    final int split = length / 2;
    final MinimumTaskFinder left = new MinimumTaskFinder(data, start, start + split);
    left.fork();
    final MinimumTaskFinder right = new MinimumTaskFinder(data, start + split, end);
    return Math.min(right.compute(), left.join());
  }

  private Integer computeDirectly() {
    int min = Integer.MAX_VALUE;
    for (int i = start; i < end; i++) {
      if (data[i] < min) {
        min = data[i];
      }
    }
    return min;
  }

  public static void main(String[] args) {
    int[] data = new int[10000];
    Random random = new Random();
    for (int i = 0; i < data.length; i++) {
      data[i] = random.nextInt(1000);
    }

    ForkJoinPool pool = new ForkJoinPool();
    MinimumTaskFinder task = new MinimumTaskFinder(data);
    System.out.println(pool.invoke(task));
  }
}

If the size of the array is less than a threshold, then find the minimum directly by iterating over the array. Otherwise, since the problem can be broken into chunks, split the array into two halves, recurse on each half and wait for them to complete (join). Once the value is reduced to the threshold, the tasks are not further divided for parallelism. Finally, once we have the result of each half, we can find the minimum of the two and return it.
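
When no result needs to be combined, the same divide-and-conquer structure can extend RecursiveAction instead. The following is a minimal sketch under that assumption; the class DoubleValuesAction, the helper doubled, and the threshold of 5 are illustrative choices that mirror the example above.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class DoubleValuesAction extends RecursiveAction {

    private static final int SEQUENTIAL_THRESHOLD = 5;

    private final int[] data;
    private final int start;
    private final int end;

    DoubleValuesAction(int[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start < SEQUENTIAL_THRESHOLD) {
            // Small enough: double the elements of this range directly.
            for (int i = start; i < end; i++) {
                data[i] *= 2;
            }
            return;
        }
        int mid = start + (end - start) / 2;
        // invokeAll forks both halves and waits for both to finish.
        invokeAll(new DoubleValuesAction(data, start, mid),
                  new DoubleValuesAction(data, mid, end));
    }

    // Returns a copy of the input with every element doubled.
    static int[] doubled(int[] input) {
        int[] copy = input.clone();
        new ForkJoinPool().invoke(new DoubleValuesAction(copy, 0, copy.length));
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(doubled(new int[] {1, 2, 3, 4, 5, 6, 7})));
        // prints [2, 4, 6, 8, 10, 12, 14]
    }
}
```

The two halves work on disjoint ranges of the array, so no further synchronization is needed.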

 

Use parallel Streams including reduction, decomposition, merging processes, pipelines and performance.

You can execute streams in parallel. In that case, Java partitions the stream into multiple substreams. Aggregate operations iterate over and process these substreams in parallel and then combine the results. It's important that the operations are stateless and can be executed in an arbitrary order.

A stream is not parallel by default. To make a parallel stream, invoke the method Collection.parallelStream (if you're working with a collection) or BaseStream.parallel:

List<String> l = new ArrayList<>();
l.parallelStream().forEach(System.out::println);
// Or
Stream.of("1", "2", "3").parallel().forEach(System.out::println);

Parallel streams use a common ForkJoinPool available via the ForkJoinPool.commonPool() method. The size of the thread pool depends on the number of processors available to the JVM (by default, the parallelism is one less than Runtime.getRuntime().availableProcessors()):

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());

The value can be modified by setting the following JVM parameter to a non-negative integer:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=4

Reduction

A reduction operation combines all elements into a single result, such as finding the sum or maximum of a set of numbers, or accumulating elements into a list. So in addition to reduce(), methods such as collect(), sum(), max(), and count() are also reduction operations.

The reduce() method has the following versions:

Optional<T> reduce(BinaryOperator<T> accumulator)

Performs a reduction on the elements of this stream, using an associative accumulation function, and returns an Optional describing the reduced value, if any.

T reduce(T identity, BinaryOperator<T> accumulator)

Performs a reduction on the elements of this stream, using the provided identity value and an associative accumulation function, and returns the reduced value.

<U> U reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator<U> combiner)

Performs a reduction on the elements of this stream, using the provided identity, accumulation and combining functions.

A reduce operation is parallelizable as long as the function(s) used to process the elements are associative and stateless. For example:

int sum = numbers.stream().reduce(0, (x,y) -> x+y);

It can be parallelized with almost no modification:

int sum = numbers.parallelStream().reduce(0, (x,y) -> x+y);

Reduction can operate on subsets of the data in parallel, and then combine the intermediate results to get the final answer.

A reduce operation on elements of type <T> yielding a result of type <U> requires three parameters:

 <U> U reduce(U identity,
              BiFunction<U, ? super T, U> accumulator,
              BinaryOperator<U> combiner);

Here, the identity element is both an initial seed value for the reduction and a default result if there are no input elements. The accumulator function takes a partial result and the next element and produces a new partial result. The combiner function combines two partial results to produce a new partial (or the final) result. Example:

int sum = numbers.parallelStream()
                  .reduce(0,
                    (partial, x) -> partial + x, // accumulator: adds the next element to a partial sum
                    (x, y) -> x + y);            // combiner: adds two partial sums
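
The three-argument form is most useful when the result type differs from the element type. As a sketch, the following reduces a list of strings (type T) to the total number of characters (type U); the class ReduceDemo and the method totalLength are illustrative names.

```java
import java.util.Arrays;
import java.util.List;

class ReduceDemo {

    // Sums the lengths of all words using a parallel reduction.
    static int totalLength(List<String> words) {
        return words.parallelStream()
                    .reduce(0,                                           // identity: empty sum
                            (partial, word) -> partial + word.length(), // accumulator: Integer + String -> Integer
                            Integer::sum);                              // combiner: merges two partial sums
    }

    public static void main(String[] args) {
        System.out.println(totalLength(Arrays.asList("fork", "join", "stream"))); // prints 14
    }
}
```

The combiner is needed here because the accumulator cannot merge two Integer partial results on its own; it only knows how to add a String to an Integer.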

 
