This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new cbacd96  Use non-blocking techniques when interfacing to Kafka. (#2582)
cbacd96 is described below

commit cbacd967794db8180b4ddb31cc7907f88e0af6fb
Author: Markus Thömmes <markusthoem...@me.com>
AuthorDate: Tue Aug 8 13:37:43 2017 +0200

    Use non-blocking techniques when interfacing to Kafka. (#2582)
    
    Producing a message can be done with a callback that is invoked once the
    send completes, effectively acting as a non-blocking API.
    
    Consuming a message is a bit trickier. Adding `blocking` instructs the
    default dispatcher to create new threads if necessary.
---
 .../connector/kafka/KafkaProducerConnector.scala   | 16 +++++++++++----
 .../whisk/core/connector/MessageConsumer.scala     | 23 ++++++++++++----------
 2 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index 84f714b..4111cb5 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -22,9 +22,11 @@ import java.util.UUID
 
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
+import scala.concurrent.Promise
 import scala.util.Failure
 import scala.util.Success
 
+import org.apache.kafka.clients.producer.Callback
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
@@ -50,10 +52,16 @@ class KafkaProducerConnector(
         implicit val transid = msg.transid
         val record = new ProducerRecord[String, String](topic, "messages", 
msg.serialize)
 
-        Future {
-            logging.debug(this, s"sending to topic '$topic' msg '$msg'")
-            producer.send(record).get
-        } andThen {
+        logging.debug(this, s"sending to topic '$topic' msg '$msg'")
+        val produced = Promise[RecordMetadata]()
+        producer.send(record, new Callback {
+            override def onCompletion(metadata: RecordMetadata, exception: 
Exception): Unit = {
+                if (exception == null) produced.success(metadata)
+                else produced.failure(exception)
+            }
+        })
+
+        produced.future.andThen {
             case Success(status) =>
                 logging.debug(this, s"sent message: 
${status.topic()}[${status.partition()}][${status.offset()}]")
                 sentCounter.next()
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
index 87353df..f335441 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
@@ -20,6 +20,7 @@ package whisk.core.connector
 import scala.annotation.tailrec
 import scala.collection.mutable
 import scala.concurrent.Future
+import scala.concurrent.blocking
 import scala.concurrent.duration._
 import scala.util.Failure
 
@@ -167,16 +168,18 @@ class MessageFeed(
     private def fillPipeline(): Unit = {
         if (outstandingMessages.size <= pipelineFillThreshold) {
             Future {
-                // Grab next batch of messages and commit offsets immediately
-                // essentially marking the activation as having satisfied "at 
most once"
-                // semantics (this is the point at which the activation is 
considered started).
-                // If the commit fails, then messages peeked are peeked again 
on the next poll.
-                // While the commit is synchronous and will block until it 
completes, at steady
-                // state with enough buffering (i.e., maxPipelineDepth > 
maxPeek), the latency
-                // of the commit should be masked.
-                val records = consumer.peek(longPollDuration)
-                consumer.commit()
-                FillCompleted(records.toSeq)
+                blocking {
+                    // Grab next batch of messages and commit offsets 
immediately
+                    // essentially marking the activation as having satisfied 
"at most once"
+                    // semantics (this is the point at which the activation is 
considered started).
+                    // If the commit fails, then messages peeked are peeked 
again on the next poll.
+                    // While the commit is synchronous and will block until it 
completes, at steady
+                    // state with enough buffering (i.e., maxPipelineDepth > 
maxPeek), the latency
+                    // of the commit should be masked.
+                    val records = consumer.peek(longPollDuration)
+                    consumer.commit()
+                    FillCompleted(records.toSeq)
+                }
             }.andThen {
                 case Failure(e: CommitFailedException) => logging.error(this, 
s"failed to commit $description consumer offset: $e")
                 case Failure(e: Throwable)             => logging.error(this, 
s"exception while pulling new $description records: $e")

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].

Reply via email to