[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r181924215 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +812,418 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * flush both current and rotated logs. + */ +void flush() throws IOException; + +/* + * close current logs. + */ +void close() throws IOException; + +/* + * force close current logs. + */ +void forceClose(); + +/* + * + */ +void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException; + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. 
+ */ +void prepareEntryMemTableFlush(); + +/* + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); +boolean commitEntryMemTableFlush() throws IOException; +} + +abstract class EntryLogManagerBase implements EntryLogManager { +volatile List rotatedLogChannels; + +private final FastThreadLocal sizeBufferForAdd = new FastThreadLocal() { +@Override +protected ByteBuf initialValue() throws Exception { +return Unpooled.buffer(4); +} +}; + +/* + * This method should be guarded by a lock, so callers of this method + * should be in the right scope of the lock. 
+ */ +@Override +public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException { +int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size +BufferedLogChannel logChannel = getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog); +ByteBuf sizeBuffer = sizeBufferForAdd.get(); +sizeBuffer.clear(); +sizeBuffer.writeInt(entry.readableBytes()); +logChannel.write(sizeBuffer); + +long pos = logChannel.position(); +logChannel.write(entry); +logChannel.registerWrittenEntry(ledger, entrySize); + +return (logChannel.getLogId() << 32L) | pos; +} + +boolean reachEntryLogLimit(BufferedLogChannel logChannel, long size) { +if (logChannel == null) { +return false; +} +return logChannel.position() + size > logSizeLimit; +} + +boolean readEntryLogHardLimit(BufferedLogChannel
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r181630491 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +810,392 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * flush both current and rotated logs. + */ +void flush() throws IOException; + +/* + * close current logs. + */ +void close() throws IOException; + +/* + * force close current logs. + */ +void forceClose(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. 
+ */ +void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException; + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + +/* + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. + * + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final List rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = new CopyOnWriteArrayList(); Review comment: > LinkedList iterator will fail because "if the list is structurally modified at any time after the iterator is created, in any way except through the Iterator's own remove or add methods, the iterator will throw a ConcurrentModificationException." I see that all accesses to `rotatedLogChannels` are under a synchronized block, so I am not sure when this `ConcurrentModificationException` can happen. If it can happen, please write a test that reproduces it and shows that your fix actually works. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r181630193 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +810,392 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * flush both current and rotated logs. + */ +void flush() throws IOException; + +/* + * close current logs. + */ +void close() throws IOException; + +/* + * force close current logs. + */ +void forceClose(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. 
+ */ +void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException; + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + +/* + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. + * + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final List rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = new CopyOnWriteArrayList(); } -if (null == channels) { -return; + +private final FastThreadLocal sizeBufferForAdd = new FastThreadLocal() { +@Override +protected ByteBuf initialValue() throws Exception { +return Unpooled.buffer(4); +} +}; + +/* + * This method should be guarded by a lock, so callers of this method + * should be in the right scope of the lock. 
+ */ +@Override +public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException { +int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size +BufferedLogChannel logChannel = getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog); +ByteBuf sizeBuffer = sizeBufferForAdd.get(); +sizeBuffer.clear(); +sizeBuffer.writeInt(entry.readableBytes()); +logChannel.write(sizeBuffer); + +long pos = logChannel.position(); +logChannel.write(entry); +
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r181599453 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +810,392 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * flush both current and rotated logs. + */ +void flush() throws IOException; + +/* + * close current logs. + */ +void close() throws IOException; + +/* + * force close current logs. + */ +void forceClose(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. 
+ */ +void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException; + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + +/* + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. + * + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final List rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = new CopyOnWriteArrayList(); Review comment: Based on my understanding, you will use a Set for the per-ledger manager. Why not move `rotatedLogChannels` to the single-log manager and keep the original logic unchanged? I am not convinced that we need to change this to `CopyOnWriteArrayList`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r181599541 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +810,392 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * flush both current and rotated logs. + */ +void flush() throws IOException; + +/* + * close current logs. + */ +void close() throws IOException; + +/* + * force close current logs. + */ +void forceClose(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. 
+ */ +void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException; + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + +/* + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. + * + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final List rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = new CopyOnWriteArrayList(); } -if (null == channels) { -return; + +private final FastThreadLocal sizeBufferForAdd = new FastThreadLocal() { +@Override +protected ByteBuf initialValue() throws Exception { +return Unpooled.buffer(4); +} +}; + +/* + * This method should be guarded by a lock, so callers of this method + * should be in the right scope of the lock. 
+ */ +@Override +public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException { +int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size +BufferedLogChannel logChannel = getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog); +ByteBuf sizeBuffer = sizeBufferForAdd.get(); +sizeBuffer.clear(); +sizeBuffer.writeInt(entry.readableBytes()); +logChannel.write(sizeBuffer); + +long pos = logChannel.position(); +logChannel.write(entry); +
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180948843 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; + +/* + * force close current logs. + */ +void forceCloseCurrentLogs(); + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. 
This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final Set rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = ConcurrentHashMap.newKeySet(); } -if (null == channels) { -return; + +/* + * This method should be guarded by a lock, so callers of this method + * should be in the right scope of the lock. + */ +@Override +public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException { + +int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size +BufferedLogChannel logChannel = getCurrentLogForLedger(ledger); +boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize) +: readEntryLogHardLimit(logChannel, entrySize); +// Create new log if logSizeLimit reached or current disk is full +boolean diskFull = (logChannel == null) ? false Review comment: > I removed the need of LedgerDirsListener and another flag variable - shouldCreateNewEntryLog. Instead I check if the dir is full by calling
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180948843 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; + +/* + * force close current logs. + */ +void forceCloseCurrentLogs(); + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. 
This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final Set rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = ConcurrentHashMap.newKeySet(); } -if (null == channels) { -return; + +/* + * This method should be guarded by a lock, so callers of this method + * should be in the right scope of the lock. + */ +@Override +public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException { + +int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size +BufferedLogChannel logChannel = getCurrentLogForLedger(ledger); +boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize) +: readEntryLogHardLimit(logChannel, entrySize); +// Create new log if logSizeLimit reached or current disk is full +boolean diskFull = (logChannel == null) ? false Review comment: > I removed the need of LedgerDirsListener and another flag variable - shouldCreateNewEntryLog. Instead I check if the dir is full by calling
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180947466 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; + +/* + * force close current logs. + */ +void forceCloseCurrentLogs(); + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. 
This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final Set rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = ConcurrentHashMap.newKeySet(); } -if (null == channels) { -return; + +/* + * This method should be guarded by a lock, so callers of this method + * should be in the right scope of the lock. + */ +@Override +public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException { + +int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size +BufferedLogChannel logChannel = getCurrentLogForLedger(ledger); +boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize) +: readEntryLogHardLimit(logChannel, entrySize); +// Create new log if logSizeLimit reached or current disk is full +boolean diskFull = (logChannel == null) ? false +: ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile()); +boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs(); + +
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180945411 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; Review comment: okay that's fine, if you want to leave `rollLogs` here, I am fine. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180945411 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; Review comment: okay that's fine, if you want to leave `rollLogs` here, I am fine. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180944411 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; + +/* + * force close current logs. + */ +void forceCloseCurrentLogs(); + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. 
This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final Set rotatedLogChannels; Review comment: > it is incorrect to have non-synchronized version of List for rotatedLogChannels. I'll use CopyOnWriteArrayList, so that the list can be modified while its iterator is being used. why? the original code is synchronizing the rotatedLogChannels list properly. If you just move the logic into the abstraction, this logic should work. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180944180 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; + +/* + * force close current logs. + */ +void forceCloseCurrentLogs(); + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. 
This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final Set rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = ConcurrentHashMap.newKeySet(); } -if (null == channels) { -return; + +/* + * This method should be guarded by a lock, so callers of this method + * should be in the right scope of the lock. + */ +@Override +public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException { + +int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size +BufferedLogChannel logChannel = getCurrentLogForLedger(ledger); +boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize) +: readEntryLogHardLimit(logChannel, entrySize); +// Create new log if logSizeLimit reached or current disk is full +boolean diskFull = (logChannel == null) ? false +: ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile()); +boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs(); + +
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180943513 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; + +/* + * force close current logs. + */ +void forceCloseCurrentLogs(); + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. 
This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final Set rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = ConcurrentHashMap.newKeySet(); } -if (null == channels) { -return; + +/* + * This method should be guarded by a lock, so callers of this method + * should be in the right scope of the lock. + */ +@Override +public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException { + +int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size +BufferedLogChannel logChannel = getCurrentLogForLedger(ledger); +boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize) +: readEntryLogHardLimit(logChannel, entrySize); +// Create new log if logSizeLimit reached or current disk is full +boolean diskFull = (logChannel == null) ? false Review comment: > it is not much of a change here. There are minor changes here and these changes make code more organized and appropriate. understand you want to reorganize
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180942977 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -499,176 +576,163 @@ void createNewLog() throws IOException { EntryLoggerAllocator getEntryLoggerAllocator() { return entryLoggerAllocator; } -/** - * Append the ledger map at the end of the entry log. - * Updates the entry log file header with the offset and size of the map. - */ -private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException { -long ledgerMapOffset = entryLogChannel.position(); - -ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap(); -int numberOfLedgers = (int) ledgersMap.size(); - -// Write the ledgers map into several batches - -final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE; -final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize); - -try { -ledgersMap.forEach(new BiConsumerLong() { -int remainingLedgers = numberOfLedgers; -boolean startNewBatch = true; -int remainingInBatch = 0; - -@Override -public void accept(long ledgerId, long size) { -if (startNewBatch) { -int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE); -int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize; - -serializedMap.clear(); -serializedMap.writeInt(ledgerMapSize - 4); -serializedMap.writeLong(INVALID_LID); -serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID); -serializedMap.writeInt(batchSize); - -startNewBatch = false; -remainingInBatch = batchSize; -} -// Dump the ledger in the current batch -serializedMap.writeLong(ledgerId); -serializedMap.writeLong(size); ---remainingLedgers; - -if (--remainingInBatch == 0) { -// Close current batch -try { -entryLogChannel.write(serializedMap); -} catch (IOException e) { -throw new RuntimeException(e); -} - -startNewBatch = 
true; -} -} -}); -} catch (RuntimeException e) { -if (e.getCause() instanceof IOException) { -throw (IOException) e.getCause(); -} else { -throw e; -} -} finally { -serializedMap.release(); -} -// Flush the ledger's map out before we write the header. -// Otherwise the header might point to something that is not fully written -entryLogChannel.flush(); - -// Update the headers with the map offset and count of ledgers -ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4); -mapInfo.putLong(ledgerMapOffset); -mapInfo.putInt(numberOfLedgers); -mapInfo.flip(); -entryLogChannel.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION); -} /** * An allocator pre-allocates entry log files. */ class EntryLoggerAllocator { private long preallocatedLogId; -private Future preallocation = null; +Future preallocation = null; private ExecutorService allocatorExecutor; -private final Object createEntryLogLock = new Object(); -private final Object createCompactionLogLock = new Object(); EntryLoggerAllocator(long logId) { preallocatedLogId = logId; allocatorExecutor = Executors.newSingleThreadExecutor(); } -BufferedLogChannel createNewLog() throws IOException { -synchronized (createEntryLogLock) { -BufferedLogChannel bc; -if (!entryLogPreAllocationEnabled){ -// create a new log directly -bc = allocateNewLog(); -return bc; -} else { -// allocate directly to response request -if (null == preallocation){ -bc = allocateNewLog(); +synchronized long getPreallocatedLogId(){ +return preallocatedLogId; +} + +synchronized BufferedLogChannel createNewLog() throws IOException { +BufferedLogChannel bc; +if (!entryLogPreAllocationEnabled || null ==
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180942749 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -499,176 +576,163 @@ void createNewLog() throws IOException { EntryLoggerAllocator getEntryLoggerAllocator() { return entryLoggerAllocator; } -/** - * Append the ledger map at the end of the entry log. - * Updates the entry log file header with the offset and size of the map. - */ -private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException { -long ledgerMapOffset = entryLogChannel.position(); - -ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap(); -int numberOfLedgers = (int) ledgersMap.size(); - -// Write the ledgers map into several batches - -final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE; -final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize); - -try { -ledgersMap.forEach(new BiConsumerLong() { -int remainingLedgers = numberOfLedgers; -boolean startNewBatch = true; -int remainingInBatch = 0; - -@Override -public void accept(long ledgerId, long size) { -if (startNewBatch) { -int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE); -int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize; - -serializedMap.clear(); -serializedMap.writeInt(ledgerMapSize - 4); -serializedMap.writeLong(INVALID_LID); -serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID); -serializedMap.writeInt(batchSize); - -startNewBatch = false; -remainingInBatch = batchSize; -} -// Dump the ledger in the current batch -serializedMap.writeLong(ledgerId); -serializedMap.writeLong(size); ---remainingLedgers; - -if (--remainingInBatch == 0) { -// Close current batch -try { -entryLogChannel.write(serializedMap); -} catch (IOException e) { -throw new RuntimeException(e); -} - -startNewBatch = 
true; -} -} -}); -} catch (RuntimeException e) { -if (e.getCause() instanceof IOException) { -throw (IOException) e.getCause(); -} else { -throw e; -} -} finally { -serializedMap.release(); -} -// Flush the ledger's map out before we write the header. -// Otherwise the header might point to something that is not fully written -entryLogChannel.flush(); - -// Update the headers with the map offset and count of ledgers -ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4); -mapInfo.putLong(ledgerMapOffset); -mapInfo.putInt(numberOfLedgers); -mapInfo.flip(); -entryLogChannel.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION); -} /** * An allocator pre-allocates entry log files. */ class EntryLoggerAllocator { private long preallocatedLogId; -private Future preallocation = null; +Future preallocation = null; private ExecutorService allocatorExecutor; -private final Object createEntryLogLock = new Object(); -private final Object createCompactionLogLock = new Object(); EntryLoggerAllocator(long logId) { preallocatedLogId = logId; allocatorExecutor = Executors.newSingleThreadExecutor(); } -BufferedLogChannel createNewLog() throws IOException { -synchronized (createEntryLogLock) { -BufferedLogChannel bc; -if (!entryLogPreAllocationEnabled){ -// create a new log directly -bc = allocateNewLog(); -return bc; -} else { -// allocate directly to response request -if (null == preallocation){ -bc = allocateNewLog(); +synchronized long getPreallocatedLogId(){ +return preallocatedLogId; +} + +synchronized BufferedLogChannel createNewLog() throws IOException { +BufferedLogChannel bc; +if (!entryLogPreAllocationEnabled || null ==
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180524406 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ## @@ -173,7 +175,7 @@ public void checkpoint(final Checkpoint checkpoint) throws IOException { // it means bytes might live at current active entry log, we need // roll current entry log and then issue checkpoint to underlying // interleaved ledger storage. -entryLogger.rollLog(); +entryLogger.rollLogs(); Review comment: I think the `numBytesFlushed > 0` logic is a behavior that is specific to the single-log manager. `rollLogs` is not a proper term for the meaning here. I would suggest adding a method in the EntryLogManager: `void prepareCheckpoint(Checkpoint checkpoint, long numBytesFlushedBetweenCheckpoints)` -> it means prepare checkpoint in the entry logger. Then logic here can be changed to: ``` long numBytesFlushed = memTable.flush(this, checkpoint); entryLogger.prepareCheckpoint(checkpoint, numBytesFlushed); super.checkpoint(checkpoint); ``` in single-log manager, you can implement ``` void prepareCheckpoint(Checkpoint checkpoint, long numBytesFlushedBetweenCheckpoint) { if (numBytesFlushedBetweenCheckpoint > 0) { rollLogs(); } } ``` in per-ledger-log manager, it is a no-op. so in this way, you can get rid of `rollLogs` in EntryLogManager. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180514001 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; + +/* + * force close current logs. + */ +void forceCloseCurrentLogs(); + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. 
This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final Set rotatedLogChannels; Review comment: I am not sure we need to put this rotatedLogChannels in this base class. for two reasons, 1) there is really no commonality between single-log manager and per-ledger-log manager around checkpoint/rotation. I don't think it is necessary to put them in the single model. 2) I would prefer using an ordered structure (before this change, it is a `list`), because entry log file rotation in single-log manager is in order and checkpoint logic is in order. changing single-log manager to use `Set` could expose unknown potential bugs, which is a risky change to me. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180514235 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; + +/* + * force close current logs. + */ +void forceCloseCurrentLogs(); + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. 
This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final Set rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = ConcurrentHashMap.newKeySet(); } -if (null == channels) { -return; + +/* + * This method should be guarded by a lock, so callers of this method + * should be in the right scope of the lock. + */ +@Override +public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException { + +int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size +BufferedLogChannel logChannel = getCurrentLogForLedger(ledger); +boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize) +: readEntryLogHardLimit(logChannel, entrySize); +// Create new log if logSizeLimit reached or current disk is full +boolean diskFull = (logChannel == null) ? false +: ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile()); +boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs(); + +
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180515728 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; + +/* + * force close current logs. + */ +void forceCloseCurrentLogs(); + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. 
This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final Set rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = ConcurrentHashMap.newKeySet(); } -if (null == channels) { -return; + +/* + * This method should be guarded by a lock, so callers of this method + * should be in the right scope of the lock. + */ +@Override +public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException { + +int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size +BufferedLogChannel logChannel = getCurrentLogForLedger(ledger); +boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize) +: readEntryLogHardLimit(logChannel, entrySize); +// Create new log if logSizeLimit reached or current disk is full +boolean diskFull = (logChannel == null) ? false +: ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile()); +boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs(); + +
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180516776 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; Review comment: `flushRotatedLogs` and `flushCurrentLogs` are only used in the `flush()` method; why not just provide a `flush()` method in EntryLogManager, like `checkpoint`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180505515 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -120,21 +124,104 @@ public ConcurrentLongLongHashMap getLedgersMap() { return entryLogMetadata.getLedgersMap(); } +public Long getLedgerIdAssigned() { +return ledgerIdAssigned; +} + +public void setLedgerIdAssigned(Long ledgerId) { +this.ledgerIdAssigned = ledgerId; +} + @Override public String toString() { return MoreObjects.toStringHelper(BufferedChannel.class) .add("logId", logId) .add("logFile", logFile) +.add("ledgerIdAssigned", ledgerIdAssigned) .toString(); } + +/** + * Append the ledger map at the end of the entry log. + * Updates the entry log file header with the offset and size of the map. + */ +private void appendLedgersMap() throws IOException { + +long ledgerMapOffset = this.position(); + +ConcurrentLongLongHashMap ledgersMap = this.getLedgersMap(); +int numberOfLedgers = (int) ledgersMap.size(); + +// Write the ledgers map into several batches + +final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE; +final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize); + +try { +ledgersMap.forEach(new BiConsumerLong() { +int remainingLedgers = numberOfLedgers; +boolean startNewBatch = true; +int remainingInBatch = 0; + +@Override +public void accept(long ledgerId, long size) { +if (startNewBatch) { +int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE); +int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize; + +serializedMap.clear(); +serializedMap.writeInt(ledgerMapSize - 4); +serializedMap.writeLong(INVALID_LID); +serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID); +serializedMap.writeInt(batchSize); + +startNewBatch = false; +remainingInBatch = batchSize; +} +// Dump the ledger in the current batch 
+serializedMap.writeLong(ledgerId); +serializedMap.writeLong(size); +--remainingLedgers; + +if (--remainingInBatch == 0) { +// Close current batch +try { +write(serializedMap); +} catch (IOException e) { +throw new RuntimeException(e); +} + +startNewBatch = true; +} +} +}); +} catch (RuntimeException e) { +if (e.getCause() instanceof IOException) { +throw (IOException) e.getCause(); +} else { +throw e; +} +} finally { +serializedMap.release(); +} +// Flush the ledger's map out before we write the header. +// Otherwise the header might point to something that is not fully +// written +super.flush(); + +// Update the headers with the map offset and count of ledgers +ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4); +mapInfo.putLong(ledgerMapOffset); +mapInfo.putInt(numberOfLedgers); +mapInfo.flip(); +this.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION); +} } -volatile File currentDir; private final LedgerDirsManager ledgerDirsManager; private final boolean entryLogPerLedgerEnabled; -private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false); -private volatile long leastUnflushedLogId; +RecentEntryLogsStatus recentlyCreatedEntryLogsStatus; Review comment: final This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180520311 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ## @@ -78,6 +79,7 @@ public void initialize(ServerConfiguration conf, .setNameFormat("SortedLedgerStorage-%d") .setPriority((Thread.NORM_PRIORITY + Thread.MAX_PRIORITY) / 2).build()); this.stateManager = stateManager; +this.isTransactionalCompactionEnabled = conf.getUseTransactionalCompaction(); Review comment: I don't think we need this flag any more, no? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180520230 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java ## @@ -54,7 +49,7 @@ protected boolean removeEntryLog(long entryLogId) { } @Override -public synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException { +public synchronized long addEntry(Long ledgerId, ByteBuffer entry) throws IOException { Review comment: @reddycharan you can still use a long for retrieving from a Long hashmap. Why do you need to change it here? In any case, long to Long is a boxing operation; if the single-log manager doesn't need Long, the boxing should be deferred to where it is really needed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180517926 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; Review comment: `closeCurrentLogs` and `forceCloseCurrentLogs` are abstractions of the logic during `closing`. I think it is better to call them `close` and `closeNow` (or `forceClose`) - meaning closing the manager and force closing the manager. This would provide a better meaning, and it is similar to `shutdown` and `shutdownNow` in a scheduler. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180525552 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java ## @@ -93,16 +98,16 @@ public void testCreateNewLog() throws Exception { // Extracted from createNewLog() String logFileName = Long.toHexString(1) + ".log"; File dir = ledgerDirsManager.pickRandomWritableDir(); -LOG.info("Picked this directory: " + dir); +LOG.info("Picked this directory: {}", dir); File newLogFile = new File(dir, logFileName); newLogFile.createNewFile(); EntryLogger el = new EntryLogger(conf, ledgerDirsManager); // Calls createNewLog, and with the number of directories we // are using, if it picks one at random it will fail. -el.createNewLog(); -LOG.info("This is the current log id: " + el.getCurrentLogId()); -assertTrue("Wrong log id", el.getCurrentLogId() > 1); +((EntryLogManagerBase) el.entryLogManager).createNewLog(0L); Review comment: - nit: provide a `getEntryLogManager` in EntryLogger, so we can mock it in the future if needed. - better to cast to `SingleEntryLogManager`, because you can expose `getCurrentLogId` in `SingleEntryLogManager`. ``` SingleEntryLogManager selm = (SingleEntryLogManager) (el.getEntryLogManager()); selm.createNewLog(0L); assertTrue(selm.getCurrentLogId() > 1); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180507966 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -499,176 +576,163 @@ void createNewLog() throws IOException { EntryLoggerAllocator getEntryLoggerAllocator() { return entryLoggerAllocator; } -/** - * Append the ledger map at the end of the entry log. - * Updates the entry log file header with the offset and size of the map. - */ -private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException { -long ledgerMapOffset = entryLogChannel.position(); - -ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap(); -int numberOfLedgers = (int) ledgersMap.size(); - -// Write the ledgers map into several batches - -final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE; -final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize); - -try { -ledgersMap.forEach(new BiConsumerLong() { -int remainingLedgers = numberOfLedgers; -boolean startNewBatch = true; -int remainingInBatch = 0; - -@Override -public void accept(long ledgerId, long size) { -if (startNewBatch) { -int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE); -int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize; - -serializedMap.clear(); -serializedMap.writeInt(ledgerMapSize - 4); -serializedMap.writeLong(INVALID_LID); -serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID); -serializedMap.writeInt(batchSize); - -startNewBatch = false; -remainingInBatch = batchSize; -} -// Dump the ledger in the current batch -serializedMap.writeLong(ledgerId); -serializedMap.writeLong(size); ---remainingLedgers; - -if (--remainingInBatch == 0) { -// Close current batch -try { -entryLogChannel.write(serializedMap); -} catch (IOException e) { -throw new RuntimeException(e); -} - -startNewBatch = 
true; -} -} -}); -} catch (RuntimeException e) { -if (e.getCause() instanceof IOException) { -throw (IOException) e.getCause(); -} else { -throw e; -} -} finally { -serializedMap.release(); -} -// Flush the ledger's map out before we write the header. -// Otherwise the header might point to something that is not fully written -entryLogChannel.flush(); - -// Update the headers with the map offset and count of ledgers -ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4); -mapInfo.putLong(ledgerMapOffset); -mapInfo.putInt(numberOfLedgers); -mapInfo.flip(); -entryLogChannel.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION); -} /** * An allocator pre-allocates entry log files. */ class EntryLoggerAllocator { private long preallocatedLogId; -private Future preallocation = null; +Future preallocation = null; private ExecutorService allocatorExecutor; -private final Object createEntryLogLock = new Object(); -private final Object createCompactionLogLock = new Object(); EntryLoggerAllocator(long logId) { preallocatedLogId = logId; allocatorExecutor = Executors.newSingleThreadExecutor(); } -BufferedLogChannel createNewLog() throws IOException { -synchronized (createEntryLogLock) { -BufferedLogChannel bc; -if (!entryLogPreAllocationEnabled){ -// create a new log directly -bc = allocateNewLog(); -return bc; -} else { -// allocate directly to response request -if (null == preallocation){ -bc = allocateNewLog(); +synchronized long getPreallocatedLogId(){ +return preallocatedLogId; +} + +synchronized BufferedLogChannel createNewLog() throws IOException { +BufferedLogChannel bc; +if (!entryLogPreAllocationEnabled || null ==
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180510817 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -880,39 +1195,6 @@ protected ByteBuf initialValue() throws Exception { } }; -public synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException { -if (null == logChannel) { -// log channel can be null because the file is deferred to be created when no writable ledger directory -// is available. -createNewLog(); -} - -int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size -boolean reachEntryLogLimit = -rollLog ? reachEntryLogLimit(entrySize) : readEntryLogHardLimit(entrySize); -// Create new log if logSizeLimit reached or current disk is full -boolean createNewLog = shouldCreateNewEntryLog.get(); Review comment: any reason why `shouldCreateNewEntryLog` is removed? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180510213 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * flush rotated logs. + */ +void flushRotatedLogs() throws IOException; + +/* + * flush current logs. + */ +void flushCurrentLogs() throws IOException; + +/* + * close current logs. + */ +void closeCurrentLogs() throws IOException; + +/* + * force close current logs. + */ +void forceCloseCurrentLogs(); + +/* + * this method should be called before doing entrymemtable flush, it + * would save the state of the entrylogger before entrymemtable flush + * and commitEntryMemTableFlush would take appropriate action after + * entrymemtable flush. + */ +void prepareEntryMemTableFlush(); + /* - * In the case of entryLogPerLedgerEnabled we need to flush both - * rotatedlogs and currentlogs. 
This is needed because syncThread - * periodically does checkpoint and at this time all the logs should - * be flushed. + * this method should be called after doing entrymemtable flush,it would + * take appropriate action after entrymemtable flush depending on the + * current state of the entrylogger and the state of the entrylogger + * during prepareEntryMemTableFlush. * - * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of - * this Issue, I will move this logic to individual implamentations of - * EntryLogManager and it would be free of this booalen flag based logic. + * It is assumed that there would be corresponding + * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both + * would be called from the same thread. * + * returns boolean value indicating whether EntryMemTable should do checkpoint + * after this commit method. */ -if (entryLogPerLedgerEnabled) { -flushCurrentLog(); -} +boolean commitEntryMemTableFlush() throws IOException; } -void flushRotatedLogs() throws IOException { -List channels = null; -long flushedLogId = INVALID_LID; -synchronized (this) { -channels = logChannelsToFlush; -logChannelsToFlush = null; +abstract class EntryLogManagerBase implements EntryLogManager { +final Set rotatedLogChannels; + +EntryLogManagerBase() { +rotatedLogChannels = ConcurrentHashMap.newKeySet(); } -if (null == channels) { -return; + +/* + * This method should be guarded by a lock, so callers of this method + * should be in the right scope of the lock. + */ +@Override +public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException { + +int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size +BufferedLogChannel logChannel = getCurrentLogForLedger(ledger); +boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize) +: readEntryLogHardLimit(logChannel, entrySize); +// Create new log if logSizeLimit reached or current disk is full +boolean diskFull = (logChannel == null) ? false Review comment: you are changing logic while moving code around for abstracting interface. This makes review really hard and painful. you should just move the code first and
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180507116 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -499,176 +576,163 @@ void createNewLog() throws IOException { EntryLoggerAllocator getEntryLoggerAllocator() { return entryLoggerAllocator; } -/** - * Append the ledger map at the end of the entry log. - * Updates the entry log file header with the offset and size of the map. - */ -private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException { -long ledgerMapOffset = entryLogChannel.position(); - -ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap(); -int numberOfLedgers = (int) ledgersMap.size(); - -// Write the ledgers map into several batches - -final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE; -final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize); - -try { -ledgersMap.forEach(new BiConsumerLong() { -int remainingLedgers = numberOfLedgers; -boolean startNewBatch = true; -int remainingInBatch = 0; - -@Override -public void accept(long ledgerId, long size) { -if (startNewBatch) { -int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE); -int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize; - -serializedMap.clear(); -serializedMap.writeInt(ledgerMapSize - 4); -serializedMap.writeLong(INVALID_LID); -serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID); -serializedMap.writeInt(batchSize); - -startNewBatch = false; -remainingInBatch = batchSize; -} -// Dump the ledger in the current batch -serializedMap.writeLong(ledgerId); -serializedMap.writeLong(size); ---remainingLedgers; - -if (--remainingInBatch == 0) { -// Close current batch -try { -entryLogChannel.write(serializedMap); -} catch (IOException e) { -throw new RuntimeException(e); -} - -startNewBatch = 
true; -} -} -}); -} catch (RuntimeException e) { -if (e.getCause() instanceof IOException) { -throw (IOException) e.getCause(); -} else { -throw e; -} -} finally { -serializedMap.release(); -} -// Flush the ledger's map out before we write the header. -// Otherwise the header might point to something that is not fully written -entryLogChannel.flush(); - -// Update the headers with the map offset and count of ledgers -ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4); -mapInfo.putLong(ledgerMapOffset); -mapInfo.putInt(numberOfLedgers); -mapInfo.flip(); -entryLogChannel.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION); -} /** * An allocator pre-allocates entry log files. */ class EntryLoggerAllocator { private long preallocatedLogId; -private Future preallocation = null; +Future preallocation = null; private ExecutorService allocatorExecutor; -private final Object createEntryLogLock = new Object(); -private final Object createCompactionLogLock = new Object(); EntryLoggerAllocator(long logId) { preallocatedLogId = logId; allocatorExecutor = Executors.newSingleThreadExecutor(); } -BufferedLogChannel createNewLog() throws IOException { -synchronized (createEntryLogLock) { -BufferedLogChannel bc; -if (!entryLogPreAllocationEnabled){ -// create a new log directly -bc = allocateNewLog(); -return bc; -} else { -// allocate directly to response request -if (null == preallocation){ -bc = allocateNewLog(); +synchronized long getPreallocatedLogId(){ +return preallocatedLogId; +} + +synchronized BufferedLogChannel createNewLog() throws IOException { +BufferedLogChannel bc; +if (!entryLogPreAllocationEnabled || null ==
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180516551 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -788,89 +852,340 @@ private long readLastLogId(File f) { } } -/** - * Flushes all rotated log channels. After log channels are flushed, - * move leastUnflushedLogId ptr to current logId. - */ -void checkpoint() throws IOException { -flushRotatedLogs(); +interface EntryLogManager { + +/* + * add entry to the corresponding entrylog and return the position of + * the entry in the entrylog + */ +long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException; + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; Review comment: after the change of `prepareMemtableFlush` and `commitMemtableFlush`, rollLogs seems to be only the behavior of single-log manager. I don't think you need `rollLogs` here, unless I missed something. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180508758 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -499,176 +576,163 @@ void createNewLog() throws IOException { EntryLoggerAllocator getEntryLoggerAllocator() { return entryLoggerAllocator; } -/** - * Append the ledger map at the end of the entry log. - * Updates the entry log file header with the offset and size of the map. - */ -private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException { -long ledgerMapOffset = entryLogChannel.position(); - -ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap(); -int numberOfLedgers = (int) ledgersMap.size(); - -// Write the ledgers map into several batches - -final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE; -final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize); - -try { -ledgersMap.forEach(new BiConsumerLong() { -int remainingLedgers = numberOfLedgers; -boolean startNewBatch = true; -int remainingInBatch = 0; - -@Override -public void accept(long ledgerId, long size) { -if (startNewBatch) { -int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE); -int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize; - -serializedMap.clear(); -serializedMap.writeInt(ledgerMapSize - 4); -serializedMap.writeLong(INVALID_LID); -serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID); -serializedMap.writeInt(batchSize); - -startNewBatch = false; -remainingInBatch = batchSize; -} -// Dump the ledger in the current batch -serializedMap.writeLong(ledgerId); -serializedMap.writeLong(size); ---remainingLedgers; - -if (--remainingInBatch == 0) { -// Close current batch -try { -entryLogChannel.write(serializedMap); -} catch (IOException e) { -throw new RuntimeException(e); -} - -startNewBatch = 
true; -} -} -}); -} catch (RuntimeException e) { -if (e.getCause() instanceof IOException) { -throw (IOException) e.getCause(); -} else { -throw e; -} -} finally { -serializedMap.release(); -} -// Flush the ledger's map out before we write the header. -// Otherwise the header might point to something that is not fully written -entryLogChannel.flush(); - -// Update the headers with the map offset and count of ledgers -ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4); -mapInfo.putLong(ledgerMapOffset); -mapInfo.putInt(numberOfLedgers); -mapInfo.flip(); -entryLogChannel.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION); -} /** * An allocator pre-allocates entry log files. */ class EntryLoggerAllocator { private long preallocatedLogId; -private Future preallocation = null; +Future preallocation = null; private ExecutorService allocatorExecutor; -private final Object createEntryLogLock = new Object(); -private final Object createCompactionLogLock = new Object(); EntryLoggerAllocator(long logId) { preallocatedLogId = logId; allocatorExecutor = Executors.newSingleThreadExecutor(); } -BufferedLogChannel createNewLog() throws IOException { -synchronized (createEntryLogLock) { -BufferedLogChannel bc; -if (!entryLogPreAllocationEnabled){ -// create a new log directly -bc = allocateNewLog(); -return bc; -} else { -// allocate directly to response request -if (null == preallocation){ -bc = allocateNewLog(); +synchronized long getPreallocatedLogId(){ +return preallocatedLogId; +} + +synchronized BufferedLogChannel createNewLog() throws IOException { +BufferedLogChannel bc; +if (!entryLogPreAllocationEnabled || null ==
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180508334 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -499,176 +576,163 @@ void createNewLog() throws IOException { EntryLoggerAllocator getEntryLoggerAllocator() { return entryLoggerAllocator; } -/** - * Append the ledger map at the end of the entry log. - * Updates the entry log file header with the offset and size of the map. - */ -private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException { -long ledgerMapOffset = entryLogChannel.position(); - -ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap(); -int numberOfLedgers = (int) ledgersMap.size(); - -// Write the ledgers map into several batches - -final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE; -final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize); - -try { -ledgersMap.forEach(new BiConsumerLong() { -int remainingLedgers = numberOfLedgers; -boolean startNewBatch = true; -int remainingInBatch = 0; - -@Override -public void accept(long ledgerId, long size) { -if (startNewBatch) { -int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE); -int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize; - -serializedMap.clear(); -serializedMap.writeInt(ledgerMapSize - 4); -serializedMap.writeLong(INVALID_LID); -serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID); -serializedMap.writeInt(batchSize); - -startNewBatch = false; -remainingInBatch = batchSize; -} -// Dump the ledger in the current batch -serializedMap.writeLong(ledgerId); -serializedMap.writeLong(size); ---remainingLedgers; - -if (--remainingInBatch == 0) { -// Close current batch -try { -entryLogChannel.write(serializedMap); -} catch (IOException e) { -throw new RuntimeException(e); -} - -startNewBatch = 
true; -} -} -}); -} catch (RuntimeException e) { -if (e.getCause() instanceof IOException) { -throw (IOException) e.getCause(); -} else { -throw e; -} -} finally { -serializedMap.release(); -} -// Flush the ledger's map out before we write the header. -// Otherwise the header might point to something that is not fully written -entryLogChannel.flush(); - -// Update the headers with the map offset and count of ledgers -ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4); -mapInfo.putLong(ledgerMapOffset); -mapInfo.putInt(numberOfLedgers); -mapInfo.flip(); -entryLogChannel.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION); -} /** * An allocator pre-allocates entry log files. */ class EntryLoggerAllocator { private long preallocatedLogId; -private Future preallocation = null; +Future preallocation = null; private ExecutorService allocatorExecutor; -private final Object createEntryLogLock = new Object(); -private final Object createCompactionLogLock = new Object(); EntryLoggerAllocator(long logId) { preallocatedLogId = logId; allocatorExecutor = Executors.newSingleThreadExecutor(); } -BufferedLogChannel createNewLog() throws IOException { -synchronized (createEntryLogLock) { -BufferedLogChannel bc; -if (!entryLogPreAllocationEnabled){ -// create a new log directly -bc = allocateNewLog(); -return bc; -} else { -// allocate directly to response request -if (null == preallocation){ -bc = allocateNewLog(); +synchronized long getPreallocatedLogId(){ +return preallocatedLogId; +} + +synchronized BufferedLogChannel createNewLog() throws IOException { +BufferedLogChannel bc; +if (!entryLogPreAllocationEnabled || null ==
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180505208 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -92,13 +99,10 @@ private final long logId; private final EntryLogMetadata entryLogMetadata; private final File logFile; +private Long ledgerIdAssigned = UNASSIGNED_LEDGERID; Review comment: use `long` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180505355 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -84,6 +88,9 @@ */ public class EntryLogger { private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class); +static final Long UNASSIGNED_LEDGERID = Long.valueOf(-1); Review comment: `long` please. `long UNASSIGNED_LEDGERID = -1L` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r179294437 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java ## @@ -54,7 +49,7 @@ protected boolean removeEntryLog(long entryLogId) { } @Override -public synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException { +public synchronized long addEntry(Long ledgerId, ByteBuffer entry) throws IOException { Review comment: I see you changed "long" to "Long" in a lot of places. what is the consideration behind this? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r179293507 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -393,12 +513,37 @@ public BufferedReadChannel getFromChannels(long logId) { * * @return least unflushed log id. */ -synchronized long getLeastUnflushedLogId() { -return leastUnflushedLogId; +long getLeastUnflushedLogId() { +return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId(); } -synchronized long getCurrentLogId() { -return logChannel.getLogId(); +long getPreviousAllocatedEntryLogId() { +return entryLoggerAllocator.getPreallocatedLogId(); +} + +boolean rollLogsIfEntryLogLimitReached() throws IOException { Review comment: Why not make `rollLogsIfEntryLogLimitReached` part of `EntryLogManager`? That way the simple entry log manager stays much simpler, and you can do the heavy-lifting management in the per-ledger entry log implementation. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r179294075 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -802,88 +881,230 @@ private long readLastLogId(File f) { } } +interface EntryLogManager { +/* + * acquire lock for this ledger. + */ +void acquireLock(Long ledgerId); + +/* + * acquire lock for this ledger if it is not already available for this + * ledger then it will create a new one and then acquire lock. + */ +void acquireLockByCreatingIfRequired(Long ledgerId); + +/* + * release lock for this ledger + */ +void releaseLock(Long ledgerId); + +/* + * sets the logChannel for the given ledgerId. The previous one will be + * removed from replicaOfCurrentLogChannels. Previous logChannel will be + * added to rotatedLogChannels. + */ +void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel logChannel); + +/* + * gets the logChannel for the given ledgerId. + */ +BufferedLogChannel getCurrentLogForLedger(Long ledgerId); + +/* + * gets the copy of rotatedLogChannels + */ +Set getCopyOfRotatedLogChannels(); + +/* + * gets the copy of replicaOfCurrentLogChannels + */ +Set getCopyOfCurrentLogs(); + +/* + * gets the active logChannel with the given entryLogId. null if it is + * not existing. + */ +BufferedLogChannel getCurrentLogIfPresent(long entryLogId); + +/* + * removes the logChannel from rotatedLogChannels collection + */ +void removeFromRotatedLogChannels(BufferedLogChannel rotatedLogChannelToRemove); + +/* + * Returns eligible writable ledger dir for the creation next entrylog + */ +File getDirForNextEntryLog(List writableLedgerDirs); + +/* + * Do the operations required for checkpoint. + */ +void checkpoint() throws IOException; + +/* + * roll entryLogs. + */ +void rollLogs() throws IOException; + +/* + * gets the log id of the current activeEntryLog. 
for + * EntryLogManagerForSingleEntryLog it returns logid of current + * activelog, for EntryLogManagerForEntryLogPerLedger it would return + * some constant value. + */ +long getCurrentLogId(); +} + +class EntryLogManagerForSingleEntryLog implements EntryLogManager { + +private volatile BufferedLogChannel activeLogChannel; +private Lock lockForActiveLogChannel; +private final Set rotatedLogChannels; + +EntryLogManagerForSingleEntryLog() { +rotatedLogChannels = ConcurrentHashMap.newKeySet(); +lockForActiveLogChannel = new ReentrantLock(); +} + +/* + * since entryLogPerLedger is not enabled, it is just one lock for all + * ledgers. + */ +@Override +public void acquireLock(Long ledgerId) { +lockForActiveLogChannel.lock(); +} + +@Override +public void acquireLockByCreatingIfRequired(Long ledgerId) { +acquireLock(ledgerId); +} + +@Override +public void releaseLock(Long ledgerId) { +lockForActiveLogChannel.unlock(); +} + +@Override +public void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel logChannel) { +acquireLock(ledgerId); +try { +BufferedLogChannel hasToRotateLogChannel = activeLogChannel; +activeLogChannel = logChannel; +if (hasToRotateLogChannel != null) { +rotatedLogChannels.add(hasToRotateLogChannel); +} +} finally { +releaseLock(ledgerId); +} +} + +@Override +public BufferedLogChannel getCurrentLogForLedger(Long ledgerId) { +return activeLogChannel; +} + +@Override +public Set getCopyOfRotatedLogChannels() { +return new HashSet(rotatedLogChannels); +} + +@Override +public Set getCopyOfCurrentLogs() { +HashSet copyOfCurrentLogs = new HashSet(); +copyOfCurrentLogs.add(activeLogChannel); +return copyOfCurrentLogs; +} + +@Override +public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) { +BufferedLogChannel activeLogChannelTemp = activeLogChannel; +if ((activeLogChannelTemp != null) && (activeLogChannelTemp.getLogId() == entryLogId)) { +return activeLogChannelTemp; +} +
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r179290558 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -83,18 +88,18 @@ */ public class EntryLogger { private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class); +private static final Long INVALID_LEDGERID = Long.valueOf(-1); +// log file suffix +private static final String LOG_FILE_SUFFIX = ".log"; static class BufferedLogChannel extends BufferedChannel { private final long logId; private final EntryLogMetadata entryLogMetadata; private final File logFile; +private Long ledgerId = INVALID_LEDGERID; Review comment: I'd prefer naming it something meaningful, like 'ledgerIdAssigned', meaning it is an entry log file that is assigned to a specific ledger. Make it final if possible. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r179290810 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -116,21 +121,103 @@ public ConcurrentLongLongHashMap getLedgersMap() { return entryLogMetadata.getLedgersMap(); } +public Long getLedgerId() { +return ledgerId; +} + +public void setLedgerId(Long ledgerId) { Review comment: When can the ledger id be changed? If it cannot, make this field final. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r179290902 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -116,21 +121,103 @@ public ConcurrentLongLongHashMap getLedgersMap() { return entryLogMetadata.getLedgersMap(); } +public Long getLedgerId() { +return ledgerId; +} + +public void setLedgerId(Long ledgerId) { +this.ledgerId = ledgerId; +} + @Override public String toString() { return MoreObjects.toStringHelper(BufferedChannel.class) Review comment: Update the `toString` to include `ledgerIdAssigned`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r179289148 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -83,18 +88,18 @@ */ public class EntryLogger { private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class); +private static final Long INVALID_LEDGERID = Long.valueOf(-1); Review comment: use a special negative value instead of `-1`. also I don't think it is `INVALID_LEDGERID`, it is actually "UNASSIGNED". because it is used in `BufferedLogChannel` when a buffered channel is not specifically assigned to a ledger in single entry log manager. also use long instead of `Long` to avoid boxing/unboxing. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r177532672 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ## @@ -209,16 +211,39 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException { public void run() { try { LOG.info("Started flushing mem table."); -long logIdBeforeFlush = entryLogger.getCurrentLogId(); +long prevAllocLogIdBeforeFlush = entryLogger.getPreviousAllocatedEntryLogId(); Review comment: This change is not correct, because it should be the "current" log id, not the "previous allocated entry log id". That means the behavior is different between preallocation enabled and disabled. That leads me to think that the current entry log manager interface is not abstracted with the right methods, because it abstracts a lot of methods around the underlying files, and those underlying files are tightly coupled with the corresponding checkpointing and flushing logic. Two different entry log managers can't really fit in this interface or the checkpointing logic either. So, as I said in my previous comment, we need to think about what the right methods to put in EntryLogManager are. > I would suggest the EntryLogManager should hide the details on how to create log, allocate log, > flush/checkpoint logs. the details such as preallocationLogId, unflushedLogId and how to lock them should be > hidden to the implementation, rather than defining a too finer interface here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r177535531 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ## @@ -209,16 +202,12 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException { public void run() { try { LOG.info("Started flushing mem table."); -long logIdBeforeFlush = entryLogger.getCurrentLogId(); memTable.flush(SortedLedgerStorage.this); -long logIdAfterFlush = entryLogger.getCurrentLogId(); // in any case that an entry log reaches the limit, we roll the log and start checkpointing. // if a memory table is flushed spanning over two entry log files, we also roll log. this is // for performance consideration: since we don't wanna checkpoint a new log file that ledger // storage is writing to. -if (entryLogger.reachEntryLogLimit(0) || logIdAfterFlush != logIdBeforeFlush) { -LOG.info("Rolling entry logger since it reached size limitation"); -entryLogger.rollLog(); +if (entryLogger.rollLogsIfEntryLogLimitReached()) { Review comment: @ivankelly that's what I commented before. I don't think the methods in EntryLogManager provide the right abstraction. That's also why I dislike your approach of asking to move the class out in this PR. In this PR, we need to figure out what the right methods to put in an EntryLogManager are before any kind of code movement. Your comments about inner classes complicate things. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r177532166 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ## @@ -78,6 +79,7 @@ public void initialize(ServerConfiguration conf, .setNameFormat("SortedLedgerStorage-%d") .setPriority((Thread.NORM_PRIORITY + Thread.MAX_PRIORITY) / 2).build()); this.stateManager = stateManager; +this.isTransactionalCompactionEnabled = conf.getUseTransactionalCompaction(); Review comment: @ivankelly > The external behaviour of SortedLedgerStorage should not change with this patch it is not an external behavior of SortedLedgerStorage. It is the logic of SortedLedgerStorage. SortedLedgerStorage needs to realize the compaction mechanism to do checkpointing. That's related to the discussion below around checkpointing. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176658665 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ## @@ -209,16 +202,12 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException { public void run() { try { LOG.info("Started flushing mem table."); -long logIdBeforeFlush = entryLogger.getCurrentLogId(); memTable.flush(SortedLedgerStorage.this); -long logIdAfterFlush = entryLogger.getCurrentLogId(); // in any case that an entry log reaches the limit, we roll the log and start checkpointing. // if a memory table is flushed spanning over two entry log files, we also roll log. this is // for performance consideration: since we don't wanna checkpoint a new log file that ledger // storage is writing to. -if (entryLogger.reachEntryLogLimit(0) || logIdAfterFlush != logIdBeforeFlush) { -LOG.info("Rolling entry logger since it reached size limitation"); -entryLogger.rollLog(); +if (entryLogger.rollLogsIfEntryLogLimitReached()) { Review comment: @reddycharan well, the code exists for a reason. Performance is one; the biggest problem, I think, is that the "checkpoint" logic can potentially never be triggered. - during memtable flushes, rollLog is disabled for appending memtable entries. - however, background compaction can also advance the entry log, since it is appending entries into the same entry log. - if a memtable flush spans two entry log files, `rollLogsIfEntryLogLimitReached` will most likely be false, so no checkpoint will happen. - in the next memtable flush, if it spans two entry log files again, then `rollLogsIfEntryLogLimitReached` will still be false, and no checkpoint will happen. This was actually a behavior we observed at Twitter 3 years ago. I added this logic to prevent this from happening.
see twitter/bookkeeper@a7a8c88eaa2659b5d6c4e29f4eb9e2d184c7cfe0 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176654536 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ## @@ -167,14 +167,7 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException { @Override public void checkpoint(final Checkpoint checkpoint) throws IOException { -long numBytesFlushed = memTable.flush(this, checkpoint); -if (numBytesFlushed > 0) { -// if bytes are added between previous flush and this checkpoint, -// it means bytes might live at current active entry log, we need -// roll current entry log and then issue checkpoint to underlying -// interleaved ledger storage. -entryLogger.rollLog(); -} +memTable.flush(this, checkpoint); Review comment: @reddycharan > First of all I’m not sure, why this is added with this commit 81cbba3. I believe there was a whole discussion in the original PR introducing this, and I agreed with your logic in the original PR. I am not arguing again about the correctness. What I am suggesting here is that this change is unrelated to the introduction of EntryLogManager. This change is more related to the subsequent change you want to introduce. What I am suggesting is putting this change in as part of your subsequent change; that is much clearer when people look back at the history, because they are related. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176336059 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -802,88 +864,207 @@ private long readLastLogId(File f) { } } +interface EntryLogManager { +/* + * acquire lock for this ledger. + */ +void acquireLock(Long ledgerId); Review comment: Can we try to hide these locking stuffs? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176336283 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -802,88 +864,207 @@ private long readLastLogId(File f) { } } +interface EntryLogManager { +/* + * acquire lock for this ledger. + */ +void acquireLock(Long ledgerId); + +/* + * acquire lock for this ledger if it is not already available for this + * ledger then it will create a new one and then acquire lock. + */ +void acquireLockByCreatingIfRequired(Long ledgerId); + +/* + * release lock for this ledger + */ +void releaseLock(Long ledgerId); + +/* + * sets the logChannel for the given ledgerId. The previous one will be + * removed from replicaOfCurrentLogChannels. Previous logChannel will be + * added to rotatedLogChannels. + */ +void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel logChannel); Review comment: All the `currentLogForLedger` `copy of rotated log channels` `copy of current logs` are kind of implementation detail of an implementation of EntryLogManager. Can we think of a better abstraction to hide these? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176332343 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java ## @@ -584,15 +584,6 @@ public void testWithDiskFullReadOnlyDisabledOrForceGCAllowDisabled() throws Exce } catch (NoWritableLedgerDirException e) { // expected } - Review comment: Can you explain why these lines are removed? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176332723 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -1441,4 +1658,40 @@ static long fileName2LogId(String fileName) { static String logId2HexString(long logId) { return Long.toHexString(logId); } -} + +/** + * Datastructure which maintains the status of logchannels. When a + * logChannel is created entry of < entryLogId, false > will be made to this + * sortedmap and when logChannel is rotated and flushed then the entry is + * updated to < entryLogId, true > and all the lowest entries with + * < entryLogId, true > status will be removed from the sortedmap. So that way + * we could get least unflushed LogId. + * + */ +static class RecentEntryLogsStatus { +private SortedMapentryLogsStatusMap; Review comment: "private final" This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176331968 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ## @@ -209,16 +202,12 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException { public void run() { try { LOG.info("Started flushing mem table."); -long logIdBeforeFlush = entryLogger.getCurrentLogId(); memTable.flush(SortedLedgerStorage.this); -long logIdAfterFlush = entryLogger.getCurrentLogId(); // in any case that an entry log reaches the limit, we roll the log and start checkpointing. // if a memory table is flushed spanning over two entry log files, we also roll log. this is // for performance consideration: since we don't wanna checkpoint a new log file that ledger // storage is writing to. -if (entryLogger.reachEntryLogLimit(0) || logIdAfterFlush != logIdBeforeFlush) { -LOG.info("Rolling entry logger since it reached size limitation"); -entryLogger.rollLog(); +if (entryLogger.rollLogsIfEntryLogLimitReached()) { Review comment: This changes performance characteristics. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176331262 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ## @@ -116,6 +121,18 @@ public ConcurrentLongLongHashMap getLedgersMap() { return entryLogMetadata.getLedgersMap(); } +public boolean isLedgerDirFull() { Review comment: It seems that you are converting this class to an inner class just because you want to reference ledgerDirsManager. I don't think this is a good change. I would prefer not to reference `ledgerDirsManager`. You should just add `getFile()` in this class and call `ledgerDirsManager.isDirFull(channel.getFile())`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager. URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r176332233 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ## @@ -167,14 +167,7 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException { @Override public void checkpoint(final Checkpoint checkpoint) throws IOException { -long numBytesFlushed = memTable.flush(this, checkpoint); -if (numBytesFlushed > 0) { -// if bytes are added between previous flush and this checkpoint, -// it means bytes might live at current active entry log, we need -// roll current entry log and then issue checkpoint to underlying -// interleaved ledger storage. -entryLogger.rollLog(); -} +memTable.flush(this, checkpoint); Review comment: I would suggest leaving this change out of this PR. Let's discuss this when you introduce the multiple entry log manager. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services