Ejegg has submitted this change and it was merged.

Change subject: Move Unsubscribe Queue Consumer off ActiveMQ
......................................................................
Move Unsubscribe Queue Consumer off ActiveMQ Move existing functionality over to the new queue. Bug: T145419 Change-Id: I17946ad6974c2e5dc1cd3a960d87a6af7f9383f8 --- A sites/all/modules/queue2civicrm/unsubscribe/UnsubscribeQueueConsumer.php M sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.info M sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.module 3 files changed, 140 insertions(+), 116 deletions(-) Approvals: Ejegg: Looks good to me, approved jenkins-bot: Verified diff --git a/sites/all/modules/queue2civicrm/unsubscribe/UnsubscribeQueueConsumer.php b/sites/all/modules/queue2civicrm/unsubscribe/UnsubscribeQueueConsumer.php new file mode 100644 index 0000000..6bd51fb --- /dev/null +++ b/sites/all/modules/queue2civicrm/unsubscribe/UnsubscribeQueueConsumer.php @@ -0,0 +1,126 @@ +<?php namespace queue2civicrm\unsubscribe; + +use wmf_common\TransactionalWmfQueueConsumer; +use CRM_Core_DAO; +use WmfException; + + +class UnsubscribeQueueConsumer extends TransactionalWmfQueueConsumer { + + /** + * Processes an individual unsubscribe message. The message must contain the email address and + * the contribution ID. The contribution ID is required because, as a protection mechanism, we want + * unsubscribe emails to be single shot. Therefore we obtain the contact ID from the contribution ID + * (in case someone has gone through and de-duped contacts since the email was sent) and check its + * unsubscribe status taking one of two actions after: + * + * - If a contact has already opted out we abort and do no further action. + * - Otherwise, we opt out that contact and then do a Civi search for all matching emails - opting + * out those contacts as well. + * + * @param array $message. + * + */ + function processMessage( $message ) { + + // Sanity checking :) + if ( empty( $message['email'] ) or empty( $message['contribution-id'] ) ) { + $error = "Required field not present! Dropping message on floor. Message: " . 
json_encode( $message ); + throw new WmfException( 'UNSUBSCRIBE', $error ); + } + + $emails = array( strtolower( $message['email'] ) ); + $contribId = $message['contribution-id']; + watchdog( 'unsubscribe', "$contribId: Acting on contribution ID", array(), + WATCHDOG_INFO ); + + // Find the contact from the contribution ID + $contacts = $this->getEmailsFromContribution( $contribId ); + + if ( count( $contacts ) === 0 ) { + watchdog( 'unsubscribe', + "$contribId: No contacts returned for contribution ID. Acking frame and returning.", + WATCHDOG_NOTICE ); + } else { + // Excellent -- we have a collection of emails to unsubscribe now! :) Check opt out status and add them to the array + foreach ( $contacts as $contact ) { + if ( $contact['is_opt_out'] == TRUE ) + { + watchdog( 'unsubscribe', "$contribId: Contact already opted out with this contribution ID.", + WATCHDOG_NOTICE ); + continue; + } + $email = strtolower( $contact['email'] ); + if ( !in_array( $email, $emails ) ) { + $emails[] = $email; + } + } + + // And opt them out + $count = $this->optOutEmails( $emails ); + watchdog( 'unsubscribe', "$contribId: Successfully updated $count rows." ); + } + } + + /** + * Obtains a list of arrays of (contact ID, is opt out, email address) for + * the contact specified by the given contribution. 
+ * + * @param int $contributionId The Civi contribution ID + * + * @return array + */ + function getEmailsFromContribution( $contributionId) { + $query = " + SELECT con.id, con.is_opt_out, e.email + FROM civicrm_contribution ct, civicrm_contact con + LEFT JOIN civicrm_email e + ON con.id = e.contact_id + WHERE ct.id = %1 AND ct.contact_id = con.id"; + + $dao = CRM_Core_DAO::executeQuery( $query, array( + 1 => array( $contributionId, 'Integer' ), + ) ); + + $out = array(); + while ( $dao->fetch() ) { + $out[] = array( + 'contact_id' => (int)$dao->id, + 'is_opt_out' => (bool)$dao->is_opt_out, + 'email' => $dao->email, + ); + } + return $out; + } + + /** + * Updates the Civi database with an opt out record for the specified email address + * + * @param array $emails Email addresses to unsubscribe + * + * @returns Number of affected rows + */ + function optOutEmails( $emails) { + $escaped = array(); + foreach ($emails as $email) { + $escaped[] = "'" . addslashes( $email ) . "'"; + } + $email_condition = 'e.email IN (' . implode(', ', $escaped) . 
')'; + + $query = <<<EOS +UPDATE civicrm_contact con, civicrm_email e + SET con.is_opt_out = 1 + WHERE con.id = e.contact_id AND {$email_condition} +EOS; + + $dbs = wmf_civicrm_get_dbs(); + $dbs->push( 'civicrm' ); + + $result = db_query( $query ); + return $result->rowCount(); + } + +} + + + diff --git a/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.info b/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.info index f9daace..6d1de24 100644 --- a/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.info +++ b/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.info @@ -4,3 +4,4 @@ package = queue2civicrm configure = admin/config/queue2civicrm/unsubscribe_qc dependencies[] = queue2civicrm +files[] = UnsubscribeQueueConsumer.php diff --git a/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.module b/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.module index 502b6f4..745837f 100644 --- a/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.module +++ b/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.module @@ -1,5 +1,9 @@ <?php +use queue2civicrm\unsubscribe\UnsubscribeQueueConsumer; +use SmashPig\Core\Configuration; +use SmashPig\Core\Context; + /** * Implements hook_menu */ @@ -80,16 +84,20 @@ * @ref drush_unsubscribe_queue_consume */ function unsubscribe_batch_process() { + $config = Configuration::createForView( 'default' ); + Context::initWithLogger( $config, 'unsubscribe' ); watchdog('unsubscribe', 'Executing: unsubscribe_batch_process'); civicrm_initialize(); - $processed = queue2civicrm_stomp()->dequeue_loop( - variable_get('unsubscribe_queue', '/queue/unsubscribe'), - variable_get('unsubscribe_batch', 0), - variable_get('unsubscribe_batch_time', 0), - 'unsubscribe_process_message' + $consumer = new UnsubscribeQueueConsumer( + 'unsubscribe', + variable_get('refund_batch_time', 0), + variable_get('refund_batch', 0) ); + + $processed = 
$consumer->dequeueMessages(); + if ($processed > 0) { watchdog('unsubscribe', 'Successfully processed ' . $processed . ' unsubscribe(s).'); @@ -97,115 +105,4 @@ else { watchdog('unsubscribe', 'No unsubscribes processed.'); } -} - -/** - * Processes an individual unsubscribe message. The message must contain the email address and - * the contribution ID. The contribution ID is required because, as a protection mechanism, we want - * unsubscribe emails to be single shot. Therefore we obtain the contact ID from the contribution ID - * (in case someone has gone through and de-duped contacts since the email was sent) and check its - * unsubscribe status taking one of two actions after: - * - * - If an contact has already opted out we abort and do no further action. - * - Otherwise, we opt out that contact and then do a Civi search for all matching emails - opting - * out those contacts as well. - * - * @param $msg A STOMP message class. - * - * @return bool True if the contact was processed without errors. False if errors occured; the msg - * will still have been removed from the queue though. - */ -function unsubscribe_process_message($msg) { - $txnid = $msg->headers['correlation-id']; - watchdog('unsubscribe', "Beginning processing of message $txnid: " . json_encode($msg), array(), WATCHDOG_INFO); - - // Sanity checking :) - $body = json_decode($msg->body, TRUE); - if ( empty($body['email']) or empty( $body['contribution-id'] ) ) { - $error = "$txnid: Required field not present! 
Dropping message on floor."; - throw new WmfException( 'UNSUBSCRIBE', $error ); - } - - $emails = array(strtolower($body['email'])); - $contribId = $body['contribution-id']; - watchdog('unsubscribe', "$txnid: Acting on contribution ID '$contribId'", array(), WATCHDOG_INFO); - - // Find the contact from the contribution ID and check opt out status - $contacts = unsubscribe_get_emails_from_contribution($contribId); - - if (count($contacts) === 0) { - watchdog('unsubscribe', "$txnid: No contacts returned for contribution ID $contribId. Acking frame and returning.", WATCHDOG_NOTICE); - } elseif ($contacts[0]['is_opt_out'] == TRUE) { - watchdog('unsubscribe', "$txnid: Contact already opted out with this contribution ID.", WATCHDOG_NOTICE); - } else { - // Excellent -- we have a collection of emails to unsubscribe now! :) Add them to the array - foreach ($contacts as $contact) { - $email = strtolower($contact['email']); - if (!in_array($email, $emails)) { - $emails[] = $email; - } - } - - // And opt them out - $count = unsubscribe_opt_out_emails($emails); - watchdog('unsubscribe', "$txnid: Successfully updated $count rows."); - } -} - -/** - * Obtains a list of arrays of (contact ID, is opt out, email address) for - * the contact specified by the given contribution. 
- * - * @param int $ctid The Civi contribution ID - * - * @return array - */ -function unsubscribe_get_emails_from_contribution($ctid) { - $query = " -SELECT con.id, con.is_opt_out, e.email -FROM civicrm_contribution ct, civicrm_contact con -LEFT JOIN civicrm_email e - ON con.id = e.contact_id -WHERE ct.id = %1 AND ct.contact_id = con.id"; - - $dao = CRM_Core_DAO::executeQuery( $query, array( - 1 => array( $ctid, 'Integer' ), - ) ); - - $out = array(); - while ( $dao->fetch() ) { - $out[] = array( - 'contact_id' => (int)$dao->id, - 'is_opt_out' => (bool)$dao->is_opt_out, - 'email' => $dao->email, - ); - } - return $out; -} - -/** - * Updates the Civi database with an opt out record for the specified email address - * - * @param array $emails Email addresses to unsubscribe - * - * @returns Number of effected rows - */ -function unsubscribe_opt_out_emails($emails) { - $escaped = array(); - foreach ($emails as $email) { - $escaped[] = "'" . addslashes( $email ) . "'"; - } - $email_condition = 'e.email IN (' . implode(', ', $escaped) . 
')'; - - $query = <<<EOS -UPDATE civicrm_contact con, civicrm_email e - SET con.is_opt_out = 1 - WHERE con.id = e.contact_id AND {$email_condition} -EOS; - - $dbs = wmf_civicrm_get_dbs(); - $dbs->push( 'civicrm' ); - - $result = db_query( $query ); - return $result->rowCount(); } -- To view, visit https://gerrit.wikimedia.org/r/312084 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: I17946ad6974c2e5dc1cd3a960d87a6af7f9383f8 Gerrit-PatchSet: 11 Gerrit-Project: wikimedia/fundraising/crm Gerrit-Branch: master Gerrit-Owner: XenoRyet <dkozlow...@wikimedia.org> Gerrit-Reviewer: Awight <awi...@wikimedia.org> Gerrit-Reviewer: Cdentinger <cdentin...@wikimedia.org> Gerrit-Reviewer: Ejegg <eeggles...@wikimedia.org> Gerrit-Reviewer: XenoRyet <dkozlow...@wikimedia.org> Gerrit-Reviewer: jenkins-bot <> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits