Ilya, unfortunately, I am unable to reproduce this issue in a pet project.
I have face with this issue on Ignite 2.9.1 again when I have brought one of
two nodes of a cluster down:
org.apache.ignite.IgniteCheckedException:
com.devexperts.tos.riskmonitor.domain.RmAccount cannot be cast to
com.devexperts.tos.riskmonitor.cluster.cache.cq.CacheKeyWithEventType
at
org.apache.ignite.internal.util.IgniteUtils.cast(IgniteUtils.java:7563)
[ignite-core-2.9.1.jar:2.9.1]
at
org.apache.ignite.internal.util.future.GridFutureAdapter.resolve(GridFutureAdapter.java:260)
~[ignite-core-2.9.1.jar:2.9.1]
at
org.apache.ignite.internal.util.future.GridFutureAdapter.get0(GridFutureAdapter.java:209)
~[ignite-core-2.9.1.jar:2.9.1]
at
org.apache.ignite.internal.util.future.GridFutureAdapter.get(GridFutureAdapter.java:160)
~[ignite-core-2.9.1.jar:2.9.1]
at
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body0(GridCachePartitionExchangeManager.java:3342)
[ignite-core-2.9.1.jar:2.9.1]
at
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body(GridCachePartitionExchangeManager.java:3163)
[ignite-core-2.9.1.jar:2.9.1]
at
org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:120)
[ignite-core-2.9.1.jar:2.9.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
Caused by: java.lang.ClassCastException:
com.devexperts.tos.riskmonitor.domain.RmAccount cannot be cast to
com.devexperts.tos.riskmonitor.cluster.cache.cq.CacheKeyWithEventType
at
com.devexperts.tos.riskmonitor.cluster.cache.distributiontracker.IgniteCacheKeysDistributionTracker$LocalCacheListener.onUpdated(IgniteCacheKeysDistributionTracker.java:365)
~[main/:?]
at
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.notifyLocalListener(CacheContinuousQueryHandler.java:1128)
~[ignite-core-2.9.1.jar:2.9.1]
at
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.notifyCallback0(CacheContinuousQueryHandler.java:954)
~[ignite-core-2.9.1.jar:2.9.1]
at
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.notifyCallback(CacheContinuousQueryHandler.java:895)
~[ignite-core-2.9.1.jar:2.9.1]
at
org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.addBackupNotification(GridContinuousProcessor.java:1162)
~[ignite-core-2.9.1.jar:2.9.1]
at
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$2.flushBackupQueue(CacheContinuousQueryHandler.java:512)
~[ignite-core-2.9.1.jar:2.9.1]
at
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.flushBackupQueue(CacheContinuousQueryManager.java:691)
~[ignite-core-2.9.1.jar:2.9.1]
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onDone(GridDhtPartitionsExchangeFuture.java:2394)
~[ignite-core-2.9.1.jar:2.9.1]
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.finishExchangeOnCoordinator(GridDhtPartitionsExchangeFuture.java:3972)
~[ignite-core-2.9.1.jar:2.9.1]
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onAllReceived(GridDhtPartitionsExchangeFuture.java:3687)
~[ignite-core-2.9.1.jar:2.9.1]
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.distributedExchange(GridDhtPartitionsExchangeFuture.java:1729)
~[ignite-core-2.9.1.jar:2.9.1]
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.init(GridDhtPartitionsExchangeFuture.java:943)
~[ignite-core-2.9.1.jar:2.9.1]
at
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body0(GridCachePartitionExchangeManager.java:3314)
~[ignite-core-2.9.1.jar:2.9.1]
... 3 more
Cache:
new CacheConfiguration<K, V>()
.setSqlSchema("PUBLIC")
.setCacheMode(CacheMode.PARTITIONED)
// This means that we won't be able to use ACID compliant transactions
.setAtomicityMode(CacheAtomicityMode.ATOMIC)
// Wait only primary nodes to finish write operations, do not wait back up
nodes
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC)
// The default backup factor
.setBackups(1)
.setAffinity(new RendezvousAffinityFunction(true));
.setName(Cache.RM_ACCOUNT.getName())
.setKeyConfiguration(new CacheKeyConfiguration(CacheAccountKey.class));
Query initialization:
continuousQuery = new ContinuousQueryWithTransformer<K, Object,
CacheKeyWithEventType<K>>();
continuousQuery.setInitialQuery(new ScanQuery<>());
continuousQuery.setAutoUnsubscribe(true);
continuousQuery.setLocalListener(new LocalCacheListener());
continuousQuery.setRemoteFilterFactory(FactoryBuilder.factoryOf(new
CacheKeysDistributionTrackerRmtFilter<>()));
continuousQuery.setRemoteTransformerFactory(FactoryBuilder.factoryOf(
new CacheKeysDistributionTrackerRmtTransformer<>()));
Remote transformer:
public class CacheKeysDistributionTrackerRmtTransformer<K>
implements IgniteClosure<CacheEntryEvent<? extends K, ?>,
CacheKeyWithEventType<K>>
{
private static final long serialVersionUID = 0;
@Override
public CacheKeyWithEventType<K> apply(CacheEntryEvent<? extends K, ?> event)
{
return new CacheKeyWithEventType<K>(event.getKey(), event.getEventType());
}
}
Local listener:
private class LocalCacheListener
implements
ContinuousQueryWithTransformer.EventListener<CacheKeyWithEventType<K>>
{
@Override
public void onUpdated(Iterable<? extends CacheKeyWithEventType<K>> events) {
for (CacheKeyWithEventType<K> event : events) {
if (event.getEventType() == EventType.CREATED) {
executorService.execute(() -> onCacheKeyCreated(event.getKey()));
} else if (event.getEventType() == EventType.REMOVED) {
executorService.execute(() -> onCacheKeyRemoved(event.getKey()));
}
}
}
}
CacheKeyWithEventType:
public class CacheKeyWithEventType<K> implements Externalizable {
private static final long serialVersionUID = -1446749299783090657L;
private K key;
private EventType eventType;
public CacheKeyWithEventType(K key, EventType eventType) {
this.key = key;
this.eventType = eventType;
}
public CacheKeyWithEventType() {
}
public K getKey() {
return key;
}
public EventType getEventType() {
return eventType;
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(key);
out.writeObject(eventType);
}
@Override
public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
key = (K) in.readObject();
eventType = (EventType) in.readObject();
}
}
Best regards,
Ivan Fedorenkov
From: Ilya Kasnacheev <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, October 8, 2020 at 2:24 PM
To: "[email protected]" <[email protected]>
Subject: Re: Ignite 2.8.1 ContinuousQueryWithTransformer: ClassCastException on
PME
Hello!
You can do either one, I'll take it from there.
Regards,
--
Ilya Kasnacheev
вт, 6 окт. 2020 г. в 17:26,
<[email protected]<mailto:[email protected]>>:
Could you please guide be through the process? Should I create just a simple
project anywhere and share it here or I should create a test case in the Ignite
project?
From: Ilya Kasnacheev
<[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Date: Tuesday, October 6, 2020 at 3:44 PM
To: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Subject: Re: Ignite 2.8.1 ContinuousQueryWithTransformer: ClassCastException on
PME
Hello!
Do you have a reproducer for this issue? I think I could validate it and create
an issue if you do.
Can you please also check if it works or not on e.g. Apache Ignite 2.9RC2?
Regards,
--
Ilya Kasnacheev
вт, 6 окт. 2020 г. в 14:30,
<[email protected]<mailto:[email protected]><mailto:[email protected]<mailto:[email protected]>>>:
Hi everyone!
I am getting the ClassCastException when a node from my cluster fails over. It
looks like the root cause is that nodes are loading some keys from their
backups and the CacheContinousQueryHandler is assuming that the entries are
already converted by the remote side which is a false expectation.
The stacktrace is:
Caused by: java.lang.ClassCastException:
org.apache.ignite.internal.binary.BinaryObjectImpl cannot be cast to
java.lang.String
...
at
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.notifyLocalListener(CacheContinuousQueryHandler.java:1114)
at
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.notifyCallback0(CacheContinuousQueryHandler.java:940)
at
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.notifyCallback(CacheContinuousQueryHandler.java:881)
at
org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.addBackupNotification(GridContinuousProcessor.java:1161)
at
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$2.flushBackupQueue(CacheContinuousQueryHandler.java:498)
at
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.flushBackupQueue(CacheContinuousQueryManager.java:687)
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onDone(GridDhtPartitionsExchangeFuture.java:2261)
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.processFullMessage(GridDhtPartitionsExchangeFuture.java:4375)
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.access$1500(GridDhtPartitionsExchangeFuture.java:148)
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$4.apply(GridDhtPartitionsExchangeFuture.java:4054)
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$4.apply(GridDhtPartitionsExchangeFuture.java:4042)
at
org.apache.ignite.internal.util.future.GridFutureAdapter.notifyListener(GridFutureAdapter.java:399)
at
org.apache.ignite.internal.util.future.GridFutureAdapter.listen(GridFutureAdapter.java:354)
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onReceiveFullMessage(GridDhtPartitionsExchangeFuture.java:4042)
at
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.processFullPartitionUpdate(GridCachePartitionExchangeManager.java:1886)
at
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$3.onMessage(GridCachePartitionExchangeManager.java:429)
at
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$3.onMessage(GridCachePartitionExchangeManager.java:416)
at
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$MessageHandler.apply(GridCachePartitionExchangeManager.java:3667)
at
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$MessageHandler.apply(GridCachePartitionExchangeManager.java:3646)
at
org.apache.ignite.internal.processors.cache.GridCacheIoManager.processMessage(GridCacheIoManager.java:1142)
at
org.apache.ignite.internal.processors.cache.GridCacheIoManager.onMessage0(GridCacheIoManager.java:591)
at
org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:392)
at
org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:318)
at
org.apache.ignite.internal.processors.cache.GridCacheIoManager.access$100(GridCacheIoManager.java:109)
at
org.apache.ignite.internal.processors.cache.GridCacheIoManager$1.onMessage(GridCacheIoManager.java:308)
at
org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1847)
at
org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1472)
at
org.apache.ignite.internal.managers.communication.GridIoManager.access$5200(GridIoManager.java:229)
at
org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1367)
The workaround is:
public abstract class CqWithTransformerLocalListenerAdapter<T>
implements ContinuousQueryWithTransformer.EventListener<T>
{
private final Function<BinaryObject, T> localTransformer;
protected CqWithTransformerLocalListenerAdapter(Function<BinaryObject, T>
localTransformer) {
this.localTransformer = localTransformer;
}
/**
* The same as {@link
ContinuousQueryWithTransformer.EventListener#onUpdated(Iterable)},
* but guarantees that entities are transformed.
*/
protected abstract void onUpdatedTransformed(Iterable<T> entities);
@Override
public final void onUpdated(Iterable entities) {
onUpdatedTransformed(new IterableAdapter(entities));
}
private class IterableAdapter implements Iterable<T> {
private final Iterable<?> entities;
public IterableAdapter(Iterable<?> entities) {
this.entities = entities;
}
@NotNull
@Override
public Iterator<T> iterator() {
return new IteratorWrapper(entities.iterator());
}
}
private class IteratorWrapper implements Iterator<T> {
private final Iterator<?> underlying;
public IteratorWrapper(Iterator<?> underlying) {
this.underlying = underlying;
}
@Override
public boolean hasNext() {
return underlying.hasNext();
}
@Override
public T next() {
Object o = underlying.next();
// Sometimes an entity may be in a binary form (see javadoc of the
enclosing class).
if (o instanceof BinaryObject) {
return localTransformer.apply((BinaryObject) o);
}
// The entity has been converted by a remote transformer
return (T) o;
}
}
}
Best regards,
Ivan Fedorenkov