OpenCVで画像のサイズを求めるgearman workerを作って、Rubyから呼ぶで作ったworkerをpreforkさせて、そいつらをdaemontoolsで管理できるようにした。あらかじめCPU個数+いくつかforkしておくと、CPUが複数あるマシンを生かせるし、解析前にlibcurlで画像を取得している時のI/O待ちが少なくなって良い。(この記事のworkerはlibcurl使ってない版だけど)
あと、返り値は自分で作ったjson_builder.hを使って返すようにした。

なにげに大量の画像の中からダウンロード失敗した破損画像を見つけるのに重宝している。

まずdaemontoolsをインストールしておく

gearmandもdaemontoolsで自動起動するようにしておく。


daemontoolsで管理できるようにする。
普通にforkしただけだと、daemontoolsでsvc -dしてプロセスを止めようとしてもforkした子プロセスの方が止まらない。

Perlの場合の良い例があった。
How to manage Gearman worker processes. – TokuLog 改メ tokuhirom’s blog
Parallel::Preforkを使っている。Parallel::Preforkのソースを読んでみたら、trap_signalsオプションで親プロセスがSIGTERMとSIGHUPをフックして、子プロセスにkillを送っていた。
よく考えたら普通のforkで親が子を殺すというやつだった。


Parallel::Preforkと同じ様にやる。
forkした後親が子のpidのリストを持っておいて、SIGTERM/SIGHUPをフックして、子を全部killする処理を追加した。

daemontoolsのrunスクリプトはこれ
#!/bin/sh
exec 2>&1
exec setuidgid sho /Users/sho/src/gearmand-study/imgsize/imgsizeWorker -s localhost -p 7003 --fork 5
起動すると5個にプロセスが増える。親はdaemontoolsのsuperviseが管理してくれる。
これでsvc -dとか-uとかすればまとめて起動終了するようになった。

imgsizeWorker.cpp
// 画像サイズを返すgearman worker
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <string>
#include <iostream>
#include <cv.h>
#include <highgui.h>
#include <boost/program_options.hpp>
#include <boost/regex.hpp>
#include <boost/format.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/tuple/tuple_io.hpp>
#include <boost/any.hpp>
#include <libgearman/gearman.h>
#include "json_builder.h"

using namespace boost;
using namespace std;

tuple<int, int> get_size(const string& fileName); // 画像のwidth,heightを返す
map<string,any> imgsize(const string& fileName); // gearman workerとしてclientに返すためのJSON Objectを作る
void *job_imgsize(gearman_job_st *job, void *cb_arg, size_t *result_size, gearman_return_t *ret_ptr);
void on_exit_signal(int sig);
vector<int> pids;

int main(int argc, char* argv[]) {
  program_options::options_description opts("options");
  opts.add_options()
    ("help,h", "helpを表示")
    ("server,s", program_options::value<string>(), "gearmanサーバーのアドレス")
    ("port,p", program_options::value<int>(), "gearmanサーバーのport番号")
    ("fork", program_options::value<int>(), "preforkする数")
    ("test,t", program_options::value<string>(), "gearman worker単体テスト用query");
  program_options::variables_map argmap;
  program_options::store(parse_command_line(argc, argv, opts), argmap);
  program_options::notify(argmap);

  if(!argmap.count("help")){
    if(argmap.count("test")){
      cout << "---test---" << endl;
      string gearman_param = argmap["test"].as<string>();
      cout << json_builder::toJson(imgsize(gearman_param)) << endl; // 単体でworkerとしてのテスト
      return 0;
    }else if(argmap.count("server") && argmap.count("port")){
      if(argmap.count("fork")){
int i, pid;
for(i = 1; i < argmap["fork"].as<int>(); i++){
  pid = fork();
  if(pid == 0){ // 子プロセス
    pids.clear();
    break;
  }
  else{ // 親プロセス
    pids.push_back(pid);
    cout << str(format("fork:%d - parent:%d child:%d") % 
i %
getpid() %
pid) << endl;
  }
}
      }
      if(pids.size() > 0){ // 親プロセスの終了シグナルをフックする
signal(SIGTERM, on_exit_signal);
signal(SIGHUP, on_exit_signal);
      }
      gearman_worker_st worker;
      gearman_worker_create(&worker);
      string g_server = argmap["server"].as<string>();
      int g_port = argmap["port"].as<int>();

      struct hostent *g_host = gethostbyname((char*)g_server.c_str());
      string g_server_addr = str(format("%d.%d.%d.%d") %
 (uint)(uchar)g_host->h_addr[0] %
 (uint)(uchar)g_host->h_addr[1] %
 (uint)(uchar)g_host->h_addr[2] %
 (uint)(uchar)g_host->h_addr[3]);

      gearman_worker_add_server(&worker, g_server_addr.c_str(), g_port);
      gearman_worker_add_function(&worker, "img_size", 0, job_imgsize, NULL);
      cout << str(format("---start worker (%s:%d)---") %
  g_server_addr % g_port) << endl;
      while(true) gearman_worker_work(&worker); // workerとして待機
      return 0;
    }
  }
  cerr << "server,portが必要です" << endl;
  cerr << opts << endl;
  return 1;
  
}

// opencvで画像サイズを取得
tuple<int, int> get_size(const string& fileName){
  IplImage *img = cvLoadImage(fileName.c_str());
  if(!img){
    return make_tuple(-1, -1);
  }
  else{
    int width = img->width;
    int height = img->height;
    cvReleaseImage(&img);
    return make_tuple(width, height);
  }
}

// 画像サイズを取得してgearman serverに返すJSON Objectを作る
map<string,any> imgsize(const string& fileName){
  map<string,any> result_m;
  int width, height;
  tie(width, height) = get_size(fileName);
  if(width > 0 && height > 0){
    result_m["width"] = width;
    result_m["height"] = height;
  }
  else{
    result_m["error"] = string("image load error");
  }
  return result_m;
}

// gearman worker job
void *job_imgsize(gearman_job_st *job, void *cb_arg, size_t *result_size, gearman_return_t *ret_ptr){
  string fileName = (char*)gearman_job_workload(job);
  cout << fileName << endl;
  string result_str = json_builder::toJson(imgsize(fileName));
  cout << " => " << result_str << endl;
  char *result = (char*)strdup(result_str.c_str());
  *result_size = result_str.size();
  *ret_ptr = GEARMAN_SUCCESS;
  return result;
}

void on_exit_signal(int sig){
  for(int i = 0; i < pids.size(); i++){
    cout << str(format("kill (pid:%d)") % pids[i]) << endl;
    if(kill(pids[i], SIGKILL) < 0){
      cerr << str(format("kill failed (pid:%d)") % pids[i]) << endl;
    }
  }
  exit(0);
}