kevins-29 opened a new pull request #35613:
URL: https://github.com/apache/spark/pull/35613


   ### What changes were proposed in this pull request?
Wrap the `DataInputStream` in `SparkPlan.decodeUnsafeRows` in a `NextIterator` 
instead of a plain `Iterator`, so that the `DataInputStream` is closed properly 
once the rows have been consumed.
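   A minimal sketch of the idea (a simplified stand-in, not Spark's actual `org.apache.spark.util.NextIterator`): the wrapper detects end-of-stream while iterating and closes the underlying `DataInputStream` as soon as the data is exhausted.

   ```scala
   import java.io.{ByteArrayInputStream, DataInputStream, EOFException}

   // Simplified NextIterator-style wrapper: subclasses produce elements via
   // getNext() and set `finished` at end-of-input; closeResource() runs once.
   abstract class ClosingIterator[T] extends Iterator[T] {
     protected var finished = false
     private var gotNext = false
     private var closed = false
     private var nextValue: T = _

     protected def getNext(): T
     protected def closeResource(): Unit

     override def hasNext: Boolean = {
       if (!finished && !gotNext) {
         nextValue = getNext()
         // Release the stream as soon as we hit end-of-input.
         if (finished && !closed) { closeResource(); closed = true }
         gotNext = true
       }
       !finished
     }

     override def next(): T = {
       if (!hasNext) throw new NoSuchElementException("end of stream")
       gotNext = false
       nextValue
     }
   }

   // Usage: iterate two ints out of a DataInputStream and observe the close.
   var streamClosed = false
   val bytes = java.nio.ByteBuffer.allocate(8).putInt(1).putInt(2).array()
   val ins = new DataInputStream(new ByteArrayInputStream(bytes)) {
     override def close(): Unit = { streamClosed = true; super.close() }
   }
   val it = new ClosingIterator[Int] {
     override protected def getNext(): Int =
       try ins.readInt() catch { case _: EOFException => finished = true; 0 }
     override protected def closeResource(): Unit = ins.close()
   }
   val values = it.toList  // draining the iterator triggers the close
   ```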
   
   ### Why are the changes needed?
   SPARK-34647 replaced the ZstdInputStream with ZstdInputStreamNoFinalizer. 
This meant that all usages of `CompressionCodec.compressedInputStream` would 
need to manually close the stream as this would no longer be handled by the 
finaliser mechanism.
   
   In SparkPlan, the result of `CompressionCodec.compressedInputStream` is 
wrapped in a plain Iterator that never calls `close`, so the underlying stream 
(and any native memory it holds) is never released.
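   To illustrate the leak pattern (a toy stand-in using a plain `ByteArrayInputStream` rather than the real codec stream): fully draining a plain `Iterator` wrapped around a stream never invokes `close`, so with a no-finalizer stream the native resources would only be reclaimed at process exit.

   ```scala
   import java.io.ByteArrayInputStream

   var closeCount = 0
   // Toy stream that counts close() calls; stands in for the codec stream.
   val stream = new ByteArrayInputStream(Array[Byte](1, 2, 3)) {
     override def close(): Unit = { closeCount += 1; super.close() }
   }

   // A plain Iterator over the stream, mirroring the shape of the current
   // decodeUnsafeRows code path: nothing here ever calls stream.close().
   val leakyIt = new Iterator[Int] {
     private var n = stream.read()
     override def hasNext: Boolean = n != -1
     override def next(): Int = { val v = n; n = stream.read(); v }
   }
   val drained = leakyIt.toList  // fully consumed, yet the stream stays open
   ```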
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   
   #### Spark Shell Configuration
   ```bash
   $> export SPARK_SUBMIT_OPTS="-XX:+AlwaysPreTouch -Xms1g"
   $> $SPARK_HOME/bin/spark-shell --conf spark.io.compression.codec=zstd
   ```
   
   #### Test Script
   ```scala
   import java.sql.Timestamp
   import java.time.Instant
   import spark.implicits._
   
   case class Record(timestamp: Timestamp, batch: Long, value: Long)
   
   (1 to 300).foreach { batch =>
     sc.parallelize(1 to 100000)
       .map(Record(Timestamp.from(Instant.now()), batch, _))
       .toDS.write.parquet(s"test_data/batch_$batch")
   }
   
   (1 to 300).foreach { batch =>
     spark.read.parquet(s"test_data/batch_$batch").as[Record].repartition().collect()
   }
   ```
   
   #### Memory Monitor
   ```shell
   $> while true; do echo \"$(date +%Y-%m-%d' '%H:%M:%S)\",$(pmap -x <PID> | grep "total kB" | awk '{print $4}'); sleep 10; done
   ``` 
   
   #### Results
   
   ##### Before - 1st Run
   ```
   "2022-02-22 14:12:27",1449308
   "2022-02-22 14:12:37",1449396
   "2022-02-22 14:12:47",1531332
   "2022-02-22 14:12:57",1742336
   "2022-02-22 14:13:08",1796308
   "2022-02-22 14:13:18",1811164
   "2022-02-22 14:13:28",1855408
   "2022-02-22 14:13:38",1859080
   "2022-02-22 14:13:48",1901852
   "2022-02-22 14:13:58",1915132
   "2022-02-22 14:14:08",1983812
   "2022-02-22 14:14:18",1982132
   "2022-02-22 14:14:28",1996888
   "2022-02-22 14:14:38",1997440
   "2022-02-22 14:14:48",1999376
   "2022-02-22 14:14:59",2000100
   "2022-02-22 14:15:09",2213684
   "2022-02-22 14:15:19",2358572
   "2022-02-22 14:15:29",2559824
   "2022-02-22 14:15:39",2728744
   "2022-02-22 14:15:49",2900608
   "2022-02-22 14:15:59",3095464
   "2022-02-22 14:16:09",3277204
   "2022-02-22 14:16:19",3470144
   "2022-02-22 14:16:29",3628048
   "2022-02-22 14:16:40",3628048
   "2022-02-22 14:16:50",3628048
   ```
   
   ##### Before - 2nd Run
   ```
   "2022-02-22 14:27:33",1498036
   "2022-02-22 14:27:43",1581348
   "2022-02-22 14:27:53",1781136
   "2022-02-22 14:28:03",1792796
   "2022-02-22 14:28:13",1853848
   "2022-02-22 14:28:23",1872000
   "2022-02-22 14:28:33",1874008
   "2022-02-22 14:28:44",1892828
   "2022-02-22 14:28:54",1927752
   "2022-02-22 14:29:04",1950756
   "2022-02-22 14:29:14",1981436
   "2022-02-22 14:29:24",1969212
   "2022-02-22 14:29:34",1971572
   "2022-02-22 14:29:44",1972124
   "2022-02-22 14:29:54",2113104
   "2022-02-22 14:30:04",2283932
   "2022-02-22 14:30:14",2475672
   "2022-02-22 14:30:24",2670424
   "2022-02-22 14:30:35",2883556
   "2022-02-22 14:30:45",3094044
   "2022-02-22 14:30:55",3333840
   "2022-02-22 14:31:05",3570224
   "2022-02-22 14:31:15",3618060
   "2022-02-22 14:31:25",3618060
   "2022-02-22 14:31:35",3618060
   "2022-02-22 14:31:46",3617892
   "2022-02-22 14:31:56",3617220
   "2022-02-22 14:32:06",3593240
   "2022-02-22 14:32:16",3583920
   "2022-02-22 14:32:26",3583920
   ```
   
   ##### After - 1st Run
   ```
   "2022-02-22 14:22:17",1483220
   "2022-02-22 14:22:27",1606236
   "2022-02-22 14:22:37",1757304
   "2022-02-22 14:22:47",1809436
   "2022-02-22 14:22:57",1863628
   "2022-02-22 14:23:07",1860172
   "2022-02-22 14:23:17",1858216
   "2022-02-22 14:23:27",1898056
   "2022-02-22 14:23:37",1936076
   "2022-02-22 14:23:48",1937776
   "2022-02-22 14:23:58",1926536
   "2022-02-22 14:24:08",1936188
   "2022-02-22 14:24:18",2048996
   "2022-02-22 14:24:28",2093032
   "2022-02-22 14:24:38",2118164
   "2022-02-22 14:24:48",2145980
   "2022-02-22 14:24:58",2133464
   "2022-02-22 14:25:08",2134964
   "2022-02-22 14:25:18",2132620
   "2022-02-22 14:25:28",2132620
   ```
   
   ##### After - 2nd Run
   ```
   "2022-02-22 14:33:44",1463284
   "2022-02-22 14:33:54",1558932
   "2022-02-22 14:34:04",1749700
   "2022-02-22 14:34:14",1785908
   "2022-02-22 14:34:24",1829740
   "2022-02-22 14:34:34",1891472
   "2022-02-22 14:34:44",1882452
   "2022-02-22 14:34:54",1880788
   "2022-02-22 14:35:04",1893988
   "2022-02-22 14:35:14",1968108
   "2022-02-22 14:35:24",1974868
   "2022-02-22 14:35:34",1977516
   "2022-02-22 14:35:45",1964580
   "2022-02-22 14:35:55",1966580
   "2022-02-22 14:36:05",2057708
   "2022-02-22 14:36:15",2116624
   "2022-02-22 14:36:25",2126248
   "2022-02-22 14:36:35",2123440
   "2022-02-22 14:36:45",2123724
   "2022-02-22 14:36:55",2123540
   "2022-02-22 14:37:05",2122028
   "2022-02-22 14:37:15",2122028
   "2022-02-22 14:37:25",2122028
   "2022-02-22 14:37:35",2122028
   "2022-02-22 14:37:46",2118072
   "2022-02-22 14:37:56",2096524
   "2022-02-22 14:38:06",2096524
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


