Hi Denis,
Both clients are connecting to the same server.
Here are the code samples:
Server:
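import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.logger.log4j2.Log4J2Logger;
import org.slf4j.Logger;        // assuming SLF4J is the logging facade in use
import org.slf4j.LoggerFactory;

public class IgniteServerCacheBootstrap {
    static final Logger logger = LoggerFactory.getLogger(IgniteServerCacheBootstrap.class);

    public static void main(String[] args) throws IgniteCheckedException, InterruptedException {
        IgniteConfiguration serverConfig = new IgniteConfiguration()
            .setGridLogger(new Log4J2Logger("log4j2.xml"));
        Ignite server = Ignition.start(serverConfig);
        // Block the main thread so the server node keeps running.
        Thread.currentThread().join();
    }
}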
Client (I run two such clients in parallel). The code is mostly taken from
the Ignite samples:
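import java.io.Serializable;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.logger.log4j2.Log4J2Logger;
import org.slf4j.Logger;        // assuming SLF4J is the logging facade in use
import org.slf4j.LoggerFactory;

public class IgniteCacheClient implements Serializable {
    Logger logger = LoggerFactory.getLogger(IgniteCacheClient.class);

    private IgniteCache<Integer, String> igniteCache;

    public IgniteCacheClient() throws IgniteCheckedException {
        IgniteConfiguration clientConfig = new IgniteConfiguration()
            .setGridLogger(new Log4J2Logger("log4j2.xml"))
            .setClientMode(true);
        Ignite client = Ignition.getOrStart(clientConfig);
        igniteCache = client.getOrCreateCache("MY_CACHE");
    }

    public void run() throws InterruptedException {
        // Create new continuous query.
        ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();

        // Initial query: return the existing entries whose key is greater than 10.
        qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer, String>() {
            @Override
            public boolean apply(Integer key, String val) {
                return key > 10;
            }
        }));

        // Callback that is called locally when update notifications are received.
        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
            @Override
            public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
                for (CacheEntryEvent<? extends Integer, ? extends String> e : evts)
                    logger.info("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
            }
        });

        // This filter will be evaluated remotely on all nodes.
        // Entries that pass this filter will be sent to the caller.
        qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() {
            @Override
            public CacheEntryEventFilter<Integer, String> create() {
                return new CacheEntryEventFilter<Integer, String>() {
                    @Override
                    public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
                        return e.getKey() > 10;
                    }
                };
            }
        });

        // Execute query.
        QueryCursor<Cache.Entry<Integer, String>> cur = igniteCache.query(qry);

        // Iterate through existing data.
        for (Cache.Entry<Integer, String> e : cur)
            logger.info("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');

        // Block the calling thread so the listener keeps receiving updates.
        Thread.currentThread().join();
    }
}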
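
For reference, both the initial ScanQuery predicate and the remote filter in the client use the same condition, key > 10. Here is a minimal plain-Java sketch of that condition in isolation. It uses no Ignite classes, and the names KeyFilterSketch, KEY_ABOVE_TEN and passingKeys are hypothetical, made up only for this illustration. It shows which keys would pass the condition, not how Ignite delivers events.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical stand-alone sketch: no Ignite classes are involved.
class KeyFilterSketch {
    // Same condition as the ScanQuery predicate and the remote filter in the client.
    static final BiPredicate<Integer, String> KEY_ABOVE_TEN = (key, val) -> key > 10;

    // Returns the keys of the entries that pass the condition, in iteration order.
    static List<Integer> passingKeys(Map<Integer, String> entries) {
        List<Integer> passed = new ArrayList<>();
        for (Map.Entry<Integer, String> e : entries.entrySet()) {
            if (KEY_ABOVE_TEN.test(e.getKey(), e.getValue()))
                passed.add(e.getKey());
        }
        return passed;
    }

    public static void main(String[] args) {
        Map<Integer, String> sample = new LinkedHashMap<>();
        sample.put(5, "five");
        sample.put(10, "ten");
        sample.put(11, "eleven");
        sample.put(42, "forty-two");
        System.out.println(passingKeys(sample)); // prints [11, 42]
    }
}
```

Note that the comparison is strict, so an entry with key 10 is excluded both from the initial query result and from the update notifications.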
--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/