LuciferYang commented on pull request #33556:
URL: https://github.com/apache/spark/pull/33556#issuecomment-892001729


   Consider the case where `close()` or other methods write some suffix data 
to the end of the file.
   
   I wrote a test case for this:
   
   ```scala
   // Checks that the write metrics recorded for each spill match the spill file on
   // disk when the writer appends extra bytes to the file while closing.
   test("Close will wirte some meta data to os") {
     // Stub the block manager so that every disk writer it returns appends a fixed
     // suffix to its file right before releasing its resources.
     when(blockManager.getDiskWriter(
       any(classOf[BlockId]),
       any(classOf[File]),
       any(classOf[SerializerInstance]),
       anyInt(),
       any(classOf[ShuffleWriteMetrics])
     )).thenAnswer((invocation: InvocationOnMock) => {
       val args = invocation.getArguments
       val shuffleWriteMetrics = args(4).asInstanceOf[ShuffleWriteMetrics]
       // Remember the metrics object of each writer so it can be checked below.
       metricsCreated += shuffleWriteMetrics
       new DiskBlockObjectWriter(
         args(1).asInstanceOf[File],
         blockManager.serializerManager,
         args(2).asInstanceOf[SerializerInstance],
         args(3).asInstanceOf[Int],
         false,
         shuffleWriteMetrics,
         args(0).asInstanceOf[BlockId]
       ) {
         // Suppose we can override this method, for example, change private to protected.
         override def closeResources(): Unit = {
           // Write `suffix` to the file before close.
           val meta = "suffix".getBytes("UTF-8")
           // Open in append mode and always close the stream, so the descriptor is
           // not leaked and the bytes are on disk before the file length is checked.
           val out = new FileOutputStream(file, true)
           try {
             out.write(meta)
           } finally {
             out.close()
           }
           super.closeResources()
         }
       }
     })

     // Test data size corresponds to three different scenarios:
     // 1. spillBatchSize -> `objectsWritten == 0`
     // 2. spillBatchSize + 1 -> `objectsWritten > 0`
     // 3. 0 -> Not enter `inMemoryIterator.hasNext` loop and `objectsWritten == 0`
     val dataSizes = {
       val spillBatchSize = conf.get(config.SHUFFLE_SPILL_BATCH_SIZE)
       Seq(spillBatchSize, spillBatchSize + 1, 0)
     }

     // Spill one buffer per scenario; each spill requests a writer from the stub above.
     dataSizes.foreach { dataSize =>
       val dataBuffer = new PartitionedPairBuffer[Int, Int]
       (0 until dataSize.toInt).foreach(i => dataBuffer.insert(0, 0, i))
       val externalSorter = new TestExternalSorter[Int, Int, Int](taskContext)
       externalSorter.spill(dataBuffer)
     }

     // Verify recordsWritten same as data size
     assert(metricsCreated.length == dataSizes.length)
     metricsCreated.zip(dataSizes).foreach {
       case (metrics, dataSize) => assert(metrics.recordsWritten == dataSize)
     }

     // Verify bytesWritten same as file size
     assert(metricsCreated.length == filesCreated.length)
     filesCreated.foreach(file => assert(file.exists()))
     metricsCreated.zip(filesCreated).foreach {
       case (metrics, file) => assert(metrics.bytesWritten == file.length())
     }
   }
   ```
   
   If the code in ExternalSorter keeps the original logic:
   ```scala
   // Original close logic quoted from ExternalSorter; it picks how the writer is
   // closed depending on whether `objectsWritten` is positive.
   if (objectsWritten > 0) {
           // Flush, then close the writer normally. This comment reports that on this
           // path the `suffix` written during close stays at the end of the file.
           flush()
           writer.close()
         } else {
           // Revert partial writes and close. This comment reports that on this path
           // the `suffix` is truncated from the file.
           writer.revertPartialWritesAndClose()
         }
   ```
   
   The above test will fail in scenario `2. spillBatchSize + 1 -> 
objectsWritten > 0`: the expected file length is 40128 but the actual length is 
40134, because the 6-byte `suffix` was written to the end of the file. In the 
other scenarios, the `suffix` was truncated.
   
   When the `objectsWritten > 0` branch also uses 
`writer.revertPartialWritesAndClose()`, all cases pass.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to