Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2024-01-25 Thread via GitHub


dongjoon-hyun commented on PR #44172:
URL: https://github.com/apache/spark/pull/44172#issuecomment-1910663949

   Thank you for sharing, @pan3793.





Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2024-01-24 Thread via GitHub


pan3793 commented on PR #44172:
URL: https://github.com/apache/spark/pull/44172#issuecomment-1909449492

   I backported this patch to our internal 3.1.2 branch and hit task hangs. I posted the details at https://github.com/luben/zstd-jni/issues/298
   
   I haven't found the root cause yet; I'm just recording the information here in case anyone hits a similar issue in the future.
   





Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-05 Thread via GitHub


beliefer commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1416538149


##
core/src/main/scala/org/apache/spark/io/CompressionCodec.scala:
##
@@ -233,18 +233,22 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     NoPool.INSTANCE
   }
 
+  private val workers = conf.get(IO_COMPRESSION_ZSTD_WORKERS)
+
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     // Wrap the zstd output stream in a buffered output stream, so that we can
     // avoid overhead excessive of JNI call while trying to compress small amount of data.
-    val os = new ZstdOutputStreamNoFinalizer(s, bufferPool).setLevel(level)
+    val os = new ZstdOutputStreamNoFinalizer(s, bufferPool).setLevel(level).setWorkers(workers)

Review Comment:
   Thank you for the explanation.
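
   A minimal, self-contained sketch of the zstd-jni calls the diff above relies on; the helper name, buffer size, and file path are illustrative and not taken from the PR:
   ```scala
import java.io.{BufferedOutputStream, FileOutputStream, OutputStream}

import com.github.luben.zstd.{NoPool, ZstdOutputStreamNoFinalizer}

object ZstdWorkersSketch {
  // Roughly what the patched compressedOutputStream does; `level` and `workers`
  // stand in for spark.io.compression.zstd.level / spark.io.compression.zstd.workers.
  def zstdStream(sink: OutputStream, level: Int, workers: Int): OutputStream = {
    val os = new ZstdOutputStreamNoFinalizer(sink, NoPool.INSTANCE)
      .setLevel(level)
      .setWorkers(workers) // 0 keeps single-threaded mode; > 0 spawns that many worker threads
    // Buffer writes so that small payloads do not each pay the JNI call overhead.
    new BufferedOutputStream(os, 32 * 1024)
  }

  def main(args: Array[String]): Unit = {
    val out = zstdStream(new FileOutputStream("/tmp/example.zst"), level = 3, workers = 4)
    out.write(Array.fill[Byte](1024 * 1024)(42.toByte))
    out.close()
  }
}
   ```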






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-05 Thread via GitHub


dongjoon-hyun commented on PR #44172:
URL: https://github.com/apache/spark/pull/44172#issuecomment-1841089414

   Thank you, @yaooqinn and all.





Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-05 Thread via GitHub


yaooqinn commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1415474154


##
core/src/main/scala/org/apache/spark/io/CompressionCodec.scala:
##
@@ -233,18 +233,22 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     NoPool.INSTANCE
   }
 
+  private val workers = conf.get(IO_COMPRESSION_ZSTD_WORKERS)
+
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     // Wrap the zstd output stream in a buffered output stream, so that we can
     // avoid overhead excessive of JNI call while trying to compress small amount of data.
-    val os = new ZstdOutputStreamNoFinalizer(s, bufferPool).setLevel(level)
+    val os = new ZstdOutputStreamNoFinalizer(s, bufferPool).setLevel(level).setWorkers(workers)

Review Comment:
   0






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-05 Thread via GitHub


beliefer commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1415430138


##
core/src/main/scala/org/apache/spark/io/CompressionCodec.scala:
##
@@ -233,18 +233,22 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     NoPool.INSTANCE
   }
 
+  private val workers = conf.get(IO_COMPRESSION_ZSTD_WORKERS)
+
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     // Wrap the zstd output stream in a buffered output stream, so that we can
     // avoid overhead excessive of JNI call while trying to compress small amount of data.
-    val os = new ZstdOutputStreamNoFinalizer(s, bufferPool).setLevel(level)
+    val os = new ZstdOutputStreamNoFinalizer(s, bufferPool).setLevel(level).setWorkers(workers)

Review Comment:
   Late LGTM.
   What is the default number of workers if it is not specified?






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-05 Thread via GitHub


yaooqinn commented on PR #44172:
URL: https://github.com/apache/spark/pull/44172#issuecomment-1840545492

   Thank you @dongjoon-hyun @pan3793 @LuciferYang.
   
   Merged to master.





Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-05 Thread via GitHub


yaooqinn closed pull request #44172: [SPARK-46256][CORE] Parallel Compression 
Support for ZSTD
URL: https://github.com/apache/spark/pull/44172





Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-05 Thread via GitHub


yaooqinn commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1415169458


##
core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala:
##
@@ -101,4 +102,28 @@ object ZStandardBenchmark extends BenchmarkBase {
       }
     }
   }
+
+  private def parallelCompressionBenchmark(): Unit = {
+    val N = 128
+    val data = (1 until 128 * 1024 * 1024).toArray
+
+    Seq(3, 9).foreach { level =>
+      val benchmark = new Benchmark(s"Parallel Compression at level $level", N, output = output)
+      Seq(0, 1, 2, 4, 8, 16).foreach { workers =>

Review Comment:
   The maximum number of threads might also be capped by the underlying zstd library, depending on the OS environment.






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-05 Thread via GitHub


yaooqinn commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1415166902


##
core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala:
##
@@ -101,4 +102,28 @@ object ZStandardBenchmark extends BenchmarkBase {
       }
     }
   }
+
+  private def parallelCompressionBenchmark(): Unit = {
+    val N = 128
+    val data = (1 until 128 * 1024 * 1024).toArray
+
+    Seq(3, 9).foreach { level =>
+      val benchmark = new Benchmark(s"Parallel Compression at level $level", N, output = output)
+      Seq(0, 1, 2, 4, 8, 16).foreach { workers =>

Review Comment:
   Based on my observations, we perform best when the value approximately 
equals the total available cores.
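
   A hedged sketch of that sizing heuristic; the config key comes from this PR, while tying it to availableProcessors is only the rule of thumb above, not a documented default:
   ```scala
import org.apache.spark.SparkConf

// Illustrative only: opt in to zstd and size the compression worker pool to the
// machine's core count.
val conf = new SparkConf()
  .set("spark.io.compression.codec", "zstd")
  .set("spark.io.compression.zstd.workers",
    Runtime.getRuntime.availableProcessors().toString)
   ```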






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-05 Thread via GitHub


yaooqinn commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1415159705


##
docs/configuration.md:
##
@@ -1760,6 +1760,17 @@ Apart from these, the following properties are also available, and may be useful
   
   2.3.0
 
+
+  spark.io.compression.zstd.workers
+  8

Review Comment:
   Thanks, I will push it along with the benchmarking results. It turns out I don't have sufficient GA (GitHub Actions) resources for frequent git-push.






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-05 Thread via GitHub


dongjoon-hyun commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1415151944


##
core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala:
##
@@ -101,4 +102,28 @@ object ZStandardBenchmark extends BenchmarkBase {
       }
     }
   }
+
+  private def parallelCompressionBenchmark(): Unit = {
+    val N = 128
+    val data = (1 until 128 * 1024 * 1024).toArray
+
+    Seq(3, 9).foreach { level =>
+      val benchmark = new Benchmark(s"Parallel Compression at level $level", N, output = output)
+      Seq(0, 1, 2, 4, 8, 16).foreach { workers =>

Review Comment:
   In any way, it's good to have this controllability.






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-05 Thread via GitHub


dongjoon-hyun commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1415150418


##
core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala:
##
@@ -101,4 +102,28 @@ object ZStandardBenchmark extends BenchmarkBase {
       }
     }
   }
+
+  private def parallelCompressionBenchmark(): Unit = {
+    val N = 128
+    val data = (1 until 128 * 1024 * 1024).toArray
+
+    Seq(3, 9).foreach { level =>
+      val benchmark = new Benchmark(s"Parallel Compression at level $level", N, output = output)
+      Seq(0, 1, 2, 4, 8, 16).foreach { workers =>

Review Comment:
   BTW, according to the result, the best performance is with 4 workers? It seems a little difficult to find the best fit.






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-05 Thread via GitHub


pan3793 commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1415126822


##
docs/configuration.md:
##
@@ -1760,6 +1760,17 @@ Apart from these, the following properties are also available, and may be useful
   
   2.3.0
 
+
+  spark.io.compression.zstd.workers
+  8

Review Comment:
   nit: inconsistent






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-05 Thread via GitHub


yaooqinn commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1415122415


##
core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala:
##
@@ -101,4 +102,28 @@ object ZStandardBenchmark extends BenchmarkBase {
       }
     }
   }
+
+  private def parallelCompressionBenchmark(): Unit = {
+    val N = 128
+    val data = (1 until 128 * 1024 * 1024).toArray
+
+    Seq(3, 9).foreach { level =>
+      val benchmark = new Benchmark(s"Parallel Compression at level $level", N, output = output)
+      Seq(0, 1, 2, 4, 8, 16).foreach { workers =>

Review Comment:
   ```
   OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
   AMD EPYC 7763 64-Core Processor
   Parallel Compression at level 9:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   ------------------------------------------------------------------------------------------------------------------------
   Parallel Compression with 0 workers                 176            179           2          0.0     1375722.8       1.0X
   Parallel Compression with 1 workers                 187            189           4          0.0     1457923.8       0.9X
   Parallel Compression with 2 workers                 110            119          13          0.0      858340.6       1.6X
   Parallel Compression with 4 workers                 106            109           2          0.0      826473.0       1.7X
   Parallel Compression with 8 workers                 110            111           1          0.0      856810.2       1.6X
   Parallel Compression with 16 workers                110            113           5          0.0      857306.3       1.6X
   ```
   
   I have gotten a partial result in GA (GitHub Actions); it looks fine.






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-05 Thread via GitHub


LuciferYang commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1415092650


##
core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala:
##
@@ -101,4 +102,28 @@ object ZStandardBenchmark extends BenchmarkBase {
       }
     }
   }
+
+  private def parallelCompressionBenchmark(): Unit = {
+    val N = 128
+    val data = (1 until 128 * 1024 * 1024).toArray
+
+    Seq(3, 9).foreach { level =>
+      val benchmark = new Benchmark(s"Parallel Compression at level $level", N, output = output)
+      Seq(0, 1, 2, 4, 8, 16).foreach { workers =>

Review Comment:
   IIRC, the GitHub Actions runner only has 2 CPU cores, and I'm not sure whether testing with more workers would yield meaningful data (will having more workers just leave the CPU busy with context switching?).









Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-05 Thread via GitHub


dongjoon-hyun commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1415059445


##
core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala:
##
@@ -101,4 +102,28 @@ object ZStandardBenchmark extends BenchmarkBase {
       }
     }
   }
+
+  private def parallelCompressionBenchmark(): Unit = {
+    val N = 128

Review Comment:
   Never mind. I saw the new variable, `val numberOfLargeObjectToWrite = 128`.






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1415054415


##
core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala:
##
@@ -101,4 +102,28 @@ object ZStandardBenchmark extends BenchmarkBase {
       }
     }
   }
+
+  private def parallelCompressionBenchmark(): Unit = {
+    val N = 128

Review Comment:
   Oh, no. I thought we could use the following instead of this.
   
https://github.com/apache/spark/blob/1d80d80a56c418f841e282ad753fad6671c3baae/core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala#L39






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-04 Thread via GitHub


yaooqinn commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1414909456


##
core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala:
##
@@ -101,4 +102,28 @@ object ZStandardBenchmark extends BenchmarkBase {
       }
     }
   }
+
+  private def parallelCompressionBenchmark(): Unit = {
+    val N = 128

Review Comment:
   Do you mean renaming it?






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-04 Thread via GitHub


yaooqinn commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1414907623


##
core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala:
##
@@ -49,6 +49,7 @@ object ZStandardBenchmark extends BenchmarkBase {
       val benchmark2 = new Benchmark(name, N, output = output)
       decompressionBenchmark(benchmark2, N)
       benchmark2.run()
+      parallelCompressionBenchmark()

Review Comment:
   Ya, I will update them after the CI jobs finish.






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-04 Thread via GitHub


yaooqinn commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1414906621


##
core/benchmarks/ZStandardBenchmark-results.txt:
##
@@ -2,26 +2,26 @@
 Benchmark ZStandardCompressionCodec
 ================================================================================================
 
-OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
-AMD EPYC 7763 64-Core Processor
+OpenJDK 64-Bit Server VM 17.0.9+0 on Mac OS X 14.1.2
+Apple M2 Max

Review Comment:
   Ya, it's pending on the resources.






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-04 Thread via GitHub


yaooqinn commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1414906299


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -1910,6 +1910,16 @@ package object config {
       .booleanConf
       .createWithDefault(true)
 
+  private[spark] val IO_COMPRESSION_ZSTD_WORKERS =
+    ConfigBuilder("spark.io.compression.zstd.workers")
+      .doc("Thread size spawned to compress in parallel when using Zstd. When value <= 0, " +
+        "no worker is spawned, it works in single-threaded mode. When value > 0, it triggers " +
+        "asynchronous mode, corresponding number of threads are spawned. More workers improve " +
+        "performance, but also increase memory cost.")
+      .version("4.0.0")
+      .intConf
+      .createWithDefault(8)

Review Comment:
   > IMO it does not fit the Spark executor's thread-based parallel computing 
model.
   
   I can't entirely agree that this conflates Spark executors' cores with threads, but I'm okay with 0 as the default.






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-04 Thread via GitHub


yaooqinn commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1414903391


##
core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala:
##
@@ -101,4 +102,28 @@ object ZStandardBenchmark extends BenchmarkBase {
       }
     }
   }
+
+  private def parallelCompressionBenchmark(): Unit = {
+    val N = 128
+    val data = (1 until 128 * 1024 * 1024).toArray
+
+    Seq(3, 9).foreach { level =>
+      val benchmark = new Benchmark(s"Parallel Compression at level $level", N, output = output)
+      Seq(0, 1, 2, 4, 8, 16).foreach { workers =>

Review Comment:
   About 2 to 3 seconds for the 0-worker case on my Mac.
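
   For anyone who wants to reproduce the effect outside the Spark benchmark harness, a rough standalone timing sketch; the array size, level, and worker counts are illustrative and smaller than the PR's benchmark:
   ```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

import com.github.luben.zstd.{NoPool, ZstdOutputStreamNoFinalizer}

object ParallelZstdTiming {
  def main(args: Array[String]): Unit = {
    val data = (1 until 16 * 1024 * 1024).toArray // ~64 MB of ints, smaller than the PR's array
    for (workers <- Seq(0, 1, 2, 4, 8)) {
      val start = System.nanoTime()
      val sink = new ByteArrayOutputStream()
      val zos = new ZstdOutputStreamNoFinalizer(sink, NoPool.INSTANCE)
        .setLevel(3)
        .setWorkers(workers)
      val oos = new ObjectOutputStream(zos)
      oos.writeObject(data)
      oos.close() // also flushes and closes the underlying zstd stream
      val elapsedMs = (System.nanoTime() - start) / 1e6
      println(f"workers=$workers%2d  compressed=${sink.size()}%9d bytes  in $elapsedMs%8.1f ms")
    }
  }
}
   ```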






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1414898758


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -1910,6 +1910,16 @@ package object config {
       .booleanConf
       .createWithDefault(true)
 
+  private[spark] val IO_COMPRESSION_ZSTD_WORKERS =
+    ConfigBuilder("spark.io.compression.zstd.workers")
+      .doc("Thread size spawned to compress in parallel when using Zstd. When value <= 0, " +
+        "no worker is spawned, it works in single-threaded mode. When value > 0, it triggers " +
+        "asynchronous mode, corresponding number of threads are spawned. More workers improve " +
+        "performance, but also increase memory cost.")
+      .version("4.0.0")
+      .intConf
+      .createWithDefault(8)

Review Comment:
   Ya, we should use `0` by default because this has side effects on both CPU cycles and native memory usage. Production jobs could otherwise see perf regressions.






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1414897758


##
core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala:
##
@@ -101,4 +102,28 @@ object ZStandardBenchmark extends BenchmarkBase {
       }
     }
   }
+
+  private def parallelCompressionBenchmark(): Unit = {
+    val N = 128
+    val data = (1 until 128 * 1024 * 1024).toArray

Review Comment:
   This is `Array[Int]`. I guess you wanted to make it 128MB in size, but this looks like it is bigger than 512MB.
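
   For reference, the arithmetic behind that estimate; the Array[Byte] alternative is only a hypothetical way to target ~128 MiB, not something the PR does:
   ```scala
// (1 until 128 * 1024 * 1024) yields 2^27 - 1 Int elements, 4 bytes each:
val elements = 128 * 1024 * 1024 - 1   // 134,217,727
val rawBytes = elements.toLong * 4L    // 536,870,908 bytes, ~512 MiB before serialization overhead

// If the goal were roughly 128 MiB of payload, a byte array of that length would do:
val data128MiB: Array[Byte] = Array.ofDim[Byte](128 * 1024 * 1024)
   ```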






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1414895612


##
core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala:
##
@@ -101,4 +102,28 @@ object ZStandardBenchmark extends BenchmarkBase {
       }
     }
   }
+
+  private def parallelCompressionBenchmark(): Unit = {
+    val N = 128

Review Comment:
   If you don't mind, please don't override this.






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1414895115


##
core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala:
##
@@ -101,4 +102,28 @@ object ZStandardBenchmark extends BenchmarkBase {
       }
     }
   }
+
+  private def parallelCompressionBenchmark(): Unit = {
+    val N = 128
+    val data = (1 until 128 * 1024 * 1024).toArray
+
+    Seq(3, 9).foreach { level =>
+      val benchmark = new Benchmark(s"Parallel Compression at level $level", N, output = output)
+      Seq(0, 1, 2, 4, 8, 16).foreach { workers =>

Review Comment:
   How long does it take?






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1414894496


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -1910,6 +1910,16 @@ package object config {
       .booleanConf
       .createWithDefault(true)
 
+  private[spark] val IO_COMPRESSION_ZSTD_WORKERS =
+    ConfigBuilder("spark.io.compression.zstd.workers")
+      .doc("Thread size spawned to compress in parallel when using Zstd. When value <= 0, " +

Review Comment:
   With `checkValue`, this can be `When value is 0`.






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1414894192


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -1910,6 +1910,16 @@ package object config {
       .booleanConf
       .createWithDefault(true)
 
+  private[spark] val IO_COMPRESSION_ZSTD_WORKERS =
+    ConfigBuilder("spark.io.compression.zstd.workers")
+      .doc("Thread size spawned to compress in parallel when using Zstd. When value <= 0, " +
+        "no worker is spawned, it works in single-threaded mode. When value > 0, it triggers " +
+        "asynchronous mode, corresponding number of threads are spawned. More workers improve " +
+        "performance, but also increase memory cost.")
+      .version("4.0.0")
+      .intConf

Review Comment:
   Please add `checkValue`.
   ```scala
.checkValue(_ >= 0, "description")
   ```
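
   Putting the `checkValue` and 0-default suggestions together, the entry would look roughly like this (a sketch, not necessarily the code as merged):
   ```scala
// In core/src/main/scala/org/apache/spark/internal/config/package.scala
private[spark] val IO_COMPRESSION_ZSTD_WORKERS =
  ConfigBuilder("spark.io.compression.zstd.workers")
    .doc("Thread size spawned to compress in parallel when using Zstd. When the value is 0, " +
      "no worker is spawned and compression runs in single-threaded mode. When the value is " +
      "greater than 0, asynchronous mode is triggered and that many threads are spawned.")
    .version("4.0.0")
    .intConf
    .checkValue(_ >= 0, "The number of zstd compression workers must not be negative.")
    .createWithDefault(0)
   ```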






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1414892735


##
core/benchmarks/ZStandardBenchmark-results.txt:
##
@@ -2,26 +2,26 @@
 Benchmark ZStandardCompressionCodec
 ================================================================================================
 
-OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
-AMD EPYC 7763 64-Core Processor
+OpenJDK 64-Bit Server VM 17.0.9+0 on Mac OS X 14.1.2
+Apple M2 Max

Review Comment:
   You can regenerate the result in your CI.
   - https://github.com/apache/spark/actions/workflows/benchmark.yml



##
core/benchmarks/ZStandardBenchmark-results.txt:
##
@@ -2,26 +2,26 @@
 Benchmark ZStandardCompressionCodec
 ================================================================================================
 
-OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
-AMD EPYC 7763 64-Core Processor
+OpenJDK 64-Bit Server VM 17.0.9+0 on Mac OS X 14.1.2
+Apple M2 Max
 Benchmark ZStandardCompressionCodec:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------------------
-Compression 10000 times at level 1 without buffer pool            657            659           2          0.0       65716.9       1.0X
-Compression 10000 times at level 2 without buffer pool            706            707           1          0.0       70617.1       0.9X
-Compression 10000 times at level 3 without buffer pool            788            788           0          0.0       78755.6       0.8X
-Compression 10000 times at level 1 with buffer pool               585            586           1          0.0       58455.1       1.1X
-Compression 10000 times at level 2 with buffer pool               614            616           1          0.0       61437.2       1.1X
-Compression 10000 times at level 3 with buffer pool               717            717           0          0.0       71705.1       0.9X
+Compression 10000 times at level 1 without buffer pool           1573           1576           4          0.0      157331.9       1.0X
+Compression 10000 times at level 2 without buffer pool           1907           1972          92          0.0      190663.7       0.8X
+Compression 10000 times at level 3 without buffer pool           2004           2038          49          0.0      200351.7       0.8X
+Compression 10000 times at level 1 with buffer pool              2223           2278          77          0.0      222348.1       0.7X
+Compression 10000 times at level 2 with buffer pool              2376           2384          11          0.0      237598.2       0.7X
+Compression 10000 times at level 3 with buffer pool              2357           2372          21          0.0      235745.3       0.7X
 
-OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
-AMD EPYC 7763 64-Core Processor
+OpenJDK 64-Bit Server VM 17.0.9+0 on Mac OS X 14.1.2
+Apple M2 Max

Review Comment:
   ditto.






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-04 Thread via GitHub


dongjoon-hyun commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1414893432


##
core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala:
##
@@ -49,6 +49,7 @@ object ZStandardBenchmark extends BenchmarkBase {
       val benchmark2 = new Benchmark(name, N, output = output)
       decompressionBenchmark(benchmark2, N)
       benchmark2.run()
+      parallelCompressionBenchmark()

Review Comment:
   The attached benchmark result doesn't include this, does it?






Re: [PR] [SPARK-46256][CORE] Parallel Compression Support for ZSTD [spark]

2023-12-04 Thread via GitHub


pan3793 commented on code in PR #44172:
URL: https://github.com/apache/spark/pull/44172#discussion_r1414881447


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -1910,6 +1910,16 @@ package object config {
       .booleanConf
       .createWithDefault(true)
 
+  private[spark] val IO_COMPRESSION_ZSTD_WORKERS =
+    ConfigBuilder("spark.io.compression.zstd.workers")
+      .doc("Thread size spawned to compress in parallel when using Zstd. When value <= 0, " +
+        "no worker is spawned, it works in single-threaded mode. When value > 0, it triggers " +
+        "asynchronous mode, corresponding number of threads are spawned. More workers improve " +
+        "performance, but also increase memory cost.")
+      .version("4.0.0")
+      .intConf
+      .createWithDefault(8)

Review Comment:
   This does help for IO-heavy workloads that use less CPU, but I think we should use single-threaded mode by default; otherwise, task shuffle writing would occupy additional CPU (threads), which IMO does not fit the Spark executor's thread-based parallel computing model.
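
   To make the concern concrete, a rough worked example with illustrative numbers:
   ```scala
// An executor with 8 cores usually runs 8 concurrent tasks (spark.task.cpus = 1).
// If each task's shuffle writer opens a zstd stream with 8 workers, the executor
// ends up with far more compression threads than it has cores:
val concurrentTasks    = 8
val zstdWorkersPerTask = 8  // the originally proposed default
val compressionThreads = concurrentTasks * zstdWorkersPerTask // = 64 threads on an 8-core executor
   ```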





