Manybubbles has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/88131


Change subject: Optimize in place reindexing.
......................................................................

Optimize in place reindexing.

1.  Turn replicas on _after_ building the new index.
2.  Optimize the index before adding the replicas.  Might help some with
speed but helps more with query performance on closed indexes.
3.  Build the index in multiple processes.
4.  Use the 'create' action instead of the 'index' action.

Bug: 54918
Change-Id: I51101191a9e1daac794d027af23bbf6a0fb096ab
---
M CirrusSearch.php
A maintenance/includes/CirrusSearchReindexForkController.php
M updateOneSearchIndexConfig.php
3 files changed, 221 insertions(+), 50 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/CirrusSearch 
refs/changes/31/88131/1

diff --git a/CirrusSearch.php b/CirrusSearch.php
index 54764e6..42dc04b 100644
--- a/CirrusSearch.php
+++ b/CirrusSearch.php
@@ -102,17 +102,21 @@
 $wgCirrusSearchUseAggressiveSplitting = true;
 
 $dir = __DIR__ . '/';
+$includes = "$dir/includes/";
+$maintenanceIncludes = "$dir/maintenance/includes/";
 /**
  * Classes
  */
-$wgAutoloadClasses['CirrusSearch'] = $dir . 'includes/CirrusSearch.body.php';
-$wgAutoloadClasses['CirrusSearchAnalysisConfigBuilder'] = $dir . 
'includes/CirrusSearchAnalysisConfigBuilder.php';
-$wgAutoloadClasses['CirrusSearchConnection'] = $dir . 
'includes/CirrusSearchConnection.php';
-$wgAutoloadClasses['CirrusSearchMappingConfigBuilder'] = $dir . 
'includes/CirrusSearchMappingConfigBuilder.php';
-$wgAutoloadClasses['CirrusSearchPrefixSearchHook'] = $dir . 
'includes/CirrusSearchPrefixSearchHook.php';
-$wgAutoloadClasses['CirrusSearchSearcher'] = $dir . 
'includes/CirrusSearchSearcher.php';
-$wgAutoloadClasses['CirrusSearchTextFormatter'] = $dir . 
'includes/CirrusSearchTextFormatter.php';
-$wgAutoloadClasses['CirrusSearchUpdater'] = $dir . 
'includes/CirrusSearchUpdater.php';
+$wgAutoloadClasses['CirrusSearch'] = $includes . 'CirrusSearch.body.php';
+$wgAutoloadClasses['CirrusSearchAnalysisConfigBuilder'] = $includes . 
'CirrusSearchAnalysisConfigBuilder.php';
+$wgAutoloadClasses['CirrusSearchConnection'] = $includes . 
'CirrusSearchConnection.php';
+$wgAutoloadClasses['CirrusSearchMappingConfigBuilder'] = $includes . 
'CirrusSearchMappingConfigBuilder.php';
+$wgAutoloadClasses['CirrusSearchPrefixSearchHook'] = $includes . 
'CirrusSearchPrefixSearchHook.php';
+$wgAutoloadClasses['CirrusSearchSearcher'] = $includes . 
'CirrusSearchSearcher.php';
+$wgAutoloadClasses['CirrusSearchTextFormatter'] = $includes . 
'CirrusSearchTextFormatter.php';
+$wgAutoloadClasses['CirrusSearchUpdater'] = $includes . 
'CirrusSearchUpdater.php';
+
+$wgAutoloadClasses['CirrusSearchReindexForkController'] = $maintenanceIncludes 
. 'CirrusSearchReindexForkController.php';
 
 /**
  * Hooks
diff --git a/maintenance/includes/CirrusSearchReindexForkController.php 
b/maintenance/includes/CirrusSearchReindexForkController.php
new file mode 100644
index 0000000..f6b3803
--- /dev/null
+++ b/maintenance/includes/CirrusSearchReindexForkController.php
@@ -0,0 +1,61 @@
+<?
+/**
+ * Extensions to ForkController to prepare Elastica and to tell the child
+ * process which one it is.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ */
+class CirrusSearchReindexForkController extends ForkController {
+       /**
+        * @var integer number of this child or null if this is the parent
+        */
+       var $childNumber;
+       /**
+        * Fork a number of worker processes.  Have to hack ForkController to 
store
+        * the child number.
+        *
+        * @return string 'child' in a forked worker, 'parent' in the forking process
+        */
+       protected function forkWorkers( $numProcs ) {
+               $this->prepareEnvironment();
+
+               // Create the child processes
+               for ( $i = 0; $i < $numProcs; $i++ ) {
+                       // Do the fork
+                       $pid = pcntl_fork();
+                       if ( $pid === -1 || $pid === false ) {
+                               echo "Error creating child processes\n";
+                               exit( 1 );
+                       }
+
+                       if ( !$pid ) {
+                               $this->initChild();
+                               $this->childNumber = $i; // Hack right here.
+                               return 'child';
+                       } else {
+                               // This is the parent process
+                               $this->children[$pid] = true;
+                       }
+               }
+
+               return 'parent';
+       }
+
+       protected function prepareEnvironment() {
+               parent::prepareEnvironment();
+               CirrusSearchConnection::destroySingleton();
+       }
+}
diff --git a/updateOneSearchIndexConfig.php b/updateOneSearchIndexConfig.php
index 065228e..cd8e1cb 100644
--- a/updateOneSearchIndexConfig.php
+++ b/updateOneSearchIndexConfig.php
@@ -44,6 +44,16 @@
        // steps.
        private $removeIndecies = false;
 
