[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r189416753

## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java ##

@@ -268,4 +508,64 @@ public void testCreateNewLogAndCompactionLog() throws Exception {
                 "previousAllocatedEntryLogId after 2 times createNewLog is called", 2, el.getPreviousAllocatedEntryLogId());
     }
+
+    /*
+     * In this testcase entrylogs for ledgers are tried to create concurrently.
+     */
+    @Test
+    public void testConcurrentEntryLogCreations() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        // pre-allocation is enabled
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        conf.setEntryLogPerLedgerEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger)
+                entryLogger.getEntryLogManager();
+
+        int numOfLedgers = 10;
+        int numOfThreadsForSameLedger = 10;
+        AtomicInteger createdEntryLogs = new AtomicInteger(0);
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch createdLatch = new CountDownLatch(numOfLedgers * numOfThreadsForSameLedger);
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            for (int j = 0; j < numOfThreadsForSameLedger; j++) {
+                long ledgerId = i;
+                new Thread() {
+                    @Override
+                    public void run() {

Review comment:
Changed it to a lambda.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
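For readers following the thread, here is a minimal sketch of what the lambda form of the quoted loop could look like. It is not the code from the PR: the quoted diff is cut off before the thread body, so the work done inside each thread is a placeholder, and the class and method names (ConcurrentStartSketch, runConcurrently) are made up for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrentStartSketch {

    /** Starts one lambda-based thread per (ledger, worker) pair and releases them together. */
    static int runConcurrently(int numOfLedgers, int numOfThreadsForSameLedger) {
        AtomicInteger finished = new AtomicInteger(0);
        Set<Long> ledgersTouched = ConcurrentHashMap.newKeySet();
        CountDownLatch startLatch = new CountDownLatch(1);
        CountDownLatch doneLatch = new CountDownLatch(numOfLedgers * numOfThreadsForSameLedger);

        for (long i = 0; i < numOfLedgers; i++) {
            for (int j = 0; j < numOfThreadsForSameLedger; j++) {
                long ledgerId = i; // effectively final copy, so the lambda can capture it
                new Thread(() -> {
                    try {
                        startLatch.await();           // block until all threads are released
                        ledgersTouched.add(ledgerId); // placeholder for the real per-ledger work
                        finished.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        doneLatch.countDown();
                    }
                }).start();
            }
        }

        startLatch.countDown(); // release every waiting thread at once
        try {
            if (!doneLatch.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("threads did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished threads: " + runConcurrently(10, 10));
    }
}
```

Passing the lambda to the Thread constructor replaces the anonymous Thread subclass and its overridden run(); the latch handling stays the same.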
[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r189413885

## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java ##

@@ -142,6 +151,237 @@ public void testCreateNewLogWithNoWritableLedgerDirs() throws Exception {
         assertTrue("Wrong log id", entryLogManager.getCurrentLogId() > 1);
     }
+    /*
+     * entryLogPerLedger is enabled and various scenarios of entrylogcreation are tested
+     */
+    @Test
+    public void testEntryLogPerLedgerCreationWithPreAllocation() throws Exception {
+        /*
+         * I wish I could shorten this testcase or split it into multiple testcases,
+         * but I want to cover a scenario and it requires multiple operations in
+         * sequence and validations along the way. Please bear with the length of this
+         * testcase, I added as many comments as I can to simplify it.
+         */
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        conf.setIsForceGCAllowWhenNoSpace(true);
+        // preAllocation is Enabled
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        conf.setEntryLogPerLedgerEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLoggerAllocator entryLoggerAllocator = entryLogger.entryLoggerAllocator;
+        EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
+                .getEntryLogManager();
+
+        /*
+         * no entrylog will be created during initialization
+         */
+        int expectedPreAllocatedLogID = -1;
+        Assert.assertEquals("PreallocatedlogId after initialization of Entrylogger",
+                expectedPreAllocatedLogID, entryLoggerAllocator.getPreallocatedLogId());
+
+        int numOfLedgers = 6;
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            /* since we are starting creation of new ledgers, entrylogid will be ledgerid */
+            entryLogManager.createNewLog(i);
+        }
+
+        Thread.sleep(100);

Review comment:
Guava to the rescue! Removed these sleeps by using newDirectExecutorService() for the EntryLoggerAllocator's allocatorExecutor.
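To illustrate why a direct executor makes the sleeps unnecessary, here is a rough, stdlib-only sketch. Guava's MoreExecutors.newDirectExecutorService() returns an ExecutorService that runs each task on the calling thread; the DirectExecutorService class below is a hand-written stand-in for that behaviour, not Guava's implementation, and the preallocate helper and its log id are invented for the example.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class DirectExecutorSketch {

    /** Stand-in for a direct executor: every task runs synchronously on the caller's thread. */
    static final class DirectExecutorService extends AbstractExecutorService {
        private volatile boolean shutdown;

        @Override public void execute(Runnable command) { command.run(); }
        @Override public void shutdown() { shutdown = true; }
        @Override public List<Runnable> shutdownNow() { shutdown = true; return Collections.emptyList(); }
        @Override public boolean isShutdown() { return shutdown; }
        @Override public boolean isTerminated() { return shutdown; }
        @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return shutdown; }
    }

    /** Hypothetical "preallocation" submitted to the executor; returns the id seen right after submit. */
    static int preallocate(ExecutorService allocatorExecutor) {
        int[] preallocatedLogId = {-1};
        allocatorExecutor.submit(() -> { preallocatedLogId[0] = 0; });
        // With a direct executor the task has already run here, so no sleep or wait is needed.
        return preallocatedLogId[0];
    }

    public static void main(String[] args) {
        System.out.println("preallocated log id: " + preallocate(new DirectExecutorService()));
    }
}
```

With a normal thread-pool executor the value read right after submit() would be racy, which is presumably what the removed Thread.sleep(100) calls were papering over.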
[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r189074159

## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ##

@@ -85,6 +83,7 @@
         private final EntryLogMetadata entryLogMetadata;
         private final File logFile;
         private long ledgerIdAssigned = UNASSIGNED_LEDGERID;
+        volatile boolean ledgerDirFull = false;

Review comment:
I considered that as well: having a new class BufferedLogChannelWithDirInfo, extending BufferedLogChannel, in the EntryLogManagerForEntryLogPerLedger class. But the BufferedLogChannel instance is created by EntryLoggerAllocator (https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java#L162), so if I create a new BufferedLogChannelWithDirInfo class, I would need an if/else block there based on the conf value (which I'm not very inclined to do, but I can live with it). Is that OK?
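The trade-off described in this comment can be sketched roughly as follows. The classes below are simplified stand-ins that only borrow the names from the discussion; the allocate method and its boolean parameter are hypothetical and do not reflect the actual EntryLoggerAllocator code.

```java
class DirInfoChannelSketch {

    /** Simplified stand-in for the real BufferedLogChannel. */
    static class BufferedLogChannel {
        final long logId;

        BufferedLogChannel(long logId) {
            this.logId = logId;
        }
    }

    /** Subclass that carries the ledger-dir-full flag, as proposed in the review. */
    static class BufferedLogChannelWithDirInfo extends BufferedLogChannel {
        volatile boolean ledgerDirFull = false;

        BufferedLogChannelWithDirInfo(long logId) {
            super(logId);
        }
    }

    /**
     * Hypothetical allocator method: because the allocator creates the channel,
     * it has to branch on the configuration to pick the right class.
     */
    static BufferedLogChannel allocate(long logId, boolean entryLogPerLedgerEnabled) {
        if (entryLogPerLedgerEnabled) {
            return new BufferedLogChannelWithDirInfo(logId);
        } else {
            return new BufferedLogChannel(logId);
        }
    }

    public static void main(String[] args) {
        System.out.println(allocate(1L, true).getClass().getSimpleName());
        System.out.println(allocate(2L, false).getClass().getSimpleName());
    }
}
```

The if/else in allocate is the part the comment is reluctant about: the allocator would need to know about a configuration option that otherwise only matters to the per-ledger entry log manager.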
[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r189063807

## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java ##

@@ -0,0 +1,463 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import io.netty.buffer.ByteBuf;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.commons.lang.mutable.MutableInt;
+
+@Slf4j
+class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
+
+    static class EntryLogAndLockTuple {
+        private final Lock ledgerLock;
+        private BufferedLogChannel entryLog;
+
+        public EntryLogAndLockTuple() {
+            ledgerLock = new ReentrantLock();
+        }
+
+        public Lock getLedgerLock() {
+            return ledgerLock;
+        }
+
+        public BufferedLogChannel getEntryLog() {
+            return entryLog;
+        }
+
+        public void setEntryLog(BufferedLogChannel entryLog) {
+            this.entryLog = entryLog;
+        }
+    }
+
+    private LoadingCache<Long, EntryLogAndLockTuple> ledgerIdEntryLogMap;
+    /*
+     * every time active logChannel is accessed from ledgerIdEntryLogMap
+     * cache, the accesstime of that entry is updated. But for certain
+     * operations we dont want to impact accessTime of the entries (like
+     * periodic flush of current active logChannels), and those operations
+     * can use this copy of references.
+     */
+    private final ConcurrentHashMap replicaOfCurrentLogChannels;
+    private final CacheLoader<Long, EntryLogAndLockTuple> entryLogAndLockTupleCacheLoader;
+    private EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
+    private final int entrylogMapAccessExpiryTimeInSeconds;
+    private final int maximumNumberOfActiveEntryLogs;
+
+    EntryLogManagerForEntryLogPerLedger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
+            EntryLoggerAllocator entryLoggerAllocator, List listeners,
+            EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) throws IOException {
+        super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
+        this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
+        this.rotatedLogChannels = new CopyOnWriteArrayList();
+        this.replicaOfCurrentLogChannels = new ConcurrentHashMap();
+        this.entrylogMapAccessExpiryTimeInSeconds = conf.getEntrylogMapAccessExpiryTimeInSeconds();
+        this.maximumNumberOfActiveEntryLogs = conf.getMaximumNumberOfActiveEntryLogs();
+        ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+        this.entryLogAndLockTupleCacheLoader = new CacheLoader<Long, EntryLogAndLockTuple>() {
+            @Override
+            public EntryLogAndLockTuple load(Long key) throws Exception {
+                return new EntryLogAndLockTuple();
+            }
+        };
+        /*
+         * Currently we are relying on access time based eviction
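The code comment in the quoted diff explains why a second map of references is kept next to the cache: reads through the cache refresh an entry's access time, while reads through the replica do not. The sketch below illustrates that idea with plain JDK collections. It deliberately swaps Guava's LoadingCache for an access-ordered LinkedHashMap with a size bound, so it is an analogy rather than the PR's implementation; the class, its methods, and the String values standing in for log channels are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class AccessOrderCacheSketch {

    // Copy of references; reading it never changes the eviction order.
    private final Map<Long, String> replica = new ConcurrentHashMap<>();
    // Access-ordered map; get() moves an entry to the "most recently used" end.
    private final LinkedHashMap<Long, String> cache;

    AccessOrderCacheSketch(int maximumNumberOfActiveEntryLogs) {
        this.cache = new LinkedHashMap<Long, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, String> eldest) {
                boolean evict = size() > maximumNumberOfActiveEntryLogs;
                if (evict) {
                    replica.remove(eldest.getKey()); // keep the replica in step with the cache
                }
                return evict;
            }
        };
    }

    synchronized void put(long ledgerId, String logChannel) {
        replica.put(ledgerId, logChannel);
        cache.put(ledgerId, logChannel);
    }

    /** Write-path style access: refreshes the entry's position in the eviction order. */
    synchronized String get(long ledgerId) {
        return cache.get(ledgerId);
    }

    /** Flush-style access: reads the replica and leaves the eviction order untouched. */
    String peek(long ledgerId) {
        return replica.get(ledgerId);
    }

    public static void main(String[] args) {
        AccessOrderCacheSketch sketch = new AccessOrderCacheSketch(2);
        sketch.put(1L, "log-1");
        sketch.put(2L, "log-2");
        sketch.peek(1L);          // does not protect ledger 1 from eviction
        sketch.put(3L, "log-3");  // evicts the least recently used entry
        System.out.println("ledger 1 still active: " + (sketch.peek(1L) != null));
    }
}
```

A periodic flush that iterated over the cache itself would keep every entry looking recently used, so nothing would ever expire; going through the replica avoids that.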
[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188817538

## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java ##
[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188817433

## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ##

@@ -85,6 +83,7 @@
         private final EntryLogMetadata entryLogMetadata;
         private final File logFile;
         private long ledgerIdAssigned = UNASSIGNED_LEDGERID;
+        volatile boolean ledgerDirFull = false;

Review comment:
Hey Sijie, I considered your concern before reintroducing this flag variable here. Moving it out would unnecessarily complicate the LedgerDirsListener logic that maintains the ledger-dir-full status in the entry log manager implementation. That's why I chose to keep it here. I don't think it is inappropriate to have it here: this BufferedLogChannel already has the final 'logFile' (File) variable, so I would consider the 'ledgerDirFull' variable an extension of the 'logFile' variable.
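As a rough illustration of the argument above, the sketch below keeps the flag next to the log file on the channel itself, so a disk-full callback only has to compare directories. Everything here is a simplified assumption: Channel stands in for BufferedLogChannel, onDiskFull stands in for whatever the LedgerDirsListener callback does in the PR, and the directory layout is invented.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

class LedgerDirFullFlagSketch {

    /** Simplified stand-in for BufferedLogChannel: the flag lives beside the log file. */
    static final class Channel {
        private final File logFile;
        volatile boolean ledgerDirFull = false;

        Channel(File logFile) {
            this.logFile = logFile;
        }

        File getLogFile() {
            return logFile;
        }
    }

    /** Hypothetical listener callback: marks every channel whose log file is in the full directory. */
    static void onDiskFull(File fullDir, List<Channel> activeChannels) {
        for (Channel channel : activeChannels) {
            if (fullDir.equals(channel.getLogFile().getParentFile())) {
                channel.ledgerDirFull = true;
            }
        }
    }

    public static void main(String[] args) {
        File dir1 = new File("ledgers-1");
        File dir2 = new File("ledgers-2");
        List<Channel> channels = new ArrayList<>();
        channels.add(new Channel(new File(dir1, "0.log")));
        channels.add(new Channel(new File(dir2, "1.log")));

        onDiskFull(dir1, channels);
        for (Channel c : channels) {
            System.out.println(c.getLogFile() + " ledgerDirFull=" + c.ledgerDirFull);
        }
    }
}
```

The alternative discussed in the thread, tracking full directories in a separate structure inside the entry log manager, would need its own bookkeeping to map each active channel back to its directory.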
[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188813910

## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java ##
[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188813031

## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java ##
[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188810829 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java ## @@ -0,0 +1,463 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.bookkeeper.bookie; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import io.netty.buffer.ByteBuf; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel; +import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.commons.lang.mutable.MutableInt; + +@Slf4j +class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase { + +static class EntryLogAndLockTuple { +private final Lock ledgerLock; +private BufferedLogChannel entryLog; + +public EntryLogAndLockTuple() { +ledgerLock = new ReentrantLock(); +} + +public Lock getLedgerLock() { +return ledgerLock; +} + +public BufferedLogChannel getEntryLog() { +return entryLog; +} + +public void setEntryLog(BufferedLogChannel entryLog) { +this.entryLog = entryLog; +} +} + +private LoadingCache<Long, EntryLogAndLockTuple> ledgerIdEntryLogMap; +/* + * every time active logChannel is accessed from ledgerIdEntryLogMap + * cache, the accesstime of that entry is updated. 
But for certain + * operations we dont want to impact accessTime of the entries (like + * periodic flush of current active logChannels), and those operations + * can use this copy of references. + */ +private final ConcurrentHashMap<Long, BufferedLogChannel> replicaOfCurrentLogChannels; +private final CacheLoader<Long, EntryLogAndLockTuple> entryLogAndLockTupleCacheLoader; +private EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus; +private final int entrylogMapAccessExpiryTimeInSeconds; +private final int maximumNumberOfActiveEntryLogs; + +EntryLogManagerForEntryLogPerLedger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, +EntryLoggerAllocator entryLoggerAllocator, List listeners, +EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) throws IOException { +super(conf, ledgerDirsManager, entryLoggerAllocator, listeners); +this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus; +this.rotatedLogChannels = new CopyOnWriteArrayList<BufferedLogChannel>(); +this.replicaOfCurrentLogChannels = new ConcurrentHashMap<Long, BufferedLogChannel>(); +this.entrylogMapAccessExpiryTimeInSeconds = conf.getEntrylogMapAccessExpiryTimeInSeconds(); +this.maximumNumberOfActiveEntryLogs = conf.getMaximumNumberOfActiveEntryLogs(); +ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener()); +this.entryLogAndLockTupleCacheLoader = new CacheLoader<Long, EntryLogAndLockTuple>() { +@Override +public EntryLogAndLockTuple load(Long key) throws Exception { +return new EntryLogAndLockTuple(); +} +}; +/* + * Currently we are relying on access time based eviction
[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188810041 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java ## @@ -0,0 +1,463 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.bookkeeper.bookie; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import io.netty.buffer.ByteBuf; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel; +import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.commons.lang.mutable.MutableInt; + +@Slf4j +class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase { + +static class EntryLogAndLockTuple { +private final Lock ledgerLock; +private BufferedLogChannel entryLog; + +public EntryLogAndLockTuple() { +ledgerLock = new ReentrantLock(); +} + +public Lock getLedgerLock() { +return ledgerLock; +} + +public BufferedLogChannel getEntryLog() { +return entryLog; +} + +public void setEntryLog(BufferedLogChannel entryLog) { +this.entryLog = entryLog; +} +} + +private LoadingCache<Long, EntryLogAndLockTuple> ledgerIdEntryLogMap; +/* + * every time active logChannel is accessed from ledgerIdEntryLogMap + * cache, the accesstime of that entry is updated. 
But for certain + * operations we dont want to impact accessTime of the entries (like + * periodic flush of current active logChannels), and those operations + * can use this copy of references. + */ +private final ConcurrentHashMap<Long, BufferedLogChannel> replicaOfCurrentLogChannels; +private final CacheLoader<Long, EntryLogAndLockTuple> entryLogAndLockTupleCacheLoader; +private EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus; +private final int entrylogMapAccessExpiryTimeInSeconds; +private final int maximumNumberOfActiveEntryLogs; + +EntryLogManagerForEntryLogPerLedger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, +EntryLoggerAllocator entryLoggerAllocator, List listeners, +EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) throws IOException { +super(conf, ledgerDirsManager, entryLoggerAllocator, listeners); +this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus; +this.rotatedLogChannels = new CopyOnWriteArrayList<BufferedLogChannel>(); +this.replicaOfCurrentLogChannels = new ConcurrentHashMap<Long, BufferedLogChannel>(); +this.entrylogMapAccessExpiryTimeInSeconds = conf.getEntrylogMapAccessExpiryTimeInSeconds(); +this.maximumNumberOfActiveEntryLogs = conf.getMaximumNumberOfActiveEntryLogs(); +ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener()); +this.entryLogAndLockTupleCacheLoader = new CacheLoader<Long, EntryLogAndLockTuple>() { +@Override +public EntryLogAndLockTuple load(Long key) throws Exception { +return new EntryLogAndLockTuple(); +} +}; +/* + * Currently we are relying on access time based eviction
[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188809247 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java ## @@ -0,0 +1,463 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.bookkeeper.bookie; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import io.netty.buffer.ByteBuf; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel; +import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.commons.lang.mutable.MutableInt; + +@Slf4j +class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase { + +static class EntryLogAndLockTuple { +private final Lock ledgerLock; +private BufferedLogChannel entryLog; + +public EntryLogAndLockTuple() { +ledgerLock = new ReentrantLock(); +} + +public Lock getLedgerLock() { +return ledgerLock; +} + +public BufferedLogChannel getEntryLog() { +return entryLog; +} + +public void setEntryLog(BufferedLogChannel entryLog) { +this.entryLog = entryLog; +} +} + +private LoadingCache<Long, EntryLogAndLockTuple> ledgerIdEntryLogMap; +/* + * every time active logChannel is accessed from ledgerIdEntryLogMap + * cache, the accesstime of that entry is updated. 
But for certain + * operations we dont want to impact accessTime of the entries (like + * periodic flush of current active logChannels), and those operations + * can use this copy of references. + */ +private final ConcurrentHashMap<Long, BufferedLogChannel> replicaOfCurrentLogChannels; +private final CacheLoader<Long, EntryLogAndLockTuple> entryLogAndLockTupleCacheLoader; +private EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus; +private final int entrylogMapAccessExpiryTimeInSeconds; +private final int maximumNumberOfActiveEntryLogs; + +EntryLogManagerForEntryLogPerLedger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, +EntryLoggerAllocator entryLoggerAllocator, List listeners, +EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) throws IOException { +super(conf, ledgerDirsManager, entryLoggerAllocator, listeners); +this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus; +this.rotatedLogChannels = new CopyOnWriteArrayList<BufferedLogChannel>(); +this.replicaOfCurrentLogChannels = new ConcurrentHashMap<Long, BufferedLogChannel>(); +this.entrylogMapAccessExpiryTimeInSeconds = conf.getEntrylogMapAccessExpiryTimeInSeconds(); +this.maximumNumberOfActiveEntryLogs = conf.getMaximumNumberOfActiveEntryLogs(); +ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener()); +this.entryLogAndLockTupleCacheLoader = new CacheLoader<Long, EntryLogAndLockTuple>() { +@Override +public EntryLogAndLockTuple load(Long key) throws Exception { +return new EntryLogAndLockTuple(); +} +}; +/* + * Currently we are relying on access time based eviction
[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188808410 ## File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java ## @@ -142,6 +151,237 @@ public void testCreateNewLogWithNoWritableLedgerDirs() throws Exception { assertTrue("Wrong log id", entryLogManager.getCurrentLogId() > 1); } +/* + * entryLogPerLedger is enabled and various scenarios of entrylogcreation are tested + */ +@Test +public void testEntryLogPerLedgerCreationWithPreAllocation() throws Exception { +/* + * I wish I could shorten this testcase or split it into multiple testcases, + * but I want to cover a scenario and it requires multiple operations in + * sequence and validations along the way. Please bear with the length of this + * testcase, I added as many comments as I can to simplify it. + */ + +ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + +// Creating a new configuration with a number of ledger directories. 
+conf.setLedgerDirNames(ledgerDirs); +conf.setIsForceGCAllowWhenNoSpace(true); +// preAllocation is Enabled +conf.setEntryLogFilePreAllocationEnabled(true); +conf.setEntryLogPerLedgerEnabled(true); +LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), +new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); +EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); +EntryLoggerAllocator entryLoggerAllocator = entryLogger.entryLoggerAllocator; +EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger +.getEntryLogManager(); + +/* + * no entrylog will be created during initialization + */ +int expectedPreAllocatedLogID = -1; +Assert.assertEquals("PreallocatedlogId after initialization of Entrylogger", +expectedPreAllocatedLogID, entryLoggerAllocator.getPreallocatedLogId()); + +int numOfLedgers = 6; + +for (long i = 0; i < numOfLedgers; i++) { +/* since we are starting creation of new ledgers, entrylogid will be ledgerid */ +entryLogManager.createNewLog(i); +} + +Thread.sleep(100); +/* + * preallocation is enabled so though entryLogId starts with 0, preallocatedLogId would be equal to numOfLedgers + */ +expectedPreAllocatedLogID = numOfLedgers; +Assert.assertEquals("PreallocatedlogId after creation of logs for ledgers", expectedPreAllocatedLogID, +entryLoggerAllocator.getPreallocatedLogId()); +Assert.assertEquals("Number of current ", numOfLedgers, +entryLogManager.getCopyOfCurrentLogs().size()); +Assert.assertEquals("Number of LogChannels to flush", 0, +entryLogManager.getRotatedLogChannels().size()); + +// create dummy entrylog file with id - (expectedPreAllocatedLogID + 1) +String logFileName = Long.toHexString(expectedPreAllocatedLogID + 1) + ".log"; +File dir = ledgerDirsManager.pickRandomWritableDir(); +LOG.info("Picked this directory: " + dir); +File newLogFile = new File(dir, logFileName); +newLogFile.createNewFile(); + +/* + * since there is 
already preexisting entrylog file with id - + * (expectedPreAllocatedLogIDDuringInitialization + 1), when new + * entrylog is created it should have + * (expectedPreAllocatedLogIDDuringInitialization + 2) id + */ +long rotatedLedger = 1L; +entryLogManager.createNewLog(rotatedLedger); +Thread.sleep(100); +expectedPreAllocatedLogID = expectedPreAllocatedLogID + 2; +Assert.assertEquals("PreallocatedlogId ", +expectedPreAllocatedLogID, entryLoggerAllocator.getPreallocatedLogId()); +Assert.assertEquals("Number of current ", numOfLedgers, +entryLogManager.getCopyOfCurrentLogs().size()); +List<BufferedLogChannel> rotatedLogChannels = entryLogManager.getRotatedLogChannels(); +Assert.assertEquals("Number of LogChannels rotated", 1, rotatedLogChannels.size()); +Assert.assertEquals("Rotated logchannel logid", rotatedLedger, rotatedLogChannels.iterator().next().getLogId()); +entryLogger.flush(); +/* + * when flush is called all the rotatedlogchannels are flushed and + * removed from rotatedlogchannels list. But here since entrylogId - 0, + * is not yet rotated and flushed yet, getLeastUnflushedLogId will still + * return 0. + */ +rotatedLogChannels =
[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188808144 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java ## @@ -0,0 +1,463 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.bookkeeper.bookie; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import io.netty.buffer.ByteBuf; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel; +import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.commons.lang.mutable.MutableInt; + +@Slf4j +class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase { + +static class EntryLogAndLockTuple { +private final Lock ledgerLock; +private BufferedLogChannel entryLog; + +public EntryLogAndLockTuple() { +ledgerLock = new ReentrantLock(); +} + +public Lock getLedgerLock() { +return ledgerLock; +} + +public BufferedLogChannel getEntryLog() { +return entryLog; +} + +public void setEntryLog(BufferedLogChannel entryLog) { +this.entryLog = entryLog; +} +} + +private LoadingCache<Long, EntryLogAndLockTuple> ledgerIdEntryLogMap; +/* + * every time active logChannel is accessed from ledgerIdEntryLogMap + * cache, the accesstime of that entry is updated. 
But for certain + * operations we dont want to impact accessTime of the entries (like + * periodic flush of current active logChannels), and those operations + * can use this copy of references. + */ +private final ConcurrentHashMap<Long, BufferedLogChannel> replicaOfCurrentLogChannels; +private final CacheLoader<Long, EntryLogAndLockTuple> entryLogAndLockTupleCacheLoader; +private EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus; Review comment: yes, it can be This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
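For readers following the hunk quoted above, here is a minimal sketch of the per-ledger lock idea behind `EntryLogAndLockTuple`. It is an illustration under stated assumptions, not the code in the pull request: a plain `ConcurrentHashMap` stands in for the Guava `LoadingCache` (so there is no access-time eviction here), and `PerLedgerLogSketch`, `FakeLogChannel`, `LogAndLock`, `getOrCreateLog` and `runConcurrently` are hypothetical names introduced only for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class PerLedgerLogSketch {

    /** Hypothetical stand-in for BufferedLogChannel; it only carries a log id. */
    static final class FakeLogChannel {
        final long logId;

        FakeLogChannel(long logId) {
            this.logId = logId;
        }
    }

    /** Same shape as EntryLogAndLockTuple in the diff: one lock and one mutable log per ledger. */
    static final class LogAndLock {
        final Lock ledgerLock = new ReentrantLock();
        FakeLogChannel entryLog; // read and written only while holding ledgerLock
    }

    // Assumption: a plain map replaces the LoadingCache, so entries are never evicted.
    private final Map<Long, LogAndLock> ledgerIdToLog = new ConcurrentHashMap<>();
    private final AtomicInteger nextLogId = new AtomicInteger(0);

    /** Returns the ledger's log, creating it at most once even under concurrent callers. */
    FakeLogChannel getOrCreateLog(long ledgerId) {
        LogAndLock tuple = ledgerIdToLog.computeIfAbsent(ledgerId, id -> new LogAndLock());
        tuple.ledgerLock.lock();
        try {
            if (tuple.entryLog == null) {
                tuple.entryLog = new FakeLogChannel(nextLogId.getAndIncrement());
            }
            return tuple.entryLog;
        } finally {
            tuple.ledgerLock.unlock();
        }
    }

    int createdLogs() {
        return nextLogId.get();
    }

    /** Starts threadsPerLedger threads for each ledger at once and returns how many logs were created. */
    static int runConcurrently(int numLedgers, int threadsPerLedger) {
        PerLedgerLogSketch manager = new PerLedgerLogSketch();
        CountDownLatch startLatch = new CountDownLatch(1);
        Thread[] threads = new Thread[numLedgers * threadsPerLedger];
        int index = 0;
        for (long i = 0; i < numLedgers; i++) {
            for (int j = 0; j < threadsPerLedger; j++) {
                final long ledgerId = i;
                threads[index] = new Thread(() -> {
                    try {
                        startLatch.await(); // hold every thread until all are ready
                        manager.getOrCreateLog(ledgerId);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                threads[index++].start();
            }
        }
        startLatch.countDown();
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return manager.createdLogs();
    }

    public static void main(String[] args) {
        System.out.println("entry logs created: " + runConcurrently(10, 10));
    }
}
```

With 10 ledgers and 10 threads per ledger, the sketch creates exactly 10 logs, one per ledger, because the null check and the assignment both happen under that ledger's lock. Locking per ledger rather than globally lets creations for different ledgers proceed in parallel.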
[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188808007 ## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java ## @@ -0,0 +1,463 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.bookkeeper.bookie; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import io.netty.buffer.ByteBuf; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel; +import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.commons.lang.mutable.MutableInt; + +@Slf4j +class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase { + +static class EntryLogAndLockTuple { +private final Lock ledgerLock; +private BufferedLogChannel entryLog; + +public EntryLogAndLockTuple() { +ledgerLock = new ReentrantLock(); +} + +public Lock getLedgerLock() { +return ledgerLock; +} + +public BufferedLogChannel getEntryLog() { +return entryLog; +} + +public void setEntryLog(BufferedLogChannel entryLog) { +this.entryLog = entryLog; +} +} + +private LoadingCache<Long, EntryLogAndLockTuple> ledgerIdEntryLogMap; +/* + * every time active logChannel is accessed from ledgerIdEntryLogMap + * cache, the accesstime of that entry is updated. 
But for certain + * operations we dont want to impact accessTime of the entries (like + * periodic flush of current active logChannels), and those operations + * can use this copy of references. + */ +private final ConcurrentHashMap<Long, BufferedLogChannel> replicaOfCurrentLogChannels; +private final CacheLoader<Long, EntryLogAndLockTuple> entryLogAndLockTupleCacheLoader; +private EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus; +private final int entrylogMapAccessExpiryTimeInSeconds; +private final int maximumNumberOfActiveEntryLogs; + +EntryLogManagerForEntryLogPerLedger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, +EntryLoggerAllocator entryLoggerAllocator, List listeners, +EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) throws IOException { +super(conf, ledgerDirsManager, entryLoggerAllocator, listeners); +this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus; +this.rotatedLogChannels = new CopyOnWriteArrayList<BufferedLogChannel>(); +this.replicaOfCurrentLogChannels = new ConcurrentHashMap<Long, BufferedLogChannel>(); +this.entrylogMapAccessExpiryTimeInSeconds = conf.getEntrylogMapAccessExpiryTimeInSeconds(); +this.maximumNumberOfActiveEntryLogs = conf.getMaximumNumberOfActiveEntryLogs(); +ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener()); +this.entryLogAndLockTupleCacheLoader = new CacheLoader<Long, EntryLogAndLockTuple>() { +@Override +public EntryLogAndLockTuple load(Long key) throws Exception { +return new EntryLogAndLockTuple(); +} +}; +/* + * Currently we are relying on access time based eviction