timoninmaxim commented on a change in pull request #9081:
URL: https://github.com/apache/ignite/pull/9081#discussion_r649802220



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/PageStream.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Iterator over a stream of pages. Pages are stored in a queue. After polling
+ * the queue, a new page is loaded in place of the polled one.
+ */
+class PageStream<R> {
+    /** Queue of data of results pages. */
+    protected final Queue<Collection<R>> queue = new LinkedList<>();
+
+    /** Iterator over current page. */
+    private Iterator<R> iter;
+
+    /**
+     * Dynamic collection of nodes that run this query. If a node finishes this
+     * query, it is removed from this collection.
+     */
+    private final Collection<UUID> subgrid;
+
+    /**
+     * List of nodes that responded with cache query result pages. This collection
+     * should be cleared before sending a new cache query request.
+     */
+    private final Collection<UUID> rcvd = new HashSet<>();

Review comment:
       The semantics are the same. We still have a `subgrid` even when it's a single node, and it also has to track `rcvd` to avoid duplicate messages. I think we should reuse this code instead of implementing separate logic for the single-node stream, which would end up almost identical.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Stream over single node.
+ */
+public class NodePageStream<R> extends PageStream<R> {
+    /** */
+    private final UUID nodeId;
+
+    /** */
+    private R head;
+
+    /** */
+    protected NodePageStream(GridCacheQueryAdapter qry, Object queueLock, long timeoutTime,
+        UUID nodeId, BiConsumer<Collection<UUID>, Boolean> reqPages) {
+        super(qry, queueLock, timeoutTime, new HashSet<>(F.asList(nodeId)), reqPages);
+
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return Head of the stream, that is, the last item returned by {@link #next()}.
+     */
+    public R head() {
+        return head;
+    }
+
+    /**
+     * @return Head of the stream; the head is cleared afterwards.
+     */
+    public R get() {
+        R ret = head;
+
+        head = null;
+
+        return ret;
+    }

Review comment:
       You're correct. I wanted to separate the logic of filling the head from clearing it, but actually there is no need to.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
##########
@@ -324,8 +242,8 @@ public void onPage(@Nullable UUID nodeId, @Nullable Collection<?> data, @Nullabl
 
         try {
             if (err != null)
-                synchronized (this) {
-                    enqueue(Collections.emptyList());
+                synchronized (lock) {
+                    reducer().onError();

Review comment:
       We still need synchronization here, as the reducer queue must change atomically with the future's onDone. If the user hangs on a latch await, we have to count it down after the future is done; otherwise we can miss the exception. We have tests for this logic. Please check this comment:
   
https://github.com/apache/ignite/blob/6b6c32bf1582d50f349f522c04b11d1603049797/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java#L330
   
   We could control future.onDone() within the reducer, but I don't think that is the reducer's responsibility.
   

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
##########
@@ -338,55 +256,83 @@ public void onPage(@Nullable UUID nodeId, @Nullable Collection<?> data, @Nullabl
                                 "query", qry, true),
                             err));
 
-                    onPage(nodeId, true);
-
-                    notifyAll();
+                    lock.notifyAll();
                 }
             else {
                 if (data == null)
                     data = Collections.emptyList();
 
-                data = dedupIfRequired((Collection<Object>)data);
+                data = dedupIfRequired(data);
+
+                if (qry.query().type() == GridCacheQueryType.TEXT) {
+                    ArrayList unwrapped = new ArrayList();
+
+                    for (Object o: data) {
+                        CacheEntryWithPayload e = (CacheEntryWithPayload) o;
+
+                        Object uKey = CacheObjectUtils.unwrapBinary(
+                            cctx.cacheObjectContext(), e.getKey(), qry.query().keepBinary(), true, null);
+
+                        Object uVal = CacheObjectUtils.unwrapBinary(
+                            cctx.cacheObjectContext(), e.getValue(), qry.query().keepBinary(), true, null);
 
-                data = cctx.unwrapBinariesIfNeeded((Collection<Object>)data, qry.query().keepBinary());
+                        if (uKey != e.getKey() || uVal != e.getValue())
+                            unwrapped.add(new CacheEntryWithPayload<>(uKey, uVal, e.payload()));
+                        else
+                            unwrapped.add(o);
+                    }
+
+                    data = unwrapped;
+
+                } else
+                    data = cctx.unwrapBinariesIfNeeded((Collection<Object>)data, qry.query().keepBinary());
 
-                synchronized (this) {
-                    enqueue(data);
+                synchronized (lock) {

Review comment:
       Yes, it is required for syncing `queue`, `rcvd` and `fut.isDone` to avoid sending duplicate requests to nodes that have finished the query. The shared lock is also required for onError.
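
   A minimal sketch of that duplicate-request guard, with simplified stand-ins for `rcvd`, the shared lock, and the page-request callback (names are illustrative, not the actual Ignite API): because the done check and the `rcvd` reset happen under one lock, each responding node is asked for its next page at most once.

```java
// Simplified sketch (hypothetical names): `rcvd` and the done flag are read
// and mutated under one lock, so no node receives a duplicate page request.
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;

class PageRequestSketch {
    private final Object lock = new Object();

    /** Nodes that responded since the last request (stand-in for `rcvd`). */
    private final Set<UUID> rcvd = new HashSet<>();

    /** Stand-in for fut.isDone(). */
    private boolean done;

    /** Stand-in for the page-request callback. */
    private final Consumer<Collection<UUID>> reqPages;

    PageRequestSketch(Consumer<Collection<UUID>> reqPages) {
        this.reqPages = reqPages;
    }

    /** A node delivered a page; if it was the last page, the query is finished. */
    void onPage(UUID nodeId, boolean last) {
        synchronized (lock) {
            if (last)
                done = true;
            else
                rcvd.add(nodeId);
        }
    }

    /** Request next pages; the done check and the rcvd reset are atomic. */
    void requestNext() {
        synchronized (lock) {
            if (done || rcvd.isEmpty())
                return;

            List<UUID> nodes = new ArrayList<>(rcvd);

            rcvd.clear(); // Each responder is asked at most once per page.

            reqPages.accept(nodes);
        }
    }
}
```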

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Stream over single node.
+ */
+public class NodePageStream<R> extends PageStream<R> {
+    /** */
+    private final UUID nodeId;
+
+    /** */
+    private R head;
+
+    /** */
+    protected NodePageStream(GridCacheQueryAdapter qry, Object queueLock, long timeoutTime,
+        UUID nodeId, BiConsumer<Collection<UUID>, Boolean> reqPages) {
+        super(qry, queueLock, timeoutTime, new HashSet<>(F.asList(nodeId)), reqPages);
+
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return Head of the stream, that is, the last item returned by {@link #next()}.
+     */
+    public R head() {
+        return head;
+    }
+
+    /**
+     * @return Head of the stream; the head is cleared afterwards.
+     */
+    public R get() {
+        R ret = head;
+
+        head = null;
+
+        return ret;
+    }

Review comment:
       This method helps simplify the NodePageStream code. The sequence of steps is:
   1. hasNext() to check the stream queue;
   2. next() to fill `head`;
   3. head() to use `head` for comparison;
   4. get() to return the value and clear `head`.
   
   If we replaced get() with next(), then next() would also have to invoke hasNext(), and it would have to remember the last hasNext() result to avoid invoking hasNext() twice (steps 1 and 4).
   
   So I suggest keeping this separate method, as it is pretty clean.
   
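
   The four-step sequence can be sketched with simplified stand-ins (`StreamSketch` and `MergeSketch` are illustrative, not the real `NodePageStream`/reducer classes): a merge loop fills each stream's `head` lazily, compares heads, and get() both returns and clears the winner's head so the next round refills only that stream.

```java
// Simplified sketch of the 4-step sequence: hasNext() checks for more data,
// next() fills `head`, head() peeks for the comparison, get() returns the
// value and clears `head`.
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class StreamSketch<R> {
    private final Iterator<R> it;
    private R head;

    StreamSketch(List<R> items) {
        it = items.iterator();
    }

    /** Step 1: more data available (a buffered head or the backing iterator). */
    boolean hasNext() {
        return head != null || it.hasNext();
    }

    /** Step 2: fill `head` if it is empty. */
    void next() {
        if (head == null)
            head = it.next();
    }

    /** Step 3: peek at `head` for comparison. */
    R head() {
        return head;
    }

    /** Step 4: return `head` and clear it. */
    R get() {
        R r = head;
        head = null;
        return r;
    }
}

class MergeSketch {
    /** Merge sorted per-node streams by repeatedly taking the smallest head. */
    static <R extends Comparable<R>> List<R> merge(List<StreamSketch<R>> streams) {
        List<R> out = new ArrayList<>();

        while (true) {
            StreamSketch<R> min = null;

            for (StreamSketch<R> s : streams) {
                if (s.hasNext()) {
                    s.next(); // No-op if head is already filled.

                    if (min == null || s.head().compareTo(min.head()) < 0)
                        min = s;
                }
            }

            if (min == null)
                return out;

            out.add(min.get()); // get() clears only the winner's head.
        }
    }
}
```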
   
   

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
##########
@@ -786,40 +720,25 @@ private Object convert(Object obj) {
 
         long reqId = cctx.io().nextIoId();
 
-        final GridCacheDistributedFieldsQueryFuture fut =
-            new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry, nodes);
+        final GridCacheDistributedFieldsQueryFuture fut = new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry);
+
+        initDistributedQuery(reqId, fut, nodes);
+
+        return fut;
+    }
 
+    /** Initialize distributed query: stores future, sends query requests to nodes. */
+    private void initDistributedQuery(long reqId, GridCacheDistributedQueryFuture fut, Collection<ClusterNode> nodes) {
         try {
-            qry.query().validate();
+            DistributedCacheQueryReducer reducer = createReducer(fut.qry.query().type(), reqId, fut, nodes);
 
-            GridCacheQueryRequest req = new GridCacheQueryRequest(
-                cctx.cacheId(),
-                reqId,
-                cctx.name(),
-                qry.query().type(),
-                true,
-                qry.query().clause(),
-                qry.query().limit(),
-                null,
-                null,
-                null,
-                qry.reducer(),
-                qry.transform(),
-                qry.query().pageSize(),
-                qry.query().includeBackups(),
-                qry.arguments(),
-                qry.query().includeMetadata(),
-                qry.query().keepBinary(),
-                qry.query().subjectId(),
-                qry.query().taskHash(),
-                queryTopologyVersion(),
-                null,
-                cctx.deploymentEnabled(),
-                qry.query().isDataPageScanEnabled());
+            fut.reducer(reducer);
 
-            addQueryFuture(req.id(), fut);
+            fut.qry.query().validate();
 
-            final Object topic = topic(cctx.nodeId(), req.id());
+            addQueryFuture(reqId, fut);

Review comment:
       Please check. I've done it, but I don't think the code actually looks clearer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

