sashapolo commented on code in PR #3446:
URL: https://github.com/apache/ignite-3/pull/3446#discussion_r1533658382
##########
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java:
##########
@@ -329,4 +340,41 @@ public static void startBuildingIndex(CatalogManager
catalogManager, int indexId
public static void makeIndexAvailable(CatalogManager catalogManager, int
indexId) {
assertThat(catalogManager.execute(MakeIndexAvailableCommand.builder().indexId(indexId).build()),
willCompleteSuccessfully());
}
+
+ /**
+ * Adds a column to the table to catalog.
+ *
+ * @param catalogManager Catalog manager.
+ * @param schemaName Schema name.
+ * @param tableName Table name.
+ * @param columnName Column name.
+ * @param columnType Column type.
+ */
+ public static void addColumnForTable(
+ CatalogManager catalogManager,
+ String schemaName,
+ String tableName,
+ String columnName,
+ ColumnType columnType
+ ) {
+ CatalogCommand command = AlterTableAddColumnCommand.builder()
+ .schemaName(schemaName)
+ .tableName(tableName)
+ .columns(List.of(ColumnParams.builder().name(columnName).type(columnType).build()))
+ .build();
+
+ assertThat(catalogManager.execute(command),
willCompleteSuccessfully());
+ }
+
+ /**
+ * Adds a column to the table from {@link #createSimpleTable(CatalogManager, String)} to catalog.
+ *
+ * @param catalogManager Catalog manager.
+ * @param tableName Table name.
+ * @param columnName Column name.
+ * @param columnType Column type.
+ */
+ public static void addColumnForSimpleTable(CatalogManager catalogManager, String tableName, String columnName, ColumnType columnType) {
Review Comment:
```suggestion
public static void addColumnToSimpleTable(CatalogManager catalogManager, String tableName, String columnName, ColumnType columnType) {
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java:
##########
@@ -81,4 +86,68 @@ public static int
findStartBuildingIndexCatalogVersion(CatalogService catalogSer
BUILDING, indexId, fromCatalogVersionIncluded,
latestCatalogVersion
));
}
+
+ /**
+ * Collects a list of tables that were removed from the catalog and should
have been dropped due to a low watermark (if the catalog
+ * version in which the table was removed is less than or equal to the
active catalog version of the low watermark).
+ *
+ * @param catalogService Catalog service.
+ * @param lowWatermark Low watermark, {@code null} if it has never been
updated.
+ */
+ // TODO: IGNITE-21771 Process or check catalog compaction
+ static List<DroppedTableInfo> droppedTables(CatalogService catalogService,
@Nullable HybridTimestamp lowWatermark) {
+ if (lowWatermark == null) {
+ return List.of();
+ }
+
+ int earliestCatalogVersion = catalogService.earliestCatalogVersion();
+ int lwmCatalogVersion =
catalogService.activeCatalogVersion(lowWatermark.longValue());
+
+ var tableIds = catalogService.tables(lwmCatalogVersion).stream()
Review Comment:
This `var` usage is not allowed by the code style.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java:
##########
@@ -81,4 +86,68 @@ public static int
findStartBuildingIndexCatalogVersion(CatalogService catalogSer
BUILDING, indexId, fromCatalogVersionIncluded,
latestCatalogVersion
));
}
+
+ /**
+ * Collects a list of tables that were removed from the catalog and should
have been dropped due to a low watermark (if the catalog
+ * version in which the table was removed is less than or equal to the
active catalog version of the low watermark).
+ *
+ * @param catalogService Catalog service.
+ * @param lowWatermark Low watermark, {@code null} if it has never been
updated.
+ */
+ // TODO: IGNITE-21771 Process or check catalog compaction
+ static List<DroppedTableInfo> droppedTables(CatalogService catalogService,
@Nullable HybridTimestamp lowWatermark) {
+ if (lowWatermark == null) {
+ return List.of();
+ }
+
+ int earliestCatalogVersion = catalogService.earliestCatalogVersion();
+ int lwmCatalogVersion =
catalogService.activeCatalogVersion(lowWatermark.longValue());
+
+ var tableIds = catalogService.tables(lwmCatalogVersion).stream()
+ .map(CatalogObjectDescriptor::id)
+ .collect(toSet());
+
+ var res = new ArrayList<DroppedTableInfo>();
+
+ for (int catalogVersion = lwmCatalogVersion - 1; catalogVersion >=
earliestCatalogVersion; catalogVersion--) {
+ int finalCatalogVersion = catalogVersion;
+
+ catalogService.tables(catalogVersion).stream()
+ .map(CatalogObjectDescriptor::id)
+ .filter(tableIds::add)
Review Comment:
I also really don't like predicates that have side effects (`tableIds::add`
mutates the set while filtering). Can we rewrite this part without using streams?
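
A minimal sketch of what a stream-free version could look like, assuming plain collections stand in for the catalog. `tablesByVersion` and the `int[] {tableId, removalVersion}` pairs are hypothetical substitutes for `CatalogService#tables` and `DroppedTableInfo`, not the real API:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class DroppedTablesLoopSketch {
    /**
     * Walks catalog versions backwards from the low watermark version.
     * tablesByVersion is a hypothetical stand-in for the catalog: version -> table IDs.
     * Each result element is {tableId, catalog version in which the table was removed}.
     */
    static List<int[]> droppedTables(Map<Integer, List<Integer>> tablesByVersion, int earliestVersion, int lwmVersion) {
        // Explicit HashSet: the set is mutated below, so its mutability must be guaranteed.
        Set<Integer> seenTableIds = new HashSet<>(tablesByVersion.get(lwmVersion));

        List<int[]> res = new ArrayList<>();

        for (int version = lwmVersion - 1; version >= earliestVersion; version--) {
            for (int tableId : tablesByVersion.get(version)) {
                // The mutation is now an explicit statement in the loop body, not hidden inside a predicate.
                if (seenTableIds.add(tableId)) {
                    res.add(new int[] {tableId, version + 1});
                }
            }
        }

        return res;
    }
}
```

The logic is the same as in the stream version; only the set mutation becomes visible at the call site.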
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java:
##########
@@ -81,4 +86,68 @@ public static int
findStartBuildingIndexCatalogVersion(CatalogService catalogSer
BUILDING, indexId, fromCatalogVersionIncluded,
latestCatalogVersion
));
}
+
+ /**
+ * Collects a list of tables that were removed from the catalog and should
have been dropped due to a low watermark (if the catalog
+ * version in which the table was removed is less than or equal to the
active catalog version of the low watermark).
+ *
+ * @param catalogService Catalog service.
+ * @param lowWatermark Low watermark, {@code null} if it has never been
updated.
+ */
+ // TODO: IGNITE-21771 Process or check catalog compaction
+ static List<DroppedTableInfo> droppedTables(CatalogService catalogService,
@Nullable HybridTimestamp lowWatermark) {
+ if (lowWatermark == null) {
+ return List.of();
+ }
+
+ int earliestCatalogVersion = catalogService.earliestCatalogVersion();
+ int lwmCatalogVersion =
catalogService.activeCatalogVersion(lowWatermark.longValue());
+
+ var tableIds = catalogService.tables(lwmCatalogVersion).stream()
+ .map(CatalogObjectDescriptor::id)
+ .collect(toSet());
+
+ var res = new ArrayList<DroppedTableInfo>();
+
+ for (int catalogVersion = lwmCatalogVersion - 1; catalogVersion >=
earliestCatalogVersion; catalogVersion--) {
+ int finalCatalogVersion = catalogVersion;
+
+ catalogService.tables(catalogVersion).stream()
+ .map(CatalogObjectDescriptor::id)
+ .filter(tableIds::add)
+ .map(tableId -> new DroppedTableInfo(tableId,
finalCatalogVersion + 1))
+ .forEach(res::add);
+ }
+
+ return res;
+ }
+
+ static List<DroppedTableInfo> droppedTables0(CatalogService
catalogService, @Nullable HybridTimestamp lowWatermark) {
Review Comment:
Looks like you forgot to remove the previous version.
##########
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java:
##########
@@ -329,4 +340,41 @@ public static void startBuildingIndex(CatalogManager
catalogManager, int indexId
public static void makeIndexAvailable(CatalogManager catalogManager, int
indexId) {
assertThat(catalogManager.execute(MakeIndexAvailableCommand.builder().indexId(indexId).build()),
willCompleteSuccessfully());
}
+
+ /**
+ * Adds a column to the table to catalog.
+ *
+ * @param catalogManager Catalog manager.
+ * @param schemaName Schema name.
+ * @param tableName Table name.
+ * @param columnName Column name.
+ * @param columnType Column type.
+ */
+ public static void addColumnForTable(
Review Comment:
```suggestion
public static void addColumnToTable(
```
##########
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java:
##########
@@ -329,4 +340,41 @@ public static void startBuildingIndex(CatalogManager
catalogManager, int indexId
public static void makeIndexAvailable(CatalogManager catalogManager, int
indexId) {
assertThat(catalogManager.execute(MakeIndexAvailableCommand.builder().indexId(indexId).build()),
willCompleteSuccessfully());
}
+
+ /**
+ * Adds a column to the table to catalog.
Review Comment:
```suggestion
* Adds a column to the table in the catalog.
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java:
##########
@@ -81,4 +87,44 @@ public static int
findStartBuildingIndexCatalogVersion(CatalogService catalogSer
BUILDING, indexId, fromCatalogVersionIncluded,
latestCatalogVersion
));
}
+
+ /**
+ * Collects a list of tables that were removed from the catalog and should
have been dropped due to a low watermark (if the catalog
+ * version in which the table was removed is less than or equal to the
active catalog version of low watermark).
+ *
+ * @param catalogService Catalog service.
+ * @param lowWatermark Low watermark, {@code null} if it has never been
updated.
+ * @return Result is sorted by the catalog version in which the table was
removed from the catalog and by the table ID.
+ */
+ // TODO: IGNITE-21771 Process or check catalog compaction
+ static List<DroppedTableInfo> droppedTables(CatalogService catalogService,
@Nullable HybridTimestamp lowWatermark) {
+ if (lowWatermark == null) {
+ return List.of();
+ }
+
+ int earliestCatalogVersion = catalogService.earliestCatalogVersion();
+ int lwmCatalogVersion =
catalogService.activeCatalogVersion(lowWatermark.longValue());
+
+ Set<Integer> previousCatalogVersionTableIds = Set.of();
+
+ var res = new ArrayList<DroppedTableInfo>();
+
+ for (int catalogVersion = earliestCatalogVersion; catalogVersion <=
lwmCatalogVersion; catalogVersion++) {
Review Comment:
I like the new implementation
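
For readers of this thread, a rough sketch of how I read the forward-iterating approach. The loop body is not visible in the hunk above, so this is an assumption, and `tablesByVersion` plus the `int[] {tableId, removalVersion}` pairs are hypothetical stand-ins for the catalog and `DroppedTableInfo`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ForwardDroppedTablesSketch {
    /**
     * Compares each catalog version with the previous one; a table ID that disappears
     * in version v is reported as removed in v. The result is ordered by removal
     * version and then by table ID.
     */
    static List<int[]> droppedTables(Map<Integer, List<Integer>> tablesByVersion, int earliestVersion, int lwmVersion) {
        Set<Integer> previousIds = Set.of();

        List<int[]> res = new ArrayList<>();

        for (int version = earliestVersion; version <= lwmVersion; version++) {
            Set<Integer> currentIds = new HashSet<>(tablesByVersion.get(version));

            // IDs present in the previous version but missing from the current one.
            List<Integer> removedIds = new ArrayList<>();
            for (Integer tableId : previousIds) {
                if (!currentIds.contains(tableId)) {
                    removedIds.add(tableId);
                }
            }

            Collections.sort(removedIds);

            for (int tableId : removedIds) {
                res.add(new int[] {tableId, version});
            }

            previousIds = currentIds;
        }

        return res;
    }
}
```

No set is mutated while it is being queried, and the ordering promised in the `@return` javadoc falls out of the iteration order.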
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java:
##########
@@ -81,4 +86,68 @@ public static int
findStartBuildingIndexCatalogVersion(CatalogService catalogSer
BUILDING, indexId, fromCatalogVersionIncluded,
latestCatalogVersion
));
}
+
+ /**
+ * Collects a list of tables that were removed from the catalog and should
have been dropped due to a low watermark (if the catalog
+ * version in which the table was removed is less than or equal to the
active catalog version of the low watermark).
+ *
+ * @param catalogService Catalog service.
+ * @param lowWatermark Low watermark, {@code null} if it has never been
updated.
+ */
+ // TODO: IGNITE-21771 Process or check catalog compaction
+ static List<DroppedTableInfo> droppedTables(CatalogService catalogService,
@Nullable HybridTimestamp lowWatermark) {
+ if (lowWatermark == null) {
+ return List.of();
+ }
+
+ int earliestCatalogVersion = catalogService.earliestCatalogVersion();
+ int lwmCatalogVersion =
catalogService.activeCatalogVersion(lowWatermark.longValue());
+
+ var tableIds = catalogService.tables(lwmCatalogVersion).stream()
+ .map(CatalogObjectDescriptor::id)
+ .collect(toSet());
+
+ var res = new ArrayList<DroppedTableInfo>();
+
+ for (int catalogVersion = lwmCatalogVersion - 1; catalogVersion >=
earliestCatalogVersion; catalogVersion--) {
+ int finalCatalogVersion = catalogVersion;
+
+ catalogService.tables(catalogVersion).stream()
+ .map(CatalogObjectDescriptor::id)
+ .filter(tableIds::add)
Review Comment:
Strictly speaking, it is not good practice to modify a set returned by the
`toSet` collector (see its javadoc); it's recommended to instantiate a
`HashSet` directly.
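
For example, something along these lines (a sketch with plain `Integer` IDs; the class and method names are made up for illustration):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class MutableSetSketch {
    /** Keeps the stream but asks for a HashSet explicitly. */
    static Set<Integer> mutableIds(List<Integer> ids) {
        // Collectors.toSet() gives no guarantee about the type or mutability of the
        // returned Set, so the concrete implementation is requested here.
        return ids.stream().collect(Collectors.toCollection(HashSet::new));
    }

    /** Same result without a stream. */
    static Set<Integer> mutableIdsWithoutStream(List<Integer> ids) {
        return new HashSet<>(ids);
    }
}
```

Either variant returns a set that is safe to call `add` on later.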
##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java:
##########
@@ -544,6 +567,12 @@ private static List<Path> collectFilesOnly(Path start)
throws Exception {
}
}
+ private static List<Path> collectAllSubFileAndDirs(Path start) throws
Exception {
+ try (Stream<Path> fileStream = Files.find(start, Integer.MAX_VALUE,
(path, basicFileAttributes) -> !start.equals(path))) {
Review Comment:
Can we use `Files.walk` here instead? Does it return the root file as well?
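
As far as I know, `Files.walk` does include the starting path as the first element of the stream, so the filter would still be needed. A sketch of what I have in mind (the method name here is only illustrative):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class WalkSketch {
    /** Collects all files and directories under start, excluding start itself. */
    static List<Path> collectAllSubFilesAndDirs(Path start) throws IOException {
        // The stream holds open directory handles, hence try-with-resources.
        try (Stream<Path> stream = Files.walk(start)) {
            return stream
                    .filter(path -> !start.equals(path)) // Files.walk also emits the root.
                    .collect(Collectors.toList());
        }
    }
}
```

Compared to `Files.find`, this drops the `Integer.MAX_VALUE` depth argument and the unused `basicFileAttributes` parameter.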
##########
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java:
##########
@@ -329,4 +340,41 @@ public static void startBuildingIndex(CatalogManager
catalogManager, int indexId
public static void makeIndexAvailable(CatalogManager catalogManager, int
indexId) {
assertThat(catalogManager.execute(MakeIndexAvailableCommand.builder().indexId(indexId).build()),
willCompleteSuccessfully());
}
+
+ /**
+ * Adds a column to the table to catalog.
+ *
+ * @param catalogManager Catalog manager.
+ * @param schemaName Schema name.
+ * @param tableName Table name.
+ * @param columnName Column name.
+ * @param columnType Column type.
+ */
+ public static void addColumnForTable(
+ CatalogManager catalogManager,
+ String schemaName,
+ String tableName,
+ String columnName,
+ ColumnType columnType
+ ) {
+ CatalogCommand command = AlterTableAddColumnCommand.builder()
+ .schemaName(schemaName)
+ .tableName(tableName)
+ .columns(List.of(ColumnParams.builder().name(columnName).type(columnType).build()))
+ .build();
+
+ assertThat(catalogManager.execute(command),
willCompleteSuccessfully());
+ }
+
+ /**
+ * Adds a column to the table from {@link #createSimpleTable(CatalogManager, String)} to catalog.
Review Comment:
```suggestion
* Adds a column to the table from {@link #createSimpleTable(CatalogManager, String)} in the catalog.
```