lowka commented on code in PR #4256:
URL: https://github.com/apache/ignite-3/pull/4256#discussion_r1773597316
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -387,24 +435,61 @@ private CompletableFuture<Boolean>
tryCompactCatalog(Catalog catalog, LogicalTop
}
return
catalogManagerFacade.compactCatalog(catalog.version());
+ }).whenComplete((res, ex) -> {
+ if (ex != null) {
+ LOG.warn("Catalog compaction has failed
[timestamp={}].", ex, minRequiredTime);
+ } else {
+ if (res) {
+ LOG.info("Catalog compaction completed
successfully [timestamp={}].", minRequiredTime);
+ } else {
+ LOG.info("Catalog compaction skipped
[timestamp={}].", minRequiredTime);
+ }
+ }
});
}
- private CompletableFuture<Set<String>> requiredNodes(Catalog catalog) {
+ private CompletableFuture<Pair<Boolean, Set<String>>> validatePartitions(
+ Catalog catalog,
+ Map<Integer, BitSet> localPartitions,
+ Map<String, Map<Integer, BitSet>> remotePartitions
+ ) {
HybridTimestamp nowTs = clockService.now();
- Set<String> required = Collections.newSetFromMap(new
ConcurrentHashMap<>());
+ ConcurrentHashMap<String, CurrentPartitions> currentPartitionsPerNode
= new ConcurrentHashMap<>();
return CompletableFutures.allOf(catalog.tables().stream()
- .map(table -> collectRequiredNodes(catalog, table, required,
nowTs))
+ .map(table -> collectRequiredNodes(catalog, table, nowTs,
currentPartitionsPerNode))
.collect(Collectors.toList())
- ).thenApply(ignore -> required);
+ ).thenApply(ignore -> {
+
+ Map<String, Map<Integer, BitSet>> all = new
HashMap<>(remotePartitions);
+ all.put(localNodeName, localPartitions);
+
+ Set<String> required = currentPartitionsPerNode.keySet();
+
+ for (Map.Entry<String, Map<Integer, BitSet>> node :
all.entrySet()) {
+ String nodeId = node.getKey();
+ Map<Integer, BitSet> tables = node.getValue();
+ CurrentPartitions partitionsPerNode =
currentPartitionsPerNode.get(nodeId);
Review Comment:
Fixed.
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -387,24 +435,61 @@ private CompletableFuture<Boolean>
tryCompactCatalog(Catalog catalog, LogicalTop
}
return
catalogManagerFacade.compactCatalog(catalog.version());
+ }).whenComplete((res, ex) -> {
+ if (ex != null) {
+ LOG.warn("Catalog compaction has failed
[timestamp={}].", ex, minRequiredTime);
+ } else {
+ if (res) {
+ LOG.info("Catalog compaction completed
successfully [timestamp={}].", minRequiredTime);
+ } else {
+ LOG.info("Catalog compaction skipped
[timestamp={}].", minRequiredTime);
+ }
+ }
});
}
- private CompletableFuture<Set<String>> requiredNodes(Catalog catalog) {
+ private CompletableFuture<Pair<Boolean, Set<String>>> validatePartitions(
+ Catalog catalog,
+ Map<Integer, BitSet> localPartitions,
+ Map<String, Map<Integer, BitSet>> remotePartitions
+ ) {
HybridTimestamp nowTs = clockService.now();
- Set<String> required = Collections.newSetFromMap(new
ConcurrentHashMap<>());
+ ConcurrentHashMap<String, CurrentPartitions> currentPartitionsPerNode
= new ConcurrentHashMap<>();
return CompletableFutures.allOf(catalog.tables().stream()
- .map(table -> collectRequiredNodes(catalog, table, required,
nowTs))
+ .map(table -> collectRequiredNodes(catalog, table, nowTs,
currentPartitionsPerNode))
.collect(Collectors.toList())
- ).thenApply(ignore -> required);
+ ).thenApply(ignore -> {
+
+ Map<String, Map<Integer, BitSet>> all = new
HashMap<>(remotePartitions);
+ all.put(localNodeName, localPartitions);
+
+ Set<String> required = currentPartitionsPerNode.keySet();
+
+ for (Map.Entry<String, Map<Integer, BitSet>> node :
all.entrySet()) {
+ String nodeId = node.getKey();
+ Map<Integer, BitSet> tables = node.getValue();
+ CurrentPartitions partitionsPerNode =
currentPartitionsPerNode.get(nodeId);
+
+ if (partitionsPerNode == null) {
+ return new Pair<>(false, required);
+ } else {
+ Map<Integer, BitSet> data = partitionsPerNode.data();
+ if (!data.equals(tables)) {
Review Comment:
Done. It now checks for inclusion.
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -387,24 +435,61 @@ private CompletableFuture<Boolean>
tryCompactCatalog(Catalog catalog, LogicalTop
}
return
catalogManagerFacade.compactCatalog(catalog.version());
+ }).whenComplete((res, ex) -> {
+ if (ex != null) {
+ LOG.warn("Catalog compaction has failed
[timestamp={}].", ex, minRequiredTime);
+ } else {
+ if (res) {
+ LOG.info("Catalog compaction completed
successfully [timestamp={}].", minRequiredTime);
+ } else {
+ LOG.info("Catalog compaction skipped
[timestamp={}].", minRequiredTime);
+ }
+ }
});
}
- private CompletableFuture<Set<String>> requiredNodes(Catalog catalog) {
+ private CompletableFuture<Pair<Boolean, Set<String>>> validatePartitions(
+ Catalog catalog,
+ Map<Integer, BitSet> localPartitions,
+ Map<String, Map<Integer, BitSet>> remotePartitions
+ ) {
HybridTimestamp nowTs = clockService.now();
- Set<String> required = Collections.newSetFromMap(new
ConcurrentHashMap<>());
+ ConcurrentHashMap<String, CurrentPartitions> currentPartitionsPerNode
= new ConcurrentHashMap<>();
return CompletableFutures.allOf(catalog.tables().stream()
- .map(table -> collectRequiredNodes(catalog, table, required,
nowTs))
+ .map(table -> collectRequiredNodes(catalog, table, nowTs,
currentPartitionsPerNode))
.collect(Collectors.toList())
- ).thenApply(ignore -> required);
+ ).thenApply(ignore -> {
+
+ Map<String, Map<Integer, BitSet>> all = new
HashMap<>(remotePartitions);
+ all.put(localNodeName, localPartitions);
+
+ Set<String> required = currentPartitionsPerNode.keySet();
+
+ for (Map.Entry<String, Map<Integer, BitSet>> node :
all.entrySet()) {
+ String nodeId = node.getKey();
+ Map<Integer, BitSet> tables = node.getValue();
+ CurrentPartitions partitionsPerNode =
currentPartitionsPerNode.get(nodeId);
+
+ if (partitionsPerNode == null) {
Review Comment:
Fixed.
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -387,24 +435,61 @@ private CompletableFuture<Boolean>
tryCompactCatalog(Catalog catalog, LogicalTop
}
return
catalogManagerFacade.compactCatalog(catalog.version());
+ }).whenComplete((res, ex) -> {
+ if (ex != null) {
+ LOG.warn("Catalog compaction has failed
[timestamp={}].", ex, minRequiredTime);
+ } else {
+ if (res) {
+ LOG.info("Catalog compaction completed
successfully [timestamp={}].", minRequiredTime);
+ } else {
+ LOG.info("Catalog compaction skipped
[timestamp={}].", minRequiredTime);
+ }
+ }
});
}
- private CompletableFuture<Set<String>> requiredNodes(Catalog catalog) {
+ private CompletableFuture<Pair<Boolean, Set<String>>> validatePartitions(
+ Catalog catalog,
+ Map<Integer, BitSet> localPartitions,
+ Map<String, Map<Integer, BitSet>> remotePartitions
+ ) {
HybridTimestamp nowTs = clockService.now();
- Set<String> required = Collections.newSetFromMap(new
ConcurrentHashMap<>());
+ ConcurrentHashMap<String, CurrentPartitions> currentPartitionsPerNode
= new ConcurrentHashMap<>();
return CompletableFutures.allOf(catalog.tables().stream()
- .map(table -> collectRequiredNodes(catalog, table, required,
nowTs))
+ .map(table -> collectRequiredNodes(catalog, table, nowTs,
currentPartitionsPerNode))
.collect(Collectors.toList())
- ).thenApply(ignore -> required);
+ ).thenApply(ignore -> {
+
+ Map<String, Map<Integer, BitSet>> all = new
HashMap<>(remotePartitions);
+ all.put(localNodeName, localPartitions);
+
+ Set<String> required = currentPartitionsPerNode.keySet();
Review Comment:
Renamed for clarity.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org