EBernhardson has uploaded a new change for review.
https://gerrit.wikimedia.org/r/227939
Change subject: Basic script to run cirrus queries in bulk
......................................................................
Basic script to run cirrus queries in bulk
The script takes a stream of queries on stdin and issues them via the \CirrusSearch
object. It reports each query and its number of results to stdout as one JSON
object per line. Global variables can be overridden by passing a JSON object
as a CLI parameter (--options).
Change-Id: Id92168c03b6310fb4ad6bb77f28dd9ee90f184ea
---
M autoload.php
A includes/Maintenance/StreamingForkController.php
A maintenance/runSearch.php
3 files changed, 228 insertions(+), 0 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/CirrusSearch
refs/changes/39/227939/1
diff --git a/autoload.php b/autoload.php
index 87954b0..99e6b28 100644
--- a/autoload.php
+++ b/autoload.php
@@ -47,6 +47,8 @@
'CirrusSearch\\Maintenance\\MappingConfigBuilder' => __DIR__ .
'/includes/Maintenance/MappingConfigBuilder.php',
'CirrusSearch\\Maintenance\\ReindexForkController' => __DIR__ .
'/includes/Maintenance/ReindexForkController.php',
'CirrusSearch\\Maintenance\\Reindexer' => __DIR__ .
'/includes/Maintenance/Reindexer.php',
+ 'CirrusSearch\\Maintenance\\RunSearch' => __DIR__ .
'/maintenance/runSearch.php',
+ 'CirrusSearch\\Maintenance\\StreamingForkController' => __DIR__ .
'/includes/Maintenance/StreamingForkController.php',
'CirrusSearch\\Maintenance\\UpdateOneSearchIndexConfig' => __DIR__ .
'/maintenance/updateOneSearchIndexConfig.php',
'CirrusSearch\\Maintenance\\UpdateSearchIndexConfig' => __DIR__ .
'/maintenance/updateSearchIndexConfig.php',
'CirrusSearch\\Maintenance\\UpdateVersionIndex' => __DIR__ .
'/maintenance/updateVersionIndex.php',
diff --git a/includes/Maintenance/StreamingForkController.php
b/includes/Maintenance/StreamingForkController.php
new file mode 100644
index 0000000..397f596
--- /dev/null
+++ b/includes/Maintenance/StreamingForkController.php
@@ -0,0 +1,99 @@
+<?php
+
+namespace CirrusSearch\Maintenance;
+
/**
 * ForkController variant that streams work items to its children.
 *
 * The parent reads lines from an input stream and hands each non-empty line
 * to an idle child over a Unix socket pair, with at most one line outstanding
 * per child. The child runs the work callback on the line and sends back a
 * single line, which the parent copies to the output stream. Results are
 * written in completion order, not input order.
 */
class StreamingForkController extends \ForkController {
	/**
	 * @var callable Invoked in a child with one trimmed input line. It must
	 *  return a string without newlines; the parent reads replies line by line.
	 */
	protected $workCallback;
	/** @var resource Stream the parent reads work items from */
	protected $input;
	/** @var resource Stream the parent copies child results to */
	protected $output;
	/** @var resource|null In a child: its end of the socket pair shared with the parent */
	protected $socket;

	/**
	 * @param int $numProcs Number of child processes to fork
	 * @param callable $workCallback See $workCallback
	 * @param resource $input Stream of newline separated work items
	 * @param resource $output Stream that receives one result line per item
	 */
	public function __construct( $numProcs, $workCallback, $input, $output ) {
		parent::__construct( $numProcs );
		$this->workCallback = $workCallback;
		$this->input = $input;
		$this->output = $output;
	}

	/**
	 * Fork the children and distribute the input among them. In a child
	 * process this only returns after the child has consumed all of its work.
	 *
	 * @return string The status string reported by ForkController::start()
	 */
	public function start() {
		$status = parent::start();
		if ( $status === 'child' ) {
			$this->consume();
		}
		return $status;
	}

	/**
	 * Fork $numProcs children, each connected to the parent by a socket
	 * pair. The parent then feeds them the whole input before returning.
	 *
	 * @param int $numProcs
	 * @return string 'child' in a child process, 'parent' in the parent
	 */
	protected function forkWorkers( $numProcs ) {
		$this->prepareEnvironment();

		// Parent-side ends of the socket pairs, indexed by child number
		$childSockets = array();
		for ( $i = 0; $i < $numProcs; $i++ ) {
			$sockets = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
			if ( $sockets === false ) {
				echo "Error creating socket pair for child process\n";
				exit( 1 );
			}
			// Do the fork
			$pid = pcntl_fork();
			if ( $pid === -1 || $pid === false ) {
				echo "Error creating child processes\n";
				exit( 1 );
			}

			if ( !$pid ) {
				// This is the child process
				$this->initChild();
				$this->childNumber = $i;
				$this->socket = $sockets[0];
				fclose( $sockets[1] );
				// The fork copied the parent-side ends of every sibling created
				// before this child. Close them so each sibling sees EOF as soon
				// as the parent closes its own end.
				foreach ( $childSockets as $siblingSocket ) {
					fclose( $siblingSocket );
				}
				return 'child';
			}

			// This is the parent process
			$this->children[$pid] = true;
			fclose( $sockets[0] );
			$childSockets[$i] = $sockets[1];
		}

		$this->feedChildren( $childSockets );
		// Closing our ends signals EOF to the children, which then exit.
		foreach ( $childSockets as $socket ) {
			fclose( $socket );
		}
		return 'parent';
	}

