はじめに
実際に運用していた時に非同期にしていた主な処理は下記のようなものがあります。
- iOS Android の push 通知の送信処理
- ログの作成
- 様々な外部 API の呼び出し
- 非同期で更新しても問題ないデータの更新
Sidekiq is なに
sidekiqは非同期処理を実現する gem
他にも Ruby で非同期処理を実現できる有名な gem には
resque や delayed_job 等がある。
sidekiq.org
Enterprise版等もありますが、
今回はOSS版を使用している前提でのお話しです。
他の非同期処理が可能な gem との簡単な比較
FAQ · mperham/sidekiq Wiki · GitHub
この内容は結構真実を語っていることを最近知った
Sidekiq
- Redis
- マルチスレッド
- リトライ処理あり
- おしゃれなダッシュボード
Resque
- Redis
- ジョブごとにフォーク
- リトライ処理なし
- Sidekiqに比べると簡素なダッシュボード
Delayed Job
- DB(専用テーブル作成)
- リトライ処理あり
- delay method が便利
- 基本的にDBを使うので導入簡単
実装例
Redis は既にセットアップ済みとする
sidekiq を追加
$ vi Gemfile
+ gem 'sidekiq'
$ bundle install
config を追加
$ vi config/sidekiq.yml
:verbose: true
:pidfile: ./tmp/pids/sidekiq.pid
:logfile: ./log/sidekiq.log
:concurrency: 10 # worker process 数
:queues: # 処理するキュー名
- default
- user
env による設定の変更方法
production:
:concurrency: 25
staging:
:concurrency: 15
下記のように記述することで priority が設定できる
:queues:
- [user, 2]
- [default, 1]
worker file を作成
# UserWorker を作成
$ bundle exec rails g sidekiq:worker User
create app/workers/user_worker.rb
create test/workers/user_worker_test.rb
user_worker を編集
※今回は例としてわかりやすい処理にしています。
$ vim app/workers/user_worker.rb
- def perform(*args)
- # Do something
+ sidekiq_options queue: :user # キュー名指定がない場合は default になる
+ def perform(id, name)
+ user = User.find(id)
+ user.update(name: name)
キューを積む
$ bundle exec rails c
irb> UserWorker.perform_async(1, "test")
=> "f212bd8bae56c79467494ec8"
irb> User.find(1)
User Load (0.3ms) SELECT `users`.* FROM `users` WHERE `users`.`id` = 1 LIMIT 1
=>
Sidekiq を起動する
$ bundle exec sidekiq -C config/sidekiq.yml -d
$ bundle exec rails c
irb> User.find(1)
User Load (0.3ms) SELECT `users`.* FROM `users` WHERE `users`.`id` = 1 LIMIT 1
=>
これで簡単な実装例はおしまいです。
Sidekiq の enqueue の解説
Sidekiq が Redis にどのように queue を追加するかを解説する。
Sidekiq は使う method によって使う Redis のデータ型が違う
perform_asyncの場合
「セット型」と「リスト型」を使う。
1.セット型で queues という key で各ワーカーのキュー名を member として登録する
(実装例の場合は member に user を指定する)
2.リストの先頭に queue:キュー名 をキーにして string を追加する。
(実装例の場合は queue:user というキー名になる)
3.string は { class: worker class 名, arg: perform_asyncの引数配列, retry: retry機能が有効かのフラグ, queue: キュー名, jid: SecureRandom.hex(12) で生成された job id, created_at: 作成日, enqueued_at: キューに追加した時間 } この Hash を json にしたもの
以下が実装例での string の例
{ class: "UserWorker", args: [1, "test"], retry: true, queue: "user", jid: 6284ea2d3637756fc121b8f7, created_at: 1444571406.7752862, enqueued_at: 1444571406.7754705 }
"{\"class\":\"UserWorker\",\"args\":[1,\"test\"],\"retry\":true,\"queue\":\"user\",\"jid\":\"6284ea2d3637756fc121b8f7\",\"created_at\":1444571406.7752862,\"enqueued_at\":1444571406.7754705}"
perform_in の場合
「ソート済みセット型」を使う
1. ソート済みセット型で schedule という key で score に 実行時間を指定して member を登録する
2. member は { class: worker class 名, arg: perform_asyncの引数配列, retry: retry機能が有効かのフラグ, queue: キュー名, jid: SecureRandom.hex(12) で生成された job id, created_at: 作成日 } この Hash を json にしたもの
以下が実装例での member の例
{ class: "UserWorker", args: [1, "test"], retry: true, queue: "user", jid: 6284ea2d3637756fc121b8f7, created_at: 1444571406.7752862 }
"{\"class\":\"UserWorker\",\"args\":[1,\"test\"],\"retry\":true,\"queue\":\"user\",\"jid\":\"6284ea2d3637756fc121b8f7\",\"created_at\":1444571406.7752862}"
Redis について
Sidekiq を使う上で欠かせない Redis ですが、今回の記事で Redis のことまで深く説明すると
だいぶ長い記事になるので別記事として後日公開します。
工夫した点
ここからが本題の知見と言ってもいいかなという部分になります。
1. Redis の構成
当時開発していた Rails のアプリケーションでは、
session, sidekiq, cache に Redis を使用していました。
それぞれ別々の Redis にデータを保存していました。
別々にした理由は下記
- Redis はオンメモリ型のKVSなので容量的な制約が大きい(実メモリの約半分程度)
- 複数の用途で使い容量が設定によっては消えて欲しくないものが消える危険
- 1つの Redis で運用をし後々分割しようと思った際にデータを分けるのが面倒
構成
レプリケーションを組み,Redis Sentinelで自動フェイルオーバーするように設定を行う。
アプリケーション側は下記の記事で書きましたが, gem の redis-sentinel を導入
shiro-16.hatenablog.com
redis-sentinel を導入すると Rails は Redis Sentinel から現在の master 情報を取得し接続、
障害により接続に失敗した場合に 「Redis Sentinel から master 情報を取得し接続」を繰り返し
master が切り替わったタイミングで正常に接続ができるようになるという感じ
2. 引数は出来るだけ少なく
perform_async 等の引数は出来るだけ少なくしましょうという話
例として Twitter でファボられた時に push を送る処理を実装
(実際には1ユーザに複数のtokenが紐づく可能性がある)
def perform(token, message)
FavoritePush.send(token, message)
end
def perform(id)
favorite = Favorite.find(id)
message = FavoritePush.message(favorite)
FavoritePush.send(favorite.tweet.user.token, message)
end
何が良いのか?
・Redis の容量の圧迫を軽減出来る
・キューを積んでから実行されるまでに record が更新された場合にも対処出来る(今回の場合 token が更新されて push が送信出来ない問題を回避出来る)
・console 等からキューを積む場合に id を渡すだけなのでシンプル
・SQLの発行回数が増えるのですが primary key や index を使用した検索になることが多いと思われるので問題ないかと思われる。
DBが重くなるのであれば Slave を増やせばいいので Redis の容量問題に比べれば微々たるものだし、primary key や index を使用しても重いならそれは設計が(ry
・自分が運用していた時は引数は1〜3つでした。(例外が一部ありましたが)
※ ただし引数が多くなる場合も、勿論あります。
3. 判定処理等を worker 側に持たせる
今回は2の例に push を送信するかどうかを判定する処理を追加する
def perform(token, message)
FavoritePush.send(token, message)
end
def perform(id)
favorite = Favorite.find(id)
return unless favorite.tweet.user.send_push?
message = FavoritePush.message(favorite)
FavoritePush.send(favorite.tweet.user.token, message)
end
何が良いのか?
今回の場合はシンプルな判定だが、もっと複雑な判定を行う場面は多い
そうなった場合に特に有効
・キューを積む際に判定するより同期処理側の処理速度は速くなる
・今回の場合は message を生成する処理も worker 側で行うので同期処理側の処理速度は速くなる
・console 等からキューを積む場合に判定内容を気にしなくて良くなる
※ Redis に積むキューの数自体は増えてしまうので Redis の使用量は増えてしまいます
4. retry 機能に注意する
Sidekiq には retry 機能があるのでそのことを念頭に置いて実装を行う必要がある。
push 送信の処理に送信済みかフラグを持たせる
def perform(id)
favorite = Favorite.find(id)
return unless favorite.tweet.user.send_push?
message = FavoritePush.message(favorite)
FavoritePush.send(favorite.tweet.user.token, message)
favorite.update(send_push: true)
end
retry 機能があるのでもし favorite.update でエラーが起こった場合
retry されて再度実行されます。
なので favorite.update が正常に終了するまで同じ内容の push を送信し続けることになってしまいます。
例外をうまく扱うか update を別 worker として実装し push 送信後にキューを積む等の工夫が必要
※ retry を個別に off にすることも可能
5. record が削除されている場合も考慮する
Sidekiq だけの話ではないですが、
非同期処理なので処理が実行される際には既に対象の record が削除されている可能性も考えなければなりません。
4.の retry の話を念頭に置いて ActiveRecord::RecordNotFound 等の例外を上手く処理する必要があります。
6. batch 処理
「3.判定処理等を worker 側に持たせる」と同じような内容になるのですが、
batch 等であるテーブルの各 record に対して複雑な処理をする場合、
処理は全て worker 側で行ってしまおうということです。
例)全ユーザを対象に何かしらの計算を処理を行う batch があるとする
User.find_each do |user|
UserWorker.perform_async(user.id, result)
end
class UserWorker
def perform(id, result)
user = User.find(id)
user.update(result: result)
end
end
User.select(:id).find_each do |user|
UserWorker.perform_async(user.id)
end
class UserWorker
def perform(id)
user = User.find(id)
user.update(result: result)
end
end
良い点
・時間がかかる複雑な処理をマルチスレッドで行うことで処理速度が上がる
最後に
なぜこのような記事にまとめようと思ったのかというと
前々からまとめておこうとは思ったのですが、
最近 Sidekiq を使用していないので忘れてしまいそうだったことと、
最近 Sidekiq 感覚で Delayed Job を使用したら少々痛い目を見たという
辛い経験があったので非同期処理についてまとめておこうといった感じです。
また、ここに書いてあることが全て正しいわけではなく
自分の開発していたアプリケーションにはこのパターンが良かったというだけで、
これに沿ったからといって問題が全て解決するというわけではないと思います。
Sidekiq と Redis の特徴を理解して上手く付き合っていくのが良いかと思います。
自分は Sidekiq 好きなので贔屓目の記事になっているかもしれません。