LIVESENSE Data Analytics Blog

リブセンスのデータ分析、機械学習、分析基盤に関する取り組みをご紹介するブログです。

LivesenseDataAnalyticsBlog

MENU

BigData-JAWS勉強会でAirflowのことを話してきました

データプラットフォームチームのよしたけです。

さて先日のBigData-JAWS 勉強会 #12にて、「リブセンスのデータ分析基盤を支えるRedshiftとAirflow」というタイトルで発表させていただきました。

Airflowについては、このブログでも Airflow を用いたデータフロー分散処理 でご紹介させていただきましたが、今回、弊社での活用事例を交えてお話させていただきました。

発表後のQAや懇親会で、Airflowの導入を検討しているが実際どういうところが大変なのか? とか運用上のつらみ、とかそういうところを気にされている方が多かった印象でした。

AirflowはPythonでデータフローを記述するため、柔軟に何でもできるという強みがあり、反面、やりすぎると改修が大変になり運用しづらくなるデメリットもあるように思います。 私たちはDAG側にはロジックを盛り込まずRake側でロジックを組み上げていくような運用をしているため、比較的変更、修正はカジュアルに回せていると思いますが、 反面、Airflowの豊富なオペレータや機能が使いこなせていないというジレンマもあったりします。 このあたりの加減は今後も改善を進めていきたいなと思っています。

競争優位性構築のための人間中心機械学習〜CVRからUXへ〜 

テクノロジカルマーケティング部データマーケティンググループにてデータサイエンティスト兼UXアーキテクトをしている新保と申します。普段は機械学習を中心としたデータ活用の推進や新規機能のユーザ体験の設計をしています。ここ1年程リブセンスではサービスの戦略レイヤーや主要機能と結合度の高い領域に対して機械学習を適用していくことに挑戦しており、今回はそれらを実際にどのように行っているかをご紹介したいと思います。

パッチ型機械学習の成功体験とその限界

 本題に入る前にリブセンスのデータ活用の歴史について少しお話しします。以前に別のメンバーが投稿した記事に詳しい説明がありますがリブセンスでは2014年にデータ活用の専門組織を立ち上げてから現在に至るまで機械学習のビジネス活用を継続的に行っています。初期の頃は既存サービスの枠組みの中でサービスとの結合度が出来る限り低く、かつ利益インパクトが大きい領域に機械学習を適用することからスタートし、比較的早い段階で成功を収めています。2015年にはレコメンドシステムやアトリビューションモデルなどが主要サービスに本格導入され、当時の全社売上50億円規模に対して安定的に年間1〜2億円程度の利益インパクトを残せるようになりました。一方で成功を収めたことでより高次の新しい課題が見え始めたのもこの時期です。課題の1つは機械学習の開発環境の整備です。こちらについては過去の記事に詳しく解説されていますので

マルチコンテナ構成による機械学習アルゴリズムとアプリケーションの疎結合化 - LIVESENSE Data Analytics Blog

などを御覧ください。

もう1つの課題はデータ活用(機械学習含む)によってCVR向上を通して利益貢献に結びけることに成功した一方で、既存サービスにパッチを当てるような機械学習の使い方ではその成果がサービスの競争優位性構築に結びつけるまでには至らなかったことです。そこで利益貢献を維持しながらデータ活用によってCVR向上よりも大きな成果、サービス自体の競争優位性構築に貢献することが2016年以降のグループのテーマになっていきます。これは現時点で100のサービスをを105にするためではなく中長期的に200、300に延ばしていくために機械学習を始めとするデータ活用を進めて行くということです。

パッチからデザインへ

この方針のもと2016年以降はアルゴリズムの洗練化
を継続しつつ、ユーザ体験の設計段階まで踏み込むプロジェクトを継続的に立ち上げています。これは機械学習がサービスと切り離された機能として提供されるのではなく、サービスの中核を担う行動デザインとインタラクションデザインのレイヤーから関わることを意味します。少し前にGoogle Design blogの記事「Humuman-centerd Machine learning と類似した取組みをリブセンスにおいても約1年前から行っており、次のようなプロセスでプロジェクトを回しています。

f:id:livesense-analytics:20180319193618p:plain

このプロセスのポイントは「既存のデータから機械学習で出来ることを考える」から「コンセプトを先に決めて必要なデータをすべて取る」という方式に変更した点です。Webサービスにおいて機械学習はあくまでユーザへの価値提供を実現するための強力な選択肢の1つであると考え、機械学習ありきではなくユーザへの価値提供を最上位におき、提供価値の具体化と行動デザインのフェーズで機械学習の活用を検討している点で人間中心な機械学習と言えるでしょう。

人間中心の機械学習の実施プロセス

 ここからはフェーズ①〜②においてリブセンスで実際に行っている具体的なプロセスをご紹介します。基本的にはISO9241-210にて定義されているHCDのプロセスをベースにしています。まだまだ試行錯誤の日々ですが現在我々のチームでは次のステップで進めています。

  1. ターゲットセグメントの決定
    まず最初にターゲットになりうるユーザを対象に幅広くデプスインタビュー行い、定性分析によるユーザセグメンテーションを行います。セグメントの軸には様々な方法がありますが我々のチームでは行動パターンの類似性で切り分けた後にメインターゲットとするセグメントを決定しています。

  2. ユーザの価値抽出
    ターゲットセグメントに属するユーザの価値感の抽出分析を行います。デプスインタビューで得た発言録からユーザの行為と意図を抽出して分析を行うことでユーザのもつ本質的欲求を分析します。手法はいくつかありますが、時間がないときは上位下位関係分析、余裕があるときはKJ法やKA法などを用いることが多いです。その後As-isのカスタマージャーニーマップを作成して現在のユーザ行動やタッチポイントごとの課題を可視化します。期間と予算の関係上分析のデータソースとしてはstep1で行ったデプスインタビューの結果を再利用することが多いです。

  3. コンセプト設計とユーザ体験の可視化
    提供するユーザ体験のコアとなるコンセプトをプロジェクトメンバー全員で出し合います。この段階では提供価値にフォーカスしたいため一旦技術的なことは忘れます。次にコンセプトアイディアを「ユーザへの提供価値の大きさ」と「機械学習を使うことによる付加価値大きさ」の二次元マップ上に配置します。ここで高い提供価値を与えるアイディアの機械学習依存度が高い場合はデータサイエンティストとや機械学習エンジニアと実現可能性について検討を行います。ただしこの時点での確認はアイディアが荒唐無稽すぎたり、AIがなんとかするといった抽象的すぎるものになってないかなどの確認であって、具体的なアルゴリズム検討には踏み込みません。また機械学習依存度の低いアイディアの提供価値が一番大きいと判断された場合や機械学習を使うことにより効果が小さい場合は機械学習を使わない選択をすることも多いです。 仮コンセプト確定後は構造化シナリオ法を用いてユーザのサイト上における行動シナリオを可視化します。

    f:id:livesense-analytics:20180320103257p:plain

  4. コンセプト評価とプロトタイピング
    ユーザインタビューによってコンセプト評価を行い、フィードバックを反映してコンセプトを修正していきます。例えば昨年度実施した転職会議の新機能のコンセプト設計では計3回のコンセプト修正と評価を行っています。定性だけで確信が持てない場合はコンセプトが固まった時点で定量アンケート調査で量的な担保を取る場合も多いです。また、プロトタイピングツールを使用してコンセプトをUIに落とし込んで実際にユーザ使ってもらった上での感想も聞いています。プロトの段階でアルゴリズムを開発するとコストが掛かりすぎるためレコメンドシステムであれば予めユーザに好みを聞いておいて人手で選んだ求人を見せたり、チャットボットであればUI上はボットに見せて裏側では人がやりとするようにして極力アルゴリズムを開発せずに擬似的な体験を提供してユーザ評価を行うようにします。   

  5. 開発プロジェクトへの展開
    サービスリリースに向けてデザインプロセスに関わったメンバーを中心に開発プロジェクトを立ち上げます。この段階で機械学習エンジニアをプロジェクトメンバーに本格的にアサインして具体的なアルゴリズムの検討を開始します。

このプロセスでは機械学習の利用を前提とせず提供価値に着目していますが、一方で最初から機械学習の活用アイディアがあるケースもよくあるかと思います。その場合は調査からスタートすると時間がかかりすぎるのでstep1〜step3をプラグマティックペルソナで代替して高速にアイディアの検証とピボットを繰り返していくことで効率的にプロジェクトを進めていく方法をとるのが良いのではないかと考えています。

以上がリブセンスで行っている機械学習を活用したおおまかな体験設計のプロセスです。直近の事例では就活会議での就活生と企業のマッチング機能をこのプロセスを採用して設計しています。アルゴリズム改善のみでマッチング精度を上げるのではなく、就活会議に訪れたユーザが適性な企業とのマッチングに至るまでに必要なユーザの体験設計を最初に行い、ユーザ体験がユーザに刺さるかどうかを事前検証後に必要な機能・アルゴリズム・データを収集・開発しています。就活会議における具体的なサービス設計の事例についてはまた別の機会にご紹介できればと思います。

技術×マーケでメールマーケティングを進めていくぞ!という気持ち

はじめに

こんにちは。テクノロジカルマーケティング部でアナリストをしております、高橋です。

さて、今回のテーマはメールマーケティングです。とその前に、我々の部署名の由来についてご紹介します。「テクノロジカルマーケティング」には、単なる技術開発にとどまることなく技術をビジネス価値に転換する、あるいは技術のチカラでお客様を幸せにするサービスを創る、そんな想いが込められております。

このような思想のもと、メールマガジンの在り方や運用を見直し、テクノロジーとマーケティングの両面から変革していこうとする取り組みが、今年の1月にスタートしました。本記事ではその概要と、何を目指していきたいかについてお話しできればと思います。これまで本ブログでも紹介してきました、LivesenseAnalytics(データウェアハウス、以降LA)やLivesenseBrain(機械学習プラットフォーム、以降LB)の実践事例となります。

背景

舞台となりますのは、弊社が運用している中途採用メディアです。求人情報を載せたメールを求職者様に配信し、開封、URLクリックを経て、応募までいけば無事コンバージョンとなります。同メディアにおけるメールマーケティングの歴史は比較的古く、以前から協調フィルタリングやコンテンツベースのレコメンドアルゴリズムも採用されていました。

ただ、こうしたアルゴリズムをもとにしたレコメンドメールは、既存のメールとは独立にラインナップに追加されてきており、既存メールの中には全配信に近いメールも存在していました。またレコメンドメール自体も、リリース後数年を経過して十分なメンテナンスが出来ておらず、当初ほどのパフォーマンスが出なくなっていました。他にも同じような形で個々にメールマーケティングのテコ入れが行われてきた結果として、メールの送りすぎや、それに起因すると思われるオプトアウトの増加が発生していたというのがプロジェクト発足時の状況です。

新しいメールマーケティングの目指す姿

そこで、この取り組みでどういう状態にしていきたいかを検討し、以下の3点を目指す姿として定義しました。

1) 適切な求人

求人数はそれほど潤沢にあるわけでもないので、モデルの精度よりもルールベースに近い形でシンプルかつ柔軟に調整が効くロジックを採用。職種や勤務地、その他の条件(社風や給与など)ごとのマッチングを実現でき、メールのテーマに合わせてそれぞれのマッチングの強弱を変えることで、希望条件に合わせつつバリエーションを確保できる状態。どのメールにも何らかのマッチングロジックを適用し、全体の求人紹介精度を高い水準に保つ。

2) 適切な頻度

アクセスログや過去のメール反応をもとに、メールでのアプローチが有効と判断されるお客様には高頻度に、そうでない方にはあまり出しすぎないようにしながら、全体の配信数をコントロール可能。主力となるメールは優先的に高頻度に配信され、一方でお客様を飽きさせないよう様々な切り口から求人を紹介できるバリエーション豊かな構成。オプトアウトが低減され配信対象者数が増えることで、じわりじわりとCV期待値が上がっていく状態。

3) 適切な手段

顧客データの細かな分析結果(特に求人案件を閲覧するときの行動を中心に)に基づき、メールの件名・本文、配信する時間帯、文体やデザインのトンマナ、求人の数や掲載項目(属性)などをお客様ごとに変えていく、いわゆるパーソナライズがなされた状態。これらについては常にA/Bテストが走り、KPIを日々モニタリングしながら改善PDCAが並列超高速で回っている状態。パフォーマンスを上げるためのノウハウが蓄積され、将来的に他のメディアでの展開を見据える。

これらを仕組みとして用意することで、冒頭に述べたような技術とマーケの両輪でメール価値の最大化を実現できると考えています。メールマーケティング自体は比較的シンプルなマネタイズ構造ですので、ここをチューニングすることで早期のビジネスインパクトが見込めると判断しました。

取り組み内容

まだ一部実装が追いついていない箇所も含みますし、今後変更になる可能性も大いにあるのですが、少しだけ中身をお見せしたいと思います。

PLAN

週に一回の会議で、新規メールのアイデア出しをしたり、既存メールの改善点やA/Bテストテーマの検討を行います。新しいメールは最低でも週に一本投入するようにしていて、良好な結果が出ればそのままラインナップ入りとなります。改善やテストはスピードと数を重視していて、思いついたら即実行のスタンスで動いています。メディアの価値・信頼を毀損せず、なおかつ尖った内容の文面を書くというのは意外と大変なのですが、本文を少し変えただけでも結果が大きく変わるのがメールの面白いところです。現在は約60種類のメールが稼働しており、この数はさらに増えていく予定です。

DO

メール配信のキモである、リスト作成(誰にどの案件を紹介するか)はLBのインフラ上で実行されます。テーマに沿って求職者や求人案件をLAから抽出し、いくつかのマッチングやレスポンス予測を経て、リストが出力されるこの一連の処理は現在のところR言語で記述しています(試験フェーズのため)。使っているのはランダムフォレストやコサイン類似度などで、機械学習の初学者にも馴染みのあるものばかりです。これらの処理は全メールで共通のものになっていて、個々のマッチングを使うかどうかと、スコアの閾値設定を各メールで自由に設定できるようになっています。また、マッチングの際に算出されたスコアなどはLAに連携され、すぐに検証することが可能です。

CHECK

週に一回の会議で、データを見ながら翌週のラインナップを編成します。顧客セグメントごとに、曜日や時間帯などある程度決められた配信枠を設けてあり、そこにどのメールを当てはめていくかを検討します。ちょうど某アイドルグループの総選挙のような形で、神セブンと呼ばれるパフォーマンスTOP7メールがゴールデンタイムでの配信を勝ち取ります。改善の結果、頻繁に順位が入れ替わるので、総選挙は毎週行っています。正直を言うと、そのたびに設定を変えるのは面倒なところもあるのですが、飽きずに楽しむ秘訣と割り切って対応しています。

ラインナップ編成のイメージ
ラインナップ編成のイメージ

おわりに ~今後の展望~

始まったばかりの取り組みで今後の展望を語るのは時期尚早かもしれませんが、いくつか構想はあります。

まずは他のメディアへの横展開。テクマのような全社横断組織で運用している意義がここにあります。次に他のチャネルへの縦展開。メール以外でもお客様にアプローチできる手段は色々あるかと思いますので、それらを併用しながら相乗効果を狙っていくのが一つの目標です。また、営業との連携。メールマーケティングの取り組み自体をクライアントに興味を持ってもらうことで、求人案件を掲載したいと思っていただける状態を作りたい。そして最後に、利用してくださるお客様の幸せな転職の機会を少しでも増やせるようになればと思いながら、これから頑張っていきます。

それでは続きはまた次回に。最後までお読みいただき、ありがとうございました。

Developers Summit 2018 発表資料

こんにちは。テクノロジカルマーケティング部の谷村です。

本日、私と田中の2名で デブサミ2018において、データを活かす組織の作りかた、事業に寄り添う分析・機械学習基盤の育てかたというタイトルでお話する機会をいただきました。

早速ですが、発表資料についてそのまま公開させていただいています。

前半:Livesenseの場合のデータを活かす組織の作りかた(谷村)

後半:事業に寄り添うデータ基盤の育て方(田中)

Airflow を用いたデータフロー分散処理

こんにちは。テクノロジカルマーケティング部 データプラットフォームチームの村上です。

弊社各サービスのデータ分析基盤であるLivesense Analytics(以降LA)の開発、運用を行っています。

本日は弊社データプラットフォームでも活用しているフロー管理ツールAirflowについて、分散処理の概要や最近の更新についてご紹介します。

Airflowについて

Airflowとは

github.com Airflowはデータ処理フローを管理するツールです。元々はAirbnb社が開発して公開したソフトウェアです。その後Apacheソフトウェア財団のサポートを受けて現在はApache Airflowという正式名称になっています(本ブログでは以下Airflowと記述します)。ライセンスはApache License2.0です。本体コードはpythonで作られています。

2018年2月現在、正式リリースされているバージョンは1.9です。また、本ドキュメントのコードや説明はAirflow1.9をベースとします。

LA へのAirflow導入の背景

LAの主データベースにはRedshiftを採用しています。 LA 上のデータは弊社各メディアのフロントサイドのWebアクセスデータとサーバーサイドの顧客データ、そしてそれらをサマライズしたデータ等で構成されています。 サーバーサイドデータについては個人情報を匿名化しRedshiftに投入するだけで良いのですが、Webアクセスデータについては広告を制御したり詳細な顧客分析をする用途に用いられるので、生データのままではなく、広告チャンネルやセッションでの統合やサマライズなどを施した加工済みデータが求められます。 このような分析サイドの要求に答えるため、アクセスログデータの加工処理バッチを複数開発してきましたが、これらをcronで運用していたため管理が難しい状況になっていました。