+       /**
+        * @var are there too few replicas in the index we're making?
+        */
+       private $tooFewReplicas = false;
+
+       /**
+        * @var number of processes to use when reindexing
+        */
+       private $reindexProcesses;
+
        public function __construct() {
                parent::__construct();
                $this->addDescription( "Update the configuration or contents of 
one search index." );
@@ -78,6 +88,8 @@
                        "reindex all documents from that index (via the alias) 
to this one, swing the " .
                        "alias to this index, and then remove other index.  
You'll have to redo all updates ".
                        "performed during this operation manually.  Defaults to 
false." );
+               $maintenance->addOption( 'reindexProcesses', 'Number of 
processess to use in reindex.  ' .
+                       'Defaults to 10.', false, true );
        }
 
        public function execute() {
@@ -100,6 +112,7 @@
                $this->closeOk = $this->getOption( 'closeOk', false );
                $this->indexIdentifier = $this->pickIndexIdentifierFromOption( 
$this->getOption( 'indexIdentifier', 'current' ) );
                $this->reindexAndRemoveOk = $this->getOption( 
'reindexAndRemoveOk', false );
+               $this->reindexProcesses = $this->getOption( 'reindexProcesses', 
10 );
 
                $this->validateIndex();
                $this->validateAnalyzers();
@@ -128,10 +141,14 @@
                        return;
                }
                $this->output( $this->indent . "Index exists so 
validating...\n" );
+               $this->validateIndexSettings();
+       }
+
+       private function validateIndexSettings() {
                $settings = $this->getIndex()->getSettings()->get();
 
                $this->output( $this->indent . "\tValidating number of 
shards..." );
-               $actualShardCount = $settings['index.number_of_shards'];
+               $actualShardCount = $settings[ 'index.number_of_shards' ];
                if ( $actualShardCount == $this->getShardCount() ) {
                        $this->output( "ok\n" );
                } else {
@@ -145,7 +162,7 @@
                }
 
                $this->output( $this->indent . "\tValidating number of 
replicas..." );
-               $actualReplicaCount = $settings['index.number_of_replicas'];
+               $actualReplicaCount = $settings[ 'index.number_of_replicas' ];
                if ( $actualReplicaCount == $this->getReplicaCount() ) {
                        $this->output( "ok\n" );
                } else {
@@ -283,8 +300,11 @@
                        }
                } else {
                        foreach ( $status->getIndicesWithAlias( 
$specificAliasName ) as $index ) {
-                               if( $this->getName() === 
$this->getSpecificIndexName() ) {
+                               if( $index->getName() === 
$this->getSpecificIndexName() ) {
                                        $this->output( "ok\n" );
+                                       if ( $this->tooFewReplicas ) {
+                                               $this->validateIndexSettings();
+                                       }
                                        return;
                                } else {
                                        $otherIndeciesWithAlias[] = 
$index->getName();
@@ -295,16 +315,40 @@
                        $this->output( "alias is free..." );
                        $this->getIndex()->addAlias( $specificAliasName, false 
);
                        $this->output( "corrected\n" );
+                       if ( $this->tooFewReplicas ) {
+                               $this->validateIndexSettings();
+                       }
                        return;
                }
                if ( $this->reindexAndRemoveOk ) {
                        $this->output( "is taken...\n" );
-                       $this->output( $this->indent . "\tReindexing...\n");
+                       $this->output( $this->indent . "\tReindexing...\n" );
                        // Muck with $this->indent because reindex is used to 
running at the top level.
                        $saveIndent = $this->indent;
                        $this->indent = $this->indent . "\t\t";
                        $this->reindex();
                        $this->indent = $saveIndent;
+                       if ( $this->tooFewReplicas ) {
+                               // Optimize the index so it'll be more compact 
for replication.  Not required
+                               // but should be helpful.
+                               $this->output( $this->indent . 
"\tOptimizing..." );
+                               $this->getIndex()->optimize( array( 
'max_num_segments' => 5 ) );
+                               $this->output( "Done\n" );
+                               $this->validateIndexSettings();
+                               $this->output( $this->indent . "\tWaiting for 
all shards to start...\n" );
+                               $each = 0;
+                               while ( true ) {
+                                       $unstarted = 
$this->indexShardsUnstarted();
+                                       if ( $each === 0 ) {
+                                               $this->output( $this->indent . 
"\t\t$unstarted remaining\n" );
+                                       }
+                                       if ( $unstarted === 0 ) {
+                                               break;
+                                       }
+                                       $each = ( $each + 1 ) % 20;
+                                       sleep( 1 );
+                               }
+                       }
                        $this->output( $this->indent . "\tSwapping alias...");
                        $this->getIndex()->addAlias( $specificAliasName, true );
                        $this->output( "done\n" );
@@ -318,6 +362,21 @@
                        "--reindexAndRemoveOk.  Make sure you understand the 
consequences of either\n" .
                        "choice." );
                $this->returnCode = 1;
+       }
+
+       private function indexShardsUnstarted() {
+               $data = $this->getIndex()->getStatus()->getData();
+               $count = 0;
+               foreach ( $data[ 'indices' ] as $index ) {
+                       foreach ( $index[ 'shards' ] as $shard ) {
+                               foreach ( $shard as $replica ) {
+                                       if ( $replica[ 'state' ] !== 'STARTED' 
) {
+                                               $count++;
+                                       }
+                               }
+                       }
+               }
+               return $count;
        }
 
        public function validateAllAlias() {
@@ -379,58 +438,105 @@
         * reparsing everything.
         */
        private function reindex() {
-               $query = new Elastica\Query();
-               $query->setFields( array( '_id', '_source' ) );
-
-               // Note here we dump from the current index (using the alias) 
so we can use CirrusSearchConnection::getPageType
-               $result = CirrusSearchConnection::getPageType( $this->indexType 
)->search( $query, array(
-                       'search_type' => 'scan',
-                       'scroll' => '10m',
-                       'size'=> $this->reindexChunkSize / 
$this->getShardCount()
+               $settings = $this->getIndex()->getSettings();
+               $settings->set( array(
+                       'refresh_interval' => -1,           // This is supposed 
to help with bulk index io load.
+                       'merge.policy.merge_factor' => 20,  // This is supposed 
to help with bulk index io load.
                ) );
-               $totalDocsToReindex = $result->getResponse()->getData();
-               $totalDocsToReindex = $totalDocsToReindex['hits']['total'];
-               $this->output( $this->indent . "About to reindex 
$totalDocsToReindex documents\n" );
-               $operationStartTime = microtime( true );
-               $completed = 0;
 
-               while ( true ) {
-                       wfProfileIn( __METHOD__ . '::receiveDocs' );
-                       $result = $this->getIndex()->search( array(), array(
-                               'scroll_id' => 
$result->getResponse()->getScrollId(),
-                               'scroll' => '10m'
-                       ) );
-                       wfProfileOut( __METHOD__ . '::receiveDocs' );
-                       if ( !$result->count() ) {
-                               $this->output( $this->indent . "All done\n" );
-                               break;
+               $children = $this->reindexProcesses;
+               $fork = new CirrusSearchReindexForkController( $children );
+               $forkResult = $fork->start();
+               switch ( $forkResult ) {
+               case 'child':
+                       try {
+                               $childNumber = $fork->childNumber;
+                               $this->output( $this->indent . "[$childNumber] 
Successfully forked child\n" );
+                               $query = new Elastica\Query();
+                               $query->setFields( array( '_id', '_source' ) );
+                               // Note that it is not ok to abs(_uid.hashCode) 
because hashCode(Integer.MIN_VALUE) == Integer.MIN_VALUE
+                               $query->setFilter( new Elastica\Filter\Script(
+                                       "(doc['_uid'].value.hashCode() & 
Integer.MAX_VALUE) % $children == $childNumber" ) );
+
+                               // Note here we dump from the current index 
(using the alias) so we can use CirrusSearchConnection::getPageType
+                               $result = CirrusSearchConnection::getPageType( 
$this->indexType )->search( $query, array(
+                                       'search_type' => 'scan',
+                                       'scroll' => '10m',
+                                       'size'=> $this->reindexChunkSize / 
$this->getShardCount()
+                               ) );
+                               $totalDocsToReindex = 
$result->getResponse()->getData();
+                               $totalDocsToReindex = 
$totalDocsToReindex['hits']['total'];
+                               $this->output( $this->indent . "[$childNumber] 
About to reindex $totalDocsToReindex documents\n" );
+                               $operationStartTime = microtime( true );
+                               $completed = 0;
+                               while ( true ) {
+                                       wfProfileIn( __METHOD__ . 
'::receiveDocs' );
+                                       $result = $this->getIndex()->search( 
array(), array(
+                                               'scroll_id' => 
$result->getResponse()->getScrollId(),
+                                               'scroll' => '1h'
+                                       ) );
+                                       wfProfileOut( __METHOD__ . 
'::receiveDocs' );
+                                       if ( !$result->count() ) {
+                                               $this->output( $this->indent . 
"[$childNumber] All done\n" );
+                                               break;
+                                       }
+                                       wfProfileIn( __METHOD__ . 
'::packageDocs' );
+                                       $documents = array();
+                                       while ( $result->current() ) {
+                                               $document = new 
\Elastica\Document( $result->current()->getId(), 
$result->current()->getSource() );
+                                               $document->setOpType( 'create' 
);
+                                               $documents[] = $document;
+                                               $result->next();
+                                       }
+                                       wfProfileOut( __METHOD__ . 
'::packageDocs' );
+                                       wfProfileIn( __METHOD__ . '::sendDocs' 
);
+                                       $updateResult = 
$this->getPageType()->addDocuments( $documents );
+                                       wfDebugLog( 'CirrusSearch', 'Update 
completed in ' . $updateResult->getEngineTime() . ' (engine) millis' );
+                                       wfProfileOut( __METHOD__ . '::sendDocs' 
);
+                                       $completed += $result->count();
+                                       $rate = round( $completed / ( 
microtime( true ) - $operationStartTime ) );
+                                       $this->output( $this->indent . 
"[$childNumber] Reindexed $completed/$totalDocsToReindex documents at 
$rate/second\n");
+                               }
+                       } catch ( \Elastica\Exception\ExceptionInterface $e ) {
+                               // Note that we can't fail the master here, we 
have to check how many documents are in the new index in the master.
+                               wfLogWarning( "Search backend error during 
reindex.  Error message is:  " . $e->getMessage() );
+                               die( 1 );
                        }
-                       wfProfileIn( __METHOD__ . '::packageDocs' );
-                       $documents = array();
-                       while ( $result->current() ) {
-                               $documents[] = new \Elastica\Document( 
$result->current()->getId(), $result->current()->getSource() );
-                               $result->next();
-                       }
-                       wfProfileOut( __METHOD__ . '::packageDocs' );
-                       wfProfileIn( __METHOD__ . '::sendDocs' );
-                       $updateResult = $this->getPageType()->addDocuments( 
$documents );
-                       wfDebugLog( 'CirrusSearch', 'Update completed in ' . 
$updateResult->getEngineTime() . ' (engine) millis' );
-                       wfProfileOut( __METHOD__ . '::sendDocs' );
-                       $completed += $result->count();
-                       $rate = round( $completed / ( microtime( true ) - 
$operationStartTime ) );
-                       $this->output( $this->indent . "Reindexed 
$completed/$totalDocsToReindex documents at $rate/second\n");
+                       die( 0 );
+               case 'done':
+                       break;
+               default:
+                       $this->error( "Unexpected result while forking:  
$forkResult", 1 );
                }
+
+               $this->output( $this->indent . "Verifying counts..." );
+               $oldCount = CirrusSearchConnection::getPageType( 
$this->indexType )->count();
+               $this->getIndex()->refresh();
+               $newCount = $this->getPageType()->count();
+               if ( $oldCount !== $newCount ) {
+                       $this->output( "Different!  Expected $oldCount but got 
$newCount\n" );
+                       $this->error( "Failed to load index.  Expected 
$oldCount but got $newCount.  Check for warnings above.", 1 );
+               }
+               $this->output( "done\n" );
+
+               // Revert settings changed just for reindexing
+               $settings->set( array(
+                       'refresh_interval' => '1s',
+                       'merge.policy.merge_factor' => 10,
+               ) );
        }
 
        private function createIndex( $rebuild ) {
                $this->getIndex()->create( array(
                        'settings' => array(
                                'number_of_shards' => $this->getShardCount(),
-                               'number_of_replicas' => 
$this->getReplicaCount(),
+                               'number_of_replicas' => 
$this->reindexAndRemoveOk ? 0 : $this->getReplicaCount(),
                                'analysis' => 
CirrusSearchAnalysisConfigBuilder::build(),
+                               'translog.flush_threshold_ops' => 50000,   // 
This is supposed to help with bulk index io load.
                        )
                ), $rebuild );
                $this->closeOk = false;
+               $this->tooFewReplicas = $this->reindexAndRemoveOk;
        }
 
        private function closeAndCorrect( $callback ) {

-- 
To view, visit https://gerrit.wikimedia.org/r/88131
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I51101191a9e1daac794d027af23bbf6a0fb096ab
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/CirrusSearch
Gerrit-Branch: master
Gerrit-Owner: Manybubbles <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to