gbrd opened a new issue, #1407:
URL: https://github.com/apache/pekko-connectors/issues/1407
### Version
- Pekko Connectors: 1.1.0
- Pekko Streams: 1.1.x
- Scala: 2.13.x
### Description
The TarReaderStage can terminate prematurely when reading TAR archives
containing multiple files, resulting in incomplete extraction. This occurs when
the upstream source completes (e.g., an S3 connection closing) while the stage's
internal buffer still contains unprocessed file headers and data.
### Root Cause
In TarReaderStage.scala at line 104, within the readTrailer() method:
```
def readTrailer(metadata: TarArchiveMetadata,
    buffer: ByteString,
    subSource: Option[SubSourceOutlet[ByteString]]): Unit = {
  val trailerLength = TarArchiveEntry.trailerLength(metadata)
  if (buffer.length >= trailerLength) {
    subSource.foreach(_.complete())
    if (isClosed(flowIn)) completeStage() // ← BUG?: called before readHeader()
    readHeader(buffer.drop(trailerLength)) // ← never executed if the stage completed??
  } else setHandlers(flowIn, flowOut, new ReadPastTrailer(metadata, buffer, subSource))
}
```
Problem: completeStage() is called before readHeader(), which means:
1. if flowIn is closed (the upstream has finished),
2. and the buffer still contains further TAR entries (headers/data),
3. then the stage completes before processing the remaining buffer content,
4. and subsequent files in the TAR archive are never extracted.
### Expected Behavior
The stage should complete only when:
- The upstream is closed AND
- The buffer is empty (no remaining data to process)
### Actual Behavior
The stage completes as soon as the upstream closes, even if the buffer
contains unprocessed TAR entries, causing data loss.
### Reproduction
```
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.file.scaladsl.Archive
import org.apache.pekko.stream.connectors.s3.scaladsl.S3
import org.apache.pekko.stream.scaladsl.Sink

implicit val system: ActorSystem = ActorSystem()

val bucket = "my-bucket" // placeholder
val key = "archive.tar"  // placeholder

// TAR archive with multiple files stored in S3
S3.getObject(bucket, key)
  .via(Archive.tarReader())
  .runForeach { case (metadata, source) =>
    println(s"Processing: ${metadata.filePath}")
    source.runWith(Sink.ignore)
  }
// Result: only the first file(s) are extracted;
// the remaining files are lost when S3 closes the connection
```
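
The same failure can be provoked without S3. Below is a minimal self-contained sketch (the object name, payload, and exact failure timing are my assumptions, not existing project code): it builds a two-entry archive in memory with the connector's own Archive.tar() and delivers it as a single element, so the upstream has completed while the second header is still sitting in the reader's buffer:

```
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.file.TarArchiveMetadata
import org.apache.pekko.stream.connectors.file.scaladsl.Archive
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import org.apache.pekko.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object TarReaderRepro extends App {
  implicit val system: ActorSystem = ActorSystem("tar-reader-repro")
  import system.dispatcher

  val payload = ByteString("0123456789")
  def entry(name: String) =
    TarArchiveMetadata.create(name, payload.length.toLong) -> Source.single(payload)

  // Build a two-entry TAR in memory with the connector's own writer.
  val archive = Await.result(
    Source(List(entry("first.txt"), entry("second.txt")))
      .via(Archive.tar())
      .runWith(Sink.fold(ByteString.empty)(_ ++ _)),
    5.seconds)

  // Deliver the whole archive as one element: upstream completes while the
  // second header is still buffered inside TarReaderStage.
  val names = Await.result(
    Source.single(archive)
      .via(Archive.tarReader())
      .mapAsync(1) { case (metadata, source) =>
        source.runWith(Sink.ignore).map(_ => metadata.filePath)
      }
      .runWith(Sink.seq),
    5.seconds)

  println(s"Extracted: $names") // expected List(first.txt, second.txt)
  system.terminate()
}
```

With the bug present, only first.txt is typically reported; with the workaround below (or the proposed fix) applied, both entries come through.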
### Workaround
Adding an empty ByteString at the end forces a final processing cycle:
```
import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.util.ByteString

S3.getObject(bucket, key)
  .concat(Source.single(ByteString.empty)) // ← workaround
  .via(Archive.tarReader())
```
This prevents isClosed(flowIn) from being true during the last readTrailer()
call, allowing readHeader() to process all remaining entries.
### Proposed Fix
```
def readTrailer(metadata: TarArchiveMetadata,
    buffer: ByteString,
    subSource: Option[SubSourceOutlet[ByteString]]): Unit = {
  val trailerLength = TarArchiveEntry.trailerLength(metadata)
  if (buffer.length >= trailerLength) {
    subSource.foreach(_.complete())
    val remainingBuffer = buffer.drop(trailerLength)
    if (remainingBuffer.isEmpty && isClosed(flowIn)) {
      completeStage() // safe: no more data to process
    } else {
      readHeader(remainingBuffer) // continue processing buffered entries
    }
  } else setHandlers(flowIn, flowOut, new ReadPastTrailer(metadata, buffer, subSource))
}
```
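
If the fix lands, a regression test along these lines could pin the behavior down (a sketch only; the ScalaTest style, spec name, and helpers are my assumptions, reusing the in-memory archive construction from the reproduction above):

```
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.file.TarArchiveMetadata
import org.apache.pekko.stream.connectors.file.scaladsl.Archive
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import org.apache.pekko.util.ByteString
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{ Seconds, Span }
import org.scalatest.wordspec.AnyWordSpec

class TarReaderCompletionSpec extends AnyWordSpec with Matchers with ScalaFutures {
  implicit val system: ActorSystem = ActorSystem("TarReaderCompletionSpec")
  import system.dispatcher
  override implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = Span(5, Seconds))

  "tarReader" should {
    "emit every entry when upstream completes with data still buffered" in {
      def entry(name: String) =
        TarArchiveMetadata.create(name, 10L) -> Source.single(ByteString("0123456789"))

      // Two-entry archive delivered as a single element, as in the reproduction above.
      val archive = Source(List(entry("first.txt"), entry("second.txt")))
        .via(Archive.tar())
        .runWith(Sink.fold(ByteString.empty)(_ ++ _))
        .futureValue

      val names = Source.single(archive)
        .via(Archive.tarReader())
        .mapAsync(1) { case (metadata, source) =>
          source.runWith(Sink.ignore).map(_ => metadata.filePath)
        }
        .runWith(Sink.seq)
        .futureValue

      names shouldBe Seq("first.txt", "second.txt")
    }
  }
}
```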