xuanyuanking commented on a change in pull request #31842:
URL: https://github.com/apache/spark/pull/31842#discussion_r597489348
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
##########
@@ -239,82 +237,39 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
       recoverFromCheckpointLocation: Boolean,
       trigger: Trigger,
       triggerClock: Clock): StreamingQueryWrapper = {
-    var deleteCheckpointOnStop = false
-    val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
-      new Path(userSpecified).toString
-    }.orElse {
-      df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
-        new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toString
-      }
-    }.getOrElse {
-      if (useTempCheckpointLocation) {
-        deleteCheckpointOnStop = true
-        val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
-        logWarning("Temporary checkpoint location created which is deleted normally when" +
-          s" the query didn't fail: $tempDir. If it's required to delete it under any" +
-          s" circumstances, please set ${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" +
-          s" true. Important to know deleting temp checkpoint folder is best effort.")
-        tempDir
-      } else {
-        throw new AnalysisException(
-          "checkpointLocation must be specified either " +
-            """through option("checkpointLocation", ...) or """ +
-            s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
-      }
-    }
-
-    // If offsets have already been created, we trying to resume a query.
-    if (!recoverFromCheckpointLocation) {
-      val checkpointPath = new Path(checkpointLocation, "offsets")
-      val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
-      if (fs.exists(checkpointPath)) {
-        throw new AnalysisException(
-          s"This query does not support recovering from checkpoint location. " +
-            s"Delete $checkpointPath to start over.")
-      }
-    }
-
     val analyzedPlan = df.queryExecution.analyzed
     df.queryExecution.assertAnalyzed()
-    val operationCheckEnabled = sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled
+    val dataStreamWritePlan = WriteToStreamStatement(
+      userSpecifiedName,
+      userSpecifiedCheckpointLocation,
+      useTempCheckpointLocation,
+      recoverFromCheckpointLocation,
+      sink,
+      outputMode,
+      df.sparkSession.sessionState.newHadoopConf(),
+      trigger.isInstanceOf[ContinuousTrigger],
+      analyzedPlan)
-    if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
-      logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
-        "is not supported in streaming DataFrames/Datasets and will be disabled.")
-    }
+    val analyzedStreamWritePlan =
+      sparkSession.sessionState.executePlan(dataStreamWritePlan).analyzed
Review comment:
We plan to let more rules resolve the different parts of WriteToStreamStatement in the future, e.g. table resolution.
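
To sketch what that could look like (purely illustrative, not the rule added in this PR): a follow-up analyzer rule could fill in the default checkpoint location on WriteToStreamStatement, the same way the inline code removed above did. The rule name below is hypothetical, and the package and field names of WriteToStreamStatement are assumed from the constructor call in the diff.

```scala
// Hypothetical follow-up rule: resolve a default checkpoint location on
// WriteToStreamStatement when the user did not specify one. Rule name, package
// import, and copy() field names are assumptions, not code from this PR.
import java.util.UUID

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.streaming.WriteToStreamStatement

object ResolveDefaultCheckpointLocation extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    case s: WriteToStreamStatement if s.userSpecifiedCheckpointLocation.isEmpty =>
      // Fall back to the session-level checkpoint root, mirroring the inlined
      // logic this PR removes; a real rule would also handle the temp-checkpoint
      // and missing-location error cases.
      val fromConf = conf.checkpointLocation.map { root =>
        new Path(root, s.userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toString
      }
      s.copy(userSpecifiedCheckpointLocation = fromConf)
  }
}
```

Keeping each concern in its own rule (checkpoint resolution, table resolution, unsupported-operation checks) is what lets the analyzer pick them up independently.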