Developing Concurrent applications using ExecutorService Framework in Java

We all have used Thread, Runnable to develop multi-threaded applications in Java. While we used Thread and Runnable we had a lot of work to do in terms of assigning the task to the Thread, starting the thread to waiting for it to complete the execution to get the result from each thread. In Java 5 there were some really good constructs including ExecutorService Framework in Java introduced for developing concurrent applications using the multi-threading support provided by JVM. These were added to the java.util.concurrent package.

also read:

There are lots of developers who worked on Pre Java 5 versions and developed their own Concurrent frameworks around the basic support Java provided. Lot of them still continue to use their frameworks. In this post I would like to see how we can evolve a simple problem implemented in a sequential way to make use of Thread and Runnable and then to make use of some advanced constructs in java.util.concurrent package more specifically ExecutorService Framework in Java.

Problem is: There is a list of 11 integer arrays each of different size. We wish to compute the sum of the integers in each of those arrays. We can do this sequentially picking one array at a time or use multiple threads for each array and compute the sum and add it to a data structure or use the ExecutorService framework to divide the operation into multiple task and obtain the result from each task.
The main aim of this simple problem/activity is to see how we can exploit the power of the language API.

Note: This code sample would work on Java 7 only as I have made use of the Files API provided in Java 7.

Before we start with the calculation on sum, we have stored the arrays in a file and places it in some directory(In this case I have placed it in the file named “arrays” in my linux home directory). We then read the file and construct the List of integer arrays. The contents of the file are:

4,5,6,7,84,567,1234,678,23,1234,567,89,22,32,56,77,63
67,12,34,21,23,81,49,4,1,9,49,2312
123,78,234,568,98,38,234,9,7,4,2,1,23,67
34,56,78,96,23,58,28,18,235,875,12,6,7,8,4
75,2435,7,345,234,123,1267,4,56,73,4,5,6,7,8
23,78,234,568,98,38,234,9,7,4,2,1,2,67,61,8
534,56,78,96,23,58,28,18,23,875,12,6,7,8,4,12,7
5,255,7,345,24,123,17,4,56,73,4,5,6,7,8
13,78,234,568,98,38,234,9,7,4,2,1,23,67
4,56,78,96,23,58,28,18,235,875,12,6,7,8,4
7,235,7,35,234,123,126,4,56,73,4,5,6,7,8

And the code which loads the above data in to a List of List of Integers is:

 private static List<List<Integer>> loadDataFromFile()
          throws IOException {
    Path path = Paths.get("/home/mohamed/arrays");

    List<String> linesInFile = Files.readAllLines(path, 
        Charset.defaultCharset());

    List<List<Integer>> arrays = new ArrayList<>();

    for ( String s : linesInFile){

      String [] sArray = s.split(",");

      List<Integer> integers = new ArrayList<>();

      for ( String sInt : sArray){
        
        integers.add(Integer.parseInt(sInt));
        
      }
      arrays.add(integers);
    }

    return arrays;
  }

Sequential approach to finding the sum of each array

In each of the examples below I would just give the method which calculates the sum and out put of that method. At the end of the post there is a link to download the complete code.

 private static List<Integer> calculateSequentially(
        List<List<Integer>> integerArrays) 
 {
    List<Integer> sumOfEachArray = new ArrayList<>();

    for ( List<Integer> integers : integerArrays)
    {
      int sum = 0;
      for(Integer i : integers)
      {
        sum+=i;
      }

      sumOfEachArray.add(sum);

    }

    return sumOfEachArray;
  }

The above code is very straight forward- iterate through each array and find its sum.
The output is:

Array 1:4748
Array 2:2662
Array 3:1486
Array 4:1538
Array 5:4649
Array 6:1434
Array 7:1845
Array 8:939
Array 9:1376
Array 10:1508
Array 11:930

Before I go into solving the above problem in a concurrent way, let me caution you about some of the performance problems one would encounter related to concurrent applications:

  • Cost incurred due to context switching- When ever the currently running thread is replaced by another thread to execute, there is a context switch involved which includes saving of the state of the thread executing currently and then restoring the state of the thread to be executed. If the CPU spends lot of time in context switching- may be due to frequent thread scheduling or threads completing very quickly then it may not get time to do useful work.
  • Threads may block due to lock contention which slows down the application.
  • Memory synchronisation related overhead- as there are multiple threads accessing the shared memory, there is a lot of overhead involved in maintaining the consistent state of the data in the memory. At times there are lot of blocking calls in the APIs used which further delay the execution/completion of the threads.

