AMashenkov commented on a change in pull request #9081: URL: https://github.com/apache/ignite/pull/9081#discussion_r633556166
########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java ########## @@ -169,103 +97,30 @@ protected GridCacheDistributedQueryFuture(GridCacheContext<K, V> ctx, long reqId } /** {@inheritDoc} */ - @Override protected boolean onPage(UUID nodeId, boolean last) { - assert Thread.holdsLock(this); - - if (!loc) { - rcvd.add(nodeId); - - if (rcvd.containsAll(subgrid)) - firstPageLatch.countDown(); - } - - boolean futFinish; - - if (last) { - futFinish = loc || (subgrid.remove(nodeId) && subgrid.isEmpty()); - - if (futFinish) - firstPageLatch.countDown(); - } - else - futFinish = false; - - return futFinish; - } - - /** {@inheritDoc} */ - @Override protected void loadPage() { - assert !Thread.holdsLock(this); - - Collection<ClusterNode> nodes = null; - - synchronized (this) { - if (!isDone() && rcvd.containsAll(subgrid)) { - rcvd.clear(); - - nodes = nodes(); - } - } - - if (nodes != null) - cctx.queries().loadPage(reqId, qry.query(), nodes, false); + @Override protected boolean onPage(@Nullable UUID nodeId, boolean last) { + return reducer.onPage(nodeId, last); } /** {@inheritDoc} */ @Override protected void loadAllPages() throws IgniteInterruptedCheckedException { - assert !Thread.holdsLock(this); - - U.await(firstPageLatch); - - Collection<ClusterNode> nodes = null; - - synchronized (this) { - if (!isDone() && !subgrid.isEmpty()) - nodes = nodes(); - } - - if (nodes != null) - cctx.queries().loadPage(reqId, qry.query(), nodes, true); - } - - /** - * @return Nodes to send requests to. - */ - private Collection<ClusterNode> nodes() { - assert Thread.holdsLock(this); - - Collection<ClusterNode> nodes = new ArrayList<>(subgrid.size()); - - for (UUID nodeId : subgrid) { - ClusterNode node = cctx.discovery().node(nodeId); - - if (node != null) - nodes.add(node); - } - - return nodes; + reducer.loadAll(); } /** {@inheritDoc} */ - @Override public boolean onDone(Collection<R> res, Throwable err) { - boolean done = super.onDone(res, err); - - // Must release the lath after onDone() in order for a waiting thread to see an exception, if any. - firstPageLatch.countDown(); - - return done; + @Override protected Reducer<R> reducer() { + return reducer; } /** {@inheritDoc} */ @Override public boolean onCancelled() { - firstPageLatch.countDown(); + reducer.onLastPage(); return super.onCancelled(); } /** {@inheritDoc} */ @Override public void onTimeout() { - firstPageLatch.countDown(); + reducer.onLastPage(); super.onTimeout(); } Review comment: Do we need all these callbacks in QueryFutureAdapter just to delegate calls to reducer and having reducer() at same time? ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java ########## @@ -169,103 +97,30 @@ protected GridCacheDistributedQueryFuture(GridCacheContext<K, V> ctx, long reqId } /** {@inheritDoc} */ - @Override protected boolean onPage(UUID nodeId, boolean last) { - assert Thread.holdsLock(this); - - if (!loc) { - rcvd.add(nodeId); - - if (rcvd.containsAll(subgrid)) - firstPageLatch.countDown(); - } - - boolean futFinish; - - if (last) { - futFinish = loc || (subgrid.remove(nodeId) && subgrid.isEmpty()); - - if (futFinish) - firstPageLatch.countDown(); - } - else - futFinish = false; - - return futFinish; - } - - /** {@inheritDoc} */ - @Override protected void loadPage() { - assert !Thread.holdsLock(this); - - Collection<ClusterNode> nodes = null; - - synchronized (this) { - if (!isDone() && rcvd.containsAll(subgrid)) { - rcvd.clear(); - - nodes = nodes(); - } - } - - if (nodes != null) - cctx.queries().loadPage(reqId, qry.query(), nodes, false); + @Override protected boolean onPage(@Nullable UUID nodeId, boolean last) { + return reducer.onPage(nodeId, last); } /** {@inheritDoc} */ @Override protected void loadAllPages() throws IgniteInterruptedCheckedException { - assert !Thread.holdsLock(this); - - U.await(firstPageLatch); - - Collection<ClusterNode> nodes = null; - - synchronized (this) { - if (!isDone() && !subgrid.isEmpty()) - nodes = nodes(); - } - - if (nodes != null) - cctx.queries().loadPage(reqId, qry.query(), nodes, true); - } - - /** - * @return Nodes to send requests to. - */ - private Collection<ClusterNode> nodes() { - assert Thread.holdsLock(this); - - Collection<ClusterNode> nodes = new ArrayList<>(subgrid.size()); - - for (UUID nodeId : subgrid) { - ClusterNode node = cctx.discovery().node(nodeId); - - if (node != null) - nodes.add(node); - } - - return nodes; + reducer.loadAll(); } /** {@inheritDoc} */ - @Override public boolean onDone(Collection<R> res, Throwable err) { - boolean done = super.onDone(res, err); - - // Must release the lath after onDone() in order for a waiting thread to see an exception, if any. - firstPageLatch.countDown(); - - return done; + @Override protected Reducer<R> reducer() { + return reducer; } /** {@inheritDoc} */ @Override public boolean onCancelled() { - firstPageLatch.countDown(); + reducer.onLastPage(); return super.onCancelled(); } /** {@inheritDoc} */ @Override public void onTimeout() { - firstPageLatch.countDown(); + reducer.onLastPage(); super.onTimeout(); } Review comment: Do we need all these callbacks in QueryFutureAdapter just to delegate calls to reducer and having reducer() at same time? I see a patter you alresy use: reducer().onPage() and similar. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryResultFetcher.java ########## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * This class is responsible for sending request for query result pages to remote nodes. + */ +public abstract class CacheQueryResultFetcher { + /** + * Map (requestId -> query future) where request id is unique for all requests per query. + * This map is populated by query manager. + */ + private final ConcurrentMap<Long, GridCacheDistributedQueryFuture<?, ?, ?>> qryFuts; + + /** Cache context. */ + private final GridCacheContext cctx; + + /** Ignite logger. */ + private final IgniteLogger log; + + /** */ + CacheQueryResultFetcher( + final GridCacheContext cctx, + final ConcurrentMap<Long, GridCacheDistributedQueryFuture<?, ?, ?>> qryFuts) { Review comment: CacheQueryResultFetcher doesn't look like good class name. 1. Actually, it is not fetcher for particular query, it is a holder for object-fetchers for multiple queries. 2. Why it depends on qryFuts? 3. The functionality of the class is limited to create-and-send requests, but any response processing is out of scope... So, it is a 'requester', but not 'fetcher' -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org