Repository: geode Updated Branches: refs/heads/develop 288676dfe -> 72d0d4baa
GEODE-2776: Setting version tag on the client event from the current region entry after load, and refactoring findObjectInSystem(). Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/72d0d4ba Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/72d0d4ba Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/72d0d4ba Branch: refs/heads/develop Commit: 72d0d4baaccfb90e011286cb57d97174065256ae Parents: 288676d Author: Anil <aging...@pivotal.io> Authored: Wed Apr 19 17:35:11 2017 -0700 Committer: Anil <aging...@pivotal.io> Committed: Mon May 8 10:22:24 2017 -0700 ---------------------------------------------------------------------- .../geode/internal/cache/DistributedRegion.java | 246 +++++++++++-------- .../geode/internal/cache/LocalRegion.java | 3 + .../DistributedRegionSearchLoadJUnitTest.java | 187 ++++++++++++++ 3 files changed, 335 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/72d0d4ba/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index 0c967c9..c3a4961 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -2265,123 +2265,167 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones) throws CacheLoaderException, TimeoutException { + @Released + EntryEventImpl event = null; 
checkForLimitedOrNoAccess(); + final Operation op = isCreate ? Operation.CREATE : Operation.UPDATE; + long lastModified = 0L; + try { + event = findOnServer(keyInfo, op, generateCallbacks, clientEvent); + if (event == null) { + event = createEventForLoad(keyInfo, generateCallbacks, requestingClient, op); + lastModified = findUsingSearchLoad(txState, localValue, clientEvent, keyInfo, event); + } + // Update region with new value. + if (event.hasNewValue() && !isMemoryThresholdReachedForLoad()) { + putNewValueInRegion(isCreate, clientEvent, lastModified, event); + } else if (isCreate) { + recordMiss(null, event.getKey()); + } + return determineResult(preferCD, event); + } finally { + if (event != null) { + event.release(); + } + } + } + + private EntryEventImpl createEventForLoad(KeyInfo keyInfo, boolean generateCallbacks, + ClientProxyMembershipID requestingClient, Operation op) { + // Do not generate Event ID + EntryEventImpl event = EntryEventImpl.create(this, op, keyInfo.getKey(), null /* newValue */, + keyInfo.getCallbackArg(), false, getMyId(), generateCallbacks); + if (requestingClient != null) { + event.setContext(requestingClient); + } + return event; + } + + private Object determineResult(boolean preferCD, EntryEventImpl event) { + if (preferCD) { + return event.getRawNewValueAsHeapObject(); + } + return event.getNewValue(); + } + + private void putNewValueInRegion(boolean isCreate, EntryEventImpl clientEvent, long lastModified, + EntryEventImpl event) { RegionEntry re = null; - final Object key = keyInfo.getKey(); - final Object aCallbackArgument = keyInfo.getCallbackArg(); - Operation op; + // Set eventId. Required for interested clients. + event.setNewEventId(cache.getDistributedSystem()); + + long startPut = CachePerfStats.getStatTime(); + validateKey(event.getKey()); + // this next step also distributes the object to other processes, if necessary + try { + // set the tail key so that the event is passed to GatewaySender queues. 
+ // if the tailKey is not set, the event gets filtered out in ParallelGatewaySenderQueue + if (this instanceof BucketRegion) { + if (((BucketRegion) this).getPartitionedRegion().isParallelWanEnabled()) + ((BucketRegion) this).handleWANEvent(event); + } + re = basicPutEntry(event, lastModified); + + // Update client event with latest version tag from re. + if (re != null && clientEvent != null) { + clientEvent.setVersionTag(event.getVersionTag()); + } + if (!isTX()) { + getCachePerfStats().endPut(startPut, event.isOriginRemote()); + } + } catch (ConcurrentCacheModificationException e) { + // the cache was modified while we were searching for this entry and + // the netsearch result was elided. Return the current value from the cache + updateEventWithCurrentRegionEntry(event, clientEvent); + } catch (CacheWriterException cwe) { + if (logger.isDebugEnabled()) { + logger.debug("findObjectInSystem: writer exception putting entry {} : {}", event, cwe); + } + } if (isCreate) { - op = Operation.CREATE; - } else { - op = Operation.UPDATE; + recordMiss(re, event.getKey()); } - long lastModified = 0L; - boolean fromServer = false; - @Released - EntryEventImpl event = null; - @Retained - Object result = null; + } + + private void updateEventWithCurrentRegionEntry(EntryEventImpl event, EntryEventImpl clientEvent) { + // defer the lruUpdateCallback to prevent a deadlock (see bug 51121). 
+ final boolean disabled = this.entries.disableLruUpdateCallback(); try { - { - if (this.srp != null) { - VersionTagHolder holder = new VersionTagHolder(); - Object value = this.srp.get(key, aCallbackArgument, holder); - fromServer = value != null; - if (fromServer) { - event = EntryEventImpl.create(this, op, key, value, aCallbackArgument, false, getMyId(), - generateCallbacks); - event.setVersionTag(holder.getVersionTag()); - event.setFromServer(fromServer); // fix for bug 39358 - if (clientEvent != null && clientEvent.getVersionTag() == null) { - clientEvent.setVersionTag(holder.getVersionTag()); - } + RegionEntry re = getRegionEntry(event.getKey()); + if (re != null) { + synchronized (re) { // bug #51059 value & version must be obtained atomically + // Update client event with latest version tag from re + if (clientEvent != null) { + clientEvent.setVersionTag(re.getVersionStamp().asVersionTag()); } + // OFFHEAP: need to incrc, copy to heap to setNewValue, decrc + event.setNewValue(re.getValue(this)); } } - - if (!fromServer) { - // Do not generate Event ID - event = EntryEventImpl.create(this, op, key, null /* newValue */, aCallbackArgument, false, - getMyId(), generateCallbacks); - if (requestingClient != null) { - event.setContext(requestingClient); - } - // If this event is because of a register interest call, don't invoke the CacheLoader - boolean getForRegisterInterest = clientEvent != null && clientEvent.getOperation() != null - && clientEvent.getOperation().isGetForRegisterInterest(); - if (!getForRegisterInterest) { - SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor(); - try { - processor.initialize(this, key, aCallbackArgument); - // processor fills in event - processor.doSearchAndLoad(event, txState, localValue); - if (clientEvent != null && clientEvent.getVersionTag() == null) { - clientEvent.setVersionTag(event.getVersionTag()); - } - lastModified = processor.getLastModified(); - } finally { - processor.release(); - } 
- } else { - if (logger.isDebugEnabled()) { - logger.debug("DistributedRegion.findObjectInSystem skipping loader for region=" - + getFullPath() + "; key=" + key); - } - } + } finally { + if (disabled) { + this.entries.enableLruUpdateCallback(); } - if (event.hasNewValue() && !isMemoryThresholdReachedForLoad()) { - try { - // Set eventId. Required for interested clients. - event.setNewEventId(cache.getDistributedSystem()); - - long startPut = CachePerfStats.getStatTime(); - validateKey(key); - // if (event.getOperation().isLoad()) { - // this.performedLoad(event, lastModified, txState); - // } - // this next step also distributes the object to other processes, if necessary - try { - // set the tail key so that the event is passed to GatewaySender queues. - // if the tailKey is not set, the event gets filtered out in ParallelGatewaySenderQueue - if (this instanceof BucketRegion) { - if (((BucketRegion) this).getPartitionedRegion().isParallelWanEnabled()) - ((BucketRegion) this).handleWANEvent(event); - } - re = basicPutEntry(event, lastModified); - } catch (ConcurrentCacheModificationException e) { - // the cache was modified while we were searching for this entry and - // the netsearch result was elided. Return the current value from the cache - re = getRegionEntry(key); - if (re != null) { - event.setNewValue(re.getValue(this)); // OFFHEAP: need to incrc, copy to heap to - // setNewValue, decrc - } - } - if (!isTX()) { - getCachePerfStats().endPut(startPut, event.isOriginRemote()); - } - } catch (CacheWriterException cwe) { - if (logger.isDebugEnabled()) { - logger.debug("findObjectInSystem: writer exception putting entry {} : {}", event, cwe); - } - } + try { + this.entries.lruUpdateCallback(); + } catch (DiskAccessException dae) { + this.handleDiskAccessException(dae); + throw dae; } - if (isCreate) { - recordMiss(re, key); + } + } + + /** + * If its client, get the value from server. 
+ */ + private EntryEventImpl findOnServer(KeyInfo keyInfo, Operation op, boolean generateCallbacks, + EntryEventImpl clientEvent) { + if (this.srp == null) { + return null; + } + EntryEventImpl event = null; + VersionTagHolder holder = new VersionTagHolder(); + Object aCallbackArgument = keyInfo.getCallbackArg(); + Object value = this.srp.get(keyInfo.getKey(), aCallbackArgument, holder); + if (value != null) { + event = EntryEventImpl.create(this, op, keyInfo.getKey(), value, aCallbackArgument, false, + getMyId(), generateCallbacks); + event.setVersionTag(holder.getVersionTag()); + event.setFromServer(true); // fix for bug 39358 + if (clientEvent != null && clientEvent.getVersionTag() == null) { + clientEvent.setVersionTag(holder.getVersionTag()); } + } + return event; + } - if (preferCD) { - result = event.getRawNewValueAsHeapObject(); - } else { - result = event.getNewValue(); + private long findUsingSearchLoad(TXStateInterface txState, Object localValue, + EntryEventImpl clientEvent, final KeyInfo keyInfo, EntryEventImpl event) { + long lastModified = 0L; + // If this event is because of a register interest call, don't invoke the CacheLoader + boolean getForRegisterInterest = clientEvent != null && clientEvent.getOperation() != null + && clientEvent.getOperation().isGetForRegisterInterest(); + if (!getForRegisterInterest) { + SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor(); + try { + processor.initialize(this, keyInfo.getKey(), keyInfo.getCallbackArg()); + // processor fills in event + processor.doSearchAndLoad(event, txState, localValue); + if (clientEvent != null && clientEvent.getVersionTag() == null) { + clientEvent.setVersionTag(event.getVersionTag()); + } + lastModified = processor.getLastModified(); + } finally { + processor.release(); } - return result; - } finally { - if (event != null) { - event.release(); + } else { + if (logger.isDebugEnabled()) { + logger.debug("DistributedRegion.findObjectInSystem skipping loader 
for region=" + + getFullPath() + "; key=" + keyInfo.getKey()); } } + return lastModified; } /** http://git-wip-us.apache.org/repos/asf/geode/blob/72d0d4ba/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 2dec53b..cdba7e4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -1393,6 +1393,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * @param key the key used to fetch the region entry */ final public void recordMiss(final RegionEntry re, Object key) { + if (!this.statisticsEnabled) { + return; + } final RegionEntry e; if (re == null && !isTX()) { e = basicGetEntry(key); http://git-wip-us.apache.org/repos/asf/geode/blob/72d0d4ba/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionSearchLoadJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionSearchLoadJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionSearchLoadJUnitTest.java new file mode 100755 index 0000000..30fb728 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionSearchLoadJUnitTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.*; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.apache.geode.cache.AttributesFactory; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.cache.Scope; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException; +import org.apache.geode.internal.cache.versions.VersionStamp; +import org.apache.geode.internal.cache.versions.VersionTag; +import org.apache.geode.test.fake.Fakes; +import org.apache.geode.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +@RunWith(PowerMockRunner.class) +@PowerMockIgnore("*.UnitTest") +@PrepareForTest({SearchLoadAndWriteProcessor.class}) +public class DistributedRegionSearchLoadJUnitTest { + + protected DistributedRegion 
createAndDefineRegion(boolean isConcurrencyChecksEnabled, + RegionAttributes ra, InternalRegionArguments ira, GemFireCacheImpl cache) { + DistributedRegion region = new DistributedRegion("testRegion", ra, null, cache, ira); + if (isConcurrencyChecksEnabled) { + region.enableConcurrencyChecks(); + } + + // since it is a real region object, we need to tell mockito to monitor it + region = spy(region); + + doNothing().when(region).distributeUpdate(any(), anyLong(), anyBoolean(), anyBoolean(), any(), + anyBoolean()); + doNothing().when(region).distributeDestroy(any(), any()); + doNothing().when(region).distributeInvalidate(any()); + doNothing().when(region).distributeUpdateEntryVersion(any()); + + return region; + } + + private RegionAttributes createRegionAttributes(boolean isConcurrencyChecksEnabled) { + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.DISTRIBUTED_ACK); + factory.setDataPolicy(DataPolicy.REPLICATE); + factory.setConcurrencyChecksEnabled(isConcurrencyChecksEnabled); // + RegionAttributes ra = factory.create(); + return ra; + } + + private EventID createDummyEventID() { + byte[] memId = {1, 2, 3}; + EventID eventId = new EventID(memId, 11, 12, 13); + return eventId; + } + + protected EntryEventImpl createDummyEvent(DistributedRegion region) { + // create a dummy event id + EventID eventId = createDummyEventID(); + String key = "key1"; + String value = "Value1"; + + // create an event + EntryEventImpl event = EntryEventImpl.create(region, Operation.CREATE, key, value, null, + false /* origin remote */, null, false /* generateCallbacks */, eventId); + // avoid calling invokeCallbacks + event.callbacksInvoked(true); + + return event; + } + + protected VersionTag createVersionTag(boolean validVersionTag) { + InternalDistributedMember remotemember = mock(InternalDistributedMember.class); + VersionTag tag = VersionTag.create(remotemember); + if (validVersionTag) { + tag.setRegionVersion(1); + tag.setEntryVersion(1); + } + return 
tag; + } + + protected DistributedRegion prepare(boolean isConcurrencyChecksEnabled) { + GemFireCacheImpl cache = Fakes.cache(); + + // create region attributes and internal region arguments + RegionAttributes ra = createRegionAttributes(isConcurrencyChecksEnabled); + InternalRegionArguments ira = new InternalRegionArguments(); + + // create a region object + DistributedRegion region = createAndDefineRegion(isConcurrencyChecksEnabled, ra, ira, cache); + if (isConcurrencyChecksEnabled) { + region.enableConcurrencyChecks(); + } + + doNothing().when(region).notifyGatewaySender(any(), any()); + doReturn(true).when(region).hasSeenEvent(any(EntryEventImpl.class)); + return region; + } + + private void createSearchLoad() { + SearchLoadAndWriteProcessor proc = mock(SearchLoadAndWriteProcessor.class); + PowerMockito.mockStatic(SearchLoadAndWriteProcessor.class); + PowerMockito.when(SearchLoadAndWriteProcessor.getProcessor()).thenReturn(proc); + + VersionTag tag = createVersionTag(true); + + doAnswer(new Answer<EntryEventImpl>() { + public EntryEventImpl answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + if (args[0] instanceof EntryEventImpl) { + EntryEventImpl event = (EntryEventImpl) args[0]; + event.setNewValue("NewLoadedValue"); + event.setOperation(Operation.LOCAL_LOAD_CREATE); + } + return null; + } + }).when(proc).doSearchAndLoad(any(EntryEventImpl.class), anyObject(), anyObject()); + } + + @Test + public void testClientEventIsUpdatedWithCurrentEntryVersionTagAfterLoad() { + DistributedRegion region = prepare(true); + EntryEventImpl event = createDummyEvent(region); + region.basicInvalidate(event); + + createSearchLoad(); + + KeyInfo ki = new KeyInfo(event.getKey(), null, null); + region.findObjectInSystem(ki, false, null, false, null, false, false, null, event, false); + assertNotNull("ClientEvent version tag is not set with region version tag.", + event.getVersionTag()); + } + + @Test + public void 
testClientEventIsUpdatedWithCurrentEntryVersionTagAfterSearchConcurrencyException() { + DistributedRegion region = prepare(true); + + EntryEventImpl event = createDummyEvent(region); + region.basicInvalidate(event); + + VersionTag tag = createVersionTag(true); + RegionEntry re = mock(RegionEntry.class); + VersionStamp stamp = mock(VersionStamp.class); + + doReturn(re).when(region).getRegionEntry(any()); + when(re.getVersionStamp()).thenReturn(stamp); + when(stamp.asVersionTag()).thenReturn(tag); + + createSearchLoad(); + doThrow(new ConcurrentCacheModificationException()).when(region) + .basicPutEntry(any(EntryEventImpl.class), anyLong()); + + KeyInfo ki = new KeyInfo(event.getKey(), null, null); + region.findObjectInSystem(ki, false, null, false, null, false, false, null, event, false); + assertNotNull("ClientEvent version tag is not set with region version tag.", + event.getVersionTag()); + } + +} +