DonalEvans commented on a change in pull request #7323:
URL: https://github.com/apache/geode/pull/7323#discussion_r838809964
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
##########
@@ -217,6 +217,8 @@ public Object getDeserialized(boolean copyOnRead) {
}
}
+ private boolean receivedGWStopped = false;
Review comment:
Could this (and the accessor methods for it) be renamed to something a
little clearer? Maybe "receivedGatewaySenderStoppedMessage"?
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
##########
@@ -771,6 +771,8 @@ public synchronized String dump() {
private boolean regionCreationNotified;
+ private boolean sentGWStopped = false;
Review comment:
Could this (and the accessor methods for it) be renamed to something a
little clearer? Maybe "sentGatewaySenderStoppedMessage"?
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
##########
@@ -257,11 +273,47 @@ protected void loadEventsFromTempQueue() {
}
getInitializationLock().writeLock().unlock();
}
+ if (regionToDispatchedKeysMap.size() > 0
+ && getPartitionedRegion().getRegionAdvisor() != null) {
+ Set<InternalDistributedMember> recipients =
+ getPartitionedRegion().getRegionAdvisor().adviseDataStore();
+
+ if (recipients.isEmpty()) {
+ return;
+ }
+
+ InternalDistributedSystem ids = getCache().getInternalDistributedSystem();
+ DistributionManager dm = ids.getDistributionManager();
+ dm.retainMembersWithSameOrNewerVersion(recipients, KnownVersion.GEODE_1_15_0);
+
+ if (!recipients.isEmpty()) {
+ ParallelQueueSetPossibleDuplicateMessage pqspdm =
+ new ParallelQueueSetPossibleDuplicateMessage(LOAD_FROM_TEMP_QUEUE,
+ regionToDispatchedKeysMap);
+ pqspdm.setRecipients(recipients);
+ dm.putOutgoing(pqspdm);
+ }
+ }
}
}
+ }
+ }
+
+ private void addDuplicateEvent(Map<String, Map<Integer, List<Object>>> regionToDispatchedKeysMap,
+ GatewaySenderEventImpl event) {
+ Map<Integer, List<Object>> bucketIdToDispatchedKeys =
+ regionToDispatchedKeysMap.get(getPartitionedRegion().getFullPath());
+ if (bucketIdToDispatchedKeys == null) {
+ bucketIdToDispatchedKeys = new ConcurrentHashMap<>();
+ regionToDispatchedKeysMap.put(getPartitionedRegion().getFullPath(), bucketIdToDispatchedKeys);
+ }
- // }
+ List<Object> dispatchedKeys = bucketIdToDispatchedKeys.get(getId());
+ if (dispatchedKeys == null) {
+ dispatchedKeys = new ArrayList<>();
+ bucketIdToDispatchedKeys.put(getId(), dispatchedKeys);
}
Review comment:
This can be simplified to:
```
List<Object> dispatchedKeys =
    bucketIdToDispatchedKeys.computeIfAbsent(getId(), k -> new ArrayList<>());
```
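The same idea could also cover the outer map lookup in `addDuplicateEvent`. Below is a minimal sketch, assuming plain `String` and `Integer` keys in place of the Geode types; the class `DispatchedKeysExample` and the method `addKey` are hypothetical names used only for illustration, not code from the pull request:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class DispatchedKeysExample {
  // Hypothetical stand-in for addDuplicateEvent: files a key under its
  // region path and bucket id, creating the intermediate containers on demand.
  static void addKey(Map<String, Map<Integer, List<Object>>> regionToKeys,
      String regionPath, int bucketId, Object key) {
    regionToKeys
        .computeIfAbsent(regionPath, path -> new ConcurrentHashMap<>())
        .computeIfAbsent(bucketId, id -> new ArrayList<>())
        .add(key);
  }

  public static void main(String[] args) {
    Map<String, Map<Integer, List<Object>>> regionToKeys = new ConcurrentHashMap<>();
    addKey(regionToKeys, "/orders", 7, "key-1");
    addKey(regionToKeys, "/orders", 7, "key-2");
    addKey(regionToKeys, "/orders", 9, "key-3");
    System.out.println(regionToKeys);
  }
}
```

Each `computeIfAbsent` call replaces one null check plus `put`, so the method body shrinks to a single chained expression. As in the original diff, the inner `ArrayList` is not thread-safe, so this sketch assumes the caller already holds whatever lock protects the lists.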
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessage.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.PooledDistributionMessage;
+import org.apache.geode.internal.cache.BucketRegionQueue;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.LocalRegion.InitializationLevel;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+/**
+ * Removes a batch of events from the remote secondary queues
+ *
+ * @since GemFire 8.0
+ */
Review comment:
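This Javadoc comment is incorrect. The description and the `@since GemFire 8.0`
tag appear to have been copied from another message class; this class is new in
this pull request, and judging by its name it marks queued events as possible
duplicates rather than removing them.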
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
##########
@@ -1160,6 +1169,7 @@ private boolean acquiredPrimaryLock() {
try {
synchronized (this) {
if (isHosting() && (isVolunteering() || isBecomingPrimary())) {
+ hasBecomePrimary = isBecomingPrimary();
Review comment:
This might be better placed inside the if statement on line 1177, since
we haven't actually become primary until
`requestPrimaryState(IS_PRIMARY_HOSTING)` returns. Also, once this is set to
true, there is no way for it to be set to false, which means that if a bucket
becomes primary, then stops being primary (due to a rebalance, for example)
this field will still be true. It might be better to instead change the
visibility of the `isBecomingPrimary()` method to allow it to be called from
`BucketRegionQueue.beforeAcquiringPrimaryState()`.
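To illustrate the concern, here is a minimal sketch of how a flag that is set once and never cleared drifts away from the real state, compared with querying the live state directly. Everything in it (`PrimaryStateExample`, `Advisor`, `State`, `transitionTo`) is a hypothetical, heavily simplified stand-in and not the actual `BucketAdvisor` API:

```java
class PrimaryStateExample {
  enum State { NOT_PRIMARY, VOLUNTEERING, BECOMING_PRIMARY, IS_PRIMARY }

  // Hypothetical, simplified stand-in for a bucket advisor.
  static class Advisor {
    private State state = State.NOT_PRIMARY;
    // Cached flag in the style of the diff: set once, never reset.
    private boolean hasBecomePrimary;

    synchronized void transitionTo(State next) {
      if (next == State.IS_PRIMARY) {
        hasBecomePrimary = true;
      }
      state = next;
    }

    // Returns the cached flag, which stays true after primary is lost.
    synchronized boolean hasBecomePrimary() {
      return hasBecomePrimary;
    }

    // Alternative: answer from the live state instead of a cached flag.
    synchronized boolean isBecomingPrimary() {
      return state == State.BECOMING_PRIMARY;
    }

    synchronized boolean isPrimary() {
      return state == State.IS_PRIMARY;
    }
  }

  public static void main(String[] args) {
    Advisor advisor = new Advisor();
    advisor.transitionTo(State.BECOMING_PRIMARY);
    advisor.transitionTo(State.IS_PRIMARY);
    advisor.transitionTo(State.NOT_PRIMARY); // for example, after a rebalance
    System.out.println("cached flag: " + advisor.hasBecomePrimary());
    System.out.println("live state:  " + advisor.isPrimary());
  }
}
```

After the last transition the cached flag still reports `true` while the live query reports `false`, which is the stale-state case described above.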
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
##########
@@ -2187,6 +2187,13 @@ boolean doClose(String reason, Throwable systemFailureCause, boolean keepAlive,
return false;
}
+ for (GatewaySender sender : allGatewaySenders) {
+ try {
+ sender.prepareForStop();
+ } catch (Exception ignore) {
+ }
Review comment:
Is it safe to just silently ignore any exceptions seen here? Also, would
this call to `sender.prepareForStop()` be better on line 2246, just before
we're stopping the gateway senders? Having it up here by itself seems
potentially confusing.
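One possible shape for this, as a rough sketch only: keep the prepare step directly next to the stop step and log failures instead of discarding them. The `Sender` interface, the `stopAll` method, and the use of `java.util.logging` are assumptions made to keep the example self-contained; the real code would use `GatewaySender` and Geode's own logger:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

class PrepareForStopExample {
  private static final Logger logger =
      Logger.getLogger(PrepareForStopExample.class.getName());

  // Hypothetical stand-in for the parts of GatewaySender used here.
  interface Sender {
    String getId();

    void prepareForStop();

    void stop();
  }

  // Prepares every sender, then stops every sender. Returns the ids of the
  // senders whose prepareForStop() threw, so failures stay visible.
  static List<String> stopAll(List<Sender> senders) {
    List<String> failedToPrepare = new ArrayList<>();
    for (Sender sender : senders) {
      try {
        sender.prepareForStop();
      } catch (RuntimeException e) {
        // Log rather than swallow, so a failed prepare can be diagnosed later.
        logger.log(Level.WARNING, "prepareForStop failed for sender " + sender.getId(), e);
        failedToPrepare.add(sender.getId());
      }
    }
    for (Sender sender : senders) {
      sender.stop();
    }
    return failedToPrepare;
  }
}
```

Whether a failed prepare should still be followed by `stop()` is a design decision for the pull request author; the sketch stops every sender regardless and only records the failure.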
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
##########
@@ -223,7 +231,12 @@ protected void loadEventsFromTempQueue() {
// .getBucketToTempQueueMap().get(getId());
if (tempQueue != null && !tempQueue.isEmpty()) {
synchronized (tempQueue) {
+ Map<String, Map<Integer, List<Object>>> regionToDispatchedKeysMap =
Review comment:
This would be better named something like "regionToDuplicateEventsMap".
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
##########
@@ -257,11 +273,47 @@ protected void loadEventsFromTempQueue() {
}
getInitializationLock().writeLock().unlock();
}
+ if (regionToDispatchedKeysMap.size() > 0
+ && getPartitionedRegion().getRegionAdvisor() != null) {
+ Set<InternalDistributedMember> recipients =
+ getPartitionedRegion().getRegionAdvisor().adviseDataStore();
+
+ if (recipients.isEmpty()) {
+ return;
+ }
+
+ InternalDistributedSystem ids = getCache().getInternalDistributedSystem();
+ DistributionManager dm = ids.getDistributionManager();
+ dm.retainMembersWithSameOrNewerVersion(recipients, KnownVersion.GEODE_1_15_0);
+
+ if (!recipients.isEmpty()) {
+ ParallelQueueSetPossibleDuplicateMessage pqspdm =
Review comment:
Could this be named "possibleDuplicateMessage" instead? Variable names
like this are often confusing, and we don't need to try to save characters in
the source code by using abbreviations.