xuanyuanking commented on a change in pull request #31842:
URL: https://github.com/apache/spark/pull/31842#discussion_r597489348



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
##########
@@ -239,82 +237,39 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
       recoverFromCheckpointLocation: Boolean,
       trigger: Trigger,
       triggerClock: Clock): StreamingQueryWrapper = {
-    var deleteCheckpointOnStop = false
-    val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
-      new Path(userSpecified).toString
-    }.orElse {
-      df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
-        new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toString
-      }
-    }.getOrElse {
-      if (useTempCheckpointLocation) {
-        deleteCheckpointOnStop = true
-        val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
-        logWarning("Temporary checkpoint location created which is deleted normally when" +
-          s" the query didn't fail: $tempDir. If it's required to delete it under any" +
-          s" circumstances, please set ${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" +
-          s" true. Important to know deleting temp checkpoint folder is best effort.")
-        tempDir
-      } else {
-        throw new AnalysisException(
-          "checkpointLocation must be specified either " +
-            """through option("checkpointLocation", ...) or """ +
-            s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", 
...)""")
-      }
-    }
-
-    // If offsets have already been created, we are trying to resume a query.
-    if (!recoverFromCheckpointLocation) {
-      val checkpointPath = new Path(checkpointLocation, "offsets")
-      val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
-      if (fs.exists(checkpointPath)) {
-        throw new AnalysisException(
-          s"This query does not support recovering from checkpoint location. " 
+
-            s"Delete $checkpointPath to start over.")
-      }
-    }
-
     val analyzedPlan = df.queryExecution.analyzed
     df.queryExecution.assertAnalyzed()
 
-    val operationCheckEnabled = sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled
+    val dataStreamWritePlan = WriteToStreamStatement(
+      userSpecifiedName,
+      userSpecifiedCheckpointLocation,
+      useTempCheckpointLocation,
+      recoverFromCheckpointLocation,
+      sink,
+      outputMode,
+      df.sparkSession.sessionState.newHadoopConf(),
+      trigger.isInstanceOf[ContinuousTrigger],
+      analyzedPlan)
 
-    if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
-      logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
-          "is not supported in streaming DataFrames/Datasets and will be 
disabled.")
-    }
+    val analyzedStreamWritePlan =
+      sparkSession.sessionState.executePlan(dataStreamWritePlan).analyzed

Review comment:
       We plan to add more rules that resolve the different parts of 
WriteToStreamStatement in the future, e.g. resolving the target table. A 
sketch of what such a rule could look like follows below.
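
       For illustration, here is a minimal sketch (not code from this PR) of 
the kind of rule this design enables. It assumes WriteToStreamStatement is a 
case class exposing the userSpecifiedName and userSpecifiedCheckpointLocation 
fields shown in the constructor call in the diff above; the rule name and 
field accesses are hypothetical.

    import java.util.UUID

    import org.apache.hadoop.fs.Path

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.internal.SQLConf

    // Hypothetical analyzer rule, for illustration only: it resolves a single
    // concern of WriteToStreamStatement (the checkpoint location) and leaves
    // the other fields, e.g. the target table, to separate rules.
    // (Import of WriteToStreamStatement omitted; it is introduced by this PR.)
    object ResolveCheckpointLocation extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
        // Assumed shape: WriteToStreamStatement is a case class with the
        // constructor arguments shown in the diff above.
        case s: WriteToStreamStatement if s.userSpecifiedCheckpointLocation.isEmpty =>
          // Fall back to the session-wide checkpoint root, mirroring the
          // inline logic this PR removes from StreamingQueryManager.
          val fromConf = SQLConf.get.checkpointLocation.map { root =>
            new Path(root, s.userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toString
          }
          s.copy(userSpecifiedCheckpointLocation = fromConf)
      }
    }

       Since executePlan(...) in the diff runs the full analyzer over the 
statement, each such rule stays small and independently testable.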



