vldpyatkov commented on a change in pull request #9579:
URL: https://github.com/apache/ignite/pull/9579#discussion_r762357924
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
##########
@@ -4569,9 +4569,9 @@ public void awaitLastFut() {
ctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)() -> {
IgniteInternalFuture fut0;
- if (ctx.kernalContext().isStopping())
+ if (ctx.kernalContext().isStopping() ||
ctx.gate().isStopped())
fut0 = new GridFinishedFuture<>(
- new IgniteCheckedException("Operation has
been cancelled (node is stopping)."));
+ new IgniteCheckedException("Operation has
been cancelled (node or cache is stopping)."));
Review comment:
I think you can through exception like it is when you invoke Geateway:
```
if (ctx.gate().isStopped())
fut0 = new GridFinishedFuture<>(new CacheStoppedException(ctx.name()));
```
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheTest.java
##########
@@ -191,4 +215,60 @@ private void
doTestDestroyCacheOperationNotBlockingCheckpointTest(boolean loc) t
fut.get();
}
+
+ /**
+ * Tests correctness of concurrent cache destroy and implicit tx`s.
+ */
+ @Test
+ public void cacheDestroyWithConcImplicitTx() throws Exception {
+ final IgniteEx crd = (IgniteEx)startGridsMultiThreaded(3);
+
+ crd.cluster().state(ClusterState.ACTIVE);
+
+ crd.createCache(new CacheConfiguration(DEFAULT_CACHE_NAME)
+
.setBackups(1).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL).setGroupName("test"));
+
+ crd.createCache(new CacheConfiguration(DEFAULT_CACHE_NAME + "_1")
+
.setBackups(1).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL).setGroupName("test"));
+
+ Set<Integer> pkeys = new TreeSet<>();
+ try (final IgniteDataStreamer<Object, Object> streamer =
crd.dataStreamer(DEFAULT_CACHE_NAME)) {
+ for (int i = 0; i < 100; i++) {
+ streamer.addData(i, i);
+
+ if
(crd.affinity(DEFAULT_CACHE_NAME).isPrimary(crd.localNode(), i))
+ pkeys.add(i);
+ }
+ }
+
+ TestRecordingCommunicationSpi spi =
TestRecordingCommunicationSpi.spi(crd);
+
+ spi.blockMessages(GridDhtTxPrepareRequest.class,
getTestIgniteInstanceName(1));
+
+ List<IgniteFuture<Boolean>> asyncRmFut = new ArrayList<>(100);
+
+ for (Integer pkey : pkeys)
+ asyncRmFut.add(crd.cache(DEFAULT_CACHE_NAME).removeAsync(pkey));
+
+ spi.blockMessages(GridDhtPartitionsFullMessage.class,
getTestIgniteInstanceName(1));
+
+ IgniteInternalFuture destr = GridTestUtils.runAsync(() ->
grid(1).destroyCache(DEFAULT_CACHE_NAME));
+
+ spi.waitForBlocked();
+
+ spi.stopBlock(true, (msg) -> msg.ioMessage().message() instanceof
GridDhtPartitionsFullMessage);
+
+ spi.stopBlock();
+
+ destr.get();
+
+ assertFalse(GridTestUtils.waitForCondition(() -> G.allGrids().size() <
3, 5_000));
Review comment:
It is an antipattern, because this construction wait predefined time (5
second).
Maybe you would get rid of this assertion if handle an exception from a
remove future.
```
for (IgniteFuture<Boolean> rmFut : asyncRmFut) {
try {
rmFut.get(10_000);
}
catch (Throwable ex) {
assertTrue("Unexpected exception thrown [msg=" + ex.getMessage() +
']', X.hasCause(ex,
"Failed to perform cache operation (cache is stopped): " +
DEFAULT_CACHE_NAME,
CacheStoppedException.class));
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]