I am working on Scala code that performs linear regression on certain datasets. Right now I am using 20 cores and 25 executors, and every time I run the Spark job I get a different result.
The input sizes of the files are 2 GB and 400 MB. However, when I run the job with 20 cores and 1 executor, I get consistent results. Has anyone experienced such a thing so far? Please find the code below:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.Partitioner
import org.apache.spark.storage.StorageLevel

object TextProcess {

  /**
   * Joins a cases/controls file (args(0)) with a gene-expression file (args(1))
   * on their first two space-separated fields, transposes both sides, runs a
   * linear regression per column pair, and writes results to args(2).
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val numExecutors = conf.get("spark.executor.instances").toInt

    // Read the 2 input files.
    // First file is either cases / controls.
    val input1 = sc.textFile(args(0))
    // Second file is Gene Expression.
    val input2 = sc.textFile(args(1))

    // Collect header information (first line of each file).
    val header1 = sc.parallelize(input1.take(1))
    val header2 = sc.parallelize(input2.take(1))

    // Map data without the header information, keyed by the concatenation of
    // the first two space-separated fields.
    val map1 = input1.subtract(header1).map(x => (x.split(" ")(0) + x.split(" ")(1), x))
    val map2 = input2.subtract(header2).map(x => (x.split(" ")(0) + x.split(" ")(1), x))

    // Join the data. FIX: a plain join returns rows in a partition-dependent,
    // nondeterministic order, and zipWithIndex below turns that order into row
    // numbers — this is why results differed on every run with >1 executor.
    // Sorting by key makes the row order deterministic.
    val joinedMap = map1.join(map2).sortByKey()

    // Add the header back to the top of each RDD.
    val x = header1.union(joinedMap.map { case (k, (v1, v2)) => v1 })
    val y = header2.union(joinedMap.map { case (k, (v1, v2)) => v2 })

    // Remove irrelevant leading columns and prefix every cell with its row
    // index so the row order can be restored after the transpose.
    val rddX = x.map(_.split(" ").drop(3)).zipWithIndex.map { case (cols, idx) => cols.map(c => idx.toString + " " + c) }
    val rddY = y.map(_.split(" ").drop(2)).zipWithIndex.map { case (cols, idx) => cols.map(c => idx.toString + " " + c) }

    // Extracts the numeric row-index prefix ("<idx> <value>") from a cell.
    def rowIndexOf(cell: String): Long = cell.takeWhile(_ != ' ').toLong

    // Transpose the data. reduceByKey concatenates cell values in arbitrary
    // order, so each column is re-sorted by its numeric row-index prefix.
    // FIX: the previous plain String `.sorted` mis-ordered rows once indices
    // reached 10, because "10" < "2" lexicographically.
    val transposedX = rddX.flatMap(row => row.zipWithIndex.map(_.swap))
      .reduceByKey((a, b) => a + ":" + b)
      .map { case (_, cells) => cells.split(":").sortBy(rowIndexOf) }
    val transposedY = rddY.flatMap(row => row.zipWithIndex.map(_.swap))
      .reduceByKey((a, b) => a + ":" + b)
      .map { case (_, cells) => cells.split(":").sortBy(rowIndexOf) }
      .persist(StorageLevel.apply(false, true, false, false, numExecutors))

    // Strip the row-index prefix, keeping only the cell value.
    val cleanedX = transposedX.map(_.map(c => c.slice(c.indexOfSlice(" ") + 1, c.length)))
    val cleanedY = transposedY.map(_.map(c => c.slice(c.indexOfSlice(" ") + 1, c.length)))
      .persist(StorageLevel.apply(false, true, false, false, numExecutors))

    // Cross-join every X column with every Y column and pair cells row-wise.
    val cartXY = cleanedX.cartesian(cleanedY)
    val finalDataSet = cartXY.map { case (a, b) => a zip b }

    // Convert to key-value pairs, dropping missing ("NA"/"null") observations
    // before parsing the remaining cells as doubles.
    val regressiondataset = finalDataSet.map(pairs =>
      (pairs(0),
       pairs.drop(1)
         .filter { case (a, b) => a != "NA" && b != "NA" && a != "null" && b != "null" }
         .map { case (a, b) => (a.toDouble, b.toDouble) }))

    // Run the regression for each column pair and write the results out.
    // NOTE(review): LinearRegression is defined elsewhere in this project.
    val linearOutput = regressiondataset.map(s => new LinearRegression(s._1, s._2).outputVal)
    linearOutput.saveAsTextFile(args(2))

    cleanedY.unpersist()
    transposedY.unpersist()
  }
}

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-returns-a-different-result-on-each-run-tp23861.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org