Re: writing to local files on a worker

2018-11-15 Thread Steve Lewis
I looked at Java's mechanism for creating temporary local files. I believe
they can be created, written to and passed to other programs on the system.
I wrote a proof of concept to send some Strings out and use the local
program cat to concatenate them and write the result to a local file .
Clearly there is a more complex program I want to target but is there
anything wrong with this approach

==

package com.lordjoe.comet;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Option;
import scala.Tuple2;

import java.io.*;
import java.util.*;

/**
 * com.lordjoe.comet.SparkCatTest
 * Tests using Java temp files in a function call in
 */
public class SparkCatTest {

public static final int NUMBER_REPEATS = 10; // make
NUMBER_REPEATS * NUMBER_REPEATS paris

public static List buildItems(String text, int repeats) {
Listret = new ArrayList<>() ;
for (int i = 0; i < repeats; i++) {
 ret.add(text + i);

}
return ret;
}


public static void main(String[] args) throws Exception {
 SparkConf sparkConf = new SparkConf().setAppName("CatWithFiles");

Option option = sparkConf.getOption("spark.master");
if (!option.isDefined()) {   // use local over nothing
sparkConf.setMaster("local[*]");
}

JavaSparkContext ctx = new JavaSparkContext(sparkConf);
List start = buildItems("Start ",NUMBER_REPEATS ) ; //
make some data like   Start 9
List end = buildItems("End ",NUMBER_REPEATS ) ;//
make some data like   End 9

JavaRDD startRdd = ctx.parallelize(start);
JavaRDD endRdd = ctx.parallelize(end);
JavaPairRDD cross =
startRdd.cartesian(endRdd); // make all pairs
 /**
 * dirty work is done here and used files to perform cat
 */
JavaRDD map = cross.map(new Function, String>() {

@Override
public String call(Tuple2 x) throws Exception {
  File f1 = makeTempFile( );
writeFile(f1, x._1);
File f2 = makeTempFile( ) ;
writeFile(f2, x._2);
File f3 = makeTempFile( );
boolean success = false;
String ret = null;
String f1path = f1.getAbsolutePath();
String f2path = f2.getAbsolutePath();
String f3Path = f3.getAbsolutePath();
String command = "cat " + f1path + " "  + f2path + " >
"  + f3Path;
if(osIsWindows())
success =  executeCommandLine("cmd","/c",command);
else
success =  executeCommandLine("/bin/sh","-c",command);
if(success) {
ret = readFile(f3);
}

f1.delete();
f2.delete();
f3.delete();
return ret;
}
});

// note the list returned by collect is immutable so we need a copy
List collect = new ArrayList(map.collect());
Collections.sort(collect);
for (String s : collect) {
System.out.println(s);
}
 }


/**
 * true if running on Windows - otherwise Linux assumed
 * @return
 */
public static synchronized boolean osIsWindows()
{
String osName = System.getProperty("os.name").toLowerCase();
return  (osName.indexOf("windows") != -1);
}

/**
 * make a temporary file wiht a unique name and delete on exit
 * @return   non-null file
 */
public static File makeTempFile( ) throws IOException {
String prefix = UUID.randomUUID().toString();  // unique name
String suffix = ".txt";
File tempFile2 = File.createTempFile(prefix, suffix);
tempFile2.deleteOnExit();  // drop on shutdown
return tempFile2;
}



public static boolean executeCommandLine(String... args) throws
IOException, InterruptedException {
ProcessBuilder p = new ProcessBuilder(args);
 Process process = p.start();
int result = process.waitFor();
int returnVal = process.exitValue();
return returnVal == 0;
}


/**
   * @name writeFile
 * @param FileName name of file to create
 * @param data date to write
 * @function write the string data to the file Filename
 */
public static boolean writeFile(File f, String data)  throws IOException {
PrintWriter out = new PrintWriter(new FileWriter(f));
if (out != null) {
out.print(data);
out.close();
return (true);
}
return (false);
// failure
}
/**
 

Re: writing to local files on a worker

2018-11-12 Thread Steve Lewis
I have been looking at Spark-Blast which calls Blast - a well known C++
program in parallel -
In my case I have tried to translate the C++ code to Java but am not
getting the same results - it is convoluted -
I have code that will call the program and read its results - the only real
issue is the program wants local files -
their use is convoluted with many seeks so replacement with streaming will
not work -
as long as my Java code can write to a local file for the duration of one
call things can work -

I considered in memory files as long as they can be passed to another
program - I am willing to have OS specific code
So my issue is I need to write 3 files - run a program and read one output
file - then all files can be deleted -
JNI calls will be hard - this is s program not a library and it is
available for worker nodes

On Sun, Nov 11, 2018 at 10:52 PM Jörn Franke  wrote:

> Can you use JNI to call the c++ functionality directly from Java?
>
> Or you wrap this into a MR step outside Spark and use Hadoop Streaming (it
> allows you to use shell scripts as mapper and reducer)?
>
> You can also write temporary files for each partition and execute the
> software within a map step.
>
> Generally you should not call external applications from Spark.
>
> > Am 11.11.2018 um 23:13 schrieb Steve Lewis :
> >
> > I have a problem where a critical step needs to be performed by  a third
> party c++ application. I can send or install this program on the worker
> nodes. I can construct  a function holding all the data this program needs
> to process. The problem is that the program is designed to read and write
> from the local file system. I can call the program from Java and read its
> output as  a  local file - then deleting all temporary files but I doubt
> that it is possible to get the program to read from hdfs or any shared file
> system.
> > My question is can a function running on a worker node create temporary
> files and pass the names of these to a local process assuming everything is
> cleaned up after the call?
> >
> > --
> > Steven M. Lewis PhD
> > 4221 105th Ave NE
> > Kirkland, WA 98033
> > 206-384-1340 (cell)
> > Skype lordjoe_com
> >
>


-- 
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com


Re: writing to local files on a worker

2018-11-11 Thread Jörn Franke
Can you use JNI to call the c++ functionality directly from Java? 

Or you wrap this into a MR step outside Spark and use Hadoop Streaming (it 
allows you to use shell scripts as mapper and reducer)?

You can also write temporary files for each partition and execute the software 
within a map step.

Generally you should not call external applications from Spark.

> Am 11.11.2018 um 23:13 schrieb Steve Lewis :
> 
> I have a problem where a critical step needs to be performed by  a third 
> party c++ application. I can send or install this program on the worker 
> nodes. I can construct  a function holding all the data this program needs to 
> process. The problem is that the program is designed to read and write from 
> the local file system. I can call the program from Java and read its output 
> as  a  local file - then deleting all temporary files but I doubt that it is 
> possible to get the program to read from hdfs or any shared file system. 
> My question is can a function running on a worker node create temporary files 
> and pass the names of these to a local process assuming everything is cleaned 
> up after the call?
> 
> -- 
> Steven M. Lewis PhD
> 4221 105th Ave NE
> Kirkland, WA 98033
> 206-384-1340 (cell)
> Skype lordjoe_com
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: writing to local files on a worker

2018-11-11 Thread Joe

Hello,
You could try using mapPartitions function if you can send partial data 
to your C++ program:


mapPartitions(func):
Similar to map, but runs separately on each partition (block) of the 
RDD, so /func/ must be of type Iterator => Iterator when running 
on an RDD of type T.


That way you can write partition data to temp file, call your C++ app, 
then delete the temp file. Of course your data would be limited to all 
rows in one partition.


Also the latest release of Spark (2.4.0) introduced barrier execution mode:
https://issues.apache.org/jira/browse/SPARK-24374

Maybe you could combine the two, just using mapPartitions will give you 
single partition data only, and your app call will be repeated on all 
nodes, not necessarily at the same time.


Spark's strong point is parallel execution, so what you're trying to do 
kind of defeats that.
But if you do not need to combine all the data before calling your app 
then you could do it.

Or you could split your job into Spark -> app -> Spark chain.
Good luck,

Joe



On 11/11/2018 02:13 PM, Steve Lewis wrote:
I have a problem where a critical step needs to be performed by  a 
third party c++ application. I can send or install this program on the 
worker nodes. I can construct  a function holding all the data this 
program needs to process. The problem is that the program is designed 
to read and write from the local file system. I can call the program 
from Java and read its output as  a  local file - then deleting all 
temporary files but I doubt that it is possible to get the program to 
read from hdfs or any shared file system.
My question is can a function running on a worker node create 
temporary files and pass the names of these to a local process 
assuming everything is cleaned up after the call?


--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org