Hello, can anyone see this email?

From: Cong Guo
Sent: June 1, 2018 13:11
To: '[email protected]' <[email protected]>
Subject: ClassCastException When Using CacheEntryProcessor in StreamVisitor

Hi,

I want to use IgniteDataStreamer to handle data updates. Is it possible to use CacheEntryProcessor in StreamVisitor?

I wrote a simple program, shown below. It works on a single node but throws a ClassCastException on two nodes. The two nodes run on two physical machines, and I have set peerClassLoadingEnabled to true on both nodes. How do I use CacheEntryProcessor in StreamVisitor?

The function looks like this:

    private static void streamUpdate(Ignite ignite, IgniteCache<Long, Person> personCache) {
        CacheConfiguration<Long, Double> updateCfg = new CacheConfiguration<>("updateCache");

        try (IgniteCache<Long, Double> updateCache = ignite.getOrCreateCache(updateCfg)) {
            try (IgniteDataStreamer<Long, Double> updateStmr = ignite.dataStreamer(updateCache.getName())) {
                updateStmr.receiver(StreamVisitor.from((cache, e) -> {
                    Long id = e.getKey();
                    Double newVal = e.getValue();

                    personCache.<Long, BinaryObject>withKeepBinary().invoke(id,
                        new CacheEntryProcessor<Long, BinaryObject, Object>() {
                            public Object process(MutableEntry<Long, BinaryObject> entry, Object... objects)
                                throws EntryProcessorException {
                                BinaryObjectBuilder bldr = entry.getValue().toBuilder();
                                double salary = bldr.getField("salary");
                                bldr.setField("salary", salary + newVal);
                                entry.setValue(bldr.build());
                                return null;
                            }
                        });
                }));

                Random generator = new Random();
                for (long i = 1; i <= EXP_SIZE; i++) {
                    long rankey = 1 + generator.nextInt(EXP_SIZE);
                    updateStmr.addData(rankey, 10.0);
                }
            } // end second try
        } // end first try
    }

Here the Person class is from the Ignite example. There is no exception on a single node.

The exception looks like this:

    javax.cache.processor.EntryProcessorException: java.lang.ClassCastException: com.huawei.clusterexperiment.model.Person cannot be cast to org.apache.ignite.binary.BinaryObject
        at org.apache.ignite.internal.processors.cache.CacheInvokeResult.get(CacheInvokeResult.java:102)
        at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.invoke(IgniteCacheProxyImpl.java:1361)
        at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.invoke(IgniteCacheProxyImpl.java:1405)
        at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.invoke(GatewayProtectedCacheProxy.java:1362)
        at com.huawei.clusterexperiment.Client.lambda$streamUpdate$a02be2b7$1(Client.java:310)
        at org.apache.ignite.stream.StreamVisitor$1.apply(StreamVisitor.java:50)
        at org.apache.ignite.stream.StreamVisitor$1.apply(StreamVisitor.java:48)
        at org.apache.ignite.stream.StreamVisitor.receive(StreamVisitor.java:38)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:137)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.localUpdate(DataStreamProcessor.java:397)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:302)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:59)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:89)
        at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1555)
        at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1183)
        at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:126)
        at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1090)
        at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:505)
        at java.lang.Thread.run(Thread.java:745)
