dschneider-pivotal commented on a change in pull request #7408:
URL: https://github.com/apache/geode/pull/7408#discussion_r824943346



##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/BlockingCommandListener.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.geode.redis.internal.commands.Command;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.commands.executor.RedisResponse;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class BlockingCommandListener implements EventListener {
+
+  private final ExecutionHandlerContext context;
+  private final RedisCommandType command;
+  private final List<RedisKey> keys;
+  private final List<byte[]> commandArgs;
+  private final long timeoutNanos;
+  private final long timeSubmitted;
+  private Runnable cleanupTask;
+
+  /**
+   * Constructor to create an instance of a BlockingCommandListener in 
response to a blocking
+   * command. When receiving a relevant event, blocking commands simply 
resubmit the command
+   * into the Netty pipeline.
+   *
+   * @param context the associated ExecutionHandlerContext
+   * @param command the blocking command associated with this listener
+   * @param keys the list of keys the command is interested in
+   * @param timeoutSeconds the timeout for the command to block in seconds
+   * @param commandArgs all arguments to the command which are used for 
resubmission
+   */
+  public BlockingCommandListener(ExecutionHandlerContext context, 
RedisCommandType command,
+      List<RedisKey> keys, double timeoutSeconds, List<byte[]> commandArgs) {
+    this.context = context;
+    this.command = command;

Review comment:
       I was confused by this being named "command". I thought it was the 
actual Command instance. "commandType" would be better.

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/BlockingCommandListener.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.geode.redis.internal.commands.Command;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.commands.executor.RedisResponse;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class BlockingCommandListener implements EventListener {
+
+  private final ExecutionHandlerContext context;
+  private final RedisCommandType command;
+  private final List<RedisKey> keys;
+  private final List<byte[]> commandArgs;
+  private final long timeoutNanos;
+  private final long timeSubmitted;
+  private Runnable cleanupTask;
+
+  /**
+   * Constructor to create an instance of a BlockingCommandListener in 
response to a blocking
+   * command. When receiving a relevant event, blocking commands simply 
resubmit the command
+   * into the Netty pipeline.
+   *
+   * @param context the associated ExecutionHandlerContext
+   * @param command the blocking command associated with this listener
+   * @param keys the list of keys the command is interested in
+   * @param timeoutSeconds the timeout for the command to block in seconds
+   * @param commandArgs all arguments to the command which are used for 
resubmission
+   */
+  public BlockingCommandListener(ExecutionHandlerContext context, 
RedisCommandType command,
+      List<RedisKey> keys, double timeoutSeconds, List<byte[]> commandArgs) {
+    this.context = context;
+    this.command = command;
+    this.timeoutNanos = (long) (timeoutSeconds * 1e9);
+    this.keys = Collections.unmodifiableList(keys);
+    this.commandArgs = commandArgs;
+    timeSubmitted = System.nanoTime();
+  }
+
+  @Override
+  public List<RedisKey> keys() {
+    return keys;
+  }
+
+  @Override
+  public EventResponse process(RedisCommandType commandType, RedisKey key) {
+    if (!keys.contains(key)) {
+      return EventResponse.CONTINUE;
+    }
+
+    resubmitCommand();
+    return EventResponse.REMOVE_AND_STOP;
+  }
+
+  @Override
+  public void resubmitCommand() {
+    // Recalculate the timeout since we've already been waiting
+    double adjustedTimeoutSeconds = 0;
+    if (timeoutNanos > 0) {
+      long adjustedTimeoutNanos = timeoutNanos - (System.nanoTime() - 
timeSubmitted);
+      adjustedTimeoutNanos = Math.max(1, adjustedTimeoutNanos);
+      adjustedTimeoutSeconds = ((double) adjustedTimeoutNanos) / 1e9;
+    }
+
+    // The commands we are currently supporting all have the timeout at the 
end of the argument
+    // list. Some newer Redis 7 commands (BLMPOP and BZMPOP) have the timeout 
as the first argument
+    // after the command. We'll need to adjust this once those commands are 
supported.
+    commandArgs.set(commandArgs.size() - 1, 
Coder.doubleToBytes(adjustedTimeoutSeconds));
+
+    context.resubmitCommand(new Command(command, commandArgs));

Review comment:
       why don't we just pass down to blocking commands a reference to the 
Command instance? That way we don't need to keep recreating its commandArgs. 
All we need to do is ask that Command instance to update one of its args (at a 
specific index) with a new byte array.

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.partition.PartitionListenerAdapter;
+import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+public class EventDistributor extends PartitionListenerAdapter {
+
+  private final Map<RedisKey, Queue<EventListener>> listeners = new 
ConcurrentHashMap<>();
+
+  private final ScheduledThreadPoolExecutor timerExecutor =
+      new ScheduledThreadPoolExecutor(1,
+          new LoggingThreadFactory("GeodeForRedisEventTimer-", true));
+
+  private int keysRegistered = 0;

Review comment:
       Since this is only for testing, I think it would be better to just have 
the method that returns this value to tests instead iterate over the data 
structures, asking each listener how many keys it has. I don't think the tests 
care how performant asking for this value is. And that way we do not complicate 
the EventDistributor implementation with having to update keysRegistered in a 
thread-safe way. I was reviewing the synchronization on this class and a couple 
of times I considered a change that would then be a problem because this field 
would not get updated in a thread-safe way.

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.partition.PartitionListenerAdapter;
+import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+public class EventDistributor extends PartitionListenerAdapter {
+
+  private final Map<RedisKey, Queue<EventListener>> listeners = new 
ConcurrentHashMap<>();
+
+  private final ScheduledThreadPoolExecutor timerExecutor =
+      new ScheduledThreadPoolExecutor(1,
+          new LoggingThreadFactory("GeodeForRedisEventTimer-", true));
+
+  private int keysRegistered = 0;
+
+  public EventDistributor() {
+    timerExecutor.setRemoveOnCancelPolicy(true);
+  }
+
+  public synchronized void registerListener(EventListener listener) {
+    for (RedisKey key : listener.keys()) {
+      listeners.computeIfAbsent(key, k -> new 
LinkedBlockingQueue<>()).add(listener);
+    }
+
+    if (listener.getTimeout() != 0) {
+      scheduleTimeout(listener, listener.getTimeout());
+    }
+
+    keysRegistered += listener.keys().size();
+  }
+
+  public void fireEvent(RedisCommandType command, RedisKey key) {
+    Queue<EventListener> listenerList = listeners.get(key);
+    if (listenerList == null) {
+      return;
+    }
+
+    for (EventListener listener : listenerList) {
+      if (listener.process(command, key) == EventResponse.REMOVE_AND_STOP) {
+        removeListener(listener);
+        break;
+      }
+    }
+  }
+
+  /**
+   * The total number of keys registered by all listeners (includes 
duplicates).
+   */
+  @VisibleForTesting
+  public int size() {

Review comment:
       this should probably be named "getRegisteredKeys" since it is only for 
testing. "size" makes it more likely to be used by product code.

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.partition.PartitionListenerAdapter;
+import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+public class EventDistributor extends PartitionListenerAdapter {
+
+  private final Map<RedisKey, Queue<EventListener>> listeners = new 
ConcurrentHashMap<>();
+
+  private final ScheduledThreadPoolExecutor timerExecutor =
+      new ScheduledThreadPoolExecutor(1,
+          new LoggingThreadFactory("GeodeForRedisEventTimer-", true));
+
+  private int keysRegistered = 0;
+
+  public EventDistributor() {
+    timerExecutor.setRemoveOnCancelPolicy(true);
+  }
+
+  public synchronized void registerListener(EventListener listener) {

Review comment:
       I don't think this method needs to be synchronized. I think the only 
concurrency issue you have with these data structures is a race with 
removeListener when it completely removes the queue because it thought it was 
empty. If this method adds to the queue right after that method checks isEmpty 
then this thread will add to a queue that is no longer in the map. So you can 
protect that by narrowing your sync to just be the putIfAbsent+add call. 
   The other concurrency problem you have is making the listener itself thread 
safe. Basically you have possible races with one thread calling process on it 
and another calling scheduleTimeout or cleanup. I think you should allow the 
listener to be responsible for making sure that if it has already been 
processed then it will not be scheduled. I think you do this by adding a 
scheduleTimeout method on the listener that takes a ScheduledExecutorService 
and takes care of calling schedule and remembering the ScheduledFuture, and it 
could only do this if the listener has not already been processed or closed. It 
is likely that the listener will use synchronization to make it thread safe but 
now it is on the listener instead of the EventDistributor singleton.

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.partition.PartitionListenerAdapter;
+import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+public class EventDistributor extends PartitionListenerAdapter {
+
+  private final Map<RedisKey, Queue<EventListener>> listeners = new 
ConcurrentHashMap<>();
+
+  private final ScheduledThreadPoolExecutor timerExecutor =
+      new ScheduledThreadPoolExecutor(1,
+          new LoggingThreadFactory("GeodeForRedisEventTimer-", true));
+
+  private int keysRegistered = 0;
+
+  public EventDistributor() {
+    timerExecutor.setRemoveOnCancelPolicy(true);
+  }
+
+  public synchronized void registerListener(EventListener listener) {
+    for (RedisKey key : listener.keys()) {
+      listeners.computeIfAbsent(key, k -> new 
LinkedBlockingQueue<>()).add(listener);

Review comment:
       should you use JavaWorkarounds.computeIfAbsent to make this call?

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.partition.PartitionListenerAdapter;
+import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+public class EventDistributor extends PartitionListenerAdapter {
+
+  private final Map<RedisKey, Queue<EventListener>> listeners = new 
ConcurrentHashMap<>();
+
+  private final ScheduledThreadPoolExecutor timerExecutor =
+      new ScheduledThreadPoolExecutor(1,
+          new LoggingThreadFactory("GeodeForRedisEventTimer-", true));
+
+  private int keysRegistered = 0;
+
+  public EventDistributor() {
+    timerExecutor.setRemoveOnCancelPolicy(true);
+  }
+
+  public synchronized void registerListener(EventListener listener) {
+    for (RedisKey key : listener.keys()) {
+      listeners.computeIfAbsent(key, k -> new 
LinkedBlockingQueue<>()).add(listener);
+    }
+
+    if (listener.getTimeout() != 0) {
+      scheduleTimeout(listener, listener.getTimeout());
+    }
+
+    keysRegistered += listener.keys().size();
+  }
+
+  public void fireEvent(RedisCommandType command, RedisKey key) {
+    Queue<EventListener> listenerList = listeners.get(key);
+    if (listenerList == null) {
+      return;
+    }
+
+    for (EventListener listener : listenerList) {
+      if (listener.process(command, key) == EventResponse.REMOVE_AND_STOP) {
+        removeListener(listener);
+        break;
+      }
+    }
+  }
+
+  /**
+   * The total number of keys registered by all listeners (includes 
duplicates).
+   */
+  @VisibleForTesting
+  public int size() {
+    return keysRegistered;
+  }
+
+  @Override
+  public void afterBucketRemoved(int bucketId, Iterable<?> keys) {
+    Set<EventListener> resubmittingList = new HashSet<>();
+    for (Map.Entry<RedisKey, Queue<EventListener>> entry : 
listeners.entrySet()) {
+      if (entry.getKey().getBucketId() == bucketId) {
+        resubmittingList.addAll(entry.getValue());
+      }
+    }
+
+    resubmittingList.forEach(x -> {
+      x.resubmitCommand();
+      removeListener(x);
+    });
+  }
+
+  private synchronized void removeListener(EventListener listener) {
+    boolean listenerRemoved = false;
+    for (RedisKey key : listener.keys()) {
+      Queue<EventListener> listenerList = listeners.get(key);
+      if (listenerList == null) {
+        continue;
+      }
+      listenerRemoved |= listenerList.remove(listener);
+      if (listenerList.isEmpty()) {

Review comment:
       I think the only concurrency issue you have is these two lines (isEmpty 
and remove) and the add call in registerListener. If you got rid of these two 
lines then you would have no concurrency issues but you would leave empty 
queues in the map that may never be used again. But if you do this 
listeners.remove(key) call then you have an issue. The way I addressed this in 
the pubsub data structures was to create the queue non-empty in 
registerListener and if it ever finds an empty queue it refuses to add to it 
and instead spins around and reads from the map again until it finds a non-empty 
one or creates its own non-empty one. To make this thread safe you would create 
your own queue class whose add and remove inc and dec an atomic. The constructor 
of that class would always start with one element so the atomic would start at 
1. If it ever goes to 0 that sticks.
   This gets complicated so for now maybe you should keep some synchronization. 
You can make the sync issue more clear by narrowing your sync to be around just 
the putIfAbsent+add call and around these isEmpty+remove calls. Also you only 
need to do the sync(isEmpty+remove) calls if listenerList.remove returned true.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@geode.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to