Parent5446 has submitted this change and it was merged. Change subject: Added JobQueue that uses Amazon SQS. ......................................................................
Added JobQueue that uses Amazon SQS. Increased required SDK version for SQS support. Added JobQueue class that uses SDK to use SQS as a back-end for job queues. Change-Id: Ica5e18d31b335e8800427a154bb8809d93797dfb --- M AWS.php A sqs/JobQueueAmazonSqs.php 2 files changed, 327 insertions(+), 2 deletions(-) Approvals: Parent5446: Verified; Looks good to me, approved diff --git a/AWS.php b/AWS.php index 9119424..ff65e5a 100644 --- a/AWS.php +++ b/AWS.php @@ -47,8 +47,14 @@ */ $wgAWSRegion = false; -$wgAutoloadClasses['AmazonS3FileBackend'] = __DIR__ . '/s3/AmazonS3FileBackend.php'; +/** + * Whether to use HTTPS with AWS + */ +$wgAWSUseHTTPS = true; $wgExtensionMessagesFiles['AWS'] = __DIR__ . '/AWS.i18n.php'; +$wgAutoloadClasses['JobQueueAmazonSqs'] = __DIR__ . '/sqs/JobQueueAmazonSqs.php'; +$wgAutoloadClasses['AmazonS3FileBackend'] = __DIR__ . '/s3/AmazonS3FileBackend.php'; +$wgJobTypeConf['sqs'] = array( 'class' => 'JobQueueAmazonSqs', 'order' => 'random' ); -require_once( __DIR__ . '/vendor/autoload.php' ); +require_once __DIR__ . '/vendor/autoload.php'; diff --git a/sqs/JobQueueAmazonSqs.php b/sqs/JobQueueAmazonSqs.php new file mode 100644 index 0000000..2f3637e --- /dev/null +++ b/sqs/JobQueueAmazonSqs.php @@ -0,0 +1,319 @@ +<?php + +/** + * Implements a JobQueue class for the Amazon SQS + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup Extensions + */ + +use Aws\Sqs\SqsClient; +use Aws\Sqs\Exception\SqsException; + +/** + * A job queue that uses the Amazon SQS service as a back-end. + * + * @see JobQueue + * @author Tyler Romeo <[email protected]> + */ +class JobQueueAmazonSqs extends JobQueue { + /** + * Cached size of the entire queue + * @var int + */ + private $size = null; + + /** + * Cached size of the queue's acquired jobs + * @var int + */ + private $ackSize = null; + + /** + * Cached size of the queue's delayed jobs + * @var int + */ + private $delaySize = null; + + /** + * Name of the queue in AWS SQS + * @var string + */ + private $queueName = null; + + /** + * Descriptor from AWS of the current queue + * @var array + */ + private $queue; + + /** + * The AWS client + * @var Aws\Sqs\SqsClient + */ + private $client; + + function __construct( array $params ) { + global $wgAWSCredentials, $wgAWSRegion, $wgAWSUseHTTPS; + + parent::__construct( $params ); + + $this->queueName = "mediawiki-{$this->wiki}-jobqueue-{$this->type}"; + + $useHTTPS = false; + if ( isset( $params['aws-https'] ) ) { + $useHTTPS = (bool)$params['aws-https']; + } else { + $useHTTPS = (bool)$wgAWSUseHTTPS; + } + + $this->client = SqsClient::factory( array( + 'key' => isset( $params['aws-key'] ) ? $params['aws-key'] : $wgAWSCredentials['key'], + 'secret' => isset( $params['aws-secret'] ) ? $params['aws-secret'] : $wgAWSCredentials['secret'], + 'region' => isset( $params['aws-region'] ) ? $params['aws-region'] : $wgAWSRegion, + 'scheme' => $useHTTPS ? 'https' : 'http', + 'ssl.cert' => $useHTTPS ? true : null + ) ); + } + + public function connect() { + global $wgMemc; + + if ( $this->queue !== null ) { + return; + } + + $existenceKey = wfForeignMemcKey( $this->wiki, 'awssqs', 'existence', $this->queueName ); + + // Use memcached to check for queue existence in order to avoid + // a request to AWS. + if ( !$wgMemc->get( $existenceKey ) ) { + try { + $this->queue = $this->client->createQueue( array( + 'QueueName' => $this->queueName + ) ); + $this->client->setQueueAttributes( array( + 'QueueUrl' => $this->queue['QueueUrl'], + 'Attributes' => array( + 'VisibilityTimeout' => $this->claimTTL + ) + ) ); + } catch ( SqsException $e ) { + throw new MWException( "Amazon SQS error: {$e->getMessage()}", 0, $e ); + } + + $wgMemc->set( $existenceKey, 1 ); + } + } + + function supportedOrders() { + return array( 'random' ); + } + + function optimalOrder() { + return 'random'; + } + + function doAck( Job $job ) { + $this->connect(); + $this->ackSize = null; + + try { + $this->client->deleteMessage( array( + 'QueueUrl' => $this->queue['QueueUrl'], + 'ReceiptHandle' => $job->metadata['aws_receipt'] + ) ); + } catch ( SqsException $e ) { + throw new MWException( "Amazon SQS error: {$e->getMessage()}", 0, $e ); + } + + return true; + } + + function doBatchPush( array $jobs, $flags ) { + $this->size = null; + + $entries = array(); + $now = time(); + foreach ( $jobs as $i => $job ) { + $entries[$i] = array( + 'Id' => $i, + 'MessageBody' => serialize( array( + 'cmd' => $job->getType(), + 'namespace' => $job->getTitle()->getNamespace(), + 'title' => $job->getTitle()->getText(), + 'params' => $job->getParams() + ) ), + ); + + if ( $this->checkDelay ) { + $delaySeconds = $job->getReleaseTimestamp() - $now; + if ( $delaySeconds >= 0 ) { + $entries[$i]['DelaySeconds'] = $delaySeconds; + } + } + } + + $this->connect(); + try { + $res = $this->client->sendMessageBatch( array( + 'QueueUrl' => $this->queue['QueueUrl'], + 'Entries' => $entries + ) ); + } catch ( SqsException $e ) { + throw new MWException( "Amazon SQS error: {$e->getMessage()}", 0, $e ); + } + + if ( count( $res['Failed'] ) ) { + $exc = new SqsException( $res['Failed'][0]['Message'], $res['Failed'][0]['Code'] ); + throw new MWException( 'Jobs failed to push to the AWS SQS Job Queue.', 0, $exc ); + } + + foreach ( $res['Successful'] as $success ) { + if ( $success['MD5OfMessageBody'] !== md5( $entries[$success['Id']]['MessageBody'] ) ) { + throw new MWException( 'Invalid MD5 of message body received.' ); + } + } + + return true; + } + + function doGetAcquiredCount() { + $this->getAttributes(); + + return $this->ackSize; + } + + function doGetSize() { + $this->getAttributes(); + + return $this->size; + } + + function doIsEmpty() { + return !$this->doGetSize(); + } + + function doPop() { + return $this->getJobs( 1, true )->current(); + } + + function getAllQueuedJobs() { + return $this->getJobs(); + } + + /** + * Gets a number of jobs from the SQS queue. + * + * @param int|bool $limit Max number of jobs to retrieve, or false for no limit + * @param bool $claim Whether to claim the jobs + * @return MappedIterator with the jobs + * @see JobQueue::getAllQueuedJobs + */ + private function getJobs( $limit = false, $claim = false ) { + $this->connect(); + + try { + $msgs = $this->client->receiveMessage( array( + 'QueueUrl' => $this->queue['QueueUrl'], + 'MaxNumberOfMessages' => $limit === false ? null : $limit, + 'VisibilityTimeout' => $claim ? $this->claimTTL : 0 + ) ); + } catch ( SqsException $e ) { + throw new MWException( "Amazon SQS error: {$e->getMessage()}", 0, $e ); + } + + $that = $this; + $callback = function( $msg ) use ( $that ) { + if( md5( $msg['Body'] ) !== $msg['MD5OfBody'] ) { + throw new MWException( 'Invalid MD5 of message body received.' ); + } + + $desc = unserialize( $msg['Body'] ); + $title = Title::makeTitle( $desc['namespace'], $desc['title'] ); + + $job = Job::factory( $desc['cmd'], $title, $desc['params'], $msg['MessageId'] ); + $job->metadata['aws_receipt'] = $msg['ReceiptHandle']; + + return $job; + }; + + // Clear the count cache. + $this->ackSize = $this->size = null; + + return new MappedIterator( $msgs['Messages'], $callback ); + } + + function doDelete() { + $entries = array(); + foreach ( $this->getAllQueuedJobs() as $i => $job ) { + $entries[] = array( + 'Id' => $i, + 'ReceiptHandle' => $job->metadata['aws_receipt'] + ); + } + + if ( !$entries ) { + return; + } + + // Should already be connected from $this->getAllQueuedJobs, but just in case. + $this->connect(); + try { + $res = $this->client->deleteMessageBatch( array( + 'QueueUrl' => $this->queue['QueueUrl'], + 'Entries' => $entries + ) ); + } catch ( SqsException $e ) { + throw new MWException( "Amazon SQS error: {$e->getMessage()}", 0, $e ); + } + + if ( count( $res['Failed'] ) ) { + $exc = new SqsException( $res['Failed'][0]['Message'], $res['Failed'][0]['Code'] ); + throw new MWException( 'Jobs failed to delete from AWS SQS Job Queue.', 0, $exc ); + } + } + + function doFlushCaches() { + $this->size = $this->ackSize = $this->delaySize = null; + } + + private function getAttributes() { + if ( $this->size !== null && $this->ackSize !== null && $this->delaySize !== null ) { + return; + } + + $this->connect(); + try { + $attrs = $this->client->getQueueAttributes( array( + 'QueueUrl' => $this->queue['QueueUrl'], + 'AttributeNames' => array( + 'ApproximateNumberOfMessages', + 'ApproximateNumberOfMessagesNotVisible', + 'ApproximateNumberOfMessagesDelayed' + ) + ) ); + } catch ( SqsException $e ) { + throw new MWException( "Amazon SQS error: {$e->getMessage()}", 0, $e ); + } + + $this->size = (int)$attrs['Attributes']['ApproximateNumberOfMessages']; + $this->ackSize = (int)$attrs['Attributes']['ApproximateNumberOfMessagesNotVisible']; + $this->delaySize = (int)$attrs['Attributes']['ApproximateNumberOfMessagesDelayed']; + } +} -- To view, visit https://gerrit.wikimedia.org/r/51635 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ica5e18d31b335e8800427a154bb8809d93797dfb Gerrit-PatchSet: 9 Gerrit-Project: mediawiki/extensions/AWS Gerrit-Branch: master Gerrit-Owner: Parent5446 <[email protected]> Gerrit-Reviewer: Aaron Schulz <[email protected]> Gerrit-Reviewer: Parent5446 <[email protected]> _______________________________________________ MediaWiki-commits mailing list [email protected] https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
