rangadi commented on code in PR #40783:
URL: https://github.com/apache/spark/pull/40783#discussion_r1169162842


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -453,11 +466,12 @@ class SparkSession private[sql] (
     client.execute(plan).asScala.foreach(_ => ())
   }
 
-  private[sql] def execute(command: proto.Command): Unit = {
+  private[sql] def execute(command: proto.Command): Seq[ExecutePlanResponse] = 
{

Review Comment:
   This can be used for RPCs where the response is required (RemoteStreamingQuery 
methods use this).
   Alternatively, we could add another method, but that does not seem necessary. It is better 
for callers to explicitly ignore the responses when they are not needed. 



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.Futures.timeout
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.SQLHelper
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.functions.window
+
+class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {

Review Comment:
   Thanks @hvanhovell. Looking forward to these improvements. That will be 
awesome. 



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.connect.proto.Read.DataSource
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Interface used to load a streaming `Dataset` from external storage systems 
(e.g. file systems,
+ * key-value stores, etc). Use `SparkSession.readStream` to access this.
+ *
+ * @since 3.5.0
+ */
+@Evolving
+final class DataStreamReader private[sql] (sparkSession: SparkSession) {
+
+  /**
+   * Specifies the input data source format.
+   *
+   * @since 3.5.0
+   */
+  def format(source: String): DataStreamReader = {
+    sourceBuilder.setFormat(source)
+    this
+  }
+
+  /**
+   * Specifies the input schema. Some data sources (e.g. JSON) can infer the 
input schema
+   * automatically from data. By specifying the schema here, the underlying 
data source can skip
+   * the schema inference step, and thus speed up data loading.
+   *
+   * @since 3.5.0
+   */
+  def schema(schema: StructType): DataStreamReader = {
+    if (schema != null) {
+      sourceBuilder.setSchema(schema.json) // Use json. DDL does not retain 
all the attributes.

Review Comment:
   `toDDL` is a lossy conversion (e.g. it does not contain nullability of an 
element in an array). I went with json because it is more complete.
   Ideally, the contract for `toDDL` should explicitly mention these (or we 
could make it non-lossy).



##########
connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;

Review Comment:
   Sounds good. I left the comment here for reviewers. 



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.Futures.timeout
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.SQLHelper
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.functions.window
+
+class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
+
+  test("Streaming API with windowed aggregate query") {
+    // This verifies standard streaming API by starting a streaming query with 
windowed count.
+    withSQLConf(
+      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+    ) {
+      val readDF = spark.readStream
+        .format("rate")
+        .option("rowsPerSecond", "10")
+        .option("numPartitions", "1")
+        .load()
+
+      // Verify schema (results in an RPC).
+      assert(readDF.schema.toDDL == "timestamp TIMESTAMP,value BIGINT")
+
+      val countsDF = readDF
+        .withWatermark("timestamp", "10 seconds")
+        .groupBy(window(col("timestamp"), "5 seconds"))
+        .count()
+        .selectExpr("window.start as timestamp", "count as num_events")
+
+      assert(countsDF.schema.toDDL == "timestamp TIMESTAMP,num_events BIGINT 
NOT NULL")
+
+      // Start the query
+      val queryName = "sparkConnectStreamingQuery"
+
+      val query = countsDF.writeStream
+        .format("memory")
+        .queryName(queryName)
+        .trigger(Trigger.ProcessingTime("1 second"))
+        .start()
+
+      // Verify some of the API.
+      assert(query.isActive)
+
+      eventually(timeout(10.seconds)) {
+        assert(query.status.isDataAvailable)
+        assert(query.recentProgress.length > 0) // Query made progress.
+      }
+
+      query.explain() // Prints the plan to console.
+
+      // Don't wait for any processed data. Otherwise the test could take 
multiple seconds.
+      query.stop()

Review Comment:
   Done.



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.Futures.timeout
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.SQLHelper
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.functions.window
+
+class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
+
+  test("Streaming API with windowed aggregate query") {
+    // This verifies standard streaming API by starting a streaming query with 
windowed count.
+    withSQLConf(
+      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+    ) {
+      val readDF = spark.readStream
+        .format("rate")
+        .option("rowsPerSecond", "10")
+        .option("numPartitions", "1")
+        .load()
+
+      // Verify schema (results in an RPC).
+      assert(readDF.schema.toDDL == "timestamp TIMESTAMP,value BIGINT")
+
+      val countsDF = readDF
+        .withWatermark("timestamp", "10 seconds")
+        .groupBy(window(col("timestamp"), "5 seconds"))
+        .count()
+        .selectExpr("window.start as timestamp", "count as num_events")
+
+      assert(countsDF.schema.toDDL == "timestamp TIMESTAMP,num_events BIGINT 
NOT NULL")
+
+      // Start the query
+      val queryName = "sparkConnectStreamingQuery"
+
+      val query = countsDF.writeStream
+        .format("memory")
+        .queryName(queryName)
+        .trigger(Trigger.ProcessingTime("1 second"))
+        .start()
+
+      // Verify some of the API.
+      assert(query.isActive)
+
+      eventually(timeout(10.seconds)) {
+        assert(query.status.isDataAvailable)
+        assert(query.recentProgress.length > 0) // Query made progress.

Review Comment:
   Fixed.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.Locale
+import java.util.concurrent.TimeoutException
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.connect.proto.Command
+import org.apache.spark.connect.proto.WriteStreamOperationStart
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.streaming.AvailableNowTrigger
+import org.apache.spark.sql.execution.streaming.ContinuousTrigger
+import org.apache.spark.sql.execution.streaming.OneTimeTrigger
+import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger
+
+/**
+ * Interface used to write a streaming `Dataset` to external storage systems 
(e.g. file systems,
+ * key-value stores, etc). Use `Dataset.writeStream` to access this.
+ *
+ * @since 3.5.0
+ */
+@Evolving
+final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) {
+
+  /**
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink. <ul> <li>
+   * `OutputMode.Append()`: only the new rows in the streaming 
DataFrame/Dataset will be written
+   * to the sink.</li> <li> `OutputMode.Complete()`: all the rows in the 
streaming
+   * DataFrame/Dataset will be written to the sink every time there are some 
updates.</li> <li>
+   * `OutputMode.Update()`: only the rows that were updated in the streaming 
DataFrame/Dataset
+   * will be written to the sink every time there are some updates. If the 
query doesn't contain
+   * aggregations, it will be equivalent to `OutputMode.Append()` mode.</li> 
</ul>
+   *
+   * @since 3.5.0
+   */
+  def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
+    sinkBuilder.setOutputMode(outputMode.toString.toLowerCase(Locale.ROOT))
+    this
+  }
+
+  /**
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink. <ul> <li>
+   * `append`: only the new rows in the streaming DataFrame/Dataset will be 
written to the
+   * sink.</li> <li> `complete`: all the rows in the streaming 
DataFrame/Dataset will be written
+   * to the sink every time there are some updates.</li> <li> `update`: only 
the rows that were
+   * updated in the streaming DataFrame/Dataset will be written to the sink 
every time there are
+   * some updates. If the query doesn't contain aggregations, it will be 
equivalent to `append`
+   * mode.</li> </ul>
+   *
+   * @since 3.5.0
+   */
+  def outputMode(outputMode: String): DataStreamWriter[T] = {
+    sinkBuilder.setOutputMode(outputMode)
+    this
+  }
+
+  /**
+   * Set the trigger for the stream query. The default value is 
`ProcessingTime(0)` and it will
+   * run the query as fast as possible.
+   *
+   * Scala Example:
+   * {{{
+   *   df.writeStream.trigger(ProcessingTime("10 seconds"))
+   *
+   *   import scala.concurrent.duration._
+   *   df.writeStream.trigger(ProcessingTime(10.seconds))
+   * }}}
+   *
+   * Java Example:
+   * {{{
+   *   df.writeStream().trigger(ProcessingTime.create("10 seconds"))
+   *
+   *   import java.util.concurrent.TimeUnit
+   *   df.writeStream().trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 3.5.0
+   */
+  def trigger(trigger: Trigger): DataStreamWriter[T] = {
+    trigger match {
+      case ProcessingTimeTrigger(intervalMs) =>
+        sinkBuilder.setProcessingTimeInterval(s"$intervalMs milliseconds")
+      case AvailableNowTrigger =>
+        sinkBuilder.setAvailableNow(true)
+      case OneTimeTrigger =>
+        sinkBuilder.setOnce(true)
+      case ContinuousTrigger(intervalMs) =>
+        sinkBuilder.setContinuousCheckpointInterval(s"$intervalMs 
milliseconds")
+    }
+    this
+  }
+
+  /**
+   * Specifies the name of the [[StreamingQuery]] that can be started with 
`start()`. This name
+   * must be unique among all the currently active queries in the associated 
SQLContext.
+   *
+   * @since 3.5.0
+   */
+  def queryName(queryName: String): DataStreamWriter[T] = {
+    sinkBuilder.setQueryName(queryName)
+    this
+  }
+
+  /**
+   * Specifies the underlying output data source.
+   *
+   * @since 3.5.0
+   */
+  def format(source: String): DataStreamWriter[T] = {
+    sinkBuilder.setFormat(source)
+    this
+  }
+
+  /**
+   * Partitions the output by the given columns on the file system. If 
specified, the output is
+   * laid out on the file system similar to Hive's partitioning scheme. As an 
example, when we
+   * partition a dataset by year and then month, the directory layout would 
look like:
+   *
+   * <ul> <li> year=2016/month=01/</li> <li> year=2016/month=02/</li> </ul>
+   *
+   * Partitioning is one of the most widely used techniques to optimize 
physical data layout. It
+   * provides a coarse-grained index for skipping unnecessary data reads when 
queries have
+   * predicates on the partitioned columns. In order for partitioning to work 
well, the number of
+   * distinct values in each column should typically be less than tens of 
thousands.
+   *
+   * @since 3.5.0
+   */
+  @scala.annotation.varargs
+  def partitionBy(colNames: String*): DataStreamWriter[T] = {
+    sinkBuilder.clearPartitioningColumnNames()
+    sinkBuilder.addAllPartitioningColumnNames(colNames.asJava)
+    this
+  }
+
+  /**
+   * Adds an output option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: String): DataStreamWriter[T] = {
+    sinkBuilder.putOptions(key, value)
+    this
+  }
+
+  /**
+   * Adds an output option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Boolean): DataStreamWriter[T] = option(key, 
value.toString)
+
+  /**
+   * Adds an output option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Long): DataStreamWriter[T] = option(key, 
value.toString)
+
+  /**
+   * Adds an output option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Double): DataStreamWriter[T] = option(key, 
value.toString)
+
+  /**
+   * (Scala-specific) Adds output options for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def options(options: scala.collection.Map[String, String]): 
DataStreamWriter[T] = {
+    this.options(options.asJava)
+    this
+  }
+
+  /**
+   * Adds output options for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def options(options: java.util.Map[String, String]): DataStreamWriter[T] = {
+    sinkBuilder.putAllOptions(options)
+    this
+  }
+
+  /**
+   * Starts the execution of the streaming query, which will continually 
output results to the
+   * given path as new data arrives. The returned [[StreamingQuery]] object 
can be used to
+   * interact with the stream.
+   *
+   * @since 3.5.0
+   */
+  def start(path: String): StreamingQuery = {
+    sinkBuilder.setPath(path)
+    start()
+  }
+
+  /**
+   * Starts the execution of the streaming query, which will continually 
output results to the
+   * given path as new data arrives. The returned [[StreamingQuery]] object 
can be used to
+   * interact with the stream. Throws a `TimeoutException` if the following 
conditions are met:
+   *   - Another run of the same streaming query, that is a streaming query 
sharing the same
+   *     checkpoint location, is already active on the same Spark Driver
+   *   - The SQL configuration `spark.sql.streaming.stopActiveRunOnRestart` is 
enabled
+   *   - The active run cannot be stopped within the timeout controlled by the 
SQL configuration
+   *     `spark.sql.streaming.stopTimeout`
+   *
+   * @since 3.5.0
+   */
+  @throws[TimeoutException]
+  def start(): StreamingQuery = {
+    val startCmd = Command
+      .newBuilder()
+      .setWriteStreamOperationStart(sinkBuilder.build())
+      .build()
+
+    val resp = ds.sparkSession.execute(startCmd).head

Review Comment:
   The `start()` method is on `DataStreamWriter`, so it is better to start the query here and create 
the `RemoteStreamingQuery` from the response. 



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3017,8 +3028,37 @@ class Dataset[T] private[sql] (
         .getStorageLevel)
   }
 
+  /**
+   * Defines an event time watermark for this [[Dataset]]. A watermark tracks 
a point in time
+   * before which we assume no more late data is going to arrive.
+   *
+   * Spark will use this watermark for several purposes: <ul> <li>To know when 
a given time window
+   * aggregation can be finalized and thus can be emitted when using output 
modes that do not
+   * allow updates.</li> <li>To minimize the amount of state that we need to 
keep for on-going
+   * aggregations, `mapGroupsWithState` and `dropDuplicates` operators.</li> 
</ul> The current
+   * watermark is computed by looking at the `MAX(eventTime)` seen across all 
of the partitions in
+   * the query minus a user specified `delayThreshold`. Due to the cost of 
coordinating this value
+   * across partitions, the actual watermark used is only guaranteed to be at 
least
+   * `delayThreshold` behind the actual event time. In some cases we may still 
process records
+   * that arrive more than `delayThreshold` late.
+   *
+   * @param eventTime
+   *   the name of the column that contains the event time of the row.
+   * @param delayThreshold
+   *   the minimum delay to wait for data to arrive late, relative to the 
latest record that has
+   *   been processed in the form of an interval (e.g. "1 minute" or "5 
hours"). NOTE: This should
+   *   not be negative.
+   *
+   * @group streaming
+   * @since 3.5.0
+   */
   def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = {
-    throw new UnsupportedOperationException("withWatermark is not 
implemented.")
+    sparkSession.newDataset(encoder) { builder =>

Review Comment:
   That requires parsing the string. We will let the server do it, the same as in 
the Python client.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.connect.proto.Read.DataSource
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Interface used to load a streaming `Dataset` from external storage systems 
(e.g. file systems,
+ * key-value stores, etc). Use `SparkSession.readStream` to access this.
+ *
+ * @since 3.5.0
+ */
+@Evolving
+final class DataStreamReader private[sql] (sparkSession: SparkSession) {
+
+  /**
+   * Specifies the input data source format.
+   *
+   * @since 3.5.0
+   */
+  def format(source: String): DataStreamReader = {
+    sourceBuilder.setFormat(source)
+    this
+  }
+
+  /**
+   * Specifies the input schema. Some data sources (e.g. JSON) can infer the 
input schema
+   * automatically from data. By specifying the schema here, the underlying 
data source can skip
+   * the schema inference step, and thus speed up data loading.
+   *
+   * @since 3.5.0
+   */
+  def schema(schema: StructType): DataStreamReader = {
+    if (schema != null) {
+      sourceBuilder.setSchema(schema.json) // Use json. DDL does not retain 
all the attributes.
+    }
+    this
+  }
+
+  /**
+   * Specifies the schema by using the input DDL-formatted string. Some data 
sources (e.g. JSON)
+   * can infer the input schema automatically from data. By specifying the 
schema here, the
+   * underlying data source can skip the schema inference step, and thus speed 
up data loading.
+   *
+   * @since 3.5.0
+   */
+  def schema(schemaString: String): DataStreamReader = {
+    sourceBuilder.setSchema(schemaString)
+    this
+  }
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: String): DataStreamReader = {
+    sourceBuilder.putOptions(key, value)
+    this
+  }
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Boolean): DataStreamReader = option(key, 
value.toString)
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Long): DataStreamReader = option(key, 
value.toString)
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Double): DataStreamReader = option(key, 
value.toString)
+
+  /**
+   * (Scala-specific) Adds input options for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def options(options: scala.collection.Map[String, String]): DataStreamReader 
= {
+    this.options(options.asJava)
+    this
+  }
+
+  /**
+   * (Java-specific) Adds input options for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def options(options: java.util.Map[String, String]): DataStreamReader = {
+    sourceBuilder.putAllOptions(options)
+    this
+  }
+
+  /**
+   * Loads input data stream in as a `DataFrame`, for data streams that don't 
require a path (e.g.
+   * external key-value stores).
+   *
+   * @since 3.5.0
+   */
+  def load(): DataFrame = {
+    sparkSession.newDataFrame { relationBuilder =>
+      relationBuilder.getReadBuilder
+        .setIsStreaming(true)
+        .setDataSource(sourceBuilder.build())
+    }
+  }
+
+  /**
+   * Loads input in as a `DataFrame`, for data streams that read from some 
path.
+   *
+   * @since 3.5.0
+   */
+  def load(path: String): DataFrame = {
+    sourceBuilder.clearPaths()

Review Comment:
   Yes, it would work in those cases, and it matches the server side. 
   A streaming source supports only a single path. There is no 'replace' option for a 
Protobuf repeated field, so I cleared it to ensure that any existing path settings 
don't affect the result.
   
   There is a bug though (same on server side too). Should I fix it?
   
   ```scala
   val rs = spark.readStream ..... 
   val parquetDf = rs.parquet("parquet-path")
   val df = rs.load() // BUG: incorrectly does rs.load("parquet-path")
   ```



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.connect.proto.Read.DataSource
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Interface used to load a streaming `Dataset` from external storage systems 
(e.g. file systems,
+ * key-value stores, etc). Use `SparkSession.readStream` to access this.
+ *
+ * @since 3.5.0
+ */
+@Evolving
+final class DataStreamReader private[sql] (sparkSession: SparkSession) {
+
+  /**
+   * Specifies the input data source format.
+   *
+   * @since 3.5.0
+   */
+  def format(source: String): DataStreamReader = {
+    sourceBuilder.setFormat(source)
+    this
+  }
+
+  /**
+   * Specifies the input schema. Some data sources (e.g. JSON) can infer the 
input schema
+   * automatically from data. By specifying the schema here, the underlying 
data source can skip
+   * the schema inference step, and thus speed up data loading.
+   *
+   * @since 3.5.0
+   */
+  def schema(schema: StructType): DataStreamReader = {
+    if (schema != null) {
+      sourceBuilder.setSchema(schema.json) // Use json. DDL does not retain 
all the attributes.
+    }
+    this
+  }
+
+  /**
+   * Specifies the schema by using the input DDL-formatted string. Some data 
sources (e.g. JSON)
+   * can infer the input schema automatically from data. By specifying the 
schema here, the
+   * underlying data source can skip the schema inference step, and thus speed 
up data loading.
+   *
+   * @since 3.5.0
+   */
+  def schema(schemaString: String): DataStreamReader = {

Review Comment:
   It can also be JSON, but I didn't change the scaladoc. The server supports both formats.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.connect.proto.Read.DataSource
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Interface used to load a streaming `Dataset` from external storage systems 
(e.g. file systems,
+ * key-value stores, etc). Use `SparkSession.readStream` to access this.
+ *
+ * @since 3.5.0
+ */
+@Evolving
+final class DataStreamReader private[sql] (sparkSession: SparkSession) {

Review Comment:
   It should be the same since `sourceBuilder` holds all the variable state. 



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.connect.proto.Read.DataSource
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Interface used to load a streaming `Dataset` from external storage systems 
(e.g. file systems,
+ * key-value stores, etc). Use `SparkSession.readStream` to access this.
+ *
+ * @since 3.5.0
+ */
+@Evolving
+final class DataStreamReader private[sql](sparkSession: SparkSession) {
+
+  /**
+   * Specifies the input data source format.
+   *
+   * @since 3.5.0
+   */
+  def format(source: String): DataStreamReader = {
+    sourceBuilder.setFormat(source)
+    this
+  }
+
+
+  /**
+   * Specifies the input schema. Some data sources (e.g. JSON) can infer the 
input schema
+   * automatically from data. By specifying the schema here, the underlying 
data source can
+   * skip the schema inference step, and thus speed up data loading.
+   *
+   * @since 3.5.0
+   */
+  def schema(schema: StructType): DataStreamReader = {
+    if (schema != null) {
+      sourceBuilder.setSchema(schema.json) // Use json. DDL does not retail 
all the attributes.
+    }
+    this
+  }
+
+  /**
+   * Specifies the schema by using the input DDL-formatted string. Some data 
sources (e.g. JSON) can
+   * infer the input schema automatically from data. By specifying the schema 
here, the underlying
+   * data source can skip the schema inference step, and thus speed up data 
loading.
+   *
+   * @since 3.5.0
+   */
+  def schema(schemaString: String): DataStreamReader = {
+    sourceBuilder.setSchema(schemaString)
+    this
+  }
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: String): DataStreamReader = {
+    sourceBuilder.putOptions(key, value)
+    this
+  }
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Boolean): DataStreamReader = option(key, 
value.toString)
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Long): DataStreamReader = option(key, 
value.toString)
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Double): DataStreamReader = option(key, 
value.toString)
+
+  /**
+   * (Scala-specific) Adds input options for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def options(options: scala.collection.Map[String, String]): DataStreamReader 
= {
+    this.options(options.asJava)
+    this
+  }
+
+  /**
+   * (Java-specific) Adds input options for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def options(options: java.util.Map[String, String]): DataStreamReader = {
+    sourceBuilder.putAllOptions(options)
+    this
+  }
+
+
+  /**
+   * Loads input data stream in as a `DataFrame`, for data streams that don't 
require a path
+   * (e.g. external key-value stores).
+   *
+   * @since 3.5.0
+   */
+  def load(): DataFrame = {
+    sparkSession.newDataFrame { relationBuilder =>
+      relationBuilder
+        .getReadBuilder
+        .setIsStreaming(true)
+        .setDataSource(sourceBuilder.build())
+    }
+  }
+
+  /**
+   * Loads input in as a `DataFrame`, for data streams that read from some 
path.
+   *
+   * @since 3.5.0
+   */
+  def load(path: String): DataFrame = {
+    sourceBuilder.clearPaths()
+    sourceBuilder.addPaths(path)
+    load()
+  }
+
+  /**
+   * Loads a JSON file stream and returns the results as a `DataFrame`.
+   *
+   * <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON) 
is supported by
+   * default. For JSON (one record per file), set the `multiLine` option to 
true.
+   *
+   * This function goes through the input once to determine the input schema. 
If you know the
+   * schema in advance, use the version that specifies the schema to avoid the 
extra scan.
+   *
+   * You can set the following option(s):
+   * <ul>
+   * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number 
of new files to be
+   * considered in every trigger.</li>
+   * </ul>
+   *
+   * You can find the JSON-specific options for reading JSON file stream in
+   * <a 
href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option">
+   *   Data Source Option</a> in the version you use.
+   *
+   * @since 3.5.0
+   */
+  def json(path: String): DataFrame = {
+    format("json").load(path)
+  }
+
+  /**
+   * Loads a CSV file stream and returns the result as a `DataFrame`.
+   *
+   * This function will go through the input once to determine the input 
schema if `inferSchema`
+   * is enabled. To avoid going through the entire data once, disable 
`inferSchema` option or
+   * specify the schema explicitly using `schema`.
+   *
+   * You can set the following option(s):
+   * <ul>
+   * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number 
of new files to be
+   * considered in every trigger.</li>
+   * </ul>
+   *
+   * You can find the CSV-specific options for reading CSV file stream in
+   * <a 
href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option">
+   *   Data Source Option</a> in the version you use.
+   *
+   * @since 3.5.0
+   */
+  def csv(path: String): DataFrame = format("csv").load(path)

Review Comment:
   Sounds good. Updated `orc()` and `parquet()` to match this. 



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.Futures.timeout
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.SQLHelper
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.functions.window
+
+class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
+
+  test("Streaming API with windowed aggregate query") {
+    // This verifies standard streaming API by starting a streaming query with 
windowed count.
+    withSQLConf(
+      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+    ) {
+      val readDF = spark.readStream
+        .format("rate")
+        .option("rowsPerSecond", "10")
+        .option("numPartitions", "1")
+        .load()
+
+      // Verify schema (results in RPC
+      assert(readDF.schema.toDDL == "timestamp TIMESTAMP,value BIGINT")
+
+      val countsDF = readDF
+        .withWatermark("timestamp", "10 seconds")
+        .groupBy(window(col("timestamp"), "5 seconds"))
+        .count()
+        .selectExpr("window.start as timestamp", "count as num_events")
+
+      assert(countsDF.schema.toDDL == "timestamp TIMESTAMP,num_events BIGINT 
NOT NULL")
+
+      // Start the query
+      val queryName = "sparkConnectStreamingQuery"
+
+      val query = countsDF.writeStream
+        .format("memory")
+        .queryName(queryName)
+        .trigger(Trigger.ProcessingTime("1 second"))
+        .start()
+
+      // Verify some of the API.
+      assert(query.isActive)
+
+      eventually(timeout(10.seconds)) {
+        assert(query.status.isDataAvailable)
+        assert(query.recentProgress.length > 0) // Query made progress.
+      }
+
+      query.explain() // Prints the plan to console.

Review Comment:
   Nice. Looked into it. It requires moving some of the code to a common class 
to reuse (capturing stdout etc). Can I leave a comment about this for now?



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##########
@@ -126,7 +126,11 @@ object CheckConnectJvmClientCompatibility {
       IncludeByName("org.apache.spark.sql.RuntimeConfig.*"),
       IncludeByName("org.apache.spark.sql.TypedColumn.*"),
       IncludeByName("org.apache.spark.sql.SQLImplicits.*"),
-      IncludeByName("org.apache.spark.sql.DatasetHolder.*"))
+      IncludeByName("org.apache.spark.sql.DatasetHolder.*"),
+      IncludeByName("org.apache.spark.sql.streaming.DataStreamReader.*"),

Review Comment:
   `StreamingQueryProgress` is not complete yet. 
   We are just saving the JSON.  We are excluding all the methods below with 
`TODO(SPARK-43128)` 



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala:
##########


Review Comment:
   End users use the public API `Trigger` under 
`org.apache.spark.sql.streaming`.
   They should not be using this class directly, but we can't be sure they don't, since it 
seems to be publicly accessible too. So I'm retaining the same package as the legacy code. 
   Moving it to `org.apache.spark.sql.streaming` would likely make it look more like a public API 
(and would also break any users already using it).



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -2935,6 +2936,16 @@ class Dataset[T] private[sql] (
     new DataFrameWriterV2[T](table, this)
   }
 
+  /**
+   * Interface for saving the content of the streaming Dataset out into 
external storage.
+   *
+   * @group basic
+   * @since 3.5.0
+   */
+  def writeStream: DataStreamWriter[T] = {
+    new DataStreamWriter[T](this)

Review Comment:
   Correct. It will be validated on the server.
   Didn't want to incur an RPC. 



##########
connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.execution.streaming.AvailableNowTrigger$;
+import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
+import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
+import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
+
+/**
+ * Policy used to indicate how often results should be produced by a 
[[StreamingQuery]].
+ *
+ * @since 3.5.0
+ */
+@Evolving
+public class Trigger {
+  // This is a copy of the same class in sql/core/.../streaming/Trigger.java
+
+  /**
+   * A trigger policy that runs a query periodically based on an interval in 
processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * @since 3.5.0
+   */
+  public static Trigger ProcessingTime(long intervalMs) {
+      return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * (Java-friendly)
+   * A trigger policy that runs a query periodically based on an interval in 
processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    import java.util.concurrent.TimeUnit
+   *    df.writeStream().trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 3.5.0
+   */
+  public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
+      return ProcessingTimeTrigger.create(interval, timeUnit);
+  }
+
+  /**
+   * (Scala-friendly)
+   * A trigger policy that runs a query periodically based on an interval in 
processing time.
+   * If `duration` is 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    import scala.concurrent.duration._
+   *    df.writeStream.trigger(Trigger.ProcessingTime(10.seconds))
+   * }}}
+   * @since 3.5.0
+   */
+  public static Trigger ProcessingTime(Duration interval) {
+      return ProcessingTimeTrigger.apply(interval);
+  }
+
+  /**
+   * A trigger policy that runs a query periodically based on an interval in 
processing time.
+   * If `interval` is effectively 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))
+   * }}}
+   * @since 3.5.0
+   */
+  public static Trigger ProcessingTime(String interval) {
+      return ProcessingTimeTrigger.apply(interval);
+  }
+
+  /**
+   * A trigger that processes all available data in a single batch then 
terminates the query.
+   *
+   * @since 3.5.0
+   * @deprecated This is deprecated as of Spark 3.4.0. Use {@link 
#AvailableNow()} to leverage
+   *             better guarantee of processing, fine-grained scale of 
batches, and better gradual
+   *             processing of watermark advancement including no-data batch.
+   *             See the NOTES in {@link #AvailableNow()} for details.
+   */
+  @Deprecated

Review Comment:
   Yeah, we need to include this. Used very widely.
   `@since 3.5.0` does look strange. Is there an option to remove it in the 
comment?



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.Futures.timeout
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.SQLHelper
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.functions.window
+
+class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
+
+  test("Streaming API with windowed aggregate query") {
+    // This verifies standard streaming API by starting a streaming query with 
windowed count.
+    withSQLConf(
+      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+    ) {
+      val readDF = spark.readStream
+        .format("rate")
+        .option("rowsPerSecond", "10")
+        .option("numPartitions", "1")
+        .load()
+
+      // Verify schema (results in RPC

Review Comment:
   Fixed. 



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+import java.util.concurrent.TimeoutException
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.connect.proto.Command
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.connect.proto.StreamingQueryCommand
+import org.apache.spark.connect.proto.StreamingQueryCommandResult
+import org.apache.spark.sql.SparkSession
+
+/**
+ * A handle to a query that is executing continuously in the background as new 
data arrives. All
+ * these methods are thread-safe.
+ * @since 3.5.0
+ */
+@Evolving
+trait StreamingQuery {
+  // This is a copy of StreamingQuery in 
sql/core/.../streaming/StreamingQuery.scala
+
+  /**
+   * Returns the user-specified name of the query, or null if not specified. 
This name can be
+   * specified in the `org.apache.spark.sql.streaming.DataStreamWriter` as
+   * `dataframe.writeStream.queryName("query").start()`. This name, if set, 
must be unique across
+   * all active queries.
+   *
+   * @since 3.5.0
+   */
+  def name: String
+
+  /**
+   * Returns the unique id of this query that persists across restarts from 
checkpoint data. That
+   * is, this id is generated when a query is started for the first time, and 
will be the same
+   * every time it is restarted from checkpoint data. Also see [[runId]].
+   *
+   * @since 3.5.0
+   */
+  def id: UUID
+
+  /**
+   * Returns the unique id of this run of the query. That is, every 
start/restart of a query will
+   * generate a unique runId. Therefore, every time a query is restarted from 
checkpoint, it will
+   * have the same [[id]] but different [[runId]]s.
+   */
+  def runId: UUID
+
+  /**
+   * Returns the `SparkSession` associated with `this`.
+   *
+   * @since 3.5.0
+   */
+  def sparkSession: SparkSession
+
+  /**
+   * Returns `true` if this query is actively running.
+   *
+   * @since 3.5.0
+   */
+  def isActive: Boolean
+
+  /**
+   * Returns the current status of the query.
+   *
+   * @since 3.5.0
+   */
+  def status: StreamingQueryStatus
+
+  /**
+   * Returns an array of the most recent [[StreamingQueryProgress]] updates 
for this query. The
+   * number of progress updates retained for each stream is configured by 
Spark session
+   * configuration `spark.sql.streaming.numRecentProgressUpdates`.
+   *
+   * @since 3.5.0
+   */
+  def recentProgress: Array[StreamingQueryProgress]
+
+  /**
+   * Returns the most recent [[StreamingQueryProgress]] update of this 
streaming query.
+   *
+   * @since 3.5.0
+   */
+  def lastProgress: StreamingQueryProgress
+
+  /**
+   * Blocks until all available data in the source has been processed and 
committed to the sink.
+   * This method is intended for testing. Note that in the case of continually 
arriving data, this
+   * method may block forever. Additionally, this method is only guaranteed to 
block until data
+   * that has been synchronously appended data to a
+   * `org.apache.spark.sql.execution.streaming.Source` prior to invocation. 
(i.e. `getOffset` must
+   * immediately reflect the addition).
+   * @since 3.5.0
+   */
+  def processAllAvailable(): Unit
+
+  /**
+   * Stops the execution of this query if it is running. This waits until the 
termination of the
+   * query execution threads or until a timeout is hit.
+   *
+   * By default stop will block indefinitely. You can configure a timeout by 
the configuration
+   * `spark.sql.streaming.stopTimeout`. A timeout of 0 (or negative) 
milliseconds will block
+   * indefinitely. If a `TimeoutException` is thrown, users can retry stopping 
the stream. If the
+   * issue persists, it is advisable to kill the Spark application.
+   *
+   * @since 3.5.0
+   */
+  @throws[TimeoutException]
+  def stop(): Unit
+
+  /**
+   * Prints the physical plan to the console for debugging purposes.
+   * @since 3.5.0
+   */
+  def explain(): Unit
+
+  /**
+   * Prints the physical plan to the console for debugging purposes.
+   *
+   * @param extended
+   *   whether to do extended explain or not
+   * @since 3.5.0
+   */
+  def explain(extended: Boolean): Unit
+}
+
+class RemoteStreamingQuery(

Review Comment:
   > Is there also a way to get query progress without polling?
   
   That would be a new API. We can implement it when we want to. It requires 
server-side changes too. 
   
   > A somewhat related question, is it worth the trouble to see if we can use 
a single result collection and query monitoring code path?
   
   Not sure. Query result collection is likely to be a lot more involved and will 
evolve quite a bit over time (cloud-fetch, QRC, etc.).



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.connect.proto.Read.DataSource
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Interface used to load a streaming `Dataset` from external storage systems 
(e.g. file systems,
+ * key-value stores, etc). Use `SparkSession.readStream` to access this.
+ *
+ * @since 3.5.0
+ */
+@Evolving
+final class DataStreamReader private[sql] (sparkSession: SparkSession) {
+
+  /**
+   * Specifies the input data source format.
+   *
+   * @since 3.5.0
+   */
+  def format(source: String): DataStreamReader = {
+    sourceBuilder.setFormat(source)
+    this
+  }
+
+  /**
+   * Specifies the input schema. Some data sources (e.g. JSON) can infer the 
input schema
+   * automatically from data. By specifying the schema here, the underlying 
data source can skip
+   * the schema inference step, and thus speed up data loading.
+   *
+   * @since 3.5.0
+   */
+  def schema(schema: StructType): DataStreamReader = {
+    if (schema != null) {
+      sourceBuilder.setSchema(schema.json) // Use json. DDL does not retail 
all the attributes.
+    }
+    this
+  }
+
+  /**
+   * Specifies the schema by using the input DDL-formatted string. Some data 
sources (e.g. JSON)
+   * can infer the input schema automatically from data. By specifying the 
schema here, the
+   * underlying data source can skip the schema inference step, and thus speed 
up data loading.
+   *
+   * @since 3.5.0
+   */
+  def schema(schemaString: String): DataStreamReader = {
+    sourceBuilder.setSchema(schemaString)
+    this
+  }
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: String): DataStreamReader = {
+    sourceBuilder.putOptions(key, value)
+    this
+  }
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Boolean): DataStreamReader = option(key, 
value.toString)
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Long): DataStreamReader = option(key, 
value.toString)
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Double): DataStreamReader = option(key, 
value.toString)
+
+  /**
+   * (Scala-specific) Adds input options for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def options(options: scala.collection.Map[String, String]): DataStreamReader 
= {
+    this.options(options.asJava)
+    this
+  }
+
+  /**
+   * (Java-specific) Adds input options for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def options(options: java.util.Map[String, String]): DataStreamReader = {
+    sourceBuilder.putAllOptions(options)
+    this
+  }
+
+  /**
+   * Loads input data stream in as a `DataFrame`, for data streams that don't 
require a path (e.g.
+   * external key-value stores).
+   *
+   * @since 3.5.0
+   */
+  def load(): DataFrame = {
+    sparkSession.newDataFrame { relationBuilder =>
+      relationBuilder.getReadBuilder
+        .setIsStreaming(true)
+        .setDataSource(sourceBuilder.build())
+    }
+  }
+
+  /**
+   * Loads input in as a `DataFrame`, for data streams that read from some 
path.
+   *
+   * @since 3.5.0
+   */
+  def load(path: String): DataFrame = {
+    sourceBuilder.clearPaths()
+    sourceBuilder.addPaths(path)
+    load()
+  }
+
+  /**
+   * Loads a JSON file stream and returns the results as a `DataFrame`.
+   *
+   * <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON) 
is supported by
+   * default. For JSON (one record per file), set the `multiLine` option to 
true.
+   *
+   * This function goes through the input once to determine the input schema. 
If you know the
+   * schema in advance, use the version that specifies the schema to avoid the 
extra scan.
+   *
+   * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` 
(default: no max limit):
+   * sets the maximum number of new files to be considered in every 
trigger.</li> </ul>
+   *
+   * You can find the JSON-specific options for reading JSON file stream in <a
+   * 
href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @since 3.5.0
+   */
+  def json(path: String): DataFrame = {
+    format("json").load(path)
+  }
+
+  /**
+   * Loads a CSV file stream and returns the result as a `DataFrame`.
+   *
+   * This function will go through the input once to determine the input 
schema if `inferSchema`
+   * is enabled. To avoid going through the entire data once, disable 
`inferSchema` option or
+   * specify the schema explicitly using `schema`.
+   *
+   * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` 
(default: no max limit):
+   * sets the maximum number of new files to be considered in every 
trigger.</li> </ul>
+   *
+   * You can find the CSV-specific options for reading CSV file stream in <a
+   * 
href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @since 3.5.0
+   */
+  def csv(path: String): DataFrame = format("csv").load(path)
+
+  /**
+   * Loads a ORC file stream, returning the result as a `DataFrame`.
+   *
+   * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` 
(default: no max limit):
+   * sets the maximum number of new files to be considered in every 
trigger.</li> </ul>
+   *
+   * ORC-specific option(s) for reading ORC file stream can be found in <a 
href=
+   * 
"https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option">
 Data
+   * Source Option</a> in the version you use.
+   *
+   * @since 3.5.0
+   */
+  def orc(path: String): DataFrame = {
+    format("orc").load(path)
+  }
+
+  /**
+   * Loads a Parquet file stream, returning the result as a `DataFrame`.
+   *
+   * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` 
(default: no max limit):
+   * sets the maximum number of new files to be considered in every 
trigger.</li> </ul>
+   *
+   * Parquet-specific option(s) for reading Parquet file stream can be found 
in <a href=
+   * 
"https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option">
 Data
+   * Source Option</a> in the version you use.
+   *
+   * @since 3.5.0
+   */
+  def parquet(path: String): DataFrame = {
+    format("parquet").load(path)
+  }
+
+  /**
+   * Loads text files and returns a `DataFrame` whose schema starts with a 
string column named
+   * "value", and followed by partitioned columns if there are any. The text 
files must be encoded
+   * as UTF-8.
+   *
+   * By default, each line in the text files is a new row in the resulting 
DataFrame. For example:
+   * {{{
+   *   // Scala:
+   *   spark.readStream.text("/path/to/directory/")
+   *
+   *   // Java:
+   *   spark.readStream().text("/path/to/directory/")
+   * }}}
+   *
+   * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` 
(default: no max limit):
+   * sets the maximum number of new files to be considered in every 
trigger.</li> </ul>
+   *
+   * You can find the text-specific options for reading text files in <a
+   * 
href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @since 3.5.0
+   */
+  def text(path: String): DataFrame = format("text").load(path)
+
+  /**
+   * Loads text file(s) and returns a `Dataset` of String. The underlying 
schema of the Dataset
+   * contains a single string column named "value". The text files must be 
encoded as UTF-8.
+   *
+   * If the directory structure of the text files contains partitioning 
information, those are
+   * ignored in the resulting Dataset. To include partitioning information as 
columns, use `text`.
+   *
+   * By default, each line in the text file is a new element in the resulting 
Dataset. For
+   * example:
+   * {{{
+   *   // Scala:
+   *   spark.readStream.textFile("/path/to/spark/README.md")
+   *
+   *   // Java:
+   *   spark.readStream().textFile("/path/to/spark/README.md")
+   * }}}
+   *
+   * You can set the text-specific options as specified in 
`DataStreamReader.text`.
+   *
+   * @param path
+   *   input path
+   * @since 3.5.0
+   */
+  def textFile(path: String): Dataset[String] = {
+    
text(path).select("value").as[String](sparkSession.implicits.newStringEncoder)

Review Comment:
   Done.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+import java.util.concurrent.TimeoutException
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.connect.proto.Command
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.connect.proto.StreamingQueryCommand
+import org.apache.spark.connect.proto.StreamingQueryCommandResult
+import org.apache.spark.sql.SparkSession
+
+/**
+ * A handle to a query that is executing continuously in the background as new 
data arrives. All
+ * these methods are thread-safe.
+ * @since 3.5.0
+ */
+@Evolving
+trait StreamingQuery {
+  // This is a copy of StreamingQuery in 
sql/core/.../streaming/StreamingQuery.scala
+
+  /**
+   * Returns the user-specified name of the query, or null if not specified. 
This name can be
+   * specified in the `org.apache.spark.sql.streaming.DataStreamWriter` as
+   * `dataframe.writeStream.queryName("query").start()`. This name, if set, 
must be unique across
+   * all active queries.
+   *
+   * @since 3.5.0
+   */
+  def name: String
+
+  /**
+   * Returns the unique id of this query that persists across restarts from 
checkpoint data. That
+   * is, this id is generated when a query is started for the first time, and 
will be the same
+   * every time it is restarted from checkpoint data. Also see [[runId]].
+   *
+   * @since 3.5.0
+   */
+  def id: UUID
+
+  /**
+   * Returns the unique id of this run of the query. That is, every 
start/restart of a query will
+   * generate a unique runId. Therefore, every time a query is restarted from 
checkpoint, it will
+   * have the same [[id]] but different [[runId]]s.
+   */
+  def runId: UUID
+
+  /**
+   * Returns the `SparkSession` associated with `this`.
+   *
+   * @since 3.5.0
+   */
+  def sparkSession: SparkSession
+
+  /**
+   * Returns `true` if this query is actively running.
+   *
+   * @since 3.5.0
+   */
+  def isActive: Boolean
+
+  /**
+   * Returns the current status of the query.
+   *
+   * @since 3.5.0
+   */
+  def status: StreamingQueryStatus
+
+  /**
+   * Returns an array of the most recent [[StreamingQueryProgress]] updates 
for this query. The
+   * number of progress updates retained for each stream is configured by 
Spark session
+   * configuration `spark.sql.streaming.numRecentProgressUpdates`.
+   *
+   * @since 3.5.0
+   */
+  def recentProgress: Array[StreamingQueryProgress]
+
+  /**
+   * Returns the most recent [[StreamingQueryProgress]] update of this 
streaming query.
+   *
+   * @since 3.5.0
+   */
+  def lastProgress: StreamingQueryProgress
+
+  /**
+   * Blocks until all available data in the source has been processed and 
committed to the sink.
+   * This method is intended for testing. Note that in the case of continually 
arriving data, this
+   * method may block forever. Additionally, this method is only guaranteed to 
block until data
+   * that has been synchronously appended to a
+   * `org.apache.spark.sql.execution.streaming.Source` prior to invocation. 
(i.e. `getOffset` must
+   * immediately reflect the addition).
+   * @since 3.5.0
+   */
+  def processAllAvailable(): Unit
+
+  /**
+   * Stops the execution of this query if it is running. This waits until the 
termination of the
+   * query execution threads or until a timeout is hit.
+   *
+   * By default stop will block indefinitely. You can configure a timeout by 
the configuration
+   * `spark.sql.streaming.stopTimeout`. A timeout of 0 (or negative) 
milliseconds will block
+   * indefinitely. If a `TimeoutException` is thrown, users can retry stopping 
the stream. If the
+   * issue persists, it is advisable to kill the Spark application.
+   *
+   * @since 3.5.0
+   */
+  @throws[TimeoutException]
+  def stop(): Unit
+
+  /**
+   * Prints the physical plan to the console for debugging purposes.
+   * @since 3.5.0
+   */
+  def explain(): Unit
+
+  /**
+   * Prints the physical plan to the console for debugging purposes.
+   *
+   * @param extended
+   *   whether to do extended explain or not
+   * @since 3.5.0
+   */
+  def explain(extended: Boolean): Unit
+}
+
+class RemoteStreamingQuery(
+    override val id: UUID,
+    override val runId: UUID,
+    override val name: String,
+    override val sparkSession: SparkSession)
+    extends StreamingQuery {
+
+  override def isActive: Boolean = {
+    executeQueryCmd(_.setStatus(true)).getStatus.getIsActive
+  }
+
+  override def status: StreamingQueryStatus = {
+    val statusResp = executeQueryCmd(_.setStatus(true)).getStatus
+    new StreamingQueryStatus(
+      message = statusResp.getStatusMessage,
+      isDataAvailable = statusResp.getIsDataAvailable,
+      isTriggerActive = statusResp.getIsTriggerActive)
+  }
+
+  override def recentProgress: Array[StreamingQueryProgress] = {
+    
executeQueryCmd(_.setRecentProgress(true)).getRecentProgress.getRecentProgressJsonList.asScala
+      .map(json => new StreamingQueryProgress(json))
+      .toArray
+  }
+
+  override def lastProgress: StreamingQueryProgress = {
+    executeQueryCmd(
+      
_.setLastProgress(true)).getRecentProgress.getRecentProgressJsonList.asScala.headOption
+      .map(json => new StreamingQueryProgress(json))
+      .orNull
+  }
+
+  override def processAllAvailable(): Unit = {
+    executeQueryCmd(_.setProcessAllAvailable(true))
+  }
+
+  override def stop(): Unit = {
+    executeQueryCmd(_.setStop(true))
+  }
+
+  override def explain(): Unit = {
+    explain(extended = false)
+  }
+
+  override def explain(extended: Boolean): Unit = {
+    val explainCmd = StreamingQueryCommand.ExplainCommand
+      .newBuilder()
+      .setExtended(extended)
+      .build()
+
+    val explain = 
executeQueryCmd(_.setExplain(explainCmd)).getExplain.getResult
+
+    // scalastyle:off println
+    println(explain)
+    // scalastyle:on println
+  }
+
+  private def executeQueryCmd(
+      setCmdFn: StreamingQueryCommand.Builder => Unit // Sets the command 
field, like stop().
+  ): StreamingQueryCommandResult = {
+
+    val cmdBuilder = Command.newBuilder()
+    val queryCmdBuilder = cmdBuilder.getStreamingQueryCommandBuilder
+
+    // Set queryId.
+    queryCmdBuilder.getQueryIdBuilder
+      .setId(id.toString)
+      .setRunId(runId.toString)
+
+    // Set command.
+    setCmdFn(queryCmdBuilder)
+
+    val resp = sparkSession.execute(cmdBuilder.build()).head

Review Comment:
   We could, but it does not seem required. Alternatively, we could pick the first 
response that _hasStreamingQueryCommandResult_. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to