Dmitry Uskov created IGNITE-7868: ------------------------------------ Summary: Continuous Query LocalListener on backup node works unstable Key: IGNITE-7868 URL: https://issues.apache.org/jira/browse/IGNITE-7868 Project: Ignite Issue Type: Bug Affects Versions: 2.3 Reporter: Dmitry Uskov
I have two nodes (*node1* and *node2*) with configuration: {code:java} Ignition.start(); ProgramContext.ignite = Ignition.ignite(); //ProgramContext.ignite is public static field String cacheName = "A-Cache"; CacheConfiguration<CacheKey, CacheValue> cacheConfiguration = new CacheConfiguration<>(); cacheConfiguration.setName(cacheName); cacheConfiguration.setAtomicityMode(CacheAtomicityMode.ATOMIC); cacheConfiguration.setCacheMode(CacheMode.PARTITIONED); cacheConfiguration.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); cacheConfiguration.setBackups(1); IgniteCache<CacheKey, CacheValue> cache = ProgramContext.ignite.getOrCreateCache(cacheConfiguration); ContinuousQuery<CacheKey, CacheValue> query = new ContinuousQuery<>(); ClusterNode node = ProgramContext.ignite.cluster().localNode(); query.setRemoteFilterFactory((Factory<CacheEntryEventFilter<CacheKey, CacheValue>>) () -> (CacheEntryEventFilter<CacheKey, CacheValue>) event -> { boolean currentNodeIsPrimary = node.equals(ProgramContext.ignite.cluster().localNode()); boolean eventTypeIsCreated = event.getEventType().equals(javax.cache.event.EventType.CREATED); return currentNodeIsPrimary && eventTypeIsCreated; }); query.setLocalListener(cacheEntryEvents -> { for (CacheEntryEvent<? extends CacheKey, ? extends CacheValue> cacheEntryEvent : cacheEntryEvents) { System.out.println("CacheEntryUpdatedListener: " + cacheEntryEvent.getKey() + "/" + cacheEntryEvent.getValue()); } }); cache.query(query); {code} CacheKey: {code:java} public class CacheKey { public static final String DEFAULT_AFFINITY_KEY = "cons"; private static String affinityKeyValue = DEFAULT_AFFINITY_KEY; private String key; @AffinityKeyMapped private String affinityKey; public CacheKey(String key) { this.key = key; this.affinityKey = affinityKeyValue; //always DEFAULT_AFFINITY_KEY } //... getters/setters and toString() }{code} CacheValue: {code:java} public class CacheValue { private String value; public CacheValue(String value) { this.value = value; } //... getters/setters and toString() }{code} I put two values in cache. As affinityKey always equals DAFAULT_AFFINITY_KEY one node (suppose *node1*) is primary for both values. LocalListener has worked twice on *node1*. Console output: {noformat} CacheEntryUpdatedListener: CacheKey{key='node1-0', affinityKey='cons'}/CacheValue{value='node1-0'} CacheEntryUpdatedListener: CacheKey{key='node1-1', affinityKey='cons'}/CacheValue{value='node1-1'} {noformat} Then I stoped it and I saw LocalListener on *node2* has worked once, but I expected LocalListener on *node2* had to work twice. Console out node1: {noformat} [12:36:36] Ignite node stopped OK [uptime=00:00:14.243]{noformat} Console out node2: {noformat} [12:36:36] Topology snapshot [ver=3, servers=1, clients=0, CPUs=4, heap=1.6GB] CacheEntryUpdatedListener: CacheKey{key='node1-1', affinityKey='cons'}/CacheValue{value='node1-1'}{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)