Perlのforkで子プロセスの同時実行数を制限した並列処理

ゴールデンウィークとお遊び用CentOS7を手に入れたのでお遊びが進む君。
ということで、perlのforkで並列処理をやってみました。並列稼動する子プロセスの数を一定数に制限しながら指定数分の処理を行います。実際にも、1つだけや無制限に子プロを作るというようなことはなく、並列数を一定にして処理をさせることはよくあると思います。
例えば、100個のRSSフィードを取得したい場合、
・100個のフィードを10個づつ10個のフィード取得処理に分割
・子プロの最大同時並列数を5に限定
・1つの子プロで1つのフィード取得処理を実行
・親プロセスは、1つの子プロが終了したら新しい子プロ作って次のフィード取得処理用子プロを作成
・親プロセスは、全10フィード取得処理が完了するまで上記を繰り返し
終了判定の作り方次第で、複数サーバでの同時実行も比較的簡単にできると思います。

で、サンプルソース。Perlだと親プロセスと子プロセスの処理を同じソース内に書いてるのをよく見るのですがメンテナンスがやりにくくなるのと単体テストも難しくなるのでソースを分けています。

親プロセスのソース

fork_p.pl。子プロの制御と処理の終了判定をしています。
01:
02:
03:
04:
05:
06:
07:
08:
09:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
#!/usr/bin/perl
use strict;
use POSIX ":sys_wait_h";

my $TOTAL_PROC = 10;
my $MAX_CHILD = 3;

my %done = ();
my $running_proc = 0;
my $no = 1;

print "P:[$$] start parent\n";

sub run_child {
    my $no = shift;
    print "P:[$$] run_child $no\n";
    my $waittime = rand(3);
    exec("./fork_c.pl $no $waittime");
    exit();
}
my $running = 0;
while ($no <= $TOTAL_PROC) {
    print "P:[$$] no=$no running=$running done=" .keys(%done). "\n";
    my $can_run_cnt = $MAX_CHILD - $running;
    if ($can_run_cnt > $TOTAL_PROC - keys(%done)) {
	$can_run_cnt = $TOTAL_PROC - keys(%done);
    }

    for (my $i = 0; $i < $can_run_cnt; $i++) {
	my $pid = fork();
	print "P:[$$] fork pid=$pid\n";
	if ($pid) {
	    # parent
	    print "P:[$$] parent proc\n";
	    $running++;
	    $no++;

	} elsif ($pid == 0) {
	    # child
	    print "P:[$$] start child proc\n";
	    $done{$no} = $pid;
	    run_child($no);
	} else {
	    print "*** fork error\n";
	    exit();
	}
    }

    my $finished_child = waitpid(-1, 0);
    $running--;
    print "P:[$$] finished_child=$finished_child\n";
}

print "P:[$$] finished parent\n";

並列実行数の制御のためにwaitpidを利用していますが、実際には最後に書いているように処理が正常・失敗の何かを子プロで吐いて、親プロセスはそれをチェックさせたほうがいいかと思われます。waitpidでの子プロの状態判定は実はややこしいので。
また、waitpid(-1, 0)は、いずれかの子プロの終了を待ちますが、WNOHANGを指定して子プロの状態変化を待つことなく親プロセスの処理を続行させることもできます。

子プロセスのソース

fork_c.pl。実際に並列処理させたいロジックを定義します。サンプルは何もしないので親プロセスから渡された分だけsleepします。
01:
02:
03:
04:
05:
06:
07:
08:
#!/usr/bin/perl

my $no = $ARGV[0];
my $waittime = $ARGV[1];

print "C:[$$] start $no. sleep $waittime\n";
sleep($waittime);
print "C:[$$] finished $no.\n";

実行結果

[user01@pc02 perl]$ ./fork_p.pl 
P:[5061] start parent
P:[5061] no=1 running=0 done=0
P:[5061] fork pid=5062
P:[5061] parent proc
P:[5061] fork pid=5063
P:[5061] parent proc
P:[5061] fork pid=5064
P:[5061] parent proc
P:[5062] fork pid=0
P:[5062] start child proc
P:[5062] run_child 1
C:[5062] start 1. sleep 1.69095524981368
P:[5063] fork pid=0
P:[5063] start child proc
P:[5063] run_child 2
C:[5063] start 2. sleep 0.0821911322300473
C:[5063] finished 2.
P:[5061] finished_child=5063
P:[5061] no=4 running=2 done=0
P:[5061] fork pid=5065
P:[5061] parent proc
P:[5064] fork pid=0
P:[5064] start child proc
P:[5064] run_child 3
C:[5064] start 3. sleep 1.76854459753941
P:[5065] fork pid=0
P:[5065] start child proc
P:[5065] run_child 4
C:[5065] start 4. sleep 2.15744215830911
C:[5062] finished 1.
P:[5061] finished_child=5062
P:[5061] no=5 running=2 done=0
P:[5061] fork pid=5066
P:[5061] parent proc
P:[5066] fork pid=0
P:[5066] start child proc
P:[5066] run_child 5
C:[5066] start 5. sleep 2.0960794109733
C:[5064] finished 3.
P:[5061] finished_child=5064
P:[5061] no=6 running=2 done=0
P:[5061] fork pid=5067
P:[5061] parent proc
P:[5067] fork pid=0
P:[5067] start child proc
P:[5067] run_child 6
C:[5067] start 6. sleep 0.725568342243616
C:[5067] finished 6.
P:[5061] finished_child=5067
P:[5061] no=7 running=2 done=0
P:[5061] fork pid=5068
P:[5061] parent proc
P:[5068] fork pid=0
P:[5068] start child proc
P:[5068] run_child 7
C:[5068] start 7. sleep 0.0705134402842411
C:[5068] finished 7.
P:[5061] finished_child=5068
P:[5061] no=8 running=2 done=0
P:[5061] fork pid=5069
P:[5061] parent proc
P:[5069] fork pid=0
P:[5069] start child proc
P:[5069] run_child 8
C:[5069] start 8. sleep 0.558187886129428
C:[5069] finished 8.
P:[5061] finished_child=5069
P:[5061] no=9 running=2 done=0
P:[5061] fork pid=5070
P:[5061] parent proc
P:[5070] fork pid=0
P:[5070] start child proc
P:[5070] run_child 9
C:[5070] start 9. sleep 1.29167993807915
C:[5065] finished 4.
P:[5061] finished_child=5065
P:[5061] no=10 running=2 done=0
P:[5061] fork pid=5071
P:[5061] parent proc
P:[5071] fork pid=0
P:[5071] start child proc
P:[5071] run_child 10
C:[5071] start 10. sleep 0.447920087945942
C:[5071] finished 10.
P:[5061] finished_child=5071
P:[5061] finished parent
[user01@pc02 perl]$ C:[5070] finished 9.
C:[5066] finished 5.

見ての通り、最後の終了判定がイマイチなので子プロが生きているのに親プロが死んでいます。waitpidで子プロの終了を待ってますが、実際には子プロが終了時に、OK/NGの何かを返す/doneファイルを作る、などして親プロセスはそれをチェックするのがいいかと思われます。


PerでJSON その2(JSONとJSON::XS)

CentOS7をVMWareで入れる機会があったので、前回試したJSONもついでに動かしてみようということで試してみたところ動くには動いたが混乱中。Perl難しい。

perl-JSONのインストール

$ sudo yum install perl-Time-HiRes
$ sudo yum install perl-local-lib
$ sudo yum install perl-CPAN
$ sudo cpan JSON
これで一応JSONパーサが動くようにはなる。

Perlソースの修正

json1.plをコピーしてjson2.plを作成。変更点はperlへのpathとuse JSON::XS;部分。
01:
02:
03:
04:
05:
06:
07:
08:
09:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
#!/usr/bin/perl

use JSON;
use Data::Dumper;

my $json_str = '{"key_str":"abcdefg"
,"key_int":987
,"data":[101,102,103,201,202,203]}';

$json = decode_json($json_str);

print "JSON=" . Dumper($json);
print "[DUMP] " . encode_json($json) . "\n";

print "\nkey_str=" . $json->{'key_str'} . "\n";

print "\ndata cnt=" . @{$json->{'data'}} . "\n";
foreach my $val(@{$json->{'data'}}) {
	print "  val=$val \n";
}
print "\n[DUMP] " . encode_json($json) . "\n";

実行結果
[user01@localhost perl]$ ./json2.pl
JSON=$VAR1 = {
          'key_int' => 987,
          'data' => [
                      101,
                      102,
                      103,
                      201,
                      202,
                      203
                    ],
          'key_str' => 'abcdefg'
        };
[DUMP] {"key_int":987,"data":[101,102,103,201,202,203],"key_str":"abcdefg"}

key_str=abcdefg

data cnt=6
  val=101 
  val=102 
  val=103 
  val=201 
  val=202 
  val=203 

[DUMP] {"key_int":987,"data":["101","102","103","201","202","203"],"key_str":"abcdefg"}

実行結果は変わらないが、pathはともかく、パッケージの指定を変更しないといけないのが良くわからない。確かに、JSON自体は下記にインストールされている。
[user01@localhost JSON]$ cd /usr/local/share/perl5
[user01@localhost perl5]$ l
合計 80
drwxr-xr-x. 4 root root    54  5月  4 09:53 .
drwxr-xr-x. 6 root root    58  5月  4 09:53 ..
drwxr-xr-x. 3 root root    43  5月  4 09:53 JSON
-r--r--r--. 1 root root 70496 10月 31  2013 JSON.pm
と思ったらJSONとJSON::XSではUTF-8周りの挙動が違うようで日本語等を扱う場合に注意が必要な様子。
[Perl] JSON モジュールの utf8 フラグ周りの仕様 tips 注意点 Kawanet Blog II/ウェブリブログ
2008年の記事ということで、perl使いの方にとっては知ってて当然のことのようです・・・。

と、いうことでJSON::XSを入れなおして確認。

JSON::XSのインストール

JSON::XSを入れるにはCanary::Stabilityが必要なようなのでまずそれから。
$ sudo cpan Canary::Stability
$ sudo cpan JSON::XS
・・・
Files found in blib/arch: installing files in blib/lib into architecture dependent library tree
Installing /usr/local/lib64/perl5/auto/JSON/XS/XS.so
Installing /usr/local/lib64/perl5/auto/JSON/XS/XS.bs
Installing /usr/local/lib64/perl5/JSON/XS.pm
Installing /usr/local/lib64/perl5/JSON/XS/Boolean.pm
Installing /usr/local/share/man/man1/json_xs.1
Installing /usr/local/share/man/man3/JSON::XS::Boolean.3pm
Installing /usr/local/share/man/man3/JSON::XS.3pm
Installing /usr/local/bin/json_xs
Appending installation info to /usr/lib64/perl5/perllocal.pod
  MLEHMANN/JSON-XS-3.02.tar.gz
  /bin/make install  -- OK
とりあえず入った模様。
[user01@localhost ~]$ l /usr/local/lib64/perl5/JSON
合計 68
drwxr-xr-x. 3 root root    27  5月  5 09:45 .
drwxr-xr-x. 5 root root    41  5月  5 09:45 ..
drwxr-xr-x. 2 root root    23  5月  5 09:45 XS
-r--r--r--. 1 root root 67277  2月 27 06:46 XS.pm
無事、以前のJSON::XSバージョンのjson1.plも動いたということでめでたしめでたし(?)。
と、いうことでJSONを扱う場合は、EncodeをJSON::XSを併用するのがいいってことかな。
(まだ良くわかってない)


PerでJSON (JSON::XS)

PerlでJSON文字列へアクセス! そう、それだけ。

今回使ったのは、CPAN JSON::XS
JSONの配列へのアクセスにやや悩みましたが、Object部分を@{}で配列へcast?っぽくすると通常の配列のようにアクセスできました。
通常通り、$json->{'data'}[0]、$json->{'data'}[1]でもアクセス可能。

json1.pl
01:
02:
03:
04:
05:
06:
07:
08:
09:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
#!/usr/local/bin/perl -w

use JSON::XS;
use Data::Dumper;

my $json_str = '{"key_str":"abcdefg"
,"key_int":987
,"data":[101,102,103,201,202,203]}';

$json = decode_json($json_str);

print "JSON=" . Dumper($json);
print "[DUMP] " . encode_json($json) . "\n";

print "\nkey_str=" . $json->{'key_str'} . "\n";

print "\ndata cnt=" . @{$json->{'data'}} . "\n";
foreach my $val(@{$json->{'data'}}) {
        print "  val=$val \n";
}
print "\n[DUMP] " . encode_json($json) . "\n";

実行結果
$ ./json1.pl
JSON=$VAR1 = {
          'key_int' => 987,
          'data' => [
                      101,
                      102,
                      103,
                      201,
                      202,
                      203
                    ],
          'key_str' => 'abcdefg'
        };
[DUMP] {"key_int":987,"data":[101,102,103,201,202,203],"key_str":"abcdefg"}

key_str=abcdefg

data cnt=6
  val=101
  val=102
  val=103
  val=201
  val=202
  val=203


[DUMP] {"key_int":987,"data":["101","102","103","201","202","203"],"key_str":"abcdefg"}

実行してみて分かったのですが、DUMPの結果がJSONのkey/valueにアクセスする前とした後で型が変わっています。
参照前:"data":[101,102,103,201,202,203]
参照後:"data":["101","102","103","201","202","203"]

参照してさらに何かしらのJSONに変換して出力するようなプログラムの場合、Perlであれば問題ないかもしれませんが、他言語のパーサーで型判定までやってオブジェクト型で返すようなものがある場合問題があるかもしれません。
何かオプションがあったかな。


Perlでgzipファイルの入出力(Compress::Zlibの利用)

Perlのgzipファイル入出力のサンプル。
gzipとzipはちょっと異なっていますが、今回はアーカイバでない方のgzipを。
実際にはtarと併用して、「tar -zcvf」でdir丸ごと圧縮、「tar -zxvf」で展開(-Cで展開先を指定)を使うことが多いですが。

そんなこんなで今回は、Compress::Zlib (CPAN)FileHandle (CPAN) の2つのCPANライブラリを利用。
使えるものは何でも使っていきましょう。

と、いうことでサンプルの作成。

gzip.pl
01:
02:
03:
04:
05:
06:
07:
08:
09:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
#!/usr/local/bin/perl

use Compress::Zlib;
use FileHandle;

my $prm_in_file = $ARGV[0];
my $prm_ot_file = $ARGV[1];

my $in_gz = gzopen($prm_in_file, 'rb')
    or die "*** file open error. file='$prm_in_file' $!";
my $ot_gz = gzopen($prm_ot_file, 'wb')
    or die "*** file open error. file='$prm_ot_file' $!";

$cnt = 1;
$ot_gz->gzwrite("$cnt:first line\n");

while(($read_size = $in_gz->gzreadline($in_buf)) > 0) {
    die($in_gz->gzerror) if ($read_size < 0);
    chomp($in_buf);

    $cnt++;
    $ot_gz->gzwrite("$cnt:$in_buf\n");
}

$cnt++;
$ot_gz->gzwrite("$cnt:end line\n");

$in_gz->gzclose();
$ot_gz->gzclose();

in.csv
$ file gzip_in.txt.gz
gzip_in.txt.gz: gzip compressed data, was "gzip_in.txt", from Unix, last modified: Tue Mar 31 07:46:37 2015
$ zcat gzip_in.txt.gz
aaa
bbb
ccc
gzip_in.txt.gzを入力して、行番号、ヘッダーレコード、トレーラーレコードを追加してgzip_ot.txt.gzファイルに出力。

実行結果
$ ./gzip.pl gzip_in.txt.gz gzip_ot.txt.gz
$ file gzip_ot.txt.gz
gzip_ot.txt.gz: gzip compressed data, from Unix, last modified: Tue Mar 31 07:59:26 2015
$ zcat gzip_ot.txt.gz
1:first line
2:aaa
3:bbb
4:ccc
5:end line

また、FileHandleを使った例は下記に。

[続きを見る]

PerlでCSVファイルの入出力(Text::CSV_XSの利用)

PerlのCSV入出力のサンプル。
split(/,/)や、単純に,で連結してる例もありますが、実はややこしいCSV。
バイナリデータ、ダブルクォートのエスケープ("は""とする)、カンマ(,)を含むフィールドの取り扱い(,を含む場合はフィールドを"で囲む)など自作するとハマることが多々あるので「Text::CSV_XS」を使うのが簡単で正確です。
Text::CSV_XS (CPAN)

と、いうことでサンプルの作成。

csv.pl
01:
02:
03:
04:
05:
06:
07:
08:
09:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
#!/usr/local/bin/perl

use FileHandle;
use Data::Dumper;
use Text::CSV_XS;

my $prm_in_file = $ARGV[0];
my $prm_ot_file = $ARGV[1];

open ($fp_in, $prm_in_file) or die "$!";
open ($fp_ot, "> $prm_ot_file") or die "$!";

my $csv = Text::CSV_XS->new({
  'quote_char'   => '"',
  'escape_char'  => '"',
  'always_quote' => 1,
  'binary'       => 1,
});

my $line_cnt = 0;
while(<$fp_in>) {
    $line_cnt++;

    $status = $csv->parse($_);
    my @cols_in = $csv->fields();

    print "\n---- $line_cnt: IN\n";
    print Dumper @cols_in;

    # add line No.
    @cols_ot = ($line_cnt, @cols_in);
    print "---- $line_cnt: OT\n";
    print Dumper @cols_ot;
    $csv->combine(@cols_ot);
    print $fp_ot $csv->string()."\n";

}

close($fp_in);
close($fp_ot);

in.csv
"ABC","123","check,comma","check""w-quote","ctrl^Khat-K"
"ABC","123","check,comma","check""w-quote","ctrl^C^D^Ehat-CDE"
^K(垂直タブ)、^C、^D、^E はコントロールコードです。catすると下記になります。
$ less in.csv
"ABC","123","check,comma","check""w-quote","ctrl^Khat-K"
"ABC","123","check,comma","check""w-quote","ctrl^C^D^Ehat-CDE"

$ cat in.csv
"ABC","123","check,comma","check""w-quote","ctrl
                                                hat-K"
"ABC","123","check,comma","check""w-quote","ctrlhat-CDE"

実行結果
$ ./csv.pl in.csv ot.csv

---- 1: IN
$VAR1 = 'ABC';
$VAR2 = '123';
$VAR3 = 'check,comma';
$VAR4 = 'check"w-quote';
$VAR5 = 'ctrl
             hat-K';
---- 1: OT
$VAR1 = 1;
$VAR2 = 'ABC';
$VAR3 = '123';
$VAR4 = 'check,comma';
$VAR5 = 'check"w-quote';
$VAR6 = 'ctrl
             hat-K';

---- 2: IN
$VAR1 = 'ABC';
$VAR2 = '123';
$VAR3 = 'check,comma';
$VAR4 = 'check"w-quote';
$VAR5 = 'ctrlhat-CDE';
---- 2: OT
$VAR1 = 2;
$VAR2 = 'ABC';
$VAR3 = '123';
$VAR4 = 'check,comma';
$VAR5 = 'check"w-quote';
$VAR6 = 'ctrlhat-CDE';

$ less ot.csv
"1","ABC","123","check,comma","check""w-quote","ctrl^Khat-K"
"2","ABC","123","check,comma","check""w-quote","ctrl^C^D^Ehat-CDE"

$ cat ot.csv
"1","ABC","123","check,comma","check""w-quote","ctrl
                                                    hat-K"
"2","ABC","123","check,comma","check""w-quote","ctrlhat-CDE"

やってることは、CSVファイル(in.csv)を入力して、行番号を追加したCSVファイル(ot.csv)を出力してるだけです。
バイナリ(コントロールコード)、"および,のエスケープも問題なく行われています。
CSVファイルを扱うコツとしては、各フィールドはかならずダブルクォート(")で囲むようにすることです。
ちなみに、Data::Dumperはオブジェクトをダンプして表示するのに便利です。

カテゴリ

Amazon

アクセスランキング

[ジャンルランキング]
コンピュータ
249位
アクセスランキングを見る>>

[サブジャンルランキング]
プログラミング
38位
アクセスランキングを見る>>

RSSリンクの表示

ブロとも申請フォーム

Copyright © nopgm