Mwalker has submitted this change and it was merged.
Change subject: (FR #989) recurring error handling
......................................................................
(FR #989) recurring error handling
Reject or requeue bad messages.
We look up damaged messages by JMSMessageID, because we cannot make any
assumptions about the message structure.
Change-Id: I36c166445b7e4c0e943dd62e33e9b0d66e5ceda5
---
M sites/all/modules/queue2civicrm/queue2civicrm.info
M sites/all/modules/queue2civicrm/recurring/recurring.module
M sites/all/modules/wmf_common/Queue.php
M sites/all/modules/wmf_common/wmf_civicrm/wmf_civicrm.info
M sites/all/modules/wmf_common/wmf_common.module
5 files changed, 71 insertions(+), 91 deletions(-)
Approvals:
Mwalker: Looks good to me, approved
jenkins-bot: Verified
diff --git a/sites/all/modules/queue2civicrm/queue2civicrm.info
b/sites/all/modules/queue2civicrm/queue2civicrm.info
index 211fff1..1eff9c2 100644
--- a/sites/all/modules/queue2civicrm/queue2civicrm.info
+++ b/sites/all/modules/queue2civicrm/queue2civicrm.info
@@ -3,7 +3,7 @@
core = 7.x
configure = admin/config/queue2civicrm
dependencies[] = exchange_rates
-dependencies[] = civicrm
+dependencies[] = wmf_civicrm
package = queue2civicrm
files[] = Queue2civicrmTrxnCounter.php
files[] = Stomp.php
diff --git a/sites/all/modules/queue2civicrm/recurring/recurring.module
b/sites/all/modules/queue2civicrm/recurring/recurring.module
index d07a218..52983be 100644
--- a/sites/all/modules/queue2civicrm/recurring/recurring.module
+++ b/sites/all/modules/queue2civicrm/recurring/recurring.module
@@ -103,7 +103,7 @@
);
if ($recurring_processed > 0) {
- watchdog('recurring', 'Successfully processed ' . $recurring_processed . '
subscription messages.');
+ watchdog('recurring', 'Processed ' . $recurring_processed . ' subscription
messages.');
}
else {
watchdog('recurring', 'No contributions processed.');
@@ -124,22 +124,20 @@
$msg_orig = $msg;
$q = queue2civicrm_stomp();
try {
- if (recurring_import($msg) === 1) {
- // only ack and remove from queue on explicit success
+ recurring_import($msg);
+ $q->ack($msg_orig);
+ } catch ( RequeueError $ex ) {
+ // We failed to process the message for some reason or another
+ // So... we queue a new message (the original plus a time delay and count
header)
+ // and ACK the original
+ $ret = $q->requeueWithDelay($msg_orig);
+
+ if ($ret) {
$q->ack($msg_orig);
} else {
- // We failed to process the message for some reason or another
- // So... we queue a new message (the original plus a time delay and
count header)
- // and ACK the original
- $ret = $q->requeueWithDelay($msg_orig);
-
- if ($ret) {
- $q->ack($msg_orig);
- } else {
- throw new WmfException("STOMP_BAD_CONNECTION", "Failed to requeue a
recurring message");
- }
+ throw new WmfException("STOMP_BAD_CONNECTION", "Failed to requeue a
recurring message");
}
- } catch (WmfException $ex) {
+ } catch ( WmfException $ex ) {
watchdog('recurring', 'something went wrong during import: ' .
$ex->getMessage(), NULL, WATCHDOG_ERROR);
if ($ex->isRejectMessage()) {
@@ -173,13 +171,7 @@
/**
* Import queued message contents to CiviCRM
*
- * Return status codes:
- * 0 = processing error
- * 1 = fully processed, ready for removal
- * 2 = not currently processable, return to queue
- *
* @param $msg
- * @return unknown_type
*/
function recurring_import( $msg ) {
global $txn_subscr_payment, $txn_subscr_acct;
@@ -226,12 +218,11 @@
watchdog( 'recurring', 'Duplicate contribution: ' . print_r( $msg, true
) );
throw new WmfException( 'DUPLICATE_CONTRIBUTION', "Contribution already
exists. Ignoring message." );
}
- $ret_val = recurring_import_subscr_payment( $msg );
+ recurring_import_subscr_payment( $msg );
} elseif ( isset( $msg[ 'txn_type' ] ) && in_array( $msg[ 'txn_type' ],
$txn_subscr_acct ) ) {
- $ret_val = recurring_import_subscr_acct( $msg );
+ recurring_import_subscr_acct( $msg );
} else {
- watchdog( 'recurring', 'Msg not recognized as a recurring payment related
message.' );
- $ret_val = 0;
+ throw new WmfException( 'INVALID_RECURRING', 'Msg not recognized as a
recurring payment related message.' );
}
// update the log
@@ -241,15 +232,12 @@
$log[ 'verified' ] = 1;
_queue2civicrm_log( $log );
}
-
- return $ret_val;
}
/**
* Import a recurring payment
*
* @param array $msg
- * @return int
*/
function recurring_import_subscr_payment( $msg ) {
/**
@@ -261,11 +249,10 @@
* otherwise, process the payment.
*/
if ( !isset( $msg[ 'subscr_id' ] ) ) {
- watchdog( 'recurring', 'Msg missing the subscr_id; cannot process.');
- return 0;
+ throw new WmfException( 'INVALID_RECURRING', 'Msg missing the subscr_id;
cannot process.' );
} elseif ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ]
) ) { // check for parent record in civicrm_contribution_recur and fetch its id
watchdog( 'recurring', 'Msg does not have a matching recurring record in
civicrm_contribution_recur; requeueing for future processing.' );
- return 2;
+ throw new RequeueError();
}
civicrm_initialize(true);
@@ -322,8 +309,6 @@
// Send thank you email, other post-import things
module_invoke_all( 'queue2civicrm_import', $contribution_info );
-
- return 1;
}
/**
@@ -333,50 +318,43 @@
* function.
*
* @param $msg
- * @return int
*/
function recurring_import_subscr_acct( $msg ) {
switch ( $msg[ 'txn_type' ] ) {
case 'subscr_signup':
- $ret_val = recurring_import_subscr_signup( $msg );
+ recurring_import_subscr_signup( $msg );
break;
case 'subscr_cancel':
- $ret_val = recurring_import_subscr_cancel( $msg );
+ recurring_import_subscr_cancel( $msg );
break;
case 'subscr_eot':
- $ret_val = recurring_import_subscr_eot( $msg );
+ recurring_import_subscr_eot( $msg );
break;
case 'subscr_modify':
- $ret_val = recurring_import_subscr_modify( $msg );
+ recurring_import_subscr_modify( $msg );
break;
case 'subscr_failed':
- $ret_val = recurring_import_subscr_failed( $msg );
+ recurring_import_subscr_failed( $msg );
break;
default:
- watchdog( 'recurring', 'Invalid subscription message type: %msg', array(
'%msg' => print_r( $msg, true )), WATCHDOG_NOTICE );
- $ret_val = 0;
- break;
- }
-
- return $ret_val;
+ throw new WmfException( 'INVALID_RECURRING', 'Invalid subscription
message type' );
+ }
}
/**
* Import a subscription signup message
*
* @param $msg
- * @return int
*/
function recurring_import_subscr_signup( $msg ) {
// ensure there is not already a record of this account - if so, mark the
message as succesfuly processed
if ( $recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
- watchdog( 'recurring', 'Subscription account already exists: %msg', array(
'%msg' => print_r( $msg, true )), WATCHDOG_NOTICE );
- return 1;
+ throw new WmfException( 'DUPLICATE_CONTRIBUTION', 'Subscription account
already exists' );
}
// create contact record
@@ -401,14 +379,11 @@
'create_date' => wmf_common_date_unix_to_civicrm( $msg[ 'create_date' ] ),
'trxn_id' => $msg[ 'subscr_id' ],
) )->execute();
- $dbs->pop();
-
+
if ( !$result ) {
- watchdog( 'recurring', 'Failed inserting subscriber signup for subscriber
id: %subscr_id', array( '%subscr_id' => print_r( $msg['subscr_id'], true )),
WATCHDOG_NOTICE );
- return 0;
+ throw new WmfException( 'IMPORT_CONTRIB', 'Failed inserting subscriber
signup for subscriber id: ' . print_r( $msg['subscr_id'], true ));
} else {
watchdog( 'recurring', 'Succesfully inserted subscription signup for
subscriber id: %subscr_id ', array( '%subscr_id' => print_r( $msg[ 'subscr_id'
], true )), WATCHDOG_NOTICE );
- return 1;
}
}
@@ -416,13 +391,11 @@
* Process a subscriber cancellation
*
* @param array $msg
- * @return int
*/
function recurring_import_subscr_cancel( $msg ) {
// ensure we have a record of the subscription
if ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
- watchdog( 'recurring', 'Subscription account does not exist: %msg', array(
'%msg' => print_r( $msg, true )), WATCHDOG_NOTICE );
- return 2;
+ throw new WmfException( 'INVALID_RECURRING', 'Subscription account does
not exist' );
}
$dbs = wmf_civicrm_get_dbs();
@@ -431,14 +404,11 @@
'cancel_date' => wmf_common_date_unix_to_civicrm( $msg[ 'cancel_date' ] ),
'end_date' => wmf_common_date_unix_to_civicrm( $msg[ 'cancel_date' ] ),
) )->condition( 'trxn_id', $msg[ 'subscr_id' ] )->execute();
- $dbs->pop();
-
+
if ( !$result ) {
- watchdog( 'recurring', 'There was a problem updating the subscription for
cancelation for subscriber id: %subscr_id', array( '%subscr_id' => print_r(
$msg[ 'subscr_id' ], true )), WATCHDOG_NOTICE );
- return 0;
+ throw new WmfException( 'INVALID_RECURRING', 'There was a problem updating
the subscription for cancelation for subscriber id: ' . print_r( $msg[
'subscr_id' ], true ));
} else {
- watchdog( 'recurring', 'Succesfuly cacneled subscription for subscriber id
%subscr_id', array( '%subscr_id' => print_r( $msg[ 'subscr_id' ], true )),
WATCHDOG_NOTICE );
- return 1;
+ watchdog( 'recurring', 'Succesfuly cancelled subscription for subscriber
id %subscr_id', array( '%subscr_id' => print_r( $msg[ 'subscr_id' ], true )),
WATCHDOG_NOTICE );
}
}
@@ -446,13 +416,11 @@
* Process an expired subscription
*
* @param array $msg
- * @return int
*/
function recurring_import_subscr_eot( $msg ) {
// ensure we have a record of the subscription
if ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
- watchdog( 'recurring', 'Subscription account does not exist: %msg', array(
'%msg' => print_r( $msg, true )), WATCHDOG_NOTICE );
- return 2;
+ throw new WmfException( 'INVALID_RECURRING', 'Subscription account does
not exist' );
}
$dbs = wmf_civicrm_get_dbs();
@@ -463,11 +431,9 @@
$dbs->pop();
if ( !$result ) {
- watchdog( 'recurring', 'There was a problem updating the subscription for
EOT for subscription id: %subscr_id', array( '%subscr_id' => print_r( $msg[
'subscr_id' ], true )), WATCHDOG_NOTICE );
- return 0;
+ throw new WmfException( 'INVALID_RECURRING', 'There was a problem updating
the subscription for EOT for subscription id: %subscr_id' . print_r( $msg[
'subscr_id' ], true ));
} else {
watchdog( 'recurring', 'Succesfuly ended subscription for subscriber id:
%subscr_id ', array( '%subscr_id' => print_r( $msg[ 'subscr_id' ], true )),
WATCHDOG_NOTICE );
- return 1;
}
}
@@ -478,13 +444,11 @@
* unused.
*
* @param array $msg
- * @return int
*/
function recurring_import_subscr_modify( $msg ) {
// ensure we have a record of the subscription
if ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
- watchdog( 'recurring', 'Subscription account does not exist for
subscription id: %subscr_id', array( '%subscr_id' => print_r( $msg, true )),
WATCHDOG_NOTICE );
- return 2;
+ throw new WmfException( 'INVALID_RECURRING', 'Subscription account does
not exist for subscription id: ' . print_r( $msg, true ));
}
$dbs = wmf_civicrm_get_dbs();
@@ -500,8 +464,7 @@
$dbs->pop();
if ( !$result ) {
- watchdog( 'recurring', 'There was a problem updating the subscription
record for subscription id ', print_r( $msg['subscr_id'], true),
WATCHDOG_NOTICE );
- return 0;
+ throw new WmfException( 'INVALID_RECURRING', 'There was a problem updating
the subscription record for subscription id ' . print_r( $msg['subscr_id'],
true ) );
}
// update the contact
@@ -514,19 +477,16 @@
$tag = wmf_civicrm_tag_contact_for_review( $contact );
watchdog( 'recurring', 'Subscription succesfully modified for subscription
id: %subscr_id', array( '%subscr_id' => print_r( $msg['subscr_id'], true )),
WATCHDOG_NOTICE );
- return 1;
}
/**
* Process failed subscription payment
* @param $msg
- * @return unknown_type
*/
function recurring_import_subscr_failed( $msg ) {
// ensure we have a record of the subscription
if ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
- watchdog( 'recurring', 'Subscription account does not exist for
subscription: %subscription', array( "%subscription" => print_r( $msg, true )),
WATCHDOG_NOTICE );
- return 2;
+ throw new WmfException( 'INVALID_RECURRING', 'Subscription account does
not exist for subscription id: ' . print_r( $msg['subscr_id'], true ) );
}
$dbs = wmf_civicrm_get_dbs();
@@ -535,14 +495,11 @@
'failure_count' => $msg[ 'failure_count' ],
'failure_retry_date' => wmf_common_date_unix_to_civicrm( $msg[
'failure_retry_date' ] ),
) )->condition( 'trxn_id', $msg[ 'subscr_id' ] )->execute();
- $dbs->pop();
-
+
if ( !$result ) {
- watchdog( 'recurring', 'There was a problem updating the subscription for
failed payment for subscriber id: %subscr_id ', array( '%subscr_id' => print_r(
$msg[ 'subscr_id' ], true )), WATCHDOG_NOTICE );
- return 0;
+ throw new WmfException( 'INVALID_RECURRING', 'There was a problem updating
the subscription for failed payment for subscriber id: ' . print_r(
$msg['subscr_id'], true ) );
} else {
watchdog( 'recurring', 'Succesfuly cacneled subscription for failed
payment for subscriber id: %subscr_id ', array( '%subscr_id' => print_r( $msg[
'subscr_id' ], true )), WATCHDOG_NOTICE );
- return 1;
}
}
@@ -591,7 +548,7 @@
if ( isset( $msg['frequency_unit'] ) ) {
if ( !in_array( $msg['frequency_unit'], array( 'day', 'week', 'month',
'year' ) ) ) {
- throw new WmfException("INVALID_MESSAGE", "Bad frequency unit:
{$msg['frequency_unit']}");
+ throw new WmfException("INVALID_RECURRING", "Bad frequency unit:
{$msg['frequency_unit']}");
}
}
@@ -810,3 +767,6 @@
return null;
}
}
+
+class RequeueError extends Exception {
+}
diff --git a/sites/all/modules/wmf_common/Queue.php
b/sites/all/modules/wmf_common/Queue.php
index 835a7dd..862a6cb 100644
--- a/sites/all/modules/wmf_common/Queue.php
+++ b/sites/all/modules/wmf_common/Queue.php
@@ -97,6 +97,17 @@
return $con->readFrame();
}
+ function getByAnyId( $queue, $correlationId ) {
+ $con = $this->getFreshConnection();
+ $properties = array(
+ 'ack' => 'client',
+ 'selector' => "JMSCorrelationID='{$correlationId}' OR
JMSMessageID='{$correlationId}'",
+ );
+ $con->subscribe( $this->normalizeQueueName( $queue ), $properties );
+
+ return $con->readFrame();
+ }
+
static function getCorrelationId( $msg ) {
if ( !empty( $msg->headers['correlation-id'] ) ) {
return $msg->headers['correlation-id'];
@@ -104,6 +115,9 @@
$body = json_decode( $msg->body, TRUE );
if ( !empty( $body['gateway'] ) && !empty( $body['gateway_txn_id'] ) )
{
return "{$body['gateway']}-{$body['gateway_txn_id']}";
+ }
+ if ( !empty( $msg->headers['message-id'] ) ) {
+ return $msg->headers['message-id'];
}
watchdog( 'wmf_common', 'Could not create a correlation-id for
message: ' . $msg->body, NULL, WATCHDOG_WARNING );
@@ -269,6 +283,7 @@
$queue = $this->normalizeQueueName( $msg->headers['destination'] );
watchdog( 'wmf_common', 'Attempting to move a message to ' . $queue,
NULL, WATCHDOG_INFO );
+ watchdog( 'wmf_common', "Requeuing under correlation-id
{$msg->headers['correlation-id']}", NULL, WATCHDOG_INFO );
if ( !$this->enqueue( json_encode( $new_body ), $msg->headers, $queue
) ) {
$exMsg = 'Failed to inject rejected message into $queue! ' .
json_encode( $msg );
watchdog( 'wmf_common', $exMsg, NULL, WATCHDOG_ERROR );
diff --git a/sites/all/modules/wmf_common/wmf_civicrm/wmf_civicrm.info
b/sites/all/modules/wmf_common/wmf_civicrm/wmf_civicrm.info
index ade8bbc..62d1b00 100644
--- a/sites/all/modules/wmf_common/wmf_civicrm/wmf_civicrm.info
+++ b/sites/all/modules/wmf_common/wmf_civicrm/wmf_civicrm.info
@@ -5,4 +5,5 @@
dependencies[] = civicrm
dependencies[] = exchange_rates
dependencies[] = contribution_tracking
+dependencies[] = wmf_common
files[] = db_switcher.inc
diff --git a/sites/all/modules/wmf_common/wmf_common.module
b/sites/all/modules/wmf_common/wmf_common.module
index 037502f..808f707 100644
--- a/sites/all/modules/wmf_common/wmf_common.module
+++ b/sites/all/modules/wmf_common/wmf_common.module
@@ -126,12 +126,12 @@
'#title' => t('Twig location'),
'#default_value' => $twig_path,
'#required' => TRUE,
- '#description' => "{$twig_msg} " . t('Filesystem path of the Twig
library. Try: <code>svn co
http://svn.wikimedia.org/svnroot/wikimedia/vendors/twig</code>'),
+ '#description' => "{$twig_msg} " . t('Filesystem path of the Twig
library. Try: <code>git clone
https://gerrit.wikimedia.org/r/wikimedia/fundraising/twig</code>'),
);
$phpmailer_path = variable_get('wmf_common_phpmailer_location',
drupal_get_path('module', 'wmf_common') . DIRECTORY_SEPARATOR . "PHPMailer");
$phpmailer_installed = file_exists(implode(DIRECTORY_SEPARATOR,
array($phpmailer_path, 'class.phpmailer.php')));
- $phpmailer_msg = $phpmailer_installed ? '<span
style="color:green;">Installed Correctly</span>' : '<span
style="color:red;">Please install. Try:
<code>https://code.google.com/a/apache-extras.org/p/phpmailer/downloads/detail?name=PHPMailer_5.2.2.tgz</code></span>';
+ $phpmailer_msg = $phpmailer_installed ? '<span
style="color:green;">Installed Correctly</span>' : '<span
style="color:red;">Please install. Try: <code>git clone
https://gerrit.wikimedia.org/r/wikimedia/fundraising/phpmailer</code></span>';
$form['wmf_common_phpmailer_location'] = array(
'#type' => 'textfield',
@@ -175,10 +175,14 @@
function wmf_common_queue_item_form( $form, &$form_state, $queue,
$correlationId ) {
$q = queue2civicrm_stomp();
- $queue = preg_replace( '/[^-a-z]/', '', $queue );
- $correlationId = preg_replace( '/[^-a-zA-Z0-9]/', '', $correlationId );
+ $queue = preg_replace( '/[^-_a-z]/', '', $queue );
+ $correlationId = preg_replace( '/[^-a-zA-Z0-9:]/', '', $correlationId );
$msg = $q->getByCorrelationId( $queue, $correlationId );
+
+ if ( !$msg ) {
+ $msg = $q->getByAnyId( $queue, $correlationId );
+ }
if ( !$msg ) {
drupal_set_message( t( 'Message %id not found.', array( '%id' =>
$correlationId ) ) );
@@ -232,8 +236,8 @@
}
function wmf_common_queue_item_submit( $form, &$form_state ) {
- $queue = preg_replace( '/[^-a-z]|-damaged$/', '',
$form_state['build_info']['args'][0] );
- $correlationId = preg_replace( '/[^-a-z0-9]/', '',
$form_state['build_info']['args'][1] );
+ $queue = preg_replace( '/[^-_a-z]|-damaged$/', '',
$form_state['build_info']['args'][0] );
+ $correlationId = preg_replace( '/[^-a-zA-Z0-9:]/', '',
$form_state['build_info']['args'][1] );
$headers = array(
'correlation-id' => $correlationId,
@@ -251,14 +255,14 @@
case $form_state['values']['resend']:
$q->enqueue( json_encode( $body ), $headers, $queue );
- $msg = $q->getByCorrelationId( "{$queue}-damaged", $correlationId );
+ $msg = $q->getByAnyId( "{$queue}-damaged", $correlationId );
$q->ack( $msg );
drupal_set_message( t( 'Message %id resent for processing.', array( '%id'
=> $correlationId ) ) );
$form_state['redirect'] = "queue/{$queue}/{$correlationId}";
break;
case $form_state['values']['delete']:
- $msg = $q->getByCorrelationId( "{$queue}-damaged", $correlationId );
+ $msg = $q->getByAnyId( "{$queue}-damaged", $correlationId );
$q->ack( $msg );
drupal_set_message( t( 'Successfully removed message %id.', array( '%id'
=> $correlationId ) ) );
$form_state['redirect'] = "queue/{$queue}-damaged/";
--
To view, visit https://gerrit.wikimedia.org/r/91333
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I36c166445b7e4c0e943dd62e33e9b0d66e5ceda5
Gerrit-PatchSet: 2
Gerrit-Project: wikimedia/fundraising/crm
Gerrit-Branch: master
Gerrit-Owner: Adamw <[email protected]>
Gerrit-Reviewer: Katie Horn <[email protected]>
Gerrit-Reviewer: Mwalker <[email protected]>
Gerrit-Reviewer: jenkins-bot
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits