This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new b1539ff KAFKA-7250: switch scala transform to TransformSupplier (#5481) b1539ff is described below commit b1539ff62dd3e8eeffeae526a2cbd7e4cb63ecda Author: John Roesler <vvcep...@users.noreply.github.com> AuthorDate: Thu Aug 9 12:11:48 2018 -0500 KAFKA-7250: switch scala transform to TransformSupplier (#5481) #5468 introduced a breaking API change that was actually avoidable. This PR re-introduces the old API as deprecated and alters the API introduced by #5468 to be consistent with the other methods. It also fixes miscellaneous syntax problems. --- build.gradle | 1 + .../kafka/streams/scala/kstream/KStream.scala | 30 ++++++++++++---------- .../apache/kafka/streams/scala/TopologyTest.scala | 28 ++++++++++---------- 3 files changed, 31 insertions(+), 28 deletions(-) diff --git a/build.gradle b/build.gradle index 83b169b..7f5a1d9 100644 --- a/build.gradle +++ b/build.gradle @@ -1015,6 +1015,7 @@ project(':streams:streams-scala') { testCompile libs.junit testCompile libs.scalatest + testCompile libs.easymock testRuntime libs.slf4jlog4j } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index a8766bd..adc1850 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -22,7 +22,7 @@ package kstream import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _} -import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier, TopicNameExtractor} +import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier, TopicNameExtractor} import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.scala.FunctionConversions._ @@ 
-31,8 +31,8 @@ import scala.collection.JavaConverters._ /** * Wraps the Java class [[org.apache.kafka.streams.kstream.KStream]] and delegates method calls to the underlying Java object. * - * @param [K] Type of keys - * @param [V] Type of values + * @tparam K Type of keys + * @tparam V Type of values * @param inner The underlying Java abstraction for KStream * * @see `org.apache.kafka.streams.kstream.KStream` @@ -167,7 +167,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { def print(printed: Printed[K, V]): Unit = inner.print(printed) /** - * Perform an action on each record of 'KStream` + * Perform an action on each record of `KStream` * * @param action an action to perform on each record * @see `org.apache.kafka.streams.kstream.KStream#foreach` @@ -176,14 +176,15 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { inner.foreach((k: K, v: V) => action(k, v)) /** - * Creates an array of {@code KStream} from this stream by branching the records in the original stream based on + * Creates an array of `KStream` from this stream by branching the records in the original stream based on * the supplied predicates. 
* * @param predicates the ordered list of functions that return a Boolean * @return multiple distinct substreams of this [[KStream]] * @see `org.apache.kafka.streams.kstream.KStream#branch` */ - def branch(predicates: (K, V) => Boolean*): Array[KStream[K, V]] = + //noinspection ScalaUnnecessaryParentheses + def branch(predicates: ((K, V) => Boolean)*): Array[KStream[K, V]] = inner.branch(predicates.map(_.asPredicate): _*).map(kstream => wrapKStream(kstream)) /** @@ -211,7 +212,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * }}} * * @param topic the topic name - * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner` + * @param produced the instance of Produced that gives the serdes and `StreamPartitioner` * @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]] * @see `org.apache.kafka.streams.kstream.KStream#through` */ @@ -243,7 +244,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * }}} * * @param topic the topic name - * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner` + * @param produced the instance of Produced that gives the serdes and `StreamPartitioner` * @see `org.apache.kafka.streams.kstream.KStream#to` */ def to(topic: String)(implicit produced: Produced[K, V]): Unit = @@ -275,7 +276,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * }}} * * @param extractor the extractor to determine the name of the Kafka topic to write to for reach record - * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner` + * @param produced the instance of Produced that gives the serdes and `StreamPartitioner` * @see `org.apache.kafka.streams.kstream.KStream#to` */ def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit = @@ -295,9 +296,9 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @return a [[KStream]] that contains more or less 
records with new key and value (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transform` */ - def transform[K1, V1](transformerSupplier: () => Transformer[K, V, KeyValue[K1, V1]], + def transform[K1, V1](transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]], stateStoreNames: String*): KStream[K1, V1] = - inner.transform(transformerSupplier.asTransformerSupplier, stateStoreNames: _*) + inner.transform(transformerSupplier, stateStoreNames: _*) /** * Transform the value of each input record into a new value (with possible new type) of the output record. @@ -337,11 +338,12 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * In order to assign a state, the state must be created and registered * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer` * - * @param processorSupplier a function that generates a [[org.apache.kafka.stream.Processor]] + * @param processorSupplier a function that generates a [[org.apache.kafka.streams.processor.Processor]] * @param stateStoreNames the names of the state store used by the processor * @see `org.apache.kafka.streams.kstream.KStream#process` */ def process(processorSupplier: () => Processor[K, V], stateStoreNames: String*): Unit = { + //noinspection ConvertExpressionToSAM // because of the 2.11 build val processorSupplierJ: ProcessorSupplier[K, V] = new ProcessorSupplier[K, V] { override def get(): Processor[K, V] = processorSupplier() } @@ -374,7 +376,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * // to the groupByKey call * }}} * - * @param (implicit) serialized the instance of Serialized that gives the serdes + * @param serialized the instance of Serialized that gives the serdes * @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]] * @see `org.apache.kafka.streams.kstream.KStream#groupByKey` */ @@ -564,7 +566,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { def 
merge(stream: KStream[K, V]): KStream[K, V] = inner.merge(stream.inner) /** - * Perform an action on each record of {@code KStream}. + * Perform an action on each record of `KStream`. * <p> * Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection) * and returns an unchanged stream. diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala index 8a0eabb..b596dd3 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala @@ -21,19 +21,16 @@ package org.apache.kafka.streams.scala import java.util.regex.Pattern -import org.scalatest.junit.JUnitSuite -import org.junit.Assert._ -import org.junit._ - +import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, KStream => KStreamJ, KTable => KTableJ, _} +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.scala.kstream._ - -import ImplicitConversions._ - import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, _} -import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => KStreamJ, KGroupedStream => KGroupedStreamJ, _} -import org.apache.kafka.streams.processor.ProcessorContext +import org.junit.Assert._ +import org.junit._ +import org.scalatest.junit.JUnitSuite -import collection.JavaConverters._ +import _root_.scala.collection.JavaConverters._ /** * Test suite that verifies that the topology built by the Java and Scala APIs match. 
@@ -207,17 +204,20 @@ class TopologyTest extends JUnitSuite { val streamBuilder = new StreamsBuilder val textLines = streamBuilder.stream[String, String](inputTopic) + //noinspection ConvertExpressionToSAM due to 2.11 build val _: KTable[String, Long] = textLines - .transform( - () => + .transform(new TransformerSupplier[String, String, KeyValue[String, String]] { + override def get(): Transformer[String, String, KeyValue[String, String]] = new Transformer[String, String, KeyValue[String, String]] { override def init(context: ProcessorContext): Unit = Unit + override def transform(key: String, value: String): KeyValue[String, String] = new KeyValue(key, value.toLowerCase) + override def close(): Unit = Unit - } - ) + } + }) .groupBy((k, v) => v) .count()