Параллельная обработка данных в Perl

23 апреля 2010

Решил выложить небольшую заготовку для скрипта, обрабатывающего данные с помощью нескольких дочерних процессов. Мне уже приходилось пару раз ее использовать, может и вам пригодится.

Идея следующая. Есть много дочерних процессов, предназначенных непосредственно для обработки данных, и родительский процесс, который собственно создает дочерние, если нужно — синхронизирует их, и так далее.

Для начала — код родителя, parent.pl:

#!/usr/bin/perl
# parent.pl - (c) 2010 Alexandr A Alexeev, http://eax.me/

use Socket;
use IO::Select;
use IO::Handle;
use strict;

my $select = IO::Select->new();

# команды для запуска дочерних процессов
my @commands; # ("ls -la /", "pkg_info | grep p5-");

print "List of commands:\n\n";

# тут заполняешь массив @commands
# while(1) {
#  my $cmd = "./child.pl arg1 arg2 arg3";
#  print "$cmd\n";
#  push @commands, $cmd;
# }

print "\nstarting in 10 seconds...\n\n";
sleep 10;

for my $cmd (@commands) {
 print "starting '$cmd'...\n";
 my ($hchild, $hparent, $childid);
 socketpair($hchild, $hparent, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
   or die "socketpair: $!";

 $childid = fork;
 die "cannot fork" if($childid == -1);

 # перенаправляем ввод/вывод потомка в сокет
 unless($childid) {
  # потомок
  open STDIN, "<&", $hparent;
  open STDOUT, ">&", $hparent;
  open STDERR, ">&", $hparent;
  close $hparent;
  close $hchild;
  # унаследованные хэгдлы следует закрыть
  $select->remove($_) and close $_ for($select->handles);
  exec $cmd;
 }

 close $hparent;
 $select->add($hchild);
}

print "All done, now reading...\n";

while(my @ready = $select->can_read){
 # кто-то из потомков завершился или что-то написал в stdout
 for my $read(@ready) {
  if($read->eof || $read->error) {
    # потомок завершил работу
    $select->remove($read);
    close $read;
    next;
  }
  if(my $str = <$read>) {
    print $str;
    # обрабатываем строку $str
  }
 }
}

В массив @commands помещаются те команды, которые мы хотим выполнить. Скорее всего, это будет child.pl с различными аргументами (имя входного файла, к примеру).

Затем для каждой команды скрипт создает пару сокетов, форкатеся, перенаправляет ввод/вывод потомка в один из сокетов и делает exec(). Считаем, что все необходимые данные дочерние скрипты будут выводить в STDOUT, откуда родитель сможет считать их через сокет.

Вот заготовка для child.pl:

#!/usr/bin/perl

use strict;

$| = 1; # <-- архиважная строчка, отключающая буферизацию вывода

# здесь тупо парсим аргументы и выводим все, что надо в stdout

Как видите, вся заготовка фактически заключается в единственной строке под номером пять. Присвоив переменной с именем «вертикальная черта» значение 1, мы отключаем буферизацию вывода дочернего скрипта — все данные будут передаваться родителю безо всяких задержек.

Кстати, интересная особенность ОС семейства UNIX — не существует приема (патчинг бинарных файлов и прочие извращения не предлагать), позволяющего произвести описанные действия для произвольной программы. Необходимо, чтобы возможность отключения кэширования была предусмотрена в исходном коде программы. Например, некоторые утилиты имеют для этого специальный ключ.

Ну вот пожалуй и все, что я хотел сказать в этой заметке. В качестве дополнительных источников информации рекомендую мою серию уроков по Perl (разумеется!), а также «man IO::Select» и книгу «UNIX — профессиональное программирование» Ричарда Стивенса. Несмотря на свой объем, читается книга на одном дыхании. Не так давно она была переиздана на русском языке.

Дополнение: В принципе можно использовать и threads::shared, но я бы не советовал.

Дополнение: См существенно доработанную версию скрипта в пункте 2 заметки Как написать своего паука.

Метки: , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.