Ejegg has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/288037

Change subject: WIP popAtomic for Stomp
......................................................................

WIP popAtomic for Stomp

For maximum deployment schedule flexibility

Change-Id: I680ad0662019130be1f37cf4c38b1831d031aed1
---
M src/PHPQueue/Backend/Stomp.php
1 file changed, 32 insertions(+), 3 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/php-queue 
refs/changes/37/288037/1

diff --git a/src/PHPQueue/Backend/Stomp.php b/src/PHPQueue/Backend/Stomp.php
index e855af3..558f03e 100644
--- a/src/PHPQueue/Backend/Stomp.php
+++ b/src/PHPQueue/Backend/Stomp.php
@@ -6,6 +6,7 @@
 use BadMethodCallException;
 use PHPQueue\Exception\BackendException;
 use PHPQueue\Exception\JobNotFoundException;
+use PHPQueue\Interfaces\AtomicReadBuffer;
 use PHPQueue\Interfaces\FifoQueueStore;
 use PHPQueue\Interfaces\KeyValueStore;
 
@@ -18,7 +19,7 @@
  */
 class Stomp
     extends Base
-    implements FifoQueueStore, KeyValueStore
+    implements FifoQueueStore, KeyValueStore, AtomicReadBuffer
 {
     public $queue_name;
     public $uri;
@@ -87,6 +88,31 @@
         return $this->readFrame();
     }
 
+    /**
+     * @param callable $callback
+     * @return array|null
+     * @throws BackendException
+     */
+    public function popAtomic($callback)
+    {
+        $message = $this->readFrame();
+        if (!$message) {
+            return null;
+        }
+
+        $txnId = 'phpq' . mt_rand();
+
+        try {
+            $this->getConnection()->begin($txnId);
+            call_user_func($callback, $message);
+            $this->getConnection()->ack();
+            $this->getConnection()->commit($txnId);
+        } catch (\Exception $ex) {
+            $this->getConnection()->abort($txnId);
+            throw $ex;
+        }
+    }
+
     public function set($key, $data=array(), $properties=array())
     {
         $properties['correlation-id'] = $key;
@@ -107,8 +133,11 @@
 
     /**
      * @param array|null $properties Optional selectors.
+     * @param bool $preventAck if true, will ensure message is left on queue
+     * @return array
+     * @throws BackendException
      */
-    protected function readFrame($properties = null)
+    protected function readFrame($properties = null, $preventAck=false)
     {
         $this->beforeGet();
         if ($properties === null) {
@@ -124,7 +153,7 @@
         $response = $this->getConnection()->readFrame();
         $this->afterGet();
 
-        if ($response && $this->ack) {
+        if ($response && !$preventAck && $this->ack) {
             $this->getConnection()->ack($response);
         }
 

-- 
To view, visit https://gerrit.wikimedia.org/r/288037
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I680ad0662019130be1f37cf4c38b1831d031aed1
Gerrit-PatchSet: 1
Gerrit-Project: wikimedia/fundraising/php-queue
Gerrit-Branch: master
Gerrit-Owner: Ejegg <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to