tkalkirill commented on code in PR #1615:
URL: https://github.com/apache/ignite-3/pull/1615#discussion_r1094455591


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java:
##########
@@ -642,20 +815,156 @@ void stop() throws Exception {
         NetworkAddress address() {
             return clusterService.topologyService().localMember().address();
         }
+
+        @Nullable TablePartitionId getTablePartitionId(WatchEvent event) {
+            assertTrue(event.single(), event.toString());
+
+            Entry stableAssignmentsWatchEvent = event.entryEvent().newEntry();
+
+            if (stableAssignmentsWatchEvent.value() == null) {
+                return null;
+            }
+
+            int partitionId = extractPartitionNumber(stableAssignmentsWatchEvent.key());
+            UUID tableId = extractTableId(stableAssignmentsWatchEvent.key(), STABLE_ASSIGNMENTS_PREFIX);
+
+            return new TablePartitionId(tableId, partitionId);
+        }
+
+        TablePartitionId getTablePartitionId(String tableName, int partitionId) {
+            InternalTable internalTable = getInternalTable(this, tableName);
+
+            return new TablePartitionId(internalTable.tableId(), partitionId);
+        }
     }
 
     /**
      * Starts the Vault component.
      */
     private static VaultManager createVault(Path workDir) {
-        Path vaultPath = workDir.resolve(Paths.get("vault"));
+        return new VaultManager(new PersistentVaultService(resolveDir(workDir, "vault")));
+    }
+
+    private static Path resolveDir(Path workDir, String dirName) {
+        Path newDirPath = workDir.resolve(dirName);
 
         try {
-            Files.createDirectories(vaultPath);
+            return Files.createDirectories(newDirPath);
         } catch (IOException e) {
             throw new IgniteInternalException(e);
         }
+    }
+
+    private static TableDefinition createTableDefinition(String tableName) {
+        return SchemaBuilders.tableBuilder("PUBLIC", tableName).columns(
+                SchemaBuilders.column("key", ColumnType.INT64).build(),
+                SchemaBuilders.column("val", ColumnType.INT32).asNullable(true).build()
+        ).withPrimaryKey("key").build();
+    }
+
+    private void createTableForOnePartition(String tableName, int replicas, boolean testDataStorage) {
+        assertThat(
+                nodes.get(0).tableManager.createTableAsync(
+                        tableName,
+                        tableChange -> {
+                            SchemaConfigurationConverter.convert(createTableDefinition(tableName), tableChange)
+                                    .changeReplicas(replicas)
+                                    .changePartitions(1);
+
+                            if (testDataStorage) {
+                                tableChange.changeDataStorage(dataStorageChange -> dataStorageChange.convert(TestDataStorageChange.class));
+                            }
+                        }
+                ),
+                willCompleteSuccessfully()
+        );
+
+        assertEquals(replicas, getPartitionClusterNodes(0, 0).size());
+        assertEquals(replicas, getPartitionClusterNodes(1, 0).size());
+        assertEquals(replicas, getPartitionClusterNodes(2, 0).size());
+    }
+
+    private void changeTableReplicasForSinglePartition(String tableName, int replicas) {
+        assertThat(
+                nodes.get(0).tableManager.alterTableAsync(tableName, tableChange -> {
+                    tableChange.changeReplicas(replicas);
+
+                    return true;
+                }),
+                willCompleteSuccessfully()
+        );
+
+        waitPartitionAssignmentsSyncedToExpected(0, replicas);
+
+        assertEquals(replicas, getPartitionClusterNodes(0, 0).size());
+        assertEquals(replicas, getPartitionClusterNodes(1, 0).size());
+        assertEquals(replicas, getPartitionClusterNodes(2, 0).size());
+    }
+
+    private static Set<Assignment> getEvictedAssignments(Set<Assignment> beforeChange, Set<Assignment> afterChange) {
+        Set<Assignment> result = new HashSet<>(beforeChange);
+
+        result.removeAll(afterChange);
+
+        return result;
+    }
+
+    private static @Nullable InternalTable getInternalTable(Node node, String tableName) {
+        Table table = node.tableManager.table(tableName);
+
+        assertNotNull(table, tableName);
+
+        return ((TableImpl) table).internalTable();
+    }
+
+    private static void checkInvokeDestroyPartitionStorages(Node node, String tableName, int partitionId) {
+        InternalTable internalTable = getInternalTable(node, tableName);
+
+        verify(internalTable.storage(), times(1)).destroyPartition(eq(partitionId));
+        verify(internalTable.txStateStorage(), times(1)).destroyTxStateStorage(eq(partitionId));
+    }
+
+    private static void throwExceptionOnInvokeDestroyPartitionStorages(Node node, String tableName, int partitionId) {
+        InternalTable internalTable = getInternalTable(node, tableName);
+
+        doAnswer(answer -> CompletableFuture.failedFuture(new StorageException("From test")))
+                .when(internalTable.storage())
+                .destroyPartition(eq(partitionId));
+
+        doAnswer(answer -> CompletableFuture.failedFuture(new IgniteInternalException("From test")))
+                .when(internalTable.txStateStorage())
+                .destroyTxStateStorage(eq(partitionId));

Review Comment:
   Fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to