Ejegg has submitted this change and it was merged.

Change subject: Move Unsubscribe Queue Consumer off ActiveMQ
......................................................................


Move Unsubscribe Queue Consumer off ActiveMQ

Move existing functionality over to the new queue.

Bug: T145419
Change-Id: I17946ad6974c2e5dc1cd3a960d87a6af7f9383f8
---
A sites/all/modules/queue2civicrm/unsubscribe/UnsubscribeQueueConsumer.php
M sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.info
M sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.module
3 files changed, 140 insertions(+), 116 deletions(-)

Approvals:
  Ejegg: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/sites/all/modules/queue2civicrm/unsubscribe/UnsubscribeQueueConsumer.php b/sites/all/modules/queue2civicrm/unsubscribe/UnsubscribeQueueConsumer.php
new file mode 100644
index 0000000..6bd51fb
--- /dev/null
+++ b/sites/all/modules/queue2civicrm/unsubscribe/UnsubscribeQueueConsumer.php
@@ -0,0 +1,126 @@
+<?php namespace queue2civicrm\unsubscribe;
+
+use wmf_common\TransactionalWmfQueueConsumer;
+use CRM_Core_DAO;
+use WmfException;
+
+
+class UnsubscribeQueueConsumer extends TransactionalWmfQueueConsumer {
+
+       /**
+        * Processes an individual unsubscribe message. The message must contain the email address and
+        * the contribution ID. The contribution ID is required because, as a protection mechanism, we want
+        * unsubscribe emails to be single shot. Therefore we obtain the contact ID from the contribution ID
+        * (in case someone has gone through and de-duped contacts since the email was sent) and check its
+        * opt-out status:
+        *
+        * - Email rows of a contact that is already opted out (or that carry no address) are skipped.
+        * - Every other address found, plus the address in the message, is passed to optOutEmails(),
+        *   which opts out all contacts having any of those addresses.
+        *
+        * @param array $message Decoded message; needs non-empty 'email' and 'contribution-id' keys.
+        * @throws WmfException If either required field is missing or empty.
+        */
+       public function processMessage( $message ) {
+               // Sanity checking :)
+               if ( empty( $message['email'] ) or empty( $message['contribution-id'] ) ) {
+                       $error = "Required field not present! Dropping message on floor. Message: " . json_encode( $message );
+                       throw new WmfException( 'UNSUBSCRIBE', $error );
+               }
+
+               $emails = array( strtolower( $message['email'] ) );
+               $contribId = $message['contribution-id'];
+               watchdog( 'unsubscribe', "$contribId: Acting on contribution ID", array(), WATCHDOG_INFO );
+
+               // Find the contact from the contribution ID
+               $contacts = $this->getEmailsFromContribution( $contribId );
+
+               if ( count( $contacts ) === 0 ) {
+                       // watchdog() takes its variables array third and the severity fourth.
+                       watchdog( 'unsubscribe',
+                               "$contribId: No contacts returned for contribution ID. Acking frame and returning.",
+                               array(), WATCHDOG_NOTICE );
+               } else {
+                       // Excellent -- we have a collection of emails to unsubscribe now! :) Check opt out status and add them to the array
+                       foreach ( $contacts as $contact ) {
+                               if ( $contact['is_opt_out'] == TRUE ) {
+                                       watchdog( 'unsubscribe', "$contribId: Contact already opted out with this contribution ID.",
+                                               array(), WATCHDOG_NOTICE );
+                                       continue;
+                               }
+                               // A contact without an email row comes back with a NULL address (LEFT JOIN); never match on ''.
+                               $email = strtolower( (string)$contact['email'] );
+                               if ( $email !== '' && !in_array( $email, $emails, true ) ) {
+                                       $emails[] = $email;
+                               }
+                       }
+
+                       // And opt them out
+                       $count = $this->optOutEmails( $emails );
+                       watchdog( 'unsubscribe', "$contribId: Successfully updated $count rows." );
+               }
+       }
+
+       /**
+        * Obtains a list of arrays of (contact ID, is opt out, email address) for
+        * the contact specified by the given contribution.
+        *
+        * @param int $contributionId  The Civi contribution ID
+        *
+        * @return array One entry per joined email row; 'email' may be NULL if the contact has none
+        */
+       public function getEmailsFromContribution( $contributionId ) {
+               $query = "
+                       SELECT con.id, con.is_opt_out, e.email
+                       FROM civicrm_contribution ct, civicrm_contact con
+                       LEFT JOIN civicrm_email e
+                         ON con.id = e.contact_id
+                       WHERE ct.id = %1 AND ct.contact_id = con.id";
+
+               $dao = CRM_Core_DAO::executeQuery( $query, array(
+                       1 => array( $contributionId, 'Integer' ),
+               ) );
+
+               $out = array();
+               while ( $dao->fetch() ) {
+                       $out[] = array(
+                               'contact_id' => (int)$dao->id,
+                               'is_opt_out' => (bool)$dao->is_opt_out,
+                               'email' => $dao->email,
+                       );
+               }
+               return $out;
+       }
+
+       /**
+        * Sets is_opt_out on every Civi contact that has one of the given email addresses.
+        *
+        * @param array   $emails   Email addresses to unsubscribe
+        *
+        * @return int Number of affected rows (0 when $emails is empty)
+        */
+       public function optOutEmails( $emails ) {
+               if ( empty( $emails ) ) {
+                       // "IN ()" is not valid SQL, and there would be nothing to update anyway.
+                       return 0;
+               }
+
+               // Bind the addresses instead of quoting them by hand; db_query() expands
+               // an array argument into one placeholder per value.
+               $query = <<<EOS
+UPDATE civicrm_contact con, civicrm_email e
+    SET con.is_opt_out = 1
+    WHERE con.id = e.contact_id AND e.email IN (:emails)
+EOS;
+
+               $dbs = wmf_civicrm_get_dbs();
+               $dbs->push( 'civicrm' );
+
+               $result = db_query( $query, array( ':emails' => array_values( $emails ) ) );
+               return $result->rowCount();
+       }
+
+}
+
+
+
diff --git a/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.info b/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.info
index f9daace..6d1de24 100644
--- a/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.info
+++ b/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.info
@@ -4,3 +4,4 @@
 package = queue2civicrm
 configure = admin/config/queue2civicrm/unsubscribe_qc
 dependencies[] = queue2civicrm