このような背景から、データフロー処理ツールAirflowの導入を行いました。

Airflowは分散処理そのものを自前で実装せずに外部のフレームワークを利用しています。外部のフレームワークを利用することでシステムの柔軟な構成が可能となっている一方、Airflowを構成するサービスの構成、役割分担を理解していないと、効果的な設定を行うことが出来ません。

本稿ではAirflowを構成する各サービスの役割やDAGの書き方を中心に導入方法を解説します。

Airflow+CeleryExecutorで分散処理を行う

以下、弊社でも現在利用しているCeleryExecutorを使った分散処理のお話をしたいと思います。

インストールと設定の概略、常駐プロセスや分散処理用のパラーメータの設定等について触れます

そもそもCeleryとは

Celeryは複数のノードで分散して非同期でタスクキュー/ジョブキュー処理を行うためのフレームワークです。

Celeryはノード間のメッセージのやり取りにRabbitMQやRedis等のミドルウェアを使用します。Celeryではこれをbrokerと呼んでいます。一旦brokerを設置すれば、後は処理ノードを増やすだけで簡単にタスクの処理分散を実現できるのがCeleryの利点です。

CeleryExecutorはAirflowのクラスです。CeleryExecutorを使うと、Celeryを使ってAirflowタスクの分散処理を行えるようになります。

構成

f:id:livesense-analytics:20180201123428p:plain

CeleryExecutorを利用したAirflowサーバーをセットアップしてみます。

webserverとscheduler、workerを一つのサーバーにインストールします。 *1

systemdが動いているlinuxにインストールすることを想定した手順です。*2

また、python3, pip が使える前提です。

インストール

$ mkdir ~/airflow

$ export AIRFLOW_HOME=~/airflow

$ pip install apache-airflow

$ pip install "apache-airflow[mysql, celery]"

$ airflow initdb

pip install airflow と打ってしまうと、パッケージ名変更(Airflow1.8.1)以前のバージョンがインストールされますので注意して下さい。

基本設定

予め以下のデータベース類が用意されている前提です

  • airflow metadata database(DAGスケジュールやDAG Runの情報が入っているデータベース )
    • MySQLを使用
  • celery backend database (Celeryバックエンド用データベース MySQL, PostgreSQL, oracle が使える)
    • MySQLを使用
  • celery message broker (タスクキューイング用 redis, RabbitMQ等が使える)
    • redisを使用

airflow initdb を行うと、設定ファイルairflow.cfg のテンプレートが生成されますので、環境に応じて修正を加えてゆきます。

airflow.cfg

[core]

airflow_home = /path/to/airflow

executor = CeleryExecutor

sql_alchemy_conn = mysql://[metadata databaseのエンドポイント]

...

[celery]

celery_result_backend = db+mysql://[celery backend databaseのエンドポイント]

broker_url = redis://celery brokerのエンドポイント

...

設定したら、airflow initdbを行って、メタデータを初期化します。

$ airflow initdb

コンソールからwebserverを実行して、ブラウザからhttp://localhost:8080 にアクセスし、設定がうまくいっていることを確認します。

$ airflow webserver

f:id:livesense-analytics:20180131141028p:plain

Airflowの常駐サービス

Airflowには3種類の主要な常駐サービスがあります。

これらのサービスはデータベースやキューイングマネージャを介して完全に独立して動作するので、別々のサーバーで動作させることもできます。*3

特に分散処理を行う場合は、workerを動作させるworkerサーバーを複数並列に実行することになると思いますが、そのようなことも問題なく実現できます。*4

  • airflow webserver
    • AirflowのWebUIの処理、出力を行います
  • airflow scheduler
    • DAGの状態を監視して、スケジュールに応じてDAG Runを作成します
    • CeleryExecutorを使用する場合、Operatorのキューイングを行い、各ワーカーにOperatorの処理を分散します
    • (Sequential Executor を使用する場合、schedulerの子プロセスでDAG Runの処理が行われます)
  • airflow worker
    • schedulerがキューイングしたOperatorの処理を実行し、結果を格納します
    • (Sequential Executor を使用する場合には airflow worker を常駐させる必要はありません)

systemdで常駐化

以下のsystemdスクリプトを使用します scheduler, webserver, worker を常駐化します github.com セットアップの詳細は割愛します

ここまでの設定をすることでlinuxマシン上でwebserver, scheduler, workerが常駐して動き続けているはずです。

CeleryExecutorを利用する場合の並列処理周りのパラメーター

airflowには並列実行系のパラメーターが複数あり、理解しにくいので整理します

airflow.cfg

[core]

parallelism = x

dag_concurrency = x

max_active_runs_per_dag = x

...

[celery]

celeryd_concurrency = x

...

設定名 意味
parallelism 分散処理クラスタ全体で実行可能なプロセス数
dag_concurrency 一つのワーカで同時実行可能な最大プロセス数
max_active_runs_per_dag DAG内部で同時実行可能な最大タスク数
celeryd_concurrency(Airflow1.9.1以降ではworker_concurrency) 一つのCeleryワーカで同時実行可能な最大プロセス数

これらのパラメーターを適切に設定することで、分散処理のパフォーマンスをチューニングすることができると思います。

workerサーバーを複数台立ててタスクを分散処理する

f:id:livesense-analytics:20180201130947p:plain

同じ設定でworkerサービスのみを常駐させるサーバーを別途作成すれば、workerの処理を分散させることができます。

DAG

DAGとは

グラフ理論における有向非巡回グラフ のことです。有向非巡回グラフの場合、

  • 有向:辺には矢印のように方向がある
  • 非巡回:矢印に沿ってノードをたどっていった時にループが存在しない

といった特徴があります。

ループが存在しないので、DAGではノード間の依存順序を必ず解決できます。この特徴は依存関係があるワークフローを柔軟に途中実行したりする場合に有用なので、多くのワークフロー管理ツールが依存関係グラフとしてDAGを採用しています。AirflowもワークフローをDAGで記述します。

AirflowのDAGについて詳しくはこちらのスライドで説明されています。

DAGファイルを書く

Airflowチュートリアル

Airflowではデータフローをpythonのコードで記述します。このファイルを単にDAG又はDAGファイルと呼びます。

DAGファイルは airflow.cfgで設定したdags_folder (デフォルトでは $AIRFLOW_HOME/dags ) に設置します。

以下、DAGファイルの書き方について説明します。

1. dagオブジェクトを生成

DAGを定義するには、pythonのグローバルスコープにDAGオブジェクトを生成します。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

