Author: frm
Date: Thu Feb 25 16:31:05 2016
New Revision: 1732330
URL: http://svn.apache.org/viewvc?rev=1732330&view=rev
Log:
OAK-4058 - Shut down executor groups after the channel handlers that use them
Modified:
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java
Modified:
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java?rev=1732330&r1=1732329&r2=1732330&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
(original)
+++
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
Thu Feb 25 16:31:05 2016
@@ -21,15 +21,15 @@ package org.apache.jackrabbit.oak.plugin
import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
import static
org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetBlobReq;
import static
org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetSegmentReq;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.util.concurrent.EventExecutorGroup;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.concurrent.EventExecutorGroup;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
Modified:
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java?rev=1732330&r1=1732329&r2=1732330&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java
(original)
+++
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java
Thu Feb 25 16:31:05 2016
@@ -18,6 +18,17 @@
*/
package org.apache.jackrabbit.oak.plugins.segment.standby.client;
+import java.io.Closeable;
+import java.lang.management.ManagementFactory;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+import javax.net.ssl.SSLException;
+
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
@@ -35,13 +46,6 @@ import io.netty.handler.timeout.ReadTime
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
-
-import java.io.Closeable;
-import java.lang.management.ManagementFactory;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder;
import
org.apache.jackrabbit.oak.plugins.segment.standby.jmx.ClientStandbyStatusMBean;
@@ -51,11 +55,6 @@ import org.apache.jackrabbit.oak.plugins
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-import javax.net.ssl.SSLException;
-
public final class StandbyClient implements ClientStandbyStatusMBean,
Runnable, Closeable {
public static final String CLIENT_ID_PROPERTY_NAME = "standbyID";
@@ -197,17 +196,17 @@ public final class StandbyClient impleme
}
private void shutdownNetty() {
- if (group != null && !group.isShuttingDown()) {
- group.shutdownGracefully(0, 1, TimeUnit.SECONDS)
- .syncUninterruptibly();
+ if (handler != null) {
+ handler.close();
+ handler = null;
}
if (executor != null && !executor.isShuttingDown()) {
executor.shutdownGracefully(0, 1, TimeUnit.SECONDS)
.syncUninterruptibly();
}
- if (handler != null) {
- handler.close();
- handler = null;
+ if (group != null && !group.isShuttingDown()) {
+ group.shutdownGracefully(0, 1, TimeUnit.SECONDS)
+ .syncUninterruptibly();
}
}
Modified:
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java?rev=1732330&r1=1732329&r2=1732330&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java
(original)
+++
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java
Thu Feb 25 16:31:05 2016
@@ -19,17 +19,16 @@
package org.apache.jackrabbit.oak.plugins.segment.standby.client;
import static
org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetHeadReq;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.timeout.ReadTimeoutHandler;
-import io.netty.util.concurrent.DefaultEventExecutorGroup;
-import io.netty.util.concurrent.EventExecutorGroup;
import java.io.Closeable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.ReplyDecoder;
@@ -51,7 +50,6 @@ public class StandbyClientHandler extend
private final boolean autoClean;
private EventExecutorGroup loaderExecutor;
- private ChannelHandlerContext ctx;
public StandbyClientHandler(final StandbyStore store,
CommunicationObserver observer, AtomicBoolean running,
@@ -65,7 +63,6 @@ public class StandbyClientHandler extend
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
- this.ctx = ctx;
log.debug("sending head request");
ctx.writeAndFlush(newGetHeadReq(this.observer.getID()));
log.debug("did send head request");
@@ -74,7 +71,7 @@ public class StandbyClientHandler extend
@Override
protected void channelRead0(ChannelHandlerContext ctx, RecordId msg)
throws Exception {
- setHead(msg);
+ setHead(ctx, msg);
};
@Override
@@ -82,7 +79,7 @@ public class StandbyClientHandler extend
ctx.flush();
}
- synchronized void setHead(RecordId head) {
+ synchronized void setHead(ChannelHandlerContext ctx, RecordId head) {
if (store.getHead().getRecordId().equals(head)) {
// all sync'ed up
@@ -115,14 +112,6 @@ public class StandbyClientHandler extend
@Override
public synchronized void close() {
- if (ctx != null) {
- for (ChannelHandler h : ctx.pipeline().toMap().values()) {
- ctx.pipeline().remove(h);
- }
-
- ctx.close();
- ctx = null;
- }
if (loaderExecutor != null && !loaderExecutor.isShuttingDown()) {
loaderExecutor.shutdownGracefully(0, 1, TimeUnit.SECONDS)
.syncUninterruptibly();