*Problem Description*: The program runs on a standalone Spark cluster (1 master, 6 workers, each with 8 GB RAM and 2 cores). Input: a 468 MB file with 133,433 records stored in HDFS. Output: a file of only about 2 MB, also stored in HDFS. The program has two map operations and one reduceByKey operation; finally I save the result to HDFS with *saveAsTextFile*. *Problem*: if I leave out *saveAsTextFile*, the program finishes in a few seconds; with it, the run is extremely slow, taking about 30 minutes.
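One note on the comparison itself: Spark transformations (map, flatMapToPair, mapValues, reduceByKey) are lazy and do no work until an action such as saveAsTextFile triggers a job, so the "fast" run without saveAsTextFile never computes anything at all. The same laziness can be illustrated with plain Java Streams (not Spark code, just an analogy):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class LazyDemo {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();

        // "Transformation": like an RDD map, this builds a pipeline but runs nothing.
        IntStream pipeline = IntStream.rangeClosed(1, 1000)
                .map(x -> { calls.incrementAndGet(); return x * x; });

        System.out.println("after map: " + calls.get()); // 0 - no element processed yet

        // "Action": the terminal operation finally drives the map over all elements.
        long sum = pipeline.sum();

        System.out.println("after sum: " + calls.get()); // 1000
        System.out.println("sum = " + sum);              // 333833500
    }
}
```

So a fair timing comparison needs some action in both runs (e.g. count()); removing the only action removes the whole computation, not just the write.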
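For scale, here is a back-of-envelope count of the work the distance stage has to do once an action actually runs it (plain Java arithmetic using the row/col constants from my program; the assumption that each record is compared against all 458 feature columns comes from the col constant, not from a profile):

```java
public class WorkEstimate {
    public static void main(String[] args) {
        long records  = 133_433L; // input rows in the 468 MB file
        long points   = 133_433L; // broadcast points compared against each row
        long features = 458L;     // feature columns per comparison (the col constant)

        // Pairs produced by flatMapToPair: one tuple per (record, point) pair.
        long tuplesEmitted = records * points;

        // Inner-loop squared differences: one multiply per (record, point, feature).
        long multiplies = records * points * features;

        System.out.println("tuples emitted: " + tuplesEmitted); // 17804365489
        System.out.println("multiplies:     " + multiplies);    // 8154399393962
    }
}
```

Roughly 1.8 * 10^10 tuples and 8 * 10^12 multiplications across 12 cores, so tens of minutes is plausible for this algorithm regardless of the final write.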
*My program (very simple):*

public static void main(String[] args) throws IOException {
    /** Parameter settings **/
    String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
    String remoteFilePath = "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
    String outputPath = "hdfs://HadoopV26Master:9000/user/sparkoutput/";
    final int row = 133433;
    final int col = 458;
    final double dc = Double.valueOf(args[0]);

    SparkConf conf = new SparkConf()
        .setAppName("distance")
        .set("spark.executor.memory", "4g")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.eventLog.enabled", "true");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> textFile = sc.textFile(remoteFilePath);
    // Broadcast variable; the dimensions of this double array: 133433*458
    final Broadcast<double[][]> broadcastPoints =
        sc.broadcast(createBroadcastPoints(localPointPath, row, col));

    /**
     * Compute the distance from each point to each instance.
     * Distance list index: n(i-1) - i*(i-1)/2 + j-i-1
     */
    JavaPairRDD<Integer, Double> distance = textFile.flatMapToPair(
        new PairFlatMapFunction<String, Integer, Double>() {
            public Iterable<Tuple2<Integer, Double>> call(String v1) throws Exception {
                List<String> al = Arrays.asList(v1.split(","));
                double[] featureVals = new double[al.size()];
                for (int j = 0; j < al.size() - 1; j++)
                    featureVals[j] = Double.valueOf(al.get(j + 1));
                int jIndex = Integer.valueOf(al.get(0));
                double[][] allPoints = broadcastPoints.value();
                double sum = 0;
                List<Tuple2<Integer, Double>> list = new ArrayList<Tuple2<Integer, Double>>();
                for (int i = 0; i < row; i++) {
                    sum = 0;
                    for (int j = 0; j < al.size() - 1; j++) {
                        sum += (allPoints[i][j] - featureVals[j]) * (allPoints[i][j] - featureVals[j]);
                    }
                    list.add(new Tuple2<Integer, Double>(jIndex, Math.sqrt(sum)));
                }
                return list;
            }
        });

    // Create zero/one density
    JavaPairRDD<Integer, Integer> densityZeroOne = distance.mapValues(
        new Function<Double, Integer>() {
            public Integer call(Double v1) throws Exception {
                if (v1 < dc) return 1;
                else return 0;
            }
        });

    // Combine the density
    JavaPairRDD<Integer, Integer> counts = densityZeroOne.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

    counts.saveAsTextFile(outputPath + args[1]);
    sc.stop();
}

*If I comment out "saveAsTextFile", the log is:* Picked up _JAVA_OPTIONS: -Xmx4g 15/05/24 15:21:30 INFO spark.SparkContext: Running Spark version 1.3.1 15/05/24 15:21:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable 15/05/24 15:21:30 INFO spark.SecurityManager: Changing view acls to: hduser 15/05/24 15:21:30 INFO spark.SecurityManager: Changing modify acls to: hduser 15/05/24 15:21:30 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hduser); users with modify permissions: Set(hduser) 15/05/24 15:21:31 INFO slf4j.Slf4jLogger: Slf4jLogger started 15/05/24 15:21:31 INFO Remoting: Starting remoting 15/05/24 15:21:31 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@HadoopV26Master:57429] 15/05/24 15:21:31 INFO util.Utils: Successfully started service 'sparkDriver' on port 57429. 15/05/24 15:21:31 INFO spark.SparkEnv: Registering MapOutputTracker 15/05/24 15:21:31 INFO spark.SparkEnv: Registering BlockManagerMaster 15/05/24 15:21:31 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-6342bde9-feca-4651-8cca-a67541150420/blockmgr-e92d0ae0-ec95-44cb-986a-266a1899202b 15/05/24 15:21:31 INFO storage.MemoryStore: MemoryStore started with capacity 1966.1 MB 15/05/24 15:21:31 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-fea59c9e-1264-45e9-ad31-484d7de83d0a/httpd-c6421767-ffaf-4417-905e-34b3d13a7bf4 15/05/24 15:21:31 INFO spark.HttpServer: Starting HTTP Server 15/05/24 15:21:31 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/05/24 15:21:31 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:36956 15/05/24 15:21:31 INFO util.Utils: Successfully started service 'HTTP file server' on port 36956. 15/05/24 15:21:31 INFO spark.SparkEnv: Registering OutputCommitCoordinator 15/05/24 15:21:31 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/05/24 15:21:31 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 15/05/24 15:21:31 INFO util.Utils: Successfully started service 'SparkUI' on port 4040. 
15/05/24 15:21:31 INFO ui.SparkUI: Started SparkUI at http://HadoopV26Master:4040 15/05/24 15:21:31 INFO spark.SparkContext: Added JAR file:/home/hduser/densityspark.jar at http://10.9.0.16:36956/jars/densityspark.jar with timestamp 1432452091753 15/05/24 15:21:31 INFO client.AppClient$ClientActor: Connecting to master akka.tcp://sparkMaster@HadoopV26Master:7077/user/Master... 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150524152132-0035 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added: app-20150524152132-0035/0 on worker-20150522181416-HadoopV26Slave3-47002 (HadoopV26Slave3:47002) with 2 cores 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150524152132-0035/0 on hostPort HadoopV26Slave3:47002 with 2 cores, 4.0 GB RAM 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added: app-20150524152132-0035/1 on worker-20150522181417-HadoopV26Slave6-60280 (HadoopV26Slave6:60280) with 2 cores 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150524152132-0035/1 on hostPort HadoopV26Slave6:60280 with 2 cores, 4.0 GB RAM 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added: app-20150524152132-0035/2 on worker-20150522181417-HadoopV26Slave5-54797 (HadoopV26Slave5:54797) with 2 cores 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150524152132-0035/2 on hostPort HadoopV26Slave5:54797 with 2 cores, 4.0 GB RAM 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added: app-20150524152132-0035/3 on worker-20150522181416-HadoopV26Slave1-47647 (HadoopV26Slave1:47647) with 2 cores 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150524152132-0035/3 on hostPort HadoopV26Slave1:47647 with 2 cores, 4.0 GB RAM 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added: app-20150524152132-0035/4 on 
worker-20150522181417-HadoopV26Slave4-59352 (HadoopV26Slave4:59352) with 2 cores 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150524152132-0035/4 on hostPort HadoopV26Slave4:59352 with 2 cores, 4.0 GB RAM 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added: app-20150524152132-0035/5 on worker-20150522181417-HadoopV26Slave2-34694 (HadoopV26Slave2:34694) with 2 cores 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150524152132-0035/5 on hostPort HadoopV26Slave2:34694 with 2 cores, 4.0 GB RAM 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: app-20150524152132-0035/0 is now LOADING 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: app-20150524152132-0035/4 is now LOADING 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: app-20150524152132-0035/1 is now LOADING 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: app-20150524152132-0035/3 is now LOADING 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: app-20150524152132-0035/5 is now LOADING 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: app-20150524152132-0035/2 is now LOADING 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: app-20150524152132-0035/0 is now RUNNING 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: app-20150524152132-0035/1 is now RUNNING 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: app-20150524152132-0035/2 is now RUNNING 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: app-20150524152132-0035/3 is now RUNNING 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: app-20150524152132-0035/4 is now RUNNING 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: app-20150524152132-0035/5 is now RUNNING 15/05/24 15:21:32 INFO netty.NettyBlockTransferService: Server 
created on 47711 15/05/24 15:21:32 INFO storage.BlockManagerMaster: Trying to register BlockManager 15/05/24 15:21:32 INFO storage.BlockManagerMasterActor: Registering block manager HadoopV26Master:47711 with 1966.1 MB RAM, BlockManagerId(<driver>, HadoopV26Master, 47711) 15/05/24 15:21:32 INFO storage.BlockManagerMaster: Registered BlockManager 15/05/24 15:21:32 INFO scheduler.EventLoggingListener: Logging events to file:/tmp/spark-events/app-20150524152132-0035 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 15/05/24 15:21:32 INFO storage.MemoryStore: ensureFreeSpace(258503) called with curMem=0, maxMem=2061647216 15/05/24 15:21:32 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 252.4 KB, free 1965.9 MB) 15/05/24 15:21:32 INFO storage.MemoryStore: ensureFreeSpace(27227) called with curMem=258503, maxMem=2061647216 15/05/24 15:21:32 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 26.6 KB, free 1965.9 MB) 15/05/24 15:21:32 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on HadoopV26Master:47711 (size: 26.6 KB, free: 1966.1 MB) 15/05/24 15:21:32 INFO storage.BlockManagerMaster: Updated info of block broadcast_0_piece0 15/05/24 15:21:32 INFO spark.SparkContext: Created broadcast 0 from textFile at Clustering.java:67 *If I uncomment "saveAsTextFile" then:* 15/05/24 15:23:57 INFO spark.SparkContext: Running Spark version 1.3.1 15/05/24 15:23:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable 15/05/24 15:23:58 INFO spark.SecurityManager: Changing view acls to: hduser 15/05/24 15:23:58 INFO spark.SecurityManager: Changing modify acls to: hduser 15/05/24 15:23:58 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hduser); users with modify permissions: Set(hduser) 15/05/24 15:23:58 INFO slf4j.Slf4jLogger: Slf4jLogger started 15/05/24 15:23:58 INFO Remoting: Starting remoting 15/05/24 15:23:58 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@HadoopV26Master:41017] 15/05/24 15:23:58 INFO util.Utils: Successfully started service 'sparkDriver' on port 41017. 15/05/24 15:23:58 INFO spark.SparkEnv: Registering MapOutputTracker 15/05/24 15:23:58 INFO spark.SparkEnv: Registering BlockManagerMaster 15/05/24 15:23:58 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-cc73d7c3-4b0e-4a20-bf1c-147a8ee927f7/blockmgr-aa5cb9e3-224c-477e-b0b1-e65ec02e80fe 15/05/24 15:23:58 INFO storage.MemoryStore: MemoryStore started with capacity 1966.1 MB 15/05/24 15:23:58 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-c42a1cc6-f137-4233-96ef-1e0a44e634d1/httpd-114edee3-3404-4425-a644-c14f95a7ee7b 15/05/24 15:23:58 INFO spark.HttpServer: Starting HTTP Server 15/05/24 15:23:59 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/05/24 15:23:59 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:38453 15/05/24 15:23:59 INFO util.Utils: Successfully started service 'HTTP file server' on port 38453. 15/05/24 15:23:59 INFO spark.SparkEnv: Registering OutputCommitCoordinator 15/05/24 15:23:59 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/05/24 15:23:59 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 15/05/24 15:23:59 INFO util.Utils: Successfully started service 'SparkUI' on port 4040. 
15/05/24 15:23:59 INFO ui.SparkUI: Started SparkUI at http://HadoopV26Master:4040 15/05/24 15:23:59 INFO spark.SparkContext: Added JAR file:/home/hduser/densitysparksave.jar at http://10.9.0.16:38453/jars/densitysparksave.jar with timestamp 1432452239254 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Connecting to master akka.tcp://sparkMaster@HadoopV26Master:7077/user/Master... 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150524152359-0036 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added: app-20150524152359-0036/0 on worker-20150522181416-HadoopV26Slave3-47002 (HadoopV26Slave3:47002) with 2 cores 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150524152359-0036/0 on hostPort HadoopV26Slave3:47002 with 2 cores, 4.0 GB RAM 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added: app-20150524152359-0036/1 on worker-20150522181417-HadoopV26Slave6-60280 (HadoopV26Slave6:60280) with 2 cores 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150524152359-0036/1 on hostPort HadoopV26Slave6:60280 with 2 cores, 4.0 GB RAM 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added: app-20150524152359-0036/2 on worker-20150522181417-HadoopV26Slave5-54797 (HadoopV26Slave5:54797) with 2 cores 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150524152359-0036/2 on hostPort HadoopV26Slave5:54797 with 2 cores, 4.0 GB RAM 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added: app-20150524152359-0036/3 on worker-20150522181416-HadoopV26Slave1-47647 (HadoopV26Slave1:47647) with 2 cores 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150524152359-0036/3 on hostPort HadoopV26Slave1:47647 with 2 cores, 4.0 GB RAM 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added: app-20150524152359-0036/4 on 
worker-20150522181417-HadoopV26Slave4-59352 (HadoopV26Slave4:59352) with 2 cores 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150524152359-0036/4 on hostPort HadoopV26Slave4:59352 with 2 cores, 4.0 GB RAM 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added: app-20150524152359-0036/5 on worker-20150522181417-HadoopV26Slave2-34694 (HadoopV26Slave2:34694) with 2 cores 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150524152359-0036/5 on hostPort HadoopV26Slave2:34694 with 2 cores, 4.0 GB RAM 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: app-20150524152359-0036/1 is now LOADING 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: app-20150524152359-0036/2 is now LOADING 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: app-20150524152359-0036/0 is now LOADING 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: app-20150524152359-0036/3 is now LOADING 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: app-20150524152359-0036/5 is now LOADING 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: app-20150524152359-0036/4 is now LOADING 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: app-20150524152359-0036/0 is now RUNNING 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: app-20150524152359-0036/1 is now RUNNING 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: app-20150524152359-0036/2 is now RUNNING 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: app-20150524152359-0036/3 is now RUNNING 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: app-20150524152359-0036/4 is now RUNNING 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: app-20150524152359-0036/5 is now RUNNING 15/05/24 15:23:59 INFO netty.NettyBlockTransferService: Server 
created on 46919 15/05/24 15:23:59 INFO storage.BlockManagerMaster: Trying to register BlockManager 15/05/24 15:23:59 INFO storage.BlockManagerMasterActor: Registering block manager HadoopV26Master:46919 with 1966.1 MB RAM, BlockManagerId(<driver>, HadoopV26Master, 46919) 15/05/24 15:23:59 INFO storage.BlockManagerMaster: Registered BlockManager 15/05/24 15:24:00 INFO scheduler.EventLoggingListener: Logging events to file:/tmp/spark-events/app-20150524152359-0036 15/05/24 15:24:00 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 15/05/24 15:24:00 INFO storage.MemoryStore: ensureFreeSpace(258503) called with curMem=0, maxMem=2061647216 15/05/24 15:24:00 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 252.4 KB, free 1965.9 MB) 15/05/24 15:24:00 INFO storage.MemoryStore: ensureFreeSpace(27227) called with curMem=258503, maxMem=2061647216 15/05/24 15:24:00 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 26.6 KB, free 1965.9 MB) 15/05/24 15:24:00 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on HadoopV26Master:46919 (size: 26.6 KB, free: 1966.1 MB) 15/05/24 15:24:00 INFO storage.BlockManagerMaster: Updated info of block broadcast_0_piece0 15/05/24 15:24:00 INFO spark.SparkContext: Created broadcast 0 from textFile at Clustering.java:67 15/05/24 15:24:01 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@HadoopV26Slave5:43757/user/Executor#-184395839] with ID 2 15/05/24 15:24:01 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@HadoopV26Slave4:35123/user/Executor#1588464525] with ID 4 15/05/24 15:24:01 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@HadoopV26Slave2:33439/user/Executor#529853915] with ID 5 15/05/24 15:24:02 INFO 
cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@HadoopV26Slave6:37211/user/Executor#509959098] with ID 1 15/05/24 15:24:02 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@HadoopV26Slave3:52152/user/Executor#-1731088969] with ID 0 15/05/24 15:24:02 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@HadoopV26Slave1:55666/user/Executor#-635759065] with ID 3 15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block manager HadoopV26Slave5:45835 with 2.1 GB RAM, BlockManagerId(2, HadoopV26Slave5, 45835) 15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block manager HadoopV26Slave2:44597 with 2.1 GB RAM, BlockManagerId(5, HadoopV26Slave2, 44597) 15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block manager HadoopV26Slave4:44317 with 2.1 GB RAM, BlockManagerId(4, HadoopV26Slave4, 44317) 15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block manager HadoopV26Slave6:52445 with 2.1 GB RAM, BlockManagerId(1, HadoopV26Slave6, 52445) 15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block manager HadoopV26Slave3:38931 with 2.1 GB RAM, BlockManagerId(0, HadoopV26Slave3, 38931) 15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block manager HadoopV26Slave1:44960 with 2.1 GB RAM, BlockManagerId(3, HadoopV26Slave1, 44960) 15/05/24 15:24:10 INFO mapred.FileInputFormat: Total input paths to process : 1 15/05/24 15:24:10 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id 15/05/24 15:24:10 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id 15/05/24 15:24:10 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap 15/05/24 15:24:10 INFO Configuration.deprecation: mapred.task.partition is deprecated. 
Instead, use mapreduce.task.partition 15/05/24 15:24:10 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id 15/05/24 15:24:11 INFO spark.SparkContext: Starting job: saveAsTextFile at Clustering.java:117 15/05/24 15:24:11 INFO scheduler.DAGScheduler: Registering RDD 3 (mapValues at Clustering.java:100) 15/05/24 15:24:11 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at Clustering.java:117) with 4 output partitions (allowLocal=false) 15/05/24 15:24:11 INFO scheduler.DAGScheduler: Final stage: Stage 1(saveAsTextFile at Clustering.java:117) 15/05/24 15:24:11 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 0) 15/05/24 15:24:11 INFO scheduler.DAGScheduler: Missing parents: List(Stage 0) 15/05/24 15:24:11 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[3] at mapValues at Clustering.java:100), which has no missing parents 15/05/24 15:24:12 INFO storage.MemoryStore: ensureFreeSpace(490237112) called with curMem=285730, maxMem=2061647216 15/05/24 15:24:12 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 467.5 MB, free 1498.3 MB) 15/05/24 15:24:13 INFO storage.MemoryStore: ensureFreeSpace(4194304) called with curMem=490522842, maxMem=2061647216 15/05/24 15:24:13 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.0 MB, free 1494.3 MB) 15/05/24 15:24:13 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on HadoopV26Master:46919 (size: 4.0 MB, free: 1962.1 MB) 15/05/24 15:24:13 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0 *(omits hundreds of lines here)* 15/05/24 15:24:13 INFO storage.MemoryStore: ensureFreeSpace(4194304) called with curMem=968673498, maxMem=2061647216 15/05/24 15:24:13 INFO storage.MemoryStore: Block broadcast_1_piece114 stored as bytes in memory (estimated size 4.0 MB, free 1038.3 MB) 15/05/24 15:24:13 INFO storage.BlockManagerInfo: Added 
broadcast_1_piece114 in memory on HadoopV26Master:46919 (size: 4.0 MB, free: 1506.1 MB) 15/05/24 15:24:13 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece114 15/05/24 15:24:13 INFO storage.MemoryStore: ensureFreeSpace(2116571) called with curMem=972867802, maxMem=2061647216 15/05/24 15:24:13 INFO storage.MemoryStore: Block broadcast_1_piece115 stored as bytes in memory (estimated size 2.0 MB, free 1036.3 MB) 15/05/24 15:24:13 INFO storage.BlockManagerInfo: Added broadcast_1_piece115 in memory on HadoopV26Master:46919 (size: 2.0 MB, free: 1504.1 MB) 15/05/24 15:24:13 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece115 15/05/24 15:24:13 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839 15/05/24 15:24:13 INFO scheduler.DAGScheduler: Submitting 4 missing tasks from Stage 0 (MapPartitionsRDD[3] at mapValues at Clustering.java:100) 15/05/24 15:24:13 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 4 tasks 15/05/24 15:24:13 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, HadoopV26Slave1, NODE_LOCAL, 1383 bytes) 15/05/24 15:24:13 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 1, HadoopV26Slave2, NODE_LOCAL, 1383 bytes) 15/05/24 15:24:13 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 0.0 (TID 2, HadoopV26Slave3, NODE_LOCAL, 1383 bytes) 15/05/24 15:24:13 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 3, HadoopV26Slave4, NODE_LOCAL, 1383 bytes) 15/05/24 15:24:14 INFO storage.BlockManagerInfo: Added broadcast_1_piece68 in memory on HadoopV26Slave4:44317 (size: 4.0 MB, free: 2.1 GB) 15/05/24 15:24:14 INFO storage.BlockManagerInfo: Added broadcast_1_piece102 in memory on HadoopV26Slave2:44597 (size: 4.0 MB, free: 2.1 GB) *(omits hundreds of lines here)* 15/05/24 15:24:23 INFO storage.BlockManagerInfo: Added broadcast_1_piece35 in memory on HadoopV26Slave1:44960 (size: 4.0 MB, free: 1666.0 MB) 15/05/24 15:24:24 INFO 
storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on HadoopV26Slave3:38931 (size: 26.6 KB, free: 1658.0 MB) 15/05/24 15:24:24 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on HadoopV26Slave1:44960 (size: 26.6 KB, free: 1658.0 MB) *Then the program gets stuck here for many minutes.* At this point I check the web UI at master:4040 and see that the second map is still running: <http://apache-spark-user-list.1001560.n3.nabble.com/file/n23003/jobstage.png> Clicking into the mapValues stage, I see this: <http://apache-spark-user-list.1001560.n3.nabble.com/file/n23003/mapValue.png> Why is it so slow, and how is that related to "saveAsTextFile"? Also, in the mapValues stage the input size/records counter increases very slowly; does that mean the read is slow? And since mapValues is the second stage, why doesn't a reduce stage show up? *Then, after about 20 minutes, I get the following logs and can obtain the result:* 15/05/24 16:12:22 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 0.0 (TID 2) in 2888967 ms on HadoopV26Slave3 (1/4) 15/05/24 16:24:27 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 3) in 3614079 ms on HadoopV26Slave4 (2/4) 15/05/24 16:25:53 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 0.0 (TID 1) in 3700117 ms on HadoopV26Slave2 (3/4) 15/05/24 16:27:35 INFO scheduler.DAGScheduler: Stage 0 (mapValues at Clustering.java:100) finished in 3801.290 s 15/05/24 16:27:35 INFO scheduler.DAGScheduler: looking for newly runnable stages 15/05/24 16:27:35 INFO scheduler.DAGScheduler: running: Set() 15/05/24 16:27:35 INFO scheduler.DAGScheduler: waiting: Set(Stage 1) 15/05/24 16:27:35 INFO scheduler.DAGScheduler: failed: Set() 15/05/24 16:27:35 INFO scheduler.DAGScheduler: Missing parents for Stage 1: List() 15/05/24 16:27:35 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3801284 ms on HadoopV26Slave1 (4/4) 15/05/24 16:27:35 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[5] at saveAsTextFile 
at Clustering.java:117), which is now runnable 15/05/24 16:27:35 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 15/05/24 16:27:35 INFO storage.MemoryStore: ensureFreeSpace(128128) called with curMem=974984373, maxMem=2061647216 15/05/24 16:27:35 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 125.1 KB, free 1036.2 MB) 15/05/24 16:27:35 INFO storage.MemoryStore: ensureFreeSpace(58374) called with curMem=975112501, maxMem=2061647216 15/05/24 16:27:35 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 57.0 KB, free 1036.1 MB) 15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on HadoopV26Master:46919 (size: 57.0 KB, free: 1504.0 MB) 15/05/24 16:27:35 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0 15/05/24 16:27:35 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:839 15/05/24 16:27:35 INFO scheduler.DAGScheduler: Submitting 4 missing tasks from Stage 1 (MapPartitionsRDD[5] at saveAsTextFile at Clustering.java:117) 15/05/24 16:27:35 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 4 tasks 15/05/24 16:27:35 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 4, HadoopV26Slave5, PROCESS_LOCAL, 1114 bytes) 15/05/24 16:27:35 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 5, HadoopV26Slave3, PROCESS_LOCAL, 1114 bytes) 15/05/24 16:27:35 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 6, HadoopV26Slave2, PROCESS_LOCAL, 1114 bytes) 15/05/24 16:27:35 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 1.0 (TID 7, HadoopV26Slave6, PROCESS_LOCAL, 1114 bytes) 15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on HadoopV26Slave2:44597 (size: 57.0 KB, free: 1657.9 MB) 15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 
HadoopV26Slave3:38931 (size: 57.0 KB, free: 1657.9 MB) 15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@HadoopV26Slave2:33439 15/05/24 16:27:35 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 188 bytes 15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@HadoopV26Slave3:52152 15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on HadoopV26Slave6:52445 (size: 57.0 KB, free: 2.1 GB) 15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on HadoopV26Slave5:45835 (size: 57.0 KB, free: 2.1 GB) 15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@HadoopV26Slave6:37211 15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@HadoopV26Slave5:43757 15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added rdd_4_1 in memory on HadoopV26Slave3:38931 (size: 1733.1 KB, free: 1656.2 MB) 15/05/24 16:27:36 INFO storage.BlockManagerInfo: Added rdd_4_2 in memory on HadoopV26Slave2:44597 (size: 1733.1 KB, free: 1656.2 MB) 15/05/24 16:27:36 INFO storage.BlockManagerInfo: Added rdd_4_3 in memory on HadoopV26Slave6:52445 (size: 1717.4 KB, free: 2.1 GB) 15/05/24 16:27:36 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 5) in 1757 ms on HadoopV26Slave3 (1/4) 15/05/24 16:27:36 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 1.0 (TID 6) in 1776 ms on HadoopV26Slave2 (2/4) 15/05/24 16:27:36 INFO storage.BlockManagerInfo: Added rdd_4_0 in memory on HadoopV26Slave5:45835 (size: 1733.1 KB, free: 2.1 GB) 15/05/24 16:27:38 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 1.0 (TID 7) in 3153 ms on HadoopV26Slave6 (3/4) 15/05/24 16:27:38 INFO scheduler.DAGScheduler: Stage 1 (saveAsTextFile at 
Clustering.java:117) finished in 3.258 s 15/05/24 16:27:38 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 4) in 3256 ms on HadoopV26Slave5 (4/4) 15/05/24 16:27:38 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 15/05/24 16:27:38 INFO scheduler.DAGScheduler: Job 0 finished: saveAsTextFile at Clustering.java:117, took 3807.229501 s So I can obtain the result, but it is far too slow, right? The following screenshots show the final result info: <http://apache-spark-user-list.1001560.n3.nabble.com/file/n23003/result.png> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n23003/mapvalueres.png> PS: if I reduce the input to just 10 records, it runs very fast, but processing only 10 records makes no sense. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-dramatically-slow-when-I-add-saveAsTextFile-tp23003.html Sent from the Apache Spark User List mailing list archive at Nabble.com.