anton-vinogradov commented on a change in pull request #9345:
URL: https://github.com/apache/ignite/pull/9345#discussion_r823595640



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
##########
@@ -1204,12 +1207,28 @@ public void onReconnected() {
     }
 
     /**
-     * WAL enabled flag.
+     * Value returned by this method can be changed runtime by the user or 
during rebalance.
+     *
+     * @return WAL enabled flag.
+     * @see IgniteCluster#disableWal(String)
+     * @see IgniteCluster#enableWal(String)
      */
     public boolean walEnabled() {

Review comment:
       Could this be renamed to `walLoggingEnabled` or `localWalEnabled` or 
similar?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
##########
@@ -801,17 +802,32 @@ else if (op == READ) {
 
                         cctx.mvccCaching().onTxFinished(this, true);
 
-                        if (!near() && !F.isEmpty(dataEntries) && cctx.wal() 
!= null) {
+                        boolean walEnabled = false;
+
+                        // Log only there are at least one persistent or cdc 
enabled group.
+                        if (!near() && !F.isEmpty(dataEntries) && 
cctx.cdcWal() != null) {
+                            for (int i = 0; i < dataEntries.size(); i++) {
+                                CacheGroupContext grpCtx = 
dataEntries.get(i).get2().context().group();
+
+                                if (grpCtx.walOrCdcEnabled()) {
+                                    walEnabled = true;
+
+                                    break;
+                                }
+                            }
+                        }
+
+                        if (walEnabled) {
                             // Set new update counters for data entries 
received from persisted tx entries.
                             List<DataEntry> entriesWithCounters = 
dataEntries.stream()
                                 .map(tuple -> 
tuple.get1().partitionCounter(tuple.get2().updateCounter()))
                                 .collect(Collectors.toList());
 
-                            ptr = cctx.wal().log(new 
DataRecord(entriesWithCounters));
+                            ptr = cctx.cdcWal().log(new 
DataRecord(entriesWithCounters));
                         }
 
                         if (ptr != null)
-                            cctx.wal().flush(ptr, false);
+                            cctx.cdcWal().flush(ptr, false);

Review comment:
       This is a `wal or if cdc enabled then also the wal`, not a `cdcWal`.
   `cctx.cdcWal()` looks almost good for me, but we must use ternary operation 
or something like this to gain wal instance in such code.
   
   `wal = cctx.wal() != null ? cctx.wal() : cctx.cdcWal()`
   and use `wal` variable further at any application code.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
##########
@@ -1041,7 +1041,7 @@ public void destroyPartitionStore(int partId) throws 
IgniteCheckedException {
 
         int tag = pageMemory.invalidate(grp.groupId(), partId);
 
-        if (grp.walEnabled())
+        if (grp.persistenceEnabled() && grp.walEnabled())

Review comment:
       Could we encapsulate this to the `walEnabled` method?
   Looks like now everywhere we use this combination, except 
`CheckpointWorkflow#fillCacheGroupState` which may be the mistake

##########
File path: 
modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java
##########
@@ -139,9 +147,18 @@
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
-            .setCdcEnabled(true)
             .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
-            .setDefaultDataRegionConfiguration(new 
DataRegionConfiguration().setPersistenceEnabled(true)));
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setPersistenceEnabled(persistenceEnabled))
+            .setDataRegionConfigurations(
+                new DataRegionConfiguration()
+                    .setName("cdc")
+                    .setPersistenceEnabled(persistenceEnabled)
+                    .setCdcEnabled(true),
+                new DataRegionConfiguration()
+                    .setName("not-cdc")
+                    .setPersistenceEnabled(false)

Review comment:
       Could we check all case matrix? 
   Lets have here `.setPersistenceEnabled(persistenceEnabled)`?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
##########
@@ -764,6 +776,13 @@ public IgniteSnapshotManager snapshotMgr() {
         return walMgr;
     }
 
+    /**
+     * @return Write ahead log manager.
+     */
+    @Nullable public IgniteWriteAheadLogManager cdcWal() {

Review comment:
       We must assert it's not null here, since we accessing (must) this method 
only after `walOrCdcEnabled()` check

##########
File path: modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cdc.CdcSelfTest.WAL_ARCHIVE_TIMEOUT;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** Check only {@link DataRecord} written to the WAL for in-memory cache. */
+@RunWith(Parameterized.class)
+public class WalForCdcTest extends GridCommonAbstractTest {
+    /** */
+    private static final int RECORD_COUNT = 10;
+
+    /** */
+    @Parameterized.Parameter
+    public CacheMode mode;
+
+    /** */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** */
+    private boolean persistenceEnabled;
+
+    /** */
+    @Parameterized.Parameters(name = "mode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (CacheMode mode : Arrays.asList(REPLICATED, PARTITIONED))
+            for (CacheAtomicityMode atomicityMode : Arrays.asList(ATOMIC, 
TRANSACTIONAL))
+                params.add(new Object[] {mode, atomicityMode});
+
+        return params;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setWalMode(WALMode.FSYNC)
+            .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setPersistenceEnabled(persistenceEnabled)
+                .setCdcEnabled(true)));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** */
+    @Test
+    public void testOnlyDataRecordWritten() throws Exception {
+        persistenceEnabled = false;
+
+        doTestWal((ignite, cache) -> {
+            for (int i = 0; i < RECORD_COUNT; i++)
+                cache.put(i, i);
+        });
+    }
+
+    /** */
+    @Test
+    public void testWalDisable() throws Exception {
+        persistenceEnabled = true;

Review comment:
       Inmem case should be tested as well.

##########
File path: modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
##########
@@ -75,7 +75,7 @@
     public static final int WAL_ARCHIVE_TIMEOUT = 5_000;
 
     /** Keys count. */
-    public static final int KEYS_CNT = 50;
+    public static final int KEYS_CNT = 10;

Review comment:
       How may 50 keys freeze the test?
   What test and why?
   Let's investigate this.

##########
File path: 
modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java
##########
@@ -243,6 +260,7 @@ public void testConflictVersionWritten() throws Exception {
             new CacheConfiguration<Integer, User>(DEFAULT_CACHE_NAME)
                 .setCacheMode(cacheMode)
                 .setAtomicityMode(atomicityMode)
+                .setDataRegionName("cdc")

Review comment:
       Is this necessary?
   Is this checked somewhere and should be equal to "cdc"?
   If so, lets use const.

##########
File path: modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cdc.CdcSelfTest.WAL_ARCHIVE_TIMEOUT;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** Check only {@link DataRecord} written to the WAL for in-memory cache. */
+@RunWith(Parameterized.class)
+public class WalForCdcTest extends GridCommonAbstractTest {
+    /** */
+    private static final int RECORD_COUNT = 10;
+
+    /** */
+    @Parameterized.Parameter
+    public CacheMode mode;
+
+    /** */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** */
+    private boolean persistenceEnabled;
+
+    /** */
+    @Parameterized.Parameters(name = "mode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (CacheMode mode : Arrays.asList(REPLICATED, PARTITIONED))
+            for (CacheAtomicityMode atomicityMode : Arrays.asList(ATOMIC, 
TRANSACTIONAL))
+                params.add(new Object[] {mode, atomicityMode});
+
+        return params;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setWalMode(WALMode.FSYNC)
+            .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setPersistenceEnabled(persistenceEnabled)
+                .setCdcEnabled(true)));

Review comment:
       Should we check disabled cdc behaviour as well?

##########
File path: modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
##########
@@ -105,13 +98,11 @@
             cfg.setConsistentId(igniteInstanceName);
 
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
-            .setCdcEnabled(true)
             .setWalMode(walMode)
             .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
-            .setDefaultDataRegionConfiguration(new 
DataRegionConfiguration().setPersistenceEnabled(true)));
-
-        if (pageSz != 0)
-            cfg.getDataStorageConfiguration().setPageSize(pageSz);
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setPersistenceEnabled(persistenceEnabled)
+                .setCdcEnabled(true)));

Review comment:
       Should we check that disabled CDC streams nothing as well?

##########
File path: modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cdc.CdcSelfTest.WAL_ARCHIVE_TIMEOUT;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** Check only {@link DataRecord} written to the WAL for in-memory cache. */
+@RunWith(Parameterized.class)
+public class WalForCdcTest extends GridCommonAbstractTest {

Review comment:
       Lets call this test in honor to its aim.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to