This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b1539ff  KAFKA-7250: switch scala transform to TransformSupplier 
(#5481)
b1539ff is described below

commit b1539ff62dd3e8eeffeae526a2cbd7e4cb63ecda
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Thu Aug 9 12:11:48 2018 -0500

    KAFKA-7250: switch scala transform to TransformSupplier (#5481)
    
    #5468 introduced a breaking API change that was actually avoidable. This PR 
re-introduces the old API as deprecated and alters the API introduced by #5468 
to be consistent with the other methods
    
    also, fixed misc syntax problems
---
 build.gradle                                       |  1 +
 .../kafka/streams/scala/kstream/KStream.scala      | 30 ++++++++++++----------
 .../apache/kafka/streams/scala/TopologyTest.scala  | 28 ++++++++++----------
 3 files changed, 31 insertions(+), 28 deletions(-)

diff --git a/build.gradle b/build.gradle
index 83b169b..7f5a1d9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1015,6 +1015,7 @@ project(':streams:streams-scala') {
 
     testCompile libs.junit
     testCompile libs.scalatest
+    testCompile libs.easymock
 
     testRuntime libs.slf4jlog4j
   }
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index a8766bd..adc1850 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -22,7 +22,7 @@ package kstream
 
 import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
-import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier, 
TopicNameExtractor}
+import org.apache.kafka.streams.processor.{Processor, ProcessorContext, 
ProcessorSupplier, TopicNameExtractor}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionConversions._
 
@@ -31,8 +31,8 @@ import scala.collection.JavaConverters._
 /**
  * Wraps the Java class [[org.apache.kafka.streams.kstream.KStream]] and 
delegates method calls to the underlying Java object.
  *
- * @param [K] Type of keys
- * @param [V] Type of values
+ * @tparam K Type of keys
+ * @tparam V Type of values
  * @param inner The underlying Java abstraction for KStream
  *
  * @see `org.apache.kafka.streams.kstream.KStream`
@@ -167,7 +167,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   def print(printed: Printed[K, V]): Unit = inner.print(printed)
 
   /**
-   * Perform an action on each record of 'KStream`
+   * Perform an action on each record of `KStream`
    *
    * @param action an action to perform on each record
    * @see `org.apache.kafka.streams.kstream.KStream#foreach`
@@ -176,14 +176,15 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
     inner.foreach((k: K, v: V) => action(k, v))
 
   /**
-   * Creates an array of {@code KStream} from this stream by branching the 
records in the original stream based on
+   * Creates an array of `KStream` from this stream by branching the records 
in the original stream based on
    * the supplied predicates.
    *
    * @param predicates the ordered list of functions that return a Boolean
    * @return multiple distinct substreams of this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#branch`
    */
-  def branch(predicates: (K, V) => Boolean*): Array[KStream[K, V]] =
+  //noinspection ScalaUnnecessaryParentheses
+  def branch(predicates: ((K, V) => Boolean)*): Array[KStream[K, V]] =
     inner.branch(predicates.map(_.asPredicate): _*).map(kstream => 
wrapKStream(kstream))
 
   /**
@@ -211,7 +212,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * }}}
    *
    * @param topic the topic name
-   * @param (implicit) produced the instance of Produced that gives the serdes 
and `StreamPartitioner`
+   * @param produced the instance of Produced that gives the serdes and 
`StreamPartitioner`
    * @return a [[KStream]] that contains the exact same (and potentially 
repartitioned) records as this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#through`
    */
@@ -243,7 +244,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * }}}
    *
    * @param topic the topic name
-   * @param (implicit) produced the instance of Produced that gives the serdes 
and `StreamPartitioner`
+   * @param produced the instance of Produced that gives the serdes and 
`StreamPartitioner`
    * @see `org.apache.kafka.streams.kstream.KStream#to`
    */
   def to(topic: String)(implicit produced: Produced[K, V]): Unit =
@@ -275,7 +276,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * }}}
    *
    * @param extractor the extractor to determine the name of the Kafka topic 
to write to for reach record
-   * @param (implicit) produced the instance of Produced that gives the serdes 
and `StreamPartitioner`
+   * @param produced the instance of Produced that gives the serdes and 
`StreamPartitioner`
    * @see `org.apache.kafka.streams.kstream.KStream#to`
    */
   def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, 
V]): Unit =
@@ -295,9 +296,9 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains more or less records with new key and 
value (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transform`
    */
-  def transform[K1, V1](transformerSupplier: () => Transformer[K, V, 
KeyValue[K1, V1]],
+  def transform[K1, V1](transformerSupplier: TransformerSupplier[K, V, 
KeyValue[K1, V1]],
                         stateStoreNames: String*): KStream[K1, V1] =
-    inner.transform(transformerSupplier.asTransformerSupplier, 
stateStoreNames: _*)
+    inner.transform(transformerSupplier, stateStoreNames: _*)
 
   /**
    * Transform the value of each input record into a new value (with possible 
new type) of the output record.
@@ -337,11 +338,12 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * In order to assign a state, the state must be created and registered
    * beforehand via stores added via `addStateStore` or `addGlobalStore` 
before they can be connected to the `Transformer`
    *
-   * @param processorSupplier a function that generates a 
[[org.apache.kafka.stream.Processor]]
+   * @param processorSupplier a function that generates a 
[[org.apache.kafka.streams.processor.Processor]]
    * @param stateStoreNames   the names of the state store used by the 
processor
    * @see `org.apache.kafka.streams.kstream.KStream#process`
    */
   def process(processorSupplier: () => Processor[K, V], stateStoreNames: 
String*): Unit = {
+    //noinspection ConvertExpressionToSAM // because of the 2.11 build
     val processorSupplierJ: ProcessorSupplier[K, V] = new ProcessorSupplier[K, 
V] {
       override def get(): Processor[K, V] = processorSupplier()
     }
@@ -374,7 +376,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * // to the groupByKey call
    * }}}
    *
-   * @param (implicit) serialized the instance of Serialized that gives the 
serdes
+   * @param serialized the instance of Serialized that gives the serdes
    * @return a [[KGroupedStream]] that contains the grouped records of the 
original [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#groupByKey`
    */
@@ -564,7 +566,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   def merge(stream: KStream[K, V]): KStream[K, V] = inner.merge(stream.inner)
 
   /**
-   * Perform an action on each record of {@code KStream}.
+   * Perform an action on each record of `KStream`.
    * <p>
    * Peek is a non-terminal operation that triggers a side effect (such as 
logging or statistics collection)
    * and returns an unchanged stream.
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 8a0eabb..b596dd3 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -21,19 +21,16 @@ package org.apache.kafka.streams.scala
 
 import java.util.regex.Pattern
 
-import org.scalatest.junit.JUnitSuite
-import org.junit.Assert._
-import org.junit._
-
+import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, 
KStream => KStreamJ, KTable => KTableJ, _}
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.kstream._
-
-import ImplicitConversions._
-
 import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, _}
-import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => 
KStreamJ, KGroupedStream => KGroupedStreamJ, _}
-import org.apache.kafka.streams.processor.ProcessorContext
+import org.junit.Assert._
+import org.junit._
+import org.scalatest.junit.JUnitSuite
 
-import collection.JavaConverters._
+import _root_.scala.collection.JavaConverters._
 
 /**
  * Test suite that verifies that the topology built by the Java and Scala APIs 
match.
@@ -207,17 +204,20 @@ class TopologyTest extends JUnitSuite {
       val streamBuilder = new StreamsBuilder
       val textLines = streamBuilder.stream[String, String](inputTopic)
 
+      //noinspection ConvertExpressionToSAM due to 2.11 build
       val _: KTable[String, Long] =
         textLines
-          .transform(
-            () =>
+          .transform(new TransformerSupplier[String, String, KeyValue[String, 
String]] {
+            override def get(): Transformer[String, String, KeyValue[String, 
String]] =
               new Transformer[String, String, KeyValue[String, String]] {
                 override def init(context: ProcessorContext): Unit = Unit
+
                 override def transform(key: String, value: String): 
KeyValue[String, String] =
                   new KeyValue(key, value.toLowerCase)
+
                 override def close(): Unit = Unit
-            }
-          )
+              }
+          })
           .groupBy((k, v) => v)
           .count()
 

Reply via email to