timoninmaxim commented on a change in pull request #9081:
URL: https://github.com/apache/ignite/pull/9081#discussion_r726931325



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
##########
@@ -17,257 +17,172 @@
 
 package org.apache.ignite.internal.processors.cache.query;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteIllegalStateException;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.util.lang.GridPlainCallable;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
+import 
org.apache.ignite.internal.processors.cache.query.reducer.MergeSortDistributedCacheQueryReducer;
+import 
org.apache.ignite.internal.processors.cache.query.reducer.NodePageStream;
+import 
org.apache.ignite.internal.processors.cache.query.reducer.UnsortedDistributedCacheQueryReducer;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
+import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
+
 /**
  * Distributed query future.
  */
 public class GridCacheDistributedQueryFuture<K, V, R> extends 
GridCacheQueryFutureAdapter<K, V, R> {
     /** */
-    private static final long serialVersionUID = 0L;
+    private final long reqId;
 
-    /** */
-    private long reqId;
+    /** Helps to send cache query requests to other nodes. */
+    private final GridCacheDistributedQueryManager<K, V> qryMgr;
 
-    /** */
-    private final Collection<UUID> subgrid = new HashSet<>();
+    /** Collection of streams. */
+    private final Map<UUID, NodePageStream<R>> streams;
 
-    /** */
-    private final Collection<UUID> rcvd = new HashSet<>();
+    /** Count of streams that finish receiving remote pages. */
+    private final AtomicInteger noRemotePagesStreamsCnt = new AtomicInteger();
 
-    /** */
-    private CountDownLatch firstPageLatch = new CountDownLatch(1);
+    /** Count down this latch when every node responses on initial cache query 
request. */
+    private final CountDownLatch firstPageLatch = new CountDownLatch(1);
+
+    /** Set of nodes that deliver their first page. */
+    private Set<UUID> rcvdFirstPage = ConcurrentHashMap.newKeySet();
 
     /**
      * @param ctx Cache context.
      * @param reqId Request ID.
      * @param qry Query.
-     * @param nodes Nodes.
      */
-    protected GridCacheDistributedQueryFuture(GridCacheContext<K, V> ctx, long 
reqId, GridCacheQueryBean qry,
-        Iterable<ClusterNode> nodes) {
+    protected GridCacheDistributedQueryFuture(
+        GridCacheContext<K, V> ctx,
+        long reqId,
+        GridCacheQueryBean qry,
+        Collection<ClusterNode> nodes
+    ) {
         super(ctx, qry, false);
 
         assert reqId > 0;
 
         this.reqId = reqId;
 
-        GridCacheQueryManager<K, V> mgr = ctx.queries();
+        qryMgr = (GridCacheDistributedQueryManager<K, V>) ctx.queries();
 
-        assert mgr != null;
+        streams = new ConcurrentHashMap<>(nodes.size());
 
-        synchronized (this) {
-            for (ClusterNode node : nodes)
-                subgrid.add(node.id());
-        }
-    }
+        for (ClusterNode node : nodes) {
+            NodePageStream<R> s = new NodePageStream<>(node.id(), () -> 
requestPages(node.id()), () -> cancelPages(node.id()));
 
-    /** {@inheritDoc} */
-    @Override protected void cancelQuery() throws IgniteCheckedException {
-        final GridCacheQueryManager<K, V> qryMgr = cctx.queries();
+            streams.put(node.id(), s);
 
-        assert qryMgr != null;
+            startQuery(node.id());

Review comment:
       Use `#startQuery` for start query. `#execute` is OK for local future as 
it naturally executes full query locally. For distributed query we can just 
start query on remote nodes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to