Github user bbossy commented on the issue:
https://github.com/apache/spark/pull/18269
@gatorsmile :
I ran a synthetic scenario to show what changes, since deploying this branch to a real cluster would be more involved.
I created two very simple relations on a small HDFS cluster (4 datanodes) and ran Spark with master `local[16]`. The machine has 16 physical cores plus 16 hyper-threaded ones (32 logical cores). The namenode is on a remote machine on the same network.
### Setup:
```
scala> case class Foo(a: Int, b: Int, c: Int, d: Int)
defined class Foo
// manypartitions: 4 * 4 * 100 partitions. Parallel listing kicks in when
// listing level 'c', both without and with this PR.
scala> val data = for {
| a <- 1.to(4)
| b <- 1.to(4)
| c <- 1.to(100)
| } yield Foo(a,b,c,100)
data: scala.collection.immutable.IndexedSeq[Foo] = Vector(Foo(1,1,1,100),
...
scala> data.toDS.write.partitionBy("a", "b", "c").parquet("hdfs://namenode/user/bbossy/manypartitions")
// morepartitions: 10 * 10 * 100 partitions. Before this PR, 100 parallel
// listing jobs are spawned to list level 'c', since each 'b' directory
// contains more child directories than the threshold.
// With this PR, a single parallel listing job is spawned to list all 'b'
// partitions, since at level 'b' there are more paths to list than the
// threshold.
scala> val data = for {
| a <- 1.to(10)
| b <- 1.to(10)
| c <- 1.to(100)
| } yield Foo(a,b,c,1000)
data: scala.collection.immutable.IndexedSeq[Foo] = Vector(Foo(1,1,1,1000),
...
scala> data.toDS.write.partitionBy("a", "b", "c").parquet("hdfs://namenode/user/bbossy/morepartitions")
```
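To make the change concrete, here is a minimal, self-contained Scala sketch of the two strategies (a simplification I wrote for illustration, not the actual Spark code; `threshold` stands in for `spark.sql.sources.parallelPartitionDiscovery.threshold`, which defaults to 32):

```scala
// Hypothetical simplification of the listing strategies described above.
object ListingJobs {
  val threshold = 32

  // Before this PR: every directory whose child count exceeds the
  // threshold spawns its own parallel listing job.
  def jobsPerDirectory(childCounts: Seq[Int]): Int =
    childCounts.count(_ > threshold)

  // With this PR: the paths at one partition level are pooled, so at
  // most one parallel listing job is spawned for the whole level.
  def jobsPerLevel(childCounts: Seq[Int]): Int =
    if (childCounts.sum > threshold) 1 else 0

  def main(args: Array[String]): Unit = {
    // morepartitions layout: 10 * 10 = 100 'b' directories,
    // each containing 100 'c' children.
    val bDirs = Seq.fill(10 * 10)(100)
    println(jobsPerDirectory(bDirs)) // 100 jobs before the PR
    println(jobsPerLevel(bDirs))     // 1 job with the PR
  }
}
```

For the `morepartitions` layout, the per-directory strategy spawns 100 jobs while the per-level strategy spawns one, which matches the behaviour described in the comments above.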
### Using master branch before my commits:
```
scala> :pa
// Entering paste mode (ctrl-D to finish)
def time[R](block: => R): R = {
  val t0 = System.currentTimeMillis()
  val result = block
  println("Elapsed time: " + (System.currentTimeMillis - t0) + "ms")
  result
}
// Exiting paste mode, now interpreting.
time: [R](block: => R)R
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
Elapsed time: 6506ms
res2: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
Elapsed time: 2905ms
res3: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
Elapsed time: 2744ms
res4: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
Elapsed time: 2683ms
res5: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
Elapsed time: 16068ms
res6: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
Elapsed time: 16047ms
res7: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
Elapsed time: 15691ms
res8: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
Elapsed time: 15767ms
res9: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
scala>
```
UI timeline:
<img width="1875" alt="screen shot 2017-06-12 at 13 51 48"
src="https://user-images.githubusercontent.com/4428853/27032686-687eeeb2-4f76-11e7-9849-745e19f234f8.png">
### Using this PR:
```
// omitting def time...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
Elapsed time: 6790ms
res0: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
Elapsed time: 4481ms
res1: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
Elapsed time: 4465ms
res2: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
Elapsed time: 4103ms
res3: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
Elapsed time: 4717ms
res4: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
Elapsed time: 4434ms
res5: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
Elapsed time: 5219ms
res6: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
Elapsed time: 4429ms
res7: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
scala>
```
UI timeline:
<img width="1882" alt="screen shot 2017-06-12 at 13 57 15"
src="https://user-images.githubusercontent.com/4428853/27033132-48855946-4f78-11e7-9b71-38ac7028ddb8.png">
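To summarise the warm runs (my own arithmetic on the `morepartitions` timings reported above, excluding each first, cold run):

```scala
object WarmRunSummary {
  // Warm-run timings (ms) for 'morepartitions', copied from the output above.
  val master = Seq(16047, 15691, 15767)
  val withPr = Seq(4434, 5219, 4429)

  def avg(ms: Seq[Int]): Double = ms.sum.toDouble / ms.size

  def main(args: Array[String]): Unit = {
    println(f"master avg: ${avg(master)}%.0f ms")               // 15835 ms
    println(f"PR avg:     ${avg(withPr)}%.0f ms")               // 4694 ms
    println(f"speedup:    ${avg(master) / avg(withPr)}%.1fx")   // 3.4x
  }
}
```

So on this setup the PR cuts warm `morepartitions` listing from roughly 15.8s to 4.7s (about 3.4x), while `manypartitions` regresses somewhat (roughly 2.8s to 4.3s warm).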
Is there something more specific that I should look into?