Exqのenqueueのプロセスを追ってみた
仕事に必要だったのでメモ書き程度に残しておく。
Exq というRedisを利用したJob queueライブラリのコードリーディング。
- Exq.enqueue/4 :
use Exq.Enqueuer.EnqueueApi
とあるので、その中に実体がある。
# Exq.Enqueuer.EnqueueApi def enqueue(pid, queue, worker, args), do: enqueue(pid, queue, worker, args, @default_options) def enqueue(pid, queue, worker, args, options) do GenServer.call(pid, {:enqueue, queue, worker, args, options}, Config.get(:genserver_timeout)) end
GenServer
の仕組みで :enqueue
をメッセージとして送信する。 handle_call
で :enqueue
を処理しているメソッドを探す。
# Exq.Manager.Server def handle_call({:enqueue, queue, worker, args, options}, from, state) do Enqueuer.enqueue(state.enqueuer, from, queue, worker, args, options) {:noreply, state, 10} end
- Exq.Enqueuer.enqueue:
use Exq.Enqueuer.EnqueueApi
とあるので、その中に実体がある。
# Exq.Enqueuer.EnqueueApi.enqueue def enqueue(pid, from, queue, worker, args, options) do GenServer.cast(pid, {:enqueue, from, queue, worker, args, options}) end
handle_cast
で :enqueue
を処理しているメソッドを探す。
# Exq.Enqueuer.Server def handle_cast({:enqueue, from, queue, worker, args, options}, state) do response = JobQueue.enqueue(state.redis, state.namespace, queue, worker, args, options) GenServer.reply(from, response) {:noreply, state} end
# Exq.Redis.JobQueue def enqueue(redis, namespace, queue, worker, args, options) do {jid, job_serialized} = to_job_serialized(queue, worker, args, options) case enqueue(redis, namespace, queue, job_serialized) do \:ok -> {:ok, jid} other -> other end end def enqueue(redis, namespace, queue, job_serialized) do try do response = Connection.qp(redis, [ ["SADD", full_key(namespace, "queues"), queue], ["LPUSH", queue_key(namespace, queue), job_serialized]]) case response do {:ok, [%Redix.Error{}, %Redix.Error{}]} = error -> error {:ok, [%Redix.Error{}, _]} = error -> error {:ok, [_, %Redix.Error{}]} = error -> error {:ok, [_, _]} -> \:ok other -> other end catch :exit, e -> Logger.info("Error enqueueing - #{Kernel.inspect e}") {:error, :timeout} end end def to_job_serialized(queue, worker, args, options) do to_job_serialized(queue, worker, args, options, Time.unix_seconds) end def to_job_serialized(queue, worker, args, options, enqueued_at) do jid = UUID.uuid4 retry = Keyword.get_lazy(options, :max_retries, fn() -> Config.get(:max_retries) end) job = %{queue: queue, retry: retry, class: worker, args: args, jid: jid, enqueued_at: enqueued_at} {jid, Config.serializer.encode!(job)} end
というわけで Exq.Redis.JobQueue.enqueue/4
がほぼ実体であることがわかった。Redisに投げているコマンドの詳細は下記となる。
- sadd: Set型に値を追加する。keyに対応するセットに指定した値を追加。
- lpush: List型に値を追加する。lpushの場合はリストの先頭に追加。
ちなみに dequeue
の処理を追うつもりであったが hexdoc に詳細な解説があったので大体の処理の流れは理解できた。