gbrd opened a new issue, #1407:
URL: https://github.com/apache/pekko-connectors/issues/1407

   ### Version
   
   - Pekko Connectors: 1.1.0 
   - Pekko Streams: 1.1.x
   - Scala: 2.13.x
   
   ### Description
   The TarReaderStage can terminate prematurely when reading TAR archives 
containing multiple files, resulting in incomplete extraction. This occurs when 
the upstream source closes (e.g., S3 connection) while the internal buffer 
still contains unprocessed file headers/data.
   
   ### Root Cause
   In TarReaderStage.scala at line 104, within the readTrailer() method:
   ```scala
   def readTrailer(metadata: TarArchiveMetadata,
       buffer: ByteString,
       subSource: Option[SubSourceOutlet[ByteString]]): Unit = {
     val trailerLength = TarArchiveEntry.trailerLength(metadata)
     if (buffer.length >= trailerLength) {
       subSource.foreach(_.complete())
       if (isClosed(flowIn)) completeStage()  // ← BUG?: called before readHeader()
       readHeader(buffer.drop(trailerLength)) // ← never executed if the stage completed?
     } else setHandlers(flowIn, flowOut, new ReadPastTrailer(metadata, buffer, subSource))
   }
   ```
   Problem: completeStage() is called before readHeader(), which means:
   
   1. if flowIn is closed (the upstream has finished),
   2. and the buffer still contains unprocessed TAR entries (headers/data),
   3. then the stage completes before processing the remaining buffer content,
   4. and subsequent files in the TAR archive are never extracted.
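   The data loss can be sketched in miniature without Pekko at all. The following is a hypothetical model (object and parameter names are invented for illustration, and it is not the real TarReaderStage code): the internal buffer is reduced to a block count, and the buggy "complete as soon as flowIn is closed" check is contrasted with the drain-first behavior proposed below.
   
   ```scala
   // Hypothetical model of the drain loop, NOT the real stage code.
   // Each simulated entry occupies 3 buffered blocks: header, data, trailer.
   object TarDrainModel {
     // entries: TAR entries fully buffered when the upstream finishes
     // buggy = true  -> complete as soon as isClosed(flowIn), like the current code
     // buggy = false -> complete only when the buffer is also empty (proposed fix)
     def extract(entries: Int, upstreamClosed: Boolean, buggy: Boolean): Int = {
       var buffer = entries * 3 // blocks still sitting in the internal buffer
       var extracted = 0
       while (buffer >= 3) {
         buffer -= 3            // consume one entry: header + data + trailer
         extracted += 1
         if (upstreamClosed && (buggy || buffer == 0))
           return extracted     // completeStage()
       }
       extracted
     }
   }
   ```
   
   With three buffered entries and a closed upstream, the buggy variant stops after one entry while the drain-first variant extracts all three, matching the "only first file(s) are extracted" symptom reported below.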
   
   ### Expected Behavior
   
   The stage should only complete when:
   - The upstream is closed AND
   - The buffer is empty (no remaining data to process)
   
   ### Actual Behavior
   The stage completes as soon as the upstream closes, even if the buffer 
contains unprocessed TAR entries, causing data loss.
   
   ### Reproduction
   ```scala
   import org.apache.pekko.stream.connectors.s3.S3
   import org.apache.pekko.stream.connectors.file.scaladsl.Archive
   import org.apache.pekko.stream.scaladsl.Sink
   
   // TAR archive with multiple files stored in S3
   // (assumes an implicit ActorSystem is in scope)
   S3.getObject(bucket, key)
     .via(Archive.tarReader())
     .runForeach { case (metadata, source) =>
       println(s"Processing: ${metadata.filePath}")
       source.runWith(Sink.ignore)
     }
   
   // Result: only the first file(s) are extracted,
   // remaining files are lost when S3 closes the connection
   ```
   
   ### Workaround
   Adding an empty ByteString at the end forces a final processing cycle:
   ```scala
   import org.apache.pekko.stream.scaladsl.Source
   import org.apache.pekko.util.ByteString
   
   S3.getObject(bucket, key)
     .concat(Source.single(ByteString.empty))  // ← workaround
     .via(Archive.tarReader())
   ```
   
   This prevents isClosed(flowIn) from being true during the last readTrailer() 
call, allowing readHeader() to process all remaining entries.
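   The timing effect of the workaround can be sketched abstractly (again a hypothetical model with invented names, not the real stage code): the extra empty chunk keeps flowIn open for one more cycle, so isClosed(flowIn) is still false while the buffered entries are drained.
   
   ```scala
   // Hypothetical model of the workaround's timing, NOT the real stage code.
   object WorkaroundModel {
     // entries: TAR entries fully buffered when the real upstream finishes.
     // extraEmptyChunk models `.concat(Source.single(ByteString.empty))`.
     // The buggy readTrailer() completes on the first trailer it reads while
     // isClosed(flowIn) is true.
     def extracted(entries: Int, extraEmptyChunk: Boolean): Int = {
       // the trailing empty chunk keeps flowIn open during the drain,
       // so the buggy isClosed(flowIn) check never fires early
       val closedDuringDrain = !extraEmptyChunk
       var remaining = entries
       var count = 0
       while (remaining > 0) {
         remaining -= 1
         count += 1
         if (closedDuringDrain) return count // buggy completeStage()
       }
       count
     }
   }
   ```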
   
   ### Proposed Fix
   ```scala
   def readTrailer(metadata: TarArchiveMetadata,
       buffer: ByteString,
       subSource: Option[SubSourceOutlet[ByteString]]): Unit = {
     val trailerLength = TarArchiveEntry.trailerLength(metadata)
     if (buffer.length >= trailerLength) {
       subSource.foreach(_.complete())
       val remainingBuffer = buffer.drop(trailerLength)
       
       if (remainingBuffer.isEmpty && isClosed(flowIn)) {
         completeStage()  // Safe: no more data to process
       } else {
         readHeader(remainingBuffer)  // Continue processing
       }
     } else setHandlers(flowIn, flowOut, new ReadPastTrailer(metadata, buffer, subSource))
   }
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

