LIVESENSE Data Analytics Blog

リブセンスのデータ分析、機械学習、分析基盤に関する取り組みをご紹介するブログです。

LivesenseDataAnalyticsBlog

MENU

Airflow を用いたデータフロー分散処理

こんにちは。テクノロジカルマーケティング部 データプラットフォームチームの村上です。

弊社各サービスのデータ分析基盤であるLivesense Analytics(以降LA)の開発、運用を行っています。

本日は弊社データプラットフォームでも活用しているフロー管理ツールAirflowについて、分散処理の概要や最近の更新についてご紹介します。

Airflowについて

Airflowとは

github.com Airflowはデータ処理フローを管理するツールです。元々はAirbnb社が開発して公開したソフトウェアです。その後Apacheソフトウェア財団のサポートを受けて現在はApache Airflowという正式名称になっています(本ブログでは以下Airflowと記述します)。ライセンスはApache License2.0です。本体コードはpythonで作られています。

2018年2月現在、正式リリースされているバージョンは1.9です。また、本ドキュメントのコードや説明はAirflow1.9をベースとします。

LA へのAirflow導入の背景

LAの主データベースにはRedshiftを採用しています。 LA 上のデータは弊社各メディアのフロントサイドのWebアクセスデータとサーバーサイドの顧客データ、そしてそれらをサマライズしたデータ等で構成されています。 サーバーサイドデータについては個人情報を匿名化しRedshiftに投入するだけで良いのですが、Webアクセスデータについては広告を制御したり詳細な顧客分析をする用途に用いられるので、生データのままではなく、広告チャンネルやセッションでの統合やサマライズなどを施した加工済みデータが求められます。 このような分析サイドの要求に答えるため、アクセスログデータの加工処理バッチを複数開発してきましたが、これらをcronで運用していたため管理が難しい状況になっていました。

このような背景から、データフロー処理ツールAirflowの導入を行いました。

Airflowは分散処理そのものを自前で実装せずに外部のフレームワークを利用しています。外部のフレームワークを利用することでシステムの柔軟な構成が可能となっている一方、Airflowを構成するサービスの構成、役割分担を理解していないと、効果的な設定を行うことが出来ません。

本稿ではAirflowを構成する各サービスの役割やDAGの書き方を中心に導入方法を解説します。

Airflow+CeleryExecutorで分散処理を行う

以下、弊社でも現在利用しているCeleryExecutorを使った分散処理のお話をしたいと思います。

インストールと設定の概略、常駐プロセスや分散処理用のパラーメータの設定等について触れます

そもそもCeleryとは

Celeryは複数のノードで分散して非同期でタスクキュー/ジョブキュー処理を行うためのフレームワークです。

Celeryはノード間のメッセージのやり取りにRabbitMQやRedis等のミドルウェアを使用します。Celeryではこれをbrokerと呼んでいます。一旦brokerを設置すれば、後は処理ノードを増やすだけで簡単にタスクの処理分散を実現できるのがCeleryの利点です。

CeleryExecutorはAirflowのクラスです。CeleryExecutorを使うと、Celeryを使ってAirflowタスクの分散処理を行えるようになります。

構成

f:id:livesense-analytics:20180201123428p:plain

CeleryExecutorを利用したAirflowサーバーをセットアップしてみます。

webserverとscheduler、workerを一つのサーバーにインストールします。 *1

systemdが動いているlinuxにインストールすることを想定した手順です。*2

また、python3, pip が使える前提です。

インストール

$ mkdir ~/airflow

$ export AIRFLOW_HOME=~/airflow

$ pip install apache-airflow

$ pip install "apache-airflow[mysql, celery]"

$ airflow initdb

pip install airflow と打ってしまうと、パッケージ名変更(Airflow1.8.1)以前のバージョンがインストールされますので注意して下さい。

基本設定

予め以下のデータベース類が用意されている前提です

  • airflow metadata database(DAGスケジュールやDAG Runの情報が入っているデータベース )
    • MySQLを使用
  • celery backend database (Celeryバックエンド用データベース MySQL, PostgreSQL, oracle が使える)
    • MySQLを使用
  • celery message broker (タスクキューイング用 redis, RabbitMQ等が使える)
    • redisを使用

airflow initdb を行うと、設定ファイルairflow.cfg のテンプレートが生成されますので、環境に応じて修正を加えてゆきます。

airflow.cfg

[core]

airflow_home = /path/to/airflow

executor = CeleryExecutor

sql_alchemy_conn = mysql://[metadata databaseのエンドポイント]

...

[celery]

celery_result_backend = db+mysql://[celery backend databaseのエンドポイント]

broker_url = redis://celery brokerのエンドポイント

...

設定したら、airflow initdbを行って、メタデータを初期化します。

$ airflow initdb

コンソールからwebserverを実行して、ブラウザからhttp://localhost:8080 にアクセスし、設定がうまくいっていることを確認します。

$ airflow webserver

f:id:livesense-analytics:20180131141028p:plain

Airflowの常駐サービス

Airflowには3種類の主要な常駐サービスがあります。

これらのサービスはデータベースやキューイングマネージャを介して完全に独立して動作するので、別々のサーバーで動作させることもできます。*3

特に分散処理を行う場合は、workerを動作させるworkerサーバーを複数並列に実行することになると思いますが、そのようなことも問題なく実現できます。*4

  • airflow webserver
    • AirflowのWebUIの処理、出力を行います
  • airflow scheduler
    • DAGの状態を監視して、スケジュールに応じてDAG Runを作成します
    • CeleryExecutorを使用する場合、Operatorのキューイングを行い、各ワーカーにOperatorの処理を分散します
    • (Sequential Executor を使用する場合、schedulerの子プロセスでDAG Runの処理が行われます)
  • airflow worker
    • schedulerがキューイングしたOperatorの処理を実行し、結果を格納します
    • (Sequential Executor を使用する場合には airflow worker を常駐させる必要はありません)

systemdで常駐化

以下のsystemdスクリプトを使用します scheduler, webserver, worker を常駐化します github.com セットアップの詳細は割愛します

ここまでの設定をすることでlinuxマシン上でwebserver, scheduler, workerが常駐して動き続けているはずです。

CeleryExecutorを利用する場合の並列処理周りのパラメーター

airflowには並列実行系のパラメーターが複数あり、理解しにくいので整理します

airflow.cfg

[core]

parallelism = x

dag_concurrency = x

max_active_runs_per_dag = x

...

[celery]

celeryd_concurrency = x

...

設定名 意味
parallelism 分散処理クラスタ全体で実行可能なプロセス数
dag_concurrency 一つのワーカで同時実行可能な最大プロセス数
max_active_runs_per_dag DAG内部で同時実行可能な最大タスク数
celeryd_concurrency(Airflow1.9.1以降ではworker_concurrency) 一つのCeleryワーカで同時実行可能な最大プロセス数

これらのパラメーターを適切に設定することで、分散処理のパフォーマンスをチューニングすることができると思います。

workerサーバーを複数台立ててタスクを分散処理する

f:id:livesense-analytics:20180201130947p:plain

同じ設定でworkerサービスのみを常駐させるサーバーを別途作成すれば、workerの処理を分散させることができます。

DAG

DAGとは

グラフ理論における有向非巡回グラフ のことです。有向非巡回グラフの場合、

  • 有向:辺には矢印のように方向がある
  • 非巡回:矢印に沿ってノードをたどっていった時にループが存在しない

といった特徴があります。

ループが存在しないので、DAGではノード間の依存順序を必ず解決できます。この特徴は依存関係があるワークフローを柔軟に途中実行したりする場合に有用なので、多くのワークフロー管理ツールが依存関係グラフとしてDAGを採用しています。AirflowもワークフローをDAGで記述します。

AirflowのDAGについて詳しくはこちらのスライドで説明されています。

DAGファイルを書く

Airflowチュートリアル

Airflowではデータフローをpythonのコードで記述します。このファイルを単にDAG又はDAGファイルと呼びます。

DAGファイルは airflow.cfgで設定したdags_folder (デフォルトでは $AIRFLOW_HOME/dags ) に設置します。

以下、DAGファイルの書き方について説明します。

1. dagオブジェクトを生成

DAGを定義するには、pythonのグローバルスコープにDAGオブジェクトを生成します。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

