jenkins-bot has submitted this change and it was merged. Change subject: New interface for atomic reads ......................................................................
New interface for atomic reads The new AtomicReadBuffer data source provides opaque access to any FIFO-like buffer that supports manual commit of pop operations. This interface provides a single operation, popAtomic. Messages are processed one at a time by a client-defined callback, and any uncaught Exception causes the pop to be rolled back. Bug: T131271 Bug: T131269 Change-Id: I3a704669a65fcc7d55b8d6a354d4f16aee586dc7 --- M README.md M src/PHPQueue/Backend/PDO.php A src/PHPQueue/Interfaces/AtomicReadBuffer.php M src/PHPQueue/Interfaces/IndexedFifoQueueStore.php M src/PHPQueue/Interfaces/KeyValueStore.php M test/PHPQueue/Backend/PDOTest.php 6 files changed, 151 insertions(+), 6 deletions(-) Approvals: Ejegg: Looks good to me, approved jenkins-bot: Verified diff --git a/README.md b/README.md index 72a73c8..c69cef1 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ ## Why PHP-Queue? ## -The pains of implementing a queueing system (eg. Beanstalk, Amazon SQS, RabbitMQ) for your applciation: +Implementing a queueing system (eg. Beanstalk, Amazon SQS, RabbitMQ) for your application can be painful: * Which one is most efficient? Performant? * Learning curve to effectively implement the queue backend & the libraries. @@ -170,6 +170,37 @@ You can read more about the [Runners here](https://github.com/CoderKungfu/php-queue/blob/master/demo/runners/README.md). +## Interfaces ## + +The queue backends will support one or more of these interfaces: + +* AtomicReadBuffer + +This is the recommended way to consume messages. AtomicReadBuffer provides the +popAtomic($callback) interface, which rolls back the popped record if the +callback returns by exception. For example: + $queue = new PHPQueue\Backend\PDO($options); + + $queue->popAtomic(function ($message) use ($processor) { + $processor->churn($message); + }); + +The message will only be popped if churn() returns successfully. + +* FifoQueueStore + +A first in first out queue accessed by push and pop. 
+ +* IndexedFifoQueueStore + +Messages are indexed along one column as they are pushed into a FIFO queue, +otherwise these behave like FifoQueueStore. clear() deletes records by index. +There is no get() operation, you'll need a KeyValueStore for that. + +* KeyValueStore + +Jobs can be retrieved and deleted by their index. + --- ## License ## diff --git a/src/PHPQueue/Backend/PDO.php b/src/PHPQueue/Backend/PDO.php index 813624d..85d023f 100644 --- a/src/PHPQueue/Backend/PDO.php +++ b/src/PHPQueue/Backend/PDO.php @@ -2,12 +2,17 @@ namespace PHPQueue\Backend; use PHPQueue\Exception\BackendException; +use PHPQueue\Interfaces\AtomicReadBuffer; +use PHPQueue\Interfaces\FifoQueueStore; use PHPQueue\Interfaces\IndexedFifoQueueStore; use PHPQueue\Interfaces\KeyValueStore; class PDO extends Base - implements IndexedFifoQueueStore, KeyValueStore + implements AtomicReadBuffer, + FifoQueueStore, + IndexedFifoQueueStore, + KeyValueStore { private $connection_string; private $db_user; @@ -77,7 +82,6 @@ throw new BackendException('Statement failed: ' . implode(' - ', $sth->errorInfo())); } } catch (\Exception $ex) { - // XXX: This isn't catching the exception! // TODO: Log original error and table creation attempt. $this->createTable($this->db_table); @@ -127,7 +131,7 @@ public function pop() { - // Where $id is null, get oldest message + // Get oldest message. $sql = sprintf('SELECT `id`, `data` FROM `%s` WHERE 1 ORDER BY id ASC LIMIT 1', $this->db_table); $sth = $this->getConnection()->prepare($sql); $sth->execute(); @@ -135,11 +139,30 @@ $result = $sth->fetch(\PDO::FETCH_ASSOC); if ($result) { $this->last_job_id = $result['id']; + $this->clear($result['id']); return json_decode($result['data'], true); } return null; } + public function popAtomic($callback) { + try { + $this->getConnection()->beginTransaction(); + $data = $this->pop(); + + if (!is_callable($callback)) { + throw new RuntimeException("Bad callback passed to " . 
__METHOD__); + } + call_user_func($callback, $data); + + $this->getConnection()->commit(); + return $data; + } catch (\Exception $ex) { + $this->getConnection()->rollback(); + throw $ex; + } + } + public function clear($id = null) { if (empty($id)) { diff --git a/src/PHPQueue/Interfaces/AtomicReadBuffer.php b/src/PHPQueue/Interfaces/AtomicReadBuffer.php new file mode 100644 index 0000000..bdc62bc --- /dev/null +++ b/src/PHPQueue/Interfaces/AtomicReadBuffer.php @@ -0,0 +1,40 @@ +<?php +namespace PHPQueue\Interfaces; + +/** + * Implemented by backends that provide "at least once" consumption + */ +interface AtomicReadBuffer +{ + /** + * Pop and process data in an atomic way, so that the message will not be + * consumed in case of failure. + * + * @param $callback A processing function with the signature, + * function( $message ) throws Exception + * This function accepts an array $message, the next message to be + * popped from your buffer. In normal operation, the message is popped + * after the function returns successfully, which gives us the + * guarantee that each message is consumed successfully "at least + * once". The processor callback can handle a message by diverting to + * a reject sink, of course, a clean return only means that the + * callback has completed some action locally considered correct, not + * that there were no errors in processing. + * Throwing an exception from callback means that we were unable or + * chose not to handle the message at all, and it should be considered + * unconsumed. In this case it is not popped when popAtomic returns. + * + * @return array|null popAtomic returns the currently popped record as a + * courtesy. Note that any atomic processing should happen within + * $callback. I'm not sure when it's valid to do anything with the + * return value. + * Or, returns null if the queue is empty over the backend poll interval. + * + * @throws \Exception When the processor dies in an unexpected way. 
Most + * callbacks should handle rejected messages internally and not throw + * an error. A message which causes the processor to error repeatedly + * causes "queue jam", something we alert about loudly and should + * eventually shunt these messages into a reject stream. + */ + public function popAtomic( $callback ); +} diff --git a/src/PHPQueue/Interfaces/IndexedFifoQueueStore.php b/src/PHPQueue/Interfaces/IndexedFifoQueueStore.php index 1d15dde..ff06f66 100644 --- a/src/PHPQueue/Interfaces/IndexedFifoQueueStore.php +++ b/src/PHPQueue/Interfaces/IndexedFifoQueueStore.php @@ -19,7 +19,8 @@ * @throws \PHPQueue\Exception\JobNotFoundException When no data is available. * @throws \Exception Other failures. * - * TODO: We should provide transactionality to make this operation safer. + * @deprecated This is not a safe operation. Consider using + * AtomicReadBuffer::popAtomic instead. */ public function pop(); diff --git a/src/PHPQueue/Interfaces/KeyValueStore.php b/src/PHPQueue/Interfaces/KeyValueStore.php index b98b04a..b9d5823 100644 --- a/src/PHPQueue/Interfaces/KeyValueStore.php +++ b/src/PHPQueue/Interfaces/KeyValueStore.php @@ -10,11 +10,15 @@ * @param $key string * @param $value mixed Serializable value * @param $properties array optional additional message properties + * FIXME: Define better. Are these columns the indexes? Why separate + * from the message? * @throws \Exception */ public function set($key, $value, $properties=array()); /** + * Look up and return a value by its index value. + * * @param $key string * @return array The data. * @throws \Exception diff --git a/test/PHPQueue/Backend/PDOTest.php b/test/PHPQueue/Backend/PDOTest.php index da70b14..9149025 100644 --- a/test/PHPQueue/Backend/PDOTest.php +++ b/test/PHPQueue/Backend/PDOTest.php @@ -1,5 +1,6 @@ <?php namespace PHPQueue\Backend; + class PDOTest extends \PHPUnit_Framework_TestCase { /** @@ -23,11 +24,12 @@ // Check that the database exists, and politely skip if not. 
try { - $this->object = new \PDO($options['connection_string']); + new \PDO($options['connection_string']); } catch ( \PDOException $ex ) { $this->markTestSkipped('Database access failed: ' . $ex->getMessage()); } + $this->object = new PHPQueue\Backend\PDO($options); // Create table $this->assertTrue($this->object->createTable('pdotest')); $this->object->clearAll(); @@ -113,4 +115,48 @@ { $this->assertNull( $this->object->pop() ); } + + /** + * popAtomic should pop if the processor callback is successful. + */ + public function testPopAtomicCommit() + { + $data = array(mt_rand(), 'Abbie', 'Hoffman'); + + $this->object->push($data); + $self = $this; + $did_run = false; + $callback = function ($message) use ($self, &$did_run, $data) { + $self->assertEquals($data, $message); + $did_run = true; + }; + $this->assertEquals($data, $this->object->popAtomic($callback)); + $this->assertEquals(true, $did_run); + // Record has really gone away. + $this->assertEquals(null, $this->object->pop()); + } + + /** + * popAtomic should not pop if the processor throws an error. + */ + public function testPopAtomicRollback() + { + $data = array(mt_rand(), 'Abbie', 'Hoffman'); + + $this->object->push($data); + $self = $this; + $callback = function ($message) use ($self, $data) { + $self->assertEquals($data, $message); + throw new \Exception("Foiled!"); + }; + try { + $this->assertEquals($data, $this->object->popAtomic($callback)); + $this->fail("Should have failed by this point"); + } catch (\Exception $ex) { + $this->assertEquals("Foiled!", $ex->getMessage()); + } + + // Punchline: data should still be available for the retry pop. 
+ $this->assertEquals($data, $this->object->popAtomic(function ($message) {})); + } } -- To view, visit https://gerrit.wikimedia.org/r/284977 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: I3a704669a65fcc7d55b8d6a354d4f16aee586dc7 Gerrit-PatchSet: 22 Gerrit-Project: wikimedia/fundraising/php-queue Gerrit-Branch: master Gerrit-Owner: Awight <[email protected]> Gerrit-Reviewer: Awight <[email protected]> Gerrit-Reviewer: Cdentinger <[email protected]> Gerrit-Reviewer: Ejegg <[email protected]> Gerrit-Reviewer: jenkins-bot <> _______________________________________________ MediaWiki-commits mailing list [email protected] https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
