DCausse has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/246243

Change subject: Split connection to source and target.
......................................................................

Split connection to source and target.

Add script copySearchIndex.php, which allows copying
an index from one cluster to another.

Bug: T113018
Change-Id: I4c5e9d9787adaaaa7f35e14b1432731bdd615611
(cherry picked from commit 5ce886eb8c21823d01e68d8ebde5e136c0e55607)
---
M CirrusSearch.php
M includes/Job/ElasticaWrite.php
M includes/Maintenance/Maintenance.php
M includes/Maintenance/Reindexer.php
A maintenance/copySearchIndex.php
M maintenance/updateOneSearchIndexConfig.php
6 files changed, 224 insertions(+), 38 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/CirrusSearch 
refs/changes/43/246243/1

diff --git a/CirrusSearch.php b/CirrusSearch.php
index 28beb6b..bdecda5 100644
--- a/CirrusSearch.php
+++ b/CirrusSearch.php
@@ -53,7 +53,8 @@
 // defaults to 9200.
 //
 // All writes will be processed in all configured clusters by the
-// ElasticaWrite job.
+// ElasticaWrite job, unless $wgCirrusSearchWriteClusters is
+// configured (see below).
 //
 // $wgCirrusSearchClusters = array(
 //     'eqiad' => array( 'es01.eqiad.wmnet', 'es02.eqiad.wmnet' ),
@@ -63,6 +64,12 @@
        'default' => array( 'localhost' ),
 );
 
+// List of clusters that can be used for writing. Must be a subset of keys
+// from $wgCirrusSearchClusters.
+// By default or when set to null, all keys of $wgCirrusSearchClusters are
+// available for writing.
+$wgCirrusSearchWriteClusters = null;
+
 // How many times to attempt connecting to a given server
 // If you're behind LVS and everything looks like one server,
 // you may want to reattempt 2 or 3 times.
diff --git a/includes/Job/ElasticaWrite.php b/includes/Job/ElasticaWrite.php
index caaeceb..7fd55d0 100644
--- a/includes/Job/ElasticaWrite.php
+++ b/includes/Job/ElasticaWrite.php
@@ -49,9 +49,16 @@
                        return array( $name => $this->connection );
                }
 
-               $clusters = $config->get( 'CirrusSearchClusters' );
+               if( $config->has( 'CirrusSearchWriteClusters' ) ) {
+                       $clusters = $config->get( 'CirrusSearchWriteClusters' );
+                       if( is_null( $clusters ) ) {
+                               $clusters = array_keys( $config->get( 
'CirrusSearchClusters' ) );
+                       }
+               } else {
+                       $clusters = array_keys( $config->get( 
'CirrusSearchClusters' ) );
+               }
                $connections = array();
-               foreach ( array_keys( $clusters ) as $name ) {
+               foreach ( $clusters as $name ) {
                        $connections[$name] = Connection::getPool( $config, 
$name );
                }
                return $connections;
diff --git a/includes/Maintenance/Maintenance.php 
b/includes/Maintenance/Maintenance.php
index 5658315..67c8ca1 100644
--- a/includes/Maintenance/Maintenance.php
+++ b/includes/Maintenance/Maintenance.php
@@ -38,7 +38,14 @@
                $this->addOption( 'cluster', 'Perform all actions on the 
specified elasticsearch cluster', false, true );
        }
 
-       public function getConnection() {
+       public function getConnection( $cluster = null ) {
+               if( $cluster ) {
+                       $config = 
ConfigFactory::getDefaultInstance()->makeConfig( 'CirrusSearch' );
+                       if (!$config->getElement( 'CirrusSearchClusters', 
$cluster ) ) {
+                               $this->error( 'Unknown cluster.', 1 );
+                       }
+                       return Connection::getPool( $config, $cluster );
+               }
                if ( $this->connection === null ) {
                        $config = 
ConfigFactory::getDefaultInstance()->makeConfig( 'CirrusSearch' );
                        $cluster = $this->decideCluster( $config );
diff --git a/includes/Maintenance/Reindexer.php 
b/includes/Maintenance/Reindexer.php
index c1114bc..124dcb5 100644
--- a/includes/Maintenance/Reindexer.php
+++ b/includes/Maintenance/Reindexer.php
@@ -30,20 +30,29 @@
  * http://www.gnu.org/copyleft/gpl.html
  */
 class Reindexer {
+       /*** "From" portion ***/
+       /**
+        * @var Index
+        */
+       private $oldIndex;
+
+       /**
+        * @var Connection
+        */
+       private $oldConnection;
+
+       /*** "To" portion ***/
+
        /**
         * @var Index
         */
        private $index;
 
        /**
-        * @var \Elastica\Client
+        * @var Connection
         */
-       private $client;
+       private $connection;
 
-       /**
-        * @var string
-        */
-       private $specificIndexName;
 
        /**
         * @var Type[]
@@ -80,10 +89,6 @@
         */
        private $mappingConfig;
 
-       /**
-        * @var \ElasticaConnection
-        */
-       private $connection;
 
        /**
         * @var Maintenance
@@ -91,8 +96,6 @@
        private $out;
 
        /**
-        * @param Index $index
-        * @param \ElasticaConnection $connection
         * @param Type[] $types
         * @param Type[] $oldTypes
         * @param int $shardCount
@@ -102,12 +105,10 @@
         * @param array $mappingConfig
         * @param Maintenance $out
         */
-       public function __construct( Index $index, \ElasticaConnection 
$connection, array $types, array $oldTypes, $shardCount, $replicaCount, 
$connectionTimeout, array $mergeSettings, array $mappingConfig, Maintenance 
$out = null ) {
+       public function __construct( Connection $source, Connection $target, 
array $types, array $oldTypes, $shardCount, $replicaCount, $connectionTimeout, 
array $mergeSettings, array $mappingConfig, Maintenance $out = null ) {
                // @todo: this constructor has too many arguments - refactor!
-               $this->index = $index;
-               $this->client = $this->index->getClient();
-               $this->specificIndexName = $this->index->getName();
-               $this->connection = $connection;
+               $this->oldConnection = $source;
+               $this->connection = $target;
                $this->types = $types;
                $this->oldTypes = $oldTypes;
                $this->shardCount = $shardCount;
@@ -116,6 +117,12 @@
                $this->mergeSettings = $mergeSettings;
                $this->mappingConfig = $mappingConfig;
                $this->out = $out;
+
+               if ( empty($types) || empty($oldTypes) ) {
+                       throw new \Exception( "Types list should be non-empty" 
);
+               }
+               $this->index = $types[0]->getIndex();
+               $this->oldIndex = $oldTypes[0]->getIndex();
        }
 
        /**
@@ -134,6 +141,7 @@
                // optimize after this to consolidate down to a proper number 
of shards but that is
                // is worth the price.  total_shards_per_node will help to make 
sure that each shard
                // has as few neighbors as possible.
+               $this->setConnectionTimeout();
                $settings = $this->index->getSettings();
                $maxShardsPerNode = $this->decideMaxShardsPerNodeForReindex();
                $settings->set( array(
@@ -143,9 +151,6 @@
                        'routing.allocation.total_shards_per_node' => 
$maxShardsPerNode,
                ) );
 
-               $sender = new DataSender( $this->connection );
-               $frozenIndexes = $this->connection->indexToIndexTypes( 
$this->types );
-               $sender->freezeIndexes( $frozenIndexes );
                if ( $processes > 1 ) {
                        if ( !isset( $wgCirrusSearchWikimediaExtraPlugin[ 
'id_hash_mod_filter' ] ) ||
                                        !$wgCirrusSearchWikimediaExtraPlugin[ 
'id_hash_mod_filter' ] ) {
@@ -155,9 +160,7 @@
                        $fork = new ForkController( $processes );
                        $forkResult = $fork->start();
                        // we don't want to share sockets between forks, so 
destroy the client.
-                       $this->connection->destroyClient();
-                       // destroying the client resets the timeout so we have 
to reinstate it.
-                       $this->setConnectionTimeout();
+                       $this->destroyClients();
 
                        switch ( $forkResult ) {
                                case 'child':
@@ -201,7 +204,6 @@
                        'refresh_interval' => $refreshInterval . 's',
                        'merge.policy' => $this->mergeSettings,
                ) );
-               $sender->thawIndexes( $frozenIndexes );
        }
 
        public function optimize() {
@@ -217,8 +219,7 @@
                        if ( $e->getMessage() === 'Operation timed out' ) {
                                $this->output( "Timed out...Continuing any 
way\n" );
                                // To continue without blowing up we need to 
reset the connection.
-                               $this->destroySingleton();
-                               $this->setConnectionTimeout();
+                               $this->destroyClients();
                        } else {
                                throw $e;
                        }
@@ -226,6 +227,10 @@
        }
 
        public function waitForShards() {
+               if( !$this->replicaCount || $this->replicaCount === "false" ) {
+                       $this->outputIndented( "\tNo replicas, skipping.\n" );
+                       return;
+               }
                $this->outputIndented( "\tWaiting for all shards to start...\n" 
);
                list( $lower, $upper ) = explode( '-', $this->replicaCount );
                $each = 0;
@@ -293,7 +298,7 @@
                        $operationStartTime = microtime( true );
                        $completed = 0;
                        $self = $this;
-                       Util::iterateOverScroll( $this->index, 
$result->getResponse()->getScrollId(), '1h',
+                       Util::iterateOverScroll( $this->oldIndex, 
$result->getResponse()->getScrollId(), '1h',
                                function( $results ) use ( $properties, 
$retryAttempts, $messagePrefix, $self, $type,
                                                &$completed, 
$totalDocsToReindex, $operationStartTime ) {
                                        $documents = array();
@@ -345,11 +350,15 @@
                return new Document( $result->getId(), $data );
        }
 
+       /**
+        * Get health information about the index
+        * @return array Response data array
+        */
        private function getHealth() {
                while ( true ) {
-                       $indexName = $this->specificIndexName;
+                       $indexName = $this->index->getName();
                        $path = "_cluster/health/$indexName";
-                       $response = $this->client->request( $path );
+                       $response = $this->index->getClient()->request( $path );
                        if ( $response->hasError() ) {
                                $this->error( 'Error fetching index health but 
going to retry.  Message: ' . $response->getError() );
                                sleep( 1 );
@@ -389,7 +398,6 @@
        /**
         * @param \Exception $e exception caught
         * @param int $errors number of errors
-        * @param Maintenance $out
         * @param string $messagePrefix
         * @param string $description
         */
@@ -403,7 +411,9 @@
        }
 
        /**
-        * This is really private.
+        * Send documents to type with retry.
+        * This is really private, marked as public for closure use.
+        * @access private
         */
        public function sendDocuments( Type $type, $messagePrefix, $documents ) 
{
                try {
@@ -419,12 +429,22 @@
                }
        }
 
+       /**
+        * Reset connection timeouts
+        */
        private function setConnectionTimeout() {
                $this->connection->setTimeout( $this->connectionTimeout );
+               $this->oldConnection->setTimeout( $this->connectionTimeout );
        }
 
-       private function destroySingleton() {
+       /**
+        * Destroy client connections
+        */
+       private function destroyClients() {
                $this->connection->destroyClient();
+               $this->oldConnection->destroyClient();
+               // Destroying connections resets timeouts, so we have to 
reinstate them
+               $this->setConnectionTimeout();
        }
 
        /**
diff --git a/maintenance/copySearchIndex.php b/maintenance/copySearchIndex.php
new file mode 100644
index 0000000..8f118c2
--- /dev/null
+++ b/maintenance/copySearchIndex.php
@@ -0,0 +1,143 @@
+<?php
+
+namespace CirrusSearch\Maintenance;
+
+use CirrusSearch\Connection;
+use CirrusSearch\Util;
+use ConfigFactory;
+use Elastica;
+use CirrusSearch\ClusterSettings;
+
+/**
+ * Copy search index from one cluster to another.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ */
+
+$IP = getenv( 'MW_INSTALL_PATH' );
+if( $IP === false ) {
+       $IP = __DIR__ . '/../../..';
+}
+require_once( "$IP/maintenance/Maintenance.php" );
+require_once( __DIR__ . '/../includes/Maintenance/Maintenance.php' );
+
+/**
+ * Copy a search index from one cluster to another.
+ */
+class CopySearchIndex extends Maintenance {
+       private $indexType;
+
+       private $indexBaseName;
+       /**
+        * @var int
+        */
+       protected $refreshInterval;
+
+       public function __construct() {
+               parent::__construct();
+               $this->addDescription( "Copy index from one cluster to 
another.\nThe index name and index type should be the same on both clusters." );
+               $this->addOption( 'indexType', 'Source index.  Either content 
or general.', true, true );
+               $this->addOption( 'targetCluster', 'Target Cluster.', true, 
true );
+               $this->addOption( 'reindexChunkSize', 'Documents per shard to 
reindex in a batch.   ' .
+                   'Note when changing the number of shards that the old shard 
size is used, not the new ' .
+                   'one.  If you see many errors submitting documents in bulk 
but the automatic retry as ' .
+                   'singles works then lower this number.  Defaults to 100.', 
false, true );
+               $this->addOption( 'reindexRetryAttempts', 'Number of times to 
back off and retry ' .
+                       'per failure.  Note that failures are not common but if 
Elasticsearch is in the process ' .
+                       'of moving a shard this can time out.  This will retry 
the attempt after some backoff ' .
+                       'rather than failing the whole reindex process.  
Defaults to 5.', false, true );
+       }
+
+       public function execute() {
+               global $wgCirrusSearchMaintenanceTimeout;
+
+               $this->indexType = $this->getOption( 'indexType' );
+               $this->indexBaseName = $this->getOption( 'baseName', wfWikiId() 
);
+
+               $reindexChunkSize = $this->getOption( 'reindexChunkSize', 100 );
+               $reindexRetryAttempts = $this->getOption( 
'reindexRetryAttempts', 5 );
+               $targetCluster = $this->getOption( 'targetCluster' );
+
+               $sourceConnection = $this->getConnection();
+               $targetConnection = $this->getConnection( $targetCluster );
+
+               if ( $sourceConnection->getClusterName() == 
$targetConnection->getClusterName() ) {
+                       $this->error("Target cluster should be different from 
current cluster.", 1);
+               }
+               $config = ConfigFactory::getDefaultInstance()->makeConfig( 
'CirrusSearch' );
+               $clusterSettings = new ClusterSettings( $config, 
$targetConnection->getClusterName() );
+
+               $targetIndexName = $targetConnection->getIndexName( 
$this->indexBaseName, $this->indexType );
+               $utils = new ConfigUtils( $targetConnection->getClient(), $this 
);
+               $indexIdentifier = $utils->pickIndexIdentifierFromOption( 
$this->getOption( 'indexIdentifier', 'current' ),
+                                $targetIndexName );
+
+               $reindexer = new Reindexer(
+                               $sourceConnection,
+                               $targetConnection,
+                               // Target Index
+                               array( $targetConnection->getIndex( 
$this->indexBaseName, $this->indexType,
+                                               $indexIdentifier )->getType( 
Connection::PAGE_TYPE_NAME )
+                               ),
+                               // Source Index
+                               array( $this->getConnection()->getPageType( 
$this->indexBaseName, $this->indexType ) ),
+                               $clusterSettings->getShardCount( 
$this->indexType ),
+                               $clusterSettings->getReplicaCount( 
$this->indexType ),
+                               $wgCirrusSearchMaintenanceTimeout,
+                               $this->getMergeSettings(),
+                               $this->getMappingConfig(),
+                               $this
+               );
+               $reindexer->reindex( 1, 1, $reindexRetryAttempts, 
$reindexChunkSize);
+               $reindexer->optimize();
+               $reindexer->waitForShards();
+       }
+
+       /**
+        * Get the merge settings for this index.
+        */
+       private function getMergeSettings() {
+               global $wgCirrusSearchMergeSettings;
+
+               if ( isset( $wgCirrusSearchMergeSettings[ $this->indexType ] ) 
) {
+                       return $wgCirrusSearchMergeSettings[ $this->indexType ];
+               }
+               // If there aren't configured merge settings for this index 
type default to the content type.
+               return $wgCirrusSearchMergeSettings[ 'content' ];
+       }
+
+       /**
+        * @return array
+        */
+       protected function getMappingConfig() {
+               global $wgCirrusSearchPrefixSearchStartsWithAnyWord, 
$wgCirrusSearchPhraseSuggestUseText,
+               $wgCirrusSearchOptimizeIndexForExperimentalHighlighter;
+
+               $builder = new MappingConfigBuilder( 
$wgCirrusSearchOptimizeIndexForExperimentalHighlighter );
+               $configFlags = 0;
+               if ( $wgCirrusSearchPrefixSearchStartsWithAnyWord ) {
+                       $configFlags |= 
MappingConfigBuilder::PREFIX_START_WITH_ANY;
+               }
+               if ( $wgCirrusSearchPhraseSuggestUseText ) {
+                       $configFlags |= 
MappingConfigBuilder::PHRASE_SUGGEST_USE_TEXT;
+               }
+               return $builder->buildConfig( $configFlags );
+       }
+
+}
+
+$maintClass = 'CirrusSearch\Maintenance\CopySearchIndex';
+require_once RUN_MAINTENANCE_IF_MAIN;
diff --git a/maintenance/updateOneSearchIndexConfig.php 
b/maintenance/updateOneSearchIndexConfig.php
index eab8ea5..0b476dd 100644
--- a/maintenance/updateOneSearchIndexConfig.php
+++ b/maintenance/updateOneSearchIndexConfig.php
@@ -351,9 +351,11 @@
        private function validateSpecificAlias() {
                global $wgCirrusSearchMaintenanceTimeout;
 
+               $connection = $this->getConnection();
+
                $reindexer = new Reindexer(
-                       $this->getIndex(),
-                       $this->getConnection(),
+                       $connection,
+                       $connection,
                        array( $this->getPageType() ),
                        array( $this->getOldPageType() ),
                        $this->getShardCount(),

-- 
To view, visit https://gerrit.wikimedia.org/r/246243
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I4c5e9d9787adaaaa7f35e14b1432731bdd615611
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/CirrusSearch
Gerrit-Branch: wmf/1.27.0-wmf.3
Gerrit-Owner: DCausse <[email protected]>
Gerrit-Reviewer: Smalyshev <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to