Fixed scalar examples
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9c46fa96 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9c46fa96 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9c46fa96 Branch: refs/heads/ignite-1537 Commit: 9c46fa961bb88524fcd6c8ccb85c28f8688758b8 Parents: 91f3f87 Author: Alexey Goncharuk <alexey.goncha...@gmail.com> Authored: Fri Nov 27 14:12:43 2015 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Fri Nov 27 14:12:43 2015 +0300 ---------------------------------------------------------------------- .../internal/portable/BinaryReaderExImpl.java | 2 +- .../ignite/internal/portable/PortableUtils.java | 6 +-- .../processors/cache/CacheInvokeResult.java | 14 ++++++ .../dht/atomic/GridDhtAtomicCache.java | 50 ++++++++++++++++++-- .../cache/GridCacheAbstractFullApiSelfTest.java | 2 +- 5 files changed, 66 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9c46fa96/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java index 872d7a3..3cc2fbe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java @@ -1616,7 +1616,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina break; case OPTM_MARSH: - obj = PortableUtils.doReadOptimized(in, ctx); + obj = PortableUtils.doReadOptimized(in, ctx, ldr); break; http://git-wip-us.apache.org/repos/asf/ignite/blob/9c46fa96/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java index e543c41..1a8f156 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java @@ -1498,13 +1498,13 @@ public class PortableUtils { * * @return Result. */ - public static Object doReadOptimized(PortableInputStream in, PortableContext ctx) { + public static Object doReadOptimized(PortableInputStream in, PortableContext ctx, @Nullable ClassLoader clsLdr) { int len = in.readInt(); ByteArrayInputStream input = new ByteArrayInputStream(in.array(), in.position(), len); try { - return ctx.optimizedMarsh().unmarshal(input, null); + return ctx.optimizedMarsh().unmarshal(input, clsLdr); } catch (IgniteCheckedException e) { throw new BinaryObjectException("Failed to unmarshal object with optimized marshaller", e); @@ -1706,7 +1706,7 @@ public class PortableUtils { return doReadClass(in, ctx, ldr); case OPTM_MARSH: - return doReadOptimized(in, ctx); + return doReadOptimized(in, ctx, ldr); default: throw new BinaryObjectException("Invalid flag value: " + flag); http://git-wip-us.apache.org/repos/asf/ignite/blob/9c46fa96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeResult.java index 8d6d905..48dabb9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeResult.java @@ -64,6 +64,20 @@ public class CacheInvokeResult<T> implements EntryProcessorResult<T>, Externaliz } /** + * @return Result. + */ + public T result() { + return res; + } + + /** + * Entry processor error; + */ + public Exception error() { + return err; + } + + /** * Static constructor. * * @param err Exception thrown by {@link EntryProcessor#process(MutableEntry, Object...)}. http://git-wip-us.apache.org/repos/asf/ignite/blob/9c46fa96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index cd76a56..028f477 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -724,7 +724,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws IgniteCheckedException { - EntryProcessorResult<T> res = invokeAsync(key, entryProcessor, args).get(); + IgniteInternalFuture<EntryProcessorResult<T>> invokeFut = invokeAsync(key, entryProcessor, args); + + EntryProcessorResult<T> res = invokeFut.get(); return res != null ? res : new CacheInvokeResult<T>(); } @@ -750,6 +752,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { Map<? extends K, EntryProcessor> invokeMap = Collections.singletonMap(key, (EntryProcessor)entryProcessor); + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); + IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null, invokeMap, args, @@ -768,7 +774,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (resMap != null) { assert resMap.isEmpty() || resMap.size() == 1 : resMap.size(); - return resMap.isEmpty() ? null : resMap.values().iterator().next(); + EntryProcessorResult<T> res = resMap.isEmpty() ? null : resMap.values().iterator().next(); + + if (res instanceof CacheInvokeResult) { + CacheInvokeResult invokeRes = (CacheInvokeResult)res; + + if (invokeRes.result() != null) + res = CacheInvokeResult.fromResult((T)ctx.unwrapPortableIfNeeded(invokeRes.result(), + keepBinary)); + } + + return res; } return null; @@ -792,7 +808,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - return updateAllAsync0(null, + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); + + IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> resFut = updateAllAsync0(null, invokeMap, args, null, @@ -801,6 +821,30 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, null, true); + + return resFut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, Map<K, EntryProcessorResult<T>>>() { + @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut) throws IgniteCheckedException { + Map<K, EntryProcessorResult<T>> resMap = fut.get(); + + if (resMap != null) { + return F.viewReadOnly(resMap, new C1<EntryProcessorResult<T>, EntryProcessorResult<T>>() { + @Override public EntryProcessorResult<T> apply(EntryProcessorResult<T> res) { + if (res instanceof CacheInvokeResult) { + CacheInvokeResult invokeRes = (CacheInvokeResult)res; + + if (invokeRes.result() != null) + res = CacheInvokeResult.fromResult((T)ctx.unwrapPortableIfNeeded(invokeRes.result(), + keepBinary)); + } + + return res; + } + }); + } + + return null; + } + }); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9c46fa96/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index c10f81a..084fe83 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -2408,7 +2408,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assert oldVal != null && F.eq(val1, oldVal); - assert cache.remove("key1", val1); + assert cache.remove("key1"); assert cache.get("key1") == null;