alievmirza commented on code in PR #5468: URL: https://github.com/apache/ignite-3/pull/5468#discussion_r2011830065
########## modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSchemaDescriptorSerializers.java: ########## @@ -57,13 +60,14 @@ public CatalogSchemaDescriptor readFrom(CatalogObjectDataInput input) throws IOE int id = input.readVarIntAsInt(); String name = input.readUTF(); - long updateToken = input.readVarInt(); + long updateTimestampLong = input.readVarInt(); Review Comment: The same question as for `HashIndexDescriptorSerializerV2` ########## modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZoneMetaStorageCompactionTest.java: ########## @@ -130,6 +141,42 @@ public void testCompaction() throws InterruptedException { assertEquals(dataNodesBeforeNodeStop, dataNodes(ignite, zoneId, beforeNodesStop)); } + @Test + public void testCompactionDuringRebalancing() throws InterruptedException { + sql("create zone " + ZONE_NAME + " with partitions=1, storage_profiles='" + DEFAULT_STORAGE_PROFILE + "'" + + ", data_nodes_auto_adjust_scale_down=0"); + sql("create table " + TABLE_NAME + " (id int primary key) zone " + ZONE_NAME); + sql("insert into " + TABLE_NAME + " values (1)"); + sql("alter zone " + ZONE_NAME + " set replicas=2"); + + IgniteImpl ignite = unwrapIgniteImpl(CLUSTER.node(0)); + + int zoneId = ignite.catalogManager().activeCatalog(ignite.clock().now().longValue()).zone(ZONE_NAME).id(); + int tableId = ignite.catalogManager().activeCatalog(ignite.clock().now().longValue()).tables() + .stream() + .filter(t -> t.name().equals(TABLE_NAME)) + .findFirst() + .orElseThrow() + .id(); + + MetaStorageManager metaStorageManager = ignite.metaStorageManager(); + + // Wait for the rebalancing to finish. + assertTrue(waitForCondition(() -> { Review Comment: let's check that stable has changed, something like ``` assertValueInStorage( metaStorageManager, stablePartAssignmentsKey(partId), (v) -> Assignments.fromBytes(v).nodes() .stream().map(Assignment::consistentId).collect(Collectors.toSet()), Set.of(node(0).name()), TIMEOUT_MILLIS ); ``` ########## modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSortedIndexDescriptorSerializers.java: ########## @@ -50,21 +53,22 @@ public SortedIndexDescriptorSerializerV1(CatalogEntrySerializerProvider serializ public CatalogSortedIndexDescriptor readFrom(CatalogObjectDataInput input) throws IOException { int id = input.readVarIntAsInt(); String name = input.readUTF(); - long updateToken = input.readVarInt(); + long updateTimestampLong = input.readVarInt(); Review Comment: the same question as for `HashIndexDescriptorSerializerV2` ########## modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java: ########## @@ -597,6 +597,9 @@ public static CompletableFuture<Void> handleReduceChanged( ByteArray changeTriggerKey = ZoneRebalanceUtil.pendingChangeTriggerKey(partId); byte[] rev = ByteUtils.longToBytesKeepingOrder(entry.revision()); + ByteArray changeTimestampKey = ZoneRebalanceUtil.pendingChangeTriggerKey(partId); Review Comment: Why did you introduce new key? can't we reuse ` changeTriggerKey`? ########## modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptorSerializers.java: ########## @@ -44,7 +48,8 @@ public ZoneDescriptorSerializerV1(CatalogEntrySerializerProvider serializers) { public CatalogZoneDescriptor readFrom(CatalogObjectDataInput input) throws IOException { int id = input.readVarIntAsInt(); String name = input.readUTF(); - long updateToken = input.readVarInt(); + long updateTimestampLong = input.readVarInt(); Review Comment: the same question as for `HashIndexDescriptorSerializerV2` ########## modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptorSerializers.java: ########## @@ -49,7 +52,8 @@ public TableDescriptorSerializerV1(CatalogEntrySerializerProvider serializers) { public CatalogTableDescriptor readFrom(CatalogObjectDataInput input) throws IOException { int id = input.readVarIntAsInt(); String name = input.readUTF(); - long updateToken = input.readVarInt(); + long updateTimestampLong = input.readVarInt(); Review Comment: the same question as for `HashIndexDescriptorSerializerV2` ########## modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptorSerializers.java: ########## @@ -53,14 +56,15 @@ public CatalogSystemViewDescriptor readFrom(CatalogObjectDataInput input) throws int id = input.readVarIntAsInt(); int schemaId = input.readVarIntAsInt(); String name = input.readUTF(); - long updateToken = input.readVarInt(); + long updateTimestampLong = input.readVarInt(); Review Comment: the same question as for `HashIndexDescriptorSerializerV2` ########## modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java: ########## @@ -280,16 +280,12 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) { * {@link IllegalArgumentException} if causalityToken or zoneId is not valid. * {@link DistributionZoneNotFoundException} if the zone with the provided zoneId does not exist. * - * @param causalityToken Causality token. + * @param timestamp Timestamp. * @param catalogVersion Catalog version. * @param zoneId Zone id. * @return The future with data nodes for the zoneId. */ - public CompletableFuture<Set<String>> dataNodes(long causalityToken, int catalogVersion, int zoneId) { - if (causalityToken < 1) { - throw new IllegalArgumentException("causalityToken must be greater then zero [causalityToken=" + causalityToken + '"'); - } - + public CompletableFuture<Set<String>> dataNodes(HybridTimestamp timestamp, int catalogVersion, int zoneId) { Review Comment: javadoc must be updated accordingly ########## modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogHashIndexDescriptorSerializers.java: ########## @@ -43,21 +46,22 @@ static class HashIndexDescriptorSerializerV1 implements CatalogObjectSerializer< public CatalogHashIndexDescriptor readFrom(CatalogObjectDataInput input) throws IOException { int id = input.readVarIntAsInt(); String name = input.readUTF(); - long updateToken = input.readVarInt(); + long updateTimestampLong = input.readVarInt(); Review Comment: I do not understand, why do you change `HashIndexDescriptorSerializerV1`? You brake compatibility with this changes. As far as I can understand, you only need to change `HashIndexDescriptorSerializerV2` But please check with sql folks regarding the correct solution. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org