データプラットフォームチームの橋本です。 日頃は分析基盤LivesenseAnalytics(LA)の保守・運用を担当しているエンジニアです。最近は専ら、バッチ処理のリファクタリングを懇々と進めていました。今回はその内容をまとめてみます。
分析基盤のバッチ処理
LAのバッチ処理基盤についてはこのへんを見ていただくとして、私が着手しているのはその中でもairflow-workerと呼称しているバッチ処理の集まりになります。その昔、Airflow導入以前に、cronでバッチ処理を運用していた頃から使い続けているrakeタスクの集まりで、類似の分析基盤を保守しているプロジェクトではよくある感じのバッチ処理集になっています。AirflowはDAGで設定された定時、ないしは任意のタイミング(ユーザーからのキック)で、airflow-workerのrakeタスクを起動します。この時、起動されるrakeタスクのIOは、概ね以下のようになります。
ところで、このairflow-worker。かつて、cronで運用されていた時代から引き継いだタスクの集まりで、チーム内では追加・修正の頻度が最も多いコードの1つなのですが、その分歴史と修正の積み重ねも厚く、多くの問題を抱えるようになっていました。具体的には、
- 処理がrake直書きになっていて、テストが書けない
- コンポーネントの配置やディレクトリ構造、命名に一貫性がなく、見通しが悪い
- コードリーディングで繰り返される
grep -r
- 違う名前で機能が重複する実装
- コードリーディングで繰り返される
- DRYじゃない・繰り返されるコード
task_env = ENV["task_env"] || "development"
- エラーハンドリングと通知
- dryrunできない・実装を逐一書かねばならない
などなどです。もともと素朴な定時処理から始まっているコードであるがゆえ、リリースの即時性に重きをおいて運用してきたため無理もないのですが、このまま可読性・保守性を犠牲にして拡張を続けると、
- 新規参入エンジニアの立ち上がりコスト増加・属人化
- 拡張コストの増加
- バグの温床化
といったデメリットが考えられたので、思い切って大規模なリファクタリングを敢行しました。
リファクタリングの方針と改善のポイント
まず次の2点を大まかな方針としました。
- 細かい問題点はたくさんあるが、大きな問題を優先して期間内に終わらせる
- 理想形には近づけるが、完璧にしない
リファクタリングの作業自体は重要な仕事ですが、理想形を目指し続けると際限がないので、とにかく大きな問題点をザックリ改善し、今後の継続的なコード改善への足がかりにする方針としました。 上記のIOの図の通り、定時のバッチ処理は概ね
- 日時やタスク名、スキーマ名などのパラメータを受けてタスクがキックされる
- テンプレートからクエリを発行したり、日付の範囲をとったAPIに投げるパラメータを作る
- 実際に処理を行うマネージドサービスのAPIを呼び出す、あるいはクエリを投入する
の3ステップで構成されています。そこで、バッチ処理のコンポーネントを図のように3つに分割しました。
改善のポイントは
- コンポーネントの構成を整理
- rakeタスクの処理を別のクラスへ委譲:rakeタスクとLATaskクラス
- バッチ処理のインターフェースとロジックを分離する
- ロジックに対してユニットテストを導入する余地を作る
- APIアクセスをFacadeにまとめる
- 可能な限りAPIアクセスをdryrunできるようにする
- 分割しにくい部分や広域なスコープで共通化された処理等は、思い切ってUtilsとして切り出す
としました。以下、その内容をもう少し具体的に見ていきます。
クラスとパッケージの構成
クラス構成はザックリ上記の様な感じにしました。モジュールは大きく4つに分けて、以下の構成にしました。
- API
- API呼び出しを行うFacadeのモジュール
- dryrunの機能をここに集約する
- このパッケージのクラスは状態を保持しないようにする
- LATask
- 処理の実装を持つクラスのモジュール
- 親クラスにエラーハンドリングやロギング、環境変数の参照などを集約する
- rakeタスク
- rakeタスクは各々のタスクに対応するLATaskクラスへ処理を委譲する
- LATaskのインスタンスを呼び出し、パラメータを渡して処理を実行するだけ
- Utils
- バッチ処理全体で頻発する処理や、分割しきれなかった塊をとりあえず置いておくモジュール
- 状態を保持しない、関数の集まりにしておく
以下、rakeタスクからの処理フローを、サンプルのソースコード上で追いかけてみます。
rakeタスク
require 'la_tasks/foo/foo_task' namespace :foo do desc 'Foo関連のタスク' task :foo_task, %i(schema table column) do |task, args| LATask::Foo::FooTask.new(task).foo_task( args[:schema], args[:table], args[:column] ) end end
rakeタスク上では引数のチェックすらせず、LATask
クラスに処理を丸投げします。この様にrakeタスクの名前と実装を切り離しておくと、後から実装の差し替えや改名等がしやすくなります。
LATaskクラス
LATask - 親クラス
require 'path/to/messenger' module LATask class Base def initialize(task_name) @messenger = Messenger.new @task_env = ENV['TASK_ENV'] || 'development' fail "Wrong TASK_ENV: #{@task_env}" unless ['production', 'development'].include? @task_env @task_name = "#{task_name}:#{@task_env}" end protected def exec notify("Task Start - #{Time.now.strftime('%Y-%m-%d %H:%M:%S')}") yield notify("Task End - #{Time.now.strftime('%Y-%m-%d %H:%M:%S')}") rescue => e error_action(e) @messenger.alert("#{@task_name}: #{e}", e) end def error_action(e) # 何かあればoverrideして使う end def alert(message, e) @messenger.alert("#{@task_name}: #{message}", e) end end end
LATaskクラスの親クラスには、ロギングやエラーハンドリング、通知、環境変数チェック等、全タスク共通で頻発する処理をシンプルにまとめておきました。これにより、子クラスの実装がかなりスッキリしました。
LATask - 子クラス
require 'api/aws/sqs' class LATask::Foo::FooTask < LATask::Base def initialize(task_name) super(task_name) @sqs = API::AWS::SQS.new end def foo_task(schema, table, column) exec do # ここに処理本体を記述 alert("task_env is #{@task_env}") @sqs.send_msg(@task_name) end end def bar_task exec do # ここに処理本体を記述 end end private def private_func # このタスク限定の処理など end end
LATaskの子クラスに、タスクの実装を書きます。この時、exec
に渡すプロック内で全てのタスクの処理を記述することにより、ロギング、エラーハンドリングを親クラスの実装に一元化できます。パラメータのチェックを行う場合も、ここに記述します。また、タスクのロジックをrakeタスクにベタ書きせず、クラスに切り出した事で、各々の関数に対してテストが書けるようになりました。
APIクラス
API - 親クラス
require 'utils/utils' module API include Utils def check_dry_run_then(param) if dry_run? puts "### DRY_RUN ###" pp param else yield end end end
equire 'api/api' module API module AWS include API require 'aws-sdk-core' end end
APIの親クラスに、dryrunの処理を実装しています。こうすることで、LATaskのロジック本体からdryrunに関連する処理を取り除けるようになり、コードが簡潔になりました。
API - 子クラス
require 'api/aws' require 'yaml' class API::AWS::SQS include API::AWS def initialize(region = AWS_REGION) @sqs = Aws::SQS::Client.new( region: region, access_key_id: AWS_ACCESS_KEY_ID, secret_access_key: AWS_SECRET_ACCESS_KEY ) @conf = YAML.load_file('config/aws.yml')[task_env][:sqs] end def send_msg(data) check_dry_run_then("[sqs.send_msg] #{data}") do @sqs.send_message(data) end end private AWS_REGION = "ap-northeast-1" end
check_dry_run_then
のブロックで、APIにアクセスしています。dryrunの際にはこのブロックの処理が実行されず、代わりにcheck_dry_run_then
の引数で与えられる文字列が表示されます。他のAPIアクセスについても同様に記述しておくことで、簡単にdryrunが実装できます。
Utilクラス
module Utils def dry_run? !ENV['DRY_RUN'].nil? end end
システム横断でよく使われる処理をまとめた関数群ですが、なるべく使わないようにしています。
リファクタリングを途中までやってみて
下記の様なメリットを感じつつあります。
- 全体が一貫性を持った構成、書き方で統一できた(可読性・保守性の向上)
- テストを書く余地ができた
- ディレクトリ構成でコンポーネントが一目瞭然(もうgrepしない)
- LATaskクラスにシンプルなロジックを記述するだけでも、概ね全部のタスクにdryrunモードが実装できそう
- エラーハンドリングとロギングが集約できた
当初、課題として挙げていた部分はまずまず解決できそうな見込みです。一方で、
- まだイマイチな実装が細々とたくさん残ってる
- 本当は思い切って名前を変えて、タスクの分類体系も再編したい
- 想定通りとは言え、分解しきれずUtilsに残ってしまった塊
といった点が、課題として見えてきました。
まとめ
サンプルにある通りかなりシンプルな実装ですが、コードの見通しの良さを保ったまま課題としていた事を概ね克服しつつあるので、良かったと感じています。
- 実装の難易よりも、課題をシンプルに解決できる事が大切
- 課題の本質を、しっかり見定めることが同様に大切
という学びとリファクタリングの経験を得られました。新しいことも奇抜なこともないリファクタリングでしたが、よい学びと経験が得られたと感じています。