#dagを介してオペレーターに渡す引数を定義
operator_args = {
    'owner': 'airflow', 
    'depends_on_past': False,  #指定したタスクの上流タスクの実行が失敗した場合、タスクを実行するかどうかを設定します
                               #True:実行する False:実行しない
    'start_date': datetime(2018, 2, 1),  #DAG Runの生成期間の開始日時です
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

#dagの生成
dag = DAG(
    dag_id='dag_example1', #dagの識別子
    default_args=operator_args, #オペレーターに渡す引数を渡します
    catchup=False, 
    schedule_interval='0 0 * * *' #cronの時間形式で記述します
)
...

DAGの主な引数をまとめます

引数 意味
dag_id DAGに付ける識別名。
schedule_interval DAG実行間隔を指定します。cron形式の表記ができます(ex. '0 13 * * *')または '@daily' '@hourly' 等の表記もできます。詳細はドキュメントを参照して下さい
default_args operator生成時に指定する引数をdictionary形式で渡します。DAGファイルを記述する際には、一つのDAGに複数のオペレーターを紐付ける事が多いと思います。そのような場合、DAGにoperatorの引数を渡しておくとコード量が減って見やすくなります。
catchup(Airflow1.8以降) True/False で記述。 schedulerでDAG Runの遡り生成(Catchup)を行うかどうかを指定します。Falseを指定するとCatchupしなくなります。Catchupについては後述

2. オペレーターインスタンスの生成

task1 = BashOperator(
    task_id='task1', #taskの識別子
    bash_command='date', #bashで実行する内容。BashOperator特有
    dag=dag #オペレーターをdagと結びつけます
)

task2 = BashOperator(
    task_id='task2', 
    bash_command='sleep 5',  
    dag=dag 
)

DAGオブジェクトを生成したら、次はDAG内での処理単位となるオペレーターオブジェクトを生成します。 Airflowはオペレーターオブジェクト単位でデータフローを作り上げるという用途を想定して作られているので、各種データベースと接続するためのオペレーターが定義されています。

オペレーターは種類が多いですが、以下、主要なオペレーターの概要をまとめます

incubator-airflow/airflow/contrib/operators at master · apache/incubator-airflow · GitHub

incubator-airflow/concepts.rst at master · apache/incubator-airflow · GitHub

オペレーター 内容
SSHExecuteOperator(Airflow1.9で廃止) SSHで他のサーバーにログインしてコマンド実行するためのオペレーター
SSHOperator(Airlfow1.9から採用) 新しいSSH用のオペレーター
BashOperator workerローカルでbashコマンドを実行するためのオペレーター
PythonOperator workerローカルで任意のPython関数を実行するためのオペレーター
EmailOperator emailを送信するためのオペレーター
HTTPOperator HTTPリクエストを送信するためのオペレーター
MySqlOperator, SqliteOperator等 SQLクエリを実行するためのオペレーター
Sensor 指定時間、ファイル、データベース行、S3 key等を取得できるまで待つためのオペレーター

3. オペレーターインスタンス間の依存関係を記述

オペレーターのインスタンス同士を依存関係で結びます。

依存関係を記述するには、set_downstream()、 set_upstream() 関数を使いますが、Airflow1.8からはシフト演算子も使えるようになりました。

Bitshift Composition

f:id:livesense-analytics:20180201150723p:plain

#operator1を実行してからoperator2を実行する
#古い表記例
task1.set_downstream(task2)
#または
task2.set_upstream(task1)

#新しい表記例(Airflow1.8以降)
task1 >> task2
#または
task2 << task1

4. DAGファイルの読み込み

f:id:livesense-analytics:20180201151427p:plain

f:id:livesense-analytics:20180201151537p:plain

ここまでの設定を dags_folder 上にあるファイルに保存したらDAGの設定は完了です。

記述したグラフに矛盾や間違いがなければAirflowが自動で読み込んでくれるはずです。

読み込まれない場合はschedulerを再起動してみましょう。*5

Catchupについて

DAG Run

DAG Run はDAGの一回分の実行を表現するメタデータ上の概念です。 DAGの実行をする際には、スケジュール実行する場合も、一回限りの即時実行をする場合も、DAG Runがメタデータ上に作成されます。 DAG Runはsuccess, failed, running等の実行状態を保持しています。

Catchup動作

scheduler は DAGの schedule_interval で設定されている時間になるとDAG Runを生成してDAGの処理を始めます。

この動作は一見何でもないように見えますが、実は、 scheduler は前回のDAG Runが存在するかどうかチェックしています。

前回分のDAG Runがあれば最新のDAG Runを一つ生成するだけですが、 前回分のDAG Runがない場合、履歴をたどっていって最も近い過去に実行されたDAG Runから現在時刻までの間のDAG Runをまとめて生成します。 過去に一度もDAG Runが生成されていない場合、DAGに定義されている start_date から現在時刻までのDAG Runsを一括生成します。

この動作を Airflow では Catchup と呼んでいます。

例えば、あるDAGを1日一回実行していましたが何らかの事情で1月3日分まででで止めてしまったとします。その後、2月1日にDAGの実行を再開すると、1月4日から2月1日の分のDAG Runが一気に生成されて順次実行されます。

f:id:livesense-analytics:20180131195311p:plain

DAGからexecution_dateを受け取って指定日1日分のデータを処理するようなフローの場合はこれで良いのですが、最新のデータだけを処理すれば良いフローの場合、DAG Runがたくさん生成されても困ってしまいます。

このような場合にはCatchupを無効にすることができます。方法は2つあります。

  1. airflow.cfg を書きかえてCatchupを無効にする

    全てのDAGでCatchupが無効になります

    設定するには、airflow.cfgを書き換えます

    [scheduler]

    catchup_by_default = True

  2. DAGの引数に指定する

    指定したDAG内だけでCatchupが無効になります

    catchup = True / catchup = False

実際にLAにAirflowを導入してみて

Airflowを導入して良かったところ

  • Airflow側で実行ログを保存してくれる トラブルの際にタスク実行サーバーにログインしてログを確認する必要がなくなりました

  • タスク毎の実行履歴が見られる 実行履歴からログが見られるので状況の把握や問題の原因究明がやりやすくなりました

  • 依存関係があるデータフロー処理タスクをDAGでコード記述できる 依存関係の記述がやりやすくなりました。 また、データ処理フローをコードで記述できるので、バージョン管理しやすくなりました

  • データフローをグラフィカルに確認できる 複雑なデータフローを記述する際に、依存関係に間違いがないか一目で確認できるようになりました

微妙だった所

  • DAGを消してもWebUIからdag_id名が消えない

    リネームしたDAG等が残ってしまいます。そういう仕様だと思って割り切って使っていますが、リネームする項目が増えた時にどうしようかと思ってしまいます

  • DAG導入のタイミングによっては、ExecutionDateとStartDateがズレてしまう

    残念ながら、この挙動については原因が追いきれてません

まとめ

今回はAirflow分散処理の概要についてご説明させて頂きました。以下にAirflowの最近の更新履歴概要の抄訳を付録としまして置きましたので最近の状況についてご確認いただければと思います。

付録:Airflow 最近の更新状況

Apache Airflowはこの1年で2回の大きなアップデート(version 1.9, version 1.8)を行っています。 以前とは設定項目などが色々変わっていますので、以前利用されていた方も最新の状況を確認されるのが良いと思います。

incubator-airflow/UPDATING.md at master · apache/incubator-airflow · GitHub

以下、抄訳です。

次期バージョン(予定)

Celeryコンフィギュレーションパラメータ名が変更になります

  • celeryd_concurrency から worker_concurrency に変更
  • celery_result_backend から result_backend に変更

GCP Dataflow Operators

  • Dataflow job labeling が Dataflow {Java,Python} Operator でサポートされます

Airflow 1.9

リリース 2017/12

SSH HookがサブプロセスベースのSSHからParamikoライブラリを使用したものに切替え

  • SSHExecuteOperatorは廃止になりました。SSHOperatorを使用して下さい
  • 併せて、SFTPOperatorが追加されました

airflow.hooks.S3_hook.S3Hookがboto2からBoto3を使うようになりました

  • s3_conn_idは受け付けなくなりました。aws_conn_idを使って下さい
  • デフォルトコネクションがs3_defaultからaws_defaultに変更

ログシステムの作り変え

  • pythonのloggingモジュールを使うように変更されました

  • 既存のLoggingMixinクラスを拡張することでロギングをカスタマイズできるようになりました

Dask Executor の導入

  • Dask分散クラスタでタスクを実行できるようになりました

Airflow 1.8.1

リリース 2017/5

パッケージ名がairflowからapache-airflowに変更されました

Airflow 1.8

リリース 2017/3

metadataデータベースの構造が変更されたので、バージョンアップするにはmetadataのアップグレードが必要になりました

  • metadataデータベースをアップグレードするには、airflow upgradedb を実行する必要があります

poolの管理がより厳密になりました

  • バージョン1.7.1では、許可されている以上の数のプールを取得できる問題がありましたが、今回のバージョンからできなくなりました

スケジューラーのオプションが新しくなりました

  • child_process_log_directory

    • 堅牢性を向上するために、DAGSの処理はスケジューラーとは別のプロセスを立てて実行するようになりましたのでそれぞれがログを出力します
    • それぞれのDAGS処理プロセスが、Airflow.cfg上で設定した child_process_log_directory 以下にログファイルを出力します
  • コマンドラインオプション num_runs の意味が変わりました

    • スケジューラーの最大リトライ回数 から run_duration時間内の最大リトライ回数に変更されました
  • min_file_process_interval

    • 省略
  • dag_dir_list_interval

    • 省略
  • catchup_by_default

    • 本ドキュメントで触れました

DAG処理中のエラーがWebUIに表示されなくなりました

  • DAG処理エラーは airflow.cfgで設定されるchild_process_log_directory 以下に出力されます

新しく登録したDAGはデフォルトでポーズ状態になります

  • Airflow.cfg 内に dags_are_paused_at_creation = False と記述することで以前のふるまいに戻すことができます。

コンテキスト変数が Hive config に渡せるようになりました

Google Cloud Operator と Hook が整理されました

deprecated扱いになり、将来的に削除される機能があります

  • HookとOperatorはairflowオブジェクトから直接参照できなくなります。サブモジュールからimportして下さい。
  • Operator._init_()は任意の引数を許容しなくなります
  • secure_modeはデフォルトでTrueになります

*1:(実際に分散処理を行う場合にはwebserverとschedulerで一台、workerだけのマシン複数台で運用すると思いますが)

*2:具体的なディストリビューション等についてはここでは言及しません

*3:当然ですが metadata database としてローカルマシンのデータベースを使用している場合 webserver と worker を別々のサーバーで動かすことは出来ません

*4:Airflowは分散マシン間のDAGファイルの同期について一切面倒を見てくれません。workerを複数動かす場合、バージョン管理ツールや構成管理ツール等を用いて、各workerサーバーのDAGファイルの中身をユーザーが同期しておく必要があります

*5:スケジューラーはairflow.cfgのmin_file_process_intervalに設定されている間隔で更新をしていますが、この設定値には下限があって、デフォルト値の0設定でも3分間隔になるようです。また、新しいファイルの監視の設定はdag_dir_list_intervalでこちらはデフォルト5分間隔で管理しているようです。