korlov42 commented on code in PR #1700:
URL: https://github.com/apache/ignite-3/pull/1700#discussion_r1117028747


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java:
##########
@@ -197,69 +174,203 @@ private void tryEnd() throws Exception {
         }
     }
 
-    /** Returns mapping of modifications per modification action. */
-    private Map<ModifyRow.Operation, Collection<BinaryRowEx>> 
getOperationsPerAction(List<ModifyRow> rows) {
-        Map<ModifyRow.Operation, Collection<BinaryRowEx>> store = new 
EnumMap<>(ModifyRow.Operation.class);
+    private void flushTuples(boolean force) {
+        if (nullOrEmpty(rows) || (!force && rows.size() < MODIFY_BATCH_SIZE)) {
+            return;
+        }
+
+        List<RowT> rows = this.rows;
+        this.rows = new ArrayList<>(MODIFY_BATCH_SIZE);
+
+        switch (modifyOp) {
+            case INSERT:
+                table.insertAll(context(), rows).join();
+
+                break;
+            case UPDATE:
+                inlineUpdates(0, rows);
+
+                table.upsertAll(context(), rows).join();
+
+                break;
+            case MERGE:
+                Pair<List<RowT>, List<RowT>> split = splitMerge(rows);
+
+                List<CompletableFuture<?>> mergeParts = new ArrayList<>(2);
+
+                if (split.getFirst() != null) {
+                    mergeParts.add(table.insertAll(context(), 
split.getFirst()));
+                }
 
-        for (ModifyRow tuple : rows) {
-            store.computeIfAbsent(tuple.getOp(), k -> new 
ArrayList<>()).add(tuple.getRow());
+                if (split.getSecond() != null) {
+                    mergeParts.add(table.upsertAll(context(), 
split.getSecond()));
+                }
+
+                CompletableFuture.allOf(
+                        mergeParts.toArray(CompletableFuture[]::new)
+                ).join();
+
+                break;
+            case DELETE:
+                table.deleteAll(context(), rows).join();
+
+                break;
+            default:
+                throw new UnsupportedOperationException(modifyOp.name());
         }
 
-        return store;
+        updatedRows += rows.size();
     }
 
-    private void flushTuples(boolean force) {
-        if (nullOrEmpty(rows) || !force && rows.size() < MODIFY_BATCH_SIZE) {
+    /** See {@link #mapping(TableDescriptor, List)}. */
+    private void inlineUpdates(int offset, List<RowT> rows) {
+        if (mapping == null) {
             return;
         }
 
-        List<ModifyRow> rows = this.rows;
-        this.rows = new ArrayList<>(MODIFY_BATCH_SIZE);
+        assert updateColumns != null;
+
+        RowHandler<RowT> handler = context().rowHandler();
+
+        int rowSize = handler.columnCount(rows.get(0));
+        int updateColumnOffset = hasUpsertSemantic(rowSize) ? rowSize - 
(mapping.length + updateColumns.size()) : 0;
 
-        Map<ModifyRow.Operation, Collection<BinaryRowEx>> operations = 
getOperationsPerAction(rows);
+        for (RowT row : rows) {
+            for (int i = 0; i < mapping.length; i++) {
+                if (offset == 0 && i == mapping[i]) {
+                    continue;
+                }
 
-        for (Map.Entry<ModifyRow.Operation, Collection<BinaryRowEx>> op : 
operations.entrySet()) {
-            switch (op.getKey()) {
-                case INSERT_ROW:
-                    Collection<BinaryRow> conflictKeys = 
tableView.insertAll(op.getValue(), tx).join();
+                handler.set(i, row, handler.get(mapping[i] + 
updateColumnOffset, row));
+            }
+        }
+    }
+
+    /**
+     * Splits the rows of merge operation onto rows that should be inserted 
and rows that should be updated.
+     *
+     * @param rows Rows to split.
+     * @return Pair where first element is list of rows to insert (or null if 
there is no such rows), and second
+     *     element is list of rows to update (or null if there is no such 
rows).
+     */
+    private Pair<List<RowT>, List<RowT>> splitMerge(List<RowT> rows) {
+        if (nullOrEmpty(updateColumns)) {
+            // WHEN NOT MATCHED clause only
+            return new Pair<>(rows, null);
+        }
 
-                    if (!conflictKeys.isEmpty()) {
-                        IgniteTypeFactory typeFactory = 
context().getTypeFactory();
-                        RowHandler.RowFactory<RowT> rowFactory = 
context().rowHandler().factory(
-                                context().getTypeFactory(),
-                                table.descriptor().insertRowType(typeFactory)
-                        );
+        assert mapping != null;
 
-                        List<String> conflictKeys0 = conflictKeys.stream()
-                                .map(binRow -> table.toRow(context(), binRow, 
rowFactory, null))
-                                .map(context().rowHandler()::toString)
-                                .collect(Collectors.toList());
+        List<RowT> rowsToInsert = null;
+        List<RowT> rowsToUpdate = null;
 
-                        throw conflictKeysException(conflictKeys0);
-                    }
+        RowHandler<RowT> handler = context().rowHandler();
+        int rowSize = handler.columnCount(rows.get(0));
 
-                    break;
-                case UPDATE_ROW:
-                    tableView.upsertAll(op.getValue(), tx).join();
+        int updateColumnOffset = rowSize - (mapping.length + 
updateColumns.size());
 
-                    break;
-                case DELETE_ROW:
-                    tableView.deleteAll(op.getValue(), tx).join();
+        if (!hasUpsertSemantic(rowSize)) {
+            // WHEN MATCHED clause only
+            rowsToUpdate = rows;
+        } else {
+            rowsToInsert = new ArrayList<>();
+            rowsToUpdate = new ArrayList<>();
 
-                    break;
-                default:
-                    throw new 
UnsupportedOperationException(op.getKey().name());
+            for (RowT row : rows) {
+                // this check doesn't seem correct because NULL could be a 
legit value for column,
+                // but this is how it was implemented before, so I just file 
an issue to deal with this later
+                // TODO: https://issues.apache.org/jira/browse/IGNITE-18883
+                if (handler.get(updateColumnOffset, row) == null) {
+                    rowsToInsert.add(row);
+                } else {
+                    rowsToUpdate.add(row);
+                }
+            }
+
+            if (nullOrEmpty(rowsToInsert)) {
+                rowsToInsert = null;
+            }
+
+            if (nullOrEmpty(rowsToUpdate)) {
+                rowsToUpdate = null;
             }
         }
 
-        updatedRows += rows.size();
+        if (rowsToUpdate != null) {
+            inlineUpdates(updateColumnOffset, rowsToUpdate);
+        }
+
+        return new Pair<>(rowsToInsert, rowsToUpdate);
     }
 
-    /** Transforms keys list to appropriate exception. */
-    private RuntimeException conflictKeysException(List<String> conflictKeys) {
-        LOG.debug("Unable to update some keys because of conflict [op={}, 
keys={}]", modifyOp, conflictKeys);
+    /**
+     * Returns {@code true} if this ModifyNode is MERGE operator that has both 
WHEN MATCHED and WHEN NOT MATCHED
+     * clauses, thus has na UPSERT semantic.
+     *
+     * <p>The rows passed to the node have one of the possible format: <ol>
+     *     <li>[insert row type] -- the node is INSERT or MERGE with WHEN NOT 
MATCHED clause only</li>
+     *     <li>[delete row type] -- the node is DELETE</li>
+     *     <li>[full row type] + [columns to update] -- the node is UPDATE or 
MERGE with WHEN MATCHED clause only</li>
+     *     <li>[insert row type] + [full row type] + [columns to update] -- 
the node is MERGE with both handlers, has UPSERT semantic</li>
+     * </ol>
+     *
+     * @param rowSize The size of the row passed to the node.
+     * @return {@code true} if the node is MERGE with UPSERT semantic.
+     */
+    private boolean hasUpsertSemantic(int rowSize) {
+        return mapping != null && updateColumns != null && rowSize > 
mapping.length + updateColumns.size();
+    }
+
+    /**
+     * Creates a mapping to inline updates into the row.
+     *
+     * <p>The row passed to the modify node contains columns specified by
+     * {@link TableDescriptor#selectForUpdateRowType(IgniteTypeFactory)} 
followed by {@link #updateColumns}. Here is an example:
+     *
+     * <pre>
+     *     CREATE TABLE t (a INT, b INT, c INT);
+     *     INSERT INTO t VALUES (2, 2, 2);
+     *
+     *     UPDATE t SET b = b + 10, c = c * 10;
+     *     -- If selectForUpdateRowType specifies all the table columns,
+     *     -- then the following row should be passed to ModifyNode:
+     *     -- [2, 2, 2, 12, 20], where first three values is the original 
values
+     *     -- of columns A, B, and C respectively, 12 is the computed value 
for column B,
+     *     -- and 20 is the computed value for column C
+     * </pre>
+     *
+     * <p>For example above we should get mapping [0, 3, 4]: <ul>
+     *     <li>The column at index 0 is A. It's not updated, thus it mapped 
onto itself: 0 --> 0</li>
+     *     <li>The column at index 1 is B. B should be updated, and new value 
for B is at the index 3: 1 --> 3</li>
+     *     <li>The column at index 2 is C. C should be updated, and new value 
for C is at the index 4: 2 --> 4</li>
+     * </ul>
+     *
+     * @param descriptor A descriptor of the target table.
+     * @param updateColumns Enumeration of columns to update.
+     * @return A mapping to inline the updates into the row.
+     */
+    private static int @Nullable [] mapping(TableDescriptor descriptor, 
@Nullable List<String> updateColumns) {
+        if (updateColumns == null) {
+            return null;
+        }
+
+        int columnCount = descriptor.columnsCount();
+
+        int[] mapping = new int[columnCount];
+
+        Object2IntMap<String> updateColumnIndexes = new 
Object2IntOpenHashMap<>(updateColumns.size());
+
+        for (int i = 0; i < updateColumns.size(); i++) {
+            updateColumnIndexes.put(updateColumns.get(i), i + columnCount);
+        }

Review Comment:
   thanks, fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to