dschneider-pivotal commented on a change in pull request #5420:
URL: https://github.com/apache/geode/pull/5420#discussion_r466037897



##########
File path: 
geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
##########
@@ -86,11 +89,17 @@ public static void beforeClass() throws Exception {
     locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, "15000");
 
     locator = cluster.startLocatorVM(0, locatorProperties);
-    server1 = cluster.startRedisVM(1, locator.getPort());
-    server2 = cluster.startRedisVM(2, locator.getPort());
-    server3 = cluster.startRedisVM(3, locator.getPort());
-    server4 = cluster.startRedisVM(4, locator.getPort());
-    server5 = cluster.startServerVM(5, locator.getPort());
+    Properties props = new Properties();
+    props.setProperty("statistic-archive-file", "stats1.gfs");

Review comment:
       Please remove the `statistic-archive-file` stats file config here.

##########
File path: 
geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
##########
@@ -80,31 +85,59 @@
    * @param password Authentication password for each context, can be null
    */
   public ExecutionHandlerContext(Channel channel, RegionProvider 
regionProvider, PubSub pubsub,
-      EventLoopGroup subscriberGroup,
       Supplier<Boolean> allowUnsupportedSupplier,
       Runnable shutdownInvoker,
       RedisStats redisStats,
+      ExecutorService backgroundExecutor,
       byte[] password) {
     this.channel = channel;
     this.regionProvider = regionProvider;
     this.pubsub = pubsub;
-    this.subscriberGroup = subscriberGroup;
     this.allowUnsupportedSupplier = allowUnsupportedSupplier;
     this.shutdownInvoker = shutdownInvoker;
     this.redisStats = redisStats;
+    this.backgroundExecutor = backgroundExecutor;
     this.client = new Client(channel);
     this.byteBufAllocator = this.channel.alloc();
     this.authPassword = password;
     this.isAuthenticated = password == null;
     redisStats.addClient();
-  }
 
-  public ChannelFuture writeToChannel(ByteBuf message) {
-    return channel.writeAndFlush(message, channel.newPromise());
+    backgroundExecutor.submit(this::processCommandQueue);
   }
 
   public ChannelFuture writeToChannel(RedisResponse response) {
-    return channel.writeAndFlush(response.encode(byteBufAllocator), 
channel.newPromise());
+    if (response == null) {

Review comment:
       This `response == null` check is no longer needed.

##########
File path: 
geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
##########
@@ -59,17 +60,21 @@
 public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
 
   private static final Logger logger = LogService.getLogger();
+  private static final Command TERMINATE_COMMAND = new Command();
 
   private final Client client;
   private final Channel channel;
   private final RegionProvider regionProvider;
   private final PubSub pubsub;
-  private final EventLoopGroup subscriberGroup;
   private final ByteBufAllocator byteBufAllocator;
   private final byte[] authPassword;
   private final Supplier<Boolean> allowUnsupportedSupplier;
   private final Runnable shutdownInvoker;
   private final RedisStats redisStats;
+  private final ExecutorService backgroundExecutor;
+  private final int MAX_QUEUED_COMMANDS = 100;
+  private final LinkedBlockingQueue<Command> commandQueue =

Review comment:
       Change this queue to be unlimited in size (i.e. create the
       `LinkedBlockingQueue` without a capacity bound).

##########
File path: 
geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
##########
@@ -186,43 +210,49 @@ public void channelInactive(ChannelHandlerContext ctx) {
     if (logger.isDebugEnabled()) {
       logger.debug("GeodeRedisServer-Connection closing with " + 
ctx.channel().remoteAddress());
     }
+    commandQueue.offer(TERMINATE_COMMAND);

Review comment:
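       Use `put` instead of `offer` here: if the queue is bounded and full,
       `offer` returns false without adding the element, whereas `put` waits
       until space is available.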

##########
File path: 
geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
##########
@@ -113,17 +146,8 @@ public ChannelFuture writeToChannel(RedisResponse 
response) {
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
     Command command = (Command) msg;
-    try {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Executing Redis command: {}", command);
-      }
-
-      executeCommand(ctx, command);
-    } catch (Exception e) {
-      logger.warn("Execution of Redis command {} failed: {}", command, e);
-      throw e;
-    }
-
+    command.setChannelHandlerContext(ctx);
+    commandQueue.offer(command);

Review comment:
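       Use `put` instead of `offer` here: if the queue is bounded and full,
       `offer` returns false without adding the command, whereas `put` waits
       until space is available.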




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to