HeartSaVioR commented on code in PR #41099:
URL: https://github.com/apache/spark/pull/41099#discussion_r1211034659
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -24,14 +24,275 @@ import scala.language.implicitConversions
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
+import org.scalactic.source.Position
+import org.scalatest.Tag
import org.apache.spark._
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
import org.apache.spark.util.{ThreadUtils, Utils}
-class RocksDBSuite extends SparkFunSuite {
+
+trait AlsoTestWithChangelogCheckpointingEnabled extends SQLTestUtils {
+  override protected def test(testName: String, testTags: Tag*)(testBody: => Any)
+      (implicit pos: Position): Unit = {
+    super.test(testName, testTags: _*) {
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+      // in case tests have any code that needs to execute after every test
+      super.afterEach()
+    }
+
+    super.test(testName + " (with changelog checkpointing)", testTags: _*) {
+      // in case tests have any code that needs to execute before every test
+      super.beforeEach()
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+    }
+  }
+
+  def rocksdbChangelogCheckpointingConfKey: String = RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
+    ".changelogCheckpointing.enabled"
+
+  def isChangelogCheckpointingEnabled: Boolean =
+    SQLConf.get.getConfString(rocksdbChangelogCheckpointingConfKey) == "true"
+}
+
+class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with SharedSparkSession {
Review Comment:
Shall we extract out the tests that do not need to run twice? Either a
separate suite, or some tag/annotation on the test. I'll comment "check" on
the ones I spot, but it would be nice if you could revisit the tests and find more.
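For example, a tag-based variant could look roughly like this (just a sketch; the tag name is made up, and the provider conf setting from the PR is elided):

```scala
import org.scalatest.Tag

// Hypothetical marker tag for tests that only need to run once.
object SkipChangelogCheckpointingVariant extends Tag("SkipChangelogCheckpointingVariant")

// Inside AlsoTestWithChangelogCheckpointingEnabled:
override protected def test(testName: String, testTags: Tag*)(testBody: => Any)
    (implicit pos: Position): Unit = {
  // Always run once with changelog checkpointing disabled.
  super.test(testName, testTags: _*) {
    withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false") { testBody }
  }
  // Generate the doubled variant only for tests that are not tagged.
  if (!testTags.contains(SkipChangelogCheckpointingVariant)) {
    super.test(testName + " (with changelog checkpointing)", testTags: _*) {
      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true") { testBody }
    }
  }
}
```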
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -24,14 +24,275 @@ import scala.language.implicitConversions
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
+import org.scalactic.source.Position
+import org.scalatest.Tag
import org.apache.spark._
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
import org.apache.spark.util.{ThreadUtils, Utils}
-class RocksDBSuite extends SparkFunSuite {
+
+trait AlsoTestWithChangelogCheckpointingEnabled extends SQLTestUtils {
+  override protected def test(testName: String, testTags: Tag*)(testBody: => Any)
+      (implicit pos: Position): Unit = {
+    super.test(testName, testTags: _*) {
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+      // in case tests have any code that needs to execute after every test
+      super.afterEach()
+    }
+
+    super.test(testName + " (with changelog checkpointing)", testTags: _*) {
+      // in case tests have any code that needs to execute before every test
+      super.beforeEach()
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+    }
+  }
+
+  def rocksdbChangelogCheckpointingConfKey: String = RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
+    ".changelogCheckpointing.enabled"
+
+  def isChangelogCheckpointingEnabled: Boolean =
+    SQLConf.get.getConfString(rocksdbChangelogCheckpointingConfKey) == "true"
+}
+
+class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with SharedSparkSession {
+
+  sqlConf.setConf(SQLConf.STATE_STORE_PROVIDER_CLASS, classOf[RocksDBStateStoreProvider].getName)
+
+ def snapshotVersionsPresent(dir: String): Seq[Long] = {
+ dir.listFiles.filter(_.getName.endsWith(".zip"))
+ .map(_.getName.stripSuffix(".zip"))
+ .map(_.toLong)
+ .sorted
+ }
+ def changelogVersionsPresent(dir: String): Seq[Long] = {
+ dir.listFiles.filter(_.getName.endsWith(".changelog"))
+ .map(_.getName.stripSuffix(".changelog"))
+ .map(_.toLong)
+ .sorted
+ }
+
+ test("RocksDB: check changelog and snapshot version") {
+ val remoteDir = Utils.createTempDir().toString
+ val conf = dbConf.copy(minDeltasForSnapshot = 1)
+    new File(remoteDir).delete() // to make sure that the directory gets created
+ for (version <- 0 to 49) {
+ withDB(remoteDir, version = version, conf = conf) { db =>
+ db.put(version.toString, version.toString)
+ db.commit()
+ if ((version + 1) % 5 == 0) db.doMaintenance()
+ }
+ }
+
+ if (isChangelogCheckpointingEnabled) {
+ assert(changelogVersionsPresent(remoteDir) === (1 to 50))
+ assert(snapshotVersionsPresent(remoteDir) === Range.inclusive(5, 50, 5))
+ } else {
+ assert(changelogVersionsPresent(remoteDir) === Seq.empty)
+ assert(snapshotVersionsPresent(remoteDir) === (1 to 50))
+ }
+ }
+
+ test("RocksDB: load version that doesn't exist") {
+ val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+ withDB(remoteDir) { db =>
+ intercept[IllegalStateException] {
+ db.load(1)
+ }
+ }
+ }
+
+ test("RocksDB: purge changelog and snapshots") {
+ val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+ val conf = dbConf.copy(enableChangelogCheckpointing = true,
+ minVersionsToRetain = 3, minDeltasForSnapshot = 1)
+ withDB(remoteDir, conf = conf) { db =>
+ db.load(0)
+ db.commit()
+ for (version <- 1 to 2) {
+ db.load(version)
+ db.commit()
+ db.doMaintenance()
+ }
+ assert(snapshotVersionsPresent(remoteDir) === Seq(2, 3))
+ assert(changelogVersionsPresent(remoteDir) == Seq(1, 2, 3))
+
+ for (version <- 3 to 4) {
+ db.load(version)
+ db.commit()
+ }
+ assert(snapshotVersionsPresent(remoteDir) === Seq(2, 3))
+ assert(changelogVersionsPresent(remoteDir) == (1 to 5))
+ db.doMaintenance()
+      // 3 is the latest snapshot <= maxSnapshotVersionPresent - minVersionsToRetain + 1
+ assert(snapshotVersionsPresent(remoteDir) === Seq(3, 5))
+ assert(changelogVersionsPresent(remoteDir) == (3 to 5))
+
+ for (version <- 5 to 7) {
+ db.load(version)
+ db.commit()
+ }
+ assert(snapshotVersionsPresent(remoteDir) === Seq(3, 5))
+ assert(changelogVersionsPresent(remoteDir) == (3 to 8))
+ db.doMaintenance()
+      // 5 is the latest snapshot <= maxSnapshotVersionPresent - minVersionsToRetain + 1
+ assert(snapshotVersionsPresent(remoteDir) === Seq(5, 8))
+ assert(changelogVersionsPresent(remoteDir) == (5 to 8))
+ }
+ }
+
+ test("RocksDB: minDeltasForSnapshot") {
+ val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+    val conf = dbConf.copy(enableChangelogCheckpointing = true, minDeltasForSnapshot = 3)
+ withDB(remoteDir, conf = conf) { db =>
+ for (version <- 0 to 1) {
+ db.load(version)
+ db.commit()
+ db.doMaintenance()
+ }
+ // Snapshot should not be created because minDeltasForSnapshot = 3
+ assert(snapshotVersionsPresent(remoteDir) === Seq.empty)
+ assert(changelogVersionsPresent(remoteDir) == Seq(1, 2))
+ db.load(2)
+ db.commit()
+ db.doMaintenance()
+ assert(snapshotVersionsPresent(remoteDir) === Seq(3))
+ db.load(3)
+ for (i <- 1 to 10001) {
+ db.put(i.toString, i.toString)
+ }
+ db.commit()
+ db.doMaintenance()
+      // Snapshot should be created this time because the size of the changelog > 1000
+ assert(snapshotVersionsPresent(remoteDir) === Seq(3, 4))
+ for (version <- 4 to 7) {
+ db.load(version)
+ db.commit()
+ db.doMaintenance()
+ }
+ assert(snapshotVersionsPresent(remoteDir) === Seq(3, 4, 7))
+ for (version <- 8 to 20) {
+ db.load(version)
+ db.commit()
+ }
+ db.doMaintenance()
+ assert(snapshotVersionsPresent(remoteDir) === Seq(3, 4, 7, 19))
+ }
+ }
+
+  // A RocksDB instance with changelog checkpointing enabled should be able to load
+  // an existing checkpoint without changelog.
+  test("RocksDB: changelog checkpointing backward compatibility") {
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+    val disableChangelogCheckpointingConf =
+      dbConf.copy(enableChangelogCheckpointing = false, minVersionsToRetain = 30)
+ withDB(remoteDir, conf = disableChangelogCheckpointingConf) { db =>
+ for (version <- 1 to 30) {
+ db.load(version - 1)
+ db.put(version.toString, version.toString)
+ db.remove((version - 1).toString)
+ db.commit()
+ }
+ assert(snapshotVersionsPresent(remoteDir) === (1 to 30))
+ }
+
+    // Now enable changelog checkpointing in a checkpoint created by a state store
+    // that disabled changelog checkpointing.
+    val enableChangelogCheckpointingConf =
+      dbConf.copy(enableChangelogCheckpointing = true, minVersionsToRetain = 30,
+        minDeltasForSnapshot = 1)
+ withDB(remoteDir, conf = enableChangelogCheckpointingConf) { db =>
+ for (version <- 1 to 30) {
+ db.load(version)
+      assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
+ }
+ for (version <- 30 to 60) {
+ db.load(version - 1)
+ db.put(version.toString, version.toString)
+ db.remove((version - 1).toString)
+ db.commit()
+ }
+ assert(snapshotVersionsPresent(remoteDir) === (1 to 30))
+ assert(changelogVersionsPresent(remoteDir) === (30 to 60))
+ for (version <- 1 to 60) {
+ db.load(version, readOnly = true)
+      assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
+ }
+ // Check that snapshots and changelogs get purged correctly.
+ db.doMaintenance()
+ assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
+ assert(changelogVersionsPresent(remoteDir) === (30 to 60))
+ // Verify the content of retained versions.
+ for (version <- 30 to 60) {
+ db.load(version, readOnly = true)
+      assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
+ }
+ }
+ }
+
+  // A RocksDB instance with changelog checkpointing disabled should be able to load
+  // an existing checkpoint with changelog.
+ test("RocksDB: changelog checkpointing forward compatibility") {
Review Comment:
check
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -24,14 +24,275 @@ import scala.language.implicitConversions
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
+import org.scalactic.source.Position
+import org.scalatest.Tag
import org.apache.spark._
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
import org.apache.spark.util.{ThreadUtils, Utils}
-class RocksDBSuite extends SparkFunSuite {
+
+trait AlsoTestWithChangelogCheckpointingEnabled extends SQLTestUtils {
+  override protected def test(testName: String, testTags: Tag*)(testBody: => Any)
+      (implicit pos: Position): Unit = {
+    super.test(testName, testTags: _*) {
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+      // in case tests have any code that needs to execute after every test
+      super.afterEach()
+    }
+
+    super.test(testName + " (with changelog checkpointing)", testTags: _*) {
+      // in case tests have any code that needs to execute before every test
+      super.beforeEach()
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+    }
+  }
+
+  def rocksdbChangelogCheckpointingConfKey: String = RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
+    ".changelogCheckpointing.enabled"
+
+  def isChangelogCheckpointingEnabled: Boolean =
+    SQLConf.get.getConfString(rocksdbChangelogCheckpointingConfKey) == "true"
+}
+
+class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with SharedSparkSession {
+
+  sqlConf.setConf(SQLConf.STATE_STORE_PROVIDER_CLASS, classOf[RocksDBStateStoreProvider].getName)
+
+ def snapshotVersionsPresent(dir: String): Seq[Long] = {
+ dir.listFiles.filter(_.getName.endsWith(".zip"))
+ .map(_.getName.stripSuffix(".zip"))
+ .map(_.toLong)
+ .sorted
+ }
+ def changelogVersionsPresent(dir: String): Seq[Long] = {
+ dir.listFiles.filter(_.getName.endsWith(".changelog"))
+ .map(_.getName.stripSuffix(".changelog"))
+ .map(_.toLong)
+ .sorted
+ }
+
+ test("RocksDB: check changelog and snapshot version") {
+ val remoteDir = Utils.createTempDir().toString
+ val conf = dbConf.copy(minDeltasForSnapshot = 1)
+    new File(remoteDir).delete() // to make sure that the directory gets created
+ for (version <- 0 to 49) {
+ withDB(remoteDir, version = version, conf = conf) { db =>
+ db.put(version.toString, version.toString)
+ db.commit()
+ if ((version + 1) % 5 == 0) db.doMaintenance()
+ }
+ }
+
+ if (isChangelogCheckpointingEnabled) {
+ assert(changelogVersionsPresent(remoteDir) === (1 to 50))
+ assert(snapshotVersionsPresent(remoteDir) === Range.inclusive(5, 50, 5))
+ } else {
+ assert(changelogVersionsPresent(remoteDir) === Seq.empty)
+ assert(snapshotVersionsPresent(remoteDir) === (1 to 50))
+ }
+ }
+
+ test("RocksDB: load version that doesn't exist") {
+ val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+ withDB(remoteDir) { db =>
+ intercept[IllegalStateException] {
+ db.load(1)
+ }
+ }
+ }
+
+ test("RocksDB: purge changelog and snapshots") {
+ val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+ val conf = dbConf.copy(enableChangelogCheckpointing = true,
+ minVersionsToRetain = 3, minDeltasForSnapshot = 1)
+ withDB(remoteDir, conf = conf) { db =>
+ db.load(0)
+ db.commit()
+ for (version <- 1 to 2) {
+ db.load(version)
+ db.commit()
+ db.doMaintenance()
+ }
+ assert(snapshotVersionsPresent(remoteDir) === Seq(2, 3))
+ assert(changelogVersionsPresent(remoteDir) == Seq(1, 2, 3))
+
+ for (version <- 3 to 4) {
+ db.load(version)
+ db.commit()
+ }
+ assert(snapshotVersionsPresent(remoteDir) === Seq(2, 3))
+ assert(changelogVersionsPresent(remoteDir) == (1 to 5))
+ db.doMaintenance()
+      // 3 is the latest snapshot <= maxSnapshotVersionPresent - minVersionsToRetain + 1
+ assert(snapshotVersionsPresent(remoteDir) === Seq(3, 5))
+ assert(changelogVersionsPresent(remoteDir) == (3 to 5))
+
+ for (version <- 5 to 7) {
+ db.load(version)
+ db.commit()
+ }
+ assert(snapshotVersionsPresent(remoteDir) === Seq(3, 5))
+ assert(changelogVersionsPresent(remoteDir) == (3 to 8))
+ db.doMaintenance()
+      // 5 is the latest snapshot <= maxSnapshotVersionPresent - minVersionsToRetain + 1
+ assert(snapshotVersionsPresent(remoteDir) === Seq(5, 8))
+ assert(changelogVersionsPresent(remoteDir) == (5 to 8))
+ }
+ }
+
+ test("RocksDB: minDeltasForSnapshot") {
Review Comment:
check
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -865,10 +1076,14 @@ class RocksDBSuite extends SparkFunSuite {
}
}
+ private def sqlConf = SQLConf.get
Review Comment:
I'd rather clone the config and use the cloned one than modify the session's
config directly. If that breaks any tests, I'd consider them not properly isolated.
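A rough sketch of what I mean (SQLConf does provide a clone(); the val name here is just illustrative):

```scala
// Clone the session conf and mutate only the copy, leaving the shared
// session's conf untouched.
private val testSqlConf: SQLConf = SQLConf.get.clone()
testSqlConf.setConf(SQLConf.STATE_STORE_PROVIDER_CLASS,
  classOf[RocksDBStateStoreProvider].getName)
// Use testSqlConf (not SQLConf.get) wherever the suite reads the conf.
```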
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala:
##########
@@ -956,3 +956,70 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
}
}
}
+
+class RocksDBStateStoreStreamingAggregationSuite
+ extends StreamingAggregationSuite with RocksDBStateStoreTest {
+ import testImplicits._
+
+ def snapshotVersionsPresent(dir: File): Seq[Long] = {
+ dir.listFiles.filter(_.getName.endsWith(".zip"))
+ .map(_.getName.stripSuffix(".zip"))
+ .map(_.toLong)
+ .sorted
+ }
+
+ def changelogVersionsPresent(dir: File): Seq[Long] = {
+ dir.listFiles.filter(_.getName.endsWith(".changelog"))
+ .map(_.getName.stripSuffix(".changelog"))
+ .map(_.toLong)
+ .sorted
+ }
+
+ test("Streaming aggregation RocksDB State Store backward compatibility.") {
Review Comment:
This can move to a RocksDB-side suite. Let's not put provider-specific tests
in the operator test suite. RocksDBStateStoreIntegrationSuite?
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -588,28 +822,6 @@ class RocksDBSuite extends SparkFunSuite {
verifyMetrics(putCount = 2, getCount = 3, metrics = db.metrics)
}
}
-
- // force compaction and check the compaction metrics
Review Comment:
Can we retain this part of the test and execute it selectively for incremental
checkpointing? Please try not to remove a test when the functionality it
covers is not removed.
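For instance, something along these lines could keep the coverage (only a sketch; the removed assertions are elided, and the guard reuses isChangelogCheckpointingEnabled from the new trait):

```scala
test("RocksDB: force compaction and check the compaction metrics") {
  // compactOnCommit only applies to the snapshot-per-commit path, so skip
  // the changelog-checkpointing variant of this test.
  if (!isChangelogCheckpointingEnabled) {
    val remoteDir = Utils.createTempDir().toString
    withDB(remoteDir, conf = dbConf.copy(compactOnCommit = true)) { db =>
      db.load(0)
      db.put("a", "1")
      db.commit()
      // ... the removed compaction-metrics assertions would go here ...
    }
  }
}
```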
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -24,14 +24,275 @@ import scala.language.implicitConversions
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
+import org.scalactic.source.Position
+import org.scalatest.Tag
import org.apache.spark._
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
import org.apache.spark.util.{ThreadUtils, Utils}
-class RocksDBSuite extends SparkFunSuite {
+
+trait AlsoTestWithChangelogCheckpointingEnabled extends SQLTestUtils {
+  override protected def test(testName: String, testTags: Tag*)(testBody: => Any)
+      (implicit pos: Position): Unit = {
+    super.test(testName, testTags: _*) {
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+      // in case tests have any code that needs to execute after every test
+      super.afterEach()
+    }
+
+    super.test(testName + " (with changelog checkpointing)", testTags: _*) {
+      // in case tests have any code that needs to execute before every test
+      super.beforeEach()
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+    }
+  }
+
+  def rocksdbChangelogCheckpointingConfKey: String = RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
+    ".changelogCheckpointing.enabled"
+
+  def isChangelogCheckpointingEnabled: Boolean =
+    SQLConf.get.getConfString(rocksdbChangelogCheckpointingConfKey) == "true"
+}
+
+class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with SharedSparkSession {
+
+  sqlConf.setConf(SQLConf.STATE_STORE_PROVIDER_CLASS, classOf[RocksDBStateStoreProvider].getName)
+
+ def snapshotVersionsPresent(dir: String): Seq[Long] = {
+ dir.listFiles.filter(_.getName.endsWith(".zip"))
+ .map(_.getName.stripSuffix(".zip"))
+ .map(_.toLong)
+ .sorted
+ }
+ def changelogVersionsPresent(dir: String): Seq[Long] = {
+ dir.listFiles.filter(_.getName.endsWith(".changelog"))
+ .map(_.getName.stripSuffix(".changelog"))
+ .map(_.toLong)
+ .sorted
+ }
+
+ test("RocksDB: check changelog and snapshot version") {
+ val remoteDir = Utils.createTempDir().toString
+ val conf = dbConf.copy(minDeltasForSnapshot = 1)
+    new File(remoteDir).delete() // to make sure that the directory gets created
+ for (version <- 0 to 49) {
+ withDB(remoteDir, version = version, conf = conf) { db =>
+ db.put(version.toString, version.toString)
+ db.commit()
+ if ((version + 1) % 5 == 0) db.doMaintenance()
+ }
+ }
+
+ if (isChangelogCheckpointingEnabled) {
+ assert(changelogVersionsPresent(remoteDir) === (1 to 50))
+ assert(snapshotVersionsPresent(remoteDir) === Range.inclusive(5, 50, 5))
+ } else {
+ assert(changelogVersionsPresent(remoteDir) === Seq.empty)
+ assert(snapshotVersionsPresent(remoteDir) === (1 to 50))
+ }
+ }
+
+ test("RocksDB: load version that doesn't exist") {
+ val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+ withDB(remoteDir) { db =>
+ intercept[IllegalStateException] {
+ db.load(1)
+ }
+ }
+ }
+
+ test("RocksDB: purge changelog and snapshots") {
Review Comment:
check
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -588,28 +822,6 @@ class RocksDBSuite extends SparkFunSuite {
verifyMetrics(putCount = 2, getCount = 3, metrics = db.metrics)
}
}
-
- // force compaction and check the compaction metrics
Review Comment:
(The config even applies to the snapshots taken under changelog checkpointing,
but since we don't take one in every microbatch, that's not easy to test. OK
to skip the test for changelog checkpointing.)
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -102,73 +363,53 @@ class RocksDBSuite extends SparkFunSuite {
}
}
- test("RocksDB: cleanup old files") {
- val remoteDir = Utils.createTempDir().toString
-    val conf = RocksDBConf().copy(compactOnCommit = true, minVersionsToRetain = 10)
-
- def versionsPresent: Seq[Long] = {
- remoteDir.listFiles.filter(_.getName.endsWith(".zip"))
- .map(_.getName.stripSuffix(".zip"))
- .map(_.toLong)
- .sorted
- }
-
- withDB(remoteDir, conf = conf) { db =>
- // Generate versions without cleaning up
- for (version <- 1 to 50) {
- if (version > 1) {
-        // remove keys we wrote in previous iteration to ensure compaction happens
- db.remove((version - 1).toString)
- }
- db.put(version.toString, version.toString)
- db.commit()
- }
-
- // Clean up and verify version files and SST files were deleted
- require(versionsPresent === (1L to 50L))
- val sstDir = new File(remoteDir, "SSTs")
- val numSstFiles = listFiles(sstDir).length
- db.cleanup()
- assert(versionsPresent === (41L to 50L))
- assert(listFiles(sstDir).length < numSstFiles)
-
-      // Verify data in retained versions.
- versionsPresent.foreach { version =>
- db.load(version)
- val data = db.iterator().map(toStr).toSet
- assert(data === Set((version.toString, version.toString)))
- }
- }
- }
-
test("RocksDB: handle commit failures and aborts") {
val hadoopConf = new Configuration()
hadoopConf.set(
SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
classOf[CreateAtomicTestManager].getName)
val remoteDir = Utils.createTempDir().getAbsolutePath
- val conf = RocksDBConf().copy(compactOnCommit = true)
- withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db =>
+ withDB(remoteDir, hadoopConf = hadoopConf) { db =>
// Disable failure of output stream and generate versions
CreateAtomicTestManager.shouldFailInCreateAtomic = false
for (version <- 1 to 10) {
+ db.load(version - 1)
        db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ...
db.commit()
}
val version10Data = (1L to 10).map(_.toString).map(x => x -> x).toSet
      // Fail commit for next version and verify that reloading resets the files
CreateAtomicTestManager.shouldFailInCreateAtomic = true
+ db.load(10)
db.put("11", "11")
intercept[IOException] { quietly { db.commit() } }
- assert(db.load(10).iterator().map(toStr).toSet === version10Data)
+      assert(db.load(10, readOnly = true).iterator().map(toStr).toSet === version10Data)
CreateAtomicTestManager.shouldFailInCreateAtomic = false
      // Abort commit for next version and verify that reloading resets the files
db.load(10)
db.put("11", "11")
db.rollback()
- assert(db.load(10).iterator().map(toStr).toSet === version10Data)
+      assert(db.load(10, readOnly = true).iterator().map(toStr).toSet === version10Data)
+ }
+ }
+
+ test("RocksDBFileManager: read and write changelog") {
Review Comment:
check
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -24,14 +24,275 @@ import scala.language.implicitConversions
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
+import org.scalactic.source.Position
+import org.scalatest.Tag
import org.apache.spark._
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
import org.apache.spark.util.{ThreadUtils, Utils}
-class RocksDBSuite extends SparkFunSuite {
+
+trait AlsoTestWithChangelogCheckpointingEnabled extends SQLTestUtils {
+  override protected def test(testName: String, testTags: Tag*)(testBody: => Any)
+      (implicit pos: Position): Unit = {
+    super.test(testName, testTags: _*) {
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+      // in case tests have any code that needs to execute after every test
+      super.afterEach()
+    }
+
+    super.test(testName + " (with changelog checkpointing)", testTags: _*) {
+      // in case tests have any code that needs to execute before every test
+      super.beforeEach()
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+    }
+  }
+
+  def rocksdbChangelogCheckpointingConfKey: String = RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
+    ".changelogCheckpointing.enabled"
+
+  def isChangelogCheckpointingEnabled: Boolean =
+    SQLConf.get.getConfString(rocksdbChangelogCheckpointingConfKey) == "true"
+}
+
+class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with SharedSparkSession {
+
+  sqlConf.setConf(SQLConf.STATE_STORE_PROVIDER_CLASS, classOf[RocksDBStateStoreProvider].getName)
+
+ def snapshotVersionsPresent(dir: String): Seq[Long] = {
+ dir.listFiles.filter(_.getName.endsWith(".zip"))
+ .map(_.getName.stripSuffix(".zip"))
+ .map(_.toLong)
+ .sorted
+ }
+ def changelogVersionsPresent(dir: String): Seq[Long] = {
+ dir.listFiles.filter(_.getName.endsWith(".changelog"))
+ .map(_.getName.stripSuffix(".changelog"))
+ .map(_.toLong)
+ .sorted
+ }
+
+ test("RocksDB: check changelog and snapshot version") {
+ val remoteDir = Utils.createTempDir().toString
+ val conf = dbConf.copy(minDeltasForSnapshot = 1)
+    new File(remoteDir).delete() // to make sure that the directory gets created
+ for (version <- 0 to 49) {
+ withDB(remoteDir, version = version, conf = conf) { db =>
+ db.put(version.toString, version.toString)
+ db.commit()
+ if ((version + 1) % 5 == 0) db.doMaintenance()
+ }
+ }
+
+ if (isChangelogCheckpointingEnabled) {
+ assert(changelogVersionsPresent(remoteDir) === (1 to 50))
+ assert(snapshotVersionsPresent(remoteDir) === Range.inclusive(5, 50, 5))
+ } else {
+ assert(changelogVersionsPresent(remoteDir) === Seq.empty)
+ assert(snapshotVersionsPresent(remoteDir) === (1 to 50))
+ }
+ }
+
+ test("RocksDB: load version that doesn't exist") {
+ val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+ withDB(remoteDir) { db =>
+ intercept[IllegalStateException] {
+ db.load(1)
+ }
+ }
+ }
+
+ test("RocksDB: purge changelog and snapshots") {
+ val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+ val conf = dbConf.copy(enableChangelogCheckpointing = true,
+ minVersionsToRetain = 3, minDeltasForSnapshot = 1)
+ withDB(remoteDir, conf = conf) { db =>
+ db.load(0)
+ db.commit()
+ for (version <- 1 to 2) {
+ db.load(version)
+ db.commit()
+ db.doMaintenance()
+ }
+ assert(snapshotVersionsPresent(remoteDir) === Seq(2, 3))
+ assert(changelogVersionsPresent(remoteDir) == Seq(1, 2, 3))
+
+ for (version <- 3 to 4) {
+ db.load(version)
+ db.commit()
+ }
+ assert(snapshotVersionsPresent(remoteDir) === Seq(2, 3))
+ assert(changelogVersionsPresent(remoteDir) == (1 to 5))
+ db.doMaintenance()
+      // 3 is the latest snapshot <= maxSnapshotVersionPresent - minVersionsToRetain + 1
+ assert(snapshotVersionsPresent(remoteDir) === Seq(3, 5))
+ assert(changelogVersionsPresent(remoteDir) == (3 to 5))
+
+ for (version <- 5 to 7) {
+ db.load(version)
+ db.commit()
+ }
+ assert(snapshotVersionsPresent(remoteDir) === Seq(3, 5))
+ assert(changelogVersionsPresent(remoteDir) == (3 to 8))
+ db.doMaintenance()
+      // 5 is the latest snapshot <= maxSnapshotVersionPresent - minVersionsToRetain + 1
+ assert(snapshotVersionsPresent(remoteDir) === Seq(5, 8))
+ assert(changelogVersionsPresent(remoteDir) == (5 to 8))
+ }
+ }
+
+ test("RocksDB: minDeltasForSnapshot") {
+ val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+    val conf = dbConf.copy(enableChangelogCheckpointing = true, minDeltasForSnapshot = 3)
+ withDB(remoteDir, conf = conf) { db =>
+ for (version <- 0 to 1) {
+ db.load(version)
+ db.commit()
+ db.doMaintenance()
+ }
+ // Snapshot should not be created because minDeltasForSnapshot = 3
+ assert(snapshotVersionsPresent(remoteDir) === Seq.empty)
+ assert(changelogVersionsPresent(remoteDir) == Seq(1, 2))
+ db.load(2)
+ db.commit()
+ db.doMaintenance()
+ assert(snapshotVersionsPresent(remoteDir) === Seq(3))
+ db.load(3)
+ for (i <- 1 to 10001) {
+ db.put(i.toString, i.toString)
+ }
+ db.commit()
+ db.doMaintenance()
+      // Snapshot should be created this time because the size of the changelog > 1000
+ assert(snapshotVersionsPresent(remoteDir) === Seq(3, 4))
+ for (version <- 4 to 7) {
+ db.load(version)
+ db.commit()
+ db.doMaintenance()
+ }
+ assert(snapshotVersionsPresent(remoteDir) === Seq(3, 4, 7))
+ for (version <- 8 to 20) {
+ db.load(version)
+ db.commit()
+ }
+ db.doMaintenance()
+ assert(snapshotVersionsPresent(remoteDir) === Seq(3, 4, 7, 19))
+ }
+ }
+
+  // A RocksDB instance with changelog checkpointing enabled should be able to load
+  // an existing checkpoint without changelog.
+ test("RocksDB: changelog checkpointing backward compatibility") {
Review Comment:
check
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala:
##########
@@ -956,3 +956,70 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
}
}
}
+
+class RocksDBStateStoreStreamingAggregationSuite
+ extends StreamingAggregationSuite with RocksDBStateStoreTest {
+ import testImplicits._
+
+ def snapshotVersionsPresent(dir: File): Seq[Long] = {
Review Comment:
Maybe better to have a helper object for the RocksDB tests to deduplicate this
and the method below.
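Something like this, for example (object name and location are just a suggestion):

```scala
import java.io.File

// Shared helper so RocksDBSuite and the streaming suites don't each
// redefine these version-listing utilities.
object RocksDBCheckpointTestUtils {
  def snapshotVersionsPresent(dir: File): Seq[Long] =
    dir.listFiles.filter(_.getName.endsWith(".zip"))
      .map(_.getName.stripSuffix(".zip"))
      .map(_.toLong)
      .sorted

  def changelogVersionsPresent(dir: File): Seq[Long] =
    dir.listFiles.filter(_.getName.endsWith(".changelog"))
      .map(_.getName.stripSuffix(".changelog"))
      .map(_.toLong)
      .sorted
}
```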
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]