tech::hexagram

personal note for technical issue.

Exqのenqueueのプロセスを追ってみた

仕事に必要だったのでメモ書き程度に残しておく。

Exq というRedisを利用したJob queueライブラリのコードリーディング。

  • Exq.enqueue/4 : use Exq.Enqueuer.EnqueueApi とあるので、その中に実体がある。
# Exq.Enqueuer.EnqueueApi

def enqueue(pid, queue, worker, args), do: enqueue(pid, queue, worker, args, @default_options)

def enqueue(pid, queue, worker, args, options) do
  GenServer.call(pid, {:enqueue, queue, worker, args, options}, Config.get(:genserver_timeout))
end

GenServer の仕組みで :enqueue をメッセージとして送信する。 handle_call:enqueue を処理しているメソッドを探す。

# Exq.Manager.Server

def handle_call({:enqueue, queue, worker, args, options}, from, state) do
  Enqueuer.enqueue(state.enqueuer, from, queue, worker, args, options)
  {:noreply, state, 10}
end
  • Exq.Enqueuer.enqueue: use Exq.Enqueuer.EnqueueApi とあるので、その中に実体がある。
# Exq.Enqueuer.EnqueueApi.enqueue

def enqueue(pid, from, queue, worker, args, options) do
  GenServer.cast(pid, {:enqueue, from, queue, worker, args, options})
end

handle_cast:enqueue を処理しているメソッドを探す。

# Exq.Enqueuer.Server

def handle_cast({:enqueue, from, queue, worker, args, options}, state) do
  response = JobQueue.enqueue(state.redis, state.namespace, queue, worker, args, options)
  GenServer.reply(from, response)
  {:noreply, state}
end
# Exq.Redis.JobQueue
def enqueue(redis, namespace, queue, worker, args, options) do
  {jid, job_serialized} = to_job_serialized(queue, worker, args, options)
  case enqueue(redis, namespace, queue, job_serialized) do
    \:ok    -> {:ok, jid}
    other  -> other
  end
end
def enqueue(redis, namespace, queue, job_serialized) do
  try do
    response = Connection.qp(redis, [
      ["SADD", full_key(namespace, "queues"), queue],
      ["LPUSH", queue_key(namespace, queue), job_serialized]])

    case response do
      {:ok, [%Redix.Error{}, %Redix.Error{}]} = error -> error
      {:ok, [%Redix.Error{}, _]} = error -> error
      {:ok, [_, %Redix.Error{}]} = error -> error
      {:ok, [_, _]} -> \:ok
      other    -> other
    end
  catch
    :exit, e ->
      Logger.info("Error enqueueing -  #{Kernel.inspect e}")
      {:error, :timeout}
  end
end

def to_job_serialized(queue, worker, args, options) do
  to_job_serialized(queue, worker, args, options, Time.unix_seconds)
end
def to_job_serialized(queue, worker, args, options, enqueued_at) do
  jid = UUID.uuid4
  retry = Keyword.get_lazy(options, :max_retries, fn() -> Config.get(:max_retries) end)
  job = %{queue: queue, retry: retry, class: worker, args: args, jid: jid, enqueued_at: enqueued_at}
  {jid, Config.serializer.encode!(job)}
end

というわけで Exq.Redis.JobQueue.enqueue/4 がほぼ実体であることがわかった。Redisに投げているコマンドの詳細は下記となる。


ちなみに dequeue の処理を追うつもりであったが hexdoc に詳細な解説があったので大体の処理の流れは理解できた。

ロードバイクで仙台まで行ってきた

連休を利用し、3泊4日でロードバイクに乗ってはるばる仙台まで自走してきた。

総走破距離は386kmで、これまでで一番遠くまで行くことが出来た。今回の旅に向けて準備してきたこと、旅の道中、旅から帰宅後の反省も含めて記録しておく。このエントリは1万字を超えているので、どんな旅だったかかいつまんでみたい場合は目次を利用していただきたい。

準備編

気をつけたこと

実は去年、同じようにゴールデンウィークを利用して2泊3日でロードバイクに乗って名古屋に行こうとしていた。しかしながら、準備不足で2日目に浜松に到着した時点で体力が底を尽きてしまい、そこから輪行で名古屋に向かったので自走できたのは270kmほどだった。

この経験を踏まえて、今年は下記に気をつけて計画を練った。

1日あたりの走行距離を短縮

去年は1日平均120~130kmの行程で着いたら夕方、しかもヘトヘトで何もする気が起きなかった。自転車旅行で移動がメインの目的ではあるが、日々の目的地に着いた後の観光も(時間は限られてはいるが)楽しみたかったため、1日80km前後を目標に再設定した。

走行中に体に荷物を背負わないようにする

去年リタイアした最大の原因としては、荷物をバックパックに詰めて背負って移動していたため、1日経つ毎に肩の凝りがひどくなったことだった。これを踏まえて、走行時に身につけるものはヒップバッグのみになるようにし、大半の荷物は自転車に積むようにした。

ホテルはコインランドリーがあるところを選ぶ

サイクルウェアは1着しか用意していないので、基本的には宿についたら洗うようにする運用方法を取った。去年は宿に到着後、自分で洗ってたらすごく疲れたので今年はそれを回避。

平地で足を使いすぎない

去年は1日目に静岡の新富士のあたりまで走っていて、行程の終盤に箱根超えがあり、800mほどの登坂をする必要があった。にも関わらず、平地で飛ばしすぎてしまったことで箱根では足が使い物にならずほぼ押して凌いでしまっていた。

県をまたぐ際はほぼ山越えがあると思っていたほうが良いので、それを見据えて平地で気持ちよく走れそうなところもぐっと抑えて足に負担がかからないペースになるように気をつけた。

続きを読む

Javascriptで連想配列のDeepCopy

Object.assign を実装当初利用したものの、Safariでは9以上でないと動かないらしい。

jQuery$.extend を利用するとオブジェクトのマージという形でDeepCopyが出来る。

var src = fetchSrc(); // 何かしらで取ってくる
var dest = {};
$.extend(true, dest, src);

上記の用に記述すると destsrc連想配列がマージされる。 dest 自体はもともと空の連想配列なので、実質 src のDeepCopyとなる。 第一引数に true を渡すと再帰的に階層を渡ってマージするので基本的には true を渡しておいたほうが良さそう。

任意で渡ってきた変数が特定のStructかどうか判定する方法

is_struct みたいな便利メソッドがないようなので以下のように判定する必要があった。 割と面倒だったのでまとめておく。

例として、任意で受け取った引数 argsUser のStructかどうかを判定する。

StructはMapの拡張になっている

まずStructは Mapを拡張した拡張であるので、 is_map(args) がtrueになる。

Structs are extensions on top of maps that bring default values, compile-time guarantees and polymorphism into Elixir.

ref: Structs - Elixir

iex(1)> user_struct = %User{}
%User{...}
iex(2)> string = "test"
"test"
iex(3)> is_map(user_struct)
true
iex(4)> is_map(string)
false

Structは __struct__ というkeyを保持している

In the example above, pattern matching works because underneath structs are bare maps with a fixed set of fields. As maps, structs store a “special” field named struct that holds the name of the struct:

次に、Structの場合は __struct__ というkeyを保持しているので、「MapであるがStructでない変数」の場合は Map.has_key?(args, :__struct) で弾くことが出来る。

iex(5)> map = %{hoge: "huga"}
%{hoge: "huga"}
iex(6)> map |> Map.has_key?(:__struct__)
false
iex(7)> user_struct |> Map.has_key?(:__struct__)
true

__struct__ にはStructの名前が入っている

args.__struct__User であればtrueになるので、あとはこれを比較すれば良い。

iex(8)> not_user_struct = %NotUser{}
%NotUser{...}

iex(9)> user_struct.__struct__ == User
true
iex(10)> not_user_struct.__struct__ == User
false

まとめると

def is_user?(args) do
  is_map(args) && Map.has_key?(args, :__struct__) && args.__struct__ == User
end 

ansibleでrubyのArray#zipみたいなことをやる

varsで定義した2つの同じ要素数のリストをrubyのArray#zipみたいに結合してよしなに何かする場合のサンプル。

template module で利用するJinja2というpythonのテンプレートエンジンが、yamlでも展開して利用できる。

- name: "hogehoge"
  vars:
    list1:
      - "1"
      - "2"
      - "3"
    list2:
      - "a"
      - "b"
      - "c"
    list3: |
       {% set o = [] %}
       {% for i in list1 %}
       {% set _ = o.append({ 'hoge': i, 'huga': list2[loop.index0]}) %}
       {% endfor %}
       {{ o }}
# list3 = [ {hoge: 1, huga: a}, {hoge: 2, huga: b}, {hoge: 3, huga: c} ] のようになる

参考

Template Designer Documentation — Jinja2 Documentation (2.8-dev)

ほげめも: Ansible の Jinja2 を活用する