With DEBUG enabled, the log output was over 10 MB, so I opted for INFO-level output only.
The (sanitized) log is attached.

The driver is essentially this code:

    info("A")

    val t = System.currentTimeMillis
    val df = sqlContext.read.parquet(dir).select(...).cache

    val elapsed = System.currentTimeMillis - t
    info(s"Init time: ${elapsed} ms")

We've also observed that reading the contents of the parquet files is very
slow. My colleague wrote a PySpark application that gets the list of files,
parallelizes it, maps over it to read each file manually with a C parquet
library, and aggregates in the same loop. Ignoring the 1-2 minute
initialization cost, his approach is an order of magnitude faster than an
equivalent Spark SQL or DataFrame query in Scala. Since he is parallelizing
the work through Spark, and that part isn't causing any performance issues,
the problem appears to be in the parquet reader itself. I may try doing what
he did to construct a DataFrame manually, and see whether I can query it
with Spark SQL with reasonable performance.

- Philip


On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian <lian.cs....@gmail.com> wrote:

> Would you mind to provide the driver log?
>
>
> On 8/6/15 3:58 PM, Philip Weaver wrote:
>
> I built spark from the v1.5.0-snapshot-20150803 tag in the repo and tried
> again.
>
> The initialization time is about 1 minute now, which is still pretty
> terrible.
>
> On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver <philip.wea...@gmail.com>
> wrote:
>
>> Absolutely, thanks!
>>
>> On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian <lian.cs....@gmail.com> wrote:
>>
>>> We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396
>>>
>>> Could you give it a shot to see whether it helps in your case? We've
>>> observed a ~50x performance boost with schema merging turned on.
>>>
>>> Cheng
>>>
>>>
>>> On 8/6/15 8:26 AM, Philip Weaver wrote:
>>>
>>> I have a parquet directory that was produced by partitioning by two
>>> keys, e.g. like this:
>>>
>>> df.write.partitionBy("a", "b").parquet("asdf")
>>>
>>>
>>> There are 35 values of "a", and about 1100-1200 values of "b" for each
>>> value of "a", for a total of over 40,000 partitions.
>>>
>>> Before running any transformations or actions on the DataFrame, just
>>> initializing it like this takes *2 minutes*:
>>>
>>> val df = sqlContext.read.parquet("asdf")
>>>
>>>
>>> Is this normal? Is this because it is doing some bookkeeping to discover
>>> all the partitions? Is it perhaps having to merge the schema from each
>>> partition? Would you expect it to get better or worse if I subpartition by
>>> another key?
>>>
>>> - Philip
>>>
>>>
>>>
>>>
>>
>
>
10:51:42  INFO spark.SparkContext: Running Spark version 1.5.0-SNAPSHOT
10:51:42  WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
10:51:42  INFO spark.SecurityManager: Changing view acls to: pweaver
10:51:42  INFO spark.SecurityManager: Changing modify acls to: pweaver
10:51:42  INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(pweaver); users with modify permissions: Set(pweaver)
10:51:43  INFO slf4j.Slf4jLogger: Slf4jLogger started
10:51:43  INFO Remoting: Starting remoting
10:51:43  INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.26.21.70:51400]
10:51:43  INFO util.Utils: Successfully started service 'sparkDriver' on port 51400.
10:51:43  INFO spark.SparkEnv: Registering MapOutputTracker
10:51:43  INFO spark.SparkEnv: Registering BlockManagerMaster
10:51:43  INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-04438917-93ee-45f3-bc10-c5f5eb3d6a4a
10:51:43  INFO storage.MemoryStore: MemoryStore started with capacity 530.0 MB
10:51:43  INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-faec22af-bb2d-4fae-8a02-b8ca67867858/httpd-50939810-7da7-42d9-9342-48d9dc2705dc
10:51:43  INFO spark.HttpServer: Starting HTTP Server
10:51:43  INFO server.Server: jetty-8.y.z-SNAPSHOT
10:51:43  INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:55227
10:51:43  INFO util.Utils: Successfully started service 'HTTP file server' on port 55227.
10:51:43  INFO spark.SparkEnv: Registering OutputCommitCoordinator
10:51:43  INFO server.Server: jetty-8.y.z-SNAPSHOT
10:51:43  INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
10:51:43  INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
10:51:43  INFO ui.SparkUI: Started SparkUI at http://172.26.21.70:4040
10:51:43  INFO spark.SparkContext: Added JAR file:/home/pweaver/work/linear_spark-assembly-1.0-deps.jar at http://172.26.21.70:55227/jars/linear_spark-assembly-1.0-deps.jar with timestamp 1438883503937
10:51:43  INFO spark.SparkContext: Added JAR file:/home/pweaver/work/linear_spark_2.11-1.0.jar at http://172.26.21.70:55227/jars/linear_spark_2.11-1.0.jar with timestamp 1438883503940
10:51:44  WARN metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
10:51:44  INFO mesos.CoarseMesosSchedulerBackend: Registered as framework ID 20150805-154619-1209342636-5050-54362-0063
10:51:44  INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56805.
10:51:44  INFO netty.NettyBlockTransferService: Server created on 56805
10:51:44  INFO storage.BlockManagerMaster: Trying to register BlockManager
10:51:44  INFO storage.BlockManagerMasterEndpoint: Registering block manager 172.26.21.70:56805 with 530.0 MB RAM, BlockManagerId(driver, 172.26.21.70, 56805)
10:51:44  INFO storage.BlockManagerMaster: Registered BlockManager
10:51:44  INFO mesos.CoarseMesosSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
10:51:44  INFO App$: A
10:51:44  INFO parquet.ParquetRelation: Constructing HadoopFsRelation
10:51:44  INFO parquet.ParquetRelation: Listing file:/home/pweaver/work/parquet on driver
10:51:44  INFO sources.HadoopFsRelation: Listing leaf files and directories in parallel under: file:/home/pweaver/work/parquet/day=20150225, file:/home/pweaver/work/parquet/day=20150226, file:/home/pweaver/work/parquet/day=20150224, file:/home/pweaver/work/parquet/day=20150223, file:/home/pweaver/work/parquet/day=20150303, file:/home/pweaver/work/parquet/day=20150304, file:/home/pweaver/work/parquet/day=20150306, file:/home/pweaver/work/parquet/day=20150305, file:/home/pweaver/work/parquet/day=20150227, file:/home/pweaver/work/parquet/day=20150228, file:/home/pweaver/work/parquet/day=20150302, file:/home/pweaver/work/parquet/day=20150301, file:/home/pweaver/work/parquet/day=20150307, file:/home/pweaver/work/parquet/day=20150308, file:/home/pweaver/work/parquet/day=20150309, file:/home/pweaver/work/parquet/day=20150310, file:/home/pweaver/work/parquet/day=20150311, file:/home/pweaver/work/parquet/day=20150312, file:/home/pweaver/work/parquet/day=20150313, file:/home/pweaver/work/parquet/day=20150314, file:/home/pweaver/work/parquet/day=20150315, file:/home/pweaver/work/parquet/day=20150316, file:/home/pweaver/work/parquet/day=20150317, file:/home/pweaver/work/parquet/day=20150318, file:/home/pweaver/work/parquet/day=20150319, file:/home/pweaver/work/parquet/day=20150320, file:/home/pweaver/work/parquet/day=20150321, file:/home/pweaver/work/parquet/day=20150322, file:/home/pweaver/work/parquet/day=20150323, file:/home/pweaver/work/parquet/day=20150324, file:/home/pweaver/work/parquet/day=20150325, file:/home/pweaver/work/parquet/day=20150326, file:/home/pweaver/work/parquet/day=20150327, file:/home/pweaver/work/parquet/day=20150328, file:/home/pweaver/work/parquet/day=20150329
10:51:45  INFO spark.SparkContext: Starting job: parquet at App.scala:182
10:51:45  INFO scheduler.DAGScheduler: Got job 0 (parquet at App.scala:182) with 2 output partitions
10:51:45  INFO scheduler.DAGScheduler: Final stage: ResultStage 0(parquet at App.scala:182)
10:51:45  INFO scheduler.DAGScheduler: Parents of final stage: List()
10:51:45  INFO scheduler.DAGScheduler: Missing parents: List()
10:51:45  INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at parquet at App.scala:182), which has no missing parents
10:51:45  INFO storage.MemoryStore: ensureFreeSpace(63016) called with curMem=0, maxMem=555755765
10:51:45  INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 61.5 KB, free 529.9 MB)
10:51:45  INFO storage.MemoryStore: ensureFreeSpace(21109) called with curMem=63016, maxMem=555755765
10:51:45  INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.6 KB, free 529.9 MB)
10:51:45  INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.26.21.70:56805 (size: 20.6 KB, free: 530.0 MB)
10:51:45  INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:844
10:51:45  INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at parquet at App.scala:182)
10:51:45  INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
10:51:51  INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now TASK_RUNNING
10:51:51  INFO mesos.CoarseMesosSchedulerBackend: Mesos task 1 is now TASK_RUNNING
10:51:54  INFO mesos.CoarseMesosSchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@lindevspark5:42562/user/Executor#-1676233070]) with ID 20150727-154221-1175788204-5050-20841-S3/0
10:51:54  INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, lindevspark5, PROCESS_LOCAL, 3087 bytes)
10:51:54  INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, lindevspark5, PROCESS_LOCAL, 3147 bytes)
10:51:54  INFO storage.BlockManagerMasterEndpoint: Registering block manager lindevspark5:54383 with 10.4 GB RAM, BlockManagerId(20150727-154221-1175788204-5050-20841-S3/0, lindevspark5, 54383)
10:51:55  INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on lindevspark5:54383 (size: 20.6 KB, free: 10.4 GB)
10:51:55  INFO mesos.CoarseMesosSchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@lindevspark4:36457/user/Executor#1540365137]) with ID 20150803-141203-1192565420-5050-4560-S1/1
10:51:56  INFO storage.BlockManagerMasterEndpoint: Registering block manager lindevspark4:41180 with 10.4 GB RAM, BlockManagerId(20150803-141203-1192565420-5050-4560-S1/1, lindevspark4, 41180)
10:52:30  INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 36107 ms on lindevspark5 (1/2)
10:52:31  INFO scheduler.DAGScheduler: ResultStage 0 (parquet at App.scala:182) finished in 45.929 s
10:52:31  INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 36839 ms on lindevspark5 (2/2)
10:52:31  INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
10:52:31  INFO scheduler.DAGScheduler: Job 0 finished: parquet at App.scala:182, took 46.282639 s
10:52:31  INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on 172.26.21.70:56805 in memory (size: 20.6 KB, free: 530.0 MB)
10:52:31  INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on lindevspark5:54383 in memory (size: 20.6 KB, free: 10.4 GB)
10:52:32  INFO spark.SparkContext: Starting job: parquet at App.scala:182
10:52:32  INFO scheduler.DAGScheduler: Got job 1 (parquet at App.scala:182) with 8 output partitions
10:52:32  INFO scheduler.DAGScheduler: Final stage: ResultStage 1(parquet at App.scala:182)
10:52:32  INFO scheduler.DAGScheduler: Parents of final stage: List()
10:52:32  INFO scheduler.DAGScheduler: Missing parents: List()
10:52:32  INFO scheduler.DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at parquet at App.scala:182), which has no missing parents
10:52:32  INFO storage.MemoryStore: ensureFreeSpace(62928) called with curMem=0, maxMem=555755765
10:52:32  INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 61.5 KB, free 529.9 MB)
10:52:32  INFO storage.MemoryStore: ensureFreeSpace(21070) called with curMem=62928, maxMem=555755765
10:52:32  INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 20.6 KB, free 529.9 MB)
10:52:32  INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.26.21.70:56805 (size: 20.6 KB, free: 530.0 MB)
10:52:32  INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:844
10:52:32  INFO scheduler.DAGScheduler: Submitting 8 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at parquet at App.scala:182)
10:52:32  INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 8 tasks
10:52:32  INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, lindevspark4, PROCESS_LOCAL, 2058 bytes)
10:52:32  INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, lindevspark5, PROCESS_LOCAL, 2058 bytes)
10:52:32  INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 4, lindevspark4, PROCESS_LOCAL, 2058 bytes)
10:52:32  INFO scheduler.TaskSetManager: Starting task 3.0 in stage 1.0 (TID 5, lindevspark5, PROCESS_LOCAL, 2058 bytes)
10:52:32  INFO scheduler.TaskSetManager: Starting task 4.0 in stage 1.0 (TID 6, lindevspark4, PROCESS_LOCAL, 2058 bytes)
10:52:32  INFO scheduler.TaskSetManager: Starting task 5.0 in stage 1.0 (TID 7, lindevspark5, PROCESS_LOCAL, 2058 bytes)
10:52:32  INFO scheduler.TaskSetManager: Starting task 6.0 in stage 1.0 (TID 8, lindevspark4, PROCESS_LOCAL, 2058 bytes)
10:52:32  INFO scheduler.TaskSetManager: Starting task 7.0 in stage 1.0 (TID 9, lindevspark5, PROCESS_LOCAL, 2113 bytes)
10:52:32  INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on lindevspark5:54383 (size: 20.6 KB, free: 10.4 GB)
10:52:32  INFO scheduler.TaskSetManager: Finished task 3.0 in stage 1.0 (TID 5) in 320 ms on lindevspark5 (1/8)
10:52:32  INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 336 ms on lindevspark5 (2/8)
10:52:32  INFO scheduler.TaskSetManager: Finished task 5.0 in stage 1.0 (TID 7) in 323 ms on lindevspark5 (3/8)
10:52:33  INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on lindevspark4:41180 (size: 20.6 KB, free: 10.4 GB)
10:52:34  INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 1546 ms on lindevspark4 (4/8)
10:52:34  INFO scheduler.TaskSetManager: Finished task 2.0 in stage 1.0 (TID 4) in 1539 ms on lindevspark4 (5/8)
10:52:34  INFO scheduler.TaskSetManager: Finished task 6.0 in stage 1.0 (TID 8) in 1527 ms on lindevspark4 (6/8)
10:52:34  INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0 (TID 6) in 1533 ms on lindevspark4 (7/8)
10:52:35  INFO scheduler.TaskSetManager: Finished task 7.0 in stage 1.0 (TID 9) in 2886 ms on lindevspark5 (8/8)
10:52:35  INFO scheduler.DAGScheduler: ResultStage 1 (parquet at App.scala:182) finished in 2.913 s
10:52:35  INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
10:52:35  INFO scheduler.DAGScheduler: Job 1 finished: parquet at App.scala:182, took 2.962674 s
10:52:36  INFO datasources.DataSourceStrategy: Selected 41609 partitions out of 41609, pruned 0.0% partitions.
10:52:36  INFO storage.MemoryStore: ensureFreeSpace(63280) called with curMem=83998, maxMem=555755765
10:52:36  INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 61.8 KB, free 529.9 MB)
10:52:36  INFO storage.MemoryStore: ensureFreeSpace(19783) called with curMem=147278, maxMem=555755765
10:52:36  INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 19.3 KB, free 529.9 MB)
10:52:36  INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.26.21.70:56805 (size: 19.3 KB, free: 530.0 MB)
10:52:36  INFO spark.SparkContext: Created broadcast 2 from cache at App.scala:184
10:52:52  INFO simmons_mri.App$: Init time: 67429 ms
10:52:52  INFO spark.SparkContext: Invoking stop() from shutdown hook
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
10:52:52  INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
10:52:52  INFO ui.SparkUI: Stopped Spark web UI at http://172.26.21.70:4040
10:52:52  INFO scheduler.DAGScheduler: Stopping DAGScheduler
10:52:52  INFO mesos.CoarseMesosSchedulerBackend: Shutting down all executors
10:52:52  INFO mesos.CoarseMesosSchedulerBackend: Asking each executor to shut down
10:52:52  INFO mesos.CoarseMesosSchedulerBackend: driver.run() returned with code DRIVER_STOPPED
10:52:52  INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
10:52:52  INFO storage.MemoryStore: MemoryStore cleared
10:52:52  INFO storage.BlockManager: BlockManager stopped
10:52:52  INFO storage.BlockManagerMaster: BlockManagerMaster stopped
10:52:52  INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
10:52:52  INFO spark.SparkContext: Successfully stopped SparkContext
10:52:52  INFO util.Utils: Shutdown hook called
10:52:52  INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
10:52:52  INFO util.Utils: Deleting directory /tmp/spark-faec22af-bb2d-4fae-8a02-b8ca67867858
10:52:52  INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.