Manybubbles has uploaded a new change for review. https://gerrit.wikimedia.org/r/88131
Change subject: Optimize in place reindexing. ...................................................................... Optimize in place reindexing. 1. Turn replicas on _after_ building the new index. 2. Optimize the index before adding the replicas. Might help some with speed but helps more with query performance on closed indexes. 3. Build the index in multiple processes. 4. Use the 'create' action instead of the 'index' action. Bug: 54918 Change-Id: I51101191a9e1daac794d027af23bbf6a0fb096ab --- M CirrusSearch.php A maintenance/includes/CirrusSearchReindexForkController.php M updateOneSearchIndexConfig.php 3 files changed, 221 insertions(+), 50 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/CirrusSearch refs/changes/31/88131/1 diff --git a/CirrusSearch.php b/CirrusSearch.php index 54764e6..42dc04b 100644 --- a/CirrusSearch.php +++ b/CirrusSearch.php @@ -102,17 +102,21 @@ $wgCirrusSearchUseAggressiveSplitting = true; $dir = __DIR__ . '/'; +$includes = "$dir/includes/"; +$maintenanceIncludes = "$dir/maintenance/includes/"; /** * Classes */ -$wgAutoloadClasses['CirrusSearch'] = $dir . 'includes/CirrusSearch.body.php'; -$wgAutoloadClasses['CirrusSearchAnalysisConfigBuilder'] = $dir . 'includes/CirrusSearchAnalysisConfigBuilder.php'; -$wgAutoloadClasses['CirrusSearchConnection'] = $dir . 'includes/CirrusSearchConnection.php'; -$wgAutoloadClasses['CirrusSearchMappingConfigBuilder'] = $dir . 'includes/CirrusSearchMappingConfigBuilder.php'; -$wgAutoloadClasses['CirrusSearchPrefixSearchHook'] = $dir . 'includes/CirrusSearchPrefixSearchHook.php'; -$wgAutoloadClasses['CirrusSearchSearcher'] = $dir . 'includes/CirrusSearchSearcher.php'; -$wgAutoloadClasses['CirrusSearchTextFormatter'] = $dir . 'includes/CirrusSearchTextFormatter.php'; -$wgAutoloadClasses['CirrusSearchUpdater'] = $dir . 'includes/CirrusSearchUpdater.php'; +$wgAutoloadClasses['CirrusSearch'] = $includes . 
'CirrusSearch.body.php'; +$wgAutoloadClasses['CirrusSearchAnalysisConfigBuilder'] = $includes . 'CirrusSearchAnalysisConfigBuilder.php'; +$wgAutoloadClasses['CirrusSearchConnection'] = $includes . 'CirrusSearchConnection.php'; +$wgAutoloadClasses['CirrusSearchMappingConfigBuilder'] = $includes . 'CirrusSearchMappingConfigBuilder.php'; +$wgAutoloadClasses['CirrusSearchPrefixSearchHook'] = $includes . 'CirrusSearchPrefixSearchHook.php'; +$wgAutoloadClasses['CirrusSearchSearcher'] = $includes . 'CirrusSearchSearcher.php'; +$wgAutoloadClasses['CirrusSearchTextFormatter'] = $includes . 'CirrusSearchTextFormatter.php'; +$wgAutoloadClasses['CirrusSearchUpdater'] = $includes . 'CirrusSearchUpdater.php'; + +$wgAutoloadClasses['CirrusSearchReindexForkController'] = $maintenanceIncludes . 'CirrusSearchReindexForkController.php'; /** * Hooks diff --git a/maintenance/includes/CirrusSearchReindexForkController.php b/maintenance/includes/CirrusSearchReindexForkController.php new file mode 100644 index 0000000..f6b3803 --- /dev/null +++ b/maintenance/includes/CirrusSearchReindexForkController.php @@ -0,0 +1,61 @@ +<? +/** + * Extension of ForkController that resets the CirrusSearch connection before forking and records in each child + * process which one it is. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html + */ +class CirrusSearchReindexForkController extends ForkController { + /** + * @var integer number of this child or null if this is the parent + */ + var $childNumber; + /** + * Fork a number of worker processes. Have to hack ForkController to store + * the child number. + * + * @return string 'child' in each forked child, 'parent' in the parent + */ + protected function forkWorkers( $numProcs ) { + $this->prepareEnvironment(); + + // Create the child processes + for ( $i = 0; $i < $numProcs; $i++ ) { + // Do the fork + $pid = pcntl_fork(); + if ( $pid === -1 || $pid === false ) { + echo "Error creating child processes\n"; + exit( 1 ); + } + + if ( !$pid ) { + $this->initChild(); + $this->childNumber = $i; // Hack right here. + return 'child'; + } else { + // This is the parent process + $this->children[$pid] = true; + } + } + + return 'parent'; + } + + protected function prepareEnvironment() { + parent::prepareEnvironment(); + CirrusSearchConnection::destroySingleton(); + } +} diff --git a/updateOneSearchIndexConfig.php b/updateOneSearchIndexConfig.php index 065228e..cd8e1cb 100644 --- a/updateOneSearchIndexConfig.php +++ b/updateOneSearchIndexConfig.php @@ -44,6 +44,16 @@ // steps. private $removeIndecies = false; + /** + * @var bool whether the index we're building was created with fewer replicas than configured + */ + private $tooFewReplicas = false; + + /** + * @var int number of processes to use when reindexing + */ + private $reindexProcesses; + public function __construct() { parent::__construct(); $this->addDescription( "Update the configuration or contents of one search index." ); @@ -78,6 +88,8 @@ "reindex all documents from that index (via the alias) to this one, swing the " . "alias to this index, and then remove other index. You'll have to redo all updates ". "performed during this operation manually. Defaults to false." ); + $maintenance->addOption( 'reindexProcesses', 'Number of processes to use in reindex. ' .
+ 'Defaults to 10.', false, true ); } public function execute() { @@ -100,6 +112,7 @@ $this->closeOk = $this->getOption( 'closeOk', false ); $this->indexIdentifier = $this->pickIndexIdentifierFromOption( $this->getOption( 'indexIdentifier', 'current' ) ); $this->reindexAndRemoveOk = $this->getOption( 'reindexAndRemoveOk', false ); + $this->reindexProcesses = $this->getOption( 'reindexProcesses', 10 ); $this->validateIndex(); $this->validateAnalyzers(); @@ -128,10 +141,14 @@ return; } $this->output( $this->indent . "Index exists so validating...\n" ); + $this->validateIndexSettings(); + } + + private function validateIndexSettings() { $settings = $this->getIndex()->getSettings()->get(); $this->output( $this->indent . "\tValidating number of shards..." ); - $actualShardCount = $settings['index.number_of_shards']; + $actualShardCount = $settings[ 'index.number_of_shards' ]; if ( $actualShardCount == $this->getShardCount() ) { $this->output( "ok\n" ); } else { @@ -145,7 +162,7 @@ } $this->output( $this->indent . "\tValidating number of replicas..." ); - $actualReplicaCount = $settings['index.number_of_replicas']; + $actualReplicaCount = $settings[ 'index.number_of_replicas' ]; if ( $actualReplicaCount == $this->getReplicaCount() ) { $this->output( "ok\n" ); } else { @@ -283,8 +300,11 @@ } } else { foreach ( $status->getIndicesWithAlias( $specificAliasName ) as $index ) { - if( $this->getName() === $this->getSpecificIndexName() ) { + if( $index->getName() === $this->getSpecificIndexName() ) { $this->output( "ok\n" ); + if ( $this->tooFewReplicas ) { + $this->validateIndexSettings(); + } return; } else { $otherIndeciesWithAlias[] = $index->getName(); @@ -295,16 +315,40 @@ $this->output( "alias is free..." 
); $this->getIndex()->addAlias( $specificAliasName, false ); $this->output( "corrected\n" ); + if ( $this->tooFewReplicas ) { + $this->validateIndexSettings(); + } return; } if ( $this->reindexAndRemoveOk ) { $this->output( "is taken...\n" ); - $this->output( $this->indent . "\tReindexing...\n"); + $this->output( $this->indent . "\tReindexing...\n" ); // Muck with $this->indent because reindex is used to running at the top level. $saveIndent = $this->indent; $this->indent = $this->indent . "\t\t"; $this->reindex(); $this->indent = $saveIndent; + if ( $this->tooFewReplicas ) { + // Optimize the index so it'll be more compact for replication. Not required + // but should be helpful. + $this->output( $this->indent . "\tOptimizing..." ); + $this->getIndex()->optimize( array( 'max_num_segments' => 5 ) ); + $this->output( "Done\n" ); + $this->validateIndexSettings(); + $this->output( $this->indent . "\tWaiting for all shards to start...\n" ); + $each = 0; + while ( true ) { + $unstarted = $this->indexShardsUnstarted(); + if ( $each === 0 ) { + $this->output( $this->indent . "\t\t$unstarted remaining\n" ); + } + if ( $unstarted === 0 ) { + break; + } + $each = ( $each + 1 ) % 20; + sleep( 1 ); + } + } $this->output( $this->indent . "\tSwapping alias..."); $this->getIndex()->addAlias( $specificAliasName, true ); $this->output( "done\n" ); @@ -318,6 +362,21 @@ "--reindexAndRemoveOk. Make sure you understand the consequences of either\n" . "choice." ); $this->returnCode = 1; + } + + private function indexShardsUnstarted() { + $data = $this->getIndex()->getStatus()->getData(); + $count = 0; + foreach ( $data[ 'indices' ] as $index ) { + foreach ( $index[ 'shards' ] as $shard ) { + foreach ( $shard as $replica ) { + if ( $replica[ 'state' ] !== 'STARTED' ) { + $count++; + } + } + } + } + return $count; } public function validateAllAlias() { @@ -379,58 +438,105 @@ * reparsing everything. 
*/ private function reindex() { - $query = new Elastica\Query(); - $query->setFields( array( '_id', '_source' ) ); - - // Note here we dump from the current index (using the alias) so we can use CirrusSearchConnection::getPageType - $result = CirrusSearchConnection::getPageType( $this->indexType )->search( $query, array( - 'search_type' => 'scan', - 'scroll' => '10m', - 'size'=> $this->reindexChunkSize / $this->getShardCount() + $settings = $this->getIndex()->getSettings(); + $settings->set( array( + 'refresh_interval' => -1, // This is supposed to help with bulk index io load. + 'merge.policy.merge_factor' => 20, // This is supposed to help with bulk index io load. ) ); - $totalDocsToReindex = $result->getResponse()->getData(); - $totalDocsToReindex = $totalDocsToReindex['hits']['total']; - $this->output( $this->indent . "About to reindex $totalDocsToReindex documents\n" ); - $operationStartTime = microtime( true ); - $completed = 0; - while ( true ) { - wfProfileIn( __METHOD__ . '::receiveDocs' ); - $result = $this->getIndex()->search( array(), array( - 'scroll_id' => $result->getResponse()->getScrollId(), - 'scroll' => '10m' - ) ); - wfProfileOut( __METHOD__ . '::receiveDocs' ); - if ( !$result->count() ) { - $this->output( $this->indent . "All done\n" ); - break; + $children = $this->reindexProcesses; + $fork = new CirrusSearchReindexForkController( $children ); + $forkResult = $fork->start(); + switch ( $forkResult ) { + case 'child': + try { + $childNumber = $fork->childNumber; + $this->output( $this->indent . 
"[$childNumber] Successfully forked child\n" ); + $query = new Elastica\Query(); + $query->setFields( array( '_id', '_source' ) ); + // Mask off the sign bit rather than taking abs(_uid.hashCode()), because abs(Integer.MIN_VALUE) == Integer.MIN_VALUE (still negative) + $query->setFilter( new Elastica\Filter\Script( + "(doc['_uid'].value.hashCode() & Integer.MAX_VALUE) % $children == $childNumber" ) ); + + // Note here we dump from the current index (using the alias) so we can use CirrusSearchConnection::getPageType + $result = CirrusSearchConnection::getPageType( $this->indexType )->search( $query, array( + 'search_type' => 'scan', + 'scroll' => '10m', + 'size'=> $this->reindexChunkSize / $this->getShardCount() + ) ); + $totalDocsToReindex = $result->getResponse()->getData(); + $totalDocsToReindex = $totalDocsToReindex['hits']['total']; + $this->output( $this->indent . "[$childNumber] About to reindex $totalDocsToReindex documents\n" ); + $operationStartTime = microtime( true ); + $completed = 0; + while ( true ) { + wfProfileIn( __METHOD__ . '::receiveDocs' ); + $result = $this->getIndex()->search( array(), array( + 'scroll_id' => $result->getResponse()->getScrollId(), + 'scroll' => '1h' + ) ); + wfProfileOut( __METHOD__ . '::receiveDocs' ); + if ( !$result->count() ) { + $this->output( $this->indent . "[$childNumber] All done\n" ); + break; + } + wfProfileIn( __METHOD__ . '::packageDocs' ); + $documents = array(); + while ( $result->current() ) { + $document = new \Elastica\Document( $result->current()->getId(), $result->current()->getSource() ); + $document->setOpType( 'create' ); + $documents[] = $document; + $result->next(); + } + wfProfileOut( __METHOD__ . '::packageDocs' ); + wfProfileIn( __METHOD__ . '::sendDocs' ); + $updateResult = $this->getPageType()->addDocuments( $documents ); + wfDebugLog( 'CirrusSearch', 'Update completed in ' . $updateResult->getEngineTime() . ' (engine) millis' ); + wfProfileOut( __METHOD__ .
'::sendDocs' ); + $completed += $result->count(); + $rate = round( $completed / ( microtime( true ) - $operationStartTime ) ); + $this->output( $this->indent . "[$childNumber] Reindexed $completed/$totalDocsToReindex documents at $rate/second\n"); + } + } catch ( \Elastica\Exception\ExceptionInterface $e ) { + // Note that we can't fail the master here, we have to check how many documents are in the new index in the master. + wfLogWarning( "Search backend error during reindex. Error message is: " . $e->getMessage() ); + die( 1 ); } - wfProfileIn( __METHOD__ . '::packageDocs' ); - $documents = array(); - while ( $result->current() ) { - $documents[] = new \Elastica\Document( $result->current()->getId(), $result->current()->getSource() ); - $result->next(); - } - wfProfileOut( __METHOD__ . '::packageDocs' ); - wfProfileIn( __METHOD__ . '::sendDocs' ); - $updateResult = $this->getPageType()->addDocuments( $documents ); - wfDebugLog( 'CirrusSearch', 'Update completed in ' . $updateResult->getEngineTime() . ' (engine) millis' ); - wfProfileOut( __METHOD__ . '::sendDocs' ); - $completed += $result->count(); - $rate = round( $completed / ( microtime( true ) - $operationStartTime ) ); - $this->output( $this->indent . "Reindexed $completed/$totalDocsToReindex documents at $rate/second\n"); + die( 0 ); + case 'done': + break; + default: + $this->error( "Unexpected result while forking: $forkResult", 1 ); } + + $this->output( $this->indent . "Verifying counts..." ); + $oldCount = CirrusSearchConnection::getPageType( $this->indexType )->count(); + $this->getIndex()->refresh(); + $newCount = $this->getPageType()->count(); + if ( $oldCount !== $newCount ) { + $this->output( "Different! Expected $oldCount but got $newCount\n" ); + $this->error( "Failed to load index. Expected $oldCount but got $newCount. 
Check for warnings above.", 1 ); + } + $this->output( "done\n" ); + + // Revert settings changed just for reindexing + $settings->set( array( + 'refresh_interval' => '1s', + 'merge.policy.merge_factor' => 10, + ) ); } private function createIndex( $rebuild ) { $this->getIndex()->create( array( 'settings' => array( 'number_of_shards' => $this->getShardCount(), - 'number_of_replicas' => $this->getReplicaCount(), + 'number_of_replicas' => $this->reindexAndRemoveOk ? 0 : $this->getReplicaCount(), 'analysis' => CirrusSearchAnalysisConfigBuilder::build(), + 'translog.flush_threshold_ops' => 50000, // This is supposed to help with bulk index io load. ) ), $rebuild ); $this->closeOk = false; + $this->tooFewReplicas = $this->reindexAndRemoveOk; } private function closeAndCorrect( $callback ) { -- To view, visit https://gerrit.wikimedia.org/r/88131 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I51101191a9e1daac794d027af23bbf6a0fb096ab Gerrit-PatchSet: 1 Gerrit-Project: mediawiki/extensions/CirrusSearch Gerrit-Branch: master Gerrit-Owner: Manybubbles <[email protected]> _______________________________________________ MediaWiki-commits mailing list [email protected] https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
