alex-plekhanov commented on code in PR #12593:
URL: https://github.com/apache/ignite/pull/12593#discussion_r2648022354
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java:
##########
@@ -190,4 +198,197 @@ public void testDuplicatedColumnNames() throws Exception {
assertPlan("SELECT CITY.NAME, STREET.NAME FROM STREET JOIN CITY ON
STREET.CITY_ID = CITY.ID ORDER BY STREET.ID",
schema, hasColumns("NAME", "NAME1"));
}
+
+ /** Tests that table modify can be executed on remote nodes. */
+ @Test
+ public void testDistributedTableModify() throws Exception {
+ IgniteSchema schema = createSchema(
+ createTable("TEST", IgniteDistributions.affinity(3, "test",
"hash"),
+ QueryUtils.KEY_FIELD_NAME, OTHER,
+ QueryUtils.VAL_FIELD_NAME, OTHER,
+ "ID", INTEGER,
+ "AFF_ID", INTEGER,
+ "VAL", INTEGER
+ ),
+ createTable("TEST2", IgniteDistributions.affinity(2, "test2",
"hash"),
+ QueryUtils.KEY_FIELD_NAME, OTHER,
+ QueryUtils.VAL_FIELD_NAME, OTHER,
+ "ID", INTEGER,
+ "AFF_ID", INTEGER,
+ "VAL", INTEGER
+ ),
+ createTable("TEST3", IgniteDistributions.random(),
+ QueryUtils.KEY_FIELD_NAME, OTHER,
+ QueryUtils.VAL_FIELD_NAME, OTHER,
+ "ID", INTEGER,
+ "AFF_ID", INTEGER,
+ "VAL", INTEGER
+ ),
+ createTable("TEST_REPL", IgniteDistributions.broadcast(),
+ QueryUtils.KEY_FIELD_NAME, OTHER,
+ QueryUtils.VAL_FIELD_NAME, OTHER,
+ "ID", INTEGER,
+ "AFF_ID", INTEGER,
+ "VAL", INTEGER
+ ),
+ createTable("TEST_REPL2", IgniteDistributions.broadcast(),
+ QueryUtils.KEY_FIELD_NAME, OTHER,
+ QueryUtils.VAL_FIELD_NAME, OTHER,
+ "ID", INTEGER,
+ "AFF_ID", INTEGER,
+ "VAL", INTEGER
+ )
+ );
+
+ // Check INSERT statements.
+
+ // partitioned <- values (broadcast).
+ assertPlan("INSERT INTO test VALUES (?, ?, ?)", schema,
+
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single())));
+
+ // partitioned <- partitioned (same).
+ assertPlan("INSERT INTO test SELECT * FROM test", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isInstanceOf(IgniteTableSpool.class)
+ .and(input(isTableScan("TEST")))))));
+
+ // partitioned <- partitioned (same, affinity key change).
+ assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test",
schema,
+
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+ .and(input(isInstanceOf(IgniteTableSpool.class)
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(input(isTableScan("TEST"))))))));
+
+ // partitioned <- partitioned (another affinity).
+ assertPlan("INSERT INTO test SELECT * FROM test2", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isTableScan("TEST2")))));
+
+ // partitioned <- partitioned (another affinity, affinity key change).
+ assertPlan("INSERT INTO test SELECT id, aff_id + 1, val FROM test2",
schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isTableScan("TEST2")))));
+
+ // partitioned <- random.
+ assertPlan("INSERT INTO test SELECT * FROM test3", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isTableScan("TEST3")))));
+
+ // partitioned <- broadcast.
+ assertPlan("INSERT INTO test SELECT * FROM test_repl", schema,
+
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+ .and(input(isTableScan("TEST_REPL"))));
+
+ // partitioned <- broadcast (force distributed).
+ assertPlan("INSERT INTO test SELECT * FROM test_repl", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isInstanceOf(IgniteTrimExchange.class)
+ .and(input(isTableScan("TEST_REPL")))))),
+ TableModifySingleNodeConverterRule.class.getSimpleName()
+ );
+
+ // broadcast <- partitioned.
+ assertPlan("INSERT INTO test_repl SELECT * FROM test", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isTableScan("TEST")))));
+
+ // broadcast <- random.
+ assertPlan("INSERT INTO test_repl SELECT * FROM test3", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isTableScan("TEST3")))));
+
+ // broadcast <- broadcast.
+ assertPlan("INSERT INTO test_repl SELECT * FROM test_repl2", schema,
+
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+ .and(input(isTableScan("TEST_REPL2"))));
+
+ // broadcast <- broadcast (force distributed).
+ assertPlan("INSERT INTO test_repl SELECT * FROM test_repl2", schema,
+
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+ .and(input(isInstanceOf(IgniteTrimExchange.class)
+ .and(input(isTableScan("TEST_REPL2")))))),
+ TableModifySingleNodeConverterRule.class.getSimpleName()
+ );
+
+ // broadcast <- broadcast (same).
+ assertPlan("INSERT INTO test_repl SELECT * FROM test_repl", schema,
+
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+ .and(input(isInstanceOf(IgniteTableSpool.class)
+ .and(input(isTableScan("TEST_REPL"))))));
+
+ // broadcast <- broadcast (same, force distributed).
Review Comment:
No, since it has IgniteTrimExchange.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]