Scala Advent Calendar JP 2010 Day 22: Scala Actor + NIO

この記事は Scala Advent Calendar JP 2010 22 日目(12/28)です。
Scala Actor + NIO で Echo server を書いてみました。
毎度の事ですが、突っ込み添削大歓迎です。

Source

解る方用の説明をさらっと行うと、Actor は Thread Pool として利用しており、NIO 単体で利用するより高速に動作します。当然、接続毎に Thread を消費するよりも高速です。コードは、Supervisor と FSM という考え方を使って整理しています。
詳細な説明は、コード全文の後に行います。
不要な var が二カ所ありましたので削除しました。*1

import scala.actors.{Actor, TIMEOUT, Exit}
import scala.actors.Actor.State.{New, Terminated}
import scala.util.logging.{Logged, ConsoleLogger}
import scala.util.control.Exception.allCatch
import scala.collection.JavaConversions._
import scala.util.Random

import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.nio.charset.Charset
import java.io.IOException

object EchoServer {
  def main (args:Array[String]) {
    val supervisor = new EchoServerSupervisor() with EchoServerConsoleLoggerFactory
    supervisor.start
    Thread.sleep(60000)
    supervisor.stop
  }
}

sealed abstract class SupervisorMessage
case class  Link(childActor: Actor) extends SupervisorMessage
case object Stop                    extends SupervisorMessage

