JCIP Notes - Interruption and Cancellation
This note summarizes Chapter 7 Cancellation and Shutdown in Java Concurrency in Practice, by Brian Goetz et al. Thread interruption is a delicate mechanism, and the chapter in JCIP is not particularly straightforward to read. Here, I list key points from the book, put together some helpful examples and try to answer some questions that once confused me.
Studying this topic is also helpful in understanding how cancellation works in Kotlin coroutines.
What is interruption?
Interruption is a cooperative mechanism that lets one thread ask another to stop what it is doing.
Calling Thread.interrupt()
sets this thread’s interrupted status. Blocking library methods like Thread.sleep()
and Object.wait()
try to detect when a thread has been interrupted. By convention, they do two things:
- clear the interrupted status and
- throw the checked
InterruptedException
.
For example:
// AbstractQueuedSynchronizer.ConditionObject
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// ...
}
Note the static helper method Thread.interrupted()
has the side effect of clearing current thread’s interrupted
status.
Interruption is commonly used to support cancellation.
There is nothing in the API or language specification that ties interruption to any specific cancellation semantics, but in practice, using interruption for anything but cancellation is fragile and difficult to sustain in larger applications. Java Concurrency In Practice, §7.1.1
How to support interruption in our code?
When our code calls a blocking method, we know this thread is interrupted if it throws the InterruptedException
.
Since it’s a checked exception, we’re forced to deal with this exception explicitly in our code. It’s critical that we
handle this exception appropriately. See the next section for detail.
If our code does its own thing (like computing some prime numbers) and doesn’t call other blocking method, it can query
the thread status by calling Thread.isInterrupted()
. Unlike static Thread.interrupted()
, this method doesn’t have
the side effect of clearing the interrupted status.
public class Thread {
// Returns the interrupted status.
public boolean isInterrupted() {...}
// Returns the interrupted status and also clearing it.
// JCIP remarks this method is "poorly named".
public static boolean interrupted() {...}
// ...
}
Responding to InterruptionException
There are two options:
- propagate the
InterruptedException
to the caller, or - restore the interruption status.
Particularly, don’t swallow an interruption request except when the code implements the thread’s interruption policy. The interruption signal needs to be preserved so that code higher up the stack can also react to cancellation.
Here are three examples that illustrate these points.
Propagate to caller
public static void producePrimes(BlockingQueue<BigInteger> queue)
throws InterruptedException {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted()) {
p = p.nextProbablePrime();
queue.put(p);
}
}
producePrimes
calls the blocking BlockingQueue.put
method. It automatically propagates the
checked InterrutionException
by listing it in the method signature.
This is often the most sensible policy if you can get away with it (§5.4). However, forcing the caller to deal with the exception may be undesirable. Also, this approach doesn’t work when the surrounding code is constrained not to be a throwing method, in which case we’ll have to use the next approach.
Restore interrupted status
// adapted from Listing 5.10
class PrimeProducerTask(
private val queue: BlockingQueue<BigInteger>
) : Runnable {
private var p = BigInteger.ONE
override fun run() {
try {
p = p.nextProbablePrime()
queue.put(p)
} catch (e: InterruptedException) {
println("got $e")
Thread.currentThread().interrupt()
}
}
}
In this example, the code that throws InterruptedException
is part of a Runnable
. Because the Runnable
contract
doesn’t throw this exception, we cannot propagate it directly. Instead, we have to manually restore the current thread’s
interrupted status by calling Thread.currentThread().interrupt()
.
Special case when implementing a thread
// Listing 7.5
class PrimeProducer(
private val queue: BlockingDeque<BigInteger>
) : Thread() {
override fun run() {
try {
var p = BigInteger.ONE
while (!currentThread().isInterrupted) {
p = p.nextProbablePrime()
queue.put(p)
}
} catch (consumed: InterruptedException) {
}
}
fun cancel() = interrupt()
}
The third example shows a special case where it’s acceptable to swallow the InterruptedException
. Notice the code
implements a Thread
object directly, so we know there is no other code higher up the stack that needs to react to the
interruption request.
Our code here also determines the thread’s interruption policy. A thread’s interruption policy determines how a thread interprets an interruption request—what it does when one is detected. The common policy is to finish the task at hand, clean up resources and exit. Here, the thread exits when it detects the interruption signal.
Another example in point is Listing 7.15.
What happens if we swallow the InterruptedException
?
Roman Elizarov shared an example on Twitter:
Having cleared the name of future.cancel(true) in previous thread here’s one more thread interruption puzzle. Would doSomeWork function always finish orderly in around 3 secs (no other CPU load in the system)? If not, what could go wrong? pic.twitter.com/gjtJhKTJNb
— Roman Elizarov (@relizarov) October 16, 2019
val executor = Executors.newCachedThreadPool()
fun doSomeWork() {
val future = executor.submit {
var i = 1
while (true) {
log.info("Working ${i++}")
Thread.sleep(1000)
}
}
Thread.sleep(3000)
future.cancel(true)
try { future.get() }
}
Would doSomeWork
finish in approximately 3 seconds? What could go wrong?
The answer is: it depends on the implementation of log.info
call. A logging library may optimize performance by
storing log events in something like a bounded BlockingQueue
. Then, a background thread consumes the queue and writes
to the output stream. This approach can reduce context switch overhead. Section 7.2.1 gives an example of a logging
service and section 11.6 expands on performance considerations.
When log producers get ahead of consumers, the queue
reaches its capacity and BlockingQueue.put
starts to block. If an interrupt happens then, it
throws InterruptedException
. For a logging library, propagating the exception to the caller makes it
difficult to use. So internally it needs to catch InterruptedException
and use Thread.currentThread.interrupt()
to
restore the interrupted status. If it swallows the exception, the future.cancel
request in the example will get lost,
and the code won’t exit in a timely fashion.
Don’t interrupt threads you don’t own
We shouldn’t interrupt arbitrary threads. The rule is that you should know a thread’s interruption policy before interrupting it.
An example where it’s appropriate to interrupt is ExecutorService.shutdownNow()
, as an executor service clearly owns
and manages its worker threads.
The book presents an example where it’s bad to interrupt:
// Listing 7.8
public class TimedRun1 {
private static final ScheduledExecutorService cancelExec =
Executors.newScheduledThreadPool(1);
public static void timedRun(Runnable r,
long timeout, TimeUnit unit) {
final Thread taskThread = Thread.currentThread();
cancelExec.schedule(new Runnable() {
public void run() {
taskThread.interrupt();
}
}, timeout, unit);
r.run();
}
}
Some possible bad outcomes of this code:
- If the task completes before the timout, the cancellation task could go off after
timedRun
has returned to its caller. We don’t know what code will be running on thetaskThread
when that happens. - The code in
Runnable r
may not be responsive to interruption. In this case,timedRun
won’t be able to return according to the specified timeout.
Listing 7.9 solves these issues by running the alien Runnable
in a
dedicated task, which is well-worth studying.
But the better approach is to submit the task to an ExecutorService
and cancel the task via its Future
, as shown
in Listing 7.10. Keep in mind that the primary abstraction for task execution
in the Java class libraries is not Thread
, but Executor
(§6.2).
Comparison with Kotlin coroutines
Cancellation in Kotlin coroutines works similarly. Both systems
- are cooperative,
- use an exception to deliver the interruption / cancellation signal.
Some notable differences:
No checked exception
Notice that if InterruptedException
were not designed as a checked exception, we wouldn’t need to catch the exception
and do the Thread.currentThread().interrupt()
trick. We can just propagate the exception automatically. In theory, a
checked exception encourages proper code in place to deal with interruption. But in practice, it’s easy to make the
mistake of swallowing the exception.
Kotlin abandoned checked exceptions. In Kotlin coroutines, cancellation works by throwing CancellationException
. All
the suspending functions in the Kotlin coroutines library are cancellable. They check for the cancellation state and
throw CancellationException
when cancelled, similar to how blocking methods in Java check current thread’s interrupted
status and throw the InterruptedException
. The exception can be propagated when we call those cancellable suspend
functions. In this way, we don’t even need to know a lot about this exception. Everything works automatically.
However, this leaves a similar gotcha to swallowing Java’s InterruptedException
. We may use something
like runCatching
in the standard library around a suspend function call to catch everything, including
the CancellationException
.
@OptIn(ExperimentalTime::class)
fun main() = runBlocking {
val job = launch {
runCatching {
fetchData()
}
// Pretend we continue to do other work...
while (true) {}
}
delay(200)
job.cancel()
}
// A suspend function that can throw exception.
suspend fun fetchData(): Int {
delay(1000)
if (Random.nextInt(0, 10) < 2)
throw IllegalStateException("whoops")
return 1
}
Normally, the code should exit when the coroutine gets cancelled. But the CancellationException
gets caught in runCatching
, so code in the coroutine continues to run.
An issue asks to rethrow CancellationException
in runCatching
. However, the Kotlin team seems reluctant to support this. Catching exceptions all over the place in
application code is a bad idea anyway. In the previous example, the IllegalStateException
also gets lost. Ideally,
the application should handle exceptions centrally in one place, and make sure they are reported for tracking down bugs.
Cancelled state is permanent
When we cancel a coroutine, the transition to the cancelled state is permanent. The library doesn’t try to clear the interrupted flag like Java’s interruption.
If we check the coroutine state via CoroutineScope.isActive
or call other well-behaved suspend functions in the
previous example, we can observe the cancelled state and exit even if we previously swallowed
the CancellationException
.
fun main() = runBlocking {
val now = TimeSource.Monotonic.markNow()
val job = launch {
runCatching {
fetchData()
}
// Pretend we continue to do other work...
while (isActive) {
}
}
delay(200)
job.cancel()
println(now.elapsedNow())
}
runCatching
suspend functions could break Kotlin coroutine’s cancellation mechanism.
Why clear the interrupted flag before throwing InterruptedException
?
It seems thread interruption behaves more like a one-shot event system. Remember, there is nothing that specifically ties interruption to cancellation.
Conceptually, interruption sounds less permanent than
cancellation. A thread signals that it requests another thread to stop what it’s doing by first setting a boolean flag
in the target thread. However, the code running on that thread might be busy doing its own stuff without looking at this
flag. So the blocking methods in the library follow the convention to check the interrupted flag and throw
an InterruptedException
. When that happens, the interruption mechanism finally gets the chance to deliver that signal.
By then, the interrupted status has served its purpose and gets cleared. As a checked exception is in use, we’re sure
a signal handler is in place.