anton-vinogradov commented on a change in pull request #8909: URL: https://github.com/apache/ignite/pull/8909#discussion_r646379268
########## File path: modules/core/src/main/java/org/apache/ignite/internal/cdc/ChangeDataCapture.java ########## @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cdc; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cdc.ChangeDataCaptureConfiguration; +import org.apache.ignite.cdc.ChangeDataCaptureConsumer; +import org.apache.ignite.cdc.ChangeDataCaptureEvent; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.MarshallerContextImpl; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import 
org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; +import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver; +import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; +import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; +import org.apache.ignite.internal.processors.resource.GridResourceIoc; +import org.apache.ignite.internal.processors.resource.GridResourceLoggerInjector; +import org.apache.ignite.internal.processors.resource.GridSpringResourceContext; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.resources.SpringApplicationContextResource; +import org.apache.ignite.resources.SpringResource; +import org.apache.ignite.startup.cmdline.ChangeDataCaptureCommandLineStartup; + +import static org.apache.ignite.internal.IgniteKernal.NL; +import static org.apache.ignite.internal.IgniteKernal.SITE; +import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR; +import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER; + +/** + * Change Data Capture (CDC) application. + * The application runs independently of Ignite node process and provides the ability + * for the {@link ChangeDataCaptureConsumer} to consume events({@link ChangeDataCaptureEvent}) from WAL segments. 
+ * The user should provide {@link ChangeDataCaptureConsumer} implementation with custom consumption logic. + * + * Ignite node should be explicitly configured for using {@link ChangeDataCapture}. + * <ol> + * <li>Set {@link DataStorageConfiguration#setChangeDataCaptureEnabled(boolean)} to true.</li> + * <li>Optional: Set {@link DataStorageConfiguration#setChangeDataCaptureWalPath(String)} to path to the directory + * to store WAL segments for CDC.</li> + * <li>Optional: Set {@link DataStorageConfiguration#setWalForceArchiveTimeout(long)} to configure timeout for + * force WAL rollover, so new events will be available for consumptions with the predicted time.</li> + * </ol> + * + * When {@link DataStorageConfiguration#getChangeDataCaptureWalPath()} is true then Ignite node on each WAL segment + * rollover creates hard link to archive WAL segment in + * {@link DataStorageConfiguration#getChangeDataCaptureWalPath()} directory. {@link ChangeDataCapture} application takes + * segment file and consumes events from it. + * After successful consumption (see {@link ChangeDataCaptureConsumer#onEvents(Iterator)}) WAL segment will be deleted + * from directory. + * + * Several Ignite nodes can be started on the same host. + * If your deployment done with custom consistent id then you should specify it via + * {@link IgniteConfiguration#setConsistentId(Serializable)} in provided {@link IgniteConfiguration}. + * + * Application works as follows: + * <ol> + * <li>Search node work directory based on provided {@link IgniteConfiguration}.</li> + * <li>Await for the creation of CDC directory if it not exists.</li> + * <li>Acquire file lock to ensure exclusive consumption.</li> Review comment: Searches, Awaits, Acquires ########## File path: modules/core/src/test/java/org/apache/ignite/cdc/ChangeDataCaptureSelfTest.java ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cdc; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cdc.ChangeDataCapture; +import org.apache.ignite.internal.util.typedef.F; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.cdc.AbstractChangeDataCaptureTest.ChangeEventType.DELETE; +import static org.apache.ignite.cdc.AbstractChangeDataCaptureTest.ChangeEventType.UPDATE; +import static 
org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** */ +@RunWith(Parameterized.class) +public class ChangeDataCaptureSelfTest extends AbstractChangeDataCaptureTest { + /** */ + public static final String TX_CACHE_NAME = "tx-cache"; + + /** */ + public static final int WAL_ARCHIVE_TIMEOUT = 5_000; + + /** Keys count. */ + public static final int KEYS_CNT = 50; + + /** */ + @Parameterized.Parameter + public boolean specificConsistentId; + + /** */ + @Parameterized.Parameter(1) + public WALMode walMode; + + /** */ + @Parameterized.Parameters(name = "specificConsistentId={0}, walMode={1}") + public static Collection<?> parameters() { + return Arrays.asList(new Object[][] { + {true, WALMode.FSYNC}, + {false, WALMode.FSYNC}, + {true, WALMode.LOG_ONLY}, + {false, WALMode.LOG_ONLY}, + {true, WALMode.BACKGROUND}, + {false, WALMode.BACKGROUND} + }); + } + + /** Consistent id. */ + private UUID consistentId = UUID.randomUUID(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + if (specificConsistentId) + cfg.setConsistentId(consistentId); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setChangeDataCaptureEnabled(true) + .setWalMode(walMode) + .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))); + + cfg.setCacheConfiguration( + new CacheConfiguration<>(TX_CACHE_NAME).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + ); + + return cfg; + } + + /** Simplest CDC test. */ + @Test + public void testReadAllKeys() throws Exception { + // Read all records from iterator. 
+ readAll(new UserCDCConsumer()); + + // Read one record per call. + readAll(new UserCDCConsumer() { + @Override public boolean onEvents(Iterator<ChangeDataCaptureEvent> evts) { + super.onEvents(Collections.singleton(evts.next()).iterator()); + + return false; + } + }); + + // Read one record per call and commit. + readAll(new UserCDCConsumer() { + @Override public boolean onEvents(Iterator<ChangeDataCaptureEvent> evts) { + super.onEvents(Collections.singleton(evts.next()).iterator()); + + return true; + } + }); + } + + /** */ + private void readAll(UserCDCConsumer cnsmr) throws Exception { + IgniteConfiguration cfg = getConfiguration("ignite-0"); + + Ignite ign = startGrid(cfg); + + ign.cluster().state(ACTIVE); + + ChangeDataCapture cdc = new ChangeDataCapture(cfg, null, cdcConfig(cnsmr)); + + IgniteCache<Integer, User> cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME); + IgniteCache<Integer, User> txCache = ign.getOrCreateCache(TX_CACHE_NAME); + + addAndWaitForConsumption( + cnsmr, + cdc, + cache, + txCache, + ChangeDataCaptureSelfTest::addData, + 0, + KEYS_CNT * 2, Review comment: why KEYS_CNT * 2? ########## File path: modules/core/src/test/java/org/apache/ignite/cdc/ChangeDataCaptureCacheVersionTest.java ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cdc; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cdc.ChangeDataCapture; +import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectImpl; +import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; +import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; +import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; +import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.plugin.AbstractCachePluginProvider; +import org.apache.ignite.plugin.AbstractTestPluginProvider; +import org.apache.ignite.plugin.CachePluginContext; +import 
org.apache.ignite.plugin.CachePluginProvider; +import org.jetbrains.annotations.Nullable; +import org.junit.Test; + +import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** */ +public class ChangeDataCaptureCacheVersionTest extends AbstractChangeDataCaptureTest { + /** */ + public static final String FOR_OTHER_DC_ID = "for-other-dc-id"; + + /** */ + public static final byte DFLT_DC_ID = 1; + + /** */ + public static final byte OTHER_DC_ID = 2; + + /** */ + public static final int KEY_TO_UPD = 42; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setChangeDataCaptureEnabled(true) + .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))); + + cfg.setPluginProviders(new AbstractTestPluginProvider() { + @Override public String name() { + return "ConflictResolverProvider"; + } + + @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) { + if (!ctx.igniteCacheConfiguration().getName().equals(FOR_OTHER_DC_ID)) + return null; + + return new AbstractCachePluginProvider() { + @Override public @Nullable Object createComponent(Class cls) { + if (cls != CacheConflictResolutionManager.class) + return null; + + return new TestCacheConflictResolutionManager(); + } + }; + } + }); + + return cfg; + } + + /** Simplest CDC test with usage of {@link IgniteInternalCache#putAllConflict(Map)}. */ + @Test + public void testReadAllKeysWithOtherDc() throws Exception { Review comment: *fromOtherCluster? Please check that we use `Cluster`, not `DC`, in other code.
########## File path: modules/core/src/test/java/org/apache/ignite/cdc/ChangeDataCaptureSelfTest.java ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cdc; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cdc.ChangeDataCapture; +import org.apache.ignite.internal.util.typedef.F; +import org.junit.Test; +import 
org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.cdc.AbstractChangeDataCaptureTest.ChangeEventType.DELETE; +import static org.apache.ignite.cdc.AbstractChangeDataCaptureTest.ChangeEventType.UPDATE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** */ +@RunWith(Parameterized.class) +public class ChangeDataCaptureSelfTest extends AbstractChangeDataCaptureTest { + /** */ + public static final String TX_CACHE_NAME = "tx-cache"; + + /** */ + public static final int WAL_ARCHIVE_TIMEOUT = 5_000; + + /** Keys count. */ + public static final int KEYS_CNT = 50; + + /** */ + @Parameterized.Parameter + public boolean specificConsistentId; + + /** */ + @Parameterized.Parameter(1) + public WALMode walMode; + + /** */ + @Parameterized.Parameters(name = "specificConsistentId={0}, walMode={1}") + public static Collection<?> parameters() { + return Arrays.asList(new Object[][] { + {true, WALMode.FSYNC}, + {false, WALMode.FSYNC}, + {true, WALMode.LOG_ONLY}, + {false, WALMode.LOG_ONLY}, + {true, WALMode.BACKGROUND}, + {false, WALMode.BACKGROUND} + }); + } + + /** Consistent id. 
*/ + private UUID consistentId = UUID.randomUUID(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + if (specificConsistentId) + cfg.setConsistentId(consistentId); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setChangeDataCaptureEnabled(true) + .setWalMode(walMode) + .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))); + + cfg.setCacheConfiguration( + new CacheConfiguration<>(TX_CACHE_NAME).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + ); + + return cfg; + } + + /** Simplest CDC test. */ + @Test + public void testReadAllKeys() throws Exception { + // Read all records from iterator. + readAll(new UserCDCConsumer()); + + // Read one record per call. + readAll(new UserCDCConsumer() { + @Override public boolean onEvents(Iterator<ChangeDataCaptureEvent> evts) { + super.onEvents(Collections.singleton(evts.next()).iterator()); + + return false; + } + }); + + // Read one record per call and commit. 
+ readAll(new UserCDCConsumer() { + @Override public boolean onEvents(Iterator<ChangeDataCaptureEvent> evts) { + super.onEvents(Collections.singleton(evts.next()).iterator()); + + return true; + } + }); + } + + /** */ + private void readAll(UserCDCConsumer cnsmr) throws Exception { + IgniteConfiguration cfg = getConfiguration("ignite-0"); + + Ignite ign = startGrid(cfg); + + ign.cluster().state(ACTIVE); + + ChangeDataCapture cdc = new ChangeDataCapture(cfg, null, cdcConfig(cnsmr)); + + IgniteCache<Integer, User> cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME); + IgniteCache<Integer, User> txCache = ign.getOrCreateCache(TX_CACHE_NAME); + + addAndWaitForConsumption( + cnsmr, + cdc, + cache, + txCache, + ChangeDataCaptureSelfTest::addData, + 0, + KEYS_CNT * 2, + getTestTimeout() + ); + + removeData(cache, 0, KEYS_CNT); + + IgniteInternalFuture<?> rmvFut = runAsync(cdc); + + assertTrue(waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, DELETE, getTestTimeout(), cnsmr)); + + rmvFut.cancel(); + + assertTrue(cnsmr.stopped()); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** */ + @Test + public void testReadBeforeGracefulShutdown() throws Exception { + IgniteConfiguration cfg = getConfiguration("ignite-0"); + + Ignite ign = startGrid(cfg); + + ign.cluster().state(ACTIVE); + + CountDownLatch cnsmrStarted = new CountDownLatch(1); + CountDownLatch startProcEvts = new CountDownLatch(1); + + UserCDCConsumer cnsmr = new UserCDCConsumer() { + @Override public boolean onEvents(Iterator<ChangeDataCaptureEvent> evts) { + cnsmrStarted.countDown(); + + try { + startProcEvts.await(getTestTimeout(), TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + + return super.onEvents(evts); + } + }; + + ChangeDataCapture cdc = new ChangeDataCapture(cfg, null, cdcConfig(cnsmr)); + + runAsync(cdc); + + IgniteCache<Integer, User> cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME); + + addData(cache, 0, KEYS_CNT); + + // Make sure all streamed data 
will become available for consumption. + Thread.sleep(2 * WAL_ARCHIVE_TIMEOUT); + + cnsmrStarted.await(getTestTimeout(), TimeUnit.MILLISECONDS); + + // Initiate graceful shutdown. + cdc.stop(); + + startProcEvts.countDown(); + + assertTrue(waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, UPDATE, getTestTimeout(), cnsmr)); + assertTrue(waitForCondition(cnsmr::stopped, getTestTimeout())); + + List<Integer> keys = cnsmr.data(UPDATE, cacheId(DEFAULT_CACHE_NAME)); + + assertEquals(KEYS_CNT, keys.size()); + + for (int i = 0; i < KEYS_CNT; i++) + assertTrue(keys.contains(i)); + } + + /** */ + @Test + public void testMultiNodeConsumption() throws Exception { + IgniteEx ign1 = startGrid(0); + + if (specificConsistentId) + consistentId = UUID.randomUUID(); + + IgniteEx ign2 = startGrid(1); + + ign1.cluster().state(ACTIVE); + + IgniteCache<Integer, User> cache = ign1.getOrCreateCache(DEFAULT_CACHE_NAME); + + IgniteInternalFuture<?> addDataFut = runAsync(() -> addData(cache, 0, KEYS_CNT)); Review comment: If this check is about "CDC guarantees data processing even if data processed before the CDC start", a hint should be added here to prevent refactoring later. ########## File path: modules/core/src/test/java/org/apache/ignite/cdc/ChangeDataCaptureSelfTest.java ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cdc; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cdc.ChangeDataCapture; +import org.apache.ignite.internal.util.typedef.F; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.cdc.AbstractChangeDataCaptureTest.ChangeEventType.DELETE; +import static org.apache.ignite.cdc.AbstractChangeDataCaptureTest.ChangeEventType.UPDATE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** */ +@RunWith(Parameterized.class) +public class ChangeDataCaptureSelfTest extends AbstractChangeDataCaptureTest { + /** */ + public static final String TX_CACHE_NAME = "tx-cache"; + + /** */ + public static final int WAL_ARCHIVE_TIMEOUT = 5_000; + + /** Keys count. 
*/ + public static final int KEYS_CNT = 50; + + /** */ + @Parameterized.Parameter + public boolean specificConsistentId; + + /** */ + @Parameterized.Parameter(1) + public WALMode walMode; + + /** */ + @Parameterized.Parameters(name = "specificConsistentId={0}, walMode={1}") + public static Collection<?> parameters() { + return Arrays.asList(new Object[][] { + {true, WALMode.FSYNC}, + {false, WALMode.FSYNC}, + {true, WALMode.LOG_ONLY}, + {false, WALMode.LOG_ONLY}, + {true, WALMode.BACKGROUND}, + {false, WALMode.BACKGROUND} + }); + } + + /** Consistent id. */ + private UUID consistentId = UUID.randomUUID(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + if (specificConsistentId) + cfg.setConsistentId(consistentId); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setChangeDataCaptureEnabled(true) + .setWalMode(walMode) + .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))); + + cfg.setCacheConfiguration( + new CacheConfiguration<>(TX_CACHE_NAME).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + ); + + return cfg; + } + + /** Simplest CDC test. */ + @Test + public void testReadAllKeys() throws Exception { + // Read all records from iterator. + readAll(new UserCDCConsumer()); + + // Read one record per call. + readAll(new UserCDCConsumer() { + @Override public boolean onEvents(Iterator<ChangeDataCaptureEvent> evts) { + super.onEvents(Collections.singleton(evts.next()).iterator()); + + return false; + } + }); + + // Read one record per call and commit. 
+ readAll(new UserCDCConsumer() { + @Override public boolean onEvents(Iterator<ChangeDataCaptureEvent> evts) { + super.onEvents(Collections.singleton(evts.next()).iterator()); + + return true; + } + }); + } + + /** */ + private void readAll(UserCDCConsumer cnsmr) throws Exception { + IgniteConfiguration cfg = getConfiguration("ignite-0"); + + Ignite ign = startGrid(cfg); + + ign.cluster().state(ACTIVE); + + ChangeDataCapture cdc = new ChangeDataCapture(cfg, null, cdcConfig(cnsmr)); + + IgniteCache<Integer, User> cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME); + IgniteCache<Integer, User> txCache = ign.getOrCreateCache(TX_CACHE_NAME); + + addAndWaitForConsumption( + cnsmr, + cdc, + cache, + txCache, + ChangeDataCaptureSelfTest::addData, + 0, + KEYS_CNT * 2, + getTestTimeout() + ); + + removeData(cache, 0, KEYS_CNT); + + IgniteInternalFuture<?> rmvFut = runAsync(cdc); + + assertTrue(waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, DELETE, getTestTimeout(), cnsmr)); + + rmvFut.cancel(); + + assertTrue(cnsmr.stopped()); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** */ + @Test + public void testReadBeforeGracefulShutdown() throws Exception { + IgniteConfiguration cfg = getConfiguration("ignite-0"); + + Ignite ign = startGrid(cfg); + + ign.cluster().state(ACTIVE); + + CountDownLatch cnsmrStarted = new CountDownLatch(1); + CountDownLatch startProcEvts = new CountDownLatch(1); + + UserCDCConsumer cnsmr = new UserCDCConsumer() { + @Override public boolean onEvents(Iterator<ChangeDataCaptureEvent> evts) { + cnsmrStarted.countDown(); + + try { + startProcEvts.await(getTestTimeout(), TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + + return super.onEvents(evts); + } + }; + + ChangeDataCapture cdc = new ChangeDataCapture(cfg, null, cdcConfig(cnsmr)); + + runAsync(cdc); + + IgniteCache<Integer, User> cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME); + + addData(cache, 0, KEYS_CNT); + + // Make sure all streamed data 
will become available for consumption. + Thread.sleep(2 * WAL_ARCHIVE_TIMEOUT); + + cnsmrStarted.await(getTestTimeout(), TimeUnit.MILLISECONDS); + + // Initiate graceful shutdown. + cdc.stop(); + + startProcEvts.countDown(); + + assertTrue(waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, UPDATE, getTestTimeout(), cnsmr)); + assertTrue(waitForCondition(cnsmr::stopped, getTestTimeout())); + + List<Integer> keys = cnsmr.data(UPDATE, cacheId(DEFAULT_CACHE_NAME)); + + assertEquals(KEYS_CNT, keys.size()); + + for (int i = 0; i < KEYS_CNT; i++) + assertTrue(keys.contains(i)); + } + + /** */ + @Test + public void testMultiNodeConsumption() throws Exception { + IgniteEx ign1 = startGrid(0); + + if (specificConsistentId) + consistentId = UUID.randomUUID(); + + IgniteEx ign2 = startGrid(1); + + ign1.cluster().state(ACTIVE); + + IgniteCache<Integer, User> cache = ign1.getOrCreateCache(DEFAULT_CACHE_NAME); + + IgniteInternalFuture<?> addDataFut = runAsync(() -> addData(cache, 0, KEYS_CNT)); + + UserCDCConsumer cnsmr1 = new UserCDCConsumer(); + UserCDCConsumer cnsmr2 = new UserCDCConsumer(); + + IgniteConfiguration cfg1 = ign1.configuration(); + IgniteConfiguration cfg2 = ign2.configuration(); + + ChangeDataCapture cdc1 = new ChangeDataCapture(cfg1, null, cdcConfig(cnsmr1)); + ChangeDataCapture cdc2 = new ChangeDataCapture(cfg2, null, cdcConfig(cnsmr2)); + + IgniteInternalFuture<?> fut1 = runAsync(cdc1); + IgniteInternalFuture<?> fut2 = runAsync(cdc2); + + addDataFut.get(getTestTimeout()); + + addDataFut = runAsync(() -> addData(cache, KEYS_CNT, KEYS_CNT * 2)); + + addDataFut.get(getTestTimeout()); + + assertTrue(waitForSize(KEYS_CNT * 2, DEFAULT_CACHE_NAME, UPDATE, getTestTimeout(), cnsmr1, cnsmr2)); + + assertFalse(cnsmr1.stopped()); + assertFalse(cnsmr2.stopped()); + + fut1.cancel(); + fut2.cancel(); + + assertTrue(cnsmr1.stopped()); + assertTrue(cnsmr2.stopped()); + + removeData(cache, 0, KEYS_CNT * 2); + + IgniteInternalFuture<?> rmvFut1 = runAsync(cdc1); + IgniteInternalFuture<?> 
rmvFut2 = runAsync(cdc2); + + assertTrue(waitForSize(KEYS_CNT * 2, DEFAULT_CACHE_NAME, DELETE, getTestTimeout(), cnsmr1, cnsmr2)); + + rmvFut1.cancel(); + rmvFut2.cancel(); + + assertTrue(cnsmr1.stopped()); + assertTrue(cnsmr2.stopped()); + } + + /** */ + @Test + public void testCDCSingleton() throws Exception { Review comment: testCdcSingleton ########## File path: modules/core/src/test/java/org/apache/ignite/cdc/ChangeDataCaptureCacheVersionTest.java ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cdc; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cdc.ChangeDataCapture; +import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectImpl; +import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; +import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; +import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; +import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.plugin.AbstractCachePluginProvider; +import org.apache.ignite.plugin.AbstractTestPluginProvider; +import org.apache.ignite.plugin.CachePluginContext; +import org.apache.ignite.plugin.CachePluginProvider; +import org.jetbrains.annotations.Nullable; +import 
org.junit.Test; + +import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** */ +public class ChangeDataCaptureCacheVersionTest extends AbstractChangeDataCaptureTest { + /** */ + public static final String FOR_OTHER_DC_ID = "for-other-dc-id"; + + /** */ + public static final byte DFLT_DC_ID = 1; + + /** */ + public static final byte OTHER_DC_ID = 2; + + /** */ + public static final int KEY_TO_UPD = 42; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setChangeDataCaptureEnabled(true) + .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))); + + cfg.setPluginProviders(new AbstractTestPluginProvider() { + @Override public String name() { + return "ConflictResolverProvider"; + } + + @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) { + if (!ctx.igniteCacheConfiguration().getName().equals(FOR_OTHER_DC_ID)) + return null; + + return new AbstractCachePluginProvider() { + @Override public @Nullable Object createComponent(Class cls) { + if (cls != CacheConflictResolutionManager.class) + return null; + + return new TestCacheConflictResolutionManager(); + } + }; + } + }); + + return cfg; + } + + /** Simplest CDC test with usage of {@link IgniteInternalCache#putAllConflict(Map)}. 
*/ + @Test + public void testReadAllKeysWithOtherDc() throws Exception { + IgniteConfiguration cfg = getConfiguration("ignite-conflict-resolver"); + + IgniteEx ign = startGrid(cfg); + + ign.context().cache().context().versions().dataCenterId(DFLT_DC_ID); + ign.cluster().state(ACTIVE); + + UserCDCConsumer cnsmr = new UserCDCConsumer() { + @Override public void checkEvent(ChangeDataCaptureEvent evt) { + assertEquals(DFLT_DC_ID, evt.version().clusterId()); + assertEquals(OTHER_DC_ID, evt.version().otherClusterVersion().clusterId()); Review comment: *CLUSTER_ID? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
