timoninmaxim commented on a change in pull request #9081:
URL: https://github.com/apache/ignite/pull/9081#discussion_r685262569
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
##########
@@ -777,133 +724,32 @@ private Object convert(Object obj) {
}
/** {@inheritDoc} */
- @Override public CacheQueryFuture<?>
queryFieldsDistributed(GridCacheQueryBean qry,
- Collection<ClusterNode> nodes) {
- assert cctx.config().getCacheMode() != LOCAL;
-
- if (log.isDebugEnabled())
- log.debug("Executing distributed query: " + qry);
-
- long reqId = cctx.io().nextIoId();
-
- final GridCacheDistributedFieldsQueryFuture fut =
- new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry, nodes);
-
- try {
- qry.query().validate();
-
- GridCacheQueryRequest req = new GridCacheQueryRequest(
- cctx.cacheId(),
- reqId,
- cctx.name(),
- qry.query().type(),
- true,
- qry.query().clause(),
- qry.query().limit(),
- null,
- null,
- null,
- qry.reducer(),
- qry.transform(),
- qry.query().pageSize(),
- qry.query().includeBackups(),
- qry.arguments(),
- qry.query().includeMetadata(),
- qry.query().keepBinary(),
- qry.query().subjectId(),
- qry.query().taskHash(),
- queryTopologyVersion(),
- null,
- cctx.deploymentEnabled(),
- qry.query().isDataPageScanEnabled());
-
- addQueryFuture(req.id(), fut);
-
- final Object topic = topic(cctx.nodeId(), req.id());
-
- cctx.io().addOrderedCacheHandler(cctx.shared(), topic, resHnd);
-
- fut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
- cctx.io().removeOrderedHandler(false, topic);
- }
- });
-
- sendRequest(fut, req, nodes);
- }
- catch (IgniteCheckedException e) {
- fut.onDone(e);
- }
-
- return fut;
+ @Override public CacheQueryFuture<?>
queryFieldsDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes) {
+ return queryDistributed(qry, nodes, true);
}
- /**
- * Sends query request.
- *
- * @param fut Distributed future.
- * @param req Request.
- * @param nodes Nodes.
- * @throws IgniteCheckedException In case of error.
- */
- private void sendRequest(
- final GridCacheDistributedQueryFuture<?, ?, ?> fut,
- final GridCacheQueryRequest req,
- Collection<ClusterNode> nodes
- ) throws IgniteCheckedException {
- assert fut != null;
- assert req != null;
- assert nodes != null;
-
- final UUID locNodeId = cctx.localNodeId();
-
- ClusterNode locNode = null;
-
- Collection<ClusterNode> rmtNodes = null;
-
- for (ClusterNode n : nodes) {
- if (n.id().equals(locNodeId))
- locNode = n;
- else {
- if (rmtNodes == null)
- rmtNodes = new ArrayList<>(nodes.size());
+ /** Creates a reducer depends on query type. */
+ private DistributedCacheQueryReducer createReducer(GridCacheQueryType
qryType, long reqId,
+ GridCacheDistributedQueryFuture fut, Collection<ClusterNode> nodes) {
- rmtNodes.add(n);
- }
- }
+ if (qryType == TEXT) {
+ Comparator<CacheEntryWithPayload<K, V, Float>> cmp = (c1, c2) -> {
+ if (c1.payload() == c2.payload())
+ return 0;
- // Request should be sent to remote nodes before the query is
processed on the local node.
- // For example, a remote reducer has a state, we should not serialize
and then send
- // the reducer changed by the local node.
- if (!F.isEmpty(rmtNodes)) {
- for (ClusterNode node : rmtNodes) {
- try {
- cctx.io().send(node, req, GridIoPolicy.QUERY_POOL);
- }
- catch (IgniteCheckedException e) {
- if (cctx.io().checkNodeLeft(node.id(), e, true)) {
- fut.onNodeLeft(node.id());
+ if (c1.payload() == null)
Review comment:
I'm going to reuse this reducer for IndexQuery later. IndexQuery compares rows by
column values, not by score.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]