We can create a thread by passing an implementation of Runnable to a Thread constructor. There are three ways to provide that Runnable: a named class, an anonymous class, or a lambda expression.
class Task implements Runnable {
@Override
public void run() {
System.out.println("Running");
}
}
public class Test {
public static void main(String args[]) {
Runnable r = new Task();
Thread thread = new Thread(r);
thread.start();
}
}
Or with an anonymous class:
public class Test {
public static void main(String args[]) {
Runnable r = new Runnable() {
@Override
public void run() {
System.out.println("Running");
}
};
Thread thread = new Thread(r);
thread.start();
}
}
Or with a lambda expression:
public class Test {
public static void main(String args[]) {
Runnable r = () -> System.out.println("Running");
Thread thread = new Thread(r);
thread.start();
}
}
The Callable interface is similar to Runnable: both are designed to be executed by another thread. However, a Runnable does not return a result and cannot throw a checked exception, while a Callable can do both. We can create a Callable in the same three ways:
class Task implements Callable<Integer> {
@Override
public Integer call() {
int n = 0;
for(int i = 0; i < 100; i++) { n += i; }
return n;
}
}
public class Test {
public static void main(String args[]) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Callable<Integer> c = new Task();
Future<Integer> future = executor.submit(c);
try {
Integer result = future.get(); // waits for the task to complete
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
}
}
Or with an anonymous class:
public class Test {
public static void main(String args[]) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Callable<Integer> c = new Callable<Integer>() {
public Integer call() {
int n = 0;
for(int i = 0; i < 100; i++) { n += i; }
return n;
}
};
Future<Integer> future = executor.submit(c);
try {
Integer result = future.get(); // waits for the task to complete
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
}
}
Or with a lambda expression:
public class Test {
public static void main(String args[]) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Callable<Integer> c = () -> {
int n = 0;
for(int i = 0; i < 100; i++) { n += i; }
return n;
};
Future<Integer> future = executor.submit(c);
try {
Integer result = future.get(); // waits for the task to complete
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
}
}
The ExecutorService interface represents a mechanism that executes tasks in the background.
You can use the Executors factory class to create ExecutorService implementations. Some examples are:
// Creates an Executor that uses a single worker thread operating off an unbounded queue.
ExecutorService es1 = Executors.newSingleThreadExecutor();
// Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.
ExecutorService es2 = Executors.newFixedThreadPool(10);
// Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.
ScheduledExecutorService es3 = Executors.newScheduledThreadPool(10);
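As an illustrative sketch (the task, delay, and period values below are arbitrary), the scheduled variant can run a task periodically:
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// Runs the task after an initial delay of 1 second and then every 5 seconds
scheduler.scheduleAtFixedRate(() -> System.out.println("Tick"), 1, 5, TimeUnit.SECONDS);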
There are a few methods to execute tasks with an ExecutorService.
The execute() method takes a Runnable and executes it asynchronously, without returning a result:
executorService.execute(new Runnable() {
public void run() {
System.out.println("A task");
}
});
executorService.shutdown();
The submit(Runnable) method also takes a Runnable, but it returns a Future object. The Future's get() method returns null once the Runnable has finished executing:
Future future = executorService.submit(new Runnable() {
public void run() {
System.out.println("A task");
}
});
future.get(); //Blocks until the Runnable has finished
executorService.shutdown();
The submit(Callable) version takes a Callable and returns a Future that holds the Callable's result once it has finished executing:
Future future = executorService.submit(new Callable<String>() {
public String call() {
return "A callable";
}
});
System.out.println(future.get()); //Blocks until the Callable has finished
executorService.shutdown();
The invokeAny() method executes the given tasks and returns the result of one that completed successfully. You have no guarantee about which Callable's result you'll get, just one of those that finish. Once invokeAny() returns, the tasks that have not completed are cancelled. For example:
List<Callable<String>> callables = new ArrayList<Callable<String>>();
callables.add(new Callable<String>() {
public String call() {
return "Callable 1";
}
});
callables.add(new Callable<String>() {
public String call() {
return "Callable 2";
}
});
callables.add(new Callable<String>() {
public String call() {
return "Callable 3";
}
});
String result = executorService.invokeAny(callables);
System.out.println(result);
executorService.shutdown();
Sometimes it will print "Callable 1", sometimes "Callable 2", and other times "Callable 3".
The invokeAll() method executes the given tasks, returning a list of Futures holding their status and results when all complete. Future.isDone() is true for each element of the returned list. A completed task could have terminated either normally or by throwing an exception:
List<Callable<String>> callables = new ArrayList<Callable<String>>();
callables.add(new Callable<String>() {
public String call() {
return "Callable 1";
}
});
callables.add(new Callable<String>() {
public String call() {
return "Callable 2";
}
});
callables.add(new Callable<String>() {
public String call() {
return "Callable 3";
}
});
List<Future<String>> futures = executorService.invokeAll(callables);
for(Future<String> f : futures){
System.out.println(f.get());
}
executorService.shutdown();
When you are done using the ExecutorService, you should shut it down so its threads do not keep running. To terminate the threads inside the ExecutorService, you call its shutdown() method. The ExecutorService does not shut down immediately: it stops accepting new tasks, and once all threads have finished their current tasks, it shuts down. If you want to shut down the ExecutorService immediately, you can call the shutdownNow() method. This attempts to stop all executing tasks right away and skips all tasks that have not yet started.
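A common shutdown pattern, shown here as a sketch (the 60-second timeout is an arbitrary choice), combines the two methods:
executorService.shutdown(); // stop accepting new tasks
try {
// wait a while for the already submitted tasks to finish
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow(); // cancel the tasks that are still running
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt(); // preserve the interruption status
}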
Deadlock describes a situation where two or more threads are blocked forever, waiting for each other. Example:
public class TestThread {
public static Object lock1 = new Object();
public static Object lock2 = new Object();
public static void main(String args[]) {
Thread t1 = new Thread(new Task1());
Thread t2 = new Thread(new Task2());
t1.start();
t2.start();
}
private static class Task1 implements Runnable {
public void run() {
synchronized (lock1) {
System.out.println("Task 1: Holding lock 1...");
try {
Thread.sleep(10);
}
catch (InterruptedException e) {}
System.out.println("Task 1: Waiting for lock 2...");
synchronized (lock2) {
System.out.println("Task 1: Holding lock 1 & 2...");
}
}
}
}
private static class Task2 implements Runnable {
public void run() {
synchronized (lock2) {
System.out.println("Task 2: Holding lock 2...");
try {
Thread.sleep(10);
}
catch (InterruptedException e) {}
System.out.println("Task 2: Waiting for lock 1...");
synchronized (lock1) {
System.out.println("Task 2: Holding lock 1 & 2...");
}
}
}
}
}
Lock starvation occurs when a thread with lower priority than others is constantly waiting for a lock and is never able to acquire it, because higher-priority threads keep acquiring the lock first.
A livelock is like a deadlock in the sense that two (or more) threads block each other, but in a livelock each thread waits "actively", trying to resolve the problem on its own. A livelock occurs when the combination of these efforts makes it impossible for the threads to ever terminate. For example, if two threads detect a deadlock and try to "step aside" for each other, without proper care they end up stuck in a loop, always "stepping aside" and never managing to move forward.
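A minimal sketch of that situation (illustrative only; the class and method names are made up, and in practice timing jitter may eventually let one thread win) uses ReentrantLock.tryLock() to "step aside" whenever the second lock is busy:
import java.util.concurrent.locks.ReentrantLock;
public class LiveLockDemo {
    private static final ReentrantLock lock1 = new ReentrantLock();
    private static final ReentrantLock lock2 = new ReentrantLock();
    // Takes 'first', then tries 'second'; if 'second' is busy, releases 'first' and retries
    private static void politeWorker(ReentrantLock first, ReentrantLock second, String name) {
        while (true) {
            first.lock();
            try {
                if (second.tryLock()) {
                    try {
                        System.out.println(name + " got both locks");
                        return;
                    } finally {
                        second.unlock();
                    }
                }
            } finally {
                first.unlock(); // "step aside" so the other thread can proceed
            }
            // Both threads may keep retrying in lockstep, never making progress
        }
    }
    public static void main(String[] args) {
        new Thread(() -> politeWorker(lock1, lock2, "Thread 1")).start();
        new Thread(() -> politeWorker(lock2, lock1, "Thread 2")).start();
    }
}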
A race condition is a situation where two threads compete for the same resource and try to change it at the same time, in a way that causes unexpected results. The problem happens when, for example, one thread checks whether a value is X and then does something that depends on that value, while another thread modifies the value between the check and the action.
You have to be careful when multiple threads access shared variables, since it can result in a race condition. For example, in a method like this:
int n = 0;
void m() {
this.n = this.n + 1;
}
The increment of variable n is not atomic, so it is vulnerable to a race condition. We can use the synchronized keyword to fix this. We can, for example, synchronize the whole method:
int n = 0;
synchronized void m() {
this.n = this.n + 1;
}
Or just a block of code:
int n = 0;
void m() {
synchronized (this) {
this.n = this.n + 1;
}
}
Internally, Java uses a so-called monitor (or lock) to manage synchronization. This monitor is bound to an object. For synchronized instance methods, the lock is the instance the method belongs to. For static synchronized methods, it's the Class object. For synchronized blocks, the object can be specified explicitly (the example uses this to refer to the instance the method belongs to). Only the thread that acquires the lock can execute the method (or block); other threads must wait until the lock is released.
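The following sketch (the class and field names are made up for illustration) shows the three possible lock targets:
public class Counter {
    private static int staticCount = 0;
    private final Object lock = new Object();
    private int count = 0;
    // Locks on the Counter instance (this)
    public synchronized void increment() {
        count++;
    }
    // Locks on the Class object (Counter.class)
    public static synchronized void incrementStatic() {
        staticCount++;
    }
    // Locks on the explicitly specified object
    public void incrementWithBlock() {
        synchronized (lock) {
            count++;
        }
    }
}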
The java.util.concurrent.atomic package contains classes that perform atomic operations. With an atomic operation, you can safely perform the operation in parallel on multiple threads without using the synchronized keyword or locks. This package includes, for example, AtomicInteger, AtomicLong, AtomicBoolean, and AtomicReference.
The synchronized example above can be changed to use an AtomicInteger in this way:
AtomicInteger n = new AtomicInteger(); // creates an AtomicInteger with the initial value 0.
void m() {
n.incrementAndGet();
}
Here are some common operations for the atomic classes:
AtomicInteger ai = new AtomicInteger(10);
int val = ai.get(); // Get the value
ai.set(15); // Set the value
int expectedValue = 15;
int newValue = 20;
// If the value of ai equals expectedValue, ai is set to newValue
ai.compareAndSet(expectedValue, newValue);
ai = new AtomicInteger(10);
val = ai.getAndAdd(10); // val contains 10 but ai contains 20
val = ai.addAndGet(10); // val and ai contain 30
val = ai.getAndDecrement(); // val contains 30 but ai contains 29
val = ai.decrementAndGet(); // val and ai contain 28
val = ai.getAndIncrement(); // val contains 28 but ai contains 29
val = ai.incrementAndGet(); // val and ai contain 30
The methods updateAndGet() and getAndUpdate() accept a lambda expression in order to perform a function (an IntUnaryOperator in the case of AtomicInteger) upon the value, for example:
AtomicInteger ai = new AtomicInteger(10);
ai.updateAndGet(i -> i * 5); // ai contains 50
The methods accumulateAndGet() and getAndAccumulate() accept a lambda expression of type IntBinaryOperator (in the case of AtomicInteger) and update the current value with the result of applying the given function to the current and given values. The function is applied with the current value as its first argument and the given value as the second argument. For example:
AtomicInteger ai = new AtomicInteger(10);
ai.accumulateAndGet(5, (a, b) -> a + b); // ai contains 15
LongAdder and DoubleAdder can be used to consecutively add values to a number. For example:
ExecutorService executor = Executors.newFixedThreadPool(3);
LongAdder la = new LongAdder();
IntStream.range(0, 1000)
.forEach(i -> executor.submit(la::increment)); //Adds one to la, 1000 times
System.out.println(la.sum()); // prints 1000 once all the submitted tasks have completed
LongAdder provides the methods add(long) (adds the given value) and increment() (adds one), and it is thread-safe. Instead of summing into a single variable, it maintains a set of variables internally to reduce contention across threads. The result can be retrieved by calling sum() or sumThenReset() (which gets the value and resets the sum to zero). This class is preferred over the atomic classes when updates from multiple threads are more common than reads.
LongAccumulator and DoubleAccumulator are a more generalized version of the previous classes. Here's an example:
LongAccumulator acc = new LongAccumulator((a, b) -> a + b , 1L);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 5)
.forEach(i -> executor.submit(() -> acc.accumulate(i)));
System.out.println(acc.getThenReset()); // prints 11 (1 + 0 + 1 + 2 + 3 + 4) once all the submitted tasks have completed
In the example, a LongAccumulator is created with the function a + b and an initial value of one. With every call to accumulate(i), both the current result and the value i are passed as parameters to the lambda expression. Like LongAdder, a LongAccumulator maintains a set of variables internally to reduce contention across threads. The result can be retrieved by calling get() or getThenReset() (which gets the value and resets the variables to zero).
The BlockingQueue interface represents a thread-safe queue. Generally, a thread produces objects, while another thread consumes them.
A BlockingQueue has four forms of methods:
| | Throws Exception | Special Value | Blocks | Times Out |
|---|---|---|---|---|
| Insert | add(e) | offer(e) | put(e) | offer(e, timeout, timeunit) |
| Remove | remove() | poll() | take() | poll(timeout, timeunit) |
| Examine | element() | peek() | | |
Throws Exception: If the attempted operation is not possible immediately, an exception is thrown.
Special Value: If the attempted operation is not possible immediately, a special value is returned (often true / false).
Blocks: If the attempted operation is not possible immediately, the method call blocks until it is.
Times Out: If the attempted operation is not possible immediately, the method call blocks until it is, but waits no longer than the given timeout.
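As a small sketch of the first, second, and timed forms (the capacity and timeout values are arbitrary), consider a bounded queue that is already full:
BlockingQueue<String> q = new ArrayBlockingQueue<>(1); // capacity of 1
q.add("a"); // succeeds; the queue is now full
boolean accepted = q.offer("b"); // returns false instead of throwing or blocking
// q.add("b"); // would throw IllegalStateException: Queue full
// q.put("b"); // would block until space becomes available
boolean acceptedLater = q.offer("b", 1, TimeUnit.SECONDS); // waits up to 1 second, then returns false (throws InterruptedException if interrupted)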
It is not possible to insert null into a BlockingQueue. If you try to insert null, the BlockingQueue will throw a NullPointerException.
The implementations of BlockingQueue in java.util.concurrent are ArrayBlockingQueue, DelayQueue, LinkedBlockingQueue, LinkedTransferQueue, PriorityBlockingQueue, and SynchronousQueue.
Here's an example using ArrayBlockingQueue:
class Producer implements Runnable {
private BlockingQueue<String> queue = null;
public Producer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
try {
// The sleep calls will cause the Consumer to block while waiting for objects in the queue.
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(1000);
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
private BlockingQueue<String> queue = null;
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
try {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Test {
public static void main(String[] args) throws Exception {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(4000);
}
}
The BlockingDeque interface in the java.util.concurrent package represents a thread-safe deque. A deque is a "Double Ended Queue", a queue into which you can insert elements, and from which you can take them, at both ends.
The implementation of BlockingDeque in java.util.concurrent is LinkedBlockingDeque.
Like a BlockingQueue, a BlockingDeque has four forms of methods:
For the First Element:
| | Throws Exception | Special Value | Blocks | Times Out |
|---|---|---|---|---|
| Insert | addFirst(e) | offerFirst(e) | putFirst(e) | offerFirst(e, timeout, timeunit) |
| Remove | removeFirst() | pollFirst() | takeFirst() | pollFirst(timeout, timeunit) |
| Examine | getFirst() | peekFirst() | | |
For the Last Element:
| | Throws Exception | Special Value | Blocks | Times Out |
|---|---|---|---|---|
| Insert | addLast(e) | offerLast(e) | putLast(e) | offerLast(e, timeout, timeunit) |
| Remove | removeLast() | pollLast() | takeLast() | pollLast(timeout, timeunit) |
| Examine | getLast() | peekLast() | | |
For example:
BlockingDeque<String> deque = new LinkedBlockingDeque<>();
deque.addFirst("a");
deque.addLast("b");
String b = deque.takeLast();
String a = deque.takeFirst();
The ConcurrentMap interface represents a Map that can handle concurrent access.
The main implementation of ConcurrentMap is ConcurrentHashMap.
Since it extends from Map, it has the same methods as a normal map, plus some others for concurrent access:
ConcurrentMap<String, String> concurrentMap = new ConcurrentHashMap<>();
concurrentMap.put("key", "value");
Object value = concurrentMap.get("key");
// Puts a new value into the map only if no value exists for the given key
String val = concurrentMap.putIfAbsent("key2", "value2");
// Returns the value for the given key; if the key doesn't exist, the passed default value is returned
value = concurrentMap.getOrDefault("hi", "or not");
In addition, Java 8 adds new methods that support functional programming.
The method forEach() accepts a BiConsumer lambda expression that is called with the key and value of each map entry. It replaces for-each loops:
concurrentMap.forEach((key, value) -> System.out.println(key + "=" + value));
The method replaceAll() accepts a BiFunction lambda expression. The function is called with the key and the value of each map entry, returning a new value to be assigned to the current key:
concurrentMap.replaceAll((key, value) -> value.toUpperCase());
To transform a single entry, use compute(). The method accepts both the key to be computed and a bi-function. There are two variations, computeIfAbsent() and computeIfPresent(), that work only if the key is absent or present, respectively:
concurrentMap.compute("key", (key, value) -> value.toUpperCase());
The method merge() can be used to unify a new value with an existing value in the map. It accepts a key, the new value to be merged into the existing entry, and a bi-function that specifies how to merge the two values:
concurrentMap.merge("key", "newVal", (oldVal, newVal) -> oldVal + " merged with " + newVal);
System.out.println(concurrentMap.get("key")); // It prints "value merged with newVal"
The ConcurrentNavigableMap interface is a java.util.NavigableMap with support for concurrent access, both for the map itself and for its submaps. The submaps are the maps returned by methods like headMap(), subMap() and tailMap().
The implementation of ConcurrentNavigableMap in java.util.concurrent is ConcurrentSkipListMap.
The headMap(K toKey) method returns a view of the map containing the keys that are strictly less than the given key. Changes to the original map are reflected in the head map:
ConcurrentNavigableMap<String, String> map = new ConcurrentSkipListMap<>();
map.put("1", "one");
map.put("2", "two");
map.put("3", "three");
ConcurrentNavigableMap<String, String> headMap = map.headMap("2");
The returned headMap contains only the key "1", since only this key is strictly less than "2".
The tailMap(K fromKey) method returns a view of the map containing the keys that are greater than or equal to the given fromKey. Changes to the original map are reflected in the tail map:
ConcurrentNavigableMap<String, String> map = new ConcurrentSkipListMap<>();
map.put("1", "one");
map.put("2", "two");
map.put("3", "three");
ConcurrentNavigableMap<String, String> tailMap = map.tailMap("2");
tailMap contains the keys "2" and "3" because these two keys are greater than or equal to "2".
The subMap() method returns a view of the original map containing all keys from the first given key (inclusive) to the second given key (exclusive):
ConcurrentNavigableMap<String, String> map = new ConcurrentSkipListMap<>();
map.put("1", "one");
map.put("2", "two");
map.put("3", "three");
ConcurrentNavigableMap<String, String> subMap = map.subMap("2", "3");
subMap contains only the key "2", because only this key is greater than or equal to "2" and less than "3".
The CyclicBarrier class is a synchronization mechanism that allows a set of threads to all wait for each other to reach a common barrier point. The barrier is called cyclic because it can be re-used after the waiting threads are released.
The waiting threads wait at the CyclicBarrier until either the last thread arrives (calls await()), or the CyclicBarrier.reset() method is called by some external thread.
When you create a CyclicBarrier, you specify how many threads must wait at it before they are released:
CyclicBarrier barrier = new CyclicBarrier(2);
The CyclicBarrier supports a barrier action, which is a Runnable that is executed once the last thread arrives. You pass it to the constructor:
Runnable barrierAction = () -> System.out.println("Barrier Action");
CyclicBarrier barrier = new CyclicBarrier(2, barrierAction);
And here is how a thread waits at a CyclicBarrier:
barrier.await();
// Specifying a 10-second timeout, after which the thread stops waiting even if not all threads have reached the CyclicBarrier
barrier.await(10, TimeUnit.SECONDS);
You can see a complete example in the javadoc.
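Here is a minimal runnable sketch of two threads meeting at a barrier (the party count and messages are illustrative):
import java.util.concurrent.CyclicBarrier;
public class BarrierExample {
    public static void main(String[] args) {
        Runnable barrierAction = () -> System.out.println("Both threads arrived");
        CyclicBarrier barrier = new CyclicBarrier(2, barrierAction);
        Runnable task = () -> {
            try {
                System.out.println(Thread.currentThread().getName() + " waiting at the barrier");
                barrier.await(); // blocks until both threads have called await()
                System.out.println(Thread.currentThread().getName() + " released");
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
        new Thread(task).start();
        new Thread(task).start();
    }
}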
The CopyOnWriteArrayList class is a thread-safe variant of ArrayList in which all mutative operations (add, set, and so on) are implemented by making a fresh copy of the underlying array.
The iterator of CopyOnWriteArrayList is fail-safe and doesn't throw a ConcurrentModificationException even if the underlying CopyOnWriteArrayList is modified after the iteration begins, because the iterator operates on its own snapshot of the array. For the same reason, updates made to the CopyOnWriteArrayList after the iterator is created are not visible to the iterator. Element-changing operations on the iterator itself (remove, set, and add) are not supported and throw an UnsupportedOperationException.
With CopyOnWriteArrayList there is no locking on reads, so read operations are fast. Because of this, CopyOnWriteArrayList is most useful when you have few updates and many concurrent reads, in which case it usually performs better than, for example, Collections.synchronizedList(arrayList).
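A short sketch (the list contents are arbitrary) showing that an iterator keeps reading the snapshot taken when it was created:
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
public class CopyOnWriteExample {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        list.add("a");
        list.add("b");
        Iterator<String> it = list.iterator(); // iterates over the array as it is at this point
        list.add("c"); // creates a new copy; the existing iterator is unaffected
        while (it.hasNext()) {
            System.out.println(it.next()); // prints only "a" and "b", with no ConcurrentModificationException
        }
        System.out.println(list); // [a, b, c]
    }
}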
The Fork/Join Framework is designed for work that can be broken down into smaller tasks whose results are combined to produce the final result. One important concept is that ideally no worker thread should be idle: idle workers steal work from workers that are busy, which is known as work-stealing.
It follows this algorithm:
if (problem is small)
directly solve problem
else {
split problem into independent parts
fork new subtasks to solve each part
join all subtasks
compose result from subresults
}
The core classes of the Fork-Join framework are ForkJoinPool and ForkJoinTask.
ForkJoinPool is an implementation of ExecutorService that employs the work-stealing algorithm. It can be created like this:
ForkJoinPool pool = new ForkJoinPool(4); // creates a ForkJoinPool with the given parallelism level (the number of worker threads, 4 in this case)
ForkJoinPool commonSizedPool = new ForkJoinPool(); // equivalent to new ForkJoinPool(Runtime.getRuntime().availableProcessors())
There are different ways of submitting a task to the ForkJoinPool:
void execute(ForkJoinTask<?> task)
Arranges for (asynchronous) execution of the given task.
void execute(Runnable task)
Executes the given command at some time in the future.
<T> T invoke(ForkJoinTask<T> task)
Performs the given task, returning its result upon completion.
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
Executes the given tasks, returning a list of Futures holding their status and results when all complete.
<T> ForkJoinTask<T> submit(Callable<T> task)
Submits a value-returning task for execution and returns a Future representing the pending results of the task.
<T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
Submits a ForkJoinTask for execution.
ForkJoinTask<?> submit(Runnable task)
Submits a Runnable task for execution and returns a Future representing that task.
<T> ForkJoinTask<T> submit(Runnable task, T result)
Submits a Runnable task for execution and returns a Future representing that task.
ForkJoinTask is an abstract class for creating tasks that run within a ForkJoinPool. RecursiveAction and RecursiveTask are its subclasses. The only difference between these two classes is that the RecursiveAction does not return a value while RecursiveTask does.
The main methods of ForkJoinTask are fork(), which schedules the task for asynchronous execution (a new task can be launched with this method), and join(), which waits for the task to finish and returns its result.
In the following example, the program finds the minimum number in a large array:
public class MinimumTaskFinder extends RecursiveTask<Integer> {
private static final int SEQUENTIAL_THRESHOLD = 5;
private final int[] data;
private final int start;
private final int end;
public MinimumTaskFinder(int[] data, int start, int end) {
this.data = data;
this.start = start;
this.end = end;
}
public MinimumTaskFinder(int[] data) {
this(data, 0, data.length);
}
@Override
protected Integer compute() {
final int length = end - start;
if (length < SEQUENTIAL_THRESHOLD) {
return computeDirectly();
}
final int split = length / 2;
final MinimumTaskFinder left = new MinimumTaskFinder(data, start, start + split);
left.fork();
final MinimumTaskFinder right = new MinimumTaskFinder(data, start + split, end);
return Math.min(right.compute(), left.join());
}
private Integer computeDirectly() {
int min = Integer.MAX_VALUE;
for (int i = start; i < end; i++) {
if (data[i] < min) {
min = data[i];
}
}
return min;
}
public static void main(String[] args) {
int[] data = new int[10000];
Random random = new Random();
for (int i = 0; i < data.length; i++) {
data[i] = random.nextInt(1000);
}
ForkJoinPool pool = new ForkJoinPool();
MinimumTaskFinder task = new MinimumTaskFinder(data);
System.out.println(pool.invoke(task));
}
}
If the size of the array is less than the threshold, the minimum is found directly by iterating over the array. Otherwise, since the problem can be broken into chunks, the array is split into two halves: one half is forked as a new task, the other is computed recursively, and we wait for the forked task to complete (join). Once the size of a subtask falls below the threshold, it is not divided further. Finally, once we have the result of each half, we find the minimum of the two and return it.
You can execute streams in parallel, in which case Java partitions the stream into multiple substreams. Aggregate operations iterate over and process these substreams in parallel and then combine the results. It's important that the operations are stateless and can be executed in an arbitrary order.
A stream is not parallel by default. To make a parallel stream, invoke the method Collection.parallelStream (if you're working with a collection) or BaseStream.parallel:
List<String> l = new ArrayList<>();
l.parallelStream().forEach(System.out::println);
// Or
Stream.of("1", "2", "3").parallel().forEach(System.out::println);
Parallel streams use a common ForkJoinPool, available via the ForkJoinPool.commonPool() method. The size of this thread pool depends on the number of available CPU cores:
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());
The value can be modified by setting the following JVM parameter to a non-negative integer:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=4
A reduction operation combines all elements into a single result, such as finding the sum or maximum of a set of numbers, or accumulating elements into a list. So besides reduce(), operations such as collect(), sum(), max(), and count() are also reduction operations.
The reduce() method has the following versions (a small example of the single-argument version follows the list):
Optional<T> reduce(BinaryOperator<T> accumulator)
Performs a reduction on the elements of this stream, using an associative accumulation function, and returns an Optional describing the reduced value, if any.
T reduce(T identity, BinaryOperator<T> accumulator)
Performs a reduction on the elements of this stream, using the provided identity value and an associative accumulation function, and returns the reduced value.
<U> U reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator<U> combiner)
Performs a reduction on the elements of this stream, using the provided identity, accumulation and combining functions.
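For example, a sketch of the single-argument version (the list contents are arbitrary); it returns an Optional because the stream may be empty:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
Optional<Integer> sum = numbers.stream().reduce((x, y) -> x + y);
sum.ifPresent(System.out::println); // prints 10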
A reduce operation is parallelizable as long as the function(s) used to process the elements are associative and stateless. For example, this sequential sum:
int sum = numbers.stream().reduce(0, (x, y) -> x + y);
can be parallelized simply by switching to a parallel stream:
int sum = numbers.parallelStream().reduce(0, (x, y) -> x + y);
Reduction can operate on subsets of the data in parallel, and then combine the intermediate results to get the final answer.
A reduce operation on elements of type <T> yielding a result of type <U> requires three parameters:
<U> U reduce(U identity,
BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner);
Here, the identity element is both an initial seed value for the reduction and a default result if there are no input elements. The accumulator function takes a partial result and the next element and produces a new partial result. The combiner function combines two partial results to produce a new partial (or the final) result. Example:
int sum = numbers.parallelStream()
.reduce(0,
(partialSum, x) -> partialSum + x,
(x, y) -> x + y);