Mwalker has uploaded a new change for review.
https://gerrit.wikimedia.org/r/112836
Change subject: Script to add messages back into queue!
......................................................................
Script to add messages back into queue!
Change-Id: I1d811c437bc94056f567e5ab776c814c28223288
---
M Core/DataStores/StompDataStore.php
A Maintenance/PopulateQueueFromDump.php
2 files changed, 96 insertions(+), 3 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/SmashPig
refs/changes/36/112836/1
diff --git a/Core/DataStores/StompDataStore.php
b/Core/DataStores/StompDataStore.php
index db1a980..c7042c2 100644
--- a/Core/DataStores/StompDataStore.php
+++ b/Core/DataStores/StompDataStore.php
@@ -133,12 +133,31 @@
}
// Push the object to the queue!
- $sent = $this->stompObj->send( $this->queue_id, $obj->toJson(),
$headers );
- if ( !$sent ) {
+ $bodyJson = $obj->toJson();
+ try {
+ $this->addObjectRaw( $bodyJson, $headers );
+ } catch ( DataStoreException $ex ) {
Logger::error(
"Could not queue message ({$objClass}) with id
'{$objKeys[ 'correlationId' ]}' to '{$this->queue_id}' on '{$this->uri}'",
- $obj->toJson()
+ $bodyJson
);
+ throw $ex;
+ }
+ }
+
+ /**
+ * Function to inject a raw message to the queue server. This is
provided as a
+ * public API only for maintenance scripts that need to interact
directly with
+ * the backend queue. In general, use addObject().
+ *
+ * @param string $body JSON encoded message body (or any string)
+ * @param string[] $headers Any STOMP headers required for submission
(none will be added)
+ *
+ * @throws DataStoreException if the message could not be stored
+ */
+ public function addObjectRaw( $body, $headers ) {
+ $sent = $this->stompObj->send( $this->queue_id, $body, $headers
);
+ if ( !$sent ) {
throw new DataStoreException( "Could not queue message
to '{$this->queue_id}' on '{$this->uri}'" );
}
}
diff --git a/Maintenance/PopulateQueueFromDump.php
b/Maintenance/PopulateQueueFromDump.php
new file mode 100644
index 0000000..1db0170
--- /dev/null
+++ b/Maintenance/PopulateQueueFromDump.php
@@ -0,0 +1,74 @@
+<?php namespace SmashPig\Maintenance;
+
+require( 'MaintenanceBase.php' );
+
+use SmashPig\Core\Logging\Logger;
+use SmashPig\Core\DataStores\StompDataStore;
+
+$maintClass = '\SmashPig\Maintenance\PopulateQueueFromDump';
+
+/**
+ * Script to import a file created by EmptyQueueToDump back into a backing
STOMP queue.
+ *
+ * @package SmashPig\Maintenance
+ */
+class PopulateQueueFromDump extends MaintenanceBase {
+
+ protected $datastore = null;
+
+ public function __construct() {
+ parent::__construct();
+ $this->addOption( 'queue', 'queue name to inject into', 'test'
);
+ $this->addArgument( 'file', 'File, created by EmptyQueueToDump,
with JSON format messages to inject', true );
+ }
+
+ /**
+ * Do the actual work of the script.
+ */
+ public function execute() {
+ $this->datastore = new StompDataStore( $this->getOption(
'queue' ) );
+
+ $startTime = time();
+ $messageCount = 0;
+
+ $raw = $this->getOption( 'raw' );
+
+ // Open the file for read
+ $infile = $this->getOption( 'file' );
+ $f = fopen( $infile, 'r' );
+ if ( !$f ) {
+ $this->error( "Could not open $infile for read", true );
+ }
+
+ // Do the loop!
+ while ( ( $line = fgets( $f ) ) !== false ) {
+ $parts = explode( '=', $line, 2 );
+ if ( count( $parts ) !== 2 ) {
+ $this->error( "Improperly formatted line:
$line" );
+ continue;
+ }
+
+ $obj = json_decode( $parts[ 1 ], true );
+ if ( !array_key_exists( 'headers', $obj ) ||
!array_key_exists( 'body', $obj ) ) {
+ $this->error( "Decoded line does not have
headers and body elements: $line" );
+ continue;
+ }
+
+ $this->datastore->addObjectRaw( json_encode( $obj[
'body' ] ), $obj[ 'headers' ] );
+
+ $messageCount++;
+ if ( $messageCount % 1000 == 0 ) {
+ print( '.' );
+ }
+ }
+ print( '\n' );
+
+ $elapsedTime = time() - $startTime;
+ Logger::info(
+ "Imported $messageCount messages from $infile in
$elapsedTime seconds."
+ );
+ }
+
+}
+
+require( RUN_MAINTENANCE_IF_MAIN );
--
To view, visit https://gerrit.wikimedia.org/r/112836
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I1d811c437bc94056f567e5ab776c814c28223288
Gerrit-PatchSet: 1
Gerrit-Project: wikimedia/fundraising/SmashPig
Gerrit-Branch: master
Gerrit-Owner: Mwalker <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits