Awight has submitted this change and it was merged.
Change subject: follow composer.lock
......................................................................
follow composer.lock
Change-Id: I4e6c900bd6df83b5e50c62d20c2c3d10a90e4c44
---
M coderkungfu/php-queue/src/PHPQueue/Backend/Predis.php
M coderkungfu/php-queue/test/PHPQueue/Backend/PredisTest.php
M coderkungfu/php-queue/test/PHPQueue/Backend/PredisZsetTest.php
M composer/installed.json
4 files changed, 139 insertions(+), 29 deletions(-)
Approvals:
Awight: Verified; Looks good to me, approved
diff --git a/coderkungfu/php-queue/src/PHPQueue/Backend/Predis.php b/coderkungfu/php-queue/src/PHPQueue/Backend/Predis.php
index 25b0cd3..721d91c 100644
--- a/coderkungfu/php-queue/src/PHPQueue/Backend/Predis.php
+++ b/coderkungfu/php-queue/src/PHPQueue/Backend/Predis.php
@@ -1,6 +1,8 @@
<?php
namespace PHPQueue\Backend;
+use Predis\Transaction\MultiExec;
+
use PHPQueue\Exception\BackendException;
use PHPQueue\Interfaces\KeyValueStore;
use PHPQueue\Interfaces\FifoQueueStore;
@@ -112,6 +114,43 @@
}
/**
+ * Remove stale elements at the top of the queue and return the first real entry
+ *
+ * When data expires, it still leaves a queue entry linking to its
+ * correlation ID. Clear any of these stale entries at the head of
+ * the queue.
+ *
+ * Note that we run this from inside a transaction, to make it less
+ * likely that we'll hit a race condition.
+ *
+ * @param MultiExec $tx transaction we're working within.
+ *
+ * @return string|null Top element's key, or null if the queue is empty.
+ */
+    protected function peekWithCleanup(MultiExec $tx)
+    {
+        while (true) {
+            // Ask the FIFO index for its current head (range 0..0).
+            $head = $tx->zrange(Predis::FIFO_INDEX, 0, 0);
+            if (!$head) {
+                // Nothing is indexed any more, so there is nothing to return.
+                return null;
+            }
+
+            // The index member is also the key of the stored value.
+            $candidate = $head[0];
+            if ($tx->exists($candidate)) {
+                // The value is still there: this is the first real entry.
+                return $candidate;
+            }
+
+            // The value is missing, so the index entry is stale: drop it
+            // and loop to inspect the new head.
+            $tx->zrem(Predis::FIFO_INDEX, $candidate);
+        }
+    }
+
+ /**
* @return array|null
*/
public function pop()
@@ -123,36 +162,28 @@
}
if ($this->order_key) {
// Pop the first element.
- do {
- // Adapted from https://github.com/nrk/predis/blob/v1.0/examples/transaction_using_cas.php
- $options = array(
- 'cas' => true,
- 'watch' => self::FIFO_INDEX,
- 'retry' => 3,
- );
- $order_key = $this->order_key;
- $queue_empty = true; // Fail open.
- $this->getConnection()->transaction($options, function ($tx) use ($order_key, &$data, &$queue_empty) {
- // Look up the first element in the FIFO ordering.
- $values = $tx->zrange(Predis::FIFO_INDEX, 0, 0);
- if ($values) {
- $queue_empty = false;
- // Use that value as a key into the key-value block, to get the data.
- $key = $values[0];
- $data = $tx->get($key);
+ // Adapted from https://github.com/nrk/predis/blob/v1.0/examples/transaction_using_cas.php
+ $options = array(
+ 'cas' => true,
+ 'watch' => self::FIFO_INDEX,
+ 'retry' => 3,
+ );
+ $self = $this;
+ $this->getConnection()->transaction($options, function ($tx) use (&$data, &$self) {
+ // Begin transaction.
+ $tx->multi();
- // Begin transaction.
- $tx->multi();
+ $key = $self->peekWithCleanup($tx);
- // Remove from both indexes.
- $tx->zrem(Predis::FIFO_INDEX, $key);
- $tx->del($key);
- }
- });
- // Skip over anything without a corresponding key/value entry,
- // which occurs when objects expire, and loop to the next
- // element.
- } while ($data === null && !$queue_empty);
+ if ($key) {
+ // Use that value as a key into the key-value block.
+ $data = $tx->get($key);
+
+ // Remove from both indexes.
+ $tx->zrem(Predis::FIFO_INDEX, $key);
+ $tx->del($key);
+ }
+ });
} else {
$data = $this->getConnection()->lpop($this->queue_name);
}
@@ -166,6 +197,56 @@
return json_decode($data, true);
}
+    /**
+     * Return the top element in the queue without removing it; the raw data is also stored in last_job.
+     * @throws BackendException If no queue is specified.
+     * @return array|null Result of json_decode() on the stored data, or null if nothing was found.
+     */
+    public function peek()
+    {
+        $data = null;
+        $this->beforeGet();
+        if (!$this->hasQueue()) {
+            throw new BackendException("No queue specified.");
+        }
+        if ($this->order_key) {
+            // Adapted from https://github.com/nrk/predis/blob/v1.0/examples/transaction_using_cas.php
+            $options = array(
+                'cas' => true,
+                'watch' => self::FIFO_INDEX,
+                'retry' => 3,
+            );
+            $self = $this;
+            $this->getConnection()->transaction($options, function ($tx) use (&$data, &$self) {
+                // Begin transaction. NOTE(review): peekWithCleanup() uses replies right away -- confirm multi() does not make $tx queue them.
+                $tx->multi();
+
+                $key = $self->peekWithCleanup($tx);
+
+                if ($key) {
+                    // Use that value as a key into the key-value block; the entry itself is left in place.
+                    $data = $tx->get($key);
+                }
+            });
+        } else {
+            $data_range = $this->getConnection()->lrange($this->queue_name, 0, 0);
+            if (!$data_range) {
+                return null;
+            } else {
+                // lrange 0..0 yields a list; unwrap its single element.
+                $data = $data_range[0];
+            }
+        }
+        if (!$data) {
+            return null;
+        }
+        $this->last_job = $data;
+        $this->last_job_id = time();
+        $this->afterGet();
+
+        return json_decode($data, true);
+    }
+
public function release($jobId=null)
{
$this->beforeRelease($jobId);
diff --git a/coderkungfu/php-queue/test/PHPQueue/Backend/PredisTest.php b/coderkungfu/php-queue/test/PHPQueue/Backend/PredisTest.php
index 04d6384..e46a8cc 100644
--- a/coderkungfu/php-queue/test/PHPQueue/Backend/PredisTest.php
+++ b/coderkungfu/php-queue/test/PHPQueue/Backend/PredisTest.php
@@ -183,10 +183,24 @@
$this->object->push($data);
$this->assertEquals($data, $this->object->pop());
+
+ // Check that we did remove the object.
+ $this->assertNull($this->object->pop());
}
public function testPopEmpty()
{
$this->assertNull($this->object->pop());
}
+
+    public function testPeek()
+    {
+        $payload = 'Weezle-' . mt_rand();
+        $this->object->push($payload);
+        $peeked = $this->object->peek();
+        $this->assertEquals($payload, $peeked);
+        // Peeking must not consume the job: pop() has to hand it back.
+        $popped = $this->object->pop();
+        $this->assertEquals($payload, $popped);
+    }
}
diff --git a/coderkungfu/php-queue/test/PHPQueue/Backend/PredisZsetTest.php b/coderkungfu/php-queue/test/PHPQueue/Backend/PredisZsetTest.php
index ed3d1cf..32df3cb 100644
--- a/coderkungfu/php-queue/test/PHPQueue/Backend/PredisZsetTest.php
+++ b/coderkungfu/php-queue/test/PHPQueue/Backend/PredisZsetTest.php
@@ -103,4 +103,19 @@
{
$this->assertNull($this->object->pop());
}
+
+    public function testPeek()
+    {
+        // Job with randomised field values.
+        $job = array();
+        $job['name'] = 'Weezle-' . mt_rand();
+        $job['timestamp'] = mt_rand();
+        $job['txn_id'] = mt_rand();
+        $this->object->push($job);
+        $peeked = $this->object->peek();
+        $this->assertEquals($job, $peeked);
+        // Peeking must not consume the job: pop() has to hand it back.
+        $popped = $this->object->pop();
+        $this->assertEquals($job, $popped);
+    }
}
diff --git a/composer/installed.json b/composer/installed.json
index fe0941c..c9faf59 100644
--- a/composer/installed.json
+++ b/composer/installed.json
@@ -315,7 +315,7 @@
"source": {
"type": "git",
"url": "https://gerrit.wikimedia.org/r/p/wikimedia/fundraising/php-queue.git",
- "reference": "4f8d50b0bab4a2c3bfa1eb0ce584f198362ded61"
+ "reference": "afbe1a5e1e310a1b9e3b74adb79287d1626ea63d"
},
"require": {
"clio/clio": "@stable",
--
To view, visit https://gerrit.wikimedia.org/r/231054
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I4e6c900bd6df83b5e50c62d20c2c3d10a90e4c44
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/DonationInterface/vendor
Gerrit-Branch: master
Gerrit-Owner: Awight <[email protected]>
Gerrit-Reviewer: Awight <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits