alex-plekhanov commented on code in PR #12087: URL: https://github.com/apache/ignite/pull/12087#discussion_r2151760534
########## modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java: ########## @@ -140,7 +140,7 @@ public void startTracking(TrackableQuery qryInfo) { * @param qryInfo Query info to remove. * @param err Exception if query executed with error. */ - public void stopTracking(TrackableQuery qryInfo, @Nullable Throwable err) { + public synchronized void stopTracking(TrackableQuery qryInfo, @Nullable Throwable err) { Review Comment: qrys is a thread-safe collection, so no synchronization is required here. But to avoid double logging you can add something like `if (qrys.remove(qryInfo) != null) { ... log ... }`. Synchronization doesn't solve the double logging problem. ########## modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java: ########## @@ -392,6 +415,102 @@ public void testBigResultDistributed() throws Exception { checkBigResultSet(); } + /** + * Verifies that while a query is not fully fetched, its {@link H2QueryInfo} is kept in {@link HeavyQueriesTracker} + * on all cluster nodes and its {@link H2QueryInfo#isSuspended()} returns {@code true}. Then, once the query is fully + * fetched, its {@link H2QueryInfo} is removed from {@link HeavyQueriesTracker}. + */ + @Test + @MultiNodeTest + public void testEmptyHeavyQueriesTrackerWithFullyFetchedIterator() { + Iterator<?> it = queryCursor(false).iterator(); + + checkQryInfoCount(gridCount()); + + H2QueryInfo qry = (H2QueryInfo)heavyQueriesTracker().getQueries().iterator().next(); + + assertTrue(qry.isSuspended()); + + it.forEachRemaining(x -> {}); + } + + /** + * Verifies that when the cursor of a not fully fetched query is closed, its {@link H2QueryInfo} is removed from + * {@link HeavyQueriesTracker} on all cluster nodes. 
+ */ + @Test + @MultiNodeTest + public void testEmptyHeavyQueriesTrackerWithClosedCursor() { + FieldsQueryCursor<List<?>> cursor = queryCursor(false); + + cursor.iterator().next(); + + checkQryInfoCount(gridCount()); + + H2QueryInfo qryInfo = (H2QueryInfo)heavyQueriesTracker().getQueries().iterator().next(); + + assertTrue(qryInfo.isSuspended()); + + cursor.close(); + } + + /** + * Verifies that when a not fully fetched query is cancelled, its {@link H2QueryInfo} is removed from + * {@link HeavyQueriesTracker} on all cluster nodes. + */ + @Test + @MultiNodeTest + public void testEmptyHeavyQueriesTrackerWithCancelledQuery() { + cancelQuery(runNotFullyFetchedQuery(false)); + } + + /** + * Verifies that when a local not fully fetched query is cancelled, its {@link H2QueryInfo} is removed from + * {@link HeavyQueriesTracker} on all cluster nodes. + */ + @Test + @MultiNodeTest + public void testEmptyHeavyQueriesTrackerWithCancelledLocalQuery() { + long qryId = runNotFullyFetchedQuery(true); + + ((IgniteEx)ignite).context().query().cancelLocalQueries(Set.of(qryId)); + } + + /** + * Verifies that when there are multiple not fully fetched queries, and they are cancelled separately, corresponding + * {@link H2QueryInfo} instances are removed from {@link HeavyQueriesTracker} on all cluster nodes. 
+ * */ + @Test + @MultiNodeTest + public void testEmptyHeavyQueriesTrackerWithMultipleCancelledQueries() { + int qryCnt = 4; + int cnldQryCnt = 2; + + for (int i = 0; i < qryCnt; i++) + runNotFullyFetchedQuery(false); + + for (int i = 0; i < gridCount(); ++i) + assertEquals(qryCnt, heavyQueriesTracker(i).getQueries().size()); + + for (int i = 0; i < cnldQryCnt; i++) + cancelQuery(i + 1); + + for (int i = 0; i < gridCount(); ++i) { + Set<TrackableQuery> qrys = heavyQueriesTracker(i).getQueries(); + + assertEquals(cnldQryCnt, qrys.size()); Review Comment: qryCnt - cnldQryCnt ########## modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java: ########## @@ -392,6 +415,102 @@ public void testBigResultDistributed() throws Exception { checkBigResultSet(); } + /** + * Verifies that while a query is not fully fetched, its {@link H2QueryInfo} is kept in {@link HeavyQueriesTracker} + * on all cluster nodes and its {@link H2QueryInfo#isSuspended()} returns {@code true}. Then, once the query is fully + * fetched, its {@link H2QueryInfo} is removed from {@link HeavyQueriesTracker}. + */ + @Test + @MultiNodeTest + public void testEmptyHeavyQueriesTrackerWithFullyFetchedIterator() { + Iterator<?> it = queryCursor(false).iterator(); + + checkQryInfoCount(gridCount()); + + H2QueryInfo qry = (H2QueryInfo)heavyQueriesTracker().getQueries().iterator().next(); + + assertTrue(qry.isSuspended()); + + it.forEachRemaining(x -> {}); + } + + /** + * Verifies that when the cursor of a not fully fetched query is closed, its {@link H2QueryInfo} is removed from + * {@link HeavyQueriesTracker} on all cluster nodes. 
+ */ + @Test + @MultiNodeTest + public void testEmptyHeavyQueriesTrackerWithClosedCursor() { + FieldsQueryCursor<List<?>> cursor = queryCursor(false); + + cursor.iterator().next(); + + checkQryInfoCount(gridCount()); + + H2QueryInfo qryInfo = (H2QueryInfo)heavyQueriesTracker().getQueries().iterator().next(); + + assertTrue(qryInfo.isSuspended()); + + cursor.close(); + } + + /** + * Verifies that when a not fully fetched query is cancelled, its {@link H2QueryInfo} is removed from + * {@link HeavyQueriesTracker} on all cluster nodes. + */ + @Test + @MultiNodeTest + public void testEmptyHeavyQueriesTrackerWithCancelledQuery() { + cancelQuery(runNotFullyFetchedQuery(false)); + } + + /** + * Verifies that when a local not fully fetched query is cancelled, its {@link H2QueryInfo} is removed from + * {@link HeavyQueriesTracker} on all cluster nodes. + */ + @Test + @MultiNodeTest + public void testEmptyHeavyQueriesTrackerWithCancelledLocalQuery() { + long qryId = runNotFullyFetchedQuery(true); + + ((IgniteEx)ignite).context().query().cancelLocalQueries(Set.of(qryId)); + } + + /** + * Verifies that when there are multiple not fully fetched queries, and they are cancelled separately, corresponding + * {@link H2QueryInfo} instances are removed from {@link HeavyQueriesTracker} on all cluster nodes. 
+ * */ + @Test + @MultiNodeTest + public void testEmptyHeavyQueriesTrackerWithMultipleCancelledQueries() { + int qryCnt = 4; + int cnldQryCnt = 2; + + for (int i = 0; i < qryCnt; i++) + runNotFullyFetchedQuery(false); + + for (int i = 0; i < gridCount(); ++i) + assertEquals(qryCnt, heavyQueriesTracker(i).getQueries().size()); + + for (int i = 0; i < cnldQryCnt; i++) + cancelQuery(i + 1); Review Comment: It's not safe to rely on internals of id generation, let's use ids from heavyQueriesTracker(i).getQueries() ########## modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java: ########## @@ -176,6 +177,8 @@ public void onCancel(ClusterNode node, GridQueryCancelRequest msg) { } nodeRess.cancelRequest(qryReqId); + + h2.heavyQueriesTracker().stopTracking(new H2QueryInfo(node.id(), qryReqId), null); Review Comment: Consider calling stopTracking from MapQueryResult.Result#close. We have qryInfo there, and it looks safer in case someone adds a new cancel request method. ########## modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java: ########## @@ -392,6 +415,102 @@ public void testBigResultDistributed() throws Exception { checkBigResultSet(); } + /** + * Verifies that while a query is not fully fetched, its {@link H2QueryInfo} is kept in {@link HeavyQueriesTracker} + * on all cluster nodes and its {@link H2QueryInfo#isSuspended()} returns {@code true}. Then, once the query is fully + * fetched, its {@link H2QueryInfo} is removed from {@link HeavyQueriesTracker}. 
+ */ + @Test + @MultiNodeTest + public void testEmptyHeavyQueriesTrackerWithFullyFetchedIterator() { + Iterator<?> it = queryCursor(false).iterator(); + + checkQryInfoCount(gridCount()); + + H2QueryInfo qry = (H2QueryInfo)heavyQueriesTracker().getQueries().iterator().next(); + + assertTrue(qry.isSuspended()); + + it.forEachRemaining(x -> {}); + } + + /** + * Verifies that when the cursor of a not fully fetched query is closed, its {@link H2QueryInfo} is removed from + * {@link HeavyQueriesTracker} on all cluster nodes. + */ + @Test + @MultiNodeTest + public void testEmptyHeavyQueriesTrackerWithClosedCursor() { + FieldsQueryCursor<List<?>> cursor = queryCursor(false); + + cursor.iterator().next(); + + checkQryInfoCount(gridCount()); + + H2QueryInfo qryInfo = (H2QueryInfo)heavyQueriesTracker().getQueries().iterator().next(); + + assertTrue(qryInfo.isSuspended()); + + cursor.close(); + } + + /** + * Verifies that when a not fully fetched query is cancelled, its {@link H2QueryInfo} is removed from + * {@link HeavyQueriesTracker} on all cluster nodes. + */ + @Test + @MultiNodeTest + public void testEmptyHeavyQueriesTrackerWithCancelledQuery() { + cancelQuery(runNotFullyFetchedQuery(false)); + } + + /** + * Verifies that when a local not fully fetched query is cancelled, its {@link H2QueryInfo} is removed from + * {@link HeavyQueriesTracker} on all cluster nodes. + */ + @Test + @MultiNodeTest + public void testEmptyHeavyQueriesTrackerWithCancelledLocalQuery() { + long qryId = runNotFullyFetchedQuery(true); + + ((IgniteEx)ignite).context().query().cancelLocalQueries(Set.of(qryId)); + } + + /** + * Verifies that when there are multiple not fully fetched queries, and they are cancelled separately, corresponding + * {@link H2QueryInfo} instances are removed from {@link HeavyQueriesTracker} on all cluster nodes. 
+ * */ + @Test + @MultiNodeTest + public void testEmptyHeavyQueriesTrackerWithMultipleCancelledQueries() { + int qryCnt = 4; + int cnldQryCnt = 2; + + for (int i = 0; i < qryCnt; i++) + runNotFullyFetchedQuery(false); + + for (int i = 0; i < gridCount(); ++i) + assertEquals(qryCnt, heavyQueriesTracker(i).getQueries().size()); + + for (int i = 0; i < cnldQryCnt; i++) + cancelQuery(i + 1); + + for (int i = 0; i < gridCount(); ++i) { + Set<TrackableQuery> qrys = heavyQueriesTracker(i).getQueries(); + + assertEquals(cnldQryCnt, qrys.size()); + + assertFalse(qrys.stream().anyMatch(qryInfo -> { + long id = ((H2QueryInfo)qryInfo).queryId(); + + return IntStream.range(0, cnldQryCnt).anyMatch(x -> x == id); Review Comment: Wrong check, you cancel id: `i+1`, but check `i` ########## modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java: ########## @@ -643,7 +795,29 @@ private ListeningTestLogger testLog() { * @return Heavy queries tracker. */ private HeavyQueriesTracker heavyQueriesTracker() { - return ((IgniteH2Indexing)grid().context().query().getIndexing()).heavyQueriesTracker(); + return heavyQueriesTracker(0); + } + + /** */ + private HeavyQueriesTracker heavyQueriesTracker(int idx) { + return ((IgniteH2Indexing)grid(idx).context().query().getIndexing()).heavyQueriesTracker(); + } + + /** */ + private void checkQryInfoCount(int exp) { + int res = 0; + + for (int i = 0; i < gridCount(); i++) { + if (!heavyQueriesTracker(i).getQueries().isEmpty()) + res++; + } Review Comment: res += heavyQueriesTracker(i).getQueries().size(); -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org