Parent5446 has uploaded a new change for review. https://gerrit.wikimedia.org/r/51635
Change subject: Added JobQueue that uses Amazon SQS. ...................................................................... Added JobQueue that uses Amazon SQS. Increased required SDK version for SQS support. Added JobQueue class that uses SDK to use SQS as a back-end for job queues. Change-Id: Ica5e18d31b335e8800427a154bb8809d93797dfb --- M AWS.php A JobQueueAmazonSqs.php 2 files changed, 210 insertions(+), 0 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/AWS refs/changes/35/51635/1 diff --git a/AWS.php b/AWS.php index 0290dc1..fca9c9a 100644 --- a/AWS.php +++ b/AWS.php @@ -43,6 +43,7 @@ */ $wgAWSRegion = false; +$wgAutoloadClasses['JobQueueAmazonSqs'] = __DIR__ . '/JobQueueAmazonSqs.php'; $wgExtensionMessagesFiles['AWSSDK'] = __DIR__ . '/AWS.i18n.php'; require_once( __DIR__ . '/vendor/autoload.php' ); diff --git a/JobQueueAmazonSqs.php b/JobQueueAmazonSqs.php new file mode 100644 index 0000000..5fcf26f --- /dev/null +++ b/JobQueueAmazonSqs.php @@ -0,0 +1,209 @@ +<?php + +require 'vendor/autoload.php'; + +use Aws\Sqs\SqsClient; + +/** + * Implements a JobQueue class for the Amazon SQS + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Extensions
+ */
+
+/**
+ * A job queue that uses the Amazon SQS service as a back-end.
+ *
+ * Each wiki/job-type pair maps to one SQS queue named
+ * "mediawiki-<wiki>-jobqueue-<type>". Queue sizes reported by this class
+ * are the approximate counts SQS exposes, not exact values.
+ *
+ * @see JobQueue
+ * @author Tyler Romeo <[email protected]>
+ */
+class JobQueueAmazonSqs extends JobQueue {
+	/** @var SqsClient Client used for every SQS API call */
+	private $client;
+
+	/** @var string URL of this queue, as returned by CreateQueue */
+	private $queueUrl;
+
+	/**
+	 * Connect to SQS and resolve (creating it if needed) this wiki/type's queue.
+	 *
+	 * @param array $params Queue parameters, passed through to JobQueue
+	 */
+	function __construct( array $params ) {
+		global $wgAWSCredentials, $wgAWSRegion, $wgAWSUseHTTPS;
+		parent::__construct( $params );
+
+		$this->client = SqsClient::factory( array(
+			'key' => $wgAWSCredentials['key'],
+			'secret' => $wgAWSCredentials['secret'],
+			'region' => $wgAWSRegion,
+			'scheme' => $wgAWSUseHTTPS ? 'https' : 'http',
+			'ssl.cert' => $wgAWSUseHTTPS ? true : null
+		) );
+
+		// CreateQueue returns the existing queue's URL when a queue with this
+		// name (and matching attributes) already exists.
+		$result = $this->client->createQueue( array(
+			'QueueName' => 'mediawiki-' . $this->wiki . '-jobqueue-' . $this->type
+		) );
+		$this->queueUrl = $result['QueueUrl'];
+	}
+
+	/**
+	 * Delete the SQS message backing a finished job.
+	 *
+	 * @param Job $job Job previously returned by doPop()
+	 * @return bool Always true
+	 * @throws MWException If the job carries no SQS receipt handle
+	 */
+	function doAck( Job $job ) {
+		// Kept in a local: getParams()['…'] dereferencing needs PHP 5.4.
+		$params = $job->getParams();
+		if ( !is_array( $params ) || !isset( $params['aws_receipt'] ) ) {
+			throw new MWException( "Cannot acknowledge job; missing 'aws_receipt'." );
+		}
+		$this->deleteMessage( $params['aws_receipt'] );
+		return true;
+	}
+
+	/**
+	 * Push jobs to SQS, at most 10 per SendMessageBatch request.
+	 *
+	 * Batches are not atomic: if a later request fails, jobs from earlier
+	 * requests have already been queued. $flags is not used.
+	 *
+	 * @param array $jobs List of Job objects (may be empty)
+	 * @param int $flags JobQueue::QOS_* bitfield (ignored)
+	 * @return bool Always true
+	 * @throws MWException If SQS reports any message of a batch as failed
+	 */
+	function doBatchPush( array $jobs, $flags ) {
+		// SendMessageBatch rejects both empty batches and batches of more than
+		// 10 entries; array_chunk() yields nothing for an empty $jobs.
+		foreach ( array_chunk( $jobs, 10 ) as $chunk ) {
+			$entries = array();
+			foreach ( $chunk as $i => $job ) {
+				$entries[] = array(
+					// Entry IDs only need to be unique within one request, and
+					// getId() is not guaranteed to be set or unique for new jobs.
+					'Id' => 'job' . $i,
+					'MessageBody' => serialize( array(
+						'cmd' => $job->getType(),
+						'id' => $job->getId(),
+						'namespace' => $job->getTitle()->getNamespace(),
+						'title' => $job->getTitle()->getText(),
+						'params' => $job->getParams()
+					) )
+				);
+			}
+
+			$res = $this->client->sendMessageBatch( array(
+				'QueueUrl' => $this->queueUrl,
+				'Entries' => $entries
+			) );
+
+			if ( !empty( $res['Failed'] ) ) {
+				throw new MWException( 'Jobs failed to push to the AWS SQS Job Queue.' );
+			}
+		}
+
+		return true;
+	}
+
+	/**
+	 * Record the timestamp of a root job so older copies can be skipped.
+	 *
+	 * @param Job $job Job with 'rootJobSignature' and 'rootJobTimestamp' params
+	 * @return bool True if a newer root job was already recorded, otherwise the
+	 *  result of the cache write
+	 * @throws MWException If either root job parameter is missing
+	 */
+	function doDeduplicateRootJob( Job $job ) {
+		global $wgMemc;
+
+		$params = $job->getParams();
+		if ( !isset( $params['rootJobSignature'] ) ) {
+			throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
+		} elseif ( !isset( $params['rootJobTimestamp'] ) ) {
+			throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
+		}
+		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
+
+		// current last timestamp of this job
+		$timestamp = $wgMemc->get( $key );
+		if ( $timestamp >= $params['rootJobTimestamp'] ) {
+			// a newer version of this root job was enqueued
+			return true;
+		} else {
+			// Update the timestamp of the last root job started at the location...
+			return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
+		}
+	}
+
+	/**
+	 * @return int Approximate number of claimed (in-flight, not visible) messages
+	 */
+	function doGetAcquiredCount() {
+		// Fetched on every call: other runners push and pop concurrently, so a
+		// cached count would go stale.
+		$counts = $this->getAttributes();
+		return $counts['ApproximateNumberOfMessagesNotVisible'];
+	}
+
+	/**
+	 * @return int Approximate number of visible (unclaimed) messages
+	 */
+	function doGetSize() {
+		$counts = $this->getAttributes();
+		return $counts['ApproximateNumberOfMessages'];
+	}
+
+	/**
+	 * @return bool True when SQS reports no visible (unclaimed) messages
+	 */
+	function doIsEmpty() {
+		return $this->doGetSize() === 0;
+	}
+
+	/**
+	 * Receive one job from SQS.
+	 *
+	 * Messages whose body cannot be decoded, or whose title is invalid, are
+	 * deleted and skipped.
+	 *
+	 * @return Job|bool The job (possibly wrapped as a DuplicateJob), or false
+	 *  if no message is currently visible
+	 * @throws MWException If a message body fails its MD5 check
+	 */
+	function doPop() {
+		$request = array(
+			'QueueUrl' => $this->queueUrl,
+			'MaxNumberOfMessages' => 1
+		);
+		// A VisibilityTimeout of 0 would make the message visible to other
+		// runners again immediately; without a claim TTL, use the queue default.
+		if ( $this->claimTTL > 0 ) {
+			$request['VisibilityTimeout'] = $this->claimTTL;
+		}
+
+		while ( true ) {
+			$res = $this->client->receiveMessage( $request );
+			if ( empty( $res['Messages'] ) ) {
+				return false;
+			}
+			$msg = $res['Messages'][0];
+
+			if ( md5( $msg['Body'] ) !== $msg['MD5OfBody'] ) {
+				throw new MWException( 'Invalid MD5 of message body received.' );
+			}
+			// NOTE(review): unserialize() on queue data is only safe while nothing
+			// but trusted producers can write to this queue -- confirm IAM policy.
+			$desc = unserialize( $msg['Body'] );
+			if ( !is_array( $desc ) || !isset( $desc['cmd'], $desc['namespace'], $desc['title'] ) ) {
+				$this->deleteMessage( $msg['ReceiptHandle'] );
+				wfDebugLog( 'JobQueueAmazonSqs', "Message '{$msg['MessageId']}' has an undecodable body." );
+				continue;
+			}
+
+			$title = Title::makeTitleSafe( $desc['namespace'], $desc['title'] );
+			if ( !$title ) {
+				$this->deleteMessage( $msg['ReceiptHandle'] );
+				wfDebugLog( 'JobQueueAmazonSqs', "Row has invalid title '{$desc['title']}'." );
+				continue;
+			}
+
+			// Carry the SQS identifiers on the job so doAck() can delete the message.
+			$params = ( isset( $desc['params'] ) && is_array( $desc['params'] ) )
+				? $desc['params']
+				: array();
+			$params += array( 'aws_id' => $msg['MessageId'], 'aws_receipt' => $msg['ReceiptHandle'] );
+			$id = isset( $desc['id'] ) ? $desc['id'] : null;
+
+			$job = Job::factory( $desc['cmd'], $title, $params, $id );
+			if ( $this->isRootJobOldDuplicate( $job ) ) {
+				wfIncrStats( 'job-pop-duplicate' );
+				$job = DuplicateJob::newFromJob( $job );
+			}
+			return $job;
+		}
+	}
+
+	/**
+	 * Check whether a newer instance of this job's root job has been recorded.
+	 *
+	 * @param Job $job
+	 * @return bool
+	 */
+	private function isRootJobOldDuplicate( Job $job ) {
+		global $wgMemc;
+
+		$params = $job->getParams();
+		if ( !isset( $params['rootJobSignature'] ) ) {
+			return false;
+		} elseif ( !isset( $params['rootJobTimestamp'] ) ) {
+			trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." );
+			return false;
+		}
+
+		$timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
+
+		return $timestamp > $params['rootJobTimestamp'];
+	}
+
+	/**
+	 * Delete one message from this queue.
+	 *
+	 * @param string $receipt Receipt handle obtained from ReceiveMessage
+	 */
+	private function deleteMessage( $receipt ) {
+		$this->client->deleteMessage( array(
+			'QueueUrl' => $this->queueUrl,
+			'ReceiptHandle' => $receipt
+		) );
+	}
+
+	/**
+	 * Fetch the queue's approximate message counts from SQS.
+	 *
+	 * @return array Map with int values for 'ApproximateNumberOfMessages' and
+	 *  'ApproximateNumberOfMessagesNotVisible' (0 if SQS omitted one)
+	 */
+	private function getAttributes() {
+		$names = array( 'ApproximateNumberOfMessages', 'ApproximateNumberOfMessagesNotVisible' );
+		$res = $this->client->getQueueAttributes( array(
+			'QueueUrl' => $this->queueUrl,
+			'AttributeNames' => $names
+		) );
+
+		$counts = array();
+		foreach ( $names as $name ) {
+			// SQS returns attribute values as strings.
+			$counts[$name] = isset( $res['Attributes'][$name] ) ? (int)$res['Attributes'][$name] : 0;
+		}
+		return $counts;
+	}
+
+	/**
+	 * @param string $signature Root job signature
+	 * @return string Cache key for the root job's latest timestamp
+	 */
+	private function getRootJobCacheKey( $signature ) {
+		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+		return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
+	}
+}

-- 
To view, visit https://gerrit.wikimedia.org/r/51635
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ica5e18d31b335e8800427a154bb8809d93797dfb
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/AWS
Gerrit-Branch: master
Gerrit-Owner: Parent5446 <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
