jenkins-bot has submitted this change and it was merged.

Change subject: New interface for atomic reads
......................................................................


New interface for atomic reads

The new AtomicReadBuffer data source provides opaque access to any FIFO-like
buffer that supports manual commit of pop operations.  This interface is only
used for this one popAtomic operation.

They are processed one at a time by a client-defined callback, and any uncaught
Exception causes the pop to be rolled back.

Bug: T131271
Bug: T131269
Change-Id: I3a704669a65fcc7d55b8d6a354d4f16aee586dc7
---
M README.md
M src/PHPQueue/Backend/PDO.php
A src/PHPQueue/Interfaces/AtomicReadBuffer.php
M src/PHPQueue/Interfaces/IndexedFifoQueueStore.php
M src/PHPQueue/Interfaces/KeyValueStore.php
M test/PHPQueue/Backend/PDOTest.php
6 files changed, 151 insertions(+), 6 deletions(-)

Approvals:
  Ejegg: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/README.md b/README.md
index 72a73c8..c69cef1 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@
 
 ## Why PHP-Queue? ##
 
-The pains of implementing a queueing system (eg. Beanstalk, Amazon SQS, 
RabbitMQ) for your applciation:
+Implementing a queueing system (eg. Beanstalk, Amazon SQS, RabbitMQ) for your 
application can be painful:
 
 * Which one is most efficient? Performant?
 * Learning curve to effectively implement the queue backend & the libraries.
@@ -170,6 +170,37 @@
 
 You can read more about the [Runners 
here](https://github.com/CoderKungfu/php-queue/blob/master/demo/runners/README.md).
 
+## Interfaces ##
+
+The queue backends will support one or more of these interfaces:
+
+* AtomicReadBuffer
+
+This is the recommended way to consume messages.  AtomicReadBuffer provides the
+popAtomic($callback) interface, which rolls back the popped record if the
+callback returns by exception.  For example:
+    $queue = new PHPQueue\Backend\PDO($options);
+
+    $queue->popAtomic(function ($message) use ($processor) {
+        $processor->churn($message);
+    });
+
+The message will only be popped if churn() returns successfully.
+
+* FifoQueueStore
+
+A first in first out queue accessed by push and pop.
+
+* IndexedFifoQueueStore
+
+Messages are indexed along one column as they are pushed into a FIFO queue,
+otherwise these behave like FifoQueueStore. clear() deletes records by index.
+There is no get() operation, you'll need a KeyValueStore for that.
+
+* KeyValueStore
+
+Jobs can be retrieved and deleted by their index.
+
 ---
 ## License ##
 
diff --git a/src/PHPQueue/Backend/PDO.php b/src/PHPQueue/Backend/PDO.php
index 813624d..85d023f 100644
--- a/src/PHPQueue/Backend/PDO.php
+++ b/src/PHPQueue/Backend/PDO.php
@@ -2,12 +2,17 @@
 namespace PHPQueue\Backend;
 
 use PHPQueue\Exception\BackendException;
+use PHPQueue\Interfaces\AtomicReadBuffer;
+use PHPQueue\Interfaces\FifoQueueStore;
 use PHPQueue\Interfaces\IndexedFifoQueueStore;
 use PHPQueue\Interfaces\KeyValueStore;
 
 class PDO
     extends Base
-    implements IndexedFifoQueueStore, KeyValueStore
+    implements AtomicReadBuffer,
+        FifoQueueStore,
+        IndexedFifoQueueStore,
+        KeyValueStore
 {
     private $connection_string;
     private $db_user;
@@ -77,7 +82,6 @@
                 throw new BackendException('Statement failed: ' . implode(' - 
', $sth->errorInfo()));
             }
         } catch (\Exception $ex) {
-            // XXX: This isn't catching the exception!
             // TODO: Log original error and table creation attempt.
             $this->createTable($this->db_table);
 
@@ -127,7 +131,7 @@
 
     public function pop()
     {
-        // Where $id is null, get oldest message
+        // Get oldest message.
         $sql = sprintf('SELECT `id`, `data` FROM `%s` WHERE 1 ORDER BY id ASC 
LIMIT 1', $this->db_table);
         $sth = $this->getConnection()->prepare($sql);
         $sth->execute();
@@ -135,11 +139,30 @@
         $result = $sth->fetch(\PDO::FETCH_ASSOC);
         if ($result) {
             $this->last_job_id = $result['id'];
+            $this->clear($result['id']);
             return json_decode($result['data'], true);
         }
         return null;
     }
 
+    public function popAtomic($callback) {
+        try {
+            $this->getConnection()->beginTransaction();
+            $data = $this->pop();
+
+            if (!is_callable($callback)) {
+                throw new RuntimeException("Bad callback passed to " . 
__METHOD__);
+            }
+            call_user_func($callback, $data);
+
+            $this->getConnection()->commit();
+            return $data;
+        } catch (\Exception $ex) {
+            $this->getConnection()->rollback();
+            throw $ex;
+        }
+    }
+
     public function clear($id = null)
     {
         if (empty($id)) {
diff --git a/src/PHPQueue/Interfaces/AtomicReadBuffer.php 
b/src/PHPQueue/Interfaces/AtomicReadBuffer.php
new file mode 100644
index 0000000..bdc62bc
--- /dev/null
+++ b/src/PHPQueue/Interfaces/AtomicReadBuffer.php
@@ -0,0 +1,40 @@
+<?php
+namespace PHPQueue\Interfaces;
+
+/**
+ * Implemented by backends that provide "at least once" consumption
+ */
+interface AtomicReadBuffer
+{
+    /**
+     * Pop and process data in an atomic way, so that the message will not be
+     * consumed in case of failure.
+     *
+     * @param $callback A processing function with the signature,
+     *     function( $message ) throws Exception
+     *         This function accepts an array $message, the next message to be
+     *     popped from your buffer.  In normal operation, the message is popped
+     *     after the function returns successfully, which gives us the
+     *     guarantee that each message is consumed successfully "at least
+     *     once".  The processor callback can handle a message by diverting to
+     *     a reject sink, of course, a clean return only means that the
+     *     callback has completed some action locally considered correct, not
+     *     that there were no errors in processing.
+     *         Throwing an exception from callback means that we were unable or
+     *     chose not to handle the message at all, and it should be considered
+     *     unconsumed.  In this case it is not popped when popAtomic returns.
+     *
+     * @return array|null popAtomic returns the currently popped record as a
+     *     courtesy.  Note that any atomic processing should happen within
+     *     $callback.  I'm not sure when it's valid to do anything with the
+     *     return value.
+     *     Or, returns null if the queue is empty over the backend poll 
interval.
+     *
+     * @throws \Exception When the processor dies in an unexpected way.  Most
+     *     callbacks should handle rejected messages internally and not throw
+     *     an error.  A message which causes the processor to error repeatedly
+     *     causes "queue jam", something we alert about loudly and should
+     *     eventually shunt these messages into a reject stream.
+     */
+    public function popAtomic( $callback );
+}
diff --git a/src/PHPQueue/Interfaces/IndexedFifoQueueStore.php 
b/src/PHPQueue/Interfaces/IndexedFifoQueueStore.php
index 1d15dde..ff06f66 100644
--- a/src/PHPQueue/Interfaces/IndexedFifoQueueStore.php
+++ b/src/PHPQueue/Interfaces/IndexedFifoQueueStore.php
@@ -19,7 +19,8 @@
      * @throws \PHPQueue\Exception\JobNotFoundException When no data is 
available.
      * @throws \Exception Other failures.
      *
-     * TODO: We should provide transactionality to make this operation safer.
+     * @deprecated This is not a safe operation.  Consider using
+     *     AtomicReadBuffer::popAtomic instead.
      */
     public function pop();
 
diff --git a/src/PHPQueue/Interfaces/KeyValueStore.php 
b/src/PHPQueue/Interfaces/KeyValueStore.php
index b98b04a..b9d5823 100644
--- a/src/PHPQueue/Interfaces/KeyValueStore.php
+++ b/src/PHPQueue/Interfaces/KeyValueStore.php
@@ -10,11 +10,15 @@
      * @param $key string
      * @param $value mixed Serializable value
      * @param $properties array optional additional message properties
+     *     FIXME: Define better.  Are these columns the indexes?  Why separate
+     *     from the message?
      * @throws \Exception
      */
     public function set($key, $value, $properties=array());
 
     /**
+     * Look up and return a value by its index value.
+     *
      * @param $key string
      * @return array The data.
      * @throws \Exception
diff --git a/test/PHPQueue/Backend/PDOTest.php 
b/test/PHPQueue/Backend/PDOTest.php
index da70b14..9149025 100644
--- a/test/PHPQueue/Backend/PDOTest.php
+++ b/test/PHPQueue/Backend/PDOTest.php
@@ -1,5 +1,6 @@
 <?php
 namespace PHPQueue\Backend;
+
 class PDOTest extends \PHPUnit_Framework_TestCase
 {
     /**
@@ -23,11 +24,12 @@
 
         // Check that the database exists, and politely skip if not.
         try {
-            $this->object = new \PDO($options['connection_string']);
+            new \PDO($options['connection_string']);
         } catch ( \PDOException $ex ) {
             $this->markTestSkipped('Database access failed: ' . 
$ex->getMessage());
         }
 
+        $this->object = new PHPQueue\Backend\PDO($options);
         // Create table
         $this->assertTrue($this->object->createTable('pdotest'));
         $this->object->clearAll();
@@ -113,4 +115,48 @@
     {
         $this->assertNull( $this->object->pop() );
     }
+
+    /**
+     * popAtomic should pop if the processor callback is successful.
+     */
+    public function testPopAtomicCommit()
+    {
+        $data = array(mt_rand(), 'Abbie', 'Hoffman');
+
+        $this->object->push($data);
+        $self = $this;
+        $did_run = false;
+        $callback = function ($message) use ($self, &$did_run, $data) {
+            $self->assertEquals($data, $message);
+            $did_run = true;
+        };
+        $this->assertEquals($data, $this->object->popAtomic($callback));
+        $this->assertEquals(true, $did_run);
+        // Record has really gone away.
+        $this->assertEquals(null, $this->object->pop());
+    }
+
+    /**
+     * popAtomic should not pop if the processor throws an error.
+     */
+    public function testPopAtomicRollback()
+    {
+        $data = array(mt_rand(), 'Abbie', 'Hoffman');
+
+        $this->object->push($data);
+        $self = $this;
+        $callback = function ($message) use ($self, $data) {
+            $self->assertEquals($data, $message);
+            throw new \Exception("Foiled!");
+        };
+        try {
+            $this->assertEquals($data, $this->object->popAtomic($callback));
+            $this->fail("Should have failed by this point");
+        } catch (\Exception $ex) {
+            $this->assertEquals("Foiled!", $ex->getMessage());
+        }
+
+        // Punchline: data should still be available for the retry pop.
+        $this->assertEquals($data, $this->object->popAtomic(function 
($message) {}));
+    }
 }

-- 
To view, visit https://gerrit.wikimedia.org/r/284977
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I3a704669a65fcc7d55b8d6a354d4f16aee586dc7
Gerrit-PatchSet: 22
Gerrit-Project: wikimedia/fundraising/php-queue
Gerrit-Branch: master
Gerrit-Owner: Awight <[email protected]>
Gerrit-Reviewer: Awight <[email protected]>
Gerrit-Reviewer: Cdentinger <[email protected]>
Gerrit-Reviewer: Ejegg <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to