ぴよログ

↓に移転したのでこっちは更新されません、多分。

resque-schedulerで予約済みのJobを複数削除

移転しました →

公開されているresque-schedulerで条件に該当するジョブをまとめて削除する方法が見当たらなかったのでハックしたという話です。

resque-schedulerはバックグラウンド用gemであるResqueをさらに拡張したもので、cronのような定期的な動作をさせたり、特定の時刻に動作をさせたりするためのgemです。

resque/resque-scheduler

resque-schedulerのジョブを削除するために少し手をいれる必要があったのでそれについて書き残しておきます。

なお、この記事に書いてある内容はresque-scheduler 2.3.1時点のものであり、これ以降のバージョンでは以下に記す内容は必要なくなっている可能性があります。

Resqueのジョブ

まずResqueの概要ですが、Resqueでバックグラウンドジョブを登録するときはジョブを処理するクラスの名前と任意のパラメータを渡します。たとえばJobHandlerというジョブを処理するクラスがあって、パラメータを2つ受け取るとすると、クラス定義とジョブの登録はそれぞれ次のようになります。

# job handler class
class JobHandler
  def self.perform(hoge_id,piyo_id)
    # do something in background...
  end
end

# enqueue
def hoge
  hoge = Hoge.where( ...
  piyo = Piyo.where( ...
  Resque.enqueue(JobHandler, hoge.id, piyo.id)
end

実際の動作はというと、まずResque.enqueueでは渡されたクラス名やパラメータをResque.encodeメソッドにより一つの文字列化したものをRedisに入れます。そうするとバックグラウンドで動いているプロセスがRedis内のデータを参照し、処理する必要があるジョブの文字列をResque.decodeメソッドでクラス名とパラメータに分解し直します。そして、そのクラスのperformメソッドにパラメータを渡して実行するという流れになっています。

resque-schedulerも基本的には同じですが、時間を指定することができます。

Resque.enqueue_at(1.day.from_now, JobHandler, hoge.id, piyo.id)

スケジュールされたジョブを取りやめたい場合

resque-schedulerにはResque.enqueue_atの時間指定以外の引数を与えることができるResque.remove_delayedというメソッドがあります。このメソッドを使って指定したジョブを削除することができます。

def foo
  Resque.enqueue_at(1.day.from_now, JobHandler, 5, 10)
end

def bar
  Rescue.remove_delayed(JobHandler, 5, 10)
end

こんな風にして、ジョブの追加や削除が可能です。

では、条件に該当する複数のジョブを一気に削除したいときはどうすればいいのでしょう。上の例で言えば、hoge_id=5となる全てのジョブを削除したい、というような場合です。

実は2013年12月17日現在gemとしてリリースされているresque-schedulerの最新バージョン(2.3.1)にはそのためのメソッドが定義されていません。実はgithubのコードにはそのためのメソッドが新しく実装されているので、Gemfileでgithubリポジトリを参照するようにしてもよかったのですが、githubのバージョンではResqueのWeb UIに不具合があって困るので僕は使えないと判断しました。

githubの該当箇所

先ほど紹介したようにResqueのジョブは少しばかりencodeされていて非常に取り回しづらい上、Redisを直に叩くとなるとさらにコードが煩雑になりかねません。そこで、githubリポジトリで導入されている、条件に当てはまるジョブを削除するためのメソッドだけを取り込むことにしました。

ここでは、resque-schedulerがそうしているように、Resqueをextendする形で拡張します。

config/initializers/resque_init.rbあたりのresque用ファイルに次のコードを追記します。

module ResqueSchedulerExt
  # this methods is not implemented in current version (2.3.1)
  def remove_delayed_selection
    fail ArgumentError, "Please supply a block" unless block_given?

    destroyed = 0
    # There is no way to search Redis list entries for a partial match, so we query for all
    # delayed job tasks and do our matching after decoding the payload data
    jobs = Resque.redis.keys("delayed:*")
    jobs.each do |job|
      index = Resque.redis.llen(job) - 1
      while index >= 0
        payload = Resque.redis.lindex(job, index)
        decoded_payload = decode(payload)
        if yield(decoded_payload['args'])
          removed = redis.lrem job, 0, payload
          destroyed += removed
          index -= removed
        else
          index -= 1
        end
      end
    end
    destroyed
  end
end

Resque.extend ResqueSchedulerExt

これでResque.remove_delayed_selectionというメソッドを使えるようになりました。

あとは、必要な箇所でこのメソッドを呼び出すだけでOKです。

Resque.remove_delayed_selection do |args| # args is [hoge_id, piyo_id]
  args[0] == 5
end

これでhoge_id=5であるジョブを全て削除することができました。