brkyvz commented on code in PR #49063:
URL: https://github.com/apache/spark/pull/49063#discussion_r1869968513
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala:
##########
@@ -108,6 +132,67 @@ class CommitLogSuite extends SparkFunSuite with
SharedSparkSession {
assert(commitMetadata.nextBatchWatermarkMs === 233)
assert(commitMetadata.stateUniqueIds === Map.empty)
}
+
+ // Test an old version of Spark can ser-de the new version of commit log,
+ // but running under V1 (i.e. no stateUniqueIds)
+ test("v1 Serde backward compatibility") {
+ // json looks like: {"nextBatchWatermarkMs":1,"stateUniqueIds":{}}
+ val commitMetadata = CommitMetadata(1)
+ val path = testCommitLogV1CrossVersionFilePath
+
+ if (regenerateGoldenFiles) {
+ // Generate the commit log file using the most up-to-date CommitLog
+ // that has the empty stateUniqueIds field
+ val commitLog = new CommitLog(spark, path.toString)
+ val outputStream = new
FileOutputStream(path.resolve("testCommitLog").toFile)
+ commitLog.serialize(commitMetadata, outputStream)
+ } else {
+ // Load the generated commit log file using the old CommitLog
+ val commitLog = new CommitLogLegacy(spark, path.toString)
+ val inputStream = new
FileInputStream(path.resolve("testCommitLog").toFile)
+ // json looks like: {"nextBatchWatermarkMs":1}
+ val metadata = commitLog.deserialize(inputStream)
+ // Array comparison are reference based, so we need to compare the
elements
+ assert(metadata.nextBatchWatermarkMs ==
commitMetadata.nextBatchWatermarkMs)
+ }
+ }
+}
+
+// Below are the legacy commit log code carbon copied from Spark branch-3.5,
except
+// adding a "Legacy" to the class names.
+case class CommitMetadataLegacy(nextBatchWatermarkMs: Long = 0) {
+ def json: String = Serialization.write(this)(CommitMetadataLegacy.format)
}
+object CommitMetadataLegacy {
+ implicit val format: Formats = Serialization.formats(NoTypeHints)
+ def apply(json: String): CommitMetadataLegacy =
Serialization.read[CommitMetadataLegacy](json)
+}
+
+class CommitLogLegacy(sparkSession: SparkSession, path: String)
+ extends HDFSMetadataLog[CommitMetadataLegacy](sparkSession, path) {
+
+ private val VERSION = 1
+ private val EMPTY_JSON = "{}"
+
+ override def deserialize(in: InputStream): CommitMetadataLegacy = {
+ // called inside a try-finally where the underlying stream is closed in
the caller
+ val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
+ if (!lines.hasNext) {
+ throw new IllegalStateException("Incomplete log file in the offset
commit log")
+ }
+ validateVersion(lines.next().trim, VERSION)
+ val metadataJson = if (lines.hasNext) lines.next() else EMPTY_JSON
+ CommitMetadataLegacy(metadataJson)
+ }
+
+ override def serialize(metadata: CommitMetadataLegacy, out: OutputStream):
Unit = {
+ // called inside a try-finally where the underlying stream is closed in
the caller
+ out.write(s"v${VERSION}".getBytes(UTF_8))
+ out.write('\n')
+
+ // write metadata
+ out.write(metadata.json.getBytes(UTF_8))
+ }
+}
Review Comment:
Instead of creating these legacy copies of the classes, the standard way is to
introduce a separate resource file, like you did below, but without the
`stateUniqueIds` field in it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]