cloud-fan commented on code in PR #55145:
URL: https://github.com/apache/spark/pull/55145#discussion_r3399805436
##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SystemMetadataSuite.scala:
##########
@@ -251,7 +251,7 @@ class SystemMetadataSuite
val stSystemMetadata = FlowSystemMetadata(updateContext, stFlow, graph)
val schema2StSystemMetadata = FlowSystemMetadata(updateContext, schema2StFlow, graph)
assert(
- stSystemMetadata.flowCheckpointsDirOpt() != schema2StSystemMetadata.flowCheckpointsDirOpt()
+ stSystemMetadata.flowCheckpointsDir() != schema2StSystemMetadata.flowCheckpointsDir()
Review Comment:
The `@throws IllegalArgumentException` contract on `flowCheckpointsDir()` is
now documented API surface, but no test exercises it. An
`intercept[IllegalArgumentException]` case for a flow whose destination is
neither a table nor a sink would pin it down.
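A self-contained sketch of what such a test could check. Everything here is a hypothetical stand-in, not Spark's API. `Destination`, `Table`, `Sink`, `View`, and the free function `flowCheckpointsDir` only model the documented contract: return `storage/_checkpoints/<destination>/<flow>` for tables and sinks, and throw otherwise. `intercept` is a minimal local replacement for ScalaTest's `intercept`, so the snippet runs without a test framework. In `SystemMetadataSuite` itself, the body would instead build a flow whose destination is neither a table nor a sink and call `FlowSystemMetadata(...).flowCheckpointsDir()`.

```scala
import scala.reflect.ClassTag

object CheckpointDirContractSketch {
  // Hypothetical, simplified model of flow destinations (not Spark's classes).
  sealed trait Destination { def name: String }
  final case class Table(name: String) extends Destination
  final case class Sink(name: String) extends Destination
  final case class View(name: String) extends Destination

  // Hypothetical stand-in for FlowSystemMetadata.flowCheckpointsDir():
  // storage/_checkpoints/<destination>/<flow> for tables and sinks, otherwise throw.
  def flowCheckpointsDir(storage: String, flowName: String, dest: Destination): String =
    dest match {
      case Table(_) | Sink(_) => s"$storage/_checkpoints/${dest.name}/$flowName"
      case other =>
        throw new IllegalArgumentException(
          s"Destination of flow $flowName ($other) is neither a table nor a sink")
    }

  // Minimal local replacement for ScalaTest's intercept: runs `body` and returns
  // the thrown exception if it has the expected type, otherwise fails.
  def intercept[T <: Throwable](body: => Any)(implicit ct: ClassTag[T]): T = {
    val thrown: Option[Throwable] =
      try { body; None } catch { case t: Throwable => Some(t) }
    thrown match {
      case Some(t) if ct.runtimeClass.isInstance(t) => t.asInstanceOf[T]
      case Some(other) =>
        throw new AssertionError(s"Expected ${ct.runtimeClass.getName}, got $other", other)
      case None =>
        throw new AssertionError(s"Expected ${ct.runtimeClass.getName}, but nothing was thrown")
    }
  }

  def main(args: Array[String]): Unit = {
    // The happy path still returns a checkpoint directory...
    assert(flowCheckpointsDir("/storage", "f1", Table("t1")) == "/storage/_checkpoints/t1/f1")
    // ...and the documented @throws contract is pinned down.
    val e = intercept[IllegalArgumentException] {
      flowCheckpointsDir("/storage", "f1", View("v1"))
    }
    assert(e.getMessage.contains("neither a table nor a sink"))
    println("contract holds")
  }
}
```

Asserting on part of the message, not just the exception type, also guards against an unrelated `IllegalArgumentException` from elsewhere in the setup satisfying the test by accident.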
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/State.scala:
##########
@@ -89,27 +89,19 @@ object State extends Logging {
private def reset(flow: ResolvedFlow, env: PipelineUpdateContext, graph: DataflowGraph): Unit = {
logInfo(log"Clearing out state for flow ${MDC(LogKeys.FLOW_NAME, flow.displayName)}")
val flowMetadata = FlowSystemMetadata(env, flow, graph)
- flow match {
- case f if flowMetadata.latestCheckpointLocationOpt().isEmpty =>
- logInfo(
s"Skipping resetting flow ${f.identifier} since its destination not been previously" +
- s"materialized and we can't find the checkpoint location."
- )
- case _ =>
- val hadoopConf = env.spark.sessionState.newHadoopConf()
+ val hadoopConf = env.spark.sessionState.newHadoopConf()
- // Write a new checkpoint folder if needed
- val checkpointDir = new Path(flowMetadata.latestCheckpointLocation)
- val fs1 = checkpointDir.getFileSystem(hadoopConf)
- if (fs1.exists(checkpointDir)) {
- val nextVersion = checkpointDir.getName.toInt + 1
- val nextPath = new Path(checkpointDir.getParent, nextVersion.toString)
- fs1.mkdirs(nextPath)
- logInfo(
- log"Created new checkpoint for stream ${MDC(LogKeys.FLOW_NAME, flow.displayName)} " +
- log"at ${MDC(LogKeys.CHECKPOINT_PATH, nextPath.toString)}."
- )
- }
+ // Write a new checkpoint folder if needed
+ val checkpointDir = new Path(flowMetadata.latestCheckpointLocation)
Review Comment:
The PR description only covers the `SystemMetadata.scala` refactor — it
doesn't mention that `reset` here also changed (the
`latestCheckpointLocationOpt().isEmpty` skip branch is removed). Deleting a
guard looks behavior-changing at a glance, so it's worth recording in the
description that the branch was dead: `flowCheckpointsDirOpt()` was always
`Some`-or-throw, so `.isEmpty` could never be true, and an invalid destination
threw the same `IllegalArgumentException` from the guard itself.
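A minimal model of why removing the guard does not change behavior. It assumes, hypothetically, that `latestCheckpointLocationOpt()` is derived from `flowCheckpointsDirOpt()`. The functions below are illustrative stand-ins with a `valid` flag in place of a real destination, not the code in `State.scala`. `outcome` records either the result or the exception class, so the old and new shapes of `reset` can be compared for both kinds of destination.

```scala
object DeadGuardSketch {
  // Hypothetical stand-in for the old flowCheckpointsDirOpt(): Some-or-throw, never None.
  def flowCheckpointsDirOpt(valid: Boolean): Option[String] =
    if (valid) Some("/storage/_checkpoints/t1/f1")
    else throw new IllegalArgumentException("neither a table nor a sink")

  // Hypothetical: assumes the old latest-location lookup is built on flowCheckpointsDirOpt().
  def latestCheckpointLocationOpt(valid: Boolean): Option[String] =
    flowCheckpointsDirOpt(valid).map(dir => s"$dir/0")

  // Hypothetical stand-ins for the non-Option accessors.
  def flowCheckpointsDir(valid: Boolean): String =
    if (valid) "/storage/_checkpoints/t1/f1"
    else throw new IllegalArgumentException("neither a table nor a sink")

  def latestCheckpointLocation(valid: Boolean): String = s"${flowCheckpointsDir(valid)}/0"

  // Old shape of reset: skip when the location is empty (this branch is unreachable).
  def resetOld(valid: Boolean): String =
    if (latestCheckpointLocationOpt(valid).isEmpty) "skipped"
    else s"reset ${latestCheckpointLocation(valid)}"

  // New shape of reset: the guard is gone.
  def resetNew(valid: Boolean): String = s"reset ${latestCheckpointLocation(valid)}"

  // Records either the returned value or the name of the thrown exception class.
  def outcome(f: => String): Either[String, String] =
    try Right(f)
    catch { case e: IllegalArgumentException => Left(e.getClass.getName) }

  def main(args: Array[String]): Unit = {
    for (valid <- Seq(true, false))
      assert(outcome(resetOld(valid)) == outcome(resetNew(valid)), s"mismatch for valid=$valid")
    println("old and new reset agree")
  }
}
```

For a valid destination both shapes reset the same location. For an invalid one both throw `IllegalArgumentException`, and the old version throws it while evaluating the guard, so the "skipped" branch is never reached.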
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SystemMetadata.scala:
##########
@@ -37,32 +37,33 @@ case class FlowSystemMetadata(
) extends SystemMetadata with Logging {
/**
- * Returns the checkpoint root directory for a given flow
- * which is storage/_checkpoints/flow_destination_table/flow_name.
- * @return the checkpoint root directory for `flow`
+ * @return the checkpoint root directory for this flow
+ * (of the form storage/_checkpoints/flow_destination_table/flow_name)
+ * @throws IllegalArgumentException when the flow's destination is neither a table nor sink
Review Comment:
Grammar:
```suggestion
* @throws IllegalArgumentException when the flow's destination is neither a table nor a sink
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]