anishshri-db commented on code in PR #47490:
URL: https://github.com/apache/spark/pull/47490#discussion_r1746138791


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -80,6 +80,57 @@ class IncrementalExecution(
       TransformWithStateInPandasStrategy :: Nil
   }
 
+  // Methods to enable the use of AsyncLogPurge
+  protected val minLogEntriesToMaintain: Int =
+    sparkSession.sessionState.conf.minBatchesToRetain
+
+  val errorNotifier: ErrorNotifier = new ErrorNotifier()
+
+  override protected def purge(threshold: Long): Unit = {}
+
+  def stateSchemaDirPath(
+      ssw: StateStoreWriter,
+      storeName: Option[String] = None): Path = {
+    def stateInfo = ssw.getStateInfo
+    val stateCheckpointPath =
+      new Path(stateInfo.checkpointLocation,
+        s"${stateInfo.operatorId.toString}")
+    storeName match {
+      case Some(storeName) =>
+        new Path(new Path(stateCheckpointPath, "_stateSchema"), storeName)
+      case None =>
+        new Path(new Path(stateCheckpointPath, "_stateSchema"), "default")
+    }
+  }
+
+  override protected def purgeOldest(statefulOp: StatefulOperator): Unit = {
+    statefulOp match {
+      case ssw: StateStoreWriter =>
+        ssw.operatorStateMetadataVersion match {
+          case 2 =>
+            // checkpointLocation of the operator is runId/state, and commitLog path is
+            // runId/commits, so we want the parent of the checkpointLocation to get the
+            // commit log path.
+            val parentCheckpointLocation = new Path(checkpointLocation).getParent
+            val fileManager = new OperatorStateMetadataV2FileManager(
+              new Path(checkpointLocation, ssw.getStateInfo.operatorId.toString),
+              stateSchemaDirPath(ssw, Some(StateStoreId.DEFAULT_STORE_NAME)),
+              new CommitLog(
+                sparkSession,
+                new Path(parentCheckpointLocation, "commits").toString
+              ),
+              hadoopConf)
+              fileManager.purgeMetadataFiles()

Review Comment:
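   nit: the indentation of `fileManager.purgeMetadataFiles()` seems off; it is indented two spaces deeper than the `val fileManager` declaration it follows and should align with it.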



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to