doilux’s tech blog

ITに関する備忘録。 DDP : http://doiluxng.hatenablog.com/entry/2018/01/01/195409

Actorおさらい

akka-streamを検討しているので今一度Actorについておさらい。

サンプル

import akka.actor.{Actor, ActorSystem, Props}

object Main extends App {

  val system = ActorSystem("sample")
  val actor = system.actorOf(Props[SampleActor])
  Range(1, 10).foreach(actor ! _)
}


class SampleActor extends Actor {

  override def preStart(): Unit = {
    super.preStart()
    println("start : " + this)
  }

  override def postStop(): Unit = {
    println("stop : " + this)
    super.postStop()
  }

  override def postRestart(reason: Throwable): Unit = {
    super.postRestart(reason)
    println("restart : " + this)
  }

  def receive = {
    case x => println(x)
  }
}

Acotorにメッセージが送られるので、以下のような結果になる。

start : SampleActor@5f1fb65d
1
2
3
4
5
6
7
8
9

3のとき自爆させてみる

  def receive = {
    case 3 => throw new RuntimeException("invalid value")
    case x => println(x)
  }

再起動されている。

start : SampleActor@24764cc2
1
2
stop : SampleActor@24764cc2
start : SampleActor@d5ee644
restart : SampleActor@d5ee644
4
5
6
7
8
9

デフォルトのスーパーバイザーのストラテジーが例外でリスタートするようになっている。

  final val defaultDecider: Decider = {
    case _: ActorInitializationException ⇒ Stop
    case _: ActorKilledException         ⇒ Stop
    case _: DeathPactException           ⇒ Stop
    case _: Exception                    ⇒ Restart
  }

デフォルトをいじるのがややめんどくさそなのでSupervisorを使う。

class SampleSupervisor extends Actor {

  val actor = context.actorOf(Props[SampleActor])

  override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy

  override def preStart(): Unit = {
    super.preStart()
    println("start : " + this)
  }

  def receive = {
    case x => actor ! x
  }
}

3のところで停止し、4以降処理されなくなる。

start : SampleSupervisor@668e93b9
start : SampleActor@ddad94
1
2
stop : SampleActor@ddad94
[ERROR] [02/03/2019 11:48:42.257] [sample-akka.actor.default-dispatcher-3] [akka://sample/user/$a/$a] invalid value
java.lang.RuntimeException: invalid value

メッセージの再送は行われない

Actorモデルを勉強を始めたときに勘違いしていたのが、再起動かけたらエラーになった処理を再処理してくれるかというところで、結論をいうとメッセージを再送しない限りは再処理しない。メッセージを再送する仕組みは作り込むかすればできそうだけど、もっと上位で再処理できるようにした方がいいと思う(最後にコミットして、途中でこけたやつは次回の処理対象になる、など)

次はakka-streamについて書く。