LIVESENSE Data Analytics Blog

リブセンスのデータ分析、機械学習、分析基盤に関する取り組みをご紹介するブログです。

Argo によるコンテナネイティブなデータパイプラインのワークフロー管理

データプラットフォームグループの野本です。主に機械学習基盤の構築やそれにまつわるアプリケーションの開発をしています。 以前までの記事で現在 Kubernetes を利用して機械学習基盤の構築を進めているという紹介をしましたが、機械学習システムに付きものだと思われるワークフローのジョブ管理に Argo という Kubernetes 上で動作するワークフローエンジンを導入し使いはじめまてみました。まだ色々試している段階でもあるのですが現状でどんな感じで使っているのか紹介してみようと思います。

ワークフローエンジンの選定に関して

現在機械学習基盤では先に紹介した以前の記事 や マルチコンテナ構成による機械学習アルゴリズムとアプリケーションの疎結合化 のような形で機械学習システムの構築を進めています。特に後者の具体例のように各アプリケーションを疎結合にうまく動かせるように出来るのが理想です。 これらを踏まえ以下のような点を重視してワークフローエンジンを検討しました。

コンテナによる実行が手軽に出来る

  • 各ジョブで利用する言語をジョブに適したものだったり担当者が得意なものに柔軟に変えることが出来る
  • 複数のシステムでコアとなるコンテナをライブラリ的に利用したい

ワークフローを宣言的に記述出来る

  • ワークフローの定義をファイルとしてバージョン管理したい
    • UI での設定よりも再現性がある
    • GitOps のような自動化を導入しやすい

出来るだけ手軽に導入 / 運用出来る

  • 依存するミドルウェアなどが極力少ない
  • 学習コストが低い

人員的なリソースや基盤構築のフェーズにもよると思いますが、現状は以上のようなことを念頭にワークフローエンジンの導入を考えていたところ、Argo - The Workflow Engine for Kubernetes というプロダクトと遭遇しました。

Argo とは

f:id:livesense-analytics:20180712183339p:plain

Argo とは Kubernetes 上で動作するコンテナネイティブなワークフローエンジンです。Kubernetes の CRD (Custome Resource Definition) として実装されており、標準の Deployment などの Kubernetes リソースと同等に yaml ファイルなどでワークフローの定義を管理出来るようになっています。

コンテナネイティブなワークフローエンジン

  • 各ジョブは Pod により実行される
  • ジョブ内 sidecar コンテナを利用可能
  • Kubernetes の他のリソースと同様に ConfigMap や Volume を参照可能
  • ダッシュボード用の argo-ui という Deployment の存在
    • 現状は実行済みワークフローの参照のみ

Kubernetes の CRD として実装されている

  • インストールされるのは workflow-controller と argo-ui の Deployment と argo-ui 用の Service、ConfigMap / Secrets で他のミドルウェアなどは必要としない
  • kubectl コマンドで他のリソースと同等に扱える

yaml によるワークフローの定義

  • 各ジョブはKubernetes 標準の Pod の Spec とほぼ同等の記述で設定出来る
  • 並列処理などかなり柔軟なワークフローの設定が可能

選定基準のところと対になるような記述となりましたが、Argo はちょうど今求めているようなワークフローエンジンであり、より手軽にコンテナベースのワークフローを構築出来そうだと判断し導入を進めることとしました。

ちなみにデータ分析基盤チームの方では AirFlow を本格的に利用しています

導入してみた

現在は主に機械学習アプリケーションのワークフローとして利用しています。一例として以下のようなケースで利用しています。

  1. Python によりデータ分析基盤からデータを取得し後続のアルゴリズムによる計算のタスクに渡す形に整形
  2. R によるアルゴリズムで計算し結果を出力
  3. Python により出力結果を実データに変換しデータベースに登録

これら各タスクは全て独立したコンテナで動作しているため、共通の Volume をマウントすることにより各タスク間のデータの受け渡しを行っています。

また、このワークフローを定期実行するために Kubernetes の CronJob を利用しています。現状の Argo では単独で定期実行などのイベントによるトリガーが実装されていません。なので、CronJob による Job で argo submit を実行することにより定期実行を行っています。失敗時などの再実行は Kubernetes v1.10 より実装された kubectl create job <Job Name> --from=cronjob/<CronJob Name> にて手軽に実行出来ます。

どのように運用しているか

実際にどう設定してどのように動かしているのか、簡単なサンプルをベースに説明してみます。

ワークフローの設定

以下のようなフローを構築してみます。

  1. Python 可変長の値を生成 (コンテナイメージ: task-python:1.0)
  2. R により 1 で生成した数値を処理して出力 (コンテナイメージ: task-r:1.0)
  3. Python により 2 で出力した数値を処理する (コンテナイメージ: task-python:1.0)

また、2, 3 は 1 で生成した数値の個数分並列に実行させてみます。 Argo の設定ファイルは以下のようになります。

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: workflow-sample-
spec:
  entrypoint: workflow-sample-steps
  arguments:
    parameters:
      - name: max
        value: 5
  templates:
    - name: workflow-sample-steps
      steps:
      - - name: task1
          template: task-python-pre
          arguments:
            parameters:
            - name: max
              value: "{{workflow.parameters.max}}"
      - - name: task2-3
          template: nested-steps
          arguments:
            parameters:
            - name: result1
              value: "{{item}}"
          withParam: "{{steps.task1.outputs.parameters.results}}"

    - name: nested-steps
      inputs:
        parameters:
          - name: result1
      steps:
      - - name: task2
          template: task-r
          arguments:
            parameters:
            - name: param1
              value: "{{inputs.parameters.result1}}"
      - - name: task3
          template: task-python-post
          arguments:
            parameters:
            - name: param1
              value: "{{steps.task2.outputs.parameters.result}}"

    - name: task-python-pre
      inputs:
        parameters:
          - name: max
      container:
        image: task-python:1.0
        command: [sh, -c]
        args: ["python pre-task.py {{inputs.parameters.max}}  > /tmp/results"]
      outputs:
        parameters:
          - name: results
            valueFrom:
              path: /tmp/results

    - name: task-r
      inputs:
        parameters:
          - name: param1
      container:
        image: task-r:1.0
        command: [sh, -c]
        args: ["Rscript task.R {{inputs.parameters.param1}} > /tmp/result"]
      outputs:
        parameters:
          - name: result
            valueFrom:
              path: /tmp/result

    - name: task-python-post
      inputs:
        parameters:
          - name: param1
      container:
        image: task-python:1.0
        command: [sh, -c]
        args: ["python post-task.py {{inputs.parameters.param1}}"]

設定ファイルの解説

  • ワークフロー / コンテナを templates として定義する
    • ワークフローテンプレート (step)
      • name: workflow-sample-steps
      • name: nested-steps
    • 実行コンテナテンプレート (container)
      • name: task-python-pre
      • name: task-r
      • name: task-python-post
  • step は入れ子で定義出来る
    • ここでは task1 で複数の数値を生成、以降のタスクを並列に実行させるため入れ子の step を定義している
  • 各タスクの入出力はファイル経由で行っている
  • task1 の出力は [1,2,3,4] のような JSON の配列になっていて、withParam により配列をパースし以降の処理に 1 つずつ渡すことで並列処理を行うようになっている
  • spec 直下の arguments に設定しているグローバルなパラメータ(ここでは max)は argo submit workflow.yml -p max=30 のように実行時に値を上書き出来る

各ジョブの実際の内容は DB へのアクセスやデータ分析基盤である RedShift へのアクセス、GCS へのバックアップなどの処理がありますが、ワークフローとしては先程上げた導入例とほぼ同じような流れとなっています。

上記ワークフローを実際に実行した際のダッシュボードは以下のようになっています。

f:id:livesense-analytics:20180723144631p:plain

f:id:livesense-analytics:20180723144643p:plain

各ジョブがどのように実行されどのくらい時間がかかったか、その際のログ出力などが確認出来ます。

CronJob の設定

CronJob でワークフローを定期実行するにあたり、以下のような運用を行っています。

Argo クライアント

Kubernetes クラスタ内から argo コマンドを実行するために Argo クライアント専用のコンテナを作成しています。複数のアプリケーションで Argo を利用することになりますが、Argo 本体とのバージョンの整合性を保ちやすくするなどの理由で共通のコンテナとしています。

ワークフロー定義ファイル

先程設定したワークフロー定義の yaml ファイルはジョブのコンテナに含める運用にしています。Kubernetes の ConfigMap に設定したり GCS に置いておくなどの方法もありそうですがジョブのコンテナ (= ジョブと同一のリポジトリ) に配置しておくことでジョブのロジックと一緒に管理出来るの方が今のところ扱いやすいためこのような方法を取っています。 今回はワークフローの定義(workflow.yml)は Python のコンテナに同梱するような構成で進めてみます。

これらを踏まえ CronJob の設定は以下のようになります。

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: argo-cronjob-sample
spec:
  schedule: "0 * * * *"
  jobTemplate:
    spec:
      backoffLimit: 0
      template:
        spec:
          restartPolicy: Never
          volumes:
            - name: workdir
              emptyDir: {}
          initContainers:
            - name: init-workflow
              image: task-python:1.0
              command: ["sh", "-c"]
              args: ["cp /workflow.yml /mnt/work/workflow.yml"]
              volumeMounts:
                - name: workdir
                  mountPath: /mnt/work
              securityContext:
                runAsUser: 0
          containers:
            - name: exec-workflow
              image: argo-cli:1.0
              command: ["sh", "-c"]
              args: ["argo submit /mnt/work/workflow.yml"]
              volumeMounts:
                - name: workdir
                  mountPath: /mnt/work

CronJob 実行時の処理の流れは

  1. initContainerstask-python コンテナにある workflow.yml を取り出しマウントした volume に配置
  2. containers によるメイン処理にて 1 で展開した定義ファイルを引数に argo submit

となっています。改めて workflow.yml をどこで扱うかは悩ましいのですがよりアプリケーションに近いところで管理しておくことでワークフローと処理内容を一緒に参照出来る形にしています。

まとめ

このように Argo は当初から欲しかった

  • コンテナによる実行が手軽に出来る
    • Kubernetes ベースの実装のため Kubernetes のリソースとの親和性も高い
  • ワークフローを宣言的に記述出来る
  • 出来るだけ手軽に導入 / 運用出来る

という部分の要求は十分に満たしています。また現時点で使っている機能はほんの一部で、 argo リポジトリの example にあるようにかなり多様で柔軟なワークフローを構築出来る点も魅力です。

反対に、使ってみて現時点での課題と思うところは以下です。

  • 柔軟で複雑な処理が書ける反面 yaml も複雑になりがち
    • 今回は使っていないが dag での実行も実装されているのでこれを利用すればより簡潔 / 柔軟に記述出来そう
  • ダッシュボード (Argo UI)
    • 現状実行済みワークフローの参照程度。定義済みのワークフローの表示や再実行など出来るようになると良さそう
    • 実行済みワークフローが沢山あると描画がめちゃくちゃ重い
  • 実行済みワークフローの削除が自動で行われず自前で管理しなければならない
    • CronJob の 実行済み Job のように古いものは自動で消えて欲しい気もする
  • やはりスケジュール実行 / イベントトリガーが欲しい

スケジュール実行やイベントトリガーは Argo Events というプロジェクトが開発中で Calendar(Cron) や Webhook の機能が見受けられます。 また、Argo CD / Argo CI というプロジェクトも進行中で、 Argo による手軽なワークフローを CI / CD に利用出来るのはかなり便利になるのではと思っています。