This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 86b4f784d Revert "KUDU-3371 [fs] Use RocksDB to store LBM metadata"
86b4f784d is described below

commit 86b4f784d74b8746d1da5563c433702fa1f2ed39
Author: Marton Greber <greber...@gmail.com>
AuthorDate: Fri Feb 9 18:09:20 2024 +0000

    Revert "KUDU-3371 [fs] Use RocksDB to store LBM metadata"
    
    This reverts commit 58d2338de8d586106ea29cfa281ee653392a9145.
    
    Reason for revert: constant Jenkins failures for the last week+.
    
    Change-Id: I792b4b9d775e60a301a01d9af81d7a0955e4ce2c
    Reviewed-on: http://gerrit.cloudera.org:8080/21027
    Reviewed-by: Zoltan Chovan <zcho...@cloudera.com>
    Reviewed-by: Marton Greber <greber...@gmail.com>
    Tested-by: Marton Greber <greber...@gmail.com>
---
 src/kudu/benchmarks/CMakeLists.txt              |   8 +-
 src/kudu/client/CMakeLists.txt                  |   3 +-
 src/kudu/consensus/CMakeLists.txt               |   1 -
 src/kudu/fs/CMakeLists.txt                      |   5 +-
 src/kudu/fs/block_manager-stress-test.cc        |  15 +-
 src/kudu/fs/block_manager-test.cc               | 126 ++----
 src/kudu/fs/block_manager.h                     |   2 +-
 src/kudu/fs/data_dirs.cc                        |   7 +-
 src/kudu/fs/dir_manager.cc                      | 141 ------
 src/kudu/fs/dir_manager.h                       |  59 +--
 src/kudu/fs/dir_util.cc                         |   4 +-
 src/kudu/fs/file_block_manager.h                |   2 -
 src/kudu/fs/fs_manager-test.cc                  |  39 +-
 src/kudu/fs/fs_manager.cc                       |  16 +-
 src/kudu/fs/fs_manager.h                        |   3 -
 src/kudu/fs/fs_report.cc                        |  31 --
 src/kudu/fs/fs_report.h                         |  21 -
 src/kudu/fs/log_block_manager-test-util.cc      | 230 ++--------
 src/kudu/fs/log_block_manager-test-util.h       |  32 +-
 src/kudu/fs/log_block_manager-test.cc           | 499 +++-------------------
 src/kudu/fs/log_block_manager.cc                | 542 +-----------------------
 src/kudu/fs/log_block_manager.h                 |  94 +---
 src/kudu/integration-tests/CMakeLists.txt       |   2 +-
 src/kudu/integration-tests/dense_node-itest.cc  |  19 +-
 src/kudu/integration-tests/ts_recovery-itest.cc |  18 +-
 src/kudu/server/CMakeLists.txt                  |   4 +-
 src/kudu/tablet/compaction-test.cc              |   2 +-
 src/kudu/tools/CMakeLists.txt                   |   1 -
 src/kudu/tools/kudu-tool-test.cc                |  15 -
 src/kudu/tserver/tablet_server-test.cc          |  13 +-
 src/kudu/util/CMakeLists.txt                    |   7 +-
 thirdparty/build-definitions.sh                 |   6 +-
 32 files changed, 185 insertions(+), 1782 deletions(-)

diff --git a/src/kudu/benchmarks/CMakeLists.txt 
b/src/kudu/benchmarks/CMakeLists.txt
index 73b54462c..6a17d4727 100644
--- a/src/kudu/benchmarks/CMakeLists.txt
+++ b/src/kudu/benchmarks/CMakeLists.txt
@@ -33,15 +33,13 @@ target_link_libraries(tpch
 add_executable(tpch1 tpch/tpch1.cc)
 target_link_libraries(tpch1
   ${KUDU_MIN_TEST_LIBS}
-  tpch
-  rocksdb)
+  tpch)
 
 # tpch_real_world
 add_executable(tpch_real_world tpch/tpch_real_world.cc)
 target_link_libraries(tpch_real_world
   ${KUDU_MIN_TEST_LIBS}
-  tpch
-  rocksdb)
+  tpch)
 
 # rle
 add_executable(rle rle.cc)
@@ -61,5 +59,5 @@ endif()
 # Unit tests
 #######################################
 
-SET_KUDU_TEST_LINK_LIBS(tpch rocksdb)
+SET_KUDU_TEST_LINK_LIBS(tpch)
 ADD_KUDU_TEST(tpch/rpc_line_item_dao-test)
diff --git a/src/kudu/client/CMakeLists.txt b/src/kudu/client/CMakeLists.txt
index bb5479e13..cecaf6546 100644
--- a/src/kudu/client/CMakeLists.txt
+++ b/src/kudu/client/CMakeLists.txt
@@ -280,8 +280,7 @@ endif()
 SET_KUDU_TEST_LINK_LIBS(
   itest_util
   kudu_client
-  mini_cluster
-  rocksdb)
+  mini_cluster)
 ADD_KUDU_TEST(client-test NUM_SHARDS 8 PROCESSORS 2
                           DATA_FILES ../scripts/first_argument.sh)
 ADD_KUDU_TEST(client-unittest)
diff --git a/src/kudu/consensus/CMakeLists.txt 
b/src/kudu/consensus/CMakeLists.txt
index 3fc36c388..353c7468a 100644
--- a/src/kudu/consensus/CMakeLists.txt
+++ b/src/kudu/consensus/CMakeLists.txt
@@ -126,7 +126,6 @@ SET_KUDU_TEST_LINK_LIBS(
   consensus
   tserver_proto
   cfile
-  rocksdb
   tablet
   kudu_util)
 
diff --git a/src/kudu/fs/CMakeLists.txt b/src/kudu/fs/CMakeLists.txt
index ef7d56b96..5482f3c42 100644
--- a/src/kudu/fs/CMakeLists.txt
+++ b/src/kudu/fs/CMakeLists.txt
@@ -43,8 +43,7 @@ target_link_libraries(kudu_fs
   fs_proto
   kudu_util
   gutil
-  ranger_kms_client
-  rocksdb)
+  ranger_kms_client)
 
 if(NOT NO_TESTS)
   add_library(kudu_fs_test_util
@@ -61,7 +60,7 @@ endif()
 # Unit tests
 #######################################
 
-SET_KUDU_TEST_LINK_LIBS(kudu_fs kudu_fs_test_util rocksdb lz4)
+SET_KUDU_TEST_LINK_LIBS(kudu_fs kudu_fs_test_util)
 ADD_KUDU_TEST(block_manager-test)
 ADD_KUDU_TEST(block_manager-stress-test RUN_SERIAL true)
 ADD_KUDU_TEST(data_dirs-test)
diff --git a/src/kudu/fs/block_manager-stress-test.cc 
b/src/kudu/fs/block_manager-stress-test.cc
index 14482e8fb..659234887 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -65,7 +65,6 @@
 DECLARE_bool(cache_force_single_shard);
 DECLARE_double(log_container_excess_space_before_cleanup_fraction);
 DECLARE_double(log_container_live_metadata_before_compact_ratio);
-DECLARE_string(block_manager);
 DECLARE_uint64(log_container_max_size);
 DECLARE_uint64(log_container_preallocate_bytes);
 
@@ -133,7 +132,6 @@ class BlockManagerStressTest : public KuduTest {
       total_blocks_read_(0),
       total_bytes_read_(0),
       total_blocks_deleted_(0) {
-    FLAGS_block_manager = T::name();
 
     // Increase the number of containers created.
     FLAGS_log_container_max_size = 1 * 1024 * 1024;
@@ -484,8 +482,8 @@ int 
BlockManagerStressTest<FileBlockManager>::GetMaxFdCount() const {
       FLAGS_num_reader_threads;
 }
 
-template <typename T>
-int BlockManagerStressTest<T>::GetMaxFdCount() const {
+template <>
+int BlockManagerStressTest<LogBlockManagerNativeMeta>::GetMaxFdCount() const {
   return FLAGS_max_open_files +
       // If all containers are full, each open block could theoretically
       // result in a new container, which is two files briefly outside the
@@ -498,9 +496,9 @@ void 
BlockManagerStressTest<FileBlockManager>::InjectNonFatalInconsistencies() {
   // Do nothing; the FBM has no repairable inconsistencies.
 }
 
-template <typename T>
-void BlockManagerStressTest<T>::InjectNonFatalInconsistencies() {
-  auto corruptor = LBMCorruptor::Create(env_, dd_manager_.get(), rand_seed_);
+template <>
+void 
BlockManagerStressTest<LogBlockManagerNativeMeta>::InjectNonFatalInconsistencies()
 {
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), 
rand_seed_);
   ASSERT_OK(corruptor->Init());
 
   for (int i = 0; i < FLAGS_num_inconsistencies; i++) {
@@ -510,8 +508,7 @@ void 
BlockManagerStressTest<T>::InjectNonFatalInconsistencies() {
 
 // What kinds of BlockManagers are supported?
 #if defined(__linux__)
-typedef ::testing::Types<FileBlockManager, LogBlockManagerNativeMeta, 
LogBlockManagerRdbMeta>
-    BlockManagers;
+typedef ::testing::Types<FileBlockManager, LogBlockManagerNativeMeta> 
BlockManagers;
 #else
 typedef ::testing::Types<FileBlockManager> BlockManagers;
 #endif
diff --git a/src/kudu/fs/block_manager-test.cc 
b/src/kudu/fs/block_manager-test.cc
index 3cc55fb4c..724d3d927 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -41,7 +41,6 @@
 #include "kudu/fs/error_manager.h"
 #include "kudu/fs/file_block_manager.h"
 #include "kudu/fs/fs.pb.h"
-#include "kudu/fs/fs_manager.h"
 #include "kudu/fs/fs_report.h"
 #include "kudu/fs/log_block_manager.h"
 #include "kudu/gutil/basictypes.h"
@@ -89,10 +88,8 @@ 
METRIC_DECLARE_gauge_uint64(block_manager_blocks_open_reading);
 METRIC_DECLARE_gauge_uint64(block_manager_blocks_open_writing);
 METRIC_DECLARE_counter(block_manager_total_writable_blocks);
 METRIC_DECLARE_counter(block_manager_total_readable_blocks);
-METRIC_DECLARE_counter(block_manager_total_blocks_deleted);
 METRIC_DECLARE_counter(block_manager_total_bytes_written);
 METRIC_DECLARE_counter(block_manager_total_bytes_read);
-METRIC_DECLARE_counter(log_block_manager_holes_punched);
 
 // Data directory metrics.
 METRIC_DECLARE_gauge_uint64(data_dirs_full);
@@ -100,7 +97,7 @@ METRIC_DECLARE_gauge_uint64(data_dirs_full);
 // The LogBlockManager is only supported on Linux, since it requires hole 
punching.
 #define RETURN_NOT_LOG_BLOCK_MANAGER() \
   do { \
-    if (FLAGS_block_manager != "log" && FLAGS_block_manager != "logr") { \
+    if (FLAGS_block_manager != "log") { \
       GTEST_SKIP() << "This platform does not use the log block manager by 
default. " \
                       "Skipping test."; \
     } \
@@ -112,7 +109,13 @@ namespace fs {
 static const char* kTestData = "test data";
 
 template<typename T>
-string block_manager_type() { return T::name(); }
+string block_manager_type();
+
+template<>
+string block_manager_type<FileBlockManager>() { return "file"; }
+
+template<>
+string block_manager_type<LogBlockManagerNativeMeta>() { return "log"; }
 
 template <typename T>
 class BlockManagerTest : public KuduTest {
@@ -120,9 +123,9 @@ class BlockManagerTest : public KuduTest {
   BlockManagerTest() :
       test_tablet_name_("test_tablet"),
       test_block_opts_(CreateBlockOptions({ test_tablet_name_ })),
-      file_cache_("test_cache", env_, 1, scoped_refptr<MetricEntity>()) {
-    FLAGS_block_manager = T::name();
-    bm_.reset(CreateBlockManager(scoped_refptr<MetricEntity>(), 
shared_ptr<MemTracker>()));
+      file_cache_("test_cache", env_, 1, scoped_refptr<MetricEntity>()),
+      bm_(CreateBlockManager(scoped_refptr<MetricEntity>(),
+                             shared_ptr<MemTracker>())) {
     CHECK_OK(file_cache_.Init());
   }
 
@@ -180,7 +183,7 @@ class BlockManagerTest : public KuduTest {
     opts.metric_entity = metric_entity;
     opts.parent_mem_tracker = parent_mem_tracker;
     error_manager_ = new FsErrorManager();
-    return new T(env_, dd_manager_, error_manager_,
+    return new T(env_, this->dd_manager_.get(), error_manager_,
                  &file_cache_, std::move(opts), fs::kDefaultTenantName);
   }
 
@@ -220,16 +223,11 @@ class BlockManagerTest : public KuduTest {
   void RunBlockDistributionTest(const vector<string>& paths);
 
   static Status CountFilesCb(int* num_files, Env::FileType type,
-                             const string& dirname,
-                             const string& basename) {
+                      const string& /*dirname*/,
+                      const string& basename) {
     if (basename == kInstanceMetadataFileName) {
       return Status::OK();
     }
-    // Ignore the 'rdb' directory which contains the RocksDB related files.
-    string parent_dir = *SplitPath(dirname).rbegin();
-    if (parent_dir == "rdb") {
-      return Status::OK();
-    }
     if (type == Env::FILE_TYPE) {
       *num_files += 1;
     }
@@ -237,7 +235,7 @@ class BlockManagerTest : public KuduTest {
   }
 
   // Utility function that counts the number of files within a directory
-  // hierarchy, ignoring '.', '..', 'rdb', and file 
'kInstanceMetadataFileName'.
+  // hierarchy, ignoring '.', '..', and file 'kInstanceMetadataFileName'.
   Status CountFiles(const string& root, int* num_files) {
     *num_files = 0;
     return env_->Walk(
@@ -305,8 +303,9 @@ void 
BlockManagerTest<FileBlockManager>::RunBlockDistributionTest(const vector<s
   }
 }
 
-template<typename T>
-void BlockManagerTest<T>::RunBlockDistributionTest(const vector<string>& 
paths) {
+template <>
+void BlockManagerTest<LogBlockManagerNativeMeta>::RunBlockDistributionTest(
+    const vector<string>& paths) {
   vector<int> files_in_each_path(paths.size());
   int num_blocks_per_dir = 30;
   // Spread across 1, then 3, then 5 data directories.
@@ -384,8 +383,8 @@ void 
BlockManagerTest<FileBlockManager>::RunMultipathTest(const vector<string>&
   ASSERT_EQ(20, num_blocks);
 }
 
-template<typename T>
-void BlockManagerTest<T>::RunMultipathTest(const vector<string>& paths) {
+template <>
+void BlockManagerTest<LogBlockManagerNativeMeta>::RunMultipathTest(const 
vector<string>& paths) {
   // Write (3 * numPaths * 2) blocks, in groups of (numPaths * 2). That should
   // yield two containers per path.
   CreateBlockOptions opts({ "multipath_test" });
@@ -403,7 +402,9 @@ void BlockManagerTest<T>::RunMultipathTest(const 
vector<string>& paths) {
   }
   ASSERT_OK(transaction->CommitCreatedBlocks());
 
-  // Verify the results.
+  // Verify the results. (numPaths * 2) containers were created, each
+  // consisting of 2 files. Thus, there should be a total of
+  // (numPaths * 4) files, ignoring '.', '..', and instance files.
   int sum = 0;
   for (const string& path : paths) {
     vector<string> children;
@@ -412,20 +413,7 @@ void BlockManagerTest<T>::RunMultipathTest(const 
vector<string>& paths) {
     ASSERT_OK(CountFiles(path, &files_in_path));
     sum += files_in_path;
   }
-
-  if (std::is_same<T, LogBlockManagerNativeMeta>::value) {
-    // (numPaths * 2) containers were created, each consisting of 2 files.
-    // Thus, there should be a total of (numPaths * 4) files, ignoring '.',
-    // '..', and instance files.
-    ASSERT_EQ(paths.size() * 4, sum);
-  } else {
-    // (numPaths * 2) containers were created, each consisting of 1 file.
-    // Thus, there should be a total of (numPaths * 2) files, ignoring '.',
-    // '..', 'rdb', and instance files.
-    bool is_logr = std::is_same<T, LogBlockManagerRdbMeta>::value;
-    ASSERT_TRUE(is_logr);
-    ASSERT_EQ(paths.size() * 2, sum);
-  }
+  ASSERT_EQ(paths.size() * 4, sum);
 }
 
 template <>
@@ -445,8 +433,8 @@ void 
BlockManagerTest<FileBlockManager>::RunMemTrackerTest() {
   ASSERT_EQ(tracker->consumption(), initial_mem);
 }
 
-template<typename T>
-void BlockManagerTest<T>::RunMemTrackerTest() {
+template <>
+void BlockManagerTest<LogBlockManagerNativeMeta>::RunMemTrackerTest() {
   shared_ptr<MemTracker> tracker = MemTracker::CreateTracker(-1, "test 
tracker");
   ASSERT_OK(ReopenBlockManager(scoped_refptr<MetricEntity>(),
                                tracker,
@@ -466,8 +454,7 @@ void BlockManagerTest<T>::RunMemTrackerTest() {
 
 // What kinds of BlockManagers are supported?
 #if defined(__linux__)
-typedef ::testing::Types<FileBlockManager, LogBlockManagerNativeMeta, 
LogBlockManagerRdbMeta>
-    BlockManagers;
+typedef ::testing::Types<FileBlockManager, LogBlockManagerNativeMeta> 
BlockManagers;
 #else
 typedef ::testing::Types<FileBlockManager> BlockManagers;
 #endif
@@ -1229,13 +1216,6 @@ TYPED_TEST(BlockManagerTest, 
ConcurrentCloseFinalizedWritableBlockTest) {
 }
 
 TYPED_TEST(BlockManagerTest, TestBlockTransaction) {
-  MetricRegistry registry;
-  scoped_refptr<MetricEntity> entity = 
METRIC_ENTITY_server.Instantiate(&registry, "test");
-  ASSERT_OK(this->ReopenBlockManager(entity,
-                                     shared_ptr<MemTracker>(),
-                                     { this->test_dir_ },
-                                     false /* create */));
-
   // Create a BlockCreationTransaction. In this transaction,
   // create some blocks and commit the writes all together.
   const string kTestData = "test data";
@@ -1276,26 +1256,6 @@ TYPED_TEST(BlockManagerTest, TestBlockTransaction) {
   }
   vector<BlockId> deleted_blocks;
   ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted_blocks));
-  deletion_transaction.reset();
-  this->dd_manager_->WaitOnClosures();
-
-  // Metric 'log_block_manager_holes_punched' is only updated when 
--block_manager
-  // is "log" or "logr".
-  int64_t holes_punched = 0;
-  if (FsManager::IsLogType(FLAGS_block_manager)) {
-    // Continuous blocks will be hole punched together, so 
'log_block_manager_holes_punched' is
-    // less or equal to the size of 'deleted_blocks', but it must be greater 
than 0.
-    holes_punched = down_cast<Counter *>(
-        
entity->FindOrNull(METRIC_log_block_manager_holes_punched).get())->value();
-    ASSERT_GE(deleted_blocks.size(), holes_punched);
-    ASSERT_GT(holes_punched, 0);
-  }
-
-  // Metric 'block_manager_total_blocks_deleted' is equal to the size of 
'deleted_blocks'.
-  int64_t total_blocks_deleted = down_cast<Counter*>(
-      
entity->FindOrNull(METRIC_block_manager_total_blocks_deleted).get())->value();
-  ASSERT_EQ(deleted_blocks.size(), total_blocks_deleted);
-
   for (const auto& block : deleted_blocks) {
     created_blocks.erase(std::remove(created_blocks.begin(), 
created_blocks.end(), block),
                          created_blocks.end());
@@ -1306,38 +1266,16 @@ TYPED_TEST(BlockManagerTest, TestBlockTransaction) {
   // in order to test that the first failure properly propagates.
   FLAGS_crash_on_eio = false;
   FLAGS_env_inject_eio = 1.0;
-  deletion_transaction = this->bm_->NewDeletionTransaction();
   for (const auto& block : created_blocks) {
     deletion_transaction->AddDeletedBlock(block);
   }
   deleted_blocks.clear();
   Status s = deletion_transaction->CommitDeletedBlocks(&deleted_blocks);
-  deletion_transaction.reset();
-  this->dd_manager_->WaitOnClosures();
-
-  if (FsManager::IsLogType(FLAGS_block_manager)) {
-    // Metric 'log_block_manager_holes_punched' is not changed because IO 
error injected.
-    ASSERT_EQ(holes_punched, down_cast<Counter*>(
-        
entity->FindOrNull(METRIC_log_block_manager_holes_punched).get())->value());
-  }
-
-  // Because the "logr" block_manager uses RocksDB to store metadata, and the 
IO error injected
-  // into util/env_posix.cc does not affect RocksDB, updating the metadata 
stored in the
-  // logr-based block manager succeeds without any errors.
-  if (FLAGS_block_manager == "logr") {
-    ASSERT_OK(s);
-    ASSERT_EQ(created_blocks.size(), deleted_blocks.size());
-    ASSERT_EQ(total_blocks_deleted + deleted_blocks.size(), 
down_cast<Counter*>(
-        
entity->FindOrNull(METRIC_block_manager_total_blocks_deleted).get())->value());
-  } else {
-    ASSERT_TRUE(s.IsIOError());
-    ASSERT_TRUE(deleted_blocks.empty());
-    ASSERT_EQ(total_blocks_deleted, down_cast<Counter*>(
-        
entity->FindOrNull(METRIC_block_manager_total_blocks_deleted).get())->value());
-    ASSERT_STR_MATCHES(s.ToString(),
-                       Substitute("only 0/$0 blocks deleted, first failure",
-                                  created_blocks.size()));
-  }
+  ASSERT_TRUE(s.IsIOError());
+  ASSERT_TRUE(deleted_blocks.empty());
+  ASSERT_STR_MATCHES(s.ToString(),
+                     Substitute("only 0/$0 blocks deleted, first failure",
+                                created_blocks.size()));
 }
 
 } // namespace fs
diff --git a/src/kudu/fs/block_manager.h b/src/kudu/fs/block_manager.h
index 380e5b6c4..264d22507 100644
--- a/src/kudu/fs/block_manager.h
+++ b/src/kudu/fs/block_manager.h
@@ -200,7 +200,7 @@ class BlockManager : public 
RefCountedThreadSafe<BlockManager> {
   // Lists the available block manager types.
   static std::vector<std::string> block_manager_types() {
 #if defined(__linux__)
-    return { "file", "log", "logr" };
+    return { "file", "log" };
 #else
     return { "file" };
 #endif
diff --git a/src/kudu/fs/data_dirs.cc b/src/kudu/fs/data_dirs.cc
index 2198a6c9b..20a73ddf0 100644
--- a/src/kudu/fs/data_dirs.cc
+++ b/src/kudu/fs/data_dirs.cc
@@ -39,7 +39,6 @@
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/dir_util.h"
 #include "kudu/fs/fs.pb.h"
-#include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
@@ -201,10 +200,6 @@ std::unique_ptr<Dir> DataDirManager::CreateNewDir(
     Env* env, DirMetrics* metrics, FsType fs_type,
     std::string dir, std::unique_ptr<DirInstanceMetadataFile> metadata_file,
     std::unique_ptr<ThreadPool> pool) {
-  if (FLAGS_block_manager == "logr") {
-    return std::make_unique<RdbDir>(env, metrics, fs_type, std::move(dir),
-                                    std::move(metadata_file), std::move(pool));
-  }
   return std::make_unique<Dir>(env, metrics, fs_type, std::move(dir),
                                std::move(metadata_file), std::move(pool));
 }
@@ -275,7 +270,7 @@ int DataDirManager::max_dirs() const {
 }
 
 Status DataDirManager::PopulateDirectoryMaps(const vector<unique_ptr<Dir>>& 
dirs) {
-  if (FsManager::IsLogType(opts_.dir_type)) {
+  if (opts_.dir_type == "log") {
     return DirManager::PopulateDirectoryMaps(dirs);
   }
   DCHECK_EQ("file", opts_.dir_type);
diff --git a/src/kudu/fs/dir_manager.cc b/src/kudu/fs/dir_manager.cc
index 767ffa6cd..a0f489118 100644
--- a/src/kudu/fs/dir_manager.cc
+++ b/src/kudu/fs/dir_manager.cc
@@ -32,12 +32,6 @@
 
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
-#include <rocksdb/cache.h>
-#include <rocksdb/filter_policy.h>
-#include <rocksdb/options.h>
-#include <rocksdb/slice_transform.h>
-#include <rocksdb/status.h>
-#include <rocksdb/table.h>
 
 #include "kudu/fs/dir_util.h"
 #include "kudu/fs/fs.pb.h"
@@ -45,7 +39,6 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/strings/util.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
 #include "kudu/util/oid_generator.h"
@@ -54,11 +47,9 @@
 #include "kudu/util/random_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/stopwatch.h"
-#include "kudu/util/test_util_prod.h"
 #include "kudu/util/threadpool.h"
 
 using std::set;
-using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::unordered_map;
@@ -68,33 +59,9 @@ using strings::Substitute;
 
 DECLARE_int32(fs_data_dirs_available_space_cache_seconds);
 DECLARE_int64(fs_data_dirs_reserved_bytes);
-DECLARE_string(block_manager);
 
 namespace kudu {
 
-Status FromRdbStatus(const rocksdb::Status& s) {
-  switch (s.code()) {
-    case rocksdb::Status::kOk:
-      return Status::OK();
-    case rocksdb::Status::kNotFound:
-      return Status::NotFound(s.ToString());
-    case rocksdb::Status::kCorruption:
-      return Status::Corruption(s.ToString());
-    case rocksdb::Status::kNotSupported:
-      return Status::NotSupported(s.ToString());
-    case rocksdb::Status::kInvalidArgument:
-      return Status::InvalidArgument(s.ToString());
-    case rocksdb::Status::kIOError:
-      return Status::IOError(s.ToString());
-    case rocksdb::Status::kIncomplete:
-      return Status::Incomplete(s.ToString());
-    case rocksdb::Status::kAborted:
-      return Status::Aborted(s.ToString());
-    default:
-      return Status::RuntimeError(s.ToString());
-  }
-}
-
 namespace {
 
 // Wrapper for env_util::DeleteTmpFilesRecursively that is suitable for 
parallel
@@ -204,105 +171,6 @@ int Dir::reserved_bytes() {
   return FLAGS_fs_data_dirs_reserved_bytes;
 }
 
-shared_ptr<rocksdb::Cache> RdbDir::s_block_cache_;
-RdbDir::RdbDir(Env* env, DirMetrics* metrics,
-               FsType fs_type,
-               string dir,
-               unique_ptr<DirInstanceMetadataFile> metadata_file,
-               unique_ptr<ThreadPool> pool)
-    : Dir(env, metrics, fs_type, std::move(dir), std::move(metadata_file), 
std::move(pool)) {}
-
-Status RdbDir::Prepare() {
-  DCHECK_STREQ(FLAGS_block_manager.c_str(), "logr");
-  if (db_) {
-    // Some unit tests (e.g. BlockManagerTest.PersistenceTest) reopen the 
block manager,
-    // 'db_' is possible to be non-nullptr.
-    // In non-test environments, 'db_' is always to be nullptr, log a fatal 
error if not.
-    DCHECK(IsGTest()) <<
-        Substitute("It's not allowed to reopen the RocksDB $0 except in 
tests", dir_);
-    return Status::OK();
-  }
-
-  // See the rocksdb::Options details:
-  // https://github.com/facebook/rocksdb/blob/main/include/rocksdb/options.h
-  rocksdb::Options opts;
-  // A RocksDB instance is created if it does not exist when opening the Dir.
-  // TODO(yingchun): We should distinguish creating new data directory and 
opening existing data
-  //                 directory, and set proper options to avoid mishaps.
-  //                 When creating new data directory, set 
opts.error_if_exists = true.
-  //                 When opening existing data directory, set 
opts.create_if_missing = false.
-  opts.create_if_missing = true;
-  // TODO(yingchun): parameterize more rocksDB options, including:
-  //  opts.use_fsync
-  //  opts.error_if_exists
-  //  opts.db_log_dir
-  //  opts.wal_dir
-  //  opts.max_log_file_size
-  //  opts.keep_log_file_num
-  //  opts.max_manifest_file_size
-  //  opts.max_background_jobs
-  //  opts.write_buffer_size
-  //  opts.level0_file_num_compaction_trigger
-  //  opts.max_write_buffer_number
-
-  static std::once_flag flag;
-  std::call_once(flag, [&]() {
-    // TODO(yingchun): parameterize the rocksdb block cache size.
-    s_block_cache_ = rocksdb::NewLRUCache(10 << 20);
-  });
-  rocksdb::BlockBasedTableOptions tbl_opts;
-  tbl_opts.block_cache = s_block_cache_;
-  tbl_opts.whole_key_filtering = false;
-  // TODO(yingchun): parameterize these options.
-  tbl_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(9.9));
-  opts.table_factory.reset(NewBlockBasedTableFactory(tbl_opts));
-  // Take advantage of Prefix-Seek, see 
https://github.com/facebook/rocksdb/wiki/Prefix-Seek.
-  
opts.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(ObjectIdGenerator::IdLength()));
-  opts.memtable_prefix_bloom_size_ratio = 0.1;
-
-  rdb_dir_ = JoinPathSegments(dir_, "rdb");
-  rocksdb::DB* db_temp = nullptr;
-  rocksdb::Status rdb_s = rocksdb::DB::Open(opts, *rdb_dir_, &db_temp);
-  RETURN_NOT_OK_PREPEND(FromRdbStatus(rdb_s),
-                        Substitute("open RocksDB failed, path: $0", 
*rdb_dir_));
-  db_.reset(db_temp);
-  return Status::OK();
-}
-
-void RdbDir::Shutdown() {
-  if (is_shutdown_) {
-    return;
-  }
-
-  // Shut down the thread pool before closing RocksDB to make sure there 
aren't any in-flight
-  // write operations.
-  WaitOnClosures();
-  pool_->Shutdown();
-
-  // The 'db_' is nullptr if the Dir open failed.
-  if (db_) {
-    // Flushing memtable before closing RocksDB reduces bootstrapping time 
upon next start-up.
-    // Call Flush() rather than WaitForCompact(), because it's enough to wait 
for the flush jobs
-    // to finish, compactions jobs may take more time, which results in longer 
times to shut down
-    // a server.
-    rocksdb::FlushOptions options;
-    options.wait = true;
-    WARN_NOT_OK(FromRdbStatus(db_->Flush(options)),
-                Substitute("Flush RocksDB failed, path: $0", *rdb_dir_));
-    WARN_NOT_OK(FromRdbStatus(db_->Close()),
-                Substitute("Closed RocksDB with error, path: $0", *rdb_dir_));
-    db_.reset();
-  }
-
-  is_shutdown_ = true;
-}
-
-rocksdb::DB* RdbDir::rdb() {
-  DCHECK_STREQ(FLAGS_block_manager.c_str(), "logr");
-  DCHECK(db_);
-  return db_.get();
-}
-
 DirManagerOptions::DirManagerOptions(string dir_type,
                                      string tid)
     : dir_type(std::move(dir_type)),
@@ -763,15 +631,6 @@ Dir* DirManager::FindDirByUuidIndex(int uuid_idx) const {
   return FindPtrOrNull(dir_by_uuid_idx_, uuid_idx);
 }
 
-Dir* DirManager::FindDirByFullPathForTests(const std::string& full_path) const 
{
-  for (const auto& d : dirs_) {
-    if (HasPrefixString(full_path, d->dir())) {
-      return d.get();
-    }
-  }
-  return nullptr;
-}
-
 bool DirManager::FindUuidIndexByDir(Dir* dir, int* uuid_idx) const {
   return FindCopy(uuid_idx_by_dir_, dir, uuid_idx);
 }
diff --git a/src/kudu/fs/dir_manager.h b/src/kudu/fs/dir_manager.h
index 85cca3be5..a6f14ff9d 100644
--- a/src/kudu/fs/dir_manager.h
+++ b/src/kudu/fs/dir_manager.h
@@ -22,15 +22,11 @@
 #include <functional>
 #include <memory>
 #include <mutex>
-#include <optional>
 #include <set>
 #include <string>
 #include <unordered_map>
 #include <vector>
 
-#include <gtest/gtest_prod.h>
-#include <rocksdb/db.h>
-
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/locks.h"
@@ -39,18 +35,8 @@
 #include "kudu/util/random.h"
 #include "kudu/util/status.h"
 
-namespace rocksdb {
-class Cache;
-class Status;
-} // namespace rocksdb
-
 namespace kudu {
 
-// Convert a rocksdb::Status to a kudu::Status.
-// NOTE: Keep it here rather than util/status.h because the latter is
-// an exported header, but FromRdbStatus() is used only internally.
-Status FromRdbStatus(const rocksdb::Status& s);
-
 class Env;
 class ThreadPool;
 
@@ -118,12 +104,9 @@ class Dir {
       std::unique_ptr<ThreadPool> pool);
   virtual ~Dir();
 
-  // Some preparatory work before opening the directory.
-  virtual Status Prepare() { return Status::OK(); }
-
   // Shuts down this dir's thread pool, waiting for any closures submitted via
   // ExecClosure() to finish first.
-  virtual void Shutdown();
+  void Shutdown();
 
   // Run a task on this dir's thread pool.
   //
@@ -176,7 +159,7 @@ class Dir {
   // value of -1 means 1% of the disk space in a directory will be reserved.
   static int reserved_bytes();
 
- protected:
+ private:
   Env* env_;
   DirMetrics* metrics_;
   const FsType fs_type_;
@@ -197,37 +180,6 @@ class Dir {
   DISALLOW_COPY_AND_ASSIGN(Dir);
 };
 
-// Representation of a directory for LogBlockManagerRdbMeta specially.
-class RdbDir: public Dir {
- public:
-  RdbDir(Env* env,
-         DirMetrics* metrics,
-         FsType fs_type,
-         std::string dir,
-         std::unique_ptr<DirInstanceMetadataFile> metadata_file,
-         std::unique_ptr<ThreadPool> pool);
-
-  // Initialize the RocksDB instance for the directory.
-  //
-  // Returns Status::OK() if prepared successfully, otherwise returns non-OK.
-  Status Prepare() override;
-
-  // Similar to Dir::Shutdown(), but close the RocksDB instance additionally.
-  void Shutdown() override;
-
-  rocksdb::DB* rdb();
-
- private:
-  // The shared RocksDB instance for this directory.
-  std::unique_ptr<rocksdb::DB> db_;
-  // The RocksDB full path.
-  std::optional<std::string> rdb_dir_;
-  // The block cache shared by all RocksDB instances in all data directories.
-  static std::shared_ptr<rocksdb::Cache> s_block_cache_;
-
-  DISALLOW_COPY_AND_ASSIGN(RdbDir);
-};
-
 struct DirManagerOptions {
  public:
   // The type of directory this directory manager should support.
@@ -352,8 +304,6 @@ class DirManager {
                                             std::unique_ptr<ThreadPool> pool) 
= 0;
 
  protected:
-  FRIEND_TEST(LogBlockManagerRdbMetaTest, TestHalfPresentContainer);
-
   // The name to be used by this directory manager for each sub-directory of
   // each directory root.
   virtual const char* dir_name() const = 0;
@@ -447,11 +397,6 @@ class DirManager {
       const std::vector<std::unique_ptr<DirInstanceMetadataFile>>& 
instances_to_update,
       const std::set<std::string>& new_all_uuids);
 
-  // Finds a directory by full path name, returning null if it can't be found.
-  //
-  // NOTE: Only for test purpose.
-  Dir* FindDirByFullPathForTests(const std::string& full_path) const;
-
   // The environment to be used for all directory operations.
   Env* env_;
 
diff --git a/src/kudu/fs/dir_util.cc b/src/kudu/fs/dir_util.cc
index 206e5b59f..a65a79bb0 100644
--- a/src/kudu/fs/dir_util.cc
+++ b/src/kudu/fs/dir_util.cc
@@ -14,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
 #include "kudu/fs/dir_util.h"
 
 #include <cstdint>
@@ -26,7 +25,6 @@
 #include <glog/logging.h>
 
 #include "kudu/fs/fs.pb.h"
-#include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -170,7 +168,7 @@ Status DirInstanceMetadataFile::Create(const set<string>& 
all_uuids,
 
   // If we're initializing the log block manager, check that we support
   // hole-punching.
-  if (FsManager::IsLogType(dir_type_)) {
+  if (dir_type_ == "log") {
     RETURN_NOT_OK_FAIL_INSTANCE_PREPEND(CheckHolePunch(env_, dir_name),
                                         kHolePunchErrorMsg);
   }
diff --git a/src/kudu/fs/file_block_manager.h b/src/kudu/fs/file_block_manager.h
index bafee8e3d..81ab17561 100644
--- a/src/kudu/fs/file_block_manager.h
+++ b/src/kudu/fs/file_block_manager.h
@@ -71,8 +71,6 @@ struct BlockManagerMetrics;
 // The file-backed block manager.
 class FileBlockManager : public BlockManager {
  public:
-  static constexpr const char* const name() { return "file"; }
-
   // Note: all objects passed as pointers should remain alive for the lifetime
   // of the block manager.
   FileBlockManager(Env* env,
diff --git a/src/kudu/fs/fs_manager-test.cc b/src/kudu/fs/fs_manager-test.cc
index 7da7803a2..4bebe106d 100644
--- a/src/kudu/fs/fs_manager-test.cc
+++ b/src/kudu/fs/fs_manager-test.cc
@@ -62,13 +62,11 @@
 #include "kudu/util/random.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
-#include "kudu/util/stopwatch.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
 using kudu::pb_util::ReadPBContainerFromPath;
 using kudu::pb_util::SecureDebugString;
-using std::make_tuple;
 using std::nullopt;
 using std::shared_ptr;
 using std::string;
@@ -218,23 +216,9 @@ class FsManagerTestBase : public KuduTest,
 };
 
 INSTANTIATE_TEST_SUITE_P(BlockManagerTypes, FsManagerTestBase,
-// TODO(yingchun): When --enable_multi_tenancy is set, the data directories 
are still shared by
-//  all tenants, which will cause some errors when --block_manager=logr. This 
will be fixed in the
-//  future as the TODO mentioned in [1]. We can enable all the following test 
cases when the TODO
-//  is addressed.
-//  
1.https://github.com/acelyc111/kudu/blob/master/src/kudu/fs/fs_manager.cc#L1190
-//
-//    ::testing::ValuesIn(BlockManager::block_manager_types()),
-//    ::testing::ValuesIn(kEncryptionType))
-    ::testing::Values(
-    make_tuple("file", kEncryptionType[0]),
-    make_tuple("file", kEncryptionType[1]),
-    make_tuple("file", kEncryptionType[2]),
-    make_tuple("log", kEncryptionType[0]),
-    make_tuple("log", kEncryptionType[1]),
-    make_tuple("log", kEncryptionType[2]),
-    make_tuple("logr", kEncryptionType[0]),
-    make_tuple("logr", kEncryptionType[1])));
+    ::testing::Combine(
+        ::testing::ValuesIn(BlockManager::block_manager_types()),
+        ::testing::ValuesIn(kEncryptionType)));
 
 TEST_P(FsManagerTestBase, TestBaseOperations) {
   fs_manager()->DumpFileSystemTree(std::cout, tenant_id());
@@ -513,7 +497,8 @@ TEST_P(FsManagerTestBase, TestMetadataDirInDataRoot) {
   // Now let's test adding data directories with metadata in the data root.
   // Adding data directories is not supported by the file block manager.
   if (FLAGS_block_manager == "file") {
-    GTEST_SKIP() << "Skipping the rest of test, file block manager not 
supported";
+    LOG(INFO) << "Skipping the rest of test, file block manager not supported";
+    return;
   }
 
   // Adding a data dir to the front of the FS root list (i.e. such that the
@@ -1244,20 +1229,12 @@ TEST_P(FsManagerTestBase, TestAddRemoveSpeculative) {
 }
 
 TEST_P(FsManagerTestBase, TestAddRemoveDataDirsFuzz) {
+  const int kNumAttempts = AllowSlowTests() ? 1000 : 100;
+
   if (FLAGS_block_manager == "file") {
     GTEST_SKIP() << "Skipping test, file block manager not supported";
   }
 
-#if defined(THREAD_SANITIZER) || defined(ADDRESS_SANITIZER)
-  // When using a sanitizer, reduce the loop times to get a more stable result.
-  const int kNumAttempts = 50;
-#else
-  // In some situations, the tests would last too long time, so we reduce the 
loop times if not
-  // AllowSlowTests(). For example, when FLAGS_block_manager == "logr", opens 
a data directory will
-  // open a RocksDB instance, it consumes more time than that if 
FLAGS_block_manager == "log".
-  const int kNumAttempts = AllowSlowTests() ? 1000 : 50;
-#endif
-
   Random rng_(SeedRandom());
 
   FsManagerOpts fs_opts;
@@ -1286,7 +1263,7 @@ TEST_P(FsManagerTestBase, TestAddRemoveDataDirsFuzz) {
     }
 
     // Try to add or remove it with failure injection enabled.
-    SCOPED_LOG_TIMING(INFO, Substitute("$0ing $1", action_was_add ? "add" : 
"remov", fs_root));
+    LOG(INFO) << Substitute("$0ing $1", action_was_add ? "Add" : "Remov", 
fs_root);
     bool update_succeeded;
     {
       google::FlagSaver saver;
diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index 49e67de23..702ca2b1b 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -80,9 +80,8 @@ TAG_FLAG(enable_data_block_fsync, unsafe);
 
 #if defined(__linux__)
 DEFINE_string(block_manager, "log", "Which block manager to use for storage. "
-              "Valid options are 'file', 'log' and 'logr'. The file block "
-              "manager is not suitable for production use due to scaling "
-              "limitations.");
+              "Valid options are 'file' and 'log'. The file block manager is 
not suitable for "
+              "production use due to scaling limitations.");
 #else
 DEFINE_string(block_manager, "file", "Which block manager to use for storage. "
               "Only the file block manager is supported for non-Linux 
systems.");
@@ -177,7 +176,6 @@ using kudu::fs::FsErrorManager;
 using kudu::fs::FileBlockManager;
 using kudu::fs::FsReport;
 using kudu::fs::LogBlockManagerNativeMeta;
-using kudu::fs::LogBlockManagerRdbMeta;
 using kudu::fs::ReadableBlock;
 using kudu::fs::UpdateInstanceBehavior;
 using kudu::fs::WritableBlock;
@@ -409,10 +407,6 @@ scoped_refptr<BlockManager> 
FsManager::InitBlockManager(const string& tenant_id)
     block_manager.reset(new LogBlockManagerNativeMeta(
         GetEnv(tenant_id), dd_manager(tenant_id), error_manager_,
         opts_.file_cache, std::move(bm_opts), tenant_id));
-  } else if (opts_.block_manager_type == "logr") {
-    block_manager.reset(new LogBlockManagerRdbMeta(
-        GetEnv(tenant_id), dd_manager(tenant_id), error_manager_,
-        opts_.file_cache, std::move(bm_opts), tenant_id));
   } else {
     LOG(FATAL) << "Unknown block_manager_type: " << opts_.block_manager_type;
   }
@@ -617,7 +611,7 @@ Status FsManager::Open(FsReport* report, Timer* 
read_instance_metadata_files,
                                           
BlockManager::MergeReport::REQUIRED));
     if (read_data_directories) {
       read_data_directories->Stop();
-      if (opts_.metric_entity && 
FsManager::IsLogType(opts_.block_manager_type)) {
+      if (opts_.metric_entity && opts_.block_manager_type == "log") {
         
METRIC_log_block_manager_containers_processing_time_startup.Instantiate(opts_.metric_entity,
             (read_data_directories->TimeElapsed()).ToMilliseconds());
       }
@@ -796,10 +790,6 @@ void 
FsManager::UpdateMetadataFormatAndStampUnlock(InstanceMetadataPB* metadata)
   metadata->set_format_stamp(Substitute("Formatted at $0 on $1", time_str, 
hostname));
 }
 
-bool FsManager::IsLogType(const std::string& block_manager_type) {
-  return (block_manager_type == "log" || block_manager_type == "logr");
-}
-
 Status FsManager::AddTenantMetadata(const string& tenant_name,
                                     const string& tenant_id,
                                     const string& tenant_key,
diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h
index 4a18962b5..7abdb9fac 100644
--- a/src/kudu/fs/fs_manager.h
+++ b/src/kudu/fs/fs_manager.h
@@ -447,9 +447,6 @@ class FsManager {
     return meta_on_xfs_;
   }
 
-  // Return true if 'block_manager_type' represents a log-backed block manager.
-  static bool IsLogType(const std::string& block_manager_type);
-
  private:
   FRIEND_TEST(fs::FsManagerTestBase, TestDuplicatePaths);
   FRIEND_TEST(fs::FsManagerTestBase, TestEIOWhileRunningUpdateDirsTool);
diff --git a/src/kudu/fs/fs_report.cc b/src/kudu/fs/fs_report.cc
index b4f7986f0..e4ee60440 100644
--- a/src/kudu/fs/fs_report.cc
+++ b/src/kudu/fs/fs_report.cc
@@ -252,35 +252,6 @@ LBMPartialRecordCheck::Entry::Entry(string c, int64_t o)
       repaired(false) {
 }
 
-///////////////////////////////////////////////////////////////////////////////
-// LBMCorruptedRdbRecordCheck
-///////////////////////////////////////////////////////////////////////////////
-
-void LBMCorruptedRdbRecordCheck::MergeFrom(
-    const LBMCorruptedRdbRecordCheck& other) {
-  MERGE_ENTRIES_FROM(other);
-}
-
-string LBMCorruptedRdbRecordCheck::ToString() const {
-  // Aggregate interesting stats from all of the entries.
-  int64_t corrupted_records_repaired = 0;
-  for (const auto& pr : entries) {
-    if (pr.repaired) {
-      corrupted_records_repaired++;
-    }
-  }
-
-  return Substitute("Total corrupted LBM metadata records in RocksDB: "
-      "$0 ($1 repaired)\n",
-                    entries.size(), corrupted_records_repaired);
-}
-
-LBMCorruptedRdbRecordCheck::Entry::Entry(string c, string k)
-    : container(std::move(c)),
-      rocksdb_key(std::move(k)),
-      repaired(false) {
-}
-
 ///////////////////////////////////////////////////////////////////////////////
 // FsReport::Stats
 ///////////////////////////////////////////////////////////////////////////////
@@ -330,7 +301,6 @@ void FsReport::MergeFrom(const FsReport& other) {
   MERGE_ONE_CHECK(malformed_record_check);
   MERGE_ONE_CHECK(misaligned_block_check);
   MERGE_ONE_CHECK(partial_record_check);
-  MERGE_ONE_CHECK(corrupted_rdb_record_check);
 
 #undef MERGE_ONE_CHECK
 }
@@ -359,7 +329,6 @@ string FsReport::ToString() const {
   TOSTRING_ONE_CHECK(malformed_record_check, "malformed LBM records");
   TOSTRING_ONE_CHECK(misaligned_block_check, "misaligned LBM blocks");
   TOSTRING_ONE_CHECK(partial_record_check, "partial LBM records");
-  TOSTRING_ONE_CHECK(corrupted_rdb_record_check, "corrupted LBM rdb records");
 
 #undef TOSTRING_ONE_CHECK
   return s;
diff --git a/src/kudu/fs/fs_report.h b/src/kudu/fs/fs_report.h
index fc983d847..0ff43e674 100644
--- a/src/kudu/fs/fs_report.h
+++ b/src/kudu/fs/fs_report.h
@@ -173,26 +173,6 @@ struct LBMPartialRecordCheck {
   std::vector<Entry> entries;
 };
 
-// Checks for corrupted LBM metadata records in RocksDB.
-//
-// Error type: non-fatal and repairable (by removing the records from RocksDB).
-struct LBMCorruptedRdbRecordCheck {
-
-  // Merges the contents of another check into this one.
-  void MergeFrom(const LBMCorruptedRdbRecordCheck& other);
-
-  // Returns a multi-line string representation of this check.
-  std::string ToString() const;
-
-  struct Entry {
-    Entry(std::string c, std::string k);
-    std::string container;
-    std::string rocksdb_key;
-    bool repaired;
-  };
-  std::vector<Entry> entries;
-};
-
 // Results of a Kudu filesystem-wide check. The report contains general
 // statistics about the filesystem as well as a series of "checks" that
 // describe possible on-disk inconsistencies.
@@ -288,7 +268,6 @@ struct FsReport {
   std::optional<LBMMalformedRecordCheck> malformed_record_check;
   std::optional<LBMMisalignedBlockCheck> misaligned_block_check;
   std::optional<LBMPartialRecordCheck> partial_record_check;
-  std::optional<LBMCorruptedRdbRecordCheck> corrupted_rdb_record_check;
 };
 
 } // namespace fs
diff --git a/src/kudu/fs/log_block_manager-test-util.cc 
b/src/kudu/fs/log_block_manager-test-util.cc
index c2aa8bb89..79147b3d6 100644
--- a/src/kudu/fs/log_block_manager-test-util.cc
+++ b/src/kudu/fs/log_block_manager-test-util.cc
@@ -28,16 +28,10 @@
 
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
-#include <rocksdb/db.h>
-#include <rocksdb/options.h>
-#include <rocksdb/slice.h>
 
 #include "kudu/fs/block_id.h"
-#include "kudu/fs/data_dirs.h"
-#include "kudu/fs/dir_manager.h"
 #include "kudu/fs/fs.pb.h"
 #include "kudu/fs/log_block_manager.h"
-#include "kudu/gutil/casts.h"
 #include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/strip.h"
@@ -65,23 +59,19 @@ using strings::Substitute;
 // LBMCorruptor to trace the LogBlockManagerNativeMeta data corruption.
 class NativeMetadataLBMCorruptor : public LBMCorruptor {
  public:
-  NativeMetadataLBMCorruptor(Env* env, DataDirManager* dd_manager, uint32_t 
rand_seed)
-      : LBMCorruptor(env, dd_manager, rand_seed) {
+  NativeMetadataLBMCorruptor(Env* env, vector<string> data_dirs, uint32_t 
rand_seed)
+      : LBMCorruptor(env, std::move(data_dirs), rand_seed) {
     max_malformed_types_ = MalformedRecordType::kTwoCreatesForSameBlockId;
   }
 
  private:
   Status CreateIncompleteContainer() override;
 
-  Status AppendRecord(const Container* c,
-                      BlockId block_id,
-                      int64_t block_offset,
-                      int64_t block_length) override;
+  Status AppendRecord(const Container* c, BlockId block_id,
+                      int64_t block_offset, int64_t block_length) override;
 
-  Status AppendCreateRecord(const Container* c,
-                            BlockId block_id,
-                            uint64_t block_offset,
-                            int64_t block_length) override;
+  Status AppendCreateRecord(const Container* c, BlockId block_id,
+                            uint64_t block_offset, int64_t block_length) 
override;
 
   Status AppendPartialRecord(const Container* c, BlockId block_id) override;
 
@@ -89,100 +79,58 @@ class NativeMetadataLBMCorruptor : public LBMCorruptor {
 
   Status CreateContainerMetadataPart(const string& unsuffixed_path) const;
 
-  static Status AppendCreateRecordInternal(WritablePBContainerFile* writer,
-                                           BlockId block_id,
-                                           int64_t block_offset,
-                                           int64_t block_length);
+  static Status AppendCreateRecordInternal(WritablePBContainerFile* writer, 
BlockId block_id,
+                                           int64_t block_offset, int64_t 
block_length);
   static Status AppendDeleteRecordInternal(WritablePBContainerFile* writer, 
BlockId block_id);
 
-  Status CreateMalformedRecord(const Container* c,
-                               MalformedRecordType error_type,
-                               BlockRecordPB* record) override;
-};
-
-// LBMCorruptor to trace the LogBlockManagerRdbMeta data corruption.
-class RdbMetadataLBMCorruptor : public LBMCorruptor {
- public:
-  RdbMetadataLBMCorruptor(Env* env, DataDirManager* dd_manager, uint32_t 
rand_seed)
-      : LBMCorruptor(env, dd_manager, rand_seed) {
-    max_malformed_types_ = MalformedRecordType::kUnrecognizedOpType;
-  }
-
- private:
-  Status CreateIncompleteContainer() override;
-
-  Status AppendRecord(const Container* c,
-                      BlockId block_id,
-                      int64_t block_offset,
-                      int64_t block_length) override;
-
-  Status AppendCreateRecord(const Container* c,
-                            BlockId block_id,
-                            uint64_t block_offset,
-                            int64_t block_length) override;
-
-  Status AppendPartialRecord(const Container* c, BlockId block_id) override;
-
-  Status AppendMetadataRecord(const Container* c, const BlockRecordPB& record) 
override;
-
-  static Status AppendCreateRecordInternal(RdbDir* dir,
-                                           const string& id,
-                                           BlockId block_id,
-                                           int64_t block_offset,
-                                           int64_t block_length);
-  static Status AppendDeleteRecordInternal(RdbDir* dir, const string& id, 
BlockId block_id);
-
-  // Get the RdbDir for the given container.
-  RdbDir* GetRdbDir(const Container* c) const;
+  Status CreateMalformedRecord(
+      const Container* c, MalformedRecordType error_type, BlockRecordPB* 
record) override;
 };
 
 unique_ptr<LBMCorruptor> LBMCorruptor::Create(
-        Env* env, DataDirManager* dd_manager, uint32_t rand_seed) {
-    if (FLAGS_block_manager == "log") {
-        return std::make_unique<NativeMetadataLBMCorruptor>(env, dd_manager, 
rand_seed);
-    }
-    if (FLAGS_block_manager == "logr") {
-        return std::make_unique<RdbMetadataLBMCorruptor>(env, dd_manager, 
rand_seed);
-    }
-    return nullptr;
+    Env* env, std::vector<std::string> data_dirs, uint32_t rand_seed) {
+  if (FLAGS_block_manager == "log") {
+    return std::make_unique<NativeMetadataLBMCorruptor>(env, 
std::move(data_dirs), rand_seed);
+  }
+  return nullptr;
 }
 
-LBMCorruptor::LBMCorruptor(Env* env, DataDirManager* dd_manager, uint32_t 
rand_seed)
+LBMCorruptor::LBMCorruptor(Env* env, vector<string> data_dirs, uint32_t 
rand_seed)
     : env_(env),
-      dd_manager_(dd_manager),
+      data_dirs_(std::move(data_dirs)),
       rand_(rand_seed) {
-  CHECK_GT(dd_manager_->dirs().size(), 0);
+  CHECK_GT(data_dirs_.size(), 0);
 }
 
 Status LBMCorruptor::Init() {
   vector<Container> all_containers;
   vector<Container> full_containers;
 
-  for (const auto& dd : dd_manager_->dirs()) {
+  for (const auto& dd : data_dirs_) {
     vector<string> dd_files;
     unordered_map<string, Container> containers_by_name;
-    RETURN_NOT_OK(env_->GetChildren(dd->dir(), &dd_files));
+    RETURN_NOT_OK(env_->GetChildren(dd, &dd_files));
     for (const auto& f : dd_files) {
       // As we iterate over each file in the data directory, keep track of data
       // and metadata files, so that only containers with both will be 
included.
       string stripped;
       if (TryStripSuffixString(
           f, LogBlockManager::kContainerDataFileSuffix, &stripped)) {
-        containers_by_name[stripped].dir = dd->dir();
+        containers_by_name[stripped].dir = dd;
         containers_by_name[stripped].name = stripped;
-        containers_by_name[stripped].data_filename = 
JoinPathSegments(dd->dir(), f);
+        containers_by_name[stripped].data_filename = JoinPathSegments(dd, f);
       } else if (TryStripSuffixString(
           f, LogBlockManager::kContainerMetadataFileSuffix, &stripped)) {
-        containers_by_name[stripped].dir = dd->dir();
+        containers_by_name[stripped].dir = dd;
         containers_by_name[stripped].name = stripped;
-        containers_by_name[stripped].metadata_filename = 
JoinPathSegments(dd->dir(), f);
+        containers_by_name[stripped].metadata_filename = JoinPathSegments(dd, 
f);
       }
     }
 
     for (const auto& e : containers_by_name) {
-      // Only include the container if valid files were present.
+      // Only include the container if both of its files were present.
       if (!e.second.data_filename.empty() &&
-          (!e.second.metadata_filename.empty() || FLAGS_block_manager == 
"logr")) {
+          !e.second.metadata_filename.empty()) {
         all_containers.push_back(e.second);
 
         // File size is an imprecise proxy for whether a container is full, but
@@ -319,9 +267,8 @@ Status LBMCorruptor::AddMalformedRecordToContainer() {
   return Status::OK();
 }
 
-Status LBMCorruptor::CreateMalformedRecord(const Container* /* c */,
-                                           MalformedRecordType error_type,
-                                           BlockRecordPB* record) {
+Status LBMCorruptor::CreateMalformedRecord(
+    const Container* /* c */, MalformedRecordType error_type, BlockRecordPB* 
record) {
   switch (error_type) {
     case MalformedRecordType::kNoBlockOffset:
       record->clear_offset();
@@ -516,8 +463,8 @@ Status LBMCorruptor::GetRandomContainer(FindContainerMode 
mode,
   return Status::OK();
 }
 
-string LBMCorruptor::GetRandomDataDir() const {
-  return dd_manager_->GetDirs()[rand_.Uniform(dd_manager_->GetDirs().size())];
+const string& LBMCorruptor::GetRandomDataDir() const {
+  return data_dirs_[rand_.Uniform(data_dirs_.size())];
 }
 
 Status NativeMetadataLBMCorruptor::CreateIncompleteContainer() {
@@ -541,10 +488,8 @@ Status 
NativeMetadataLBMCorruptor::CreateIncompleteContainer() {
   return Status::OK();
 }
 
-Status NativeMetadataLBMCorruptor::AppendRecord(const Container* c,
-                                                BlockId block_id,
-                                                int64_t block_offset,
-                                                int64_t block_length) {
+Status NativeMetadataLBMCorruptor::AppendRecord(
+    const Container* c, BlockId block_id, int64_t block_offset, int64_t 
block_length) {
   unique_ptr<WritablePBContainerFile> metadata_writer;
   RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
   RETURN_NOT_OK(AppendCreateRecordInternal(metadata_writer.get(), block_id,
@@ -553,10 +498,8 @@ Status NativeMetadataLBMCorruptor::AppendRecord(const 
Container* c,
   return metadata_writer->Close();
 }
 
-Status NativeMetadataLBMCorruptor::AppendCreateRecord(const Container* c,
-                                                      BlockId block_id,
-                                                      uint64_t block_offset,
-                                                      int64_t block_length) {
+Status NativeMetadataLBMCorruptor::AppendCreateRecord(
+    const Container* c, BlockId block_id, uint64_t block_offset, int64_t 
block_length) {
   unique_ptr<WritablePBContainerFile> metadata_writer;
   RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
   RETURN_NOT_OK(AppendCreateRecordInternal(metadata_writer.get(), block_id,
@@ -602,9 +545,8 @@ Status 
NativeMetadataLBMCorruptor::CreateContainerMetadataPart(
   return metadata_file->Close();
 }
 
-Status NativeMetadataLBMCorruptor::CreateMalformedRecord(const Container* c,
-                                                         MalformedRecordType 
error_type,
-                                                         BlockRecordPB* 
record) {
+Status NativeMetadataLBMCorruptor::CreateMalformedRecord(
+    const Container* c, MalformedRecordType error_type, BlockRecordPB* record) 
{
   switch (error_type) {
     case MalformedRecordType::kDeleteWithoutFirstMatchingCreate:
       record->clear_offset();
@@ -620,10 +562,9 @@ Status 
NativeMetadataLBMCorruptor::CreateMalformedRecord(const Container* c,
   return Status::OK();
 }
 
-Status 
NativeMetadataLBMCorruptor::AppendCreateRecordInternal(WritablePBContainerFile* 
writer,
-                                                              BlockId block_id,
-                                                              int64_t 
block_offset,
-                                                              int64_t 
block_length) {
+Status NativeMetadataLBMCorruptor::AppendCreateRecordInternal(
+    WritablePBContainerFile* writer, BlockId block_id,
+    int64_t block_offset, int64_t block_length) {
   BlockRecordPB record;
   block_id.CopyToPB(record.mutable_block_id());
   record.set_op_type(CREATE);
@@ -642,98 +583,5 @@ Status 
NativeMetadataLBMCorruptor::AppendDeleteRecordInternal(
   return writer->Append(record);
 }
 
-Status RdbMetadataLBMCorruptor::AppendPartialRecord(
-    const Container* c, const BlockId block_id) {
-  auto* rdb_dir = GetRdbDir(c);
-  DCHECK(rdb_dir);
-  // Add a new good record to the container.
-  RETURN_NOT_OK(AppendCreateRecordInternal(rdb_dir, c->name, block_id, 0, 0));
-  // Corrupt the record by truncating one byte off the end of it.
-  string key(LogBlockManagerRdbMeta::ConstructRocksDBKey(c->name, block_id));
-  rocksdb::Slice slice_key(key);
-  string value;
-  RETURN_NOT_OK(FromRdbStatus(rdb_dir->rdb()->Get(
-      rocksdb::ReadOptions(), slice_key, &value)));
-  return FromRdbStatus(rdb_dir->rdb()->Put(
-      rocksdb::WriteOptions(), slice_key, rocksdb::Slice(value.data(), 
value.size() - 1)));
-}
-
-Status RdbMetadataLBMCorruptor::AppendMetadataRecord(
-    const Container* c, const BlockRecordPB& record) {
-  auto* rdb_dir = GetRdbDir(c);
-  DCHECK(rdb_dir);
-  string buf;
-  record.SerializeToString(&buf);
-  string key(LogBlockManagerRdbMeta::ConstructRocksDBKey(
-      c->name, BlockId::FromPB(record.block_id())));
-  return FromRdbStatus(rdb_dir->rdb()->Put(
-      rocksdb::WriteOptions(), rocksdb::Slice(key), rocksdb::Slice(buf)));
-}
-
-RdbDir* RdbMetadataLBMCorruptor::GetRdbDir(const Container* c) const {
-  DCHECK(c);
-  for (const auto& dir : dd_manager_->dirs()) {
-    if (dir->dir() == c->dir) {
-      DCHECK(dir);
-      return down_cast<RdbDir*>(dir.get());
-    }
-  }
-  return nullptr;
-}
-
-Status RdbMetadataLBMCorruptor::AppendRecord(const Container* c,
-                                             const BlockId block_id,
-                                             int64_t block_offset,
-                                             int64_t block_length) {
-  auto* rdb_dir = GetRdbDir(c);
-  DCHECK(rdb_dir);
-  RETURN_NOT_OK(AppendCreateRecordInternal(rdb_dir, c->name, block_id, 
block_offset, block_length));
-  return AppendDeleteRecordInternal(rdb_dir, c->name, block_id);
-}
-
-Status RdbMetadataLBMCorruptor::CreateIncompleteContainer() {
-  string unsuffixed_path = JoinPathSegments(GetRandomDataDir(), 
oid_generator_.Next());
-  // Create an incomplete container. Types of incomplete containers:
-  //
-  // 0. Empty data file but no metadata file.
-  return CreateContainerDataPart(unsuffixed_path);
-}
-
-Status RdbMetadataLBMCorruptor::AppendCreateRecord(const Container* c,
-                                                   const BlockId block_id,
-                                                   uint64_t block_offset,
-                                                   int64_t block_length) {
-  auto* rdb_dir = GetRdbDir(c);
-  DCHECK(rdb_dir);
-  return AppendCreateRecordInternal(rdb_dir, c->name, block_id,
-                                    block_offset, block_length);
-}
-
-Status RdbMetadataLBMCorruptor::AppendCreateRecordInternal(RdbDir* dir,
-                                                           const string& id,
-                                                           BlockId block_id,
-                                                           int64_t 
block_offset,
-                                                           int64_t 
block_length) {
-  // Construct value.
-  BlockRecordPB record;
-  block_id.CopyToPB(record.mutable_block_id());
-  record.set_op_type(CREATE);
-  record.set_offset(block_offset);
-  record.set_length(block_length);
-  record.set_timestamp_us(0); // has no effect
-  string value;
-  record.SerializeToString(&value);
-
-  // Construct key.
-  string key(LogBlockManagerRdbMeta::ConstructRocksDBKey(id, 
BlockId::FromPB(record.block_id())));
-  return FromRdbStatus(dir->rdb()->Put({}, rocksdb::Slice(key), 
rocksdb::Slice(value)));
-}
-
-Status RdbMetadataLBMCorruptor::AppendDeleteRecordInternal(
-    RdbDir* dir, const string& id, BlockId block_id) {
-  string key(LogBlockManagerRdbMeta::ConstructRocksDBKey(id, block_id));
-  return FromRdbStatus(dir->rdb()->Delete({}, rocksdb::Slice(key)));
-}
-
 } // namespace fs
 } // namespace kudu
diff --git a/src/kudu/fs/log_block_manager-test-util.h 
b/src/kudu/fs/log_block_manager-test-util.h
index f8661ad73..ff3dfd20b 100644
--- a/src/kudu/fs/log_block_manager-test-util.h
+++ b/src/kudu/fs/log_block_manager-test-util.h
@@ -37,14 +37,13 @@ class WritablePBContainerFile;
 } // namespace pb_util
 
 namespace fs {
-class DataDirManager;
 
 // Corrupts various log block manager on-disk data structures.
 class LBMCorruptor {
  public:
   // Create a LBMCorruptor according to the --block_manager flag.
   static std::unique_ptr<LBMCorruptor> Create(
-      Env* env, DataDirManager* dd_manager, uint32_t rand_seed);
+      Env* env, std::vector<std::string> data_dirs, uint32_t rand_seed);
 
   virtual ~LBMCorruptor() = default;
 
@@ -104,13 +103,8 @@ class LBMCorruptor {
   // Injects one of the above non-fatal inconsistencies (chosen at random).
   Status InjectRandomNonFatalInconsistency();
 
-  // Resets the DataDirManager when the bound DataDirManager changes.
-  void ResetDataDirManager(DataDirManager* dd_manager) {
-    dd_manager_ = dd_manager;
-  }
-
  protected:
-  LBMCorruptor(Env* env, DataDirManager* dd_manager, uint32_t rand_seed);
+  LBMCorruptor(Env* env, std::vector<std::string> data_dirs, uint32_t 
rand_seed);
 
   // Describes an on-disk LBM container.
   struct Container {
@@ -144,21 +138,17 @@ class LBMCorruptor {
                             const Container** container) const;
 
   // Gets a data directory chosen at random.
-  std::string GetRandomDataDir() const;
+  const std::string& GetRandomDataDir() const;
 
   // Appends a CREATE-DELETE pair of records to the container 'c', the newly
   // created record has a unique id of 'block_id' and is located at
   // 'block_offset' with a size of 'block_length'.
-  virtual Status AppendRecord(const Container* c,
-                              BlockId block_id,
-                              int64_t block_offset,
-                              int64_t block_length) = 0;
+  virtual Status AppendRecord(const Container* c, BlockId block_id,
+                              int64_t block_offset, int64_t block_length) = 0;
 
   // Similar to the above, but only appends the CREATE record.
-  virtual Status AppendCreateRecord(const Container* c,
-                                    BlockId block_id,
-                                    uint64_t block_offset,
-                                    int64_t block_length) = 0;
+  virtual Status AppendCreateRecord(const Container* c, BlockId block_id,
+                                    uint64_t block_offset, int64_t 
block_length) = 0;
 
   // Appends a partial CREATE record to the metadata part of container 'c', the
   // newly created record has a unique id of 'block_id'. The record is 
corrupted
@@ -183,16 +173,14 @@ class LBMCorruptor {
     kTwoCreatesForSameBlockId
   };
   // Returns an error if a container could not be found.
-  virtual Status CreateMalformedRecord(const Container* c,
-                                       MalformedRecordType error_type,
-                                       BlockRecordPB* record);
+  virtual Status CreateMalformedRecord(
+      const Container* c, MalformedRecordType error_type, BlockRecordPB* 
record);
 
   Status CreateContainerDataPart(const std::string& unsuffixed_path);
 
   // Initialized in the constructor.
   Env* env_;
-  // Use DataDirManager to access the Dir objects conveniently.
-  DataDirManager* dd_manager_;
+  const std::vector<std::string> data_dirs_;
   mutable Random rand_;
   ObjectIdGenerator oid_generator_;
   MalformedRecordType max_malformed_types_;
diff --git a/src/kudu/fs/log_block_manager-test.cc 
b/src/kudu/fs/log_block_manager-test.cc
index ce7fd97ee..05f814761 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -30,7 +30,6 @@
 #include <random>
 #include <set>
 #include <string>
-#include <tuple>
 #include <type_traits>
 #include <unordered_map>
 #include <unordered_set>
@@ -40,11 +39,6 @@
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
-#include <rocksdb/db.h>
-#include <rocksdb/iterator.h>
-#include <rocksdb/options.h>
-#include <rocksdb/slice.h>
-#include <rocksdb/status.h>
 
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
@@ -57,7 +51,6 @@
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/strip.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
@@ -76,7 +69,6 @@
 #include "kudu/util/threadpool.h"
 
 using kudu::pb_util::ReadablePBContainerFile;
-using std::make_tuple;
 using std::set;
 using std::string;
 using std::shared_ptr;
@@ -84,8 +76,6 @@ using std::unique_ptr;
 using std::unordered_map;
 using std::unordered_set;
 using std::vector;
-using strings::SkipEmpty;
-using strings::Split;
 using strings::Substitute;
 
 DECLARE_bool(cache_force_single_shard);
@@ -135,21 +125,7 @@ namespace internal {
 class LogBlockContainer;
 } // namespace internal
 
-// Parameterize test cases:
-// +------------+---------------+
-// | encryption | block_manager |
-// +------------+---------------+
-// |    no      |     log       |
-// +------------+---------------+
-// |    no      |     logr      |
-// +------------+---------------+
-// |    yes     |     log       |
-// +------------+---------------+
-// |    yes     |     logr      |
-// +------------+---------------+
-class LogBlockManagerTest : public KuduTest,
-                            public ::testing::WithParamInterface<
-                                std::tuple<bool, string>> {
+class LogBlockManagerTest : public KuduTest, public 
::testing::WithParamInterface<bool> {
  public:
   LogBlockManagerTest() :
       test_tablet_name_("test_tablet"),
@@ -158,8 +134,7 @@ class LogBlockManagerTest : public KuduTest,
       //
       // Not strictly necessary except for 
TestDeleteFromContainerAfterMetadataCompaction.
       file_cache_("test_cache", env_, 50, scoped_refptr<MetricEntity>()) {
-    SetEncryptionFlags(std::get<0>(GetParam()));
-    FLAGS_block_manager = std::get<1>(GetParam());
+    SetEncryptionFlags(GetParam());
     CHECK_OK(file_cache_.Init());
     error_manager_ = make_scoped_refptr(new FsErrorManager());
     bm_ = CreateBlockManager(scoped_refptr<MetricEntity>());
@@ -187,20 +162,13 @@ class LogBlockManagerTest : public KuduTest,
 
     BlockManagerOptions opts;
     opts.metric_entity = metric_entity;
-    if (FLAGS_block_manager == "log") {
-      return make_scoped_refptr(new LogBlockManagerNativeMeta(
-          env_, dd_manager_.get(), error_manager_,
-          &file_cache_, std::move(opts), fs::kDefaultTenantID));
-    }
-    CHECK_EQ(FLAGS_block_manager, "logr");
-    return make_scoped_refptr(new LogBlockManagerRdbMeta(
-          env_, dd_manager_.get(), error_manager_,
-          &file_cache_, std::move(opts), fs::kDefaultTenantID));
+    CHECK_EQ(FLAGS_block_manager, "log");
+    return make_scoped_refptr(new LogBlockManagerNativeMeta(env_, 
dd_manager_.get(), error_manager_,
+                              &file_cache_, std::move(opts), 
fs::kDefaultTenantID));
   }
 
   Status ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity = 
nullptr,
                             FsReport* report = nullptr,
-                            LBMCorruptor* corruptor = nullptr,
                             vector<string> test_data_dirs = {},
                             bool force = false) {
     PrepareDataDirs(&test_data_dirs);
@@ -221,10 +189,6 @@ class LogBlockManagerTest : public KuduTest,
                                                          
DataDirManagerOptions(), &dd_manager_));
       RETURN_NOT_OK(dd_manager_->LoadDataDirGroupFromPB(test_tablet_name_, 
test_group_pb_));
     }
-    if (corruptor) {
-      // Reset the DataDirManager which has been changed.
-      corruptor->ResetDataDirManager(dd_manager_.get());
-    }
     bm_ = CreateBlockManager(metric_entity, test_data_dirs);
     RETURN_NOT_OK(bm_->Open(report, BlockManager::MergeReport::NOT_REQUIRED, 
nullptr, nullptr));
     return Status::OK();
@@ -280,7 +244,6 @@ class LogBlockManagerTest : public KuduTest,
     ASSERT_TRUE(report.malformed_record_check->entries.empty());
     ASSERT_TRUE(report.misaligned_block_check->entries.empty());
     ASSERT_TRUE(report.partial_record_check->entries.empty());
-    ASSERT_TRUE(report.corrupted_rdb_record_check->entries.empty());
   }
 
   DataDirGroupPB test_group_pb_;
@@ -380,24 +343,7 @@ static void CheckLogMetrics(const 
scoped_refptr<MetricEntity>& entity,
   }
 }
 
-INSTANTIATE_TEST_SUITE_P(EncryptionEnabled, LogBlockManagerTest,
-                         ::testing::Combine(
-                             ::testing::Values(false, true),
-                             ::testing::Values("log", "logr")));
-
-// Parameterize test cases:
-// +------------+---------------+
-// | encryption | block_manager |
-// +------------+---------------+
-// |    no      |     log       |
-// +------------+---------------+
-// |    yes     |     log       |
-// +------------+---------------+
-class LogBlockManagerNativeMetaTest : public LogBlockManagerTest {
-};
-INSTANTIATE_TEST_SUITE_P(EncryptionEnabled, LogBlockManagerNativeMetaTest,
-    ::testing::Values(make_tuple(false, "log"),
-                      make_tuple(true, "log")));
+INSTANTIATE_TEST_SUITE_P(EncryptionEnabled, LogBlockManagerTest, 
::testing::Values(false, true));
 
 TEST_P(LogBlockManagerTest, MetricsTest) {
   MetricRegistry registry;
@@ -422,7 +368,7 @@ TEST_P(LogBlockManagerTest, MetricsTest) {
   // encryption header occuppies one block on disk, so set 
FLAGS_log_container_max_size
   // (2 * fs_block_size) when the encryption is enabled and set it 
fs_block_size when
   // the encryption is disabled.
-  FLAGS_log_container_max_size = std::get<0>(GetParam()) ? 2 * fs_block_size : 
fs_block_size;
+  FLAGS_log_container_max_size = GetParam() ? 2 * fs_block_size : 
fs_block_size;
   // One block --> one container.
   unique_ptr<WritableBlock> writer;
   ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
@@ -721,7 +667,7 @@ TEST_P(LogBlockManagerTest, TestReuseBlockIds) {
 //
 // Note that we rely on filesystem integrity to ensure that we do not lose
 // trailing, fsync()ed metadata.
-TEST_P(LogBlockManagerNativeMetaTest, TestMetadataTruncation) {
+TEST_P(LogBlockManagerTest, TestMetadataTruncation) {
   // Create several blocks.
   vector<BlockId> created_blocks;
   BlockId last_block_id;
@@ -1108,7 +1054,6 @@ TEST_P(LogBlockManagerTest, TestParseKernelRelease) {
 // However it still can be used to micro-optimize the startup process.
 TEST_P(LogBlockManagerTest, StartupBenchmark) {
   SKIP_IF_SLOW_NOT_ALLOWED();
-
   // Disable preflushing since this can slow down our writes. In particular,
   // since we write such small blocks in this test, each block will likely
   // begin on the same 4KB page as the prior one we wrote, and due to the
@@ -1125,7 +1070,7 @@ TEST_P(LogBlockManagerTest, StartupBenchmark) {
       test_dirs.emplace_back(test_dir_ + "/" + std::to_string(i));
     }
     // Re-open block manager to place data on multiple data directories.
-    ASSERT_OK(ReopenBlockManager(nullptr, nullptr, nullptr, test_dirs, /* 
force= */ true));
+    ASSERT_OK(ReopenBlockManager(nullptr, nullptr, test_dirs, /* force= */ 
true));
   }
 
   // Creates FLAGS_startup_benchmark_batch_count_for_testing *
@@ -1181,9 +1126,8 @@ TEST_P(LogBlockManagerTest, StartupBenchmark) {
 
   for (int i = 0; i < FLAGS_startup_benchmark_reopen_times; i++) {
     SCOPED_LOG_TIMING(INFO, "reopening block manager");
-    ASSERT_OK(ReopenBlockManager(nullptr, nullptr, nullptr, test_dirs));
+    ASSERT_OK(ReopenBlockManager(nullptr, nullptr, test_dirs));
   }
-  LOG(INFO) << "Test on --block_manager=" << FLAGS_block_manager;
 }
 #endif
 
@@ -1296,7 +1240,7 @@ TEST_P(LogBlockManagerTest, 
TestContainerBlockLimitingByBlockNum) {
   NO_FATALS(AssertNumContainers(4));
 }
 
-TEST_P(LogBlockManagerNativeMetaTest, 
TestContainerBlockLimitingByMetadataSize) {
+TEST_P(LogBlockManagerTest, TestContainerBlockLimitingByMetadataSize) {
   const int kNumBlocks = 1000;
 
   // Creates 'kNumBlocks' blocks with minimal data.
@@ -1332,7 +1276,7 @@ TEST_P(LogBlockManagerNativeMetaTest, 
TestContainerBlockLimitingByMetadataSize)
   NO_FATALS(AssertNumContainers(4));
 }
 
-TEST_P(LogBlockManagerNativeMetaTest, 
TestContainerBlockLimitingByMetadataSizeWithCompaction) {
+TEST_P(LogBlockManagerTest, 
TestContainerBlockLimitingByMetadataSizeWithCompaction) {
   const int kNumBlocks = 2000;
   const int kNumThreads = 10;
   const double kLiveBlockRatio = 0.1;
@@ -1343,9 +1287,9 @@ TEST_P(LogBlockManagerNativeMetaTest, 
TestContainerBlockLimitingByMetadataSizeWi
     // Creates 'kNumBlocks' blocks.
     for (int i = 0; i < kNumBlocks; i++) {
       unique_ptr<WritableBlock> block;
-      CHECK_OK(bm_->CreateBlock(test_block_opts_, &block));
-      CHECK_OK(block->Append("aaaa"));
-      CHECK_OK(block->Close());
+      RETURN_NOT_OK(bm_->CreateBlock(test_block_opts_, &block));
+      RETURN_NOT_OK(block->Append("aaaa"));
+      RETURN_NOT_OK(block->Close());
       ids.push_back(block->id());
     }
 
@@ -1358,7 +1302,9 @@ TEST_P(LogBlockManagerNativeMetaTest, 
TestContainerBlockLimitingByMetadataSizeWi
       }
       deletion_transaction->AddDeletedBlock(id);
     }
-    CHECK_OK(deletion_transaction->CommitDeletedBlocks(nullptr));
+    RETURN_NOT_OK(deletion_transaction->CommitDeletedBlocks(nullptr));
+
+    return Status::OK();
   };
 
   // Create a thread pool to create and delete blocks.
@@ -1408,7 +1354,7 @@ TEST_P(LogBlockManagerNativeMetaTest, 
TestContainerBlockLimitingByMetadataSizeWi
   NO_FATALS(GetContainerMetadataFiles(&metadata_files));
   bool exist_larger_one = false;
   for (const auto& metadata_file : metadata_files) {
-    uint64_t file_size = 0;
+    uint64_t file_size;
     NO_FATALS(env_->GetFileSize(metadata_file, &file_size));
     if (file_size > FLAGS_log_container_metadata_max_size *
                          
FLAGS_log_container_metadata_size_before_compact_ratio) {
@@ -1431,7 +1377,7 @@ TEST_P(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
   NO_FATALS(GetOnlyContainer(&container_name));
 
   // Add a mixture of regular and misaligned blocks to it.
-  auto corruptor = LBMCorruptor::Create(env_, dd_manager_.get(), SeedRandom());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), 
SeedRandom());
   ASSERT_OK(corruptor->Init());
   int num_misaligned_blocks = 0;
   for (int i = 0; i < kNumBlocks; i++) {
@@ -1442,7 +1388,7 @@ TEST_P(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
       // container metadata writers do not expect the metadata files to have
       // been changed underneath them.
       FsReport report;
-      ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
+      ASSERT_OK(ReopenBlockManager(nullptr, &report));
       ASSERT_FALSE(report.HasFatalErrors());
       num_misaligned_blocks++;
     } else {
@@ -1469,7 +1415,7 @@ TEST_P(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
     }
   }
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   ASSERT_FALSE(report.HasFatalErrors()) << report.ToString();
   ASSERT_EQ(num_misaligned_blocks, 
report.misaligned_block_check->entries.size());
   for (const auto& mb : report.misaligned_block_check->entries) {
@@ -1487,16 +1433,14 @@ TEST_P(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
         deletion_transaction->AddDeletedBlock(id);
       }
     }
-    vector<BlockId> deleted;
-    ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
-    ASSERT_FALSE(deleted.empty());
+    ASSERT_OK(deletion_transaction->CommitDeletedBlocks(nullptr));
   }
 
   // Wait for the block manager to punch out all of the holes. It's easiest to
   // do this by reopening it; shutdown will wait for outstanding hole punches.
   //
   // On reopen, some misaligned blocks should be gone from the report.
-  ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   ASSERT_FALSE(report.HasFatalErrors());
   ASSERT_GT(report.misaligned_block_check->entries.size(), 0);
   ASSERT_LT(report.misaligned_block_check->entries.size(), 
num_misaligned_blocks);
@@ -1550,13 +1494,13 @@ TEST_P(LogBlockManagerTest, 
TestRepairPreallocateExcessSpace) {
   NO_FATALS(GetContainerNames(&container_names));
 
   // Corrupt one container.
-  auto corruptor = LBMCorruptor::Create(env_, dd_manager_.get(), SeedRandom());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), 
SeedRandom());
   ASSERT_OK(corruptor->Init());
   ASSERT_OK(corruptor->PreallocateFullContainer());
 
   // Check the report.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   ASSERT_FALSE(report.HasFatalErrors());
   ASSERT_EQ(1, report.full_container_space_check->entries.size());
   const LBMFullContainerSpaceCheck::Entry& fcs =
@@ -1578,7 +1522,7 @@ TEST_P(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
   FLAGS_log_container_excess_space_before_cleanup_fraction = 0.0;
 
   // Force our single container to become full once created.
-  FLAGS_log_container_max_size = std::get<0>(GetParam()) ? 4096 : 0;
+  FLAGS_log_container_max_size = GetParam() ? 4096 : 0;
 
   // Force the test to measure extra space in unpunched holes, not in the
   // preallocation buffer.
@@ -1594,7 +1538,7 @@ TEST_P(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
   ASSERT_OK(env_->GetFileSizeOnDisk(data_file, &initial_file_size_on_disk));
 
   // Add some "unpunched blocks" to the container.
-  auto corruptor = LBMCorruptor::Create(env_, dd_manager_.get(), SeedRandom());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), 
SeedRandom());
   ASSERT_OK(corruptor->Init());
   for (int i = 0; i < kNumBlocks; i++) {
     ASSERT_OK(corruptor->AddUnpunchedBlockToFullContainer());
@@ -1606,7 +1550,7 @@ TEST_P(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
 
   // Check the report.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   ASSERT_FALSE(report.HasFatalErrors());
   ASSERT_EQ(1, report.full_container_space_check->entries.size());
   const LBMFullContainerSpaceCheck::Entry& fcs =
@@ -1622,23 +1566,12 @@ TEST_P(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
   // Wait for the block manager to punch out all of the holes (done as part of
   // repair at startup). It's easiest to do this by reopening it; shutdown will
   // wait for outstanding hole punches.
-  ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
-  if (FLAGS_block_manager == "logr" && !FLAGS_encrypt_data_at_rest) {
-    // In this case, the data file is too small (zero here), the container 
will be added to
-    // incomplete_container_check, of course, it will be repaired 
automatically when bootstrap.
-    ASSERT_FALSE(report.HasFatalErrors());
-    ASSERT_EQ(1, report.incomplete_container_check->entries.size());
-    const auto& ics = report.incomplete_container_check->entries[0];
-    ASSERT_EQ(container, ics.container);
-    ASSERT_TRUE(ics.repaired);
-    report.incomplete_container_check->entries.clear();
-    ASSERT_FALSE(env_->FileExists(data_file));
-  } else {
-    // File size should be 0 post-repair.
-    ASSERT_OK(env_->GetFileSizeOnDisk(data_file, &file_size_on_disk));
-    ASSERT_EQ(initial_file_size_on_disk, file_size_on_disk);
-  }
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   NO_FATALS(AssertEmptyReport(report));
+
+  // File size should be 0 post-repair.
+  ASSERT_OK(env_->GetFileSizeOnDisk(data_file, &file_size_on_disk));
+  ASSERT_EQ(initial_file_size_on_disk, file_size_on_disk);
 }
 
 TEST_P(LogBlockManagerTest, TestRepairIncompleteContainer) {
@@ -1647,7 +1580,7 @@ TEST_P(LogBlockManagerTest, 
TestRepairIncompleteContainer) {
   // Create some incomplete containers. The corruptor will select between
   // several variants of "incompleteness" at random (see
   // LBMCorruptor::CreateIncompleteContainer() for details).
-  auto corruptor = LBMCorruptor::Create(env_, dd_manager_.get(), SeedRandom());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), 
SeedRandom());
   ASSERT_OK(corruptor->Init());
   for (int i = 0; i < kNumContainers; i++) {
     ASSERT_OK(corruptor->CreateIncompleteContainer());
@@ -1658,7 +1591,7 @@ TEST_P(LogBlockManagerTest, 
TestRepairIncompleteContainer) {
 
   // Check the report.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   ASSERT_FALSE(report.HasFatalErrors());
   ASSERT_EQ(kNumContainers, report.incomplete_container_check->entries.size());
   unordered_set<string> container_name_set(container_names.begin(),
@@ -1685,7 +1618,7 @@ TEST_P(LogBlockManagerTest, TestDetectMalformedRecords) {
   // Add some malformed records. The corruptor will select between
   // several variants of "malformedness" at random (see
   // LBMCorruptor::AddMalformedRecordToContainer for details).
-  auto corruptor = LBMCorruptor::Create(env_, dd_manager_.get(), SeedRandom());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), 
SeedRandom());
   ASSERT_OK(corruptor->Init());
   for (int i = 0; i < kNumRecords; i++) {
     ASSERT_OK(corruptor->AddMalformedRecordToContainer());
@@ -1693,7 +1626,7 @@ TEST_P(LogBlockManagerTest, TestDetectMalformedRecords) {
 
   // Check the report.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   ASSERT_TRUE(report.HasFatalErrors());
   ASSERT_EQ(kNumRecords, report.malformed_record_check->entries.size());
   for (const auto& mr : report.malformed_record_check->entries) {
@@ -1715,7 +1648,7 @@ TEST_P(LogBlockManagerTest, TestDetectMisalignedBlocks) {
   NO_FATALS(GetOnlyContainer(&container_name));
 
   // Add some misaligned blocks.
-  auto corruptor = LBMCorruptor::Create(env_, dd_manager_.get(), SeedRandom());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), 
SeedRandom());
   ASSERT_OK(corruptor->Init());
   for (int i = 0; i < kNumBlocks; i++) {
     ASSERT_OK(corruptor->AddMisalignedBlockToContainer());
@@ -1723,7 +1656,7 @@ TEST_P(LogBlockManagerTest, TestDetectMisalignedBlocks) {
 
   // Check the report.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   ASSERT_FALSE(report.HasFatalErrors());
   ASSERT_EQ(kNumBlocks, report.misaligned_block_check->entries.size());
   uint64_t fs_block_size;
@@ -1735,7 +1668,7 @@ TEST_P(LogBlockManagerTest, TestDetectMisalignedBlocks) {
   NO_FATALS(AssertEmptyReport(report));
 }
 
-TEST_P(LogBlockManagerNativeMetaTest, TestRepairPartialRecords) {
+TEST_P(LogBlockManagerTest, TestRepairPartialRecords) {
   const int kNumContainers = 50;
   const int kNumRecords = 10;
 
@@ -1754,7 +1687,7 @@ TEST_P(LogBlockManagerNativeMetaTest, 
TestRepairPartialRecords) {
   ASSERT_EQ(kNumContainers, container_names.size());
 
   // Add some partial records.
-  auto corruptor = LBMCorruptor::Create(env_, dd_manager_.get(), SeedRandom());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), 
SeedRandom());
   ASSERT_OK(corruptor->Init());
   for (int i = 0; i < kNumRecords; i++) {
     ASSERT_OK(corruptor->AddPartialRecordToContainer());
@@ -1762,7 +1695,7 @@ TEST_P(LogBlockManagerNativeMetaTest, 
TestRepairPartialRecords) {
 
   // Check the report.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report));
   ASSERT_FALSE(report.HasFatalErrors());
   ASSERT_EQ(kNumRecords, report.partial_record_check->entries.size());
   unordered_set<string> container_name_set(container_names.begin(),
@@ -1812,7 +1745,7 @@ TEST_P(LogBlockManagerTest, 
TestDeleteDeadContainersAtStartup) {
   ASSERT_FALSE(env_->FileExists(metadata_file_name));
 }
 
-TEST_P(LogBlockManagerNativeMetaTest, 
TestCompactFullContainerMetadataAtStartup) {
+TEST_P(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup) {
   // With this ratio, the metadata of a full container comprised of half dead
   // blocks will be compacted at startup.
   FLAGS_log_container_live_metadata_before_compact_ratio = 0.50;
@@ -1876,7 +1809,7 @@ TEST_P(LogBlockManagerNativeMetaTest, 
TestCompactFullContainerMetadataAtStartup)
 //
 // The bug was related to a stale file descriptor left in the file_cache, so
 // this test explicitly targets that scenario.
-TEST_P(LogBlockManagerNativeMetaTest, 
TestDeleteFromContainerAfterMetadataCompaction) {
+TEST_P(LogBlockManagerTest, TestDeleteFromContainerAfterMetadataCompaction) {
   // Compact aggressively.
   FLAGS_log_container_live_metadata_before_compact_ratio = 0.99;
   // Use a single shard so that we have an accurate max cache capacity
@@ -2099,7 +2032,7 @@ TEST_P(LogBlockManagerTest, 
TestDeleteDeadContainersByDeletionTransaction) {
     }
     {
       // The last block makes a full container.
-      FLAGS_log_container_max_size = std::get<0>(GetParam()) ? 4097 : 1;
+      FLAGS_log_container_max_size = GetParam() ? 4097 : 1;
       unique_ptr<WritableBlock> writer;
       ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
       blocks.emplace_back(writer->id());
@@ -2119,12 +2052,10 @@ TEST_P(LogBlockManagerTest, 
TestDeleteDeadContainersByDeletionTransaction) {
 
     // Check the container files.
     string data_file_name;
-    NO_FATALS(GetOnlyContainerDataFile(&data_file_name));
     string metadata_file_name;
-    // The metadata file exists only when --block_manager=log.
-    if (FLAGS_block_manager == "log") {
-      NO_FATALS(GetOnlyContainerMetadataFile(&metadata_file_name));
-    }
+    NO_FATALS(GetOnlyContainerDataFile(&data_file_name));
+    NO_FATALS(GetOnlyContainerMetadataFile(&metadata_file_name));
+
     // Open the last block for reading.
     unique_ptr<ReadableBlock> reader;
     ASSERT_OK(bm_->OpenBlock(blocks[block_num-1], &reader));
@@ -2174,9 +2105,7 @@ TEST_P(LogBlockManagerTest, 
TestDeleteDeadContainersByDeletionTransaction) {
 
     // The container files should have been deleted.
     ASSERT_FALSE(env_->FileExists(data_file_name));
-    if (FLAGS_block_manager == "log") {
-      ASSERT_FALSE(env_->FileExists(metadata_file_name));
-    }
+    ASSERT_FALSE(env_->FileExists(metadata_file_name));
   };
 
   for (int i = 1; i < 4; ++i) {
@@ -2246,7 +2175,7 @@ TEST_P(LogBlockManagerTest, 
TestDoNotDeleteFakeDeadContainer) {
   Process(false);
 }
 
-TEST_P(LogBlockManagerNativeMetaTest, TestHalfPresentContainer) {
+TEST_P(LogBlockManagerTest, TestHalfPresentContainer) {
   BlockId block_id;
   string data_file_name;
   string metadata_file_name;
@@ -2499,331 +2428,5 @@ TEST_P(LogBlockManagerNativeMetaTest, 
TestHalfPresentContainer) {
   }
 }
 
-// Parameterize test cases:
-// +------------+---------------+
-// | encryption | block_manager |
-// +------------+---------------+
-// |    no      |     logr      |
-// +------------+---------------+
-// |    yes     |     logr      |
-// +------------+---------------+
-class LogBlockManagerRdbMetaTest : public LogBlockManagerTest {
-};
-INSTANTIATE_TEST_SUITE_P(EncryptionEnabled, LogBlockManagerRdbMetaTest,
-    ::testing::Values(make_tuple(false, "logr"),
-                      make_tuple(true, "logr")));
-
-TEST_P(LogBlockManagerRdbMetaTest, TestHalfPresentContainer) {
-  BlockId block_id;
-  string data_file_name;
-  Dir* pdir = nullptr;
-  string container_name;
-  MetricRegistry registry;
-  scoped_refptr<MetricEntity> entity = 
METRIC_ENTITY_server.Instantiate(&registry, "test");
-
-  const auto CreateContainer = [&] (bool create_block = false) {
-    ASSERT_OK(ReopenBlockManager(entity));
-    unique_ptr<WritableBlock> writer;
-    ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
-    block_id = writer->id();
-    if (create_block) {
-      ASSERT_OK(writer->Append("a"));
-    }
-    ASSERT_OK(writer->Finalize());
-    ASSERT_OK(writer->Close());
-
-    // Get the data file name and container name.
-    NO_FATALS(GetOnlyContainerDataFile(&data_file_name));
-    pdir = dd_manager_->FindDirByFullPathForTests(data_file_name);
-    ASSERT_NE(pdir, nullptr);
-    string file_name = BaseName(data_file_name);
-    vector<string> tmp = Split(file_name, ".", SkipEmpty());
-    ASSERT_FALSE(tmp.empty());
-    container_name = tmp[0];
-  };
-
-  const auto CreateDataFile = [&] () {
-    // We're often recreating an existing file, so we must invalidate any
-    // entry in the file cache first.
-    file_cache_.Invalidate(data_file_name);
-
-    unique_ptr<WritableFile> data_file_writer;
-    WritableFileOptions opts;
-    opts.is_sensitive = true;
-    ASSERT_OK(env_->NewWritableFile(opts, data_file_name, &data_file_writer));
-    data_file_writer->Close();
-  };
-
-  const auto DeleteBlock = [&] () {
-    shared_ptr<BlockDeletionTransaction> transaction = 
bm_->NewDeletionTransaction();
-    transaction->AddDeletedBlock(block_id);
-    ASSERT_OK(transaction->CommitDeletedBlocks(nullptr));
-    transaction.reset();
-    dd_manager_->WaitOnClosures();
-  };
-
-  const auto CheckOK = [&] () {
-    FsReport report;
-    ASSERT_OK(ReopenBlockManager(entity, &report));
-    ASSERT_FALSE(report.HasFatalErrors());
-    NO_FATALS(AssertEmptyReport(report));
-    pdir = dd_manager_->FindDirByFullPathForTests(data_file_name);
-    ASSERT_NE(pdir, nullptr);
-  };
-
-  const auto CheckRepaired = [&] (const string& type) {
-    FsReport report;
-    ASSERT_OK(ReopenBlockManager(entity, &report));
-    ASSERT_FALSE(report.HasFatalErrors());
-    if (type == "incomplete_container_check") {
-      ASSERT_EQ(1, report.incomplete_container_check->entries.size());
-      report.incomplete_container_check->entries.clear();
-    } else if (type == "corrupted_rdb_record_check") {
-      ASSERT_EQ(1, report.corrupted_rdb_record_check->entries.size());
-      report.corrupted_rdb_record_check->entries.clear();
-    }
-    NO_FATALS(AssertEmptyReport(report));
-    pdir = dd_manager_->FindDirByFullPathForTests(data_file_name);
-    ASSERT_NE(pdir, nullptr);
-  };
-
-  const auto WriteBadMetadata = [&] (
-      const string& container_name, const BlockId& block_id) {
-    string key(LogBlockManagerRdbMeta::ConstructRocksDBKey(container_name, 
block_id));
-    rocksdb::Slice slice_key(key);
-    string value = "bad_value";
-    rocksdb::Status s = down_cast<RdbDir*>(pdir)->rdb()->Put(
-        rocksdb::WriteOptions(), slice_key, rocksdb::Slice(value));
-    ASSERT_OK(FromRdbStatus(s));
-  };
-
-  const auto MetadataEntriesCount = [&] (const string& container_name) -> int {
-    rocksdb::Slice begin_key = container_name;
-    int count = 0;
-    auto* rdb_dir = down_cast<RdbDir*>(pdir);
-    unique_ptr<rocksdb::Iterator> it(rdb_dir->rdb()->NewIterator(
-        rocksdb::ReadOptions(), rdb_dir->rdb()->DefaultColumnFamily()));
-    it->Seek(begin_key);
-    while (it->Valid() && it->key().starts_with(begin_key)) {
-      count++;
-      it->Next();
-    }
-    CHECK_OK(FromRdbStatus(it->status()));
-    return count;
-  };
-
-  // Case1: the metadata in rdb is empty and
-  //        the size of the existing data file is 0.
-  {
-    // Create a container.
-    NO_FATALS(CreateContainer());
-
-    // Delete the metadata part.
-    ASSERT_OK(LogBlockManagerRdbMeta::DeleteContainerMetadata(pdir, 
container_name));
-
-    // The container has been repaired.
-    NO_FATALS(CheckRepaired("incomplete_container_check"));
-
-    // Data file has been removed, metadata is empty.
-    ASSERT_FALSE(env_->FileExists(data_file_name));
-    ASSERT_EQ(0, MetadataEntriesCount(container_name));
-  }
-
-  // Case2: the metadata in rdb is empty and
-  //        the size of the existing data file is >0.
-  {
-    // Create a container.
-    NO_FATALS(CreateContainer(true));
-
-    // Delete the metadata part.
-    ASSERT_OK(LogBlockManagerRdbMeta::DeleteContainerMetadata(pdir, 
container_name));
-
-    // The container has been repaired.
-    NO_FATALS(CheckOK());
-
-    // Data file exists, but metadata is empty.
-    ASSERT_TRUE(env_->FileExists(data_file_name));
-    ASSERT_EQ(0, MetadataEntriesCount(container_name));
-
-    // Delete the data file to keep path clean.
-    ASSERT_OK(env_->DeleteFile(data_file_name));
-  }
-
-  // Case3: the metadata in rdb is empty and
-  //        the data file has gone missing.
-  {
-    // Create a container.
-    NO_FATALS(CreateContainer());
-
-    // Delete the data file&metadata part, and keep the path.
-    ASSERT_OK(env_->DeleteFile(data_file_name));
-    ASSERT_OK(LogBlockManagerRdbMeta::DeleteContainerMetadata(pdir, 
container_name));
-
-    // The container is not exist.
-    NO_FATALS(CheckOK());
-
-    // Data file has been removed, metadata is empty.
-    ASSERT_FALSE(env_->FileExists(data_file_name));
-    ASSERT_EQ(0, MetadataEntriesCount(container_name));
-  }
-
-  // Case4: the metadata in rdb has bad value and
-  //        the size of the existing data file is 0.
-  {
-    // Create a container.
-    NO_FATALS(CreateContainer());
-
-    // Delete the metadata part.
-    ASSERT_OK(LogBlockManagerRdbMeta::DeleteContainerMetadata(pdir, 
container_name));
-
-    // Create a metadata record with bad value.
-    WriteBadMetadata(container_name, block_id);
-    ASSERT_EQ(1, MetadataEntriesCount(container_name));
-
-    // The container has been repaired.
-    NO_FATALS(CheckRepaired("incomplete_container_check"));
-
-    // Data file has been removed, metadata is empty.
-    ASSERT_FALSE(env_->FileExists(data_file_name));
-    ASSERT_EQ(0, MetadataEntriesCount(container_name));
-  }
-
-  // Case5: the metadata in rdb has bad value and
-  //        the size of the existing data file is >0.
-  {
-    // Create a container.
-    NO_FATALS(CreateContainer(true));
-
-    // Delete the metadata part.
-    ASSERT_OK(LogBlockManagerRdbMeta::DeleteContainerMetadata(pdir, 
container_name));
-
-    // Create a metadata record with bad value.
-    WriteBadMetadata(container_name, block_id);
-    ASSERT_EQ(1, MetadataEntriesCount(container_name));
-
-    // The container has been repaired.
-    NO_FATALS(CheckRepaired("corrupted_rdb_record_check"));
-
-    // Data file exists, metadata is empty.
-    ASSERT_TRUE(env_->FileExists(data_file_name));
-    ASSERT_EQ(0, MetadataEntriesCount(container_name));
-  }
-
-  // Case6: the existing metadata part has no live blocks and
-  //        the data file has gone missing.
-  {
-    NO_FATALS(CreateContainer(true));
-
-    // Delete the only block.
-    NO_FATALS(DeleteBlock());
-
-    // Delete the data file.
-    ASSERT_OK(env_->DeleteFile(data_file_name));
-
-    // The container is not exist.
-    NO_FATALS(CheckOK());
-
-    // Data file has been removed, metadata is empty.
-    ASSERT_FALSE(env_->FileExists(data_file_name));
-    ASSERT_EQ(0, MetadataEntriesCount(container_name));
-  }
-
-  // Case7: the existing metadata part has no live blocks and
-  //        the size of the existing data file is 0.
-  {
-    NO_FATALS(CreateContainer(true));
-
-    // Delete the only block.
-    NO_FATALS(DeleteBlock());
-
-    // Delete the data file.
-    ASSERT_OK(env_->DeleteFile(data_file_name));
-
-    // Create an empty data file.
-    NO_FATALS(CreateDataFile());
-
-    // The container has been repaired.
-    NO_FATALS(CheckRepaired("incomplete_container_check"));
-
-    // Data file has been removed, metadata is empty.
-    ASSERT_FALSE(env_->FileExists(data_file_name));
-    ASSERT_EQ(0, MetadataEntriesCount(container_name));
-  }
-
-  // Case8: the existing metadata part has no live blocks and
-  //        the size of the existing data file is >0.
-  {
-    NO_FATALS(CreateContainer(true));
-
-    // Delete the only block.
-    NO_FATALS(DeleteBlock());
-
-    // The container is ok.
-    NO_FATALS(CheckOK());
-
-    // Data file exists, but metadata is empty.
-    ASSERT_TRUE(env_->FileExists(data_file_name));
-    ASSERT_EQ(0, MetadataEntriesCount(container_name));
-
-    // Delete the data file to keep path clean.
-    ASSERT_OK(env_->DeleteFile(data_file_name));
-  }
-
-  // TODO(yingchun): need to clear up dead metadata
-  // Case9: the existing metadata part has live blocks and
-  //        the data file has gone missing.
-  {
-    // Create a container.
-    NO_FATALS(CreateContainer(true));
-
-    // Delete the data file.
-    ASSERT_OK(env_->DeleteFile(data_file_name));
-
-    // The container is OK (in fact, it is considered as not exist, because it 
is judged according
-    // to the existence of the data file).
-    NO_FATALS(CheckOK());
-
-    // Data file has been removed, metadata has 1 record.
-    ASSERT_FALSE(env_->FileExists(data_file_name));
-    ASSERT_EQ(1, MetadataEntriesCount(container_name));
-
-    // Delete the metadata part to keep it clean.
-    ASSERT_OK(LogBlockManagerRdbMeta::DeleteContainerMetadata(pdir, 
container_name));
-  }
-
-  // Case10: the existing metadata part has live blocks and
-  //         the size of the existing data file is 0.
-  {
-    // Create a container.
-    NO_FATALS(CreateContainer(true));
-
-    // Delete the data file.
-    ASSERT_OK(env_->DeleteFile(data_file_name));
-
-    // Create an empty data file.
-    NO_FATALS(CreateDataFile());
-
-    // The container has been repaired.
-    NO_FATALS(CheckRepaired("incomplete_container_check"));
-
-    // Data file has been removed, metadata is empty.
-    ASSERT_FALSE(env_->FileExists(data_file_name));
-    ASSERT_EQ(0, MetadataEntriesCount(container_name));
-  }
-
-  // Case11: the existing metadata part has live blocks and
-  //         the size of the existing data file is >0.
-  {
-    // Create a container.
-    NO_FATALS(CreateContainer(true));
-
-    // The container is ok.
-    NO_FATALS(CheckOK());
-
-    ASSERT_TRUE(env_->FileExists(data_file_name));
-    ASSERT_EQ(1, MetadataEntriesCount(container_name));
-  }
-}
-
 } // namespace fs
 } // namespace kudu
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 729393474..d8b43e742 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -39,12 +39,6 @@
 
 #include <gflags/gflags.h>
 #include <glog/logging.h>
-#include <rocksdb/db.h>
-#include <rocksdb/iterator.h>
-#include <rocksdb/options.h>
-#include <rocksdb/slice.h>
-#include <rocksdb/status.h>
-#include <rocksdb/write_batch.h>
 
 #include "kudu/fs/block_manager_metrics.h"
 #include "kudu/fs/data_dirs.h"
@@ -57,7 +51,6 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/numbers.h"
-#include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/strip.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -118,12 +111,6 @@ DEFINE_uint64(log_container_preallocate_bytes, 32LU * 1024 
* 1024,
               "creating new blocks. Set to 0 to disable preallocation");
 TAG_FLAG(log_container_preallocate_bytes, advanced);
 
-DEFINE_uint64(log_container_rdb_delete_batch_count, 256,
-              "The batch count for deleting blocks in one operation against 
RocksDB. It is only "
-              "effective when --block_manager='logr'");
-TAG_FLAG(log_container_rdb_delete_batch_count, experimental);
-TAG_FLAG(log_container_rdb_delete_batch_count, advanced);
-
 DEFINE_double(log_container_excess_space_before_cleanup_fraction, 0.10,
               "Additional fraction of a log container's calculated size that "
               "must be consumed on disk before the container is considered to "
@@ -219,7 +206,6 @@ METRIC_DEFINE_gauge_uint64(server, 
log_block_manager_processed_containers_startu
 using kudu::fs::internal::LogBlock;
 using kudu::fs::internal::LogBlockContainer;
 using kudu::fs::internal::LogBlockContainerNativeMeta;
-using kudu::fs::internal::LogBlockContainerRdbMeta;
 using kudu::fs::internal::LogBlockDeletionTransaction;
 using kudu::fs::internal::LogWritableBlock;
 using kudu::pb_util::ReadablePBContainerFile;
@@ -234,8 +220,6 @@ using std::unique_ptr;
 using std::unordered_map;
 using std::unordered_set;
 using std::vector;
-using strings::SkipEmpty;
-using strings::Split;
 using strings::Substitute;
 
 namespace kudu {
@@ -898,92 +882,6 @@ class LogBlockContainerNativeMeta final : public 
LogBlockContainer {
   DISALLOW_COPY_AND_ASSIGN(LogBlockContainerNativeMeta);
 };
 
-////////////////////////////////////////////////////////////
-// LogBlockContainerRdbMeta
-////////////////////////////////////////////////////////////
-
-// The metadata part is stored in a directory-wide shared RocksDB instance, 
located in
-// "<dir>/rdb/", the keys are prefixed by "<id>.", where the "id" is the 
container's ID.
-class LogBlockContainerRdbMeta final : public LogBlockContainer {
- public:
-  // Creates a new LogBlockContainer managed by 'block_manager' in 'dir', 
'container' will be set as
-  // the newly created container.
-  //
-  // Returns Status::OK() if created successfully, otherwise returns an error.
-  static Status Create(LogBlockManager* block_manager,
-                       Dir* dir,
-                       LogBlockContainerRefPtr* container);
-
-  // Opens an existing LogBlockContainer in 'dir'.
-  //
-  // Every container is comprised of two parts: "<dir>/<id>.data" and
-  // metadata (i.e. key-value pairs in the RocksDB instance). Together,
-  // 'dir' and 'id' fully describe both parts.
-  //
-  // Returns Status::Aborted() in the case that the data part appears to have 
no
-  // data (e.g. due to a crash just after creating it but before writing any
-  // records). This is recorded in 'report'.
-  static Status Open(LogBlockManager* block_manager,
-                     Dir* dir,
-                     FsReport* report,
-                     const string& id,
-                     LogBlockContainerRefPtr* container);
-
-  // Check the container whether it is fine.
-  // Only the data part is checked, the metadata part (i.e. the RocksDB 
instance) has been checked
-  // when open the container, it would fail immediately if it's unhealthy when 
open the data
-  // directory.
-  //
-  // OK: data file of the container exist;
-  // Aborted: the container will be repaired later;
-  // NotFound: data file of the container has gone missing;
-  //
-  // Note: the status here only represents the result of check.
-  static Status CheckContainerFiles(LogBlockManager* block_manager,
-                                    FsReport* report,
-                                    const Dir* dir,
-                                    const string& common_path,
-                                    const string& data_path);
-
-  LogBlockContainerRdbMeta(LogBlockManager* block_manager,
-                           Dir* data_dir,
-                           string id,
-                           shared_ptr<RWFile> data_file,
-                           uint64_t data_file_size);
-
-  ~LogBlockContainerRdbMeta() override;
-
-  Status RemoveBlockIdsFromMetadata(const vector<LogBlockRefPtr>& lbs,
-                                    vector<BlockId>* deleted_block_ids) 
override;
-
-  Status AddBlockIdsToMetadata(const vector<LogWritableBlock*>& blocks) 
override;
-
-  Status SyncMetadata() override;
-
-  Status ProcessRecords(
-      FsReport* report,
-      LogBlockManager::UntrackedBlockMap* live_blocks,
-      LogBlockManager::BlockRecordMap* live_block_records,
-      vector<LogBlockRefPtr>* dead_blocks,
-      uint64_t* max_block_id,
-      ProcessRecordType type) override;
-
- private:
-  void ProcessDeleteRecord(
-      BlockRecordPB* /*record*/,
-      FsReport* /*report*/,
-      LogBlockManager::UntrackedBlockMap* /*live_blocks*/,
-      LogBlockManager::BlockRecordMap* /*live_block_records*/,
-      vector<LogBlockRefPtr>* /*dead_blocks*/,
-      ProcessRecordType /*type*/) override {
-    LOG(FATAL) << Substitute("Container $0 contains a DELETE type record", 
ToString());
-  }
-
-  rocksdb::DB* const rdb_;
-
-  DISALLOW_COPY_AND_ASSIGN(LogBlockContainerRdbMeta);
-};
-
 #define CONTAINER_DISK_FAILURE(status_expr, msg) do { \
   Status s_ = (status_expr); \
   HANDLE_DISK_FAILURE(s_, 
block_manager_->error_manager_->RunErrorNotificationCb( \
@@ -1088,7 +986,6 @@ void LogBlockContainerNativeMeta::CompactMetadata() {
   report.malformed_record_check.emplace();
   report.misaligned_block_check.emplace();
   report.partial_record_check.emplace();
-  report.corrupted_rdb_record_check.emplace();
 
   LogBlockManager::UntrackedBlockMap live_blocks;
   LogBlockManager::BlockRecordMap live_block_records;
@@ -1878,300 +1775,6 @@ void LogBlockContainer::ContainerDeletionAsync(int64_t 
offset, int64_t length) {
                             data_dir()->dir()));
 }
 
-Status LogBlockContainerRdbMeta::Create(LogBlockManager* block_manager,
-                                        Dir* dir,
-                                        LogBlockContainerRefPtr* container) {
-  DCHECK(container);
-  DCHECK(!container->get());
-  string id;
-  string common_path;
-  string data_path;
-  Status metadata_status;
-  Status data_status;
-  shared_ptr<RWFile> data_file;
-
-  // Repeat in the event of a container id collision (unlikely).
-  //
-  // When looping, we delete any created-and-orphaned records.
-  do {
-    id = block_manager->oid_generator()->Next();
-    common_path = JoinPathSegments(dir->dir(), id);
-    data_path = StrCat(common_path, LogBlockManager::kContainerDataFileSuffix);
-    metadata_status = LogBlockManagerRdbMeta::DeleteContainerMetadata(dir, id);
-    WARN_NOT_OK(metadata_status, "could not delete all orphaned metadata");
-    if (PREDICT_TRUE(block_manager->file_cache_)) {
-      if (data_file) {
-        WARN_NOT_OK(block_manager->file_cache_->DeleteFile(data_path),
-                    "could not delete orphaned data file thru file cache");
-      }
-      data_status = block_manager->file_cache_->OpenFile<Env::MUST_CREATE>(
-          data_path, &data_file);
-    } else {
-      if (data_file) {
-        WARN_NOT_OK(block_manager->env()->DeleteFile(data_path),
-                    "could not delete orphaned data file");
-      }
-      unique_ptr<RWFile> rwf;
-      RWFileOptions rw_opts;
-
-      rw_opts.mode = Env::MUST_CREATE;
-      rw_opts.is_sensitive = true;
-      data_status = block_manager->env()->NewRWFile(
-          rw_opts, data_path, &rwf);
-      data_file.reset(rwf.release());
-    }
-  } while (PREDICT_FALSE(data_status.IsAlreadyPresent()));
-  if (metadata_status.ok() && data_status.ok()) {
-    container->reset(new LogBlockContainerRdbMeta(block_manager,
-                                                  dir,
-                                                  id,
-                                                  std::move(data_file),
-                                                  0 /*data_file_size*/));
-    VLOG(1) << "Created log block container " << (*container)->ToString();
-  }
-
-  // Prefer metadata status (arbitrarily).
-  auto em = block_manager->error_manager();
-  HANDLE_DISK_FAILURE(metadata_status, em->RunErrorNotificationCb(
-      ErrorHandlerType::DISK_ERROR, dir, block_manager->tenant_id()));
-  HANDLE_DISK_FAILURE(data_status, em->RunErrorNotificationCb(
-      ErrorHandlerType::DISK_ERROR, dir, block_manager->tenant_id()));
-  return !metadata_status.ok() ? metadata_status : data_status;
-}
-
-Status LogBlockContainerRdbMeta::Open(LogBlockManager* block_manager,
-                                      Dir* dir,
-                                      FsReport* report,
-                                      const string& id,
-                                      LogBlockContainerRefPtr* container) {
-  DCHECK(container);
-  DCHECK(!container->get());
-  string common_path = JoinPathSegments(dir->dir(), id);
-  string data_path = StrCat(common_path, 
LogBlockManager::kContainerDataFileSuffix);
-  RETURN_NOT_OK(CheckContainerFiles(block_manager, report, dir, common_path, 
data_path));
-
-  // Open the existing data file for writing.
-  shared_ptr<RWFile> data_file;
-  if (PREDICT_TRUE(block_manager->file_cache_)) {
-    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(
-        block_manager->file_cache_->OpenFile<Env::MUST_EXIST>(data_path, 
&data_file));
-  } else {
-    RWFileOptions opts;
-    opts.mode = Env::MUST_EXIST;
-    opts.is_sensitive = true;
-    unique_ptr<RWFile> rwf;
-    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->env()->NewRWFile(
-        opts, data_path, &rwf));
-    data_file.reset(rwf.release());
-  }
-
-  uint64_t data_file_size;
-  RETURN_NOT_OK_CONTAINER_DISK_FAILURE(data_file->Size(&data_file_size));
-
-  // Create the in-memory container and populate it.
-  LogBlockContainerRefPtr open_container(new LogBlockContainerRdbMeta(
-      block_manager, dir, id, std::move(data_file), data_file_size));
-  VLOG(1) << "Opened log block container " << open_container->ToString();
-  container->swap(open_container);
-  return Status::OK();
-}
-
-Status LogBlockContainerRdbMeta::CheckContainerFiles(LogBlockManager* 
block_manager,
-                                                     FsReport* report,
-                                                     const Dir* dir,
-                                                     const string& common_path,
-                                                     const string& data_path) {
-  Env* env = block_manager->env();
-  uint64_t data_size = 0;
-  Status s_data = env->GetFileSize(data_path, &data_size);
-  if (!s_data.ok() && !s_data.IsNotFound()) {
-    s_data = s_data.CloneAndPrepend("unable to determine data file size");
-    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(s_data);
-  }
-
-  const auto kEncryptionHeaderSize = env->GetEncryptionHeaderSize();
-
-  // TODO(yingchun): In the case of data file doesn't exist but metadata 
records in RocksDB exist,
-  //                 the garbage data in RocksDB has no chance to be cleaned 
up, because the
-  //                 container is not found (since the data file is not found) 
when loading
-  //                 containers. Consider to clean up these garbage data.
-  // Check data file exists and has valid length.
-  if (PREDICT_FALSE(data_size <= kEncryptionHeaderSize)) {
-    report->incomplete_container_check->entries.emplace_back(common_path);
-    return Status::Aborted(Substitute("orphaned empty or invalid length file 
$0", common_path));
-  }
-
-  return Status::OK();
-}
-
-LogBlockContainerRdbMeta::LogBlockContainerRdbMeta(
-    LogBlockManager* block_manager,
-    Dir* data_dir,
-    string id,
-    shared_ptr<RWFile> data_file,
-    uint64_t data_file_size)
-    : LogBlockContainer(block_manager, data_dir, std::move(id),
-                        std::move(data_file), data_file_size),
-      rdb_(down_cast<RdbDir*>(data_dir)->rdb()) {
-}
-
-LogBlockContainerRdbMeta::~LogBlockContainerRdbMeta() {
-  if (dead()) {
-    CHECK(!block_manager_->opts_.read_only);
-    string data_file_name = data_file_->filename();
-    string data_failure_msg =
-        "Could not delete dead container data file " + data_file_name;
-    
CONTAINER_DISK_FAILURE(LogBlockManagerRdbMeta::DeleteContainerMetadata(data_dir_,
 id_),
-                           "Could not delete dead container metadata records " 
+ id_);
-    if (PREDICT_TRUE(block_manager_->file_cache_)) {
-      
CONTAINER_DISK_FAILURE(block_manager_->file_cache_->DeleteFile(data_file_name),
-                             data_failure_msg);
-    } else {
-      CONTAINER_DISK_FAILURE(block_manager_->env_->DeleteFile(data_file_name),
-                             data_failure_msg);
-    }
-  }
-  data_file_.reset();
-}
-
-Status LogBlockContainerRdbMeta::ProcessRecords(
-    FsReport* report,
-    LogBlockManager::UntrackedBlockMap* live_blocks,
-    LogBlockManager::BlockRecordMap* live_block_records,
-    vector<LogBlockRefPtr>* dead_blocks,
-    uint64_t* max_block_id,
-    ProcessRecordType type) {
-  int count = 0;
-  SCOPED_LOG_SLOW_EXECUTION(INFO, 100, Substitute("process $0 records in $1",
-                                                  count, data_dir_->dir()));
-  rocksdb::Slice begin_key = id_;
-  string next_id = ObjectIdGenerator::NextOf(id_);
-  rocksdb::Slice end_key = next_id;
-  rocksdb::ReadOptions scan_opt;
-  // Take advantage of Prefix-Seek, see 
https://github.com/facebook/rocksdb/wiki/Prefix-Seek.
-  scan_opt.prefix_same_as_start = true;
-  scan_opt.total_order_seek = false;
-  scan_opt.auto_prefix_mode = false;
-  scan_opt.readahead_size = 2 << 20;  // TODO(yingchun): make this 
configurable.
-  scan_opt.iterate_upper_bound = &end_key;
-
-  uint64_t data_file_size = 0;
-  BlockRecordPB record;
-  unique_ptr<rocksdb::Iterator> it(rdb_->NewIterator(scan_opt));
-  it->Seek(begin_key);
-  while (it->Valid() && it->key().starts_with(begin_key)) {
-    if (PREDICT_FALSE(!record.ParseFromArray(it->value().data(), 
it->value().size()))) {
-      report->corrupted_rdb_record_check->entries.emplace_back(ToString(), 
it->key().ToString());
-      LOG(WARNING) << Substitute("Parse metadata record in rocksdb failed, 
path: $0, key: $1",
-                                 ToString(),
-                                 it->key().ToString());
-      it->Next();
-      continue;
-    }
-
-    RETURN_NOT_OK(ProcessRecord(&record, report,
-                                live_blocks, live_block_records, dead_blocks,
-                                &data_file_size, max_block_id, type));
-    it->Next();
-    count++;
-  }
-
-  Status s = FromRdbStatus(it->status());
-  if (PREDICT_FALSE(!s.ok())) {
-    WARN_NOT_OK(s, Substitute("failed to read metadata records from RocksDB 
$0", ToString()));
-    HandleError(s);
-    return s;
-  }
-
-  return Status::OK();
-}
-
-Status LogBlockContainerRdbMeta::RemoveBlockIdsFromMetadata(
-    const vector<LogBlockRefPtr>& lbs, vector<BlockId>* deleted_block_ids) {
-  RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
-  // Note: We don't check for sufficient disk space for metadata writes in
-  // order to allow for block deletion on full disks.
-  DCHECK(deleted_block_ids);
-  // Perform batch delete has a better performance than single deletes.
-  rocksdb::WriteBatch batch;
-  // The 'keys' is used to keep the lifetime of the data referenced by Slices 
in 'batch'.
-  vector<string> keys;
-  keys.reserve(lbs.size());
-  for (const auto& lb : lbs) {
-    deleted_block_ids->emplace_back(lb->block_id());
-
-    // Construct key.
-    keys.emplace_back(LogBlockManagerRdbMeta::ConstructRocksDBKey(id_, 
lb->block_id()));
-
-    // Add the delete operation to batch.
-    
RETURN_NOT_OK_HANDLE_ERROR(FromRdbStatus(batch.Delete(rocksdb::Slice(keys.back()))));
-
-    // Tune --log_container_rdb_delete_batch_count to achieve better 
performance.
-    if (batch.Count() >= FLAGS_log_container_rdb_delete_batch_count) {
-      RETURN_NOT_OK_HANDLE_ERROR(FromRdbStatus(rdb_->Write({}, &batch)));
-      batch.Clear();
-      keys.clear();
-    }
-  }
-
-  if (batch.Count() > 0) {
-    RETURN_NOT_OK_HANDLE_ERROR(FromRdbStatus(rdb_->Write({}, &batch)));
-  }
-
-  return Status::OK();
-}
-
-Status LogBlockContainerRdbMeta::AddBlockIdsToMetadata(
-    const vector<LogWritableBlock*>& blocks) {
-  RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
-  BlockRecordPB record;
-  record.set_op_type(CREATE);
-  record.set_timestamp_us(GetCurrentTimeMicros());
-
-  // Perform batch put has a better performance than single puts.
-  rocksdb::WriteBatch batch;
-  // The 'keys' and 'values' are used to keep the lifetime of the data 
referenced by Slices
-  // in 'batch'.
-  vector<string> keys;
-  keys.reserve(blocks.size());
-  vector<string> values;
-  values.reserve(blocks.size());
-  for (const auto* block : blocks) {
-    // Construct key.
-    keys.emplace_back(LogBlockManagerRdbMeta::ConstructRocksDBKey(id_, 
block->id()));
-
-    // Construct value.
-    block->id().CopyToPB(record.mutable_block_id());
-    record.set_offset(block->block_offset());
-    record.set_length(block->block_length());
-    string value;
-    record.SerializeToString(&value);
-    values.emplace_back(value);
-
-    // Add the put operation to batch.
-    RETURN_NOT_OK_HANDLE_ERROR(FromRdbStatus(batch.Put(
-        rocksdb::Slice(keys.back()), rocksdb::Slice(values.back()))));
-  }
-
-  // TODO(yingchun): Add a flag similar to 
--log_container_rdb_delete_batch_count to add metadata
-  //  records in batch.
-  rocksdb::Status s = rdb_->Write({}, &batch);
-  RETURN_NOT_OK_HANDLE_ERROR(FromRdbStatus(s));
-
-  return Status::OK();
-}
-
-Status LogBlockContainerRdbMeta::SyncMetadata() {
-  VLOG(3) << "Syncing WAL of RocksDB in " << data_dir_->dir();
-  // TODO(yingchun): It's too costly to
-//  RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
-//  if (FLAGS_enable_data_block_fsync) {
-//    if (metrics_) metrics_->generic_metrics.total_disk_sync->Increment();
-//    RETURN_NOT_OK_HANDLE_ERROR(FromRdbStatus(rdb_->SyncWAL()));
-//  }
-  return Status::OK();
-}
-
 ///////////////////////////////////////////////////////////
 // LogBlockCreationTransaction
 ////////////////////////////////////////////////////////////
@@ -2372,7 +1975,6 @@ struct LogBlockContainerLoadResult {
     report.malformed_record_check.emplace();
     report.misaligned_block_check.emplace();
     report.partial_record_check.emplace();
-    report.corrupted_rdb_record_check.emplace();
   }
 };
 
@@ -3384,17 +2986,8 @@ void LogBlockManager::OpenDataDir(
     Status* result_status,
     std::atomic<int>* containers_processed,
     std::atomic<int>* containers_total) {
-  Status s = dir->Prepare();
-  if (!s.ok()) {
-    HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
-        ErrorHandlerType::DISK_ERROR, dir));
-    *result_status = s.CloneAndPrepend(Substitute(
-        "Could not initialize $0", dir->dir()));
-    return;
-  }
-
   vector<string> children;
-  s = env_->GetChildren(dir->dir(), &children);
+  Status s = env_->GetChildren(dir->dir(), &children);
   if (!s.ok()) {
     HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
         ErrorHandlerType::DISK_ERROR, dir, tenant_id()));
@@ -3405,7 +2998,7 @@ void LogBlockManager::OpenDataDir(
 
   // Find all containers and open them.
   unordered_set<string> containers_seen;
-  results->reserve(EstimateContainerCount(children.size()));
+  results->reserve(children.size() / 2);
   for (const string& child : children) {
     string container_name;
     if (!TryStripSuffixString(
@@ -3668,6 +3261,7 @@ void LogBlockManagerNativeMeta::FindContainersToRepair(
   }
 }
 
+
 Status LogBlockManager::DoRepair(
     Dir* /*dir*/,
     FsReport* report,
@@ -3989,135 +3583,5 @@ int64_t LogBlockManager::LookupBlockLimit(int64_t 
fs_block_size) {
   return kPerFsBlockSizeBlockLimits.begin()->second;
 }
 
-Status LogBlockManagerRdbMeta::CreateContainer(Dir* dir, 
LogBlockContainerRefPtr* container) {
-  return LogBlockContainerRdbMeta::Create(this, dir, container);
-}
-
-Status LogBlockManagerRdbMeta::OpenContainer(Dir* dir,
-                                             FsReport* report,
-                                             const string& id,
-                                             LogBlockContainerRefPtr* 
container) {
-  return LogBlockContainerRdbMeta::Open(this, dir, report, id, container);
-}
-
-void LogBlockManagerRdbMeta::FindContainersToRepair(
-    FsReport* report,
-    const ContainerBlocksByName& low_live_block_containers,
-    ContainersByName* containers_by_name) {
-  LogBlockManager::FindContainersToRepair(report, low_live_block_containers, 
containers_by_name);
-  if (report->corrupted_rdb_record_check) {
-    for (const auto& e : report->corrupted_rdb_record_check->entries) {
-      LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_, 
e.container);
-      if (c) {
-        (*containers_by_name)[e.container] = c;
-      }
-    }
-  }
-}
-
-Status LogBlockManagerRdbMeta::DoRepair(
-    Dir* dir,
-    FsReport* report,
-    const ContainerBlocksByName& low_live_block_containers,
-    const ContainersByName& containers_by_name) {
-  RETURN_NOT_OK(LogBlockManager::DoRepair(dir, report, 
low_live_block_containers,
-                                          containers_by_name));
-  // Delete any incomplete container files.
-  //
-  // This is a non-fatal inconsistency; we can just as easily ignore the
-  // leftover container files.
-  if (report->incomplete_container_check) {
-    for (auto& ic : report->incomplete_container_check->entries) {
-      // Delete the metadata records.
-      // The ic.container is constructed as "<dir>/<cid>", where the "cid" is 
the container's id.
-      vector<string> dir_and_cid = Split(ic.container, "/", SkipEmpty());
-      CHECK(!dir_and_cid.empty());
-      const auto &cid = dir_and_cid.back();
-      WARN_NOT_OK_LBM_DISK_FAILURE(DeleteContainerMetadata(dir, cid),
-                                   "could not delete incomplete container 
metadata records");
-      // Delete the data file.
-      Status s = env_->DeleteFile(StrCat(ic.container, 
kContainerDataFileSuffix));
-      if (!s.ok() && !s.IsNotFound()) {
-        WARN_NOT_OK_LBM_DISK_FAILURE(s, "could not delete incomplete container 
data file");
-      }
-      ic.repaired = true;
-    }
-  }
-
-  // Delete any corrupted records from RocksDB.
-  //
-  // This is a non-fatal inconsistency; we can just as easily delete the
-  // records.
-  if (report->corrupted_rdb_record_check) {
-    for (auto& e : report->corrupted_rdb_record_check->entries) {
-      // TODO(yingchun): optimize the delete operations to delete in batch.
-      rocksdb::Slice key(e.rocksdb_key);
-      auto s = down_cast<RdbDir*>(dir)->rdb()->Delete({}, key);
-      WARN_NOT_OK_LBM_DISK_FAILURE(
-          FromRdbStatus(s),
-          Substitute("could not delete container corrupted metadata record 
$0", e.rocksdb_key));
-      e.repaired = true;
-    }
-  }
-
-  return Status::OK();
-}
-
-Status LogBlockManagerRdbMeta::DeleteContainerMetadata(Dir* dir, const string& 
id) {
-  rocksdb::Slice begin_key(id);
-  string next_id = ObjectIdGenerator::NextOf(id);
-  rocksdb::Slice end_key(next_id);
-  auto* rdb = down_cast<RdbDir*>(dir)->rdb();
-  // TODO(yingchun): DeleteRange() seems not stable, it is declared as now 
usable in production.
-  //  See 
https://github.com/facebook/rocksdb/blob/v7.7.3/include/rocksdb/db.h#L475
-  // auto s = rdb->DeleteRange(
-  //     rocksdb::WriteOptions(), rdb->DefaultColumnFamily(), begin_key, 
end_key);
-  int count = 0;
-  SCOPED_LOG_SLOW_EXECUTION(INFO, 100, Substitute("delete $0 metadata records 
from $1",
-                                                  count, dir->dir()));
-  // Perform batch delete has a better performance than single deletes.
-  rocksdb::WriteBatch batch;
-  // The 'keys' is used to keep the lifetime of the data referenced by Slices 
in 'batch'.
-  vector<string> keys;
-  keys.reserve(FLAGS_log_container_rdb_delete_batch_count);
-
-  rocksdb::ReadOptions scan_opt;
-  // Take advantage of Prefix-Seek, see 
https://github.com/facebook/rocksdb/wiki/Prefix-Seek.
-  scan_opt.prefix_same_as_start = true;
-  scan_opt.total_order_seek = false;
-  scan_opt.auto_prefix_mode = false;
-  scan_opt.readahead_size = 2 << 20;  // TODO(yingchun): make this 
configurable.
-  scan_opt.iterate_upper_bound = &end_key;
-  unique_ptr<rocksdb::Iterator> it(rdb->NewIterator(scan_opt));
-  it->Seek(begin_key);
-  while (it->Valid() && it->key().starts_with(begin_key)) {
-    keys.emplace_back(it->key().ToString());
-
-    // Add the delete operation to batch.
-    RETURN_NOT_OK(FromRdbStatus(batch.Delete(rocksdb::Slice(keys.back()))));
-
-    // Tune --log_container_rdb_delete_batch_count to achieve better 
performance.
-    if (batch.Count() >= FLAGS_log_container_rdb_delete_batch_count) {
-      RETURN_NOT_OK(FromRdbStatus(rdb->Write({}, &batch)));
-      batch.Clear();
-      keys.clear();
-    }
-
-    it->Next();
-    count++;
-  }
-
-  if (batch.Count() > 0) {
-    RETURN_NOT_OK(FromRdbStatus(rdb->Write({}, &batch)));
-  }
-
-  return Status::OK();
-}
-
-std::string LogBlockManagerRdbMeta::ConstructRocksDBKey(
-    const std::string& container_id, const BlockId& block_id) {
-  return Substitute("$0.$1", container_id, block_id.ToString());
-}
-
 } // namespace fs
 } // namespace kudu
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 00d3ee849..f2d20bb57 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -18,7 +18,6 @@
 #pragma once
 
 #include <atomic>
-#include <cstddef>
 #include <cstdint>
 #include <deque>
 #include <map>
@@ -59,7 +58,6 @@ namespace internal {
 class LogBlock;
 class LogBlockContainer;
 class LogBlockContainerNativeMeta;
-class LogBlockContainerRdbMeta;
 class LogBlockDeletionTransaction;
 class LogWritableBlock;
 struct LogBlockContainerLoadResult;
@@ -69,7 +67,6 @@ struct LogBlockManagerMetrics;
 typedef scoped_refptr<internal::LogBlock> LogBlockRefPtr;
 typedef scoped_refptr<internal::LogBlockContainer> LogBlockContainerRefPtr;
 typedef scoped_refptr<internal::LogBlockContainerNativeMeta> 
LogBlockContainerNativeMetaRefPtr;
-typedef scoped_refptr<internal::LogBlockContainerRdbMeta> 
LogBlockContainerRdbMetaRefPtr;
 typedef std::unordered_map<std::string, std::vector<BlockRecordPB>> 
ContainerBlocksByName;
 typedef std::unordered_map<std::string, LogBlockContainerRefPtr> 
ContainersByName;
 
@@ -218,13 +215,13 @@ class LogBlockManager : public BlockManager {
                   BlockManagerOptions opts,
                   std::string tenant_id);
 
-  FRIEND_TEST(LogBlockManagerNativeMetaTest, TestMetadataTruncation);
   FRIEND_TEST(LogBlockManagerTest, TestAbortBlock);
   FRIEND_TEST(LogBlockManagerTest, TestCloseFinalizedBlock);
   FRIEND_TEST(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup);
   FRIEND_TEST(LogBlockManagerTest, TestFinalizeBlock);
   FRIEND_TEST(LogBlockManagerTest, TestLIFOContainerSelection);
   FRIEND_TEST(LogBlockManagerTest, TestLookupBlockLimit);
+  FRIEND_TEST(LogBlockManagerTest, TestMetadataTruncation);
   FRIEND_TEST(LogBlockManagerTest, TestMisalignedBlocksFuzz);
   FRIEND_TEST(LogBlockManagerTest, TestParseKernelRelease);
   FRIEND_TEST(LogBlockManagerTest, TestBumpBlockIds);
@@ -233,9 +230,9 @@ class LogBlockManager : public BlockManager {
 
   friend class internal::LogBlockContainer;
   friend class internal::LogBlockContainerNativeMeta;
-  friend class internal::LogBlockContainerRdbMeta;
   friend class internal::LogBlockDeletionTransaction;
   friend class internal::LogWritableBlock;
+  friend class LogBlockManagerTest;
 
   // Type for the actual block map used to store all live blocks.
   // We use sparse_hash_map<> here to reduce memory overhead.
@@ -419,9 +416,6 @@ class LogBlockManager : public BlockManager {
                    std::atomic<int>* containers_processed = nullptr,
                    std::atomic<int>* containers_total = nullptr);
 
-  // Estimate the count of containers according to the count of children 
directories.
-  virtual size_t EstimateContainerCount(size_t children_count) const = 0;
-
   // Reads records from one log block container in the data directory.
   // The result details will be collected into 'result'.
   void LoadContainer(Dir* dir,
@@ -543,8 +537,6 @@ class LogBlockManager : public BlockManager {
 // metadata is simple and performant at open time.
 class LogBlockManagerNativeMeta : public LogBlockManager {
  public:
-  static constexpr const char* const name() { return "log"; }
-
   LogBlockManagerNativeMeta(Env* env,
                             scoped_refptr<DataDirManager> dd_manager,
                             scoped_refptr<FsErrorManager> error_manager,
@@ -556,10 +548,6 @@ class LogBlockManagerNativeMeta : public LogBlockManager {
   }
 
  private:
-  size_t EstimateContainerCount(size_t children_count) const override {
-    return children_count / 2;
-  }
-
   Status CreateContainer(Dir* dir, LogBlockContainerRefPtr* container) 
override;
 
   // Returns Status::Aborted() in the case that the metadata and data files
@@ -589,84 +577,6 @@ class LogBlockManagerNativeMeta : public LogBlockManager {
                   const ContainersByName& containers_by_name) override;
 };
 
-// All the container's metadata is written into a RocksDB instance which is
-// shared by all containers in the same data directory.
-// The metadata records in RocksDB are all of CREATE type, the records are
-// deleted from RocksDB when the corresponding blocks are being removed from
-// the block manager. The data in the RocksDB instance is compacted in
-// background automatically, see 
https://github.com/facebook/rocksdb/wiki/Compaction.
-//
-// Comparing with the LogBlockManagerNativeMeta, the LogBlockManagerRdbMeta 
has a
-// better performance on opening containers, the latter only contains live 
block
-// records, while the former needs to scan through all the records in the 
metadata
-// which may adversely affect the performance when the live block ratio is 
very low.
-// RocksDB provides many configuration options, we can tune the performance, 
minimize
-// the read/write and space amplification.
-class LogBlockManagerRdbMeta : public LogBlockManager {
- public:
-  static constexpr const char* const name() { return "logr"; }
-
-  LogBlockManagerRdbMeta(Env* env,
-                         scoped_refptr<DataDirManager> dd_manager,
-                         scoped_refptr<FsErrorManager> error_manager,
-                         FileCache* file_cache,
-                         BlockManagerOptions opts,
-                         std::string tenant_id)
-      : LogBlockManager(env, std::move(dd_manager), std::move(error_manager),
-                        file_cache, std::move(opts), std::move(tenant_id)) {
-  }
-
- private:
-  friend class internal::LogBlockContainerRdbMeta;
-  friend class RdbMetadataLBMCorruptor;
-  FRIEND_TEST(LogBlockManagerRdbMetaTest, TestHalfPresentContainer);
-
-  size_t EstimateContainerCount(size_t children_count) const override {
-    // TODO(yingchun): exclude the "rdb" directory when get the children count.
-    return children_count;
-  }
-
-  Status CreateContainer(Dir* dir, LogBlockContainerRefPtr* container) 
override;
-
-  // Returns Status::Aborted() in the case that the data part appears to have 
no
-  // data (e.g. due to a crash just after creating it but before writing any
-  // records).
-  Status OpenContainer(Dir* dir,
-                       FsReport* report,
-                       const std::string& id,
-                       LogBlockContainerRefPtr* container) override;
-
-  void FindContainersToRepair(FsReport* report,
-                              const ContainerBlocksByName& 
low_live_block_containers,
-                              ContainersByName* containers_by_name) override;
-
-  // Do the repair work.
-  //
-  // The following repairs will be performed:
-  // 1. Repairs in LogBlockManager::DoRepair().
-  // 2. Container data files in 'report->incomplete_container_check' will be 
deleted, and the
-  //    records of these containers in RocksDB will be deleted as well.
-  // 3. Container metadata records in 'report->corrupted_rdb_record_check' 
will be deleted from
-  //    RocksDB.
-  //
-  // Returns an error if repairing a fatal inconsistency failed.
-  Status DoRepair(Dir* dir,
-                  FsReport* report,
-                  const ContainerBlocksByName& low_live_block_containers,
-                  const ContainersByName& containers_by_name) override;
-
-  // Delete all the metadata part of a container.
-  //
-  // The metadata records are prefixed by the container's 'id' of the RocksDB 
instance in 'dir'.
-  static Status DeleteContainerMetadata(Dir* dir, const std::string& id);
-
-  // Construct the key used in RocksDB.
-  //
-  // The key is constructed by the container's 'container_id' and the block's 
'block_id'.
-  static std::string ConstructRocksDBKey(const std::string& container_id,
-                                         const BlockId& block_id);
-};
-
 } // namespace fs
 } // namespace kudu
 
diff --git a/src/kudu/integration-tests/CMakeLists.txt 
b/src/kudu/integration-tests/CMakeLists.txt
index 008a1539d..0e7ca24b5 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -55,7 +55,7 @@ add_dependencies(itest_util
   kudu-tserver)
 
 # Tests
-SET_KUDU_TEST_LINK_LIBS(itest_util gumbo-parser gumbo-query rocksdb)
+SET_KUDU_TEST_LINK_LIBS(itest_util gumbo-parser gumbo-query)
 ADD_KUDU_TEST(all_types-itest
   PROCESSORS 4
   NUM_SHARDS 8)
diff --git a/src/kudu/integration-tests/dense_node-itest.cc 
b/src/kudu/integration-tests/dense_node-itest.cc
index afd849e6a..c63ce2b7e 100644
--- a/src/kudu/integration-tests/dense_node-itest.cc
+++ b/src/kudu/integration-tests/dense_node-itest.cc
@@ -22,7 +22,6 @@
 #include <memory>
 #include <ostream>
 #include <string>
-#include <tuple>
 #include <utility>
 #include <vector>
 
@@ -31,8 +30,6 @@
 #include <gtest/gtest.h>
 
 #include "kudu/client/schema.h"
-#include "kudu/fs/block_manager.h"
-#include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
@@ -78,7 +75,6 @@ using client::KuduColumnSchema;
 using client::KuduSchema;
 using client::KuduSchemaBuilder;
 using cluster::ExternalMiniClusterOptions;
-using kudu::fs::BlockManager;
 using std::pair;
 using std::string;
 using std::unique_ptr;
@@ -87,13 +83,10 @@ using strings::Substitute;
 
 class DenseNodeTest :
   public ExternalMiniClusterITestBase,
-  public testing::WithParamInterface<std::tuple<bool, string>> {
+  public testing::WithParamInterface<bool> {
 };
 
-INSTANTIATE_TEST_SUITE_P(, DenseNodeTest,
-                         ::testing::Combine(
-                             ::testing::Bool(),
-                             
::testing::ValuesIn(BlockManager::block_manager_types())));
+INSTANTIATE_TEST_SUITE_P(, DenseNodeTest, testing::Values(false, true));
 
 // Integration test that simulates "dense" Kudu nodes.
 //
@@ -159,15 +152,11 @@ TEST_P(DenseNodeTest, RunTest) {
     opts.extra_master_flags.emplace_back("--never_fsync=false");
   }
 
-  if (std::get<0>(GetParam())) {
+  if (GetParam()) {
     opts.extra_master_flags.emplace_back("--encrypt_data_at_rest=true");
     opts.extra_tserver_flags.emplace_back("--encrypt_data_at_rest=true");
   }
 
-  FLAGS_block_manager = std::get<1>(GetParam());
-  opts.extra_master_flags.emplace_back(Substitute("--block_manager=$0", 
FLAGS_block_manager));
-  opts.extra_tserver_flags.emplace_back(Substitute("--block_manager=$0", 
FLAGS_block_manager));
-
   // With the amount of data we're going to write, we need to make sure the
   // tserver has enough time to start back up (startup is only considered to be
   // "complete" when the tserver has loaded all fs metadata from disk).
@@ -210,7 +199,7 @@ TEST_P(DenseNodeTest, RunTest) {
   // metrics are logged so that they're easier to find in the log output.
   vector<pair<string, int64_t>> metrics;
   vector<GaugePrototype<uint64>*> metric_prototypes;
-  if (FsManager::IsLogType(FLAGS_block_manager)) {
+  if (FLAGS_block_manager == "log") {
     metric_prototypes = { &METRIC_log_block_manager_blocks_under_management,
                           &METRIC_log_block_manager_bytes_under_management,
                           &METRIC_log_block_manager_containers,
diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc 
b/src/kudu/integration-tests/ts_recovery-itest.cc
index f501fe349..b74fadc84 100644
--- a/src/kudu/integration-tests/ts_recovery-itest.cc
+++ b/src/kudu/integration-tests/ts_recovery-itest.cc
@@ -28,7 +28,6 @@
 #include <unordered_set>
 #include <vector>
 
-#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <glog/stl_logging.h>
 #include <gtest/gtest.h>
@@ -79,8 +78,6 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
-DECLARE_string(block_manager);
-
 METRIC_DECLARE_gauge_uint64(tablets_num_failed);
 
 using kudu::client::KuduClient;
@@ -126,9 +123,6 @@ int IntToKey(int i) {
 class TsRecoveryITest : public ExternalMiniClusterITestBase,
                         public ::testing::WithParamInterface<string> {
  public:
-  TsRecoveryITest() {
-    FLAGS_block_manager = GetParam();
-  }
 
  protected:
   void StartClusterOneTs(vector<string> extra_tserver_flags = {},
@@ -140,7 +134,7 @@ void TsRecoveryITest::StartClusterOneTs(vector<string> 
extra_tserver_flags,
                                         vector<string> extra_master_flags) {
   ExternalMiniClusterOptions opts;
   opts.num_tablet_servers = 1;
-  opts.block_manager_type = FLAGS_block_manager;
+  opts.block_manager_type = GetParam();
   opts.extra_tserver_flags = std::move(extra_tserver_flags);
   opts.extra_master_flags = std::move(extra_master_flags);
   StartClusterWithOpts(opts);
@@ -242,7 +236,7 @@ TEST_P(TsRecoveryITest, 
TestTabletRecoveryAfterSegmentDelete) {
 // Test for KUDU-2202 that ensures that blocks not found in the FS layer but
 // that are referenced by a tablet will not be reused.
 TEST_P(TsRecoveryITest, TestNoBlockIDReuseIfMissingBlocks) {
-  if (FLAGS_block_manager != "log" && FLAGS_block_manager != "logr") {
+  if (GetParam() != "log") {
     GTEST_SKIP() << "Missing blocks is currently only supported by the log "
                     "block manager. Exiting early!";
   }
@@ -314,8 +308,7 @@ TEST_P(TsRecoveryITest, TestNoBlockIDReuseIfMissingBlocks) {
     vector<string> children;
     ASSERT_OK(env_->GetChildren(data_dir, &children));
     for (const string& child : children) {
-      if (child != "." && child != ".." &&
-          child != fs::kInstanceMetadataFileName && child != "rdb") {
+      if (child != "." && child != ".." && child != 
fs::kInstanceMetadataFileName) {
         ASSERT_OK(env_->DeleteFile(JoinPathSegments(data_dir, child)));
       }
     }
@@ -593,7 +586,7 @@ TEST_P(TsRecoveryITestDeathTest, RecoverFromOpIdOverflow) {
     FsManagerOpts opts;
     opts.wal_root = ets->wal_dir();
     opts.data_roots = ets->data_dirs();
-    opts.block_manager_type = FLAGS_block_manager;
+    opts.block_manager_type = GetParam();
     unique_ptr<FsManager> fs_manager(new FsManager(env_, opts));
     ASSERT_OK(fs_manager->Open());
     scoped_refptr<ConsensusMetadataManager> cmeta_manager(
@@ -787,8 +780,7 @@ class UpdaterThreads {
 // these updates would be mistakenly considered as "already flushed", despite
 // the fact that they were only written to the input rowset's memory stores, 
and
 // never hit disk.
-class Kudu969Test : public ExternalMiniClusterITestBase,
-                    public ::testing::WithParamInterface<string> {
+class Kudu969Test : public TsRecoveryITest {
 };
 INSTANTIATE_TEST_SUITE_P(DifferentFaultPoints,
                          Kudu969Test,
diff --git a/src/kudu/server/CMakeLists.txt b/src/kudu/server/CMakeLists.txt
index 188104e26..4ea34145e 100644
--- a/src/kudu/server/CMakeLists.txt
+++ b/src/kudu/server/CMakeLists.txt
@@ -83,8 +83,6 @@ SET_KUDU_TEST_LINK_LIBS(
   kudu_curl_util
   mini_kdc
   server_process
-  security_test_util
-  rocksdb
-  lz4)
+  security_test_util)
 ADD_KUDU_TEST(rpc_server-test)
 ADD_KUDU_TEST(webserver-test)
diff --git a/src/kudu/tablet/compaction-test.cc 
b/src/kudu/tablet/compaction-test.cc
index 6374bae64..1d9abfe1b 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -1529,7 +1529,7 @@ TEST_F(TestCompaction, TestCompactionFreesDiskSpace) {
 // Regression test for KUDU-1237, a bug in which empty flushes or compactions
 // would result in orphaning near-empty cfile blocks on the disk.
 TEST_F(TestCompaction, TestEmptyFlushDoesntLeakBlocks) {
-  if (FLAGS_block_manager != "log" && FLAGS_block_manager != "logr") {
+  if (FLAGS_block_manager != "log") {
     GTEST_SKIP() << "Test requires the log block manager";
   }
 
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index c31c564fa..452ded584 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -142,7 +142,6 @@ set(KUDU_CLI_TOOL_LINK_LIBS
   kudu_util
   log
   master
-  rocksdb
   tablet
   tool_proto
   transactions
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 5dd5487c8..cb15e927c 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -2112,10 +2112,6 @@ TEST_F(ToolTest, TestPbcTools) {
 }
 
 TEST_F(ToolTest, TestPbcToolsOnMultipleBlocks) {
-  // TODO(yingchun): `kudu rdb dump` and `kudu rdb edit` have to adapt to 
"logr" block manager.
-  if (FLAGS_block_manager == "logr") {
-    GTEST_SKIP() << "Skipping test, logr block manager is not yet supported";
-  }
   const int kNumCFileBlocks = 1024;
   const int kNumEntries = 1;
   const string kTestDir = GetTestPath("test");
@@ -4571,16 +4567,6 @@ TEST_F(ToolTest, TestLocalReplicaDelete) {
   const string& data_dir = JoinPathSegments(tserver_dir, "data");
   uint64_t size_before_delete;
   ASSERT_OK(env_->GetFileSizeOnDiskRecursively(data_dir, &size_before_delete));
-  const auto ExcludeIgnoredDirIfNeeded = [&] (uint64_t* total_size) {
-    if (FLAGS_block_manager == "logr") {
-      // Exclude the RocksDB data size.
-      uint64_t size_of_rdb;
-      ASSERT_OK(
-          env_->GetFileSizeOnDiskRecursively(JoinPathSegments(data_dir, 
"rdb"), &size_of_rdb));
-      *total_size -= size_of_rdb;
-    }
-  };
-  ExcludeIgnoredDirIfNeeded(&size_before_delete);
   NO_FATALS(RunActionStdoutNone(Substitute("local_replica delete $0 
--fs_wal_dir=$1 "
                                            "--fs_data_dirs=$1 --clean_unsafe",
                                            tablet_id, tserver_dir)));
@@ -4616,7 +4602,6 @@ TEST_F(ToolTest, TestLocalReplicaDelete) {
   // indicates that some data has been deleted from disk.
   uint64_t size_after_delete;
   ASSERT_OK(env_->GetFileSizeOnDiskRecursively(data_dir, &size_after_delete));
-  ExcludeIgnoredDirIfNeeded(&size_after_delete);
   ASSERT_LT(size_after_delete, size_before_delete);
 
   // Since there was only one tablet on the node which was deleted by tool,
diff --git a/src/kudu/tserver/tablet_server-test.cc 
b/src/kudu/tserver/tablet_server-test.cc
index 6b1229776..7a204a5e8 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -758,11 +758,6 @@ TEST_P(TabletServerDiskSpaceTest, TestFullGroupAddsDir) {
   ASSERT_TRUE(dd_manager->IsTabletInFailedDir(kTabletId));
 
   // The group should be the updated even after restarting the tablet server.
-  // When --block_manager == "logr", it's necessary to reset 'dd_manager', 
releasing
-  // the last 'dd_manager' reference.  That's to allow for releasing the 
RocksDB LOCK file.
-  // Otherwise the 'mini_server_' will fail starting later because of the 
failure to acquire
-  // the RocksDB LOCK file.  Reset it in any case, there is no side effect.
-  dd_manager.reset();
   NO_FATALS(ShutdownAndRebuildTablet(kNumDirs));
   dd_manager = mini_server_->server()->fs_manager()->dd_manager();
   ASSERT_OK(dd_manager->FindDataDirsByTabletId(kTabletId, &dir_group));
@@ -949,7 +944,7 @@ TEST_F(TabletServerStartupWebPageTest, 
TestLogBlockContainerMetrics) {
   // Validate populated metrics in case of zero containers during startup.
   ASSERT_OK(c.FetchURL(Substitute("http://$0/metrics";, addr), &buf));
   string raw = buf.ToString();
-  if 
(FsManager::IsLogType(mini_server_->options()->fs_opts.block_manager_type)) {
+  if (mini_server_->options()->fs_opts.block_manager_type == "log") {
     ASSERT_STR_MATCHES(raw, "log_block_manager_total_containers_startup\",\n[ 
]+\"value\": 0");
     ASSERT_STR_MATCHES(raw, 
"log_block_manager_processed_containers_startup\",\n[ ]+\"value\": 0");
     // Since we open each directory and read all the contents, the time taken 
might not be
@@ -3989,7 +3984,7 @@ TEST_F(TabletServerTest, TestDeleteTablet) {
     METRIC_log_block_manager_blocks_under_management.Instantiate(
         mini_server_->server()->metric_entity(), 0);
   const int block_count_before_flush = ondisk->value();
-  if (FsManager::IsLogType(FLAGS_block_manager)) {
+  if (FLAGS_block_manager == "log") {
     ASSERT_EQ(block_count_before_flush, 0);
   }
 
@@ -4000,7 +3995,7 @@ TEST_F(TabletServerTest, TestDeleteTablet) {
   NO_FATALS(InsertTestRowsRemote(2, 1));
 
   const int block_count_after_flush = ondisk->value();
-  if (FsManager::IsLogType(FLAGS_block_manager)) {
+  if (FLAGS_block_manager == "log") {
     ASSERT_GT(block_count_after_flush, block_count_before_flush);
   }
 
@@ -4039,7 +4034,7 @@ TEST_F(TabletServerTest, TestDeleteTablet) {
 
   // Verify data was actually removed.
   const int block_count_after_delete = ondisk->value();
-  if (FsManager::IsLogType(FLAGS_block_manager)) {
+  if (FLAGS_block_manager == "log") {
     ASSERT_EQ(block_count_after_delete, 0);
   }
 
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index e71e11c80..a921d9a68 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -659,8 +659,7 @@ ADD_KUDU_TEST(compression/compression-test)
 if(NOT NO_TESTS)
   target_link_libraries(compression-test
     cfile
-    kudu_util_compression
-    rocksdb)
+    kudu_util_compression)
 endif()
 
 #######################################
@@ -680,9 +679,7 @@ if(NOT NO_TESTS)
   target_link_libraries(jwt-util-test
     mini_oidc
     server_process
-    jwt_test_certs
-    rocksdb
-    lz4)
+    jwt_test_certs)
 endif()
 
 #######################################
diff --git a/thirdparty/build-definitions.sh b/thirdparty/build-definitions.sh
index 9445cb170..74ede0024 100644
--- a/thirdparty/build-definitions.sh
+++ b/thirdparty/build-definitions.sh
@@ -1189,9 +1189,6 @@ build_rocksdb() {
     CXXFLAGS="$EXTRA_CXXFLAGS -fPIC" \
     cmake \
     -DROCKSDB_BUILD_SHARED=ON \
-    -DWITH_LZ4=ON \
-    -Dlz4_ROOT_DIR=$PREFIX \
-    -DUSE_RTTI=ON \
     -DFAIL_ON_WARNINGS=OFF \
     -DWITH_BENCHMARK_TOOLS=OFF \
     -DWITH_CORE_TOOLS=OFF \
@@ -1199,9 +1196,10 @@ build_rocksdb() {
     -DWITH_TRACE_TOOLS=OFF \
     -DWITH_JNI=OFF \
     -DWITH_LIBURING=OFF \
+    -DWITH_LZ4=ON \
     -DWITH_ZSTD=OFF \
+    -DWITH_SNAPPY=ON \
     -DWITH_BZ2=OFF \
-    -DWITH_SNAPPY=OFF \
     -DWITH_TESTS=OFF \
     -DWITH_GFLAGS=OFF \
     -DCMAKE_BUILD_TYPE=release \


Reply via email to