Hi,

Is it possible you change lambdas code between calls? Or may be classes are
differs on nodes?
Try to replace lambdas with static classes in your code. Will it work for
you?


On Tue, Jun 5, 2018 at 10:28 PM, Cong Guo <[email protected]> wrote:

> Hi,
>
>
>
> The stacktrace is as follows. Do I use the CacheEntryProcessor in the
> right way? May I have an example about how to use CacheEntryProcessor in
> StreamVisitor, please? Thank you!
>
>
>
> javax.cache.processor.EntryProcessorException:
> java.lang.ClassCastException: com.huawei.clusterexperiment.model.Person
> cannot be cast to org.apache.ignite.binary.BinaryObject
>
>         at org.apache.ignite.internal.processors.cache.
> CacheInvokeResult.get(CacheInvokeResult.java:102)
>
>         at org.apache.ignite.internal.processors.cache.
> IgniteCacheProxyImpl.invoke(IgniteCacheProxyImpl.java:1361)
>
>         at org.apache.ignite.internal.processors.cache.
> IgniteCacheProxyImpl.invoke(IgniteCacheProxyImpl.java:1405)
>
>         at org.apache.ignite.internal.processors.cache.
> GatewayProtectedCacheProxy.invoke(GatewayProtectedCacheProxy.java:1362)
>
>         at com.huawei.clusterexperiment.Client.lambda$streamUpdate$
> 531c8d2f$1(Client.java:337)
>
>         at org.apache.ignite.stream.StreamVisitor$1.apply(
> StreamVisitor.java:50)
>
>         at org.apache.ignite.stream.StreamVisitor$1.apply(
> StreamVisitor.java:48)
>
>         at org.apache.ignite.stream.StreamVisitor.receive(
> StreamVisitor.java:38)
>
>         at org.apache.ignite.internal.processors.datastreamer.
> DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:137)
>
>         at org.apache.ignite.internal.processors.datastreamer.
> DataStreamProcessor.localUpdate(DataStreamProcessor.java:397)
>
>         at org.apache.ignite.internal.processors.datastreamer.
> DataStreamProcessor.processRequest(DataStreamProcessor.java:302)
>
>         at org.apache.ignite.internal.processors.datastreamer.
> DataStreamProcessor.access$000(DataStreamProcessor.java:59)
>
>         at org.apache.ignite.internal.processors.datastreamer.
> DataStreamProcessor$1.onMessage(DataStreamProcessor.java:89)
>
>         at org.apache.ignite.internal.managers.communication.
> GridIoManager.invokeListener(GridIoManager.java:1555)
>
>         at org.apache.ignite.internal.managers.communication.
> GridIoManager.processRegularMessage0(GridIoManager.java:1183)
>
>         at org.apache.ignite.internal.managers.communication.
> GridIoManager.access$4200(GridIoManager.java:126)
>
>         at org.apache.ignite.internal.managers.communication.
> GridIoManager$9.run(GridIoManager.java:1090)
>
>         at org.apache.ignite.internal.util.StripedExecutor$Stripe.
> run(StripedExecutor.java:505)
>
>         at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.ClassCastException: 
> com.huawei.clusterexperiment.model.Person
> cannot be cast to org.apache.ignite.binary.BinaryObject
>
>         at com.huawei.clusterexperiment.Client$2.process(Client.java:340)
>
>         at org.apache.ignite.internal.processors.cache.
> EntryProcessorResourceInjectorProxy.process(EntryProcessorResourceInjector
> Proxy.java:68)
>
>         at org.apache.ignite.internal.processors.cache.distributed.
> dht.GridDhtTxPrepareFuture.onEntriesLocked(GridDhtTxPrepareFuture.java:
> 421)
>
>         at org.apache.ignite.internal.processors.cache.distributed.
> dht.GridDhtTxPrepareFuture.prepare0(GridDhtTxPrepareFuture.java:1231)
>
>         at org.apache.ignite.internal.processors.cache.distributed.
> dht.GridDhtTxPrepareFuture.mapIfLocked(GridDhtTxPrepareFuture.java:671)
>
>         at org.apache.ignite.internal.processors.cache.distributed.
> dht.GridDhtTxPrepareFuture.prepare(GridDhtTxPrepareFuture.java:1048)
>
>         at org.apache.ignite.internal.processors.cache.distributed.
> near.GridNearTxLocal.prepareAsyncLocal(GridNearTxLocal.java:3452)
>
>         at org.apache.ignite.internal.processors.cache.transactions.
> IgniteTxHandler.prepareColocatedTx(IgniteTxHandler.java:257)
>
>         at org.apache.ignite.internal.processors.cache.distributed.near.
> GridNearOptimisticTxPrepareFuture.proceedPrepare(
> GridNearOptimisticTxPrepareFuture.java:578)
>
>         at org.apache.ignite.internal.processors.cache.distributed.near.
> GridNearOptimisticTxPrepareFuture.prepareSingle(
> GridNearOptimisticTxPrepareFuture.java:405)
>
>         at org.apache.ignite.internal.processors.cache.distributed.near.
> GridNearOptimisticTxPrepareFuture.prepare0(GridNearOptimisticTxPrepareFut
> ure.java:348)
>
>         at org.apache.ignite.internal.processors.cache.distributed.near.
> GridNearOptimisticTxPrepareFutureAdapter.prepareOnTopology(
> GridNearOptimisticTxPrepareFutureAdapter.java:137)
>
>         at org.apache.ignite.internal.processors.cache.distributed.near.
> GridNearOptimisticTxPrepareFutureAdapter.prepare(
> GridNearOptimisticTxPrepareFutureAdapter.java:74)
>
>         at org.apache.ignite.internal.processors.cache.distributed.
> near.GridNearTxLocal.prepareNearTxLocal(GridNearTxLocal.java:3161)
>
>         at org.apache.ignite.internal.processors.cache.distributed.
> near.GridNearTxLocal.commitNearTxLocalAsync(GridNearTxLocal.java:3221)
>
>         at org.apache.ignite.internal.processors.cache.distributed.
> near.GridNearTxLocal.optimisticPutFuture(GridNearTxLocal.java:2391)
>
>         at org.apache.ignite.internal.processors.cache.distributed.
> near.GridNearTxLocal.putAsync0(GridNearTxLocal.java:622)
>
>         at org.apache.ignite.internal.processors.cache.distributed.
> near.GridNearTxLocal.invokeAsync(GridNearTxLocal.java:407)
>
>         at org.apache.ignite.internal.processors.cache.
> GridCacheAdapter$25.op(GridCacheAdapter.java:2486)
>
>         at org.apache.ignite.internal.processors.cache.
> GridCacheAdapter$25.op(GridCacheAdapter.java:2478)
>
>         at org.apache.ignite.internal.processors.cache.
> GridCacheAdapter.syncOp(GridCacheAdapter.java:4088)
>
>         at org.apache.ignite.internal.processors.cache.
> GridCacheAdapter.invoke0(GridCacheAdapter.java:2478)
>
>         at org.apache.ignite.internal.processors.cache.
> GridCacheAdapter.invoke(GridCacheAdapter.java:2456)
>
>         at org.apache.ignite.internal.processors.cache.
> GridCacheProxyImpl.invoke(GridCacheProxyImpl.java:588)
>
>         at org.apache.ignite.internal.processors.cache.
> IgniteCacheProxyImpl.invoke(IgniteCacheProxyImpl.java:1359)
>
>         ... 17 more
>
>
>
>
>
>
>
>
>
> *From:* Alexey Goncharuk [mailto:[email protected]]
> *Sent:* 2018年6月5日 12:32
> *To:* user <[email protected]>
> *Subject:* Re: ClassCastException When Using CacheEntryProcessor in
> StreamVisitor
>
>
>
> Hello,
>
>
>
> Can you please share the full stacktrace so we can see where the original
> ClassCastException is initiated? If it is not printed on a client, it
> should be printed on one of the server nodes.
>
>
>
> Thanks!
>
>
>
> вт, 5 июн. 2018 г. в 18:35, Cong Guo <[email protected]>:
>
> Hello,
>
>
>
> Can anyone see this email?
>
>
>
> *From:* Cong Guo
> *Sent:* 2018年6月1日 13:11
> *To:* '[email protected]' <[email protected]>
> *Subject:* ClassCastException When Using CacheEntryProcessor in
> StreamVisitor
>
>
>
> Hi,
>
>
>
> I want to use IgniteDataStreamer to handle data updates. Is it possible to
> use CacheEntryProcessor in StreamVisitor? I write a simple program as
> follows. It works on a single node, but gets a ClassCastException on two
> nodes. The two nodes are on two physical machines. I have set
>  peerClassLoadingEnabled to true on both the nodes. How do I use
> CacheEntryProcessor in StreamVisitor?
>
>
>
> The function is like:
>
>
>
> private static void streamUpdate(Ignite ignite, IgniteCache<Long, Person>
> personCache) {
>
>                                 CacheConfiguration<Long, Double>
> updateCfg = new CacheConfiguration<>("updateCache");
>
>                                 try(IgniteCache<Long, Double> updateCache
> = ignite.getOrCreateCache(updateCfg)) {
>
>                                                 try
> (IgniteDataStreamer<Long, Double> updateStmr = 
> ignite.dataStreamer(updateCache.getName()))
> {
>
>
>
>
> updateStmr.receiver(StreamVisitor.from((cache,e) -> {
>
>
> Long id = e.getKey();
>
>
> Double newVal = e.getValue();
>
>
> personCache.<Long, BinaryObject>withKeepBinary().invoke(id,
>
>
>                                     new CacheEntryProcessor<Long,
> BinaryObject, Object>() {
>
>
>                                                     public Object
> process(MutableEntry<Long, BinaryObject> entry, Object...objects) throws
> EntryProcessorException {
>
>
>
> BinaryObjectBuilder bldr = entry.getValue().toBuilder();
>
>
>
> double salary = bldr.getField("salary");
>
>
>
> bldr.setField("salary", salary+newVal);
>
>
>
> entry.setValue(bldr.build());
>
>
>
> return null;
>
>
>                                                     }
>
>
>                                     });
>
>                                                                 }));
>
>
>
>                                                                 Random
> generator = new Random();
>
>                                                                 for(long
> i=1;i<=EXP_SIZE;i++) {
>
>
> long rankey = 1+generator.nextInt(EXP_SIZE);
>
>
> updateStmr.addData(rankey, 10.0);
>
>                                                                 }
>
>                                                 }//end second try
>
>                                 }//end first try
>
> }
>
>
>
> Here the Person class is from the ignite example. There is no exception on
> a single node.
>
> The exception is like:
>
>
>
> javax.cache.processor.EntryProcessorException:
> java.lang.ClassCastException: com.huawei.clusterexperiment.model.Person
> cannot be cast to org.apache.ignite.binary.BinaryObject
>
>         at org.apache.ignite.internal.processors.cache.
> CacheInvokeResult.get(CacheInvokeResult.java:102)
>
>         at org.apache.ignite.internal.processors.cache.
> IgniteCacheProxyImpl.invoke(IgniteCacheProxyImpl.java:1361)
>
>         at org.apache.ignite.internal.processors.cache.
> IgniteCacheProxyImpl.invoke(IgniteCacheProxyImpl.java:1405)
>
>         at org.apache.ignite.internal.processors.cache.
> GatewayProtectedCacheProxy.invoke(GatewayProtectedCacheProxy.java:1362)
>
>         at com.huawei.clusterexperiment.Client.lambda$streamUpdate$
> a02be2b7$1(Client.java:310)
>
>         at org.apache.ignite.stream.StreamVisitor$1.apply(
> StreamVisitor.java:50)
>
>         at org.apache.ignite.stream.StreamVisitor$1.apply(
> StreamVisitor.java:48)
>
>         at org.apache.ignite.stream.StreamVisitor.receive(
> StreamVisitor.java:38)
>
>         at org.apache.ignite.internal.processors.datastreamer.
> DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:137)
>
>         at org.apache.ignite.internal.processors.datastreamer.
> DataStreamProcessor.localUpdate(DataStreamProcessor.java:397)
>
>         at org.apache.ignite.internal.processors.datastreamer.
> DataStreamProcessor.processRequest(DataStreamProcessor.java:302)
>
>         at org.apache.ignite.internal.processors.datastreamer.
> DataStreamProcessor.access$000(DataStreamProcessor.java:59)
>
>         at org.apache.ignite.internal.processors.datastreamer.
> DataStreamProcessor$1.onMessage(DataStreamProcessor.java:89)
>
>         at org.apache.ignite.internal.managers.communication.
> GridIoManager.invokeListener(GridIoManager.java:1555)
>
>         at org.apache.ignite.internal.managers.communication.
> GridIoManager.processRegularMessage0(GridIoManager.java:1183)
>
>         at org.apache.ignite.internal.managers.communication.
> GridIoManager.access$4200(GridIoManager.java:126)
>
>         at org.apache.ignite.internal.managers.communication.
> GridIoManager$9.run(GridIoManager.java:1090)
>
>         at org.apache.ignite.internal.util.StripedExecutor$Stripe.
> run(StripedExecutor.java:505)
>
>         at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
>


-- 
Best regards,
Andrey V. Mashenkov

Reply via email to