DonalEvans commented on a change in pull request #6232:
URL: https://github.com/apache/geode/pull/6232#discussion_r604450874



##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -854,50 +854,80 @@ private SelectResults buildSortedResult(CompiledSelect 
cs, int limit) throws Que
    */
   private Map<InternalDistributedMember, List<Integer>> 
buildNodeToBucketMapForBuckets(
       final Set<Integer> bucketIdsToConsider) throws QueryException {
-
-    final HashMap<InternalDistributedMember, List<Integer>> ret =
-        new HashMap<InternalDistributedMember, List<Integer>>();
-
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new 
HashMap();

Review comment:
       The compiler warning here can be fixed by using `new HashMap<>();`

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -854,50 +854,80 @@ private SelectResults buildSortedResult(CompiledSelect 
cs, int limit) throws Que
    */
   private Map<InternalDistributedMember, List<Integer>> 
buildNodeToBucketMapForBuckets(
       final Set<Integer> bucketIdsToConsider) throws QueryException {
-
-    final HashMap<InternalDistributedMember, List<Integer>> ret =
-        new HashMap<InternalDistributedMember, List<Integer>>();
-
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new 
HashMap();
     if (bucketIdsToConsider.isEmpty()) {
       return ret;
     }
+    List<Integer> bucketIds;
+    if (query != null && query.isCqQuery()) {
+      bucketIds = findPrimaryBucketOwners(bucketIdsToConsider, ret);
+    } else {
+      bucketIds = findBucketOwners(bucketIdsToConsider, ret);
+    }
+    if (bucketIds.size() != bucketIdsToConsider.size()) {
+      bucketIdsToConsider.removeAll(bucketIds);
+      throw new QueryException("Data loss detected, unable to find the hosting 
"
+          + " node for some of the dataset. [dataset/bucket ids:" + 
bucketIdsToConsider + "]");
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Node to bucketId map: {}", ret);
+    }
+    return ret;
+  }
 
-    final List<Integer> bucketIds = new ArrayList<Integer>();
+  private List<Integer> findPrimaryBucketOwners(Set<Integer> 
bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    int retry = 3;
+    for (int i = 0; i < retry && (bucketIds.size() != 
bucketIdsToConsider.size()); i++) {
+      bucketIds.clear();
+      for (Integer bucketId : bucketIdsToConsider) {
+        InternalDistributedMember primary = getPrimaryBucketOwner(bucketId);
+        if (primary != null) {
+          bucketIds.add(bucketId);
+          if (nodeToBucketMap.get(primary) != null) {
+            nodeToBucketMap.get(primary).add(bucketId);
+          } else {
+            List<Integer> bids = new ArrayList<>();
+            bids.add(bucketId);
+            nodeToBucketMap.put(primary, bids);
+          }
+        }
+      }
+    }
+    return bucketIds;
+  }
+
+  private List<Integer> findBucketOwners(final Set<Integer> 
bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();

Review comment:
       Compiler warning here can be fixed by using `new ArrayList<>();`

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -854,50 +854,80 @@ private SelectResults buildSortedResult(CompiledSelect 
cs, int limit) throws Que
    */
   private Map<InternalDistributedMember, List<Integer>> 
buildNodeToBucketMapForBuckets(
       final Set<Integer> bucketIdsToConsider) throws QueryException {
-
-    final HashMap<InternalDistributedMember, List<Integer>> ret =
-        new HashMap<InternalDistributedMember, List<Integer>>();
-
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new 
HashMap();
     if (bucketIdsToConsider.isEmpty()) {
       return ret;
     }
+    List<Integer> bucketIds;
+    if (query != null && query.isCqQuery()) {
+      bucketIds = findPrimaryBucketOwners(bucketIdsToConsider, ret);
+    } else {
+      bucketIds = findBucketOwners(bucketIdsToConsider, ret);
+    }
+    if (bucketIds.size() != bucketIdsToConsider.size()) {
+      bucketIdsToConsider.removeAll(bucketIds);
+      throw new QueryException("Data loss detected, unable to find the hosting 
"
+          + " node for some of the dataset. [dataset/bucket ids:" + 
bucketIdsToConsider + "]");
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Node to bucketId map: {}", ret);
+    }
+    return ret;
+  }
 
-    final List<Integer> bucketIds = new ArrayList<Integer>();
+  private List<Integer> findPrimaryBucketOwners(Set<Integer> 
bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    int retry = 3;
+    for (int i = 0; i < retry && (bucketIds.size() != 
bucketIdsToConsider.size()); i++) {
+      bucketIds.clear();
+      for (Integer bucketId : bucketIdsToConsider) {
+        InternalDistributedMember primary = getPrimaryBucketOwner(bucketId);
+        if (primary != null) {
+          bucketIds.add(bucketId);
+          if (nodeToBucketMap.get(primary) != null) {
+            nodeToBucketMap.get(primary).add(bucketId);
+          } else {
+            List<Integer> bids = new ArrayList<>();
+            bids.add(bucketId);
+            nodeToBucketMap.put(primary, bids);
+          }
+        }
+      }
+    }
+    return bucketIds;
+  }
+
+  private List<Integer> findBucketOwners(final Set<Integer> 
bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    // Get all local buckets
     PartitionedRegionDataStore dataStore = this.pr.getDataStore();
-    final int totalBucketsToQuery = bucketIdsToConsider.size();
     if (dataStore != null) {
       for (Integer bid : bucketIdsToConsider) {
         if (dataStore.isManagingBucket(bid)) {
           bucketIds.add(Integer.valueOf(bid));
         }
       }
       if (bucketIds.size() > 0) {
-        ret.put(pr.getMyId(), new ArrayList(bucketIds));
+        List<Integer> localBucketIds = new ArrayList(bucketIds);
+        nodeToBucketMap.put(pr.getMyId(), localBucketIds);
         // All the buckets are hosted locally.
-        if (bucketIds.size() == totalBucketsToQuery) {
-          return ret;
+        if (localBucketIds.size() == bucketIdsToConsider.size()) {
+          return bucketIds;
         }
       }
     }
-
-    final List allNodes = getAllNodes(this.pr.getRegionAdvisor());
-    /*
-     * for(Map.Entry<InternalDistributedMember, Collection<Collection>> entry :
-     * resultsPerMember.entrySet()) { InternalDistributedMember member = 
entry.getKey();
-     * TaintableArrayList list = entry.getValue(); if(list.isTainted()) {
-     * taintedMembers.add(member); } }
-     */
-
+    final List<InternalDistributedMember> allNodes = 
getAllNodes(this.pr.getRegionAdvisor());
     // Put the failed members on the end of the list.
     if (failedMembers != null && !failedMembers.isEmpty()) {
       allNodes.removeAll(failedMembers);
       allNodes.addAll(failedMembers);
     }
-
+    int totalBucketsToQuery = bucketIdsToConsider.size();
     for (Iterator dsItr = allNodes.iterator(); dsItr.hasNext()
         && (bucketIds.size() < totalBucketsToQuery);) {
       InternalDistributedMember nd = (InternalDistributedMember) dsItr.next();
-
       final List<Integer> buckets = new ArrayList<Integer>();

Review comment:
       This can be just `new ArrayList<>();`

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -854,50 +854,80 @@ private SelectResults buildSortedResult(CompiledSelect 
cs, int limit) throws Que
    */
   private Map<InternalDistributedMember, List<Integer>> 
buildNodeToBucketMapForBuckets(
       final Set<Integer> bucketIdsToConsider) throws QueryException {
-
-    final HashMap<InternalDistributedMember, List<Integer>> ret =
-        new HashMap<InternalDistributedMember, List<Integer>>();
-
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new 
HashMap();
     if (bucketIdsToConsider.isEmpty()) {
       return ret;
     }
+    List<Integer> bucketIds;
+    if (query != null && query.isCqQuery()) {
+      bucketIds = findPrimaryBucketOwners(bucketIdsToConsider, ret);
+    } else {
+      bucketIds = findBucketOwners(bucketIdsToConsider, ret);
+    }
+    if (bucketIds.size() != bucketIdsToConsider.size()) {
+      bucketIdsToConsider.removeAll(bucketIds);
+      throw new QueryException("Data loss detected, unable to find the hosting 
"
+          + " node for some of the dataset. [dataset/bucket ids:" + 
bucketIdsToConsider + "]");
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Node to bucketId map: {}", ret);
+    }
+    return ret;
+  }
 
-    final List<Integer> bucketIds = new ArrayList<Integer>();
+  private List<Integer> findPrimaryBucketOwners(Set<Integer> 
bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    int retry = 3;
+    for (int i = 0; i < retry && (bucketIds.size() != 
bucketIdsToConsider.size()); i++) {
+      bucketIds.clear();
+      for (Integer bucketId : bucketIdsToConsider) {
+        InternalDistributedMember primary = getPrimaryBucketOwner(bucketId);
+        if (primary != null) {
+          bucketIds.add(bucketId);
+          if (nodeToBucketMap.get(primary) != null) {
+            nodeToBucketMap.get(primary).add(bucketId);
+          } else {
+            List<Integer> bids = new ArrayList<>();
+            bids.add(bucketId);
+            nodeToBucketMap.put(primary, bids);
+          }
+        }
+      }
+    }
+    return bucketIds;
+  }
+
+  private List<Integer> findBucketOwners(final Set<Integer> 
bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    // Get all local buckets
     PartitionedRegionDataStore dataStore = this.pr.getDataStore();
-    final int totalBucketsToQuery = bucketIdsToConsider.size();
     if (dataStore != null) {
       for (Integer bid : bucketIdsToConsider) {
         if (dataStore.isManagingBucket(bid)) {
           bucketIds.add(Integer.valueOf(bid));

Review comment:
       This can be just `bucketIds.add(bid);`

##########
File path: 
geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXState;
+import org.apache.geode.internal.cache.TXStateInterface;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category(ClientSubscriptionTest.class)
+public class PartitionedRegionTxDUnitTest implements Serializable {
+  private final String REGION_NAME = "region";
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private ClientVM client;
+
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Test
+  public void verifyCqRegistrationWorksDuringTxCommit() throws Exception {
+    blackboard.setMailbox("CqQueryResultCount", 0);
+    blackboard.setMailbox("CqEvents", 0);
+
+    Properties properties = new Properties();
+    locator = clusterStartupRule.startLocatorVM(0, new Properties());
+    server1 = clusterStartupRule.startServerVM(1, properties, 
locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, properties, 
locator.getPort());
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> 
cacheRule.withServerConnection(server2.getPort()).withPoolSubscription(true));
+
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      Region<Object, Object> region =
+          
cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+              new 
PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+              .create(REGION_NAME);
+
+      // Force primary bucket to get created.
+      region.put("Key-1", "value-1");
+      region.destroy("Key-1");
+    });
+
+    server2.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      
cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+          new 
PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+          .create(REGION_NAME);
+    });
+
+    AsyncInvocation<?> serverAsync = server1.invokeAsync(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      TXManagerImpl txManager = (TXManagerImpl) 
cache.getCacheTransactionManager();
+      txManager.begin();
+
+      TXStateInterface txState =
+          ((TXStateProxyImpl) txManager.getTXState()).getRealDeal(null, null);
+
+      ((TXState) txState).setDuringApplyChanges(() -> {
+        try {
+          blackboard.signalGate("StartCQ");
+          blackboard.waitForGate("RegistrationFinished");
+        } catch (TimeoutException | InterruptedException e) {
+          // Do nothing
+        }
+      });
+
+      cache.getRegion(REGION_NAME).put("Key-1", "value-1");
+      txManager.commit();
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      
clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(REGION_NAME);
+
+      QueryService queryService = clientCache.getQueryService();
+      CqAttributesFactory cqaf = new CqAttributesFactory();
+      TestCqListener testListener = new TestCqListener();
+      cqaf.addCqListener(testListener);
+      CqAttributes cqAttributes = cqaf.create();
+
+      blackboard.waitForGate("StartCQ");
+      SelectResults cqResults = queryService
+          .newCq("Select * from " + SEPARATOR + REGION_NAME, cqAttributes)
+          .executeWithInitialResults();
+      blackboard.signalGate("RegistrationFinished");
+      blackboard.setMailbox("CqQueryResultCount", cqResults.asList().size());
+    });
+
+    GeodeAwaitility.await().untilAsserted(() -> {
+      Integer CqQueryResultCount = blackboard.getMailbox("CqQueryResultCount");
+      Integer CqEvents = blackboard.getMailbox("CqEvents");
+      assertThat(CqQueryResultCount + CqEvents).isEqualTo(1);
+    });
+
+    serverAsync.await();
+  }
+
+  @Test
+  public void verifyCqEventInvocationIfTxCommitFromClient() throws Exception {
+    blackboard.setMailbox("CqEvents", 0);
+
+    Properties properties = new Properties();
+    locator = clusterStartupRule.startLocatorVM(0, new Properties());
+    server1 = clusterStartupRule.startServerVM(1, properties, 
locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, properties, 
locator.getPort());
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> 
cacheRule.withServerConnection(server1.getPort()).withPoolSubscription(true));
+
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      Region<Object, Object> region =
+          
cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+              new 
PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+              .create(REGION_NAME);
+
+      // Force primary bucket to get created.
+      region.put("Key-1", "value-1");
+    });
+
+    server2.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      
cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+          new 
PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+          .create(REGION_NAME);
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      
clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(REGION_NAME);
+
+      QueryService queryService = clientCache.getQueryService();
+      CqAttributesFactory cqaf = new CqAttributesFactory();
+      TestCqListener testListener = new TestCqListener();
+      cqaf.addCqListener(testListener);
+      CqAttributes cqAttributes = cqaf.create();
+
+      queryService.newCq("Select * from " + SEPARATOR + REGION_NAME, 
cqAttributes)
+          .executeWithInitialResults();
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      TXManagerImpl txManager = (TXManagerImpl) 
clientCache.getCacheTransactionManager();
+      txManager.begin();
+
+      clientCache.getRegion(REGION_NAME).destroy("Key-1");
+      txManager.commit();
+    });
+
+    GeodeAwaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {

Review comment:
       It's recommended to use the default timeout for `await()` unless there 
is a specific timeout that's being tested as using shorter values can make 
flaky test failures more likely.

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -854,50 +854,80 @@ private SelectResults buildSortedResult(CompiledSelect 
cs, int limit) throws Que
    */
   private Map<InternalDistributedMember, List<Integer>> 
buildNodeToBucketMapForBuckets(
       final Set<Integer> bucketIdsToConsider) throws QueryException {
-
-    final HashMap<InternalDistributedMember, List<Integer>> ret =
-        new HashMap<InternalDistributedMember, List<Integer>>();
-
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new 
HashMap();
     if (bucketIdsToConsider.isEmpty()) {
       return ret;
     }
+    List<Integer> bucketIds;
+    if (query != null && query.isCqQuery()) {
+      bucketIds = findPrimaryBucketOwners(bucketIdsToConsider, ret);
+    } else {
+      bucketIds = findBucketOwners(bucketIdsToConsider, ret);
+    }
+    if (bucketIds.size() != bucketIdsToConsider.size()) {
+      bucketIdsToConsider.removeAll(bucketIds);
+      throw new QueryException("Data loss detected, unable to find the hosting 
"
+          + " node for some of the dataset. [dataset/bucket ids:" + 
bucketIdsToConsider + "]");
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Node to bucketId map: {}", ret);
+    }
+    return ret;
+  }
 
-    final List<Integer> bucketIds = new ArrayList<Integer>();
+  private List<Integer> findPrimaryBucketOwners(Set<Integer> 
bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    int retry = 3;
+    for (int i = 0; i < retry && (bucketIds.size() != 
bucketIdsToConsider.size()); i++) {
+      bucketIds.clear();
+      for (Integer bucketId : bucketIdsToConsider) {
+        InternalDistributedMember primary = getPrimaryBucketOwner(bucketId);
+        if (primary != null) {
+          bucketIds.add(bucketId);
+          if (nodeToBucketMap.get(primary) != null) {
+            nodeToBucketMap.get(primary).add(bucketId);
+          } else {
+            List<Integer> bids = new ArrayList<>();
+            bids.add(bucketId);
+            nodeToBucketMap.put(primary, bids);
+          }
+        }
+      }
+    }
+    return bucketIds;
+  }
+
+  private List<Integer> findBucketOwners(final Set<Integer> 
bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    // Get all local buckets
     PartitionedRegionDataStore dataStore = this.pr.getDataStore();
-    final int totalBucketsToQuery = bucketIdsToConsider.size();
     if (dataStore != null) {
       for (Integer bid : bucketIdsToConsider) {
         if (dataStore.isManagingBucket(bid)) {
           bucketIds.add(Integer.valueOf(bid));
         }
       }
       if (bucketIds.size() > 0) {
-        ret.put(pr.getMyId(), new ArrayList(bucketIds));
+        List<Integer> localBucketIds = new ArrayList(bucketIds);
+        nodeToBucketMap.put(pr.getMyId(), localBucketIds);
         // All the buckets are hosted locally.
-        if (bucketIds.size() == totalBucketsToQuery) {
-          return ret;
+        if (localBucketIds.size() == bucketIdsToConsider.size()) {
+          return bucketIds;
         }
       }
     }
-
-    final List allNodes = getAllNodes(this.pr.getRegionAdvisor());
-    /*
-     * for(Map.Entry<InternalDistributedMember, Collection<Collection>> entry :
-     * resultsPerMember.entrySet()) { InternalDistributedMember member = 
entry.getKey();
-     * TaintableArrayList list = entry.getValue(); if(list.isTainted()) {
-     * taintedMembers.add(member); } }
-     */
-
+    final List<InternalDistributedMember> allNodes = 
getAllNodes(this.pr.getRegionAdvisor());
     // Put the failed members on the end of the list.
     if (failedMembers != null && !failedMembers.isEmpty()) {
       allNodes.removeAll(failedMembers);
       allNodes.addAll(failedMembers);
     }
-
+    int totalBucketsToQuery = bucketIdsToConsider.size();
     for (Iterator dsItr = allNodes.iterator(); dsItr.hasNext()

Review comment:
       Compiler warning here can be fixed by using 
`Iterator<InternalDistributedMember>`

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -854,50 +854,80 @@ private SelectResults buildSortedResult(CompiledSelect 
cs, int limit) throws Que
    */
   private Map<InternalDistributedMember, List<Integer>> 
buildNodeToBucketMapForBuckets(
       final Set<Integer> bucketIdsToConsider) throws QueryException {
-
-    final HashMap<InternalDistributedMember, List<Integer>> ret =
-        new HashMap<InternalDistributedMember, List<Integer>>();
-
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new 
HashMap();
     if (bucketIdsToConsider.isEmpty()) {
       return ret;
     }
+    List<Integer> bucketIds;
+    if (query != null && query.isCqQuery()) {
+      bucketIds = findPrimaryBucketOwners(bucketIdsToConsider, ret);
+    } else {
+      bucketIds = findBucketOwners(bucketIdsToConsider, ret);
+    }
+    if (bucketIds.size() != bucketIdsToConsider.size()) {
+      bucketIdsToConsider.removeAll(bucketIds);
+      throw new QueryException("Data loss detected, unable to find the hosting 
"
+          + " node for some of the dataset. [dataset/bucket ids:" + 
bucketIdsToConsider + "]");
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Node to bucketId map: {}", ret);
+    }
+    return ret;
+  }
 
-    final List<Integer> bucketIds = new ArrayList<Integer>();
+  private List<Integer> findPrimaryBucketOwners(Set<Integer> 
bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    int retry = 3;
+    for (int i = 0; i < retry && (bucketIds.size() != 
bucketIdsToConsider.size()); i++) {
+      bucketIds.clear();
+      for (Integer bucketId : bucketIdsToConsider) {
+        InternalDistributedMember primary = getPrimaryBucketOwner(bucketId);
+        if (primary != null) {
+          bucketIds.add(bucketId);
+          if (nodeToBucketMap.get(primary) != null) {
+            nodeToBucketMap.get(primary).add(bucketId);
+          } else {
+            List<Integer> bids = new ArrayList<>();
+            bids.add(bucketId);
+            nodeToBucketMap.put(primary, bids);
+          }
+        }
+      }
+    }
+    return bucketIds;
+  }
+
+  private List<Integer> findBucketOwners(final Set<Integer> 
bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    // Get all local buckets
     PartitionedRegionDataStore dataStore = this.pr.getDataStore();
-    final int totalBucketsToQuery = bucketIdsToConsider.size();
     if (dataStore != null) {
       for (Integer bid : bucketIdsToConsider) {
         if (dataStore.isManagingBucket(bid)) {
           bucketIds.add(Integer.valueOf(bid));
         }
       }
       if (bucketIds.size() > 0) {
-        ret.put(pr.getMyId(), new ArrayList(bucketIds));
+        List<Integer> localBucketIds = new ArrayList(bucketIds);

Review comment:
       Compiler warning here can be fixed by using `new ArrayList<>(bucketIds);`

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -854,50 +854,80 @@ private SelectResults buildSortedResult(CompiledSelect 
cs, int limit) throws Que
    */
   private Map<InternalDistributedMember, List<Integer>> 
buildNodeToBucketMapForBuckets(
       final Set<Integer> bucketIdsToConsider) throws QueryException {
-
-    final HashMap<InternalDistributedMember, List<Integer>> ret =
-        new HashMap<InternalDistributedMember, List<Integer>>();
-
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new 
HashMap();
     if (bucketIdsToConsider.isEmpty()) {
       return ret;
     }
+    List<Integer> bucketIds;
+    if (query != null && query.isCqQuery()) {
+      bucketIds = findPrimaryBucketOwners(bucketIdsToConsider, ret);
+    } else {
+      bucketIds = findBucketOwners(bucketIdsToConsider, ret);
+    }
+    if (bucketIds.size() != bucketIdsToConsider.size()) {
+      bucketIdsToConsider.removeAll(bucketIds);
+      throw new QueryException("Data loss detected, unable to find the hosting 
"
+          + " node for some of the dataset. [dataset/bucket ids:" + 
bucketIdsToConsider + "]");
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Node to bucketId map: {}", ret);
+    }
+    return ret;
+  }
 
-    final List<Integer> bucketIds = new ArrayList<Integer>();
+  private List<Integer> findPrimaryBucketOwners(Set<Integer> 
bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();

Review comment:
       Compiler warning here can be fixed by using `new ArrayList<>();`

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -909,28 +939,22 @@ private SelectResults buildSortedResult(CompiledSelect 
cs, int limit) throws Que
         }
       }
       if (!buckets.isEmpty()) {
-        ret.put(nd, buckets);
+        nodeToBucketMap.put(nd, buckets);
       }
     }
+    return bucketIds;
+  }
 
-    if (bucketIds.size() != totalBucketsToQuery) {
-      bucketIdsToConsider.removeAll(bucketIds);
-      throw new QueryException("Data loss detected, unable to find the hosting 
"
-          + " node for some of the dataset. [dataset/bucket ids:" + 
bucketIdsToConsider + "]");
-    }
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("Node to bucketId map: {}", ret);
-    }
-    return ret;
+  private InternalDistributedMember getPrimaryBucketOwner(Integer bid) {
+    return pr.getBucketPrimary(bid.intValue());
   }
 
   protected Set<InternalDistributedMember> getBucketOwners(Integer bid) {
     return pr.getRegionAdvisor().getBucketOwners(bid.intValue());
   }

Review comment:
       These methods are both one line and only used once in this class and 
nowhere else. Can they just be used inline? Also, it's not necessary to call 
`.intValue()` on `bid` here.

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -854,50 +854,80 @@ private SelectResults buildSortedResult(CompiledSelect 
cs, int limit) throws Que
    */
   private Map<InternalDistributedMember, List<Integer>> 
buildNodeToBucketMapForBuckets(
       final Set<Integer> bucketIdsToConsider) throws QueryException {
-
-    final HashMap<InternalDistributedMember, List<Integer>> ret =
-        new HashMap<InternalDistributedMember, List<Integer>>();
-
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new 
HashMap();
     if (bucketIdsToConsider.isEmpty()) {
       return ret;
     }
+    List<Integer> bucketIds;
+    if (query != null && query.isCqQuery()) {
+      bucketIds = findPrimaryBucketOwners(bucketIdsToConsider, ret);
+    } else {
+      bucketIds = findBucketOwners(bucketIdsToConsider, ret);
+    }
+    if (bucketIds.size() != bucketIdsToConsider.size()) {
+      bucketIdsToConsider.removeAll(bucketIds);
+      throw new QueryException("Data loss detected, unable to find the hosting 
"
+          + " node for some of the dataset. [dataset/bucket ids:" + 
bucketIdsToConsider + "]");
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Node to bucketId map: {}", ret);
+    }
+    return ret;
+  }
 
-    final List<Integer> bucketIds = new ArrayList<Integer>();
+  private List<Integer> findPrimaryBucketOwners(Set<Integer> 
bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    int retry = 3;
+    for (int i = 0; i < retry && (bucketIds.size() != 
bucketIdsToConsider.size()); i++) {
+      bucketIds.clear();
+      for (Integer bucketId : bucketIdsToConsider) {
+        InternalDistributedMember primary = getPrimaryBucketOwner(bucketId);
+        if (primary != null) {
+          bucketIds.add(bucketId);
+          if (nodeToBucketMap.get(primary) != null) {
+            nodeToBucketMap.get(primary).add(bucketId);
+          } else {
+            List<Integer> bids = new ArrayList<>();
+            bids.add(bucketId);
+            nodeToBucketMap.put(primary, bids);
+          }
+        }
+      }
+    }
+    return bucketIds;
+  }
+
+  private List<Integer> findBucketOwners(final Set<Integer> 
bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    // Get all local buckets
     PartitionedRegionDataStore dataStore = this.pr.getDataStore();
-    final int totalBucketsToQuery = bucketIdsToConsider.size();
     if (dataStore != null) {
       for (Integer bid : bucketIdsToConsider) {
         if (dataStore.isManagingBucket(bid)) {
           bucketIds.add(Integer.valueOf(bid));
         }
       }
       if (bucketIds.size() > 0) {
-        ret.put(pr.getMyId(), new ArrayList(bucketIds));
+        List<Integer> localBucketIds = new ArrayList(bucketIds);
+        nodeToBucketMap.put(pr.getMyId(), localBucketIds);
         // All the buckets are hosted locally.
-        if (bucketIds.size() == totalBucketsToQuery) {
-          return ret;
+        if (localBucketIds.size() == bucketIdsToConsider.size()) {
+          return bucketIds;
         }
       }
     }
-
-    final List allNodes = getAllNodes(this.pr.getRegionAdvisor());
-    /*
-     * for(Map.Entry<InternalDistributedMember, Collection<Collection>> entry :
-     * resultsPerMember.entrySet()) { InternalDistributedMember member = 
entry.getKey();
-     * TaintableArrayList list = entry.getValue(); if(list.isTainted()) {
-     * taintedMembers.add(member); } }
-     */
-
+    final List<InternalDistributedMember> allNodes = 
getAllNodes(this.pr.getRegionAdvisor());
     // Put the failed members on the end of the list.
     if (failedMembers != null && !failedMembers.isEmpty()) {
       allNodes.removeAll(failedMembers);
       allNodes.addAll(failedMembers);
     }
-
+    int totalBucketsToQuery = bucketIdsToConsider.size();
     for (Iterator dsItr = allNodes.iterator(); dsItr.hasNext()
         && (bucketIds.size() < totalBucketsToQuery);) {
       InternalDistributedMember nd = (InternalDistributedMember) dsItr.next();

Review comment:
       If the change on line 928 is applied, this cast is redundant.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
##########
@@ -92,6 +98,26 @@ public void doAfterCompletionThrowsIfCommitFails() {
         .isSameAs(transactionDataNodeHasDepartedException);
   }
 
+  @Test
+  public void attacheFilterProfileAfterApplyingChagnes() {
+    TXState txState = spy(new TXState(txStateProxy, false, disabledClock()));
+    ArrayList entries = mock(ArrayList.class);
+    doReturn(entries).when(txState).generateEventOffsets();
+    doNothing().when(txState).attachFilterProfileInformation(entries);
+    doNothing().when(txState).applyChanges(entries);

Review comment:
       It's generally best not to mock collections, especially when we don't 
care about the contents, so this could be changed to:
   ```suggestion
       doReturn(new ArrayList<>()).when(txState).generateEventOffsets();
       doNothing().when(txState).attachFilterProfileInformation(any());
       doNothing().when(txState).applyChanges(any());
   ```

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
##########
@@ -605,6 +604,18 @@ protected void attachFilterProfileInformation(List 
entries) {
               o.es.setFilterRoutingInfo(fri);
               Set set = bucket.getAdjunctReceivers(ev, Collections.EMPTY_SET, 
new HashSet(), fri);
               o.es.setAdjunctRecipients(set);
+
+              if (o.es.getPendingCallback() != null) {
+                if (fri != null) {
+                  // For tx host, local filter info was also calculated.
+                  // Set this local filter info in corresponding pending 
callback so that
+                  // notifyBridgeClient has correct routing info.
+                  FilterRoutingInfo.FilterInfo localRouting = 
fri.getLocalFilterInfo();
+                  o.es.getPendingCallback().setLocalFilterInfo(localRouting);
+                }
+                // do not hold pending callback reference after setting local 
routing.
+                o.es.setPendingCallback(null);

Review comment:
       It's possible at this point that `fri` was null and so no local routing 
was set, but the pending callback reference is still set to null. Is this 
correct? If so, the comment could be changed a little to make this clear.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
##########
@@ -239,6 +244,80 @@ public void testGetAllNodesShouldBeRandomized() {
         .until(() -> !(bucketList.equals(prqe.getAllNodes(regionAdvisor))));
   }
 
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQuery() throws 
Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);

Review comment:
       Compiler warnings can be fixed here and in the other added test cases by 
using `Set<Integer> bucketSet`

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
##########
@@ -239,6 +244,80 @@ public void testGetAllNodesShouldBeRandomized() {
         .until(() -> !(bucketList.equals(prqe.getAllNodes(regionAdvisor))));
   }
 
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQuery() throws 
Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, 
mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    for (Integer bid : bucketList) {
+      if (bid % 2 == 0) {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeB);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = 
prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(2);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(5);
+    assertThat(bnMap.get(remoteNodeB).size()).isEqualTo(5);
+  }
+
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQueryWithRetry() throws 
Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        spy(new PartitionedRegionQueryEvaluator(system, pr, query, 
mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet));
+
+    for (Integer bid : bucketList) {
+      if (bid == 1) {
+        
when(prqe.getPrimaryBucketOwner(bid)).thenReturn(null).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = 
prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(1);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(10);
+    // Called 3 times : 2 times in retry and once for setting the return value
+    verify(prqe, times(3)).getPrimaryBucketOwner(1);

Review comment:
       See above comment about using `doReturn()` with a spy.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
##########
@@ -239,6 +244,80 @@ public void testGetAllNodesShouldBeRandomized() {
         .until(() -> !(bucketList.equals(prqe.getAllNodes(regionAdvisor))));
   }
 
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQuery() throws 
Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, 
mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    for (Integer bid : bucketList) {
+      if (bid % 2 == 0) {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeB);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = 
prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(2);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(5);
+    assertThat(bnMap.get(remoteNodeB).size()).isEqualTo(5);
+  }
+
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQueryWithRetry() throws 
Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        spy(new PartitionedRegionQueryEvaluator(system, pr, query, 
mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet));
+
+    for (Integer bid : bucketList) {
+      if (bid == 1) {
+        
when(prqe.getPrimaryBucketOwner(bid)).thenReturn(null).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);

Review comment:
       Correct invocation counts can be had if instead of 
`when(x).thenReturn(y)` you use `doReturn(y).when(x)` when using a spy:
   ```suggestion
           
doReturn(null).doReturn(remoteNodeA).when(prqe).getPrimaryBucketOwner(bid);
         } else {
           doReturn(remoteNodeA).when(prqe).getPrimaryBucketOwner(bid);
   ```

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
##########
@@ -239,6 +244,80 @@ public void testGetAllNodesShouldBeRandomized() {
         .until(() -> !(bucketList.equals(prqe.getAllNodes(regionAdvisor))));
   }
 
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQuery() throws 
Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, 
mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    for (Integer bid : bucketList) {
+      if (bid % 2 == 0) {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeB);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = 
prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(2);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(5);
+    assertThat(bnMap.get(remoteNodeB).size()).isEqualTo(5);
+  }
+
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQueryWithRetry() throws 
Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        spy(new PartitionedRegionQueryEvaluator(system, pr, query, 
mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet));
+
+    for (Integer bid : bucketList) {
+      if (bid == 1) {

Review comment:
       Instead of hard-coding this bucketId, it might be good to extract it to 
a variable, since it's used in the verify step too.

##########
File path: 
geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXState;
+import org.apache.geode.internal.cache.TXStateInterface;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category(ClientSubscriptionTest.class)
+public class PartitionedRegionTxDUnitTest implements Serializable {
+  private final String REGION_NAME = "region";
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private ClientVM client;
+
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Test
+  public void verifyCqRegistrationWorksDuringTxCommit() throws Exception {
+    blackboard.setMailbox("CqQueryResultCount", 0);
+    blackboard.setMailbox("CqEvents", 0);
+
+    Properties properties = new Properties();
+    locator = clusterStartupRule.startLocatorVM(0, new Properties());
+    server1 = clusterStartupRule.startServerVM(1, properties, 
locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, properties, 
locator.getPort());
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> 
cacheRule.withServerConnection(server2.getPort()).withPoolSubscription(true));
+
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      Region<Object, Object> region =
+          
cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+              new 
PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())

Review comment:
       Compiler warning here and elsewhere in the class can be fixed by using 
`new PartitionAttributesFactory<>()`

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -909,28 +939,22 @@ private SelectResults buildSortedResult(CompiledSelect 
cs, int limit) throws Que
         }
       }
       if (!buckets.isEmpty()) {
-        ret.put(nd, buckets);
+        nodeToBucketMap.put(nd, buckets);
       }
     }
+    return bucketIds;
+  }
 
-    if (bucketIds.size() != totalBucketsToQuery) {
-      bucketIdsToConsider.removeAll(bucketIds);
-      throw new QueryException("Data loss detected, unable to find the hosting 
"
-          + " node for some of the dataset. [dataset/bucket ids:" + 
bucketIdsToConsider + "]");
-    }
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("Node to bucketId map: {}", ret);
-    }
-    return ret;
+  private InternalDistributedMember getPrimaryBucketOwner(Integer bid) {
+    return pr.getBucketPrimary(bid.intValue());
   }
 
   protected Set<InternalDistributedMember> getBucketOwners(Integer bid) {
     return pr.getRegionAdvisor().getBucketOwners(bid.intValue());
   }
 
-  protected ArrayList getAllNodes(RegionAdvisor regionAdvisor) {
-    ArrayList nodes = new ArrayList(regionAdvisor.adviseDataStore());
+  protected ArrayList<InternalDistributedMember> getAllNodes(RegionAdvisor 
regionAdvisor) {
+    ArrayList<InternalDistributedMember> nodes = new 
ArrayList(regionAdvisor.adviseDataStore());

Review comment:
       Compiler warning here can be fixed by using `new 
ArrayList<>(regionAdvisor.adviseDataStore());`

##########
File path: 
geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXState;
+import org.apache.geode.internal.cache.TXStateInterface;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category(ClientSubscriptionTest.class)
+public class PartitionedRegionTxDUnitTest implements Serializable {
+  private final String REGION_NAME = "region";
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private ClientVM client;
+
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Test
+  public void verifyCqRegistrationWorksDuringTxCommit() throws Exception {
+    blackboard.setMailbox("CqQueryResultCount", 0);
+    blackboard.setMailbox("CqEvents", 0);
+
+    Properties properties = new Properties();
+    locator = clusterStartupRule.startLocatorVM(0, new Properties());
+    server1 = clusterStartupRule.startServerVM(1, properties, 
locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, properties, 
locator.getPort());

Review comment:
       It is not necessary to pass a `Properties` as an argument to 
`startLocator()` or `startServer()`, so the variable can be removed here.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
##########
@@ -239,6 +244,80 @@ public void testGetAllNodesShouldBeRandomized() {
         .until(() -> !(bucketList.equals(prqe.getAllNodes(regionAdvisor))));
   }
 
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQuery() throws 
Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, 
mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    for (Integer bid : bucketList) {
+      if (bid % 2 == 0) {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeB);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = 
prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(2);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(5);
+    assertThat(bnMap.get(remoteNodeB).size()).isEqualTo(5);
+  }
+
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQueryWithRetry() throws 
Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        spy(new PartitionedRegionQueryEvaluator(system, pr, query, 
mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet));
+
+    for (Integer bid : bucketList) {
+      if (bid == 1) {
+        
when(prqe.getPrimaryBucketOwner(bid)).thenReturn(null).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = 
prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(1);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(10);
+    // Called 3 times : 2 times in retry and once for setting the return value
+    verify(prqe, times(3)).getPrimaryBucketOwner(1);
+  }
+
+  @Test
+  public void exceptionIsThrownWhenPrimaryBucketNodeIsNotFoundForCqQuery() 
throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        spy(new PartitionedRegionQueryEvaluator(system, pr, query, 
mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet));
+
+    for (Integer bid : bucketList) {
+      if (bid == 1) {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(null);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      }
+    }
+
+    assertThatThrownBy(() -> 
prqe.buildNodeToBucketMap()).isInstanceOf(QueryException.class)
+        .hasMessageContaining(
+            "Data loss detected, unable to find the hosting  node for some of 
the dataset.");
+
+    // Called 3 times : 2 times in retry and once for setting the return value
+    verify(prqe, times(4)).getPrimaryBucketOwner(1);

Review comment:
       See above comments about using `doReturn()` with a spy and extracting 
the bucketId to a variable.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
##########
@@ -92,6 +98,26 @@ public void doAfterCompletionThrowsIfCommitFails() {
         .isSameAs(transactionDataNodeHasDepartedException);
   }
 
+  @Test
+  public void attacheFilterProfileAfterApplyingChagnes() {

Review comment:
       Typo here, should be "attachFilterProfileAfterApplyingChanges"

##########
File path: 
geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXState;
+import org.apache.geode.internal.cache.TXStateInterface;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category(ClientSubscriptionTest.class)
+public class PartitionedRegionTxDUnitTest implements Serializable {
+  private final String REGION_NAME = "region";
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private ClientVM client;
+
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Test
+  public void verifyCqRegistrationWorksDuringTxCommit() throws Exception {
+    blackboard.setMailbox("CqQueryResultCount", 0);
+    blackboard.setMailbox("CqEvents", 0);
+
+    Properties properties = new Properties();
+    locator = clusterStartupRule.startLocatorVM(0, new Properties());
+    server1 = clusterStartupRule.startServerVM(1, properties, 
locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, properties, 
locator.getPort());
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> 
cacheRule.withServerConnection(server2.getPort()).withPoolSubscription(true));
+
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      Region<Object, Object> region =
+          
cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+              new 
PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+              .create(REGION_NAME);
+
+      // Force primary bucket to get created.
+      region.put("Key-1", "value-1");
+      region.destroy("Key-1");

Review comment:
       This can be replaced with 
`PartitionRegionHelper.assignBucketsToPartitions(region);`, which creates 
buckets without needing to put data into the region.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
##########
@@ -239,6 +244,80 @@ public void testGetAllNodesShouldBeRandomized() {
         .until(() -> !(bucketList.equals(prqe.getAllNodes(regionAdvisor))));
   }
 
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQuery() throws 
Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, 
mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    for (Integer bid : bucketList) {
+      if (bid % 2 == 0) {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeB);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = 
prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(2);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(5);
+    assertThat(bnMap.get(remoteNodeB).size()).isEqualTo(5);
+  }
+
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQueryWithRetry() throws 
Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        spy(new PartitionedRegionQueryEvaluator(system, pr, query, 
mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet));
+
+    for (Integer bid : bucketList) {
+      if (bid == 1) {
+        
when(prqe.getPrimaryBucketOwner(bid)).thenReturn(null).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = 
prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(1);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(10);
+    // Called 3 times : 2 times in retry and once for setting the return value
+    verify(prqe, times(3)).getPrimaryBucketOwner(1);
+  }
+
+  @Test
+  public void exceptionIsThrownWhenPrimaryBucketNodeIsNotFoundForCqQuery() 
throws Exception {

Review comment:
       Exception is never thrown from this method, so this can be removed.

##########
File path: 
geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXState;
+import org.apache.geode.internal.cache.TXStateInterface;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category(ClientSubscriptionTest.class)
+public class PartitionedRegionTxDUnitTest implements Serializable {
+  private final String REGION_NAME = "region";
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private ClientVM client;
+
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Test
+  public void verifyCqRegistrationWorksDuringTxCommit() throws Exception {

Review comment:
       There is quite a lot of duplicate code in the tests in this class. Would 
it be possible to move the cluster startup and region creation to a `@Before` 
method?

##########
File path: 
geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXState;
+import org.apache.geode.internal.cache.TXStateInterface;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category(ClientSubscriptionTest.class)
+public class PartitionedRegionTxDUnitTest implements Serializable {
+  private final String REGION_NAME = "region";
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private ClientVM client;
+
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Test
+  public void verifyCqRegistrationWorksDuringTxCommit() throws Exception {
+    blackboard.setMailbox("CqQueryResultCount", 0);
+    blackboard.setMailbox("CqEvents", 0);
+
+    Properties properties = new Properties();
+    locator = clusterStartupRule.startLocatorVM(0, new Properties());
+    server1 = clusterStartupRule.startServerVM(1, properties, 
locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, properties, 
locator.getPort());
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> 
cacheRule.withServerConnection(server2.getPort()).withPoolSubscription(true));
+
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      Region<Object, Object> region =
+          
cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+              new 
PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+              .create(REGION_NAME);
+
+      // Force primary bucket to get created.
+      region.put("Key-1", "value-1");
+      region.destroy("Key-1");
+    });
+
+    server2.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      
cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+          new 
PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+          .create(REGION_NAME);
+    });
+
+    AsyncInvocation<?> serverAsync = server1.invokeAsync(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      TXManagerImpl txManager = (TXManagerImpl) 
cache.getCacheTransactionManager();
+      txManager.begin();
+
+      TXStateInterface txState =
+          ((TXStateProxyImpl) txManager.getTXState()).getRealDeal(null, null);
+
+      ((TXState) txState).setDuringApplyChanges(() -> {
+        try {
+          blackboard.signalGate("StartCQ");
+          blackboard.waitForGate("RegistrationFinished");
+        } catch (TimeoutException | InterruptedException e) {
+          // Do nothing
+        }
+      });
+
+      cache.getRegion(REGION_NAME).put("Key-1", "value-1");
+      txManager.commit();
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      
clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(REGION_NAME);
+
+      QueryService queryService = clientCache.getQueryService();
+      CqAttributesFactory cqaf = new CqAttributesFactory();
+      TestCqListener testListener = new TestCqListener();
+      cqaf.addCqListener(testListener);
+      CqAttributes cqAttributes = cqaf.create();
+
+      blackboard.waitForGate("StartCQ");
+      SelectResults cqResults = queryService

Review comment:
       Compiler warning here can be fixed by using `SelectResults<Object>`

##########
File path: 
geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXState;
+import org.apache.geode.internal.cache.TXStateInterface;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category(ClientSubscriptionTest.class)
+public class PartitionedRegionTxDUnitTest implements Serializable {
+  private final String REGION_NAME = "region";
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private ClientVM client;
+
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Test
+  public void verifyCqRegistrationWorksDuringTxCommit() throws Exception {
+    blackboard.setMailbox("CqQueryResultCount", 0);
+    blackboard.setMailbox("CqEvents", 0);
+
+    Properties properties = new Properties();
+    locator = clusterStartupRule.startLocatorVM(0, new Properties());
+    server1 = clusterStartupRule.startServerVM(1, properties, 
locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, properties, 
locator.getPort());
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> 
cacheRule.withServerConnection(server2.getPort()).withPoolSubscription(true));
+
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      Region<Object, Object> region =
+          
cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+              new 
PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+              .create(REGION_NAME);
+
+      // Force primary bucket to get created.
+      region.put("Key-1", "value-1");
+      region.destroy("Key-1");
+    });
+
+    server2.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      
cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+          new 
PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+          .create(REGION_NAME);
+    });
+
+    AsyncInvocation<?> serverAsync = server1.invokeAsync(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      TXManagerImpl txManager = (TXManagerImpl) 
cache.getCacheTransactionManager();
+      txManager.begin();
+
+      TXStateInterface txState =
+          ((TXStateProxyImpl) txManager.getTXState()).getRealDeal(null, null);
+
+      ((TXState) txState).setDuringApplyChanges(() -> {
+        try {
+          blackboard.signalGate("StartCQ");
+          blackboard.waitForGate("RegistrationFinished");
+        } catch (TimeoutException | InterruptedException e) {
+          // Do nothing
+        }
+      });
+
+      cache.getRegion(REGION_NAME).put("Key-1", "value-1");
+      txManager.commit();
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      
clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(REGION_NAME);
+
+      QueryService queryService = clientCache.getQueryService();
+      CqAttributesFactory cqaf = new CqAttributesFactory();
+      TestCqListener testListener = new TestCqListener();
+      cqaf.addCqListener(testListener);
+      CqAttributes cqAttributes = cqaf.create();
+
+      blackboard.waitForGate("StartCQ");
+      SelectResults cqResults = queryService
+          .newCq("Select * from " + SEPARATOR + REGION_NAME, cqAttributes)
+          .executeWithInitialResults();
+      blackboard.signalGate("RegistrationFinished");
+      blackboard.setMailbox("CqQueryResultCount", cqResults.asList().size());
+    });
+
+    GeodeAwaitility.await().untilAsserted(() -> {
+      Integer CqQueryResultCount = blackboard.getMailbox("CqQueryResultCount");
+      Integer CqEvents = blackboard.getMailbox("CqEvents");
+      assertThat(CqQueryResultCount + CqEvents).isEqualTo(1);
+    });
+
+    serverAsync.await();
+  }
+
+  @Test
+  public void verifyCqEventInvocationIfTxCommitFromClient() throws Exception {
+    blackboard.setMailbox("CqEvents", 0);
+
+    Properties properties = new Properties();
+    locator = clusterStartupRule.startLocatorVM(0, new Properties());
+    server1 = clusterStartupRule.startServerVM(1, properties, 
locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, properties, 
locator.getPort());
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> 
cacheRule.withServerConnection(server1.getPort()).withPoolSubscription(true));
+
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      Region<Object, Object> region =
+          
cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+              new 
PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+              .create(REGION_NAME);
+
+      // Force primary bucket to get created.
+      region.put("Key-1", "value-1");
+    });
+
+    server2.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      
cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+          new 
PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+          .create(REGION_NAME);
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      
clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(REGION_NAME);
+
+      QueryService queryService = clientCache.getQueryService();
+      CqAttributesFactory cqaf = new CqAttributesFactory();
+      TestCqListener testListener = new TestCqListener();
+      cqaf.addCqListener(testListener);
+      CqAttributes cqAttributes = cqaf.create();
+
+      queryService.newCq("Select * from " + SEPARATOR + REGION_NAME, 
cqAttributes)
+          .executeWithInitialResults();
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      TXManagerImpl txManager = (TXManagerImpl) 
clientCache.getCacheTransactionManager();
+      txManager.begin();
+
+      clientCache.getRegion(REGION_NAME).destroy("Key-1");

Review comment:
       If the suggestions to move region creation to a `@Before` method and to 
use `PartitionRegionHelper.assignBucketsToPartitions(region);` are followed, 
this can be changed to a put to allow the test to still pass.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to