Yicong-Huang commented on code in PR #5149:
URL: https://github.com/apache/texera/pull/5149#discussion_r3290119395


##########
common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala:
##########
@@ -428,7 +434,32 @@ object IcebergUtil {
         .project(schema)
         .createReaderFunc(readerFunc)
         .build()
-    closeableIterable.iterator().asScala
+    new CloseableScalaIterator(closeableIterable)
   }
 
 }
+
+/**
+  * A Scala `Iterator` that also owns the lifecycle of an Iceberg
+  * `CloseableIterable`. Closing this iterator (or letting `Using.resource`
+  * close it) releases the underlying CloseableIterable, which in turn closes
+  * the Parquet reader / S3InputStream / AWS HTTP-pool slot that the iterable
+  * acquired.
+  */
+class CloseableScalaIterator[T](source: CloseableIterable[T])

Review Comment:
   Can we just use Iceberg's `CloseableIterator`? I don't see the need to wrap it again.
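A minimal sketch of the reviewer's suggestion. It assumes only what Iceberg's `CloseableIterator` is: a `java.util.Iterator` that is also `java.io.Closeable` (which is what `CloseableIterable.iterator()` returns). The `FakeCloseableIterator` class is a hypothetical stand-in so the example runs without Iceberg on the classpath. The caller keeps the closeable handle, iterates over a Scala view of it via `asScala`, and closes that same handle, so no extra wrapper class is needed.

```scala
import java.io.Closeable
import scala.jdk.CollectionConverters._
import scala.util.Using

// Hypothetical stand-in for org.apache.iceberg.io.CloseableIterator:
// a Java iterator that also owns a resource and must be closed.
class FakeCloseableIterator[T](items: Seq[T])
    extends java.util.Iterator[T]
    with Closeable {
  private val underlying = items.iterator
  var closed = false
  override def hasNext: Boolean = !closed && underlying.hasNext
  override def next(): T = underlying.next()
  override def close(): Unit = { closed = true }
}

object DirectUse {
  // Read lazily through the Scala view, then close the original handle.
  // Only two elements are pulled from the source.
  def firstTwo(it: FakeCloseableIterator[Int]): List[Int] =
    Using.resource(it) { closeable =>
      closeable.asScala.take(2).toList
    }
}
```

`asScala` only adapts the iteration protocol. Closing stays with the object that owns the resource, which is the property the wrapper class in the PR was meant to provide.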



##########
common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala:
##########
@@ -202,38 +207,43 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
             }
             table.foreach(_.refresh())
 
-            // Retrieve and sort the file scan tasks by file sequence number
+            // Retrieve and sort the file scan tasks by file sequence number.
+            // Materialize inside `Using.resource` so the `planFiles()`
+            // CloseableIterable is released after collection.
             val fileScanTasksIterator: Iterator[FileScanTask] = table match {
               case Some(t) =>
                 val currentSnapshotId = Option(t.currentSnapshot()).map(_.snapshotId())
-                val fileScanTasks = (lastSnapshotId, currentSnapshotId) match {
-                  // Read from the start
-                  case (None, Some(_)) =>
-                    val tasks = t.newScan().planFiles().iterator().asScala
-                    lastSnapshotId = currentSnapshotId
-                    tasks
-
-                  // Read incrementally from the last snapshot
-                  case (Some(lastId), Some(currId)) if lastId != currId =>
-                    val tasks = t
-                      .newIncrementalAppendScan()
-                      .fromSnapshotExclusive(lastId)
-                      .toSnapshot(currId)
-                      .planFiles()
-                      .iterator()
-                      .asScala
-                    lastSnapshotId = currentSnapshotId
-                    tasks
-
-                  // No new data
-                  case (Some(lastId), Some(currId)) if lastId == currId =>
-                    Iterator.empty
-
-                  // Default: No data yet
-                  case _ =>
-                    Iterator.empty
-                }
-                fileScanTasks.toSeq.sortBy(_.file().fileSequenceNumber()).iterator
+                val fileScanTasks: Seq[FileScanTask] =
+                  (lastSnapshotId, currentSnapshotId) match {
+                    // Read from the start
+                    case (None, Some(_)) =>
+                      val tasks = Using.resource(t.newScan().planFiles()) { ci =>
+                        ci.iterator().asScala.toSeq
+                      }
+                      lastSnapshotId = currentSnapshotId
+                      tasks
+
+                    // Read incrementally from the last snapshot
+                    case (Some(lastId), Some(currId)) if lastId != currId =>
+                      val tasks = Using.resource(

Review Comment:
   I don't think we need to, or should, consume and materialize everything from the iterator. Iceberg's `CloseableIterator` can be closed before it is fully consumed.
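A minimal sketch of closing before full consumption. `CountingSource` is a hypothetical stand-in for a resource-backed iterator over file scan tasks. It records how many elements were pulled so the example can show that the consumer stops early, the source is still closed, and no `Seq` is built along the way.

```scala
import java.io.Closeable
import scala.util.Using

// Hypothetical resource-backed iterator that counts pulled elements
// and records whether it was closed.
class CountingSource(total: Int) extends Iterator[Int] with Closeable {
  var pulled = 0
  var closed = false
  override def hasNext: Boolean = !closed && pulled < total
  override def next(): Int = {
    if (!hasNext) throw new NoSuchElementException("source exhausted or closed")
    pulled += 1
    pulled
  }
  override def close(): Unit = { closed = true }
}

object EarlyClose {
  // Stop at the first element matching `p`. Using.resource closes the
  // source as soon as `find` returns, even if most elements were never read.
  def firstMatch(src: CountingSource)(p: Int => Boolean): Option[Int] =
    Using.resource(src)(_.find(p))
}
```

For example, with a source of one million elements, `EarlyClose.firstMatch(src)(_ == 3)` pulls three of them and then closes the source.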



##########
common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala:
##########
@@ -125,7 +125,9 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
       .getOrElse(
         return 0
       )
-    table.newScan().planFiles().iterator().asScala.map(f => f.file().recordCount()).sum
+    Using.resource(table.newScan().planFiles()) { tasks =>

Review Comment:
   `Using.resource` materializes all elements from the iterator, which essentially defeats the purpose of using an iterator. Please avoid doing so.
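A minimal sketch of why `Using.resource` pushes code toward materialization. The resource is closed as soon as the block returns, so a lazy iterator that escapes the block is already closed when anyone reads it. As a result, the data has to be consumed inside the block. `ScanSource` is a hypothetical stand-in for the `planFiles()` iterable. Its iterator checks `closed` lazily, like a stream whose backing reader has been released.

```scala
import java.io.Closeable
import scala.util.Using

// Hypothetical stand-in for a resource-backed iterable such as planFiles().
final class ScanSource(items: Seq[Long]) extends Closeable {
  var closed = false
  // Each element is read lazily and only while the source is still open.
  def iterator: Iterator[Long] = items.iterator.takeWhile(_ => !closed)
  override def close(): Unit = { closed = true }
}

object EscapingIterator {
  // The lazy iterator escapes the block, but the source is closed on exit,
  // so nothing can be read from it afterwards.
  def escaped(src: ScanSource): Iterator[Long] =
    Using.resource(src)(_.iterator)

  // Consuming inside the block works. A fold such as `sum` streams the
  // values without building an intermediate collection.
  def summedInside(src: ScanSource): Long =
    Using.resource(src)(_.iterator.sum)
}
```

The sketch suggests that returning the closeable iterator itself and letting the final consumer close it is what keeps the result lazy.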



-- 
This is an automated message from the Apache Git Service.