pengzhon-db commented on code in PR #40586:
URL: https://github.com/apache/spark/pull/40586#discussion_r1154944439
##########
python/pyspark/sql/connect/client.py:
##########
@@ -949,6 +949,10 @@ def _execute_and_fetch_as_iterator(
if b.HasField("sql_command_result"):
logger.debug("Received the SQL command result.")
yield {"sql_command_result":
b.sql_command_result.relation}
+ if b.HasField("write_stream_operation_start_result"):
+ yield {"write_stream_operation_start_result":
b.write_stream_operation_start_result}
Review Comment:
Do you want to add a `logger.debug` call here, as in the other cases?
##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -177,3 +179,118 @@ message WriteOperationV2 {
// (Optional) A condition for overwrite saving mode
Expression overwrite_condition = 8;
}
+
+// Starts write stream operation as streaming query. Query ID and Run ID of
the streaming
+// query are returned.
+message WriteStreamOperationStart {
+
+ // (Required) The output of the `input` streaming relation will be written.
+ Relation input = 1;
+
+ // The following fields directly map to API for DataStreamWriter().
+ // Consult API documentation unless explicitly documented here.
+
+ string format = 2;
+ map<string, string> options = 3;
+ repeated string partitioning_column_names = 4;
+
+ oneof trigger {
+ string processing_time_interval = 5;
+ bool available_now = 6;
+ bool one_time = 7;
+ string continuous_checkpoint_interval = 8;
+ }
+
+ string output_mode = 9;
+ string query_name = 10;
+
+ // The destination is optional. When set, it can be a path or a table name.
+ oneof sink_destination {
+ string path = 11;
+ string table_name = 12;
+ }
+}
+
+message WriteStreamOperationStartResult {
+
+ // (Required)
+ string query_id = 1;
+
+ // (Required)
+ string run_id = 2;
+
+ // An optional query name.
+ string name = 3;
+
+ // TODO: How do we indicate errors?
+ // TODO: Consider adding StreamingQueryStatusResult here.
+}
+
+// Commands for a streaming query.
+message StreamingQueryCommand {
+
+ // (Required) query id of the streaming query.
+ string query_id = 1;
+ // (Required) run id of the streaming query.
+ string run_id = 2;
+
+ // A running query is identified by both run_id and query_id.
+
+ oneof command_type {
+ // Status of the query. Used to support multiple status related API like
lastProgress().
+ StatusCommand status = 3;
+ // Stops the query.
+ bool stop = 4;
+ // Waits till all the available data is processed. See
processAllAvailable() API doc.
+ bool process_all_available = 5;
+ // Returns logical and physical plans.
+ ExplainCommand explain = 6;
+
+ // TODO(SPARK-42960) Add more commands: await_termination(), exception()
etc.
+ }
+
+ message StatusCommand {
+ // A limit on how many progress reports to return.
+ int32 recent_progress_limit = 1;
+ }
+
+ message ExplainCommand {
+ // TODO: Consider reusing Explain from AnalyzePlanRequest message.
+ // We can not do this right now since it base.proto imports this
file.
+ bool extended = 1;
+ }
+
+}
+
+// Response for commands on a streaming query.
+message StreamingQueryCommandResult {
+ // (Required)
+ string query_id = 1;
+
+ oneof result_type {
+ StatusResult status = 2;
+ ExplainResult explain = 3;
+ }
+
+ message StatusResult {
+ // This status includes all the available to status, including progress
messages.
+
+ // Fields from Scala 'StreamingQueryStatus' struct
+ string status_message = 1;
+ bool is_data_available = 2;
+ bool is_trigger_active = 3;
+
+ bool is_active = 4;
+
+ // Progress reports as an array of json strings.
+ repeated string recent_progress_json = 5;
Review Comment:
Would it be better to separate progress from status? The status of a query
only contains the first 3 fields.
##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -177,3 +179,118 @@ message WriteOperationV2 {
// (Optional) A condition for overwrite saving mode
Expression overwrite_condition = 8;
}
+
+// Starts write stream operation as streaming query. Query ID and Run ID of
the streaming
+// query are returned.
+message WriteStreamOperationStart {
+
+ // (Required) The output of the `input` streaming relation will be written.
+ Relation input = 1;
+
+ // The following fields directly map to API for DataStreamWriter().
+ // Consult API documentation unless explicitly documented here.
+
+ string format = 2;
+ map<string, string> options = 3;
+ repeated string partitioning_column_names = 4;
+
+ oneof trigger {
+ string processing_time_interval = 5;
+ bool available_now = 6;
+ bool one_time = 7;
+ string continuous_checkpoint_interval = 8;
+ }
+
+ string output_mode = 9;
+ string query_name = 10;
+
+ // The destination is optional. When set, it can be a path or a table name.
+ oneof sink_destination {
+ string path = 11;
+ string table_name = 12;
+ }
+}
+
+message WriteStreamOperationStartResult {
+
+ // (Required)
+ string query_id = 1;
+
+ // (Required)
+ string run_id = 2;
+
+ // An optional query name.
+ string name = 3;
+
+ // TODO: How do we indicate errors?
+ // TODO: Consider adding StreamingQueryStatusResult here.
+}
+
+// Commands for a streaming query.
+message StreamingQueryCommand {
+
+ // (Required) query id of the streaming query.
+ string query_id = 1;
+ // (Required) run id of the streaming query.
+ string run_id = 2;
+
+ // A running query is identified by both run_id and query_id.
+
+ oneof command_type {
+ // Status of the query. Used to support multiple status related API like
lastProgress().
+ StatusCommand status = 3;
+ // Stops the query.
+ bool stop = 4;
+ // Waits till all the available data is processed. See
processAllAvailable() API doc.
+ bool process_all_available = 5;
+ // Returns logical and physical plans.
+ ExplainCommand explain = 6;
+
+ // TODO(SPARK-42960) Add more commands: await_termination(), exception()
etc.
+ }
+
+ message StatusCommand {
+ // A limit on how many progress reports to return.
+ int32 recent_progress_limit = 1;
+ }
+
+ message ExplainCommand {
+ // TODO: Consider reusing Explain from AnalyzePlanRequest message.
+ // We can not do this right now since it base.proto imports this
file.
+ bool extended = 1;
+ }
+
+}
+
+// Response for commands on a streaming query.
+message StreamingQueryCommandResult {
+ // (Required)
+ string query_id = 1;
Review Comment:
Why does the other response have `run_id` but this one doesn't?
##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -154,6 +158,9 @@ message Read {
//
// This is only supported by the JDBC data source.
repeated string predicates = 5;
+
+ // (Optional) Source table name for a streaming read. Not used in batch
read.
+ string streaming_table_name = 6;
Review Comment:
I think batch can also read from a table. How does that work? Can we reuse
that?
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1969,6 +2017,142 @@ class SparkConnectPlanner(val session: SparkSession) {
}
}
+ def handleWriteStreamOperationStart(
+ writeOp: WriteStreamOperationStart,
+ sessionId: String,
+ responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+ val plan = transformRelation(writeOp.getInput)
+ val dataset = Dataset.ofRows(session, logicalPlan = plan)
+
+ val writer = dataset.writeStream
+
+ if (writeOp.getFormat.nonEmpty) {
+ writer.format(writeOp.getFormat)
+ }
+
+ writer.options(writeOp.getOptionsMap)
+
+ if (writeOp.getPartitioningColumnNamesCount > 0) {
+
writer.partitionBy(writeOp.getPartitioningColumnNamesList.asScala.toList: _*)
+ }
+
+ writeOp.getTriggerCase match {
+ case TriggerCase.PROCESSING_TIME_INTERVAL =>
+
writer.trigger(Trigger.ProcessingTime(writeOp.getProcessingTimeInterval))
+ case TriggerCase.AVAILABLE_NOW =>
+ writer.trigger(Trigger.AvailableNow())
+ case TriggerCase.ONE_TIME =>
+ writer.trigger(Trigger.Once())
+ case TriggerCase.CONTINUOUS_CHECKPOINT_INTERVAL =>
+
writer.trigger(Trigger.Continuous(writeOp.getContinuousCheckpointInterval))
+ case TriggerCase.TRIGGER_NOT_SET =>
+ }
+
+ if (writeOp.getOutputMode.nonEmpty) {
+ writer.outputMode(writeOp.getOutputMode)
+ }
+
+ if (writeOp.getQueryName.nonEmpty) {
+ writer.queryName(writeOp.getQueryName)
+ }
+
+ val query = writeOp.getPath match {
+ case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
+ case "" => writer.start()
+ case path => writer.start(path)
+ }
+
+ val result = WriteStreamOperationStartResult
+ .newBuilder()
+ .setQueryId(query.id.toString)
+ .setRunId(query.runId.toString)
+ .setName(Option(query.name).getOrElse(""))
+ .build()
+
+ responseObserver.onNext(
+ ExecutePlanResponse
+ .newBuilder()
+ .setSessionId(sessionId)
+ .setWriteStreamOperationStartResult(result)
+ .build())
+ }
+
+ def handleStreamingQueryCommand(
+ command: StreamingQueryCommand,
+ sessionId: String,
+ responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+
+ val queryId = command.getQueryId
+
+ val respBuilder = StreamingQueryCommandResult
+ .newBuilder()
+ .setQueryId(command.getQueryId)
+
+ val query = Option(session.streams.get(queryId)) match {
+ case Some(query) if query.runId.toString == command.getRunId =>
+ query
+ case Some(query) =>
+ throw new IllegalArgumentException(
+ s"Run id mismatch for query id $queryId. Run id in the request
${command.getRunId} " +
+ s"does not match one on the server ${query.runId}. The query might
have restarted.")
+ case None =>
+ throw new IllegalArgumentException(s"Streaming query $queryId is not
found")
+ // TODO(SPARK-42962): Handle this better. May be cache stopped queries
for a few minutes.
+ }
+
+ command.getCommandTypeCase match {
+ case StreamingQueryCommand.CommandTypeCase.STATUS =>
+ val recentProgress: Seq[String] =
command.getStatus.getRecentProgressLimit match {
+ case 0 => Seq.empty
+ case limit if limit < 0 =>
+ query.recentProgress.map(_.json) // All the cached progresses.
+ case limit => query.recentProgress.takeRight(limit).map(_.json) //
Most recent
+ }
+
+ val queryStatus = query.status
+
+ val statusResult = StreamingQueryCommandResult.StatusResult
+ .newBuilder()
+ .setStatusMessage(queryStatus.message)
+ .setIsDataAvailable(queryStatus.isDataAvailable)
+ .setIsTriggerActive(queryStatus.isTriggerActive)
+ .setIsActive(query.isActive)
+ .addAllRecentProgressJson(recentProgress.asJava)
+ .build()
+
+ respBuilder.setStatus(statusResult)
+
+ case StreamingQueryCommand.CommandTypeCase.STOP =>
+ query.stop()
+
+ case StreamingQueryCommand.CommandTypeCase.PROCESS_ALL_AVAILABLE =>
+ query.processAllAvailable()
Review Comment:
This method may take a long time to return if data is continually arriving.
How should we handle that? Will the RPC call time out?
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -816,6 +836,27 @@ class SparkConnectPlanner(val session: SparkSession) {
s"Predicates are not supported for ${rel.getDataSource.getFormat}
data sources.")
}
+ case proto.Read.ReadTypeCase.DATA_SOURCE if rel.getIsStreaming =>
+ val streamSource = rel.getDataSource
+ val reader = session.readStream
+ if (streamSource.hasFormat) {
Review Comment:
Could `hasFormat` be false? What happens in that case?
##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -177,3 +179,118 @@ message WriteOperationV2 {
// (Optional) A condition for overwrite saving mode
Expression overwrite_condition = 8;
}
+
+// Starts write stream operation as streaming query. Query ID and Run ID of
the streaming
+// query are returned.
+message WriteStreamOperationStart {
+
+ // (Required) The output of the `input` streaming relation will be written.
+ Relation input = 1;
+
+ // The following fields directly map to API for DataStreamWriter().
+ // Consult API documentation unless explicitly documented here.
+
+ string format = 2;
+ map<string, string> options = 3;
+ repeated string partitioning_column_names = 4;
+
+ oneof trigger {
+ string processing_time_interval = 5;
+ bool available_now = 6;
+ bool one_time = 7;
Review Comment:
Why call this `one_time` instead of `once`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]