[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-16 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r181924215
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +812,418 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * flush both current and rotated logs.
+ */
+void flush() throws IOException;
+
+/*
+ * close current logs.
+ */
+void close() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceClose();
+
+/*
+ *
+ */
+void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws 
IOException;
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
+/*
+ * this method should be called after doing entrymemtable flush,it 
would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
  *
- * TODO: When EntryLogManager is introduced in the subsequent 
sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based 
logic.
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and 
both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do 
checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
+boolean commitEntryMemTableFlush() throws IOException;
+}
+
+abstract class EntryLogManagerBase implements EntryLogManager {
+volatile List rotatedLogChannels;
+
+private final FastThreadLocal sizeBufferForAdd = new 
FastThreadLocal() {
+@Override
+protected ByteBuf initialValue() throws Exception {
+return Unpooled.buffer(4);
+}
+};
+
+/*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+@Override
+public long addEntry(long ledger, ByteBuf entry, boolean rollLog) 
throws IOException {
+int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to 
prepend the size
+BufferedLogChannel logChannel = 
getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog);
+ByteBuf sizeBuffer = sizeBufferForAdd.get();
+sizeBuffer.clear();
+sizeBuffer.writeInt(entry.readableBytes());
+logChannel.write(sizeBuffer);
+
+long pos = logChannel.position();
+logChannel.write(entry);
+logChannel.registerWrittenEntry(ledger, entrySize);
+
+return (logChannel.getLogId() << 32L) | pos;
+}
+
+boolean reachEntryLogLimit(BufferedLogChannel logChannel, long size) {
+if (logChannel == null) {
+return false;
+}
+return logChannel.position() + size > logSizeLimit;
+}
+
+boolean readEntryLogHardLimit(BufferedLogChannel 

[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-16 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r181630491
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +810,392 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * flush both current and rotated logs.
+ */
+void flush() throws IOException;
+
+/*
+ * close current logs.
+ */
+void close() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceClose();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
  *
- * TODO: When EntryLogManager is introduced in the subsequent 
sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based 
logic.
+ */
+void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws 
IOException;
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
+/*
+ * this method should be called after doing entrymemtable flush,it 
would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
+ *
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and 
both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do 
checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final List rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = new 
CopyOnWriteArrayList();
 
 Review comment:
   > LinkedList iterator will fail because "if the list is structurally 
   > modified at any time after the iterator is created, in any way except through 
   > the Iterator's own remove or add methods, the iterator will throw a 
   > ConcurrentModificationException."
   
   I see that every access to `rotatedLogChannels` is inside a synchronized block, so I 
   am not sure when this `ConcurrentModificationException` can happen. If it can 
   happen, please write a test that reproduces it and shows that your fix actually works.
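
   For reference, the kind of reproduction being asked for could look roughly like the sketch below. It is not code from the pull request: the class name, the method name, and the use of plain strings in place of `BufferedLogChannel` are all assumptions made for illustration. It shows the one case where a `LinkedList` iterator fails even with correct locking, namely when the thread that is iterating modifies the list itself (a `synchronized` block is reentrant, so it does not prevent this), and that `CopyOnWriteArrayList` tolerates the same sequence because its iterator walks a snapshot.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class; strings stand in for BufferedLogChannel instances.
public class RotatedChannelsIterationDemo {

    // Iterates over the list and appends one element in the middle of the
    // iteration. Returns true if the iterator threw
    // ConcurrentModificationException, false if the loop finished normally.
    public static boolean failsWhenModifiedDuringIteration(List<String> channels) {
        channels.add("log-1");
        channels.add("log-2");
        try {
            for (String channel : channels) {
                if (channel.equals("log-1")) {
                    // Structural modification that bypasses the iterator.
                    channels.add("log-3");
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("LinkedList: "
                + failsWhenModifiedDuringIteration(new LinkedList<>()));
        System.out.println("CopyOnWriteArrayList: "
                + failsWhenModifiedDuringIteration(new CopyOnWriteArrayList<>()));
    }
}
```

   Running `main` prints `LinkedList: true` and `CopyOnWriteArrayList: false`. If no code path adds to or removes from the list while the same thread is iterating it, and every other access holds the same lock, this failure mode should not be reachable, which is the point of the question above.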


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-16 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r181630193
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +810,392 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * flush both current and rotated logs.
+ */
+void flush() throws IOException;
+
+/*
+ * close current logs.
+ */
+void close() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceClose();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
  *
- * TODO: When EntryLogManager is introduced in the subsequent 
sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based 
logic.
+ */
+void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws 
IOException;
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
+/*
+ * this method should be called after doing entrymemtable flush,it 
would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
+ *
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and 
both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do 
checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final List rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = new 
CopyOnWriteArrayList();
 }
-if (null == channels) {
-return;
+
+private final FastThreadLocal sizeBufferForAdd = new 
FastThreadLocal() {
+@Override
+protected ByteBuf initialValue() throws Exception {
+return Unpooled.buffer(4);
+}
+};
+
+/*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+@Override
+public long addEntry(long ledger, ByteBuf entry, boolean rollLog) 
throws IOException {
+int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to 
prepend the size
+BufferedLogChannel logChannel = 
getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog);
+ByteBuf sizeBuffer = sizeBufferForAdd.get();
+sizeBuffer.clear();
+sizeBuffer.writeInt(entry.readableBytes());
+logChannel.write(sizeBuffer);
+
+long pos = logChannel.position();
+logChannel.write(entry);
+

[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-15 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r181599453
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +810,392 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * flush both current and rotated logs.
+ */
+void flush() throws IOException;
+
+/*
+ * close current logs.
+ */
+void close() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceClose();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
  *
- * TODO: When EntryLogManager is introduced in the subsequent 
sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based 
logic.
+ */
+void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws 
IOException;
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
+/*
+ * this method should be called after doing entrymemtable flush,it 
would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
+ *
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and 
both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do 
checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final List rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = new 
CopyOnWriteArrayList();
 
 Review comment:
   Based on my understanding, you will use a Set for the per-ledger manager. Why not 
   move `rotatedLogChannels` to the single-log manager and keep the original logic 
   unchanged? I am not convinced that we need to change this to 
   `CopyOnWriteArrayList`.
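
   The "original logic" referred to here is the swap-under-lock pattern visible in the removed `flushRotatedLogs` lines of the diff: detach the pending list while holding the lock, then work on the detached list outside it. A minimal sketch of that pattern living in a single-log manager follows. The class name `SingleLogManagerSketch`, the `addRotatedChannel` method, and the use of strings in place of `BufferedLogChannel` are assumptions for illustration, not the actual BookKeeper classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical single-log manager; strings stand in for BufferedLogChannel.
public class SingleLogManagerSketch {

    // Guarded by "this". Null means nothing is waiting to be flushed.
    private List<String> logChannelsToFlush;

    // Called when the current log has been rotated out.
    public synchronized void addRotatedChannel(String channel) {
        if (logChannelsToFlush == null) {
            logChannelsToFlush = new ArrayList<>();
        }
        logChannelsToFlush.add(channel);
    }

    // Detaches the pending list under the lock, then processes it outside
    // the lock. Returns the channels handled by this call.
    public List<String> flushRotatedLogs() {
        List<String> channels;
        synchronized (this) {
            channels = logChannelsToFlush;
            logChannelsToFlush = null;
        }
        if (channels == null) {
            return Collections.emptyList();
        }
        List<String> flushed = new ArrayList<>();
        for (String channel : channels) {
            // Stand-in for flushing and closing the real channel.
            flushed.add(channel);
        }
        return flushed;
    }

    public static void main(String[] args) {
        SingleLogManagerSketch manager = new SingleLogManagerSketch();
        manager.addRotatedChannel("0.log");
        manager.addRotatedChannel("1.log");
        System.out.println(manager.flushRotatedLogs());
        System.out.println(manager.flushRotatedLogs());
    }
}
```

   Because the flushing thread only ever iterates a list that no other thread can still reach, a plain list is enough in this arrangement and no concurrent collection is needed.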



[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-15 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r181599541
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +810,392 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * flush both current and rotated logs.
+ */
+void flush() throws IOException;
+
+/*
+ * close current logs.
+ */
+void close() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceClose();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
  *
- * TODO: When EntryLogManager is introduced in the subsequent 
sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based 
logic.
+ */
+void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws 
IOException;
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
+/*
+ * this method should be called after doing entrymemtable flush,it 
would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
+ *
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and 
both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do 
checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final List rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = new 
CopyOnWriteArrayList();
 }
-if (null == channels) {
-return;
+
+private final FastThreadLocal sizeBufferForAdd = new 
FastThreadLocal() {
+@Override
+protected ByteBuf initialValue() throws Exception {
+return Unpooled.buffer(4);
+}
+};
+
+/*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+@Override
+public long addEntry(long ledger, ByteBuf entry, boolean rollLog) 
throws IOException {
+int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to 
prepend the size
+BufferedLogChannel logChannel = 
getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog);
+ByteBuf sizeBuffer = sizeBufferForAdd.get();
+sizeBuffer.clear();
+sizeBuffer.writeInt(entry.readableBytes());
+logChannel.write(sizeBuffer);
+
+long pos = logChannel.position();
+logChannel.write(entry);
+

[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180948843
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
+
+/*
+ * flush current logs.
+ */
+void flushCurrentLogs() throws IOException;
+
+/*
+ * close current logs.
+ */
+void closeCurrentLogs() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceCloseCurrentLogs();
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
+ * this method should be called after doing entrymemtable flush,it 
would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
  *
- * TODO: When EntryLogManager is introduced in the subsequent 
sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based 
logic.
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and 
both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do 
checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final Set rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = ConcurrentHashMap.newKeySet();
 }
-if (null == channels) {
-return;
+
+/*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+@Override
+public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) 
throws IOException {
+
+int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to 
prepend the size
+BufferedLogChannel logChannel = getCurrentLogForLedger(ledger);
+boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize)
+: readEntryLogHardLimit(logChannel, entrySize);
+// Create new log if logSizeLimit reached or current disk is full
+boolean diskFull = (logChannel == null) ? false
 
 Review comment:
   > I removed the need of LedgerDirsListener and another flag variable - 
shouldCreateNewEntryLog. Instead I check if the dir is full by calling 

[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180947466
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
+
+/*
+ * flush current logs.
+ */
+void flushCurrentLogs() throws IOException;
+
+/*
+ * close current logs.
+ */
+void closeCurrentLogs() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceCloseCurrentLogs();
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
+ * this method should be called after doing entrymemtable flush,it 
would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
  *
- * TODO: When EntryLogManager is introduced in the subsequent 
sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based 
logic.
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and 
both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do 
checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final Set rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = ConcurrentHashMap.newKeySet();
 }
-if (null == channels) {
-return;
+
+/*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+@Override
+public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) 
throws IOException {
+
+int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to 
prepend the size
+BufferedLogChannel logChannel = getCurrentLogForLedger(ledger);
+boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize)
+: readEntryLogHardLimit(logChannel, entrySize);
+// Create new log if logSizeLimit reached or current disk is full
+boolean diskFull = (logChannel == null) ? false
+: 
ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile());
+boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs();
+
+

[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180945411
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
 
 Review comment:
   Okay, that's fine; if you want to leave `rollLogs` here, I am fine with that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180944411
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
+
+/*
+ * flush current logs.
+ */
+void flushCurrentLogs() throws IOException;
+
+/*
+ * close current logs.
+ */
+void closeCurrentLogs() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceCloseCurrentLogs();
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
+ * this method should be called after doing entrymemtable flush,it 
would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
  *
- * TODO: When EntryLogManager is introduced in the subsequent 
sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based 
logic.
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and 
both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do 
checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final Set rotatedLogChannels;
 
 Review comment:
   >  it is incorrect to have non-synchronized version of List for 
rotatedLogChannels. I'll use CopyOnWriteArrayList, so that the list can be 
modified while its iterator is being used.
   
   Why? The original code synchronizes the rotatedLogChannels list properly. If you just move that logic into the abstraction, it should keep working.
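
For context, the removed `flushRotatedLogs` shown in the diff swaps the pending list out under a lock (`channels = logChannelsToFlush; logChannelsToFlush = null;`). A minimal sketch of that pattern follows, assuming channels can be represented by plain names; the class and method names (`RotatedChannelsSketch`, `addRotated`, `takeChannelsToFlush`) are hypothetical and not taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in: channels are represented by their names only.
class RotatedChannelsSketch {
    // Pending rotated channels in rotation order; null when nothing is pending.
    private List<String> logChannelsToFlush;

    // Called when a log is rotated; uses the same monitor as the swap below.
    synchronized void addRotated(String channel) {
        if (logChannelsToFlush == null) {
            logChannelsToFlush = new ArrayList<String>();
        }
        logChannelsToFlush.add(channel);
    }

    // Detach the pending list under the lock so the caller can flush it
    // outside the lock while new rotations accumulate in a fresh list.
    List<String> takeChannelsToFlush() {
        List<String> channels;
        synchronized (this) {
            channels = logChannelsToFlush;
            logChannelsToFlush = null;
        }
        return channels == null ? new ArrayList<String>() : channels;
    }
}

class RotatedChannelsDemo {
    public static void main(String[] args) {
        RotatedChannelsSketch sketch = new RotatedChannelsSketch();
        sketch.addRotated("log-3");
        sketch.addRotated("log-4");
        System.out.println(sketch.takeChannelsToFlush());
        System.out.println(sketch.takeChannelsToFlush());
    }
}
```

Because the list is only touched while holding the monitor, a plain `ArrayList` is enough here and rotation order is preserved; whether the real code can be moved into the abstraction unchanged is for the PR to confirm.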



[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180944180
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
+
+/*
+ * flush current logs.
+ */
+void flushCurrentLogs() throws IOException;
+
+/*
+ * close current logs.
+ */
+void closeCurrentLogs() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceCloseCurrentLogs();
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
+ * this method should be called after doing entrymemtable flush,it 
would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
  *
- * TODO: When EntryLogManager is introduced in the subsequent 
sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based 
logic.
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and 
both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do 
checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final Set rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = ConcurrentHashMap.newKeySet();
 }
-if (null == channels) {
-return;
+
+/*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+@Override
+public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) 
throws IOException {
+
+int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to 
prepend the size
+BufferedLogChannel logChannel = getCurrentLogForLedger(ledger);
+boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize)
+: readEntryLogHardLimit(logChannel, entrySize);
+// Create new log if logSizeLimit reached or current disk is full
+boolean diskFull = (logChannel == null) ? false
+: 
ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile());
+boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs();
+
+

[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180943513
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
+
+/*
+ * flush current logs.
+ */
+void flushCurrentLogs() throws IOException;
+
+/*
+ * close current logs.
+ */
+void closeCurrentLogs() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceCloseCurrentLogs();
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
+ * this method should be called after doing entrymemtable flush,it 
would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
  *
- * TODO: When EntryLogManager is introduced in the subsequent 
sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based 
logic.
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and 
both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do 
checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final Set rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = ConcurrentHashMap.newKeySet();
 }
-if (null == channels) {
-return;
+
+/*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+@Override
+public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) 
throws IOException {
+
+int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to 
prepend the size
+BufferedLogChannel logChannel = getCurrentLogForLedger(ledger);
+boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize)
+: readEntryLogHardLimit(logChannel, entrySize);
+// Create new log if logSizeLimit reached or current disk is full
+boolean diskFull = (logChannel == null) ? false
 
 Review comment:
   > it is not much of a change here. There are minor changes here and these 
changes make code more organized and appropriate.
   
   understand you want to reorganize 

[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180942977
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -499,176 +576,163 @@ void createNewLog() throws IOException {
 EntryLoggerAllocator getEntryLoggerAllocator() {
 return entryLoggerAllocator;
 }
-/**
- * Append the ledger map at the end of the entry log.
- * Updates the entry log file header with the offset and size of the map.
- */
-private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws 
IOException {
-long ledgerMapOffset = entryLogChannel.position();
-
-ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
-int numberOfLedgers = (int) ledgersMap.size();
-
-// Write the ledgers map into several batches
-
-final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
-final ByteBuf serializedMap = 
ByteBufAllocator.DEFAULT.buffer(maxMapSize);
-
-try {
-ledgersMap.forEach(new BiConsumerLong() {
-int remainingLedgers = numberOfLedgers;
-boolean startNewBatch = true;
-int remainingInBatch = 0;
-
-@Override
-public void accept(long ledgerId, long size) {
-if (startNewBatch) {
-int batchSize = Math.min(remainingLedgers, 
LEDGERS_MAP_MAX_BATCH_SIZE);
-int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * batchSize;
-
-serializedMap.clear();
-serializedMap.writeInt(ledgerMapSize - 4);
-serializedMap.writeLong(INVALID_LID);
-serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
-serializedMap.writeInt(batchSize);
-
-startNewBatch = false;
-remainingInBatch = batchSize;
-}
-// Dump the ledger in the current batch
-serializedMap.writeLong(ledgerId);
-serializedMap.writeLong(size);
---remainingLedgers;
-
-if (--remainingInBatch == 0) {
-// Close current batch
-try {
-entryLogChannel.write(serializedMap);
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-
-startNewBatch = true;
-}
-}
-});
-} catch (RuntimeException e) {
-if (e.getCause() instanceof IOException) {
-throw (IOException) e.getCause();
-} else {
-throw e;
-}
-} finally {
-serializedMap.release();
-}
-// Flush the ledger's map out before we write the header.
-// Otherwise the header might point to something that is not fully 
written
-entryLogChannel.flush();
-
-// Update the headers with the map offset and count of ledgers
-ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
-mapInfo.putLong(ledgerMapOffset);
-mapInfo.putInt(numberOfLedgers);
-mapInfo.flip();
-entryLogChannel.fileChannel.write(mapInfo, 
LEDGERS_MAP_OFFSET_POSITION);
-}
 
 /**
  * An allocator pre-allocates entry log files.
  */
 class EntryLoggerAllocator {
 
 private long preallocatedLogId;
-private Future preallocation = null;
+Future preallocation = null;
 private ExecutorService allocatorExecutor;
-private final Object createEntryLogLock = new Object();
-private final Object createCompactionLogLock = new Object();
 
 EntryLoggerAllocator(long logId) {
 preallocatedLogId = logId;
 allocatorExecutor = Executors.newSingleThreadExecutor();
 }
 
-BufferedLogChannel createNewLog() throws IOException {
-synchronized (createEntryLogLock) {
-BufferedLogChannel bc;
-if (!entryLogPreAllocationEnabled){
-// create a new log directly
-bc = allocateNewLog();
-return bc;
-} else {
-// allocate directly to response request
-if (null == preallocation){
-bc = allocateNewLog();
+synchronized long getPreallocatedLogId(){
+return preallocatedLogId;
+}
+
+synchronized BufferedLogChannel createNewLog() throws IOException {
+BufferedLogChannel bc;
+if (!entryLogPreAllocationEnabled || null == 

[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180942749
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -499,176 +576,163 @@ void createNewLog() throws IOException {
 EntryLoggerAllocator getEntryLoggerAllocator() {
 return entryLoggerAllocator;
 }
-/**
- * Append the ledger map at the end of the entry log.
- * Updates the entry log file header with the offset and size of the map.
- */
-private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws 
IOException {
-long ledgerMapOffset = entryLogChannel.position();
-
-ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
-int numberOfLedgers = (int) ledgersMap.size();
-
-// Write the ledgers map into several batches
-
-final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
-final ByteBuf serializedMap = 
ByteBufAllocator.DEFAULT.buffer(maxMapSize);
-
-try {
-ledgersMap.forEach(new BiConsumerLong() {
-int remainingLedgers = numberOfLedgers;
-boolean startNewBatch = true;
-int remainingInBatch = 0;
-
-@Override
-public void accept(long ledgerId, long size) {
-if (startNewBatch) {
-int batchSize = Math.min(remainingLedgers, 
LEDGERS_MAP_MAX_BATCH_SIZE);
-int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * batchSize;
-
-serializedMap.clear();
-serializedMap.writeInt(ledgerMapSize - 4);
-serializedMap.writeLong(INVALID_LID);
-serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
-serializedMap.writeInt(batchSize);
-
-startNewBatch = false;
-remainingInBatch = batchSize;
-}
-// Dump the ledger in the current batch
-serializedMap.writeLong(ledgerId);
-serializedMap.writeLong(size);
---remainingLedgers;
-
-if (--remainingInBatch == 0) {
-// Close current batch
-try {
-entryLogChannel.write(serializedMap);
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-
-startNewBatch = true;
-}
-}
-});
-} catch (RuntimeException e) {
-if (e.getCause() instanceof IOException) {
-throw (IOException) e.getCause();
-} else {
-throw e;
-}
-} finally {
-serializedMap.release();
-}
-// Flush the ledger's map out before we write the header.
-// Otherwise the header might point to something that is not fully 
written
-entryLogChannel.flush();
-
-// Update the headers with the map offset and count of ledgers
-ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
-mapInfo.putLong(ledgerMapOffset);
-mapInfo.putInt(numberOfLedgers);
-mapInfo.flip();
-entryLogChannel.fileChannel.write(mapInfo, 
LEDGERS_MAP_OFFSET_POSITION);
-}
 
 /**
  * An allocator pre-allocates entry log files.
  */
 class EntryLoggerAllocator {
 
 private long preallocatedLogId;
-private Future preallocation = null;
+Future preallocation = null;
 private ExecutorService allocatorExecutor;
-private final Object createEntryLogLock = new Object();
-private final Object createCompactionLogLock = new Object();
 
 EntryLoggerAllocator(long logId) {
 preallocatedLogId = logId;
 allocatorExecutor = Executors.newSingleThreadExecutor();
 }
 
-BufferedLogChannel createNewLog() throws IOException {
-synchronized (createEntryLogLock) {
-BufferedLogChannel bc;
-if (!entryLogPreAllocationEnabled){
-// create a new log directly
-bc = allocateNewLog();
-return bc;
-} else {
-// allocate directly to response request
-if (null == preallocation){
-bc = allocateNewLog();
+synchronized long getPreallocatedLogId(){
+return preallocatedLogId;
+}
+
+synchronized BufferedLogChannel createNewLog() throws IOException {
+BufferedLogChannel bc;
+if (!entryLogPreAllocationEnabled || null == 

[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180524406
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
 ##
 @@ -173,7 +175,7 @@ public void checkpoint(final Checkpoint checkpoint) throws 
IOException {
 // it means bytes might live at current active entry log, we need
 // roll current entry log and then issue checkpoint to underlying
 // interleaved ledger storage.
-entryLogger.rollLog();
+entryLogger.rollLogs();
 
 Review comment:
   I think the `numBytesFlushed > 0` logic is behavior specific to the single-log manager, and `rollLogs` is not the right term for what happens here. I would suggest adding a method to EntryLogManager:
   
   `void prepareCheckpoint(Checkpoint checkpoint, long numBytesFlushedBetweenCheckpoints)`, which prepares a checkpoint in the entry logger.
   
   Then the logic here can be changed to:
   
   ```
   long numBytesFlushed = memTable.flush(this, checkpoint);
   entryLogger.prepareCheckpoint(checkpoint, numBytesFlushed);
   super.checkpoint(checkpoint);
   ```
   
   In the single-log manager, you can implement:
   
   ```
   void prepareCheckpoint(Checkpoint checkpoint, long numBytesFlushedBetweenCheckpoints) {
       if (numBytesFlushedBetweenCheckpoints > 0) {
           rollLogs();
       }
   }
   ```
   
   In the per-ledger-log manager, it is a no-op.
   
   This way, you can get rid of `rollLogs` in EntryLogManager.
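
The suggestion above can be sketched as two implementations of one hook. This is only an illustration under stated assumptions: the type names (`CheckpointAwareLogManager`, `SingleLogManagerSketch`, `PerLedgerLogManagerSketch`) are hypothetical, the `Checkpoint` parameter is dropped, and `rollLogs` merely records that it was called instead of doing I/O (so the `IOException` declarations are omitted).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down stand-in for the proposed EntryLogManager hook.
interface CheckpointAwareLogManager {
    void prepareCheckpoint(long numBytesFlushedBetweenCheckpoints);
}

// Single-log manager: roll the active log only if the memtable flush wrote bytes.
class SingleLogManagerSketch implements CheckpointAwareLogManager {
    final List<String> events = new ArrayList<String>();

    // Stand-in for the real roll; it only records the call.
    void rollLogs() {
        events.add("rollLogs");
    }

    @Override
    public void prepareCheckpoint(long numBytesFlushedBetweenCheckpoints) {
        if (numBytesFlushedBetweenCheckpoints > 0) {
            rollLogs();
        }
    }
}

// Per-ledger-log manager: nothing to prepare, so the hook is a no-op.
class PerLedgerLogManagerSketch implements CheckpointAwareLogManager {
    @Override
    public void prepareCheckpoint(long numBytesFlushedBetweenCheckpoints) {
        // intentionally empty
    }
}

class PrepareCheckpointDemo {
    public static void main(String[] args) {
        SingleLogManagerSketch single = new SingleLogManagerSketch();
        single.prepareCheckpoint(0);     // nothing flushed: no roll
        single.prepareCheckpoint(4096);  // bytes flushed: one roll
        System.out.println(single.events);
    }
}
```

With this shape the caller no longer needs to know which manager is in use, which is the point of removing `rollLogs` from the interface.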




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180514001
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
+
+/*
+ * flush current logs.
+ */
+void flushCurrentLogs() throws IOException;
+
+/*
+ * close current logs.
+ */
+void closeCurrentLogs() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceCloseCurrentLogs();
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
+ * this method should be called after doing entrymemtable flush,it 
would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
  *
- * TODO: When EntryLogManager is introduced in the subsequent 
sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based 
logic.
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and 
both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do 
checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final Set rotatedLogChannels;
 
 Review comment:
   I am not sure we need to put rotatedLogChannels in this base class, for two reasons. 1) There is really no commonality between the single-log manager and the per-ledger-log manager around checkpoint/rotation, so I don't think it is necessary to put them in a single model. 2) I would prefer an ordered structure (before this change, it is a `list`), because entry log file rotation in the single-log manager happens in order and the checkpoint logic runs in order. Changing the single-log manager to use a `Set` could expose unknown potential bugs, which makes it a risky change to me.
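
One way the ordering could matter, sketched under assumptions: if rotated logs are flushed in rotation order, a "least unflushed log id" marker can advance one log at a time and stop at the first failure. The names below (`InOrderFlushSketch`, `flushInOrder`) and the `LongPredicate` standing in for a real flush are hypothetical; this is not the PR's code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.LongPredicate;

class InOrderFlushSketch {
    // Flush rotated logs in rotation order and return the new least unflushed
    // log id. The predicate stands in for a real flush and returns false on failure.
    static long flushInOrder(List<Long> rotatedLogIds, long leastUnflushedLogId, LongPredicate flush) {
        for (long logId : rotatedLogIds) {
            if (!flush.test(logId)) {
                break; // every log below the returned id is known to be flushed
            }
            leastUnflushedLogId = logId + 1;
        }
        return leastUnflushedLogId;
    }
}

class InOrderFlushDemo {
    public static void main(String[] args) {
        List<Long> rotated = Arrays.asList(3L, 4L, 5L);
        System.out.println(InOrderFlushSketch.flushInOrder(rotated, 3L, id -> true));
        System.out.println(InOrderFlushSketch.flushInOrder(rotated, 3L, id -> id != 4L));
    }
}
```

A `List` keeps insertion order, whereas the iteration order of `ConcurrentHashMap.newKeySet()` is unspecified, so a marker like this could not be advanced safely while iterating such a set.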




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180514235
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
+
+/*
+ * flush current logs.
+ */
+void flushCurrentLogs() throws IOException;
+
+/*
+ * close current logs.
+ */
+void closeCurrentLogs() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceCloseCurrentLogs();
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
+ * this method should be called after doing entrymemtable flush,it 
would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
  *
- * TODO: When EntryLogManager is introduced in the subsequent 
sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based 
logic.
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and 
both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do 
checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final Set rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = ConcurrentHashMap.newKeySet();
 }
-if (null == channels) {
-return;
+
+/*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+@Override
+public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) 
throws IOException {
+
+int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to 
prepend the size
+BufferedLogChannel logChannel = getCurrentLogForLedger(ledger);
+boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize)
+: readEntryLogHardLimit(logChannel, entrySize);
+// Create new log if logSizeLimit reached or current disk is full
+boolean diskFull = (logChannel == null) ? false
+: 
ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile());
+boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs();
+
+

[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180515728
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
+
+/*
+ * flush current logs.
+ */
+void flushCurrentLogs() throws IOException;
+
+/*
+ * close current logs.
+ */
+void closeCurrentLogs() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceCloseCurrentLogs();
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
+ * this method should be called after doing entrymemtable flush,it 
would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
  *
- * TODO: When EntryLogManager is introduced in the subsequent 
sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based 
logic.
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and 
both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do 
checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final Set rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = ConcurrentHashMap.newKeySet();
 }
-if (null == channels) {
-return;
+
+/*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+@Override
+public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) 
throws IOException {
+
+int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to 
prepend the size
+BufferedLogChannel logChannel = getCurrentLogForLedger(ledger);
+boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize)
+: readEntryLogHardLimit(logChannel, entrySize);
+// Create new log if logSizeLimit reached or current disk is full
+boolean diskFull = (logChannel == null) ? false
+: 
ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile());
+boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs();
+
+

[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180516776
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
 
 Review comment:
   `flushRotatedLogs` and `flushCurrentLogs` are only used in the `flush()` 
method. Why not just provide a `flush()` method in `EntryLogManager`, like 
`checkpoint()`?
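
   A rough sketch of what this could look like, under the assumption that the 
two flush steps become private details of each implementation. The names 
`FlushableLogManagerSketch` and `SingleLogManagerSketch` are made up for 
illustration, and the "flush" steps only record that they ran.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down version of the interface under review.
interface FlushableLogManagerSketch {
    void checkpoint() throws IOException;

    // Single entry point: flushes both rotated and current logs.
    void flush() throws IOException;
}

final class SingleLogManagerSketch implements FlushableLogManagerSketch {
    // Records which steps ran, in order, so the behavior can be observed.
    final List<String> steps = new ArrayList<>();

    @Override
    public void checkpoint() throws IOException {
        flushRotatedLogs();
    }

    @Override
    public void flush() throws IOException {
        flushRotatedLogs();
        flushCurrentLogs();
    }

    // No longer part of the interface; callers cannot invoke them directly.
    private void flushRotatedLogs() throws IOException {
        steps.add("rotated");
    }

    private void flushCurrentLogs() throws IOException {
        steps.add("current");
    }
}
```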




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180505515
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -120,21 +124,104 @@ public ConcurrentLongLongHashMap getLedgersMap() {
 return entryLogMetadata.getLedgersMap();
 }
 
+public Long getLedgerIdAssigned() {
+return ledgerIdAssigned;
+}
+
+public void setLedgerIdAssigned(Long ledgerId) {
+this.ledgerIdAssigned = ledgerId;
+}
+
 @Override
 public String toString() {
 return MoreObjects.toStringHelper(BufferedChannel.class)
 .add("logId", logId)
 .add("logFile", logFile)
+.add("ledgerIdAssigned", ledgerIdAssigned)
 .toString();
 }
+
+/**
+ * Append the ledger map at the end of the entry log.
+ * Updates the entry log file header with the offset and size of the 
map.
+ */
+private void appendLedgersMap() throws IOException {
+
+long ledgerMapOffset = this.position();
+
+ConcurrentLongLongHashMap ledgersMap = this.getLedgersMap();
+int numberOfLedgers = (int) ledgersMap.size();
+
+// Write the ledgers map into several batches
+
+final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
+final ByteBuf serializedMap = 
ByteBufAllocator.DEFAULT.buffer(maxMapSize);
+
+try {
+ledgersMap.forEach(new BiConsumerLong() {
+int remainingLedgers = numberOfLedgers;
+boolean startNewBatch = true;
+int remainingInBatch = 0;
+
+@Override
+public void accept(long ledgerId, long size) {
+if (startNewBatch) {
+int batchSize = Math.min(remainingLedgers, 
LEDGERS_MAP_MAX_BATCH_SIZE);
+int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * batchSize;
+
+serializedMap.clear();
+serializedMap.writeInt(ledgerMapSize - 4);
+serializedMap.writeLong(INVALID_LID);
+serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
+serializedMap.writeInt(batchSize);
+
+startNewBatch = false;
+remainingInBatch = batchSize;
+}
+// Dump the ledger in the current batch
+serializedMap.writeLong(ledgerId);
+serializedMap.writeLong(size);
+--remainingLedgers;
+
+if (--remainingInBatch == 0) {
+// Close current batch
+try {
+write(serializedMap);
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+
+startNewBatch = true;
+}
+}
+});
+} catch (RuntimeException e) {
+if (e.getCause() instanceof IOException) {
+throw (IOException) e.getCause();
+} else {
+throw e;
+}
+} finally {
+serializedMap.release();
+}
+// Flush the ledger's map out before we write the header.
+// Otherwise the header might point to something that is not fully
+// written
+super.flush();
+
+// Update the headers with the map offset and count of ledgers
+ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
+mapInfo.putLong(ledgerMapOffset);
+mapInfo.putInt(numberOfLedgers);
+mapInfo.flip();
+this.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION);
+}
 }
 
-volatile File currentDir;
 private final LedgerDirsManager ledgerDirsManager;
 private final boolean entryLogPerLedgerEnabled;
-private final AtomicBoolean shouldCreateNewEntryLog = new 
AtomicBoolean(false);
 
-private volatile long leastUnflushedLogId;
+RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
 
 Review comment:
   This field should be `final`.



[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180520311
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
 ##
 @@ -78,6 +79,7 @@ public void initialize(ServerConfiguration conf,
 .setNameFormat("SortedLedgerStorage-%d")
 .setPriority((Thread.NORM_PRIORITY + Thread.MAX_PRIORITY) / 2).build());
 this.stateManager = stateManager;
+this.isTransactionalCompactionEnabled = conf.getUseTransactionalCompaction();
 
 Review comment:
   I don't think we need this flag any more, no?




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180520230
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
 ##
 @@ -54,7 +49,7 @@ protected boolean removeEntryLog(long entryLogId) {
 }
 
 @Override
-public synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
+public synchronized long addEntry(Long ledgerId, ByteBuffer entry) throws IOException {
 
 Review comment:
   @reddycharan you can still use `long` for retrieving from a `Long` hashmap. 
Why do you need to change it here? In any case, `long` to `Long` is a boxing 
operation. If the single-log manager doesn't need `Long`, the boxing should be 
deferred to where it is really needed.
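
   To illustrate the point, here is a small sketch (the class 
`LedgerLookupSketch` and its map are hypothetical, not from the patch). The 
method signatures keep the primitive `long`, and Java autoboxes the key only 
at the map boundary.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-ledger lookup keyed by boxed Long.
final class LedgerLookupSketch {
    private final Map<Long, String> logByLedger = new HashMap<>();

    // Primitive parameter: the long is boxed to Long only when it reaches put().
    void assign(long ledgerId, String logName) {
        logByLedger.put(ledgerId, logName);
    }

    // Primitive parameter: get(Object) receives the autoboxed Long key.
    String lookup(long ledgerId) {
        return logByLedger.get(ledgerId);
    }
}
```

   Callers that never touch the map, such as a single-log manager, then pay no 
boxing cost at all.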




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180517926
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
+
+/*
+ * flush current logs.
+ */
+void flushCurrentLogs() throws IOException;
+
+/*
+ * close current logs.
+ */
+void closeCurrentLogs() throws IOException;
 
 Review comment:
   `closeCurrentLogs` and `forceCloseCurrentLogs` are the abstraction of the 
logic during closing. I think it is better to call them `close` and 
`closeNow` (or `forceClose`), meaning closing the manager and force closing 
the manager. This would convey the meaning better, and it is similar to 
`shutdown` and `shutdownNow` in a scheduler.
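
   A sketch of the proposed naming. The interface `ClosableLogManagerSketch` 
and the recording implementation are hypothetical, and the split between an 
orderly close that flushes first and a best-effort close that does not is an 
assumption made for illustration.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical slice of the manager interface using the suggested names.
interface ClosableLogManagerSketch {
    // Orderly close of the manager; may fail with an I/O error.
    void close() throws IOException;

    // Best-effort close of the manager; never throws.
    void forceClose();
}

final class RecordingLogManagerSketch implements ClosableLogManagerSketch {
    // Records the actions taken so the difference is observable.
    final List<String> events = new ArrayList<>();

    @Override
    public void close() throws IOException {
        events.add("flush"); // assumption: an orderly close flushes first
        events.add("close");
    }

    @Override
    public void forceClose() {
        events.add("close"); // assumption: a forced close skips the flush
    }
}
```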




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180525552
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
 ##
 @@ -93,16 +98,16 @@ public void testCreateNewLog() throws Exception {
 // Extracted from createNewLog()
 String logFileName = Long.toHexString(1) + ".log";
 File dir = ledgerDirsManager.pickRandomWritableDir();
-LOG.info("Picked this directory: " + dir);
+LOG.info("Picked this directory: {}", dir);
 File newLogFile = new File(dir, logFileName);
 newLogFile.createNewFile();
 
 EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
 // Calls createNewLog, and with the number of directories we
 // are using, if it picks one at random it will fail.
-el.createNewLog();
-LOG.info("This is the current log id: " + el.getCurrentLogId());
-assertTrue("Wrong log id", el.getCurrentLogId() > 1);
+((EntryLogManagerBase) el.entryLogManager).createNewLog(0L);
 
 Review comment:
   - nit: provide a `getEntryLogManager` in `EntryLogger`, so we can mock it in 
the future if needed.
   - It is better to cast to `SingleEntryLogManager`, because you can expose 
`getCurrentLogId` in `SingleEntryLogManager`.
   
   ```
   SingleEntryLogManager selm = (SingleEntryLogManager) (el.getEntryLogManager());
   selm.createNewLog(0L);
   assertTrue(selm.getCurrentLogId() > 1);
   ```
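
   For the accessor itself, a minimal sketch under assumed names 
(`EntryLoggerSketch` and `EntryLogManagerSketch` are stand-ins, not the real 
classes): the logger exposes its manager through a getter, so a test can hand 
in a stub instead of reaching into a field.

```java
// Hypothetical stand-in for the manager; only the method used here is shown.
interface EntryLogManagerSketch {
    long getCurrentLogId();
}

// Hypothetical stand-in for EntryLogger with the suggested accessor.
final class EntryLoggerSketch {
    private final EntryLogManagerSketch entryLogManager;

    EntryLoggerSketch(EntryLogManagerSketch entryLogManager) {
        this.entryLogManager = entryLogManager;
    }

    // Accessor instead of direct field access; easy to stub or mock in tests.
    EntryLogManagerSketch getEntryLogManager() {
        return entryLogManager;
    }
}
```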




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180507966
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -499,176 +576,163 @@ void createNewLog() throws IOException {
 EntryLoggerAllocator getEntryLoggerAllocator() {
 return entryLoggerAllocator;
 }
-/**
- * Append the ledger map at the end of the entry log.
- * Updates the entry log file header with the offset and size of the map.
- */
-private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws 
IOException {
-long ledgerMapOffset = entryLogChannel.position();
-
-ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
-int numberOfLedgers = (int) ledgersMap.size();
-
-// Write the ledgers map into several batches
-
-final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
-final ByteBuf serializedMap = 
ByteBufAllocator.DEFAULT.buffer(maxMapSize);
-
-try {
-ledgersMap.forEach(new BiConsumerLong() {
-int remainingLedgers = numberOfLedgers;
-boolean startNewBatch = true;
-int remainingInBatch = 0;
-
-@Override
-public void accept(long ledgerId, long size) {
-if (startNewBatch) {
-int batchSize = Math.min(remainingLedgers, 
LEDGERS_MAP_MAX_BATCH_SIZE);
-int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * batchSize;
-
-serializedMap.clear();
-serializedMap.writeInt(ledgerMapSize - 4);
-serializedMap.writeLong(INVALID_LID);
-serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
-serializedMap.writeInt(batchSize);
-
-startNewBatch = false;
-remainingInBatch = batchSize;
-}
-// Dump the ledger in the current batch
-serializedMap.writeLong(ledgerId);
-serializedMap.writeLong(size);
---remainingLedgers;
-
-if (--remainingInBatch == 0) {
-// Close current batch
-try {
-entryLogChannel.write(serializedMap);
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-
-startNewBatch = true;
-}
-}
-});
-} catch (RuntimeException e) {
-if (e.getCause() instanceof IOException) {
-throw (IOException) e.getCause();
-} else {
-throw e;
-}
-} finally {
-serializedMap.release();
-}
-// Flush the ledger's map out before we write the header.
-// Otherwise the header might point to something that is not fully 
written
-entryLogChannel.flush();
-
-// Update the headers with the map offset and count of ledgers
-ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
-mapInfo.putLong(ledgerMapOffset);
-mapInfo.putInt(numberOfLedgers);
-mapInfo.flip();
-entryLogChannel.fileChannel.write(mapInfo, 
LEDGERS_MAP_OFFSET_POSITION);
-}
 
 /**
  * An allocator pre-allocates entry log files.
  */
 class EntryLoggerAllocator {
 
 private long preallocatedLogId;
-private Future preallocation = null;
+Future preallocation = null;
 private ExecutorService allocatorExecutor;
-private final Object createEntryLogLock = new Object();
-private final Object createCompactionLogLock = new Object();
 
 EntryLoggerAllocator(long logId) {
 preallocatedLogId = logId;
 allocatorExecutor = Executors.newSingleThreadExecutor();
 }
 
-BufferedLogChannel createNewLog() throws IOException {
-synchronized (createEntryLogLock) {
-BufferedLogChannel bc;
-if (!entryLogPreAllocationEnabled){
-// create a new log directly
-bc = allocateNewLog();
-return bc;
-} else {
-// allocate directly to response request
-if (null == preallocation){
-bc = allocateNewLog();
+synchronized long getPreallocatedLogId(){
+return preallocatedLogId;
+}
+
+synchronized BufferedLogChannel createNewLog() throws IOException {
+BufferedLogChannel bc;
+if (!entryLogPreAllocationEnabled || null == 

[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180510817
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -880,39 +1195,6 @@ protected ByteBuf initialValue() throws Exception {
 }
 };
 
-public synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
-if (null == logChannel) {
-// log channel can be null because the file is deferred to be created when no writable ledger directory
-// is available.
-createNewLog();
-}
-
-int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
-boolean reachEntryLogLimit =
-rollLog ? reachEntryLogLimit(entrySize) : readEntryLogHardLimit(entrySize);
-// Create new log if logSizeLimit reached or current disk is full
-boolean createNewLog = shouldCreateNewEntryLog.get();
 
 Review comment:
   Is there any reason why `shouldCreateNewEntryLog` was removed?




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180510213
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
+
+/*
+ * flush current logs.
+ */
+void flushCurrentLogs() throws IOException;
+
+/*
+ * close current logs.
+ */
+void closeCurrentLogs() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceCloseCurrentLogs();
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
+ * this method should be called after doing entrymemtable flush,it 
would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
  *
- * TODO: When EntryLogManager is introduced in the subsequent 
sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based 
logic.
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and 
both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do 
checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final Set rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = ConcurrentHashMap.newKeySet();
 }
-if (null == channels) {
-return;
+
+/*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+@Override
+public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) 
throws IOException {
+
+int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to 
prepend the size
+BufferedLogChannel logChannel = getCurrentLogForLedger(ledger);
+boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize)
+: readEntryLogHardLimit(logChannel, entrySize);
+// Create new log if logSizeLimit reached or current disk is full
+boolean diskFull = (logChannel == null) ? false
 
 Review comment:
   You are changing logic while moving code around to abstract the interface. 
This makes review really hard and painful. You should just move the code first 
and 

[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180507116
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -499,176 +576,163 @@ void createNewLog() throws IOException {
 EntryLoggerAllocator getEntryLoggerAllocator() {
 return entryLoggerAllocator;
 }
-/**
- * Append the ledger map at the end of the entry log.
- * Updates the entry log file header with the offset and size of the map.
- */
-private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws 
IOException {
-long ledgerMapOffset = entryLogChannel.position();
-
-ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
-int numberOfLedgers = (int) ledgersMap.size();
-
-// Write the ledgers map into several batches
-
-final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
-final ByteBuf serializedMap = 
ByteBufAllocator.DEFAULT.buffer(maxMapSize);
-
-try {
-ledgersMap.forEach(new BiConsumerLong() {
-int remainingLedgers = numberOfLedgers;
-boolean startNewBatch = true;
-int remainingInBatch = 0;
-
-@Override
-public void accept(long ledgerId, long size) {
-if (startNewBatch) {
-int batchSize = Math.min(remainingLedgers, 
LEDGERS_MAP_MAX_BATCH_SIZE);
-int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * batchSize;
-
-serializedMap.clear();
-serializedMap.writeInt(ledgerMapSize - 4);
-serializedMap.writeLong(INVALID_LID);
-serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
-serializedMap.writeInt(batchSize);
-
-startNewBatch = false;
-remainingInBatch = batchSize;
-}
-// Dump the ledger in the current batch
-serializedMap.writeLong(ledgerId);
-serializedMap.writeLong(size);
---remainingLedgers;
-
-if (--remainingInBatch == 0) {
-// Close current batch
-try {
-entryLogChannel.write(serializedMap);
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-
-startNewBatch = true;
-}
-}
-});
-} catch (RuntimeException e) {
-if (e.getCause() instanceof IOException) {
-throw (IOException) e.getCause();
-} else {
-throw e;
-}
-} finally {
-serializedMap.release();
-}
-// Flush the ledger's map out before we write the header.
-// Otherwise the header might point to something that is not fully 
written
-entryLogChannel.flush();
-
-// Update the headers with the map offset and count of ledgers
-ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
-mapInfo.putLong(ledgerMapOffset);
-mapInfo.putInt(numberOfLedgers);
-mapInfo.flip();
-entryLogChannel.fileChannel.write(mapInfo, 
LEDGERS_MAP_OFFSET_POSITION);
-}
 
 /**
  * An allocator pre-allocates entry log files.
  */
 class EntryLoggerAllocator {
 
 private long preallocatedLogId;
-private Future preallocation = null;
+Future preallocation = null;
 private ExecutorService allocatorExecutor;
-private final Object createEntryLogLock = new Object();
-private final Object createCompactionLogLock = new Object();
 
 EntryLoggerAllocator(long logId) {
 preallocatedLogId = logId;
 allocatorExecutor = Executors.newSingleThreadExecutor();
 }
 
-BufferedLogChannel createNewLog() throws IOException {
-synchronized (createEntryLogLock) {
-BufferedLogChannel bc;
-if (!entryLogPreAllocationEnabled){
-// create a new log directly
-bc = allocateNewLog();
-return bc;
-} else {
-// allocate directly to response request
-if (null == preallocation){
-bc = allocateNewLog();
+synchronized long getPreallocatedLogId(){
+return preallocatedLogId;
+}
+
+synchronized BufferedLogChannel createNewLog() throws IOException {
+BufferedLogChannel bc;
+if (!entryLogPreAllocationEnabled || null == 

[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180516551
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
 
 Review comment:
   After the change to `prepareMemtableFlush` and `commitMemtableFlush`, 
`rollLogs` seems to be a behavior of the single-log manager only. I don't think 
you need `rollLogs` here, unless I missed something.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180508758
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -499,176 +576,163 @@ void createNewLog() throws IOException {
 EntryLoggerAllocator getEntryLoggerAllocator() {
 return entryLoggerAllocator;
 }
-/**
- * Append the ledger map at the end of the entry log.
- * Updates the entry log file header with the offset and size of the map.
- */
-private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws 
IOException {
-long ledgerMapOffset = entryLogChannel.position();
-
-ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
-int numberOfLedgers = (int) ledgersMap.size();
-
-// Write the ledgers map into several batches
-
-final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
-final ByteBuf serializedMap = 
ByteBufAllocator.DEFAULT.buffer(maxMapSize);
-
-try {
-ledgersMap.forEach(new BiConsumerLong() {
-int remainingLedgers = numberOfLedgers;
-boolean startNewBatch = true;
-int remainingInBatch = 0;
-
-@Override
-public void accept(long ledgerId, long size) {
-if (startNewBatch) {
-int batchSize = Math.min(remainingLedgers, 
LEDGERS_MAP_MAX_BATCH_SIZE);
-int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * batchSize;
-
-serializedMap.clear();
-serializedMap.writeInt(ledgerMapSize - 4);
-serializedMap.writeLong(INVALID_LID);
-serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
-serializedMap.writeInt(batchSize);
-
-startNewBatch = false;
-remainingInBatch = batchSize;
-}
-// Dump the ledger in the current batch
-serializedMap.writeLong(ledgerId);
-serializedMap.writeLong(size);
---remainingLedgers;
-
-if (--remainingInBatch == 0) {
-// Close current batch
-try {
-entryLogChannel.write(serializedMap);
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-
-startNewBatch = true;
-}
-}
-});
-} catch (RuntimeException e) {
-if (e.getCause() instanceof IOException) {
-throw (IOException) e.getCause();
-} else {
-throw e;
-}
-} finally {
-serializedMap.release();
-}
-// Flush the ledger's map out before we write the header.
-// Otherwise the header might point to something that is not fully 
written
-entryLogChannel.flush();
-
-// Update the headers with the map offset and count of ledgers
-ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
-mapInfo.putLong(ledgerMapOffset);
-mapInfo.putInt(numberOfLedgers);
-mapInfo.flip();
-entryLogChannel.fileChannel.write(mapInfo, 
LEDGERS_MAP_OFFSET_POSITION);
-}
 
 /**
  * An allocator pre-allocates entry log files.
  */
 class EntryLoggerAllocator {
 
 private long preallocatedLogId;
-private Future preallocation = null;
+Future preallocation = null;
 private ExecutorService allocatorExecutor;
-private final Object createEntryLogLock = new Object();
-private final Object createCompactionLogLock = new Object();
 
 EntryLoggerAllocator(long logId) {
 preallocatedLogId = logId;
 allocatorExecutor = Executors.newSingleThreadExecutor();
 }
 
-BufferedLogChannel createNewLog() throws IOException {
-synchronized (createEntryLogLock) {
-BufferedLogChannel bc;
-if (!entryLogPreAllocationEnabled){
-// create a new log directly
-bc = allocateNewLog();
-return bc;
-} else {
-// allocate directly to response request
-if (null == preallocation){
-bc = allocateNewLog();
+synchronized long getPreallocatedLogId(){
+return preallocatedLogId;
+}
+
+synchronized BufferedLogChannel createNewLog() throws IOException {
+BufferedLogChannel bc;
+if (!entryLogPreAllocationEnabled || null == 

[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180508334
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -499,176 +576,163 @@ void createNewLog() throws IOException {
 EntryLoggerAllocator getEntryLoggerAllocator() {
 return entryLoggerAllocator;
 }
-/**
- * Append the ledger map at the end of the entry log.
- * Updates the entry log file header with the offset and size of the map.
- */
-private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws 
IOException {
-long ledgerMapOffset = entryLogChannel.position();
-
-ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
-int numberOfLedgers = (int) ledgersMap.size();
-
-// Write the ledgers map into several batches
-
-final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
-final ByteBuf serializedMap = 
ByteBufAllocator.DEFAULT.buffer(maxMapSize);
-
-try {
-ledgersMap.forEach(new BiConsumerLong() {
-int remainingLedgers = numberOfLedgers;
-boolean startNewBatch = true;
-int remainingInBatch = 0;
-
-@Override
-public void accept(long ledgerId, long size) {
-if (startNewBatch) {
-int batchSize = Math.min(remainingLedgers, 
LEDGERS_MAP_MAX_BATCH_SIZE);
-int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * batchSize;
-
-serializedMap.clear();
-serializedMap.writeInt(ledgerMapSize - 4);
-serializedMap.writeLong(INVALID_LID);
-serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
-serializedMap.writeInt(batchSize);
-
-startNewBatch = false;
-remainingInBatch = batchSize;
-}
-// Dump the ledger in the current batch
-serializedMap.writeLong(ledgerId);
-serializedMap.writeLong(size);
---remainingLedgers;
-
-if (--remainingInBatch == 0) {
-// Close current batch
-try {
-entryLogChannel.write(serializedMap);
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-
-startNewBatch = true;
-}
-}
-});
-} catch (RuntimeException e) {
-if (e.getCause() instanceof IOException) {
-throw (IOException) e.getCause();
-} else {
-throw e;
-}
-} finally {
-serializedMap.release();
-}
-// Flush the ledger's map out before we write the header.
-// Otherwise the header might point to something that is not fully 
written
-entryLogChannel.flush();
-
-// Update the headers with the map offset and count of ledgers
-ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
-mapInfo.putLong(ledgerMapOffset);
-mapInfo.putInt(numberOfLedgers);
-mapInfo.flip();
-entryLogChannel.fileChannel.write(mapInfo, 
LEDGERS_MAP_OFFSET_POSITION);
-}
 
 /**
  * An allocator pre-allocates entry log files.
  */
 class EntryLoggerAllocator {
 
 private long preallocatedLogId;
-private Future preallocation = null;
+Future preallocation = null;
 private ExecutorService allocatorExecutor;
-private final Object createEntryLogLock = new Object();
-private final Object createCompactionLogLock = new Object();
 
 EntryLoggerAllocator(long logId) {
 preallocatedLogId = logId;
 allocatorExecutor = Executors.newSingleThreadExecutor();
 }
 
-BufferedLogChannel createNewLog() throws IOException {
-synchronized (createEntryLogLock) {
-BufferedLogChannel bc;
-if (!entryLogPreAllocationEnabled){
-// create a new log directly
-bc = allocateNewLog();
-return bc;
-} else {
-// allocate directly to response request
-if (null == preallocation){
-bc = allocateNewLog();
+synchronized long getPreallocatedLogId(){
+return preallocatedLogId;
+}
+
+synchronized BufferedLogChannel createNewLog() throws IOException {
+BufferedLogChannel bc;
+if (!entryLogPreAllocationEnabled || null == 

[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180505208
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -92,13 +99,10 @@
 private final long logId;
 private final EntryLogMetadata entryLogMetadata;
 private final File logFile;
+private Long ledgerIdAssigned = UNASSIGNED_LEDGERID;
 
 Review comment:
   use `long`




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180505355
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -84,6 +88,9 @@
  */
 public class EntryLogger {
 private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
+static final Long UNASSIGNED_LEDGERID = Long.valueOf(-1);
 
 Review comment:
   `long` please. 
   
   `long UNASSIGNED_LEDGERID = -1L`




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-04 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r179294437
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
 ##
 @@ -54,7 +49,7 @@ protected boolean removeEntryLog(long entryLogId) {
 }
 
 @Override
-public synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
+public synchronized long addEntry(Long ledgerId, ByteBuffer entry) throws IOException {
 
 Review comment:
   I see you changed "long" to "Long" in a lot of places. What is the 
consideration behind this? 




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-04 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r179293507
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -393,12 +513,37 @@ public BufferedReadChannel getFromChannels(long logId) {
  *
  * @return least unflushed log id.
  */
-synchronized long getLeastUnflushedLogId() {
-return leastUnflushedLogId;
+long getLeastUnflushedLogId() {
+return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId();
 }
 
-synchronized long getCurrentLogId() {
-return logChannel.getLogId();
+long getPreviousAllocatedEntryLogId() {
+return entryLoggerAllocator.getPreallocatedLogId();
+}
+
+boolean rollLogsIfEntryLogLimitReached() throws IOException {
 
 Review comment:
   Why not make `rollLogsIfEntryLogLimitReached` part of `EntryLogManager`?
   
   That way the simple entry log manager stays much simpler, and you can do the 
heavy-lifting management in the per-ledger entry log implementation.
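
   A minimal sketch of that suggestion, assuming toy in-memory stand-ins rather 
than the PR's real classes: every type name, `append`, and the byte counters 
below are hypothetical. It only shows how a single interface method could let 
the single-log case stay trivial while the per-ledger case does the bookkeeping.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical interface: the caller only asks "did anything roll?".
interface RollingManagerSketch {
    boolean rollLogsIfEntryLogLimitReached();
}

// Single entry log: one counter, one check.
final class SingleLogRollingSketch implements RollingManagerSketch {
    private final long limit;
    private long bytesInCurrentLog = 0;
    private long currentLogId = 0;

    SingleLogRollingSketch(long limit) {
        this.limit = limit;
    }

    void append(long bytes) {
        bytesInCurrentLog += bytes;
    }

    long getCurrentLogId() {
        return currentLogId;
    }

    @Override
    public boolean rollLogsIfEntryLogLimitReached() {
        if (bytesInCurrentLog < limit) {
            return false;
        }
        currentLogId++;          // start a new log
        bytesInCurrentLog = 0;
        return true;
    }
}

// Entry log per ledger: the heavier bookkeeping lives here, not in the caller.
final class PerLedgerRollingSketch implements RollingManagerSketch {
    private final long limit;
    private final Map<Long, Long> bytesPerLedgerLog = new HashMap<>();

    PerLedgerRollingSketch(long limit) {
        this.limit = limit;
    }

    void append(long ledgerId, long bytes) {
        bytesPerLedgerLog.merge(ledgerId, bytes, Long::sum);
    }

    @Override
    public boolean rollLogsIfEntryLogLimitReached() {
        boolean rolled = false;
        for (Map.Entry<Long, Long> e : bytesPerLedgerLog.entrySet()) {
            if (e.getValue() >= limit) {
                e.setValue(0L);  // this ledger's log is rolled to a fresh one
                rolled = true;
            }
        }
        return rolled;
    }
}
```

   A caller such as the memtable flush path would then hold only a 
`RollingManagerSketch` and would not need to know which variant is configured.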




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-04 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r179294075
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -802,88 +881,230 @@ private long readLastLogId(File f) {
 }
 }
 
+interface EntryLogManager {
+/*
+ * acquire lock for this ledger.
+ */
+void acquireLock(Long ledgerId);
+
+/*
+ * acquire lock for this ledger if it is not already available for this
+ * ledger then it will create a new one and then acquire lock.
+ */
+void acquireLockByCreatingIfRequired(Long ledgerId);
+
+/*
+ * release lock for this ledger
+ */
+void releaseLock(Long ledgerId);
+
+/*
+ * sets the logChannel for the given ledgerId. The previous one will be
+ * removed from replicaOfCurrentLogChannels. Previous logChannel will be
+ * added to rotatedLogChannels.
+ */
+void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel logChannel);
+
+/*
+ * gets the logChannel for the given ledgerId.
+ */
+BufferedLogChannel getCurrentLogForLedger(Long ledgerId);
+
+/*
+ * gets the copy of rotatedLogChannels
+ */
+Set getCopyOfRotatedLogChannels();
+
+/*
+ * gets the copy of replicaOfCurrentLogChannels
+ */
+Set getCopyOfCurrentLogs();
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * removes the logChannel from rotatedLogChannels collection
+ */
+void removeFromRotatedLogChannels(BufferedLogChannel rotatedLogChannelToRemove);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * gets the log id of the current activeEntryLog. for
+ * EntryLogManagerForSingleEntryLog it returns logid of current
+ * activelog, for EntryLogManagerForEntryLogPerLedger it would return
+ * some constant value.
+ */
+long getCurrentLogId();
+}
+
+class EntryLogManagerForSingleEntryLog implements EntryLogManager {
+
+private volatile BufferedLogChannel activeLogChannel;
+private Lock lockForActiveLogChannel;
+private final Set rotatedLogChannels;
+
+EntryLogManagerForSingleEntryLog() {
+rotatedLogChannels = ConcurrentHashMap.newKeySet();
+lockForActiveLogChannel = new ReentrantLock();
+}
+
+/*
+ * since entryLogPerLedger is not enabled, it is just one lock for all
+ * ledgers.
+ */
+@Override
+public void acquireLock(Long ledgerId) {
+lockForActiveLogChannel.lock();
+}
+
+@Override
+public void acquireLockByCreatingIfRequired(Long ledgerId) {
+acquireLock(ledgerId);
+}
+
+@Override
+public void releaseLock(Long ledgerId) {
+lockForActiveLogChannel.unlock();
+}
+
+@Override
+public void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel logChannel) {
+acquireLock(ledgerId);
+try {
+BufferedLogChannel hasToRotateLogChannel = activeLogChannel;
+activeLogChannel = logChannel;
+if (hasToRotateLogChannel != null) {
+rotatedLogChannels.add(hasToRotateLogChannel);
+}
+} finally {
+releaseLock(ledgerId);
+}
+}
+
+@Override
+public BufferedLogChannel getCurrentLogForLedger(Long ledgerId) {
+return activeLogChannel;
+}
+
+@Override
+public Set getCopyOfRotatedLogChannels() {
+return new HashSet(rotatedLogChannels);
+}
+
+@Override
+public Set getCopyOfCurrentLogs() {
+HashSet copyOfCurrentLogs = new HashSet();
+copyOfCurrentLogs.add(activeLogChannel);
+return copyOfCurrentLogs;
+}
+
+@Override
+public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
+BufferedLogChannel activeLogChannelTemp = activeLogChannel;
+if ((activeLogChannelTemp != null) && (activeLogChannelTemp.getLogId() == entryLogId)) {
+return activeLogChannelTemp;
+}
+

[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-04 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r179290558
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -83,18 +88,18 @@
  */
 public class EntryLogger {
 private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
+private static final Long INVALID_LEDGERID = Long.valueOf(-1);
+// log file suffix
+private static final String LOG_FILE_SUFFIX = ".log";
 
 static class BufferedLogChannel extends BufferedChannel {
 private final long logId;
 private final EntryLogMetadata entryLogMetadata;
 private final File logFile;
+private Long ledgerId = INVALID_LEDGERID;
 
 Review comment:
   I'd prefer naming it something meaningful, like `ledgerIdAssigned`, 
meaning it is an entry log file that is assigned to a specific ledger.
   
   Make it final if possible.




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-04 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r179290810
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -116,21 +121,103 @@ public ConcurrentLongLongHashMap getLedgersMap() {
 return entryLogMetadata.getLedgersMap();
 }
 
+public Long getLedgerId() {
+return ledgerId;
+}
+
+public void setLedgerId(Long ledgerId) {
 
 Review comment:
   When can the ledger id be changed? If it can't, make this field final.




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-04 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r179290902
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -116,21 +121,103 @@ public ConcurrentLongLongHashMap getLedgersMap() {
 return entryLogMetadata.getLedgersMap();
 }
 
+public Long getLedgerId() {
+return ledgerId;
+}
+
+public void setLedgerId(Long ledgerId) {
+this.ledgerId = ledgerId;
+}
+
 @Override
 public String toString() {
 return MoreObjects.toStringHelper(BufferedChannel.class)
 
 Review comment:
   Update the `toString` to include `ledgerIdAssigned`?
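
   The suggestions on this field (a descriptive name, `final`, and inclusion 
in `toString`) could look roughly like the sketch below. `LogChannelSketch` is 
a hypothetical stand-in for `BufferedLogChannel` that keeps only the two 
fields under discussion; the real class uses `MoreObjects.toStringHelper`, 
which is replaced here by plain concatenation to keep the example 
self-contained.

```java
// Hypothetical stand-in; not the PR's BufferedLogChannel.
final class LogChannelSketch {
    static final long UNASSIGNED_LEDGERID = -1L;

    private final long logId;
    // Fixed at construction time, so no setter is needed.
    private final long ledgerIdAssigned;

    // Channel that is not tied to a specific ledger (single entry log case).
    LogChannelSketch(long logId) {
        this(logId, UNASSIGNED_LEDGERID);
    }

    // Channel assigned to one ledger (entry log per ledger case).
    LogChannelSketch(long logId, long ledgerIdAssigned) {
        this.logId = logId;
        this.ledgerIdAssigned = ledgerIdAssigned;
    }

    long getLedgerIdAssigned() {
        return ledgerIdAssigned;
    }

    @Override
    public String toString() {
        return "LogChannelSketch{logId=" + logId
                + ", ledgerIdAssigned=" + ledgerIdAssigned + "}";
    }
}
```

   Passing the ledger id through the constructor is one way to make the field 
final; whether that fits the PR's channel-creation path is an open question.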




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-04 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r179289148
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -83,18 +88,18 @@
  */
 public class EntryLogger {
 private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
+private static final Long INVALID_LEDGERID = Long.valueOf(-1);
 
 Review comment:
   Use a special negative value instead of `-1`. Also, I don't think it is 
`INVALID_LEDGERID`; it is actually "UNASSIGNED", because it is used in 
`BufferedLogChannel` when a buffered channel is not specifically assigned to a 
ledger in the single entry log manager.
   
   Also use `long` instead of `Long` to avoid boxing/unboxing.
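
   To illustrate the boxing concern, here is a minimal sketch. The class and 
method names are hypothetical; the sentinel value `-1L` is taken from the PR 
as it stands. A primitive `long` sentinel compares by value and can never be 
null, while a boxed `Long` argument is unboxed at the comparison and fails if 
it is null.

```java
// Hypothetical sketch, not code from the PR.
final class UnassignedLedgerIdSketch {
    // Primitive sentinel: no allocation, no null state.
    static final long UNASSIGNED_LEDGERID = -1L;

    // Primitive comparison: plain value comparison, cannot throw.
    static boolean isAssigned(long ledgerId) {
        return ledgerId != UNASSIGNED_LEDGERID;
    }

    // Boxed variant: the comparison auto-unboxes ledgerId, so a null
    // argument throws NullPointerException.
    static boolean isAssignedBoxed(Long ledgerId) {
        return ledgerId != UNASSIGNED_LEDGERID;
    }
}
```

   With the boxed signature, every call site that passes a primitive also pays 
for a `Long.valueOf` conversion, which is the overhead the comment refers to.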
   




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-27 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r177532672
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
 ##
 @@ -209,16 +211,39 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException {
 public void run() {
 try {
 LOG.info("Started flushing mem table.");
-long logIdBeforeFlush = entryLogger.getCurrentLogId();
+long prevAllocLogIdBeforeFlush = entryLogger.getPreviousAllocatedEntryLogId();
 
 Review comment:
   This change is not correct. It should be the "current" log id, not the 
"previous allocated entry log id". That means the behavior is different 
between preallocation enabled and disabled.
   
   That leads me to think that the current entry log manager interface is not 
abstracted with the right methods. It exposes a lot of methods around the 
underlying files, and those files are tightly coupled to the corresponding 
checkpointing and flushing logic. Two different entry log managers can't really 
fit this interface, or the checkpointing logic either. 
   
   So, as I said in my previous comment, we need to think about what the right 
methods to put in EntryLogManager are.
   
   > I would suggest the EntryLogManager should hide the details on how to 
   > create log, allocate log, flush/checkpoint logs. the details such as 
   > preallocationLogId, unflushedLogId and how to lock them should be hidden 
   > to the implementation, rather than defining a too finer interface here.
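
   The first point can be made concrete with a toy model. This is a sketch 
under stated assumptions, not the PR's `EntryLoggerAllocator`: `AllocatorSketch` 
and its fields are hypothetical, and preallocation is modelled as a synchronous 
step instead of a background `Future`. It only shows why "last preallocated id" 
and "current id" agree when preallocation is off and diverge when it is on.

```java
// Toy allocator; hypothetical names, synchronous preallocation for simplicity.
final class AllocatorSketch {
    private final boolean preallocationEnabled;
    private long preallocatedLogId;       // highest log id handed out so far
    private long currentLogId;            // log id currently being written to
    private boolean preallocatedPending = false;

    AllocatorSketch(long lastLogId, boolean preallocationEnabled) {
        this.preallocatedLogId = lastLogId;
        this.currentLogId = lastLogId;
        this.preallocationEnabled = preallocationEnabled;
    }

    void createNewLog() {
        if (preallocatedPending) {
            // Use the log that was preallocated earlier.
            currentLogId = preallocatedLogId;
            preallocatedPending = false;
        } else {
            // Nothing preallocated: allocate directly.
            currentLogId = ++preallocatedLogId;
        }
        if (preallocationEnabled) {
            // Prepare the next log ahead of time.
            ++preallocatedLogId;
            preallocatedPending = true;
        }
    }

    long getCurrentLogId() {
        return currentLogId;
    }

    long getPreallocatedLogId() {
        return preallocatedLogId;
    }
}
```

   In this model a caller that compares `getPreallocatedLogId()` before and 
after a flush observes different values depending on configuration, whereas 
`getCurrentLogId()` means the same thing in both modes.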
   




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-27 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r177535531
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
 ##
 @@ -209,16 +202,12 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException {
 public void run() {
 try {
 LOG.info("Started flushing mem table.");
-long logIdBeforeFlush = entryLogger.getCurrentLogId();
 memTable.flush(SortedLedgerStorage.this);
-long logIdAfterFlush = entryLogger.getCurrentLogId();
 // in any case that an entry log reaches the limit, we roll the log and start checkpointing.
 // if a memory table is flushed spanning over two entry log files, we also roll log. this is
 // for performance consideration: since we don't wanna checkpoint a new log file that ledger
 // storage is writing to.
-if (entryLogger.reachEntryLogLimit(0) || logIdAfterFlush != logIdBeforeFlush) {
-LOG.info("Rolling entry logger since it reached size limitation");
-entryLogger.rollLog();
+if (entryLogger.rollLogsIfEntryLogLimitReached()) {
 
 Review comment:
   @ivankelly that's what I commented on before. I don't think the methods in 
EntryLogManager provide the right abstraction. That's also why I dislike your 
approach of asking to move the class out in this PR. In this PR, we need to 
figure out what the right methods to put in an EntryLogManager are before any 
kind of code movement. Your comments about inner classes complicate things.




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-27 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r177532166
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
 ##
 @@ -78,6 +79,7 @@ public void initialize(ServerConfiguration conf,
 .setNameFormat("SortedLedgerStorage-%d")
 .setPriority((Thread.NORM_PRIORITY + Thread.MAX_PRIORITY) / 2).build());
 this.stateManager = stateManager;
+this.isTransactionalCompactionEnabled = conf.getUseTransactionalCompaction();
 
 Review comment:
   @ivankelly 
   
   > The external behaviour of SortedLedgerStorage should not change with this 
patch
   
   It is not an external behavior of SortedLedgerStorage. It is the logic of 
SortedLedgerStorage. SortedLedgerStorage needs to be aware of the compaction 
mechanism in order to do checkpointing. That's related to the discussion below 
around checkpointing.
   




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-23 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176658665
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
 ##
 @@ -209,16 +202,12 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException {
 public void run() {
 try {
 LOG.info("Started flushing mem table.");
-long logIdBeforeFlush = entryLogger.getCurrentLogId();
 memTable.flush(SortedLedgerStorage.this);
-long logIdAfterFlush = entryLogger.getCurrentLogId();
 // in any case that an entry log reaches the limit, we roll the log and start checkpointing.
 // if a memory table is flushed spanning over two entry log files, we also roll log. this is
 // for performance consideration: since we don't wanna checkpoint a new log file that ledger
 // storage is writing to.
-if (entryLogger.reachEntryLogLimit(0) || logIdAfterFlush != logIdBeforeFlush) {
-LOG.info("Rolling entry logger since it reached size limitation");
-entryLogger.rollLog();
+if (entryLogger.rollLogsIfEntryLogLimitReached()) {
 
 Review comment:
   @reddycharan well, the code exists for a reason. Performance is one; the 
biggest problem, I think, is that the "checkpoint" logic can potentially never 
be triggered.
   
   - During memtable flushes, rollLog is disabled for appending memtable 
entries.
   - However, background compaction can also advance the entry log, since it 
is appending entries into the same entry log.
   - If a memtable flush spans two entry log files, 
`rollLogsIfEntryLogLimitReached` will most likely be false, so no checkpoint 
will happen.
   - In the next memtable flush, if it spans two entry log files again, 
`rollLogsIfEntryLogLimitReached` will still be false, and no checkpoint will 
happen.
   
   This was actually a behavior we observed at Twitter 3 years ago. I added 
this logic to prevent it from happening. See 
twitter/bookkeeper@a7a8c88eaa2659b5d6c4e29f4eb9e2d184c7cfe0
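
   The scenario in the list above can be reproduced with a toy model. 
Everything in this sketch is an assumption made for illustration: the class, 
the method names, the limit of 100 bytes, and the rule that only 
compaction-style appends may roll the log. It contrasts a size-only check with 
the removed check that also compares the log id before and after the flush.

```java
// Toy model of one shared entry log; hypothetical names and numbers.
final class FlushRollCheckSketch {
    static final long LIMIT = 100;

    long currentLogId = 0;
    long bytesInCurrentLog = 0;

    // Memtable flush append: rolling is disabled, so it only adds bytes.
    void appendFromMemtable(long bytes) {
        bytesInCurrentLog += bytes;
    }

    // Compaction append: may roll to a new log when the limit would be passed.
    void appendFromCompaction(long bytes) {
        if (bytesInCurrentLog + bytes > LIMIT) {
            currentLogId++;
            bytesInCurrentLog = 0;
        }
        bytesInCurrentLog += bytes;
    }

    boolean reachedLimit() {
        return bytesInCurrentLog >= LIMIT;
    }

    // Size-only check: looks at the (possibly brand new) current log.
    boolean sizeOnlyCheck() {
        return reachedLimit();
    }

    // Removed check: also fires when the flush spanned two log files.
    boolean sizeOrLogIdCheck(long logIdBeforeFlush) {
        return reachedLimit() || currentLogId != logIdBeforeFlush;
    }
}
```

   Starting from a log holding 90 bytes, a flush that writes 5 bytes, is 
interleaved with a 10-byte compaction append, and then writes 5 more bytes 
ends up in a fresh log holding 15 bytes. In this model the size-only check 
reports nothing to do, while the log-id comparison still requests a roll and 
therefore a checkpoint.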




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-23 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176654536
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
 ##
 @@ -167,14 +167,7 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
 
 @Override
 public void checkpoint(final Checkpoint checkpoint) throws IOException {
-long numBytesFlushed = memTable.flush(this, checkpoint);
-if (numBytesFlushed > 0) {
-// if bytes are added between previous flush and this checkpoint,
-// it means bytes might live at current active entry log, we need
-// roll current entry log and then issue checkpoint to underlying
-// interleaved ledger storage.
-entryLogger.rollLog();
-}
+memTable.flush(this, checkpoint);
 
 Review comment:
   @reddycharan 
   
   > First of all I’m not sure, why this is added with this commit 81cbba3. 
   
   I believe there was a whole discussion in the original PR that introduced 
this, and I agreed with your logic there. I am not arguing about the 
correctness again.
   
   What I am suggesting is that this change is unrelated to the introduction 
of EntryLogManager; it is more closely related to the subsequent change you 
want to introduce. Putting it into that subsequent change keeps related changes 
together, which is much clearer for people looking back through the history.




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-22 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176336059
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -802,88 +864,207 @@ private long readLastLogId(File f) {
 }
 }
 
+interface EntryLogManager {
+/*
+ * acquire lock for this ledger.
+ */
+void acquireLock(Long ledgerId);
 
 Review comment:
   Can we try to hide this locking logic?
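
   One possible shape for this, as a rough sketch rather than a proposal for 
the actual interface: callers only see an `addEntry` method, and the 
implementation creates and takes its per-ledger locks internally. 
`SketchEntryLogManager`, `PerLedgerLockingManager` and the simplified 
`addEntry(long, byte[])` signature are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified interface: no lock methods are exposed.
interface SketchEntryLogManager {
    // Returns the offset at which the entry was written for this ledger.
    long addEntry(long ledgerId, byte[] entry);
}

// Hypothetical implementation that keeps per-ledger locking private.
final class PerLedgerLockingManager implements SketchEntryLogManager {
    private final ConcurrentMap<Long, Lock> locks = new ConcurrentHashMap<>();
    private final ConcurrentMap<Long, Long> positions = new ConcurrentHashMap<>();

    @Override
    public long addEntry(long ledgerId, byte[] entry) {
        // Create the lock on first use, then hold it for the whole append.
        Lock lock = locks.computeIfAbsent(ledgerId, id -> new ReentrantLock());
        lock.lock();
        try {
            long position = positions.getOrDefault(ledgerId, 0L);
            positions.put(ledgerId, position + entry.length);
            return position;
        } finally {
            lock.unlock();
        }
    }
}
```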




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-22 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176336283
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -802,88 +864,207 @@ private long readLastLogId(File f) {
 }
 }
 
+interface EntryLogManager {
+/*
+ * acquire lock for this ledger.
+ */
+void acquireLock(Long ledgerId);
+
+/*
+ * acquire lock for this ledger if it is not already available for this
+ * ledger then it will create a new one and then acquire lock.
+ */
+void acquireLockByCreatingIfRequired(Long ledgerId);
+
+/*
+ * release lock for this ledger
+ */
+void releaseLock(Long ledgerId);
+
+/*
+ * sets the logChannel for the given ledgerId. The previous one will be
+ * removed from replicaOfCurrentLogChannels. Previous logChannel will be
+ * added to rotatedLogChannels.
+ */
+void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel logChannel);
 
 Review comment:
   `currentLogForLedger`, the copy of the rotated log channels, and the copy 
of the current logs are all implementation details of one particular 
implementation of EntryLogManager. Can we think of a better abstraction that 
hides them?
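
   As an illustration only, a narrower interface could describe what callers 
need (append, flush) and leave the bookkeeping of current and rotated logs to 
each implementation. The names `SketchLogManager` and `SingleLogManager`, the 
method signatures and the rolling rule below are assumptions, not the PR's 
code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical narrow interface: says what callers need, not how logs are tracked.
interface SketchLogManager {
    // Returns the id of the log that received the entry.
    long addEntry(long ledgerId, byte[] entry);

    // Flushes rotated and current logs; returns how many logs were flushed.
    int flush();
}

// Hypothetical single-log implementation. The rotated-log list and the
// current-log fields are private details that never appear in the interface.
final class SingleLogManager implements SketchLogManager {
    private final long sizeLimit;
    private final List<Long> rotatedLogIds = new ArrayList<>();
    private long currentLogId = 0;
    private long currentLogSize = 0;

    SingleLogManager(long sizeLimit) {
        this.sizeLimit = sizeLimit;
    }

    @Override
    public synchronized long addEntry(long ledgerId, byte[] entry) {
        // This single-log variant ignores ledgerId; a per-ledger variant
        // could use it to pick a log without changing the interface.
        if (currentLogSize > 0 && currentLogSize + entry.length > sizeLimit) {
            rotatedLogIds.add(currentLogId);
            currentLogId++;
            currentLogSize = 0;
        }
        currentLogSize += entry.length;
        return currentLogId;
    }

    @Override
    public synchronized int flush() {
        int flushed = rotatedLogIds.size() + 1; // rotated logs plus the current one
        rotatedLogIds.clear();
        return flushed;
    }
}
```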




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-22 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176332343
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
 ##
 @@ -584,15 +584,6 @@ public void 
testWithDiskFullReadOnlyDisabledOrForceGCAllowDisabled() throws Exce
 } catch (NoWritableLedgerDirException e) {
 // expected
 }
-
 
 Review comment:
   Can you explain why these lines are removed?




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-22 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176332723
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -1441,4 +1658,40 @@ static long fileName2LogId(String fileName) {
 static String logId2HexString(long logId) {
 return Long.toHexString(logId);
 }
-}
+
+/**
+ * Datastructure which maintains the status of logchannels. When a
+ * logChannel is created entry of < entryLogId, false > will be made to this
+ * sortedmap and when logChannel is rotated and flushed then the entry is
+ * updated to < entryLogId, true > and all the lowest entries with
+ * < entryLogId, true > status will be removed from the sortedmap. So that way
+ * we could get least unflushed LogId.
+ *
+ */
+static class RecentEntryLogsStatus {
+private SortedMap entryLogsStatusMap;
 
 Review comment:
   "private final"

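   With the field declared `private final`, the structure described in the 
Javadoc above could look roughly like the following. This is an illustrative 
re-implementation of the described behaviour only; the class name, method 
names and generic parameters are assumptions and are not taken from the PR.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Illustrative sketch of the status map described in the Javadoc.
final class RecentEntryLogsStatusSketch {
    // The reference never changes after construction, so it can be final.
    private final SortedMap<Long, Boolean> entryLogsStatusMap = new TreeMap<>();
    private long leastUnflushedLogId;

    RecentEntryLogsStatusSketch(long leastUnflushedLogId) {
        this.leastUnflushedLogId = leastUnflushedLogId;
    }

    // A newly created log starts as < entryLogId, false >.
    synchronized void createdEntryLog(long entryLogId) {
        entryLogsStatusMap.put(entryLogId, false);
    }

    // A rotated and flushed log becomes < entryLogId, true >; then all
    // lowest entries that are already flushed are dropped from the map.
    synchronized void flushRotatedEntryLog(long entryLogId) {
        entryLogsStatusMap.replace(entryLogId, true);
        while (!entryLogsStatusMap.isEmpty()
                && entryLogsStatusMap.get(entryLogsStatusMap.firstKey())) {
            long lowestFlushed = entryLogsStatusMap.firstKey();
            entryLogsStatusMap.remove(lowestFlushed);
            leastUnflushedLogId = lowestFlushed + 1;
        }
    }

    synchronized long getLeastUnflushedLogId() {
        return leastUnflushedLogId;
    }
}
```

   Flushing log 1 before log 0 leaves the least unflushed id at 0; once log 0 
is flushed as well, both entries are removed and the id advances to 2.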



[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-22 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176331968
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
 ##
 @@ -209,16 +202,12 @@ public void onSizeLimitReached(final Checkpoint cp) 
throws IOException {
 public void run() {
 try {
 LOG.info("Started flushing mem table.");
-long logIdBeforeFlush = entryLogger.getCurrentLogId();
 memTable.flush(SortedLedgerStorage.this);
-long logIdAfterFlush = entryLogger.getCurrentLogId();
 // in any case that an entry log reaches the limit, we roll the log and start checkpointing.
 // if a memory table is flushed spanning over two entry log files, we also roll log. this is
 // for performance consideration: since we don't wanna checkpoint a new log file that ledger
 // storage is writing to.
-if (entryLogger.reachEntryLogLimit(0) || logIdAfterFlush != logIdBeforeFlush) {
-LOG.info("Rolling entry logger since it reached size limitation");
-entryLogger.rollLog();
+if (entryLogger.rollLogsIfEntryLogLimitReached()) {
 
 Review comment:
   This changes performance characteristics.  




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-22 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176331262
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -116,6 +121,18 @@ public ConcurrentLongLongHashMap getLedgersMap() {
 return entryLogMetadata.getLedgersMap();
 }
 
+public boolean isLedgerDirFull() {
 
 Review comment:
   It seems that you are converting this class to an inner class just because 
you want to reference `ledgerDirsManager`. I don't think this is a good change. 
   
   I would prefer not to reference `ledgerDirsManager` here. You should just 
add `getFile()` to this class and call 
`ledgerDirsManager.isDirFull(channel.getFile())`.
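
   A minimal sketch of that arrangement, using hypothetical stand-in classes 
(`SketchLogChannel`, `SketchLedgerDirsManager`) rather than the real ones. The 
`isDirFull(File)` semantics shown here, checking the parent directory of the 
log file, are an assumption made for illustration.

```java
import java.io.File;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the channel class: it stays a plain class
// and only exposes the file it writes to.
final class SketchLogChannel {
    private final File file;

    SketchLogChannel(File file) {
        this.file = file;
    }

    File getFile() {
        return file;
    }
}

// Hypothetical stand-in for the dirs manager. Assumed semantics:
// a log file's directory is "full" if it was marked as such.
final class SketchLedgerDirsManager {
    private final Set<File> fullDirs = new HashSet<>();

    void markFull(File dir) {
        fullDirs.add(dir);
    }

    boolean isDirFull(File logFile) {
        return fullDirs.contains(logFile.getParentFile());
    }
}
```

   The caller, which already holds the dirs manager, combines the two, so the 
channel class needs no reference to it.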




[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-22 Thread GitBox
sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176332233
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
 ##
 @@ -167,14 +167,7 @@ public ByteBuf getEntry(long ledgerId, long entryId) 
throws IOException {
 
 @Override
 public void checkpoint(final Checkpoint checkpoint) throws IOException {
-long numBytesFlushed = memTable.flush(this, checkpoint);
-if (numBytesFlushed > 0) {
-// if bytes are added between previous flush and this checkpoint,
-// it means bytes might live at current active entry log, we need
-// roll current entry log and then issue checkpoint to underlying
-// interleaved ledger storage.
-entryLogger.rollLog();
-}
+memTable.flush(this, checkpoint);
 
 Review comment:
   I would suggest leaving this change out of this PR. Let's discuss it when 
you introduce the entry log manager for multiple entry logs.

