Ejegg has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/312443

Change subject: Eradicate Stomp from crm repo
......................................................................

Eradicate Stomp from crm repo

Well, except for composer.lock, since DI (wikimedia/donation-interface) still has a dependency on Stomp.
This will totally conflict with the new damaged UI patch, but I couldn't
resist.

Change-Id: I3b41e9579396ad76cae0066bdc373c9df0cd8749
---
M composer.json
D sites/all/modules/queue2civicrm/Stomp.php
D sites/all/modules/queue2civicrm/Stomp/Exception.php
D sites/all/modules/queue2civicrm/Stomp/Frame.php
D sites/all/modules/queue2civicrm/Stomp/Message.php
D sites/all/modules/queue2civicrm/Stomp/Message/Bytes.php
D sites/all/modules/queue2civicrm/Stomp/Message/Map.php
M sites/all/modules/queue2civicrm/queue2civicrm.info
M sites/all/modules/queue2civicrm/queue2civicrm.module
M sites/all/modules/queue2civicrm/queue_consume.drush.inc
M sites/all/modules/queue2civicrm/recurring/recurring_queue_consume.drush.inc
D sites/all/modules/queue2civicrm/tests/includes/MessageSource.php
M sites/all/modules/wmf_civicrm/WmfTransaction.php
D sites/all/modules/wmf_common/Queue.php
M sites/all/modules/wmf_common/failmail.php
M sites/all/modules/wmf_common/wmf_common.info
M sites/all/modules/wmf_common/wmf_common.module
17 files changed, 8 insertions(+), 1,644 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/crm 
refs/changes/43/312443/1

diff --git a/composer.json b/composer.json
index 7150415..e16b453 100644
--- a/composer.json
+++ b/composer.json
@@ -21,7 +21,6 @@
         "cogpowered/finediff": "0.*",
         "wikimedia/donation-interface": "dev-master",
         "wikimedia/smash-pig": "dev-master",
-        "fusesource/stomp-php": "2.*",
         "phpmailer/phpmailer": "5.2.6",
         "phpseclib/phpseclib": "0.3.7",
         "predis/predis": "1.*",
diff --git a/sites/all/modules/queue2civicrm/Stomp.php 
b/sites/all/modules/queue2civicrm/Stomp.php
deleted file mode 100644
index 75134da..0000000
--- a/sites/all/modules/queue2civicrm/Stomp.php
+++ /dev/null
@@ -1,604 +0,0 @@
-<?php
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/* vim: set expandtab tabstop=3 shiftwidth=3: */
-
-require_once 'Stomp/Frame.php';
-
-/**
- * A Stomp Connection
- *
- *
- * @package Stomp
- * @author Hiram Chirino <hi...@hiramchirino.com>
- * @author Dejan Bosanac <de...@nighttale.net> 
- * @author Michael Caplan <mcap...@labnet.net>
- * @version $Revision: 43 $
- */
-class Stomp
-{
-    /**
-     * Perform request synchronously
-     *
-     * @var boolean
-     */
-    public $sync = false;
-
-    /**
-     * Default prefetch size
-     *
-     * @var int
-     */
-       public $prefetchSize = 1;
-    
-       /**
-     * Client id used for durable subscriptions
-     *
-     * @var string
-     */
-       public $clientId = null;
-    
-    protected $_brokerUri = null;
-    protected $_socket = null;
-    protected $_hosts = array();
-    protected $_params = array();
-    protected $_subscriptions = array();
-    protected $_defaultPort = 61613;
-    protected $_currentHost = - 1;
-    protected $_attempts = 10;
-    protected $_username = '';
-    protected $_password = '';
-    protected $_sessionId;
-    protected $_read_timeout_seconds = 5;
-    protected $_read_timeout_milliseconds = 0;
-    
-    /**
-     * Constructor
-     *
-     * @param string $brokerUri Broker URL
-     * @throws Stomp_Exception
-     */
-    public function __construct ($brokerUri)
-    {
-        $this->_brokerUri = $brokerUri;
-        $this->_init();
-    }
-    /**
-     * Initialize connection
-     *
-     * @throws Stomp_Exception
-     */
-    protected function _init ()
-    {
-        $pattern = 
"|^(([a-zA-Z]+)://)+\(*([a-zA-Z0-9\.:/i,-]+)\)*\??([a-zA-Z0-9=]*)$|i";
-        if (preg_match($pattern, $this->_brokerUri, $regs)) {
-            $scheme = $regs[2];
-            $hosts = $regs[3];
-            $params = $regs[4];
-            if ($scheme != "failover") {
-                $this->_processUrl($this->_brokerUri);
-            } else {
-                $urls = explode(",", $hosts);
-                foreach ($urls as $url) {
-                    $this->_processUrl($url);
-                }
-            }
-            if ($params != null) {
-                parse_str($params, $this->_params);
-            }
-        } else {
-            require_once 'Stomp/Exception.php';
-            throw new Stomp_Exception("Bad Broker URL {$this->_brokerUri}");
-        }
-    }
-    /**
-     * Process broker URL
-     *
-     * @param string $url Broker URL
-     * @throws Stomp_Exception
-     * @return boolean
-     */
-    protected function _processUrl ($url)
-    {
-        $parsed = parse_url($url);
-        if ($parsed) {
-            array_push($this->_hosts, array($parsed['host'] , $parsed['port'] 
, $parsed['scheme']));
-        } else {
-            require_once 'Stomp/Exception.php';
-            throw new Stomp_Exception("Bad Broker URL $url");
-        }
-    }
-    /**
-     * Make socket connection to the server
-     *
-     * @throws Stomp_Exception
-     */
-    protected function _makeConnection ()
-    {
-        if (count($this->_hosts) == 0) {
-            require_once 'Stomp/Exception.php';
-            throw new Stomp_Exception("No broker defined");
-        }
-        
-        // force disconnect, if previous established connection exists
-        $this->disconnect();
-        
-        $i = $this->_currentHost;
-        $att = 0;
-        $connected = false;
-        while (! $connected && $att ++ < $this->_attempts) {
-            if (isset($this->_params['randomize']) && 
$this->_params['randomize'] == 'true') {
-                $i = rand(0, count($this->_hosts) - 1);
-            } else {
-                $i = ($i + 1) % count($this->_hosts);
-            }
-            $broker = $this->_hosts[$i];
-            $host = $broker[0];
-            $port = $broker[1];
-            $scheme = $broker[2];
-            if ($port == null) {
-                $port = $this->_defaultPort;
-            }
-            if ($this->_socket != null) {
-                fclose($this->_socket);
-                $this->_socket = null;
-            }
-            $this->_socket = @fsockopen($scheme . '://' . $host, $port);
-            if (!is_resource($this->_socket) && $att >= $this->_attempts && 
!array_key_exists($i + 1, $this->_hosts)) {
-                require_once 'Stomp/Exception.php';
-                throw new Stomp_Exception("Could not connect to $host:$port 
($att/{$this->_attempts})");
-            } else if (is_resource($this->_socket)) {
-                $connected = true;
-                $this->_currentHost = $i;
-                break;
-            }
-        }
-        if (! $connected) {
-            require_once 'Stomp/Exception.php';
-            throw new Stomp_Exception("Could not connect to a broker");
-        }
-    }
-    /**
-     * Connect to server
-     *
-     * @param string $username
-     * @param string $password
-     * @return boolean
-     * @throws Stomp_Exception
-     */
-    public function connect ($username = '', $password = '')
-    {
-        $this->_makeConnection();
-        if ($username != '') {
-            $this->_username = $username;
-        }
-        if ($password != '') {
-            $this->_password = $password;
-        }
-               $headers = array('login' => $this->_username , 'passcode' => 
$this->_password);
-               if ($this->clientId != null) {
-                       $headers["client-id"] = $this->clientId;
-               }
-               $frame = new Stomp_Frame("CONNECT", $headers);
-        $this->_writeFrame($frame);
-        $frame = $this->readFrame();
-        if ($frame instanceof Stomp_Frame && $frame->command == 'CONNECTED') {
-            $this->_sessionId = $frame->headers["session"];
-            return true;
-        } else {
-            require_once 'Stomp/Exception.php';
-            if ($frame instanceof Stomp_Frame) {
-                throw new Stomp_Exception("Unexpected command: 
{$frame->command}", 0, $frame->body);
-            } else {
-                throw new Stomp_Exception("Connection not acknowledged");
-            }
-        }
-    }
-    
-    /**
-     * Check if client session has ben established
-     *
-     * @return boolean
-     */
-    public function isConnected ()
-    {
-        return !empty($this->_sessionId) && is_resource($this->_socket);
-    }
-    /**
-     * Current stomp session ID
-     *
-     * @return string
-     */
-    public function getSessionId()
-    {
-        return $this->_sessionId;
-    }
-    /**
-     * Send a message to a destination in the messaging system 
-     *
-     * @param string $destination Destination queue
-     * @param string|Stomp_Frame $msg Message
-     * @param array $properties
-     * @param boolean $sync Perform request synchronously
-     * @return boolean
-     */
-    public function send ($destination, $msg, $properties = null, $sync = null)
-    {
-        if ($msg instanceof Stomp_Frame) {
-            $msg->headers['destination'] = $destination;
-            $msg->headers = array_merge($msg->headers, $properties);
-            $frame = $msg;
-        } else {
-            $headers = $properties;
-            $headers['destination'] = $destination;
-            $frame = new Stomp_Frame('SEND', $headers, $msg);
-        }
-        $this->_prepareReceipt($frame, $sync);
-        $this->_writeFrame($frame);
-        return $this->_waitForReceipt($frame, $sync);
-    }
-    /**
-     * Prepair frame receipt
-     *
-     * @param Stomp_Frame $frame
-     * @param boolean $sync
-     */
-    protected function _prepareReceipt (Stomp_Frame $frame, $sync)
-    {
-        $receive = $this->sync;
-        if ($sync !== null) {
-            $receive = $sync;
-        }
-        if ($receive == true) {
-            $frame->headers['receipt'] = md5(microtime());
-        }
-    }
-    /**
-     * Wait for receipt
-     *
-     * @param Stomp_Frame $frame
-     * @param boolean $sync
-     * @return boolean
-     * @throws Stomp_Exception
-     */
-    protected function _waitForReceipt (Stomp_Frame $frame, $sync)
-    {
-
-        $receive = $this->sync;
-        if ($sync !== null) {
-            $receive = $sync;
-        }
-        if ($receive == true) {
-            $id = (isset($frame->headers['receipt'])) ? 
$frame->headers['receipt'] : null;
-            if ($id == null) {
-                return true;
-            }
-            $frame = $this->readFrame();
-            if ($frame instanceof Stomp_Frame && $frame->command == 'RECEIPT') 
{
-                if ($frame->headers['receipt-id'] == $id) {
-                    return true;
-                } else {
-                    require_once 'Stomp/Exception.php';
-                    throw new Stomp_Exception("Unexpected receipt id 
{$frame->headers['receipt-id']}", 0, $frame->body);
-                }
-            } else {
-                require_once 'Stomp/Exception.php';
-                if ($frame instanceof Stomp_Frame) {
-                    throw new Stomp_Exception("Unexpected command 
{$frame->command}", 0, $frame->body);
-                } else {
-                    throw new Stomp_Exception("Receipt not received");
-                }
-            }
-        }
-        return true;
-    }
-    /**
-     * Register to listen to a given destination
-     *
-     * @param string $destination Destination queue
-     * @param array $properties
-     * @param boolean $sync Perform request synchronously
-     * @return boolean
-     * @throws Stomp_Exception
-     */
-    public function subscribe ($destination, $properties = null, $sync = null)
-    {
-        $headers = array('ack' => 'client');
-               $headers['activemq.prefetchSize'] = $this->prefetchSize;
-               if ($this->clientId != null) {
-                       $headers["activemq.subcriptionName"] = $this->clientId;
-               }
-        if (isset($properties)) {
-            foreach ($properties as $name => $value) {
-                $headers[$name] = $value;
-            }
-        }
-        $headers['destination'] = $destination;
-        $frame = new Stomp_Frame('SUBSCRIBE', $headers);
-        $this->_prepareReceipt($frame, $sync);
-        $this->_writeFrame($frame);
-        if ($this->_waitForReceipt($frame, $sync) == true) {
-            $this->_subscriptions[$destination] = $properties;
-            return true;
-        } else {
-            return false;
-        }
-    }
-    /**
-     * Remove an existing subscription
-     *
-     * @param string $destination
-     * @param array $properties
-     * @param boolean $sync Perform request synchronously
-     * @return boolean
-     * @throws Stomp_Exception
-     */
-    public function unsubscribe ($destination, $properties = null, $sync = 
null)
-    {
-        $headers = array();
-        if (isset($properties)) {
-            foreach ($properties as $name => $value) {
-                $headers[$name] = $value;
-            }
-        }
-        $headers['destination'] = $destination;
-        $frame = new Stomp_Frame('UNSUBSCRIBE', $headers);
-        $this->_prepareReceipt($frame, $sync);
-        $this->_writeFrame($frame);
-        if ($this->_waitForReceipt($frame, $sync) == true) {
-            unset($this->_subscriptions[$destination]);
-            return true;
-        } else {
-            return false;
-        }
-    }
-    /**
-     * Start a transaction
-     *
-     * @param string $transactionId
-     * @param boolean $sync Perform request synchronously
-     * @return boolean
-     * @throws Stomp_Exception
-     */
-    public function begin ($transactionId = null, $sync = null)
-    {
-        $headers = array();
-        if (isset($transactionId)) {
-            $headers['transaction'] = $transactionId;
-        }
-        $frame = new Stomp_Frame('BEGIN', $headers);
-        $this->_prepareReceipt($frame, $sync);
-        $this->_writeFrame($frame);
-        return $this->_waitForReceipt($frame, $sync);
-    }
-    /**
-     * Commit a transaction in progress
-     *
-     * @param string $transactionId
-     * @param boolean $sync Perform request synchronously
-     * @return boolean
-     * @throws Stomp_Exception
-     */
-    public function commit ($transactionId = null, $sync = null)
-    {
-        $headers = array();
-        if (isset($transactionId)) {
-            $headers['transaction'] = $transactionId;
-        }
-        $frame = new Stomp_Frame('COMMIT', $headers);
-        $this->_prepareReceipt($frame, $sync);
-        $this->_writeFrame($frame);
-        return $this->_waitForReceipt($frame, $sync);
-    }
-    /**
-     * Roll back a transaction in progress
-     *
-     * @param string $transactionId
-     * @param boolean $sync Perform request synchronously
-     */
-    public function abort ($transactionId = null, $sync = null)
-    {
-        $headers = array();
-        if (isset($transactionId)) {
-            $headers['transaction'] = $transactionId;
-        }
-        $frame = new Stomp_Frame('ABORT', $headers);
-        $this->_prepareReceipt($frame, $sync);
-        $this->_writeFrame($frame);
-        return $this->_waitForReceipt($frame, $sync);
-    }
-    /**
-     * Acknowledge consumption of a message from a subscription
-        * Note: This operation is always asynchronous
-     *
-     * @param string|Stomp_Frame $messageMessage ID
-     * @param string $transactionId
-     * @return boolean
-     * @throws Stomp_Exception
-     */
-    public function ack ($message, $transactionId = null)
-    {
-        if ($message instanceof Stomp_Frame) {
-            $frame = new Stomp_Frame('ACK', $message->headers);
-            $this->_writeFrame($frame);
-            return true;
-        } else {
-            $headers = array();
-            if (isset($transactionId)) {
-                $headers['transaction'] = $transactionId;
-            }
-            $headers['message-id'] = $message;
-            $frame = new Stomp_Frame('ACK', $headers);
-            $this->_writeFrame($frame);
-            return true;
-        }
-    }
-    /**
-     * Graceful disconnect from the server
-     *
-     */
-    public function disconnect ()
-    {
-               $headers = array();
-
-               if ($this->clientId != null) {
-                       $headers["client-id"] = $this->clientId;
-               }
-
-        if (is_resource($this->_socket)) {
-            $this->_writeFrame(new Stomp_Frame('DISCONNECT', $headers));
-            fclose($this->_socket);
-        }
-        $this->_socket = null;
-        $this->_sessionId = null;
-        $this->_currentHost = -1;
-        $this->_subscriptions = array();
-        $this->_username = '';
-        $this->_password = '';
-    }
-    /**
-     * Write frame to server
-     *
-     * @param Stomp_Frame $stompFrame
-     */
-    protected function _writeFrame (Stomp_Frame $stompFrame)
-    {
-        if (!is_resource($this->_socket)) {
-            require_once 'Stomp/Exception.php';
-            throw new Stomp_Exception('Socket connection hasn\'t been 
established');
-        }
-
-        $data = $stompFrame->__toString();
-        
-        $r = fwrite($this->_socket, $data, strlen($data));
-        if ($r === false || $r == 0) {
-            $this->_reconnect();
-            $this->_writeFrame($stompFrame);
-        }
-    }
-    
-    /**
-     * Set timeout to wait for content to read
-     *
-     * @param int $seconds_to_wait  Seconds to wait for a frame
-     * @param int $milliseconds Milliseconds to wait for a frame
-     */
-    public function setReadTimeout($seconds, $milliseconds = 0) 
-    {
-        $this->_read_timeout_seconds = $seconds;
-        $this->_read_timeout_milliseconds = $milliseconds;
-    }
-    
-    /**
-     * Read responce frame from server
-     *
-     * @return Stomp_Frame|Stomp_Message_Map|boolean False when no frame to 
read
-     */
-    public function readFrame ()
-    {
-        if (!$this->hasFrameToRead()) {
-            return false;
-        }
-        
-        stream_set_timeout($this->_socket,
-            $this->_read_timeout_seconds + $this->_read_timeout_milliseconds / 
1000.0);
-        $rb = 1024;
-        $data = '';
-        do {
-            $read = fgets($this->_socket, $rb);
-            $info = stream_get_meta_data($this->_socket);
-            if ($info['timed_out']) {
-              return FALSE;
-            }
-            //if ($read === false) {
-            //    $this->_reconnect();
-            //    return $this->readFrame();
-            //}
-            $data .= $read;
-            $len = strlen($data);
-        } while ($read && ($len < 2 || ! ($data[$len - 2] == "\x00" && 
$data[$len - 1] == "\n")));
-        
-        list ($header, $body) = explode("\n\n", $data, 2);
-        $header = explode("\n", $header);
-        $headers = array();
-        $command = null;
-        foreach ($header as $v) {
-            if (isset($command)) {
-                list ($name, $value) = explode(':', $v, 2);
-                $headers[$name] = $value;
-            } else {
-                $command = $v;
-            }
-        }
-        $frame = new Stomp_Frame($command, $headers, trim($body));
-        
-        if (isset($frame->headers['amq-msg-type']) && 
$frame->headers['amq-msg-type'] == 'MapMessage') {
-            require_once 'Stomp/Message/Map.php';
-            return new Stomp_Message_Map($frame);
-        } else {
-            return $frame;
-        }
-    }
-    
-    /**
-     * Check if there is a frame to read
-     *
-     * @return boolean
-     */
-    public function hasFrameToRead()
-    {
-        return TRUE; // http://bugs.php.net/bug.php?id=46024
-        
-        $read = array($this->_socket);
-        $write = null;
-        $except = null;
-        
-        $has_frame_to_read = stream_select($read, $write, $except, 
$this->_read_timeout_seconds, $this->_read_timeout_milliseconds);
-
-        if ($has_frame_to_read === false) {
-            throw new Stomp_Exception('Check failed to determin if the socket 
is readable');
-        } else if ($has_frame_to_read > 0) {
-            return true;
-        } else {
-            return false; 
-        }
-    }
-    
-    /**
-     * Reconnects and renews subscriptions (if there were any)
-     * Call this method when you detect connection problems     
-     */
-    protected function _reconnect ()
-    {
-        $subscriptions = $this->_subscriptions;
-        
-        $this->connect($this->_username, $this->_password);
-        foreach ($subscriptions as $dest => $properties) {
-            $this->subscribe($dest, $properties);
-        }
-    }
-    /**
-     * Graceful object desruction
-     *
-     */
-    public function __destruct()
-    {
-        $this->disconnect();
-    }
-}
-?>
diff --git a/sites/all/modules/queue2civicrm/Stomp/Exception.php 
b/sites/all/modules/queue2civicrm/Stomp/Exception.php
deleted file mode 100644
index 828e26e..0000000
--- a/sites/all/modules/queue2civicrm/Stomp/Exception.php
+++ /dev/null
@@ -1,57 +0,0 @@
-<?php
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/* vim: set expandtab tabstop=3 shiftwidth=3: */
-
-/**
- * A Stomp Connection
- *
- *
- * @package Stomp
- * @author Michael Caplan <mcap...@labnet.net>
- * @version $Revision: 23 $
- */
-class Stomp_Exception extends Exception
-{
-    protected $_details;
-    
-    /**
-     * Constructor
-     *
-     * @param string $message Error message
-     * @param int $code Error code
-     * @param string $details Stomp server error details
-     */
-    public function __construct($message = null, $code = 0, $details = '')
-    {
-        $this->_details = $details;
-        
-        parent::__construct($message, $code);
-    }
-    
-    /**
-     * Stomp server error details
-     *
-     * @return string
-     */
-    public function getDetails()
-    {
-        return $this->_details;
-    }
-}
-?>
\ No newline at end of file
diff --git a/sites/all/modules/queue2civicrm/Stomp/Frame.php 
b/sites/all/modules/queue2civicrm/Stomp/Frame.php
deleted file mode 100644
index bcabab8..0000000
--- a/sites/all/modules/queue2civicrm/Stomp/Frame.php
+++ /dev/null
@@ -1,80 +0,0 @@
-<?php
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/* vim: set expandtab tabstop=3 shiftwidth=3: */
-
-/**
- * Stomp Frames are messages that are sent and received on a StompConnection.
- *
- * @package Stomp
- * @author Hiram Chirino <hi...@hiramchirino.com>
- * @author Dejan Bosanac <de...@nighttale.net>
- * @author Michael Caplan <mcap...@labnet.net>
- * @version $Revision: 36 $
- */
-class Stomp_Frame
-{
-    public $command;
-    public $headers = array();
-    public $body;
-    
-    /**
-     * Constructor
-     *
-     * @param string $command
-     * @param array $headers
-     * @param string $body
-     */
-    public function __construct ($command = null, $headers = null, $body = 
null)
-    {
-        $this->_init($command, $headers, $body);
-    }
-    
-    protected function _init ($command = null, $headers = null, $body = null)
-    {
-        $this->command = $command;
-        if ($headers != null) {
-            $this->headers = $headers;
-        }
-        $this->body = $body;
-        
-        if ($this->command == 'ERROR') {
-            require_once 'Exception.php';
-            throw new Stomp_Exception($this->headers['message'], 0, 
$this->body);
-        }
-    }
-    
-    /**
-     * Convert frame to transportable string
-     *
-     * @return string
-     */
-    public function __toString()
-    {
-        $data = $this->command . "\n";
-        
-        foreach ($this->headers as $name => $value) {
-            $data .= $name . ": " . $value . "\n";
-        }
-        
-        $data .= "\n";
-        $data .= $this->body;
-        return $data .= "\x00\n";
-    }
-}
-?>
\ No newline at end of file
diff --git a/sites/all/modules/queue2civicrm/Stomp/Message.php 
b/sites/all/modules/queue2civicrm/Stomp/Message.php
deleted file mode 100644
index 6bcad3e..0000000
--- a/sites/all/modules/queue2civicrm/Stomp/Message.php
+++ /dev/null
@@ -1,37 +0,0 @@
-<?php
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/* vim: set expandtab tabstop=3 shiftwidth=3: */
-
-require_once 'Stomp/Frame.php';
-
-/**
- * Basic text stomp message
- *
- * @package Stomp
- * @author Dejan Bosanac <de...@nighttale.net>
- * @version $Revision: 23 $
- */
-class Stomp_Message extends Stomp_Frame
-{
-    public function __construct ($body, $headers = null)
-    {
-        $this->_init("SEND", $headers, $body);
-    }
-}
-?>
\ No newline at end of file
diff --git a/sites/all/modules/queue2civicrm/Stomp/Message/Bytes.php 
b/sites/all/modules/queue2civicrm/Stomp/Message/Bytes.php
deleted file mode 100644
index c75f23e..0000000
--- a/sites/all/modules/queue2civicrm/Stomp/Message/Bytes.php
+++ /dev/null
@@ -1,47 +0,0 @@
-<?php
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/* vim: set expandtab tabstop=3 shiftwidth=3: */
-
-require_once 'Stomp/Message.php';
-
-/**
- * Message that contains a stream of uninterpreted bytes
- *
- * @package Stomp
- * @author Dejan Bosanac <de...@nighttale.net>
- * @version $Revision: 23 $
- */
-class Stomp_Message_Bytes extends Stomp_Message
-{
-    /**
-     * Constructor
-     *
-     * @param string $body
-     * @param array $headers
-     */
-    function __construct ($body, $headers = null)
-    {
-        $this->_init("SEND", $headers, $body);
-        if ($this->headers == null) {
-            $this->headers = array();
-        }
-        $this->headers['content-length'] = count($body);
-    }
-}
-?>
\ No newline at end of file
diff --git a/sites/all/modules/queue2civicrm/Stomp/Message/Map.php 
b/sites/all/modules/queue2civicrm/Stomp/Message/Map.php
deleted file mode 100644
index 288456a..0000000
--- a/sites/all/modules/queue2civicrm/Stomp/Message/Map.php
+++ /dev/null
@@ -1,55 +0,0 @@
-<?php
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/* vim: set expandtab tabstop=3 shiftwidth=3: */
-
-require_once 'Stomp/Message.php';
-
-/**
- * Message that contains a set of name-value pairs
- *
- * @package Stomp
- * @author Dejan Bosanac <de...@nighttale.net>
- * @version $Revision: 23 $
- */
-class Stomp_Message_Map extends Stomp_Message
-{
-    public $map;
-    
-    /**
-     * Constructor
-     *
-     * @param Stomp_Frame|string $msg
-     * @param array $headers
-     */
-    function __construct ($msg, $headers = null)
-    {
-        if ($msg instanceof Stomp_Frame) {
-            $this->_init($msg->command, $msg->headers, $msg->body);
-            $this->map = json_decode($msg->body);
-        } else {
-            $this->_init("SEND", $headers, $msg);
-            if ($this->headers == null) {
-                $this->headers = array();
-            }
-            $this->headers['amq-msg-type'] = 'MapMessage';
-            $this->body = json_encode($msg);
-        }
-    }
-}
-?>
\ No newline at end of file
diff --git a/sites/all/modules/queue2civicrm/queue2civicrm.info 
b/sites/all/modules/queue2civicrm/queue2civicrm.info
index e154679..7363dd8 100644
--- a/sites/all/modules/queue2civicrm/queue2civicrm.info
+++ b/sites/all/modules/queue2civicrm/queue2civicrm.info
@@ -1,5 +1,5 @@
 name = Queue to CiviCRM
-description = Imports contributions from a STOMP-compliant message queue.
+description = Imports contributions from a message queue.
 core = 7.x
 configure = admin/config/queue2civicrm
 dependencies[] = exchange_rates
@@ -7,7 +7,5 @@
 package = queue2civicrm
 files[] = Queue2civicrmTrxnCounter.php
 files[] = DonationQueueConsumer.php
-files[] = Stomp.php
 files[] = tests/includes/Message.php
-files[] = tests/includes/MessageSource.php
 files[] = tests/includes/TestingSmashPigDbQueueConfiguration.php
diff --git a/sites/all/modules/queue2civicrm/queue2civicrm.module 
b/sites/all/modules/queue2civicrm/queue2civicrm.module
index e36778b..aa7992c 100644
--- a/sites/all/modules/queue2civicrm/queue2civicrm.module
+++ b/sites/all/modules/queue2civicrm/queue2civicrm.module
@@ -43,39 +43,6 @@
 }
 
 /**
- * Create or retrieve a stomp queue connection.  It's best to use a different
- * Queue object for each queue you access if you are interleaving reads from
- * different queues.
- * @param string $name a label for the Queue object
- * @return Queue named connection object
- */
-function queue2civicrm_stomp( $name = 'default' ) {
-  static $q = array();
-
-  if ( empty( $q[$name] ) ) {
-    $q[$name] = new Queue( queue2civicrm_stomp_url() );
-  }
-
-  return $q[$name];
-}
-
-function queue2civicrm_test() {
-  $content = array();
-  
-  $content[] = '<p>Attempting connection to ' . queue2civicrm_stomp_url() . '... ';  
-  $q = queue2civicrm_stomp();
-  try {
-    $q->getConnection();
-    $content[] = '[SUCCESS]</p>';
-  }
-  catch (Exception $e) {
-    $content[] = '[FAILURE], error: ' . $e . '</p>';
-  }
-  
-  return implode("\n", $content);
-}
-
-/**
  * Implements hook_permission().
  */
 function queue2civicrm_permission() {
@@ -97,23 +64,6 @@
     '#title' => t('Disable job'),
     '#description' => t('If checked, no message processing will be performed.'),
     '#default_value' => variable_get('queue2civicrm_disable', false),
-  );
-
-  $form['queue2civicrm_url'] = array(
-    '#type' => 'textfield',
-    '#title' => t('Connection URL'),
-    '#description' => t('Include the port number, like so for a typical development environment: !example_url', array( '!example_url' => 'http://localhost:61613')),
-    '#required' => TRUE,
-    '#default_value' => queue2civicrm_stomp_url(),
-  );
-
-  // TODO: Remove when done mirroring from audits to ActiveMQ
-  $form['queue2civicrm_subscription'] = array(
-    '#type' => 'textfield',
-    '#title' => t('Subscription path'),
-    '#description' => t('The queue holding normal, incoming donations waiting to be processed.'),
-    '#required' => TRUE,
-    '#default_value' => variable_get('queue2civicrm_subscription', '/queue/donations'),
   );
 
   $form['queue2civicrm_batch'] = array(
diff --git a/sites/all/modules/queue2civicrm/queue_consume.drush.inc b/sites/all/modules/queue2civicrm/queue_consume.drush.inc
index 123da0f..b6045dd 100644
--- a/sites/all/modules/queue2civicrm/queue_consume.drush.inc
+++ b/sites/all/modules/queue2civicrm/queue_consume.drush.inc
@@ -1,7 +1,7 @@
 <?php
 /**
  * @file queue_consume.drush.inc
- *  Consume and process items from an ActiveMQ queue into CiviCRM
+ *  Consume and process items from a message queue into CiviCRM
  * @author Arthur Richards <aricha...@wikimedia.org>
  * @TODO print some useful info to STDOUT
  */
@@ -14,7 +14,7 @@
 
   $items['queue-consume'] = array(
     'description' => 
-      'Consumes items from a specified ActiveMQ queue and processes them into CiviCRM',
+      'Consumes items from a specified queue and processes them into CiviCRM',
     'examples' => array( 'drush queue-consume' => '# Consume the queue' ), 
     'aliases' => array( 'qc' ),
   );
@@ -38,7 +38,7 @@
 function queue_consume_drush_help( $section ) {
   switch ( $section ) {
     case 'drush:queue-consume':
-      return dt( "Pulls items from an ActiveMQ queue and processes them into CiviCRM" );
+      return dt( "Pulls items from a message queue and processes them into CiviCRM" );
     case 'drush:donations-process-messagefile':
       return dt( "Feeds messages directly into the import pipeline, bypassing the queue." );
   }
diff --git a/sites/all/modules/queue2civicrm/recurring/recurring_queue_consume.drush.inc b/sites/all/modules/queue2civicrm/recurring/recurring_queue_consume.drush.inc
index 779afa5..c107ede 100644
--- a/sites/all/modules/queue2civicrm/recurring/recurring_queue_consume.drush.inc
+++ b/sites/all/modules/queue2civicrm/recurring/recurring_queue_consume.drush.inc
@@ -8,7 +8,7 @@
 
   $items['recurring-queue-consume'] = array(
     'description' => 
-      'Consumes items from a specified ActiveMQ queue and processes them into CiviCRM',
+      'Consumes items from a specified message queue and processes them into CiviCRM',
     'examples' => array( 'drush recurring-queue-consume' => '# Consume the queue' ), 
     'aliases' => array( 'rqc' ),
   );
@@ -33,7 +33,7 @@
 function recurring_queue_consume_drush_help( $section ) {
   switch ( $section ) {
     case 'drush:recurring-queue-consume':
-      return dt( "Pulls recurring items from an ActiveMQ queue and processes them into CiviCRM" );
+      return dt( "Pulls recurring items from a message queue and processes them into CiviCRM" );
     case 'drush:recurring-process-messagefile':
       return dt( "Feeds a single message directly into the recurring import pipeline, bypassing the queue." );
   }
diff --git a/sites/all/modules/queue2civicrm/tests/includes/MessageSource.php b/sites/all/modules/queue2civicrm/tests/includes/MessageSource.php
deleted file mode 100644
index 89a8e1f..0000000
--- a/sites/all/modules/queue2civicrm/tests/includes/MessageSource.php
+++ /dev/null
@@ -1,43 +0,0 @@
-<?php
-
-/**
- * FIXME: injecting into a live MQ should done from a drush command, not from tests
- */
-class MessageSource {
-    protected $stomp_url = 'tcp://localhost:61613';
-    //protected $message_type = "TransactionMessage";
-    protected $queue = "/queue/test-donations";
-
-    function __construct( $stomp_url = null, $queue = null ) {
-        if ( $stomp_url ) {
-            $this->stomp_url = $stomp_url;
-        }
-        $this->stomp = new Stomp( $this->stomp_url );
-
-        if (method_exists($this->stomp, 'connect'))
-                $this->stomp->connect();
-
-        if ( $queue ) {
-            $this->queue = $queue;
-        }
-    }
-
-    function setQueue( $queue ) {
-        $this->queue = $queue;
-    }
-
-    function inject( $values = array() ) {
-        //$message = new $message_type( $values );
-        if ( is_array( $values ) ) {
-            $message = new TransactionMessage( $values );
-        } elseif ( is_object( $values ) ) {
-            $message = $values;
-        }
-
-        $this->stomp->send(
-            $this->queue,
-            json_encode( $message->getBody() ),
-            $message->getHeaders()
-        );
-    }
-}
diff --git a/sites/all/modules/wmf_civicrm/WmfTransaction.php b/sites/all/modules/wmf_civicrm/WmfTransaction.php
index 054544a..1311cc7 100644
--- a/sites/all/modules/wmf_civicrm/WmfTransaction.php
+++ b/sites/all/modules/wmf_civicrm/WmfTransaction.php
@@ -48,7 +48,7 @@
     }
 
     static function from_message( $msg ) {
-        // stomp message, does not have a unique id yet
+        // queue message, does not have a unique id yet
         $transaction = new WmfTransaction();
         if ( isset( $msg['gateway_refund_id'] ) ) {
             $transaction->gateway_txn_id = $msg['gateway_refund_id'];
diff --git a/sites/all/modules/wmf_common/Queue.php b/sites/all/modules/wmf_common/Queue.php
deleted file mode 100644
index 393bb8e..0000000
--- a/sites/all/modules/wmf_common/Queue.php
+++ /dev/null
@@ -1,466 +0,0 @@
-<?php
-
-class Queue {
-    protected $url;
-    protected $connection = NULL;
-
-    const MAX_CONNECT_ATTEMPTS = 3;
-
-    function __construct( $url ) {
-        $this->url = $url;
-    }
-
-    function __destruct() {
-        $this->disconnect();
-    }
-
-    /**
-     * Pop from a queue and execute a callback on each message
-     *
-     * @param string $queue name of the queue we will read from
-     * @param integer|null $batch_size maximum number of messages to process, 
or falseish for unlimited
-     * @param integer|null $time_limit maximum time to spend looping, in 
seconds, or falseish for unlimited
-     * @param callable $callback: must have the signature function($msg) -> 
bool
-     *
-     * @return integer number of messages processed
-     */
-    function dequeue_loop( $queue, $batch_size, $time_limit, $callback ) {
-        if ( !$batch_size and !$time_limit ) {
-            throw new Exception( "Bad configuration: need to give a count or 
time limit" );
-        }
-
-        $queue = $this->normalizeQueueName( $queue );
-
-        watchdog( 'wmf_common',
-            'Attempting to process at most %size contribution(s) from "%queue" 
queue, spending at most %time seconds.',
-            array(
-                '%size' => ( $batch_size ? $batch_size : 'unlimited' ),
-                '%time' => ( $time_limit ? $time_limit : 'unlimited' ),
-                '%queue' => $queue,
-            ),
-            WATCHDOG_INFO
-        );
-
-        $con = $this->getFreshConnection();
-
-        // Create the subscription -- to handle requeues we will only select
-        // things that either have no delay_till header or a delay_till that is
-        // less than now. Because ActiveMQ is stupid, numeric selects auto
-        // compare to null.
-        $start_time = time();
-        $con->subscribe( $queue, array( 'ack' => 'client', ) );
-        $con->setReadTimeout( 4 );
-
-        $processed = 0;
-        while ( true ) {
-            if ( $batch_size
-                and $processed >= $batch_size
-            ) {
-                watchdog( 'wmf_common', t( 'Processed the maximum batch size, 
exiting dequeue loop.' ), NULL, WATCHDOG_INFO );
-                break;
-            }
-
-            if ( $time_limit
-                and time() >= ( $start_time + $time_limit )
-            ) {
-                watchdog( 'wmf_common', t( 'Time limit expired, exiting 
dequeue loop.' ), NULL, WATCHDOG_INFO );
-                break;
-            }
-
-            $msg = $con->readFrame();
-            if ( empty($msg) ) {
-                watchdog( 'wmf_common', t('Ran out of messages.'), NULL, 
WATCHDOG_INFO );
-                break;
-            }
-            if ($msg->command === 'RECEIPT') {
-                // TODO it would be smart to keep track of RECEIPT frames as 
they are an ack-of-ack
-                watchdog( 'wmf_common', t('Popped a sweet nothing off the 
queue:') . check_plain( print_r($msg, TRUE) ), NULL, WATCHDOG_INFO );
-                $i--;
-                continue;
-            } elseif (($msg->command === 'MESSAGE') &&
-                      array_key_exists('delay_till', $msg->headers) &&
-                      (intval($msg->headers['delay_till']) > time())
-            ) {
-                // We have a message that we are not supposed to process yet, 
So requeue and ack
-                watchdog( 'wmf_common', t('Requeueing delay_till message.'), 
NULL, WATCHDOG_DEBUG );
-                if( $this->requeue( $msg ) ) {
-                    $this->ack( $msg );
-                } else {
-                    throw new WmfException("STOMP_BAD_CONNECTION", "Failed to 
requeue a delayed message: ", $msg);
-                }
-
-                // If we're seeing messages with the current transaction ID in 
it we've started to eat our own
-                // tail. So... we should bounce out.
-                if ( strpos($msg->headers['message-id'], 
$this->getSessionId()) === 0 ) {
-                    break;
-                } else {
-                    continue;
-                }
-            }
-
-            watchdog( 'wmf_common', t( 'Feeding raw queue message to %callback 
: %msg', array( '%callback' => print_r($callback, TRUE), '%msg' => 
$this->debug_message( $msg ) ) ), NULL, WATCHDOG_INFO );
-
-            set_time_limit( 60 );
-
-            try {
-                $this->commonMessageNormalize( $msg );
-                WmfDatabase::transactionalCall(
-                    $callback, array( $msg )
-                );
-                $this->ack( $msg );
-            } catch ( WmfException $ex ) {
-                watchdog( 'wmf_common', 'Failure while processing message: ' . 
$ex->getMessage(), NULL, WATCHDOG_ERROR );
-
-                if ( $ex->isRequeue() ) {
-                    $ret = $this->requeueWithDelay( $msg, $ex );
-
-                    if ( $ret ) {
-                      $this->ack( $msg );
-                    } else {
-                      throw new WmfException( "STOMP_BAD_CONNECTION", "Failed 
to requeue a message" );
-                    }
-                }
-
-                if ( $ex->isDropMessage() ) {
-                    watchdog( 'wmf_common', "Dropping message altogether: " . 
Queue::getCorrelationId( $msg ), NULL, WATCHDOG_ERROR );
-                    $this->ack( $msg );
-                } elseif ( $ex->isRejectMessage() ) {
-                    watchdog( 'wmf_common', "\nRemoving failed message from 
the queue: \n" . print_r($msg, TRUE), NULL, WATCHDOG_ERROR );
-                    $this->reject( $msg, $ex );
-                    $this->ack( $msg );
-
-                    $mailableDetails = $this->item_url( $msg );
-                } else {
-                    $mailableDetails = "Redacted contents of message ID: " . 
Queue::getCorrelationId( $msg );
-                }
-
-                if ( !$ex->isNoEmail() ) {
-                    wmf_common_failmail( 'wmf_common', '', $ex, 
$mailableDetails );
-                }
-
-                if ( $ex->isFatal() ) {
-                    $error = "Halting Process.";
-                    watchdog( 'wmf_common', $error, NULL, WATCHDOG_ERROR );
-
-                    throw $ex;
-                }
-            } catch (Exception $ex) {
-                $error = 'UNHANDLED ERROR. Halting dequeue loop. Exception: ' 
. $ex->getMessage() . "\nStack Trace: " . print_r( $ex->getTrace(), true );
-                watchdog( 'wmf_common', $error, NULL, WATCHDOG_ERROR );
-                wmf_common_failmail( 'wmf_common', $error, NULL, 
Queue::getCorrelationId( $msg ) );
-
-                throw $ex;
-            }
-
-            $processed++;
-        }
-
-        $con->unsubscribe( $queue );
-        $this->disconnect();
-
-        return $processed;
-    }
-
-    /**
-     * Preview several messages from the head of a queue, without ack'ing.
-     */
-    function peekMultiple( $queue, $count ) {
-        # TODO: prefetchPolicy, timeout as params?
-
-        $queue = $this->normalizeQueueName( $queue );
-        $con = $this->getFreshConnection();
-
-        $con->setReadTimeout( 1 );
-        # FIXME: probably the wrong value--does this buffer include msgs not
-        # specific to our subscription?
-        $con->prefetchSize = $count;
-
-        $con->subscribe( $queue, array( 'ack' => 'client' ) );
-        $messages = array();
-        while ( $con->hasFrameToRead() && $count-- >= 0 ) {
-            $msg = $con->readFrame();
-            if ( !$msg ) {
-                break;
-            }
-            $messages[] = $msg;
-        }
-        return $messages;
-    }
-
-    function getByCorrelationId( $queue, $correlationId ) {
-        $con = $this->getFreshConnection();
-        $properties = array(
-            'ack' => 'client',
-            'selector' => "JMSCorrelationID='{$correlationId}'",
-        );
-               $timeout = variable_get( "queue_timeout_$queue", 5 );
-               $con->setReadTimeout( $timeout );
-        $con->subscribe( $this->normalizeQueueName( $queue ), $properties );
-
-        return $con->readFrame();
-    }
-
-    function getByAnyId( $queue, $correlationId ) {
-        $con = $this->getFreshConnection();
-        $properties = array(
-            'ack' => 'client',
-            'selector' => "JMSCorrelationID='{$correlationId}' OR 
JMSMessageID='{$correlationId}'",
-        );
-        $con->subscribe( $this->normalizeQueueName( $queue ), $properties );
-
-        return $con->readFrame();
-    }
-
-    static function getCorrelationId( $msg ) {
-        if ( !empty( $msg->headers['correlation-id'] ) ) {
-            return $msg->headers['correlation-id'];
-        }
-        $body = json_decode( $msg->body, TRUE );
-        if ( !empty( $body['gateway'] ) && !empty( $body['gateway_txn_id'] ) ) 
{
-            return "{$body['gateway']}-{$body['gateway_txn_id']}";
-        }
-        if ( !empty( $msg->headers['message-id'] ) ) {
-            return $msg->headers['message-id'];
-        }
-
-        watchdog( 'wmf_common', 'Could not create a correlation-id for 
message: ' . $msg->body, NULL, WATCHDOG_WARNING );
-        return '';
-    }
-
-    function isConnected() {
-        return $this->connection and $this->connection->isConnected();
-    }
-
-    function getFreshConnection() {
-        return $this->getConnection( true );
-    }
-
-    function getConnection( $renew = false ) {
-        if ( $renew && !empty( $this->connection ) ) {
-            $this->disconnect();
-        }
-
-        watchdog( 'wmf_common', 'Attempting connection to queue server: %url', 
array( '%url' => $this->url ) );
-
-        $attempt = 0;
-        while ( !$this->isConnected() and $attempt <= 
Queue::MAX_CONNECT_ATTEMPTS ) {
-            try {
-                ++$attempt;
-                $this->connection = new Stomp( $this->url );
-                $this->connection->connect();
-            }
-            catch ( Stomp_Exception $e ) {
-                $this->connection = null;
-                watchdog( 'wmf_common', "Queue connection failure #$attempt: " 
. $e->getMessage(), array(), WATCHDOG_ERROR );
-            }
-        }
-
-        if ( !$this->isConnected() ) {
-            throw new WmfException( "STOMP_BAD_CONNECTION", "Gave up 
connecting to the queue." );
-        }
-
-        return $this->connection;
-    }
-
-    /**
-     * Disconnect. Never called directly, only used as an
-     * automatic shutdown function.  Do not blow up.
-     */
-    function disconnect() {
-        try {
-            if ( $this->isConnected() ) {
-                watchdog( 'wmf_common', t('Attempting to disconnect from the 
queue server'), NULL, WATCHDOG_INFO );
-                $this->connection->disconnect();
-            }
-        }
-        catch ( Exception $ex ) {
-            watchdog( 'wmf_common', "Explosion during disconnect: " . 
$ex->getMessage(), NULL, WATCHDOG_ERROR );
-        }
-    }
-
-    /**
-     * Acknowledge a STOMP message and remove it from the queue
-     * @param $msg Message to acknowledge
-     */
-    function ack( $msg ) {
-        $this->getConnection()->ack( $msg );
-    }
-
-    /**
-     * Enqueue a STOMP message
-     *
-     * @param array $msg    Message to queue
-     * @param string $queue  Queue to queue to; should start with /queue/
-     * @return bool   True if STOMP claims it worked
-     */
-    function enqueue( $msg, $properties, $queue ) {
-      $properties['persistent'] = 'true';
-
-      $queue = $this->normalizeQueueName( $queue );
-      watchdog( 'wmf_common', 'Enqueueing a message on ' . $queue, NULL, 
WATCHDOG_DEBUG );
-      return $this->getConnection()->send( $queue, $msg, $properties );
-    }
-
-    /**
-     * Places a STOMP message back onto the queue moving its original 
timestamp to another
-     * property and maintaining a count of previous moves. After 
wmf_common_requeue_max
-     * moves it will reject the message (place into a rejection queue with 
details of
-     * the underlying exception that has caused this requeue.)
-     *
-     * Note: orig_timestamp is in ms
-     *
-     * @param StompFrame $msg_orig The message as fetched from stomp
-     * @param WmfException $ex The exception which has caused this message to 
need to be requeued.
-     *
-     * @return bool True if it all went successfully
-     */
-    function requeueWithDelay( $msg_orig, WmfException $ex ) {
-      $msg = $msg_orig->body;
-      $headers = array(
-        'orig_timestamp' => array_key_exists( 'orig_timestamp', 
$msg_orig->headers ) ? $msg_orig->headers['orig_timestamp'] : 
$msg_orig->headers['timestamp'],
-        'delay_till' => time() + 
intval(variable_get('wmf_common_requeue_delay', 20 * 60)),
-        'delay_count' => array_key_exists( 'delay_count', $msg_orig->headers ) 
? $msg_orig->headers['delay_count'] + 1 : 1,
-      );
-      $headers += $msg_orig->headers;
-
-      $queue = $msg_orig->headers['destination'];
-      $max_count = intval(variable_get('wmf_common_requeue_max', 10));
-
-      $retval = FALSE;
-      if (($max_count > 0) && ($headers['delay_count'] > $max_count)) {
-        try {
-            $this->reject( $msg_orig, $ex );
-            $retval = TRUE;
-        } catch ( WmfException $ex ) {
-            $errorMsg = $ex->getMessage();
-        }
-      } else {
-        watchdog( 'wmf_common', "Requeueing message to $queue", NULL, 
WATCHDOG_INFO );
-        $errorMsg = "Bad connection?";
-        try {
-          $retval = $this->enqueue( $msg, $headers, $queue );
-        } catch ( Stomp_Exception $ex ) {
-          $errorMsg = $ex->getMessage();
-        }
-      }
-
-      if ( !$retval ) {
-        $error = "Failed to requeue message: {$errorMsg}. Contents: " . 
json_encode( $msg_orig );
-        watchdog( 'wmf_common', $error, NULL, WATCHDOG_ERROR );
-        wmf_common_failmail( 'wmf_common', $error );
-      }
-
-      return $retval;
-    }
-
-    /**
-     * Places a STOMP message back onto the queue. Does not increment 
delay_count.
-     *
-     * @param $msg_orig
-     * @return bool
-     */
-    function requeue( $msg_orig ) {
-      $msg = $msg_orig->body;
-      $headers = array(
-        'orig_timestamp' => array_key_exists( 'orig_timestamp', 
$msg_orig->headers ) ? $msg_orig->headers['orig_timestamp'] : 
$msg_orig->headers['timestamp'],
-        'delay_till' => array_key_exists( 'delay_till', $msg_orig->headers ) ? 
$msg_orig->headers['delay_till'] : 0,
-        'delay_count' => array_key_exists( 'delay_count', $msg_orig->headers ) 
? $msg_orig->headers['delay_count'] : 0,
-      );
-      $headers += $msg_orig->headers;
-      $queue = $headers['destination'];
-
-      try {
-        $retval = $this->enqueue( $msg, $headers, $queue );
-        return $retval;
-      } catch (Stomp_Exception $ex) {
-        $exMsg = "Failed to requeue message with {$ex->getMessage()}. 
Contents: " . json_encode($msg_orig);
-        watchdog('wmf_common', $exMsg, NULL, WATCHDOG_ERROR);
-        wmf_common_failmail('wmf_common', $exMsg);
-        return false;
-      }
-    }
-
-    /**
-     * Saves an archival msg to <QUEUE>-damaged
-     *
-     * @return string URL pointing to manual edit and requeuing of the newly 
archived msg
-     */
-    function reject( $msg, WmfException $error ) {
-        $suffix = "-damaged";
-        //if ( strstr( $msg->headers['destination'], $suffix ) ) { ERROR
-        $msg->headers['destination'] .= $suffix;
-
-        $new_body = array(
-            'original' => json_decode( $msg->body ),
-        );
-        if ( $error ) {
-            $msg->headers['error'] = $error->getErrorName();
-            $new_body['error'] = $error->getMessage();
-        }
-        $msg->headers['correlation-id'] = Queue::getCorrelationId( $msg );
-
-        $queue = $this->normalizeQueueName( $msg->headers['destination'] );
-
-        watchdog( 'wmf_common', 'Attempting to move a message to ' . $queue, 
NULL, WATCHDOG_INFO );
-        watchdog( 'wmf_common', "Requeuing under correlation-id 
{$msg->headers['correlation-id']}", NULL, WATCHDOG_INFO );
-        if ( !$this->enqueue( json_encode( $new_body ), $msg->headers, $queue 
) ) {
-            $exMsg = 'Failed to inject rejected message into $queue! ' . 
json_encode( $msg );
-            watchdog( 'wmf_common', $exMsg, NULL, WATCHDOG_ERROR );
-            throw new WmfException( 'STOMP_BAD_CONNECTION', 'Could not 
reinject damaged message' );
-        }
-    }
-
-    function item_url( $msg ) {
-        global $base_url;
-        $queue = str_replace('/queue/', '', $msg->headers['destination'] );
-        $correlationId = Queue::getCorrelationId( $msg );
-        return "{$base_url}/queue/{$queue}/{$correlationId}";
-    }
-
-    /**
-     * Obtain the current stomp session id prefix
-     *
-     * @return string
-     */
-    protected function getSessionId() {
-        return $this->getConnection()->getSessionId();
-    }
-
-    protected function normalizeQueueName( $queue ) {
-        $queue = str_replace( '/queue/', '', $queue );
-        return '/queue/' . $queue;
-    }
-
-    protected function debug_message( $msg ) {
-        $msg_copy = clone( $msg );
-        if ( is_string( $msg_copy->body ) ) {
-            $decoded = json_decode( $msg_copy->body, true );
-            if ( $decoded ) {
-                $msg_copy->body = $decoded;
-            }
-        }
-        # php 5.4 $msg_str = json_encode( $msg_copy, JSON_PRETTY_PRINT );
-        $msg_str = print_r( $msg_copy, true );
-        return $msg_str;
-    }
-
-    protected function commonMessageNormalize( &$msg ) {
-        if ( !$msg->body ) {
-            throw new WmfException( 'INVALID_MESSAGE', 'Bad news, this message 
was lacking a body.' );
-        }
-
-        // argh.  Collapse useful headers into the message, then do a stupid 
dance.
-        $pull_headers = array(
-            'source_name',
-            'source_type',
-            'source_host',
-            'source_run_id',
-            'source_version',
-            'source_enqueued_time',
-        );
-        $newBody = json_decode( $msg->body, true )
-            + array_intersect_key( $msg->headers, array_flip( $pull_headers ) 
);
-
-        $msg->body = json_encode( $newBody );
-    }
-}
diff --git a/sites/all/modules/wmf_common/failmail.php b/sites/all/modules/wmf_common/failmail.php
index aaed052..13295fa 100644
--- a/sites/all/modules/wmf_common/failmail.php
+++ b/sites/all/modules/wmf_common/failmail.php
@@ -71,7 +71,7 @@
     $body = array();
 
     if ($isRemoved === true){
-        $body[] = t("A message was removed from ActiveMQ due to the following error(s):");
+        $body[] = t("A message was removed from the queue due to the following error(s):");
     } elseif($error === null && !$message){
         $body[] = t("A message failed for reasons unknown, while being processed:");
     } else {
diff --git a/sites/all/modules/wmf_common/wmf_common.info b/sites/all/modules/wmf_common/wmf_common.info
index c929a09..a603a1d 100755
--- a/sites/all/modules/wmf_common/wmf_common.info
+++ b/sites/all/modules/wmf_common/wmf_common.info
@@ -4,7 +4,6 @@
 package = Wikimedia
 configure = admin/config/wmf_common
 files[] = CsvBatchFile.php
-files[] = Queue.php
 files[] = TransactionalWmfQueueConsumer.php
 files[] = WmfDatabase.php
 files[] = WmfException.php
diff --git a/sites/all/modules/wmf_common/wmf_common.module b/sites/all/modules/wmf_common/wmf_common.module
index 5c1bb69..53a3295 100644
--- a/sites/all/modules/wmf_common/wmf_common.module
+++ b/sites/all/modules/wmf_common/wmf_common.module
@@ -25,22 +25,6 @@
     'page callback'    => 'drupal_get_form',
     'page arguments'   => array( 'wmf_common_settings' ),
   );
-  $items['queue/damaged'] = array(
-    'title' => 'Browse all damaged messages',
-    'access arguments' => array( 'manipulate queues' ),
-    'page callback' => 'drupal_get_form',
-    'page arguments' => array( 'wmf_common_queue_browser_form' ),
-  );
-  $items['queue/%/%'] = array(
-    'access arguments' => array( 'manipulate queues' ),
-    'page callback' => 'drupal_get_form',
-    'page arguments' => array( 'wmf_common_queue_item_form', 1, 2 ),
-  );
-  $items['queue/%/%/download'] = array(
-    'access arguments' => array( 'manipulate queues' ),
-    'page callback' => 'wmf_common_queue_item_download',
-    'page arguments' => array( 1, 2 ),
-  );
 
   return $items;
 }
@@ -101,183 +85,6 @@
   );
 
   return system_settings_form($form);
-}
-
-function wmf_common_queue_browser_form( $form, &$form_state ) {
-  $q = queue2civicrm_stomp();
-
-  # TODO: collect these using a hook
-  $queues = array(
-    variable_get( 'queue2civicrm_subscription' ),
-    variable_get( 'recurring_subscription' ),
-    variable_get( 'refund_queue' ),
-    variable_get( 'unsubscribe_queue' ),
-    variable_get( 'job_queue', '/queue/job-requests' ),
-  );
-
-  # semi-FIXME: not actually paging
-  $allMsgs = array();
-  $pageSize = 100;
-  foreach ( $queues as $queue ) {
-    $msgs = $q->peekMultiple( "{$queue}-damaged", $pageSize );
-    $pageSize -= count( $msgs );
-    $allMsgs = array_merge( $allMsgs, $msgs );
-  }
-
-  $rows = array();
-  foreach ( $allMsgs as $msg ) {
-    $damaged_queue = str_replace( '/queue/', '', $msg->headers['destination'] );
-    $queue = str_replace( '-damaged', '', $damaged_queue );
-    if ( !empty( $msg->headers['correlation-id'] ) ) {
-      $id = $msg->headers['correlation-id'];
-    } else {
-      $id = $msg->headers['message-id'];
-    }
-
-    $rows[] = array(
-      $msg->headers['error'],
-      $queue,
-      l( $id, $q->item_url( $msg ) ),
-    );
-  }
-
-
-  $table_html = theme_table( array(
-    'header' => array(
-      'Error',
-      'Queue',
-      'ID',
-    ),
-    'rows' => $rows,
-    'empty' => 'Nothing in the damaged queues!  Be very suspicious...',
-    'attributes' => array(),
-    'caption' => t( 'Damaged messages' ),
-    'colgroups' => array(),
-    'sticky' => true,
-  ) ).theme( 'pager' );
-
-  $form['table'] = array(
-    '#markup' => $table_html,
-  );
-
-  return $form;
-}
-
-function wmf_common_queue_item_form( $form, &$form_state, $queue, $correlationId ) {
-  $q = queue2civicrm_stomp();
-
-  $queue = preg_replace( '/[^-_a-z]/', '', $queue );
-  $correlationId = preg_replace( '/[^-a-zA-Z0-9\.:]/', '', $correlationId );
-
-  $msg = $q->getByCorrelationId( $queue, $correlationId );
-
-  if ( !$msg ) {
-       $msg = $q->getByAnyId( $queue, $correlationId );
-  }
-
-  if ( !$msg ) {
-    drupal_set_message( t( 'Message %id not found.', array( '%id' => $correlationId ) ) );
-    return $form;
-  }
-
-  $msg = json_decode( $msg->body, TRUE );
-
-  if ( !empty( $msg['error'] ) ) {
-    $form['error'] = array(
-      '#title' => 'Error',
-      '#markup' => "<div>Failure reason:<br>" . check_plain( $msg['error'] ) . "</div>",
-    );
-  }
-  if ( !empty( $msg['original'] ) ) {
-    $body = $msg['original'];
-  } else {
-    $body = $msg;
-  }
-  $form['msg_fields'] = array(
-    '#type' => 'fieldset',
-    '#title' => 'Message',
-  );
-  foreach ( $body as $key => $value ) {
-    $form['msg_fields']['field-' . $key] = array(
-        '#type' => 'textfield',
-        '#title' => $key,
-        '#default_value' => $value,
-        '#maxlength' => 1024,
-    );
-  }
-
-  if ( preg_match( '/-damaged$/', $queue ) ) {
-    $form['resend'] = array(
-      '#type' => 'submit',
-      '#value' => t( 'Resend' ),
-    );
-    $form['download'] = array(
-      '#type' => 'submit',
-      '#value' => t( 'Download' ),
-    );
-    $form['delete'] = array(
-      '#type' => 'submit',
-      '#value' => t( 'Delete' ),
-    );
-    $form['#submit'] = array( 'wmf_common_queue_item_submit' );
-  }
-
-  return $form;
-}
-
-function wmf_common_queue_item_submit( $form, &$form_state ) {
-  $queue = preg_replace( '/[^-_a-z]|-damaged$/', '', $form_state['build_info']['args'][0] );
-  $correlationId = preg_replace( '/[^-a-zA-Z0-9\.:]/', '', $form_state['build_info']['args'][1] );
-
-  $headers = array(
-    'correlation-id' => $correlationId,
-  );
-  $body = array();
-  foreach ( $form_state['values'] as $key => $value ) {
-    if ( preg_match( '/field-(.*)/', $key, $matches ) ) {
-      $body[$matches[1]] = $value;
-    }
-  }
-
-  $q = queue2civicrm_stomp();
-
-  switch ( $form_state['values']['op'] ) {
-  case $form_state['values']['resend']:
-    $msg = $q->getByAnyId( "{$queue}-damaged", $correlationId );
-    $q->enqueue( json_encode( $body ), $headers + $msg->headers, $queue );
-
-    $q->ack( $msg );
-
-    drupal_set_message( t( 'Message %id resent for processing.', array( '%id' => $correlationId ) ) );
-    $form_state['redirect'] = "queue/{$queue}/{$correlationId}";
-    break;
-  case $form_state['values']['delete']:
-    $msg = $q->getByAnyId( "{$queue}-damaged", $correlationId );
-    $q->ack( $msg );
-    drupal_set_message( t( 'Successfully removed message %id.', array( '%id' => $correlationId ) ) );
-    $form_state['redirect'] = "queue/damaged";
-    break;
-  case $form_state['values']['download']:
-    $form_state['redirect'] = "queue/{$queue}-damaged/{$correlationId}/download";
-    break;
-  }
-}
-
-function wmf_common_queue_item_download( $queue, $correlationId ) {
-  $queue = preg_replace( '/[^-_a-z]/', '', $queue );
-  $correlationId = preg_replace( '/[^-a-zA-Z0-9\.:]/', '', $correlationId );
-
-  $q = queue2civicrm_stomp();
-  $msg = $q->getByAnyId( $queue, $correlationId );
-
-  $name = mime_header_encode( "{$correlationId}.json" );
-  drupal_add_http_header( 'Content-Type', 'application/json' );
-  drupal_add_http_header( 'Content-Disposition', "attachment; filename=\"{$name}\"" );
-  $data = json_encode( array(
-    'headers' => $msg->headers,
-    'body' => json_decode( $msg->body, true ),
-  ) );
-  echo( $data );
 }
 
 /**

-- 
To view, visit https://gerrit.wikimedia.org/r/312443
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I3b41e9579396ad76cae0066bdc373c9df0cd8749
Gerrit-PatchSet: 1
Gerrit-Project: wikimedia/fundraising/crm
Gerrit-Branch: master
Gerrit-Owner: Ejegg <eeggles...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to