Hi Adrian,

Yes, I need to load all the files and process them in parallel. The following
code doesn't seem to work (here I used map; I also tried foreach). I'm just
downloading the files from HDFS to the local system and printing the log count
for each file. It doesn't throw any exceptions, but it doesn't work: the files
are not downloaded, and I don't even get the LOGGER output. The same code
works if I iterate over the files, but then it isn't parallelized. How do I
get my code parallelized and working?

JavaRDD<String> files = sparkContext.parallelize(fileList);

files.map(new Function<String, Void>()
{
    public static final long serialVersionUID = 1L;

    @Override
    public Void call(String hdfsPath) throws Exception
    {
        JavaPairRDD<IntWritable, BytesWritable> hdfsContent =
            sc.sequenceFile(hdfsPath, IntWritable.class, BytesWritable.class);
        JavaRDD<Message> logs = hdfsContent.map(
            new Function<Tuple2<IntWritable, BytesWritable>, Message>()
        {
            public Message call(Tuple2<IntWritable, BytesWritable> tuple2)
                throws Exception
            {
                BytesWritable value = tuple2._2();
                BytesWritable tmp = new BytesWritable();
                tmp.setCapacity(value.getLength());
                tmp.set(value);
                return (Message) getProtos(1, tmp.getBytes());
            }
        });

        String path = "/home/sas/logsfilterapp/temp/" + hdfsPath.substring(26);

        Thread.sleep(2000);
        logs.saveAsObjectFile(path);

        LOGGER.log(Level.INFO, "HDFS Path : {0} Count : {1}",
            new Object[] { hdfsPath, logs.count() });
        return null;
    }
});
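
For comparison, one possible way to parallelize the iterating version that
already works is to submit each file's job from its own driver thread, since
Spark accepts jobs submitted concurrently from separate threads. Below is a
minimal sketch of that idea, not a tested solution for this application. The
class name ParallelFileJobs, the pool size, and the processFile method are
hypothetical: processFile stands in for the per-file work (sequenceFile, map,
saveAsObjectFile, count) and here only returns the length of the path, so the
sketch runs without a cluster. It uses anonymous classes to stay within
Java 1.7.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelFileJobs {

    // Hypothetical placeholder for the per-file Spark job. In the real
    // application this would run on the driver and call sc.sequenceFile(...),
    // map, saveAsObjectFile and count. Here it only returns the path length.
    public static long processFile(String hdfsPath) {
        return hdfsPath.length();
    }

    // Submits one task per file to a fixed-size pool of driver threads and
    // collects the results in the same order as the input list.
    public static Map<String, Long> processAll(List<String> fileList, int threads)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Long>> futures = new ArrayList<Future<Long>>();
            for (final String path : fileList) {
                futures.add(pool.submit(new Callable<Long>() {
                    @Override
                    public Long call() {
                        return processFile(path);
                    }
                }));
            }
            Map<String, Long> counts = new LinkedHashMap<String, Long>();
            for (int i = 0; i < fileList.size(); i++) {
                // get() blocks until the task for this file has finished.
                counts.put(fileList.get(i), futures.get(i).get());
            }
            return counts;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // The paths below are made up for illustration.
        Map<String, Long> counts = processAll(Arrays.asList("/logs/a", "/logs/bb"), 2);
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            System.out.println("HDFS Path : " + e.getKey() + " Count : " + e.getValue());
        }
    }
}
```

In this arrangement the SparkContext is only ever used on the driver, and the
log statements run on the driver as well.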



Note: In another scenario I also didn't get the log output from inside map
and filter closures, while log statements outside these closures are printed
as usual. If I can't get logger output from inside these closures, how do I
debug them?

Thanks
Vinoth Sankar

On Wed, Oct 28, 2015 at 8:29 PM Adrian Tanase <atan...@adobe.com> wrote:

> The first line is distributing your fileList variable in the cluster as an
> RDD, partitioned using the default partitioner settings (e.g. number of
> cores in your cluster).
>
> Each of your workers would receive one or more slices of data (depending on
> how many cores each executor has), and the abstraction is called a partition.
>
> What is your use case? If you want to load the files and continue
> processing in parallel, then a simple .map should work.
> If you want to execute arbitrary code based on the list of files that each
> executor received, then you need to use .foreach that will get executed for
> each of the entries, on the worker.
>
> -adrian
>
> From: Vinoth Sankar
> Date: Wednesday, October 28, 2015 at 2:49 PM
> To: "user@spark.apache.org"
> Subject: How do I parallize Spark Jobs at Executor Level.
>
> Hi,
>
> I'm reading and filtering a large number of files using Spark. It's getting
> parallelized at the Spark driver level only. How do I make it parallelize at
> the executor (worker) level? Refer to the following sample. Is there any way
> to iterate the localIterator in parallel?
>
> Note: I use Java 1.7.
>
> JavaRDD<String> files = javaSparkContext.parallelize(fileList);
> Iterator<String> localIterator = files.toLocalIterator();
>
> Regards
> Vinoth Sankar
>
