Awight has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/226948

Change subject: WIP Implement high-availability queue pool
......................................................................

WIP Implement high-availability queue pool

These changes allow the orphan slayer to read round-robin from available queues.

TODO:
* Drop unresponsive servers for the remainder of the batch job.

Bug: T99015
Change-Id: I50939cf574ef4c585cf2156692c0f1df8472c91f
---
M globalcollect_gateway/scripts/orphans.php
1 file changed, 55 insertions(+), 25 deletions(-)


  git pull 
ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/DonationInterface 
refs/changes/48/226948/1

diff --git a/globalcollect_gateway/scripts/orphans.php 
b/globalcollect_gateway/scripts/orphans.php
index be49f00..2f4eff4 100644
--- a/globalcollect_gateway/scripts/orphans.php
+++ b/globalcollect_gateway/scripts/orphans.php
@@ -12,6 +12,8 @@
 //If you get errors on this next line, set (and export) your MW_INSTALL_PATH 
var. 
 require_once( "$IP/maintenance/Maintenance.php" );
 
+use Predis\Connection\ConnectionException;
+
 class GlobalCollectOrphanRectifier extends Maintenance {
 
        protected $killfiles = array();
@@ -151,16 +153,19 @@
                return $elapsed;
        }
 
+       protected function deleteMessage( $correlation_id, $queue ) {
+           $this->handled_ids[$correlation_id] = 'antimessage';
+
+           // Delete from Memcache
+           DonationQueue::instance()->delete(
+                   $correlation_id, $queue );
+       }
+
        function addStompCorrelationIDToAckBucket( $correlation_id, $ackNow = 
false ){
                static $bucket = array();
                $count = 50; //sure. Why not?
                if ( $correlation_id ) {
                        $bucket[$correlation_id] = "'$correlation_id'"; 
//avoiding duplicates.
-                       $this->handled_ids[$correlation_id] = 'antimessage';
-
-                       // Delete from Memcache
-                       DonationQueue::instance()->delete(
-                               $correlation_id, 
GlobalCollectAdapter::GC_CC_LIMBO_QUEUE );
                }
                if ( count( $bucket ) && ( count( $bucket ) >= $count || 
$ackNow ) ){
                        //ack now.
@@ -191,6 +196,9 @@
                                //add the correlation ID to the ack bucket. 
                                if (array_key_exists('correlation-id', 
$message->headers)) {
                                        
$this->addStompCorrelationIDToAckBucket( $message->headers['correlation-id'] );
+
+                                       // TODO: This is safe to remove when we 
stop using STOMP.
+                                       $this->deleteMessage( 
$message->headers['correlation-id'], GlobalCollectAdapter::GC_CC_LIMBO_QUEUE );
                                } else {
                                        echo 'The STOMP message ' . 
$message->headers['message-id'] . " has no correlation ID!\n";
                                }
@@ -209,29 +217,51 @@
 
                $orphans = array();
                $false_orphans = array();
-               while ( $message = DonationQueue::instance()->pop( 
GlobalCollectAdapter::GC_CC_LIMBO_QUEUE ) ) {
-                       $correlation_id = 'globalcollect-' . 
$message['gateway_txn_id'];
-                       if ( array_key_exists( $correlation_id, 
$this->handled_ids ) ) {
-                               continue;
+
+               $queue_pool = (array) $this->getOrphanGlobal( 
'gc_cc_limbo_queue_pool' );
+               if ( !$queue_pool ) {
+                       // FIXME: cheesy inline default
+                       $queue_pool = (array) 
GlobalCollectAdapter::GC_CC_LIMBO_QUEUE;
+               }
+
+               while ( $message = DonationQueue::instance()->pop( 
$queue_pool[0] ) ) {
+                       $current_queue = $queue_pool[0];
+
+                       try {
+                               $correlation_id = 'globalcollect-' . 
$message['gateway_txn_id'];
+                               if ( array_key_exists( $correlation_id, 
$this->handled_ids ) ) {
+                                       continue;
+                               }
+
+                               // Check the timestamp to see if it's old 
enough, and stop when
+                               // we're below the threshold.  Messages are 
guaranteed to pop in
+                               // chronological order.
+                               $elapsed = $this->start_time - $message['date'];
+                               if ( $elapsed < $time_buffer ) {
+                                       // Put it back!
+                                       DonationQueue::instance()->set( 
$correlation_id, $message, $current_queue );
+                                       break;
+                               }
+
+                               // We got ourselves an orphan!
+                               $order_id = explode('-', $correlation_id);
+                               $order_id = $order_id[1];
+                               $message['order_id'] = $order_id;
+                               $message = unCreateQueueMessage($message);
+                               $orphans[$correlation_id] = $message;
+                               $this->logger->info( "Found an orphan! 
$correlation_id" );
+
+                               $this->deleteMessage( $correlation_id, 
$current_queue );
+                       } catch ( ConnectionException $ex ) {
+                               // Drop this server, for the batch lifetime.
+                               array_shift( $queue_pool );
                        }
 
-                       // Check the timestamp to see if it's old enough, and 
stop when
-                       // we're below the threshold.  Messages are guaranteed 
to pop in
-                       // chronological order.
-                       $elapsed = $this->start_time - $message['date'];
-                       if ( $elapsed < $time_buffer ) {
-                               // Put it back!
-                               DonationQueue::instance()->set( 
$correlation_id, $message, GlobalCollectAdapter::GC_CC_LIMBO_QUEUE );
-                               break;
+                       // Round-robin the pool.
+                       if ( count( $queue_pool ) > 1 ) {
+                           $rotate_elem = array_shift( $queue_pool );
+                           array_push( $queue_pool, $rotate_elem );
                        }
-
-                       // We got ourselves an orphan!
-                       $order_id = explode('-', $correlation_id);
-                       $order_id = $order_id[1];
-                       $message['order_id'] = $order_id;
-                       $message = unCreateQueueMessage($message);
-                       $orphans[$correlation_id] = $message;
-                       $this->logger->info( "Found an orphan! $correlation_id" 
);
 
                        // TODO: stop stomping
                        $this->addStompCorrelationIDToAckBucket( 
$correlation_id );

-- 
To view, visit https://gerrit.wikimedia.org/r/226948
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I50939cf574ef4c585cf2156692c0f1df8472c91f
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/DonationInterface
Gerrit-Branch: master
Gerrit-Owner: Awight <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to