This is an automated email from the ASF dual-hosted git repository. irakov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 305e8a0 IGNITE-12101 Fixed NullPointerException when IgniteQueue.removeAll is called. - Fixes #7266. 305e8a0 is described below commit 305e8a0c7797c2ce51ff55bc7396870aae182da3 Author: Slava Koptilin <slava.kopti...@gmail.com> AuthorDate: Fri Jan 17 16:46:56 2020 +0300 IGNITE-12101 Fixed NullPointerException when IgniteQueue.removeAll is called. - Fixes #7266. Signed-off-by: Ivan Rakov <ira...@apache.org> --- .../datastructures/GridCacheQueueAdapter.java | 4 +- .../GridCacheReplicatedQueueRemoveSelfTest.java | 144 +++++++++++++++++++++ .../IgniteCacheDataStructuresSelfTestSuite.java | 2 + 3 files changed, 148 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java index daf7b04..e7bde03 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java @@ -1045,13 +1045,13 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeGridUuid(out, id); - out.writeLong(idx); + out.writeObject(idx); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { id = U.readGridUuid(in); - idx = in.readLong(); + idx = (Long)in.readObject(); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastructures/GridCacheReplicatedQueueRemoveSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastructures/GridCacheReplicatedQueueRemoveSelfTest.java new file mode 100644 index 0000000..1541510 --- 
/dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastructures/GridCacheReplicatedQueueRemoveSelfTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.IgniteQueue; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CollectionConfiguration; +import org.apache.ignite.internal.processors.cache.CacheInvokeEntry; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; +import org.apache.ignite.internal.processors.cache.datastructures.IgniteCollectionAbstractTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.util.io.GridByteArrayInputStream; +import 
org.apache.ignite.internal.util.io.GridByteArrayOutputStream; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.REPLICATED; + +/** + * Tests for Ignite Queue remove method. + */ +public class GridCacheReplicatedQueueRemoveSelfTest extends IgniteCollectionAbstractTest { + /** */ + public static final int CACHE_SIZE = 1_000; + + /** */ + public static final int THREADS_CNT = 8; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override protected CacheMode collectionCacheMode() { + return REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode collectionCacheAtomicityMode() { + return TRANSACTIONAL; + } + + /** + * This is a unit test of Ignite queue's RemoveProcessor process() method. + * + * @throws Exception If failed. 
+ */ + @Test + @SuppressWarnings("unchecked") + public void testQueueRemovalProcessor() throws Exception { + GridCacheContext cctx = grid(0).context().cache().cache("ignite-sys-cache").context(); + + IgniteUuid id = IgniteUuid.randomUuid(); + + CacheInvokeEntry entry = new CacheInvokeEntry<>(null, null, null, false, + new GridDhtCacheEntry(cctx, null, + new KeyCacheObjectImpl(1, BigInteger.valueOf(1).toByteArray(), 1))); + + entry.setValue(new GridCacheQueueHeader( + id, + 2147483647, + false, + 0L, + 10000L, + Collections.singleton(1L))); + + GridCacheQueueAdapter.RemoveProcessor rp = new GridCacheQueueAdapter.RemoveProcessor(id, 1L); + + rp.process(entry); + + GridCacheQueueAdapter.RemoveProcessor externalRP = new GridCacheQueueAdapter.RemoveProcessor(); + + GridByteArrayOutputStream output = new GridByteArrayOutputStream(); + + rp.writeExternal(new ObjectOutputStream(output)); + + externalRP.readExternal(new ObjectInputStream(new GridByteArrayInputStream(output.toByteArray()))); + + assertEquals(id, GridTestUtils.getFieldValue(externalRP, "id")); + + // idx should be null, cause entry was already removed, see GridCacheQueueAdapter.RemoveProcessor.code + // for more details. + assertNull(GridTestUtils.getFieldValue(externalRP, "idx")); + } + + /** + * Removes buckets of data from the queue in many threads at the same time. + * @throws Exception If failed. + */ + @Test + @SuppressWarnings("unchecked") + public void testQueueRemoval() throws Exception { + IgniteQueue queue = grid(0).queue("SomeQueue", 0, + new CollectionConfiguration().setCollocated(true).setCacheMode(REPLICATED).setAtomicityMode(TRANSACTIONAL)); + + // Populate queue with some data. + for (int i = 0; i < CACHE_SIZE; i++) + queue.add(i); + + CountDownLatch latch = new CountDownLatch(THREADS_CNT); + + // Remove buckets of data from the queue in many threads at the same time. 
+ GridTestUtils.runMultiThreaded(() -> { + ArrayList dataToRmv = new ArrayList(); + + for (int i = 0; i < CACHE_SIZE / 10; i++) + dataToRmv.add(ThreadLocalRandom.current().nextInt(CACHE_SIZE)); + + latch.countDown(); + latch.await(); + + queue.removeAll(dataToRmv); + + return null; + }, THREADS_CNT, "queue-test-worker"); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java index a8b075e..42216ca 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java @@ -91,6 +91,7 @@ import org.apache.ignite.internal.processors.cache.datastructures.replicated.Ign import org.apache.ignite.internal.processors.cache.datastructures.replicated.IgniteReplicatedLockSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.replicated.IgniteReplicatedSemaphoreSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheAtomicReplicatedNodeRestartSelfTest; +import org.apache.ignite.internal.processors.datastructures.GridCacheReplicatedQueueRemoveSelfTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -124,6 +125,7 @@ import org.junit.runners.Suite; IgniteReplicatedSemaphoreSelfTest.class, IgniteReplicatedLockSelfTest.class, IgniteCacheAtomicReplicatedNodeRestartSelfTest.class, + GridCacheReplicatedQueueRemoveSelfTest.class, GridCachePartitionedSequenceApiSelfTest.class, GridCachePartitionedSequenceMultiNodeSelfTest.class,