The Fork-Join framework was added in Java 7. Its ForkJoinPool implements the ExecutorService interface and distributes tasks to the worker threads in a thread pool. What sets Fork-Join apart is that idle worker threads can steal queued subtasks from busy worker threads and execute them.
There are a few important classes you should be aware of:
- ForkJoinPool
- RecursiveTask
- ForkJoinTask
- RecursiveAction
ForkJoinTask:
This is the abstract base class for tasks that run within a ForkJoinPool. A ForkJoinTask is much lighter than a thread: a large number of tasks can run on a small number of threads, subject to some restrictions. The two key methods are fork() and join(). fork() schedules a task for asynchronous execution, whereas join() waits for the task to complete and returns its result. There are two extensions of ForkJoinTask: RecursiveTask and RecursiveAction. RecursiveTask is used when the task is expected to return a value, while RecursiveAction is used when the task performs some action but doesn't return a value. It is recommended to extend either RecursiveAction or RecursiveTask rather than ForkJoinTask directly.
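To make fork() and join() concrete, here is a minimal sketch. FibonacciTask and ForkJoinBasics are hypothetical names chosen for illustration, and naive Fibonacci is used only because it splits naturally into two subtasks; it is not an efficient way to compute the sequence.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task, for illustration only: naive recursive Fibonacci.
class FibonacciTask extends RecursiveTask<Integer> {
    private final int n;

    FibonacciTask(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n; // base case: nothing left to split
        }
        FibonacciTask left = new FibonacciTask(n - 1);
        left.fork(); // schedule the left half asynchronously

        FibonacciTask right = new FibonacciTask(n - 2);
        int rightResult = right.compute(); // run the right half in this thread

        return left.join() + rightResult; // wait for the forked half
    }
}

class ForkJoinBasics {
    public static void main(String[] args) {
        int result = new ForkJoinPool().invoke(new FibonacciTask(10));
        System.out.println("fib(10) = " + result); // prints "fib(10) = 55"
    }
}
```

Forking one half and computing the other in the current thread keeps that thread busy instead of leaving it waiting in join().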
RecursiveTask:
This is a subclass of ForkJoinTask whose compute method returns some value.
RecursiveAction:
This is a subclass of ForkJoinTask whose compute method doesn’t return any value.
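As a rough sketch of a RecursiveAction, the task below doubles every element of an array in place, so there is nothing to return. DoubleInPlaceAction, RecursiveActionDemo and the THRESHOLD value are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical action: doubles data[from..to) in place and returns nothing.
class DoubleInPlaceAction extends RecursiveAction {
    private static final int THRESHOLD = 4; // arbitrary cut-off for this sketch
    private final int[] data;
    private final int from;
    private final int to; // exclusive

    DoubleInPlaceAction(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: do the work directly.
            for (int i = from; i < to; i++) {
                data[i] *= 2;
            }
            return;
        }
        int mid = (from + to) >>> 1;
        // invokeAll runs both halves and returns once both have completed.
        invokeAll(new DoubleInPlaceAction(data, from, mid),
                  new DoubleInPlaceAction(data, mid, to));
    }
}

class RecursiveActionDemo {
    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        new ForkJoinPool().invoke(new DoubleInPlaceAction(data, 0, data.length));
        System.out.println(Arrays.toString(data));
    }
}
```

The two halves work on disjoint index ranges of the same array, so they never write to the same element.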
ForkJoinPool:
It is an ExecutorService for running ForkJoinTasks, and it also manages and monitors those tasks. It employs a work-stealing algorithm, wherein idle threads can pick up subtasks created by other active tasks and execute them. This allows tasks spawned by other tasks to be executed efficiently.
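The sketch below shows one way to create a pool, hand it work, and read back a few of its monitoring values. RangeSumTask and PoolDemo are hypothetical names, and the parallelism of 4 and the threshold of 10,000 are arbitrary choices for the example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: sums the integers in the range [from, to).
class RangeSumTask extends RecursiveTask<Long> {
    private static final long THRESHOLD = 10_000; // arbitrary cut-off
    private final long from;
    private final long to; // exclusive

    RangeSumTask(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (long i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
        long mid = from + (to - from) / 2;
        RangeSumTask left = new RangeSumTask(from, mid);
        left.fork(); // queued; an idle worker may steal it
        long right = new RangeSumTask(mid, to).compute();
        return left.join() + right;
    }
}

class PoolDemo {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4); // target parallelism of 4

        // invoke() blocks until the task completes and returns its result.
        long viaInvoke = pool.invoke(new RangeSumTask(1, 100_001));

        // submit() returns immediately; join() later collects the result.
        ForkJoinTask<Long> pending = pool.submit(new RangeSumTask(1, 100_001));
        long viaSubmit = pending.join();

        System.out.println("Sum via invoke: " + viaInvoke);
        System.out.println("Sum via submit: " + viaSubmit);
        System.out.println("Parallelism: " + pool.getParallelism());
        System.out.println("Steal count: " + pool.getStealCount());

        pool.shutdown();
    }
}
```

Both sums should be 5000050000, the sum of 1 through 100000. The steal count varies from run to run, since it depends on how the workers happen to be scheduled.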
Let's see how we can find the sum of 1000 integers using the ForkJoin framework. This is quite a trivial example and not really worth implementing with ForkJoin. The main motive is to shed some light on the implementation aspects of ForkJoin.
In this example we divide the array of integers in half and assign each half to a RecursiveTask. Once an array has 20 or fewer elements, we assign it to another RecursiveTask which returns the computed sum of those array elements.
RecursiveTask for computing the sum
    import java.util.concurrent.RecursiveTask;

    // Task for computing the sum of array elements.
    class SumCalculatorTask extends RecursiveTask<Integer> {
        int[] numbers;

        SumCalculatorTask(int[] numbers) {
            this.numbers = numbers;
        }

        @Override
        protected Integer compute() {
            int sum = 0;
            for (int i : numbers) {
                sum += i;
            }
            return sum;
        }
    }

The compute method has to be overridden with the actual work to be performed. In the above case it iterates through the elements of the array and returns the computed sum.
RecursiveTask for dividing the array
We create a RecursiveTask that divides the array into two parts and assigns each part to another RecursiveTask for further division. We keep dividing and stop once an array has 20 or fewer elements.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.RecursiveTask;

    class NumberDividerTask extends RecursiveTask<Integer> {
        int[] numbers;

        NumberDividerTask(int[] numbers) {
            this.numbers = numbers;
        }

        /*
         * If the array has 20 or fewer elements, then
         * just compute the sum, else split the array further.
         */
        @Override
        protected Integer compute() {
            int sum = 0;
            List<RecursiveTask<Integer>> forks = new ArrayList<>();
            if (numbers.length > 20) {
                NumberDividerTask numberDividerTaskFirst = new NumberDividerTask(
                        Arrays.copyOfRange(numbers, 0, numbers.length / 2));
                NumberDividerTask numberDividerTaskSecond = new NumberDividerTask(
                        Arrays.copyOfRange(numbers, numbers.length / 2, numbers.length));
                forks.add(numberDividerTaskFirst);
                forks.add(numberDividerTaskSecond);
                numberDividerTaskFirst.fork();
                numberDividerTaskSecond.fork();
            } else {
                SumCalculatorTask sumCalculatorTask = new SumCalculatorTask(numbers);
                forks.add(sumCalculatorTask);
                sumCalculatorTask.fork();
            }

            // Combine the results from all the tasks.
            for (RecursiveTask<Integer> task : forks) {
                sum += task.join();
            }
            return sum;
        }
    }

Each of the above tasks spawns either two other NumberDividerTasks or one SumCalculatorTask. Each task keeps track of the subtasks it has spawned in the forks list. At the end of the task we wait for every task in the forks list to finish by invoking its join() method, and add up the values returned by the subtasks.
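As a possible variation on the divider task, the sketch below passes index ranges instead of copying sub-arrays and sums small ranges directly rather than forking a separate leaf task. ArraySumTask and ArraySumDemo are hypothetical names that are not part of the example above; the threshold of 20 mirrors the one used there.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variation: sums numbers[from..to) without copying the array.
class ArraySumTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 20; // same cut-off as the example above
    private final int[] numbers;
    private final int from;
    private final int to; // exclusive

    ArraySumTask(int[] numbers, int from, int to) {
        this.numbers = numbers;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Integer compute() {
        if (to - from <= THRESHOLD) {
            // Small range: sum it directly in this task.
            int sum = 0;
            for (int i = from; i < to; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        ArraySumTask left = new ArraySumTask(numbers, from, mid);
        ArraySumTask right = new ArraySumTask(numbers, mid, to);
        invokeAll(left, right); // returns once both halves are done
        return left.join() + right.join(); // both complete, so join() does not block
    }
}

class ArraySumDemo {
    public static void main(String[] args) {
        int[] numbers = new int[1000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i * 2;
        }
        int sum = new ForkJoinPool().invoke(new ArraySumTask(numbers, 0, numbers.length));
        System.out.println("Sum: " + sum); // prints "Sum: 999000"
    }
}
```

Sharing one array and narrowing the index range avoids allocating a new array at every level of the recursion, which matters more as the input grows.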
To invoke the tasks defined above, we create a ForkJoinPool and pass it a NumberDividerTask holding the array whose sum we wish to compute.

    import java.util.concurrent.ForkJoinPool;

    public class ForkJoinTest {

        static ForkJoinPool forkJoinPool = new ForkJoinPool();

        public static final int LENGTH = 1000;

        public static void main(String[] args) {
            int[] numbers = new int[LENGTH];

            // Create an array with some values.
            for (int i = 0; i < LENGTH; i++) {
                numbers[i] = i * 2;
            }

            /*
             * Invoke the NumberDividerTask with the array,
             * which in turn creates multiple subtasks.
             */
            int sum = forkJoinPool.invoke(new NumberDividerTask(numbers));
            System.out.println("Sum: " + sum);
        }
    }
The output: Sum: 999000.
This is just a basic, trivial example of using the ForkJoin framework. This sum could of course have been computed sequentially, but the idea here is to understand how ForkJoin works. In the future I will try to post a ForkJoin version of some divide-and-conquer algorithm.