Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r167128485
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala ---
    @@ -164,54 +213,43 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
         }
       }
     
    -  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
    +  override def toString: String = s"TextSocketMicroBatchReader[host: $host, port: $port]"
     }
     
    -class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
    -  private def parseIncludeTimestamp(params: Map[String, String]): Boolean = {
    -    Try(params.getOrElse("includeTimestamp", "false").toBoolean) match {
    -      case Success(bool) => bool
    -      case Failure(_) =>
    -        throw new AnalysisException("includeTimestamp must be set to either \"true\" or \"false\"")
    -    }
    -  }
    +class TextSocketSourceProvider extends DataSourceV2
    +  with MicroBatchReadSupport with DataSourceRegister with Logging {
     
    -  /** Returns the name and schema of the source that can be used to continually read data. */
    -  override def sourceSchema(
    -      sqlContext: SQLContext,
    -      schema: Option[StructType],
    -      providerName: String,
    -      parameters: Map[String, String]): (String, StructType) = {
    +  private def checkParameters(params: Map[String, String]): Unit = {
         logWarning("The socket source should not be used for production 
applications! " +
           "It does not support recovery.")
    -    if (!parameters.contains("host")) {
    +    if (!params.contains("host")) {
           throw new AnalysisException("Set a host to read from with 
option(\"host\", ...).")
         }
    -    if (!parameters.contains("port")) {
    +    if (!params.contains("port")) {
           throw new AnalysisException("Set a port to read from with 
option(\"port\", ...).")
         }
    -    if (schema.nonEmpty) {
    -      throw new AnalysisException("The socket source does not support a 
user-specified schema.")
    +    Try {
    +      params.get("includeTimestamp")
    +        .orElse(params.get("includetimestamp"))
    +        .getOrElse("false")
    +        .toBoolean
    +    } match {
    +      case Success(_) =>
    +      case Failure(_) =>
    +        throw new AnalysisException("includeTimestamp must be set to either \"true\" or \"false\"")
         }
    -
    -    val sourceSchema =
    -      if (parseIncludeTimestamp(parameters)) {
    -        TextSocketSource.SCHEMA_TIMESTAMP
    -      } else {
    -        TextSocketSource.SCHEMA_REGULAR
    -      }
    -    ("textSocket", sourceSchema)
       }
     
    -  override def createSource(
    -      sqlContext: SQLContext,
    -      metadataPath: String,
    -      schema: Option[StructType],
    -      providerName: String,
    -      parameters: Map[String, String]): Source = {
    -    val host = parameters("host")
    -    val port = parameters("port").toInt
    -    new TextSocketSource(host, port, parseIncludeTimestamp(parameters), sqlContext)
    +  override def createMicroBatchReader(
    +      schema: Optional[StructType],
    +      checkpointLocation: String,
    +      options: DataSourceOptions): MicroBatchReader = {
    +    checkParameters(options.asMap().asScala.toMap)
    --- End diff --
    
    Why not check it as DataSourceOptions (which is known to be case-insensitive) rather than as a map, which raises questions about case sensitivity?
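
    A minimal sketch of that suggestion, validating directly against DataSourceOptions (assuming its get(key) accessor, which returns a java.util.Optional[String] and resolves keys case-insensitively, so the explicit "includetimestamp" fallback goes away):

        private def checkParameters(options: DataSourceOptions): Unit = {
          logWarning("The socket source should not be used for production applications! " +
            "It does not support recovery.")
          if (!options.get("host").isPresent) {
            throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
          }
          if (!options.get("port").isPresent) {
            throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
          }
          // Parse eagerly so a value like "yes" fails analysis instead of
          // silently defaulting to false.
          Try(options.get("includeTimestamp").orElse("false").toBoolean) match {
            case Success(_) =>
            case Failure(_) =>
              throw new AnalysisException(
                "includeTimestamp must be set to either \"true\" or \"false\"")
          }
        }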