One should avoid premature optimisation and also decide whether concurrency is really required in the application. Adding features which are not really necessary is often an overhead. If the tasks can be completed fast and you dont expect too many of them coming in at the same time its not necessary to execute them concurrently. The examples which follow are just to illustrate the various concepts involved in using the ExecutorService Framework in Java.

Using Multi-Threaded approach to finding sum

After a brief insight into the performance aspects of concurrent applications, lets go ahead and make the sum calculation concurrent. Lets spawn one thread for each array and let the thread calculate the sum. In this case we make use of a ConcurrentHashMap to allow each thread to update its result into that map.

The Runnable which takes an Array and calculates the sum of its elements is defined below. In addition it has a name of the array which it is dealing with and also a reference to the instance of the ConcurrentHashMap which is shared among multiple threads.

class ArraySumCalculator implements Runnable{

  ConcurrentHashMap<String, Integer> sumMap;
  List<Integer> integers;
  String arrayName;
  
  ArraySumCalculator(ConcurrentHashMap<String, Integer> sumMap,
                     List<Integer> integers,
                     String arrayName){

    this.sumMap = sumMap;
    this.integers = integers;
    this.arrayName = arrayName;

  }
  
  @Override
  public void run() {

    int sum = 0;
    for ( Integer i : integers){
      sum += i;
    }

    sumMap.put(arrayName, sum);

  }
}

And the below code would create multiple threads and launch them by passing in the corresponding ArraySumCalculator instance.

