I am trying to run some shell scripts in my Spark app, hoping they run more concurrently on my Spark cluster. However, I am not sure whether my code will run concurrently on my executors. Diving into my code, you can see that I am trying to:

1. Split both db and sample into 21 small files each, generating 42 files in total. Splitting db gives "chr1", "chr2", ..., "chr21", and splitting sample gives "samplechr1", "samplechr2", ..., "samplechr21".
2. Merge those split files from HDFS and save them to a local path. The merged files will be "chr1.txt", "chr2.txt", ..., "chr21.txt" and "samplechr1.txt", "samplechr2.txt", ..., "samplechr21.txt".
3. Run modify.sh to clean the data, that is, to delete some characters that are not useful.
4. Run shellcompare.sh to compare chr1.txt with samplechr1.txt, producing result1.txt, and loop from 1 to 21 so that all 42 files are compared and I get 21 files: result1.txt, result2.txt, ..., result21.txt.

Sorry for not adding comments to my code.
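As a rough sketch of how steps 2 to 4 could run on the executors instead of the driver: parallelize the chromosome indices into one partition each and shell out from inside the task. This is untested; it assumes every worker can reach HDFS, has /opt/data/shellcompare and /opt/sh/bin locally, and that modify.sh accepts the chromosome index as an argument (its real calling convention is not shown in this thread). Step 1, the filter + saveAsTextFile jobs, still has to run first.

    import scala.sys.process._

    // One element per chromosome, one partition each, so the 21 tasks
    // can be scheduled across the executors in parallel.
    sc.parallelize(1 to 21, 21).foreach { i =>
      // step 2: merge the HDFS part-files onto this executor's local disk
      Seq("hadoop", "fs", "-getmerge", s"/data/chr$i",
          s"/opt/data/shellcompare/chr$i.txt").!
      Seq("hadoop", "fs", "-getmerge", s"/data/samplechr$i",
          s"/opt/data/shellcompare/samplechr$i.txt").!
      // step 3: clean the merged files (assumed interface of modify.sh)
      Seq("sh", "/opt/data/shellcompare/modify.sh", i.toString).!
      // step 4: compare; .! blocks until the script exits and returns its exit code
      Seq("sh", "/opt/sh/bin/sort.sh",
          s"/opt/data/shellcompare/chr$i.txt",
          s"/opt/data/shellcompare/samplechr$i.txt",
          s"/opt/data/shellcompare/result$i.txt", "600").!
    }

One caveat with this layout: each resultN.txt ends up on whichever worker ran that task, so the files still have to be pushed back to HDFS (hadoop fs -put) or written to a shared filesystem before the final aggregation.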
--------------------------------
Thanks & Best regards!
San.Luo

----- Original Message -----
From: Akhil Das <ak...@sigmoidanalytics.com>
To: 罗辉 <luohui20...@sina.com>
Cc: user <user@spark.apache.org>
Subject: Re: Re: Re: how to distributed run a bash shell in spark
Date: 2015-05-25 22:41

Can you tell us what exactly you are trying to achieve?

Thanks
Best Regards

On Mon, May 25, 2015 at 5:00 PM, <luohui20...@sina.com> wrote:

Thanks, madhu and Akhil. I modified my code as below; however, I think it is still not very distributed. Do you have a better idea to run this app more efficiently and in a more distributed way? I added some comments with my understanding:

    import org.apache.spark._
    import www.celloud.com.model._

    object GeneCompare3 {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("GeneCompare")
          .setMaster("spark://master:7077")
          .set("spark.executor.memory", "6g")
          .set("hive.metastore.warehouse.dir", "/user/hive/warehouse")
        val sc = new SparkContext(conf)
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        import sqlContext.implicits._

        val db = sc.textFile("/data/db.txt")
          .map(_.split("\t"))
          .map(x => Database(x(0), x(1).trim().toInt, x(2).trim().toInt, x(3).trim().toInt, x(4).trim().toInt))
          .toDF()
        val sample = sc.textFile("/data/sample.txt")
          .map(_.split("\t"))
          .map(x => Sample(x(0), x(1).trim().toInt, x(2), x(3).trim().toInt, x(4).trim().toInt, x(5)))
          .toDF()

        // Using Akhil Das's idea: 1to21.txt is a file with 21 lines, each line a single number.
        // (This cannot work as written: db and sample cannot be used inside another
        // RDD's closure, and foreach returns Unit, so .collect() is invalid.)
        // val test = sc.textFile("1to21.txt").foreach { i =>                          // runs on the driver
        //   db.filter("chrname = 'chr" + i + "'").rdd.saveAsTextFile("/data/chr" + i) // runs on the executors
        //   db.rdd.pipe("hadoop fs -getmerge /data/chr" + i + " /opt/data/shellcompare/chr" + i + ".txt")
        //   sample.filter("name = 'chr" + i + "'").rdd.saveAsTextFile("/data/samplechr" + i)
        //   sample.rdd.pipe("hadoop fs -getmerge /data/samplechr" + i + " /opt/data/shellcompare/samplechr" + i + ".txt") // runs on the executors
        // }.collect()                                                                 // runs on the driver

        // Using madhu's method: this loop runs sequentially on the driver, and
        // pipe() launches the getmerge command once per partition (the command
        // itself ignores its stdin).
        for (i <- 1 to 21) {
          db.filter("chrname = 'chr" + i + "'").rdd.saveAsTextFile("/data/chr" + i)
          db.rdd.pipe("hadoop fs -getmerge /data/chr" + i + " /opt/data/shellcompare/chr" + i + ".txt").collect()
          sample.filter("name = 'chr" + i + "'").rdd.saveAsTextFile("/data/samplechr" + i)
          sample.rdd.pipe("hadoop fs -getmerge /data/samplechr" + i + " /opt/data/shellcompare/samplechr" + i + ".txt").collect()
        }

        // Runs on the driver.
        val runmodifyshell = List("run", "modify.sh")
        val runmodifyshellRDD = sc.makeRDD(runmodifyshell)
        val pipeModify = runmodifyshellRDD.pipe("sh /opt/data/shellcompare/modify.sh")
        pipeModify.collect()

        // Runs on the driver.
        val shellcompare = List("run", "sort.sh")
        val shellcompareRDD = sc.makeRDD(shellcompare)
        val result = List("aggregate", "result")
        val resultRDD = sc.makeRDD(result)
        for (j <- 1 to 21) {
          shellcompareRDD.pipe("sh /opt/sh/bin/sort.sh /opt/data/shellcompare/chr" + j + ".txt"
            + " /opt/data/shellcompare/samplechr" + j + ".txt"
            + " /opt/data/shellcompare/result" + j + ".txt 600").collect()
          if (j > 1)
            resultRDD.pipe("cat result" + j + ".txt >> result1.txt").collect()
        }
      }
    }

And I know there are some problems in my code; for example, "modify.sh" will not be executed.

--------------------------------
Thanks & Best regards!
San.Luo
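For reference on the code above: pipe() forks the command once per partition and writes each element of that partition to the process's stdin, one line at a time; the process's stdout lines become the new RDD. So db.rdd.pipe("hadoop fs -getmerge ...") launches getmerge once per partition of db (and getmerge ignores its stdin entirely), and runmodifyshellRDD.pipe(...) launches modify.sh once per partition of a two-element RDD, feeding it only the strings "run" and "modify.sh". A sketch of pipe() used as intended, where compare_one.sh is a hypothetical wrapper that reads an index from stdin and invokes sort.sh with the matching file names:

    // 21 single-element partitions: pipe() runs the wrapper once per
    // partition, writing that partition's element ("1".."21") to its stdin.
    val indices = sc.parallelize(1 to 21, 21).map(_.toString)
    // compare_one.sh (hypothetical) would be roughly:
    //   read i
    //   sh /opt/sh/bin/sort.sh /opt/data/shellcompare/chr$i.txt \
    //      /opt/data/shellcompare/samplechr$i.txt \
    //      /opt/data/shellcompare/result$i.txt 600
    val out = indices.pipe("sh /opt/data/shellcompare/compare_one.sh")
    out.collect()  // forces execution; holds whatever the wrapper printed

The "cat resultN.txt >> result1.txt" aggregation has the same per-partition behavior: it runs on whichever executor holds each partition, against that machine's local files, which is likely why result1.txt never accumulates everything on one node.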
----- Original Message -----
From: madhu phatak <phatak....@gmail.com>
To: luohui20...@sina.com
Cc: Akhil Das <ak...@sigmoidanalytics.com>, user <user@spark.apache.org>
Subject: Re: Re: how to distributed run a bash shell in spark
Date: 2015-05-25 14:11

Hi,
You can use the pipe operator if you are running a shell or Perl script on some data. More information on my blog.

Regards,
Madhukara Phatak
http://datamantra.io/

On Mon, May 25, 2015 at 8:02 AM, <luohui20...@sina.com> wrote:

Thanks Akhil, your code is a big help to me, because a Perl script is exactly the thing I want to run in Spark. I will give it a try.

--------------------------------
Thanks & Best regards!
San.Luo

----- Original Message -----
From: Akhil Das <ak...@sigmoidanalytics.com>
To: 罗辉 <luohui20...@sina.com>
Cc: user <user@spark.apache.org>
Subject: Re: how to distributed run a bash shell in spark
Date: 2015-05-25 00:53

You mean you want to execute some shell commands from Spark? Here's something I tried a while back: https://github.com/akhld/spark-exploit

Thanks
Best Regards

On Sun, May 24, 2015 at 4:53 PM, <luohui20...@sina.com> wrote:

Hello there,
I am trying to run an app in which part of it needs to run a shell script. How do I run a shell script distributed in a Spark cluster? Thanks.
Here's my code:

    import java.io.IOException;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class ShellCompare {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("ShellCompare")
                    .setMaster("spark://master:7077")
                    .set("spark.executor.memory", "6g");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // This loop runs sequentially on the driver, not on the executors.
            for (int i = 1; i <= 21; i++) {
                execShell(i);
            }
            sc.stop();
        }

        private static void execShell(int i) {
            String shpath = "/opt/sh/bin/sort.sh";
            // Note: every iteration writes to the same result.txt, so each
            // run overwrites the previous chromosome's result.
            String var = "/opt/data/shellcompare/chr" + i + ".txt /opt/data/shellcompare/samplechr" + i
                    + ".txt /opt/data/shellcompare/result.txt 600";
            String command2 = "sh " + shpath + " " + var;
            try {
                Process process = Runtime.getRuntime().exec(command2);
                process.waitFor();
            } catch (InterruptedException | IOException e) {
                e.printStackTrace();
            }
        }
    }

--------------------------------
Thanks & Best regards!
San.Luo
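Going back to the original question: the loop above runs sort.sh 21 times sequentially on the driver. A minimal sketch of the distributed version (in Scala, matching the later code in this thread; it assumes sort.sh and the chr/samplechr input files exist on every worker, and it writes one result file per chromosome instead of a single shared result.txt):

    import scala.sys.process._

    // Ship the execShell(i) calls to the executors and collect the exit
    // codes, playing the role of process.waitFor() in the Java version.
    val exitCodes = sc.parallelize(1 to 21, 21).map { i =>
      i -> Seq("sh", "/opt/sh/bin/sort.sh",
        s"/opt/data/shellcompare/chr$i.txt",
        s"/opt/data/shellcompare/samplechr$i.txt",
        s"/opt/data/shellcompare/result$i.txt", "600").!
    }.collect()

    // Surface failures on the driver instead of silently ignoring them.
    exitCodes.filter(_._2 != 0).foreach { case (i, rc) =>
      println(s"chr$i comparison failed with exit code $rc")
    }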