Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
dongjoon-hyun closed pull request #44173: [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` URL: https://github.com/apache/spark/pull/44173 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
dongjoon-hyun commented on PR #44173: URL: https://github.com/apache/spark/pull/44173#issuecomment-1840284937 Thank you so much both of you, @LuciferYang and @yaooqinn . Merged to master.
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
yaooqinn commented on PR #44173: URL: https://github.com/apache/spark/pull/44173#issuecomment-1840281483 LGTM
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
yaooqinn commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415148979

## core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala: ##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  private val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)
+    if (serialized.hasArray) {
+      db.put(name.getBytes(UTF_8), serialized.array())

Review Comment:
> When do you think persist is invoked with the duplication?

I guess it won't happen. I'm just asking because of the existing test `SPARK-46191: FileSystemPersistenceEngine.persist error message for the existing file`
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
dongjoon-hyun commented on PR #44173: URL: https://github.com/apache/spark/pull/44173#issuecomment-1840278347 I updated the benchmark result with the final commit, too~
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415147988

## core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala: ##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)

Review Comment:
To @LuciferYang , the benchmark is updated with your suggestion.
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415142557

## core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala: ##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  private val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)
+    if (serialized.hasArray) {
+      db.put(name.getBytes(UTF_8), serialized.array())

Review Comment:
To @yaooqinn , `PersistenceEngine` is secondary storage rather than a lookup table. In general, `Master` enforces uniqueness in its in-memory structures before reaching this point. For example, `WorkerInfo` is tracked by

https://github.com/apache/spark/blob/1d80d80a56c418f841e282ad753fad6671c3baae/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L76C9-L76C9
```
private val idToWorker = new HashMap[String, WorkerInfo]
```

In addition, Driver IDs and App IDs are generated by `Master` in a unique way, using patterns that include timestamps.
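For readers following the thread, the uniqueness argument in this comment can be sketched roughly as below. This is a minimal illustration under stated assumptions, not the actual `Master` code: the object name `UniqueIdSketch`, the `String` payload, the `worker_` key prefix, and the `yyyyMMddHHmmss` timestamp pattern are all hypothetical stand-ins chosen for the example.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import scala.collection.mutable

// Hypothetical sketch: an in-memory registry rejects duplicate IDs before
// anything is handed to the persistence layer, and generated IDs combine a
// timestamp with a monotonically increasing counter.
object UniqueIdSketch {
  // Stand-in for the Master's in-memory map (payload simplified to String).
  private val idToWorker = new mutable.HashMap[String, String]
  // Stand-in for the names that would be passed to PersistenceEngine.persist.
  private val persisted = new mutable.ArrayBuffer[String]
  private var nextAppNumber = 0

  /** Registers a worker only if its ID is new, so each name is persisted once. */
  def registerWorker(id: String, info: String): Boolean = {
    if (idToWorker.contains(id)) {
      false // duplicate: never reaches the persistence layer
    } else {
      idToWorker(id) = info
      persisted += s"worker_$id"
      true
    }
  }

  /** Builds an ID from a timestamp plus a counter (assumed pattern). */
  def newApplicationId(submitDate: Date): String = {
    val fmt = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
    val appId = "app-%s-%04d".format(fmt.format(submitDate), nextAppNumber)
    nextAppNumber += 1
    appId
  }

  def persistedNames: Seq[String] = persisted.toSeq
}
```

Under these assumptions, a second registration with the same worker ID is rejected in memory, and two application IDs generated for the same submit time still differ because of the counter.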
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415126869

## core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala: ##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  private val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)
+    if (serialized.hasArray) {
+      db.put(name.getBytes(UTF_8), serialized.array())

Review Comment:
When do you think `persist` is invoked with the duplication?
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415108828

## core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala: ##
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }

+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
Thank you, @yaooqinn . Ya, that's true. ZookeeperPersistenceEngine has a separate configuration namespace for that. That's one of the reasons why I don't want to add a new namespace.
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
yaooqinn commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415095668

## core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala: ##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  private val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)
+    if (serialized.hasArray) {
+      db.put(name.getBytes(UTF_8), serialized.array())

Review Comment:
Does `db.put` behave the same as file creation in FileSystemPersistenceEngine and znode creation in ZooKeeperPersistenceEngine with duplicated `name`s?
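The difference this question is probing can be illustrated with a small sketch. It assumes, as the referenced SPARK-46191 test name suggests, that the file-based engine treats an already existing file as an error, while a key-value `put` simply replaces the previous value for the same key. To stay runnable without the RocksDB native library, a `mutable.HashMap` stands in for the store; `DuplicateNameSketch`, `createTwice`, and `putTwice` are hypothetical names used only for this example.

```scala
import java.nio.file.{FileAlreadyExistsException, Files}
import scala.collection.mutable
import scala.util.Try

object DuplicateNameSketch {
  /** Create-only semantics: returns true if the second create of `name` fails. */
  def createTwice(name: String): Boolean = {
    val dir = Files.createTempDirectory("persist-sketch")
    val file = dir.resolve(name)
    Files.createFile(file)
    // Creating the same path again is expected to throw.
    val second = Try(Files.createFile(file))
    val failed = second.failed.toOption
      .exists(_.isInstanceOf[FileAlreadyExistsException])
    // Clean up the temporary file and directory.
    Files.deleteIfExists(file)
    Files.deleteIfExists(dir)
    failed
  }

  /** Put semantics (HashMap as a stand-in for a key-value store): last write wins. */
  def putTwice(name: String): String = {
    val store = new mutable.HashMap[String, String]
    store.put(name, "first")
    store.put(name, "second") // silently overwrites the earlier value
    store(name)
  }
}
```

In other words, a duplicated `name` surfaces as an error with create-only semantics but is silently overwritten with put semantics, which is why the in-memory uniqueness guarantees discussed in the replies matter.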
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
dongjoon-hyun commented on PR #44173: URL: https://github.com/apache/spark/pull/44173#issuecomment-1840203815 Thank you, @LuciferYang .
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
yaooqinn commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415066164

## core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala: ##
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }

+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
The thing is, it's pretty common to see that old naming conventions just don't cut it when adding new features. Technically, we have `spark.deploy.zookeeper.dir` for ZK mode, although it's a znode. I don't have a strong opinion on a new configuration here, as it's trivial.
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
dongjoon-hyun commented on PR #44173:
URL: https://github.com/apache/spark/pull/44173#issuecomment-1840187474

Thank you for pointing that out.

> Maybe the documents also need to be updated in this pr?

Actually, I thought about making a new doc PR for the following accumulated items. I also want to add some recommendations to help users choose. Let me proceed with them separately~

```
[SPARK-46258][CORE] Add RocksDBPersistenceEngine
[SPARK-46216][CORE] Improve `FileSystemPersistenceEngine` to support compressions
[SPARK-46205][CORE] Improve `PersistenceEngine` performance with `KryoSerializer`
```
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415034531

## core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala: ##
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }

+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
No, it will be okay. For example, RocksDB content will be ignored because FILESYSTEM scans only files with the following `prefix`. I guess RocksDB also does the same thing for FILESYSTEM's leftovers.

https://github.com/apache/spark/blob/1d80d80a56c418f841e282ad753fad6671c3baae/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala#L56-L78

However, that kind of change is not supposed to happen in place. When any of the following three configurations changes, the directory should be cleared.
- spark.deploy.recoveryMode
- spark.deploy.recoverySerializer
- spark.deploy.recoveryCompressionCodec
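The prefix-based scan mentioned in this comment can be sketched as follows. This is a simplified illustration, not the `FileSystemPersistenceEngine` code itself: `PrefixScanSketch` and `namesWithPrefix` are hypothetical names, and the `app_` and `driver_` prefixes used when calling it are assumptions about how persisted entries are named.

```scala
import java.io.File

object PrefixScanSketch {
  /**
   * Lists only regular files in `dir` whose names start with `prefix`.
   * Anything else in the directory (for example files left behind by
   * another backend) is ignored by this scan.
   */
  def namesWithPrefix(dir: File, prefix: String): Seq[String] = {
    // listFiles() returns null when `dir` is not a readable directory.
    Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
      .filter(f => f.isFile && f.getName.startsWith(prefix))
      .map(_.getName)
      .sorted
  }
}
```

With such a filter, a directory that mixes prefixed recovery files with unrelated files only yields the prefixed ones, which matches the point above that leftovers are skipped rather than misread. Clearing the directory when switching modes remains the safer practice.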
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
LuciferYang commented on PR #44173: URL: https://github.com/apache/spark/pull/44173#issuecomment-1840164897 Maybe the documents also need to be updated in this pr? https://github.com/apache/spark/blob/1d80d80a56c418f841e282ad753fad6671c3baae/docs/spark-standalone.md?plain=1#L736
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414983766

## core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala: ##
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
Thank you for the review, @yaooqinn and @LuciferYang. I tried following your advice, but it seems to make the configuration namespace a little weird because the existing configuration, `spark.deploy.recoveryDirectory`, is a general one.

https://github.com/apache/spark/blob/1d80d80a56c418f841e282ad753fad6671c3baae/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala#L54

If we introduce a new configuration for `RocksDBPersistenceEngine` like the following, users will ask why they cannot use `spark.deploy.recoveryDirectory` for `ROCKSDB` mode. Here is an example. The AS-IS design aims to let users switch backends by changing a single configuration, `recoveryMode`.

**BEFORE**
```
spark.deploy.recoveryMode=FILESYSTEM
spark.deploy.recoveryDirectory=/opt/master
```

**AFTER**
```
spark.deploy.recoveryMode=ROCKSDB
spark.deploy.recoveryDirectory=/opt/master
```

If we add a new configuration, it would look like the following, where `spark.deploy.recoveryDirectory` is a no-op.
```
spark.deploy.recoveryMode=ROCKSDB
spark.deploy.recoveryDirectory=/opt/master
spark.deploy.recoveryRocksDBDirectory=/opt/db
```

In addition, we already assume that users clear the location properly when they change settings like the following.
- `spark.deploy.recoverySerializer`
- `spark.deploy.recoveryCompressionCodec`

To sum up, if you guys don't mind, I'd like to use the existing general configuration name for this backend.
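As a rough illustration of the single-switch design argued for above, the sketch below models how one directory setting can serve both backends. It is a self-contained model, not Spark's code: `Backend`, `RecoverySettings`, and `selectBackend` are hypothetical names, the defaults (`NONE` and an empty directory) are assumptions, and only the two configuration keys and the mode strings come from the discussion.

```scala
// Hypothetical model of the single-switch design; not Spark's actual classes.
sealed trait Backend
final case class FileSystemBackend(dir: String) extends Backend
final case class RocksDBBackend(dir: String) extends Backend
case object NoRecovery extends Backend

object RecoverySettings {
  // Both backends read the same general key, so switching modes only
  // requires changing spark.deploy.recoveryMode.
  def selectBackend(conf: Map[String, String]): Backend = {
    // Assumed defaults: empty directory and NONE mode.
    val dir = conf.getOrElse("spark.deploy.recoveryDirectory", "")
    conf.getOrElse("spark.deploy.recoveryMode", "NONE") match {
      case "FILESYSTEM" => FileSystemBackend(dir)
      case "ROCKSDB" => RocksDBBackend(dir)
      case _ => NoRecovery
    }
  }
}
```

With a dedicated `spark.deploy.recoveryRocksDBDirectory` key, the `ROCKSDB` branch would have to read a second setting, which is the extra surface the comment argues against.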
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
LuciferYang commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415001683

## core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala: ##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)

Review Comment:
Yes
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415000391

## core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala: ##
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
Sure.
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
LuciferYang commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414999619

## core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala: ##
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
From this perspective, I think reusing the configuration is also fine. But I have another question: if the user first uses `FILESYSTEM` and then switches to `ROCKSDB` without manually cleaning up `/opt/master`, will the residual data in the `/opt/master` directory cause the master to fail to start?
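One way an operator could look for such leftovers before switching modes is sketched below. The thread does not say how `FILESYSTEM` mode names its files, so the prefixes `app_`, `driver_`, and `worker_` are an assumption, and `ResidualCheck` and `residualFiles` are hypothetical names. The sketch only lists candidate files and says nothing about whether RocksDB would actually fail to open the directory.

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

object ResidualCheck {
  // Assumed (not confirmed in the thread) prefixes of files left by FILESYSTEM mode.
  private val assumedPrefixes = Seq("app_", "driver_", "worker_")

  // Returns the sorted names of regular files in `dir` that match an assumed prefix.
  def residualFiles(dir: Path): Seq[String] = {
    if (!Files.isDirectory(dir)) {
      Seq.empty
    } else {
      val stream = Files.list(dir)
      try {
        stream.iterator().asScala
          .filter(p => Files.isRegularFile(p))
          .map(_.getFileName.toString)
          .filter(n => assumedPrefixes.exists(p => n.startsWith(p)))
          .toList
          .sorted
      } finally {
        stream.close() // directory streams hold an OS handle and must be closed
      }
    }
  }
}
```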
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414996134

## core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala: ##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)

Review Comment:
This one, right?
```scala
override def persist(name: String, obj: Object): Unit = {
  val serialized = serializer.newInstance().serialize(obj)
  if (serialized.hasArray) {
    db.put(name.getBytes(UTF_8), serialized.array())
  } else {
    val bytes = new Array[Byte](serialized.remaining())
    serialized.get(bytes)
    db.put(name.getBytes(UTF_8), bytes)
  }
}
```
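The `hasArray` fast path proposed above hands over the whole backing array, which equals the serialized payload only when the buffer starts at offset 0 and spans the entire array. Whether Spark's serializers always return such a buffer is not stated in the thread, so the following is a defensive sketch under that uncertainty rather than the PR's code. `ByteBufferBytes` and `toBytes` are hypothetical names; only standard `java.nio.ByteBuffer` calls are used.

```scala
import java.nio.ByteBuffer

object ByteBufferBytes {
  // Returns exactly the remaining bytes of `buf` without moving its position.
  def toBytes(buf: ByteBuffer): Array[Byte] = {
    // The backing array can be reused only if it is exactly the payload.
    val exactArray = buf.hasArray && buf.arrayOffset() == 0 &&
      buf.position() == 0 && buf.remaining() == buf.array().length
    if (exactArray) {
      buf.array() // zero-copy fast path
    } else {
      val bytes = new Array[Byte](buf.remaining())
      buf.duplicate().get(bytes) // duplicate() keeps the caller's position intact
      bytes
    }
  }
}
```

The copy path also covers direct and read-only buffers, for which `hasArray` returns false.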
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414987963

## core/src/main/scala/org/apache/spark/deploy/master/Master.scala: ##
@@ -186,6 +186,10 @@ private[deploy] class Master(
         val fsFactory = new FileSystemRecoveryModeFactory(conf, serializer)
         (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
+      case "ROCKSDB" =>

Review Comment:
Yes, it's legacy behavior: `spark.deploy.recoveryMode` has always assumed all-uppercase values since Spark 0.8.1.
- NONE, ZOOKEEPER, FILESYSTEM, CUSTOM
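The exact-match behaviour described above can be illustrated with a small standalone sketch. `RecoveryModeMatch` and both functions are hypothetical names. The lenient variant is shown only for contrast and is not something the PR does.

```scala
import java.util.Locale

object RecoveryModeMatch {
  // Mirrors the exact-match style of the diff: only the upper-case literal matches.
  def isRocksDB(mode: String): Boolean = mode match {
    case "ROCKSDB" => true
    case _ => false
  }

  // Hypothetical normalization, NOT part of the PR: accepts any casing.
  def isRocksDBLenient(mode: String): Boolean =
    isRocksDB(mode.trim.toUpperCase(Locale.ROOT))
}
```

`Locale.ROOT` avoids locale-dependent case mapping when upper-casing the value.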
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414986088

## core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala: ##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)
+    val bytes = new Array[Byte](serialized.remaining())
+    serialized.get(bytes)
+    db.put(name.getBytes(UTF_8), bytes)
+  }
+
+  override def unpersist(name: String): Unit = {
+    db.delete(name.getBytes(UTF_8))
+  }
+
+  override def read[T: ClassTag](name: String): Seq[T] = {
+    val result = new ArrayBuffer[T]
+    val iter = db.newIterator()
+    iter.seek(name.getBytes(UTF_8))
+    while (iter.isValid && new String(iter.key()).startsWith(name)) {
+      result.append(serializer.newInstance().deserialize[T](ByteBuffer.wrap(iter.value())))
+      iter.next()
+    }
+    iter.close()

Review Comment:
Sure.
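The point at issue here, releasing the iterator even when deserialization throws, can be shown without a RocksDB dependency. In the sketch below, `TrackedIterator` is a hypothetical stand-in for a native iterator that must be closed, and `SafeRead.readAll` is an illustrative name. It demonstrates the try/finally shape only and is not the code that was merged.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a native iterator that must be closed after use.
final class TrackedIterator[A](items: Seq[A]) extends AutoCloseable {
  private val underlying = items.iterator
  var closed = false
  def isValid: Boolean = underlying.hasNext
  def next(): A = underlying.next()
  override def close(): Unit = { closed = true }
}

object SafeRead {
  // Collects decoded values; the finally block runs even if `decode` throws.
  def readAll[A, B](iter: TrackedIterator[A])(decode: A => B): Seq[B] = {
    val result = new ArrayBuffer[B]
    try {
      while (iter.isValid) {
        result.append(decode(iter.next()))
      }
    } finally {
      iter.close()
    }
    result.toSeq
  }
}
```

Without the `finally`, an exception from `decode` would skip `close()` and leak the underlying handle.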
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414985071

## core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala: ##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))

Review Comment:
Thanks. Let me switch it to `private`.
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
LuciferYang commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414940118

## core/src/main/scala/org/apache/spark/deploy/master/Master.scala: ##
@@ -186,6 +186,10 @@ private[deploy] class Master(
         val fsFactory = new FileSystemRecoveryModeFactory(conf, serializer)
         (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
+      case "ROCKSDB" =>

Review Comment:
Is this configuration item required to be `ROCKSDB`? Is it not allowed to configure it as `rocksdb` or `Rocksdb` now? (Perhaps this is a legacy issue?)
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
LuciferYang commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414938380

## core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala: ##
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
+1
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
LuciferYang commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414916176

## core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala: ##
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
Although this may be intended to stay consistent with the style of `FileSystemRecoveryModeFactory`, it may be more suitable as a local variable within the `createPersistenceEngine()` method.

## core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala: ##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)
+    val bytes = new Array[Byte](serialized.remaining())
+    serialized.get(bytes)
+    db.put(name.getBytes(UTF_8), bytes)
+  }
+
+  override def unpersist(name: String): Unit = {
+    db.delete(name.getBytes(UTF_8))
+  }
+
+  override def read[T: ClassTag](name: String): Seq[T] = {
+    val result = new ArrayBuffer[T]
+    val iter = db.newIterator()
+    iter.seek(name.getBytes(UTF_8))
+    while (iter.isValid && new String(iter.key()).startsWith(name)) {
+      result.append(serializer.newInstance().deserialize[T](ByteBuffer.wrap(iter.value())))
+      iter.next()
+    }
+    iter.close()

Review Comment:
Should we close `iter` in a `finally` block?

## core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala: ##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
yaooqinn commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414926099

## core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala: ##
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
It would be better to use a directory separate from the one used by `FileSystemRecoveryModeFactory`.
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
dongjoon-hyun commented on PR #44173: URL: https://github.com/apache/spark/pull/44173#issuecomment-1840044360 Could you review this PR, @yaooqinn ?
Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]
dongjoon-hyun commented on PR #44173: URL: https://github.com/apache/spark/pull/44173#issuecomment-1840012871 Could you review this PR, @LuciferYang ?