EBernhardson has uploaded a new change for review.
https://gerrit.wikimedia.org/r/247379
Change subject: Split connection to source and target.
......................................................................
Split connection to source and target.
Add script copySearchIndex.php, which copies
an index from one cluster to another.
Bug: T113018
Change-Id: I4c5e9d9787adaaaa7f35e14b1432731bdd615611
(cherry picked from commit 5ce886eb8c21823d01e68d8ebde5e136c0e55607)
---
M CirrusSearch.php
M includes/Job/ElasticaWrite.php
M includes/Maintenance/Maintenance.php
M includes/Maintenance/Reindexer.php
A maintenance/copySearchIndex.php
M maintenance/updateOneSearchIndexConfig.php
6 files changed, 224 insertions(+), 38 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/CirrusSearch
refs/changes/79/247379/1
diff --git a/CirrusSearch.php b/CirrusSearch.php
index 2f6ce92..2b037d5 100644
--- a/CirrusSearch.php
+++ b/CirrusSearch.php
@@ -53,7 +53,8 @@
// defaults to 9200.
//
// All writes will be processed in all configured clusters by the
-// ElasticaWrite job.
+// ElasticaWrite job, unless $wgCirrusSearchWriteClusters is
+// configured (see below).
//
// $wgCirrusSearchClusters = array(
// 'eqiad' => array( 'es01.eqiad.wmnet', 'es02.eqiad.wmnet' ),
@@ -63,6 +64,12 @@
'default' => array( 'localhost' ),
);
+// List of cluster names that the ElasticaWrite job writes to. Every entry
+// must be a key of $wgCirrusSearchClusters.
+// When null (the default), every key of $wgCirrusSearchClusters is used
+// for writing.
+// Example, write only to the eqiad cluster:
+// $wgCirrusSearchWriteClusters = array( 'eqiad' );
+$wgCirrusSearchWriteClusters = null;
+
// How many times to attempt connecting to a given server
// If you're behind LVS and everything looks like one server,
// you may want to reattempt 2 or 3 times.
diff --git a/includes/Job/ElasticaWrite.php b/includes/Job/ElasticaWrite.php
index b3f034f..f10c4cc 100644
--- a/includes/Job/ElasticaWrite.php
+++ b/includes/Job/ElasticaWrite.php
@@ -66,9 +66,16 @@
return array( $name => $this->connection );
}
- $clusters = $config->get( 'CirrusSearchClusters' );
+ if( $config->has( 'CirrusSearchWriteClusters' ) ) {
+ $clusters = $config->get( 'CirrusSearchWriteClusters' );
+ if( is_null( $clusters ) ) {
+ $clusters = array_keys( $config->get(
'CirrusSearchClusters' ) );
+ }
+ } else {
+ $clusters = array_keys( $config->get(
'CirrusSearchClusters' ) );
+ }
$connections = array();
- foreach ( array_keys( $clusters ) as $name ) {
+ foreach ( $clusters as $name ) {
$connections[$name] = Connection::getPool( $config,
$name );
}
return $connections;
diff --git a/includes/Maintenance/Maintenance.php
b/includes/Maintenance/Maintenance.php
index 5658315..67c8ca1 100644
--- a/includes/Maintenance/Maintenance.php
+++ b/includes/Maintenance/Maintenance.php
@@ -38,7 +38,14 @@
$this->addOption( 'cluster', 'Perform all actions on the
specified elasticsearch cluster', false, true );
}
- public function getConnection() {
+ public function getConnection( $cluster = null ) {
+ if( $cluster ) {
+ $config =
ConfigFactory::getDefaultInstance()->makeConfig( 'CirrusSearch' );
+ if (!$config->getElement( 'CirrusSearchClusters',
$cluster ) ) {
+ $this->error( 'Unknown cluster.', 1 );
+ }
+ return Connection::getPool( $config, $cluster );
+ }
if ( $this->connection === null ) {
$config =
ConfigFactory::getDefaultInstance()->makeConfig( 'CirrusSearch' );
$cluster = $this->decideCluster( $config );
diff --git a/includes/Maintenance/Reindexer.php
b/includes/Maintenance/Reindexer.php
index 899446d..124dcb5 100644
--- a/includes/Maintenance/Reindexer.php
+++ b/includes/Maintenance/Reindexer.php
@@ -30,20 +30,29 @@
* http://www.gnu.org/copyleft/gpl.html
*/
class Reindexer {
+ /*** "From" portion ***/
+ /**
+ * @var Index
+ */
+ private $oldIndex;
+
+ /**
+ * @var Connection
+ */
+ private $oldConnection;
+
+ /*** "To" portion ***/
+
/**
* @var Index
*/
private $index;
/**
- * @var \Elastica\Client
+ * @var Connection
*/
- private $client;
+ private $connection;
- /**
- * @var string
- */
- private $specificIndexName;
/**
* @var Type[]
@@ -80,10 +89,6 @@
*/
private $mappingConfig;
- /**
- * @var \ElasticaConnection
- */
- private $connection;
/**
* @var Maintenance
@@ -91,8 +96,6 @@
private $out;
/**
- * @param Index $index
- * @param \ElasticaConnection $connection
* @param Type[] $types
* @param Type[] $oldTypes
* @param int $shardCount
@@ -102,12 +105,10 @@
* @param array $mappingConfig
* @param Maintenance $out
*/
- public function __construct( Index $index, \ElasticaConnection
$connection, array $types, array $oldTypes, $shardCount, $replicaCount,
$connectionTimeout, array $mergeSettings, array $mappingConfig, Maintenance
$out = null ) {
+ public function __construct( Connection $source, Connection $target,
array $types, array $oldTypes, $shardCount, $replicaCount, $connectionTimeout,
array $mergeSettings, array $mappingConfig, Maintenance $out = null ) {
// @todo: this constructor has too many arguments - refactor!
- $this->index = $index;
- $this->client = $this->index->getClient();
- $this->specificIndexName = $this->index->getName();
- $this->connection = $connection;
+ $this->oldConnection = $source;
+ $this->connection = $target;
$this->types = $types;
$this->oldTypes = $oldTypes;
$this->shardCount = $shardCount;
@@ -116,6 +117,12 @@
$this->mergeSettings = $mergeSettings;
$this->mappingConfig = $mappingConfig;
$this->out = $out;
+
+ if ( empty($types) || empty($oldTypes) ) {
+ throw new \Exception( "Types list should be non-empty"
);
+ }
+ $this->index = $types[0]->getIndex();
+ $this->oldIndex = $oldTypes[0]->getIndex();
}
/**
@@ -134,6 +141,7 @@
// optimize after this to consolidate down to a proper number
of shards but that is
// is worth the price. total_shards_per_node will help to make
sure that each shard
// has as few neighbors as possible.
+ $this->setConnectionTimeout();
$settings = $this->index->getSettings();
$maxShardsPerNode = $this->decideMaxShardsPerNodeForReindex();
$settings->set( array(
@@ -143,9 +151,6 @@
'routing.allocation.total_shards_per_node' =>
$maxShardsPerNode,
) );
- $sender = new DataSender( $this->connection );
- $frozenIndexes = $conn->indexToIndexTypes( $this->types );
- $sender->freezeIndexes( $frozenIndexes );
if ( $processes > 1 ) {
if ( !isset( $wgCirrusSearchWikimediaExtraPlugin[
'id_hash_mod_filter' ] ) ||
!$wgCirrusSearchWikimediaExtraPlugin[
'id_hash_mod_filter' ] ) {
@@ -155,9 +160,7 @@
$fork = new ForkController( $processes );
$forkResult = $fork->start();
// we don't want to share sockets between forks, so
destroy the client.
- $this->connection->destroyClient();
- // destroying the client resets the timeout so we have
to reinstate it.
- $this->setConnectionTimeout();
+ $this->destroyClients();
switch ( $forkResult ) {
case 'child':
@@ -201,7 +204,6 @@
'refresh_interval' => $refreshInterval . 's',
'merge.policy' => $this->mergeSettings,
) );
- $sender->thawIndexes( $frozenIndexes );
}
public function optimize() {
@@ -217,8 +219,7 @@
if ( $e->getMessage() === 'Operation timed out' ) {
$this->output( "Timed out...Continuing any
way\n" );
// To continue without blowing up we need to
reset the connection.
- $this->destroySingleton();
- $this->setConnectionTimeout();
+ $this->destroyClients();
} else {
throw $e;
}
@@ -226,6 +227,10 @@
}
public function waitForShards() {
+ if( !$this->replicaCount || $this->replicaCount === "false" ) {
+ $this->outputIndented( "\tNo replicas, skipping.\n" );
+ return;
+ }
$this->outputIndented( "\tWaiting for all shards to start...\n"
);
list( $lower, $upper ) = explode( '-', $this->replicaCount );
$each = 0;
@@ -293,7 +298,7 @@
$operationStartTime = microtime( true );
$completed = 0;
$self = $this;
- Util::iterateOverScroll( $this->index,
$result->getResponse()->getScrollId(), '1h',
+ Util::iterateOverScroll( $this->oldIndex,
$result->getResponse()->getScrollId(), '1h',
function( $results ) use ( $properties,
$retryAttempts, $messagePrefix, $self, $type,
&$completed,
$totalDocsToReindex, $operationStartTime ) {
$documents = array();
@@ -345,11 +350,15 @@
return new Document( $result->getId(), $data );
}
+ /**
+ * Get health information about the index
+ * @return array Response data array
+ */
private function getHealth() {
while ( true ) {
- $indexName = $this->specificIndexName;
+ $indexName = $this->index->getName();
$path = "_cluster/health/$indexName";
- $response = $this->client->request( $path );
+ $response = $this->index->getClient()->request( $path );
if ( $response->hasError() ) {
$this->error( 'Error fetching index health but
going to retry. Message: ' . $response->getError() );
sleep( 1 );
@@ -389,7 +398,6 @@
/**
* @param \Exception $e exception caught
* @param int $errors number of errors
- * @param Maintenance $out
* @param string $messagePrefix
* @param string $description
*/
@@ -403,7 +411,9 @@
}
/**
- * This is really private.
+ * Send documents to type with retry.
+ * This is really private, marked as public for closure use.
+ * @access private
*/
public function sendDocuments( Type $type, $messagePrefix, $documents )
{
try {
@@ -419,12 +429,22 @@
}
}
+ /**
+ * Apply the configured connection timeout to both the target
+ * ($this->connection) and the source ($this->oldConnection) connections.
+ */
 private function setConnectionTimeout() {
 $this->connection->setTimeout( $this->connectionTimeout );
+ $this->oldConnection->setTimeout( $this->connectionTimeout );
 }
- private function destroySingleton() {
+ /**
+ * Destroy the clients of both the target and the source connections,
+ * then reinstate the connection timeouts on both.
+ */
+ private function destroyClients() {
 $this->connection->destroyClient();
+ $this->oldConnection->destroyClient();
+ // Destroying connections resets timeouts, so we have to reinstate them
+ $this->setConnectionTimeout();
 }
/**
diff --git a/maintenance/copySearchIndex.php b/maintenance/copySearchIndex.php
new file mode 100644
index 0000000..8f118c2
--- /dev/null
+++ b/maintenance/copySearchIndex.php
@@ -0,0 +1,143 @@
+<?php
+
+namespace CirrusSearch\Maintenance;
+
+use CirrusSearch\Connection;
+use CirrusSearch\Util;
+use ConfigFactory;
+use Elastica;
+use CirrusSearch\ClusterSettings;
+
+/**
+ * Copy search index from one cluster to another.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ */
+
+// Locate the MediaWiki installation: honour MW_INSTALL_PATH when set,
+// otherwise assume this file lives in $IP/extensions/CirrusSearch/maintenance.
+$IP = getenv( 'MW_INSTALL_PATH' );
+if( $IP === false ) {
+ $IP = __DIR__ . '/../../..';
+}
+// Load core's maintenance framework, then CirrusSearch's Maintenance base class.
+require_once( "$IP/maintenance/Maintenance.php" );
+require_once( __DIR__ . '/../includes/Maintenance/Maintenance.php' );
+
+/**
+ * Copy a search index from one elasticsearch cluster to another.
+ *
+ * Documents are read from the index on the source cluster (the cluster this
+ * script connects to by default, see the --cluster option) and written into
+ * the index with the same base name and type on --targetCluster. The target
+ * index must already exist; this script only copies documents into it.
+ */
+class CopySearchIndex extends Maintenance {
+	/**
+	 * @var string Index type to copy, as given by --indexType
+	 */
+	private $indexType;
+
+	/**
+	 * @var string Index base name, from --baseName (defaults to the wiki id)
+	 */
+	private $indexBaseName;
+
+	/**
+	 * @var int
+	 */
+	protected $refreshInterval;
+
+	public function __construct() {
+		parent::__construct();
+		$this->addDescription( "Copy index from one cluster to another.\nThe index name and index type should be the same on both clusters." );
+		$this->addOption( 'indexType', 'Source index. Either content or general.', true, true );
+		$this->addOption( 'targetCluster', 'Target Cluster.', true, true );
+		// Both options below are read in execute(); register them so they are
+		// accepted on the command line and listed in --help.
+		$this->addOption( 'baseName', 'What basename to use for the index, defaults to wiki id.', false, true );
+		$this->addOption( 'indexIdentifier', 'Identifier of the target index to copy into. ' .
+			'Defaults to "current".', false, true );
+		$this->addOption( 'reindexChunkSize', 'Documents per shard to reindex in a batch. ' .
+			'Note when changing the number of shards that the old shard size is used, not the new ' .
+			'one. If you see many errors submitting documents in bulk but the automatic retry as ' .
+			'singles works then lower this number. Defaults to 100.', false, true );
+		$this->addOption( 'reindexRetryAttempts', 'Number of times to back off and retry ' .
+			'per failure. Note that failures are not common but if Elasticsearch is in the process ' .
+			'of moving a shard this can time out. This will retry the attempt after some backoff ' .
+			'rather than failing the whole reindex process. Defaults to 5.', false, true );
+	}
+
+	/**
+	 * Copy all page documents from the source cluster's index into the
+	 * matching index on the target cluster, then optimize the target and wait
+	 * for its shards. Exits with an error when source and target are the same
+	 * cluster or when the target index does not exist.
+	 */
+	public function execute() {
+		global $wgCirrusSearchMaintenanceTimeout;
+
+		$this->indexType = $this->getOption( 'indexType' );
+		$this->indexBaseName = $this->getOption( 'baseName', wfWikiId() );
+
+		$reindexChunkSize = $this->getOption( 'reindexChunkSize', 100 );
+		$reindexRetryAttempts = $this->getOption( 'reindexRetryAttempts', 5 );
+		$targetCluster = $this->getOption( 'targetCluster' );
+
+		$sourceConnection = $this->getConnection();
+		$targetConnection = $this->getConnection( $targetCluster );
+
+		if ( $sourceConnection->getClusterName() == $targetConnection->getClusterName() ) {
+			$this->error( "Target cluster should be different from current cluster.", 1 );
+		}
+		$config = ConfigFactory::getDefaultInstance()->makeConfig( 'CirrusSearch' );
+		// Shard and replica counts are taken from the target cluster's settings.
+		$clusterSettings = new ClusterSettings( $config, $targetConnection->getClusterName() );
+
+		$targetIndexName = $targetConnection->getIndexName( $this->indexBaseName, $this->indexType );
+		$utils = new ConfigUtils( $targetConnection->getClient(), $this );
+		$indexIdentifier = $utils->pickIndexIdentifierFromOption(
+			$this->getOption( 'indexIdentifier', 'current' ), $targetIndexName );
+
+		$targetIndex = $targetConnection->getIndex( $this->indexBaseName, $this->indexType, $indexIdentifier );
+		// The Reindexer rewrites the target index settings before copying, so
+		// fail early with a clear message instead of an elasticsearch error.
+		if ( !$targetIndex->exists() ) {
+			$this->error( "Target index " . $targetIndex->getName() . " does not exist on cluster " .
+				$targetConnection->getClusterName() . ".", 1 );
+		}
+
+		$reindexer = new Reindexer(
+			$sourceConnection,
+			$targetConnection,
+			// Target types
+			array( $targetIndex->getType( Connection::PAGE_TYPE_NAME ) ),
+			// Source types
+			array( $sourceConnection->getPageType( $this->indexBaseName, $this->indexType ) ),
+			$clusterSettings->getShardCount( $this->indexType ),
+			$clusterSettings->getReplicaCount( $this->indexType ),
+			$wgCirrusSearchMaintenanceTimeout,
+			$this->getMergeSettings(),
+			$this->getMappingConfig(),
+			$this
+		);
+		// Copy with a single process; see Reindexer::reindex() for the argument order.
+		$reindexer->reindex( 1, 1, $reindexRetryAttempts, $reindexChunkSize );
+		$reindexer->optimize();
+		$reindexer->waitForShards();
+	}
+
+	/**
+	 * Get the merge settings for this index type, falling back to the
+	 * settings of the content index when none are configured for it.
+	 *
+	 * @return array
+	 */
+	private function getMergeSettings() {
+		global $wgCirrusSearchMergeSettings;
+
+		if ( isset( $wgCirrusSearchMergeSettings[ $this->indexType ] ) ) {
+			return $wgCirrusSearchMergeSettings[ $this->indexType ];
+		}
+		// If there aren't configured merge settings for this index type default to the content type.
+		return $wgCirrusSearchMergeSettings[ 'content' ];
+	}
+
+	/**
+	 * Build the mapping configuration from the global CirrusSearch flags.
+	 *
+	 * @return array
+	 */
+	protected function getMappingConfig() {
+		global $wgCirrusSearchPrefixSearchStartsWithAnyWord, $wgCirrusSearchPhraseSuggestUseText,
+			$wgCirrusSearchOptimizeIndexForExperimentalHighlighter;
+
+		$builder = new MappingConfigBuilder( $wgCirrusSearchOptimizeIndexForExperimentalHighlighter );
+		$configFlags = 0;
+		if ( $wgCirrusSearchPrefixSearchStartsWithAnyWord ) {
+			$configFlags |= MappingConfigBuilder::PREFIX_START_WITH_ANY;
+		}
+		if ( $wgCirrusSearchPhraseSuggestUseText ) {
+			$configFlags |= MappingConfigBuilder::PHRASE_SUGGEST_USE_TEXT;
+		}
+		return $builder->buildConfig( $configFlags );
+	}
+
+}
+
+// Name the class for MediaWiki's maintenance runner to execute when this
+// file is invoked directly from the command line.
+$maintClass = 'CirrusSearch\Maintenance\CopySearchIndex';
+require_once RUN_MAINTENANCE_IF_MAIN;
diff --git a/maintenance/updateOneSearchIndexConfig.php
b/maintenance/updateOneSearchIndexConfig.php
index eab8ea5..0b476dd 100644
--- a/maintenance/updateOneSearchIndexConfig.php
+++ b/maintenance/updateOneSearchIndexConfig.php
@@ -351,9 +351,11 @@
private function validateSpecificAlias() {
global $wgCirrusSearchMaintenanceTimeout;
+ $connection = $this->getConnection();
+
$reindexer = new Reindexer(
- $this->getIndex(),
- $this->getConnection(),
+ $connection,
+ $connection,
array( $this->getPageType() ),
array( $this->getOldPageType() ),
$this->getShardCount(),
--
To view, visit https://gerrit.wikimedia.org/r/247379
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I4c5e9d9787adaaaa7f35e14b1432731bdd615611
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/CirrusSearch
Gerrit-Branch: wmf/1.27.0-wmf.2
Gerrit-Owner: EBernhardson <[email protected]>
Gerrit-Reviewer: Smalyshev <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits