Mwalker has submitted this change and it was merged.

Change subject: (FR #989) recurring error handling
......................................................................


(FR #989) recurring error handling

Reject or requeue bad messages.

We look up damaged messages by JMSMessageID, cos we cannot make any
assumptions about the message structure.

Change-Id: I36c166445b7e4c0e943dd62e33e9b0d66e5ceda5
---
M sites/all/modules/queue2civicrm/queue2civicrm.info
M sites/all/modules/queue2civicrm/recurring/recurring.module
M sites/all/modules/wmf_common/Queue.php
M sites/all/modules/wmf_common/wmf_civicrm/wmf_civicrm.info
M sites/all/modules/wmf_common/wmf_common.module
5 files changed, 71 insertions(+), 91 deletions(-)

Approvals:
  Mwalker: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/sites/all/modules/queue2civicrm/queue2civicrm.info 
b/sites/all/modules/queue2civicrm/queue2civicrm.info
index 211fff1..1eff9c2 100644
--- a/sites/all/modules/queue2civicrm/queue2civicrm.info
+++ b/sites/all/modules/queue2civicrm/queue2civicrm.info
@@ -3,7 +3,7 @@
 core = 7.x
 configure = admin/config/queue2civicrm
 dependencies[] = exchange_rates
-dependencies[] = civicrm
+dependencies[] = wmf_civicrm
 package = queue2civicrm
 files[] = Queue2civicrmTrxnCounter.php
 files[] = Stomp.php
diff --git a/sites/all/modules/queue2civicrm/recurring/recurring.module 
b/sites/all/modules/queue2civicrm/recurring/recurring.module
index d07a218..52983be 100644
--- a/sites/all/modules/queue2civicrm/recurring/recurring.module
+++ b/sites/all/modules/queue2civicrm/recurring/recurring.module
@@ -103,7 +103,7 @@
   );
 
   if ($recurring_processed > 0) {
-    watchdog('recurring', 'Successfully processed ' . $recurring_processed . ' 
subscription messages.');
+    watchdog('recurring', 'Processed ' . $recurring_processed . ' subscription 
messages.');
   }
   else {
     watchdog('recurring', 'No contributions processed.');
@@ -124,22 +124,20 @@
   $msg_orig = $msg;
   $q = queue2civicrm_stomp();
   try {
-    if (recurring_import($msg) === 1) {
-      // only ack and remove from queue on explicit success
+    recurring_import($msg);
+    $q->ack($msg_orig);
+  } catch ( RequeueError $ex ) {
+    // We failed to process the message for some reason or another
+    // So... we queue a new message (the original plus a time delay and count 
header)
+    // and ACK the original
+    $ret = $q->requeueWithDelay($msg_orig);
+
+    if ($ret) {
       $q->ack($msg_orig);
     } else {
-      // We failed to process the message for some reason or another
-      // So... we queue a new message (the original plus a time delay and 
count header)
-      // and ACK the original
-      $ret = $q->requeueWithDelay($msg_orig);
-
-      if ($ret) {
-        $q->ack($msg_orig);
-      } else {
-        throw new WmfException("STOMP_BAD_CONNECTION", "Failed to requeue a 
recurring message");
-      }
+      throw new WmfException("STOMP_BAD_CONNECTION", "Failed to requeue a 
recurring message");
     }
-  } catch (WmfException $ex) {
+  } catch ( WmfException $ex ) {
     watchdog('recurring', 'something went wrong during import: ' . 
$ex->getMessage(), NULL, WATCHDOG_ERROR);
 
     if ($ex->isRejectMessage()) {
@@ -173,13 +171,7 @@
 /**
  * Import queued message contents to CiviCRM
  * 
- * Return status codes:
- *  0 = processing error
- *     1 = fully processed, ready for removal
- *     2 = not currently processable, return to queue
- * 
  * @param $msg
- * @return unknown_type
  */
 function recurring_import( $msg ) {
   global $txn_subscr_payment, $txn_subscr_acct;
@@ -226,12 +218,11 @@
       watchdog( 'recurring', 'Duplicate contribution: ' . print_r( $msg, true 
) );
       throw new WmfException( 'DUPLICATE_CONTRIBUTION', "Contribution already 
exists. Ignoring message." );
     }  
-    $ret_val = recurring_import_subscr_payment( $msg );
+    recurring_import_subscr_payment( $msg );
   } elseif ( isset( $msg[ 'txn_type' ] ) && in_array( $msg[ 'txn_type' ], 
$txn_subscr_acct ) ) {
-      $ret_val = recurring_import_subscr_acct( $msg );
+    recurring_import_subscr_acct( $msg );
   } else {
-    watchdog( 'recurring', 'Msg not recognized as a recurring payment related 
message.' );
-    $ret_val = 0;
+    throw new WmfException( 'INVALID_RECURRING', 'Msg not recognized as a 
recurring payment related message.' );
   }
   
   // update the log
@@ -241,15 +232,12 @@
     $log[ 'verified' ] = 1;
     _queue2civicrm_log( $log );
   }
-  
-  return $ret_val;
 }
 
 /**
  * Import a recurring payment
  * 
  * @param array $msg
- * @return int
  */
 function recurring_import_subscr_payment( $msg ) {
   /**
@@ -261,11 +249,10 @@
    * otherwise, process the payment.
    */
   if ( !isset( $msg[ 'subscr_id' ] ) ) {
-    watchdog( 'recurring', 'Msg missing the subscr_id; cannot process.');
-    return 0;
+    throw new WmfException( 'INVALID_RECURRING', 'Msg missing the subscr_id; 
cannot process.' );
   } elseif ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] 
) ) { // check for parent record in civicrm_contribution_recur and fetch its id
     watchdog( 'recurring', 'Msg does not have a matching recurring record in 
civicrm_contribution_recur; requeueing for future processing.' );
-    return 2;
+    throw new RequeueError();
   }
 
   civicrm_initialize(true);
@@ -322,8 +309,6 @@
   
   // Send thank you email, other post-import things
   module_invoke_all( 'queue2civicrm_import', $contribution_info );
-  
-  return 1;  
 }
 
 /**
@@ -333,50 +318,43 @@
  * function.
  * 
  * @param $msg
- * @return int
  */
 function recurring_import_subscr_acct( $msg ) {
   switch ( $msg[ 'txn_type' ] ) {
     case 'subscr_signup':
-      $ret_val = recurring_import_subscr_signup( $msg );
+      recurring_import_subscr_signup( $msg );
       break;
       
     case 'subscr_cancel':
-      $ret_val = recurring_import_subscr_cancel( $msg );
+      recurring_import_subscr_cancel( $msg );
       break;
       
     case 'subscr_eot':
-      $ret_val = recurring_import_subscr_eot( $msg );
+      recurring_import_subscr_eot( $msg );
       break;
       
     case 'subscr_modify':
-      $ret_val = recurring_import_subscr_modify( $msg );
+      recurring_import_subscr_modify( $msg );
       break;
       
     case 'subscr_failed':
-      $ret_val = recurring_import_subscr_failed( $msg );
+      recurring_import_subscr_failed( $msg );
       break;
      
     default:
-      watchdog( 'recurring', 'Invalid subscription message type: %msg', array( 
'%msg' => print_r( $msg, true )), WATCHDOG_NOTICE );
-      $ret_val = 0;
-      break;
-  }  
-  
-  return $ret_val;
+      throw new WmfException( 'INVALID_RECURRING', 'Invalid subscription 
message type' );
+  }
 }
 
 /**
  * Import a subscription signup message
  * 
  * @param $msg
- * @return int
  */
 function recurring_import_subscr_signup( $msg ) {
   // ensure there is not already a record of this account - if so, mark the 
message as succesfuly processed
   if ( $recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
-    watchdog( 'recurring', 'Subscription account already exists: %msg', array( 
'%msg' => print_r( $msg, true )), WATCHDOG_NOTICE );
-    return 1;
+    throw new WmfException( 'DUPLICATE_CONTRIBUTION', 'Subscription account 
already exists' );
   }
   
   // create contact record
@@ -401,14 +379,11 @@
     'create_date' => wmf_common_date_unix_to_civicrm( $msg[ 'create_date' ] ),
     'trxn_id' => $msg[ 'subscr_id' ],
   ) )->execute();
-  $dbs->pop(); 
-   
+
   if ( !$result ) {
-    watchdog( 'recurring', 'Failed inserting subscriber signup for subscriber 
id: %subscr_id', array( '%subscr_id' => print_r( $msg['subscr_id'], true )), 
WATCHDOG_NOTICE );
-    return 0;
+    throw new WmfException( 'IMPORT_CONTRIB', 'Failed inserting subscriber 
signup for subscriber id: ' . print_r( $msg['subscr_id'], true ));
   } else {
     watchdog( 'recurring', 'Succesfully inserted subscription signup for 
subscriber id: %subscr_id ', array( '%subscr_id' => print_r( $msg[ 'subscr_id' 
], true )), WATCHDOG_NOTICE );
-    return 1;
   }
 }
 
@@ -416,13 +391,11 @@
  * Process a subscriber cancellation
  * 
  * @param array $msg
- * @return int
  */
 function recurring_import_subscr_cancel( $msg ) {
   // ensure we have a record of the subscription
   if ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
-    watchdog( 'recurring', 'Subscription account does not exist: %msg', array( 
'%msg' => print_r( $msg, true )), WATCHDOG_NOTICE );
-    return 2;
+    throw new WmfException( 'INVALID_RECURRING', 'Subscription account does 
not exist' );
   }
   
   $dbs = wmf_civicrm_get_dbs();
@@ -431,14 +404,11 @@
     'cancel_date' => wmf_common_date_unix_to_civicrm( $msg[ 'cancel_date' ] ), 
     'end_date' => wmf_common_date_unix_to_civicrm( $msg[ 'cancel_date' ] ), 
   ) )->condition( 'trxn_id', $msg[ 'subscr_id' ] )->execute();
-  $dbs->pop();
-  
+
   if ( !$result ) {
-    watchdog( 'recurring', 'There was a problem updating the subscription for 
cancelation for subscriber id: %subscr_id', array( '%subscr_id' => print_r( 
$msg[ 'subscr_id' ], true )), WATCHDOG_NOTICE );
-    return 0;
+    throw new WmfException( 'INVALID_RECURRING', 'There was a problem updating 
the subscription for cancelation for subscriber id: ' . print_r( $msg[ 
'subscr_id' ], true ));
   } else {
-    watchdog( 'recurring', 'Succesfuly cacneled subscription for subscriber id 
%subscr_id', array( '%subscr_id' => print_r( $msg[ 'subscr_id' ], true )), 
WATCHDOG_NOTICE );
-    return 1;
+    watchdog( 'recurring', 'Succesfuly cancelled subscription for subscriber 
id %subscr_id', array( '%subscr_id' => print_r( $msg[ 'subscr_id' ], true )), 
WATCHDOG_NOTICE );
   }
 }
 
@@ -446,13 +416,11 @@
  * Process an expired subscription
  * 
  * @param array $msg
- * @return int
  */
 function recurring_import_subscr_eot( $msg ) {
   // ensure we have a record of the subscription
   if ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
-    watchdog( 'recurring', 'Subscription account does not exist: %msg', array( 
'%msg' => print_r( $msg, true )), WATCHDOG_NOTICE );
-    return 2;
+    throw new WmfException( 'INVALID_RECURRING', 'Subscription account does 
not exist' );
   }
 
   $dbs = wmf_civicrm_get_dbs();
@@ -463,11 +431,9 @@
   $dbs->pop();
   
   if ( !$result ) {
-    watchdog( 'recurring', 'There was a problem updating the subscription for 
EOT for subscription id: %subscr_id', array( '%subscr_id' => print_r( $msg[ 
'subscr_id' ], true )), WATCHDOG_NOTICE );
-    return 0;
+    throw new WmfException( 'INVALID_RECURRING', 'There was a problem updating 
the subscription for EOT for subscription id: %subscr_id' . print_r( $msg[ 
'subscr_id' ], true ));
   } else {
     watchdog( 'recurring', 'Succesfuly ended subscription for subscriber id: 
%subscr_id ', array( '%subscr_id' => print_r( $msg[ 'subscr_id' ], true )), 
WATCHDOG_NOTICE );
-    return 1;
   }
 }
 
@@ -478,13 +444,11 @@
  * unused.
  * 
  * @param array $msg
- * @return int
  */
 function recurring_import_subscr_modify( $msg ) {
   // ensure we have a record of the subscription
   if ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
-    watchdog( 'recurring', 'Subscription account does not exist for 
subscription id: %subscr_id', array( '%subscr_id' => print_r( $msg, true )), 
WATCHDOG_NOTICE );
-    return 2;
+    throw new WmfException( 'INVALID_RECURRING', 'Subscription account does 
not exist for subscription id: ' . print_r( $msg, true ));
   }
 
   $dbs = wmf_civicrm_get_dbs();
@@ -500,8 +464,7 @@
   $dbs->pop();
 
   if ( !$result ) {
-    watchdog( 'recurring', 'There was a problem updating the subscription 
record for subscription id ', print_r( $msg['subscr_id'], true), 
WATCHDOG_NOTICE );
-    return 0;
+    throw new WmfException( 'INVALID_RECURRING', 'There was a problem updating 
the subscription record for subscription id ' . print_r( $msg['subscr_id'], 
true ) );
   }
    
   // update the contact
@@ -514,19 +477,16 @@
   $tag = wmf_civicrm_tag_contact_for_review( $contact );
 
   watchdog( 'recurring', 'Subscription succesfully modified for subscription 
id: %subscr_id', array( '%subscr_id' => print_r( $msg['subscr_id'], true )), 
WATCHDOG_NOTICE );
-  return 1;
 }
 
 /**
  * Process failed subscription payment
  * @param $msg
- * @return unknown_type
  */
 function recurring_import_subscr_failed( $msg ) {
   // ensure we have a record of the subscription
   if ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
-    watchdog( 'recurring', 'Subscription account does not exist for 
subscription: %subscription', array( "%subscription" => print_r( $msg, true )), 
WATCHDOG_NOTICE );
-    return 2;
+    throw new WmfException( 'INVALID_RECURRING', 'Subscription account does 
not exist for subscription id: ' . print_r( $msg['subscr_id'], true ) );
   }
   
   $dbs = wmf_civicrm_get_dbs();
@@ -535,14 +495,11 @@
     'failure_count' => $msg[ 'failure_count' ],
     'failure_retry_date' => wmf_common_date_unix_to_civicrm( $msg[ 
'failure_retry_date' ] ),
   ) )->condition( 'trxn_id', $msg[ 'subscr_id' ] )->execute();
-  $dbs->pop();
-  
+
   if ( !$result ) {
-    watchdog( 'recurring', 'There was a problem updating the subscription for 
failed payment for subscriber id: %subscr_id ', array( '%subscr_id' => print_r( 
$msg[ 'subscr_id' ], true )), WATCHDOG_NOTICE );
-    return 0;
+    throw new WmfException( 'INVALID_RECURRING', 'There was a problem updating 
the subscription for failed payment for subscriber id: ' . print_r( 
$msg['subscr_id'], true ) );
   } else {
     watchdog( 'recurring', 'Succesfuly cacneled subscription for failed 
payment for subscriber id: %subscr_id ', array( '%subscr_id' => print_r( $msg[ 
'subscr_id' ], true )), WATCHDOG_NOTICE );
-    return 1;
   }
 }
 
@@ -591,7 +548,7 @@
 
   if ( isset( $msg['frequency_unit'] ) ) {
     if ( !in_array( $msg['frequency_unit'], array( 'day', 'week', 'month', 
'year' ) ) ) {
-      throw new WmfException("INVALID_MESSAGE", "Bad frequency unit: 
{$msg['frequency_unit']}");
+      throw new WmfException("INVALID_RECURRING", "Bad frequency unit: 
{$msg['frequency_unit']}");
     }
   }
 
@@ -810,3 +767,6 @@
     return null;
   }
 }
