[ https://issues.apache.org/jira/browse/KAFKA-7708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716720#comment-16716720 ]
ASF GitHub Bot commented on KAFKA-7708: --------------------------------------- lodamar closed pull request #6019: KAFKA-7708: Fixed KTable tests using KStream API in scala tests URL: https://github.com/apache/kafka/pull/6019 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala index dc080f13310..0ef50e383c9 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala @@ -34,26 +34,32 @@ class KTableTest extends FlatSpec with Matchers with TestDriver { val sourceTopic = "source" val sinkTopic = "sink" - val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count() - table.filter((_, value) => value > 1).toStream.to(sinkTopic) + val table = builder.table[String, String](sourceTopic) + table.mapValues(_.length).filter((_, value) => value > 5).toStream.to(sinkTopic) val testDriver = createTestDriver(builder) { - testDriver.pipeRecord(sourceTopic, ("1", "value1")) - val record = testDriver.readRecord[String, Long](sinkTopic) + testDriver.pipeRecord(sourceTopic, ("1", "firstvalue")) + val record = testDriver.readRecord[String, Int](sinkTopic) record.key shouldBe "1" - record.value shouldBe (null: java.lang.Long) + record.value shouldBe 10 + } + { + testDriver.pipeRecord(sourceTopic, ("1", "secondvalue")) + val record = testDriver.readRecord[String, Int](sinkTopic) + record.key shouldBe "1" + record.value shouldBe 11 } { - testDriver.pipeRecord(sourceTopic, ("1", "value2")) - val record = testDriver.readRecord[String, Long](sinkTopic) + testDriver.pipeRecord(sourceTopic, ("1", "short")) + val record = testDriver.readRecord[String, Int](sinkTopic) record.key shouldBe "1" - record.value shouldBe 2 + record.value shouldBe (null: java.lang.Long) } { - testDriver.pipeRecord(sourceTopic, ("2", "value1")) - val record = testDriver.readRecord[String, Long](sinkTopic) + testDriver.pipeRecord(sourceTopic, ("2", "val3")) + val record = testDriver.readRecord[String, Int](sinkTopic) record.key shouldBe "2" record.value shouldBe (null: java.lang.Long) } @@ -67,30 +73,36 @@ class KTableTest extends FlatSpec with Matchers with TestDriver { val sourceTopic = "source" val sinkTopic = "sink" - val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count() - table.filterNot((_, value) => value > 1).toStream.to(sinkTopic) + val table = builder.table[String, String](sourceTopic) + table.filterNot((_, value) => value.exists(_.isUpper)).toStream.to(sinkTopic) val testDriver = createTestDriver(builder) { - testDriver.pipeRecord(sourceTopic, ("1", "value1")) - val record = testDriver.readRecord[String, Long](sinkTopic) + testDriver.pipeRecord(sourceTopic, ("1", "FirstValue")) + val record = testDriver.readRecord[String, String](sinkTopic) record.key shouldBe "1" - record.value shouldBe 1 + record.value shouldBe (null: java.lang.String) } { - testDriver.pipeRecord(sourceTopic, ("1", "value2")) - val record = testDriver.readRecord[String, Long](sinkTopic) + testDriver.pipeRecord(sourceTopic, ("1", "secondvalue")) + val record = testDriver.readRecord[String, String](sinkTopic) record.key shouldBe "1" - record.value shouldBe (null: java.lang.Long) + record.value shouldBe "secondvalue" } { - testDriver.pipeRecord(sourceTopic, ("2", "value1")) - val record = testDriver.readRecord[String, Long](sinkTopic) + testDriver.pipeRecord(sourceTopic, ("1", "Short")) + val record = testDriver.readRecord[String, String](sinkTopic) + record.key shouldBe "1" + record.value shouldBe (null: java.lang.String) + } + { + testDriver.pipeRecord(sourceTopic, ("2", "val")) + val record = testDriver.readRecord[String, String](sinkTopic) record.key shouldBe "2" - record.value shouldBe 1 + record.value shouldBe "val" } - testDriver.readRecord[String, Long](sinkTopic) shouldBe null + testDriver.readRecord[String, String](sinkTopic) shouldBe null testDriver.close() } @@ -101,17 +113,17 @@ class KTableTest extends FlatSpec with Matchers with TestDriver { val sourceTopic2 = "source2" val sinkTopic = "sink" - val table1 = builder.stream[String, String](sourceTopic1).groupBy((key, _) => key).count() - val table2 = builder.stream[String, String](sourceTopic2).groupBy((key, _) => key).count() + val table1 = builder.table[String, Int](sourceTopic1) + val table2 = builder.table[String, Int](sourceTopic2) table1.join(table2)((a, b) => a + b).toStream.to(sinkTopic) val testDriver = createTestDriver(builder) - testDriver.pipeRecord(sourceTopic1, ("1", "topic1value1")) - testDriver.pipeRecord(sourceTopic2, ("1", "topic2value1")) - testDriver.readRecord[String, Long](sinkTopic).value shouldBe 2 + testDriver.pipeRecord(sourceTopic1, ("1", 3)) + testDriver.pipeRecord(sourceTopic2, ("1", 2)) + testDriver.readRecord[String, Int](sinkTopic).value shouldBe 5 - testDriver.readRecord[String, Long](sinkTopic) shouldBe null + testDriver.readRecord[String, Int](sinkTopic) shouldBe null testDriver.close() } @@ -122,20 +134,20 @@ class KTableTest extends FlatSpec with Matchers with TestDriver { val sourceTopic2 = "source2" val sinkTopic = "sink" val stateStore = "store" - val materialized = Materialized.as[String, Long, ByteArrayKeyValueStore](stateStore) + val materialized = Materialized.as[String, Int, ByteArrayKeyValueStore](stateStore) - val table1 = builder.stream[String, String](sourceTopic1).groupBy((key, _) => key).count() - val table2 = builder.stream[String, String](sourceTopic2).groupBy((key, _) => key).count() + val table1 = builder.table[String, Int](sourceTopic1) + val table2 = builder.table[String, Int](sourceTopic2) table1.join(table2, materialized)((a, b) => a + b).toStream.to(sinkTopic) val testDriver = createTestDriver(builder) - testDriver.pipeRecord(sourceTopic1, ("1", "topic1value1")) - testDriver.pipeRecord(sourceTopic2, ("1", "topic2value1")) - testDriver.readRecord[String, Long](sinkTopic).value shouldBe 2 - testDriver.getKeyValueStore[String, Long](stateStore).get("1") shouldBe 2 + testDriver.pipeRecord(sourceTopic1, ("1", 1)) + testDriver.pipeRecord(sourceTopic2, ("1", 3)) + testDriver.readRecord[String, Int](sinkTopic).value shouldBe 4 + testDriver.getKeyValueStore[String, Int](stateStore).get("1") shouldBe 4 - testDriver.readRecord[String, Long](sinkTopic) shouldBe null + testDriver.readRecord[String, Int](sinkTopic) shouldBe null testDriver.close() } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > [kafka-streams-scala] Invalid signature for KTable join in 2.12 > --------------------------------------------------------------- > > Key: KAFKA-7708 > URL: https://issues.apache.org/jira/browse/KAFKA-7708 > Project: Kafka > Issue Type: Bug > Components: streams > Reporter: Edmondo Porcu > Priority: Major > Labels: scala > > The signature in Scala 2.12 for the join in the > org.kafka.streams.scala.streams.KTable cannot be resolved by the compiler, > probably due to the way parameters lists are handled by the compiler . > See: > > [https://github.com/scala/bug/issues/11288] > [https://stackoverflow.com/questions/53615950/scalac-2-12-fails-to-resolve-overloaded-methods-with-multiple-argument-lists-whe] > > We are wondering how this is not captured by the current build of Kafka, we > are building on 2.12.7 as well -- This message was sent by Atlassian JIRA (v7.6.3#76005)