Awight has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/279745

Change subject: [WIP] looking at direct kafka integration.
......................................................................

[WIP] looking at direct kafka integration.

Change-Id: I0d0c3c4e9d1924388e828686cd908faa912c591d
TODO: Need to compare to being part of a kafkatee pipe
Bug: T130304
---
M composer.json
M sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.module
M sites/all/modules/wmf_common/Queue.php
3 files changed, 22 insertions(+), 1 deletion(-)


  git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/crm 
refs/changes/45/279745/1

diff --git a/composer.json b/composer.json
index 7b90bab..25c4678 100644
--- a/composer.json
+++ b/composer.json
@@ -18,6 +18,7 @@
     "require": {
         "coderkungfu/php-queue": "dev-master",
         "cogpowered/finediff": "0.*",
+        "nmred/kafka-php": "0.1.5",
         "wikimedia/donation-interface": "dev-master",
         "wikimedia/smash-pig": "dev-master",
         "fusesource/stomp-php": "2.*",
diff --git a/sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.module 
b/sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.module
index e5a631e..ffac8d0 100644
--- a/sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.module
+++ b/sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.module
@@ -83,7 +83,7 @@
 
   $processed = 0;
   foreach ($dequeue_params as $type => $params) {
-    $processed += queue2civicrm_stomp()->dequeue_loop(
+    $processed += Queue::process_kafka_batch(
       $params['queue'], false, //not using the queue count limiter anymore
       $cycle_time, $params['callback']
     );
diff --git a/sites/all/modules/wmf_common/Queue.php 
b/sites/all/modules/wmf_common/Queue.php
index c9702c5..602b629 100644
--- a/sites/all/modules/wmf_common/Queue.php
+++ b/sites/all/modules/wmf_common/Queue.php
@@ -1,5 +1,7 @@
 <?php
 
+use Kafka\Consumer;
+
 class Queue {
     protected $url;
     protected $connection = NULL;
@@ -14,6 +16,24 @@
         $this->disconnect();
     }
 
+       /**
+        * XXX Rudderless prototyping
+        */
+       function process_kafka_batch( $queue, $batch_size, $time_limit, 
$callback ) {
+               $stripped_topic = preg_replace( '/\/|queue\b/', '', $queue );
+               $consumer = Consumer::getInstance( array( 'localhost:9092' ) );
+               $partition = 0;
+               $consumer->setPartition( $queue, $partition );
+               $result = $consumer->fetch();
+               // FIXME: error handling
+               foreach ( $result[$queue] as $partId => $messages ) {
+                       foreach ( $messages as $message ) {
+                               // FIXME: transactionality, etc.
+                               $callback( $message );
+                       }
+               }
+       }
+
     /**
      * Pop from a queue and execute a callback on each message
      *

-- 
To view, visit https://gerrit.wikimedia.org/r/279745
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I0d0c3c4e9d1924388e828686cd908faa912c591d
Gerrit-PatchSet: 1
Gerrit-Project: wikimedia/fundraising/crm
Gerrit-Branch: master
Gerrit-Owner: Awight <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to