Aaron Schulz has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/394430 )

Change subject: [WIP] Rewrite LoadMonitor and related LoadBalancer logic
......................................................................

[WIP] Rewrite LoadMonitor and related LoadBalancer logic

* Use WANObjectCache::getWithSetCallback()
* Increase the server state polling interval but add a
  mutex on shared cache updates via "lockTSE" as a throttle
* Increase the moving average factor to make the weight
  scale values more responsive to problems
* Account for DBError exceptions from getLag()
* Add pingFailure() method to quickly react to failing
  servers rather than just waiting on the server state polls
* Add backtraces to replication wait timeouts

Bug: T180918
Change-Id: I8b1c7040051d9761516a1e7c4eecd4d4616f252c
---
M includes/libs/rdbms/database/Database.php
M includes/libs/rdbms/database/IDatabase.php
M includes/libs/rdbms/loadbalancer/LoadBalancer.php
M includes/libs/rdbms/loadmonitor/ILoadMonitor.php
M includes/libs/rdbms/loadmonitor/LoadMonitor.php
M includes/libs/rdbms/loadmonitor/LoadMonitorMySQL.php
6 files changed, 369 insertions(+), 92 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/core 
refs/changes/30/394430/1

diff --git a/includes/libs/rdbms/database/Database.php 
b/includes/libs/rdbms/database/Database.php
index e04566e..1d11f4d 100644
--- a/includes/libs/rdbms/database/Database.php
+++ b/includes/libs/rdbms/database/Database.php
@@ -3278,6 +3278,7 @@
         *   - lag: highest lag of any of the DBs or false on error (e.g. 
replication stopped)
         *   - since: oldest UNIX timestamp of any of the DB lag estimates
         *   - pending: whether any of the DBs have uncommitted changes
+        * @throws DBError
         * @since 1.27
         */
        public static function getCacheSetOptions( IDatabase $db1 ) {
diff --git a/includes/libs/rdbms/database/IDatabase.php 
b/includes/libs/rdbms/database/IDatabase.php
index bbf88dc..e69a7d7 100644
--- a/includes/libs/rdbms/database/IDatabase.php
+++ b/includes/libs/rdbms/database/IDatabase.php
@@ -1690,6 +1690,7 @@
         * instead.
         *
         * @return int|bool Database replication lag in seconds or false on 
error
+        * @throws DBError
         */
        public function getLag();
 
@@ -1704,6 +1705,7 @@
         * indication of the staleness of subsequent reads.
         *
         * @return array ('lag': seconds or false on error, 'since': UNIX 
timestamp of BEGIN)
+        * @throws DBError
         * @since 1.27
         */
        public function getSessionLagStatus();
diff --git a/includes/libs/rdbms/loadbalancer/LoadBalancer.php 
b/includes/libs/rdbms/loadbalancer/LoadBalancer.php
index a67e6e9..65c0494 100644
--- a/includes/libs/rdbms/loadbalancer/LoadBalancer.php
+++ b/includes/libs/rdbms/loadbalancer/LoadBalancer.php
@@ -123,6 +123,10 @@
 
        /** @var int Default 'maxLag' when unspecified */
        const MAX_LAG_DEFAULT = 10;
+       /** @var int Default 'waitTimeout' when unspecified */
+       const MAX_WAIT_DEFAULT = 10;
+       /** @var float Avoid masterPosWait() checks on a server if they fail at 
this rate */
+       const MAX_WAIT_FAILURE_RATE = 10;
        /** @var int Seconds to cache master server read-only status */
        const TTL_CACHE_READONLY = 5;
 
@@ -159,7 +163,9 @@
                        $this->localDomainIdAlias = 
$this->localDomain->getDatabase();
                }
 
-               $this->mWaitTimeout = isset( $params['waitTimeout'] ) ? 
$params['waitTimeout'] : 10;
+               $this->mWaitTimeout = isset( $params['waitTimeout'] )
+                       ? $params['waitTimeout']
+                       : self::MAX_WAIT_DEFAULT;
 
                $this->mReadIndex = -1;
                $this->mConns = [
@@ -346,7 +352,8 @@
                        $loads = $this->mLoads;
                }
 
-               // Scale the configured load ratios according to each server's 
load and state
+               // Scale the configured load ratios according to each server's 
load and state.
+               // This reduces traffic to servers that are unavailable.
                $this->getLoadMonitor()->scaleLoads( $loads, $domain );
 
                // Pick a server to use, accounting for weights, load, lag, and 
mWaitForPos
@@ -363,6 +370,8 @@
                        // so update laggedReplicaMode as needed for 
consistency.
                        if ( !$this->doWait( $i ) ) {
                                $laggedReplicaMode = true;
+                               // Let LoadMonitor know that a wait for the full 
default timeout failed
+                               $this->loadMonitor->pingFailure( $i, 
self::DOMAIN_ANY, LoadMonitor::TYPE_POS_SYNC );
                        }
                }
 
@@ -382,7 +391,7 @@
        }
 
        /**
-        * @param array $loads List of server weights
+        * @param array $loads List of server weights already scaled by 
scaleLoads()
         * @param string|bool $domain
         * @return array (reader index, lagged replica mode) or false on failure
         */
@@ -418,6 +427,7 @@
                                if ( $i === false && count( $currentLoads ) != 
0 ) {
                                        // All replica DBs lagged. Switch to 
read-only mode
                                        $this->replLogger->error( "All replica 
DBs lagged. Switch to read-only mode" );
+                                       // Pick a server based only on the 
scaled weights provided
                                        $i = ArrayUtils::pickRandom( 
$currentLoads );
                                        $laggedReplicaMode = true;
                                }
@@ -580,6 +590,16 @@
                        return true;
                }
 
+               // Short-circuit if this is likely to just fail and waste time
+               $failureRate = $this->loadMonitor->getSyncFailureRate( $index, 
self::DOMAIN_ANY );
+               if ( $failureRate > self::MAX_WAIT_FAILURE_RATE ) {
+                       $this->replLogger->warning( __METHOD__ .
+                               ': To many failures to wait for replica DB 
{dbserver} to catch up...',
+                               [ 'dbserver' => $server ] );
+
+                       return false;
+               }
+
                // Find a connection to wait on, creating one if needed and 
allowed
                $conn = $this->getAnyOpenConnection( $index );
                if ( !$conn ) {
@@ -602,16 +622,31 @@
                        }
                }
 
-               $this->replLogger->info( __METHOD__ . ': Waiting for replica DB 
{dbserver} to catch up...',
+               $this->replLogger->info( __METHOD__ .
+                       ': Waiting for replica DB {dbserver} to catch up...',
                        [ 'dbserver' => $server ] );
+
                $timeout = $timeout ?: $this->mWaitTimeout;
                $result = $conn->masterPosWait( $this->mWaitForPos, $timeout );
 
-               if ( $result == -1 || is_null( $result ) ) {
-                       // Timed out waiting for replica DB, use master instead
+               if ( $result === null ) {
+                       $this->replLogger->warning(
+                               __METHOD__ . ': Errored out waiting on {host} 
pos {pos}',
+                               [
+                                       'host' => $server,
+                                       'pos' => $this->mWaitForPos,
+                                       'trace' => ( new RuntimeException() 
)->getTraceAsString()
+                               ]
+                       );
+                       $ok = false;
+               } elseif ( $result == -1 ) {
                        $this->replLogger->warning(
                                __METHOD__ . ': Timed out waiting on {host} pos 
{pos}',
-                               [ 'host' => $server, 'pos' => 
$this->mWaitForPos ]
+                               [
+                                       'host' => $server,
+                                       'pos' => $this->mWaitForPos,
+                                       'trace' => ( new RuntimeException() 
)->getTraceAsString()
+                               ]
                        );
                        $ok = false;
                } else {
@@ -991,6 +1026,8 @@
                // application calls LoadBalancer::commitMasterChanges() before 
the PHP script completes.
                $server['flags'] = isset( $server['flags'] ) ? $server['flags'] 
: IDatabase::DBO_DEFAULT;
 
+               $i = $server['serverIndex'];
+
                // Create a live connection object
                try {
                        $db = Database::factory( $server['type'], $server );
@@ -998,6 +1035,8 @@
                        // FIXME: This is probably the ugliest thing I have 
ever done to
                        // PHP. I'm half-expecting it to segfault, just out of 
disgust. -- TS
                        $db = $e->db;
+                       // Make LoadMonitor aware of each time a connection 
attempt failed
+                       $this->loadMonitor->pingFailure( $i, self::DOMAIN_ANY, 
LoadMonitor::TYPE_CONNECTION );
                }
 
                $db->setLBInfo( $server );
@@ -1006,7 +1045,7 @@
                );
                $db->setTableAliases( $this->tableAliases );
 
-               if ( $server['serverIndex'] === $this->getWriterIndex() ) {
+               if ( $i === $this->getWriterIndex() ) {
                        if ( $this->trxRoundId !== false ) {
                                $this->applyTransactionRoundFlags( $db );
                        }
@@ -1648,8 +1687,11 @@
                        $result = $conn->masterPosWait( $pos, $timeout );
                        if ( $result == -1 || is_null( $result ) ) {
                                $msg = __METHOD__ . ': Timed out waiting on 
{host} pos {pos}';
-                               $this->replLogger->warning( $msg,
-                                       [ 'host' => $conn->getServer(), 'pos' 
=> $pos ] );
+                               $this->replLogger->warning( $msg, [
+                                       'host' => $conn->getServer(),
+                                       'pos' => $pos,
+                                       'trace' => ( new RuntimeException() 
)->getTraceAsString()
+                               ] );
                                $ok = false;
                        } else {
                                $this->replLogger->info( __METHOD__ . ': Done' 
);
diff --git a/includes/libs/rdbms/loadmonitor/ILoadMonitor.php 
b/includes/libs/rdbms/loadmonitor/ILoadMonitor.php
index a0877a4..2e119d3 100644
--- a/includes/libs/rdbms/loadmonitor/ILoadMonitor.php
+++ b/includes/libs/rdbms/loadmonitor/ILoadMonitor.php
@@ -33,6 +33,10 @@
  * @ingroup Database
  */
 interface ILoadMonitor extends LoggerAwareInterface {
+       /** @var int A connection attempt */
+       const TYPE_CONNECTION = 1;
+       const TYPE_POS_SYNC = 2;
+
        /**
         * Construct a new LoadMonitor with a given LoadBalancer parent
         *
@@ -59,8 +63,26 @@
         * Values may be "false" if replication is too broken to estimate
         *
         * @param int[] $serverIndexes
-        * @param string $domain
+        * @param string|bool $domain
         * @return array Map of (server index => float|int|bool)
         */
        public function getLagTimes( array $serverIndexes, $domain );
+
+       /**
+        * Inform the monitor that basic usage of server failed (error or 
timeout)
+        *
+        * @param int $serverIndex
+        * @param string|bool $domain
+        * @param int $type ILoadMonitor::TYPE_* constant
+        */
+       public function pingFailure( $serverIndex, $domain, $type );
+
+       /**
+        * Get the failure rate of replication sync attempts
+        *
+        * @param int $serverIndex
+        * @param string|bool $domain
+        * @return float
+        */
+       public function getSyncFailureRate( $serverIndex, $domain );
 }
diff --git a/includes/libs/rdbms/loadmonitor/LoadMonitor.php 
b/includes/libs/rdbms/loadmonitor/LoadMonitor.php
index 74c7765..0aecccd 100644
--- a/includes/libs/rdbms/loadmonitor/LoadMonitor.php
+++ b/includes/libs/rdbms/loadmonitor/LoadMonitor.php
@@ -29,7 +29,11 @@
 
 /**
  * Basic DB load monitor with no external dependencies
- * Uses memcached to cache the replication lag for a short time
+ *
+ * Uses local and shared caches for server state information
+ *
+ * Note that the "domain" parameters are unused, though they may be used at a later date.
+ * At present, this assumes one channel of replication per server.
  *
  * @ingroup Database
  */
@@ -44,14 +48,26 @@
        protected $replLogger;
 
        /** @var float Moving average ratio (e.g. 0.1 for 10% weight to new 
weight) */
-       private $movingAveRatio;
+       private $movingAveRatio = .3;
+       /** @var float Moving average ratio for reported connection failures */
+       private $movingAveRatioConnFail = .2;
+       /** @var float Moving average ratio for reported replication sync failures */
+       private $movingAveRatioSyncFail = .1;
        /** @var int Amount of replication lag in seconds before warnings are 
logged */
        private $lagWarnThreshold;
 
-       /** @var int cache key version */
-       const VERSION = 1;
+       /** @var int Avoid failure ping cache updates while cache updates are 
running */
+       private $recursionGuard = 0;
+
        /** @var int Default 'max lag' in seconds when unspecified */
        const LAG_WARN_THRESHOLD = 10;
+
+       /** @var int cache key version */
+       const VERSION = 1;
+       /** @var int Maximum effective logical TTL for server state cache, in milliseconds */
+       const POLL_PERIOD_MS = 500;
+       /** @var int How long to cache server states including time past 
logical expiration */
+       const STATE_PRESERVE_TTL = 60;
 
        /**
         * @param ILoadBalancer $lb
@@ -59,6 +75,8 @@
         * @param WANObjectCache $wCache
         * @param array $options
         *   - movingAveRatio: moving average constant for server weight 
updates based on lag
+        *   - movingAveRatioConnFail: moving average constant for reported 
connection failures
+        *   - movingAveRatioSyncFail: moving average constant for reported 
replication sync failures
         *   - lagWarnThreshold: how many seconds of lag trigger warnings
         */
        public function __construct(
@@ -69,9 +87,13 @@
                $this->wanCache = $wCache;
                $this->replLogger = new NullLogger();
 
-               $this->movingAveRatio = isset( $options['movingAveRatio'] )
-                       ? $options['movingAveRatio']
-                       : 0.1;
+               $fields = [ 'movingAveRatio', 'movingAveRatioConnFail', 
'movingAveRatioSyncFail' ];
+               foreach ( $fields as $field ) {
+                       if ( isset( $options[$field] ) ) {
+                               $this->$field = $options[$field];
+                       }
+               }
+
                $this->lagWarnThreshold = isset( $options['lagWarnThreshold'] )
                        ? $options['lagWarnThreshold']
                        : self::LAG_WARN_THRESHOLD;
@@ -81,13 +103,13 @@
                $this->replLogger = $logger;
        }
 
-       public function scaleLoads( array &$weightByServer, $domain ) {
+       final public function scaleLoads( array &$weightByServer, $domain ) {
                $serverIndexes = array_keys( $weightByServer );
                $states = $this->getServerStates( $serverIndexes, $domain );
-               $coefficientsByServer = $states['weightScales'];
+               $newScalesByServer = $states['weightScales'];
                foreach ( $weightByServer as $i => $weight ) {
-                       if ( isset( $coefficientsByServer[$i] ) ) {
-                               $weightByServer[$i] = $weight * 
$coefficientsByServer[$i];
+                       if ( isset( $newScalesByServer[$i] ) ) {
+                               $weightByServer[$i] = $weight * 
$newScalesByServer[$i];
                        } else { // server recently added to config?
                                $host = $this->parent->getServerName( $i );
                                $this->replLogger->error( __METHOD__ . ": host 
$host not in cache" );
@@ -95,88 +117,220 @@
                }
        }
 
-       public function getLagTimes( array $serverIndexes, $domain ) {
-               $states = $this->getServerStates( $serverIndexes, $domain );
-
-               return $states['lagTimes'];
+       final public function getLagTimes( array $serverIndexes, $domain ) {
+               return $this->getServerStates( $serverIndexes, $domain 
)['lagTimes'];
        }
 
+       /**
+        * Record a failed use of a server, dispatching on the failure type
+        *
+        * Does nothing while the recursion guard is set (getServerStates() is
+        * refreshing the cache) or when the server is the writer index.
+        *
+        * @param int $serverIndex
+        * @param string|bool $domain
+        * @param int $type One of the TYPE_* constants
+        * @throws \UnexpectedValueException If $type is not a known TYPE_* constant
+        */
+       final public function pingFailure( $serverIndex, $domain, $type ) {
+               if ( $this->recursionGuard ) {
+                       return; // getServerStates() is already updating the 
cache
+               } elseif ( $serverIndex == $this->parent->getWriterIndex() ) {
+                       return; // not a replica DB
+               }
+
+               if ( $type === self::TYPE_CONNECTION ) {
+                       $this->pingConnFailure( $serverIndex, $domain );
+               } elseif ( $type === self::TYPE_POS_SYNC ) {
+                       $this->pingSyncFailure( $serverIndex, $domain );
+               } else {
+                       throw new \UnexpectedValueException( __METHOD__  . ': 
got bad failure type' );
+               }
+       }
+
+       /**
+        * Get the cached moving-average failure rate of replication sync waits
+        *
+        * @param int $serverIndex
+        * @param string|bool $domain Unused here
+        * @return float Cached failure rate, or 0.0 if no failure state is cached
+        */
+       final public function getSyncFailureRate( $serverIndex, $domain ) {
+               $state = $this->wanCache->get( $this->getFailureKey( 
$this->wanCache, $serverIndex ) );
+
+               // pingSyncFailure() stores the rate under the 'failRate' key
+               return isset( $state['failRate'] ) ? $state['failRate'] : 0.0;
+       }
+
+       /**
+        * Lower the cached weight scale of a server after a failed connection attempt
+        *
+        * Applies one moving-average step toward a scale of 0.0 (using the
+        * movingAveRatioConnFail ratio) to this server's entry in the shared states
+        * cache. The callback returns false when no prior state exists for the server.
+        *
+        * @param int $serverIndex
+        * @param string|bool $domain Unused here
+        */
+       protected function pingConnFailure( $serverIndex, $domain ) {
+               // Server index list is part of the cache key so keep that 
consistent for all uses
+               $serverIndexes = range( 0, $this->parent->getServerCount() - 1 
);
+               // Update the cache to lower the weight scale value for this 
server
+               $this->wanCache->getWithSetCallback(
+                       $this->getStatesCacheKey( $this->wanCache, 
$serverIndexes ),
+                       1, // logical expiry in seconds
+                       function ( $priorStates ) use ( $serverIndex ) {
+                               if ( !$priorStates || !isset( 
$priorStates['weightScales'][$serverIndex] ) ) {
+                                       return false; // getServerStates() 
should have populated this
+                               }
+
+                               // Get new weight scale using a moving average 
of the naïve and prior values
+                               $priorStates['weightScales'][$serverIndex] = 
$this->getNewScaleViaMovingAve(
+                                       
$priorStates['weightScales'][$serverIndex],
+                                       $naiveScale = 0.0, // e.g. "not usable"
+                                       $this->movingAveRatioConnFail
+                               );
+
+                               return $priorStates;
+                       },
+                       [
+                               // One thread can update at a time and 
contenders use the old value
+                               'lockTSE' => self::STATE_PRESERVE_TTL,
+                               // Force a cache miss to make the callback 
always run
+                               'minAsOf' => microtime( true ) + 3600
+                       ]
+               );
+       }
+
+       /**
+        * Record a failed replication position wait for a server
+        *
+        * Updates the cached moving-average failure rate, using the inverse of the
+        * time since the previously recorded failure as the newest sample.
+        *
+        * @param int $serverIndex
+        * @param string|bool $domain Unused here
+        */
+       protected function pingSyncFailure( $serverIndex, $domain ) {
+               $this->wanCache->getWithSetCallback(
+                       $this->getFailureKey( $this->wanCache, $serverIndex ),
+                       WANObjectCache::TTL_MINUTE,
+                       function ( $curState ) {
+                               $now = microtime( true );
+                               $curState = $curState ?: [ 'failRate' => 0.0, 'lastFailTime' => null ];
+
+                               if ( $curState['lastFailTime'] ) {
+                                       // Clamp the interval so that two failures at the same instant,
+                                       // or clock skew between hosts, cannot divide by zero or
+                                       // produce a negative rate
+                                       $elapsed = max( $now - $curState['lastFailTime'], 1e-3 );
+                                       $naiveRate = 1 / $elapsed;
+                               } else {
+                                       $naiveRate = 1 / 60; // a priori starting rate
+                               }
+
+                               // Get new failure rate using a moving average of the naïve and prior values
+                               return [
+                                       'failRate' => $this->getNewScaleViaMovingAve(
+                                               $curState['failRate'],
+                                               $naiveRate,
+                                               $this->movingAveRatioSyncFail
+                                       ),
+                                       'lastFailTime' => $now
+                               ];
+                       },
+                       [
+                               // Force a cache miss to make the callback always run
+                               'minAsOf' => microtime( true ) + 3600
+                       ]
+               );
+       }
+
+       /**
+        * Get the lag times and weight scales for a set of servers, using caches
+        *
+        * Checks the local server cache first, then falls back to the shared WAN
+        * cache, whose callback recomputes the states via computeServerStates().
+        * The result is written back to the local server cache.
+        *
+        * @param array $serverIndexes
+        * @param string|bool $domain
+        * @return array Map with 'lagTimes', 'weightScales' and 'timestamp' keys
+        */
         protected function getServerStates( array $serverIndexes, $domain ) {
-               $writerIndex = $this->parent->getWriterIndex();
-               if ( count( $serverIndexes ) == 1 && reset( $serverIndexes ) == 
$writerIndex ) {
-                       # Single server only, just return zero without caching
+               // Represent the cluster by the name of the master DB
+               $cluster = $this->parent->getServerName( 
$this->parent->getWriterIndex() );
+
+               // Randomize TTLs to reduce stampedes
+               $ttlMs = mt_rand( 1 * 1e6, self::POLL_PERIOD_MS * 1e6 ) / 1e6;
+               // The TTL is in milliseconds; convert it to seconds to match microtime()
+               $minAsOfTime = microtime( true ) - $ttlMs / 1e3;
+
+               // (a) Check the local server cache
+               $srvCacheKey = $this->getStatesCacheKey( $this->srvCache, 
$serverIndexes );
+               $value = $this->srvCache->get( $srvCacheKey );
+               if ( $value && $value['timestamp'] > $minAsOfTime ) {
+                       $this->replLogger->debug( __METHOD__ . ": used fresh 
$cluster lag times" );
+
+                       return $value; // cache hit
+               }
+
+               // (b) Value is stale/missing; try to use/refresh the shared 
cache
+               $scopedLock = $this->srvCache->getScopedLock( $srvCacheKey, 0, 
10 );
+               if ( !$scopedLock && $value ) {
+                       $this->replLogger->debug( __METHOD__ . ": used stale 
$cluster lag times" );
+                       // (b1) Another thread on this server is already 
checking the shared cache
+                       return $value;
+               }
+
+               // (b2) This thread gets to check the shared cache or (b3) 
value is missing
+               $staleValue = $value;
+               $ran = false;
+               $value = $this->wanCache->getWithSetCallback(
+                       $this->getStatesCacheKey( $this->wanCache, 
$serverIndexes ),
+                       1, // 1 second logical expiry
+                       function ( $oldValue ) use ( $serverIndexes, $domain, 
$staleValue, $cluster, &$ran ) {
+                               $ran = true;
+                               $this->replLogger->info( __METHOD__ . ": 
re-calculated $cluster lag times" );
+
+                               // Ignore failure ping updates on connection attempts. Always
+                               // release the guard, even if the state computation throws.
+                               $this->recursionGuard++;
+                               try {
+                                       $states = $this->computeServerStates(
+                                               $serverIndexes,
+                                               $domain,
+                                               $oldValue ?: $staleValue // fallback to local cache stale value
+                                       );
+                               } finally {
+                                       $this->recursionGuard--;
+                               }
+
+                               return $states;
+                       },
+                       [
+                               // One thread can update at a time and 
contenders use the old value
+                               'lockTSE' => self::STATE_PRESERVE_TTL,
+                               // If there is no value then those contenders 
use this trivial value
+                               'busyValue' => $staleValue ?: [
+                                       'lagTimes' => array_fill_keys( 
$serverIndexes, 0 ),
+                                       'weightScales' => array_fill_keys( 
$serverIndexes, 1.0 ),
+                                       'timestamp' => microtime( true )
+                               ]
+                       ]
+               );
+
+               // The callback logs its own message when it re-calculates the states
+               if ( !$ran ) {
+                       $this->replLogger->info( __METHOD__ . ": used WAN cache 
$cluster lag times" );
+               }
+
+               // Backfill the local server cache
+               $this->srvCache->set( $srvCacheKey, $value, 
self::STATE_PRESERVE_TTL );
+
+               return $value;
+       }
+
+       /**
+        * @param array $serverIndexes
+        * @param string|bool $domain
+        * @param array|false $priorStates
+        * @return array
+        * @throws DBAccessError
+        */
+       protected function computeServerStates( array $serverIndexes, $domain, 
$priorStates ) {
+               // Check if there is just a master DB and no replication 
involved
+               if ( $this->parent->getServerCount() <= 1 ) {
                        return [
-                               'lagTimes' => [ $writerIndex => 0 ],
-                               'weightScales' => [ $writerIndex => 1.0 ]
+                               'lagTimes' => [ $this->parent->getWriterIndex() 
=> 0 ],
+                               'weightScales' => [ 
$this->parent->getWriterIndex() => 1.0 ],
+                               'timestamp' => microtime( true )
                        ];
                }
 
-               $key = $this->getCacheKey( $serverIndexes );
-               # Randomize TTLs to reduce stampedes (4.0 - 5.0 sec)
-               $ttl = mt_rand( 4e6, 5e6 ) / 1e6;
-               # Keep keys around longer as fallbacks
-               $staleTTL = 60;
-
-               # (a) Check the local APC cache
-               $value = $this->srvCache->get( $key );
-               if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl 
) ) {
-                       $this->replLogger->debug( __METHOD__ . ": got lag times 
($key) from local cache" );
-                       return $value; // cache hit
-               }
-               $staleValue = $value ?: false;
-
-               # (b) Check the shared cache and backfill APC
-               $value = $this->wanCache->get( $key );
-               if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl 
) ) {
-                       $this->srvCache->set( $key, $value, $staleTTL );
-                       $this->replLogger->debug( __METHOD__ . ": got lag times 
($key) from main cache" );
-
-                       return $value; // cache hit
-               }
-               $staleValue = $value ?: $staleValue;
-
-               # (c) Cache key missing or expired; regenerate and backfill
-               if ( $this->srvCache->lock( $key, 0, 10 ) ) {
-                       # Let only this process update the cache value on this 
server
-                       $sCache = $this->srvCache;
-                       /** @noinspection PhpUnusedLocalVariableInspection */
-                       $unlocker = new ScopedCallback( function () use ( 
$sCache, $key ) {
-                               $sCache->unlock( $key );
-                       } );
-               } elseif ( $staleValue ) {
-                       # Could not acquire lock but an old cache exists, so 
use it
-                       return $staleValue;
-               }
+               $priorScales = $priorStates ? $priorStates['weightScales'] : [];
 
                $lagTimes = [];
                $weightScales = [];
-               $movAveRatio = $this->movingAveRatio;
                foreach ( $serverIndexes as $i ) {
+                       // Avoid querying the DB master; this method might run 
in any datacenter
                        if ( $i == $this->parent->getWriterIndex() ) {
-                               $lagTimes[$i] = 0; // master always has no lag
-                               $weightScales[$i] = 1.0; // nominal weight
+                               $lagTimes[$i] = 0;
+                               $weightScales[$i] = 1.0;
+
                                continue;
                        }
 
+                       $host = $this->parent->getServerName( $i );
                        $conn = $this->parent->getAnyOpenConnection( $i );
                        if ( $conn ) {
                                $close = false; // already open
                        } else {
-                               $conn = $this->parent->openConnection( $i, '' );
+                               $conn = $this->parent->openConnection( $i, 
LoadBalancer::DOMAIN_ANY );
                                $close = true; // new connection
                        }
 
-                       $lastWeight = isset( $staleValue['weightScales'][$i] )
-                               ? $staleValue['weightScales'][$i]
-                               : 1.0;
-                       $coefficient = $this->getWeightScale( $i, $conn ?: null 
);
-                       $newWeight = $movAveRatio * $coefficient + ( 1 - 
$movAveRatio ) * $lastWeight;
+                       // Get new weight scale using a moving average of the 
naïve and prior values
+                       $lastScale = isset( $priorScales[$i] ) ? 
$priorScales[$i] : 1.0;
+                       $naiveScale = $this->getWeightScale( $i, $conn ?: null 
);
+                       $newScale = $this->getNewScaleViaMovingAve(
+                               $lastScale,
+                               $naiveScale,
+                               $this->movingAveRatio
+                       );
 
-                       // Scale from 10% to 100% of nominal weight
-                       $weightScales[$i] = max( $newWeight, 0.10 );
+                       // Scale from 0% to 100% of nominal weight (sanity)
+                       $weightScales[$i] = max( $newScale, 0.0 );
 
-                       $host = $this->parent->getServerName( $i );
-
+                       // Mark replication lag on this server as "false" if it 
is unreachable
                        if ( !$conn ) {
                                $lagTimes[$i] = false;
                                $this->replLogger->error(
@@ -186,10 +340,16 @@
                                continue;
                        }
 
+                       // Determine the amount of replication lag on this 
server
                        if ( $conn->getLBInfo( 'is static' ) ) {
-                               $lagTimes[$i] = 0;
+                               $lagTimes[$i] = 0; // data never changes
                        } else {
-                               $lagTimes[$i] = $conn->getLag();
+                               try {
+                                       $lagTimes[$i] = $conn->getLag();
+                               } catch ( DBError $e ) {
+                                       $lagTimes[$i] = false;
+                               }
+
                                if ( $lagTimes[$i] === false ) {
                                        $this->replLogger->error(
                                                __METHOD__ . ": host 
{db_server} is not replicating?",
@@ -216,17 +376,11 @@
                        }
                }
 
-               # Add a timestamp key so we know when it was cached
-               $value = [
+               return [
                        'lagTimes' => $lagTimes,
                        'weightScales' => $weightScales,
                        'timestamp' => microtime( true )
                ];
-               $this->wanCache->set( $key, $value, $staleTTL );
-               $this->srvCache->set( $key, $value, $staleTTL );
-               $this->replLogger->info( __METHOD__ . ": re-calculated lag 
times ($key)" );
-
-               return $value;
        }
 
        /**
@@ -238,14 +392,70 @@
                return $conn ? 1.0 : 0.0;
        }
 
-       private function getCacheKey( array $serverIndexes ) {
+       /**
+        * Get the moving average weight scale given the naive value and the 
previous iteration's value
+        *
+        * One case of particular note is if a server totally cannot have its 
state queried.
+        * Ideally, the scale should be able to drop from 1.0 to a minuscule 
amount (say 0.001)
+        * fairly quickly. To get the time to reach 0.001, some calculations 
can be done:
+        *
+        * SCALE = $naiveScale * $movAveRatio + $lastScale * (1 - $movAveRatio)
+        * SCALE = 0 * $movAveRatio + $lastScale * (1 - $movAveRatio)
+        * SCALE = $lastScale * (1 - $movAveRatio)
+        *
+        * Given a starting weight scale of 1.0:
+        * 1.0 * (1 - $movAveRatio)^(# iterations) = 0.001
+        * ceil( log<1 - $movAveRatio>(0.001) ) = (# iterations)
+        * t = (# iterations) * (POLL_PERIOD + SHARED_CACHE_TTL)
+        * t = (# iterations) * (POLL_PERIOD_MS / 1e3 + SHARED_CACHE_TTL)
+        *
+        * If $movAveRatio is 0.5, then:
+        * t = ceil( log<0.5>(0.01) ) * 1.5 = 7 * 1.5 = 10.5 seconds [for 1% 
scale]
+        * t = ceil( log<0.5>(0.001) ) * 1.5 = 10 * 1.5 = 15 seconds [for 0.1% 
scale]
+        *
+        * If $movAveRatio is 0.8, then:
+        * t = ceil( log<0.2>(0.01) ) * 1.5 = 3 * 1.5 = 4.5 seconds [for 1% 
scale]
+        * t = ceil( log<0.2>(0.001) ) * 1.5 = 5 * 1.5 = 7.5 seconds [for 0.1% 
scale]
+        *
+        * Use of connection failure rate can greatly speed this process up
+        *
+        * @param float $lastScale
+        * @param float $naiveScale
+        * @param float $movAveRatio
+        * @return float
+        */
+       protected function getNewScaleViaMovingAve( $lastScale, $naiveScale, 
$movAveRatio ) {
+               return $movAveRatio * $naiveScale + ( 1 - $movAveRatio ) * 
$lastScale;
+       }
+
+       /**
+        * @param WANObjectCache|BagOStuff $cache
+        * @param array $serverIndexes
+        * @return string
+        */
+       private function getStatesCacheKey( $cache, array $serverIndexes ) {
                sort( $serverIndexes );
                // Lag is per-server, not per-DB, so key on the master DB name
                return $this->srvCache->makeGlobalKey(
-                       'lag-times',
+                       'database-lag-times',
                        self::VERSION,
                        $this->parent->getServerName( 
$this->parent->getWriterIndex() ),
                        implode( '-', $serverIndexes )
                );
        }
+
+       /**
+        * @param WANObjectCache|BagOStuff $cache
+        * @param int $serverIndex
+        * @return string
+        */
+       private function getFailureKey( $cache, $serverIndex ) {
+               // Failure rate is per-server, not per-DB, so key on the master DB name
+               return $this->srvCache->makeGlobalKey(
+                       'database-failure-rate',
+                       self::VERSION,
+                       $this->parent->getServerName( 
$this->parent->getWriterIndex() ),
+                       $serverIndex
+               );
+       }
 }
diff --git a/includes/libs/rdbms/loadmonitor/LoadMonitorMySQL.php 
b/includes/libs/rdbms/loadmonitor/LoadMonitorMySQL.php
index f8ad329..98cff6d 100644
--- a/includes/libs/rdbms/loadmonitor/LoadMonitorMySQL.php
+++ b/includes/libs/rdbms/loadmonitor/LoadMonitorMySQL.php
@@ -46,7 +46,7 @@
 
        protected function getWeightScale( $index, IDatabase $conn = null ) {
                if ( !$conn ) {
-                       return 0.0;
+                       return parent::getWeightScale( $index, $conn );
                }
 
                $weight = 1.0;

-- 
To view, visit https://gerrit.wikimedia.org/r/394430
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I8b1c7040051d9761516a1e7c4eecd4d4616f252c
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/core
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to