activatorパターン
オレオレデザインパターン
解決したい課題
DBMSのトランザクション機能が使えない(例:マイクロサービスをまたぐなど)場合に読み取り一貫性を実現する
設計
例えば携帯電話で月の途中でプランを変えた(普通のプランから、Youtubeの通信をカウントしないプラン)場合
ユーザー -> 契約管理システム: プレン変更申し込み 契約管理システム -> 契約管理システム: 新プラン契約を作成(新プランはまだ他システムから参照できない) 契約管理システム -> 通信量集計システム: Youtubeを除外した通信量を集計(重い処理、Cloud DataFlowとかの分散処理とかで実行) 通信量集計システム --> 契約管理システム: activate(ここで他システムから新プランが参照できるようになる)
この例だと重い処理が一つだけど、重い処理を複数並列で走らせなくてはならない時はactivate処理を発行するところを別システムに切り出したりできる。
基本的にはRDBMSでデータを管理すればactivatorを作り込まなくていいので楽。重い処理が時間がかかってトランザクションの長さが他の処理に影響を及ぼす場合に検討すればいいと思う。
あとは、RDBMSで読み取り一貫性を担保するのと違って、未activateのデータも他システムに公開できる利点はあるっちゃあるが、それを本当に利点と言っていいかどうかは個人的には疑問
akka-streamについて
まずはさっくり作ってみる
import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Flow, Sink, Source} object Main extends App { implicit val system = ActorSystem("sample") implicit val materializer = ActorMaterializer() val source = Source(1 to 10) val flow = Flow[Int].map(_ * 2) val sink = Sink.foreach[Int] {println(_)} source.via(flow).to(sink).run() }
出力
2 4 6 8 10 12 14 16 18 20
自爆させてみる
val flow = Flow[Int].map { case 3 => throw new RuntimeException("invalid value") case x => x * 2 }
結果
2 4
3(3 * 2 = 6))以降処理されなくなったので、察するにFlowの処理をするActorが一つ起動していて、そいつがStopしたのではないかと思われる。
次にRestartFlowにしてみる。
val flow = RestartFlow.withBackoff[Int, Int]( minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2 ) { () => Flow[Int].map { case 3 => throw new RuntimeException("invalid value") case x => x * 2 } }
結果
2 4 8 10 12 14 16 18 20
4(4 * 2 = 8)以降も処理されている。
ということで、DBへの接続など、外部リソースを使う処理はRestartFlowなどにするのがよさそう。 そして、処理できなかったメッセージはSourceに残して再処理できるようにしておく必要がありそう。
Actorおさらい
akka-streamを検討しているので今一度Actorについておさらい。
サンプル
import akka.actor.{Actor, ActorSystem, Props} object Main extends App { val system = ActorSystem("sample") val actor = system.actorOf(Props[SampleActor]) Range(1, 10).foreach(actor ! _) } class SampleActor extends Actor { override def preStart(): Unit = { super.preStart() println("start : " + this) } override def postStop(): Unit = { println("stop : " + this) super.postStop() } override def postRestart(reason: Throwable): Unit = { super.postRestart(reason) println("restart : " + this) } def receive = { case x => println(x) } }
Acotorにメッセージが送られるので、以下のような結果になる。
start : SampleActor@5f1fb65d 1 2 3 4 5 6 7 8 9
3のとき自爆させてみる
def receive = { case 3 => throw new RuntimeException("invalid value") case x => println(x) }
再起動されている。
start : SampleActor@24764cc2 1 2 stop : SampleActor@24764cc2 start : SampleActor@d5ee644 restart : SampleActor@d5ee644 4 5 6 7 8 9
デフォルトのスーパーバイザーのストラテジーが例外でリスタートするようになっている。
final val defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: DeathPactException ⇒ Stop case _: Exception ⇒ Restart }
デフォルトをいじるのがややめんどくさそなのでSupervisorを使う。
class SampleSupervisor extends Actor { val actor = context.actorOf(Props[SampleActor]) override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy override def preStart(): Unit = { super.preStart() println("start : " + this) } def receive = { case x => actor ! x } }
3のところで停止し、4以降処理されなくなる。
start : SampleSupervisor@668e93b9 start : SampleActor@ddad94 1 2 stop : SampleActor@ddad94 [ERROR] [02/03/2019 11:48:42.257] [sample-akka.actor.default-dispatcher-3] [akka://sample/user/$a/$a] invalid value java.lang.RuntimeException: invalid value
メッセージの再送は行われない
Actorモデルを勉強を始めたときに勘違いしていたのが、再起動かけたらエラーになった処理を再処理してくれるかというところで、結論をいうとメッセージを再送しない限りは再処理しない。メッセージを再送する仕組みは作り込むかすればできそうだけど、もっと上位で再処理できるようにした方がいいと思う(最後にコミットして、途中でこけたやつは次回の処理対象になる、など)
次はakka-streamについて書く。
Groovyの==
Groovyの==は、Comparableを実装している場合はcompareToを使って検証するらしい。
Comparableを実装していない場合
@AllArgsConstructor public class Zombie { private final int no; private final String name; } class ZombieTest extends Specification { def "test"() { setup: def sakura = new Zombie(1, "sakura") def saki = new Zombie(1, "saki") // 本当は2号 expect: (sakura == saki) == false sakura.equals(saki) == false } }
Comparableを実装している場合
@AllArgsConstructor public class Zombie implements Comparable<Zombie> { private final int no; private final String name; @Override public int compareTo(Zombie o) { return no - o.no; } } class ZombieTest extends Specification { def "test"() { setup: def sakura = new Zombie(1, "sakura") def saki = new Zombie(1, "saki") // 本当は2号 expect: (sakura == saki) == true sakura.equals(saki) == false } }
sakura == saki
はsakura.compareTo(saki) == 0
で判定しているとな。
参考
Scalaのadd(a: Int, b: Int)とadd(a: Int)(b: Int)の違い
(a: Int, b: Int)
の部分、引数リストって言うらしい。
Scalaメソッド定義メモ(Hishidama's Scala def Memo)
部分適用したいときはこうなる
add(a: Int, b: Int)
の場合
scala> def add(a: Int, b: Int): Int = a + b add: (a: Int, b: Int)Int scala> def addEx(b: Int): Int = add(3, b) addEx: (b: Int)Int scala> addEx(7) res8: Int = 10
add(a: Int)(b: Int)
の場合
scala> def add(a: Int)(b: Int): Int = a + b add: (a: Int)(b: Int)Int scala> def addEx = add(3)_ addEx: Int => Int scala> addEx(4) res7: Int = 7
下の場合だとbの型がかわってもコンパイルエラーにならない。
↑の理由から、implicitパラメータは分けて書くのだと思われる。
trait UserRepository { def findBy(id: domain.user.Id)(implicit reqId: common.request.Id): User }
改善を進めたかったらダッシュボードが必要だった
今関わっているシステムはファーストリリースを終えて、スプリントを回しながら改善・機能追加していくフェーズに入りました。 最初の頃は改善がうまく進みませんでしたが、ダッシュボードを導入して一気に改善が進みました。
※ちょっと抽象的な記述が多いですがご容赦ください。
ダッシュボード導入前の状況
- ビジネス部門から気になる数値進捗の案件の情報が共有される
- ↑を調査する(ここですごく時間がかかる)
- 調査結果をもとに仮説をたてる
- 評価環境で検証する
- 「気になる箇所」は改善したが副作用で違う問題が、、、
- 調査をやり直すと別の怪しい箇所が。。。
- ↑の繰り返しで全然改善進まないorz
上記の状況において、"気になる箇所の詳細を調査する"ことの時間短縮を図る目的でダッシュボードを導入しました。
ダッシュボードの導入とその効果
システムを分析し、いくつかの指標を定めて見える化しました。ちなみにここで書いている「指標」はCPU使用率とかいったシステムのメトリクス(もちろん、これらも見える化しています)ではなく、もう少しサービスよりの指標です。
作ったダッシュボードを朝会で確認するようにしたところ、以下の効果が得られました。
調査時間の短縮
システムのどの部分に改善の余地があるかがすぐにわかるようになりました。これで調査にかかっていた時間が大幅に短縮されました。
イレギュラーなのか改善すべきなのかがわかるようになる
今までは「木をみて森をみず」だったので、怪しいデータがあったときにシステムで対応すべきものか、サービスレベルを超えたものなのかが判断できませんでした(結果、イレギュラーを救おうとして失敗したことも) ダッシュボードですべてのビジネス案件を一つのボードで見える化したことにより、対応すべきものとそうでないものがはっきりわかるようになりました。
施策の目的を定めやすい
施策を打つときには、「どういう状態になれば成功したと言えるか」を定める必要があるかと思います(成功定義、これを満たせない場合は切り戻しの対象になる)ダッシュボード、もとい指標をさだめることで、「この指標が改善したら成功」といったような成功定義がやりやすくなりました。
次にダッシュボードを作るための個人的なおすすめを書きます。
Stackdriver Logging + BigQuery + Tableau
個人的にこの技術スタックがオススメです。アプリケーションは指標の基になるデータをログに吐き出してStackdriver Loggingに集約、エクスポートの機能で該当するログレコードをBigQueryに連携、最後にTableauで可視化します。特定のデータを深掘りしたくなった場合は、BigQueryの検索結果をGoogleスプレッドシートにエクスポートして分析しています。
また、Stackdriverの「ログベースの指標」を使ってダッシュボードを作ることもできます(さらに、この指標をもとにしたアラートも設定できます。この機能もめっちゃおすすめです)個人的には、一時的に計測したい指標はこの機能で可視化し、指標の有用性が確認できたらTableauでダッシュボードに組み込む、といった使い方がアリだと思います。
Looker
最近気になっているやつです。某M社が採用しているらしい。どうでもいいですが、Loocker社のカスタマーサポート部門の名称はCustomer Love, Dept(カスタマー・ラブ部)らしい。
まとめ
改善をすすめるためにはダッシュボードが必要でした。ダッシュボードがあれば施策の手も打ちやすくなるし、成功定義もその確認もしやすくなっていいこといっぱいあります。さあ、ダッシュでダッシュボードをつくろう!!
GASで現在時刻から現在のスプリントを取得する
スプリントが金曜~翌木曜で、現在時刻から今のスプリントを取得する。 例えば、今日が2018.09.17なら、2018.09.14-20という文字列を取得する。
とりあえず、今週金曜日を取得するコードはこちら
var this_day = new Date() var this_friday = new Date(this_day) this_friday.setDate(this_day.getDate() - (this_day.getDay() - 5));
なんかすごく冗長なので、Momentjsとやらを使ってみる。
こうなった。
var this_day = Moment.moment() var this_friday = Moment.moment(this_day).isoWeekday(5);
で、スプリントの期間を取得するのはこうなる
/** * 引数に指定した日付からスプリントの文字列を取得する * Momentjsに依存 * https://tonari-it.com/gas-moment-js-moment/ */ function getActiveSprint(){ // this_day = 2018.09.17 -> 2018.09.14-20 var this_day = Moment.moment().startOf('day') var td = this_day.format('YYYY-MM-DD HH:mm:SS') var this_friday = Moment.moment(this_day).isoWeekday(5); var previous_friday = (this_day < this_friday) ? Moment.moment(this_friday).subtract(7, 'days') : Moment.moment(this_friday) var this_thursday = Moment.moment(this_day).isoWeekday(4); var next_thursday = (this_thursday < this_day) ? Moment.moment(this_thursday).add(7, 'days') : Moment.moment(this_thursday) return previous_friday.format('YYYY.MM.DD') + '-' + next_thursday.format('DD') }
さらに共通化してこうなった
/** * 引数に指定した日付からスプリントの文字列を取得する * Momentjsに依存 * https://tonari-it.com/gas-moment-js-moment/ */ function getActiveSprint(dt){ var this_day = Moment.moment(dt).startOf('day') var previous_friday = getNextOrPreNDay(this_day, 'PREVIOUS', 'FRI') var next_thursday = getNextOrPreNDay(this_day, 'NEXT', 'THU') return previous_friday.format('YYYY.MM.DD') + '-' + next_thursday.format('DD') } /** * dtから次または前のN曜日の日付を返す */ function getNextOrPreNDay(dt, nxPre, day) { var dayStrs = ['SUN', 'MON', 'THE', 'WED', 'THU', 'FRI', 'SAT']; var dayNum = dayStrs.indexOf(day) var this_day = Moment.moment(dt).startOf('day') var this_n_day = Moment.moment(this_day).isoWeekday(dayNum); var previous_n_day = (this_day < this_n_day) ? Moment.moment(this_n_day).subtract(7, 'days') : Moment.moment(this_n_day) var next_n_day = (this_n_day < this_day) ? Moment.moment(this_n_day).add(7, 'days') : Moment.moment(this_n_day) if ( nxPre == 'NEXT' ) { return next_n_day; } else if(nxPre == 'PREVIOUS') { return previous_n_day } }
テスト
QUnit.helpers( this ); function doGet( e ) { QUnit.urlParams( e.parameter ); QUnit.config({ title: "GAS のユニットテスト" }); QUnit.load( myTests ); return QUnit.getHtml(); }; function myTests() { module('getActiveSprint 関数テスト'); test('正常系', function () { strictEqual(getActiveSprint(new Date('2018-09-14')), '2018.09.14-20', '2018-09-14は金曜日なので、14-20'); strictEqual(getActiveSprint(new Date('2018-09-13')), '2018.09.07-13', '2018-09-13は木曜日なので、7-13'); }); }