Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r167127039
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala ---
    @@ -103,23 +111,40 @@ class TextSocketSource(host: String, port: Int, 
includeTimestamp: Boolean, sqlCo
         readThread.start()
       }
     
    -  /** Returns the schema of the data from this source */
    -  override def schema: StructType = if (includeTimestamp) 
TextSocketSource.SCHEMA_TIMESTAMP
    -  else TextSocketSource.SCHEMA_REGULAR
    +  override def setOffsetRange(
    +      start: Optional[Offset],
    +      end: Optional[Offset]): Unit = synchronized {
    +    startOffset = start.orElse(LongOffset(-1L))
    +    endOffset = end.orElse(currentOffset)
    +  }
     
    -  override def getOffset: Option[Offset] = synchronized {
    -    if (currentOffset.offset == -1) {
    -      None
    +  override def getStartOffset(): Offset = {
    +    Option(startOffset).getOrElse(throw new IllegalStateException("start 
offset not set"))
    +  }
    +
    +  override def getEndOffset(): Offset = {
    +    Option(endOffset).getOrElse(throw new IllegalStateException("end 
offset not set"))
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    LongOffset(json.toLong)
    +  }
    +
    +  override def readSchema(): StructType = {
    +    val includeTimestamp = options.getBoolean("includeTimestamp", false)
    --- End diff --
    
    supernit: is there a need for a variable here?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to