timoninmaxim commented on a change in pull request #9081: URL: https://github.com/apache/ignite/pull/9081#discussion_r649802220
########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/PageStream.java ########## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.reducer; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Queue; +import java.util.UUID; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Iterator over pages stream. Pages are stored in a queue. After polling a queue try load a new page instead of it. + */ +class PageStream<R> { + /** Queue of data of results pages. */ + protected final Queue<Collection<R>> queue = new LinkedList<>(); + + /** Iterator over current page. */ + private Iterator<R> iter; + + /** + * Dynamic collection of nodes that run this query. If a node finishes this query then remove it from this colleciton. 
+ */ + private final Collection<UUID> subgrid; + + /** + * List of nodes that respons with cache query result pages. This collection should be cleaned before sending new + * cache query request. + */ + private final Collection<UUID> rcvd = new HashSet<>(); Review comment: The semantics are the same. We still have a `subgrid`, even if it is a single node, and the stream still has to track `rcvd` to avoid duplicate messages. I think we should reuse this code instead of implementing separate logic for a single-node stream, which would end up being pretty much the same. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.reducer; + +import java.util.Collection; +import java.util.HashSet; +import java.util.UUID; +import java.util.function.BiConsumer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter; +import org.apache.ignite.internal.util.typedef.F; + +/** + * Stream over single node. 
+ */ +public class NodePageStream<R> extends PageStream<R> { + /** */ + private final UUID nodeId; + + /** */ + private R head; + + /** */ + protected NodePageStream(GridCacheQueryAdapter qry, Object queueLock, long timeoutTime, + UUID nodeId, BiConsumer<Collection<UUID>, Boolean> reqPages) { + super(qry, queueLock, timeoutTime, new HashSet<>(F.asList(nodeId)), reqPages); + + this.nodeId = nodeId; + } + + /** + * @return Head of stream, that is last item returned with {@link #next()}. + */ + public R head() { + return head; + } + + /** + * @return Head of stream and then clean it. + */ + public R get() { + R ret = head; + + head = null; + + return ret; + } Review comment: You're correct. I wanted to separate the logic of filling head() from the logic of cleaning it, but there is actually no need to do that. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java ########## @@ -324,8 +242,8 @@ public void onPage(@Nullable UUID nodeId, @Nullable Collection<?> data, @Nullabl try { if (err != null) - synchronized (this) { - enqueue(Collections.emptyList()); + synchronized (lock) { + reducer().onError(); Review comment: We still need synchronization there, because the reducer queue has to be changed atomically with the future's onDone. If a user hangs on the latch await, we have to count the latch down after the future is done; otherwise we can miss the exception. We have tests for this logic. Please check this comment: https://github.com/apache/ignite/blob/6b6c32bf1582d50f349f522c04b11d1603049797/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java#L330 We could control future.onDone() within the reducer, but I don't think that is the reducer's responsibility. 
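To make the shared-lock argument concrete, here is a minimal sketch. It is not the Ignite code: the class `SharedLockSketch` and its fields `queue`, `err` and `done` are hypothetical stand-ins for the reducer queue and the future state, and the only point it tries to show is that the error path changes the queue and completes the "future" inside one critical section on the same monitor the consumer waits on.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for a query future plus its reducer queue; not Ignite code. */
class SharedLockSketch {
    /** Single monitor shared by the page path, the error path and the consumer. */
    private final Object lock = new Object();

    /** Stand-in for the reducer queue. */
    private final Queue<String> queue = new ArrayDeque<>();

    /** Stand-ins for the future state; both are only touched under the lock. */
    private Throwable err;
    private boolean done;

    /** Page path: enqueue data and wake up any waiting consumer. */
    void onPage(String item) {
        synchronized (lock) {
            queue.add(item);
            lock.notifyAll();
        }
    }

    /** Error path: the queue change and the completion happen in one critical section. */
    void onError(Throwable e) {
        synchronized (lock) {
            queue.clear();
            err = e;
            done = true;
            lock.notifyAll();
        }
    }

    /** Consumer: blocks until a page arrives or the query is completed. */
    String next() throws Exception {
        synchronized (lock) {
            while (queue.isEmpty() && !done)
                lock.wait();

            // A woken consumer always observes the error together with the queue change.
            if (err != null)
                throw new Exception("Query failed", err);

            return queue.poll();
        }
    }

    public static void main(String[] args) throws Exception {
        SharedLockSketch s = new SharedLockSketch();

        s.onPage("row-1");
        System.out.println(s.next());

        s.onError(new RuntimeException("node left"));

        try {
            s.next();
        }
        catch (Exception e) {
            System.out.println("failed: " + e.getCause().getMessage());
        }
    }
}
```

If `onError` updated the queue under one lock and the completion flag under another, a consumer could wake up between the two updates and return without seeing the exception, which is the situation the comment above wants to rule out.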
########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java ########## @@ -338,55 +256,83 @@ public void onPage(@Nullable UUID nodeId, @Nullable Collection<?> data, @Nullabl "query", qry, true), err)); - onPage(nodeId, true); - - notifyAll(); + lock.notifyAll(); } else { if (data == null) data = Collections.emptyList(); - data = dedupIfRequired((Collection<Object>)data); + data = dedupIfRequired(data); + + if (qry.query().type() == GridCacheQueryType.TEXT) { + ArrayList unwrapped = new ArrayList(); + + for (Object o: data) { + CacheEntryWithPayload e = (CacheEntryWithPayload) o; + + Object uKey = CacheObjectUtils.unwrapBinary( + cctx.cacheObjectContext(), e.getKey(), qry.query().keepBinary(), true, null); + + Object uVal = CacheObjectUtils.unwrapBinary( + cctx.cacheObjectContext(), e.getValue(), qry.query().keepBinary(), true, null); - data = cctx.unwrapBinariesIfNeeded((Collection<Object>)data, qry.query().keepBinary()); + if (uKey != e.getKey() || uVal != e.getValue()) + unwrapped.add(new CacheEntryWithPayload<>(uKey, uVal, e.payload())); + else + unwrapped.add(o); + } + + data = unwrapped; + + } else + data = cctx.unwrapBinariesIfNeeded((Collection<Object>)data, qry.query().keepBinary()); - synchronized (this) { - enqueue(data); + synchronized (lock) { Review comment: Yes, it is required to synchronize `queue`, `rcvd` and `fut.isDone`, so that duplicate requests are not sent to nodes that have finished the query. A shared lock is also required for onError. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.reducer; + +import java.util.Collection; +import java.util.HashSet; +import java.util.UUID; +import java.util.function.BiConsumer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter; +import org.apache.ignite.internal.util.typedef.F; + +/** + * Stream over single node. + */ +public class NodePageStream<R> extends PageStream<R> { + /** */ + private final UUID nodeId; + + /** */ + private R head; + + /** */ + protected NodePageStream(GridCacheQueryAdapter qry, Object queueLock, long timeoutTime, + UUID nodeId, BiConsumer<Collection<UUID>, Boolean> reqPages) { + super(qry, queueLock, timeoutTime, new HashSet<>(F.asList(nodeId)), reqPages); + + this.nodeId = nodeId; + } + + /** + * @return Head of stream, that is last item returned with {@link #next()}. + */ + public R head() { + return head; + } + + /** + * @return Head of stream and then clean it. + */ + public R get() { + R ret = head; + + head = null; + + return ret; + } Review comment: This method helps simplify NodePageStream code. Sequence of steps is: 1. hasNext() to check stream queue; 2. next() to fill `head`; 3. head() to use `head` for comparison; 4. get() to return value and clean `head`. 
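The four steps above can be sketched as follows. This is a simplified illustration, not the PR's code: `HeadStream` is a hypothetical stand-in for `NodePageStream` that wraps a plain `Iterator` instead of a page queue, `HeadStreamSketch.merge` is a hypothetical caller that merges already-sorted streams, and the sketch assumes streams never contain `null` items (a `null` head means "no cached item").

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical, simplified stand-in for NodePageStream; not Ignite code. */
class HeadStream<R> {
    private final Iterator<R> iter;

    /** Last item returned by next() and not yet consumed with get(). */
    private R head;

    HeadStream(Iterator<R> iter) {
        this.iter = iter;
    }

    /** Step 1: check whether the underlying stream has more items. */
    boolean hasNext() {
        return iter.hasNext();
    }

    /** Step 2: fetch the next item and remember it as the head. */
    R next() {
        head = iter.next();

        return head;
    }

    /** Step 3: look at the head without consuming it. */
    R head() {
        return head;
    }

    /** Step 4: return the head and clean it. */
    R get() {
        R ret = head;

        head = null;

        return ret;
    }
}

class HeadStreamSketch {
    /** Merges sorted streams by repeatedly taking the smallest head. */
    static <R extends Comparable<R>> List<R> merge(List<HeadStream<R>> streams) {
        List<R> out = new ArrayList<>();

        while (true) {
            HeadStream<R> min = null;

            for (HeadStream<R> s : streams) {
                // Refill the head only if it was consumed on a previous round.
                if (s.head() == null && s.hasNext())
                    s.next();

                if (s.head() != null && (min == null || s.head().compareTo(min.head()) < 0))
                    min = s;
            }

            // No stream has a head left: everything is merged.
            if (min == null)
                return out;

            out.add(min.get());
        }
    }

    public static void main(String[] args) {
        List<HeadStream<Integer>> streams = new ArrayList<>();

        streams.add(new HeadStream<Integer>(Arrays.asList(1, 4, 7).iterator()));
        streams.add(new HeadStream<Integer>(Arrays.asList(2, 3, 9).iterator()));

        System.out.println(merge(streams)); // prints [1, 2, 3, 4, 7, 9]
    }
}
```

In this sketch the caller invokes `hasNext()` once per refill, and `get()` never touches the underlying iterator, which is the property the separate method is meant to preserve.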
If we replace get() with next(), then next() must also invoke hasNext(), and we would also need to store info about the last hasNext() invocation made from next() to avoid invoking hasNext() twice (steps 1 and 4). So I suggest keeping this separate method, as it is pretty clean. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ########## @@ -786,40 +720,25 @@ private Object convert(Object obj) { long reqId = cctx.io().nextIoId(); - final GridCacheDistributedFieldsQueryFuture fut = - new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry, nodes); + final GridCacheDistributedFieldsQueryFuture fut = new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry); + + initDistributedQuery(reqId, fut, nodes); + + return fut; + } + /** Initialize distributed query: stores future, sends query requests to nodes. */ + private void initDistributedQuery(long reqId, GridCacheDistributedQueryFuture fut, Collection<ClusterNode> nodes) { try { - qry.query().validate(); + DistributedCacheQueryReducer reducer = createReducer(fut.qry.query().type(), reqId, fut, nodes); - GridCacheQueryRequest req = new GridCacheQueryRequest( - cctx.cacheId(), - reqId, - cctx.name(), - qry.query().type(), - true, - qry.query().clause(), - qry.query().limit(), - null, - null, - null, - qry.reducer(), - qry.transform(), - qry.query().pageSize(), - qry.query().includeBackups(), - qry.arguments(), - qry.query().includeMetadata(), - qry.query().keepBinary(), - qry.query().subjectId(), - qry.query().taskHash(), - queryTopologyVersion(), - null, - cctx.deploymentEnabled(), - qry.query().isDataPageScanEnabled()); + fut.reducer(reducer); - addQueryFuture(req.id(), fut); + fut.qry.query().validate(); - final Object topic = topic(cctx.nodeId(), req.id()); + addQueryFuture(reqId, fut); Review comment: Please check: I've done it, but I don't think the code actually looks any clearer.
