Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r167124768
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
     )
   }

-  testWithUninterruptibleThread(
-    "deserialization of initial offset with Spark 2.1.0") {
-    withTempDir { metadataPath =>
-      val topic = newTopic
-      testUtils.createTopic(topic, partitions = 3)
-
-      val provider = new KafkaSourceProvider
-      val parameters = Map(
-        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
-        "subscribe" -> topic
-      )
-      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
-        "", parameters)
-      source.getOffset.get // Write initial offset
-
-      // Make sure Spark 2.1.0 will throw an exception when reading the new log
-      intercept[java.lang.IllegalArgumentException] {
-        // Simulate how Spark 2.1.0 reads the log
-        Utils.tryWithResource(new FileInputStream(metadataPath.getAbsolutePath + "/0")) { in =>
-          val length = in.read()
-          val bytes = new Array[Byte](length)
-          in.read(bytes)
-          KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
-        }
-      }
-    }
-  }
-
-  testWithUninterruptibleThread("deserialization of initial offset written by Spark 2.1.0") {
+  test("deserialization of initial offset written by Spark 2.1.0") {
     withTempDir { metadataPath =>
--- End diff ---
Changed the two tests below so that they no longer use the source/reader directly (that approach was too low-level and tied to implementation details); instead they now actually run a streaming query, using sample initial-offset files checked into `test/resources`.
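
For context, the replacement tests roughly take the following shape. This is a hedged sketch, not the exact code in the PR: the resource-file name, the fixed topic name, the expected answers, and the use of `testStream`/`StartStream`/`CheckAnswer` from `StreamTest` together with this suite's `AddKafkaData` and `testUtils` helpers are assumptions here.

```scala
import java.io.File
import java.nio.file.{Files, Paths}

// Sketch of a test inside KafkaMicroBatchSourceSuite; assumes the suite's
// usual `import testImplicits._` is in scope for the .as[String]/.map encoders.
test("deserialization of initial offset written by Spark 2.1.0") {
  withTempDir { metadataPath =>
    // The topic name must match the topic recorded inside the sample
    // offset file under test/resources (hypothetical file name below).
    val topic = "kafka-initial-offset-2-1-0"
    testUtils.createTopic(topic, partitions = 3)
    testUtils.sendMessages(topic, Array("0", "1", "2"), Some(0))

    // Copy the 2.1.0-format offset file into the checkpoint directory,
    // where the Kafka source looks for its initial offsets on start.
    val from = new File(
      getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").toURI).toPath
    val to = Paths.get(s"${metadataPath.getAbsolutePath}/sources/0/0")
    Files.createDirectories(to.getParent)
    Files.copy(from, to)

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("subscribe", topic)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .map(_.toInt)

    // Starting the query against the pre-populated checkpoint exercises
    // deserialization of the old offset format end-to-end, with no direct
    // calls into the source/reader APIs.
    testStream(df)(
      StartStream(checkpointLocation = metadataPath.getAbsolutePath),
      AddKafkaData(Set(topic), 3),
      // Expected rows depend on the offsets recorded in the sample file;
      // here we assume it points at the beginning of each partition.
      CheckAnswer(0, 1, 2, 3))
  }
}
```

The benefit of structuring the tests this way is that a future change to how initial offsets are read would be caught by an actual query run against a real checkpoint, rather than by poking at internal classes whose APIs may change.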