GitHub user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22402#discussion_r217286827
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -598,18 +599,20 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
val join = values.join(values, "key")
- testStream(join)(
- makeSureGetOffsetCalled,
- AddKafkaData(Set(topic), 1, 2),
- CheckAnswer((1, 1, 1), (2, 2, 2)),
- AddKafkaData(Set(topic), 6, 3),
- CheckAnswer((1, 1, 1), (2, 2, 2), (3, 3, 3), (1, 6, 1), (1, 1, 6), (1, 6, 6)),
- AssertOnQuery { q =>
- assert(q.availableOffsets.iterator.size == 1)
- assert(q.recentProgress.map(_.numInputRows).sum == 4)
- true
- }
- )
+ withSQLConf(SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
--- End diff --
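Turn off `EXCHANGE_REUSE_ENABLED` to expose the self-join `numRows` double-count bug. With exchange reuse on, both sides of the self-join share one shuffle exchange, so the Kafka source is scanned only once and the double count never shows up.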
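A toy Scala model of why the setting matters. It assumes, as the comment implies, that the input rows reported for a source are summed over every physical scan of that source. `Scan`, `selfJoinScans` and `numInputRows` are hypothetical names, not Spark APIs. The sketch only mimics the counting, not Spark's planner.

```scala
// Toy model only: every name here is hypothetical and is not a Spark API.
object SelfJoinRowCount {
  // One physical scan over a streaming source and the rows it read.
  final case class Scan(source: String, rowsRead: Long)

  // Scans that actually run for a self-join. With exchange reuse, the second
  // side points at the first side's shuffle output, so only one scan runs.
  def selfJoinScans(source: String, rows: Long, exchangeReuse: Boolean): Seq[Scan] = {
    val scan = Scan(source, rows)
    if (exchangeReuse) Seq(scan) else Seq(scan, scan)
  }

  // Input rows per source, summed over every scan of that source.
  def numInputRows(scans: Seq[Scan]): Map[String, Long] =
    scans.groupBy(_.source).map { case (src, ss) => src -> ss.map(_.rowsRead).sum }

  def main(args: Array[String]): Unit = {
    // The test adds 4 rows in total: 1, 2, then 6, 3.
    println(numInputRows(selfJoinScans("kafka", 4L, exchangeReuse = true)))  // Map(kafka -> 4)
    println(numInputRows(selfJoinScans("kafka", 4L, exchangeReuse = false))) // Map(kafka -> 8)
  }
}
```

With reuse on, the model reports the 4 rows that the removed assertion expected. With reuse off, both sides of the self-join scan the source and the count doubles to 8.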