Nikerabbit has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/394118 )
Change subject: Add parallelism to repong
......................................................................
Add parallelism to repong
Change-Id: Ic9ae1290a7230ec1df0912e229d212149731ba8d
---
M repoconfig.commit.json
M repoconfig.json
M repong/repong.php
3 files changed, 140 insertions(+), 34 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/translatewiki refs/changes/18/394118/1
diff --git a/repoconfig.commit.json b/repoconfig.commit.json
index cda897c..3686f90 100644
--- a/repoconfig.commit.json
+++ b/repoconfig.commit.json
@@ -1,6 +1,7 @@
{
"@meta": {
- "export": "php
/srv/mediawiki/targets/production/extensions/Translate/scripts/export.php"
+ "export": "php
/srv/mediawiki/targets/production/extensions/Translate/scripts/export.php",
+ "expand": "php
/srv/mediawiki/targets/production/extensions/Translate/scripts/expand-groupspec.php"
},
"blockly": {
"group": "out-blockly*",
diff --git a/repoconfig.json b/repoconfig.json
index 83c6d32..56b3885 100644
--- a/repoconfig.json
+++ b/repoconfig.json
@@ -1,6 +1,7 @@
{
"@meta": {
- "export": "php
/srv/mediawiki/targets/production/extensions/Translate/scripts/export.php"
+ "export": "php
/srv/mediawiki/targets/production/extensions/Translate/scripts/export.php",
+ "expand": "php
/srv/mediawiki/targets/production/extensions/Translate/scripts/expand-groupspec.php"
},
"blockly": {
"group": "out-blockly*",
diff --git a/repong/repong.php b/repong/repong.php
index 9ba684f..cbc538c 100644
--- a/repong/repong.php
+++ b/repong/repong.php
@@ -6,6 +6,8 @@
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
+use Symfony\Component\Process\Exception\ProcessFailedException;
+use Symfony\Component\Process\Exception\ProcessTimedOutException;
use Symfony\Component\Process\Process;
require_once __DIR__ . '/vendor/autoload.php';
@@ -16,6 +18,7 @@
protected $usernameConversion = [
'nike' => 'nikerabbit',
];
+ protected $parallelism = 1;
public function initialize() {
$base = $this->getBase();
@@ -31,6 +34,11 @@
$json = file_get_contents( "$base/repoconfig.json" );
$this->config = json_decode( $json, true );
+
+ $cores = preg_match_all( '/^processor/m', file_get_contents(
'/proc/cpuinfo' ) );
+ if ( $cores ) {
+ $this->parallelism = (int)$cores;
+ }
}
protected function getBase() {
@@ -71,6 +79,89 @@
return $str;
}
+
+ protected function runParallelWithOutput( SplObjectStorage $processes,
OutputInterface $output ) {
+ $this->runParaller(
+ $processes,
+ function ( $process ) use ( $output ) {
+ $output->writeln( $process->getCommandLine(),
OutputInterface::VERBOSITY_VERBOSE );
+ },
+ function ( $process, $exception = null ) use ( $output
) {
+ if ( $process->isSuccessful() ) {
+ $processOutput = $process->getOutput();
+ if ( trim( $processOutput ) ) {
+ $formatter =
$output->getFormatter();
+ $command = $formatter->escape(
$process->getCommandLine() );
+ $stdout = $formatter->escape(
$processOutput );
+ $output->write(
"<options=bold>$command</>\n$stdout\n" );
+ }
+ } elseif ( $exception ) {
+ throw $exception;
+ } else {
+ throw new ProcessFailedException(
$process );
+ }
+ }
+ );
+ }
+
+ protected function runParaller(
+ SplObjectStorage $queue,
+ callable $onStart = null,
+ callable $onEnd = null
+ ) {
+ $running = [];
+ do {
+ while ( count( $running ) < $this->parallelism ) {
+ list( $ok, $process ) =
self::getNextExecutableProcessIndex( $queue );
+
+ // Check if anything to execute
+ if ( $ok === 'NONE' ) {
+ break;
+ }
+
+ unset( $queue[ $process ] );
+ is_callable( $onStart ) && $onStart( $process );
+
+ // Check whether to run or fail the process
+ if ( $ok === 'OK' ) {
+ $process->start();
+ $running[] = $process;
+ } else {
+ $exception = new RuntimeException( 'A
dependent task failed' );
+ is_callable( $onEnd ) && $onEnd(
$process, $exception );
+ }
+ }
+
+ usleep( 10000 );
+
+ foreach ( $running as $index => $process ) {
+ if ( !$process->isRunning() ) {
+ unset( $running[$index] );
+ is_callable( $onEnd ) && $onEnd(
$process );
+ }
+
+ try {
+ $process->checkTimeout();
+ } catch ( ProcessTimedOutException $e ) {
+ unset( $running[$index] );
+ is_callable( $onEnd ) && $onEnd(
$process, $e );
+ }
+ }
+ } while ( count( $queue ) > 0 || $running !== [] );
+ }
+
+ private static function getNextExecutableProcessIndex( $queue ) {
+ foreach ( $queue as $process ) {
+ $dependency = $queue[ $process ];
+ if ( $dependency === null ) {
+ return [ 'OK', $process ];
+ } elseif ( $dependency->isTerminated() ) {
+ return $dependency->isSuccessful() ? [ 'OK',
$process ] : [ 'FAIL', $process ];
+ }
+ }
+
+ return [ 'NONE', null ];
+ }
}
class UpdateCommand extends RepoNgCommand {
@@ -86,6 +177,8 @@
$config = $this->getConfig( $project );
$base = $this->getBase();
$bindir = $this->bindir;
+
+ $processes = new SplObjectStorage();
foreach ( $config['repos'] as $name => $repo ) {
$type = $repo['type'];
@@ -110,13 +203,13 @@
} else {
throw new RuntimeException( 'Unknown repo type'
);
}
- $output->writeln( $command,
OutputInterface::VERBOSITY_VERBOSE );
$process = new Process( $command );
- $process->setTimeout( 600 );
- $process->mustRun();
- $output->write( $process->getOutput() );
+ $process->setTimeout( 300 );
+ $processes->attach( $process );
}
+
+ $this->runParallelWithOutput( $processes, $output );
}
}
@@ -132,10 +225,11 @@
$project = $input->getArgument( 'project' );
$config = $this->getConfig( $project );
$exporter = $this->config['@meta']['export'];
+ $expander = $this->config['@meta']['expand'];
$defaultOptions = [
+ 'group' => null,
'quiet' => true,
- 'group' => $config['group'],
'threshold' => 35,
'target' => $this->getBase(),
];
@@ -152,37 +246,47 @@
$defaultOptions['threshold'] =
(int)$config['export-threshold'];
}
- $jobOptions = [ 'lang' => '*' ] + $defaultOptions + [ 'skip' =>
'en,qqq' ];
- $command = $this->buildCommandline( $exporter, $jobOptions );
- $output->writeln( $command, OutputInterface::VERBOSITY_VERBOSE
);
+ $command = "$expander '{$config[ 'group' ]}'";
$process = new Process( $command );
- $process->setTimeout( 300 );
+ $process->setTimeout( 10 );
$process->mustRun();
- $output->write( $process->getOutput() );
+ $groups = explode( "\n", trim( $process->getOutput() ) );
- // Then message documentation
- $jobOptions = [ 'lang' => 'qqq', 'threshold' => null ] +
$defaultOptions;
- $command = $this->buildCommandline( $exporter, $jobOptions );
- $output->writeln( $command, OutputInterface::VERBOSITY_VERBOSE
);
+ $processes = new SplObjectStorage();
- $process = new Process( $command );
- $process->mustRun();
- $output->write( $process->getOutput() );
+ foreach ( $groups as $group ) {
+ $defaultOptions[ 'group' ] = $group;
- // Last languages that have a forced export
- if ( isset( $config['always-export-languages'] ) ) {
+ $jobOptions = [ 'lang' => '*' ] + $defaultOptions + [
'skip' => 'en,qqq' ];
+ $command = $this->buildCommandline( $exporter,
$jobOptions );
+ $process1 = new Process( $command );
+ $process1->setTimeout( 300 );
+
+ $processes->attach( $process1 );
+
+ // Then message documentation
+ $jobOptions = [ 'lang' => 'qqq', 'threshold' => null ]
+ $defaultOptions;
+ $command = $this->buildCommandline( $exporter,
$jobOptions );
+ $process2 = new Process( $command );
+ $process2->setTimeout( 30 );
+ $processes->attach( $process2, $process1 );
+
+ // Last languages that have a forced export
+ if ( !isset( $config['always-export-languages'] ) ) {
+ continue;
+ }
+
$lang = $config['always-export-languages'];
$jobOptions = [ 'lang' => $lang, 'threshold' => null ]
+ $defaultOptions;
$command = $this->buildCommandline( $exporter,
$jobOptions );
- $output->writeln( $command,
OutputInterface::VERBOSITY_VERBOSE );
- $process = new Process( $command );
- $process->setTimeout( 120 );
- $process->mustRun();
- $output->write( $process->getOutput() );
+ $process3 = new Process( $command );
+ $process3->setTimeout( 120 );
+ $processes->attach( $process3, $process2 );
}
+ $this->runParallelWithOutput( $processes, $output );
}
}
@@ -199,6 +303,8 @@
$config = $this->getConfig( $project );
$message = 'Localisation updates from
https://translatewiki.net.';
$base = $this->getBase();
+
+ $processes = new SplObjectStorage();
foreach ( $config['repos'] as $name => $repo ) {
if ( $repo['type'] === 'git' || $repo['type'] ===
'github' ) {
@@ -228,12 +334,10 @@
} else {
throw new RuntimeException( 'Unknown repo type'
);
}
- $output->writeln( $command,
OutputInterface::VERBOSITY_VERBOSE );
$process = new Process( $command );
$process->setTimeout( 120 );
- $process->mustRun();
- $output->write( $process->getOutput() );
+ $processes->attach( $process );
$autoMerge = isset( $repo['auto-merge'] ) ?
$repo['auto-merge'] : true;
@@ -241,14 +345,14 @@
if ( $repo['type'] === 'wmgerrit' && $autoMerge ) {
$project = str_replace(
'ssh://[email protected]:29418/', '', $repo['url'] );
$command = $this->bindir .
"/merge-wmgerrit-patches '$project'";
- $output->writeln( $command,
OutputInterface::VERBOSITY_VERBOSE );
- $process = new Process( $command );
- $process->setTimeout( 120 );
- $process->mustRun();
- $output->write( $process->getOutput() );
+ $mergeProcess = new Process( $command );
+ $mergeProcess->setTimeout( 120 );
+ $processes->attach( $mergeProcess, $process );
}
}
+
+ $this->runParallelWithOutput( $processes, $output );
}
}
--
To view, visit https://gerrit.wikimedia.org/r/394118
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic9ae1290a7230ec1df0912e229d212149731ba8d
Gerrit-PatchSet: 1
Gerrit-Project: translatewiki
Gerrit-Branch: master
Gerrit-Owner: Nikerabbit <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits