Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-05 Thread via GitHub


dongjoon-hyun closed pull request #44173: [SPARK-46258][CORE] Add 
`RocksDBPersistenceEngine`
URL: https://github.com/apache/spark/pull/44173





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-05 Thread via GitHub


dongjoon-hyun commented on PR #44173:
URL: https://github.com/apache/spark/pull/44173#issuecomment-1840284937

   Thank you so much to both of you, @LuciferYang and @yaooqinn . Merged to master.





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-05 Thread via GitHub


yaooqinn commented on PR #44173:
URL: https://github.com/apache/spark/pull/44173#issuecomment-1840281483

   LGTM





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-05 Thread via GitHub


yaooqinn commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415148979


##
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+val dir: String,
+val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  private val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * 
https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+.setFilterPolicy(new BloomFilter(10.0D, false))
+.setEnableIndexCompression(false)
+.setIndexBlockRestartInterval(8)
+.setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+.setCreateIfMissing(true)
+.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+.setCompressionType(CompressionType.LZ4_COMPRESSION)
+.setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+val serialized = serializer.newInstance().serialize(obj)
+if (serialized.hasArray) {
+  db.put(name.getBytes(UTF_8), serialized.array())

Review Comment:
   > When do you think `persist` would be invoked with a duplicate name?
   
   I guess it won't happen. I'm just asking because of the existing test
`SPARK-46191: FileSystemPersistenceEngine.persist error message for the
existing file`.
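   
   For reference, a minimal sketch of the kind of check that test performs,
assuming ScalaTest's `intercept` and a throwaway temp directory; the names are
illustrative, not the actual test code:
   ```scala
   import java.nio.file.Files
   
   import org.apache.spark.SparkConf
   import org.apache.spark.serializer.JavaSerializer
   
   // Hypothetical sketch: FileSystemPersistenceEngine creates one file per
   // name, so persisting the same name twice should fail on the second call.
   val dir = Files.createTempDirectory("persistence-test").toString
   val engine = new FileSystemPersistenceEngine(dir, new JavaSerializer(new SparkConf))
   
   engine.persist("app_1", "state")
   val e = intercept[IllegalStateException] {
     engine.persist("app_1", "state")  // duplicate name: file already exists
   }
   assert(e.getMessage.contains("app_1"))
   ```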






Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-05 Thread via GitHub


dongjoon-hyun commented on PR #44173:
URL: https://github.com/apache/spark/pull/44173#issuecomment-1840278347

   I updated the benchmark result with the final commit, too~





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-05 Thread via GitHub


dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415147988


##
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+val dir: String,
+val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * 
https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+.setFilterPolicy(new BloomFilter(10.0D, false))
+.setEnableIndexCompression(false)
+.setIndexBlockRestartInterval(8)
+.setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+.setCreateIfMissing(true)
+.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+.setCompressionType(CompressionType.LZ4_COMPRESSION)
+.setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+val serialized = serializer.newInstance().serialize(obj)

Review Comment:
   To @LuciferYang , the benchmark is updated with your suggestion.






Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-05 Thread via GitHub


dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415142557


##
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+val dir: String,
+val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  private val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * 
https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+.setFilterPolicy(new BloomFilter(10.0D, false))
+.setEnableIndexCompression(false)
+.setIndexBlockRestartInterval(8)
+.setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+.setCreateIfMissing(true)
+.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+.setCompressionType(CompressionType.LZ4_COMPRESSION)
+.setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+val serialized = serializer.newInstance().serialize(obj)
+if (serialized.hasArray) {
+  db.put(name.getBytes(UTF_8), serialized.array())

Review Comment:
   To @yaooqinn , `PersistenceEngine` is secondary storage, not a lookup
table. In general, `Master` enforces uniqueness in its in-memory structures
before reaching this point.
   
   For example, `WorkerInfo` is tracked by
   
   
https://github.com/apache/spark/blob/1d80d80a56c418f841e282ad753fad6671c3baae/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L76C9-L76C9
   ```
   private val idToWorker = new HashMap[String, WorkerInfo]
   ```
   
   In addition, Driver IDs and App IDs are generated by `Master` in a unique
way, using patterns that include timestamps.
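   
   As a rough illustration of that scheme, here is a sketch modeled on
`Master`'s `newApplicationId`; the exact format string lives in `Master.scala`:
   ```scala
   import java.text.SimpleDateFormat
   import java.util.{Date, Locale}
   
   // Sketch: Master combines a timestamp with a monotonically increasing
   // counter, so generated IDs never collide within one Master instance.
   val createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
   var nextAppNumber = 0
   
   def newApplicationId(submitDate: Date): String = {
     val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
     nextAppNumber += 1
     appId
   }
   
   println(newApplicationId(new Date()))  // e.g. "app-20231205093000-0000"
   ```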
   








Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-05 Thread via GitHub


dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415126869


##
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+val dir: String,
+val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  private val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * 
https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+.setFilterPolicy(new BloomFilter(10.0D, false))
+.setEnableIndexCompression(false)
+.setIndexBlockRestartInterval(8)
+.setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+.setCreateIfMissing(true)
+.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+.setCompressionType(CompressionType.LZ4_COMPRESSION)
+.setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+val serialized = serializer.newInstance().serialize(obj)
+if (serialized.hasArray) {
+  db.put(name.getBytes(UTF_8), serialized.array())

Review Comment:
   When do you think `persist` would be invoked with a duplicate name?






Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-05 Thread via GitHub


dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415108828


##
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala:
##
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: 
SparkConf, serializer:
   }
 }
 
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the 
actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: 
Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
   Thank you, @yaooqinn . Ya, that's true. `ZooKeeperPersistenceEngine` has a
separate configuration namespace for that. That's one of the reasons why I
don't want to add a new namespace.






Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-05 Thread via GitHub


yaooqinn commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415095668


##
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+val dir: String,
+val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  private val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * 
https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+.setFilterPolicy(new BloomFilter(10.0D, false))
+.setEnableIndexCompression(false)
+.setIndexBlockRestartInterval(8)
+.setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+.setCreateIfMissing(true)
+.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+.setCompressionType(CompressionType.LZ4_COMPRESSION)
+.setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+val serialized = serializer.newInstance().serialize(obj)
+if (serialized.hasArray) {
+  db.put(name.getBytes(UTF_8), serialized.array())

Review Comment:
   Does `db.put` behave the same as file creation in 
FileSystemPersistenceEngine and znode creation in ZooKeeperPersistenceEngine 
with duplicated `name`s?
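   
   For context, `db.put` is an upsert, so it differs from both: a second `put`
with the same key silently overwrites the old value instead of failing. A
minimal sketch (the `/tmp` path is illustrative):
   ```scala
   import java.nio.charset.StandardCharsets.UTF_8
   
   import org.rocksdb.{Options, RocksDB}
   
   RocksDB.loadLibrary()
   val db = RocksDB.open(new Options().setCreateIfMissing(true), "/tmp/rocksdb-put-demo")
   
   // put() is an upsert: the second call with the same key overwrites.
   db.put("app_1".getBytes(UTF_8), "v1".getBytes(UTF_8))
   db.put("app_1".getBytes(UTF_8), "v2".getBytes(UTF_8))
   assert(new String(db.get("app_1".getBytes(UTF_8)), UTF_8) == "v2")
   
   // By contrast, FileSystemPersistenceEngine fails on an existing file and
   // ZooKeeperPersistenceEngine throws KeeperException.NodeExistsException.
   db.close()
   ```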






Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-05 Thread via GitHub


dongjoon-hyun commented on PR #44173:
URL: https://github.com/apache/spark/pull/44173#issuecomment-1840203815

   Thank you, @LuciferYang .





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-05 Thread via GitHub


yaooqinn commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415066164


##
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala:
##
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: 
SparkConf, serializer:
   }
 }
 
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the 
actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: 
Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
   The thing is, it's pretty common to see that old naming conventions just
don't cut it when adding new features.
   
   Technically, we have `spark.deploy.zookeeper.dir` for ZK mode, although
it's a znode.
   
   I don't have a strong opinion on adding a new configuration here, as it's trivial.






Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on PR #44173:
URL: https://github.com/apache/spark/pull/44173#issuecomment-1840187474

   Thank you for pointing that out.
   > Maybe the documents also need to be updated in this pr?
   
   Actually, I thought about making a new doc PR for the following accumulated
changes. I also want to add some recommendations on the user choices. Let me
proceed with them separately~
   ```
   [SPARK-46258][CORE] Add RocksDBPersistenceEngine
   [SPARK-46216][CORE] Improve `FileSystemPersistenceEngine` to support 
compressions
   [SPARK-46205][CORE] Improve `PersistenceEngine` performance with 
`KryoSerializer`
   ```





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415034531


##
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala:
##
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: 
SparkConf, serializer:
   }
 }
 
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the 
actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: 
Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
   No, it will be okay. For example, RocksDB content will be ignored because
`FILESYSTEM` mode scans only the files matching the given `prefix`. I guess
RocksDB also does the same thing with `FILESYSTEM`'s leftovers; see the sketch
after the list below.
   
   
https://github.com/apache/spark/blob/1d80d80a56c418f841e282ad753fad6671c3baae/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala#L56-L78
   
   However, that kind of change is not supposed to happen. When any of the
following three configurations changes, the directory should be cleared.
   
   - spark.deploy.recoveryMode
   - spark.deploy.recoverySerializer
   - spark.deploy.recoveryCompressionCodec
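   
   As a sketch of that prefix filter, simplified from the `read` contract
linked above (the helper name is illustrative):
   ```scala
   import java.io.File
   
   // FILESYSTEM mode reads back only files whose names start with the
   // requested prefix ("app_", "driver_", "worker_"), so RocksDB leftovers in
   // the same directory, such as "000005.sst" or "MANIFEST-000004", never match.
   def readNames(dir: String, prefix: String): Seq[String] =
     Option(new File(dir).listFiles())
       .map(_.toSeq.map(_.getName).filter(_.startsWith(prefix)))
       .getOrElse(Seq.empty)
   ```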






Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-04 Thread via GitHub


LuciferYang commented on PR #44173:
URL: https://github.com/apache/spark/pull/44173#issuecomment-1840164897

   Maybe the documents also need to be updated in this pr?
   
   
https://github.com/apache/spark/blob/1d80d80a56c418f841e282ad753fad6671c3baae/docs/spark-standalone.md?plain=1#L736





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414983766


##
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala:
##
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: 
SparkConf, serializer:
   }
 }
 
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the 
actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: 
Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
   Thank you for review. @yaooqinn and @LuciferYang 
   
   I tried to follow your advice, but it seems to make the configuration
namespace a little weird because the existing configuration is a general one,
`spark.deploy.recoveryDirectory`.
   
   
https://github.com/apache/spark/blob/1d80d80a56c418f841e282ad753fad6671c3baae/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala#L54
   
   If we introduce a new configuration for `RocksDBPersistenceEngine` like the
following, users will ask why they cannot use `spark.deploy.recoveryDirectory`
for `ROCKSDB` mode. Here is an example. The AS-IS design aims to let users
switch backends with the single configuration `recoveryMode`.
   
   **BEFORE**
   ```
   spark.deploy.recoveryMode=FILESYSTEM
   spark.deploy.recoveryDirectory=/opt/master
   ```
   
   **AFTER**
   ```
   spark.deploy.recoveryMode=ROCKSDB
   spark.deploy.recoveryDirectory=/opt/master
   ```
   
   If we add a new configuration, it would be the following, where
`spark.deploy.recoveryDirectory` becomes a no-op.
   ```
   spark.deploy.recoveryMode=ROCKSDB
   spark.deploy.recoveryDirectory=/opt/master
   spark.deploy.recoveryRocksDBDirectory=/opt/db
   ```
   
   In addition, we assume that users clear the location properly when they
change settings like the following.
   - `spark.deploy.recoverySerializer`
   - `spark.deploy.recoveryCompressionCodec`
   
   To sum up, if you guys don't mind, I'd like to use the existing general 
configuration name for this backend.






Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-04 Thread via GitHub


LuciferYang commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415001683


##
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+val dir: String,
+val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * 
https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+.setFilterPolicy(new BloomFilter(10.0D, false))
+.setEnableIndexCompression(false)
+.setIndexBlockRestartInterval(8)
+.setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+.setCreateIfMissing(true)
+.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+.setCompressionType(CompressionType.LZ4_COMPRESSION)
+.setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+val serialized = serializer.newInstance().serialize(obj)

Review Comment:
   Yes






Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415000391


##
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala:
##
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: 
SparkConf, serializer:
   }
 }
 
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the 
actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: 
Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
   Sure.






Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-04 Thread via GitHub


LuciferYang commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414999619


##
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala:
##
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: 
SparkConf, serializer:
   }
 }
 
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the 
actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: 
Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
   From this perspective, I think reusing the configuration is also fine. But
I have another question: if the user first uses `FILESYSTEM` and then switches
to `ROCKSDB` without cleaning up `/opt/master` manually, will the residual
data in the `/opt/master` directory cause the master to fail to start in this
scenario?






Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414996134


##
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+val dir: String,
+val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * 
https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+.setFilterPolicy(new BloomFilter(10.0D, false))
+.setEnableIndexCompression(false)
+.setIndexBlockRestartInterval(8)
+.setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+.setCreateIfMissing(true)
+.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+.setCompressionType(CompressionType.LZ4_COMPRESSION)
+.setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+val serialized = serializer.newInstance().serialize(obj)

Review Comment:
   This one, right?
   ```scala
 override def persist(name: String, obj: Object): Unit = {
   val serialized = serializer.newInstance().serialize(obj)
   if (serialized.hasArray) {
 db.put(name.getBytes(UTF_8), serialized.array())
   } else {
 val bytes = new Array[Byte](serialized.remaining())
 serialized.get(bytes)
 db.put(name.getBytes(UTF_8), bytes)
   }
 }
   ```
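   
   For readers following along, the branch matters because only heap buffers
expose a backing array. A minimal sketch of the distinction, assuming (as the
zero-copy branch does) that the buffer's content spans its whole backing array:
   ```scala
   import java.nio.ByteBuffer
   
   val heap = ByteBuffer.wrap(Array[Byte](1, 2, 3))  // heap buffer
   val direct = ByteBuffer.allocateDirect(3)         // direct buffer
   
   assert(heap.hasArray)     // zero-copy path: array() exposes the backing array
   assert(!direct.hasArray)  // no backing array: bytes must be copied out
   
   // Copy path for a direct buffer, mirroring the else branch above.
   val bytes = new Array[Byte](direct.remaining())
   direct.get(bytes)
   ```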






Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414987963


##
core/src/main/scala/org/apache/spark/deploy/master/Master.scala:
##
@@ -186,6 +186,10 @@ private[deploy] class Master(
 val fsFactory =
   new FileSystemRecoveryModeFactory(conf, serializer)
 (fsFactory.createPersistenceEngine(), 
fsFactory.createLeaderElectionAgent(this))
+  case "ROCKSDB" =>

Review Comment:
   Yes, it's a legacy behavior: `spark.deploy.recoveryMode` has always assumed
all-uppercase values since Spark 0.8.1.
   - NONE, ZOOKEEPER, FILESYSTEM, CUSTOM
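   
   A small sketch of what that implies for users (illustrative; the real
`match` lives in `Master.scala`):
   ```scala
   import org.apache.spark.SparkConf
   
   val conf = new SparkConf().set("spark.deploy.recoveryMode", "rocksdb")
   
   // The match is case-sensitive, so only exact uppercase spellings select a
   // backend; "rocksdb" or "Rocksdb" silently fall through to no recovery.
   val engine = conf.get("spark.deploy.recoveryMode", "NONE") match {
     case "ZOOKEEPER"  => "ZooKeeperRecoveryModeFactory"
     case "FILESYSTEM" => "FileSystemRecoveryModeFactory"
     case "ROCKSDB"    => "RocksDBRecoveryModeFactory"  // added by this PR
     case "CUSTOM"     => "custom factory"
     case _            => "no recovery"
   }
   assert(engine == "no recovery")
   ```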






Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414986088


##
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+val dir: String,
+val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * 
https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+.setFilterPolicy(new BloomFilter(10.0D, false))
+.setEnableIndexCompression(false)
+.setIndexBlockRestartInterval(8)
+.setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+.setCreateIfMissing(true)
+.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+.setCompressionType(CompressionType.LZ4_COMPRESSION)
+.setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+val serialized = serializer.newInstance().serialize(obj)
+val bytes = new Array[Byte](serialized.remaining())
+serialized.get(bytes)
+db.put(name.getBytes(UTF_8), bytes)
+  }
+
+  override def unpersist(name: String): Unit = {
+db.delete(name.getBytes(UTF_8))
+  }
+
+  override def read[T: ClassTag](name: String): Seq[T] = {
+    val result = new ArrayBuffer[T]
+    val iter = db.newIterator()
+    iter.seek(name.getBytes(UTF_8))
+    while (iter.isValid && new String(iter.key()).startsWith(name)) {
+      result.append(serializer.newInstance().deserialize[T](ByteBuffer.wrap(iter.value())))
+      iter.next()
+    }
+    iter.close()

Review Comment:
   Sure.
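   
   Assuming the flagged concern is that `iter.close()` would be skipped if
deserialization throws, one way to guarantee cleanup is a `try`/`finally`; a
sketch, not necessarily the change that landed:
   ```scala
   override def read[T: ClassTag](name: String): Seq[T] = {
     val result = new ArrayBuffer[T]
     val iter = db.newIterator()
     try {
       iter.seek(name.getBytes(UTF_8))
       while (iter.isValid && new String(iter.key()).startsWith(name)) {
         result.append(serializer.newInstance().deserialize[T](ByteBuffer.wrap(iter.value())))
         iter.next()
       }
     } finally {
       iter.close()  // runs even if deserialize throws
     }
     result.toSeq
   }
   ```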






Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414985071


##
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+val dir: String,
+val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))

Review Comment:
   Thanks. Let me switch it to `private`.









Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-04 Thread via GitHub


LuciferYang commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414940118


##
core/src/main/scala/org/apache/spark/deploy/master/Master.scala:
##
@@ -186,6 +186,10 @@ private[deploy] class Master(
 val fsFactory =
   new FileSystemRecoveryModeFactory(conf, serializer)
 (fsFactory.createPersistenceEngine(), 
fsFactory.createLeaderElectionAgent(this))
+  case "ROCKSDB" =>

Review Comment:
   Is this configuration item required to be `ROCKSDB`? Is it not allowed to 
configure it as `rocksdb` or `Rocksdb` now? (Perhaps this is a legacy issue?)






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-04 Thread via GitHub


LuciferYang commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414938380


##
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala:
##
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: 
SparkConf, serializer:
   }
 }
 
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the 
actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: 
Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-04 Thread via GitHub


LuciferYang commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414916176


##
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala:
##
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: 
SparkConf, serializer:
   }
 }
 
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the 
actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: 
Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
   Although this may be for consistency with the style of 
`FileSystemRecoveryModeFactory`, `recoveryDir` may be more suitable as a local 
variable within the `createPersistenceEngine()` method.
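   
   A sketch of the suggested refactor, assuming the names and imports from the 
PR diff:
   
   ```scala
   private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
     extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
   
     override def createPersistenceEngine(): PersistenceEngine = {
       // Scoped to the method instead of being exposed as a class member.
       val recoveryDir = conf.get(RECOVERY_DIRECTORY)
       logInfo(s"Persisting recovery state to directory: $recoveryDir")
       new RocksDBPersistenceEngine(recoveryDir, serializer)
     }
   
     override def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
       new MonarchyLeaderAgent(master)
     }
   }
   ```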



##
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+val dir: String,
+val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * 
https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+.setFilterPolicy(new BloomFilter(10.0D, false))
+.setEnableIndexCompression(false)
+.setIndexBlockRestartInterval(8)
+.setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+.setCreateIfMissing(true)
+.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+.setCompressionType(CompressionType.LZ4_COMPRESSION)
+.setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+val serialized = serializer.newInstance().serialize(obj)
+val bytes = new Array[Byte](serialized.remaining())
+serialized.get(bytes)
+db.put(name.getBytes(UTF_8), bytes)
+  }
+
+  override def unpersist(name: String): Unit = {
+db.delete(name.getBytes(UTF_8))
+  }
+
+  override def read[T: ClassTag](name: String): Seq[T] = {
+val result = new ArrayBuffer[T]
+val iter = db.newIterator()
+iter.seek(name.getBytes(UTF_8))
+while (iter.isValid && new String(iter.key()).startsWith(name)) {
+  
result.append(serializer.newInstance().deserialize[T](ByteBuffer.wrap(iter.value(
+  iter.next()
+}
+iter.close()

Review Comment:
   Should we close `iter` in a `finally` block?
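   
   A sketch of the `finally`-based variant (same logic as the PR, with the 
native iterator released even if deserialization throws, and the key charset 
made explicit):
   
   ```scala
   override def read[T: ClassTag](name: String): Seq[T] = {
     val result = new ArrayBuffer[T]
     val iter = db.newIterator()
     try {
       iter.seek(name.getBytes(UTF_8))
       while (iter.isValid && new String(iter.key(), UTF_8).startsWith(name)) {
         result.append(serializer.newInstance().deserialize[T](ByteBuffer.wrap(iter.value())))
         iter.next()
       }
     } finally {
       // Release the native RocksDB iterator even on failure.
       iter.close()
     }
     result.toSeq
   }
   ```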




Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-04 Thread via GitHub


yaooqinn commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414926099


##
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala:
##
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: 
SparkConf, serializer:
   }
 }
 
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the 
actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: 
Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
   It looks better to have a directory separate from the one used by 
`FileSystemRecoveryModeFactory`.
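   
   A sketch of what a dedicated key could look like in `Deploy.scala` (the 
name `spark.deploy.recoveryRocksDBDirectory` is hypothetical; the reply above 
argues for reusing `spark.deploy.recoveryDirectory` instead):
   
   ```scala
   // Hypothetical entry, following the ConfigBuilder style used in Deploy.scala.
   val RECOVERY_ROCKSDB_DIRECTORY = ConfigBuilder("spark.deploy.recoveryRocksDBDirectory")
     .doc("Directory for the RocksDB-based persistence engine.")
     .version("4.0.0")
     .stringConf
     .createWithDefault("")
   ```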



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on PR #44173:
URL: https://github.com/apache/spark/pull/44173#issuecomment-1840044360

   Could you review this PR, @yaooqinn ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on PR #44173:
URL: https://github.com/apache/spark/pull/44173#issuecomment-1840012871

   Could you review this PR, @LuciferYang ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org