Ejegg has uploaded a new change for review.
https://gerrit.wikimedia.org/r/287147
Change subject: WIP pending queue consumer
......................................................................
WIP pending queue consumer
TODO: catch exceptions, send to damaged queue
also filter fields and set correct param types
Bug: T133197
Change-Id: I4209d8ed96ef476f9031a7f266740292447418a7
---
A Maintenance/ConsumePendingQueue.php
1 file changed, 74 insertions(+), 0 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/SmashPig
refs/changes/47/287147/1
diff --git a/Maintenance/ConsumePendingQueue.php
b/Maintenance/ConsumePendingQueue.php
new file mode 100644
index 0000000..4e05a3f
--- /dev/null
+++ b/Maintenance/ConsumePendingQueue.php
@@ -0,0 +1,74 @@
+<?php namespace SmashPig\Maintenance;
+
+require ( 'MaintenanceBase.php' );
+
+use SmashPig\Core\Context;
+use SmashPig\Core\Logging\Logger;
+use SmashPig\Core\DataStores\QueueConsumer;
+use \PDO;
+
+$maintClass = '\SmashPig\Maintenance\ConsumePendingQueue';
+
+/**
+ * Reads messages out of the pending queue and inserts them into a db table
+ */
+class ConsumePendingQueue extends MaintenanceBase {
+
+	/**
+	 * Database handle that messages are written to; assigned in execute().
+	 *
+	 * @var PDO
+	 */
+	protected $pendingDatabase = null;
+
+	/**
+	 * Registers the command line options that execute() reads.
+	 */
+	public function __construct() {
+		parent::__construct();
+		$this->addOption( 'queue', 'queue name to consume from', 'pending' );
+		$this->addOption( 'destination', 'PDO database to store messages', 'pending-db' );
+		$this->addOption( 'time-limit', 'Try to keep execution under <n> seconds', 60, 't' );
+		$this->addOption( 'max-messages', 'At most consume <n> messages', 10, 'm' );
+	}
+
+	/**
+	 * Do the actual work of the script: hand storeMessage() to the queue
+	 * consumer as its callback, then log the count it returns and the
+	 * elapsed wall-clock time.
+	 */
+	public function execute() {
+		$config = Context::get()->getConfiguration();
+		$this->pendingDatabase = $config->object(
+			'data-store/' . $this->getOption( 'destination' )
+		);
+
+		$consumer = new QueueConsumer(
+			$this->getOption( 'queue' ),
+			$this->getOption( 'time-limit' ),
+			// Must be the same option name the constructor registers,
+			// otherwise the -m / --max-messages value is never used.
+			$this->getOption( 'max-messages' )
+		);
+
+		$startTime = time();
+		$messageCount = $consumer->dequeueMessages(
+			array( $this, 'storeMessage' )
+		);
+
+		$elapsedTime = time() - $startTime;
+		Logger::info(
+			"Processed $messageCount pending messages in $elapsedTime seconds."
+		);
+	}
+
+	/**
+	 * Inserts a single message into the pending table, using the message
+	 * keys as column names and binding every value as a string parameter.
+	 *
+	 * @param array $message map of field name => value
+	 * @throws \InvalidArgumentException if the message is empty or one of
+	 *  its keys is not a plain identifier
+	 */
+	public function storeMessage( $message ) {
+		// TODO: filter to just the things we have db fields for
+		$fields = array_keys( $message );
+		if ( count( $fields ) === 0 ) {
+			// An empty column list would only produce invalid SQL
+			throw new \InvalidArgumentException(
+				'Cannot store an empty pending message'
+			);
+		}
+		foreach ( $fields as $field ) {
+			// Keys are interpolated into the statement text and reused as
+			// placeholder names, so accept nothing but plain identifiers.
+			if ( !preg_match( '/^[A-Za-z0-9_]+\z/', $field ) ) {
+				throw new \InvalidArgumentException(
+					"Invalid field name '$field' in pending message"
+				);
+			}
+		}
+
+		$fieldList = implode( ',', $fields );
+		$paramNames = ':' . implode( ', :', $fields );
+		$insert = "INSERT INTO pending ( $fieldList ) values ( $paramNames );";
+		$prepared = $this->pendingDatabase->prepare( $insert );
+		foreach ( $message as $field => $value ) {
+			// TODO: types, lengths
+			// bindValue() captures the value immediately. bindParam() binds a
+			// reference to $value that is only read at execute(), which would
+			// give every placeholder the last field's value.
+			$prepared->bindValue( ':' . $field, $value, PDO::PARAM_STR );
+		}
+		$prepared->execute();
+	}
+}
+
+require ( RUN_MAINTENANCE_IF_MAIN );
--
To view, visit https://gerrit.wikimedia.org/r/287147
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I4209d8ed96ef476f9031a7f266740292447418a7
Gerrit-PatchSet: 1
Gerrit-Project: wikimedia/fundraising/SmashPig
Gerrit-Branch: master
Gerrit-Owner: Ejegg <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits