Copilot commented on code in PR #7463:
URL: https://github.com/apache/ignite-3/pull/7463#discussion_r2720000311
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelatedNestedLoopJoinRule.java:
##########
@@ -132,7 +133,8 @@ public RexNode visitCorrelVariable(RexCorrelVariable variable) {
right,
rel.getCondition(),
correlationIds,
- joinType
+ joinType,
+ ImmutableBitSet.of()
);
Review Comment:
`requiredColumns` is always passed as an empty bitset here. With the new
correlation propagation approach (storing individual correlated field values),
an empty `requiredColumns` means no values are put into the evaluation context,
so any `RexFieldAccess` on correlates in the right-side filter will evaluate to
null/incorrectly. Compute and pass the set of left-side field indexes actually
referenced by correlated variables in the pushed filter (e.g., derive from
`rel.getCondition()` input refs < leftFieldCount).
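A minimal sketch of that derivation, using only `java.util.BitSet` rather than Calcite's `ImmutableBitSet`. It assumes the input-ref indexes used by the condition have already been collected, and that the condition addresses the concatenated left-then-right row, so any index below `leftFieldCount` belongs to the left side. The class and method names are hypothetical and not part of the PR.

```java
import java.util.BitSet;

final class RequiredColumns {
    private RequiredColumns() {
    }

    /**
     * Keeps only the referenced indexes that fall on the left input.
     *
     * @param referencedInputs Input-ref indexes found in the join condition (assumed pre-collected).
     * @param leftFieldCount Number of fields in the left input.
     */
    static BitSet leftColumns(int[] referencedInputs, int leftFieldCount) {
        BitSet required = new BitSet(leftFieldCount);

        for (int idx : referencedInputs) {
            // Indexes >= leftFieldCount address the right input and are not correlated values.
            if (idx >= 0 && idx < leftFieldCount) {
                required.set(idx);
            }
        }

        return required;
    }
}
```

In the actual rule, the indexes would come from walking `rel.getCondition()`, and the result would be converted to whatever bitset type the node constructor expects.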
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlEvaluationContext.java:
##########
@@ -39,5 +39,5 @@ public interface SqlEvaluationContext<RowT> extends DataContext {
RowFactoryFactory<RowT> rowFactoryFactory();
/** Returns row representing correlation source by given correlation id. */
Review Comment:
The Javadoc says this method "Returns row representing correlation source",
but after this change the API is used to retrieve a single correlated field
value by `(id, idx)`. Please update the documentation (and consider whether the
generic return type should remain `RowT` or be `Object`) to reflect the actual
contract.
```suggestion
/**
* Returns the value of a correlated variable identified by the given correlation id and field index.
*
* @param id Correlation identifier.
* @param idx Index of the field within the correlation source.
* @return The value of the requested correlated field, represented as {@code RowT}.
*/
```
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java:
##########
@@ -101,6 +106,7 @@ public CorrelatedNestedLoopJoinNode(
this.correlationIds = new ArrayList<>(correlationIds);
Review Comment:
`correlationIds` is accepted as a `Set` and then converted to an `ArrayList`
using iteration order. The join logic later associates `correlationIds.get(i)`
with the i-th row in the left buffer; if callers pass an unordered set (e.g.,
`HashSet` in `CorrelatedNestedLoopJoinRule`), the correlationId↔row mapping
becomes nondeterministic and can produce incorrect results for batched
correlates. Make the order deterministic (e.g., require/accept a
`List<CorrelationId>`, have callers pass a `LinkedHashSet`, or sort defensively).
```suggestion
this.correlationIds = new ArrayList<>(correlationIds);
Collections.sort(this.correlationIds);
```
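To illustrate the two options, here is a small stand-alone sketch that uses `Integer` as a stand-in for `CorrelationId` (an assumption made only to keep the example free of Calcite dependencies). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

final class CorrelationOrder {
    private CorrelationOrder() {
    }

    /** Copies ids in the set's iteration order; deterministic only if the set itself is ordered. */
    static List<Integer> copyInIterationOrder(Set<Integer> ids) {
        return new ArrayList<>(ids);
    }

    /** Copies ids and sorts them, so the result does not depend on the set implementation. */
    static List<Integer> copySorted(Set<Integer> ids) {
        List<Integer> res = new ArrayList<>(ids);

        Collections.sort(res);

        return res;
    }
}
```

Passing a `LinkedHashSet` to `copyInIterationOrder` keeps the caller's insertion order, while `copySorted` yields the same list for a `HashSet` and a `LinkedHashSet` with equal contents. Note that sorting fixes the order but does not necessarily reproduce the order in which the caller created the ids.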
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java:
##########
@@ -77,20 +80,22 @@ private enum State {
/**
* Creates CorrelatedNestedLoopJoin node.
*
- * @param ctx Execution context.
+ * @param ctx Execution context.
* @param cond Join expression.
* @param correlationIds Set of collections ids.
* @param joinType Join rel type.
* @param rightRowFactory Right row factory.
* @param joinProjection Output row factory.
+ * @param requiredColumns TODO Required columns.
Review Comment:
The Javadoc for `requiredColumns` is currently a TODO and doesn’t describe
the required behavior. Please document what this bitset represents (e.g.,
left-side column indexes that must be captured into shared state for correlated
expression evaluation) and how it is used.
```suggestion
* @param requiredColumns Bit set of zero-based indexes of left input columns whose values must be
* captured into the correlation context and made available to evaluate correlated
* expressions on the right side for each left row.
##########
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinExecutionTest.java:
##########
@@ -82,7 +83,8 @@ public void testCorrelatedNestedLoopJoin(JoinRelType joinType) {
Set.of(new CorrelationId(0)),
joinType,
ctx.rowFactoryFactory().create(convertStructuredType(rightType)),
- identityProjection()
+ identityProjection(),
+ ImmutableBitSet.of()
);
Review Comment:
The test passes `ImmutableBitSet.of()` for `requiredColumns`, so it never
exercises the new correlation propagation path in `prepareCorrelations()`.
Consider adding/adjusting a test case that sets `requiredColumns` to a
non-empty set and asserts that correlated values are actually visible to the
right side (e.g., via an expression/predicate that reads correlates from
`SqlEvaluationContext`).
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java:
##########
@@ -22,25 +22,17 @@
import org.apache.calcite.linq4j.function.Function1;
import org.apache.calcite.linq4j.tree.BlockBuilder;
import org.apache.calcite.linq4j.tree.Expression;
-import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
import org.apache.ignite.internal.sql.engine.exec.exp.RexToLixTranslator.InputGetter;
-import org.apache.ignite.internal.sql.engine.util.IgniteMethod;
class CorrelatesBuilder extends RexShuttle {
- private final BlockBuilder builder;
-
private final Expression ctx;
- private final Expression hnd;
-
- private Map<String, FieldGetter> correlates;
+ private Map<String, InputGetter> correlates;
CorrelatesBuilder(BlockBuilder builder, Expression ctx, Expression hnd) {
Review Comment:
`CorrelatesBuilder` no longer uses the `builder` and `hnd` constructor
parameters (they’re ignored). Please remove these unused parameters and update
call sites accordingly to avoid confusion and keep the API minimal.
```suggestion
CorrelatesBuilder(Expression ctx) {
```
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverter.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.math.BigDecimal;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.sql.engine.message.SharedStateMessage;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
+import org.apache.ignite.internal.sql.engine.message.field.SingleFieldMessage;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converter between {@link SharedState} and {@link SharedStateMessage}.
+ */
+public class SharedStateMessageConverter {
+ /** Message factory. */
+ private static final SqlQueryMessagesFactory MESSAGE_FACTORY = new SqlQueryMessagesFactory();
+
+ static @Nullable SharedStateMessage toMessage(@Nullable SharedState state) {
+ if (state == null) {
+ return null;
+ }
+
+ Long2ObjectMap<Object> correlations = state.correlations();
+ Map<Long, NetworkMessage> result = IgniteUtils.newHashMap(correlations.size());
+
+ for (Long2ObjectMap.Entry<Object> entry : correlations.long2ObjectEntrySet()) {
+ SingleFieldMessage<?> msg = toSingleFieldMessage(entry.getValue());
+
+ result.put(entry.getLongKey(), msg);
+ }
+
+ return MESSAGE_FACTORY.sharedStateMessage()
+ .sharedState(result)
+ .build();
+ }
+
+ static SharedState fromMessage(SharedStateMessage sharedStateMessage) {
+ int size = sharedStateMessage.sharedState().size();
+ Long2ObjectMap<Object> correlations = new Long2ObjectOpenHashMap<>(size);
+
+ for (Map.Entry<Long, NetworkMessage> e : sharedStateMessage.sharedState().entrySet()) {
+ SingleFieldMessage<Object> msg = ((SingleFieldMessage<Object>) e.getValue());
+
Review Comment:
`fromMessage` blindly casts each map value to `SingleFieldMessage` without
validation. Since this data comes from the network, a malformed/corrupted
message could cause a `ClassCastException` and fail the fragment unexpectedly.
Consider validating `instanceof SingleFieldMessage` (and possibly validating
allowed message types) and throwing a clear exception when an unexpected value
is encountered.
```suggestion
NetworkMessage networkMessage = e.getValue();
if (!(networkMessage instanceof SingleFieldMessage)) {
    throw new IllegalArgumentException(
            "Unexpected message type in shared state. Expected SingleFieldMessage but was "
                    + (networkMessage == null ? "null" : networkMessage.getClass().getName())
                    + " for key " + e.getKey()
    );
}
SingleFieldMessage<?> msg = (SingleFieldMessage<?>) networkMessage;
```
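The same validate-before-cast pattern can be tried in isolation. The sketch below replaces `NetworkMessage` and `SingleFieldMessage` with hypothetical stand-ins (`Message` and `FieldMessage`), so it only illustrates the control flow, not the Ignite messaging API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SharedStateDecoding {
    private SharedStateDecoding() {
    }

    /** Hypothetical stand-in for NetworkMessage. */
    interface Message {
    }

    /** Hypothetical stand-in for SingleFieldMessage: wraps a single value. */
    static final class FieldMessage implements Message {
        final Object value;

        FieldMessage(Object value) {
            this.value = value;
        }
    }

    /** Unwraps every entry, rejecting anything that is not a FieldMessage. */
    static Map<Long, Object> decode(Map<Long, Message> wire) {
        Map<Long, Object> result = new LinkedHashMap<>();

        for (Map.Entry<Long, Message> e : wire.entrySet()) {
            Message m = e.getValue();

            // Check the type first so a malformed entry fails with a descriptive error.
            if (!(m instanceof FieldMessage)) {
                throw new IllegalArgumentException(
                        "Unexpected message type in shared state for key " + e.getKey() + ": "
                                + (m == null ? "null" : m.getClass().getName()));
            }

            result.put(e.getKey(), ((FieldMessage) m).value);
        }

        return result;
    }
}
```

Whether `IllegalArgumentException` or a project-specific exception type is more appropriate depends on how the fragment executor reports failures, which this sketch does not model.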
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java:
##########
@@ -17,41 +17,55 @@
package org.apache.ignite.internal.sql.engine.exec;
-import static org.apache.ignite.internal.sql.engine.util.Commons.checkRange;
-
-import java.io.Serializable;
-import org.apache.ignite.internal.sql.engine.util.Commons;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
/**
* This class represents the volatile state that may be propagated from parent to its children
* during rewind.
*/
-public class SharedState implements Serializable {
- private static final long serialVersionUID = 42L;
+public class SharedState {
+ private final Long2ObjectMap<Object> correlations;
+
+ public SharedState() {
+ this(new Long2ObjectOpenHashMap<>());
+ }
- private Object[] correlations = new Object[16];
+ SharedState(Long2ObjectMap<Object> correlations) {
+ this.correlations = correlations;
+ }
/**
* Gets correlated value.
*
- * @param id Correlation ID.
+ * @param corrId Correlation ID.
* @return Correlated value.
*/
- public Object correlatedVariable(int id) {
- checkRange(correlations, id);
+ public Object correlatedVariable(int corrId, int fieldIdx) {
+ long key = packToLong(corrId, fieldIdx);
- return correlations[id];
+ return correlations.get(key);
}
/**
* Sets correlated value.
*
- * @param id Correlation ID.
+ * @param corrId Correlation ID.
+ * @param fieldIdx Field index.
* @param value Correlated value.
*/
- public void correlatedVariable(int id, Object value) {
- correlations = Commons.ensureCapacity(correlations, id + 1);
+ public void correlatedVariable(int corrId, int fieldIdx, Object value) {
+ long key = packToLong(corrId, fieldIdx);
+
+ correlations.put(key, value);
+ }
+
+ Long2ObjectMap<Object> correlations() {
+ return Long2ObjectMaps.unmodifiable(correlations);
+ }
- correlations[id] = value;
+ private static long packToLong(int corrId, int fieldIdx) {
+ return ((((long) corrId) << 32 | fieldIdx));
Review Comment:
`packToLong` should mask `fieldIdx` to avoid sign-extension issues when
packing into a `long` key (e.g., use `fieldIdx & 0xFFFF_FFFFL`). This makes the
key stable even if negative values ever appear and matches common packing
patterns.
```suggestion
return ((corrId & 0xFFFF_FFFFL) << 32) | (fieldIdx & 0xFFFF_FFFFL);
```
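The difference between the two packings shows up only for negative inputs. A small stand-alone sketch follows; the class and the unpacking helpers are hypothetical additions for illustration, and the two `pack*` bodies mirror the expression in the diff and the one in the suggestion.

```java
final class CorrelationKeys {
    private CorrelationKeys() {
    }

    /** Packing as in the diff: fieldIdx is widened to long with sign extension before the OR. */
    static long packUnmasked(int corrId, int fieldIdx) {
        return ((long) corrId) << 32 | fieldIdx;
    }

    /** Packing as in the suggestion: both halves are masked to their low 32 bits. */
    static long packMasked(int corrId, int fieldIdx) {
        return ((corrId & 0xFFFF_FFFFL) << 32) | (fieldIdx & 0xFFFF_FFFFL);
    }

    /** Recovers the correlation id from the high 32 bits. */
    static int corrId(long key) {
        return (int) (key >>> 32);
    }

    /** Recovers the field index from the low 32 bits. */
    static int fieldIdx(long key) {
        return (int) key;
    }
}
```

For non-negative arguments both methods return the same key. With `fieldIdx == -1`, the unmasked variant sets all 64 bits, so every `corrId` collapses to the same key, whereas the masked variant keeps the keys distinct and round-trips through the two accessors.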