Adamw has uploaded a new change for review.
https://gerrit.wikimedia.org/r/119546
Change subject: Add a job time limit to queue processing modules
......................................................................
Add a job time limit to queue processing modules
This makes it possible to guarantee that jobs do not overlap, and makes the
batch run behavior independent of system load.
The patch incidentally fixes a bug where the number of messages processed was
sometimes being incorrectly reported as zero.
TODO:
* decouple one-time and recurring jobs
Change-Id: If954138ce34aaf67a01d266ee211b0578919a4f3
---
M sites/all/modules/queue2civicrm/queue2civicrm.module
M sites/all/modules/queue2civicrm/recurring/recurring.module
M sites/all/modules/queue2civicrm/refund/wmf_refund_qc.module
M sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.module
M sites/all/modules/wmf_common/Queue.php
5 files changed, 128 insertions(+), 52 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/crm
refs/changes/46/119546/1
diff --git a/sites/all/modules/queue2civicrm/queue2civicrm.module
b/sites/all/modules/queue2civicrm/queue2civicrm.module
index 068dc40..ebf06a8 100644
--- a/sites/all/modules/queue2civicrm/queue2civicrm.module
+++ b/sites/all/modules/queue2civicrm/queue2civicrm.module
@@ -90,6 +90,13 @@
function queue2civicrm_settings() {
$form = array();
+ $form['queue2civicrm_disable'] = array(
+ '#type' => 'checkbox',
+ '#title' => t('Disable job'),
+ '#description' => t('If checked, no message processing will be
performed.'),
+ '#default_value' => variable_get('queue2civicrm_disable', false),
+ );
+
$form['queue2civicrm_url'] = array(
'#type' => 'textfield',
'#title' => t('Connection URL'),
@@ -110,10 +117,9 @@
'#type' => 'select',
'#title' => t('Cron batch size'),
'#description' => t('Maximum number of donations processed by a
queue2civicrm job.'),
- '#required' => TRUE,
'#default_value' => variable_get('queue2civicrm_batch', 0),
'#options' => array(
- 0 => '0 (Disable)',
+ '' => 'Unlimited',
1 => 1,
5 => 5,
10 => 10,
@@ -136,6 +142,14 @@
650 => 650,
700 => 700,
),
+ );
+
+ $form['queue2civicrm_batch_time'] = array(
+ '#type' => 'textfield',
+ '#title' => t('Job time limit (in seconds)'),
+ '#description' => t('Maximum elapsed duration of a queue2civicrm job,
after which we will abort from the loop. This can be used to set a reliable
duty cycle for the job. Either a time limit or batch size limit is required.'),
+ '#required' => TRUE,
+ '#default_value' => variable_get('queue2civicrm_batch_time', 90),
);
$form[ 'queue2civicrm_gateways_to_monitor' ] = array(
@@ -168,9 +182,15 @@
function queue2civicrm_batch_process() {
wmf_civicrm_boost_performance();
+ if ( variable_get( 'queue2civicrm_disable', false ) ) {
+ watchdog( 'queue2civicrm', 'Job is disabled. Exiting.', NULL,
WATCHDOG_INFO );
+ return;
+ }
+
$processed = queue2civicrm_stomp()->dequeue_loop(
variable_get('queue2civicrm_subscription', '/queue/test'),
variable_get( 'queue2civicrm_batch', 0 ),
+ variable_get( 'queue2civicrm_batch_time', 0 ),
'queue2civicrm_import'
);
diff --git a/sites/all/modules/queue2civicrm/recurring/recurring.module
b/sites/all/modules/queue2civicrm/recurring/recurring.module
index 0e916d2..0f5981c 100644
--- a/sites/all/modules/queue2civicrm/recurring/recurring.module
+++ b/sites/all/modules/queue2civicrm/recurring/recurring.module
@@ -33,27 +33,19 @@
function recurring_settings() {
$form = array();
- $form[ 'recurring' ][ 'description' ] = array(
- '#type' => 'fieldset',
- '#title' => t('Recurring payments'),
- '#description' => t('The recurring queue contains notifications pertaining
to subscription donations, such as monthly payments, cancellation,and
expiration.'),
- '#collapsible' => FALSE,
- '#collapsed' => FALSE,
+ $form['recurring_description'] = array(
+ '#type' => 'markup',
+ '#markup' => t('<p>The recurring queue contains notifications pertaining
to subscription donations, such as monthly payments, cancellation,and
expiration.</p>'),
);
$form['recurring_process'] = array(
- '#type' => 'radios',
- '#title' => t('Process recurring transactions during queue consumption'),
- '#description' => t('If enabled, recurring processing will take place at
the end of the regular queue2civicrm job. If disabled, recurring notifications
must be processed by a dedicated job.'),
- '#required' => TRUE,
- '#options' => array(
- FALSE => t('No'),
- TRUE => t('Yes'),
- ),
+ '#type' => 'checkbox',
+ '#title' => t('Process recurring transactions'),
+ '#description' => t('If enabled, recurring processing will take place at
the end of the regular queue2civicrm job. If disabled, recurring notifications
will not be processed.'),
'#default_value' => variable_get('recurring_process', FALSE),
);
- $form[ 'recurring' ][ 'recurring_subscription' ] = array(
+ $form['recurring_subscription'] = array(
'#type' => 'textfield',
'#title' => t('Subscription path'),
'#required' => TRUE,
@@ -61,14 +53,13 @@
'#description' => t( 'Queue to watch for recurring notifications' ),
);
- $form[ 'recurring' ][ 'recurring_batch' ] = array(
+ $form['recurring_batch'] = array(
'#type' => 'select',
'#title' => t('Cron batch size'),
'#description' => t('Maximum number of items processed by the recurring
job'),
- '#required' => TRUE,
'#default_value' => variable_get('recurring_batch', 0),
'#options' => array(
- 0 => '0 (Disable)',
+ '' => 'Unlimited',
1 => 1,
5 => 5,
10 => 10,
@@ -82,7 +73,15 @@
150 => 150,
),
);
-
+
+ $form['recurring_batch_time'] = array(
+ '#type' => 'textfield',
+ '#title' => t('Job time limit (in seconds)'),
+ '#description' => t('Maximum elapsed duration of a recurring job, after
which we will abort from the loop. This can be used to set a reliable duty
cycle for the job. Either a time limit or batch size limit is required.'),
+ '#required' => TRUE,
+ '#default_value' => variable_get('recurring_batch_time', 90),
+ );
+
return system_settings_form( $form );
}
@@ -99,6 +98,7 @@
$recurring_processed = queue2civicrm_stomp()->dequeue_loop(
variable_get( 'recurring_subscription', '/queue/test_recurring' ),
variable_get( 'recurring_batch', 0 ),
+ variable_get( 'recurring_batch_time', 0 ),
'recurring_import'
);
diff --git a/sites/all/modules/queue2civicrm/refund/wmf_refund_qc.module
b/sites/all/modules/queue2civicrm/refund/wmf_refund_qc.module
index 0f5e008..895cfec 100644
--- a/sites/all/modules/queue2civicrm/refund/wmf_refund_qc.module
+++ b/sites/all/modules/queue2civicrm/refund/wmf_refund_qc.module
@@ -22,15 +22,19 @@
function refund_qc_settings() {
$form = array();
- $form[ 'refund_qc' ][ 'description' ] = array(
- '#type' => 'fieldset',
- '#title' => t('Refund queue'),
- '#description' => t('This queue contains refund notifications, usually
sent by the gateway to our IPN listener, or a nightly audit manifest. We honor
these notifications by refunding the associated contributions in our CRM.'),
- '#collapsible' => FALSE,
- '#collapsed' => FALSE,
+ $form['description'] = array(
+ '#type' => 'markup',
+ '#markup' => t('<p>This queue contains refund notifications, usually sent
by the gateway to our IPN listener, or a nightly audit manifest. We honor
these notifications by refunding the associated contributions in our CRM.</p>'),
);
- $form[ 'refund_qc' ][ 'refund_queue' ] = array(
+ $form['refund_disable'] = array(
+ '#type' => 'checkbox',
+ '#title' => t('Disable job'),
+ '#description' => t('If checked, no message processing will be
performed.'),
+ '#default_value' => variable_get('refund_disable', false),
+ );
+
+ $form['refund_queue'] = array(
'#type' => 'textfield',
'#title' => t('Subscription path'),
'#required' => TRUE,
@@ -38,14 +42,13 @@
'#description' => t('Queue for refund notifications'),
);
- $form[ 'refund_qc' ][ 'refund_batch' ] = array(
+ $form['refund_batch'] = array(
'#type' => 'select',
'#title' => t('Cron batch size'),
'#description' => t('Maximum number of donations processed by a refund
job'),
- '#required' => TRUE,
'#default_value' => variable_get('refund_batch', 0),
'#options' => array(
- 0 => '0 (Disable)',
+ '' => 'Unlimited',
1 => 1,
5 => 5,
10 => 10,
@@ -64,6 +67,14 @@
),
);
+ $form['refund_batch_time'] = array(
+ '#type' => 'textfield',
+ '#title' => t('Job time limit (in seconds)'),
+ '#description' => t('Maximum elapsed duration of a refund job, after which
we will abort from the loop. This can be used to set a reliable duty cycle for
the job. Either a time limit or batch size limit is required.'),
+ '#required' => TRUE,
+ '#default_value' => variable_get('refund_batch_time', 90),
+ );
+
return system_settings_form($form);
}
@@ -73,6 +84,11 @@
* @ref drush_refund_queue_consume
*/
function refund_batch_process() {
+ if ( variable_get('refund_disable', false) ) {
+ watchdog( 'wmf_refund_qc', 'Job is disabled. Exiting.', NULL,
WATCHDOG_INFO );
+ return;
+ }
+
civicrm_api_classapi(); # hah
watchdog('refund', 'Executing: refund_batch_process');
@@ -81,6 +97,7 @@
$processed = $q->dequeue_loop(
variable_get('refund_queue', '/queue/refund-notifications_test'),
variable_get('refund_batch', 0),
+ variable_get('refund_batch_time', 0),
'refund_import'
);
diff --git
a/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.module
b/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.module
index 793a69e..9e07ce0 100644
--- a/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.module
+++ b/sites/all/modules/queue2civicrm/unsubscribe/wmf_unsubscribe_qc.module
@@ -22,15 +22,19 @@
function unsubscribe_qc_settings() {
$form = array();
- $form[ 'unsubscribe_qc' ][ 'description' ] = array(
- '#type' => 'fieldset',
- '#title' => t('Unsubscribe queue'),
- '#description' => t('Contacts in the unsubscribe queue are processed by
marking them as opt-out in the CRM.'),
- '#collapsible' => FALSE,
- '#collapsed' => FALSE,
+ $form['unsubscribe_qc_disable'] = array(
+ '#type' => 'checkbox',
+ '#title' => t('Disable job'),
+ '#description' => t('If checked, no message processing will be
performed.'),
+ '#default_value' => variable_get('unsubscribe_qc_disable', false),
);
- $form[ 'unsubscribe_qc' ][ 'unsubscribe_queue' ] = array(
+ $form['description'] = array(
+ '#type' => 'markup',
+ '#description' => t('<p>Contacts in the unsubscribe queue are processed by
marking them as opt-out in the CRM.</p>'),
+ );
+
+ $form['unsubscribe_queue'] = array(
'#type' => 'textfield',
'#title' => t('Subscription path'),
'#required' => TRUE,
@@ -38,14 +42,13 @@
'#description' => t('Queue for unsubscribe items'),
);
- $form[ 'unsubscribe_qc' ][ 'unsubscribe_batch' ] = array(
+ $form['unsubscribe_batch'] = array(
'#type' => 'select',
'#title' => t('Cron batch size'),
'#description' => t('Maximum number of items processed by an unsubscribe
job'),
- '#required' => TRUE,
'#default_value' => variable_get('unsubscribe_batch', 0),
'#options' => array(
- 0 => '0 (Disable)',
+ '' => 'Unlimited',
1 => 1,
5 => 5,
10 => 10,
@@ -58,6 +61,14 @@
750 => 750,
1000 => 1000
),
+ );
+
+ $form['unsubscribe_batch_time'] = array(
+ '#type' => 'textfield',
+ '#title' => t('Job time limit (in seconds)'),
+ '#description' => t('Maximum elapsed duration of an unsubscribe job, after
which we will abort from the loop. This can be used to set a reliable duty
cycle for the job. Either a time limit or batch size limit is required.'),
+ '#required' => TRUE,
+ '#default_value' => variable_get('unsubscribe_batch_time', 90),
);
return system_settings_form($form);
@@ -76,6 +87,7 @@
$processed = queue2civicrm_stomp()->dequeue_loop(
variable_get('unsubscribe_queue', '/queue/unsubscribe-test'),
variable_get('unsubscribe_batch', 0),
+ variable_get('unsubscribe_batch_time', 0),
'unsubscribe_process_message'
);
diff --git a/sites/all/modules/wmf_common/Queue.php
b/sites/all/modules/wmf_common/Queue.php
index dbd5df9..789c5b5 100644
--- a/sites/all/modules/wmf_common/Queue.php
+++ b/sites/all/modules/wmf_common/Queue.php
@@ -15,16 +15,30 @@
}
/**
- * Pop up to $batch_size messages off $queue and execute $callback on each.
+ * Pop from a queue and execute a callback on each message
*
- * @param $callback: must have the signature ($msg) -> bool
+ * @param string $queue name of the queue we will read from
+ * @param integer|null $batch_size maximum number of messages to process,
or falseish for unlimited
+ * @param integer|null $time_limit maximum time to spend looping, in
seconds, or falseish for unlimited
+ * @param callable $callback: must have the signature function($msg) ->
bool
+ *
+ * @return integer number of messages processed
*/
- function dequeue_loop( $queue, $batch_size, $callback ) {
+ function dequeue_loop( $queue, $batch_size, $time_limit, $callback ) {
+ if ( !$batch_size and !$time_limit ) {
+ throw new Exception( "Bad configuration: need to give a count or
time limit" );
+ }
+
$queue = $this->normalizeQueueName( $queue );
+
watchdog( 'wmf_common',
- t( 'Attempting to process at most %size contribution(s) from
"%queue" queue.',
- array( '%size' => $batch_size, '%queue' => $queue )
- ), NULL, WATCHDOG_INFO
+ 'Attempting to process at most %size contribution(s) from "%queue"
queue, spending at most %time seconds.',
+ array(
+ '%size' => ( $batch_size ? $batch_size : 'unlimited' ),
+ '%time' => ( $time_limit ? $time_limit : 'unlimited' ),
+ '%queue' => $queue,
+ ),
+ WATCHDOG_INFO
);
$con = $this->getFreshConnection();
@@ -33,17 +47,30 @@
// things that either have no delay_till header or a delay_till that is
// less than now. Because ActiveMQ is stupid, numeric selects auto
// compare to null.
- $ctime = time();
+ $start_time = time();
$con->subscribe( $queue, array( 'ack' => 'client', ) );
+ $con->setReadTimeout( 4 );
$processed = 0;
- for ( $i = 0; $i < $batch_size; $i++ ) {
- // we could alternatively set a time limit on the stomp readframe
- set_time_limit( 10 );
+ while ( true ) {
+ if ( $batch_size
+ and $processed >= $batch_size
+ ) {
+ watchdog( 'wmf_common', t( 'Processed the maximum batch size,
exiting dequeue loop.' ), NULL, WATCHDOG_INFO );
+ break;
+ }
+
+ if ( $time_limit
+ and time() >= ( $start_time + $time_limit )
+ ) {
+ watchdog( 'wmf_common', t( 'Time limit expired, exiting
dequeue loop.' ), NULL, WATCHDOG_INFO );
+ break;
+ }
+
$msg = $con->readFrame();
if ( empty($msg) ) {
watchdog( 'wmf_common', t('Ran out of messages.'), NULL,
WATCHDOG_INFO );
- return FALSE;
+ break;
}
if ($msg->command === 'RECEIPT') {
// TODO it would be smart to keep track of RECEIPT frames as
they are an ack-of-ack
--
To view, visit https://gerrit.wikimedia.org/r/119546
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: If954138ce34aaf67a01d266ee211b0578919a4f3
Gerrit-PatchSet: 1
Gerrit-Project: wikimedia/fundraising/crm
Gerrit-Branch: master
Gerrit-Owner: Adamw <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits