This is an automated email from the ASF dual-hosted git repository. mivanac pushed a commit to branch revert-4189-revert-4086-feature/GEODE-6807 in repository https://gitbox.apache.org/repos/asf/geode.git
commit c083ad58b880d782a5f6b9e84936cdaa7d97504e Author: Mario Ivanac <48509724+miva...@users.noreply.github.com> AuthorDate: Wed Oct 23 20:22:27 2019 +0200 Revert "Revert "GEODE-6807: cache adviseUpdate and adviseAllEventsOrCached" (#4189)" This reverts commit e225ffcd5dc8f15ae59b46dc24baefcc253801f2. --- .../CacheDistributionAdvisorConcurrentTest.java | 105 ++++++++++++ .../distributed/internal/DistributionAdvisor.java | 16 +- .../internal/cache/CacheDistributionAdvisor.java | 98 +++++++++--- .../internal/cache/DistributedCacheOperation.java | 2 +- .../geode/internal/cache/DistributedRegion.java | 7 +- .../cache/CacheDistributionAdvisorTest.java | 176 +++++++++++++++++++++ 6 files changed, 374 insertions(+), 30 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/CacheDistributionAdvisorConcurrentTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/CacheDistributionAdvisorConcurrentTest.java new file mode 100644 index 0000000..9afbc52 --- /dev/null +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/CacheDistributionAdvisorConcurrentTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + + +import static org.apache.geode.test.concurrency.Utilities.availableProcessors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.cache.Operation; +import org.apache.geode.distributed.internal.DistributionAdvisor; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.distributed.internal.membership.MemberAttributes; +import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile; +import org.apache.geode.test.concurrency.ConcurrentTestRunner; +import org.apache.geode.test.concurrency.ParallelExecutor; + + +@RunWith(ConcurrentTestRunner.class) +public class CacheDistributionAdvisorConcurrentTest { + private final int count = availableProcessors() * 2; + + @Test + public void getAdviseAllEventsOrCachedForConcurrentUpdateShouldSucceed( + ParallelExecutor executor) throws Exception { + + DistributionAdvisor advisor = createCacheDistributionAdvisor(); + CacheProfile profile = createCacheProfile(); + advisor.putProfile(profile, true); + + executor.inParallel(() -> { + ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached(); + }, count); + executor.execute(); + + assertTrue(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached() + .contains(profile.getDistributedMember())); + assertEquals(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached().size(), 1); + + } + + @Test + public void getAdviseUpdateForConcurrentUpdateShouldSucceed( + ParallelExecutor executor) throws Exception { + + EntryEventImpl event = new EntryEventImpl(); + event.setNewValue(null); + event.setOperation(Operation.CREATE); + + DistributionAdvisor advisor = createCacheDistributionAdvisor(); + CacheProfile profile = createCacheProfile(); + advisor.putProfile(profile, true); + + executor.inParallel(() -> { + ((CacheDistributionAdvisor) advisor).adviseUpdate(event); + }, count); + executor.execute(); + + assertTrue(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached() + .contains(profile.getDistributedMember())); + assertEquals(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached().size(), 1); + + } + + private DistributionAdvisor createCacheDistributionAdvisor() { + CacheDistributionAdvisee advisee = mock(CacheDistributionAdvisee.class); + CancelCriterion cancelCriterion = mock(CancelCriterion.class); + when(advisee.getCancelCriterion()).thenReturn(cancelCriterion); + DistributionManager distributionManager = mock(DistributionManager.class); + when(advisee.getDistributionManager()).thenReturn(distributionManager); + CacheDistributionAdvisor result = + CacheDistributionAdvisor.createCacheDistributionAdvisor(advisee); + when(advisee.getDistributionAdvisor()).thenReturn(result); + return result; + } + + private CacheProfile createCacheProfile() throws UnknownHostException { + InternalDistributedMember member = + new InternalDistributedMember(InetAddress.getLocalHost(), 0, false, + false, MemberAttributes.DEFAULT); + return new CacheProfile(member, 1); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java index 0a33f51..a2e701a 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java @@ -171,6 +171,12 @@ public class DistributionAdvisor { private int numActiveProfiles = 0; /** + * Profiles version number + */ + protected volatile long profilesVersion = 0; + + + /** * A collection of MembershipListeners that want to be notified when a profile is added to or * removed from this DistributionAdvisor. The keys are membership listeners and the values are * Boolean.TRUE. @@ -1313,8 +1319,7 @@ public class DistributionAdvisor { // must synchronize when modifying profile array private synchronized boolean basicAddProfile(Profile p) { // don't add more than once, but replace existing profile - // try { - + profilesVersion++; int index = indexOfMemberId(p.getId()); if (index >= 0) { Profile[] oldProfiles = profiles; // volatile read @@ -1340,17 +1345,16 @@ public class DistributionAdvisor { * Perform work of removing the given member from this advisor. */ private synchronized Profile basicRemoveMemberId(ProfileId id) { - // try { + int i = indexOfMemberId(id); if (i >= 0) { + profilesVersion++; Profile profileRemoved = profiles[i]; basicRemoveIndex(i); return profileRemoved; } else return null; - // } finally { - // Assert.assertTrue(-1 == indexOfMemberId(id)); - // } + } private int indexOfMemberId(ProfileId id) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java index 1f3c0ba..725cc61 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java @@ -112,6 +112,19 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { // moved removedProfiles to DistributionAdvisor + private Set<InternalDistributedMember> adviseSetforAllEvents = Collections.emptySet(); + private volatile long adviseAllEventsVersion = -1; + + private Set<InternalDistributedMember> adviseSetforUpdate = Collections.emptySet(); + private volatile long adviseUpdateVersion = -1; + + private volatile long inRecoveryVersion = 0; + private volatile long adviseInRecoveryVersion = -1; + + public synchronized void incInRecoveryVersion() { + inRecoveryVersion++; + } + /** Creates a new instance of CacheDistributionAdvisor */ protected CacheDistributionAdvisor(CacheDistributionAdvisee region) { super(region); @@ -140,19 +153,35 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { /** * Returns a the set of members that either want all events or are caching data. * - * @param excludeInRecovery if true then members in recovery are excluded */ - private Set<InternalDistributedMember> adviseAllEventsOrCached(final boolean excludeInRecovery) + Set<InternalDistributedMember> adviseAllEventsOrCached() throws IllegalStateException { getAdvisee().getCancelCriterion().checkCancelInProgress(null); - return adviseFilter(profile -> { - assert profile instanceof CacheProfile; - CacheProfile cp = (CacheProfile) profile; - if (excludeInRecovery && cp.inRecovery) { - return false; + + // minimize volatile reads by copying ref to local var + long tempProfilesVersion = profilesVersion; // volatile read + long tempInRecoveryVersion = inRecoveryVersion; // volatile read + + if (adviseAllEventsVersion != tempProfilesVersion + || adviseInRecoveryVersion != tempInRecoveryVersion) { + synchronized (adviseSetforAllEvents) { + if (adviseAllEventsVersion != tempProfilesVersion + || adviseInRecoveryVersion != tempInRecoveryVersion) { + + adviseSetforAllEvents = Collections.unmodifiableSet(adviseFilter(profile -> { + CacheProfile cp = (CacheProfile) profile; + if (cp.getInRecovery()) { + return false; + } + return cp.cachedOrAllEventsWithListener(); + })); + adviseAllEventsVersion = tempProfilesVersion; + adviseInRecoveryVersion = tempInRecoveryVersion; + } } - return cp.cachedOrAllEventsWithListener(); - }); + } + return adviseSetforAllEvents; + } /** @@ -162,18 +191,30 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { Set adviseUpdate(final EntryEventImpl event) throws IllegalStateException { if (event.hasNewValue() || event.getOperation().isPutAll()) { // only need to distribute it to members that want all events or cache data - return adviseAllEventsOrCached(true/* fixes 41147 */); + return adviseAllEventsOrCached(); } else { // The new value is null so this is a create with a null value, // in which case we only need to distribute this message to replicates // or all events that are not a proxy or if a proxy has a listener - return adviseFilter(profile -> { - assert profile instanceof CacheProfile; - CacheProfile cp = (CacheProfile) profile; - DataPolicy dp = cp.dataPolicy; - return dp.withReplication() - || (cp.allEvents() && (dp.withStorage() || cp.hasCacheListener)); - }); + + // minimize volatile reads by copying ref to local var + long tempProfilesVersion = profilesVersion; // volatile read + + if (adviseUpdateVersion != tempProfilesVersion) { + synchronized (adviseSetforUpdate) { + if (adviseUpdateVersion != tempProfilesVersion) { + + adviseSetforUpdate = Collections.unmodifiableSet(adviseFilter(profile -> { + CacheProfile cp = (CacheProfile) profile; + DataPolicy dp = cp.getDataPolicy(); + return dp.withReplication() + || (cp.allEvents() && (dp.withStorage() || cp.hasCacheListener)); + })); + adviseUpdateVersion = tempProfilesVersion; + } + } + } + return adviseSetforUpdate; } } @@ -250,7 +291,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { * Same as adviseGeneric except in recovery excluded. */ public Set<InternalDistributedMember> adviseCacheOp() { - return adviseAllEventsOrCached(true); + return adviseAllEventsOrCached(); } /* @@ -260,7 +301,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { return adviseFilter(profile -> { assert profile instanceof CacheProfile; CacheProfile cp = (CacheProfile) profile; - return !cp.inRecovery; + return !cp.getInRecovery(); }); } @@ -283,7 +324,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { assert profile instanceof CacheProfile; CacheProfile prof = (CacheProfile) profile; // if region in cache is in recovery, exclude - if (prof.inRecovery) { + if (prof.getInRecovery()) { return false; } @@ -364,7 +405,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { } // if region in cache is in recovery, exclude - if (profile.inRecovery) { + if (profile.getInRecovery()) { uninitialized.add(profile.getDistributedMember()); continue; } @@ -453,12 +494,13 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { */ public static class CacheProfile extends DistributionAdvisor.Profile { public DataPolicy dataPolicy = DataPolicy.REPLICATE; + public InterestPolicy interestPolicy = InterestPolicy.DEFAULT; public boolean hasCacheLoader = false; public boolean hasCacheWriter = false; public boolean hasCacheListener = false; public Scope scope = Scope.DISTRIBUTED_NO_ACK; - public boolean inRecovery = false; + private boolean inRecovery = false; public Set<String> gatewaySenderIds = Collections.emptySet(); public Set<String> asyncEventQueueIds = Collections.emptySet(); /** @@ -610,6 +652,18 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { return dataPolicy.withPersistence(); } + public boolean getInRecovery() { + return inRecovery; + }; + + public void setInRecovery(boolean recovery) { + inRecovery = recovery; + }; + + public DataPolicy getDataPolicy() { + return dataPolicy; + } + /** Set the profile data information that is stored in a short */ protected void setIntInfo(int s) { if ((s & REPLICATE_MASK) != 0) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java index 0b13f2d..aa96f44 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java @@ -345,7 +345,7 @@ public abstract class DistributedCacheOperation { try { // Recipients with CacheOp - Set<InternalDistributedMember> recipients = getRecipients(); + Set<InternalDistributedMember> recipients = new HashSet<>(getRecipients()); Map<InternalDistributedMember, PersistentMemberID> persistentIds = null; if (region.getDataPolicy().withPersistence()) { persistentIds = region.getDistributionAdvisor().adviseInitializedPersistentMembers(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index f13bce3..d04882a 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -2214,7 +2214,12 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute cacheProfile.hasCacheListener = hasListener(); Assert.assertTrue(scope.isDistributed()); cacheProfile.scope = scope; - cacheProfile.inRecovery = getImageState().getInRecovery(); + + boolean newInRecovery = getImageState().getInRecovery(); + if (cacheProfile.getInRecovery() != newInRecovery) { + distAdvisor.incInRecoveryVersion(); + } + cacheProfile.setInRecovery(newInRecovery); cacheProfile.isPersistent = getDataPolicy().withPersistence(); cacheProfile.setSubscriptionAttributes(getSubscriptionAttributes()); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheDistributionAdvisorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheDistributionAdvisorTest.java new file mode 100644 index 0000000..010e658 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheDistributionAdvisorTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Set; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.Operation; +import org.apache.geode.distributed.internal.DistributionAdvisor; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile; + + +public class CacheDistributionAdvisorTest { + DistributionAdvisor advisor; + + @Before + public void setUp() { + CacheDistributionAdvisee advisee = mock(CacheDistributionAdvisee.class); + DistributionManager distributionManager = mock(DistributionManager.class); + when(advisee.getDistributionManager()).thenReturn(distributionManager); + CancelCriterion cancelCriterion = mock(CancelCriterion.class); + when(advisee.getCancelCriterion()).thenReturn(cancelCriterion); + advisor = + CacheDistributionAdvisor.createCacheDistributionAdvisor(advisee); + when(advisee.getDistributionAdvisor()).thenReturn(advisor); + + } + + @Test + public void testAdviseAllEventsOrCached() { + CacheProfile profile = mock(CacheProfile.class); + InternalDistributedMember member = mock(InternalDistributedMember.class); + when(profile.getId()).thenReturn(member); + when(profile.getInRecovery()).thenReturn(false); + when(profile.cachedOrAllEventsWithListener()).thenReturn(true); + when(profile.getDistributedMember()).thenReturn(member); + + advisor.putProfile(profile, true); + Set<InternalDistributedMember> targets1 = + ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached(); + + Set<InternalDistributedMember> targets2 = + ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached(); + + Set<InternalDistributedMember> targets3 = + ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached(); + + verify(profile, times(1)).getInRecovery(); + verify(profile, times(1)).cachedOrAllEventsWithListener(); + + } + + @Test + public void testAdviseAllEventsOrCached2() { + CacheProfile profile = mock(CacheProfile.class); + InternalDistributedMember member = mock(InternalDistributedMember.class); + when(profile.getId()).thenReturn(member); + when(profile.getInRecovery()).thenReturn(false); + when(profile.cachedOrAllEventsWithListener()).thenReturn(true); + when(profile.getDistributedMember()).thenReturn(member); + CacheProfile profile2 = mock(CacheProfile.class); + InternalDistributedMember member2 = mock(InternalDistributedMember.class); + when(profile2.getId()).thenReturn(member2); + when(profile2.getInRecovery()).thenReturn(false); + when(profile2.cachedOrAllEventsWithListener()).thenReturn(true); + when(profile2.getDistributedMember()).thenReturn(member2); + + advisor.putProfile(profile, true); + Set<InternalDistributedMember> targets1 = + ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached(); + + advisor.putProfile(profile2, true); + Set<InternalDistributedMember> targets2 = + ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached(); + + Set<InternalDistributedMember> targets3 = + ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached(); + + verify(profile, times(2)).getInRecovery(); + verify(profile, times(2)).cachedOrAllEventsWithListener(); + verify(profile2, times(1)).getInRecovery(); + verify(profile2, times(1)).cachedOrAllEventsWithListener(); + + } + + + @Test + public void testAdviseUpdate() { + CacheProfile profile = mock(CacheProfile.class); + InternalDistributedMember member = mock(InternalDistributedMember.class); + EntryEventImpl event = mock(EntryEventImpl.class); + when(event.hasNewValue()).thenReturn(false); + when(event.getOperation()).thenReturn(Operation.CREATE); + + when(profile.getId()).thenReturn(member); + when(profile.cachedOrAllEventsWithListener()).thenReturn(true); + when(profile.getDistributedMember()).thenReturn(member); + when(profile.getDataPolicy()).thenReturn(DataPolicy.REPLICATE); + + advisor.putProfile(profile, true); + Set<InternalDistributedMember> targets1 = + ((CacheDistributionAdvisor) advisor).adviseUpdate(event); + + Set<InternalDistributedMember> targets2 = + ((CacheDistributionAdvisor) advisor).adviseUpdate(event); + + Set<InternalDistributedMember> targets3 = + ((CacheDistributionAdvisor) advisor).adviseUpdate(event); + + verify(profile, times(1)).getDataPolicy(); + } + + @Test + public void testAdviseUpdate2() { + CacheProfile profile = mock(CacheProfile.class); + InternalDistributedMember member = mock(InternalDistributedMember.class); + EntryEventImpl event = mock(EntryEventImpl.class); + when(event.hasNewValue()).thenReturn(false); + when(event.getOperation()).thenReturn(Operation.CREATE); + + when(profile.getId()).thenReturn(member); + when(profile.cachedOrAllEventsWithListener()).thenReturn(true); + when(profile.getDistributedMember()).thenReturn(member); + when(profile.getDataPolicy()).thenReturn(DataPolicy.REPLICATE); + + CacheProfile profile2 = mock(CacheProfile.class); + InternalDistributedMember member2 = mock(InternalDistributedMember.class); + when(profile2.getId()).thenReturn(member2); + when(profile2.cachedOrAllEventsWithListener()).thenReturn(true); + when(profile2.getDistributedMember()).thenReturn(member2); + when(profile2.getDataPolicy()).thenReturn(DataPolicy.REPLICATE); + + when(event.hasNewValue()).thenReturn(false); + when(event.getOperation()).thenReturn(Operation.CREATE); + + advisor.putProfile(profile, true); + Set<InternalDistributedMember> targets1 = + ((CacheDistributionAdvisor) advisor).adviseUpdate(event); + + advisor.putProfile(profile2, true); + Set<InternalDistributedMember> targets2 = + ((CacheDistributionAdvisor) advisor).adviseUpdate(event); + + Set<InternalDistributedMember> targets3 = + ((CacheDistributionAdvisor) advisor).adviseUpdate(event); + + verify(profile, times(2)).getDataPolicy(); + verify(profile2, times(1)).getDataPolicy(); + + } + + +}