HeartSaVioR commented on code in PR #47445:
URL: https://github.com/apache/spark/pull/47445#discussion_r1689705247
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala:
##########
@@ -188,29 +191,54 @@ class StateMetadataPartitionReader(
} else Array.empty
}
- private def allOperatorStateMetadata: Array[OperatorStateMetadata] = {
+ private[sql] def allOperatorStateMetadata: Array[OperatorStateMetadata] = {
Review Comment:
nit: better to leave a code comment justifying the rationale for making this package private
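For illustration, something like the following (the wording of the comment is just a sketch, not prescriptive):

```scala
// Package private (sql) so that the state metadata source and its tests can
// enumerate all operator metadata directly; not intended for use outside sql.
private[sql] def allOperatorStateMetadata: Array[OperatorStateMetadata] = {
```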
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -208,13 +208,16 @@ class IncrementalExecution(
}
val schemaValidationResult = statefulOp.
validateAndMaybeEvolveStateSchema(hadoopConf, currentBatchId,
stateSchemaVersion)
+ val stateSchemaPaths = schemaValidationResult.map(_.schemaPath)
// write out the state schema paths to the metadata file
statefulOp match {
- case stateStoreWriter: StateStoreWriter =>
- val metadata = stateStoreWriter.operatorStateMetadata()
- // TODO: [SPARK-48849] Populate metadata with stateSchemaPaths if metadata version is v2
- val metadataWriter = new OperatorStateMetadataWriter(new Path(
- checkpointLocation, stateStoreWriter.getStateInfo.operatorId.toString), hadoopConf)
+ case ssw: StateStoreWriter =>
+ val metadata = ssw.operatorStateMetadata(stateSchemaPaths)
Review Comment:
What is the expected behavior if the state metadata file already exists and there is
nothing to update for this microbatch? Will we still write a state metadata file at
least once per query run?
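For reference, the question is about the existence guard quoted later in this email; a minimal standalone sketch of that pattern (the helper name and wiring here are hypothetical):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: write only when the file does not already exist, so a
// retried microbatch (or a new run over an existing checkpoint) is a no-op.
def writeOnce(fs: FileSystem, path: Path, content: Array[Byte]): Unit = {
  if (fs.exists(path)) return // metadata file already committed; skip
  val out = fs.create(path, false) // overwrite = false
  try out.write(content) finally out.close()
}
```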
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala:
##########
@@ -188,29 +191,54 @@ class StateMetadataPartitionReader(
} else Array.empty
}
- private def allOperatorStateMetadata: Array[OperatorStateMetadata] = {
+ private[sql] def allOperatorStateMetadata: Array[OperatorStateMetadata] = {
val stateDir = new Path(checkpointLocation, "state")
val opIds = fileManager
.list(stateDir, pathNameCanBeParsedAsLongFilter).map(f => pathToLong(f.getPath)).sorted
- opIds.map { opId =>
- new OperatorStateMetadataReader(new Path(stateDir, opId.toString), hadoopConf).read()
+ opIds.flatMap { opId =>
+ val operatorIdPath = new Path(stateDir, opId.toString)
+ // check if OperatorStateMetadataV2 path exists, if it does, read it
+ // otherwise, fall back to OperatorStateMetadataV1
+ val operatorStateMetadataV2Path = OperatorStateMetadataV2.metadataDirPath(operatorIdPath)
+ val operatorStateMetadataVersion = if (fileManager.exists(operatorStateMetadataV2Path)) {
+ 2
+ } else {
+ 1
+ }
+ OperatorStateMetadataReader.createReader(
+ operatorIdPath, hadoopConf, operatorStateMetadataVersion).read()
}
}
private[sql] lazy val stateMetadata: Iterator[StateMetadataTableEntry] = {
allOperatorStateMetadata.flatMap { operatorStateMetadata =>
- require(operatorStateMetadata.version == 1)
- val operatorStateMetadataV1 = operatorStateMetadata.asInstanceOf[OperatorStateMetadataV1]
- operatorStateMetadataV1.stateStoreInfo.map { stateStoreMetadata =>
- StateMetadataTableEntry(operatorStateMetadataV1.operatorInfo.operatorId,
- operatorStateMetadataV1.operatorInfo.operatorName,
- stateStoreMetadata.storeName,
- stateStoreMetadata.numPartitions,
- if (batchIds.nonEmpty) batchIds.head else -1,
- if (batchIds.nonEmpty) batchIds.last else -1,
- stateStoreMetadata.numColsPrefixKey
- )
+ require(operatorStateMetadata.version == 1 || operatorStateMetadata.version == 2)
+ operatorStateMetadata match {
+ case v1: OperatorStateMetadataV1 =>
+ v1.stateStoreInfo.map { stateStoreMetadata =>
+ StateMetadataTableEntry(v1.operatorInfo.operatorId,
+ v1.operatorInfo.operatorName,
+ stateStoreMetadata.storeName,
+ stateStoreMetadata.numPartitions,
+ if (batchIds.nonEmpty) batchIds.head else -1,
+ if (batchIds.nonEmpty) batchIds.last else -1,
+ "",
Review Comment:
If this is meant to represent that the value is not available, `null` is probably
a better value. Spark SQL will deal with a null value, but probably not with an empty
string. Also, assuming the usage of the column is to be parsed, an empty string
requires conditional handling anyway. (Unless from_json() / parse_json() treats
an empty string and empty JSON the same way.)
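A quick way to check the parenthetical above; this probe makes no claim about the result, since the behavior may differ across Spark versions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[1]").appName("probe").getOrCreate()
import spark.implicits._

val schema = StructType(Seq(StructField("a", StringType)))
// Compare a valid JSON document, an empty string, and a null input.
val df = Seq(Some("""{"a":"x"}"""), Some(""), None).toDF("json")
df.select($"json", from_json($"json", schema).as("parsed")).show(false)
```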
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala:
##########
@@ -188,29 +191,54 @@ class StateMetadataPartitionReader(
} else Array.empty
}
- private def allOperatorStateMetadata: Array[OperatorStateMetadata] = {
+ private[sql] def allOperatorStateMetadata: Array[OperatorStateMetadata] = {
val stateDir = new Path(checkpointLocation, "state")
val opIds = fileManager
.list(stateDir, pathNameCanBeParsedAsLongFilter).map(f => pathToLong(f.getPath)).sorted
- opIds.map { opId =>
- new OperatorStateMetadataReader(new Path(stateDir, opId.toString), hadoopConf).read()
+ opIds.flatMap { opId =>
Review Comment:
I think the semantics of flatMap & Option aren't the same as the previous code.
Previously, the query failed if any operator did not have metadata. When could this
be None, and how will the None value be reflected in user-facing output? Are we
going to "drop" the operator from the result?
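A generic standalone illustration of the semantic difference (the data and names here are made up):

```scala
// Operator 1 has no metadata.
val metadataPerOperator: Seq[(Long, Option[String])] =
  Seq(0L -> Some("metaA"), 1L -> None, 2L -> Some("metaC"))

// flatMap semantics: operator 1 silently disappears from the result.
val dropped: Seq[String] = metadataPerOperator.flatMap { case (_, meta) => meta }

// Previous fail-fast semantics: missing metadata fails the query.
val failFast: Seq[String] = metadataPerOperator.map { case (opId, meta) =>
  meta.getOrElse(throw new IllegalStateException(s"Metadata missing for operator $opId"))
}
```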
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -86,62 +153,162 @@ object OperatorStateMetadataUtils {
operatorStateMetadata.version match {
case 1 =>
Serialization.write(operatorStateMetadata.asInstanceOf[OperatorStateMetadataV1], out)
-
+ case 2 =>
+ Serialization.write(operatorStateMetadata.asInstanceOf[OperatorStateMetadataV2], out)
case _ =>
throw new IllegalArgumentException(s"Failed to serialize operator
metadata with " +
s"version=${operatorStateMetadata.version}")
}
}
}
+object OperatorStateMetadataReader {
+ def createReader(
+ stateCheckpointPath: Path,
+ hadoopConf: Configuration,
+ version: Int): OperatorStateMetadataReader = {
+ version match {
+ case 1 =>
+ new OperatorStateMetadataV1Reader(stateCheckpointPath, hadoopConf)
+ case 2 =>
+ new OperatorStateMetadataV2Reader(stateCheckpointPath, hadoopConf)
+ case _ =>
+ throw new IllegalArgumentException(s"Failed to create reader for
operator metadata " +
+ s"with version=$version")
+ }
+ }
+}
+
+object OperatorStateMetadataWriter {
+ def createWriter(
+ stateCheckpointPath: Path,
+ hadoopConf: Configuration,
+ version: Int,
+ currentBatchId: Option[Long] = None): OperatorStateMetadataWriter = {
+ version match {
+ case 1 =>
+ new OperatorStateMetadataV1Writer(stateCheckpointPath, hadoopConf)
+ case 2 =>
+ if (currentBatchId.isEmpty) {
+ throw new IllegalArgumentException("currentBatchId is required for
version 2")
+ }
+ new OperatorStateMetadataV2Writer(stateCheckpointPath, hadoopConf, currentBatchId.get)
+ case _ =>
+ throw new IllegalArgumentException(s"Failed to create writer for
operator metadata " +
+ s"with version=$version")
+ }
+ }
+}
+
+object OperatorStateMetadataV1 {
+ def metadataFilePath(stateCheckpointPath: Path): Path =
+ new Path(new Path(stateCheckpointPath, "_metadata"), "metadata")
+}
+
+object OperatorStateMetadataV2 {
+ private implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
+ @scala.annotation.nowarn
+ private implicit val manifest = Manifest
+ .classType[OperatorStateMetadataV2](implicitly[ClassTag[OperatorStateMetadataV2]].runtimeClass)
+
+ def metadataDirPath(stateCheckpointPath: Path): Path =
+ new Path(new Path(new Path(stateCheckpointPath, "_metadata"), "metadata"), "v2")
+
+ def metadataFilePath(stateCheckpointPath: Path, currentBatchId: Long): Path =
+ new Path(metadataDirPath(stateCheckpointPath), currentBatchId.toString)
+}
+
/**
* Write OperatorStateMetadata into the state checkpoint directory.
*/
-class OperatorStateMetadataWriter(stateCheckpointPath: Path, hadoopConf: Configuration)
- extends Logging {
+class OperatorStateMetadataV1Writer(
+ stateCheckpointPath: Path,
+ hadoopConf: Configuration)
+ extends OperatorStateMetadataWriter with Logging {
- private val metadataFilePath = OperatorStateMetadataUtils.metadataFilePath(stateCheckpointPath)
+ private val metadataFilePath = OperatorStateMetadataV1.metadataFilePath(stateCheckpointPath)
private lazy val fm = CheckpointFileManager.create(stateCheckpointPath, hadoopConf)
+ override def version: Int = 1
+
def write(operatorMetadata: OperatorStateMetadata): Unit = {
if (fm.exists(metadataFilePath)) return
fm.mkdirs(metadataFilePath.getParent)
val outputStream = fm.createAtomic(metadataFilePath, overwriteIfPossible = false)
- try {
- outputStream.write(s"v${operatorMetadata.version}\n".getBytes(StandardCharsets.UTF_8))
- OperatorStateMetadataUtils.serialize(outputStream, operatorMetadata)
- outputStream.close()
- } catch {
- case e: Throwable =>
- logError(
- log"Fail to write state metadata file to ${MDC(LogKeys.META_FILE,
metadataFilePath)}", e)
- outputStream.cancel()
- throw e
- }
+ OperatorStateMetadataUtils.writeMetadata(outputStream, operatorMetadata, metadataFilePath)
}
}
/**
- * Read OperatorStateMetadata from the state checkpoint directory.
+ * Read OperatorStateMetadata from the state checkpoint directory. This class will only be
+ * used to read OperatorStateMetadataV1.
+ * OperatorStateMetadataV2 will be read by the OperatorStateMetadataLog.
*/
-class OperatorStateMetadataReader(stateCheckpointPath: Path, hadoopConf: Configuration) {
+class OperatorStateMetadataV1Reader(
+ stateCheckpointPath: Path,
+ hadoopConf: Configuration) extends OperatorStateMetadataReader {
+ override def version: Int = 1
- private val metadataFilePath = OperatorStateMetadataUtils.metadataFilePath(stateCheckpointPath)
+ private val metadataFilePath = OperatorStateMetadataV1.metadataFilePath(stateCheckpointPath)
private lazy val fm = CheckpointFileManager.create(stateCheckpointPath, hadoopConf)
- def read(): OperatorStateMetadata = {
+ def read(): Option[OperatorStateMetadata] = {
val inputStream = fm.open(metadataFilePath)
- val inputReader =
- new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
- try {
- val versionStr = inputReader.readLine()
- val version = MetadataVersionUtil.validateVersion(versionStr, 1)
- OperatorStateMetadataUtils.deserialize(version, inputReader)
- } finally {
- inputStream.close()
+ OperatorStateMetadataUtils.readMetadata(inputStream)
+ }
+}
+
+class OperatorStateMetadataV2Writer(
+ stateCheckpointPath: Path,
+ hadoopConf: Configuration,
+ currentBatchId: Long) extends OperatorStateMetadataWriter with Logging {
Review Comment:
nit: Logging does not seem to be used
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -98,7 +95,7 @@ class StateSchemaCompatibilityChecker(
stateStoreColFamilySchema: List[StateStoreColFamilySchema],
stateSchemaVersion: Int): Unit = {
// Ensure that schema file path is passed explicitly for schema version 3
- if (stateSchemaVersion == 3 && schemaFilePath.isEmpty) {
+ if (stateSchemaVersion == 3 && newSchemaFilePath.isEmpty) {
throw new IllegalStateException("Schema file path is required for schema version 3")
Review Comment:
Let's apply the error class framework here while we are at it; `internalError`
would be OK.
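e.g., a minimal sketch of the suggestion (keeping the PR's message wording):

```scala
import org.apache.spark.SparkException

if (stateSchemaVersion == 3 && newSchemaFilePath.isEmpty) {
  throw SparkException.internalError(
    "Schema file path is required for schema version 3")
}
```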
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -86,62 +153,162 @@ object OperatorStateMetadataUtils {
operatorStateMetadata.version match {
case 1 =>
Serialization.write(operatorStateMetadata.asInstanceOf[OperatorStateMetadataV1], out)
-
+ case 2 =>
+ Serialization.write(operatorStateMetadata.asInstanceOf[OperatorStateMetadataV2], out)
case _ =>
throw new IllegalArgumentException(s"Failed to serialize operator
metadata with " +
s"version=${operatorStateMetadata.version}")
}
}
}
+object OperatorStateMetadataReader {
+ def createReader(
+ stateCheckpointPath: Path,
+ hadoopConf: Configuration,
+ version: Int): OperatorStateMetadataReader = {
+ version match {
+ case 1 =>
+ new OperatorStateMetadataV1Reader(stateCheckpointPath, hadoopConf)
+ case 2 =>
+ new OperatorStateMetadataV2Reader(stateCheckpointPath, hadoopConf)
+ case _ =>
+ throw new IllegalArgumentException(s"Failed to create reader for
operator metadata " +
+ s"with version=$version")
+ }
+ }
+}
+
+object OperatorStateMetadataWriter {
+ def createWriter(
+ stateCheckpointPath: Path,
+ hadoopConf: Configuration,
+ version: Int,
+ currentBatchId: Option[Long] = None): OperatorStateMetadataWriter = {
+ version match {
+ case 1 =>
+ new OperatorStateMetadataV1Writer(stateCheckpointPath, hadoopConf)
+ case 2 =>
+ if (currentBatchId.isEmpty) {
+ throw new IllegalArgumentException("currentBatchId is required for
version 2")
+ }
+ new OperatorStateMetadataV2Writer(stateCheckpointPath, hadoopConf, currentBatchId.get)
+ case _ =>
+ throw new IllegalArgumentException(s"Failed to create writer for
operator metadata " +
+ s"with version=$version")
+ }
+ }
+}
+
+object OperatorStateMetadataV1 {
+ def metadataFilePath(stateCheckpointPath: Path): Path =
+ new Path(new Path(stateCheckpointPath, "_metadata"), "metadata")
+}
+
+object OperatorStateMetadataV2 {
+ private implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
+ @scala.annotation.nowarn
+ private implicit val manifest = Manifest
+ .classType[OperatorStateMetadataV2](implicitly[ClassTag[OperatorStateMetadataV2]].runtimeClass)
+
+ def metadataDirPath(stateCheckpointPath: Path): Path =
+ new Path(new Path(new Path(stateCheckpointPath, "_metadata"), "metadata"), "v2")
+
+ def metadataFilePath(stateCheckpointPath: Path, currentBatchId: Long): Path =
+ new Path(metadataDirPath(stateCheckpointPath), currentBatchId.toString)
+}
+
/**
* Write OperatorStateMetadata into the state checkpoint directory.
*/
-class OperatorStateMetadataWriter(stateCheckpointPath: Path, hadoopConf: Configuration)
- extends Logging {
+class OperatorStateMetadataV1Writer(
+ stateCheckpointPath: Path,
+ hadoopConf: Configuration)
+ extends OperatorStateMetadataWriter with Logging {
- private val metadataFilePath = OperatorStateMetadataUtils.metadataFilePath(stateCheckpointPath)
+ private val metadataFilePath = OperatorStateMetadataV1.metadataFilePath(stateCheckpointPath)
private lazy val fm = CheckpointFileManager.create(stateCheckpointPath, hadoopConf)
+ override def version: Int = 1
+
def write(operatorMetadata: OperatorStateMetadata): Unit = {
if (fm.exists(metadataFilePath)) return
fm.mkdirs(metadataFilePath.getParent)
val outputStream = fm.createAtomic(metadataFilePath, overwriteIfPossible = false)
- try {
- outputStream.write(s"v${operatorMetadata.version}\n".getBytes(StandardCharsets.UTF_8))
- OperatorStateMetadataUtils.serialize(outputStream, operatorMetadata)
- outputStream.close()
- } catch {
- case e: Throwable =>
- logError(
- log"Fail to write state metadata file to ${MDC(LogKeys.META_FILE,
metadataFilePath)}", e)
- outputStream.cancel()
- throw e
- }
+ OperatorStateMetadataUtils.writeMetadata(outputStream, operatorMetadata, metadataFilePath)
}
}
/**
- * Read OperatorStateMetadata from the state checkpoint directory.
+ * Read OperatorStateMetadata from the state checkpoint directory. This class will only be
+ * used to read OperatorStateMetadataV1.
+ * OperatorStateMetadataV2 will be read by the OperatorStateMetadataLog.
*/
-class OperatorStateMetadataReader(stateCheckpointPath: Path, hadoopConf: Configuration) {
+class OperatorStateMetadataV1Reader(
+ stateCheckpointPath: Path,
+ hadoopConf: Configuration) extends OperatorStateMetadataReader {
+ override def version: Int = 1
- private val metadataFilePath = OperatorStateMetadataUtils.metadataFilePath(stateCheckpointPath)
+ private val metadataFilePath = OperatorStateMetadataV1.metadataFilePath(stateCheckpointPath)
private lazy val fm = CheckpointFileManager.create(stateCheckpointPath, hadoopConf)
- def read(): OperatorStateMetadata = {
+ def read(): Option[OperatorStateMetadata] = {
val inputStream = fm.open(metadataFilePath)
- val inputReader =
- new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
- try {
- val versionStr = inputReader.readLine()
- val version = MetadataVersionUtil.validateVersion(versionStr, 1)
- OperatorStateMetadataUtils.deserialize(version, inputReader)
- } finally {
- inputStream.close()
+ OperatorStateMetadataUtils.readMetadata(inputStream)
+ }
+}
+
+class OperatorStateMetadataV2Writer(
+ stateCheckpointPath: Path,
+ hadoopConf: Configuration,
+ currentBatchId: Long) extends OperatorStateMetadataWriter with Logging {
+
+ private val metadataFilePath = OperatorStateMetadataV2.metadataFilePath(
+ stateCheckpointPath, currentBatchId)
+
+ private lazy val fm = CheckpointFileManager.create(stateCheckpointPath, hadoopConf)
+
+ override def version: Int = 2
+
+ override def write(operatorMetadata: OperatorStateMetadata): Unit = {
+ if (fm.exists(metadataFilePath)) return
+
+ fm.mkdirs(metadataFilePath.getParent)
+ val outputStream = fm.createAtomic(metadataFilePath, overwriteIfPossible = false)
+ OperatorStateMetadataUtils.writeMetadata(outputStream, operatorMetadata, metadataFilePath)
+ }
+}
+
+class OperatorStateMetadataV2Reader(
+ stateCheckpointPath: Path,
+ hadoopConf: Configuration) extends OperatorStateMetadataReader with Logging {
Review Comment:
ditto, Logging does not seem to be used
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -60,19 +81,65 @@ case class OperatorStateMetadataV1(
override def version: Int = 1
}
-object OperatorStateMetadataUtils {
+case class OperatorStateMetadataV2(
+ operatorInfo: OperatorInfoV1,
+ stateStoreInfo: Array[StateStoreMetadataV2],
+ operatorPropertiesJson: String) extends OperatorStateMetadata {
+ override def version: Int = 2
+}
+
+object OperatorStateMetadataUtils extends Logging {
+
+ sealed trait OperatorStateMetadataReader {
+ def version: Int
+
Review Comment:
nit: shall we be consistent? See the trait below: either no empty line between the
two members in both traits, or an empty line in both.
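e.g., formatted consistently (either style works, as long as both traits match; the declarations are taken from this PR):

```scala
sealed trait OperatorStateMetadataReader {
  def version: Int
  def read(): Option[OperatorStateMetadata]
}

sealed trait OperatorStateMetadataWriter {
  def version: Int
  def write(operatorMetadata: OperatorStateMetadata): Unit
}
```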
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -208,7 +211,10 @@ object StateSchemaCompatibilityChecker {
* @param stateSchemaVersion - version of the state schema to be used
* @param extraOptions - any extra options to be passed for StateStoreConf
creation
* @param storeName - optional state store name
- * @param schemaFilePath - optional schema file path
+ * @param oldSchemaFilePath - optional path to the old schema file. If not provided, will default
Review Comment:
Is it valid for schema version 3 to have None here? If not, let's also mention it
here, same as below: "Needed for schema version 3."
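e.g., something along these lines (the exact phrasing is just a sketch):

```scala
 * @param oldSchemaFilePath - optional path to the old schema file. Needed for schema version 3.
```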
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -60,19 +81,65 @@ case class OperatorStateMetadataV1(
override def version: Int = 1
}
-object OperatorStateMetadataUtils {
+case class OperatorStateMetadataV2(
+ operatorInfo: OperatorInfoV1,
+ stateStoreInfo: Array[StateStoreMetadataV2],
+ operatorPropertiesJson: String) extends OperatorStateMetadata {
+ override def version: Int = 2
+}
+
+object OperatorStateMetadataUtils extends Logging {
+
+ sealed trait OperatorStateMetadataReader {
+ def version: Int
+
+ def read(): Option[OperatorStateMetadata]
+ }
+
+ sealed trait OperatorStateMetadataWriter {
+ def version: Int
+ def write(operatorMetadata: OperatorStateMetadata): Unit
+ }
private implicit val formats: Formats = Serialization.formats(NoTypeHints)
- def metadataFilePath(stateCheckpointPath: Path): Path =
- new Path(new Path(stateCheckpointPath, "_metadata"), "metadata")
+ def readMetadata(inputStream: FSDataInputStream): Option[OperatorStateMetadata] = {
+ val inputReader =
+ new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
+ try {
+ val versionStr = inputReader.readLine()
+ val version = MetadataVersionUtil.validateVersion(versionStr, 2)
+ Some(deserialize(version, inputReader))
+ } finally {
+ inputStream.close()
+ }
+ }
+
+ def writeMetadata(
+ outputStream: CancellableFSDataOutputStream,
Review Comment:
nit: 2 more spaces of indentation
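i.e., indenting the parameter list four spaces from `def`, per the usual Spark style (the parameters after `outputStream` are inferred from the writeMetadata call sites quoted above):

```scala
def writeMetadata(
    outputStream: CancellableFSDataOutputStream,
    operatorMetadata: OperatorStateMetadata,
    metadataFilePath: Path): Unit = {
  // body unchanged
}
```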
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]