Copilot commented on code in PR #4111:
URL: https://github.com/apache/texera/pull/4111#discussion_r2594592638


##########
amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala:
##########
@@ -349,6 +349,6 @@ class WorkflowService(
       }
     }
     // Delete big objects
-    BigObjectManager.deleteAllObjects()
+    LargeBinaryManager.deleteAllObjects()

Review Comment:
   `clearExecutionResources` unconditionally calls 
`LargeBinaryManager.deleteAllObjects()`, which deletes all large-binary objects 
in the shared `texera-large-binaries` bucket even though the method is intended 
to clean up resources for a single execution `eid`. This means any user action 
that triggers cleanup for one workflow execution can erase all other users’ 
large-binary data, violating tenant isolation and causing global data loss. 
Instead of a bucket-wide wipe, track the specific `LargeBinary` URIs associated 
with the given `eid` and delete only those objects; reserve any global 
`deleteAllObjects` call for tightly controlled, admin-only maintenance flows, 
if needed at all.
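
A minimal sketch of the per-`eid` tracking this comment suggests. All names here (`ScopedLargeBinaryTracker`, `trackObject`, `cleanupExecution`, `deleteUri`) are illustrative placeholders; the real `LargeBinaryManager` API may differ, and the actual S3 delete call is abstracted behind a function parameter:

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical helper: record each large-binary URI under the execution that
// created it, so cleanup can be scoped to one eid instead of wiping the bucket.
object ScopedLargeBinaryTracker {
  // eid -> URIs of large binaries created during that execution
  private val objectsByEid = TrieMap.empty[Long, List[String]]

  def trackObject(eid: Long, uri: String): Unit =
    objectsByEid.updateWith(eid) {
      case Some(uris) => Some(uri :: uris)
      case None       => Some(List(uri))
    }

  // Delete only the objects recorded for this eid; `deleteUri` stands in for
  // the actual per-object S3 delete call.
  def cleanupExecution(eid: Long, deleteUri: String => Unit): Unit =
    objectsByEid.remove(eid).foreach(_.foreach(deleteUri))
}
```

`clearExecutionResources` could then call `cleanupExecution(eid, ...)` instead of `deleteAllObjects()`, leaving other executions' objects untouched.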



##########
amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala:
##########
@@ -601,7 +601,7 @@ class WorkflowResource extends LazyLogging {
         .asScala
         .toList
 
-      BigObjectManager.deleteAllObjects()
+      LargeBinaryManager.deleteAllObjects()

Review Comment:
   `deleteWorkflow` calls `LargeBinaryManager.deleteAllObjects()` before 
verifying that the requesting user actually owns the workflows in 
`workflowIDs`, and `deleteAllObjects` wipes **all** large-binary objects in the 
shared `texera-large-binaries` bucket. Any authenticated user with `REGULAR` 
role can therefore trigger a global deletion of all stored large binaries 
(including other users’ data) simply by POSTing to this endpoint with arbitrary 
or non-owned workflow IDs. Replace this global wipe with deletion that is 
scoped to the specific executions/workflows being removed (e.g., track and 
delete only the `LargeBinary` URIs associated with the resolved `eids` for 
workflows the user owns) and move any remaining global cleanup behind an 
admin-only path if truly necessary.
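
A sketch of the ordering this comment asks for, authorization before any side effect, then deletion scoped to the resolved executions. The helpers (`isOwner`, `executionsOf`, `deleteLargeBinariesForEid`) are hypothetical and passed in as parameters, since the real resource/DAO APIs are not shown in this hunk:

```scala
// Hypothetical sketch: authorize first, then delete only the large binaries
// belonging to executions of workflows the user actually owns. Every helper
// here is a placeholder, not the real Texera API.
def deleteWorkflowsScoped(
    uid: Int,
    workflowIDs: List[Int],
    isOwner: (Int, Int) => Boolean,          // (uid, wid) => does uid own wid?
    executionsOf: Int => List[Long],         // wid => its execution eids
    deleteLargeBinariesForEid: Long => Unit  // scoped delete, not a bucket wipe
): Unit = {
  // 1. Authorization before any side effect: keep only owned workflows.
  val ownedWorkflows = workflowIDs.filter(wid => isOwner(uid, wid))
  // 2. Scoped cleanup for those workflows' executions only.
  ownedWorkflows.flatMap(executionsOf).foreach(deleteLargeBinariesForEid)
}
```

With this shape, a request naming non-owned workflow IDs deletes nothing, instead of triggering a global wipe.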



##########
common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala:
##########
@@ -234,8 +234,8 @@ object IcebergUtil {
       case AttributeType.BOOLEAN   => Types.BooleanType.get()
       case AttributeType.TIMESTAMP => Types.TimestampType.withoutZone()
       case AttributeType.BINARY    => Types.BinaryType.get()
-      case AttributeType.BIG_OBJECT =>
-        Types.StringType.get() // Store BigObjectPointer URI as string
+      case AttributeType.LARGE_BINARY =>
+        Types.StringType.get() // Store LargeBinaryPointer URI as string

Review Comment:
   The comment references "LargeBinaryPointer", but the class is named "LargeBinary". For accuracy, this should read "Store LargeBinary URI as string".
   ```suggestion
           Types.StringType.get() // Store LargeBinary URI as string
   ```



##########
amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala:
##########
@@ -349,6 +349,6 @@ class WorkflowService(
       }
     }
     // Delete big objects

Review Comment:
   The comment still uses the old terminology "big objects"; update it to "large binaries" for consistency with the renaming.
   ```suggestion
       // Delete large binaries
   ```



##########
common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala:
##########
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import org.apache.texera.amber.core.tuple.LargeBinary
+import org.scalatest.funsuite.AnyFunSuite
+
+class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase {
+
+  /** Creates a large binary from string data and returns it. */
+  private def createLargeBinary(data: String): LargeBinary = {
+    val largeBinary = new LargeBinary()
+    val out = new LargeBinaryOutputStream(largeBinary)
+    try {
+      out.write(data.getBytes)
+    } finally {
+      out.close()
+    }
+    largeBinary
+  }
+
+  /** Verifies standard bucket name. */
+  private def assertStandardBucket(pointer: LargeBinary): Unit = {
+    assert(pointer.getBucketName == "texera-large-binaries")
+    assert(pointer.getUri.startsWith("s3://texera-large-binaries/"))
+  }
+
+  // ========================================
+  // LargeBinaryInputStream Tests (Standard Java InputStream)
+  // ========================================
+
+  test("LargeBinaryInputStream should read all bytes from stream") {
+    val data = "Hello, World! This is a test."
+    val largeBinary = createLargeBinary(data)
+
+    val stream = new LargeBinaryInputStream(largeBinary)
+    assert(stream.readAllBytes().sameElements(data.getBytes))
+    stream.close()
+
+    LargeBinaryManager.deleteAllObjects()
+  }
+
+  test("LargeBinaryInputStream should read exact number of bytes") {
+    val largeBinary = createLargeBinary("0123456789ABCDEF")
+
+    val stream = new LargeBinaryInputStream(largeBinary)
+    val result = stream.readNBytes(10)
+
+    assert(result.length == 10)
+    assert(result.sameElements("0123456789".getBytes))
+    stream.close()
+
+    LargeBinaryManager.deleteAllObjects()
+  }
+
+  test("LargeBinaryInputStream should handle reading more bytes than available") {
+    val data = "Short"
+    val largeBinary = createLargeBinary(data)
+
+    val stream = new LargeBinaryInputStream(largeBinary)
+    val result = stream.readNBytes(100)
+
+    assert(result.length == data.length)
+    assert(result.sameElements(data.getBytes))
+    stream.close()
+
+    LargeBinaryManager.deleteAllObjects()
+  }
+
+  test("LargeBinaryInputStream should support standard single-byte read") {
+    val largeBinary = createLargeBinary("ABC")
+
+    val stream = new LargeBinaryInputStream(largeBinary)
+    assert(stream.read() == 65) // 'A'
+    assert(stream.read() == 66) // 'B'
+    assert(stream.read() == 67) // 'C'
+    assert(stream.read() == -1) // EOF
+    stream.close()
+
+    LargeBinaryManager.deleteAllObjects()
+  }
+
+  test("LargeBinaryInputStream should return -1 at EOF") {
+    val largeBinary = createLargeBinary("EOF")
+
+    val stream = new LargeBinaryInputStream(largeBinary)
+    stream.readAllBytes() // Read all data
+    assert(stream.read() == -1)
+    stream.close()
+
+    LargeBinaryManager.deleteAllObjects()
+  }
+
+  test("LargeBinaryInputStream should throw exception when reading from closed stream") {
+    val largeBinary = createLargeBinary("test")
+
+    val stream = new LargeBinaryInputStream(largeBinary)
+    stream.close()
+
+    assertThrows[java.io.IOException](stream.read())
+    assertThrows[java.io.IOException](stream.readAllBytes())
+
+    LargeBinaryManager.deleteAllObjects()
+  }
+
+  test("LargeBinaryInputStream should handle multiple close calls") {
+    val largeBinary = createLargeBinary("test")
+
+    val stream = new LargeBinaryInputStream(largeBinary)
+    stream.close()
+    stream.close() // Should not throw
+
+    LargeBinaryManager.deleteAllObjects()
+  }
+
+  test("LargeBinaryInputStream should read large data correctly") {
+    val largeData = Array.fill[Byte](20000)((scala.util.Random.nextInt(256) - 128).toByte)
+    val largeBinary = new LargeBinary()
+    val out = new LargeBinaryOutputStream(largeBinary)
+    try {
+      out.write(largeData)
+    } finally {
+      out.close()
+    }
+
+    val stream = new LargeBinaryInputStream(largeBinary)
+    val result = stream.readAllBytes()
+    assert(result.sameElements(largeData))
+    stream.close()
+
+    LargeBinaryManager.deleteAllObjects()
+  }
+
+  // ========================================
+  // LargeBinaryManager Tests
+  // ========================================
+
+  test("LargeBinaryManager should create a large binary") {
+    val pointer = createLargeBinary("Test large binary data")
+
+    assertStandardBucket(pointer)
+  }
+
+  test("LargeBinaryInputStream should open and read a large binary") {
+    val data = "Hello from large binary!"
+    val pointer = createLargeBinary(data)
+
+    val stream = new LargeBinaryInputStream(pointer)
+    val readData = stream.readAllBytes()
+    stream.close()
+
+    assert(readData.sameElements(data.getBytes))
+  }
+
+  test("LargeBinaryInputStream should fail to open non-existent large binary") {
+    val fakeLargeBinary = new LargeBinary("s3://texera-large-binaries/nonexistent/file")
+    val stream = new LargeBinaryInputStream(fakeLargeBinary)
+
+    try {
+      intercept[Exception] {
+        stream.read()
+      }
+    } finally {
+      try { stream.close() }
+      catch { case _: Exception => }
+    }
+  }
+
+  test("LargeBinaryManager should delete all large binarys") {

Review Comment:
   Spelling error: "binarys" should be "binaries"
   ```suggestion
     test("LargeBinaryManager should delete all large binaries") {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
