Here is a reproducer for this issue.
Run the main class twice: once with the program argument READER and once
with the program argument WRITER.
In the WRITER console, press Enter (each press generates one A and 100
associated B's, written in a single transaction).
The READER subscribes to A with a continuous query and, for each A it
receives, fetches the associated B's with a scan query.
However, the scan query has to be retried a number of times before all 100
B's are returned.
package com.testproject.server;
import java.util.Arrays;
import java.util.List;
import java.util.Scanner;
import javax.cache.Cache.Entry;
import javax.cache.CacheException;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteAsyncCallback;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TransactionProblem{
private static final Logger LOGGER =
LoggerFactory.getLogger(TransactionProblem.class);
/**
 * Ignite node configuration used by both roles of the reproducer.
 * Sets a per-node work directory, static TCP discovery on two localhost
 * ports, and enables peer class loading.
 */
private static class TestIgniteConfiguration extends IgniteConfiguration {
    /**
     * @param name node name; appended to the work directory path so each
     *             node gets its own directory.
     */
    public TestIgniteConfiguration(String name) {
        // Static IP finder listing the two local discovery addresses.
        List<String> discoveryAddresses =
            Arrays.asList("localhost:47500", "localhost:47501");
        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
        ipFinder.setAddresses(discoveryAddresses);

        TcpDiscoverySpi tcpDiscovery = new TcpDiscoverySpi();
        tcpDiscovery.setIpFinder(ipFinder);

        setDiscoverySpi(tcpDiscovery);
        setPeerClassLoadingEnabled(true);
        setWorkDirectory("c:\\data\\testproject\\" + name);
    }
}
/**
 * Cache configuration shared by caches "A" and "B": replicated,
 * transactional, fully synchronous writes and synchronous rebalancing.
 *
 * <p>Parameterized with the key/value types the reproducer actually stores
 * (String keys, binary-object values) instead of extending the raw
 * {@code CacheConfiguration} type.
 */
private static class TestCacheConfiguration extends CacheConfiguration<String, BinaryObject> {
    /**
     * @param name name of the cache to configure.
     */
    public TestCacheConfiguration(String name) {
        super(name);
        setRebalanceMode(CacheRebalanceMode.SYNC);
        setCacheMode(CacheMode.REPLICATED);
        setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
        setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
    }
}
/**
 * Local listener for the continuous query on cache "A".
 *
 * <p>For every A event it reads the A's {@code ID} field and then repeatedly
 * scans cache "B" for entries whose {@code PARENT_ID} equals that ID, until
 * the expected number of B's is visible. The elapsed time is logged.
 */
@IgniteAsyncCallback
private static class ACallback implements
        CacheEntryUpdatedListener<BinaryObject, BinaryObject> {
    /** Number of B entries that must be visible for one A before the retry loop stops. */
    private static final int EXPECTED_B_COUNT = 100;

    private final Ignite ignite;

    /**
     * @param ignite node used to look up cache "B".
     */
    public ACallback(Ignite ignite) {
        this.ignite = ignite;
    }

    /**
     * Handles a batch of A events; see the class description.
     *
     * <p>The retry loop is deliberately unbounded and has no back-off: the
     * purpose of the reproducer is to measure how long it takes until all
     * B's become visible.
     */
    @Override
    public void onUpdated(
            Iterable<CacheEntryEvent<? extends BinaryObject, ? extends BinaryObject>> cacheEntryEvents)
            throws CacheEntryListenerException {
        for (CacheEntryEvent<? extends BinaryObject, ? extends BinaryObject> e : cacheEntryEvents) {
            LOGGER.info("Continuous update: {}", e);
            BinaryObject b = e.getValue();
            long id = b.field("ID");
            LOGGER.info("ID is {}", id);
            // find the B's for this A
            // keep retrying until all expected B's are seen
            int count = 0;
            long start = System.currentTimeMillis();
            while (count < EXPECTED_B_COUNT) {
                count = printBs(id);
            }
            long end = System.currentTimeMillis();
            LOGGER.info("Took {} ms to receive all B's", (end - start));
        }
    }

    /**
     * Runs one scan query over cache "B" and counts the entries whose
     * {@code PARENT_ID} field equals {@code id}.
     *
     * @param id ID of the parent A.
     * @return number of matching B entries returned by this scan.
     */
    private int printBs(long id) {
        IgniteCache<String, BinaryObject> cacheB = ignite.cache("B").withKeepBinary();
        // Comparing from the boxed id avoids an NPE for entries without PARENT_ID.
        ScanQuery<String, BinaryObject> scanQuery = new ScanQuery<>(
            (IgniteBiPredicate<String, BinaryObject>) (key, value) ->
                Long.valueOf(id).equals(value.field("PARENT_ID")));
        // Execute the query exactly once and always release the cursor.
        try (QueryCursor<Entry<String, BinaryObject>> cursor = cacheB.query(scanQuery)) {
            int matches = cursor.getAll().size();
            LOGGER.debug("Received {} scan results", matches);
            return matches;
        }
    }
}
public static void main(String[] args){
String type = args.length>0?args[0]:"BLANK";
if(!"READER".equals(type) && !"WRITER".equals(type)){
throw new UnsupportedOperationException("Unknown option
"+type+". Choose one one READER or WRITER");
}
Ignite ignite = Ignition.start(new TestIgniteConfiguration(type));
LOGGER.info("Node was successfully started");
IgniteCache<String, BinaryObject> cacheA =
ignite.getOrCreateCache(new TestCacheConfiguration("A")).withKeepBinary();
IgniteCache<String, BinaryObject> cacheB =
ignite.getOrCreateCache(new TestCacheConfiguration("B")).withKeepBinary();
if("WRITER".equals(type)){
// generate A and 100 associated B's. Write them all in one
transaction
Scanner scanner = new Scanner(System.in);
while (true) {
long id=System.currentTimeMillis();
LOGGER.info("Press a key to generate an A and 100 B's with
join ID "+id);
scanner.nextLine();
BinaryObjectBuilder aBuilder = ignite.binary().builder("A");
aBuilder.setField("ID", id);
// begin a transaction
try (Transaction tx = ignite.transactions().txStart(
TransactionConcurrency.PESSIMISTIC,
TransactionIsolation.READ_COMMITTED, 30000,
101)) {
try {
// insert an A record with ID
cacheA.put("ID_" + id, aBuilder.build());
// insert 100 B records with this PARENT_ID
BinaryObjectBuilder bBuilder =
ignite.binary().builder("B");
bBuilder.setField("PARENT_ID", id);
for (int i = 0; i < 100; i++) {
bBuilder.setField("B_ID", i);
cacheB.put("ID_" + id + "_B_" + i,
bBuilder.build());
}
tx.commit();
LOGGER.info("COMMITTED");
} catch (CacheException e) {
tx.rollback();
}
}
// end transaction
}
}
else{
// subscribe to A's and print associated B's
ContinuousQuery<BinaryObject, BinaryObject> query = new
ContinuousQuery<>();
query.setInitialQuery(new ScanQuery());
query.setLocalListener(new ACallback(ignite));
QueryCursor<Entry<BinaryObject, BinaryObject>> cur =
cacheA.query(query);
cur.forEach(entry -> {
LOGGER.info("Initial record: {}", entry);
});
}
}
}
--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/