Author: adulceanu
Date: Wed Aug 19 13:05:22 2020
New Revision: 1880989
URL: http://svn.apache.org/viewvc?rev=1880989&view=rev
Log:
OAK-9180 - Optimise synchronisation between threads when writing to 3rd level
segment cache
Contribution by Miroslav Smiljanic
Modified:
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java
Modified:
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java?rev=1880989&r1=1880988&r2=1880989&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java Wed Aug 19 13:05:22 2020
@@ -138,7 +138,7 @@ public class PersistentDiskCache extends
Buffer bufferCopy = buffer.duplicate();
Runnable task = () -> {
- if (lockSegmentWrite(segmentId)) {
+ if (writesPending.add(segmentId)) {
try (FileChannel channel = new FileOutputStream(tempSegmentFile).getChannel()) {
int fileSize = bufferCopy.write(channel);
try {
@@ -156,7 +156,7 @@ public class PersistentDiskCache extends
logger.error("Error while deleting corrupted segment file {}", segmentId, i);
}
} finally {
- unlockSegmentWrite(segmentId);
+ writesPending.remove(segmentId);
}
}
cleanUp();
Modified:
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java?rev=1880989&r1=1880988&r2=1880989&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java Wed Aug 19 13:05:22 2020
@@ -165,7 +165,7 @@ public class PersistentRedisCache extend
Buffer bufferCopy = buffer.duplicate();
Runnable task = () -> {
- if (lockSegmentWrite(segmentId)) {
+ if (writesPending.add(segmentId)) {
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
try (WritableByteChannel channel = Channels.newChannel(bos);
Jedis redis = redisPool.getResource()) {
while (bufferCopy.hasRemaining()) {
@@ -177,7 +177,7 @@ public class PersistentRedisCache extend
} catch (Throwable t) {
logger.error("Error writing segment {} to cache: {}",
segmentId, t);
} finally {
- unlockSegmentWrite(segmentId);
+ writesPending.remove(segmentId);
}
}
};
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java?rev=1880989&r1=1880988&r2=1880989&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java Wed Aug 19 13:05:22 2020
@@ -29,9 +29,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
-import java.util.HashSet;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -45,13 +46,13 @@ public abstract class AbstractPersistent
protected ExecutorService executor;
protected AtomicLong cacheSize = new AtomicLong(0);
protected PersistentCache nextCache;
- protected final HashSet<String> writesPending;
+ protected final Set<String> writesPending;
protected SegmentCacheStats segmentCacheStats;
public AbstractPersistentCache() {
executor = Executors.newFixedThreadPool(THREADS);
- writesPending = new HashSet<>();
+ writesPending = ConcurrentHashMap.newKeySet();
}
public PersistentCache linkWith(AbstractPersistentCache nextCache) {
@@ -139,20 +140,6 @@ public abstract class AbstractPersistent
}
public int getWritesPending() {
- synchronized (writesPending) {
- return writesPending.size();
- }
- }
-
- protected boolean lockSegmentWrite(String segmentId) {
- synchronized (writesPending) {
- return writesPending.add(segmentId);
- }
- }
-
- protected void unlockSegmentWrite(String segmentId) {
- synchronized (writesPending) {
- writesPending.remove(segmentId);
- }
+ return writesPending.size();
}
}