Some fun with akka-streams and akka-http

You may have heard about Reactive Programming, the Reactive Manifesto, or Reactive Streams, the attempt to standardize the concept of streams. Quick note: let's set this last one aside, as it is a standard for library implementers rather than for end users. Akka Streams is an implementation of this concept of streams built on the popular Akka actor model. In other words, it brings streams on top of an ActorSystem.

I have been experimenting with this technology lately, and here are some examples and resources to give you a first look at what this project offers.

Let's start with a simple real-life feature...

At work, as part of an exploratory project, we need to listen to changes made on a CouchDB server. For this, CouchDB provides a nice API that lets us track changes. It is really easy to use, and for this article we only need to know that it consists of a single REST endpoint:

http://couchhost:port/db/_changes?feed=continuous&heartbeat=30000

This simple endpoint keeps the connection to the CouchDB server open indefinitely, thanks to the combination of the feed and heartbeat parameters. Note that the heartbeat shows up in the response body as an empty line every 30 seconds (the parameter value is in milliseconds). For more information on this, just check the doc. In future entries, I may write about CouchDB.
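
To make the heartbeat concrete: in the continuous feed, each change arrives as one JSON object on its own line, and the heartbeat is an empty line in between. Here is a minimal sketch, in plain Scala, of how one might separate real changes from heartbeats once the text is at hand. The ChangesFeed object, the changeLines function and the sample payload are hypothetical, made up for illustration, and the sketch assumes the text it receives contains only complete lines.

```scala
object ChangesFeed {
  // Splits a piece of the continuous feed into lines and drops the
  // empty ones, which are the heartbeats (assumes complete lines).
  def changeLines(raw: String): List[String] =
    raw.split("\n").toList.map(_.trim).filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    // Hypothetical payload: two changes separated by one heartbeat.
    val sample =
      "{\"seq\":1,\"id\":\"doc-a\",\"changes\":[{\"rev\":\"1-abc\"}]}\n" +
      "\n" +
      "{\"seq\":2,\"id\":\"doc-b\",\"changes\":[{\"rev\":\"1-def\"}]}\n"

    // Prints the two JSON lines, without the heartbeat.
    changeLines(sample).foreach(println)
  }
}
```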

So now, let's write some code!


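import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._

import scala.concurrent.Future

object Step1 extends App {

  // The ActorSystem that runs the stream underneath
  implicit val system = ActorSystem("learning")
  // Execution context needed by the onComplete callback below
  import system.dispatcher

  // Turns stream blueprints into running streams
  implicit val materializer = ActorFlowMaterializer()

  // Low-level client API: a Flow from requests to responses over one connection
  val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
    Http().outgoingConnection("localhost", 5984)

  // Emit a single request for the continuous changes feed
  Source.single(HttpRequest(uri = "/db/_changes?feed=continuous&heartbeat=30000"))
    .via(connectionFlow)                 // send it through the connection
    .map(_.entity.dataBytes)             // each response yields a Source of ByteString
    .flatten(FlattenStrategy.concat)     // flatten into a single stream of ByteString
    .map(_.decodeString("UTF-8"))        // naive decoding, one chunk at a time
    .runForeach(println)                 // materialize the stream and print each chunk
    .onComplete(_ => system.shutdown())  // stop the ActorSystem once the stream ends

}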

This example does a lot in very few lines. First of all, you will notice that we create an ActorSystem, which is expected since we are using Akka underneath. What is new is the implicit materializer. Until we call the runForeach method, nothing is materialized: the methods via, map and flatten only return a blueprint that describes the stream. So, until we run it, no data is processed. Materialization happens when we call runForeach.
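
The difference between describing and running can be seen in a small sketch. It assumes the same experimental Akka Streams version as the listing above (hence ActorFlowMaterializer and system.shutdown()); the BlueprintDemo object and the messages it prints are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._

// Hypothetical demo object, not part of the original example.
object BlueprintDemo extends App {

  implicit val system = ActorSystem("blueprint")
  import system.dispatcher
  implicit val materializer = ActorFlowMaterializer()

  // Only a description: the println inside map does not run here.
  val blueprint = Source(1 to 3).map { n =>
    println(s"processing $n")
    n * 2
  }

  println("nothing has been processed yet")

  // Materialization: the elements start flowing from this call on.
  blueprint
    .runForeach(n => println(s"got $n"))
    .onComplete(_ => system.shutdown())
}
```

The "nothing has been processed yet" line should appear before any "processing" line, since building the blueprint does not execute the function passed to map.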

The connectionFlow value is a Flow that takes HttpRequests and actually makes the HTTP call to the specified server. In this example, we use the low-level API provided by akka-http, but some higher-level features are already available on top of it. Just check the docs.

The Source we used in this sample is Source.single, which means it emits exactly one element. In our case, it is a single HTTP request. After a successful request, we get back a response, which contains an entity. As the API we use with CouchDB is a streaming API, the CouchDB server returns a possibly infinite entity. That is why the call _.entity.dataBytes returns a Source. The enclosing map therefore produces a stream of Sources. That is perfectly fine, and akka-streams provides a simple stage to flatten it and continue with a stream of ByteStrings. Here we flatten using the concat strategy, which handles the incoming Sources one at a time, in order, moving on to the next one only once the current Source has completed (no element left in it).
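
The concat strategy can be tried in isolation with a stream of two small Sources. This is only a sketch, assuming the same experimental Akka Streams version as the listing above; the FlattenDemo object is hypothetical, and the integer Sources merely stand in for response entities.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._

// Hypothetical demo object, not part of the original example.
object FlattenDemo extends App {

  implicit val system = ActorSystem("flatten-demo")
  import system.dispatcher
  implicit val materializer = ActorFlowMaterializer()

  // Two inner Sources, typed like dataBytes (materialized value: Any).
  val inner: List[Source[Int, Any]] = List(Source(1 to 3), Source(4 to 6))

  Source(inner)                       // a stream whose elements are Sources
    .flatten(FlattenStrategy.concat)  // drain each inner Source in turn
    .runForeach(println)
    .onComplete(_ => system.shutdown())
}
```

With concat, the second inner Source is only consumed once the first has completed, so the numbers should come out as 1 through 6 in that order.
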
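The rest of this example is not really relevant, as we take a huge (and quite inexact) shortcut to create strings from the ByteStrings we get back: each chunk is decoded on its own, although a chunk boundary can fall in the middle of a multi-byte character or in the middle of a line.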
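
To see why decoding chunk by chunk is inexact, here is a small sketch in plain Scala, without Akka. The ChunkDecoding object and its two functions are hypothetical names; byte arrays stand in for the ByteStrings of the listing above, and the chunk boundary is placed on purpose in the middle of a two-byte character.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object ChunkDecoding {
  // Decodes every chunk on its own, as the listing above does.
  def decodeEach(chunks: List[Array[Byte]]): String =
    chunks.map(chunk => new String(chunk, UTF_8)).mkString

  // Concatenates all the chunks first, then decodes once.
  def decodeWhole(chunks: List[Array[Byte]]): String =
    new String(chunks.foldLeft(Array.empty[Byte])(_ ++ _), UTF_8)

  def main(args: Array[String]): Unit = {
    val bytes = "\u00e9".getBytes(UTF_8)            // "é" is two bytes in UTF-8
    val chunks = List(bytes.take(1), bytes.drop(1)) // boundary inside the character

    println(decodeEach(chunks) == "\u00e9")  // false: each half is invalid alone
    println(decodeWhole(chunks) == "\u00e9") // true
  }
}
```

Concatenating everything is of course not an option for an endless feed; a streaming version would have to keep the incomplete trailing bytes of one chunk and prepend them to the next, or cut the stream into lines before decoding.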

So ends our first example with akka-streams and akka-http. Next time, we will improve this example to interpret the data instead of printing it to the console.