jenkins-bot has submitted this change and it was merged.
Change subject: Create Kafka event relayer
......................................................................
Create Kafka event relayer
Bug: T125138
Change-Id: I9d7705cb164bc975c3a0ddf4a33ac54fe7de931c
---
M autoload.php
M includes/libs/eventrelayer/EventRelayer.php
A includes/libs/eventrelayer/EventRelayerKafka.php
M includes/libs/objectcache/WANObjectCache.php
4 files changed, 87 insertions(+), 1 deletion(-)
Approvals:
Aaron Schulz: Looks good to me, approved
jenkins-bot: Verified
diff --git a/autoload.php b/autoload.php
index 22411cd..e941fc1 100644
--- a/autoload.php
+++ b/autoload.php
@@ -397,6 +397,7 @@
'ErrorPageError' => __DIR__ . '/includes/exception/ErrorPageError.php',
'EventRelayer' => __DIR__ .
'/includes/libs/eventrelayer/EventRelayer.php',
'EventRelayerGroup' => __DIR__ . '/includes/EventRelayerGroup.php',
+ 'EventRelayerKafka' => __DIR__ .
'/includes/libs/eventrelayer/EventRelayerKafka.php',
'EventRelayerNull' => __DIR__ .
'/includes/libs/eventrelayer/EventRelayerNull.php',
'Exif' => __DIR__ . '/includes/media/Exif.php',
'ExifBitmapHandler' => __DIR__ . '/includes/media/ExifBitmap.php',
diff --git a/includes/libs/eventrelayer/EventRelayer.php
b/includes/libs/eventrelayer/EventRelayer.php
index b61cae7..f28a4c0 100644
--- a/includes/libs/eventrelayer/EventRelayer.php
+++ b/includes/libs/eventrelayer/EventRelayer.php
@@ -18,15 +18,22 @@
* @file
* @author Aaron Schulz
*/
+use Psr\Log\LoggerInterface;
+use Psr\Log\LoggerAwareInterface;
+use Psr\Log\NullLogger;
/**
* Base class for reliable event relays
*/
-abstract class EventRelayer {
+abstract class EventRelayer implements LoggerAwareInterface {
+ /** @var LoggerInterface */
+ protected $logger;
+
/**
* @param array $params
*/
public function __construct( array $params ) {
+ $this->logger = new NullLogger();
}
/**
@@ -48,6 +55,14 @@
}
/**
+ * Set logger instance.
+ * @param LoggerInterface $logger
+ */
+ public function setLogger( LoggerInterface $logger ) {
+ $this->logger = $logger;
+ }
+
+ /**
* @param string $channel
* @param array $events List of event data maps
* @return bool Success
diff --git a/includes/libs/eventrelayer/EventRelayerKafka.php
b/includes/libs/eventrelayer/EventRelayerKafka.php
new file mode 100644
index 0000000..3555a23
--- /dev/null
+++ b/includes/libs/eventrelayer/EventRelayerKafka.php
@@ -0,0 +1,66 @@
+<?php
+use Kafka\Produce;
+
+/**
+ * Event relayer for Apache Kafka.
+ * Configuring for WANCache:
+ * 'relayerConfig' => [ 'class' => 'EventRelayerKafka', 'KafkaEventHost' =>
'localhost:9092' ],
+ */
+class EventRelayerKafka extends EventRelayer {
+
+ /**
+ * Configuration.
+ *
+ * @var Config
+ */
+ protected $config;
+
+ /**
+ * Kafka producer.
+ *
+ * @var Produce
+ */
+ protected $producer;
+
+ /**
+ * Create Kafka producer.
+ *
+ * @param Config $config
+ */
+ public function __construct( array $params ) {
+ $this->config = new HashConfig( $params );
+ if ( !$this->config->has( 'KafkaEventHost' ) ) {
+ throw new InvalidArgumentException( "KafkaEventHost
must be configured" );
+ }
+ }
+
+ /**
+ * Get the producer object from kafka-php.
+ * @return Produce
+ */
+ protected function getKafkaProducer() {
+ if ( !$this->producer ) {
+ $this->producer = Produce::getInstance( null, null,
$this->config->get( 'KafkaEventHost' ) );
+ }
+ return $this->producer;
+ }
+
+ /**
+ * (non-PHPdoc)
+ *
+ * @see EventRelayer::doNotify()
+ *
+ */
+ protected function doNotify( $channel, array $events ) {
+ $jsonEvents = array_map( 'json_encode', $events );
+ try {
+ $producer = $this->getKafkaProducer();
+ $producer->setMessages( $channel, 0, $jsonEvents );
+ $producer->send();
+ } catch ( \Kafka\Exception $e ) {
+ $this->logger->warning( "Sending events failed: $e" );
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/includes/libs/objectcache/WANObjectCache.php
b/includes/libs/objectcache/WANObjectCache.php
index b212e97..dd2e0d5 100644
--- a/includes/libs/objectcache/WANObjectCache.php
+++ b/includes/libs/objectcache/WANObjectCache.php
@@ -149,6 +149,10 @@
$this->setLogger( isset( $params['logger'] ) ?
$params['logger'] : new NullLogger() );
}
+ /**
+ * Set logger instance.
+ * @param LoggerInterface $logger
+ */
public function setLogger( LoggerInterface $logger ) {
$this->logger = $logger;
}
--
To view, visit https://gerrit.wikimedia.org/r/283750
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I9d7705cb164bc975c3a0ddf4a33ac54fe7de931c
Gerrit-PatchSet: 9
Gerrit-Project: mediawiki/core
Gerrit-Branch: master
Gerrit-Owner: Smalyshev <[email protected]>
Gerrit-Reviewer: Aaron Schulz <[email protected]>
Gerrit-Reviewer: Krinkle <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: Smalyshev <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits