[jira] [Commented] (SPARK-8437) Using directory path without wildcard for filename slow for large number of files with wholeTextFiles and binaryFiles
[ https://issues.apache.org/jira/browse/SPARK-8437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572468#comment-15572468 ]

Steve Loughran commented on SPARK-8437:
---------------------------------------

Just came across this by way of comments in the source. This *shouldn't* happen; the glob code ought to recognise when there is no wildcard and exit early, faster than if there were a wildcard. If it's taking longer, then either the full listing process is making a mess of things, or somehow the result generation is playing up. If it were S3 only I'd blame the S3 APIs, but this sounds like S3 merely amplifies a problem which may exist already.

How big was the directory where this surfaced? Was it deep, wide, or deep and wide?

> Using directory path without wildcard for filename slow for large number of
> files with wholeTextFiles and binaryFiles
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-8437
>                 URL: https://issues.apache.org/jira/browse/SPARK-8437
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 1.3.1, 1.4.0
>         Environment: Ubuntu 15.04 + local filesystem
>                      Amazon EMR + S3 + HDFS
>            Reporter: Ewan Leith
>            Assignee: Sean Owen
>            Priority: Minor
>             Fix For: 1.4.1, 1.5.0
>
> When calling wholeTextFiles or binaryFiles with a directory path with
> 10,000s of files in it, Spark hangs for a few minutes before processing
> the files. If you add a * to the end of the path, there is no delay.
>
> This happens for me on Spark 1.3.1 and 1.4 on the local filesystem, HDFS,
> and on S3.
>
> To reproduce, create a directory with 50,000 files in it, then run:
>
> val a = sc.binaryFiles("file:/path/to/files/")
> a.count()
> val b = sc.binaryFiles("file:/path/to/files/*")
> b.count()
>
> and monitor the different startup times.
> For example, in the spark-shell these commands are pasted in together, so
> the delay at f.count() is from 10:11:08 to 10:13:29 to output "Total input
> paths to process : 4", then until 10:15:42 to begin processing files:
>
> scala> val f = sc.binaryFiles("file:/home/ewan/large/")
> 15/06/18 10:11:07 INFO MemoryStore: ensureFreeSpace(160616) called with curMem=0, maxMem=278019440
> 15/06/18 10:11:07 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 156.9 KB, free 265.0 MB)
> 15/06/18 10:11:08 INFO MemoryStore: ensureFreeSpace(17282) called with curMem=160616, maxMem=278019440
> 15/06/18 10:11:08 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 16.9 KB, free 265.0 MB)
> 15/06/18 10:11:08 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:40430 (size: 16.9 KB, free: 265.1 MB)
> 15/06/18 10:11:08 INFO SparkContext: Created broadcast 0 from binaryFiles at <console>:21
> f: org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)] = file:/home/ewan/large/ BinaryFileRDD[0] at binaryFiles at <console>:21
>
> scala> f.count()
> 15/06/18 10:13:29 INFO FileInputFormat: Total input paths to process : 4
> 15/06/18 10:15:42 INFO FileInputFormat: Total input paths to process : 4
> 15/06/18 10:15:42 INFO CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 0
> 15/06/18 10:15:42 INFO SparkContext: Starting job: count at <console>:24
> 15/06/18 10:15:42 INFO DAGScheduler: Got job 0 (count at <console>:24) with 4 output partitions (allowLocal=false)
> 15/06/18 10:15:42 INFO DAGScheduler: Final stage: ResultStage 0 (count at <console>:24)
> 15/06/18 10:15:42 INFO DAGScheduler: Parents of final stage: List()
>
> Adding a * to the end of the path removes the delay:
>
> scala> val f = sc.binaryFiles("file:/home/ewan/large/*")
> 15/06/18 10:08:29 INFO MemoryStore: ensureFreeSpace(160616) called with curMem=0, maxMem=278019440
> 15/06/18 10:08:29 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 156.9 KB, free 265.0 MB)
> 15/06/18 10:08:29 INFO MemoryStore: ensureFreeSpace(17309) called with curMem=160616, maxMem=278019440
> 15/06/18 10:08:29 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 16.9 KB, free 265.0 MB)
> 15/06/18 10:08:29 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:42825 (size: 16.9 KB, free: 265.1 MB)
> 15/06/18 10:08:29 INFO SparkContext: Created broadcast 0 from binaryFiles at <console>:21
> f: org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)] = file:/home/ewan/large/* BinaryFileRDD[0] at binaryFiles at <console>:21
>
> scala> f.count()
> 15/06/18 10:08:32 INFO FileInputFormat: Total input paths to process : 4
> 15/06/18 10:08:33 INFO FileInputFormat: Total input paths to process : 4
> 15/06/18 10:08:35 INFO CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 0
> 15/06/18 10:08:35 INFO SparkContext: Starting job: count at <console>:24
> 15/06/18 10:08:35 INFO DAGScheduler: Got job 0 (count at <console>:24) with 4 output partitions
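The early exit described in the comment above (a literal path with no wildcard should skip pattern matching entirely) can be sketched outside Hadoop. This is a hypothetical Python analogue, not the actual Hadoop glob implementation; the metacharacter set `* ? [ ] { }` and the helper names are assumptions for illustration:

```python
# Hypothetical analogue of a glob fast path: if the pattern contains no
# glob metacharacters, skip directory scanning and pattern matching and
# stat the literal path directly.
import os

GLOB_CHARS = set("*?[]{}")

def has_wildcard(path: str) -> bool:
    """Return True if the path contains any glob metacharacter."""
    return any(c in GLOB_CHARS for c in path)

def glob_status(path: str):
    """List paths matching `path`, with an early exit for literal paths."""
    if not has_wildcard(path):
        # Fast path: one existence check, no listing or matching.
        return [path] if os.path.exists(path) else []
    import glob
    return sorted(glob.glob(path))
```

If the fast path worked as sketched, a plain directory path would cost a single stat call, which is why the reported behaviour (the literal path being slower than the glob) is surprising.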
[jira] [Commented] (SPARK-8437) Using directory path without wildcard for filename slow for large number of files with wholeTextFiles and binaryFiles
[ https://issues.apache.org/jira/browse/SPARK-8437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14608174#comment-14608174 ]

Apache Spark commented on SPARK-8437:
-------------------------------------

User 'srowen' has created a pull request for this issue:
https://github.com/apache/spark/pull/7126

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8437) Using directory path without wildcard for filename slow for large number of files with wholeTextFiles and binaryFiles
[ https://issues.apache.org/jira/browse/SPARK-8437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14606807#comment-14606807 ]

Andrew Or commented on SPARK-8437:
----------------------------------

Re-opening because the patch was reverted.
[jira] [Commented] (SPARK-8437) Using directory path without wildcard for filename slow for large number of files with wholeTextFiles and binaryFiles
[ https://issues.apache.org/jira/browse/SPARK-8437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14606721#comment-14606721 ]

Andrew Or commented on SPARK-8437:
----------------------------------

The merged PR involves only documentation changes. I don't think there is another Spark fix that can change this, as the issue is fundamental to the underlying file system. I am closing this.
[jira] [Commented] (SPARK-8437) Using directory path without wildcard for filename slow for large number of files with wholeTextFiles and binaryFiles
[ https://issues.apache.org/jira/browse/SPARK-8437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14602600#comment-14602600 ]

Apache Spark commented on SPARK-8437:
-------------------------------------

User 'srowen' has created a pull request for this issue:
https://github.com/apache/spark/pull/7036
[jira] [Commented] (SPARK-8437) Using directory path without wildcard for filename slow for large number of files with wholeTextFiles and binaryFiles
[ https://issues.apache.org/jira/browse/SPARK-8437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591615#comment-14591615 ]

Ewan Leith commented on SPARK-8437:
-----------------------------------

Thanks, I wasn't sure whether it was Hadoop- or Spark-specific; initially I thought it was S3-related, but it happens everywhere. If it is Hadoop, I don't know whether it would be feasible for Spark to check when a plain directory has been given and add a wildcard in the background; that might not give the desired effect, but otherwise there are various doc changes to make.
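The caller-side workaround discussed in this thread (adding the wildcard yourself before handing the path to Spark) can be wrapped in a tiny helper. This is a hedged sketch; `normalize_input_path` is a hypothetical name, not a Spark API, and the metacharacter set is an assumption:

```python
# Hypothetical caller-side workaround: if the input path is a plain
# directory (no glob metacharacters), append "/*" so the downstream
# glob takes the fast path observed in this issue.
def normalize_input_path(path: str) -> str:
    if any(c in "*?[]{}" for c in path):
        return path  # already a glob expression; leave it alone
    return path.rstrip("/") + "/*"
```

A caller would then write, e.g., `sc.binaryFiles(normalize_input_path("file:/path/to/files/"))`. Note the thread's caveat applies: appending a wildcard changes the meaning slightly (it matches the directory's children rather than the directory itself), so this "might not give the desired effect" in every case.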
[jira] [Commented] (SPARK-8437) Using directory path without wildcard for filename slow for large number of files with wholeTextFiles and binaryFiles
[ https://issues.apache.org/jira/browse/SPARK-8437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591608#comment-14591608 ]

Sean Owen commented on SPARK-8437:
----------------------------------

Yeah, that's a weird one, but I'm pretty sure this is a Hadoop API phenomenon rather than anything Spark-related. I assume the glob can be pushed down further, rather than manually listing and expanding directories remotely, or something like that. It might not be something Spark can do anything about, but have a look. At the least, you could propose a doc change suggesting that the glob expression is preferable.
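The "manually list and expand" strategy mentioned above can be illustrated with a local-filesystem sketch. This is not Hadoop's code, only a hypothetical analogue: expanding a pattern by listing the parent directory and filtering each entry. On a remote store like S3, each listing or per-entry stat is a network round trip, which is one plausible way a large directory amplifies the cost:

```python
# Sketch of a "list then match" glob expansion: list the parent
# directory, then filter entries against the pattern with fnmatch.
# Locally this is cheap; against a remote object store, the listing
# (and any per-entry metadata calls) become network round trips.
import fnmatch
import os

def naive_glob(directory: str, pattern: str):
    """Expand `pattern` by listing `directory` and matching each entry."""
    return sorted(
        os.path.join(directory, name)
        for name in os.listdir(directory)
        if fnmatch.fnmatch(name, pattern)
    )
```

With 50,000 entries, as in the reproduction above, the filtering itself is fast; the expensive part on a remote filesystem is whatever extra listing or stat traffic the expansion triggers.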