xtern commented on code in PR #1501:
URL: https://github.com/apache/ignite-3/pull/1501#discussion_r1094473567
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/ColocationGroup.java:
##########
@@ -151,18 +149,17 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE
+ "Replicated query parts are not co-located on all nodes");
}
- List<List<String>> assignments;
+ List<List<NodeWithTerm>> assignments;
if (this.assignments == null || other.assignments == null) {
assignments = firstNotNull(this.assignments, other.assignments);
if (assignments != null && nodeNames != null) {
- Set<String> filter = new HashSet<>(nodeNames);
- List<List<String>> assignments0 = new ArrayList<>(assignments.size());
+ List<List<NodeWithTerm>> assignments0 = new ArrayList<>(assignments.size());
for (int i = 0; i < assignments.size(); i++) {
- List<String> assignment = Commons.intersect(filter, assignments.get(i));
+ List<NodeWithTerm> assignment = filterByNodeNames(assignments.get(i), new HashSet<>(nodeNames));
Review Comment:
Fixed, thanks.
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/ColocationGroup.java:
##########
@@ -194,30 +194,101 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE
return new ColocationGroup(sourceIds, nodeNames, assignments);
}
+ private List<NodeWithTerm> intersect(List<NodeWithTerm> assignment0, List<NodeWithTerm> assignment1, Predicate<String> filter, int p)
+ throws ColocationMappingException {
+ if (assignment0.size() == 1 && assignment1.size() == 1) {
+ NodeWithTerm first = assignment0.get(0);
+ NodeWithTerm second = assignment1.get(0);
+
+ if (filter.test(first.name()) && first.name().equals(second.name())) {
+ validateTerm(first, second, p);
+
+ return assignment0;
+ }
+
+ return Collections.emptyList();
+ } else {
+ if (assignment0.size() > assignment1.size()) {
+ List<NodeWithTerm> tmp = assignment0;
+ assignment0 = assignment1;
+ assignment1 = tmp;
+ }
+
+ List<NodeWithTerm> assignment = new ArrayList<>();
+
+ Map<String, NodeWithTerm> hashedByNameAssignment =
+ assignment1.stream().collect(Collectors.toMap(NodeWithTerm::name, nodeWithTerm -> nodeWithTerm));
+
+ for (NodeWithTerm first : assignment0) {
+ if (!filter.test(first.name())) {
+ continue;
+ }
+
+ NodeWithTerm second = hashedByNameAssignment.get(first.name());
+
+ if (second == null) {
+ continue;
+ }
+
+ validateTerm(first, second, p);
+
+ assignment.add(first);
+ }
+
+ return assignment;
+ }
+ }
+
+ private void validateTerm(NodeWithTerm first, NodeWithTerm second, int partId) throws ColocationMappingException {
+ if (first.term() != second.term()) {
+ throw new ColocationMappingException("Primary replica term has been changed during mapping ["
+ + "node=" + first.name()
+ + ", expectedTerm=" + first.term()
+ + ", actualTerm=" + second.term()
+ + ", part=" + partId
+ + ']');
+ }
+ }
+
+ private List<NodeWithTerm> filterByNodeNames(List<NodeWithTerm> assignment, Set<String> filter) {
+ if (nullOrEmpty(assignment) || nullOrEmpty(filter)) {
+ return Collections.emptyList();
+ }
+
+ List<NodeWithTerm> res = new ArrayList<>();
Review Comment:
Done, thanks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]