This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new bafb953f32e [SPARK-42819][SS] Add support for setting max_write_buffer_number and write_buffer_size for RocksDB used in streaming

bafb953f32e is described below

commit bafb953f32e4f7c04f7f163354ec997bae8aa0e6
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Thu Mar 16 19:52:16 2023 +0900

[SPARK-42819][SS] Add support for setting max_write_buffer_number and write_buffer_size for RocksDB used in streaming

### What changes were proposed in this pull request?

Add support for setting max_write_buffer_number and write_buffer_size for RocksDB used in streaming.

### Why are the changes needed?

We need these settings to control memory tuning for RocksDB. We already expose a setting for the block cache size, but these two settings are missing; this change adds them.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit tests and documented the new configs in the programming guide.

RocksDBSuite
```
[info] Run completed in 59 seconds, 336 milliseconds.
[info] Total number of tests run: 27
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 27, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 165 s (02:45), completed Mar 15, 2023, 11:24:17 PM
```

RocksDBStateStoreSuite
```
[info] Run completed in 1 minute, 16 seconds.
[info] Total number of tests run: 73
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 73, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Closes #40455 from anishshri-db/task/SPARK-42819.

Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 docs/structured-streaming-programming-guide.md     | 10 ++++++
 .../sql/execution/streaming/state/RocksDB.scala    | 41 ++++++++++++++++++++--
 .../streaming/state/RocksDBStateStoreSuite.scala   |  4 +++
 .../execution/streaming/state/RocksDBSuite.scala   | 27 ++++++++++++++
 4 files changed, 79 insertions(+), 3 deletions(-)
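As a minimal usage sketch (not part of this commit): the two rocksdb.* keys are the ones documented in the docs change below; the provider-class setting is the usual way to select the RocksDB state store, and the example values (64 MB write buffers, at most 4 MemTables) are arbitrary illustrations rather than defaults from this patch.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: example values, not defaults from this patch.
val spark = SparkSession.builder()
  .appName("rocksdb-memtable-tuning")
  .master("local[*]")
  .getOrCreate()

// The RocksDB state store provider must be selected for the rocksdb.* confs to apply.
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

// Cap each MemTable at 64 MB and allow at most 4 MemTables (active + immutable).
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB", "64")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber", "4")
```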
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index a71c774f328..486bed7184f 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2289,6 +2289,16 @@ Here are the configs regarding to RocksDB instance of the state store provider:
     <td>Whether we track the total number of rows in state store. Please refer the details in <a href="#performance-aspect-considerations">Performance-aspect considerations</a>.</td>
     <td>True</td>
   </tr>
+  <tr>
+    <td>spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB</td>
+    <td>The maximum size of MemTable in RocksDB. Value of -1 means that RocksDB internal default values will be used</td>
+    <td>-1</td>
+  </tr>
+  <tr>
+    <td>spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber</td>
+    <td>The maximum number of MemTables in RocksDB, both active and immutable. Value of -1 means that RocksDB internal default values will be used</td>
+    <td>-1</td>
+  </tr>
 </table>

 ##### Performance-aspect considerations
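The RocksDB.scala change that follows wires these confs into the RocksDB JNI options. As a standalone illustration of that wiring (a sketch that mirrors the patch, with hard-coded example values in place of the values Spark reads from RocksDBConf), note how the MB-valued conf is converted into RocksDB's byte-valued write buffer size and how non-positive values leave RocksDB's own defaults untouched:

```scala
import org.rocksdb.{ColumnFamilyOptions, DBOptions, Options, RocksDB}

// Sketch mirroring the patch; the two vals stand in for conf.writeBufferSizeMB
// and conf.maxWriteBufferNumber, which Spark resolves from RocksDBConf.
RocksDB.loadLibrary()

val writeBufferSizeMB = 64L   // example; -1 would mean "use RocksDB's default"
val maxWriteBufferNumber = 4  // example; -1 would mean "use RocksDB's default"

val columnFamilyOptions = new ColumnFamilyOptions()
if (writeBufferSizeMB > 0L) {
  // RocksDB expects bytes, so the MB-valued conf is converted here.
  columnFamilyOptions.setWriteBufferSize(writeBufferSizeMB * 1024 * 1024)
}
if (maxWriteBufferNumber > 0) {
  columnFamilyOptions.setMaxWriteBufferNumber(maxWriteBufferNumber)
}

// The column family options are folded into the Options used to open the DB.
val dbOptions = new Options(new DBOptions(), columnFamilyOptions)
dbOptions.setCreateIfMissing(true)
println(s"writeBufferSize=${columnFamilyOptions.writeBufferSize()} bytes, " +
  s"maxWriteBufferNumber=${columnFamilyOptions.maxWriteBufferNumber()}")
```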
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 89872afb80e..363cc2b5c46 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -72,7 +72,21 @@ class RocksDB(
   tableFormatConfig.setFilterPolicy(bloomFilter)
   tableFormatConfig.setFormatVersion(conf.formatVersion)

-  private val dbOptions = new Options() // options to open the RocksDB
+  private val columnFamilyOptions = new ColumnFamilyOptions()
+
+  private val dbOptions =
+    new Options(new DBOptions(), columnFamilyOptions) // options to open the RocksDB
+
+  // Set RocksDB options around MemTable memory usage. By default, we let RocksDB
+  // use its internal default values for these settings.
+  if (conf.writeBufferSizeMB > 0L) {
+    columnFamilyOptions.setWriteBufferSize(conf.writeBufferSizeMB * 1024 * 1024)
+  }
+
+  if (conf.maxWriteBufferNumber > 0L) {
+    columnFamilyOptions.setMaxWriteBufferNumber(conf.maxWriteBufferNumber)
+  }
+
   dbOptions.setCreateIfMissing(true)
   dbOptions.setTableFormatConfig(tableFormatConfig)
   dbOptions.setMaxOpenFiles(conf.maxOpenFiles)
@@ -558,7 +572,9 @@ case class RocksDBConf(
     resetStatsOnLoad : Boolean,
     formatVersion: Int,
     trackTotalNumberOfRows: Boolean,
-    maxOpenFiles: Int)
+    maxOpenFiles: Int,
+    writeBufferSizeMB: Long,
+    maxWriteBufferNumber: Int)

 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -609,6 +625,15 @@ object RocksDBConf {
   // again when you really need the know the number for observability/debuggability.
   private val TRACK_TOTAL_NUMBER_OF_ROWS = SQLConfEntry("trackTotalNumberOfRows", "true")

+  // Configuration to control maximum size of MemTable in RocksDB
+  private val WRITE_BUFFER_SIZE_MB_CONF = SQLConfEntry("writeBufferSizeMB", "-1")
+
+  // Configuration to set maximum number of MemTables in RocksDB, both active and immutable.
+  // If the active MemTable fills up and the total number of MemTables is larger than
+  // maxWriteBufferNumber, then RocksDB will stall further writes.
+  // This may happen if the flush process is slower than the write rate.
+  private val MAX_WRITE_BUFFER_NUMBER_CONF = SQLConfEntry("maxWriteBufferNumber", "-1")
+
   def apply(storeConf: StateStoreConf): RocksDBConf = {
     val sqlConfs = CaseInsensitiveMap[String](storeConf.sqlConfs)
     val extraConfs = CaseInsensitiveMap[String](storeConf.extraOptions)
@@ -633,6 +658,14 @@ object RocksDBConf {
       }
     }

+    def getLongConf(conf: ConfEntry): Long = {
+      Try { getConfigMap(conf).getOrElse(conf.fullName, conf.default).toLong } getOrElse {
+        throw new IllegalArgumentException(
+          s"Invalid value for '${conf.fullName}', must be a long"
+        )
+      }
+    }
+
     def getPositiveLongConf(conf: ConfEntry): Long = {
       Try {
         getConfigMap(conf).getOrElse(conf.fullName, conf.default).toLong
@@ -660,7 +693,9 @@ object RocksDBConf {
       getBooleanConf(RESET_STATS_ON_LOAD),
       getPositiveIntConf(FORMAT_VERSION),
       getBooleanConf(TRACK_TOTAL_NUMBER_OF_ROWS),
-      getIntConf(MAX_OPEN_FILES_CONF))
+      getIntConf(MAX_OPEN_FILES_CONF),
+      getLongConf(WRITE_BUFFER_SIZE_MB_CONF),
+      getIntConf(MAX_WRITE_BUFFER_NUMBER_CONF))
   }

   def apply(): RocksDBConf = apply(new StateStoreConf())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index 1998e2af114..54c99741874 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -76,6 +76,8 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
       (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".compactOnCommit", "true"),
       (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".lockAcquireTimeoutMs", "10"),
       (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxOpenFiles", "1000"),
+      (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxWriteBufferNumber", "3"),
+      (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".writeBufferSizeMB", "16"),
       (SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION.key, "4")
     )
     testConfs.foreach { case (k, v) => spark.conf.set(k, v) }
@@ -102,6 +104,8 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
       assert(rocksDBConfInTask.lockAcquireTimeoutMs == 10L)
       assert(rocksDBConfInTask.formatVersion == 4)
       assert(rocksDBConfInTask.maxOpenFiles == 1000)
+      assert(rocksDBConfInTask.maxWriteBufferNumber == 3)
+      assert(rocksDBConfInTask.writeBufferSizeMB == 16L)
     }
   }
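The getLongConf helper added above is what parses these long-valued confs. A simplified, standalone illustration of its behaviour (not the Spark-internal code itself): a missing value falls back to the default, and a non-numeric value is rejected.

```scala
import scala.util.Try

// Simplified stand-in for RocksDBConf's new getLongConf: resolve the user value
// (or the default), require it to parse as a Long, and fail loudly otherwise.
def parseLongConf(name: String, userValue: Option[String], default: String): Long =
  Try(userValue.getOrElse(default).toLong).getOrElse {
    throw new IllegalArgumentException(s"Invalid value for '$name', must be a long")
  }

// "-1" (the default for both new confs) means "keep RocksDB's internal default".
assert(parseLongConf("writeBufferSizeMB", None, "-1") == -1L)
assert(parseLongConf("writeBufferSizeMB", Some("64"), "-1") == 64L)
// parseLongConf("writeBufferSizeMB", Some("sixty-four"), "-1") would throw.
```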
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 417eff65482..3d45a8868e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -660,6 +660,33 @@ class RocksDBSuite extends SparkFunSuite {
     }
   }

+  Seq("1", "2", "3").foreach { maxWriteBufferNumber =>
+    Seq("16", "32", "64").foreach {writeBufferSizeMB =>
+      test(s"SPARK-42819: configure memtable memory usage with " +
+        s"maxWriteBufferNumber=$maxWriteBufferNumber and writeBufferSize=$writeBufferSizeMB") {
+        withTempDir { dir =>
+          val sqlConf = new SQLConf
+          sqlConf.setConfString("spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber",
+            maxWriteBufferNumber)
+          sqlConf.setConfString("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB",
+            writeBufferSizeMB)
+          val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+          assert(dbConf.maxWriteBufferNumber === maxWriteBufferNumber.toInt)
+          assert(dbConf.writeBufferSizeMB === writeBufferSizeMB.toInt)
+
+          val remoteDir = dir.getCanonicalPath
+          withDB(remoteDir, conf = dbConf) { db =>
+            // Do some DB ops
+            db.load(0)
+            db.put("a", "1")
+            db.commit()
+            assert(toStr(db.get("a")) === "1")
+          }
+        }
+      }
+    }
+  }
+
   test("SPARK-37224: flipping option 'trackTotalNumberOfRows' during restart") {
     withTempDir { dir =>
       val remoteDir = dir.getCanonicalPath
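A rough sizing note that follows from the motivation above (an approximation, not something this patch enforces): the MemTable memory a single RocksDB state store instance can pin is bounded by roughly the product of the two new settings.

```scala
// Back-of-the-envelope only: per RocksDB instance, MemTables can hold up to
// roughly writeBufferSizeMB * maxWriteBufferNumber megabytes before writes
// stall waiting for flushes. Example values, not defaults.
val writeBufferSizeMB = 64L
val maxWriteBufferNumber = 4
val approxMemtableBoundMB = writeBufferSizeMB * maxWriteBufferNumber
println(s"Approximate MemTable memory bound: $approxMemtableBoundMB MB per state store instance")
// => Approximate MemTable memory bound: 256 MB per state store instance
```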