Awight has uploaded a new change for review.
https://gerrit.wikimedia.org/r/279744
Change subject: [WIP] Mirror to Kafka
......................................................................
[WIP] Mirror to Kafka
Bug: T130304
Change-Id: I293e2d3f6a8e34fcbc37ff7e97d83023818044b9
---
M composer.json
M gateway_common/DonationData.php
M gateway_common/DonationQueue.php
3 files changed, 48 insertions(+), 2 deletions(-)
git pull
ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/DonationInterface
refs/changes/44/279744/1
diff --git a/composer.json b/composer.json
index 3846e2f..e6f1085 100644
--- a/composer.json
+++ b/composer.json
@@ -23,6 +23,7 @@
"fusesource/stomp-php": "2.1.*",
"minfraud/http": "^1.70",
"monolog/monolog": "1.12.0",
+ "nmred/kafka-php": "0.1.5",
"predis/predis": "1.*",
"psr/log": "1.0.0",
"zordius/lightncandy": "0.18",
diff --git a/gateway_common/DonationData.php b/gateway_common/DonationData.php
index da1160b..d989110 100644
--- a/gateway_common/DonationData.php
+++ b/gateway_common/DonationData.php
@@ -860,7 +860,7 @@
if ( $ctid ) {
// We're updating a record, but only if we actually
have data to update
- if ( count( $tracking_data ) ) {
+ if ( $tracking_data ) {
$db->update(
'contribution_tracking',
$tracking_data,
@@ -1034,7 +1034,8 @@
$country = $this->normalized['country'];
$defaultCurrency =
NationalCurrencies::getNationalCurrency( $country );
}
- } else {
+ }
+ if ( !$defaultCurrency ) {
$defaultCurrency = $this->gateway->getGlobal(
'FallbackCurrency' );
}
if ( !$defaultCurrency ) {
diff --git a/gateway_common/DonationQueue.php b/gateway_common/DonationQueue.php
index d0b1afc..47f4eac 100644
--- a/gateway_common/DonationQueue.php
+++ b/gateway_common/DonationQueue.php
@@ -1,5 +1,8 @@
<?php
+use Kafka\MetaDataFromKafka;
+use Kafka\Produce;
+
class DonationQueue {
protected static $instance;
@@ -26,6 +29,7 @@
$properties = $this->buildHeaders( $transaction );
$message = $this->buildBody( $transaction );
$this->newBackend( $queue )->push( $message, $properties );
+ $this->mirrorToKafka( $queue, $message, $properties );
}
public function pop( $queue ) {
@@ -53,6 +57,7 @@
$properties = $this->buildHeaders( $transaction );
$message = $this->buildBody( $transaction );
$this->newBackend( $queue )->set( $correlationId, $message,
$properties );
+ $this->mirrorToKafka( $queue, $message, $properties );
}
public function get( $correlationId, $queue ) {
@@ -281,4 +286,43 @@
return $transaction;
}
+
+ /**
+  * Transitional function to copy data to a Kafka sink.
+  *
+  * Does nothing when the 'EnableKafka' global is falsy, or when the
+  * Kafka\Produce class cannot be loaded. Otherwise the message body and
+  * its properties are combined, JSON-encoded, and sent to partition 0
+  * using the queue name as the first (topic) argument of setMessages().
+  * A Kafka\Exception thrown by send() is currently swallowed.
+  *
+  * TODO:
+  * - We'll eventually wrap Kafka like any other WriteOnlyQueue
+  * - Configuration should support mirroring, for future migrations.
+  *
+  * @param string $queue Queue name (presumably a string -- TODO confirm
+  *  against callers).
+  * @param array $message Message body. Assumed to be an array, since it
+  *  is combined with $properties via the array union operator.
+  * @param array $properties Message headers. On a key collision the
+  *  value from $message wins, because $message is the left operand.
+  */
+ protected function mirrorToKafka( $queue, $message, $properties ) {
+     if ( !GatewayAdapter::getGlobal( 'EnableKafka' ) ) {
+         return;
+     }
+     if ( !class_exists( 'Kafka\Produce' ) ) {
+         // TODO: log error.
+         return;
+     }
+     $metadata = new MetaDataFromKafka( GatewayAdapter::getGlobal( 'KafkaServers' ) );
+     $produce = new Produce( $metadata );
+
+     // Confirm that the message was delivered.
+     $produce->setRequireAck( -1 );
+
+     // The configured value is multiplied by one million to obtain
+     // microseconds, so it is presumably expressed in (possibly
+     // fractional) seconds -- TODO confirm against the config docs.
+     $timeout = GatewayAdapter::getGlobal( 'KafkaSendTimeout' );
+     if ( $timeout ) {
+         $produce->getClient()->setStreamOption( 'SendTimeoutSec', 0 );
+         // PHP variable names are case-sensitive: this must read
+         // $timeout, not $timeOut, or the timeout silently becomes 0.
+         $produce->getClient()->setStreamOption( 'SendTimeoutUSec',
+             intval( $timeout * 1000000 ) );
+     }
+
+     // FIXME: Decide whether to partition.
+     $partition = 0;
+     // TODO: helper function, and a standard way to do this
+     $unifiedMessage = json_encode( $message + $properties );
+     $produce->setMessages( $queue, $partition, array( $unifiedMessage ) );
+     try {
+         $produce->send();
+     } catch ( Kafka\Exception $ex ) {
+         // TODO: log error and retry or fallback so we don't lose the message.
+     }
+ }
}
--
To view, visit https://gerrit.wikimedia.org/r/279744
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I293e2d3f6a8e34fcbc37ff7e97d83023818044b9
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/DonationInterface
Gerrit-Branch: master
Gerrit-Owner: Awight <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits