Author: frm
Date: Tue Nov 15 07:56:26 2016
New Revision: 1769753

URL: http://svn.apache.org/viewvc?rev=1769753&view=rev
Log:
OAK-5106 - Use resources more efficiently in the cold standby

- Avoid creating and destroying multiple instances of the network proxy when
  running integration tests. Instead, reuse the same instance.
- Fix a leak in the cold standby server. The cold standby server didn't release
  the Netty resources (socket, threads) acquired during initialization.
- Avoid creating a new Netty thread pool when a synchronization loop is started
  in the standby client. The synchronization loop now reuses a previously
  created thread pool owned by the client process.

Contribution by Timothee Maret.

Modified:
    
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java
    
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java
    
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerSync.java
    
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java
    
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java?rev=1769753&r1=1769752&r2=1769753&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java
 Tue Nov 15 07:56:26 2016
@@ -77,14 +77,14 @@ class StandbyClient implements AutoClose
 
     private Channel channel;
 
-    StandbyClient(String clientId, boolean secure, int readTimeoutMs) {
+    StandbyClient(NioEventLoopGroup group, String clientId, boolean secure, 
int readTimeoutMs) {
+        this.group = group;
         this.clientId = clientId;
         this.secure = secure;
         this.readTimeoutMs = readTimeoutMs;
     }
 
     void connect(String host, int port) throws Exception {
-        group = new NioEventLoopGroup();
 
         final SslContext sslContext;
 
@@ -148,11 +148,6 @@ class StandbyClient implements AutoClose
 
     @Override
     public void close() {
-        closeChannel();
-        closeGroup();
-    }
-
-    private void closeChannel() {
         if (channel == null) {
             return;
         }
@@ -163,17 +158,6 @@ class StandbyClient implements AutoClose
         }
     }
 
-    private void closeGroup() {
-        if (group == null) {
-            return;
-        }
-        if (group.shutdownGracefully(2, 15, 
TimeUnit.SECONDS).awaitUninterruptibly(20, TimeUnit.SECONDS)) {
-            log.debug("Group shut down");
-        } else {
-            log.debug("Group shutdown timed out");
-        }
-    }
-
     @Nullable
     String getHead() throws InterruptedException {
         channel.writeAndFlush(new GetHeadRequest(clientId));

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java?rev=1769753&r1=1769752&r2=1769753&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java
 Tue Nov 15 07:56:26 2016
@@ -23,6 +23,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.management.MBeanServer;
@@ -31,6 +32,7 @@ import javax.management.StandardMBean;
 import javax.net.ssl.SSLException;
 
 import com.google.common.base.Supplier;
+import io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
 import org.apache.jackrabbit.oak.segment.standby.jmx.ClientStandbyStatusMBean;
 import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean;
@@ -74,6 +76,8 @@ public final class StandbyClientSync imp
 
     private long syncEndTimestamp;
 
+    private final NioEventLoopGroup group;
+
     public StandbyClientSync(String host, int port, FileStore store, boolean 
secure, int readTimeoutMs, boolean autoClean) throws SSLException {
         this.state = STATUS_INITIALIZING;
         this.lastSuccessfulRequest = -1;
@@ -88,6 +92,7 @@ public final class StandbyClientSync imp
         this.fileStore = store;
         String s = System.getProperty(CLIENT_ID_PROPERTY_NAME);
         this.observer = new CommunicationObserver((s == null || s.length() == 
0) ? UUID.randomUUID().toString() : s);
+        group = new NioEventLoopGroup();
 
         final MBeanServer jmxServer = 
ManagementFactory.getPlatformMBeanServer();
         try {
@@ -111,6 +116,7 @@ public final class StandbyClientSync imp
         } catch (Exception e) {
             log.error("can unregister standby status mbean", e);
         }
+        closeGroup();
         observer.unregister();
         state = STATUS_CLOSED;
     }
@@ -134,7 +140,7 @@ public final class StandbyClientSync imp
 
         try {
             long startTimestamp = System.currentTimeMillis();
-            try (StandbyClient client = new StandbyClient(observer.getID(), 
secure, readTimeoutMs)) {
+            try (StandbyClient client = new StandbyClient(group, 
observer.getID(), secure, readTimeoutMs)) {
                 client.connect(host, port);
 
                 int genBefore = headGeneration(fileStore);
@@ -248,4 +254,15 @@ public final class StandbyClientSync imp
         return syncEndTimestamp;
     }
 
+    private void closeGroup() {
+        if (group == null) {
+            return;
+        }
+        if (group.shutdownGracefully(2, 15, 
TimeUnit.SECONDS).awaitUninterruptibly(20, TimeUnit.SECONDS)) {
+            log.debug("Group shut down");
+        } else {
+            log.debug("Group shutdown timed out");
+        }
+    }
+
 }

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerSync.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerSync.java?rev=1769753&r1=1769752&r2=1769753&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerSync.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerSync.java
 Tue Nov 15 07:56:26 2016
@@ -132,7 +132,9 @@ public class StandbyServerSync implement
     public void close() {
         stop();
         state = STATUS_CLOSING;
-
+        if (server != null) {
+            server.close();
+        }
         observer.unregister();
         final MBeanServer jmxServer = 
ManagementFactory.getPlatformMBeanServer();
         try {

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java?rev=1769753&r1=1769752&r2=1769753&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java
 Tue Nov 15 07:56:26 2016
@@ -32,6 +32,8 @@ import org.apache.jackrabbit.oak.segment
 import org.apache.jackrabbit.oak.segment.test.TemporaryFileStore;
 import org.apache.jackrabbit.oak.segment.test.proxy.NetworkErrorProxy;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.RuleChain;
@@ -47,12 +49,24 @@ public class BrokenNetworkIT extends Tes
 
     private TemporaryFileStore clientFileStore2 = new 
TemporaryFileStore(folder, true);
 
+    private static NetworkErrorProxy proxy;
+
     @Rule
     public RuleChain chain = RuleChain.outerRule(folder)
             .around(serverFileStore)
             .around(clientFileStore1)
             .around(clientFileStore2);
 
+    @BeforeClass
+    public static void beforeClass() {
+        proxy = new NetworkErrorProxy(getProxyPort(), getServerHost(), 
getServerPort());
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        proxy.close();
+    }
+
     @Test
     public void testProxy() throws Exception {
         useProxy(false);
@@ -133,13 +147,12 @@ public class BrokenNetworkIT extends Tes
         storeS.flush();  // this speeds up the test a little bit...
 
         try (
-                NetworkErrorProxy p = new NetworkErrorProxy(getProxyPort(), 
getServerHost(), getServerPort());
                 StandbyServerSync serverSync = new 
StandbyServerSync(getServerPort(), storeS, ssl);
                 StandbyClientSync clientSync = newStandbyClientSync(storeC, 
getProxyPort(), ssl);
         ) {
-            p.skipBytes(skipPosition, skipBytes);
-            p.flipByte(flipPosition);
-            p.connect();
+            proxy.skipBytes(skipPosition, skipBytes);
+            proxy.flipByte(flipPosition);
+            proxy.connect();
 
             serverSync.start();
 
@@ -149,7 +162,7 @@ public class BrokenNetworkIT extends Tes
                 assertFalse("stores are not expected to be equal", 
storeS.getHead().equals(storeC.getHead()));
                 assertEquals(storeC2.getHead(), storeC.getHead());
 
-                p.reset();
+                proxy.reset();
                 if (intermediateChange) {
                     addTestContent(store, "server2");
                     storeS.flush();

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java?rev=1769753&r1=1769752&r2=1769753&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java
 Tue Nov 15 07:56:26 2016
@@ -43,12 +43,16 @@ import org.apache.jackrabbit.oak.spi.com
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public abstract class DataStoreTestBase extends TestBase {
 
     private static final int MB = 1024 * 1024;
 
+    private static NetworkErrorProxy proxy;
+
     abstract FileStore getPrimary();
 
     abstract FileStore getSecondary();
@@ -70,6 +74,16 @@ public abstract class DataStoreTestBase
         return data;
     }
 
+    @BeforeClass
+    public static void beforeClass() {
+        proxy = new NetworkErrorProxy(getProxyPort(), getServerHost(), 
getServerPort());
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        proxy.close();
+    }
+
     @Test
     public void testSync() throws Exception {
         final int blobSize = 5 * MB;
@@ -174,13 +188,12 @@ public abstract class DataStoreTestBase
         NodeStore store = SegmentNodeStoreBuilders.builder(primary).build();
         byte[] data = addTestContent(store, "server", blobSize);
         try (
-                NetworkErrorProxy p = new NetworkErrorProxy(getProxyPort(), 
getServerHost(), getServerPort());
                 StandbyServerSync serverSync = new 
StandbyServerSync(getServerPort(), primary);
                 StandbyClientSync clientSync = newStandbyClientSync(secondary, 
getProxyPort())
         ) {
-            p.skipBytes(skipPosition, skipBytes);
-            p.flipByte(flipPosition);
-            p.connect();
+            proxy.skipBytes(skipPosition, skipBytes);
+            proxy.flipByte(flipPosition);
+            proxy.connect();
 
             serverSync.start();
             primary.flush();
@@ -191,7 +204,7 @@ public abstract class DataStoreTestBase
                 if (!storesShouldBeEqual()) {
                     assertFalse("stores are not expected to be equal", 
primary.getHead().equals(secondary.getHead()));
                 }
-                p.reset();
+                proxy.reset();
                 if (intermediateChange) {
                     blobSize = 2 * MB;
                     data = addTestContent(store, "server", blobSize);


Reply via email to