HeartSaVioR commented on code in PR #47188:
URL: https://github.com/apache/spark/pull/47188#discussion_r1668285119


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -390,3 +400,83 @@ class StateStoreChangelogReaderV2(
     }
   }
 }
+
+/**
+ * Base class representing an iterator that iterates over a range of changelog 
files in a state
+ * store. In each iteration, it will return a tuple of (changeType: 
[[RecordType]],
+ * nested key: [[UnsafeRow]], nested value: [[UnsafeRow]], batchId: [[Long]])
+ *
+ * @param fm checkpoint file manager used to manage streaming query checkpoint
+ * @param stateLocation location of the state store
+ * @param startVersion start version of the changelog file to read
+ * @param endVersion end version of the changelog file to read
+ * @param compressionCodec decompression method used for reading changelog 
file
+ */
+abstract class StateStoreChangeDataReader(
+  fm: CheckpointFileManager,
+  stateLocation: Path,
+  startVersion: Long,
+  endVersion: Long,
+  compressionCodec: CompressionCodec)
+  extends NextIterator[(RecordType.Value, UnsafeRow, UnsafeRow, Long)] with 
Logging {
+
+  assert(startVersion >= 1)
+  assert(endVersion >= startVersion)
+
+  /**
+   * Iterator that iterates over the changelog files in the state store.
+   */
+  private class ChangeLogFileIterator extends Iterator[Path] {
+
+    private var currentVersion = StateStoreChangeDataReader.this.startVersion 
- 1
+
+    /** returns the version of the changelog returned by the latest [[next]] 
function call */
+    def getVersion: Long = currentVersion
+
+    override def hasNext: Boolean = currentVersion < 
StateStoreChangeDataReader.this.endVersion
+
+    override def next(): Path = {
+      currentVersion += 1
+      getChangelogPath(currentVersion)
+    }
+
+    private def getChangelogPath(version: Long): Path =
+      new Path(
+        StateStoreChangeDataReader.this.stateLocation,
+        s"$version.${StateStoreChangeDataReader.this.changelogSuffix}")
+  }
+
+  /** file format of the changelog files */
+  protected var changelogSuffix: String
+  private lazy val fileIterator = new ChangeLogFileIterator
+  private var changelogReader: StateStoreChangelogReader = null
+
+  /**
+   * Get a changelog reader that has at least one record left to read. If 
there are no readers left,
+   * return null.
+   */
+  protected def currentChangelogReader(): StateStoreChangelogReader = {
+    while (changelogReader == null || !changelogReader.hasNext) {
+      if (changelogReader != null) {
+        changelogReader.close()
+      }
+      if (!fileIterator.hasNext) {
+        finished = true
+        return null
+      }
+      // Todo: Does not support StateStoreChangelogReaderV2

Review Comment:
   @anishshri-db Would SPARK-47047 aim to solve this? If so, we can reference 
SPARK-47047 in this TODO and mark SPARK-47047 as a blocker. (Technically, every 
remaining ticket in transformWithState is a blocker for Spark 4.0.0.) If it is 
not aiming to solve this TODO, @eason-yuchen-liu please file a JIRA ticket and 
put the JIRA ticket number in the TODO comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to