i@yujinyan.me

JCIP Notes - Interruption and Cancellation

This note summarizes Chapter 7 Cancellation and Shutdown in Java Concurrency in Practice, by Brian Goetz et al. Thread interruption is a delicate mechanism, and the chapter in JCIP is not particularly straightforward to read. Here, I list key points from the book, put together some helpful examples and try to answer some questions that once confused me.

Studying this topic is also helpful in understanding how cancellation works in Kotlin coroutines.

What is interruption?

Interruption is a cooperative mechanism that lets one thread ask another to stop what it is doing.

Calling interrupt() on a thread sets that thread’s interrupted status. Blocking library methods like Thread.sleep() and Object.wait() try to detect when a thread has been interrupted. By convention, they do two things:

  • clear the interrupted status and
  • throw the checked InterruptedException.

For example:

// AbstractQueuedSynchronizer.ConditionObject
public final void await() throws InterruptedException {
  if (Thread.interrupted()) 
    throw new InterruptedException(); 
  // ...
}

Note that the static helper method Thread.interrupted() has the side effect of clearing the current thread’s interrupted status.
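To see the convention in action, here is a minimal sketch that interrupts a sleeping thread and records the status it observes inside its catch block. The class and method names are hypothetical and only serve this illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; the names are not taken from the book.
class InterruptClearsStatus {

  // Interrupts a sleeping thread and returns the interrupted status
  // that the thread observed inside its catch block.
  static boolean statusSeenInCatch() throws InterruptedException {
    AtomicBoolean seen = new AtomicBoolean(true);
    Thread sleeper = new Thread(() -> {
      try {
        Thread.sleep(60_000); // blocks until interrupted
      } catch (InterruptedException e) {
        // sleep() cleared the status before throwing.
        seen.set(Thread.currentThread().isInterrupted());
      }
    });
    sleeper.start();
    sleeper.interrupt(); // request that the sleeper stop
    sleeper.join();
    return seen.get();
  }

  public static void main(String[] args) throws InterruptedException {
    // prints "status seen in catch: false"
    System.out.println("status seen in catch: " + statusSeenInCatch());
  }
}
```

By the time the exception handler runs, the flag is already gone, so the exception itself is the only remaining trace of the request.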

Interruption is commonly used to support cancellation.

“There is nothing in the API or language specification that ties interruption to any specific cancellation semantics, but in practice, using interruption for anything but cancellation is fragile and difficult to sustain in larger applications.” (Java Concurrency in Practice, §7.1.1)

How to support interruption in our code?

When our code calls a blocking method, we know this thread is interrupted if it throws the InterruptedException. Since it’s a checked exception, we’re forced to deal with this exception explicitly in our code. It’s critical that we handle this exception appropriately. See the next section for detail.

If our code does its own thing (like computing some prime numbers) and doesn’t call any blocking methods, it can poll the thread’s status by calling Thread.currentThread().isInterrupted(). Unlike the static Thread.interrupted(), this instance method doesn’t have the side effect of clearing the interrupted status.

public class Thread {
  // Returns the interrupted status.
  public boolean isInterrupted() {...}

  // Returns the interrupted status and also clears it.
  // JCIP remarks this method is "poorly named".
  public static boolean interrupted() {...}

  // ...
}
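The difference between the two methods is easy to check on the current thread. The following sketch (class name hypothetical) sets the status once and then queries it four times.

```java
import java.util.Arrays;

// Hypothetical demo class contrasting the two query methods.
class InterruptedVsIsInterrupted {

  // Returns the four observations made on the current thread, in order.
  static boolean[] observe() {
    Thread.currentThread().interrupt();                 // set our own status
    boolean a = Thread.currentThread().isInterrupted(); // true, status kept
    boolean b = Thread.currentThread().isInterrupted(); // still true
    boolean c = Thread.interrupted();                   // true, and clears it
    boolean d = Thread.interrupted();                   // false, already cleared
    return new boolean[] {a, b, c, d};
  }

  public static void main(String[] args) {
    // prints [true, true, true, false]
    System.out.println(Arrays.toString(observe()));
  }
}
```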

Responding to InterruptedException

There are two options:

  • propagate the InterruptedException to the caller, or
  • restore the interruption status.

In particular, don’t swallow an interruption request unless the code implements the thread’s interruption policy. The interruption signal needs to be preserved so that code higher up the stack can also react to cancellation.

Here are three examples that illustrate these points.

Propagate to caller

public static void producePrimes(BlockingQueue<BigInteger> queue)
    throws InterruptedException { 
  BigInteger p = BigInteger.ONE;
  while (!Thread.currentThread().isInterrupted()) {
    p = p.nextProbablePrime();
    queue.put(p);
  }
}

producePrimes calls the blocking BlockingQueue.put method. It propagates the checked InterruptedException simply by declaring it in the method signature.

This is often the most sensible policy if you can get away with it (§5.4). However, forcing the caller to deal with the exception may be undesirable. Also, this approach doesn’t work when the surrounding code is constrained not to be a throwing method, in which case we’ll have to use the next approach.

Restore interrupted status

// adapted from Listing 5.10
class PrimeProducerTask(
  private val queue: BlockingQueue<BigInteger>
) : Runnable {
  private var p = BigInteger.ONE
  override fun run() {
    try {
      p = p.nextProbablePrime()
      queue.put(p)
    } catch (e: InterruptedException) { 
      println("got $e")
      Thread.currentThread().interrupt() 
    }
  }
}

In this example, the code that throws InterruptedException is part of a Runnable. Because Runnable.run() does not declare this exception, we cannot propagate it directly. Instead, we have to manually restore the current thread’s interrupted status by calling Thread.currentThread().interrupt().
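A small Java sketch of the same idea shows why restoring the status matters: code higher up the stack can still see the request after the Runnable returns. The class and method names are hypothetical. To keep the demo single-threaded, the "caller" sets its own interrupted status before running the task, which makes put() throw immediately.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; not a listing from the book.
class RestoreStatusDemo {

  // Runnable.run() cannot throw InterruptedException,
  // so the task restores the status instead.
  static Runnable putTask(BlockingQueue<Integer> queue) {
    return () -> {
      try {
        queue.put(42);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the request
      }
    };
  }

  // Plays the role of code higher up the stack: runs the task on the
  // current thread and checks whether the request is still visible.
  static boolean callerStillSeesInterrupt() {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
    Thread.currentThread().interrupt(); // a pending interruption request
    putTask(queue).run();               // put() throws and clears the status
    return Thread.interrupted();        // read the status and reset it
  }

  public static void main(String[] args) {
    // prints "caller sees interrupt: true"
    System.out.println("caller sees interrupt: " + callerStillSeesInterrupt());
  }
}
```

With the interrupt() call in the catch block removed, the same check would return false, because put() has already cleared the flag.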

Special case when implementing a thread

// Listing 7.5
class PrimeProducer(
  private val queue: BlockingDeque<BigInteger>
) : Thread() { 
  override fun run() {
    try {
      var p = BigInteger.ONE
      while (!currentThread().isInterrupted) {
        p = p.nextProbablePrime()
        queue.put(p)
      }
    } catch (consumed: InterruptedException) { 
    }
  }

  fun cancel() = interrupt()
}

The third example shows a special case where it’s acceptable to swallow the InterruptedException. Notice that the code subclasses Thread directly, so we know there is no other code higher up the stack that needs to react to the interruption request.

Our code here also determines the thread’s interruption policy. A thread’s interruption policy determines how a thread interprets an interruption request—what it does when one is detected. The common policy is to finish the task at hand, clean up resources and exit. Here, the thread exits when it detects the interruption signal.

Another case in point is Listing 7.15.

What happens if we swallow the InterruptedException?

Roman Elizarov shared an example on Twitter:

val executor = Executors.newCachedThreadPool()

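fun doSomeWork() {
  val future = executor.submit {
    var i = 1
    while (true) {
      log.info("Working ${i++}")
      Thread.sleep(1000)
    }
  }
  Thread.sleep(3000)
  future.cancel(true)
  try {
    future.get()
  } catch (e: CancellationException) {
    // Expected: get() on a cancelled Future throws CancellationException.
  }
}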

Would doSomeWork finish in approximately 3 seconds? What could go wrong?

The answer is: it depends on the implementation of the log.info call. A logging library may optimize performance by storing log events in something like a bounded BlockingQueue. A background thread then consumes the queue and writes to the output stream. This approach can reduce context-switch overhead. Section 7.2.1 gives an example of a logging service and section 11.6 expands on performance considerations.

When log producers get ahead of the consumer, the queue reaches its capacity and BlockingQueue.put starts to block. If an interrupt arrives at that point, put throws InterruptedException. Propagating the exception to the caller would make a logging library difficult to use, so internally it needs to catch InterruptedException and call Thread.currentThread().interrupt() to restore the interrupted status. If it swallows the exception instead, the interruption sent by future.cancel(true) in the example is lost: the next Thread.sleep sees no pending interruption, and the task keeps running.
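The scenario can be reproduced with a toy logger. The sketch below is an assumption-laden illustration, not modeled on any real logging library: QueueLogger is hypothetical, its queue has capacity 1, and nothing ever drains it, so the worker is guaranteed to be blocked in put() when the interrupt arrives.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical queue-backed logger; not modeled on any real library.
class QueueLoggerDemo {

  static class QueueLogger {
    // Capacity 1 and no consumer, so the second put() blocks forever.
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    private final boolean restoreStatus;

    QueueLogger(boolean restoreStatus) {
      this.restoreStatus = restoreStatus;
    }

    void info(String message) {
      try {
        queue.put(message);
      } catch (InterruptedException e) {
        if (restoreStatus) {
          Thread.currentThread().interrupt(); // keep the request visible
        }
        // Otherwise the exception is swallowed and the request is lost.
      }
    }
  }

  // Returns true if a worker that logs in a loop stops after being interrupted.
  static boolean workerStops(boolean restoreStatus) throws InterruptedException {
    QueueLogger log = new QueueLogger(restoreStatus);
    Thread worker = new Thread(() -> {
      while (!Thread.currentThread().isInterrupted()) {
        log.info("working");
      }
    });
    worker.setDaemon(true); // let the JVM exit even if the worker hangs
    worker.start();
    // Wait until the worker is parked inside put().
    while (worker.getState() != Thread.State.WAITING) {
      Thread.sleep(10);
    }
    worker.interrupt();
    worker.join(1000);
    return !worker.isAlive();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("restoring logger, worker stops: " + workerStops(true));   // true
    System.out.println("swallowing logger, worker stops: " + workerStops(false)); // false
  }
}
```

With the swallowing logger, the worker re-enters put() and blocks again, so the single interrupt has no lasting effect.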

Don’t interrupt threads you don’t own

We shouldn’t interrupt arbitrary threads. The rule is that you should know a thread’s interruption policy before interrupting it.

An example where it’s appropriate to interrupt is ExecutorService.shutdownNow(), as an executor service clearly owns and manages its worker threads.
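As a rough sketch of that ownership (the class and method names are hypothetical), the following starts a long-blocking task on a single-thread pool and then shuts the pool down. It assumes the standard pools created by Executors, whose shutdownNow() interrupts their own worker threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class.
class ShutdownNowDemo {

  // Returns true if the pool terminated promptly after shutdownNow().
  static boolean stopsPromptly() throws InterruptedException {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    pool.execute(() -> {
      started.countDown();
      try {
        Thread.sleep(60_000); // stands in for a long blocking call
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // leave the decision to the pool
      }
    });
    started.await();     // make sure the task is running
    pool.shutdownNow();  // the pool interrupts the worker it owns
    return pool.awaitTermination(5, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws InterruptedException {
    // prints "terminated: true"
    System.out.println("terminated: " + stopsPromptly());
  }
}
```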

The book presents an example where it’s bad to interrupt:

// Listing 7.8
public class TimedRun1 {
  private static final ScheduledExecutorService cancelExec =
    Executors.newScheduledThreadPool(1);

  public static void timedRun(Runnable r,
                              long timeout, TimeUnit unit) {
    final Thread taskThread = Thread.currentThread(); 
    cancelExec.schedule(new Runnable() {
      public void run() {
        taskThread.interrupt(); 
      }
    }, timeout, unit);
    r.run();
  }
}

Some possible bad outcomes of this code:

  • If the task completes before the timeout, the cancellation task could go off after timedRun has returned to its caller. We don’t know what code will be running on the taskThread when that happens.
  • The code in Runnable r may not be responsive to interruption. In this case, timedRun won’t be able to return according to the specified timeout.

Listing 7.9 solves these issues by running the alien Runnable in a dedicated thread, and is well worth studying.

But the better approach is to submit the task to an ExecutorService and cancel the task via its Future, as shown in Listing 7.10. Keep in mind that the primary abstraction for task execution in the Java class libraries is not Thread, but Executor (§6.2).
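The Future-based approach can be sketched roughly as follows. This is a variation in the spirit of Listing 7.10 rather than a copy of it: the boolean return value, the daemon thread factory and the class name are assumptions made for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; see Listing 7.10 in the book for the original.
class TimedRunSketch {
  // Daemon threads so that idle pool threads do not keep the JVM alive.
  private static final ExecutorService taskExec =
      Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
      });

  // Returns true if the task finished in time, false if it timed out.
  static boolean timedRun(Runnable r, long timeout, TimeUnit unit)
      throws InterruptedException, ExecutionException {
    Future<?> task = taskExec.submit(r);
    try {
      task.get(timeout, unit);
      return true;
    } catch (TimeoutException e) {
      return false; // the task is cancelled in the finally block
    } finally {
      task.cancel(true); // interrupts if running, harmless if completed
    }
  }

  public static void main(String[] args) throws Exception {
    Runnable quick = () -> { };
    Runnable slow = () -> {
      try {
        Thread.sleep(60_000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    System.out.println(timedRun(quick, 1, TimeUnit.SECONDS));      // true
    System.out.println(timedRun(slow, 100, TimeUnit.MILLISECONDS)); // false
  }
}
```

The interrupt is delivered by the executor to a thread it owns, and the caller's thread is never interrupted, which avoids both problems of TimedRun1.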

Comparison with Kotlin coroutines

Cancellation in Kotlin coroutines works similarly. Both systems

  • are cooperative,
  • use an exception to deliver the interruption / cancellation signal.

Some notable differences:

No checked exception

Notice that if InterruptedException were not designed as a checked exception, we wouldn’t need to catch the exception and do the Thread.currentThread().interrupt() trick. We could just let the exception propagate automatically. In theory, a checked exception encourages putting proper code in place to deal with interruption. But in practice, it’s easy to make the mistake of swallowing the exception.

Kotlin has no checked exceptions. In Kotlin coroutines, cancellation works by throwing CancellationException. All the suspending functions in the Kotlin coroutines library are cancellable. They check for the cancellation state and throw CancellationException when cancelled, similar to how blocking methods in Java check the current thread’s interrupted status and throw InterruptedException. The exception propagates on its own when we call those cancellable suspend functions. In this way, we don’t even need to know much about this exception. Everything works automatically.

However, this leaves a similar gotcha to swallowing Java’s InterruptedException. We may use something like runCatching in the standard library around a suspend function call to catch everything, including the CancellationException.

@OptIn(ExperimentalTime::class)
fun main() = runBlocking {
  val job = launch {
    runCatching { 
      fetchData()
    }

    // Pretend we continue to do other work...
    while (true) {}
  }

  delay(200)
  job.cancel() 
}

// A suspend function that can throw exception.
suspend fun fetchData(): Int {
  delay(1000)
  if (Random.nextInt(0, 10) < 2)
    throw IllegalStateException("whoops")
  return 1
}

Normally, the code should exit when the coroutine gets cancelled. But the CancellationException gets caught in runCatching, so code in the coroutine continues to run.

An issue asks to rethrow CancellationException in runCatching. However, the Kotlin team seems reluctant to support this. Catching exceptions all over the place in application code is a bad idea anyway. In the previous example, the IllegalStateException also gets lost. Ideally, the application should handle exceptions centrally in one place, and make sure they are reported for tracking down bugs.

Cancelled state is permanent

When we cancel a coroutine, the transition to the cancelled state is permanent. Unlike Java’s interruption mechanism, which clears the interrupted flag when the exception is thrown, the library never resets the cancelled state.

If we check the coroutine state via CoroutineScope.isActive or call other well-behaved suspend functions in the previous example, we can observe the cancelled state and exit even if we previously swallowed the CancellationException.

fun main() = runBlocking {
  val now = TimeSource.Monotonic.markNow()
  val job = launch {
    runCatching {
      fetchData()
    }

    // Pretend we continue to do other work...
    while (isActive) { 
    }
  }

  delay(200)
  job.cancel()
  println(now.elapsedNow())
}
🚨

Wrapping suspend function calls in runCatching can break Kotlin coroutines’ cancellation mechanism.

Why clear the interrupted flag before throwing InterruptedException?

It seems thread interruption behaves more like a one-shot event system. Remember, there is nothing that specifically ties interruption to cancellation.

Conceptually, interruption sounds less permanent than cancellation. A thread signals that it requests another thread to stop what it’s doing by first setting a boolean flag in the target thread. However, the code running on that thread might be busy doing its own stuff without looking at this flag. So the blocking methods in the library follow the convention to check the interrupted flag and throw an InterruptedException. When that happens, the interruption mechanism finally gets the chance to deliver that signal. By then, the interrupted status has served its purpose and gets cleared. As a checked exception is in use, we’re sure a signal handler is in place.
