AMashenkov commented on a change in pull request #9081:
URL: https://github.com/apache/ignite/pull/9081#discussion_r633556166



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
##########
@@ -169,103 +97,30 @@ protected 
GridCacheDistributedQueryFuture(GridCacheContext<K, V> ctx, long reqId
     }
 
     /** {@inheritDoc} */
-    @Override protected boolean onPage(UUID nodeId, boolean last) {
-        assert Thread.holdsLock(this);
-
-        if (!loc) {
-            rcvd.add(nodeId);
-
-            if (rcvd.containsAll(subgrid))
-                firstPageLatch.countDown();
-        }
-
-        boolean futFinish;
-
-        if (last) {
-            futFinish = loc || (subgrid.remove(nodeId) && subgrid.isEmpty());
-
-            if (futFinish)
-                firstPageLatch.countDown();
-        }
-        else
-            futFinish = false;
-
-        return futFinish;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void loadPage() {
-        assert !Thread.holdsLock(this);
-
-        Collection<ClusterNode> nodes = null;
-
-        synchronized (this) {
-            if (!isDone() && rcvd.containsAll(subgrid)) {
-                rcvd.clear();
-
-                nodes = nodes();
-            }
-        }
-
-        if (nodes != null)
-            cctx.queries().loadPage(reqId, qry.query(), nodes, false);
+    @Override protected boolean onPage(@Nullable UUID nodeId, boolean last) {
+        return reducer.onPage(nodeId, last);
     }
 
     /** {@inheritDoc} */
     @Override protected void loadAllPages() throws 
IgniteInterruptedCheckedException {
-        assert !Thread.holdsLock(this);
-
-        U.await(firstPageLatch);
-
-        Collection<ClusterNode> nodes = null;
-
-        synchronized (this) {
-            if (!isDone() && !subgrid.isEmpty())
-                nodes = nodes();
-        }
-
-        if (nodes != null)
-            cctx.queries().loadPage(reqId, qry.query(), nodes, true);
-    }
-
-    /**
-     * @return Nodes to send requests to.
-     */
-    private Collection<ClusterNode> nodes() {
-        assert Thread.holdsLock(this);
-
-        Collection<ClusterNode> nodes = new ArrayList<>(subgrid.size());
-
-        for (UUID nodeId : subgrid) {
-            ClusterNode node = cctx.discovery().node(nodeId);
-
-            if (node != null)
-                nodes.add(node);
-        }
-
-        return nodes;
+        reducer.loadAll();
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onDone(Collection<R> res, Throwable err) {
-        boolean done = super.onDone(res, err);
-
-        // Must release the lath after onDone() in order for a waiting thread 
to see an exception, if any.
-        firstPageLatch.countDown();
-
-        return done;
+    @Override protected Reducer<R> reducer() {
+        return reducer;
     }
 
     /** {@inheritDoc} */
     @Override public boolean onCancelled() {
-        firstPageLatch.countDown();
+        reducer.onLastPage();
 
         return super.onCancelled();
     }
 
     /** {@inheritDoc} */
     @Override public void onTimeout() {
-        firstPageLatch.countDown();
+        reducer.onLastPage();
 
         super.onTimeout();
     }

Review comment:
       Do we need all these callbacks in QueryFutureAdapter just to delegate 
calls to reducer
   and having reducer() at same time?
   

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
##########
@@ -169,103 +97,30 @@ protected 
GridCacheDistributedQueryFuture(GridCacheContext<K, V> ctx, long reqId
     }
 
     /** {@inheritDoc} */
-    @Override protected boolean onPage(UUID nodeId, boolean last) {
-        assert Thread.holdsLock(this);
-
-        if (!loc) {
-            rcvd.add(nodeId);
-
-            if (rcvd.containsAll(subgrid))
-                firstPageLatch.countDown();
-        }
-
-        boolean futFinish;
-
-        if (last) {
-            futFinish = loc || (subgrid.remove(nodeId) && subgrid.isEmpty());
-
-            if (futFinish)
-                firstPageLatch.countDown();
-        }
-        else
-            futFinish = false;
-
-        return futFinish;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void loadPage() {
-        assert !Thread.holdsLock(this);
-
-        Collection<ClusterNode> nodes = null;
-
-        synchronized (this) {
-            if (!isDone() && rcvd.containsAll(subgrid)) {
-                rcvd.clear();
-
-                nodes = nodes();
-            }
-        }
-
-        if (nodes != null)
-            cctx.queries().loadPage(reqId, qry.query(), nodes, false);
+    @Override protected boolean onPage(@Nullable UUID nodeId, boolean last) {
+        return reducer.onPage(nodeId, last);
     }
 
     /** {@inheritDoc} */
     @Override protected void loadAllPages() throws 
IgniteInterruptedCheckedException {
-        assert !Thread.holdsLock(this);
-
-        U.await(firstPageLatch);
-
-        Collection<ClusterNode> nodes = null;
-
-        synchronized (this) {
-            if (!isDone() && !subgrid.isEmpty())
-                nodes = nodes();
-        }
-
-        if (nodes != null)
-            cctx.queries().loadPage(reqId, qry.query(), nodes, true);
-    }
-
-    /**
-     * @return Nodes to send requests to.
-     */
-    private Collection<ClusterNode> nodes() {
-        assert Thread.holdsLock(this);
-
-        Collection<ClusterNode> nodes = new ArrayList<>(subgrid.size());
-
-        for (UUID nodeId : subgrid) {
-            ClusterNode node = cctx.discovery().node(nodeId);
-
-            if (node != null)
-                nodes.add(node);
-        }
-
-        return nodes;
+        reducer.loadAll();
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onDone(Collection<R> res, Throwable err) {
-        boolean done = super.onDone(res, err);
-
-        // Must release the lath after onDone() in order for a waiting thread 
to see an exception, if any.
-        firstPageLatch.countDown();
-
-        return done;
+    @Override protected Reducer<R> reducer() {
+        return reducer;
     }
 
     /** {@inheritDoc} */
     @Override public boolean onCancelled() {
-        firstPageLatch.countDown();
+        reducer.onLastPage();
 
         return super.onCancelled();
     }
 
     /** {@inheritDoc} */
     @Override public void onTimeout() {
-        firstPageLatch.countDown();
+        reducer.onLastPage();
 
         super.onTimeout();
     }

Review comment:
       Do we need all these callbacks in QueryFutureAdapter just to delegate 
calls to reducer
   and having reducer() at same time?
   I see a patter you alresy use: reducer().onPage() and similar.
   

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryResultFetcher.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * This class is responsible for sending request for query result pages to 
remote nodes.
+ */
+public abstract class CacheQueryResultFetcher {
+    /**
+     * Map (requestId -> query future) where request id is unique for all 
requests per query.
+     * This map is populated by query manager.
+     */
+    private final ConcurrentMap<Long, GridCacheDistributedQueryFuture<?, ?, 
?>> qryFuts;
+
+    /** Cache context. */
+    private final GridCacheContext cctx;
+
+    /** Ignite logger. */
+    private final IgniteLogger log;
+
+    /** */
+    CacheQueryResultFetcher(
+        final GridCacheContext cctx,
+        final ConcurrentMap<Long, GridCacheDistributedQueryFuture<?, ?, ?>> 
qryFuts) {

Review comment:
       CacheQueryResultFetcher doesn't look like good class name.
   1. Actually, it is not fetcher for particular query, it is a holder for 
object-fetchers for multiple queries.
   2. Why it depends on qryFuts?
   3. The functionality of the class is limited to create-and-send requests, 
but any response processing is out of scope...
   So, it is a 'requester', but not 'fetcher'




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to