timoninmaxim commented on a change in pull request #9081:
URL: https://github.com/apache/ignite/pull/9081#discussion_r726947639
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
##########
@@ -776,144 +686,99 @@ private Object convert(Object obj) {
}
/** {@inheritDoc} */
- @Override public CacheQueryFuture<?>
queryFieldsDistributed(GridCacheQueryBean qry,
- Collection<ClusterNode> nodes) {
- assert cctx.config().getCacheMode() != LOCAL;
-
- if (log.isDebugEnabled())
- log.debug("Executing distributed query: " + qry);
-
- long reqId = cctx.io().nextIoId();
-
- final GridCacheDistributedFieldsQueryFuture fut =
- new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry, nodes);
-
- try {
- qry.query().validate();
-
- GridCacheQueryRequest req = new GridCacheQueryRequest(
- cctx.cacheId(),
- reqId,
- cctx.name(),
- qry.query().type(),
- true,
- qry.query().clause(),
- null,
- qry.query().limit(),
- null,
- null,
- null,
- qry.reducer(),
- qry.transform(),
- qry.query().pageSize(),
- qry.query().includeBackups(),
- qry.arguments(),
- qry.query().includeMetadata(),
- qry.query().keepBinary(),
- qry.query().taskHash(),
- queryTopologyVersion(),
- null,
- cctx.deploymentEnabled(),
- qry.query().isDataPageScanEnabled());
-
- addQueryFuture(req.id(), fut);
-
- final Object topic = topic(cctx.nodeId(), req.id());
-
- cctx.io().addOrderedCacheHandler(cctx.shared(), topic, resHnd);
-
- fut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
- cctx.io().removeOrderedHandler(false, topic);
- }
- });
-
- sendRequest(fut, req, nodes);
- }
- catch (IgniteCheckedException e) {
- fut.onDone(e);
- }
+ @Override public CacheQueryFuture<?>
queryFieldsDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes) {
+ return queryDistributed(qry, nodes, true);
+ }
- return fut;
+ /**
+ * Gets topic for ordered response messages.
+ *
+ * @param nodeId Node ID.
+ * @param reqId Request ID.
+ * @return Topic.
+ */
+ private Object topic(UUID nodeId, long reqId) {
+ return TOPIC_CACHE.topic(TOPIC_PREFIX, nodeId, reqId);
}
/**
* Sends query request.
*
- * @param fut Distributed future.
+ * @param fut Cache query future. {@code null} in case of cancel request.
* @param req Request.
* @param nodes Nodes.
* @throws IgniteCheckedException In case of error.
*/
- private void sendRequest(
- final GridCacheDistributedQueryFuture<?, ?, ?> fut,
- final GridCacheQueryRequest req,
- Collection<ClusterNode> nodes
+ public void sendRequest(
+ @Nullable GridCacheQueryFutureAdapter<?, ?, ?> fut,
+ GridCacheQueryRequest req,
+ Collection<UUID> nodes
) throws IgniteCheckedException {
- assert fut != null;
assert req != null;
assert nodes != null;
- final UUID locNodeId = cctx.localNodeId();
+ UUID locNodeId = cctx.localNodeId();
- ClusterNode locNode = null;
+ boolean loc = false;
- Collection<ClusterNode> rmtNodes = null;
-
- for (ClusterNode n : nodes) {
- if (n.id().equals(locNodeId))
- locNode = n;
+ for (UUID nodeId : nodes) {
+ if (nodeId.equals(locNodeId))
+ loc = true;
else {
- if (rmtNodes == null)
- rmtNodes = new ArrayList<>(nodes.size());
-
- rmtNodes.add(n);
- }
- }
-
- // Request should be sent to remote nodes before the query is
processed on the local node.
- // For example, a remote reducer has a state, we should not serialize
and then send
- // the reducer changed by the local node.
- if (!F.isEmpty(rmtNodes)) {
- for (ClusterNode node : rmtNodes) {
- try {
- cctx.io().send(node, req, GridIoPolicy.QUERY_POOL);
- }
- catch (IgniteCheckedException e) {
- if (cctx.io().checkNodeLeft(node.id(), e, true)) {
- fut.onNodeLeft(node.id());
-
- if (fut.isDone())
- return;
- }
- else
- throw e;
- }
+ if (req.cancel())
+ sendNodeCancelRequest(nodeId, req);
+ else if (!sendNodePageRequest(nodeId, req, fut))
+ return;
}
}
- if (locNode != null) {
+ if (loc) {
cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
@Override public Object call() throws Exception {
req.beforeLocalExecution(cctx);
- processQueryRequest(locNodeId, req);
+ processQueryRequest(cctx.localNodeId(), req);
return null;
}
}, GridIoPolicy.QUERY_POOL);
}
}
+ /** */
+ private void sendNodeCancelRequest(UUID nodeId, GridCacheQueryRequest req)
throws IgniteCheckedException {
+ try {
+ cctx.io().send(nodeId, req, GridIoPolicy.QUERY_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ if (cctx.io().checkNodeLeft(nodeId, e, false)) {
Review comment:
Let's save this, it's old behavior. There could be cases, when one
thread tries to request page, another thread concurrently accepts info about
node left. And this affects what we should do, log error or raise an exception.
Let's make this in separate ticket, WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]