jenkins-bot has submitted this change and it was merged.
Change subject: Fix time and message limits
......................................................................
Fix time and message limits
Numeric coersion fixes strict comparison failures in loop condition,
also fix an off-by-one message limit error.
Bug: T133965
Change-Id: I7034e0deaf8421e238791a4da292e6c139fdef28
---
M Core/DataStores/QueueConsumer.php
M Tests/QueueConsumerTest.php
2 files changed, 90 insertions(+), 7 deletions(-)
Approvals:
Awight: Looks good to me, approved
jenkins-bot: Verified
diff --git a/Core/DataStores/QueueConsumer.php
b/Core/DataStores/QueueConsumer.php
index 02cb5a3..95fcb9f 100644
--- a/Core/DataStores/QueueConsumer.php
+++ b/Core/DataStores/QueueConsumer.php
@@ -54,18 +54,24 @@
$messageLimit = 0,
$damagedQueue = null
) {
+ if ( !is_numeric( $timeLimit ) ) {
+ throw new InvalidArgumentException( 'timeLimit must be
numeric' );
+ }
+ if ( !is_numeric( $messageLimit ) ) {
+ throw new InvalidArgumentException( 'messageLimit must
be numeric' );
+ }
+ if ( !is_callable( $callback ) ) {
+ throw new InvalidArgumentException( "Processing
callback must be callable" );
+ }
+
$this->callback = $callback;
- $this->timeLimit = $timeLimit;
- $this->messageLimit = $messageLimit;
+ $this->timeLimit = intval( $timeLimit );
+ $this->messageLimit = intval( $messageLimit );
$this->backend = self::getQueue( $queueName );
if ( !$this->backend instanceof AtomicReadBuffer ) {
throw new InvalidArgumentException( "Queue $queueName
is not an AtomicReadBuffer" );
- }
-
- if ( !is_callable( $callback ) ) {
- throw new InvalidArgumentException( "Processing
callback is not callable" );
}
if ( $damagedQueue ) {
@@ -105,7 +111,7 @@
$processed++;
}
$timeOk = $this->timeLimit === 0 || time() <=
$startTime + $this->timeLimit;
- $countOk = $this->messageLimit === 0 || $processed <=
$this->messageLimit;
+ $countOk = $this->messageLimit === 0 || $processed <
$this->messageLimit;
}
while( $timeOk && $countOk && $data !== null );
return $processed;
diff --git a/Tests/QueueConsumerTest.php b/Tests/QueueConsumerTest.php
index 4dec67e..a8a0988 100644
--- a/Tests/QueueConsumerTest.php
+++ b/Tests/QueueConsumerTest.php
@@ -111,4 +111,81 @@
'Should delete message on exception when damaged queue
exists'
);
}
+
+ public function testMessageLimit() {
+ $messages = array();
+ for ( $i = 0; $i < 5; $i++ ) {
+ $message = array(
+ 'box' => 'thing' . $i,
+ 'creepiness' => mt_rand(),
+ );
+ $messages[] = $message;
+ $this->queue->push( $message );
+ }
+ $processedMessages = array();
+ $callback = function( $message ) use ( &$processedMessages ) {
+ $processedMessages[] = $message;
+ };
+ // Should work when you pass in the limits as strings.
+ $consumer = new QueueConsumer( 'test', $callback, 0, '3' );
+ $count = $consumer->dequeueMessages();
+ $this->assertEquals( 3, $count, 'dequeueMessages returned wrong
count' );
+ $this->assertEquals( 3, count( $processedMessages ), 'Called
callback wrong number of times' );
+
+ for ( $i = 0; $i < 3; $i++ ) {
+ $this->assertEquals( $messages[$i],
$processedMessages[$i], 'Message mutated' );
+ }
+ $this->assertEquals(
+ $messages[3],
+ $this->queue->popAtomic( function( $unused ) {} ),
+ 'Messed with too many messages'
+ );
+ }
+
+ public function testKeepRunningOnDamage() {
+ $damagedQueue = QueueConsumer::getQueue( 'damaged' );
+ $damagedQueue->createTable( 'damaged' ); // FIXME: should not
need
+
+ $messages = array();
+ for ( $i = 0; $i < 5; $i++ ) {
+ $message = array(
+ 'box' => 'thing' . $i,
+ 'creepiness' => mt_rand(),
+ );
+ $messages[] = $message;
+ $this->queue->push( $message );
+ }
+ $processedMessages = array();
+ $cb = function( $message ) use ( &$processedMessages ) {
+ $processedMessages[] = $message;
+ throw new \Exception( 'kaboom!' );
+ };
+
+ $consumer = new QueueConsumer( 'test', $cb, 0, 3, 'damaged' );
+ $count = 0;
+ try {
+ $count = $consumer->dequeueMessages();
+ } catch ( \Exception $ex ) {
+ $this->fail(
+ 'Exception should not have bubbled up: ' .
$ex->getMessage()
+ );
+ }
+ $this->assertEquals( 3, $count, 'dequeueMessages returned wrong
count' );
+ $this->assertEquals( 3, count( $processedMessages ), 'Called
callback wrong number of times' );
+
+ for ( $i = 0; $i < 3; $i++ ) {
+ $this->assertEquals( $messages[$i],
$processedMessages[$i], 'Message mutated' );
+ $this->assertEquals(
+ $messages[$i],
+ $damagedQueue->popAtomic( function( $unused )
{} ),
+ 'Should move message to damaged queue when
exception is thrown'
+ );
+ }
+ $this->assertEquals(
+ $messages[3],
+ $this->queue->popAtomic( function( $unused ) {} ),
+ 'message 4 should be at the head of the queue'
+ );
+ }
+
}
--
To view, visit https://gerrit.wikimedia.org/r/288449
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I7034e0deaf8421e238791a4da292e6c139fdef28
Gerrit-PatchSet: 7
Gerrit-Project: wikimedia/fundraising/SmashPig
Gerrit-Branch: master
Gerrit-Owner: Ejegg <[email protected]>
Gerrit-Reviewer: Awight <[email protected]>
Gerrit-Reviewer: Cdentinger <[email protected]>
Gerrit-Reviewer: Ejegg <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits