Cdentinger has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/313124

Change subject: update php-queue to 7834573f5dc2ab8935684259661b4a1fa83978cf
......................................................................

update php-queue to 7834573f5dc2ab8935684259661b4a1fa83978cf

also a bunch of other stuff that was not up to date for some reason

Change-Id: Iab77badb479e0609f6675791ce171739454931c7
---
M coderkungfu/php-queue/composer.json
M coderkungfu/php-queue/src/PHPQueue/Backend/PDO.php
M coderkungfu/php-queue/src/PHPQueue/Backend/Predis.php
M coderkungfu/php-queue/src/PHPQueue/Interfaces/AtomicReadBuffer.php
M coderkungfu/php-queue/test/PHPQueue/Backend/PDOBaseTest.php
5 files changed, 64 insertions(+), 21 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/SmashPig/vendor refs/changes/24/313124/1

diff --git a/coderkungfu/php-queue/composer.json b/coderkungfu/php-queue/composer.json
index 5e52c7c..f75d7ef 100644
--- a/coderkungfu/php-queue/composer.json
+++ b/coderkungfu/php-queue/composer.json
@@ -20,7 +20,7 @@
     "require": {
         "php": ">=5.3.0",
         "monolog/monolog": "~1.3",
-        "clio/clio": "@stable"
+        "clio/clio": "0.1.*"
     },
     "require-dev": {
         "jakub-onderka/php-parallel-lint": "0.9",
diff --git a/coderkungfu/php-queue/src/PHPQueue/Backend/PDO.php b/coderkungfu/php-queue/src/PHPQueue/Backend/PDO.php
index 5b1fd28..e3a620a 100644
--- a/coderkungfu/php-queue/src/PHPQueue/Backend/PDO.php
+++ b/coderkungfu/php-queue/src/PHPQueue/Backend/PDO.php
@@ -104,8 +104,8 @@
             throw new \Exception('Could not prepare statement');
         }
         $_tmp = json_encode($data);
-        $sth->bindParam(1, $_tmp, \PDO::PARAM_STR);
-        $sth->bindParam(2, self::getTimeStamp(), \PDO::PARAM_STR);
+        $sth->bindValue(1, $_tmp, \PDO::PARAM_STR);
+        $sth->bindValue(2, self::getTimeStamp(), \PDO::PARAM_STR);
         return $sth->execute();
     }
 
@@ -114,9 +114,9 @@
         $sql = sprintf('REPLACE INTO `%s` (`id`, `data`, `timestamp`) VALUES (?, ?, ?)', $this->db_table);
         $sth = $this->getConnection()->prepare($sql);
         $_tmp = json_encode($data);
-        $sth->bindParam(1, $id, \PDO::PARAM_INT);
-        $sth->bindParam(2, $_tmp, \PDO::PARAM_STR);
-        $sth->bindParam(3, self::getTimeStamp(), \PDO::PARAM_STR);
+        $sth->bindValue(1, $id, \PDO::PARAM_INT);
+        $sth->bindValue(2, $_tmp, \PDO::PARAM_STR);
+        $sth->bindValue(3, self::getTimeStamp(), \PDO::PARAM_STR);
         $sth->execute();
     }
 
@@ -137,7 +137,7 @@
 
         $sql = sprintf('SELECT `id`, `data` FROM `%s` WHERE `id` = ?', $this->db_table);
         $sth = $this->getConnection()->prepare($sql);
-        $sth->bindParam(1, $id, \PDO::PARAM_INT);
+        $sth->bindValue(1, $id, \PDO::PARAM_INT);
         $sth->execute();
 
         $result = $sth->fetch(\PDO::FETCH_ASSOC);
@@ -154,6 +154,12 @@
         // Get oldest message.
         $sql = sprintf('SELECT `id`, `data` FROM `%s` WHERE 1 ORDER BY id ASC LIMIT 1', $this->db_table);
         $sth = $this->getConnection()->prepare($sql);
+
+        // This will be false if the table or collection does not exist
+        if ( ! $sth ) {
+            return null;
+        }
+
         $sth->execute();
 
         $result = $sth->fetch(\PDO::FETCH_ASSOC);
