alex-plekhanov commented on code in PR #12128:
URL: https://github.com/apache/ignite/pull/12128#discussion_r2157348539


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java:
##########
@@ -1031,71 +1033,73 @@ private <T> IgniteInternalFuture<Map<K, 
EntryProcessorResult<T>>> invokeAll0(
     }
 
     /**
-     * Entry point for all public API put/transform methods.
+     * Entry point for all public API put/transform methods. Except for 
invokeAll operations with {@code Set}.

Review Comment:
   Remove `Except for invokeAll operations with {@code Set}.`



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java:
##########
@@ -1031,71 +1033,73 @@ private <T> IgniteInternalFuture<Map<K, 
EntryProcessorResult<T>>> invokeAll0(
     }
 
     /**
-     * Entry point for all public API put/transform methods.
+     * Entry point for all public API put/transform methods. Except for 
invokeAll operations with {@code Set}.
      *
-     * @param map Put map. Either {@code map}, {@code invokeMap} or {@code 
conflictPutMap} should be passed.
-     * @param invokeMap Invoke map. Either {@code map}, {@code invokeMap} or 
{@code conflictPutMap} should be passed.
+     * @param keys Put keys.

Review Comment:
   Not only "Put", just "Keys"



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java:
##########
@@ -1031,71 +1033,73 @@ private <T> IgniteInternalFuture<Map<K, 
EntryProcessorResult<T>>> invokeAll0(
     }
 
     /**
-     * Entry point for all public API put/transform methods.
+     * Entry point for all public API put/transform methods. Except for 
invokeAll operations with {@code Set}.
      *
-     * @param map Put map. Either {@code map}, {@code invokeMap} or {@code 
conflictPutMap} should be passed.
-     * @param invokeMap Invoke map. Either {@code map}, {@code invokeMap} or 
{@code conflictPutMap} should be passed.
+     * @param keys Put keys.
+     * @param vals Put values. Either {@code vals}, {@code invokeVals} or 
{@code conflictPutVals} should be passed.
+     * @param invokeVals Invoke values. Either {@code vals}, {@code 
invokeVals} or {@code conflictPutVals} should be passed.
      * @param invokeArgs Optional arguments for EntryProcessor.
-     * @param conflictPutMap Conflict put map.
-     * @param conflictRmvMap Conflict remove map.
+     * @param conflictPutVals Conflict put values.
+     * @param conflictRmvVals Conflict remove values.
      * @param retval Return value required flag.
      * @param async Async operation flag.
      * @return Completion future.
      */
     @SuppressWarnings("ConstantConditions")
     private IgniteInternalFuture updateAll0(
-        @Nullable Map<? extends K, ? extends V> map,
-        @Nullable Map<? extends K, ? extends EntryProcessor> invokeMap,
+        @Nullable Collection<?> keys,
+        @Nullable Collection<? extends V> vals,
+        @Nullable Collection<? extends EntryProcessor> invokeVals,
         @Nullable Object[] invokeArgs,
-        @Nullable Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap,
-        @Nullable Map<KeyCacheObject, GridCacheVersion> conflictRmvMap,
+        @Nullable Collection<GridCacheDrInfo> conflictPutVals,
+        @Nullable Collection<GridCacheVersion> conflictRmvVals,
         final boolean retval,
         final GridCacheOperation op,
         boolean async
     ) {
         assert ctx.updatesAllowed();
 
+        assert keys != null : keys;
+
         ctx.checkSecurity(SecurityPermission.CACHE_PUT);
 
         final CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         if (opCtx != null && opCtx.hasDataCenterId()) {
-            assert conflictPutMap == null : conflictPutMap;
-            assert conflictRmvMap == null : conflictRmvMap;
+            assert conflictPutVals == null : conflictPutVals;
+            assert conflictRmvVals == null : conflictRmvVals;
 
             if (op == GridCacheOperation.TRANSFORM) {
-                assert invokeMap != null : invokeMap;
+                assert invokeVals != null : invokeVals;
 
-                conflictPutMap = F.viewReadOnly((Map)invokeMap,
-                    new IgniteClosure<EntryProcessor, GridCacheDrInfo>() {
-                        @Override public GridCacheDrInfo apply(EntryProcessor 
o) {
-                            return new GridCacheDrInfo(o, 
nextVersion(opCtx.dataCenterId()));
-                        }
-                    });
+                conflictPutVals = invokeVals
+                    .stream()
+                    .map(o -> new GridCacheDrInfo(o, 
nextVersion(opCtx.dataCenterId())))
+                    .collect(Collectors.toUnmodifiableList());
 
-                invokeMap = null;
+                invokeVals = null;
             }
             else if (op == GridCacheOperation.DELETE) {
-                assert map != null : map;
+                assert keys != null : keys;
+                assert vals != null : vals;
 
-                conflictRmvMap = F.viewReadOnly((Map)map, new IgniteClosure<V, 
GridCacheVersion>() {
-                    @Override public GridCacheVersion apply(V o) {
-                        return nextVersion(opCtx.dataCenterId());
-                    }
-                });
+                conflictRmvVals = vals
+                    .stream()
+                    .map(o -> nextVersion(opCtx.dataCenterId()))
+                    .collect(Collectors.toUnmodifiableList());
 
-                map = null;
+                vals = null;
             }
             else {
-                assert map != null : map;
+                assert keys != null : keys;
+                assert vals != null : vals;
 
-                conflictPutMap = F.viewReadOnly((Map)map, new IgniteClosure<V, 
GridCacheDrInfo>() {
-                    @Override public GridCacheDrInfo apply(V o) {
-                        return new GridCacheDrInfo(ctx.toCacheObject(o), 
nextVersion(opCtx.dataCenterId()));
-                    }
-                });
+                conflictPutVals = vals
+                    .stream()
+                    .map(o -> new GridCacheDrInfo(ctx.toCacheObject(o), 
nextVersion(opCtx.dataCenterId())))
+                    .collect(Collectors.toUnmodifiableList());

Review Comment:
   F.viewReadOnly(vals, o -> new GridCacheDrInfo(ctx.toCacheObject(o), 
nextVersion(opCtx.dataCenterId())));



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java:
##########
@@ -725,23 +737,22 @@ private <K, V> IgniteInternalFuture putAllAsync0(
 
         if (opCtx != null && opCtx.hasDataCenterId()) {
             assert drMap == null : drMap;
-            assert map != null || invokeMap != null;
+            assert keySet != null || invokeVals != null;
 
             dataCenterId = opCtx.dataCenterId();
         }
         else
             dataCenterId = null;
 
-        final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, 
EntryProcessor<K, V, Object>>)invokeMap;
-
         if (log.isDebugEnabled())
-            log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map + 
", retval=" + retval + "]");
+            log.debug("Called putAllAsync(...) [tx=" + this +
+                ", map=[" + keySet + ", " + vals + "], retval=" + retval + 
"]");
 
-        assert map != null || invokeMap0 != null;
+        assert keySet != null || invokeVals != null;
 
         final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
 
-        if (F.isEmpty(map) && F.isEmpty(invokeMap0)) {
+        if (F.isEmpty(keySet) && vals == null && invokeVals != null) {

Review Comment:
   (F.isEmpty(vals) && F.isEmpty(invokeVals))



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java:
##########
@@ -725,23 +737,22 @@ private <K, V> IgniteInternalFuture putAllAsync0(
 
         if (opCtx != null && opCtx.hasDataCenterId()) {
             assert drMap == null : drMap;
-            assert map != null || invokeMap != null;
+            assert keySet != null || invokeVals != null;

Review Comment:
   Looks redundant, checked lower



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java:
##########
@@ -1031,71 +1033,73 @@ private <T> IgniteInternalFuture<Map<K, 
EntryProcessorResult<T>>> invokeAll0(
     }
 
     /**
-     * Entry point for all public API put/transform methods.
+     * Entry point for all public API put/transform methods. Except for 
invokeAll operations with {@code Set}.
      *
-     * @param map Put map. Either {@code map}, {@code invokeMap} or {@code 
conflictPutMap} should be passed.
-     * @param invokeMap Invoke map. Either {@code map}, {@code invokeMap} or 
{@code conflictPutMap} should be passed.
+     * @param keys Put keys.
+     * @param vals Put values. Either {@code vals}, {@code invokeVals} or 
{@code conflictPutVals} should be passed.
+     * @param invokeVals Invoke values. Either {@code vals}, {@code 
invokeVals} or {@code conflictPutVals} should be passed.
      * @param invokeArgs Optional arguments for EntryProcessor.
-     * @param conflictPutMap Conflict put map.
-     * @param conflictRmvMap Conflict remove map.
+     * @param conflictPutVals Conflict put values.
+     * @param conflictRmvVals Conflict remove values.
      * @param retval Return value required flag.
      * @param async Async operation flag.
      * @return Completion future.
      */
     @SuppressWarnings("ConstantConditions")
     private IgniteInternalFuture updateAll0(
-        @Nullable Map<? extends K, ? extends V> map,
-        @Nullable Map<? extends K, ? extends EntryProcessor> invokeMap,
+        @Nullable Collection<?> keys,
+        @Nullable Collection<? extends V> vals,
+        @Nullable Collection<? extends EntryProcessor> invokeVals,
         @Nullable Object[] invokeArgs,
-        @Nullable Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap,
-        @Nullable Map<KeyCacheObject, GridCacheVersion> conflictRmvMap,
+        @Nullable Collection<GridCacheDrInfo> conflictPutVals,
+        @Nullable Collection<GridCacheVersion> conflictRmvVals,
         final boolean retval,
         final GridCacheOperation op,
         boolean async
     ) {
         assert ctx.updatesAllowed();
 
+        assert keys != null : keys;
+
         ctx.checkSecurity(SecurityPermission.CACHE_PUT);
 
         final CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         if (opCtx != null && opCtx.hasDataCenterId()) {
-            assert conflictPutMap == null : conflictPutMap;
-            assert conflictRmvMap == null : conflictRmvMap;
+            assert conflictPutVals == null : conflictPutVals;
+            assert conflictRmvVals == null : conflictRmvVals;
 
             if (op == GridCacheOperation.TRANSFORM) {
-                assert invokeMap != null : invokeMap;
+                assert invokeVals != null : invokeVals;
 
-                conflictPutMap = F.viewReadOnly((Map)invokeMap,
-                    new IgniteClosure<EntryProcessor, GridCacheDrInfo>() {
-                        @Override public GridCacheDrInfo apply(EntryProcessor 
o) {
-                            return new GridCacheDrInfo(o, 
nextVersion(opCtx.dataCenterId()));
-                        }
-                    });
+                conflictPutVals = invokeVals
+                    .stream()
+                    .map(o -> new GridCacheDrInfo(o, 
nextVersion(opCtx.dataCenterId())))
+                    .collect(Collectors.toUnmodifiableList());
 
-                invokeMap = null;
+                invokeVals = null;
             }
             else if (op == GridCacheOperation.DELETE) {
-                assert map != null : map;
+                assert keys != null : keys;
+                assert vals != null : vals;
 
-                conflictRmvMap = F.viewReadOnly((Map)map, new IgniteClosure<V, 
GridCacheVersion>() {
-                    @Override public GridCacheVersion apply(V o) {
-                        return nextVersion(opCtx.dataCenterId());
-                    }
-                });
+                conflictRmvVals = vals
+                    .stream()
+                    .map(o -> nextVersion(opCtx.dataCenterId()))
+                    .collect(Collectors.toUnmodifiableList());

Review Comment:
   F.viewReadOnly(vals, o -> nextVersion(opCtx.dataCenterId()))



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java:
##########
@@ -505,11 +513,13 @@ public IgniteInternalFuture<?> putAllDrAsync(
         GridCacheContext cacheCtx,
         Map<KeyCacheObject, GridCacheDrInfo> drMap
     ) {
-        Map<KeyCacheObject, Object> map = F.viewReadOnly(drMap, 
(IgniteClosure<GridCacheDrInfo, Object>)GridCacheDrInfo::value);
+        Set<?> keySet = drMap.keySet();
+        Collection<?> vals = 
drMap.values().stream().map(GridCacheDrInfo::value).collect(Collectors.toUnmodifiableList());

Review Comment:
   F.viewReadOnly(drMap.values(), GridCacheDrInfo::value)



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java:
##########
@@ -725,23 +737,22 @@ private <K, V> IgniteInternalFuture putAllAsync0(
 
         if (opCtx != null && opCtx.hasDataCenterId()) {
             assert drMap == null : drMap;
-            assert map != null || invokeMap != null;
+            assert keySet != null || invokeVals != null;
 
             dataCenterId = opCtx.dataCenterId();
         }
         else
             dataCenterId = null;
 
-        final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, 
EntryProcessor<K, V, Object>>)invokeMap;
-
         if (log.isDebugEnabled())
-            log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map + 
", retval=" + retval + "]");
+            log.debug("Called putAllAsync(...) [tx=" + this +
+                ", map=[" + keySet + ", " + vals + "], retval=" + retval + 
"]");
 
-        assert map != null || invokeMap0 != null;
+        assert keySet != null || invokeVals != null;

Review Comment:
   vals != null || invokeVals != null



##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java:
##########
@@ -1616,4 +1650,77 @@ private void 
checkDataReplicationSupported(ProtocolContext protocolCtx)
         if 
(!protocolCtx.isFeatureSupported(ProtocolBitmaskFeature.DATA_REPLICATION_OPERATIONS))
             throw new 
ClientFeatureNotSupportedByServerException(ProtocolBitmaskFeature.DATA_REPLICATION_OPERATIONS);
     }
+
+    /**
+     * Warns if an unordered map is used in an operation that may lead to a 
distributed deadlock
+     * during an explicit transaction.
+     * <p>
+     * This check is relevant only for explicit user-managed transactions. 
Implicit transactions
+     * (such as those started automatically by the system) are not inspected 
by this method.
+     * </p>
+     *
+     * @param m        The map being used in the cache operation.
+     */
+    protected void warnIfUnordered(Map<?, ?> m) {
+        if (m == null || m.size() <= 1)
+            return;
+
+        TcpClientTransaction tx = transactions.tx();
+
+        // Only explicit transactions are checked
+        if (tx == null)
+            return;
+
+        if (m instanceof SortedMap || m instanceof GridSerializableMap)

Review Comment:
   GridSerializableMap - not sure about this class, looks like required only on 
server side



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java:
##########
@@ -1031,71 +1033,73 @@ private <T> IgniteInternalFuture<Map<K, 
EntryProcessorResult<T>>> invokeAll0(
     }
 
     /**
-     * Entry point for all public API put/transform methods.
+     * Entry point for all public API put/transform methods. Except for 
invokeAll operations with {@code Set}.
      *
-     * @param map Put map. Either {@code map}, {@code invokeMap} or {@code 
conflictPutMap} should be passed.
-     * @param invokeMap Invoke map. Either {@code map}, {@code invokeMap} or 
{@code conflictPutMap} should be passed.
+     * @param keys Put keys.
+     * @param vals Put values. Either {@code vals}, {@code invokeVals} or 
{@code conflictPutVals} should be passed.
+     * @param invokeVals Invoke values. Either {@code vals}, {@code 
invokeVals} or {@code conflictPutVals} should be passed.
      * @param invokeArgs Optional arguments for EntryProcessor.
-     * @param conflictPutMap Conflict put map.
-     * @param conflictRmvMap Conflict remove map.
+     * @param conflictPutVals Conflict put values.
+     * @param conflictRmvVals Conflict remove values.
      * @param retval Return value required flag.
      * @param async Async operation flag.
      * @return Completion future.
      */
     @SuppressWarnings("ConstantConditions")
     private IgniteInternalFuture updateAll0(
-        @Nullable Map<? extends K, ? extends V> map,
-        @Nullable Map<? extends K, ? extends EntryProcessor> invokeMap,
+        @Nullable Collection<?> keys,
+        @Nullable Collection<? extends V> vals,
+        @Nullable Collection<? extends EntryProcessor> invokeVals,
         @Nullable Object[] invokeArgs,
-        @Nullable Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap,
-        @Nullable Map<KeyCacheObject, GridCacheVersion> conflictRmvMap,
+        @Nullable Collection<GridCacheDrInfo> conflictPutVals,
+        @Nullable Collection<GridCacheVersion> conflictRmvVals,
         final boolean retval,
         final GridCacheOperation op,
         boolean async
     ) {
         assert ctx.updatesAllowed();
 
+        assert keys != null : keys;
+
         ctx.checkSecurity(SecurityPermission.CACHE_PUT);
 
         final CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         if (opCtx != null && opCtx.hasDataCenterId()) {
-            assert conflictPutMap == null : conflictPutMap;
-            assert conflictRmvMap == null : conflictRmvMap;
+            assert conflictPutVals == null : conflictPutVals;
+            assert conflictRmvVals == null : conflictRmvVals;
 
             if (op == GridCacheOperation.TRANSFORM) {
-                assert invokeMap != null : invokeMap;
+                assert invokeVals != null : invokeVals;
 
-                conflictPutMap = F.viewReadOnly((Map)invokeMap,
-                    new IgniteClosure<EntryProcessor, GridCacheDrInfo>() {
-                        @Override public GridCacheDrInfo apply(EntryProcessor 
o) {
-                            return new GridCacheDrInfo(o, 
nextVersion(opCtx.dataCenterId()));
-                        }
-                    });
+                conflictPutVals = invokeVals
+                    .stream()
+                    .map(o -> new GridCacheDrInfo(o, 
nextVersion(opCtx.dataCenterId())))
+                    .collect(Collectors.toUnmodifiableList());

Review Comment:
   F.viewReadOnly(invokeVals, o -> new GridCacheDrInfo(o, 
nextVersion(opCtx.dataCenterId())));



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to