jenkins-bot has submitted this change and it was merged.
Change subject: Added $wgJobSerialCommitThreshold setting
......................................................................
Added $wgJobSerialCommitThreshold setting
* This is used to avoid lag by certain jobs
Bug: T95501
Change-Id: Id707c9a840fa23d56407e03aaae4e25149a1f906
---
M includes/DefaultSettings.php
M includes/db/Database.php
M includes/db/DatabaseMysqlBase.php
M includes/db/LoadBalancer.php
M includes/jobqueue/JobRunner.php
5 files changed, 103 insertions(+), 13 deletions(-)
Approvals:
Gilles: Looks good to me, approved
jenkins-bot: Verified
diff --git a/includes/DefaultSettings.php b/includes/DefaultSettings.php
index 3cfeb8c..e323ec6 100644
--- a/includes/DefaultSettings.php
+++ b/includes/DefaultSettings.php
@@ -6452,6 +6452,21 @@
$wgJobBackoffThrottling = array();
/**
+ * Make job runners commit changes for slave-lag prone jobs one job at a time.
+ * This is useful if there are many job workers that race on slave lag checks.
+ * If set, jobs taking this many seconds of DB write time have serialized
commits.
+ *
+ * Note that affected jobs may have worse lock contention. Also, if they affect
+ * several DBs at once they may have a smaller chance of being atomic due to
the
+ * possibility of connection loss while queueing up to commit. Affected jobs
may
+ * also fail due to the commit lock acquisition timeout.
+ *
+ * @var float|bool
+ * @since 1.26
+ */
+$wgJobSerialCommitThreshold = false;
+
+/**
* Map of job types to configuration arrays.
* This determines which queue class and storage system is used for each job
type.
* Job types that do not have explicit configuration will use the 'default'
config.
diff --git a/includes/db/Database.php b/includes/db/Database.php
index 8c1ebf9..2c1ebea 100644
--- a/includes/db/Database.php
+++ b/includes/db/Database.php
@@ -4235,7 +4235,7 @@
}
/**
- * Check to see if a named lock is available. This is non-blocking.
+ * Check to see if a named lock is available (non-blocking)
*
* @param string $lockName Name of lock to poll
* @param string $method Name of method calling us
@@ -4249,8 +4249,7 @@
/**
* Acquire a named lock
*
- * Abstracted from Filestore::lock() so child classes can implement for
- * their own needs.
+ * Named locks are not related to transactions
*
* @param string $lockName Name of lock to aquire
* @param string $method Name of method calling us
@@ -4262,7 +4261,9 @@
}
/**
- * Release a lock.
+ * Release a lock
+ *
+ * Named locks are not related to transactions
*
* @param string $lockName Name of lock to release
* @param string $method Name of method calling us
@@ -4276,6 +4277,16 @@
}
/**
+ * Check to see if a named lock used by lock() use blocking queues
+ *
+ * @return bool
+ * @since 1.26
+ */
+ public function namedLocksEnqueue() {
+ return false;
+ }
+
+ /**
* Lock specific tables
*
* @param array $read Array of tables to lock for read access
diff --git a/includes/db/DatabaseMysqlBase.php
b/includes/db/DatabaseMysqlBase.php
index aac95a8..64917cc 100644
--- a/includes/db/DatabaseMysqlBase.php
+++ b/includes/db/DatabaseMysqlBase.php
@@ -873,6 +873,10 @@
return ( $row->lockstatus == 1 );
}
+ public function namedLocksEnqueue() {
+ return true;
+ }
+
/**
* @param array $read
* @param array $write
diff --git a/includes/db/LoadBalancer.php b/includes/db/LoadBalancer.php
index 624f46b..be67d75 100644
--- a/includes/db/LoadBalancer.php
+++ b/includes/db/LoadBalancer.php
@@ -845,8 +845,9 @@
/**
* @return int
+ * @since 1.26
*/
- private function getWriterIndex() {
+ public function getWriterIndex() {
return 0;
}
diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php
index 9425423..2311ea2 100644
--- a/includes/jobqueue/JobRunner.php
+++ b/includes/jobqueue/JobRunner.php
@@ -36,6 +36,11 @@
protected $debug;
/**
+ * @var LoggerInterface $logger
+ */
+ protected $logger;
+
+ /**
* @param callable $debug Optional debug output handler
*/
public function setDebugHandler( $debug ) {
@@ -43,12 +48,8 @@
}
/**
- * @var LoggerInterface $logger
- */
- protected $logger;
-
- /**
* @param LoggerInterface $logger
+ * @return void
*/
public function setLogger( LoggerInterface $logger ) {
$this->logger = $logger;
@@ -183,7 +184,7 @@
++$jobsRun;
$status = $job->run();
$error = $job->getLastError();
- wfGetLBFactory()->commitMasterChanges();
+ $this->commitMasterChanges( $job );
} catch ( Exception $e ) {
MWExceptionHandler::rollbackMasterChangesAndLog( $e );
$status = false;
@@ -304,7 +305,6 @@
* @return array Map of (job type => backoff expiry timestamp)
*/
private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
-
$file = wfTempDir() . '/mw-runJobs-backoffs.json';
if ( is_file( $file ) ) {
$noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
@@ -342,7 +342,6 @@
* @return array The new backoffs account for $backoffs and the latest
file data
*/
private function syncBackoffDeltas( array $backoffs, array &$deltas,
$mode = 'wait' ) {
-
if ( !$deltas ) {
return $this->loadBackoffs( $backoffs, $mode );
}
@@ -409,4 +408,64 @@
call_user_func_array( $this->debug, array( wfTimestamp(
TS_DB ) . " $msg\n" ) );
}
}
+
+ /**
+ * Commit any DB master changes from a job on all load balancers
+ *
+ * @param Job $job
+ * @throws DBError
+ */
+ private function commitMasterChanges( Job $job ) {
+ global $wgJobSerialCommitThreshold;
+
+ $lb = wfGetLB( wfWikiID() );
+ if ( $wgJobSerialCommitThreshold !== false ) {
+ // Generally, there is one master connection to the
local DB
+ $dbwSerial = $lb->getAnyOpenConnection(
$lb->getWriterIndex() );
+ } else {
+ $dbwSerial = false;
+ }
+
+ if ( !$dbwSerial
+ || !$dbwSerial->namedLocksEnqueue()
+ || $dbwSerial->pendingWriteQueryDuration() <
$wgJobSerialCommitThreshold
+ ) {
+ // Writes are all to foreign DBs, named locks don't
form queues,
+ // or $wgJobSerialCommitThreshold is not reached;
commit changes now
+ wfGetLBFactory()->commitMasterChanges();
+ return;
+ }
+
+ $ms = intval( 1000 * $dbwSerial->pendingWriteQueryDuration() );
+ $msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of
writes]";
+ $this->logger->info( $msg );
+ $this->debugCallback( $msg );
+
+ // Wait for an exclusive lock to commit
+ if ( !$dbwSerial->lock( 'jobrunner-serial-commit', __METHOD__,
30 ) ) {
+ // This will trigger a rollback in the main loop
+ throw new DBError( $dbwSerial, "Timed out waiting on
commit queue." );
+ }
+ // Wait for the generic slave to catch up
+ $pos = $lb->getMasterPos();
+ if ( $pos ) {
+ $lb->waitForOne( $pos );
+ }
+
+ // Re-ping all masters with transactions. This throws DBError
if some
+ // connection died while waiting on locks/slaves, triggering a
rollback.
+ wfGetLBFactory()->forEachLB( function( LoadBalancer $lb ) {
+ $lb->forEachOpenConnection( function( DatabaseBase
$conn ) {
+ if ( $conn->writesOrCallbacksPending() ) {
+ $conn->query( "SELECT 1" );
+ }
+ } );
+ } );
+
+ // Actually commit the DB master changes
+ wfGetLBFactory()->commitMasterChanges();
+
+ // Release the lock
+ $dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ );
+ }
}
--
To view, visit https://gerrit.wikimedia.org/r/205825
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Id707c9a840fa23d56407e03aaae4e25149a1f906
Gerrit-PatchSet: 9
Gerrit-Project: mediawiki/core
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <[email protected]>
Gerrit-Reviewer: Aaron Schulz <[email protected]>
Gerrit-Reviewer: Gilles <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: Parent5446 <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits