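Can you try commenting out the saveAsTextFile call and doing a simple count() instead? If it's a broadcast issue, then count() would throw the same error; if count() succeeds, the problem is in writing the output rather than in the broadcast. On 21 May 2015 14:21, "allanjie" <allanmcgr...@gmail.com> wrote: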
> Sure, the code is very simple. I think u guys can understand from the main > function. > > public class Test1 { > > public static double[][] createBroadcastPoints(String > localPointPath, int > row, int col) throws IOException{ > BufferedReader br = RAWF.reader(localPointPath); > String line = null; > int rowIndex = 0; > double[][] pointFeatures = new double[row][col]; > while((line = br.readLine())!=null){ > List<String> point = > Arrays.asList(line.split(",")); > int colIndex = 0; > for(String pointFeature: point){ > pointFeatures[rowIndex][colIndex] = > Double.valueOf(pointFeature); > colIndex++; > } > rowIndex++; > } > br.close(); > return pointFeatures; > } > > > > public static void main(String[] args) throws IOException{ > /**Parameter Setting***********/ > String localPointPath = > "/home/hduser/skyrock/skyrockImageFeatures.csv"; > String remoteFilePath = > "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv"; > //this csv file is only 468MB > final int row = 133433; > final int col = 458; > /******************/ > > SparkConf conf = new SparkConf(). > setAppName("distance"). > setMaster("spark://HadoopV26Master:7077"). > set("spark.executor.memory", "4g"). > set("spark.eventLog.enabled", "true") > .set("spark.eventLog.dir", > "/usr/local/spark/logs/spark-events") > .set("spark.local.dir", "/tmp/spark-temp"); > JavaSparkContext sc = new JavaSparkContext(conf); > > JavaRDD<String> textFile = sc.textFile(remoteFilePath); > //Broadcast variable > //double[][] xx =; > > final Broadcast<double[][]> broadcastPoints = > sc.broadcast(createBroadcastPoints(localPointPath,row,col)); > //final Broadcast<double[][]> broadcastPoints = > sc.broadcast(xx); > > /** > * Compute the distance in terms of each point on each > instance. 
> * distance list: index = n(i-1)- i*(i-1)/2 + j-i-1 > */ > JavaPairRDD<Pair,Double> distance = > textFile.flatMapToPair(new > PairFlatMapFunction<String, Pair, Double>(){ > public Iterable<Tuple2<Pair, Double>> > call(String v1) throws > Exception{ > List<String> al = > Arrays.asList(v1.split(",")); > double[] featureVals = new > double[al.size()]; > for(int j=0;j<al.size()-1;j++) > featureVals[j] = > Double.valueOf(al.get(j+1)); > int jIndex = Integer.valueOf(al.get(0)); > double[][] allPoints = > broadcastPoints.getValue(); > double sum = 0; > List<Tuple2<Pair, Double>> list = > new ArrayList<Tuple2<Pair, > Double>>(); > for(int i=0;i<row; i++){ > sum = 0; > for(int j=0;j<al.size()-1;j++){ > sum += > (allPoints[i][j]-featureVals[j])*(allPoints[i][j]-featureVals[j]); > } > list.add(new > Tuple2<Pair,Double>(new > Pair(i,jIndex),Math.sqrt(sum))); > } > return list; > } > }); > > > > distance.saveAsTextFile("hdfs://HadoopV26Master:9000/user/"+args[0]); > } > } > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/java-program-got-Stuck-at-broadcasting-tp22953p22973.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > --------------------------------------------------------------------- > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >