5 июня 2014 г., 19:47 пользователь Харпалёв Иван <[email protected]> написал: > Спасибо! > На Go выглядит заманчиво, хотя и совершенно не понятно, как происходит > распределение входа между воркерами. > > Классный пример с ForkEngine. > Вот только в доке IO::Pipe не описаны функции autoflush и blocking. Зачем > они вызываются в ForkEngine?? и зачем делается binmode на дескриптор?
IO::Pipe наследник IO::Handle (видно в его коде, но не очевидно), там они и описаны https://metacpan.org/pod/IO::Handle blocking - в POSIX системах есть неблокирующий ввод вывод и блокирующий, тут обязательно блокирующий autoflush - чтобы не было буферизации binmode пожалуй, просто на всякий случай. > > Передача через Pipe -- подходящее решение (прогон через paip почти не > замедляет построчное копирование из файла в файл на Perl). > > А как же быть c получением из воркеров?? Воркеры - отдельный процессы, соотв. AE там не нужен. Они просто читают из пайпов и пишут в пайпы, функциями read, print, единственное докция perl просит не смешивать буферизированный ввод вывод и IO::Select (хотя так делают и часто багов нет, но иногда есть), так что я использую sysread, syswrite. Но если очень внимательно посмотреть документацию к sysread/syswrite видно что они могут вернуть ошибку EINTR даже если всё нормально и можно работать дальше, поэтому над ними нужен враппер типа такого https://github.com/vsespb/mt-aws-glacier/blob/421f9d04b96a4657d89eb7ef2bc66aee15b8cec3/lib/App/MtAws/Utils.pm#L269 https://github.com/vsespb/mt-aws-glacier/blob/421f9d04b96a4657d89eb7ef2bc66aee15b8cec3/lib/App/MtAws/Utils.pm#L296 вообще если хочется самому это всё делать нужно хорошо знать API вызовы unix > как-то так? > open my $input, "<", $in_file or die "Can not open file for read"; > open my $output, ">", $out_file or die "Can not open file for write"; > for (@workers) { > ⇥ my $wait_for_input = AnyEvent->io ( > ⇥ ⇥ fh => $_->{fromchild}, > ⇥ ⇥ poll => 'r', > ⇥ ⇥ cb => sub { > ⇥ ⇥ ⇥ say $output readline ($_->{fromchild}); > ⇥ ⇥ ⇥ $_->{c}--; #счётчик очереди в worker'е > ⇥ ⇥ } > ⇥ ) > } > my $number=0; > while (<$input>) { > ⇥ while(1){ > ⇥ ⇥ $number = ++$number % $#workers; > ⇥ ⇥ my $worker = $workers[$number]; > ⇥ ⇥ if ($worker->{c} < 10) { > ⇥ ⇥ ⇥ my $to = $worker->{tochild}; > ⇥ ⇥ ⇥ say $to $_; > ⇥ ⇥ ⇥ $worker->{c}++; > ⇥ ⇥ } > ⇥ } > } > > Gearman -- штука крутая, но всё-таки хочется на Perl. > А как решить на Coro по-прежнему непонятно( > Вроде если есть симафоры, по они должны быть для разделяемых ресурсов и > следовательно должно быть возможно изменять оду и ту же переменную из разных > потоков... Возможно ли вообще такое в Perl. Хотя пайпы вроде норм решение > для передачи и главная проблема в неблокирующем ожидании. > > Спасибо! > > > > 5 июня 2014 г., 17:16 пользователь Eugene Toropov <[email protected]> > написал: > >> Спасибо, реально разыгрался аппетит :) пошел тырить печеньки у товарищей >> :) >> >> On Jun 5, 2014, at 5:14 PM, Alexander Lourier <[email protected]> wrote: >> >> Запилил за 10 минут. Такие задачки на Go решаются элементарно. А если >> учесть, что каждая горутина может выполняться на своём CPU и никаких GIL, то >> становится ням-ням как вкусно. >> >> >> 5 июня 2014 г., 15:10 пользователь Eugene Toropov >> <[email protected]> написал: >>> >>> Интересно, это уже готовое было или запилил за полчаса? >>> >>> Евгений >>> >>> On Jun 5, 2014, at 5:03 PM, Alexander Lourier <[email protected]> wrote: >>> >>> Минутка рекламы. Вот решение задачи на Go. Оно длинное, потому что я его >>> обильно снабдил комментариями. Если лишнее убрать, всё будет выглядеть очень >>> компактно и работать производительно. >>> >>> >>> package main >>> >>> import ( >>> "fmt" >>> "math/rand" >>> "time" >>> ) >>> >>> const ( >>> numWorkers = 10 >>> ) >>> >>> // task - это задание для воркера. >>> type task struct { >>> value int >>> output chan result >>> } >>> >>> // result - это результат обработки задания воркером. >>> type result struct { >>> value int >>> worker int >>> } >>> >>> // worker берёт данные из канала input, обрабатывает их (умножает на 100) >>> и кладёт в канал ответа, >>> // который прислан вместе с заданием. >>> func worker(workerNumber int, input chan task) { >>> // Пока входной канал не закроют, читаем из него задание. >>> for task := range input { >>> // Работаем в поте лица. >>> time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) >>> task.output <- result{task.value * 100, workerNumber} >>> } >>> } >>> >>> // prepareInput готовит входные задания и кладёт их в два канала: в одну >>> очередь >>> // задания для воркеров, в другую - каналы ответа. >>> func prepareInput(input chan task, output chan chan result) { >>> for i := 0; i < 100; i++ { >>> // Канал ответа буферизованный, чтобы воркер не ждал, когда его ответ >>> считают, >>> // а сразу брался за следующее задание. >>> outputChan := make(chan result, 1) >>> // Тот факт, что задания кладутся в input и output в одном и том же >>> порядке, >>> // гарантирует, что ответы будут упорядочены в том же порядке. >>> input <- task{i, outputChan} >>> output <- outputChan >>> } >>> close(input) >>> close(output) >>> } >>> >>> func main() { >>> // Каналы обязательно буферизованные (длина буфера = числу воркеров). >>> input := make(chan task, numWorkers) >>> output := make(chan chan result, numWorkers) >>> >>> // Запускаем готовилку входных данных. >>> go prepareInput(input, output) >>> >>> // Запускаем воркеры. >>> for i := 0; i < numWorkers; i++ { >>> go worker(i, input) >>> } >>> >>> // Читаем ответы в порядке, в каком нам нужно. >>> for res := range output { >>> fmt.Printf("%+v\n", <-res) >>> } >>> } >>> >>> >>> >>> 5 июня 2014 г., 13:46 пользователь Харпалёв Иван >>> <[email protected]> написал: >>>> >>>> Добрый день, могучий MoscowPM >>>> >>>> Опять про параллельную обработку. >>>> >>>> Хочется написать вот такую схему обработки ввода: >>>> master создаёт work'ов, >>>> читает порции из файла, раздаёт порции worker'ам >>>> ждёт, пока worker обработает, получает ответ worker'a >>>> пишет результат в файл. >>>> Так же мастер буфереизует вывод, чтобы выход писался в правильном >>>> порядке. >>>> >>>> Самое туманное: >>>> Как передавать данные от мастера к worker'у и Обратно?!!!! >>>> Как ждать готовности?!!! >>>> Должна ли такая схема (работа с диском из одного места) дать ускорение >>>> по сравнению с чтением/записью файла в каждом worker'е? >>>> >>>> смотрел на Coro, увидел Coro::Simaphore, Coro::Signal ... но не пойму: >>>> как сделать разделяемую память, (как быстро передавать данные между >>>> мастером и worker'ом внутри Perl)? >>>> как сделать неблокирующее ожидание готовности одного из worker'ов (при >>>> котором можно заниматься вводом-выводом)? >>>> >>>> Подскажите, на чём и как такое писать!! >>>> Спасибо! >>>> >>>> Уважение >>>> Иван Харпалев >>>> >>>> >>>> >>>> -- >>>> Moscow.pm mailing list >>>> [email protected] | http://moscow.pm.org >>>> >>> >>> -- >>> Moscow.pm mailing list >>> [email protected] | http://moscow.pm.org >>> >>> >>> >>> -- >>> Moscow.pm mailing list >>> [email protected] | http://moscow.pm.org >>> >> >> -- >> Moscow.pm mailing list >> [email protected] | http://moscow.pm.org >> >> >> >> -- >> Moscow.pm mailing list >> [email protected] | http://moscow.pm.org >> > > > -- > Moscow.pm mailing list > [email protected] | http://moscow.pm.org > -- Moscow.pm mailing list [email protected] | http://moscow.pm.org
