Cdentinger has uploaded a new change for review. https://gerrit.wikimedia.org/r/313124
Change subject: update php-queue to 7834573f5dc2ab8935684259661b4a1fa83978cf ...................................................................... update php-queue to 7834573f5dc2ab8935684259661b4a1fa83978cf also a bunch of other stuff that was not up to date for some reason Change-Id: Iab77badb479e0609f6675791ce171739454931c7 --- M coderkungfu/php-queue/composer.json M coderkungfu/php-queue/src/PHPQueue/Backend/PDO.php M coderkungfu/php-queue/src/PHPQueue/Backend/Predis.php M coderkungfu/php-queue/src/PHPQueue/Interfaces/AtomicReadBuffer.php M coderkungfu/php-queue/test/PHPQueue/Backend/PDOBaseTest.php 5 files changed, 64 insertions(+), 21 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/SmashPig/vendor refs/changes/24/313124/1 diff --git a/coderkungfu/php-queue/composer.json b/coderkungfu/php-queue/composer.json index 5e52c7c..f75d7ef 100644 --- a/coderkungfu/php-queue/composer.json +++ b/coderkungfu/php-queue/composer.json @@ -20,7 +20,7 @@ "require": { "php": ">=5.3.0", "monolog/monolog": "~1.3", - "clio/clio": "@stable" + "clio/clio": "0.1.*" }, "require-dev": { "jakub-onderka/php-parallel-lint": "0.9", diff --git a/coderkungfu/php-queue/src/PHPQueue/Backend/PDO.php b/coderkungfu/php-queue/src/PHPQueue/Backend/PDO.php index 5b1fd28..e3a620a 100644 --- a/coderkungfu/php-queue/src/PHPQueue/Backend/PDO.php +++ b/coderkungfu/php-queue/src/PHPQueue/Backend/PDO.php @@ -104,8 +104,8 @@ throw new \Exception('Could not prepare statement'); } $_tmp = json_encode($data); - $sth->bindParam(1, $_tmp, \PDO::PARAM_STR); - $sth->bindParam(2, self::getTimeStamp(), \PDO::PARAM_STR); + $sth->bindValue(1, $_tmp, \PDO::PARAM_STR); + $sth->bindValue(2, self::getTimeStamp(), \PDO::PARAM_STR); return $sth->execute(); } @@ -114,9 +114,9 @@ $sql = sprintf('REPLACE INTO `%s` (`id`, `data`, `timestamp`) VALUES (?, ?, ?)', $this->db_table); $sth = $this->getConnection()->prepare($sql); $_tmp = json_encode($data); - $sth->bindParam(1, $id, \PDO::PARAM_INT); - $sth->bindParam(2, $_tmp, \PDO::PARAM_STR); - $sth->bindParam(3, self::getTimeStamp(), \PDO::PARAM_STR); + $sth->bindValue(1, $id, \PDO::PARAM_INT); + $sth->bindValue(2, $_tmp, \PDO::PARAM_STR); + $sth->bindValue(3, self::getTimeStamp(), \PDO::PARAM_STR); $sth->execute(); } @@ -137,7 +137,7 @@ $sql = sprintf('SELECT `id`, `data` FROM `%s` WHERE `id` = ?', $this->db_table); $sth = $this->getConnection()->prepare($sql); - $sth->bindParam(1, $id, \PDO::PARAM_INT); + $sth->bindValue(1, $id, \PDO::PARAM_INT); $sth->execute(); $result = $sth->fetch(\PDO::FETCH_ASSOC); @@ -154,6 +154,12 @@ // Get oldest message. $sql = sprintf('SELECT `id`, `data` FROM `%s` WHERE 1 ORDER BY id ASC LIMIT 1', $this->db_table); $sth = $this->getConnection()->prepare($sql); + + // This will be false if the table or collection does not exist + if ( ! $sth ) { + return null; + } + $sth->execute(); $result = $sth->fetch(\PDO::FETCH_ASSOC); @@ -193,7 +199,7 @@ try { $sql = sprintf('DELETE FROM `%s` WHERE `id` = ?', $this->db_table); $sth = $this->getConnection()->prepare($sql); - $sth->bindParam(1, $id, \PDO::PARAM_INT); + $sth->bindValue(1, $id, \PDO::PARAM_INT); $sth->execute(); } catch (\Exception $ex) { throw new BackendException('Invalid ID.'); diff --git a/coderkungfu/php-queue/src/PHPQueue/Backend/Predis.php b/coderkungfu/php-queue/src/PHPQueue/Backend/Predis.php index fbe8e0f..e74e6a5 100644 --- a/coderkungfu/php-queue/src/PHPQueue/Backend/Predis.php +++ b/coderkungfu/php-queue/src/PHPQueue/Backend/Predis.php @@ -5,6 +5,7 @@ use Predis\Transaction\MultiExec; use PHPQueue\Exception\BackendException; +use PHPQueue\Interfaces\AtomicReadBuffer; use PHPQueue\Interfaces\KeyValueStore; use PHPQueue\Interfaces\FifoQueueStore; @@ -28,7 +29,10 @@ */ class Predis extends Base - implements FifoQueueStore, KeyValueStore + implements + AtomicReadBuffer, + FifoQueueStore, + KeyValueStore { const TYPE_STRING='string'; const TYPE_HASH='hash'; @@ -198,6 +202,37 @@ return json_decode($data, true); } + public function popAtomic($callback) { + if (!$this->hasQueue()) { + throw new BackendException("No queue specified."); + } + if ($this->order_key) { + throw new BackendException("atomicPop not yet supported for zsets"); + } + + // Pop and process the first element, erring on the side of + // at-least-once processing where the callback might get the same + // element before it's popped in the case of a race. + $options = array( + 'cas' => true, + 'watch' => $this->queue_name, + 'retry' => 3, + ); + $data = null; + $self = $this; + $this->getConnection()->transaction($options, function ($tx) use (&$data, $callback, $self) { + // Begin transaction. + $tx->multi(); + + $data = $tx->lpop($self->queue_name); + $data = json_decode($data, true); + if ($data !== null) { + call_user_func($callback, $data); + } + }); + return $data; + } + /** * Return the top element in the queue. * diff --git a/coderkungfu/php-queue/src/PHPQueue/Interfaces/AtomicReadBuffer.php b/coderkungfu/php-queue/src/PHPQueue/Interfaces/AtomicReadBuffer.php index d6fea1a..901e193 100644 --- a/coderkungfu/php-queue/src/PHPQueue/Interfaces/AtomicReadBuffer.php +++ b/coderkungfu/php-queue/src/PHPQueue/Interfaces/AtomicReadBuffer.php @@ -11,18 +11,20 @@ * consumed in case of failure. * * @param callable $callback A processing function with the signature, - * function( $message ) throws Exception - * This function accepts an array $message, the next message to be - * popped from your buffer. In normal operation, the message is popped - * after the function returns successfully, which gives us the - * guarantee that each message is consumed successfully "at least - * once". The processor callback can handle a message by diverting to - * a reject sink, of course, a clean return only means that the - * callback has completed some action locally considered correct, not - * that there were no errors in processing. + * void function( $message ) throws Exception + * This function accepts an array $message, the next message popped + * from your buffer. In normal operation, the message is popped after + * the function returns successfully, which gives us the guarantee that + * each message is consumed successfully "at least once". The + * processor callback might either handle a message and consume it, + * reject and divert to a dead-letter queue, or raise an exception. The + * contract is that a clean function return means that the callback has + * completed its purpose and the message can be dropped without further + * processing. * Throwing an exception from callback means that we were unable or * chose not to handle the message at all, and it should be considered - * unconsumed. In this case it is not popped when popAtomic returns. + * unconsumed. In this case it is not popped from the queue when + * popAtomic returns. * If there are no messages in the queue, the callback is not run. * * @return array|null popAtomic returns the currently popped record as a @@ -37,5 +39,5 @@ * causes "queue jam", something we alert about loudly and should * eventually shunt these messages into a reject stream. */ - public function popAtomic( $callback ); + public function popAtomic($callback); } diff --git a/coderkungfu/php-queue/test/PHPQueue/Backend/PDOBaseTest.php b/coderkungfu/php-queue/test/PHPQueue/Backend/PDOBaseTest.php index 5a420a1..267e952 100644 --- a/coderkungfu/php-queue/test/PHPQueue/Backend/PDOBaseTest.php +++ b/coderkungfu/php-queue/test/PHPQueue/Backend/PDOBaseTest.php @@ -139,7 +139,7 @@ } // Punchline: data should still be available for the retry pop. - $this->assertEquals($data, $this->object->popAtomic(function ($message) {})); + $this->assertEquals($data, $this->object->pop()); } /** -- To view, visit https://gerrit.wikimedia.org/r/313124 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Iab77badb479e0609f6675791ce171739454931c7 Gerrit-PatchSet: 1 Gerrit-Project: wikimedia/fundraising/SmashPig/vendor Gerrit-Branch: master Gerrit-Owner: Cdentinger <cdentin...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits