上一篇日志说过了,内置的 Thread 模块无法解决线程排队、变量共享的问题,但是在使用多线程的过程中,这两个又是比较常见的需求。线程数无法控制,处理速度和内存消耗两者无法平衡;变量无法共享,无法让线程向同一个数据结构传递返回值。但是 Perl 有庞大的第三方库,CPAN 上有一个备受匿名和尚们推崇的模块能够很好地解决这些问题,这个模块就是传说中的 Parallel::ForkManager 。严格意义上说这个模块是通过 Fork 进程而不是创建线程来实现并行处理的,不过在实际应用过程中,这两者其实没什么区别。
Parallel::ForkManager 代码也只有寥寥的200多行,内容并不艰深。
对于线程排队,ForkManager 只是在 fork 进程的方法里添加了一个判断,如果当前子进程数量>=事先定义的数量,就调用等待方法。
{max_proc}) {
$s->on_wait;
$s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef);
};
对于变量共享, 它利用 Storable 模块 将子线程的数据结构序列化之后转换成文本,然后主线程再反其道行之将文本读取到内存,这样就绕开了复杂的 variable lock,当然,因为涉及到磁盘IO,性能也会打折扣。这个实现方式也有很大的局限,严格说来并不算是所谓的变量共享,我在最后会讨论这个问题。
{in_child} ) {
if (defined($r)) { # store the child's data structure
my $storable_tempfile = File::Spec->catfile($s->{tempdir}, 'Parallel-ForkManager-' . $s->{parent_pid} . '-' . $$ . '.txt');
my $stored = eval { return &store($r, $storable_tempfile); };
# handle Storables errors, IE logcarp or carp returning undef, or die (via logcroak or croak)
if (not $stored or $@) {
warn(qq|The storable module was unable to store the child's data structure to the temp file "$storable_tempfile": | . join(', ', $@));
}
}
CORE::exit($x || 0);
}
下面是一个简单的示例:
use strict;
use warnings;
use Parallel::ForkManager;
# 指定进程峰值为100
my $pm = Parallel::ForkManager->new(100);
# 所有子进程返回的数据结构都会传回到这个hash里
my %persons;
# 进程结束时运行的callback,将子进程返回的数据结构传入%persons,
# 这是“共享数据结构”的关键
$pm -> run_on_finish( sub {
my $person_info = pop @_;
while ( my ($key,$value) = each %$person_info ) {
$persons{$key} = $value;
}
});
for my $email ( @emails ) {
# 创建进程
$pm->start and next;
# 实际需要运行的代码
my $data = get_person_info($email);
# 结束进程并返回$data。
# $data是一个array ref,其中有进程id,进程退出码等信息,
# 其中最后一个item是子进程返回的数据结构。
$pm->finish(0, $data);
}
$pm->wait_all_children;
# 伪sub
sub get_person_info {
my $email = shift @_;
# bla bla bla
return \%person_info;
}
上面代码的关键在于在创建PM对象的时候设定进程最大值,以及设定 run_on_finish
的 callback 。之前提到利用 run_on_finish
有一定局限,如果我希望在子线程的代码中加入条件判断,将不同类别的数据输出到两个不同的共享hash就不怎么好处理了。此外,如果子进程需要实时从共享hash 中读取数据,ForkManager 是无法做到的。如果不想自己写代码,可以利用另外一个CPAN模块,下一篇再说。