+
+class RequeueError extends Exception {
+}
diff --git a/sites/all/modules/wmf_common/Queue.php 
b/sites/all/modules/wmf_common/Queue.php
index 835a7dd..862a6cb 100644
--- a/sites/all/modules/wmf_common/Queue.php
+++ b/sites/all/modules/wmf_common/Queue.php
@@ -97,6 +97,17 @@
         return $con->readFrame();
     }
 
+    function getByAnyId( $queue, $correlationId ) {
+        $con = $this->getFreshConnection();
+        $properties = array(
+            'ack' => 'client',
+            'selector' => "JMSCorrelationID='{$correlationId}' OR 
JMSMessageID='{$correlationId}'",
+        );
+        $con->subscribe( $this->normalizeQueueName( $queue ), $properties );
+
+        return $con->readFrame();
+    }
+
     static function getCorrelationId( $msg ) {
         if ( !empty( $msg->headers['correlation-id'] ) ) {
             return $msg->headers['correlation-id'];
@@ -104,6 +115,9 @@
         $body = json_decode( $msg->body, TRUE );
         if ( !empty( $body['gateway'] ) && !empty( $body['gateway_txn_id'] ) ) 
{
             return "{$body['gateway']}-{$body['gateway_txn_id']}";
+        }
+        if ( !empty( $msg->headers['message-id'] ) ) {
+            return $msg->headers['message-id'];
         }
 
         watchdog( 'wmf_common', 'Could not create a correlation-id for 
message: ' . $msg->body, NULL, WATCHDOG_WARNING );
@@ -269,6 +283,7 @@
         $queue = $this->normalizeQueueName( $msg->headers['destination'] );
 
         watchdog( 'wmf_common', 'Attempting to move a message to ' . $queue, 
NULL, WATCHDOG_INFO );
+        watchdog( 'wmf_common', "Requeuing under correlation-id 
{$msg->headers['correlation-id']}", NULL, WATCHDOG_INFO );
         if ( !$this->enqueue( json_encode( $new_body ), $msg->headers, $queue 
) ) {
             $exMsg = 'Failed to inject rejected message into $queue! ' . 
json_encode( $msg );
             watchdog( 'wmf_common', $exMsg, NULL, WATCHDOG_ERROR );
diff --git a/sites/all/modules/wmf_common/wmf_civicrm/wmf_civicrm.info 
b/sites/all/modules/wmf_common/wmf_civicrm/wmf_civicrm.info
index ade8bbc..62d1b00 100644
--- a/sites/all/modules/wmf_common/wmf_civicrm/wmf_civicrm.info
+++ b/sites/all/modules/wmf_common/wmf_civicrm/wmf_civicrm.info
@@ -5,4 +5,5 @@
 dependencies[] = civicrm
 dependencies[] = exchange_rates
 dependencies[] = contribution_tracking
+dependencies[] = wmf_common
 files[] = db_switcher.inc
diff --git a/sites/all/modules/wmf_common/wmf_common.module 
b/sites/all/modules/wmf_common/wmf_common.module
index 037502f..808f707 100644
--- a/sites/all/modules/wmf_common/wmf_common.module
+++ b/sites/all/modules/wmf_common/wmf_common.module
@@ -126,12 +126,12 @@
     '#title'         => t('Twig location'),
     '#default_value' => $twig_path,
     '#required'      => TRUE,
-    '#description'   => "{$twig_msg} " . t('Filesystem path of the Twig 
library.  Try: <code>svn co 
http://svn.wikimedia.org/svnroot/wikimedia/vendors/twig</code>'),
+    '#description'   => "{$twig_msg} " . t('Filesystem path of the Twig 
library.  Try: <code>git clone 
https://gerrit.wikimedia.org/r/wikimedia/fundraising/twig</code>'),
   );
 
   $phpmailer_path = variable_get('wmf_common_phpmailer_location', 
drupal_get_path('module', 'wmf_common') . DIRECTORY_SEPARATOR . "PHPMailer");
   $phpmailer_installed = file_exists(implode(DIRECTORY_SEPARATOR, 
array($phpmailer_path, 'class.phpmailer.php')));
-  $phpmailer_msg = $phpmailer_installed ? '<span 
style="color:green;">Installed Correctly</span>' : '<span 
style="color:red;">Please install.  Try: 
<code>https://code.google.com/a/apache-extras.org/p/phpmailer/downloads/detail?name=PHPMailer_5.2.2.tgz</code></span>';
+  $phpmailer_msg = $phpmailer_installed ? '<span 
style="color:green;">Installed Correctly</span>' : '<span 
style="color:red;">Please install.  Try: <code>git clone 
https://gerrit.wikimedia.org/r/wikimedia/fundraising/phpmailer</code></span>';
 
   $form['wmf_common_phpmailer_location'] = array(
     '#type'          => 'textfield',
@@ -175,10 +175,14 @@
 function wmf_common_queue_item_form( $form, &$form_state, $queue, 
$correlationId ) {
   $q = queue2civicrm_stomp();
 
-  $queue = preg_replace( '/[^-a-z]/', '', $queue );
-  $correlationId = preg_replace( '/[^-a-zA-Z0-9]/', '', $correlationId );
+  $queue = preg_replace( '/[^-_a-z]/', '', $queue );
+  $correlationId = preg_replace( '/[^-a-zA-Z0-9:]/', '', $correlationId );
 
   $msg = $q->getByCorrelationId( $queue, $correlationId );
+
+  if ( !$msg ) {
+       $msg = $q->getByAnyId( $queue, $correlationId );
+  }
 
   if ( !$msg ) {
     drupal_set_message( t( 'Message %id not found.', array( '%id' => 
$correlationId ) ) );
@@ -232,8 +236,8 @@
 }
 
 function wmf_common_queue_item_submit( $form, &$form_state ) {
-  $queue = preg_replace( '/[^-a-z]|-damaged$/', '', 
$form_state['build_info']['args'][0] );
-  $correlationId = preg_replace( '/[^-a-z0-9]/', '', 
$form_state['build_info']['args'][1] );
+  $queue = preg_replace( '/[^-_a-z]|-damaged$/', '', 
$form_state['build_info']['args'][0] );
+  $correlationId = preg_replace( '/[^-a-zA-Z0-9:]/', '', 
$form_state['build_info']['args'][1] );
 
   $headers = array(
     'correlation-id' => $correlationId,
@@ -251,14 +255,14 @@
   case $form_state['values']['resend']:
     $q->enqueue( json_encode( $body ), $headers, $queue );
 
-    $msg = $q->getByCorrelationId( "{$queue}-damaged", $correlationId );
+    $msg = $q->getByAnyId( "{$queue}-damaged", $correlationId );
     $q->ack( $msg );
 
     drupal_set_message( t( 'Message %id resent for processing.', array( '%id' 
=> $correlationId ) ) );
     $form_state['redirect'] = "queue/{$queue}/{$correlationId}";
     break;
   case $form_state['values']['delete']:
-    $msg = $q->getByCorrelationId( "{$queue}-damaged", $correlationId );
+    $msg = $q->getByAnyId( "{$queue}-damaged", $correlationId );
     $q->ack( $msg );
     drupal_set_message( t( 'Successfully removed message %id.', array( '%id' 
=> $correlationId ) ) );
     $form_state['redirect'] = "queue/{$queue}-damaged/";

-- 
To view, visit https://gerrit.wikimedia.org/r/91333
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I36c166445b7e4c0e943dd62e33e9b0d66e5ceda5
Gerrit-PatchSet: 2
Gerrit-Project: wikimedia/fundraising/crm
Gerrit-Branch: master
Gerrit-Owner: Adamw <[email protected]>
Gerrit-Reviewer: Katie Horn <[email protected]>
Gerrit-Reviewer: Mwalker <[email protected]>
Gerrit-Reviewer: jenkins-bot

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to