anton-vinogradov commented on a change in pull request #9345: URL: https://github.com/apache/ignite/pull/9345#discussion_r823595640
########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java ########## @@ -1204,12 +1207,28 @@ public void onReconnected() { } /** - * WAL enabled flag. + * Value returned by this method can be changed runtime by the user or during rebalance. + * + * @return WAL enabled flag. + * @see IgniteCluster#disableWal(String) + * @see IgniteCluster#enableWal(String) */ public boolean walEnabled() { Review comment: Could this be renamed to `walLoggingEnabled` or `localWalEnabled` or similar? ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ########## @@ -801,17 +802,32 @@ else if (op == READ) { cctx.mvccCaching().onTxFinished(this, true); - if (!near() && !F.isEmpty(dataEntries) && cctx.wal() != null) { + boolean walEnabled = false; + + // Log only there are at least one persistent or cdc enabled group. + if (!near() && !F.isEmpty(dataEntries) && cctx.cdcWal() != null) { + for (int i = 0; i < dataEntries.size(); i++) { + CacheGroupContext grpCtx = dataEntries.get(i).get2().context().group(); + + if (grpCtx.walOrCdcEnabled()) { + walEnabled = true; + + break; + } + } + } + + if (walEnabled) { // Set new update counters for data entries received from persisted tx entries. List<DataEntry> entriesWithCounters = dataEntries.stream() .map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter())) .collect(Collectors.toList()); - ptr = cctx.wal().log(new DataRecord(entriesWithCounters)); + ptr = cctx.cdcWal().log(new DataRecord(entriesWithCounters)); } if (ptr != null) - cctx.wal().flush(ptr, false); + cctx.cdcWal().flush(ptr, false); Review comment: This is a `wal or if cdc enabled then also the wal`, not a `cdcWal`. `cctx.cdcWal()` looks almost good for me, but we must use ternary operation or something like this to gain wal instance in such code. `wal = cctx.wal() != null ? cctx.wal() : cctx.cdcWal()` and use `wal` variable further at any application code. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ########## @@ -1041,7 +1041,7 @@ public void destroyPartitionStore(int partId) throws IgniteCheckedException { int tag = pageMemory.invalidate(grp.groupId(), partId); - if (grp.walEnabled()) + if (grp.persistenceEnabled() && grp.walEnabled()) Review comment: Could we encapsulate this to the `walEnabled` method? Looks like now everywhere we use this combination, except `CheckpointWorkflow#fillCacheGroupState` which may be the mistake ########## File path: modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java ########## @@ -139,9 +147,18 @@ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); cfg.setDataStorageConfiguration(new DataStorageConfiguration() - .setCdcEnabled(true) .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT) - .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))); + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(persistenceEnabled)) + .setDataRegionConfigurations( + new DataRegionConfiguration() + .setName("cdc") + .setPersistenceEnabled(persistenceEnabled) + .setCdcEnabled(true), + new DataRegionConfiguration() + .setName("not-cdc") + .setPersistenceEnabled(false) Review comment: Could we check all case matrix? Lets have here `.setPersistenceEnabled(persistenceEnabled)`? ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ########## @@ -764,6 +776,13 @@ public IgniteSnapshotManager snapshotMgr() { return walMgr; } + /** + * @return Write ahead log manager. + */ + @Nullable public IgniteWriteAheadLogManager cdcWal() { Review comment: We must assert it's not null here, since we accessing (must) this method only after `walOrCdcEnabled()` check ########## File path: modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cdc; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.function.BiConsumer; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cdc.CdcSelfTest.WAL_ARCHIVE_TIMEOUT; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** Check only {@link DataRecord} written to the WAL for in-memory cache. */ +@RunWith(Parameterized.class) +public class WalForCdcTest extends GridCommonAbstractTest { + /** */ + private static final int RECORD_COUNT = 10; + + /** */ + @Parameterized.Parameter + public CacheMode mode; + + /** */ + @Parameterized.Parameter(1) + public CacheAtomicityMode atomicityMode; + + /** */ + private boolean persistenceEnabled; + + /** */ + @Parameterized.Parameters(name = "mode={0}, atomicityMode={1}") + public static Collection<?> parameters() { + List<Object[]> params = new ArrayList<>(); + + for (CacheMode mode : Arrays.asList(REPLICATED, PARTITIONED)) + for (CacheAtomicityMode atomicityMode : Arrays.asList(ATOMIC, TRANSACTIONAL)) + params.add(new Object[] {mode, atomicityMode}); + + return params; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setWalMode(WALMode.FSYNC) + .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(persistenceEnabled) + .setCdcEnabled(true))); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** */ + @Test + public void testOnlyDataRecordWritten() throws Exception { + persistenceEnabled = false; + + doTestWal((ignite, cache) -> { + for (int i = 0; i < RECORD_COUNT; i++) + cache.put(i, i); + }); + } + + /** */ + @Test + public void testWalDisable() throws Exception { + persistenceEnabled = true; Review comment: Inmem case should be tested as well. ########## File path: modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java ########## @@ -75,7 +75,7 @@ public static final int WAL_ARCHIVE_TIMEOUT = 5_000; /** Keys count. */ - public static final int KEYS_CNT = 50; + public static final int KEYS_CNT = 10; Review comment: How may 50 keys freeze the test? What test and why? Let's investigate this. ########## File path: modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java ########## @@ -243,6 +260,7 @@ public void testConflictVersionWritten() throws Exception { new CacheConfiguration<Integer, User>(DEFAULT_CACHE_NAME) .setCacheMode(cacheMode) .setAtomicityMode(atomicityMode) + .setDataRegionName("cdc") Review comment: Is this necessary? Is this checked somewhere and should be equal to "cdc"? If so, lets use const. ########## File path: modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cdc; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.function.BiConsumer; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cdc.CdcSelfTest.WAL_ARCHIVE_TIMEOUT; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** Check only {@link DataRecord} written to the WAL for in-memory cache. */ +@RunWith(Parameterized.class) +public class WalForCdcTest extends GridCommonAbstractTest { + /** */ + private static final int RECORD_COUNT = 10; + + /** */ + @Parameterized.Parameter + public CacheMode mode; + + /** */ + @Parameterized.Parameter(1) + public CacheAtomicityMode atomicityMode; + + /** */ + private boolean persistenceEnabled; + + /** */ + @Parameterized.Parameters(name = "mode={0}, atomicityMode={1}") + public static Collection<?> parameters() { + List<Object[]> params = new ArrayList<>(); + + for (CacheMode mode : Arrays.asList(REPLICATED, PARTITIONED)) + for (CacheAtomicityMode atomicityMode : Arrays.asList(ATOMIC, TRANSACTIONAL)) + params.add(new Object[] {mode, atomicityMode}); + + return params; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setWalMode(WALMode.FSYNC) + .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(persistenceEnabled) + .setCdcEnabled(true))); Review comment: Should we check disabled cdc behaviour as well? ########## File path: modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java ########## @@ -105,13 +98,11 @@ cfg.setConsistentId(igniteInstanceName); cfg.setDataStorageConfiguration(new DataStorageConfiguration() - .setCdcEnabled(true) .setWalMode(walMode) .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT) - .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))); - - if (pageSz != 0) - cfg.getDataStorageConfiguration().setPageSize(pageSz); + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(persistenceEnabled) + .setCdcEnabled(true))); Review comment: Should we check that disabled CDC streams nothing as well? ########## File path: modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cdc; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.function.BiConsumer; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cdc.CdcSelfTest.WAL_ARCHIVE_TIMEOUT; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** Check only {@link DataRecord} written to the WAL for in-memory cache. */ +@RunWith(Parameterized.class) +public class WalForCdcTest extends GridCommonAbstractTest { Review comment: Lets call this test in honor to its aim. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org