DonalEvans commented on a change in pull request #7323: URL: https://github.com/apache/geode/pull/7323#discussion_r838809964
########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java ########## @@ -217,6 +217,8 @@ public Object getDeserialized(boolean copyOnRead) { } } + private boolean receivedGWStopped = false; Review comment: Could this (and the accessor methods for it) be renamed to something a little clearer? Maybe "receivedGatewaySenderStoppedMessage"? ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java ########## @@ -771,6 +771,8 @@ public synchronized String dump() { private boolean regionCreationNotified; + private boolean sentGWStopped = false; Review comment: Could this (and the accessor methods for it) be renamed to something a little clearer? Maybe "sentGatewaySenderStoppedMessage"? ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java ########## @@ -257,11 +273,47 @@ protected void loadEventsFromTempQueue() { } getInitializationLock().writeLock().unlock(); } + if (regionToDispatchedKeysMap.size() > 0 + && getPartitionedRegion().getRegionAdvisor() != null) { + Set<InternalDistributedMember> recipients = + getPartitionedRegion().getRegionAdvisor().adviseDataStore(); + + if (recipients.isEmpty()) { + return; + } + + InternalDistributedSystem ids = getCache().getInternalDistributedSystem(); + DistributionManager dm = ids.getDistributionManager(); + dm.retainMembersWithSameOrNewerVersion(recipients, KnownVersion.GEODE_1_15_0); + + if (!recipients.isEmpty()) { + ParallelQueueSetPossibleDuplicateMessage pqspdm = + new ParallelQueueSetPossibleDuplicateMessage(LOAD_FROM_TEMP_QUEUE, + regionToDispatchedKeysMap); + pqspdm.setRecipients(recipients); + dm.putOutgoing(pqspdm); + } + } } } + } + } + + private void addDuplicateEvent(Map<String, Map<Integer, List<Object>>> regionToDispatchedKeysMap, + GatewaySenderEventImpl event) { + Map<Integer, List<Object>> bucketIdToDispatchedKeys = + 
regionToDispatchedKeysMap.get(getPartitionedRegion().getFullPath()); + if (bucketIdToDispatchedKeys == null) { + bucketIdToDispatchedKeys = new ConcurrentHashMap<>(); + regionToDispatchedKeysMap.put(getPartitionedRegion().getFullPath(), bucketIdToDispatchedKeys); + } - // } + List<Object> dispatchedKeys = bucketIdToDispatchedKeys.get(getId()); + if (dispatchedKeys == null) { + dispatchedKeys = new ArrayList<>(); + bucketIdToDispatchedKeys.put(getId(), dispatchedKeys); } Review comment: This can be simplified to: ``` List<Object> dispatchedKeys = bucketIdToDispatchedKeys.computeIfAbsent(getId(), k -> new ArrayList<>()); ``` ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessage.java ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache.wan.parallel; + +import static org.apache.geode.cache.Region.SEPARATOR; +import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.DataSerializer; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.PooledDistributionMessage; +import org.apache.geode.internal.cache.BucketRegionQueue; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.LocalRegion.InitializationLevel; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.PartitionedRegionHelper; +import org.apache.geode.internal.cache.wan.AbstractGatewaySender; +import org.apache.geode.internal.serialization.DeserializationContext; +import org.apache.geode.internal.serialization.SerializationContext; +import org.apache.geode.logging.internal.log4j.api.LogService; + +/** + * Removes a batch of events from the remote secondary queues + * + * @since GemFire 8.0 + */ Review comment: This comment is incorrect. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java ########## @@ -1160,6 +1169,7 @@ private boolean acquiredPrimaryLock() { try { synchronized (this) { if (isHosting() && (isVolunteering() || isBecomingPrimary())) { + hasBecomePrimary = isBecomingPrimary(); Review comment: This might be better placed inside the if statement on line 1177, since we haven't actually become primary until `requestPrimaryState(IS_PRIMARY_HOSTING)` returns. 
Also, once this is set to true, there is no way for it to be set to false, which means that if a bucket becomes primary, then stops being primary (due to a rebalance, for example) this field will still be true. It might be better to instead change the visibility of the `isBecomingPrimary()` method to allow it to be called from `BucketRegionQueue.beforeAcquiringPrimaryState()`. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java ########## @@ -2187,6 +2187,13 @@ boolean doClose(String reason, Throwable systemFailureCause, boolean keepAlive, return false; } + for (GatewaySender sender : allGatewaySenders) { + try { + sender.prepareForStop(); + } catch (Exception ignore) { + } Review comment: Is it safe to just silently ignore any exceptions seen here? Also, would this call to `sender.prepareForStop()` be better on line 2246, just before we're stopping the gateway senders? Having it up here by itself seems potentially confusing. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java ########## @@ -223,7 +231,12 @@ protected void loadEventsFromTempQueue() { // .getBucketToTempQueueMap().get(getId()); if (tempQueue != null && !tempQueue.isEmpty()) { synchronized (tempQueue) { + Map<String, Map<Integer, List<Object>>> regionToDispatchedKeysMap = Review comment: This would be better named as something like "regionToDuplicateEventsMap" ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java ########## @@ -257,11 +273,47 @@ protected void loadEventsFromTempQueue() { } getInitializationLock().writeLock().unlock(); } + if (regionToDispatchedKeysMap.size() > 0 + && getPartitionedRegion().getRegionAdvisor() != null) { + Set<InternalDistributedMember> recipients = + getPartitionedRegion().getRegionAdvisor().adviseDataStore(); + + if (recipients.isEmpty()) { + return; + } + + InternalDistributedSystem ids = 
getCache().getInternalDistributedSystem(); + DistributionManager dm = ids.getDistributionManager(); + dm.retainMembersWithSameOrNewerVersion(recipients, KnownVersion.GEODE_1_15_0); + + if (!recipients.isEmpty()) { + ParallelQueueSetPossibleDuplicateMessage pqspdm = Review comment: Could this be named "possibleDuplicateMessage" instead? Variable names like this are often confusing, and we don't need to try to save characters in the source code by using abbreviations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@geode.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org