korlov42 commented on code in PR #817:
URL: https://github.com/apache/ignite-3/pull/817#discussion_r881871124


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.sql.engine.AsyncCursor;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.QueryContext;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.sql.engine.QueryTimeout;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.sql.BatchedArguments;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.sql.reactive.ReactiveResultSet;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Embedded implementation of the SQL session.
+ */
+public class SessionImpl implements Session {
+    /** Busy lock for close synchronisation. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final QueryProcessor qryProc;
+
+    private final long timeout;
+
+    private final String schema;
+
+    private final int pageSize;
+
+    private final Set<CompletableFuture<AsyncSqlCursor<List<Object>>>> 
futsToClose = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    private final Set<AsyncSqlCursor<List<Object>>> cursToClose = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    private final Map<String, Object> props;
+
+    /**
+     * Constructor.
+     *
+     * @param qryProc Query processor.
+     * @param schema  Query default schema.
+     * @param timeout Query default timeout.
+     * @param pageSize Query fetch page size.
+     * @param props Session's properties.
+     */
+    SessionImpl(
+            QueryProcessor qryProc,
+            String schema,
+            long timeout,
+            int pageSize,
+            Map<String, Object> props
+    ) {
+        this.qryProc = qryProc;
+        this.schema = schema;
+        this.timeout = timeout;
+        this.pageSize = pageSize;
+        this.props = props;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ResultSet execute(@Nullable Transaction transaction, String query, 
@Nullable Object... arguments) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ResultSet execute(@Nullable Transaction transaction, Statement 
statement, @Nullable Object... arguments) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int[] executeBatch(@Nullable Transaction transaction, String 
dmlQuery, BatchedArguments batch) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int[] executeBatch(@Nullable Transaction transaction, Statement 
dmlStatement, BatchedArguments batch) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void executeScript(String query, @Nullable Object... arguments) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long defaultTimeout(TimeUnit timeUnit) {
+        return timeUnit.convert(timeout, TimeUnit.NANOSECONDS);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String defaultSchema() {
+        return schema;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int defaultPageSize() {
+        return pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable Object property(String name) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close() {
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public SessionBuilder toBuilder() {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteSqlException("Session is closed");
+        }
+
+        try {
+            return new SessionBuilderImpl(qryProc, props)
+                    .defaultPageSize(pageSize)
+                    .defaultTimeout(timeout, TimeUnit.NANOSECONDS)
+                    .defaultSchema(schema);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<AsyncResultSet> executeAsync(@Nullable 
Transaction transaction, String query, @Nullable Object... arguments) {
+        if (!busyLock.enterBusy()) {
+            return CompletableFuture.failedFuture(new 
IgniteSqlException("Session is closed."));
+        }
+
+        QueryContext ctx = QueryContext.of(transaction, new 
QueryTimeout(timeout, TimeUnit.NANOSECONDS));
+
+        try {
+            List<CompletableFuture<AsyncSqlCursor<List<Object>>>> futs = 
qryProc.queryAsync(ctx, schema, query, arguments);
+
+            futsToClose.addAll(futs);
+
+            if (futs.size() != 1) {
+                return CompletableFuture.failedFuture(new 
IgniteSqlException("Multiple statements aren't allowed."));
+            }
+
+            return futs.get(0).thenCompose(cur -> {
+                        futsToClose.remove(futs.get(0));
+
+                        if (!busyLock.enterBusy()) {
+                            return cur.closeAsync()
+                                    .thenCompose((v) -> 
CompletableFuture.failedFuture(new IgniteSqlException("Session is closed")));
+                        }
+
+                        try {
+                            cursToClose.add(cur);
+
+                            return cur.requestNextAsync(pageSize)
+                                    .thenApply(
+                                            batchRes -> new AsyncResultSetImpl(
+                                                    cur,
+                                                    batchRes,
+                                                    pageSize,
+                                                    () -> 
cursToClose.remove(cur)
+                                            )
+                                    );
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    }
+            );
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<AsyncResultSet> executeAsync(@Nullable 
Transaction transaction, Statement statement) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Integer> executeBatchAsync(@Nullable Transaction 
transaction, String query, BatchedArguments batch) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Integer> executeBatchAsync(@Nullable Transaction 
transaction, Statement statement, BatchedArguments batch) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    public CompletableFuture<Void> closeAsync() {

Review Comment:
   Missing `@Override` annotation.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.sql.engine.AsyncCursor;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.QueryContext;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.sql.engine.QueryTimeout;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.sql.BatchedArguments;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.sql.reactive.ReactiveResultSet;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Embedded implementation of the SQL session.
+ */
+public class SessionImpl implements Session {
+    /** Busy lock for close synchronisation. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final QueryProcessor qryProc;
+
+    private final long timeout;
+
+    private final String schema;
+
+    private final int pageSize;
+
+    private final Set<CompletableFuture<AsyncSqlCursor<List<Object>>>> 
futsToClose = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    private final Set<AsyncSqlCursor<List<Object>>> cursToClose = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    private final Map<String, Object> props;
+
+    /**
+     * Constructor.
+     *
+     * @param qryProc Query processor.
+     * @param schema  Query default schema.
+     * @param timeout Query default timeout.
+     * @param pageSize Query fetch page size.
+     * @param props Session's properties.
+     */
+    SessionImpl(
+            QueryProcessor qryProc,
+            String schema,
+            long timeout,
+            int pageSize,
+            Map<String, Object> props
+    ) {
+        this.qryProc = qryProc;
+        this.schema = schema;
+        this.timeout = timeout;
+        this.pageSize = pageSize;
+        this.props = props;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ResultSet execute(@Nullable Transaction transaction, String query, 
@Nullable Object... arguments) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ResultSet execute(@Nullable Transaction transaction, Statement 
statement, @Nullable Object... arguments) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int[] executeBatch(@Nullable Transaction transaction, String 
dmlQuery, BatchedArguments batch) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int[] executeBatch(@Nullable Transaction transaction, Statement 
dmlStatement, BatchedArguments batch) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void executeScript(String query, @Nullable Object... arguments) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long defaultTimeout(TimeUnit timeUnit) {
+        return timeUnit.convert(timeout, TimeUnit.NANOSECONDS);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String defaultSchema() {
+        return schema;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int defaultPageSize() {
+        return pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable Object property(String name) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close() {
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public SessionBuilder toBuilder() {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteSqlException("Session is closed");
+        }
+
+        try {
+            return new SessionBuilderImpl(qryProc, props)
+                    .defaultPageSize(pageSize)
+                    .defaultTimeout(timeout, TimeUnit.NANOSECONDS)
+                    .defaultSchema(schema);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<AsyncResultSet> executeAsync(@Nullable 
Transaction transaction, String query, @Nullable Object... arguments) {
+        if (!busyLock.enterBusy()) {
+            return CompletableFuture.failedFuture(new 
IgniteSqlException("Session is closed."));
+        }
+
+        QueryContext ctx = QueryContext.of(transaction, new 
QueryTimeout(timeout, TimeUnit.NANOSECONDS));
+
+        try {
+            List<CompletableFuture<AsyncSqlCursor<List<Object>>>> futs = 
qryProc.queryAsync(ctx, schema, query, arguments);
+
+            futsToClose.addAll(futs);
+
+            if (futs.size() != 1) {
+                return CompletableFuture.failedFuture(new 
IgniteSqlException("Multiple statements aren't allowed."));
+            }
+
+            return futs.get(0).thenCompose(cur -> {
+                        futsToClose.remove(futs.get(0));

Review Comment:
   Wrong indentation.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.sql.engine.AsyncCursor;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.QueryContext;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.sql.engine.QueryTimeout;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.sql.BatchedArguments;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.sql.reactive.ReactiveResultSet;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Embedded implementation of the SQL session.
+ */
+public class SessionImpl implements Session {
+    /** Busy lock for close synchronisation. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final QueryProcessor qryProc;
+
+    private final long timeout;
+
+    private final String schema;
+
+    private final int pageSize;
+
+    private final Set<CompletableFuture<AsyncSqlCursor<List<Object>>>> 
futsToClose = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    private final Set<AsyncSqlCursor<List<Object>>> cursToClose = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    private final Map<String, Object> props;
+
+    /**
+     * Constructor.
+     *
+     * @param qryProc Query processor.
+     * @param schema  Query default schema.
+     * @param timeout Query default timeout.
+     * @param pageSize Query fetch page size.
+     * @param props Session's properties.
+     */
+    SessionImpl(
+            QueryProcessor qryProc,
+            String schema,
+            long timeout,
+            int pageSize,
+            Map<String, Object> props
+    ) {
+        this.qryProc = qryProc;
+        this.schema = schema;
+        this.timeout = timeout;
+        this.pageSize = pageSize;
+        this.props = props;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ResultSet execute(@Nullable Transaction transaction, String query, 
@Nullable Object... arguments) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ResultSet execute(@Nullable Transaction transaction, Statement 
statement, @Nullable Object... arguments) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int[] executeBatch(@Nullable Transaction transaction, String 
dmlQuery, BatchedArguments batch) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int[] executeBatch(@Nullable Transaction transaction, Statement 
dmlStatement, BatchedArguments batch) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void executeScript(String query, @Nullable Object... arguments) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long defaultTimeout(TimeUnit timeUnit) {
+        return timeUnit.convert(timeout, TimeUnit.NANOSECONDS);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String defaultSchema() {
+        return schema;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int defaultPageSize() {
+        return pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable Object property(String name) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close() {
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public SessionBuilder toBuilder() {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteSqlException("Session is closed");
+        }
+
+        try {
+            return new SessionBuilderImpl(qryProc, props)
+                    .defaultPageSize(pageSize)
+                    .defaultTimeout(timeout, TimeUnit.NANOSECONDS)
+                    .defaultSchema(schema);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<AsyncResultSet> executeAsync(@Nullable 
Transaction transaction, String query, @Nullable Object... arguments) {
+        if (!busyLock.enterBusy()) {
+            return CompletableFuture.failedFuture(new 
IgniteSqlException("Session is closed."));
+        }
+
+        QueryContext ctx = QueryContext.of(transaction, new 
QueryTimeout(timeout, TimeUnit.NANOSECONDS));
+
+        try {
+            List<CompletableFuture<AsyncSqlCursor<List<Object>>>> futs = 
qryProc.queryAsync(ctx, schema, query, arguments);
+
+            futsToClose.addAll(futs);
+
+            if (futs.size() != 1) {
+                return CompletableFuture.failedFuture(new 
IgniteSqlException("Multiple statements aren't allowed."));
+            }
+
+            return futs.get(0).thenCompose(cur -> {
+                        futsToClose.remove(futs.get(0));
+
+                        if (!busyLock.enterBusy()) {
+                            return cur.closeAsync()
+                                    .thenCompose((v) -> 
CompletableFuture.failedFuture(new IgniteSqlException("Session is closed")));
+                        }
+
+                        try {
+                            cursToClose.add(cur);
+
+                            return cur.requestNextAsync(pageSize)
+                                    .thenApply(
+                                            batchRes -> new AsyncResultSetImpl(
+                                                    cur,
+                                                    batchRes,
+                                                    pageSize,
+                                                    () -> 
cursToClose.remove(cur)
+                                            )
+                                    );
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    }
+            );
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<AsyncResultSet> executeAsync(@Nullable 
Transaction transaction, Statement statement) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Integer> executeBatchAsync(@Nullable Transaction 
transaction, String query, BatchedArguments batch) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Integer> executeBatchAsync(@Nullable Transaction 
transaction, Statement statement, BatchedArguments batch) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    public CompletableFuture<Void> closeAsync() {
+        return CompletableFuture
+                .runAsync(busyLock::block)
+                .thenCompose(
+                        v0 -> {
+                            futsToClose.forEach(f -> f.cancel(true));

Review Comment:
   Cancelling the future doesn't cancel the whole computation chain that has
   already been started, so we'll end up with a bunch of open cursors here.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.sql.engine.AsyncCursor;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.QueryContext;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.sql.engine.QueryTimeout;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.sql.BatchedArguments;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.sql.reactive.ReactiveResultSet;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Embedded implementation of the SQL session.
+ */
+public class SessionImpl implements Session {
+    /** Busy lock for close synchronisation. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final QueryProcessor qryProc;
+
+    private final long timeout;
+
+    private final String schema;
+
+    private final int pageSize;
+
+    private final Set<CompletableFuture<AsyncSqlCursor<List<Object>>>> 
futsToClose = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    private final Set<AsyncSqlCursor<List<Object>>> cursToClose = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    private final Map<String, Object> props;
+
+    /**
+     * Constructor.
+     *
+     * @param qryProc Query processor.
+     * @param schema  Query default schema.
+     * @param timeout Query default timeout.
+     * @param pageSize Query fetch page size.
+     * @param props Session's properties.
+     */
+    SessionImpl(
+            QueryProcessor qryProc,
+            String schema,
+            long timeout,
+            int pageSize,
+            Map<String, Object> props
+    ) {
+        this.qryProc = qryProc;
+        this.schema = schema;
+        this.timeout = timeout;
+        this.pageSize = pageSize;
+        this.props = props;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ResultSet execute(@Nullable Transaction transaction, String query, 
@Nullable Object... arguments) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ResultSet execute(@Nullable Transaction transaction, Statement 
statement, @Nullable Object... arguments) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int[] executeBatch(@Nullable Transaction transaction, String 
dmlQuery, BatchedArguments batch) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int[] executeBatch(@Nullable Transaction transaction, Statement 
dmlStatement, BatchedArguments batch) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void executeScript(String query, @Nullable Object... arguments) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long defaultTimeout(TimeUnit timeUnit) {
+        return timeUnit.convert(timeout, TimeUnit.NANOSECONDS);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String defaultSchema() {
+        return schema;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int defaultPageSize() {
+        return pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable Object property(String name) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close() {
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public SessionBuilder toBuilder() {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteSqlException("Session is closed");
+        }
+
+        try {
+            return new SessionBuilderImpl(qryProc, props)
+                    .defaultPageSize(pageSize)
+                    .defaultTimeout(timeout, TimeUnit.NANOSECONDS)
+                    .defaultSchema(schema);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<AsyncResultSet> executeAsync(@Nullable 
Transaction transaction, String query, @Nullable Object... arguments) {
+        if (!busyLock.enterBusy()) {
+            return CompletableFuture.failedFuture(new 
IgniteSqlException("Session is closed."));
+        }
+
+        QueryContext ctx = QueryContext.of(transaction, new 
QueryTimeout(timeout, TimeUnit.NANOSECONDS));
+
+        try {
+            List<CompletableFuture<AsyncSqlCursor<List<Object>>>> futs = 
qryProc.queryAsync(ctx, schema, query, arguments);
+
+            futsToClose.addAll(futs);
+
+            if (futs.size() != 1) {
+                return CompletableFuture.failedFuture(new 
IgniteSqlException("Multiple statements aren't allowed."));

Review Comment:
   At this point you've just lost all the open cursors...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to