[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-15 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r181626498
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +810,392 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * flush both current and rotated logs.
+ */
+void flush() throws IOException;
+
+/*
+ * close current logs.
+ */
+void close() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceClose();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
  *
- * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based logic.
+ */
+void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException;
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
+/*
+ * this method should be called after doing entrymemtable flush,it would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
+ *
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final List rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = new CopyOnWriteArrayList();
 
 Review comment:
   Regarding the rotated logs, I don't see the need for separate
   implementations in EntryLogManagerForSingleEntryLog and
   EntryLogManagerForEntryLogPerLedger; that's why I kept the collection in
   EntryLogManagerBase. I don't have a strong opinion on whether this
   collection should be a Set or a List, which is why I changed it back to a
   List as we discussed (to minimize the change in behavior).
   
   But for the sake of correctness and simplicity it has to be a concurrent
   list. For example:
   In this line 'logChannelsToFlush' is passed to the log statement, which in
   turn calls the List's toString method, and toString returns "the string
   representation consists of a list of the collection's elements in the
   order they are returned by its iterator". So if an entry log is added to
   rotatedLogChannels while that iterator is iterating, a LinkedList iterator
   will fail because "if the list is structurally modified at any time after
   the iterator is created, in any way except through the Iterator's own
   remove or add methods, the iterator will throw a
   ConcurrentModificationException."
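
   The failure mode described in this comment can be reproduced in a single
   thread. The following is a minimal sketch, not code from the pull request:
   the class name RotatedLogListSketch, the method name, and the strings
   standing in for BufferedLogChannel objects are assumptions made for
   illustration. It adds an element while an iteration is in progress, which
   is what a log rotation during toString() would amount to.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class RotatedLogListSketch {

    // Returns true if adding to the list while iterating over it throws a
    // ConcurrentModificationException. Strings stand in for log channels.
    static boolean failsWhenModifiedDuringIteration(List<String> channels) {
        channels.add("0.log");
        channels.add("1.log");
        try {
            for (String channel : channels) {
                // Simulates a rotation that happens while the list is being
                // iterated (for example by toString() in a log statement).
                if (channel.equals("0.log")) {
                    channels.add("2.log");
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("LinkedList fails: "
                + failsWhenModifiedDuringIteration(new LinkedList<>()));
        System.out.println("CopyOnWriteArrayList fails: "
                + failsWhenModifiedDuringIteration(new CopyOnWriteArrayList<>()));
    }
}
```

   CopyOnWriteArrayList iterators work on a snapshot of the backing array, so
   the add is never seen by the iteration already in progress; the cost is an
   array copy on every write, which is usually acceptable for a list that
   changes only on log rotation.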
   
   

[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-15 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r181623201
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +810,392 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * flush both current and rotated logs.
+ */
+void flush() throws IOException;
+
+/*
+ * close current logs.
+ */
+void close() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceClose();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
  *
- * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based logic.
+ */
+void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException;
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
+/*
+ * this method should be called after doing entrymemtable flush,it would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
+ *
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final List rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = new CopyOnWriteArrayList();
 }
-if (null == channels) {
-return;
+
+private final FastThreadLocal sizeBufferForAdd = new FastThreadLocal() {
+@Override
+protected ByteBuf initialValue() throws Exception {
+return Unpooled.buffer(4);
+}
+};
+
+/*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+@Override
+public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
+BufferedLogChannel logChannel = getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog);
+ByteBuf sizeBuffer = sizeBufferForAdd.get();
+sizeBuffer.clear();
+sizeBuffer.writeInt(entry.readableBytes());
+logChannel.write(sizeBuffer);
+
+long pos = logChannel.position();
+logChannel.write(entry);
+
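
The addEntry body quoted above writes a 4-byte size, records the channel position, and then writes the entry, so the returned position points at the entry bytes rather than at the size prefix. A minimal sketch of that framing follows. It uses a plain java.nio.ByteBuffer in place of Netty's ByteBuf and BufferedLogChannel; the class SizePrefixedLogSketch and its methods are hypothetical stand-ins, not BookKeeper APIs.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

class SizePrefixedLogSketch {
    // In-memory stand-in for an entry log file.
    private final ByteBuffer log = ByteBuffer.allocate(1024);

    // Appends [4-byte size][entry bytes] and returns the offset of the first
    // entry byte, mirroring the order of operations in the quoted addEntry.
    long addEntry(byte[] entry) {
        log.putInt(entry.length);
        long pos = log.position();
        log.put(entry);
        return pos;
    }

    // Reads an entry back: its size sits in the 4 bytes just before pos.
    byte[] readEntry(long pos) {
        int size = log.getInt((int) pos - 4);
        byte[] out = new byte[size];
        ByteBuffer view = log.duplicate();
        view.position((int) pos);
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        SizePrefixedLogSketch sketch = new SizePrefixedLogSketch();
        long first = sketch.addEntry(new byte[] {1, 2, 3});
        long second = sketch.addEntry(new byte[] {9});
        System.out.println("positions: " + first + ", " + second);
        System.out.println("first entry: " + Arrays.toString(sketch.readEntry(first)));
    }
}
```

Reusing one small size buffer per thread, as the quoted FastThreadLocal does, avoids allocating a 4-byte buffer for every entry; the sketch sidesteps that concern by writing the int directly.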

[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180947780
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -950,19 +939,21 @@ public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOExcep
 
 /**
  * if disk of the logChannel is full or if the entrylog limit is
- * reached of if the logchannel is not initialized, then
+ * reached or if the logchannel is not initialized, then
  * createNewLog. If allDisks are full then proceed with the current
  * logChannel, since Bookie must have turned to readonly mode and
  * the addEntry traffic would be from GC and it is ok to proceed in
  * this case.
  */
 if ((diskFull && (!allDisksFull)) || reachEntryLogLimit || (logChannel == null)) {
-flushCurrentLog(logChannel, false);
+if (logChannel != null) {
+logChannel.flushAndForceWriteIfRegularFlush(false);
 
 Review comment:
   This call is added back because it was deleted, by mistake, in this commit: 
https://github.com/apache/bookkeeper/commit/30261eae3fd8ab25239d57cfb86a200d5f7b6b7d#diff-66d58c001864b142c348f9191407c410
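
The condition in the hunk above, together with the restored null check, can be read as two separate decisions: whether to create a new log, and whether there is an existing channel to flush first. The sketch below is written under stated assumptions: RollDecisionSketch and its method names are hypothetical, and the boolean parameters stand in for values that the real method derives from ledgerDirsManager and the log channel.

```java
class RollDecisionSketch {

    // Mirrors the quoted condition: roll when the current disk is full but
    // another disk is still writable, when the entry log limit is reached,
    // or when no log channel has been initialized yet.
    static boolean shouldCreateNewLog(boolean hasLogChannel, boolean diskFull,
                                      boolean allDisksFull, boolean reachEntryLogLimit) {
        return (diskFull && !allDisksFull) || reachEntryLogLimit || !hasLogChannel;
    }

    // The flush before rolling applies only when a channel exists, which is
    // why the restored call sits behind a null check.
    static boolean shouldFlushBeforeRoll(boolean hasLogChannel, boolean diskFull,
                                         boolean allDisksFull, boolean reachEntryLogLimit) {
        return hasLogChannel
                && shouldCreateNewLog(hasLogChannel, diskFull, allDisksFull, reachEntryLogLimit);
    }

    public static void main(String[] args) {
        System.out.println("no channel yet, create: "
                + shouldCreateNewLog(false, false, false, false));
        System.out.println("no channel yet, flush first: "
                + shouldFlushBeforeRoll(false, false, false, false));
        System.out.println("all disks full, create: "
                + shouldCreateNewLog(true, true, true, false));
    }
}
```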


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180942869
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
+
+/*
+ * flush current logs.
+ */
+void flushCurrentLogs() throws IOException;
+
+/*
+ * close current logs.
+ */
+void closeCurrentLogs() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceCloseCurrentLogs();
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
+ * this method should be called after doing entrymemtable flush,it would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
  *
- * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based logic.
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final Set rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = ConcurrentHashMap.newKeySet();
 }
-if (null == channels) {
-return;
+
+/*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+@Override
+public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+
+int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
+BufferedLogChannel logChannel = getCurrentLogForLedger(ledger);
+boolean reachEntryLogLimit = rollLog ? reachEntryLogLimit(logChannel, entrySize)
+: readEntryLogHardLimit(logChannel, entrySize);
+// Create new log if logSizeLimit reached or current disk is full
+boolean diskFull = (logChannel == null) ? false
+: ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile());
+boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs();
+
+  

[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180941785
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
 ##
 @@ -173,7 +175,7 @@ public void checkpoint(final Checkpoint checkpoint) throws IOException {
 // it means bytes might live at current active entry log, we need
 // roll current entry log and then issue checkpoint to underlying
 // interleaved ledger storage.
-entryLogger.rollLog();
+entryLogger.rollLogs();
 
 Review comment:
   OK. Anyhow, it is a minor change and I don't have a strong opinion on this; 
I'll replace the rollLogs method in the interface with prepareSortedLedgerStorageCheckpoint.
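
One way to read this change: SortedLedgerStorage would no longer ask for a roll directly, and would instead tell the entry log manager that a checkpoint is being prepared, leaving the decision to each implementation. The sketch below is illustrative only. The method name prepareSortedLedgerStorageCheckpoint comes from the pull request; everything else (the class names, the counter standing in for a real log roll, and the policy of rolling only when bytes were flushed) is an assumption.

```java
import java.io.IOException;

class CheckpointPrepSketch {

    // Hypothetical, reduced form of the interface under review.
    interface EntryLogManagerSketch {
        void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException;
    }

    // Assumed policy for a single active entry log: if the flush wrote bytes,
    // they may live in the current log, so roll it before checkpointing.
    static final class SingleLogManagerSketch implements EntryLogManagerSketch {
        private long currentLogId = 0;

        @Override
        public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) {
            if (numBytesFlushed > 0) {
                currentLogId++; // stands in for creating a new entry log
            }
        }

        long currentLogId() {
            return currentLogId;
        }
    }

    public static void main(String[] args) {
        SingleLogManagerSketch manager = new SingleLogManagerSketch();
        manager.prepareSortedLedgerStorageCheckpoint(0);
        System.out.println("after empty flush: log " + manager.currentLogId());
        manager.prepareSortedLedgerStorageCheckpoint(4096);
        System.out.println("after 4096 bytes: log " + manager.currentLogId());
    }
}
```

Naming the method after the caller's intent rather than after the action lets an implementation with a different layout choose a different preparation step without changing SortedLedgerStorage.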




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180941835
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
 
 Review comment:
   OK. Anyhow, it is a minor change and I don't have a strong opinion on this; 
I'll replace the rollLogs method in the interface with prepareSortedLedgerStorageCheckpoint.




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180935161
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
+
+/*
+ * flush current logs.
+ */
+void flushCurrentLogs() throws IOException;
+
+/*
+ * close current logs.
+ */
+void closeCurrentLogs() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceCloseCurrentLogs();
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
+ * this method should be called after doing entrymemtable flush,it would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
  *
- * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based logic.
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final Set rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = ConcurrentHashMap.newKeySet();
 }
-if (null == channels) {
-return;
+
+/*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+@Override
+public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+
+int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
+BufferedLogChannel logChannel = getCurrentLogForLedger(ledger);
+boolean reachEntryLogLimit = rollLog ? reachEntryLogLimit(logChannel, entrySize)
+: readEntryLogHardLimit(logChannel, entrySize);
+// Create new log if logSizeLimit reached or current disk is full
+boolean diskFull = (logChannel == null) ? false
 
 Review comment:
   Hey @sijie, there is not much of a change here. The changes are minor, and 
they make the code more organized and appropriate.
   
   1) I removed the 

[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180918911
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -499,176 +576,163 @@ void createNewLog() throws IOException {
 EntryLoggerAllocator getEntryLoggerAllocator() {
 return entryLoggerAllocator;
 }
-/**
- * Append the ledger map at the end of the entry log.
- * Updates the entry log file header with the offset and size of the map.
- */
-private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException {
-long ledgerMapOffset = entryLogChannel.position();
-
-ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
-int numberOfLedgers = (int) ledgersMap.size();
-
-// Write the ledgers map into several batches
-
-final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
-final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize);
-
-try {
-ledgersMap.forEach(new BiConsumerLong() {
-int remainingLedgers = numberOfLedgers;
-boolean startNewBatch = true;
-int remainingInBatch = 0;
-
-@Override
-public void accept(long ledgerId, long size) {
-if (startNewBatch) {
-int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE);
-int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize;
-
-serializedMap.clear();
-serializedMap.writeInt(ledgerMapSize - 4);
-serializedMap.writeLong(INVALID_LID);
-serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
-serializedMap.writeInt(batchSize);
-
-startNewBatch = false;
-remainingInBatch = batchSize;
-}
-// Dump the ledger in the current batch
-serializedMap.writeLong(ledgerId);
-serializedMap.writeLong(size);
---remainingLedgers;
-
-if (--remainingInBatch == 0) {
-// Close current batch
-try {
-entryLogChannel.write(serializedMap);
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-
-startNewBatch = true;
-}
-}
-});
-} catch (RuntimeException e) {
-if (e.getCause() instanceof IOException) {
-throw (IOException) e.getCause();
-} else {
-throw e;
-}
-} finally {
-serializedMap.release();
-}
-// Flush the ledger's map out before we write the header.
-// Otherwise the header might point to something that is not fully written
-entryLogChannel.flush();
-
-// Update the headers with the map offset and count of ledgers
-ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
-mapInfo.putLong(ledgerMapOffset);
-mapInfo.putInt(numberOfLedgers);
-mapInfo.flip();
-entryLogChannel.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION);
-}
 
 /**
  * An allocator pre-allocates entry log files.
  */
 class EntryLoggerAllocator {
 
 private long preallocatedLogId;
-private Future preallocation = null;
+Future preallocation = null;
 private ExecutorService allocatorExecutor;
-private final Object createEntryLogLock = new Object();
-private final Object createCompactionLogLock = new Object();
 
 EntryLoggerAllocator(long logId) {
 preallocatedLogId = logId;
 allocatorExecutor = Executors.newSingleThreadExecutor();
 }
 
-BufferedLogChannel createNewLog() throws IOException {
-synchronized (createEntryLogLock) {
-BufferedLogChannel bc;
-if (!entryLogPreAllocationEnabled){
-// create a new log directly
-bc = allocateNewLog();
-return bc;
-} else {
-// allocate directly to response request
-if (null == preallocation){
-bc = allocateNewLog();
+synchronized long getPreallocatedLogId(){
+return preallocatedLogId;
+}
+
+synchronized BufferedLogChannel createNewLog() throws IOException {
+BufferedLogChannel bc;
+if (!entryLogPreAllocationEnabled || null == 

[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180905195
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -499,176 +576,163 @@ void createNewLog() throws IOException {
 EntryLoggerAllocator getEntryLoggerAllocator() {
 return entryLoggerAllocator;
 }
-/**
- * Append the ledger map at the end of the entry log.
- * Updates the entry log file header with the offset and size of the map.
- */
-private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException {
-long ledgerMapOffset = entryLogChannel.position();
-
-ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
-int numberOfLedgers = (int) ledgersMap.size();
-
-// Write the ledgers map into several batches
-
-final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
-final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize);
-
-try {
-ledgersMap.forEach(new BiConsumerLong() {
-int remainingLedgers = numberOfLedgers;
-boolean startNewBatch = true;
-int remainingInBatch = 0;
-
-@Override
-public void accept(long ledgerId, long size) {
-if (startNewBatch) {
-int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE);
-int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize;
-
-serializedMap.clear();
-serializedMap.writeInt(ledgerMapSize - 4);
-serializedMap.writeLong(INVALID_LID);
-serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
-serializedMap.writeInt(batchSize);
-
-startNewBatch = false;
-remainingInBatch = batchSize;
-}
-// Dump the ledger in the current batch
-serializedMap.writeLong(ledgerId);
-serializedMap.writeLong(size);
---remainingLedgers;
-
-if (--remainingInBatch == 0) {
-// Close current batch
-try {
-entryLogChannel.write(serializedMap);
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-
-startNewBatch = true;
-}
-}
-});
-} catch (RuntimeException e) {
-if (e.getCause() instanceof IOException) {
-throw (IOException) e.getCause();
-} else {
-throw e;
-}
-} finally {
-serializedMap.release();
-}
-// Flush the ledger's map out before we write the header.
-// Otherwise the header might point to something that is not fully 
written
-entryLogChannel.flush();
-
-// Update the headers with the map offset and count of ledgers
-ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
-mapInfo.putLong(ledgerMapOffset);
-mapInfo.putInt(numberOfLedgers);
-mapInfo.flip();
-entryLogChannel.fileChannel.write(mapInfo, 
LEDGERS_MAP_OFFSET_POSITION);
-}
 
 /**
  * An allocator pre-allocates entry log files.
  */
 class EntryLoggerAllocator {
 
 private long preallocatedLogId;
-private Future preallocation = null;
+Future preallocation = null;
 private ExecutorService allocatorExecutor;
-private final Object createEntryLogLock = new Object();
-private final Object createCompactionLogLock = new Object();
 
 EntryLoggerAllocator(long logId) {
 preallocatedLogId = logId;
 allocatorExecutor = Executors.newSingleThreadExecutor();
 }
 
-BufferedLogChannel createNewLog() throws IOException {
-synchronized (createEntryLogLock) {
-BufferedLogChannel bc;
-if (!entryLogPreAllocationEnabled){
-// create a new log directly
-bc = allocateNewLog();
-return bc;
-} else {
-// allocate directly to response request
-if (null == preallocation){
-bc = allocateNewLog();
+synchronized long getPreallocatedLogId(){
+return preallocatedLogId;
+}
+
+synchronized BufferedLogChannel createNewLog() throws IOException {
+BufferedLogChannel bc;
+if (!entryLogPreAllocationEnabled || null == 

[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180898594
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
 
 Review comment:
   Moved flushCurrentLogs and flushRotatedLogs from the EntryLogManager interface 
to EntryLogManagerBase, and added a flush method to the interface.
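
   A minimal sketch of the resulting shape, using simplified stand-in types. The Sketch* names and the "calls" list are hypothetical and only there to make the example observable, and the order in which flush() invokes the two helpers is an assumption, not something the comment states. The interface exposes a single flush(), while flushRotatedLogs and flushCurrentLogs stay in the base class:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the EntryLogManager interface.
interface SketchEntryLogManager {
    // The only flush entry point visible to callers.
    void flush() throws IOException;
}

// Hypothetical stand-in for EntryLogManagerBase.
abstract class SketchEntryLogManagerBase implements SketchEntryLogManager {
    // Records which helpers ran, purely for illustration.
    final List<String> calls = new ArrayList<>();

    // Shared by all implementations, so it lives in the base class.
    void flushRotatedLogs() throws IOException {
        calls.add("rotated");
    }

    // Each implementation decides what "current logs" means.
    abstract void flushCurrentLogs() throws IOException;

    @Override
    public void flush() throws IOException {
        // Assumed order: current logs first, then rotated logs.
        flushCurrentLogs();
        flushRotatedLogs();
    }
}

// Hypothetical single-entry-log implementation.
class SketchSingleLogManager extends SketchEntryLogManagerBase {
    @Override
    void flushCurrentLogs() throws IOException {
        calls.add("current");
    }
}
```

   Callers then depend only on flush(), and the two helper methods remain implementation details of the base class.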


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180894219
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
+
+/*
+ * flush current logs.
+ */
+void flushCurrentLogs() throws IOException;
+
+/*
+ * close current logs.
+ */
+void closeCurrentLogs() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceCloseCurrentLogs();
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
+ * this method should be called after doing entrymemtable flush,it 
would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
  *
- * TODO: When EntryLogManager is introduced in the subsequent 
sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based 
logic.
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and 
both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do 
checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final Set rotatedLogChannels;
 
 Review comment:
   1) In both cases (single entry log and entry log per ledger), 
rotatedLogChannels is handled the same way, so it makes sense to keep this 
variable and its related methods (flushRotatedLogs) in the base class.
   2) I don't see this as much of an issue. Anyhow, I'll change it to the List 
interface, but a non-synchronized List would be incorrect for 
rotatedLogChannels. I'll use CopyOnWriteArrayList, so that the list can be 
modified while its iterator is being used.
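
   To illustrate the CopyOnWriteArrayList point, here is a small sketch. The class name, the method name, and the use of Strings in place of BufferedLogChannel are all hypothetical. It removes elements from the list while iterating over it, which a plain ArrayList would reject with ConcurrentModificationException:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class RotatedChannelsSketch {
    // Strings stand in for BufferedLogChannel instances (hypothetical).
    static int flushAndPrune(List<String> rotated) {
        int flushed = 0;
        // The iterator of a CopyOnWriteArrayList walks a snapshot of the
        // backing array, so removing from the list inside the loop is safe.
        for (String channel : rotated) {
            flushed++;               // stands in for flushing the channel
            rotated.remove(channel); // drop the channel once it is flushed
        }
        return flushed;
    }

    public static void main(String[] args) {
        List<String> rotated =
                new CopyOnWriteArrayList<>(Arrays.asList("0.log", "1.log", "2.log"));
        System.out.println(flushAndPrune(rotated) + " flushed, " + rotated.size() + " left");
    }
}
```

   The trade-off is that every mutation copies the backing array, which is acceptable only if the list stays short and is modified rarely.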



[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180670849
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -499,176 +576,163 @@ void createNewLog() throws IOException {
 EntryLoggerAllocator getEntryLoggerAllocator() {
 return entryLoggerAllocator;
 }
-/**
- * Append the ledger map at the end of the entry log.
- * Updates the entry log file header with the offset and size of the map.
- */
-private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws 
IOException {
-long ledgerMapOffset = entryLogChannel.position();
-
-ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
-int numberOfLedgers = (int) ledgersMap.size();
-
-// Write the ledgers map into several batches
-
-final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
-final ByteBuf serializedMap = 
ByteBufAllocator.DEFAULT.buffer(maxMapSize);
-
-try {
-ledgersMap.forEach(new BiConsumerLong() {
-int remainingLedgers = numberOfLedgers;
-boolean startNewBatch = true;
-int remainingInBatch = 0;
-
-@Override
-public void accept(long ledgerId, long size) {
-if (startNewBatch) {
-int batchSize = Math.min(remainingLedgers, 
LEDGERS_MAP_MAX_BATCH_SIZE);
-int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * batchSize;
-
-serializedMap.clear();
-serializedMap.writeInt(ledgerMapSize - 4);
-serializedMap.writeLong(INVALID_LID);
-serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
-serializedMap.writeInt(batchSize);
-
-startNewBatch = false;
-remainingInBatch = batchSize;
-}
-// Dump the ledger in the current batch
-serializedMap.writeLong(ledgerId);
-serializedMap.writeLong(size);
---remainingLedgers;
-
-if (--remainingInBatch == 0) {
-// Close current batch
-try {
-entryLogChannel.write(serializedMap);
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-
-startNewBatch = true;
-}
-}
-});
-} catch (RuntimeException e) {
-if (e.getCause() instanceof IOException) {
-throw (IOException) e.getCause();
-} else {
-throw e;
-}
-} finally {
-serializedMap.release();
-}
-// Flush the ledger's map out before we write the header.
-// Otherwise the header might point to something that is not fully 
written
-entryLogChannel.flush();
-
-// Update the headers with the map offset and count of ledgers
-ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
-mapInfo.putLong(ledgerMapOffset);
-mapInfo.putInt(numberOfLedgers);
-mapInfo.flip();
-entryLogChannel.fileChannel.write(mapInfo, 
LEDGERS_MAP_OFFSET_POSITION);
-}
 
 /**
  * An allocator pre-allocates entry log files.
  */
 class EntryLoggerAllocator {
 
 private long preallocatedLogId;
-private Future preallocation = null;
+Future preallocation = null;
 private ExecutorService allocatorExecutor;
-private final Object createEntryLogLock = new Object();
-private final Object createCompactionLogLock = new Object();
 
 EntryLoggerAllocator(long logId) {
 preallocatedLogId = logId;
 allocatorExecutor = Executors.newSingleThreadExecutor();
 }
 
-BufferedLogChannel createNewLog() throws IOException {
-synchronized (createEntryLogLock) {
-BufferedLogChannel bc;
-if (!entryLogPreAllocationEnabled){
-// create a new log directly
-bc = allocateNewLog();
-return bc;
-} else {
-// allocate directly to response request
-if (null == preallocation){
-bc = allocateNewLog();
+synchronized long getPreallocatedLogId(){
+return preallocatedLogId;
+}
+
+synchronized BufferedLogChannel createNewLog() throws IOException {
+BufferedLogChannel bc;
+if (!entryLogPreAllocationEnabled || null == 

[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180669426
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
 
 Review comment:
   rollLogs is not used in the memtable-flush logic, but it is used in the 
SortedLedgerStorage.checkpoint logic. You made another comment related to 
rollLogs, so I'm copying the same response here.
   
   I'm flexible about making the change, but:
   
   1) To begin with, from what I understand, this check (numBytesFlushed > 0) 
and the rollLogs call are not needed, since entries that arrive before the 
checkpoint are already flushed from the memtable (because of the way 
SortedLedgerStorage.checkpoint is called).
   2) I'm not convinced about replacing the rollLogs method in the interface 
with prepareCheckpoint, because it has meaning/use only for SortedLedgerStorage 
and for the single entry log manager, and such a generic method name as 
"prepareCheckpoint" seems to be a stretch.
   3) In the method signature "void prepareCheckpoint(Checkpoint checkpoint, 
long numBytesFlushedBetweenCheckpoint)", 'checkpoint' is not needed and we can 
remove it. The parameter name 'numBytesFlushedBetweenCheckpoint' is not 
appropriate, since it is just the bytes added between the previous flush and 
this checkpoint.
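
   A rough sketch of the alternative under discussion, with hypothetical stand-in types. The method name follows the reviewer's proposal minus the unused Checkpoint parameter. The bodies are assumptions, in particular that a per-ledger manager would treat the call as a no-op:

```java
// Hypothetical stand-in: the hook proposed in place of rollLogs().
interface CheckpointPrepSketch {
    void prepareCheckpoint(long numBytesFlushed);
}

// Hypothetical single-entry-log manager: rolls only if bytes were flushed.
class SingleLogPrepSketch implements CheckpointPrepSketch {
    int rolls = 0; // counts simulated rollLogs() calls

    @Override
    public void prepareCheckpoint(long numBytesFlushed) {
        if (numBytesFlushed > 0) {
            rolls++; // stands in for rollLogs()
        }
    }
}

// Hypothetical per-ledger manager: assumed to have nothing to do here.
class PerLedgerPrepSketch implements CheckpointPrepSketch {
    int rolls = 0;

    @Override
    public void prepareCheckpoint(long numBytesFlushed) {
        // Intentionally empty in this sketch.
    }
}
```

   This makes the concern in point 2 concrete: only one of the two implementations does anything in the hook, which is why a generic name may overstate its role.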



[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180649257
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -499,176 +576,163 @@ void createNewLog() throws IOException {
 EntryLoggerAllocator getEntryLoggerAllocator() {
 return entryLoggerAllocator;
 }
-/**
- * Append the ledger map at the end of the entry log.
- * Updates the entry log file header with the offset and size of the map.
- */
-private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws 
IOException {
-long ledgerMapOffset = entryLogChannel.position();
-
-ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
-int numberOfLedgers = (int) ledgersMap.size();
-
-// Write the ledgers map into several batches
-
-final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
-final ByteBuf serializedMap = 
ByteBufAllocator.DEFAULT.buffer(maxMapSize);
-
-try {
-ledgersMap.forEach(new BiConsumerLong() {
-int remainingLedgers = numberOfLedgers;
-boolean startNewBatch = true;
-int remainingInBatch = 0;
-
-@Override
-public void accept(long ledgerId, long size) {
-if (startNewBatch) {
-int batchSize = Math.min(remainingLedgers, 
LEDGERS_MAP_MAX_BATCH_SIZE);
-int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * batchSize;
-
-serializedMap.clear();
-serializedMap.writeInt(ledgerMapSize - 4);
-serializedMap.writeLong(INVALID_LID);
-serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
-serializedMap.writeInt(batchSize);
-
-startNewBatch = false;
-remainingInBatch = batchSize;
-}
-// Dump the ledger in the current batch
-serializedMap.writeLong(ledgerId);
-serializedMap.writeLong(size);
---remainingLedgers;
-
-if (--remainingInBatch == 0) {
-// Close current batch
-try {
-entryLogChannel.write(serializedMap);
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-
-startNewBatch = true;
-}
-}
-});
-} catch (RuntimeException e) {
-if (e.getCause() instanceof IOException) {
-throw (IOException) e.getCause();
-} else {
-throw e;
-}
-} finally {
-serializedMap.release();
-}
-// Flush the ledger's map out before we write the header.
-// Otherwise the header might point to something that is not fully 
written
-entryLogChannel.flush();
-
-// Update the headers with the map offset and count of ledgers
-ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
-mapInfo.putLong(ledgerMapOffset);
-mapInfo.putInt(numberOfLedgers);
-mapInfo.flip();
-entryLogChannel.fileChannel.write(mapInfo, 
LEDGERS_MAP_OFFSET_POSITION);
-}
 
 /**
  * An allocator pre-allocates entry log files.
  */
 class EntryLoggerAllocator {
 
 private long preallocatedLogId;
-private Future preallocation = null;
+Future preallocation = null;
 private ExecutorService allocatorExecutor;
-private final Object createEntryLogLock = new Object();
-private final Object createCompactionLogLock = new Object();
 
 EntryLoggerAllocator(long logId) {
 preallocatedLogId = logId;
 allocatorExecutor = Executors.newSingleThreadExecutor();
 }
 
-BufferedLogChannel createNewLog() throws IOException {
-synchronized (createEntryLogLock) {
-BufferedLogChannel bc;
-if (!entryLogPreAllocationEnabled){
-// create a new log directly
-bc = allocateNewLog();
-return bc;
-} else {
-// allocate directly to response request
-if (null == preallocation){
-bc = allocateNewLog();
+synchronized long getPreallocatedLogId(){
+return preallocatedLogId;
+}
+
+synchronized BufferedLogChannel createNewLog() throws IOException {
+BufferedLogChannel bc;
+if (!entryLogPreAllocationEnabled || null == 

[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180658981
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -880,39 +1195,6 @@ protected ByteBuf initialValue() throws Exception {
 }
 };
 
-public synchronized long addEntry(long ledger, ByteBuf entry, boolean 
rollLog) throws IOException {
-if (null == logChannel) {
-// log channel can be null because the file is deferred to be 
created when no writable ledger directory
-// is available.
-createNewLog();
-}
-
-int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to 
prepend the size
-boolean reachEntryLogLimit =
-rollLog ? reachEntryLogLimit(entrySize) : 
readEntryLogHardLimit(entrySize);
-// Create new log if logSizeLimit reached or current disk is full
-boolean createNewLog = shouldCreateNewEntryLog.get();
 
 Review comment:
   I removed the need for LedgerDirsListener in EntryLogger. Instead, 
EntryLogManagerBase.addEntry checks whether the directory is full:
   
   boolean diskFull = (logChannel == null) ? false
           : ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile());
   
   diskFull is then used to determine whether a new log needs to be created.
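
   The check can be exercised in isolation with a sketch like the following. DirsSketch is a hypothetical stand-in for LedgerDirsManager, a File stands in for the log channel, and combining diskFull with the size-limit flag is an assumption about how the result is consumed:

```java
import java.io.File;
import java.util.HashSet;
import java.util.Set;

class DiskFullCheckSketch {
    // Hypothetical stand-in for LedgerDirsManager: remembers which dirs are full.
    static class DirsSketch {
        private final Set<File> fullDirs = new HashSet<>();

        void markFull(File dir) {
            fullDirs.add(dir);
        }

        boolean isDirFull(File dir) {
            return fullDirs.contains(dir);
        }
    }

    // logFile == null models "there is no current log channel yet".
    static boolean shouldCreateNewLog(File logFile, DirsSketch dirs, boolean reachedSizeLimit) {
        // Same shape as the check in the comment: no channel means not full.
        boolean diskFull = (logFile == null) ? false : dirs.isDirFull(logFile.getParentFile());
        // Assumption: a new log is needed on either condition.
        return reachedSizeLimit || diskFull;
    }
}
```

   Because the directory state is queried at addEntry time, no listener callback has to flip a shared flag in advance.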



[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-11 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180642426
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
+
+/*
+ * flush current logs.
+ */
+void flushCurrentLogs() throws IOException;
+
+/*
+ * close current logs.
+ */
+void closeCurrentLogs() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceCloseCurrentLogs();
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
+ * this method should be called after doing entrymemtable flush,it 
would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
  *
- * TODO: When EntryLogManager is introduced in the subsequent 
sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based 
logic.
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and 
both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do 
checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final Set rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = ConcurrentHashMap.newKeySet();
 }
-if (null == channels) {
-return;
+
+/*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+@Override
+public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) 
throws IOException {
+
+int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to 
prepend the size
+BufferedLogChannel logChannel = getCurrentLogForLedger(ledger);
+boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize)
+: readEntryLogHardLimit(logChannel, entrySize);
+// Create new log if logSizeLimit reached or current disk is full
+boolean diskFull = (logChannel == null) ? false
+: 
ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile());
+boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs();
+
+  

[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180642016
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -92,13 +99,10 @@
 private final long logId;
 private final EntryLogMetadata entryLogMetadata;
 private final File logFile;
+private Long ledgerIdAssigned = UNASSIGNED_LEDGERID;
 
 Review comment:
   changed it


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180593414
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
+
+/*
+ * flush current logs.
+ */
+void flushCurrentLogs() throws IOException;
+
+/*
+ * close current logs.
+ */
+void closeCurrentLogs() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceCloseCurrentLogs();
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
+ * this method should be called after doing entrymemtable flush,it would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
  *
- * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based logic.
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final Set rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = ConcurrentHashMap.newKeySet();
 }
-if (null == channels) {
-return;
+
+/*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+@Override
+public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+
+int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
+BufferedLogChannel logChannel = getCurrentLogForLedger(ledger);
+boolean reachEntryLogLimit = rollLog ? reachEntryLogLimit(logChannel, entrySize)
+: readEntryLogHardLimit(logChannel, entrySize);
+// Create new log if logSizeLimit reached or current disk is full
+boolean diskFull = (logChannel == null) ? false
+: ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile());
+boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs();
+
+  

[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180590406
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
+
+/*
+ * flush current logs.
+ */
+void flushCurrentLogs() throws IOException;
+
+/*
+ * close current logs.
+ */
+void closeCurrentLogs() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceCloseCurrentLogs();
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
+ * this method should be called after doing entrymemtable flush,it would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
  *
- * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based logic.
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final Set rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = ConcurrentHashMap.newKeySet();
 }
-if (null == channels) {
-return;
+
+/*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+@Override
+public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+
+int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
+BufferedLogChannel logChannel = getCurrentLogForLedger(ledger);
+boolean reachEntryLogLimit = rollLog ? reachEntryLogLimit(logChannel, entrySize)
+: readEntryLogHardLimit(logChannel, entrySize);
+// Create new log if logSizeLimit reached or current disk is full
+boolean diskFull = (logChannel == null) ? false
+: ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile());
+boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs();
+
+  

[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180588670
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
+
+/*
+ * flush current logs.
+ */
+void flushCurrentLogs() throws IOException;
+
+/*
+ * close current logs.
+ */
+void closeCurrentLogs() throws IOException;
+
+/*
+ * force close current logs.
+ */
+void forceCloseCurrentLogs();
+
+/*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+void prepareEntryMemTableFlush();
+
 /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
+ * this method should be called after doing entrymemtable flush,it would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
  *
- * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based logic.
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both
+ * would be called from the same thread.
  *
+ * returns boolean value indicating whether EntryMemTable should do checkpoint
+ * after this commit method.
  */
-if (entryLogPerLedgerEnabled) {
-flushCurrentLog();
-}
+boolean commitEntryMemTableFlush() throws IOException;
 }
 
-void flushRotatedLogs() throws IOException {
-List channels = null;
-long flushedLogId = INVALID_LID;
-synchronized (this) {
-channels = logChannelsToFlush;
-logChannelsToFlush = null;
+abstract class EntryLogManagerBase implements EntryLogManager {
+final Set rotatedLogChannels;
+
+EntryLogManagerBase() {
+rotatedLogChannels = ConcurrentHashMap.newKeySet();
 }
-if (null == channels) {
-return;
+
+/*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+@Override
+public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+
+int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
+BufferedLogChannel logChannel = getCurrentLogForLedger(ledger);
+boolean reachEntryLogLimit = rollLog ? reachEntryLogLimit(logChannel, entrySize)
+: readEntryLogHardLimit(logChannel, entrySize);
+// Create new log if logSizeLimit reached or current disk is full
+boolean diskFull = (logChannel == null) ? false
+: ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile());
+boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs();
+
+  

[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180584833
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
 }
 }
 
-/**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
-void checkpoint() throws IOException {
-flushRotatedLogs();
+interface EntryLogManager {
+
+/*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException;
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * flush rotated logs.
+ */
+void flushRotatedLogs() throws IOException;
+
+/*
+ * flush current logs.
+ */
+void flushCurrentLogs() throws IOException;
+
+/*
+ * close current logs.
+ */
+void closeCurrentLogs() throws IOException;
 
 Review comment:
   will do




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180583759
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -120,21 +124,104 @@ public ConcurrentLongLongHashMap getLedgersMap() {
 return entryLogMetadata.getLedgersMap();
 }
 
+public Long getLedgerIdAssigned() {
+return ledgerIdAssigned;
+}
+
+public void setLedgerIdAssigned(Long ledgerId) {
+this.ledgerIdAssigned = ledgerId;
+}
+
 @Override
 public String toString() {
 return MoreObjects.toStringHelper(BufferedChannel.class)
 .add("logId", logId)
 .add("logFile", logFile)
+.add("ledgerIdAssigned", ledgerIdAssigned)
 .toString();
 }
+
+/**
+ * Append the ledger map at the end of the entry log.
+ * Updates the entry log file header with the offset and size of the map.
+ */
+private void appendLedgersMap() throws IOException {
+
+long ledgerMapOffset = this.position();
+
+ConcurrentLongLongHashMap ledgersMap = this.getLedgersMap();
+int numberOfLedgers = (int) ledgersMap.size();
+
+// Write the ledgers map into several batches
+
+final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
+final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize);
+
+try {
+ledgersMap.forEach(new BiConsumerLong() {
+int remainingLedgers = numberOfLedgers;
+boolean startNewBatch = true;
+int remainingInBatch = 0;
+
+@Override
+public void accept(long ledgerId, long size) {
+if (startNewBatch) {
+int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE);
+int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize;
+
+serializedMap.clear();
+serializedMap.writeInt(ledgerMapSize - 4);
+serializedMap.writeLong(INVALID_LID);
+serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
+serializedMap.writeInt(batchSize);
+
+startNewBatch = false;
+remainingInBatch = batchSize;
+}
+// Dump the ledger in the current batch
+serializedMap.writeLong(ledgerId);
+serializedMap.writeLong(size);
+--remainingLedgers;
+
+if (--remainingInBatch == 0) {
+// Close current batch
+try {
+write(serializedMap);
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+
+startNewBatch = true;
+}
+}
+});
+} catch (RuntimeException e) {
+if (e.getCause() instanceof IOException) {
+throw (IOException) e.getCause();
+} else {
+throw e;
+}
+} finally {
+serializedMap.release();
+}
+// Flush the ledger's map out before we write the header.
+// Otherwise the header might point to something that is not fully
+// written
+super.flush();
+
+// Update the headers with the map offset and count of ledgers
+ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
+mapInfo.putLong(ledgerMapOffset);
+mapInfo.putInt(numberOfLedgers);
+mapInfo.flip();
+this.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION);
+}
 }
 
-volatile File currentDir;
 private final LedgerDirsManager ledgerDirsManager;
 private final boolean entryLogPerLedgerEnabled;
-private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
 
-private volatile long leastUnflushedLogId;
+RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
 
 Review comment:
   done
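
As a rough illustration of the batching in the quoted appendLedgersMap hunk (a sketch, not code from the PR): each batch carries a header followed by one (ledgerId, size) pair per ledger. The sketch assumes the header is exactly the four fields written in the hunk (int, long, long, int, so 24 bytes) and that each pair is two longs (16 bytes). The class name, method names, and the batch limit passed in are hypothetical.

```java
// Hypothetical helper, not part of BookKeeper: estimates how many bytes the
// batched ledgers map occupies, based on the writes visible in the quoted hunk.
final class LedgersMapSizeSketch {
    // int (size) + long (INVALID_LID) + long (LEDGERS_MAP_ENTRY_ID) + int (batch size)
    static final int HEADER_SIZE = 4 + 8 + 8 + 4;
    // long (ledgerId) + long (size)
    static final int ENTRY_SIZE = 8 + 8;

    // Number of batches needed for the given number of ledgers (ceiling division).
    static int batchCount(int numberOfLedgers, int maxBatchSize) {
        return (numberOfLedgers + maxBatchSize - 1) / maxBatchSize;
    }

    // Total serialized bytes: one header per batch plus one entry per ledger.
    static long serializedSize(int numberOfLedgers, int maxBatchSize) {
        return (long) batchCount(numberOfLedgers, maxBatchSize) * HEADER_SIZE
                + (long) numberOfLedgers * ENTRY_SIZE;
    }

    public static void main(String[] args) {
        // 5 ledgers with at most 2 per batch: batches of 2, 2 and 1.
        System.out.println(batchCount(5, 2));
        System.out.println(serializedSize(5, 2));
    }
}
```

The per-batch header is what lets a reader skip from one batch to the next, which is presumably why the hunk writes the batch size into every header rather than once for the whole map.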



[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180583624
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -84,6 +88,9 @@
  */
 public class EntryLogger {
 private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
+static final Long UNASSIGNED_LEDGERID = Long.valueOf(-1);
 
 Review comment:
   changed it to long




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180581487
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
 ##
 @@ -54,7 +49,7 @@ protected boolean removeEntryLog(long entryLogId) {
 }
 
 @Override
-public synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
+public synchronized long addEntry(Long ledgerId, ByteBuffer entry) throws IOException {
 
 Review comment:
   OK, will revert it back to the primitive long datatype in the interface.
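
For context on why a primitive long is the safer choice in such a signature, here is a small sketch (illustrative only, not code from the PR; the class and method names are hypothetical). It shows the two usual pitfalls of a boxed Long parameter: value comparison needs equals(), and a null reference fails when unboxed.

```java
// Illustrative only; the class and method names are hypothetical.
final class LedgerIdTypeSketch {
    // With boxed parameters, == would compare object references, so two equal
    // ids held in different Long objects may compare as different.
    static boolean sameLedgerBoxed(Long a, Long b) {
        return a.equals(b); // equals() is needed for a value comparison
    }

    // With primitive parameters, == compares the values and null is impossible.
    static boolean sameLedgerPrimitive(long a, long b) {
        return a == b;
    }

    // Unboxing a null Long throws NullPointerException.
    static boolean unboxingNullFails() {
        Long missing = null;
        try {
            long id = missing; // implicit missing.longValue()
            return false;      // not reached
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(sameLedgerBoxed(Long.valueOf(100_000L), Long.valueOf(100_000L)));
        System.out.println(sameLedgerPrimitive(100_000L, 100_000L));
        System.out.println(unboxingNullFails());
    }
}
```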




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180565394
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
 ##
 @@ -93,16 +98,16 @@ public void testCreateNewLog() throws Exception {
 // Extracted from createNewLog()
 String logFileName = Long.toHexString(1) + ".log";
 File dir = ledgerDirsManager.pickRandomWritableDir();
-LOG.info("Picked this directory: " + dir);
+LOG.info("Picked this directory: {}", dir);
 File newLogFile = new File(dir, logFileName);
 newLogFile.createNewFile();
 
 EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
 // Calls createNewLog, and with the number of directories we
 // are using, if it picks one at random it will fail.
-el.createNewLog();
-LOG.info("This is the current log id: " + el.getCurrentLogId());
-assertTrue("Wrong log id", el.getCurrentLogId() > 1);
+((EntryLogManagerBase) el.entryLogManager).createNewLog(0L);
 
 Review comment:
   1) I can add a simple getter for EntryLogManager in EntryLogger, but I wonder 
how it would help with mocking if the EntryLogger class itself uses the 
instance through a direct reference; using the EntryLogManager getter in every 
place it is used inside the EntryLogger class is a stretch. 
   2) If mocking is the requirement here, then I can create an 
initializeEntryLogManager method that initializes the EntryLogManager 
depending on the config; for mocking purposes, that method can be mocked to 
return what we want.
   3) OK. In some places I wanted to test getPreviousAllocatedEntryLogId, but 
it seems it is not appropriate to switch this test to 
getPreviousAllocatedEntryLogId. Will revert it back.
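
A minimal sketch of the idea in point 2), assuming an overridable factory method is an acceptable test seam. All types here are simplified, hypothetical stand-ins, not the PR's actual EntryLogger or EntryLogManager classes; only the method name initializeEntryLogManager is taken from the comment above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the PR's types, reduced to what the sketch needs.
interface SketchEntryLogManager {
    // Returns the position at which the entry was written.
    long addEntry(long ledgerId, byte[] entry);
}

// One shared log: positions grow across all ledgers.
final class SingleLogManagerSketch implements SketchEntryLogManager {
    private long offset = 0;

    @Override
    public long addEntry(long ledgerId, byte[] entry) {
        long position = offset;
        offset += entry.length;
        return position;
    }
}

// One log per ledger: positions grow independently for each ledger.
final class PerLedgerManagerSketch implements SketchEntryLogManager {
    private final Map<Long, Long> offsets = new HashMap<>();

    @Override
    public long addEntry(long ledgerId, byte[] entry) {
        long position = offsets.getOrDefault(ledgerId, 0L);
        offsets.put(ledgerId, position + entry.length);
        return position;
    }
}

class SketchEntryLogger {
    private final SketchEntryLogManager entryLogManager;

    SketchEntryLogger(boolean entryLogPerLedgerEnabled) {
        // Calling an overridable method from a constructor is only safe while
        // overrides do not rely on subclass state; the sketch accepts that limit.
        this.entryLogManager = initializeEntryLogManager(entryLogPerLedgerEnabled);
    }

    // Test seam: picks the manager from config; a test can override this.
    protected SketchEntryLogManager initializeEntryLogManager(boolean entryLogPerLedgerEnabled) {
        return entryLogPerLedgerEnabled ? new PerLedgerManagerSketch() : new SingleLogManagerSketch();
    }

    long addEntry(long ledgerId, byte[] entry) {
        // Internal code keeps using the field directly; no getter is required.
        return entryLogManager.addEntry(ledgerId, entry);
    }
}
```

A test could then subclass SketchEntryLogger and override initializeEntryLogManager to return a stub, leaving every internal use of the field untouched.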




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180555218
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
 ##
 @@ -173,7 +175,7 @@ public void checkpoint(final Checkpoint checkpoint) throws IOException {
 // it means bytes might live at current active entry log, we need
 // roll current entry log and then issue checkpoint to underlying
 // interleaved ledger storage.
-entryLogger.rollLog();
+entryLogger.rollLogs();
 
 Review comment:
   @sijie I'm flexible about making the change, but:
   
   1) To begin with, from what I understand, this check (numBytesFlushed > 0) 
and the rollLogs call are not needed, since entries that arrive before the 
checkpoint are already flushed from the memtable (because of the way 
sortedledgerstorage.checkpoint is called).
   2) I'm not convinced about replacing the rollLogs method in the interface 
with prepareCheckpoint, because it has meaning/use only for sortedledgerstorage 
and for the single entrylog manager, and such a generic method name as 
"prepareCheckpoint" seems to be a stretch.
   3) In the method signature "void prepareCheckpoint(Checkpoint checkpoint, 
long numBytesFlushedBetweenCheckpoint)", 'checkpoint' is not needed and we can 
remove it. The parameter name 'numBytesFlushedBetweenCheckpoint' is not 
appropriate, since it is just the bytes added between the previous flush and 
this checkpoint.
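
To make the trade-off in points 2) and 3) concrete, here is a minimal sketch of what a generic hook without the Checkpoint parameter might look like. The names are hypothetical and the per-ledger behaviour is an assumption drawn from point 2); this is not the PR's code.

```java
// Hypothetical, simplified types for illustration; not the PR's actual classes.
interface CheckpointPreparation {
    // numBytesFlushed: bytes added between the previous flush and this checkpoint.
    void prepareCheckpoint(long numBytesFlushed);
}

// Single entry log: roll the active log only when something was flushed.
final class SingleLogPreparation implements CheckpointPreparation {
    int rollCount = 0;

    @Override
    public void prepareCheckpoint(long numBytesFlushed) {
        if (numBytesFlushed > 0) {
            rollCount++; // stands in for rolling the current entry log
        }
    }
}

// Entry log per ledger: assumed here to need no action before a checkpoint.
final class PerLedgerPreparation implements CheckpointPreparation {
    @Override
    public void prepareCheckpoint(long numBytesFlushed) {
        // intentionally empty
    }
}

final class CheckpointPreparationDemo {
    public static void main(String[] args) {
        SingleLogPreparation single = new SingleLogPreparation();
        single.prepareCheckpoint(0);   // nothing flushed, no roll
        single.prepareCheckpoint(128); // bytes flushed, one roll
        System.out.println(single.rollCount);
    }
}
```

Only one of the two implementations does any work, which is the asymmetry point 2) objects to when the method carries a generic name.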




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-10 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180548567
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
 ##
 @@ -78,6 +79,7 @@ public void initialize(ServerConfiguration conf,
 .setNameFormat("SortedLedgerStorage-%d")
 .setPriority((Thread.NORM_PRIORITY + Thread.MAX_PRIORITY) / 2).build());
 this.stateManager = stateManager;
+this.isTransactionalCompactionEnabled = conf.getUseTransactionalCompaction();
 
 Review comment:
   yes, it is not needed. Will remove.




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-09 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180011787
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -802,88 +881,230 @@ private long readLastLogId(File f) {
 }
 }
 
+interface EntryLogManager {
+/*
+ * acquire lock for this ledger.
+ */
+void acquireLock(Long ledgerId);
+
+/*
+ * acquire lock for this ledger if it is not already available for this
+ * ledger then it will create a new one and then acquire lock.
+ */
+void acquireLockByCreatingIfRequired(Long ledgerId);
+
+/*
+ * release lock for this ledger
+ */
+void releaseLock(Long ledgerId);
+
+/*
+ * sets the logChannel for the given ledgerId. The previous one will be
+ * removed from replicaOfCurrentLogChannels. Previous logChannel will be
+ * added to rotatedLogChannels.
+ */
+void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel logChannel);
+
+/*
+ * gets the logChannel for the given ledgerId.
+ */
+BufferedLogChannel getCurrentLogForLedger(Long ledgerId);
+
+/*
+ * gets the copy of rotatedLogChannels
+ */
+Set getCopyOfRotatedLogChannels();
+
+/*
+ * gets the copy of replicaOfCurrentLogChannels
+ */
+Set getCopyOfCurrentLogs();
+
+/*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+/*
+ * removes the logChannel from rotatedLogChannels collection
+ */
+void removeFromRotatedLogChannels(BufferedLogChannel rotatedLogChannelToRemove);
+
+/*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+File getDirForNextEntryLog(List writableLedgerDirs);
+
+/*
+ * Do the operations required for checkpoint.
+ */
+void checkpoint() throws IOException;
+
+/*
+ * roll entryLogs.
+ */
+void rollLogs() throws IOException;
+
+/*
+ * gets the log id of the current activeEntryLog. for
+ * EntryLogManagerForSingleEntryLog it returns logid of current
+ * activelog, for EntryLogManagerForEntryLogPerLedger it would return
+ * some constant value.
+ */
+long getCurrentLogId();
+}
+
+class EntryLogManagerForSingleEntryLog implements EntryLogManager {
+
+private volatile BufferedLogChannel activeLogChannel;
+private Lock lockForActiveLogChannel;
+private final Set rotatedLogChannels;
+
+EntryLogManagerForSingleEntryLog() {
+rotatedLogChannels = ConcurrentHashMap.newKeySet();
+lockForActiveLogChannel = new ReentrantLock();
+}
+
+/*
+ * since entryLogPerLedger is not enabled, it is just one lock for all
+ * ledgers.
+ */
+@Override
+public void acquireLock(Long ledgerId) {
+lockForActiveLogChannel.lock();
+}
+
+@Override
+public void acquireLockByCreatingIfRequired(Long ledgerId) {
+acquireLock(ledgerId);
+}
+
+@Override
+public void releaseLock(Long ledgerId) {
+lockForActiveLogChannel.unlock();
+}
+
+@Override
+public void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel logChannel) {
+acquireLock(ledgerId);
+try {
+BufferedLogChannel hasToRotateLogChannel = activeLogChannel;
+activeLogChannel = logChannel;
+if (hasToRotateLogChannel != null) {
+rotatedLogChannels.add(hasToRotateLogChannel);
+}
+} finally {
+releaseLock(ledgerId);
+}
+}
+
+@Override
+public BufferedLogChannel getCurrentLogForLedger(Long ledgerId) {
+return activeLogChannel;
+}
+
+@Override
+public Set getCopyOfRotatedLogChannels() {
+return new HashSet(rotatedLogChannels);
+}
+
+@Override
+public Set getCopyOfCurrentLogs() {
+HashSet copyOfCurrentLogs = new HashSet();
+copyOfCurrentLogs.add(activeLogChannel);
+return copyOfCurrentLogs;
+}
+
+@Override
+public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
+BufferedLogChannel activeLogChannelTemp = activeLogChannel;
+if ((activeLogChannelTemp != null) && (activeLogChannelTemp.getLogId() == entryLogId)) {
+return activeLogChannelTemp;
+}
+   

[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-09 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180010928
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
 ##
 @@ -100,9 +104,9 @@ public void testCreateNewLog() throws Exception {
 EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
 // Calls createNewLog, and with the number of directories we
 // are using, if it picks one at random it will fail.
-el.createNewLog();
-LOG.info("This is the current log id: " + el.getCurrentLogId());
-assertTrue("Wrong log id", el.getCurrentLogId() > 1);
+el.createNewLog(0L);
+LOG.info("This is the current log id: " + 
el.getPreviousAllocatedEntryLogId());
 
 Review comment:
   In all these places I considered whether preallocation is enabled or not, 
and getPreviousAllocatedEntryLogId is the appropriate one to validate the cases 
here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-09 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180010398
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
 ##
 @@ -54,7 +49,7 @@ protected boolean removeEntryLog(long entryLogId) {
 }
 
 @Override
-public synchronized long addEntry(long ledger, ByteBuffer entry) throws 
IOException {
+public synchronized long addEntry(Long ledgerId, ByteBuffer entry) throws 
IOException {
 
 Review comment:
   In EntryLogManagerForEntryLogPerLedger I maintain a couple of data 
structures where the key of the HashMap is the ledgerId, hence the Long wrapper 
class is needed.
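   
   A minimal sketch of why the boxed type ends up in the signature, assuming a 
per-ledger map like the one described above. LedgerChannelIndex and its methods 
are hypothetical names, not code from the PR. Java type arguments must be 
reference types, so a map keyed by ledger id has to be declared with Long; a 
primitive long argument is simply autoboxed on each lookup.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not the PR's EntryLogManagerForEntryLogPerLedger.
class LedgerChannelIndex {
    // Map<long, String> does not compile: type arguments must be reference types.
    private final Map<Long, String> channelNameByLedger = new HashMap<>();

    void put(Long ledgerId, String channelName) {
        channelNameByLedger.put(ledgerId, channelName);
    }

    // A primitive argument is autoboxed to Long before the lookup.
    String get(long ledgerId) {
        return channelNameByLedger.get(ledgerId);
    }
}
```

   Taking Long in the method signature avoids the repeated boxing when the 
same id is used as a key in several structures, at the cost of a nullable 
parameter.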




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-09 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180007829
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -393,12 +513,37 @@ public BufferedReadChannel getFromChannels(long logId) {
  *
  * @return least unflushed log id.
  */
-synchronized long getLeastUnflushedLogId() {
-return leastUnflushedLogId;
+long getLeastUnflushedLogId() {
+return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId();
 }
 
-synchronized long getCurrentLogId() {
-return logChannel.getLogId();
+long getPreviousAllocatedEntryLogId() {
+return entryLoggerAllocator.getPreallocatedLogId();
+}
+
+boolean rollLogsIfEntryLogLimitReached() throws IOException {
 
 Review comment:
   changed EntryLogManager API as we discussed.




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-09 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180007729
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
 ##
 @@ -100,9 +104,9 @@ public void testCreateNewLog() throws Exception {
 EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
 // Calls createNewLog, and with the number of directories we
 // are using, if it picks one at random it will fail.
-el.createNewLog();
-LOG.info("This is the current log id: " + el.getCurrentLogId());
-assertTrue("Wrong log id", el.getCurrentLogId() > 1);
+el.createNewLog(0L);
+LOG.info("This is the current log id: " + 
el.getPreviousAllocatedEntryLogId());
 
 Review comment:
   done




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-09 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180006729
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -116,21 +121,103 @@ public ConcurrentLongLongHashMap getLedgersMap() {
 return entryLogMetadata.getLedgersMap();
 }
 
+public Long getLedgerId() {
+return ledgerId;
+}
+
+public void setLedgerId(Long ledgerId) {
+this.ledgerId = ledgerId;
+}
+
 @Override
 public String toString() {
 return MoreObjects.toStringHelper(BufferedChannel.class)
 
 Review comment:
   changed




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-09 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180006501
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -116,21 +121,103 @@ public ConcurrentLongLongHashMap getLedgersMap() {
 return entryLogMetadata.getLedgersMap();
 }
 
+public Long getLedgerId() {
+return ledgerId;
+}
+
+public void setLedgerId(Long ledgerId) {
 
 Review comment:
   It's true that ledgerId is assigned in the entrylogperledger case and it 
won't be changed afterwards. But it isn't possible to assign the ledgerId when 
the BufferedLogChannel is created, because of the preallocation optimization. 
So there has to be a setter for ledgerIdAssigned in BufferedLogChannel.
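   
   A rough sketch of that constraint, assuming the channel is created ahead of 
time and only later handed to a ledger. PreallocatedChannel is a hypothetical 
stand-in for BufferedLogChannel, and the assign-once check is my own addition 
for illustration, not something the PR is known to do.

```java
// Hypothetical stand-in for BufferedLogChannel; only the ledger-id handling is shown.
class PreallocatedChannel {
    static final long UNASSIGNED_LEDGERID = -1L;

    private final long logId;
    // Not final: the channel is created ahead of time, before any ledger needs it.
    private long ledgerIdAssigned = UNASSIGNED_LEDGERID;

    PreallocatedChannel(long logId) {
        this.logId = logId;
    }

    long getLogId() {
        return logId;
    }

    long getLedgerIdAssigned() {
        return ledgerIdAssigned;
    }

    // Assumed policy: the id is set once, when the preallocated log is handed to a ledger.
    void setLedgerIdAssigned(long ledgerId) {
        if (ledgerIdAssigned != UNASSIGNED_LEDGERID) {
            throw new IllegalStateException("ledger id already assigned: " + ledgerIdAssigned);
        }
        ledgerIdAssigned = ledgerId;
    }
}
```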




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-09 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180004935
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -83,18 +88,18 @@
  */
 public class EntryLogger {
 private static final Logger LOG = 
LoggerFactory.getLogger(EntryLogger.class);
+private static final Long INVALID_LEDGERID = Long.valueOf(-1);
+// log file suffix
+private static final String LOG_FILE_SUFFIX = ".log";
 
 static class BufferedLogChannel extends BufferedChannel {
 private final long logId;
 private final EntryLogMetadata entryLogMetadata;
 private final File logFile;
+private Long ledgerId = INVALID_LEDGERID;
 
 Review comment:
   changed it to 'ledgerIdAssigned'




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-04-09 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180004899
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -83,18 +88,18 @@
  */
 public class EntryLogger {
 private static final Logger LOG = 
LoggerFactory.getLogger(EntryLogger.class);
+private static final Long INVALID_LEDGERID = Long.valueOf(-1);
 
 Review comment:
   changed it to UNASSIGNED_LEDGERID




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-27 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r177590256
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
 ##
 @@ -209,16 +211,39 @@ public void onSizeLimitReached(final Checkpoint cp) 
throws IOException {
 public void run() {
 try {
 LOG.info("Started flushing mem table.");
-long logIdBeforeFlush = entryLogger.getCurrentLogId();
+long prevAllocLogIdBeforeFlush = 
entryLogger.getPreviousAllocatedEntryLogId();
 
 Review comment:
   "This change is not correct. Because it should be "current" log id, not 
"previous allocated entry log id". That says the behavior is different between 
preallocation enabled and disabled."
   
   @sijie why would you say that behavior would be different between 
preallocation enabled and disabled? Is it because of async nature of 
preallocation?
   
   Ok, I think I can do the following, which would solve 2 problems here.
   
   I can add "getCurrentLogId" to the EntryLogManager interface. For the 
EntryLogManagerForSingleEntryLog implementation it would return the logId of the 
current active log (which is the current behavior). For 
EntryLogManagerForEntryLogPerLedger it would return the same constant value 
every time, which should be fine in the entryLogPerLedger case, because a 
checkpoint is not made when an entrylog is rotated; instead SyncThread drives it 
periodically. 
   
   It would also remove the need for the extra condition on 
"isTransactionalCompactionEnabled" which I added in this change.
   
   So it would be clean and straightforward.
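   
   Roughly what I have in mind, as a sketch only. The type names below are 
hypothetical and trimmed down to the one method under discussion, and the 
constant value is a placeholder I picked for illustration.

```java
// Hypothetical, trimmed-down interface; the real EntryLogManager has more methods.
interface CurrentLogIdSource {
    long getCurrentLogId();
}

// Single entry log: report the id of whichever log is currently active.
class SingleLogIdSource implements CurrentLogIdSource {
    private volatile long activeLogId;

    SingleLogIdSource(long initialLogId) {
        this.activeLogId = initialLogId;
    }

    void rotateTo(long newLogId) {
        this.activeLogId = newLogId;
    }

    @Override
    public long getCurrentLogId() {
        return activeLogId;
    }
}

// Entry log per ledger: always the same value, so a before/after comparison never differs.
class PerLedgerLogIdSource implements CurrentLogIdSource {
    static final long CONSTANT_LOG_ID = -1L; // placeholder value, an assumption

    @Override
    public long getCurrentLogId() {
        return CONSTANT_LOG_ID;
    }
}
```

   With this shape, the "logIdBeforeFlush != logIdAfterFlush" check in 
SortedLedgerStorage can stay as it is: it detects rotation for the single 
entry log and is always false for entryLogPerLedger.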
   




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-26 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r177236857
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -264,6 +274,8 @@ public EntryLogger(ServerConfiguration conf,
 // but the protocol varies so an exact value is difficult to determine
 this.maxSaneEntrySize = conf.getNettyMaxFrameSizeBytes() - 500;
 this.ledgerDirsManager = ledgerDirsManager;
+this.conf = conf;
+entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
 
 Review comment:
   Hmm, I considered moving EntryLogManager, EntryLogManagerForSingleEntryLog 
and EntryLogManagerForEntryLogPerLedger out of EntryLogger, but I realized 
that there is a circular dependency. EntryLogger holds an EntryLogManager 
reference, and for certain functionality EntryLogManager needs a reference to 
EntryLogger so that it can call EntryLogger's methods. So I don't see much 
benefit other than creating new files and new classes (which I'm ok with if it 
is really needed).




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-26 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r177174411
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -513,78 +519,86 @@ void createNewLog() throws IOException {
 EntryLoggerAllocator getEntryLoggerAllocator() {
 return entryLoggerAllocator;
 }
+
 /**
  * Append the ledger map at the end of the entry log.
  * Updates the entry log file header with the offset and size of the map.
  */
-private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws 
IOException {
-long ledgerMapOffset = entryLogChannel.position();
-
-ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
-int numberOfLedgers = (int) ledgersMap.size();
+private void appendLedgersMap(Long ledgerId) throws IOException {
 
 Review comment:
   Will move this appendLedgersMap method to BufferedLogChannel. With that, the 
explicit conversion from ledgerId to logChannel and the synchronization won't 
be required.




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-23 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176893779
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
 ##
 @@ -209,16 +202,12 @@ public void onSizeLimitReached(final Checkpoint cp) 
throws IOException {
 public void run() {
 try {
 LOG.info("Started flushing mem table.");
-long logIdBeforeFlush = entryLogger.getCurrentLogId();
 memTable.flush(SortedLedgerStorage.this);
-long logIdAfterFlush = entryLogger.getCurrentLogId();
 // in any case that an entry log reaches the limit, we 
roll the log and start checkpointing.
 // if a memory table is flushed spanning over two entry 
log files, we also roll log. this is
 // for performance consideration: since we don't wanna 
checkpoint a new log file that ledger
 // storage is writing to.
-if (entryLogger.reachEntryLogLimit(0) || logIdAfterFlush 
!= logIdBeforeFlush) {
-LOG.info("Rolling entry logger since it reached size 
limitation");
-entryLogger.rollLog();
+if (entryLogger.rollLogsIfEntryLogLimitReached()) {
 
 Review comment:
   brought back rollLogs 




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-23 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176881392
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
 ##
 @@ -584,15 +584,6 @@ public void 
testWithDiskFullReadOnlyDisabledOrForceGCAllowDisabled() throws Exce
 } catch (NoWritableLedgerDirException e) {
 // expected
 }
-
 
 Review comment:
   In the existing implementation, when Bookie is initialized it eventually 
calls EntryLogger.initialize() -> EntryLogger.createNewLog(), which fails with 
an exception because of 
https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java#L184
 . With this change we no longer create a new log during initialization of 
Bookie (and EntryLogger); the entry log is created only on the first 
EntryLogger.addEntry call. Hence Bookie initialization succeeds in this case 
even though IsForceGCAllowWhenNoSpace is set to false.
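   
   A minimal sketch of the behavioral change, assuming log creation is 
deferred to the first write. LazyEntryLog is a hypothetical class, and a plain 
IOException stands in for the NoWritableLedgerDirException that the real code 
throws.

```java
import java.io.IOException;

// Hypothetical sketch of deferring log creation from initialization to the first write.
class LazyEntryLog {
    private final boolean hasWritableDir;
    private boolean logCreated;
    private long bytesWritten;

    // Construction no longer touches the disk, so it succeeds even with no writable dir.
    LazyEntryLog(boolean hasWritableDir) {
        this.hasWritableDir = hasWritableDir;
    }

    // Returns the position at which the entry was written.
    long addEntry(byte[] entry) throws IOException {
        if (!logCreated) {
            createNewLog(); // the failure now surfaces here, on the first write
        }
        long position = bytesWritten;
        bytesWritten += entry.length;
        return position;
    }

    private void createNewLog() throws IOException {
        if (!hasWritableDir) {
            throw new IOException("no writable ledger directory");
        }
        logCreated = true;
    }

    boolean isLogCreated() {
        return logCreated;
    }
}
```

   This is why the removed assertion in BookieInitializationTest no longer 
holds: the exception moves from construction time to the first addEntry.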




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-23 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176653861
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
 ##
 @@ -209,16 +202,12 @@ public void onSizeLimitReached(final Checkpoint cp) 
throws IOException {
 public void run() {
 try {
 LOG.info("Started flushing mem table.");
-long logIdBeforeFlush = entryLogger.getCurrentLogId();
 memTable.flush(SortedLedgerStorage.this);
-long logIdAfterFlush = entryLogger.getCurrentLogId();
 // in any case that an entry log reaches the limit, we 
roll the log and start checkpointing.
 // if a memory table is flushed spanning over two entry 
log files, we also roll log. this is
 // for performance consideration: since we don't wanna 
checkpoint a new log file that ledger
 // storage is writing to.
-if (entryLogger.reachEntryLogLimit(0) || logIdAfterFlush 
!= logIdBeforeFlush) {
-LOG.info("Rolling entry logger since it reached size 
limitation");
-entryLogger.rollLog();
+if (entryLogger.rollLogsIfEntryLogLimitReached()) {
 
 Review comment:
   I don't think it is significant enough to have any perf impact. First of 
all, this is called only once EntryMemTable reaches its size limit, and in this 
method I get the collection of active entry logs (in this case just one) and 
check their sizes. For entrylogperledger this collection is required; for a 
single entrylog it might not be, but I want a common implementation, and since 
it is called just once per EntryMemTable size limit it is not something to be 
concerned about.
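   
   To make the cost concrete, here is a sketch of the common implementation 
under my assumptions: one pass over a copy of the active logs, rolling any log 
that has reached the limit. ActiveLogs and Log are hypothetical, and sizes are 
tracked in memory rather than read from real log channels.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; sizes are tracked in memory instead of on real log channels.
class ActiveLogs {
    static class Log {
        final long logId;
        long size;

        Log(long logId, long size) {
            this.logId = logId;
            this.size = size;
        }
    }

    private final long sizeLimit;
    private final List<Log> activeLogs = new ArrayList<>();
    private long nextLogId;

    ActiveLogs(long sizeLimit) {
        this.sizeLimit = sizeLimit;
    }

    Log createLog(long initialSize) {
        Log log = new Log(nextLogId++, initialSize);
        activeLogs.add(log);
        return log;
    }

    // One pass over a copy of the active logs; returns true if any log was rolled.
    boolean rollLogsIfEntryLogLimitReached() {
        boolean rolled = false;
        for (Log log : new ArrayList<>(activeLogs)) {
            if (log.size >= sizeLimit) {
                activeLogs.remove(log);
                createLog(0); // replacement log starts empty
                rolled = true;
            }
        }
        return rolled;
    }

    int activeCount() {
        return activeLogs.size();
    }
}
```

   With a single entry log the loop body runs once, so the extra work per 
EntryMemTable flush is one small copy and one comparison.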




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-23 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176652302
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
 ##
 @@ -167,14 +167,7 @@ public ByteBuf getEntry(long ledgerId, long entryId) 
throws IOException {
 
 @Override
 public void checkpoint(final Checkpoint checkpoint) throws IOException {
-long numBytesFlushed = memTable.flush(this, checkpoint);
-if (numBytesFlushed > 0) {
-// if bytes are added between previous flush and this checkpoint,
-// it means bytes might live at current active entry log, we need
-// roll current entry log and then issue checkpoint to underlying
-// interleaved ledger storage.
-entryLogger.rollLog();
-}
+memTable.flush(this, checkpoint);
 
 Review comment:
   First of all, I'm not sure why this was added with this commit 
https://github.com/apache/bookkeeper/commit/81cbba3cf620f2a6df48c0da6ee1ac019de24fbc.
 SortedLedgerStorage.checkpoint should be called from onSizeLimitReached's 
scheduler. In onSizeLimitReached's scheduler, the memtable is flushed 
completely, and if the entry log has reached its capacity it rolls the log. So 
by the time startCheckpoint is called for the checkpoint cp, all the entries 
must have been flushed from the memtable to the entry log and the entry log 
must have been rolled. In this checkpoint method, then, the entries that come 
before checkpoint cp are already added to the entry log and rolled over, so I 
don't see the need for this extra rollLog call.
   
   Mainly, I removed the rollLog method and introduced 
rollLogsIfEntryLogLimitReached instead, which is not appropriate here.




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-23 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176651575
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -292,12 +308,28 @@ public EntryLogger(ServerConfiguration conf,
 logId = lastLogId;
 }
 }
-this.leastUnflushedLogId = logId + 1;
+this.recentlyCreatedEntryLogsStatus = new RecentEntryLogsStatus(logId 
+ 1);
 this.entryLoggerAllocator = new EntryLoggerAllocator(logId);
-this.conf = conf;
-this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
-
-initialize();
+if (entryLogPerLedgerEnabled) {
+this.entryLogManager = new EntryLogManagerForSingleEntryLog() {
+@Override
+public void checkpoint() throws IOException {
+/*
+ * In the case of entryLogPerLedgerEnabled we need to flush
+ * both rotatedlogs and currentlogs. This is needed because
+ * syncThread periodically does checkpoint and at this time
+ * all the logs should be flushed.
+ *
+ */
+if (entryLogPerLedgerEnabled) {
 
 Review comment:
   nit: it should simply be the following (will fix it in the next iteration):
   ```
   public void checkpoint() throws IOException {
   flushCurrentLogs();
   super.checkpoint();
   }
   ```




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-22 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176638435
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -802,88 +864,207 @@ private long readLastLogId(File f) {
 }
 }
 
+interface EntryLogManager {
+/*
+ * acquire lock for this ledger.
+ */
+void acquireLock(Long ledgerId);
+
+/*
+ * acquire lock for this ledger if it is not already available for this
+ * ledger then it will create a new one and then acquire lock.
+ */
+void acquireLockByCreatingIfRequired(Long ledgerId);
+
+/*
+ * release lock for this ledger
+ */
+void releaseLock(Long ledgerId);
+
+/*
+ * sets the logChannel for the given ledgerId. The previous one will be
+ * removed from replicaOfCurrentLogChannels. Previous logChannel will 
be
+ * added to rotatedLogChannels.
+ */
+void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel 
logChannel);
 
 Review comment:
   With my changes in this sub-task and subsequent sub-tasks I would like to 
abstract out the code/logic pertaining to singleentrylog/entrylogperledger and 
move it to the implementations of EntryLogManager, but leave the rest of the 
code as it is. With that reasoning, these methods in EntryLogManager and 
variables need to be exposed and EntryLogger class has to work with these 
methods.




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-22 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176549929
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -1441,4 +1658,40 @@ static long fileName2LogId(String fileName) {
 static String logId2HexString(long logId) {
 return Long.toHexString(logId);
 }
-}
+
+/**
+ * Datastructure which maintains the status of logchannels. When a
+ * logChannel is created entry of < entryLogId, false > will be made to 
this
+ * sortedmap and when logChannel is rotated and flushed then the entry is
+ * updated to < entryLogId, true > and all the lowest entries with
+ * < entryLogId, true > status will be removed from the sortedmap. So that 
way
+ * we could get least unflushed LogId.
+ *
+ */
+static class RecentEntryLogsStatus {
+private SortedMap entryLogsStatusMap;
 
 Review comment:
   will do
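   
   For reference, a sketch of how the structure described in the Javadoc quoted 
above could behave. The class and method names here are assumptions for 
illustration, not the PR's code; only the sorted-map idea and the 
least-unflushed-id bookkeeping come from the Javadoc.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Sketch of the structure described in the Javadoc; method names are assumptions.
class RecentLogsStatusSketch {
    private final SortedMap<Long, Boolean> entryLogsStatusMap = new TreeMap<>();
    private long leastUnflushedLogId;

    RecentLogsStatusSketch(long leastUnflushedLogId) {
        this.leastUnflushedLogId = leastUnflushedLogId;
    }

    // A newly created log starts as < entryLogId, false >.
    synchronized void createdEntryLog(long entryLogId) {
        entryLogsStatusMap.put(entryLogId, false);
    }

    // A rotated and flushed log becomes < entryLogId, true >.
    synchronized void flushRotatedEntryLog(long entryLogId) {
        entryLogsStatusMap.replace(entryLogId, true);
        // Drop the flushed prefix; stop at the first log that is still unflushed.
        while (!entryLogsStatusMap.isEmpty()
                && entryLogsStatusMap.get(entryLogsStatusMap.firstKey())) {
            long flushedId = entryLogsStatusMap.firstKey();
            entryLogsStatusMap.remove(flushedId);
            leastUnflushedLogId = flushedId + 1;
        }
    }

    synchronized long getLeastUnflushedLogId() {
        return leastUnflushedLogId;
    }
}
```

   The point of keeping the map sorted is that logs may be flushed out of 
order: flushing log 2 before log 1 must not advance the least unflushed id 
past 1.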




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-22 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176549632
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -802,88 +864,207 @@ private long readLastLogId(File f) {
 }
 }
 
+interface EntryLogManager {
+/*
+ * acquire lock for this ledger.
+ */
+void acquireLock(Long ledgerId);
 
 Review comment:
   Currently 'synchronized' is used in all these places, but for 
entrylogperledger we should have a lock per ledger/entrylog. Hence these lock 
methods in EntryLogManager. The contract of this interface is that only the 
addEntry call uses "acquireLockByCreatingIfRequired" (this call creates the 
lock if it does not exist yet, and we need that in addEntry since addEntry is 
the first call related to that ledger in the write path); all other methods in 
EntryLogger call "acquireLock" when they are dealing with or writing to that 
particular ledger/entrylog. I don't see why it needs to be hidden. EntryLogger 
needs to synchronize; instead of synchronizing on its intrinsic lock it 
delegates to EntryLogManager, which deals with the inner details. All 
EntryLogger needs is a call to acquireLock and releaseLock with the ledgerId.
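   
   A sketch of that contract for the entrylogperledger case, assuming a 
concurrent map of per-ledger locks. PerLedgerLocks is a hypothetical class, and 
throwing when acquireLock finds no lock is my assumption about how a contract 
violation could be surfaced.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical per-ledger lock table; not the PR's implementation.
class PerLedgerLocks {
    private final ConcurrentMap<Long, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Write path (addEntry): the first call for a ledger creates its lock.
    void acquireLockByCreatingIfRequired(Long ledgerId) {
        locks.computeIfAbsent(ledgerId, id -> new ReentrantLock()).lock();
    }

    // Every other path expects the lock to exist already (assumed policy: fail loudly).
    void acquireLock(Long ledgerId) {
        ReentrantLock lock = locks.get(ledgerId);
        if (lock == null) {
            throw new IllegalStateException("no lock for ledger " + ledgerId);
        }
        lock.lock();
    }

    void releaseLock(Long ledgerId) {
        locks.get(ledgerId).unlock();
    }

    boolean isHeldByCurrentThread(Long ledgerId) {
        ReentrantLock lock = locks.get(ledgerId);
        return lock != null && lock.isHeldByCurrentThread();
    }
}
```

   Callers would pair the acquire and release calls in try/finally, as 
setCurrentLogForLedger does in the single entry log implementation.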




[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.

2018-03-22 Thread GitBox
reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176460672
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -116,6 +121,18 @@ public ConcurrentLongLongHashMap getLedgersMap() {
 return entryLogMetadata.getLedgersMap();
 }
 
+public boolean isLedgerDirFull() {
 
 Review comment:
   ok will change that.

