x-kreator commented on a change in pull request #7680: URL: https://github.com/apache/ignite/pull/7680#discussion_r411275026
########## File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheQueryMultipleSpliteratorCallsTest.java ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; +import 
org.junit.Test; + +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import java.io.Serializable; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Query cursor spliterator called multiple times without triggering IgniteException("Iterator is already fetched or query was cancelled.") + */ +public class CacheQueryMultipleSpliteratorCallsTest extends GridCommonAbstractTest Review comment: Please rename the test class to `QueryCursorSpliteratorCallsTest` and move it to the ignite-indexing module, because some queries require that module for correct execution. The suggested package is `org.apache.ignite.internal.processors.query`. Also, you have to add this test class to a suitable test suite, for instance `org.apache.ignite.testsuites.IgniteBinaryCacheQueryTestSuite`. ########## File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheQueryMultipleSpliteratorCallsTest.java ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; +import org.junit.Test; + +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import java.io.Serializable; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Query cursor spliterator called multiple times without triggering IgniteException("Iterator is already fetched or query was cancelled.") + */ +public class CacheQueryMultipleSpliteratorCallsTest extends GridCommonAbstractTest + implements Serializable { Review comment: Please, remove unnecessary `implements Serializable` declaration and following empty line. ########## File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheQueryMultipleSpliteratorCallsTest.java ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; +import org.junit.Test; + +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import java.io.Serializable; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * 
Query cursor spliterator called multiple times without triggering IgniteException("Iterator is already fetched or query was cancelled.") + */ +public class CacheQueryMultipleSpliteratorCallsTest extends GridCommonAbstractTest + implements Serializable { + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setPeerClassLoadingEnabled(true); Review comment: Replace these two lines of code with the following: ``` cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME) .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) .setIndexedTypes(Integer.class, String.class)); ``` It seems this will be enough to configure this test. ########## File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheQueryMultipleSpliteratorCallsTest.java ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; +import org.junit.Test; + +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import java.io.Serializable; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Query cursor spliterator called multiple times without triggering IgniteException("Iterator is already fetched or query was cancelled.") + */ +public class CacheQueryMultipleSpliteratorCallsTest extends GridCommonAbstractTest + implements Serializable { + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setPeerClassLoadingEnabled(true); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + return cfg; + } + + /** + * @return Cache configuration. 
+ */ + @NotNull + protected CacheConfiguration<Integer, String> cacheConfiguration( Review comment: This method seems unnecessary, so remove it please. ########## File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheQueryMultipleSpliteratorCallsTest.java ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; +import org.junit.Test; + +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import java.io.Serializable; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Query cursor spliterator called multiple times without triggering IgniteException("Iterator is already fetched or query was cancelled.") + */ +public class CacheQueryMultipleSpliteratorCallsTest extends GridCommonAbstractTest + implements Serializable { + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setPeerClassLoadingEnabled(true); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + return cfg; + } + + /** + * @return Cache configuration. 
+ */ + @NotNull + protected CacheConfiguration<Integer, String> cacheConfiguration( + CacheAtomicityMode cacheAtomicityMode, + CacheMode cacheMode) { + CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + + ccfg.setAtomicityMode(cacheAtomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(1); + } + + /** + * @throws IgniteException If failed. + */ + @Test + public void testCacheScanQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); + + IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + ScanQuery<Object, Object> qry = new ScanQuery<>(new IgniteBiPredicate<Object, Object>() { + /** {@inheritDoc} */ + @Override public boolean apply(Object key, Object val) { + return key != null; + } + }); + + try (QueryCursor<?> cur = cache.query(qry)) { + cur.iterator(); + cur.spliterator(); + + GridTestUtils.assertThrows(log, IgniteException.class, "Iterator is already fetched or query was cancelled.", cur, "iterator"); + } + } + + /** + * @throws IgniteException If failed. 
+ */ + @Test + public void testCacheContinuousQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); + + IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>() + .setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Object, Object>() { + /** {@inheritDoc} */ + @Override public boolean apply(Object key, Object val) { + return key != null; + } + })) + .setAutoUnsubscribe(true) + .setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> iterable) throws CacheEntryListenerException { + } + }); + + try(QueryCursor<?> cur = cache.query(qry)) { + cur.iterator(); + cur.spliterator(); + + GridTestUtils.assertThrows(log, IgniteException.class, "Iterator is already fetched or query was cancelled.", cur, "iterator"); + } + } + + /** + * @throws IgniteException If failed. + */ + @Test + public void testCacheSqlQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); + + IgniteCache<Object, String> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + try(QueryCursor<?> cur = cache.query(new SqlQuery<>("String", "from String"))) { + cur.iterator(); + cur.spliterator(); + + GridTestUtils.assertThrows(log, IgniteException.class, "Iterator is already fetched or query was cancelled.", cur, "iterator"); + } + } + + /** + * @throws IgniteException If failed. 
+ */ + @Test + public void testCacheSqlFieldQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); + + IgniteCache<Object, String> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + try(QueryCursor<?> cur = cache.query(new SqlFieldsQuery("select _key from String"))) { + cur.iterator(); + cur.spliterator(); + + GridTestUtils.assertThrows(log, IgniteException.class, "Iterator is already fetched or query was cancelled.", cur, "iterator"); + } + } + + /** + * @throws IgniteException If failed. + */ + @Test + public void testCacheTextQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); + + IgniteCache<Object, String> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + try(QueryCursor<?> cur = cache.query(new TextQuery<>("String", "1"))) { + cur.iterator(); + cur.spliterator(); + + GridTestUtils.assertThrows(log, IgniteException.class, "Iterator is already fetched or query was cancelled.", cur, "iterator"); + } + } + + Review comment: Please, remove redundant empty lines before the closing brace. ########## File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheQueryMultipleSpliteratorCallsTest.java ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; +import org.junit.Test; + +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import java.io.Serializable; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Query cursor spliterator called multiple times without triggering IgniteException("Iterator is already fetched or query was cancelled.") + */ +public class CacheQueryMultipleSpliteratorCallsTest extends GridCommonAbstractTest + implements Serializable { + + /** {@inheritDoc} 
*/ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setPeerClassLoadingEnabled(true); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + return cfg; + } + + /** + * @return Cache configuration. + */ + @NotNull + protected CacheConfiguration<Integer, String> cacheConfiguration( + CacheAtomicityMode cacheAtomicityMode, + CacheMode cacheMode) { + CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + + ccfg.setAtomicityMode(cacheAtomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(1); + } + + /** + * @throws IgniteException If failed. + */ + @Test + public void testCacheScanQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); Review comment: 1. Add the following method at the end of class: ``` /** * Executes query on cache then calls {@link QueryCursor#iterator()} and {@link QueryCursor#spliterator()} sequentially. * * @param qry Query. */ private void doQueryCursorSpliteratorCalls(Query<?> qry) { Ignite client = grid(0); IgniteCache<Object, String> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); try (QueryCursor<?> cur = cache.query(qry)) { cur.iterator(); cur.spliterator(); GridTestUtils.assertThrows(log, IgniteException.class, "Iterator is already fetched or query was cancelled.", cur, "iterator"); } } ``` 2. 
Refactor this and the other test methods in a similar way: ``` doQueryCursorSpliteratorCalls(new ScanQuery<>((key, val) -> key != null)); ``` ########## File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheQueryMultipleSpliteratorCallsTest.java ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; +import org.junit.Test; + +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import java.io.Serializable; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Query cursor spliterator called multiple times without triggering IgniteException("Iterator is already fetched or query was cancelled.") + */ +public class CacheQueryMultipleSpliteratorCallsTest extends GridCommonAbstractTest + implements Serializable { + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setPeerClassLoadingEnabled(true); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + return cfg; + } + + /** + * @return Cache configuration. 
+ */ + @NotNull + protected CacheConfiguration<Integer, String> cacheConfiguration( + CacheAtomicityMode cacheAtomicityMode, + CacheMode cacheMode) { + CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + + ccfg.setAtomicityMode(cacheAtomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(1); + } + + /** + * @throws IgniteException If failed. + */ + @Test + public void testCacheScanQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); + + IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + ScanQuery<Object, Object> qry = new ScanQuery<>(new IgniteBiPredicate<Object, Object>() { + /** {@inheritDoc} */ + @Override public boolean apply(Object key, Object val) { + return key != null; + } + }); + + try (QueryCursor<?> cur = cache.query(qry)) { + cur.iterator(); + cur.spliterator(); + + GridTestUtils.assertThrows(log, IgniteException.class, "Iterator is already fetched or query was cancelled.", cur, "iterator"); + } + } + + /** + * @throws IgniteException If failed. + */ + @Test + public void testCacheContinuousQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); Review comment: Please, refactor as shown below: ``` doQueryCursorSpliteratorCalls(new ContinuousQuery<>() .setInitialQuery(new ScanQuery<>((key, val) -> key != null)) .setAutoUnsubscribe(true) .setLocalListener(iterable -> {})); ``` ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryKeyValueSpliterator.java ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import org.apache.ignite.internal.processors.cache.CacheEntryImpl; + +import javax.cache.Cache; +import javax.cache.CacheException; +import java.util.Iterator; +import java.util.List; +import java.util.Spliterator; +import java.util.function.Consumer; + +/** + * SqlQuery key-value spliterator. + */ +public class QueryKeyValueSpliterator<K, V> implements Spliterator<Cache.Entry<K, V>> { + /** Target spliterator. */ + private final Spliterator<List<?>> spliterator; + + /** + * Constructor. + * + * @param spliterator Target spliterator. + */ + public QueryKeyValueSpliterator(Spliterator<List<?>> spliterator) { + this.spliterator = spliterator; + } + + /** {@inheritDoc} */ + @Override public boolean tryAdvance(Consumer<? super Cache.Entry<K, V>> action) { + return spliterator.tryAdvance(new Consumer<List<?>>() { + @Override Review comment: The `@Override` annotation should be on the same line as the method declaration. ########## File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheQueryMultipleSpliteratorCallsTest.java ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; +import org.junit.Test; + +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import java.io.Serializable; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * 
Query cursor spliterator called multiple times without triggering IgniteException("Iterator is already fetched or query was cancelled.") + */ +public class CacheQueryMultipleSpliteratorCallsTest extends GridCommonAbstractTest + implements Serializable { + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setPeerClassLoadingEnabled(true); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + return cfg; + } + + /** + * @return Cache configuration. + */ + @NotNull + protected CacheConfiguration<Integer, String> cacheConfiguration( + CacheAtomicityMode cacheAtomicityMode, + CacheMode cacheMode) { + CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + + ccfg.setAtomicityMode(cacheAtomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(1); + } + + /** + * @throws IgniteException If failed. + */ + @Test + public void testCacheScanQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); + + IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + ScanQuery<Object, Object> qry = new ScanQuery<>(new IgniteBiPredicate<Object, Object>() { + /** {@inheritDoc} */ + @Override public boolean apply(Object key, Object val) { + return key != null; + } + }); + + try (QueryCursor<?> cur = cache.query(qry)) { + cur.iterator(); + cur.spliterator(); + + GridTestUtils.assertThrows(log, IgniteException.class, "Iterator is already fetched or query was cancelled.", cur, "iterator"); + } + } + + /** + * @throws IgniteException If failed. 
+ */ + @Test + public void testCacheContinuousQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); + + IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>() + .setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Object, Object>() { + /** {@inheritDoc} */ + @Override public boolean apply(Object key, Object val) { + return key != null; + } + })) + .setAutoUnsubscribe(true) + .setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> iterable) throws CacheEntryListenerException { + } + }); + + try(QueryCursor<?> cur = cache.query(qry)) { + cur.iterator(); + cur.spliterator(); + + GridTestUtils.assertThrows(log, IgniteException.class, "Iterator is already fetched or query was cancelled.", cur, "iterator"); + } + } + + /** + * @throws IgniteException If failed. + */ + @Test + public void testCacheSqlQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); Review comment: Please, refactor as shown below: ``` doQueryCursorSpliteratorCalls(new SqlQuery<>("String", "from String")); ``` ########## File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheQueryMultipleSpliteratorCallsTest.java ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; +import org.junit.Test; + +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import java.io.Serializable; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Query cursor spliterator called multiple times without triggering IgniteException("Iterator is already fetched or query was cancelled.") + */ +public class CacheQueryMultipleSpliteratorCallsTest extends GridCommonAbstractTest + implements Serializable { + + /** {@inheritDoc} 
*/ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setPeerClassLoadingEnabled(true); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + return cfg; + } + + /** + * @return Cache configuration. + */ + @NotNull + protected CacheConfiguration<Integer, String> cacheConfiguration( + CacheAtomicityMode cacheAtomicityMode, + CacheMode cacheMode) { + CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + + ccfg.setAtomicityMode(cacheAtomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(1); + } + + /** + * @throws IgniteException If failed. + */ + @Test + public void testCacheScanQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); + + IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + ScanQuery<Object, Object> qry = new ScanQuery<>(new IgniteBiPredicate<Object, Object>() { + /** {@inheritDoc} */ Review comment: This will not matter after the further refactoring, but note that `/** {@inheritDoc} */` comments are not needed on methods of anonymous classes. ########## File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheQueryMultipleSpliteratorCallsTest.java ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; +import org.junit.Test; + +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import java.io.Serializable; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Query cursor spliterator called multiple times without triggering IgniteException("Iterator is already fetched or query was cancelled.") + */ +public class CacheQueryMultipleSpliteratorCallsTest extends GridCommonAbstractTest + implements Serializable { + + /** {@inheritDoc} 
*/ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setPeerClassLoadingEnabled(true); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + return cfg; + } + + /** + * @return Cache configuration. + */ + @NotNull + protected CacheConfiguration<Integer, String> cacheConfiguration( + CacheAtomicityMode cacheAtomicityMode, + CacheMode cacheMode) { + CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + + ccfg.setAtomicityMode(cacheAtomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(1); Review comment: Replace with `startGrids(1);` ########## File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheQueryMultipleSpliteratorCallsTest.java ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; +import org.junit.Test; + +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import java.io.Serializable; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Query cursor spliterator called multiple times without triggering IgniteException("Iterator is already fetched or query was cancelled.") Review comment: This javadoc line seems too long, please move to a new line the tail after "fetched" or "fetched or". 
Also, if you are using IntelliJ IDEA as your IDE, it will be convenient to use the Ignite project's code style, see https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines#CodingGuidelines-A.ConfigureIntelliJIDEAcodestyle ########## File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheQueryMultipleSpliteratorCallsTest.java ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; +import org.junit.Test; + +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import java.io.Serializable; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Query cursor spliterator called multiple times without triggering IgniteException("Iterator is already fetched or query was cancelled.") + */ +public class CacheQueryMultipleSpliteratorCallsTest extends GridCommonAbstractTest + implements Serializable { + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setPeerClassLoadingEnabled(true); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + return cfg; + } + + /** + * @return Cache configuration. 
+ */ + @NotNull + protected CacheConfiguration<Integer, String> cacheConfiguration( + CacheAtomicityMode cacheAtomicityMode, + CacheMode cacheMode) { + CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + + ccfg.setAtomicityMode(cacheAtomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(1); + } + + /** + * @throws IgniteException If failed. + */ + @Test + public void testCacheScanQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); + + IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + ScanQuery<Object, Object> qry = new ScanQuery<>(new IgniteBiPredicate<Object, Object>() { + /** {@inheritDoc} */ + @Override public boolean apply(Object key, Object val) { + return key != null; + } + }); + + try (QueryCursor<?> cur = cache.query(qry)) { + cur.iterator(); + cur.spliterator(); + + GridTestUtils.assertThrows(log, IgniteException.class, "Iterator is already fetched or query was cancelled.", cur, "iterator"); + } + } + + /** + * @throws IgniteException If failed. 
+ */ + @Test + public void testCacheContinuousQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); + + IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>() + .setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Object, Object>() { + /** {@inheritDoc} */ + @Override public boolean apply(Object key, Object val) { + return key != null; + } + })) + .setAutoUnsubscribe(true) + .setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> iterable) throws CacheEntryListenerException { + } + }); + + try(QueryCursor<?> cur = cache.query(qry)) { + cur.iterator(); + cur.spliterator(); + + GridTestUtils.assertThrows(log, IgniteException.class, "Iterator is already fetched or query was cancelled.", cur, "iterator"); + } + } + + /** + * @throws IgniteException If failed. + */ + @Test + public void testCacheSqlQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); + + IgniteCache<Object, String> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + try(QueryCursor<?> cur = cache.query(new SqlQuery<>("String", "from String"))) { + cur.iterator(); + cur.spliterator(); + + GridTestUtils.assertThrows(log, IgniteException.class, "Iterator is already fetched or query was cancelled.", cur, "iterator"); + } + } + + /** + * @throws IgniteException If failed. 
+ */ + @Test + public void testCacheSqlFieldQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); + + IgniteCache<Object, String> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + try(QueryCursor<?> cur = cache.query(new SqlFieldsQuery("select _key from String"))) { + cur.iterator(); + cur.spliterator(); + + GridTestUtils.assertThrows(log, IgniteException.class, "Iterator is already fetched or query was cancelled.", cur, "iterator"); + } + } + + /** + * @throws IgniteException If failed. + */ + @Test + public void testCacheTextQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); Review comment: Please, refactor as shown below: ``` doQueryCursorSpliteratorCalls(new TextQuery<>("String", "1")); ``` ########## File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheQueryMultipleSpliteratorCallsTest.java ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; +import org.junit.Test; + +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import java.io.Serializable; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Query cursor spliterator called multiple times without triggering IgniteException("Iterator is already fetched or query was cancelled.") + */ +public class CacheQueryMultipleSpliteratorCallsTest extends GridCommonAbstractTest + implements Serializable { + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setPeerClassLoadingEnabled(true); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + return cfg; + } + + /** + * @return Cache configuration. 
+ */ + @NotNull + protected CacheConfiguration<Integer, String> cacheConfiguration( + CacheAtomicityMode cacheAtomicityMode, + CacheMode cacheMode) { + CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + + ccfg.setAtomicityMode(cacheAtomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(1); + } + + /** + * @throws IgniteException If failed. + */ + @Test + public void testCacheScanQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); + + IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + ScanQuery<Object, Object> qry = new ScanQuery<>(new IgniteBiPredicate<Object, Object>() { + /** {@inheritDoc} */ + @Override public boolean apply(Object key, Object val) { + return key != null; + } + }); + + try (QueryCursor<?> cur = cache.query(qry)) { + cur.iterator(); + cur.spliterator(); + + GridTestUtils.assertThrows(log, IgniteException.class, "Iterator is already fetched or query was cancelled.", cur, "iterator"); + } + } + + /** + * @throws IgniteException If failed. 
+ */ + @Test + public void testCacheContinuousQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); + + IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>() + .setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Object, Object>() { + /** {@inheritDoc} */ + @Override public boolean apply(Object key, Object val) { + return key != null; + } + })) + .setAutoUnsubscribe(true) + .setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> iterable) throws CacheEntryListenerException { + } + }); + + try(QueryCursor<?> cur = cache.query(qry)) { + cur.iterator(); + cur.spliterator(); + + GridTestUtils.assertThrows(log, IgniteException.class, "Iterator is already fetched or query was cancelled.", cur, "iterator"); + } + } + + /** + * @throws IgniteException If failed. + */ + @Test + public void testCacheSqlQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); + + IgniteCache<Object, String> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + try(QueryCursor<?> cur = cache.query(new SqlQuery<>("String", "from String"))) { + cur.iterator(); + cur.spliterator(); + + GridTestUtils.assertThrows(log, IgniteException.class, "Iterator is already fetched or query was cancelled.", cur, "iterator"); + } + } + + /** + * @throws IgniteException If failed. + */ + @Test + public void testCacheSqlFieldQuerySpliteratorMultipleCalls() throws IgniteException { + Ignite client = grid(0); Review comment: Please, refactor as shown below: ``` doQueryCursorSpliteratorCalls(new SqlFieldsQuery("select _key from String")); ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
