Mwalker has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/112836

Change subject: Script to add messages back into queue!
......................................................................

Script to add messages back into queue!

Change-Id: I1d811c437bc94056f567e5ab776c814c28223288
---
M Core/DataStores/StompDataStore.php
A Maintenance/PopulateQueueFromDump.php
2 files changed, 96 insertions(+), 3 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/SmashPig refs/changes/36/112836/1

diff --git a/Core/DataStores/StompDataStore.php b/Core/DataStores/StompDataStore.php
index db1a980..c7042c2 100644
--- a/Core/DataStores/StompDataStore.php
+++ b/Core/DataStores/StompDataStore.php
@@ -133,12 +133,31 @@
                }
 
                // Push the object to the queue!
-               $sent = $this->stompObj->send( $this->queue_id, $obj->toJson(), 
$headers );
-               if ( !$sent ) {
+               $bodyJson = $obj->toJson();
+               try {
+                       $this->addObjectRaw( $bodyJson, $headers );
+        } catch ( DataStoreException $ex ) {
                        Logger::error(
                                "Could not queue message ({$objClass}) with id 
'{$objKeys[ 'correlationId' ]}' to '{$this->queue_id}' on '{$this->uri}'",
-                               $obj->toJson()
+                               $bodyJson
                        );
+                       throw $ex;
+               }
+       }
+
+       /**
+        * Function to inject a raw message to the queue server. This is provided as a
+        * public API only for maintenance scripts that need to interact directly with
+        * the backend queue. In general, use addObject().
+        *
+        * @param string $body JSON encoded message body (or any string)
+        * @param string[] $headers Any STOMP headers required for submission (none will be added)
+        *
+        * @throws DataStoreException if the message could not be stored
+        */
+       public function addObjectRaw( $body, $headers ) {
+               $sent = $this->stompObj->send( $this->queue_id, $body, $headers 
);
+               if ( !$sent ) {
                        throw new DataStoreException( "Could not queue message 
to '{$this->queue_id}' on '{$this->uri}'" );
                }
        }
diff --git a/Maintenance/PopulateQueueFromDump.php b/Maintenance/PopulateQueueFromDump.php
new file mode 100644
index 0000000..1db0170
--- /dev/null
+++ b/Maintenance/PopulateQueueFromDump.php
@@ -0,0 +1,74 @@
+<?php namespace SmashPig\Maintenance;
+
+require( 'MaintenanceBase.php' );
+
+use SmashPig\Core\Logging\Logger;
+use SmashPig\Core\DataStores\StompDataStore;
+
+$maintClass = '\SmashPig\Maintenance\PopulateQueueFromDump';
+
+/**
+ * Script to import a file created by EmptyQueueToDump back into a backing STOMP queue.
+ *
+ * @package SmashPig\Maintenance
+ */
+class PopulateQueueFromDump extends MaintenanceBase {
+
+       protected $datastore = null;
+
+       public function __construct() {
+               parent::__construct();
+               $this->addOption( 'queue', 'queue name to inject into', 'test' 
);
+               $this->addArgument( 'file', 'File, created by EmptyQueueToDump, 
with JSON format messages to inject', true );
+       }
+
+       /**
+        * Do the actual work of the script.
+        */
+       public function execute() {
+               $this->datastore = new StompDataStore( $this->getOption( 
'queue' ) );
+
+               $startTime = time();
+               $messageCount = 0;
+
+               $raw = $this->getOption( 'raw' );
+
+               // Open the file for read
+               $infile = $this->getOption( 'file' );
+               $f = fopen( $infile, 'r' );
+               if ( !$f ) {
+                       $this->error( "Could not open $infile for read", true );
+               }
+
+               // Do the loop!
+               while ( ( $line = fgets( $f ) ) !== false ) {
+                       $parts = explode( '=', $line, 2 );
+                       if ( count( $parts ) !== 2 ) {
+                               $this->error( "Improperly formatted line: 
$line" );
+                               continue;
+                       }
+
+                       $obj = json_decode( $parts[ 1 ], true );
+                       if ( !array_key_exists( 'headers', $obj ) || 
!array_key_exists( 'body', $obj ) ) {
+                               $this->error( "Decoded line does not have 
headers and body elements: $line" );
+                               continue;
+                       }
+
+                       $this->datastore->addObjectRaw( json_encode( $obj[ 
'body' ] ), $obj[ 'headers' ] );
+
+                       $messageCount++;
+                       if ( $messageCount % 1000 == 0 ) {
+                               print( '.' );
+                       }
+               }
+               print( '\n' );
+
+               $elapsedTime = time() - $startTime;
+               Logger::info(
+                       "Imported $messageCount messages from $infile in 
$elapsedTime seconds."
+               );
+       }
+
+}
+
+require( RUN_MAINTENANCE_IF_MAIN );

-- 
To view, visit https://gerrit.wikimedia.org/r/112836
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I1d811c437bc94056f567e5ab776c814c28223288
Gerrit-PatchSet: 1
Gerrit-Project: wikimedia/fundraising/SmashPig
Gerrit-Branch: master
Gerrit-Owner: Mwalker <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to