PGWelch opened a new issue, #10686: URL: https://github.com/apache/ignite/issues/10686
Using Ignite 2.14.0, you can get an inconsistent read when using OPTIMISTIC SERIALIZABLE transactions and the index API. Please see the example code below. We do PUTs to a cache on one thread and, at the same time, query the cache using the index API on another thread. We put 100 entries in total within a single transaction, yet the query sees a partial set of items, e.g. it can see 13 items, then 32, then 73, etc. The read is inconsistent.

I assumed this inconsistent read would cause a failure when we commit the transaction surrounding the query, but it doesn't fail: the OPTIMISTIC SERIALIZABLE transactions aren't protecting against the inconsistent read. I suspect the issue is that Ignite isn't acquiring a range lock when it does the index query. Quoting from Wikipedia, https://en.wikipedia.org/wiki/Isolation_(database_systems)#Serializable :

_Also range-locks must be acquired when a SELECT query uses a ranged WHERE clause, especially to avoid the phantom reads phenomenon_

Here's the code:

```
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.IndexQuery;
import org.apache.ignite.cache.query.IndexQueryCriteriaBuilder;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import javax.cache.Cache.Entry;
import java.io.File;
import
java.util.HashSet; public class DebugReadConsistencyIssue1 { private static final int NB_RECS = 100; /** * Small class which stores a foreign key with an index that we query against */ public static class MyValue { @QuerySqlField(name = "fkey", orderedGroups = { @QuerySqlField.Group(name = "fkeyindex", order = 0) }) private String foreignKey; public String getForeignKey() { return foreignKey; } public void setForeignKey(String foreignKey) { this.foreignKey = foreignKey; } } public static void main(String[] args) throws Exception { // initialise ignite with data persistence and longer transaction // timeouts File dir = new File("c:\\temp\\ignite-debug"); File work = new File(dir, "work"); FileUtils.deleteDirectory(dir); work.mkdirs(); IgniteConfiguration cfg = new IgniteConfiguration(); cfg.setIgniteHome(dir.getAbsolutePath()); cfg.setWorkDirectory(work.getAbsolutePath()); cfg.setDataStorageConfiguration(new DataStorageConfiguration()); cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration().setPersistenceEnabled(true); cfg.getTransactionConfiguration().setDefaultTxTimeout(60000); cfg.getTransactionConfiguration().setDeadlockTimeout(60000); try (Ignite ignite = Ignition.start(cfg)) { // ensure cluster is active IgniteCluster cluster = ignite.cluster(); cluster.state(ClusterState.ACTIVE); // get or create the cache, setting it transactional, then clear it if it already existed CacheConfiguration<String, MyValue> config = new CacheConfiguration<>("MyValue"); config.setIndexedTypes(String.class, MyValue.class); config.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); config.setCacheMode(CacheMode.PARTITIONED); IgniteCache<String, MyValue> cache = ignite.getOrCreateCache(config); cache.clear(); // put some data values on one thread MutableBoolean putFailed = new MutableBoolean(false); Thread putThread = new Thread(() -> { if (!runPut(ignite, cache)) { putFailed.setValue(true); } }); putThread.start(); // at the same time use the index API to get all 
entries within the foreign key Thread queryThread = new Thread(() -> { while (runGetQuery(ignite, cache) != NB_RECS && !putFailed.booleanValue()) { } }); queryThread.start(); // wait until both threads completed putThread.join(); queryThread.join(); } finally { } } private static boolean runPut(Ignite ignite, IgniteCache<String, MyValue> cache) { // Put multiple objects, doing it slowly with sleeps in-between PUTs // so we can see that the query on the other thread sees a partial // number of objects written try (Transaction tx = ignite.transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE)) { for (int j = 0; j < 100; j++) { MyValue value = new MyValue(); value.foreignKey = "fk1"; cache.put("key" + j, value); try { Thread.sleep(20); } catch (Exception e) { } System.out.print("."); } tx.commit(); System.out.println("Committed PUT transaction"); } catch (Exception e) { System.err.println("Put failed " + e.toString()); return false; } return true; } private static int runGetQuery(Ignite ignite, IgniteCache<String, MyValue> cache) { // Run query against the foreign key using the index API and report if we // see an odd number of objects after the transaction has finished HashSet<String> queryResult = new HashSet<>(); try (Transaction tx = ignite.transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE)) { IndexQuery<String, MyValue> query = new IndexQuery<String, MyValue>(MyValue.class, "fkeyindex").setCriteria(IndexQueryCriteriaBuilder.eq("fkey", "fk1")); queryResult.clear(); try (QueryCursor<Entry<String, MyValue>> cursor = cache.query(query)) { cursor.getAll().forEach(entry -> { queryResult.add(entry.getKey()); }); } tx.commit(); } catch (Exception e) { System.err.println(); System.err.println("Query transaction failed: " + e.toString()); return 0; } // Transaction finished without failing if (queryResult.size() != 0 && queryResult.size() != NB_RECS) { System.err.println(); 
System.err.println("Found " + queryResult.size() + " entries after committed query transaction. We saw a " + "partially written dataset!!!"); } return queryResult.size(); } } ``` Typical output: _.................................................................................................... Found 1 entries after committed query transaction. We saw a partially written dataset Found 13 entries after committed query transaction. We saw a partially written dataset Found 14 entries after committed query transaction. We saw a partially written dataset Found 14 entries after committed query transaction. We saw a partially written dataset_ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
