*Problem Description*:

The program runs on a stand-alone Spark cluster (1 master, 6 workers with
8 GB RAM and 2 cores).
Input: a 468 MB file with 133433 records, stored in HDFS.
Output: a file of only about 2 MB, which is also stored in HDFS.
The program has two map operations and one reduceByKey operation.
Finally, I save the result to HDFS using "*saveAsTextFile*".
*Problem*: if I don't add "saveAsTextFile", the program runs very fast (a few
seconds); with it, the program is extremely slow and takes about 30 minutes.
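
A possible reading of the fast run, offered as an assumption rather than a confirmed diagnosis: Spark transformations such as map and reduceByKey are lazy, and only an action such as saveAsTextFile starts a job, so without the save call there may be nothing to execute at all. The sketch below is a plain-Java analogy using java.util.stream, not Spark code; the class LazyPipelineDemo and its members are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Plain-Java analogy (not Spark): a lazy pipeline does no work until a terminal operation. */
final class LazyPipelineDemo {
    /** Counts how often the mapper function is actually invoked. */
    static final AtomicInteger CALLS = new AtomicInteger();

    /** Only describes the pipeline; comparable to chaining transformations without an action. */
    static Stream<Integer> buildPipeline(List<Integer> input) {
        return input.stream().map(x -> {
            CALLS.incrementAndGet();
            return x * x;
        });
    }

    public static void main(String[] args) {
        Stream<Integer> pipeline = buildPipeline(Arrays.asList(1, 2, 3));
        System.out.println("mapper calls after building:   " + CALLS.get());

        // The terminal operation plays the role that an action plays in Spark.
        List<Integer> result = pipeline.collect(Collectors.toList());
        System.out.println("mapper calls after collecting: " + CALLS.get());
        System.out.println("result: " + result);
    }
}
```

In this analogy the mapper is never invoked until collect is called. If the same holds for the Spark program, the "few seconds" would measure only start-up and not the distance computation.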

*My program (it is very simple)*
        // Imports used by this method (Spark 1.3.1 Java API).
        import java.io.IOException;
        import java.util.ArrayList;
        import java.util.List;
        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.JavaPairRDD;
        import org.apache.spark.api.java.JavaRDD;
        import org.apache.spark.api.java.JavaSparkContext;
        import org.apache.spark.api.java.function.Function;
        import org.apache.spark.api.java.function.Function2;
        import org.apache.spark.api.java.function.PairFlatMapFunction;
        import org.apache.spark.broadcast.Broadcast;
        import scala.Tuple2;

        public static void main(String[] args) throws IOException {
                /** Parameter settings */
                String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
                String remoteFilePath =
                                "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
                String outputPath = "hdfs://HadoopV26Master:9000/user/sparkoutput/";
                final int row = 133433;
                final int col = 458;
                final double dc = Double.parseDouble(args[0]);

                SparkConf conf = new SparkConf()
                                .setAppName("distance")
                                .set("spark.executor.memory", "4g")
                                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                                .set("spark.eventLog.enabled", "true");
                JavaSparkContext sc = new JavaSparkContext(conf);

                JavaRDD<String> textFile = sc.textFile(remoteFilePath);

                // Broadcast variable; the dimension of this double array is 133433*458.
                // createBroadcastPoints is a helper that is not shown here.
                final Broadcast<double[][]> broadcastPoints =
                                sc.broadcast(createBroadcastPoints(localPointPath, row, col));

                /**
                 * Compute the distance in terms of each point on each instance.
                 * distance list: index = n(i-1)- i*(i-1)/2 + j-i-1
                 */
                JavaPairRDD<Integer, Double> distance = textFile.flatMapToPair(
                                new PairFlatMapFunction<String, Integer, Double>() {
                        public Iterable<Tuple2<Integer, Double>> call(String v1) throws Exception {
                                // Each line is parsed as "index,feature,feature,...".
                                String[] al = v1.split(",");
                                int dims = al.length - 1;
                                double[] featureVals = new double[dims];
                                for (int j = 0; j < dims; j++)
                                        featureVals[j] = Double.parseDouble(al[j + 1]);
                                int jIndex = Integer.parseInt(al[0]);
                                double[][] allPoints = broadcastPoints.value();
                                List<Tuple2<Integer, Double>> list =
                                                new ArrayList<Tuple2<Integer, Double>>();
                                // One Euclidean distance per broadcast row, all keyed by jIndex.
                                for (int i = 0; i < row; i++) {
                                        double sum = 0;
                                        for (int j = 0; j < dims; j++) {
                                                double diff = allPoints[i][j] - featureVals[j];
                                                sum += diff * diff;
                                        }
                                        list.add(new Tuple2<Integer, Double>(jIndex, Math.sqrt(sum)));
                                }
                                return list;
                        }
                });

                // Create zeroOne density: 1 if the distance is below dc, else 0
                JavaPairRDD<Integer, Integer> densityZeroOne = distance.mapValues(
                                new Function<Double, Integer>() {
                        public Integer call(Double v1) throws Exception {
                                return (v1 < dc) ? 1 : 0;
                        }
                });

                // Combine the density
                JavaPairRDD<Integer, Integer> counts = densityZeroOne.reduceByKey(
                                new Function2<Integer, Integer, Integer>() {
                        public Integer call(Integer v1, Integer v2) throws Exception {
                                return v1 + v2;
                        }
                });
                counts.saveAsTextFile(outputPath + args[1]);   // the call in question
                sc.stop();
        }
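
To get a feel for how much work the program above asks for, here is a rough estimate built only from the constants row and col. It assumes every record carries col features, which the post does not state explicitly; the class DistanceCostEstimate and its methods are hypothetical helpers, not part of the original program.

```java
/** Back-of-the-envelope cost of the distance step, using the constants from the program. */
final class DistanceCostEstimate {
    /** Number of (broadcast row, input record) pairs; one Tuple2 is emitted per pair. */
    static long pairCount(long rows) {
        return rows * rows;
    }

    /** Inner-loop iterations: one subtract, multiply and add per feature per pair. */
    static long innerLoopIterations(long rows, long features) {
        return pairCount(rows) * features;
    }

    /** Raw payload of a rows x cols double array (8 bytes per double, headers ignored). */
    static long arrayPayloadBytes(long rows, long cols) {
        return rows * cols * 8L;
    }

    public static void main(String[] args) {
        long rows = 133433L;  // "row" in the program
        long cols = 458L;     // "col" in the program; assumed equal to the features per record
        System.out.println("pairs / tuples emitted: " + pairCount(rows));
        System.out.println("inner-loop iterations:  " + innerLoopIterations(rows, cols));
        System.out.println("array payload in bytes: " + arrayPayloadBytes(rows, cols));
    }
}
```

Under these assumptions the job emits about 1.78 × 10^10 tuples and runs about 8.15 × 10^12 inner-loop iterations, and the point array alone is about 466 MB (for comparison, the log reports an estimated size of 467.5 MB for broadcast_1). With only 4 tasks in Stage 0, each task would cover roughly 2 × 10^12 iterations, so a run time of tens of minutes does not look implausible.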

*If I comment out "saveAsTextFile", the log is:*
Picked up _JAVA_OPTIONS: -Xmx4g
15/05/24 15:21:30 INFO spark.SparkContext: Running Spark version 1.3.1
15/05/24 15:21:30 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/05/24 15:21:30 INFO spark.SecurityManager: Changing view acls to: hduser
15/05/24 15:21:30 INFO spark.SecurityManager: Changing modify acls to:
hduser
15/05/24 15:21:30 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(hduser); users with modify permissions: Set(hduser)
15/05/24 15:21:31 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/05/24 15:21:31 INFO Remoting: Starting remoting
15/05/24 15:21:31 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@HadoopV26Master:57429]
15/05/24 15:21:31 INFO util.Utils: Successfully started service
'sparkDriver' on port 57429.
15/05/24 15:21:31 INFO spark.SparkEnv: Registering MapOutputTracker
15/05/24 15:21:31 INFO spark.SparkEnv: Registering BlockManagerMaster
15/05/24 15:21:31 INFO storage.DiskBlockManager: Created local directory at
/tmp/spark-6342bde9-feca-4651-8cca-a67541150420/blockmgr-e92d0ae0-ec95-44cb-986a-266a1899202b
15/05/24 15:21:31 INFO storage.MemoryStore: MemoryStore started with
capacity 1966.1 MB
15/05/24 15:21:31 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-fea59c9e-1264-45e9-ad31-484d7de83d0a/httpd-c6421767-ffaf-4417-905e-34b3d13a7bf4
15/05/24 15:21:31 INFO spark.HttpServer: Starting HTTP Server
15/05/24 15:21:31 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/05/24 15:21:31 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:36956
15/05/24 15:21:31 INFO util.Utils: Successfully started service 'HTTP file
server' on port 36956.
15/05/24 15:21:31 INFO spark.SparkEnv: Registering OutputCommitCoordinator
15/05/24 15:21:31 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/05/24 15:21:31 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
15/05/24 15:21:31 INFO util.Utils: Successfully started service 'SparkUI' on
port 4040.
15/05/24 15:21:31 INFO ui.SparkUI: Started SparkUI at
http://HadoopV26Master:4040
15/05/24 15:21:31 INFO spark.SparkContext: Added JAR
file:/home/hduser/densityspark.jar at
http://10.9.0.16:36956/jars/densityspark.jar with timestamp 1432452091753
15/05/24 15:21:31 INFO client.AppClient$ClientActor: Connecting to master
akka.tcp://sparkMaster@HadoopV26Master:7077/user/Master...
15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Connected to
Spark cluster with app ID app-20150524152132-0035
15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added:
app-20150524152132-0035/0 on worker-20150522181416-HadoopV26Slave3-47002
(HadoopV26Slave3:47002) with 2 cores
15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20150524152132-0035/0 on hostPort HadoopV26Slave3:47002 with 2 cores,
4.0 GB RAM
15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added:
app-20150524152132-0035/1 on worker-20150522181417-HadoopV26Slave6-60280
(HadoopV26Slave6:60280) with 2 cores
15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20150524152132-0035/1 on hostPort HadoopV26Slave6:60280 with 2 cores,
4.0 GB RAM
15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added:
app-20150524152132-0035/2 on worker-20150522181417-HadoopV26Slave5-54797
(HadoopV26Slave5:54797) with 2 cores
15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20150524152132-0035/2 on hostPort HadoopV26Slave5:54797 with 2 cores,
4.0 GB RAM
15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added:
app-20150524152132-0035/3 on worker-20150522181416-HadoopV26Slave1-47647
(HadoopV26Slave1:47647) with 2 cores
15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20150524152132-0035/3 on hostPort HadoopV26Slave1:47647 with 2 cores,
4.0 GB RAM
15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added:
app-20150524152132-0035/4 on worker-20150522181417-HadoopV26Slave4-59352
(HadoopV26Slave4:59352) with 2 cores
15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20150524152132-0035/4 on hostPort HadoopV26Slave4:59352 with 2 cores,
4.0 GB RAM
15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added:
app-20150524152132-0035/5 on worker-20150522181417-HadoopV26Slave2-34694
(HadoopV26Slave2:34694) with 2 cores
15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20150524152132-0035/5 on hostPort HadoopV26Slave2:34694 with 2 cores,
4.0 GB RAM
15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152132-0035/0 is now LOADING
15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152132-0035/4 is now LOADING
15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152132-0035/1 is now LOADING
15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152132-0035/3 is now LOADING
15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152132-0035/5 is now LOADING
15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152132-0035/2 is now LOADING
15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152132-0035/0 is now RUNNING
15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152132-0035/1 is now RUNNING
15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152132-0035/2 is now RUNNING
15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152132-0035/3 is now RUNNING
15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152132-0035/4 is now RUNNING
15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152132-0035/5 is now RUNNING
15/05/24 15:21:32 INFO netty.NettyBlockTransferService: Server created on
47711
15/05/24 15:21:32 INFO storage.BlockManagerMaster: Trying to register
BlockManager
15/05/24 15:21:32 INFO storage.BlockManagerMasterActor: Registering block
manager HadoopV26Master:47711 with 1966.1 MB RAM, BlockManagerId(<driver>,
HadoopV26Master, 47711)
15/05/24 15:21:32 INFO storage.BlockManagerMaster: Registered BlockManager
15/05/24 15:21:32 INFO scheduler.EventLoggingListener: Logging events to
file:/tmp/spark-events/app-20150524152132-0035
15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend
is ready for scheduling beginning after reached minRegisteredResourcesRatio:
0.0
15/05/24 15:21:32 INFO storage.MemoryStore: ensureFreeSpace(258503) called
with curMem=0, maxMem=2061647216
15/05/24 15:21:32 INFO storage.MemoryStore: Block broadcast_0 stored as
values in memory (estimated size 252.4 KB, free 1965.9 MB)
15/05/24 15:21:32 INFO storage.MemoryStore: ensureFreeSpace(27227) called
with curMem=258503, maxMem=2061647216
15/05/24 15:21:32 INFO storage.MemoryStore: Block broadcast_0_piece0 stored
as bytes in memory (estimated size 26.6 KB, free 1965.9 MB)
15/05/24 15:21:32 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in
memory on HadoopV26Master:47711 (size: 26.6 KB, free: 1966.1 MB)
15/05/24 15:21:32 INFO storage.BlockManagerMaster: Updated info of block
broadcast_0_piece0
15/05/24 15:21:32 INFO spark.SparkContext: Created broadcast 0 from textFile
at Clustering.java:67

*If I uncomment "saveAsTextFile", the log is:*
15/05/24 15:23:57 INFO spark.SparkContext: Running Spark version 1.3.1
15/05/24 15:23:58 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/05/24 15:23:58 INFO spark.SecurityManager: Changing view acls to: hduser
15/05/24 15:23:58 INFO spark.SecurityManager: Changing modify acls to:
hduser
15/05/24 15:23:58 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(hduser); users with modify permissions: Set(hduser)
15/05/24 15:23:58 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/05/24 15:23:58 INFO Remoting: Starting remoting
15/05/24 15:23:58 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@HadoopV26Master:41017]
15/05/24 15:23:58 INFO util.Utils: Successfully started service
'sparkDriver' on port 41017.
15/05/24 15:23:58 INFO spark.SparkEnv: Registering MapOutputTracker
15/05/24 15:23:58 INFO spark.SparkEnv: Registering BlockManagerMaster
15/05/24 15:23:58 INFO storage.DiskBlockManager: Created local directory at
/tmp/spark-cc73d7c3-4b0e-4a20-bf1c-147a8ee927f7/blockmgr-aa5cb9e3-224c-477e-b0b1-e65ec02e80fe
15/05/24 15:23:58 INFO storage.MemoryStore: MemoryStore started with
capacity 1966.1 MB
15/05/24 15:23:58 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-c42a1cc6-f137-4233-96ef-1e0a44e634d1/httpd-114edee3-3404-4425-a644-c14f95a7ee7b
15/05/24 15:23:58 INFO spark.HttpServer: Starting HTTP Server
15/05/24 15:23:59 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/05/24 15:23:59 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:38453
15/05/24 15:23:59 INFO util.Utils: Successfully started service 'HTTP file
server' on port 38453.
15/05/24 15:23:59 INFO spark.SparkEnv: Registering OutputCommitCoordinator
15/05/24 15:23:59 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/05/24 15:23:59 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
15/05/24 15:23:59 INFO util.Utils: Successfully started service 'SparkUI' on
port 4040.
15/05/24 15:23:59 INFO ui.SparkUI: Started SparkUI at
http://HadoopV26Master:4040
15/05/24 15:23:59 INFO spark.SparkContext: Added JAR
file:/home/hduser/densitysparksave.jar at
http://10.9.0.16:38453/jars/densitysparksave.jar with timestamp
1432452239254
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Connecting to master
akka.tcp://sparkMaster@HadoopV26Master:7077/user/Master...
15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Connected to
Spark cluster with app ID app-20150524152359-0036
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added:
app-20150524152359-0036/0 on worker-20150522181416-HadoopV26Slave3-47002
(HadoopV26Slave3:47002) with 2 cores
15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20150524152359-0036/0 on hostPort HadoopV26Slave3:47002 with 2 cores,
4.0 GB RAM
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added:
app-20150524152359-0036/1 on worker-20150522181417-HadoopV26Slave6-60280
(HadoopV26Slave6:60280) with 2 cores
15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20150524152359-0036/1 on hostPort HadoopV26Slave6:60280 with 2 cores,
4.0 GB RAM
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added:
app-20150524152359-0036/2 on worker-20150522181417-HadoopV26Slave5-54797
(HadoopV26Slave5:54797) with 2 cores
15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20150524152359-0036/2 on hostPort HadoopV26Slave5:54797 with 2 cores,
4.0 GB RAM
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added:
app-20150524152359-0036/3 on worker-20150522181416-HadoopV26Slave1-47647
(HadoopV26Slave1:47647) with 2 cores
15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20150524152359-0036/3 on hostPort HadoopV26Slave1:47647 with 2 cores,
4.0 GB RAM
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added:
app-20150524152359-0036/4 on worker-20150522181417-HadoopV26Slave4-59352
(HadoopV26Slave4:59352) with 2 cores
15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20150524152359-0036/4 on hostPort HadoopV26Slave4:59352 with 2 cores,
4.0 GB RAM
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added:
app-20150524152359-0036/5 on worker-20150522181417-HadoopV26Slave2-34694
(HadoopV26Slave2:34694) with 2 cores
15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20150524152359-0036/5 on hostPort HadoopV26Slave2:34694 with 2 cores,
4.0 GB RAM
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152359-0036/1 is now LOADING
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152359-0036/2 is now LOADING
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152359-0036/0 is now LOADING
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152359-0036/3 is now LOADING
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152359-0036/5 is now LOADING
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152359-0036/4 is now LOADING
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152359-0036/0 is now RUNNING
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152359-0036/1 is now RUNNING
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152359-0036/2 is now RUNNING
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152359-0036/3 is now RUNNING
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152359-0036/4 is now RUNNING
15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
app-20150524152359-0036/5 is now RUNNING
15/05/24 15:23:59 INFO netty.NettyBlockTransferService: Server created on
46919
15/05/24 15:23:59 INFO storage.BlockManagerMaster: Trying to register
BlockManager
15/05/24 15:23:59 INFO storage.BlockManagerMasterActor: Registering block
manager HadoopV26Master:46919 with 1966.1 MB RAM, BlockManagerId(<driver>,
HadoopV26Master, 46919)
15/05/24 15:23:59 INFO storage.BlockManagerMaster: Registered BlockManager
15/05/24 15:24:00 INFO scheduler.EventLoggingListener: Logging events to
file:/tmp/spark-events/app-20150524152359-0036
15/05/24 15:24:00 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend
is ready for scheduling beginning after reached minRegisteredResourcesRatio:
0.0
15/05/24 15:24:00 INFO storage.MemoryStore: ensureFreeSpace(258503) called
with curMem=0, maxMem=2061647216
15/05/24 15:24:00 INFO storage.MemoryStore: Block broadcast_0 stored as
values in memory (estimated size 252.4 KB, free 1965.9 MB)
15/05/24 15:24:00 INFO storage.MemoryStore: ensureFreeSpace(27227) called
with curMem=258503, maxMem=2061647216
15/05/24 15:24:00 INFO storage.MemoryStore: Block broadcast_0_piece0 stored
as bytes in memory (estimated size 26.6 KB, free 1965.9 MB)
15/05/24 15:24:00 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in
memory on HadoopV26Master:46919 (size: 26.6 KB, free: 1966.1 MB)
15/05/24 15:24:00 INFO storage.BlockManagerMaster: Updated info of block
broadcast_0_piece0
15/05/24 15:24:00 INFO spark.SparkContext: Created broadcast 0 from textFile
at Clustering.java:67
15/05/24 15:24:01 INFO cluster.SparkDeploySchedulerBackend: Registered
executor:
Actor[akka.tcp://sparkExecutor@HadoopV26Slave5:43757/user/Executor#-184395839]
with ID 2
15/05/24 15:24:01 INFO cluster.SparkDeploySchedulerBackend: Registered
executor:
Actor[akka.tcp://sparkExecutor@HadoopV26Slave4:35123/user/Executor#1588464525]
with ID 4
15/05/24 15:24:01 INFO cluster.SparkDeploySchedulerBackend: Registered
executor:
Actor[akka.tcp://sparkExecutor@HadoopV26Slave2:33439/user/Executor#529853915]
with ID 5
15/05/24 15:24:02 INFO cluster.SparkDeploySchedulerBackend: Registered
executor:
Actor[akka.tcp://sparkExecutor@HadoopV26Slave6:37211/user/Executor#509959098]
with ID 1
15/05/24 15:24:02 INFO cluster.SparkDeploySchedulerBackend: Registered
executor:
Actor[akka.tcp://sparkExecutor@HadoopV26Slave3:52152/user/Executor#-1731088969]
with ID 0
15/05/24 15:24:02 INFO cluster.SparkDeploySchedulerBackend: Registered
executor:
Actor[akka.tcp://sparkExecutor@HadoopV26Slave1:55666/user/Executor#-635759065]
with ID 3
15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block
manager HadoopV26Slave5:45835 with 2.1 GB RAM, BlockManagerId(2,
HadoopV26Slave5, 45835)
15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block
manager HadoopV26Slave2:44597 with 2.1 GB RAM, BlockManagerId(5,
HadoopV26Slave2, 44597)
15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block
manager HadoopV26Slave4:44317 with 2.1 GB RAM, BlockManagerId(4,
HadoopV26Slave4, 44317)
15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block
manager HadoopV26Slave6:52445 with 2.1 GB RAM, BlockManagerId(1,
HadoopV26Slave6, 52445)
15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block
manager HadoopV26Slave3:38931 with 2.1 GB RAM, BlockManagerId(0,
HadoopV26Slave3, 38931)
15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block
manager HadoopV26Slave1:44960 with 2.1 GB RAM, BlockManagerId(3,
HadoopV26Slave1, 44960)
15/05/24 15:24:10 INFO mapred.FileInputFormat: Total input paths to process
: 1
15/05/24 15:24:10 INFO Configuration.deprecation: mapred.tip.id is
deprecated. Instead, use mapreduce.task.id
15/05/24 15:24:10 INFO Configuration.deprecation: mapred.task.id is
deprecated. Instead, use mapreduce.task.attempt.id
15/05/24 15:24:10 INFO Configuration.deprecation: mapred.task.is.map is
deprecated. Instead, use mapreduce.task.ismap
15/05/24 15:24:10 INFO Configuration.deprecation: mapred.task.partition is
deprecated. Instead, use mapreduce.task.partition
15/05/24 15:24:10 INFO Configuration.deprecation: mapred.job.id is
deprecated. Instead, use mapreduce.job.id
15/05/24 15:24:11 INFO spark.SparkContext: Starting job: saveAsTextFile at
Clustering.java:117
15/05/24 15:24:11 INFO scheduler.DAGScheduler: Registering RDD 3 (mapValues
at Clustering.java:100)
15/05/24 15:24:11 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at
Clustering.java:117) with 4 output partitions (allowLocal=false)
15/05/24 15:24:11 INFO scheduler.DAGScheduler: Final stage: Stage
1(saveAsTextFile at Clustering.java:117)
15/05/24 15:24:11 INFO scheduler.DAGScheduler: Parents of final stage:
List(Stage 0)
15/05/24 15:24:11 INFO scheduler.DAGScheduler: Missing parents: List(Stage
0)
15/05/24 15:24:11 INFO scheduler.DAGScheduler: Submitting Stage 0
(MapPartitionsRDD[3] at mapValues at Clustering.java:100), which has no
missing parents
15/05/24 15:24:12 INFO storage.MemoryStore: ensureFreeSpace(490237112)
called with curMem=285730, maxMem=2061647216
15/05/24 15:24:12 INFO storage.MemoryStore: Block broadcast_1 stored as
values in memory (estimated size 467.5 MB, free 1498.3 MB)
15/05/24 15:24:13 INFO storage.MemoryStore: ensureFreeSpace(4194304) called
with curMem=490522842, maxMem=2061647216
15/05/24 15:24:13 INFO storage.MemoryStore: Block broadcast_1_piece0 stored
as bytes in memory (estimated size 4.0 MB, free 1494.3 MB)
15/05/24 15:24:13 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in
memory on HadoopV26Master:46919 (size: 4.0 MB, free: 1962.1 MB)
15/05/24 15:24:13 INFO storage.BlockManagerMaster: Updated info of block
broadcast_1_piece0
*(hundreds of lines omitted here)*
15/05/24 15:24:13 INFO storage.MemoryStore: ensureFreeSpace(4194304) called
with curMem=968673498, maxMem=2061647216
15/05/24 15:24:13 INFO storage.MemoryStore: Block broadcast_1_piece114
stored as bytes in memory (estimated size 4.0 MB, free 1038.3 MB)
15/05/24 15:24:13 INFO storage.BlockManagerInfo: Added broadcast_1_piece114
in memory on HadoopV26Master:46919 (size: 4.0 MB, free: 1506.1 MB)
15/05/24 15:24:13 INFO storage.BlockManagerMaster: Updated info of block
broadcast_1_piece114
15/05/24 15:24:13 INFO storage.MemoryStore: ensureFreeSpace(2116571) called
with curMem=972867802, maxMem=2061647216
15/05/24 15:24:13 INFO storage.MemoryStore: Block broadcast_1_piece115
stored as bytes in memory (estimated size 2.0 MB, free 1036.3 MB)
15/05/24 15:24:13 INFO storage.BlockManagerInfo: Added broadcast_1_piece115
in memory on HadoopV26Master:46919 (size: 2.0 MB, free: 1504.1 MB)
15/05/24 15:24:13 INFO storage.BlockManagerMaster: Updated info of block
broadcast_1_piece115
15/05/24 15:24:13 INFO spark.SparkContext: Created broadcast 1 from
broadcast at DAGScheduler.scala:839
15/05/24 15:24:13 INFO scheduler.DAGScheduler: Submitting 4 missing tasks
from Stage 0 (MapPartitionsRDD[3] at mapValues at Clustering.java:100)
15/05/24 15:24:13 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with
4 tasks
15/05/24 15:24:13 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
0.0 (TID 0, HadoopV26Slave1, NODE_LOCAL, 1383 bytes)
15/05/24 15:24:13 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
0.0 (TID 1, HadoopV26Slave2, NODE_LOCAL, 1383 bytes)
15/05/24 15:24:13 INFO scheduler.TaskSetManager: Starting task 3.0 in stage
0.0 (TID 2, HadoopV26Slave3, NODE_LOCAL, 1383 bytes)
15/05/24 15:24:13 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
0.0 (TID 3, HadoopV26Slave4, NODE_LOCAL, 1383 bytes)
15/05/24 15:24:14 INFO storage.BlockManagerInfo: Added broadcast_1_piece68
in memory on HadoopV26Slave4:44317 (size: 4.0 MB, free: 2.1 GB)
15/05/24 15:24:14 INFO storage.BlockManagerInfo: Added broadcast_1_piece102
in memory on HadoopV26Slave2:44597 (size: 4.0 MB, free: 2.1 GB)
*(hundreds of lines omitted here)*
15/05/24 15:24:23 INFO storage.BlockManagerInfo: Added broadcast_1_piece35
in memory on HadoopV26Slave1:44960 (size: 4.0 MB, free: 1666.0 MB)
15/05/24 15:24:24 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in
memory on HadoopV26Slave3:38931 (size: 26.6 KB, free: 1658.0 MB)
15/05/24 15:24:24 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in
memory on HadoopV26Slave1:44960 (size: 26.6 KB, free: 1658.0 MB)
*Then the program is stuck here for many minutes*

At this point, I checked the web UI at master:4040 and saw that the second map
is still running:
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n23003/jobstage.png> 
Then I clicked into the mapValues stage and saw this:
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n23003/mapValue.png> 
But why is it so slow? And how is that related to "saveAsTextFile"?

BTW, in the mapValues stage, I saw the input size/records increase very slowly.
Does that mean the read is very slow? mapValues is the second map operation
in my program. And why doesn't the UI show the reduce stage?
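
One observation that may help here: the log names Stage 0 after mapValues, which suggests the distance loop from flatMapToPair runs inside that same stage. Since every tuple emitted for one input line shares the same key jIndex, the later sum for that key is simply the number of rows closer than dc, assuming each index appears on exactly one input line. A minimal sketch of computing that count directly, without building a list of 133433 tuples per record, could look like this; LocalDensityCount and countWithin are hypothetical names, and this is plain Java rather than a drop-in Spark function.

```java
/** Sketch: count the rows within dc of one record instead of emitting one tuple per row. */
final class LocalDensityCount {
    /**
     * Returns how many rows of allPoints have a Euclidean distance to query that is
     * strictly below dc, mirroring the "v1 < dc" test of the original mapValues step.
     */
    static int countWithin(double[][] allPoints, double[] query, double dc) {
        int count = 0;
        for (double[] point : allPoints) {
            double sum = 0.0;
            for (int j = 0; j < query.length; j++) {
                double diff = point[j] - query[j];
                sum += diff * diff;
            }
            if (Math.sqrt(sum) < dc) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        double[][] points = { {0.0, 0.0}, {3.0, 4.0}, {1.0, 0.0} };
        double[] query = {0.0, 0.0};
        // Distances to the query are 0, 5 and 1, so two rows fall below dc = 2.0.
        System.out.println(countWithin(points, query, 2.0));  // prints 2
    }
}
```

In the Spark program, such a function would let the map step emit a single (jIndex, count) pair per input line. The arithmetic cost of the distance loop stays the same; only the per-pair object allocation and the 0/1 intermediate values would go away.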

*Then, after about 20 minutes, I get the following logs and can get the result:*
15/05/24 16:12:22 INFO scheduler.TaskSetManager: Finished task 3.0 in stage
0.0 (TID 2) in 2888967 ms on HadoopV26Slave3 (1/4)
15/05/24 16:24:27 INFO scheduler.TaskSetManager: Finished task 1.0 in stage
0.0 (TID 3) in 3614079 ms on HadoopV26Slave4 (2/4)
15/05/24 16:25:53 INFO scheduler.TaskSetManager: Finished task 2.0 in stage
0.0 (TID 1) in 3700117 ms on HadoopV26Slave2 (3/4)
15/05/24 16:27:35 INFO scheduler.DAGScheduler: Stage 0 (mapValues at
Clustering.java:100) finished in 3801.290 s
15/05/24 16:27:35 INFO scheduler.DAGScheduler: looking for newly runnable
stages
15/05/24 16:27:35 INFO scheduler.DAGScheduler: running: Set()
15/05/24 16:27:35 INFO scheduler.DAGScheduler: waiting: Set(Stage 1)
15/05/24 16:27:35 INFO scheduler.DAGScheduler: failed: Set()
15/05/24 16:27:35 INFO scheduler.DAGScheduler: Missing parents for Stage 1:
List()
15/05/24 16:27:35 INFO scheduler.TaskSetManager: Finished task 0.0 in stage
0.0 (TID 0) in 3801284 ms on HadoopV26Slave1 (4/4)
15/05/24 16:27:35 INFO scheduler.DAGScheduler: Submitting Stage 1
(MapPartitionsRDD[5] at saveAsTextFile at Clustering.java:117), which is now
runnable
15/05/24 16:27:35 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0,
whose tasks have all completed, from pool 
15/05/24 16:27:35 INFO storage.MemoryStore: ensureFreeSpace(128128) called
with curMem=974984373, maxMem=2061647216
15/05/24 16:27:35 INFO storage.MemoryStore: Block broadcast_2 stored as
values in memory (estimated size 125.1 KB, free 1036.2 MB)
15/05/24 16:27:35 INFO storage.MemoryStore: ensureFreeSpace(58374) called
with curMem=975112501, maxMem=2061647216
15/05/24 16:27:35 INFO storage.MemoryStore: Block broadcast_2_piece0 stored
as bytes in memory (estimated size 57.0 KB, free 1036.1 MB)
15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in
memory on HadoopV26Master:46919 (size: 57.0 KB, free: 1504.0 MB)
15/05/24 16:27:35 INFO storage.BlockManagerMaster: Updated info of block
broadcast_2_piece0
15/05/24 16:27:35 INFO spark.SparkContext: Created broadcast 2 from
broadcast at DAGScheduler.scala:839
15/05/24 16:27:35 INFO scheduler.DAGScheduler: Submitting 4 missing tasks
from Stage 1 (MapPartitionsRDD[5] at saveAsTextFile at Clustering.java:117)
15/05/24 16:27:35 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with
4 tasks
15/05/24 16:27:35 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
1.0 (TID 4, HadoopV26Slave5, PROCESS_LOCAL, 1114 bytes)
15/05/24 16:27:35 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
1.0 (TID 5, HadoopV26Slave3, PROCESS_LOCAL, 1114 bytes)
15/05/24 16:27:35 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
1.0 (TID 6, HadoopV26Slave2, PROCESS_LOCAL, 1114 bytes)
15/05/24 16:27:35 INFO scheduler.TaskSetManager: Starting task 3.0 in stage
1.0 (TID 7, HadoopV26Slave6, PROCESS_LOCAL, 1114 bytes)
15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in
memory on HadoopV26Slave2:44597 (size: 57.0 KB, free: 1657.9 MB)
15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in
memory on HadoopV26Slave3:38931 (size: 57.0 KB, free: 1657.9 MB)
15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to sparkExecutor@HadoopV26Slave2:33439
15/05/24 16:27:35 INFO spark.MapOutputTrackerMaster: Size of output statuses
for shuffle 0 is 188 bytes
15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to sparkExecutor@HadoopV26Slave3:52152
15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in
memory on HadoopV26Slave6:52445 (size: 57.0 KB, free: 2.1 GB)
15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in
memory on HadoopV26Slave5:45835 (size: 57.0 KB, free: 2.1 GB)
15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to sparkExecutor@HadoopV26Slave6:37211
15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to sparkExecutor@HadoopV26Slave5:43757
15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added rdd_4_1 in memory on
HadoopV26Slave3:38931 (size: 1733.1 KB, free: 1656.2 MB)
15/05/24 16:27:36 INFO storage.BlockManagerInfo: Added rdd_4_2 in memory on
HadoopV26Slave2:44597 (size: 1733.1 KB, free: 1656.2 MB)
15/05/24 16:27:36 INFO storage.BlockManagerInfo: Added rdd_4_3 in memory on
HadoopV26Slave6:52445 (size: 1717.4 KB, free: 2.1 GB)
15/05/24 16:27:36 INFO scheduler.TaskSetManager: Finished task 1.0 in stage
1.0 (TID 5) in 1757 ms on HadoopV26Slave3 (1/4)
15/05/24 16:27:36 INFO scheduler.TaskSetManager: Finished task 2.0 in stage
1.0 (TID 6) in 1776 ms on HadoopV26Slave2 (2/4)
15/05/24 16:27:36 INFO storage.BlockManagerInfo: Added rdd_4_0 in memory on
HadoopV26Slave5:45835 (size: 1733.1 KB, free: 2.1 GB)
15/05/24 16:27:38 INFO scheduler.TaskSetManager: Finished task 3.0 in stage
1.0 (TID 7) in 3153 ms on HadoopV26Slave6 (3/4)
15/05/24 16:27:38 INFO scheduler.DAGScheduler: Stage 1 (saveAsTextFile at
Clustering.java:117) finished in 3.258 s
15/05/24 16:27:38 INFO scheduler.TaskSetManager: Finished task 0.0 in stage
1.0 (TID 4) in 3256 ms on HadoopV26Slave5 (4/4)
15/05/24 16:27:38 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0,
whose tasks have all completed, from pool 
15/05/24 16:27:38 INFO scheduler.DAGScheduler: Job 0 finished:
saveAsTextFile at Clustering.java:117, took 3807.229501 s

Although I can obtain the result, it is far too slow, right?
The following screenshots also show the final result info.
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n23003/result.png> 
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n23003/mapvalueres.png>
 

PS: if I reduce the input to just 10 records, it runs very fast. But a run
with just 10 records is not meaningful.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-dramatically-slow-when-I-add-saveAsTextFile-tp23003.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
