HeartSaVioR commented on code in PR #47188:
URL: https://github.com/apache/spark/pull/47188#discussion_r1667893863


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -94,10 +94,21 @@ class StateDataSource extends TableProvider with DataSourceRegister {
           manager.readSchemaFile()
       }
 
-      new StructType()
-        .add("key", keySchema)
-        .add("value", valueSchema)
-        .add("partition_id", IntegerType)
+      if (sourceOptions.readChangeFeed) {
+        new StructType()

Review Comment:
   I'd expect the schema to begin with `change_type` and `batch_id`, and batch ID even to be placed earlier, i.e. (batch_id, change_type).
   
   Given the characteristics of a change feed, the output is expected to be ordered by batch ID (and across partition IDs as well, which may not be easy). Even if the data source does not do so itself, users should be able to do it easily, because they very likely will.
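   
   Something like the below, just to sketch the ordering I have in mind (column types taken from this diff, not binding):
   
   ```
   new StructType()
     .add("batch_id", LongType)
     .add("change_type", StringType)
     .add("key", keySchema)
     .add("value", valueSchema)
     .add("partition_id", IntegerType)
   ```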



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -94,10 +94,21 @@ class StateDataSource extends TableProvider with DataSourceRegister {
           manager.readSchemaFile()
       }
 
-      new StructType()
-        .add("key", keySchema)
-        .add("value", valueSchema)
-        .add("partition_id", IntegerType)
+      if (sourceOptions.readChangeFeed) {
+        new StructType()
+          .add("key", keySchema)
+          .add("value", valueSchema)
+          .add("change_type", StringType)
+          .add("batch_id", LongType)
+          .add("partition_id", IntegerType)
+      } else {
+        new StructType()
+          .add("key", keySchema)
+          .add("value", valueSchema)
+          .add("partition_id", IntegerType)
+      }
+

Review Comment:
   nit: not necessary to have two empty lines



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -37,8 +39,14 @@ class StatePartitionReaderFactory(
     stateStoreMetadata: Array[StateMetadataTableEntry]) extends PartitionReaderFactory {
 
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
-    new StatePartitionReader(storeConf, hadoopConf,
-      partition.asInstanceOf[StateStoreInputPartition], schema, stateStoreMetadata)
+    val stateStoreInputPartition = partition.asInstanceOf[StateStoreInputPartition]
+    if (stateStoreInputPartition.sourceOptions.readChangeFeed) {
+      new StateStoreChangeDataPartitionReader(storeConf, hadoopConf,
+        partition.asInstanceOf[StateStoreInputPartition], schema, stateStoreMetadata)

Review Comment:
   nit: use `stateStoreInputPartition` here - the cast is no longer needed.
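   
   i.e. (reusing the val introduced just above):
   
   ```
   new StateStoreChangeDataPartitionReader(storeConf, hadoopConf,
     stateStoreInputPartition, schema, stateStoreMetadata)
   ```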



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala:
##########
@@ -76,6 +76,9 @@ class StateTable(
   override def properties(): util.Map[String, String] = Map.empty[String, String].asJava
 
   private def isValidSchema(schema: StructType): Boolean = {
+    if (sourceOptions.readChangeFeed) {
+      return isValidChangeDataSchema(schema)
+    }

Review Comment:
   nit: add one empty line after this block to clearly denote the early return



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -55,6 +56,15 @@ object RecordType extends Enumeration {
     }
   }
 
+  def getRecordTypeAsString(recordType: RecordType): String = {
+    recordType match {
+      case PUT_RECORD => "update"

Review Comment:
   MERGE_RECORD is a valid one as well - we just need to add one more type to the state data source reader.
   
   We'd also like to add a test for it, but I guess we have yet to address the integration between transformWithState and the state data source reader.
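   
   Something like the below - the string values for DELETE_RECORD and MERGE_RECORD are just my guess, so pick whatever reads best:
   
   ```
   def getRecordTypeAsString(recordType: RecordType): String = {
     recordType match {
       case PUT_RECORD => "update"
       case DELETE_RECORD => "delete"
       case MERGE_RECORD => "merge"
       case _ => throw new UnsupportedOperationException(s"Unsupported record type: $recordType")
     }
   }
   ```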



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -390,3 +400,83 @@ class StateStoreChangelogReaderV2(
     }
   }
 }
+
+/**
+ * Base class representing a iterator that iterates over a range of changelog files in a state
+ * store. In each iteration, it will return a tuple of (changeType: [[RecordType]],
+ * nested key: [[UnsafeRow]], nested value: [[UnsafeRow]], batchId: [[Long]])
+ *
+ * @param fm checkpoint file manager used to manage streaming query checkpoint
+ * @param stateLocation location of the state store
+ * @param startVersion start version of the changelog file to read
+ * @param endVersion end version of the changelog file to read
+ * @param compressionCodec de-compression method using for reading changelog file
+ */
+abstract class StateStoreChangeDataReader(
+  fm: CheckpointFileManager,

Review Comment:
   nit: indent the params with 2 more spaces (4 spaces in total)
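   
   i.e.:
   
   ```
   abstract class StateStoreChangeDataReader(
       fm: CheckpointFileManager,
       stateLocation: Path,
       startVersion: Long,
       endVersion: Long,
       compressionCodec: CompressionCodec)
     extends NextIterator[(RecordType.Value, UnsafeRow, UnsafeRow, Long)] with Logging {
   ```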



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -132,7 +143,10 @@ case class StateSourceOptions(
     storeName: String,
     joinSide: JoinSideValues,
     snapshotStartBatchId: Option[Long],
-    snapshotPartitionId: Option[Int]) {
+    snapshotPartitionId: Option[Int],

Review Comment:
   While we are here, it'd be nice to structure sub-options as the parameters 
are now 10 and 5 params aren't common ones. Options for 1) starting with 
snapshot 2) readChangeFeed can be grouped together and be `Option[<option model 
class>]` for each.
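   
   For example, something along these lines (class and field names are just placeholders):
   
   ```
   case class SnapshotStartOptions(snapshotStartBatchId: Long, snapshotPartitionId: Int)
   
   case class ReadChangeFeedOptions(changeStartBatchId: Long, changeEndBatchId: Long)
   
   case class StateSourceOptions(
       // the common params stay as they are (e.g. batchId, operatorId, storeName, joinSide)
       snapshotOptions: Option[SnapshotStartOptions],
       readChangeFeedOptions: Option[ReadChangeFeedOptions])
   ```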



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -37,8 +39,14 @@ class StatePartitionReaderFactory(
     stateStoreMetadata: Array[StateMetadataTableEntry]) extends PartitionReaderFactory {
 
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
-    new StatePartitionReader(storeConf, hadoopConf,
-      partition.asInstanceOf[StateStoreInputPartition], schema, stateStoreMetadata)
+    val stateStoreInputPartition = partition.asInstanceOf[StateStoreInputPartition]
+    if (stateStoreInputPartition.sourceOptions.readChangeFeed) {
+      new StateStoreChangeDataPartitionReader(storeConf, hadoopConf,
+        partition.asInstanceOf[StateStoreInputPartition], schema, stateStoreMetadata)
+    } else {
+      new StatePartitionReader(storeConf, hadoopConf,
+        partition.asInstanceOf[StateStoreInputPartition], schema, stateStoreMetadata)

Review Comment:
   nit: ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -469,6 +469,15 @@ trait SupportsFineGrainedReplay {
   def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long): ReadStateStore = {
     new WrappedReadStateStore(replayStateFromSnapshot(snapshotVersion, endVersion))
   }
+
+  /**
+   *
+   * @param startVersion
+   * @param endVersion
+   * @return
+   */
+  def getStateStoreChangeDataReader(startVersion: Long, endVersion: Long):

Review Comment:
   Strictly speaking, 3rd party state store providers can implement their own format for delta/changelog files. We need to define an interface for the change data reader, and provide a built-in implementation of that interface which works for both HDFS and RocksDB.
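   
   A rough sketch of the shape I have in mind (naming is up for discussion; the built-in changelog-file based reader would be the implementation shared by HDFS and RocksDB):
   
   ```
   // Provider-agnostic contract; 3rd party providers may back it with their own file format.
   trait StateStoreChangeDataReader
     extends NextIterator[(RecordType.Value, UnsafeRow, UnsafeRow, Long)]
   ```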



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -231,9 +248,45 @@ object StateSourceOptions extends DataSourceOptions {
      throw StateDataSourceErrors.requiredOptionUnspecified(SNAPSHOT_PARTITION_ID)
    }
 
+    val readChangeFeed = Option(options.get(READ_CHANGE_FEED)).exists(_.toBoolean)
+
+    val changeStartBatchId = Option(options.get(CHANGE_START_BATCH_ID)).map(_.toLong)
+    var changeEndBatchId = Option(options.get(CHANGE_END_BATCH_ID)).map(_.toLong)
+
+    if (readChangeFeed) {
+      if (joinSide != JoinSideValues.none) {
+        throw StateDataSourceErrors.conflictOptions(Seq(JOIN_SIDE, READ_CHANGE_FEED))
+      }
+      if (changeStartBatchId.isEmpty) {
+        throw StateDataSourceErrors.requiredOptionUnspecified(CHANGE_START_BATCH_ID)
+      }
+      changeEndBatchId = Option(changeEndBatchId.getOrElse(batchId))

Review Comment:
   Probably we'll need to make clear the current option `batchId` to denote 
that it is "ending" batch ID - it will help the option to be used among 
multiple modes.
   
   We could probably design a new option and promote the new option later. 
Before that, let's simply not fall back - let's require users to specify 
symmetric option. We could reconsider the option of consolidating "starting 
batch ID" as well later.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -390,3 +400,83 @@ class StateStoreChangelogReaderV2(
     }
   }
 }
+
+/**
+ * Base class representing a iterator that iterates over a range of changelog files in a state
+ * store. In each iteration, it will return a tuple of (changeType: [[RecordType]],
+ * nested key: [[UnsafeRow]], nested value: [[UnsafeRow]], batchId: [[Long]])
+ *
+ * @param fm checkpoint file manager used to manage streaming query checkpoint
+ * @param stateLocation location of the state store
+ * @param startVersion start version of the changelog file to read
+ * @param endVersion end version of the changelog file to read
+ * @param compressionCodec de-compression method using for reading changelog file
+ */
+abstract class StateStoreChangeDataReader(
+  fm: CheckpointFileManager,
+  stateLocation: Path,
+  startVersion: Long,
+  endVersion: Long,
+  compressionCodec: CompressionCodec)
+  extends NextIterator[(RecordType.Value, UnsafeRow, UnsafeRow, Long)] with Logging {
+
+  assert(startVersion >= 1)
+  assert(endVersion >= startVersion)
+
+  /**
+   * Iterator that iterates over the changelog files in the state store.
+   */
+  private class ChangeLogFileIterator extends Iterator[Path] {
+
+    private var currentVersion = StateStoreChangeDataReader.this.startVersion - 1
+
+    /** returns the version of the changelog returned by the latest [[next]] function call */
+    def getVersion: Long = currentVersion
+
+    override def hasNext: Boolean = currentVersion < StateStoreChangeDataReader.this.endVersion
+
+    override def next(): Path = {
+      currentVersion += 1
+      getChangelogPath(currentVersion)
+    }
+
+    private def getChangelogPath(version: Long): Path =
+      new Path(
+        StateStoreChangeDataReader.this.stateLocation,
+        s"$version.${StateStoreChangeDataReader.this.changelogSuffix}")
+  }
+
+  /** file format of the changelog files */
+  protected var changelogSuffix: String
+  private lazy val fileIterator = new ChangeLogFileIterator
+  private var changelogReader: StateStoreChangelogReader = null
+
+  /**
+   * Get a changelog reader that has at least one record left to read. If there is no readers left,
+   * return null.
+   */
+  protected def currentChangelogReader(): StateStoreChangelogReader = {
+    while (changelogReader == null || !changelogReader.hasNext) {
+      if (changelogReader != null) {
+        changelogReader.close()
+      }
+      if (!fileIterator.hasNext) {
+        finished = true
+        return null
+      }
+      // Todo: Does not support StateStoreChangelogReaderV2

Review Comment:
   My understanding is that we do not have the information to distinguish whether this needs to use V1 vs. V2 - do I understand correctly? That is probably fine for now, since transformWithState support in the state data source reader isn't done yet.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -136,3 +144,48 @@ class StatePartitionReader(
     row
   }
 }
+
+/**
+ * An implementation of [[PartitionReader]] for the readChangeFeed mode of State Data Source.
+ * It reads the change of state over batches of a particular partition.
+ */
+class StateStoreChangeDataPartitionReader(
+    storeConf: StateStoreConf,
+    hadoopConf: SerializableConfiguration,
+    partition: StateStoreInputPartition,
+    schema: StructType,
+    stateStoreMetadata: Array[StateMetadataTableEntry])
+  extends StatePartitionReader(storeConf, hadoopConf, partition, schema, stateStoreMetadata) {
+
+  private lazy val changeDataReader: StateStoreChangeDataReader = {
+    if (!provider.isInstanceOf[SupportsFineGrainedReplay]) {
+      throw StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
+        provider.getClass.toString)
+    }
+    provider.asInstanceOf[SupportsFineGrainedReplay]
+      .getStateStoreChangeDataReader(
+        partition.sourceOptions.changeStartBatchId.get + 1,
+        partition.sourceOptions.changeEndBatchId.get + 1)
+  }
+
+  override protected lazy val iter: Iterator[InternalRow] = {

Review Comment:
   I'd say the iterator logic is simple enough that reusing it only partially makes things more complicated. Initializing the schema, the state store provider, and the store instance can be shared between the two classes (the store instance isn't even reused here) - maybe it's good to have an abstract class named `StatePartitionReaderBase` and move these common parts into the new abstract class.
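   
   Rough sketch of the split (not written with an IDE; only `StatePartitionReaderBase` is a new name, the rest comes from this PR):
   
   ```
   abstract class StatePartitionReaderBase(
       storeConf: StateStoreConf,
       hadoopConf: SerializableConfiguration,
       partition: StateStoreInputPartition,
       schema: StructType,
       stateStoreMetadata: Array[StateMetadataTableEntry])
     extends PartitionReader[InternalRow] {
   
     // common parts: schema handling and state store provider initialization live here
     protected lazy val provider: StateStoreProvider = ???
   
     // each concrete reader (state vs. change feed) supplies its own row iterator
     protected def iter: Iterator[InternalRow]
   }
   ```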



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -469,6 +469,15 @@ trait SupportsFineGrainedReplay {
   def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long): ReadStateStore = {
     new WrappedReadStateStore(replayStateFromSnapshot(snapshotVersion, endVersion))
   }
+
+  /**

Review Comment:
   nit: incomplete method doc



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala:
##########
@@ -76,6 +76,9 @@ class StateTable(
   override def properties(): util.Map[String, String] = Map.empty[String, String].asJava
 
   private def isValidSchema(schema: StructType): Boolean = {
+    if (sourceOptions.readChangeFeed) {
+      return isValidChangeDataSchema(schema)
+    }

Review Comment:
   Btw, we verify the same column names with the same logic regardless of the mode. Given that, we should be able to refine the logic to reduce the redundant code, e.g.:
   
   ```
   val expectedFieldNames = if (sourceOptions.readChangeFeed) {
     Seq("key", "value", "change_type", "batch_id", "partition_id")
   } else {
     Seq("key", "value", "partition_id")
   }
   val expectedTypes = Map("key" -> classOf[StructType], ..., "batch_id" -> classOf[LongType]) // <= should contain all 5 columns
   
   if (schema.fieldNames.toImmutableArraySeq != expectedFieldNames) {
     false
   } else {
     schema.fieldNames.forall { fieldName =>
       SchemaUtil.getSchemaAsDataType(schema, fieldName).getClass == expectedTypes(fieldName)
     }
   }
   ```
   
   The above code isn't written with an IDE, so please consider it a snippet and construct yours.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala:
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.Assertions
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
+
+class HDFSBackedStateDataSourceChangeDataReaderSuite extends StateDataSourceChangeDataReaderSuite {
+  override protected def newStateStoreProvider(): HDFSBackedStateStoreProvider =
+    new HDFSBackedStateStoreProvider
+}
+
+class RocksDBWithChangelogCheckpointStateDataSourceChangeDataReaderSuite extends
+  StateDataSourceChangeDataReaderSuite {
+  override protected def newStateStoreProvider(): RocksDBStateStoreProvider =
+    new RocksDBStateStoreProvider
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
+      "true")
+  }
+}
+
+abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestBase
+  with Assertions {
+
+  import testImplicits._
+  import StateStoreTestsHelper._
+
+  protected val keySchema: StructType = StateStoreTestsHelper.keySchema
+  protected val valueSchema: StructType = StateStoreTestsHelper.valueSchema
+
+  protected def newStateStoreProvider(): StateStoreProvider
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set(SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED, false)
+    spark.conf.set(SQLConf.STATE_STORE_PROVIDER_CLASS.key, newStateStoreProvider().getClass.getName)
+  }
+
+  /**
+   * Calls the overridable [[newStateStoreProvider]] to create the state store provider instance.
+   * Initialize it with the configuration set by child classes.
+   *
+   * @param checkpointDir path to store state information
+   * @return instance of class extending [[StateStoreProvider]]
+   */
+  private def getNewStateStoreProvider(checkpointDir: String): StateStoreProvider = {
+    val provider = newStateStoreProvider()
+    provider.init(
+      StateStoreId(checkpointDir, 0, 0),
+      keySchema,
+      valueSchema,
+      NoPrefixKeyStateEncoderSpec(keySchema),
+      useColumnFamilies = false,
+      StateStoreConf(spark.sessionState.conf),
+      new Configuration)
+    provider
+  }
+
+  test("ERROR: specify changeStartBatchId in normal mode") {
+    withTempDir { tempDir =>
+      val exc = intercept[StateDataSourceInvalidOptionValue] {
+        spark.read.format("statestore")
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .option(StateSourceOptions.CHANGE_START_BATCH_ID, 0)
+          .option(StateSourceOptions.CHANGE_END_BATCH_ID, 2)
+          .load(tempDir.getAbsolutePath)
+      }
+      assert(exc.getErrorClass === "STDS_INVALID_OPTION_VALUE.WITH_MESSAGE")
+    }
+  }
+
+  test("ERROR: changeStartBatchId is set to negative") {
+    withTempDir { tempDir =>
+      val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] {
+        spark.read.format("statestore")
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .option(StateSourceOptions.READ_CHANGE_FEED, value = true)
+          .option(StateSourceOptions.CHANGE_START_BATCH_ID, -1)
+          .option(StateSourceOptions.CHANGE_END_BATCH_ID, 0)
+          .load(tempDir.getAbsolutePath)
+      }
+      assert(exc.getErrorClass === "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE")
+    }
+  }
+
+  test("ERROR: changeEndBatchId is set to less than changeStartBatchId") {
+    withTempDir { tempDir =>
+      val exc = intercept[StateDataSourceInvalidOptionValue] {
+        spark.read.format("statestore")
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .option(StateSourceOptions.READ_CHANGE_FEED, value = true)
+          .option(StateSourceOptions.CHANGE_START_BATCH_ID, 1)
+          .option(StateSourceOptions.CHANGE_END_BATCH_ID, 0)
+          .load(tempDir.getAbsolutePath)
+      }
+      assert(exc.getErrorClass === "STDS_INVALID_OPTION_VALUE.WITH_MESSAGE")
+    }
+  }
+
+  test("ERROR: joinSide option is used together with readChangeFeed") {
+    withTempDir { tempDir =>
+      val exc = intercept[StateDataSourceConflictOptions] {
+        spark.read.format("statestore")
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .option(StateSourceOptions.READ_CHANGE_FEED, value = true)
+          .option(StateSourceOptions.JOIN_SIDE, "left")
+          .option(StateSourceOptions.CHANGE_START_BATCH_ID, 0)
+          .option(StateSourceOptions.CHANGE_END_BATCH_ID, 0)
+          .load(tempDir.getAbsolutePath)
+      }
+      assert(exc.getErrorClass === "STDS_CONFLICT_OPTIONS")
+    }
+  }
+
+  test("getChangeDataReader of state store provider") {
+    def withNewStateStore(provider: StateStoreProvider, version: Int)(f: StateStore => Unit):
+      Unit = {
+      val stateStore = provider.getStore(version)
+      f(stateStore)
+      stateStore.commit()
+    }
+
+    withTempDir { tempDir =>
+      val provider = getNewStateStoreProvider(tempDir.getAbsolutePath)
+      withNewStateStore(provider, 0) { stateStore =>
+        put(stateStore, "a", 1, 1) }
+      withNewStateStore(provider, 1) { stateStore =>
+        put(stateStore, "b", 2, 2) }
+      withNewStateStore(provider, 2) { stateStore =>
+        stateStore.remove(dataToKeyRow("a", 1)) }
+      withNewStateStore(provider, 3) { stateStore =>
+        stateStore.remove(dataToKeyRow("b", 2)) }
+
+      val reader =
+        provider.asInstanceOf[SupportsFineGrainedReplay].getStateStoreChangeDataReader(1, 4)
+
+      assert(reader.next() === (RecordType.PUT_RECORD, dataToKeyRow("a", 1), dataToValueRow(1), 0L))
+      assert(reader.next() === (RecordType.PUT_RECORD, dataToKeyRow("b", 2), dataToValueRow(2), 1L))
+      assert(reader.next() ===
+        (RecordType.DELETE_RECORD, dataToKeyRow("a", 1), null, 2L))
+      assert(reader.next() ===
+        (RecordType.DELETE_RECORD, dataToKeyRow("b", 2), null, 3L))
+    }
+  }
+
+  test("read limit state change feed") {

Review Comment:
   nit: Let's be explicit about what we are testing - "global streaming limit"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

