korlov42 commented on code in PR #1469:
URL: https://github.com/apache/ignite-3/pull/1469#discussion_r1064600092
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java:
##########
@@ -368,73 +391,86 @@ public QueryChecker planEquals(String plan) {
* Run checks.
*/
public void check() {
- // Check plan.
- QueryProcessor qryProc = getEngine();
+ QueryProcessor queryEngine = getEngine();
- var explainCursors = qryProc.queryAsync("PUBLIC", "EXPLAIN PLAN FOR "
+ qry, params);
+ SessionId sessionId = queryEngine.createSession(SESSION_IDLE_TIMEOUT,
PropertiesHolder.fromMap(
+ Map.of(QueryProperty.DEFAULT_SCHEMA, "PUBLIC")
+ ));
- var explainCursor = explainCursors.get(0).join();
- var explainRes = getAllFromCursor(explainCursor);
- String actualPlan = (String) explainRes.get(0).get(0);
+ QueryContext context = tx != null ? QueryContext.of(tx) :
QueryContext.of();
- if (!CollectionUtils.nullOrEmpty(planMatchers)) {
- for (Matcher<String> matcher : planMatchers) {
- assertThat("Invalid plan:\n" + actualPlan, actualPlan,
matcher);
+ try {
+ // Check plan.
+ var explainRes = getAllFromCursor(
Review Comment:
Keyword `var` is not allowed in this context. Please check the other places
as well.
BTW, let's execute the EXPLAIN statement only if `planMatchers` and/or
`exactPlan` are set.
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java:
##########
@@ -343,37 +346,59 @@ public void checkTransactionsWithDml() throws Exception {
assertEquals(txManagerInternal.finished(), states.size());
}
- /** Check correctness of rw and ro transactions. */
+ /** Check correctness of rw and ro transactions for table scan. */
@Test
- public void checkMixedTransactions() throws Exception {
+ public void checkMixedTransactionsForTable() throws Exception {
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+ Matcher<String> planMatcher = containsTableScan("PUBLIC", "TEST");
+
+ checkMixedTransactions(planMatcher);
+ }
+
+
+ /** Check correctness of rw and ro transactions for index scan. */
+ @Test
+ public void checkMixedTransactionsForIndex() throws Exception {
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+ sql("CREATE INDEX TEST_IDX ON TEST(VAL0)");
+
+ Matcher<String> planMatcher = containsIndexScan("PUBLIC", "TEST",
"TEST_IDX");
+
+ checkMixedTransactions(planMatcher);
+ }
+
+ private void checkMixedTransactions(Matcher<String> planMatcher) throws
Exception {
IgniteSql sql = igniteSql();
if (sql instanceof ClientSql) {
return;
}
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
Session ses = sql.createSession();
for (int i = 0; i < ROW_COUNT; ++i) {
sql("INSERT INTO TEST VALUES (?, ?)", i, i);
}
- checkTx(ses, true, false, true);
- checkTx(ses, true, false, false);
- checkTx(ses, true, true, true);
- checkTx(ses, true, true, false);
- checkTx(ses, false, true, true);
- checkTx(ses, false, true, false);
- checkTx(ses, false, false, true);
- checkTx(ses, false, false, false);
+ checkTx(ses, true, false, true, planMatcher);
+ checkTx(ses, true, false, false, planMatcher);
+ checkTx(ses, true, true, true, planMatcher);
+ checkTx(ses, true, true, false, planMatcher);
+ checkTx(ses, false, true, true, planMatcher);
+ checkTx(ses, false, true, false, planMatcher);
+ checkTx(ses, false, false, true, planMatcher);
+ checkTx(ses, false, false, false, planMatcher);
}
- private void checkTx(Session ses, boolean readOnly, boolean commit,
boolean explicit) throws Exception {
+ private void checkTx(Session ses, boolean readOnly, boolean commit,
boolean explicit, Matcher<String> planMatcher) throws Exception {
Transaction outerTx = explicit ? (readOnly ?
igniteTx().readOnly().begin() : igniteTx().begin()) : null;
- AsyncResultSet rs = ses.executeAsync(outerTx, "SELECT VAL0 FROM TEST
ORDER BY VAL0").get();
+ String query = "SELECT VAL0 FROM TEST ORDER BY VAL0";
+
+ assertQuery(query).tx(outerTx).matches(planMatcher).check();
Review Comment:
Does it make sense to follow the common approach and pass `tx` as the first
argument of the `assertQuery` method?
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java:
##########
@@ -343,37 +346,59 @@ public void checkTransactionsWithDml() throws Exception {
assertEquals(txManagerInternal.finished(), states.size());
}
- /** Check correctness of rw and ro transactions. */
+ /** Check correctness of rw and ro transactions for table scan. */
@Test
- public void checkMixedTransactions() throws Exception {
+ public void checkMixedTransactionsForTable() throws Exception {
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+ Matcher<String> planMatcher = containsTableScan("PUBLIC", "TEST");
+
+ checkMixedTransactions(planMatcher);
+ }
+
+
+ /** Check correctness of rw and ro transactions for index scan. */
+ @Test
+ public void checkMixedTransactionsForIndex() throws Exception {
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+ sql("CREATE INDEX TEST_IDX ON TEST(VAL0)");
+
+ Matcher<String> planMatcher = containsIndexScan("PUBLIC", "TEST",
"TEST_IDX");
+
+ checkMixedTransactions(planMatcher);
+ }
+
+ private void checkMixedTransactions(Matcher<String> planMatcher) throws
Exception {
IgniteSql sql = igniteSql();
if (sql instanceof ClientSql) {
return;
}
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
Session ses = sql.createSession();
for (int i = 0; i < ROW_COUNT; ++i) {
sql("INSERT INTO TEST VALUES (?, ?)", i, i);
}
- checkTx(ses, true, false, true);
- checkTx(ses, true, false, false);
- checkTx(ses, true, true, true);
- checkTx(ses, true, true, false);
- checkTx(ses, false, true, true);
- checkTx(ses, false, true, false);
- checkTx(ses, false, false, true);
- checkTx(ses, false, false, false);
+ checkTx(ses, true, false, true, planMatcher);
Review Comment:
Honestly speaking, it is hard to verify whether there is a complete permutation
of all plans, or whether some options are presented twice... Does it make sense
to wrap a single `checkTx` invocation in a loop (or loops)?
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java:
##########
@@ -368,73 +391,86 @@ public QueryChecker planEquals(String plan) {
* Run checks.
*/
public void check() {
- // Check plan.
- QueryProcessor qryProc = getEngine();
+ QueryProcessor queryEngine = getEngine();
- var explainCursors = qryProc.queryAsync("PUBLIC", "EXPLAIN PLAN FOR "
+ qry, params);
+ SessionId sessionId = queryEngine.createSession(SESSION_IDLE_TIMEOUT,
PropertiesHolder.fromMap(
+ Map.of(QueryProperty.DEFAULT_SCHEMA, "PUBLIC")
+ ));
- var explainCursor = explainCursors.get(0).join();
- var explainRes = getAllFromCursor(explainCursor);
- String actualPlan = (String) explainRes.get(0).get(0);
+ QueryContext context = tx != null ? QueryContext.of(tx) :
QueryContext.of();
- if (!CollectionUtils.nullOrEmpty(planMatchers)) {
- for (Matcher<String> matcher : planMatchers) {
- assertThat("Invalid plan:\n" + actualPlan, actualPlan,
matcher);
+ try {
+ // Check plan.
+ var explainRes = getAllFromCursor(
+ await(queryEngine.querySingleAsync(sessionId, context,
"EXPLAIN PLAN FOR " + qry, params))
+ );
+
+ String actualPlan = (String) explainRes.get(0).get(0);
+
+ if (!CollectionUtils.nullOrEmpty(planMatchers)) {
+ for (Matcher<String> matcher : planMatchers) {
+ assertThat("Invalid plan:\n" + actualPlan, actualPlan,
matcher);
+ }
}
- }
- if (exactPlan != null) {
- assertEquals(exactPlan, actualPlan);
- }
+ if (exactPlan != null) {
+ assertEquals(exactPlan, actualPlan);
+ }
- // Check result.
- var cursors = qryProc.queryAsync("PUBLIC", qry, params);
+ // Check result.
+ // ToDo: https://issues.apache.org/jira/browse/IGNITE-18501
+ // var cursors =
queryEngine.querySingleAsync(sessionId, context, qry, params);
+ // var cur = cursors.join();
- var cur = cursors.get(0).join();
+ var cursors = queryEngine.queryAsync("PUBLIC", qry, params);
+ var cur = cursors.get(0).join();
Review Comment:
`org.apache.ignite.internal.testframework.IgniteTestUtils#await(java.util.concurrent.CompletionStage<T>)`
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java:
##########
@@ -90,190 +64,33 @@ public TableScanNode(
@Nullable Function<RowT, RowT> rowTransformer,
@Nullable BitSet requiredColumns
) {
- super(ctx);
+ super(ctx, rowFactory, schemaTable, filters, rowTransformer,
requiredColumns);
assert !nullOrEmpty(parts);
- assert context().transaction() != null || context().transactionTime()
!= null : "Transaction not initialized.";
-
this.physTable = schemaTable.table();
- this.schemaTable = schemaTable;
this.parts = parts;
- this.filters = filters;
- this.rowTransformer = rowTransformer;
- this.requiredColumns = requiredColumns;
- this.factory = rowFactory;
}
/** {@inheritDoc} */
@Override
- public void request(int rowsCnt) throws Exception {
- assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ",
requested=" + requested;
-
- checkState();
+ protected Publisher<RowT> scan() {
+ List<Flow.Publisher<BinaryRow>> partPublishers = new
ArrayList<>(parts.length);
- requested = rowsCnt;
-
- if (!inLoop) {
- context().execute(this::push, this::onError);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void closeInternal() {
- super.closeInternal();
+ boolean roTx = context().transactionTime() != null;
- if (activeSubscription != null) {
- activeSubscription.cancel();
-
- activeSubscription = null;
- }
- }
-
- /** {@inheritDoc} */
- @Override
- protected void rewindInternal() {
- requested = 0;
- waiting = 0;
- curPartIdx = 0;
-
- if (activeSubscription != null) {
- activeSubscription.cancel();
-
- activeSubscription = null;
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void register(List<Node<RowT>> sources) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- protected Downstream<RowT> requestDownstream(int idx) {
- throw new UnsupportedOperationException();
- }
-
- private void push() throws Exception {
- if (isClosed()) {
- return;
- }
-
- checkState();
-
- if (requested > 0 && !inBuff.isEmpty()) {
- inLoop = true;
- try {
- while (requested > 0 && !inBuff.isEmpty()) {
- checkState();
-
- RowT row = inBuff.poll();
-
- if (filters != null && !filters.test(row)) {
- continue;
- }
-
- if (rowTransformer != null) {
- row = rowTransformer.apply(row);
- }
-
- requested--;
- downstream().push(row);
- }
- } finally {
- inLoop = false;
- }
- }
-
- if (requested > 0) {
- if (waiting == 0 || activeSubscription == null) {
- requestNextBatch();
- }
- }
-
- if (requested > 0 && waiting == NOT_WAITING) {
- if (inBuff.isEmpty()) {
- requested = 0;
- downstream().end();
- } else {
- context().execute(this::push, this::onError);
- }
- }
- }
-
- private void requestNextBatch() {
- if (waiting == NOT_WAITING) {
- return;
- }
-
- if (waiting == 0) {
- // we must not request rows more than inBufSize
- waiting = inBufSize - inBuff.size();
- }
-
- Subscription subscription = this.activeSubscription;
- if (subscription != null) {
- subscription.request(waiting);
- } else if (curPartIdx < parts.length) {
- if (context().transactionTime() != null) {
- physTable.scan(parts[curPartIdx++],
context().transactionTime(), context().localNode()).subscribe(new
SubscriberImpl());
+ for (int p : parts) {
+ Publisher<BinaryRow> pub;
+ if (roTx) {
+ pub = physTable.scan(p, context().transactionTime(),
context().localNode());
Review Comment:
I'm not sure it's a good idea to open cursors to all partitions at the same
time.
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/CacheKey.java:
##########
@@ -80,7 +80,7 @@ public boolean equals(Object o) {
if (!query.equals(cacheKey.query)) {
return false;
}
- if (!Objects.equals(contextKey, cacheKey.contextKey)) {
+ if (Objects.equals(contextKey, cacheKey.contextKey)) {
Review Comment:
The previous version seems to be correct.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]