Hi,

I'm testing some failover scenarios. When we call cache.getAsync() while the
cluster is not active, the call itself seems to block.

I implemented a cache repository as follows, using Vert.x. It seems to block
at cacheOperation.apply(cache) in executeAsync(). So when I call
myRepo.get(myKey), which applies cache.getAsync() underneath, the call blocks.

import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.cache.processor.EntryProcessor;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.lang.IgniteFuture;

public class IgniteCacheRepository<K, V> implements CacheRepository<K, V> {
    public final long DEFAULT_OPERATION_TIMEOUT = 1000;
    private final TimeUnit DEFAULT_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;

    private Vertx vertx;
    private IgniteCache<K, V> cache;

    public IgniteCacheRepository(Vertx vertx, IgniteCache<K, V> cache) {
        this.vertx = vertx;
        this.cache = cache;
    }

    @Override
    public Future<Void> put(K key, V value) {
        return executeAsync(cache -> cache.putAsync(key, value),
            DEFAULT_OPERATION_TIMEOUT, DEFAULT_TIMEOUT_UNIT);
    }

    @Override
    public Future<V> get(K key) {
        return executeAsync(cache -> cache.getAsync(key),
            DEFAULT_OPERATION_TIMEOUT, DEFAULT_TIMEOUT_UNIT);
    }

    @Override
    public <T> Future<T> invoke(K key, EntryProcessor<K, V, T> processor,
            Object... arguments) {
        return executeAsync(cache -> cache.invokeAsync(key, processor, arguments),
            DEFAULT_OPERATION_TIMEOUT, DEFAULT_TIMEOUT_UNIT);
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> T cache() {
        return (T) cache;
    }

    /**
     * Adapt Ignite async operation to vertx futures.
     *
     * @param cacheOperation The ignite operation to execute async.
     * @param timeout Timeout passed to IgniteFuture.get() in the listener.
     * @param unit Unit of the timeout.
     * @return The value from the cache operation.
     */
    private <T> Future<T> executeAsync(
            Function<IgniteCache<K, V>, IgniteFuture<T>> cacheOperation,
            long timeout, TimeUnit unit) {
        Future<T> future = Future.future();
        try {
            // This is the call that seems to block when the cluster is not active.
            IgniteFuture<T> value = cacheOperation.apply(cache);
            value.listenAsync(result -> {
                try {
                    // The listener runs once the Ignite future is done, so the
                    // timeout here does not bound the apply() call above.
                    future.complete(result.get(timeout, unit));
                } catch (Exception ex) {
                    future.fail(ex);
                }
            }, VertxIgniteExecutorAdapter.getOrCreate(vertx.getOrCreateContext()));
        } catch (Exception ex) {
            // Catch some RuntimeException that can be thrown by Ignite cache.
            future.fail(ex);
        }
        return future;
    }
}
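
To make the symptom concrete, here is a minimal sketch of one way to keep such a call from stalling the caller. It uses plain JDK types only: CompletableFuture stands in for both IgniteFuture and the Vert.x Future, and GuardedAsync / guarded() are hypothetical names, not Ignite or Vert.x API. The idea is to make the possibly blocking call on a worker thread and put a deadline on the combined result (orTimeout needs Java 9+).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper, JDK-only stand-in for the Ignite/Vert.x types above.
final class GuardedAsync {

    private GuardedAsync() {
    }

    /**
     * Runs an operation that is meant to be async, but may block before it
     * returns its future, on the given worker and bounds the overall wait.
     *
     * The timeout only releases the caller: a worker thread that is stuck
     * inside the operation stays stuck until the operation itself returns.
     */
    public static <T> CompletableFuture<T> guarded(
            Supplier<CompletableFuture<T>> operation,
            Executor worker,
            long timeout, TimeUnit unit) {
        return CompletableFuture
            .supplyAsync(operation, worker) // the possibly blocking call runs here
            .thenCompose(f -> f)            // flatten the nested future
            .orTimeout(timeout, unit);      // fail with TimeoutException on deadline
    }
}
```

In the repository above, the supplier would correspond to cacheOperation.apply(cache). With Vert.x the same idea would presumably be expressed with a worker context rather than a raw executor; that mapping is an assumption and is not tested here.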