	/**
	 * Child side: answer each line received from the parent with the work
	 * callback's result until the parent closes the socket.
	 */
	protected function consume() {
		while ( !feof( $this->socket ) ) {
			$line = fgets( $this->socket );
			if ( $line === false ) {
				break;
			}
			$line = trim( $line );
			// Compare against '' so that a legitimate "0" line is still processed.
			if ( $line !== '' ) {
				$result = call_user_func( $this->workCallback, $line );
				fwrite( $this->socket, $result . "\n" );
			}
		}
	}

	/**
	 * Parent side: send every non-empty input line to an idle child and
	 * collect all outstanding results.
	 *
	 * @param resource[] $sockets Parent-side sockets; all children start out idle
	 */
	protected function feedChildren( array $sockets ) {
		// Sockets whose child is currently working on a line
		$used = array();
		while ( !feof( $this->input ) ) {
			$query = fgets( $this->input );
			if ( $query === false ) {
				continue;
			}
			$query = trim( $query );
			if ( $query === '' ) {
				continue;
			}
			if ( $used ) {
				// Collect finished results. Only poll (timeout 0) while an idle
				// child is already available; otherwise block until one frees up.
				do {
					$this->updateAvailableSockets( $sockets, $used, $sockets ? 0 : 5 );
				} while ( !$sockets && $used );
			}
			if ( !$sockets ) {
				// Either no children were started or all of them have exited.
				fwrite( STDERR, "No worker processes available, abandoning remaining input\n" );
				return;
			}
			$socket = array_pop( $sockets );
			// Always newline-terminate: the child reads with fgets() and would
			// otherwise block waiting for the rest of a final unterminated line.
			fwrite( $socket, $query . "\n" );
			$used[] = $socket;
		}
		while ( $used ) {
			$this->updateAvailableSockets( $sockets, $used, 5 );
		}
	}

	/**
	 * Wait up to $timeout seconds for busy children to reply, copy each reply
	 * to the output stream and move that child back to the idle list. A child
	 * whose socket reports EOF has exited; it is dropped from both lists so
	 * that no further work is sent to it.
	 *
	 * @param resource[] &$sockets Idle sockets
	 * @param resource[] &$used Busy sockets
	 * @param int $timeout Seconds to wait in stream_select()
	 */
	protected function updateAvailableSockets( &$sockets, &$used, $timeout ) {
		$read = $used;
		$write = $except = array();
		if ( stream_select( $read, $write, $except, $timeout ) === false ) {
			// Select failed (e.g. interrupted); the caller will retry.
			return;
		}
		foreach ( $read as $socket ) {
			$idx = array_search( $socket, $used, true );
			unset( $used[$idx] );
			$line = fgets( $socket );
			if ( $line === false ) {
				// The child closed its end, so it cannot take more work.
				continue;
			}
			fwrite( $this->output, $line );
			$sockets[] = $socket;
		}
	}
}
diff --git a/maintenance/runSearch.php b/maintenance/runSearch.php
new file mode 100644
index 0000000..56c85c0
--- /dev/null
+++ b/maintenance/runSearch.php
@@ -0,0 +1,127 @@
+<?php
+
+namespace CirrusSearch\Maintenance;
+
+/**
+ * Run search queries provided on stdin
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ */
+
// Locate the MediaWiki core installation. Without MW_INSTALL_PATH, fall back
// to three directories above this script's directory (this file lives in
// extensions/CirrusSearch/maintenance).
$IP = getenv( 'MW_INSTALL_PATH' );
if ( $IP === false ) {
	$IP = __DIR__ . '/../../..';
}

// Core maintenance framework first, then the extension's Maintenance base class.
require_once "$IP/maintenance/Maintenance.php";
require_once __DIR__ . '/../includes/Maintenance/Maintenance.php';
+
/**
 * Run search queries read from stdin, one per line, and print one JSON
 * object per query: {"query": ..., "rows": N} on success or
 * {"query": ..., "error": "..."} on failure.
 */
class RunSearch extends Maintenance {

