Do you really need Await ?

When doing asyc processing in Scala, you might use Future and Await. But you can avoid the latter. Here are some explanations.

If you are doing asynchronous processing in Scala, chances are high that you met Future. If you are lucky enough, you do not because you work with cats-effect or a similar effect library. There are many pitfalls when working with Future and in this article, we will talk about a very obvious one, which is even clearly mentioned in the standard library : Await. From time to time, I encounter it in production code and so, I replace it with safer patterns. Here are some of them.

Await as a way to get the result of a Future

Most of the time, this is the situation where I meet Await in production code:

import scala.concurrent.duration._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
val future: Future[String] = Future {
  val inner = Future {
    Thread.sleep(1000)
    "Done inner"
  }
  val res   = Await.result(inner, Duration.Inf)
  s"We did it : $res"
}
Await.result(future, Duration.Inf)

Why is it dangerous ?

Depending on your ExecutionContext you can create a deadlock, especially if this whole thing ends up being in a future itself, as in this example:

import scala.concurrent.duration._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
val future: Future[String] = Future {
  val inner = Future {
    Thread.sleep(1000)
    "Done inner"
  }
  val res   = Await.result(inner, Duration.Inf)
  s"We did it : $res"
}
Await.result(future, Duration.Inf)

If you use the provided global execution context, you will be fine. But if you use a custom context with a single thread, it will never terminate:

import scala.concurrent.duration._
import scala.concurrent._
import java.util.concurrent.Executors
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
val future: Future[String]        = Future {
  val inner = Future {
    Thread.sleep(1000)
    "Done inner"
  }
  val res   = Await.result(inner, Duration.Inf)
  s"We did it : $res"
}
Await.result(future, Duration.Inf)

In this situation, we artificially created a lack of threads to execute futures. This highlights two things :

  • blocking in Future should be avoided
  • Await blocks !

This example is very simplistic, but it may have several forms in real word.

Overcoming

If your intent is to just use the result of a future in a non-effectful function (a function whose type looks like A => B, then the combinator you are looking for is map. If you want to use the result as the input of a computation returning a Future, you want to use flatMap.

So, instead of the code above, we could just :

import scala.concurrent.duration._ 
import scala.concurrent._ 
import scala.concurrent.ExecutionContext.Implicits.global 
val future: Future[String] = Future { 
  Thread.sleep(1000) // Or better, some real computation that takes some time
  "done" 
}.map(doSomething)

Or if doSomething returns a future itself, we can use flatMap like this:

import scala.concurrent.duration._ 
import scala.concurrent._ 
import scala.concurrent.ExecutionContext.Implicits.global 
def doSomething(str: String): Future[String] = // Some meaningful computation
val future: Future[String] = Future { 
  Thread.sleep(1000) // Or better, some real computation that takes some time
  "done" 
}.flatMap(doSomething)

But you may find something else of interest with Await, which is the ability to set a timeout. This is the other use-case that can easily be replaced.

Using Await as a timeout

Sometimes, the useful thing in Await.result is its second parameter which is the maximum duration to wait for a Future to complete (either successfully or with failure). But this behavior can also be replaced to stay in the context of Future. We just need to provide a mechanism to race two Futures. Fortunately this already exists and it is called firstCompletedOf. This way we can have a delayed Future which will complete with a failure acting as the timeout. This can be achieved with a ScheduledExecutorService when using standard Scala, or with a Scheduler when akka is involved.
Note that in this implementation, timeout does not mean cancel. Achieving proper cancellation with Futureis a tough topic which must be handled carefully. Here is however two implementations of a timeout mechanism. For both use examples, we use an implicit class to provide a nice syntax.

Basic timeout

import scala.concurrent.duration._
import scala.concurrent._

import scala.concurrent.ExecutionContext.Implicits.global

val scheduler = Executors.newSingleThreadScheduledExecutor()

def failAfter[A](duration: FiniteDuration): Future[A] = {
  val p = Promise[A]()
  scheduler.schedule(
    () => p.failure(new TimeoutException("Timeout") with NoStackTrace),
    duration.toMillis,
    java.util.concurrent.TimeUnit.MILLISECONDS
  )
  p.future
}

implicit class FutureOps[A](val f: Future[A]) extends AnyVal {
  def timeout(after: FiniteDuration): Future[A] =
    Future.firstCompletedOf(List(failAfter[A](after), f))

}

val never = Promise[Int]().future

never.timeout(3.seconds).onComplete {
  case Failure(exception) => println(s"Error ${exception.getMessage}")
  case Success(value)     => println("Success")
}

Timeout with akka

import akka.actor.ActorSystem
import scala.concurrent.duration._
import scala.concurrent._

implicit val actorSystem: ActorSystem = ActorSystem()
implicit val ec: ExecutionContext     = actorSystem.dispatcher

def failAfter[A](duration: FiniteDuration): Future[A] =
  akka.pattern.after(duration)(Future.failed(new TimeoutException("Timeout") with NoStackTrace))

implicit class FutureOps[A](val f: Future[A]) extends AnyVal {
  def timeout(after: FiniteDuration): Future[A] = Future.firstCompletedOf(List(failAfter[A](after), f))
}

val never = Promise[Int]().future

never.timeout(3.seconds).onComplete {
  case Failure(exception) => println(s"Error: ${exception.getMessage}")
  case Success(value)     => println("Success")
}

Usage in akka-stream

Last but not least, I sometimes encouter Await in custom akka stream stages. This happens when the stage needs to call some external API or perform an async processing. Let's say we have a def fetch[A](): Future[A] method that we want to call from a custom stage; in this case, calling the push method directly like this fetch().map(e => push(out, e)) won't work. The temptation to use Await is high but it is forgetting that akka provides us with the necessary tool : getAsyncCallback.

Conclusion

Except at the edges of your program, it is perfectly fine to stay in the Future context. We saw how everything is done to make working with them easily. Test frameworks like scalatest or munit provide first class support for Future so even in tests, you do not need Await. One thing we do not discuss in this blog post is cancellation : in our samples, we saw how to timeout but not how to stop computation on timeout. This is a though issue where some answers can be found in Viktor Klang's post. Alternatively, you can also drop Future from your codebase and use cats-effect, a runtime providing a pure asynchronous runtime.