This is an automated email from the ASF dual-hosted git repository.

agingade pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1f77a60  GEODE-5111: Set offline members to null only when done 
waiting for them (#1873)
1f77a60 is described below

commit 1f77a603e5766ed14e980c2ca52902219c2bdab0
Author: Dale Emery <d...@dhemery.com>
AuthorDate: Tue May 1 09:55:01 2018 -0700

    GEODE-5111: Set offline members to null only when done waiting for them 
(#1873)
    
    * GEODE-5111: Set offline members to null only when done waiting for them
---
 .../internal/cache/BucketPersistenceAdvisor.java   |  57 +-
 .../internal/cache/CacheDistributionAdvisor.java   |   2 +-
 .../persistence/InternalPersistenceAdvisor.java    |  44 +
 .../persistence/MembershipChangeListener.java      | 127 +++
 .../cache/persistence/PersistenceAdvisor.java      |  11 +-
 .../cache/persistence/PersistenceAdvisorImpl.java  | 949 ++++++++-------------
 .../PersistenceInitialImageAdvisor.java            | 229 +++++
 .../geode/internal/lang/SystemPropertyHelper.java  |   4 +
 .../PersistenceInitialImageAdvisorTest.java        | 120 +++
 9 files changed, 932 insertions(+), 611 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketPersistenceAdvisor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketPersistenceAdvisor.java
index e3602d2..00b53f2 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketPersistenceAdvisor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketPersistenceAdvisor.java
@@ -31,6 +31,7 @@ import org.apache.geode.distributed.internal.ReplyException;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.PartitionedRegion.BucketLock;
 import 
org.apache.geode.internal.cache.partitioned.RedundancyAlreadyMetException;
+import org.apache.geode.internal.cache.persistence.MembershipChangeListener;
 import org.apache.geode.internal.cache.persistence.PersistenceAdvisorImpl;
 import org.apache.geode.internal.cache.persistence.PersistentMemberID;
 import org.apache.geode.internal.cache.persistence.PersistentMemberManager;
@@ -74,7 +75,7 @@ public class BucketPersistenceAdvisor extends 
PersistenceAdvisorImpl {
   public void recoveryDone(RuntimeException e) {
     this.recovering = false;
     if (!getPersistedMembers().isEmpty()) {
-      ((BucketAdvisor) advisor).setHadPrimary();
+      ((BucketAdvisor) cacheDistributionAdvisor).setHadPrimary();
     }
     // Make sure any removes that we saw during recovery are
     // applied.
@@ -94,7 +95,8 @@ public class BucketPersistenceAdvisor extends 
PersistenceAdvisorImpl {
     }
   }
 
-  protected void checkInterruptedByShutdownAll() {
+  @Override
+  public void checkInterruptedByShutdownAll() {
     // when ShutdownAll is on-going, break all the GII for BR
     if (proxyBucket.getCache().isCacheAtShutdownAll()) {
       throw proxyBucket.getCache().getCacheClosedException("Cache is being 
closed by ShutdownAll");
@@ -107,7 +109,7 @@ public class BucketPersistenceAdvisor extends 
PersistenceAdvisorImpl {
   }
 
   @Override
-  protected void beginWaitingForMembershipChange(Set<PersistentMemberID> 
membersToWaitFor) {
+  public void beginWaitingForMembershipChange(Set<PersistentMemberID> 
membersToWaitFor) {
     if (recovering) {
       bucketLock.unlock();
     } else {
@@ -121,14 +123,13 @@ public class BucketPersistenceAdvisor extends 
PersistenceAdvisorImpl {
   }
 
   @Override
-  protected void logWaitingForMember(Set<PersistentMemberID> 
allMembersToWaitFor,
-      Set<PersistentMemberID> offlineMembersToWaitFor) {
+  public void logWaitingForMembers() {
     // We only log the bucket level information at fine level.
     if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
       Set<String> membersToWaitForPrettyFormat = new HashSet<String>();
 
-      if (offlineMembersToWaitFor != null && 
!offlineMembersToWaitFor.isEmpty()) {
-        TransformUtils.transform(offlineMembersToWaitFor, 
membersToWaitForPrettyFormat,
+      if (offlineMembersWaitingFor != null && 
!offlineMembersWaitingFor.isEmpty()) {
+        TransformUtils.transform(offlineMembersWaitingFor, 
membersToWaitForPrettyFormat,
             TransformUtils.persistentMemberIdToLogEntryTransformer);
         logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, 
LocalizedMessage.create(
             
LocalizedStrings.BucketPersistenceAdvisor_WAITING_FOR_LATEST_MEMBER,
@@ -137,7 +138,7 @@ public class BucketPersistenceAdvisor extends 
PersistenceAdvisorImpl {
                 
TransformUtils.persistentMemberIdToLogEntryTransformer.transform(getPersistentID()),
                 membersToWaitForPrettyFormat}));
       } else {
-        TransformUtils.transform(allMembersToWaitFor, 
membersToWaitForPrettyFormat,
+        TransformUtils.transform(allMembersWaitingFor, 
membersToWaitForPrettyFormat,
             TransformUtils.persistentMemberIdToLogEntryTransformer);
         if (logger.isDebugEnabled()) {
           logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
@@ -154,7 +155,7 @@ public class BucketPersistenceAdvisor extends 
PersistenceAdvisorImpl {
   }
 
   @Override
-  protected void endWaitingForMembershipChange() {
+  public void endWaitingForMembershipChange() {
     if (recovering) {
       bucketLock.lock();
       // We allow regions with persistent colocated children to exceed 
redundancy
@@ -193,17 +194,17 @@ public class BucketPersistenceAdvisor extends 
PersistenceAdvisorImpl {
    *
    */
   public void initializeMembershipView() {
-    MembershipChangeListener listener = new MembershipChangeListener();
+    MembershipChangeListener listener = new MembershipChangeListener(this);
     addListener(listener);
     boolean interrupted = false;
     try {
       while (!isClosed) {
-        advisor.getAdvisee().getCancelCriterion().checkCancelInProgress(null);
+        
cacheDistributionAdvisor.getAdvisee().getCancelCriterion().checkCancelInProgress(null);
 
         // Look for any online copies of the bucket.
         // If there are any, get a membership view from them.
         Map<InternalDistributedMember, PersistentMemberID> onlineMembers =
-            advisor.adviseInitializedPersistentMembers();
+            cacheDistributionAdvisor.adviseInitializedPersistentMembers();
         if (onlineMembers != null) {
           if (updateMembershipView(onlineMembers.keySet())) {
             break;
@@ -211,7 +212,7 @@ public class BucketPersistenceAdvisor extends 
PersistenceAdvisorImpl {
         }
 
         Set<InternalDistributedMember> postRecoveryMembers =
-            ((BucketAdvisor) advisor).adviseRecoveredFromDisk();
+            ((BucketAdvisor) 
cacheDistributionAdvisor).adviseRecoveredFromDisk();
         if (postRecoveryMembers != null) {
           if (updateMembershipView(postRecoveryMembers)) {
             break;
@@ -221,8 +222,9 @@ public class BucketPersistenceAdvisor extends 
PersistenceAdvisorImpl {
         Set<PersistentMemberID> membersToWaitFor = getPersistedMembers();
 
         if (!membersToWaitFor.isEmpty()) {
+          setWaitingOnMembers(membersToWaitFor, membersToWaitFor);
           try {
-            listener.waitForChange(membersToWaitFor, membersToWaitFor);
+            listener.waitForChange();
           } catch (InterruptedException e) {
             interrupted = true;
           }
@@ -232,6 +234,7 @@ public class BucketPersistenceAdvisor extends 
PersistenceAdvisorImpl {
         }
       }
     } finally {
+      setWaitingOnMembers(null, null);
       removeListener(listener);
       if (interrupted) {
         Thread.currentThread().interrupt();
@@ -258,12 +261,14 @@ public class BucketPersistenceAdvisor extends 
PersistenceAdvisorImpl {
     this.resetState();
   }
 
+  @Override
   public boolean acquireTieLock() {
     // We don't actually need to get a dlock here for PRs, we're already
     // holding the bucket lock when we create a bucket region
     return true;
   }
 
+  @Override
   public void releaseTieLock() {
     // We don't actually need to get a dlock here for PRs, we're already
     // holding the bucket lock when we create a bucket region
@@ -281,14 +286,14 @@ public class BucketPersistenceAdvisor extends 
PersistenceAdvisorImpl {
       return super.getMissingMembers();
     } else {
       Set<PersistentMemberID> offlineMembers = getPersistedMembers();
-      offlineMembers.removeAll(advisor.advisePersistentMembers().values());
+      
offlineMembers.removeAll(cacheDistributionAdvisor.advisePersistentMembers().values());
       return offlineMembers;
     }
   }
 
   @Override
   public PersistentMemberID generatePersistentID() {
-    PersistentMemberID id = storage.generatePersistentID();
+    PersistentMemberID id = persistentMemberView.generatePersistentID();
     if (id == null) {
       return id;
     } else {
@@ -321,37 +326,37 @@ public class BucketPersistenceAdvisor extends 
PersistenceAdvisorImpl {
    *
    */
   public void dump(String infoMsg) {
-    storage.getOnlineMembers();
-    storage.getOfflineMembers();
-    storage.getOfflineAndEqualMembers();
-    storage.getMyInitializingID();
-    storage.getMyPersistentID();
+    persistentMemberView.getOnlineMembers();
+    persistentMemberView.getOfflineMembers();
+    persistentMemberView.getOfflineAndEqualMembers();
+    persistentMemberView.getMyInitializingID();
+    persistentMemberView.getMyPersistentID();
     final StringBuilder buf = new StringBuilder(2000);
     if (infoMsg != null) {
       buf.append(infoMsg);
       buf.append(": ");
     }
     buf.append("\nMY PERSISTENT ID:\n");
-    buf.append(storage.getMyPersistentID());
+    buf.append(persistentMemberView.getMyPersistentID());
     buf.append("\nMY INITIALIZING ID:\n");
-    buf.append(storage.getMyInitializingID());
+    buf.append(persistentMemberView.getMyInitializingID());
 
     buf.append("\nONLINE MEMBERS:\n");
-    for (PersistentMemberID id : storage.getOnlineMembers()) {
+    for (PersistentMemberID id : persistentMemberView.getOnlineMembers()) {
       buf.append("\t");
       buf.append(id);
       buf.append("\n");
     }
 
     buf.append("\nOFFLINE MEMBERS:\n");
-    for (PersistentMemberID id : storage.getOfflineMembers()) {
+    for (PersistentMemberID id : persistentMemberView.getOfflineMembers()) {
       buf.append("\t");
       buf.append(id);
       buf.append("\n");
     }
 
     buf.append("\nOFFLINE AND EQUAL MEMBERS:\n");
-    for (PersistentMemberID id : storage.getOfflineAndEqualMembers()) {
+    for (PersistentMemberID id : 
persistentMemberView.getOfflineAndEqualMembers()) {
       buf.append("\t");
       buf.append(id);
       buf.append("\n");
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
index d9efd55..8d2cce8 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
@@ -955,7 +955,7 @@ public class CacheDistributionAdvisor extends 
DistributionAdvisor {
     private final Map<InternalDistributedMember, CacheProfile> memberProfiles;
 
 
-    protected InitialImageAdvice(Set<InternalDistributedMember> replicates,
+    public InitialImageAdvice(Set<InternalDistributedMember> replicates,
         Set<InternalDistributedMember> others, Set<InternalDistributedMember> 
preloaded,
         Set<InternalDistributedMember> empties, Set<InternalDistributedMember> 
uninitialized,
         Set<InternalDistributedMember> nonPersistent,
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/InternalPersistenceAdvisor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/InternalPersistenceAdvisor.java
new file mode 100644
index 0000000..53006cb
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/InternalPersistenceAdvisor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.persistence;
+
+
+import java.util.Set;
+
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.internal.cache.CacheDistributionAdvisor;
+
+public interface InternalPersistenceAdvisor extends PersistenceAdvisor {
+
+  default void beginWaitingForMembershipChange(Set<PersistentMemberID> 
membersToWaitFor) {}
+
+  void checkInterruptedByShutdownAll();
+
+  void clearEqualMembers();
+
+  default void endWaitingForMembershipChange() {}
+
+  Set<PersistentMemberID> getMembersToWaitFor(Set<PersistentMemberID> 
previouslyOnlineMembers,
+      Set<PersistentMemberID> offlineMembers) throws ReplyException;
+
+  boolean isClosed();
+
+  CacheDistributionAdvisor getCacheDistributionAdvisor();
+
+  void logWaitingForMembers();
+
+  void setWaitingOnMembers(Set<PersistentMemberID> allMembersToWaitFor,
+      Set<PersistentMemberID> offlineMembersToWaitFor);
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/MembershipChangeListener.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/MembershipChangeListener.java
new file mode 100644
index 0000000..86c1ac8
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/MembershipChangeListener.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.persistence;
+
+import static 
org.apache.geode.internal.lang.SystemPropertyHelper.PERSISTENT_VIEW_RETRY_TIMEOUT_SECONDS;
+import static 
org.apache.geode.internal.lang.SystemPropertyHelper.getProductIntegerProperty;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BooleanSupplier;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.MembershipListener;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+
+public class MembershipChangeListener implements MembershipListener, 
PersistentStateListener {
+  private static final int POLL_INTERVAL_MILLIS = 100;
+
+  private final Runnable warning;
+  private final BooleanSupplier cancelCondition;
+  private final Duration pollDuration;
+  private final Duration warningDelay;
+
+  private boolean membershipChanged;
+  private boolean warned;
+
+  public MembershipChangeListener(InternalPersistenceAdvisor 
persistenceAdvisor) {
+    warningDelay = warningDelay(persistenceAdvisor);
+    cancelCondition = cancelCondition(persistenceAdvisor);
+    warning = persistenceAdvisor::logWaitingForMembers;
+    pollDuration = pollDuration();
+  }
+
+  private Duration warningDelay(InternalPersistenceAdvisor persistenceAdvisor) 
{
+    return Duration.ofSeconds(persistenceAdvisor.getCacheDistributionAdvisor()
+        .getDistributionManager().getConfig().getAckWaitThreshold());
+  }
+
+  public synchronized void waitForChange() throws InterruptedException {
+    Instant now = Instant.now();
+    Instant timeoutTime = now.plus(pollDuration);
+    Instant warningTime = now.plus(warningDelay);
+
+    while (!membershipChanged && !cancelCondition.getAsBoolean()
+        && Instant.now().isBefore(timeoutTime)) {
+      warnOnceAfter(warningTime);
+      wait(POLL_INTERVAL_MILLIS);
+    }
+    membershipChanged = false;
+  }
+
+  private void warnOnceAfter(Instant warningTime) {
+    if (!warned && warningTime.isBefore(Instant.now())) {
+      warning.run();
+      warned = true;
+    }
+  }
+
+  private synchronized void afterMembershipChange() {
+    membershipChanged = true;
+    notifyAll();
+  }
+
+  @Override
+  public void memberJoined(DistributionManager distributionManager, 
InternalDistributedMember id) {
+    afterMembershipChange();
+  }
+
+  @Override
+  public void memberDeparted(DistributionManager distributionManager, 
InternalDistributedMember id,
+      boolean crashed) {
+    afterMembershipChange();
+  }
+
+  @Override
+  public void memberSuspect(DistributionManager distributionManager, 
InternalDistributedMember id,
+      InternalDistributedMember whoSuspected, String reason) {}
+
+  @Override
+  public void quorumLost(DistributionManager distributionManager,
+      Set<InternalDistributedMember> failures, List<InternalDistributedMember> 
remaining) {}
+
+  @Override
+  public void memberOffline(InternalDistributedMember member, 
PersistentMemberID persistentID) {
+    afterMembershipChange();
+  }
+
+  @Override
+  public void memberOnline(InternalDistributedMember member, 
PersistentMemberID persistentID) {
+    afterMembershipChange();
+  }
+
+  @Override
+  public void memberRemoved(PersistentMemberID id, boolean revoked) {
+    afterMembershipChange();
+  }
+
+  private static BooleanSupplier cancelCondition(InternalPersistenceAdvisor 
persistenceAdvisor) {
+    CancelCriterion cancelCriterion =
+        
persistenceAdvisor.getCacheDistributionAdvisor().getAdvisee().getCancelCriterion();
+    return () -> {
+      persistenceAdvisor.checkInterruptedByShutdownAll();
+      cancelCriterion.checkCancelInProgress(null);
+      return persistenceAdvisor.isClosed();
+    };
+  }
+
+  private static Duration pollDuration() {
+    return Duration
+        
.ofSeconds(getProductIntegerProperty(PERSISTENT_VIEW_RETRY_TIMEOUT_SECONDS).orElse(5));
+  }
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisor.java
index 1ca82e4..c1204b5 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisor.java
@@ -21,7 +21,7 @@ import java.util.Set;
 import org.apache.geode.cache.persistence.ConflictingPersistentDataException;
 import org.apache.geode.distributed.internal.ReplyException;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.CacheDistributionAdvisor;
+import 
org.apache.geode.internal.cache.CacheDistributionAdvisor.InitialImageAdvice;
 
 public interface PersistenceAdvisor {
 
@@ -37,7 +37,6 @@ public interface PersistenceAdvisor {
    */
   boolean acquireTieLock();
 
-
   /**
    * Determine the state of this member on it's peers, along with the 
PersistentMemberID of those
    * peers.
@@ -45,7 +44,7 @@ public interface PersistenceAdvisor {
    * @return a map from the peers persistentId to the state of this member 
according to that peer.
    */
   PersistentStateQueryResults 
getMyStateOnMembers(Set<InternalDistributedMember> members)
-      throws ReplyException, InterruptedException;
+      throws ReplyException;
 
   /**
    * Retrieve the state of a particular member from storage.
@@ -143,7 +142,7 @@ public interface PersistenceAdvisor {
    * @return true if we detected that we actually have the same data on disk 
as another member.
    */
   boolean checkMyStateOnMembers(Set<InternalDistributedMember> replicates)
-      throws ReplyException, InterruptedException, 
ConflictingPersistentDataException;
+      throws ReplyException, ConflictingPersistentDataException;
 
   void releaseTieLock();
 
@@ -156,8 +155,8 @@ public interface PersistenceAdvisor {
    * @throws ConflictingPersistentDataException if there are active members 
which are not based on
    *         the state that is persisted in this member.
    */
-  CacheDistributionAdvisor.InitialImageAdvice getInitialImageAdvice(
-      CacheDistributionAdvisor.InitialImageAdvice previousAdvice, boolean 
recoverFromDisk);
+  InitialImageAdvice getInitialImageAdvice(InitialImageAdvice previousAdvice,
+      boolean hasDiskImageToRecoverFrom);
 
   /**
    * Returns true if this member used to host data.
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java
index 35ca50d..e5b1ef7 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java
@@ -18,11 +18,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.logging.log4j.Logger;
 
@@ -32,9 +30,7 @@ import 
org.apache.geode.cache.persistence.ConflictingPersistentDataException;
 import org.apache.geode.cache.persistence.RevokedPersistentDataException;
 import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
-import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.MembershipListener;
 import org.apache.geode.distributed.internal.ProfileListener;
 import org.apache.geode.distributed.internal.ReplyException;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -50,68 +46,70 @@ import org.apache.geode.internal.logging.log4j.LogMarker;
 import org.apache.geode.internal.process.StartupStatus;
 import org.apache.geode.internal.util.TransformUtils;
 
-public class PersistenceAdvisorImpl implements PersistenceAdvisor {
+public class PersistenceAdvisorImpl implements InternalPersistenceAdvisor {
 
   private static final Logger logger = LogService.getLogger();
+  private static final PersistenceAdvisorObserver 
DEFAULT_PERSISTENCE_ADVISOR_OBSERVER = s -> {
+  };
+  private static PersistenceAdvisorObserver persistenceAdvisorObserver =
+      DEFAULT_PERSISTENCE_ADVISOR_OBSERVER;
 
-  protected CacheDistributionAdvisor advisor;
-  private DistributedLockService dl;
-  protected String regionPath;
-  protected PersistentMemberView storage;
-  protected volatile boolean online = false;
-  private volatile Set<PersistentStateListener> listeners = 
Collections.emptySet();
-  private DiskRegionStats stats;
-  private PersistentMemberManager memberManager;
-  private ProfileChangeListener listener;
+  protected final Object lock;
+
+  protected final CacheDistributionAdvisor cacheDistributionAdvisor;
+  protected final String regionPath;
+  protected final PersistentMemberView persistentMemberView;
+  private final DiskRegionStats diskRegionStats;
+  private final PersistentMemberManager persistentMemberManager;
+  private final ProfileChangeListener profileChangeListener;
+
+  private final Set<PersistentMemberID> recoveredMembers;
+  private final Set<PersistentMemberID> removedMembers = new HashSet<>();
+  private final Set<PersistentMemberID> equalMembers;
+  private final DistributedLockService distributedLockService;
+
+  private volatile boolean holdingTieLock;
+
+  protected volatile boolean online;
+  private volatile Set<PersistentStateListener> persistentStateListeners = 
Collections.emptySet();
   private volatile boolean initialized;
   private volatile boolean shouldUpdatePersistentView;
   protected volatile boolean isClosed;
-  private volatile boolean holdingTieLock;
-
-  private Set<PersistentMemberID> recoveredMembers;
-  private Set<PersistentMemberID> removedMembers = new 
HashSet<PersistentMemberID>();
-  private Set<PersistentMemberID> equalMembers;
-  private volatile Set<PersistentMemberID> allMembersWaitingFor;
-  private volatile Set<PersistentMemberID> offlineMembersWaitingFor;
-  protected final Object lock;
-  private static PersistenceAdvisorObserver observer = null;
 
-  private static final int PERSISTENT_VIEW_RETRY =
-      Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + 
"PERSISTENT_VIEW_RETRY", 5);
+  protected volatile Set<PersistentMemberID> allMembersWaitingFor;
+  protected volatile Set<PersistentMemberID> offlineMembersWaitingFor;
 
-  public PersistenceAdvisorImpl(CacheDistributionAdvisor advisor, 
DistributedLockService dl,
-      PersistentMemberView storage, String regionPath, DiskRegionStats 
diskStats,
-      PersistentMemberManager memberManager) {
-    this.advisor = advisor;
-    this.dl = dl;
+  public PersistenceAdvisorImpl(CacheDistributionAdvisor 
cacheDistributionAdvisor,
+      DistributedLockService distributedLockService, PersistentMemberView 
persistentMemberView,
+      String regionPath, DiskRegionStats diskRegionStats,
+      PersistentMemberManager persistentMemberManager) {
+    this.cacheDistributionAdvisor = cacheDistributionAdvisor;
+    this.distributedLockService = distributedLockService;
     this.regionPath = regionPath;
-    this.storage = storage;
-    this.stats = diskStats;
-    this.listener = new ProfileChangeListener();
-    this.memberManager = memberManager;
-
-    // Prevent membership changes while we are persisting the membership view
-    // online. TODO prpersist is this the best thing to sync on?
-    // If we synchronize on something else, we need to be careful about
-    // lock ordering because the membership notifications are called
-    // with the advisor lock held.
-    this.lock = advisor;
-
-    // Remember which members we know about because of what
-    // we have persisted
-    // We will later use this to handle updates from peers.
+    this.persistentMemberView = persistentMemberView;
+    this.diskRegionStats = diskRegionStats;
+    profileChangeListener = new ProfileChangeListener();
+    this.persistentMemberManager = persistentMemberManager;
+
+    // Prevent membership changes while we are persisting the membership view 
online. If we
+    // synchronize on something else, we need to be careful about lock 
ordering because the
+    // membership notifications are called with the advisor lock held.
+    lock = cacheDistributionAdvisor;
+
+    // Remember which members we know about because of what we have persisted. 
We will later use
+    // this to handle updates from peers.
     recoveredMembers = getPersistedMembers();
 
-    // To prevent races if we crash during initialization,
-    // mark equal members as online before we initialize. We will
-    // still report these members as equal, but if we crash and recover
-    // they will no longer be considered equal.
-    equalMembers = new 
HashSet<PersistentMemberID>(storage.getOfflineAndEqualMembers());
+    // To prevent races if we crash during initialization, mark equal members 
as online before we
+    // initialize. We will still report these members as equal, but if we 
crash and recover they
+    // will no longer be considered equal.
+    equalMembers = new 
HashSet<>(persistentMemberView.getOfflineAndEqualMembers());
     for (PersistentMemberID id : equalMembers) {
-      storage.memberOnline(id);
+      persistentMemberView.memberOnline(id);
     }
   }
 
+  @Override
   public void initialize() {
     if (initialized) {
       return;
@@ -123,10 +121,10 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
       finishPendingDestroy();
     }
 
-    advisor.addProfileChangeListener(listener);
+    cacheDistributionAdvisor.addProfileChangeListener(profileChangeListener);
 
-    Set<PersistentMemberPattern> revokedMembers =
-        this.memberManager.addRevocationListener(listener, 
storage.getRevokedMembers());
+    Set<PersistentMemberPattern> revokedMembers = persistentMemberManager
+        .addRevocationListener(profileChangeListener, 
persistentMemberView.getRevokedMembers());
 
     for (PersistentMemberPattern pattern : revokedMembers) {
       memberRevoked(pattern);
@@ -141,8 +139,8 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
   /**
    * Adds a PersistentStateListener whose job is to log changes in the 
persistent view.
    */
-  protected void startMemberLogging() {
-    this.addListener(new PersistentStateListener.PersistentStateAdapter() {
+  private void startMemberLogging() {
+    addListener(new PersistentStateListener.PersistentStateAdapter() {
       /**
        * A persistent member has gone offline. Log the offline member and log 
which persistent
        * members are still online (the current persistent view).
@@ -150,19 +148,18 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
       @Override
       public void memberOffline(InternalDistributedMember member, 
PersistentMemberID persistentID) {
         if (logger.isDebugEnabled()) {
-          Set<String> onlineMembers = new HashSet<String>();
 
-          Set<PersistentMemberID> members = new HashSet<PersistentMemberID>();
-          members.addAll(
-              
PersistenceAdvisorImpl.this.advisor.adviseInitializedPersistentMembers().values());
+          Set<PersistentMemberID> members =
+              new 
HashSet<>(cacheDistributionAdvisor.adviseInitializedPersistentMembers().values());
           members.remove(persistentID);
 
+          Set<String> onlineMembers = new HashSet<>();
           TransformUtils.transform(members, onlineMembers,
               TransformUtils.persistentMemberIdToLogEntryTransformer);
 
           logger.info(LocalizedMessage.create(
               LocalizedStrings.PersistenceAdvisorImpl_PERSISTENT_VIEW,
-              new Object[] {PersistenceAdvisorImpl.this.regionPath,
+              new Object[] {regionPath,
                   
TransformUtils.persistentMemberIdToLogEntryTransformer.transform(persistentID),
                   onlineMembers}));
         }
@@ -170,31 +167,18 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
     });
   }
 
-  public boolean acquireTieLock() {
-    holdingTieLock = dl.lock("PERSISTENCE_" + regionPath, 0, -1);
-    return holdingTieLock;
-  }
-
-  public void releaseTieLock() {
-    if (holdingTieLock) {
-      dl.unlock("PERSISTENCE_" + regionPath);
-      holdingTieLock = false;
-    }
-  }
-
+  @Override
   public PersistentStateQueryResults 
getMyStateOnMembers(Set<InternalDistributedMember> members)
       throws ReplyException {
-
-    PersistentStateQueryResults results =
-        PersistentStateQueryMessage.send(members, 
advisor.getDistributionManager(), regionPath,
-            storage.getMyPersistentID(), storage.getMyInitializingID());
-
-    return results;
+    return PersistentStateQueryMessage.send(members,
+        cacheDistributionAdvisor.getDistributionManager(), regionPath,
+        persistentMemberView.getMyPersistentID(), 
persistentMemberView.getMyInitializingID());
   }
 
   /**
    * Return what state we have persisted for a given peer's id.
    */
+  @Override
   public PersistentMemberState getPersistedStateOfMember(PersistentMemberID 
id) {
     if (isRevoked(id)) {
       return PersistentMemberState.REVOKED;
@@ -205,17 +189,17 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
       return PersistentMemberState.EQUAL;
     }
 
-    // If we have a member that is marked as online that
-    // is an older version of the peers id, tell them they are online
-    for (PersistentMemberID online : storage.getOnlineMembers()) {
-      if (online.isOlderOrEqualVersionOf(id)) {
+    // If we have a member that is marked as online that is an older version 
of the peers id, tell
+    // them they are online
+    for (PersistentMemberID onlineMember : 
persistentMemberView.getOnlineMembers()) {
+      if (onlineMember.isOlderOrEqualVersionOf(id)) {
         return PersistentMemberState.ONLINE;
       }
     }
 
-    // If we have a member that is marked as offline that
-    // is a newer version of the peers id, tell them they are online
-    for (PersistentMemberID offline : storage.getOfflineMembers()) {
+    // If we have a member that is marked as offline that is a newer version 
of the peers id, tell
+    // them they are online
+    for (PersistentMemberID offline : 
persistentMemberView.getOfflineMembers()) {
       if (id.isOlderOrEqualVersionOf(offline)) {
         return PersistentMemberState.OFFLINE;
       }
@@ -223,22 +207,23 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
     return null;
   }
 
-  public void updateMembershipView(InternalDistributedMember replicate,
-      boolean targetReinitializing) {
+  @Override
+  public void updateMembershipView(InternalDistributedMember peer, boolean 
targetReinitializing) {
     beginUpdatingPersistentView();
-    DistributionManager dm = advisor.getDistributionManager();
-    PersistentMembershipView view =
-        MembershipViewRequest.send(replicate, dm, regionPath, 
targetReinitializing);
+    DistributionManager dm = cacheDistributionAdvisor.getDistributionManager();
+    PersistentMembershipView peersPersistentMembershipView =
+        MembershipViewRequest.send(peer, dm, regionPath, targetReinitializing);
     if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
       logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "{}-{}: Updating 
persistent view from {}",
-          shortDiskStoreId(), regionPath, replicate);
+          shortDiskStoreId(), regionPath, peer);
     }
 
     synchronized (lock) {
       PersistentMemberID myId = getPersistentID();
       Map<InternalDistributedMember, PersistentMemberID> peersOnlineMembers =
-          view.getOnlineMembers();
-      Set<PersistentMemberID> peersOfflineMembers = view.getOfflineMembers();
+          peersPersistentMembershipView.getOnlineMembers();
+      Set<PersistentMemberID> peersOfflineMembers =
+          peersPersistentMembershipView.getOfflineMembers();
 
       for (PersistentMemberID id : peersOnlineMembers.values()) {
         if (!isRevoked(id) && !removedMembers.contains(id)) {
@@ -247,29 +232,28 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
             if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
               logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
                   "{}-{}: Processing membership view from peer. Marking {} as 
online because {} says its online",
-                  shortDiskStoreId(), regionPath, id, replicate);
+                  shortDiskStoreId(), regionPath, id, peer);
             }
-            storage.memberOnline(id);
+            persistentMemberView.memberOnline(id);
           }
         }
       }
 
       for (PersistentMemberID id : peersOfflineMembers) {
         if (!isRevoked(id) && !removedMembers.contains(id)) {
-          // This method is called before the current member is online.
-          // if the peer knows about a member that the current member doesn't 
know
-          // about, that means that member must have been added to the DS after
-          // the current member went offline. Therefore, that member is 
*newer* than
-          // the current member. So mark that member as online (meaning, 
online later
+          // This method is called before the current member is online. if the 
peer knows about a
+          // member that the current member doesn't know about, that means 
that member must have
+          // been added to the DS after the current member went offline. 
Therefore, that member is
+          // *newer* than the current member. So mark that member as online 
(meaning, online later
           // than the current member).
           if (!id.equals(myId) && !recoveredMembers.remove(id)
               && !id.diskStoreId.equals(getDiskStoreID())) {
             if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
               logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
                   "{}-{}: Processing membership view from peer. Marking {} as 
online because {} says its offline, but we have never seen it",
-                  shortDiskStoreId(), regionPath, id, replicate);
+                  shortDiskStoreId(), regionPath, id, peer);
             }
-            storage.memberOnline(id);
+            persistentMemberView.memberOnline(id);
           }
         }
       }
@@ -279,25 +263,25 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
         if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
           logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
               "{}-{}: Processing membership view from peer. Removing {} 
because {} doesn't have it",
-              shortDiskStoreId(), regionPath, id, replicate);
+              shortDiskStoreId(), regionPath, id, peer);
         }
-        storage.memberRemoved(id);
+        persistentMemberView.memberRemoved(id);
       }
     }
 
-    // Update the set of revoked members from the peer
-    // This should be called without holding the lock to
-    // avoid deadlocks
-    Set<PersistentMemberPattern> revokedMembers = view.getRevokedMembers();
+    // Update the set of revoked members from the peer. This should be called 
without holding the
+    // lock to avoid deadlocks
+    Set<PersistentMemberPattern> revokedMembers = 
peersPersistentMembershipView.getRevokedMembers();
     for (PersistentMemberPattern revoked : revokedMembers) {
-      memberManager.revokeMember(revoked);
+      persistentMemberManager.revokeMember(revoked);
     }
   }
 
-  protected boolean isRevoked(PersistentMemberID id) {
-    return memberManager.isRevoked(this.regionPath, id);
+  private boolean isRevoked(PersistentMemberID id) {
+    return persistentMemberManager.isRevoked(regionPath, id);
   }
 
+  @Override
   public void setOnline(boolean didGII, boolean atomicCreation, 
PersistentMemberID newId)
       throws ReplyException {
     if (online) {
@@ -310,65 +294,57 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
 
     synchronized (lock) {
 
-      // Transition any members that are marked as online, but not actually
-      // currently running, to offline.
+      // Transition any members that are marked as online, but not actually 
currently running, to
+      // offline.
       Set<PersistentMemberID> membersToMarkOffline =
-          new HashSet<PersistentMemberID>(storage.getOnlineMembers());
+          new HashSet<>(persistentMemberView.getOnlineMembers());
       Map<InternalDistributedMember, PersistentMemberID> onlineMembers;
       if (!atomicCreation) {
-        onlineMembers = advisor.adviseInitializedPersistentMembers();
+        onlineMembers = 
cacheDistributionAdvisor.adviseInitializedPersistentMembers();
       } else {
-        // Fix for 41100 - If this is an atomic bucket creation, don't
-        // mark our peers, which are concurrently intitializing, as offline
-        // they have the exact same data as we do (none), so we are not
-        // technically "newer," and this avoids a race where both members
-        // can think the other is offline ("older").
-        onlineMembers = advisor.advisePersistentMembers();
+        // Fix for 41100 - If this is an atomic bucket creation, don't mark 
our peers, which are
+        // concurrently initializing, as offline they have the exact same data 
as we do (none), so
+        // we are not technically "newer," and this avoids a race where both 
members can think the
+        // other is offline ("older").
+        onlineMembers = cacheDistributionAdvisor.advisePersistentMembers();
       }
       membersToMarkOffline.removeAll(onlineMembers.values());
 
-      // Another fix for 41100
-      // Don't mark equal members as offline if that are currently running.
-      // We don't have newer data than these members
-      // so this is safe, and it it avoids a race where we mark them offline
-      // at this point, and then later they mark us as offline.
+      // Another fix for 41100 - Don't mark equal members as offline if that 
are currently running.
+      // We don't have newer data than these members so this is safe, and it 
it avoids a race where
+      // we mark them offline at this point, and then later they mark us as 
offline.
       if (equalMembers != null && !equalMembers.isEmpty()) {
 
-        // This is slightly hacky. We're looking for a running member that has
-        // the same disk store as our equal members, because all have is a 
persistent
-        // id of the equal members. The persistent id of the running member 
may be
-        // different than what we have marked as equal, because the id in the 
profile
-        // is the new id for the member.
-        Collection<PersistentMemberID> allMembers = 
advisor.advisePersistentMembers().values();
-        Set<DiskStoreID> runningDiskStores = new HashSet<DiskStoreID>();
+        // This is slightly hacky. We're looking for a running member that has 
the same disk store
+        // as our equal members, because all have is a persistent id of the 
equal members. The
+        // persistent id of the running member may be different than what we 
have marked as equal,
+        // because the id in the profile is the new id for the member.
+        Collection<PersistentMemberID> allMembers =
+            cacheDistributionAdvisor.advisePersistentMembers().values();
+        Set<DiskStoreID> runningDiskStores = new HashSet<>();
         for (PersistentMemberID mem : allMembers) {
           runningDiskStores.add(mem.diskStoreId);
         }
         // Remove any equal members which are not actually running right now.
-        for (Iterator<PersistentMemberID> itr = equalMembers.iterator(); 
itr.hasNext();) {
-          PersistentMemberID id = itr.next();
-          if (!runningDiskStores.contains(id.diskStoreId)) {
-            itr.remove();
-          }
-        }
+        equalMembers.removeIf(id -> 
!runningDiskStores.contains(id.diskStoreId));
         membersToMarkOffline.removeAll(equalMembers);
       }
       for (PersistentMemberID id : membersToMarkOffline) {
-        storage.memberOffline(id);
+        persistentMemberView.memberOffline(id);
       }
       if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
         logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
             "{}-{}: Persisting the new membership view and ID as online. 
Online members {}. Offline members {}. Equal memebers {}.",
-            shortDiskStoreId(), regionPath, storage.getOnlineMembers(), 
storage.getOfflineMembers(),
-            equalMembers);
+            shortDiskStoreId(), regionPath, 
persistentMemberView.getOnlineMembers(),
+            persistentMemberView.getOfflineMembers(), equalMembers);
       }
 
-      storage.setInitialized();
+      persistentMemberView.setInitialized();
       online = true;
-      removedMembers = Collections.emptySet();
+      removedMembers.clear();
     }
-    if (stats != null) {
-      stats.incInitializations(!didGII);
+    if (diskRegionStats != null) {
+      diskRegionStats.incInitializations(!didGII);
     }
   }
 
@@ -386,7 +362,7 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
       if (!shouldUpdatePersistentView) {
         shouldUpdatePersistentView = true;
         Map<InternalDistributedMember, PersistentMemberID> onlineMembers =
-            advisor.adviseInitializedPersistentMembers();
+            cacheDistributionAdvisor.adviseInitializedPersistentMembers();
         for (Map.Entry<InternalDistributedMember, PersistentMemberID> entry : 
onlineMembers
             .entrySet()) {
           memberOnline(entry.getKey(), entry.getValue());
@@ -395,38 +371,37 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
     }
   }
 
+  @Override
   public void setInitializing(PersistentMemberID newId) {
-
     beginUpdatingPersistentView();
 
-    DistributionManager dm = advisor.getDistributionManager();
+    DistributionManager dm = cacheDistributionAdvisor.getDistributionManager();
 
     PersistentMemberID oldId = getPersistentID();
     PersistentMemberID initializingId = getInitializingID();
 
-    Set profileUpdateRecipients = advisor.adviseProfileUpdate();
-    if (newId == null || (!newId.equals(oldId) && 
!newId.equals(initializingId))) {
+    Set<InternalDistributedMember> profileUpdateRecipients =
+        cacheDistributionAdvisor.adviseProfileUpdate();
+    if (newId == null || !newId.equals(oldId) && 
!newId.equals(initializingId)) {
       // If we have not yet prepared the old id, prepare it now.
 
 
-      // This will only be the case if we crashed
-      // while initializing previously. In the case, we are essentially
-      // finishing what we started by preparing that ID first. This
-      // will remove that ID from the peers.
+      // This will only be the case if we crashed while initializing 
previously. In the case, we are
+      // essentially finishing what we started by preparing that ID first. 
This will remove that ID
+      // from the peers.
       if (initializingId != null) {
         if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
           logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
               "{}-{}: We still have an initializing id: {}. Telling peers to 
remove the old id {} and transitioning this initializing id to old id. 
recipients {}",
               shortDiskStoreId(), regionPath, initializingId, oldId, 
profileUpdateRecipients);
         }
-        // TODO prpersist - clean this up
-        long viewVersion = advisor.startOperation();
+        long viewVersion = cacheDistributionAdvisor.startOperation();
         try {
           PrepareNewPersistentMemberMessage.send(profileUpdateRecipients, dm, 
regionPath, oldId,
               initializingId);
         } finally {
           if (viewVersion != -1) {
-            advisor.endOperation(viewVersion);
+            cacheDistributionAdvisor.endOperation(viewVersion);
           }
         }
         oldId = initializingId;
@@ -435,10 +410,10 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
       if (logger.isDebugEnabled()) {
         logger.debug("Persisting my new persistent ID {}", newId);
       }
-      storage.setInitializing(newId);
+      persistentMemberView.setInitializing(newId);
     }
 
-    profileUpdateRecipients = advisor.adviseProfileUpdate();
+    profileUpdateRecipients = cacheDistributionAdvisor.adviseProfileUpdate();
     if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
       logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
           "{}-{}: Sending the new ID to peers. They should remove the old id 
{}. Recipients: {}",
@@ -449,43 +424,94 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
     }
   }
 
+  @Override
   public PersistentMemberID generatePersistentID() {
-    return storage.generatePersistentID();
+    return persistentMemberView.generatePersistentID();
   }
 
+  @Override
   public PersistentMembershipView getMembershipView() {
     if (!initialized) {
       return null;
     }
     Set<PersistentMemberID> offlineMembers = getPersistedMembers();
     Map<InternalDistributedMember, PersistentMemberID> onlineMembers =
-        advisor.adviseInitializedPersistentMembers();
+        cacheDistributionAdvisor.adviseInitializedPersistentMembers();
     offlineMembers.removeAll(onlineMembers.values());
 
     PersistentMemberID myId = getPersistentID();
     if (myId != null) {
-      
onlineMembers.put(advisor.getDistributionManager().getDistributionManagerId(), 
myId);
+      onlineMembers
+          
.put(cacheDistributionAdvisor.getDistributionManager().getDistributionManagerId(),
 myId);
     }
 
-    PersistentMembershipView view = new 
PersistentMembershipView(offlineMembers, onlineMembers,
-        memberManager.getRevokedMembers());
-    return view;
+    return new PersistentMembershipView(offlineMembers, onlineMembers,
+        persistentMemberManager.getRevokedMembers());
   }
 
+  @Override
   public Set<PersistentMemberID> getPersistedMembers() {
-    Set<PersistentMemberID> offlineMembers = storage.getOfflineMembers();
-    Set<PersistentMemberID> equalMembers = storage.getOfflineAndEqualMembers();
-    Set<PersistentMemberID> onlineMembers = storage.getOnlineMembers();
-    Set<PersistentMemberID> persistentMembers = new 
HashSet<PersistentMemberID>();
-    persistentMembers.addAll(offlineMembers);
-    persistentMembers.addAll(equalMembers);
-    persistentMembers.addAll(onlineMembers);
+    Set<PersistentMemberID> persistentMembers = new HashSet<>();
+    persistentMembers.addAll(persistentMemberView.getOfflineMembers());
+    persistentMembers.addAll(persistentMemberView.getOfflineAndEqualMembers());
+    persistentMembers.addAll(persistentMemberView.getOnlineMembers());
     return persistentMembers;
   }
 
+  @Override
+  public boolean checkMyStateOnMembers(Set<InternalDistributedMember> 
replicates)
+      throws ReplyException {
+    PersistentStateQueryResults remoteStates = getMyStateOnMembers(replicates);
+
+    persistenceAdvisorObserver.observe(regionPath);
+
+    boolean equal = false;
+    for (Map.Entry<InternalDistributedMember, PersistentMemberState> entry : 
remoteStates.stateOnPeers
+        .entrySet()) {
+      InternalDistributedMember member = entry.getKey();
+      PersistentMemberID remoteId = remoteStates.persistentIds.get(member);
+
+      final PersistentMemberID myId = getPersistentID();
+      PersistentMemberState stateOnPeer = entry.getValue();
+
+      if (PersistentMemberState.REVOKED.equals(stateOnPeer)) {
+        throw new RevokedPersistentDataException(
+            
LocalizedStrings.PersistentMemberManager_Member_0_is_already_revoked
+                .toLocalizedString(myId));
+      }
+
+      if (myId != null && stateOnPeer == null) {
+        String message = 
LocalizedStrings.CreatePersistentRegionProcessor_SPLIT_DISTRIBUTED_SYSTEM
+            .toLocalizedString(regionPath, member, remoteId, myId);
+        throw new ConflictingPersistentDataException(message);
+      }
+
+      if (myId != null && stateOnPeer == PersistentMemberState.EQUAL) {
+        equal = true;
+      }
+
+      // The other member changes its ID when it comes back online.
+      if (remoteId != null) {
+        PersistentMemberState remoteState = 
getPersistedStateOfMember(remoteId);
+        if (remoteState == PersistentMemberState.OFFLINE) {
+          String message =
+              
LocalizedStrings.CreatePersistentRegionProcessor_INITIALIZING_FROM_OLD_DATA
+                  .toLocalizedString(regionPath, member, remoteId, myId);
+          throw new ConflictingPersistentDataException(message);
+        }
+      }
+    }
+    return equal;
+  }
+
+  public static void setPersistenceAdvisorObserver(PersistenceAdvisorObserver 
o) {
+    persistenceAdvisorObserver = o == null ? 
DEFAULT_PERSISTENCE_ADVISOR_OBSERVER : o;
+  }
+
+  @Override
   public PersistentMemberID getPersistentIDIfOnline() {
     if (online) {
-      return storage.getMyPersistentID();
+      return persistentMemberView.getMyPersistentID();
     } else {
       return null;
     }
@@ -499,23 +525,22 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
           distributedMember, persistentID);
     }
     synchronized (lock) {
-      boolean foundMember = false;
-      foundMember |= recoveredMembers.remove(persistentID);
+      boolean foundMember = recoveredMembers.remove(persistentID);
       foundMember |= equalMembers.remove(persistentID);
       foundMember |= getPersistedMembers().contains(persistentID);
-      // Don't persist members as offline until we are online. Otherwise, we 
may
-      // think we have later data than them during recovery.
+      // Don't persist members as offline until we are online. Otherwise, we 
may think we have later
+      // data than them during recovery.
       if (shouldUpdatePersistentView && online) {
         try {
           // Don't persistent members as offline if we have already persisted 
them as equal.
-          if (storage.getOfflineAndEqualMembers().contains(persistentID)) {
+          if 
(persistentMemberView.getOfflineAndEqualMembers().contains(persistentID)) {
             return;
           }
-          // Don't mark the member as offline if we have never seen it. If we 
haven't seen it
-          // that means it's not done initializing yet.
+          // Don't mark the member as offline if we have never seen it. If we 
haven't seen it that
+          // means it's not done initializing yet.
           if (foundMember) {
             if 
(PersistenceObserverHolder.getInstance().memberOffline(regionPath, 
persistentID)) {
-              storage.memberOffline(persistentID);
+              persistentMemberView.memberOffline(persistentID);
             }
             
PersistenceObserverHolder.getInstance().afterPersistedOffline(regionPath, 
persistentID);
           }
@@ -541,7 +566,7 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
         recoveredMembers.remove(persistentID);
         try {
           if (PersistenceObserverHolder.getInstance().memberOnline(regionPath, 
persistentID)) {
-            storage.memberOnline(persistentID);
+            persistentMemberView.memberOnline(persistentID);
           }
           
PersistenceObserverHolder.getInstance().afterPersistedOnline(regionPath, 
persistentID);
         } catch (DiskAccessException e) {
@@ -561,22 +586,21 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
   }
 
   private void memberRevoked(PersistentMemberPattern pattern) {
-    // Persist the revoked member, so if we recover later we will
-    // remember that they were revoked.
-    storage.memberRevoked(pattern);
+    // Persist the revoked member, so if we recover later we will remember 
that they were revoked.
+    persistentMemberView.memberRevoked(pattern);
 
     // Remove the revoked member from our view.
-    for (PersistentMemberID id : storage.getOfflineMembers()) {
+    for (PersistentMemberID id : persistentMemberView.getOfflineMembers()) {
       if (pattern.matches(id)) {
         memberRemoved(id, true);
       }
     }
-    for (PersistentMemberID id : storage.getOnlineMembers()) {
+    for (PersistentMemberID id : persistentMemberView.getOnlineMembers()) {
       if (pattern.matches(id)) {
         memberRemoved(id, true);
       }
     }
-    for (PersistentMemberID id : storage.getOfflineAndEqualMembers()) {
+    for (PersistentMemberID id : 
persistentMemberView.getOfflineAndEqualMembers()) {
       if (pattern.matches(id)) {
         memberRemoved(id, true);
       }
@@ -597,14 +621,13 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
       }
       try {
         if (PersistenceObserverHolder.getInstance().memberRemoved(regionPath, 
id)) {
-          storage.memberRemoved(id);
+          persistentMemberView.memberRemoved(id);
         }
 
-        // Purge any IDs that are old versions of the the id that
-        // we just removed
+        // Purge any IDs that are old versions of the the id that we just 
removed
         for (PersistentMemberID persistedId : getPersistedMembers()) {
           if (persistedId.isOlderOrEqualVersionOf(id)) {
-            storage.memberRemoved(persistedId);
+            persistentMemberView.memberRemoved(persistedId);
           }
         }
         
PersistenceObserverHolder.getInstance().afterRemovePersisted(regionPath, id);
@@ -616,61 +639,63 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
     }
   }
 
+  @Override
   public PersistentMemberID getPersistentID() {
-    return storage.getMyPersistentID();
+    return persistentMemberView.getMyPersistentID();
   }
 
+  @Override
   public PersistentMemberID getInitializingID() {
-    return storage.getMyInitializingID();
+    return persistentMemberView.getMyInitializingID();
   }
 
+  @Override
   public void addListener(PersistentStateListener listener) {
     synchronized (this) {
-      HashSet<PersistentStateListener> tmpListeners =
-          new HashSet<PersistentStateListener>(listeners);
+      Set<PersistentStateListener> tmpListeners = new 
HashSet<>(persistentStateListeners);
       tmpListeners.add(listener);
-      listeners = Collections.unmodifiableSet(tmpListeners);
+      persistentStateListeners = Collections.unmodifiableSet(tmpListeners);
     }
 
   }
 
+  @Override
   public void removeListener(PersistentStateListener listener) {
     synchronized (this) {
-      HashSet<PersistentStateListener> tmpListeners =
-          new HashSet<PersistentStateListener>(listeners);
+      Set<PersistentStateListener> tmpListeners = new 
HashSet<>(persistentStateListeners);
       tmpListeners.remove(listener);
-      listeners = Collections.unmodifiableSet(tmpListeners);
+      persistentStateListeners = Collections.unmodifiableSet(tmpListeners);
     }
   }
 
   private void notifyListenersMemberOnline(InternalDistributedMember member,
       PersistentMemberID persistentID) {
-    for (PersistentStateListener listener : listeners) {
+    for (PersistentStateListener listener : persistentStateListeners) {
       listener.memberOnline(member, persistentID);
     }
   }
 
   private void notifyListenersMemberOffline(InternalDistributedMember member,
       PersistentMemberID persistentID) {
-    for (PersistentStateListener listener : listeners) {
+    for (PersistentStateListener listener : persistentStateListeners) {
       listener.memberOffline(member, persistentID);
     }
   }
 
   private void notifyListenersMemberRemoved(PersistentMemberID persistentID, 
boolean revoked) {
-    for (PersistentStateListener listener : listeners) {
+    for (PersistentStateListener listener : persistentStateListeners) {
       listener.memberRemoved(persistentID, revoked);
     }
-
   }
 
+  @Override
   public HashSet<PersistentMemberID> getPersistedOnlineOrEqualMembers() {
-    HashSet<PersistentMemberID> members =
-        new HashSet<PersistentMemberID>(storage.getOnlineMembers());
+    HashSet<PersistentMemberID> members = new 
HashSet<>(persistentMemberView.getOnlineMembers());
     members.addAll(equalMembers);
     return members;
   }
 
+  @Override
   public void prepareNewMember(InternalDistributedMember sender, 
PersistentMemberID oldId,
       PersistentMemberID newId) {
     if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
@@ -679,9 +704,9 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
           newId, oldId);
     }
     synchronized (lock) {
-      // Don't prepare the ID if the advisor doesn't have a profile. This 
prevents
-      // A race with the advisor remove
-      if (!advisor.containsId(sender)) {
+      // Don't prepare the ID if the advisor doesn't have a profile. This 
prevents a race with the
+      // advisor remove
+      if (!cacheDistributionAdvisor.containsId(sender)) {
         if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
           logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
               "{}-{}: Refusing to prepare id because {} is not in our 
advisor", shortDiskStoreId(),
@@ -689,13 +714,11 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
         }
         return;
       }
-      // Persist new members even if we are not online yet
-      // Two members can become online at once. This way,
-      // they will know about each other.
-      storage.memberOnline(newId);
+      // Persist new members even if we are not online yet. Two members can 
become online at once.
+      // This way, they will know about each other.
+      persistentMemberView.memberOnline(newId);
 
-      // The oldId and newId could be the same if the member
-      // is retrying a GII. See bug #42051
+      // The oldId and newId could be the same if the member is retrying a 
GII. See bug #42051
       if (oldId != null && !oldId.equals(newId)) {
         if (initialized) {
           memberRemoved(oldId, false);
@@ -709,79 +732,46 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
     return diskStoreID == null ? "mem" : diskStoreID.abbrev();
   }
 
+  @Override
   public void removeMember(PersistentMemberID id) {
     memberRemoved(id, false);
   }
 
+  @Override
   public void markMemberOffline(InternalDistributedMember member, 
PersistentMemberID id) {
     memberOffline(member, id);
   }
 
+  @Override
+  public CacheDistributionAdvisor getCacheDistributionAdvisor() {
+    return cacheDistributionAdvisor;
+  }
+
+  @Override
   public void setWaitingOnMembers(Set<PersistentMemberID> allMembersToWaitFor,
       Set<PersistentMemberID> offlineMembersToWaitFor) {
-    this.allMembersWaitingFor = allMembersToWaitFor;
-    this.offlineMembersWaitingFor = offlineMembersToWaitFor;
+    allMembersWaitingFor = allMembersToWaitFor;
+    offlineMembersWaitingFor = offlineMembersToWaitFor;
   }
 
-  public boolean checkMyStateOnMembers(Set<InternalDistributedMember> 
replicates)
-      throws ReplyException {
-    PersistentStateQueryResults remoteStates = getMyStateOnMembers(replicates);
-    boolean equal = false;
-
-    if (observer != null) {
-      observer.observe(regionPath);
-    }
-
-    for (Map.Entry<InternalDistributedMember, PersistentMemberState> entry : 
remoteStates.stateOnPeers
-        .entrySet()) {
-      InternalDistributedMember member = entry.getKey();
-      PersistentMemberID remoteId = remoteStates.persistentIds.get(member);
-
-      final PersistentMemberID myId = getPersistentID();
-      PersistentMemberState stateOnPeer = entry.getValue();
-
-      if (PersistentMemberState.REVOKED.equals(stateOnPeer)) {
-        throw new RevokedPersistentDataException(
-            
LocalizedStrings.PersistentMemberManager_Member_0_is_already_revoked
-                .toLocalizedString(myId));
-      }
-
-      if (myId != null && stateOnPeer == null) {
-        String message = 
LocalizedStrings.CreatePersistentRegionProcessor_SPLIT_DISTRIBUTED_SYSTEM
-            .toLocalizedString(regionPath, member, remoteId, myId);
-        throw new ConflictingPersistentDataException(message);
-      }
-
-      if (myId != null && stateOnPeer == PersistentMemberState.EQUAL) {
-        equal = true;
-      }
-
-      // TODO prpersist - This check might not help much. The other member 
changes it's ID when it
-      // comes back online.
-      if (remoteId != null) {
-        PersistentMemberState remoteState = 
getPersistedStateOfMember(remoteId);
-        if (remoteState == PersistentMemberState.OFFLINE) {
-          String message =
-              
LocalizedStrings.CreatePersistentRegionProcessor_INITIALIZING_FROM_OLD_DATA
-                  .toLocalizedString(regionPath, member, remoteId, myId);
-          throw new ConflictingPersistentDataException(message);
-        }
-      }
-    }
-    return equal;
+  @Override
+  public boolean isClosed() {
+    return isClosed;
   }
 
+
   public void finishPendingDestroy() {
     // send a message to peers indicating that they should remove this profile
-    long viewVersion = advisor.startOperation();
+    long viewVersion = cacheDistributionAdvisor.startOperation();
     try {
-      RemovePersistentMemberMessage.send(advisor.adviseProfileUpdate(),
-          advisor.getDistributionManager(), regionPath, getPersistentID(), 
getInitializingID());
+      
RemovePersistentMemberMessage.send(cacheDistributionAdvisor.adviseProfileUpdate(),
+          cacheDistributionAdvisor.getDistributionManager(), regionPath, 
getPersistentID(),
+          getInitializingID());
 
-      storage.finishPendingDestroy();
+      persistentMemberView.finishPendingDestroy();
     } finally {
       if (viewVersion != -1) {
-        advisor.endOperation(viewVersion);
+        cacheDistributionAdvisor.endOperation(viewVersion);
       }
     }
     synchronized (lock) {
@@ -798,155 +788,12 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
    * @throws ConflictingPersistentDataException if there are active members 
which are not based on
    *         the state that is persisted in this member.
    */
-  public CacheDistributionAdvisor.InitialImageAdvice getInitialImageAdvice(
-      CacheDistributionAdvisor.InitialImageAdvice previousAdvice, boolean 
recoverFromDisk) {
-    final boolean isPersistAdvisorDebugEnabled =
-        logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE);
-
-    MembershipChangeListener listener = new MembershipChangeListener();
-    advisor.addMembershipAndProxyListener(listener);
-    addListener(listener);
-    try {
-      while (true) {
-        Set<PersistentMemberID> previouslyOnlineMembers = 
getPersistedOnlineOrEqualMembers();
-
-        advisor.getAdvisee().getCancelCriterion().checkCancelInProgress(null);
-        try {
-          InitialImageAdvice advice = 
advisor.adviseInitialImage(previousAdvice, true);
-
-          if (!advice.getReplicates().isEmpty()) {
-            if (isPersistAdvisorDebugEnabled) {
-              logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
-                  "{}-{}: There are members currently online. Checking for our 
state on those members and then initializing",
-                  shortDiskStoreId(), regionPath);
-            }
-            // We will go ahead and take the other members contents if we 
ourselves didn't recover
-            // from disk.
-            if (recoverFromDisk) {
-              // Check with these members to make sure that they
-              // have heard of us
-              // If any of them say we have the same data on disk, we don't 
need to do a GII
-              if (checkMyStateOnMembers(advice.getReplicates())) {
-                if (isPersistAdvisorDebugEnabled) {
-                  logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
-                      "{}-{}: We have the same data on disk as one of {} 
recovering gracefully",
-                      shortDiskStoreId(), regionPath, advice.getReplicates());
-                }
-                advice.getReplicates().clear();
-              } else {
-                // If we have to do a GII, we have not equal members anymore.
-                synchronized (lock) {
-                  equalMembers.clear();
-                }
-              }
-            }
-            return advice;
-          } else if (!advice.getNonPersistent().isEmpty()) {
-            // We support a persistent member getting a membership view
-            // from a non persistent member and using that information to wait
-            // for the other known persistent members. See
-            // 
PersistentRecoveryOrderDUnitTest.testTransmitCrashedMembersWithNonPeristentRegion
-            updateViewFromNonPersistent(recoverFromDisk, advice);
-            previouslyOnlineMembers = getPersistedOnlineOrEqualMembers();
-          }
-
-          // Fix for 51698 - If there are online members that we previously
-          // failed to get a GII from, retry those members rather than wait
-          // for new persistent members to recover.
-          if (previousAdvice != null && 
!previousAdvice.getReplicates().isEmpty()) {
-            logger.info(
-                
LocalizedMessage.create(LocalizedStrings.PersistenceAdvisorImpl_RETRYING_GII));
-            previousAdvice = null;
-            continue;
-          }
-
-          // If there are no currently online members, and no
-          // previously online members, this member should just go with what's
-          // on it's own disk
-          if (previouslyOnlineMembers.isEmpty()) {
-            if (isPersistAdvisorDebugEnabled) {
-              logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
-                  "{}-{}: No previously online members. Recovering with the 
data from the local disk",
-                  shortDiskStoreId(), regionPath);
-            }
-            return advice;
-          }
-
-
-          Set<PersistentMemberID> offlineMembers = new 
HashSet<PersistentMemberID>();
-          Set<PersistentMemberID> membersToWaitFor =
-              getMembersToWaitFor(previouslyOnlineMembers, offlineMembers);
-
-          if (membersToWaitFor.isEmpty()) {
-            if (isPersistAdvisorDebugEnabled) {
-              logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
-                  "{}-{}: All of the previously online members are now online 
and waiting for us. Acquiring tie lock. Previously online members {}",
-                  shortDiskStoreId(), regionPath, advice.getReplicates());
-            }
-            // We're tied for the latest copy of the data. try to get the 
distributed lock.
-            if (acquireTieLock()) {
-              advice = advisor.adviseInitialImage(previousAdvice, true);
-              if (isPersistAdvisorDebugEnabled) {
-                logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
-                    "{}-{}: Acquired the lock. This member will initialize", 
shortDiskStoreId(),
-                    regionPath);
-              }
-              if (!advice.getReplicates().isEmpty()) {
-                if (isPersistAdvisorDebugEnabled) {
-                  logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
-                      "{}-{}: Another member has initialized while we were 
getting the lock. We will initialize from that member",
-                      shortDiskStoreId(), regionPath);
-                }
-                checkMyStateOnMembers(advice.getReplicates());
-              }
-              return advice;
-            } else {
-              if (isPersistAdvisorDebugEnabled) {
-                logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
-                    "{}-{}: Failed to acquire the lock.", shortDiskStoreId(), 
regionPath);
-              }
-            }
-          } else {
-            if (isPersistAdvisorDebugEnabled) {
-              logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
-                  "{}-{}: Going to wait for these member ids: {}", 
shortDiskStoreId(), regionPath,
-                  membersToWaitFor);
-            }
-          }
-
-          beginWaitingForMembershipChange(membersToWaitFor);
-          try {
-            // The persistence advisor needs to know which members are really 
not available
-            // because the user uses this information to decide which members 
they
-            // haven't started yet. membersToWaitFor includes members that
-            // are still waiting to start up, but are waiting for members 
other than
-            // the current member. So we pass the set of offline members here
-            listener.waitForChange(membersToWaitFor, offlineMembers);
-          } finally {
-            endWaitingForMembershipChange();
-          }
-        } catch (InterruptedException e) {
-          logger.debug("Interrupted while trying to determine latest persisted 
copy: {}",
-              e.getMessage(), e);
-        }
-      }
-    } finally {
-      advisor.removeMembershipAndProxyListener(listener);
-      removeListener(listener);
-    }
-  }
-
-  public void updateViewFromNonPersistent(boolean recoverFromDisk, 
InitialImageAdvice advice) {
-    for (InternalDistributedMember replicate : advice.getNonPersistent()) {
-      try {
-        updateMembershipView(replicate, recoverFromDisk);
-        return;
-      } catch (ReplyException e) {
-        if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
-          logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "Failed to update 
membership view", e);
-        }
-      }
-    }
+  @Override
+  public InitialImageAdvice getInitialImageAdvice(InitialImageAdvice 
previousAdvice,
+      boolean hasDiskImageToRecoverFrom) {
+    PersistenceInitialImageAdvisor piia = new 
PersistenceInitialImageAdvisor(this,
+        shortDiskStoreId(), regionPath, cacheDistributionAdvisor, 
hasDiskImageToRecoverFrom);
+    return piia.getAdvice(previousAdvice);
   }
 
   /**
@@ -956,33 +803,33 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
    *        need to wait for - this member may end up waiting on member that 
is actually running.
    * @return the list of members that this member needs to wait for before it 
can initialize.
    */
+  @Override
   public Set<PersistentMemberID> getMembersToWaitFor(
       Set<PersistentMemberID> previouslyOnlineMembers, Set<PersistentMemberID> 
offlineMembers)
-      throws ReplyException, InterruptedException {
+      throws ReplyException {
     PersistentMemberID myPersistentID = getPersistentID();
     PersistentMemberID myInitializingId = getInitializingID();
 
     // This is the set of members that are currently waiting for this member
     // to come online.
-    Set<PersistentMemberID> membersToWaitFor =
-        new HashSet<PersistentMemberID>(previouslyOnlineMembers);
+    Set<PersistentMemberID> membersToWaitFor = new 
HashSet<>(previouslyOnlineMembers);
     offlineMembers.addAll(previouslyOnlineMembers);
 
     // If our persistent ID is null, we need to wait for all of the previously 
online members.
     if (myPersistentID != null || myInitializingId != null) {
-      Set<InternalDistributedMember> members = advisor.adviseProfileUpdate();
-      Set<InternalDistributedMember> membersHostingThisRegion = 
advisor.adviseGeneric();
+      Set<InternalDistributedMember> members = 
cacheDistributionAdvisor.adviseProfileUpdate();
+      Set<InternalDistributedMember> membersHostingThisRegion =
+          cacheDistributionAdvisor.adviseGeneric();
 
       // Fetch the persistent view from all of our peers.
       PersistentStateQueryResults results = 
PersistentStateQueryMessage.send(members,
-          advisor.getDistributionManager(), regionPath, myPersistentID, 
myInitializingId);
-
-      // iterate through all of the peers. For each peer:
-      // if the guy was previously online according to us, grab it's online
-      // members and add them to the members to wait for set.
-      // We may need to do this several times until we discover all of the
-      // members that may have newer data than
-      // us,
+          cacheDistributionAdvisor.getDistributionManager(), regionPath, 
myPersistentID,
+          myInitializingId);
+
+      // iterate through all of the peers. For each peer: if the guy was 
previously online according
+      // to us, grab its online members and add them to the members to wait 
for set. We may need to
+      // do this several times until we discover all of the members that may 
have newer data than
+      // us.
       boolean addedMembers = true;
       while (addedMembers) {
         addedMembers = false;
@@ -997,11 +844,11 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
             for (PersistentMemberID peerOnlineMember : peersOnlineMembers) {
               if (!isRevoked(peerOnlineMember)
                   && !peerOnlineMember.diskStoreId.equals(getDiskStoreID())
-                  && !storage.getOfflineMembers().contains(peerOnlineMember)) {
+                  && 
!persistentMemberView.getOfflineMembers().contains(peerOnlineMember)) {
                 if (membersToWaitFor.add(peerOnlineMember)) {
                   addedMembers = true;
                   // Make sure we also persist that this member is online.
-                  storage.memberOnline(peerOnlineMember);
+                  persistentMemberView.memberOnline(peerOnlineMember);
                   if 
(logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
                     logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
                         "{}-{}: Adding {} to the list of members we're wait 
for, because {} has newer or equal data than is and is waiting for that member",
@@ -1034,8 +881,7 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
                   .toLocalizedString(myPersistentID));
         }
 
-        // If the peer thinks we are newer or equal to them, we don't
-        // need to wait for this peer.
+        // If the peer thinks we are newer or equal to them, we don't need to 
wait for this peer.
         if (membersHostingThisRegion.contains(memberId) && persistentID != 
null && state != null
             && myInitializingId == null && 
(state.equals(PersistentMemberState.ONLINE)
                 || state.equals(PersistentMemberState.EQUAL))) {
@@ -1052,8 +898,7 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
           removeNewerPersistentID(offlineMembers, persistentID);
         }
 
-        // If the peer thinks we are newer or equal to them, we don't
-        // need to wait for this peer.
+        // If the peer thinks we are newer or equal to them, we don't need to 
wait for this peer.
         if (membersHostingThisRegion.contains(memberId) && initializingID != 
null && state != null
             && (state.equals(PersistentMemberState.ONLINE)
                 || state.equals(PersistentMemberState.EQUAL))) {
@@ -1070,9 +915,8 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
           removeNewerPersistentID(offlineMembers, initializingID);
         }
 
-        // If we were able to determine what disk store this member
-        // is in, and it doesn't have a persistent ID, but we think
-        // we should be waiting for it, stop waiting for it.
+        // If we were able to determine what disk store this member is in, and 
it doesn't have a
+        // persistent ID, but we think we should be waiting for it, stop 
waiting for it.
         if (initializingID == null && persistentID == null & diskStoreID != 
null) {
           removeByDiskStoreID(membersToWaitFor, diskStoreID);
           removeByDiskStoreID(offlineMembers, diskStoreID);
@@ -1123,14 +967,7 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
     }
   }
 
-  protected void beginWaitingForMembershipChange(Set<PersistentMemberID> 
membersToWaitFor) {
-    // do nothing
-  }
-
-  protected void endWaitingForMembershipChange() {
-    // do nothing
-  }
-
+  @Override
   public boolean wasHosting() {
     return getPersistentID() != null || getInitializingID() != null;
   }
@@ -1155,125 +992,125 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
     return allMembersWaitingFor;
   }
 
-  protected void logWaitingForMember(Set<PersistentMemberID> 
allMembersToWaitFor,
-      Set<PersistentMemberID> offlineMembersToWaitFor) {
-    Set<String> membersToWaitForLogEntries = new HashSet<String>();
+  @Override
+  public void logWaitingForMembers() {
+    Set<String> membersToWaitForLogEntries = new HashSet<>();
 
-    if (offlineMembersToWaitFor != null && !offlineMembersToWaitFor.isEmpty()) 
{
-      TransformUtils.transform(offlineMembersToWaitFor, 
membersToWaitForLogEntries,
+    if (offlineMembersWaitingFor != null && 
!offlineMembersWaitingFor.isEmpty()) {
+      TransformUtils.transform(offlineMembersWaitingFor, 
membersToWaitForLogEntries,
           TransformUtils.persistentMemberIdToLogEntryTransformer);
 
       StartupStatus.startup(
-          
LocalizedStrings.CreatePersistentRegionProcessor_WAITING_FOR_LATEST_MEMBER,
-          new Object[] {regionPath,
-              
TransformUtils.persistentMemberIdToLogEntryTransformer.transform(getPersistentID()),
-              membersToWaitForLogEntries});
+          
LocalizedStrings.CreatePersistentRegionProcessor_WAITING_FOR_LATEST_MEMBER, 
regionPath,
+          
TransformUtils.persistentMemberIdToLogEntryTransformer.transform(getPersistentID()),
+          membersToWaitForLogEntries);
     } else {
-      TransformUtils.transform(allMembersToWaitFor, membersToWaitForLogEntries,
+      TransformUtils.transform(allMembersWaitingFor, 
membersToWaitForLogEntries,
           TransformUtils.persistentMemberIdToLogEntryTransformer);
 
       StartupStatus.startup(
           
LocalizedStrings.CreatePersistentRegionProcessor_WAITING_FOR_ONLINE_LATEST_MEMBER,
-          new Object[] {regionPath,
-              
TransformUtils.persistentMemberIdToLogEntryTransformer.transform(getPersistentID()),
-              membersToWaitForLogEntries});
+          regionPath,
+          
TransformUtils.persistentMemberIdToLogEntryTransformer.transform(getPersistentID()),
+          membersToWaitForLogEntries);
     }
   }
 
-  protected void checkInterruptedByShutdownAll() {}
-
-  protected class MembershipChangeListener implements MembershipListener, 
PersistentStateListener {
-
-    private boolean warned = false;
-    private final long warningTime;
-
-    public MembershipChangeListener() {
-      long waitThreshold = 
advisor.getDistributionManager().getConfig().getAckWaitThreshold();
-      warningTime = System.nanoTime() + 
TimeUnit.SECONDS.toNanos(waitThreshold);
+  @Override
+  public void clearEqualMembers() {
+    synchronized (lock) {
+      equalMembers.clear();
     }
+  }
 
-    private boolean membershipChanged = false;
-
-    public void waitForChange(Set<PersistentMemberID> allMembersToWaitFor,
-        Set<PersistentMemberID> offlineMembersToWaitFor) throws 
InterruptedException {
-      synchronized (this) {
-        try {
-          setWaitingOnMembers(allMembersToWaitFor, offlineMembersToWaitFor);
-          long exitTime = System.nanoTime() + 
TimeUnit.SECONDS.toNanos(PERSISTENT_VIEW_RETRY);
-          while (!membershipChanged && !isClosed) {
-            checkInterruptedByShutdownAll();
-            
advisor.getAdvisee().getCancelCriterion().checkCancelInProgress(null);
-            this.wait(100);
-            long time = System.nanoTime();
-
-            // Fix for #50415 go out and message other members to see if there
-            // status has changed. This handles any case where we might have
-            // missed a notification due to concurrent startup.
-            if (time > exitTime) {
-              break;
-            }
-
-            if (!warned && time > warningTime) {
-
-              logWaitingForMember(allMembersToWaitFor, 
offlineMembersToWaitFor);
+  @Override
+  public void checkInterruptedByShutdownAll() {}
 
-              warned = true;
-            }
-          }
-          this.membershipChanged = false;
-        } finally {
-          setWaitingOnMembers(null, null);
-        }
-      }
-    }
+  @Override
+  public void close() {
+    isClosed = true;
+    persistentMemberManager.removeRevocationListener(profileChangeListener);
+    
cacheDistributionAdvisor.removeProfileChangeListener(profileChangeListener);
+    releaseTieLock();
+  }
 
-    public void memberJoined(DistributionManager distributionManager,
-        InternalDistributedMember id) {
-      afterMembershipChange();
-    }
 
-    private void afterMembershipChange() {
-      synchronized (this) {
-        this.membershipChanged = true;
-        this.notifyAll();
+  /**
+   * Try to acquire the distributed lock which members must grab for in the 
case of a tie. Whoever
+   * gets the lock initializes first.
+   */
+  @Override
+  public boolean acquireTieLock() {
+    // We're tied for the latest copy of the data. try to get the distributed 
lock.
+    holdingTieLock = distributedLockService.lock("PERSISTENCE_" + regionPath, 
0, -1);
+    if (!holdingTieLock) {
+      if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
+        logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "{}-{}: Failed to 
acquire the lock.",
+            shortDiskStoreId(), regionPath);
       }
     }
+    return holdingTieLock;
+  }
 
-    public void memberDeparted(DistributionManager distributionManager,
-        InternalDistributedMember id, boolean crashed) {
-      afterMembershipChange();
+  @Override
+  public void releaseTieLock() {
+    if (holdingTieLock) {
+      distributedLockService.unlock("PERSISTENCE_" + regionPath);
+      holdingTieLock = false;
     }
+  }
 
-    public void memberSuspect(DistributionManager distributionManager, 
InternalDistributedMember id,
-        InternalDistributedMember whoSuspected, String reason) {}
+  private boolean wasAboutToDestroy() {
+    return persistentMemberView.wasAboutToDestroy()
+        || persistentMemberView.wasAboutToDestroyDataStorage();
+  }
 
-    @Override
-    public void quorumLost(DistributionManager distributionManager,
-        Set<InternalDistributedMember> failures, 
List<InternalDistributedMember> remaining) {}
+  protected synchronized void resetState() {
+    online = false;
+    removedMembers.clear();
+  }
 
-    public void memberOffline(InternalDistributedMember member, 
PersistentMemberID persistentID) {
-      afterMembershipChange();
+  public void flushMembershipChanges() {
+    try {
+      cacheDistributionAdvisor.waitForCurrentOperations();
+    } catch (RegionDestroyedException ignored) {
     }
+  }
 
-    public void memberOnline(InternalDistributedMember member, 
PersistentMemberID persistentID) {
-      afterMembershipChange();
+  @Override
+  public void persistMembersOfflineAndEqual(
+      Map<InternalDistributedMember, PersistentMemberID> map) {
+    for (PersistentMemberID persistentID : map.values()) {
+      persistentMemberView.memberOfflineAndEqual(persistentID);
     }
+  }
 
-    public void memberRemoved(PersistentMemberID id, boolean revoked) {
-      afterMembershipChange();
-    }
+  @Override
+  public DiskStoreID getDiskStoreID() {
+    return persistentMemberView.getDiskStoreID();
+  }
+
+  @Override
+  public boolean isOnline() {
+    return online;
+  }
+
+  public interface PersistenceAdvisorObserver {
+    void observe(String regionPath);
   }
 
   private class ProfileChangeListener implements ProfileListener, 
MemberRevocationListener {
 
+    @Override
     public void profileCreated(Profile profile) {
       profileUpdated(profile);
     }
 
-    public void profileRemoved(Profile profile, boolean regionDestroyed) {
+    @Override
+    public void profileRemoved(Profile profile, boolean destroyed) {
       CacheProfile cp = (CacheProfile) profile;
       if (cp.persistentID != null) {
-        if (regionDestroyed) {
+        if (destroyed) {
           memberRemoved(cp.persistentID, false);
         } else {
           memberOffline(profile.getDistributedMember(), cp.persistentID);
@@ -1281,6 +1118,7 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
       }
     }
 
+    @Override
     public void profileUpdated(Profile profile) {
       CacheProfile cp = (CacheProfile) profile;
       if (cp.persistentID != null && cp.persistenceInitialized) {
@@ -1288,14 +1126,17 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
       }
     }
 
+    @Override
     public void revoked(PersistentMemberPattern pattern) {
       memberRevoked(pattern);
     }
 
+    @Override
     public Set<PersistentMemberID> getMissingMemberIds() {
       return getMissingMembers();
     }
 
+    @Override
     public String getRegionPath() {
       return getRegionPathForOfflineMembers();
     }
@@ -1317,52 +1158,4 @@ public class PersistenceAdvisorImpl implements 
PersistenceAdvisor {
       }
     }
   }
-
-  public void close() {
-    isClosed = true;
-    memberManager.removeRevocationListener(listener);
-    advisor.removeProfileChangeListener(listener);
-    releaseTieLock();
-  }
-
-  private boolean wasAboutToDestroy() {
-    return storage.wasAboutToDestroy() || 
storage.wasAboutToDestroyDataStorage();
-  }
-
-  protected synchronized void resetState() {
-    this.online = false;
-    this.removedMembers = new HashSet<PersistentMemberID>();
-  }
-
-  public void flushMembershipChanges() {
-    try {
-      advisor.waitForCurrentOperations();
-    } catch (RegionDestroyedException e) {
-      // continue with the next region
-    }
-
-  }
-
-  public void persistMembersOfflineAndEqual(
-      Map<InternalDistributedMember, PersistentMemberID> map) {
-    for (PersistentMemberID persistentID : map.values()) {
-      storage.memberOfflineAndEqual(persistentID);
-    }
-  }
-
-  public DiskStoreID getDiskStoreID() {
-    return storage.getDiskStoreID();
-  }
-
-  public boolean isOnline() {
-    return online;
-  }
-
-  public interface PersistenceAdvisorObserver {
-    default void observe(String regionPath) {}
-  }
-
-  public static void setPersistenceAdvisorObserver(PersistenceAdvisorObserver 
o) {
-    observer = o;
-  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceInitialImageAdvisor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceInitialImageAdvisor.java
new file mode 100644
index 0000000..71e969a
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceInitialImageAdvisor.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.persistence;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.distributed.internal.ReplyException;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.CacheDistributionAdvisor;
+import 
org.apache.geode.internal.cache.CacheDistributionAdvisor.InitialImageAdvice;
+import org.apache.geode.internal.i18n.LocalizedStrings;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.log4j.LocalizedMessage;
+import org.apache.geode.internal.logging.log4j.LogMarker;
+
+public class PersistenceInitialImageAdvisor {
+
+  private static final Logger logger = LogService.getLogger();
+
+  private final InternalPersistenceAdvisor persistenceAdvisor;
+  private final String shortDiskStoreID;
+  private final String regionPath;
+  private final CacheDistributionAdvisor cacheDistributionAdvisor;
+  private final boolean hasDiskImageToRecoverFrom;
+
+  public PersistenceInitialImageAdvisor(InternalPersistenceAdvisor 
persistenceAdvisor,
+      String shortDiskStoreID, String regionPath, CacheDistributionAdvisor 
cacheDistributionAdvisor,
+      boolean hasDiskImageToRecoverFrom) {
+    this.persistenceAdvisor = persistenceAdvisor;
+    this.shortDiskStoreID = shortDiskStoreID;
+    this.regionPath = regionPath;
+    this.cacheDistributionAdvisor = cacheDistributionAdvisor;
+    this.hasDiskImageToRecoverFrom = hasDiskImageToRecoverFrom;
+  }
+
+  public InitialImageAdvice getAdvice(InitialImageAdvice previousAdvice) {
+    final boolean isPersistAdvisorDebugEnabled =
+        logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE);
+
+    MembershipChangeListener listener = new 
MembershipChangeListener(persistenceAdvisor);
+    cacheDistributionAdvisor.addMembershipAndProxyListener(listener);
+    persistenceAdvisor.addListener(listener);
+    try {
+      while (true) {
+        
cacheDistributionAdvisor.getAdvisee().getCancelCriterion().checkCancelInProgress(null);
+        try {
+          // On first pass, previous advice is null. On subsequent passes, 
it's the advice
+          // from the previous iteration.
+          InitialImageAdvice advice =
+              cacheDistributionAdvisor.adviseInitialImage(previousAdvice, 
true);
+
+          if (hasReplicates(advice)) {
+            if (hasDiskImageToRecoverFrom) {
+              
removeReplicatesIfWeAreEqualToAnyOrElseClearEqualMembers(advice.getReplicates());
+            }
+            return advice;
+          } else if (hasNonPersistentMember(advice)) {
+            updateMembershipViewFromAnyPeer(advice.getNonPersistent(), 
hasDiskImageToRecoverFrom);
+          }
+
+          // Fix for 51698 - If there are online members that we previously 
failed to get a GII
+          // from, retry those members rather than wait for new persistent 
members to recover.
+          if (hasReplicates(previousAdvice)) {
+            logger.info(
+                
LocalizedMessage.create(LocalizedStrings.PersistenceAdvisorImpl_RETRYING_GII));
+            previousAdvice = null;
+            continue;
+          }
+
+          Set<PersistentMemberID> previouslyOnlineMembers =
+              persistenceAdvisor.getPersistedOnlineOrEqualMembers();
+
+          // If there are no currently online members, and no previously 
online members, this member
+          // should just go with what's on its own disk
+          if (previouslyOnlineMembers.isEmpty()) {
+            if (isPersistAdvisorDebugEnabled()) {
+              logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
+                  "{}-{}: No previously online members. Recovering with the 
data from the local disk",
+                  shortDiskStoreID, regionPath);
+            }
+            return advice;
+          }
+
+          Set<PersistentMemberID> offlineMembers = new HashSet<>();
+          Set<PersistentMemberID> membersToWaitFor =
+              persistenceAdvisor.getMembersToWaitFor(previouslyOnlineMembers, 
offlineMembers);
+
+          if (membersToWaitFor.isEmpty()) {
+            if (isPersistAdvisorDebugEnabled()) {
+              logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
+                  "{}-{}: All of the previously online members are now online 
and waiting for us. Acquiring tie lock. Previously online members {}",
+                  shortDiskStoreID, regionPath, advice.getReplicates());
+            }
+            if (persistenceAdvisor.acquireTieLock()) {
+              return 
refreshInitialImageAdviceAndThenCheckMyStateWithReplicates(previousAdvice);
+            }
+          } else {
+            if (isPersistAdvisorDebugEnabled) {
+              logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
+                  "{}-{}: Going to wait for these member ids: {}", 
shortDiskStoreID, regionPath,
+                  membersToWaitFor);
+            }
+          }
+
+          waitForMembershipChangeForMissingDiskStores(listener, 
offlineMembers, membersToWaitFor);
+        } catch (InterruptedException e) {
+          logger.debug("Interrupted while trying to determine latest persisted 
copy", e);
+        }
+      }
+    } finally {
+      persistenceAdvisor.setWaitingOnMembers(null, null);
+      cacheDistributionAdvisor.removeMembershipAndProxyListener(listener);
+      persistenceAdvisor.removeListener(listener);
+    }
+  }
+
+  private void updateMembershipViewFromAnyPeer(Set<InternalDistributedMember> 
peers,
+      boolean recoverFromDisk) {
+    for (InternalDistributedMember peer : peers) {
+      try {
+        persistenceAdvisor.updateMembershipView(peer, recoverFromDisk);
+        return;
+      } catch (ReplyException e) {
+        if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
+          logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "Failed to update 
membership view", e);
+        }
+      }
+    }
+  }
+
+  private InitialImageAdvice 
refreshInitialImageAdviceAndThenCheckMyStateWithReplicates(
+      InitialImageAdvice previousAdvice) {
+    if (isPersistAdvisorDebugEnabled()) {
+      logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
+          "{}-{}: Acquired the lock. This member will initialize", 
shortDiskStoreID, regionPath);
+    }
+    InitialImageAdvice advice = 
cacheDistributionAdvisor.adviseInitialImage(previousAdvice, true);
+    if (hasReplicates(advice)) {
+      if (isPersistAdvisorDebugEnabled()) {
+        logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
+            "{}-{}: Another member has initialized while we were getting the 
lock. We will initialize from that member",
+            shortDiskStoreID, regionPath);
+      }
+      persistenceAdvisor.checkMyStateOnMembers(advice.getReplicates());
+    }
+    return advice;
+  }
+
+  private boolean hasNonPersistentMember(InitialImageAdvice advice) {
+    return !advice.getNonPersistent().isEmpty();
+  }
+
+  /**
+   * if one or more replicates are equal to this member: remove replicates 
from advice, return
+   * advice for GII loop
+   */
+  private void removeReplicatesIfWeAreEqualToAnyOrElseClearEqualMembers(
+      Set<InternalDistributedMember> replicates) {
+    if (isPersistAdvisorDebugEnabled()) {
+      logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
+          "{}-{}: There are members currently online. Checking for our state 
on those members and then initializing",
+          shortDiskStoreID, regionPath);
+    }
+    // Check with these members to make sure that they have heard of us. If 
any of them
+    // say we have the same data on disk, we don't need to do a GII.
+    boolean weAreEqualToAReplicate = 
persistenceAdvisor.checkMyStateOnMembers(replicates);
+    if (weAreEqualToAReplicate) {
+      // prevent GII by removing all replicates
+      removeReplicates(replicates);
+    } else {
+      persistenceAdvisor.clearEqualMembers();
+    }
+    // Either a replicate has said we're equal and we've cleared replicates,
+    // or none of them said we're equal and we've cleared our equal members.
+    // We had replicates. Now one of these things is true:
+    // - We've cleared replicates (meaning we're equal to one, so can load 
from disk)
+    // - No replicates report we're equal (so must GII from one, which we 
indicate by clearing equal
+    // members).
+  }
+
+  private boolean hasReplicates(InitialImageAdvice advice) {
+    return advice != null && !advice.getReplicates().isEmpty();
+  }
+
+  private void removeReplicates(Set<InternalDistributedMember> replicates) {
+    if (isPersistAdvisorDebugEnabled()) {
+      logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
+          "{}-{}: We have the same data on disk as one of {} recovering 
gracefully",
+          shortDiskStoreID, regionPath, replicates);
+    }
+    replicates.clear();
+  }
+
+  private void 
waitForMembershipChangeForMissingDiskStores(MembershipChangeListener listener,
+      Set<PersistentMemberID> offlineMembers, Set<PersistentMemberID> 
membersToWaitFor)
+      throws InterruptedException {
+    persistenceAdvisor.beginWaitingForMembershipChange(membersToWaitFor);
+    try {
+      // The persistence advisor needs to know which members are really not 
available because the
+      // user uses this information to decide which members they haven't 
started yet.
+      // membersToWaitFor includes members that are still waiting to start up, 
but are waiting for
+      // members other than the current member. So we pass the set of offline 
members here.
+
+      persistenceAdvisor.setWaitingOnMembers(membersToWaitFor, offlineMembers);
+      listener.waitForChange();
+    } finally {
+      persistenceAdvisor.endWaitingForMembershipChange();
+    }
+  }
+
+  private boolean isPersistAdvisorDebugEnabled() {
+    return logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE);
+  }
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
 
b/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
index a039628..91ede6c 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
@@ -16,6 +16,7 @@ package org.apache.geode.internal.lang;
 
 import java.util.Optional;
 
+
 /**
  * The SystemPropertyHelper class is an helper class for accessing system 
properties used in geode.
  * The method name to get the system property should be the same as the system 
property name.
@@ -65,6 +66,9 @@ public class SystemPropertyHelper {
 
   public static final String THREAD_ID_EXPIRY_TIME_PROPERTY = 
"threadIdExpiryTime";
 
+  public static final String PERSISTENT_VIEW_RETRY_TIMEOUT_SECONDS =
+      "PERSISTENT_VIEW_RETRY_TIMEOUT_SECONDS";
+
   /**
    * This method will try to look up "geode." and "gemfire." versions of the 
system property. It
    * will check and prefer "geode." setting first, then try to check 
"gemfire." setting.
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/persistence/PersistenceInitialImageAdvisorTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/persistence/PersistenceInitialImageAdvisorTest.java
new file mode 100644
index 0000000..f3f4f54
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/persistence/PersistenceInitialImageAdvisorTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.persistence;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toSet;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNotNull;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+import org.mockito.InOrder;
+
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.CacheDistributionAdvisor;
+import 
org.apache.geode.internal.cache.CacheDistributionAdvisor.InitialImageAdvice;
+import org.apache.geode.internal.lang.SystemPropertyHelper;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class PersistenceInitialImageAdvisorTest {
+  @Rule
+  public RestoreSystemProperties restoreSystemProperties = new 
RestoreSystemProperties();
+
+  private InternalPersistenceAdvisor persistenceAdvisor;
+  private CacheDistributionAdvisor cacheDistributionAdvisor;
+  private PersistenceInitialImageAdvisor persistenceInitialImageAdvisor;
+
+  @Before
+  public void setup() {
+    cacheDistributionAdvisor = mock(CacheDistributionAdvisor.class, 
RETURNS_DEEP_STUBS);
+    
when(cacheDistributionAdvisor.getDistributionManager().getConfig().getAckWaitThreshold())
+        .thenReturn(15);
+
+    persistenceAdvisor = mock(InternalPersistenceAdvisor.class);
+    
when(persistenceAdvisor.getCacheDistributionAdvisor()).thenReturn(cacheDistributionAdvisor);
+
+    persistenceInitialImageAdvisor = new 
PersistenceInitialImageAdvisor(persistenceAdvisor,
+        "short disk store ID", "region path", cacheDistributionAdvisor, true);
+  }
+
+  @Test
+  public void publishesListOfMissingMembersWhenWaitingForMissingMembers() {
+    System.setProperty("geode." + 
SystemPropertyHelper.PERSISTENT_VIEW_RETRY_TIMEOUT_SECONDS, "0");
+    Set<PersistentMemberID> offlineMembersToWaitFor = 
givenOfflineMembersToWaitFor(1);
+
+    when(cacheDistributionAdvisor.adviseInitialImage(null, true))
+        .thenReturn(adviceWithReplicates(0), adviceWithReplicates(1));
+
+    persistenceInitialImageAdvisor.getAdvice(null);
+
+    verify(persistenceAdvisor, times(2)).setWaitingOnMembers(any(), any());
+
+    InOrder inOrder = inOrder(persistenceAdvisor);
+    inOrder.verify(persistenceAdvisor, 
times(1)).setWaitingOnMembers(isNotNull(),
+        eq(offlineMembersToWaitFor));
+    inOrder.verify(persistenceAdvisor, times(1)).setWaitingOnMembers(isNull(), 
isNull());
+  }
+
+  private Set<PersistentMemberID> givenOfflineMembersToWaitFor(int 
memberCount) {
+    HashSet<PersistentMemberID> offlineMembersToWaitFor =
+        IntStream.range(0, memberCount).mapToObj(i -> 
persistentMemberID("offline member " + i))
+            .distinct().collect(toCollection(HashSet::new));
+    Set<PersistentMemberID> membersToWaitFor = new 
HashSet<>(offlineMembersToWaitFor);
+
+    
when(persistenceAdvisor.getPersistedOnlineOrEqualMembers()).thenReturn(offlineMembersToWaitFor);
+    when(persistenceAdvisor.getMembersToWaitFor(any(), 
any())).thenAnswer(invocation -> {
+      Set<PersistentMemberID> previouslyOnlineMembers = 
invocation.getArgument(0);
+      Set<PersistentMemberID> offlineMembers = invocation.getArgument(1);
+      offlineMembers.addAll(previouslyOnlineMembers);
+      return membersToWaitFor;
+    });
+
+    return offlineMembersToWaitFor;
+  }
+
+  private static InternalDistributedMember internalDistributedMember(String 
name) {
+    return mock(InternalDistributedMember.class, name);
+  }
+
+  private static PersistentMemberID persistentMemberID(String name) {
+    return mock(PersistentMemberID.class, name);
+  }
+
+  private static InitialImageAdvice adviceWithReplicates(int replicateCount) {
+    Set<InternalDistributedMember> replicates = IntStream.range(0, 
replicateCount)
+        .mapToObj(i -> internalDistributedMember("replicate " + 
i)).collect(toSet());
+    return new InitialImageAdvice(replicates, emptySet(), emptySet(), 
emptySet(), emptySet(),
+        emptySet(), emptyMap());
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
aging...@apache.org.

Reply via email to