timoninmaxim commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r776203084



##########
File path: 
modules/core/src/main/java/org/apache/ignite/cache/ReadRepairStrategy.java
##########
@@ -18,23 +18,43 @@
 package org.apache.ignite.cache;
 
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
 
 /**
  * Read repair strategies.
  *
  * @see IgniteCache#withReadRepair(ReadRepairStrategy) for details.
  */
 public enum ReadRepairStrategy {
-    /** Last write wins. */
+    /** Last write (the newest entry) wins.
+     * <p>
+     * May cause {@link IgniteException} when fix is impossible (unable to 
detect the newest entry):
+     * <ul>
+     * <li>Null(s) found as well as non-null values for the save key.

Review comment:
       s/save/same/

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
##########
@@ -232,7 +233,7 @@ public void 
setCacheManager(org.apache.ignite.cache.CacheManager cacheMgr) {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteCache<K, V> withReadRepair() {
+    @Override public IgniteCache<K, V> withReadRepair(ReadRepairStrategy 
strategy) {

Review comment:
       Let's add assert that the strategy isn't `null` here.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/IgniteIrreparableConsistencyViolationException.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+
+import java.util.Collection;
+
+/**
+ * Irreparable consistency violation exception.
+ */
+public class IgniteIrreparableConsistencyViolationException extends 
IgniteConsistencyViolationException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Irreparable keys. */
+    private final Collection<?> irreparableKeys;

Review comment:
       Can be replaced with `Collection<Object>`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
##########
@@ -92,7 +93,7 @@ public CacheOperationContext(
         boolean noRetries,
         @Nullable Byte dataCenterId,
         boolean recovery,
-        boolean readRepair,
+        ReadRepairStrategy readRepairStrategy,

Review comment:
       There is already `@Nullable` in the signature. Let's keep it consistent

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
##########
@@ -191,7 +192,7 @@ public GridDistributedCacheEntry entryExx(
         final CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         final boolean recovery = opCtx != null && opCtx.recovery();
-        final boolean readRepair = opCtx != null && opCtx.readRepair();
+        final ReadRepairStrategy readRepairStrategy = opCtx != null ? 
opCtx.readRepairStrategy() : null;

Review comment:
       Can replace it with boolean flag for better readability. It looks 
strange that we extract a strategy here, but don't use it and run only 
consistency check and not repair.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
##########
@@ -4896,13 +4901,13 @@ protected V get(
             /*skip vals*/false,
             needVer);
 
-        if (readRepair) {
+        if (readRepairStrategy != null) {
             CacheOperationContext opCtx = ctx.operationContextPerCall();
 
             return getWithRepairAsync(
                 fut,
                 () -> repairAsync(key, opCtx, false),
-                () -> repairableGetAsync(key, deserializeBinary, needVer, 
readRepair));
+                () -> repairableGetAsync(key, deserializeBinary, needVer, 
readRepairStrategy));

Review comment:
       It's ok to have the boolean flag here too. Let's use `strategy` only in 
the place actually performs repair.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridCompoundReadRepairFuture.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ * Compound future that represents the result of the external fixes for some 
keys.
+ */
+public class GridCompoundReadRepairFuture extends GridFutureAdapter<Void> 
implements IgniteInClosure<IgniteInternalFuture<Void>> {
+    /** Initialization flag. */
+    private static final int INIT_FLAG = 0x1;
+
+    /** Flags updater. */
+    private static final 
AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> FLAGS_UPD =
+        
AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, 
"initFlag");
+
+    /** Listener calls updater. */
+    private static final 
AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> LSNR_CALLS_UPD =
+        
AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, 
"lsnrCalls");
+
+    /** Initialization flag. Updated via {@link #FLAGS_UPD}. */
+    private volatile int initFlag;
+
+    /** Listener calls. */
+    private volatile int lsnrCalls;
+
+    /** Count of compounds in the future. */
+    private volatile int size;
+
+    /** Keys. */
+    private volatile Collection<Object> keys;
+
+    /** Irreparable Keys. */
+    private volatile Collection<Object> irreparableKeys;
+
+    /**
+     * @param fut Future.
+     */
+    public void add(IgniteInternalFuture<Void> fut) {
+        size++; // All additions are from the same thread.
+
+        fut.listen(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void apply(IgniteInternalFuture<Void> fut) {
+        Throwable e = fut.error();
+
+        if (e != null) {
+            if (e instanceof IgniteConsistencyViolationException) {
+                synchronized (this) {

Review comment:
       Can replace with the DCL pattern to synchronize on initialization of 
collections only.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/cache/ReadRepairStrategy.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+
+/**
+ * Read repair strategies.
+ *
+ * @see IgniteCache#withReadRepair(ReadRepairStrategy) for details.
+ */
+public enum ReadRepairStrategy {
+    /** Last write (the newest entry) wins.
+     * <p>
+     * May cause {@link IgniteException} when fix is impossible (unable to 
detect the newest entry):
+     * <ul>
+     * <li>Null(s) found as well as non-null values for the save key.
+     * <p>
+     * Null (missed entry) has no version, so, it can not be compared with the 
versioned entry.</li>
+     * <li>Entries with the same version have different values.</li>
+     * </ul>
+     */
+    LWW("LWW"),
+
+    /** Value from the primary node wins. */
+    PRIMARY("PRIMARY"),
+
+    /** The relative majority, any value found more times than any other wins.
+     * <p>
+     * Works for an even number of copies (which is typical of Ignite) instead 
of an absolute majority.
+     * <p>
+     * May cause {@link IgniteException} when unable to detect value found 
more times than any other.
+     * <p>
+     * For example, when we have 5 copies and value `A` found twice, but 
`X`,`Y` and `Z` only once, `A` wins.

Review comment:
       But it's said 
   
   > works for an even number of copies 
   
   What does it mean in case of 5 copies?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
##########
@@ -699,9 +701,9 @@ public void onKernalStop() {
             /*skip values*/true,
             false);
 
-        boolean readRepair = opCtx != null && opCtx.readRepair();

Review comment:
       Here and below. I like this boolean here. It's a little bit 
straightforward from my point of view.
   
   `boolean readRepair = opCtx != null && opCtx.readRepairStrategy() != null`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridCompoundReadRepairFuture.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ * Compound future that represents the result of the external fixes for some 
keys.
+ */
+public class GridCompoundReadRepairFuture extends GridFutureAdapter<Void> 
implements IgniteInClosure<IgniteInternalFuture<Void>> {
+    /** Initialization flag. */
+    private static final int INIT_FLAG = 0x1;
+
+    /** Flags updater. */
+    private static final 
AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> FLAGS_UPD =
+        
AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, 
"initFlag");
+
+    /** Listener calls updater. */
+    private static final 
AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> LSNR_CALLS_UPD =
+        
AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, 
"lsnrCalls");
+
+    /** Initialization flag. Updated via {@link #FLAGS_UPD}. */
+    private volatile int initFlag;
+
+    /** Listener calls. */
+    private volatile int lsnrCalls;
+
+    /** Count of compounds in the future. */
+    private volatile int size;
+
+    /** Keys. */
+    private volatile Collection<Object> keys;
+
+    /** Irreparable Keys. */
+    private volatile Collection<Object> irreparableKeys;
+
+    /**
+     * @param fut Future.
+     */
+    public void add(IgniteInternalFuture<Void> fut) {
+        size++; // All additions are from the same thread.
+
+        fut.listen(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void apply(IgniteInternalFuture<Void> fut) {
+        Throwable e = fut.error();
+
+        if (e != null) {
+            if (e instanceof IgniteConsistencyViolationException) {
+                synchronized (this) {
+                    Collection<?> keys = 
((IgniteConsistencyViolationException)e).keys();
+
+                    if (this.keys == null)
+                        this.keys = new GridConcurrentHashSet<>();
+
+                    this.keys.addAll(keys);
+
+                    if (e instanceof 
IgniteIrreparableConsistencyViolationException) {
+                        Collection<?> irreparableKeys = 
((IgniteIrreparableConsistencyViolationException)e).irreparableKeys();
+
+                        if (this.irreparableKeys == null)
+                            this.irreparableKeys = new 
GridConcurrentHashSet<>();
+
+                        this.irreparableKeys.addAll(irreparableKeys);
+                    }
+                }
+            }
+            else
+                onDone(e);
+        }
+
+        LSNR_CALLS_UPD.incrementAndGet(this);
+
+        checkComplete();
+    }
+
+    /**
+     * Mark this future as initialized.
+     */
+    public final void markInitialized() {
+        if (FLAGS_UPD.compareAndSet(this, 0, INIT_FLAG))

Review comment:
       `markInitialized` invoked in the same thread that invokes the `add` 
method. Is it required to have the CAS operation here?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/IgniteIrreparableConsistencyViolationException.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+
+import java.util.Collection;
+
+/**
+ * Irreparable consistency violation exception.
+ */
+public class IgniteIrreparableConsistencyViolationException extends 
IgniteConsistencyViolationException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Irreparable keys. */
+    private final Collection<?> irreparableKeys;
+
+    /**
+     * @param keys            Keys.
+     * @param irreparableKeys Irreparable keys.
+     */
+    public IgniteIrreparableConsistencyViolationException(Collection<?> keys, 
Collection<?> irreparableKeys) {
+        super(keys);

Review comment:
       Should we change error msg too? Currently it uses the message from super 
class. I think we should write down info about irreparable keys, used strategy, 
and made some explanation why exception happened. 

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
##########
@@ -254,51 +266,114 @@ else if (!canRemap)
      */
     protected abstract void reduce();
 
+    /**
+     *
+     */
+    public Map<KeyCacheObject, EntryGetResult> check() throws 
IgniteCheckedException {

Review comment:
       Can we make it `protected`?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridCompoundReadRepairFuture.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ * Compound future that represents the result of the external fixes for some 
keys.
+ */
+public class GridCompoundReadRepairFuture extends GridFutureAdapter<Void> 
implements IgniteInClosure<IgniteInternalFuture<Void>> {
+    /** Initialization flag. */
+    private static final int INIT_FLAG = 0x1;
+
+    /** Flags updater. */
+    private static final 
AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> FLAGS_UPD =
+        
AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, 
"initFlag");
+
+    /** Listener calls updater. */
+    private static final 
AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> LSNR_CALLS_UPD =
+        
AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, 
"lsnrCalls");
+
+    /** Initialization flag. Updated via {@link #FLAGS_UPD}. */
+    private volatile int initFlag;
+
+    /** Listener calls. */
+    private volatile int lsnrCalls;
+
+    /** Count of compounds in the future. */
+    private volatile int size;
+
+    /** Keys. */
+    private volatile Collection<Object> keys;
+
+    /** Irreparable Keys. */
+    private volatile Collection<Object> irreparableKeys;
+
+    /**
+     * @param fut Future.
+     */
+    public void add(IgniteInternalFuture<Void> fut) {
+        size++; // All additions are from the same thread.
+
+        fut.listen(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void apply(IgniteInternalFuture<Void> fut) {
+        Throwable e = fut.error();
+
+        if (e != null) {
+            if (e instanceof IgniteConsistencyViolationException) {
+                synchronized (this) {
+                    Collection<?> keys = 
((IgniteConsistencyViolationException)e).keys();
+
+                    if (this.keys == null)
+                        this.keys = new GridConcurrentHashSet<>();

Review comment:
       Can we use `ConcurrentHashMap.newKeySet()` here?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridCompoundReadRepairFuture.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ * Compound future that represents the result of the external fixes for some 
keys.
+ */
+public class GridCompoundReadRepairFuture extends GridFutureAdapter<Void> 
implements IgniteInClosure<IgniteInternalFuture<Void>> {
+    /** Initialization flag. */
+    private static final int INIT_FLAG = 0x1;
+
+    /** Flags updater. */
+    private static final 
AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> FLAGS_UPD =
+        
AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, 
"initFlag");
+
+    /** Listener calls updater. */
+    private static final 
AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> LSNR_CALLS_UPD =
+        
AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, 
"lsnrCalls");
+
+    /** Initialization flag. Updated via {@link #FLAGS_UPD}. */
+    private volatile int initFlag;

Review comment:
       Can we use boolean instead?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
##########
@@ -1712,20 +1712,6 @@ void updateAllAsyncInternal(
         }
     }
 
-    /** {@inheritDoc} */
-    @Override protected IgniteInternalFuture<Void> repairAsync(Collection<? 
extends K> keys,

Review comment:
       Now `GridCacheAdapter#repairAsync` can be `private`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to