class EchoServerSupervisor(
  port: Int = 10000
) extends Actor with EchoServerLoggerFactory {
  trapExit = true

  val logger   = makeLogger()
  val acceptor = new EchoServerAcceptor(this, logger, port)

  def act() {
    startChildren()
    loop {
      react {
        case Link(child: Actor) =>
          link(child)
        case Exit(child: Actor, 'normal) if child == acceptor =>
          exit("stop")
        case Exit(child: Actor, 'normal) =>
        case Exit(child: Actor, reason: Exception) =>
          logger.write("receive Exit: %s" format reason.getMessage)
          restartChild(child)
        case Exit(child: Actor, reason) =>
          logger.write("receive Exit: %s" format reason)
          restartChild(child)
        case Stop =>
          acceptor.stop()
        case unknown =>
          logger.write("unknown message [%s], ignoring" format unknown)
      }
    }
  }

  def startChildren() {
    Seq(logger, acceptor) foreach { child =>
      child.getState match {
        case New        => startChild(child)
        case Terminated => exit("Could not restart server.")
        case _          =>
      }
    }
  }

  def startChild(child: Actor) {
    link(child)
    child.start
  }

  def restartChild(child: Actor) {
    link(child)
    child.restart
  }

  def stop = this ! Stop
}

sealed abstract class LoggerMessage
case class Log(message: String) extends LoggerMessage

class EchoServerLogger extends Actor with Logged {
  def act = loop {
    react {
      case Log(message) => log(message)
      case unknown      => log("unknown message [%s], ignoring" format unknown)
    }
  }

  def write(message: String) {
    if (this.mailboxSize < 100) {
      this ! Log(message)
    }
  }
}

trait EchoServerLoggerFactory {
  def makeLogger(): EchoServerLogger =
    new EchoServerLogger()
}

trait EchoServerConsoleLoggerFactory extends EchoServerLoggerFactory {
  override def makeLogger(): EchoServerLogger =
    new EchoServerLogger() with ConsoleLogger
}

class EchoServerAcceptor(
  supervisor: EchoServerSupervisor,
  logger:     EchoServerLogger,
  port:       Int
) extends Actor {
  val selector = Selector.open()

  val serverChannel = {
    val channel = ServerSocketChannel.open()
    channel.configureBlocking(false)

    val socket = channel.socket
    socket.setReuseAddress(true)
    socket.bind(new InetSocketAddress(port))

    channel
  }

  val serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT)

  val random = Random

  logger.write("Start echo server. listen port is %d" format port)

  def act {
    while (true) {
      selector.select()
      handleKeys()
      receiveStop()
      if (random.nextInt(500000) == 0) {
        throw new Exception("acceptor exception test")
      }
    }
  }

  def receiveStop() {
    receiveWithin(0) {
      case Stop =>
        serverChannel.close()
        selector.close()
        logger.write("Stop echo server.")
        exit()
      case TIMEOUT =>
      case unknown =>
        logger.write("unknown message [%s], ignoring" format unknown)
    }
  }

  def stop() = {
    this ! Stop
    selector.wakeup
  }

  def handleKeys() {
    selector.selectedKeys foreach { key =>
      if (key.isValid) handleKey(key)
    }
    selector.selectedKeys.clear()
  }

  def handleKey(key: SelectionKey) {
    if (serverKey == key && key.isAcceptable) {
      accept()
    } else {
      val handler = key.attachment.asInstanceOf[EchoServerHandler]
      if (key.isReadable) {
        handler.sendMessage(Read)
      }
      if (key.isWritable) {
        handler.sendMessage(Write)
      }
    }
  }

  def accept() {
    serverChannel.accept() match {
      case channel: SocketChannel =>
        val remoteAddress = channel.socket.getRemoteSocketAddress.toString
        logger.write("connect from [%s]" format remoteAddress) 

        channel.configureBlocking(false)

        val handler = new EchoServerHandler(logger, channel);
        supervisor ! Link(handler)
        handler.start

        channel.register(
          selector,
          SelectionKey.OP_READ | SelectionKey.OP_WRITE,
          handler
        )
      case _ =>
    }
  }
}

sealed abstract class HandlerMessage
case object Read  extends HandlerMessage
case object Write extends HandlerMessage

class EchoServerHandler(
  logger:  EchoServerLogger,
  channel: SocketChannel
) extends Actor {
  type State = PartialFunction[Any, Unit]

  val buffer        = ByteBuffer.allocate(1024)
  val decoder       = Charset.forName("UTF-8")
  val remoteAddress = channel.socket.getRemoteSocketAddress.toString
  val MessageLine   = """^(.*)[\r\n]{0,2}$""".r

  def sendMessage(message: HandlerMessage) {
    if (this.mailboxSize < 5000) {
      this ! message
    }
  }

  def act = {
    buffer.clear()
    reactWithin(100)(ack)
  }

  def ack: State = {
    case Write   => react(doAck())
    case TIMEOUT => close()
  }

  def read: State = {
    case Read    => reactWithin(100)(doRead())
    case Write   => react(read)
    case TIMEOUT => react(read)
    case unknown =>
      logger.write("unknown message [%s], ignoring" format unknown) 
      react(read)
  }

  def write: State = {
    case Write   => react(doWrite())
    case TIMEOUT => react(read)
  }

  def doAck(): State = {
    "hello\r\n".map(_.hashCode.toByte).foreach(buffer.put)
    buffer.flip()
    writeBuffer()
  }

  def doRead(): State = {
    allCatch opt channel.read(buffer) match {
      case Some(0)  => read
      case Some(-1) => close()
      case Some(_)  =>
        buffer.flip()
        handleMessage()
      case None =>
        logger.write("read error")
        close()
    }
  }

  def close(): Nothing = {
    channel.close()
    logger.write("disconnect from [%s]" format remoteAddress)
    exit()
  }

  def handleMessage(): State = {
    val message = getMessage
    printLog("read", message)
    message match {
      case MessageLine("exit") => close()
      case MessageLine("test") => throw new Exception("handler exception test")
      case _                   => write
    }
  }

  def doWrite(): State = {
    printLog("write", getMessage)
    writeBuffer()
  }

  def getMessage: String = {
    val message = decoder.decode(buffer).toString
    buffer.flip()
    message
  }

  def printLog(state: String, message: String) {
    logger.write("%s-Actor[%s] %s %s: %s".format(
      Thread.currentThread,
      this,
      state,
      remoteAddress,
      message
    ))
  }

  def writeBuffer(): State = {
    allCatch opt channel.write(buffer) match {
      case Some(_) =>
        buffer.clear()
        read
      case None =>
        logger.write("write error")
        close()
    }
  }
}

Actor Design

Echo Server を作るにあたり、まず始めに、各 Actor の役割を考え、Supervisor Tree の構成を考えます。
舞台(VM)に様々な役割を持った俳優(Worker Actor)が多数登場する予定であるため、監督(Supervisor Actor)の指示の元に役割をこなしてもらいます。

今回は、下記の Actor を用意しました。

クラス名 概要
EchoServerSupervisor 監督。生殺与奪の権限を持つ
EchoServerLogger ログを残す
EchoServerAcceptor Client からの接続を Accept する
EchoServerHandler Client から送られてきた文字列をエコーする

Supervisor Tree の構成は下記となります。

                          +----------------------+
                          | EchoServerSupervisor |
                          +-----------+----------+
                                      |
            +-------------------------+------------------------+
            |                         |                        | 
+-----------+----------+  +-----------+----------+    +--------|-------------+
|   EchoServerLogger   |  |  EchoServerAcceptor  |   +---------|------------+|
+----------------------+  +----------------------+  +----------+-----------+|+ 
                                                    |  EchoServerHandler   |+
                                                    +----------------------+

各 Actor を繋ぐ線は、リンク関係を表しています。リンクについては後述します。

本来であれば、Worker の new は Supervisor の仕事ですが、今回は手抜きで EchoServerAcceptor に EchoServerHandler の new を任せています。本当は駄目ですw;*2
更に言うなら、EchoServerSupervisor と EchoServerHandler の間に助監督(別の Supervisor)を配置する方が良いのですが、それも手抜きで省略してます。

では、もう少し詳細に実際のコードを交えつつ、各 Actor の役割を説明していきます。

EchoServerSupervisor

start メソッドで全ての Actor が起動し、stop メソッドで全ての Actor が停止します。
また、各 Actor が例外などで停止した場合、停止理由を参考に、再起動、停止、無視などの処理を行います。
Supervisor は全体の監督者であるため、上記以外の仕事は一切行いません。これは、余計な仕事を行う事で不慮の事故で停止するなどという事態を避けるための措置でもあります。(監視者と監視対象者が同一人物だと、ろくな事がありませんw;)

さて、上記を実現する場合、幾つか Scala Actor の基本的な特性を知っておく必要があります。まずは、コンストラクタから見て行きましょう。コードは下記の通りです。

  trapExit = true

  val logger   = makeLogger()
  val acceptor = new EchoServerAcceptor(this, logger, port)

「trapExit = true」とする事で、リンクしている Actor が停止した際に scala.actors.Exit というメッセージを受信するようになります。
ここでリンクについて簡単に説明しておきます。
Actor を継承すると link というメソッドが使えます。この link の第一引数にリンクしたい Actor を渡すと、相互リンク状態になります。相互リンクしている Actor の内、片方が停止すると、もう片方も停止します。
例えば「A <-> B <-> C」の状態で、B が停止すると A, C 共に停止します。C が停止すると、まず B が停止し、引きずられて A も停止します。しかし、A で「trapExit = true」とした場合、B が停止すると C は停止しますが、A は scala.actors.Exit を受信するだけとなります。
ただし、一つだけ例外があり「trapExit = false」即ちデフォルト状態で、リンク先の Actor が引数無しの exit() で停止した場合は、引きずられて停止する事はありません。

makeLogger() については、EchoServerLogger の箇所で説明します。

次に start すると評価される act を見てみましょう。

  def act() {
    startChildren()
    loop {
      // ..snip..
    }
  }

loop に入る前に startChildren を評価しています。startChildren は、下記の通りです。

  def startChildren() {
    Seq(logger, acceptor) foreach { child =>
      child.getState match {
        case New        => startChild(child)
        case Terminated => exit("Could not restart server.")
        case _          =>
      }
    }
  }

コンストラクタで予め new しておいた logger と acceptor という actor を link して start しています。(実際の link, start は startChild 内で行っています)
Actor は、getChildgetState*3 を評価すると下記のような状態を返します。

New new した直後。まだ start されてない
Runnable 何らかの処理を実行中
Suspended react() 中
TimedSuspended reactWithin() 中
Blocked receive() 中
TimedBlocked receiveWithin() 中
Terminated 終了している

上記は全て scala.actors.Actor.State で定義されています。
今回の場合、New であれば link, start しますが、一つでも Terminated であれば全体を停止するので、start -> stop -> start は許可していないという事になります。*4

次に loop {...} の中を見てみましょう。

    loop {
      react {
        case Link(child: Actor) =>
          link(child)
        case Exit(child: Actor, 'normal) if child == acceptor =>
          exit("stop")
        case Exit(child: Actor, 'normal) =>
        case Exit(child: Actor, reason: Exception) =>
          logger.write("receive Exit: %s" format reason.getMessage)
          restartChild(child)
        case Exit(child: Actor, reason) =>
          logger.write("receive Exit: %s" format reason)
          restartChild(child)
        case Stop =>
          acceptor.stop()
        case unknown =>
          logger.write("unknown message [%s], ignoring" format unknown)
      }
    }

case を上から順番に説明すると、下記の通りです。

  • 受け取った Actor をリンクする
  • acceptor が通常終了(exit() した)ならば、全体を停止
  • acceptor 以外が通常終了ならば無視
  • リンク先の Actor が終了したならば再起動
  • リンク先の Actor が終了したならば再起動(理由が Exception ではない)
  • acceptor を停止
  • 想定外のメッセージが来たのでログに書き出し

Actor の中で exit() を評価すると、Actor の停止と共にリンク先へ 'normal という停止理由を伝播します。'normal 以外を伝播したい場合は、exit(reason) を評価します。reason の型は何でも構いません。Actor の中で例外が発生すると、reason には例外のオブジェクトが入ります。

最後の case で全てのメッセージを受信していますが、これは、メッセージが溜まらないようにするための措置です。現在の作りでは、Link, Exit, Stop 以外のメッセージが届く事はありませんが、念のためメッセージを消化する事でメッセージが溜まらない事を保証しています(メッセージが溜まりすぎると、Actor のメッセージ受信処理が重くなります)。

logger.write は、EchoServerLogger の箇所で、acceptor.stop の箇所は、EchoserverAcceptor の箇所でそれぞれ説明します。

Supervisor という概念は一般化できるため、普通はライブラリ化します。実際、Erlang, Akka 共にライブラリ化されています。

EchoServerLogger

ログの出力だけを行います。複数の Thread から同時に書き込みが行われてもログが壊れないように Actor 化しています。このような、全体で一つだけのリソースに対して副作用を伴う操作を行う場合、Actor に包んでしまう事で Thread-safe にできます。

ちなみに、ログを残す write のコードは下記の通りです。

  def write(message: String) {
    if (this.mailboxSize < 100) {
      this ! Log(message)
    }
  }

this.mailboxSize で、現在、この Actor に溜まっているメッセージの数を取得できます。今回は、100 以上であればメッセージを送らないので、あまりにも大量にログが出力される状況であれば、ログを取りこぼす事となります。実際には、ログの重要度や負荷の具合などを鑑みて値を変更するなり、制限を外すなりする必要があります。(他にも方法はありますが、それはまた別の機会に…)

以下、Actor とは関係ない内容であるため、EchoServerAcceptor の説明まで読み飛ばして頂いて構いません。

今回は、scala.util.logging.Logged トレイトを継承しているので、new する際に ConsoleLogger などを with するとコンソールにログが出力されます。log メソッドを持つトレイトを自作する事で、ファイルに出力するなどのカスタマイズが可能となります。しかし、EchoServerLogger は、利用者が直接 new する事を想定しておらず、Supervisor を new した際に何らかのトレイトを with する事で Logger の動作を切り替えられるようにしています。
具体的には、下記のようにするとログがコンソールに出力され…

    val supervisor = new EchoServerSupervisor() with EchoServerConsoleLoggerFactory

下記のようにするとログは出力されません。

    val supervisor = new EchoServerSupervisor()

EchoServerLoggerFactory と、その子である EchoServerConsoleLoggerFactory トレイトは、extends したクラスに makeLogger という EchoServerLogger のインスタンスを返すメソッドを提供します。

EchoServerSupervisor が EchoServerLogger を直接 new していない理由は、上記を実現するためです。

EchoServerAcceptor

ポートの Listen を行い、クライアントの接続を監視し、接続があれば EchoServerHandler に処理を振ります。この Actor が今回のコードの要となります。

まずは、コンストラクタから見て行きましょう。

  val selector = Selector.open()

  val serverChannel = {
    val channel = ServerSocketChannel.open()
    channel.configureBlocking(false)

    val socket = channel.socket
    socket.setReuseAddress(true)
    socket.bind(new InetSocketAddress(port))

    channel
  }

  val serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT)

  val random = Random

  logger.write("Start echo server. listen port is %d" format port)

NIO は、Selector で Channel を監視します。上記は、Listen 用の Channel を開いた後にノンブロックモードに設定し、OP_ACCEPT だけを監視対象として設定しています。詳しくは、NIO のマニュアルなどをご覧ください。
random は、デバック用です。説明は後述します。

次に、act を見てみましょう。

  def act {
    while (true) {
      selector.select()
      handleKeys()
      receiveStop()
      if (random.nextInt(500000) == 0) {
        throw new Exception("acceptor exception test")
      }
    }
  }

ここで「おや?」と思った方は、Scala Actor 中級者です。実はこれ、Actor と Thread が一対一となっており、一つの Thread を占有しています。特に理由が無い限り react で Thread を手放すのが普通ですが、今回は理由があって手放してしません。これは、selector.select() で必ず Thread がブロックされてしまうので、react で手放す意味が無いからです。しかも、実際に動作させると解るのですが、while (true) {...} の箇所は、凄い勢いでループしまくるので、もしここをループを排除して react にしてしまうと、大量のメッセージが溜まる事になり、無駄な負荷が発生してしまいます。

ちなみに、ここで random が登場しますが、これは 50 万回に一回だけ例外を発生させるというテストコードです。例外が発生する事で、この Actor は終了してしまいますが、Supervisor により再起動されます。

では、handleKeys の前に簡単な receiveStop を見てみましょう。

  def receiveStop() {
    receiveWithin(0) {
      case Stop =>
        serverChannel.close()
        selector.close()
        logger.write("Stop echo server.")
        exit()
      case TIMEOUT =>
      case unknown =>
        logger.write("unknown message [%s], ignoring" format unknown)
    }
  }

ご覧の通り、Stop を待つだけなのですが receiveWithin を使っています。待ち時間に 0 を指定する事で、メッセージが無ければ即 TIMEOUT が発生するので、Thread をブロックせずにメッセージを消化できます。また、そもそも while の中であるため react にする必要がありません。

次に、先ほどは飛ばしてしまった handleKeys を見てみましょう。

  def handleKeys() {
    selector.selectedKeys foreach { key =>
      if (key.isValid) handleKey(key)
    }
    selector.selectedKeys.clear()
  }

selectedKeys の戻り値は java.util.Set ですが、scala.collection.JavaConversions._ を import しているので foreach を利用できます。
NIO の仕様上、selectedKeys は、処理後に利用者がキーを取り除く事になっているので、clear で一括削除しています。selectedKeys は、Thread-safe ではないので、必ず一つの Thread からのみ操作するようにして下さい。ちなみに、foreach {...} の中で selector.selectedKeys.remove(key) でも良いと思います。

次に handleKey を見てみましょう。

  def handleKey(key: SelectionKey) {
    if (serverKey == key && key.isAcceptable) {
      accept()
    } else {
      // ..snip..
    }
  }

key が Accept 可能なら accept を評価します。else {...} を理解するのは、accept の理解が必要であるため、続けて accept を見てみましょう。

  def accept() {
    serverChannel.accept() match {
      case channel: SocketChannel =>
        val remoteAddress = channel.socket.getRemoteSocketAddress.toString
        logger.write("connect from [%s]" format remoteAddress) 

        channel.configureBlocking(false)

        val handler = new EchoServerHandler(logger, channel);
        supervisor ! Link(handler)
        handler.start

        channel.register(
          selector,
          SelectionKey.OP_READ | SelectionKey.OP_WRITE,
          handler
        )
      case _ =>
    }
  }

accept すると SoketChannel 型の channel が取得できるので、それをノンブロックモードに設定し、OP_READ と OP_WRITE を監視対象に設定しています。もう少し詳細に見て行きましょう。

        val handler = new EchoServerHandler(logger, channel);
        supervisor ! Link(handler)
        handler.start

まず、上記ですが、実際に接続してきたクライアントとのやりとりを担当する EchoServerHandler を new し、Supervisor とリンクを行い、起動しています。

        channel.register(
          selector,
          SelectionKey.OP_READ | SelectionKey.OP_WRITE,
          handler
        )

次に、上記ですが、Selector に監視対象として登録する際、任意のオブジェクトを付属できるので、Actor を付属しています。

という事で、もう一度 handleKey を見てみましょう。今度は省略せずに全文です。

  def handleKey(key: SelectionKey) {
    if (serverKey == key && key.isAcceptable) {
      accept()
    } else {
      val handler = key.attachment.asInstanceOf[EchoServerHandler]
      if (key.isReadable) {
        handler.sendMessage(Read)
      }
      if (key.isWritable) {
        handler.sendMessage(Write)
      }
    }
  }

key から、先ほど付属した Actor を attachment で取得しています(asInstanceOf が格好悪い)。その取得した handler に対して、key が読み込み可能であれば Read を、書き込み可能であれば Write を送っています。

EchoServerHandler の sendMessage を先に見てみましょう。

  def sendMessage(message: HandlerMessage) {
    if (this.mailboxSize < 5000) {
      this ! message
    }
  }

先ほどの Logger と同じで、メッセージ 5000 を超えると送信されません。先ほどと同じように取りこぼしが発生するので、用件に合わせて値を変更するなり、制限を外すなりします。

EchoServerAcceptor の説明の最後に、終了の処理順について説明します。
まずは、EchoServerSupervisor の該当するコードを見てみます。

class EchoServerSupervisor(
  port: Int = 10000
) extends Actor with EchoServerLoggerFactory {

  //..snip..

  def act() {
    //..snip..
    loop {
      react {
        //..snip..

        // (3)
        case Exit(child: Actor, 'normal) if child == acceptor =>
          exit("stop")

        //..snip..

        // (2)
        case Stop =>
          acceptor.stop()

        //..snip..
      }
    }
  }

  //..snip..

  // (1)
  def stop = this ! Stop
}

main で supervisor.stop を評価すると、(1),(2),(3) の順番に評価されます。いきなり (3) の exit("stop") を評価する事で、リンクしている Actor を全て強制停止してしまうと Selector や ServerChannel が開いたままになってしまいます。そこで、一旦 (2) で EchoServerAcceptor に Selector や ServerChannel の close や Actor の exit(exit しないと (3) に処理が来ない) などの後始末をさせてから (3) で EchoServerAcceptor 以外のリンク中の Actor を停止しています。

では、本題の EchoServerAcceptor の stop を見てみましょう。

class EchoServerAcceptor(
  supervisor: EchoServerSupervisor,
  logger:     EchoServerLogger,
  port:       Int
) extends Actor {
  //..snip..
  def act {
    while (true) {
      selector.select()
      //..snip..
      receiveStop()
      //..snip..
    }
  }

  def receiveStop() {
    receiveWithin(0) {
      case Stop =>
        serverChannel.close()
        selector.close()
        logger.write("Stop echo server.")
        exit()
      case TIMEOUT =>
      case unknown =>
        logger.write("unknown message [%s], ignoring" format unknown)
    }
  }

  def stop() = {
    this ! Stop
    selector.wakeup
  }
} 

receiveStop の説明は、前述したので省略します。
stop 内で EchoServerAcceptor に Stop を送っていますが、act の while {...} 内で selector.select() の評価中である場合、Thread はブロックされており、Stop はメールボックスに溜まるだけです。そこで selector.wakeup を EchoServerAcceptor とは「異なる Thread」で評価する事により、強制的に selector.select() を終了させています。そうする事で、case Stop の処理が評価され、無事 close, exit が行われる事となります。

ここは、素直に trapExit = true で終了時に close する事も考えられますが、Erlanger 的に trapExit = true は、システムプロセスに昇格するという事であり、特別な意味があるので避けています。Scala 的には、ただのフラグなので、どうでも良い気もしますが…。

EchoServerHandle

いよいよ最後の Actor です。この Actor は、実際のクライアントとのやりとりを行います。
ここで java.nio.ByteBuffer が出てくるのですが、説明が長くなってしまうので今回は説明しません。ご興味がある方は、各自調べてみて下さい。

この Actor は、状態を持つので、先に状態遷移図を見てみましょう。

+-------------------------------------------------+
|                     ack                         |
+-------------------------------------------------+
      ↓(write an ack message within 100ms)
+-------------------------------------------------+
|                     read                        |
+-------------------------------------------------+
 ↓(read a message) ↑(write a message or timeout)
+-------------------------------------------------+
|                     write                       |
+-------------------------------------------------+

初めは ack 状態から始まり、ack メッセージをクライアントへ送る事で read 状態へ移行します。以降は、read 状態と write 状態を行ったり来たりします。
上記を実現するために、有限状態機械(Finite State Machine)(略して FSM)を使うと分岐が減ってコードがすっきりします。

では、FSM に関連する箇所を見てみましょう。

class EchoServerHandler(
  logger:  EchoServerLogger,
  channel: SocketChannel
) extends Actor {
  type State = PartialFunction[Any, Unit]

  // ..snip..

  def act = {
    // ..snip..
    reactWithin(100)(ack)
  }

  def ack: State = {
    case Write   => react(doAck())
    case TIMEOUT => close()
  }

  def read: State = {
    case Read    => reactWithin(100)(doRead())
    case Write   => react(read)
    case TIMEOUT => react(read)
    case unknown =>
      logger.write("unknown message [%s], ignoring" format unknown) 
      react(read)
  }

  def write: State = {
    case Write   => react(doWrite())
    case TIMEOUT => react(read)
  }

  // ..snip..
}

react は、部分関数を引数にとるので、それを状態としています。ack, read, write がメッセージを処理する部分関数であり、状態でもあります。状態を受け渡す際、毎回 PartialFunction[Any, Unit] と記述するのは嫌なので State という別名を Type alias で与えてます。

以下は、各状態毎の概要です。

状態 概要
ack act で設定される一番始めの状態です。Write を受信すると doAck を評価します。doAck は read を返します。100ms 以内に他の状態に遷移しない場合は close します。
read Read を受信すると doRead を評価します。doRead は read か write を返します。それ以外の場合は、read 状態を保ち続けます。
write doWrite は read を返すので、Write を受信するか、100ms 経過すると read 状態に遷移します。

ack, write 状態の場合、Read を受信しても無視するので、read 状態へ移行するまでメッセージが溜り続ける事になります。
ちなみに、いろいろな所にタイムアウト処理が入っているのは、TCP Half-Closed 対策です。

それでは各処理の詳細を見て行きましょう。まずは、close からです。

  def close(): Nothing = {
    channel.close()
    logger.write("disconnect from [%s]" format remoteAddress)
    exit()
  }

Actor の exit は、Nothing を返します。Nothing は全ての型のサブタイプであるため State も含まれます。

次に doAck と doWrite を見てみましょう。

  def doAck(): State = {
    "hello\r\n".map(_.hashCode.toByte).foreach(buffer.put)
    buffer.flip()
    writeBuffer()
  }

  // ..snip..

  def doWrite(): State = {
    var message = getMessage
    printLog("write", message)
    writeBuffer()
  }

  // ..snip..

  def writeBuffer(): State = {
    allCatch opt channel.write(buffer) match {
      case Some(_) =>
        buffer.clear()
        read
      case None =>
        logger.write("write error")
        close()
    }
  }

doAck と doWrite は、ほぼ同じ内容であるため説明を省略し、doAck と doWrite の内部で使用されている writeBuffer について説明します。
channel.write は、よく例外が発生するので、例外を補足しなければ Actor が停止してしまい、Supervisor によって再起動されてしまいます。ちなみに EchoServerHandler は再起動すると、ack から状態から始まります。そこで、例外を補足するために scala.util.control.Exception.allCatch を使用します。allCatch は、結果を Some に包んでくれ、例外が発生すると None を返します。よって上記は、channel.write が正常に行われるならば read 状態へ移行し、例外が発生した場合は close する処理となります。

では、次に doRead を見てみましょう。

  def doRead(): State = {
    allCatch opt channel.read(buffer) match {
      case Some(0)  => read
      case Some(-1) => close()
      case Some(_)  =>
        buffer.flip()
        handleMessage()
      case None =>
        logger.write("read error")
        close()
    }
  }

特に説明する箇所はありませんが、一点だけ補足しておきます。TCP Half-Closed になると、key が isReadable や isWritableであるにも関わらず channel.read や channel.write で例外が発生するので、例外を補足して close しています。

最後に handleMessage を見てみましょう。

    var message = getMessage
    printLog("read", message)
    message match {
      case MessageLine("exit") => close()
      case MessageLine("test") => throw new Exception("handler exception test")
      case _                   => write
    }

MessageLine には、scala.util.matching.Regexインスタンスが入っており、Extractor の代わりに利用しています。クライアントから "exit" を受信するとクライアントとの接続を切断し、"test" を受信すると例外が発生し、それ以外であれば、write 状態に移行します。

FSM も Supervisor 同様に一般化できるので、Erlang, Akka 共にライブラリ化されています。

Echo Client in Erlang

おまけすが、上記の Echo Server を攻撃する Script を Erlang で書いたので、公開しておきます。*5
Scala で書いた Server に Scala で攻撃しても、Scala の有用性を証明できないのではないかと思い、あえて Erlang で書いてます。
まだ作りかけであるため、経過時間を計ったりはできません。

#!/usr/bin/env escript

% Maybe you need run the ulimit command.
%  e.g. ulimit -n 10000

-define(HOST, "localhost").
-define(PORT, 10000).

main([_ProcessCount, _RequestCount]=Args) ->
  do(lists:map(fun (Arg) -> string:to_integer(Arg) end, Args));
main(_) ->
  usage().

usage() ->
  io:fwrite("Usage: echo_client.erl [process count] [request count]~n"),
  halt(1).

do([
  {ProcessCount, _},
  {RequestCount, _}
])   when is_integer(ProcessCount) andalso 0 < ProcessCount
  andalso is_integer(RequestCount) andalso 0 < RequestCount
->
  io:format(
    "process count:~B, request count:~B~n",
    [ProcessCount, RequestCount]
  ),
  run_test(ProcessCount, RequestCount),
  io:format("end~n");
do(_) ->
  usage().

run_test(ProcessCount, RequestCount) ->
  process_flag(trap_exit, true),
  Parent = self(),
  lists:foreach(fun (_Pid) ->
    receive
      finish -> ok;
      Error  -> io:fwrite("~p~n", [Error]), ok
    end
  end,
  lists:map(fun (_Count) ->
    spawn_link(fun () -> echo(RequestCount, Parent) end)
  end,
  lists:seq(1, ProcessCount))).

echo(RequestCount, Parent) ->
  {ok, Socket} = gen_tcp:connect(?HOST, ?PORT, [{active, false}, binary, {packet, line}, {reuseaddr, true}]),
  {ok, <<"hello\r\n">>} = gen_tcp:recv(Socket, 0, 10000),

  lists:foreach(fun (_Count) ->
    ok = gen_tcp:send(Socket, <<"foo\r\n">>),
    {ok, <<"foo\r\n">>} = gen_tcp:recv(Socket, 0, 500)
  end, lists:seq(1, RequestCount)),

  ok = gen_tcp:close(Socket),

  Parent ! finish.

まとめ

来年(2011年)中には、これScala 版を作りたい所です。
ToDo としては、下記が考えられます。

  • ボトルネックとなる箇所が二カ所あるので修正する
    • 暗黙の型変換は不要
    • Handler にメッセージ送り過ぎ
    • 常に OP_WRITE を監視しており、無駄な WRITE メッセージを送っているので修正する*6
  • もう少し 非同期サーバ構築フレームワークとして汎用的に再設計する
    • Supervisor が起動する Actors は、DSL で指定できるようにする
    • Supervisor を simple_one_for_one に対応しつつ、Supervisor Tree の組み直し
    • Handler 制作者に read/write/close を直接させない
    • 限定継続と組み合わせて read/write をまとめる

遊びなので、あまり本気で取り組もうと考えてませんし、仕事等では Netty を素直に使うと思います。*7
え?これ 5 分で書いたのかって?んなわけないでしょ!何日も掛かりましたよ orz

私は、Actor, Thread, Async I/O, CPS, STM は、共存させた時に力を発揮するものであり、対立軸で考えると損をすると考えています。また、私は職業 Web エンジニアですが Actor(Erlang), Thread(ObjC), Async I/O(Erlang, Perl), CPS(Perl, JavaScript) を使って仕事をこなしており、これらは、決してアカデミックなものではなく、実際に使えるものだと考えています。Scala は単体で、これら全てを網羅しており、学ぶにしても使うにしても非常にお得な言語だと考えています。

*1:@xuwei_k さん、ご指摘ありがとうございます

*2:Erlanger の皆様にお叱りを頂きそう

*3:@kaigaiengineer さん、ご指摘ありがとうございます

*4:深い意味はありません。再起動をサポートするのもアリだと思います。

*5:Tsung Plugin は挫折したので、また今度

*6:@ScalaTohoku さん、ご指摘ありがとうございます

*7:誰か、一緒にやりませんか?