xuang7 commented on code in PR #4181:
URL: https://github.com/apache/texera/pull/4181#discussion_r2749021371


##########
file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala:
##########
@@ -72,10 +68,20 @@ import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATA
 import org.jooq.exception.DataAccessException
 import software.amazon.awssdk.services.s3.model.UploadPartResponse
 import org.apache.commons.io.FilenameUtils
+import org.apache.texera.dao.jooq.generated.tables.records.DatasetUploadSessionRecord
+import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSession.DATASET_UPLOAD_SESSION
+import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATASET_UPLOAD_SESSION_PART
+import org.jooq.exception.DataAccessException
+import software.amazon.awssdk.services.s3.model.UploadPartResponse
+import org.apache.commons.io.FilenameUtils
 import org.apache.texera.service.util.LakeFSExceptionHandler.withLakeFSErrorHandling
+import org.apache.texera.dao.jooq.generated.tables.records.DatasetUploadSessionRecord
 
 import java.sql.SQLException
 import scala.util.Try
+import java.sql.SQLException
+import java.time.OffsetDateTime
+import scala.util.Try

Review Comment:
   There are duplicated and unused imports here that can be removed: `DataAccessException`, `UploadPartResponse`, `FilenameUtils`, `SQLException`, and `Try` are re-imported even though they are already in scope, and `DatasetUploadSessionRecord` is added twice.
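   With the duplicates dropped, the diff only needs the imports it genuinely introduces (a sketch, assuming the pre-existing imports above stay in place):

   ```scala
   import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSession.DATASET_UPLOAD_SESSION
   import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATASET_UPLOAD_SESSION_PART
   import org.apache.texera.dao.jooq.generated.tables.records.DatasetUploadSessionRecord

   import java.time.OffsetDateTime
   ```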



##########
frontend/src/app/common/formly/collab-wrapper/collab-wrapper/collab-wrapper.component.css:
##########
@@ -25,7 +25,7 @@
   outline: transparent;
   overflow: visible;
   position: relative;
-  : 1pt;
+  :1pt;

Review Comment:
   Seems like this change isn't relevant to the PR.



##########
file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala:
##########
@@ -1557,111 +1581,229 @@ class DatasetResource {
           s"Computed numParts=$numPartsLong is out of range 
1..$MAXIMUM_NUM_OF_MULTIPART_S3_PARTS"
         )
       }
-      val numPartsValue: Int = numPartsLong.toInt
+      val computedNumParts: Int = numPartsLong.toInt
 
-      // S3 multipart constraint: all non-final parts must be >= 5MiB.
-      // If we have >1 parts, then partSizeBytesValue is the non-final part size.
-      if (numPartsValue > 1 && partSizeBytesValue < MINIMUM_NUM_OF_MULTIPART_S3_PART) {
+      if (computedNumParts > 1 && partSizeBytesValue < MINIMUM_NUM_OF_MULTIPART_S3_PART) {
         throw new BadRequestException(
           s"partSizeBytes=$partSizeBytesValue is too small. " +
             s"All non-final parts must be >= $MINIMUM_NUM_OF_MULTIPART_S3_PART bytes."
         )
       }
-
-      // Reject if a session already exists
-      val exists = ctx.fetchExists(
-        ctx
-          .selectOne()
-          .from(DATASET_UPLOAD_SESSION)
+      var session: DatasetUploadSessionRecord = null
+      var rows: Result[Record2[Integer, String]] = null
+      try {
+        session = ctx
+          .selectFrom(DATASET_UPLOAD_SESSION)
           .where(
             DATASET_UPLOAD_SESSION.UID
               .eq(uid)
               .and(DATASET_UPLOAD_SESSION.DID.eq(did))
               .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
           )
-      )
-      if (exists) {
-        throw new WebApplicationException(
-          "Upload already in progress for this filePath",
-          Response.Status.CONFLICT
-        )
-      }
-
-      val presign = withLakeFSErrorHandling {
-        LakeFSStorageClient.initiatePresignedMultipartUploads(
-          repositoryName,
-          filePath,
-          numPartsValue
-        )
+          .forUpdate()
+          .noWait()
+          .fetchOne()
+        if (session != null) {
+        //Gain parts lock
+        rows = ctx
+          .select(DATASET_UPLOAD_SESSION_PART.PART_NUMBER, DATASET_UPLOAD_SESSION_PART.ETAG)
+          .from(DATASET_UPLOAD_SESSION_PART)
+          .where(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID.eq(session.getUploadId))
+          .forUpdate()
+          .noWait()
+          .fetch()
+        val dbFileSize = session.getFileSizeBytes
+        val dbPartSize = session.getPartSizeBytes
+        val dbNumParts = session.getNumPartsRequested
+        val createdAt: OffsetDateTime = session.getCreatedAt
+
+        val isExpired =
+          createdAt
+            .plusHours(PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS.toLong)
+            .isBefore(OffsetDateTime.now(createdAt.getOffset)) // or OffsetDateTime.now()
+
+        val conflictConfig =
+          dbFileSize != fileSizeBytesValue ||
+            dbPartSize != partSizeBytesValue ||
+            dbNumParts != computedNumParts ||
+            isExpired ||
+            Option(restart).exists(_.orElse(false))
+
+          if (conflictConfig) {
+            // Parts will be deleted automatically (ON DELETE CASCADE)
+            ctx
+              .deleteFrom(DATASET_UPLOAD_SESSION)
+              .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(session.getUploadId))
+              .execute()
+
+            try {
+              LakeFSStorageClient.abortPresignedMultipartUploads(
+                repositoryName,
+                filePath,
+                session.getUploadId,
+                session.getPhysicalAddress
+              )
+            } catch { case _: Throwable => () }
+            session = null
+            rows = null
+            }
+        }
+      } catch {
+        case e: DataAccessException
+            if Option(e.getCause)
+              .collect { case s: SQLException => s.getSQLState }
+              .contains("55P03") =>
+          throw new WebApplicationException(
+            "Another client is uploading this file",
+            Response.Status.CONFLICT
+          )
       }
 
-      val uploadIdStr = presign.getUploadId
-      val physicalAddr = presign.getPhysicalAddress
-
-      try {
-        val rowsInserted = ctx
-          .insertInto(DATASET_UPLOAD_SESSION)
-          .set(DATASET_UPLOAD_SESSION.FILE_PATH, filePath)
-          .set(DATASET_UPLOAD_SESSION.DID, did)
-          .set(DATASET_UPLOAD_SESSION.UID, uid)
-          .set(DATASET_UPLOAD_SESSION.UPLOAD_ID, uploadIdStr)
-          .set(DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, physicalAddr)
-          .set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED, Integer.valueOf(numPartsValue))
-          .set(DATASET_UPLOAD_SESSION.FILE_SIZE_BYTES, java.lang.Long.valueOf(fileSizeBytesValue))
-          .set(DATASET_UPLOAD_SESSION.PART_SIZE_BYTES, java.lang.Long.valueOf(partSizeBytesValue))
-          .onDuplicateKeyIgnore()
-          .execute()
-
-        if (rowsInserted != 1) {
-          LakeFSStorageClient.abortPresignedMultipartUploads(
+      if (session == null) {
+        val presign = withLakeFSErrorHandling {
+          LakeFSStorageClient.initiatePresignedMultipartUploads(
             repositoryName,
             filePath,
-            uploadIdStr,
-            physicalAddr
-          )
-          throw new WebApplicationException(
-            "Upload already in progress for this filePath",
-            Response.Status.CONFLICT
+            computedNumParts
           )
         }
 
-        // Pre-create part rows 1..numPartsValue with empty ETag.
-        // This makes per-part locking cheap and deterministic.
+        val uploadIdStr = presign.getUploadId
+        val physicalAddr = presign.getPhysicalAddress
 
-        val partNumberSeries = DSL.generateSeries(1, numPartsValue).asTable("gs", "pn")
-        val partNumberField = partNumberSeries.field("pn", classOf[Integer])
+        try {
+          val rowsInserted = ctx
+            .insertInto(DATASET_UPLOAD_SESSION)
+            .set(DATASET_UPLOAD_SESSION.FILE_PATH, filePath)
+            .set(DATASET_UPLOAD_SESSION.DID, did)
+            .set(DATASET_UPLOAD_SESSION.UID, uid)
+            .set(DATASET_UPLOAD_SESSION.UPLOAD_ID, uploadIdStr)
+            .set(DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, physicalAddr)
+            .set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED, Integer.valueOf(computedNumParts))
+            .set(DATASET_UPLOAD_SESSION.FILE_SIZE_BYTES, java.lang.Long.valueOf(fileSizeBytesValue))
+            .set(DATASET_UPLOAD_SESSION.PART_SIZE_BYTES, java.lang.Long.valueOf(partSizeBytesValue))
+            .onDuplicateKeyIgnore()
+            .execute()
+
+          if (rowsInserted == 1) {
+            val partNumberSeries =
+              DSL.generateSeries(1, computedNumParts).asTable("gs", "partNumberField")
+            val partNumberField = partNumberSeries.field("partNumberField", classOf[Integer])
 
-        ctx
-          .insertInto(
-            DATASET_UPLOAD_SESSION_PART,
-            DATASET_UPLOAD_SESSION_PART.UPLOAD_ID,
-            DATASET_UPLOAD_SESSION_PART.PART_NUMBER,
-            DATASET_UPLOAD_SESSION_PART.ETAG
-          )
-          .select(
             ctx
+              .insertInto(
+                DATASET_UPLOAD_SESSION_PART,
+                DATASET_UPLOAD_SESSION_PART.UPLOAD_ID,
+                DATASET_UPLOAD_SESSION_PART.PART_NUMBER,
+                DATASET_UPLOAD_SESSION_PART.ETAG
+              )
               .select(
-                inl(uploadIdStr),
-                partNumberField,
-                inl("") // placeholder empty etag
+                ctx
+                  .select(
+                    inl(uploadIdStr),
+                    partNumberField,
+                    inl("")
+                  )
+                  .from(partNumberSeries)
               )
-              .from(partNumberSeries)
-          )
-          .execute()
+              .execute()
+
+            session = ctx
+              .selectFrom(DATASET_UPLOAD_SESSION)
+              .where(
+                DATASET_UPLOAD_SESSION.UID
+                  .eq(uid)
+                  .and(DATASET_UPLOAD_SESSION.DID.eq(did))
+                  .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+              )
+              .fetchOne()
+          } else {
+            try {
+              LakeFSStorageClient.abortPresignedMultipartUploads(
+                repositoryName,
+                filePath,
+                uploadIdStr,
+                physicalAddr
+              )
+            } catch { case _: Throwable => () }
+
+            session = ctx
+              .selectFrom(DATASET_UPLOAD_SESSION)
+              .where(
+                DATASET_UPLOAD_SESSION.UID
+                  .eq(uid)
+                  .and(DATASET_UPLOAD_SESSION.DID.eq(did))
+                  .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+              )
+              .fetchOne()
+          }
+        } catch {
+          case e: Exception =>
+            try {
+              LakeFSStorageClient.abortPresignedMultipartUploads(
+                repositoryName,
+                filePath,
+                uploadIdStr,
+                physicalAddr
+              )
+            } catch { case _: Throwable => () }
+            throw e
+        }
+      }
 
-        Response.ok().build()
-      } catch {
-        case e: Exception =>
+      if (session == null) {
+        throw new WebApplicationException(
+          "Failed to create or locate upload session",
+          Response.Status.INTERNAL_SERVER_ERROR
+        )
+      }
+
+      val dbNumParts = session.getNumPartsRequested
+
+      val uploadId = session.getUploadId
+      val nParts = dbNumParts
+
+      // CHANGED: lock rows with NOWAIT; if any row is locked by another uploader -> 409
+      if (rows == null) {
+        rows =
           try {
-            LakeFSStorageClient.abortPresignedMultipartUploads(
-              repositoryName,
-              filePath,
-              uploadIdStr,
-              physicalAddr
-            )
-          } catch { case _: Throwable => () }
-          throw e
+            ctx
+              .select(DATASET_UPLOAD_SESSION_PART.PART_NUMBER, DATASET_UPLOAD_SESSION_PART.ETAG)
+              .from(DATASET_UPLOAD_SESSION_PART)
+              .where(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID.eq(uploadId))
+              .forUpdate()
+              .noWait()
+              .fetch()
+          } catch {
+            case e: DataAccessException
+                if Option(e.getCause)
+                  .collect { case s: SQLException => s.getSQLState }
+                  .contains("55P03") =>
+              throw new WebApplicationException(
+                "Another client is uploading parts for this file",
+                Response.Status.CONFLICT
+              )
+          }
       }
+
+      // CHANGED: compute missingParts + completedPartsCount from the SAME query result
+      val missingPartsSmartSorted = rows.asScala

Review Comment:
   This could be renamed since it filters rather than sorts; the `SmartSorted` suffix is misleading.
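   For example (a sketch; the continuation of this expression is truncated in the excerpt, so the filter condition is assumed from the empty-string placeholder ETags created at init):

   ```scala
   // Parts pre-created with the placeholder empty ETag have not been uploaded
   // yet, so this expression filters; nothing here sorts.
   val missingPartNumbers: Seq[Int] = rows.asScala.toSeq
     .filter(record => record.value2() == null || record.value2().isEmpty)
     .map(record => record.value1().toInt)
   val completedPartsCount = rows.size() - missingPartNumbers.size
   ```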



##########
frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts:
##########
@@ -32,98 +37,229 @@ import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
   styleUrls: ["./files-uploader.component.scss"],
 })
 export class FilesUploaderComponent {
-  @Input()
-  showUploadAlert: boolean = false;
+  @Input() showUploadAlert: boolean = false;
 
-  @Output()
-  uploadedFiles = new EventEmitter<FileUploadItem[]>();
+  @Output() uploadedFiles = new EventEmitter<FileUploadItem[]>();
 
   newUploadFileTreeNodes: DatasetFileNode[] = [];
 
   fileUploadingFinished: boolean = false;
-  // four types: "success", "info", "warning" and "error"
   fileUploadBannerType: "error" | "success" | "info" | "warning" = "success";
   fileUploadBannerMessage: string = "";
   singleFileUploadMaxSizeMiB: number = 20;
 
   constructor(
     private notificationService: NotificationService,
-    private adminSettingsService: AdminSettingsService
+    private adminSettingsService: AdminSettingsService,
+    private datasetService: DatasetService,
+    @Optional() @Host() private parent: DatasetDetailComponent,
+    private modal: NzModalService
   ) {
     this.adminSettingsService
       .getSetting("single_file_upload_max_size_mib")
       .pipe(untilDestroyed(this))
       .subscribe(value => (this.singleFileUploadMaxSizeMiB = parseInt(value)));
   }
 
-  hideBanner() {
+  private formatBytes(n: number): string {
+    const mib = n / (1024 * 1024);
+    if (mib >= 1024) return `${(mib / 1024).toFixed(2)} GiB`;
+    if (mib >= 1) return `${mib.toFixed(2)} MiB`;
+    return `${Math.max(1, Math.round(n / 1024))} KiB`;
+  }

Review Comment:
   There's an existing size formatter util (frontend/src/app/common/util/size-formatter.util.ts); it might be better to use that one for consistency.



##########
frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts:
##########
@@ -32,98 +37,229 @@ import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
   styleUrls: ["./files-uploader.component.scss"],
 })
 export class FilesUploaderComponent {
-  @Input()
-  showUploadAlert: boolean = false;
+  @Input() showUploadAlert: boolean = false;
 
-  @Output()
-  uploadedFiles = new EventEmitter<FileUploadItem[]>();
+  @Output() uploadedFiles = new EventEmitter<FileUploadItem[]>();
 
   newUploadFileTreeNodes: DatasetFileNode[] = [];
 
   fileUploadingFinished: boolean = false;
-  // four types: "success", "info", "warning" and "error"
   fileUploadBannerType: "error" | "success" | "info" | "warning" = "success";
   fileUploadBannerMessage: string = "";
   singleFileUploadMaxSizeMiB: number = 20;
 
   constructor(
     private notificationService: NotificationService,
-    private adminSettingsService: AdminSettingsService
+    private adminSettingsService: AdminSettingsService,
+    private datasetService: DatasetService,
+    @Optional() @Host() private parent: DatasetDetailComponent,
+    private modal: NzModalService
   ) {
     this.adminSettingsService
       .getSetting("single_file_upload_max_size_mib")
       .pipe(untilDestroyed(this))
       .subscribe(value => (this.singleFileUploadMaxSizeMiB = parseInt(value)));
   }
 
-  hideBanner() {
+  private formatBytes(n: number): string {
+    const mib = n / (1024 * 1024);
+    if (mib >= 1024) return `${(mib / 1024).toFixed(2)} GiB`;
+    if (mib >= 1) return `${mib.toFixed(2)} MiB`;
+    return `${Math.max(1, Math.round(n / 1024))} KiB`;
+  }
+
+  private markForceRestart(item: FileUploadItem): void {
+    // uploader should call backend init with type=forceRestart when this is set
+    (item as any).restart = true;
+  }
+
+  private askResumeOrSkip(
+    item: FileUploadItem,
+    showForAll: boolean
+  ): Promise<"resume" | "resumeAll" | "restart" | "restartAll"> {
+    return new Promise(resolve => {
+      const fileName = item.name.split("/").pop() || item.name;
+      const sizeStr = this.formatBytes(item.file.size);
+
+      const ref = this.modal.create({
+        nzTitle: "Conflicting File",
+        nzMaskClosable: false,
+        nzClosable: false,
+        nzContent: `
+<div>

Review Comment:
   nzContent is currently built via a template string with raw HTML interpolation. It might be better to use an Angular ng-template (or a small component) for nzContent so the dialog body is rendered by Angular instead of interpolated HTML.
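   Roughly like this, for example (sketch only; the selector, field, and template names are hypothetical):

   ```typescript
   import { Component, TemplateRef, ViewChild } from "@angular/core";
   import { NzModalService } from "ng-zorro-antd/modal";

   @Component({
     selector: "texera-upload-conflict-demo",
     template: `
       <ng-template #conflictContent>
         <div>
           <b>{{ conflictFileName }}</b> ({{ conflictFileSize }}) already has an
           upload in progress.
         </div>
       </ng-template>
     `,
   })
   export class UploadConflictDemoComponent {
     @ViewChild("conflictContent") conflictContent!: TemplateRef<void>;
     conflictFileName = "";
     conflictFileSize = "";

     constructor(private modal: NzModalService) {}

     askResumeOrSkip(fileName: string, sizeStr: string): void {
       this.conflictFileName = fileName;
       this.conflictFileSize = sizeStr;
       this.modal.create({
         nzTitle: "Conflicting File",
         nzMaskClosable: false,
         nzClosable: false,
         // Angular renders the template itself, so no raw HTML string is built.
         nzContent: this.conflictContent,
       });
     }
   }
   ```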



##########
frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts:
##########
@@ -426,7 +428,8 @@ export class DatasetDetailComponent implements OnInit {
                 file.name,
                 file.file,
                 this.chunkSizeMiB * 1024 * 1024,
-                this.maxConcurrentChunks
+                this.maxConcurrentChunks,
+                Boolean((file as any).restart)

Review Comment:
   We could add an optional `restart?: boolean` field to FileUploadItem instead of casting through `any`.
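   For example (a sketch; only the fields this PR actually touches are shown, the rest of FileUploadItem is elided):

   ```typescript
   // Sketch: extend the existing FileUploadItem model (other fields elided).
   export interface FileUploadItem {
     file: File;
     name: string;
     // Set when the user chooses to force-restart a conflicting upload;
     // absent/false means a normal init (resume if a session exists).
     restart?: boolean;
   }
   ```

   The call above then becomes `Boolean(file.restart)` with no `any` cast.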



##########
frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts:
##########
@@ -32,98 +37,229 @@ import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
   styleUrls: ["./files-uploader.component.scss"],
 })
 export class FilesUploaderComponent {
-  @Input()
-  showUploadAlert: boolean = false;
+  @Input() showUploadAlert: boolean = false;
 
-  @Output()
-  uploadedFiles = new EventEmitter<FileUploadItem[]>();
+  @Output() uploadedFiles = new EventEmitter<FileUploadItem[]>();
 
   newUploadFileTreeNodes: DatasetFileNode[] = [];
 
   fileUploadingFinished: boolean = false;
-  // four types: "success", "info", "warning" and "error"
   fileUploadBannerType: "error" | "success" | "info" | "warning" = "success";
   fileUploadBannerMessage: string = "";
   singleFileUploadMaxSizeMiB: number = 20;
 
   constructor(
     private notificationService: NotificationService,
-    private adminSettingsService: AdminSettingsService
+    private adminSettingsService: AdminSettingsService,
+    private datasetService: DatasetService,
+    @Optional() @Host() private parent: DatasetDetailComponent,
+    private modal: NzModalService
   ) {
     this.adminSettingsService
       .getSetting("single_file_upload_max_size_mib")
       .pipe(untilDestroyed(this))
       .subscribe(value => (this.singleFileUploadMaxSizeMiB = parseInt(value)));
   }
 
-  hideBanner() {
+  private formatBytes(n: number): string {
+    const mib = n / (1024 * 1024);
+    if (mib >= 1024) return `${(mib / 1024).toFixed(2)} GiB`;
+    if (mib >= 1) return `${mib.toFixed(2)} MiB`;
+    return `${Math.max(1, Math.round(n / 1024))} KiB`;
+  }
+
+  private markForceRestart(item: FileUploadItem): void {
+    // uploader should call backend init with type=forceRestart when this is set
+    (item as any).restart = true;

Review Comment:
   Same as the comment on dataset-detail.component.ts: with an optional `restart?: boolean` field on FileUploadItem (see the sketch there), this can set `item.restart = true` directly without the `any` cast.



##########
frontend/src/app/workspace/component/menu/coeditor-user-icon/coeditor-user-icon.component.css:
##########
@@ -16,4 +16,3 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-

Review Comment:
   Seems like this change isn't relevant to the PR.



##########
file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala:
##########
@@ -1487,11 +1495,38 @@ class DatasetResource {
     dataset
   }
 
+  private def listMultipartUploads(did: Integer, requesterUid: Int): Response = {
+    withTransaction(context) { ctx =>
+      if (!userHasWriteAccess(ctx, did, requesterUid)) {
+        throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
+      }
+
+      val filePaths =
+        ctx
+          .selectDistinct(DATASET_UPLOAD_SESSION.FILE_PATH)
+          .from(DATASET_UPLOAD_SESSION)
+          .where(DATASET_UPLOAD_SESSION.DID.eq(did))
+          .and(
+            DSL.condition(
+              "created_at > current_timestamp - (? * interval '1 hour')",

Review Comment:
   init and list check expiry against different time sources: init compares createdAt against application time (OffsetDateTime.now), while list uses the database's current_timestamp. It might be better to unify these so both use the same clock and avoid inconsistent expiry decisions.
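   One option is to have init read "now" from the database as well (a sketch; assumes a jOOQ version that provides DSL.currentOffsetDateTime, and the variable names are illustrative):

   ```scala
   import org.jooq.impl.DSL

   // Use the database clock for the expiry check so it matches the
   // current_timestamp comparison used when listing sessions.
   val dbNow: java.time.OffsetDateTime =
     ctx.select(DSL.currentOffsetDateTime()).fetchOne().value1()

   val isExpired = session.getCreatedAt
     .plusHours(PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS.toLong)
     .isBefore(dbNow)
   ```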



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