@@ -193,7 +199,7 @@
         try {
             $sql = sprintf('DELETE FROM `%s` WHERE `id` = ?', $this->db_table);
             $sth = $this->getConnection()->prepare($sql);
-            $sth->bindParam(1, $id, \PDO::PARAM_INT);
+            $sth->bindValue(1, $id, \PDO::PARAM_INT);
             $sth->execute();
         } catch (\Exception $ex) {
             throw new BackendException('Invalid ID.');
diff --git a/coderkungfu/php-queue/src/PHPQueue/Backend/Predis.php b/coderkungfu/php-queue/src/PHPQueue/Backend/Predis.php
index fbe8e0f..e74e6a5 100644
--- a/coderkungfu/php-queue/src/PHPQueue/Backend/Predis.php
+++ b/coderkungfu/php-queue/src/PHPQueue/Backend/Predis.php
@@ -5,6 +5,7 @@
 use Predis\Transaction\MultiExec;
 
 use PHPQueue\Exception\BackendException;
+use PHPQueue\Interfaces\AtomicReadBuffer;
 use PHPQueue\Interfaces\KeyValueStore;
 use PHPQueue\Interfaces\FifoQueueStore;
 
@@ -28,7 +29,10 @@
  */
 class Predis
     extends Base
-    implements FifoQueueStore, KeyValueStore
+    implements
+        AtomicReadBuffer,
+        FifoQueueStore,
+        KeyValueStore
 {
     const TYPE_STRING='string';
     const TYPE_HASH='hash';
@@ -198,6 +202,37 @@
         return json_decode($data, true);
     }
 
+    public function popAtomic($callback) {
+        if (!$this->hasQueue()) {
+            throw new BackendException("No queue specified.");
+        }
+        if ($this->order_key) {
+            throw new BackendException("atomicPop not yet supported for zsets");
+        }
+
+        // Pop and process the first element, erring on the side of
+        // at-least-once processing where the callback might get the same
+        // element before it's popped in the case of a race.
+        $options = array(
+            'cas' => true,
+            'watch' => $this->queue_name,
+            'retry' => 3,
+        );
+        $data = null;
+        $self = $this;
+        $this->getConnection()->transaction($options, function ($tx) use (&$data, $callback, $self) {
+            // Begin transaction.
+            $tx->multi();
+
+            $data = $tx->lpop($self->queue_name);
+            $data = json_decode($data, true);
+            if ($data !== null) {
+                call_user_func($callback, $data);
+            }
+        });
+        return $data;
+    }
+
     /**
      * Return the top element in the queue.
      *
diff --git a/coderkungfu/php-queue/src/PHPQueue/Interfaces/AtomicReadBuffer.php b/coderkungfu/php-queue/src/PHPQueue/Interfaces/AtomicReadBuffer.php
index d6fea1a..901e193 100644
--- a/coderkungfu/php-queue/src/PHPQueue/Interfaces/AtomicReadBuffer.php
+++ b/coderkungfu/php-queue/src/PHPQueue/Interfaces/AtomicReadBuffer.php
@@ -11,18 +11,20 @@
      * consumed in case of failure.
      *
      * @param callable $callback A processing function with the signature,
-     *     function( $message ) throws Exception
-     *         This function accepts an array $message, the next message to be
-     *     popped from your buffer.  In normal operation, the message is popped
-     *     after the function returns successfully, which gives us the
-     *     guarantee that each message is consumed successfully "at least
-     *     once".  The processor callback can handle a message by diverting to
-     *     a reject sink, of course, a clean return only means that the
-     *     callback has completed some action locally considered correct, not
-     *     that there were no errors in processing.
+     *     void function( $message ) throws Exception
+     *         This function accepts an array $message, the next message popped
+     *     from your buffer.  In normal operation, the message is popped after
+     *     the function returns successfully, which gives us the guarantee that
+     *     each message is consumed successfully "at least once".  The
+     *     processor callback might either handle a message and consume it,
+     *     reject and divert to a dead-letter queue, or raise an exception. The
+     *     contract is that a clean function return means that the callback has
+     *     completed its purpose and the message can be dropped without further
+     *     processing.
      *         Throwing an exception from callback means that we were unable or
      *     chose not to handle the message at all, and it should be considered
-     *     unconsumed.  In this case it is not popped when popAtomic returns.
+     *     unconsumed.  In this case it is not popped from the queue when
+     *     popAtomic returns.
      *         If there are no messages in the queue, the callback is not run.
      *
      * @return array|null popAtomic returns the currently popped record as a
@@ -37,5 +39,5 @@
      *     causes "queue jam", something we alert about loudly and should
      *     eventually shunt these messages into a reject stream.
      */
-    public function popAtomic( $callback );
+    public function popAtomic($callback);
 }
diff --git a/coderkungfu/php-queue/test/PHPQueue/Backend/PDOBaseTest.php b/coderkungfu/php-queue/test/PHPQueue/Backend/PDOBaseTest.php
index 5a420a1..267e952 100644
--- a/coderkungfu/php-queue/test/PHPQueue/Backend/PDOBaseTest.php
+++ b/coderkungfu/php-queue/test/PHPQueue/Backend/PDOBaseTest.php
@@ -139,7 +139,7 @@
         }
 
         // Punchline: data should still be available for the retry pop.
-        $this->assertEquals($data, $this->object->popAtomic(function ($message) {}));
+        $this->assertEquals($data, $this->object->pop());
     }
 
     /**

-- 
To view, visit https://gerrit.wikimedia.org/r/313124
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iab77badb479e0609f6675791ce171739454931c7
Gerrit-PatchSet: 1
Gerrit-Project: wikimedia/fundraising/SmashPig/vendor
Gerrit-Branch: master
Gerrit-Owner: Cdentinger <cdentin...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to