LIVESENSE Data Analytics Blog

リブセンスのデータ分析、機械学習、分析基盤に関する取り組みをご紹介するブログです。

DigdagとEmbulkで行うDB同期の管理

データプラットフォームグループの松原です。 弊社各サービスのデータ分析基盤であるLivesense Analytics(以降LA)の開発、運用を行っています。
今回はLAで行っている分析のためにサービス側のデータ(テーブル)を、Redshiftへ同期を行う処理について紹介します。

概要

LAではデータウェアハウスとしてRedshiftを運用しており、社内から比較的自由に利用できる様にしています。
LAで取り扱っているデータはアクセスログが中心ですが、分析を行う利用者からはLA由来のデータ以外にも自分たちのサービスのデータを用いて分析を行いたい、という要望がよく出てきます。
サービスのデータには個人情報を含むものも少なくありませんが、分析基盤として社内にデータを解放するためにはそのような情報は削る必要があります。
そこで個人情報をマスキングしたサービス側データを利用できるよう、Redshiftに同期しています。

システム構成(概要)

大まかなシステム構成としては、下図のようになっています。

f:id:livesense-analytics:20190227151830p:plain:w650

多くのテーブルを同期しているので、カラム単位のマスキング処理や加工処理はEmbulkで行わず、同期元のデータベースであらかじめ行っておきます。

Digdagの選定理由

BigData-JAWS勉強会でAirflowのことを話してきましたにもあるように、すでにAirflowを運用していますが、ここではDigdagを選定しています。
主な選定理由としては以下があります。

  1. 構築や運用の容易さ
    Digdagはサーバーモードで動かしても、JavaとPostgreSQLの動く環境があれば運用を開始できます。
    また、Airflowと比べると、今のところ構築・アップグレードなどの運用は容易です。
  2. やりたいことがシンプル
    今回はsh(embluk)、redshift、httpオペレータで用途として足りそうで、それぞれのタスク間の依存関係も複雑では無いです。
    そのためタスク間の依存関係はYAMLで宣言的に書いたほうがメンテンスしやすいと考えました。
    また、DAG自体も複雑で無いため、どこまで実行できたか・どこで失敗したのかの可視性は求めていないのでダッシュボードも簡易なもので問題ありませんでした。

これらの理由により、Airflowで統一するより用途によりツールを使い分けた方が運用が容易なのでは、という結論になりました。

規模感

Redshiftへ同期しているデータに関する情報ですが以下のとおりです。

同期元のデータベース: 13個
同期を行っているテーブル数: 280テーブル
同期対象のデータベースの種類: 2種類(MySQL、PostgreSQL)
同期先: 2種類(Reshift、Redshift Spectrum)

これらのテーブルは、毎日業務開始前に前日のデータを同期しています。
同期はデータの増分を追加するのではなく、全量差し替えています。

Digファイルの依存関係

現時点では同期対象のテーブル数が280テーブル程度あるため、テーブル個別のデータ変換処理はなるべく行わないようにしています。
embulkやdigファイルを共通化しておくことで、同期対象テーブルの追加などの依頼があった場合にはテーブルとカラムの情報が書かれたファイル(下記の例だとtest1_tables.dig)のみを変更すれば良いようにしています。
また、テーブル個別のデータ変換処理はできるだけ行わないようにしていますが、変換が必要な場合はAirflow側で処理を行うようにしています。

2つのDBと同期を行う際のdigファイルの依存関係は、下図ようになります。

それぞれのファイルは以下のような役割で分けています。

ファイル名 概要
test1.dig Workflowを表すdig
test1_secret.dig 同期元への接続情報を保持しているdig
test1_tables.dig 同期対象のTable,Columnを定義しているdig
from-mysql-to-redshift.dig Embulkの実行・Redshiftへの処理を定義しているdig
in-mysql-out-s3.yml.liquid Embulkの設定ファイル
create.sql 一意になるテーブル名を作成
copy.sql EmbulkでS3においたファイルを、create.sqlで作成したテーブルにCOPY
swap.sql 既存のテーブルとSWAPし既存のテーブルを削除

スケジュール設定、通知・エラー処理などを省略すると、主だったdigファイルは以下のようになります。

--test1.dig
_export:
  mysql_database: MySQLの接続先DB名(ex. tes1)
  la_schema: redshift上のスキーマ名
  !include : digdag/environment/env.dig
  redshift:
    host: redshiftのホスト名
    database: redshiftのDB名
    user: la_importer
    ssl: true
    schema: ${la_schema}
    strict_transaction: false
    connect_timeout: 600s

+task:
  _export:
    !include : digdag/secret/test1_secret.dig

  for_each>:
    !include : digdag/media/test1_tables.dig
  _do:
    !include : digdag/flow/from-mysql-to-redshift.dig
--digdag/media/test1_tables.dig
mysql_table_info:
  - table: table1
    column: id, created_at, updated_at
  - table: table2
    column: id, created_at, updated_at
--digdag/flow/from-mysql-to-redshift.dig
_export:
  la_table: ${mysql_table_info.table}
  s3_file_path: mediadb/${la_schema}/${la_table}

+from-mysql-to-s3:
  sh>: embulk run -b /home/digdag/embulk_bundle /home/digdag/dag/embulk/in-mysql-out-s3.yml.liquid
  _export:
    mysql_table: ${mysql_table_info.table}
    mysql_select: ${mysql_table_info.column}

+mysql_redshift_create_temptable:
  redshift>: digdag/flow/queries/create.sql

+mysql_redshift_copy_2_temptable:
  redshift>: digdag/flow/queries/copy.sql

+mysql_redshift_swap_table:
  redshift>: digdag/flow/queries/swap.sql

運用上行っていること

以下のようなことを行って、運用コストを下げています。

  • Digdagで同期するテーブル情報の作成・Redshift用のCreate Table文(SORTKEY, DISTKEYは手動で設定)の作成のスクリプト化
  • タスクのログをS3に出力し、(shオペレーターから呼び出した)EmblukのログをLambdaでパース処理を行いエラー内容をSlackへ通知
  • Redashで同期済みデータのヘルスチェック
  • Errbotとmogを用いてChatOps(DAGの実行)
  • Redshift上に同期したが、1年以上利用されてないテーブルの割り出しと、今後も同期するのかの確認(不要テーブルを同期対象から外す)

これらを含めると、全体では以下のような構成になります。
f:id:livesense-analytics:20190220113717p:plain:w650

まとめ

もともとは Airflow を用いたデータフロー分散処理にあるバッチ処理と同様にRakeとwhenever(Cron)で運用されていたものをDigdagに置き換えました。
置き換えを行うことで、メンテナンスコストはだいぶ下げることが出来たのではないかと思います。
現在はデータソースがどこにあるのかと処理の複雑さによって、AirflowとDigdagのどちらを使うのかを使い分けています。

今後やっていきたいこと

今後もDigdagを用いた同期の改善の他にも、以下のようなこともやってきたいと考えています。

  • DigdagとAirflowの連携の強化(同期完了をトリガーとしてAirflowのDAG実行)
  • マスキング処理にProxySQLを使うことで、任意のタイミングでの同期の実行
  • CDC(Change Data Capture)を用いた変更内容の(ほぼ)リアルタイムの同期