This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 4939416  Allow multiple directories in DbLedgerStorage
4939416 is described below

commit 4939416c68f20c990d5c98ee19d59ba662f1b4a0
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Sun Apr 8 11:31:17 2018 -0700

    Allow multiple directories in DbLedgerStorage
    
    Under normal conditions, the maximum throughput that a bookie can sustain is
    mostly determined by how fast entries can be written to the journal.
    
    If we assume a *very* fast journal, for example using multiple journal
    threads with `journalSyncData=false`, or placing the journal on a RAM disk
    (there are cases in which that makes sense), then the LedgerStorage becomes
    the bottleneck. We need to be able to flush data to disk faster than the
    incoming rate; otherwise the write cache fills up and we have to apply
    backpressure.
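    
    As a rough sketch of that backpressure path (hypothetical helper names; the
    real logic is the triggerFlushAndAddEntry() code moved around in this diff):
    
        // Minimal sketch, not the actual bookie code: try the cache, trigger a
        // flush when full, throttle briefly, then reject.
        class BackpressureSketch {
            interface WriteCache {
                boolean put(long ledgerId, long entryId, io.netty.buffer.ByteBuf entry);
            }
    
            static final long MAX_THROTTLE_TIME_NANOS = java.util.concurrent.TimeUnit.SECONDS.toNanos(10);
    
            static void addEntry(WriteCache cache, Runnable triggerFlush,
                                 long ledgerId, long entryId, io.netty.buffer.ByteBuf entry)
                    throws java.io.IOException, InterruptedException {
                if (cache.put(ledgerId, entryId, entry)) {
                    return; // fast path: flushing keeps up with the incoming rate
                }
                triggerFlush.run(); // cache is full: rotate it and drain it to disk
                long deadline = System.nanoTime() + MAX_THROTTLE_TIME_NANOS;
                while (System.nanoTime() < deadline) {
                    if (cache.put(ledgerId, entryId, entry)) {
                        return; // throttled, but eventually accepted
                    }
                    Thread.sleep(1); // back off while the flush frees space
                }
                throw new java.io.IOException("write cache still full, rejecting entry");
            }
        }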
    
    In several testing scenarios, the bottleneck has been the LedgerStorage
    flush/checkpoint, because it is performed by a single thread.
    
    For smaller entries (1 KB) the limit is ~250K writes/s, and the limitation
    comes from preparing the batch of offsets to write into RocksDB. Each
    insertion into the batch needs to cross the JNI boundary, and that is
    expensive.
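    
    To illustrate that cost, a flush prepares a RocksDB write batch roughly as
    below (a simplified, self-contained sketch using the plain RocksDB Java API;
    the path and key layout are illustrative, not the bookie's actual schema):
    
        import org.rocksdb.Options;
        import org.rocksdb.RocksDB;
        import org.rocksdb.RocksDBException;
        import org.rocksdb.WriteBatch;
        import org.rocksdb.WriteOptions;
    
        public class BatchCostSketch {
            public static void main(String[] args) throws RocksDBException {
                RocksDB.loadLibrary();
                try (Options options = new Options().setCreateIfMissing(true);
                     RocksDB db = RocksDB.open(options, "/tmp/locations-index");
                     WriteBatch batch = new WriteBatch();
                     WriteOptions writeOptions = new WriteOptions()) {
                    byte[] key = new byte[16];  // (ledgerId, entryId) pair
                    byte[] value = new byte[8]; // entry location
                    for (int i = 0; i < 250_000; i++) {
                        // each put() crosses the JNI boundary once; at ~250K
                        // offsets per flush this per-call overhead dominates
                        batch.put(key, value);
                    }
                    db.write(writeOptions, batch); // single crossing to commit
                }
            }
        }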
    
    For bigger entries (10 KB), the limit is ~50K writes/s (or 500 MB/s). This
    is mostly because the single flush thread cannot drive 100% IO utilization
    on the disks (since it is also doing the rest of the work). Raw disk writes
    in that test environment (12 HDDs) could reach 1.4 GB/s.
    
    This change only affects DbLedgerStorage. It is a simple refactoring: when
    configured with multiple directories, each ledger is hashed to one of the
    directories, and each directory is flushed independently on its own thread,
    as sketched below.
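    
    A minimal sketch of the routing (mirroring MathUtils.signSafeMod and the
    getLedgerSorage() delegation visible in the diff below):
    
        // Sign-safe modulo: a non-negative directory index even for negative
        // inputs, so each ledger maps to exactly one directory.
        static int dirIndexFor(long ledgerId, int numberOfDirs) {
            int mod = (int) (ledgerId % numberOfDirs);
            return mod >= 0 ? mod : mod + numberOfDirs;
        }
    
        // Every per-ledger operation is then delegated to the storage instance
        // owning that directory, e.g.:
        //   ledgerStorageList.get(dirIndexFor(ledgerId, numberOfDirs)).addEntry(entry);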
    
    Author: Matteo Merli <mme...@apache.org>
    
    Reviewers: Sijie Guo <si...@apache.org>
    
    This closes #1289 from merlimat/db-storage-multiple-dirs
---
 .../org/apache/bookkeeper/bookie/BookieShell.java  |   7 +-
 .../bookkeeper/bookie/GarbageCollectorThread.java  |  17 +-
 .../bookkeeper/bookie/LedgerDirsManager.java       |   7 +-
 .../bookie/storage/ldb/DbLedgerStorage.java        | 954 +++------------------
 ...ge.java => SingleDirectoryDbLedgerStorage.java} | 325 ++-----
 .../bookie/storage/ldb/TransientLedgerInfo.java    | 156 ++++
 .../bookie/storage/ldb/DbLedgerStorageTest.java    |  17 +-
 .../storage/ldb/DbLedgerStorageWriteCacheTest.java |  69 +-
 8 files changed, 425 insertions(+), 1127 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index d01a17c..92863e1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -76,7 +76,6 @@ import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
-import org.apache.bookkeeper.bookie.storage.ldb.EntryLocationIndex;
 import org.apache.bookkeeper.bookie.storage.ldb.LocationsIndexRebuildOp;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.MetaStoreException;
@@ -2640,8 +2639,6 @@ public class BookieShell implements Tool {
                     null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
             LedgerCache interleavedLedgerCache = interleavedStorage.ledgerCache;
 
-            EntryLocationIndex dbEntryLocationIndex = dbStorage.getEntryLocationIndex();
-
             int convertedLedgers = 0;
             for (long ledgerId : dbStorage.getActiveLedgersInRange(0, Long.MAX_VALUE)) {
                 if (LOG.isDebugEnabled()) {
@@ -2653,10 +2650,10 @@ public class BookieShell implements Tool {
                     interleavedStorage.setFenced(ledgerId);
                 }
 
-                long lastEntryInLedger = dbEntryLocationIndex.getLastEntryInLedger(ledgerId);
+                long lastEntryInLedger = dbStorage.getLastEntryInLedger(ledgerId);
                 for (long entryId = 0; entryId <= lastEntryInLedger; entryId++) {
                     try {
-                        long location = dbEntryLocationIndex.getLocation(ledgerId, entryId);
+                        long location = dbStorage.getLocation(ledgerId, entryId);
                         if (location != 0L) {
                             interleavedLedgerCache.putEntryOffset(ledgerId, entryId, location);
                         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 79515ea..4b1c834 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -125,6 +125,18 @@ public class GarbageCollectorThread extends SafeRunnable {
 
     final ServerConfiguration conf;
 
+    /**
+     * Create a garbage collector thread.
+     *
+     * @param conf
+     *          Server Configuration Object.
+     * @throws IOException
+     */
+    public GarbageCollectorThread(ServerConfiguration conf, LedgerManager ledgerManager,
+            final CompactableLedgerStorage ledgerStorage, StatsLogger statsLogger) throws IOException {
+        this(conf, ledgerManager, ledgerStorage, statsLogger,
+                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("GarbageCollectorThread")));
+    }
 
     /**
      * Create a garbage collector thread.
@@ -136,9 +148,10 @@ public class GarbageCollectorThread extends SafeRunnable {
     public GarbageCollectorThread(ServerConfiguration conf,
                                   LedgerManager ledgerManager,
                                   final CompactableLedgerStorage ledgerStorage,
-                                  StatsLogger statsLogger)
+                                  StatsLogger statsLogger,
+                                    ScheduledExecutorService gcExecutor)
         throws IOException {
-        gcExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("GarbageCollectorThread"));
+        this.gcExecutor = gcExecutor;
         this.conf = conf;
 
         this.entryLogger = ledgerStorage.getEntryLogger();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
index a615cfa..f30821a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
@@ -62,8 +62,7 @@ public class LedgerDirsManager {
         this(conf, dirs, diskChecker, NullStatsLogger.INSTANCE);
     }
 
-    @VisibleForTesting
-    LedgerDirsManager(ServerConfiguration conf, File[] dirs, DiskChecker diskChecker, StatsLogger statsLogger) {
+    public LedgerDirsManager(ServerConfiguration conf, File[] dirs, DiskChecker diskChecker, StatsLogger statsLogger) {
         this.ledgerDirectories = Arrays.asList(Bookie
                 .getCurrentDirectories(dirs));
         this.writableLedgerDirectories = new ArrayList<File>(ledgerDirectories);
@@ -343,6 +342,10 @@ public class LedgerDirsManager {
         }
     }
 
+    public DiskChecker getDiskChecker() {
+        return diskChecker;
+    }
+
     /**
      * Indicates All configured ledger directories are full.
      */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index 55b0945..8753363 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -20,323 +20,114 @@
  */
 package org.apache.bookkeeper.bookie.storage.ldb;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
+import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.SortedMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.StampedLock;
 
-import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException;
 import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.Checkpointer;
-import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
-import org.apache.bookkeeper.bookie.EntryLocation;
-import org.apache.bookkeeper.bookie.EntryLogger;
-import org.apache.bookkeeper.bookie.GarbageCollectorThread;
 import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.LedgerStorage;
 import org.apache.bookkeeper.bookie.StateManager;
-import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
-import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
-import org.apache.bookkeeper.common.util.Watchable;
+import org.apache.bookkeeper.bookie.storage.ldb.SingleDirectoryDbLedgerStorage.LedgerLoggerProcessor;
+import org.apache.bookkeeper.common.util.MathUtils;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.DiskChecker;
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+
 
 /**
  * Implementation of LedgerStorage that uses RocksDB to keep the indexes for entries stored in EntryLogs.
  */
-public class DbLedgerStorage implements CompactableLedgerStorage {
-
-    private static final long NOT_ASSIGNED_LAC = Long.MIN_VALUE;
-
-    /**
-     * This class borrows the logic from FileInfo.
-     *
-     * <p>This class is used for holding all the transient states for a given ledger.
-     */
-    private static class TransientLedgerInfo extends Watchable<LastAddConfirmedUpdateNotification>
-            implements AutoCloseable {
-
-        // lac
-        private volatile long lac = NOT_ASSIGNED_LAC;
-        // request from explicit lac requests
-        private ByteBuffer explicitLac = null;
-        // is the ledger info closed?
-        private boolean isClosed;
-
-        private final long ledgerId;
-        // reference to LedgerMetadataIndex
-        private final LedgerMetadataIndex ledgerIndex;
-
-        private long lastAccessed;
-
-        /**
-         * Construct an Watchable with zero watchers.
-         */
-        public TransientLedgerInfo(long ledgerId, LedgerMetadataIndex ledgerIndex) {
-            super(WATCHER_RECYCLER);
-            this.ledgerId = ledgerId;
-            this.ledgerIndex = ledgerIndex;
-            this.lastAccessed = System.currentTimeMillis();
-        }
-
-        long getLastAddConfirmed() {
-            return lac;
-        }
-
-        long setLastAddConfirmed(long lac) {
-            long lacToReturn;
-            boolean changed = false;
-            synchronized (this) {
-                if (this.lac == NOT_ASSIGNED_LAC || this.lac < lac) {
-                    this.lac = lac;
-                    changed = true;
-                    lastAccessed = System.currentTimeMillis();
-                }
-                lacToReturn = this.lac;
-            }
-            if (changed) {
-                notifyWatchers(lacToReturn);
-            }
-            return lacToReturn;
-        }
-
-        synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
-                Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
-            lastAccessed = System.currentTimeMillis();
-            if ((lac != NOT_ASSIGNED_LAC && lac > previousLAC) || isClosed || ledgerIndex.get(ledgerId).getFenced()) {
-                return false;
-            }
-
-            addWatcher(watcher);
-            return true;
-        }
-
-        public ByteBuf getExplicitLac() {
-            ByteBuf retLac = null;
-            synchronized (this) {
-                if (explicitLac != null) {
-                    retLac = Unpooled.buffer(explicitLac.capacity());
-                    explicitLac.rewind(); // copy from the beginning
-                    retLac.writeBytes(explicitLac);
-                    explicitLac.rewind();
-                    return retLac;
-                }
-            }
-            return retLac;
-        }
-
-        public void setExplicitLac(ByteBuf lac) {
-            long explicitLacValue;
-            synchronized (this) {
-                if (explicitLac == null) {
-                    explicitLac = ByteBuffer.allocate(lac.capacity());
-                }
-                lac.readBytes(explicitLac);
-                explicitLac.rewind();
-
-                // skip the ledger id
-                explicitLac.getLong();
-                explicitLacValue = explicitLac.getLong();
-                explicitLac.rewind();
-
-                lastAccessed = System.currentTimeMillis();
-            }
-            setLastAddConfirmed(explicitLacValue);
-        }
-
-        boolean isStale() {
-            return (lastAccessed + TimeUnit.MINUTES.toMillis(LEDGER_INFO_CACHING_TIME_MINUTES)) < System
-                    .currentTimeMillis();
-        }
-
-        void notifyWatchers(long lastAddConfirmed) {
-            notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, lastAddConfirmed);
-        }
-
-        @Override
-        public void close() {
-            synchronized (this) {
-                if (isClosed) {
-                    return;
-                }
-                isClosed = true;
-            }
-            // notify watchers
-            notifyWatchers(Long.MAX_VALUE);
-        }
-
-    }
-
-    private EntryLogger entryLogger;
-
-    private LedgerMetadataIndex ledgerIndex;
-    private EntryLocationIndex entryLocationIndex;
-
-    private static final long LEDGER_INFO_CACHING_TIME_MINUTES = 10;
-    private ConcurrentLongHashMap<TransientLedgerInfo> transientLedgerInfoCache;
-
-    private GarbageCollectorThread gcThread;
-
-    // Write cache where all new entries are inserted into
-    protected volatile WriteCache writeCache;
-
-    // Write cache that is used to swap with writeCache during flushes
-    protected volatile WriteCache writeCacheBeingFlushed;
-
-    // Cache where we insert entries for speculative reading
-    private ReadCache readCache;
-
-    private final StampedLock writeCacheRotationLock = new StampedLock();
-
-    protected final ReentrantLock flushMutex = new ReentrantLock();
-
-    protected final AtomicBoolean hasFlushBeenTriggered = new AtomicBoolean(false);
-    private final AtomicBoolean isFlushOngoing = new AtomicBoolean(false);
-
-    private final ExecutorService executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("db-storage"));
-
-    // Executor used to for db index cleanup
-    private final ScheduledExecutorService cleanupExecutor = Executors
-            .newSingleThreadScheduledExecutor(new DefaultThreadFactory("db-storage-cleanup"));
+@Slf4j
+public class DbLedgerStorage implements LedgerStorage {
 
     static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb";
-    static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
+
     static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb";
 
     static final String MAX_THROTTLE_TIME_MILLIS = "dbStorage_maxThrottleTimeMs";
 
     private static final long DEFAULT_WRITE_CACHE_MAX_SIZE_MB = 16;
     private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB = 16;
-    private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
-
-    private static final long DEFAUL_MAX_THROTTLE_TIME_MILLIS = TimeUnit.SECONDS.toMillis(10);
 
     private static final int MB = 1024 * 1024;
 
-    private final CopyOnWriteArrayList<LedgerDeletionListener> ledgerDeletionListeners = Lists
-            .newCopyOnWriteArrayList();
-
-    private long writeCacheMaxSize;
-
-    private CheckpointSource checkpointSource = null;
-    private Checkpoint lastCheckpoint = Checkpoint.MIN;
-
-    private long readCacheMaxSize;
-    private int readAheadCacheBatchSize;
-
-    private long maxThrottleTimeNanos;
-
-    private StatsLogger stats;
+    private int numberOfDirs;
+    private List<SingleDirectoryDbLedgerStorage> ledgerStorageList;
 
-    private OpStatsLogger addEntryStats;
-    private OpStatsLogger readEntryStats;
-    private OpStatsLogger readCacheHitStats;
-    private OpStatsLogger readCacheMissStats;
-    private OpStatsLogger readAheadBatchCountStats;
-    private OpStatsLogger readAheadBatchSizeStats;
-    private OpStatsLogger flushStats;
-    private OpStatsLogger flushSizeStats;
-
-    private Counter throttledWriteRequests;
-    private Counter rejectedWriteRequests;
+    // Keep a single Bookie GC thread so that the compactions from multiple individual directories are serialized
+    private ScheduledExecutorService gcExecutor;
 
     @Override
     public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
             LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource,
             Checkpointer checkpointer, StatsLogger statsLogger) throws IOException {
-        checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
-                "Db implementation only allows for one storage dir");
-
-        String baseDir = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
-
-        writeCacheMaxSize = conf.getLong(WRITE_CACHE_MAX_SIZE_MB, DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
-
-        writeCache = new WriteCache(writeCacheMaxSize / 2);
-        writeCacheBeingFlushed = new WriteCache(writeCacheMaxSize / 2);
-
-        this.checkpointSource = checkpointSource;
-
-        readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
-        readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
-
-        long maxThrottleTimeMillis = conf.getLong(MAX_THROTTLE_TIME_MILLIS, DEFAUL_MAX_THROTTLE_TIME_MILLIS);
-        maxThrottleTimeNanos = TimeUnit.MILLISECONDS.toNanos(maxThrottleTimeMillis);
-
-        readCache = new ReadCache(readCacheMaxSize);
+        long writeCacheMaxSize = conf.getLong(WRITE_CACHE_MAX_SIZE_MB, DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
+        long readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
 
-        this.stats = statsLogger;
+        this.numberOfDirs = ledgerDirsManager.getAllLedgerDirs().size();
 
         log.info("Started Db Ledger Storage");
+        log.info(" - Number of directories: {}", numberOfDirs);
         log.info(" - Write cache size: {} MB", writeCacheMaxSize / MB);
         log.info(" - Read Cache: {} MB", readCacheMaxSize / MB);
-        log.info(" - Read Ahead Batch size: : {}", readAheadCacheBatchSize);
 
-        ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
-        entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
+        long perDirectoryWriteCacheSize = writeCacheMaxSize / numberOfDirs;
+        long perDirectoryReadCacheSize = readCacheMaxSize / numberOfDirs;
 
-        transientLedgerInfoCache = new ConcurrentLongHashMap<>(16 * 1024,
-                Runtime.getRuntime().availableProcessors() * 2);
-        cleanupExecutor.scheduleAtFixedRate(this::cleanupStaleTransientLedgerInfo, LEDGER_INFO_CACHING_TIME_MINUTES,
-                LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES);
+        gcExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("GarbageCollector"));
 
-        entryLogger = new EntryLogger(conf, ledgerDirsManager);
-        gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger);
+        ledgerStorageList = Lists.newArrayList();
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            // Create a ledger dirs manager for the single directory
+            File[] dirs = new File[1];
+            // Remove the `/current` suffix which will be appended again by LedgersDirManager
+            dirs[0] = ledgerDir.getParentFile();
+            LedgerDirsManager ldm = new LedgerDirsManager(conf, dirs, ledgerDirsManager.getDiskChecker(), statsLogger);
+            ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm, indexDirsManager,
+                    stateManager, checkpointSource, checkpointer, statsLogger, gcExecutor, perDirectoryWriteCacheSize,
+                    perDirectoryReadCacheSize));
+        }
 
-        registerStats();
+        registerStats(statsLogger);
     }
 
-    /**
-     * Evict all the ledger info object that were not used recently.
-     */
-    private void cleanupStaleTransientLedgerInfo() {
-        transientLedgerInfoCache.removeIf((ledgerId, ledgerInfo) -> {
-            boolean isStale = ledgerInfo.isStale();
-            if (isStale) {
-                ledgerInfo.close();
-            }
-
-            return isStale;
-        });
+    @VisibleForTesting
+    protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf,
+            LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
+            StateManager stateManager, CheckpointSource checkpointSource, Checkpointer checkpointer,
+            StatsLogger statsLogger, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize)
+            throws IOException {
+        return new SingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
+                stateManager, checkpointSource, checkpointer, statsLogger, gcExecutor, writeCacheSize, readCacheSize);
     }
 
-    public void registerStats() {
+    public void registerStats(StatsLogger stats) {
         stats.registerGauge("write-cache-size", new Gauge<Long>() {
             @Override
             public Long getDefaultValue() {
@@ -345,7 +136,7 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
 
             @Override
             public Long getSample() {
-                return writeCache.size() + writeCacheBeingFlushed.size();
+                return ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getWriteCacheSize).sum();
             }
         });
         stats.registerGauge("write-cache-count", new Gauge<Long>() {
@@ -356,7 +147,7 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
 
             @Override
             public Long getSample() {
-                return writeCache.count() + writeCacheBeingFlushed.count();
+                return ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getWriteCacheCount).sum();
             }
         });
         stats.registerGauge("read-cache-size", new Gauge<Long>() {
@@ -367,7 +158,7 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
 
             @Override
             public Long getSample() {
-                return readCache.size();
+                return ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getReadCacheSize).sum();
             }
         });
         stats.registerGauge("read-cache-count", new Gauge<Long>() {
@@ -378,666 +169,142 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
 
             @Override
             public Long getSample() {
-                return readCache.count();
+                return ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getReadCacheCount).sum();
             }
         });
-
-        addEntryStats = stats.getOpStatsLogger("add-entry");
-        readEntryStats = stats.getOpStatsLogger("read-entry");
-        readCacheHitStats = stats.getOpStatsLogger("read-cache-hits");
-        readCacheMissStats = stats.getOpStatsLogger("read-cache-misses");
-        readAheadBatchCountStats = stats.getOpStatsLogger("readahead-batch-count");
-        readAheadBatchSizeStats = stats.getOpStatsLogger("readahead-batch-size");
-        flushStats = stats.getOpStatsLogger("flush");
-        flushSizeStats = stats.getOpStatsLogger("flush-size");
-
-        throttledWriteRequests = stats.getCounter("throttled-write-requests");
-        rejectedWriteRequests = stats.getCounter("rejected-write-requests");
     }
 
     @Override
     public void start() {
-        gcThread.start();
+        ledgerStorageList.forEach(LedgerStorage::start);
     }
 
     @Override
     public void shutdown() throws InterruptedException {
-        try {
-            flush();
-
-            gcThread.shutdown();
-            entryLogger.shutdown();
-
-            cleanupExecutor.shutdown();
-            cleanupExecutor.awaitTermination(1, TimeUnit.SECONDS);
-
-            ledgerIndex.close();
-            entryLocationIndex.close();
-
-            writeCache.close();
-            writeCacheBeingFlushed.close();
-            readCache.close();
-            executor.shutdown();
-
-        } catch (IOException e) {
-            log.error("Error closing db storage", e);
+        for (LedgerStorage ls : ledgerStorageList) {
+            ls.shutdown();
         }
     }
 
     @Override
     public boolean ledgerExists(long ledgerId) throws IOException {
-        try {
-            LedgerData ledgerData = ledgerIndex.get(ledgerId);
-            if (log.isDebugEnabled()) {
-                log.debug("Ledger exists. ledger: {} : {}", ledgerId, 
ledgerData.getExists());
-            }
-            return ledgerData.getExists();
-        } catch (Bookie.NoLedgerException nle) {
-            // ledger does not exist
-            return false;
-        }
+        return getLedgerSorage(ledgerId).ledgerExists(ledgerId);
     }
 
     @Override
-    public boolean isFenced(long ledgerId) throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("isFenced. ledger: {}", ledgerId);
-        }
-        return ledgerIndex.get(ledgerId).getFenced();
+    public boolean setFenced(long ledgerId) throws IOException {
+        return getLedgerSorage(ledgerId).setFenced(ledgerId);
     }
 
     @Override
-    public boolean setFenced(long ledgerId) throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("Set fenced. ledger: {}", ledgerId);
-        }
-        boolean changed = ledgerIndex.setFenced(ledgerId);
-        if (changed) {
-            // notify all the watchers if a ledger is fenced
-            TransientLedgerInfo ledgerInfo = 
transientLedgerInfoCache.get(ledgerId);
-            if (null != ledgerInfo) {
-                ledgerInfo.notifyWatchers(Long.MAX_VALUE);
-            }
-        }
-        return changed;
+    public boolean isFenced(long ledgerId) throws IOException {
+        return getLedgerSorage(ledgerId).isFenced(ledgerId);
     }
 
     @Override
     public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("Set master key. ledger: {}", ledgerId);
-        }
-        ledgerIndex.setMasterKey(ledgerId, masterKey);
+        getLedgerSorage(ledgerId).setMasterKey(ledgerId, masterKey);
     }
 
     @Override
     public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
-        if (log.isDebugEnabled()) {
-            log.debug("Read master key. ledger: {}", ledgerId);
-        }
-        return ledgerIndex.get(ledgerId).getMasterKey().toByteArray();
+        return getLedgerSorage(ledgerId).readMasterKey(ledgerId);
     }
 
     @Override
     public long addEntry(ByteBuf entry) throws IOException, BookieException {
-        long startTime = MathUtils.nowInNano();
-
         long ledgerId = entry.getLong(entry.readerIndex());
-        long entryId = entry.getLong(entry.readerIndex() + 8);
-        long lac = entry.getLong(entry.readerIndex() + 16);
-
-        if (log.isDebugEnabled()) {
-            log.debug("Add entry. {}@{}, lac = {}", ledgerId, entryId, lac);
-        }
-
-        // First we try to do an optimistic locking to get access to the 
current write cache.
-        // This is based on the fact that the write cache is only being 
rotated (swapped) every 1 minute. During the
-        // rest of the time, we can have multiple thread using the optimistic 
lock here without interfering.
-        long stamp = writeCacheRotationLock.tryOptimisticRead();
-        boolean inserted = false;
-
-        inserted = writeCache.put(ledgerId, entryId, entry);
-        if (!writeCacheRotationLock.validate(stamp)) {
-            // The write cache was rotated while we were inserting. We need to 
acquire the proper read lock and repeat
-            // the operation because we might have inserted in a write cache 
that was already being flushed and cleared,
-            // without being sure about this last entry being flushed or not.
-            stamp = writeCacheRotationLock.readLock();
-            try {
-                inserted = writeCache.put(ledgerId, entryId, entry);
-            } finally {
-                 writeCacheRotationLock.unlockRead(stamp);
-            }
-        }
-
-        if (!inserted) {
-            triggerFlushAndAddEntry(ledgerId, entryId, entry);
-        }
-
-        // after successfully insert the entry, update LAC and notify the 
watchers
-        updateCachedLacIfNeeded(ledgerId, lac);
-
-        recordSuccessfulEvent(addEntryStats, startTime);
-        return entryId;
-    }
-
-    private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf 
entry)
-            throws IOException, BookieException {
-        // Write cache is full, we need to trigger a flush so that it gets 
rotated
-        // If the flush has already been triggered or flush has already 
switched the
-        // cache, we don't need to trigger another flush
-        if (!isFlushOngoing.get() && 
hasFlushBeenTriggered.compareAndSet(false, true)) {
-            // Trigger an early flush in background
-            log.info("Write cache is full, triggering flush");
-            executor.execute(() -> {
-                try {
-                    flush();
-                } catch (IOException e) {
-                    log.error("Error during flush", e);
-                }
-            });
-        }
-
-        throttledWriteRequests.inc();
-        long absoluteTimeoutNanos = System.nanoTime() + maxThrottleTimeNanos;
-
-        while (System.nanoTime() < absoluteTimeoutNanos) {
-            long stamp = writeCacheRotationLock.readLock();
-            try {
-                if (writeCache.put(ledgerId, entryId, entry)) {
-                    // We succeeded in putting the entry in write cache in the
-                    return;
-                }
-            } finally {
-                 writeCacheRotationLock.unlockRead(stamp);
-            }
-
-            // Wait some time and try again
-            try {
-                Thread.sleep(1);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new IOException("Interrupted when adding entry " + 
ledgerId + "@" + entryId);
-            }
-        }
-
-        // Timeout expired and we weren't able to insert in write cache
-        rejectedWriteRequests.inc();
-        throw new OperationRejectedException();
+        return getLedgerSorage(ledgerId).addEntry(entry);
     }
 
     @Override
     public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
-        long startTime = MathUtils.nowInNano();
-        if (log.isDebugEnabled()) {
-            log.debug("Get Entry: {}@{}", ledgerId, entryId);
-        }
-
-        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
-            return getLastEntry(ledgerId);
-        }
-
-        // We need to try to read from both write caches, since recent entries 
could be found in either of the two. The
-        // write caches are already thread safe on their own, here we just 
need to make sure we get references to both
-        // of them. Using an optimistic lock since the read lock is always 
free, unless we're swapping the caches.
-        long stamp = writeCacheRotationLock.tryOptimisticRead();
-        WriteCache localWriteCache = writeCache;
-        WriteCache localWriteCacheBeingFlushed = writeCacheBeingFlushed;
-        if (!writeCacheRotationLock.validate(stamp)) {
-            // Fallback to regular read lock approach
-            stamp = writeCacheRotationLock.readLock();
-            try {
-                localWriteCache = writeCache;
-                localWriteCacheBeingFlushed = writeCacheBeingFlushed;
-            } finally {
-                writeCacheRotationLock.unlockRead(stamp);
-            }
-        }
-
-        // First try to read from the write cache of recent entries
-        ByteBuf entry = localWriteCache.get(ledgerId, entryId);
-        if (entry != null) {
-            recordSuccessfulEvent(readCacheHitStats, startTime);
-            recordSuccessfulEvent(readEntryStats, startTime);
-            return entry;
-        }
-
-        // If there's a flush going on, the entry might be in the flush buffer
-        entry = localWriteCacheBeingFlushed.get(ledgerId, entryId);
-        if (entry != null) {
-            recordSuccessfulEvent(readCacheHitStats, startTime);
-            recordSuccessfulEvent(readEntryStats, startTime);
-            return entry;
-        }
-
-        // Try reading from read-ahead cache
-        entry = readCache.get(ledgerId, entryId);
-        if (entry != null) {
-            recordSuccessfulEvent(readCacheHitStats, startTime);
-            recordSuccessfulEvent(readEntryStats, startTime);
-            return entry;
-        }
-
-        // Read from main storage
-        long entryLocation;
-        try {
-            entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
-            if (entryLocation == 0) {
-                throw new NoEntryException(ledgerId, entryId);
-            }
-            entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
-        } catch (NoEntryException e) {
-            recordFailedEvent(readEntryStats, startTime);
-            throw e;
-        }
-
-        readCache.put(ledgerId, entryId, entry);
-
-        // Try to read more entries
-        long nextEntryLocation = entryLocation + 4 /* size header */ + 
entry.readableBytes();
-        fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);
-
-        recordSuccessfulEvent(readCacheMissStats, startTime);
-        recordSuccessfulEvent(readEntryStats, startTime);
-        return entry;
-    }
-
-    private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, 
long firstEntryLocation) {
-        try {
-            long firstEntryLogId = (firstEntryLocation >> 32);
-            long currentEntryLogId = firstEntryLogId;
-            long currentEntryLocation = firstEntryLocation;
-            int count = 0;
-            long size = 0;
-
-            while (count < readAheadCacheBatchSize && currentEntryLogId == 
firstEntryLogId) {
-                ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, 
-1, currentEntryLocation);
-
-                try {
-                    long currentEntryLedgerId = entry.getLong(0);
-                    long currentEntryId = entry.getLong(8);
-
-                    if (currentEntryLedgerId != orginalLedgerId) {
-                        // Found an entry belonging to a different ledger, 
stopping read-ahead
-                        entry.release();
-                        return;
-                    }
-
-                    // Insert entry in read cache
-                    readCache.put(orginalLedgerId, currentEntryId, entry);
-
-                    count++;
-                    size += entry.readableBytes();
-
-                    currentEntryLocation += 4 + entry.readableBytes();
-                    currentEntryLogId = currentEntryLocation >> 32;
-                } finally {
-                    entry.release();
-                }
-            }
-
-            readAheadBatchCountStats.registerSuccessfulValue(count);
-            readAheadBatchSizeStats.registerSuccessfulValue(size);
-        } catch (Exception e) {
-            if (log.isDebugEnabled()) {
-                log.debug("Exception during read ahead for ledger: {}: e", 
orginalLedgerId, e);
-            }
-        }
-    }
-
-    public ByteBuf getLastEntry(long ledgerId) throws IOException {
-        long startTime = MathUtils.nowInNano();
-
-        long stamp = writeCacheRotationLock.readLock();
-        try {
-            // First try to read from the write cache of recent entries
-            ByteBuf entry = writeCache.getLastEntry(ledgerId);
-            if (entry != null) {
-                if (log.isDebugEnabled()) {
-                    long foundLedgerId = entry.readLong(); // ledgedId
-                    long entryId = entry.readLong();
-                    entry.resetReaderIndex();
-                    if (log.isDebugEnabled()) {
-                        log.debug("Found last entry for ledger {} in write 
cache: {}@{}", ledgerId, foundLedgerId,
-                                entryId);
-                    }
-                }
-
-                recordSuccessfulEvent(readCacheHitStats, startTime);
-                recordSuccessfulEvent(readEntryStats, startTime);
-                return entry;
-            }
-
-            // If there's a flush going on, the entry might be in the flush 
buffer
-            entry = writeCacheBeingFlushed.getLastEntry(ledgerId);
-            if (entry != null) {
-                if (log.isDebugEnabled()) {
-                    entry.readLong(); // ledgedId
-                    long entryId = entry.readLong();
-                    entry.resetReaderIndex();
-                    if (log.isDebugEnabled()) {
-                        log.debug("Found last entry for ledger {} in write 
cache being flushed: {}", ledgerId, entryId);
-                    }
-                }
-
-                recordSuccessfulEvent(readCacheHitStats, startTime);
-                recordSuccessfulEvent(readEntryStats, startTime);
-                return entry;
-            }
-        } finally {
-            writeCacheRotationLock.unlockRead(stamp);
-        }
-
-        // Search the last entry in storage
-        long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId);
-        if (log.isDebugEnabled()) {
-            log.debug("Found last entry for ledger {} in db: {}", ledgerId, 
lastEntryId);
-        }
-
-        long entryLocation = entryLocationIndex.getLocation(ledgerId, 
lastEntryId);
-        ByteBuf content = entryLogger.readEntry(ledgerId, lastEntryId, 
entryLocation);
-
-        recordSuccessfulEvent(readCacheMissStats, startTime);
-        recordSuccessfulEvent(readEntryStats, startTime);
-        return content;
-    }
-
-    @VisibleForTesting
-    boolean isFlushRequired() {
-        long stamp = writeCacheRotationLock.readLock();
-        try {
-            return !writeCache.isEmpty();
-        } finally {
-            writeCacheRotationLock.unlockRead(stamp);
-        }
+        return getLedgerSorage(ledgerId).getEntry(ledgerId, entryId);
     }
 
     @Override
-    public void checkpoint(Checkpoint checkpoint) throws IOException {
-        Checkpoint thisCheckpoint = checkpointSource.newCheckpoint();
-        if (lastCheckpoint.compareTo(checkpoint) > 0) {
-            return;
-        }
-
-        long startTime = MathUtils.nowInNano();
-
-        // Only a single flush operation can happen at a time
-        flushMutex.lock();
-
-        try {
-            // Swap the write cache so that writes can continue to happen 
while the flush is
-            // ongoing
-            swapWriteCache();
-
-            long sizeToFlush = writeCacheBeingFlushed.size();
-            if (log.isDebugEnabled()) {
-                log.debug("Flushing entries. count: {} -- size {} Mb", 
writeCacheBeingFlushed.count(),
-                        sizeToFlush / 1024.0 / 1024);
-            }
-
-            // Write all the pending entries into the entry logger and collect 
the offset
-            // position for each entry
-
-            Batch batch = entryLocationIndex.newBatch();
-            writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
-                try {
-                    long location = entryLogger.addEntry(ledgerId, entry, 
true);
-                    entryLocationIndex.addLocation(batch, ledgerId, entryId, 
location);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            });
-
-            entryLogger.flush();
-
-            long batchFlushStarTime = System.nanoTime();
-            batch.flush();
-            batch.close();
-            if (log.isDebugEnabled()) {
-                log.debug("DB batch flushed time : {} s",
-                        MathUtils.elapsedNanos(batchFlushStarTime) / (double) 
TimeUnit.SECONDS.toNanos(1));
-            }
-
-            ledgerIndex.flush();
-
-            cleanupExecutor.execute(() -> {
-                // There can only be one single cleanup task running because 
the cleanupExecutor
-                // is single-threaded
-                try {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Removing deleted ledgers from db indexes");
-                    }
-
-                    entryLocationIndex.removeOffsetFromDeletedLedgers();
-                    ledgerIndex.removeDeletedLedgers();
-                } catch (Throwable t) {
-                    log.warn("Failed to cleanup db indexes", t);
-                }
-            });
-
-            lastCheckpoint = thisCheckpoint;
-
-            // Discard all the entry from the write cache, since they're now 
persisted
-            writeCacheBeingFlushed.clear();
-
-            double flushTimeSeconds = MathUtils.elapsedNanos(startTime) / 
(double) TimeUnit.SECONDS.toNanos(1);
-            double flushThroughput = sizeToFlush / 1024.0 / 1024.0 / 
flushTimeSeconds;
-
-            if (log.isDebugEnabled()) {
-                log.debug("Flushing done time {} s -- Written {} MB/s", 
flushTimeSeconds, flushThroughput);
-            }
-
-            recordSuccessfulEvent(flushStats, startTime);
-            flushSizeStats.registerSuccessfulValue(sizeToFlush);
-        } catch (IOException e) {
-            // Leave IOExecption as it is
-            throw e;
-        } catch (RuntimeException e) {
-            // Wrap unchecked exceptions
-            throw new IOException(e);
-        } finally {
-            try {
-                isFlushOngoing.set(false);
-            } finally {
-                flushMutex.unlock();
-            }
-        }
-    }
-
-    /**
-     * Swap the current write cache with the replacement cache.
-     */
-    private void swapWriteCache() {
-        long stamp = writeCacheRotationLock.writeLock();
-        try {
-            // First, swap the current write-cache map with an empty one so 
that writes will
-            // go on unaffected. Only a single flush is happening at the same 
time
-            WriteCache tmp = writeCacheBeingFlushed;
-            writeCacheBeingFlushed = writeCache;
-            writeCache = tmp;
-
-            // since the cache is switched, we can allow flush to be triggered
-            hasFlushBeenTriggered.set(false);
-        } finally {
-            try {
-                isFlushOngoing.set(true);
-            } finally {
-                writeCacheRotationLock.unlockWrite(stamp);
-            }
-        }
+    public long getLastAddConfirmed(long ledgerId) throws IOException {
+        return getLedgerSorage(ledgerId).getLastAddConfirmed(ledgerId);
     }
 
     @Override
-    public void flush() throws IOException {
-        Checkpoint cp = checkpointSource.newCheckpoint();
-        checkpoint(cp);
-        checkpointSource.checkpointComplete(cp, true);
+    public boolean waitForLastAddConfirmedUpdate(long ledgerId, long previousLAC,
+            Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
+        return getLedgerSorage(ledgerId).waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
     }
 
     @Override
-    public void deleteLedger(long ledgerId) throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("Deleting ledger {}", ledgerId);
-        }
-
-        // Delete entries from this ledger that are still in the write cache
-        long stamp = writeCacheRotationLock.readLock();
-        try {
-            writeCache.deleteLedger(ledgerId);
-        } finally {
-            writeCacheRotationLock.unlockRead(stamp);
-        }
-
-        entryLocationIndex.delete(ledgerId);
-        ledgerIndex.delete(ledgerId);
-
-        for (int i = 0, size = ledgerDeletionListeners.size(); i < size; i++) {
-            LedgerDeletionListener listener = ledgerDeletionListeners.get(i);
-            listener.ledgerDeleted(ledgerId);
-        }
-
-        TransientLedgerInfo tli = transientLedgerInfoCache.remove(ledgerId);
-        if (tli != null) {
-            tli.close();
+    public void flush() throws IOException {
+        for (LedgerStorage ls : ledgerStorageList) {
+            ls.flush();
         }
     }
 
     @Override
-    public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long 
lastLedgerId) throws IOException {
-        return ledgerIndex.getActiveLedgersInRange(firstLedgerId, 
lastLedgerId);
-    }
-
-    @Override
-    public void updateEntriesLocations(Iterable<EntryLocation> locations) 
throws IOException {
-        // Trigger a flush to have all the entries being compacted in the db 
storage
-        flush();
-
-        entryLocationIndex.updateLocations(locations);
-    }
-
-    @Override
-    public EntryLogger getEntryLogger() {
-        return entryLogger;
+    public void checkpoint(Checkpoint checkpoint) throws IOException {
+        for (LedgerStorage ls : ledgerStorageList) {
+            ls.checkpoint(checkpoint);
+        }
     }
 
     @Override
-    public long getLastAddConfirmed(long ledgerId) throws IOException {
-        TransientLedgerInfo ledgerInfo = 
transientLedgerInfoCache.get(ledgerId);
-        long lac = null != ledgerInfo ? ledgerInfo.getLastAddConfirmed() : 
NOT_ASSIGNED_LAC;
-        if (lac == NOT_ASSIGNED_LAC) {
-            ByteBuf bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
-            try {
-                bb.skipBytes(2 * Long.BYTES); // skip ledger id and entry id
-                lac = bb.readLong();
-                lac = getOrAddLedgerInfo(ledgerId).setLastAddConfirmed(lac);
-            } finally {
-                bb.release();
-            }
-        }
-        return lac;
+    public void deleteLedger(long ledgerId) throws IOException {
+        getLedgerSorage(ledgerId).deleteLedger(ledgerId);
     }
 
     @Override
-    public boolean waitForLastAddConfirmedUpdate(long ledgerId, long 
previousLAC,
-            Watcher<LastAddConfirmedUpdateNotification> watcher) throws 
IOException {
-        return 
getOrAddLedgerInfo(ledgerId).waitForLastAddConfirmedUpdate(previousLAC, 
watcher);
+    public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
+        ledgerStorageList.forEach(ls -> ls.registerLedgerDeletionListener(listener));
     }
 
     @Override
     public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
-        getOrAddLedgerInfo(ledgerId).setExplicitLac(lac);
+        getLedgerSorage(ledgerId).setExplicitlac(ledgerId, lac);
     }
 
     @Override
     public ByteBuf getExplicitLac(long ledgerId) {
-        TransientLedgerInfo ledgerInfo = 
transientLedgerInfoCache.get(ledgerId);
-        if (null == ledgerInfo) {
-            return null;
-        } else {
-            return ledgerInfo.getExplicitLac();
-        }
+        return getLedgerSorage(ledgerId).getExplicitLac(ledgerId);
     }
 
-    private TransientLedgerInfo getOrAddLedgerInfo(long ledgerId) {
-        TransientLedgerInfo tli = transientLedgerInfoCache.get(ledgerId);
-        if (tli != null) {
-            return tli;
-        } else {
-            TransientLedgerInfo newTli = new TransientLedgerInfo(ledgerId, 
ledgerIndex);
-            tli = transientLedgerInfoCache.putIfAbsent(ledgerId, newTli);
-            if (tli != null) {
-                newTli.close();
-                return tli;
-            } else {
-                return newTli;
-            }
-        }
+    public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey,
+            Iterable<SortedMap<Long, Long>> entries) throws Exception {
+        return getLedgerSorage(ledgerId).addLedgerToIndex(ledgerId, isFenced, masterKey, entries);
     }
 
-    private void updateCachedLacIfNeeded(long ledgerId, long lac) {
-        TransientLedgerInfo tli = transientLedgerInfoCache.get(ledgerId);
-        if (tli != null) {
-            tli.setLastAddConfirmed(lac);
-        }
+    public long getLastEntryInLedger(long ledgerId) throws IOException {
+        return getLedgerSorage(ledgerId).getEntryLocationIndex().getLastEntryInLedger(ledgerId);
 
-    @Override
-    public void flushEntriesLocationsIndex() throws IOException {
-        // No-op. Location index is already flushed in 
updateEntriesLocations() call
+    public long getLocation(long ledgerId, long entryId) throws IOException {
+        return getLedgerSorage(ledgerId).getEntryLocationIndex().getLocation(ledgerId, entryId);
     }
 
-    /**
-     * Add an already existing ledger to the index.
-     *
-     * <p>This method is only used as a tool to help the migration from 
InterleaveLedgerStorage to DbLedgerStorage
-     *
-     * @param ledgerId
-     *            the ledger id
-     * @param entries
-     *            a map of entryId -> location
-     * @return the number of
-     */
-    public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] 
masterKey,
-            Iterable<SortedMap<Long, Long>> entries) throws Exception {
-        LedgerData ledgerData = 
LedgerData.newBuilder().setExists(true).setFenced(isFenced)
-                .setMasterKey(ByteString.copyFrom(masterKey)).build();
-        ledgerIndex.set(ledgerId, ledgerData);
-        AtomicLong numberOfEntries = new AtomicLong();
-
-        // Iterate over all the entries pages
-        Batch batch = entryLocationIndex.newBatch();
-        entries.forEach(map -> {
-            map.forEach((entryId, location) -> {
-                try {
-                    entryLocationIndex.addLocation(batch, ledgerId, entryId, 
location);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-
-                numberOfEntries.incrementAndGet();
-            });
-        });
-
-        batch.flush();
-        batch.close();
-
-        return numberOfEntries.get();
+    private SingleDirectoryDbLedgerStorage getLedgerSorage(long ledgerId) {
+        return ledgerStorageList.get(MathUtils.signSafeMod(ledgerId, numberOfDirs));
     }
 
-    @Override
-    public void registerLedgerDeletionListener(LedgerDeletionListener 
listener) {
-        ledgerDeletionListeners.add(listener);
+    public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) throws IOException {
+        List<Iterable<Long>> listIt = new ArrayList<>(numberOfDirs);
+        for (SingleDirectoryDbLedgerStorage ls : ledgerStorageList) {
+            listIt.add(ls.getActiveLedgersInRange(firstLedgerId, lastLedgerId));
+        }
+
+        return Iterables.concat(listIt);
     }
 
-    public EntryLocationIndex getEntryLocationIndex() {
-        return entryLocationIndex;
+    public ByteBuf getLastEntry(long ledgerId) throws IOException {
+        return getLedgerSorage(ledgerId).getLastEntry(ledgerId);
     }
 
-    private void recordSuccessfulEvent(OpStatsLogger logger, long 
startTimeNanos) {
-        logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), 
TimeUnit.NANOSECONDS);
+    @VisibleForTesting
+    boolean isFlushRequired() {
+        return ledgerStorageList.stream().allMatch(SingleDirectoryDbLedgerStorage::isFlushRequired);
     }
 
-    private void recordFailedEvent(OpStatsLogger logger, long startTimeNanos) {
-        logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), 
TimeUnit.NANOSECONDS);
+    @VisibleForTesting
+    List<SingleDirectoryDbLedgerStorage> getLedgerStorageList() {
+        return ledgerStorageList;
     }
 
     /**
@@ -1056,7 +323,10 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
 
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(serverConf, serverConf.getLedgerDirs(),
                 new DiskChecker(serverConf.getDiskUsageThreshold(), serverConf.getDiskUsageWarnThreshold()));
-        String ledgerBasePath = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+        List<File> ledgerDirs = ledgerDirsManager.getAllLedgerDirs();
+
+        int dirIndex = MathUtils.signSafeMod(ledgerId, ledgerDirs.size());
+        String ledgerBasePath = ledgerDirs.get(dirIndex).toString();
 
         EntryLocationIndex entryLocationIndex = new EntryLocationIndex(serverConf,
                 (path, dbConfigType, conf1) -> new KeyValueStorageRocksDB(path, DbConfigType.Small, conf1, true),
@@ -1078,12 +348,4 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
         }
     }
 
-    /**
-     * Interface which process ledger logger.
-     */
-    public interface LedgerLoggerProcessor {
-        void process(long entryId, long entryLogId, long position);
-    }
-
-    private static final Logger log = LoggerFactory.getLogger(DbLedgerStorage.class);
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
similarity index 78%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
copy to bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 55b0945..c8e784a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -21,19 +21,15 @@
 package org.apache.bookkeeper.bookie.storage.ldb;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.SortedMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
@@ -61,158 +57,34 @@ import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.StateManager;
 import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
-import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
-import org.apache.bookkeeper.common.util.Watchable;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Implementation of LedgerStorage that uses RocksDB to keep the indexes for entries stored in EntryLogs.
+ * Single directory implementation of LedgerStorage that uses RocksDB to keep the indexes for entries stored in
+ * EntryLogs.
+ *
+ * <p>This is meant only to be used from {@link DbLedgerStorage}.
  */
-public class DbLedgerStorage implements CompactableLedgerStorage {
-
-    private static final long NOT_ASSIGNED_LAC = Long.MIN_VALUE;
-
-    /**
-     * This class borrows the logic from FileInfo.
-     *
-     * <p>This class is used for holding all the transient states for a given ledger.
-     */
-    private static class TransientLedgerInfo extends Watchable<LastAddConfirmedUpdateNotification>
-            implements AutoCloseable {
-
-        // lac
-        private volatile long lac = NOT_ASSIGNED_LAC;
-        // request from explicit lac requests
-        private ByteBuffer explicitLac = null;
-        // is the ledger info closed?
-        private boolean isClosed;
-
-        private final long ledgerId;
-        // reference to LedgerMetadataIndex
-        private final LedgerMetadataIndex ledgerIndex;
-
-        private long lastAccessed;
-
-        /**
-         * Construct an Watchable with zero watchers.
-         */
-        public TransientLedgerInfo(long ledgerId, LedgerMetadataIndex ledgerIndex) {
-            super(WATCHER_RECYCLER);
-            this.ledgerId = ledgerId;
-            this.ledgerIndex = ledgerIndex;
-            this.lastAccessed = System.currentTimeMillis();
-        }
-
-        long getLastAddConfirmed() {
-            return lac;
-        }
-
-        long setLastAddConfirmed(long lac) {
-            long lacToReturn;
-            boolean changed = false;
-            synchronized (this) {
-                if (this.lac == NOT_ASSIGNED_LAC || this.lac < lac) {
-                    this.lac = lac;
-                    changed = true;
-                    lastAccessed = System.currentTimeMillis();
-                }
-                lacToReturn = this.lac;
-            }
-            if (changed) {
-                notifyWatchers(lacToReturn);
-            }
-            return lacToReturn;
-        }
+public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage {
+    private final EntryLogger entryLogger;
 
-        synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
-                Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
-            lastAccessed = System.currentTimeMillis();
-            if ((lac != NOT_ASSIGNED_LAC && lac > previousLAC) || isClosed || ledgerIndex.get(ledgerId).getFenced()) {
-                return false;
-            }
-
-            addWatcher(watcher);
-            return true;
-        }
-
-        public ByteBuf getExplicitLac() {
-            ByteBuf retLac = null;
-            synchronized (this) {
-                if (explicitLac != null) {
-                    retLac = Unpooled.buffer(explicitLac.capacity());
-                    explicitLac.rewind(); // copy from the beginning
-                    retLac.writeBytes(explicitLac);
-                    explicitLac.rewind();
-                    return retLac;
-                }
-            }
-            return retLac;
-        }
+    private final LedgerMetadataIndex ledgerIndex;
+    private final EntryLocationIndex entryLocationIndex;
 
-        public void setExplicitLac(ByteBuf lac) {
-            long explicitLacValue;
-            synchronized (this) {
-                if (explicitLac == null) {
-                    explicitLac = ByteBuffer.allocate(lac.capacity());
-                }
-                lac.readBytes(explicitLac);
-                explicitLac.rewind();
-
-                // skip the ledger id
-                explicitLac.getLong();
-                explicitLacValue = explicitLac.getLong();
-                explicitLac.rewind();
+    private final ConcurrentLongHashMap<TransientLedgerInfo> transientLedgerInfoCache;
 
-                lastAccessed = System.currentTimeMillis();
-            }
-            setLastAddConfirmed(explicitLacValue);
-        }
-
-        boolean isStale() {
-            return (lastAccessed + TimeUnit.MINUTES.toMillis(LEDGER_INFO_CACHING_TIME_MINUTES)) < System
-                    .currentTimeMillis();
-        }
-
-        void notifyWatchers(long lastAddConfirmed) {
-            notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, lastAddConfirmed);
-        }
-
-        @Override
-        public void close() {
-            synchronized (this) {
-                if (isClosed) {
-                    return;
-                }
-                isClosed = true;
-            }
-            // notify watchers
-            notifyWatchers(Long.MAX_VALUE);
-        }
-
-    }
-
-    private EntryLogger entryLogger;
-
-    private LedgerMetadataIndex ledgerIndex;
-    private EntryLocationIndex entryLocationIndex;
-
-    private static final long LEDGER_INFO_CACHING_TIME_MINUTES = 10;
-    private ConcurrentLongHashMap<TransientLedgerInfo> transientLedgerInfoCache;
-
-    private GarbageCollectorThread gcThread;
+    private final GarbageCollectorThread gcThread;
 
     // Write cache where all new entries are inserted into
     protected volatile WriteCache writeCache;
@@ -221,7 +93,7 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
     protected volatile WriteCache writeCacheBeingFlushed;
 
     // Cache where we insert entries for speculative reading
-    private ReadCache readCache;
+    private final ReadCache readCache;
 
     private final StampedLock writeCacheRotationLock = new StampedLock();
 
@@ -236,107 +108,77 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
     private final ScheduledExecutorService cleanupExecutor = Executors
             .newSingleThreadScheduledExecutor(new DefaultThreadFactory("db-storage-cleanup"));
 
-    static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb";
-    static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
-    static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb";
-
-    static final String MAX_THROTTLE_TIME_MILLIS = "dbStorage_maxThrottleTimeMs";
-
-    private static final long DEFAULT_WRITE_CACHE_MAX_SIZE_MB = 16;
-    private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB = 16;
-    private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
-
-    private static final long DEFAUL_MAX_THROTTLE_TIME_MILLIS = TimeUnit.SECONDS.toMillis(10);
-
-    private static final int MB = 1024 * 1024;
-
     private final CopyOnWriteArrayList<LedgerDeletionListener> ledgerDeletionListeners = Lists
             .newCopyOnWriteArrayList();
 
-    private long writeCacheMaxSize;
-
-    private CheckpointSource checkpointSource = null;
+    private final CheckpointSource checkpointSource;
     private Checkpoint lastCheckpoint = Checkpoint.MIN;
 
-    private long readCacheMaxSize;
-    private int readAheadCacheBatchSize;
+    private final long writeCacheMaxSize;
+    private final long readCacheMaxSize;
+    private final int readAheadCacheBatchSize;
 
-    private long maxThrottleTimeNanos;
+    private final long maxThrottleTimeNanos;
 
-    private StatsLogger stats;
+    private final StatsLogger stats;
 
-    private OpStatsLogger addEntryStats;
-    private OpStatsLogger readEntryStats;
-    private OpStatsLogger readCacheHitStats;
-    private OpStatsLogger readCacheMissStats;
-    private OpStatsLogger readAheadBatchCountStats;
-    private OpStatsLogger readAheadBatchSizeStats;
-    private OpStatsLogger flushStats;
-    private OpStatsLogger flushSizeStats;
+    private final OpStatsLogger addEntryStats;
+    private final OpStatsLogger readEntryStats;
+    private final OpStatsLogger readCacheHitStats;
+    private final OpStatsLogger readCacheMissStats;
+    private final OpStatsLogger readAheadBatchCountStats;
+    private final OpStatsLogger readAheadBatchSizeStats;
+    private final OpStatsLogger flushStats;
+    private final OpStatsLogger flushSizeStats;
 
-    private Counter throttledWriteRequests;
-    private Counter rejectedWriteRequests;
+    private final Counter throttledWriteRequests;
+    private final Counter rejectedWriteRequests;
+
+    static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
+    private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
+
+    private static final long DEFAULT_MAX_THROTTLE_TIME_MILLIS = TimeUnit.SECONDS.toMillis(10);
+
+    public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
+            LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StateManager stateManager,
+            CheckpointSource checkpointSource, Checkpointer checkpointer, StatsLogger statsLogger,
+            ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize) throws IOException {
 
-    @Override
-    public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
-            LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource,
-            Checkpointer checkpointer, StatsLogger statsLogger) throws IOException {
         checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
                 "Db implementation only allows for one storage dir");
 
         String baseDir = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+        log.info("Creating single directory db ledger storage on {}", baseDir);
 
-        writeCacheMaxSize = conf.getLong(WRITE_CACHE_MAX_SIZE_MB, DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
-
-        writeCache = new WriteCache(writeCacheMaxSize / 2);
-        writeCacheBeingFlushed = new WriteCache(writeCacheMaxSize / 2);
+        this.writeCacheMaxSize = writeCacheSize;
+        this.writeCache = new WriteCache(writeCacheMaxSize / 2);
+        this.writeCacheBeingFlushed = new WriteCache(writeCacheMaxSize / 2);
 
         this.checkpointSource = checkpointSource;
 
-        readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
+        readCacheMaxSize = readCacheSize;
         readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
 
-        long maxThrottleTimeMillis = conf.getLong(MAX_THROTTLE_TIME_MILLIS, DEFAUL_MAX_THROTTLE_TIME_MILLIS);
+        long maxThrottleTimeMillis = conf.getLong(DbLedgerStorage.MAX_THROTTLE_TIME_MILLIS,
+                DEFAULT_MAX_THROTTLE_TIME_MILLIS);
         maxThrottleTimeNanos = TimeUnit.MILLISECONDS.toNanos(maxThrottleTimeMillis);
 
         readCache = new ReadCache(readCacheMaxSize);
 
         this.stats = statsLogger;
 
-        log.info("Started Db Ledger Storage");
-        log.info(" - Write cache size: {} MB", writeCacheMaxSize / MB);
-        log.info(" - Read Cache: {} MB", readCacheMaxSize / MB);
-        log.info(" - Read Ahead Batch size: : {}", readAheadCacheBatchSize);
-
         ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
         entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
 
         transientLedgerInfoCache = new ConcurrentLongHashMap<>(16 * 1024,
                 Runtime.getRuntime().availableProcessors() * 2);
-        cleanupExecutor.scheduleAtFixedRate(this::cleanupStaleTransientLedgerInfo, LEDGER_INFO_CACHING_TIME_MINUTES,
-                LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES);
+        cleanupExecutor.scheduleAtFixedRate(this::cleanupStaleTransientLedgerInfo,
+                TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES,
+                TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES);
 
         entryLogger = new EntryLogger(conf, ledgerDirsManager);
         gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger);
 
-        registerStats();
-    }
-
-    /**
-     * Evict all the ledger info object that were not used recently.
-     */
-    private void cleanupStaleTransientLedgerInfo() {
-        transientLedgerInfoCache.removeIf((ledgerId, ledgerInfo) -> {
-            boolean isStale = ledgerInfo.isStale();
-            if (isStale) {
-                ledgerInfo.close();
-            }
-
-            return isStale;
-        });
-    }
-
-    public void registerStats() {
         stats.registerGauge("write-cache-size", new Gauge<Long>() {
             @Override
             public Long getDefaultValue() {
@@ -396,6 +238,27 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
     }
 
     @Override
+    public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
+            LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource,
+            Checkpointer checkpointer, StatsLogger statsLogger) throws IOException {
+        /// Initialized in constructor
+    }
+
+    /**
+     * Evict all the ledger info objects that were not used recently.
+     */
+    private void cleanupStaleTransientLedgerInfo() {
+        transientLedgerInfoCache.removeIf((ledgerId, ledgerInfo) -> {
+            boolean isStale = ledgerInfo.isStale();
+            if (isStale) {
+                ledgerInfo.close();
+            }
+
+            return isStale;
+        });
+    }
+
+    @Override
     public void start() {
         gcThread.start();
     }
@@ -505,7 +368,7 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
             try {
                 inserted = writeCache.put(ledgerId, entryId, entry);
             } finally {
-                 writeCacheRotationLock.unlockRead(stamp);
+                writeCacheRotationLock.unlockRead(stamp);
             }
         }
 
@@ -548,7 +411,7 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
                     return;
                 }
             } finally {
-                 writeCacheRotationLock.unlockRead(stamp);
+                writeCacheRotationLock.unlockRead(stamp);
             }
 
             // Wait some time and try again
@@ -922,8 +785,8 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
     @Override
     public long getLastAddConfirmed(long ledgerId) throws IOException {
         TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.get(ledgerId);
-        long lac = null != ledgerInfo ? ledgerInfo.getLastAddConfirmed() : NOT_ASSIGNED_LAC;
-        if (lac == NOT_ASSIGNED_LAC) {
+        long lac = null != ledgerInfo ? ledgerInfo.getLastAddConfirmed() : TransientLedgerInfo.NOT_ASSIGNED_LAC;
+        if (lac == TransientLedgerInfo.NOT_ASSIGNED_LAC) {
             ByteBuf bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
             try {
                 bb.skipBytes(2 * Long.BYTES); // skip ledger id and entry id
@@ -1040,42 +903,20 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
         logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
     }
 
-    /**
-     * Reads ledger index entries to get list of entry-logger that contains given ledgerId.
-     *
-     * @param ledgerId
-     * @param serverConf
-     * @param processor
-     * @throws IOException
-     */
-    public static void readLedgerIndexEntries(long ledgerId, ServerConfiguration serverConf,
-            LedgerLoggerProcessor processor) throws IOException {
+    long getWriteCacheSize() {
+        return writeCache.size() + writeCacheBeingFlushed.size();
+    }
 
-        checkNotNull(serverConf, "ServerConfiguration can't be null");
-        checkNotNull(processor, "LedgerLoggger info processor can't null");
+    long getWriteCacheCount() {
+        return writeCache.count() + writeCacheBeingFlushed.count();
+    }
 
-        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(serverConf, serverConf.getLedgerDirs(),
-                new DiskChecker(serverConf.getDiskUsageThreshold(), serverConf.getDiskUsageWarnThreshold()));
-        String ledgerBasePath = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+    long getReadCacheSize() {
+        return readCache.size();
+    }
 
-        EntryLocationIndex entryLocationIndex = new EntryLocationIndex(serverConf,
-                (path, dbConfigType, conf1) -> new KeyValueStorageRocksDB(path, DbConfigType.Small, conf1, true),
-                ledgerBasePath, NullStatsLogger.INSTANCE);
-        try {
-            long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId);
-            for (long currentEntry = 0; currentEntry <= lastEntryId; currentEntry++) {
-                long offset = entryLocationIndex.getLocation(ledgerId, currentEntry);
-                if (offset <= 0) {
-                    // entry not found in this bookie
-                    continue;
-                }
-                long entryLogId = offset >> 32L;
-                long position = offset & 0xffffffffL;
-                processor.process(currentEntry, entryLogId, position);
-            }
-        } finally {
-            entryLocationIndex.close();
-        }
+    long getReadCacheCount() {
+        return readCache.count();
     }
 
     /**
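The writeCache / writeCacheBeingFlushed pair kept in the fields above is a double-buffering scheme: writers insert into the active cache while holding a shared StampedLock read stamp, and flush takes the exclusive write stamp only long enough to swap the two buffers, then drains the swapped-out one while new writes continue. A simplified sketch of the pattern, with plain maps standing in for WriteCache (this is not the committed code):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.StampedLock;

    public class DoubleBufferedCacheSketch {
        // Active buffer: receives all new entries
        private volatile Map<Long, byte[]> writeCache = new ConcurrentHashMap<>();
        // Buffer currently being written out to disk
        private volatile Map<Long, byte[]> writeCacheBeingFlushed = new ConcurrentHashMap<>();
        private final StampedLock rotationLock = new StampedLock();

        public void addEntry(long entryId, byte[] payload) {
            long stamp = rotationLock.readLock(); // shared: many writers in parallel
            try {
                writeCache.put(entryId, payload);
            } finally {
                rotationLock.unlockRead(stamp);
            }
        }

        public void flush() {
            long stamp = rotationLock.writeLock(); // exclusive, held only for the swap
            try {
                Map<Long, byte[]> tmp = writeCacheBeingFlushed;
                writeCacheBeingFlushed = writeCache;
                writeCache = tmp;
            } finally {
                rotationLock.unlockWrite(stamp);
            }
            // Drain outside the lock so new writes proceed while disk I/O happens
            writeCacheBeingFlushed.clear();
        }
    }

With one such instance per directory, each flush thread drives its own disk independently, which is what removes the single-flush-thread bottleneck.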
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/TransientLedgerInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/TransientLedgerInfo.java
new file mode 100644
index 0000000..27d63e8
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/TransientLedgerInfo.java
@@ -0,0 +1,156 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
+import org.apache.bookkeeper.common.util.Watchable;
+import org.apache.bookkeeper.common.util.Watcher;
+
+/**
+ * This class borrows the logic from FileInfo.
+ *
+ * <p>This class is used for holding all the transient states for a given ledger.
+ */
+class TransientLedgerInfo extends Watchable<LastAddConfirmedUpdateNotification> implements AutoCloseable {
+
+    static final long LEDGER_INFO_CACHING_TIME_MINUTES = 10;
+
+    static final long NOT_ASSIGNED_LAC = Long.MIN_VALUE;
+
+    // lac
+    private volatile long lac = NOT_ASSIGNED_LAC;
+    // request from explicit lac requests
+    private ByteBuffer explicitLac = null;
+    // is the ledger info closed?
+    private boolean isClosed;
+
+    private final long ledgerId;
+    // reference to LedgerMetadataIndex
+    private final LedgerMetadataIndex ledgerIndex;
+
+    private long lastAccessed;
+
+    /**
+     * Construct a Watchable with zero watchers.
+     */
+    public TransientLedgerInfo(long ledgerId, LedgerMetadataIndex ledgerIndex) {
+        super(WATCHER_RECYCLER);
+        this.ledgerId = ledgerId;
+        this.ledgerIndex = ledgerIndex;
+        this.lastAccessed = System.currentTimeMillis();
+    }
+
+    long getLastAddConfirmed() {
+        return lac;
+    }
+
+    long setLastAddConfirmed(long lac) {
+        long lacToReturn;
+        boolean changed = false;
+        synchronized (this) {
+            if (this.lac == NOT_ASSIGNED_LAC || this.lac < lac) {
+                this.lac = lac;
+                changed = true;
+                lastAccessed = System.currentTimeMillis();
+            }
+            lacToReturn = this.lac;
+        }
+        if (changed) {
+            notifyWatchers(lacToReturn);
+        }
+        return lacToReturn;
+    }
+
+    synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
+            Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
+        lastAccessed = System.currentTimeMillis();
+        if ((lac != NOT_ASSIGNED_LAC && lac > previousLAC) || isClosed || ledgerIndex.get(ledgerId).getFenced()) {
+            return false;
+        }
+
+        addWatcher(watcher);
+        return true;
+    }
+
+    public ByteBuf getExplicitLac() {
+        ByteBuf retLac = null;
+        synchronized (this) {
+            if (explicitLac != null) {
+                retLac = Unpooled.buffer(explicitLac.capacity());
+                explicitLac.rewind(); // copy from the beginning
+                retLac.writeBytes(explicitLac);
+                explicitLac.rewind();
+                return retLac;
+            }
+        }
+        return retLac;
+    }
+
+    public void setExplicitLac(ByteBuf lac) {
+        long explicitLacValue;
+        synchronized (this) {
+            if (explicitLac == null) {
+                explicitLac = ByteBuffer.allocate(lac.capacity());
+            }
+            lac.readBytes(explicitLac);
+            explicitLac.rewind();
+
+            // skip the ledger id
+            explicitLac.getLong();
+            explicitLacValue = explicitLac.getLong();
+            explicitLac.rewind();
+
+            lastAccessed = System.currentTimeMillis();
+        }
+        setLastAddConfirmed(explicitLacValue);
+    }
+
+    boolean isStale() {
+        return (lastAccessed + TimeUnit.MINUTES.toMillis(LEDGER_INFO_CACHING_TIME_MINUTES)) < System
+                .currentTimeMillis();
+    }
+
+    void notifyWatchers(long lastAddConfirmed) {
+        notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, lastAddConfirmed);
+    }
+
+    @Override
+    public void close() {
+        synchronized (this) {
+            if (isClosed) {
+                return;
+            }
+            isClosed = true;
+        }
+        // notify watchers
+        notifyWatchers(Long.MAX_VALUE);
+    }
+
+}
\ No newline at end of file
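The LAC handling in the new TransientLedgerInfo file is worth a closer look: the last-add-confirmed value only ever advances, the update happens under the object monitor, and watchers are notified outside the synchronized block so callbacks never run while the lock is held. A stripped-down sketch of that pattern, with a plain listener list standing in for the Watchable/Watcher machinery:

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.function.LongConsumer;

    public class LacUpdateSketch {
        private static final long NOT_ASSIGNED_LAC = Long.MIN_VALUE;

        private long lac = NOT_ASSIGNED_LAC;
        private final List<LongConsumer> watchers = new CopyOnWriteArrayList<>();

        void addWatcher(LongConsumer watcher) {
            watchers.add(watcher);
        }

        long setLastAddConfirmed(long newLac) {
            long lacToReturn;
            boolean changed = false;
            synchronized (this) {
                if (lac == NOT_ASSIGNED_LAC || lac < newLac) {
                    lac = newLac; // LAC only ever moves forward
                    changed = true;
                }
                lacToReturn = lac;
            }
            if (changed) {
                // Notify outside the monitor to avoid holding it during callbacks
                final long value = lacToReturn;
                watchers.forEach(w -> w.accept(value));
            }
            return lacToReturn;
        }
    }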
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
index f4d67f2..67c69fe 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -220,7 +220,8 @@ public class DbLedgerStorageTest {
         storage.addEntry(entry3);
 
         // Simulate bookie compaction
-        EntryLogger entryLogger = ((DbLedgerStorage) storage).getEntryLogger();
+        SingleDirectoryDbLedgerStorage singleDirStorage = ((DbLedgerStorage) storage).getLedgerStorageList().get(0);
+        EntryLogger entryLogger = singleDirStorage.getEntryLogger();
         // Rewrite entry-3
         ByteBuf newEntry3 = Unpooled.buffer(1024);
         newEntry3.writeLong(4); // ledger id
@@ -229,7 +230,7 @@ public class DbLedgerStorageTest {
         long location = entryLogger.addEntry(4, newEntry3, false);
 
         List<EntryLocation> locations = Lists.newArrayList(new EntryLocation(4, 3, location));
-        storage.updateEntriesLocations(locations);
+        singleDirStorage.updateEntriesLocations(locations);
 
         ByteBuf res = storage.getEntry(4, 3);
         System.out.println("res:       " + ByteBufUtil.hexDump(res));
@@ -238,20 +239,18 @@ public class DbLedgerStorageTest {
     }
 
     @Test
-    public void doubleDirectoryError() throws Exception {
+    public void doubleDirectory() throws Exception {
         int gcWaitTime = 1000;
         ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setGcWaitTime(gcWaitTime);
         conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
         conf.setLedgerDirNames(new String[] { "dir1", "dir2" });
 
-        try {
-            new Bookie(conf);
-            fail("Should have failed because of the 2 directories");
-        } catch (IllegalArgumentException e) {
-            // ok
-        }
+        // Should not fail
+        Bookie bookie = new Bookie(conf);
+        assertEquals(2, ((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().size());
 
+        bookie.shutdown();
     }
 
     @Test
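With this change in place, pointing a bookie at several ledger directories simply spreads DbLedgerStorage across them, as the updated doubleDirectory test verifies. A small configuration sketch; the directory paths are placeholders:

    import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
    import org.apache.bookkeeper.conf.ServerConfiguration;

    public class MultiDirDbStorageConfig {
        public static ServerConfiguration newMultiDirConf() {
            ServerConfiguration conf = new ServerConfiguration();
            conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
            // Each directory gets its own SingleDirectoryDbLedgerStorage,
            // flushed independently on its own thread
            conf.setLedgerDirNames(new String[] { "/mnt/disk1/ledgers", "/mnt/disk2/ledgers" });
            return conf;
        }
    }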
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
index 4894814..7a45ee7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
@@ -28,11 +28,18 @@ import io.netty.buffer.Unpooled;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.StateManager;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -48,29 +55,49 @@ public class DbLedgerStorageWriteCacheTest {
     private static class MockedDbLedgerStorage extends DbLedgerStorage {
 
         @Override
-        public void flush() throws IOException {
-            flushMutex.lock();
-            try {
-                // Swap the write caches and block indefinitely to simulate a slow disk
-                WriteCache tmp = writeCacheBeingFlushed;
-                writeCacheBeingFlushed = writeCache;
-                writeCache = tmp;
-
-                // since the cache is switched, we can allow flush to be triggered
-                hasFlushBeenTriggered.set(false);
-
-                // Block the flushing thread
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    return;
-                }
-            } finally {
-                flushMutex.unlock();
-            }
+        protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf,
+                LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
+                StateManager stateManager, CheckpointSource checkpointSource, Checkpointer checkpointer,
+                StatsLogger statsLogger, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize)
+                throws IOException {
+            return new MockedSingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
+                    stateManager, checkpointSource, checkpointer, statsLogger, gcExecutor, writeCacheSize,
+                    readCacheSize);
         }
 
+        private static class MockedSingleDirectoryDbLedgerStorage extends SingleDirectoryDbLedgerStorage {
+            public MockedSingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
+                    LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StateManager stateManager,
+                    CheckpointSource checkpointSource, Checkpointer checkpointer, StatsLogger statsLogger,
+                    ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize) throws IOException {
+                super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, stateManager, checkpointSource,
+                        checkpointer, statsLogger, gcExecutor, writeCacheSize, readCacheSize);
+            }
+
+          @Override
+          public void flush() throws IOException {
+              flushMutex.lock();
+              try {
+                  // Swap the write caches and block indefinitely to simulate a slow disk
+                  WriteCache tmp = writeCacheBeingFlushed;
+                  writeCacheBeingFlushed = writeCache;
+                  writeCache = tmp;
+
+                  // since the cache is switched, we can allow flush to be triggered
+                  hasFlushBeenTriggered.set(false);
+
+                  // Block the flushing thread
+                  try {
+                      Thread.sleep(1000);
+                  } catch (InterruptedException e) {
+                      Thread.currentThread().interrupt();
+                      return;
+                  }
+              } finally {
+                  flushMutex.unlock();
+              }
+          }
+        }
     }
 
     @Before
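The mocked flush above blocks after swapping the caches so the test can fill the write cache and observe the backpressure path. The Thread.sleep is workable, but the same idea can be made deterministic with a latch; a stand-alone sketch with hypothetical names:

    import java.util.concurrent.CountDownLatch;

    public class BlockingFlushSketch {
        private final CountDownLatch flushGate = new CountDownLatch(1);

        // Called by the flush thread after swapping the write caches
        public void flush() throws InterruptedException {
            flushGate.await(); // block "on disk" until the test releases it
        }

        // Called by the test once it has verified the backpressure behavior
        public void releaseFlush() {
            flushGate.countDown();
        }
    }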

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.