MarcosZyk commented on code in PR #10231:
URL: https://github.com/apache/iotdb/pull/10231#discussion_r1236521508


##########
server/src/main/java/org/apache/iotdb/db/metadata/query/reader/ISchemaReader.java:
##########
@@ -21,9 +21,17 @@
 
 import org.apache.iotdb.db.metadata.query.info.ISchemaInfo;
 
-import java.util.Iterator;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+
+public interface ISchemaReader<T extends ISchemaInfo> extends AutoCloseable {
+
+  ListenableFuture<Boolean> NOT_BLOCKED_TRUE = immediateFuture(true);
+  ListenableFuture<Boolean> NOT_BLOCKED_FALSE = immediateFuture(false);
+  ListenableFuture<Void> NOT_BLOCKED = immediateVoidFuture();
 
-public interface ISchemaReader<T extends ISchemaInfo> extends Iterator<T>, 
AutoCloseable {

Review Comment:
   Keep extends Iterator<T>



##########
server/src/main/java/org/apache/iotdb/db/metadata/query/reader/SchemaReaderLimitOffsetWrapper.java:
##########
@@ -64,19 +60,50 @@ public void close() throws Exception {
   }
 
   @Override
-  public boolean hasNext() {
+  public ListenableFuture<?> isBlocked() {
+    if (hasNextFuture != null) {
+      return hasNextFuture;
+    }
+    hasNextFuture = hasNextFuture();
+    return hasNextFuture;
+  }
+
+  private ListenableFuture<?> hasNextFuture() {

Review Comment:
   Rename to tryGetNext



##########
server/src/main/java/org/apache/iotdb/db/metadata/query/reader/SchemaReaderLimitOffsetWrapper.java:
##########
@@ -32,20 +34,14 @@
   private final boolean hasLimit;
 
   private int count = 0;
-  int curOffset = 0;
+  private int curOffset = 0;
+  private ListenableFuture<?> hasNextFuture = null;

Review Comment:
   Rename to isBlocked to stay consistent with the naming used in Operators.



##########
server/src/main/java/org/apache/iotdb/db/metadata/query/reader/TimeseriesReaderWithViewFetch.java:
##########
@@ -45,12 +51,13 @@
 import java.util.Queue;
 
 public class TimeseriesReaderWithViewFetch implements 
ISchemaReader<ITimeSeriesSchemaInfo> {
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TimeseriesReaderWithViewFetch.class);
   private final Traverser<ITimeSeriesSchemaInfo, ?> traverser;
   private final Queue<ITimeSeriesSchemaInfo> cachedViewList = new 
ArrayDeque<>();
   private ITimeSeriesSchemaInfo next = null;
   private boolean consumeView = false;
   private final SchemaFilter schemaFilter;
+  private ListenableFuture<Boolean> hasNextFuture = null;

Review Comment:
   Rename to isBlocked.



##########
server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java:
##########
@@ -293,8 +293,8 @@ ISchemaReader<IDeviceSchemaInfo> 
getDeviceReader(IShowDevicesPlan showDevicesPla
       throws MetadataException;
 
   /**
-   * The iterated result shall be consumed before calling reader.hasNext() or 
reader.next(). Its
-   * implementation is based on the reader's process context.
+   * The iterated result shall be consumed before calling 
reader.hasNextFuture() or reader.next().

Review Comment:
   Fix this Javadoc comment. Describe the usage of these three interface 
methods: isBlocked, hasNext, and next.



##########
server/src/main/java/org/apache/iotdb/db/metadata/query/reader/SchemaReaderLimitOffsetWrapper.java:
##########
@@ -64,19 +60,50 @@ public void close() throws Exception {
   }
 
   @Override
-  public boolean hasNext() {
+  public ListenableFuture<?> isBlocked() {
+    if (hasNextFuture != null) {
+      return hasNextFuture;
+    }
+    hasNextFuture = hasNextFuture();
+    return hasNextFuture;
+  }
+
+  private ListenableFuture<?> hasNextFuture() {
     if (hasLimit) {
-      return count < limit && schemaReader.hasNext();
+      while (curOffset < offset) {
+        // first time
+        return Futures.submit(
+            () -> {
+              while (curOffset < offset && schemaReader.hasNext()) {
+                schemaReader.next();
+                curOffset++;
+              }
+              return schemaReader.hasNext();
+            },
+            FragmentInstanceManager.getInstance().getIntoOperationExecutor());
+      }
+      if (count >= limit) {
+        return NOT_BLOCKED_FALSE;
+      } else {
+        return schemaReader.isBlocked();
+      }
     } else {
+      return schemaReader.isBlocked();
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    try {
+      isBlocked().get();
       return schemaReader.hasNext();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
   }
 
   @Override
   public T next() {
-    if (!hasNext()) {
-      throw new NoSuchElementException();
-    }

Review Comment:
   Keep this hasNext check



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to