+files[] = UnsubscribeQueueConsumer.php
diff --git a/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.module b/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.module
index 502b6f4..745837f 100644
--- a/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.module
+++ b/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.module
@@ -1,5 +1,9 @@
 <?php
 
+use queue2civicrm\unsubscribe\UnsubscribeQueueConsumer;
+use SmashPig\Core\Configuration;
+use SmashPig\Core\Context;
+
 /**
  * Implements hook_menu
  */
@@ -80,16 +84,20 @@
  * @ref drush_unsubscribe_queue_consume
  */
 function unsubscribe_batch_process() {
+  $config = Configuration::createForView( 'default' );
+  Context::initWithLogger( $config, 'unsubscribe' );
   watchdog('unsubscribe', 'Executing: unsubscribe_batch_process');
 
   civicrm_initialize();
 
-  $processed = queue2civicrm_stomp()->dequeue_loop(
-    variable_get('unsubscribe_queue', '/queue/unsubscribe'),
-    variable_get('unsubscribe_batch', 0),
-    variable_get('unsubscribe_batch_time', 0),
-    'unsubscribe_process_message'
+  // Limits come from this module's own unsubscribe_* settings, not the refund consumer's.
+  $consumer = new UnsubscribeQueueConsumer(
+     'unsubscribe',
+     variable_get('unsubscribe_batch_time', 0),
+     variable_get('unsubscribe_batch', 0)
   );
+
+  $processed = $consumer->dequeueMessages();
 
   if ($processed > 0) {
     watchdog('unsubscribe', 'Successfully processed ' . $processed . ' unsubscribe(s).');
@@ -97,115 +105,4 @@
   else {
     watchdog('unsubscribe', 'No unsubscribes processed.');
   }
-}
-
-/**
- * Processes an individual unsubscribe message. The message must contain the 
email address and
- * the contribution ID. The contribution ID is required because, as a 
protection mechanism, we want
- * unsubscribe emails to be single shot. Therefore we obtain the contact ID 
from the contribution ID
- * (in case someone has gone through and de-duped contacts since the email was 
sent) and check its
- * unsubscribe status taking one of two actions after:
- *
- * - If an contact has already opted out we abort and do no further action.
- * - Otherwise, we opt out that contact and then do a Civi search for all 
matching emails - opting
- *   out those contacts as well.
- *
- * @param $msg A STOMP message class.
- *
- * @return bool True if the contact was processed without errors. False if 
errors occured; the msg
- *              will still have been removed from the queue though.
- */
-function unsubscribe_process_message($msg) {
-    $txnid = $msg->headers['correlation-id'];
-    watchdog('unsubscribe', "Beginning processing of message $txnid: " . 
json_encode($msg), array(), WATCHDOG_INFO);
-
-    // Sanity checking :)
-    $body = json_decode($msg->body, TRUE);
-    if ( empty($body['email']) or empty( $body['contribution-id'] ) ) {
-      $error = "$txnid: Required field not present! Dropping message on 
floor.";
-      throw new WmfException( 'UNSUBSCRIBE', $error );
-    }
-
-    $emails = array(strtolower($body['email']));
-    $contribId = $body['contribution-id'];
-    watchdog('unsubscribe', "$txnid: Acting on contribution ID '$contribId'", 
array(), WATCHDOG_INFO);
-
-    // Find the contact from the contribution ID and check opt out status
-    $contacts = unsubscribe_get_emails_from_contribution($contribId);
-
-    if (count($contacts) === 0) {
-      watchdog('unsubscribe', "$txnid: No contacts returned for contribution 
ID $contribId. Acking frame and returning.", WATCHDOG_NOTICE);
-    } elseif ($contacts[0]['is_opt_out'] == TRUE) {
-      watchdog('unsubscribe', "$txnid: Contact already opted out with this 
contribution ID.", WATCHDOG_NOTICE);
-    } else {
-      // Excellent -- we have a collection of emails to unsubscribe now! :) 
Add them to the array
-      foreach ($contacts as $contact) {
-        $email = strtolower($contact['email']);
-        if (!in_array($email, $emails)) {
-          $emails[] = $email;
-        }
-      }
-
-      // And opt them out
-      $count = unsubscribe_opt_out_emails($emails);
-      watchdog('unsubscribe', "$txnid: Successfully updated $count rows.");
-    }
-}
-
-/**
- * Obtains a list of arrays of (contact ID, is opt out, email address) for
- * the contact specified by the given contribution.
- *
- * @param int $ctid  The Civi contribution ID
- *
- * @return array
- */
-function unsubscribe_get_emails_from_contribution($ctid) {
-  $query = "
-SELECT con.id, con.is_opt_out, e.email
-FROM civicrm_contribution ct, civicrm_contact con
-LEFT JOIN civicrm_email e
-  ON con.id = e.contact_id
-WHERE ct.id = %1 AND ct.contact_id = con.id";
-
-  $dao = CRM_Core_DAO::executeQuery( $query, array(
-    1 => array( $ctid, 'Integer' ),
-  ) );
-
-  $out = array();
-  while ( $dao->fetch() ) {
-    $out[] = array(
-      'contact_id' => (int)$dao->id,
-      'is_opt_out' => (bool)$dao->is_opt_out,
-      'email' => $dao->email,
-    );
-  }
-  return $out;
-}
-
-/**
- * Updates the Civi database with an opt out record for the specified email 
address
- *
- * @param array   $emails   Email addresses to unsubscribe
- *
- * @returns Number of effected rows
- */
-function unsubscribe_opt_out_emails($emails) {
-  $escaped = array();
-  foreach ($emails as $email) {
-    $escaped[] = "'" . addslashes( $email ) . "'";
-  }
-  $email_condition = 'e.email IN (' . implode(', ', $escaped) . ')';
-
-  $query = <<<EOS
-UPDATE civicrm_contact con, civicrm_email e
-    SET con.is_opt_out = 1
-    WHERE con.id = e.contact_id AND {$email_condition}
-EOS;
-
-  $dbs = wmf_civicrm_get_dbs();
-  $dbs->push( 'civicrm' );
-
-  $result = db_query( $query );
-  return $result->rowCount();
 }

-- 
To view, visit https://gerrit.wikimedia.org/r/312084
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I17946ad6974c2e5dc1cd3a960d87a6af7f9383f8
Gerrit-PatchSet: 11
Gerrit-Project: wikimedia/fundraising/crm
Gerrit-Branch: master
Gerrit-Owner: XenoRyet <dkozlow...@wikimedia.org>
Gerrit-Reviewer: Awight <awi...@wikimedia.org>
Gerrit-Reviewer: Cdentinger <cdentin...@wikimedia.org>
Gerrit-Reviewer: Ejegg <eeggles...@wikimedia.org>
Gerrit-Reviewer: XenoRyet <dkozlow...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to