resque-schedulerで予約済みのJobを複数削除
移転しました →
公開されているresque-schedulerで条件に該当するジョブをまとめて削除する方法が見当たらなかったのでハックしたという話です。
resque-schedulerはバックグラウンド用gemであるResqueをさらに拡張したもので、cronのような定期的な動作をさせたり、特定の時刻に動作をさせたりするためのgemです。
resque-schedulerのジョブを削除するために少し手をいれる必要があったのでそれについて書き残しておきます。
なお、この記事に書いてある内容はresque-scheduler 2.3.1時点のものであり、これ以降のバージョンでは以下に記す内容は必要なくなっている可能性があります。
Resqueのジョブ
まずResqueの概要ですが、Resqueでバックグラウンドジョブを登録するときはジョブを処理するクラスの名前と任意のパラメータを渡します。たとえばJobHandler
というジョブを処理するクラスがあって、パラメータを2つ受け取るとすると、クラス定義とジョブの登録はそれぞれ次のようになります。
# job handler class class JobHandler def self.perform(hoge_id,piyo_id) # do something in background... end end # enqueue def hoge hoge = Hoge.where( ... piyo = Piyo.where( ... Resque.enqueue(JobHandler, hoge.id, piyo.id) end
実際の動作はというと、まずResque.enqueue
では渡されたクラス名やパラメータをResque.encode
メソッドにより一つの文字列化したものをRedisに入れます。そうするとバックグラウンドで動いているプロセスがRedis内のデータを参照し、処理する必要があるジョブの文字列をResque.decode
メソッドでクラス名とパラメータに分解し直します。そして、そのクラスのperform
メソッドにパラメータを渡して実行するという流れになっています。
resque-schedulerも基本的には同じですが、時間を指定することができます。
Resque.enqueue_at(1.day.from_now, JobHandler, hoge.id, piyo.id)
スケジュールされたジョブを取りやめたい場合
resque-schedulerにはResque.enqueue_at
の時間指定以外の引数を与えることができるResque.remove_delayed
というメソッドがあります。このメソッドを使って指定したジョブを削除することができます。
def foo Resque.enqueue_at(1.day.from_now, JobHandler, 5, 10) end def bar Rescue.remove_delayed(JobHandler, 5, 10) end
こんな風にして、ジョブの追加や削除が可能です。
では、条件に該当する複数のジョブを一気に削除したいときはどうすればいいのでしょう。上の例で言えば、hoge_id=5
となる全てのジョブを削除したい、というような場合です。
実は2013年12月17日現在gemとしてリリースされているresque-schedulerの最新バージョン(2.3.1)にはそのためのメソッドが定義されていません。実はgithubのコードにはそのためのメソッドが新しく実装されているので、Gemfileでgithubリポジトリを参照するようにしてもよかったのですが、githubのバージョンではResqueのWeb UIに不具合があって困るので僕は使えないと判断しました。
先ほど紹介したようにResqueのジョブは少しばかりencodeされていて非常に取り回しづらい上、Redisを直に叩くとなるとさらにコードが煩雑になりかねません。そこで、githubリポジトリで導入されている、条件に当てはまるジョブを削除するためのメソッドだけを取り込むことにしました。
ここでは、resque-schedulerがそうしているように、Resqueをextendする形で拡張します。
config/initializers/resque_init.rb
あたりのresque用ファイルに次のコードを追記します。
module ResqueSchedulerExt # this methods is not implemented in current version (2.3.1) def remove_delayed_selection fail ArgumentError, "Please supply a block" unless block_given? destroyed = 0 # There is no way to search Redis list entries for a partial match, so we query for all # delayed job tasks and do our matching after decoding the payload data jobs = Resque.redis.keys("delayed:*") jobs.each do |job| index = Resque.redis.llen(job) - 1 while index >= 0 payload = Resque.redis.lindex(job, index) decoded_payload = decode(payload) if yield(decoded_payload['args']) removed = redis.lrem job, 0, payload destroyed += removed index -= removed else index -= 1 end end end destroyed end end Resque.extend ResqueSchedulerExt
これでResque.remove_delayed_selection
というメソッドを使えるようになりました。
あとは、必要な箇所でこのメソッドを呼び出すだけでOKです。
Resque.remove_delayed_selection do |args| # args is [hoge_id, piyo_id] args[0] == 5 end
これでhoge_id=5
であるジョブを全て削除することができました。