読者です 読者をやめる 読者になる 読者になる

tech::hexagram

personal note for technical issue.

Exqのenqueueのプロセスを追ってみた

仕事に必要だったのでメモ書き程度に残しておく。

Exq というRedisを利用したJob queueライブラリのコードリーディング。

  • Exq.enqueue/4 : use Exq.Enqueuer.EnqueueApi とあるので、その中に実体がある。
# Exq.Enqueuer.EnqueueApi

def enqueue(pid, queue, worker, args), do: enqueue(pid, queue, worker, args, @default_options)

def enqueue(pid, queue, worker, args, options) do
  GenServer.call(pid, {:enqueue, queue, worker, args, options}, Config.get(:genserver_timeout))
end

GenServer の仕組みで :enqueue をメッセージとして送信する。 handle_call:enqueue を処理しているメソッドを探す。

# Exq.Manager.Server

def handle_call({:enqueue, queue, worker, args, options}, from, state) do
  Enqueuer.enqueue(state.enqueuer, from, queue, worker, args, options)
  {:noreply, state, 10}
end
  • Exq.Enqueuer.enqueue: use Exq.Enqueuer.EnqueueApi とあるので、その中に実体がある。
# Exq.Enqueuer.EnqueueApi.enqueue

def enqueue(pid, from, queue, worker, args, options) do
  GenServer.cast(pid, {:enqueue, from, queue, worker, args, options})
end

handle_cast:enqueue を処理しているメソッドを探す。

# Exq.Enqueuer.Server

def handle_cast({:enqueue, from, queue, worker, args, options}, state) do
  response = JobQueue.enqueue(state.redis, state.namespace, queue, worker, args, options)
  GenServer.reply(from, response)
  {:noreply, state}
end
# Exq.Redis.JobQueue
def enqueue(redis, namespace, queue, worker, args, options) do
  {jid, job_serialized} = to_job_serialized(queue, worker, args, options)
  case enqueue(redis, namespace, queue, job_serialized) do
    \:ok    -> {:ok, jid}
    other  -> other
  end
end
def enqueue(redis, namespace, queue, job_serialized) do
  try do
    response = Connection.qp(redis, [
      ["SADD", full_key(namespace, "queues"), queue],
      ["LPUSH", queue_key(namespace, queue), job_serialized]])

    case response do
      {:ok, [%Redix.Error{}, %Redix.Error{}]} = error -> error
      {:ok, [%Redix.Error{}, _]} = error -> error
      {:ok, [_, %Redix.Error{}]} = error -> error
      {:ok, [_, _]} -> \:ok
      other    -> other
    end
  catch
    :exit, e ->
      Logger.info("Error enqueueing -  #{Kernel.inspect e}")
      {:error, :timeout}
  end
end

def to_job_serialized(queue, worker, args, options) do
  to_job_serialized(queue, worker, args, options, Time.unix_seconds)
end
def to_job_serialized(queue, worker, args, options, enqueued_at) do
  jid = UUID.uuid4
  retry = Keyword.get_lazy(options, :max_retries, fn() -> Config.get(:max_retries) end)
  job = %{queue: queue, retry: retry, class: worker, args: args, jid: jid, enqueued_at: enqueued_at}
  {jid, Config.serializer.encode!(job)}
end

というわけで Exq.Redis.JobQueue.enqueue/4 がほぼ実体であることがわかった。Redisに投げているコマンドの詳細は下記となる。


ちなみに dequeue の処理を追うつもりであったが hexdoc に詳細な解説があったので大体の処理の流れは理解できた。