With DEBUG, the log output was over 10MB, so I opted for just INFO output. The (sanitized) log is attached.
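(For anyone reproducing this: with the stock Spark 1.x logging setup, the console log level can be capped at INFO by copying conf/log4j.properties.template to conf/log4j.properties and setting the root category, roughly like this — a config sketch, assuming the default "console" appender is defined as in the template:)

    # conf/log4j.properties -- cap Spark's console logging at INFO
    log4j.rootCategory=INFO, console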
The driver is essentially this code:

    info("A")
    val t = System.currentTimeMillis
    val df = sqlContext.read.parquet(dir).select(...).cache
    val elapsed = System.currentTimeMillis - t
    info(s"Init time: ${elapsed} ms")

We've also observed that it is very slow to read the contents of the parquet files. My colleague wrote a PySpark application that gets the list of files, parallelizes it, maps across it to read each file manually using a C parquet library, and aggregates inside the loop. Ignoring the 1-2 minute initialization cost, his application is an order of magnitude faster than the equivalent Spark SQL or DataFrame query in Scala. Since he parallelizes the work through Spark, and that isn't causing any performance issues, the bottleneck appears to be the parquet reader itself. I may try what he did to construct a DataFrame manually, and see if I can query it with Spark SQL with reasonable performance.

- Philip

On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian <lian.cs....@gmail.com> wrote:

> Would you mind providing the driver log?
>
> On 8/6/15 3:58 PM, Philip Weaver wrote:
>
> I built Spark from the v1.5.0-snapshot-20150803 tag in the repo and tried
> again.
>
> The initialization time is about 1 minute now, which is still pretty
> terrible.
>
> On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver <philip.wea...@gmail.com>
> wrote:
>
>> Absolutely, thanks!
>>
>> On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian <lian.cs....@gmail.com>
>> wrote:
>>
>>> We've fixed this issue in 1.5: https://github.com/apache/spark/pull/7396
>>>
>>> Could you give it a shot to see whether it helps in your case? We've
>>> observed a ~50x performance boost with schema merging turned on.
>>>
>>> Cheng
>>>
>>> On 8/6/15 8:26 AM, Philip Weaver wrote:
>>>
>>> I have a parquet directory that was produced by partitioning by two
>>> keys, e.g.
>>> like this:
>>>
>>>     df.write.partitionBy("a", "b").parquet("asdf")
>>>
>>> There are 35 values of "a", and about 1100-1200 values of "b" for each
>>> value of "a", for a total of over 40,000 partitions.
>>>
>>> Before running any transformations or actions on the DataFrame, just
>>> initializing it like this takes *2 minutes*:
>>>
>>>     val df = sqlContext.read.parquet("asdf")
>>>
>>> Is this normal? Is this because it is doing some bookkeeping to
>>> discover all the partitions? Is it perhaps having to merge the schema
>>> from each partition? Would you expect it to get better or worse if I
>>> subpartition by another key?
>>>
>>> - Philip
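The manual approach described above (get the list of part files, parallelize it, read each file in the map, aggregate inside the loop) can be sketched in plain Python. This is only a structural sketch: Spark's parallelize and the C parquet library are replaced by a thread pool and a stubbed reader, so `read_rows` here is hypothetical, not the real reader.

```python
from concurrent.futures import ThreadPoolExecutor

def read_rows(path):
    # Hypothetical stand-in for reading one parquet part file with a
    # native reader; here every file simply yields three rows.
    return [("row", path)] * 3

def count_rows(paths, workers=8):
    # Fan the per-file reads out in parallel and aggregate inside the
    # map, mirroring the shape of the PySpark job described above.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return sum(pool.map(lambda p: len(read_rows(p)), paths))
```

With a real reader plugged in, each worker both reads and aggregates its file, so only small per-file results travel back to the driver — which is why the scheme sidesteps the DataFrame reader entirely.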
10:51:42 INFO spark.SparkContext: Running Spark version 1.5.0-SNAPSHOT
10:51:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
10:51:42 INFO spark.SecurityManager: Changing view acls to: pweaver
10:51:42 INFO spark.SecurityManager: Changing modify acls to: pweaver
10:51:42 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(pweaver); users with modify permissions: Set(pweaver)
10:51:43 INFO slf4j.Slf4jLogger: Slf4jLogger started
10:51:43 INFO Remoting: Starting remoting
10:51:43 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.26.21.70:51400]
10:51:43 INFO util.Utils: Successfully started service 'sparkDriver' on port 51400.
10:51:43 INFO spark.SparkEnv: Registering MapOutputTracker
10:51:43 INFO spark.SparkEnv: Registering BlockManagerMaster
10:51:43 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-04438917-93ee-45f3-bc10-c5f5eb3d6a4a
10:51:43 INFO storage.MemoryStore: MemoryStore started with capacity 530.0 MB
10:51:43 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-faec22af-bb2d-4fae-8a02-b8ca67867858/httpd-50939810-7da7-42d9-9342-48d9dc2705dc
10:51:43 INFO spark.HttpServer: Starting HTTP Server
10:51:43 INFO server.Server: jetty-8.y.z-SNAPSHOT
10:51:43 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:55227
10:51:43 INFO util.Utils: Successfully started service 'HTTP file server' on port 55227.
10:51:43 INFO spark.SparkEnv: Registering OutputCommitCoordinator
10:51:43 INFO server.Server: jetty-8.y.z-SNAPSHOT
10:51:43 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
10:51:43 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
10:51:43 INFO ui.SparkUI: Started SparkUI at http://172.26.21.70:4040
10:51:43 INFO spark.SparkContext: Added JAR file:/home/pweaver/work/linear_spark-assembly-1.0-deps.jar at http://172.26.21.70:55227/jars/linear_spark-assembly-1.0-deps.jar with timestamp 1438883503937
10:51:43 INFO spark.SparkContext: Added JAR file:/home/pweaver/work/linear_spark_2.11-1.0.jar at http://172.26.21.70:55227/jars/linear_spark_2.11-1.0.jar with timestamp 1438883503940
10:51:44 WARN metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
10:51:44 INFO mesos.CoarseMesosSchedulerBackend: Registered as framework ID 20150805-154619-1209342636-5050-54362-0063
10:51:44 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56805.
10:51:44 INFO netty.NettyBlockTransferService: Server created on 56805
10:51:44 INFO storage.BlockManagerMaster: Trying to register BlockManager
10:51:44 INFO storage.BlockManagerMasterEndpoint: Registering block manager 172.26.21.70:56805 with 530.0 MB RAM, BlockManagerId(driver, 172.26.21.70, 56805)
10:51:44 INFO storage.BlockManagerMaster: Registered BlockManager
10:51:44 INFO mesos.CoarseMesosSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
10:51:44 INFO App$: A
10:51:44 INFO parquet.ParquetRelation: Constructing HadoopFsRelation
10:51:44 INFO parquet.ParquetRelation: Listing file:/home/pweaver/work/parquet on driver
10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and directories in parallel under: file:/home/pweaver/work/parquet/day=20150225, file:/home/pweaver/work/parquet/day=20150226, file:/home/pweaver/work/parquet/day=20150224, file:/home/pweaver/work/parquet/day=20150223, file:/home/pweaver/work/parquet/day=20150303, file:/home/pweaver/work/parquet/day=20150304, file:/home/pweaver/work/parquet/day=20150306, file:/home/pweaver/work/parquet/day=20150305, file:/home/pweaver/work/parquet/day=20150227, file:/home/pweaver/work/parquet/day=20150228, file:/home/pweaver/work/parquet/day=20150302, file:/home/pweaver/work/parquet/day=20150301, file:/home/pweaver/work/parquet/day=20150307, file:/home/pweaver/work/parquet/day=20150308, file:/home/pweaver/work/parquet/day=20150309, file:/home/pweaver/work/parquet/day=20150310, file:/home/pweaver/work/parquet/day=20150311, file:/home/pweaver/work/parquet/day=20150312, file:/home/pweaver/work/parquet/day=20150313, file:/home/pweaver/work/parquet/day=20150314, file:/home/pweaver/work/parquet/day=20150315, file:/home/pweaver/work/parquet/day=20150316, file:/home/pweaver/work/parquet/day=20150317, file:/home/pweaver/work/parquet/day=20150318, file:/home/pweaver/work/parquet/day=20150319, file:/home/pweaver/work/parquet/day=20150320, file:/home/pweaver/work/parquet/day=20150321, file:/home/pweaver/work/parquet/day=20150322, file:/home/pweaver/work/parquet/day=20150323, file:/home/pweaver/work/parquet/day=20150324, file:/home/pweaver/work/parquet/day=20150325, file:/home/pweaver/work/parquet/day=20150326, file:/home/pweaver/work/parquet/day=20150327, file:/home/pweaver/work/parquet/day=20150328, file:/home/pweaver/work/parquet/day=20150329
10:51:45 INFO spark.SparkContext: Starting job: parquet at App.scala:182
10:51:45 INFO scheduler.DAGScheduler: Got job 0 (parquet at App.scala:182) with 2 output partitions
10:51:45 INFO scheduler.DAGScheduler: Final stage: ResultStage 0(parquet at App.scala:182)
10:51:45 INFO scheduler.DAGScheduler: Parents of final stage: List()
10:51:45 INFO scheduler.DAGScheduler: Missing parents: List()
10:51:45 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at parquet at App.scala:182), which has no missing parents
10:51:45 INFO storage.MemoryStore: ensureFreeSpace(63016) called with curMem=0, maxMem=555755765
10:51:45 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 61.5 KB, free 529.9 MB)
10:51:45 INFO storage.MemoryStore: ensureFreeSpace(21109) called with curMem=63016, maxMem=555755765
10:51:45 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.6 KB, free 529.9 MB)
10:51:45 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.26.21.70:56805 (size: 20.6 KB, free: 530.0 MB)
10:51:45 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:844
10:51:45 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at parquet at App.scala:182)
10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now TASK_RUNNING
10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 1 is now TASK_RUNNING
10:51:54 INFO mesos.CoarseMesosSchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@lindevspark5:42562/user/Executor#-1676233070]) with ID 20150727-154221-1175788204-5050-20841-S3/0
10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, lindevspark5, PROCESS_LOCAL, 3087 bytes)
10:51:54 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, lindevspark5, PROCESS_LOCAL, 3147 bytes)
10:51:54 INFO storage.BlockManagerMasterEndpoint: Registering block manager lindevspark5:54383 with 10.4 GB RAM, BlockManagerId(20150727-154221-1175788204-5050-20841-S3/0, lindevspark5, 54383)
10:51:55 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on lindevspark5:54383 (size: 20.6 KB, free: 10.4 GB)
10:51:55 INFO mesos.CoarseMesosSchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@lindevspark4:36457/user/Executor#1540365137]) with ID 20150803-141203-1192565420-5050-4560-S1/1
10:51:56 INFO storage.BlockManagerMasterEndpoint: Registering block manager lindevspark4:41180 with 10.4 GB RAM, BlockManagerId(20150803-141203-1192565420-5050-4560-S1/1, lindevspark4, 41180)
10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 36107 ms on lindevspark5 (1/2)
10:52:31 INFO scheduler.DAGScheduler: ResultStage 0 (parquet at App.scala:182) finished in 45.929 s
10:52:31 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 36839 ms on lindevspark5 (2/2)
10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
10:52:31 INFO scheduler.DAGScheduler: Job 0 finished: parquet at App.scala:182, took 46.282639 s
10:52:31 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on 172.26.21.70:56805 in memory (size: 20.6 KB, free: 530.0 MB)
10:52:31 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on lindevspark5:54383 in memory (size: 20.6 KB, free: 10.4 GB)
10:52:32 INFO spark.SparkContext: Starting job: parquet at App.scala:182
10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at App.scala:182) with 8 output partitions
10:52:32 INFO scheduler.DAGScheduler: Final stage: ResultStage 1(parquet at App.scala:182)
10:52:32 INFO scheduler.DAGScheduler: Parents of final stage: List()
10:52:32 INFO scheduler.DAGScheduler: Missing parents: List()
10:52:32 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at parquet at App.scala:182), which has no missing parents
10:52:32 INFO storage.MemoryStore: ensureFreeSpace(62928) called with curMem=0, maxMem=555755765
10:52:32 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 61.5 KB, free 529.9 MB)
10:52:32 INFO storage.MemoryStore: ensureFreeSpace(21070) called with curMem=62928, maxMem=555755765
10:52:32 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 20.6 KB, free 529.9 MB)
10:52:32 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.26.21.70:56805 (size: 20.6 KB, free: 530.0 MB)
10:52:32 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:844
10:52:32 INFO scheduler.DAGScheduler: Submitting 8 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at parquet at App.scala:182)
10:52:32 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 8 tasks
10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, lindevspark4, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, lindevspark5, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 4, lindevspark4, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 1.0 (TID 5, lindevspark5, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 4.0 in stage 1.0 (TID 6, lindevspark4, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 5.0 in stage 1.0 (TID 7, lindevspark5, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 6.0 in stage 1.0 (TID 8, lindevspark4, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 7.0 in stage 1.0 (TID 9, lindevspark5, PROCESS_LOCAL, 2113 bytes)
10:52:32 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on lindevspark5:54383 (size: 20.6 KB, free: 10.4 GB)
10:52:32 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 1.0 (TID 5) in 320 ms on lindevspark5 (1/8)
10:52:32 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 336 ms on lindevspark5 (2/8)
10:52:32 INFO scheduler.TaskSetManager: Finished task 5.0 in stage 1.0 (TID 7) in 323 ms on lindevspark5 (3/8)
10:52:33 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on lindevspark4:41180 (size: 20.6 KB, free: 10.4 GB)
10:52:34 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 1546 ms on lindevspark4 (4/8)
10:52:34 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 1.0 (TID 4) in 1539 ms on lindevspark4 (5/8)
10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage 1.0 (TID 8) in 1527 ms on lindevspark4 (6/8)
10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0 (TID 6) in 1533 ms on lindevspark4 (7/8)
10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage 1.0 (TID 9) in 2886 ms on lindevspark5 (8/8)
10:52:35 INFO scheduler.DAGScheduler: ResultStage 1 (parquet at App.scala:182) finished in 2.913 s
10:52:35 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
10:52:35 INFO scheduler.DAGScheduler: Job 1 finished: parquet at App.scala:182, took 2.962674 s
10:52:36 INFO datasources.DataSourceStrategy: Selected 41609 partitions out of 41609, pruned 0.0% partitions.
10:52:36 INFO storage.MemoryStore: ensureFreeSpace(63280) called with curMem=83998, maxMem=555755765
10:52:36 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 61.8 KB, free 529.9 MB)
10:52:36 INFO storage.MemoryStore: ensureFreeSpace(19783) called with curMem=147278, maxMem=555755765
10:52:36 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 19.3 KB, free 529.9 MB)
10:52:36 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.26.21.70:56805 (size: 19.3 KB, free: 530.0 MB)
10:52:36 INFO spark.SparkContext: Created broadcast 2 from cache at App.scala:184
10:52:52 INFO simmons_mri.App$: Init time: 67429 ms
10:52:52 INFO spark.SparkContext: Invoking stop() from shutdown hook
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
10:52:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
10:52:52 INFO ui.SparkUI: Stopped Spark web UI at http://172.26.21.70:4040
10:52:52 INFO scheduler.DAGScheduler: Stopping DAGScheduler
10:52:52 INFO mesos.CoarseMesosSchedulerBackend: Shutting down all executors
10:52:52 INFO mesos.CoarseMesosSchedulerBackend: Asking each executor to shut down
10:52:52 INFO mesos.CoarseMesosSchedulerBackend: driver.run() returned with code DRIVER_STOPPED
10:52:52 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
10:52:52 INFO storage.MemoryStore: MemoryStore cleared
10:52:52 INFO storage.BlockManager: BlockManager stopped
10:52:52 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
10:52:52 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
10:52:52 INFO spark.SparkContext: Successfully stopped SparkContext
10:52:52 INFO util.Utils: Shutdown hook called
10:52:52 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
10:52:52 INFO util.Utils: Deleting directory /tmp/spark-faec22af-bb2d-4fae-8a02-b8ca67867858
10:52:52 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org