Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r206388213

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -300,6 +301,100 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
         }
       }
     
    +  test("continuous data") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val reader = new TextSocketContinuousReader(
    +      new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
    +        "port" -> serverThread.port.toString).asJava))
    +    reader.setStartOffset(Optional.empty())
    +    val tasks = reader.planRowInputPartitions()
    +    assert(tasks.size == 2)
    +
    +    val numRecords = 10
    +    val data = scala.collection.mutable.ListBuffer[Int]()
    +    val offsets = scala.collection.mutable.ListBuffer[Int]()
    +    import org.scalatest.time.SpanSugar._
    +    failAfter(5 seconds) {
    +      // inject rows, read and check the data and offsets
    +      for (i <- 0 until numRecords) {
    +        serverThread.enqueue(i.toString)
    +      }
    +      tasks.asScala.foreach {
    +        case t: TextSocketContinuousInputPartition =>
    +          val r = t.createPartitionReader().asInstanceOf[TextSocketContinuousInputPartitionReader]
    +          for (i <- 0 until numRecords / 2) {
    +            r.next()
    +            offsets.append(r.getOffset().asInstanceOf[ContinuousRecordPartitionOffset].offset)
    +            data.append(r.get().getString(0).toInt)
    +            if (i == 2) {
    +              commitOffset(t.partitionId, i + 1)
    +            }
    +          }
    +          assert(offsets.toSeq == Range.inclusive(1, 5))
    +          assert(data.toSeq == Range(t.partitionId, 10, 2))
    +          offsets.clear()
    +          data.clear()
    +        case _ => throw new IllegalStateException("Unexpected task type")
    +      }
    +      assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets == List(3, 3))
    +      reader.commit(TextSocketOffset(List(5, 5)))
    +      assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets == List(5, 5))
    +    }
    +
    +    def commitOffset(partition: Int, offset: Int): Unit = {
    +      val offsetsToCommit = reader.getStartOffset.asInstanceOf[TextSocketOffset]
    +        .offsets.updated(partition, offset)
    +      reader.commit(TextSocketOffset(offsetsToCommit))
    +      assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets == offsetsToCommit)
    +    }
    +  }
    +
    +  test("continuous data - invalid commit") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val reader = new TextSocketContinuousReader(
    +      new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
    +        "port" -> serverThread.port.toString).asJava))
    +    reader.setStartOffset(Optional.of(TextSocketOffset(List(5, 5))))
    +    // ok to commit same offset
    +    reader.setStartOffset(Optional.of(TextSocketOffset(List(5, 5))))
    +    assertThrows[IllegalStateException] {
    +      reader.commit(TextSocketOffset(List(6, 6)))
    +    }
    +  }
    +
    +  test("continuous data with timestamp") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val reader = new TextSocketContinuousReader(
    +      new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
    +        "includeTimestamp" -> "true",
    +        "port" -> serverThread.port.toString).asJava))
    +    reader.setStartOffset(Optional.empty())
    +    val tasks = reader.planRowInputPartitions()
    +    assert(tasks.size == 2)
    +
    +    val numRecords = 4
    +    import org.apache.spark.sql.Row
    --- End diff ---
    
    Looks like an unused import.
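One way to catch this kind of leftover automatically is to enable the compiler's unused-import warning and make warnings fatal. The snippet below is only a sketch for a standalone sbt project on Scala 2.12; it is not Spark's actual build configuration.

    // build.sbt (illustrative standalone project, not Spark's real build definition)
    scalacOptions ++= Seq(
      "-Ywarn-unused:imports",  // warn when an import such as org.apache.spark.sql.Row is never referenced
      "-Xfatal-warnings"        // turn that warning into a compile error so it cannot slip through review
    )

With these options, compiling the test file above would fail until the unused import is removed (assuming the import really is unused elsewhere in the file).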