nizhikov commented on a change in pull request #9398:
URL: https://github.com/apache/ignite/pull/9398#discussion_r716767763



##########
File path: modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
##########
@@ -246,19 +275,34 @@ public void testMultiNodeConsumption() throws Exception {
         IgniteConfiguration cfg1 = ign1.configuration();
         IgniteConfiguration cfg2 = ign2.configuration();
 
-        CdcMain cdc1 = new CdcMain(cfg1, null, cdcConfig(cnsmr1));
-        CdcMain cdc2 = new CdcMain(cfg2, null, cdcConfig(cnsmr2));
+        // Always run CDC with consistent id to ensure instance read data for specific node.
+        if (!specificConsistentId) {
+            
cfg1.setConsistentId((Serializable)ign1.localNode().consistentId());
+            
cfg2.setConsistentId((Serializable)ign2.localNode().consistentId());
+        }
+
+        CountDownLatch latch = new CountDownLatch(2);
+
+        GridAbsPredicate sizePredicate1 = sizePredicate(keysCnt[0], 
DEFAULT_CACHE_NAME, UPDATE, cnsmr1);
+        GridAbsPredicate sizePredicate2 = sizePredicate(keysCnt[1], 
DEFAULT_CACHE_NAME, UPDATE, cnsmr2);
+
+        CdcMain cdc1 = createCdc(cnsmr1, cfg1, latch, sizePredicate1);
+        CdcMain cdc2 = createCdc(cnsmr2, cfg2, latch, sizePredicate2);
 
         IgniteInternalFuture<?> fut1 = runAsync(cdc1);
         IgniteInternalFuture<?> fut2 = runAsync(cdc2);
 
         addDataFut.get(getTestTimeout());
 
-        addDataFut = runAsync(() -> addData(cache, KEYS_CNT, KEYS_CNT * 2));
+        runAsync(() -> addData(cache, KEYS_CNT, KEYS_CNT * 
2)).get(getTestTimeout());
 
-        addDataFut.get(getTestTimeout());
+        // Wait while predicate will become true and state saved on the disk for both cdc.
+        assertTrue(latch.await(getTestTimeout(), MILLISECONDS));
 
-        assertTrue(waitForSize(KEYS_CNT * 2, DEFAULT_CACHE_NAME, UPDATE, 
getTestTimeout(), cnsmr1, cnsmr2));
+        long evtsCnt1 = checkMetrics(cdc1, -1);
+        long evtsCnt2 = checkMetrics(cdc2, -1);

Review comment:
       Yes. Thank you.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to