I am trying to run some shell scripts in my Spark app, hoping they will run concurrently across my Spark cluster. However, I am not sure whether my code will actually run concurrently on the executors. Looking at my code, you can see that I am trying to:
1. Split both db and sample into 21 small files each, 42 files in total. Splitting db gives "chr1", "chr2", ..., "chr21", and splitting sample gives "samplechr1", "samplechr2", ..., "samplechr21".
2. Merge those split files from HDFS and save them to a local path. The merged files will be "chr1.txt", "chr2.txt", ..., "chr21.txt" and "samplechr1.txt", "samplechr2.txt", ..., "samplechr21.txt".
3. Run modify.sh to clean the data, i.e. delete some characters that are not useful.
4. Run shellcompare.sh to compare chr1.txt with samplechr1.txt and get result1.txt, looping from 1 to 21 so that all 42 files are compared and I end up with 21 files: result1.txt, result2.txt, ..., result21.txt.
Sorry for not adding comments to my code.
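A quick way to see where such code actually runs (a sketch only, not part of the app; it assumes a SparkContext named sc, e.g. the one in spark-shell) is to launch a shell command inside each task and collect the hostname it reports:

import scala.sys.process._

// launch `hostname` inside each of 4 tasks and bring the output back to the driver,
// just to confirm which worker node actually executed the shell command
val hosts = sc.parallelize(1 to 4, 4)
  .map(_ => Seq("hostname").!!.trim)   // !! captures the command's stdout as a String
  .collect()
hosts.foreach(println)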


--------------------------------

 

Thanks & Best regards!
San.Luo

----- Original Message -----
From: Akhil Das <ak...@sigmoidanalytics.com>
To: 罗辉 <luohui20...@sina.com>
Cc: user <user@spark.apache.org>
Subject: Re: Re: Re: how to distributed run a bash shell in spark
Date: May 25, 2015, 22:41

Can you tell us what exactly you are trying to achieve?

Thanks
Best Regards

On Mon, May 25, 2015 at 5:00 PM,  <luohui20...@sina.com> wrote:
Thanks, Madhu and Akhil.
I modified my code as below; however, I think it is still not very distributed. Do you guys have a better idea for running this app more efficiently and in a more distributed way?
I have added some comments with my understanding:
import org.apache.spark._
import www.celloud.com.model._

object GeneCompare3 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("GeneCompare")
      .setMaster("spark://master:7077")
      .set("spark.executor.memory", "6g")
      .set("hive.metastore.warehouse.dir", "/user/hive/warehouse")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    // load db.txt and sample.txt into DataFrames
    val db = sc.textFile("/data/db.txt").map(_.split("\t")).map(x => Database(x(0),
      x(1).trim().toInt,
      x(2).trim().toInt,
      x(3).trim().toInt,
      x(4).trim().toInt)).toDF()
    val sample = sc.textFile("/data/sample.txt").map(_.split("\t")).map(x => Sample(x(0),
      x(1).trim().toInt,
      x(2),
      x(3).trim().toInt,
      x(4).trim().toInt,
      x(5))).toDF()

    // using Akhil Das's idea: 1to21.txt is a file with 21 lines, each line a single number
//    val test = sc.textFile("1to21.txt").foreach { i =>                                 // runs on the driver
//      db.filter("chrname = 'chr" + i + "'").rdd.saveAsTextFile("/data/chr" + i)        // runs on the executors
//      db.rdd.pipe("hadoop fs -getmerge /data/chr" + i + " /opt/data/shellcompare/chr" + i + ".txt")
//      sample.filter("name = 'chr" + i + "'").rdd.saveAsTextFile("/data/samplechr" + i)
//      db.rdd.pipe("hadoop fs -getmerge /data/samplechr" + i + " /opt/data/shellcompare/samplechr" + i + ".txt")   // runs on the executors
//    }

    // using madhu's method: the loop itself runs on the driver
    for (i <- 1 to 21) {
      db.filter("chrname = 'chr" + i + "'").rdd.saveAsTextFile("/data/chr" + i)
      db.rdd.pipe("hadoop fs -getmerge /data/chr" + i + " /opt/data/shellcompare/chr" + i + ".txt").collect()
      sample.filter("name = 'chr" + i + "'").rdd.saveAsTextFile("/data/samplechr" + i)
      db.rdd.pipe("hadoop fs -getmerge /data/samplechr" + i + " /opt/data/shellcompare/samplechr" + i + ".txt").collect()
    }

    // run modify.sh to clean the merged files (runs on the driver)
    val runmodifyshell = List("run", "modify.sh")
    val runmodifyshellRDD = sc.makeRDD(runmodifyshell)
    val pipeModify = runmodifyshellRDD.pipe("sh /opt/data/shellcompare/modify.sh")
    pipeModify.collect()

    // compare each chrN.txt with samplechrN.txt via sort.sh, then append the results (runs on the driver)
    val shellcompare = List("run", "sort.sh")
    val shellcompareRDD = sc.makeRDD(shellcompare)
    val result = List("aggregate", "result")
    val resultRDD = sc.makeRDD(result)
    for (j <- 1 to 21) {
      shellcompareRDD.pipe("sh /opt/sh/bin/sort.sh /opt/data/shellcompare/chr" + j + ".txt /opt/data/shellcompare/samplechr" + j + ".txt /opt/data/shellcompare/result" + j + ".txt 600").collect()
      if (j > 1) resultRDD.pipe("cat result" + j + ".txt >> result1.txt").collect()
    }
  }
}

And I know there are some problems in my code; for example, "modify.sh" will not be executed.
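In case it helps frame the discussion, here is a minimal sketch (the object name is mine, and it assumes sort.sh plus the merged chrN.txt/samplechrN.txt files already sit at the same local paths on every worker node) of how the 21 independent comparisons could be pushed out to the executors instead of being looped over on the driver:

import org.apache.spark.{SparkConf, SparkContext}
import scala.sys.process._

object DistributedCompareSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DistributedCompareSketch"))

    // one task per chromosome; each task runs the comparison script locally on
    // whichever executor it lands on (assumption: /opt/sh/bin/sort.sh and the
    // chrN.txt / samplechrN.txt inputs exist on every worker node)
    sc.parallelize(1 to 21, 21).foreach { i =>
      val cmd = Seq("sh", "/opt/sh/bin/sort.sh",
        s"/opt/data/shellcompare/chr$i.txt",
        s"/opt/data/shellcompare/samplechr$i.txt",
        s"/opt/data/shellcompare/result$i.txt",
        "600")
      val exitCode = cmd.!   // fork the script and wait for it to finish
      if (exitCode != 0)
        throw new RuntimeException("sort.sh failed for chr" + i + " with exit code " + exitCode)
    }

    sc.stop()
  }
}

Whether a sketch like this actually beats the driver-side loop depends entirely on those local files: if the merged files exist on only one machine, tasks scheduled elsewhere will fail, so the -getmerge step would have to write to a shared path or be performed inside each task as well.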
--------------------------------

 

Thanks & Best regards!
San.Luo

----- Original Message -----
From: madhu phatak <phatak....@gmail.com>
To: luohui20...@sina.com
Cc: Akhil Das <ak...@sigmoidanalytics.com>, user <user@spark.apache.org>
Subject: Re: Re: how to distributed run a bash shell in spark
Date: May 25, 2015, 14:11

Hi,
You can use the pipe operator if you are running a shell or perl script on some data. More information on my blog.
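For illustration, a toy example of what pipe does (the data and command here are made up, and a SparkContext sc is assumed): the command is forked once per partition on the executor holding that partition, the partition's elements are written to its stdin one per line, and its stdout lines become the elements of the resulting RDD.

// toy RDD.pipe example: run the standard `wc -c` once per partition
// (the command must exist on every worker node that runs a task)
val words = sc.parallelize(Seq("chr1", "chr2", "samplechr1", "samplechr2"), 2)
val counted = words.pipe("wc -c")      // one `wc -c` process per partition, fed via stdin
counted.collect().foreach(println)     // two lines, one byte count per partition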


Regards,
Madhukara Phatak
http://datamantra.io/

On Mon, May 25, 2015 at 8:02 AM,  <luohui20...@sina.com> wrote:
Thanks Akhil,
           your code is a big help to me, because a perl script is exactly what I want to run in Spark. I will give it a try.


--------------------------------

 

Thanks & Best regards!
San.Luo

----- Original Message -----
From: Akhil Das <ak...@sigmoidanalytics.com>
To: 罗辉 <luohui20...@sina.com>
Cc: user <user@spark.apache.org>
Subject: Re: how to distributed run a bash shell in spark
Date: May 25, 2015, 00:53

You mean you want to execute some shell commands from Spark? Here's something I tried a while back: https://github.com/akhld/spark-exploit

Thanks
Best Regards

On Sun, May 24, 2015 at 4:53 PM,  <luohui20...@sina.com> wrote:
Hello there,
      I am trying to run an app in which part of the work needs to run a shell script. How can I run a shell script distributed across the Spark cluster? Thanks.

Here's my code:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class ShellCompare {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("ShellCompare")
                .setMaster("spark://master:7077")
                .set("spark.executor.memory", "6g");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // compare chr1..chr21 one by one; note this loop runs sequentially on the driver
        for (int i = 1; i <= 21; i++) {
            execShell(i);
        }
//        execShell(1);
        sc.stop();
    }

    private static void execShell(int i) {
        String shpath = "/opt/sh/bin/sort.sh";
        Process process = null;

        String var = "/opt/data/shellcompare/chr" + i + ".txt /opt/data/shellcompare/samplechr" + i + ".txt /opt/data/shellcompare/result.txt 600";
//        String var = "/opt/data/chr1.txt /opt/data/chr1sample.txt /opt/sh/bin/result.txt 600";
        String command2 = "sh " + shpath + " " + var;
        try {
            // run the comparison script and wait for it to finish
            process = Runtime.getRuntime().exec(command2);
            process.waitFor();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

--------------------------------

 

Thanks & Best regards!
San.Luo