private static ConcurrentHashMap<String, Integer> calculateUsingThreads(
          List<List<Integer>> integerArrays){

    ConcurrentHashMap<String, Integer> sumMap =
            new ConcurrentHashMap<>(integerArrays.size());
    
    List<Thread> threads = new ArrayList<>();
    int i = 1;
    
    //Create thread instances for each array
    for ( List<Integer> integers : integerArrays){
      
      String arrayName = "Array "+i;
      
      Thread thread = new Thread(
              new ArraySumCalculator(sumMap,integers,arrayName));
      threads.add(thread);
      i++;
    }

    //Now launch all the threads at the same time
    for ( Thread thread : threads){
      thread.start();
      try {
        thread.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }

    return sumMap;
  }

The above code first creates the Thread instances along with the corresponding ArraySumCalculator instance and collects them in a List. And then iterates through this List to launch each thread and then join with the launched thread so that the Main thread would wait until all the threads have completed execution and updated the map.
The output of the above method would be:

Array 1:4748
Array 2:2662
Array 3:1486
Array 4:1538
Array 5:4649
Array 6:1434
Array 7:1845
Array 8:939
Array 9:1376
Array 10:1508
Array 11:930

In this example we had to do 2 things which are not recommended:

  • use a common Map (thought its supports concurrent updates) to store the output generated in each thread.
  • manage the launching of each thread, which ideally should be managed by a framework. In the above example we created 11 threads i.e one thread for each array. But the underlying processor may support execution of only 2 threads concurrently. So there is lot of context switching involved. We could have used a ThreadPool but managing the ThreadPool would be another liability.

As I said at the start of the post that Java 5 provides a concurrency framework called Executor which provides a flexible and powerful framework for asynchronous task execution that supports a wide variety of task execution policies. It provides a standard means of decoupling task submission from task execution, describing tasks with Runnable.

Using Executor with Runnable

In this example lets still stick with the idea of multiple Runnables updating the ConcurrentHashMap and instead of we spawning different threads, lets leave it to the Executor to allocate tasks to different threads. Executor provides static factory methods to create thread pool:

  • newFixedThreadPool: A fixed-size thread pool creates threads as tasks are submitted, up to the maximum pool size, and then attempts to keep the pool size constant.
  • newCachedThreadPool: A cached thread pool has more flexibility to reap idle threads when the current size of the pool exceeds the demand for processing and to add new threads when the demand increases.
  • newSingleThreadExecutor: A single threaded executor creates a single worker thread to process tasks, replaces it if it dies unexpectedly.
  • newScheduledThreadPool: A fixed-size thread pool that supports delayed and periodic task execution.

In the below example we make use of a Fixed sized thread pool where in the size is decided by the number of CPUs available in the hardware. For compute intensive tasks, an N CPU processor system achieves optimum utilisation with a thread pool of N+1 threads.

If we just use Executor, we will not be able to shutdown it. An Executor implementation is likely to create threads for processing tasks. But the JVM cannot exit until all the nondaemon threads have terminated, so failing to shutdown an Executor could prevent the JVM from exiting. The ExecutorService interface extends Executor adding a number of methods for lifecycle management. We will see how we can use this interface in the below example:

private static ConcurrentHashMap<String, Integer> calculateUsingExecutor(
        List<List<Integer>> integerArrays){

  ConcurrentHashMap<String, Integer> sumMap = new
          ConcurrentHashMap<>(integerArrays.size());

  //Number of threads = 1 more than number of processors.
  ExecutorService executor = Executors.newFixedThreadPool(
          Runtime.getRuntime().availableProcessors()+1);

  int i = 1;
  for(List<Integer> integers : integerArrays){

    String arrayName = "Array "+i;

    executor.execute(
            new ArraySumCalculator(sumMap,integers, arrayName));
    i++;
  }

  //Shutdown the executor
  executor.shutdown();

  //Await for sometime so as to allow the pending tasks to be picked
  try {

    executor.awaitTermination(60,TimeUnit.SECONDS);

  } catch (InterruptedException e) {

    Thread.currentThread().interrupt();
  }

  return sumMap;
}

In the above example we make use of the static factory method to create an instance of ExecutorService and provide the number of threads to be created in the thread pool. We then ask the executor to execute each of the Runnables. The ExecutorService assigns the tasks to the threads in the thread pool. The shutdown() method of ExecutorService is invoked which allows the tasks already submitted to complete and doesn’t accept any new tasks. We then wait for termination by invoking awaitTermination and pass the time to wait for. The output for the above would be same as that for other cases.

There are a few problems in this approach as well:

  • we still are making use of the common Map for updating the results and still use the same Runnable.
  • we need to use some timeout to wait so that all the tasks are picked up for execution and not terminate prematurely.

There’s yet another way to overcome the above shortcomings. And this use another class called: Callable.

Using ExecutorService with Callable

The problem with Runnable is that its run method doesn’t return any result. Callable interface is similar to Runnable but it supports returning value from its call method. In our example we would calculate the sum of the integers in the list and return their sum instead of updating it in the map.

class ArraySumCallable implements Callable<Integer>{

  List<Integer> integers;

  ArraySumCallable(List<Integer> integers){
    
    this.integers = integers;
  }
  
  @Override
  public Integer call() throws Exception {
    
    Integer sum = 0;
    
    for (Integer i : integers){
      sum += i;
    }
    
    return sum;

  }
}

Lets use the above Callable and ExecutorService to calculate the sum of the elements in various arrays. We initially create all the Callable instances providing the integer array to each one. And then make use of invokeAll method of the ExecutorService to submit all the Callables for execution. The invokeAll method returns a List of Future. A Future represents a return of the asynchronous operation. In our case the operation is to calculate the sum of the integers in the array. We need not maintain a common map for threads to update. Instead we could get instances of Future for each task and then invoke the get() method on the instance of Future to obtain the result of the task. The get method is the blocking call i.e it waits for the result from the task. Lets have a look at the code:

public static List<Integer> calculateSumUsingCallable(
        List<List<Integer>> integerArrays){

  List<Callable<Integer>> callables = new ArrayList<>(integerArrays.size());

  List<Integer> arraySum = new ArrayList<>(integerArrays.size());
  
  //Create callables for each array
  for ( List<Integer> integers : integerArrays){
    Callable<Integer> callable = new ArraySumCallable(integers);
    callables.add(callable);
  }

  //get fixed thread pool executor
  ExecutorService executorService = Executors.newFixedThreadPool(
          Runtime.getRuntime().availableProcessors() + 1);


  try {

    //Submit all the callables and obtain their Futures
    List<Future<Integer>> futures =
            executorService.invokeAll(callables);

    executorService.shutdown();

    //Iterate through the futures and get the sum
    for ( Future<Integer> future : futures){
      Integer sum = future.get();

      arraySum.add(sum);


    }

  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();

  } catch (ExecutionException e) {
    System.out.println("Exception while calculating sum");

  }

  return arraySum;

}

The output is the same as given in the earlier examples.

To summarise:

  1. Creates a sequential program to calculate the sum of the elements of each array from the list of arrays
  2. Used multiple threads to calculate the sum, found that we had to manage the thread creation and also make use of common map which was error prone
  3. We made use of ExecutorService along with Runnable to create different tasks and leave it to framework to allocate threads for each task.
  4. Made use of ExecutorService Framework in Java along with Callable so as to create result bearing tasks and used Future to get the result from each task.

One can download the complete sample code from here.

If any one is interested to learn more about using ExecutorService Framework in Java and other concurrency constructs, you can pick Java Concurrency In Practice.

Comments

comments

About Mohamed Sanaulla

In his day job he works on developing enterprise applications using ADF. He is also the moderator of JavaRanch forums and an avid blogger.

Trackbacks

  1. [...] Using ExecuterService framework to develop applications which support concurrent execution of tasks. More details can be read in the post here. [...]

Speak Your Mind

*