Awight has uploaded a new change for review.
https://gerrit.wikimedia.org/r/279745
Change subject: [WIP] looking at direct kafka integration.
......................................................................
[WIP] looking at direct kafka integration.
Change-Id: I0d0c3c4e9d1924388e828686cd908faa912c591d
TODO: Need to compare to being part of a kafkatee pipe
Bug: T130304
---
M composer.json
M sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.module
M sites/all/modules/wmf_common/Queue.php
3 files changed, 22 insertions(+), 1 deletion(-)
git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/crm
refs/changes/45/279745/1
diff --git a/composer.json b/composer.json
index 7b90bab..25c4678 100644
--- a/composer.json
+++ b/composer.json
@@ -18,6 +18,7 @@
"require": {
"coderkungfu/php-queue": "dev-master",
"cogpowered/finediff": "0.*",
+ "nmred/kafka-php": "0.1.5",
"wikimedia/donation-interface": "dev-master",
"wikimedia/smash-pig": "dev-master",
"fusesource/stomp-php": "2.*",
diff --git a/sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.module
b/sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.module
index e5a631e..ffac8d0 100644
--- a/sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.module
+++ b/sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.module
@@ -83,7 +83,7 @@
$processed = 0;
foreach ($dequeue_params as $type => $params) {
- $processed += queue2civicrm_stomp()->dequeue_loop(
+ $processed += Queue::process_kafka_batch(
$params['queue'], false, //not using the queue count limiter anymore
$cycle_time, $params['callback']
);
diff --git a/sites/all/modules/wmf_common/Queue.php
b/sites/all/modules/wmf_common/Queue.php
index c9702c5..602b629 100644
--- a/sites/all/modules/wmf_common/Queue.php
+++ b/sites/all/modules/wmf_common/Queue.php
@@ -1,5 +1,7 @@
<?php
+use Kafka\Consumer;
+
class Queue {
protected $url;
protected $connection = NULL;
@@ -14,6 +16,24 @@
$this->disconnect();
}
+ /**
+  * Prototype: perform one Kafka fetch for a queue's topic and pass every
+  * fetched message to a callback.
+  *
+  * XXX Rudderless prototyping
+  *
+  * Declared static because it uses no instance state and is invoked as
+  * Queue::process_kafka_batch().
+  *
+  * @param string $queue Queue path; slashes and the word "queue" are
+  *  stripped from it to derive the Kafka topic name.
+  * @param int|bool $batch_size Currently unused by this prototype.
+  * @param int $time_limit Currently unused by this prototype.
+  * @param callable $callback Invoked once per fetched message, with the
+  *  message as its only argument.
+  * @return int Number of messages handed to the callback.
+  */
+ public static function process_kafka_batch( $queue, $batch_size, $time_limit, $callback ) {
+     // Kafka topic names may not contain slashes, so subscribe using the
+     // stripped name rather than the raw queue path.
+     $topic = preg_replace( '/\/|queue\b/', '', $queue );
+     $consumer = Consumer::getInstance( array( 'localhost:9092' ) );
+     $partition = 0;
+     $consumer->setPartition( $topic, $partition );
+     $result = $consumer->fetch();
+     $processed = 0;
+     // FIXME: error handling
+     // Bail out quietly if the fetch gave us nothing iterable.
+     if ( !is_array( $result ) && !( $result instanceof Traversable ) ) {
+         return $processed;
+     }
+     // Iterate instead of indexing by topic, so this works whether fetch()
+     // returns a plain array or an iterator object, and so that a missing
+     // topic is simply skipped.
+     // NOTE(review): confirm the exact shape returned by kafka-php 0.1.5.
+     foreach ( $result as $topicName => $partitions ) {
+         if ( $topicName !== $topic ) {
+             continue;
+         }
+         foreach ( $partitions as $partId => $messages ) {
+             foreach ( $messages as $message ) {
+                 // FIXME: transactionality, etc.
+                 $callback( $message );
+                 $processed++;
+             }
+         }
+     }
+     // The caller accumulates this value with `$processed +=`.
+     return $processed;
+ }
+
/**
* Pop from a queue and execute a callback on each message
*
--
To view, visit https://gerrit.wikimedia.org/r/279745
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I0d0c3c4e9d1924388e828686cd908faa912c591d
Gerrit-PatchSet: 1
Gerrit-Project: wikimedia/fundraising/crm
Gerrit-Branch: master
Gerrit-Owner: Awight <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits