doilux’s tech blog

ITに関する備忘録。 DDP : http://doiluxng.hatenablog.com/entry/2018/01/01/195409

akka-streamについて

まずはさっくり作ってみる

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

object Main extends App {

  implicit val system = ActorSystem("sample")
  implicit val materializer = ActorMaterializer()

  val source = Source(1 to 10)
  val flow = Flow[Int].map(_ * 2)
  val sink = Sink.foreach[Int] {println(_)}

  source.via(flow).to(sink).run()
}

出力

2
4
6
8
10
12
14
16
18
20

自爆させてみる

  val flow = Flow[Int].map {
    case 3 => throw new RuntimeException("invalid value")
    case x => x * 2
  }

結果

2
4

3(3 * 2 = 6))以降処理されなくなったので、察するにFlowの処理をするActorが一つ起動していて、そいつがStopしたのではないかと思われる。

次にRestartFlowにしてみる。

  val flow = RestartFlow.withBackoff[Int, Int](
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ) { () =>
    Flow[Int].map {
      case 3 => throw new RuntimeException("invalid value")
      case x => x * 2
    }
  }

結果

2
4
8
10
12
14
16
18
20

4(4 * 2 = 8)以降も処理されている。

ということで、DBへの接続など、外部リソースを使う処理はRestartFlowなどにするのがよさそう。 そして、処理できなかったメッセージはSourceに残して再処理できるようにしておく必要がありそう。