[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation

2018-05-18 Thread GitBox
reddycharan commented on a change in pull request #1391: Issue #570: 
EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r189416753
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
 ##
 @@ -268,4 +508,64 @@ public void testCreateNewLogAndCompactionLog() throws Exception {
 "previousAllocatedEntryLogId after 2 times createNewLog is called", 2,
 el.getPreviousAllocatedEntryLogId());
 }
+
+/*
+ * In this testcase entrylogs for ledgers are tried to create concurrently.
+ */
+@Test
+public void testConcurrentEntryLogCreations() throws Exception {
+ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+// Creating a new configuration with a number of ledger directories.
+conf.setLedgerDirNames(ledgerDirs);
+// pre-allocation is enabled
+conf.setEntryLogFilePreAllocationEnabled(true);
+conf.setEntryLogPerLedgerEnabled(true);
+LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger)
+entryLogger.getEntryLogManager();
+
+int numOfLedgers = 10;
+int numOfThreadsForSameLedger = 10;
+AtomicInteger createdEntryLogs = new AtomicInteger(0);
+CountDownLatch startLatch = new CountDownLatch(1);
+CountDownLatch createdLatch = new CountDownLatch(numOfLedgers * numOfThreadsForSameLedger);
+
+for (long i = 0; i < numOfLedgers; i++) {
+for (int j = 0; j < numOfThreadsForSameLedger; j++) {
+long ledgerId = i;
+new Thread() {
+@Override
+public void run() {
 
 Review comment:
   Changed it to a lambda.
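
   For illustration only, a minimal sketch of what the lambda form of the loop above might look like. The `createLog` parameter is a hypothetical stand-in for the `createNewLog(ledgerId)` call on the entry log manager; the class and method names are assumptions and not part of the pull request.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongConsumer;

class ConcurrentCreationSketch {

    /**
     * Starts numOfLedgers * numOfThreadsForSameLedger threads, releases them
     * together, and returns how many calls to createLog completed normally.
     * createLog is a hypothetical stand-in for createNewLog(ledgerId).
     */
    static int runConcurrently(int numOfLedgers, int numOfThreadsForSameLedger, LongConsumer createLog)
            throws InterruptedException {
        AtomicInteger createdEntryLogs = new AtomicInteger(0);
        CountDownLatch startLatch = new CountDownLatch(1);
        CountDownLatch createdLatch = new CountDownLatch(numOfLedgers * numOfThreadsForSameLedger);

        for (long i = 0; i < numOfLedgers; i++) {
            for (int j = 0; j < numOfThreadsForSameLedger; j++) {
                long ledgerId = i; // effectively final copy captured by the lambda
                new Thread(() -> {
                    try {
                        startLatch.await(); // block until the main thread releases the latch
                        createLog.accept(ledgerId);
                        createdEntryLogs.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (RuntimeException e) {
                        // a failed creation is simply not counted
                    } finally {
                        createdLatch.countDown();
                    }
                }).start();
            }
        }

        startLatch.countDown(); // release all threads at once
        if (!createdLatch.await(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("threads did not finish in time");
        }
        return createdEntryLogs.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runConcurrently(10, 10, ledgerId -> { }));
    }
}
```

   The lambda replaces the anonymous `Thread` subclass, and the two latches keep the same roles as in the test: one releases all threads together, the other waits for them to finish.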


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation

2018-05-18 Thread GitBox
reddycharan commented on a change in pull request #1391: Issue #570: 
EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r189413885
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
 ##
 @@ -142,6 +151,237 @@ public void testCreateNewLogWithNoWritableLedgerDirs() throws Exception {
 assertTrue("Wrong log id", entryLogManager.getCurrentLogId() > 1);
 }
 
+/*
+ * entryLogPerLedger is enabled and various scenarios of entrylogcreation are tested
+ */
+@Test
+public void testEntryLogPerLedgerCreationWithPreAllocation() throws Exception {
+/*
+ * I wish I could shorten this testcase or split it into multiple testcases,
+ * but I want to cover a scenario and it requires multiple operations in
+ * sequence and validations along the way. Please bear with the length of this
+ * testcase, I added as many comments as I can to simplify it.
+ */
+
+ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+// Creating a new configuration with a number of ledger directories.
+conf.setLedgerDirNames(ledgerDirs);
+conf.setIsForceGCAllowWhenNoSpace(true);
+// preAllocation is Enabled
+conf.setEntryLogFilePreAllocationEnabled(true);
+conf.setEntryLogPerLedgerEnabled(true);
+LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+EntryLoggerAllocator entryLoggerAllocator = entryLogger.entryLoggerAllocator;
+EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
+.getEntryLogManager();
+
+/*
+ * no entrylog will be created during initialization
+ */
+int expectedPreAllocatedLogID = -1;
+Assert.assertEquals("PreallocatedlogId after initialization of Entrylogger",
+expectedPreAllocatedLogID, entryLoggerAllocator.getPreallocatedLogId());
+
+int numOfLedgers = 6;
+
+for (long i = 0; i < numOfLedgers; i++) {
+/* since we are starting creation of new ledgers, entrylogid will be ledgerid */
+entryLogManager.createNewLog(i);
+}
+
+Thread.sleep(100);
 
 Review comment:
   Guava to the rescue! Removed these sleeps by using newDirectExecutorService()
for the EntryLoggerAllocator's allocatorExecutor.
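
   As a rough illustration of why a direct executor makes the sleeps unnecessary: the task runs on the calling thread, so its effect is visible as soon as the submitting call returns. The sketch below uses a plain JDK `Executor` (`Runnable::run`) as a stand-in for Guava's `MoreExecutors.newDirectExecutorService()`. The `Allocator` class is a hypothetical simplification, not the real EntryLoggerAllocator.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

class DirectExecutorSketch {

    /** Hypothetical, much-simplified allocator; not the real EntryLoggerAllocator. */
    static class Allocator {
        private final Executor allocatorExecutor;
        private final AtomicInteger preallocatedLogId = new AtomicInteger(-1);

        Allocator(Executor allocatorExecutor) {
            this.allocatorExecutor = allocatorExecutor;
        }

        /** Submits the allocation of the next log id to the executor. */
        void createNewLog() {
            allocatorExecutor.execute(() -> preallocatedLogId.incrementAndGet());
        }

        int getPreallocatedLogId() {
            return preallocatedLogId.get();
        }
    }

    /** Runs each task on the calling thread before execute() returns. */
    static Executor directExecutor() {
        return Runnable::run;
    }

    public static void main(String[] args) {
        Allocator allocator = new Allocator(directExecutor());
        for (int i = 0; i < 6; i++) {
            allocator.createNewLog();
        }
        // No Thread.sleep needed: every submitted task has already run.
        System.out.println(allocator.getPreallocatedLogId());
    }
}
```

   With a thread-pool executor in the same position, a test would have to wait or poll before asserting on the preallocated id. With the direct executor the assertion can follow the call immediately.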




[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation

2018-05-17 Thread GitBox
reddycharan commented on a change in pull request #1391: Issue #570: 
EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r189074159
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -85,6 +83,7 @@
 private final EntryLogMetadata entryLogMetadata;
 private final File logFile;
 private long ledgerIdAssigned = UNASSIGNED_LEDGERID;
+volatile boolean ledgerDirFull = false;
 
 Review comment:
   I considered that as well: having a new class BufferedLogChannelWithDirInfo 
that extends BufferedLogChannel inside the EntryLogManagerForEntryLogPerLedger 
class. But the BufferedLogChannel instance is created by EntryLoggerAllocator 
https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java#L162
 , so if I create a new BufferedLogChannelWithDirInfo class, then I would need 
an if/else block there based on the conf value (which I'm not very inclined to 
do, but I can live with it).
   
   Is that OK?
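
   To make the trade-off concrete, here is a minimal sketch of the subclass alternative. Everything in it is a simplified, hypothetical stand-in: the real BufferedLogChannel takes different constructor arguments, and `allocate` only imitates the branch that the allocator would need.

```java
class ChannelSubclassSketch {

    /** Hypothetical stand-in for EntryLogger.BufferedLogChannel. */
    static class BufferedLogChannel {
        final String logFile;

        BufferedLogChannel(String logFile) {
            this.logFile = logFile;
        }
    }

    /** Subclass that carries the directory status only for the per-ledger manager. */
    static class BufferedLogChannelWithDirInfo extends BufferedLogChannel {
        volatile boolean ledgerDirFull = false;

        BufferedLogChannelWithDirInfo(String logFile) {
            super(logFile);
        }
    }

    /**
     * The if/else the comment refers to: the code that creates the channel
     * must choose the concrete class based on a configuration value.
     */
    static BufferedLogChannel allocate(String logFile, boolean entryLogPerLedgerEnabled) {
        if (entryLogPerLedgerEnabled) {
            return new BufferedLogChannelWithDirInfo(logFile);
        } else {
            return new BufferedLogChannel(logFile);
        }
    }

    public static void main(String[] args) {
        System.out.println(allocate("0.log", true).getClass().getSimpleName());
        System.out.println(allocate("1.log", false).getClass().getSimpleName());
    }
}
```

   The flag stays out of the base class, at the cost of a configuration-dependent branch at the single place where channels are created.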




[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation

2018-05-17 Thread GitBox
reddycharan commented on a change in pull request #1391: Issue #570: 
EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r189063807
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
 ##
 @@ -0,0 +1,463 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import io.netty.buffer.ByteBuf;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.commons.lang.mutable.MutableInt;
+
+@Slf4j
+class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
+
+static class EntryLogAndLockTuple {
+private final Lock ledgerLock;
+private BufferedLogChannel entryLog;
+
+public EntryLogAndLockTuple() {
+ledgerLock = new ReentrantLock();
+}
+
+public Lock getLedgerLock() {
+return ledgerLock;
+}
+
+public BufferedLogChannel getEntryLog() {
+return entryLog;
+}
+
+public void setEntryLog(BufferedLogChannel entryLog) {
+this.entryLog = entryLog;
+}
+}
+
+private LoadingCache ledgerIdEntryLogMap;
+/*
+ * every time active logChannel is accessed from ledgerIdEntryLogMap
+ * cache, the accesstime of that entry is updated. But for certain
+ * operations we dont want to impact accessTime of the entries (like
+ * periodic flush of current active logChannels), and those operations
+ * can use this copy of references.
+ */
+private final ConcurrentHashMap replicaOfCurrentLogChannels;
+private final CacheLoader entryLogAndLockTupleCacheLoader;
+private EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
+private final int entrylogMapAccessExpiryTimeInSeconds;
+private final int maximumNumberOfActiveEntryLogs;
+
+EntryLogManagerForEntryLogPerLedger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
+EntryLoggerAllocator entryLoggerAllocator, List listeners,
+EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) throws IOException {
+super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
+this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
+this.rotatedLogChannels = new CopyOnWriteArrayList();
+this.replicaOfCurrentLogChannels = new ConcurrentHashMap();
+this.entrylogMapAccessExpiryTimeInSeconds = conf.getEntrylogMapAccessExpiryTimeInSeconds();
+this.maximumNumberOfActiveEntryLogs = conf.getMaximumNumberOfActiveEntryLogs();
+ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+this.entryLogAndLockTupleCacheLoader = new CacheLoader() {
+@Override
+public EntryLogAndLockTuple load(Long key) throws Exception {
+return new EntryLogAndLockTuple();
+}
+};
+/*
+ * Currently we are relying on access time based eviction 

[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation

2018-05-16 Thread GitBox
reddycharan commented on a change in pull request #1391: Issue #570: 
EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188817538
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
 ##
 @@ -0,0 +1,463 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import io.netty.buffer.ByteBuf;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.commons.lang.mutable.MutableInt;
+
+@Slf4j
+class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
+
+static class EntryLogAndLockTuple {
+private final Lock ledgerLock;
+private BufferedLogChannel entryLog;
+
+public EntryLogAndLockTuple() {
+ledgerLock = new ReentrantLock();
+}
+
+public Lock getLedgerLock() {
+return ledgerLock;
+}
+
+public BufferedLogChannel getEntryLog() {
+return entryLog;
+}
+
+public void setEntryLog(BufferedLogChannel entryLog) {
+this.entryLog = entryLog;
+}
+}
+
+private LoadingCache ledgerIdEntryLogMap;
+/*
+ * every time active logChannel is accessed from ledgerIdEntryLogMap
+ * cache, the accesstime of that entry is updated. But for certain
+ * operations we dont want to impact accessTime of the entries (like
+ * periodic flush of current active logChannels), and those operations
+ * can use this copy of references.
+ */
+private final ConcurrentHashMap replicaOfCurrentLogChannels;
+private final CacheLoader entryLogAndLockTupleCacheLoader;
+private EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
+private final int entrylogMapAccessExpiryTimeInSeconds;
+private final int maximumNumberOfActiveEntryLogs;
+
+EntryLogManagerForEntryLogPerLedger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
+EntryLoggerAllocator entryLoggerAllocator, List listeners,
+EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) throws IOException {
+super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
+this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
+this.rotatedLogChannels = new CopyOnWriteArrayList();
+this.replicaOfCurrentLogChannels = new ConcurrentHashMap();
+this.entrylogMapAccessExpiryTimeInSeconds = conf.getEntrylogMapAccessExpiryTimeInSeconds();
+this.maximumNumberOfActiveEntryLogs = conf.getMaximumNumberOfActiveEntryLogs();
+ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+this.entryLogAndLockTupleCacheLoader = new CacheLoader() {
+@Override
+public EntryLogAndLockTuple load(Long key) throws Exception {
+return new EntryLogAndLockTuple();
+}
+};
+/*
+ * Currently we are relying on access time based eviction 

[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation

2018-05-16 Thread GitBox
reddycharan commented on a change in pull request #1391: Issue #570: 
EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188817433
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -85,6 +83,7 @@
 private final EntryLogMetadata entryLogMetadata;
 private final File logFile;
 private long ledgerIdAssigned = UNASSIGNED_LEDGERID;
+volatile boolean ledgerDirFull = false;
 
 Review comment:
   Hey Sijie, I considered your concern before reintroducing this flag 
variable here. Moving it would unnecessarily complicate the LedgerDirsListener 
logic that maintains the ledger dir full status in the entrylogmanager 
implementation. That's why I chose to keep it here. 
   
   I don't think it is inappropriate to have it here: this BufferedLogChannel 
already has a final 'logFile' (File) variable, so I would consider the 
'ledgerDirFull' variable an extension of the 'logFile' variable.
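
   A minimal sketch of the arrangement argued for here, under simplifying assumptions: `Channel` is a hypothetical stand-in for BufferedLogChannel, and `diskFull` imitates what a LedgerDirsListener callback could do. The sketch compares the log file's parent directory directly with the reported directory, which ignores any subdirectory layout the real bookie uses.

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;

class LedgerDirFullFlagSketch {

    /** Hypothetical stand-in for BufferedLogChannel: the flag sits next to logFile. */
    static class Channel {
        private final File logFile;
        volatile boolean ledgerDirFull = false;

        Channel(File logFile) {
            this.logFile = logFile;
        }

        File getLogFile() {
            return logFile;
        }
    }

    /** Imitates a listener callback: mark every channel whose file lives in the full directory. */
    static void diskFull(File disk, List<Channel> channels) {
        for (Channel channel : channels) {
            if (disk.equals(channel.getLogFile().getParentFile())) {
                channel.ledgerDirFull = true;
            }
        }
    }

    public static void main(String[] args) {
        List<Channel> channels = Arrays.asList(
                new Channel(new File("/ledgers/a/0.log")),
                new Channel(new File("/ledgers/b/1.log")));
        diskFull(new File("/ledgers/a"), channels);
        for (Channel channel : channels) {
            System.out.println(channel.getLogFile() + " full=" + channel.ledgerDirFull);
        }
    }
}
```

   Because the flag travels with the channel, the listener needs no separate map from directories to status. The field is `volatile` so that a write from the listener thread is visible to writer threads.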




[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation

2018-05-16 Thread GitBox
reddycharan commented on a change in pull request #1391: Issue #570: 
EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188813910
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
 ##
 @@ -0,0 +1,463 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import io.netty.buffer.ByteBuf;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.commons.lang.mutable.MutableInt;
+
+@Slf4j
+class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
+
+static class EntryLogAndLockTuple {
+private final Lock ledgerLock;
+private BufferedLogChannel entryLog;
+
+public EntryLogAndLockTuple() {
+ledgerLock = new ReentrantLock();
+}
+
+public Lock getLedgerLock() {
+return ledgerLock;
+}
+
+public BufferedLogChannel getEntryLog() {
+return entryLog;
+}
+
+public void setEntryLog(BufferedLogChannel entryLog) {
+this.entryLog = entryLog;
+}
+}
+
+private LoadingCache ledgerIdEntryLogMap;
+/*
+ * every time active logChannel is accessed from ledgerIdEntryLogMap
+ * cache, the accesstime of that entry is updated. But for certain
+ * operations we dont want to impact accessTime of the entries (like
+ * periodic flush of current active logChannels), and those operations
+ * can use this copy of references.
+ */
+private final ConcurrentHashMap replicaOfCurrentLogChannels;
+private final CacheLoader entryLogAndLockTupleCacheLoader;
+private EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
+private final int entrylogMapAccessExpiryTimeInSeconds;
+private final int maximumNumberOfActiveEntryLogs;
+
+EntryLogManagerForEntryLogPerLedger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
+EntryLoggerAllocator entryLoggerAllocator, List listeners,
+EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) throws IOException {
+super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
+this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
+this.rotatedLogChannels = new CopyOnWriteArrayList();
+this.replicaOfCurrentLogChannels = new ConcurrentHashMap();
+this.entrylogMapAccessExpiryTimeInSeconds = conf.getEntrylogMapAccessExpiryTimeInSeconds();
+this.maximumNumberOfActiveEntryLogs = conf.getMaximumNumberOfActiveEntryLogs();
+ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+this.entryLogAndLockTupleCacheLoader = new CacheLoader() {
+@Override
+public EntryLogAndLockTuple load(Long key) throws Exception {
+return new EntryLogAndLockTuple();
+}
+};
+/*
+ * Currently we are relying on access time based eviction 

[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation

2018-05-16 Thread GitBox
reddycharan commented on a change in pull request #1391: Issue #570: 
EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188813031
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
 ##
 @@ -0,0 +1,463 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import io.netty.buffer.ByteBuf;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.commons.lang.mutable.MutableInt;
+
+@Slf4j
+class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
+
+static class EntryLogAndLockTuple {
+private final Lock ledgerLock;
+private BufferedLogChannel entryLog;
+
+public EntryLogAndLockTuple() {
+ledgerLock = new ReentrantLock();
+}
+
+public Lock getLedgerLock() {
+return ledgerLock;
+}
+
+public BufferedLogChannel getEntryLog() {
+return entryLog;
+}
+
+public void setEntryLog(BufferedLogChannel entryLog) {
+this.entryLog = entryLog;
+}
+}
+
+private LoadingCache ledgerIdEntryLogMap;
+/*
+ * every time active logChannel is accessed from ledgerIdEntryLogMap
+ * cache, the accesstime of that entry is updated. But for certain
+ * operations we dont want to impact accessTime of the entries (like
+ * periodic flush of current active logChannels), and those operations
+ * can use this copy of references.
+ */
+private final ConcurrentHashMap replicaOfCurrentLogChannels;
+private final CacheLoader entryLogAndLockTupleCacheLoader;
+private EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
+private final int entrylogMapAccessExpiryTimeInSeconds;
+private final int maximumNumberOfActiveEntryLogs;
+
+EntryLogManagerForEntryLogPerLedger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
+EntryLoggerAllocator entryLoggerAllocator, List listeners,
+EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) throws IOException {
+super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
+this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
+this.rotatedLogChannels = new CopyOnWriteArrayList();
+this.replicaOfCurrentLogChannels = new ConcurrentHashMap();
+this.entrylogMapAccessExpiryTimeInSeconds = conf.getEntrylogMapAccessExpiryTimeInSeconds();
+this.maximumNumberOfActiveEntryLogs = conf.getMaximumNumberOfActiveEntryLogs();
+ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+this.entryLogAndLockTupleCacheLoader = new CacheLoader() {
+@Override
+public EntryLogAndLockTuple load(Long key) throws Exception {
+return new EntryLogAndLockTuple();
+}
+};
+/*
+ * Currently we are relying on access time based eviction 
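The field comment in the hunk above describes keeping a second map of channel references so that housekeeping reads do not refresh cache access times. A minimal stdlib-only sketch of that idea follows. It is not the PR's code: the class `AccessTimeReplicaSketch`, its methods, the logical clock, and the use of `String` in place of a log channel are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: an access-time-tracking map plus a side copy for reads that must not count as access. */
final class AccessTimeReplicaSketch {
    /** A cached value together with the logical time it was last touched. */
    private static final class Timed {
        final String channel;
        volatile long lastAccess;

        Timed(String channel, long now) {
            this.channel = channel;
            this.lastAccess = now;
        }
    }

    private final Map<Long, Timed> cache = new ConcurrentHashMap<>();
    private final Map<Long, String> replica = new ConcurrentHashMap<>();
    // Logical clock (an assumption of this sketch); it ticks on every cache access.
    private final AtomicLong clock = new AtomicLong();

    void put(long ledgerId, String channel) {
        cache.put(ledgerId, new Timed(channel, clock.incrementAndGet()));
        replica.put(ledgerId, channel);
    }

    /** Write path: going through the cache refreshes the entry's access time. */
    String getForWrite(long ledgerId) {
        Timed t = cache.get(ledgerId);
        if (t == null) {
            return null;
        }
        t.lastAccess = clock.incrementAndGet();
        return t.channel;
    }

    /** Flush path: reads the replica only, so access times stay untouched. */
    List<String> channelsForFlush() {
        return new ArrayList<>(replica.values());
    }

    /** Returns the logical access time of an entry, or -1 if it is absent. */
    long lastAccess(long ledgerId) {
        Timed t = cache.get(ledgerId);
        return t == null ? -1 : t.lastAccess;
    }
}
```

In this sketch a periodic flusher would call `channelsForFlush()`, so an idle ledger still ages toward eviction, while writers call `getForWrite()` and keep their entry fresh.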

[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation

2018-05-16 Thread GitBox
reddycharan commented on a change in pull request #1391: Issue #570: 
EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188810829
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
 ##
 @@ -0,0 +1,463 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import io.netty.buffer.ByteBuf;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.commons.lang.mutable.MutableInt;
+
+@Slf4j
+class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
+
+static class EntryLogAndLockTuple {
+private final Lock ledgerLock;
+private BufferedLogChannel entryLog;
+
+public EntryLogAndLockTuple() {
+ledgerLock = new ReentrantLock();
+}
+
+public Lock getLedgerLock() {
+return ledgerLock;
+}
+
+public BufferedLogChannel getEntryLog() {
+return entryLog;
+}
+
+public void setEntryLog(BufferedLogChannel entryLog) {
+this.entryLog = entryLog;
+}
+}
+
+private LoadingCache<Long, EntryLogAndLockTuple> ledgerIdEntryLogMap;
+/*
+ * Every time an active logChannel is accessed from the ledgerIdEntryLogMap
+ * cache, the access time of that entry is updated. But for certain
+ * operations we don't want to impact the access time of the entries (like
+ * the periodic flush of current active logChannels), and those operations
+ * can use this copy of references.
+ */
+private final ConcurrentHashMap replicaOfCurrentLogChannels;
+private final CacheLoader<Long, EntryLogAndLockTuple> entryLogAndLockTupleCacheLoader;
+private EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
+private final int entrylogMapAccessExpiryTimeInSeconds;
+private final int maximumNumberOfActiveEntryLogs;
+
+EntryLogManagerForEntryLogPerLedger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
+EntryLoggerAllocator entryLoggerAllocator, List listeners,
+EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) throws IOException {
+super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
+this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
+this.rotatedLogChannels = new CopyOnWriteArrayList();
+this.replicaOfCurrentLogChannels = new ConcurrentHashMap();
+this.entrylogMapAccessExpiryTimeInSeconds = conf.getEntrylogMapAccessExpiryTimeInSeconds();
+this.maximumNumberOfActiveEntryLogs = conf.getMaximumNumberOfActiveEntryLogs();
+ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+this.entryLogAndLockTupleCacheLoader = new CacheLoader<Long, EntryLogAndLockTuple>() {
+@Override
+public EntryLogAndLockTuple load(Long key) throws Exception {
+return new EntryLogAndLockTuple();
+}
+};
+/*
+ * Currently we are relying on access time based eviction 
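The hunk above pairs each ledger with an `EntryLogAndLockTuple` that a cache loader creates on demand. The sketch below illustrates the same pattern with the JDK only: one lock-plus-log holder per ledger, created on first use, with log creation guarded by that ledger's lock. It swaps the Guava loading cache for `ConcurrentHashMap.computeIfAbsent`, and the names `PerLedgerTupleSketch`, `Tuple`, `tupleFor`, and `getOrCreateLog` are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: one lock-plus-log holder per ledger, created on first use. */
final class PerLedgerTupleSketch {
    /** Mirrors the shape of EntryLogAndLockTuple; the String stands in for a log channel. */
    static final class Tuple {
        final Lock ledgerLock = new ReentrantLock();
        String entryLog; // guarded by ledgerLock
    }

    private final ConcurrentMap<Long, Tuple> tuples = new ConcurrentHashMap<>();

    /** Plays the role of the cache loader: an empty tuple appears the first time a ledger is seen. */
    Tuple tupleFor(long ledgerId) {
        return tuples.computeIfAbsent(ledgerId, id -> new Tuple());
    }

    /** Creates the log for a ledger at most once, even when threads race on the same ledger. */
    String getOrCreateLog(long ledgerId) {
        Tuple t = tupleFor(ledgerId);
        t.ledgerLock.lock();
        try {
            if (t.entryLog == null) {
                t.entryLog = "log-for-ledger-" + ledgerId;
            }
            return t.entryLog;
        } finally {
            t.ledgerLock.unlock();
        }
    }
}
```

Because the lock lives in the per-ledger tuple, threads writing to different ledgers do not contend with each other; only threads on the same ledger serialize.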

[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation

2018-05-16 Thread GitBox
reddycharan commented on a change in pull request #1391: Issue #570: 
EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188810041
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
 ##

[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation

2018-05-16 Thread GitBox
reddycharan commented on a change in pull request #1391: Issue #570: 
EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188809247
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
 ##

[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation

2018-05-16 Thread GitBox
reddycharan commented on a change in pull request #1391: Issue #570: 
EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188808410
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
 ##
 @@ -142,6 +151,237 @@ public void testCreateNewLogWithNoWritableLedgerDirs() throws Exception {
 assertTrue("Wrong log id", entryLogManager.getCurrentLogId() > 1);
 }
 
+/*
+ * entryLogPerLedger is enabled and various scenarios of entry log creation
+ * are tested.
+ */
+@Test
+public void testEntryLogPerLedgerCreationWithPreAllocation() throws Exception {
+/*
+ * I wish I could shorten this testcase or split it into multiple testcases,
+ * but I want to cover a scenario that requires multiple operations in
+ * sequence and validations along the way. Please bear with the length of this
+ * testcase; I added as many comments as I could to simplify it.
+ */
+
+ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+// Creating a new configuration with a number of ledger directories.
+conf.setLedgerDirNames(ledgerDirs);
+conf.setIsForceGCAllowWhenNoSpace(true);
+// preAllocation is enabled
+conf.setEntryLogFilePreAllocationEnabled(true);
+conf.setEntryLogPerLedgerEnabled(true);
+LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+EntryLoggerAllocator entryLoggerAllocator = entryLogger.entryLoggerAllocator;
+EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
+.getEntryLogManager();
+
+/*
+ * no entrylog will be created during initialization
+ */
+int expectedPreAllocatedLogID = -1;
+Assert.assertEquals("PreallocatedlogId after initialization of Entrylogger",
+expectedPreAllocatedLogID, entryLoggerAllocator.getPreallocatedLogId());
+
+int numOfLedgers = 6;
+
+for (long i = 0; i < numOfLedgers; i++) {
+/* since we are starting creation of new ledgers, entrylogid will be ledgerid */
+entryLogManager.createNewLog(i);
+}
+
+Thread.sleep(100);
+/*
+ * preallocation is enabled, so though entryLogId starts with 0,
+ * preallocatedLogId would be equal to numOfLedgers
+ */
+expectedPreAllocatedLogID = numOfLedgers;
+Assert.assertEquals("PreallocatedlogId after creation of logs for ledgers", expectedPreAllocatedLogID,
+entryLoggerAllocator.getPreallocatedLogId());
+Assert.assertEquals("Number of current ", numOfLedgers,
+entryLogManager.getCopyOfCurrentLogs().size());
+Assert.assertEquals("Number of LogChannels to flush", 0,
+entryLogManager.getRotatedLogChannels().size());
+
+// create dummy entrylog file with id - (expectedPreAllocatedLogID + 1)
+String logFileName = Long.toHexString(expectedPreAllocatedLogID + 1) + ".log";
+File dir = ledgerDirsManager.pickRandomWritableDir();
+LOG.info("Picked this directory: " + dir);
+File newLogFile = new File(dir, logFileName);
+newLogFile.createNewFile();
+
+/*
+ * since there is already a preexisting entrylog file with id
+ * (expectedPreAllocatedLogID + 1), when a new entrylog is created
+ * it should have id (expectedPreAllocatedLogID + 2)
+ */
+long rotatedLedger = 1L;
+entryLogManager.createNewLog(rotatedLedger);
+Thread.sleep(100);
+expectedPreAllocatedLogID = expectedPreAllocatedLogID + 2;
+Assert.assertEquals("PreallocatedlogId ",
+expectedPreAllocatedLogID, entryLoggerAllocator.getPreallocatedLogId());
+Assert.assertEquals("Number of current ", numOfLedgers,
+entryLogManager.getCopyOfCurrentLogs().size());
+List<BufferedLogChannel> rotatedLogChannels = entryLogManager.getRotatedLogChannels();
+Assert.assertEquals("Number of LogChannels rotated", 1, rotatedLogChannels.size());
+Assert.assertEquals("Rotated logchannel logid", rotatedLedger, rotatedLogChannels.iterator().next().getLogId());
+entryLogger.flush();
+/*
+ * when flush is called, all the rotatedlogchannels are flushed and
+ * removed from the rotatedlogchannels list. But here, since entrylogId 0
+ * is not yet rotated and flushed, getLeastUnflushedLogId will still
+ * return 0.
+ */
+rotatedLogChannels = 
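The test above expects the preallocated ID to jump from 6 to 8 once a stray file for ID 7 already exists. One way to picture that behaviour is the sketch below, which probes for the next ID whose `<hex id>.log` file is absent from every ledger directory. This is only an illustration under assumptions: `NextLogIdSketch`, `nextFreeLogId`, and `demo` are hypothetical names, and the real `EntryLoggerAllocator` may choose IDs differently.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch: pick the next entry log ID that has no file in any ledger directory. */
final class NextLogIdSketch {
    /** Same naming scheme the test uses: hexadecimal ID plus ".log". */
    static String logFileName(long logId) {
        return Long.toHexString(logId) + ".log";
    }

    /** Starts at lastAllocated + 1 and skips every ID whose log file already exists. */
    static long nextFreeLogId(long lastAllocated, List<File> dirs) {
        long candidate = lastAllocated + 1;
        while (existsInAnyDir(candidate, dirs)) {
            candidate++;
        }
        return candidate;
    }

    private static boolean existsInAnyDir(long logId, List<File> dirs) {
        for (File dir : dirs) {
            if (new File(dir, logFileName(logId)).exists()) {
                return true;
            }
        }
        return false;
    }

    /** Recreates the test's situation: ID 6 was handed out and a stray file for ID 7 exists. */
    static long demo() {
        try {
            File dir = Files.createTempDirectory("entrylogs").toFile();
            File stray = new File(dir, logFileName(7));
            try {
                stray.createNewFile();
                return nextFreeLogId(6, Collections.singletonList(dir));
            } finally {
                // Best-effort cleanup of the temporary file and directory.
                stray.delete();
                dir.delete();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Under these assumptions `demo()` skips ID 7 and settles on 8, which lines up with the `expectedPreAllocatedLogID + 2` assertion in the test.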

[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation

2018-05-16 Thread GitBox
reddycharan commented on a change in pull request #1391: Issue #570: 
EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188808144
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
 ##
 @@ -0,0 +1,463 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import io.netty.buffer.ByteBuf;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.commons.lang.mutable.MutableInt;
+
+@Slf4j
+class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
+
+static class EntryLogAndLockTuple {
+private final Lock ledgerLock;
+private BufferedLogChannel entryLog;
+
+public EntryLogAndLockTuple() {
+ledgerLock = new ReentrantLock();
+}
+
+public Lock getLedgerLock() {
+return ledgerLock;
+}
+
+public BufferedLogChannel getEntryLog() {
+return entryLog;
+}
+
+public void setEntryLog(BufferedLogChannel entryLog) {
+this.entryLog = entryLog;
+}
+}
+
+private LoadingCache<Long, EntryLogAndLockTuple> ledgerIdEntryLogMap;
+/*
+ * Every time an active logChannel is accessed from the ledgerIdEntryLogMap
+ * cache, the access time of that entry is updated. But for certain
+ * operations we don't want to impact the access time of the entries (like
+ * the periodic flush of current active logChannels), and those operations
+ * can use this copy of references.
+ */
+private final ConcurrentHashMap replicaOfCurrentLogChannels;
+private final CacheLoader<Long, EntryLogAndLockTuple> entryLogAndLockTupleCacheLoader;
+private EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
 
 Review comment:
   yes, it can be



[GitHub] reddycharan commented on a change in pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation

2018-05-16 Thread GitBox
reddycharan commented on a change in pull request #1391: Issue #570: 
EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r188808007
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
 ##
 @@ -0,0 +1,463 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import io.netty.buffer.ByteBuf;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.commons.lang.mutable.MutableInt;
+
+@Slf4j
+class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
+
+static class EntryLogAndLockTuple {
+private final Lock ledgerLock;
+private BufferedLogChannel entryLog;
+
+public EntryLogAndLockTuple() {
+ledgerLock = new ReentrantLock();
+}
+
+public Lock getLedgerLock() {
+return ledgerLock;
+}
+
+public BufferedLogChannel getEntryLog() {
+return entryLog;
+}
+
+public void setEntryLog(BufferedLogChannel entryLog) {
+this.entryLog = entryLog;
+}
+}
+
+private LoadingCache<Long, EntryLogAndLockTuple> ledgerIdEntryLogMap;
+/*
+ * Every time an active logChannel is accessed from the ledgerIdEntryLogMap
+ * cache, the access time of that entry is updated. But for certain
+ * operations we don't want to impact the access time of the entries (like
+ * the periodic flush of current active logChannels), and those operations
+ * can use this copy of references.
+ */
+private final ConcurrentHashMap replicaOfCurrentLogChannels;
+private final CacheLoader<Long, EntryLogAndLockTuple> entryLogAndLockTupleCacheLoader;
+private EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
+private final int entrylogMapAccessExpiryTimeInSeconds;
+private final int maximumNumberOfActiveEntryLogs;
+
+EntryLogManagerForEntryLogPerLedger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
+EntryLoggerAllocator entryLoggerAllocator, List listeners,
+EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) throws IOException {
+super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
+this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
+this.rotatedLogChannels = new CopyOnWriteArrayList();
+this.replicaOfCurrentLogChannels = new ConcurrentHashMap();
+this.entrylogMapAccessExpiryTimeInSeconds = conf.getEntrylogMapAccessExpiryTimeInSeconds();
+this.maximumNumberOfActiveEntryLogs = conf.getMaximumNumberOfActiveEntryLogs();
+ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+this.entryLogAndLockTupleCacheLoader = new CacheLoader<Long, EntryLogAndLockTuple>() {
+@Override
+public EntryLogAndLockTuple load(Long key) throws Exception {
+return new EntryLogAndLockTuple();
+}
+};
+/*
+ * Currently we are relying on access time based eviction