[ 
https://issues.apache.org/jira/browse/IGNITE-17369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimir Steshin resolved IGNITE-17369.
---------------------------------------
    Resolution: Won't Fix

Is not a bug. The nature of default Isolated datastreamer receiver is quick 
data loading. Mostly for pre-loading. The receiver focuses on splitting, 
batched fetching and fast recording primary and backup records. But separately. 
Datastreamer doesn't wait for backup writes or primary writes. It only waits 
for batch processed on the target node. To be consistent, other receivers 
should be used. Like DataStreamerCacheUpdaters.individual() or 
DataStreamerCacheUpdaters.batched(). User should not use the data until 
datastreamer finishes.  

> Snapshot is inconsistent under streamed loading with 'allowOverwrite==false'.
> -----------------------------------------------------------------------------
>
>                 Key: IGNITE-17369
>                 URL: https://issues.apache.org/jira/browse/IGNITE-17369
>             Project: Ignite
>          Issue Type: Improvement
>            Reporter: Vladimir Steshin
>            Assignee: Vladimir Steshin
>            Priority: Major
>              Labels: ise, ise.lts
>         Attachments: IgniteClusterShanpshotStreamerTest.java
>
>
> Ignite fails to restore snapshot created under streamed load:
> {code:java}
> Conflict partition: PartitionKeyV2 [grpId=109386747, 
> grpName=SQL_PUBLIC_TEST_TBL1, partId=148]
> Partition instances: [PartitionHashRecordV2 [isPrimary=false, 
> consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=29, 
> partitionState=OWNING, size=29, partHash=827765854], PartitionHashRecordV2 
> [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, 
> updateCntr=9, partitionState=OWNING, size=9, partHash=-1515069105]]
> Conflict partition: PartitionKeyV2 [grpId=109386747, 
> grpName=SQL_PUBLIC_TEST_TBL1, partId=146]
> Partition instances: [PartitionHashRecordV2 [isPrimary=false, 
> consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=28, 
> partitionState=OWNING, size=28, partHash=1497908810], PartitionHashRecordV2 
> [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, 
> updateCntr=5, partitionState=OWNING, size=5, partHash=821195757]]
> {code}
> Test (attached):
> {code:java}
>     public void testClusterSnapshotConsistencyWithStreamer() throws Exception 
> {
>         int grids = 2;
>         CountDownLatch loadNumberBeforeSnapshot = new CountDownLatch(60_000);
>         AtomicBoolean stopLoading = new AtomicBoolean(false);
>         dfltCacheCfg = null;
>         Class.forName("org.apache.ignite.IgniteJdbcDriver");
>         String tableName = "TEST_TBL1";
>         startGrids(grids);
>         grid(0).cluster().state(ACTIVE);
>         IgniteInternalFuture<?> load1 = runLoad(tableName, false, 1, true, 
> stopLoading, loadNumberBeforeSnapshot);
>         loadNumberBeforeSnapshot.await();
>         grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get();
>         stopLoading.set(true);
>         load1.get();
>         grid(0).cache("SQL_PUBLIC_" + tableName).destroy();
>         grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, 
> F.asList("SQL_PUBLIC_TEST_TBL1")).get();
>     }
>     /** */
>     private IgniteInternalFuture<?> runLoad(String tblName, boolean useCache, 
> int backups, boolean streaming, AtomicBoolean stop,
>         CountDownLatch startSnp) {
>         return GridTestUtils.runMultiThreadedAsync(() -> {
>             if(useCache) {
>                 String cacheName = "SQL_PUBLIC_" + tblName.toUpperCase();
>                 IgniteCache<Integer, Object> cache = grid(0)
>                     .createCache(new CacheConfiguration<Integer, 
> Object>(cacheName).setBackups(backups)
>                         .setCacheMode(CacheMode.REPLICATED));
>                 try (IgniteDataStreamer<Integer, Object> ds = 
> grid(0).dataStreamer(cacheName)) {
>                     for (int i = 0; !stop.get(); ++i) {
>                         if (streaming)
>                             ds.addData(i, new Account(i, i - 1));
>                         else
>                             cache.put(i, new Account(i, i - 1));
>                         if (startSnp.getCount() > 0)
>                             startSnp.countDown();
>                         Thread.yield();
>                     }
>                 }
>             } else {
>                 try (Connection conn = 
> DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/")) {
>                     createTable(conn, tblName, backups);
>                     try (PreparedStatement stmt = 
> conn.prepareStatement("INSERT INTO " + tblName +
>                         "(id, name, orgid, dep) VALUES(?, ?, ?, ?)")) {
>                         if (streaming)
>                             conn.prepareStatement("SET STREAMING 
> ON;").execute();
>                         int leftLimit = 97; // letter 'a'
>                         int rightLimit = 122; // letter'z'
>                         int targetStringLength = 15;
>                         Random rand = new Random();
> //
>                         for (int i = 0; !stop.get(); ++i) {
>                             int orgid = rand.ints(1, 0, 
> 5).findFirst().getAsInt();
>                             String val = rand.ints(leftLimit, rightLimit + 
> 1).limit(targetStringLength)
>                                 .collect(StringBuilder::new, 
> StringBuilder::appendCodePoint, StringBuilder::append)
>                                 .toString();
>                             stmt.setInt(1, i);
>                             stmt.setString(2, val);
>                             stmt.setInt(3, orgid);
>                             stmt.setInt(4, 0);
>                             stmt.executeUpdate();
>                             if (startSnp.getCount() > 0)
>                                 startSnp.countDown();
>                             Thread.yield();
>                         }
>                     }
>                 }
>                 catch (Exception e) {
>                     while (startSnp.getCount() > 0)
>                         startSnp.countDown();
>                     throw new IgniteException("Unable to load.", e);
>                 }
>             }
>         }, 1, "load-thread-" + tblName);
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to