Repository: samza
Updated Branches:
  refs/heads/master c6c10d31e -> 1c7e4d7aa


SAMZA-984; Upgraded RocksDB version to 5.0.1 and added configuration for 
managing RocksDB logging.

Author: Prateek Maheshwari <pmahe...@linkedin.com>

Reviewers: Jake Maes <jacob.m...@gmail.com>, Jagadish <jagadish1...@gmail.com>

Closes #46 from prateekm/rocksdb-upgrade


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1c7e4d7a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1c7e4d7a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1c7e4d7a

Branch: refs/heads/master
Commit: 1c7e4d7aaeb4b5036034c9182fa0259262bcda8e
Parents: c6c10d3
Author: Prateek Maheshwari <pmahe...@linkedin.com>
Authored: Mon Jan 30 14:58:50 2017 -0800
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Mon Jan 30 14:58:50 2017 -0800

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     | 15 ++++
 gradle/dependency-versions.gradle               |  2 +-
 .../samza/storage/kv/RocksDbOptionsHelper.java  | 83 +++++++++++---------
 .../samza/storage/kv/RocksDbKeyValueStore.scala | 10 +--
 4 files changed, 69 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1c7e4d7a/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html 
b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 7bac935..a26bc43 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1422,6 +1422,21 @@
                     </td>
                 </tr>
 
+                <tr>
+                    <td class="property" 
id="stores-rocksdb-log-file-size">stores.<span 
class="store">store-name</span>.<br>rocksdb.max.log.file.size.bytes</td>
+                    <td class="default">67108864</td>
+                    <td class="description">
+                        The maximum size in bytes of the RocksDB LOG file 
before it is rotated.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" 
id="stores-rocksdb-num-log-files">stores.<span 
class="store">store-name</span>.<br>rocksdb.keep.log.file.num</td>
+                    <td class="default">2</td>
+                    <td class="description">
+                        The number of RocksDB LOG files (including rotated 
LOG.old.* files) to keep.
+                    </td>
+                </tr>
 
                 <tr>
                     <th colspan="3" class="section" id="cluster-manager">

http://git-wip-us.apache.org/repos/asf/samza/blob/1c7e4d7a/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle 
b/gradle/dependency-versions.gradle
index 976a49c..db59672 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -30,7 +30,7 @@
   metricsVersion = "2.2.0"
   kafkaVersion = "0.10.0.1"
   commonsHttpClientVersion = "3.1"
-  rocksdbVersion = "3.13.1"
+  rocksdbVersion = "5.0.1"
   yarnVersion = "2.6.1"
   slf4jVersion = "1.6.2"
   log4jVersion = "1.2.17"

http://git-wip-us.apache.org/repos/asf/samza/blob/1c7e4d7a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
----------------------------------------------------------------------
diff --git 
a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
 
b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
index d4f765c..9b8f44b 100644
--- 
a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
+++ 
b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
@@ -34,6 +34,13 @@ import org.slf4j.LoggerFactory;
 public class RocksDbOptionsHelper {
   private static final Logger log = 
LoggerFactory.getLogger(RocksDbOptionsHelper.class);
 
+  private static final String ROCKSDB_COMPRESSION = "rocksdb.compression";
+  private static final String ROCKSDB_BLOCK_SIZE_BYTES = 
"rocksdb.block.size.bytes";
+  private static final String ROCKSDB_COMPACTION_STYLE = 
"rocksdb.compaction.style";
+  private static final String ROCKSDB_NUM_WRITE_BUFFERS = 
"rocksdb.num.write.buffers";
+  private static final String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = 
"rocksdb.max.log.file.size.bytes";
+  private static final String ROCKSDB_KEEP_LOG_FILE_NUM = 
"rocksdb.keep.log.file.num";
+
   public static Options options(Config storeConfig, SamzaContainerContext 
containerContext) {
     Options options = new Options();
     Long writeBufSize = 
storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024);
@@ -42,59 +49,65 @@ public class RocksDbOptionsHelper {
     options.setWriteBufferSize((int) (writeBufSize / numTasks));
 
     CompressionType compressionType = CompressionType.SNAPPY_COMPRESSION;
-    String compressionInConfig = storeConfig.get("rocksdb.compression", 
"snappy");
+    String compressionInConfig = storeConfig.get(ROCKSDB_COMPRESSION, 
"snappy");
     switch (compressionInConfig) {
-    case "snappy":
-      compressionType = CompressionType.SNAPPY_COMPRESSION;
-      break;
-    case "bzip2":
-      compressionType = CompressionType.BZLIB2_COMPRESSION;
-      break;
-    case "zlib":
-      compressionType = CompressionType.ZLIB_COMPRESSION;
-      break;
-    case "lz4":
-      compressionType = CompressionType.LZ4_COMPRESSION;
-      break;
-    case "lz4hc":
-      compressionType = CompressionType.LZ4HC_COMPRESSION;
-      break;
-    case "none":
-      compressionType = CompressionType.NO_COMPRESSION;
-      break;
-    default:
-      log.warn("Unknown rocksdb.compression codec " + compressionInConfig + ", 
overwriting to Snappy");
+      case "snappy":
+        compressionType = CompressionType.SNAPPY_COMPRESSION;
+        break;
+      case "bzip2":
+        compressionType = CompressionType.BZLIB2_COMPRESSION;
+        break;
+      case "zlib":
+        compressionType = CompressionType.ZLIB_COMPRESSION;
+        break;
+      case "lz4":
+        compressionType = CompressionType.LZ4_COMPRESSION;
+        break;
+      case "lz4hc":
+        compressionType = CompressionType.LZ4HC_COMPRESSION;
+        break;
+      case "none":
+        compressionType = CompressionType.NO_COMPRESSION;
+        break;
+      default:
+        log.warn("Unknown rocksdb.compression codec " + compressionInConfig +
+            ", overwriting to " + compressionType.name());
     }
     options.setCompressionType(compressionType);
 
     Long cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 
1024 * 1024L);
     Long cacheSizePerContainer = cacheSize / numTasks;
-    int blockSize = storeConfig.getInt("rocksdb.block.size.bytes", 4096);
+
+    int blockSize = storeConfig.getInt(ROCKSDB_BLOCK_SIZE_BYTES, 4096);
     BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
     
tableOptions.setBlockCacheSize(cacheSizePerContainer).setBlockSize(blockSize);
     options.setTableFormatConfig(tableOptions);
 
     CompactionStyle compactionStyle = CompactionStyle.UNIVERSAL;
-    String compactionStyleInConfig = 
storeConfig.get("rocksdb.compaction.style", "universal");
+    String compactionStyleInConfig = storeConfig.get(ROCKSDB_COMPACTION_STYLE, 
"universal");
     switch (compactionStyleInConfig) {
-    case "universal":
-      compactionStyle = CompactionStyle.UNIVERSAL;
-      break;
-    case "fifo":
-      compactionStyle = CompactionStyle.FIFO;
-      break;
-    case "level":
-      compactionStyle = CompactionStyle.LEVEL;
-      break;
-    default:
-      log.warn("Unknown rocksdb.compactionStyle " + compactionStyleInConfig + 
", overwriting to universal");
+      case "universal":
+        compactionStyle = CompactionStyle.UNIVERSAL;
+        break;
+      case "fifo":
+        compactionStyle = CompactionStyle.FIFO;
+        break;
+      case "level":
+        compactionStyle = CompactionStyle.LEVEL;
+        break;
+      default:
+        log.warn("Unknown rocksdb.compaction.style " + compactionStyleInConfig 
+
+            ", overwriting to " + compactionStyle.name());
     }
     options.setCompactionStyle(compactionStyle);
 
-    
options.setMaxWriteBufferNumber(storeConfig.getInt("rocksdb.num.write.buffers", 
3));
+    
options.setMaxWriteBufferNumber(storeConfig.getInt(ROCKSDB_NUM_WRITE_BUFFERS, 
3));
     options.setCreateIfMissing(true);
     options.setErrorIfExists(false);
 
+    
options.setMaxLogFileSize(storeConfig.getLong(ROCKSDB_MAX_LOG_FILE_SIZE_BYTES, 
64 * 1024 * 1024L));
+    options.setKeepLogFileNum(storeConfig.getLong(ROCKSDB_KEEP_LOG_FILE_NUM, 
2));
+
     return options;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/1c7e4d7a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
 
b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 73b89f7..5112ac6 100644
--- 
a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ 
b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -139,7 +139,7 @@ class RocksDbKeyValueStore(
     metrics.puts.inc
     require(key != null, "Null key not allowed.")
     if (value == null) {
-      db.remove(writeOptions, key)
+      db.delete(writeOptions, key)
     } else {
       metrics.bytesWritten.inc(key.size + value.size)
       db.put(writeOptions, key, value)
@@ -156,7 +156,7 @@ class RocksDbKeyValueStore(
       val curr = iter.next()
       if (curr.getValue == null) {
         deletes += 1
-        db.remove(writeOptions, curr.getKey)
+        db.delete(writeOptions, curr.getKey)
       } else {
         val key = curr.getKey
         val value = curr.getValue
@@ -204,13 +204,13 @@ class RocksDbKeyValueStore(
 
   class RocksDbIterator(iter: RocksIterator) extends 
KeyValueIterator[Array[Byte], Array[Byte]] {
     private var open = true
-    private var firstValueAccessed = false;
+    private var firstValueAccessed = false
     def close() = {
       open = false
-      iter.dispose()
+      iter.close()
     }
 
-    def remove() = throw new UnsupportedOperationException("RocksDB iterator 
doesn't support remove");
+    def remove() = throw new UnsupportedOperationException("RocksDB iterator 
doesn't support remove")
 
     def hasNext() = iter.isValid
 

Reply via email to