
About the upcoming Java 9 release


As you may already know, Java 9 will be released soon (likely in October 2017). We have been talking about it for several years now, mainly because of the Jigsaw project, which has generated a lot of buzz. In my opinion, Jigsaw is clearly not the most interesting feature of Java 9, but I won't start a flame war about that here! Instead, I will discuss a small API improvement: the updated Process API.

A nice API

This work was done under JEP 102 and aims to provide a better interface for managing native processes. The main drawback of the API (up to Java 8) was that it was blocking: at some point, you couldn't escape the p.waitFor() call. That time is over: calling p.onExit() now returns a CompletableFuture<Process>.
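To make this concrete, here is a minimal sketch of the non-blocking style, assuming Java 9 or later. The class name OnExitSketch and the choice of `java -version` as the child process are mine, picked only because that command is available wherever a JVM is.

```java
import java.io.IOException;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;

final class OnExitSketch {

    // Starts `java -version` with the JVM that runs this code (an arbitrary,
    // portable choice of child process) and returns a future of its exit code.
    static CompletableFuture<Integer> javaVersionExitCode() throws IOException {
        String javaBin = Paths.get(System.getProperty("java.home"), "bin", "java").toString();
        Process p = new ProcessBuilder(javaBin, "-version")
                .redirectErrorStream(true)                        // merge stderr into stdout
                .redirectOutput(ProcessBuilder.Redirect.DISCARD)  // Java 9: drop the output
                .start();
        // onExit() returns immediately; the mapping runs once the process has ended.
        return p.onExit().thenApply(Process::exitValue);
    }

    public static void main(String[] args) throws Exception {
        javaVersionExitCode()
                .thenAccept(code -> System.out.println("exit code: " + code))
                .join(); // block here only so that the demo does not exit too early
    }
}
```

The caller never touches waitFor(): it simply chains callbacks on the returned future, as with any other CompletableFuture.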

There is always a but...

Unfortunately, there is something to be aware of before using this API naively. First, let's read the documentation. At the end of the documentation for the onExit method, it states:

This implementation executes waitFor() in a separate thread repeatedly until it returns successfully. If the execution of waitFor is interrupted, the thread's interrupt status is preserved.

Hmm... That is annoying! The waiting is done in a separate thread that we do not control. Let's dive into the implementation to see how it works under the hood.

Back to the sources

As you may already know, the JDK sources can be a bit disorienting when browsed for the first time. So, to keep things short, here are the links to the shared Process implementation and to the Unix-specific implementation.

In the Unix implementation, here is the code that is relevant for us:


    public CompletableFuture<Process> onExit() {
        return ProcessHandleImpl.completion(pid, false)
                .handleAsync((unusedExitStatus, unusedThrowable) -> {
                    boolean interrupted = false;
                    while (true) {
                        // Ensure that the concurrent task setting the exit status has completed
                        try {
                            waitFor();
                            break;
                        } catch (InterruptedException ie) {
                            interrupted = true;
                        }
                    }
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                    return this;
                });
    }

Here we see that the class actually involved is ProcessHandleImpl, the implementation behind ProcessHandle, a new interface introduced alongside Process. The static method completion lives in this shared implementation, and it is this method that is responsible for building the initial CompletableFuture.

To better understand this code, recall that in the call we are interested in, the shouldReap flag is set to false. Here is a modified version of the method:


    static CompletableFuture<Integer> completion(long pid, boolean shouldReap) {
        // check canonicalizing cache 1st
        ExitCompletion completion = completions.get(pid);
        // re-try until we get a completion that shouldReap => isReaping
        while (completion == null || (shouldReap && !completion.isReaping)) {
            ExitCompletion newCompletion = new ExitCompletion(shouldReap);
            if (completion == null) {
                completion = completions.putIfAbsent(pid, newCompletion);
            } else {
                completion = completions.replace(pid, completion, newCompletion)
                    ? null : completions.get(pid);
            }
            if (completion == null) {
                // newCompletion has just been installed successfully
                completion = newCompletion;
                // spawn a thread to wait for and deliver the exit value
                processReaperExecutor.execute(() -> {
                    int exitValue = waitForProcessExit0(pid, shouldReap);
                    if (exitValue == NOT_A_CHILD) {
                        // pid not alive or not a child of this process
                        // If it is alive wait for it to terminate
                        long sleep = 300;     // initial milliseconds to sleep
                        int incr = 30;        // increment to the sleep time
                        long startTime = isAlive0(pid);
                        long origStart = startTime;
                        while (startTime >= 0) {
                            try {
                                Thread.sleep(Math.min(sleep, 5000L)); // no more than 5 sec
                                sleep += incr;
                            } catch (InterruptedException ie) {
                                // ignore and retry
                            }
                            startTime = isAlive0(pid);  // recheck if is alive
                            if (origStart > 0 && startTime != origStart) {
                                // start time changed, pid is not the same process
                                break;
                            }
                        }
                        exitValue = 0;
                    }
                    newCompletion.complete(exitValue);
                    // remove from cache afterwards
                    completions.remove(pid, newCompletion);
                });
            }
        }
        return completion;
    }

We can see that the completion implementation used is ExitCompletion, and that a task is submitted to processReaperExecutor. This executor is created as an unbounded thread pool, so every process on which you called onExit occupies a thread of its own for as long as it runs. This may not be what you want, especially if you are running several long processes at the same time.
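The cost is easy to reproduce outside the JDK. The sketch below does not touch the real processReaperExecutor; it assumes that executor behaves like a pool created with Executors.newCachedThreadPool, and it replaces the native wait with a latch. The class name ReaperPoolSketch and the thread name are made up for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ReaperPoolSketch {

    // Submits `tasks` blocking jobs to a cached thread pool and returns how many
    // threads the pool had to create while all of them were still blocked.
    static int threadsCreatedFor(int tasks) throws InterruptedException {
        AtomicInteger created = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool(r -> {
            created.incrementAndGet();           // count every thread the pool asks for
            Thread t = new Thread(r, "fake-reaper");
            t.setDaemon(true);
            return t;
        });
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            // Stand-in for the blocking wait on a long-running native process.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int result = created.get();              // no task has finished yet at this point
        release.countDown();                     // let the fake "processes" end
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("threads created: " + threadsCreatedFor(50));
    }
}
```

Because none of the tasks can finish before the latch is released, the pool never has an idle thread to hand a task to, so it ends up with one thread per task. The same reasoning applies to processes that are all still running when the next one is started.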

Trade-off

In the end, this nice, easy-to-use API, which fits well with the asynchronous tools of the JDK, doesn't come without a price. It is fine to use in small programs that do not spawn too many native processes, but beware when spawning tons of them: each long-running process pins a thread for its whole lifetime, and bursts of short-lived processes may keep creating new threads (which has a price).

The best advice I can give you is to roll your own watcher. There are several ways to achieve this; for instance, use a ScheduledExecutorService bound to a single thread.

An implementation for my use case

The main reason I came across this API was a goal in a real-world application. My use case is the following: calling gdal_translate on large rasters. If you are not familiar with GDAL, let's just say that it is the tool for GIS in FLOSS land. Calls to gdal_translate take time, and I will need to call it often, sometimes in parallel.

Inside Java 9

The CompletableFuture class has only existed since Java 8 and is not yet widely used in the java.util.concurrent package, but Java 9 adds some valuable methods in that regard. While searching for a solution to my problem, I found out that although Java 9 leaves the various Executor implementations free of any reference to CompletableFuture, it adds some methods to the CompletableFuture class itself. One of them is delayedExecutor, a factory for an executor that runs tasks after some delay. It is a bit confusing to find this method in CompletableFuture, and the connection may be hard to see, but one of the reasons is hidden in the implementation of the orTimeout method. There, we can see that the internal machinery uses a ScheduledThreadPoolExecutor with a single (daemon) thread. This executor is a singleton, so it is shared by all CompletableFutures.
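Here is a small sketch of those two Java 9 additions in action. Only delayedExecutor and orTimeout come from the JDK; the class name Java9FutureSketch, the helper names and the delays are illustrative choices.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class Java9FutureSketch {

    // Completes with "done" after roughly `delayMillis`: the delayed executor
    // waits, then hands the supplier over to the default async pool.
    static CompletableFuture<String> delayed(long delayMillis) {
        Executor later = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(() -> "done", later);
    }

    // Returns true if a future that nobody ever completes fails with a
    // TimeoutException once `timeoutMillis` have elapsed.
    static boolean timesOut(long timeoutMillis) throws InterruptedException {
        CompletableFuture<String> never = new CompletableFuture<>();
        try {
            never.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS).get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(delayed(200).join());
        System.out.println(timesOut(200));
    }
}
```

Neither call blocks a thread while waiting: the delay and the timeout are both handled by the shared scheduler described above.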

Backporting to Java 8

Guess what? My project uses Java 8 and is of course already in production, so I have to stick with Java 8. Let's see how we can prepare a simple implementation for my use case, inspired by the Java 9 API.

Let's define a class ProcessWatcher which internally uses a ScheduledExecutorService. This class will provide only one public method to run processes in an asynchronous fashion:


public CompletableFuture<Integer> runProcessAsync(ProcessBuilder process, long timeout, TimeUnit timeUnit) {
        CompletableFuture<Integer> handle = new CompletableFuture<>();
        underlying.execute(() -> {
            try {
                Process p = process.start();
                // Schedule the timeout
                underlying.schedule(() -> handle.completeExceptionally(new TimeoutException()), timeout, timeUnit);
                // Schedule the first liveness check
                underlying.schedule(new ProcessWatcher(p, handle), this.throttle, this.timeUnit);
            } catch (IOException e) {
                handle.completeExceptionally(e);
            }
        });
        return handle;
}

This code is simple. It uses the underlying ScheduledExecutorService to fork the process and set up two scheduled tasks: one is in charge of the timeout, the other checks on a regular basis whether the process is still running. Then we return the handle to the caller. The Java API is weak here, as it makes no distinction between the promise and the future. In other languages, such as Scala, only the holder of the promise can fulfill it, and the caller that gets the Future cannot call the equivalent of complete. Anyway, I introduced an inner class in this code, ProcessWatcher, which is an implementation of Runnable. Again, it is very simple:


private class ProcessWatcher implements Runnable {
        final Process toWatch;
        final CompletableFuture<Integer> handle;

        ProcessWatcher(Process toWatch, CompletableFuture<Integer> handle) {
            this.toWatch = toWatch;
            this.handle = handle;
        }

        @Override
        public void run() {
            if (toWatch.isAlive()) {
                if (!handle.isDone()) {
                    // Re schedule a new task
                    underlying.schedule(new ProcessWatcher(toWatch, handle), throttle, timeUnit);
                } else {
                    // In case of timeout
                    toWatch.destroy();
                }
            } else {
                handle.complete(toWatch.exitValue());
            }
        }
}

Here I used the same pattern as the TaskSubmitter runnable in the CompletableFuture implementation. The only goal is to check whether the process is done yet or whether we need to check again later. And that's it for a first draft.
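For readers who want something they can compile, the two fragments above can be assembled into one Java 8 class along these lines. This is a sketch, not the production code: the class name PollingProcessWatcher, the constructor, the daemon thread factory and the use of a private method instead of the inner Runnable are my assumptions, while the fields underlying, throttle and timeUnit follow the fragments.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PollingProcessWatcher {
    private final ScheduledExecutorService underlying;
    private final long throttle;      // delay between two liveness checks
    private final TimeUnit timeUnit;  // unit of `throttle`

    PollingProcessWatcher(long throttle, TimeUnit timeUnit) {
        this.throttle = throttle;
        this.timeUnit = timeUnit;
        // One daemon thread watches every process started through this class.
        this.underlying = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "process-watcher");
            t.setDaemon(true);
            return t;
        });
    }

    CompletableFuture<Integer> runProcessAsync(ProcessBuilder process, long timeout, TimeUnit unit) {
        CompletableFuture<Integer> handle = new CompletableFuture<>();
        underlying.execute(() -> {
            try {
                Process p = process.start();
                // Timeout task: a no-op if the handle is already completed by then.
                underlying.schedule(() -> handle.completeExceptionally(new TimeoutException()), timeout, unit);
                underlying.schedule(() -> poll(p, handle), throttle, timeUnit);
            } catch (IOException e) {
                handle.completeExceptionally(e);
            }
        });
        return handle;
    }

    // Same decision tree as the ProcessWatcher runnable: complete, kill, or check again later.
    private void poll(Process p, CompletableFuture<Integer> handle) {
        if (!p.isAlive()) {
            handle.complete(p.exitValue());
        } else if (handle.isDone()) {
            p.destroy(); // the handle timed out (or the caller completed it) first
        } else {
            underlying.schedule(() -> poll(p, handle), throttle, timeUnit);
        }
    }

    public static void main(String[] args) throws Exception {
        String javaBin = System.getProperty("java.home") + "/bin/java";
        PollingProcessWatcher watcher = new PollingProcessWatcher(100, TimeUnit.MILLISECONDS);
        int code = watcher
                .runProcessAsync(new ProcessBuilder(javaBin, "-version").inheritIO(), 30, TimeUnit.SECONDS)
                .get();
        System.out.println("exit code: " + code);
    }
}
```

Note that nothing here reads the child's output. A process that writes a lot to an unread pipe can stall, so a real caller would redirect the output to a file or call inheritIO() as in the main method.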

But wait...

To be honest, there is a Java wrapper that calls the C API of GDAL under the hood. I have not played with it yet, but I am pretty sure it offers better performance than spawning a separate process to do the job (though I'm also sure it is not an async implementation!).

Conclusion

At first, the new Process API looked really nice and useful, especially if you have been waiting for non-blocking, async APIs in the JDK. CompletableFuture is fairly new in Java, and it will take time for the concept to spread across the JDK. Native OS support for signaling process termination may come one day, and maybe on that day we won't need a watcher task to handle process completion. Anyway, as with anything that seems magic, taking a look under the hood explains a lot. This investigation led me to the new methods in CompletableFuture, and a bunch of them will make your life easier, for sure!

For my real-world issue, it would also be a good idea to remember that the machine running my program has physical constraints and limits. In some way, limiting the pressure from the time- and CPU-consuming calls to GDAL will help. This could be done with reactive flows, but that could be the topic of (several) future posts!

On a personal note, this was the first time I dug this much into the sources of the JDK. I really enjoyed it, but as with every first time, I may have missed something, so do not hesitate to send me your remarks and comments on Twitter!

Emmanuel Nhan

29-year-old French software developer. I started programming in C, just for fun, when I was 13. Then I jumped between several technologies and ended up writing Scala code and enjoying DIY stuff like 3D printing :)
