Awight has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/279744

Change subject: [WIP] Mirror to Kafka
......................................................................

[WIP] Mirror to Kafka

Bug: T130304
Change-Id: I293e2d3f6a8e34fcbc37ff7e97d83023818044b9
---
M composer.json
M gateway_common/DonationData.php
M gateway_common/DonationQueue.php
3 files changed, 48 insertions(+), 2 deletions(-)


  git pull 
ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/DonationInterface 
refs/changes/44/279744/1

diff --git a/composer.json b/composer.json
index 3846e2f..e6f1085 100644
--- a/composer.json
+++ b/composer.json
@@ -23,6 +23,7 @@
                "fusesource/stomp-php": "2.1.*",
                "minfraud/http": "^1.70",
                "monolog/monolog": "1.12.0",
+               "nmred/kafka-php": "0.1.5",
                "predis/predis": "1.*",
                "psr/log": "1.0.0",
                "zordius/lightncandy": "0.18",
diff --git a/gateway_common/DonationData.php b/gateway_common/DonationData.php
index da1160b..d989110 100644
--- a/gateway_common/DonationData.php
+++ b/gateway_common/DonationData.php
@@ -860,7 +860,7 @@
 
                if ( $ctid ) {
                        // We're updating a record, but only if we actually 
have data to update
-                       if ( count( $tracking_data ) ) {
+                       if ( $tracking_data ) {
                                $db->update(
                                        'contribution_tracking',
                                        $tracking_data,
@@ -1034,7 +1034,8 @@
                                $country = $this->normalized['country'];
                                $defaultCurrency = 
NationalCurrencies::getNationalCurrency( $country );
                        }
-               } else {
+               }
+               if ( !$defaultCurrency ) {
                        $defaultCurrency = $this->gateway->getGlobal( 
'FallbackCurrency' );
                }
                if ( !$defaultCurrency ) {
diff --git a/gateway_common/DonationQueue.php b/gateway_common/DonationQueue.php
index d0b1afc..47f4eac 100644
--- a/gateway_common/DonationQueue.php
+++ b/gateway_common/DonationQueue.php
@@ -1,5 +1,8 @@
 <?php
 
+use Kafka\MetaDataFromKafka;
+use Kafka\Produce;
+
 class DonationQueue {
        protected static $instance;
 
@@ -26,6 +29,7 @@
                $properties = $this->buildHeaders( $transaction );
                $message = $this->buildBody( $transaction );
                $this->newBackend( $queue )->push( $message, $properties );
+               $this->mirrorToKafka( $queue, $message, $properties );
        }
 
        public function pop( $queue ) {
@@ -53,6 +57,7 @@
                $properties = $this->buildHeaders( $transaction );
                $message = $this->buildBody( $transaction );
                $this->newBackend( $queue )->set( $correlationId, $message, 
$properties );
+               $this->mirrorToKafka( $queue, $message, $properties );
        }
 
        public function get( $correlationId, $queue ) {
@@ -281,4 +286,43 @@
 
                return $transaction;
        }
+
+       /**
+        * Transitional function to copy data to a Kafka sink.
+        * TODO:
+        *   - We'll eventually wrap Kafka like any other WriteOnlyQueue
+        *   - Configuration should support mirroring, for future migrations.
+        */
+       protected function mirrorToKafka( $queue, $message, $properties ) {
+               if ( !GatewayAdapter::getGlobal( 'EnableKafka' ) ) {
+                       return;
+               }
+               if ( !class_exists( 'Kafka\Produce' ) ) {
+                       // TODO: log error.
+                       return;
+               }
+               $metadata = new MetaDataFromKafka( GatewayAdapter::getGlobal( 
'KafkaServers' ) );
+               $produce = new Produce( $metadata );
+
+               // Confirm that the message was delivered.
+               $produce->setRequireAck( -1 );
+
+               $timeout = GatewayAdapter::getGlobal( 'KafkaSendTimeout' );
+               if ( $timeout ) {
+                       $produce->getClient()->setStreamOption( 
'SendTimeoutSec', 0 );
+                       $produce->getClient()->setStreamOption( 
'SendTimeoutUSec',
+                               intval( $timeOut * 1000000 ) );
+               }
+
+               // FIXME: Decide whether to partition.
+               $partition = 0;
+               // TODO: helper function, and a standard way to do this
+               $unifiedMessage = json_encode( $message + $properties );
+               $produce->setMessages( $queue, $partition, array( 
$unifiedMessage ) );
+               try {
+                       $produce->send();
+               } catch ( Kafka\Exception $ex ) {
+                       // TODO: log error and retry or fallback so we don't 
lose the message.
+               }
+       }
 }

-- 
To view, visit https://gerrit.wikimedia.org/r/279744
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I293e2d3f6a8e34fcbc37ff7e97d83023818044b9
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/DonationInterface
Gerrit-Branch: master
Gerrit-Owner: Awight <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to