Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/22009#discussion_r211739221
--- Diff:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
---
@@ -47,7 +47,9 @@ trait KafkaContinuousTest extends KafkaSourceTest {
    eventually(timeout(streamingTimeout)) {
      assert(
        query.lastExecution.logical.collectFirst {
-          case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r
+          case r: StreamingDataSourceV2Relation
+              if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
+            r.scanConfigBuilder.build().asInstanceOf[KafkaContinuousScanConfig]
--- End diff ---
I think this logic is subtly incorrect (and is what's causing the flakiness in
the continuous test). It needs to get the actual scan config being used from
`DataSourceV2ScanExec` in the physical plan; `r.scanConfigBuilder.build()` will
always produce the most up-to-date `knownPartitions` value, so the assertion
can succeed even when the running query hasn't actually picked up the new
partitions yet.
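A rough sketch of what the suggested fix might look like, matching the pattern-match style of the diff above. This is not a verified patch: it assumes the Spark-internal `DataSourceV2ScanExec` node of that era exposes the `ScanConfig` it was planned with (the `scanConfig` field name here is an assumption), and it reuses `query`, `KafkaContinuousReadSupport`, and `KafkaContinuousScanConfig` from the diff:

```scala
// Hedged sketch: collect the ScanConfig the query is actually executing
// with from the physical plan, instead of building a fresh one from the
// logical relation (which always reflects the latest partitions).
query.lastExecution.executedPlan.collectFirst {
  // Assumption: DataSourceV2ScanExec carries the read support and the
  // scan config it was planned with; field names are illustrative.
  case scan: DataSourceV2ScanExec
      if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
    scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
}
```

The point of the change is that the physical plan's config is frozen at planning time, so asserting on it actually exercises the reconfiguration path rather than trivially passing.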
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]