Adamw has uploaded a new change for review.
https://gerrit.wikimedia.org/r/91333
Change subject: (FR #989) recurring error handling
......................................................................
(FR #989) recurring error handling
Reject or requeue bad messages
Introduces a new convention, where rejected messages can be looked up by bare
message-id.
TODO: support message-id lookup from damaged queue item UI
Change-Id: I36c166445b7e4c0e943dd62e33e9b0d66e5ceda5
---
M sites/all/modules/queue2civicrm/recurring/recurring.module
M sites/all/modules/wmf_common/Queue.php
2 files changed, 45 insertions(+), 81 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/crm
refs/changes/33/91333/1
diff --git a/sites/all/modules/queue2civicrm/recurring/recurring.module
b/sites/all/modules/queue2civicrm/recurring/recurring.module
index d07a218..5b4d793 100644
--- a/sites/all/modules/queue2civicrm/recurring/recurring.module
+++ b/sites/all/modules/queue2civicrm/recurring/recurring.module
@@ -124,22 +124,20 @@
$msg_orig = $msg;
$q = queue2civicrm_stomp();
try {
- if (recurring_import($msg) === 1) {
- // only ack and remove from queue on explicit success
+ recurring_import($msg);
+ $q->ack($msg_orig);
+ } catch ( RequeueError $ex ) {
+ // We failed to process the message for some reason or another
+ // So... we queue a new message (the original plus a time delay and count
header)
+ // and ACK the original
+ $ret = $q->requeueWithDelay($msg_orig);
+
+ if ($ret) {
$q->ack($msg_orig);
} else {
- // We failed to process the message for some reason or another
- // So... we queue a new message (the original plus a time delay and
count header)
- // and ACK the original
- $ret = $q->requeueWithDelay($msg_orig);
-
- if ($ret) {
- $q->ack($msg_orig);
- } else {
- throw new WmfException("STOMP_BAD_CONNECTION", "Failed to requeue a
recurring message");
- }
+ throw new WmfException("STOMP_BAD_CONNECTION", "Failed to requeue a
recurring message");
}
- } catch (WmfException $ex) {
+ } catch ( WmfException $ex ) {
watchdog('recurring', 'something went wrong during import: ' .
$ex->getMessage(), NULL, WATCHDOG_ERROR);
if ($ex->isRejectMessage()) {
@@ -173,13 +171,7 @@
/**
* Import queued message contents to CiviCRM
*
- * Return status codes:
- * 0 = processing error
- * 1 = fully processed, ready for removal
- * 2 = not currently processable, return to queue
- *
* @param $msg
- * @return unknown_type
*/
function recurring_import( $msg ) {
global $txn_subscr_payment, $txn_subscr_acct;
@@ -226,12 +218,11 @@
watchdog( 'recurring', 'Duplicate contribution: ' . print_r( $msg, true
) );
throw new WmfException( 'DUPLICATE_CONTRIBUTION', "Contribution already
exists. Ignoring message." );
}
- $ret_val = recurring_import_subscr_payment( $msg );
+ recurring_import_subscr_payment( $msg );
} elseif ( isset( $msg[ 'txn_type' ] ) && in_array( $msg[ 'txn_type' ],
$txn_subscr_acct ) ) {
- $ret_val = recurring_import_subscr_acct( $msg );
+ recurring_import_subscr_acct( $msg );
} else {
- watchdog( 'recurring', 'Msg not recognized as a recurring payment related
message.' );
- $ret_val = 0;
+ throw new WmfException( 'INVALID_RECURRING', 'Msg not recognized as a
recurring payment related message.' );
}
// update the log
@@ -241,15 +232,12 @@
$log[ 'verified' ] = 1;
_queue2civicrm_log( $log );
}
-
- return $ret_val;
}
/**
* Import a recurring payment
*
* @param array $msg
- * @return int
*/
function recurring_import_subscr_payment( $msg ) {
/**
@@ -261,11 +249,10 @@
* otherwise, process the payment.
*/
if ( !isset( $msg[ 'subscr_id' ] ) ) {
- watchdog( 'recurring', 'Msg missing the subscr_id; cannot process.');
- return 0;
+ throw new WmfException( 'INVALID_RECURRING', 'Msg missing the subscr_id;
cannot process.' );
} elseif ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ]
) ) { // check for parent record in civicrm_contribution_recur and fetch its id
watchdog( 'recurring', 'Msg does not have a matching recurring record in
civicrm_contribution_recur; requeueing for future processing.' );
- return 2;
+ throw new RequeueError();
}
civicrm_initialize(true);
@@ -322,8 +309,6 @@
// Send thank you email, other post-import things
module_invoke_all( 'queue2civicrm_import', $contribution_info );
-
- return 1;
}
/**
@@ -333,50 +318,43 @@
* function.
*
* @param $msg
- * @return int
*/
function recurring_import_subscr_acct( $msg ) {
switch ( $msg[ 'txn_type' ] ) {
case 'subscr_signup':
- $ret_val = recurring_import_subscr_signup( $msg );
+ recurring_import_subscr_signup( $msg );
break;
case 'subscr_cancel':
- $ret_val = recurring_import_subscr_cancel( $msg );
+ recurring_import_subscr_cancel( $msg );
break;
case 'subscr_eot':
- $ret_val = recurring_import_subscr_eot( $msg );
+ recurring_import_subscr_eot( $msg );
break;
case 'subscr_modify':
- $ret_val = recurring_import_subscr_modify( $msg );
+ recurring_import_subscr_modify( $msg );
break;
case 'subscr_failed':
- $ret_val = recurring_import_subscr_failed( $msg );
+ recurring_import_subscr_failed( $msg );
break;
default:
- watchdog( 'recurring', 'Invalid subscription message type: %msg', array(
'%msg' => print_r( $msg, true )), WATCHDOG_NOTICE );
- $ret_val = 0;
- break;
- }
-
- return $ret_val;
+ throw new WmfException( 'INVALID_RECURRING', 'Invalid subscription
message type' );
+ }
}
/**
* Import a subscription signup message
*
* @param $msg
- * @return int
*/
function recurring_import_subscr_signup( $msg ) {
// ensure there is not already a record of this account - if so, mark the
message as succesfuly processed
if ( $recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
- watchdog( 'recurring', 'Subscription account already exists: %msg', array(
'%msg' => print_r( $msg, true )), WATCHDOG_NOTICE );
- return 1;
+ throw new WmfException( 'DUPLICATE_CONTRIBUTION', 'Subscription account
already exists' );
}
// create contact record
@@ -401,14 +379,11 @@
'create_date' => wmf_common_date_unix_to_civicrm( $msg[ 'create_date' ] ),
'trxn_id' => $msg[ 'subscr_id' ],
) )->execute();
- $dbs->pop();
-
+
if ( !$result ) {
- watchdog( 'recurring', 'Failed inserting subscriber signup for subscriber
id: %subscr_id', array( '%subscr_id' => print_r( $msg['subscr_id'], true )),
WATCHDOG_NOTICE );
- return 0;
+ throw new WmfException( 'IMPORT_CONTRIB', 'Failed inserting subscriber
signup for subscriber id: ' . print_r( $msg['subscr_id'], true ));
} else {
watchdog( 'recurring', 'Succesfully inserted subscription signup for
subscriber id: %subscr_id ', array( '%subscr_id' => print_r( $msg[ 'subscr_id'
], true )), WATCHDOG_NOTICE );
- return 1;
}
}
@@ -416,13 +391,11 @@
* Process a subscriber cancellation
*
* @param array $msg
- * @return int
*/
function recurring_import_subscr_cancel( $msg ) {
// ensure we have a record of the subscription
if ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
- watchdog( 'recurring', 'Subscription account does not exist: %msg', array(
'%msg' => print_r( $msg, true )), WATCHDOG_NOTICE );
- return 2;
+ throw new WmfException( 'INVALID_RECURRING', 'Subscription account does
not exist' );
}
$dbs = wmf_civicrm_get_dbs();
@@ -431,14 +404,11 @@
'cancel_date' => wmf_common_date_unix_to_civicrm( $msg[ 'cancel_date' ] ),
'end_date' => wmf_common_date_unix_to_civicrm( $msg[ 'cancel_date' ] ),
) )->condition( 'trxn_id', $msg[ 'subscr_id' ] )->execute();
- $dbs->pop();
-
+
if ( !$result ) {
- watchdog( 'recurring', 'There was a problem updating the subscription for
cancelation for subscriber id: %subscr_id', array( '%subscr_id' => print_r(
$msg[ 'subscr_id' ], true )), WATCHDOG_NOTICE );
- return 0;
+ throw new WmfException( 'INVALID_RECURRING', 'There was a problem updating
the subscription for cancelation for subscriber id: ' . print_r( $msg[
'subscr_id' ], true ));
} else {
- watchdog( 'recurring', 'Succesfuly cacneled subscription for subscriber id
%subscr_id', array( '%subscr_id' => print_r( $msg[ 'subscr_id' ], true )),
WATCHDOG_NOTICE );
- return 1;
+ watchdog( 'recurring', 'Succesfuly cancelled subscription for subscriber
id %subscr_id', array( '%subscr_id' => print_r( $msg[ 'subscr_id' ], true )),
WATCHDOG_NOTICE );
}
}
@@ -446,13 +416,11 @@
* Process an expired subscription
*
* @param array $msg
- * @return int
*/
function recurring_import_subscr_eot( $msg ) {
// ensure we have a record of the subscription
if ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
- watchdog( 'recurring', 'Subscription account does not exist: %msg', array(
'%msg' => print_r( $msg, true )), WATCHDOG_NOTICE );
- return 2;
+ throw new WmfException( 'INVALID_RECURRING', 'Subscription account does
not exist' );
}
$dbs = wmf_civicrm_get_dbs();
@@ -463,11 +431,9 @@
$dbs->pop();
if ( !$result ) {
- watchdog( 'recurring', 'There was a problem updating the subscription for
EOT for subscription id: %subscr_id', array( '%subscr_id' => print_r( $msg[
'subscr_id' ], true )), WATCHDOG_NOTICE );
- return 0;
+ throw new WmfException( 'INVALID_RECURRING', 'There was a problem updating
the subscription for EOT for subscription id: %subscr_id' . print_r( $msg[
'subscr_id' ], true ));
} else {
watchdog( 'recurring', 'Succesfuly ended subscription for subscriber id:
%subscr_id ', array( '%subscr_id' => print_r( $msg[ 'subscr_id' ], true )),
WATCHDOG_NOTICE );
- return 1;
}
}
@@ -478,13 +444,11 @@
* unused.
*
* @param array $msg
- * @return int
*/
function recurring_import_subscr_modify( $msg ) {
// ensure we have a record of the subscription
if ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
- watchdog( 'recurring', 'Subscription account does not exist for
subscription id: %subscr_id', array( '%subscr_id' => print_r( $msg, true )),
WATCHDOG_NOTICE );
- return 2;
+ throw new WmfException( 'INVALID_RECURRING', 'Subscription account does
not exist for subscription id: ' . print_r( $msg, true ));
}
$dbs = wmf_civicrm_get_dbs();
@@ -500,8 +464,7 @@
$dbs->pop();
if ( !$result ) {
- watchdog( 'recurring', 'There was a problem updating the subscription
record for subscription id ', print_r( $msg['subscr_id'], true),
WATCHDOG_NOTICE );
- return 0;
+ throw new WmfException( 'INVALID_RECURRING', 'There was a problem updating
the subscription record for subscription id ' . print_r( $msg['subscr_id'],
true ) );
}
// update the contact
@@ -514,19 +477,16 @@
$tag = wmf_civicrm_tag_contact_for_review( $contact );
watchdog( 'recurring', 'Subscription succesfully modified for subscription
id: %subscr_id', array( '%subscr_id' => print_r( $msg['subscr_id'], true )),
WATCHDOG_NOTICE );
- return 1;
}
/**
* Process failed subscription payment
* @param $msg
- * @return unknown_type
*/
function recurring_import_subscr_failed( $msg ) {
// ensure we have a record of the subscription
if ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
- watchdog( 'recurring', 'Subscription account does not exist for
subscription: %subscription', array( "%subscription" => print_r( $msg, true )),
WATCHDOG_NOTICE );
- return 2;
+ throw new WmfException( 'INVALID_RECURRING', 'Subscription account does
not exist for subscription id: ' . print_r( $msg['subscr_id'], true ) );
}
$dbs = wmf_civicrm_get_dbs();
@@ -535,14 +495,11 @@
'failure_count' => $msg[ 'failure_count' ],
'failure_retry_date' => wmf_common_date_unix_to_civicrm( $msg[
'failure_retry_date' ] ),
) )->condition( 'trxn_id', $msg[ 'subscr_id' ] )->execute();
- $dbs->pop();
-
+
if ( !$result ) {
- watchdog( 'recurring', 'There was a problem updating the subscription for
failed payment for subscriber id: %subscr_id ', array( '%subscr_id' => print_r(
$msg[ 'subscr_id' ], true )), WATCHDOG_NOTICE );
- return 0;
+ throw new WmfException( 'INVALID_RECURRING', 'There was a problem updating
the subscription for failed payment for subscriber id: ' . print_r(
$msg['subscr_id'], true ) );
} else {
watchdog( 'recurring', 'Succesfuly cacneled subscription for failed
payment for subscriber id: %subscr_id ', array( '%subscr_id' => print_r( $msg[
'subscr_id' ], true )), WATCHDOG_NOTICE );
- return 1;
}
}
@@ -591,7 +548,7 @@
if ( isset( $msg['frequency_unit'] ) ) {
if ( !in_array( $msg['frequency_unit'], array( 'day', 'week', 'month',
'year' ) ) ) {
- throw new WmfException("INVALID_MESSAGE", "Bad frequency unit:
{$msg['frequency_unit']}");
+ throw new WmfException("INVALID_RECURRING", "Bad frequency unit:
{$msg['frequency_unit']}");
}
}
@@ -810,3 +767,6 @@
return null;
}
}
+
+class RequeueError extends Exception {
+}
diff --git a/sites/all/modules/wmf_common/Queue.php
b/sites/all/modules/wmf_common/Queue.php
index 835a7dd..8ce96d2 100644
--- a/sites/all/modules/wmf_common/Queue.php
+++ b/sites/all/modules/wmf_common/Queue.php
@@ -105,6 +105,9 @@
if ( !empty( $body['gateway'] ) && !empty( $body['gateway_txn_id'] ) )
{
return "{$body['gateway']}-{$body['gateway_txn_id']}";
}
+ if ( !empty( $msg->headers['message-id'] ) ) {
+ return $msg->headers['message-id'];
+ }
watchdog( 'wmf_common', 'Could not create a correlation-id for
message: ' . $msg->body, NULL, WATCHDOG_WARNING );
return '';
@@ -269,6 +272,7 @@
$queue = $this->normalizeQueueName( $msg->headers['destination'] );
watchdog( 'wmf_common', 'Attempting to move a message to ' . $queue,
NULL, WATCHDOG_INFO );
+ watchdog( 'wmf_common', "Requeuing under correlation-id
{$msg->headers['correlation-id']}", NULL, WATCHDOG_INFO );
if ( !$this->enqueue( json_encode( $new_body ), $msg->headers, $queue
) ) {
$exMsg = 'Failed to inject rejected message into $queue! ' .
json_encode( $msg );
watchdog( 'wmf_common', $exMsg, NULL, WATCHDOG_ERROR );
--
To view, visit https://gerrit.wikimedia.org/r/91333
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I36c166445b7e4c0e943dd62e33e9b0d66e5ceda5
Gerrit-PatchSet: 1
Gerrit-Project: wikimedia/fundraising/crm
Gerrit-Branch: master
Gerrit-Owner: Adamw <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits