ericm-db commented on code in PR #47490:
URL: https://github.com/apache/spark/pull/47490#discussion_r1746325112
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -358,3 +358,119 @@ class OperatorStateMetadataV2Reader(
}
}
}
+
+/**
+ * A helper class to manage the metadata files for the operator state
checkpoint.
+ * This class manages the metadata files for OperatorStateMetadataV2, and
+ * provides utilities to purge the oldest files so that only the metadata
+ * files for which a commit log entry is present are kept.
+ * @param checkpointLocation The root path of the checkpoint directory
+ * @param sparkSession The sparkSession that is used to access the hadoopConf
+ * @param stateStoreWriter The operator that this fileManager is being created
for
+ */
+class OperatorStateMetadataV2FileManager(
+ checkpointLocation: Path,
+ sparkSession: SparkSession,
+ stateStoreWriter: StateStoreWriter) extends Logging {
+
+ private val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ private val stateCheckpointPath = new Path(checkpointLocation, "state")
+ private val stateOpIdPath = new Path(
+ stateCheckpointPath, stateStoreWriter.getStateInfo.operatorId.toString)
+ private val commitLog =
+ new CommitLog(sparkSession, new Path(checkpointLocation,
"commits").toString)
+ private val stateSchemaPath = stateStoreWriter.stateSchemaDirPath()
+ private val metadataDirPath =
OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
+ private lazy val fm = CheckpointFileManager.create(metadataDirPath,
hadoopConf)
+
/**
 * Returns true if `path` names a batch metadata file, i.e. its final name
 * component parses as a Long batch id.
 *
 * @param path the candidate file path; only `path.getName` is examined
 */
protected def isBatchFile(path: Path): Boolean = {
  // Avoid exception-driven control flow: Try folds the NumberFormatException
  // thrown by toLong on a non-numeric name into `false` instead of using an
  // explicit try/catch block.
  scala.util.Try(path.getName.toLong).isSuccess
}
+
/**
 * A `PathFilter` that accepts only batch metadata files (numeric file names).
 */
protected val batchFilesFilter: PathFilter = new PathFilter {
  override def accept(path: Path): Boolean = isBatchFile(path)
}
+
/** Extracts the batch id encoded in the file name of `path`. */
private def pathToBatchId(path: Path): Long = path.getName.toLong
+
/**
 * Deletes operator metadata and state-schema files that are no longer
 * referenced, i.e. those older than the earliest batch that still has a
 * commit log entry. Does nothing when no committed batch exists yet.
 */
def purgeMetadataFiles(): Unit = {
  findThresholdBatchId() match {
    case -1L =>
      // No commit log entry on disk yet, so every file must be retained.
    case threshold =>
      val earliestBatchIdKept = deleteMetadataFiles(threshold)
      // Schema files from 0 through (earliestBatchIdKept - 1), inclusive,
      // are now unreferenced and can be dropped.
      deleteSchemaFiles(earliestBatchIdKept - 1)
  }
}
+
// Only the metadata and schema files for batches that still have a commit
// log entry should be kept, so the purge threshold is the batch immediately
// before the oldest commit on disk (-1 when no commit exists yet, meaning
// nothing may be purged).
private def findThresholdBatchId(): Long = {
  commitLog.listBatchesOnDisk.headOption match {
    case Some(oldestCommittedBatchId) => oldestCommittedBatchId - 1L
    case None => -1L
  }
}
+
+ private def deleteSchemaFiles(thresholdBatchId: Long): Unit = {
+ val schemaFiles = fm.list(stateSchemaPath).sorted.map(_.getPath)
+
+ if (schemaFiles.length > 1) {
Review Comment:
   Right, yeah — removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]