Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r206388213
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -300,6 +301,100 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
         }
       }
     
    +  test("continuous data") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val reader = new TextSocketContinuousReader(
    +      new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
    +        "port" -> serverThread.port.toString).asJava))
    +    reader.setStartOffset(Optional.empty())
    +    val tasks = reader.planRowInputPartitions()
    +    assert(tasks.size == 2)
    +
    +    val numRecords = 10
    +    val data = scala.collection.mutable.ListBuffer[Int]()
    +    val offsets = scala.collection.mutable.ListBuffer[Int]()
    +    import org.scalatest.time.SpanSugar._
    +    failAfter(5 seconds) {
    +      // inject rows, read and check the data and offsets
    +      for (i <- 0 until numRecords) {
    +        serverThread.enqueue(i.toString)
    +      }
    +      tasks.asScala.foreach {
    +        case t: TextSocketContinuousInputPartition =>
    +          val r = t.createPartitionReader().asInstanceOf[TextSocketContinuousInputPartitionReader]
    +          for (i <- 0 until numRecords / 2) {
    +            r.next()
    +            offsets.append(r.getOffset().asInstanceOf[ContinuousRecordPartitionOffset].offset)
    +            data.append(r.get().getString(0).toInt)
    +            if (i == 2) {
    +              commitOffset(t.partitionId, i + 1)
    +            }
    +          }
    +          assert(offsets.toSeq == Range.inclusive(1, 5))
    +          assert(data.toSeq == Range(t.partitionId, 10, 2))
    +          offsets.clear()
    +          data.clear()
    +        case _ => throw new IllegalStateException("Unexpected task type")
    +      }
    +      assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets == List(3, 3))
    +      reader.commit(TextSocketOffset(List(5, 5)))
    +      assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets == List(5, 5))
    +    }
    +
    +    def commitOffset(partition: Int, offset: Int): Unit = {
    +      val offsetsToCommit = reader.getStartOffset.asInstanceOf[TextSocketOffset]
    +        .offsets.updated(partition, offset)
    +      reader.commit(TextSocketOffset(offsetsToCommit))
    +      assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets == offsetsToCommit)
    +    }
    +  }
    +
    +  test("continuous data - invalid commit") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val reader = new TextSocketContinuousReader(
    +      new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
    +        "port" -> serverThread.port.toString).asJava))
    +    reader.setStartOffset(Optional.of(TextSocketOffset(List(5, 5))))
    +    // ok to commit same offset
    +    reader.setStartOffset(Optional.of(TextSocketOffset(List(5, 5))))
    +    assertThrows[IllegalStateException] {
    +      reader.commit(TextSocketOffset(List(6, 6)))
    +    }
    +  }
    +
    +  test("continuous data with timestamp") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val reader = new TextSocketContinuousReader(
    +      new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
    +        "includeTimestamp" -> "true",
    +        "port" -> serverThread.port.toString).asJava))
    +    reader.setStartOffset(Optional.empty())
    +    val tasks = reader.planRowInputPartitions()
    +    assert(tasks.size == 2)
    +
    +    val numRecords = 4
    +    import org.apache.spark.sql.Row
    --- End diff ---
    
    Looks like an unused import.
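    
    If `Row` really isn't referenced in the rest of the test (the diff is truncated here, so this assumes no later use), the fix is simply dropping that line. As an aside, not part of this PR: a minimal sketch of how unused imports can be caught at compile time in an sbt build, assuming Scala 2.11/2.12 where `-Ywarn-unused-import` is available:
    
    ```scala
    // build.sbt (sketch): surface unused imports as compile errors.
    // -Ywarn-unused-import emits a warning for each unused import;
    // -Xfatal-warnings promotes those warnings to errors so the build fails.
    scalacOptions ++= Seq("-Ywarn-unused-import", "-Xfatal-warnings")
    ```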

