timoninmaxim commented on a change in pull request #9081:
URL: https://github.com/apache/ignite/pull/9081#discussion_r726944317
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
##########
@@ -17,257 +17,172 @@
package org.apache.ignite.internal.processors.cache.query;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteIllegalStateException;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.util.lang.GridPlainCallable;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
+import
org.apache.ignite.internal.processors.cache.query.reducer.MergeSortDistributedCacheQueryReducer;
+import
org.apache.ignite.internal.processors.cache.query.reducer.NodePageStream;
+import
org.apache.ignite.internal.processors.cache.query.reducer.UnsortedDistributedCacheQueryReducer;
import org.apache.ignite.internal.util.typedef.internal.U;
+import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
+
/**
* Distributed query future.
*/
public class GridCacheDistributedQueryFuture<K, V, R> extends
GridCacheQueryFutureAdapter<K, V, R> {
/** */
- private static final long serialVersionUID = 0L;
+ private final long reqId;
- /** */
- private long reqId;
+ /** Helps to send cache query requests to other nodes. */
+ private final GridCacheDistributedQueryManager<K, V> qryMgr;
- /** */
- private final Collection<UUID> subgrid = new HashSet<>();
+ /** Collection of streams. */
+ private final Map<UUID, NodePageStream<R>> streams;
- /** */
- private final Collection<UUID> rcvd = new HashSet<>();
+ /** Count of streams that finish receiving remote pages. */
+ private final AtomicInteger noRemotePagesStreamsCnt = new AtomicInteger();
- /** */
- private CountDownLatch firstPageLatch = new CountDownLatch(1);
+ /** Count down this latch when every node responses on initial cache query
request. */
+ private final CountDownLatch firstPageLatch = new CountDownLatch(1);
+
+ /** Set of nodes that deliver their first page. */
+ private Set<UUID> rcvdFirstPage = ConcurrentHashMap.newKeySet();
/**
* @param ctx Cache context.
* @param reqId Request ID.
* @param qry Query.
- * @param nodes Nodes.
*/
- protected GridCacheDistributedQueryFuture(GridCacheContext<K, V> ctx, long
reqId, GridCacheQueryBean qry,
- Iterable<ClusterNode> nodes) {
+ protected GridCacheDistributedQueryFuture(
+ GridCacheContext<K, V> ctx,
+ long reqId,
+ GridCacheQueryBean qry,
+ Collection<ClusterNode> nodes
+ ) {
super(ctx, qry, false);
assert reqId > 0;
this.reqId = reqId;
- GridCacheQueryManager<K, V> mgr = ctx.queries();
+ qryMgr = (GridCacheDistributedQueryManager<K, V>) ctx.queries();
- assert mgr != null;
+ streams = new ConcurrentHashMap<>(nodes.size());
- synchronized (this) {
- for (ClusterNode node : nodes)
- subgrid.add(node.id());
- }
- }
+ for (ClusterNode node : nodes) {
+ NodePageStream<R> s = new NodePageStream<>(node.id(), () ->
requestPages(node.id()), () -> cancelPages(node.id()));
- /** {@inheritDoc} */
- @Override protected void cancelQuery() throws IgniteCheckedException {
- final GridCacheQueryManager<K, V> qryMgr = cctx.queries();
+ streams.put(node.id(), s);
- assert qryMgr != null;
+ startQuery(node.id());
+ }
- try {
- Collection<ClusterNode> allNodes = cctx.discovery().allNodes();
- Collection<ClusterNode> nodes;
-
- synchronized (this) {
- nodes = F.retain(allNodes, true,
- new P1<ClusterNode>() {
- @Override public boolean apply(ClusterNode node) {
- return !cctx.localNodeId().equals(node.id()) &&
subgrid.contains(node.id());
- }
- }
- );
-
- subgrid.clear();
- }
+ Map<UUID, NodePageStream<R>> streamsMap =
Collections.unmodifiableMap(streams);
- final GridCacheQueryRequest req = new
GridCacheQueryRequest(cctx.cacheId(),
- reqId,
- fields(),
- qryMgr.queryTopologyVersion(),
- cctx.deploymentEnabled());
+ reducer = qry.query().type() == TEXT ?
+ new MergeSortDistributedCacheQueryReducer<>(streamsMap)
+ : new UnsortedDistributedCacheQueryReducer<>(streamsMap);
+ }
- // Process cancel query directly (without sending) for local node,
- cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
- @Override public Object call() {
- qryMgr.processQueryRequest(cctx.localNodeId(), req);
+ /** {@inheritDoc} */
+ @Override protected void cancelQuery(Throwable err) {
+ firstPageLatch.countDown();
- return null;
- }
- });
-
- if (!nodes.isEmpty()) {
- for (ClusterNode node : nodes) {
- try {
- cctx.io().send(node, req, cctx.ioPolicy());
- }
- catch (IgniteCheckedException e) {
- if (cctx.io().checkNodeLeft(node.id(), e, false)) {
- if (log.isDebugEnabled())
- log.debug("Failed to send cancel request, node
failed: " + node);
- }
- else
- U.error(log, "Failed to send cancel request
[node=" + node + ']', e);
- }
- }
- }
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send cancel request (will cancel query in
any case).", e);
- }
+ for (NodePageStream<R> s : streams.values())
+ s.cancel(err);
- qryMgr.onQueryFutureCanceled(reqId);
+ cctx.queries().onQueryFutureCanceled(reqId);
clear();
}
/** {@inheritDoc} */
- @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
@Override protected void onNodeLeft(UUID nodeId) {
Review comment:
We should not mark future with error if node left after delivering all
pages, as it's save to ignore this. But we can invoke `onError` within if block
directly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]