Sunday, November 17, 2024

Java – ForkJoinPool vs ExecutorService

ForkJoinPool is designed for CPU-intensive workloads. A ForkJoinPool created with the default constructor has a parallelism equal to the number of processors available to the JVM. If a worker thread has to wait because it called join() on another ForkJoinTask, the pool can start a compensatory thread so that all CPUs stay busy. ForkJoinPool also provides a common pool, which is obtained by calling the static method ForkJoinPool.commonPool(); by default its parallelism is one less than the number of available processors. The aim of this design is to use a single ForkJoinPool in the whole application, sized to the number of processors. Such a pool can use the full computation capacity of the system as long as all ForkJoinTasks do computation-intensive work.
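To make the CPU-bound case concrete, here is a minimal sketch of a divide-and-conquer task running on the common pool. The class name SumOfSquares, the THRESHOLD value, and the sumOfSquares() helper are assumptions chosen for illustration; they are not part of the article's example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical CPU-bound task: sums i*i over the range [from, to)
class SumOfSquares extends RecursiveTask<Long> {

    // Assumed cutoff below which the range is summed sequentially
    private static final int THRESHOLD = 10_000;

    private final long from; // inclusive
    private final long to;   // exclusive

    SumOfSquares(long from, long to)
    {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute()
    {
        // Small range: compute directly
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (long i = from; i < to; i++) {
                sum += i * i;
            }
            return sum;
        }

        // Large range: split in two, fork one half and
        // compute the other half in the current thread
        long mid = from + (to - from) / 2;
        SumOfSquares left = new SumOfSquares(from, mid);
        SumOfSquares right = new SumOfSquares(mid, to);
        left.fork();
        long rightResult = right.compute();
        return left.join() + rightResult;
    }

    // Sums the squares of 1..n on the common pool
    static long sumOfSquares(long n)
    {
        return ForkJoinPool.commonPool().invoke(
            new SumOfSquares(1, n + 1));
    }

    public static void main(String[] args)
    {
        System.out.println(
            "Available processors: "
            + Runtime.getRuntime().availableProcessors());
        System.out.println(
            "Common pool parallelism: "
            + ForkJoinPool.commonPool().getParallelism());
        System.out.println(
            "Sum of squares 1..1000: " + sumOfSquares(1_000));
    }
}
```

No task in this sketch ever blocks on IO, which is the situation in which a pool sized to the CPU count works well.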

In real-life scenarios, however, tasks are a mix of CPU-intensive and IO-intensive work. IO-intensive tasks are a bad fit for a ForkJoinPool, because a thread blocked on IO occupies one of the few worker threads without using a CPU. Use an ExecutorService for IO-intensive tasks instead. With an ExecutorService you can set the number of threads according to the IO capacity of your system rather than its CPU capacity.
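As a rough illustration of sizing a pool for IO rather than for CPUs, the sketch below runs blocking tasks on a fixed thread pool. The class name IoPoolSketch and the fetchAll() helper are hypothetical, and Thread.sleep() only stands in for a real blocking call such as an HTTP request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class IoPoolSketch {

    // Runs one simulated blocking call per name on a pool whose
    // size is chosen by the caller, independent of the CPU count
    static List<String> fetchAll(List<String> names,
                                 int ioThreads)
    {
        ExecutorService ioPool
            = Executors.newFixedThreadPool(ioThreads);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (String name : names) {
                pending.add(ioPool.submit(() -> {
                    // Stand-in for a blocking IO operation
                    Thread.sleep(100);
                    return "response for " + name;
                }));
            }

            // Collect the results in submission order
            List<String> results = new ArrayList<>();
            for (Future<String> f : pending) {
                results.add(f.get());
            }
            return results;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(
                "interrupted while waiting", e);
        }
        catch (ExecutionException e) {
            throw new IllegalStateException(
                "task failed", e.getCause());
        }
        finally {
            // Let the pool threads terminate
            ioPool.shutdown();
        }
    }

    public static void main(String[] args)
    {
        List<String> names
            = Arrays.asList("page-1", "page-2", "page-3");
        fetchAll(names, 50).forEach(System.out::println);
    }
}
```

Because the tasks spend their time waiting, the pool can hold far more threads than there are processors without oversubscribing the CPU.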

If you want to call an IO-intensive operation from a ForkJoinTask, create a class that implements the ForkJoinPool.ManagedBlocker interface and perform the IO-intensive operation in its block() method. Pass your ForkJoinPool.ManagedBlocker implementation to the static method ForkJoinPool.managedBlock(). This method may start a compensatory thread before it calls block(), so the pool keeps its parallelism while the current thread is blocked. The block() method is supposed to perform the IO operation and store the result in an instance variable. After ForkJoinPool.managedBlock() returns, call your own accessor method to get the result of the IO operation. This way you can mix CPU-intensive operations with IO-intensive operations. A classic example is a web crawler: fetching pages from the internet is an IO-intensive operation, and parsing the fetched HTML page to extract links is a CPU-intensive operation.
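The pattern described above can be sketched without any HTTP code. In the sketch below, ManagedBlockSketch, FutureBlocker, and lengthOfSlowResult() are hypothetical names, and Thread.sleep() again stands in for the real IO call; only ForkJoinPool.ManagedBlocker and ForkJoinPool.managedBlock() come from the JDK.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

class ManagedBlockSketch {

    // Hypothetical blocker that waits for a Future and keeps
    // the result in an instance variable
    static final class FutureBlocker<T>
        implements ForkJoinPool.ManagedBlocker {

        private final Future<T> future;
        private T result;
        private boolean done;

        FutureBlocker(Future<T> future) { this.future = future; }

        // Blocks until the Future completes
        @Override
        public boolean block() throws InterruptedException
        {
            if (!done) {
                try {
                    result = future.get();
                }
                catch (ExecutionException e) {
                    throw new IllegalStateException(
                        "blocking call failed", e.getCause());
                }
                done = true;
            }
            return true;
        }

        // True once the result is available, so that block()
        // does not have to be called
        @Override
        public boolean isReleasable() { return done; }

        T result() { return result; }
    }

    // Runs a task on the common pool that waits for a slow
    // operation on ioPool and then does some (trivial) CPU work
    static int lengthOfSlowResult(ExecutorService ioPool,
                                  String payload)
    {
        return ForkJoinPool.commonPool().submit(() -> {
            Future<String> pending = ioPool.submit(() -> {
                // Stand-in for a blocking IO operation
                Thread.sleep(50);
                return payload;
            });
            FutureBlocker<String> blocker
                = new FutureBlocker<>(pending);
            ForkJoinPool.managedBlock(blocker);
            return blocker.result().length();
        }).join();
    }

    public static void main(String[] args)
    {
        ExecutorService ioPool
            = Executors.newFixedThreadPool(4);
        try {
            System.out.println(
                lengthOfSlowResult(ioPool, "hello"));
        }
        finally {
            ioPool.shutdown();
        }
    }
}
```

The blocker only waits; the IO itself runs on the separate ExecutorService, so the two pools can be sized independently.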

Procedure:

The example program achieves this goal in the following steps:

  1. main() creates a fixed thread pool of 50 threads for executing HTTP calls in parallel.
  2. main() gets a reference to ForkJoinPool.commonPool().
  3. main() creates two instances of MyRecursiveTask and submits them to the ForkJoinPool.
  4. MyRecursiveTask calls the FetchPage.block() method through the ForkJoinPool.managedBlock() method.
  5. FetchPage.block() submits the fetch task to the fixed thread pool and waits for the result.
  6. MyRecursiveTask receives the content of the page.
  7. MyRecursiveTask calculates the SHA-256 sum of the page content and returns it.
  8. main() prints the SHA-256 sum for each URL.

Implementation:

We have not implemented a full web crawler, only sample code that fetches web pages through an ExecutorService with 50 threads and uses the common pool of ForkJoinPool for the ForkJoinTasks. Each ForkJoinTask submits its page fetch request to the ExecutorService and waits for the result through the static method ForkJoinPool.managedBlock(). After receiving the page, it calculates the SHA-256 sum of the page content and stores it in a ConcurrentHashMap. This way we can make full use of both the CPU capacity and the IO capacity of the system.

Example

Java




// Java Program to Showcase When to use
// ForkJoinPool vs ExecutorService
  
// Importing required classes
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
  
// Class 1
// Outer class that holds the whole example
class ForkJoinPoolTest {
  
    // Nested class 1
    // FetchPage implements ForkJoinPool.ManagedBlocker and
    // fetches the page for the URL provided in the constructor
    // inside the block() method of the interface.
  
    // The result is stored in the private variable pageBytes,
    // which is returned by the method getPage().
    public static class FetchPage
        implements ForkJoinPool.ManagedBlocker {
  
        private String url;
        private ExecutorService executorSerivce;
        private byte[] pageBytes;
  
        private static ConcurrentHashMap<String, byte[]>
            pagesMap = new ConcurrentHashMap<>();
  
        public FetchPage(String url,
                         ExecutorService executorSerivce)
        {
  
            // This keyword refers to current instance
            // itself
            this.url = url;
            this.executorSerivce = executorSerivce;
        }
  
        // Method 2
        // Invoked through ForkJoinPool.managedBlock(), which
        // may start a compensatory thread while the current
        // thread is blocked in this method.
        @Override
        public boolean block() throws InterruptedException
        {
  
            // Reuse a page that was already fetched
            if ((pageBytes = pagesMap.get(url)) != null) {
                return true;
            }
  
            Callable<byte[]> callable
                = new Callable<byte[]>() {
                      public byte[] call() throws Exception
                      {
  
                          // try-with-resources closes both the
                          // client and the response
                          try (CloseableHttpClient client
                               = HttpClients.createDefault();
                               CloseableHttpResponse response
                               = client.execute(
                                   new HttpGet(url))) {
                              return EntityUtils.toByteArray(
                                  response.getEntity());
                          }
                      }
                  };
  
            Future<byte[]> future
                = executorSerivce.submit(callable);
  
            // Try block to check for exceptions
            try {
                pageBytes = future.get();
  
                // Cache the page for later requests
                if (pageBytes != null) {
                    pagesMap.put(url, pageBytes);
                }
            }
  
            // The fetch failed, so no page is available.
            // InterruptedException is left to propagate to
            // the caller.
            catch (ExecutionException e) {
                pageBytes = null;
            }
            return true;
        }
  
        // Method 3
        // Returns true if the result is already available, in
        // which case there is no need to call block().
        @Override public boolean isReleasable()
        {
            return pageBytes != null;
        }
  
        // Method 4
        public byte[] getPage() { return pageBytes; }
    }
  
    // Nested class 2
    // RecursiveTask that fetches the page for the URL specified
    // in the constructor by passing a FetchPage instance to
    // ForkJoinPool.managedBlock(), and then calculates the
    // SHA-256 sum of the page content.
    static class MyRecursiveTask
        extends RecursiveTask<String> {
  
        private String url;
        private ExecutorService executorSerivce;
  
        // Method 1
        public MyRecursiveTask(
            String url, ExecutorService executorSerivce)
        {
  
            // This keyword refers to current instance
            // itself
            this.url = url;
            this.executorSerivce = executorSerivce;
        }
  
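        // Method 2
        // Returns the SHA-256 sum of the page as a hex string,
        // or null if the page could not be fetched
        @Override
        protected String compute()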
        {
  
            // Try block to check for exceptions
            try {
  
                FetchPage fp
                    = new FetchPage(url, executorSerivce);
                ForkJoinPool.managedBlock(fp);
                byte[] bytes = fp.getPage();
                if (bytes != null) {
                    String code
                        = toHexString(getSHA(bytes));
                    hashPageMap.put(url, code);
                    return code;
                }
            }
  
            // Handling exceptions
            catch (InterruptedException
                   | NoSuchAlgorithmException e) {
                return null;
            }
            return null;
        }
    }
  
    // Map from URL to the SHA-256 sum of its page content
    private static ConcurrentHashMap<String, String>
        hashPageMap = new ConcurrentHashMap<>();
  
    // Method 4
    // Main driver method
    public static void main(String[] args)
    {
        ExecutorService executorSerivce
            = Executors.newFixedThreadPool(50);
        ForkJoinPool forkJoinPool
            = ForkJoinPool.commonPool();
  
        MyRecursiveTask task1 = new MyRecursiveTask(
            "https://www.yahoo.com", executorSerivce);
        MyRecursiveTask task2 = new MyRecursiveTask(
            "https://www.google.com", executorSerivce);
  
        Future<String> f1 = forkJoinPool.submit(task1);
        Future<String> f2 = forkJoinPool.submit(task2);
  
        try {
  
            String res1 = f1.get();
            String res2 = f2.get();
            System.out.println(
                "URL:https://www.yahoo.com SHAsum:" + res1);
            System.out.println(
                "URL:https://www.google.com SHAsum:" + res2);
        }
        catch (InterruptedException
               | ExecutionException e) {
  
            // Display the exception along with line number
            // using printStackTrace() method
            e.printStackTrace();
        }
        finally {
  
            // Always stop the IO threads so the JVM can exit
            executorSerivce.shutdown();
        }
    }
  
    // Method 5
    // to calculate SHA sum for input byte[] and
    // return result as byte array
    public static byte[] getSHA(byte[] input)
        throws NoSuchAlgorithmException
    {
  
        // Static getInstance method is called with the
        // SHA-256 hashing algorithm
        MessageDigest md
            = MessageDigest.getInstance("SHA-256");
  
        // digest() method called
        // to calculate message digest of an input
        // and return array of byte
        return md.digest(input);
    }
  
    // Method 6
    // Converts the input byte[] to its hexadecimal
    // representation.
    public static String toHexString(byte[] hash)
    {
  
        // Interpreting the byte array as a non-negative number
        BigInteger number = new BigInteger(1, hash);
  
        // Converting message digest into hex value
        StringBuilder hexString
            = new StringBuilder(number.toString(16));
  
        // BigInteger drops leading zeros, so pad back to two
        // hex digits per byte (64 characters for SHA-256)
        while (hexString.length() < hash.length * 2) {
            hexString.insert(0, '0');
        }
        return hexString.toString();
    }
}


Output: 

URL:https://www.yahoo.com SHAsum:12f45bce974edce01b457e01c7c0a60b480eff319fcdf4869fc2f48afb3af3fb
URL:https://www.google.com SHAsum:a15ad023eda65e8e289dde4198bd822fdcbf3a87ccb54afbcef7be2feeb6e5bd

The output shows each URL together with the SHA-256 sum of its page content. The exact values change whenever the content of the pages changes.
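A SHA-256 sum is always 32 bytes, so its hexadecimal form should always be 64 characters long, including any leading zeros. As a minimal sketch of one way to guarantee that width, the hypothetical helper below (the class name Sha256Hex is an assumption, not part of the example above) formats every byte as exactly two hex digits.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class Sha256Hex {

    // Returns the SHA-256 sum of the input as 64 hex characters
    static String sha256Hex(byte[] input)
    {
        try {
            byte[] hash = MessageDigest.getInstance("SHA-256")
                              .digest(input);
            StringBuilder hex
                = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                // Mask to 0..255 and print two digits per byte
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        }
        catch (NoSuchAlgorithmException e) {
            // Not expected, since the Java platform requires
            // SHA-256 support
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args)
    {
        System.out.println(sha256Hex(
            "abc".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Formatting byte by byte avoids the step of padding a number that has lost its leading zeros.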
