Copilot commented on code in PR #7463:
URL: https://github.com/apache/ignite-3/pull/7463#discussion_r2720000311


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelatedNestedLoopJoinRule.java:
##########
@@ -132,7 +133,8 @@ public RexNode visitCorrelVariable(RexCorrelVariable 
variable) {
                 right,
                 rel.getCondition(),
                 correlationIds,
-                joinType
+                joinType,
+                ImmutableBitSet.of()
         );

Review Comment:
   `requiredColumns` is always passed as an empty bitset here. With the new 
correlation propagation approach (storing individual correlated field values), 
an empty `requiredColumns` means no values are put into the evaluation context, 
so any `RexFieldAccess` on correlates in the right-side filter will evaluate to 
null/incorrectly. Compute and pass the set of left-side field indexes actually 
referenced by correlated variables in the pushed filter (e.g., derive from 
`rel.getCondition()` input refs < leftFieldCount).



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlEvaluationContext.java:
##########
@@ -39,5 +39,5 @@ public interface SqlEvaluationContext<RowT> extends 
DataContext {
     RowFactoryFactory<RowT> rowFactoryFactory();
 
     /** Returns row representing correlation source by given correlation id. */

Review Comment:
   The Javadoc says this method "Returns row representing correlation source", 
but after this change the API is used to retrieve a single correlated field 
value by `(id, idx)`. Please update the documentation (and consider whether the 
generic return type should remain `RowT` or be `Object`) to reflect the actual 
contract.
   ```suggestion
       /**
        * Returns the value of a correlated variable identified by the given 
correlation id and field index.
        *
        * @param id Correlation identifier.
        * @param idx Index of the field within the correlation source.
        * @return The value of the requested correlated field, represented as 
{@code RowT}.
        */
   ```



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java:
##########
@@ -101,6 +106,7 @@ public CorrelatedNestedLoopJoinNode(
         this.correlationIds = new ArrayList<>(correlationIds);

Review Comment:
   `correlationIds` is accepted as a `Set` and then converted to an `ArrayList` 
using iteration order. The join logic later associates `correlationIds.get(i)` 
with the i-th row in the left buffer; if callers pass an unordered set (e.g., 
`HashSet` in `CorrelatedNestedLoopJoinRule`), the correlationId↔row mapping 
becomes nondeterministic and can produce incorrect results for batched 
correlates. Preserve insertion order (e.g., require/accept a 
`List<CorrelationId>` or defensively sort/use `LinkedHashSet`).
   ```suggestion
           this.correlationIds = new ArrayList<>(correlationIds);
           Collections.sort(this.correlationIds);
   ```



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java:
##########
@@ -77,20 +80,22 @@ private enum State {
     /**
      * Creates CorrelatedNestedLoopJoin node.
      *
-     * @param ctx  Execution context.
+     * @param ctx Execution context.
      * @param cond Join expression.
      * @param correlationIds Set of collections ids.
      * @param joinType Join rel type.
      * @param rightRowFactory Right row factory.
      * @param joinProjection Output row factory.
+     * @param requiredColumns TODO Required columns.

Review Comment:
   The Javadoc for `requiredColumns` is currently a TODO and doesn’t describe 
the required behavior. Please document what this bitset represents (e.g., 
left-side column indexes that must be captured into shared state for correlated 
expression evaluation) and how it is used.
   ```suggestion
        * @param requiredColumns Bit set of zero-based indexes of left input 
columns whose values must be
        *         captured into the correlation context and made available to 
evaluate correlated
        *         expressions on the right side for each left row.
   ```



##########
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinExecutionTest.java:
##########
@@ -82,7 +83,8 @@ public void testCorrelatedNestedLoopJoin(JoinRelType 
joinType) {
                 Set.of(new CorrelationId(0)),
                 joinType,
                 
ctx.rowFactoryFactory().create(convertStructuredType(rightType)),
-                identityProjection()
+                identityProjection(),
+                ImmutableBitSet.of()
         );

Review Comment:
   The test passes `ImmutableBitSet.of()` for `requiredColumns`, so it never 
exercises the new correlation propagation path in `prepareCorrelations()`. 
Consider adding/adjusting a test case that sets `requiredColumns` to a 
non-empty set and asserts that correlated values are actually visible to the 
right side (e.g., via an expression/predicate that reads correlates from 
`SqlEvaluationContext`).



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java:
##########
@@ -22,25 +22,17 @@
 import org.apache.calcite.linq4j.function.Function1;
 import org.apache.calcite.linq4j.tree.BlockBuilder;
 import org.apache.calcite.linq4j.tree.Expression;
-import org.apache.calcite.linq4j.tree.Expressions;
 import org.apache.calcite.rex.RexCorrelVariable;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexShuttle;
 import 
org.apache.ignite.internal.sql.engine.exec.exp.RexToLixTranslator.InputGetter;
-import org.apache.ignite.internal.sql.engine.util.IgniteMethod;
 
 class CorrelatesBuilder extends RexShuttle {
-    private final BlockBuilder builder;
-
     private final Expression ctx;
 
-    private final Expression hnd;
-
-    private Map<String, FieldGetter> correlates;
+    private Map<String, InputGetter> correlates;
 
     CorrelatesBuilder(BlockBuilder builder, Expression ctx, Expression hnd) {

Review Comment:
   `CorrelatesBuilder` no longer uses the `builder` and `hnd` constructor 
parameters (they’re ignored). Please remove these unused parameters and update 
call sites accordingly to avoid confusion and keep the API minimal.
   ```suggestion
       CorrelatesBuilder(Expression ctx) {
   ```



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverter.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.math.BigDecimal;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.sql.engine.message.SharedStateMessage;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
+import org.apache.ignite.internal.sql.engine.message.field.SingleFieldMessage;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converter between {@link SharedState} and {@link SharedStateMessage}.
+ */
+public class SharedStateMessageConverter {
+    /** Message factory. */
+    private static final SqlQueryMessagesFactory MESSAGE_FACTORY = new 
SqlQueryMessagesFactory();
+
+    static @Nullable SharedStateMessage toMessage(@Nullable SharedState state) 
{
+        if (state == null) {
+            return null;
+        }
+
+        Long2ObjectMap<Object> correlations = state.correlations();
+        Map<Long, NetworkMessage> result = 
IgniteUtils.newHashMap(correlations.size());
+
+        for (Long2ObjectMap.Entry<Object> entry : 
correlations.long2ObjectEntrySet()) {
+            SingleFieldMessage<?> msg = toSingleFieldMessage(entry.getValue());
+
+            result.put(entry.getLongKey(), msg);
+        }
+
+        return MESSAGE_FACTORY.sharedStateMessage()
+                .sharedState(result)
+                .build();
+    }
+
+    static SharedState fromMessage(SharedStateMessage sharedStateMessage) {
+        int size = sharedStateMessage.sharedState().size();
+        Long2ObjectMap<Object> correlations = new 
Long2ObjectOpenHashMap<>(size);
+
+        for (Map.Entry<Long, NetworkMessage> e : 
sharedStateMessage.sharedState().entrySet()) {
+            SingleFieldMessage<Object> msg = ((SingleFieldMessage<Object>) 
e.getValue());
+

Review Comment:
   `fromMessage` blindly casts each map value to `SingleFieldMessage` without 
validation. Since this data comes from the network, a malformed/corrupted 
message could cause a `ClassCastException` and fail the fragment unexpectedly. 
Consider validating `instanceof SingleFieldMessage` (and possibly validating 
allowed message types) and throwing a clear exception when an unexpected value 
is encountered.
   ```suggestion
               NetworkMessage networkMessage = e.getValue();
   
               if (!(networkMessage instanceof SingleFieldMessage)) {
                   throw new IllegalArgumentException(
                           "Unexpected message type in shared state. Expected 
SingleFieldMessage but was "
                                   + (networkMessage == null ? "null" : 
networkMessage.getClass().getName())
                                   + " for key " + e.getKey()
                   );
               }
   
               SingleFieldMessage<?> msg = (SingleFieldMessage<?>) 
networkMessage;
   ```



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java:
##########
@@ -17,41 +17,55 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
-import static org.apache.ignite.internal.sql.engine.util.Commons.checkRange;
-
-import java.io.Serializable;
-import org.apache.ignite.internal.sql.engine.util.Commons;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
 
 /**
  * This class represents the volatile state that may be propagated from parent 
to its children
  * during rewind.
  */
-public class SharedState implements Serializable {
-    private static final long serialVersionUID = 42L;
+public class SharedState {
+    private final Long2ObjectMap<Object> correlations;
+
+    public SharedState() {
+        this(new Long2ObjectOpenHashMap<>());
+    }
 
-    private Object[] correlations = new Object[16];
+    SharedState(Long2ObjectMap<Object> correlations) {
+        this.correlations = correlations;
+    }
 
     /**
      * Gets correlated value.
      *
-     * @param id Correlation ID.
+     * @param corrId Correlation ID.
      * @return Correlated value.
      */
-    public Object correlatedVariable(int id) {
-        checkRange(correlations, id);
+    public Object correlatedVariable(int corrId, int fieldIdx) {
+        long key = packToLong(corrId, fieldIdx);
 
-        return correlations[id];
+        return correlations.get(key);
     }
 
     /**
      * Sets correlated value.
      *
-     * @param id Correlation ID.
+     * @param corrId Correlation ID.
+     * @param fieldIdx Field index.
      * @param value Correlated value.
      */
-    public void correlatedVariable(int id, Object value) {
-        correlations = Commons.ensureCapacity(correlations, id + 1);
+    public void correlatedVariable(int corrId, int fieldIdx, Object value) {
+        long key = packToLong(corrId, fieldIdx);
+
+        correlations.put(key, value);
+    }
+
+    Long2ObjectMap<Object> correlations() {
+        return Long2ObjectMaps.unmodifiable(correlations);
+    }
 
-        correlations[id] = value;
+    private static long packToLong(int corrId, int fieldIdx) {
+        return ((((long) corrId) << 32 | fieldIdx));

Review Comment:
   `packToLong` should mask `fieldIdx` to avoid sign-extension issues when 
packing into a `long` key (e.g., use `fieldIdx & 0xFFFF_FFFFL`). This makes the 
key stable even if negative values ever appear and matches common packing 
patterns.
   ```suggestion
           return ((corrId & 0xFFFF_FFFFL) << 32) | (fieldIdx & 0xFFFF_FFFFL);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to