anton-vinogradov commented on a change in pull request #9398:
URL: https://github.com/apache/ignite/pull/9398#discussion_r716498434
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
##########
@@ -244,30 +260,120 @@ public void runX() throws Exception {
log.info("Ignite node Marshaller [dir=" + marshaller + ']');
}
- injectResources(consumer.consumer());
+ StandaloneGridKernalContext kctx = startStandaloneKernal();
+
+ initMetrics();
- state = new CdcConsumerState(cdcDir.resolve(STATE_DIR));
+ try {
+ kctx.resource().injectGeneric(consumer.consumer());
- initState = state.load();
+ state = createState(cdcDir.resolve(STATE_DIR));
- if (initState != null && log.isInfoEnabled())
- log.info("Initial state loaded [state=" + initState + ']');
+ initState = state.load();
- consumer.start();
+ if (initState != null) {
+ committedSegmentIdx.value(initState.index());
+ committedSegmentOffset.value(initState.fileOffset());
- try {
- consumeWalSegmentsUntilStopped();
+ if (log.isInfoEnabled())
+ log.info("Initial state loaded [state=" + initState +
']');
+ }
+
+ consumer.start(mreg, kctx.metric().registry(metricName("cdc",
"consumer")));
+
+ try {
+ consumeWalSegmentsUntilStopped();
+ }
+ finally {
+ consumer.stop();
+
+ if (log.isInfoEnabled())
+ log.info("Ignite Change Data Capture Application
stopped.");
+ }
}
finally {
- consumer.stop();
+ for (GridComponent comp : kctx)
+ comp.stop(false);
+ }
+ }
+ }
+
+ /** Creates consumer state. */
+ protected CdcConsumerState createState(Path stateDir) {
+ return new CdcConsumerState(stateDir);
+ }
Review comment:
Oneliner with single usage?
Please get rid of it.
##########
File path: modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
##########
@@ -96,29 +183,87 @@ public void addAndWaitForConsumption(
}
/** */
- public boolean waitForSize(
+ public void waitForSize(
int expSz,
String cacheName,
CdcSelfTest.ChangeEventType evtType,
- long timeout,
TestCdcConsumer<?>... cnsmrs
) throws IgniteInterruptedCheckedException {
- return waitForCondition(
- () -> {
- int sum = Arrays.stream(cnsmrs).mapToInt(c ->
F.size(c.data(evtType, cacheId(cacheName)))).sum();
- return sum == expSz;
- },
- timeout);
+ assertTrue(waitForCondition(sizePredicate(expSz, cacheName, evtType,
cnsmrs), getTestTimeout()));
}
/** */
- public CdcConfiguration cdcConfig(CdcConsumer cnsmr) {
- CdcConfiguration cdcCfg = new CdcConfiguration();
+ protected GridAbsPredicate sizePredicate(
+ int expSz,
+ String cacheName,
+ ChangeEventType evtType,
+ TestCdcConsumer<?>... cnsmrs
+ ) {
+ return () -> {
+ int sum = Arrays.stream(cnsmrs).mapToInt(c ->
F.size(c.data(evtType, cacheId(cacheName)))).sum();
+ return sum == expSz;
+ };
+ }
- cdcCfg.setConsumer(cnsmr);
- cdcCfg.setKeepBinary(false);
+ /** */
+ protected long checkMetrics(CdcMain cdc, int expCnt) throws Exception {
+ if (metricExporters() != null) {
+ IgniteConfiguration cfg = getFieldValue(cdc, "igniteCfg");
+
+ DynamicMBean jmxCdcReg =
metricRegistry(cdcInstanceName(cfg.getIgniteInstanceName()), null, "cdc");
+
+ Function<String, ?> jmxVal = m -> {
+ try {
+ return jmxCdcReg.getAttribute(m);
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ };
+
+ checkMetrics(expCnt, (Function<String, Long>)jmxVal,
(Function<String, String>)jmxVal);
+ }
+
+ MetricRegistry mreg = getFieldValue(cdc, "mreg");
+
+ assertNotNull(mreg);
+
+ return checkMetrics(
+ expCnt,
+ m -> mreg.<LongMetric>findMetric(m).value(),
+ m -> mreg.<ObjectMetric<String>>findMetric(m).value()
+ );
+ }
+
+ /** */
+ private long checkMetrics(long expCnt, Function<String, Long> longMetric,
Function<String, String> strMetric) {
+ long committedSegIdx = longMetric.apply(COMMITTED_SEG_IDX);
+ long curSegIdx = longMetric.apply(CUR_SEG_IDX);
+
+ assertTrue(committedSegIdx <= curSegIdx);
- return cdcCfg;
+ assertTrue(longMetric.apply(COMMITTED_SEG_OFFSET) >= 0);
+ assertTrue(longMetric.apply(LAST_SEG_CONSUMPTION_TIME) > 0);
+
+ assertTrue(longMetric.apply(LAST_EVT_TIME) > 0);
+
+ for (String m : new String[] {BINARY_META_DIR, MARSHALLER_DIR,
CDC_DIR})
+ assertTrue(new File(strMetric.apply(m)).exists());
+
+ if (expCnt != -1)
Review comment:
we should always check this, see no reason for such ignor.
##########
File path: modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
##########
@@ -246,19 +275,34 @@ public void testMultiNodeConsumption() throws Exception {
IgniteConfiguration cfg1 = ign1.configuration();
IgniteConfiguration cfg2 = ign2.configuration();
- CdcMain cdc1 = new CdcMain(cfg1, null, cdcConfig(cnsmr1));
- CdcMain cdc2 = new CdcMain(cfg2, null, cdcConfig(cnsmr2));
+ // Always run CDC with consistent id to ensure instance read data for
specific node.
+ if (!specificConsistentId) {
+
cfg1.setConsistentId((Serializable)ign1.localNode().consistentId());
+
cfg2.setConsistentId((Serializable)ign2.localNode().consistentId());
+ }
+
+ CountDownLatch latch = new CountDownLatch(2);
+
+ GridAbsPredicate sizePredicate1 = sizePredicate(keysCnt[0],
DEFAULT_CACHE_NAME, UPDATE, cnsmr1);
+ GridAbsPredicate sizePredicate2 = sizePredicate(keysCnt[1],
DEFAULT_CACHE_NAME, UPDATE, cnsmr2);
+
+ CdcMain cdc1 = createCdc(cnsmr1, cfg1, latch, sizePredicate1);
+ CdcMain cdc2 = createCdc(cnsmr2, cfg2, latch, sizePredicate2);
IgniteInternalFuture<?> fut1 = runAsync(cdc1);
IgniteInternalFuture<?> fut2 = runAsync(cdc2);
addDataFut.get(getTestTimeout());
- addDataFut = runAsync(() -> addData(cache, KEYS_CNT, KEYS_CNT * 2));
+ runAsync(() -> addData(cache, KEYS_CNT, KEYS_CNT *
2)).get(getTestTimeout());
- addDataFut.get(getTestTimeout());
+ // Wait while predicate will become true and state saved on the disk
for both cdc.
+ assertTrue(latch.await(getTestTimeout(), MILLISECONDS));
- assertTrue(waitForSize(KEYS_CNT * 2, DEFAULT_CACHE_NAME, UPDATE,
getTestTimeout(), cnsmr1, cnsmr2));
+ long evtsCnt1 = checkMetrics(cdc1, -1);
+ long evtsCnt2 = checkMetrics(cdc2, -1);
Review comment:
```suggestion
long evtsCnt1 = checkMetrics(cdc1, keysCnt[0]);
long evtsCnt2 = checkMetrics(cdc2, keysCnt[1]);
```
please fix this way other -1s
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]