HeartSaVioR commented on code in PR #47188:
URL: https://github.com/apache/spark/pull/47188#discussion_r1667893863
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -94,10 +94,21 @@ class StateDataSource extends TableProvider with DataSourceRegister {
         manager.readSchemaFile()
       }
-    new StructType()
-      .add("key", keySchema)
-      .add("value", valueSchema)
-      .add("partition_id", IntegerType)
+    if (sourceOptions.readChangeFeed) {
+      new StructType()
Review Comment:
I'd expect the schema to begin with `change_type` and `batch_id`, with batch ID placed even earlier (i.e. `batch_id`, `change_type`).
Given the characteristics of a change feed, the output is expected to be ordered by batch ID (ordering across partition IDs may be harder); even if the data source does not do so itself, users should be able to do it easily, because they very likely will.
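For illustration, a rough sketch of the ordering I have in mind (just a sketch reusing the same `keySchema` / `valueSchema`; the exact shape is up to you):
```
// batch_id and change_type lead, so ordering/grouping by batch is natural
new StructType()
  .add("batch_id", LongType)
  .add("change_type", StringType)
  .add("key", keySchema)
  .add("value", valueSchema)
  .add("partition_id", IntegerType)
```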
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -94,10 +94,21 @@ class StateDataSource extends TableProvider with DataSourceRegister {
         manager.readSchemaFile()
       }
-    new StructType()
-      .add("key", keySchema)
-      .add("value", valueSchema)
-      .add("partition_id", IntegerType)
+    if (sourceOptions.readChangeFeed) {
+      new StructType()
+        .add("key", keySchema)
+        .add("value", valueSchema)
+        .add("change_type", StringType)
+        .add("batch_id", LongType)
+        .add("partition_id", IntegerType)
+    } else {
+      new StructType()
+        .add("key", keySchema)
+        .add("value", valueSchema)
+        .add("partition_id", IntegerType)
+    }
+
Review Comment:
nit: not necessary to have two empty lines
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -37,8 +39,14 @@ class StatePartitionReaderFactory(
     stateStoreMetadata: Array[StateMetadataTableEntry]) extends PartitionReaderFactory {
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
-    new StatePartitionReader(storeConf, hadoopConf,
-      partition.asInstanceOf[StateStoreInputPartition], schema, stateStoreMetadata)
+    val stateStoreInputPartition = partition.asInstanceOf[StateStoreInputPartition]
+    if (stateStoreInputPartition.sourceOptions.readChangeFeed) {
+      new StateStoreChangeDataPartitionReader(storeConf, hadoopConf,
+        partition.asInstanceOf[StateStoreInputPartition], schema, stateStoreMetadata)
Review Comment:
nit: stateStoreInputPartition (no longer need to cast here)
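i.e. something like this (sketch only):
```
val stateStoreInputPartition = partition.asInstanceOf[StateStoreInputPartition]
if (stateStoreInputPartition.sourceOptions.readChangeFeed) {
  new StateStoreChangeDataPartitionReader(storeConf, hadoopConf,
    stateStoreInputPartition, schema, stateStoreMetadata)
} else {
  new StatePartitionReader(storeConf, hadoopConf,
    stateStoreInputPartition, schema, stateStoreMetadata)
}
```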
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala:
##########
@@ -76,6 +76,9 @@ class StateTable(
  override def properties(): util.Map[String, String] = Map.empty[String, String].asJava
  private def isValidSchema(schema: StructType): Boolean = {
+    if (sourceOptions.readChangeFeed) {
+      return isValidChangeDataSchema(schema)
+    }
Review Comment:
nit: one empty line to clearly denote it is early-return
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -55,6 +56,15 @@ object RecordType extends Enumeration {
}
}
+ def getRecordTypeAsString(recordType: RecordType): String = {
+ recordType match {
+ case PUT_RECORD => "update"
Review Comment:
MERGE_RECORD is a valid one as well - we just need to add one more type to the state data source reader.
We'd also like to add a test, but I guess we have yet to address the integration between transformWithState and the state data source reader.
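For example, something along these lines (only the `PUT_RECORD` mapping is from the current diff; the strings for the other record types are placeholders to be decided):
```
def getRecordTypeAsString(recordType: RecordType): String = {
  recordType match {
    case PUT_RECORD => "update"
    case DELETE_RECORD => "delete"  // placeholder name
    case MERGE_RECORD => "append"   // placeholder name
    case _ => throw new IllegalArgumentException(s"Invalid recordType: $recordType")
  }
}
```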
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -390,3 +400,83 @@ class StateStoreChangelogReaderV2(
}
}
}
+
+/**
+ * Base class representing a iterator that iterates over a range of changelog files in a state
+ * store. In each iteration, it will return a tuple of (changeType: [[RecordType]],
+ * nested key: [[UnsafeRow]], nested value: [[UnsafeRow]], batchId: [[Long]])
+ *
+ * @param fm checkpoint file manager used to manage streaming query checkpoint
+ * @param stateLocation location of the state store
+ * @param startVersion start version of the changelog file to read
+ * @param endVersion end version of the changelog file to read
+ * @param compressionCodec de-compression method using for reading changelog file
+ */
+abstract class StateStoreChangeDataReader(
+  fm: CheckpointFileManager,
Review Comment:
nit: 2 more spaces for params
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -132,7 +143,10 @@ case class StateSourceOptions(
storeName: String,
joinSide: JoinSideValues,
snapshotStartBatchId: Option[Long],
- snapshotPartitionId: Option[Int]) {
+ snapshotPartitionId: Option[Int],
Review Comment:
While we are here, it'd be nice to structure the sub-options: there are now 10 parameters, and 5 of them aren't common ones. The options for 1) starting with a snapshot and 2) readChangeFeed can each be grouped together as an `Option[<option model class>]`.
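Roughly like this (class and field names are just placeholders):
```
// group the snapshot-related and change-feed-related options into small model classes
case class FromSnapshotOptions(
    snapshotStartBatchId: Long,
    snapshotPartitionId: Int)

case class ReadChangeFeedOptions(
    changeStartBatchId: Long,
    changeEndBatchId: Long)

// StateSourceOptions then keeps the common params and carries
//   fromSnapshotOptions: Option[FromSnapshotOptions]
//   readChangeFeedOptions: Option[ReadChangeFeedOptions]
```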
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -37,8 +39,14 @@ class StatePartitionReaderFactory(
     stateStoreMetadata: Array[StateMetadataTableEntry]) extends PartitionReaderFactory {
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
-    new StatePartitionReader(storeConf, hadoopConf,
-      partition.asInstanceOf[StateStoreInputPartition], schema, stateStoreMetadata)
+    val stateStoreInputPartition = partition.asInstanceOf[StateStoreInputPartition]
+    if (stateStoreInputPartition.sourceOptions.readChangeFeed) {
+      new StateStoreChangeDataPartitionReader(storeConf, hadoopConf,
+        partition.asInstanceOf[StateStoreInputPartition], schema, stateStoreMetadata)
+    } else {
+      new StatePartitionReader(storeConf, hadoopConf,
+        partition.asInstanceOf[StateStoreInputPartition], schema, stateStoreMetadata)
Review Comment:
nit: ditto
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -469,6 +469,15 @@ trait SupportsFineGrainedReplay {
  def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long): ReadStateStore = {
    new WrappedReadStateStore(replayStateFromSnapshot(snapshotVersion, endVersion))
  }
+
+ /**
+ *
+ * @param startVersion
+ * @param endVersion
+ * @return
+ */
+ def getStateStoreChangeDataReader(startVersion: Long, endVersion: Long):
Review Comment:
Strictly speaking, 3rd party state store providers can implement their own format of delta/changelog files. We need to define an interface for the change data reader, and have a built-in implementation of that interface which works for both HDFS and RocksDB.
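Very roughly, something like this (a sketch only; the names are placeholders, and the second class is essentially what the current `StateStoreChangeDataReader` body does):
```
// sketch: provider-agnostic contract, yielding (recordType, key, value, batchId)
trait StateStoreChangeDataReader
  extends NextIterator[(RecordType.Value, UnsafeRow, UnsafeRow, Long)]

// sketch: built-in implementation over the changelog file format shared by the
// HDFS-backed and RocksDB providers
abstract class ChangelogFileBasedChangeDataReader(
    fm: CheckpointFileManager,
    stateLocation: Path,
    startVersion: Long,
    endVersion: Long,
    compressionCodec: CompressionCodec)
  extends StateStoreChangeDataReader
```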
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -231,9 +248,45 @@ object StateSourceOptions extends DataSourceOptions {
       throw StateDataSourceErrors.requiredOptionUnspecified(SNAPSHOT_PARTITION_ID)
     }
+    val readChangeFeed = Option(options.get(READ_CHANGE_FEED)).exists(_.toBoolean)
+
+    val changeStartBatchId = Option(options.get(CHANGE_START_BATCH_ID)).map(_.toLong)
+    var changeEndBatchId = Option(options.get(CHANGE_END_BATCH_ID)).map(_.toLong)
+
+    if (readChangeFeed) {
+      if (joinSide != JoinSideValues.none) {
+        throw StateDataSourceErrors.conflictOptions(Seq(JOIN_SIDE, READ_CHANGE_FEED))
+      }
+      if (changeStartBatchId.isEmpty) {
+        throw StateDataSourceErrors.requiredOptionUnspecified(CHANGE_START_BATCH_ID)
+      }
+      changeEndBatchId = Option(changeEndBatchId.getOrElse(batchId))
Review Comment:
We'll probably need to make it clear that the current option `batchId` denotes the "ending" batch ID - that will help the option be used across multiple modes.
We could design a new option and promote it later.
Before that, let's simply not fall back - let's require users to specify the symmetric option. We could reconsider consolidating the "starting batch ID" option as well later.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -390,3 +400,83 @@ class StateStoreChangelogReaderV2(
}
}
}
+
+/**
+ * Base class representing a iterator that iterates over a range of changelog files in a state
+ * store. In each iteration, it will return a tuple of (changeType: [[RecordType]],
+ * nested key: [[UnsafeRow]], nested value: [[UnsafeRow]], batchId: [[Long]])
+ *
+ * @param fm checkpoint file manager used to manage streaming query checkpoint
+ * @param stateLocation location of the state store
+ * @param startVersion start version of the changelog file to read
+ * @param endVersion end version of the changelog file to read
+ * @param compressionCodec de-compression method using for reading changelog file
+ */
+abstract class StateStoreChangeDataReader(
+  fm: CheckpointFileManager,
+  stateLocation: Path,
+  startVersion: Long,
+  endVersion: Long,
+  compressionCodec: CompressionCodec)
+  extends NextIterator[(RecordType.Value, UnsafeRow, UnsafeRow, Long)] with Logging {
+
+  assert(startVersion >= 1)
+  assert(endVersion >= startVersion)
+
+  /**
+   * Iterator that iterates over the changelog files in the state store.
+   */
+  private class ChangeLogFileIterator extends Iterator[Path] {
+
+    private var currentVersion = StateStoreChangeDataReader.this.startVersion - 1
+
+    /** returns the version of the changelog returned by the latest [[next]] function call */
+    def getVersion: Long = currentVersion
+
+    override def hasNext: Boolean = currentVersion < StateStoreChangeDataReader.this.endVersion
+
+    override def next(): Path = {
+      currentVersion += 1
+      getChangelogPath(currentVersion)
+    }
+
+    private def getChangelogPath(version: Long): Path =
+      new Path(
+        StateStoreChangeDataReader.this.stateLocation,
+        s"$version.${StateStoreChangeDataReader.this.changelogSuffix}")
+  }
+
+  /** file format of the changelog files */
+  protected var changelogSuffix: String
+  private lazy val fileIterator = new ChangeLogFileIterator
+  private var changelogReader: StateStoreChangelogReader = null
+
+  /**
+   * Get a changelog reader that has at least one record left to read. If there is no readers left,
+   * return null.
+   */
+  protected def currentChangelogReader(): StateStoreChangelogReader = {
+    while (changelogReader == null || !changelogReader.hasNext) {
+      if (changelogReader != null) {
+        changelogReader.close()
+      }
+      if (!fileIterator.hasNext) {
+        finished = true
+        return null
+      }
+      // Todo: Does not support StateStoreChangelogReaderV2
Review Comment:
My understanding is that we don't have the information to distinguish whether this needs to use V1 vs V2. Do I understand correctly? (TWS support in the state data source reader isn't done yet, anyway.)
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -136,3 +144,48 @@ class StatePartitionReader(
row
}
}
+
+/**
+ * An implementation of [[PartitionReader]] for the readChangeFeed mode of State Data Source.
+ * It reads the change of state over batches of a particular partition.
+ */
+class StateStoreChangeDataPartitionReader(
+    storeConf: StateStoreConf,
+    hadoopConf: SerializableConfiguration,
+    partition: StateStoreInputPartition,
+    schema: StructType,
+    stateStoreMetadata: Array[StateMetadataTableEntry])
+  extends StatePartitionReader(storeConf, hadoopConf, partition, schema, stateStoreMetadata) {
+
+  private lazy val changeDataReader: StateStoreChangeDataReader = {
+    if (!provider.isInstanceOf[SupportsFineGrainedReplay]) {
+      throw StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
+        provider.getClass.toString)
+    }
+    provider.asInstanceOf[SupportsFineGrainedReplay]
+      .getStateStoreChangeDataReader(
+        partition.sourceOptions.changeStartBatchId.get + 1,
+        partition.sourceOptions.changeEndBatchId.get + 1)
+  }
+
+  override protected lazy val iter: Iterator[InternalRow] = {
Review Comment:
I'd say the iterator logic is simple enough that partially reusing it makes things more complicated. Initializing the schema, the state store provider, and the store instance can be reused between the two classes (the store instance isn't even reused here) - maybe it's good to have an abstract class named `StatePartitionReaderBase` and move these common parts into the new abstract class.
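Rough sketch of the split I have in mind (the member list is illustrative, not exact):
```
abstract class StatePartitionReaderBase(
    storeConf: StateStoreConf,
    hadoopConf: SerializableConfiguration,
    partition: StateStoreInputPartition,
    schema: StructType,
    stateStoreMetadata: Array[StateMetadataTableEntry])
  extends PartitionReader[InternalRow] {

  // shared parts: key/value schema handling and provider initialization
  protected lazy val provider: StateStoreProvider = ???  // as in the current StatePartitionReader

  // each concrete reader supplies its own row iterator
  protected def iter: Iterator[InternalRow]

  private var current: InternalRow = _

  override def next(): Boolean = {
    if (iter.hasNext) { current = iter.next(); true } else { current = null; false }
  }

  override def get(): InternalRow = current

  override def close(): Unit = {
    current = null
    provider.close()
  }
}

// StatePartitionReader and StateStoreChangeDataPartitionReader would then extend this
// and only define their own `iter` (plus any reader-specific resources to close).
```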
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -469,6 +469,15 @@ trait SupportsFineGrainedReplay {
  def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long): ReadStateStore = {
    new WrappedReadStateStore(replayStateFromSnapshot(snapshotVersion, endVersion))
  }
+
+ /**
Review Comment:
nit: incomplete method doc
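e.g. something along these lines (wording is only a suggestion; the return type is inferred from how the result is used in `StateStoreChangeDataPartitionReader`):
```
/**
 * Return an iterator of change data of the state store, reading the changelog files
 * from startVersion to endVersion (both inclusive) and returning, for each record,
 * (recordType, key, value, batchId).
 *
 * @param startVersion starting changelog version (inclusive)
 * @param endVersion ending changelog version (inclusive)
 */
def getStateStoreChangeDataReader(startVersion: Long, endVersion: Long): StateStoreChangeDataReader
```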
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala:
##########
@@ -76,6 +76,9 @@ class StateTable(
  override def properties(): util.Map[String, String] = Map.empty[String, String].asJava
  private def isValidSchema(schema: StructType): Boolean = {
+    if (sourceOptions.readChangeFeed) {
+      return isValidChangeDataSchema(schema)
+    }
Review Comment:
Btw, we verify the same column names with the same logic regardless of the mode. That said, we should be able to refine the logic to reduce redundant code.
```
val expectedFieldNames = if (sourceOptions.readChangeFeed) {
  Seq("key", "value", "change_type", "batch_id", "partition_id")
} else {
  Seq("key", "value", "partition_id")
}
val expectedTypes = Map("key" -> classOf[StructType], ..., "batch_id" -> classOf[LongType]) // <= should contain all 5 columns
if (schema.fieldNames.toImmutableArraySeq != expectedFieldNames) {
  false
} else {
  schema.fieldNames.forall { fieldName =>
    SchemaUtil.getSchemaAsDataType(schema, fieldName).getClass == expectedTypes(fieldName)
  }
}
```
The above code wasn't written with an IDE, so please treat it as a snippet and construct your own.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala:
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.Assertions
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
+
+class HDFSBackedStateDataSourceChangeDataReaderSuite extends StateDataSourceChangeDataReaderSuite {
+  override protected def newStateStoreProvider(): HDFSBackedStateStoreProvider =
+    new HDFSBackedStateStoreProvider
+}
+
+class RocksDBWithChangelogCheckpointStateDataSourceChangeDataReaderSuite extends
+  StateDataSourceChangeDataReaderSuite {
+  override protected def newStateStoreProvider(): RocksDBStateStoreProvider =
+    new RocksDBStateStoreProvider
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
+      "true")
+  }
+}
+
+abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestBase
+  with Assertions {
+
+  import testImplicits._
+  import StateStoreTestsHelper._
+
+  protected val keySchema: StructType = StateStoreTestsHelper.keySchema
+  protected val valueSchema: StructType = StateStoreTestsHelper.valueSchema
+
+  protected def newStateStoreProvider(): StateStoreProvider
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set(SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED, false)
+    spark.conf.set(SQLConf.STATE_STORE_PROVIDER_CLASS.key, newStateStoreProvider().getClass.getName)
+  }
+
+  /**
+   * Calls the overridable [[newStateStoreProvider]] to create the state store provider instance.
+   * Initialize it with the configuration set by child classes.
+   *
+   * @param checkpointDir path to store state information
+   * @return instance of class extending [[StateStoreProvider]]
+   */
+  private def getNewStateStoreProvider(checkpointDir: String): StateStoreProvider = {
+    val provider = newStateStoreProvider()
+    provider.init(
+      StateStoreId(checkpointDir, 0, 0),
+      keySchema,
+      valueSchema,
+      NoPrefixKeyStateEncoderSpec(keySchema),
+      useColumnFamilies = false,
+      StateStoreConf(spark.sessionState.conf),
+      new Configuration)
+    provider
+  }
+
+  test("ERROR: specify changeStartBatchId in normal mode") {
+    withTempDir { tempDir =>
+      val exc = intercept[StateDataSourceInvalidOptionValue] {
+        spark.read.format("statestore")
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .option(StateSourceOptions.CHANGE_START_BATCH_ID, 0)
+          .option(StateSourceOptions.CHANGE_END_BATCH_ID, 2)
+          .load(tempDir.getAbsolutePath)
+      }
+      assert(exc.getErrorClass === "STDS_INVALID_OPTION_VALUE.WITH_MESSAGE")
+    }
+  }
+
+  test("ERROR: changeStartBatchId is set to negative") {
+    withTempDir { tempDir =>
+      val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] {
+        spark.read.format("statestore")
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .option(StateSourceOptions.READ_CHANGE_FEED, value = true)
+          .option(StateSourceOptions.CHANGE_START_BATCH_ID, -1)
+          .option(StateSourceOptions.CHANGE_END_BATCH_ID, 0)
+          .load(tempDir.getAbsolutePath)
+      }
+      assert(exc.getErrorClass === "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE")
+    }
+  }
+
+  test("ERROR: changeEndBatchId is set to less than changeStartBatchId") {
+    withTempDir { tempDir =>
+      val exc = intercept[StateDataSourceInvalidOptionValue] {
+        spark.read.format("statestore")
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .option(StateSourceOptions.READ_CHANGE_FEED, value = true)
+          .option(StateSourceOptions.CHANGE_START_BATCH_ID, 1)
+          .option(StateSourceOptions.CHANGE_END_BATCH_ID, 0)
+          .load(tempDir.getAbsolutePath)
+      }
+      assert(exc.getErrorClass === "STDS_INVALID_OPTION_VALUE.WITH_MESSAGE")
+    }
+  }
+
+  test("ERROR: joinSide option is used together with readChangeFeed") {
+    withTempDir { tempDir =>
+      val exc = intercept[StateDataSourceConflictOptions] {
+        spark.read.format("statestore")
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .option(StateSourceOptions.READ_CHANGE_FEED, value = true)
+          .option(StateSourceOptions.JOIN_SIDE, "left")
+          .option(StateSourceOptions.CHANGE_START_BATCH_ID, 0)
+          .option(StateSourceOptions.CHANGE_END_BATCH_ID, 0)
+          .load(tempDir.getAbsolutePath)
+      }
+      assert(exc.getErrorClass === "STDS_CONFLICT_OPTIONS")
+    }
+  }
+
+  test("getChangeDataReader of state store provider") {
+    def withNewStateStore(provider: StateStoreProvider, version: Int)(f: StateStore => Unit):
+      Unit = {
+      val stateStore = provider.getStore(version)
+      f(stateStore)
+      stateStore.commit()
+    }
+
+    withTempDir { tempDir =>
+      val provider = getNewStateStoreProvider(tempDir.getAbsolutePath)
+      withNewStateStore(provider, 0) { stateStore =>
+        put(stateStore, "a", 1, 1) }
+      withNewStateStore(provider, 1) { stateStore =>
+        put(stateStore, "b", 2, 2) }
+      withNewStateStore(provider, 2) { stateStore =>
+        stateStore.remove(dataToKeyRow("a", 1)) }
+      withNewStateStore(provider, 3) { stateStore =>
+        stateStore.remove(dataToKeyRow("b", 2)) }
+
+      val reader =
+        provider.asInstanceOf[SupportsFineGrainedReplay].getStateStoreChangeDataReader(1, 4)
+
+      assert(reader.next() === (RecordType.PUT_RECORD, dataToKeyRow("a", 1), dataToValueRow(1), 0L))
+      assert(reader.next() === (RecordType.PUT_RECORD, dataToKeyRow("b", 2), dataToValueRow(2), 1L))
+      assert(reader.next() ===
+        (RecordType.DELETE_RECORD, dataToKeyRow("a", 1), null, 2L))
+      assert(reader.next() ===
+        (RecordType.DELETE_RECORD, dataToKeyRow("b", 2), null, 3L))
+    }
+  }
+
+  test("read limit state change feed") {
Review Comment:
nit: Let's be explicit about what we are testing - "global streaming limit"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]