korlov42 commented on a change in pull request #712: URL: https://github.com/apache/ignite-3/pull/712#discussion_r839384154
########## File path: modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/AsyncWrapperSelfTest.java ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ForkJoinPool; +import org.apache.ignite.internal.sql.engine.AsyncCursor; +import org.apache.ignite.internal.sql.engine.ClosedCursorException; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +/** Test class to verify {@link org.apache.ignite.internal.sql.engine.exec.AsyncWrapper}. */ +public class AsyncWrapperSelfTest { + /** + * The very first invocation of {@link AsyncCursor#requestNext requestNext} on the empty cursor should complete normally, follow + * invocation should be completed exceptionally. + */ + @Test + public void testEmpty() { + var cursor = new AsyncWrapper<>(Collections.emptyIterator()); + + await(cursor.requestNext(20).thenAccept(batch -> assertThat(batch.items(), empty()))); + + assertCursorHasNoMoreRow(cursor); + } + + /** + * Request the exact amount of rows, follow invocation of {@link AsyncCursor#requestNext requestNext} should be completed exceptionally. + */ + @Test + public void testNotEmptyRequestExact() { + var data = List.of(1, 2); + var cursor = new AsyncWrapper<>(data.iterator()); + + await(cursor.requestNext(data.size()).thenAccept(batch -> assertThat(batch.items(), equalTo(data)))); + + assertCursorHasNoMoreRow(cursor); + } + + /** + * Request several times by 1 row. After the whole iterator will be drained, the next invocation + * of {@link AsyncCursor#requestNext requestNext} should be completed exceptionally. + */ + @Test + public void testNotEmptyRequestLess() { + var data = List.of(1, 2); + var cursor = new AsyncWrapper<>(data.iterator()); + + await(cursor.requestNext(1).thenAccept(batch -> assertThat(batch.items(), equalTo(data.subList(0, 1))))); + await(cursor.requestNext(1).thenAccept(batch -> assertThat(batch.items(), equalTo(data.subList(1, 2))))); + + assertCursorHasNoMoreRow(cursor); + } + + /** + * Request the greater amount of rows, follow invocation of {@link AsyncCursor#requestNext requestNext} should complete exceptionally. + */ + @Test + public void testNotEmptyRequestMore() { + var data = List.of(1, 2); + var cursor = new AsyncWrapper<>(data.iterator()); + + await(cursor.requestNext(data.size() * 2).thenAccept(batch -> assertThat(batch.items(), equalTo(data)))); + + assertCursorHasNoMoreRow(cursor); + } + + /** + * Call to {@link AsyncCursor#close()} should be passed to delegate in case the latter implements {@link AutoCloseable}. + */ + @Test + @SuppressWarnings("unchecked") + public void testClosePropagatedToDelegate() throws Exception { + var mockIt = (ClosableIterator<Object>) Mockito.mock(ClosableIterator.class); + var cursor = new AsyncWrapper<>(mockIt); + + await(cursor.close()); + + Mockito.verify(mockIt).close(); + } + + /** + * All calls to {@link AsyncCursor#requestNext(int)} should be chained and executed in the proper order. + */ + @Test + public void testRequestsChainedAndExecutedAfterCursorInited() { + var data = List.of(1, 2); Review comment: done ########## File path: modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/AsyncWrapper.java ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import org.apache.ignite.internal.sql.engine.AsyncCursor; +import org.apache.ignite.internal.sql.engine.ClosedCursorException; + +/** + * Wrapper that converts a synchronous iterator to an asynchronous one. + * + * @param <T> Type of the item. + */ +public class AsyncWrapper<T> implements AsyncCursor<T> { + /** + * Future returning iterator that should be converted to async. + */ + private final CompletableFuture<Iterator<T>> cursorFut; + + private final CompletableFuture<Void> cancelFut = new CompletableFuture<>(); + + private final Executor exec; + + private final Object lock = new Object(); + + /** The tail of the request chain. Guarded by {@link #lock}. */ + private CompletableFuture<BatchedResult<T>> requestChainTail = CompletableFuture.completedFuture(null); + + private volatile boolean cancelled = false; + + private volatile boolean firstRequest = true; + + /** + * Constructor. + * + * <p>The execution will be in the thread invoking particular method of this cursor. + * + * @param source An iterator to wrap. + */ + public AsyncWrapper(Iterator<T> source) { + this(CompletableFuture.completedFuture(source), Runnable::run); + } + + /** + * Constructor. + * + * @param source An iterator to wrap. + * @param exec An executor to delegate execution. + */ + public AsyncWrapper(Iterator<T> source, Executor exec) { + this(CompletableFuture.completedFuture(source), exec); + } + + /** + * Constructor. + * + * @param initFut Initialization future. + * @param exec An executor to delegate execution. + */ + public AsyncWrapper(CompletableFuture<Iterator<T>> initFut, Executor exec) { + this.cursorFut = initFut; + this.exec = exec; + } + + /** {@inheritDoc} */ + @Override + public CompletionStage<BatchedResult<T>> requestNext(int rows) { + CompletableFuture<BatchedResult<T>> next = new CompletableFuture<>(); + CompletableFuture<BatchedResult<T>> prev; + + synchronized (lock) { + if (cancelled) { + next.completeExceptionally(new ClosedCursorException()); + + return next; + } + + prev = requestChainTail; + requestChainTail = next; + } + + prev.thenCompose(tmp -> cursorFut).thenAcceptAsync(cursor -> { + int remains = rows; + List<T> batch = new ArrayList<>(rows); Review comment: good catch! Fixed ########## File path: modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/AsyncWrapper.java ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import org.apache.ignite.internal.sql.engine.AsyncCursor; +import org.apache.ignite.internal.sql.engine.ClosedCursorException; + +/** + * Wrapper that converts a synchronous iterator to an asynchronous one. + * + * @param <T> Type of the item. + */ +public class AsyncWrapper<T> implements AsyncCursor<T> { + /** + * Future returning iterator that should be converted to async. + */ + private final CompletableFuture<Iterator<T>> cursorFut; + + private final CompletableFuture<Void> cancelFut = new CompletableFuture<>(); + + private final Executor exec; + + private final Object lock = new Object(); + + /** The tail of the request chain. Guarded by {@link #lock}. */ + private CompletableFuture<BatchedResult<T>> requestChainTail = CompletableFuture.completedFuture(null); + + private volatile boolean cancelled = false; + + private volatile boolean firstRequest = true; + + /** + * Constructor. + * + * <p>The execution will be in the thread invoking particular method of this cursor. + * + * @param source An iterator to wrap. + */ + public AsyncWrapper(Iterator<T> source) { + this(CompletableFuture.completedFuture(source), Runnable::run); + } + + /** + * Constructor. + * + * @param source An iterator to wrap. + * @param exec An executor to delegate execution. + */ + public AsyncWrapper(Iterator<T> source, Executor exec) { + this(CompletableFuture.completedFuture(source), exec); + } + + /** + * Constructor. + * + * @param initFut Initialization future. + * @param exec An executor to delegate execution. + */ + public AsyncWrapper(CompletableFuture<Iterator<T>> initFut, Executor exec) { Review comment: removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
