Awight has uploaded a new change for review.
https://gerrit.wikimedia.org/r/226948
Change subject: WIP Implement high-availability queue pool
......................................................................
WIP Implement high-availability queue pool
These changes allow the orphan slayer to read round-robin from available queues.
TODO:
* Drop unresponsive servers for the remainder of the batch job.
Bug: T99015
Change-Id: I50939cf574ef4c585cf2156692c0f1df8472c91f
---
M globalcollect_gateway/scripts/orphans.php
1 file changed, 55 insertions(+), 25 deletions(-)
git pull
ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/DonationInterface
refs/changes/48/226948/1
diff --git a/globalcollect_gateway/scripts/orphans.php
b/globalcollect_gateway/scripts/orphans.php
index be49f00..2f4eff4 100644
--- a/globalcollect_gateway/scripts/orphans.php
+++ b/globalcollect_gateway/scripts/orphans.php
@@ -12,6 +12,8 @@
//If you get errors on this next line, set (and export) your MW_INSTALL_PATH
var.
require_once( "$IP/maintenance/Maintenance.php" );
+use Predis\Connection\ConnectionException;
+
class GlobalCollectOrphanRectifier extends Maintenance {
protected $killfiles = array();
@@ -151,16 +153,19 @@
return $elapsed;
}
+ protected function deleteMessage( $correlation_id, $queue ) {
+ $this->handled_ids[$correlation_id] = 'antimessage';
+
+ // Delete from Memcache
+ DonationQueue::instance()->delete(
+ $correlation_id, $queue );
+ }
+
function addStompCorrelationIDToAckBucket( $correlation_id, $ackNow =
false ){
static $bucket = array();
$count = 50; //sure. Why not?
if ( $correlation_id ) {
$bucket[$correlation_id] = "'$correlation_id'";
//avoiding duplicates.
- $this->handled_ids[$correlation_id] = 'antimessage';
-
- // Delete from Memcache
- DonationQueue::instance()->delete(
- $correlation_id,
GlobalCollectAdapter::GC_CC_LIMBO_QUEUE );
}
if ( count( $bucket ) && ( count( $bucket ) >= $count ||
$ackNow ) ){
//ack now.
@@ -191,6 +196,9 @@
//add the correlation ID to the ack bucket.
if (array_key_exists('correlation-id',
$message->headers)) {
$this->addStompCorrelationIDToAckBucket( $message->headers['correlation-id'] );
+
+ // TODO: This is safe to remove when we
stop using STOMP.
+ $this->deleteMessage(
$message->headers['correlation-id'], GlobalCollectAdapter::GC_CC_LIMBO_QUEUE );
} else {
echo 'The STOMP message ' .
$message->headers['message-id'] . " has no correlation ID!\n";
}
@@ -209,29 +217,51 @@
$orphans = array();
$false_orphans = array();
- while ( $message = DonationQueue::instance()->pop(
GlobalCollectAdapter::GC_CC_LIMBO_QUEUE ) ) {
- $correlation_id = 'globalcollect-' .
$message['gateway_txn_id'];
- if ( array_key_exists( $correlation_id,
$this->handled_ids ) ) {
- continue;
+
+ $queue_pool = (array) $this->getOrphanGlobal(
'gc_cc_limbo_queue_pool' );
+ if ( !$queue_pool ) {
+ // FIXME: cheesy inline default
+ $queue_pool = (array)
GlobalCollectAdapter::GC_CC_LIMBO_QUEUE;
+ }
+
+ while ( $message = DonationQueue::instance()->pop(
$queue_pool[0] ) ) {
+ $current_queue = $queue_pool[0];
+
+ try {
+ $correlation_id = 'globalcollect-' .
$message['gateway_txn_id'];
+ if ( array_key_exists( $correlation_id,
$this->handled_ids ) ) {
+ continue;
+ }
+
+ // Check the timestamp to see if it's old
enough, and stop when
+ // we're below the threshold. Messages are
guaranteed to pop in
+ // chronological order.
+ $elapsed = $this->start_time - $message['date'];
+ if ( $elapsed < $time_buffer ) {
+ // Put it back!
+ DonationQueue::instance()->set(
$correlation_id, $message, $current_queue );
+ break;
+ }
+
+ // We got ourselves an orphan!
+ $order_id = explode('-', $correlation_id);
+ $order_id = $order_id[1];
+ $message['order_id'] = $order_id;
+ $message = unCreateQueueMessage($message);
+ $orphans[$correlation_id] = $message;
+ $this->logger->info( "Found an orphan!
$correlation_id" );
+
+ $this->deleteMessage( $correlation_id,
$current_queue );
+ } catch ( ConnectionException $ex ) {
+ // Drop this server, for the batch lifetime.
+ array_shift( $queue_pool );
}
- // Check the timestamp to see if it's old enough, and
stop when
- // we're below the threshold. Messages are guaranteed
to pop in
- // chronological order.
- $elapsed = $this->start_time - $message['date'];
- if ( $elapsed < $time_buffer ) {
- // Put it back!
- DonationQueue::instance()->set(
$correlation_id, $message, GlobalCollectAdapter::GC_CC_LIMBO_QUEUE );
- break;
+ // Round-robin the pool.
+ if ( count( $queue_pool ) > 1 ) {
+ $rotate_elem = array_shift( $queue_pool );
+ array_push( $queue_pool, $rotate_elem );
}
-
- // We got ourselves an orphan!
- $order_id = explode('-', $correlation_id);
- $order_id = $order_id[1];
- $message['order_id'] = $order_id;
- $message = unCreateQueueMessage($message);
- $orphans[$correlation_id] = $message;
- $this->logger->info( "Found an orphan! $correlation_id"
);
// TODO: stop stomping
$this->addStompCorrelationIDToAckBucket(
$correlation_id );
--
To view, visit https://gerrit.wikimedia.org/r/226948
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I50939cf574ef4c585cf2156692c0f1df8472c91f
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/DonationInterface
Gerrit-Branch: master
Gerrit-Owner: Awight <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits