Ejegg has uploaded a new change for review.
https://gerrit.wikimedia.org/r/288037
Change subject: WIP popAtomic for Stomp
......................................................................
WIP popAtomic for Stomp
For maximum deployment schedule flexibility
Change-Id: I680ad0662019130be1f37cf4c38b1831d031aed1
---
M src/PHPQueue/Backend/Stomp.php
1 file changed, 32 insertions(+), 3 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/php-queue
refs/changes/37/288037/1
diff --git a/src/PHPQueue/Backend/Stomp.php b/src/PHPQueue/Backend/Stomp.php
index e855af3..558f03e 100644
--- a/src/PHPQueue/Backend/Stomp.php
+++ b/src/PHPQueue/Backend/Stomp.php
@@ -6,6 +6,7 @@
use BadMethodCallException;
use PHPQueue\Exception\BackendException;
use PHPQueue\Exception\JobNotFoundException;
+use PHPQueue\Interfaces\AtomicReadBuffer;
use PHPQueue\Interfaces\FifoQueueStore;
use PHPQueue\Interfaces\KeyValueStore;
@@ -18,7 +19,7 @@
*/
class Stomp
extends Base
- implements FifoQueueStore, KeyValueStore
+ implements FifoQueueStore, KeyValueStore, AtomicReadBuffer
{
public $queue_name;
public $uri;
@@ -87,6 +88,31 @@
return $this->readFrame();
}
+ /**
+ * Read one message and pass it to $callback inside a transaction.
+ *
+ * A transaction is begun before the callback runs and committed after it
+ * returns. If the callback (or any connection call inside the try block)
+ * throws, the transaction is aborted and the exception is rethrown.
+ *
+ * @param callable $callback Invoked with the message as its only argument.
+ * @return array|null The message handed to the callback, or null when
+ * readFrame() returned nothing.
+ * @throws BackendException
+ * @throws \Exception Whatever the callback threw, rethrown after abort.
+ */
+ public function popAtomic($callback)
+ {
+ // Ask readFrame() not to ack: with $preventAck left at its default the
+ // message would be acknowledged before the callback had a chance to fail.
+ $message = $this->readFrame(null, true);
+ if (!$message) {
+ return null;
+ }
+
+ $txnId = 'phpq' . mt_rand();
+
+ try {
+ $this->getConnection()->begin($txnId);
+ call_user_func($callback, $message);
+ // NOTE(review): ack() is called without arguments here, while
+ // readFrame() calls ack($response). Confirm whether the client needs
+ // the frame (and $txnId) for this ack to be part of the transaction.
+ $this->getConnection()->ack();
+ $this->getConnection()->commit($txnId);
+ } catch (\Exception $ex) {
+ $this->getConnection()->abort($txnId);
+ throw $ex;
+ }
+
+ // Honour the documented array|null contract: return what was processed.
+ return $message;
+ }
+
public function set($key, $data=array(), $properties=array())
{
$properties['correlation-id'] = $key;
@@ -107,8 +133,11 @@
/**
* @param array|null $properties Optional selectors.
+ * @param bool $preventAck if true, will ensure message is left on queue
+ * @return array
+ * @throws BackendException
*/
- protected function readFrame($properties = null)
+ protected function readFrame($properties = null, $preventAck=false)
{
$this->beforeGet();
if ($properties === null) {
@@ -124,7 +153,7 @@
$response = $this->getConnection()->readFrame();
$this->afterGet();
- if ($response && $this->ack) {
+ if ($response && !$preventAck && $this->ack) {
$this->getConnection()->ack($response);
}
--
To view, visit https://gerrit.wikimedia.org/r/288037
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I680ad0662019130be1f37cf4c38b1831d031aed1
Gerrit-PatchSet: 1
Gerrit-Project: wikimedia/fundraising/php-queue
Gerrit-Branch: master
Gerrit-Owner: Ejegg <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits