Matthias Mullie has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/177222

Change subject: Move reindex code into separate class
......................................................................

Move reindex code into separate class

This depends on I5848fac3a79fc694875f5bc5ca3073d7a0190e55

Change-Id: Ib7bea664e47f38854bfc1e43b58f2c4731217261
---
M CirrusSearch.php
M includes/ElasticsearchIntermediary.php
M includes/Maintenance/ReindexForkController.php
A includes/Maintenance/Reindexer.php
M maintenance/updateOneSearchIndexConfig.php
5 files changed, 465 insertions(+), 265 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/CirrusSearch 
refs/changes/22/177222/1

diff --git a/CirrusSearch.php b/CirrusSearch.php
index e81cb43..13f8ae3 100644
--- a/CirrusSearch.php
+++ b/CirrusSearch.php
@@ -575,6 +575,7 @@
 $wgAutoloadClasses['CirrusSearch\Maintenance\IndexNamespaces'] = __DIR__ . 
'/maintenance/indexNamespaces.php';
 $wgAutoloadClasses['CirrusSearch\Maintenance\Maintenance'] = $maintenanceDir . 
'Maintenance.php';
 $wgAutoloadClasses['CirrusSearch\Maintenance\MappingConfigBuilder'] = 
$maintenanceDir . 'MappingConfigBuilder.php';
+$wgAutoloadClasses['CirrusSearch\Maintenance\Reindexer'] = $maintenanceDir . 
'Reindexer.php';
 $wgAutoloadClasses['CirrusSearch\Maintenance\ReindexForkController'] = 
$maintenanceDir . 'ReindexForkController.php';
 $wgAutoloadClasses['CirrusSearch\Maintenance\Validators\Validator'] = 
$maintenanceDir . '/Validators/Validator.php';
 
$wgAutoloadClasses['CirrusSearch\Maintenance\Validators\CacheWarmersValidator'] 
= $maintenanceDir . '/Validators/CacheWarmersValidator.php';
diff --git a/includes/ElasticsearchIntermediary.php 
b/includes/ElasticsearchIntermediary.php
index 639fb02..2a262b1 100644
--- a/includes/ElasticsearchIntermediary.php
+++ b/includes/ElasticsearchIntermediary.php
@@ -110,7 +110,7 @@
        /**
         * Extract an error message from an exception thrown by Elastica.
         * @param RuntimeException $exception exception from which to extract a 
message
-        * @return message from the exception
+        * @return string message from the exception
         */
        public static function extractMessage( $exception ) {
                if ( !( $exception instanceof ResponseException ) ) {
diff --git a/includes/Maintenance/ReindexForkController.php 
b/includes/Maintenance/ReindexForkController.php
index 997d57d..cb8cd92 100644
--- a/includes/Maintenance/ReindexForkController.php
+++ b/includes/Maintenance/ReindexForkController.php
@@ -4,7 +4,7 @@
 use \ForkController;
 
 /**
- * Extensions to ForeController to prepare Elastica and to tell the child
+ * Extensions to ForkController to prepare Elastica and to tell the child
  * process which one it is.
  *
  * This program is free software; you can redistribute it and/or modify
diff --git a/includes/Maintenance/Reindexer.php 
b/includes/Maintenance/Reindexer.php
new file mode 100644
index 0000000..7b5bb48
--- /dev/null
+++ b/includes/Maintenance/Reindexer.php
@@ -0,0 +1,441 @@
+<?php
+
+namespace CirrusSearch\Maintenance;
+
+use CirrusSearch\ElasticsearchIntermediary;
+use Elastica\Document;
+use Elastica\Exception\Connection\HttpException;
+use Elastica\Exception\ExceptionInterface;
+use Elastica\Filter\Script;
+use Elastica\Index;
+use Elastica\Query;
+use Elastica\Type;
+
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ */
+class Reindexer {
+       /**
+        * This one's public because it's used in a Closure, where $this is 
passed
+        * in as $self (because PHP<5.4 doesn't properly support $this in 
closures)
+        *
+        * @var Index
+        */
+       public $index;
+
+       /**
+        * @var \Elastica\Client
+        */
+       private $client;
+
+       /**
+        * @var string
+        */
+       private $specificIndexName;
+
+       /**
+        * @var Type
+        */
+       private $type;
+
+       /**
+        * @var Type
+        */
+       private $oldType;
+
+       /**
+        * @var int
+        */
+       private $shardCount;
+
+       /**
+        * @var string
+        */
+       private $replicaCount;
+
+       /**
+        * @var int
+        */
+       private $connectionTimeout;
+
+       /**
+        * @var array
+        */
+       private $mergeSettings;
+
+       /**
+        * @var MappingConfigBuilder
+        */
+       private $mappingConfig;
+
+       /**
+        * @var \ElasticaConnection
+        */
+       private $connection;
+
+       /**
+        * @var Maintenance
+        */
+       private $out;
+
+       /**
+        * @param Index $index
+        * @param \ElasticaConnection $connection
+        * @param Type $type
+        * @param Type $oldType
+        * @param int $shardCount
+        * @param string $replicaCount
+        * @param int $connectionTimeout
+        * @param array $mergeSettings
+        * @param MappingConfigBuilder $mappingConfig
+        * @param Maintenance $out
+        */
+       public function __construct( Index $index, \ElasticaConnection 
$connection, Type $type, Type $oldType, $shardCount, $replicaCount, 
$connectionTimeout, array $mergeSettings, MappingConfigBuilder $mappingConfig, 
Maintenance $out = null ) {
+               // @todo: this constructor has too many arguments - refactor!
+               $this->index = $index;
+               $this->client = $this->index->getClient();
+               $this->specificIndexName = $this->index->getName();
+               $this->connection = $connection;
+               $this->type = $type;
+               $this->oldType = $oldType;
+               $this->shardCount = $shardCount;
+               $this->replicaCount = $replicaCount;
+               $this->connectionTimeout = $connectionTimeout;
+               $this->mergeSettings = $mergeSettings;
+               $this->mappingConfig = $mappingConfig;
+               $this->out = $out;
+       }
+
+       /**
+        * Dump everything from the live index into the one being worked on.
+        *
+        * @param int $processes
+        * @param int $refreshInterval
+        * @param int $retryAttempts
+        * @param int $chunkSize
+        * @param float $acceptableCountDeviation
+        */
+       public function reindex( $processes = 1, $refreshInterval = 1, 
$retryAttempts = 5, $chunkSize = 100, $acceptableCountDeviation = .05 ) {
+               // Set some settings that should help io load during bulk 
indexing.  We'll have to
+               // optimize after this to consolidate down to a proper number 
of shards but that is
+               // worth the price.  total_shards_per_node will help to make 
sure that each shard
+               // has as few neighbors as possible.
+               $settings = $this->index->getSettings();
+               $maxShardsPerNode = $this->decideMaxShardsPerNodeForReindex();
+               $settings->set( array(
+                       'refresh_interval' => -1,
+                       'merge.policy.segments_per_tier' => 40,
+                       'merge.policy.max_merge_at_once' => 40,
+                       'routing.allocation.total_shards_per_node' => 
$maxShardsPerNode,
+               ) );
+
+               if ( $processes > 1 ) {
+                       $fork = new ReindexForkController( $processes );
+                       $forkResult = $fork->start();
+                       // Forking clears the timeout so we have to reinstate 
it.
+                       $this->setConnectionTimeout();
+
+                       switch ( $forkResult ) {
+                               case 'child':
+                                       $this->reindexInternal( $processes, 
$fork->getChildNumber(), $chunkSize, $retryAttempts );
+                                       die( 0 );
+                               case 'done':
+                                       break;
+                               default:
+                                       $this->error( "Unexpected result while 
forking:  $forkResult", 1 );
+                       }
+
+                       $this->outputIndented( "Verifying counts..." );
+                       // We can't verify counts are exactly equal because 
they won't be - we still push updates into
+                       // the old index while reindexing the new one.
+                       $oldCount = (float) $this->oldType->count();
+                       $this->index->refresh();
+                       $newCount = (float) $this->type->count();
+                       $difference = $oldCount > 0 ? abs( $oldCount - 
$newCount ) / $oldCount : 0;
+                       if ( $difference > $acceptableCountDeviation ) {
+                               $this->output( "Not close enough!  
old=$oldCount new=$newCount difference=$difference\n" );
+                               $this->error( 'Failed to load index - counts 
not close enough.  ' .
+                                       "old=$oldCount new=$newCount 
difference=$difference.  " .
+                                       'Check for warnings above.', 1 );
+                       }
+                       $this->output( "done\n" );
+               } else {
+                       $this->reindexInternal( 1, 1, $chunkSize, 
$retryAttempts );
+               }
+
+               // Revert settings changed just for reindexing
+               $settings->set( array(
+                       'refresh_interval' => $refreshInterval . 's',
+                       'merge.policy' => $this->mergeSettings,
+               ) );
+       }
+
+       public function optimize() {
+               // Optimize the index so it'll be more compact for replication. 
 Not required
+               // but should be helpful.
+               $this->outputIndented( "\tOptimizing..." );
+               try {
+                       // Reset the timeout just in case we lost it somewhere 
along the line
+                       $this->setConnectionTimeout();
+                       $this->index->optimize( array( 'max_num_segments' => 5 
) );
+                       $this->output( "Done\n" );
+               } catch ( HttpException $e ) {
+                       if ( $e->getMessage() === 'Operation timed out' ) {
+                               $this->output( "Timed out...Continuing any 
way\n" );
+                               // To continue without blowing up we need to 
reset the connection.
+                               $this->destroySingleton();
+                               $this->setConnectionTimeout();
+                       } else {
+                               throw $e;
+                       }
+               }
+       }
+
+       public function waitForShards() {
+               $this->outputIndented( "\tWaiting for all shards to start...\n" 
);
+               list( $lower, $upper ) = explode( '-', $this->replicaCount );
+               $each = 0;
+               while ( true ) {
+                       $health = $this->getHealth();
+                       $active = $health[ 'active_shards' ];
+                       $relocating = $health[ 'relocating_shards' ];
+                       $initializing = $health[ 'initializing_shards' ];
+                       $unassigned = $health[ 'unassigned_shards' ];
+                       $nodes = $health[ 'number_of_nodes' ];
+                       if ( $nodes < $lower ) {
+                               $this->error( "Require $lower replicas but only 
have $nodes nodes. "
+                                       . "This is almost always due to 
misconfiguration, aborting.", 1 );
+                       }
+                       // If the upper range is all, expect the upper bound to 
be the number of nodes minus one
+                       if ( $upper === 'all' ) {
+                               $upper = $nodes - 1;
+                       }
+                       $expectedReplicas =  min( max( $nodes - 1, $lower ), 
$upper );
+                       $expectedActive = $this->shardCount * ( 1 + 
$expectedReplicas );
+                       if ( $each === 0 || $active === $expectedActive ) {
+                               $this->outputIndented( 
"\t\tactive:$active/$expectedActive relocating:$relocating " .
+                                       "initializing:$initializing 
unassigned:$unassigned\n" );
+                               if ( $active === $expectedActive ) {
+                                       break;
+                               }
+                       }
+                       $each = ( $each + 1 ) % 20;
+                       sleep( 1 );
+               }
+       }
+
+       private function reindexInternal( $children, $childNumber, $chunkSize, 
$retryAttempts ) {
+               $filter = null;
+               $messagePrefix = "";
+               if ( $childNumber === 1 && $children === 1 ) {
+                       $this->outputIndented( "\t\tStarting single process 
reindex\n" );
+               } else {
+                       if ( $childNumber >= $children ) {
+                               $this->error( "Invalid parameters - childNumber 
>= children ($childNumber >= $children) ", 1 );
+                       }
+                       $messagePrefix = "\t\t[$childNumber] ";
+                       $this->outputIndented( $messagePrefix . "Starting child 
process reindex\n" );
+                       // Note that it is not ok to abs(_uid.hashCode) because 
abs(Integer.MIN_VALUE) == Integer.MIN_VALUE
+                       $filter = new Script( array(
+                               'script' => "(doc['_uid'].value.hashCode() & 
Integer.MAX_VALUE) % $children == $childNumber",
+                               'lang' => 'groovy'
+                       ) );
+               }
+               $pageProperties = $this->mappingConfig->buildConfig();
+               $pageProperties = $pageProperties[ 'page' ][ 'properties' ];
+               try {
+                       $query = new Query();
+                       $query->setFields( array( '_id', '_source' ) );
+                       if ( $filter ) {
+                               $query->setFilter( $filter );
+                       }
+
+                       // Note here we dump from the current index (using the 
alias) so we can use Connection::getPageType
+                       $result = $this->oldType
+                               ->search( $query, array(
+                                       'search_type' => 'scan',
+                                       'scroll' => '1h',
+                                       'size'=> $chunkSize,
+                               ) );
+                       $totalDocsToReindex = $result->getResponse()->getData();
+                       $totalDocsToReindex = 
$totalDocsToReindex['hits']['total'];
+                       $this->outputIndented( $messagePrefix . "About to 
reindex $totalDocsToReindex documents\n" );
+                       $operationStartTime = microtime( true );
+                       $completed = 0;
+                       $self = $this;
+                       while ( true ) {
+                               wfProfileIn( __METHOD__ . '::receiveDocs' );
+                               $result = $this->withRetry( $retryAttempts, 
$messagePrefix, 'fetching documents to reindex',
+                                       function() use ( $self, $result ) {
+                                               return $self->index->search( 
array(), array(
+                                                       'scroll_id' => 
$result->getResponse()->getScrollId(),
+                                                       'scroll' => '1h'
+                                               ) );
+                                       } );
+                               wfProfileOut( __METHOD__ . '::receiveDocs' );
+                               if ( !$result->count() ) {
+                                       $this->outputIndented( $messagePrefix . 
"All done\n" );
+                                       break;
+                               }
+                               wfProfileIn( __METHOD__ . '::packageDocs' );
+                               $documents = array();
+                               while ( $result->current() ) {
+                                       // Build the new document to just 
contain keys which have a mapping in the new properties.  This cleans
+                                       // out any old fields that we no longer 
use.  Note that this filter is only a single level which is
+                                       // likely ok for us.
+                                       $document = new Document( 
$result->current()->getId(),
+                                               array_intersect_key( 
$result->current()->getSource(), $pageProperties ) );
+                                       // Note that while setting the opType 
to create might improve performance slightly it can cause
+                                       // trouble if the scroll returns the 
same id twice.  It can do that if the document is updated
+                                       // during the scroll process.  I'm 
unclear on if it will always do that, so you still have to
+                                       // perform the date based catch up 
after the reindex.
+                                       $documents[] = $document;
+                                       $result->next();
+                               }
+                               wfProfileOut( __METHOD__ . '::packageDocs' );
+                               $this->withRetry( $retryAttempts, 
$messagePrefix, 'retrying as singles',
+                                       function() use ( $self, $messagePrefix, 
$documents ) {
+                                               $self->sendDocuments( 
$messagePrefix, $documents );
+                                       } );
+                               $completed += $result->count();
+                               $rate = round( $completed / ( microtime( true ) 
- $operationStartTime ) );
+                               $this->outputIndented( $messagePrefix .
+                                       "Reindexed 
$completed/$totalDocsToReindex documents at $rate/second\n");
+                       }
+               } catch ( ExceptionInterface $e ) {
+                       // Note that we can't fail the master here, we have to 
check how many documents are in the new index in the master.
+                       $type = get_class( $e );
+                       $message = ElasticsearchIntermediary::extractMessage( 
$e );
+                       wfLogWarning( "Search backend error during reindex.  
Error type is '$type' and message is:  $message" );
+                       die( 1 );
+               }
+       }
+
+       private function getHealth() {
+               while ( true ) {
+                       $indexName = $this->specificIndexName;
+                       $path = "_cluster/health/$indexName";
+                       $response = $this->client->request( $path );
+                       if ( $response->hasError() ) {
+                               $this->error( 'Error fetching index health but 
going to retry.  Message: ' . $response->getError() );
+                               sleep( 1 );
+                               continue;
+                       }
+                       return $response->getData();
+               }
+       }
+
+       private function decideMaxShardsPerNodeForReindex() {
+               $health = $this->getHealth();
+               $totalNodes = $health[ 'number_of_nodes' ];
+               $totalShards = $this->shardCount * ( 
$this->getMaxReplicaCount() + 1 );
+               return ceil( 1.0 * $totalShards / $totalNodes );
+       }
+
+       private function getMaxReplicaCount() {
+               $replica = explode( '-', $this->replicaCount );
+               return $replica[ count( $replica ) - 1 ];
+       }
+
+       /**
+        * @param int $attempts
+        * @param string $messagePrefix
+        * @param string $description
+        * @param callable $func
+        * @return mixed
+        */
+       private function withRetry( $attempts, $messagePrefix, $description, 
$func ) {
+               $errors = 0;
+               while ( true ) {
+                       if ( $errors < $attempts ) {
+                               try {
+                                       return $func();
+                               } catch ( ExceptionInterface $e ) {
+                                       $errors += 1;
+                                       // Random backoff with lowest possible 
upper bound as 16 seconds.
+                                       // With the default maximum number of 
errors (5) this maxes out at 256 seconds.
+                                       $seconds = rand( 1, pow( 2, 3 + $errors 
) );
+                                       $type = get_class( $e );
+                                       $message = 
ElasticsearchIntermediary::extractMessage( $e );
+                                       $this->outputIndented( $messagePrefix . 
"Caught an error $description.  " .
+                                               "Backing off for $seconds and 
retrying.  Error type is '$type' and message is:  $message\n" );
+                                       sleep( $seconds );
+                               }
+                       } else {
+                               return $func();
+                       }
+               }
+       }
+
+       private function sendDocuments( $messagePrefix, $documents ) {
+               try {
+                       $this->type->addDocuments( $documents );
+               } catch ( ExceptionInterface $e ) {
+                       $type = get_class( $e );
+                       $message = ElasticsearchIntermediary::extractMessage( 
$e );
+                       $this->outputIndented( $messagePrefix . "Error adding 
documents in bulk.  Retrying as singles.  Error type is '$type' and message is: 
 $message" );
+                       foreach ( $documents as $document ) {
+                               // Continue using the bulk api because we're 
used to it.
+                               $this->type->addDocuments( array( $document ) );
+                       }
+               }
+       }
+
+       private function setConnectionTimeout() {
+               $this->connection->setTimeout2( $this->connectionTimeout );
+       }
+
+       private function destroySingleton() {
+               $this->connection->destroyClient();
+       }
+
+       /**
+        * @param string $message
+        * @param mixed $channel
+        */
+       protected function output( $message, $channel = null ) {
+               if ( $this->out ) {
+                       $this->out->output( $message, $channel );
+               }
+       }
+
+       /**
+        * @param string $message
+        */
+       protected function outputIndented( $message ) {
+               if ( $this->out ) {
+                       $this->out->outputIndented( $message );
+               }
+       }
+
+       /**
+        * @param string $message
+        * @param int $die
+        */
+       private function error( $message, $die = 0 ) {
+               // @todo: I'll want to get rid of this method, but this patch 
will be big enough already
+               // @todo: I'll probably want to throw exceptions and/or return 
Status objects instead, later
+
+               if ( $this->out ) {
+                       $this->out->error( $message, $die );
+               }
+
+               $die = intval( $die );
+               if ( $die > 0 ) {
+                       die( $die );
+               }
+       }
+}
diff --git a/maintenance/updateOneSearchIndexConfig.php 
b/maintenance/updateOneSearchIndexConfig.php
index 86c3b76..5e4e319 100644
--- a/maintenance/updateOneSearchIndexConfig.php
+++ b/maintenance/updateOneSearchIndexConfig.php
@@ -370,7 +370,7 @@
                        $this->getIndex(),
                        $this->optimizeIndexForExperimentalHighlighter,
                        $this->availablePlugins,
-                       $this->getMappingConfig(),
+                       $this->getMappingConfig()->buildConfig(),
                        $this->getPageType(),
                        $this->getNamespaceType(),
                        $this
@@ -435,59 +435,28 @@
                        return;
                }
                if ( $this->reindexAndRemoveOk ) {
+                       global $wgCirrusSearchMaintenanceTimeout;
+                       $reindexer = new Reindexer(
+                               $this->getIndex(),
+                               Connection::getSingleton(),
+                               $this->getPageType(),
+                               $this->getOldPageType(),
+                               $this->getShardCount(),
+                               $this->getReplicaCount(),
+                               $wgCirrusSearchMaintenanceTimeout,
+                               $this->getMergeSettings(),
+                               $this->getMappingConfig(),
+                               $this
+                       );
+
                        $this->output( "is taken...\n" );
                        $this->outputIndented( "\tReindexing...\n" );
-                       $this->reindex();
+                       $reindexer->reindex( $this->reindexProcesses, 
$this->refreshInterval, $this->reindexRetryAttempts, $this->reindexChunkSize, 
$this->reindexAcceptableCountDeviation);
+
                        if ( $this->tooFewReplicas ) {
-                               // Optimize the index so it'll be more compact 
for replication.  Not required
-                               // but should be helpful.
-                               $this->outputIndented( "\tOptimizing..." );
-                               try {
-                                       // Reset the timeout just in case we 
lost it somehwere along the line
-                                       $this->setConnectionTimeout();
-                                       $this->getIndex()->optimize( array( 
'max_num_segments' => 5 ) );
-                                       $this->output( "Done\n" );
-                               } catch ( 
\Elastica\Exception\Connection\HttpException $e ) {
-                                       if ( $e->getMessage() === 'Operation 
timed out' ) {
-                                               $this->output( "Timed 
out...Continuing any way\n" );
-                                               // To continue without blowing 
up we need to reset the connection.
-                                               $this->destroySingleton();
-                                               $this->setConnectionTimeout();
-                                       } else {
-                                               throw $e;
-                                       }
-                               }
+                               $reindexer->optimize();
                                $this->validateIndexSettings();
-                               $this->outputIndented( "\tWaiting for all 
shards to start...\n" );
-                               list( $lower, $upper ) = explode( '-', 
$this->getReplicaCount() );
-                               $each = 0;
-                               while ( true ) {
-                                       $health = $this->getHealth();
-                                       $active = $health[ 'active_shards' ];
-                                       $relocating = $health[ 
'relocating_shards' ];
-                                       $initializing = $health[ 
'initializing_shards' ];
-                                       $unassigned = $health[ 
'unassigned_shards' ];
-                                       $nodes = $health[ 'number_of_nodes' ];
-                                       if ( $nodes < $lower ) {
-                                               $this->error( "Require $lower 
replicas but only have $nodes nodes. "
-                                                       . "This is almost 
always due to misconfiguration, aborting.", 1 );
-                                       }
-                                       // If the upper range is all, expect 
the upper bound to be the number of nodes
-                                       if ( $upper === 'all' ) {
-                                               $upper = $nodes - 1;
-                                       }
-                                       $expectedReplicas =  min( max( $nodes - 
1, $lower ), $upper );
-                                       $expectedActive = 
$this->getShardCount() * ( 1 + $expectedReplicas );
-                                       if ( $each === 0 || $active === 
$expectedActive ) {
-                                               $this->outputIndented( 
"\t\tactive:$active/$expectedActive relocating:$relocating " .
-                                                       
"initializing:$initializing unassigned:$unassigned\n" );
-                                               if ( $active === 
$expectedActive ) {
-                                                       break;
-                                               }
-                                       }
-                                       $each = ( $each + 1 ) % 20;
-                                       sleep( 1 );
-                               }
+                               $reindexer->waitForShards();
                        }
                        $this->outputIndented( "\tSwapping alias...");
                        $this->getIndex()->addAlias( $specificAliasName, true );
@@ -522,203 +491,6 @@
                                $this->outputIndented( "\t\t$oldIndex..." );
                                $this->getClient()->getIndex( $oldIndex 
)->delete();
                                $this->output( "done\n" );
-                       }
-               }
-       }
-
-       /**
-        * Dump everything from the live index into the one being worked on.
-        */
-       private function reindex() {
-               // Set some settings that should help io load during bulk 
indexing.  We'll have to
-               // optimize after this to consolidate down to a proper number 
of shards but that is
-               // is worth the price.  total_shards_per_node will help to make 
sure that each shard
-               // has as few neighbors as possible.
-               $settings = $this->getIndex()->getSettings();
-               $maxShardsPerNode = $this->decideMaxShardsPerNodeForReindex();
-               $settings->set( array(
-                       'refresh_interval' => -1,
-                       'merge.policy.segments_per_tier' => 40,
-                       'merge.policy.max_merge_at_once' => 40,
-                       'routing.allocation.total_shards_per_node' => 
$maxShardsPerNode,
-               ) );
-
-               if ( $this->reindexProcesses > 1 ) {
-                       $fork = new 
\CirrusSearch\Maintenance\ReindexForkController( $this->reindexProcesses );
-                       $forkResult = $fork->start();
-                       // Forking clears the timeout so we have to reinstate 
it.
-                       $this->setConnectionTimeout();
-
-                       switch ( $forkResult ) {
-                       case 'child':
-                               $this->reindexInternal( 
$this->reindexProcesses, $fork->getChildNumber() );
-                               die( 0 );
-                       case 'done':
-                               break;
-                       default:
-                               $this->error( "Unexpected result while forking: 
 $forkResult", 1 );
-                       }
-
-                       $this->outputIndented( "Verifying counts..." );
-                       // We can't verify counts are exactly equal because 
they won't be - we still push updates into
-                       // the old index while reindexing the new one.
-                       $oldCount = (float) $this->getOldPageType()->count();
-                       $this->getIndex()->refresh();
-                       $newCount = (float) $this->getPageType()->count();
-                       $difference = $oldCount > 0 ? abs( $oldCount - 
$newCount ) / $oldCount : 0;
-                       if ( $difference > 
$this->reindexAcceptableCountDeviation ) {
-                               $this->output( "Not close enough!  
old=$oldCount new=$newCount difference=$difference\n" );
-                               $this->error( 'Failed to load index - counts 
not close enough.  ' .
-                                       "old=$oldCount new=$newCount 
difference=$difference.  " .
-                                       'Check for warnings above.', 1 );
-                       }
-                       $this->output( "done\n" );
-               } else {
-                       $this->reindexInternal( 1, 1 );
-               }
-
-               // Revert settings changed just for reindexing
-               $settings->set( array(
-                       'refresh_interval' => $this->refreshInterval . 's',
-                       'merge.policy' => $this->getMergeSettings(),
-               ) );
-       }
-
-       private function reindexInternal( $children, $childNumber ) {
-               $filter = null;
-               $messagePrefix = "";
-               if ( $childNumber === 1 && $children === 1 ) {
-                       $this->outputIndented( "\t\tStarting single process 
reindex\n" );
-               } else {
-                       if ( $childNumber >= $children ) {
-                               $this->error( "Invalid parameters - childNumber 
>= children ($childNumber >= $children) ", 1 );
-                       }
-                       $messagePrefix = "\t\t[$childNumber] ";
-                       $this->outputIndented( $messagePrefix . "Starting child 
process reindex\n" );
-                       // Note that it is not ok to abs(_uid.hashCode) because 
hashCode(Integer.MIN_VALUE) == Integer.MIN_VALUE
-                       $filter = new Elastica\Filter\Script( array(
-                               'script' => "(doc['_uid'].value.hashCode() & 
Integer.MAX_VALUE) % $children == $childNumber",
-                               'lang' => 'groovy'
-                       ) );
-               }
-               $pageProperties = $this->getMappingConfig();
-               $pageProperties = $pageProperties[ 'page' ][ 'properties' ];
-               try {
-                       $query = new Elastica\Query();
-                       $query->setFields( array( '_id', '_source' ) );
-                       if ( $filter ) {
-                               $query->setFilter( $filter );
-                       }
-
-                       // Note here we dump from the current index (using the 
alias) so we can use Connection::getPageType
-                       $result = $this->getOldPageType()
-                               ->search( $query, array(
-                                       'search_type' => 'scan',
-                                       'scroll' => '1h',
-                                       'size'=> $this->reindexChunkSize,
-                               )
-                       );
-                       $totalDocsToReindex = $result->getResponse()->getData();
-                       $totalDocsToReindex = 
$totalDocsToReindex['hits']['total'];
-                       $this->outputIndented( $messagePrefix . "About to 
reindex $totalDocsToReindex documents\n" );
-                       $operationStartTime = microtime( true );
-                       $completed = 0;
-                       $self = $this;
-                       while ( true ) {
-                               wfProfileIn( __METHOD__ . '::receiveDocs' );
-                               $result = $this->withRetry( 
$this->reindexRetryAttempts, $messagePrefix, 'fetching documents to reindex',
-                                       function() use ( $self, $result ) {
-                                               return 
$self->getIndex()->search( array(), array(
-                                                       'scroll_id' => 
$result->getResponse()->getScrollId(),
-                                                       'scroll' => '1h'
-                                               ) );
-                                       } );
-                               wfProfileOut( __METHOD__ . '::receiveDocs' );
-                               if ( !$result->count() ) {
-                                       $this->outputIndented( $messagePrefix . 
"All done\n" );
-                                       break;
-                               }
-                               wfProfileIn( __METHOD__ . '::packageDocs' );
-                               $documents = array();
-                               while ( $result->current() ) {
-                                       // Build the new document to just 
contain keys which have a mapping in the new properties.  To clean
-                                       // out any old fields that we no longer 
use.  Note that this filter is only a single level which is
-                                       // likely ok for us.
-                                       $document = new \Elastica\Document( 
$result->current()->getId(),
-                                               array_intersect_key( 
$result->current()->getSource(), $pageProperties ) );
-                                       // Note that while setting the opType 
to create might improve performance slightly it can cause
-                                       // trouble if the scroll returns the 
same id twice.  It can do that if the document is updated
-                                       // during the scroll process.  I'm 
unclear on if it will always do that, so you still have to
-                                       // perform the date based catch up 
after the reindex.
-                                       $documents[] = $document;
-                                       $result->next();
-                               }
-                               wfProfileOut( __METHOD__ . '::packageDocs' );
-                               $this->withRetry( $this->reindexRetryAttempts, 
$messagePrefix, 'retrying as singles',
-                                       function() use ( $self, $messagePrefix, 
$documents ) {
-                                               $self->sendDocuments( 
$messagePrefix, $documents );
-                                       } );
-                               $completed += $result->count();
-                               $rate = round( $completed / ( microtime( true ) 
- $operationStartTime ) );
-                               $this->outputIndented( $messagePrefix .
-                                       "Reindexed 
$completed/$totalDocsToReindex documents at $rate/second\n");
-                       }
-               } catch ( \Elastica\Exception\ExceptionInterface $e ) {
-                       // Note that we can't fail the master here, we have to 
check how many documents are in the new index in the master.
-                       $type = get_class( $e );
-                       $message = ElasticsearchIntermediary::extractMessage( 
$e );
-                       wfLogWarning( "Search backend error during reindex.  
Error type is '$type' and message is:  $message" );
-                       die( 1 );
-               }
-       }
-
-       private function getHealth() {
-               while ( true ) {
-                       $indexName = $this->getSpecificIndexName();
-                       $path = "_cluster/health/$indexName";
-                       $response = $this->getClient()->request( $path );
-                       if ( $response->hasError() ) {
-                               $this->error( 'Error fetching index health but 
going to retry.  Message: ' + $response->getError() );
-                               sleep( 1 );
-                               continue;
-                       }
-                       return $response->getData();
-               }
-       }
-
-       private function withRetry( $attempts, $messagePrefix, $description, 
$func ) {
-               $errors = 0;
-               while ( true ) {
-                       if ( $errors < $attempts ) {
-                               try {
-                                       return $func();
-                               } catch ( 
\Elastica\Exception\ExceptionInterface $e ) {
-                                       $errors += 1;
-                                       // Random backoff with lowest possible 
upper bound as 16 seconds.
-                                       // With the default mximum number of 
errors (5) this maxes out at 256 seconds.
-                                       $seconds = rand( 1, pow( 2, 3 + $errors 
) );
-                                       $type = get_class( $e );
-                                       $message = 
ElasticsearchIntermediary::extractMessage( $e );
-                                       $this->outputIndented( $messagePrefix . 
"Caught an error $description.  " .
-                                               "Backing off for $seconds and 
retrying.  Error type is '$type' and message is:  $message\n" );
-                                       sleep( $seconds );
-                               }
-                       } else {
-                               return $func();
-                       }
-               }
-       }
-
-       public function sendDocuments( $messagePrefix, $documents ) {
-               try {
-                       $updateResult = $this->getPageType()->addDocuments( 
$documents );
-               } catch ( \Elastica\Exception\ExceptionInterface $e ) {
-                       $type = get_class( $e );
-                       $message = ElasticsearchIntermediary::extractMessage( 
$e );
-                       $this->outputIndented( $messagePrefix . "Error adding 
documents in bulk.  Retrying as singles.  Error type is '$type' and message is: 
 $message" );
-                       foreach ( $documents as $document ) {
-                               // Continue using the bulk api because we're 
used to it.
-                               $updateResult = 
$this->getPageType()->addDocuments( array( $document ) );
                        }
                }
        }
@@ -816,16 +588,14 @@
        }
 
        /**
-        * @return array
+        * @return MappingConfigBuilder
         */
        protected function getMappingConfig() {
-               $builder = new \CirrusSearch\Maintenance\MappingConfigBuilder(
+               return new MappingConfigBuilder(
                        $this->prefixSearchStartsWithAny,
                        $this->phraseSuggestUseText,
                        $this->optimizeIndexForExperimentalHighlighter
                );
-
-               return $builder->buildConfig();
        }
 
        /**
@@ -940,18 +710,6 @@
 
                // Otherwise its just a raw scalar so we should respect that too
                return $wgCirrusSearchReplicas;
-       }
-
-       private function getMaxReplicaCount() {
-               $replica = explode( '-', $this->getReplicaCount() );
-               return $replica[ count( $replica ) - 1 ];
-       }
-
-       private function decideMaxShardsPerNodeForReindex() {
-               $health = $this->getHealth();
-               $totalNodes = $health[ 'number_of_nodes' ];
-               $totalShards = $this->getShardCount() * ( 
$this->getMaxReplicaCount() + 1 );
-               return ceil( 1.0 * $totalShards / $totalNodes );
        }
 
        private function parsePotentialPercent( $str ) {

-- 
To view, visit https://gerrit.wikimedia.org/r/177222
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib7bea664e47f38854bfc1e43b58f2c4731217261
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/CirrusSearch
Gerrit-Branch: master
Gerrit-Owner: Matthias Mullie <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to