Awight has uploaded a new change for review.
https://gerrit.wikimedia.org/r/203000
Change subject: WIP more DonationQueue work
......................................................................
WIP more DonationQueue work
* Discriminate between FIFO and key-value queue access, which allows us to stop
using antimessages.
* Scrap the STOMP hooks, and toggle on EnableStomp global exclusively.
* Start cleaning up the orphan slayer and refactoring to use DonationQueue.
Change-Id: Id40e497b35f5dd1e0cf148223c7b11b211ea27fc
---
M activemq_stomp/activemq_stomp.php
M gateway_common/DonationQueue.php
M gateway_common/gateway.adapter.php
M globalcollect_gateway/globalcollect.adapter.php
M globalcollect_gateway/scripts/orphan_adapter.php
M globalcollect_gateway/scripts/orphans.php
6 files changed, 135 insertions(+), 283 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/DonationInterface refs/changes/00/203000/1
diff --git a/activemq_stomp/activemq_stomp.php b/activemq_stomp/activemq_stomp.php
index 4c0f2a9..11f71a1 100644
--- a/activemq_stomp/activemq_stomp.php
+++ b/activemq_stomp/activemq_stomp.php
@@ -50,46 +50,18 @@
* nothing exploded big enough to kill the whole thing.
*/
function sendSTOMP( $transaction, $queue ) {
+ // FIXME: Switching queueing style on this archaic, magic STOMP key
name should be deprecated.
+ if ( array_key_exists( 'correlation-id', $transaction ) ) {
+ DonationLoggerFactory::getLogger()->info(
/*{$logPrefix}*/"Queueing message at key [{$transaction['correlation-id']}],
onto queue {$queue}" );
+ $queue->set( $properties['correlation-id'], $message,
$properties );
+ } else {
+ DonationLoggerFactory::getLogger()->info(
/*{$logPrefix}*/"Queueing message onto {$queue}" );
+ $queue->push( $message, $properties );
+ }
+
DonationQueue::sendMessage( $transaction, $queue );
return true;
-}
-
-/**
- * Hook to send transaction information to ActiveMQ server
- * @deprecated Use sendSTOMP with $queue = 'pending' instead
- *
- * @param array $transaction Key-value array of staged and ready donation data.
- * @return bool Just returns true all the time. Presumably an indication that
- * nothing exploded big enough to kill the whole thing.
- */
-function sendPendingSTOMP( $transaction ) {
- return sendSTOMP( $transaction, 'pending' );
-}
-
-/**
- * Hook to send transaction information to ActiveMQ server
- * @deprecated Use sendSTOMP with $queue = 'limbo' instead
- *
- * @param array $transaction Key-value array of staged and ready donation data.
- * @return bool Just returns true all the time. Presumably an indication that
- * nothing exploded big enough to kill the whole thing.
- */
-function sendLimboSTOMP( $transaction ) {
- return sendSTOMP( $transaction, 'limbo' );
-}
-
-/**
- * Hook to send transaction information to ActiveMQ server
- * @deprecated Use sendSTOMP with $queue = 'limbo' instead
- *
- * @param array $transaction Key-value array of staged and ready donation data.
- * @return bool Just returns true all the time. Presumably an indication that
- * nothing exploded big enough to kill the whole thing.
- */
-function sendFreeformSTOMP( $transaction, $queue ) {
- $transaction['freeform'] = true;
- return sendSTOMP( $transaction, $queue );
}
/**
diff --git a/gateway_common/DonationQueue.php b/gateway_common/DonationQueue.php
index c3d6f6a..8b80dfa 100644
--- a/gateway_common/DonationQueue.php
+++ b/gateway_common/DonationQueue.php
@@ -1,40 +1,50 @@
<?php
class DonationQueue {
+ protected function __construct() {
+ }
+
/**
- * TODO: Move dispatch logic out of the build-* functions.
- * TODO: Calling code should specify either set() or push(), the
correlation-id
- * condition is to support legacy callers, that assume STOMP.
+ * Singleton entrypoint
+ *
+ * @return DonationQueue
+ * FIXME: I don't like this function name, and now neither do you.
+ */
+ public static function getQueue() {
+ return new DonationQueue();
+ }
+
+ /**
+ *
+ * TODO: Move dispatch logic out of the build- functions.
*/
public static function sendMessage( $transaction, $queue ) {
+ if ( !GatewayAdapter::getGlobal( 'EnableStomp' ) ) {
+ return;
+ }
+
// FIXME: Let's get rid of the payment_method magic, so we
don't need to
// peek at $transaction to map queue names. Shouldn't the
caller be more specific?
$rawQueue = DonationQueue::buildRawQueueName( $transaction,
$queue );
$queue = DonationQueue::newBackend( $queue, array( 'queue' =>
$rawQueue ) );
$properties = DonationQueue::buildHeaders( $transaction );
- $message = DonationQueue::buildMessage( $transaction );
+ $message = DonationQueue::buildBody( $transaction );
// FIXME: We really do want the prefix to be applied
transparently.
$logPrefix = "{$transaction['gateway']}
{$transaction['contribution_tracking_id']}:";
if ( array_key_exists( 'gateway_txn_id', $transaction ) ) {
$logPrefix .= $transaction['gateway_txn_id'];
}
-
- // FIXME: Switching insert style is a legacy thing.
- if ( array_key_exists( 'correlation-id', $properties ) ) {
- DonationLoggerFactory::getLogger()->info( "$logPrefix
Queueing message at key [{$properties['correlation-id']}], onto queue
{$rawQueue}" );
- $queue->set( $properties['correlation-id'], $message,
$properties );
- } else {
- DonationLoggerFactory::getLogger()->info( "$logPrefix
Queueing message onto {$rawQueue}" );
- $queue->push( $message, $properties );
- }
}
/**
+ * Build message headers from given donation data array
+ *
+ * @param array $transaction
* @return array
*/
- protected static function buildHeaders($transaction) {
+ protected function buildHeaders( $transaction ) {
global $IP;
static $sourceRevision = null;
@@ -66,7 +76,7 @@
$properties['antimessage'] = 'true';
}
- // TODO: Deprecated: This should be a set() operation instead.
+ // TODO: Deprecated, use set() directly instead.
if ( array_key_exists( 'correlation-id', $transaction ) ) {
$properties['correlation-id'] =
$transaction['correlation-id'];
}
@@ -74,22 +84,33 @@
return $properties;
}
- protected static function buildMessage( $transaction ) {
+ /**
+ * Build a body string, given a donation data array
+ *
+ * @param array $transaction
+ *
+ * @return string Message body
+ */
+ protected function buildBody( $transaction ) {
if ( array_key_exists( 'freeform', $transaction ) &&
$transaction['freeform'] ) {
- return json_encode( $transaction );
+ $data = $transaction;
+ } elseif ( array_key_exists( 'antimessage', $transaction ) &&
$transaction['antimessage'] ) {
+ $data = '';
+ } else {
+ // Assume anything else is a regular donation.
+ $data = DonationQueue::buildTransactionMessage(
$transaction );
}
- if ( array_key_exists( 'antimessage', $transaction ) &&
$transaction['antimessage'] ) {
- return '';
- }
- return DonationQueue::buildTransactionMessage( $transaction );
+ return json_encode( $data );
}
/**
+ * Construct a new backend queue object
+ *
* @param string $name Queue identifier
* @param array $overrides Additional values to pass to the backend's
constructor.
* @return \PHPQueue\Interfaces\FifoQueueStore and
\PHPQueue\Interfaces\KeyValueStore
*/
- protected static function newBackend( $name, $options = array() ) {
+ protected function newBackend( $name, $options = array() ) {
global $wgDonationInterfaceDefaultQueueServer,
$wgDonationInterfaceQueues;
// Build the configuration.
@@ -129,7 +150,7 @@
* Order ID (generated with transaction) is assigned to
'contribution_tracking_id'?
* Response from processor is assigned to 'response'
*/
- protected static function buildTransactionMessage( $transaction ) {
+ protected function buildTransactionMessage( $transaction ) {
// specifically designed to match the CiviCRM API that will
handle it
// edit this array to include/ignore transaction data sent to
the server
$message = array(
@@ -157,7 +178,7 @@
'user_ip' => $transaction['user_ip'],
//the following int casting fixes an issue that is more
in Drupal/CiviCRM than here.
//The code there should also be fixed.
- 'date' => ( int ) $transaction['date'],
+ 'date' => (int)$transaction['date'],
);
//optional keys
@@ -170,12 +191,12 @@
'utm_medium',
);
foreach ( $optional_keys as $key ) {
- if ( isset( $transaction[ $key ] ) ) {
- $message[ $key ] = $transaction[ $key ];
+ if ( isset( $transaction[$key] ) ) {
+ $message[$key] = $transaction[$key];
}
}
- //as this is just the one thing, I can't think of a way to do
this that isn't actually more annoying. :/
+ // FIXME: as this is just the one thing, I can't think of a way
to do this that isn't actually more annoying. :/
if ( isset( $message['street_supplemental'] ) ) {
$message['supplemental_address_1'] =
$message['street_supplemental'];
unset( $message['street_supplemental'] );
@@ -184,11 +205,11 @@
return $message;
}
- public static function get( $queue, $key ) {
+ public function get( $queue, $key ) {
return DonationQueue::getQueue( $queue )->get( $key );
}
- public static function pop( $queue ) {
+ public function pop( $queue ) {
return DonationQueue::getQueue( $queue )->pop();
}
@@ -201,7 +222,7 @@
*
* @return string Mapped, physical queue name.
*/
- public static function buildRawQueueName( $transaction, $queue ) {
+ public function buildRawQueueName( $transaction, $queue ) {
global $wgStompQueueNames;
$methodSpecificQueueName = $transaction['payment_method'] . '-'
. $queue;
diff --git a/gateway_common/gateway.adapter.php b/gateway_common/gateway.adapter.php
index 7da1270..39ac327 100644
--- a/gateway_common/gateway.adapter.php
+++ b/gateway_common/gateway.adapter.php
@@ -1742,6 +1742,7 @@
* @param string $transaction
* @param string $key The key
to lookup in the transaction such as STATUSID
* @param integer|string $code This gets
converted to an integer if the values is numeric.
+ * FIXME: We should be pulling $code out of the current transaction
fields, internally.
*
* @return null|string Returns the code action if a valid code
is supplied. Otherwise, the return is null.
*/
@@ -1803,84 +1804,42 @@
* through performing the current transaction.
* To put it another way, getFinalStatus should always return
* false, unless it's new data about a new transaction. In that case,
the
- * outcome will be assigned and the proper stomp hook selected.
+ * outcome will be assigned and the proper queue selected.
*
* Probably called in runPostProcessHooks(), which is itself most
likely to
* be called through executeFunctionIfExists, later on in
do_transaction.
- * @return null
*/
protected function doStompTransaction() {
- if ( !$this->getGlobal( 'EnableStomp' ) ){
- return;
- }
-
- // send the thing.
- $transaction = $this->getStompTransaction();
-
- $queue = 'default';
-
$status = $this->getFinalStatus();
switch ( $status ) {
case 'complete':
- $queue = 'default';
+ $this->pushMessage( 'complete' );
break;
case 'pending':
case 'pending-poke':
- $queue = 'pending';
+ // FIXME: I don't understand what the pending
queue does.
+ $this->pushMessage( 'pending' );
break;
default:
// No action
- $this->logger->info( "STOMP transaction has no
place to go for status $status. This is probably completely normal." );
- return;
- }
-
- try {
- WmfFramework::runHooks( 'gwStomp', array( $transaction,
$queue ) );
- } catch ( Exception $e ) {
- $this->logger->critical( "STOMP ERROR. Could not add
message to '{$queue}' queue: {$e->getMessage()} " . json_encode( $transaction )
);
- }
- }
-
-
- /**
- * Function that adds a stomp message to a special 'limbo' queue, for
data
- * that is either highly likely or completely guaranteed to be
bifurcated by
- * handing the ball to a third-party process.
- *
- * @param bool $antiMessage If TRUE message will be formatted to
destroy a message in the limbo
- * queue when the orphan slayer is run.
- *
- * @return null
- */
- protected function doLimboStompTransaction( $antiMessage = false ) {
- if ( !$this->getGlobal( 'EnableStomp' ) ){
- return;
- }
-
- $this->debugarray[] = "Attempting Limbo Stomp Transaction!";
-
- $transaction = $this->getStompTransaction( $antiMessage );
-
- try {
- WmfFramework::runHooks( 'gwStomp', array( $transaction,
'limbo' ) );
- } catch ( Exception $e ) {
- $this->logger->critical( "STOMP ERROR. Could not add
message to 'limbo' queue: {$e->getMessage()} " . json_encode( $transaction ) );
+ $this->logger->info( "Not sending queue message
for status {$status}." );
}
}
/**
* Formats an array in preparation for dispatch to a STOMP queue
*
- * @param bool $antiMessage If TRUE, message will be prepared to destroy
* @param bool $recoverTimestamp If TRUE the timestamp will be set to
any recoverable timestamp
* from the transaction. If it cannot be recovered or this argument is
false, it will take the
* current time.
*
* @return array Pass this return array to STOMP :)
+ *
+ * TODO: Stop saying "STOMP".
*/
- protected function getStompTransaction( $antiMessage = false,
$recoverTimestamp = false ) {
+ protected function getStompTransaction( $recoverTimestamp = false ) {
$transaction = array(
'gateway_txn_id' => $this->getTransactionGatewayTxnID(),
'payment_method' => $this->getData_Unstaged_Escaped(
'payment_method' ),
@@ -1890,33 +1849,29 @@
'gateway' => $this->getData_Unstaged_Escaped( 'gateway'
),
);
- if ( $antiMessage == true ) {
- // As anti-messages only exist to destroy messages all
we need is the identifier
- $transaction['antimessage'] = 'true';
- } else {
- // Else we actually need the rest of the data
- $stomp_data = array_intersect_key(
- $this->getData_Unstaged_Escaped(),
- array_flip(
$this->dataObj->getStompMessageFields() )
- );
+ // Else we actually need the rest of the data
+ $stomp_data = array_intersect_key(
+ $this->getData_Unstaged_Escaped(),
+ array_flip( $this->dataObj->getStompMessageFields() )
+ );
- // The order here is important, values in $transaction
are considered more definitive
- // in case the transaction already had keys with those
values
- $transaction = array_merge( $stomp_data, $transaction );
+ // The order here is important, values in $transaction are
considered more definitive
+ // in case the transaction already had keys with those values
+ $transaction = array_merge( $stomp_data, $transaction );
- // And now determine the date; which is annoyingly not
as easy as one would like it
- // if we're attempting to recover some data: ie: we're
an orphan
- $timestamp = null;
- if ( $recoverTimestamp === true ) {
- if ( !is_null( $this->getData_Unstaged_Escaped(
'date' ) ) ) {
- $timestamp =
$this->getData_Unstaged_Escaped( 'date' );
- } elseif ( !is_null(
$this->getData_Unstaged_Escaped( 'ts' ) ) ) {
- // That this works is mildly surprising
- $timestamp = strtotime(
$this->getData_Unstaged_Escaped( 'ts' ) );
- }
+ // And now determine the date; which is annoyingly not as easy
as one would like it
+ // if we're attempting to recover some data: ie: we're an orphan
+ // FIXME: Can't we make this the default?
+ $timestamp = null;
+ if ( $recoverTimestamp === true ) {
+ if ( !is_null( $this->getData_Unstaged_Escaped( 'date'
) ) ) {
+ $timestamp = $this->getData_Unstaged_Escaped(
'date' );
+ } elseif ( !is_null( $this->getData_Unstaged_Escaped(
'ts' ) ) ) {
+ // That this works is mildly surprising
+ $timestamp = strtotime(
$this->getData_Unstaged_Escaped( 'ts' ) );
}
- $transaction['date'] = ( $timestamp === null ) ? time()
: $timestamp;
}
+ $transaction['date'] = ( $timestamp === null ) ? time() :
$timestamp;
return $transaction;
}
@@ -2526,7 +2481,20 @@
protected function runPostProcessHooks() {
// expose a hook for any post processing
WmfFramework::runHooks( 'GatewayPostProcess', array( &$this ) );
+
$this->doStompTransaction();
+ }
+
+ protected function pushMessage( $queue ) {
+ DonationQueue::getQueue()->push( $this->buildMessage(), $queue
);
+ }
+
+ protected function setLimboMessage( $queue ) {
+ DonationQueue::getQueue()->set( $this->buildMessage(), $queue );
+ }
+
+ protected function deleteLimboMessage( $queue ) {
+ DonationQueue::getQueue()->delete( $this->getCorrelationID() );
}
/**
@@ -2634,11 +2602,7 @@
* @return type
*/
public function isBatchProcessor(){
- if (!property_exists($this, 'batch')){
- return false;
- } else {
- return $this->batch;
- }
+ return $this->batch;
}
/**
diff --git a/globalcollect_gateway/globalcollect.adapter.php b/globalcollect_gateway/globalcollect.adapter.php
index 4b18945..af6feb3 100644
--- a/globalcollect_gateway/globalcollect.adapter.php
+++ b/globalcollect_gateway/globalcollect.adapter.php
@@ -1177,7 +1177,7 @@
$this->logger->info( $logmsg );
//add an antimessage for everything but orphans
$this->logger->info( 'Adding Antimessage' );
- $this->doLimboStompTransaction( true );
+ $this->dropLimboMessage();
} else { //this is an orphan transaction.
$is_orphan = true;
//have to change this code range: All these are usually
"pending" and
@@ -1351,8 +1351,7 @@
$this->finalizeInternalStatus(
'complete' );
//get the old status from the first
txn, and add in the part where we set the payment.
$this->setTransactionResult( "Original
Response Status (pre-SET_PAYMENT): " . $original_status_code, 'txn_message' );
- $this->runPostProcessHooks(); //stomp
is in here
- $add_antimessage = true; //TODO: use or
remove
+ $this->runPostProcessHooks(); //
Queueing is in here.
} else {
$this->finalizeInternalStatus( 'failed'
);
$problemflag = true;
@@ -1436,12 +1435,14 @@
if (isset($result['status']) &&
$result['status'] === true)
{
$this->finalizeInternalStatus(
'complete' );
- $this->doLimboStompTransaction(
true );
} else {
$this->finalizeInternalStatus(
'failed' );
//get the old status from the
first txn, and add in the part where we set the payment.
$this->setTransactionResult(
"Original Response Status (pre-SET_PAYMENT): " . $original_status_code,
'txn_message' );
}
+
+ // We won't need the limbo message
again, either way, so cancel it.
+ $this->dropLimboMessage();
}
}
}
@@ -2361,11 +2362,11 @@
*/
protected function post_process_insert_orderwithpayment(){
//yeah, we absolutely want to do this for every one of these.
- if ( $this->getTransactionStatus() === true ) {
+ if ( $this->getTransactionStatus() ) {
$data = $this->getTransactionData();
$action = $this->findCodeAction( 'GET_ORDERSTATUS',
'STATUSID', $data['STATUSID'] );
- if ($action != 'failed'){
- $this->doLimboStompTransaction();
+ if ( $action !== 'failed' ) {
+ $this->pushLimboMessage();
}
}
}
diff --git a/globalcollect_gateway/scripts/orphan_adapter.php b/globalcollect_gateway/scripts/orphan_adapter.php
index 435bc93..48399ec 100644
--- a/globalcollect_gateway/scripts/orphan_adapter.php
+++ b/globalcollect_gateway/scripts/orphan_adapter.php
@@ -14,6 +14,7 @@
parent::__construct( $options = array ( ) );
}
+ // FIXME: Get rid of this.
public function unstage_data( $data = array( ), $final = true ) {
$unstaged = array( );
foreach ( $data as $key => $val ) {
@@ -31,6 +32,7 @@
}
}
if ( $final ) {
+ // FIXME
$this->stageData( 'response' );
}
foreach ( $unstaged as $key => $val ) {
@@ -144,57 +146,6 @@
//if we got here, we can't find anything else...
$this->logger->error( "$ctid: FAILED to find UTM Source value.
Using default." );
return $data;
- }
-
- /**
- * Copying this here because it's the fastest way to bring in an actual
timestamp.
- */
- protected function doStompTransaction() {
- if ( !$this->getGlobal( 'EnableStomp' ) ) {
- return;
- }
- $this->debugarray[] = "Attempting Stomp Transaction!";
- $hook = '';
-
- $status = $this->getFinalStatus();
- switch ( $status ) {
- case 'complete':
- $hook = 'gwStomp';
- break;
- case 'pending':
- case 'pending-poke':
- $hook = 'gwPendingStomp';
- break;
- }
- if ( $hook === '' ) {
- $this->debugarray[] = "No Stomp Hook Found for
FINAL_STATUS $status";
- return;
- }
-
- if ( !is_null( $this->getData_Unstaged_Escaped( 'date' ) ) ) {
- $timestamp = $this->getData_Unstaged_Escaped( 'date' );
- } else {
- if ( !is_null( $this->getData_Unstaged_Escaped( 'ts' )
) ) {
- $timestamp = strtotime(
$this->getData_Unstaged_Escaped( 'ts' ) ); //I hate that this works.
- } else {
- $timestamp = time();
- }
- }
-
- // send the thing.
- $transaction = array(
- 'response' => $this->getTransactionMessage(),
- 'date' => $timestamp,
- 'gateway_txn_id' => $this->getTransactionGatewayTxnID(),
- //'language' => '',
- );
- $transaction += $this->getData_Unstaged_Escaped();
-
- try {
- WmfFramework::runHooks( $hook, array( $transaction ) );
- } catch ( Exception $e ) {
- $this->logger->critical( "STOMP ERROR. Could not add
message. " . $e->getMessage() );
- }
}
/**
diff --git a/globalcollect_gateway/scripts/orphans.php b/globalcollect_gateway/scripts/orphans.php
index 3e53492..c05910b 100644
--- a/globalcollect_gateway/scripts/orphans.php
+++ b/globalcollect_gateway/scripts/orphans.php
@@ -74,25 +74,9 @@
//Then, we go back and pull more... and that same one is in the
list again. We should stop after one try per message per execute.
//We should also be smart enough to not process things we
believe we just deleted.
$this->handled_ids = array();
-
- //first, we need to... clean up the limbo queue.
-
- //building in some redundancy here.
- $collider_keepGoing = true;
- $am_called_count = 0;
- while ( $collider_keepGoing ){
- $antimessageCount = $this->handleStompAntiMessages();
- $am_called_count += 1;
- if ( $antimessageCount < 10 ){
- $collider_keepGoing = false;
- } else {
- sleep(2); //two seconds.
- }
- }
- $this->logger->info( 'Removed ' . $this->removed_message_count
. ' messages and antimessages.' );
-
+
if ( $this->keepGoing() ){
- //Pull a batch of CC orphans, keeping in mind that
Things May Have Happened in the small slice of time since we handled the
antimessages.
+ //Pull a batch of CC orphans
$orphans = $this->getStompOrphans();
while ( count( $orphans ) && $this->keepGoing() ){
echo count( $orphans ) . " orphans left this
batch\n";
@@ -100,6 +84,7 @@
foreach ( $orphans as $correlation_id =>
$orphan ) {
//process
if ( $this->keepGoing() ){
+ // TODO: Maybe we can simplify
by checking that modified time < job start time.
echo "Attempting to rectify
orphan $correlation_id\n";
if ( $this->rectifyOrphan(
$orphan ) ){
$this->addStompCorrelationIDToAckBucket( $correlation_id );
@@ -125,9 +110,6 @@
$fe = 0;
foreach( $this->handled_ids as $id=>$whathappened ){
switch ( $whathappened ){
- case 'antimessage' :
- $am += 1;
- break;
case 'rectified' :
$rec += 1;
break;
@@ -140,7 +122,6 @@
}
}
$final = "\nDone! Final results: \n";
- $final .= " $am destroyed via antimessage (called
$am_called_count times) \n";
$final .= " $rec rectified orphans \n";
$final .= " $err errored out \n";
$final .= " $fe false orphans caught \n";
@@ -174,6 +155,7 @@
return $elapsed;
}
+ // TODO: Split into add and ack functions.
function addStompCorrelationIDToAckBucket( $correlation_id, $ackNow =
false ){
static $bucket = array();
$count = 50; //sure. Why not?
@@ -199,78 +181,39 @@
}
- function handleStompAntiMessages(){
- $selector = "antimessage = 'true' AND gateway='globalcollect'";
- $antimessages = stompFetchMessages( 'cc-limbo', $selector, 1000
);
- $count = 0;
- while ( count( $antimessages ) > 10 && $this->keepGoing() ){
//if there's an antimessage, we can ack 'em all right now.
- echo "Colliding " . count( $antimessages ) . "
antimessages\n";
- $count += count( $antimessages );
- foreach ( $antimessages as $message ){
- //add the correlation ID to the ack bucket.
- if (array_key_exists('correlation-id',
$message->headers)) {
-
$this->addStompCorrelationIDToAckBucket( $message->headers['correlation-id'] );
- } else {
- echo 'The STOMP message ' .
$message->headers['message-id'] . " has no correlation ID!\n";
- }
- }
- $this->addStompCorrelationIDToAckBucket( false, true );
//ack all outstanding.
- $antimessages = stompFetchMessages( 'cc-limbo',
$selector, 1000 );
- }
- $this->addStompCorrelationIDToAckBucket( false, true ); //this
just acks everything that's waiting for it.
- $this->logger->info( "Found $count antimessages." );
- return $count;
- }
-
/**
* Returns an array of **at most** 300 decoded orphans that we don't
think we've rectified yet.
* @return array keys are the correlation_id, and the values are the
decoded stomp message body.
*/
function getStompOrphans(){
+ // FIXME: Expiration should be set in configuration, and
enforced by
+ // the queue's native expiry anyway.
$time_buffer = 60*20; //20 minutes? Sure. Why not?
$selector = "payment_method = 'cc' AND gateway='globalcollect'";
echo "Fetching 300 Orphans\n";
$messages = stompFetchMessages( 'cc-limbo', $selector, 300 );
$orphans = array();
- $false_orphans = array();
foreach ( $messages as $message ){
- //this next block will do quite a lot of antimessage
collision
- //when the queue is not being railed.
- if ( array_key_exists('antimessage', $message->headers
) ){
- $correlation_id =
$message->headers['correlation-id'];
- $false_orphans[] = $correlation_id;
- echo "False Orphan! $correlation_id \n";
- } else {
- //legit message
- if ( !array_key_exists(
$message->headers['correlation-id'], $this->handled_ids ) ) {
- //check the timestamp to see if it's
old enough.
- $decoded = json_decode($message->body,
true);
- if ( array_key_exists( 'date', $decoded
) ){
- $elapsed = $this->start_time -
$decoded['date'];
- if ( $elapsed > $time_buffer ){
- //we got ourselves an
orphan!
- $correlation_id =
$message->headers['correlation-id'];
- $order_id =
explode('-', $correlation_id);
- $order_id =
$order_id[1];
- $decoded['order_id'] =
$order_id;
- $decoded =
unCreateQueueMessage($decoded);
- $decoded['card_num'] =
'';
-
$orphans[$correlation_id] = $decoded;
- echo "Found an orphan!
$correlation_id \n";
- }
+ if ( !array_key_exists(
$message->headers['correlation-id'], $this->handled_ids ) ) {
+ //check the timestamp to see if it's old
enough.
+ $decoded = json_decode($message->body, true);
+ if ( array_key_exists( 'date', $decoded ) ){
+ $elapsed = $this->start_time -
$decoded['date'];
+ if ( $elapsed > $time_buffer ){
+ //we got ourselves an orphan!
+ $correlation_id =
$message->headers['correlation-id'];
+ $order_id = explode('-',
$correlation_id);
+ $order_id = $order_id[1];
+ $decoded['order_id'] =
$order_id;
+ $decoded =
unCreateQueueMessage($decoded);
+ $decoded['card_num'] = '';
+ $orphans[$correlation_id] =
$decoded;
+ echo "Found an orphan!
$correlation_id \n";
}
}
}
}
-
- foreach ( $orphans as $cid => $data ){
- if ( in_array( $cid, $false_orphans ) ){
- unset( $orphans[$cid] );
- $this->addStompCorrelationIDToAckBucket( $cid );
- $this->handled_ids[ $cid ] = 'false_orphan';
- }
- }
-
+
return $orphans;
}
@@ -343,7 +286,7 @@
}
/**
- * Uses the Orphan Adapter to rectify a single orphan. Returns a
boolean letting the caller know if
+ * Uses the Orphan Adapter to rectify (complete the charge for) a
single orphan. Returns a boolean letting the caller know if
* the orphan has been fully rectified or not.
* @param array $data Some set of orphan data.
* @param boolean $query_contribution_tracking A flag specifying if we
should query the contribution_tracking table or not.
--
To view, visit https://gerrit.wikimedia.org/r/203000
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Id40e497b35f5dd1e0cf148223c7b11b211ea27fc
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/DonationInterface
Gerrit-Branch: master
Gerrit-Owner: Awight <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits