GitHub user bbossy commented on the issue:

    https://github.com/apache/spark/pull/18269
  
    @gatorsmile:
    
    I ran a synthetic scenario to show what changes, since deploying this branch would be more involved.
    
    I created two very simple relations on a small HDFS cluster (4 datanodes), running Spark with master `local[16]`. The machine has 16 physical cores (32 with hyper-threading). The namenode is on a remote machine on the same network.
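
    For reference, a session like this can presumably be reproduced along the following lines (the exact invocation is my assumption; `spark.sql.sources.parallelPartitionDiscovery.threshold` is left at its default of 32):
    ```
    import org.apache.spark.sql.SparkSession

    // Local SparkSession with 16 threads, matching `local[16]` above.
    val spark = SparkSession.builder()
      .master("local[16]")
      .appName("partition-listing-benchmark")
      .getOrCreate()
    ```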
    
    ### Setup:
    ```
    scala> case class Foo(a: Int, b: Int, c: Int, d: Int)
    defined class Foo
    
    // manypartitions: 4 x 4 x 100 partitions. Parallel listing kicks in when listing level 'c', both without and with this PR.
    scala> val data = for {
         | a <- 1.to(4)
         | b <- 1.to(4)
         | c <- 1.to(100)
         | } yield Foo(a,b,c,100)
    data: scala.collection.immutable.IndexedSeq[Foo] = Vector(Foo(1,1,1,100), ...
    
    scala> data.toDS.write.partitionBy("a", "b", "c").parquet("hdfs://namenode/user/bbossy/manypartitions")
    
    // morepartitions: 10 x 10 x 100 partitions. Before this PR, 100 parallel listing jobs are spawned to list level 'c', since each 'b' directory contains more child directories than the threshold.
    // With this PR, a single parallel listing job is spawned to list all 'b' partitions, since at level 'b' there are more paths to list than the threshold (see the sketch after this block).
    scala> val data = for {
         | a <- 1.to(10)
         | b <- 1.to(10)
         | c <- 1.to(100)
         | } yield Foo(a,b,c,1000)
    data: scala.collection.immutable.IndexedSeq[Foo] = Vector(Foo(1,1,1,1000), ...
    scala> data.toDS.write.partitionBy("a", "b", "c").parquet("hdfs://namenode/user/bbossy/morepartitions")
    ```
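
    To make the two threshold rules from the comments above concrete, here is a minimal, self-contained sketch in plain Scala. It is not Spark's actual listing code, just a toy model of the behaviour described above: `threshold` stands in for `spark.sql.sources.parallelPartitionDiscovery.threshold` (default 32), and each `fanout` list encodes how many child directories a partition level has.
    ```
    object ListingJobSketch {
      val threshold = 32

      // Old behaviour (master): recurse directory by directory; any directory whose
      // child count exceeds the threshold hands its subtree off to its own listing job.
      def jobsPerDirectory(fanout: List[Int]): Int = fanout match {
        case Nil => 0
        case children :: rest =>
          if (children > threshold) 1             // this parent spawns one job for its subtree
          else children * jobsPerDirectory(rest)  // otherwise descend into each child sequentially
      }

      // Behaviour with this PR: accumulate all paths of one level; a single job is
      // spawned as soon as a level holds more paths than the threshold.
      def jobsPerLevel(fanout: List[Int], pathsAtLevel: Int = 1): Int =
        if (pathsAtLevel > threshold) 1           // one job lists every pending path (and its subtree)
        else fanout match {
          case Nil              => 0
          case children :: rest => jobsPerLevel(rest, pathsAtLevel * children)
        }

      def main(args: Array[String]): Unit = {
        println(jobsPerDirectory(List(10, 10, 100))) // morepartitions, master:  100 jobs (one per 'b' directory)
        println(jobsPerLevel(List(10, 10, 100)))     // morepartitions, this PR: 1 job, at level 'b'
        println(jobsPerDirectory(List(4, 4, 100)))   // manypartitions, master:  one job per 'b' directory
        println(jobsPerLevel(List(4, 4, 100)))       // manypartitions, this PR: 1 job, at level 'c'
      }
    }
    ```
    The morepartitions counts (100 jobs vs. 1) are the ones stated in the comments above; the manypartitions counts are just what this toy model predicts.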
    
    ### Using the master branch (before my commits):
    ```
    scala> :pa
    // Entering paste mode (ctrl-D to finish)
    
    def time[R](block: => R): R = {
      val t0 = System.currentTimeMillis()
      val result = block
      println("Elapsed time: " + (System.currentTimeMillis - t0) + "ms")
      result
    }
    
    // Exiting paste mode, now interpreting.
    
    time: [R](block: => R)R
    
    scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
    Elapsed time: 6506ms
    res2: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
    scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
    Elapsed time: 2905ms
    res3: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
    scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
    Elapsed time: 2744ms
    res4: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
    scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
    Elapsed time: 2683ms
    res5: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
    
    
    scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
    Elapsed time: 16068ms
    res6: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
    scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
    Elapsed time: 16047ms
    res7: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
    scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
    Elapsed time: 15691ms
    res8: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
    scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
    Elapsed time: 15767ms
    res9: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
    scala>
    ```
    UI timeline:
    <img width="1875" alt="screen shot 2017-06-12 at 13 51 48" src="https://user-images.githubusercontent.com/4428853/27032686-687eeeb2-4f76-11e7-9849-745e19f234f8.png">
    
    
    ### Using this PR:
    ```
    // omitting def time...
    
    scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
    Elapsed time: 6790ms
    res0: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
    scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
    Elapsed time: 4481ms
    res1: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
    scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
    Elapsed time: 4465ms
    res2: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
    scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
    Elapsed time: 4103ms
    res3: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
    
    
    scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
    Elapsed time: 4717ms
    res4: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
    scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
    Elapsed time: 4434ms
    res5: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
    scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
    Elapsed time: 5219ms
    res6: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
    scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
    Elapsed time: 4429ms
    res7: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
    scala>
    ```
    UI timeline:
    <img width="1882" alt="screen shot 2017-06-12 at 13 57 15" src="https://user-images.githubusercontent.com/4428853/27033132-48855946-4f78-11e7-9b71-38ac7028ddb8.png">
    
    Is there something more specific that I should look into?


