Author: frm
Date: Thu Feb 25 16:31:05 2016
New Revision: 1732330

URL: http://svn.apache.org/viewvc?rev=1732330&view=rev
Log:
OAK-4058 - Shut down executor groups after the channel handlers that use them

Modified:
    
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
    
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java
    
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java

Modified: 
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java?rev=1732330&r1=1732329&r2=1732330&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
 Thu Feb 25 16:31:05 2016
@@ -21,15 +21,15 @@ package org.apache.jackrabbit.oak.plugin
 import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
 import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetBlobReq;
 import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetSegmentReq;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.util.concurrent.EventExecutorGroup;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.concurrent.EventExecutorGroup;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
 import org.apache.jackrabbit.oak.plugins.segment.Segment;

Modified: 
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java?rev=1732330&r1=1732329&r2=1732330&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java
 Thu Feb 25 16:31:05 2016
@@ -18,6 +18,17 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment.standby.client;
 
+import java.io.Closeable;
+import java.lang.management.ManagementFactory;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+import javax.net.ssl.SSLException;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
@@ -35,13 +46,6 @@ import io.netty.handler.timeout.ReadTime
 import io.netty.util.CharsetUtil;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import io.netty.util.concurrent.EventExecutorGroup;
-
-import java.io.Closeable;
-import java.lang.management.ManagementFactory;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
 import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder;
 import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.ClientStandbyStatusMBean;
@@ -51,11 +55,6 @@ import org.apache.jackrabbit.oak.plugins
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-import javax.net.ssl.SSLException;
-
 public final class StandbyClient implements ClientStandbyStatusMBean, Runnable, Closeable {
     public static final String CLIENT_ID_PROPERTY_NAME = "standbyID";
 
@@ -197,17 +196,17 @@ public final class StandbyClient impleme
     }
 
     private void shutdownNetty() {
-        if (group != null && !group.isShuttingDown()) {
-            group.shutdownGracefully(0, 1, TimeUnit.SECONDS)
-                    .syncUninterruptibly();
+        if (handler != null) {
+            handler.close();
+            handler = null;
         }
         if (executor != null && !executor.isShuttingDown()) {
             executor.shutdownGracefully(0, 1, TimeUnit.SECONDS)
                     .syncUninterruptibly();
         }
-        if (handler != null) {
-            handler.close();
-            handler = null;
+        if (group != null && !group.isShuttingDown()) {
+            group.shutdownGracefully(0, 1, TimeUnit.SECONDS)
+                    .syncUninterruptibly();
         }
     }
 

Modified: 
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java?rev=1732330&r1=1732329&r2=1732330&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java
 Thu Feb 25 16:31:05 2016
@@ -19,17 +19,16 @@
 package org.apache.jackrabbit.oak.plugins.segment.standby.client;
 
 import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetHeadReq;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.timeout.ReadTimeoutHandler;
-import io.netty.util.concurrent.DefaultEventExecutorGroup;
-import io.netty.util.concurrent.EventExecutorGroup;
 
 import java.io.Closeable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
 import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder;
 import org.apache.jackrabbit.oak.plugins.segment.standby.codec.ReplyDecoder;
@@ -51,7 +50,6 @@ public class StandbyClientHandler extend
     private final boolean autoClean;
 
     private EventExecutorGroup loaderExecutor;
-    private ChannelHandlerContext ctx;
 
     public StandbyClientHandler(final StandbyStore store,
             CommunicationObserver observer, AtomicBoolean running,
@@ -65,7 +63,6 @@ public class StandbyClientHandler extend
 
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
-        this.ctx = ctx;
         log.debug("sending head request");
         ctx.writeAndFlush(newGetHeadReq(this.observer.getID()));
         log.debug("did send head request");
@@ -74,7 +71,7 @@ public class StandbyClientHandler extend
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, RecordId msg)
             throws Exception {
-        setHead(msg);
+        setHead(ctx, msg);
     };
 
     @Override
@@ -82,7 +79,7 @@ public class StandbyClientHandler extend
         ctx.flush();
     }
 
-    synchronized void setHead(RecordId head) {
+    synchronized void setHead(ChannelHandlerContext ctx, RecordId head) {
 
         if (store.getHead().getRecordId().equals(head)) {
             // all sync'ed up
@@ -115,14 +112,6 @@ public class StandbyClientHandler extend
 
     @Override
     public synchronized void close() {
-        if (ctx != null) {
-            for (ChannelHandler h : ctx.pipeline().toMap().values()) {
-                ctx.pipeline().remove(h);
-            }
-
-            ctx.close();
-            ctx = null;
-        }
         if (loaderExecutor != null && !loaderExecutor.isShuttingDown()) {
             loaderExecutor.shutdownGracefully(0, 1, TimeUnit.SECONDS)
                     .syncUninterruptibly();


Reply via email to