jenkins-bot has submitted this change and it was merged.

Change subject: Added $wgJobSerialCommitThreshold setting
......................................................................


Added $wgJobSerialCommitThreshold setting

* This is used to avoid slave lag caused by jobs with long DB write times

Bug: T95501
Change-Id: Id707c9a840fa23d56407e03aaae4e25149a1f906
---
M includes/DefaultSettings.php
M includes/db/Database.php
M includes/db/DatabaseMysqlBase.php
M includes/db/LoadBalancer.php
M includes/jobqueue/JobRunner.php
5 files changed, 103 insertions(+), 13 deletions(-)

Approvals:
  Gilles: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/includes/DefaultSettings.php b/includes/DefaultSettings.php
index 3cfeb8c..e323ec6 100644
--- a/includes/DefaultSettings.php
+++ b/includes/DefaultSettings.php
@@ -6452,6 +6452,21 @@
 $wgJobBackoffThrottling = array();
 
 /**
+ * Make job runners commit changes for slave-lag prone jobs one job at a time.
+ * This is useful if there are many job workers that race on slave lag checks.
+ * If set, jobs taking this many seconds of DB write time have serialized commits.
+ *
+ * Note that affected jobs may have worse lock contention. Also, if they affect
+ * several DBs at once they may have a smaller chance of being atomic due to the
+ * possibility of connection loss while queueing up to commit. Affected jobs may
+ * also fail due to the commit lock acquisition timeout.
+ *
+ * @var float|bool
+ * @since 1.26
+ */
+$wgJobSerialCommitThreshold = false;
+
+/**
  * Map of job types to configuration arrays.
  * This determines which queue class and storage system is used for each job type.
  * Job types that do not have explicit configuration will use the 'default' config.
diff --git a/includes/db/Database.php b/includes/db/Database.php
index 8c1ebf9..2c1ebea 100644
--- a/includes/db/Database.php
+++ b/includes/db/Database.php
@@ -4235,7 +4235,7 @@
        }
 
        /**
-        * Check to see if a named lock is available. This is non-blocking.
+        * Check to see if a named lock is available (non-blocking)
         *
         * @param string $lockName Name of lock to poll
         * @param string $method Name of method calling us
@@ -4249,8 +4249,7 @@
        /**
         * Acquire a named lock
         *
-        * Abstracted from Filestore::lock() so child classes can implement for
-        * their own needs.
+        * Named locks are not related to transactions
         *
         * @param string $lockName Name of lock to acquire
         * @param string $method Name of method calling us
@@ -4262,7 +4261,9 @@
        }
 
        /**
-        * Release a lock.
+        * Release a lock
+        *
+        * Named locks are not related to transactions
         *
         * @param string $lockName Name of lock to release
         * @param string $method Name of method calling us
@@ -4276,6 +4277,16 @@
        }
 
        /**
+        * Check to see if the named locks used by lock() use blocking queues
+        *
+        * @return bool
+        * @since 1.26
+        */
+       public function namedLocksEnqueue() {
+               return false;
+       }
+
+       /**
         * Lock specific tables
         *
         * @param array $read Array of tables to lock for read access
diff --git a/includes/db/DatabaseMysqlBase.php b/includes/db/DatabaseMysqlBase.php
index aac95a8..64917cc 100644
--- a/includes/db/DatabaseMysqlBase.php
+++ b/includes/db/DatabaseMysqlBase.php
@@ -873,6 +873,10 @@
                return ( $row->lockstatus == 1 );
        }
 
+       public function namedLocksEnqueue() {
+               return true;
+       }
+
        /**
         * @param array $read
         * @param array $write
diff --git a/includes/db/LoadBalancer.php b/includes/db/LoadBalancer.php
index 624f46b..be67d75 100644
--- a/includes/db/LoadBalancer.php
+++ b/includes/db/LoadBalancer.php
@@ -845,8 +845,9 @@
 
        /**
         * @return int
+        * @since 1.26
         */
-       private function getWriterIndex() {
+       public function getWriterIndex() {
                return 0;
        }
 
diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php
index 9425423..2311ea2 100644
--- a/includes/jobqueue/JobRunner.php
+++ b/includes/jobqueue/JobRunner.php
@@ -36,6 +36,11 @@
        protected $debug;
 
        /**
+        * @var LoggerInterface $logger
+        */
+       protected $logger;
+
+       /**
         * @param callable $debug Optional debug output handler
         */
        public function setDebugHandler( $debug ) {
@@ -43,12 +48,8 @@
        }
 
        /**
-        * @var LoggerInterface $logger
-        */
-       protected $logger;
-
-       /**
         * @param LoggerInterface $logger
+        * @return void
         */
        public function setLogger( LoggerInterface $logger ) {
                $this->logger = $logger;
@@ -183,7 +184,7 @@
                                        ++$jobsRun;
                                        $status = $job->run();
                                        $error = $job->getLastError();
-                                       wfGetLBFactory()->commitMasterChanges();
+                                       $this->commitMasterChanges( $job );
                                } catch ( Exception $e ) {
                                        
MWExceptionHandler::rollbackMasterChangesAndLog( $e );
                                        $status = false;
@@ -304,7 +305,6 @@
         * @return array Map of (job type => backoff expiry timestamp)
         */
        private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
-
                $file = wfTempDir() . '/mw-runJobs-backoffs.json';
                if ( is_file( $file ) ) {
                        $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
@@ -342,7 +342,6 @@
         * @return array The new backoffs, accounting for $backoffs and the latest file data
         */
        private function syncBackoffDeltas( array $backoffs, array &$deltas, 
$mode = 'wait' ) {
-
                if ( !$deltas ) {
                        return $this->loadBackoffs( $backoffs, $mode );
                }
@@ -409,4 +408,64 @@
                        call_user_func_array( $this->debug, array( wfTimestamp( 
TS_DB ) . " $msg\n" ) );
                }
        }
+
+       /**
+        * Commit any DB master changes from a job on all load balancers
+        *
+        * @param Job $job
+        * @throws DBError
+        */
+       private function commitMasterChanges( Job $job ) {
+               global $wgJobSerialCommitThreshold;
+
+               $lb = wfGetLB( wfWikiID() );
+               if ( $wgJobSerialCommitThreshold !== false ) {
+                       // Generally, there is one master connection to the 
local DB
+                       $dbwSerial = $lb->getAnyOpenConnection( 
$lb->getWriterIndex() );
+               } else {
+                       $dbwSerial = false;
+               }
+
+               if ( !$dbwSerial
+                       || !$dbwSerial->namedLocksEnqueue()
+                       || $dbwSerial->pendingWriteQueryDuration() < 
$wgJobSerialCommitThreshold
+               ) {
+                       // Writes are all to foreign DBs, named locks don't 
form queues,
+                       // or $wgJobSerialCommitThreshold is not reached; 
commit changes now
+                       wfGetLBFactory()->commitMasterChanges();
+                       return;
+               }
+
+               $ms = intval( 1000 * $dbwSerial->pendingWriteQueryDuration() );
+               $msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of 
writes]";
+               $this->logger->info( $msg );
+               $this->debugCallback( $msg );
+
+               // Wait for an exclusive lock to commit
+               if ( !$dbwSerial->lock( 'jobrunner-serial-commit', __METHOD__, 
30 ) ) {
+                       // This will trigger a rollback in the main loop
+                       throw new DBError( $dbwSerial, "Timed out waiting on 
commit queue." );
+               }
+               // Wait for the generic slave to catch up
+               $pos = $lb->getMasterPos();
+               if ( $pos ) {
+                       $lb->waitForOne( $pos );
+               }
+
+               // Re-ping all masters with transactions. This throws DBError 
if some
+               // connection died while waiting on locks/slaves, triggering a 
rollback.
+               wfGetLBFactory()->forEachLB( function( LoadBalancer $lb ) {
+                       $lb->forEachOpenConnection( function( DatabaseBase 
$conn ) {
+                               if ( $conn->writesOrCallbacksPending() ) {
+                                       $conn->query( "SELECT 1" );
+                               }
+                       } );
+               } );
+
+               // Actually commit the DB master changes
+               wfGetLBFactory()->commitMasterChanges();
+
+               // Release the lock
+               $dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ );
+       }
 }

-- 
To view, visit https://gerrit.wikimedia.org/r/205825
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Id707c9a840fa23d56407e03aaae4e25149a1f906
Gerrit-PatchSet: 9
Gerrit-Project: mediawiki/core
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <[email protected]>
Gerrit-Reviewer: Aaron Schulz <[email protected]>
Gerrit-Reviewer: Gilles <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: Parent5446 <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to