Hello! Can you please provide a runnable reproducer project? That would save me the trouble of pasting all these snippets together and writing the boilerplate.
Regards, -- Ilya Kasnacheev чт, 4 мар. 2021 г. в 18:21, <[email protected]>: > Ilya, unfortunately, I am unable to reproduce this issue in a pet project. > > I have faced this issue on Ignite 2.9.1 again when I brought one > of two nodes of a cluster down: > > org.apache.ignite.IgniteCheckedException: > com.devexperts.tos.riskmonitor.domain.RmAccount cannot be cast to > com.devexperts.tos.riskmonitor.cluster.cache.cq.CacheKeyWithEventType > at > org.apache.ignite.internal.util.IgniteUtils.cast(IgniteUtils.java:7563) > [ignite-core-2.9.1.jar:2.9.1] > at > org.apache.ignite.internal.util.future.GridFutureAdapter.resolve(GridFutureAdapter.java:260) > ~[ignite-core-2.9.1.jar:2.9.1] > at > org.apache.ignite.internal.util.future.GridFutureAdapter.get0(GridFutureAdapter.java:209) > ~[ignite-core-2.9.1.jar:2.9.1] > at > org.apache.ignite.internal.util.future.GridFutureAdapter.get(GridFutureAdapter.java:160) > ~[ignite-core-2.9.1.jar:2.9.1] > at > org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body0(GridCachePartitionExchangeManager.java:3342) > [ignite-core-2.9.1.jar:2.9.1] > at > org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body(GridCachePartitionExchangeManager.java:3163) > [ignite-core-2.9.1.jar:2.9.1] > at > org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:120) > [ignite-core-2.9.1.jar:2.9.1] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202] > Caused by: java.lang.ClassCastException: > com.devexperts.tos.riskmonitor.domain.RmAccount cannot be cast to > com.devexperts.tos.riskmonitor.cluster.cache.cq.CacheKeyWithEventType > at > com.devexperts.tos.riskmonitor.cluster.cache.distributiontracker.IgniteCacheKeysDistributionTracker$LocalCacheListener.onUpdated(IgniteCacheKeysDistributionTracker.java:365) > ~[main/:?] 
> at > org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.notifyLocalListener(CacheContinuousQueryHandler.java:1128) > ~[ignite-core-2.9.1.jar:2.9.1] > at > org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.notifyCallback0(CacheContinuousQueryHandler.java:954) > ~[ignite-core-2.9.1.jar:2.9.1] > at > org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.notifyCallback(CacheContinuousQueryHandler.java:895) > ~[ignite-core-2.9.1.jar:2.9.1] > at > org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.addBackupNotification(GridContinuousProcessor.java:1162) > ~[ignite-core-2.9.1.jar:2.9.1] > at > org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$2.flushBackupQueue(CacheContinuousQueryHandler.java:512) > ~[ignite-core-2.9.1.jar:2.9.1] > at > org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.flushBackupQueue(CacheContinuousQueryManager.java:691) > ~[ignite-core-2.9.1.jar:2.9.1] > at > org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onDone(GridDhtPartitionsExchangeFuture.java:2394) > ~[ignite-core-2.9.1.jar:2.9.1] > at > org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.finishExchangeOnCoordinator(GridDhtPartitionsExchangeFuture.java:3972) > ~[ignite-core-2.9.1.jar:2.9.1] > at > org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onAllReceived(GridDhtPartitionsExchangeFuture.java:3687) > ~[ignite-core-2.9.1.jar:2.9.1] > at > org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.distributedExchange(GridDhtPartitionsExchangeFuture.java:1729) > ~[ignite-core-2.9.1.jar:2.9.1] > at > 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.init(GridDhtPartitionsExchangeFuture.java:943) > ~[ignite-core-2.9.1.jar:2.9.1] > at > org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body0(GridCachePartitionExchangeManager.java:3314) > ~[ignite-core-2.9.1.jar:2.9.1] > ... 3 more > > Cache: > > > new CacheConfiguration<K, V>() > .setSqlSchema("PUBLIC") > .setCacheMode(CacheMode.PARTITIONED) > // This means that we won't be able to use ACID compliant transactions > .setAtomicityMode(CacheAtomicityMode.ATOMIC) > // Wait only primary nodes to finish write operations, do not wait > back up nodes > > .setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC) > // The default backup factor > .setBackups(1) > .setAffinity(new RendezvousAffinityFunction(true)); > > .setName(Cache.RM_ACCOUNT.getName()) > .setKeyConfiguration(new CacheKeyConfiguration(CacheAccountKey.class)); > > > > Query initialization: > > continuousQuery = new ContinuousQueryWithTransformer<K, Object, > CacheKeyWithEventType<K>>(); > continuousQuery.setInitialQuery(new ScanQuery<>()); > continuousQuery.setAutoUnsubscribe(true); > continuousQuery.setLocalListener(new LocalCacheListener()); > continuousQuery.setRemoteFilterFactory(FactoryBuilder.factoryOf(new > CacheKeysDistributionTrackerRmtFilter<>())); > continuousQuery.setRemoteTransformerFactory(FactoryBuilder.factoryOf( > new CacheKeysDistributionTrackerRmtTransformer<>())); > > Remote transformer: > > > public class CacheKeysDistributionTrackerRmtTransformer<K> > implements IgniteClosure<CacheEntryEvent<? extends K, ?>, > CacheKeyWithEventType<K>> > { > private static final long serialVersionUID = 0; > > @Override > public CacheKeyWithEventType<K> apply(CacheEntryEvent<? 
extends K, ?> > event) { > return new CacheKeyWithEventType<K>(event.getKey(), > event.getEventType()); > } > } > > Local listener: > > > private class LocalCacheListener > implements > ContinuousQueryWithTransformer.EventListener<CacheKeyWithEventType<K>> > { > @Override > public void onUpdated(Iterable<? extends CacheKeyWithEventType<K>> > events) { > for (CacheKeyWithEventType<K> event : events) { > if (event.getEventType() == EventType.CREATED) { > executorService.execute(() -> > onCacheKeyCreated(event.getKey())); > } else if (event.getEventType() == EventType.REMOVED) { > executorService.execute(() -> > onCacheKeyRemoved(event.getKey())); > } > } > } > } > > > CacheKeyWithEventType: > > > public class CacheKeyWithEventType<K> implements Externalizable { > private static final long serialVersionUID = -1446749299783090657L; > > private K key; > private EventType eventType; > > public CacheKeyWithEventType(K key, EventType eventType) { > this.key = key; > this.eventType = eventType; > } > > public CacheKeyWithEventType() { > } > > public K getKey() { > return key; > } > > public EventType getEventType() { > return eventType; > } > > @Override > public void writeExternal(ObjectOutput out) throws IOException { > out.writeObject(key); > out.writeObject(eventType); > } > > @Override > public void readExternal(ObjectInput in) throws IOException, > ClassNotFoundException { > key = (K) in.readObject(); > eventType = (EventType) in.readObject(); > } > } > > Best regards, > Ivan Fedorenkov > > From: Ilya Kasnacheev <[email protected]> > Reply-To: "[email protected]" <[email protected]> > Date: Thursday, October 8, 2020 at 2:24 PM > To: "[email protected]" <[email protected]> > Subject: Re: Ignite 2.8.1 ContinuousQueryWithTransformer: > ClassCastException on PME > > Hello! > > You can do either one, I'll take it from there. > > Regards, > -- > Ilya Kasnacheev > > > вт, 6 окт. 2020 г. 
в 17:26, <[email protected]<mailto: > [email protected]>>: > Could you please guide me through the process? Should I create just a > simple project anywhere and share it here, or should I create a test case in > the Ignite project? > > From: Ilya Kasnacheev <[email protected]<mailto: > [email protected]>> > Reply-To: "[email protected]<mailto:[email protected]>" < > [email protected]<mailto:[email protected]>> > Date: Tuesday, October 6, 2020 at 3:44 PM > To: "[email protected]<mailto:[email protected]>" < > [email protected]<mailto:[email protected]>> > Subject: Re: Ignite 2.8.1 ContinuousQueryWithTransformer: > ClassCastException on PME > > Hello! > > Do you have a reproducer for this issue? I think I could validate it and > create an issue if you do. > > Can you please also check if it works or not on e.g. Apache Ignite 2.9RC2? > > Regards, > -- > Ilya Kasnacheev > > > вт, 6 окт. 2020 г. в 14:30, <[email protected]<mailto: > [email protected]><mailto:[email protected] > <mailto:[email protected]>>>: > Hi everyone! > > I am getting the ClassCastException when a node from my cluster fails > over. It looks like the root cause is that nodes are loading some keys from > their backups and the CacheContinuousQueryHandler is assuming that the > entries are already converted by the remote side, which is a false > expectation. > > The stacktrace is: > > Caused by: java.lang.ClassCastException: > org.apache.ignite.internal.binary.BinaryObjectImpl cannot be cast to > java.lang.String > > ... 
> > at > org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.notifyLocalListener(CacheContinuousQueryHandler.java:1114) > > at > org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.notifyCallback0(CacheContinuousQueryHandler.java:940) > > at > org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.notifyCallback(CacheContinuousQueryHandler.java:881) > > at > org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.addBackupNotification(GridContinuousProcessor.java:1161) > > at > org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$2.flushBackupQueue(CacheContinuousQueryHandler.java:498) > > at > org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.flushBackupQueue(CacheContinuousQueryManager.java:687) > > at > org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onDone(GridDhtPartitionsExchangeFuture.java:2261) > > at > org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.processFullMessage(GridDhtPartitionsExchangeFuture.java:4375) > > at > org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.access$1500(GridDhtPartitionsExchangeFuture.java:148) > > at > org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$4.apply(GridDhtPartitionsExchangeFuture.java:4054) > > at > org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$4.apply(GridDhtPartitionsExchangeFuture.java:4042) > > at > org.apache.ignite.internal.util.future.GridFutureAdapter.notifyListener(GridFutureAdapter.java:399) > > at > org.apache.ignite.internal.util.future.GridFutureAdapter.listen(GridFutureAdapter.java:354) > > at > 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onReceiveFullMessage(GridDhtPartitionsExchangeFuture.java:4042) > > at > org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.processFullPartitionUpdate(GridCachePartitionExchangeManager.java:1886) > > at > org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$3.onMessage(GridCachePartitionExchangeManager.java:429) > > at > org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$3.onMessage(GridCachePartitionExchangeManager.java:416) > > at > org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$MessageHandler.apply(GridCachePartitionExchangeManager.java:3667) > > at > org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$MessageHandler.apply(GridCachePartitionExchangeManager.java:3646) > > at > org.apache.ignite.internal.processors.cache.GridCacheIoManager.processMessage(GridCacheIoManager.java:1142) > > at > org.apache.ignite.internal.processors.cache.GridCacheIoManager.onMessage0(GridCacheIoManager.java:591) > > at > org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:392) > > at > org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:318) > > at > org.apache.ignite.internal.processors.cache.GridCacheIoManager.access$100(GridCacheIoManager.java:109) > > at > org.apache.ignite.internal.processors.cache.GridCacheIoManager$1.onMessage(GridCacheIoManager.java:308) > > at > org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1847) > > at > org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1472) > > at > org.apache.ignite.internal.managers.communication.GridIoManager.access$5200(GridIoManager.java:229) > > at > 
org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1367) > > The workaround is: > > public abstract class CqWithTransformerLocalListenerAdapter<T> > implements ContinuousQueryWithTransformer.EventListener<T> > { > private final Function<BinaryObject, T> localTransformer; > > protected CqWithTransformerLocalListenerAdapter(Function<BinaryObject, > T> localTransformer) { > this.localTransformer = localTransformer; > } > > /** > * The same as {@link > ContinuousQueryWithTransformer.EventListener#onUpdated(Iterable)}, > * but guarantees that entities are transformed. > */ > protected abstract void onUpdatedTransformed(Iterable<T> entities); > > @Override > public final void onUpdated(Iterable entities) { > onUpdatedTransformed(new IterableAdapter(entities)); > } > > private class IterableAdapter implements Iterable<T> { > private final Iterable<?> entities; > > public IterableAdapter(Iterable<?> entities) { > this.entities = entities; > } > > @NotNull > @Override > public Iterator<T> iterator() { > return new IteratorWrapper(entities.iterator()); > } > } > > private class IteratorWrapper implements Iterator<T> { > private final Iterator<?> underlying; > > public IteratorWrapper(Iterator<?> underlying) { > this.underlying = underlying; > } > > @Override > public boolean hasNext() { > return underlying.hasNext(); > } > > @Override > public T next() { > Object o = underlying.next(); > // Sometimes an entity may be in a binary form (see javadoc of > the enclosing class). > if (o instanceof BinaryObject) { > return localTransformer.apply((BinaryObject) o); > } > // The entity has been converted by a remote transformer > return (T) o; > } > } > } > > Best regards, > Ivan Fedorenkov >
