Author: adulceanu
Date: Wed Aug 19 13:05:22 2020
New Revision: 1880989

URL: http://svn.apache.org/viewvc?rev=1880989&view=rev
Log:
OAK-9180 - Optimise synchronisation between threads when writing to 3rd level segment cache
Contribution by Miroslav Smiljanic

Modified:
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java

Modified: 
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java?rev=1880989&r1=1880988&r2=1880989&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java Wed Aug 19 13:05:22 2020
@@ -138,7 +138,7 @@ public class PersistentDiskCache extends
         Buffer bufferCopy = buffer.duplicate();
 
         Runnable task = () -> {
-            if (lockSegmentWrite(segmentId)) {
+            if (writesPending.add(segmentId)) {
                 try (FileChannel channel = new 
FileOutputStream(tempSegmentFile).getChannel()) {
                     int fileSize = bufferCopy.write(channel);
                     try {
@@ -156,7 +156,7 @@ public class PersistentDiskCache extends
                         logger.error("Error while deleting corrupted segment 
file {}", segmentId, i);
                     }
                 } finally {
-                    unlockSegmentWrite(segmentId);
+                    writesPending.remove(segmentId);
                 }
             }
             cleanUp();

Modified: 
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java?rev=1880989&r1=1880988&r2=1880989&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java Wed Aug 19 13:05:22 2020
@@ -165,7 +165,7 @@ public class PersistentRedisCache extend
         Buffer bufferCopy = buffer.duplicate();
 
         Runnable task = () -> {
-            if (lockSegmentWrite(segmentId)) {
+            if (writesPending.add(segmentId)) {
                 final ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 try (WritableByteChannel channel = Channels.newChannel(bos); 
Jedis redis = redisPool.getResource()) {
                     while (bufferCopy.hasRemaining()) {
@@ -177,7 +177,7 @@ public class PersistentRedisCache extend
                 } catch (Throwable t) {
                     logger.error("Error writing segment {} to cache: {}", 
segmentId, t);
                 } finally {
-                    unlockSegmentWrite(segmentId);
+                    writesPending.remove(segmentId);
                 }
             }
         };

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java?rev=1880989&r1=1880988&r2=1880989&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java Wed Aug 19 13:05:22 2020
@@ -29,9 +29,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
-import java.util.HashSet;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -45,13 +46,13 @@ public abstract class AbstractPersistent
     protected ExecutorService executor;
     protected AtomicLong cacheSize = new AtomicLong(0);
     protected PersistentCache nextCache;
-    protected final HashSet<String> writesPending;
+    protected final Set<String> writesPending;
 
     protected SegmentCacheStats segmentCacheStats;
 
     public AbstractPersistentCache() {
         executor = Executors.newFixedThreadPool(THREADS);
-        writesPending = new HashSet<>();
+        writesPending = ConcurrentHashMap.newKeySet();
     }
 
     public PersistentCache linkWith(AbstractPersistentCache nextCache) {
@@ -139,20 +140,6 @@ public abstract class AbstractPersistent
     }
 
     public int getWritesPending() {
-        synchronized (writesPending) {
-            return writesPending.size();
-        }
-    }
-
-    protected boolean lockSegmentWrite(String segmentId) {
-        synchronized (writesPending) {
-            return writesPending.add(segmentId);
-        }
-    }
-
-    protected void unlockSegmentWrite(String segmentId) {
-        synchronized (writesPending) {
-            writesPending.remove(segmentId);
-        }
+        return writesPending.size();
     }
 }


Reply via email to