>Ну а если воркеры вычисления делают, т.е. CPU потребляют, будет работать?
Отвечу за Антона - будет, медленней точно не станет. Оборачиваем "воркер" в async - в цикле эти асинки складываем в массив, Делаем join и радуемся массиву значений. Создавать под это процессы это перебор. 5 июня 2014 г., 22:24 пользователь Victor Efimov <[email protected]> написал: > 5 июня 2014 г., 22:12 пользователь Antonio Nikishaev <[email protected]> написал: > > Нахрена вам разделяемая память и костыли с трубами? > > > > Куча async. В конце всем ->join. Если надо по частям передавать данные > то Coro::Channel к каждому > > > > Ну а если воркеры вычисления делают, т.е. CPU потребляют, будет работать? > > > > > > > On 5 Jun 2014, at 19:47, Харпалёв Иван <[email protected]> wrote: > > > >> Спасибо! > >> На Go выглядит заманчиво, хотя и совершенно не понятно, как происходит > распределение входа между воркерами. > >> > >> Классный пример с ForkEngine. > >> Вот только в доке IO::Pipe не описаны функции autoflush и blocking. > Зачем они вызываются в ForkEngine?? и зачем делается binmode на дескриптор? > >> > >> Передача через Pipe -- подходящее решение (прогон через paip почти не > замедляет построчное копирование из файла в файл на Perl). > >> > >> А как же быть c получением из воркеров?? > >> как-то так? > >> open my $input, "<", $in_file or die "Can not open file for read"; > >> open my $output, ">", $out_file or die "Can not open file for write"; > >> for (@workers) { > >> ⇥ my $wait_for_input = AnyEvent->io ( > >> ⇥ ⇥ fh => $_->{fromchild}, > >> ⇥ ⇥ poll => 'r', > >> ⇥ ⇥ cb => sub { > >> ⇥ ⇥ ⇥ say $output readline ($_->{fromchild}); > >> ⇥ ⇥ ⇥ $_->{c}--; #счётчик очереди в worker'е > >> ⇥ ⇥ } > >> ⇥ ) > >> } > >> my $number=0; > >> while (<$input>) { > >> ⇥ while(1){ > >> ⇥ ⇥ $number = ++$number % $#workers; > >> ⇥ ⇥ my $worker = $workers[$number]; > >> ⇥ ⇥ if ($worker->{c} < 10) { > >> ⇥ ⇥ ⇥ my $to = $worker->{tochild}; > >> ⇥ ⇥ ⇥ say $to $_; > >> ⇥ ⇥ ⇥ $worker->{c}++; > >> ⇥ ⇥ } > >> ⇥ } > >> } > >> > >> Gearman -- штука крутая, но всё-таки хочется на Perl. > >> А как решить на Coro по-прежнему непонятно( > >> Вроде если есть симафоры, по они должны быть для разделяемых ресурсов и > следовательно должно быть возможно изменять оду и ту же переменную из > разных потоков... Возможно ли вообще такое в Perl. Хотя пайпы вроде норм > решение для передачи и главная проблема в неблокирующем ожидании. > >> > >> Спасибо! > >> > >> > >> > >> 5 июня 2014 г., 17:16 пользователь Eugene Toropov < > [email protected]> написал: > >> Спасибо, реально разыгрался аппетит :) пошел тырить печеньки у > товарищей :) > >> > >> On Jun 5, 2014, at 5:14 PM, Alexander Lourier <[email protected]> wrote: > >> > >>> Запилил за 10 минут. Такие задачки на Go решаются элементарно. А если > учесть, что каждая горутина может выполняться на своём CPU и никаких GIL, > то становится ням-ням как вкусно. > >>> > >>> > >>> 5 июня 2014 г., 15:10 пользователь Eugene Toropov < > [email protected]> написал: > >>> Интересно, это уже готовое было или запилил за полчаса? > >>> > >>> Евгений > >>> > >>> On Jun 5, 2014, at 5:03 PM, Alexander Lourier <[email protected]> wrote: > >>> > >>>> Минутка рекламы. Вот решение задачи на Go. Оно длинное, потому что я > его обильно снабдил комментариями. Если лишнее убрать, всё будет выглядеть > очень компактно и работать производительно. > >>>> > >>>> package > >>>> main > >>>> > >>>> > >>>> import > >>>> ( > >>>> > >>>> "fmt" > >>>> > >>>> > >>>> "math/rand" > >>>> > >>>> > >>>> "time" > >>>> > >>>> ) > >>>> > >>>> > >>>> const > >>>> ( > >>>> numWorkers = > >>>> 10 > >>>> > >>>> ) > >>>> > >>>> > >>>> // task - это задание для воркера. > >>>> type task struct > >>>> { > >>>> value > >>>> int > >>>> > >>>> output > >>>> chan > >>>> result > >>>> } > >>>> > >>>> > >>>> // result - это результат обработки задания воркером. > >>>> type result struct > >>>> { > >>>> value > >>>> int > >>>> > >>>> worker > >>>> int > >>>> > >>>> } > >>>> > >>>> > >>>> // worker берёт данные из канала input, обрабатывает их (умножает на > 100) и кладёт в канал ответа, > >>>> // который прислан вместе с заданием. > >>>> func worker(workerNumber int, input chan > >>>> task) { > >>>> > >>>> // Пока входной канал не закроют, читаем из него задание. > >>>> > >>>> > >>>> for task := range > >>>> input { > >>>> > >>>> // Работаем в поте лица. > >>>> > >>>> time.Sleep(time.Duration(rand.Intn( > >>>> 100 > >>>> )) * time.Millisecond) > >>>> task.output <- result{task.value * > >>>> 100 > >>>> , workerNumber} > >>>> } > >>>> } > >>>> > >>>> > >>>> // prepareInput готовит входные задания и кладёт их в два канала: в > одну очередь > >>>> // задания для воркеров, в другую - каналы ответа. > >>>> func prepareInput(input chan task, output chan chan > >>>> result) { > >>>> > >>>> for i := 0; i < 100 > >>>> ; i++ { > >>>> > >>>> // Канал ответа буферизованный, чтобы воркер не ждал, когда его ответ > считают, > >>>> > >>>> > >>>> // а сразу брался за следующее задание. > >>>> > >>>> outputChan := > >>>> make(chan result, 1 > >>>> ) > >>>> > >>>> // Тот факт, что задания кладутся в input и output в одном и том же > порядке, > >>>> > >>>> > >>>> // гарантирует, что ответы будут упорядочены в том же порядке. > >>>> > >>>> input <- task{i, outputChan} > >>>> output <- outputChan > >>>> } > >>>> > >>>> close > >>>> (input) > >>>> > >>>> close > >>>> (output) > >>>> } > >>>> > >>>> > >>>> func > >>>> main() { > >>>> > >>>> // Каналы обязательно буферизованные (длина буфера = числу воркеров). > >>>> > >>>> input := > >>>> make(chan > >>>> task, numWorkers) > >>>> output := > >>>> make(chan chan > >>>> result, numWorkers) > >>>> > >>>> > >>>> // Запускаем готовилку входных данных. > >>>> > >>>> > >>>> go > >>>> prepareInput(input, output) > >>>> > >>>> > >>>> // Запускаем воркеры. > >>>> > >>>> > >>>> for i := 0 > >>>> ; i < numWorkers; i++ { > >>>> > >>>> go > >>>> worker(i, input) > >>>> } > >>>> > >>>> > >>>> // Читаем ответы в порядке, в каком нам нужно. > >>>> > >>>> > >>>> for res := range > >>>> output { > >>>> fmt.Printf( > >>>> "%+v\n" > >>>> , <-res) > >>>> } > >>>> } > >>>> > >>>> > >>>> > >>>> 5 июня 2014 г., 13:46 пользователь Харпалёв Иван < > [email protected]> написал: > >>>> Добрый день, могучий MoscowPM > >>>> > >>>> Опять про параллельную обработку. > >>>> > >>>> Хочется написать вот такую схему обработки ввода: > >>>> master создаёт work'ов, > >>>> читает порции из файла, раздаёт порции worker'ам > >>>> ждёт, пока worker обработает, получает ответ worker'a > >>>> пишет результат в файл. > >>>> Так же мастер буфереизует вывод, чтобы выход писался в правильном > порядке. > >>>> > >>>> Самое туманное: > >>>> Как передавать данные от мастера к worker'у и Обратно?!!!! > >>>> Как ждать готовности?!!! > >>>> Должна ли такая схема (работа с диском из одного места) дать > ускорение по сравнению с чтением/записью файла в каждом worker'е? > >>>> > >>>> смотрел на Coro, увидел Coro::Simaphore, Coro::Signal ... но не пойму: > >>>> как сделать разделяемую память, (как быстро передавать данные между > мастером и worker'ом внутри Perl)? > >>>> как сделать неблокирующее ожидание готовности одного из worker'ов > (при котором можно заниматься вводом-выводом)? > >>>> > >>>> Подскажите, на чём и как такое писать!! > >>>> Спасибо! > >>>> > >>>> Уважение > >>>> Иван Харпалев > >>>> > > > > -- > > Moscow.pm mailing list > > [email protected] | http://moscow.pm.org > -- > Moscow.pm mailing list > [email protected] | http://moscow.pm.org >
-- Moscow.pm mailing list [email protected] | http://moscow.pm.org
