高レベルサーバーサイドAPI
Akka HTTPは 低レベルサーバーサイドAPI に加えて、RESTfulウェブサービスをエレガントに定義する為の、とても柔軟な"ルーティングDSL"を提供しています。これは低レベルAPIが残してきた、URIの分解、コンテンツネゴシエーション及び静的ファイルの供給など、典型的なWebサーバ及びフレームワークが提供する高レベルな機能を提供しています。
注釈
リクエスト/レスポンスエンティティのストリーミングにおける性質 の節を読むことを推奨します。この節では、ストリーミングファーストではないHTTPサーバーから移行している人に向けたフルスタックのストリーミングコンセプトに則った説明をしています。
最小の例
これは完全な、そしてとても基本的なルーティングDSLによるAkka HTTPアプリケーションです。
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.io.StdIn
object WebServer {
def main(args: Array[String]) {
implicit val system = ActorSystem("my-system")
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher
val route =
path("hello") {
get {
complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
}
}
val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine() // let it run until user presses return
bindingFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when done
}
}
HTTPサーバーをlocalhostで起動し、 /hello
に対するGETリクエストに簡単なレスポンスを返しています。
より長い例
以下のAkka HTTPのルートは幾つかの機能を見せるために定義したものです。結果としてサービスは実際には何の役にも立ちませんが、ルーティングDSLでの実際のAPI定義がどのようなものになるかを感じさせます:
import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.coding.Deflate
import akka.http.scaladsl.marshalling.ToResponseMarshaller
import akka.http.scaladsl.model.StatusCodes.MovedPermanently
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.unmarshalling.FromRequestUnmarshaller
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.util.Timeout
// types used by the API routes
type Money = Double // only for demo purposes, don't try this at home!
type TransactionResult = String
case class User(name: String)
case class Order(email: String, amount: Money)
case class Update(order: Order)
case class OrderItem(i: Int, os: Option[String], s: String)
// marshalling would usually be derived automatically using libraries
implicit val orderUM: FromRequestUnmarshaller[Order] = ???
implicit val orderM: ToResponseMarshaller[Order] = ???
implicit val orderSeqM: ToResponseMarshaller[Seq[Order]] = ???
implicit val timeout: Timeout = ??? // for actor asks
implicit val ec: ExecutionContext = ???
implicit val mat: ActorMaterializer = ???
implicit val sys: ActorSystem = ???
// backend entry points
def myAuthenticator: Authenticator[User] = ???
def retrieveOrdersFromDB: Seq[Order] = ???
def myDbActor: ActorRef = ???
def processOrderRequest(id: Int, complete: Order => Unit): Unit = ???
val route = {
path("orders") {
authenticateBasic(realm = "admin area", myAuthenticator) { user =>
get {
encodeResponseWith(Deflate) {
complete {
// marshal custom object with in-scope marshaller
retrieveOrdersFromDB
}
}
} ~
post {
// decompress gzipped or deflated requests if required
decodeRequest {
// unmarshal with in-scope unmarshaller
entity(as[Order]) { order =>
complete {
// ... write order to DB
"Order received"
}
}
}
}
}
} ~
// extract URI path element as Int
pathPrefix("order" / IntNumber) { orderId =>
pathEnd {
(put | parameter('method ! "put")) {
// form extraction from multipart or www-url-encoded forms
formFields(('email, 'total.as[Money])).as(Order) { order =>
complete {
// complete with serialized Future result
(myDbActor ? Update(order)).mapTo[TransactionResult]
}
}
} ~
get {
// debugging helper
logRequest("GET-ORDER") {
// use in-scope marshaller to create completer function
completeWith(instanceOf[Order]) { completer =>
// custom
processOrderRequest(orderId, completer)
}
}
}
} ~
path("items") {
get {
// parameters to case class extraction
parameters(('size.as[Int], 'color ?, 'dangerous ? "no"))
.as(OrderItem) { orderItem =>
// ... route using case class instance created from
// required and optional query parameters
}
}
}
} ~
pathPrefix("documentation") {
// optionally compresses the response with Gzip or Deflate
// if the client accepts compressed responses
encodeResponse {
// serve up static content from a JAR resource
getFromResourceDirectory("docs")
}
} ~
path("oldApi" / Remaining) { pathRest =>
redirect("http://oldapi.example.com/" + pathRest, MovedPermanently)
}
}
tream random numbers" in compileOnlySpec {
高レベルAPIでHTTPサーバーの失敗を処理する
Akka HTTPサーバの初期化もしくは実行中に、障害が起こる状況はいくつもあります。Akkaの既定の動作では、全ての障害のログを記録しますが、ログの記録に加えて、例えば、アクターシステムを停止したり、外部のモニタリング装置に明示的に通知したりなど、応答したい時があるかもしれません。
バインドの失敗
例えば、サーバーが与えられたポートをバインドする事が出来なかった場合があります。例えば、ポートが既に他のアプリケーションに使用されていた場合や、ポートに特権がある場合(例えば、 root
のみ使用可能な場合)があります。これらの場合、"binding future" はすぐに失敗し、Futureの完了をリスニングしていれば、対応する事が出来ます:
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.concurrent.Future
object WebServer {
def main(args: Array[String]) {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// needed for the future onFailure in the end
implicit val executionContext = system.dispatcher
val handler = get {
complete("Hello world!")
}
// let's say the OS won't allow us to bind to 80.
val (host, port) = ("localhost", 80)
val bindingFuture: Future[ServerBinding] =
Http().bindAndHandle(handler, host, port)
bindingFuture.onFailure {
case ex: Exception =>
log.error(ex, "Failed to bind to {}:{}!", host, port)
}
}
}
注釈
発生する可能性のある失敗の種類についての低いレベルにおける概要と、それらの詳細な制御については 低レベルAPIでHTTPサーバの障害を処理する を参照して下さい。
ルーティングDSL内部における失敗と例外
ルート内部における例外の処理は、 Exception Handling の節で詳細にドキュメント化されている ExceptionHandler
を供給することによって実行されます。例外を適切なのエラーコードと、人が理解できる失敗の説明によって、 HttpResponse
に変換する事が出来ます。
ファイルアップロード
高レベルディレクティブ向けのアップロード処理は FileUploadDirectives を参照して下さい。
例えば、簡単な file の入力を持つファイルアップロード処理は、 Multipart.FormData エンティティを受信することで実行する事が出来ます。ボディ部は、 全てすぐに利用できるものというよりは寧ろ Source であり、ボディ部の個別のペイロードは、ファイルとフォームフィールドの両方のストリームを消費する必要があることに注意して下さい。
ここに、アップロードされたファイルを一時ファイルとしてディスクにダンプし、フィールドを収集して、架空のデータベースにエントリを保存する簡単な例を示します:
val uploadVideo =
path("video") {
entity(as[Multipart.FormData]) { formData =>
// collect all parts of the multipart as it arrives into a map
val allPartsF: Future[Map[String, Any]] = formData.parts.mapAsync[(String, Any)](1) {
case b: BodyPart if b.name == "file" =>
// stream into a file as the chunks of it arrives and return a future
// file to where it got stored
val file = File.createTempFile("upload", "tmp")
b.entity.dataBytes.runWith(FileIO.toPath(file.toPath)).map(_ =>
(b.name -> file))
case b: BodyPart =>
// collect form field values
b.toStrict(2.seconds).map(strict =>
(b.name -> strict.entity.data.utf8String))
}.runFold(Map.empty[String, Any])((map, tuple) => map + tuple)
val done = allPartsF.map { allParts =>
// You would have some better validation/unmarshalling here
db.create(Video(
file = allParts("file").asInstanceOf[File],
title = allParts("title").asInstanceOf[String],
author = allParts("author").asInstanceOf[String]))
}
// when processing have finished create a response for the user
onSuccess(allPartsF) { allParts =>
complete {
"ok!"
}
}
}
}
アップロードされたファイルは、前の例のように一時ファイルに保存するのではなく、着信時に変換することができます。 この例では、任意の数の .csv
ファイルを受け入れ、それらを行に解析し、さらに処理するためにアクターに送る前に各行を分割します:
val splitLines = Framing.delimiter(ByteString("\n"), 256)
val csvUploads =
path("metadata" / LongNumber) { id =>
entity(as[Multipart.FormData]) { formData =>
val done: Future[Done] = formData.parts.mapAsync(1) {
case b: BodyPart if b.filename.exists(_.endsWith(".csv")) =>
b.entity.dataBytes
.via(splitLines)
.map(_.utf8String.split(",").toVector)
.runForeach(csv =>
metadataActor ! MetadataActor.Entry(id, csv))
case _ => Future.successful(Done)
}.runWith(Sink.ignore)
// when processing have finished create a response for the user
onSuccess(done) { _ =>
complete {
"ok!"
}
}
}
}
サーバー側のHTTPSを設定する
サーバーサイドでHTTPSを設定し使用する事に関する詳細なドキュメントは、 Server-Side HTTPS Support を参照して下さい。
Contents