Нахрена вам разделяемая память и костыли с трубами?
Куча async. В конце всем ->join. Если надо по частям передавать данные то Coro::Channel к каждому On 5 Jun 2014, at 19:47, Харпалёв Иван <[email protected]> wrote: > Спасибо! > На Go выглядит заманчиво, хотя и совершенно не понятно, как происходит > распределение входа между воркерами. > > Классный пример с ForkEngine. > Вот только в доке IO::Pipe не описаны функции autoflush и blocking. Зачем > они вызываются в ForkEngine?? и зачем делается binmode на дескриптор? > > Передача через Pipe -- подходящее решение (прогон через paip почти не > замедляет построчное копирование из файла в файл на Perl). > > А как же быть c получением из воркеров?? > как-то так? > open my $input, "<", $in_file or die "Can not open file for read"; > open my $output, ">", $out_file or die "Can not open file for write"; > for (@workers) { > ⇥ my $wait_for_input = AnyEvent->io ( > ⇥ ⇥ fh => $_->{fromchild}, > ⇥ ⇥ poll => 'r', > ⇥ ⇥ cb => sub { > ⇥ ⇥ ⇥ say $output readline ($_->{fromchild}); > ⇥ ⇥ ⇥ $_->{c}--; #счётчик очереди в worker'е > ⇥ ⇥ } > ⇥ ) > } > my $number=0; > while (<$input>) { > ⇥ while(1){ > ⇥ ⇥ $number = ++$number % $#workers; > ⇥ ⇥ my $worker = $workers[$number]; > ⇥ ⇥ if ($worker->{c} < 10) { > ⇥ ⇥ ⇥ my $to = $worker->{tochild}; > ⇥ ⇥ ⇥ say $to $_; > ⇥ ⇥ ⇥ $worker->{c}++; > ⇥ ⇥ } > ⇥ } > } > > Gearman -- штука крутая, но всё-таки хочется на Perl. > А как решить на Coro по-прежнему непонятно( > Вроде если есть симафоры, по они должны быть для разделяемых ресурсов и > следовательно должно быть возможно изменять оду и ту же переменную из разных > потоков... Возможно ли вообще такое в Perl. Хотя пайпы вроде норм решение > для передачи и главная проблема в неблокирующем ожидании. > > Спасибо! > > > > 5 июня 2014 г., 17:16 пользователь Eugene Toropov <[email protected]> > написал: > Спасибо, реально разыгрался аппетит :) пошел тырить печеньки у товарищей :) > > On Jun 5, 2014, at 5:14 PM, Alexander Lourier <[email protected]> wrote: > >> Запилил за 10 минут. Такие задачки на Go решаются элементарно. А если >> учесть, что каждая горутина может выполняться на своём CPU и никаких GIL, то >> становится ням-ням как вкусно. >> >> >> 5 июня 2014 г., 15:10 пользователь Eugene Toropov <[email protected]> >> написал: >> Интересно, это уже готовое было или запилил за полчаса? >> >> Евгений >> >> On Jun 5, 2014, at 5:03 PM, Alexander Lourier <[email protected]> wrote: >> >>> Минутка рекламы. Вот решение задачи на Go. Оно длинное, потому что я его >>> обильно снабдил комментариями. Если лишнее убрать, всё будет выглядеть >>> очень компактно и работать производительно. >>> >>> package >>> main >>> >>> >>> import >>> ( >>> >>> "fmt" >>> >>> >>> "math/rand" >>> >>> >>> "time" >>> >>> ) >>> >>> >>> const >>> ( >>> numWorkers = >>> 10 >>> >>> ) >>> >>> >>> // task - это задание для воркера. >>> type task struct >>> { >>> value >>> int >>> >>> output >>> chan >>> result >>> } >>> >>> >>> // result - это результат обработки задания воркером. >>> type result struct >>> { >>> value >>> int >>> >>> worker >>> int >>> >>> } >>> >>> >>> // worker берёт данные из канала input, обрабатывает их (умножает на 100) и >>> кладёт в канал ответа, >>> // который прислан вместе с заданием. >>> func worker(workerNumber int, input chan >>> task) { >>> >>> // Пока входной канал не закроют, читаем из него задание. >>> >>> >>> for task := range >>> input { >>> >>> // Работаем в поте лица. >>> >>> time.Sleep(time.Duration(rand.Intn( >>> 100 >>> )) * time.Millisecond) >>> task.output <- result{task.value * >>> 100 >>> , workerNumber} >>> } >>> } >>> >>> >>> // prepareInput готовит входные задания и кладёт их в два канала: в одну >>> очередь >>> // задания для воркеров, в другую - каналы ответа. >>> func prepareInput(input chan task, output chan chan >>> result) { >>> >>> for i := 0; i < 100 >>> ; i++ { >>> >>> // Канал ответа буферизованный, чтобы воркер не ждал, когда его ответ >>> считают, >>> >>> >>> // а сразу брался за следующее задание. >>> >>> outputChan := >>> make(chan result, 1 >>> ) >>> >>> // Тот факт, что задания кладутся в input и output в одном и том же порядке, >>> >>> >>> // гарантирует, что ответы будут упорядочены в том же порядке. >>> >>> input <- task{i, outputChan} >>> output <- outputChan >>> } >>> >>> close >>> (input) >>> >>> close >>> (output) >>> } >>> >>> >>> func >>> main() { >>> >>> // Каналы обязательно буферизованные (длина буфера = числу воркеров). >>> >>> input := >>> make(chan >>> task, numWorkers) >>> output := >>> make(chan chan >>> result, numWorkers) >>> >>> >>> // Запускаем готовилку входных данных. >>> >>> >>> go >>> prepareInput(input, output) >>> >>> >>> // Запускаем воркеры. >>> >>> >>> for i := 0 >>> ; i < numWorkers; i++ { >>> >>> go >>> worker(i, input) >>> } >>> >>> >>> // Читаем ответы в порядке, в каком нам нужно. >>> >>> >>> for res := range >>> output { >>> fmt.Printf( >>> "%+v\n" >>> , <-res) >>> } >>> } >>> >>> >>> >>> 5 июня 2014 г., 13:46 пользователь Харпалёв Иван <[email protected]> >>> написал: >>> Добрый день, могучий MoscowPM >>> >>> Опять про параллельную обработку. >>> >>> Хочется написать вот такую схему обработки ввода: >>> master создаёт work'ов, >>> читает порции из файла, раздаёт порции worker'ам >>> ждёт, пока worker обработает, получает ответ worker'a >>> пишет результат в файл. >>> Так же мастер буфереизует вывод, чтобы выход писался в правильном порядке. >>> >>> Самое туманное: >>> Как передавать данные от мастера к worker'у и Обратно?!!!! >>> Как ждать готовности?!!! >>> Должна ли такая схема (работа с диском из одного места) дать ускорение по >>> сравнению с чтением/записью файла в каждом worker'е? >>> >>> смотрел на Coro, увидел Coro::Simaphore, Coro::Signal ... но не пойму: >>> как сделать разделяемую память, (как быстро передавать данные между >>> мастером и worker'ом внутри Perl)? >>> как сделать неблокирующее ожидание готовности одного из worker'ов (при >>> котором можно заниматься вводом-выводом)? >>> >>> Подскажите, на чём и как такое писать!! >>> Спасибо! >>> >>> Уважение >>> Иван Харпалев >>> -- Moscow.pm mailing list [email protected] | http://moscow.pm.org
