LuciferYang commented on pull request #33556:
URL: https://github.com/apache/spark/pull/33556#issuecomment-892001729
Suppose `close()` or another method writes some suffix data to the end of
the file. I tried writing a test case for that:
```scala
test("Close will write some meta data to os") {
  when(blockManager.getDiskWriter(
    any(classOf[BlockId]),
    any(classOf[File]),
    any(classOf[SerializerInstance]),
    anyInt(),
    any(classOf[ShuffleWriteMetrics])
  )).thenAnswer((invocation: InvocationOnMock) => {
    val args = invocation.getArguments
    val shuffleWriteMetrics = args(4).asInstanceOf[ShuffleWriteMetrics]
    metricsCreated += shuffleWriteMetrics
    new DiskBlockObjectWriter(
      args(1).asInstanceOf[File],
      blockManager.serializerManager,
      args(2).asInstanceOf[SerializerInstance],
      args(3).asInstanceOf[Int],
      false,
      shuffleWriteMetrics,
      args(0).asInstanceOf[BlockId]
    ) {
      // Suppose we can override this method, for example by changing it
      // from private to protected.
      override def closeResources(): Unit = {
        // Write `suffix` to the end of the file before closing.
        val meta = "suffix".getBytes("UTF-8")
        val out = new FileOutputStream(file, true)
        try out.write(meta) finally out.close()
        super.closeResources()
      }
    }
  })
  // Test data sizes correspond to three different scenarios:
  // 1. spillBatchSize -> `objectsWritten == 0`
  // 2. spillBatchSize + 1 -> `objectsWritten > 0`
  // 3. 0 -> Does not enter the `inMemoryIterator.hasNext` loop and
  //    `objectsWritten == 0`
  val dataSizes = {
    val spillBatchSize = conf.get(config.SHUFFLE_SPILL_BATCH_SIZE)
    Seq(spillBatchSize, spillBatchSize + 1, 0)
  }
  dataSizes.foreach { dataSize =>
    val dataBuffer = new PartitionedPairBuffer[Int, Int]
    (0 until dataSize.toInt).foreach(i => dataBuffer.insert(0, 0, i))
    val externalSorter = new TestExternalSorter[Int, Int, Int](taskContext)
    externalSorter.spill(dataBuffer)
  }
  // Verify recordsWritten is the same as the data size
  assert(metricsCreated.length == dataSizes.length)
  metricsCreated.zip(dataSizes).foreach {
    case (metrics, dataSize) => assert(metrics.recordsWritten == dataSize)
  }
  // Verify bytesWritten is the same as the file size
  assert(metricsCreated.length == filesCreated.length)
  filesCreated.foreach(file => assert(file.exists()))
  metricsCreated.zip(filesCreated).foreach {
    case (metrics, file) => assert(metrics.bytesWritten == file.length())
  }
}
```
If the code in `ExternalSorter` keeps the original logic:
```scala
if (objectsWritten > 0) {
  flush()
  writer.close()
} else {
  writer.revertPartialWritesAndClose()
}
```
The above test fails in scenario 2 (`spillBatchSize + 1`, so
`objectsWritten > 0`): the expected file length is 40128 but the actual length
is 40134. The 6 extra bytes are the `suffix`, which was written to the end of
the file. In the other scenarios, the `suffix` was truncated.
When the `objectsWritten > 0` branch also uses
`writer.revertPartialWritesAndClose()`, all cases pass.
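To illustrate the length difference outside of Spark, here is a minimal standalone sketch. It assumes that the revert path truncates the file back to the last committed position, which is my reading of `revertPartialWritesAndClose()` and not something verified here. `TruncateSuffixDemo`, `writeThenClose`, and the all-zero payload are hypothetical names and data used only for this example; the 40128-byte size is borrowed from the failing case above.

```scala
import java.io.FileOutputStream
import java.nio.charset.StandardCharsets
import java.nio.file.Files

// Hypothetical demo object, not part of Spark.
object TruncateSuffixDemo {

  /**
   * Writes `payload`, remembers the file length as the committed position,
   * appends `suffix` (standing in for data written during close), and
   * optionally truncates back to the committed position.
   * Returns the final file length.
   */
  def writeThenClose(payload: Array[Byte], suffix: Array[Byte], truncate: Boolean): Long = {
    val file = Files.createTempFile("spill-demo", ".bin").toFile
    try {
      val out = new FileOutputStream(file)
      try out.write(payload) finally out.close()
      val committedPosition = file.length()

      // Simulates an overridden close path that appends extra bytes.
      val appender = new FileOutputStream(file, true)
      try appender.write(suffix) finally appender.close()

      if (truncate) {
        // Assumed revert behavior: cut the file back to the committed position.
        val truncateStream = new FileOutputStream(file, true)
        try truncateStream.getChannel.truncate(committedPosition)
        finally truncateStream.close()
      }
      file.length()
    } finally {
      file.delete()
    }
  }

  def main(args: Array[String]): Unit = {
    val payload = new Array[Byte](40128)
    val suffix = "suffix".getBytes(StandardCharsets.UTF_8)
    println(writeThenClose(payload, suffix, truncate = false)) // 40134
    println(writeThenClose(payload, suffix, truncate = true))  // 40128
  }
}
```

Without truncation the file is 6 bytes longer than the committed position, which matches the 40128 vs 40134 mismatch seen in scenario 2.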