Awight has uploaded a new change for review.
https://gerrit.wikimedia.org/r/226947
Change subject: WIP Orphan slayer reads from frack Redis
......................................................................
WIP Orphan slayer reads from frack Redis
Still preserves the mirroring to STOMP, in case of rollback.
Deployment notes:
* composer update
* Must configure the queue's order_key = 'date' and other Redis connection
settings. See example config in @Ia5c21efc5
Bug: T99017
Change-Id: Ia4494d2f3fd1cba3c440fe4a7a72eb07191f90c3
---
M DonationInterface.php
M composer.json
M composer.lock
M globalcollect_gateway/scripts/orphans.php
4 files changed, 128 insertions(+), 51 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/DonationInterface refs/changes/47/226947/1
diff --git a/DonationInterface.php b/DonationInterface.php
index 2cbff32..9539e1b 100644
--- a/DonationInterface.php
+++ b/DonationInterface.php
@@ -467,26 +467,29 @@
$wgDonationInterfaceQueues = array(
// Incoming donations that we think have been paid for.
'completed' => array(),
+
// So-called limbo queue for GlobalCollect, where we store donor personal
// information while waiting for the donor to return from iframe or a
// redirect. It's very important that this data is not stored anywhere
// permanent such as logs or the database, until we know this person
// finished making a donation.
// FIXME: Note that this must be an instance of KeyValueStore.
- 'globalcollect-cc-limbo' => array(
- 'type' => 'PHPQueue\Backend\Memcache',
- 'servers' => array( 'localhost' ),
- # 30 days, in seconds
- 'expiry' => 2592000,
- ),
- // The general limbo queue (see above). FIXME: deprecated?
- 'limbo' => array(
- 'type' => 'PHPQueue\Backend\Memcache',
- 'servers' => array( 'localhost' ),
- 'expiry' => 2592000,
- ),
- // Where limbo messages go to die, if the orphan slayer decides they are
- // still in one of the pending states. FIXME: who reads from this queue?
+ //
+ // Example of a PCI-compliant queue configuration:
+ //
+ // 'globalcollect-cc-limbo' => array(
+ // 'type' => 'PHPQueue\Backend\Predis',
+ // # Note that servers cannot be an array, due to some incompatibility
+ // # with aggregate connections.
+ // 'servers' => 'tcp://payments1003.eqiad.net',
+ // # 1 hour, in seconds
+ // 'expiry' => 3600,
+ // 'score_key' => 'date',
+ // ),
+ // 'limbo' => ...
+
+ // Transactions still needing action before they are settled.
+ // FIXME: who reads from this queue?
'pending' => array(),
// Non-critical queues
diff --git a/composer.json b/composer.json
index 65f6a1d..629394c 100644
--- a/composer.json
+++ b/composer.json
@@ -21,8 +21,9 @@
"require": {
"coderkungfu/php-queue": "dev-master",
"fusesource/stomp-php": "2.1.*",
- "psr/log": "1.0.0",
"monolog/monolog": "1.12.0",
+ "predis/predis": "1.*",
+ "psr/log": "1.0.0",
"zordius/lightncandy": "0.18"
},
"repositories": [
diff --git a/composer.lock b/composer.lock
index 41a9c26..96c5f25 100644
--- a/composer.lock
+++ b/composer.lock
@@ -1,10 +1,10 @@
{
"_readme": [
"This file locks the dependencies of your project to a known state",
- "Read more about it at
http://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file",
+ "Read more about it at
https://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file",
"This file is @generated automatically"
],
- "hash": "2f350a68b09bdf7dee7b4c1a7b2d37bf",
+ "hash": "548d536bcd2bbb4110e29b3d72294f1c",
"packages": [
{
"name": "clio/clio",
@@ -231,6 +231,56 @@
"time": "2014-12-29 21:29:35"
},
{
+ "name": "predis/predis",
+ "version": "v1.0.1",
+ "source": {
+ "type": "git",
+ "url": "https://github.com/nrk/predis.git",
+ "reference": "7a170b3d8123c556597b4fbdb88631f99de180c2"
+ },
+ "dist": {
+ "type": "zip",
+ "url":
"https://api.github.com/repos/nrk/predis/zipball/7a170b3d8123c556597b4fbdb88631f99de180c2",
+ "reference": "7a170b3d8123c556597b4fbdb88631f99de180c2",
+ "shasum": ""
+ },
+ "require": {
+ "php": ">=5.3.2"
+ },
+ "require-dev": {
+ "phpunit/phpunit": "~4.0"
+ },
+ "suggest": {
+ "ext-curl": "Allows access to Webdis when paired with
phpiredis",
+ "ext-phpiredis": "Allows faster serialization and
deserialization of the Redis protocol"
+ },
+ "type": "library",
+ "autoload": {
+ "psr-4": {
+ "Predis\\": "src/"
+ }
+ },
+ "notification-url": "https://packagist.org/downloads/",
+ "license": [
+ "MIT"
+ ],
+ "authors": [
+ {
+ "name": "Daniele Alessandri",
+ "email": "[email protected]",
+ "homepage": "http://clorophilla.net"
+ }
+ ],
+ "description": "Flexible and feature-complete PHP client library
for Redis",
+ "homepage": "http://github.com/nrk/predis",
+ "keywords": [
+ "nosql",
+ "predis",
+ "redis"
+ ],
+ "time": "2015-01-02 12:51:34"
+ },
+ {
"name": "psr/log",
"version": "1.0.0",
"source": {
diff --git a/globalcollect_gateway/scripts/orphans.php b/globalcollect_gateway/scripts/orphans.php
index 1184e6a..be49f00 100644
--- a/globalcollect_gateway/scripts/orphans.php
+++ b/globalcollect_gateway/scripts/orphans.php
@@ -28,7 +28,7 @@
$wgDonationInterfaceEnableIPVelocityFilter = false;
if ( !$this->getOrphanGlobal( 'enable' ) ){
- echo "\nOrphan cron disabled. Have a nice day.";
+ $this->warning( 'Orphan cron disabled. Have a nice day.' );
return;
}
@@ -61,6 +61,7 @@
//first, we need to... clean up the limbo queue.
+ // TODO: Remove STOMP code.
//building in some redundancy here.
$collider_keepGoing = true;
$am_called_count = 0;
@@ -75,37 +76,24 @@
}
$this->logger->info( 'Removed ' . $this->removed_message_count
. ' messages and antimessages.' );
- if ( $this->keepGoing() ){
+ do {
//Pull a batch of CC orphans, keeping in mind that Things May Have Happened in the small slice of time since we handled the antimessages.
- $orphans = $this->getStompOrphans();
- while ( count( $orphans ) && $this->keepGoing() ){
- echo count( $orphans ) . " orphans left this
batch\n";
- //..do stuff.
- foreach ( $orphans as $correlation_id =>
$orphan ) {
- //process
- if ( $this->keepGoing() ){
- // TODO: Maybe we can simplify
by checking that modified time < job start time.
- echo "Attempting to rectify
orphan $correlation_id\n";
- if ( $this->rectifyOrphan(
$orphan ) ){
- // TODO: Stop mirroring
to STOMP.
-
$this->addStompCorrelationIDToAckBucket( $correlation_id );
-
-
$this->handled_ids[$correlation_id] = 'rectified';
- } else {
-
$this->handled_ids[$correlation_id] = 'error';
- }
+ $orphans = $this->getOrphans();
+ echo count( $orphans ) . " orphans left this batch\n";
+ //..do stuff.
+ foreach ( $orphans as $correlation_id => $orphan ) {
+ //process
+ if ( $this->keepGoing() ){
+ // TODO: Maybe we can simplify by checking that modified time < job start time.
+ $this->logger->info( "Attempting to
rectify orphan $correlation_id" );
+ if ( $this->rectifyOrphan( $orphan ) ) {
+
$this->handled_ids[$correlation_id] = 'rectified';
+ } else {
+
$this->handled_ids[$correlation_id] = 'error';
}
}
- // TODO: Stop mirroring to STOMP.
- $this->addStompCorrelationIDToAckBucket( false,
true ); //ack all outstanding.
- if ( $this->keepGoing() ){
- $orphans = $this->getStompOrphans();
- }
}
- }
-
- // TODO: Stop mirroring to STOMP.
- $this->addStompCorrelationIDToAckBucket( false, true ); //ack
all outstanding.
+ } while ( count( $orphans ) && $this->keepGoing() );
//TODO: Make stats squirt out all over the place.
$am = 0;
@@ -215,7 +203,48 @@
return $count;
}
+ protected function getOrphans() {
+ // TODO: Make this configurable.
+ $time_buffer = 60*20; //20 minutes? Sure. Why not?
+
+ $orphans = array();
+ $false_orphans = array();
+ while ( $message = DonationQueue::instance()->pop(
GlobalCollectAdapter::GC_CC_LIMBO_QUEUE ) ) {
+ $correlation_id = 'globalcollect-' .
$message['gateway_txn_id'];
+ if ( array_key_exists( $correlation_id,
$this->handled_ids ) ) {
+ continue;
+ }
+
+ // Check the timestamp to see if it's old enough, and stop when
+ // we're below the threshold. Messages are guaranteed to pop in
+ // chronological order.
+ $elapsed = $this->start_time - $message['date'];
+ if ( $elapsed < $time_buffer ) {
+ // Put it back!
+ DonationQueue::instance()->set(
$correlation_id, $message, GlobalCollectAdapter::GC_CC_LIMBO_QUEUE );
+ break;
+ }
+
+ // We got ourselves an orphan!
+ $order_id = explode('-', $correlation_id);
+ $order_id = $order_id[1];
+ $message['order_id'] = $order_id;
+ $message = unCreateQueueMessage($message);
+ $orphans[$correlation_id] = $message;
+ $this->logger->info( "Found an orphan! $correlation_id"
);
+
+ // TODO: stop stomping
+ $this->addStompCorrelationIDToAckBucket(
$correlation_id );
+ }
+
+ // TODO: stop stomping
+ $this->addStompCorrelationIDToAckBucket( false, true );
+
+ return $orphans;
+ }
+
/**
+ * TODO: Remove this along with other STOMP code. Use getOrphans() instead.
* Returns an array of at most $batch_size decoded orphans that we don't
* think we've rectified yet.
*
@@ -223,19 +252,13 @@
* decoded stomp message body.
*/
protected function getStompOrphans(){
- // TODO: Remove STOMP block.
- // FIXME: Expiration should be set in configuration, and enforced by
- // the queue's native expiry anyway.
$time_buffer = 60*20; //20 minutes? Sure. Why not?
$selector = "payment_method = 'cc' AND gateway='globalcollect'";
echo "Fetching 300 Orphans\n";
$messages = stompFetchMessages( 'cc-limbo', $selector, 300 );
- // TODO: Batch size from config.
$batch_size = 300;
echo "Fetching {$batch_size} Orphans\n";
-
- // TODO: Write popMultiple for Memcache.
$orphans = array();
$false_orphans = array();
--
To view, visit https://gerrit.wikimedia.org/r/226947
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia4494d2f3fd1cba3c440fe4a7a72eb07191f90c3
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/DonationInterface
Gerrit-Branch: master
Gerrit-Owner: Awight <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits