akka-streamについて
まずはさっくり作ってみる
import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Flow, Sink, Source} object Main extends App { implicit val system = ActorSystem("sample") implicit val materializer = ActorMaterializer() val source = Source(1 to 10) val flow = Flow[Int].map(_ * 2) val sink = Sink.foreach[Int] {println(_)} source.via(flow).to(sink).run() }
出力
2 4 6 8 10 12 14 16 18 20
自爆させてみる
val flow = Flow[Int].map { case 3 => throw new RuntimeException("invalid value") case x => x * 2 }
結果
2 4
3(3 * 2 = 6))以降処理されなくなったので、察するにFlowの処理をするActorが一つ起動していて、そいつがStopしたのではないかと思われる。
次にRestartFlowにしてみる。
val flow = RestartFlow.withBackoff[Int, Int]( minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2 ) { () => Flow[Int].map { case 3 => throw new RuntimeException("invalid value") case x => x * 2 } }
結果
2 4 8 10 12 14 16 18 20
4(4 * 2 = 8)以降も処理されている。
ということで、DBへの接続など、外部リソースを使う処理はRestartFlowなどにするのがよさそう。 そして、処理できなかったメッセージはSourceに残して再処理できるようにしておく必要がありそう。