GitHub user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/21199#discussion_r206388213
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
@@ -300,6 +301,100 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
}
}
+ test("continuous data") {
+ serverThread = new ServerThread()
+ serverThread.start()
+
+ val reader = new TextSocketContinuousReader(
+ new DataSourceOptions(Map("numPartitions" -> "2", "host" ->
"localhost",
+ "port" -> serverThread.port.toString).asJava))
+ reader.setStartOffset(Optional.empty())
+ val tasks = reader.planRowInputPartitions()
+ assert(tasks.size == 2)
+
+ val numRecords = 10
+ val data = scala.collection.mutable.ListBuffer[Int]()
+ val offsets = scala.collection.mutable.ListBuffer[Int]()
+ import org.scalatest.time.SpanSugar._
+ failAfter(5 seconds) {
+ // inject rows, read and check the data and offsets
+ for (i <- 0 until numRecords) {
+ serverThread.enqueue(i.toString)
+ }
+ tasks.asScala.foreach {
+ case t: TextSocketContinuousInputPartition =>
+ val r =
t.createPartitionReader().asInstanceOf[TextSocketContinuousInputPartitionReader]
+ for (i <- 0 until numRecords / 2) {
+ r.next()
+
offsets.append(r.getOffset().asInstanceOf[ContinuousRecordPartitionOffset].offset)
+ data.append(r.get().getString(0).toInt)
+ if (i == 2) {
+ commitOffset(t.partitionId, i + 1)
+ }
+ }
+ assert(offsets.toSeq == Range.inclusive(1, 5))
+ assert(data.toSeq == Range(t.partitionId, 10, 2))
+ offsets.clear()
+ data.clear()
+ case _ => throw new IllegalStateException("Unexpected task type")
+ }
+ assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets
== List(3, 3))
+ reader.commit(TextSocketOffset(List(5, 5)))
+ assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets
== List(5, 5))
+ }
+
+ def commitOffset(partition: Int, offset: Int): Unit = {
+ val offsetsToCommit =
reader.getStartOffset.asInstanceOf[TextSocketOffset]
+ .offsets.updated(partition, offset)
+ reader.commit(TextSocketOffset(offsetsToCommit))
+ assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets
== offsetsToCommit)
+ }
+ }
+
+ test("continuous data - invalid commit") {
+ serverThread = new ServerThread()
+ serverThread.start()
+
+ val reader = new TextSocketContinuousReader(
+ new DataSourceOptions(Map("numPartitions" -> "2", "host" ->
"localhost",
+ "port" -> serverThread.port.toString).asJava))
+ reader.setStartOffset(Optional.of(TextSocketOffset(List(5, 5))))
+ // ok to commit same offset
+ reader.setStartOffset(Optional.of(TextSocketOffset(List(5, 5))))
+ assertThrows[IllegalStateException] {
+ reader.commit(TextSocketOffset(List(6, 6)))
+ }
+ }
+
+ test("continuous data with timestamp") {
+ serverThread = new ServerThread()
+ serverThread.start()
+
+ val reader = new TextSocketContinuousReader(
+ new DataSourceOptions(Map("numPartitions" -> "2", "host" ->
"localhost",
+ "includeTimestamp" -> "true",
+ "port" -> serverThread.port.toString).asJava))
+ reader.setStartOffset(Optional.empty())
+ val tasks = reader.planRowInputPartitions()
+ assert(tasks.size == 2)
+
+ val numRecords = 4
+ import org.apache.spark.sql.Row
--- End diff ---

Looks like an unused import.
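
To make the point concrete, here is a hypothetical, self-contained sketch (not code from this PR; `RowImportExample` is an invented name): an import of `org.apache.spark.sql.Row` is only live if `Row` appears by name, e.g. in a type ascription or a `Row(...)` factory call. Merely calling accessors on a value of that type, as the earlier tests in this suite do with `r.get().getString(0)`, leaves the import unused.

```scala
// Hypothetical illustration, not part of the PR.
import org.apache.spark.sql.Row

object RowImportExample {
  def main(args: Array[String]): Unit = {
    // The import is used here: Row appears by name, both as a type
    // ascription and via the Row.apply factory.
    val expected: Row = Row("5", 1533081600000L)

    // Accessor calls alone never mention the name Row, so if these were
    // the only references, the import above would be flagged as unused.
    val value = expected.getString(0)
    val ts = expected.getLong(1)
    println(s"value=$value ts=$ts")
  }
}
```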