[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2018-03-02 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r171949745
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileChannelBackingCache.java
 ##
 @@ -0,0 +1,190 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.bookkeeper.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FileChannelBackingCache is used to cache RefCntFileChannels for reads.
+ * To avoid getting a released file, it adopts the design of FileInfoBackingCache.
+ * @see FileInfoBackingCache
+ */
+class FileChannelBackingCache {
+private static final Logger LOG = LoggerFactory.getLogger(FileChannelBackingCache.class);
+static final int DEAD_REF = -0xdead;
+final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+final FileLoader fileLoader;
+
+FileChannelBackingCache(FileLoader fileLoader) {
+this.fileLoader = fileLoader;
+}
+
+final ConcurrentHashMap<Long, CachedFileChannel> fileChannels = new ConcurrentHashMap<>();
+
+CachedFileChannel loadFileChannel(long logId) throws IOException {
+lock.readLock().lock();
+try {
+CachedFileChannel cachedFileChannel = fileChannels.get(logId);
+if (cachedFileChannel != null) {
+boolean retained = cachedFileChannel.tryRetain();
+checkArgument(retained);
+return cachedFileChannel;
+}
+} finally {
+lock.readLock().unlock();
+}
+
+lock.writeLock().lock();
+try {
+File file = fileLoader.load(logId);
+// get channel is used to open an existing entry log file
+// it would be better to open using read mode
+FileChannel newFc = new RandomAccessFile(file, "r").getChannel();
+CachedFileChannel cachedFileChannel = new CachedFileChannel(logId, newFc);
+fileChannels.put(logId, cachedFileChannel);
 
 Review comment:
   You need to check the return value. There can be a race condition in which another thread adds a file channel after the `get`.
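A minimal sketch of the kind of check being asked for, assuming two threads can both miss on the first lookup: publish with `putIfAbsent`, and if another thread got there first, close the freshly opened resource and use the winner instead. The old `getChannelForLogId` removed in this PR handled the same race this way with `logid2FileChannel.putIfAbsent`. `PutIfAbsentCache`, `Handle` and `load` are hypothetical names standing in for `FileChannelBackingCache`, `CachedFileChannel` and `loadFileChannel`; reference counting (`tryRetain`) is deliberately left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;

// Hypothetical sketch: "Handle" stands in for CachedFileChannel; not the merged BookKeeper code.
class PutIfAbsentCache {
    static final class Handle implements AutoCloseable {
        final long logId;
        volatile boolean closed;

        Handle(long logId) {
            this.logId = logId;
        }

        @Override
        public void close() {
            closed = true; // a real implementation would close the underlying FileChannel
        }
    }

    final ConcurrentHashMap<Long, Handle> handles = new ConcurrentHashMap<>();

    Handle load(long logId, LongFunction<Handle> opener) {
        Handle existing = handles.get(logId);
        if (existing != null) {
            return existing;
        }
        Handle fresh = opener.apply(logId);
        // putIfAbsent returns the value that was already mapped, or null if ours was stored.
        Handle winner = handles.putIfAbsent(logId, fresh);
        if (winner != null) {
            // Another thread published first: close our duplicate instead of leaking it.
            fresh.close();
            return winner;
        }
        return fresh;
    }
}
```

The same rule applies when a plain `put` is kept under a lock: its return value is the displaced mapping, and ignoring it silently leaks an open channel.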


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2018-03-02 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r171948589
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileChannelBackingCache.java
 ##
 @@ -0,0 +1,190 @@
+CachedFileChannel loadFileChannel(long logId) throws IOException {
+lock.readLock().lock();
+try {
+CachedFileChannel cachedFileChannel = fileChannels.get(logId);
+if (cachedFileChannel != null) {
+boolean retained = cachedFileChannel.tryRetain();
+checkArgument(retained);
+return cachedFileChannel;
+}
+} finally {
+lock.readLock().unlock();
+}
+
+lock.writeLock().lock();
+try {
+File file = fileLoader.load(logId);
 
 Review comment:
   Why do we need to lock while loading a file?
   
   Don't we just need to lock when putting the channel back?
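A minimal sketch of the narrower locking the question suggests, assuming the slow part is the file lookup and open: the load runs with no lock held, and the write lock is taken only to re-check and publish. `NarrowLockCache`, `loader` and `discard` are hypothetical names; the loader stands in for `fileLoader.load` plus opening the channel.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.LongFunction;

// Hypothetical sketch: do the slow load with no lock held; take the write lock only to publish.
final class NarrowLockCache<V> {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<Long, V> map = new HashMap<>();
    private final LongFunction<V> loader; // e.g. find the entry log file and open a channel
    private final Consumer<V> discard;    // e.g. close a channel that lost the race

    NarrowLockCache(LongFunction<V> loader, Consumer<V> discard) {
        this.loader = loader;
        this.discard = discard;
    }

    V get(long id) {
        lock.readLock().lock();
        try {
            V cached = map.get(id);
            if (cached != null) {
                return cached;
            }
        } finally {
            lock.readLock().unlock();
        }

        V fresh = loader.apply(id); // potentially slow I/O, done outside the lock

        V result;
        lock.writeLock().lock();
        try {
            // Re-check: another thread may have published while we were loading.
            V raced = map.putIfAbsent(id, fresh);
            result = (raced != null) ? raced : fresh;
        } finally {
            lock.writeLock().unlock();
        }
        if (result != fresh) {
            discard.accept(fresh); // release the duplicate outside the lock as well
        }
        return result;
    }
}
```

The trade-off is that two threads that miss at the same moment may both open the file, with one copy discarded. The lock in the real class may also guard against `releaseFileChannel` marking an entry dead concurrently, which this sketch does not model.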




[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2018-03-02 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r171945675
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -1094,25 +1104,40 @@ private Header getHeaderForLogId(long entryLogId) throws IOException {
 }
 }
 
-private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException {
-BufferedReadChannel fc = getFromChannels(entryLogId);
-if (fc != null) {
-return fc;
-}
-File file = findFile(entryLogId);
-// get channel is used to open an existing entry log file
-// it would be better to open using read mode
-FileChannel newFc = new RandomAccessFile(file, "r").getChannel();
-FileChannel oldFc = logid2FileChannel.putIfAbsent(entryLogId, newFc);
-if (null != oldFc) {
-newFc.close();
-newFc = oldFc;
-}
-// We set the position of the write buffer of this buffered channel to Long.MAX_VALUE
-// so that there are no overlaps with the write buffer while reading
-fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes());
-putInReadChannels(entryLogId, fc);
-return fc;
+/**
+ * Add one refCnt for BufferedReadChannel and return it; the caller needs to subtract one refCnt.
+ * @param entryLogId
+ * @return
+ * @throws IOException
+ */
+private EntryLogBufferedReadChannel getChannelForLogId(long entryLogId) throws IOException {
+try {
+EntryLogBufferedReadChannel brc;
+Callable<EntryLogBufferedReadChannel> loader = () -> {
+CachedFileChannel cachedFileChannel = fileChannelBackingCache.loadFileChannel(entryLogId);
+// We set the position of the write buffer of this buffered channel to Long.MAX_VALUE
+// so that there are no overlaps with the write buffer while reading
+return new EntryLogBufferedReadChannel(cachedFileChannel, conf.getReadBufferBytes());
+};
+do {
+// Put the logId, bc pair in the cache responsible for the current thread.
+brc = logid2ReadChannel.get().get(entryLogId, loader);
+if (!brc.cachedFileChannel.tryRetain()) {
+if (logid2ReadChannel.get().asMap().remove(entryLogId, brc)){
+LOG.error("Dead fileChannel({}) forced out of cache. "
++ "It must have been double-released somewhere.", brc.cachedFileChannel);
+}
+brc = null;
 
 Review comment:
   Needs 4 more spaces of indentation.
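Beyond the indentation nit, the loop under review retries until it obtains a channel whose reference count can still be incremented, evicting dead entries along the way. A minimal sketch with consistent 4-space block indentation, assuming a `ConcurrentHashMap` in place of the per-thread Guava `Cache` and a hypothetical `Channel` type whose count starts at 1 for the cache's own reference (0 or below meaning dead); `RetainingLookup` and `acquire` are also hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongFunction;

// Hypothetical sketch of the retain-or-evict retry loop; a ConcurrentMap stands in for the Guava Cache.
final class RetainingLookup {
    static final class Channel {
        // Starts at 1 (the reference held by the cache); 0 or below means the channel is dead.
        final AtomicInteger refs = new AtomicInteger(1);

        boolean tryRetain() {
            while (true) {
                int current = refs.get();
                if (current <= 0) {
                    return false;
                }
                if (refs.compareAndSet(current, current + 1)) {
                    return true;
                }
            }
        }
    }

    final ConcurrentMap<Long, Channel> cache = new ConcurrentHashMap<>();

    /** Returns a channel with one extra reference that the caller must release. */
    Channel acquire(long logId, LongFunction<Channel> loader) {
        Channel ch;
        do {
            ch = cache.computeIfAbsent(logId, id -> loader.apply(id));
            if (!ch.tryRetain()) {
                // Evict only this dead instance (two-argument remove), then try again.
                cache.remove(logId, ch);
                ch = null;
            }
        } while (ch == null);
        return ch;
    }
}
```

The two-argument `remove(key, value)` matters: it only evicts the exact dead instance, so a live channel that another caller has just published under the same id is left alone.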




[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2018-03-02 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r171946538
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileChannelBackingCache.java
 ##
 @@ -0,0 +1,190 @@
+CachedFileChannel loadFileChannel(long logId) throws IOException {
+lock.readLock().lock();
+try {
+CachedFileChannel cachedFileChannel = fileChannels.get(logId);
+if (cachedFileChannel != null) {
+boolean retained = cachedFileChannel.tryRetain();
+checkArgument(retained);
+return cachedFileChannel;
+}
+} finally {
+lock.readLock().unlock();
+}
+
+lock.writeLock().lock();
+try {
+File file = fileLoader.load(logId);
+// get channel is used to open an existing entry log file
+// it would be better to open using read mode
+FileChannel newFc = new RandomAccessFile(file, "r").getChannel();
+CachedFileChannel cachedFileChannel = new CachedFileChannel(logId, newFc);
+fileChannels.put(logId, cachedFileChannel);
+boolean retained = cachedFileChannel.tryRetain();
+checkArgument(retained);
+return cachedFileChannel;
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+/**
+ * close FileChannel and remove from cache when possible.
+ * @param logId
+ * @param cachedFileChannel
+ */
+private void releaseFileChannel(long logId, CachedFileChannel cachedFileChannel) {
+lock.writeLock().lock();
+try {
+if (cachedFileChannel.markDead()) {
+try {
+cachedFileChannel.fileChannel.close();
+} catch (IOException e) {
+LOG.warn("Exception occurred in ReferenceCountedFileChannel"
++ " while closing channel for log file: {}", cachedFileChannel);
+} finally {
+IOUtils.close(LOG, cachedFileChannel.fileChannel);
+}
+// to guarantee the removed cachedFileChannel is what we want to remove.
+fileChannels.remove(logId, cachedFileChannel);
+}
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+@VisibleForTesting
+CachedFileChannel get(Long logId) {
+lock.readLock().lock();
+try {
+return fileChannels.get(logId);
+} finally {
+lock.readLock().unlock();
+}
+}
+
+class CachedFileChannel {
 
 Review comment:
   `CachedFileChannel` can be a `static` nested class.
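`CachedFileChannel` appears to need its enclosing cache only to call back into it when the last reference is dropped; making it `static` and passing that reference explicitly removes the hidden `FileChannelBackingCache.this` pointer. The sketch below assumes what `tryRetain`, `release` and `markDead` do, inferred from their names and the `DEAD_REF` sentinel in the diff; `RefCountedCache`, `Entry`, `releaseEntry` and `closedCount` are hypothetical, and the bodies are illustrative rather than copied from the PR.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; names echo the diff, but the method bodies are assumptions.
class RefCountedCache {
    static final int DEAD_REF = -0xdead;
    int closedCount = 0;

    /** Called when an entry's count drops to zero; closes it only if nobody re-retained it. */
    synchronized void releaseEntry(Entry entry) {
        if (entry.markDead()) {
            closedCount++; // stands in for closing the FileChannel and removing the map entry
        }
    }

    // static: no hidden RefCountedCache.this field; the owner is passed in explicitly.
    static final class Entry {
        private final RefCountedCache owner;
        private final AtomicInteger refCount = new AtomicInteger(0);

        Entry(RefCountedCache owner) {
            this.owner = owner;
        }

        /** Increments the count unless the entry has already been marked dead. */
        boolean tryRetain() {
            while (true) {
                int current = refCount.get();
                if (current == DEAD_REF) {
                    return false;
                }
                if (refCount.compareAndSet(current, current + 1)) {
                    return true;
                }
            }
        }

        void release() {
            if (refCount.decrementAndGet() == 0) {
                owner.releaseEntry(this);
            }
        }

        /** Atomically moves 0 -> DEAD_REF; fails if another thread retained it meanwhile. */
        boolean markDead() {
            return refCount.compareAndSet(0, DEAD_REF);
        }
    }
}
```

Because `markDead` is a compare-and-set from 0, a reader that retains the entry between the final `release` and the close simply makes `markDead` fail, and the channel stays open.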



[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2018-03-02 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r171945469
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -1094,25 +1104,40 @@ private Header getHeaderForLogId(long entryLogId) throws IOException {
 }
 }
 
+/**
+ * Add one refCnt for BufferedReadChannel and return it; the caller needs to subtract one refCnt.
+ * @param entryLogId
+ * @return
+ * @throws IOException
+ */
+private EntryLogBufferedReadChannel getChannelForLogId(long entryLogId) throws IOException {
+try {
+EntryLogBufferedReadChannel brc;
+Callable<EntryLogBufferedReadChannel> loader = () -> {
 
 Review comment:
   Why do we create a loader every time? Doesn't this generate a lot of garbage on the JVM? I remember that the first time I reviewed this, I didn't see a loader here...
   
   Why can't we use a `LoadingCache` instead?
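The allocation concern comes from the lambda capturing `entryLogId`, so a new `Callable` object is typically created on every call (escape analysis may or may not remove it; nothing here is measured). With Guava, `CacheBuilder.build(CacheLoader)` returns a `LoadingCache` whose `get(key)` invokes a loader supplied once at build time. To keep the example dependency-free, this sketch swaps in `ConcurrentHashMap.computeIfAbsent` with a `Function` built once in the constructor; `OneLoaderCache` and `loads` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical JDK-only stand-in for a Guava LoadingCache: the loader is built once, at construction.
final class OneLoaderCache<V> {
    private final ConcurrentMap<Long, V> map = new ConcurrentHashMap<>();
    private final Function<Long, V> loader;
    final AtomicInteger loads = new AtomicInteger();

    OneLoaderCache(Function<Long, V> load) {
        // Wrapped once so every miss is counted; no new lambda is created per lookup.
        this.loader = id -> {
            loads.incrementAndGet();
            return load.apply(id);
        };
    }

    V get(long id) {
        // The key arrives as an argument, so the shared loader needs no per-call capture.
        return map.computeIfAbsent(id, loader);
    }
}
```

Boxing the `long` key can still allocate; the point is only that the loader itself is shared across lookups.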




[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2018-03-02 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r171944363
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -999,51 +1015,47 @@ public ByteBuf internalReadEntry(long ledgerId, long entryId, long location)
 ByteBuf sizeBuff = sizeBuffer.get();
 sizeBuff.clear();
 pos -= 4; // we want to get the ledgerId and length to check
-BufferedReadChannel fc;
-try {
-fc = getChannelForLogId(entryLogId);
+try (EntryLogBufferedReadChannel bc = getChannelForLogId(entryLogId)){
+if (readFromLogChannel(entryLogId, bc, sizeBuff, pos) != sizeBuff.capacity()) {
+throw new Bookie.NoEntryException("Short read from entrylog " + entryLogId,
+ledgerId, entryId);
+}
+pos += 4;
+int entrySize = sizeBuff.readInt();
+
+// entrySize does not include the ledgerId
+if (entrySize > maxSaneEntrySize) {
+LOG.warn("Sanity check failed for entry size of " + entrySize + " at location " + pos + " in "
 
 Review comment:
   nit:
   
   It can be written with `{}` placeholders.




[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2018-03-02 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r171943920
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -326,59 +353,45 @@ private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, Byt
 return channel.read(buff, pos);
 }
 
+final ConcurrentLinkedQueue<Cache<Long, EntryLogBufferedReadChannel>> readChannelCaches =
+new ConcurrentLinkedQueue<>();
+
 /**
  * A thread-local variable that wraps a mapping of log ids to bufferedchannels
  * These channels should be used only for reading. logChannel is the one
  * that is used for writes.
+ * We use this Guava cache to store the BufferedReadChannel.
+ * When the BufferedReadChannel is removed, the underlying fileChannel's refCnt decrease 1,
+ * temporally use 1h to relax replace after reading.
  */
-private final ThreadLocal<Map<Long, BufferedReadChannel>> logid2Channel =
-new ThreadLocal<Map<Long, BufferedReadChannel>>() {
+final ThreadLocal<Cache<Long, EntryLogBufferedReadChannel>> logid2ReadChannel =
+new ThreadLocal<Cache<Long, EntryLogBufferedReadChannel>>() {
 @Override
-public Map<Long, BufferedReadChannel> initialValue() {
+public Cache<Long, EntryLogBufferedReadChannel> initialValue() {
 // Since this is thread local there only one modifier
 // We dont really need the concurrency, but we need to use
 // the weak values. Therefore using the concurrency level of 1
-return new MapMaker().concurrencyLevel(1)
-.weakValues()
-.makeMap();
+Cache<Long, EntryLogBufferedReadChannel> cache =
+CacheBuilder.newBuilder().concurrencyLevel(1)
+.expireAfterAccess(readChannelCacheExpireTimeMs, TimeUnit.MILLISECONDS)
+//decrease the refCnt
+.removalListener(( RemovalListener<Long, EntryLogBufferedReadChannel>) removal
 
 Review comment:
   nit:
   
   It is a bit hard to read. Can you write it in a better format? Also, remove the space before `RemovalListener`.
   
   ```
   .removalListener((RemovalListener<Long, EntryLogBufferedReadChannel>) notification ->
       notification.getValue().release()
   )
   .ticker(getTicker())
   .build();
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2018-03-02 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r171949224
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileChannelBackingCache.java
 ##
 @@ -0,0 +1,190 @@
+CachedFileChannel loadFileChannel(long logId) throws IOException {
+lock.readLock().lock();
+try {
+CachedFileChannel cachedFileChannel = fileChannels.get(logId);
+if (cachedFileChannel != null) {
+boolean retained = cachedFileChannel.tryRetain();
+checkArgument(retained);
+return cachedFileChannel;
+}
+} finally {
+lock.readLock().unlock();
+}
+
+lock.writeLock().lock();
+try {
+File file = fileLoader.load(logId);
 
 Review comment:
   If this is borrowed from FileInfoBackingCache, same question is applied 
there. 




[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2018-03-02 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r171945877
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -1094,25 +1104,40 @@ private Header getHeaderForLogId(long entryLogId) throws IOException {
 }
 }
 
+/**
+ * Add one refCnt for BufferedReadChannel and return it; the caller needs to subtract one refCnt.
+ * @param entryLogId
+ * @return
+ * @throws IOException
+ */
+private EntryLogBufferedReadChannel getChannelForLogId(long entryLogId) throws IOException {
+try {
+EntryLogBufferedReadChannel brc;
+Callable<EntryLogBufferedReadChannel> loader = () -> {
+CachedFileChannel cachedFileChannel = fileChannelBackingCache.loadFileChannel(entryLogId);
+// We set the position of the write buffer of this buffered channel to Long.MAX_VALUE
+// so that there are no overlaps with the write buffer while reading
+return new EntryLogBufferedReadChannel(cachedFileChannel, conf.getReadBufferBytes());
+};
+do {
+// Put the logId, bc pair in the cache responsible for the current thread.
+brc = logid2ReadChannel.get().get(entryLogId, loader);
+if (!brc.cachedFileChannel.tryRetain()) {
+if (logid2ReadChannel.get().asMap().remove(entryLogId, brc)){
+LOG.error("Dead fileChannel({}) forced out of cache. "
 
 Review comment:
   wrong indent




[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2018-03-02 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r171943988
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -326,59 +353,45 @@ private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, Byt
 return channel.read(buff, pos);
 }
 
+final ConcurrentLinkedQueue<Cache<Long, EntryLogBufferedReadChannel>> readChannelCaches =
+new ConcurrentLinkedQueue<>();
+
 /**
  * A thread-local variable that wraps a mapping of log ids to bufferedchannels
  * These channels should be used only for reading. logChannel is the one
  * that is used for writes.
+ * We use this Guava cache to store the BufferedReadChannel.
+ * When the BufferedReadChannel is removed, the underlying fileChannel's refCnt decrease 1,
+ * temporally use 1h to relax replace after reading.
  */
-private final ThreadLocal<Map<Long, BufferedReadChannel>> logid2Channel =
-new ThreadLocal<Map<Long, BufferedReadChannel>>() {
+final ThreadLocal<Cache<Long, EntryLogBufferedReadChannel>> logid2ReadChannel =
+new ThreadLocal<Cache<Long, EntryLogBufferedReadChannel>>() {
 @Override
-public Map<Long, BufferedReadChannel> initialValue() {
+public Cache<Long, EntryLogBufferedReadChannel> initialValue() {
 // Since this is thread local there only one modifier
 // We dont really need the concurrency, but we need to use
 // the weak values. Therefore using the concurrency level of 1
-return new MapMaker().concurrencyLevel(1)
-.weakValues()
-.makeMap();
+Cache<Long, EntryLogBufferedReadChannel> cache =
+CacheBuilder.newBuilder().concurrencyLevel(1)
+.expireAfterAccess(readChannelCacheExpireTimeMs, TimeUnit.MILLISECONDS)
+//decrease the refCnt
+.removalListener(( RemovalListener<Long, EntryLogBufferedReadChannel>) removal
+-> removal.getValue().release()).ticker(getTicker()).build();
+readChannelCaches.add(cache);
+return cache;
 }
 };
 
-/**
- * Each thread local buffered read channel can share the same file handle because reads are not relative
- * and don't cause a change in the channel's position. We use this map to store the file channels. Each
- * file channel is mapped to a log id which represents an open log file.
- */
-private final ConcurrentMap<Long, FileChannel> logid2FileChannel = new ConcurrentHashMap<Long, FileChannel>();
-
-/**
- * Put the logId, bc pair in the map responsible for the current thread.
- * @param logId
- * @param bc
- */
-public BufferedReadChannel putInReadChannels(long logId, BufferedReadChannel bc) {
-Map<Long, BufferedReadChannel> threadMap = logid2Channel.get();
-return threadMap.put(logId, bc);
+Ticker getTicker() {
+return Ticker.systemTicker();
 }
 
 /**
- * Remove all entries for this log file in each thread's cache.
- * @param logId
+ * Each thread local buffered read channel can share the same file handle because reads are not relative
+ * and don't cause a change in the channel's position.
+ * Each file channel is mapped to a log id which represents an open log file.
  */
-public void removeFromChannelsAndClose(long logId) {
-FileChannel fileChannel = logid2FileChannel.remove(logId);
-if (null != fileChannel) {
-try {
-fileChannel.close();
-} catch (IOException e) {
-LOG.warn("Exception while closing channel for log file:" + logId);
-}
-}
-}
-
-public BufferedReadChannel getFromChannels(long logId) {
-return logid2Channel.get().get(logId);
-}
+FileChannelBackingCache fileChannelBackingCache = new FileChannelBackingCache(this::findFile);
 
 Review comment:
   nit: make it final
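The `readChannelCaches` queue in the diff above appears to exist so that code on another thread can reach every thread's cache. That reading is an interpretation, not something stated in the review: Guava caches do their expiration housekeeping during cache operations rather than on a background thread, so a thread that stops reading could otherwise keep its idle channels (and their references) alive until something calls `cleanUp()`. A minimal JDK-only sketch of the registration pattern, with hypothetical names `PerThreadRegistry` and `clearAll`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: per-thread maps that also register themselves in a shared queue.
final class PerThreadRegistry {
    // Every thread's map, reachable from any thread (e.g. a periodic cleanup task).
    final ConcurrentLinkedQueue<Map<Long, String>> allThreadMaps = new ConcurrentLinkedQueue<>();

    final ThreadLocal<Map<Long, String>> perThread = ThreadLocal.withInitial(() -> {
        // Thread-safe map, because other threads reach it through allThreadMaps.
        Map<Long, String> map = new ConcurrentHashMap<>();
        allThreadMaps.add(map); // registered once, the first time this thread calls get()
        return map;
    });

    /** Example maintenance pass that any thread may run: drops every cached value. */
    int clearAll() {
        int cleared = 0;
        for (Map<Long, String> map : allThreadMaps) {
            cleared += map.size();
            map.clear();
        }
        return cleared;
    }
}
```

One cost of this design is that the queue keeps each thread's map reachable after the thread exits, so entries registered this way should eventually be released or dropped explicitly.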




[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2018-03-01 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r171654510
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -330,54 +350,47 @@ private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, Byt
  * A thread-local variable that wraps a mapping of log ids to bufferedchannels
  * These channels should be used only for reading. logChannel is the one
  * that is used for writes.
+ * We use this Guava cache to store the BufferedReadChannel.
+ * When the BufferedReadChannel is removed, the underlying fileChannel's refCnt decrease 1,
+ * temporally use 1h to relax replace after reading.
  */
-private final ThreadLocal<Map<Long, BufferedReadChannel>> logid2Channel =
-new ThreadLocal<Map<Long, BufferedReadChannel>>() {
+private final ThreadLocal<Cache<Long, EntryLogBufferedReadChannel>> logid2ReadChannel =
+new ThreadLocal<Cache<Long, EntryLogBufferedReadChannel>>() {
 @Override
-public Map<Long, BufferedReadChannel> initialValue() {
+public Cache<Long, EntryLogBufferedReadChannel> initialValue() {
 // Since this is thread local there only one modifier
 // We dont really need the concurrency, but we need to use
 // the weak values. Therefore using the concurrency level of 1
-return new MapMaker().concurrencyLevel(1)
-.weakValues()
-.makeMap();
+return CacheBuilder.newBuilder().concurrencyLevel(1)
+.expireAfterAccess(readChannelCacheExpireTimeMs, TimeUnit.MILLISECONDS)
+//decrease the refCnt
+.removalListener(removal -> ((EntryLogBufferedReadChannel) removal.getValue()).release())
 
 Review comment:
   @ArvinDevel 
   
   A better way here is not to cast the object itself, but to assign the type to the lambda closure.
   
   ```
   CacheBuilder.newBuilder()
   .removalListener((RemovalListener<Long, EntryLogBufferedReadChannel>) notification -> notification.getValue().release())
   .build();
   ```
   
   @ivankelly because it is not a plain `RemovalListener`; it is a `RemovalListener<Long, EntryLogBufferedReadChannel>`, so you need the type to do the inference. I believe Oracle JDK and OpenJDK interpret this differently, as may different Java versions, so a safer approach is to give the lambda function an explicit interface type.
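The removal listener under discussion is what turns idle expiry into a release of the channel's reference. To show that behaviour without a Guava dependency, here is a small single-threaded stand-in for a cache built with `expireAfterAccess`, `removalListener` and `ticker`; `IdleExpiringMap`, `evictIdle` and the `LongSupplier` clock are hypothetical, and the real Guava cache is thread-safe and does not evict at an exact deadline. Because the listener is typed `BiConsumer<K, V>` from the start, the lambda receives `V` directly and needs no cast, which is the point of the comment above.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;

// Hypothetical, single-threaded stand-in for a Guava Cache built with
// expireAfterAccess(...), removalListener(...) and ticker(...).
final class IdleExpiringMap<K, V> {
    private static final class Slot<T> {
        final T value;
        long lastAccessNanos;

        Slot(T value, long now) {
            this.value = value;
            this.lastAccessNanos = now;
        }
    }

    private final Map<K, Slot<V>> slots = new HashMap<>();
    private final long expireAfterNanos;
    private final LongSupplier ticker;        // injectable clock, like getTicker() in the diff
    private final BiConsumer<K, V> onRemoval; // typed listener, e.g. (id, ch) -> ch.release()

    IdleExpiringMap(long expireAfterNanos, LongSupplier ticker, BiConsumer<K, V> onRemoval) {
        this.expireAfterNanos = expireAfterNanos;
        this.ticker = ticker;
        this.onRemoval = onRemoval;
    }

    V get(K key) {
        evictIdle();
        Slot<V> slot = slots.get(key);
        if (slot == null) {
            return null;
        }
        slot.lastAccessNanos = ticker.getAsLong(); // an access resets the idle timer
        return slot.value;
    }

    void put(K key, V value) {
        evictIdle();
        Slot<V> old = slots.put(key, new Slot<>(value, ticker.getAsLong()));
        if (old != null) {
            onRemoval.accept(key, old.value); // replaced values are released too
        }
    }

    /** Removes entries idle for at least expireAfterNanos and notifies the listener. */
    void evictIdle() {
        long now = ticker.getAsLong();
        Iterator<Map.Entry<K, Slot<V>>> it = slots.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Slot<V>> e = it.next();
            if (now - e.getValue().lastAccessNanos >= expireAfterNanos) {
                K key = e.getKey();
                V value = e.getValue().value;
                it.remove();
                onRemoval.accept(key, value);
            }
        }
    }

    int size() {
        return slots.size();
    }
}
```

Injecting the clock, as `getTicker()` does in the diff, lets a test advance time deterministically instead of sleeping.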




[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2018-01-02 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r159347662
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -330,54 +334,111 @@ private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, Byt
  * A thread-local variable that wraps a mapping of log ids to bufferedchannels
  * These channels should be used only for reading. logChannel is the one
  * that is used for writes.
+ * We use this Guava cache to store the BufferedReadChannel.
+ * When the BufferedReadChannel is removed, the underlying fileChannel's refCnt decrease 1,
+ * temporally use 1h to relax replace after reading.
  */
-private final ThreadLocal> logid2Channel =
-new ThreadLocal>() {
+private final ThreadLocal> logid2Channel =
+new ThreadLocal>() {
 @Override
-public Map initialValue() {
+public Cache initialValue() {
 // Since this is thread local there only one modifier
 // We dont really need the concurrency, but we need to use
 // the weak values. Therefore using the concurrency level of 1
-return new MapMaker().concurrencyLevel(1)
-.weakValues()
-.makeMap();
+return CacheBuilder.newBuilder().concurrencyLevel(1)
+.expireAfterAccess(expireReadChannelCache, TimeUnit.MILLISECONDS)
+//decrease the refCnt
+.removalListener(removal -> logid2FileChannel.get(removal.getKey()).release())
+.build(readChannelLoader);
 }
 };
 
+// only used for test.
+ThreadLocal> getLogid2Channel() {
 
 Review comment:
   accessing internal state is okay if you are only testing this specific case. I don't see a reason we would change it in the future.
   
   @ArvinDevel can you add `@VisibleForTesting`?


[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2018-01-02 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r159347434
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -315,6 +316,104 @@ public void testRecoverFromLedgersMapOnV0EntryLog() throws Exception {
 assertEquals(120, meta.getRemainingSize());
 }
 
+/**
+ * Test Cache for logid2Channel and concurrentMap for logid2FileChannel work correctly.
+ * Note that, when an entryLogger is initialized, the entry log id will increase one.
+ * when the preallocation is enabled, a new entrylogger will cost 2 logId.
+ */
+@Test
+public void testCacheInEntryLog() throws Exception {
+File tmpDir = createTempDir("bkTest", ".dir");
+File curDir = Bookie.getCurrentDirectory(tmpDir);
+Bookie.checkDirectoryStructure(curDir);
+
+int gcWaitTime = 1000;
+ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+conf.setGcWaitTime(gcWaitTime);
+conf.setLedgerDirNames(new String[] {tmpDir.toString()});
+//since last access, expire after 1s
+conf.setExpireReadChannelCache(1000);
+conf.setEntryLogFilePreAllocationEnabled(false);
+// below one will cost logId 0
+Bookie bookie = new Bookie(conf);
+// create some entries
+int numLogs = 4;
+int numEntries = 10;
+long[][] positions = new long[numLogs][];
+for (int i = 0; i < numLogs; i++) {
+positions[i] = new long[numEntries];
+EntryLogger logger = new EntryLogger(conf,
+bookie.getLedgerDirsManager());
+for (int j = 0; j < numEntries; j++) {
+positions[i][j] = logger.addEntry(i, generateEntry(i, j).nioBuffer());
+}
+logger.flush();
+LOG.info("log id is {}, LeastUnflushedLogId is {} ", logger.getCurrentLogId(),
+logger.getLeastUnflushedLogId());
+}
+
+for (int i = 1; i < numLogs + 1; i++) {
+File logFile = new File(curDir, Long.toHexString(i) + ".log");
+assertTrue(logFile.exists());
+}
+
+// create some read for the entry log
+EntryLogger logger = ((InterleavedLedgerStorage) bookie.ledgerStorage).entryLogger;
+ThreadLocal>  cacheThreadLocal = logger.getLogid2Channel();
+ConcurrentMap logid2FileChannel = logger.getLogid2FileChannel();
+for (int j = 0; j < numEntries; j++) {
+logger.readEntry(0, j, positions[0][j]);
+}
+LOG.info("cache size is {}, content is {}", cacheThreadLocal.get().size(),
+cacheThreadLocal.get().asMap().toString());
+// the cache has readChannel for 1.log
+assertNotNull(cacheThreadLocal.get().getIfPresent(1L));
+for (int j = 0; j < numEntries; j++) {
+logger.readEntry(1, j, positions[1][j]);
+}
+LOG.info("cache size is {}, content is {}", cacheThreadLocal.get().size(),
+cacheThreadLocal.get().asMap().toString());
+// the cache has readChannel for 2.log
+assertNotNull(cacheThreadLocal.get().getIfPresent(2L));
+// expire time
+Thread.sleep(1000);
 
 Review comment:
   @ArvinDevel: a Guava cache accepts a custom `Ticker` (via `CacheBuilder.ticker(...)`), so a test can plug in a fake ticker and advance time manually to trigger expiration.
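   To show why a controllable clock makes the test deterministic, here is a dependency-free sketch. `FakeClock` and `AccessExpiringMap` are hypothetical, hand-rolled stand-ins for a fake `Ticker` and an `expireAfterAccess` Guava cache; they are not Guava's implementation. Time only moves when the test calls `advanceMillis`, so no `Thread.sleep` is needed.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;

final class FakeTickerDemo {
    // Manually advanced clock; plays the role a fake Ticker plays for a Guava cache.
    static final class FakeClock implements LongSupplier {
        private long nanos;

        void advanceMillis(long millis) {
            nanos += millis * 1_000_000L;
        }

        @Override
        public long getAsLong() {
            return nanos;
        }
    }

    // Hypothetical, minimal expire-after-access map (not Guava's implementation).
    static final class AccessExpiringMap<K, V> {
        private final Map<K, V> values = new HashMap<>();
        private final Map<K, Long> lastAccess = new HashMap<>();
        private final long ttlNanos;
        private final LongSupplier clock;
        private final BiConsumer<K, V> onRemoval;

        AccessExpiringMap(long ttlMillis, LongSupplier clock, BiConsumer<K, V> onRemoval) {
            this.ttlNanos = ttlMillis * 1_000_000L;
            this.clock = clock;
            this.onRemoval = onRemoval;
        }

        void put(K key, V value) {
            expire();
            values.put(key, value);
            lastAccess.put(key, clock.getAsLong());
        }

        V getIfPresent(K key) {
            expire();
            V value = values.get(key);
            if (value != null) {
                lastAccess.put(key, clock.getAsLong()); // a read refreshes the entry
            }
            return value;
        }

        // Evicts entries idle for at least the TTL and notifies the removal callback.
        private void expire() {
            long now = clock.getAsLong();
            Iterator<Map.Entry<K, Long>> it = lastAccess.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<K, Long> entry = it.next();
                if (now - entry.getValue() >= ttlNanos) {
                    K key = entry.getKey();
                    it.remove();
                    onRemoval.accept(key, values.remove(key));
                }
            }
        }
    }

    static String demo() {
        FakeClock clock = new FakeClock();
        StringBuilder evicted = new StringBuilder();
        AccessExpiringMap<Long, String> cache =
                new AccessExpiringMap<>(1000, clock, (logId, channel) -> evicted.append(logId));
        cache.put(1L, "channel-1");
        cache.put(2L, "channel-2");
        clock.advanceMillis(999);   // just before the TTL: nothing expires yet
        cache.getIfPresent(2L);     // touching log 2 resets its idle time
        clock.advanceMillis(1);     // log 1 has now been idle for exactly 1000 ms
        boolean log1Gone = cache.getIfPresent(1L) == null;
        boolean log2Kept = cache.getIfPresent(2L) != null;
        return log1Gone + " " + log2Kept + " evicted=" + evicted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true true evicted=1"
    }
}
```

   With a real Guava cache, expired entries are cleaned up lazily during cache operations, so after advancing the ticker the test may need to call `cleanUp()` (or touch the cache) before the removal listener fires.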


[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2017-12-26 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r158774969
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -315,6 +314,93 @@ public void testRecoverFromLedgersMapOnV0EntryLog() throws Exception {
 assertEquals(120, meta.getRemainingSize());
 }
 
+/**
+ * Test Cache for logid2Channel and concurrentMap for logid2FileChannel work correctly.
+ * Note that, when an entryLogger is initialized, the entry log id will increase one.
+ * when the preallocation is enabled, a new entrylogger will cost 2 logId.
+ */
+@Test
+public void testCacheInEntryLog() throws Exception {
+File tmpDir = createTempDir("bkTest", ".dir");
+File curDir = Bookie.getCurrentDirectory(tmpDir);
+Bookie.checkDirectoryStructure(curDir);
+
+int gcWaitTime = 1000;
+ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+conf.setGcWaitTime(gcWaitTime);
+conf.setLedgerDirNames(new String[] {tmpDir.toString()});
+//since last access, expire after 1s
+conf.setExpireReadChannelCache(1000);
+conf.setEntryLogFilePreAllocationEnabled(false);
+// below one will cost logId 0
+Bookie bookie = new Bookie(conf);
+// create some entries
+int numLogs = 4;
+int numEntries = 10;
+long[][] positions = new long[numLogs][];
+for (int i = 0; i < numLogs; i++) {
+positions[i] = new long[numEntries];
+EntryLogger logger = new EntryLogger(conf,
+bookie.getLedgerDirsManager());
+for (int j = 0; j < numEntries; j++) {
+positions[i][j] = logger.addEntry(i, generateEntry(i, j).nioBuffer());
+}
+logger.flush();
+LOG.info("log id is {}, LeastUnflushedLogId is {} ", logger.getCurrentLogId(),
+logger.getLeastUnflushedLogId());
+}
+
+for (int i = 1; i < numLogs + 1; i++) {
+File logFile = new File(curDir, Long.toHexString(i) + ".log");
+assertTrue(logFile.exists());
+}
+
+// create some read for the entry log
+EntryLogger logger = ((InterleavedLedgerStorage) bookie.ledgerStorage).entryLogger;
+ThreadLocal>  cacheThreadLocal = logger.getLogid2Channel();
+ConcurrentMap logid2FileChannel = logger.getLogid2FileChannel();
+for (int j = 0; j < numEntries; j++) {
+logger.readEntry(0, j, positions[0][j]);
+}
+LOG.info("cache size is {}, content is {}", cacheThreadLocal.get().size(),
+cacheThreadLocal.get().asMap().toString());
+// the cache has readChannel for 1.log
+assertNotNull(cacheThreadLocal.get().getIfPresent(1L));
+for (int j = 0; j < numEntries; j++) {
+logger.readEntry(1, j, positions[1][j]);
+}
+LOG.info("cache size is {}, content is {}", cacheThreadLocal.get().size(),
+cacheThreadLocal.get().asMap().toString());
+// the cache has readChannel for 2.log
+assertNotNull(cacheThreadLocal.get().getIfPresent(2L));
+// expire time
+Thread.sleep(1000);
+// read to new entry log, the old values in logid2Channel should has been invalidated
+for (int j = 0; j < numEntries; j++) {
+logger.readEntry(2, j, positions[2][j]);
+}
+for (int j = 0; j < numEntries; j++) {
+logger.readEntry(3, j, positions[3][j]);
+}
+LOG.info("cache size is {}, content is {}", cacheThreadLocal.get().size(),
+cacheThreadLocal.get().asMap().toString());
+// the cache has readChannel for 3.log
+assertNotNull(cacheThreadLocal.get().getIfPresent(3L));
+// the cache has readChannel for 4.log
+assertNotNull(cacheThreadLocal.get().getIfPresent(4L));
+// the cache hasn't readChannel for 1.log
+assertNull(cacheThreadLocal.get().getIfPresent(1L));
+// the cache hasn't readChannel for 2.log
+assertNull(cacheThreadLocal.get().getIfPresent(2L));
+// the corresponding file channel should be closed
+LOG.info("map content is {}", logid2FileChannel.toString());
+assertEquals(0, logid2FileChannel.get(1L).refCnt());
+assertEquals(0, logid2FileChannel.get(2L).refCnt());
+assertEquals(1, logid2FileChannel.get(3L).refCnt());
+assertEquals(1, logid2FileChannel.get(4L).refCnt());
+//assertNull(logid2FileChannel.get(2L).getFc());
 
 Review comment:
   remove this line if it is not needed


[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2017-12-26 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r158775123
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
 ##
 @@ -239,6 +240,27 @@ public ServerConfiguration setEntryLogFilePreAllocationEnabled(boolean enabled)
 return this;
 }
 
+/**
+ * get ReadChannelCache expire time.
+ *
+ * @return server configuration object.
+ */
+public long getExpireReadChannelCache() {
+return this.getLong(EXPIRE_READ_CHANNEL_CACHE, 360);
+}
+
+/**
+ * set ReadChannelCache expire time. Default value is 1h.
+ *
+ * @param millis
+ *  expire time.
+ * @return server configuration object.
+ */
+public ServerConfiguration setExpireReadChannelCache(long millis) {
 
 Review comment:
   it is better to call out "time" and the unit "ms" in the setting name, like READ_CHANNEL_CACHE_EXPIRE_TIME_MS
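   A sketch of the naming suggestion, assuming a hypothetical map-backed configuration class in place of `ServerConfiguration`; the key string `"readChannelCacheExpireTimeMs"` is made up for illustration. For reference, a one-hour default in milliseconds is `TimeUnit.HOURS.toMillis(1)`, i.e. 3,600,000, and spelling it that way keeps the unit visible at the definition site as well.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

final class ReadChannelCacheConfig {
    // Hypothetical key: the name carries both "time" and the unit "ms".
    static final String READ_CHANNEL_CACHE_EXPIRE_TIME_MS = "readChannelCacheExpireTimeMs";

    // One hour expressed in milliseconds, i.e. 3,600,000.
    static final long DEFAULT_READ_CHANNEL_CACHE_EXPIRE_TIME_MS = TimeUnit.HOURS.toMillis(1);

    // Minimal map-backed stand-in for a configuration object.
    private final Map<String, Long> props = new HashMap<>();

    long getReadChannelCacheExpireTimeMs() {
        return props.getOrDefault(READ_CHANNEL_CACHE_EXPIRE_TIME_MS,
                DEFAULT_READ_CHANNEL_CACHE_EXPIRE_TIME_MS);
    }

    // Fluent setter, mirroring the style of the existing configuration setters.
    ReadChannelCacheConfig setReadChannelCacheExpireTimeMs(long expireTimeMs) {
        props.put(READ_CHANNEL_CACHE_EXPIRE_TIME_MS, expireTimeMs);
        return this;
    }

    public static void main(String[] args) {
        ReadChannelCacheConfig conf = new ReadChannelCacheConfig();
        System.out.println(conf.getReadChannelCacheExpireTimeMs()); // prints 3600000
        conf.setReadChannelCacheExpireTimeMs(1000);
        System.out.println(conf.getReadChannelCacheExpireTimeMs()); // prints 1000
    }
}
```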


[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2017-12-13 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r156776410
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -327,54 +333,100 @@ private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, Byt
  * A thread-local variable that wraps a mapping of log ids to bufferedchannels
  * These channels should be used only for reading. logChannel is the one
  * that is used for writes.
+ * We use this Guava cache to store the BufferedReadChannel.
+ * When the BufferedReadChannel is removed, the underlying fileChannel's refCnt decrease 1,
+ * temporally use 1h to relax replace after reading.
  */
-private final ThreadLocal> logid2Channel =
-new ThreadLocal>() {
+private final ThreadLocal> logid2Channel =
+new ThreadLocal>() {
 @Override
-public Map initialValue() {
+public Cache initialValue() {
 // Since this is thread local there only one modifier
 // We dont really need the concurrency, but we need to use
 // the weak values. Therefore using the concurrency level of 1
-return new MapMaker().concurrencyLevel(1)
-.weakValues()
-.makeMap();
+return CacheBuilder.newBuilder().concurrencyLevel(1)
+.expireAfterAccess(expireReadChannelCacheInHour, TimeUnit.HOURS)
+//decrease the refCnt
+.removalListener(removal -> logid2FileChannel.get(removal.getKey()).release())
+.build(readChannelLoader);
 }
 };
 
+private final  CacheLoader readChannelLoader =
+new CacheLoader () {
+public BufferedReadChannel load(Long entryLogId) throws Exception {
+
+return getChannelForLogId(entryLogId);
+
+}
+};
+
+
+private static class ReferenceCountedFileChannel extends AbstractReferenceCounted {
+private final FileChannel fc;
+
+public ReferenceCountedFileChannel(FileChannel fileChannel) {
+this.fc = fileChannel;
+}
+
+
+@Override
+public ReferenceCounted touch(Object hint) {
+return this;
+}
+
+// when the refCnt decreased to
+@Override
+protected void deallocate() {
+try {
+fc.close();
+} catch (IOException e) {
+LOG.warn("Exception occurred in ReferenceCountedFileChannel"
++ " while closing channel for log file: {}", fc);
+} finally {
+IOUtils.close(LOG, fc);
+}
+}
+
+}
+
+
 /**
  * Each thread local buffered read channel can share the same file handle because reads are not relative
- * and don't cause a change in the channel's position. We use this map to store the file channels. Each
- * file channel is mapped to a log id which represents an open log file.
+ * and don't cause a change in the channel's position.
+ * Each file channel is mapped to a log id which represents an open log file.
+ * when ReferenceCountedFileChannel's refCnt decrease to 0, close the fileChannel.
  */
-private final ConcurrentMap logid2FileChannel = new ConcurrentHashMap();
+private ConcurrentMap
+logid2FileChannel = new ConcurrentHashMap<>();
+
 
 /**
  * Put the logId, bc pair in the map responsible for the current thread.
  * @param logId
  * @param bc
  */
-public BufferedReadChannel putInReadChannels(long logId, BufferedReadChannel bc) {
-Map threadMap = logid2Channel.get();
-return threadMap.put(logId, bc);
+public void putInReadChannels(long logId, BufferedReadChannel bc) {
+Cache threadCahe = logid2Channel.get();
+threadCahe.put(logId, bc);
 }
 
 /**
  * Remove all entries for this log file in each thread's cache.
  * @param logId
  */
 public void removeFromChannelsAndClose(long logId) {
-FileChannel fileChannel = logid2FileChannel.remove(logId);
-if (null != fileChannel) {
-try {
-fileChannel.close();
-} catch (IOException e) {
-LOG.warn("Exception while closing channel for log file:" + logId);
-}
-}
+//remove the fileChannel from logId2Channel
 
 Review comment:
   I don't think you need to invalidate 

[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2017-12-13 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r156773781
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -70,6 +72,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 
 Review comment:
   remove these 3 blank lines


[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2017-12-13 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r156776499
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -1078,25 +1130,26 @@ private Header getHeaderForLogId(long entryLogId) throws IOException {
 }
 }
 
+
 
 Review comment:
   remove blank line


[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2017-12-13 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r156774512
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -327,54 +333,100 @@ private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, Byt
  * A thread-local variable that wraps a mapping of log ids to bufferedchannels
  * These channels should be used only for reading. logChannel is the one
  * that is used for writes.
+ * We use this Guava cache to store the BufferedReadChannel.
+ * When the BufferedReadChannel is removed, the underlying fileChannel's refCnt decrease 1,
+ * temporally use 1h to relax replace after reading.
  */
-private final ThreadLocal> logid2Channel =
-new ThreadLocal>() {
+private final ThreadLocal> logid2Channel =
+new ThreadLocal>() {
 @Override
-public Map initialValue() {
+public Cache initialValue() {
 // Since this is thread local there only one modifier
 // We dont really need the concurrency, but we need to use
 // the weak values. Therefore using the concurrency level of 1
-return new MapMaker().concurrencyLevel(1)
-.weakValues()
-.makeMap();
+return CacheBuilder.newBuilder().concurrencyLevel(1)
+.expireAfterAccess(expireReadChannelCacheInHour, TimeUnit.HOURS)
+//decrease the refCnt
+.removalListener(removal -> logid2FileChannel.get(removal.getKey()).release())
 
 Review comment:
   it might be good to check if the returned value is null or not.
   
   ```
   ReferenceCountUtil.release(logid2FileChannel.get(removal.getKey()));
   ```
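   The point is that `logid2FileChannel.get(...)` can return `null` (for example when the log id was never opened or was already removed), and Netty's `ReferenceCountUtil.release(Object)` returns `false` instead of throwing when its argument is not reference-counted, `null` included. A dependency-free sketch of the same contract, where `RefCountedChannel` and `safeRelease` are hypothetical stand-ins for the PR's `ReferenceCountedFileChannel` and the Netty helper:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class NullSafeReleaseDemo {
    // Hypothetical stand-in for the PR's ReferenceCountedFileChannel.
    static final class RefCountedChannel {
        private final AtomicInteger refCnt = new AtomicInteger(1);
        private final Closeable channel;

        RefCountedChannel(Closeable channel) {
            this.channel = channel;
        }

        // Returns true if this call dropped the count to 0 and closed the channel.
        boolean release() {
            if (refCnt.decrementAndGet() != 0) {
                return false;
            }
            try {
                channel.close(); // closed exactly once, by the last release
            } catch (IOException e) {
                // a real implementation would log this
            }
            return true;
        }
    }

    // Same contract as Netty's ReferenceCountUtil.release(Object):
    // a null argument is ignored and false is returned.
    static boolean safeRelease(RefCountedChannel channel) {
        return channel != null && channel.release();
    }

    static String demo() {
        ConcurrentMap<Long, RefCountedChannel> logid2FileChannel = new ConcurrentHashMap<>();
        AtomicInteger closes = new AtomicInteger();
        logid2FileChannel.put(1L, new RefCountedChannel(closes::incrementAndGet));

        boolean closedLog1 = safeRelease(logid2FileChannel.get(1L)); // last reference: closes
        boolean closedLog2 = safeRelease(logid2FileChannel.get(2L)); // no entry: no NPE
        return closedLog1 + " " + closedLog2 + " closes=" + closes.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true false closes=1"
    }
}
```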


[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2017-12-13 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r156776713
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -1078,25 +1130,26 @@ private Header getHeaderForLogId(long entryLogId) throws IOException {
 }
 }
 
+
 private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException {
-BufferedReadChannel fc = getFromChannels(entryLogId);
-if (fc != null) {
-return fc;
+BufferedReadChannel brc = getFromChannels(entryLogId);
+if (brc != null) {
+// increment the refCnt
+logid2FileChannel.get(entryLogId).retain();
+return brc;
 }
+
 File file = findFile(entryLogId);
 // get channel is used to open an existing entry log file
 // it would be better to open using read mode
-FileChannel newFc = new RandomAccessFile(file, "r").getChannel();
-FileChannel oldFc = logid2FileChannel.putIfAbsent(entryLogId, newFc);
-if (null != oldFc) {
-newFc.close();
-newFc = oldFc;
-}
+FileChannel fc = new RandomAccessFile(file, "r").getChannel();
+logid2FileChannel.put(entryLogId, new ReferenceCountedFileChannel(fc));
 
 Review comment:
   the logic should be similar to before: you need to use `putIfAbsent` to handle concurrent operations.
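   The pre-PR pattern (open a channel, `putIfAbsent`, and close your own channel if another thread won) keeps exactly one channel per log id even when several readers miss at the same time. In this sketch `openChannel()` and `FakeChannel` are hypothetical stand-ins for opening the entry log with `RandomAccessFile` in `"r"` mode, and plain `Closeable` values stand in for the ref-counted wrapper.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class PutIfAbsentDemo {
    static final ConcurrentMap<Long, Closeable> logid2FileChannel = new ConcurrentHashMap<>();
    static final AtomicInteger opened = new AtomicInteger();
    static final AtomicInteger closed = new AtomicInteger();

    // Hypothetical channel that only records that it was closed.
    static final class FakeChannel implements Closeable {
        @Override
        public void close() {
            closed.incrementAndGet();
        }
    }

    // Hypothetical stand-in for opening the entry log file for reading.
    static Closeable openChannel() {
        opened.incrementAndGet();
        return new FakeChannel();
    }

    // If another thread published a channel first, close ours and use theirs.
    static Closeable getOrOpen(long entryLogId) throws IOException {
        Closeable existing = logid2FileChannel.get(entryLogId);
        if (existing != null) {
            return existing;
        }
        Closeable newFc = openChannel();
        Closeable oldFc = logid2FileChannel.putIfAbsent(entryLogId, newFc);
        if (oldFc != null) {
            newFc.close();
            return oldFc;
        }
        return newFc;
    }

    // Races several readers on one log id; true if all got the same channel
    // and exactly one channel is left open.
    static boolean demo() {
        logid2FileChannel.clear();
        opened.set(0);
        closed.set(0);
        int threads = 8;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            CountDownLatch start = new CountDownLatch(1);
            List<Future<Closeable>> futures = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                futures.add(pool.submit(() -> {
                    start.await();
                    return getOrOpen(1L);
                }));
            }
            start.countDown();
            List<Closeable> seen = new ArrayList<>();
            for (Future<Closeable> f : futures) {
                seen.add(f.get());
            }
            Closeable winner = logid2FileChannel.get(1L);
            return seen.stream().allMatch(c -> c == winner) && opened.get() - closed.get() == 1;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```

   `ConcurrentHashMap.computeIfAbsent` would also avoid the duplicate open, at the cost of performing the file open while holding a lock on that map bin.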


[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2017-12-12 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r156466600
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -341,12 +346,43 @@ private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, Byt
 }
 };
 
+private final  CacheLoader loader = new CacheLoader () {
+public FileChannel load(Long entryLogId) throws Exception {
 
 Review comment:
   I am not sure you can use a lambda for `CacheLoader` here: it is not an interface, `CacheLoader` is an abstract class.
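   A lambda can only target a functional interface, so an abstract class needs either an anonymous subclass or a static factory that adapts an interface; Guava provides the latter as `CacheLoader.from(Function)`. The sketch below is dependency-free: `Loader` is a hypothetical stand-in for `CacheLoader`, and the file-name mapping simply reuses the `Long.toHexString(id) + ".log"` convention from the test above.

```java
import java.util.function.Function;

final class AbstractLoaderDemo {
    // Hypothetical stand-in for Guava's abstract class CacheLoader<K, V>.
    abstract static class Loader<K, V> {
        abstract V load(K key) throws Exception;

        // Adapter in the spirit of Guava's CacheLoader.from(Function): the lambda
        // targets the Function interface and is wrapped in an anonymous subclass.
        static <K, V> Loader<K, V> from(Function<K, V> function) {
            return new Loader<K, V>() {
                @Override
                V load(K key) {
                    return function.apply(key);
                }
            };
        }
    }

    // Would not compile: a lambda needs a functional interface, and Loader is a class.
    // Loader<Long, String> loader = entryLogId -> Long.toHexString(entryLogId) + ".log";

    static String demo() {
        // Option 1: an anonymous subclass, as in the hunk above.
        Loader<Long, String> anonymous = new Loader<Long, String>() {
            @Override
            String load(Long entryLogId) {
                return Long.toHexString(entryLogId) + ".log";
            }
        };
        // Option 2: a lambda passed through the static factory.
        Loader<Long, String> fromLambda =
                Loader.from(entryLogId -> Long.toHexString(entryLogId) + ".log");
        try {
            return anonymous.load(10L) + " " + fromLambda.load(10L);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "a.log a.log"
    }
}
```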


[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2017-12-12 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r156467340
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -341,12 +346,43 @@ private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, Byt
 }
 };
 
+private final  CacheLoader loader = new CacheLoader () {
+public FileChannel load(Long entryLogId) throws Exception {
+
+File file = findFile(entryLogId);
+// get channel is used to open an existing entry log file
+// it would be better to open using read mode
+return new RandomAccessFile(file, "r").getChannel();
+}
+};
+// close the file channel, when it was removed from cache
+private final RemovalListener removalListener = new RemovalListener() {
 
 Review comment:
   rename this to 'readonlyFileChannelRemovalListener'


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2017-12-12 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r156466917
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -341,12 +346,43 @@ private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, Byt
 }
 };
 
+private final  CacheLoader loader = new CacheLoader () {
 
 Review comment:
   I would suggest renaming this to a more meaningful name, like `readonlyFileChannelLoader`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #832: Issue 620: Close the fileChannels for read when they are idle

2017-12-12 Thread GitBox
sijie commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r156465690
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##
 @@ -1079,24 +1109,26 @@ private Header getHeaderForLogId(long entryLogId) throws IOException {
 }
 
 private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException {
-BufferedReadChannel fc = getFromChannels(entryLogId);
-if (fc != null) {
-return fc;
-}
-File file = findFile(entryLogId);
-// get channel is used to open an existing entry log file
-// it would be better to open using read mode
-FileChannel newFc = new RandomAccessFile(file, "r").getChannel();
-FileChannel oldFc = logid2FileChannel.putIfAbsent(entryLogId, newFc);
-if (null != oldFc) {
-newFc.close();
-newFc = oldFc;
+BufferedReadChannel brc = getFromChannels(entryLogId);
+if (brc != null) {
+return brc;
 }
+
+// logid2FileChannel cache will load fileChannel automatically
+FileChannel fc = null;
+try {
+fc = logid2FileChannel.get(entryLogId);
+} catch (ExecutionException e){
+LOG.error("ExecutionException found in get fileChannel for log {} in logid2FileChannel cache", entryLogId);
+// throw exception to avoid pass null to BufferedReadChannel
+throw new IOException(e);
 
 Review comment:
   ExecutionException wraps the actual cause of the failure to load the file channel, so you need to unwrap it.
   ```
   if (e.getCause() instanceof IOException) {
       throw (IOException) e.getCause();
   } else {
       throw new IOException("Encountered unknown exception on opening read channel for entry log " + entryLogId, e.getCause());
   }
   ```
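   Guava's `LoadingCache.get(K)` reports a checked exception thrown by `load` wrapped in an `ExecutionException` (unchecked ones arrive as `UncheckedExecutionException`, which this sketch does not model). To stay dependency-free, the sketch uses `FutureTask` to produce the same kind of wrapping; `loadOrThrow` and `describe` are hypothetical helpers that apply the unwrapping suggested above.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

final class UnwrapExecutionExceptionDemo {
    // Runs a loader the way a loading cache would, then rethrows the real cause.
    static <V> V loadOrThrow(long entryLogId, Callable<V> loader) throws IOException {
        FutureTask<V> task = new FutureTask<>(loader);
        task.run(); // any exception from the loader is captured by the task
        try {
            return task.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while opening read channel for entry log " + entryLogId, e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof IOException) {
                throw (IOException) e.getCause(); // surface the original IOException as-is
            }
            throw new IOException("Encountered unknown exception on opening read channel for entry log "
                    + entryLogId, e.getCause());
        }
    }

    // Describes what the caller sees when the loader throws the given exception.
    static String describe(Exception thrownByLoader) {
        try {
            loadOrThrow(10L, () -> {
                throw thrownByLoader;
            });
            return "no exception";
        } catch (IOException e) {
            return e.getClass().getSimpleName() + " caused by "
                    + (e.getCause() == null ? "nothing" : e.getCause().getClass().getSimpleName());
        }
    }

    public static void main(String[] args) {
        // An IOException from the loader is rethrown unchanged.
        System.out.println(describe(new FileNotFoundException("a.log")));
        // Any other checked exception is wrapped in a new IOException.
        System.out.println(describe(new Exception("boom")));
    }
}
```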

