OpenCVで画像のサイズを求めるgearman workerを作って、Rubyから呼ぶで作ったworkerをpreforkさせて、そいつらをdaemontoolsで管理できるようにした。あらかじめCPU個数+いくつかforkしておくと、CPUが複数あるマシンを生かせるし、解析前にlibcurlで画像を取得している時のI/O待ちが少なくなって良い。(この記事のworkerはlibcurl使ってない版だけど)
あと、返り値は自分で作ったjson_builder.hを使って返すようにした。
なにげに大量の画像の中からダウンロード失敗した破損画像を見つけるのに重宝している。
まずdaemontoolsをインストールしておく
daemontoolsで管理できるようにする。
普通にforkしただけだと、daemontoolsでsvc -dしてプロセスを止めようとしてもforkした子プロセスの方が止まらない。
Perlの場合の良い例があった。
How to manage Gearman worker processes. – TokuLog 改メ tokuhirom’s blog
Parallel::Preforkを使っている。Parallel::Preforkのソースを読んでみたら、trap_signalsオプションで親プロセスがSIGTERMとSIGHUPをフックして、子プロセスにkillを送っていた。
よく考えたら普通のforkで親が子を殺すというやつだった。
Parallel::Preforkと同じ様にやる。
forkした後親が子のpidのリストを持っておいて、SIGTERM/SIGHUPをフックして、子を全部killする処理を追加した。
daemontoolsのrunスクリプトはこれ
#!/bin/sh起動すると5個にプロセスが増える。親はdaemontoolsのsuperviseが管理してくれる。
exec 2>&1
exec setuidgid sho /Users/sho/src/gearmand-study/imgsize/imgsizeWorker -s localhost -p 7003 --fork 5
これでsvc -dとか-uとかすればまとめて起動終了するようになった。
imgsizeWorker.cpp
// 画像サイズを返すgearman worker
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <string>
#include <iostream>
#include <cv.h>
#include <highgui.h>
#include <boost/program_options.hpp>
#include <boost/regex.hpp>
#include <boost/format.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/tuple/tuple_io.hpp>
#include <boost/any.hpp>
#include <libgearman/gearman.h>
#include "json_builder.h"
using namespace boost;
using namespace std;
tuple<int, int> get_size(const string& fileName); // 画像のwidth,heightを返す
map<string,any> imgsize(const string& fileName); // gearman workerとしてclientに返すためのJSON Objectを作る
void *job_imgsize(gearman_job_st *job, void *cb_arg, size_t *result_size, gearman_return_t *ret_ptr);
void on_exit_signal(int sig);
vector<int> pids;
int main(int argc, char* argv[]) {
program_options::options_description opts("options");
opts.add_options()
("help,h", "helpを表示")
("server,s", program_options::value<string>(), "gearmanサーバーのアドレス")
("port,p", program_options::value<int>(), "gearmanサーバーのport番号")
("fork", program_options::value<int>(), "preforkする数")
("test,t", program_options::value<string>(), "gearman worker単体テスト用query");
program_options::variables_map argmap;
program_options::store(parse_command_line(argc, argv, opts), argmap);
program_options::notify(argmap);
if(!argmap.count("help")){
if(argmap.count("test")){
cout << "---test---" << endl;
string gearman_param = argmap["test"].as<string>();
cout << json_builder::toJson(imgsize(gearman_param)) << endl; // 単体でworkerとしてのテスト
return 0;
}else if(argmap.count("server") && argmap.count("port")){
if(argmap.count("fork")){
int i, pid;
for(i = 1; i < argmap["fork"].as<int>(); i++){
pid = fork();
if(pid == 0){ // 子プロセス
pids.clear();
break;
}
else{ // 親プロセス
pids.push_back(pid);
cout << str(format("fork:%d - parent:%d child:%d") %
i %
getpid() %
pid) << endl;
}
}
}
if(pids.size() > 0){ // 親プロセスの終了シグナルをフックする
signal(SIGTERM, on_exit_signal);
signal(SIGHUP, on_exit_signal);
}
gearman_worker_st worker;
gearman_worker_create(&worker);
string g_server = argmap["server"].as<string>();
int g_port = argmap["port"].as<int>();
struct hostent *g_host = gethostbyname((char*)g_server.c_str());
string g_server_addr = str(format("%d.%d.%d.%d") %
(uint)(uchar)g_host->h_addr[0] %
(uint)(uchar)g_host->h_addr[1] %
(uint)(uchar)g_host->h_addr[2] %
(uint)(uchar)g_host->h_addr[3]);
gearman_worker_add_server(&worker, g_server_addr.c_str(), g_port);
gearman_worker_add_function(&worker, "img_size", 0, job_imgsize, NULL);
cout << str(format("---start worker (%s:%d)---") %
g_server_addr % g_port) << endl;
while(true) gearman_worker_work(&worker); // workerとして待機
return 0;
}
}
cerr << "server,portが必要です" << endl;
cerr << opts << endl;
return 1;
}
// opencvで画像サイズを取得
tuple<int, int> get_size(const string& fileName){
IplImage *img = cvLoadImage(fileName.c_str());
if(!img){
return make_tuple(-1, -1);
}
else{
int width = img->width;
int height = img->height;
cvReleaseImage(&img);
return make_tuple(width, height);
}
}
// 画像サイズを取得してgearman serverに返すJSON Objectを作る
map<string,any> imgsize(const string& fileName){
map<string,any> result_m;
int width, height;
tie(width, height) = get_size(fileName);
if(width > 0 && height > 0){
result_m["width"] = width;
result_m["height"] = height;
}
else{
result_m["error"] = string("image load error");
}
return result_m;
}
// gearman worker job
void *job_imgsize(gearman_job_st *job, void *cb_arg, size_t *result_size, gearman_return_t *ret_ptr){
string fileName = (char*)gearman_job_workload(job);
cout << fileName << endl;
string result_str = json_builder::toJson(imgsize(fileName));
cout << " => " << result_str << endl;
char *result = (char*)strdup(result_str.c_str());
*result_size = result_str.size();
*ret_ptr = GEARMAN_SUCCESS;
return result;
}
void on_exit_signal(int sig){
for(int i = 0; i < pids.size(); i++){
cout << str(format("kill (pid:%d)") % pids[i]) << endl;
if(kill(pids[i], SIGKILL) < 0){
cerr << str(format("kill failed (pid:%d)") % pids[i]) << endl;
}
}
exit(0);
}