Author: frm
Date: Thu Dec  7 10:03:41 2017
New Revision: 1817343

URL: http://svn.apache.org/viewvc?rev=1817343&view=rev
Log:
OAK-7027 - Close StandbyClient as soon as the synchronization is over

Modified:
    
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java
    
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java?rev=1817343&r1=1817342&r2=1817343&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java
 Thu Dec  7 10:03:41 2017
@@ -33,7 +33,6 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
-import com.google.common.base.Supplier;
 import io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
@@ -86,6 +85,18 @@ public final class StandbyClientSync imp
 
     private final File spoolFolder;
 
+    private final StandbyClientSyncExecution execution;
+
+    private static String clientId() {
+        String s = System.getProperty(CLIENT_ID_PROPERTY_NAME);
+
+        if (s == null || s.isEmpty()) {
+            return UUID.randomUUID().toString();
+        }
+
+        return s;
+    }
+
     public StandbyClientSync(String host, int port, FileStore store, boolean 
secure, int readTimeoutMs, boolean autoClean, File spoolFolder) {
         this.state = STATUS_INITIALIZING;
         this.lastSuccessfulRequest = -1;
@@ -98,14 +109,12 @@ public final class StandbyClientSync imp
         this.readTimeoutMs = readTimeoutMs;
         this.autoClean = autoClean;
         this.fileStore = store;
-        String s = System.getProperty(CLIENT_ID_PROPERTY_NAME);
-        this.observer = new CommunicationObserver((s == null || s.isEmpty()) ? 
UUID.randomUUID().toString() : s);
+        this.observer = new CommunicationObserver(clientId());
         this.group = new NioEventLoopGroup(0, new 
NamedThreadFactory("standby"));
+        this.execution = new StandbyClientSyncExecution(fileStore, 
running::get);
         this.spoolFolder = spoolFolder;
-
-        final MBeanServer jmxServer = 
ManagementFactory.getPlatformMBeanServer();
         try {
-            jmxServer.registerMBean(new StandardMBean(this, 
ClientStandbyStatusMBean.class), new ObjectName(this.getMBeanName()));
+            ManagementFactory.getPlatformMBeanServer().registerMBean(new 
StandardMBean(this, ClientStandbyStatusMBean.class), new 
ObjectName(this.getMBeanName()));
         } catch (Exception e) {
             log.error("cannot register standby status mbean", e);
         }
@@ -154,18 +163,23 @@ public final class StandbyClientSync imp
 
             try {
                 long startTimestamp = System.currentTimeMillis();
+
+                GCGeneration genBefore = headGeneration(fileStore);
+
                 try (StandbyClient client = new StandbyClient(group, 
observer.getID(), secure, readTimeoutMs, spoolFolder)) {
                     client.connect(host, port);
+                    execution.execute(client);
+                }
+
+                fileStore.flush();
 
-                    GCGeneration genBefore = headGeneration(fileStore);
-                    new StandbyClientSyncExecution(fileStore, client, 
newRunningSupplier()).execute();
-                    GCGeneration genAfter = headGeneration(fileStore);
-
-                    if (autoClean && (genAfter.compareWith(genBefore)) > 0) {
-                        log.info("New head generation detected (prevHeadGen: 
{} newHeadGen: {}), running cleanup.", genBefore, genAfter);
-                        cleanupAndRemove();
-                    }
+                GCGeneration genAfter = headGeneration(fileStore);
+
+                if (autoClean && genAfter.compareWith(genBefore) > 0) {
+                    log.info("New head generation detected (prevHeadGen: {} 
newHeadGen: {}), running cleanup.", genBefore, genAfter);
+                    cleanupAndRemove();
                 }
+
                 this.failedRequests = 0;
                 this.syncStartTimestamp = startTimestamp;
                 this.syncEndTimestamp = System.currentTimeMillis();
@@ -192,17 +206,6 @@ public final class StandbyClientSync imp
         fileStore.cleanup();
     }
 
-    private Supplier<Boolean> newRunningSupplier() {
-        return new Supplier<Boolean>() {
-
-            @Override
-            public Boolean get() {
-                return running.get();
-            }
-
-        };
-    }
-
     @Nonnull
     @Override
     public String getMode() {

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java?rev=1817343&r1=1817342&r2=1817343&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java
 Thu Dec  7 10:03:41 2017
@@ -24,6 +24,7 @@ import java.util.UUID;
 
 import javax.annotation.Nullable;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import org.apache.jackrabbit.oak.segment.RecordId;
 import org.apache.jackrabbit.oak.segment.SegmentId;
@@ -49,19 +50,16 @@ class StandbyClientSyncExecution {
 
     private final SegmentIdProvider idProvider;
 
-    private final StandbyClient client;
-
     private final Supplier<Boolean> running;
 
-    StandbyClientSyncExecution(FileStore store, StandbyClient client, 
Supplier<Boolean> running) {
+    StandbyClientSyncExecution(FileStore store, Supplier<Boolean> running) {
         this.store = store;
         this.idProvider = store.getSegmentIdProvider();
-        this.client = client;
         this.running = running;
     }
 
-    void execute() throws Exception {
-        RecordId remoteHead = getHead();
+    void execute(StandbyClient client) throws Exception {
+        RecordId remoteHead = getHead(client);
 
         if (remoteHead == null) {
             log.error("Unable to fetch remote head");
@@ -72,18 +70,17 @@ class StandbyClientSyncExecution {
             return;
         }
 
-        long t = System.currentTimeMillis();
+        Stopwatch stopwatch = Stopwatch.createStarted();
         SegmentNodeState before = store.getHead();
         SegmentNodeBuilder builder = before.builder();
         SegmentNodeState current = newSegmentNodeState(remoteHead);
-        compareAgainstBaseState(current, before, builder);
-        boolean ok = store.getRevisions().setHead(before.getRecordId(), 
remoteHead);
-        store.flush();
-        log.info("updated head state successfully: {} in {}ms.", ok, 
System.currentTimeMillis() - t);
+        compareAgainstBaseState(client, current, before, builder);
+        store.getRevisions().setHead(before.getRecordId(), remoteHead);
+        log.info("Updated head state in {}", stopwatch);
     }
 
     @Nullable
-    private RecordId getHead() throws Exception {
+    private RecordId getHead(StandbyClient client) throws Exception {
         String head = client.getHead();
         if (head == null) {
             return null;
@@ -95,18 +92,19 @@ class StandbyClientSyncExecution {
         return store.getReader().readNode(id);
     }
 
-    private boolean compareAgainstBaseState(SegmentNodeState current, 
SegmentNodeState before, SegmentNodeBuilder builder) throws Exception {
+    private void compareAgainstBaseState(StandbyClient client, 
SegmentNodeState current, SegmentNodeState before, SegmentNodeBuilder builder) 
throws Exception {
         while (true) {
             try {
-                return current.compareAgainstBaseState(before, new 
StandbyDiff(builder, store, client, running));
+                current.compareAgainstBaseState(before, new 
StandbyDiff(builder, store, client, running));
+                return;
             } catch (SegmentNotFoundException e) {
                 log.debug("Found missing segment {}", e.getSegmentId());
-                
copySegmentHierarchyFromPrimary(UUID.fromString(e.getSegmentId()));
+                copySegmentHierarchyFromPrimary(client, 
UUID.fromString(e.getSegmentId()));
             }
         }
     }
 
-    private void copySegmentHierarchyFromPrimary(UUID segmentId) throws 
Exception {
+    private void copySegmentHierarchyFromPrimary(StandbyClient client, UUID 
segmentId) throws Exception {
         LinkedList<UUID> batch = new LinkedList<>();
 
         batch.offer(segmentId);
@@ -137,7 +135,7 @@ class StandbyClientSyncExecution {
                 continue;
             }
 
-            for (String s : readReferences(current)) {
+            for (String s : readReferences(client, current)) {
                 UUID referenced = UUID.fromString(s);
 
                 // Short circuit for the "backward reference". The segment 
graph
@@ -189,17 +187,16 @@ class StandbyClientSyncExecution {
 
         for (UUID id : bulk) {
             log.info("Copying bulk segment {} from primary", id);
-            copySegmentFromPrimary(id);
+            copySegmentFromPrimary(client, id);
         }
 
         for (UUID id : data) {
             log.info("Copying data segment {} from primary", id);
-            copySegmentFromPrimary(id);
+            copySegmentFromPrimary(client, id);
         }
-
     }
 
-    private Iterable<String> readReferences(UUID id) throws 
InterruptedException {
+    private Iterable<String> readReferences(StandbyClient client, UUID id) 
throws InterruptedException {
         Iterable<String> references = client.getReferences(id.toString());
 
         if (references == null) {
@@ -216,7 +213,7 @@ class StandbyClientSyncExecution {
         ));
     }
 
-    private void copySegmentFromPrimary(UUID uuid) throws Exception {
+    private void copySegmentFromPrimary(StandbyClient client, UUID uuid) 
throws Exception {
         byte[] data = client.getSegment(uuid.toString());
 
         if (data == null) {


Reply via email to