Nikerabbit has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/394118 )

Change subject: Add parallelism to repong
......................................................................

Add parallelism to repong

Change-Id: Ic9ae1290a7230ec1df0912e229d212149731ba8d
---
M repoconfig.commit.json
M repoconfig.json
M repong/repong.php
3 files changed, 140 insertions(+), 34 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/translatewiki refs/changes/18/394118/1

diff --git a/repoconfig.commit.json b/repoconfig.commit.json
index cda897c..3686f90 100644
--- a/repoconfig.commit.json
+++ b/repoconfig.commit.json
@@ -1,6 +1,7 @@
 {
        "@meta": {
-               "export": "php 
/srv/mediawiki/targets/production/extensions/Translate/scripts/export.php"
+               "export": "php 
/srv/mediawiki/targets/production/extensions/Translate/scripts/export.php",
+               "expand": "php 
/srv/mediawiki/targets/production/extensions/Translate/scripts/expand-groupspec.php"
        },
        "blockly": {
                "group": "out-blockly*",
diff --git a/repoconfig.json b/repoconfig.json
index 83c6d32..56b3885 100644
--- a/repoconfig.json
+++ b/repoconfig.json
@@ -1,6 +1,7 @@
 {
        "@meta": {
-               "export": "php 
/srv/mediawiki/targets/production/extensions/Translate/scripts/export.php"
+               "export": "php 
/srv/mediawiki/targets/production/extensions/Translate/scripts/export.php",
+                "expand": "php 
/srv/mediawiki/targets/production/extensions/Translate/scripts/expand-groupspec.php"
        },
        "blockly": {
                "group": "out-blockly*",
diff --git a/repong/repong.php b/repong/repong.php
index 9ba684f..cbc538c 100644
--- a/repong/repong.php
+++ b/repong/repong.php
@@ -6,6 +6,8 @@
 use Symfony\Component\Console\Input\InputArgument;
 use Symfony\Component\Console\Input\InputInterface;
 use Symfony\Component\Console\Output\OutputInterface;
+use Symfony\Component\Process\Exception\ProcessFailedException;
+use Symfony\Component\Process\Exception\ProcessTimedOutException;
 use Symfony\Component\Process\Process;
 
 require_once __DIR__ . '/vendor/autoload.php';
@@ -16,6 +18,7 @@
        protected $usernameConversion = [
                'nike' => 'nikerabbit',
        ];
+       protected $parallelism = 1;
 
        public function initialize() {
                $base = $this->getBase();
@@ -31,6 +34,11 @@
 
                $json = file_get_contents( "$base/repoconfig.json" );
                $this->config = json_decode( $json, true );
+
+               $cores = preg_match_all( '/^processor/m', file_get_contents( 
'/proc/cpuinfo' ) );
+               if ( $cores ) {
+                       $this->parallelism = (int)$cores;
+               }
        }
 
        protected function getBase() {
@@ -71,6 +79,89 @@
 
                return $str;
        }
+
+       protected function runParallelWithOutput( SplObjectStorage $processes, 
OutputInterface $output ) {
+               $this->runParaller(
+                       $processes,
+                       function ( $process ) use ( $output ) {
+                               $output->writeln( $process->getCommandLine(), 
OutputInterface::VERBOSITY_VERBOSE );
+                       },
+                       function ( $process, $exception = null ) use ( $output 
) {
+                               if ( $process->isSuccessful() ) {
+                                       $processOutput = $process->getOutput();
+                                       if ( trim( $processOutput ) ) {
+                                               $formatter = 
$output->getFormatter();
+                                               $command = $formatter->escape( 
$process->getCommandLine() );
+                                               $stdout = $formatter->escape( 
$processOutput );
+                                               $output->write( 
"<options=bold>$command</>\n$stdout\n" );
+                                       }
+                               } elseif ( $exception ) {
+                                       throw $exception;
+                               } else {
+                                       throw new ProcessFailedException( 
$process );
+                               }
+                       }
+               );
+       }
+
+       protected function runParaller(
+               SplObjectStorage $queue,
+               callable $onStart = null,
+               callable $onEnd = null
+       ) {
+               $running = [];
+               do {
+                       while ( count( $running ) < $this->parallelism ) {
+                               list( $ok, $process ) = 
self::getNextExecutableProcessIndex( $queue );
+
+                               // Check if anything to execute
+                               if ( $ok === 'NONE' ) {
+                                       break;
+                               }
+
+                               unset( $queue[ $process ] );
+                               is_callable( $onStart ) && $onStart( $process );
+
+                               // Check whether to run or fail the process
+                               if ( $ok === 'OK' ) {
+                                       $process->start();
+                                       $running[] = $process;
+                               } else {
+                                       $exception = new RuntimeException( 'A 
dependent task failed' );
+                                       is_callable( $onEnd ) && $onEnd( 
$process, $exception );
+                               }
+                       }
+
+                       usleep( 10000 );
+
+                       foreach ( $running as $index => $process ) {
+                               if ( !$process->isRunning() ) {
+                                       unset( $running[$index] );
+                                       is_callable( $onEnd ) && $onEnd( 
$process );
+                               }
+
+                               try {
+                                       $process->checkTimeout();
+                               } catch ( ProcessTimedOutException $e ) {
+                                       unset( $running[$index] );
+                                       is_callable( $onEnd ) && $onEnd( 
$process, $e );
+                               }
+                       }
+               } while ( count( $queue ) > 0 || $running !== [] );
+       }
+
+       private static function getNextExecutableProcessIndex( $queue ) {
+               foreach ( $queue as $process ) {
+                       $dependency = $queue[ $process ];
+                       if ( $dependency === null ) {
+                               return [ 'OK', $process ];
+                       } elseif ( $dependency->isTerminated() ) {
+                               return $dependency->isSuccessful() ? [ 'OK', 
$process ] : [ 'FAIL', $process ];
+                       }
+               }
+
+               return [ 'NONE', null ];
+       }
 }
 
 class UpdateCommand extends RepoNgCommand {
@@ -86,6 +177,8 @@
                $config = $this->getConfig( $project );
                $base = $this->getBase();
                $bindir = $this->bindir;
+
+               $processes = new SplObjectStorage();
 
                foreach ( $config['repos'] as $name => $repo ) {
                        $type = $repo['type'];
@@ -110,13 +203,13 @@
                        } else {
                                throw new RuntimeException( 'Unknown repo type' 
);
                        }
-                       $output->writeln( $command, 
OutputInterface::VERBOSITY_VERBOSE );
 
                        $process = new Process( $command );
-                       $process->setTimeout( 600 );
-                       $process->mustRun();
-                       $output->write( $process->getOutput() );
+                       $process->setTimeout( 300 );
+                       $processes->attach( $process );
                }
+
+               $this->runParallelWithOutput( $processes, $output );
        }
 }
 
@@ -132,10 +225,11 @@
                $project = $input->getArgument( 'project' );
                $config = $this->getConfig( $project );
                $exporter = $this->config['@meta']['export'];
+               $expander = $this->config['@meta']['expand'];
 
                $defaultOptions = [
+                       'group' => null,
                        'quiet' => true,
-                       'group' => $config['group'],
                        'threshold' => 35,
                        'target' => $this->getBase(),
                ];
@@ -152,37 +246,47 @@
                        $defaultOptions['threshold'] = 
(int)$config['export-threshold'];
                }
 
-               $jobOptions = [ 'lang' => '*' ] + $defaultOptions + [ 'skip' => 
'en,qqq' ];
-               $command = $this->buildCommandline( $exporter, $jobOptions );
-               $output->writeln( $command, OutputInterface::VERBOSITY_VERBOSE 
);
 
+               $command = "$expander '{$config[ 'group' ]}'";
                $process = new Process( $command );
-               $process->setTimeout( 300 );
+               $process->setTimeout( 10 );
                $process->mustRun();
-               $output->write( $process->getOutput() );
+               $groups = explode( "\n", trim( $process->getOutput() ) );
 
-               // Then message documentation
-               $jobOptions = [ 'lang' => 'qqq', 'threshold' => null ] + 
$defaultOptions;
-               $command = $this->buildCommandline( $exporter, $jobOptions );
-               $output->writeln( $command, OutputInterface::VERBOSITY_VERBOSE 
);
+               $processes = new SplObjectStorage();
 
-               $process = new Process( $command );
-               $process->mustRun();
-               $output->write( $process->getOutput() );
+               foreach ( $groups as $group ) {
+                       $defaultOptions[ 'group' ] = $group;
 
-               // Last languages that have a forced export
-               if ( isset( $config['always-export-languages'] ) ) {
+                       $jobOptions = [ 'lang' => '*' ] + $defaultOptions + [ 
'skip' => 'en,qqq' ];
+                       $command = $this->buildCommandline( $exporter, 
$jobOptions );
+                       $process1 = new Process( $command );
+                       $process1->setTimeout( 300 );
+
+                       $processes->attach( $process1 );
+
+                       // Then message documentation
+                       $jobOptions = [ 'lang' => 'qqq', 'threshold' => null ] 
+ $defaultOptions;
+                       $command = $this->buildCommandline( $exporter, 
$jobOptions );
+                       $process2 = new Process( $command );
+                       $process2->setTimeout( 30 );
+                       $processes->attach( $process2, $process1 );
+
+                       // Last languages that have a forced export
+                       if ( !isset( $config['always-export-languages'] ) ) {
+                               continue;
+                       }
+
                        $lang = $config['always-export-languages'];
                        $jobOptions = [ 'lang' => $lang, 'threshold' => null ] 
+ $defaultOptions;
                        $command = $this->buildCommandline( $exporter, 
$jobOptions );
-                       $output->writeln( $command, 
OutputInterface::VERBOSITY_VERBOSE );
 
-                       $process = new Process( $command );
-                       $process->setTimeout( 120 );
-                       $process->mustRun();
-                       $output->write( $process->getOutput() );
+                       $process3 = new Process( $command );
+                       $process3->setTimeout( 120 );
+                       $processes->attach( $process3, $process2 );
                }
 
+               $this->runParallelWithOutput( $processes, $output );
        }
 }
 
@@ -199,6 +303,8 @@
                $config = $this->getConfig( $project );
                $message = 'Localisation updates from 
https://translatewiki.net.';
                $base = $this->getBase();
+
+               $processes = new SplObjectStorage();
 
                foreach ( $config['repos'] as $name => $repo ) {
                        if ( $repo['type'] === 'git' || $repo['type'] === 
'github' ) {
@@ -228,12 +334,10 @@
                        } else {
                                throw new RuntimeException( 'Unknown repo type' 
);
                        }
-                       $output->writeln( $command, 
OutputInterface::VERBOSITY_VERBOSE );
 
                        $process = new Process( $command );
                        $process->setTimeout( 120 );
-                       $process->mustRun();
-                       $output->write( $process->getOutput() );
+                       $processes->attach( $process );
 
                        $autoMerge = isset( $repo['auto-merge'] ) ? 
$repo['auto-merge'] : true;
 
@@ -241,14 +345,14 @@
                        if ( $repo['type'] === 'wmgerrit' && $autoMerge ) {
                                $project = str_replace( 
'ssh://[email protected]:29418/', '', $repo['url'] );
                                $command = $this->bindir . 
"/merge-wmgerrit-patches '$project'";
-                               $output->writeln( $command, 
OutputInterface::VERBOSITY_VERBOSE );
 
-                               $process = new Process( $command );
-                               $process->setTimeout( 120 );
-                               $process->mustRun();
-                               $output->write( $process->getOutput() );
+                               $mergeProcess = new Process( $command );
+                               $mergeProcess->setTimeout( 120 );
+                               $processes->attach( $mergeProcess, $process );
                        }
                }
+
+               $this->runParallelWithOutput( $processes, $output );
        }
 }
 

-- 
To view, visit https://gerrit.wikimedia.org/r/394118
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic9ae1290a7230ec1df0912e229d212149731ba8d
Gerrit-PatchSet: 1
Gerrit-Project: translatewiki
Gerrit-Branch: master
Gerrit-Owner: Nikerabbit <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to