dschneider-pivotal commented on a change in pull request #7408: URL: https://github.com/apache/geode/pull/7408#discussion_r824943346
########## File path: geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/BlockingCommandListener.java ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.eventing; + +import java.util.Collections; +import java.util.List; + +import org.apache.geode.redis.internal.commands.Command; +import org.apache.geode.redis.internal.commands.RedisCommandType; +import org.apache.geode.redis.internal.commands.executor.RedisResponse; +import org.apache.geode.redis.internal.data.RedisKey; +import org.apache.geode.redis.internal.netty.Coder; +import org.apache.geode.redis.internal.netty.ExecutionHandlerContext; + +public class BlockingCommandListener implements EventListener { + + private final ExecutionHandlerContext context; + private final RedisCommandType command; + private final List<RedisKey> keys; + private final List<byte[]> commandArgs; + private final long timeoutNanos; + private final long timeSubmitted; + private Runnable cleanupTask; + + /** + * Constructor to create an instance of a BlockingCommandListener in response to a blocking + * command. When receiving a relevant event, blocking commands simply resubmit the command + * into the Netty pipeline. + * + * @param context the associated ExecutionHandlerContext + * @param command the blocking command associated with this listener + * @param keys the list of keys the command is interested in + * @param timeoutSeconds the timeout for the command to block in seconds + * @param commandArgs all arguments to the command which are used for resubmission + */ + public BlockingCommandListener(ExecutionHandlerContext context, RedisCommandType command, + List<RedisKey> keys, double timeoutSeconds, List<byte[]> commandArgs) { + this.context = context; + this.command = command; Review comment: I was confused by this being named "command". I thought it was the actual Command instance. "commandType" would be better. ########## File path: geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/BlockingCommandListener.java ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.eventing; + +import java.util.Collections; +import java.util.List; + +import org.apache.geode.redis.internal.commands.Command; +import org.apache.geode.redis.internal.commands.RedisCommandType; +import org.apache.geode.redis.internal.commands.executor.RedisResponse; +import org.apache.geode.redis.internal.data.RedisKey; +import org.apache.geode.redis.internal.netty.Coder; +import org.apache.geode.redis.internal.netty.ExecutionHandlerContext; + +public class BlockingCommandListener implements EventListener { + + private final ExecutionHandlerContext context; + private final RedisCommandType command; + private final List<RedisKey> keys; + private final List<byte[]> commandArgs; + private final long timeoutNanos; + private final long timeSubmitted; + private Runnable cleanupTask; + + /** + * Constructor to create an instance of a BlockingCommandListener in response to a blocking + * command. When receiving a relevant event, blocking commands simply resubmit the command + * into the Netty pipeline. + * + * @param context the associated ExecutionHandlerContext + * @param command the blocking command associated with this listener + * @param keys the list of keys the command is interested in + * @param timeoutSeconds the timeout for the command to block in seconds + * @param commandArgs all arguments to the command which are used for resubmission + */ + public BlockingCommandListener(ExecutionHandlerContext context, RedisCommandType command, + List<RedisKey> keys, double timeoutSeconds, List<byte[]> commandArgs) { + this.context = context; + this.command = command; + this.timeoutNanos = (long) (timeoutSeconds * 1e9); + this.keys = Collections.unmodifiableList(keys); + this.commandArgs = commandArgs; + timeSubmitted = System.nanoTime(); + } + + @Override + public List<RedisKey> keys() { + return keys; + } + + @Override + public EventResponse process(RedisCommandType commandType, RedisKey key) { + if (!keys.contains(key)) { + return EventResponse.CONTINUE; + } + + resubmitCommand(); + return EventResponse.REMOVE_AND_STOP; + } + + @Override + public void resubmitCommand() { + // Recalculate the timeout since we've already been waiting + double adjustedTimeoutSeconds = 0; + if (timeoutNanos > 0) { + long adjustedTimeoutNanos = timeoutNanos - (System.nanoTime() - timeSubmitted); + adjustedTimeoutNanos = Math.max(1, adjustedTimeoutNanos); + adjustedTimeoutSeconds = ((double) adjustedTimeoutNanos) / 1e9; + } + + // The commands we are currently supporting all have the timeout at the end of the argument + // list. Some newer Redis 7 commands (BLMPOP and BZMPOP) have the timeout as the first argument + // after the command. We'll need to adjust this once those commands are supported. + commandArgs.set(commandArgs.size() - 1, Coder.doubleToBytes(adjustedTimeoutSeconds)); + + context.resubmitCommand(new Command(command, commandArgs)); Review comment: why don't we just pass down to blocking commands a reference to the Command instance? That way we don't need to keep recreating its commandArgs. All we need to do is ask that Command instance to update one of its args (at a specific index) with a new byte array. ########## File path: geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.eventing; + +import java.util.HashSet; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.geode.annotations.VisibleForTesting; +import org.apache.geode.cache.partition.PartitionListenerAdapter; +import org.apache.geode.logging.internal.executors.LoggingThreadFactory; +import org.apache.geode.redis.internal.commands.RedisCommandType; +import org.apache.geode.redis.internal.data.RedisKey; + +public class EventDistributor extends PartitionListenerAdapter { + + private final Map<RedisKey, Queue<EventListener>> listeners = new ConcurrentHashMap<>(); + + private final ScheduledThreadPoolExecutor timerExecutor = + new ScheduledThreadPoolExecutor(1, + new LoggingThreadFactory("GeodeForRedisEventTimer-", true)); + + private int keysRegistered = 0; Review comment: since this is only for testing I think it would be better to just have the method that returns this value to tests to instead iterator over the data structures asking each listener how many keys it has. I don't think the tests care how performant asking for this value is. And that way we do not complicate the EventDistributor implementation with having to update keysRegistered in a thread safe way. I was reviewing the synchronization on this class and a couple of times I considered a change that then would be a problem because this field would not get updated in a thread safe way ########## File path: geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.eventing; + +import java.util.HashSet; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.geode.annotations.VisibleForTesting; +import org.apache.geode.cache.partition.PartitionListenerAdapter; +import org.apache.geode.logging.internal.executors.LoggingThreadFactory; +import org.apache.geode.redis.internal.commands.RedisCommandType; +import org.apache.geode.redis.internal.data.RedisKey; + +public class EventDistributor extends PartitionListenerAdapter { + + private final Map<RedisKey, Queue<EventListener>> listeners = new ConcurrentHashMap<>(); + + private final ScheduledThreadPoolExecutor timerExecutor = + new ScheduledThreadPoolExecutor(1, + new LoggingThreadFactory("GeodeForRedisEventTimer-", true)); + + private int keysRegistered = 0; + + public EventDistributor() { + timerExecutor.setRemoveOnCancelPolicy(true); + } + + public synchronized void registerListener(EventListener listener) { + for (RedisKey key : listener.keys()) { + listeners.computeIfAbsent(key, k -> new LinkedBlockingQueue<>()).add(listener); + } + + if (listener.getTimeout() != 0) { + scheduleTimeout(listener, listener.getTimeout()); + } + + keysRegistered += listener.keys().size(); + } + + public void fireEvent(RedisCommandType command, RedisKey key) { + Queue<EventListener> listenerList = listeners.get(key); + if (listenerList == null) { + return; + } + + for (EventListener listener : listenerList) { + if (listener.process(command, key) == EventResponse.REMOVE_AND_STOP) { + removeListener(listener); + break; + } + } + } + + /** + * The total number of keys registered by all listeners (includes duplicates). + */ + @VisibleForTesting + public int size() { Review comment: this should probably be named "getRegisteredKeys" since it is only for testing. "size" makes it more likely to be used by product code. ########## File path: geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.eventing; + +import java.util.HashSet; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.geode.annotations.VisibleForTesting; +import org.apache.geode.cache.partition.PartitionListenerAdapter; +import org.apache.geode.logging.internal.executors.LoggingThreadFactory; +import org.apache.geode.redis.internal.commands.RedisCommandType; +import org.apache.geode.redis.internal.data.RedisKey; + +public class EventDistributor extends PartitionListenerAdapter { + + private final Map<RedisKey, Queue<EventListener>> listeners = new ConcurrentHashMap<>(); + + private final ScheduledThreadPoolExecutor timerExecutor = + new ScheduledThreadPoolExecutor(1, + new LoggingThreadFactory("GeodeForRedisEventTimer-", true)); + + private int keysRegistered = 0; + + public EventDistributor() { + timerExecutor.setRemoveOnCancelPolicy(true); + } + + public synchronized void registerListener(EventListener listener) { Review comment: I don't think this method needs to be synchronized. I think the only concurrency issue you have with these data structures is a race with removeListener when it completely removes the queue because it thought it was empty. If this method adds to the queue right after that method checks isEmpty then this thread will add to a queue that is not longer in the map. So you can protect that by narrowing you sync to just be the putIfAbsent+add call. The other concurrency problem you have is making the listener itself thread safe. Basically you have possible races with one thread calling process on it and another calling scheduleTimeout or cleanup. I think you should allow the listener to be responsible for making sure that if it has already been processed then it will not be scheduled. I think you do this by adding a scheduleTimeout method on the listener that takes a ScheduledExecutorService and takes care of calling schedule and remembering the ScheduleFuture and it could only do this if the listener has not already been processed or closed. It is likely that the listener will use synchronization to make it thread safe but now it is on the listener instead of the EventDistributor singleton. ########## File path: geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.eventing; + +import java.util.HashSet; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.geode.annotations.VisibleForTesting; +import org.apache.geode.cache.partition.PartitionListenerAdapter; +import org.apache.geode.logging.internal.executors.LoggingThreadFactory; +import org.apache.geode.redis.internal.commands.RedisCommandType; +import org.apache.geode.redis.internal.data.RedisKey; + +public class EventDistributor extends PartitionListenerAdapter { + + private final Map<RedisKey, Queue<EventListener>> listeners = new ConcurrentHashMap<>(); + + private final ScheduledThreadPoolExecutor timerExecutor = + new ScheduledThreadPoolExecutor(1, + new LoggingThreadFactory("GeodeForRedisEventTimer-", true)); + + private int keysRegistered = 0; + + public EventDistributor() { + timerExecutor.setRemoveOnCancelPolicy(true); + } + + public synchronized void registerListener(EventListener listener) { + for (RedisKey key : listener.keys()) { + listeners.computeIfAbsent(key, k -> new LinkedBlockingQueue<>()).add(listener); Review comment: should you use JavaWorkarounds.computeIfAbsent to make this call? ########## File path: geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.eventing; + +import java.util.HashSet; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.geode.annotations.VisibleForTesting; +import org.apache.geode.cache.partition.PartitionListenerAdapter; +import org.apache.geode.logging.internal.executors.LoggingThreadFactory; +import org.apache.geode.redis.internal.commands.RedisCommandType; +import org.apache.geode.redis.internal.data.RedisKey; + +public class EventDistributor extends PartitionListenerAdapter { + + private final Map<RedisKey, Queue<EventListener>> listeners = new ConcurrentHashMap<>(); + + private final ScheduledThreadPoolExecutor timerExecutor = + new ScheduledThreadPoolExecutor(1, + new LoggingThreadFactory("GeodeForRedisEventTimer-", true)); + + private int keysRegistered = 0; + + public EventDistributor() { + timerExecutor.setRemoveOnCancelPolicy(true); + } + + public synchronized void registerListener(EventListener listener) { + for (RedisKey key : listener.keys()) { + listeners.computeIfAbsent(key, k -> new LinkedBlockingQueue<>()).add(listener); + } + + if (listener.getTimeout() != 0) { + scheduleTimeout(listener, listener.getTimeout()); + } + + keysRegistered += listener.keys().size(); + } + + public void fireEvent(RedisCommandType command, RedisKey key) { + Queue<EventListener> listenerList = listeners.get(key); + if (listenerList == null) { + return; + } + + for (EventListener listener : listenerList) { + if (listener.process(command, key) == EventResponse.REMOVE_AND_STOP) { + removeListener(listener); + break; + } + } + } + + /** + * The total number of keys registered by all listeners (includes duplicates). + */ + @VisibleForTesting + public int size() { + return keysRegistered; + } + + @Override + public void afterBucketRemoved(int bucketId, Iterable<?> keys) { + Set<EventListener> resubmittingList = new HashSet<>(); + for (Map.Entry<RedisKey, Queue<EventListener>> entry : listeners.entrySet()) { + if (entry.getKey().getBucketId() == bucketId) { + resubmittingList.addAll(entry.getValue()); + } + } + + resubmittingList.forEach(x -> { + x.resubmitCommand(); + removeListener(x); + }); + } + + private synchronized void removeListener(EventListener listener) { + boolean listenerRemoved = false; + for (RedisKey key : listener.keys()) { + Queue<EventListener> listenerList = listeners.get(key); + if (listenerList == null) { + continue; + } + listenerRemoved |= listenerList.remove(listener); + if (listenerList.isEmpty()) { Review comment: I think the only concurrency issue you have is these two lines (isEmpty and remove) and the add call in registerListener. If you got rid of these two lines then you would have no concurrency issues but you would leave empty queues in the map that may never be used again. But if you do this listeners.remove(key) call then you have an issue. The way I addressed this in the pubsub data structures was to create the queue non-empty in registerListener and if it every finds an empty queue it refuses to add to it and instead spins around and reads from the map again until it find a non-empty one or creates its own non-empty one. To make this thread safe you would create you own queue class whose add and remove inc and dec an atomic. The constructor of that class would always start with one element so the atomic would start at 1. If it ever goes to 0 that sticks. This gets complicated so for now maybe you should keep some synchronization. You can make the sync issue more clear by narrowing your sync to be around just the putIfAbsent+add call and around this isEmpty+remove calls. Also you only need to do the sync(isEmpty+remove) calls if listenerList.remove returned true. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@geode.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org