Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/21199#discussion_r206388466
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
---
@@ -300,6 +301,100 @@ class TextSocketStreamSuite extends StreamTest with
SharedSQLContext with Before
}
}
+ test("continuous data") {
+ serverThread = new ServerThread()
+ serverThread.start()
+
+ val reader = new TextSocketContinuousReader(
+ new DataSourceOptions(Map("numPartitions" -> "2", "host" ->
"localhost",
+ "port" -> serverThread.port.toString).asJava))
+ reader.setStartOffset(Optional.empty())
+ val tasks = reader.planRowInputPartitions()
+ assert(tasks.size == 2)
+
+ val numRecords = 10
+ val data = scala.collection.mutable.ListBuffer[Int]()
+ val offsets = scala.collection.mutable.ListBuffer[Int]()
+ import org.scalatest.time.SpanSugar._
+ failAfter(5 seconds) {
+ // inject rows, read and check the data and offsets
--- End diff --
Maybe adding more line comments in code block would help understanding the
test code easier, like intentionally committing in the middle of range, etc.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]