Author: frm
Date: Thu Aug 31 08:12:36 2017
New Revision: 1806761

URL: http://svn.apache.org/viewvc?rev=1806761&view=rev
Log:
OAK-6602 - Improve resource management in BulkTransferBenchmark

Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/benchmark/BenchmarkBase.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/benchmark/BulkTransferBenchmark.java

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/benchmark/BenchmarkBase.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/benchmark/BenchmarkBase.java?rev=1806761&r1=1806760&r2=1806761&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/benchmark/BenchmarkBase.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/benchmark/BenchmarkBase.java Thu Aug 31 08:12:36 2017
@@ -18,62 +18,61 @@
  */
 package org.apache.jackrabbit.oak.segment.standby.benchmark;
 
-import static java.io.File.createTempFile;
 import static 
org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
+import com.google.common.io.Closer;
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
 import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync;
 import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
 
-public class BenchmarkBase {
-    static final int port = Integer.getInteger("standby.server.port", 52800);
-    static final String LOCALHOST = "127.0.0.1";
-    static final int MB = 1024 * 1024;
-    static final int timeout = Integer.getInteger("standby.test.timeout", 500);
-
-    File directoryS;
-    FileStore storeS;
-    ScheduledExecutorService executorS;
-
-    File directoryC;
-    FileStore storeC;
-    ScheduledExecutorService executorC;
-
-    public void setUpServerAndClient() throws Exception {
-        directoryS = createTmpTargetDir(getClass().getSimpleName() + 
"-Server");
-        executorS = Executors.newSingleThreadScheduledExecutor();
-        storeS = setupPrimary(directoryS, executorS);
-
-        // client
-        directoryC = createTmpTargetDir(getClass().getSimpleName() + 
"-Client");
-        executorC = Executors.newSingleThreadScheduledExecutor();
-        storeC = setupSecondary(directoryC, executorC);
-    }
-
-    public void closeServerAndClient() {
-        storeS.close();
-        storeC.close();
-        
+class BenchmarkBase {
+
+    private static File createTmpTargetDir(String name) throws IOException {
+        return Files.createTempDirectory(Paths.get("target"), name).toFile();
+    }
+
+    private Closer closer;
+
+    FileStore primaryStore;
+
+    FileStore standbyStore;
+
+    void setUpServerAndClient() throws Exception {
+        closer = Closer.create();
+
+        File primaryDirectory = createTmpTargetDir("primary-");
+        closer.register(() -> FileUtils.deleteDirectory(primaryDirectory));
+
+        ScheduledExecutorService primaryExecutor = 
Executors.newSingleThreadScheduledExecutor();
+        closer.register(new ExecutorCloser(primaryExecutor));
+
+        primaryStore = setupPrimary(primaryDirectory, primaryExecutor);
+        closer.register(primaryStore);
+
+        File standbyDirectory = createTmpTargetDir("standby-");
+        closer.register(() -> FileUtils.deleteDirectory(standbyDirectory));
+
+        ScheduledExecutorService standbyExecutor = 
Executors.newSingleThreadScheduledExecutor();
+        closer.register(new ExecutorCloser(standbyExecutor));
+
+        standbyStore = setupStandby(standbyDirectory, standbyExecutor);
+        closer.register(standbyStore);
+    }
+
+    void closeServerAndClient() {
         try {
-            FileUtils.deleteDirectory(directoryS);
-            FileUtils.deleteDirectory(directoryC);
+            closer.close();
         } catch (IOException e) {
-            // ignore
-        } finally {
-            if (executorS != null) {
-                new ExecutorCloser(executorS).close();
-            }
-
-            if (executorC != null) {
-                new ExecutorCloser(executorC).close();
-            }
+            e.printStackTrace(System.err);
         }
     }
 
@@ -89,30 +88,12 @@ public class BenchmarkBase {
                 .build();
     }
 
-    protected FileStore setupPrimary(File directory, ScheduledExecutorService 
executor) throws Exception {
+    private FileStore setupPrimary(File directory, ScheduledExecutorService 
executor) throws Exception {
         return newFileStore(directory, executor);
     }
 
-    protected FileStore setupSecondary(File directory, 
ScheduledExecutorService executor) throws Exception {
+    private FileStore setupStandby(File directory, ScheduledExecutorService 
executor) throws Exception {
         return newFileStore(directory, executor);
     }
 
-    public StandbyClientSync newStandbyClientSync(FileStore store) throws 
Exception {
-        return newStandbyClientSync(store, port, false);
-    }
-
-    public StandbyClientSync newStandbyClientSync(FileStore store, int port) 
throws Exception {
-        return newStandbyClientSync(store, port, false);
-    }
-
-    public StandbyClientSync newStandbyClientSync(FileStore store, int port, 
boolean secure) throws Exception {
-        return new StandbyClientSync(LOCALHOST, port, store, secure, timeout, 
false);
-    }
-
-    private static File createTmpTargetDir(String name) throws IOException {
-        File f = createTempFile(name, "dir", new File("target"));
-        f.delete();
-        f.mkdir();
-        return f;
-    }
 }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/benchmark/BulkTransferBenchmark.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/benchmark/BulkTransferBenchmark.java?rev=1806761&r1=1806760&r2=1806761&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/benchmark/BulkTransferBenchmark.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/benchmark/BulkTransferBenchmark.java Thu Aug 31 08:12:36 2017
@@ -20,12 +20,13 @@
 package org.apache.jackrabbit.oak.segment.standby.benchmark;
 
 import java.lang.management.ManagementFactory;
-import java.lang.reflect.Method;
+import java.util.Random;
 import java.util.Set;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.base.Stopwatch;
 import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
 import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync;
 import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean;
@@ -37,115 +38,122 @@ import org.apache.jackrabbit.oak.spi.sta
 
 public class BulkTransferBenchmark extends BenchmarkBase {
 
-    public void setUp() throws Exception {
-        setUpServerAndClient();
-    }
+    private static final String HOST = "127.0.0.1";
+
+    private static final int PORT = Integer.getInteger("standby.server.port", 
52800);
 
-    public void after() {
-        closeServerAndClient();
+    private static final int TIMEOUT = 
Integer.getInteger("standby.test.timeout", 500);
+
+    private void test100Nodes() throws Exception {
+        test("100 nodes", 100);
     }
 
-    public void test100Nodes() throws Exception {
-        test(100, 1, 1, 3000, 3100);
+    private void test1000Nodes() throws Exception {
+        test("1K nodes", 1000);
     }
 
-    public void test1000Nodes() throws Exception {
-        test(1000, 1, 1, 53000, 55000);
+    private void test10000Nodes() throws Exception {
+        test("10K nodes", 10000);
     }
 
-    public void test10000Nodes() throws Exception {
-        test(10000, 1, 1, 245000, 246000);
+    private void test100000Nodes() throws Exception {
+        test("100K nodes", 100000);
     }
 
-    public void test100000Nodes() throws Exception {
-        test(100000, 9, 9, 2210000, 2220000);
+    private void test1MillionNodes() throws Exception {
+        test("1M nodes", 1000000);
     }
 
-    public void test1MillionNodes() throws Exception {
-        test(1000000, 87, 87, 22700000, 22800000);
+    private void test1MillionNodesUsingSSL() throws Exception {
+        test("1M nodes with SSL", 1000000, true);
     }
 
-    public void test1MillionNodesUsingSSL() throws Exception {
-        test(1000000, 87, 87, 22700000, 22800000, true);
+    private void test10MillionNodes() throws Exception {
+        test("10M nodes", 10000000);
     }
 
-    public void test10MillionNodes() throws Exception {
-        test(10000000, 856, 856, 223000000, 224000000);
+    private void test(String name, int number) throws Exception {
+        test(name, number, false);
     }
 
-    private void test(int number, int minExpectedSegments, int 
maxExpectedSegments, long minExpectedBytes, long maxExpectedBytes) throws 
Exception {
-        test(number, minExpectedSegments, maxExpectedSegments, 
minExpectedBytes, maxExpectedBytes, false);
+    private void createNodes(int nodeCount) throws Exception {
+        NodeStore store = 
SegmentNodeStoreBuilders.builder(primaryStore).build();
+        NodeBuilder rootBuilder = store.getRoot().builder();
+        createNodes(rootBuilder.child("store"), nodeCount, new Random());
+        store.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        primaryStore.flush();
     }
 
-    private void test(int number, int minExpectedSegments, int 
maxExpectedSegments, long minExpectedBytes, long maxExpectedBytes,
-                      boolean useSSL) throws Exception {
-        NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build();
-        NodeBuilder rootbuilder = store.getRoot().builder();
-        NodeBuilder b = rootbuilder.child("store");
-        for (int j=0; j<=number / 1000; j++) {
-            NodeBuilder builder = b.child("Folder#" + j);
-            for (int i = 0; i <(number < 1000 ? number : 1000); i++) {
-                builder.child("Test#" + i).setProperty("ts", 
System.currentTimeMillis());
+    private static void createNodes(NodeBuilder builder, int nodeCount, Random 
random) {
+        for (int j = 0; j <= nodeCount / 1000; j++) {
+            NodeBuilder folder = builder.child("Folder#" + j);
+            for (int i = 0; i < (nodeCount < 1000 ? nodeCount : 1000); i++) {
+                folder.child("Test#" + i).setProperty("ts", random.nextLong());
             }
         }
-        store.merge(rootbuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-        storeS.flush();
-
-        final StandbyServerSync serverSync = new StandbyServerSync(port, 
storeS, 1 * MB, useSSL);
-        serverSync.start();
-
-        System.setProperty(StandbyClientSync.CLIENT_ID_PROPERTY_NAME, "Bar");
-        StandbyClientSync clientSync = newStandbyClientSync(storeC, port, 
useSSL);
+    }
 
-        final MBeanServer jmxServer = 
ManagementFactory.getPlatformMBeanServer();
-        ObjectName status = new ObjectName(StandbyStatusMBean.JMX_NAME + 
",id=*");
-        ObjectName clientStatus = new ObjectName(clientSync.getMBeanName());
-        ObjectName serverStatus = new ObjectName(serverSync.getMBeanName());
+    private void test(String name, int nodeCount, boolean useSSL) throws 
Exception {
+        createNodes(nodeCount);
 
-        long start = System.currentTimeMillis();
-        clientSync.run();
+        try (StandbyServerSync serverSync = new StandbyServerSync(PORT, 
primaryStore, 1024 * 1024, useSSL);
+             StandbyClientSync clientSync = new StandbyClientSync(HOST, PORT, 
standbyStore, useSSL, TIMEOUT, false)) {
+            serverSync.start();
+
+            MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+            ObjectName status = new ObjectName(StandbyStatusMBean.JMX_NAME + 
",id=*");
+            ObjectName clientStatus = new 
ObjectName(clientSync.getMBeanName());
+            ObjectName serverStatus = new 
ObjectName(serverSync.getMBeanName());
+
+            Stopwatch stopwatch = Stopwatch.createStarted();
+            clientSync.run();
+            stopwatch.stop();
 
-        try {
             Set<ObjectName> instances = jmxServer.queryNames(status, null);
             ObjectName connectionStatus = null;
             for (ObjectName s : instances) {
-                if (!s.equals(clientStatus) && !s.equals(serverStatus)) 
connectionStatus = s;
+                if (!s.equals(clientStatus) && !s.equals(serverStatus)) {
+                    connectionStatus = s;
+                }
             }
-            assert(connectionStatus != null);
+            assert (connectionStatus != null);
 
-            long segments = ((Long)jmxServer.getAttribute(connectionStatus, 
"TransferredSegments")).longValue();
-            long bytes = ((Long)jmxServer.getAttribute(connectionStatus, 
"TransferredSegmentBytes")).longValue();
+            long segments = (Long) jmxServer.getAttribute(connectionStatus, 
"TransferredSegments");
+            long bytes = (Long) jmxServer.getAttribute(connectionStatus, 
"TransferredSegmentBytes");
 
-            System.out.println("did transfer " + segments + " segments with " + bytes + " bytes in " + (System.currentTimeMillis() - start) / 1000 + " seconds.");
-        } finally {
-            serverSync.close();
-            clientSync.close();
+            System.out.printf("%s: segments = %d, segments size = %d bytes, time = %s\n", name, segments, bytes, stopwatch);
         }
     }
-    
+
+    private interface Test {
+
+        void run() throws Exception;
+
+    }
+
     public static void main(String[] args) {
         BulkTransferBenchmark benchmark = new BulkTransferBenchmark();
-        
-        String[] methodNames = new String[] {
-                "test100Nodes",
-                "test1000Nodes",
-                "test10000Nodes",
-                "test100000Nodes",
-                "test1MillionNodes",
-                "test1MillionNodesUsingSSL",
-                "test10MillionNodes"
+
+        Test[] tests = new Test[] {
+            benchmark::test100Nodes,
+            benchmark::test1000Nodes,
+            benchmark::test10000Nodes,
+            benchmark::test100000Nodes,
+            benchmark::test1MillionNodes,
+            benchmark::test1MillionNodesUsingSSL,
+            benchmark::test10MillionNodes
         };
-        
-        for (String methodName : methodNames) {
+
+        for (Test test : tests) {
             try {
-                Method method = benchmark.getClass().getMethod(methodName);
-                
-                benchmark.setUp();
-                method.invoke(benchmark);
-                benchmark.after();
+                benchmark.setUpServerAndClient();
+                test.run();
             } catch (Exception e) {
-                e.printStackTrace();
-            } 
+                e.printStackTrace(System.err);
+            } finally {
+                benchmark.closeServerAndClient();
+            }
         }
     }
+
 }


Reply via email to