Why is my Spark job slow and throwing OOM, which leads to YARN killing executors?

2015-09-12 Thread unk1102
Hi, I have the following Spark driver program/job which reads ORC files (i.e.
Hive partitions as HDFS directories), processes them as DataFrames, and uses them
as tables in hiveContext.sql(). The job runs fine and gives correct results, but it
hits the physical memory limit after an hour or so, YARN kills executors, and
things get slower and slower. Please see the following code and help me
identify the problem. I create 20 threads from the driver program and spawn them;
the thread logic contains lambda functions which get executed on the executors.
Please guide me, I am new to Spark. Thanks much.

  public class DataSpark {

    public static final Map<String,String> dMap = new LinkedHashMap<>();

    public static final String[] colNameArr = new String[] {"_col0","col2","bla bla 45 columns"};

    public static void main(String[] args) throws Exception {

        Set<DataWorker> workers = new HashSet<>();

        SparkConf sparkConf = new SparkConf().setAppName("SparkTest");
        setSparkConfProperties(sparkConf);
        SparkContext sc = new SparkContext(sparkConf);
        final FileSystem fs = FileSystem.get(sc.hadoopConfiguration());
        HiveContext hiveContext = createHiveContext(sc);

        declareHiveUDFs(hiveContext);

        DateTimeFormatter df = DateTimeFormat.forPattern("MMdd");
        String yestday = "20150912";
        hiveContext.sql(" use xyz ");
        createTables(hiveContext);
        DataFrame partitionFrame = hiveContext.sql(" show partitions data partition(date=\"" + yestday + "\")");

        //add csv files to distributed cache
        Row[] rowArr = partitionFrame.collect();
        for (Row row : rowArr) {
            String[] splitArr = row.getString(0).split("/");
            String entity = splitArr[0].split("=")[1];
            int date = Integer.parseInt(splitArr[1].split("=")[1]);

            String sourcePath = "/user/xyz/Hive/xyz.db/data/entity=" + entity + "/date=" + date;
            Path spath = new Path(sourcePath);
            if (fs.getContentSummary(spath).getFileCount() > 0) {
                DataWorker worker = new DataWorker(hiveContext, entity, date);
                workers.add(worker);
            }
        }

        ExecutorService executorService = Executors.newFixedThreadPool(20);
        executorService.invokeAll(workers);
        executorService.shutdown();

        sc.stop();
    }

    private static void setSparkConfProperties(SparkConf sparkConf) {
        sparkConf.set("spark.rdd.compress", "true");
        sparkConf.set("spark.shuffle.consolidateFiles", "true");
        sparkConf.set("spark.executor.logs.rolling.maxRetainedFiles", "90");
        sparkConf.set("spark.executor.logs.rolling.strategy", "time");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.set("spark.shuffle.manager", "tungsten-sort");
        sparkConf.set("spark.shuffle.memoryFraction", "0.5");
        sparkConf.set("spark.storage.memoryFraction", "0.2");
    }

    private static HiveContext createHiveContext(SparkContext sc) {
        HiveContext hiveContext = new HiveContext(sc);
        hiveContext.setConf("spark.sql.codegen", "true");
        hiveContext.setConf("spark.sql.unsafe.enabled", "true");
        // need to set this to avoid a large number of small files; by default Spark creates 200 output part files
        hiveContext.setConf("spark.sql.shuffle.partitions", "15");
        hiveContext.setConf("spark.sql.orc.filterPushdown", "true");
        return hiveContext;
    }

    private static void declareHiveUDFs(HiveContext hiveContext) {
        hiveContext.sql("CREATE TEMPORARY FUNCTION UDF1 AS 'com.blab.blab.UDF1'");
        hiveContext.sql("CREATE TEMPORARY FUNCTION UDF2 AS 'com.blab.blab.UDF2'");
    }

    private static void createTables(HiveContext hiveContext) {
        hiveContext.sql(" create table if not exists abc blab bla ");
        hiveContext.sql(" create table if not exists def blab bla ");
    }

    private static void createBaseTableAfterProcessing(HiveContext hiveContext, String entity, int date) {
        String sourcePath = "/user/xyz/Hive/xyz.db/data/entity=" + entity + "/date=" + date;

        DataFrame sourceFrame = hiveContext.read().format("orc").load(sourcePath);

        //rename fields from _col* to actual column names

[Question] ORC - EMRFS Problem

2015-09-12 Thread Cazen
Good day!

I think there are some problems between ORC and AWS EMRFS.

When I was trying to read ORC files over 150 MB from S3, an ArrayIndexOutOfBounds
exception occurred.

I'm sure it's an AWS-side issue, because there was no exception when reading
from HDFS or S3NativeFileSystem.

Parquet works fine, but that is inconvenient (almost all of our system is based
on ORC).

Does anybody know about this issue?

I've tried Spark 1.4.1 (EMR 4.0.0), and there is no 1.5 patch note about this.

Thank You

--
ca...@korea.com
cazen@samsung.com
http://www.Cazen.co.kr



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Question-ORC-EMRFS-Problem-tp24673.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: What is the best way to migrate existing scikit-learn code to PySpark?

2015-09-12 Thread Jörn Franke
I fear you have to do the plumbing all yourself. This is the same for all
commercial and non-commercial libraries/analytics packages. It often also
depends on the functional requirements on how you distribute.

On Sat, Sep 12, 2015 at 20:18, Rex X wrote:

> Hi everyone,
>
> What is the best way to migrate existing scikit-learn code to PySpark
> cluster? Then we can bring together the full power of both scikit-learn and
> spark, to do scalable machine learning. (I know we have MLlib. But the
> existing code base is big, and some functions are not fully supported yet.)
>
> Currently I use multiprocessing module of Python to boost the speed. But
> this only works for one node, while the data set is small.
>
> For many real cases, we may need to deal with gigabytes or even terabytes
> of data, with thousands of raw categorical attributes, which can lead to
> millions of discrete features, using 1-of-k representation.
>
> For these cases, one solution is to use distributed memory. That's why I
> am considering spark. And spark support Python!
> With Pyspark, we can import scikit-learn.
>
> But the question is how to make the scikit-learn code, decisionTree
> classifier for example, running in distributed computing mode, to benefit
> the power of Spark?
>
>
> Best,
> Rex
>


RDD transformation and action running out of memory

2015-09-12 Thread Utkarsh Sengar
I am trying to run this: a basic mapToPair and then count() to trigger an
action.
4 executors are launched, but I don't see any relevant logs on those
executors.

It looks like the driver is pulling all the data and it runs out of
memory; the dataset is big, so it won't fit on one machine.

So what is the issue here? Am I using Spark in a wrong way in this example?

Configuration mongodbConfigInventoryDay = new Configuration();
mongodbConfigInventoryDay.set("mongo.job.input.format",
    "com.mongodb.hadoop.MongoInputFormat");
mongodbConfigInventoryDay.set("mongo.input.uri", "mongodb://" +
    props.getProperty("mongo") + ":27017/A.MyColl");

JavaPairRDD<Object, BSONObject> myColl = sc.newAPIHadoopRDD(
    mongodbConfigInventoryDay,
    MongoInputFormat.class,
    Object.class,
    BSONObject.class
);

JavaPairRDD<Long, MyColl> myCollRdd = myColl.mapToPair(tuple2 -> {
    ObjectMapper mapper = new ObjectMapper();
    tuple2._2().removeField("_id");
    MyColl day = mapper.readValue(tuple2._2().toMap().toString(), MyColl.class);
    return new Tuple2<>(Long.valueOf((String) tuple2._2().get("MyCollId")), day);
});

myCollRdd.count();


Logs on the driver:
15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(120664) called with
curMem=253374, maxMem=278019440
15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1 stored as values in
memory (estimated size 117.8 KB, free 264.8 MB)
15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(12812) called with
curMem=374038, maxMem=278019440
15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1_piece0 stored as
bytes in memory (estimated size 12.5 KB, free 264.8 MB)
15/09/12 21:07:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
on 10.70.7.135:58291 (size: 12.5 KB, free: 265.1 MB)
15/09/12 21:07:45 INFO SparkContext: Created broadcast 1 from
newAPIHadoopRDD at SparkRunner.java:192
15/09/12 21:07:45 INFO StandaloneMongoSplitter: Running splitvector to
check splits against mongodb://
dsc-dbs-001.qasql.opentable.com:27017/A.MyColl
15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min=null,
max= { "_id" : "54e64d626d0bfe0a24ba79b3"}
15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id"
: "54e64d626d0bfe0a24ba79b3"}, max= { "_id" : "54e64d646d0bfe0a24ba79e1"}
15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id"
: "54e64d646d0bfe0a24ba79e1"}, max= { "_id" : "5581d1c3d52db40bc8558c6b"}
..
..
15/09/12 21:08:22 INFO MongoCollectionSplitter: Created split: min={ "_id"
: "55adf840d3b5be0724224807"}, max= { "_id" : "55adf841b4d2970fb07d7288"}
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at org.bson.io.PoolOutputBuffer.<init>(PoolOutputBuffer.java:224)
at org.bson.BasicBSONDecoder.<init>(BasicBSONDecoder.java:499)
at
com.mongodb.hadoop.input.MongoInputSplit.<init>(MongoInputSplit.java:59)
at
com.mongodb.hadoop.splitter.MongoCollectionSplitter.createSplitFromBounds(MongoCollectionSplitter.java:248)
at
com.mongodb.hadoop.splitter.StandaloneMongoSplitter.calculateSplits(StandaloneMongoSplitter.java:157)
at
com.mongodb.hadoop.MongoInputFormat.getSplits(MongoInputFormat.java:58)
at
org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
at
org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:442)
at
org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:47)
at runner.SparkRunner.getInventoryDayRdd(SparkRunner.java:205)
at runner.SparkRunner.main(SparkRunner.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at 

Re: What is the best way to migrate existing scikit-learn code to PySpark?

2015-09-12 Thread Rex X
Jorn and Nick,

Thanks for answering.

Nick, the sparkit-learn project looks interesting. Thanks for mentioning it.


Rex


On Sat, Sep 12, 2015 at 12:05 PM, Nick Pentreath 
wrote:

> You might want to check out https://github.com/lensacom/sparkit-learn
> 
>
> Though it's true for random
> Forests / trees you will need to use MLlib
>
> —
> Sent from Mailbox 
>
>
> On Sat, Sep 12, 2015 at 9:00 PM, Jörn Franke  wrote:
>
>> I fear you have to do the plumbing all yourself. This is the same for all
>> commercial and non-commercial libraries/analytics packages. It often also
>> depends on the functional requirements on how you distribute.
>>
>> On Sat, Sep 12, 2015 at 20:18, Rex X wrote:
>>
>>> Hi everyone,
>>>
>>> What is the best way to migrate existing scikit-learn code to PySpark
>>> cluster? Then we can bring together the full power of both scikit-learn and
>>> spark, to do scalable machine learning. (I know we have MLlib. But the
>>> existing code base is big, and some functions are not fully supported yet.)
>>>
>>> Currently I use multiprocessing module of Python to boost the speed. But
>>> this only works for one node, while the data set is small.
>>>
>>> For many real cases, we may need to deal with gigabytes or even
>>> terabytes of data, with thousands of raw categorical attributes, which can
>>> lead to millions of discrete features, using 1-of-k representation.
>>>
>>> For these cases, one solution is to use distributed memory. That's why I
>>> am considering spark. And spark support Python!
>>> With Pyspark, we can import scikit-learn.
>>>
>>> But the question is how to make the scikit-learn code, decisionTree
>>> classifier for example, running in distributed computing mode, to benefit
>>> the power of Spark?
>>>
>>>
>>> Best,
>>> Rex
>>>
>>
>


What is the best way to migrate existing scikit-learn code to PySpark?

2015-09-12 Thread Rex X
Hi everyone,

What is the best way to migrate existing scikit-learn code to a PySpark
cluster? Then we can bring together the full power of both scikit-learn and
Spark to do scalable machine learning. (I know we have MLlib, but the
existing code base is big, and some functions are not fully supported yet.)

Currently I use the multiprocessing module of Python to boost the speed, but
this only works on one node, and only while the data set is small.

For many real cases, we may need to deal with gigabytes or even terabytes
of data, with thousands of raw categorical attributes, which can lead to
millions of discrete features using a 1-of-k representation.

For these cases, one solution is to use distributed memory. That's why I am
considering Spark. And Spark supports Python!
With PySpark, we can import scikit-learn.

But the question is how to make the scikit-learn code, a decision-tree
classifier for example, run in distributed computing mode, to benefit from
the power of Spark?


Best,
Rex


Re: Why is my Spark job slow and throwing OOM, which leads to YARN killing executors?

2015-09-12 Thread Richard W. Eggert II
Without a stack trace, I can't say for certain what is causing your 
OutOfMemoryError, but I do see a number of problems with your code.

First of all, given that Spark is a parallel processing framework, it is almost 
never necessary to manually create a thread pool within the driver. You should 
instead chain together some RDDs and let Spark parallelize the work on the 
cluster for you.

Secondly, unless I'm mistaken, SQLContext.sql (which HiveContext inherits) does 
not actually execute your SQL query. It just creates a DataFrame that 
represents the query. You have to invoke one of DataFrame's "action" methods 
(such as count, collect, foreach, or saveAsTextFile) to cause Spark to create a 
job to actually execute the query. The documentation is admittedly a bit vague 
and misleading about this, however.
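
To illustrate, a minimal sketch of what triggering execution with an action could
look like in this program (the query, table, and output path below are made up for
illustration, not taken from the original code):

    // hiveContext.sql(...) only builds a DataFrame describing the query.
    DataFrame result = hiveContext.sql("select entity, count(*) as cnt from data group by entity");

    // Nothing runs until an action is invoked on it:
    long rows = result.count();                        // action: executes the query
    result.write().format("orc").save("/tmp/out_orc"); // another action: executes and writes the result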

Rich

On September 12, 2015, at 3:52 PM, unk1102 <umesh.ka...@gmail.com> wrote:

Hi I have the following Spark driver program/job which reads ORC files (i.e.
hive partitions as HDFS directories) process them in DataFrame and use them
as table in hiveContext.sql(). Job runs fine it gives correct results but it
hits physical memory limit after one hour or so and YARN kills executor and
things gets slower and slower. Please see the following code and help me
identify problem. I created 20 Threads from driver program and spawn them.
Thread logic contains lambda function which gets executed on executors.
Please guide I am new to Spark. Thanks much.

  public class DataSpark {

public static final Map<String,String> dMap = new LinkedHashMap<>();

public static final String[] colNameArr = new String[]
{"_col0","col2","bla bla 45 columns"};

public static void main(String[] args) throws Exception {


Set workers = new HashSet<>();

SparkConf sparkConf = new SparkConf().setAppName("SparkTest");
setSparkConfProperties(sparkConf);
SparkContext sc = new SparkContext(sparkConf);
final FileSystem fs = FileSystem.get(sc.hadoopConfiguration());
HiveContext hiveContext = createHiveContext(sc);

declareHiveUDFs(hiveContext);

DateTimeFormatter df = DateTimeFormat.forPattern("MMdd");
String yestday = "20150912";
hiveContext.sql(" use xyz ");
createTables(hiveContext);
DataFrame partitionFrame = hiveContext.sql(" show partitions
data partition(date=\""+ yestday + "\")");

//add csv files to distributed cache
Row[] rowArr = partitionFrame.collect();
for(Row row : rowArr) {
String[] splitArr = row.getString(0).split("/");
String entity = splitArr[0].split("=")[1];
int date =  Integer.parseInt(splitArr[1].split("=")[1]);

String sourcePath =
"/user/xyz/Hive/xyz.db/data/entity="+entity+"/date="+date;
Path spath = new Path(sourcePath);
if(fs.getContentSummary(spath).getFileCount() > 0) {
DataWorker worker = new DataWorker(hiveContext,entity,
date);
workers.add(worker);
}
}

ExecutorService executorService =
Executors.newFixedThreadPool(20);
executorService.invokeAll(workers);
executorService.shutdown();


sc.stop();
}

private static void setSparkConfProperties(SparkConf sparkConf) {
sparkConf.set("spark.rdd.compress","true");

sparkConf.set("spark.shuffle.consolidateFiles","true");
   
sparkConf.set("spark.executor.logs.rolling.maxRetainedFiles","90");
sparkConf.set("spark.executor.logs.rolling.strategy","time");
sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.shuffle.manager","tungsten-sort");

   sparkConf.set("spark.shuffle.memoryFraction","0.5");
   sparkConf.set("spark.storage.memoryFraction","0.2");

}

private static HiveContext createHiveContext(SparkContext sc) {
HiveContext hiveContext = new HiveContext(sc);
hiveContext.setConf("spark.sql.codgen","true");
hiveContext.setConf("spark.sql.unsafe.enabled","true");

hiveContext.setConf("spark.sql.shuffle.partitions","15");//need
to set this to avoid large no of small files by default spark creates 200
output part files
hiveContext.setConf("spark.sql.orc.filterPushdown","true");
  

Re: RDD transformation and action running out of memory

2015-09-12 Thread Richard Eggert
Hmm... The count() method invokes this:

def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] =
{
   runJob(rdd, func, 0 until rdd.partitions.length)
}

It appears that you're running out of memory while trying to compute
(within the driver) the number of partitions that will be in the final
result. It seems as if Mongo is computing so many splits that you're
running out of memory.

Looking at your log messages, I see this:
15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id"
: "54e64d626d0bfe0a24ba79b3"}, max= { "_id" : "54e64d646d0bfe0a24ba79e1"}

0x54e64d646d0bfe0a24ba79e1 - 0x54e64d626d0bfe0a24ba79b3 =
0x2000000000000002e = 36893488147419103278

The last split reported in the log has max 55adf841b4d2970fb07d7288.

0x55adf841b4d2970fb07d7288 - 0x54e64d646d0bfe0a24ba79e1 =
0xc7aadd47c699058bc2f8a7 = 241383122307828806444054695

241383122307828806444054695/36893488147419103278 = 6,542,702 potential
splits, assuming they are evenly distributed. I'm not sure how big each
split object is, but it's plausible that the process of creating an array
of 6.5 million of them is causing you to run out of memory.
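
For anyone who wants to reproduce that estimate, here is a small self-contained
sketch (plain Java, using the ObjectId bounds reported in the log above; the class
name is arbitrary):

    import java.math.BigInteger;

    public class SplitEstimate {
        public static void main(String[] args) {
            // _id bounds taken from the MongoCollectionSplitter log lines quoted above
            BigInteger first  = new BigInteger("54e64d626d0bfe0a24ba79b3", 16);
            BigInteger second = new BigInteger("54e64d646d0bfe0a24ba79e1", 16);
            BigInteger last   = new BigInteger("55adf841b4d2970fb07d7288", 16);

            BigInteger perSplit   = second.subtract(first);  // width of one observed split
            BigInteger totalRange = last.subtract(second);   // remaining _id range
            // prints roughly 6.5 million, assuming the ids are evenly distributed
            System.out.println(totalRange.divide(perSplit));
        }
    }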

I think the reason you don't see anything in the executor logs is that the
exception is occurring before the work is tasked to the executors.


Rich



On Sat, Sep 12, 2015 at 5:18 PM, Utkarsh Sengar 
wrote:

> I am trying to run this, a basic mapToPair and then count() to trigger an
> action.
> 4 executors are launched but I don't see any relevant logs on those
> executors.
>
> It looks like the the driver is pulling all the data and it runs out of
> memory, the dataset is big, so it won't fit on 1 machine.
>
> So what is the issue here? I am using spark in a wrong way in this example?
>
> Configuration mongodbConfigInventoryDay = new Configuration();
> mongodbConfigInventoryDay.set("mongo.job.input.format",
> "com.mongodb.hadoop.MongoInputFormat");
> mongodbConfigInventoryDay.set("mongo.input.uri", "mongodb://" +
> props.getProperty("mongo") + ":27017/A.MyColl");
> JavaPairRDD MyColl = sc.newAPIHadoopRDD(
> mongodbConfigInventoryDay,
> MongoInputFormat.class,
> Object.class,
> BSONObject.class
> );
> JavaPairRDD myCollRdd = myColl.mapToPair(tuple2 -> {
> ObjectMapper mapper = new ObjectMapper();
> tuple2._2().removeField("_id");
> MyColl day = mapper.readValue(tuple2._2().toMap().toString(),
> MyColl.class);
> return new Tuple2<>(Long.valueOf((String)
> tuple2._2().get("MyCollId")), day);
> });
>
> myCollRdd.count();
>
>
> Logs on the driver:
> 15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(120664) called with
> curMem=253374, maxMem=278019440
> 15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1 stored as values in
> memory (estimated size 117.8 KB, free 264.8 MB)
> 15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(12812) called with
> curMem=374038, maxMem=278019440
> 15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1_piece0 stored as
> bytes in memory (estimated size 12.5 KB, free 264.8 MB)
> 15/09/12 21:07:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in
> memory on 10.70.7.135:58291 (size: 12.5 KB, free: 265.1 MB)
> 15/09/12 21:07:45 INFO SparkContext: Created broadcast 1 from
> newAPIHadoopRDD at SparkRunner.java:192
> 15/09/12 21:07:45 INFO StandaloneMongoSplitter: Running splitvector to
> check splits against mongodb://
> dsc-dbs-001.qasql.opentable.com:27017/A.MyColl
> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min=null,
> max= { "_id" : "54e64d626d0bfe0a24ba79b3"}
> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id"
> : "54e64d626d0bfe0a24ba79b3"}, max= { "_id" : "54e64d646d0bfe0a24ba79e1"}
> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id"
> : "54e64d646d0bfe0a24ba79e1"}, max= { "_id" : "5581d1c3d52db40bc8558c6b"}
> ..
> ..
> 15/09/12 21:08:22 INFO MongoCollectionSplitter: Created split: min={ "_id"
> : "55adf840d3b5be0724224807"}, max= { "_id" : "55adf841b4d2970fb07d7288"}
> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
> at org.bson.io.PoolOutputBuffer.(PoolOutputBuffer.java:224)
> at org.bson.BasicBSONDecoder.(BasicBSONDecoder.java:499)
> at
> com.mongodb.hadoop.input.MongoInputSplit.(MongoInputSplit.java:59)
> at
> com.mongodb.hadoop.splitter.MongoCollectionSplitter.createSplitFromBounds(MongoCollectionSplitter.java:248)
> at
> com.mongodb.hadoop.splitter.StandaloneMongoSplitter.calculateSplits(StandaloneMongoSplitter.java:157)
> at
> com.mongodb.hadoop.MongoInputFormat.getSplits(MongoInputFormat.java:58)
> at
> org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at 

UDAF and UDT with SparkSQL 1.5.0

2015-09-12 Thread jussipekkap
Hi,

Issue #1:
I'm using the new UDAF interface (UserDefinedAggregateFunction) in the Spark
1.5.0 release. Is it possible to aggregate all values in the
MutableAggregationBuffer into an array in a robust manner? I'm creating an
aggregation function that collects values into an array from all input rows,
and then calculates the final result from the UDAF using the array/list as
input value. The issue I'm running into is that the values contained in the
MutableAggregationBuffer are immutable, which means I need to create a copy
of the array every time I append a new value. This of course makes it very
slow for any significant number of elements.
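
For concreteness, a minimal Java sketch of the pattern described above (class and
column names are made up, and it is illustrative rather than tested; the same shape
applies in Scala). The copy in update() is exactly the per-row cost in question:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.expressions.MutableAggregationBuffer;
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
    import org.apache.spark.sql.types.*;

    // Collects every input long into an array (illustrative only).
    public class CollectLongs extends UserDefinedAggregateFunction {
        public StructType inputSchema() {
            return DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("value", DataTypes.LongType, true)));
        }
        public StructType bufferSchema() {
            return DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("values",
                    DataTypes.createArrayType(DataTypes.LongType), true)));
        }
        public DataType dataType() {
            return DataTypes.createArrayType(DataTypes.LongType);
        }
        public boolean deterministic() { return true; }

        public void initialize(MutableAggregationBuffer buffer) {
            buffer.update(0, new ArrayList<Long>());
        }
        public void update(MutableAggregationBuffer buffer, Row input) {
            // The buffered array is read-only, so every row forces a copy-and-append.
            List<Long> values = new ArrayList<>(buffer.<Long>getList(0));
            values.add(input.getLong(0));
            buffer.update(0, values);
        }
        public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
            List<Long> merged = new ArrayList<>(buffer1.<Long>getList(0));
            merged.addAll(buffer2.<Long>getList(0));
            buffer1.update(0, merged);
        }
        public Object evaluate(Row buffer) {
            return buffer.getList(0);
        }
    }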

Issue #2:
I also tried the hive 'collect_list' UDAF, but as the input values are UDTs,
I'm getting scala.MatchError as a result. I suppose the hive UDAFs only work
with primitive parameters!?

-JP



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/UDAF-and-UDT-with-SparkSQL-1-5-0-tp24670.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: What is the best way to migrate existing scikit-learn code to PySpark?

2015-09-12 Thread Nick Pentreath
You might want to check out https://github.com/lensacom/sparkit-learn

Though it's true that for random forests / trees you will need to use MLlib.

—
Sent from Mailbox

On Sat, Sep 12, 2015 at 9:00 PM, Jörn Franke  wrote:

> I fear you have to do the plumbing all yourself. This is the same for all
> commercial and non-commercial libraries/analytics packages. It often also
> depends on the functional requirements on how you distribute.
> On Sat, Sep 12, 2015 at 20:18, Rex X wrote:
>> Hi everyone,
>>
>> What is the best way to migrate existing scikit-learn code to PySpark
>> cluster? Then we can bring together the full power of both scikit-learn and
>> spark, to do scalable machine learning. (I know we have MLlib. But the
>> existing code base is big, and some functions are not fully supported yet.)
>>
>> Currently I use multiprocessing module of Python to boost the speed. But
>> this only works for one node, while the data set is small.
>>
>> For many real cases, we may need to deal with gigabytes or even terabytes
>> of data, with thousands of raw categorical attributes, which can lead to
>> millions of discrete features, using 1-of-k representation.
>>
>> For these cases, one solution is to use distributed memory. That's why I
>> am considering spark. And spark support Python!
>> With Pyspark, we can import scikit-learn.
>>
>> But the question is how to make the scikit-learn code, decisionTree
>> classifier for example, running in distributed computing mode, to benefit
>> the power of Spark?
>>
>>
>> Best,
>> Rex
>>

Re: Why is my Spark job slow and throwing OOM, which leads to YARN killing executors?

2015-09-12 Thread Umesh Kacha
Hi Richard, thanks much for the reply. If I don't create threads, the job runs
too slowly, since I have a thousand jobs (a thousand Hive partition directories)
to process. hiveContext.sql(...) runs fine and creates the output I expect; do I
really need to call any action method? The job works as expected, I am just
hitting the physical memory limit and YARN kills executors, as I saw in the YARN
logs. I believe that because of the group-by queries, a lot of shuffle data moves
around and creates a mess. Please guide.

On Sun, Sep 13, 2015 at 2:09 AM, Richard W. Eggert II <
richard.egg...@gmail.com> wrote:

> Without a stack trace, I can't say for certain what is causing your
> OutOfMemoryError, but I do see a number of problems with your code.
>
> First of all, given that Spark is a parallel processing framework, it is
> almost never necessary to manually create a thread pool within the driver.
> You should instead chain together some RDDs and let Spark parallelize the
> work on the cluster for you.
>
> Secondly, unless I'm mistaken, SQLContext.sql (which HiveContext inherits)
> does not actually execute your SQL query. It just creates a DataFrame that
> represents the query. You have to invoke one of DataFrame's "action"
> methods (such as count, collect, foreach, or saveAsTextFile) to cause Spark
> to create a job to actually execute the query. The documentation is
> admittedly a bit vague and misleading about this, however.
>
> Rich
>
> On September 12, 2015, at 3:52 PM, unk1102 <umesh.ka...@gmail.com> wrote:
>
> Hi I have the following Spark driver program/job which reads ORC files
> (i.e.
> hive partitions as HDFS directories) process them in DataFrame and use them
> as table in hiveContext.sql(). Job runs fine it gives correct results but
> it
> hits physical memory limit after one hour or so and YARN kills executor and
> things gets slower and slower. Please see the following code and help me
> identify problem. I created 20 Threads from driver program and spawn them.
> Thread logic contains lambda function which gets executed on executors.
> Please guide I am new to Spark. Thanks much.
>
>   public class DataSpark {
>
> public static final Map<String,String> dMap = new
> LinkedHashMap<>();
>
> public static final String[] colNameArr = new String[]
> {"_col0","col2","bla bla 45 columns"};
>
> public static void main(String[] args) throws Exception {
>
>
> Set workers = new HashSet<>();
>
> SparkConf sparkConf = new SparkConf().setAppName("SparkTest");
> setSparkConfProperties(sparkConf);
> SparkContext sc = new SparkContext(sparkConf);
> final FileSystem fs = FileSystem.get(sc.hadoopConfiguration());
>     HiveContext hiveContext = createHiveContext(sc);
>
> declareHiveUDFs(hiveContext);
>
> DateTimeFormatter df = DateTimeFormat.forPattern("MMdd");
> String yestday = "20150912";
> hiveContext.sql(" use xyz ");
> createTables(hiveContext);
> DataFrame partitionFrame = hiveContext.sql(" show partitions
> data partition(date=\""+ yestday + "\")");
>
> //add csv files to distributed cache
> Row[] rowArr = partitionFrame.collect();
> for(Row row : rowArr) {
> String[] splitArr = row.getString(0).split("/");
> String entity = splitArr[0].split("=")[1];
> int date =  Integer.parseInt(splitArr[1].split("=")[1]);
>
> String sourcePath =
> "/user/xyz/Hive/xyz.db/data/entity="+entity+"/date="+date;
> Path spath = new Path(sourcePath);
> if(fs.getContentSummary(spath).getFileCount() > 0) {
> DataWorker worker = new DataWorker(hiveContext,entity,
> date);
> workers.add(worker);
> }
> }
>
> ExecutorService executorService =
> Executors.newFixedThreadPool(20);
> executorService.invokeAll(workers);
> executorService.shutdown();
>
>
> sc.stop();
> }
>
> private static void setSparkConfProperties(SparkConf sparkConf) {
> sparkConf.set("spark.rdd.compress","true");
>
> sparkConf.set("spark.shuffle.consolidateFiles","true");
>
> sparkConf.set("spark.executor.logs.rolling.maxRetainedFiles","90");
> sparkConf.set("spark.executor.logs.rolling.strategy","time");
&

Spark Streaming..Exception

2015-09-12 Thread Priya Ch
Hello All,

 When I push messages into Kafka and read them in a streaming application, I see
the following exception.
 I am running the application on YARN and am not broadcasting any message
within the application; I am simply reading each message, parsing it,
populating fields in a class, and then printing the DStream (using
DStream.print).

 I have no clue whether this is a cluster issue, a Spark version issue, or a node
issue. The strange part is that sometimes the message is processed, but
sometimes I see the exception below:

java.io.IOException: org.apache.spark.SparkException: Failed to get
broadcast_5_piece0 of broadcast_5
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get
broadcast_5_piece0 of broadcast_5
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)


I would be glad if someone can throw some light on this.

Thanks,
Padma Ch


Re: Spark 1.5.0 java.lang.OutOfMemoryError: PermGen space

2015-09-12 Thread Jagat Singh
Sorry, to answer your question fully:

The job starts tasks; a few of them fail and some are successful. The
failed ones have that PermGen error in their logs.

But ultimately the full job is marked as failed and the session quits.


On Sun, Sep 13, 2015 at 10:48 AM, Jagat Singh  wrote:

> Hi Davies,
>
> This was first query on new version.
>
> The one which ran successfully was Spark Pi example
>
> ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
>
> --master yarn-client \
>
> --num-executors 3 \
>
> --driver-memory 4g \
>
> --executor-memory 2g \
>
> --executor-cores 1 \
>
> --queue default \
>
> lib/spark-examples*.jar \
>
> 10
>
> Then i tried using spark-shell , which was started without any extra
> memory Grabage collection or Permgen configurations
>
> ./bin/spark-shell --num-executors 2 --executor-memory 512m --master
> yarn-client
>
> val t1= sqlContext.sql("select count(*) from table")
>
> t1.show
>
> This one fails with PermGen
>
> I will try on Monday the solution suggested about passing extra PermGen to
> driver.
>
> Thanks,
>
> On Sat, Sep 12, 2015 at 2:57 AM, Davies Liu  wrote:
>
>> Did this happen immediately after you start the cluster or after ran
>> some queries?
>>
>> Is this in local mode or cluster mode?
>>
>> On Fri, Sep 11, 2015 at 3:00 AM, Jagat Singh 
>> wrote:
>> > Hi,
>> >
>> > We have queries which were running fine on 1.4.1 system.
>> >
>> > We are testing upgrade and even simple query like
>> >
>> > val t1= sqlContext.sql("select count(*) from table")
>> >
>> > t1.show
>> >
>> > This works perfectly fine on 1.4.1 but throws OOM error in 1.5.0
>> >
>> > Are there any changes in default memory settings from 1.4.1 to 1.5.0
>> >
>> > Thanks,
>> >
>> >
>> >
>>
>
>


Cogrouping data in dataframes - PairRDD cogroup vs. join - best practices

2015-09-12 Thread Matthew Denny
I was wondering what are the best practices in regards to cogrouping
data in dataframes.  While joins are obviously a powerful tool, it seems
that there's still some cases where using cogroup (which is only
supported by PairRDDs) is still a better choice.  Consider 2 case
classes with the following data: 

case class TypeA(id: Int, typeAStr: String  )
case class TypeB(id: Int, typeBId: Int, typeBStr: String  )

val rddA = ... // contains TypeA(1, "A1")
val rddB = ... // contains TypeB(1, 1, "B1"), TypeB(1, 2, "B2"),

I need basically all objects (regardless of their class) with the same
"id" grouped together.  Thus, cogroup yields exactly what I want: 
val cogroupAB = rddA.map(rec => (rec.id, rec)).cogroup(rddB.map(rec =>
(rec.id, rec))) // printing this out for key = 1 yields: 
//(1,(CompactBuffer(TypeA(1,A1)),CompactBuffer(TypeB(1,1,B1),
TypeB(1,2,B2


However, suppose instead I have two Dataframes dfA and dfB with the same
schema and data as rddA and rddB, respectively, but we're using
dataframes because the data is coming from an external source with its
own schema.  In this case, there appears to be no cogroup between data
frames, only joins: 
val joinResult = dfA.join(dfB, dfA("id") === dfB("id") ) // printing
this out where id = 1 yields 
// [1,A1,1,1,B1]
// [1,A1,1,2,B2]

While this gives me the data that I want, this will create a lot of
redundant data if dfA has a lot more fields and there's a lot of dfB
rows with the same id as any given dfA row.  I've seen a post on
Stack Overflow about the lack of cogroup for dataframes (
http://stackoverflow.com/questions/31806473/spark-dataframe-best-way-to-cogroup-dataframes
) but the only solution posted consists of creating RDDs from the
dataframe, cogrouping them, then converting them back to data frames,
which (the author readily admits) is inefficient.  

Thus, in the case of cogrouping dataframes, would I be best off to: 
1. run the join, then create a dataframe manually removing the redundant
data  
2. run a similar solution to the stackoverflow post 
3. use some other method (maybe this is an idea for an API enhancement?) 

I'd prefer not to convert the data frames to RDDs permanently, as I'd
like to keep all the nice properties of dataframes.  If #3 is an option
and requires an enhancement, I would be interested in discussing further
and possibly contributing.  

thanks, Matt   






Limiting number of cores per job in multi-threaded driver.

2015-09-12 Thread Philip Weaver
I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR
scheduler, so I can define a long-running application capable of executing
multiple simultaneous spark jobs.

The kind of jobs that I'm running do not benefit from more than 4 cores,
but I want my application to be able to take several times that in order to
run multiple jobs at the same time.

I suppose my question is more basic: How can I limit the number of cores
used to load an RDD or DataFrame? I can immediately repartition or coalesce
my RDD or DataFrame to 4 partitions after I load it, but that doesn't stop
Spark from using more cores to load it.

Does it make sense what I am trying to accomplish, and is there any way to
do it?

- Philip


Re: Spark 1.5.0 java.lang.OutOfMemoryError: PermGen space

2015-09-12 Thread Jagat Singh
Hi Davies,

This was first query on new version.

The one which ran successfully was Spark Pi example

./bin/spark-submit --class org.apache.spark.examples.SparkPi \

--master yarn-client \

--num-executors 3 \

--driver-memory 4g \

--executor-memory 2g \

--executor-cores 1 \

--queue default \

lib/spark-examples*.jar \

10

Then I tried using spark-shell, which was started without any extra memory,
garbage collection, or PermGen configuration

./bin/spark-shell --num-executors 2 --executor-memory 512m --master
yarn-client

val t1= sqlContext.sql("select count(*) from table")

t1.show

This one fails with PermGen

I will try on Monday the solution suggested about passing extra PermGen to
driver.
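
For reference, passing a larger PermGen to the driver and executors from the
command line might look like this (the flag names are the standard Spark/JVM
ones; the 256m value is only an example, not a tested recommendation):

./bin/spark-shell --num-executors 2 --executor-memory 512m --master yarn-client \
  --driver-java-options "-XX:MaxPermSize=256m" \
  --conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m"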

Thanks,

On Sat, Sep 12, 2015 at 2:57 AM, Davies Liu  wrote:

> Did this happen immediately after you start the cluster or after ran
> some queries?
>
> Is this in local mode or cluster mode?
>
> On Fri, Sep 11, 2015 at 3:00 AM, Jagat Singh  wrote:
> > Hi,
> >
> > We have queries which were running fine on 1.4.1 system.
> >
> > We are testing upgrade and even simple query like
> >
> > val t1= sqlContext.sql("select count(*) from table")
> >
> > t1.show
> >
> > This works perfectly fine on 1.4.1 but throws OOM error in 1.5.0
> >
> > Are there any changes in default memory settings from 1.4.1 to 1.5.0
> >
> > Thanks,
> >
> >
> >
>


What happens when cache is full?

2015-09-12 Thread Hemminger Jeff
I am trying to understand the process of caching and specifically what the
behavior is when the cache is full. Please excuse me if this question is a
little vague, I am trying to build my understanding of this process.

I have an RDD that I perform several computations with; I persist it with
MEMORY_ONLY_SER before performing the computations.

I believe that, due to insufficient memory, it is recomputing (at least
part of) the RDD each time.

Logging shows that the RDD was not cached previously, and therefore needs
to be computed.

I looked at the BlockManager Spark code, and see that getOrCompute attempts
to retrieve the block from the cache. If it is not available, it computes it.

Can I assume that when Spark attempts to cache an RDD but runs out of
memory, it recomputes a part of the RDD each time it is read?

I think I might be incorrect in this assumption, because I would have
expected a warning message if the cache was out of memory.
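
For reference, a tiny sketch of the two behaviours with the Java API ('lines' is a
placeholder RDD): with MEMORY_ONLY_SER, partitions that do not fit are simply not
stored and are recomputed from the lineage on the next read, whereas
MEMORY_AND_DISK_SER spills them to local disk instead:

    // Partitions that don't fit in memory are dropped and recomputed when read again.
    JavaRDD<String> serCached = lines.persist(StorageLevel.MEMORY_ONLY_SER());

    // Alternative: partitions that don't fit are spilled to disk rather than recomputed.
    JavaRDD<String> diskBacked = lines.persist(StorageLevel.MEMORY_AND_DISK_SER());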

Thanks,
Jeff


Re: Multithreaded vs Spark Executor

2015-09-12 Thread Richard Eggert
Parallel processing is what Spark was made for. Let it do its job. Spawning
your own threads independently of what Spark is doing seems like you'd just
be asking for trouble.

I think you can accomplish what you want by taking the cartesian product of
the data element RDD and the feature list RDD and then perform the
computation as a map operation that takes the tuple of the data element and
feature as input.
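
As a rough sketch of that idea in the Java API (the types and the computeFeature
method are hypothetical placeholders, not code from this thread):

    JavaRDD<DataElement> elements = ...;  // records parsed from the Kafka stream
    JavaRDD<Feature> features = ...;      // the ~100 feature definitions

    // Every (element, feature) pair becomes one record; Spark schedules the work across executors.
    JavaPairRDD<DataElement, Feature> pairs = elements.cartesian(features);
    JavaRDD<Result> results = pairs.map(pair -> computeFeature(pair._1(), pair._2()));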

Rich
On Sep 11, 2015 11:07 PM, "Rachana Srivastava" <
rachana.srivast...@markmonitor.com> wrote:

> Hello all,
>
>
>
> We are getting stream of input data from a Kafka queue using Spark
> Streaming API.  For each data element we want to run parallel threads to
> process a set of feature lists (nearly 100 feature or more).Since
> feature lists creation is independent of each other we would like to
> execute these feature lists in parallel on the input data that we get from
> the Kafka queue.
>
>
>
> *Question is *
>
>
>
> 1. Should we write thread pool and manage these features execution on
> different threads in parallel.  Only concern is because of data locality we
> are confined to the node that is assigned to the input data from the Kafka
> stream we cannot leverage distributed nodes for processing of these
> features for a single input data.
>
>
>
> 2.  Or since we are using JavaRDD as a feature list, these feature
> execution will be managed internally by Spark executors.
>
>
>
> Thanks,
>
>
>
> Rachana
>


change the spark version

2015-09-12 Thread Angel Angel
Respected sir,

I installed two versions of Spark: 1.2.0 (Cloudera 5.3) and 1.4.0.
I am running an application that needs Spark 1.4.0.

The application is related to deep learning.

So how can I remove version 1.2.0
and run my application on version 1.4.0?



When I run the spark-shell command, version 1.2.0 is picked up,
and for version 1.4.0 I have to run a command like /root/spark/bin/spark-shell.


Spark K means number of Iterations?

2015-09-12 Thread ashensw
Hi all,

I want to know whether the K-means algorithm stops if the data set converges
to stable clusters before reaching the number of iterations that we defined.

As an example, if I give 100 as the number of iterations and 5 as the number
of clusters, but the data set converges to 5 stable clusters within 30
iterations, does the K-means algorithm stop at that point, or does it do
100 iterations anyway?

Thanks 
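
For reference, a small sketch of where both knobs live in the MLlib Java API
(the values are illustrative, and 'vectors' is a placeholder JavaRDD of feature
vectors); the convergence tolerance is the epsilon parameter mentioned in the
reply quoted further down in this digest:

    KMeans kmeans = new KMeans()
        .setK(5)                // number of clusters
        .setMaxIterations(100)  // upper bound on iterations
        .setEpsilon(1e-4);      // convergence tolerance; training stops early once centers move less than this
    KMeansModel model = kmeans.run(vectors.rdd());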



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-K-means-number-of-Iterations-tp24664.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: change the spark version

2015-09-12 Thread Sean Owen
This is a question for the CDH list. CDH 5.4 has Spark 1.3, and 5.5
has 1.5. The best thing is to update CDH as a whole if you can.

However it's pretty simple to just run a newer Spark assembly as a
YARN app. Don't remove anything in the CDH installation. Try
downloading the assembly and prefixing your command with
SPARK_JAR=newassembly.jar ...

On Sat, Sep 12, 2015 at 7:16 AM, Angel Angel  wrote:
> Respected sir,
>
> I installed two versions of spark 1.2.0 (cloudera 5.3) and 1.4.0.
> I am running some application that need spark 1.4.0
>
> The application is related to deep learning.
>
> So how can i remove the version 1.2.0
> and run my application on version 1.4.0 ?
>
>
>
> When i run command spark-shell the version 1.2.0 comes in picture.
>
> and for version 1.4.0 i have run command like /root/spark/bin/spark-shell.




Re: SIGTERM 15 Issue : Spark Streaming for ingesting huge text files using custom Receiver

2015-09-12 Thread Jörn Franke
I am not sure what are you trying to achieve here. Have you thought about
using flume? Additionally maybe something like rsync?

On Sat, Sep 12, 2015 at 0:02, Varadhan, Jawahar
wrote:

> Hi all,
>I have a coded a custom receiver which receives kafka messages. These
> Kafka messages have FTP server credentials in them. The receiver then opens
> the message and uses the ftp credentials in it  to connect to the ftp
> server. It then streams this huge text file (3.3G) . Finally this stream it
> read line by line using buffered reader and pushed to the spark streaming
> via the receiver's "store" method. Spark streaming process receives all
> these lines and stores it in hdfs.
>
> With this process I could ingest small files (50 mb) but cant ingest this
> 3.3gb file.  I get a YARN exception of SIGTERM 15 in spark streaming
> process. Also, I tried going to that 3.3GB file directly (without custom
> receiver) in spark streaming using ssc.textFileStream  and everything works
> fine and that file ends in HDFS
>
> Please let me know what I might have to do to get this working with
> receiver. I know there are better ways to ingest the file but we need to
> use Spark streaming in our case.
>
> Thanks.
>


Re: Implement "LIKE" in SparkSQL

2015-09-12 Thread liam
OK, I found another way; it looks silly and inefficient, but it works.

tradeDF.registerTempTable(tradeTab);

orderDF.registerTempTable(orderTab);

//orderId = tid + "_x"

String sql1 = "select * from " + tradeTab + " a, " + orderTab + " b where
substr(b.orderId,1,15) = substr(a.tid,1) ";

String sql2 = "select * from " + tradeTab + " a, " + orderTab + " b where
substr(b.orderId,1,16) = substr(a.tid,1) ";

String sql3 = "select * from " + tradeTab + " a, " + orderTab + " b where
substr(b.orderId,1,17) = substr(a.tid,1) ";

DataFrame combinDF =
sqlContext.sql(sql1).unionAll(sqlContext.sql(sql2)).unionAll(sqlContext.sql(sql3));


 As I tried:
   substr(b.orderId,1,length(a.tid)) = a.tid   -> no length available
   b.orderId like concat(a.tid,'%')            -> no concat available
   instr(b.orderId,a.tid) > 0                  -> no instr available
   locate(a.tid,b.orderId) > 0                 -> no locate available
   ...                                         -> no ...
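
For older Spark versions, a minimal sketch of the UDF route mentioned in the reply
quoted below (Java API; the function name "starts_with_tid" is made up here):

    // Register a UDF that tests whether orderId starts with tid.
    sqlContext.udf().register("starts_with_tid",
        (UDF2<String, String, Boolean>) (orderId, tid) ->
            orderId != null && tid != null && orderId.startsWith(tid),
        DataTypes.BooleanType);

    DataFrame combinDF2 = sqlContext.sql("select * from " + tradeTab + " a, " + orderTab
        + " b where starts_with_tid(b.orderId, a.tid)");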



2015-09-12 13:49 GMT+08:00 Richard Eggert :

> concat and locate are available as of version 1.5.0, according to the
> Scaladocs. For earlier versions of Spark, and for the operations that are
> still not supported,  it's pretty straightforward to define your own
> UserDefinedFunctions in either Scala or Java  (I don't know about other
> languages).
> On Sep 11, 2015 10:26 PM, "liam"  wrote:
>
>> Hi,
>>
>>  Imaging this: the value of one column is the substring of another
>> column, when using Oracle,I got many ways to do the query like the
>> following statement,but how to do in SparkSQL since this no "concat(),
>> instr(), locate()..."
>>
>>
>> select * from table t where t.a like '%'||t.b||'%';
>>
>>
>> Thanks.
>>
>>


Re: Help with collect() in Spark Streaming

2015-09-12 Thread Luca
I am trying to implement an application that requires the output to be
aggregated and stored as a single txt file on HDFS (instead of, for
instance, having 4 different txt files coming from my 4 workers).
The solution I used does the trick, but I can't tell whether it's OK to
regularly stress one of the workers with the writing. That is why I thought
about having the driver collect and store the data.

Thanks for your patience and for your help, anyway. :)
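
For reference, a rough sketch of the driver-side variant being discussed (Java API;
the key/value types, output path, and per-batch file naming are placeholders that
would need adapting):

    unified.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
        public Void call(JavaPairRDD<String, String> rdd) throws Exception {
            // collect() pulls the whole batch to the driver; only safe if it fits in driver memory.
            List<Tuple2<String, String>> data = rdd.collect();
            FileSystem fs = FileSystem.get(new Configuration());
            try (PrintWriter out = new PrintWriter(fs.create(new Path("/user/out/batch.txt")))) {
                for (Tuple2<String, String> t : data) {
                    out.println(t._1() + "\t" + t._2());
                }
            }
            return null;
        }
    });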

2015-09-11 19:00 GMT+02:00 Holden Karau :

> Having the driver write the data instead of a worker probably won't spread
> it up, you still need to copy all of the data to a single node. Is there
> something which forces you to only write from a single node?
>
>
> On Friday, September 11, 2015, Luca  wrote:
>
>> Hi,
>> thanks for answering.
>>
>> With the *coalesce() *transformation a single worker is in charge of
>> writing to HDFS, but I noticed that the single write operation usually
>> takes too much time, slowing down the whole computation (this is
>> particularly true when 'unified' is made of several partitions). Besides,
>> 'coalesce' forces me to perform a further repartitioning ('true' flag), in
>> order not to lose upstream parallelism (by the way, did I get this part
>> right?).
>> Am I wrong in thinking that having the driver do the writing will speed
>> things up, without the need of repartitioning data?
>>
>> Hope I have been clear, I am pretty new to Spark. :)
>>
>> 2015-09-11 18:19 GMT+02:00 Holden Karau :
>>
>>> A common practice to do this is to use foreachRDD with a local var to
>>> accumulate the data (you can see it in the Spark Streaming test code).
>>>
>>> That being said, I am a little curious why you want the driver to create
>>> the file specifically.
>>>
>>> On Friday, September 11, 2015, allonsy  wrote:
>>>
 Hi everyone,

 I have a JavaPairDStream object and I'd like the
 Driver to
 create a txt file (on HDFS) containing all of its elements.

 At the moment, I use the /coalesce(1, true)/ method:


 JavaPairDStream unified = [partitioned stuff]
 unified.foreachRDD(new Function,
 Void>() {
 public Void call(JavaPairRDD arg0) throws Exception {
 arg0.coalesce(1,
 true).saveAsTextFile();
 return null;
 }
 });


 but this implies that a /single worker/ is taking all the data and
 writing
 to HDFS, and that could be a major bottleneck.

 How could I replace the worker with the Driver? I read that /collect()/
 might do this, but I haven't the slightest idea on how to implement it.

 Can anybody help me?

 Thanks in advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Help-with-collect-in-Spark-Streaming-tp24659.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>> --
>>> Cell : 425-233-8271
>>> Twitter: https://twitter.com/holdenkarau
>>> Linked In: https://www.linkedin.com/in/holdenkarau
>>>
>>>
>>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
> Linked In: https://www.linkedin.com/in/holdenkarau
>
>


How to create broadcast variable from Java String array?

2015-09-12 Thread unk1102
Hi, I have a Java String array which contains 45 strings, which is basically a
schema:

String[] fieldNames = {"col1","col2",...};

Currently I am storing the above array of Strings in a static field of the driver.
My job is running slowly, so I am trying to refactor the code.

I am using the String array when creating a DataFrame:

DataFrame df = sourceFrame.toDF(fieldNames);

I want to do the above using a broadcast variable so that we don't ship the huge
string array to the executors. I believe we can do something like the following to
create the broadcast:

Broadcast<String[]> brArray = sc.broadcast(fieldNames);
DataFrame df = sourceFrame.toDF(???); // how do I use the above broadcast? can I
use it as-is by passing brArray?

Please guide, thanks much.
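
For what it's worth, a minimal sketch of one way to wire this up (assuming 'sc' is
a JavaSparkContext; note that toDF runs on the driver, and a 45-element String
array is tiny, so broadcasting it is unlikely to change performance):

    Broadcast<String[]> brFieldNames = sc.broadcast(fieldNames);  // org.apache.spark.broadcast.Broadcast
    DataFrame df = sourceFrame.toDF(brFieldNames.value());        // value() returns the original String[]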



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-broadcast-variable-from-Java-String-array-tp24666.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark K means number of Iterations?

2015-09-12 Thread ashensw
Okay. Thanks for the help.

On Sat, Sep 12, 2015 at 1:16 PM, Robineast [via Apache Spark User List] <
ml-node+s1001560n24665...@n3.nabble.com> wrote:

> Yes it does stop if the algorithm converges in less than the specified
> tolerance. You have a parameter to the Kmeans constructor called epsilon
> which defaults to 1e-4. If you have logging set to INFO you will get a log
> message telling you how many iterations were run
>
> -
> Robin East
> *Spark GraphX in Action* Michael S Malak and Robin East
> http://manning.com/books/spark-graphx-in-action
>
>



-- 
*Ashen Weerathunga*
Software Engineer - Intern
WSO2 Inc.: http://wso2.com
lean.enterprise.middleware

Email: as...@wso2.com
Mobile: +94 716042995
LinkedIn: http://lk.linkedin.com/in/ashenweerathunga




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-K-means-number-of-Iterations-tp24664p24669.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

[Question] ORC - EMRFS Problem

2015-09-12 Thread Cazen Lee
Good day!

I think there are some problems between ORC and AWS EMRFS.

When I was trying to read ORC files over 150 MB from S3, an ArrayIndexOutOfBounds
exception occurred.

I'm sure it's an AWS-side issue, because there was no exception when reading
from HDFS or S3NativeFileSystem.

Parquet works fine, but that is inconvenient (almost all of our system is based on
ORC).

Does anybody know about this issue?

I've tried Spark 1.4.1 (EMR 4.0.0), and there is no 1.5 patch note about this.

Thank You