Why is the client needs to be serializable? Have you tried suggestion from this answer https://stackoverflow.com/questions/61293343/failed-to-unmarshal-discovery-data-for-component-continuous-proc-with-more-than/61318360#61318360 ?
Evgenii вт, 21 апр. 2020 г. в 00:36, AlexBor <[email protected]>: > Hi Denis, > > Both servers are looking to the same server. > Here are code samples: > > Server: > > public class IgniteServerCacheBootstrap { > > final static Logger logger = > LoggerFactory.getLogger(IgniteCacheClient.class); > > public static void main(String[] args) throws IgniteCheckedException, > InterruptedException { > > IgniteConfiguration serverConfig = new IgniteConfiguration() > .setGridLogger(new Log4J2Logger("log4j2.xml")); > > Ignite server = Ignition.start(serverConfig); > Thread.currentThread().join(); > } > > } > > > Client (I run two of such clients in parallel). Code is mostly taken from > Ignite samples: > > public class IgniteCacheClient implements Serializable { > > Logger logger = LoggerFactory.getLogger(IgniteCacheClient.class); > > private IgniteCache igniteCache; > > public IgniteCacheClient() throws IgniteCheckedException { > IgniteConfiguration clientConfig = new IgniteConfiguration() > .setGridLogger(new Log4J2Logger("log4j2.xml")) > .setClientMode(true); > > Ignite client = Ignition.getOrStart(clientConfig); > igniteCache = client.getOrCreateCache("MY_CACHE"); > } > > public void run() throws InterruptedException { > > // Create new continuous query. > ContinuousQuery<Integer, String> qry = new ContinuousQuery<>(); > > qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer, > String>() { > @Override > public boolean apply(Integer key, String val) { > return key > 10; > } > })); > > // Callback that is called locally when update notifications are > received. > qry.setLocalListener(new CacheEntryUpdatedListener<Integer, > String>() { > @Override > public void onUpdated(Iterable<CacheEntryEvent<? extends > Integer, ? extends String>> evts) { > for (CacheEntryEvent<? extends Integer, ? extends String> e > : evts) > logger.info("Updated entry [key=" + e.getKey() + ", > val=" + e.getValue() + ']'); > } > }); > > // This filter will be evaluated remotely on all nodes. > // Entry that pass this filter will be sent to the caller. > qry.setRemoteFilterFactory(new > Factory<CacheEntryEventFilter<Integer, String>>() { > @Override > public CacheEntryEventFilter<Integer, String> create() { > return new CacheEntryEventFilter<Integer, String>() { > @Override > public boolean evaluate(CacheEntryEvent<? extends > Integer, ? extends String> e) { > return e.getKey() > 10; > } > }; > } > }); > > // Execute query. > QueryCursor<Cache.Entry<Integer, String>> cur = > igniteCache.query(qry); > > // Iterate through existing data. > for (Cache.Entry<Integer, String> e : cur) > logger.info("Queried existing entry [key=" + e.getKey() + ", > val=" + e.getValue() + ']'); > > Thread.currentThread().join(); > } > } > > > > > > > -- > Sent from: http://apache-ignite-users.70518.x6.nabble.com/ >
