Parent5446 has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/51635


Change subject: Added JobQueue that uses Amazon SQS.
......................................................................

Added JobQueue that uses Amazon SQS.

Increased required SDK version for SQS support. Added
JobQueue class that uses SDK to use SQS as a back-end
for job queues.

Change-Id: Ica5e18d31b335e8800427a154bb8809d93797dfb
---
M AWS.php
A JobQueueAmazonSqs.php
2 files changed, 210 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/AWS 
refs/changes/35/51635/1

diff --git a/AWS.php b/AWS.php
index 0290dc1..fca9c9a 100644
--- a/AWS.php
+++ b/AWS.php
@@ -43,6 +43,7 @@
  */
 $wgAWSRegion = false;
 
+$wgAutoloadClasses['JobQueueAmazonSqs'] = __DIR__ . '/JobQueueAmazonSqs.php';
 $wgExtensionMessagesFiles['AWSSDK'] = __DIR__ . '/AWS.i18n.php';
 
 require_once( __DIR__ . '/vendor/autoload.php' );
diff --git a/JobQueueAmazonSqs.php b/JobQueueAmazonSqs.php
new file mode 100644
index 0000000..5fcf26f
--- /dev/null
+++ b/JobQueueAmazonSqs.php
@@ -0,0 +1,209 @@
+<?php
+
+// Anchor the path to this file's directory so that include_path / the current
+// working directory cannot select a different vendor/autoload.php.
+require_once __DIR__ . '/vendor/autoload.php';
+
+use Aws\Sqs\SqsClient;
+
+/**
+ * Implements a JobQueue class for the Amazon SQS
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Extensions
+ */
+
+/**
+ * A job queue that uses the Amazon SQS service as a back-end.
+ *
+ * @see JobQueue
+ * @author Tyler Romeo <[email protected]>
+ */
+class JobQueueAmazonSqs extends JobQueue {
+       /** Maximum number of entries SQS accepts in one SendMessageBatch request. */
+       const MAX_BATCH_ENTRIES = 10;
+
+       /** Largest visibility timeout, in seconds, that SQS accepts (12 hours). */
+       const MAX_VISIBILITY_TIMEOUT = 43200;
+
+       /** @var SqsClient Client used for every SQS request made by this queue */
+       protected $client;
+
+       /** @var array|ArrayAccess Result of createQueue(); only 'QueueUrl' is read */
+       protected $queue;
+
+       /** @var int|null Cached approximate count of visible messages; null if unknown */
+       private $size = null;
+
+       /** @var int|null Cached approximate count of in-flight messages; null if unknown */
+       private $ackSize = null;
+
+       /**
+        * Builds the SQS client from the extension's globals and requests the
+        * queue named "mediawiki-<wiki>-jobqueue-<type>" through createQueue().
+        *
+        * @param array $params Passed unchanged to JobQueue::__construct()
+        */
+       public function __construct( array $params ) {
+               global $wgAWSCredentials, $wgAWSRegion, $wgAWSUseHTTPS;
+               parent::__construct( $params );
+
+               $this->client = SqsClient::factory( array(
+                       'key' => $wgAWSCredentials['key'],
+                       'secret' => $wgAWSCredentials['secret'],
+                       'region' => $wgAWSRegion,
+                       'scheme' => $wgAWSUseHTTPS ? 'https' : 'http',
+                       'ssl.cert' => $wgAWSUseHTTPS ? true : null
+               ) );
+
+               $this->queue = $this->client->createQueue( array(
+                       'QueueName' => 'mediawiki-' . $this->wiki . '-jobqueue-' . $this->type
+               ) );
+       }
+
+       /**
+        * Acknowledges a job by deleting its message from the queue.
+        *
+        * The receipt handle is the 'aws_receipt' parameter that doPop() stores
+        * on every job it returns.
+        *
+        * @param Job $job A job previously returned by doPop()
+        * @return bool Always true
+        * @throws MWException If the job carries no 'aws_receipt' parameter
+        */
+       public function doAck( Job $job ) {
+               // Fetch into a local first: getParams()[...] requires PHP 5.4.
+               $params = $job->getParams();
+               if ( !isset( $params['aws_receipt'] ) ) {
+                       throw new MWException( "Cannot acknowledge job; missing 'aws_receipt'." );
+               }
+
+               $this->client->deleteMessage( array(
+                       'QueueUrl' => $this->queue['QueueUrl'],
+                       'ReceiptHandle' => $params['aws_receipt']
+               ) );
+               $this->clearSizeCache();
+
+               return true;
+       }
+
+       /**
+        * Serializes the given jobs and sends them to the queue.
+        *
+        * Jobs are sent in groups of at most MAX_BATCH_ENTRIES. The push is not
+        * atomic: groups sent before a failing group stay in the queue.
+        *
+        * @param array $jobs List of Job objects
+        * @param int $flags Unused by this implementation
+        * @return bool Always true
+        * @throws MWException If SQS reports any failed entry
+        */
+       public function doBatchPush( array $jobs, $flags ) {
+               if ( !$jobs ) {
+                       // Nothing to send; avoid issuing an empty batch request.
+                       return true;
+               }
+
+               foreach ( array_chunk( $jobs, self::MAX_BATCH_ENTRIES ) as $chunk ) {
+                       $entries = array();
+                       foreach ( $chunk as $index => $job ) {
+                               $entries[] = array(
+                                       // Batch entry ids only have to be distinct within one
+                                       // request. $job->getId() is not distinct for jobs that
+                                       // have not been stored yet, so use the position instead.
+                                       'Id' => 'job-' . $index,
+                                       'MessageBody' => serialize( array(
+                                               'cmd' => $job->getType(),
+                                               'id' => $job->getId(),
+                                               'namespace' => $job->getTitle()->getNamespace(),
+                                               'title' => $job->getTitle()->getText(),
+                                               'params' => $job->getParams()
+                                       ) )
+                               );
+                       }
+
+                       $res = $this->client->sendMessageBatch( array(
+                               'QueueUrl' => $this->queue['QueueUrl'],
+                               'Entries' => $entries
+                       ) );
+                       $this->clearSizeCache();
+
+                       if ( !empty( $res['Failed'] ) ) {
+                               throw new MWException( 'Jobs failed to push to the AWS SQS Job Queue.' );
+                       }
+               }
+
+               return true;
+       }
+
+       /**
+        * Records the timestamp of a root job in memcached so that older
+        * duplicates can be recognized when they are popped.
+        *
+        * @param Job $job Job carrying 'rootJobSignature' and 'rootJobTimestamp'
+        * @return bool True if a same-or-newer timestamp was already stored,
+        *   otherwise the result of the cache write
+        * @throws MWException If either root job parameter is missing
+        */
+       public function doDeduplicateRootJob( Job $job ) {
+               global $wgMemc;
+
+               $params = $job->getParams();
+               if ( !isset( $params['rootJobSignature'] ) ) {
+                       throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
+               } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
+                       throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
+               }
+               $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
+
+               // current last timestamp of this job
+               $timestamp = $wgMemc->get( $key );
+               if ( $timestamp >= $params['rootJobTimestamp'] ) {
+                       // a newer version of this root job was enqueued
+                       return true;
+               }
+
+               // Update the timestamp of the last root job started at the location...
+               return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
+       }
+
+       /**
+        * Returns the approximate number of messages that are in flight
+        * (received but not yet deleted), fetching it from SQS if not cached.
+        *
+        * @return int
+        */
+       public function doGetAcquiredCount() {
+               if ( $this->ackSize === null ) {
+                       $this->getAttributes();
+               }
+               return $this->ackSize;
+       }
+
+       /**
+        * Returns the approximate number of visible messages, fetching it from
+        * SQS if not cached.
+        *
+        * @return int
+        */
+       public function doGetSize() {
+               if ( $this->size === null ) {
+                       $this->getAttributes();
+               }
+               return $this->size;
+       }
+
+       /**
+        * Tells whether the queue currently reports no visible messages.
+        *
+        * @return bool True when the approximate size is zero
+        */
+       public function doIsEmpty() {
+               // The previous "(bool) size" was inverted: it returned true for a
+               // non-empty queue.
+               return (int)$this->doGetSize() === 0;
+       }
+
+       /**
+        * Receives one message from the queue and turns it into a Job.
+        *
+        * Messages that cannot be decoded into a valid title are deleted and
+        * skipped. The returned job gets the 'aws_id' and 'aws_receipt'
+        * parameters, which doAck() needs later.
+        *
+        * @return Job|bool The next job, or false if no message was received
+        * @throws MWException If the body does not match the MD5 sent with it
+        */
+       public function doPop() {
+               $request = array(
+                       'QueueUrl' => $this->queue['QueueUrl'],
+                       'MaxNumberOfMessages' => 1
+               );
+               if ( $this->claimTTL > 0 ) {
+                       $request['VisibilityTimeout'] =
+                               min( (int)$this->claimTTL, self::MAX_VISIBILITY_TIMEOUT );
+               }
+               // Otherwise leave VisibilityTimeout out so the queue's own default
+               // applies. An explicit 0 would make the message visible to other
+               // runners again while this one is still working on it.
+
+               while ( true ) {
+                       $msgs = $this->client->receiveMessage( $request );
+                       if ( empty( $msgs['Messages'] ) ) {
+                               return false;
+                       }
+
+                       // A message just became invisible, so the cached counters are stale.
+                       $this->clearSizeCache();
+                       $msg = $msgs['Messages'][0];
+
+                       if ( md5( $msg['Body'] ) !== $msg['MD5OfBody'] ) {
+                               throw new MWException( 'Invalid MD5 of message body received.' );
+                       }
+
+                       // NOTE(review): unserialize() trusts whoever can write to the queue.
+                       // Confirm that only this class produces message bodies.
+                       $desc = unserialize( $msg['Body'] );
+
+                       $title = null;
+                       $titleText = '';
+                       if ( is_array( $desc ) && isset( $desc['namespace'], $desc['title'] ) ) {
+                               $titleText = $desc['title'];
+                               $title = Title::makeTitleSafe( $desc['namespace'], $titleText );
+                       }
+                       if ( !$title ) {
+                               // Drop the unusable message so it is not delivered again.
+                               $this->client->deleteMessage( array(
+                                       'QueueUrl' => $this->queue['QueueUrl'],
+                                       'ReceiptHandle' => $msg['ReceiptHandle']
+                               ) );
+                               wfDebugLog( 'JobQueueAmazonSqs', "Row has invalid title '{$titleText}'." );
+                               continue;
+                       }
+
+                       // Params may have been serialized as false, which cannot be
+                       // combined with an array. Assign rather than "+=" so a stale
+                       // receipt from an earlier delivery never wins over the fresh one.
+                       $params = array();
+                       if ( isset( $desc['params'] ) && is_array( $desc['params'] ) ) {
+                               $params = $desc['params'];
+                       }
+                       $params['aws_id'] = $msg['MessageId'];
+                       $params['aws_receipt'] = $msg['ReceiptHandle'];
+
+                       $job = Job::factory( $desc['cmd'], $title, $params, $desc['id'] );
+                       if ( $this->isRootJobOldDuplicate( $job ) ) {
+                               wfIncrStats( 'job-pop-duplicate' );
+                               $job = DuplicateJob::newFromJob( $job );
+                       }
+
+                       return $job;
+               }
+       }
+
+       /**
+        * Checks whether a newer root job with the same signature has been
+        * registered since this job's root job timestamp.
+        *
+        * @param Job $job
+        * @return bool False when the job has no root job parameters
+        */
+       private function isRootJobOldDuplicate( Job $job ) {
+               global $wgMemc;
+
+               $params = $job->getParams();
+               if ( !isset( $params['rootJobSignature'] ) ) {
+                       return false;
+               } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
+                       trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." );
+                       return false;
+               }
+
+               $timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
+
+               return $timestamp > $params['rootJobTimestamp'];
+       }
+
+       /**
+        * Fetches both approximate message counters from SQS and caches them
+        * as integers.
+        */
+       private function getAttributes() {
+               $attrs = $this->client->getQueueAttributes( array(
+                       'QueueUrl' => $this->queue['QueueUrl'],
+                       'AttributeNames' => array(
+                               'ApproximateNumberOfMessages',
+                               'ApproximateNumberOfMessagesNotVisible'
+                       )
+               ) );
+               $this->size = (int)$attrs['Attributes']['ApproximateNumberOfMessages'];
+               $this->ackSize = (int)$attrs['Attributes']['ApproximateNumberOfMessagesNotVisible'];
+       }
+
+       /**
+        * Forgets the cached counters so that the next size query asks SQS again.
+        */
+       private function clearSizeCache() {
+               $this->size = null;
+               $this->ackSize = null;
+       }
+
+       /**
+        * Builds the memcached key under which a root job's timestamp is kept.
+        *
+        * @param string $signature Value of the 'rootJobSignature' parameter
+        * @return string
+        */
+       private function getRootJobCacheKey( $signature ) {
+               list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+               return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
+       }
+}

-- 
To view, visit https://gerrit.wikimedia.org/r/51635
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ica5e18d31b335e8800427a154bb8809d93797dfb
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/AWS
Gerrit-Branch: master
Gerrit-Owner: Parent5446 <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to