korlov42 commented on code in PR #2906:
URL: https://github.com/apache/ignite-3/pull/2906#discussion_r1432661090
##########
modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQuerySingleResult.java:
##########
@@ -69,15 +69,30 @@ public JdbcQuerySingleResult(int status, String err) {
super(status, err);
}
+ /**
+ * Constructor.
+ *
+ * @param hasNext {@code true} if more results are present.
+ * @param updCount Update counter.
+ */
+ public JdbcQuerySingleResult(boolean hasNext, long updCount) {
+ hasResults = hasNext;
+ this.updateCnt = updCount;
Review Comment:
if there are no results, why should we provide `updateCount`?
##########
modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/AbstractJdbcSelfTest.java:
##########
@@ -168,4 +172,14 @@ protected void checkConnectionClosed(Executable ex) {
protected void checkNotSupported(Executable ex) {
assertThrows(SQLFeatureNotSupportedException.class, ex);
}
+
+ /** Return a size of stored resources. Reflection based implementation,
need to be refactored. */
+ int openCursorsRegistered() {
Review Comment:
the name of the method is somewhat misleading. Let's rename it to `openResources`
or something similar
##########
modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQuerySingleResult.java:
##########
@@ -179,6 +197,7 @@ public long updateCount() {
@Override
public void writeBinary(ClientMessagePacker packer) {
super.writeBinary(packer);
+ packer.packLong(updateCnt);
Review Comment:
if there are no results, we should not pack `updateCount`
##########
modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryCursorHandlerImplTest.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.lang.ErrorGroups.Catalog.VALIDATION_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_PARSE_ERR;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler;
+import
org.apache.ignite.internal.jdbc.proto.event.JdbcFetchQueryResultsRequest;
+import org.apache.ignite.internal.jdbc.proto.event.JdbcQuerySingleResult;
+import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursorImpl;
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlException;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test to verify {@link JdbcQueryCursorHandlerImpl}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class JdbcQueryCursorHandlerImplTest extends BaseIgniteAbstractTest {
+
+ @ParameterizedTest(name = "throw exception from nextResult = {0}")
+ @ValueSource(booleans = {true, false})
+ void testGetMoreResultsProcessExceptions(boolean nextResultThrow) throws
IgniteInternalCheckedException {
+ ClientResourceRegistry resourceRegistryMocked =
mock(ClientResourceRegistry.class);
+ ClientResource rsrc = mock(ClientResource.class);
+
+ JdbcQueryCursorHandler cursorHandler = new
JdbcQueryCursorHandlerImpl(resourceRegistryMocked);
+
+ when(resourceRegistryMocked.get(anyLong())).thenAnswer(new
Answer<ClientResource>() {
+ @Override
+ public ClientResource answer(InvocationOnMock invocation) {
+ return rsrc;
+ }
+ });
+
+ when(rsrc.get(AsyncSqlCursor.class)).thenAnswer(new
Answer<AsyncSqlCursor<InternalSqlRow>>() {
+ @Override
+ public AsyncSqlCursor<InternalSqlRow> answer(InvocationOnMock
invocation) {
+ return new AsyncSqlCursor<>() {
+ @Override
+ public SqlQueryType queryType() {
+ throw new UnsupportedOperationException("queryType");
+ }
+
+ @Override
+ public ResultSetMetadata metadata() {
+ throw new UnsupportedOperationException("metadata");
+ }
+
+ @Override
+ public boolean hasNextResult() {
+ return true;
+ }
+
+ @Override
+ public void onClose(Runnable callback) {
+ // No op.
+ }
+
+ @Override
+ public CompletableFuture<AsyncSqlCursor<InternalSqlRow>>
nextResult() {
+ if (nextResultThrow) {
+ throw new SqlException(STMT_PARSE_ERR, new
Exception("nextResult exception"));
+ } else {
+ AsyncSqlCursorImpl<InternalSqlRow> sqlCursor =
mock(AsyncSqlCursorImpl.class);
+
+
when(sqlCursor.requestNextAsync(anyInt())).thenAnswer((Answer<BatchedResult<InternalSqlRow>>)
invocation -> {
+ throw new
IgniteInternalException(VALIDATION_ERR, "requestNextAsync error");
+ });
+
+ return
CompletableFuture.completedFuture(sqlCursor);
+ }
+ }
+
+ @Override
+ public CompletableFuture<BatchedResult<InternalSqlRow>>
requestNextAsync(int rows) {
+ return nullCompletedFuture();
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return nullCompletedFuture();
+ }
+ };
+ }
+ });
+
+ CompletableFuture<JdbcQuerySingleResult> fut =
IgniteTestUtils.runAsync(() ->
+ await(cursorHandler.getMoreResultsAsync(new
JdbcFetchQueryResultsRequest(1, 100)), 5, TimeUnit.SECONDS)
+ );
+
+ try {
+ await(fut, 5, TimeUnit.SECONDS);
+ } catch (Throwable e) {
+ fail("Unexpected exception is raised.");
+ }
Review Comment:
to be honest, I don't quite understand what you are trying to verify here...
##########
modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java:
##########
@@ -460,33 +440,53 @@ public boolean getMoreResults() throws SQLException {
public boolean getMoreResults(int curr) throws SQLException {
ensureNotClosed();
- if (resSets == null || curRes >= resSets.size()) {
- return false;
- }
-
- curRes++;
-
if (resSets != null) {
assert curRes <= resSets.size() : "Invalid results state:
[resultsCount=" + resSets.size() + ", curRes=" + curRes + ']';
switch (curr) {
case CLOSE_CURRENT_RESULT:
- if (curRes > 0) {
- resSets.get(curRes - 1).close0();
- }
-
break;
case CLOSE_ALL_RESULTS:
case KEEP_CURRENT_RESULT:
throw new SQLFeatureNotSupportedException("Multiple open
results is not supported.");
default:
- throw new SQLException("Invalid 'current' parameter.");
+ throw new SQLException("Invalid 'curr' parameter.");
}
}
- return (resSets != null && curRes < resSets.size());
+ // all previous results need to be closed at this point.
+ if (isCloseOnCompletion()) {
+ close();
+ return false;
+ }
+
+ if (resSets == null || curRes >= resSets.size() || resSets.get(curRes)
== null) {
+ return false;
+ }
+
+ JdbcResultSet nextResultSet;
+ SQLException exceptionally = null;
+
+ try {
+ // just a stub if exception is raised inside multiple statements.
+ // all further execution is not processed.
+ nextResultSet = resSets.get(curRes).getNextResultSet();
+ } catch (SQLException ex) {
+ nextResultSet = null;
+ exceptionally = ex;
+ }
+
+ resSets.add(nextResultSet);
+
+ curRes++;
+
+ if (exceptionally != null) {
+ throw exceptionally;
+ }
+
+ return nextResultSet != null && nextResultSet.holdResults();
Review Comment:
this condition is not quite correct: an empty result set is still a result set, thus
`true` should be returned (try this query `SELECT 1; SELECT 1 FROM
table(system_range(1, 0))`)
##########
modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java:
##########
@@ -415,6 +394,7 @@ public boolean execute(String sql, String[] colNames)
throws SQLException {
/** {@inheritDoc} */
@Override
+ @Nullable
public ResultSet getResultSet() throws SQLException {
Review Comment:
```suggestion
public @Nullable ResultSet getResultSet() throws SQLException {
```
##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryCursorHandlerImpl.java:
##########
@@ -91,6 +95,51 @@ public CompletableFuture<JdbcQueryFetchResult>
fetchAsync(JdbcQueryFetchRequest
}).toCompletableFuture();
}
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<JdbcQuerySingleResult>
getMoreResultsAsync(JdbcFetchQueryResultsRequest req) {
+ AsyncSqlCursor<InternalSqlRow> asyncSqlCursor;
+ try {
+ asyncSqlCursor =
resources.get(req.cursorId()).get(AsyncSqlCursor.class);
+ } catch (IgniteInternalCheckedException e) {
+ StringWriter sw = getWriterWithStackTrace(e);
+
+ return CompletableFuture.completedFuture(new
JdbcQuerySingleResult(Response.STATUS_FAILED,
+ "Failed to find query cursor [curId=" + req.cursorId() +
"]. Error message:" + sw));
+ }
+
+ if (!asyncSqlCursor.hasNextResult()) {
+ return CompletableFuture.completedFuture(new
JdbcQuerySingleResult(false, -1));
+ }
+
+ return asyncSqlCursor.closeAsync().thenCompose(c ->
asyncSqlCursor.nextResult())
+ .thenCompose(cur -> cur.requestNextAsync(req.fetchSize())
+ .thenApply(batch -> {
+ try {
+ SqlQueryType queryType = cur.queryType();
+
+ long cursorId = resources.put(new
ClientResource(cur, cur::closeAsync));
+
+ List<ColumnMetadata> columns =
cur.metadata().columns();
+
+ return buildSingleRequest(batch, columns,
cursorId, !batch.hasMore(), queryType);
Review Comment:
I see that the first-page response for the first statement and for the next statement
are created differently: for the first statement only a subset of query types goes
through `buildSingleRequest`, while for the next statement all types go through
this method. Let's align the creation of the result for the first page
##########
modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQuerySingleResult.java:
##########
@@ -69,15 +69,30 @@ public JdbcQuerySingleResult(int status, String err) {
super(status, err);
}
+ /**
+ * Constructor.
+ *
+ * @param hasNext {@code true} if more results are present.
+ * @param updCount Update counter.
+ */
+ public JdbcQuerySingleResult(boolean hasNext, long updCount) {
+ hasResults = hasNext;
+ this.updateCnt = updCount;
+ }
+
/**
* Constructor.
*
* @param cursorId Cursor ID.
* @param rowTuples Serialized SQL result rows.
+ * @param columnTypes Ordered list of types of columns in serialized rows.
+ * @param decimalScales Decimal scales in appearance order.
+ * @param isQuery {@code true} if query is SELECT/EXPLAIN.
+ * @param updateCnt Update count.
* @param last Flag indicates the query has no unfetched results.
*/
public JdbcQuerySingleResult(long cursorId, List<BinaryTupleReader>
rowTuples, List<ColumnType> columnTypes, int[] decimalScales,
- boolean last) {
+ boolean last, boolean isQuery, long updateCnt) {
Review Comment:
this particular constructor was used to create a result for a statement
returning a result set. With that said, it's better to keep it as is (without the
additional `isQuery` and `updateCnt`) to avoid confusion and possible misuse
##########
modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java:
##########
@@ -460,33 +440,53 @@ public boolean getMoreResults() throws SQLException {
public boolean getMoreResults(int curr) throws SQLException {
ensureNotClosed();
- if (resSets == null || curRes >= resSets.size()) {
- return false;
- }
-
- curRes++;
-
if (resSets != null) {
assert curRes <= resSets.size() : "Invalid results state:
[resultsCount=" + resSets.size() + ", curRes=" + curRes + ']';
switch (curr) {
case CLOSE_CURRENT_RESULT:
- if (curRes > 0) {
- resSets.get(curRes - 1).close0();
- }
-
break;
case CLOSE_ALL_RESULTS:
case KEEP_CURRENT_RESULT:
throw new SQLFeatureNotSupportedException("Multiple open
results is not supported.");
default:
- throw new SQLException("Invalid 'current' parameter.");
+ throw new SQLException("Invalid 'curr' parameter.");
Review Comment:
better to use `current` because that is how it is named in the interface
##########
modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcResultSet.java:
##########
@@ -265,7 +307,7 @@ public boolean next() throws SQLException {
/** {@inheritDoc} */
@Override
public void close() throws SQLException {
- close0();
+ closed = true;
Review Comment:
all resources must be cleaned up as soon as possible; it's not enough to
just pretend that the result set is closed
##########
modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java:
##########
@@ -396,7 +398,7 @@ public ResultSet getResultSet() throws SQLException {
JdbcResultSet rs = resSets.get(curRes);
- if (!rs.isQuery()) {
+ if (rs == null || !rs.isQuery()) {
Review Comment:
a few words about the comments above:
at line 407 you've added a null check for an element of the `resSets`
collection. This implies that `null` is now considered a legitimate item of the
collection. If so, it's better to state this in the `resSets` declaration --
`List<JdbcResultSet> resSets;` --> `List<@Nullable JdbcResultSet> resSets;` .
Now IDEA shows 4 more warnings because of this annotation. Two of these warnings
may actually cause an NPE
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]