This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git

The following commit(s) were added to refs/heads/master by this push:
     new cbacd96  Use non-blocking techniques when interfacing to Kafka. (#2582)
cbacd96 is described below

commit cbacd967794db8180b4ddb31cc7907f88e0af6fb
Author: Markus Thömmes <markusthoem...@me.com>
AuthorDate: Tue Aug 8 13:37:43 2017 +0200

    Use non-blocking techniques when interfacing to Kafka. (#2582)

    Producing a message can be done with a callback that fires once the broker
    has received the message, effectively acting as a non-blocking API.

    Consuming a message is a bit trickier. Wrapping the poll in `blocking`
    instructs the default dispatcher to create new threads if necessary.
---
 .../connector/kafka/KafkaProducerConnector.scala   | 16 ++++++++++++----
 .../whisk/core/connector/MessageConsumer.scala     | 23 +++++++++++----------
 2 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index 84f714b..4111cb5 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -22,9 +22,11 @@ import java.util.UUID
 
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
+import scala.concurrent.Promise
 import scala.util.Failure
 import scala.util.Success
 
+import org.apache.kafka.clients.producer.Callback
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
@@ -50,10 +52,16 @@ class KafkaProducerConnector(
         implicit val transid = msg.transid
         val record = new ProducerRecord[String, String](topic, "messages", msg.serialize)
 
-        Future {
-            logging.debug(this, s"sending to topic '$topic' msg '$msg'")
-            producer.send(record).get
-        } andThen {
+        logging.debug(this, s"sending to topic '$topic' msg '$msg'")
+        val produced = Promise[RecordMetadata]()
+        producer.send(record, new Callback {
+            override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
+                if (exception == null) produced.success(metadata)
+                else produced.failure(exception)
+            }
+        })
+
+        produced.future.andThen {
             case Success(status) =>
                 logging.debug(this, s"sent message: ${status.topic()}[${status.partition()}][${status.offset()}]")
                 sentCounter.next()
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
index 87353df..f335441 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
@@ -20,6 +20,7 @@ package whisk.core.connector
 import scala.annotation.tailrec
 import scala.collection.mutable
 import scala.concurrent.Future
+import scala.concurrent.blocking
 import scala.concurrent.duration._
 
 import scala.util.Failure
@@ -167,16 +168,18 @@ class MessageFeed(
     private def fillPipeline(): Unit = {
         if (outstandingMessages.size <= pipelineFillThreshold) {
             Future {
-                // Grab next batch of messages and commit offsets immediately
-                // essentially marking the activation as having satisfied "at most once"
-                // semantics (this is the point at which the activation is considered started).
-                // If the commit fails, then messages peeked are peeked again on the next poll.
-                // While the commit is synchronous and will block until it completes, at steady
-                // state with enough buffering (i.e., maxPipelineDepth > maxPeek), the latency
-                // of the commit should be masked.
-                val records = consumer.peek(longPollDuration)
-                consumer.commit()
-                FillCompleted(records.toSeq)
+                blocking {
+                    // Grab next batch of messages and commit offsets immediately
+                    // essentially marking the activation as having satisfied "at most once"
+                    // semantics (this is the point at which the activation is considered started).
+                    // If the commit fails, then messages peeked are peeked again on the next poll.
+                    // While the commit is synchronous and will block until it completes, at steady
+                    // state with enough buffering (i.e., maxPipelineDepth > maxPeek), the latency
+                    // of the commit should be masked.
+                    val records = consumer.peek(longPollDuration)
+                    consumer.commit()
+                    FillCompleted(records.toSeq)
+                }
             }.andThen {
                 case Failure(e: CommitFailedException) => logging.error(this, s"failed to commit $description consumer offset: $e")
                 case Failure(e: Throwable)             => logging.error(this, s"exception while pulling new $description records: $e")
--
To stop receiving notification emails like this one, please contact
"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>.
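
For context, the two techniques referenced in the commit message can be illustrated with a small, self-contained sketch outside the OpenWhisk code base. The object and method names below (NonBlockingKafkaSketch, sendNonBlocking, pollInBackground) are illustrative and not part of this commit; only the Kafka producer Callback API and scala.concurrent.blocking are assumed.

import scala.concurrent.{ ExecutionContext, Future, Promise, blocking }

import org.apache.kafka.clients.producer.{ Callback, KafkaProducer, ProducerRecord, RecordMetadata }

object NonBlockingKafkaSketch {

    // Wrap KafkaProducer.send in a Promise: the Callback is invoked on Kafka's I/O thread
    // once the broker acknowledges (or rejects) the record, so no caller thread parks on
    // producer.send(...).get.
    def sendNonBlocking(producer: KafkaProducer[String, String],
                        topic: String,
                        value: String): Future[RecordMetadata] = {
        val produced = Promise[RecordMetadata]()
        producer.send(new ProducerRecord[String, String](topic, value), new Callback {
            override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
                if (exception == null) produced.success(metadata) else produced.failure(exception)
        })
        produced.future
    }

    // Run a potentially blocking poll on the default dispatcher: `blocking` hints the
    // fork-join pool that this task may block, so it can spawn extra threads instead of
    // starving other tasks queued on the same pool.
    def pollInBackground[T](doPoll: () => T)(implicit ec: ExecutionContext): Future[T] =
        Future {
            blocking {
                doPoll()
            }
        }
}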