Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-1466 d26b03cf8 -> d7121b5a9 (forced update)


GEODE-1962: Increment notQueuedConflated stat if object unresolved from offheap


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/280d2d8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/280d2d8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/280d2d8f

Branch: refs/heads/feature/GEODE-1466
Commit: 280d2d8f70ed2b0ae533b2c2d0728130133927ce
Parents: b2d85b4
Author: Jason Huynh <huyn...@gmail.com>
Authored: Wed Oct 5 13:32:19 2016 -0700
Committer: Jason Huynh <huyn...@gmail.com>
Committed: Tue Oct 11 11:08:28 2016 -0700

----------------------------------------------------------------------
 .../parallel/ParallelGatewaySenderQueue.java    |  35 +++--
 .../ParallelGatewaySenderQueueJUnitTest.java    | 134 ++++++++++++++++++-
 2 files changed, 149 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/280d2d8f/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 1a9b126..04b6c91 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -993,7 +993,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     return null;
   }
 
-  private boolean areLocalBucketQueueRegionsPresent() {
+  protected boolean areLocalBucketQueueRegionsPresent() {
     boolean bucketsAvailable = false;
     for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
       if (prQ.getDataStore().getAllLocalBucketRegions().size() > 0)
@@ -1002,10 +1002,6 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     return false;
   }
   
-  private boolean areLocalBucketQueueRegionsPresent(PartitionedRegion prQ) {
-    return prQ.getDataStore().isLocalBucketRegionPresent();
-  }
-  
   private int pickBucketId;
   
   protected int getRandomPrimaryBucket(PartitionedRegion prQ) {
@@ -1032,8 +1028,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
       while(nTry-- > 0) {
         if(pickBucketId >= thisProcessorBuckets.size())
           pickBucketId = 0;
-        BucketRegionQueue br = (BucketRegionQueue)prQ.getDataStore()
-            .getLocalBucketById(thisProcessorBuckets.get(pickBucketId++));
+        BucketRegionQueue br = getBucketRegionQueueByBucketId(prQ, 
thisProcessorBuckets.get(pickBucketId++));
         if (br != null && br.isReadyForPeek()) {
           return br.getId();
         }
@@ -1044,7 +1039,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
       /*Collections.shuffle(thisProcessorBuckets);
       for (Integer bucketId : thisProcessorBuckets) {
         BucketRegionQueue br = (BucketRegionQueue)prQ.getDataStore()
-            .getLocalBucketById(bucketId);
+            .getBucketRegionQueueByBucketId(bucketId);
         
         if (br != null && br.isReadyForPeek()) {
           return br.getId();
@@ -1119,8 +1114,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId)
         .isPrimary();
     if (isPrimary) {
-      BucketRegionQueue brq = (BucketRegionQueue)prQ.getDataStore()
-          .getLocalBucketById(bucketId);
+      BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId);
       // TODO : Kishor : Make sure we dont need to initalize a bucket
       // before destroying a key from it
       try {
@@ -1303,6 +1297,9 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
         if (object != null) {
           GatewaySenderEventImpl copy = object.makeHeapCopyIfOffHeap();
           if (copy == null) {
+            if (stats != null) {
+              stats.incEventsNotQueuedConflated();
+            }
             continue;
           }
           object = copy;
@@ -1314,11 +1311,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
           }
           batch.add(object);
           peekedEvents.add(object);
-          BucketRegionQueue brq = ((BucketRegionQueue)prQ
-              .getDataStore().getLocalBucketById(bId));
-          
-          //brq.doLockForPrimary(false);
-          
+
         } else {
           // If time to wait is -1 (don't wait) or time interval has elapsed
           long currentTime = System.currentTimeMillis();
@@ -1452,8 +1445,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
 
   protected Object peekAhead(PartitionedRegion prQ, int bucketId) throws 
CacheException {
     Object object = null;
-    BucketRegionQueue brq = ((BucketRegionQueue)prQ
-        .getDataStore().getLocalBucketById(bucketId));
+    BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId);
 
     if (logger.isDebugEnabled()) {
       logger.debug("{}: Peekahead for the bucket {}",this, bucketId);
@@ -1475,8 +1467,13 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     }
     return object; // OFFHEAP: ok since callers are careful to do destroys on 
region queue after finished with peeked object.
   }
-  
-  
+
+  protected BucketRegionQueue getBucketRegionQueueByBucketId(final 
PartitionedRegion prQ, final int bucketId) {
+    return (BucketRegionQueue)prQ
+        .getDataStore().getLocalBucketById(bucketId);
+  }
+
+
   public int localSize() {
     int size = 0;
     for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/280d2d8f/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
index f1d4408..4beace5 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
@@ -19,20 +19,35 @@ package org.apache.geode.internal.cache.wan.parallel;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
+import org.apache.geode.internal.cache.BucketRegionQueue;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.PartitionedRegionDataStore;
+import org.apache.geode.internal.cache.execute.BucketMovedException;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.GatewaySenderStats;
 import 
org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue.MetaRegionFactory;
 import 
org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue.ParallelGatewaySenderQueueMetaRegion;
 import org.apache.geode.test.junit.categories.UnitTest;
@@ -43,11 +58,12 @@ public class ParallelGatewaySenderQueueJUnitTest {
   private ParallelGatewaySenderQueue queue;
   private MetaRegionFactory metaRegionFactory;
   private GemFireCacheImpl cache;
+  private AbstractGatewaySender sender;
 
   @Before
   public void createParallelGatewaySenderQueue() {
     cache = mock(GemFireCacheImpl.class);
-    AbstractGatewaySender sender = mock(AbstractGatewaySender.class);
+    sender = mock(AbstractGatewaySender.class);
     CancelCriterion cancelCriterion = mock(CancelCriterion.class);
     when(sender.getCancelCriterion()).thenReturn(cancelCriterion);
     when(sender.getCache()).thenReturn(cache);
@@ -58,6 +74,54 @@ public class ParallelGatewaySenderQueueJUnitTest {
   }
 
   @Test
+  public void 
whenGatewayEventUnableToResolveFromOffHeapTheStatForNotQueuedConflatedShouldBeIncremented()
 throws Exception {
+    GatewaySenderStats stats = mockGatewaySenderStats();
+
+    GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
+    when(event.makeHeapCopyIfOffHeap()).thenReturn(null);
+    GatewaySenderEventImpl eventResolvesFromOffHeap = 
mock(GatewaySenderEventImpl.class);
+    
when(eventResolvesFromOffHeap.makeHeapCopyIfOffHeap()).thenReturn(eventResolvesFromOffHeap);
+    Queue backingList = new LinkedList();
+    backingList.add(event);
+    backingList.add(eventResolvesFromOffHeap);
+
+    BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList);
+
+    TestableParallelGatewaySenderQueue queue = new 
TestableParallelGatewaySenderQueue(sender, Collections.emptySet(), 0, 1, 
metaRegionFactory);
+    queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue);
+
+    List peeked = queue.peek(1, 1000);
+    assertEquals(1, peeked.size());
+    verify(stats, times(1)).incEventsNotQueuedConflated();
+  }
+
+  private GatewaySenderStats mockGatewaySenderStats() {
+    GatewaySenderStats stats = mock(GatewaySenderStats.class);
+    when(sender.getStatistics()).thenReturn(stats);
+    return stats;
+  }
+
+  @Test
+  public void 
whenNullPeekedEventFromBucketRegionQueueTheStatForNotQueuedConflatedShouldBeIncremented()
 throws Exception {
+    GatewaySenderStats stats = mockGatewaySenderStats();
+
+    GatewaySenderEventImpl eventResolvesFromOffHeap = 
mock(GatewaySenderEventImpl.class);
+    
when(eventResolvesFromOffHeap.makeHeapCopyIfOffHeap()).thenReturn(eventResolvesFromOffHeap);
+    Queue backingList = new LinkedList();
+    backingList.add(null);
+    backingList.add(eventResolvesFromOffHeap);
+
+    BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList);
+
+    TestableParallelGatewaySenderQueue queue = new 
TestableParallelGatewaySenderQueue(sender, Collections.emptySet(), 0, 1, 
metaRegionFactory);
+    queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue);
+
+    List peeked = queue.peek(1, 1000);
+    assertEquals(1, peeked.size());
+    verify(stats, times(1)).incEventsNotQueuedConflated();
+  }
+
+  @Test
   public void testLocalSize() throws Exception {
     ParallelGatewaySenderQueueMetaRegion mockMetaRegion = 
mock(ParallelGatewaySenderQueueMetaRegion.class);
     PartitionedRegionDataStore dataStore = 
mock(PartitionedRegionDataStore.class);
@@ -79,5 +143,73 @@ public class ParallelGatewaySenderQueueJUnitTest {
     when(region.getDataPolicy()).thenReturn(DataPolicy.PARTITION);
     return region;
   }
+  
+  private BucketRegionQueue mockBucketRegionQueue(final Queue backingList) {
+    PartitionedRegion mockBucketRegion = mockPR("bucketRegion");
+    //These next mocked return calls are for when peek is called.  It ends up 
checking these on the mocked pr region
+    when(mockBucketRegion.getLocalMaxMemory()).thenReturn(100);
+    when(mockBucketRegion.size()).thenReturn(backingList.size());
+
+    BucketRegionQueue bucketRegionQueue = mock(BucketRegionQueue.class);
+    when 
(bucketRegionQueue.getPartitionedRegion()).thenReturn(mockBucketRegion);
+    when(bucketRegionQueue.peek()).thenAnswer((Answer) invocation -> 
backingList.poll());
+    return bucketRegionQueue;
+  }
+
+
+
+  private class TestableParallelGatewaySenderQueue extends 
ParallelGatewaySenderQueue {
+
+    private BucketRegionQueue mockedAbstractBucketRegionQueue;
+
+    public TestableParallelGatewaySenderQueue(final AbstractGatewaySender 
sender,
+                                              final Set<Region> userRegions,
+                                              final int idx,
+                                              final int nDispatcher) {
+      super(sender, userRegions, idx, nDispatcher);
+    }
+
+    public TestableParallelGatewaySenderQueue(final AbstractGatewaySender 
sender,
+                                              final Set<Region> userRegions,
+                                              final int idx,
+                                              final int nDispatcher,
+                                              final MetaRegionFactory 
metaRegionFactory) {
+      super(sender, userRegions, idx, nDispatcher, metaRegionFactory);
+    }
+
+
+    public void setMockedAbstractBucketRegionQueue(BucketRegionQueue mocked) {
+      this.mockedAbstractBucketRegionQueue = mocked;
+    }
+
+    public AbstractBucketRegionQueue getBucketRegion(final PartitionedRegion 
prQ, final int bucketId) {
+      return mockedAbstractBucketRegionQueue;
+    }
+
+    @Override
+    public boolean areLocalBucketQueueRegionsPresent() {
+      return true;
+    }
+
+    @Override
+    protected PartitionedRegion getRandomShadowPR() {
+      return mockedAbstractBucketRegionQueue.getPartitionedRegion();
+    }
+
+    @Override
+    protected int getRandomPrimaryBucket(PartitionedRegion pr) {
+      return 0;
+    }
+
+    @Override
+    protected BucketRegionQueue 
getBucketRegionQueueByBucketId(PartitionedRegion prQ, int bucketId) {
+      return mockedAbstractBucketRegionQueue;
+    }
+
+//    @Override
+//    public int localSizeForProcessor() {
+//      return 1;
+//    }
+  }
 
 }

Reply via email to