This is an automated email from the ASF dual-hosted git repository. agingade pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 1f77a60 GEODE-5111: Set offline members to null only when done waiting for them (#1873) 1f77a60 is described below commit 1f77a603e5766ed14e980c2ca52902219c2bdab0 Author: Dale Emery <d...@dhemery.com> AuthorDate: Tue May 1 09:55:01 2018 -0700 GEODE-5111: Set offline members to null only when done waiting for them (#1873) * GEODE-5111: Set offline members to null only when done waiting for them --- .../internal/cache/BucketPersistenceAdvisor.java | 57 +- .../internal/cache/CacheDistributionAdvisor.java | 2 +- .../persistence/InternalPersistenceAdvisor.java | 44 + .../persistence/MembershipChangeListener.java | 127 +++ .../cache/persistence/PersistenceAdvisor.java | 11 +- .../cache/persistence/PersistenceAdvisorImpl.java | 949 ++++++++------------- .../PersistenceInitialImageAdvisor.java | 229 +++++ .../geode/internal/lang/SystemPropertyHelper.java | 4 + .../PersistenceInitialImageAdvisorTest.java | 120 +++ 9 files changed, 932 insertions(+), 611 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketPersistenceAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketPersistenceAdvisor.java index e3602d2..00b53f2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketPersistenceAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketPersistenceAdvisor.java @@ -31,6 +31,7 @@ import org.apache.geode.distributed.internal.ReplyException; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.PartitionedRegion.BucketLock; import org.apache.geode.internal.cache.partitioned.RedundancyAlreadyMetException; +import org.apache.geode.internal.cache.persistence.MembershipChangeListener; import org.apache.geode.internal.cache.persistence.PersistenceAdvisorImpl; import org.apache.geode.internal.cache.persistence.PersistentMemberID; import 
org.apache.geode.internal.cache.persistence.PersistentMemberManager; @@ -74,7 +75,7 @@ public class BucketPersistenceAdvisor extends PersistenceAdvisorImpl { public void recoveryDone(RuntimeException e) { this.recovering = false; if (!getPersistedMembers().isEmpty()) { - ((BucketAdvisor) advisor).setHadPrimary(); + ((BucketAdvisor) cacheDistributionAdvisor).setHadPrimary(); } // Make sure any removes that we saw during recovery are // applied. @@ -94,7 +95,8 @@ public class BucketPersistenceAdvisor extends PersistenceAdvisorImpl { } } - protected void checkInterruptedByShutdownAll() { + @Override + public void checkInterruptedByShutdownAll() { // when ShutdownAll is on-going, break all the GII for BR if (proxyBucket.getCache().isCacheAtShutdownAll()) { throw proxyBucket.getCache().getCacheClosedException("Cache is being closed by ShutdownAll"); @@ -107,7 +109,7 @@ public class BucketPersistenceAdvisor extends PersistenceAdvisorImpl { } @Override - protected void beginWaitingForMembershipChange(Set<PersistentMemberID> membersToWaitFor) { + public void beginWaitingForMembershipChange(Set<PersistentMemberID> membersToWaitFor) { if (recovering) { bucketLock.unlock(); } else { @@ -121,14 +123,13 @@ public class BucketPersistenceAdvisor extends PersistenceAdvisorImpl { } @Override - protected void logWaitingForMember(Set<PersistentMemberID> allMembersToWaitFor, - Set<PersistentMemberID> offlineMembersToWaitFor) { + public void logWaitingForMembers() { // We only log the bucket level information at fine level. 
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { Set<String> membersToWaitForPrettyFormat = new HashSet<String>(); - if (offlineMembersToWaitFor != null && !offlineMembersToWaitFor.isEmpty()) { - TransformUtils.transform(offlineMembersToWaitFor, membersToWaitForPrettyFormat, + if (offlineMembersWaitingFor != null && !offlineMembersWaitingFor.isEmpty()) { + TransformUtils.transform(offlineMembersWaitingFor, membersToWaitForPrettyFormat, TransformUtils.persistentMemberIdToLogEntryTransformer); logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, LocalizedMessage.create( LocalizedStrings.BucketPersistenceAdvisor_WAITING_FOR_LATEST_MEMBER, @@ -137,7 +138,7 @@ public class BucketPersistenceAdvisor extends PersistenceAdvisorImpl { TransformUtils.persistentMemberIdToLogEntryTransformer.transform(getPersistentID()), membersToWaitForPrettyFormat})); } else { - TransformUtils.transform(allMembersToWaitFor, membersToWaitForPrettyFormat, + TransformUtils.transform(allMembersWaitingFor, membersToWaitForPrettyFormat, TransformUtils.persistentMemberIdToLogEntryTransformer); if (logger.isDebugEnabled()) { logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, @@ -154,7 +155,7 @@ public class BucketPersistenceAdvisor extends PersistenceAdvisorImpl { } @Override - protected void endWaitingForMembershipChange() { + public void endWaitingForMembershipChange() { if (recovering) { bucketLock.lock(); // We allow regions with persistent colocated children to exceed redundancy @@ -193,17 +194,17 @@ public class BucketPersistenceAdvisor extends PersistenceAdvisorImpl { * */ public void initializeMembershipView() { - MembershipChangeListener listener = new MembershipChangeListener(); + MembershipChangeListener listener = new MembershipChangeListener(this); addListener(listener); boolean interrupted = false; try { while (!isClosed) { - advisor.getAdvisee().getCancelCriterion().checkCancelInProgress(null); + 
cacheDistributionAdvisor.getAdvisee().getCancelCriterion().checkCancelInProgress(null); // Look for any online copies of the bucket. // If there are any, get a membership view from them. Map<InternalDistributedMember, PersistentMemberID> onlineMembers = - advisor.adviseInitializedPersistentMembers(); + cacheDistributionAdvisor.adviseInitializedPersistentMembers(); if (onlineMembers != null) { if (updateMembershipView(onlineMembers.keySet())) { break; @@ -211,7 +212,7 @@ public class BucketPersistenceAdvisor extends PersistenceAdvisorImpl { } Set<InternalDistributedMember> postRecoveryMembers = - ((BucketAdvisor) advisor).adviseRecoveredFromDisk(); + ((BucketAdvisor) cacheDistributionAdvisor).adviseRecoveredFromDisk(); if (postRecoveryMembers != null) { if (updateMembershipView(postRecoveryMembers)) { break; @@ -221,8 +222,9 @@ public class BucketPersistenceAdvisor extends PersistenceAdvisorImpl { Set<PersistentMemberID> membersToWaitFor = getPersistedMembers(); if (!membersToWaitFor.isEmpty()) { + setWaitingOnMembers(membersToWaitFor, membersToWaitFor); try { - listener.waitForChange(membersToWaitFor, membersToWaitFor); + listener.waitForChange(); } catch (InterruptedException e) { interrupted = true; } @@ -232,6 +234,7 @@ public class BucketPersistenceAdvisor extends PersistenceAdvisorImpl { } } } finally { + setWaitingOnMembers(null, null); removeListener(listener); if (interrupted) { Thread.currentThread().interrupt(); @@ -258,12 +261,14 @@ public class BucketPersistenceAdvisor extends PersistenceAdvisorImpl { this.resetState(); } + @Override public boolean acquireTieLock() { // We don't actually need to get a dlock here for PRs, we're already // holding the bucket lock when we create a bucket region return true; } + @Override public void releaseTieLock() { // We don't actually need to get a dlock here for PRs, we're already // holding the bucket lock when we create a bucket region @@ -281,14 +286,14 @@ public class BucketPersistenceAdvisor extends 
PersistenceAdvisorImpl { return super.getMissingMembers(); } else { Set<PersistentMemberID> offlineMembers = getPersistedMembers(); - offlineMembers.removeAll(advisor.advisePersistentMembers().values()); + offlineMembers.removeAll(cacheDistributionAdvisor.advisePersistentMembers().values()); return offlineMembers; } } @Override public PersistentMemberID generatePersistentID() { - PersistentMemberID id = storage.generatePersistentID(); + PersistentMemberID id = persistentMemberView.generatePersistentID(); if (id == null) { return id; } else { @@ -321,37 +326,37 @@ public class BucketPersistenceAdvisor extends PersistenceAdvisorImpl { * */ public void dump(String infoMsg) { - storage.getOnlineMembers(); - storage.getOfflineMembers(); - storage.getOfflineAndEqualMembers(); - storage.getMyInitializingID(); - storage.getMyPersistentID(); + persistentMemberView.getOnlineMembers(); + persistentMemberView.getOfflineMembers(); + persistentMemberView.getOfflineAndEqualMembers(); + persistentMemberView.getMyInitializingID(); + persistentMemberView.getMyPersistentID(); final StringBuilder buf = new StringBuilder(2000); if (infoMsg != null) { buf.append(infoMsg); buf.append(": "); } buf.append("\nMY PERSISTENT ID:\n"); - buf.append(storage.getMyPersistentID()); + buf.append(persistentMemberView.getMyPersistentID()); buf.append("\nMY INITIALIZING ID:\n"); - buf.append(storage.getMyInitializingID()); + buf.append(persistentMemberView.getMyInitializingID()); buf.append("\nONLINE MEMBERS:\n"); - for (PersistentMemberID id : storage.getOnlineMembers()) { + for (PersistentMemberID id : persistentMemberView.getOnlineMembers()) { buf.append("\t"); buf.append(id); buf.append("\n"); } buf.append("\nOFFLINE MEMBERS:\n"); - for (PersistentMemberID id : storage.getOfflineMembers()) { + for (PersistentMemberID id : persistentMemberView.getOfflineMembers()) { buf.append("\t"); buf.append(id); buf.append("\n"); } buf.append("\nOFFLINE AND EQUAL MEMBERS:\n"); - for (PersistentMemberID id : 
storage.getOfflineAndEqualMembers()) { + for (PersistentMemberID id : persistentMemberView.getOfflineAndEqualMembers()) { buf.append("\t"); buf.append(id); buf.append("\n"); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java index d9efd55..8d2cce8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java @@ -955,7 +955,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { private final Map<InternalDistributedMember, CacheProfile> memberProfiles; - protected InitialImageAdvice(Set<InternalDistributedMember> replicates, + public InitialImageAdvice(Set<InternalDistributedMember> replicates, Set<InternalDistributedMember> others, Set<InternalDistributedMember> preloaded, Set<InternalDistributedMember> empties, Set<InternalDistributedMember> uninitialized, Set<InternalDistributedMember> nonPersistent, diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/InternalPersistenceAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/InternalPersistenceAdvisor.java new file mode 100644 index 0000000..53006cb --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/InternalPersistenceAdvisor.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.persistence; + + +import java.util.Set; + +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.internal.cache.CacheDistributionAdvisor; + +public interface InternalPersistenceAdvisor extends PersistenceAdvisor { + + default void beginWaitingForMembershipChange(Set<PersistentMemberID> membersToWaitFor) {} + + void checkInterruptedByShutdownAll(); + + void clearEqualMembers(); + + default void endWaitingForMembershipChange() {} + + Set<PersistentMemberID> getMembersToWaitFor(Set<PersistentMemberID> previouslyOnlineMembers, + Set<PersistentMemberID> offlineMembers) throws ReplyException; + + boolean isClosed(); + + CacheDistributionAdvisor getCacheDistributionAdvisor(); + + void logWaitingForMembers(); + + void setWaitingOnMembers(Set<PersistentMemberID> allMembersToWaitFor, + Set<PersistentMemberID> offlineMembersToWaitFor); +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/MembershipChangeListener.java b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/MembershipChangeListener.java new file mode 100644 index 0000000..86c1ac8 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/MembershipChangeListener.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.persistence; + +import static org.apache.geode.internal.lang.SystemPropertyHelper.PERSISTENT_VIEW_RETRY_TIMEOUT_SECONDS; +import static org.apache.geode.internal.lang.SystemPropertyHelper.getProductIntegerProperty; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Set; +import java.util.function.BooleanSupplier; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; + +public class MembershipChangeListener implements MembershipListener, PersistentStateListener { + private static final int POLL_INTERVAL_MILLIS = 100; + + private final Runnable warning; + private final BooleanSupplier cancelCondition; + private final Duration pollDuration; + private final Duration warningDelay; + + private boolean membershipChanged; + private boolean warned; + + public MembershipChangeListener(InternalPersistenceAdvisor persistenceAdvisor) { + warningDelay = warningDelay(persistenceAdvisor); + cancelCondition = cancelCondition(persistenceAdvisor); + warning = persistenceAdvisor::logWaitingForMembers; + pollDuration = pollDuration(); + } + + private Duration warningDelay(InternalPersistenceAdvisor persistenceAdvisor) { + 
return Duration.ofSeconds(persistenceAdvisor.getCacheDistributionAdvisor() + .getDistributionManager().getConfig().getAckWaitThreshold()); + } + + public synchronized void waitForChange() throws InterruptedException { + Instant now = Instant.now(); + Instant timeoutTime = now.plus(pollDuration); + Instant warningTime = now.plus(warningDelay); + + while (!membershipChanged && !cancelCondition.getAsBoolean() + && Instant.now().isBefore(timeoutTime)) { + warnOnceAfter(warningTime); + wait(POLL_INTERVAL_MILLIS); + } + membershipChanged = false; + } + + private void warnOnceAfter(Instant warningTime) { + if (!warned && warningTime.isBefore(Instant.now())) { + warning.run(); + warned = true; + } + } + + private synchronized void afterMembershipChange() { + membershipChanged = true; + notifyAll(); + } + + @Override + public void memberJoined(DistributionManager distributionManager, InternalDistributedMember id) { + afterMembershipChange(); + } + + @Override + public void memberDeparted(DistributionManager distributionManager, InternalDistributedMember id, + boolean crashed) { + afterMembershipChange(); + } + + @Override + public void memberSuspect(DistributionManager distributionManager, InternalDistributedMember id, + InternalDistributedMember whoSuspected, String reason) {} + + @Override + public void quorumLost(DistributionManager distributionManager, + Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {} + + @Override + public void memberOffline(InternalDistributedMember member, PersistentMemberID persistentID) { + afterMembershipChange(); + } + + @Override + public void memberOnline(InternalDistributedMember member, PersistentMemberID persistentID) { + afterMembershipChange(); + } + + @Override + public void memberRemoved(PersistentMemberID id, boolean revoked) { + afterMembershipChange(); + } + + private static BooleanSupplier cancelCondition(InternalPersistenceAdvisor persistenceAdvisor) { + CancelCriterion cancelCriterion = + 
persistenceAdvisor.getCacheDistributionAdvisor().getAdvisee().getCancelCriterion(); + return () -> { + persistenceAdvisor.checkInterruptedByShutdownAll(); + cancelCriterion.checkCancelInProgress(null); + return persistenceAdvisor.isClosed(); + }; + } + + private static Duration pollDuration() { + return Duration + .ofSeconds(getProductIntegerProperty(PERSISTENT_VIEW_RETRY_TIMEOUT_SECONDS).orElse(5)); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisor.java index 1ca82e4..c1204b5 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisor.java @@ -21,7 +21,7 @@ import java.util.Set; import org.apache.geode.cache.persistence.ConflictingPersistentDataException; import org.apache.geode.distributed.internal.ReplyException; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.cache.CacheDistributionAdvisor; +import org.apache.geode.internal.cache.CacheDistributionAdvisor.InitialImageAdvice; public interface PersistenceAdvisor { @@ -37,7 +37,6 @@ public interface PersistenceAdvisor { */ boolean acquireTieLock(); - /** * Determine the state of this member on it's peers, along with the PersistentMemberID of those * peers. @@ -45,7 +44,7 @@ public interface PersistenceAdvisor { * @return a map from the peers persistentId to the state of this member according to that peer. */ PersistentStateQueryResults getMyStateOnMembers(Set<InternalDistributedMember> members) - throws ReplyException, InterruptedException; + throws ReplyException; /** * Retrieve the state of a particular member from storage. 
@@ -143,7 +142,7 @@ public interface PersistenceAdvisor { * @return true if we detected that we actually have the same data on disk as another member. */ boolean checkMyStateOnMembers(Set<InternalDistributedMember> replicates) - throws ReplyException, InterruptedException, ConflictingPersistentDataException; + throws ReplyException, ConflictingPersistentDataException; void releaseTieLock(); @@ -156,8 +155,8 @@ public interface PersistenceAdvisor { * @throws ConflictingPersistentDataException if there are active members which are not based on * the state that is persisted in this member. */ - CacheDistributionAdvisor.InitialImageAdvice getInitialImageAdvice( - CacheDistributionAdvisor.InitialImageAdvice previousAdvice, boolean recoverFromDisk); + InitialImageAdvice getInitialImageAdvice(InitialImageAdvice previousAdvice, + boolean hasDiskImageToRecoverFrom); /** * Returns true if this member used to host data. diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java index 35ca50d..e5b1ef7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java @@ -18,11 +18,9 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.Logger; @@ -32,9 +30,7 @@ import org.apache.geode.cache.persistence.ConflictingPersistentDataException; import org.apache.geode.cache.persistence.RevokedPersistentDataException; import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.internal.DistributionAdvisor.Profile; -import 
org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.DistributionManager; -import org.apache.geode.distributed.internal.MembershipListener; import org.apache.geode.distributed.internal.ProfileListener; import org.apache.geode.distributed.internal.ReplyException; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; @@ -50,68 +46,70 @@ import org.apache.geode.internal.logging.log4j.LogMarker; import org.apache.geode.internal.process.StartupStatus; import org.apache.geode.internal.util.TransformUtils; -public class PersistenceAdvisorImpl implements PersistenceAdvisor { +public class PersistenceAdvisorImpl implements InternalPersistenceAdvisor { private static final Logger logger = LogService.getLogger(); + private static final PersistenceAdvisorObserver DEFAULT_PERSISTENCE_ADVISOR_OBSERVER = s -> { + }; + private static PersistenceAdvisorObserver persistenceAdvisorObserver = + DEFAULT_PERSISTENCE_ADVISOR_OBSERVER; - protected CacheDistributionAdvisor advisor; - private DistributedLockService dl; - protected String regionPath; - protected PersistentMemberView storage; - protected volatile boolean online = false; - private volatile Set<PersistentStateListener> listeners = Collections.emptySet(); - private DiskRegionStats stats; - private PersistentMemberManager memberManager; - private ProfileChangeListener listener; + protected final Object lock; + + protected final CacheDistributionAdvisor cacheDistributionAdvisor; + protected final String regionPath; + protected final PersistentMemberView persistentMemberView; + private final DiskRegionStats diskRegionStats; + private final PersistentMemberManager persistentMemberManager; + private final ProfileChangeListener profileChangeListener; + + private final Set<PersistentMemberID> recoveredMembers; + private final Set<PersistentMemberID> removedMembers = new HashSet<>(); + private final Set<PersistentMemberID> equalMembers; + private final 
DistributedLockService distributedLockService; + + private volatile boolean holdingTieLock; + + protected volatile boolean online; + private volatile Set<PersistentStateListener> persistentStateListeners = Collections.emptySet(); private volatile boolean initialized; private volatile boolean shouldUpdatePersistentView; protected volatile boolean isClosed; - private volatile boolean holdingTieLock; - - private Set<PersistentMemberID> recoveredMembers; - private Set<PersistentMemberID> removedMembers = new HashSet<PersistentMemberID>(); - private Set<PersistentMemberID> equalMembers; - private volatile Set<PersistentMemberID> allMembersWaitingFor; - private volatile Set<PersistentMemberID> offlineMembersWaitingFor; - protected final Object lock; - private static PersistenceAdvisorObserver observer = null; - private static final int PERSISTENT_VIEW_RETRY = - Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "PERSISTENT_VIEW_RETRY", 5); + protected volatile Set<PersistentMemberID> allMembersWaitingFor; + protected volatile Set<PersistentMemberID> offlineMembersWaitingFor; - public PersistenceAdvisorImpl(CacheDistributionAdvisor advisor, DistributedLockService dl, - PersistentMemberView storage, String regionPath, DiskRegionStats diskStats, - PersistentMemberManager memberManager) { - this.advisor = advisor; - this.dl = dl; + public PersistenceAdvisorImpl(CacheDistributionAdvisor cacheDistributionAdvisor, + DistributedLockService distributedLockService, PersistentMemberView persistentMemberView, + String regionPath, DiskRegionStats diskRegionStats, + PersistentMemberManager persistentMemberManager) { + this.cacheDistributionAdvisor = cacheDistributionAdvisor; + this.distributedLockService = distributedLockService; this.regionPath = regionPath; - this.storage = storage; - this.stats = diskStats; - this.listener = new ProfileChangeListener(); - this.memberManager = memberManager; - - // Prevent membership changes while we are persisting the membership view - // 
online. TODO prpersist is this the best thing to sync on? - // If we synchronize on something else, we need to be careful about - // lock ordering because the membership notifications are called - // with the advisor lock held. - this.lock = advisor; - - // Remember which members we know about because of what - // we have persisted - // We will later use this to handle updates from peers. + this.persistentMemberView = persistentMemberView; + this.diskRegionStats = diskRegionStats; + profileChangeListener = new ProfileChangeListener(); + this.persistentMemberManager = persistentMemberManager; + + // Prevent membership changes while we are persisting the membership view online. If we + // synchronize on something else, we need to be careful about lock ordering because the + // membership notifications are called with the advisor lock held. + lock = cacheDistributionAdvisor; + + // Remember which members we know about because of what we have persisted. We will later use + // this to handle updates from peers. recoveredMembers = getPersistedMembers(); - // To prevent races if we crash during initialization, - // mark equal members as online before we initialize. We will - // still report these members as equal, but if we crash and recover - // they will no longer be considered equal. - equalMembers = new HashSet<PersistentMemberID>(storage.getOfflineAndEqualMembers()); + // To prevent races if we crash during initialization, mark equal members as online before we + // initialize. We will still report these members as equal, but if we crash and recover they + // will no longer be considered equal. 
+ equalMembers = new HashSet<>(persistentMemberView.getOfflineAndEqualMembers()); for (PersistentMemberID id : equalMembers) { - storage.memberOnline(id); + persistentMemberView.memberOnline(id); } } + @Override public void initialize() { if (initialized) { return; @@ -123,10 +121,10 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { finishPendingDestroy(); } - advisor.addProfileChangeListener(listener); + cacheDistributionAdvisor.addProfileChangeListener(profileChangeListener); - Set<PersistentMemberPattern> revokedMembers = - this.memberManager.addRevocationListener(listener, storage.getRevokedMembers()); + Set<PersistentMemberPattern> revokedMembers = persistentMemberManager + .addRevocationListener(profileChangeListener, persistentMemberView.getRevokedMembers()); for (PersistentMemberPattern pattern : revokedMembers) { memberRevoked(pattern); @@ -141,8 +139,8 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { /** * Adds a PersistentStateListener whose job is to log changes in the persistent view. */ - protected void startMemberLogging() { - this.addListener(new PersistentStateListener.PersistentStateAdapter() { + private void startMemberLogging() { + addListener(new PersistentStateListener.PersistentStateAdapter() { /** * A persistent member has gone offline. Log the offline member and log which persistent * members are still online (the current persistent view). 
@@ -150,19 +148,18 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { @Override public void memberOffline(InternalDistributedMember member, PersistentMemberID persistentID) { if (logger.isDebugEnabled()) { - Set<String> onlineMembers = new HashSet<String>(); - Set<PersistentMemberID> members = new HashSet<PersistentMemberID>(); - members.addAll( - PersistenceAdvisorImpl.this.advisor.adviseInitializedPersistentMembers().values()); + Set<PersistentMemberID> members = + new HashSet<>(cacheDistributionAdvisor.adviseInitializedPersistentMembers().values()); members.remove(persistentID); + Set<String> onlineMembers = new HashSet<>(); TransformUtils.transform(members, onlineMembers, TransformUtils.persistentMemberIdToLogEntryTransformer); logger.info(LocalizedMessage.create( LocalizedStrings.PersistenceAdvisorImpl_PERSISTENT_VIEW, - new Object[] {PersistenceAdvisorImpl.this.regionPath, + new Object[] {regionPath, TransformUtils.persistentMemberIdToLogEntryTransformer.transform(persistentID), onlineMembers})); } @@ -170,31 +167,18 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { }); } - public boolean acquireTieLock() { - holdingTieLock = dl.lock("PERSISTENCE_" + regionPath, 0, -1); - return holdingTieLock; - } - - public void releaseTieLock() { - if (holdingTieLock) { - dl.unlock("PERSISTENCE_" + regionPath); - holdingTieLock = false; - } - } - + @Override public PersistentStateQueryResults getMyStateOnMembers(Set<InternalDistributedMember> members) throws ReplyException { - - PersistentStateQueryResults results = - PersistentStateQueryMessage.send(members, advisor.getDistributionManager(), regionPath, - storage.getMyPersistentID(), storage.getMyInitializingID()); - - return results; + return PersistentStateQueryMessage.send(members, + cacheDistributionAdvisor.getDistributionManager(), regionPath, + persistentMemberView.getMyPersistentID(), persistentMemberView.getMyInitializingID()); } /** * Return what state we have persisted 
for a given peer's id. */ + @Override public PersistentMemberState getPersistedStateOfMember(PersistentMemberID id) { if (isRevoked(id)) { return PersistentMemberState.REVOKED; @@ -205,17 +189,17 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { return PersistentMemberState.EQUAL; } - // If we have a member that is marked as online that - // is an older version of the peers id, tell them they are online - for (PersistentMemberID online : storage.getOnlineMembers()) { - if (online.isOlderOrEqualVersionOf(id)) { + // If we have a member that is marked as online that is an older version of the peers id, tell + // them they are online + for (PersistentMemberID onlineMember : persistentMemberView.getOnlineMembers()) { + if (onlineMember.isOlderOrEqualVersionOf(id)) { return PersistentMemberState.ONLINE; } } - // If we have a member that is marked as offline that - // is a newer version of the peers id, tell them they are online - for (PersistentMemberID offline : storage.getOfflineMembers()) { + // If we have a member that is marked as offline that is a newer version of the peers id, tell + // them they are online + for (PersistentMemberID offline : persistentMemberView.getOfflineMembers()) { if (id.isOlderOrEqualVersionOf(offline)) { return PersistentMemberState.OFFLINE; } @@ -223,22 +207,23 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { return null; } - public void updateMembershipView(InternalDistributedMember replicate, - boolean targetReinitializing) { + @Override + public void updateMembershipView(InternalDistributedMember peer, boolean targetReinitializing) { beginUpdatingPersistentView(); - DistributionManager dm = advisor.getDistributionManager(); - PersistentMembershipView view = - MembershipViewRequest.send(replicate, dm, regionPath, targetReinitializing); + DistributionManager dm = cacheDistributionAdvisor.getDistributionManager(); + PersistentMembershipView peersPersistentMembershipView = + 
MembershipViewRequest.send(peer, dm, regionPath, targetReinitializing); if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "{}-{}: Updating persistent view from {}", - shortDiskStoreId(), regionPath, replicate); + shortDiskStoreId(), regionPath, peer); } synchronized (lock) { PersistentMemberID myId = getPersistentID(); Map<InternalDistributedMember, PersistentMemberID> peersOnlineMembers = - view.getOnlineMembers(); - Set<PersistentMemberID> peersOfflineMembers = view.getOfflineMembers(); + peersPersistentMembershipView.getOnlineMembers(); + Set<PersistentMemberID> peersOfflineMembers = + peersPersistentMembershipView.getOfflineMembers(); for (PersistentMemberID id : peersOnlineMembers.values()) { if (!isRevoked(id) && !removedMembers.contains(id)) { @@ -247,29 +232,28 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "{}-{}: Processing membership view from peer. Marking {} as online because {} says its online", - shortDiskStoreId(), regionPath, id, replicate); + shortDiskStoreId(), regionPath, id, peer); } - storage.memberOnline(id); + persistentMemberView.memberOnline(id); } } } for (PersistentMemberID id : peersOfflineMembers) { if (!isRevoked(id) && !removedMembers.contains(id)) { - // This method is called before the current member is online. - // if the peer knows about a member that the current member doesn't know - // about, that means that member must have been added to the DS after - // the current member went offline. Therefore, that member is *newer* than - // the current member. So mark that member as online (meaning, online later + // This method is called before the current member is online. 
if the peer knows about a + // member that the current member doesn't know about, that means that member must have + // been added to the DS after the current member went offline. Therefore, that member is + // *newer* than the current member. So mark that member as online (meaning, online later // than the current member). if (!id.equals(myId) && !recoveredMembers.remove(id) && !id.diskStoreId.equals(getDiskStoreID())) { if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "{}-{}: Processing membership view from peer. Marking {} as online because {} says its offline, but we have never seen it", - shortDiskStoreId(), regionPath, id, replicate); + shortDiskStoreId(), regionPath, id, peer); } - storage.memberOnline(id); + persistentMemberView.memberOnline(id); } } } @@ -279,25 +263,25 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "{}-{}: Processing membership view from peer. Removing {} because {} doesn't have it", - shortDiskStoreId(), regionPath, id, replicate); + shortDiskStoreId(), regionPath, id, peer); } - storage.memberRemoved(id); + persistentMemberView.memberRemoved(id); } } - // Update the set of revoked members from the peer - // This should be called without holding the lock to - // avoid deadlocks - Set<PersistentMemberPattern> revokedMembers = view.getRevokedMembers(); + // Update the set of revoked members from the peer. 
This should be called without holding the + // lock to avoid deadlocks + Set<PersistentMemberPattern> revokedMembers = peersPersistentMembershipView.getRevokedMembers(); for (PersistentMemberPattern revoked : revokedMembers) { - memberManager.revokeMember(revoked); + persistentMemberManager.revokeMember(revoked); } } - protected boolean isRevoked(PersistentMemberID id) { - return memberManager.isRevoked(this.regionPath, id); + private boolean isRevoked(PersistentMemberID id) { + return persistentMemberManager.isRevoked(regionPath, id); } + @Override public void setOnline(boolean didGII, boolean atomicCreation, PersistentMemberID newId) throws ReplyException { if (online) { @@ -310,65 +294,57 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { synchronized (lock) { - // Transition any members that are marked as online, but not actually - // currently running, to offline. + // Transition any members that are marked as online, but not actually currently running, to + // offline. Set<PersistentMemberID> membersToMarkOffline = - new HashSet<PersistentMemberID>(storage.getOnlineMembers()); + new HashSet<>(persistentMemberView.getOnlineMembers()); Map<InternalDistributedMember, PersistentMemberID> onlineMembers; if (!atomicCreation) { - onlineMembers = advisor.adviseInitializedPersistentMembers(); + onlineMembers = cacheDistributionAdvisor.adviseInitializedPersistentMembers(); } else { - // Fix for 41100 - If this is an atomic bucket creation, don't - // mark our peers, which are concurrently intitializing, as offline - // they have the exact same data as we do (none), so we are not - // technically "newer," and this avoids a race where both members - // can think the other is offline ("older"). 
- onlineMembers = advisor.advisePersistentMembers(); + // Fix for 41100 - If this is an atomic bucket creation, don't mark our peers, which are + // concurrently initializing, as offline; they have the exact same data as we do (none), so + // we are not technically "newer," and this avoids a race where both members can think the + // other is offline ("older"). + onlineMembers = cacheDistributionAdvisor.advisePersistentMembers(); } membersToMarkOffline.removeAll(onlineMembers.values()); - // Another fix for 41100 - // Don't mark equal members as offline if that are currently running. - // We don't have newer data than these members - // so this is safe, and it it avoids a race where we mark them offline - // at this point, and then later they mark us as offline. + // Another fix for 41100 - Don't mark equal members as offline if they are currently running. + // We don't have newer data than these members so this is safe, and it avoids a race where + // we mark them offline at this point, and then later they mark us as offline. if (equalMembers != null && !equalMembers.isEmpty()) { - // This is slightly hacky. We're looking for a running member that has - // the same disk store as our equal members, because all have is a persistent - // id of the equal members. The persistent id of the running member may be - // different than what we have marked as equal, because the id in the profile - // is the new id for the member. - Collection<PersistentMemberID> allMembers = advisor.advisePersistentMembers().values(); - Set<DiskStoreID> runningDiskStores = new HashSet<DiskStoreID>(); + // This is slightly hacky. We're looking for a running member that has the same disk store + // as our equal members, because all we have is a persistent id of the equal members. The + // persistent id of the running member may be different than what we have marked as equal, + // because the id in the profile is the new id for the member.
+ Collection<PersistentMemberID> allMembers = + cacheDistributionAdvisor.advisePersistentMembers().values(); + Set<DiskStoreID> runningDiskStores = new HashSet<>(); for (PersistentMemberID mem : allMembers) { runningDiskStores.add(mem.diskStoreId); } // Remove any equal members which are not actually running right now. - for (Iterator<PersistentMemberID> itr = equalMembers.iterator(); itr.hasNext();) { - PersistentMemberID id = itr.next(); - if (!runningDiskStores.contains(id.diskStoreId)) { - itr.remove(); - } - } + equalMembers.removeIf(id -> !runningDiskStores.contains(id.diskStoreId)); membersToMarkOffline.removeAll(equalMembers); } for (PersistentMemberID id : membersToMarkOffline) { - storage.memberOffline(id); + persistentMemberView.memberOffline(id); } if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "{}-{}: Persisting the new membership view and ID as online. Online members {}. Offline members {}. Equal memebers {}.", - shortDiskStoreId(), regionPath, storage.getOnlineMembers(), storage.getOfflineMembers(), - equalMembers); + shortDiskStoreId(), regionPath, persistentMemberView.getOnlineMembers(), + persistentMemberView.getOfflineMembers(), equalMembers); } - storage.setInitialized(); + persistentMemberView.setInitialized(); online = true; - removedMembers = Collections.emptySet(); + removedMembers.clear(); } - if (stats != null) { - stats.incInitializations(!didGII); + if (diskRegionStats != null) { + diskRegionStats.incInitializations(!didGII); } } @@ -386,7 +362,7 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { if (!shouldUpdatePersistentView) { shouldUpdatePersistentView = true; Map<InternalDistributedMember, PersistentMemberID> onlineMembers = - advisor.adviseInitializedPersistentMembers(); + cacheDistributionAdvisor.adviseInitializedPersistentMembers(); for (Map.Entry<InternalDistributedMember, PersistentMemberID> entry : onlineMembers .entrySet()) { 
memberOnline(entry.getKey(), entry.getValue()); @@ -395,38 +371,37 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { } } + @Override public void setInitializing(PersistentMemberID newId) { - beginUpdatingPersistentView(); - DistributionManager dm = advisor.getDistributionManager(); + DistributionManager dm = cacheDistributionAdvisor.getDistributionManager(); PersistentMemberID oldId = getPersistentID(); PersistentMemberID initializingId = getInitializingID(); - Set profileUpdateRecipients = advisor.adviseProfileUpdate(); - if (newId == null || (!newId.equals(oldId) && !newId.equals(initializingId))) { + Set<InternalDistributedMember> profileUpdateRecipients = + cacheDistributionAdvisor.adviseProfileUpdate(); + if (newId == null || !newId.equals(oldId) && !newId.equals(initializingId)) { // If we have not yet prepared the old id, prepare it now. - // This will only be the case if we crashed - // while initializing previously. In the case, we are essentially - // finishing what we started by preparing that ID first. This - // will remove that ID from the peers. + // This will only be the case if we crashed while initializing previously. In the case, we are + // essentially finishing what we started by preparing that ID first. This will remove that ID + // from the peers. if (initializingId != null) { if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "{}-{}: We still have an initializing id: {}. Telling peers to remove the old id {} and transitioning this initializing id to old id. 
recipients {}", shortDiskStoreId(), regionPath, initializingId, oldId, profileUpdateRecipients); } - // TODO prpersist - clean this up - long viewVersion = advisor.startOperation(); + long viewVersion = cacheDistributionAdvisor.startOperation(); try { PrepareNewPersistentMemberMessage.send(profileUpdateRecipients, dm, regionPath, oldId, initializingId); } finally { if (viewVersion != -1) { - advisor.endOperation(viewVersion); + cacheDistributionAdvisor.endOperation(viewVersion); } } oldId = initializingId; @@ -435,10 +410,10 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { if (logger.isDebugEnabled()) { logger.debug("Persisting my new persistent ID {}", newId); } - storage.setInitializing(newId); + persistentMemberView.setInitializing(newId); } - profileUpdateRecipients = advisor.adviseProfileUpdate(); + profileUpdateRecipients = cacheDistributionAdvisor.adviseProfileUpdate(); if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "{}-{}: Sending the new ID to peers. They should remove the old id {}. 
Recipients: {}", @@ -449,43 +424,94 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { } } + @Override public PersistentMemberID generatePersistentID() { - return storage.generatePersistentID(); + return persistentMemberView.generatePersistentID(); } + @Override public PersistentMembershipView getMembershipView() { if (!initialized) { return null; } Set<PersistentMemberID> offlineMembers = getPersistedMembers(); Map<InternalDistributedMember, PersistentMemberID> onlineMembers = - advisor.adviseInitializedPersistentMembers(); + cacheDistributionAdvisor.adviseInitializedPersistentMembers(); offlineMembers.removeAll(onlineMembers.values()); PersistentMemberID myId = getPersistentID(); if (myId != null) { - onlineMembers.put(advisor.getDistributionManager().getDistributionManagerId(), myId); + onlineMembers + .put(cacheDistributionAdvisor.getDistributionManager().getDistributionManagerId(), myId); } - PersistentMembershipView view = new PersistentMembershipView(offlineMembers, onlineMembers, - memberManager.getRevokedMembers()); - return view; + return new PersistentMembershipView(offlineMembers, onlineMembers, + persistentMemberManager.getRevokedMembers()); } + @Override public Set<PersistentMemberID> getPersistedMembers() { - Set<PersistentMemberID> offlineMembers = storage.getOfflineMembers(); - Set<PersistentMemberID> equalMembers = storage.getOfflineAndEqualMembers(); - Set<PersistentMemberID> onlineMembers = storage.getOnlineMembers(); - Set<PersistentMemberID> persistentMembers = new HashSet<PersistentMemberID>(); - persistentMembers.addAll(offlineMembers); - persistentMembers.addAll(equalMembers); - persistentMembers.addAll(onlineMembers); + Set<PersistentMemberID> persistentMembers = new HashSet<>(); + persistentMembers.addAll(persistentMemberView.getOfflineMembers()); + persistentMembers.addAll(persistentMemberView.getOfflineAndEqualMembers()); + persistentMembers.addAll(persistentMemberView.getOnlineMembers()); return persistentMembers; } 
+ @Override + public boolean checkMyStateOnMembers(Set<InternalDistributedMember> replicates) + throws ReplyException { + PersistentStateQueryResults remoteStates = getMyStateOnMembers(replicates); + + persistenceAdvisorObserver.observe(regionPath); + + boolean equal = false; + for (Map.Entry<InternalDistributedMember, PersistentMemberState> entry : remoteStates.stateOnPeers + .entrySet()) { + InternalDistributedMember member = entry.getKey(); + PersistentMemberID remoteId = remoteStates.persistentIds.get(member); + + final PersistentMemberID myId = getPersistentID(); + PersistentMemberState stateOnPeer = entry.getValue(); + + if (PersistentMemberState.REVOKED.equals(stateOnPeer)) { + throw new RevokedPersistentDataException( + LocalizedStrings.PersistentMemberManager_Member_0_is_already_revoked + .toLocalizedString(myId)); + } + + if (myId != null && stateOnPeer == null) { + String message = LocalizedStrings.CreatePersistentRegionProcessor_SPLIT_DISTRIBUTED_SYSTEM + .toLocalizedString(regionPath, member, remoteId, myId); + throw new ConflictingPersistentDataException(message); + } + + if (myId != null && stateOnPeer == PersistentMemberState.EQUAL) { + equal = true; + } + + // The other member changes its ID when it comes back online. + if (remoteId != null) { + PersistentMemberState remoteState = getPersistedStateOfMember(remoteId); + if (remoteState == PersistentMemberState.OFFLINE) { + String message = + LocalizedStrings.CreatePersistentRegionProcessor_INITIALIZING_FROM_OLD_DATA + .toLocalizedString(regionPath, member, remoteId, myId); + throw new ConflictingPersistentDataException(message); + } + } + } + return equal; + } + + public static void setPersistenceAdvisorObserver(PersistenceAdvisorObserver o) { + persistenceAdvisorObserver = o == null ? 
DEFAULT_PERSISTENCE_ADVISOR_OBSERVER : o; + } + + @Override public PersistentMemberID getPersistentIDIfOnline() { if (online) { - return storage.getMyPersistentID(); + return persistentMemberView.getMyPersistentID(); } else { return null; } @@ -499,23 +525,22 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { distributedMember, persistentID); } synchronized (lock) { - boolean foundMember = false; - foundMember |= recoveredMembers.remove(persistentID); + boolean foundMember = recoveredMembers.remove(persistentID); foundMember |= equalMembers.remove(persistentID); foundMember |= getPersistedMembers().contains(persistentID); - // Don't persist members as offline until we are online. Otherwise, we may - // think we have later data than them during recovery. + // Don't persist members as offline until we are online. Otherwise, we may think we have later + // data than them during recovery. if (shouldUpdatePersistentView && online) { try { // Don't persistent members as offline if we have already persisted them as equal. - if (storage.getOfflineAndEqualMembers().contains(persistentID)) { + if (persistentMemberView.getOfflineAndEqualMembers().contains(persistentID)) { return; } - // Don't mark the member as offline if we have never seen it. If we haven't seen it - // that means it's not done initializing yet. + // Don't mark the member as offline if we have never seen it. If we haven't seen it that + // means it's not done initializing yet. 
if (foundMember) { if (PersistenceObserverHolder.getInstance().memberOffline(regionPath, persistentID)) { - storage.memberOffline(persistentID); + persistentMemberView.memberOffline(persistentID); } PersistenceObserverHolder.getInstance().afterPersistedOffline(regionPath, persistentID); } @@ -541,7 +566,7 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { recoveredMembers.remove(persistentID); try { if (PersistenceObserverHolder.getInstance().memberOnline(regionPath, persistentID)) { - storage.memberOnline(persistentID); + persistentMemberView.memberOnline(persistentID); } PersistenceObserverHolder.getInstance().afterPersistedOnline(regionPath, persistentID); } catch (DiskAccessException e) { @@ -561,22 +586,21 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { } private void memberRevoked(PersistentMemberPattern pattern) { - // Persist the revoked member, so if we recover later we will - // remember that they were revoked. - storage.memberRevoked(pattern); + // Persist the revoked member, so if we recover later we will remember that they were revoked. + persistentMemberView.memberRevoked(pattern); // Remove the revoked member from our view. 
- for (PersistentMemberID id : storage.getOfflineMembers()) { + for (PersistentMemberID id : persistentMemberView.getOfflineMembers()) { if (pattern.matches(id)) { memberRemoved(id, true); } } - for (PersistentMemberID id : storage.getOnlineMembers()) { + for (PersistentMemberID id : persistentMemberView.getOnlineMembers()) { if (pattern.matches(id)) { memberRemoved(id, true); } } - for (PersistentMemberID id : storage.getOfflineAndEqualMembers()) { + for (PersistentMemberID id : persistentMemberView.getOfflineAndEqualMembers()) { if (pattern.matches(id)) { memberRemoved(id, true); } @@ -597,14 +621,13 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { } try { if (PersistenceObserverHolder.getInstance().memberRemoved(regionPath, id)) { - storage.memberRemoved(id); + persistentMemberView.memberRemoved(id); } - // Purge any IDs that are old versions of the the id that - // we just removed + // Purge any IDs that are old versions of the the id that we just removed for (PersistentMemberID persistedId : getPersistedMembers()) { if (persistedId.isOlderOrEqualVersionOf(id)) { - storage.memberRemoved(persistedId); + persistentMemberView.memberRemoved(persistedId); } } PersistenceObserverHolder.getInstance().afterRemovePersisted(regionPath, id); @@ -616,61 +639,63 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { } } + @Override public PersistentMemberID getPersistentID() { - return storage.getMyPersistentID(); + return persistentMemberView.getMyPersistentID(); } + @Override public PersistentMemberID getInitializingID() { - return storage.getMyInitializingID(); + return persistentMemberView.getMyInitializingID(); } + @Override public void addListener(PersistentStateListener listener) { synchronized (this) { - HashSet<PersistentStateListener> tmpListeners = - new HashSet<PersistentStateListener>(listeners); + Set<PersistentStateListener> tmpListeners = new HashSet<>(persistentStateListeners); tmpListeners.add(listener); - listeners = 
Collections.unmodifiableSet(tmpListeners); + persistentStateListeners = Collections.unmodifiableSet(tmpListeners); } } + @Override public void removeListener(PersistentStateListener listener) { synchronized (this) { - HashSet<PersistentStateListener> tmpListeners = - new HashSet<PersistentStateListener>(listeners); + Set<PersistentStateListener> tmpListeners = new HashSet<>(persistentStateListeners); tmpListeners.remove(listener); - listeners = Collections.unmodifiableSet(tmpListeners); + persistentStateListeners = Collections.unmodifiableSet(tmpListeners); } } private void notifyListenersMemberOnline(InternalDistributedMember member, PersistentMemberID persistentID) { - for (PersistentStateListener listener : listeners) { + for (PersistentStateListener listener : persistentStateListeners) { listener.memberOnline(member, persistentID); } } private void notifyListenersMemberOffline(InternalDistributedMember member, PersistentMemberID persistentID) { - for (PersistentStateListener listener : listeners) { + for (PersistentStateListener listener : persistentStateListeners) { listener.memberOffline(member, persistentID); } } private void notifyListenersMemberRemoved(PersistentMemberID persistentID, boolean revoked) { - for (PersistentStateListener listener : listeners) { + for (PersistentStateListener listener : persistentStateListeners) { listener.memberRemoved(persistentID, revoked); } - } + @Override public HashSet<PersistentMemberID> getPersistedOnlineOrEqualMembers() { - HashSet<PersistentMemberID> members = - new HashSet<PersistentMemberID>(storage.getOnlineMembers()); + HashSet<PersistentMemberID> members = new HashSet<>(persistentMemberView.getOnlineMembers()); members.addAll(equalMembers); return members; } + @Override public void prepareNewMember(InternalDistributedMember sender, PersistentMemberID oldId, PersistentMemberID newId) { if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { @@ -679,9 +704,9 @@ public class PersistenceAdvisorImpl 
implements PersistenceAdvisor { newId, oldId); } synchronized (lock) { - // Don't prepare the ID if the advisor doesn't have a profile. This prevents - // A race with the advisor remove - if (!advisor.containsId(sender)) { + // Don't prepare the ID if the advisor doesn't have a profile. This prevents a race with the + // advisor remove + if (!cacheDistributionAdvisor.containsId(sender)) { if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "{}-{}: Refusing to prepare id because {} is not in our advisor", shortDiskStoreId(), @@ -689,13 +714,11 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { } return; } - // Persist new members even if we are not online yet - // Two members can become online at once. This way, - // they will know about each other. - storage.memberOnline(newId); + // Persist new members even if we are not online yet. Two members can become online at once. + // This way, they will know about each other. + persistentMemberView.memberOnline(newId); - // The oldId and newId could be the same if the member - // is retrying a GII. See bug #42051 + // The oldId and newId could be the same if the member is retrying a GII. See bug #42051 if (oldId != null && !oldId.equals(newId)) { if (initialized) { memberRemoved(oldId, false); @@ -709,79 +732,46 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { return diskStoreID == null ? 
"mem" : diskStoreID.abbrev(); } + @Override public void removeMember(PersistentMemberID id) { memberRemoved(id, false); } + @Override public void markMemberOffline(InternalDistributedMember member, PersistentMemberID id) { memberOffline(member, id); } + @Override + public CacheDistributionAdvisor getCacheDistributionAdvisor() { + return cacheDistributionAdvisor; + } + + @Override public void setWaitingOnMembers(Set<PersistentMemberID> allMembersToWaitFor, Set<PersistentMemberID> offlineMembersToWaitFor) { - this.allMembersWaitingFor = allMembersToWaitFor; - this.offlineMembersWaitingFor = offlineMembersToWaitFor; + allMembersWaitingFor = allMembersToWaitFor; + offlineMembersWaitingFor = offlineMembersToWaitFor; } - public boolean checkMyStateOnMembers(Set<InternalDistributedMember> replicates) - throws ReplyException { - PersistentStateQueryResults remoteStates = getMyStateOnMembers(replicates); - boolean equal = false; - - if (observer != null) { - observer.observe(regionPath); - } - - for (Map.Entry<InternalDistributedMember, PersistentMemberState> entry : remoteStates.stateOnPeers - .entrySet()) { - InternalDistributedMember member = entry.getKey(); - PersistentMemberID remoteId = remoteStates.persistentIds.get(member); - - final PersistentMemberID myId = getPersistentID(); - PersistentMemberState stateOnPeer = entry.getValue(); - - if (PersistentMemberState.REVOKED.equals(stateOnPeer)) { - throw new RevokedPersistentDataException( - LocalizedStrings.PersistentMemberManager_Member_0_is_already_revoked - .toLocalizedString(myId)); - } - - if (myId != null && stateOnPeer == null) { - String message = LocalizedStrings.CreatePersistentRegionProcessor_SPLIT_DISTRIBUTED_SYSTEM - .toLocalizedString(regionPath, member, remoteId, myId); - throw new ConflictingPersistentDataException(message); - } - - if (myId != null && stateOnPeer == PersistentMemberState.EQUAL) { - equal = true; - } - - // TODO prpersist - This check might not help much. 
The other member changes it's ID when it - // comes back online. - if (remoteId != null) { - PersistentMemberState remoteState = getPersistedStateOfMember(remoteId); - if (remoteState == PersistentMemberState.OFFLINE) { - String message = - LocalizedStrings.CreatePersistentRegionProcessor_INITIALIZING_FROM_OLD_DATA - .toLocalizedString(regionPath, member, remoteId, myId); - throw new ConflictingPersistentDataException(message); - } - } - } - return equal; + @Override + public boolean isClosed() { + return isClosed; } + public void finishPendingDestroy() { // send a message to peers indicating that they should remove this profile - long viewVersion = advisor.startOperation(); + long viewVersion = cacheDistributionAdvisor.startOperation(); try { - RemovePersistentMemberMessage.send(advisor.adviseProfileUpdate(), - advisor.getDistributionManager(), regionPath, getPersistentID(), getInitializingID()); + RemovePersistentMemberMessage.send(cacheDistributionAdvisor.adviseProfileUpdate(), + cacheDistributionAdvisor.getDistributionManager(), regionPath, getPersistentID(), + getInitializingID()); - storage.finishPendingDestroy(); + persistentMemberView.finishPendingDestroy(); } finally { if (viewVersion != -1) { - advisor.endOperation(viewVersion); + cacheDistributionAdvisor.endOperation(viewVersion); } } synchronized (lock) { @@ -798,155 +788,12 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { * @throws ConflictingPersistentDataException if there are active members which are not based on * the state that is persisted in this member. 
*/ - public CacheDistributionAdvisor.InitialImageAdvice getInitialImageAdvice( - CacheDistributionAdvisor.InitialImageAdvice previousAdvice, boolean recoverFromDisk) { - final boolean isPersistAdvisorDebugEnabled = - logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE); - - MembershipChangeListener listener = new MembershipChangeListener(); - advisor.addMembershipAndProxyListener(listener); - addListener(listener); - try { - while (true) { - Set<PersistentMemberID> previouslyOnlineMembers = getPersistedOnlineOrEqualMembers(); - - advisor.getAdvisee().getCancelCriterion().checkCancelInProgress(null); - try { - InitialImageAdvice advice = advisor.adviseInitialImage(previousAdvice, true); - - if (!advice.getReplicates().isEmpty()) { - if (isPersistAdvisorDebugEnabled) { - logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, - "{}-{}: There are members currently online. Checking for our state on those members and then initializing", - shortDiskStoreId(), regionPath); - } - // We will go ahead and take the other members contents if we ourselves didn't recover - // from disk. - if (recoverFromDisk) { - // Check with these members to make sure that they - // have heard of us - // If any of them say we have the same data on disk, we don't need to do a GII - if (checkMyStateOnMembers(advice.getReplicates())) { - if (isPersistAdvisorDebugEnabled) { - logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, - "{}-{}: We have the same data on disk as one of {} recovering gracefully", - shortDiskStoreId(), regionPath, advice.getReplicates()); - } - advice.getReplicates().clear(); - } else { - // If we have to do a GII, we have not equal members anymore. - synchronized (lock) { - equalMembers.clear(); - } - } - } - return advice; - } else if (!advice.getNonPersistent().isEmpty()) { - // We support a persistent member getting a membership view - // from a non persistent member and using that information to wait - // for the other known persistent members. 
See - // PersistentRecoveryOrderDUnitTest.testTransmitCrashedMembersWithNonPeristentRegion - updateViewFromNonPersistent(recoverFromDisk, advice); - previouslyOnlineMembers = getPersistedOnlineOrEqualMembers(); - } - - // Fix for 51698 - If there are online members that we previously - // failed to get a GII from, retry those members rather than wait - // for new persistent members to recover. - if (previousAdvice != null && !previousAdvice.getReplicates().isEmpty()) { - logger.info( - LocalizedMessage.create(LocalizedStrings.PersistenceAdvisorImpl_RETRYING_GII)); - previousAdvice = null; - continue; - } - - // If there are no currently online members, and no - // previously online members, this member should just go with what's - // on it's own disk - if (previouslyOnlineMembers.isEmpty()) { - if (isPersistAdvisorDebugEnabled) { - logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, - "{}-{}: No previously online members. Recovering with the data from the local disk", - shortDiskStoreId(), regionPath); - } - return advice; - } - - - Set<PersistentMemberID> offlineMembers = new HashSet<PersistentMemberID>(); - Set<PersistentMemberID> membersToWaitFor = - getMembersToWaitFor(previouslyOnlineMembers, offlineMembers); - - if (membersToWaitFor.isEmpty()) { - if (isPersistAdvisorDebugEnabled) { - logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, - "{}-{}: All of the previously online members are now online and waiting for us. Acquiring tie lock. Previously online members {}", - shortDiskStoreId(), regionPath, advice.getReplicates()); - } - // We're tied for the latest copy of the data. try to get the distributed lock. - if (acquireTieLock()) { - advice = advisor.adviseInitialImage(previousAdvice, true); - if (isPersistAdvisorDebugEnabled) { - logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, - "{}-{}: Acquired the lock. 
This member will initialize", shortDiskStoreId(), - regionPath); - } - if (!advice.getReplicates().isEmpty()) { - if (isPersistAdvisorDebugEnabled) { - logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, - "{}-{}: Another member has initialized while we were getting the lock. We will initialize from that member", - shortDiskStoreId(), regionPath); - } - checkMyStateOnMembers(advice.getReplicates()); - } - return advice; - } else { - if (isPersistAdvisorDebugEnabled) { - logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, - "{}-{}: Failed to acquire the lock.", shortDiskStoreId(), regionPath); - } - } - } else { - if (isPersistAdvisorDebugEnabled) { - logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, - "{}-{}: Going to wait for these member ids: {}", shortDiskStoreId(), regionPath, - membersToWaitFor); - } - } - - beginWaitingForMembershipChange(membersToWaitFor); - try { - // The persistence advisor needs to know which members are really not available - // because the user uses this information to decide which members they - // haven't started yet. membersToWaitFor includes members that - // are still waiting to start up, but are waiting for members other than - // the current member. 
So we pass the set of offline members here - listener.waitForChange(membersToWaitFor, offlineMembers); - } finally { - endWaitingForMembershipChange(); - } - } catch (InterruptedException e) { - logger.debug("Interrupted while trying to determine latest persisted copy: {}", - e.getMessage(), e); - } - } - } finally { - advisor.removeMembershipAndProxyListener(listener); - removeListener(listener); - } - } - - public void updateViewFromNonPersistent(boolean recoverFromDisk, InitialImageAdvice advice) { - for (InternalDistributedMember replicate : advice.getNonPersistent()) { - try { - updateMembershipView(replicate, recoverFromDisk); - return; - } catch (ReplyException e) { - if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { - logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "Failed to update membership view", e); - } - } - } + @Override + public InitialImageAdvice getInitialImageAdvice(InitialImageAdvice previousAdvice, + boolean hasDiskImageToRecoverFrom) { + PersistenceInitialImageAdvisor piia = new PersistenceInitialImageAdvisor(this, + shortDiskStoreId(), regionPath, cacheDistributionAdvisor, hasDiskImageToRecoverFrom); + return piia.getAdvice(previousAdvice); } /** @@ -956,33 +803,33 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { * need to wait for - this member may end up waiting on member that is actually running. * @return the list of members that this member needs to wait for before it can initialize. */ + @Override public Set<PersistentMemberID> getMembersToWaitFor( Set<PersistentMemberID> previouslyOnlineMembers, Set<PersistentMemberID> offlineMembers) - throws ReplyException, InterruptedException { + throws ReplyException { PersistentMemberID myPersistentID = getPersistentID(); PersistentMemberID myInitializingId = getInitializingID(); // This is the set of members that are currently waiting for this member // to come online. 
- Set<PersistentMemberID> membersToWaitFor = - new HashSet<PersistentMemberID>(previouslyOnlineMembers); + Set<PersistentMemberID> membersToWaitFor = new HashSet<>(previouslyOnlineMembers); offlineMembers.addAll(previouslyOnlineMembers); // If our persistent ID is null, we need to wait for all of the previously online members. if (myPersistentID != null || myInitializingId != null) { - Set<InternalDistributedMember> members = advisor.adviseProfileUpdate(); - Set<InternalDistributedMember> membersHostingThisRegion = advisor.adviseGeneric(); + Set<InternalDistributedMember> members = cacheDistributionAdvisor.adviseProfileUpdate(); + Set<InternalDistributedMember> membersHostingThisRegion = + cacheDistributionAdvisor.adviseGeneric(); // Fetch the persistent view from all of our peers. PersistentStateQueryResults results = PersistentStateQueryMessage.send(members, - advisor.getDistributionManager(), regionPath, myPersistentID, myInitializingId); - - // iterate through all of the peers. For each peer: - // if the guy was previously online according to us, grab it's online - // members and add them to the members to wait for set. - // We may need to do this several times until we discover all of the - // members that may have newer data than - // us, + cacheDistributionAdvisor.getDistributionManager(), regionPath, myPersistentID, + myInitializingId); + + // iterate through all of the peers. For each peer: if the guy was previously online according + // to us, grab its online members and add them to the members to wait for set. We may need to + // do this several times until we discover all of the members that may have newer data than + // us. 
boolean addedMembers = true; while (addedMembers) { addedMembers = false; @@ -997,11 +844,11 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { for (PersistentMemberID peerOnlineMember : peersOnlineMembers) { if (!isRevoked(peerOnlineMember) && !peerOnlineMember.diskStoreId.equals(getDiskStoreID()) - && !storage.getOfflineMembers().contains(peerOnlineMember)) { + && !persistentMemberView.getOfflineMembers().contains(peerOnlineMember)) { if (membersToWaitFor.add(peerOnlineMember)) { addedMembers = true; // Make sure we also persist that this member is online. - storage.memberOnline(peerOnlineMember); + persistentMemberView.memberOnline(peerOnlineMember); if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "{}-{}: Adding {} to the list of members we're wait for, because {} has newer or equal data than is and is waiting for that member", @@ -1034,8 +881,7 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { .toLocalizedString(myPersistentID)); } - // If the peer thinks we are newer or equal to them, we don't - // need to wait for this peer. + // If the peer thinks we are newer or equal to them, we don't need to wait for this peer. if (membersHostingThisRegion.contains(memberId) && persistentID != null && state != null && myInitializingId == null && (state.equals(PersistentMemberState.ONLINE) || state.equals(PersistentMemberState.EQUAL))) { @@ -1052,8 +898,7 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { removeNewerPersistentID(offlineMembers, persistentID); } - // If the peer thinks we are newer or equal to them, we don't - // need to wait for this peer. + // If the peer thinks we are newer or equal to them, we don't need to wait for this peer. 
if (membersHostingThisRegion.contains(memberId) && initializingID != null && state != null && (state.equals(PersistentMemberState.ONLINE) || state.equals(PersistentMemberState.EQUAL))) { @@ -1070,9 +915,8 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { removeNewerPersistentID(offlineMembers, initializingID); } - // If we were able to determine what disk store this member - // is in, and it doesn't have a persistent ID, but we think - // we should be waiting for it, stop waiting for it. + // If we were able to determine what disk store this member is in, and it doesn't have a + // persistent ID, but we think we should be waiting for it, stop waiting for it. if (initializingID == null && persistentID == null & diskStoreID != null) { removeByDiskStoreID(membersToWaitFor, diskStoreID); removeByDiskStoreID(offlineMembers, diskStoreID); @@ -1123,14 +967,7 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { } } - protected void beginWaitingForMembershipChange(Set<PersistentMemberID> membersToWaitFor) { - // do nothing - } - - protected void endWaitingForMembershipChange() { - // do nothing - } - + @Override public boolean wasHosting() { return getPersistentID() != null || getInitializingID() != null; } @@ -1155,125 +992,125 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { return allMembersWaitingFor; } - protected void logWaitingForMember(Set<PersistentMemberID> allMembersToWaitFor, - Set<PersistentMemberID> offlineMembersToWaitFor) { - Set<String> membersToWaitForLogEntries = new HashSet<String>(); + @Override + public void logWaitingForMembers() { + Set<String> membersToWaitForLogEntries = new HashSet<>(); - if (offlineMembersToWaitFor != null && !offlineMembersToWaitFor.isEmpty()) { - TransformUtils.transform(offlineMembersToWaitFor, membersToWaitForLogEntries, + if (offlineMembersWaitingFor != null && !offlineMembersWaitingFor.isEmpty()) { + TransformUtils.transform(offlineMembersWaitingFor, 
membersToWaitForLogEntries, TransformUtils.persistentMemberIdToLogEntryTransformer); StartupStatus.startup( - LocalizedStrings.CreatePersistentRegionProcessor_WAITING_FOR_LATEST_MEMBER, - new Object[] {regionPath, - TransformUtils.persistentMemberIdToLogEntryTransformer.transform(getPersistentID()), - membersToWaitForLogEntries}); + LocalizedStrings.CreatePersistentRegionProcessor_WAITING_FOR_LATEST_MEMBER, regionPath, + TransformUtils.persistentMemberIdToLogEntryTransformer.transform(getPersistentID()), + membersToWaitForLogEntries); } else { - TransformUtils.transform(allMembersToWaitFor, membersToWaitForLogEntries, + TransformUtils.transform(allMembersWaitingFor, membersToWaitForLogEntries, TransformUtils.persistentMemberIdToLogEntryTransformer); StartupStatus.startup( LocalizedStrings.CreatePersistentRegionProcessor_WAITING_FOR_ONLINE_LATEST_MEMBER, - new Object[] {regionPath, - TransformUtils.persistentMemberIdToLogEntryTransformer.transform(getPersistentID()), - membersToWaitForLogEntries}); + regionPath, + TransformUtils.persistentMemberIdToLogEntryTransformer.transform(getPersistentID()), + membersToWaitForLogEntries); } } - protected void checkInterruptedByShutdownAll() {} - - protected class MembershipChangeListener implements MembershipListener, PersistentStateListener { - - private boolean warned = false; - private final long warningTime; - - public MembershipChangeListener() { - long waitThreshold = advisor.getDistributionManager().getConfig().getAckWaitThreshold(); - warningTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(waitThreshold); + @Override + public void clearEqualMembers() { + synchronized (lock) { + equalMembers.clear(); } + } - private boolean membershipChanged = false; - - public void waitForChange(Set<PersistentMemberID> allMembersToWaitFor, - Set<PersistentMemberID> offlineMembersToWaitFor) throws InterruptedException { - synchronized (this) { - try { - setWaitingOnMembers(allMembersToWaitFor, offlineMembersToWaitFor); - long 
exitTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(PERSISTENT_VIEW_RETRY); - while (!membershipChanged && !isClosed) { - checkInterruptedByShutdownAll(); - advisor.getAdvisee().getCancelCriterion().checkCancelInProgress(null); - this.wait(100); - long time = System.nanoTime(); - - // Fix for #50415 go out and message other members to see if there - // status has changed. This handles any case where we might have - // missed a notification due to concurrent startup. - if (time > exitTime) { - break; - } - - if (!warned && time > warningTime) { - - logWaitingForMember(allMembersToWaitFor, offlineMembersToWaitFor); + @Override + public void checkInterruptedByShutdownAll() {} - warned = true; - } - } - this.membershipChanged = false; - } finally { - setWaitingOnMembers(null, null); - } - } - } + @Override + public void close() { + isClosed = true; + persistentMemberManager.removeRevocationListener(profileChangeListener); + cacheDistributionAdvisor.removeProfileChangeListener(profileChangeListener); + releaseTieLock(); + } - public void memberJoined(DistributionManager distributionManager, - InternalDistributedMember id) { - afterMembershipChange(); - } - private void afterMembershipChange() { - synchronized (this) { - this.membershipChanged = true; - this.notifyAll(); + /** + * Try to acquire the distributed lock which members must grab for in the case of a tie. Whoever + * gets the lock initializes first. + */ + @Override + public boolean acquireTieLock() { + // We're tied for the latest copy of the data. try to get the distributed lock. 
+ holdingTieLock = distributedLockService.lock("PERSISTENCE_" + regionPath, 0, -1); + if (!holdingTieLock) { + if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { + logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "{}-{}: Failed to acquire the lock.", + shortDiskStoreId(), regionPath); } } + return holdingTieLock; + } - public void memberDeparted(DistributionManager distributionManager, - InternalDistributedMember id, boolean crashed) { - afterMembershipChange(); + @Override + public void releaseTieLock() { + if (holdingTieLock) { + distributedLockService.unlock("PERSISTENCE_" + regionPath); + holdingTieLock = false; } + } - public void memberSuspect(DistributionManager distributionManager, InternalDistributedMember id, - InternalDistributedMember whoSuspected, String reason) {} + private boolean wasAboutToDestroy() { + return persistentMemberView.wasAboutToDestroy() + || persistentMemberView.wasAboutToDestroyDataStorage(); + } - @Override - public void quorumLost(DistributionManager distributionManager, - Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {} + protected synchronized void resetState() { + online = false; + removedMembers.clear(); + } - public void memberOffline(InternalDistributedMember member, PersistentMemberID persistentID) { - afterMembershipChange(); + public void flushMembershipChanges() { + try { + cacheDistributionAdvisor.waitForCurrentOperations(); + } catch (RegionDestroyedException ignored) { } + } - public void memberOnline(InternalDistributedMember member, PersistentMemberID persistentID) { - afterMembershipChange(); + @Override + public void persistMembersOfflineAndEqual( + Map<InternalDistributedMember, PersistentMemberID> map) { + for (PersistentMemberID persistentID : map.values()) { + persistentMemberView.memberOfflineAndEqual(persistentID); } + } - public void memberRemoved(PersistentMemberID id, boolean revoked) { - afterMembershipChange(); - } + @Override + public DiskStoreID 
getDiskStoreID() { + return persistentMemberView.getDiskStoreID(); + } + + @Override + public boolean isOnline() { + return online; + } + + public interface PersistenceAdvisorObserver { + void observe(String regionPath); } private class ProfileChangeListener implements ProfileListener, MemberRevocationListener { + @Override public void profileCreated(Profile profile) { profileUpdated(profile); } - public void profileRemoved(Profile profile, boolean regionDestroyed) { + @Override + public void profileRemoved(Profile profile, boolean destroyed) { CacheProfile cp = (CacheProfile) profile; if (cp.persistentID != null) { - if (regionDestroyed) { + if (destroyed) { memberRemoved(cp.persistentID, false); } else { memberOffline(profile.getDistributedMember(), cp.persistentID); @@ -1281,6 +1118,7 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { } } + @Override public void profileUpdated(Profile profile) { CacheProfile cp = (CacheProfile) profile; if (cp.persistentID != null && cp.persistenceInitialized) { @@ -1288,14 +1126,17 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { } } + @Override public void revoked(PersistentMemberPattern pattern) { memberRevoked(pattern); } + @Override public Set<PersistentMemberID> getMissingMemberIds() { return getMissingMembers(); } + @Override public String getRegionPath() { return getRegionPathForOfflineMembers(); } @@ -1317,52 +1158,4 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor { } } } - - public void close() { - isClosed = true; - memberManager.removeRevocationListener(listener); - advisor.removeProfileChangeListener(listener); - releaseTieLock(); - } - - private boolean wasAboutToDestroy() { - return storage.wasAboutToDestroy() || storage.wasAboutToDestroyDataStorage(); - } - - protected synchronized void resetState() { - this.online = false; - this.removedMembers = new HashSet<PersistentMemberID>(); - } - - public void flushMembershipChanges() { - try { - 
advisor.waitForCurrentOperations(); - } catch (RegionDestroyedException e) { - // continue with the next region - } - - } - - public void persistMembersOfflineAndEqual( - Map<InternalDistributedMember, PersistentMemberID> map) { - for (PersistentMemberID persistentID : map.values()) { - storage.memberOfflineAndEqual(persistentID); - } - } - - public DiskStoreID getDiskStoreID() { - return storage.getDiskStoreID(); - } - - public boolean isOnline() { - return online; - } - - public interface PersistenceAdvisorObserver { - default void observe(String regionPath) {} - } - - public static void setPersistenceAdvisorObserver(PersistenceAdvisorObserver o) { - observer = o; - } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceInitialImageAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceInitialImageAdvisor.java new file mode 100644 index 0000000..71e969a --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceInitialImageAdvisor.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache.persistence; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.CacheDistributionAdvisor; +import org.apache.geode.internal.cache.CacheDistributionAdvisor.InitialImageAdvice; +import org.apache.geode.internal.i18n.LocalizedStrings; +import org.apache.geode.internal.logging.LogService; +import org.apache.geode.internal.logging.log4j.LocalizedMessage; +import org.apache.geode.internal.logging.log4j.LogMarker; + +public class PersistenceInitialImageAdvisor { + + private static final Logger logger = LogService.getLogger(); + + private final InternalPersistenceAdvisor persistenceAdvisor; + private final String shortDiskStoreID; + private final String regionPath; + private final CacheDistributionAdvisor cacheDistributionAdvisor; + private final boolean hasDiskImageToRecoverFrom; + + public PersistenceInitialImageAdvisor(InternalPersistenceAdvisor persistenceAdvisor, + String shortDiskStoreID, String regionPath, CacheDistributionAdvisor cacheDistributionAdvisor, + boolean hasDiskImageToRecoverFrom) { + this.persistenceAdvisor = persistenceAdvisor; + this.shortDiskStoreID = shortDiskStoreID; + this.regionPath = regionPath; + this.cacheDistributionAdvisor = cacheDistributionAdvisor; + this.hasDiskImageToRecoverFrom = hasDiskImageToRecoverFrom; + } + + public InitialImageAdvice getAdvice(InitialImageAdvice previousAdvice) { + final boolean isPersistAdvisorDebugEnabled = + logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE); + + MembershipChangeListener listener = new MembershipChangeListener(persistenceAdvisor); + cacheDistributionAdvisor.addMembershipAndProxyListener(listener); + persistenceAdvisor.addListener(listener); + try { + while (true) { + 
cacheDistributionAdvisor.getAdvisee().getCancelCriterion().checkCancelInProgress(null); + try { + // On first pass, previous advice is null. On subsequent passes, it's the advice + // from the previous iteration. + InitialImageAdvice advice = + cacheDistributionAdvisor.adviseInitialImage(previousAdvice, true); + + if (hasReplicates(advice)) { + if (hasDiskImageToRecoverFrom) { + removeReplicatesIfWeAreEqualToAnyOrElseClearEqualMembers(advice.getReplicates()); + } + return advice; + } else if (hasNonPersistentMember(advice)) { + updateMembershipViewFromAnyPeer(advice.getNonPersistent(), hasDiskImageToRecoverFrom); + } + + // Fix for 51698 - If there are online members that we previously failed to get a GII + // from, retry those members rather than wait for new persistent members to recover. + if (hasReplicates(previousAdvice)) { + logger.info( + LocalizedMessage.create(LocalizedStrings.PersistenceAdvisorImpl_RETRYING_GII)); + previousAdvice = null; + continue; + } + + Set<PersistentMemberID> previouslyOnlineMembers = + persistenceAdvisor.getPersistedOnlineOrEqualMembers(); + + // If there are no currently online members, and no previously online members, this member + // should just go with what's on its own disk + if (previouslyOnlineMembers.isEmpty()) { + if (isPersistAdvisorDebugEnabled()) { + logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, + "{}-{}: No previously online members. Recovering with the data from the local disk", + shortDiskStoreID, regionPath); + } + return advice; + } + + Set<PersistentMemberID> offlineMembers = new HashSet<>(); + Set<PersistentMemberID> membersToWaitFor = + persistenceAdvisor.getMembersToWaitFor(previouslyOnlineMembers, offlineMembers); + + if (membersToWaitFor.isEmpty()) { + if (isPersistAdvisorDebugEnabled()) { + logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, + "{}-{}: All of the previously online members are now online and waiting for us. Acquiring tie lock. 
Previously online members {}", + shortDiskStoreID, regionPath, advice.getReplicates()); + } + if (persistenceAdvisor.acquireTieLock()) { + return refreshInitialImageAdviceAndThenCheckMyStateWithReplicates(previousAdvice); + } + } else { + if (isPersistAdvisorDebugEnabled) { + logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, + "{}-{}: Going to wait for these member ids: {}", shortDiskStoreID, regionPath, + membersToWaitFor); + } + } + + waitForMembershipChangeForMissingDiskStores(listener, offlineMembers, membersToWaitFor); + } catch (InterruptedException e) { + logger.debug("Interrupted while trying to determine latest persisted copy", e); + } + } + } finally { + persistenceAdvisor.setWaitingOnMembers(null, null); + cacheDistributionAdvisor.removeMembershipAndProxyListener(listener); + persistenceAdvisor.removeListener(listener); + } + } + + private void updateMembershipViewFromAnyPeer(Set<InternalDistributedMember> peers, + boolean recoverFromDisk) { + for (InternalDistributedMember peer : peers) { + try { + persistenceAdvisor.updateMembershipView(peer, recoverFromDisk); + return; + } catch (ReplyException e) { + if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { + logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "Failed to update membership view", e); + } + } + } + } + + private InitialImageAdvice refreshInitialImageAdviceAndThenCheckMyStateWithReplicates( + InitialImageAdvice previousAdvice) { + if (isPersistAdvisorDebugEnabled()) { + logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, + "{}-{}: Acquired the lock. This member will initialize", shortDiskStoreID, regionPath); + } + InitialImageAdvice advice = cacheDistributionAdvisor.adviseInitialImage(previousAdvice, true); + if (hasReplicates(advice)) { + if (isPersistAdvisorDebugEnabled()) { + logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, + "{}-{}: Another member has initialized while we were getting the lock. 
We will initialize from that member", + shortDiskStoreID, regionPath); + } + persistenceAdvisor.checkMyStateOnMembers(advice.getReplicates()); + } + return advice; + } + + private boolean hasNonPersistentMember(InitialImageAdvice advice) { + return !advice.getNonPersistent().isEmpty(); + } + + /** + * if one or more replicates are equal to this member: remove replicates from advice, return + * advice for GII loop + */ + private void removeReplicatesIfWeAreEqualToAnyOrElseClearEqualMembers( + Set<InternalDistributedMember> replicates) { + if (isPersistAdvisorDebugEnabled()) { + logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, + "{}-{}: There are members currently online. Checking for our state on those members and then initializing", + shortDiskStoreID, regionPath); + } + // Check with these members to make sure that they have heard of us. If any of them + // say we have the same data on disk, we don't need to do a GII. + boolean weAreEqualToAReplicate = persistenceAdvisor.checkMyStateOnMembers(replicates); + if (weAreEqualToAReplicate) { + // prevent GII by removing all replicates + removeReplicates(replicates); + } else { + persistenceAdvisor.clearEqualMembers(); + } + // Either a replicate has said we're equal and we've cleared replicates, + // or none of them said we're equal and we've cleared our equal members. + // We had replicates. Now one of these things is true: + // - We've cleared replicates (meaning we're equal to one, so can load from disk) + // - No replicates report we're equal (so must GII from one, which we indicate by clearing equal + // members). 
+ } + + private boolean hasReplicates(InitialImageAdvice advice) { + return advice != null && !advice.getReplicates().isEmpty(); + } + + private void removeReplicates(Set<InternalDistributedMember> replicates) { + if (isPersistAdvisorDebugEnabled()) { + logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, + "{}-{}: We have the same data on disk as one of {} recovering gracefully", + shortDiskStoreID, regionPath, replicates); + } + replicates.clear(); + } + + private void waitForMembershipChangeForMissingDiskStores(MembershipChangeListener listener, + Set<PersistentMemberID> offlineMembers, Set<PersistentMemberID> membersToWaitFor) + throws InterruptedException { + persistenceAdvisor.beginWaitingForMembershipChange(membersToWaitFor); + try { + // The persistence advisor needs to know which members are really not available because the + // user uses this information to decide which members they haven't started yet. + // membersToWaitFor includes members that are still waiting to start up, but are waiting for + // members other than the current member. So we pass the set of offline members here. + + persistenceAdvisor.setWaitingOnMembers(membersToWaitFor, offlineMembers); + listener.waitForChange(); + } finally { + persistenceAdvisor.endWaitingForMembershipChange(); + } + } + + private boolean isPersistAdvisorDebugEnabled() { + return logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java b/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java index a039628..91ede6c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java +++ b/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java @@ -16,6 +16,7 @@ package org.apache.geode.internal.lang; import java.util.Optional; + /** * The SystemPropertyHelper class is an helper class for accessing system properties used in geode. 
* The method name to get the system property should be the same as the system property name. @@ -65,6 +66,9 @@ public class SystemPropertyHelper { public static final String THREAD_ID_EXPIRY_TIME_PROPERTY = "threadIdExpiryTime"; + public static final String PERSISTENT_VIEW_RETRY_TIMEOUT_SECONDS = + "PERSISTENT_VIEW_RETRY_TIMEOUT_SECONDS"; + /** * This method will try to look up "geode." and "gemfire." versions of the system property. It * will check and prefer "geode." setting first, then try to check "gemfire." setting. diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/persistence/PersistenceInitialImageAdvisorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/persistence/PersistenceInitialImageAdvisorTest.java new file mode 100644 index 0000000..f3f4f54 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/persistence/PersistenceInitialImageAdvisorTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache.persistence; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.stream.Collectors.toCollection; +import static java.util.stream.Collectors.toSet; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNotNull; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.HashSet; +import java.util.Set; +import java.util.stream.IntStream; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.contrib.java.lang.system.RestoreSystemProperties; +import org.junit.experimental.categories.Category; +import org.mockito.InOrder; + +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.CacheDistributionAdvisor; +import org.apache.geode.internal.cache.CacheDistributionAdvisor.InitialImageAdvice; +import org.apache.geode.internal.lang.SystemPropertyHelper; +import org.apache.geode.test.junit.categories.UnitTest; + /** Unit tests for {@link PersistenceInitialImageAdvisor}; every collaborator is a Mockito mock. */ +@Category(UnitTest.class) +public class PersistenceInitialImageAdvisorTest { + @Rule /* undoes System.setProperty changes made by a test once it finishes */ + public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); + + private InternalPersistenceAdvisor persistenceAdvisor; + private CacheDistributionAdvisor cacheDistributionAdvisor; + private PersistenceInitialImageAdvisor persistenceInitialImageAdvisor; + /** Creates a deep-stubbed CacheDistributionAdvisor (ack-wait threshold 15), a persistence advisor mock that returns it, and the advisor under test. */ + @Before + public void setup() { + cacheDistributionAdvisor = mock(CacheDistributionAdvisor.class, RETURNS_DEEP_STUBS); + when(cacheDistributionAdvisor.getDistributionManager().getConfig().getAckWaitThreshold()) +
.thenReturn(15); + + persistenceAdvisor = mock(InternalPersistenceAdvisor.class); + when(persistenceAdvisor.getCacheDistributionAdvisor()).thenReturn(cacheDistributionAdvisor); + + persistenceInitialImageAdvisor = new PersistenceInitialImageAdvisor(persistenceAdvisor, + "short disk store ID", "region path", cacheDistributionAdvisor, true); + } + /** The first advice has no replicates and the second has one. Verifies setWaitingOnMembers is called exactly twice: first with a non-null first argument and the offline members, then with (null, null). */ + @Test + public void publishesListOfMissingMembersWhenWaitingForMissingMembers() { + System.setProperty("geode." + SystemPropertyHelper.PERSISTENT_VIEW_RETRY_TIMEOUT_SECONDS, "0"); /* NOTE(review): presumably a zero retry timeout keeps the test from blocking between advice attempts — confirm */ + Set<PersistentMemberID> offlineMembersToWaitFor = givenOfflineMembersToWaitFor(1); + + when(cacheDistributionAdvisor.adviseInitialImage(null, true)) + .thenReturn(adviceWithReplicates(0), adviceWithReplicates(1)); + + persistenceInitialImageAdvisor.getAdvice(null); + + verify(persistenceAdvisor, times(2)).setWaitingOnMembers(any(), any()); + + InOrder inOrder = inOrder(persistenceAdvisor); + inOrder.verify(persistenceAdvisor, times(1)).setWaitingOnMembers(isNotNull(), + eq(offlineMembersToWaitFor)); + inOrder.verify(persistenceAdvisor, times(1)).setWaitingOnMembers(isNull(), isNull()); + } + /** Creates memberCount mocked member IDs and stubs getPersistedOnlineOrEqualMembers() to return them. Also stubs getMembersToWaitFor(a, b) to add a's elements into b and return a separate copy of the IDs. Returns the created IDs. */ + private Set<PersistentMemberID> givenOfflineMembersToWaitFor(int memberCount) { + HashSet<PersistentMemberID> offlineMembersToWaitFor = + IntStream.range(0, memberCount).mapToObj(i -> persistentMemberID("offline member " + i)) + .distinct().collect(toCollection(HashSet::new)); + Set<PersistentMemberID> membersToWaitFor = new HashSet<>(offlineMembersToWaitFor); + + when(persistenceAdvisor.getPersistedOnlineOrEqualMembers()).thenReturn(offlineMembersToWaitFor); + when(persistenceAdvisor.getMembersToWaitFor(any(), any())).thenAnswer(invocation -> { + Set<PersistentMemberID> previouslyOnlineMembers = invocation.getArgument(0); + Set<PersistentMemberID> offlineMembers = invocation.getArgument(1); + offlineMembers.addAll(previouslyOnlineMembers); + return membersToWaitFor; + }); + + return offlineMembersToWaitFor; + } + /** Returns a Mockito mock of InternalDistributedMember named {@code name}. */ + private static
InternalDistributedMember internalDistributedMember(String name) { + return mock(InternalDistributedMember.class, name); + } + /** Returns a Mockito mock of PersistentMemberID named {@code name}. */ + private static PersistentMemberID persistentMemberID(String name) { + return mock(PersistentMemberID.class, name); + } + /** Builds advice whose first set holds replicateCount mocked members (presumably the replicates argument — confirm against the InitialImageAdvice constructor); every other argument is empty. */ + private static InitialImageAdvice adviceWithReplicates(int replicateCount) { + Set<InternalDistributedMember> replicates = IntStream.range(0, replicateCount) + .mapToObj(i -> internalDistributedMember("replicate " + i)).collect(toSet()); + return new InitialImageAdvice(replicates, emptySet(), emptySet(), emptySet(), emptySet(), + emptySet(), emptyMap()); + } +} -- To stop receiving notification emails like this one, please contact aging...@apache.org.