anton-vinogradov commented on a change in pull request #49:
URL: https://github.com/apache/ignite-extensions/pull/49#discussion_r654342271



##########
File path: 
modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.conflictresolve;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import 
org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
+import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * This class implements simple conflict resolution algorithm.
+ * Algorithm decides which version of the entry should be used "new" or "old".
+ * The following steps performed:
+ * <ul>
+ *     <li>If entry is freshly created then new version used - {@link 
GridCacheVersionedEntryEx#isStartVersion()}.</li>
+ *     <li>If change made in this cluster then new version used - {@link 
GridCacheVersionedEntryEx#dataCenterId()}.</li>
+ *     <li>If cluster of new entry equal to cluster of old entry
+ *     then entry with the greater {@link GridCacheVersionedEntryEx#order()} 
used.</li>
+ *     <li>If {@link #conflictResolveField} provided and field of new entry 
greater then new version used.</li>
+ *     <li>If {@link #conflictResolveField} provided and field of old entry 
greater then old version used.</li>
+ *     <li>Conflict can't be resolved. Update ignored. Old version used.</li>
+ * </ul>
+ */
+public class CacheVersionConflictResolverImpl implements 
CacheVersionConflictResolver {
+    /**
+     * Cluster id.
+     */
+    private final byte clusterId;
+
+    /**
+     * Field for conflict resolve.
+     * Value of this field will be used to compare two entries in case of 
conflicting changes.
+     * values of this field must implement {@link Comparable} interface.
+     * <pre><i>Note, value of this field used to resolve conflict for external 
updates only.</i>
+     *
+     * @see CacheVersionConflictResolverImpl
+     */
+    private final String conflictResolveField;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** If {@code true} then conflict resolving with the value field enabled. 
*/
+    private boolean conflictResolveFieldEnabled;
+
+    /**
+     * @param clusterId Data center id.
+     * @param conflictResolveField Field to resolve conflicts.
+     * @param log Logger.
+     */
+    public CacheVersionConflictResolverImpl(byte clusterId, String 
conflictResolveField, IgniteLogger log) {
+        this.clusterId = clusterId;
+        this.conflictResolveField = conflictResolveField;
+        this.log = log;
+
+        conflictResolveFieldEnabled = conflictResolveField != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> GridCacheVersionConflictContext<K, V> resolve(
+        CacheObjectValueContext ctx,
+        GridCacheVersionedEntryEx<K, V> oldEntry,
+        GridCacheVersionedEntryEx<K, V> newEntry,
+        boolean atomicVerComparator
+    ) {
+        GridCacheVersionConflictContext<K, V> res = new 
GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);
+
+        if (isUseNew(ctx, oldEntry, newEntry))
+            res.useNew();
+        else
+            res.useOld();
+
+        return res;
+    }
+
+    /**
+     * @param ctx Context.
+     * @param oldEntry Old entry.
+     * @param newEntry New entry.
+     * @param <K> Key type.
+     * @param <V> Value type.
+     * @return {@code True} if the new entry should be used.
+     */
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private <K, V> boolean isUseNew(
+        CacheObjectValueContext ctx,
+        GridCacheVersionedEntryEx<K, V> oldEntry,
+        GridCacheVersionedEntryEx<K, V> newEntry
+    ) {
+        if (newEntry.dataCenterId() == clusterId) // Update made on the local 
cluster always win.
+            return true;
+
+        if (oldEntry.isStartVersion()) // New entry.
+            return true;
+
+        if (oldEntry.dataCenterId() == newEntry.dataCenterId())
+            return newEntry.version().compareTo(oldEntry.version()) > 0; // 
New version from the same cluster.
+
+        if (conflictResolveFieldEnabled) {
+            Object oldVal = oldEntry.value(ctx);
+            Object newVal = newEntry.value(ctx);
+
+            if (oldVal != null && newVal != null) {
+                Comparable oldResolveField;
+                Comparable newResolveField;
+
+                try {
+                    if (oldVal instanceof BinaryObject) {
+                        oldResolveField = 
((BinaryObject)oldVal).field(conflictResolveField);
+                        newResolveField = 
((BinaryObject)newVal).field(conflictResolveField);
+                    }
+                    else {
+                        oldResolveField = U.field(oldVal, 
conflictResolveField);
+                        newResolveField = U.field(newVal, 
conflictResolveField);
+                    }
+
+                    return oldResolveField.compareTo(newResolveField) < 0;
+                }
+                catch (Exception e) {
+                    log.error(
+                        "Error while resolving replication conflict. [field=" 
+ conflictResolveField + ", key=" + newEntry.key() + ']',
+                        e
+                    );
+                }
+            }
+        }
+
+        log.error("Conflict can't be resolved, update ignored [key=" + 
newEntry.key() + ", fromCluster=" + newEntry.dataCenterId()
+            + ", toCluster=" + oldEntry.dataCenterId() + ", 
conflictResolveField=" + conflictResolveField + ']');

Review comment:
       Is `conflictResolveField` absent (not set) at this point?

##########
File path: 
modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.conflictresolve;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import 
org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
+import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * This class implements simple conflict resolution algorithm.
+ * Algorithm decides which version of the entry should be used "new" or "old".
+ * The following steps performed:
+ * <ul>
+ *     <li>If entry is freshly created then new version used - {@link 
GridCacheVersionedEntryEx#isStartVersion()}.</li>
+ *     <li>If change made in this cluster then new version used - {@link 
GridCacheVersionedEntryEx#dataCenterId()}.</li>
+ *     <li>If cluster of new entry equal to cluster of old entry
+ *     then entry with the greater {@link GridCacheVersionedEntryEx#order()} 
used.</li>
+ *     <li>If {@link #conflictResolveField} provided and field of new entry 
greater then new version used.</li>
+ *     <li>If {@link #conflictResolveField} provided and field of old entry 
greater then old version used.</li>
+ *     <li>Conflict can't be resolved. Update ignored. Old version used.</li>
+ * </ul>
+ */
+public class CacheVersionConflictResolverImpl implements 
CacheVersionConflictResolver {
+    /**
+     * Cluster id.
+     */
+    private final byte clusterId;
+
+    /**
+     * Field for conflict resolve.
+     * Value of this field will be used to compare two entries in case of 
conflicting changes.
+     * values of this field must implement {@link Comparable} interface.
+     * <pre><i>Note, value of this field used to resolve conflict for external 
updates only.</i>
+     *
+     * @see CacheVersionConflictResolverImpl
+     */
+    private final String conflictResolveField;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** If {@code true} then conflict resolving with the value field enabled. 
*/
+    private boolean conflictResolveFieldEnabled;
+
+    /**
+     * @param clusterId Data center id.
+     * @param conflictResolveField Field to resolve conflicts.
+     * @param log Logger.
+     */
+    public CacheVersionConflictResolverImpl(byte clusterId, String 
conflictResolveField, IgniteLogger log) {
+        this.clusterId = clusterId;
+        this.conflictResolveField = conflictResolveField;
+        this.log = log;
+
+        conflictResolveFieldEnabled = conflictResolveField != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> GridCacheVersionConflictContext<K, V> resolve(
+        CacheObjectValueContext ctx,
+        GridCacheVersionedEntryEx<K, V> oldEntry,
+        GridCacheVersionedEntryEx<K, V> newEntry,
+        boolean atomicVerComparator
+    ) {
+        GridCacheVersionConflictContext<K, V> res = new 
GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);
+
+        if (isUseNew(ctx, oldEntry, newEntry))
+            res.useNew();
+        else
+            res.useOld();
+
+        return res;
+    }
+
+    /**
+     * @param ctx Context.
+     * @param oldEntry Old entry.
+     * @param newEntry New entry.
+     * @param <K> Key type.
+     * @param <V> Value type.
+     * @return {@code True} if the new entry should be used.
+     */
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private <K, V> boolean isUseNew(
+        CacheObjectValueContext ctx,
+        GridCacheVersionedEntryEx<K, V> oldEntry,
+        GridCacheVersionedEntryEx<K, V> newEntry
+    ) {
+        if (newEntry.dataCenterId() == clusterId) // Update made on the local 
cluster always win.
+            return true;
+
+        if (oldEntry.isStartVersion()) // New entry.
+            return true;
+
+        if (oldEntry.dataCenterId() == newEntry.dataCenterId())
+            return newEntry.version().compareTo(oldEntry.version()) > 0; // 
New version from the same cluster.
+
+        if (conflictResolveFieldEnabled) {
+            Object oldVal = oldEntry.value(ctx);
+            Object newVal = newEntry.value(ctx);
+
+            if (oldVal != null && newVal != null) {
+                Comparable oldResolveField;
+                Comparable newResolveField;
+
+                try {
+                    if (oldVal instanceof BinaryObject) {
+                        oldResolveField = 
((BinaryObject)oldVal).field(conflictResolveField);
+                        newResolveField = 
((BinaryObject)newVal).field(conflictResolveField);
+                    }
+                    else {
+                        oldResolveField = U.field(oldVal, 
conflictResolveField);
+                        newResolveField = U.field(newVal, 
conflictResolveField);
+                    }
+
+                    return oldResolveField.compareTo(newResolveField) < 0;
+                }
+                catch (Exception e) {
+                    log.error(
+                        "Error while resolving replication conflict. [field=" 
+ conflictResolveField + ", key=" + newEntry.key() + ']',
+                        e
+                    );
+                }
+            }
+        }
+
+        log.error("Conflict can't be resolved, update ignored [key=" + 
newEntry.key() + ", fromCluster=" + newEntry.dataCenterId()

Review comment:
       This log message should give the user some hints on how to fix the issue:
   1) Resolve the conflict with a manual update,
   or
   2) Use `conflictResolveField`.

##########
File path: 
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheEntryVersion;
+import 
org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ * Cache conflict operations test.
+ */
+@RunWith(Parameterized.class)
+public class CacheConflictOperationsTest extends GridCommonAbstractTest {
+    /** Cache mode. */
+    @Parameterized.Parameter
+    public CacheAtomicityMode cacheMode;
+
+    /** Other cluster id. */
+    @Parameterized.Parameter(1)
+    public byte otherClusterId;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, otherClusterId={1}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (CacheAtomicityMode mode : EnumSet.of(ATOMIC, TRANSACTIONAL))
+            for (byte otherClusterId : new byte[] {FIRST_CLUSTER_ID, 
THIRD_CLUSTER_ID})
+                params.add(new Object[] {mode, otherClusterId});
+
+        return params;
+    }
+
+    /** */
+    private static IgniteCache<String, ConflictResolvableTestData> cache;
+
+    /** */
+    private static IgniteInternalCache<BinaryObject, BinaryObject> cachex;
+
+    /** */
+    private static IgniteEx client;
+
+    /** */
+    private static final byte FIRST_CLUSTER_ID = 1;
+
+    /** */
+    private static final byte SECOND_CLUSTER_ID = 2;
+
+    /** */
+    private static final byte THIRD_CLUSTER_ID = 3;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        CacheVersionConflictResolverPluginProvider<?> pluginCfg = new 
CacheVersionConflictResolverPluginProvider<>();
+
+        pluginCfg.setClusterId(SECOND_CLUSTER_ID);
+        pluginCfg.setCaches(new 
HashSet<>(Collections.singleton(DEFAULT_CACHE_NAME)));
+        pluginCfg.setConflictResolveField(conflictResolveField());
+
+        return 
super.getConfiguration(igniteInstanceName).setPluginProviders(pluginCfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrid(1);
+
+        client = startClientGrid(2);
+
+        cache = client.createCache(new CacheConfiguration<String, 
ConflictResolvableTestData>(DEFAULT_CACHE_NAME).setAtomicityMode(cacheMode));
+        cachex = client.cachex(DEFAULT_CACHE_NAME);
+    }
+
+    /** Tests that regular cache operations works with the conflict resolver 
when there is no update conflicts. */
+    @Test
+    public void testSimpleUpdates() {
+        String key = "UpdatesWithoutConflict";
+
+        put(key);
+        put(key);
+
+        remove(key);
+    }
+
+    /**
+     * Tests that {@code IgniteInternalCache#*AllConflict} cache operations 
works with the conflict resolver
+     * when there is no update conflicts.
+     */
+    @Test
+    public void testUpdatesFromOtherClusterWithoutConflict() throws Exception {
+        String key = key("UpdateFromOtherClusterWithoutConflict", 
otherClusterId);
+
+        putConflict(key, 1, true);
+
+        putConflict(key, 2, true);
+
+        removeConflict(key, 3, true);
+    }
+
+    /**
+     * Tests that {@code IgniteInternalCache#*AllConflict} cache operations 
works with the conflict resolver
+     * when there is no update conflicts.
+     */
+    @Test
+    public void testUpdatesReorderFromOtherCluster() throws Exception {
+        String key = key("UpdateClusterUpdateReorder", otherClusterId);
+
+        putConflict(key, 2, true);
+
+        // Update with the equal or lower order should fail.
+        putConflict(key, 2, false);
+        putConflict(key, 1, false);
+
+        // Remove with the equal or lower order should fail.
+        removeConflict(key, 2, false);
+        removeConflict(key, 1, false);
+
+        // Update with the higher order should succeed.
+        putConflict(key, 3, true);
+
+        key = key("UpdateClusterUpdateReorder2", otherClusterId);
+
+        int order = 1;
+
+        putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), 
true);
+
+        // Update with the equal or lower topVer should fail.
+        putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), 
false);
+        putConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), 
false);
+
+        // Remove with the equal or lower topVer should fail.
+        removeConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), 
false);
+        removeConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), 
false);
+
+        // Update with the higher topVer should succeed.
+        putConflict(key, new GridCacheVersion(3, order, 1, otherClusterId), 
true);
+
+        key = key("UpdateClusterUpdateReorder3", otherClusterId);
+
+        int topVer = 1;
+
+        putConflict(key, new GridCacheVersion(topVer, order, 2, 
otherClusterId), true);
+
+        // Update with the equal or lower nodeOrder should fail.
+        putConflict(key, new GridCacheVersion(topVer, order, 2, 
otherClusterId), false);
+        putConflict(key, new GridCacheVersion(topVer, order, 1, 
otherClusterId), false);
+
+        // Remove with the equal or lower nodeOrder should fail.
+        removeConflict(key, new GridCacheVersion(topVer, order, 2, 
otherClusterId), false);
+        removeConflict(key, new GridCacheVersion(topVer, order, 1, 
otherClusterId), false);
+
+        // Update with the higher nodeOrder should succeed.
+        putConflict(key, new GridCacheVersion(topVer, order, 3, 
otherClusterId), true);
+    }
+
+    /** Tests cache operations for entry replicated from another cluster. */
+    @Test
+    public void testUpdatesConflict() throws Exception {
+        String key = key("UpdateThisClusterConflict0", otherClusterId);
+
+        putConflict(key, 1, true);
+
+        // Local remove for other cluster entry should succeed.
+        remove(key);
+
+        // Conflict replicated update should fail.
+        putConflict(key, 2, false);
+
+        key = key("UpdateThisDCConflict1", otherClusterId);
+
+        putConflict(key, 3, true);
+
+        // Local update for other cluster entry should succeed.
+        put(key);
+
+        key = key("UpdateThisDCConflict2", otherClusterId);
+
+        put(key);
+
+        // Conflict replicated remove should fail.
+        removeConflict(key, 4, false);
+
+        key = key("UpdateThisDCConflict3", otherClusterId);
+
+        put(key);
+
+        // Conflict replicated update succeed only if resolved by field.
+        putConflict(key, 5, conflictResolveField() != null);
+    }
+
+    /** */
+    private void put(String key) {
+        ConflictResolvableTestData newVal = 
ConflictResolvableTestData.create();
+
+        CacheEntry<String, ConflictResolvableTestData> oldEntry = 
cache.getEntry(key);
+
+        cache.put(key, newVal);
+
+        CacheEntry<String, ConflictResolvableTestData> newEntry = 
cache.getEntry(key);
+
+        
assertNull(((CacheEntryVersion)newEntry.version()).otherClusterVersion());
+        assertEquals(newVal, cache.get(key));
+
+        if (oldEntry != null)
+            assertTrue(((CacheEntryVersion)oldEntry.version()).order() < 
((CacheEntryVersion)newEntry.version()).order());
+    }
+
+    /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */
+    private void putConflict(String k, long order, boolean success) throws 
IgniteCheckedException {
+        putConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), 
success);
+    }
+
+    /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */
+    private void putConflict(String k, GridCacheVersion newVer, boolean 
success) throws IgniteCheckedException {
+        CacheEntry<String, ConflictResolvableTestData> oldVal = 
cache.getEntry(k);
+        ConflictResolvableTestData newVal = 
ConflictResolvableTestData.create();
+
+        KeyCacheObject key = new KeyCacheObjectImpl(k, null, 
cachex.context().affinity().partition(k));
+        CacheObject val = new 
CacheObjectImpl(client.binary().toBinary(newVal), null);
+
+        cachex.putAllConflict(singletonMap(key, new GridCacheDrInfo(val, 
newVer)));
+
+        if (success) {
+            assertEquals(newVer, 
((CacheEntryVersion)cache.getEntry(k).version()).otherClusterVersion());
+            assertEquals(newVal, cache.get(k));
+        } else if (oldVal != null) {
+            assertEquals(oldVal.getValue(), cache.get(k));
+            assertEquals(oldVal.version(), cache.getEntry(k).version());
+        }
+    }
+
+    /** */
+    private void remove(String key) {
+        cache.remove(key);
+
+        assertFalse(cache.containsKey(key));
+    }
+
+    /** Removes entry via {@link IgniteInternalCache#removeAllConflict(Map)}. 
*/
+    private void removeConflict(String k, long order, boolean success) throws 
IgniteCheckedException {
+        removeConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), 
success);
+    }
+
+    /** Removes entry via {@link IgniteInternalCache#removeAllConflict(Map)}. 
*/
+    private void removeConflict(String k, GridCacheVersion ver, boolean 
success) throws IgniteCheckedException {

Review comment:
       An assertion that the value exists before the remove would be useful here.

##########
File path: 
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheEntryVersion;
+import 
org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ * Cache conflict operations test.
+ */
+@RunWith(Parameterized.class)
+public class CacheConflictOperationsTest extends GridCommonAbstractTest {
+    /** Cache mode. */
+    @Parameterized.Parameter
+    public CacheAtomicityMode cacheMode;
+
+    /** Other cluster id. */
+    @Parameterized.Parameter(1)
+    public byte otherClusterId;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, otherClusterId={1}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (CacheAtomicityMode mode : EnumSet.of(ATOMIC, TRANSACTIONAL))
+            for (byte otherClusterId : new byte[] {FIRST_CLUSTER_ID, 
THIRD_CLUSTER_ID})
+                params.add(new Object[] {mode, otherClusterId});
+
+        return params;
+    }
+
+    /** */
+    private static IgniteCache<String, ConflictResolvableTestData> cache;
+
+    /** */
+    private static IgniteInternalCache<BinaryObject, BinaryObject> cachex;
+
+    /** */
+    private static IgniteEx client;
+
+    /** */
+    private static final byte FIRST_CLUSTER_ID = 1;
+
+    /** */
+    private static final byte SECOND_CLUSTER_ID = 2;
+
+    /** */
+    private static final byte THIRD_CLUSTER_ID = 3;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        CacheVersionConflictResolverPluginProvider<?> pluginCfg = new 
CacheVersionConflictResolverPluginProvider<>();
+
+        pluginCfg.setClusterId(SECOND_CLUSTER_ID);
+        pluginCfg.setCaches(new 
HashSet<>(Collections.singleton(DEFAULT_CACHE_NAME)));
+        pluginCfg.setConflictResolveField(conflictResolveField());
+
+        return 
super.getConfiguration(igniteInstanceName).setPluginProviders(pluginCfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrid(1);
+
+        client = startClientGrid(2);
+
+        cache = client.createCache(new CacheConfiguration<String, 
ConflictResolvableTestData>(DEFAULT_CACHE_NAME).setAtomicityMode(cacheMode));
+        cachex = client.cachex(DEFAULT_CACHE_NAME);
+    }
+
+    /** Tests that regular cache operations works with the conflict resolver 
when there is no update conflicts. */
+    @Test
+    public void testSimpleUpdates() {
+        String key = "UpdatesWithoutConflict";
+
+        put(key);
+        put(key);
+
+        remove(key);
+    }
+
+    /**
+     * Tests that {@code IgniteInternalCache#*AllConflict} cache operations 
works with the conflict resolver
+     * when there is no update conflicts.
+     */
+    @Test
+    public void testUpdatesFromOtherClusterWithoutConflict() throws Exception {
+        String key = key("UpdateFromOtherClusterWithoutConflict", 
otherClusterId);
+
+        putConflict(key, 1, true);
+
+        putConflict(key, 2, true);
+
+        removeConflict(key, 3, true);
+    }
+
+    /**
+     * Tests that {@code IgniteInternalCache#*AllConflict} cache operations 
works with the conflict resolver
+     * when there is no update conflicts.
+     */
+    @Test
+    public void testUpdatesReorderFromOtherCluster() throws Exception {
+        String key = key("UpdateClusterUpdateReorder", otherClusterId);
+
+        putConflict(key, 2, true);
+
+        // Update with the equal or lower order should fail.
+        putConflict(key, 2, false);
+        putConflict(key, 1, false);
+
+        // Remove with the equal or lower order should fail.
+        removeConflict(key, 2, false);
+        removeConflict(key, 1, false);
+
+        // Update with the higher order should succeed.
+        putConflict(key, 3, true);
+
+        key = key("UpdateClusterUpdateReorder2", otherClusterId);
+
+        int order = 1;
+
+        putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), 
true);
+
+        // Update with the equal or lower topVer should fail.
+        putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), 
false);
+        putConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), 
false);
+
+        // Remove with the equal or lower topVer should fail.
+        removeConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), 
false);
+        removeConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), 
false);
+
+        // Update with the higher topVer should succeed.
+        putConflict(key, new GridCacheVersion(3, order, 1, otherClusterId), 
true);
+
+        key = key("UpdateClusterUpdateReorder3", otherClusterId);
+
+        int topVer = 1;
+
+        putConflict(key, new GridCacheVersion(topVer, order, 2, 
otherClusterId), true);
+
+        // Update with the equal or lower nodeOrder should fail.
+        putConflict(key, new GridCacheVersion(topVer, order, 2, 
otherClusterId), false);
+        putConflict(key, new GridCacheVersion(topVer, order, 1, 
otherClusterId), false);
+
+        // Remove with the equal or lower nodeOrder should fail.
+        removeConflict(key, new GridCacheVersion(topVer, order, 2, 
otherClusterId), false);
+        removeConflict(key, new GridCacheVersion(topVer, order, 1, 
otherClusterId), false);
+
+        // Remove with the higher nodeOrder should succeed.
+        putConflict(key, new GridCacheVersion(topVer, order, 3, 
otherClusterId), true);
+    }
+
+    /** Tests cache operations for entry replicated from another cluster. */
+    @Test
+    public void testUpdatesConflict() throws Exception {
+        String key = key("UpdateThisClusterConflict0", otherClusterId);
+
+        putConflict(key, 1, true);
+
+        // Local remove for other cluster entry should succeed.
+        remove(key);
+
+        // Conflict replicated update should fail.
+        putConflict(key, 2, false);
+
+        key = key("UpdateThisDCConflict1", otherClusterId);
+
+        putConflict(key, 3, true);
+
+        // Local update for other cluster entry should succeed.
+        put(key);
+
+        key = key("UpdateThisDCConflict2", otherClusterId);
+
+        put(key);
+
+        // Conflict replicated remove should fail.
+        removeConflict(key, 4, false);
+
+        key = key("UpdateThisDCConflict3", otherClusterId);
+
+        put(key);
+
+        // Conflict replicated update succeed only if resolved by field.
+        putConflict(key, 5, conflictResolveField() != null);
+    }
+
+    /** */
+    private void put(String key) {
+        ConflictResolvableTestData newVal = 
ConflictResolvableTestData.create();
+
+        CacheEntry<String, ConflictResolvableTestData> oldEntry = 
cache.getEntry(key);
+
+        cache.put(key, newVal);
+
+        CacheEntry<String, ConflictResolvableTestData> newEntry = 
cache.getEntry(key);
+
+        
assertNull(((CacheEntryVersion)newEntry.version()).otherClusterVersion());
+        assertEquals(newVal, cache.get(key));
+
+        if (oldEntry != null)
+            assertTrue(((CacheEntryVersion)oldEntry.version()).order() < 
((CacheEntryVersion)newEntry.version()).order());
+    }
+
+    /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */
+    private void putConflict(String k, long order, boolean success) throws 
IgniteCheckedException {
+        putConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), 
success);
+    }
+
+    /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */
+    private void putConflict(String k, GridCacheVersion newVer, boolean 
success) throws IgniteCheckedException {
+        CacheEntry<String, ConflictResolvableTestData> oldVal = 
cache.getEntry(k);
+        ConflictResolvableTestData newVal = 
ConflictResolvableTestData.create();
+
+        KeyCacheObject key = new KeyCacheObjectImpl(k, null, 
cachex.context().affinity().partition(k));
+        CacheObject val = new 
CacheObjectImpl(client.binary().toBinary(newVal), null);
+
+        cachex.putAllConflict(singletonMap(key, new GridCacheDrInfo(val, 
newVer)));
+
+        if (success) {
+            assertEquals(newVer, 
((CacheEntryVersion)cache.getEntry(k).version()).otherClusterVersion());
+            assertEquals(newVal, cache.get(k));
+        } else if (oldVal != null) {
+            assertEquals(oldVal.getValue(), cache.get(k));
+            assertEquals(oldVal.version(), cache.getEntry(k).version());
+        }
+    }
+
+    /** */
+    private void remove(String key) {
+        cache.remove(key);
+
+        assertFalse(cache.containsKey(key));
+    }
+
+    /** Removes entry via {@link IgniteInternalCache#removeAllConflict(Map)}. 
*/
+    private void removeConflict(String k, long order, boolean success) throws 
IgniteCheckedException {
+        removeConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), 
success);
+    }
+
+    /** Removes entry via {@link IgniteInternalCache#removeAllConflict(Map)}. 
*/
+    private void removeConflict(String k, GridCacheVersion ver, boolean 
success) throws IgniteCheckedException {
+        CacheEntry<String, ConflictResolvableTestData> oldVal = 
cache.getEntry(k);

Review comment:
       Should this variable be named `oldEntry`? It holds a `CacheEntry`, not a value.

##########
File path: 
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheEntryVersion;
+import 
org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ * Cache conflict operations test.
+ */
+@RunWith(Parameterized.class)
+public class CacheConflictOperationsTest extends GridCommonAbstractTest {
+    /** Cache mode. */
+    @Parameterized.Parameter
+    public CacheAtomicityMode cacheMode;
+
+    /** Other cluster id. */
+    @Parameterized.Parameter(1)
+    public byte otherClusterId;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, otherClusterId={1}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (CacheAtomicityMode mode : EnumSet.of(ATOMIC, TRANSACTIONAL))
+            for (byte otherClusterId : new byte[] {FIRST_CLUSTER_ID, 
THIRD_CLUSTER_ID})
+                params.add(new Object[] {mode, otherClusterId});
+
+        return params;
+    }
+
+    /** */
+    private static IgniteCache<String, ConflictResolvableTestData> cache;
+
+    /** */
+    private static IgniteInternalCache<BinaryObject, BinaryObject> cachex;
+
+    /** */
+    private static IgniteEx client;
+
+    /** */
+    private static final byte FIRST_CLUSTER_ID = 1;
+
+    /** */
+    private static final byte SECOND_CLUSTER_ID = 2;
+
+    /** */
+    private static final byte THIRD_CLUSTER_ID = 3;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        CacheVersionConflictResolverPluginProvider<?> pluginCfg = new 
CacheVersionConflictResolverPluginProvider<>();
+
+        pluginCfg.setClusterId(SECOND_CLUSTER_ID);
+        pluginCfg.setCaches(new 
HashSet<>(Collections.singleton(DEFAULT_CACHE_NAME)));
+        pluginCfg.setConflictResolveField(conflictResolveField());
+
+        return 
super.getConfiguration(igniteInstanceName).setPluginProviders(pluginCfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrid(1);
+
+        client = startClientGrid(2);
+
+        cache = client.createCache(new CacheConfiguration<String, 
ConflictResolvableTestData>(DEFAULT_CACHE_NAME).setAtomicityMode(cacheMode));
+        cachex = client.cachex(DEFAULT_CACHE_NAME);
+    }
+
+    /** Tests that regular cache operations works with the conflict resolver 
when there is no update conflicts. */
+    @Test
+    public void testSimpleUpdates() {
+        String key = "UpdatesWithoutConflict";
+
+        put(key);
+        put(key);
+
+        remove(key);
+    }
+
+    /**
+     * Tests that {@code IgniteInternalCache#*AllConflict} cache operations 
works with the conflict resolver
+     * when there is no update conflicts.
+     */
+    @Test
+    public void testUpdatesFromOtherClusterWithoutConflict() throws Exception {
+        String key = key("UpdateFromOtherClusterWithoutConflict", 
otherClusterId);
+
+        putConflict(key, 1, true);
+
+        putConflict(key, 2, true);
+
+        removeConflict(key, 3, true);
+    }
+
+    /**
+     * Tests that {@code IgniteInternalCache#*AllConflict} cache operations 
works with the conflict resolver
+     * when there is no update conflicts.
+     */
+    @Test
+    public void testUpdatesReorderFromOtherCluster() throws Exception {
+        String key = key("UpdateClusterUpdateReorder", otherClusterId);
+
+        putConflict(key, 2, true);
+
+        // Update with the equal or lower order should fail.
+        putConflict(key, 2, false);
+        putConflict(key, 1, false);
+
+        // Remove with the equal or lower order should fail.

Review comment:
       Should this say "should be ignored" instead of "should fail"?
   
   The same applies to the other similar comments.

##########
File path: 
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheEntryVersion;
+import 
org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ * Cache conflict operations test.
+ */
+@RunWith(Parameterized.class)
+public class CacheConflictOperationsTest extends GridCommonAbstractTest {
+    /** Cache mode. */
+    @Parameterized.Parameter
+    public CacheAtomicityMode cacheMode;
+
+    /** Other cluster id. */
+    @Parameterized.Parameter(1)
+    public byte otherClusterId;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, otherClusterId={1}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (CacheAtomicityMode mode : EnumSet.of(ATOMIC, TRANSACTIONAL))
+            for (byte otherClusterId : new byte[] {FIRST_CLUSTER_ID, 
THIRD_CLUSTER_ID})
+                params.add(new Object[] {mode, otherClusterId});
+
+        return params;
+    }
+
+    /** */
+    private static IgniteCache<String, ConflictResolvableTestData> cache;
+
+    /** */
+    private static IgniteInternalCache<BinaryObject, BinaryObject> cachex;
+
+    /** */
+    private static IgniteEx client;
+
+    /** */
+    private static final byte FIRST_CLUSTER_ID = 1;
+
+    /** */
+    private static final byte SECOND_CLUSTER_ID = 2;
+
+    /** */
+    private static final byte THIRD_CLUSTER_ID = 3;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        CacheVersionConflictResolverPluginProvider<?> pluginCfg = new 
CacheVersionConflictResolverPluginProvider<>();
+
+        pluginCfg.setClusterId(SECOND_CLUSTER_ID);
+        pluginCfg.setCaches(new 
HashSet<>(Collections.singleton(DEFAULT_CACHE_NAME)));
+        pluginCfg.setConflictResolveField(conflictResolveField());
+
+        return 
super.getConfiguration(igniteInstanceName).setPluginProviders(pluginCfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrid(1);
+
+        client = startClientGrid(2);
+
+        cache = client.createCache(new CacheConfiguration<String, 
ConflictResolvableTestData>(DEFAULT_CACHE_NAME).setAtomicityMode(cacheMode));
+        cachex = client.cachex(DEFAULT_CACHE_NAME);
+    }
+
+    /** Tests that regular cache operations works with the conflict resolver 
when there is no update conflicts. */
+    @Test
+    public void testSimpleUpdates() {
+        String key = "UpdatesWithoutConflict";
+
+        put(key);
+        put(key);
+
+        remove(key);
+    }
+
+    /**
+     * Tests that {@code IgniteInternalCache#*AllConflict} cache operations 
works with the conflict resolver
+     * when there is no update conflicts.
+     */
+    @Test
+    public void testUpdatesFromOtherClusterWithoutConflict() throws Exception {
+        String key = key("UpdateFromOtherClusterWithoutConflict", 
otherClusterId);
+
+        putConflict(key, 1, true);
+
+        putConflict(key, 2, true);
+
+        removeConflict(key, 3, true);
+    }
+
+    /**
+     * Tests that {@code IgniteInternalCache#*AllConflict} cache operations 
works with the conflict resolver
+     * when there is no update conflicts.

Review comment:
       This Javadoc seems to be incorrect: it is a copy of the Javadoc of the previous test.

##########
File path: 
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheEntryVersion;
+import 
org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ * Cache conflict operations test.
+ */
+@RunWith(Parameterized.class)
+public class CacheConflictOperationsTest extends GridCommonAbstractTest {
+    /** Cache mode. */
+    @Parameterized.Parameter
+    public CacheAtomicityMode cacheMode;
+
+    /** Other cluster id. */
+    @Parameterized.Parameter(1)
+    public byte otherClusterId;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, otherClusterId={1}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (CacheAtomicityMode mode : EnumSet.of(ATOMIC, TRANSACTIONAL))
+            for (byte otherClusterId : new byte[] {FIRST_CLUSTER_ID, 
THIRD_CLUSTER_ID})
+                params.add(new Object[] {mode, otherClusterId});
+
+        return params;
+    }
+
+    /** */
+    private static IgniteCache<String, ConflictResolvableTestData> cache;
+
+    /** */
+    private static IgniteInternalCache<BinaryObject, BinaryObject> cachex;
+
+    /** */
+    private static IgniteEx client;
+
+    /** */
+    private static final byte FIRST_CLUSTER_ID = 1;
+
+    /** */
+    private static final byte SECOND_CLUSTER_ID = 2;
+
+    /** */
+    private static final byte THIRD_CLUSTER_ID = 3;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        CacheVersionConflictResolverPluginProvider<?> pluginCfg = new 
CacheVersionConflictResolverPluginProvider<>();
+
+        pluginCfg.setClusterId(SECOND_CLUSTER_ID);
+        pluginCfg.setCaches(new 
HashSet<>(Collections.singleton(DEFAULT_CACHE_NAME)));
+        pluginCfg.setConflictResolveField(conflictResolveField());
+
+        return 
super.getConfiguration(igniteInstanceName).setPluginProviders(pluginCfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrid(1);
+
+        client = startClientGrid(2);
+
+        cache = client.createCache(new CacheConfiguration<String, 
ConflictResolvableTestData>(DEFAULT_CACHE_NAME).setAtomicityMode(cacheMode));
+        cachex = client.cachex(DEFAULT_CACHE_NAME);
+    }
+
+    /** Tests that regular cache operations works with the conflict resolver 
when there is no update conflicts. */
+    @Test
+    public void testSimpleUpdates() {
+        String key = "UpdatesWithoutConflict";
+
+        put(key);
+        put(key);
+
+        remove(key);
+    }
+
+    /**
+     * Tests that {@code IgniteInternalCache#*AllConflict} cache operations 
works with the conflict resolver
+     * when there is no update conflicts.
+     */
+    @Test
+    public void testUpdatesFromOtherClusterWithoutConflict() throws Exception {
+        String key = key("UpdateFromOtherClusterWithoutConflict", 
otherClusterId);
+
+        putConflict(key, 1, true);
+
+        putConflict(key, 2, true);
+
+        removeConflict(key, 3, true);
+    }
+
+    /**
+     * Tests that {@code IgniteInternalCache#*AllConflict} cache operations 
works with the conflict resolver
+     * when there is no update conflicts.
+     */
+    @Test
+    public void testUpdatesReorderFromOtherCluster() throws Exception {
+        String key = key("UpdateClusterUpdateReorder", otherClusterId);
+
+        putConflict(key, 2, true);
+
+        // Update with the equal or lower order should fail.
+        putConflict(key, 2, false);
+        putConflict(key, 1, false);
+
+        // Remove with the equal or lower order should fail.
+        removeConflict(key, 2, false);
+        removeConflict(key, 1, false);
+
+        // Remove with the higher order should succeed.
+        putConflict(key, 3, true);
+
+        key = key("UpdateClusterUpdateReorder2", otherClusterId);
+
+        int order = 1;
+
+        putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), 
true);
+
+        // Update with the equal or lower topVer should fail.
+        putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), 
false);
+        putConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), 
false);
+
+        // Remove with the equal or lower topVer should fail.
+        removeConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), 
false);
+        removeConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), 
false);
+
+        // Remove with the higher topVer should succeed.
+        putConflict(key, new GridCacheVersion(3, order, 1, otherClusterId), 
true);
+
+        key = key("UpdateClusterUpdateReorder3", otherClusterId);
+
+        int topVer = 1;
+
+        putConflict(key, new GridCacheVersion(topVer, order, 2, 
otherClusterId), true);
+
+        // Update with the equal or lower nodeOrder should fail.
+        putConflict(key, new GridCacheVersion(topVer, order, 2, 
otherClusterId), false);
+        putConflict(key, new GridCacheVersion(topVer, order, 1, 
otherClusterId), false);
+
+        // Remove with the equal or lower nodeOrder should fail.
+        removeConflict(key, new GridCacheVersion(topVer, order, 2, 
otherClusterId), false);
+        removeConflict(key, new GridCacheVersion(topVer, order, 1, 
otherClusterId), false);
+
+        // Remove with the higher nodeOrder should succeed.
+        putConflict(key, new GridCacheVersion(topVer, order, 3, 
otherClusterId), true);
+    }
+
+    /** Tests cache operations for entry replicated from another cluster. */
+    @Test
+    public void testUpdatesConflict() throws Exception {
+        String key = key("UpdateThisClusterConflict0", otherClusterId);
+
+        putConflict(key, 1, true);
+
+        // Local remove for other cluster entry should succeed.
+        remove(key);
+
+        // Conflict replicated update should fail.
+        putConflict(key, 2, false);
+
+        key = key("UpdateThisDCConflict1", otherClusterId);
+
+        putConflict(key, 3, true);
+
+        // Local update for other cluster entry should succeed.
+        put(key);
+
+        key = key("UpdateThisDCConflict2", otherClusterId);
+
+        put(key);
+
+        // Conflict replicated remove should fail.
+        removeConflict(key, 4, false);
+
+        key = key("UpdateThisDCConflict3", otherClusterId);
+
+        put(key);
+
+        // Conflict replicated update succeed only if resolved by field.
+        putConflict(key, 5, conflictResolveField() != null);
+    }
+
+    /** */
+    private void put(String key) {
+        ConflictResolvableTestData newVal = 
ConflictResolvableTestData.create();
+
+        CacheEntry<String, ConflictResolvableTestData> oldEntry = 
cache.getEntry(key);
+
+        cache.put(key, newVal);
+
+        CacheEntry<String, ConflictResolvableTestData> newEntry = 
cache.getEntry(key);
+
+        
assertNull(((CacheEntryVersion)newEntry.version()).otherClusterVersion());
+        assertEquals(newVal, cache.get(key));
+
+        if (oldEntry != null)
+            assertTrue(((CacheEntryVersion)oldEntry.version()).order() < 
((CacheEntryVersion)newEntry.version()).order());
+    }
+
+    /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */
+    private void putConflict(String k, long order, boolean success) throws 
IgniteCheckedException {
+        putConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), 
success);
+    }
+
+    /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */
+    private void putConflict(String k, GridCacheVersion newVer, boolean 
success) throws IgniteCheckedException {
+        CacheEntry<String, ConflictResolvableTestData> oldVal = 
cache.getEntry(k);
+        ConflictResolvableTestData newVal = 
ConflictResolvableTestData.create();
+
+        KeyCacheObject key = new KeyCacheObjectImpl(k, null, 
cachex.context().affinity().partition(k));
+        CacheObject val = new 
CacheObjectImpl(client.binary().toBinary(newVal), null);
+
+        cachex.putAllConflict(singletonMap(key, new GridCacheDrInfo(val, 
newVer)));
+
+        if (success) {
+            assertEquals(newVer, 
((CacheEntryVersion)cache.getEntry(k).version()).otherClusterVersion());
+            assertEquals(newVal, cache.get(k));
+        } else if (oldVal != null) {
+            assertEquals(oldVal.getValue(), cache.get(k));
+            assertEquals(oldVal.version(), cache.getEntry(k).version());
+        }
+    }
+
+    /** */
+    private void remove(String key) {
+        cache.remove(key);

Review comment:
       An assertion that the value is present before the remove would be useful.

##########
File path: 
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheEntryVersion;
+import 
org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ * Cache conflict operations test.
+ */
+@RunWith(Parameterized.class)
+public class CacheConflictOperationsTest extends GridCommonAbstractTest {
+    /** Cache mode. */
+    @Parameterized.Parameter
+    public CacheAtomicityMode cacheMode;
+
+    /** Other cluster id. */
+    @Parameterized.Parameter(1)
+    public byte otherClusterId;
+
+    /**
+     * @return Test parameters: an {@code Object[] {mode, otherClusterId}} for each combination of
+     *      {@code ATOMIC}/{@code TRANSACTIONAL} atomicity mode and
+     *      {@link #FIRST_CLUSTER_ID}/{@link #THIRD_CLUSTER_ID} other cluster id.
+     */
+    @Parameterized.Parameters(name = "cacheMode={0}, otherClusterId={1}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (CacheAtomicityMode mode : EnumSet.of(ATOMIC, TRANSACTIONAL))
+            for (byte otherClusterId : new byte[] {FIRST_CLUSTER_ID, 
THIRD_CLUSTER_ID})
+                params.add(new Object[] {mode, otherClusterId});
+
+        return params;
+    }
+
+    /** Cache under test, created via the client node in {@link #beforeTestsStarted()}. */
+    private static IgniteCache<String, ConflictResolvableTestData> cache;
+
+    /** Internal view of the {@code DEFAULT_CACHE_NAME} cache, used for the {@code *AllConflict} operations. */
+    private static IgniteInternalCache<BinaryObject, BinaryObject> cachex;
+
+    /** Client node started in {@link #beforeTestsStarted()}. */
+    private static IgniteEx client;
+
+    /** Cluster id used as one of the {@link #otherClusterId} parameter values. */
+    private static final byte FIRST_CLUSTER_ID = 1;
+
+    /** Cluster id the conflict resolver plugin is configured with in {@link #getConfiguration(String)}. */
+    private static final byte SECOND_CLUSTER_ID = 2;
+
+    /** Cluster id used as one of the {@link #otherClusterId} parameter values. */
+    private static final byte THIRD_CLUSTER_ID = 3;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Sets a {@link CacheVersionConflictResolverPluginProvider} configured with {@link #SECOND_CLUSTER_ID},
+     * the {@code DEFAULT_CACHE_NAME} cache and the field returned by {@code conflictResolveField()}.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        CacheVersionConflictResolverPluginProvider<?> pluginCfg = new 
CacheVersionConflictResolverPluginProvider<>();
+
+        pluginCfg.setClusterId(SECOND_CLUSTER_ID);
+        pluginCfg.setCaches(new 
HashSet<>(Collections.singleton(DEFAULT_CACHE_NAME)));
+        pluginCfg.setConflictResolveField(conflictResolveField());
+
+        return 
super.getConfiguration(igniteInstanceName).setPluginProviders(pluginCfg);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Starts grid {@code 1} and client grid {@code 2}, then creates the {@code DEFAULT_CACHE_NAME} cache with
+     * the {@link #cacheMode} atomicity mode via the client and stores its public and internal views.
+     */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrid(1);
+
+        client = startClientGrid(2);
+
+        cache = client.createCache(new CacheConfiguration<String, 
ConflictResolvableTestData>(DEFAULT_CACHE_NAME).setAtomicityMode(cacheMode));
+        cachex = client.cachex(DEFAULT_CACHE_NAME);
+    }
+
+    /** Tests that regular cache operations work with the conflict resolver 
when there are no update conflicts. */
+    @Test
+    public void testSimpleUpdates() {
+        String key = "UpdatesWithoutConflict";
+
+        put(key);
+        put(key);
+
+        remove(key);
+    }
+
+    /**
+     * Tests that {@code IgniteInternalCache#*AllConflict} cache operations 
work with the conflict resolver
+     * when there are no update conflicts.
+     */
+    @Test
+    public void testUpdatesFromOtherClusterWithoutConflict() throws Exception {
+        String key = key("UpdateFromOtherClusterWithoutConflict", 
otherClusterId);
+
+        putConflict(key, 1, true);
+
+        putConflict(key, 2, true);
+
+        removeConflict(key, 3, true);
+    }
+
+    /**
+     * Tests that {@code IgniteInternalCache#*AllConflict} cache operations 
from the other cluster are expected
+     * to fail when their version is equal to or lower than the current one and to succeed when it is higher.
+     * Checked separately for the {@code order}, {@code topVer} and {@code nodeOrder} parts of the version.
+     */
+    @Test
+    public void testUpdatesReorderFromOtherCluster() throws Exception {
+        String key = key("UpdateClusterUpdateReorder", otherClusterId);
+
+        putConflict(key, 2, true);
+
+        // Update with the equal or lower order should fail.
+        putConflict(key, 2, false);
+        putConflict(key, 1, false);
+
+        // Remove with the equal or lower order should fail.
+        removeConflict(key, 2, false);
+        removeConflict(key, 1, false);
+
+        // Update with the higher order should succeed.
+        putConflict(key, 3, true);
+
+        key = key("UpdateClusterUpdateReorder2", otherClusterId);
+
+        int order = 1;
+
+        putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), 
true);
+
+        // Update with the equal or lower topVer should fail.
+        putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), 
false);
+        putConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), 
false);
+
+        // Remove with the equal or lower topVer should fail.
+        removeConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), 
false);
+        removeConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), 
false);
+
+        // Update with the higher topVer should succeed.
+        putConflict(key, new GridCacheVersion(3, order, 1, otherClusterId), 
true);
+
+        key = key("UpdateClusterUpdateReorder3", otherClusterId);
+
+        int topVer = 1;
+
+        putConflict(key, new GridCacheVersion(topVer, order, 2, 
otherClusterId), true);
+
+        // Update with the equal or lower nodeOrder should fail.
+        putConflict(key, new GridCacheVersion(topVer, order, 2, 
otherClusterId), false);
+        putConflict(key, new GridCacheVersion(topVer, order, 1, 
otherClusterId), false);
+
+        // Remove with the equal or lower nodeOrder should fail.
+        removeConflict(key, new GridCacheVersion(topVer, order, 2, 
otherClusterId), false);
+        removeConflict(key, new GridCacheVersion(topVer, order, 1, 
otherClusterId), false);
+
+        // Update with the higher nodeOrder should succeed.
+        putConflict(key, new GridCacheVersion(topVer, order, 3, 
otherClusterId), true);
+    }
+
+    /** Tests cache operations for entry replicated from another cluster. */
+    @Test
+    public void testUpdatesConflict() throws Exception {
+        String key = key("UpdateThisClusterConflict0", otherClusterId);
+
+        putConflict(key, 1, true);
+
+        // Local remove for other cluster entry should succeed.
+        remove(key);
+
+        // Conflict replicated update should fail.

Review comment:
       We need to explain here that the remove makes the comparison by 
conflictResolveField impossible.

##########
File path: 
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheEntryVersion;
+import 
org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ * Cache conflict operations test.
+ */
+@RunWith(Parameterized.class)
+public class CacheConflictOperationsTest extends GridCommonAbstractTest {
+    /** Cache mode. */
+    @Parameterized.Parameter
+    public CacheAtomicityMode cacheMode;
+
+    /** Other cluster id. */
+    @Parameterized.Parameter(1)
+    public byte otherClusterId;
+
+    /**
+     * @return Test parameters: an {@code Object[] {mode, otherClusterId}} for each combination of
+     *      {@code ATOMIC}/{@code TRANSACTIONAL} atomicity mode and
+     *      {@link #FIRST_CLUSTER_ID}/{@link #THIRD_CLUSTER_ID} other cluster id.
+     */
+    @Parameterized.Parameters(name = "cacheMode={0}, otherClusterId={1}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (CacheAtomicityMode mode : EnumSet.of(ATOMIC, TRANSACTIONAL))
+            for (byte otherClusterId : new byte[] {FIRST_CLUSTER_ID, 
THIRD_CLUSTER_ID})
+                params.add(new Object[] {mode, otherClusterId});
+
+        return params;
+    }
+
+    /** Cache under test, created via the client node in {@link #beforeTestsStarted()}. */
+    private static IgniteCache<String, ConflictResolvableTestData> cache;
+
+    /** Internal view of the {@code DEFAULT_CACHE_NAME} cache, used for the {@code *AllConflict} operations. */
+    private static IgniteInternalCache<BinaryObject, BinaryObject> cachex;
+
+    /** Client node started in {@link #beforeTestsStarted()}. */
+    private static IgniteEx client;
+
+    /** Cluster id used as one of the {@link #otherClusterId} parameter values. */
+    private static final byte FIRST_CLUSTER_ID = 1;
+
+    /** Cluster id the conflict resolver plugin is configured with in {@link #getConfiguration(String)}. */
+    private static final byte SECOND_CLUSTER_ID = 2;
+
+    /** Cluster id used as one of the {@link #otherClusterId} parameter values. */
+    private static final byte THIRD_CLUSTER_ID = 3;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Sets a {@link CacheVersionConflictResolverPluginProvider} configured with {@link #SECOND_CLUSTER_ID},
+     * the {@code DEFAULT_CACHE_NAME} cache and the field returned by {@code conflictResolveField()}.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        CacheVersionConflictResolverPluginProvider<?> pluginCfg = new 
CacheVersionConflictResolverPluginProvider<>();
+
+        pluginCfg.setClusterId(SECOND_CLUSTER_ID);
+        pluginCfg.setCaches(new 
HashSet<>(Collections.singleton(DEFAULT_CACHE_NAME)));
+        pluginCfg.setConflictResolveField(conflictResolveField());
+
+        return 
super.getConfiguration(igniteInstanceName).setPluginProviders(pluginCfg);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Starts grid {@code 1} and client grid {@code 2}, then creates the {@code DEFAULT_CACHE_NAME} cache with
+     * the {@link #cacheMode} atomicity mode via the client and stores its public and internal views.
+     */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrid(1);
+
+        client = startClientGrid(2);
+
+        cache = client.createCache(new CacheConfiguration<String, 
ConflictResolvableTestData>(DEFAULT_CACHE_NAME).setAtomicityMode(cacheMode));
+        cachex = client.cachex(DEFAULT_CACHE_NAME);
+    }
+
+    /** Tests that regular cache operations work with the conflict resolver 
when there are no update conflicts. */
+    @Test
+    public void testSimpleUpdates() {
+        String key = "UpdatesWithoutConflict";
+
+        put(key);
+        put(key);
+
+        remove(key);
+    }
+
+    /**
+     * Tests that {@code IgniteInternalCache#*AllConflict} cache operations 
work with the conflict resolver
+     * when there are no update conflicts.
+     */
+    @Test
+    public void testUpdatesFromOtherClusterWithoutConflict() throws Exception {
+        String key = key("UpdateFromOtherClusterWithoutConflict", 
otherClusterId);
+
+        putConflict(key, 1, true);
+
+        putConflict(key, 2, true);
+
+        removeConflict(key, 3, true);
+    }
+
+    /**
+     * Tests that {@code IgniteInternalCache#*AllConflict} cache operations 
from the other cluster are expected
+     * to fail when their version is equal to or lower than the current one and to succeed when it is higher.
+     * Checked separately for the {@code order}, {@code topVer} and {@code nodeOrder} parts of the version.
+     */
+    @Test
+    public void testUpdatesReorderFromOtherCluster() throws Exception {
+        String key = key("UpdateClusterUpdateReorder", otherClusterId);
+
+        putConflict(key, 2, true);
+
+        // Update with the equal or lower order should fail.
+        putConflict(key, 2, false);
+        putConflict(key, 1, false);
+
+        // Remove with the equal or lower order should fail.
+        removeConflict(key, 2, false);
+        removeConflict(key, 1, false);
+
+        // Update with the higher order should succeed.
+        putConflict(key, 3, true);
+
+        key = key("UpdateClusterUpdateReorder2", otherClusterId);
+
+        int order = 1;
+
+        putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), 
true);
+
+        // Update with the equal or lower topVer should fail.
+        putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), 
false);
+        putConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), 
false);
+
+        // Remove with the equal or lower topVer should fail.
+        removeConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), 
false);
+        removeConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), 
false);
+
+        // Update with the higher topVer should succeed.
+        putConflict(key, new GridCacheVersion(3, order, 1, otherClusterId), 
true);
+
+        key = key("UpdateClusterUpdateReorder3", otherClusterId);
+
+        int topVer = 1;
+
+        putConflict(key, new GridCacheVersion(topVer, order, 2, 
otherClusterId), true);
+
+        // Update with the equal or lower nodeOrder should fail.
+        putConflict(key, new GridCacheVersion(topVer, order, 2, 
otherClusterId), false);
+        putConflict(key, new GridCacheVersion(topVer, order, 1, 
otherClusterId), false);
+
+        // Remove with the equal or lower nodeOrder should fail.
+        removeConflict(key, new GridCacheVersion(topVer, order, 2, 
otherClusterId), false);
+        removeConflict(key, new GridCacheVersion(topVer, order, 1, 
otherClusterId), false);
+
+        // Update with the higher nodeOrder should succeed.
+        putConflict(key, new GridCacheVersion(topVer, order, 3, 
otherClusterId), true);
+    }
+
+    /**
+     * Tests conflicts between local cache operations and operations replicated from another cluster
+     * on the same key, in both orders: local operation after a replicated one and vice versa.
+     */
+    @Test
+    public void testUpdatesConflict() throws Exception {
+        String key = key("UpdateThisClusterConflict0", otherClusterId);
+
+        putConflict(key, 1, true);
+
+        // Local remove for other cluster entry should succeed.
+        remove(key);
+
+        // Conflict replicated update should fail. NOTE(review): presumably because the remove above leaves no old value to compare by conflictResolveField - confirm.
+        putConflict(key, 2, false);
+
+        key = key("UpdateThisDCConflict1", otherClusterId);
+
+        putConflict(key, 3, true);
+
+        // Local update for other cluster entry should succeed.
+        put(key);
+
+        key = key("UpdateThisDCConflict2", otherClusterId);
+
+        put(key);
+
+        // Conflict replicated remove should fail.
+        removeConflict(key, 4, false);
+
+        key = key("UpdateThisDCConflict3", otherClusterId);
+
+        put(key);
+
+        // Conflict replicated update succeeds only if a conflict resolve field is configured (conflictResolveField() != null).
+        putConflict(key, 5, conflictResolveField() != null);
+    }
+
+    /**
+     * Puts a freshly created {@code ConflictResolvableTestData} value via the public cache API and asserts that
+     * the resulting entry has no other cluster version, that the cache holds the new value and, if an entry
+     * existed before the put, that the version order of the new entry is greater than the old one.
+     *
+     * @param key Key.
+     */
+    private void put(String key) {
+        ConflictResolvableTestData newVal = 
ConflictResolvableTestData.create();
+
+        CacheEntry<String, ConflictResolvableTestData> oldEntry = 
cache.getEntry(key);
+
+        cache.put(key, newVal);
+
+        CacheEntry<String, ConflictResolvableTestData> newEntry = 
cache.getEntry(key);
+
+        
assertNull(((CacheEntryVersion)newEntry.version()).otherClusterVersion());
+        assertEquals(newVal, cache.get(key));
+
+        if (oldEntry != null)
+            assertTrue(((CacheEntryVersion)oldEntry.version()).order() < 
((CacheEntryVersion)newEntry.version()).order());
+    }
+
+    /**
+     * Puts entry via {@link IgniteInternalCache#putAllConflict(Map)} using the version
+     * {@code new GridCacheVersion(1, order, 1, otherClusterId)}.
+     *
+     * @param k Key.
+     * @param order Order of the version the entry is put with.
+     * @param success Passed as is to the {@code GridCacheVersion}-based overload.
+     * @throws IgniteCheckedException Declared by the overload this method delegates to.
+     */
+    private void putConflict(String k, long order, boolean success) throws 
IgniteCheckedException {
+        putConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), 
success);
+    }
+
+    /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */
+    private void putConflict(String k, GridCacheVersion newVer, boolean 
success) throws IgniteCheckedException {
+        CacheEntry<String, ConflictResolvableTestData> oldVal = 
cache.getEntry(k);

Review comment:
       should this be oldEntry?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to