Actorおさらい
akka-streamを検討しているので今一度Actorについておさらい。
サンプル
import akka.actor.{Actor, ActorSystem, Props} object Main extends App { val system = ActorSystem("sample") val actor = system.actorOf(Props[SampleActor]) Range(1, 10).foreach(actor ! _) } class SampleActor extends Actor { override def preStart(): Unit = { super.preStart() println("start : " + this) } override def postStop(): Unit = { println("stop : " + this) super.postStop() } override def postRestart(reason: Throwable): Unit = { super.postRestart(reason) println("restart : " + this) } def receive = { case x => println(x) } }
Acotorにメッセージが送られるので、以下のような結果になる。
start : SampleActor@5f1fb65d 1 2 3 4 5 6 7 8 9
3のとき自爆させてみる
def receive = { case 3 => throw new RuntimeException("invalid value") case x => println(x) }
再起動されている。
start : SampleActor@24764cc2 1 2 stop : SampleActor@24764cc2 start : SampleActor@d5ee644 restart : SampleActor@d5ee644 4 5 6 7 8 9
デフォルトのスーパーバイザーのストラテジーが例外でリスタートするようになっている。
final val defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: DeathPactException ⇒ Stop case _: Exception ⇒ Restart }
デフォルトをいじるのがややめんどくさそなのでSupervisorを使う。
class SampleSupervisor extends Actor { val actor = context.actorOf(Props[SampleActor]) override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy override def preStart(): Unit = { super.preStart() println("start : " + this) } def receive = { case x => actor ! x } }
3のところで停止し、4以降処理されなくなる。
start : SampleSupervisor@668e93b9 start : SampleActor@ddad94 1 2 stop : SampleActor@ddad94 [ERROR] [02/03/2019 11:48:42.257] [sample-akka.actor.default-dispatcher-3] [akka://sample/user/$a/$a] invalid value java.lang.RuntimeException: invalid value
メッセージの再送は行われない
Actorモデルを勉強を始めたときに勘違いしていたのが、再起動かけたらエラーになった処理を再処理してくれるかというところで、結論をいうとメッセージを再送しない限りは再処理しない。メッセージを再送する仕組みは作り込むかすればできそうだけど、もっと上位で再処理できるようにした方がいいと思う(最後にコミットして、途中でこけたやつは次回の処理対象になる、など)
次はakka-streamについて書く。