	public function __construct() {
		parent::__construct();
		$this->addDescription( "Run one or more searches against the cluster. " .
			"search queries are read from stdin." );
		$this->addOption( 'type', 'What type of search to run, prefix or full_text. ' .
			'defaults to full_text.', false, true );
		// withArg = true: the option value is the JSON object itself.
		$this->addOption( 'options', 'A JSON object mapping from global variable to its test value',
			false, true );
		$this->addOption( 'fork', 'Fork multiple processes to run queries from. ' .
			'defaults to false.', false, true );
	}

	public function execute() {
		global $wgPoolCounterConf;

		// Make sure we don't flood the pool counter
		unset( $wgPoolCounterConf['CirrusSearch-Search'],
			$wgPoolCounterConf['CirrusSearch-PerUser'] );

		$this->applyGlobals();
		$search = $this->getSearchCallback();
		$forks = $this->getOption( 'fork', false );
		// Only fork for a positive process count; "0" runs queries in-process.
		if ( is_string( $forks ) && ctype_digit( $forks ) && (int)$forks > 0 ) {
			// PHP 5.3 closures cannot use $this, so pass the script in explicitly.
			$self = $this;
			$callback = function ( $line ) use ( $self, $search ) {
				return $self->consume( $search, $line );
			};
			$controller = new StreamingForkController( (int)$forks, $callback, STDIN, STDOUT );
			$controller->start();
		} else {
			while ( !feof( STDIN ) ) {
				$line = fgets( STDIN );
				if ( $line === false ) {
					break;
				}
				$line = trim( $line );
				// Compare against '' so a query of "0" is still run.
				if ( $line !== '' ) {
					// One JSON object per line, matching the forked mode output.
					echo $this->consume( $search, $line ) . "\n";
				}
			}
		}
	}

	/**
	 * Override global variables from the --options JSON object. Exits with
	 * status 1 when the JSON is invalid, is not an object, or names a global
	 * that does not exist.
	 */
	protected function applyGlobals() {
		// Decode to associative arrays so array-valued globals stay arrays
		// instead of becoming stdClass objects.
		$options = json_decode( $this->getOption( 'options', 'false' ), true );
		if ( json_last_error() !== JSON_ERROR_NONE ) {
			$this->error( "\nERROR: --options is not valid JSON\n" );
			exit( 1 );
		}
		if ( !$options ) {
			return;
		}
		if ( !is_array( $options ) ) {
			$this->error( "\nERROR: --options must be a JSON object\n" );
			exit( 1 );
		}
		foreach ( $options as $key => $value ) {
			if ( array_key_exists( $key, $GLOBALS ) ) {
				$GLOBALS[$key] = $value;
			} else {
				$this->error( "\nERROR: $key is not a valid global variable\n" );
				exit( 1 );
			}
		}
	}

	/**
	 * Build the function that runs a single query.
	 *
	 * @return callable Takes the query string and returns a \Status
	 */
	protected function getSearchCallback() {
		$searchType = $this->getOption( 'type', 'full_text' );
		switch ( $searchType ) {
		case 'full_text':
			$engine = new \CirrusSearch;
			return function ( $query ) use ( $engine ) {
				$result = $engine->searchText( $query );
				// searchText() may return either a Status or a bare result; normalise to Status.
				if ( $result instanceof \Status ) {
					return $result;
				} else {
					return \Status::newGood( $result );
				}
			};

		case 'prefix':
			return function ( $query ) {
				$searcher = new \CirrusSearch\Searcher( 0, 10 );
				return $searcher->prefixSearch( $query );
			};

		default:
			$this->error( "\nERROR: Unknown search type $searchType\n" );
			exit( 1 );
		}
	}

	/**
	 * Run one query and encode its outcome as JSON (no trailing newline).
	 * Public so that the forked worker closure can call it.
	 *
	 * @param callable $search Callback from getSearchCallback()
	 * @param string $query
	 * @return string
	 */
	public function consume( $search, $query ) {
		$data = array( 'query' => $query );
		$status = $search( $query );
		if ( $status->isOK() ) {
			$resultSet = $status->getValue();
			// SearchEngine::searchText() is documented to possibly return null;
			// report that as zero rows rather than calling a method on null.
			$data['rows'] = $resultSet ? $resultSet->numRows() : 0;
		} else {
			$data['error'] = $status->getMessage()->text();
		}
		return json_encode( $data );
	}
}
+
// Name of the Maintenance subclass for the MediaWiki maintenance runner to
// execute when this file is invoked directly.
$maintClass = 'CirrusSearch\Maintenance\RunSearch';
require_once RUN_MAINTENANCE_IF_MAIN;
--
To view, visit https://gerrit.wikimedia.org/r/227939
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Id92168c03b6310fb4ad6bb77f28dd9ee90f184ea
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/CirrusSearch
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits