EBernhardson has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/227939

Change subject: Basic script to run cirrus queries in bulk
......................................................................

Basic script to run cirrus queries in bulk

The script reads a stream of queries from stdin, one per line, and issues them
via the \CirrusSearch object. For each query it writes a JSON object to stdout,
one per line, containing the query and its number of results. Global variables
can be overridden by passing a JSON object as a CLI parameter (--options).

Change-Id: Id92168c03b6310fb4ad6bb77f28dd9ee90f184ea
---
M autoload.php
A includes/Maintenance/StreamingForkController.php
A maintenance/runSearch.php
3 files changed, 228 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/CirrusSearch refs/changes/39/227939/1

diff --git a/autoload.php b/autoload.php
index 87954b0..99e6b28 100644
--- a/autoload.php
+++ b/autoload.php
@@ -47,6 +47,8 @@
        'CirrusSearch\\Maintenance\\MappingConfigBuilder' => __DIR__ . 
'/includes/Maintenance/MappingConfigBuilder.php',
        'CirrusSearch\\Maintenance\\ReindexForkController' => __DIR__ . 
'/includes/Maintenance/ReindexForkController.php',
        'CirrusSearch\\Maintenance\\Reindexer' => __DIR__ . 
'/includes/Maintenance/Reindexer.php',
+       'CirrusSearch\\Maintenance\\RunSearch' => __DIR__ . 
'/maintenance/runSearch.php',
+       'CirrusSearch\\Maintenance\\StreamingForkController' => __DIR__ . 
'/includes/Maintenance/StreamingForkController.php',
        'CirrusSearch\\Maintenance\\UpdateOneSearchIndexConfig' => __DIR__ . 
'/maintenance/updateOneSearchIndexConfig.php',
        'CirrusSearch\\Maintenance\\UpdateSearchIndexConfig' => __DIR__ . 
'/maintenance/updateSearchIndexConfig.php',
        'CirrusSearch\\Maintenance\\UpdateVersionIndex' => __DIR__ . 
'/maintenance/updateVersionIndex.php',
diff --git a/includes/Maintenance/StreamingForkController.php b/includes/Maintenance/StreamingForkController.php
new file mode 100644
index 0000000..397f596
--- /dev/null
+++ b/includes/Maintenance/StreamingForkController.php
@@ -0,0 +1,99 @@
+<?php
+
+namespace CirrusSearch\Maintenance;
+
+/**
+ * ForkController that streams newline-delimited work from an input stream to a
+ * pool of child processes and copies each child's one-line reply to an output
+ * stream.
+ *
+ * The parent talks to every child over its own unix socket pair: it writes one
+ * newline-terminated line of work to an idle child, and the child answers with
+ * exactly one newline-terminated line, after which it is idle again. Replies are
+ * written to the output in the order children finish, not in input order.
+ */
+class StreamingForkController extends \ForkController {
+	/** @var callable Called in a child with one trimmed, non-blank line of work */
+	protected $workCallback;
+	/** @var resource Stream the parent reads work lines from */
+	protected $input;
+	/** @var resource Stream the parent writes child replies to */
+	protected $output;
+	/** @var resource The child's end of the socket pair shared with the parent */
+	protected $socket;
+
+	/**
+	 * @param int $numProcs Number of child processes to fork
+	 * @param callable $workCallback Called in a child with one line of work. Must return
+	 *  a string without newlines, since the parent reads exactly one line per reply.
+	 * @param resource $input Stream to read newline-delimited work from
+	 * @param resource $output Stream to write replies to, one per line
+	 */
+	public function __construct( $numProcs, $workCallback, $input, $output ) {
+		parent::__construct( $numProcs );
+		$this->workCallback = $workCallback;
+		$this->input = $input;
+		$this->output = $output;
+	}
+
+	/**
+	 * Fork the workers and run them. In a child process this consumes work until
+	 * the parent closes the child's socket.
+	 *
+	 * @return string The status reported by ForkController::start()
+	 */
+	public function start() {
+		$status = parent::start();
+		if ( $status === 'child' ) {
+			$this->consume();
+		}
+		return $status;
+	}
+
+	/**
+	 * Fork $numProcs children, each connected to the parent by a socket pair,
+	 * then (in the parent) feed them all of the input.
+	 *
+	 * @param int $numProcs
+	 * @return string 'child' in a child process, 'parent' in the parent once all
+	 *  input has been handed out and answered
+	 */
+	protected function forkWorkers( $numProcs ) {
+		$this->prepareEnvironment();
+
+		// Parent's end of each child's socket pair, keyed by child number
+		$childSockets = array();
+
+		// Create the child processes
+		for ( $i = 0; $i < $numProcs; $i++ ) {
+			$sockets = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
+			if ( $sockets === false ) {
+				echo "Error creating socket pair for child process\n";
+				exit( 1 );
+			}
+
+			// Do the fork
+			$pid = pcntl_fork();
+			if ( $pid === -1 || $pid === false ) {
+				echo "Error creating child processes\n";
+				exit( 1 );
+			}
+
+			if ( !$pid ) {
+				// This is the child process
+				$this->initChild();
+				$this->childNumber = $i;
+				$this->socket = $sockets[0];
+				fclose( $sockets[1] );
+				// The fork also copied the parent's ends of the sockets belonging to
+				// children created earlier. Release them so those children see EOF
+				// as soon as the parent closes its copies, not when this child exits.
+				foreach ( $childSockets as $inherited ) {
+					fclose( $inherited );
+				}
+				return 'child';
+			}
+
+			// This is the parent process
+			$this->children[$pid] = true;
+			fclose( $sockets[0] );
+			$childSockets[$i] = $sockets[1];
+		}
+
+		$this->feedChildren( $childSockets );
+		// Closing the parent's ends delivers EOF to the children, ending consume()
+		foreach ( $childSockets as $socket ) {
+			fclose( $socket );
+		}
+		return 'parent';
+	}
+
+	/**
+	 * Child loop: read one line of work at a time from the parent, run the work
+	 * callback on it, and send the result back as a single line. Returns once the
+	 * parent closes its end of the socket.
+	 */
+	protected function consume() {
+		while ( !feof( $this->socket ) ) {
+			$line = fgets( $this->socket );
+			if ( $line === false ) {
+				// EOF or read error: no further work can arrive
+				break;
+			}
+			$line = trim( $line );
+			// Must agree with feedChildren(), which never sends blank lines; the
+			// parent waits for a reply to every line it does send.
+			if ( $line !== '' ) {
+				$result = call_user_func( $this->workCallback, $line );
+				fwrite( $this->socket, $result . "\n" );
+			}
+		}
+	}
+
+	/**
+	 * Parent loop: hand each non-blank input line to an idle child, copying replies
+	 * to the output as children finish, then wait for the replies still outstanding
+	 * once the input is exhausted.
+	 *
+	 * @param resource[] $sockets Parent's end of each child's socket pair
+	 */
+	protected function feedChildren( array $sockets ) {
+		// Sockets of children currently working on a line
+		$used = array();
+		while ( !feof( $this->input ) ) {
+			$line = fgets( $this->input );
+			if ( $line === false ) {
+				break;
+			}
+			$line = trim( $line );
+			if ( $line === '' ) {
+				continue;
+			}
+			// Collect finished replies without blocking while some child is idle;
+			// otherwise wait until one becomes idle (or every busy child is gone).
+			if ( $used ) {
+				do {
+					$this->updateAvailableSockets( $sockets, $used, $sockets ? 0 : 5 );
+				} while ( !$sockets && $used );
+			}
+			if ( !$sockets ) {
+				fwrite( STDERR, "No child processes available to run queries, aborting\n" );
+				return;
+			}
+			$socket = array_pop( $sockets );
+			// Always newline-terminate: the child reads with fgets(), so a final input
+			// line lacking a trailing newline would otherwise never be delivered and
+			// the parent would wait for its reply forever.
+			fwrite( $socket, $line . "\n" );
+			$used[] = $socket;
+		}
+
+		// Input exhausted: drain the replies still outstanding
+		while ( $used ) {
+			$this->updateAvailableSockets( $sockets, $used, 5 );
+		}
+	}
+
+	/**
+	 * Wait up to $timeout seconds for busy children to reply. Each reply is copied
+	 * to the output and its socket moves from $used back to $sockets. A socket that
+	 * becomes readable but yields no line (the child went away) is removed from
+	 * $used and not returned to $sockets.
+	 *
+	 * @param resource[] &$sockets Idle sockets; sockets that replied are appended
+	 * @param resource[] &$used Busy sockets; sockets that became readable are removed
+	 * @param int $timeout Seconds to wait; 0 polls without blocking
+	 */
+	protected function updateAvailableSockets( &$sockets, &$used, $timeout ) {
+		$read = $used;
+		$write = $except = array();
+		// false means stream_select() failed (e.g. it was interrupted by a signal), so
+		// $read does not describe ready sockets and reading them could block. 0 means
+		// nothing became ready before the timeout.
+		if ( !stream_select( $read, $write, $except, $timeout ) ) {
+			return;
+		}
+		foreach ( $read as $socket ) {
+			$idx = array_search( $socket, $used, true );
+			unset( $used[$idx] );
+			$reply = fgets( $socket );
+			if ( $reply === false ) {
+				// Readable with no data means EOF: stop handing this child work
+				continue;
+			}
+			fwrite( $this->output, $reply );
+			$sockets[] = $socket;
+		}
+	}
+}
diff --git a/maintenance/runSearch.php b/maintenance/runSearch.php
new file mode 100644
index 0000000..56c85c0
--- /dev/null
+++ b/maintenance/runSearch.php
@@ -0,0 +1,127 @@
+<?php
+
+namespace CirrusSearch\Maintenance;
+
+/**
+ * Run search queries provided on stdin
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ */
+
+// Find the MediaWiki install: MW_INSTALL_PATH takes precedence whenever it is
+// set; otherwise fall back to three directories above this script's directory.
+$IP = getenv( 'MW_INSTALL_PATH' );
+$IP = ( $IP === false ) ? __DIR__ . '/../../..' : $IP;
+require_once "$IP/maintenance/Maintenance.php";
+require_once __DIR__ . '/../includes/Maintenance/Maintenance.php';
+
+/**
+ * Run search queries read from stdin, one per line, against the cluster and
+ * print, for each query, a JSON object on its own line holding the query and
+ * either its number of result rows or an error message.
+ */
+class RunSearch extends Maintenance {
+
+	public function __construct() {
+		parent::__construct();
+		$this->addDescription( "Run one or more searches against the cluster. " .
+			"search queries are read from stdin." );
+		$this->addOption( 'type', 'What type of search to run, prefix or full_text. ' .
+			'defaults to full_text.', false, true );
+		// Takes a value (a JSON object), so $withArg must be true
+		$this->addOption( 'options', 'A JSON object mapping from global variable to its test value',
+			false, true );
+		$this->addOption( 'fork', 'Fork multiple processes to run queries from. ' .
+			'defaults to false.', false, true );
+	}
+
+	/**
+	 * Read queries from stdin and print one JSON result per line. With --fork=N
+	 * (N > 0) the queries are spread over N child processes, in which case results
+	 * are not necessarily printed in input order.
+	 */
+	public function execute() {
+		global $wgPoolCounterConf;
+
+		// Make sure we don't flood the pool counter
+		unset( $wgPoolCounterConf['CirrusSearch-Search'],
+			$wgPoolCounterConf['CirrusSearch-PerUser'] );
+
+		$this->applyGlobals();
+		$search = $this->getSearchCallback();
+		$forks = $this->getOption( 'fork', false );
+		// Cast so ctype_digit() never sees a non-string, and treat --fork=0 as
+		// "don't fork" rather than starting a controller with no workers.
+		if ( ctype_digit( (string)$forks ) && (int)$forks > 0 ) {
+			$self = $this;
+			$callback = function( $line ) use ( $self, $search ) {
+				return $self->consume( $search, $line );
+			};
+			$controller = new StreamingForkController( (int)$forks, $callback, STDIN, STDOUT );
+			$controller->start();
+		} else {
+			while ( !feof( STDIN ) ) {
+				$line = fgets( STDIN );
+				if ( $line === false ) {
+					break;
+				}
+				$line = trim( $line );
+				// Compare against '' so a query of "0" is still run
+				if ( $line !== '' ) {
+					// One JSON object per line, matching the forked output
+					echo $this->consume( $search, $line ) . "\n";
+				}
+			}
+		}
+	}
+
+	/**
+	 * Override global variables from the JSON object given in --options. Exits
+	 * with status 1 if the JSON is invalid, is not an object, or names a global
+	 * that does not exist.
+	 */
+	protected function applyGlobals() {
+		// Decode objects as associative arrays: configuration globals are arrays,
+		// and stdClass values could not be used with array access.
+		$options = json_decode( $this->getOption( 'options', 'false' ), true );
+		if ( json_last_error() !== JSON_ERROR_NONE ) {
+			$this->error( "\nERROR: --options is not valid JSON\n" );
+			exit( 1 );
+		}
+		if ( !$options ) {
+			return;
+		}
+		if ( !is_array( $options ) ) {
+			$this->error( "\nERROR: --options must be a JSON object\n" );
+			exit( 1 );
+		}
+		foreach ( $options as $key => $value ) {
+			if ( !array_key_exists( $key, $GLOBALS ) ) {
+				$this->error( "\nERROR: $key is not a valid global variable\n" );
+				exit( 1 );
+			}
+			$GLOBALS[$key] = $value;
+		}
+	}
+
+	/**
+	 * Build the function that runs a single query for the requested --type.
+	 * Exits with status 1 for an unknown type.
+	 *
+	 * @return callable Takes a query string and returns a \Status whose value,
+	 *  when OK, provides numRows()
+	 */
+	protected function getSearchCallback() {
+		$searchType = $this->getOption( 'type', 'full_text' );
+		switch ( $searchType ) {
+			case 'full_text':
+				$engine = new \CirrusSearch;
+				return function ( $query ) use ( $engine ) {
+					$result = $engine->searchText( $query );
+					// Normalise to a Status so consume() handles every result alike
+					if ( $result instanceof \Status ) {
+						return $result;
+					}
+					return \Status::newGood( $result );
+				};
+
+			case 'prefix':
+				return function ( $query ) {
+					// NOTE(review): assumes prefixSearch() returns a \Status whose value
+					// has numRows(), as consume() requires -- confirm.
+					$searcher = new \CirrusSearch\Searcher( 0, 10 );
+					return $searcher->prefixSearch( $query );
+				};
+
+			default:
+				$this->error( "\nERROR: Unknown search type $searchType\n" );
+				exit( 1 );
+		}
+	}
+
+	/**
+	 * Run one query and describe the outcome as a JSON object. Public because
+	 * execute() calls it from a closure through a captured $self reference.
+	 *
+	 * @param callable $search Callback built by getSearchCallback()
+	 * @param string $query
+	 * @return string JSON object with 'query' plus either 'rows' or 'error',
+	 *  without a trailing newline
+	 */
+	public function consume( $search, $query ) {
+		$data = array( 'query' => $query );
+		$status = $search( $query );
+		if ( $status->isOK() ) {
+			$data['rows'] = $status->getValue()->numRows();
+		} else {
+			$data['error'] = $status->getMessage()->text();
+		}
+		return json_encode( $data );
+	}
+}
+
+// Maintenance class for the RUN_MAINTENANCE_IF_MAIN bootstrap to run
+$maintClass = 'CirrusSearch\Maintenance\RunSearch';
+require_once RUN_MAINTENANCE_IF_MAIN;

-- 
To view, visit https://gerrit.wikimedia.org/r/227939
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Id92168c03b6310fb4ad6bb77f28dd9ee90f184ea
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/CirrusSearch
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to