EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/334009 )

Change subject: [WIP] Replace reindexing with _reindex API
......................................................................

[WIP] Replace reindexing with _reindex API

Initial replacement of the internals of our Reindexer class with the
_reindex API in es 5. This generally appears to work for the basic case
of copying an index for mapping updates, but it needs further testing.

Tested working:
* Basic mapping change and copy from old index to new on the same cluster
* Delete specific fields from all documents while reindexing
* Can still vary chunk size as necessary

TODO:
* Have not yet implemented indexing from a remote ES server, which is
  needed for the copySearchIndex.php script.
* Needs decisions about some specific parameters, such as request
  throttling and timeouts. In the past request throttling was done
  through parallelism limits rather than a requests-per-second value;
  we will likely switch to a flag on the maintenance scripts.
* What happens when batch sizes are too big? Not sure yet. We used to
  have a bunch of code for handling it. Need to review upstream code.
* There was mapping information passed into the reindexer, but it was
  never used. I've removed that but need to trace back some history and
  figure out whether something was lost.

Bug: T155506
Change-Id: Ifbbae065038cfe6e07a84f6a1b46dd317c3081d3
---
M includes/Maintenance/Reindexer.php
M includes/SearchConfig.php
M maintenance/copySearchIndex.php
M maintenance/updateOneSearchIndexConfig.php
4 files changed, 60 insertions(+), 361 deletions(-)

git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/CirrusSearch refs/changes/09/334009/1
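For reviewers who have not used the es5 _reindex API yet: the per-type request the new Reindexer builds (see the diff below) corresponds to roughly the following array. Index names and the removed field here are illustrative only.

    $request = [
        'source' => [
            // Read via the live alias on the source cluster.
            'index' => 'mywiki_content_1485123456',
            'type' => 'page',
            // Batch size per internal scroll request, i.e. the old chunkSize knob.
            'size' => 100,
        ],
        'dest' => [
            // Freshly created index with the updated mappings.
            'index' => 'mywiki_content_1485999999',
            'type' => 'page',
        ],
        // Optional: drop obsolete fields while copying.
        'script' => [
            'lang' => 'painless',
            'inline' => "ctx._source.remove('obsolete_field');",
        ],
    ];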
diff --git a/includes/Maintenance/Reindexer.php b/includes/Maintenance/Reindexer.php
index 16a8f21..ffe9f3c 100644
--- a/includes/Maintenance/Reindexer.php
+++ b/includes/Maintenance/Reindexer.php
@@ -10,6 +10,7 @@
 use Elastica\Exception\ExceptionInterface;
 use Elastica\Index;
 use Elastica\Query;
+use Elastica\Request;
 use Elastica\Type;
 use ForkController;
 use MediaWiki\Logger\LoggerFactory;
@@ -86,11 +87,6 @@
     private $mergeSettings;

     /**
-     * @var array
-     */
-    private $mappingConfig;
-
-    /**
      * @var Maintenance
      */
     private $out;
@@ -109,11 +105,10 @@
      * @param int $shardCount
      * @param string $replicaCount
      * @param array $mergeSettings
-     * @param array $mappingConfig
      * @param Maintenance $out
      * @throws \Exception
      */
-    public function __construct( SearchConfig $searchConfig, Connection $source, Connection $target, array $types, array $oldTypes, $shardCount, $replicaCount, array $mergeSettings, array $mappingConfig, Maintenance $out = null, $fieldsToDelete = [] ) {
+    public function __construct( SearchConfig $searchConfig, Connection $source, Connection $target, array $types, array $oldTypes, $shardCount, $replicaCount, array $mergeSettings, Maintenance $out = null, $fieldsToDelete = [] ) {
         // @todo: this constructor has too many arguments - refactor!
         $this->searchConfig = $searchConfig;
         $this->oldConnection = $source;
@@ -123,7 +118,6 @@
         $this->shardCount = $shardCount;
         $this->replicaCount = $replicaCount;
         $this->mergeSettings = $mergeSettings;
-        $this->mappingConfig = $mappingConfig;
         $this->out = $out;
         $this->fieldsToDelete = $fieldsToDelete;
@@ -137,13 +131,11 @@

     /**
      * Dump everything from the live index into the one being worked on.
      *
-     * @param int $processes
      * @param int $refreshInterval
-     * @param int $retryAttempts
      * @param int $chunkSize
      * @param float $acceptableCountDeviation
      */
-    public function reindex( $processes = 1, $refreshInterval = 1, $retryAttempts = 5, $chunkSize = 100, $acceptableCountDeviation = .05 ) {
+    public function reindex( $refreshInterval = 1, $chunkSize = 100, $acceptableCountDeviation = .05 ) {
         global $wgCirrusSearchWikimediaExtraPlugin;

         // Set some settings that should help io load during bulk indexing. We'll have to
@@ -160,41 +152,38 @@
             'routing.allocation.total_shards_per_node' => $maxShardsPerNode,
         ] );

-        if ( $processes > 1 ) {
-            if ( !isset( $wgCirrusSearchWikimediaExtraPlugin[ 'id_hash_mod_filter' ] ) ||
-                    !$wgCirrusSearchWikimediaExtraPlugin[ 'id_hash_mod_filter' ] ) {
-                $this->error( "Can't use multiple processes without \$wgCirrusSearchWikimediaExtraPlugin[ 'id_hash_mod_filter' ] = true", 1 );
+        $script = $this->generateDeleteFieldsScript();
+
+        foreach ( $this->types as $i => $type ) {
+            $oldType = $this->oldTypes[$i];
+            $request = [
+                'source' => [
+                    'index' => $oldType->getIndex()->getName(),
+                    'type' => $oldType->getName(),
+                    'size' => $chunkSize,
+                ],
+                'dest' => [
+                    'index' => $type->getIndex()->getName(),
+                    'type' => $type->getName(),
+                ],
+            ];
+
+            if ( $this->connection !== $this->oldConnection ) {
+                $request['source']['remote'] = [
+                    'host' => $this->makeRemoteSourceParams( $this->oldConnection ),
+                ];
             }
-            $fork = new ForkController( $processes );
-            $forkResult = $fork->start();
-            // we don't want to share sockets between forks, so destroy the client.
-            $this->destroyClients();

+            if ( $script !== null ) {
+                $request['script'] = $script;
+            }
-            switch ( $forkResult ) {
-            case 'child':
-                $success = false;
-                try {
-                    foreach ( $this->types as $i => $type ) {
-                        $oldType = $this->oldTypes[$i];
-                        $this->reindexInternal( $type, $oldType, $processes, $fork->getChildNumber(), $chunkSize, $retryAttempts, $acceptableCountDeviation );
-                    }
-                    $success = true;
-                } finally {
-                    // We don't want the child to continue the script
-                    die( $success ? 0 : 1 );
-                }
-            case 'done':
-                // When done is returned all children have terminated this is handled by the ForkController
-                break;
-            default:
-                $this->error( "Unexpected result while forking: $forkResult", 1 );
-            }
-        } else {
-            foreach ( $this->types as $i => $type ) {
-                $oldType = $this->oldTypes[$i];
-                $this->reindexInternal( $type, $oldType, 1, 1, $chunkSize, $retryAttempts, $acceptableCountDeviation );
-            }
+            $this->connection->getClient()->request( '_reindex', Request::POST, $request, [
+                // @TODO How to reasonably set the throttling?
+                'requests_per_second' => -1,
+                // @todo Controls how long individual bulk write requests will wait
+                // 'timeout' => ???,
+            ] );
         }

         $this->outputIndented( "Verifying counts..." );
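On the throttling and timeout TODOs above: _reindex accepts requests_per_second as a query parameter (-1 disables throttling), and with wait_for_completion=false it returns a task id that can be polled and rethrottled while the copy runs, so a maintenance-script flag could be passed straight through. A sketch only; $requestsPerSecond is a hypothetical option, not part of this patch:

    // Async variant of the call above; elasticsearch returns a task id
    // instead of blocking until the copy completes.
    $response = $this->connection->getClient()->request( '_reindex', Request::POST, $request, [
        'requests_per_second' => $requestsPerSecond, // e.g. 500, or -1 for unthrottled
        // Pass the string 'false' so it survives query-string encoding.
        'wait_for_completion' => 'false',
    ] );
    $taskId = $response->getData()['task'];

    // The throttle can be changed mid-flight through the task api.
    $this->connection->getClient()->request(
        "_reindex/$taskId/_rethrottle", Request::POST, [],
        [ 'requests_per_second' => -1 ]
    );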
@@ -280,176 +269,6 @@
     }

     /**
-     * Internal reindex method. Will run in a separate process if $children is > 1
-     *
-     * @param Type $type the elasticsearch Type we will write to
-     * @param Type $oldType the elasticsearch Type we read from
-     * @param int $children the number of processes
-     * @param int $childNumber the "process id"
-     * @param int|string $chunkSize number of docs we index in one bulk operation
-     * @param int $retryAttempts the number of times we retry elasticsearch operations
-     * @param float $acceptableCountDeviation acceptable difference between
-     *  the number of documents in the old and new index, 0.05 means that we
-     *  accept a ratio of 5%. If the source index contains 100 docs the target index
-     *  must contain between 95 and 105 docs.
-     * @throws \Exception if an unrecoverable error occured while reindexing
-     */
-    private function reindexInternal( Type $type, Type $oldType, $children, $childNumber, $chunkSize, $retryAttempts, $acceptableCountDeviation ) {
-        $filter = null;
-        $messagePrefix = "";
-        if ( $childNumber === 1 && $children === 1 ) {
-            $this->outputIndented( "\t\tStarting single process reindex\n" );
-        } else {
-            if ( $childNumber >= $children ) {
-                $this->error( "Invalid parameters - childNumber >= children ($childNumber >= $children) ", 1 );
-            }
-            $messagePrefix = "\t\t[$childNumber] ";
-            $this->outputIndented( $messagePrefix . "Starting child process reindex\n" );
-            // Note that it is not ok to abs(_uid.hashCode) because hashCode(Integer.MIN_VALUE) == Integer.MIN_VALUE
-            $filter = new \CirrusSearch\Extra\Query\IdHashMod( $children, $childNumber );
-        }
-        $numberOfDocsInOldType = $oldType->count();
-        $scrollId = null;
-        try {
-            $query = new Query();
-            $query->setStoredFields( [ '_id', '_source' ] );
-            if ( $filter ) {
-                $bool = new \Elastica\Query\BoolQuery();
-                $bool->addFilter( $filter );
-                $query->setQuery( $bool );
-            }
-
-            // Note here we dump from the current index (using the alias) so we can use Connection::getPageType
-            $scrollTime = '1h';
-            $results = $oldType
-                ->search( $query, [
-                    'search_type' => 'scan',
-                    'scroll' => $scrollTime,
-                    'size'=> $chunkSize,
-                ] );
-            $totalDocsToReindex = $results->getResponse()->getData();
-            $totalDocsToReindex = $totalDocsToReindex['hits']['total'];
-            $this->outputIndented( $messagePrefix . "About to reindex $totalDocsToReindex documents\n" );
-            $operationStartTime = microtime( true );
-            $completed = 0;
-            $failures = 0;
-
-            while( true ) {
-                $scrollId = $results->getResponse()->getScrollId();
-                $results = MWElasticUtils::withRetry( $retryAttempts,
-                    function() use ( $oldType, $scrollId, $scrollTime ) {
-                        return $oldType->search( [], [
-                            'scroll_id' => $scrollId,
-                            'scroll' => $scrollTime,
-                        ] );
-                    },
-                    /* on error callback */
-                    function( $e, $errors ) use ( $messagePrefix ) {
-                        $this->sleepOnRetry( $e, $errors,
-                            $messagePrefix, 'fetching documents to reindex' );
-                    }
-                );
-
-                if( !$results->count() ) {
-                    // No need to clear scroll on the last call
-                    $scrollId = null;
-                    break;
-                }
-
-                $documents = [];
-                foreach( $results as $result ) {
-                    $documents[] = $this->buildNewDocument( $result );
-                }
-
-                // We will retry bulk failures by sending index operations
-                // individually, if we fail again we ignore the error.
-                $nbDocIndexed = $this->sendDocuments( $type, $messagePrefix, $documents );
-
-                $failures += count( $results ) - $nbDocIndexed;
-                $completed += count( $results );
-
-                $rate = round( $completed / ( microtime( true ) - $operationStartTime ) );
-                $this->outputIndented( $messagePrefix .
-                    "Reindexed $completed/$totalDocsToReindex documents " .
-                    "($failures failures) at $rate/second\n" );
-
-                // Check if we can have enough docs in the future index
-                // by deducting the number of failed docs
-                $deviation = $failures / $numberOfDocsInOldType;
-                if ( $numberOfDocsInOldType > 0 && $deviation > $acceptableCountDeviation ) {
-                    throw new \Exception( "Too many failures ($failures), " .
-                        "the resulting index cannot have enough documents " .
-                        "to match an acceptable count deviation of " .
-                        "$acceptableCountDeviation." );
-                }
-            }
-            $this->outputIndented( $messagePrefix . "All done\n" );
"All done\n" ); - } catch ( ExceptionInterface $e ) { - // Note that we can't fail the master here, we have to check how many documents are in the new index in the master. - $type = get_class( $e ); - $error = ElasticaErrorHandler::extractFullError( $e ); - LoggerFactory::getInstance( 'CirrusSearch' )->warning( - "Search backend error during reindex. Error type is '{exception_type}' ({error_type}) and message is: {error_reason}", - [ - 'exception_type' => $type, - 'error_type' => $error['type'], - 'error_reason' => $error['reason'], - ] - ); - /** @suppress PhanTypeMismatchArgumentInternal ExceptionInterface is an Exception */ - throw new \Exception( "Search backend error during reindex.", 0, $e ); - } catch( \Exception $e ) { - $this->outputIndented( $messagePrefix . "Error during reindex: " . $e->getMessage() . "\n" ); - LoggerFactory::getInstance( 'CirrusSearch' )->warning( - "Error during reindex: {error_message}", - [ - 'error_message' => $e->getMessage(), - 'exception' => $e, - ] - ); - throw new \Exception( 'Error during reindex.', 0, $e ); - } finally { - if ( $scrollId ) { - try { - $oldType->getIndex()->getClient() ->request( - "_search/scroll/".$scrollId, - \Elastica\Request::DELETE - ); - } catch ( \Exception $e ) {} - } - } - } - - /** - * Build the new document to just contain keys which have a mapping in the new properties. To clean - * out any old fields that we no longer use. - * - * @param \Elastica\Result $result original document retrieved from a search - * @param array $properties mapping properties - * @return Document - */ - private function buildNewDocument( \Elastica\Result $result ) { - // FIXME: support inner properties - $data = array_diff_key( $result->getSource(), array_flip( $this->fieldsToDelete ) ); - // This field was added July, 2016. For the first reindex that occurs after it was added it will - // not exist in the documents, so add it here. - if ( !isset( $data['wiki'] ) ) { - $data['wiki'] = $this->searchConfig->getWikiId(); - } - - // Maybe instead the reindexer should know if we are converting from the old - // style numeric page id's to the new style prefixed id's. This probably - // works though. - $docId = $this->searchConfig->maybeMakeId( $result->getId() ); - - // Note that while setting the opType to create might improve performance slightly it can cause - // trouble if the scroll returns the same id twice. It can do that if the document is updated - // during the scroll process. I'm unclear on if it will always do that, so you still have to - // perform the date based catch up after the reindex. - return new Document( $docId, $data ); - } - - /** * Get health information about the index * * @return array Response data array @@ -487,110 +306,45 @@ } /** - * @param ExceptionInterface $e exception caught - * @param int $errors number of errors - * @param string $messagePrefix - * @param string $description + * @return array|null Returns an array suitable for use as + * the _reindex api script parameter to delete fields from + * the copied documents, or null if no script is needed. */ - private function sleepOnRetry( ExceptionInterface $e, $errors, $messagePrefix, $description ) { - $type = get_class( $e ); - $seconds = MWElasticUtils::backoffDelay( $errors ); - $message = ElasticaErrorHandler::extractMessage( $e ); - $this->outputIndented( $messagePrefix . "Caught an error $description. " . - "Backing off for $seconds and retrying. 
-        sleep( $seconds );
+    private function generateDeleteFieldsScript() {
+        if ( !$this->fieldsToDelete ) {
+            return null;
+        }
+
+        $script = [
+            'inline' => '',
+            'lang' => 'painless',
+        ];
+        foreach ( $this->fieldsToDelete as $field ) {
+            // Does this actually work?
+            $script['inline'] .= "ctx._source.remove('$field');";
+        }
+
+        return $script;
     }
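On the "Does this actually work?" comment above: ctx._source is exposed to painless as a Map, so remove() does drop the field, and the generated script should behave as intended. One possible refinement, sketched here rather than proposed as part of this patch: passing the field list through params keeps a single compiled script no matter which fields are configured, instead of splicing raw field names into the script source:

    private function generateDeleteFieldsScript() {
        if ( !$this->fieldsToDelete ) {
            return null;
        }
        return [
            'lang' => 'painless',
            // One compiled script regardless of the field list; the fields
            // travel as parameters instead of being interpolated into the source.
            'inline' => 'for ( def field : params.fields ) { ctx._source.remove( field ); }',
            'params' => [ 'fields' => array_values( $this->fieldsToDelete ) ],
        ];
    }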
"Failed to to index doc id $id : $errorType, $message\n" ); - return 0; - } - } - - /** - * Reset connection timeouts + * Set the maintenance timeout to the connection we will issue the reindex request + * to, so that it does not timeout will the reindex is running. */ private function setConnectionTimeout() { $timeout = $this->searchConfig->get( 'CirrusSearchMaintenanceTimeout' ); $this->connection->setTimeout( $timeout ); - $this->oldConnection->setTimeout( $timeout ); } /** diff --git a/includes/SearchConfig.php b/includes/SearchConfig.php index 41c4339..c81a7e5 100644 --- a/includes/SearchConfig.php +++ b/includes/SearchConfig.php @@ -171,22 +171,6 @@ } /** - * There are times, such as when using the Reindexer, when we aren't completely - * sure if these are old style numeric page id's, or new style prefixed id's. - * Do some magic to decide and self::makeId() when necessary. - * - * @param string|int $pageOrDocId - * @return string - */ - public function maybeMakeId( $pageOrDocId ) { - if ( !is_string( $pageOrDocId ) || ctype_digit( $pageOrDocId ) ) { - return $this->makeId( $pageOrDocId ); - } else { - return $pageOrDocId; - } - } - - /** * Convert an elasticsearch document id back into a mediawiki page id. * * @param string $docId Elasticsearch document id diff --git a/maintenance/copySearchIndex.php b/maintenance/copySearchIndex.php index 972191f..e944129 100644 --- a/maintenance/copySearchIndex.php +++ b/maintenance/copySearchIndex.php @@ -102,10 +102,9 @@ $clusterSettings->getShardCount( $this->indexType ), $clusterSettings->getReplicaCount( $this->indexType ), $this->getMergeSettings(), - $this->getMappingConfig(), $this ); - $reindexer->reindex( $processes, 1, $reindexRetryAttempts, $reindexChunkSize); + $reindexer->reindex( 1, $reindexChunkSize); $reindexer->optimize(); $reindexer->waitForShards(); } @@ -123,25 +122,6 @@ // If there aren't configured merge settings for this index type default to the content type. return $wgCirrusSearchMergeSettings[ 'content' ]; } - - /** - * @return array - */ - protected function getMappingConfig() { - global $wgCirrusSearchPrefixSearchStartsWithAnyWord, $wgCirrusSearchPhraseSuggestUseText, - $wgCirrusSearchOptimizeIndexForExperimentalHighlighter; - - $builder = new MappingConfigBuilder( $wgCirrusSearchOptimizeIndexForExperimentalHighlighter ); - $configFlags = 0; - if ( $wgCirrusSearchPrefixSearchStartsWithAnyWord ) { - $configFlags |= MappingConfigBuilder::PREFIX_START_WITH_ANY; - } - if ( $wgCirrusSearchPhraseSuggestUseText ) { - $configFlags |= MappingConfigBuilder::PHRASE_SUGGEST_USE_TEXT; - } - return $builder->buildConfig( $configFlags ); - } - } $maintClass = CopySearchIndex::class; diff --git a/maintenance/updateOneSearchIndexConfig.php b/maintenance/updateOneSearchIndexConfig.php index d3cf0a5..03e3daa 100644 --- a/maintenance/updateOneSearchIndexConfig.php +++ b/maintenance/updateOneSearchIndexConfig.php @@ -54,11 +54,6 @@ private $reindexChunkSize; /** - * @var int - */ - private $reindexRetryAttempts; - - /** * @var string */ private $indexBaseName; @@ -77,11 +72,6 @@ * @var boolean are there too few replicas in the index we're making? */ private $tooFewReplicas = false; - - /** - * @var int number of processes to use when reindexing - */ - private $reindexProcesses; /** * @var string language code we're building for @@ -170,8 +160,6 @@ "reindex all documents from that index (via the alias) to this one, swing the " . "alias to this index, and then remove other index. Updates performed while this". 
"operation is in progress will be queued up in the job queue. Defaults to false." ); - $maintenance->addOption( 'reindexProcesses', 'Number of processes to use in reindex. ' . - 'Not supported on Windows. Defaults to 1 on Windows and 5 otherwise.', false, true ); $maintenance->addOption( 'reindexAcceptableCountDeviation', 'How much can the reindexed ' . 'copy of an index is allowed to deviate from the current copy without triggering a ' . 'reindex failure. Defaults to 5%.', false, true ); @@ -179,10 +167,6 @@ 'Note when changing the number of shards that the old shard size is used, not the new ' . 'one. If you see many errors submitting documents in bulk but the automatic retry as ' . 'singles works then lower this number. Defaults to 100.', false, true ); - $maintenance->addOption( 'reindexRetryAttempts', 'Number of times to back off and retry ' . - 'per failure. Note that failures are not common but if Elasticsearch is in the process ' . - 'of moving a shard this can time out. This will retry the attempt after some backoff ' . - 'rather than failing the whole reindex process. Defaults to 5.', false, true ); $maintenance->addOption( 'baseName', 'What basename to use for all indexes, ' . 'defaults to wiki id', false, true ); $maintenance->addOption( 'debugCheckConfig', 'Print the configuration as it is checked ' . @@ -213,11 +197,9 @@ $this->startOver = $this->getOption( 'startOver', false ); $this->indexBaseName = $this->getOption( 'baseName', $this->getSearchConfig()->get( SearchConfig::INDEX_BASE_NAME ) ); $this->reindexAndRemoveOk = $this->getOption( 'reindexAndRemoveOk', false ); - $this->reindexProcesses = $this->getOption( 'reindexProcesses', wfIsWindows() ? 1 : 5 ); $this->reindexAcceptableCountDeviation = Util::parsePotentialPercent( $this->getOption( 'reindexAcceptableCountDeviation', '5%' ) ); $this->reindexChunkSize = $this->getOption( 'reindexChunkSize', 100 ); - $this->reindexRetryAttempts = $this->getOption( 'reindexRetryAttempts', 5 ); $this->printDebugCheckConfig = $this->getOption( 'debugCheckConfig', false ); $this->langCode = $wgLanguageCode; $this->prefixSearchStartsWithAny = $wgCirrusSearchPrefixSearchStartsWithAnyWord; @@ -421,7 +403,6 @@ $this->getShardCount(), $this->getReplicaCount(), $this->getMergeSettings(), - $this->getMappingConfig(), $this, explode( ',', $this->getOption( 'fieldsToDelete', '' ) ) ); @@ -432,7 +413,7 @@ $this->getSpecificIndexName(), $this->startOver, $reindexer, - [ $this->reindexProcesses, $this->refreshInterval, $this->reindexRetryAttempts, $this->reindexChunkSize, $this->reindexAcceptableCountDeviation ], + [ $this->refreshInterval, $this->reindexChunkSize, $this->reindexAcceptableCountDeviation ], $this->getIndexSettingsValidators(), $this->reindexAndRemoveOk, $this->tooFewReplicas, -- To view, visit https://gerrit.wikimedia.org/r/334009 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ifbbae065038cfe6e07a84f6a1b46dd317c3081d3 Gerrit-PatchSet: 1 Gerrit-Project: mediawiki/extensions/CirrusSearch Gerrit-Branch: es5 Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits