alex-plekhanov commented on code in PR #12128: URL: https://github.com/apache/ignite/pull/12128#discussion_r2157348539
########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java: ########## @@ -1031,71 +1033,73 @@ private <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAll0( } /** - * Entry point for all public API put/transform methods. + * Entry point for all public API put/transform methods. Except for invokeAll operations with {@code Set}. Review Comment: Remove `Except for invokeAll operations with {@code Set}.` ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java: ########## @@ -1031,71 +1033,73 @@ private <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAll0( } /** - * Entry point for all public API put/transform methods. + * Entry point for all public API put/transform methods. Except for invokeAll operations with {@code Set}. * - * @param map Put map. Either {@code map}, {@code invokeMap} or {@code conflictPutMap} should be passed. - * @param invokeMap Invoke map. Either {@code map}, {@code invokeMap} or {@code conflictPutMap} should be passed. + * @param keys Put keys. Review Comment: Not only "Put", just "Keys" ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java: ########## @@ -1031,71 +1033,73 @@ private <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAll0( } /** - * Entry point for all public API put/transform methods. + * Entry point for all public API put/transform methods. Except for invokeAll operations with {@code Set}. * - * @param map Put map. Either {@code map}, {@code invokeMap} or {@code conflictPutMap} should be passed. - * @param invokeMap Invoke map. Either {@code map}, {@code invokeMap} or {@code conflictPutMap} should be passed. + * @param keys Put keys. + * @param vals Put values. Either {@code vals}, {@code invokeVals} or {@code conflictPutVals} should be passed. + * @param invokeVals Invoke values. Either {@code vals}, {@code invokeVals} or {@code conflictPutVals} should be passed. * @param invokeArgs Optional arguments for EntryProcessor. - * @param conflictPutMap Conflict put map. - * @param conflictRmvMap Conflict remove map. + * @param conflictPutVals Conflict put values. + * @param conflictRmvVals Conflict remove values. * @param retval Return value required flag. * @param async Async operation flag. * @return Completion future. */ @SuppressWarnings("ConstantConditions") private IgniteInternalFuture updateAll0( - @Nullable Map<? extends K, ? extends V> map, - @Nullable Map<? extends K, ? extends EntryProcessor> invokeMap, + @Nullable Collection<?> keys, + @Nullable Collection<? extends V> vals, + @Nullable Collection<? extends EntryProcessor> invokeVals, @Nullable Object[] invokeArgs, - @Nullable Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap, - @Nullable Map<KeyCacheObject, GridCacheVersion> conflictRmvMap, + @Nullable Collection<GridCacheDrInfo> conflictPutVals, + @Nullable Collection<GridCacheVersion> conflictRmvVals, final boolean retval, final GridCacheOperation op, boolean async ) { assert ctx.updatesAllowed(); + assert keys != null : keys; + ctx.checkSecurity(SecurityPermission.CACHE_PUT); final CacheOperationContext opCtx = ctx.operationContextPerCall(); if (opCtx != null && opCtx.hasDataCenterId()) { - assert conflictPutMap == null : conflictPutMap; - assert conflictRmvMap == null : conflictRmvMap; + assert conflictPutVals == null : conflictPutVals; + assert conflictRmvVals == null : conflictRmvVals; if (op == GridCacheOperation.TRANSFORM) { - assert invokeMap != null : invokeMap; + assert invokeVals != null : invokeVals; - conflictPutMap = F.viewReadOnly((Map)invokeMap, - new IgniteClosure<EntryProcessor, GridCacheDrInfo>() { - @Override public GridCacheDrInfo apply(EntryProcessor o) { - return new GridCacheDrInfo(o, nextVersion(opCtx.dataCenterId())); - } - }); + conflictPutVals = invokeVals + .stream() + .map(o -> new GridCacheDrInfo(o, nextVersion(opCtx.dataCenterId()))) + .collect(Collectors.toUnmodifiableList()); - invokeMap = null; + invokeVals = null; } else if (op == GridCacheOperation.DELETE) { - assert map != null : map; + assert keys != null : keys; + assert vals != null : vals; - conflictRmvMap = F.viewReadOnly((Map)map, new IgniteClosure<V, GridCacheVersion>() { - @Override public GridCacheVersion apply(V o) { - return nextVersion(opCtx.dataCenterId()); - } - }); + conflictRmvVals = vals + .stream() + .map(o -> nextVersion(opCtx.dataCenterId())) + .collect(Collectors.toUnmodifiableList()); - map = null; + vals = null; } else { - assert map != null : map; + assert keys != null : keys; + assert vals != null : vals; - conflictPutMap = F.viewReadOnly((Map)map, new IgniteClosure<V, GridCacheDrInfo>() { - @Override public GridCacheDrInfo apply(V o) { - return new GridCacheDrInfo(ctx.toCacheObject(o), nextVersion(opCtx.dataCenterId())); - } - }); + conflictPutVals = vals + .stream() + .map(o -> new GridCacheDrInfo(ctx.toCacheObject(o), nextVersion(opCtx.dataCenterId()))) + .collect(Collectors.toUnmodifiableList()); Review Comment: F.viewReadOnly(vals, o -> new GridCacheDrInfo(ctx.toCacheObject(o), nextVersion(opCtx.dataCenterId()))); ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java: ########## @@ -725,23 +737,22 @@ private <K, V> IgniteInternalFuture putAllAsync0( if (opCtx != null && opCtx.hasDataCenterId()) { assert drMap == null : drMap; - assert map != null || invokeMap != null; + assert keySet != null || invokeVals != null; dataCenterId = opCtx.dataCenterId(); } else dataCenterId = null; - final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap; - if (log.isDebugEnabled()) - log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map + ", retval=" + retval + "]"); + log.debug("Called putAllAsync(...) [tx=" + this + + ", map=[" + keySet + ", " + vals + "], retval=" + retval + "]"); - assert map != null || invokeMap0 != null; + assert keySet != null || invokeVals != null; final GridCacheReturn ret = new GridCacheReturn(localResult(), false); - if (F.isEmpty(map) && F.isEmpty(invokeMap0)) { + if (F.isEmpty(keySet) && vals == null && invokeVals != null) { Review Comment: (F.isEmpty(vals) && F.isEmpty(invokeVals)) ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java: ########## @@ -725,23 +737,22 @@ private <K, V> IgniteInternalFuture putAllAsync0( if (opCtx != null && opCtx.hasDataCenterId()) { assert drMap == null : drMap; - assert map != null || invokeMap != null; + assert keySet != null || invokeVals != null; Review Comment: Looks redundant, checked lower ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java: ########## @@ -1031,71 +1033,73 @@ private <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAll0( } /** - * Entry point for all public API put/transform methods. + * Entry point for all public API put/transform methods. Except for invokeAll operations with {@code Set}. * - * @param map Put map. Either {@code map}, {@code invokeMap} or {@code conflictPutMap} should be passed. - * @param invokeMap Invoke map. Either {@code map}, {@code invokeMap} or {@code conflictPutMap} should be passed. + * @param keys Put keys. + * @param vals Put values. Either {@code vals}, {@code invokeVals} or {@code conflictPutVals} should be passed. + * @param invokeVals Invoke values. Either {@code vals}, {@code invokeVals} or {@code conflictPutVals} should be passed. * @param invokeArgs Optional arguments for EntryProcessor. - * @param conflictPutMap Conflict put map. - * @param conflictRmvMap Conflict remove map. + * @param conflictPutVals Conflict put values. + * @param conflictRmvVals Conflict remove values. * @param retval Return value required flag. * @param async Async operation flag. * @return Completion future. */ @SuppressWarnings("ConstantConditions") private IgniteInternalFuture updateAll0( - @Nullable Map<? extends K, ? extends V> map, - @Nullable Map<? extends K, ? extends EntryProcessor> invokeMap, + @Nullable Collection<?> keys, + @Nullable Collection<? extends V> vals, + @Nullable Collection<? extends EntryProcessor> invokeVals, @Nullable Object[] invokeArgs, - @Nullable Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap, - @Nullable Map<KeyCacheObject, GridCacheVersion> conflictRmvMap, + @Nullable Collection<GridCacheDrInfo> conflictPutVals, + @Nullable Collection<GridCacheVersion> conflictRmvVals, final boolean retval, final GridCacheOperation op, boolean async ) { assert ctx.updatesAllowed(); + assert keys != null : keys; + ctx.checkSecurity(SecurityPermission.CACHE_PUT); final CacheOperationContext opCtx = ctx.operationContextPerCall(); if (opCtx != null && opCtx.hasDataCenterId()) { - assert conflictPutMap == null : conflictPutMap; - assert conflictRmvMap == null : conflictRmvMap; + assert conflictPutVals == null : conflictPutVals; + assert conflictRmvVals == null : conflictRmvVals; if (op == GridCacheOperation.TRANSFORM) { - assert invokeMap != null : invokeMap; + assert invokeVals != null : invokeVals; - conflictPutMap = F.viewReadOnly((Map)invokeMap, - new IgniteClosure<EntryProcessor, GridCacheDrInfo>() { - @Override public GridCacheDrInfo apply(EntryProcessor o) { - return new GridCacheDrInfo(o, nextVersion(opCtx.dataCenterId())); - } - }); + conflictPutVals = invokeVals + .stream() + .map(o -> new GridCacheDrInfo(o, nextVersion(opCtx.dataCenterId()))) + .collect(Collectors.toUnmodifiableList()); - invokeMap = null; + invokeVals = null; } else if (op == GridCacheOperation.DELETE) { - assert map != null : map; + assert keys != null : keys; + assert vals != null : vals; - conflictRmvMap = F.viewReadOnly((Map)map, new IgniteClosure<V, GridCacheVersion>() { - @Override public GridCacheVersion apply(V o) { - return nextVersion(opCtx.dataCenterId()); - } - }); + conflictRmvVals = vals + .stream() + .map(o -> nextVersion(opCtx.dataCenterId())) + .collect(Collectors.toUnmodifiableList()); Review Comment: F.viewReadOnly(vals, o -> nextVersion(opCtx.dataCenterId())) ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java: ########## @@ -505,11 +513,13 @@ public IgniteInternalFuture<?> putAllDrAsync( GridCacheContext cacheCtx, Map<KeyCacheObject, GridCacheDrInfo> drMap ) { - Map<KeyCacheObject, Object> map = F.viewReadOnly(drMap, (IgniteClosure<GridCacheDrInfo, Object>)GridCacheDrInfo::value); + Set<?> keySet = drMap.keySet(); + Collection<?> vals = drMap.values().stream().map(GridCacheDrInfo::value).collect(Collectors.toUnmodifiableList()); Review Comment: F.viewReadOnly(drMap.values(), GridCacheDrInfo::value) ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java: ########## @@ -725,23 +737,22 @@ private <K, V> IgniteInternalFuture putAllAsync0( if (opCtx != null && opCtx.hasDataCenterId()) { assert drMap == null : drMap; - assert map != null || invokeMap != null; + assert keySet != null || invokeVals != null; dataCenterId = opCtx.dataCenterId(); } else dataCenterId = null; - final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap; - if (log.isDebugEnabled()) - log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map + ", retval=" + retval + "]"); + log.debug("Called putAllAsync(...) [tx=" + this + + ", map=[" + keySet + ", " + vals + "], retval=" + retval + "]"); - assert map != null || invokeMap0 != null; + assert keySet != null || invokeVals != null; Review Comment: vals != null || invokeVals != null ########## modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java: ########## @@ -1616,4 +1650,77 @@ private void checkDataReplicationSupported(ProtocolContext protocolCtx) if (!protocolCtx.isFeatureSupported(ProtocolBitmaskFeature.DATA_REPLICATION_OPERATIONS)) throw new ClientFeatureNotSupportedByServerException(ProtocolBitmaskFeature.DATA_REPLICATION_OPERATIONS); } + + /** + * Warns if an unordered map is used in an operation that may lead to a distributed deadlock + * during an explicit transaction. + * <p> + * This check is relevant only for explicit user-managed transactions. Implicit transactions + * (such as those started automatically by the system) are not inspected by this method. + * </p> + * + * @param m The map being used in the cache operation. + */ + protected void warnIfUnordered(Map<?, ?> m) { + if (m == null || m.size() <= 1) + return; + + TcpClientTransaction tx = transactions.tx(); + + // Only explicit transactions are checked + if (tx == null) + return; + + if (m instanceof SortedMap || m instanceof GridSerializableMap) Review Comment: GridSerializableMap - not sure about this class, looks like required only on server side ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java: ########## @@ -1031,71 +1033,73 @@ private <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAll0( } /** - * Entry point for all public API put/transform methods. + * Entry point for all public API put/transform methods. Except for invokeAll operations with {@code Set}. * - * @param map Put map. Either {@code map}, {@code invokeMap} or {@code conflictPutMap} should be passed. - * @param invokeMap Invoke map. Either {@code map}, {@code invokeMap} or {@code conflictPutMap} should be passed. + * @param keys Put keys. + * @param vals Put values. Either {@code vals}, {@code invokeVals} or {@code conflictPutVals} should be passed. + * @param invokeVals Invoke values. Either {@code vals}, {@code invokeVals} or {@code conflictPutVals} should be passed. * @param invokeArgs Optional arguments for EntryProcessor. - * @param conflictPutMap Conflict put map. - * @param conflictRmvMap Conflict remove map. + * @param conflictPutVals Conflict put values. + * @param conflictRmvVals Conflict remove values. * @param retval Return value required flag. * @param async Async operation flag. * @return Completion future. */ @SuppressWarnings("ConstantConditions") private IgniteInternalFuture updateAll0( - @Nullable Map<? extends K, ? extends V> map, - @Nullable Map<? extends K, ? extends EntryProcessor> invokeMap, + @Nullable Collection<?> keys, + @Nullable Collection<? extends V> vals, + @Nullable Collection<? extends EntryProcessor> invokeVals, @Nullable Object[] invokeArgs, - @Nullable Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap, - @Nullable Map<KeyCacheObject, GridCacheVersion> conflictRmvMap, + @Nullable Collection<GridCacheDrInfo> conflictPutVals, + @Nullable Collection<GridCacheVersion> conflictRmvVals, final boolean retval, final GridCacheOperation op, boolean async ) { assert ctx.updatesAllowed(); + assert keys != null : keys; + ctx.checkSecurity(SecurityPermission.CACHE_PUT); final CacheOperationContext opCtx = ctx.operationContextPerCall(); if (opCtx != null && opCtx.hasDataCenterId()) { - assert conflictPutMap == null : conflictPutMap; - assert conflictRmvMap == null : conflictRmvMap; + assert conflictPutVals == null : conflictPutVals; + assert conflictRmvVals == null : conflictRmvVals; if (op == GridCacheOperation.TRANSFORM) { - assert invokeMap != null : invokeMap; + assert invokeVals != null : invokeVals; - conflictPutMap = F.viewReadOnly((Map)invokeMap, - new IgniteClosure<EntryProcessor, GridCacheDrInfo>() { - @Override public GridCacheDrInfo apply(EntryProcessor o) { - return new GridCacheDrInfo(o, nextVersion(opCtx.dataCenterId())); - } - }); + conflictPutVals = invokeVals + .stream() + .map(o -> new GridCacheDrInfo(o, nextVersion(opCtx.dataCenterId()))) + .collect(Collectors.toUnmodifiableList()); Review Comment: F.viewReadOnly(invokeVals, o -> new GridCacheDrInfo(o, nextVersion(opCtx.dataCenterId()))); -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org