I looked at Java's mechanism for creating temporary local files. I believe
they can be created, written to and passed to other programs on the system.
I wrote a proof of concept that sends some Strings out and uses the local
program cat to concatenate them and write the result to a local file.
Clearly there is a more complex program I want to target, but is there
anything wrong with this approach?
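
For comparison, here is a minimal JDK-only sketch of the same pattern. It assumes the external program takes its input files as trailing arguments and writes its result to stdout (true of cat). Each call gets its own scratch directory from Files.createTempDirectory, the program is started directly rather than through a shell, and the directory is removed in a finally block. It does not rely on deleteOnExit, which only fires when the JVM exits, and a Spark executor can live for the whole application. ScratchDirRunner and runWithTempFiles are hypothetical names.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Stream;

public class ScratchDirRunner {

    /**
     * Writes each input string to its own file in a fresh temp directory, runs
     * command + input paths with stdout sent to an output file, returns that
     * output, and deletes the whole directory afterwards.
     */
    public static String runWithTempFiles(List<String> command, List<String> inputs)
            throws IOException, InterruptedException {
        Path dir = Files.createTempDirectory("spark-call-");
        try {
            List<String> cmd = new ArrayList<>(command);
            for (int i = 0; i < inputs.size(); i++) {
                Path in = dir.resolve("input" + i + ".txt");
                Files.write(in, inputs.get(i).getBytes(StandardCharsets.UTF_8));
                cmd.add(in.toString()); // pass the path as a separate argument, no quoting needed
            }
            Path out = dir.resolve("output.txt");
            Process process = new ProcessBuilder(cmd)
                    .directory(dir.toFile())                        // relative files the program writes land here too
                    .redirectOutput(out.toFile())                   // same effect as "> output.txt"
                    .redirectError(ProcessBuilder.Redirect.INHERIT) // stderr goes to this JVM's stderr
                    .start();
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                throw new IOException("command " + cmd + " exited with code " + exitCode);
            }
            return new String(Files.readAllBytes(out), StandardCharsets.UTF_8);
        } finally {
            deleteRecursively(dir);
        }
    }

    /** Deletes children before parents by walking the tree in reverse path order. */
    private static void deleteRecursively(Path dir) throws IOException {
        try (Stream<Path> walk = Files.walk(dir)) {
            walk.sorted(Comparator.reverseOrder())
                .forEach(p -> p.toFile().delete());
        }
    }
}
```

For example, runWithTempFiles(Arrays.asList("cat"), Arrays.asList("Start 0", "End 0")) should return "Start 0End 0", since neither input ends with a newline.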


package com.lordjoe.comet;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Option;
import scala.Tuple2;

import java.io.*;
import java.util.*;

/**
 * com.lordjoe.comet.SparkCatTest
 * Tests using Java temp files in a Spark function call
 */
public class SparkCatTest {

    public static final int NUMBER_REPEATS = 10;

    public static List<String> buildItems(String text, int repeats) {
        List<String> ret = new ArrayList<>();
        for (int i = 0; i < repeats; i++) {
            ret.add(text + i);
        }
        return ret;
    }

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("CatWithFiles");

        Option<String> option = sparkConf.getOption("spark.master");
        if (!option.isDefined()) {   // use local over nothing
            sparkConf.setMaster("local[*]");
        }

        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        List<String> start = buildItems("Start ", NUMBER_REPEATS); // make some data like   Start 9
        List<String> end = buildItems("End ", NUMBER_REPEATS);     // make some data like   End 9

        JavaRDD<String> startRdd = ctx.parallelize(start);
        JavaRDD<String> endRdd = ctx.parallelize(end);
        JavaPairRDD<String, String> cross = startRdd.cartesian(endRdd); // make all pairs

        /*
         * dirty work is done here: write each pair to temp files and use cat on them
         */
        JavaRDD<String> map = cross.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> x) throws Exception {
                File f1 = makeTempFile();
                File f2 = makeTempFile();
                File f3 = makeTempFile();
                try {
                    writeFile(f1, x._1);
                    writeFile(f2, x._2);
                    String command = "cat " + f1.getAbsolutePath() + " " + f2.getAbsolutePath()
                            + " > " + f3.getAbsolutePath();
                    boolean success;
                    if (osIsWindows()) {
                        // needs a cat executable on the PATH; cmd.exe has no built-in cat
                        success = executeCommandLine("cmd", "/c", command);
                    } else {
                        success = executeCommandLine("/bin/sh", "-c", command);
                    }
                    return success ? readFile(f3) : null;
                } finally {
                    // an executor JVM may live for the whole job, so do not wait for deleteOnExit
                    f1.delete();
                    f2.delete();
                    f3.delete();
                }
            }
        });

        // note the list returned by collect is immutable so we need a copy
        List<String> collect = new ArrayList<>(map.collect());
        for (String s : collect) {
            System.out.println(s);
        }
        ctx.stop();
    }

    /**
     * @return true if running on Windows; otherwise Linux is assumed
     */
    public static synchronized boolean osIsWindows() {
        String osName = System.getProperty("os.name").toLowerCase();
        return osName.contains("windows");
    }

    /**
     * make a temporary file with a unique name and delete on exit
     * @return non-null file
     */
    public static File makeTempFile() throws IOException {
        String prefix = UUID.randomUUID().toString();  // unique name
        String suffix = ".txt";
        File tempFile2 = File.createTempFile(prefix, suffix);
        tempFile2.deleteOnExit();  // backstop: drop on shutdown
        return tempFile2;
    }

    /**
     * run a command and wait for it to finish
     * @return true if the command exits with status 0
     */
    public static boolean executeCommandLine(String... args) throws IOException, InterruptedException {
        ProcessBuilder p = new ProcessBuilder(args);
        // pass the child's console output through so an unread pipe cannot block it
        p.redirectOutput(ProcessBuilder.Redirect.INHERIT);
        p.redirectError(ProcessBuilder.Redirect.INHERIT);
        Process process = p.start();
        int returnVal = process.waitFor();
        return returnVal == 0;
    }

    /**
     * write the string data to the file f
     * @param f    file to create
     * @param data data to write
     * @return true if no write error occurred
     */
    public static boolean writeFile(File f, String data) throws IOException {
        try (PrintWriter out = new PrintWriter(new FileWriter(f))) {
            out.print(data);
            return !out.checkError();  // PrintWriter swallows IOExceptions; checkError flushes and reports them
        }
    }

    /**
     * read the contents of a text file
     * @param f file to read
     * @return contents of the file, each line followed by "\n"
     */
    public static String readFile(File f) throws IOException {
        try (LineNumberReader rdr = new LineNumberReader(new FileReader(f))) {
            StringBuilder sb = new StringBuilder();
            String line = rdr.readLine();
            while (line != null) {
                sb.append(line).append("\n");
                line = rdr.readLine();
            }
            return sb.toString();
        }
    }
}
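
A possible alternative to executeCommandLine, sketched under the assumption that the program can be started directly with its input paths as arguments. ProcessBuilder.redirectOutput does the "> f3" redirection, which removes the need for the cmd /c versus /bin/sh -c branch. Process.waitFor(long, TimeUnit) adds a timeout so a hung external program cannot stall a Spark task indefinitely. CommandRunner, runToFile and the timeout are illustrative choices, not part of the code above.

```java
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class CommandRunner {

    /**
     * Starts the command directly (no cmd /c or /bin/sh -c), writes its stdout to outFile
     * and kills it if it has not finished within timeoutSeconds.
     *
     * @return true only if the process finished in time with exit code 0
     */
    public static boolean runToFile(List<String> command, File outFile, long timeoutSeconds)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
                .redirectOutput(outFile)                        // same effect as "> outFile" in a shell
                .redirectError(ProcessBuilder.Redirect.INHERIT) // stderr goes to this JVM's stderr
                .start();
        if (!process.waitFor(timeoutSeconds, TimeUnit.SECONDS)) {
            process.destroyForcibly(); // timed out: kill it and report failure
            return false;
        }
        return process.exitValue() == 0;
    }
}
```

In call(), the command string and the OS check could then be replaced by something like runToFile(Arrays.asList("cat", f1.getAbsolutePath(), f2.getAbsolutePath()), f3, 60). Because each path is passed as a separate argument, paths containing spaces need no quoting. On Windows a cat executable still has to be on the PATH.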


On Mon, Nov 12, 2018 at 9:20 AM Steve Lewis <lordjoe2...@gmail.com> wrote:

> I have been looking at Spark-Blast, which calls Blast (a well-known C++
> program) in parallel.
> In my case I have tried to translate the C++ code to Java but am not
> getting the same results; the code is convoluted.
> I have code that will call the program and read its results. The only
> real issue is that the program wants local files, and
> it uses them in a convoluted way with many seeks, so replacing them with
> streaming will not work.
> As long as my Java code can write to a local file for the duration of one
> call, things can work.
> I considered in-memory files, as long as they can be passed to another
> program. I am willing to have OS-specific code.
> So my issue is that I need to write 3 files, run a program and read one
> output file; then all files can be deleted.
> JNI calls will be hard: this is a program, not a library, and it is
> available on the worker nodes.
> On Sun, Nov 11, 2018 at 10:52 PM Jörn Franke <jornfra...@gmail.com> wrote:
>> Can you use JNI to call the C++ functionality directly from Java?
>> Or could you wrap this into an MR step outside Spark and use Hadoop
>> Streaming (it allows you to use shell scripts as mapper and reducer)?
>> You can also write temporary files for each partition and execute the
>> software within a map step.
>> Generally you should not call external applications from Spark.
>> > On 11.11.2018 at 23:13, Steve Lewis <lordjoe2...@gmail.com> wrote:
>> >
>> > I have a problem where a critical step needs to be performed by a
>> > third-party C++ application. I can send or install this program on the
>> > worker nodes. I can construct a function holding all the data this
>> > program needs to process. The problem is that the program is designed to
>> > read from and write to the local file system. I can call the program from
>> > Java and read its output as a local file, then delete all temporary
>> > files, but I doubt that it is possible to get the program to read from
>> > HDFS or any shared file system.
>> > My question is: can a function running on a worker node create temporary
>> > files and pass their names to a local process, assuming everything is
>> > cleaned up after the call?
>> >
>> > --
>> > Steven M. Lewis PhD
>> > 4221 105th Ave NE
>> > Kirkland, WA 98033
>> > 206-384-1340 (cell)
>> > Skype lordjoe_com
>> >
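
Jörn's suggestion of writing temporary files per partition can be sketched in plain Java: all records of one partition go into a single input file, the program runs once, and its output lines come back. This assumes, hypothetically, that the program takes one input file path as its last argument and writes its results to stdout; PartitionBatch and runOncePerPartition are made-up names.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class PartitionBatch {

    /**
     * Writes every record of one partition to a single temp file (one record per line),
     * runs the external program once with that file as its last argument, and returns
     * the program's stdout as a list of lines. Both temp files are deleted before returning.
     */
    public static List<String> runOncePerPartition(Iterator<String> records, List<String> command)
            throws IOException, InterruptedException {
        Path in = Files.createTempFile("partition-in-", ".txt");
        Path out = Files.createTempFile("partition-out-", ".txt");
        try {
            // stream the records to disk rather than buffering the whole partition in memory
            try (BufferedWriter w = Files.newBufferedWriter(in, StandardCharsets.UTF_8)) {
                while (records.hasNext()) {
                    w.write(records.next());
                    w.newLine();
                }
            }
            List<String> cmd = new ArrayList<>(command);
            cmd.add(in.toString());
            Process process = new ProcessBuilder(cmd)
                    .redirectOutput(out.toFile())                   // program's stdout goes to the output file
                    .redirectError(ProcessBuilder.Redirect.INHERIT) // stderr goes to this JVM's stderr
                    .start();
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                throw new IOException("command " + cmd + " exited with code " + exitCode);
            }
            return Files.readAllLines(out, StandardCharsets.UTF_8);
        } finally {
            Files.deleteIfExists(in);
            Files.deleteIfExists(out);
        }
    }
}
```

With the Spark 2.x Java API, where the FlatMapFunction given to mapPartitions returns an Iterator, this could be used as rdd.mapPartitions(it -> PartitionBatch.runOncePerPartition(it, Arrays.asList("sort")).iterator()). Starting one process per partition instead of one per record also spreads the process start-up cost over many records.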

