Source Streaming

Akka HTTP supports completing a request with an Akka Streams Source[T, _], which makes it possible to build and consume streaming end-to-end APIs that apply back-pressure throughout the entire stack.

It is possible to complete requests with a raw Source[ByteString, _]; however, it is often more convenient to stream on an element-by-element basis and let Akka HTTP handle the rendering internally - for example as a JSON array, or as a CSV stream (where each element is separated by a newline).
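
For comparison, completing with a raw byte stream amounts to wrapping the Source[ByteString, _] in a (chunked) HttpEntity. Below is a minimal sketch of this approach; the chunks value is just a stand-in for whatever raw byte stream you want to send:

import akka.http.scaladsl.model.{ ContentTypes, HttpEntity }
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl.Source
import akka.util.ByteString

// a stand-in for any raw byte stream, e.g. file contents or a log tail
val chunks: Source[ByteString, Any] = Source.single(ByteString("raw bytes..."))

val rawRoute =
  path("raw") {
    // renders the bytes as-is, as a chunked entity with the given content type
    complete(HttpEntity(ContentTypes.`application/octet-stream`, chunks))
  }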

In the following sections we investigate how to make use of the JSON Streaming infrastructure; the general hints, however, apply to any kind of element-by-element streaming you could imagine.

JSON Streaming

JSON Streaming is a term referring to streaming a (possibly infinite) stream of elements as independent JSON objects in a continuous HTTP request or response. The elements are most often separated by newlines, though they do not have to be. Concatenating elements side-by-side or emitting a "very long" JSON array are other valid use cases.

In the examples below, we'll be referring to the Tweet and Measurement case classes as our model, which are defined as:

case class Tweet(uid: Int, txt: String)
case class Measurement(id: String, value: Int)

And as always with spray-json, we provide our (Un)Marshaller instances as implicit values, using the jsonFormatN methods to generate them statically:

object MyJsonProtocol
  extends akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
  with spray.json.DefaultJsonProtocol {

  implicit val tweetFormat: spray.json.RootJsonFormat[Tweet] = jsonFormat2(Tweet.apply)
  implicit val measurementFormat: spray.json.RootJsonFormat[Measurement] = jsonFormat2(Measurement.apply)
}

Responding with JSON Streams

In this example we implement an API representing an infinite stream of tweets, very much like Twitter's Streaming API.

Firstly, we'll need to get some additional marshalling infrastructure set up that is able to marshal to and from an Akka Streams Source[T, _]. One such trait, containing the needed marshallers, is SprayJsonSupport, which uses spray-json (a high-performance JSON library) and is shipped as part of Akka HTTP in the akka-http-spray-json module.
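
If the module is not already on your classpath, it can be added as a library dependency. The snippet below is a sketch of the sbt entry; the version is left as a placeholder to be replaced with the Akka HTTP version you are using:

// build.sbt (sketch) - substitute the Akka HTTP version you use for the placeholder
libraryDependencies += "com.typesafe.akka" %% "akka-http-spray-json" % "<akka-http-version>"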

Once the general infrastructure is prepared, we import our model's marshallers, generated by spray-json (Step 1), and enable JSON Streaming by making an implicit EntityStreamingSupport instance available (Step 2). Akka HTTP pre-packages JSON and CSV entity streaming support; however, it is simple to add your own in case you'd like to stream a different content type (for example plists or protobuf).

The final step is to complete the request using a Source of tweets:

// [1] import "my protocol", for marshalling Tweet objects:
import MyJsonProtocol._

// [2] pick a Source rendering support trait:
// Note that the default support renders the Source as a JSON Array
implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()

val route =
  path("tweets") {
    // [3] simply complete a request with a source of tweets:
    val tweets: Source[Tweet, NotUsed] = getTweets
    complete(tweets)
  }

// tests ------------------------------------------------------------
val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`))
val AcceptXml = Accept(MediaRange(MediaTypes.`text/xml`))

Get("/tweets").withHeaders(AcceptJson) ~> route ~> check {
  responseAs[String] shouldEqual
    """[""" +
    """{"uid":1,"txt":"#Akka rocks!"},""" +
    """{"uid":2,"txt":"Streaming is so hot right now!"},""" +
    """{"uid":3,"txt":"You cannot enter the same river twice."}""" +
    """]"""
}

// endpoint can only marshal Json, so it will *reject* requests for application/xml:
Get("/tweets").withHeaders(AcceptXml) ~> route ~> check {
  handled should ===(false)
  rejection should ===(UnacceptedResponseContentTypeRejection(Set(ContentTypes.`application/json`)))
}

The reason the EntityStreamingSupport has to be enabled explicitly is that one might want to configure how the stream should be rendered; we'll discuss this in depth in the next section.

Customising response rendering mode

Since it is not always possible to directly and confidently answer the question of how a stream of T should look on the wire, the EntityStreamingSupport traits come into play and allow fine-tuning of the stream's rendered representation.

For example, in the case of JSON Streaming, there isn't really a single standard for rendering the response. Some APIs prefer to render multiple JSON objects in a line-by-line fashion (Twitter's streaming APIs, for example), while others simply return very large arrays, which can be streamed as well.

Akka HTTP defaults to the latter (streaming a JSON array), as it is valid JSON and clients not expecting a streaming API will still be able to consume it in a naive way if they want to.

The line-by-line approach, however, is also quite popular even though it is not valid JSON. Its relative simplicity for client-side parsing is a strong argument for picking this format for your streaming APIs. Below we demonstrate how to reconfigure the support trait to render the JSON in a line-by-line (newline-delimited) fashion:

import MyJsonProtocol._

// Configure the EntityStreamingSupport to render the elements as:
// {"example":42}
// {"example":43}
// ...
// {"example":1000}
val start = ByteString.empty
val sep = ByteString("\n")
val end = ByteString.empty

implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
  .withFramingRenderer(Flow[ByteString].intersperse(start, sep, end))

val route =
  path("tweets") {
    // [3] simply complete a request with a source of tweets:
    val tweets: Source[Tweet, NotUsed] = getTweets
    complete(tweets)
  }

// tests ------------------------------------------------------------
val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`))

Get("/tweets").withHeaders(AcceptJson) ~> route ~> check {
  responseAs[String] shouldEqual
    """{"uid":1,"txt":"#Akka rocks!"}""" + "\n" +
    """{"uid":2,"txt":"Streaming is so hot right now!"}""" + "\n" +
    """{"uid":3,"txt":"You cannot enter the same river twice."}"""
}

Another interesting feature is parallel marshalling. Since marshalling can potentially take a long time, it is possible to marshal multiple elements of the stream in parallel. This is simply a configuration option on EntityStreamingSupport and is enabled like this:

import MyJsonProtocol._
implicit val jsonStreamingSupport: JsonEntityStreamingSupport =
  EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = false)

path("tweets") {
  val tweets: Source[Tweet, NotUsed] = getTweets
  complete(tweets)
}

The mode shown above preserves the ordering of the Source's elements, which may sometimes be a required property, for example when streaming a strictly ordered dataset. Sometimes the concept of strict order does not apply to the data being streamed, though, in which case we can exploit this property and use an unordered rendering.

This is also a configuration option and is used as shown below. Effectively, it allows Akka's marshalling infrastructure to concurrently marshal up to parallelism elements and emit whichever is marshalled first onto the HttpResponse:

import MyJsonProtocol._
implicit val jsonStreamingSupport: JsonEntityStreamingSupport =
  EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = true)

path("tweets" / "unordered") {
  val tweets: Source[Tweet, NotUsed] = getTweets
  complete(tweets)
}

This allows us to _potentially_ render elements onto the HttpResponse faster, since it avoids "head-of-line blocking": the case where one element at the front of the stream takes a long time to marshal while the elements after it would be quick to marshal.

Consuming JSON Streaming uploads

Sometimes the client may be sending a streaming request, for example an embedded device that has initiated a connection with the server and is feeding it measurement data, one line at a time.

In this example, we want to consume this data in a streaming fashion from the request entity, and also apply back-pressure to the underlying TCP connection if the server cannot cope with the rate of incoming data (back-pressure is applied automatically thanks to using Akka HTTP/Streams).

// [1] import "my protocol", for unmarshalling Measurement objects:
import MyJsonProtocol._

// [2] enable Json Streaming
implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()

// prepare your persisting logic here
val persistMetrics = Flow[Measurement]

val route =
  path("metrics") {
    // [3] extract Source[Measurement, _]
    entity(asSourceOf[Measurement]) { measurements =>
      // alternative syntax:
      // entity(as[Source[Measurement, NotUsed]]) { measurements =>
      val measurementsSubmitted: Future[Int] =
        measurements
          .via(persistMetrics)
          .runFold(0) { (cnt, _) => cnt + 1 }

      complete {
        measurementsSubmitted.map(n => Map("msg" -> s"""Total metrics received: $n"""))
      }
    }
  }

// tests ------------------------------------------------------------
// uploading an array or newline-separated values works out of the box
val data = HttpEntity(
  ContentTypes.`application/json`,
  """
    |{"id":"temp","value":32}
    |{"id":"temp","value":31}
    |
  """.stripMargin)

Post("/metrics", entity = data) ~> route ~> check {
  status should ===(StatusCodes.OK)
  responseAs[String] should ===("""{"msg":"Total metrics received: 2"}""")
}

// the FramingWithContentType will reject any content type that it does not understand:
val xmlData = HttpEntity(
  ContentTypes.`text/xml(UTF-8)`,
  """|<data id="temp" value="32"/>
     |<data id="temp" value="31"/>""".stripMargin)

Post("/metrics", entity = xmlData) ~> route ~> check {
  handled should ===(false)
  rejection should ===(UnsupportedRequestContentTypeRejection(Set(ContentTypes.`application/json`)))
}

Simple CSV streaming example

Akka HTTP provides another EntityStreamingSupport out of the box, namely csv (comma-separated values). For completeness, we demonstrate its usage in the snippet below. As you'll notice, switching between streaming modes is fairly simple: one only has to make sure that an implicit Marshaller of the requested type is available and that the streaming support operates on the same Content-Type as the rendered values. Otherwise you'll see a runtime error stating that the marshaller did not expose the expected content type and that the streaming response therefore could not be rendered.

// [1] provide a marshaller to ByteString
implicit val tweetAsCsv = Marshaller.strict[Tweet, ByteString] { t =>
  Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () => {
    val txt = t.txt.replaceAll(",", ".")
    val uid = t.uid
    ByteString(List(uid, txt).mkString(","))
  })
}

// [2] enable csv streaming:
implicit val csvStreaming: CsvEntityStreamingSupport = EntityStreamingSupport.csv()

val route =
  path("tweets") {
    val tweets: Source[Tweet, NotUsed] = getTweets
    complete(tweets)
  }

// tests ------------------------------------------------------------
val AcceptCsv = Accept(MediaRange(MediaTypes.`text/csv`))

Get("/tweets").withHeaders(AcceptCsv) ~> route ~> check {
  responseAs[String] shouldEqual
    "1,#Akka rocks!" + "\n" +
    "2,Streaming is so hot right now!" + "\n" +
    "3,You cannot enter the same river twice."
}

Implementing custom EntityStreamingSupport traits

The EntityStreamingSupport infrastructure is open for extension and not bound to any single format, content type or marshalling library. The provided JSON support does not rely on spray-json directly, but uses Marshaller[T, ByteString] instances, which can be provided using any JSON marshalling library (such as Circe, Jawn or Play JSON).
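
For illustration, such a Marshaller[T, ByteString] can even be written by hand, in the same style as the CSV marshaller shown above. The sketch below renders a Tweet as a JSON object without any JSON library at all; a real implementation would delegate to a proper library to get escaping right:

// a hand-rolled Marshaller[Tweet, ByteString] sketch that does not use spray-json;
// note: t.txt is not escaped here, a real implementation should use a JSON library
implicit val tweetAsJsonBytes: Marshaller[Tweet, ByteString] =
  Marshaller.strict[Tweet, ByteString] { t =>
    Marshalling.WithFixedContentType(ContentTypes.`application/json`, () =>
      ByteString(s"""{"uid":${t.uid},"txt":"${t.txt}"}"""))
  }

// it plugs into the same EntityStreamingSupport.json() infrastructure as before
implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()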

When implementing a custom support trait, one should simply extend the EntityStreamingSupport abstract class and implement all of its methods. It's best to use the existing implementations as a guideline.
