cloud-fan commented on a change in pull request #23619: [SPARK-26695][SQL] data source v2 API refactor - continuous read
URL: https://github.com/apache/spark/pull/23619#discussion_r250042260
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
 ##########
 @@ -31,37 +31,29 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.streaming.{Offset => _, _}
 import org.apache.spark.sql.execution.streaming.sources.TextSocketReader
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming._
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.RpcUtils
 
 
 /**
- * A ContinuousReadSupport that reads text lines through a TCP socket, designed only for tutorials
- * and debugging. This ContinuousReadSupport will *not* work in production applications due to
+ * A [[ContinuousStream]] that reads text lines through a TCP socket, designed only for tutorials
+ * and debugging. This ContinuousStream will *not* work in production applications due to
  * multiple reasons, including no support for fault recovery.
  *
  * The driver maintains a socket connection to the host-port, keeps the received messages in
  * buckets and serves the messages to the executors via a RPC endpoint.
  */
-class TextSocketContinuousReadSupport(options: DataSourceOptions)
-  extends ContinuousReadSupport with Logging {
+class TextSocketContinuousStream(
+    host: String, port: Int, numPartitions: Int, options: DataSourceOptions)
+  extends ContinuousStream with Logging {
 
   implicit val defaultFormats: DefaultFormats = DefaultFormats
 
-  private val host: String = options.get("host").get()
-  private val port: Int = options.get("port").get().toInt
-
-  assert(SparkSession.getActiveSession.isDefined)
-  private val spark = SparkSession.getActiveSession.get
-  private val numPartitions = spark.sparkContext.defaultParallelism
 
 Review comment:
   see https://github.com/apache/spark/pull/23619/files#r250041943
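
  For context, the diff moves option parsing out of the stream class: host, port and numPartitions become constructor parameters instead of being derived from DataSourceOptions and the active SparkSession inside the class body. A minimal sketch of how a caller might now construct the stream follows; the createContinuousStream helper and the "numPartitions" option with its default are assumptions for illustration, not part of this PR:

    import org.apache.spark.sql.sources.v2.DataSourceOptions

    // Hypothetical factory on the caller side: parses the required options
    // up front, so a missing option fails fast before the stream exists.
    def createContinuousStream(options: DataSourceOptions): TextSocketContinuousStream = {
      // Same parsing the class previously did internally; Optional.get()
      // throws if "host" or "port" is absent.
      val host = options.get("host").get()
      val port = options.get("port").get().toInt
      // Assumption: numPartitions comes from an option with an arbitrary
      // default, rather than spark.sparkContext.defaultParallelism as before.
      val numPartitions = options.getInt("numPartitions", 2)
      new TextSocketContinuousStream(host, port, numPartitions, options)
    }

  This keeps the stream class free of SparkSession lookups, which is the point of the constructor-parameter refactor shown above.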
