Executing tasks with the Executor Framework in Java4 min read

Java offers us many ways to write multi-threading applications, at the early stage, we probably create a new thread for each task by manually extending the Thread class, this seems to work fine when the number of tasks is small, and the overhead of creating a new thread is negligible. However, when the number of tasks increased, the thread-per-task approach incurs some serious problems:

  • Significant computing resources are spent just creating and destroying threads.
  • When the number of runnable threads is greater than the number of available processors, some threads sit idle and consume a lot of memory.
  • The code seems to work fine on the dev environment, but later can fail miserably in production by some fatal exception such as OutOfMemoryError during the high load.

What we can do better? Java has introduced the Executor framework since 1.7. Instead of creating threads manually, tasks – which are logical units of work are submitted to the thread pool as Runnable. With the concept of the thread pool, it brings us several benefits:

  • We don’t have to manually create threads by hand and manage their states, the thread pool will be in charge of these.
  • Reduce resource consumption and overhead, since threads created in the thread pool can be reused.
  • With better responsiveness, the tasks submitted can be executed immediately by the available worker thread.

We’ve got what the thread pool brings to the table, now let’s look at the Executor interface:

public interface Executor {
   void execute(Runnable command);
}

This interface looks pretty simple, with only one method execute, which takes a Runnable task, and this Runnable task will be submitted to the thread pool. The primary abstraction for task execution here is not the Thread class, but the Executor.

The Executor provides us a standard way of decoupling tasks submission and execution, you submit your tasks to the Executor as Runnable, and the execution of these tasks will be handled by the Executor at some point.

We can create a new ExecutorService (extends the Executor interface with more APIs) simply by using the Executors factory class:

ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

Here we provide the number of threads that we want the thread pool to manage is equal to our available processors, and now we are ready to submit tasks to the executor:

executor.submit(() -> {
    // submitted task
});

Once the task is submitted, it’s then placed in the task queue, available threads will pick up tasks in the queue and execute them, as visualized in the figure below:

An example: Getting Github repository information

We’re interested in getting information on multiple repositories in a given account, and information provided by one repository doesn’t depend on others, so we can leverage the power of the Executor to obtain repositories’ information such as repo name, description, number of stars, etc…concurrently.

There are some simple steps that we want to take in order, first, we define an object named RepoInfo that holds the properties of a GitHub repo:

@ToString
public record RepoInfo(String owner, String repoName, int noStars, String description) { }

Then we create a task that sends a request to Github’s API endpoint to fetch URIs of all public repositories for a given user, here I use my personal Github account for this purpose:

   public List<String> getReposByUser(String username) {
        List<String> urls = new ArrayList<>();
        try (CloseableHttpClient closeableHttpClient = HttpClients.createDefault()
        ) {
            HttpGet httpGet = ...;
            // setting headers...
            try (CloseableHttpResponse closeableResponse = closeableHttpClient.execute(httpGet)) {
                String response = ...;
                JsonArray arrResponse = JsonParser.parseString(response).getAsJsonArray();
                for(final var resp : arrResponse) {
                    JsonObject respObj = resp.getAsJsonObject();
                    String url = respObj.get("url").getAsString();
                    urls.add(url);
                }
            }
            return urls;
        } catch (Exception e) {
            logger.error("Error while fetching repo urls: {}", e.getMessage(), e);
            return List.of();
        }
    }

Once we have the list of URLs, the next step we fetch detailed repo information and store it somewhere:

@Slf4j
public final class RepoInfoTask implements Runnable {    
    private final Object lock = new Object();
    private final List<RepoInfo> repos;
    private final String endpoint;

    public RepoInfoTask(List<RepoInfo> repos, String endpoint) {
        this.repos = repos;
        this.endpoint = endpoint;
    }

    @Override
    public void run() {
        try (CloseableHttpClient closeableHttpClient = HttpClients.createDefault()
        ) {
            logger.info("Current thread: {}", Thread.currentThread().getName());
            HttpGet httpGet = new HttpGet(endpoint);
            // setting headers for REST APIs...
            CloseableHttpResponse closeableResponse = ...;
            String response = ...;
            JsonObject jsonResp = ...;
            
            JsonObject ownerObj = jsonResp.get("owner").getAsJsonObject();
            String owner = ownerObj.get("login").getAsString();
            String repoName = jsonResp.get("name").getAsString();
            int noStars = jsonResp.get("stargazers_count").getAsInt();
            String desc = jsonResp.get("description").getAsString();
            RepoInfo repoInfo = new RepoInfo(owner, repoName, noStars, desc);
            synchronized (lock) {
                repos.add(repoInfo);
            }
        } catch (Exception e) {
            logger.error("Fail to get repo information for url: {}", endpoint);
        }
    }
}

We’ve created a Runnable task that helps us crawl detailed repository info of the given endpoint, at the end of the method, since the repo list is shared between multiple threads, so in the critical region we must use the synchronized block to correctly add elements to the list.

Next, we will initialize an executor, in which we submit our tasks and let it take care of the execution:

@AllArgsConstructor
@Sfl4j
public class RepoInfoTaskExecutor {
    private final GithubRepoService githubRepoService;
    private final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public List<RepoInfo> getRepoInfos(String username) {
        List<RepoInfo> repoInfos = new ArrayList<>();
        List<String> urls = githubRepoService.getReposByUser(username);
        List<Future<?>> futures = new ArrayList<>();
        
        for (final String url : urls) {
            Future<?> futureRepoInfo = executor.submit(new RepoInfoTask(repoInfos, url));
            futures.add(futureRepoInfo);
        }
        
        for (final Future<?> future : futures) {
            try {
                future.get();
            } catch (InterruptedException | ExecutionException e) {
                logger.error("Fail to get repo info: {}", e.getMessage(), e);
                Thread.currentThread().interrupt();
            }
        }
        return repoInfos;
    }
}

Notice that the ExecutorService#submit() method returns a Future. In Java, the Future class wraps up something that is potentially available in the future. Like in the example above, when we call the submit method, we don’t get the result right away but Future<?>, because the task must first be submitted to the queue, wait for a thread becomes available, and that thread then executes the task.

After the first for loop, we are guaranteed that there are at most N (= number of available processors) threads executing our tasks concurrently. In the second loop, we block the current thread by calling the Future#get() method to wait for all tasks to be completed. Once all is done, we return back the list.

Now let’s create a main program to test our code:

public static void main(String[] args) {
    GithubRepoService githubRepoService = new GithubRepoService();
    RepoInfoTaskExecutor repoInfoTaskExecutor = new RepoInfoTaskExecutor(githubRepoService);
    List<RepoInfo> repoInfos = repoInfoTaskExecutor.getRepoInfos("namvdo");
    for(final RepoInfo repo : repoInfos) {
        String repoInfo = String.format("""
                    Owner: %s
                    Repo name: %s
                    No stars: %d
                    Description: %s
                    """, repo.owner(), repo.repoName(), repo.noStars(), repo.description());
        logger.info("{}", repoInfo);
    }
}

When executing the main program, we will get something like this:

Owner: namvdo
Repo name: ion-sfu
No stars: 0
Description: Pure Go WebRTC SFU

Owner: namvdo
Repo name: jammming
No stars: 1
Description: Learn how to work fetch data from API and React

namvdo
Repo name: Java
No stars: 0
Description: All Algorithms implemented in Java

We can also observe which thread is currently executing a task by the log that we have written above:

Current thread: pool-1-thread-3
Current thread: pool-1-thread-5
Current thread: pool-1-thread-4

Terminate the program

When you run the example, you probably realize that even if we finish getting all the detailed repo info, the program still doesn’t terminate. This is because the ExecutorService is created by a non-daemon thread, and the JVM can’t exit if there is still a non-daemon thread running. Let’s have a closer look on the ExecutorService interface to see how we can shutdown it:

public interface ExecutorService extends Executor { 
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    // other methods...
}

The lifecycle of the ExecutorService has 3 states: running, shutting down, and terminated. Initially, the ExecutorService is created in the running state and the shutdown() initiates a graceful shutdown, it will execute all previously submitted tasks, but no new task will be accepted, once all tasks are completed, the ExecutorService will transit to the terminated state. We now can add this line of code at the end of the main method to terminate the program:

repoInfoTaskExecutor.executor.shutdown();

The shutdownNow() method abruptly shut down the ExecutorService, it cancels all actively executing tasks, halts all waiting tasks, and returns them.

0 0 votes
Article Rating
Previous Article
Next Article
Subscribe
Notify of
guest
0 Comments
Most Voted
Newest Oldest
Inline Feedbacks
View all comments
Every support is much appreciated ❤️

Buy Me a Coffee

0
Would love your thoughts, please comment.x
()
x