[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r181626498 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +810,392 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * flush both current and rotated logs. + */ +void flush() throws IOException; + +/* + * close current logs. + */ +void close() throws IOException; + +/* + * force close current logs. + */ +void forceClose(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. 
+ */ +void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException; + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + +/* + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. + * + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final List rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = new CopyOnWriteArrayList(); Review comment: Regarding rotated logs, I don't see the need for separate implementations in EntryLogManagerForSingleEntryLog and EntryLogManagerForEntryLogPerLedger, which is why I kept it in EntryLogManagerBase. I don't have a strong opinion on whether this collection should be a Set or a List, so I changed it back to List as we discussed (to minimize the change in behavior). But for the sake of correctness and simplicity it has to be a concurrent list.
For example: in this line 'logChannelsToFlush' is passed to the log statement, which in turn calls the toString method of List, and toString of List returns "the string representation consists of a list of the collection's elements in the order they are returned by its iterator". So if an entry log is added to rotatedLogChannels while the iterator is iterating, the LinkedList iterator will fail because "if the list is structurally modified at any time after the iterator is created, in any way except through the Iterator's own remove or add methods, the iterator will throw a ConcurrentModificationException." https://g
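The iterator behavior described in this comment can be reproduced with a minimal sketch. It is a single-threaded simulation (the add happens between two next() calls instead of on another thread), and the class name, method name and "log-N" strings are made up for illustration; it is not code from the pull request.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class RotatedLogListDemo {

    // Starts iterating the list, then adds an element before the next call to
    // next(), which is what a concurrent writer could do while toString()
    // walks the list. Returns true if the iterator failed with
    // ConcurrentModificationException.
    static boolean failsWhenModifiedDuringIteration(List<String> channels) {
        channels.add("log-1");
        channels.add("log-2");
        try {
            Iterator<String> it = channels.iterator();
            it.next();
            channels.add("log-3"); // structural modification after the iterator was created
            it.next();             // a fail-fast iterator throws here
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("LinkedList fails: "
                + failsWhenModifiedDuringIteration(new LinkedList<>()));
        System.out.println("CopyOnWriteArrayList fails: "
                + failsWhenModifiedDuringIteration(new CopyOnWriteArrayList<>()));
    }
}
```

The CopyOnWriteArrayList iterator works on a snapshot of the backing array taken when the iterator was created, so the later add does not disturb it.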
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r181623201 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +810,392 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * flush both current and rotated logs. + */ +void flush() throws IOException; + +/* + * close current logs. + */ +void close() throws IOException; + +/* + * force close current logs. + */ +void forceClose(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. 
+ */ +void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException; + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + +/* + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. + * + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final List rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = new CopyOnWriteArrayList(); } -if (null == channels) { -return; + +private final FastThreadLocal sizeBufferForAdd = new FastThreadLocal() { +@Override +protected ByteBuf initialValue() throws Exception { +return Unpooled.buffer(4); +} +}; + +/* + * This method should be guarded by a lock, so callers of this method + * should be in the right scope of the lock. 
+ */ +@Override +public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException { +int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size +BufferedLogChannel logChannel = getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog); +ByteBuf sizeBuffer = sizeBufferForAdd.get(); +sizeBuffer.clear(); +sizeBuffer.writeInt(entry.readableBytes()); +logChannel.write(sizeBuffer); + +long pos = logChannel.position(); +logChannel.write(entry); +logChannel.registe
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180947780 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -950,19 +939,21 @@ public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOExcep /** * if disk of the logChannel is full or if the entrylog limit is - * reached of if the logchannel is not initialized, then + * reached or if the logchannel is not initialized, then * createNewLog. If allDisks are full then proceed with the current * logChannel, since Bookie must have turned to readonly mode and * the addEntry traffic would be from GC and it is ok to proceed in * this case. */ if ((diskFull && (!allDisksFull)) || reachEntryLogLimit || (logChannel == null)) { -flushCurrentLog(logChannel, false); +if (logChannel != null) { +logChannel.flushAndForceWriteIfRegularFlush(false); Review comment: This is added because the commit https://github.com/apache/bookkeeper/commit/30261eae3fd8ab25239d57cfb86a200d5f7b6b7d#diff-66d58c001864b142c348f9191407c410L885 deleted it, and that deletion was a mistake. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
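A minimal sketch of the guard restored in this hunk: the outgoing channel is flushed before a new log is created, and the flush is skipped when no channel has been initialized yet. FakeLogChannel is a hypothetical stand-in for BufferedLogChannel that only counts calls; the method name flushAndForceWriteIfRegularFlush is taken from the diff, everything else is an assumption for illustration.

```java
public class RollGuardDemo {

    // Hypothetical stand-in for BufferedLogChannel; it only counts flush calls.
    static class FakeLogChannel {
        int flushes = 0;

        void flushAndForceWriteIfRegularFlush(boolean forceMetadata) {
            flushes++;
        }
    }

    // Flushes the current channel before rolling to a new log.
    // Returns true if a flush happened, false if there was no channel to flush.
    static boolean flushBeforeRoll(FakeLogChannel logChannel) {
        if (logChannel != null) {
            logChannel.flushAndForceWriteIfRegularFlush(false);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        FakeLogChannel channel = new FakeLogChannel();
        System.out.println("flushed existing channel: " + flushBeforeRoll(channel));
        System.out.println("flushed missing channel: " + flushBeforeRoll(null));
    }
}
```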
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180942869 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; + +/* + * force close current logs. + */ +void forceCloseCurrentLogs(); + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. 
This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final Set rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = ConcurrentHashMap.newKeySet(); } -if (null == channels) { -return; + +/* + * This method should be guarded by a lock, so callers of this method + * should be in the right scope of the lock. + */ +@Override +public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException { + +int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size +BufferedLogChannel logChannel = getCurrentLogForLedger(ledger); +boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize) +: readEntryLogHardLimit(logChannel, entrySize); +// Create new log if logSizeLimit reached or current disk is full +boolean diskFull = (logChannel == null) ? false +: ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile()); +boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs(); + +
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180941785 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ## @@ -173,7 +175,7 @@ public void checkpoint(final Checkpoint checkpoint) throws IOException { // it means bytes might live at current active entry log, we need // roll current entry log and then issue checkpoint to underlying // interleaved ledger storage. -entryLogger.rollLog(); +entryLogger.rollLogs(); Review comment: OK. It is a minor change anyway and I don't have a strong opinion on it, so I'll change the rollLogs method in the interface to prepareSortedLedgerStorageCheckpoint.
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180941835 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; Review comment: OK. It is a minor change anyway and I don't have a strong opinion on it, so I'll change the rollLogs method in the interface to prepareSortedLedgerStorageCheckpoint.
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180935161 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; + +/* + * force close current logs. + */ +void forceCloseCurrentLogs(); + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. 
This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final Set rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = ConcurrentHashMap.newKeySet(); } -if (null == channels) { -return; + +/* + * This method should be guarded by a lock, so callers of this method + * should be in the right scope of the lock. + */ +@Override +public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException { + +int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size +BufferedLogChannel logChannel = getCurrentLogForLedger(ledger); +boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize) +: readEntryLogHardLimit(logChannel, entrySize); +// Create new log if logSizeLimit reached or current disk is full +boolean diskFull = (logChannel == null) ? false Review comment: Hey @sijie, there is not much of a change here. The changes are minor and they make the code more organized and appropriate. 1) I removed the nee
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180918911 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -499,176 +576,163 @@ void createNewLog() throws IOException { EntryLoggerAllocator getEntryLoggerAllocator() { return entryLoggerAllocator; } -/** - * Append the ledger map at the end of the entry log. - * Updates the entry log file header with the offset and size of the map. - */ -private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException { -long ledgerMapOffset = entryLogChannel.position(); - -ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap(); -int numberOfLedgers = (int) ledgersMap.size(); - -// Write the ledgers map into several batches - -final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE; -final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize); - -try { -ledgersMap.forEach(new BiConsumerLong() { -int remainingLedgers = numberOfLedgers; -boolean startNewBatch = true; -int remainingInBatch = 0; - -@Override -public void accept(long ledgerId, long size) { -if (startNewBatch) { -int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE); -int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize; - -serializedMap.clear(); -serializedMap.writeInt(ledgerMapSize - 4); -serializedMap.writeLong(INVALID_LID); -serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID); -serializedMap.writeInt(batchSize); - -startNewBatch = false; -remainingInBatch = batchSize; -} -// Dump the ledger in the current batch -serializedMap.writeLong(ledgerId); -serializedMap.writeLong(size); ---remainingLedgers; - -if (--remainingInBatch == 0) { -// Close current batch -try { -entryLogChannel.write(serializedMap); -} catch (IOException e) { -throw new RuntimeException(e); -} - 
-startNewBatch = true; -} -} -}); -} catch (RuntimeException e) { -if (e.getCause() instanceof IOException) { -throw (IOException) e.getCause(); -} else { -throw e; -} -} finally { -serializedMap.release(); -} -// Flush the ledger's map out before we write the header. -// Otherwise the header might point to something that is not fully written -entryLogChannel.flush(); - -// Update the headers with the map offset and count of ledgers -ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4); -mapInfo.putLong(ledgerMapOffset); -mapInfo.putInt(numberOfLedgers); -mapInfo.flip(); -entryLogChannel.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION); -} /** * An allocator pre-allocates entry log files. */ class EntryLoggerAllocator { private long preallocatedLogId; -private Future preallocation = null; +Future preallocation = null; private ExecutorService allocatorExecutor; -private final Object createEntryLogLock = new Object(); -private final Object createCompactionLogLock = new Object(); EntryLoggerAllocator(long logId) { preallocatedLogId = logId; allocatorExecutor = Executors.newSingleThreadExecutor(); } -BufferedLogChannel createNewLog() throws IOException { -synchronized (createEntryLogLock) { -BufferedLogChannel bc; -if (!entryLogPreAllocationEnabled){ -// create a new log directly -bc = allocateNewLog(); -return bc; -} else { -// allocate directly to response request -if (null == preallocation){ -bc = allocateNewLog(); +synchronized long getPreallocatedLogId(){ +return preallocatedLogId; +} + +synchronized BufferedLogChannel createNewLog() throws IOException { +BufferedLogChannel bc; +if (!entryLogPreAllocationEnabled || null == pr
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180905195 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -499,176 +576,163 @@ void createNewLog() throws IOException { EntryLoggerAllocator getEntryLoggerAllocator() { return entryLoggerAllocator; } -/** - * Append the ledger map at the end of the entry log. - * Updates the entry log file header with the offset and size of the map. - */ -private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException { -long ledgerMapOffset = entryLogChannel.position(); - -ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap(); -int numberOfLedgers = (int) ledgersMap.size(); - -// Write the ledgers map into several batches - -final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE; -final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize); - -try { -ledgersMap.forEach(new BiConsumerLong() { -int remainingLedgers = numberOfLedgers; -boolean startNewBatch = true; -int remainingInBatch = 0; - -@Override -public void accept(long ledgerId, long size) { -if (startNewBatch) { -int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE); -int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize; - -serializedMap.clear(); -serializedMap.writeInt(ledgerMapSize - 4); -serializedMap.writeLong(INVALID_LID); -serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID); -serializedMap.writeInt(batchSize); - -startNewBatch = false; -remainingInBatch = batchSize; -} -// Dump the ledger in the current batch -serializedMap.writeLong(ledgerId); -serializedMap.writeLong(size); ---remainingLedgers; - -if (--remainingInBatch == 0) { -// Close current batch -try { -entryLogChannel.write(serializedMap); -} catch (IOException e) { -throw new RuntimeException(e); -} - 
-startNewBatch = true; -} -} -}); -} catch (RuntimeException e) { -if (e.getCause() instanceof IOException) { -throw (IOException) e.getCause(); -} else { -throw e; -} -} finally { -serializedMap.release(); -} -// Flush the ledger's map out before we write the header. -// Otherwise the header might point to something that is not fully written -entryLogChannel.flush(); - -// Update the headers with the map offset and count of ledgers -ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4); -mapInfo.putLong(ledgerMapOffset); -mapInfo.putInt(numberOfLedgers); -mapInfo.flip(); -entryLogChannel.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION); -} /** * An allocator pre-allocates entry log files. */ class EntryLoggerAllocator { private long preallocatedLogId; -private Future preallocation = null; +Future preallocation = null; private ExecutorService allocatorExecutor; -private final Object createEntryLogLock = new Object(); -private final Object createCompactionLogLock = new Object(); EntryLoggerAllocator(long logId) { preallocatedLogId = logId; allocatorExecutor = Executors.newSingleThreadExecutor(); } -BufferedLogChannel createNewLog() throws IOException { -synchronized (createEntryLogLock) { -BufferedLogChannel bc; -if (!entryLogPreAllocationEnabled){ -// create a new log directly -bc = allocateNewLog(); -return bc; -} else { -// allocate directly to response request -if (null == preallocation){ -bc = allocateNewLog(); +synchronized long getPreallocatedLogId(){ +return preallocatedLogId; +} + +synchronized BufferedLogChannel createNewLog() throws IOException { +BufferedLogChannel bc; +if (!entryLogPreAllocationEnabled || null == pr
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180898594 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; Review comment: moved flushCurrentLogs and flushRotatedLogs from EntryLogManager interface to EntryLogManagerBase and added flush method in interface. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
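The refactoring described in this comment (a single flush method on the interface, with the two finer-grained flush methods kept in the base class) can be sketched roughly as follows. This is a minimal illustration, not the code from the pull request: the class names LogManager, LogManagerBase and SingleLogManager, the calls list, and the order in which the two flushes run are all assumptions made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class FlushRefactorSketch {

    // Hypothetical, trimmed-down interface: callers only see flush().
    interface LogManager {
        void flush() throws IOException;
    }

    // Hypothetical base class: the two finer-grained flush methods live here
    // and are no longer part of the interface.
    abstract static class LogManagerBase implements LogManager {
        // Records which steps ran, purely so the sketch can be observed.
        final List<String> calls = new ArrayList<>();

        abstract void flushCurrentLogs() throws IOException;

        void flushRotatedLogs() throws IOException {
            calls.add("rotated");
        }

        // Assumption: current logs are flushed first, then rotated logs.
        @Override
        public void flush() throws IOException {
            flushCurrentLogs();
            flushRotatedLogs();
        }
    }

    // Hypothetical concrete manager that only supplies flushCurrentLogs().
    static class SingleLogManager extends LogManagerBase {
        @Override
        void flushCurrentLogs() {
            calls.add("current");
        }
    }

    // Runs one flush and returns the recorded steps.
    public static List<String> demo() {
        SingleLogManager manager = new SingleLogManager();
        try {
            manager.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return manager.calls;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With this shape, each implementation decides only how its current logs are flushed, while the handling of rotated logs stays shared.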
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180894219 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; + +/* + * force close current logs. + */ +void forceCloseCurrentLogs(); + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. 
This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final Set rotatedLogChannels; Review comment: 1) In both cases (single entry log and entry log per ledger), rotatedLogChannels is handled the same way, so it makes sense to keep this variable and its related methods (such as flushRotatedLogs) in the base class. 2) I don't see it as much of an issue. In any case, I'll change it to the List interface, but it would be incorrect to use a non-synchronized List for rotatedLogChannels. I'll use CopyOnWriteArrayList, so that the list can be modified while its iterator is in use.
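The point about CopyOnWriteArrayList in the comment above can be illustrated with a small, self-contained sketch. It is not taken from the pull request: the class name, the string stand-ins for log channels, and the single-threaded "add during iteration" scenario are assumptions chosen to show the behaviour in isolation.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class RotatedLogListSketch {

    // Fills a list with two placeholder "rotated log" names.
    public static List<String> seed(List<String> list) {
        list.add("log-1");
        list.add("log-2");
        return list;
    }

    // Walks the list and appends one element mid-iteration, the way a log
    // rotation could land while a flush is walking the rotated channels.
    // Returns true if the iteration finished without an exception.
    public static boolean iterateWhileAdding(List<String> channels) {
        try {
            for (String channel : channels) {
                if (channel.equals("log-1")) {
                    channels.add("log-3");
                }
            }
            return true;
        } catch (ConcurrentModificationException e) {
            // ArrayList's fail-fast iterator detects the structural change.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("ArrayList completed: "
                + iterateWhileAdding(seed(new ArrayList<>())));
        System.out.println("CopyOnWriteArrayList completed: "
                + iterateWhileAdding(seed(new CopyOnWriteArrayList<>())));
    }
}
```

The CopyOnWriteArrayList iterator works on a snapshot of the backing array, so it does not see the element added during the loop; the trade-off is that every write copies the array, which is usually acceptable for a list that is small and written rarely.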
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180670849 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -499,176 +576,163 @@ void createNewLog() throws IOException { EntryLoggerAllocator getEntryLoggerAllocator() { return entryLoggerAllocator; } -/** - * Append the ledger map at the end of the entry log. - * Updates the entry log file header with the offset and size of the map. - */ -private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException { -long ledgerMapOffset = entryLogChannel.position(); - -ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap(); -int numberOfLedgers = (int) ledgersMap.size(); - -// Write the ledgers map into several batches - -final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE; -final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize); - -try { -ledgersMap.forEach(new BiConsumerLong() { -int remainingLedgers = numberOfLedgers; -boolean startNewBatch = true; -int remainingInBatch = 0; - -@Override -public void accept(long ledgerId, long size) { -if (startNewBatch) { -int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE); -int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize; - -serializedMap.clear(); -serializedMap.writeInt(ledgerMapSize - 4); -serializedMap.writeLong(INVALID_LID); -serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID); -serializedMap.writeInt(batchSize); - -startNewBatch = false; -remainingInBatch = batchSize; -} -// Dump the ledger in the current batch -serializedMap.writeLong(ledgerId); -serializedMap.writeLong(size); ---remainingLedgers; - -if (--remainingInBatch == 0) { -// Close current batch -try { -entryLogChannel.write(serializedMap); -} catch (IOException e) { -throw new RuntimeException(e); -} - 
-startNewBatch = true; -} -} -}); -} catch (RuntimeException e) { -if (e.getCause() instanceof IOException) { -throw (IOException) e.getCause(); -} else { -throw e; -} -} finally { -serializedMap.release(); -} -// Flush the ledger's map out before we write the header. -// Otherwise the header might point to something that is not fully written -entryLogChannel.flush(); - -// Update the headers with the map offset and count of ledgers -ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4); -mapInfo.putLong(ledgerMapOffset); -mapInfo.putInt(numberOfLedgers); -mapInfo.flip(); -entryLogChannel.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION); -} /** * An allocator pre-allocates entry log files. */ class EntryLoggerAllocator { private long preallocatedLogId; -private Future preallocation = null; +Future preallocation = null; private ExecutorService allocatorExecutor; -private final Object createEntryLogLock = new Object(); -private final Object createCompactionLogLock = new Object(); EntryLoggerAllocator(long logId) { preallocatedLogId = logId; allocatorExecutor = Executors.newSingleThreadExecutor(); } -BufferedLogChannel createNewLog() throws IOException { -synchronized (createEntryLogLock) { -BufferedLogChannel bc; -if (!entryLogPreAllocationEnabled){ -// create a new log directly -bc = allocateNewLog(); -return bc; -} else { -// allocate directly to response request -if (null == preallocation){ -bc = allocateNewLog(); +synchronized long getPreallocatedLogId(){ +return preallocatedLogId; +} + +synchronized BufferedLogChannel createNewLog() throws IOException { +BufferedLogChannel bc; +if (!entryLogPreAllocationEnabled || null == pr
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180669426 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; Review comment: rollLogs is not used in the memtable flush logic, but it is used in the SortedLedgerStorage.checkpoint logic. You made another comment related to rollLogs, so I'm copying the same response here. I'm flexible about making the change, but: 1) To begin with, from what I understand, this check (numBytesFlushed > 0) and the rollLogs call are not needed, since entries that arrive before the checkpoint are already flushed from the memtable (because of the way SortedLedgerStorage.checkpoint is called). 2) I'm not convinced about replacing the rollLogs method in the interface with prepareCheckpoint, because it has meaning and use only for SortedLedgerStorage with the single entry log manager, and such a generic method name as "prepareCheckpoint" seems to be a stretch. 
3) In the method signature "void prepareCheckpoint(Checkpoint checkpoint, long numBytesFlushedBetweenCheckpoint)", the 'checkpoint' parameter is not needed and can be removed. The parameter name 'numBytesFlushedBetweenCheckpoint' is also not appropriate, since it is just the bytes added between the previous flush and this checkpoint.
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180649257 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -499,176 +576,163 @@ void createNewLog() throws IOException { EntryLoggerAllocator getEntryLoggerAllocator() { return entryLoggerAllocator; } -/** - * Append the ledger map at the end of the entry log. - * Updates the entry log file header with the offset and size of the map. - */ -private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException { -long ledgerMapOffset = entryLogChannel.position(); - -ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap(); -int numberOfLedgers = (int) ledgersMap.size(); - -// Write the ledgers map into several batches - -final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE; -final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize); - -try { -ledgersMap.forEach(new BiConsumerLong() { -int remainingLedgers = numberOfLedgers; -boolean startNewBatch = true; -int remainingInBatch = 0; - -@Override -public void accept(long ledgerId, long size) { -if (startNewBatch) { -int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE); -int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize; - -serializedMap.clear(); -serializedMap.writeInt(ledgerMapSize - 4); -serializedMap.writeLong(INVALID_LID); -serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID); -serializedMap.writeInt(batchSize); - -startNewBatch = false; -remainingInBatch = batchSize; -} -// Dump the ledger in the current batch -serializedMap.writeLong(ledgerId); -serializedMap.writeLong(size); ---remainingLedgers; - -if (--remainingInBatch == 0) { -// Close current batch -try { -entryLogChannel.write(serializedMap); -} catch (IOException e) { -throw new RuntimeException(e); -} - 
-startNewBatch = true; -} -} -}); -} catch (RuntimeException e) { -if (e.getCause() instanceof IOException) { -throw (IOException) e.getCause(); -} else { -throw e; -} -} finally { -serializedMap.release(); -} -// Flush the ledger's map out before we write the header. -// Otherwise the header might point to something that is not fully written -entryLogChannel.flush(); - -// Update the headers with the map offset and count of ledgers -ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4); -mapInfo.putLong(ledgerMapOffset); -mapInfo.putInt(numberOfLedgers); -mapInfo.flip(); -entryLogChannel.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION); -} /** * An allocator pre-allocates entry log files. */ class EntryLoggerAllocator { private long preallocatedLogId; -private Future preallocation = null; +Future preallocation = null; private ExecutorService allocatorExecutor; -private final Object createEntryLogLock = new Object(); -private final Object createCompactionLogLock = new Object(); EntryLoggerAllocator(long logId) { preallocatedLogId = logId; allocatorExecutor = Executors.newSingleThreadExecutor(); } -BufferedLogChannel createNewLog() throws IOException { -synchronized (createEntryLogLock) { -BufferedLogChannel bc; -if (!entryLogPreAllocationEnabled){ -// create a new log directly -bc = allocateNewLog(); -return bc; -} else { -// allocate directly to response request -if (null == preallocation){ -bc = allocateNewLog(); +synchronized long getPreallocatedLogId(){ +return preallocatedLogId; +} + +synchronized BufferedLogChannel createNewLog() throws IOException { +BufferedLogChannel bc; +if (!entryLogPreAllocationEnabled || null == pr
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180658981 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -880,39 +1195,6 @@ protected ByteBuf initialValue() throws Exception { } }; -public synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException { -if (null == logChannel) { -// log channel can be null because the file is deferred to be created when no writable ledger directory -// is available. -createNewLog(); -} - -int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size -boolean reachEntryLogLimit = -rollLog ? reachEntryLogLimit(entrySize) : readEntryLogHardLimit(entrySize); -// Create new log if logSizeLimit reached or current disk is full -boolean createNewLog = shouldCreateNewEntryLog.get(); Review comment: I removed the need for a LedgerDirsListener in EntryLogger. Instead, in EntryLogManagerBase.addEntry, I check whether the directory is full: boolean diskFull = (logChannel == null) ? false : ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile()); diskFull is then used to determine whether a new log needs to be created.
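The disk-full check described in this comment can be sketched in isolation as follows. This is a hedged illustration only: DirFullChecker is a hypothetical stand-in for the single ledgerDirsManager.isDirFull call quoted above, the class and method names are invented for the example, and combining the two conditions with a plain "or" is an assumption based on the diff comment "Create new log if logSizeLimit reached or current disk is full".

```java
import java.io.File;
import java.util.Collections;
import java.util.Set;

public class DiskFullCheckSketch {

    // Hypothetical stand-in for the directory check quoted in the comment.
    public interface DirFullChecker {
        boolean isDirFull(File dir);
    }

    // Mirrors the quoted expression: with no current log there is no
    // directory to inspect, so the disk is treated as not full.
    public static boolean isDiskFull(File currentLogFile, DirFullChecker checker) {
        return currentLogFile != null && checker.isDirFull(currentLogFile.getParentFile());
    }

    // Assumption: a new log is needed when the size limit is reached or the
    // directory holding the current log is full.
    public static boolean needsNewLog(File currentLogFile, boolean reachedLogLimit,
                                      DirFullChecker checker) {
        return reachedLogLimit || isDiskFull(currentLogFile, checker);
    }

    public static void main(String[] args) {
        // Placeholder directory and file names, used only for the demo.
        File fullDir = new File("ledgers1");
        Set<File> fullDirs = Collections.singleton(fullDir);
        DirFullChecker checker = fullDirs::contains;

        File logOnFullDir = new File(fullDir, "0.log");
        File logOnFreeDir = new File(new File("ledgers2"), "1.log");

        System.out.println(needsNewLog(logOnFullDir, false, checker));
        System.out.println(needsNewLog(logOnFreeDir, false, checker));
        System.out.println(needsNewLog(null, false, checker));
    }
}
```

Querying the directory state at the time of each add, rather than caching a flag set by a listener, keeps the decision local to the log being written, at the cost of one extra check per entry.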
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180642426 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; + +/* + * force close current logs. + */ +void forceCloseCurrentLogs(); + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. 
This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final Set rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = ConcurrentHashMap.newKeySet(); } -if (null == channels) { -return; + +/* + * This method should be guarded by a lock, so callers of this method + * should be in the right scope of the lock. + */ +@Override +public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException { + +int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size +BufferedLogChannel logChannel = getCurrentLogForLedger(ledger); +boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize) +: readEntryLogHardLimit(logChannel, entrySize); +// Create new log if logSizeLimit reached or current disk is full +boolean diskFull = (logChannel == null) ? false +: ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile()); +boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs(); + +
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180642016 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -92,13 +99,10 @@ private final long logId; private final EntryLogMetadata entryLogMetadata; private final File logFile; +private Long ledgerIdAssigned = UNASSIGNED_LEDGERID; Review comment: changed it
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180593414 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; + +/* + * force close current logs. + */ +void forceCloseCurrentLogs(); + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. 
This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final Set rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = ConcurrentHashMap.newKeySet(); } -if (null == channels) { -return; + +/* + * This method should be guarded by a lock, so callers of this method + * should be in the right scope of the lock. + */ +@Override +public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException { + +int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size +BufferedLogChannel logChannel = getCurrentLogForLedger(ledger); +boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize) +: readEntryLogHardLimit(logChannel, entrySize); +// Create new log if logSizeLimit reached or current disk is full +boolean diskFull = (logChannel == null) ? false +: ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile()); +boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs(); + +
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180590406 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; + +/* + * force close current logs. + */ +void forceCloseCurrentLogs(); + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. 
This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final Set rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = ConcurrentHashMap.newKeySet(); } -if (null == channels) { -return; + +/* + * This method should be guarded by a lock, so callers of this method + * should be in the right scope of the lock. + */ +@Override +public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException { + +int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size +BufferedLogChannel logChannel = getCurrentLogForLedger(ledger); +boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize) +: readEntryLogHardLimit(logChannel, entrySize); +// Create new log if logSizeLimit reached or current disk is full +boolean diskFull = (logChannel == null) ? false +: ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile()); +boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs(); + +
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180588670 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; + +/* + * force close current logs. + */ +void forceCloseCurrentLogs(); + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. 
This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final Set rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = ConcurrentHashMap.newKeySet(); } -if (null == channels) { -return; + +/* + * This method should be guarded by a lock, so callers of this method + * should be in the right scope of the lock. + */ +@Override +public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException { + +int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size +BufferedLogChannel logChannel = getCurrentLogForLedger(ledger); +boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize) +: readEntryLogHardLimit(logChannel, entrySize); +// Create new log if logSizeLimit reached or current disk is full +boolean diskFull = (logChannel == null) ? false +: ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile()); +boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs(); + +
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180584833 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; Review comment: will do This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180565394
## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java ##
@@ -93,16 +98,16 @@ public void testCreateNewLog() throws Exception {
 // Extracted from createNewLog()
 String logFileName = Long.toHexString(1) + ".log";
 File dir = ledgerDirsManager.pickRandomWritableDir();
-LOG.info("Picked this directory: " + dir);
+LOG.info("Picked this directory: {}", dir);
 File newLogFile = new File(dir, logFileName);
 newLogFile.createNewFile();
 EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
 // Calls createNewLog, and with the number of directories we
 // are using, if it picks one at random it will fail.
-el.createNewLog();
-LOG.info("This is the current log id: " + el.getCurrentLogId());
-assertTrue("Wrong log id", el.getCurrentLogId() > 1);
+((EntryLogManagerBase) el.entryLogManager).createNewLog(0L);

Review comment:
1) Made the entryLogManager variable private and added a package-scope getter for it.
3) OK. In some places I wanted to test getPreviousAllocatedEntryLogId, but in this test it does not seem appropriate to switch to getPreviousAllocatedEntryLogId. Will revert it.
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180583759 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -120,21 +124,104 @@ public ConcurrentLongLongHashMap getLedgersMap() { return entryLogMetadata.getLedgersMap(); } +public Long getLedgerIdAssigned() { +return ledgerIdAssigned; +} + +public void setLedgerIdAssigned(Long ledgerId) { +this.ledgerIdAssigned = ledgerId; +} + @Override public String toString() { return MoreObjects.toStringHelper(BufferedChannel.class) .add("logId", logId) .add("logFile", logFile) +.add("ledgerIdAssigned", ledgerIdAssigned) .toString(); } + +/** + * Append the ledger map at the end of the entry log. + * Updates the entry log file header with the offset and size of the map. + */ +private void appendLedgersMap() throws IOException { + +long ledgerMapOffset = this.position(); + +ConcurrentLongLongHashMap ledgersMap = this.getLedgersMap(); +int numberOfLedgers = (int) ledgersMap.size(); + +// Write the ledgers map into several batches + +final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE; +final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize); + +try { +ledgersMap.forEach(new BiConsumerLong() { +int remainingLedgers = numberOfLedgers; +boolean startNewBatch = true; +int remainingInBatch = 0; + +@Override +public void accept(long ledgerId, long size) { +if (startNewBatch) { +int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE); +int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize; + +serializedMap.clear(); +serializedMap.writeInt(ledgerMapSize - 4); +serializedMap.writeLong(INVALID_LID); +serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID); +serializedMap.writeInt(batchSize); + +startNewBatch = false; +remainingInBatch = batchSize; +} +// Dump the ledger in the current 
batch +serializedMap.writeLong(ledgerId); +serializedMap.writeLong(size); +--remainingLedgers; + +if (--remainingInBatch == 0) { +// Close current batch +try { +write(serializedMap); +} catch (IOException e) { +throw new RuntimeException(e); +} + +startNewBatch = true; +} +} +}); +} catch (RuntimeException e) { +if (e.getCause() instanceof IOException) { +throw (IOException) e.getCause(); +} else { +throw e; +} +} finally { +serializedMap.release(); +} +// Flush the ledger's map out before we write the header. +// Otherwise the header might point to something that is not fully +// written +super.flush(); + +// Update the headers with the map offset and count of ledgers +ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4); +mapInfo.putLong(ledgerMapOffset); +mapInfo.putInt(numberOfLedgers); +mapInfo.flip(); +this.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION); +} } -volatile File currentDir; private final LedgerDirsManager ledgerDirsManager; private final boolean entryLogPerLedgerEnabled; -private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false); -private volatile long leastUnflushedLogId; +RecentEntryLogsStatus recentlyCreatedEntryLogsStatus; Review comment: done
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180583624
## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ##
@@ -84,6 +88,9 @@
 */
 public class EntryLogger {
 private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
+static final Long UNASSIGNED_LEDGERID = Long.valueOf(-1);

Review comment:
Changed it to a primitive long.
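
A minimal sketch of why a primitive sentinel is the simpler choice here, assuming the constant keeps the value -1 shown in the diff. The class and method names (`SentinelSketch`, `isUnassigned`, `sameLedger`) are hypothetical and not part of the PR. Boxed `Long` values compared with `==` are compared by reference, so boxed ids need `equals`, while a primitive `long` compares by value.

```java
// Hypothetical illustration only; not code from the pull request.
final class SentinelSketch {
    // Primitive sentinel, mirroring UNASSIGNED_LEDGERID after the change to long.
    static final long UNASSIGNED_LEDGERID = -1L;

    // Primitive comparison is always by value.
    static boolean isUnassigned(long ledgerId) {
        return ledgerId == UNASSIGNED_LEDGERID;
    }

    // Boxed ids must be compared with equals(); == would compare references.
    static boolean sameLedger(Long a, Long b) {
        return a.equals(b);
    }

    public static void main(String[] args) {
        System.out.println(isUnassigned(-1L));
        System.out.println(sameLedger(Long.valueOf(100_000L), Long.valueOf(100_000L)));
    }
}
```

With a primitive field there is also no risk of a null sentinel being unboxed during a comparison.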
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180581487
## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java ##
@@ -54,7 +49,7 @@ protected boolean removeEntryLog(long entryLogId) {
 }
 @Override
-public synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
+public synchronized long addEntry(Long ledgerId, ByteBuffer entry) throws IOException {

Review comment:
OK, will revert it to the primitive long type in the interface.
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180565394
## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java ##
@@ -93,16 +98,16 @@ public void testCreateNewLog() throws Exception {
 // Extracted from createNewLog()
 String logFileName = Long.toHexString(1) + ".log";
 File dir = ledgerDirsManager.pickRandomWritableDir();
-LOG.info("Picked this directory: " + dir);
+LOG.info("Picked this directory: {}", dir);
 File newLogFile = new File(dir, logFileName);
 newLogFile.createNewFile();
 EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
 // Calls createNewLog, and with the number of directories we
 // are using, if it picks one at random it will fail.
-el.createNewLog();
-LOG.info("This is the current log id: " + el.getCurrentLogId());
-assertTrue("Wrong log id", el.getCurrentLogId() > 1);
+((EntryLogManagerBase) el.entryLogManager).createNewLog(0L);

Review comment:
1) I can add a simple getter for EntryLogManager in EntryLogger, but I wonder how it would help with mocking if the instance is referenced directly inside the EntryLogger class; using the getter everywhere it is used inside EntryLogger would be a stretch.
2) If mocking is the requirement here, then I can create an initializeEntryLogManager method that initializes the EntryLogManager depending on the config. For mocking purposes, that method can be mocked to return what we want.
3) OK. In some places I wanted to test getPreviousAllocatedEntryLogId, but in this test it does not seem appropriate to switch to getPreviousAllocatedEntryLogId. Will revert it.
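
A rough sketch of option 2, assuming a config-driven factory method that a test can override. Only the name `initializeEntryLogManager` comes from the comment above; `SketchLogManager`, `SketchEntryLogger`, the boolean flag, and the returned strings are hypothetical stand-ins, and the override uses a plain subclass rather than a mocking library.

```java
// Hypothetical stand-in for EntryLogManager; the real interface has many more methods.
interface SketchLogManager {
    String name();
}

// Hypothetical stand-in for EntryLogger.
class SketchEntryLogger {
    private final SketchLogManager entryLogManager;

    SketchEntryLogger(boolean entryLogPerLedgerEnabled) {
        // Caveat: calling an overridable method from a constructor is only safe
        // when overrides do not depend on subclass state, as in this sketch.
        this.entryLogManager = initializeEntryLogManager(entryLogPerLedgerEnabled);
    }

    // Package-private hook: picks the implementation from config, overridable in tests.
    SketchLogManager initializeEntryLogManager(boolean entryLogPerLedgerEnabled) {
        if (entryLogPerLedgerEnabled) {
            return () -> "per-ledger";
        }
        return () -> "single";
    }

    // Package-scope getter, so tests need not reach into the field directly.
    SketchLogManager getEntryLogManager() {
        return entryLogManager;
    }
}
```

A test in the same package could then substitute its own manager by overriding the hook in an anonymous subclass, while production code paths keep using the field.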
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180555218
## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ##
@@ -173,7 +175,7 @@ public void checkpoint(final Checkpoint checkpoint) throws IOException {
 // it means bytes might live at current active entry log, we need
 // roll current entry log and then issue checkpoint to underlying
 // interleaved ledger storage.
-entryLogger.rollLog();
+entryLogger.rollLogs();

Review comment:
@sijie I'm flexible about making this change, but:
1) To begin with, from what I understand, this check (numBytesFlushed > 0) and the rollLogs call are not needed, since entries that arrive before the checkpoint are already flushed from the memtable (because of the way SortedLedgerStorage.checkpoint is called).
2) I'm not convinced about replacing the rollLogs method in the interface with prepareCheckpoint. It has meaning only for SortedLedgerStorage with the single entry log manager, so a method name as generic as "prepareCheckpoint" seems a stretch.
3) In the method signature "void prepareCheckpoint(Checkpoint checkpoint, long numBytesFlushedBetweenCheckpoint)", 'checkpoint' is not needed and we can remove it. The parameter name 'numBytesFlushedBetweenCheckpoint' is not appropriate, since it is just the bytes added between the previous flush and this checkpoint.
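
One way to picture point 2, as a sketch under stated assumptions: a narrowly named hook that only the single-entry-log manager acts on. The method name `prepareSortedLedgerStorageCheckpoint(long numBytesFlushed)` matches the later revision of the interface quoted earlier in this thread; the classes `SingleLogSketch` and `PerLedgerSketch`, the `rolls` counter, and the no-op behaviour of the per-ledger variant are assumptions made for illustration.

```java
import java.io.IOException;

// Hypothetical, reduced view of the manager interface: just the checkpoint hook.
interface CheckpointHookSketch {
    void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException;
}

// Assumed behaviour for the single-entry-log case: roll only if bytes were flushed.
final class SingleLogSketch implements CheckpointHookSketch {
    int rolls = 0; // counts simulated log rolls instead of touching real files

    @Override
    public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) {
        if (numBytesFlushed > 0) {
            rolls++; // stands in for rolling the current entry log
        }
    }
}

// Assumed behaviour for the per-ledger case: nothing to prepare.
final class PerLedgerSketch implements CheckpointHookSketch {
    @Override
    public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) {
        // intentionally empty in this sketch
    }
}
```

The specific name keeps the SortedLedgerStorage-only purpose visible at the call site, which is the concern raised about a generic "prepareCheckpoint".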
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180548567
## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ##
@@ -78,6 +79,7 @@ public void initialize(ServerConfiguration conf,
 .setNameFormat("SortedLedgerStorage-%d")
 .setPriority((Thread.NORM_PRIORITY + Thread.MAX_PRIORITY) / 2).build());
 this.stateManager = stateManager;
+this.isTransactionalCompactionEnabled = conf.getUseTransactionalCompaction();

Review comment:
Yes, it is not needed. Will remove.
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180011787 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -802,88 +881,230 @@ private long readLastLogId(File f) { } } +interface EntryLogManager { +/* + * acquire lock for this ledger. + */ +void acquireLock(Long ledgerId); + +/* + * acquire lock for this ledger if it is not already available for this + * ledger then it will create a new one and then acquire lock. + */ +void acquireLockByCreatingIfRequired(Long ledgerId); + +/* + * release lock for this ledger + */ +void releaseLock(Long ledgerId); + +/* + * sets the logChannel for the given ledgerId. The previous one will be + * removed from replicaOfCurrentLogChannels. Previous logChannel will be + * added to rotatedLogChannels. + */ +void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel logChannel); + +/* + * gets the logChannel for the given ledgerId. + */ +BufferedLogChannel getCurrentLogForLedger(Long ledgerId); + +/* + * gets the copy of rotatedLogChannels + */ +Set getCopyOfRotatedLogChannels(); + +/* + * gets the copy of replicaOfCurrentLogChannels + */ +Set getCopyOfCurrentLogs(); + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * removes the logChannel from rotatedLogChannels collection + */ +void removeFromRotatedLogChannels(BufferedLogChannel rotatedLogChannelToRemove); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * gets the log id of the current activeEntryLog. 
for + * EntryLogManagerForSingleEntryLog it returns logid of current + * activelog, for EntryLogManagerForEntryLogPerLedger it would return + * some constant value. + */ +long getCurrentLogId(); +} + +class EntryLogManagerForSingleEntryLog implements EntryLogManager { + +private volatile BufferedLogChannel activeLogChannel; +private Lock lockForActiveLogChannel; +private final Set rotatedLogChannels; + +EntryLogManagerForSingleEntryLog() { +rotatedLogChannels = ConcurrentHashMap.newKeySet(); +lockForActiveLogChannel = new ReentrantLock(); +} + +/* + * since entryLogPerLedger is not enabled, it is just one lock for all + * ledgers. + */ +@Override +public void acquireLock(Long ledgerId) { +lockForActiveLogChannel.lock(); +} + +@Override +public void acquireLockByCreatingIfRequired(Long ledgerId) { +acquireLock(ledgerId); +} + +@Override +public void releaseLock(Long ledgerId) { +lockForActiveLogChannel.unlock(); +} + +@Override +public void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel logChannel) { +acquireLock(ledgerId); +try { +BufferedLogChannel hasToRotateLogChannel = activeLogChannel; +activeLogChannel = logChannel; +if (hasToRotateLogChannel != null) { +rotatedLogChannels.add(hasToRotateLogChannel); +} +} finally { +releaseLock(ledgerId); +} +} + +@Override +public BufferedLogChannel getCurrentLogForLedger(Long ledgerId) { +return activeLogChannel; +} + +@Override +public Set getCopyOfRotatedLogChannels() { +return new HashSet(rotatedLogChannels); +} + +@Override +public Set getCopyOfCurrentLogs() { +HashSet copyOfCurrentLogs = new HashSet(); +copyOfCurrentLogs.add(activeLogChannel); +return copyOfCurrentLogs; +} + +@Override +public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) { +BufferedLogChannel activeLogChannelTemp = activeLogChannel; +if ((activeLogChannelTemp != null) && (activeLogChannelTemp.getLogId() == entryLogId)) { +return activeLogChannelTemp; +} +
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180010928
## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java ##
@@ -100,9 +104,9 @@ public void testCreateNewLog() throws Exception {
 EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
 // Calls createNewLog, and with the number of directories we
 // are using, if it picks one at random it will fail.
-el.createNewLog();
-LOG.info("This is the current log id: " + el.getCurrentLogId());
-assertTrue("Wrong log id", el.getCurrentLogId() > 1);
+el.createNewLog(0L);
+LOG.info("This is the current log id: " + el.getPreviousAllocatedEntryLogId());

Review comment:
In all these places I took into account whether preallocation is enabled, and getPreviousAllocatedEntryLogId is the appropriate method to validate these cases.
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180010398
## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java ##
@@ -54,7 +49,7 @@ protected boolean removeEntryLog(long entryLogId) {
 }
 @Override
-public synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
+public synchronized long addEntry(Long ledgerId, ByteBuffer entry) throws IOException {

Review comment:
In EntryLogManagerForEntryLogPerLedger I maintain a couple of data structures in which the HashMap key is the ledgerId, hence the Long wrapper class is needed.
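
For context, a small sketch showing that a map keyed by ledger id does not by itself force `Long` into a method signature, since Java autoboxes a primitive `long` when it is used as a key. The class `PerLedgerIndexSketch` and its methods are hypothetical; they are not the data structures from EntryLogManagerForEntryLogPerLedger.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: primitive long in the API, boxed Long only inside the map.
final class PerLedgerIndexSketch {
    private final Map<Long, Long> entryLogIdByLedger = new HashMap<>();

    // The primitive ledgerId is autoboxed to Long when stored as a key.
    void assign(long ledgerId, long entryLogId) {
        entryLogIdByLedger.put(ledgerId, entryLogId);
    }

    // Returns -1 when the ledger has no entry log assigned in this sketch.
    long lookup(long ledgerId) {
        Long entryLogId = entryLogIdByLedger.get(ledgerId);
        return entryLogId == null ? -1L : entryLogId;
    }
}
```

The boxing cost is the same either way; the difference is only whether callers of the interface see the wrapper type.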
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180007829 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -393,12 +513,37 @@ public BufferedReadChannel getFromChannels(long logId) { * * @return least unflushed log id. */ -synchronized long getLeastUnflushedLogId() { -return leastUnflushedLogId; +long getLeastUnflushedLogId() { +return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId(); } -synchronized long getCurrentLogId() { -return logChannel.getLogId(); +long getPreviousAllocatedEntryLogId() { +return entryLoggerAllocator.getPreallocatedLogId(); +} + +boolean rollLogsIfEntryLogLimitReached() throws IOException { Review comment: changed EntryLogManager API as we discussed.
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180007729 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java ## @@ -100,9 +104,9 @@ public void testCreateNewLog() throws Exception { EntryLogger el = new EntryLogger(conf, ledgerDirsManager); // Calls createNewLog, and with the number of directories we // are using, if it picks one at random it will fail. -el.createNewLog(); -LOG.info("This is the current log id: " + el.getCurrentLogId()); -assertTrue("Wrong log id", el.getCurrentLogId() > 1); +el.createNewLog(0L); +LOG.info("This is the current log id: " + el.getPreviousAllocatedEntryLogId()); Review comment: done
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180006729 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -116,21 +121,103 @@ public ConcurrentLongLongHashMap getLedgersMap() { return entryLogMetadata.getLedgersMap(); } +public Long getLedgerId() { +return ledgerId; +} + +public void setLedgerId(Long ledgerId) { +this.ledgerId = ledgerId; +} + @Override public String toString() { return MoreObjects.toStringHelper(BufferedChannel.class) Review comment: changed
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180006501 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -116,21 +121,103 @@ public ConcurrentLongLongHashMap getLedgersMap() { return entryLogMetadata.getLedgersMap(); } +public Long getLedgerId() { +return ledgerId; +} + +public void setLedgerId(Long ledgerId) { Review comment: It's true that ledgerId is assigned in the entrylogperledger case and won't be changed afterwards. But because of the preallocation optimization, it isn't possible to assign the ledgerId when the BufferedLogChannel is created. So there has to be a setter for ledgerIdAssigned in BufferedLogChannel.
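A minimal sketch of why the setter is needed, assuming the channel is created ahead of time and only later handed to a ledger. The class below is hypothetical, not the PR's BufferedLogChannel, and the guard against reassignment is my own addition rather than something the PR states.

```java
// Hypothetical sketch; names are assumptions, not the PR's BufferedLogChannel.
class PreallocatedChannelSketch {
    static final long UNASSIGNED_LEDGERID = -1L;

    private final long logId;
    // Not final: the owning ledger is unknown when the channel is preallocated.
    private long ledgerIdAssigned = UNASSIGNED_LEDGERID;

    PreallocatedChannelSketch(long logId) {
        this.logId = logId;
    }

    long getLogId() {
        return logId;
    }

    long getLedgerIdAssigned() {
        return ledgerIdAssigned;
    }

    // Called when the preallocated channel is handed to a ledger.
    // Rejecting a second assignment is an assumption made for this sketch.
    void setLedgerIdAssigned(long ledgerId) {
        if (ledgerIdAssigned != UNASSIGNED_LEDGERID) {
            throw new IllegalStateException("ledgerId already assigned for log " + logId);
        }
        this.ledgerIdAssigned = ledgerId;
    }
}
```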
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180004935 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -83,18 +88,18 @@ */ public class EntryLogger { private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class); +private static final Long INVALID_LEDGERID = Long.valueOf(-1); +// log file suffix +private static final String LOG_FILE_SUFFIX = ".log"; static class BufferedLogChannel extends BufferedChannel { private final long logId; private final EntryLogMetadata entryLogMetadata; private final File logFile; +private Long ledgerId = INVALID_LEDGERID; Review comment: changed it to 'ledgerIdAssigned'
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180004899 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -83,18 +88,18 @@ */ public class EntryLogger { private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class); +private static final Long INVALID_LEDGERID = Long.valueOf(-1); Review comment: changed it to UNASSIGNED_LEDGERID
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r177590256 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ## @@ -209,16 +211,39 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException { public void run() { try { LOG.info("Started flushing mem table."); -long logIdBeforeFlush = entryLogger.getCurrentLogId(); +long prevAllocLogIdBeforeFlush = entryLogger.getPreviousAllocatedEntryLogId(); Review comment: "This change is not correct. Because it should be "current" log id, not "previous allocated entry log id". That says the behavior is different between preallocation enabled and disabled." @sijie why would you say that the behavior would differ between preallocation enabled and disabled? Is it because of the async nature of preallocation? OK, I think I can do the following, which would solve two problems here. I can add "getCurrentLogId" to the EntryLogManager interface. The EntryLogManagerForSingleEntryLog implementation would return the logId of the current active log (which is the current behavior). EntryLogManagerForEntryLogPerLedger would return a constant value every time, which should be fine in the entryLogPerLedger case, because there a checkpoint is not made when an entry log is rotated; instead SyncThread drives it periodically. It would also remove the need for the extra "isTransactionalCompactionEnabled" condition which I added in this change. So it would be clean and straightforward.
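The proposal above could look roughly like the following sketch. Everything here is hypothetical: the interface is reduced to the single method under discussion, the class names are placeholders, and the constant returned in the per-ledger case is an arbitrary value chosen for illustration.

```java
// Hypothetical sketch of the proposal; only getCurrentLogId is modeled.
interface CurrentLogIdSource {
    long getCurrentLogId();
}

// Single entry log: report the id of the active log, as EntryLogger does today.
class SingleEntryLogSketch implements CurrentLogIdSource {
    private long activeLogId;

    SingleEntryLogSketch(long activeLogId) {
        this.activeLogId = activeLogId;
    }

    void rollTo(long newLogId) {
        this.activeLogId = newLogId;
    }

    @Override
    public long getCurrentLogId() {
        return activeLogId;
    }
}

// Entry log per ledger: always report the same value, so a before/after
// comparison around a memtable flush never signals a rotation.
class EntryLogPerLedgerSketch implements CurrentLogIdSource {
    static final long CONSTANT_LOG_ID = -1L; // arbitrary value, an assumption

    @Override
    public long getCurrentLogId() {
        return CONSTANT_LOG_ID;
    }
}

class FlushCheckSketch {
    // Mirrors the logIdBeforeFlush / logIdAfterFlush comparison in the old code.
    static boolean logChangedDuring(CurrentLogIdSource source, Runnable flush) {
        long before = source.getCurrentLogId();
        flush.run();
        return source.getCurrentLogId() != before;
    }
}
```

With the constant in place, the caller in SortedLedgerStorage would not need to branch on the manager type; in the per-ledger case the comparison simply never triggers.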
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r177236857 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -264,6 +274,8 @@ public EntryLogger(ServerConfiguration conf, // but the protocol varies so an exact value is difficult to determine this.maxSaneEntrySize = conf.getNettyMaxFrameSizeBytes() - 500; this.ledgerDirsManager = ledgerDirsManager; +this.conf = conf; +entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled(); Review comment: Hmm, I considered moving EntryLogManager, EntryLogManagerForSingleEntryLog and EntryLogManagerForEntryLogPerLedger out of EntryLogger, but I realized that there is a circular dependency. EntryLogger holds an EntryLogManager reference, and for certain functionality EntryLogManager needs a reference to EntryLogger in order to call its methods. So I don’t see much benefit other than creating new files and new classes (which I'm OK with if it is really needed).
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r177239911 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ## @@ -78,6 +79,7 @@ public void initialize(ServerConfiguration conf, .setNameFormat("SortedLedgerStorage-%d") .setPriority((Thread.NORM_PRIORITY + Thread.MAX_PRIORITY) / 2).build()); this.stateManager = stateManager; +this.isTransactionalCompactionEnabled = conf.getUseTransactionalCompaction(); Review comment: this is part of this change itself. Provided more comments for why it is needed - https://github.com/apache/bookkeeper/pull/1281/files#diff-c5b1fb6226c382c12ed9f2dd9e6c8ed2R226
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r177191892 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -292,12 +304,37 @@ public EntryLogger(ServerConfiguration conf, logId = lastLogId; } } -this.leastUnflushedLogId = logId + 1; +this.recentlyCreatedEntryLogsStatus = new RecentEntryLogsStatus(logId + 1); Review comment: I kept the logic related to new log creation and flushing logs within EntryLogger, so it should be here.
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r177188321 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -802,88 +876,217 @@ private long readLastLogId(File f) { } } +interface EntryLogManager { Review comment: will do
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r177188354 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -802,88 +876,217 @@ private long readLastLogId(File f) { } } +interface EntryLogManager { +/* + * acquire lock for this ledger. + */ +void acquireLock(Long ledgerId); + +/* + * acquire lock for this ledger if it is not already available for this + * ledger then it will create a new one and then acquire lock. + */ +void acquireLockByCreatingIfRequired(Long ledgerId); + +/* + * release lock for this ledger + */ +void releaseLock(Long ledgerId); + +/* + * sets the logChannel for the given ledgerId. The previous one will be + * removed from replicaOfCurrentLogChannels. Previous logChannel will be + * added to rotatedLogChannels. + */ +void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel logChannel); + +/* + * gets the logChannel for the given ledgerId. + */ +BufferedLogChannel getCurrentLogForLedger(Long ledgerId); + +/* + * gets the copy of rotatedLogChannels + */ +Set getCopyOfRotatedLogChannels(); + +/* + * gets the copy of replicaOfCurrentLogChannels + */ +Set getCopyOfCurrentLogs(); + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * removes the logChannel from rotatedLogChannels collection + */ +void removeFromRotatedLogChannels(BufferedLogChannel rotatedLogChannelToRemove); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. 
+ */ +void rollLogs() throws IOException; +} + +class EntryLogManagerForSingleEntryLog implements EntryLogManager { Review comment: will do
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r177174411 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -513,78 +519,86 @@ void createNewLog() throws IOException { EntryLoggerAllocator getEntryLoggerAllocator() { return entryLoggerAllocator; } + /** * Append the ledger map at the end of the entry log. * Updates the entry log file header with the offset and size of the map. */ -private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException { -long ledgerMapOffset = entryLogChannel.position(); - -ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap(); -int numberOfLedgers = (int) ledgersMap.size(); +private void appendLedgersMap(Long ledgerId) throws IOException { Review comment: I will move this appendLedgersMap method to BufferedLogChannel. With that, the explicit conversion from ledgerId to log channel and the synchronization won't be required.
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176893779 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ## @@ -209,16 +202,12 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException { public void run() { try { LOG.info("Started flushing mem table."); -long logIdBeforeFlush = entryLogger.getCurrentLogId(); memTable.flush(SortedLedgerStorage.this); -long logIdAfterFlush = entryLogger.getCurrentLogId(); // in any case that an entry log reaches the limit, we roll the log and start checkpointing. // if a memory table is flushed spanning over two entry log files, we also roll log. this is // for performance consideration: since we don't wanna checkpoint a new log file that ledger // storage is writing to. -if (entryLogger.reachEntryLogLimit(0) || logIdAfterFlush != logIdBeforeFlush) { -LOG.info("Rolling entry logger since it reached size limitation"); -entryLogger.rollLog(); +if (entryLogger.rollLogsIfEntryLogLimitReached()) { Review comment: brought back rollLogs
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176881392 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java ## @@ -584,15 +584,6 @@ public void testWithDiskFullReadOnlyDisabledOrForceGCAllowDisabled() throws Exce } catch (NoWritableLedgerDirException e) { // expected } - Review comment: In the existing implementation, when Bookie is initialized it eventually calls EntryLogger.initialize() -> EntryLogger.createNewLog(), which fails with an exception because of https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java#L184 . With this change we no longer create a new log during initialization of Bookie (and EntryLogger); the entry log is created only after the first EntryLogger.addEntry call. Hence Bookie initialization succeeds in this case even though IsForceGCAllowWhenNoSpace is set to false.
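The behavioral change described above can be sketched as follows. This is a toy model with hypothetical names, not BookKeeper code: the "no writable dir" condition is reduced to a boolean and the failure to a plain IOException.

```java
import java.io.IOException;

// Hypothetical sketch of deferring log creation to the first addEntry call.
class LazyLogCreationSketch {
    private final boolean hasWritableDir;
    private Long currentLogId = null; // no log exists right after construction
    private long nextLogId = 0L;

    // Construction never touches the disk, so it succeeds even when no
    // directory is writable.
    LazyLogCreationSketch(boolean hasWritableDir) {
        this.hasWritableDir = hasWritableDir;
    }

    boolean hasCurrentLog() {
        return currentLogId != null;
    }

    // The first write creates the log; a full disk now surfaces here instead
    // of during initialization. Returns the id of the log written to.
    long addEntry(long ledgerId) throws IOException {
        if (currentLogId == null) {
            if (!hasWritableDir) {
                throw new IOException("no writable ledger dir");
            }
            currentLogId = nextLogId++;
        }
        return currentLogId;
    }
}
```

This is why the assertion that initialization must fail was dropped from the test: the failure moves from construction time to the first write.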
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176653861 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ## @@ -209,16 +202,12 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException { public void run() { try { LOG.info("Started flushing mem table."); -long logIdBeforeFlush = entryLogger.getCurrentLogId(); memTable.flush(SortedLedgerStorage.this); -long logIdAfterFlush = entryLogger.getCurrentLogId(); // in any case that an entry log reaches the limit, we roll the log and start checkpointing. // if a memory table is flushed spanning over two entry log files, we also roll log. this is // for performance consideration: since we don't wanna checkpoint a new log file that ledger // storage is writing to. -if (entryLogger.reachEntryLogLimit(0) || logIdAfterFlush != logIdBeforeFlush) { -LOG.info("Rolling entry logger since it reached size limitation"); -entryLogger.rollLog(); +if (entryLogger.rollLogsIfEntryLogLimitReached()) { Review comment: I don't think it is significant enough to have any perf impact. First of all, this is called once the EntryMemTable reaches its size limit, and in this method I get the collection of active entry logs (in this case just one) and check their sizes. For entrylogperledger this collection is required; for a single entry log it might not be, but I want a common implementation, and since it is called just once per EntryMemTable size limit, it is not something to be concerned about.
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176652302 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ## @@ -167,14 +167,7 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException { @Override public void checkpoint(final Checkpoint checkpoint) throws IOException { -long numBytesFlushed = memTable.flush(this, checkpoint); -if (numBytesFlushed > 0) { -// if bytes are added between previous flush and this checkpoint, -// it means bytes might live at current active entry log, we need -// roll current entry log and then issue checkpoint to underlying -// interleaved ledger storage. -entryLogger.rollLog(); -} +memTable.flush(this, checkpoint); Review comment: First of all, I’m not sure why this was added with commit https://github.com/apache/bookkeeper/commit/81cbba3cf620f2a6df48c0da6ee1ac019de24fbc. SortedLedgerStorage.checkpoint should be called from onSizeLimitReached’s scheduler. In onSizeLimitReached’s scheduler, the memtable is flushed completely, and if the entry log has reached its capacity the log is rolled. So by the time startCheckpoint for the checkpoint cp is called, all the entries must have been flushed from the memtable to the entry log and the entry log must have been rolled. So in this checkpoint method, the entries that came before checkpoint cp have already been added to the entry log and rolled over, and I don’t see the need for this extra rollLog call. Mainly, I removed the rollLog method and introduced rollLogsIfEntryLogLimitReached instead, which is not appropriate here.
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176651575 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -292,12 +308,28 @@ public EntryLogger(ServerConfiguration conf, logId = lastLogId; } } -this.leastUnflushedLogId = logId + 1; +this.recentlyCreatedEntryLogsStatus = new RecentEntryLogsStatus(logId + 1); this.entryLoggerAllocator = new EntryLoggerAllocator(logId); -this.conf = conf; -this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled(); - -initialize(); +if (entryLogPerLedgerEnabled) { +this.entryLogManager = new EntryLogManagerForSingleEntryLog() { +@Override +public void checkpoint() throws IOException { +/* + * In the case of entryLogPerLedgerEnabled we need to flush + * both rotatedlogs and currentlogs. This is needed because + * syncThread periodically does checkpoint and at this time + * all the logs should be flushed. + * + */ +if (entryLogPerLedgerEnabled) { Review comment: nit: it should simply be the following (will fix it in the next iteration):
```
public void checkpoint() throws IOException {
    flushCurrentLogs();
    super.checkpoint();
}
```
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176638435 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -802,88 +864,207 @@ private long readLastLogId(File f) { } } +interface EntryLogManager { +/* + * acquire lock for this ledger. + */ +void acquireLock(Long ledgerId); + +/* + * acquire lock for this ledger if it is not already available for this + * ledger then it will create a new one and then acquire lock. + */ +void acquireLockByCreatingIfRequired(Long ledgerId); + +/* + * release lock for this ledger + */ +void releaseLock(Long ledgerId); + +/* + * sets the logChannel for the given ledgerId. The previous one will be + * removed from replicaOfCurrentLogChannels. Previous logChannel will be + * added to rotatedLogChannels. + */ +void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel logChannel); Review comment: With my changes in this sub-task and subsequent sub-tasks I would like to abstract out the code/logic pertaining to singleentrylog/entrylogperledger and move it to the implementations of EntryLogManager, but leave the rest of the code as it is. With that reasoning, these methods in EntryLogManager and variables need to be exposed and EntryLogger class has to work with these methods.
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176549929

## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ##

@@ -1441,4 +1658,40 @@ static long fileName2LogId(String fileName) {
 static String logId2HexString(long logId) {
 return Long.toHexString(logId);
 }
-}
+
+/**
+ * Datastructure which maintains the status of logchannels. When a
+ * logChannel is created entry of < entryLogId, false > will be made to this
+ * sortedmap and when logChannel is rotated and flushed then the entry is
+ * updated to < entryLogId, true > and all the lowest entries with
+ * < entryLogId, true > status will be removed from the sortedmap. So that way
+ * we could get least unflushed LogId.
+ *
+ */
+static class RecentEntryLogsStatus {
+private SortedMap entryLogsStatusMap;

Review comment:
Will do.
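The Javadoc in the diff above describes the bookkeeping in words, so a small sketch may help. This is a minimal illustration assuming a `TreeMap` keyed by entry log id and simple `synchronized` methods. The class name `RecentEntryLogsStatusSketch` and the method names `created`, `flushed` and `leastUnflushedLogId` are hypothetical and not taken from the PR.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch of the structure described in the Javadoc; not the PR's code.
class RecentEntryLogsStatusSketch {
    // entryLogId -> true once that log has been rotated and flushed.
    private final SortedMap<Long, Boolean> statusByLogId = new TreeMap<>();
    private long leastUnflushedLogId;

    RecentEntryLogsStatusSketch(long leastUnflushedLogId) {
        this.leastUnflushedLogId = leastUnflushedLogId;
    }

    // A newly created log channel starts out as < entryLogId, false >.
    synchronized void created(long entryLogId) {
        statusByLogId.put(entryLogId, Boolean.FALSE);
    }

    // Mark the log as flushed, then drop every flushed entry at the low end of the map.
    synchronized void flushed(long entryLogId) {
        statusByLogId.replace(entryLogId, Boolean.TRUE);
        while (!statusByLogId.isEmpty() && statusByLogId.get(statusByLogId.firstKey())) {
            long lowest = statusByLogId.firstKey();
            statusByLogId.remove(lowest);
            leastUnflushedLogId = lowest + 1;
        }
    }

    synchronized long leastUnflushedLogId() {
        return leastUnflushedLogId;
    }
}
```

Flushing out of order is the interesting case: if logs 1, 2 and 3 exist and only 2 is flushed, the least unflushed id stays at 1, and it jumps to 3 once log 1 is flushed as well.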
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176549632

## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ##

@@ -802,88 +864,207 @@ private long readLastLogId(File f) {
 }
 }
+interface EntryLogManager {
+/*
+ * acquire lock for this ledger.
+ */
+void acquireLock(Long ledgerId);

Review comment:
Currently 'synchronized' is used everywhere, but for entrylogperledger we should have a lock for each ledger/entrylog; hence these lock methods in EntryLogManager.

The contract with this interface is that only the addEntry call uses "acquireLockByCreatingIfRequired". That call creates the lock if it does not exist yet, and we need this in addEntry because addEntry will be the first call related to that ledger in the write path. All other methods in EntryLogger call "acquireLock" when they are dealing with or writing to that particular ledger/entrylog.

I don't see why this needs to be hidden. EntryLogger needs to synchronize; instead of synchronizing on its intrinsic lock, it delegates that to EntryLogManager, which deals with the inner details. All EntryLogger needs is to call acquireLock and releaseLock with the ledgerId.
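The locking contract described in this comment can be sketched as follows. This is a minimal illustration assuming a `ConcurrentHashMap` of `ReentrantLock`s keyed by ledger id. The class name `PerLedgerLocksSketch` and the helper `isHeldByCurrentThread` are hypothetical, and throwing `IllegalStateException` when no lock exists yet is an assumption, since the comment does not say what `acquireLock` should do in that case.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of per-ledger locking; not the PR's implementation.
class PerLedgerLocksSketch {
    private final ConcurrentMap<Long, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Intended for addEntry: the first call on the write path creates the lock.
    void acquireLockByCreatingIfRequired(Long ledgerId) {
        locks.computeIfAbsent(ledgerId, id -> new ReentrantLock()).lock();
    }

    // Intended for all other callers: the lock is expected to exist already.
    void acquireLock(Long ledgerId) {
        existingLock(ledgerId).lock();
    }

    void releaseLock(Long ledgerId) {
        existingLock(ledgerId).unlock();
    }

    // Helper for inspection only; an addition for this sketch.
    boolean isHeldByCurrentThread(Long ledgerId) {
        ReentrantLock lock = locks.get(ledgerId);
        return lock != null && lock.isHeldByCurrentThread();
    }

    // Assumption: a missing lock is treated as a caller error.
    private ReentrantLock existingLock(Long ledgerId) {
        ReentrantLock lock = locks.get(ledgerId);
        if (lock == null) {
            throw new IllegalStateException("no lock created for ledger " + ledgerId);
        }
        return lock;
    }
}
```

A caller would pair each acquire with a `releaseLock` in a `finally` block, the same way a `synchronized` section would be replaced by explicit lock and unlock calls.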
[GitHub] reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
reddycharan commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176460672

## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ##

@@ -116,6 +121,18 @@ public ConcurrentLongLongHashMap getLedgersMap() {
 return entryLogMetadata.getLedgersMap();
 }
+public boolean isLedgerDirFull() {

Review comment:
OK, will change that.