Hi
Assume that a list of file names exists in the file "data/Files.dat":
"
File1.ext
File2.ext
...
"
and these files exist in the directory "data". The following commands:
"
1: import java.io.File
2:
3: val filesList = sc.textFile("data/Files.dat", 2)
4:
5: val fileSizes = filesList.map(file => new File("data/" + file).length)
6:
7: fileSizes.collect.sum
"
work correctly when they are run in the Spark shell in local mode. That is,
I get the total size of the files listed in the file "data/Files.dat". (This
is not supposed to be a good example of using Spark. I was just testing
whether I understood how to use the system.) However, the same commands
return 0 on line 7 ("fileSizes.collect.sum") when they are executed
in distributed mode (with one worker).
Do you have any idea what could be causing this? Running "fileSizes.collect"
in distributed mode returns an array of zeros, but, mysteriously, the command
"fileSizes.first" returns the correct answer (176034 > 0).
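One detail that may matter here: java.io.File.length() silently returns 0L (rather than throwing) when the path does not exist, so an array of zeros is exactly what you would see if the worker JVM resolves the relative path "data/<file>" against a different working directory than the driver. A minimal local sketch of that behavior (the file name is made up):

```scala
import java.io.File

// length() returns 0L for a path that does not exist, with no error,
// so a wrong working directory on the worker produces zeros silently.
val missing = new File("data/no-such-file.ext")
println(missing.exists)  // false
println(missing.length)  // 0
```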
The log messages (at logging level DEBUG) from spark-shell are as
follows. I have excluded lines of the following form:
"
13/10/15 10:58:14 DEBUG Configuration: java.io.IOException: config(config)
at
org.apache.hadoop.conf.Configuration.<init>(Configuration.java:260)
at org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:341)
at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:346)
at org.apache.spark.SparkContext.textFile(SparkContext.scala:319)
at $line4.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:13)
at $line4.$read$$iwC$$iwC$$iwC.<init>(<console>:18)
at $line4.$read$$iwC$$iwC.<init>(<console>:20)
at $line4.$read$$iwC.<init>(<console>:22)
at $line4.$read.<init>(<console>:24)
at $line4.$read$.<init>(<console>:28)
at $line4.$read$.<clinit>(<console>)
at $line4.$eval$.<init>(<console>:11)
at $line4.$eval$.<clinit>(<console>)
at $line4.$eval.$export(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39
)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl
.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:629)
at
org.apache.spark.repl.SparkIMain$Request$$anonfun$10.apply(SparkIMain.scala:
890)
at
scala.tools.nsc.interpreter.Line$$anonfun$1.apply$mcV$sp(Line.scala:43)
at scala.tools.nsc.io.package$$anon$2.run(package.scala:25)
at java.lang.Thread.run(Thread.java:662)
"
because they may have resulted from HADOOP-8995
(https://issues.apache.org/jira/browse/HADOOP-8995). I am using Hadoop 1.0.4.
--- Local ---
val filesList = sc.textFile("data/Files.dat", 2):
13/10/15 10:58:14 DEBUG Groups: Creating new Groups object
13/10/15 10:58:14 DEBUG Groups: Group mapping
impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
cacheTimeout=300000
13/10/15 10:58:14 DEBUG UserGroupInformation: hadoop login
13/10/15 10:58:14 DEBUG UserGroupInformation: hadoop login commit
13/10/15 10:58:14 DEBUG UserGroupInformation: using local
user:UnixPrincipal: mlosoi
13/10/15 10:58:14 DEBUG UserGroupInformation: UGI loginUser:mlosoi
13/10/15 10:58:14 DEBUG FileSystem: Creating filesystem for file:///
13/10/15 10:58:14 INFO MemoryStore: ensureFreeSpace(36280) called with
curMem=0, maxMem=339585269
13/10/15 10:58:14 INFO MemoryStore: Block broadcast_0 stored as values to
memory (estimated size 35.4 KB, free 323.8 MB)
13/10/15 10:58:14 DEBUG BlockManager: Put block broadcast_0 locally took 58
ms
filesList: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at
<console>:13
val fileSizes = filesList.map(file => new File("data/" + file).length):
fileSizes: org.apache.spark.rdd.RDD[Long] = MappedRDD[2] at map at
<console>:15
fileSizes.collect.sum:
13/10/15 11:01:02 DEBUG NativeCodeLoader: Trying to load the custom-built
native-hadoop library...
13/10/15 11:01:02 DEBUG NativeCodeLoader: Failed to load native-hadoop with
error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
13/10/15 11:01:02 DEBUG NativeCodeLoader: java.library.path=
13/10/15 11:01:02 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
13/10/15 11:01:02 WARN LoadSnappy: Snappy native library not loaded
13/10/15 11:01:02 INFO FileInputFormat: Total input paths to process : 1
13/10/15 11:01:02 DEBUG FileInputFormat: Total # of splits: 2
13/10/15 11:01:02 INFO SparkContext: Starting job: collect at <console>:18
13/10/15 11:01:02 DEBUG DAGScheduler: Got event of type
org.apache.spark.scheduler.JobSubmitted
13/10/15 11:01:02 INFO DAGScheduler: Got job 0 (collect at <console>:18)
with 2 output partitions (allowLocal=false)
13/10/15 11:01:02 INFO DAGScheduler: Final stage: Stage 0 (collect at
<console>:18)
13/10/15 11:01:02 INFO DAGScheduler: Parents of final stage: List()
13/10/15 11:01:02 DEBUG BlockManager: Got multiple block location in 2 ms
13/10/15 11:01:02 DEBUG BlockManager: Got multiple block location in 0 ms
13/10/15 11:01:02 DEBUG BlockManager: Got multiple block location in 0 ms
13/10/15 11:01:02 INFO DAGScheduler: Missing parents: List()
13/10/15 11:01:02 DEBUG DAGScheduler: submitStage(Stage 0)
13/10/15 11:01:02 DEBUG DAGScheduler: missing: List()
13/10/15 11:01:02 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[2] at map
at <console>:15), which has no missing parents
13/10/15 11:01:02 DEBUG DAGScheduler: submitMissingTasks(Stage 0)
13/10/15 11:01:02 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0
(MappedRDD[2] at map at <console>:15)
13/10/15 11:01:02 DEBUG DAGScheduler: New pending tasks: Set(ResultTask(0,
0), ResultTask(0, 1))
13/10/15 11:01:02 DEBUG LocalScheduler:
parentName:,name:TaskSet_0,runningTasks:0
13/10/15 11:01:02 DEBUG LocalTaskSetManager: availableCpus:1, numFinished:0,
numTasks:2
13/10/15 11:01:02 INFO LocalTaskSetManager: Size of task 0 is 1671 bytes
13/10/15 11:01:02 DEBUG DAGScheduler: Got event of type
org.apache.spark.scheduler.BeginEvent
13/10/15 11:01:02 DEBUG LocalTaskSetManager: availableCpus:0, numFinished:0,
numTasks:2
13/10/15 11:01:02 INFO LocalScheduler: Running 0
13/10/15 11:01:02 DEBUG BlockManager: Getting local block broadcast_0
13/10/15 11:01:02 DEBUG BlockManager: Level for block broadcast_0 is
StorageLevel(true, true, true, 1)
13/10/15 11:01:02 DEBUG BlockManager: Getting block broadcast_0 from memory
13/10/15 11:01:02 INFO HadoopRDD: Input split:
file:/home/mlosoi/personal/testenv3/spark_apps/TestApp1/data/Files.dat:0+337
5
13/10/15 11:01:02 INFO LocalScheduler: Finished 0
13/10/15 11:01:02 DEBUG DAGScheduler: Got event of type
org.apache.spark.scheduler.CompletionEvent
13/10/15 11:01:02 DEBUG LocalScheduler:
parentName:,name:TaskSet_0,runningTasks:0
13/10/15 11:01:02 DEBUG LocalTaskSetManager: availableCpus:1, numFinished:1,
numTasks:2
13/10/15 11:01:02 INFO LocalTaskSetManager: Size of task 1 is 1671 bytes
13/10/15 11:01:02 DEBUG LocalTaskSetManager: availableCpus:0, numFinished:1,
numTasks:2
13/10/15 11:01:02 INFO LocalScheduler: Running 1
13/10/15 11:01:02 INFO DAGScheduler: Completed ResultTask(0, 0)
13/10/15 11:01:02 DEBUG DAGScheduler: Got event of type
org.apache.spark.scheduler.BeginEvent
13/10/15 11:01:02 DEBUG BlockManager: Getting local block broadcast_0
13/10/15 11:01:02 DEBUG BlockManager: Level for block broadcast_0 is
StorageLevel(true, true, true, 1)
13/10/15 11:01:02 DEBUG BlockManager: Getting block broadcast_0 from memory
13/10/15 11:01:02 INFO HadoopRDD: Input split:
file:/home/mlosoi/personal/testenv3/spark_apps/TestApp1/data/Files.dat:3375+
3375
13/10/15 11:01:02 INFO LocalScheduler: Finished 1
13/10/15 11:01:02 DEBUG DAGScheduler: Got event of type
org.apache.spark.scheduler.CompletionEvent
13/10/15 11:01:02 INFO DAGScheduler: Completed ResultTask(0, 1)
13/10/15 11:01:02 INFO LocalScheduler: Remove TaskSet 0.0 from pool
13/10/15 11:01:02 INFO DAGScheduler: Stage 0 (collect at <console>:18)
finished in 0.099 s
13/10/15 11:01:02 INFO SparkContext: Job finished: collect at <console>:18,
took 0.191815601 s
res0: Long = 74747646
--- Distributed ---
val filesList = sc.textFile("data/Files.dat", 2):
13/10/15 11:05:33 DEBUG Groups: Creating new Groups object
13/10/15 11:05:33 DEBUG Groups: Group mapping
impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
cacheTimeout=300000
13/10/15 11:05:33 DEBUG UserGroupInformation: hadoop login
13/10/15 11:05:33 DEBUG UserGroupInformation: hadoop login commit
13/10/15 11:05:33 DEBUG UserGroupInformation: using local
user:UnixPrincipal: mlosoi
13/10/15 11:05:33 DEBUG UserGroupInformation: UGI loginUser:mlosoi
13/10/15 11:05:33 DEBUG FileSystem: Creating filesystem for file:///
13/10/15 11:05:33 INFO MemoryStore: ensureFreeSpace(36280) called with
curMem=0, maxMem=339585269
13/10/15 11:05:33 INFO MemoryStore: Block broadcast_0 stored as values to
memory (estimated size 35.4 KB, free 323.8 MB)
13/10/15 11:05:33 DEBUG BlockManager: Put block broadcast_0 locally took 63
ms
filesList: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at
<console>:13
val fileSizes = filesList.map(file => new File("data/" + file).length):
fileSizes: org.apache.spark.rdd.RDD[Long] = MappedRDD[2] at map at
<console>:15
fileSizes.collect.sum:
13/10/15 11:06:58 DEBUG NativeCodeLoader: Trying to load the custom-built
native-hadoop library...
13/10/15 11:06:58 DEBUG NativeCodeLoader: Failed to load native-hadoop with
error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
13/10/15 11:06:58 DEBUG NativeCodeLoader: java.library.path=
13/10/15 11:06:58 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
13/10/15 11:06:58 WARN LoadSnappy: Snappy native library not loaded
13/10/15 11:06:58 INFO FileInputFormat: Total input paths to process : 1
13/10/15 11:06:58 DEBUG FileInputFormat: Total # of splits: 2
13/10/15 11:06:58 INFO SparkContext: Starting job: collect at <console>:18
13/10/15 11:06:58 DEBUG DAGScheduler: Got event of type
org.apache.spark.scheduler.JobSubmitted
13/10/15 11:06:58 INFO DAGScheduler: Got job 0 (collect at <console>:18)
with 2 output partitions (allowLocal=false)
13/10/15 11:06:58 INFO DAGScheduler: Final stage: Stage 0 (collect at
<console>:18)
13/10/15 11:06:58 INFO DAGScheduler: Parents of final stage: List()
13/10/15 11:06:58 DEBUG BlockManager: Got multiple block location in 2 ms
13/10/15 11:06:58 DEBUG BlockManager: Got multiple block location in 0 ms
13/10/15 11:06:58 DEBUG BlockManager: Got multiple block location in 2 ms
13/10/15 11:06:58 INFO DAGScheduler: Missing parents: List()
13/10/15 11:06:58 DEBUG DAGScheduler: submitStage(Stage 0)
13/10/15 11:06:58 DEBUG DAGScheduler: missing: List()
13/10/15 11:06:58 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[2] at map
at <console>:15), which has no missing parents
13/10/15 11:06:58 DEBUG DAGScheduler: submitMissingTasks(Stage 0)
13/10/15 11:06:58 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0
(MappedRDD[2] at map at <console>:15)
13/10/15 11:06:58 DEBUG DAGScheduler: New pending tasks: Set(ResultTask(0,
1), ResultTask(0, 0))
13/10/15 11:06:58 INFO ClusterScheduler: Adding task set 0.0 with 2 tasks
13/10/15 11:06:58 DEBUG ClusterTaskSetManager: Epoch for TaskSet 0.0: 0
13/10/15 11:06:58 DEBUG ClusterTaskSetManager: Valid locality levels for
TaskSet 0.0: ANY
13/10/15 11:06:58 DEBUG ClusterScheduler: parentName: , name: TaskSet_0,
runningTasks: 0
13/10/15 11:06:58 DEBUG ClusterScheduler: parentName: , name: TaskSet_0,
runningTasks: 0
13/10/15 11:06:58 INFO ClusterTaskSetManager: Starting task 0.0:0 as TID 0
on executor 0: 130.233.193.25 (PROCESS_LOCAL)
13/10/15 11:06:58 INFO ClusterTaskSetManager: Serialized task 0.0:0 as 1671
bytes in 4 ms
13/10/15 11:06:58 DEBUG DAGScheduler: Got event of type
org.apache.spark.scheduler.BeginEvent
13/10/15 11:06:58 INFO ClusterTaskSetManager: Finished TID 0 in 482 ms on
130.233.193.25 (progress: 1/2)
13/10/15 11:06:58 DEBUG DAGScheduler: Got event of type
org.apache.spark.scheduler.CompletionEvent
13/10/15 11:06:58 DEBUG ClusterScheduler: parentName: , name: TaskSet_0,
runningTasks: 0
13/10/15 11:06:58 DEBUG ClusterScheduler: parentName: , name: TaskSet_0,
runningTasks: 0
13/10/15 11:06:58 INFO ClusterTaskSetManager: Starting task 0.0:1 as TID 1
on executor 0: 130.233.193.25 (PROCESS_LOCAL)
13/10/15 11:06:58 INFO DAGScheduler: Completed ResultTask(0, 0)
13/10/15 11:06:58 INFO ClusterTaskSetManager: Serialized task 0.0:1 as 1671
bytes in 1 ms
13/10/15 11:06:58 DEBUG DAGScheduler: Got event of type
org.apache.spark.scheduler.BeginEvent
13/10/15 11:06:58 INFO ClusterTaskSetManager: Finished TID 1 in 27 ms on
130.233.193.25 (progress: 2/2)
13/10/15 11:06:58 DEBUG DAGScheduler: Got event of type
org.apache.spark.scheduler.CompletionEvent
13/10/15 11:06:58 INFO DAGScheduler: Completed ResultTask(0, 1)
13/10/15 11:06:58 INFO ClusterScheduler: Remove TaskSet 0.0 from pool
13/10/15 11:06:58 INFO DAGScheduler: Stage 0 (collect at <console>:18)
finished in 0.521 s
13/10/15 11:06:58 INFO SparkContext: Job finished: collect at <console>:18,
took 0.586467999 s
res0: Long = 0
fileSizes.collect:
13/10/15 11:09:04 INFO SparkContext: Starting job: collect at <console>:18
13/10/15 11:09:04 DEBUG DAGScheduler: Got event of type
org.apache.spark.scheduler.JobSubmitted
13/10/15 11:09:04 INFO DAGScheduler: Got job 1 (collect at <console>:18)
with 2 output partitions (allowLocal=false)
13/10/15 11:09:04 INFO DAGScheduler: Final stage: Stage 1 (collect at
<console>:18)
13/10/15 11:09:04 INFO DAGScheduler: Parents of final stage: List()
13/10/15 11:09:04 DEBUG BlockManager: Got multiple block location in 0 ms
13/10/15 11:09:04 DEBUG BlockManager: Got multiple block location in 0 ms
13/10/15 11:09:04 DEBUG BlockManager: Got multiple block location in 0 ms
13/10/15 11:09:04 INFO DAGScheduler: Missing parents: List()
13/10/15 11:09:04 DEBUG DAGScheduler: submitStage(Stage 1)
13/10/15 11:09:04 DEBUG DAGScheduler: missing: List()
13/10/15 11:09:04 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[2] at map
at <console>:15), which has no missing parents
13/10/15 11:09:04 DEBUG DAGScheduler: submitMissingTasks(Stage 1)
13/10/15 11:09:04 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1
(MappedRDD[2] at map at <console>:15)
13/10/15 11:09:04 DEBUG DAGScheduler: New pending tasks: Set(ResultTask(1,
1), ResultTask(1, 0))
13/10/15 11:09:04 INFO ClusterScheduler: Adding task set 1.0 with 2 tasks
13/10/15 11:09:04 DEBUG ClusterTaskSetManager: Epoch for TaskSet 1.0: 0
13/10/15 11:09:04 DEBUG ClusterTaskSetManager: Valid locality levels for
TaskSet 1.0: ANY
13/10/15 11:09:04 DEBUG ClusterScheduler: parentName: , name: TaskSet_1,
runningTasks: 0
13/10/15 11:09:04 DEBUG ClusterScheduler: parentName: , name: TaskSet_1,
runningTasks: 0
13/10/15 11:09:04 INFO ClusterTaskSetManager: Starting task 1.0:0 as TID 2
on executor 0: 130.233.193.25 (PROCESS_LOCAL)
13/10/15 11:09:04 INFO ClusterTaskSetManager: Serialized task 1.0:0 as 1675
bytes in 1 ms
13/10/15 11:09:04 DEBUG DAGScheduler: Got event of type
org.apache.spark.scheduler.BeginEvent
13/10/15 11:09:04 INFO ClusterTaskSetManager: Finished TID 2 in 26 ms on
130.233.193.25 (progress: 1/2)
13/10/15 11:09:04 DEBUG ClusterScheduler: parentName: , name: TaskSet_1,
runningTasks: 0
13/10/15 11:09:04 DEBUG ClusterScheduler: parentName: , name: TaskSet_1,
runningTasks: 0
13/10/15 11:09:04 INFO ClusterTaskSetManager: Starting task 1.0:1 as TID 3
on executor 0: 130.233.193.25 (PROCESS_LOCAL)
13/10/15 11:09:04 INFO ClusterTaskSetManager: Serialized task 1.0:1 as 1675
bytes in 0 ms
13/10/15 11:09:04 DEBUG DAGScheduler: Got event of type
org.apache.spark.scheduler.CompletionEvent
13/10/15 11:09:04 INFO DAGScheduler: Completed ResultTask(1, 0)
13/10/15 11:09:04 DEBUG DAGScheduler: Got event of type
org.apache.spark.scheduler.BeginEvent
13/10/15 11:09:04 INFO ClusterTaskSetManager: Finished TID 3 in 27 ms on
130.233.193.25 (progress: 2/2)
13/10/15 11:09:04 INFO ClusterScheduler: Remove TaskSet 1.0 from pool
13/10/15 11:09:04 DEBUG DAGScheduler: Got event of type
org.apache.spark.scheduler.CompletionEvent
13/10/15 11:09:04 INFO DAGScheduler: Completed ResultTask(1, 1)
13/10/15 11:09:04 INFO DAGScheduler: Stage 1 (collect at <console>:18)
finished in 0.057 s
13/10/15 11:09:04 INFO SparkContext: Job finished: collect at <console>:18,
took 0.06363878 s
res1: Array[Long] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...
fileSizes.first:
13/10/15 11:10:14 INFO SparkContext: Starting job: first at <console>:18
13/10/15 11:10:14 DEBUG DAGScheduler: Got event of type
org.apache.spark.scheduler.JobSubmitted
13/10/15 11:10:14 INFO DAGScheduler: Got job 2 (first at <console>:18) with
1 output partitions (allowLocal=true)
13/10/15 11:10:14 INFO DAGScheduler: Final stage: Stage 2 (first at
<console>:18)
13/10/15 11:10:14 INFO DAGScheduler: Parents of final stage: List()
13/10/15 11:10:14 DEBUG BlockManager: Got multiple block location in 0 ms
13/10/15 11:10:14 DEBUG BlockManager: Got multiple block location in 0 ms
13/10/15 11:10:14 DEBUG BlockManager: Got multiple block location in 0 ms
13/10/15 11:10:14 INFO DAGScheduler: Missing parents: List()
13/10/15 11:10:14 INFO DAGScheduler: Computing the requested partition
locally
13/10/15 11:10:14 INFO HadoopRDD: Input split:
file:/home/mlosoi/personal/testenv3/spark_apps/TestApp1/data/Files.dat:0+337
5
13/10/15 11:10:14 INFO SparkContext: Job finished: first at <console>:18,
took 0.016751241 s
res2: Long = 176034
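Note that the "first" trace above says "Computing the requested partition locally" (allowLocal=true), i.e. it ran in the driver process, where data/ exists, whereas "collect" shipped both tasks to the executor. Assuming that is the cause, a diagnostic sketch for the shell would be to map each name to a tuple recording the absolute path and existence as seen by whichever JVM runs the task (filesList is the RDD defined above):

```scala
import java.io.File

// Hypothetical diagnostic: capture what each task's JVM actually sees.
// If the executor's working directory differs from the driver's,
// exists will be false and length 0 for every file.
val diag = filesList.map { name =>
  val f = new File("data/" + name)
  (f.getAbsolutePath, f.exists, f.length)
}
diag.collect.foreach(println)
```

If the absolute paths printed for collect differ from those for first, an absolute path in the map closure (or distributing the files to the workers) should make the two modes agree.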