LIVESENSE Data Analytics Blog

リブセンスのデータ分析、機械学習、分析基盤に関する取り組みをご紹介するブログです。

LivesenseDataAnalyticsBlog

MENU

分析基盤のバッチ処理構成を考える

データプラットフォームチームの橋本です。 日頃は分析基盤LivesenseAnalytics(LA)の保守・運用を担当しているエンジニアです。最近は専ら、バッチ処理のリファクタリングを懇々と進めていました。今回はその内容をまとめてみます。

分析基盤のバッチ処理

LAのバッチ処理基盤についてはこのへんを見ていただくとして、私が着手しているのはその中でもairflow-workerと呼称しているバッチ処理の集まりになります。その昔、Airflow導入以前に、cronでバッチ処理を運用していた頃から使い続けているrakeタスクの集まりで、類似の分析基盤を保守しているプロジェクトではよくある感じのバッチ処理集になっています。AirflowはDAGで設定された定時、ないしは任意のタイミング(ユーザーからのキック)で、airflow-workerのrakeタスクを起動します。この時、起動されるrakeタスクのIOは、概ね以下のようになります。

f:id:livesense-analytics:20180611112817p:plain

ところで、このairflow-worker。かつて、cronで運用されていた時代から引き継いだタスクの集まりで、チーム内では追加・修正の頻度が最も多いコードの1つなのですが、その分歴史と修正の積み重ねも厚く、多くの問題を抱えるようになっていました。具体的には、

  • 処理がrake直書きになっていて、テストが書けない
  • コンポーネントの配置やディレクトリ構造、命名に一貫性がなく、見通しが悪い
    • コードリーディングで繰り返される grep -r
    • 違う名前で機能が重複する実装
  • DRYじゃない・繰り返されるコード
    • task_env = ENV["task_env"] || "development"
    • エラーハンドリングと通知
  • dryrunできない・実装を逐一書かねばならない

などなどです。もともと素朴な定時処理から始まっているコードであるがゆえ、リリースの即時性に重きをおいて運用してきたため無理もないのですが、このまま可読性・保守性を犠牲にして拡張を続けると、

  • 新規参入エンジニアの立ち上がりコスト増加・属人化
  • 拡張コストの増加
  • バグの温床化

といったデメリットが考えられたので、思い切って大規模なリファクタリングを敢行しました。

リファクタリングの方針と改善のポイント

まず次の2点を大まかな方針としました。

  • 細かい問題点はたくさんあるが、大きな問題を優先して期間内に終わらせる
  • 理想形には近づけるが、完璧にしない

リファクタリングの作業自体は重要な仕事ですが、理想形を目指し続けると際限がないので、とにかく大きな問題点をザックリ改善し、今後の継続的なコード改善への足がかりにする方針としました。 上記のIOの図の通り、定時のバッチ処理は概ね

  • 日時やタスク名、スキーマ名などのパラメータを受けてタスクがキックされる
  • テンプレートからクエリを発行したり、日付の範囲をとったAPIに投げるパラメータを作る
  • 実際に処理を行うマネージドサービスのAPIを呼び出す、あるいはクエリを投入する

の3ステップで構成されています。そこで、バッチ処理のコンポーネントを図のように3つに分割しました。

f:id:livesense-analytics:20180611124506p:plain

改善のポイントは

  • コンポーネントの構成を整理
  • rakeタスクの処理を別のクラスへ委譲:rakeタスクとLATaskクラス
    • バッチ処理のインターフェースとロジックを分離する
    • ロジックに対してユニットテストを導入する余地を作る
  • APIアクセスをFacadeにまとめる
    • 可能な限りAPIアクセスをdryrunできるようにする
  • 分割しにくい部分や広域なスコープで共通化された処理等は、思い切ってUtilsとして切り出す

としました。以下、その内容をもう少し具体的に見ていきます。

クラスとパッケージの構成

f:id:livesense-analytics:20180530163538p:plain

クラス構成はザックリ上記の様な感じにしました。モジュールは大きく4つに分けて、以下の構成にしました。

  • API
    • API呼び出しを行うFacadeのモジュール
    • dryrunの機能をここに集約する
    • このパッケージのクラスは状態を保持しないようにする
  • LATask
    • 処理の実装を持つクラスのモジュール
    • 親クラスにエラーハンドリングやロギング、環境変数の参照などを集約する
  • rakeタスク
    • rakeタスクは各々のタスクに対応するLATaskクラスへ処理を委譲する
    • LATaskのインスタンスを呼び出し、パラメータを渡して処理を実行するだけ
  • Utils
    • バッチ処理全体で頻発する処理や、分割しきれなかった塊をとりあえず置いておくモジュール
    • 状態を保持しない、関数の集まりにしておく

以下、rakeタスクからの処理フローを、サンプルのソースコード上で追いかけてみます。

rakeタスク

require 'la_tasks/foo/foo_task'

namespace :foo do
  desc 'Foo関連のタスク'
  task :foo_task, %i(schema table column) do |task, args|
    LATask::Foo::FooTask.new(task).foo_task(
      args[:schema],
      args[:table],
      args[:column]
    )   
  end 
end

rakeタスク上では引数のチェックすらせず、LATaskクラスに処理を丸投げします。この様にrakeタスクの名前と実装を切り離しておくと、後から実装の差し替えや改名等がしやすくなります。

LATaskクラス

LATask - 親クラス

require 'path/to/messenger'

module LATask
  class Base
    def initialize(task_name)
      @messenger = Messenger.new
      @task_env = ENV['TASK_ENV'] || 'development'
      fail "Wrong TASK_ENV: #{@task_env}" unless ['production', 'development'].include? @task_env
      @task_name = "#{task_name}:#{@task_env}"
    end
        
    protected
        
    def exec
        notify("Task Start - #{Time.now.strftime('%Y-%m-%d %H:%M:%S')}")
        yield
        notify("Task End - #{Time.now.strftime('%Y-%m-%d %H:%M:%S')}")
      rescue => e 
        error_action(e)
        @messenger.alert("#{@task_name}: #{e}", e)
    end
        
    def error_action(e)
      # 何かあればoverrideして使う
    end 
        
    def alert(message, e)
      @messenger.alert("#{@task_name}: #{message}", e)
    end
  end
end

LATaskクラスの親クラスには、ロギングやエラーハンドリング、通知、環境変数チェック等、全タスク共通で頻発する処理をシンプルにまとめておきました。これにより、子クラスの実装がかなりスッキリしました。

LATask - 子クラス

require 'api/aws/sqs'

class LATask::Foo::FooTask < LATask::Base
  def initialize(task_name)
    super(task_name)
    @sqs = API::AWS::SQS.new
  end

   def foo_task(schema, table, column)
    exec do
      # ここに処理本体を記述
      alert("task_env is #{@task_env}")
      @sqs.send_msg(@task_name)
    end
  end

  def bar_task
    exec do
      # ここに処理本体を記述
    end
  end

  private

  def private_func
    # このタスク限定の処理など
  end
end

LATaskの子クラスに、タスクの実装を書きます。この時、execに渡すプロック内で全てのタスクの処理を記述することにより、ロギング、エラーハンドリングを親クラスの実装に一元化できます。パラメータのチェックを行う場合も、ここに記述します。また、タスクのロジックをrakeタスクにベタ書きせず、クラスに切り出した事で、各々の関数に対してテストが書けるようになりました。

APIクラス

API - 親クラス

require 'utils/utils'                                                                                                 

module API                                                                                                             
  include Utils
  def check_dry_run_then(param)                                                                                        
    if dry_run?                                                                                                        
      puts "### DRY_RUN ###"
      pp param
    else
      yield
    end
  end                                                                                                                  
end
equire 'api/api'

module API 
  module AWS 
    include API 
    require 'aws-sdk-core'
  end 
end

APIの親クラスに、dryrunの処理を実装しています。こうすることで、LATaskのロジック本体からdryrunに関連する処理を取り除けるようになり、コードが簡潔になりました。

API - 子クラス

require 'api/aws'                                                                                                     
require 'yaml'                                                                                                         
                                                                                                                       
class API::AWS::SQS                                                                                                    
  include API::AWS                                                                                                     
                                                                                                                       
  def initialize(region = AWS_REGION)                                                                                  
    @sqs = Aws::SQS::Client.new(                                                                                       
      region: region,                                                                                                  
      access_key_id: AWS_ACCESS_KEY_ID,                                                                         
      secret_access_key: AWS_SECRET_ACCESS_KEY                                                          
    )                                                                                                                  
    @conf = YAML.load_file('config/aws.yml')[task_env][:sqs]                                                    
  end                                                                                                                  
                                                                                                                       
  def send_msg(data)                                                                                                   
    check_dry_run_then("[sqs.send_msg] #{data}") do                                                                     
      @sqs.send_message(data)                                                      
    end                                                                                                                
  end                                                                                                                  

  private

  AWS_REGION = "ap-northeast-1"
end

check_dry_run_thenのブロックで、APIにアクセスしています。dryrunの際にはこのブロックの処理が実行されず、代わりにcheck_dry_run_thenの引数で与えられる文字列が表示されます。他のAPIアクセスについても同様に記述しておくことで、簡単にdryrunが実装できます。

Utilクラス

module Utils
  def dry_run?
    !ENV['DRY_RUN'].nil?
  end
end

システム横断でよく使われる処理をまとめた関数群ですが、なるべく使わないようにしています。

リファクタリングを途中までやってみて

下記の様なメリットを感じつつあります。

  • 全体が一貫性を持った構成、書き方で統一できた(可読性・保守性の向上)
  • テストを書く余地ができた
  • ディレクトリ構成でコンポーネントが一目瞭然(もうgrepしない)
  • LATaskクラスにシンプルなロジックを記述するだけでも、概ね全部のタスクにdryrunモードが実装できそう
  • エラーハンドリングとロギングが集約できた

当初、課題として挙げていた部分はまずまず解決できそうな見込みです。一方で、

  • まだイマイチな実装が細々とたくさん残ってる
  • 本当は思い切って名前を変えて、タスクの分類体系も再編したい
  • 想定通りとは言え、分解しきれずUtilsに残ってしまった塊

といった点が、課題として見えてきました。

まとめ

サンプルにある通りかなりシンプルな実装ですが、コードの見通しの良さを保ったまま課題としていた事を概ね克服しつつあるので、良かったと感じています。

  • 実装の難易よりも、課題をシンプルに解決できる事が大切
  • 課題の本質を、しっかり見定めることが同様に大切

という学びとリファクタリングの経験を得られました。新しいことも奇抜なこともないリファクタリングでしたが、よい学びと経験が得られたと感じています。