#dagを介してオペレーターに渡す引数を定義
operator_args = {
    'owner': 'airflow', 
    'depends_on_past': False,  #指定したタスクの上流タスクの実行が失敗した場合、タスクを実行するかどうかを設定します
                               #True:実行する False:実行しない
    'start_date': datetime(2018, 2, 1),  #DAG Runの生成期間の開始日時です
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

#dagの生成
dag = DAG(
    dag_id='dag_example1', #dagの識別子
    default_args=operator_args, #オペレーターに渡す引数を渡します
    catchup=False, 
    schedule_interval='0 0 * * *' #cronの時間形式で記述します
)
...

DAGの主な引数をまとめます

引数 意味
dag_id DAGに付ける識別名。
schedule_interval DAG実行間隔を指定します。cron形式の表記ができます(ex. '0 13 * * *')または '@daily' '@hourly' 等の表記もできます。詳細はドキュメントを参照して下さい
default_args operator生成時に指定する引数をdictionary形式で渡します。DAGファイルを記述する際には、一つのDAGに複数のオペレーターを紐付ける事が多いと思います。そのような場合、DAGにoperatorの引数を渡しておくとコード量が減って見やすくなります。
catchup(Airflow1.8以降) True/False で記述。 schedulerでDAG Runの遡り生成(Catchup)を行うかどうかを指定します。Falseを指定するとCatchupしなくなります。Catchupについては後述

2. オペレーターインスタンスの生成

task1 = BashOperator(
    task_id='task1', #taskの識別子
    bash_command='date', #bashで実行する内容。BashOperator特有
    dag=dag #オペレーターをdagと結びつけます
)

task2 = BashOperator(
    task_id='task2', 
    bash_command='sleep 5',  
    dag=dag 
)

DAGオブジェクトを生成したら、次はDAG内での処理単位となるオペレーターオブジェクトを生成します。 Airflowはオペレーターオブジェクト単位でデータフローを作り上げるという用途を想定して作られているので、各種データベースと接続するためのオペレーターが定義されています。

オペレーターは種類が多いですが、以下、主要なオペレーターの概要をまとめます

incubator-airflow/airflow/contrib/operators at master · apache/incubator-airflow · GitHub

incubator-airflow/concepts.rst at master · apache/incubator-airflow · GitHub

オペレーター 内容
SSHExecuteOperator(Airflow1.9で廃止) SSHで他のサーバーにログインしてコマンド実行するためのオペレーター
SSHOperator(Airlfow1.9から採用) 新しいSSH用のオペレーター
BashOperator workerローカルでbashコマンドを実行するためのオペレーター
PythonOperator workerローカルで任意のPython関数を実行するためのオペレーター
EmailOperator emailを送信するためのオペレーター
HTTPOperator HTTPリクエストを送信するためのオペレーター
MySqlOperator, SqliteOperator等 SQLクエリを実行するためのオペレーター
Sensor 指定時間、ファイル、データベース行、S3 key等を取得できるまで待つためのオペレーター

3. オペレーターインスタンス間の依存関係を記述

オペレーターのインスタンス同士を依存関係で結びます。

依存関係を記述するには、set_downstream()、 set_upstream() 関数を使いますが、Airflow1.8からはシフト演算子も使えるようになりました。

Bitshift Composition

f:id:livesense-analytics:20180201150723p:plain

#operator1を実行してからoperator2を実行する
#古い表記例
task1.set_downstream(task2)
#または
task2.set_upstream(task1)

#新しい表記例(Airflow1.8以降)
task1 >> task2
#または
task2 << task1

4. DAGファイルの読み込み

f:id:livesense-analytics:20180201151427p:plain

f:id:livesense-analytics:20180201151537p:plain

ここまでの設定を dags_folder 上にあるファイルに保存したらDAGの設定は完了です。

記述したグラフに矛盾や間違いがなければAirflowが自動で読み込んでくれるはずです。

読み込まれない場合はschedulerを再起動してみましょう。*5

Catchupについて

DAG Run

DAG Run はDAGの一回分の実行を表現するメタデータ上の概念です。 DAGの実行をする際には、スケジュール実行する場合も、一回限りの即時実行をする場合も、DAG Runがメタデータ上に作成されます。 DAG Runはsuccess, failed, running等の実行状態を保持しています。

Catchup動作

scheduler は DAGの schedule_interval で設定されている時間になるとDAG Runを生成してDAGの処理を始めます。

この動作は一見何でもないように見えますが、実は、 scheduler は前回のDAG Runが存在するかどうかチェックしています。

前回分のDAG Runがあれば最新のDAG Runを一つ生成するだけですが、 前回分のDAG Runがない場合、履歴をたどっていって最も近い過去に実行されたDAG Runから現在時刻までの間のDAG Runをまとめて生成します。 過去に一度もDAG Runが生成されていない場合、DAGに定義されている start_date から現在時刻までのDAG Runsを一括生成します。

この動作を Airflow では Catchup と呼んでいます。

例えば、あるDAGを1日一回実行していましたが何らかの事情で1月3日分まででで止めてしまったとします。その後、2月1日にDAGの実行を再開すると、1月4日から2月1日の分のDAG Runが一気に生成されて順次実行されます。

f:id:livesense-analytics:20180131195311p:plain

DAGからexecution_dateを受け取って指定日1日分のデータを処理するようなフローの場合はこれで良いのですが、最新のデータだけを処理すれば良いフローの場合、DAG Runがたくさん生成されても困ってしまいます。

このような場合にはCatchupを無効にすることができます。方法は2つあります。

  1. airflow.cfg を書きかえてCatchupを無効にする

    全てのDAGでCatchupが無効になります

    設定するには、airflow.cfgを書き換えます

    [scheduler]

    catchup_by_default = True

  2. DAGの引数に指定する

    指定したDAG内だけでCatchupが無効になります

    catchup = True / catchup = False

実際にLAにAirflowを導入してみて

Airflowを導入して良かったところ

  • Airflow側で実行ログを保存してくれる トラブルの際にタスク実行サーバーにログインしてログを確認する必要がなくなりました

  • タスク毎の実行履歴が見られる 実行履歴からログが見られるので状況の把握や問題の原因究明がやりやすくなりました

  • 依存関係があるデータフロー処理タスクをDAGでコード記述できる 依存関係の記述がやりやすくなりました。 また、データ処理フローをコードで記述できるので、バージョン管理しやすくなりました

  • データフローをグラフィカルに確認できる 複雑なデータフローを記述する際に、依存関係に間違いがないか一目で確認できるようになりました

微妙だった所

  • DAGを消してもWebUIからdag_id名が消えない

    リネームしたDAG等が残ってしまいます。そういう仕様だと思って割り切って使っていますが、リネームする項目が増えた時にどうしようかと思ってしまいます

  • DAG導入のタイミングによっては、ExecutionDateとStartDateがズレてしまう

    残念ながら、この挙動については原因が追いきれてません

まとめ

今回はAirflow分散処理の概要についてご説明させて頂きました。以下にAirflowの最近の更新履歴概要の抄訳を付録としまして置きましたので最近の状況についてご確認いただければと思います。

付録:Airflow 最近の更新状況

Apache Airflowはこの1年で2回の大きなアップデート(version 1.9, version 1.8)を行っています。 以前とは設定項目などが色々変わっていますので、以前利用されていた方も最新の状況を確認されるのが良いと思います。

incubator-airflow/UPDATING.md at master · apache/incubator-airflow · GitHub

以下、抄訳です。

次期バージョン(予定)

Celeryコンフィギュレーションパラメータ名が変更になります

  • celeryd_concurrency から worker_concurrency に変更
  • celery_result_backend から result_backend に変更

GCP Dataflow Operators

  • Dataflow job labeling が Dataflow {Java,Python} Operator でサポートされます

Airflow 1.9

リリース 2017/12

SSH HookがサブプロセスベースのSSHからParamikoライブラリを使用したものに切替え

  • SSHExecuteOperatorは廃止になりました。SSHOperatorを使用して下さい
  • 併せて、SFTPOperatorが追加されました

airflow.hooks.S3_hook.S3Hookがboto2からBoto3を使うようになりました

  • s3_conn_idは受け付けなくなりました。aws_conn_idを使って下さい
  • デフォルトコネクションがs3_defaultからaws_defaultに変更

ログシステムの作り変え

  • pythonのloggingモジュールを使うように変更されました

  • 既存のLoggingMixinクラスを拡張することでロギングをカスタマイズできるようになりました

Dask Executor の導入

  • Dask分散クラスタでタスクを実行できるようになりました

Airflow 1.8.1

リリース 2017/5

パッケージ名がairflowからapache-airflowに変更されました

Airflow 1.8

リリース 2017/3

metadataデータベースの構造が変更されたので、バージョンアップするにはmetadataのアップグレードが必要になりました

  • metadataデータベースをアップグレードするには、airflow upgradedb を実行する必要があります

poolの管理がより厳密になりました

  • バージョン1.7.1では、許可されている以上の数のプールを取得できる問題がありましたが、今回のバージョンからできなくなりました

スケジューラーのオプションが新しくなりました

  • child_process_log_directory

    • 堅牢性を向上するために、DAGSの処理はスケジューラーとは別のプロセスを立てて実行するようになりましたのでそれぞれがログを出力します
    • それぞれのDAGS処理プロセスが、Airflow.cfg上で設定した child_process_log_directory 以下にログファイルを出力します
  • コマンドラインオプション num_runs の意味が変わりました

    • スケジューラーの最大リトライ回数 から run_duration時間内の最大リトライ回数に変更されました
  • min_file_process_interval

    • 省略
  • dag_dir_list_interval

    • 省略
  • catchup_by_default

    • 本ドキュメントで触れました

DAG処理中のエラーがWebUIに表示されなくなりました

  • DAG処理エラーは airflow.cfgで設定されるchild_process_log_directory 以下に出力されます

新しく登録したDAGはデフォルトでポーズ状態になります

  • Airflow.cfg 内に dags_are_paused_at_creation = False と記述することで以前のふるまいに戻すことができます。

コンテキスト変数が Hive config に渡せるようになりました

Google Cloud Operator と Hook が整理されました

deprecated扱いになり、将来的に削除される機能があります

  • HookとOperatorはairflowオブジェクトから直接参照できなくなります。サブモジュールからimportして下さい。
  • Operator._init_()は任意の引数を許容しなくなります
  • secure_modeはデフォルトでTrueになります

*1:(実際に分散処理を行う場合にはwebserverとschedulerで一台、workerだけのマシン複数台で運用すると思いますが)

*2:具体的なディストリビューション等についてはここでは言及しません

*3:当然ですが metadata database としてローカルマシンのデータベースを使用している場合 webserver と worker を別々のサーバーで動かすことは出来ません

*4:Airflowは分散マシン間のDAGファイルの同期について一切面倒を見てくれません。workerを複数動かす場合、バージョン管理ツールや構成管理ツール等を用いて、各workerサーバーのDAGファイルの中身をユーザーが同期しておく必要があります

*5:スケジューラーはairflow.cfgのmin_file_process_intervalに設定されている間隔で更新をしていますが、この設定値には下限があって、デフォルト値の0設定でも3分間隔になるようです。また、新しいファイルの監視の設定はdag_dir_list_intervalでこちらはデフォルト5分間隔で管理しているようです。

Kubernetes を利用したコンテナベース機械学習基盤の構築

データプラットフォームチームの野本です。機械学習基盤の構築やその周辺アプリケーションの実装を行っています。以前は DOOR 賃貸の開発運用をしていてこんなことなどしてました。

機械学習システム運用の課題

リブセンスでは 2014 年ごろから機械学習システムの開発導入を行っており以降様々な機械学習システムを各サービスに導入してきました。また自社でのデータ分析基盤の運用も行うようになってから機械学習システムの開発の幅が広がり導入の要望も次第に増えてきました。(参考:リブセンスのデータ専門組織のこれまでとこれから)

当初は機械学習システムに対する運用知見などが少なかったため、専用のインフラというものは保持せず各サービスのインフラに相乗りし、サービスのアプリケーションと密に連携し機械学習システムを実装運用することが多かったです。各サービスは元々オンプレミスで運用されていたものが多かったのですが、現在は AWS などクラウド環境への移行を行ったり新規サービスは最初からクラウドを利用したりと各サービスのインフラは多様になってきています。 また、機械学習システム自体の数も増えており、次第に以下のような課題が顕著になってきました。

  • 導入するサービスのインフラに依存しがちになる
    • 利用したい言語 / ライブラリに制限
    • マシンリソースの制限
  • 似たようなシステムの複数サービスへの導入
    • 同じような実装が色々なところにちらばる
    • 運用コストが増える
  • 機械学習エンジニアが機械学習の実装以外のことにリソースを取られることがある
    • デプロイ作業
    • 各サービスのインフラ上でのオペレーション / 調査

これらの課題を解決するため、また運用の知見やノウハウも蓄積されてきたこともあり機械学習基盤を構築することとなりました。

機械学習基盤の構築

マルチコンテナ構成による機械学習アルゴリズムとアプリケーションの疎結合化でも触れていますが、GCP / GKE 上にコンテナを利用した機械学習基盤の構築を進めています。 現状のシステム概要図は以下です。

f:id:livesense-analytics:20180117130701p:plain

ざっくりとした概要図ですが、以下にその目的と全てではないですが実際にどのよう構成になっているかを簡単に解説をしていきます。

柔軟な機械学習システムの開発 / 検証 / 運用を目指す

今までの運用から、機械学習システムの開発において次のような課題解決の実現を目指しました。

  • 利用言語 / ライブラリの柔軟な選択
    • その為のプロビジョニング負荷の軽減
    • 開発者のローカル環境と本番の差異をなくす
  • モデルのアルゴリズム / パラメータを繰り返し調整しながら開発が出来る環境
  • 計算リソースの柔軟な確保
  • 本番同等の検証環境

これらを実現するのには現在はやはり Docker のようなコンテナを利用するのがベターであると考え、それを運用する環境として Kubernetes を採用することとしました。

Kubernetes の利用

Kubernetes の機能の詳細の紹介はここでは省きますが、Kubernetes を選んだ理由は次のようなものが上げられます。

  • ほぼ全ての設定をコード化 (yaml) 出来る
    • 開発 / 本番ごとに共通の設定で環境を構築出来る
  • デプロイ / ロールバックを素早く行える
  • クラスタ内の各 Pod / Service の連携が楽
  • ConfigMap / Secrets を利用して設定を共通化出来る
  • CronJob により簡易にバッチを運用出来る

実際に利用してみると Kubernetes 上でのコンテナのデプロイは本当に簡単/柔軟に行え、これであれば柔軟な機械学習システムの開発に十分活用出来ると思い採用に至りました。

GCP / GKE の採用

Kubernetes を運用する環境としては GCP / GKE を採用しました。

現在の機械学習基盤構築チームの前提として

  • 現状機械学習基盤には専任のインフラエンジニアがいない
  • アプリケーションや機械学習システムの開発運用も並行して行っている

といった状況で、出来るだけインフラ管理のコストがかからない GKE のようなマネージドサービスの利用が理想です。

他にも以下のような GCP 利用のメリットがあります。

  • リブセンスでは Google Apps を利用しているので Google のアカウント連携が出来る
  • GCE インスタンスの起動が速い
  • GKE の Stackdriver Logging との連携が便利
  • タグによるネットワークのルーティングが便利
  • Identity-Aware Proxy によるアプリケーションの認証

今回は 0 からの構築のため大規模な移行などのような状況よりも選択の自由度が大きかったこともあり GCP / GKE を全面的に利用することに大きな障壁はあまりなかったので採用出来ました。

コンテナレジストリ

リリース前の検証クラスタも構築しており、本番と同等のコンテナ / Kubernetes の設定を検証出来るようになっています。 Docker のレジストリには GCR (Google Container Registry) を利用しています。 GCR はそれ専用の独立した GCP プロジェクトで運用しており、本番 / 検証プロジェクト双方からアクセス出来るようになっています。 またコンテナのビルドには Container Builder を利用しており、基本的に GitHub 上のリポジトリで master へのマージが行われた際にビルドするようになっています。

helm / デプロイ

Kubernetes の設定は最初は標準の yaml で行っていましたが現在は helm を利用しています。 helm にすることにより、検証 / 本番環境の設定の切り替えが容易に行えます。 またデプロイは担当のエンジニアがまだ小人数であることもあり、各エンジニアのローカルマシンから行っています。 より多くのエンジニアがこの基盤を利用し始める場合には学習コストがそれなりにあったり不慮の事故にもつながるので今後改善すべき課題と思っています。

ちなみに手元で kubectl を操作する時 auto completion を使うととても便利です。

CronJob

CronJob は Kubernetes で提供されている機能で、いわゆる crontab の機能を Kubernetes 上で実現出来るものです。 現状稼動しているシステムはほぼバッチであり、その運用を考えた時フローがさほど複雑でないバッチの実行にワークフローエンジンのようなものの運用を行わないで CronJob を利用出来るのはとても便利です。 基盤の構築開始時には alpha クラスタのみでしか利用出来なく、実際に利用出来るようになるまでは crond で kubectl exec などをスケジュール実行するような簡易的なコンテナを作成して対応していましたが現在は CronJob に置き換えています。

データ分析基盤の活用

今までもブログなどで繰り返し紹介していますが、リブセンスでは自社でデータ分析基盤を構築運用しています。 データ分析基盤は AWS 上に構築していて、各サービスの各種ログはこの基盤上に構築している AWS Redshift に集約して保持しています。 各サービスの開発者 / 運用者は日常的にこのデータを使ってサイト性能などを分析しており、既存の機械学習システムも基本的にはこのデータを元に実装しています。 なので、機械学習基盤からも手軽にこの Redshift にアクセス出来る必要があるのですが、データ周りのことはこのデータ分析基盤を中心に考えることによりむしろ機械学習基盤の構築にフォーカスを絞ることが出来ます。

現在は機械学習基盤からの接続は Pgpool をプロキシとして利用し Redshift にアクセスするように構築しています。

  • Redshift へのアクセス許可を固定 IP で行う
  • 各コンテナからは Pgpool の URL でアクセス
  • ロードバランサを利用した冗長構成も取りやすい
  • 現在は利用していないが、必要であればクエリキャッシュも導入出来る
  • GCE での運用ではあるが、Container-Optimized OS を利用し Pgpool をコンテナ化して運用している

データ分析基盤の活用の現状と今後は以下のように考えています。

  • データが AWS 上に保持、計算は GCP 上で処理となっているが、現状バッチがメインなので転送コストやレイテンシは今のところそれほど問題ではない
  • リアルタイム性が必要になるなど機械学習固有の要求が発生するようなログ収集は機械学習基盤側で構築していくかもしれない
  • 各サービスのログ以外のデータも一部データ分析基盤には取り込まれているのでそれを利用出来る
    • 存在しないデータが出てきた時のために embulk を利用した取り込みも行えるようにはなっている

まとめ

よかったところ

  • Docker / Kubernetes の利用によるデプロイの早さ / 容易さ
  • Docker を利用することでアプリケーション個別のプロビジョニングがほぼ必要ない
  • コンテナ化 / helm による設定により環境のコード化を強制出来る
    • どのような環境を構築するのかが分かりやすい
    • 検証環境 / 本番での差異が出にくい
  • GCP の各サービスの活用も視野に入れやすい
    • Flexible Environment になり便利になった GAE の活用
    • データ分析基盤で補えないログに対する BigQuery の活用
    • ML 系サービスの活用

今後やっていきたいこと

  • CI / CD の確立
    • 出来るだけ Kubernetes のことを知らないエンジニアでも利用出来るように
    • 出来るだけ機械学習システムの開発フローに寄り添ったものに
    • Spinnaker などのツールも検証中
  • 計算リソースの柔軟なスケーリング
    • 機械学習にはつきものの潤沢な計算リソースの分配を手軽に
    • GPU の利用も機会があれば
  • 機械学習固有の問題解決
    • モデルの精度監視環境の構築
    • オンラインシステムの構築
  • などなど

機械学習基盤の構築はまだ始まったばかりで現状は既存のシステムの移行を行いながらこれら基盤の構築を進めているという現状です。変化のとても速い Kubernetes 周辺技術を適切に取り込んでいくとともに、基盤上で新たなシステムを実装しながらより柔軟な基盤を作っていければと思っています。(半年後には全然違う構成になってるかもしれません。)また、今後この基盤上で実装していく個別の機械学習システムに関しても紹介出来ればと思います。

Livesense Analyticsを支えるELT/ETL処理と運用

データプラットフォームグループの松原です。 弊社各サービスのデータ分析基盤であるLivesense Analytics(以降LA)の開発、運用を行っています。
今回はLAで行っているELT/ETL処理について紹介したいと思います。

LAでのELT/ETL処理概要

そもそもELT/ETLとは

ETLとはデータを整形してからDWH/データマートへ取り込む処理のことで、Extract(抽出) -> Transform(変換) -> Load(読み込み) の順で処理を行うためETLと呼ばれています。
ELTとはデータをDWH/データマートへ取り込んだのちに変換を行う処理のことで、Extract(抽出) -> Load(読み込み) -> Transform(変換) の順で処理を行うためELTと呼ばれています。

LAでのELT/ETL処理概要

LAではEMRを利用するまでは、ELTを中心に行っていましたが、EMRを利用してからはELT/ETLを両方行うようになりました。
LAでは大まかには以下のような構成でELT/ETL処理を行っています。

f:id:livesense-analytics:20180110112342p:plain:w500

LAでは主に3つELT/ETL処理があり概要は以下のようになっています。

  1. Redshiftで実行するSQLバッチによるELT
    LAの運用初期からあり、利用者向けのテーブルの作成などを行っています。
  2. EMR Sparkを利用したETL
    SQLでは実現しづらい複雑な処理(セッションの集計処理など)を実施しています。
  3. GlueによるELT/ETL
    コールドデータ(利用頻度の低いデータ)等をRedshift Spectrumで利用しやすいようにデータフォマットの変換などを実施しています。

LAでのELT/ETL処理

ワークフローエンジン

LAではETLのためのワークフローエンジンとしてAirflowを利用しスケジュール実行しています。
ちょうど導入を検討したタイミングで用途に合っていたから導入したという消極的な採用理由ですが、1.8にアップグレードしてからは1.7で不満があった以下の2点の不満も解消され運用しやすくなりました。

  1. DAG単位でCatchup設定が可能になり、Taskが詰まった際に過去のTaskを実行する必要がなくなった。
  2. Celery Backendを利用した際に、頻繁に実行されるヘルスチェック系のタスクを登録した際のCPU負荷が下がった。

Redshiftで実行するSQLバッチによるELT

LA運用初期からあるELT処理で、Redshiftに格納済みのデータから利用者向けのテーブルの生成や、分析者向けのテーブルの生成を主に行っています。
ここで生成している主要なデータは以下のようなものになります。

  1. RAWデータを加工し利用しやすくしたサマリーテーブル。
  2. 特定期間のみのデータを保持し、インターリーブソートキーを付けた利用者向けテーブル。
  3. 分析者向けのテーブル。

構成としては以下のようになります。

f:id:livesense-analytics:20180110104007p:plain:w400

EMR Sparkを利用したETL

主に以下の2点の理由で昨年ぐらいから、主要なサービスについてはSQLバッチから置き換えを行っています。

  1. ストリーム+SQLでデータを処理しづらいケースに対応する。
  2. 過去データのデータ定義の変更をRAWデータから再生成しやすくする。

将来的には 、サマリーテーブルのデータ定義の修正などで全データを再生成する際にRedshift上で再生成する運用コストが高い*1のでその辺りもEMRで処理をしたいと考えています。 ただし、SQLバッチによるELT処理も引き続き利用していくつもりです。

構成としては以下のようになります。

f:id:livesense-analytics:20180110143932p:plain

現状はストリームで処理は行っておらず定期バッチ処理が中心なためEMRは常時起動するのではなく、必要な時にAiflow経由で起動・停止させるという運用をしています。

Glueバッチ

昨年リリースされたGlueについても、Redshift Spectrumを利用してRedshiftのディスクを節約するために利用しています。
利用用途として以下の2つで、EMRを立てるほどでもないけどというような処理で利用しています。

  1. 終了してしまったサービスのデータをRedshiftからS3に移しParquetに変換を行う。
  2. コールドデータや、Redshiftに投入するにはデータ量が多すぎるものについては最初からRedshift Spectrumで利用できるようにParquetに変換を行いS3に保管する。

EMRの処理を置き換えることは現時点では想定しておらず、上記のようなデータフォーマットの変換という限定的な使い方をしています。

f:id:livesense-analytics:20180110144001p:plain:w500

上記の構成は利用用途の2番目の用途で利用しており、GlueのTrigger機能は利用せずにAirflowでGlue Jobの実行、異常検知を行っています。
1番目の用途については割と雑にGlue Scriptを書いて手動で実行しています。

まとめ

今回はLivesense AnalyticでのETL / ELT処理について紹介しました。
リブセンスのデータ分析基盤の全貌の資料公開から比べると比較的変更があり、紹介しきれていなかった部分も紹介できたかと思います。
今回紹介したELT / ELT処理や、前回紹介したエンコードタイプの変更の他にも色々な施策に取り組んでおります。それらについても別の機会にご紹介できればと思います

*1:ディスクの空き状況によっては別Redshiftクラスタを立ち上げ処理する必要がでたりするので、費用面よりも運用の手間という意味合いが強いです。

リブセンスのデータ専門組織のこれまでとこれから

はじめに

こんにちは。テクノロジカルマーケティング部の谷村です。

テクノロジカルマーケティング部(以下、テクマ部)は、 リブセンス内のデータ分析や機械学習、そのための基盤開発までデータまわりを手広くやっている部門です。

リブセンスはHR領域や不動産領域を中心として複数のメディアを運営しています。組織的にはメディア毎に事業部を編成する、いわゆる事業部制を採用しています。 メディア毎の意思決定スピードや戦略の柔軟性、等々が事業部制のメリットかと思いますが、テクマ部についてはこれら各事業を支援する形で横串の横断組織として編成されています。 横断の組織としているのは所属メンバーの専門性を高める目的や、全社状況にあわせたアサインメントを行う目的などがあります。 今回は、リブセンスがテクマ部を中心としてどのようにデータと向き合っているかをご紹介させていただきます。

リブセンスのデータ分析のこれまで

私がリブセンスに入社したのが2013年でしたが、リブセンスでは当時からデータドリブンな意思決定を大事にする文化がありました。 例えば、営業でもSQLを使うという文化、入社初日に目にして驚いたのを今も覚えています(営業さんまで、社員全員がSQLを使う 「越境型組織」 ができるまでの3+1のポイント)。

そんな中でここ数年のデータ関連のテクマ部での取り組みは、大きな流れとして下記のように進展してきました。

時期 概要 詳細・データ
~ 2014 アドホック分析中心 データは主に会員マスタや売上等のメディアのデータベースを中心に利用。各施策や広告効果などを多変量解析や時系列等で分析。
~ 2016 自社分析基盤の開発
機械学習のサービスへの適用
Web上での行動ログとメディアのデータベースを一元管理するようにDWHを整えたことで、全てのKPIを連続的に分析することが可能に。さらにユーザーに紐付いた行動ログが蓄積されたことで、レコメンドアルゴリズムや予測モデルのメディアへの組み込みが可能に。
2017 ~ 機械学習のインフラ整備
UX専門のグループ立ち上げ
データについてはアクセスログに限らず外部データを含めた取り込みを強化。各メディア毎にバラバラに開発してきた機械学習システムについて、基盤の統一を開始。

2015年には分析基盤となるDWHも整ったことで、個別の事業での機械学習の活用も進み、収益面でも大きな成果を残すことが出来ました。 そして昨年からは次のデータ活用フェーズを念頭に動きを開始しています。こちらの背景については後ほど説明させてください。

データに関わるメンバーの主な役割

弊社では各人を細かく職種で分けることはやっていませんが、大まかに意識している役割は下記のような形です。

役割 業務内容
データエンジニア 分析基盤開発・運用、データ収集、大量データの操作
機械学習基盤エンジニア 機械学習システム(インフラ・コード)の開発を担当
アルゴリズム開発者 アルゴリズムの実装、検証
データアナリスト データサイエンスを活用したプロダクト改善の企画立案
データベースマーケティングの推進
UXリサーチャー/UXデザイナー UXデザインの手法を駆使してサービスの課題・潜在ニーズの発見・解決方法の検討から実行を支援

正直なところ、呼称は定まっているわけではなく、役割も社内外の状況にあわせて年々変化しています。 例えば、機械学習基盤エンジニアとアルゴリズム開発者については以前は区別せずに「機械学習エンジニア」としていました。

各自のコアスキルの向上や、プロジェクトチームの編成を考える際にこれらの役割を意識していますが、現実には複数の役割を兼ねて動いているメンバーが多くなっています。

現在のテクマ部内のチーム構成

テクマ部では今年から3グループ体制で動いています。

グループ 所属メンバー ミッション
Data Marketing データアナリスト
UXリサーチャー/UXデザイナー
データからユーザー価値への転換の設計プロセスを担うミッション
Data Platform データエンジニア
機械学習基盤エンジニア
アルゴリズム開発者
データからユーザー価値への転換する仕組み(システム)を作る
Infra Structure インフラエンジニア 全サービスのインフラに責任を持つ

グループ化の背景については後述しますが、ここでテクマ部の中に社内の各サービスのインフラを担当するインフラエンジニアのグループを配置しているのが弊社の特徴です。 データに関わらずインフラ全般を担当しているため、データに関わる役割での紹介は省略しましたが、部内にインフラエンジニアがいることでシステム運用経験豊富なメンバーから助言、ノウハウ共有を行ってもらえると同時に、開発時のハブの役割も果たしてもらっています。

これからのこと

リブセンスでもここ数年で、既存の仕組みをより効率的にまわすことに対してデータ分析や機械学習が有効に機能するようになっています。 これからもそのようなアプローチは継続していきますが、一方でそのような効率化のアプローチは限界を抱えていると考えています。 特に大きいのが、データそのものに変化を生み出せないことです。 人工知能という言葉も流行し、各社とも取り組みを強化されていますが、大抵の場合、ユーザーへの恩恵が大きいのはアルゴリズムの洗練よりも、どのようなデータを収集するかということと、収集したデータからどのような価値への転換を行ってユーザーに提供するかということだと思います。 既存のデータを使って効率化のためにデータを使っているだけだと、この肝心のデータが成長しません。

そこで次のチャレンジとしては、データの使い方だけではなく、データそのものから考えていくフェーズに移りたいと考えています。 取得するデータの設計であったり、どうやったらそのデータを蓄積し続けることが出来るかというデータが集まる仕組みの設計、 そしてデータから価値への変換の設計、ひいてはサービス価値の定義、そしてそれを実行できるアルゴリズムの実装と、試行錯誤しながら取り組んでいきたいと考えています。下図でいうと、緑の吹き出し部分以外への注力をより高めるイメージです。 会社としても弊社代表の村上がリアルデータエンジニアリングという呼び声で推進している取り組みになりますが、事業部門と協力して進めていきます。

f:id:livesense-analytics:20180109095609p:plain

上記のような背景で、テクマ部でも「データから価値への変換の設計」を行っていくことを意識してUXDを専門に取り組むグループを昨年組織しました。 さらに今年はUXとアナリストを1つのグループに配置しData Marketingグループとしました。性格の異なる分野を1つにまとめた実験的な試みですが、事業部門と共同で新しいサービスの種を見つけていけることを期待しています。

また、コンセプトが決まればデータの性質や用途によって適切なアリゴリズムを開発する必要が出てきます。しかもなるべく高速に。 そこで、これまではアドホックに各事業への機械学習を導入してきましたが、 今後はアルゴリズムの開発・検証を高速でまわせるように分析基盤に加えて機械学習の基盤の開発にも力をいれるべく、Data Platformグループを編成しました。

さいごに

少しでもリブセンス(特にテクマ部)に興味を持っていただいた方向けに追加情報です。

仕事の進め方

役割によって異なりますが、データエンジニアのように運用すべきシステムを持っている場合はチームでタスクを分割して働いています。データアナリストやUXリサーチャーはプロジェクトチームを組んで動くことが多いです。

分析ツールや環境は、タスクの特性と各人の好みでわりと自由度高くやっています。Pythonを使うメンバーもいればRを使うメンバーもいます。最近ではJuliaも使い始めたりしています。 分析基盤のDWHはAWS Redshiftを中心に構成していますが、機械学習のインフラにはGCPを選んでいます。これらについては、今後のブログでも紹介していく予定です。

日常の学習

新しい技術もどんどん出てきている領域なので、業務を通じて学習しています。リブセンス自体が勉強会が盛んな会社なのですが、テクマ内でも毎週のように勉強会を行っています。 論文輪読会や、統計モデリングの勉強会等の分析・機械学習系の勉強をはじめ、Amazon Redshiftなどそれぞれの業務にあわせた勉強会を開催しています。隣に議論出来るメンバーがいるのは楽しいです。

募集について

リブセンスでは一緒に働いていただける仲間を募集しています。 変化の激しいデータ周辺の領域ですが、それにあわせてリブセンスでは組織もミッションも変化していますので、変化を楽しみたい方にはおすすめです。

少しでも興味を持っていただけた方は弊社採用サイトから、是非エントリーお願いします!

将棋盤を画像認識する

Analytics チームで転職会議のレコメンドを開発している @na_o_ys です。今回は業務のことは忘れて、趣味の将棋の話をしたいと思います。

この数年で将棋の学習環境はずいぶんリッチになりました。通勤電車では将棋アプリのネット対局をして、自宅ではオープンソースの強豪 AI を使って棋譜検討し、日々将棋を楽しんでいます。
一方で、顔を突き合わせて盤と駒を使って指す対局が一番楽しいのは変わりがありません。 リアルの対局を AI で検討するために、盤面を手軽にコンピュータに入力したい というのが今回のテーマの発端です。

TL;DR

f:id:na_o_s:20171220134558p:plain

盤上の駒を高い精度で推定することができました。

処理は大きく 2 つのステップからなります。

  1. 盤面の正規化
    • 盤面の四隅の座標を特定し、元画像から正規化画像への射影変換を得る
  2. マス目毎の内容を推定する
    • マス目毎に画像を切り出し、駒の有無・種類を推定する

ちなみに上記画像は私と同僚の将棋で、現局面は後手番です。後手玉は詰めろ飛車取りですが次の一手を考えてみてください。

ステップ 1. 盤面の正規化

正規化画像への射影変換を得るには、盤面の四隅の座標を特定できれば十分です。画像処理の要素技術としてエッヂ検出、輪郭検出、線分検出など多様な特徴抽出手法があります。今回は、 輪郭検出 + 線分検出 + 焼きなまし法 を組み合わせることで、高い精度で四隅の座標を特定することができました。

輪郭検出

将棋盤の大まかな位置を特定するために 輪郭検出 を利用します。

f:id:na_o_s:20171220182735p:plain

大まかな位置は特定できますが、マス目とぴったり一致しません。輪郭検出では罫線だけでなく将棋盤の端や机の角も検出してしまうためです。ここで特定した大まかな座標を出発点として、次に紹介する焼きなまし法でイテラティブに精度を高めていきます。

なお輪郭検出の結果は多角形ではなく曲線のため、直接四隅の座標は得られません。四角形の頂点座標を得るために以下の処理を行っています。

1. 輪郭検出
2. 凸包で近似
3. 凸包の頂点から四隅の座標候補 (最も正方形に近いもの) を選択

線分検出 + 焼きなまし法

焼きなまし法 はパラメータ (ここでは四隅の座標) を振動させながら、目的関数が最小となる値を探索する手法です。

以下の要件を満たす目的関数を設計する必要があります。

目的関数 f:
    f(四隅の座標候補) = 真の座標にどれだけ近いか

まず 線分検出 により将棋盤の罫線を大雑把に抽出します。これを 罫線画像 と呼ぶことにします。

f:id:na_o_s:20171220182805p:plain

目的関数の中で、与えられた四隅の座標をもとに マスク画像 を生成します。マスク画像は四隅の座標から仮定される罫線の位置に近ければ近いほど明るく、罫線から遠いと暗くなるようにします。

f:id:na_o_s:20171220182915p:plain

このマスク画像を罫線画像に重ねると、罫線の重なり具合が大きければ多くの罫線が透過されるのに対して、重なり具合が小さければ一部しか透過されないことが想像できます。 すなわち マスク画像と罫線画像の内積 が目的関数として利用できるということです。

この目的関数を利用して焼きなまし法を行うことで、輪郭検出結果からぴったりした座標が得られます。

f:id:na_o_s:20171220182927p:plain

四隅の座標が得られればマス目毎の画像を得るのは簡単です。

f:id:na_o_s:20171220183118p:plain

ステップ 2. マス目の内容推定

ディープラーニングします。

学習データ

f:id:na_o_s:20171220125829p:plain

学習データはインターネットで集めた約 400 枚の駒画像 (手作業でひと駒ずつトリミングしたのですが、この作業が一番大変でした) と、iPhone のカメラで撮影した盤面 10 枚です。盤面はテレビ画面やネット対局のキャプチャを含みます。

ステップ 1 で作成した盤面正規化プログラムを利用して事前にマス目毎の画像に分割し、手動で駒の有無・種類をラベリングしました。

ネットワーク

入力は単一のマスの画像で、画素数は 64 * 64 です。出力はマスの内容で、空 or 駒の種類 (先手後手それぞれ 15 種類) を判別する 31 のラベルです。ネットワークは畳み込み & プーリング層が 3 層と全結合層 3 層の構成です。

model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3), input_shape=input_shape))
model.add(BatchNormalization())
model.add(PReLU())
model.add(MaxPooling2D(pool_size=(2, 2)))

model.add(Conv2D(64, kernel_size=(3, 3), input_shape=input_shape))
model.add(BatchNormalization())
model.add(PReLU())
model.add(MaxPooling2D(pool_size=(2, 2)))

model.add(Conv2D(128, kernel_size=(3, 3), input_shape=input_shape))
model.add(BatchNormalization())
model.add(PReLU())
model.add(MaxPooling2D(pool_size=(2, 2)))

model.add(Flatten())
model.add(Dense(1024))
model.add(BatchNormalization())
model.add(PReLU())
model.add(Dropout(0.25))
model.add(Dense(1024))
model.add(BatchNormalization())
model.add(PReLU())
model.add(Dropout(0.5))
model.add(Dense(NUM_CLASSES, activation='softmax'))

ネットワーク構成は こちら の漢字認識率 98.5 % のものを参考にさせて頂きました。

手書き漢字データによるファインチューニング

用意した学習データは駒の書体の種類が少ない (30種類程度) ため、局所特徴量の抽出を担う畳み込み層の学習に適しません。そこで、15 万文字の手書き漢字・ひらがなからなる 手書教育漢字データベース ETL8 を利用して、畳み込み層のパラメータを事前に学習しました。

学習・推定に使う将棋駒データはノイズが多い一方で、ETL8 は非常にノイズが少ないクリアな画像データです。そのため、そのまま学習してしまうと畳み込み層がノイズに対応できず将棋駒をうまく識別できませんでした。そこで、ETL8 に以下のノイズを乗せて学習を行いました。

  • ガウスノイズ (強)
  • ガウスノイズ (弱)
  • 白黒反転
  • 回転
  • 拡大縮小

その後畳み込み層を固定し、将棋駒データを使って全結合層のみを学習させます。これにより、高い精度の特徴抽出器 (畳み込み層) をそのまま将棋駒の分類に利用できます。

学習結果

Epoch 120/120
loss: 0.1981 - acc: 0.9379 - val_loss: 0.3054 - val_acc: 0.9245

92 % 程度の精度で駒の分類ができました。

今後の課題

いくつか大きな課題点を挙げます。一つは盤面検出の焼きなまし法に 10 秒程度の時間がかかることです。マスク画像を用いた目的関数は微分可能でないため確率的勾配降下法が利用できません。うまく微分を計算できる目的関数を設計すれば処理時間は大きく改善されるはずです。他にも、罫線の周期性を利用してスマートに盤面検出できないかなど考えましたが、具体的な手法は思いつきませんでした。

もう一つは、成銀・成桂・成香の分類問題です。将棋の駒には様々な書体がありますが、ものによっては人間でもそれらの識別が困難な場合があります。

f:id:na_o_s:20171220142832p:plain

特長が大きかったり一度学習した書体であれば間違えませんが、未知の書体について充分な精度を保証するのはまだ難しいようです。

最後に最も大きな課題は、持ち駒の検出ができないことです。画像によって持ち駒の位置や角度が異なるため、盤上の駒の推定とは違った要素技術が必要になります。

まとめ

将棋盤の画像から、盤面の状態を 92 % 程度の精度で得ることができました。盤面の正規化には輪郭検出・線分検出・焼きなまし法を使い、駒の推定にはディープラーニングを利用しました。普段業務で画像処理やディープラーニングを扱っていないため、今回の手法には冗長な手順や簡単な見落としもあるかと思います。お気づきの点やアイデアがあれば是非コメント頂ければ幸いです。

今回のプログラムは GitHub で参照頂けます。

github.com

おまけ

Mac で動く Electron 製の棋譜検討アプリ を絶賛開発中です。2018 年初頭には公開したいなあ。