Trouble while running spark at ec2 cluster

2016-07-15 Thread Hassaan Chaudhry
Hi

I have launched my cluster and I am trying to submit my application to run
on the cluster, but it is not allowing me to connect. It prompts the following
error: "Master endpoint
spark://ec2-54-187-59-117.us-west-2.compute.amazonaws.com:7077 was not
a REST server." The command I use to run my application on the cluster is

"/spark-1.6.1/bin/spark-submit --master
spark://ec2-54-200-193-107.us-west-2.compute.amazonaws.com:7077
--deploy-mode cluster --class BFS
target/scala-2.10/scalaexample_2.10-1.0.jar"

Am I missing something? Your help will be highly appreciated.

P.S. I have even tried adding an inbound rule to my master node, but still
no success.

Thanks


Fwd: Spark streaming takes longer time to read json into dataframes

2016-07-15 Thread Diwakar Dhanuskodi
-- Forwarded message --
From: Diwakar Dhanuskodi 
Date: Sat, Jul 16, 2016 at 9:30 AM
Subject: Re: Spark streaming takes longer time to read json into dataframes
To: Jean Georges Perrin 


Hello,

I need it in memory. I increased executor memory to 25G and executor cores
to 3, and got the same result. There is always only one task running under the
executor for rdd.read.json(), because the RDD partition count is 1. Is doing
hash partitioning inside foreachRDD a good approach?

Regards,
Diwakar.

On Sat, Jul 16, 2016 at 9:20 AM, Jean Georges Perrin  wrote:

> Do you need it on disk or just push it to memory? Can you try to increase
> memory or # of cores (I know it sounds basic)
>
> > On Jul 15, 2016, at 11:43 PM, Diwakar Dhanuskodi <
> diwakar.dhanusk...@gmail.com> wrote:
> >
> > Hello,
> >
> > I have 400K json messages pulled from Kafka into spark streaming using
> DirectStream approach. Size of 400K messages is around 5G.  Kafka topic is
> single partitioned. I am using rdd.read.json(_._2) inside foreachRDD to
> convert  rdd into dataframe. It takes almost 2.3 minutes to convert into
> dataframe.
> >
> > I am running in Yarn client mode with executor memory as 15G and
> executor cores as 2.
> >
> > Caching rdd before converting into dataframe  doesn't change processing
> time. Whether introducing hash partitions inside foreachRDD  will help?
> (or) Will partitioning topic and have more than one DirectStream help?. How
> can I approach this situation to reduce time in converting to dataframe..
> >
> > Regards,
> > Diwakar.
>
>


Re: Spark streaming takes longer time to read json into dataframes

2016-07-15 Thread Jean Georges Perrin
Do you need it on disk or just push it to memory? Can you try to increase 
memory or # of cores (I know it sounds basic)

> On Jul 15, 2016, at 11:43 PM, Diwakar Dhanuskodi 
>  wrote:
> 
> Hello, 
> 
> I have 400K json messages pulled from Kafka into spark streaming using 
> DirectStream approach. Size of 400K messages is around 5G.  Kafka topic is 
> single partitioned. I am using rdd.read.json(_._2) inside foreachRDD to 
> convert  rdd into dataframe. It takes almost 2.3 minutes to convert into 
> dataframe. 
> 
> I am running in Yarn client mode with executor memory as 15G and executor 
> cores as 2.
> 
> Caching rdd before converting into dataframe  doesn't change processing time. 
> Whether introducing hash partitions inside foreachRDD  will help? (or) Will 
> partitioning topic and have more than one DirectStream help?. How can I 
> approach this situation to reduce time in converting to dataframe.. 
> 
> Regards, 
> Diwakar. 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark streaming takes longer time to read json into dataframes

2016-07-15 Thread Diwakar Dhanuskodi
Hello,

I have 400K json messages pulled from Kafka into spark streaming using
DirectStream approach. Size of 400K messages is around 5G.  Kafka topic is
single partitioned. I am using rdd.read.json(_._2) inside foreachRDD to
convert  rdd into dataframe. It takes almost 2.3 minutes to convert into
dataframe.

I am running in Yarn client mode with executor memory as 15G and executor
cores as 2.

Caching rdd before converting into dataframe  doesn't change processing
time. Whether introducing hash partitions inside foreachRDD  will help?
(or) Will partitioning topic and have more than one DirectStream help?. How
can I approach this situation to reduce time in converting to dataframe..

Regards,
Diwakar.


Size of cached dataframe

2016-07-15 Thread Brandon White
Is there any public API to get the size of a DataFrame in the cache? It's shown
in the Spark UI, but I don't see an API to access this information. Do
I need to build it myself using a forked version of Spark?
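
One hedged possibility rather than a confirmed public API: SparkContext's
getRDDStorageInfo (a DeveloperApi) reports per-RDD cached sizes, and a cached
DataFrame should show up there through its underlying cached RDD. A sketch,
assuming Spark 1.6 and an existing `sqlContext` and `sc` (names illustrative):

val df = sqlContext.range(0, 1000000).cache()
df.count()                                     // materialize the cache
sc.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: mem=${info.memSize} bytes, disk=${info.diskSize} bytes")
}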


Re: RDD and Dataframes

2016-07-15 Thread Taotao.Li
Hi brccosta, Databricks has just posted a blog about RDDs, DataFrames and
Datasets; you can check it here:
https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html
I think it will be very helpful for you.

*___*
Quant | Engineer | Boy
*___*
*blog*:http://litaotao.github.io

*github*: www.github.com/litaotao


On Sat, Jul 16, 2016 at 7:53 AM, RK Aduri  wrote:

> DataFrames use RDDs as the internal implementation of their structure. A
> DataFrame doesn't convert to an RDD, but uses RDD partitions to produce its
> logical plan.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-and-Dataframes-tp27306p27346.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
*___*
Quant | Engineer | Boy
*___*
*blog*:http://litaotao.github.io

*github*: www.github.com/litaotao


Re: Spark Streaming - Best Practices to handle multiple datapoints arriving at different time interval

2016-07-15 Thread RK Aduri
You can probably define sliding windows and set larger batch intervals. 
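
A minimal sketch of that kind of sliding window (durations are illustrative;
`stream` is assumed to be an existing DStream, and the slide duration must be a
multiple of the batch interval):

import org.apache.spark.streaming.Minutes

val windowed = stream.window(Minutes(10), Minutes(5))   // 10-minute window, sliding every 5 minutes
windowed.foreachRDD(rdd => println(rdd.count()))        // stand-in for the per-window work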



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Best-Practices-to-handle-multiple-datapoints-arriving-at-different-time-interval-tp27315p27348.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: java.lang.OutOfMemoryError related to Graphframe bfs

2016-07-15 Thread RK Aduri
Did you try with a different driver memory setting? Increasing the driver's memory can be
one option. Can you print the GC logs and post the GC times?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-related-to-Graphframe-bfs-tp27318p27347.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: RDD and Dataframes

2016-07-15 Thread RK Aduri
DataFrames use RDDs as the internal implementation of their structure. A DataFrame
doesn't convert to an RDD, but uses RDD partitions to produce its logical plan.
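
A small illustration of that relationship (Spark 1.6; an existing `sqlContext`
is assumed, and the numbers are arbitrary):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

val df = sqlContext.range(0, 100)          // a DataFrame with a single `id` column
val rows: RDD[Row] = df.rdd                // the underlying partitions, exposed as an RDD[Row]
println(rows.getNumPartitions)             // number of underlying RDD partitions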



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-and-Dataframes-tp27306p27346.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark.executor.cores

2016-07-15 Thread Brad Cox
Mitch: could you elaborate on: You can practically run most of your unit 
testing with Local mode and deploy variety of options including running SQL 
queries, reading data from CSV files, writing to HDFS, creating Hive tables 
including ORC tables and doing Spark Streaming.

In particular, are you saying there's a way to run in local mode and still 
read/write HDFS on a non-local cluster? Or do you mean run on the cluster as 
yarn-whichever, which is the only way I've found that works to date?

Please elaborate. I'm shaky on how to aim Spark at local files versus HDFS
files without a bunch of random experimenting.
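
A minimal sketch of the distinction in question, i.e. pointing Spark at local
files versus HDFS files via the URI scheme (the paths and namenode address are
illustrative; assumes an existing SparkContext `sc` running in local mode):

val localLines = sc.textFile("file:///tmp/sample.txt")                            // local filesystem
val hdfsLines  = sc.textFile("hdfs://namenode.example.com:8020/data/sample.txt")  // remote HDFS
println(localLines.count() + hdfsLines.count())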

Dr. Brad J. CoxCell: 703-594-1883 Skype: dr.brad.cox



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Custom InputFormat (SequenceFileInputFormat vs FileInputFormat)

2016-07-15 Thread Jörn Franke
I am not sure I exactly understand your use case, but for my Hadoop/Spark
format that reads the Bitcoin blockchain I extend FileInputFormat and use
the default split mechanism. This could mean that I split in the middle of a
Bitcoin block, which is no issue, because the first split can reach beyond its
original size (in this case the remaining necessary data might be transferred
from a remote node) and the second split can seek ahead to the next block.

However, the main difference from your case is that my blocks are of similar
size. Your block sizes can vary a lot, which means that one task could be busy
with a small block and another with a very big block, so parallel
processing might be suboptimal. It also depends on what you plan to do with the
blocks afterwards.

> On 15 Jul 2016, at 19:31, jtgenesis  wrote:
> 
> I'm working with a single image file that consists of headers and a multitude
> of different of data segment types (each data segment having its own
> sub-header that contains meta data). Currently using Hadoop's HDFS.
> 
> Example file layout:
> 
> | Header | Seg A-1 Sub-Header | Seg A-1 Data | Seg A-2 SubHdr | Seg A-2 Data
> | Seg B-1 Subhdr | Seg B-1 Data | Seg C-1 SubHdr | Seg C-1 Data | etc
> 
> The headers will vary from 1-10 Kb in size and each Data segment size will
> vary anywhere from 10KB - 10GB. The headers are represented as characters
> and the data is represented as binary. The headers include some useful
> information like number of segments, size of subheaders and segment data
> (I'll need this to create my splits).
> 
> To digest it all, I'm wondering if it's best to create a custom InputFormat
> inheriting from (1) FileInputFormat or (2) SequenceFileInputFormat.
> 
> If I go with (1), I will create HeaderSplits and DataSplits (data splits
> will be equiv to block size 128MB). I would also create a custom
> RecordReader for the DataSplits. Where the record size will be of tile
> sizes, 1024^2 Bytes. In the record reader, I will just read a tile size at a
> time. For the headers, each split will contain one record.
> 
> If I go with (2), I believe the bulk of my work would be in converting my
> image file to a SequenceFile. I would create a key/value pair for each
> header/subheader, and a key/value for every 1024^2 Bytes in my Segment Data.
> Once I do that, I would have to create a custom SequenceFileInputFormat that
> will also split the headers from the partitioned data segments. I read that
> SequenceFiles are great for dealing with the "large # of small files"
> problem, but I'm dealing with just 1 image file (although with possibly many
> different data segments).
> 
> I also noticed that SequenceFileInputFormat uses FileInputFormat getSplits
> implementation. I'm assuming I would have to modify it to get the kinds of
> splits that I want. (Extract the Header key/value pair and parse/extract
> size info, etc).
> 
> Is one approach better than the other? I feel (1) would be a simpler task,
> but (2) has a lot of nice features. Is there a better way? 
> 
> This is probably more of a hadoop question, but was curious if anyone had
> experience with this.
> 
> Thank you in advance!
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Custom-InputFormat-SequenceFileInputFormat-vs-FileInputFormat-tp27344.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark.executor.cores

2016-07-15 Thread Mich Talebzadeh
Great stuff, thanks Jean. These are from my notes:

These are the Spark operation modes that I know:


   - Spark Local: Spark runs on the local host. This is the simplest setup
     and best suited for learners who want to understand different concepts
     of Spark, and for those performing unit testing.

   - Spark Standalone: a simple cluster manager included with Spark that
     makes it easy to set up a cluster.

   - YARN Cluster Mode: the Spark driver runs inside an application master
     process which is managed by YARN on the cluster, and the client can go
     away after initiating the application. This is invoked with --master yarn
     and --deploy-mode cluster.

   - YARN Client Mode: the driver runs in the client process, and the
     application master is only used for requesting resources from YARN.
     Unlike Spark standalone mode, in which the master's address is specified
     in the --master parameter, in YARN mode the ResourceManager's address is
     picked up from the Hadoop configuration. Thus, the --master parameter is
     yarn. This is invoked with --deploy-mode client.

Spark Local is the easiest one. You do not need to have any master or worker
running. In this mode the driver program (SparkSubmit), the resource
manager and the executor all exist within the same JVM, and the JVM itself
hosts the worker threads. This is the one I gather you use on your favourite laptop.

You start it with --master local. This will start one (worker) thread,
equivalent to --master local[1]. You can start more than one thread by
specifying the number of threads k in --master local[k], and you can
use all available threads with --master local[*]. The degree of
parallelism is defined by the number of threads k.
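
A minimal sketch of setting this programmatically (the app name and thread
count are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("unit-test").setMaster("local[4]")  // 4 worker threads
val sc = new SparkContext(conf)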

In Local mode, you do not need to start a master and slaves/workers. This
mode is pretty simple, and you can run as many JVMs (spark-submit)
as your resources allow (resources meaning memory and cores). Additionally,
the web UI starts by default on port 4040, the next one on 4041 and so forth,
unless you specifically start it with --conf "spark.ui.port=n".

Remember this is all about testing your apps. It is NOT a performance test.
What it allows you to do is test multiple apps concurrently and, more
importantly, get started and understand the various configuration
parameters that Spark uses together with the spark-submit executable.


You can of course use the spark-shell and spark-sql utilities. These in turn
rely on the spark-submit executable to run certain variations of the JVM. In
other words, you are still executing spark-submit. You can pass parameters
to spark-submit, as in the example below:

${SPARK_HOME}/bin/spark-submit \
-- \
--driver-memory 2G \
--num-executors 1 \
--executor-memory 2G \
--master local \
--executor-cores 2 \
--conf "spark.scheduler.mode=FAIR" \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps" \
--jars  \
--class "${FILE_NAME}" \
--conf "spark.ui.port=4040” \
--conf "spark.driver.port=54631" \
--conf "spark.fileserver.port=54731" \
--conf "spark.blockManager.port=54832" \
--conf "spark.kryoserializer.buffer.max=512" \
${JAR_FILE} \
>> ${LOG_FILE}


Note that in the above example I am only using modest resources. This is
intentional to ensure that resources are available for the other Spark jobs
that I may be testing on this standalone node.

Alternatively, you can specify some of these parameters when you are
creating a new SparkConf

val sparkConf = new SparkConf().
  setAppName("My appname").
  setMaster("local").
  set("num.executors", "1").
  set("spark.executor.memory", "2G").
  set("spark.executor.cores", "2").
  set("spark.cores.max", "2").
  set("spark.driver.allowMultipleContexts", "true").
  set("spark.hadoop.validateOutputSpecs", "false")

You can practically run most of your unit testing in Local mode and
exercise a variety of options, including running SQL queries, reading data from
CSV files, writing to HDFS, creating Hive tables (including ORC tables) and
doing Spark Streaming.

I like this mode, as I can overload my machine with as many Spark apps as
I can and I am the only one managing resources. Too many apps and they
will not run.


Cheers


Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.

Re: Streaming from Kinesis is not getting data in Yarn cluster

2016-07-15 Thread Yash Sharma
I struggled with kinesis for a long time and got all my findings documented
at -

http://stackoverflow.com/questions/35567440/spark-not-able-to-fetch-events-from-amazon-kinesis

Let me know if it helps.

Cheers,
Yash

- Thanks, via mobile,  excuse brevity.

On Jul 16, 2016 6:05 AM, "dharmendra"  wrote:

> I have created small spark streaming program to fetch data from Kinesis and
> put some data in database.
> When i ran it in spark standalone cluster using master as local[*] it is
> working fine but when i tried to run in yarn cluster with master as "yarn"
> application doesn't receive any data.
>
> I submit job using following command
> spark-submit --class <> --master yarn --deploy-mode cluster
> --queue default --executor-cores 2 --executor-memory 2G --num-executors 4
> <
>
> My java code is like
>
> JavaDStream<Aggregation> enrichStream =
>     javaDStream.flatMap(sparkRecordProcessor);
>
> enrichStream.mapToPair(new PairFunction<Aggregation, Aggregation, Integer>() {
>     @Override
>     public Tuple2<Aggregation, Integer> call(Aggregation s) throws Exception {
>         LOGGER.info("creating tuple " + s);
>         return new Tuple2<>(s, 1);
>     }
> }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>     @Override
>     public Integer call(Integer i1, Integer i2) throws Exception {
>         LOGGER.info("reduce by key {}, {}", i1, i2);
>         return i1 + i2;
>     }
> }).foreach(sparkDatabaseProcessor);
>
>
> I have put some logs in sparkRecordProcessor and sparkDatabaseProcessor.
> I can see that sparkDatabaseProcessor executed every batch interval(10 sec)
> and but find no log in sparkRecordProcessor.
> There is no event(avg/sec) in Spark Streaming UI.
> In Executor tab i can see 3 executors. Data against these executors are
> also
> continuously updated.
> I also check Dynamodb table in Amazon and leaseCounter is updated regularly
> from my application.
> But spark streaming gets no data from Kinesis in yarn.
> I see "shuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0
> blocks" many times in log.
> I don't know what else i need to do to run spark streaming on yarn.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-from-Kinesis-is-not-getting-data-in-Yarn-cluster-tp27345.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: spark.executor.cores

2016-07-15 Thread Jean Georges Perrin
Hey Mich,


Oh well, you know, us humble programmers try to modestly understand what the 
brilliant data scientists are designing and, I can assure you that it is not 
easy.

Basically, I use Spark in three ways:

1) As a developer
I just embed the Spark binaries (jars) in my Maven POM. In the app, when I need
to have Spark do something, I just call the local master (quick example here:
http://jgp.net/2016/06/26/your-very-first-apache-spark-application/).

Pro: this is the super-duper easy & lazy way, works like a charm, setup under 5
minutes with one arm behind your back and blindfolded.
Con: well, I have a MacBook Air, a nice MacBook Air, but still it is only a
MacBook Air, with 8 GB of RAM and 2 cores... My analysis never finished (but a
subset does).

2) As a database
Ok, some will probably find that shocking, but I used Spark as a database on a
distant computer (my sweet Micha). The app connects to Spark, tells it what to
do, and the application "consumes" the data crunching done by Spark on Micha (a
bit more on the architecture here:
http://jgp.net/2016/07/14/chapel-hill-we-dont-have-a-problem/).

Pro: this can scale like crazy (I have benchmarks scheduled)
Con: well... after you have gone through all the issues I had, I don't see many
issues anymore (except that I still can't set the # of executors -- which
starts to make sense).

3) As a remote batch processor
You prepare your "batch" as a jar. I remember using mainframes this way (and 
using SAS). 

Pro: very friendly to data scientists / researchers as they are used to this 
batch model.
Con: you need to prepare the batch, send it... The jar also needs to do something
with the results: save them in a database? Send a mail? Send a PDF? Call the police?

Do you agree? Any other opinion?

I am not saying one is better than the other, just trying to get a "big 
picture".

jg




> On Jul 15, 2016, at 2:13 PM, Mich Talebzadeh  
> wrote:
> 
> Interesting
> 
> For some stuff I create an uber jar file and use that against spark-submit. I 
> have not attempted to start the cluster from through application.
> 
> 
> I tend to use a shell program (actually a k-shell) to compile it via maven or 
> sbt and then run it accordingly. In general you can parameterise everything 
> for runtime parameters say --driver-memory ${DRIVER_MEMORY} to practically 
> any other parameter . That way I find it more flexible because I can submit 
> the jar file and the class in any environment and adjust those runtime 
> parameters accordingly.  There are certain advantages to using spark-submit, 
> for example, since driver-memory setting encapsulates the JVM, you will need 
> to set the amount of driver memory for any non-default value before starting 
> JVM by providing the value in spark-submit.
> 
> I would be keen in hearing the pros and cons of the above approach. I am sure 
> you programmers (Scala/Java) know much more than me :)
> 
> Cheers
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> On 15 July 2016 at 16:42, Jean Georges Perrin  > wrote:
> lol - young padawan I am and path to knowledge seeking I am...
> 
> And on this path I also tried (without luck)...
> 
>   if (restId == 0) {
>   conf = conf.setExecutorEnv("spark.executor.cores", 
> "22");
>   } else {
>   conf = conf.setExecutorEnv("spark.executor.cores", "2");
>   }
> 
> and
> 
>   if (restId == 0) {
>   conf.setExecutorEnv("spark.executor.cores", "22");
>   } else {
>   conf.setExecutorEnv("spark.executor.cores", "2");
>   }
> 
> the only annoying thing I see is we designed some of the work to be handled 
> by the driver/client app and we will have to rethink a bit the design of the 
> app for that...
> 
> 
>> On Jul 15, 2016, at 11:34 AM, Daniel Darabos 
>> > 
>> wrote:
>> 
>> Mich's invocation is for starting a Spark application against an already 
>> running Spark standalone cluster. It will not start the cluster for you.
>> 
>> We used to not use "spark-submit", but we started using 

Streaming from Kinesis is not getting data in Yarn cluster

2016-07-15 Thread dharmendra
I have created small spark streaming program to fetch data from Kinesis and
put some data in database.
When i ran it in spark standalone cluster using master as local[*] it is
working fine but when i tried to run in yarn cluster with master as "yarn"
application doesn't receive any data.

I submit job using following command
spark-submit --class <> --master yarn --deploy-mode cluster
--queue default --executor-cores 2 --executor-memory 2G --num-executors 4
<

My java code is like 

JavaDStream<Aggregation> enrichStream =
    javaDStream.flatMap(sparkRecordProcessor);

enrichStream.mapToPair(new PairFunction<Aggregation, Aggregation, Integer>() {
    @Override
    public Tuple2<Aggregation, Integer> call(Aggregation s) throws Exception {
        LOGGER.info("creating tuple " + s);
        return new Tuple2<>(s, 1);
    }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) throws Exception {
        LOGGER.info("reduce by key {}, {}", i1, i2);
        return i1 + i2;
    }
}).foreach(sparkDatabaseProcessor);


I have put some logs in sparkRecordProcessor and sparkDatabaseProcessor.
I can see that sparkDatabaseProcessor executes every batch interval (10 sec),
but I find no log output from sparkRecordProcessor.
There are no events (avg/sec) in the Spark Streaming UI.
In the Executors tab I can see 3 executors, and data for these executors is also
continuously updated.
I also checked the DynamoDB table in Amazon, and leaseCounter is updated regularly
by my application.
But Spark Streaming gets no data from Kinesis on YARN.
I see "ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0
blocks" many times in the log.
I don't know what else I need to do to run Spark Streaming on YARN.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-from-Kinesis-is-not-getting-data-in-Yarn-cluster-tp27345.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



How can we control CPU and Memory per Spark job operation..

2016-07-15 Thread Pavan Achanta
Hi All,

Here is my use case:

I have a pipeline job consisting of 2 map functions:

  1.  A CPU-intensive map operation that does not require a lot of memory.
  2.  A memory-intensive map operation that requires up to 4 GB of memory, and
this 4 GB cannot be distributed since it is an NLP model.

Ideally, what I would like to do is use 20 nodes with 4 cores each and minimal
memory for the first map operation, and then use only 3 nodes with minimal CPU but
each having 4 GB of memory for the 2nd operation.

While it is possible to control the parallelism for each map operation in
Spark, I am not sure how to control the resources for each operation. Obviously
I don't want to start off the job with 20 nodes with 4 cores and 4 GB memory,
since I cannot afford that much memory.

We use Yarn with Spark. Any suggestions ?

Thanks and regards,
Pavan




Re: How to convert from DataFrame to Dataset[Row]?

2016-07-15 Thread Mich Talebzadeh
Can't you create a temp table from the DF, say df.registerTempTable("tmp"), and
use that instead?
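
A minimal sketch of that route (Spark 1.6; `sqlContext` and `df` are assumed to
exist, and the query is purely illustrative):

df.registerTempTable("tmp")
val result = sqlContext.sql("SELECT * FROM tmp")
result.show()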



Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 15 July 2016 at 20:21, Daniel Barclay 
wrote:

> In Spark 1.6.1, how can I convert a DataFrame to a Dataset[Row]?
>
> Is there a direct conversion?  (Trying .as[Row] doesn't
> work,
> even after importing  .implicits._ .)
>
> Is there some way to map the Rows from the Dataframe into the Dataset[Row]?
> (DataFrame.map would just make another Dataframe, right?)
>
>
> Thanks,
> Daniel
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


How to convert from DataFrame to Dataset[Row]?

2016-07-15 Thread Daniel Barclay

In Spark 1.6.1, how can I convert a DataFrame to a Dataset[Row]?

Is there a direct conversion?  (Trying .as[Row] doesn't work,
even after importing  .implicits._ .)

Is there some way to map the Rows from the Dataframe into the Dataset[Row]?
(DataFrame.map would just make another Dataframe, right?)


Thanks,
Daniel

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark.executor.cores

2016-07-15 Thread Mich Talebzadeh
Interesting

For some stuff I create an uber jar file and use that against spark-submit.
I have not attempted to start the cluster from through application.


I tend to use a shell program (actually a k-shell) to compile it via maven
or sbt and then run it accordingly. In general you can parameterise everything,
from runtime parameters such as --driver-memory ${DRIVER_MEMORY} to practically
any other parameter. That way I find it more flexible, because I can submit the
jar file and the class in any environment and adjust those runtime parameters
accordingly. There are certain advantages to using spark-submit; for example,
since the driver-memory setting applies to the JVM itself, you need to set any
non-default amount of driver memory before the JVM starts, by providing the
value to spark-submit.

I would be keen to hear the pros and cons of the above approach. I am
sure you programmers (Scala/Java) know much more than me :)

Cheers



Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 15 July 2016 at 16:42, Jean Georges Perrin  wrote:

> lol - young padawan I am and path to knowledge seeking I am...
>
> And on this path I also tried (without luck)...
>
> if (restId == 0) {
> conf = conf.setExecutorEnv("spark.executor.cores", "22");
> } else {
> conf = conf.setExecutorEnv("spark.executor.cores", "2");
> }
>
> and
>
> if (restId == 0) {
> conf.setExecutorEnv("spark.executor.cores", "22");
> } else {
> conf.setExecutorEnv("spark.executor.cores", "2");
> }
>
> the only annoying thing I see is we designed some of the work to be
> handled by the driver/client app and we will have to rethink a bit the
> design of the app for that...
>
>
> On Jul 15, 2016, at 11:34 AM, Daniel Darabos <
> daniel.dara...@lynxanalytics.com> wrote:
>
> Mich's invocation is for starting a Spark application against an already
> running Spark standalone cluster. It will not start the cluster for you.
>
> We used to not use "spark-submit", but we started using it when it solved
> some problem for us. Perhaps that day has also come for you? :)
>
> On Fri, Jul 15, 2016 at 5:14 PM, Jean Georges Perrin  wrote:
>
>> I don't use submit: I start my standalone cluster and connect to it
>> remotely. Is that a bad practice?
>>
>> I'd like to be able to it dynamically as the system knows whether it
>> needs more or less resources based on its own  context
>>
>> On Jul 15, 2016, at 10:55 AM, Mich Talebzadeh 
>> wrote:
>>
>> Hi,
>>
>> You can also do all this at env or submit time with spark-submit which I
>> believe makes it more flexible than coding in.
>>
>> Example
>>
>> ${SPARK_HOME}/bin/spark-submit \
>> --packages com.databricks:spark-csv_2.11:1.3.0 \
>> --driver-memory 2G \
>> --num-executors 2 \
>> --executor-cores 3 \
>> --executor-memory 2G \
>> --master spark://50.140.197.217:7077 \
>> --conf "spark.scheduler.mode=FAIR" \
>> --conf
>> "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
>> -XX:+PrintGCTimeStamps" \
>> --jars
>> /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
>> --class "${FILE_NAME}" \
>> --conf "spark.ui.port=${SP}" \
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 15 July 2016 at 13:48, Jean Georges Perrin  wrote:
>>
>>> Merci Nihed, this is one of the tests I did :( still not working
>>>
>>>
>>>
>>> On Jul 15, 2016, at 8:41 AM, nihed mbarek  wrote:
>>>
>>> can you try with :
>>> SparkConf conf = new SparkConf().setAppName("NC Eatery app").set(
>>> "spark.executor.memory", "4g")
>>> .setMaster("spark://10.0.100.120:7077");
>>> if (restId == 0) {
>>> conf = conf.set("spark.executor.cores", "22");
>>> } else {
>>> conf = 

spark single PROCESS_LOCAL task

2016-07-15 Thread Matt K
Hi all,

I'm seeing some curious behavior which I have a hard time interpreting. I
have a job which does a "groupByKey" and results in 300 executors. 299 are
run in NODE_LOCAL mode. 1 executor is run in PROCESS_LOCAL mode.

The 1 executor that runs in PROCESS_LOCAL mode gets about 10x as much input
as the other executors. It dies with OOM, and the job fails.

The only working theory I have is that there's a single key which has a ton of
data tied to it. Even so, I can't explain why it's run in PROCESS_LOCAL
mode and not the others.
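
One way to sanity-check that theory is to count records per key before the
groupByKey; a minimal sketch (the pair RDD `pairs` and its element types are
illustrative assumptions):

val heaviest = pairs                       // e.g. pairs: RDD[(String, Array[Byte])]
  .mapValues(_ => 1L)
  .reduceByKey(_ + _)
  .top(10)(Ordering.by[(String, Long), Long](_._2))   // the ten keys with the most records
heaviest.foreach(println)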

Does anyone have ideas?

Thanks,
-Matt


Custom InputFormat (SequenceFileInputFormat vs FileInputFormat)

2016-07-15 Thread jtgenesis
I'm working with a single image file that consists of headers and a multitude
of different of data segment types (each data segment having its own
sub-header that contains meta data). Currently using Hadoop's HDFS.

Example file layout:

| Header | Seg A-1 Sub-Header | Seg A-1 Data | Seg A-2 SubHdr | Seg A-2 Data
| Seg B-1 Subhdr | Seg B-1 Data | Seg C-1 SubHdr | Seg C-1 Data | etc

The headers will vary from 1-10 Kb in size and each Data segment size will
vary anywhere from 10KB - 10GB. The headers are represented as characters
and the data is represented as binary. The headers include some useful
information like number of segments, size of subheaders and segment data
(I'll need this to create my splits).

To digest it all, I'm wondering if it's best to create a custom InputFormat
inheriting from (1) FileInputFormat or (2) SequenceFileInputFormat.

If I go with (1), I will create HeaderSplits and DataSplits (data splits
will be equiv to block size 128MB). I would also create a custom
RecordReader for the DataSplits. Where the record size will be of tile
sizes, 1024^2 Bytes. In the record reader, I will just read a tile size at a
time. For the headers, each split will contain one record.

If I go with (2), I believe the bulk of my work would be in converting my
image file to a SequenceFile. I would create a key/value pair for each
header/subheader, and a key/value for every 1024^2 Bytes in my Segment Data.
Once I do that, I would have to create a custom SequenceFileInputFormat that
will also split the headers from the partitioned data segments. I read that
SequenceFiles are great for dealing with the "large # of small files"
problem, but I'm dealing with just 1 image file (although with possibly many
different data segments).

I also noticed that SequenceFileInputFormat uses FileInputFormat getSplits
implementation. I'm assuming I would have to modify it to get the kinds of
splits that I want. (Extract the Header key/value pair and parse/extract
size info, etc).

Is one approach better than the other? I feel (1) would be a simpler task,
but (2) has a lot of nice features. Is there a better way? 

This is probably more of a hadoop question, but was curious if anyone had
experience with this.

Thank you in advance!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-InputFormat-SequenceFileInputFormat-vs-FileInputFormat-tp27344.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: standalone mode only supports FIFO scheduler across applications ? still in spark 2.0 time ?

2016-07-15 Thread Mark Hamstra
Nothing has changed in that regard, nor is there likely to be "progress",
since more sophisticated or capable resource scheduling at the Application
level is really beyond the design goals for standalone mode.  If you want
more in the way of multi-Application resource scheduling, then you should
be looking at Yarn or Mesos.  Is there some reason why neither of those
options can work for you?

On Fri, Jul 15, 2016 at 9:15 AM, Teng Qiu  wrote:

> Hi,
>
>
> http://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/spark-standalone.html#resource-scheduling
> The standalone cluster mode currently only supports a simple FIFO
> scheduler across applications.
>
> is this sentence still true? any progress on this? it will really
> helpful. some roadmap?
>
> Thanks
>
> Teng
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Maximum Size of Reference Look Up Table in Spark

2016-07-15 Thread Jacek Laskowski
Hi,

Never worked in a project that would require it.

Jacek

On 15 Jul 2016 5:31 p.m., "Saravanan Subramanian" 
wrote:

> Hello Jacek,
>
> Have you seen any practical limitation or performance degradation issues
> while using more than 10GB of broadcast cache ?
>
> Thanks,
> Saravanan S.
>
>
> On Thursday, 14 July 2016 8:06 PM, Jacek Laskowski 
> wrote:
>
>
> Hi,
>
> My understanding is that the maximum size of a broadcast is the
> Long.MAX_VALUE (and plus some more since the data is going to be
> encoded to save space, esp. for catalyst-driver datasets).
>
> Ad 2. Before the tasks access the broadcast variable it has to be sent
> across network that may be too slow to be acceptable.
>
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Thu, Jul 14, 2016 at 11:32 PM, Saravanan Subramanian
>  wrote:
> > Hello All,
> >
> > I am in the middle of designing real time data enhancement services using
> > spark streaming.  As part of this, I have to look up some reference data
> > while processing the incoming stream.
> >
> > I have below questions:
> >
> > 1) what is the maximum size of look up table / variable can be stored as
> > Broadcast variable ()
> > 2) What is the impact of cluster performance, if I store a 10GB data in
> > broadcast variable
> >
> > Any suggestions and thoughts are welcome.
> >
> > Thanks,
> > Saravanan S.
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
>
>


many 'active' jobs are pending

2016-07-15 Thread 陆巍|Wei Lu(RD)
Hi there,

I am running into a "many active jobs" issue when using direct Kafka streaming on
YARN (Spark 1.5, Hadoop 2.6, CDH 5.5.1).

The problem happens when Kafka has almost NO traffic.

From the application UI, I see many 'active' jobs pending for hours, and
finally the driver reports "Requesting 4 new executors because tasks are backlogged".

But when looking at the driver log of an 'active' job, the log says the job
is finished. So why does the application UI show this job as active, seemingly forever?

Thanks!


Here is the related log info about one of the 'active' jobs.
There are two stages: a reduceByKey following a flatMap. The log says both stages
finished in ~20 ms and the job also finished in 64 ms.

Got job 6567
Final stage: ResultStage 9851(foreachRDD at
Parents of final stage: List(ShuffleMapStage 9850)
Missing parents: List(ShuffleMapStage 9850)
…
Finished task 0.0 in stage 9850.0 (TID 29551) in 20 ms
Removed TaskSet 9850.0, whose tasks have all completed, from pool
ShuffleMapStage 9850 (flatMap at OpaTransLogAnalyzeWithShuffle.scala:83) 
finished in 0.022 s
…
Submitting ResultStage 9851 (ShuffledRDD[16419] at reduceByKey at 
OpaTransLogAnalyzeWithShuffle.scala:83), which is now runnable
…
ResultStage 9851 (foreachRDD at OpaTransLogAnalyzeWithShuffle.scala:84) 
finished in 0.023 s
Job 6567 finished: foreachRDD at OpaTransLogAnalyzeWithShuffle.scala:84, took 
0.064372 s
Finished job streaming job 1468592373000 ms.1 from job set of time 
1468592373000 ms

Wei Lu


standalone mode only supports FIFO scheduler across applications ? still in spark 2.0 time ?

2016-07-15 Thread Teng Qiu
Hi,

http://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/spark-standalone.html#resource-scheduling
The standalone cluster mode currently only supports a simple FIFO
scheduler across applications.

Is this sentence still true? Any progress on this? It would be really
helpful. Is there some roadmap?

Thanks

Teng

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Complications with saving Kafka offsets?

2016-07-15 Thread Cody Koeninger
The bottom line short answer for this is that if you actually care
about data integrity, you need to store your offsets transactionally
alongside your results in the same data store.

If you're ok with double-counting in the event of failures, saving
offsets _after_ saving your results, using foreachRDD, will work.

Have you read the material / examples linked from
https://github.com/koeninger/kafka-exactly-once ?
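
For reference, a minimal Scala sketch of the shape described above: get the
offset ranges inside foreachRDD, write the results, and only then persist the
offsets (Spark 1.6 Kafka direct stream; `stream`, saveResults and saveOffsets
are illustrative placeholders, not a transactional store):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // 1) write this batch's results first
  // saveResults(rdd)
  // 2) only then persist the offsets (to ZK, or ideally the same store as the results)
  // saveOffsets(offsetRanges)
}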


On Mon, Jul 11, 2016 at 9:58 PM, BradleyUM  wrote:
> I'm working on a Spark Streaming (1.6.0) project and one of our requirements
> is to persist Kafka offsets to Zookeeper after a batch has completed so that
> we can restart work from the correct position if we have to restart the
> process for any reason. Many links,
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
> included, seem to suggest that calling transform() on the stream is a
> perfectly acceptable way to store the offsets off for processing when the
> batch completes. Since that method seems to offer more intuitive ordering
> guarantees than foreachRDD() we have, up until now, preferred it. So our
> code looks something like the following:
>
> AtomicReference<OffsetRange[]> savedOffsets = new AtomicReference<>();
>
> messages = messages.transformToPair((rdd) -> {
>   // Save the offsets so that we can update ZK with them later
>   HasOffsetRanges hasOffsetRanges = (HasOffsetRanges) rdd.rdd();
>   savedOffsets.set(hasOffsetRanges.offsetRanges());
>   return rdd;
> });
>
> Unfortunately we've discovered that this doesn't work, as contrary to
> expectations the logic inside of transformToPair() seems to run whenever a
> new batch gets added, even if we're not prepared to process it yet. So
> savedOffsets will store the offsets of the most recently enqueued batch, not
> necessarily the one being processed. When a batch completes, then, the
> offset we save to ZK may reflect enqueued data that we haven't actually
> processed yet. This can (and has) created conditions where a crash causes us
> to restart from the wrong position and drop data.
>
> There seem to be two solutions to this, from what I can tell:
>
> 1.) A brief test using foreachRDD() instead of transform() seems to behave
> more in line with expectations, with the call only being made when a batch
> actually begins to process. I have yet to find an explanation as to why the
> two methods differ in this way.
> 2.) Instead of using an AtomicReference we tried a queue of offsets. Our
> logic pushes a set of offsets at the start of a batch and pulls off the
> oldest at the end - the idea is that the one being pulled will always
> reflect the most recently processed, not one from the queue. Since we're not
> 100% on whether Spark guarantees this we also have logic to assert that the
> batch that was completed has the same RDD ID as the one we're pulling from
> the queue.
>
> However, I have yet to find anything, on this list or elsewhere, that
> suggests that either of these two approaches is necessary. Does what I've
> described match anyone else's experience? Is the behavior I'm seeing from
> the transform() method expected? Do both of the solutions I've proposed seem
> legitimate, or is there some complication that I've failed to account for?
>
> Any help is appreciated.
>
> - Bradley
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Complications-with-saving-Kafka-offsets-tp27324.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming - Direct Approach

2016-07-15 Thread Cody Koeninger
We've been running direct stream jobs in production for over a year,
with uptimes in the range of months.

I'm pretty slammed with work right now, but when I get time to submit
a PR for the 0.10 docs i'll remove the experimental note from 0.8

On Mon, Jul 11, 2016 at 4:35 PM, Tathagata Das
 wrote:
> Aah, the docs have not been updated. They are totally in production in many
> places. Others should chime in as well.
>
> On Mon, Jul 11, 2016 at 1:43 PM, Mail.com  wrote:
>>
>> Hi All,
>>
>> Can someone please confirm if streaming direct approach for reading Kafka
>> is still experimental or can it be used for production use.
>>
>> I see the documentation and talk from TD suggesting the advantages of the
>> approach but docs state it is an "experimental" feature.
>>
>> Please suggest
>>
>> Thanks,
>> Pradeep
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark.executor.cores

2016-07-15 Thread Jean Georges Perrin
lol - young padawan I am and path to knowledge seeking I am...

And on this path I also tried (without luck)...

if (restId == 0) {
conf = conf.setExecutorEnv("spark.executor.cores", 
"22");
} else {
conf = conf.setExecutorEnv("spark.executor.cores", "2");
}

and

if (restId == 0) {
conf.setExecutorEnv("spark.executor.cores", "22");
} else {
conf.setExecutorEnv("spark.executor.cores", "2");
}

The only annoying thing I see is that we designed some of the work to be handled by
the driver/client app, and we will have to rethink the design of the app a bit
for that...
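
For what it's worth, a Scala sketch of one alternative sometimes used when
connecting programmatically to a standalone master: capping the application's
total cores with spark.cores.max before the context is created (the master URL
and values echo this thread and are purely illustrative, not a confirmed fix):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("NC Eatery app")
  .setMaster("spark://10.0.100.120:7077")
  .set("spark.executor.memory", "4g")
  .set("spark.cores.max", "2")        // total cores this application may take on the cluster
val sc = new SparkContext(conf)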


> On Jul 15, 2016, at 11:34 AM, Daniel Darabos 
>  wrote:
> 
> Mich's invocation is for starting a Spark application against an already 
> running Spark standalone cluster. It will not start the cluster for you.
> 
> We used to not use "spark-submit", but we started using it when it solved 
> some problem for us. Perhaps that day has also come for you? :)
> 
> On Fri, Jul 15, 2016 at 5:14 PM, Jean Georges Perrin  > wrote:
> I don't use submit: I start my standalone cluster and connect to it remotely. 
> Is that a bad practice?
> 
> I'd like to be able to it dynamically as the system knows whether it needs 
> more or less resources based on its own  context
> 
>> On Jul 15, 2016, at 10:55 AM, Mich Talebzadeh > > wrote:
>> 
>> Hi,
>> 
>> You can also do all this at env or submit time with spark-submit which I 
>> believe makes it more flexible than coding in.
>> 
>> Example
>> 
>> ${SPARK_HOME}/bin/spark-submit \
>> --packages com.databricks:spark-csv_2.11:1.3.0 \
>> --driver-memory 2G \
>> --num-executors 2 \
>> --executor-cores 3 \
>> --executor-memory 2G \
>> --master spark://50.140.197.217:7077 
>>  \
>> --conf "spark.scheduler.mode=FAIR" \
>> --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails 
>> -XX:+PrintGCTimeStamps" \
>> --jars 
>> /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
>> --class "${FILE_NAME}" \
>> --conf "spark.ui.port=${SP}" \
>>  
>> HTH
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> 
>>  
>> http://talebzadehmich.wordpress.com 
>> 
>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>> loss, damage or destruction of data or any other property which may arise 
>> from relying on this email's technical content is explicitly disclaimed. The 
>> author will in no case be liable for any monetary damages arising from such 
>> loss, damage or destruction.
>>  
>> 
>> On 15 July 2016 at 13:48, Jean Georges Perrin > > wrote:
>> Merci Nihed, this is one of the tests I did :( still not working
>> 
>> 
>> 
>>> On Jul 15, 2016, at 8:41 AM, nihed mbarek >> > wrote:
>>> 
>>> can you try with : 
>>> SparkConf conf = new SparkConf().setAppName("NC Eatery 
>>> app").set("spark.executor.memory", "4g")
>>> .setMaster("spark://10.0.100.120:7077");
>>> if (restId == 0) {
>>> conf = conf.set("spark.executor.cores", "22");
>>> } else {
>>> conf = conf.set("spark.executor.cores", "2");
>>> }
>>> JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>>> 
>>> On Fri, Jul 15, 2016 at 2:31 PM, Jean Georges Perrin >> > wrote:
>>> Hi,
>>> 
>>> Configuration: standalone cluster, Java, Spark 1.6.2, 24 cores
>>> 
>>> My process uses all the cores of my server (good), but I am trying to limit 
>>> it so I can actually submit a second job.
>>> 
>>> I tried
>>> 
>>> SparkConf conf = new SparkConf().setAppName("NC Eatery 
>>> app").set("spark.executor.memory", "4g")
>>> .setMaster("spark://10.0.100.120:7077");
>>> if (restId == 0) {
>>> conf = conf.set("spark.executor.cores", "22");
>>> } else {
>>> conf = conf.set("spark.executor.cores", "2");
>>> }
>>> JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>>> 
>>> and
>>> 
>>> SparkConf conf = new SparkConf().setAppName("NC Eatery 
>>> app").set("spark.executor.memory", "4g")
>>> 

Re: spark.executor.cores

2016-07-15 Thread Daniel Darabos
Mich's invocation is for starting a Spark application against an already
running Spark standalone cluster. It will not start the cluster for you.

We used to not use "spark-submit", but we started using it when it solved
some problem for us. Perhaps that day has also come for you? :)

On Fri, Jul 15, 2016 at 5:14 PM, Jean Georges Perrin  wrote:

> I don't use submit: I start my standalone cluster and connect to it
> remotely. Is that a bad practice?
>
> I'd like to be able to it dynamically as the system knows whether it needs
> more or less resources based on its own  context
>
> On Jul 15, 2016, at 10:55 AM, Mich Talebzadeh 
> wrote:
>
> Hi,
>
> You can also do all this at env or submit time with spark-submit which I
> believe makes it more flexible than coding in.
>
> Example
>
> ${SPARK_HOME}/bin/spark-submit \
> --packages com.databricks:spark-csv_2.11:1.3.0 \
> --driver-memory 2G \
> --num-executors 2 \
> --executor-cores 3 \
> --executor-memory 2G \
> --master spark://50.140.197.217:7077 \
> --conf "spark.scheduler.mode=FAIR" \
> --conf
> "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
> -XX:+PrintGCTimeStamps" \
> --jars
> /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
> --class "${FILE_NAME}" \
> --conf "spark.ui.port=${SP}" \
>
> HTH
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 15 July 2016 at 13:48, Jean Georges Perrin  wrote:
>
>> Merci Nihed, this is one of the tests I did :( still not working
>>
>>
>>
>> On Jul 15, 2016, at 8:41 AM, nihed mbarek  wrote:
>>
>> can you try with :
>> SparkConf conf = new SparkConf().setAppName("NC Eatery app").set(
>> "spark.executor.memory", "4g")
>> .setMaster("spark://10.0.100.120:7077");
>> if (restId == 0) {
>> conf = conf.set("spark.executor.cores", "22");
>> } else {
>> conf = conf.set("spark.executor.cores", "2");
>> }
>> JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>>
>> On Fri, Jul 15, 2016 at 2:31 PM, Jean Georges Perrin  wrote:
>>
>>> Hi,
>>>
>>> Configuration: standalone cluster, Java, Spark 1.6.2, 24 cores
>>>
>>> My process uses all the cores of my server (good), but I am trying to
>>> limit it so I can actually submit a second job.
>>>
>>> I tried
>>>
>>> SparkConf conf = new SparkConf().setAppName("NC Eatery app").set(
>>> "spark.executor.memory", "4g")
>>> .setMaster("spark://10.0.100.120:7077");
>>> if (restId == 0) {
>>> conf = conf.set("spark.executor.cores", "22");
>>> } else {
>>> conf = conf.set("spark.executor.cores", "2");
>>> }
>>> JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>>>
>>> and
>>>
>>> SparkConf conf = new SparkConf().setAppName("NC Eatery app").set(
>>> "spark.executor.memory", "4g")
>>> .setMaster("spark://10.0.100.120:7077");
>>> if (restId == 0) {
>>> conf.set("spark.executor.cores", "22");
>>> } else {
>>> conf.set("spark.executor.cores", "2");
>>> }
>>> JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>>>
>>> but it does not seem to take it. Any hint?
>>>
>>> jg
>>>
>>>
>>>
>>
>>
>> --
>>
>> M'BAREK Med Nihed,
>> Fedora Ambassador, TUNISIA, Northern Africa
>> http://www.nihed.com
>>
>> 
>>
>>
>>
>
>


Re: Maximum Size of Reference Look Up Table in Spark

2016-07-15 Thread Saravanan Subramanian
Hello Jacek,

Have you seen any practical limitation or performance degradation issues while
using more than 10 GB of broadcast cache?

Thanks,
Saravanan S.

On Thursday, 14 July 2016 8:06 PM, Jacek Laskowski  wrote:
 

 Hi,

My understanding is that the maximum size of a broadcast is the
Long.MAX_VALUE (and plus some more since the data is going to be
encoded to save space, esp. for catalyst-driver datasets).

Ad 2. Before the tasks access the broadcast variable it has to be sent
across network that may be too slow to be acceptable.
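
A minimal sketch of the broadcast-lookup pattern under discussion (an existing
SparkContext `sc` and an RDD[String] `events` are assumed; the reference map is
a tiny stand-in, not a 10 GB table):

val reference: Map[String, String] = Map("id-1" -> "enriched-1", "id-2" -> "enriched-2")
val bReference = sc.broadcast(reference)
val enriched = events.map(e => (e, bReference.value.getOrElse(e, "unknown")))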


Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Thu, Jul 14, 2016 at 11:32 PM, Saravanan Subramanian
 wrote:
> Hello All,
>
> I am in the middle of designing real time data enhancement services using
> spark streaming.  As part of this, I have to look up some reference data
> while processing the incoming stream.
>
> I have below questions:
>
> 1) what is the maximum size of look up table / variable can be stored as
> Broadcast variable ()
> 2) What is the impact of cluster performance, if I store a 10GB data in
> broadcast variable
>
> Any suggestions and thoughts are welcome.
>
> Thanks,
> Saravanan S.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



  

Re: spark.executor.cores

2016-07-15 Thread Jean Georges Perrin
I don't use submit: I start my standalone cluster and connect to it remotely. 
Is that a bad practice?

I'd like to be able to do it dynamically, as the system knows whether it needs more
or less resources based on its own context.

> On Jul 15, 2016, at 10:55 AM, Mich Talebzadeh  
> wrote:
> 
> Hi,
> 
> You can also do all this at env or submit time with spark-submit which I 
> believe makes it more flexible than coding in.
> 
> Example
> 
> ${SPARK_HOME}/bin/spark-submit \
> --packages com.databricks:spark-csv_2.11:1.3.0 \
> --driver-memory 2G \
> --num-executors 2 \
> --executor-cores 3 \
> --executor-memory 2G \
> --master spark://50.140.197.217:7077 
>  \
> --conf "spark.scheduler.mode=FAIR" \
> --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails 
> -XX:+PrintGCTimeStamps" \
> --jars 
> /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
> --class "${FILE_NAME}" \
> --conf "spark.ui.port=${SP}" \
>  
> HTH
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> On 15 July 2016 at 13:48, Jean Georges Perrin  > wrote:
> Merci Nihed, this is one of the tests I did :( still not working
> 
> 
> 
>> On Jul 15, 2016, at 8:41 AM, nihed mbarek > > wrote:
>> 
>> can you try with : 
>> SparkConf conf = new SparkConf().setAppName("NC Eatery 
>> app").set("spark.executor.memory", "4g")
>>  .setMaster("spark://10.0.100.120:7077");
>>  if (restId == 0) {
>>  conf = conf.set("spark.executor.cores", "22");
>>  } else {
>>  conf = conf.set("spark.executor.cores", "2");
>>  }
>>  JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>> 
>> On Fri, Jul 15, 2016 at 2:31 PM, Jean Georges Perrin > > wrote:
>> Hi,
>> 
>> Configuration: standalone cluster, Java, Spark 1.6.2, 24 cores
>> 
>> My process uses all the cores of my server (good), but I am trying to limit 
>> it so I can actually submit a second job.
>> 
>> I tried
>> 
>>  SparkConf conf = new SparkConf().setAppName("NC Eatery 
>> app").set("spark.executor.memory", "4g")
>>  .setMaster("spark://10.0.100.120:7077");
>>  if (restId == 0) {
>>  conf = conf.set("spark.executor.cores", "22");
>>  } else {
>>  conf = conf.set("spark.executor.cores", "2");
>>  }
>>  JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>> 
>> and
>> 
>>  SparkConf conf = new SparkConf().setAppName("NC Eatery 
>> app").set("spark.executor.memory", "4g")
>>  .setMaster("spark://10.0.100.120:7077");
>>  if (restId == 0) {
>>  conf.set("spark.executor.cores", "22");
>>  } else {
>>  conf.set("spark.executor.cores", "2");
>>  }
>>  JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>> 
>> but it does not seem to take it. Any hint?
>> 
>> jg
>> 
>> 
>> 
>> 
>> 
>> -- 
>> 
>> M'BAREK Med Nihed,
>> Fedora Ambassador, TUNISIA, Northern Africa
>> http://www.nihed.com 
>> 
>>  
>> 
> 
> 
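
One detail worth checking, offered as a hedged suggestion rather than a confirmed fix: in
standalone mode spark.executor.cores only sizes each executor, while spark.cores.max caps
the total cores an application may take (unlimited by default), which is usually what lets
a second job get scheduled. A minimal Scala sketch; the master URL and the 22/2 split mirror
the thread, everything else is hypothetical.

import org.apache.spark.{SparkConf, SparkContext}

object CoresCapSketch {
  def main(args: Array[String]): Unit = {
    val restId = 0  // hypothetical flag, as in the thread

    val conf = new SparkConf()
      .setAppName("NC Eatery app")
      .setMaster("spark://10.0.100.120:7077")
      .set("spark.executor.memory", "4g")
      // Total cores this application may claim on the standalone cluster.
      .set("spark.cores.max", if (restId == 0) "22" else "2")

    val sc = new SparkContext(conf)
    // ... job ...
    sc.stop()
  }
}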



Re: spark.executor.cores

2016-07-15 Thread Mich Talebzadeh
Hi,

You can also do all this at env or submit time with spark-submit which I
believe makes it more flexible than coding in.

Example

${SPARK_HOME}/bin/spark-submit \
--packages com.databricks:spark-csv_2.11:1.3.0 \
--driver-memory 2G \
--num-executors 2 \
--executor-cores 3 \
--executor-memory 2G \
--master spark://50.140.197.217:7077 \
--conf "spark.scheduler.mode=FAIR" \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps" \
--jars
/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
--class "${FILE_NAME}" \
--conf "spark.ui.port=${SP}" \

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 15 July 2016 at 13:48, Jean Georges Perrin  wrote:

> Merci Nihed, this is one of the tests I did :( still not working
>
>
>
> On Jul 15, 2016, at 8:41 AM, nihed mbarek  wrote:
>
> can you try with :
> SparkConf conf = new SparkConf().setAppName("NC Eatery app").set(
> "spark.executor.memory", "4g")
> .setMaster("spark://10.0.100.120:7077");
> if (restId == 0) {
> conf = conf.set("spark.executor.cores", "22");
> } else {
> conf = conf.set("spark.executor.cores", "2");
> }
> JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>
> On Fri, Jul 15, 2016 at 2:31 PM, Jean Georges Perrin  wrote:
>
>> Hi,
>>
>> Configuration: standalone cluster, Java, Spark 1.6.2, 24 cores
>>
>> My process uses all the cores of my server (good), but I am trying to
>> limit it so I can actually submit a second job.
>>
>> I tried
>>
>> SparkConf conf = new SparkConf().setAppName("NC Eatery app").set(
>> "spark.executor.memory", "4g")
>> .setMaster("spark://10.0.100.120:7077");
>> if (restId == 0) {
>> conf = conf.set("spark.executor.cores", "22");
>> } else {
>> conf = conf.set("spark.executor.cores", "2");
>> }
>> JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>>
>> and
>>
>> SparkConf conf = new SparkConf().setAppName("NC Eatery app").set(
>> "spark.executor.memory", "4g")
>> .setMaster("spark://10.0.100.120:7077");
>> if (restId == 0) {
>> conf.set("spark.executor.cores", "22");
>> } else {
>> conf.set("spark.executor.cores", "2");
>> }
>> JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>>
>> but it does not seem to take it. Any hint?
>>
>> jg
>>
>>
>>
>
>
> --
>
> M'BAREK Med Nihed,
> Fedora Ambassador, TUNISIA, Northern Africa
> http://www.nihed.com
>
> 
>
>
>


Re: repartitionAndSortWithinPartitions HELP

2016-07-15 Thread Koert Kuipers
spark's shuffle mechanism takes care of this kind of optimization
internally when you use the sort-based shuffle (which is the default).

On Thu, Jul 14, 2016 at 2:57 PM, Punit Naik  wrote:

> I meant to say that first we can sort the individual partitions and then
> sort them again by merging. Sort of a divide and conquer mechanism.
> Does sortByKey take care of all this internally?
>
>
> On Fri, Jul 15, 2016 at 12:08 AM, Punit Naik 
> wrote:
>
>> Can we increase the sorting speed of RDD by doing a secondary sort first?
>>
>> On Thu, Jul 14, 2016 at 11:52 PM, Punit Naik 
>> wrote:
>>
>>> Okay. Can't I supply the same partitioner I used for
>>> "repartitionAndSortWithinPartitions" as an argument to "sortByKey"?
>>>
>>> On 14-Jul-2016 11:38 PM, "Koert Kuipers"  wrote:
>>>
 repartitionAndSortWithinPartitions partitions the rdd and sorts within
 each partition. so each partition is fully sorted, but the rdd is not
 sorted.

 sortByKey is basically the same as repartitionAndSortWithinPartitions
 except it uses a range partitioner so that the entire rdd is sorted.
 however since sortByKey uses a different partitioner than
 repartitionAndSortWithinPartitions you do not get much benefit from running
 sortByKey after repartitionAndSortWithinPartitions (because all the data
 will get shuffled again)


 On Thu, Jul 14, 2016 at 1:59 PM, Punit Naik 
 wrote:

> Hi Koert
>
> I have already used "repartitionAndSortWithinPartitions" for secondary
> sorting and it works fine. Just wanted to know whether it will sort the
> entire RDD or not.
>
> On Thu, Jul 14, 2016 at 11:25 PM, Koert Kuipers 
> wrote:
>
>> repartitionAndSortWithinPartit sort by keys, not values per key, so
>> not really secondary sort by itself.
>>
>> for secondary sort also check out:
>> https://github.com/tresata/spark-sorted
>>
>>
>> On Thu, Jul 14, 2016 at 1:09 PM, Punit Naik 
>> wrote:
>>
>>> Hi guys
>>>
>>> In my spark/scala code I am implementing secondary sort. I wanted to
>>> know, when I call the "repartitionAndSortWithinPartitions" method, the
>>> whole (entire) RDD will be sorted or only the individual partitions 
>>> will be
>>> sorted?
>>> If its the latter case, will applying a "sortByKey" after
>>> "repartitionAndSortWithinPartitions" be faster now that the individual
>>> partitions are sorted?
>>>
>>> --
>>> Thank You
>>>
>>> Regards
>>>
>>> Punit Naik
>>>
>>
>>
>
>
> --
> Thank You
>
> Regards
>
> Punit Naik
>


>>
>>
>> --
>> Thank You
>>
>> Regards
>>
>> Punit Naik
>>
>
>
>
> --
> Thank You
>
> Regards
>
> Punit Naik
>


Re: repartitionAndSortWithinPartitions HELP

2016-07-15 Thread Koert Kuipers
sortByKey needs to use a range partitioner, a very particular partitioner,
so you cannot supply your own partitioner.

you should not have to shuffle twice to do a secondary sort algo


On Thu, Jul 14, 2016 at 2:22 PM, Punit Naik  wrote:

> Okay. Can't I supply the same partitioner I used for
> "repartitionAndSortWithinPartitions" as an argument to "sortByKey"?
>
> On 14-Jul-2016 11:38 PM, "Koert Kuipers"  wrote:
>
>> repartitionAndSortWithinPartitions partitions the rdd and sorts within
>> each partition. so each partition is fully sorted, but the rdd is not
>> sorted.
>>
>> sortByKey is basically the same as repartitionAndSortWithinPartitions
>> except it uses a range partitioner so that the entire rdd is sorted.
>> however since sortByKey uses a different partitioner than
>> repartitionAndSortWithinPartitions you do not get much benefit from running
>> sortByKey after repartitionAndSortWithinPartitions (because all the data
>> will get shuffled again)
>>
>>
>> On Thu, Jul 14, 2016 at 1:59 PM, Punit Naik 
>> wrote:
>>
>>> Hi Koert
>>>
>>> I have already used "repartitionAndSortWithinPartitions" for secondary
>>> sorting and it works fine. Just wanted to know whether it will sort the
>>> entire RDD or not.
>>>
>>> On Thu, Jul 14, 2016 at 11:25 PM, Koert Kuipers 
>>> wrote:
>>>
 repartitionAndSortWithinPartit sort by keys, not values per key, so not
 really secondary sort by itself.

 for secondary sort also check out:
 https://github.com/tresata/spark-sorted


 On Thu, Jul 14, 2016 at 1:09 PM, Punit Naik 
 wrote:

> Hi guys
>
> In my spark/scala code I am implementing secondary sort. I wanted to
> know, when I call the "repartitionAndSortWithinPartitions" method, the
> whole (entire) RDD will be sorted or only the individual partitions will 
> be
> sorted?
> If its the latter case, will applying a "sortByKey" after
> "repartitionAndSortWithinPartitions" be faster now that the individual
> partitions are sorted?
>
> --
> Thank You
>
> Regards
>
> Punit Naik
>


>>>
>>>
>>> --
>>> Thank You
>>>
>>> Regards
>>>
>>> Punit Naik
>>>
>>
>>
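
A minimal sketch of the single-shuffle secondary sort described above: put the sort field
into a composite key, partition on the primary key only, and let
repartitionAndSortWithinPartitions order each partition by the full key. All names and the
toy data are hypothetical.

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

// Partition on the primary key only, so every record for a user lands in the
// same partition while the composite key still drives the within-partition sort.
class PrimaryKeyPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case (primary, _) =>
      val h = primary.hashCode % partitions
      if (h < 0) h + partitions else h
  }
}

object SecondarySortSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("secondary-sort-sketch"))

    // ((user, timestamp), event): hypothetical data
    val events = sc.parallelize(Seq(
      (("alice", 3L), "c"), (("alice", 1L), "a"), (("bob", 2L), "b")))

    // One shuffle: partitioned by user, each partition sorted by (user, timestamp).
    val sorted = events.repartitionAndSortWithinPartitions(new PrimaryKeyPartitioner(4))

    sorted.collect().foreach(println)
    sc.stop()
  }
}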


How to verify in Spark 1.6.x usage, User Memory used after Cache table

2016-07-15 Thread Yogesh Rajak
Hi Team,

I am using HDP 2.4 Sandbox for checking Spark 1.6 memory feature. I have
connected to spark using spark thrift server through Squirrel (JDBC Client)
and executed the CACHE command to cache the hive table. Command execution is
successful and SQL is returning data in less than a second. But I did not
find any way to check whether Spark is using User Memory or not. 

Please let me know if we can verify the scenario.   

Thanks,
Yogesh 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-verify-in-Spark-1-6-x-usage-User-Memory-used-after-Cache-table-tp27343.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Error starting HiveServer2: could not start ThriftBinaryCLIService

2016-07-15 Thread ram kumar
Hi all,

I started Hive Thrift Server with command,

/sbin/start-thriftserver.sh --master yarn -hiveconf
hive.server2.thrift.port 10003

The Thrift server started at the particular node without any error.


When doing the same, except pointing to different node to start the server,

./sbin/start-thriftserver.sh --master yarn --hiveconf
hive.server2.thrift.bind.host
DIFFERENT_NODE_IP --hiveconf hive.server2.thrift.port 10003

I am getting following error,

16/07/15 13:04:35 INFO service.AbstractService:
Service:ThriftBinaryCLIService is started.
16/07/15 13:04:35 INFO service.AbstractService: Service:HiveServer2 is
started.
16/07/15 13:04:35 INFO thriftserver.HiveThriftServer2: HiveThriftServer2
started
16/07/15 13:04:36 ERROR thrift.ThriftCLIService: Error:
org.apache.thrift.transport.TTransportException: Could not create
ServerSocket on address DIFFERENT_NODE_IP:10003.
at
org.apache.thrift.transport.TServerSocket.(TServerSocket.java:93)
at
org.apache.thrift.transport.TServerSocket.(TServerSocket.java:79)
at
org.apache.hive.service.auth.HiveAuthFactory.getServerSocket(HiveAuthFactory.java:236)
at
org.apache.hive.service.cli.thrift.ThriftBinaryCLIService.run(ThriftBinaryCLIService.java:69)
at java.lang.Thread.run(Thread.java:745)

Can anyone help me with this?

Thanks,
Ram.


Re: spark.executor.cores

2016-07-15 Thread Jean Georges Perrin
Merci Nihed, this is one of the tests I did :( still not working



> On Jul 15, 2016, at 8:41 AM, nihed mbarek  wrote:
> 
> can you try with : 
> SparkConf conf = new SparkConf().setAppName("NC Eatery 
> app").set("spark.executor.memory", "4g")
>   .setMaster("spark://10.0.100.120:7077");
>   if (restId == 0) {
>   conf = conf.set("spark.executor.cores", "22");
>   } else {
>   conf = conf.set("spark.executor.cores", "2");
>   }
>   JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
> 
> On Fri, Jul 15, 2016 at 2:31 PM, Jean Georges Perrin wrote:
> Hi,
> 
> Configuration: standalone cluster, Java, Spark 1.6.2, 24 cores
> 
> My process uses all the cores of my server (good), but I am trying to limit 
> it so I can actually submit a second job.
> 
> I tried
> 
>   SparkConf conf = new SparkConf().setAppName("NC Eatery 
> app").set("spark.executor.memory", "4g")
>   .setMaster("spark://10.0.100.120:7077");
>   if (restId == 0) {
>   conf = conf.set("spark.executor.cores", "22");
>   } else {
>   conf = conf.set("spark.executor.cores", "2");
>   }
>   JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
> 
> and
> 
>   SparkConf conf = new SparkConf().setAppName("NC Eatery 
> app").set("spark.executor.memory", "4g")
>   .setMaster("spark://10.0.100.120:7077");
>   if (restId == 0) {
>   conf.set("spark.executor.cores", "22");
>   } else {
>   conf.set("spark.executor.cores", "2");
>   }
>   JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
> 
> but it does not seem to take it. Any hint?
> 
> jg
> 
> 
> 
> 
> 
> -- 
> 
> M'BAREK Med Nihed,
> Fedora Ambassador, TUNISIA, Northern Africa
> http://www.nihed.com 
> 
>  
> 



XML

2016-07-15 Thread VON RUEDEN, Jonathan
Hi everyone,

I want to read an XML file with multiple attributes per tag and would need some 
help. I am able to read and process the sample files but can't find a solution 
for my XML.
Here's the file structure:


   
   
  https://githudoc.doc.doc.doc.docm.md; 
severity="Info" />
  https://github.wdf.sap.coath.oath/pathpath.nmd; severity="Info" />
   



--> Is there any way I can have com.databricks.spark.xml write all the 
attributes into one cell as a string, so I can come up with my own way of splitting 
and transforming this into a table? Or do you know how I can read in such a 
file?
thanks much,
best,
jonathan



Jonathan von Rüden
Enterprise Analytics

SAP France | Paris
Mobile: +33 68 221-2425
Email: jonathan.von.rue...@sap.com
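
Not a confirmed answer, just a sketch of how spark-xml normally exposes attributes: each
attribute of the chosen row tag becomes its own column, prefixed with "_" by default,
which may already be close to what is needed. The row tag name and path below are
hypothetical, since the original XML did not survive the archive intact, and the spark-xml
package must be on the classpath (e.g. via --packages).

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object XmlAttributesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("xml-attributes-sketch"))
    val sqlContext = new SQLContext(sc)

    val df = sqlContext.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "issue")            // hypothetical tag name
      .option("attributePrefix", "_")       // default; shown for clarity
      .load("hdfs:///data/report.xml")      // hypothetical path

    df.printSchema()   // attribute columns appear as _url, _severity, ...
    df.show(false)

    sc.stop()
  }
}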



Re: spark.executor.cores

2016-07-15 Thread nihed mbarek
can you try with :
SparkConf conf = new SparkConf().setAppName("NC Eatery app").set(
"spark.executor.memory", "4g")
.setMaster("spark://10.0.100.120:7077");
if (restId == 0) {
conf = conf.set("spark.executor.cores", "22");
} else {
conf = conf.set("spark.executor.cores", "2");
}
JavaSparkContext javaSparkContext = new JavaSparkContext(conf);

On Fri, Jul 15, 2016 at 2:31 PM, Jean Georges Perrin  wrote:

> Hi,
>
> Configuration: standalone cluster, Java, Spark 1.6.2, 24 cores
>
> My process uses all the cores of my server (good), but I am trying to
> limit it so I can actually submit a second job.
>
> I tried
>
> SparkConf conf = new SparkConf().setAppName("NC Eatery app").set(
> "spark.executor.memory", "4g")
> .setMaster("spark://10.0.100.120:7077");
> if (restId == 0) {
> conf = conf.set("spark.executor.cores", "22");
> } else {
> conf = conf.set("spark.executor.cores", "2");
> }
> JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>
> and
>
> SparkConf conf = new SparkConf().setAppName("NC Eatery app").set(
> "spark.executor.memory", "4g")
> .setMaster("spark://10.0.100.120:7077");
> if (restId == 0) {
> conf.set("spark.executor.cores", "22");
> } else {
> conf.set("spark.executor.cores", "2");
> }
> JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>
> but it does not seem to take it. Any hint?
>
> jg
>
>
>


-- 

M'BAREK Med Nihed,
Fedora Ambassador, TUNISIA, Northern Africa
http://www.nihed.com




spark.executor.cores

2016-07-15 Thread Jean Georges Perrin
Hi,

Configuration: standalone cluster, Java, Spark 1.6.2, 24 cores

My process uses all the cores of my server (good), but I am trying to limit it 
so I can actually submit a second job.

I tried

SparkConf conf = new SparkConf().setAppName("NC Eatery 
app").set("spark.executor.memory", "4g")
.setMaster("spark://10.0.100.120:7077");
if (restId == 0) {
conf = conf.set("spark.executor.cores", "22");
} else {
conf = conf.set("spark.executor.cores", "2");
}
JavaSparkContext javaSparkContext = new JavaSparkContext(conf);

and

SparkConf conf = new SparkConf().setAppName("NC Eatery 
app").set("spark.executor.memory", "4g")
.setMaster("spark://10.0.100.120:7077");
if (restId == 0) {
conf.set("spark.executor.cores", "22");
} else {
conf.set("spark.executor.cores", "2");
}
JavaSparkContext javaSparkContext = new JavaSparkContext(conf);

but it does not seem to take it. Any hint?

jg




Random Forest generate model failed (DecisionTree.scala:642), which has no missing parents

2016-07-15 Thread Ascot Moss
Hi,

I am trying to create a Random Forest model; my source code is as follows:
val rf_model  = RandomForest.trainClassifier(trainData, 7, Map[Int,Int](),
20, "auto", "entropy", 30, 300)


I got following error:

##

16/07/15 19:55:04 INFO TaskSchedulerImpl: Removed TaskSet 21.0, whose tasks
have all completed, from pool

16/07/15 19:55:04 INFO DAGScheduler: ShuffleMapStage 21 (mapPartitions at
DecisionTree.scala:622) finished in 2.685 s

16/07/15 19:55:04 INFO DAGScheduler: looking for newly runnable stages

16/07/15 19:55:04 INFO DAGScheduler: running: Set()

16/07/15 19:55:04 INFO DAGScheduler: waiting: Set(ResultStage 22)

16/07/15 19:55:04 INFO DAGScheduler: failed: Set()

16/07/15 19:55:04 INFO DAGScheduler: Submitting ResultStage 22
(MapPartitionsRDD[43] at map at DecisionTree.scala:642), which has no
missing parents

Killed

##


Any idea what is wrong?

Regards


Random Forest Job got killed (DAGScheduler: failed: Set() , DecisionTree.scala:642), which has no missing parents)

2016-07-15 Thread Ascot Moss
Hi,

I am trying to create a Random Forest model; my source code is as follows:
val rf_model  = edhRF.trainClassifier(trainData, 7, Map[Int,Int](), 20,
"auto", "entropy", 30, 300)


I got following error:

##

16/07/15 19:55:04 INFO TaskSchedulerImpl: Removed TaskSet 21.0, whose tasks
have all completed, from pool

16/07/15 19:55:04 INFO DAGScheduler: ShuffleMapStage 21 (mapPartitions at
DecisionTree.scala:622) finished in 2.685 s

16/07/15 19:55:04 INFO DAGScheduler: looking for newly runnable stages

16/07/15 19:55:04 INFO DAGScheduler: running: Set()

16/07/15 19:55:04 INFO DAGScheduler: waiting: Set(ResultStage 22)

16/07/15 19:55:04 INFO DAGScheduler: failed: Set()

16/07/15 19:55:04 INFO DAGScheduler: Submitting ResultStage 22
(MapPartitionsRDD[43] at map at DecisionTree.scala:642), which has no
missing parents

Killed

##


Any idea what is wrong?

Regards


Re: How to recommend most similar users using Spark ML

2016-07-15 Thread nguyen duc Tuan
Hi jeremycod,
If you want to find the top-N nearest neighbors for all users using an exact
top-k algorithm, I recommend using the same approach as used in MLlib:
https://github.com/apache/spark/blob/85d6b0db9f5bd425c36482ffcb1c3b9fd0fcdb31/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala#L272

If the number of users is large, the exact top-k algorithm can be rather slow;
try using an approximate nearest neighbors algorithm. There is a good
benchmark of various libraries here:
https://github.com/erikbern/ann-benchmarks

2016-07-15 10:36 GMT+07:00 jeremycod :

> Hi,
>
> I need to develop a service that will recommend user with other similar
> users that he can connect to. For each user I have a data about user
> preferences for specific items in the form:
>
> user, item, preference
> 1,75,   0.89
> 2,168,  0.478
> 2,99,   0.321
> 3,31,   0.012
>
> So far, I implemented approach using cosine similarity that compare one
> user
> features vector with other users:
>
> def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double=
> {
> vec1.dot(vec2)/(vec1.norm2()*vec2.norm2())
> }
> def user2usersimilarity(userid:Integer, recNumber:Integer): Unit ={
> val userFactor=model.userFeatures.lookup(userid).head
> val userVector=new DoubleMatrix(userFactor)
> val s1=cosineSimilarity(userVector,userVector)
> val sims=model.userFeatures.map{case(id,factor)=>
> val factorVector=new DoubleMatrix(factor)
> val sim=cosineSimilarity(factorVector, userVector)
> (id,sim)
> }
> val sortedSims=sims.top(recNumber+1)(Ordering.by[(Int, Double),Double]
> {case(id, similarity)=>similarity})
> println(sortedSims.slice(1,recNumber+1).mkString("\n"))
>  }
>
> This approach works fine with the MovieLens dataset in terms of quality of
> recommendations. However, my concern is related to performance of such
> algorithm. Since I have to generate recommendations for all users in the
> system, with this approach I would compare each user with all other users
> in
> the system.
>
> I would appreciate if somebody could suggest how to limit comparison of the
> user to top N neighbors, or some other algorithm that would work better in
> my use case.
>
> Thanks,
> Zoran
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-recommend-most-similar-users-using-Spark-ML-tp27342.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>
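
A hedged sketch of the exact approach for a moderate number of users: broadcast the user
factor matrix and compute the top-N cosine neighbours for every user in one pass. This
assumes the collected factors fit in executor memory; for very large user bases the
approximate-nearest-neighbour libraries in the benchmark above scale better. Names below
are hypothetical.

import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

object TopNSimilarUsersSketch {
  def cosine(a: Array[Double], b: Array[Double]): Double = {
    val dot = a.zip(b).map { case (x, y) => x * y }.sum
    val na  = math.sqrt(a.map(x => x * x).sum)
    val nb  = math.sqrt(b.map(x => x * x).sum)
    if (na == 0 || nb == 0) 0.0 else dot / (na * nb)
  }

  def topNSimilar(sc: SparkContext, model: MatrixFactorizationModel, n: Int) = {
    val factors   = model.userFeatures                 // RDD[(Int, Array[Double])]
    val bcFactors = sc.broadcast(factors.collect())    // assumes this fits in memory

    factors.map { case (userId, vec) =>
      val neighbours = bcFactors.value
        .filter(_._1 != userId)
        .map { case (otherId, otherVec) => (otherId, cosine(vec, otherVec)) }
        .sortBy(-_._2)
        .take(n)
      (userId, neighbours)
    }
  }
}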


Re: Input path does not exist error in giving input file for word count program

2016-07-15 Thread Ted Yu
From examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala:

val lines = ssc.textFileStream(args(0))
val words = lines.flatMap(_.split(" "))

In your case, it looks like inputfile didn't correspond to an existing path.

On Fri, Jul 15, 2016 at 1:05 AM, RK Spark  wrote:

> val count = inputfile.flatMap(line => line.split(" ")).map(word =>
> (word,1)).reduceByKey(_ + _);
> org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
>
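
For completeness, a small sketch contrasting the two: ssc.textFileStream watches a directory
for files that appear after the stream starts, while the batch word count in this thread only
needs sc.textFile pointed at a path that actually exists (fully qualified if in doubt). The
paths below are hypothetical.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountPathsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("word-count-paths-sketch"))

    // Batch: the path must exist when the action runs.
    val batchCounts = sc.textFile("hdfs:///user/demo/input.txt")
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    batchCounts.take(10).foreach(println)

    // Streaming: the argument is a directory, and only files created in it
    // after the stream starts are picked up.
    val ssc = new StreamingContext(sc, Seconds(10))
    val streamCounts = ssc.textFileStream("hdfs:///user/demo/incoming/")
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    streamCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}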


RE: Presentation in London: Running Spark on Hive or Hive on Spark

2016-07-15 Thread Joaquin Alzola
It is on the 20th (Wednesday) next week.

From: Marco Mistroni [mailto:mmistr...@gmail.com]
Sent: 15 July 2016 11:04
To: Mich Talebzadeh 
Cc: user @spark ; user 
Subject: Re: Presentation in London: Running Spark on Hive or Hive on Spark

Dr Mich
  do you have any slides or videos available for the presentation you did 
@Canary Wharf?
kindest regards
 marco

On Wed, Jul 6, 2016 at 10:37 PM, Mich Talebzadeh wrote:
Dear forum members


I will be presenting on the topic of "Running Spark on Hive or Hive on Spark, 
your mileage varies" in Future of Data: 
London

Details

Organized by: Hortonworks

Date: Wednesday, July 20, 2016, 6:00 PM to 8:30 PM

Place: London

Location: One Canada Square, Canary Wharf,  London E14 5AB.

Nearest Underground:  Canary Wharf 
(map)

If you are interested please register here.

Looking forward to seeing those who can make it to have an interesting 
discussion and leverage your experience.
Regards,


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



This email is confidential and may be subject to privilege. If you are not the 
intended recipient, please do not copy or disclose its content but contact the 
sender immediately upon receipt.


Re: Presentation in London: Running Spark on Hive or Hive on Spark

2016-07-15 Thread Marco Mistroni
Dr Mich
  do you have any slides or videos available for the presentation you did
@Canary Wharf?
kindest regards
 marco

On Wed, Jul 6, 2016 at 10:37 PM, Mich Talebzadeh 
wrote:

> Dear forum members
>
> I will be presenting on the topic of "Running Spark on Hive or Hive on
> Spark, your mileage varies" in Future of Data: London
> 
>
> *Details*
>
> *Organized by: Hortonworks *
>
> *Date: Wednesday, July 20, 2016, 6:00 PM to 8:30 PM *
>
> *Place: London*
>
> *Location: One Canada Square, Canary Wharf,  London E14 5AB.*
>
> *Nearest Underground:  Canary Wharf (map
> )
> *
>
> If you are interested please register here
> 
>
> Looking forward to seeing those who can make it to have an interesting
> discussion and leverage your experience.
> Regards,
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Call http request from within Spark

2016-07-15 Thread ayan guha
Can you explain what you mean by "count never stops"?
On 15 Jul 2016 00:53, "Amit Dutta"  wrote:

> Hi All,
>
>
> I have a requirement to call a rest service url for 300k customer ids.
>
> Things I have tried so far is
>
>
> custid_rdd = sc.textFile('file:Users/zzz/CustomerID_SC/Inactive User
> Hashed LCID List.csv') #getting all the customer ids and building adds
>
> profile_rdd = custid_rdd.map(lambda r: getProfile(r.split(',')[0]))
>
> profile_rdd.count()
>
>
> #getprofile is the method to do the http call
>
> def getProfile(cust_id):
>
> api_key = 'txt'
>
> api_secret = 'yuyuy'
>
> profile_uri = 'https://profile.localytics.com/x1/customers/{}'
>
> customer_id = cust_id
>
>
> if customer_id is not None:
>
> data = requests.get(profile_uri.format(customer_id),
> auth=requests.auth.HTTPBasicAuth(api_key, api_secret))
>
> # print json.dumps(data.json(), indent=4)
>
> return data
>
>
> when I print the json dump of the data i see it returning results from the
> rest call. But the count never stops.
>
>
> Is there an efficient way of dealing this? Some post says we have to
> define a batch size etc but don't know how.
>
>
> Appreciate your help
>
>
> Regards,
>
> Amit
>
>
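
The original code is PySpark, but the usual pattern is language-independent, so here is a
hedged Scala sketch of it: do the per-record HTTP call inside mapPartitions so per-partition
setup (auth header, timeouts) happens once, repartition so the 300k calls are spread across
executors, and cache before counting so the calls are not repeated by later actions. The
endpoint, credentials and paths are hypothetical; most likely the count in the thread is not
hanging, just slow, because each call is made sequentially.

import java.net.{HttpURLConnection, URL}
import java.util.Base64
import scala.io.Source
import org.apache.spark.{SparkConf, SparkContext}

object RestLookupSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rest-lookup-sketch"))

    val custIds = sc.textFile("hdfs:///data/customer_ids.csv")   // hypothetical path
      .map(_.split(",")(0))
      .repartition(100)                                          // spread the calls out

    val profiles = custIds.mapPartitions { ids =>
      // Per-partition setup runs once; each record reuses it inside the iterator.
      val auth = "Basic " + Base64.getEncoder.encodeToString("api_key:api_secret".getBytes("UTF-8"))
      ids.map { id =>
        val conn = new URL(s"https://profile.example.com/x1/customers/$id")  // hypothetical endpoint
          .openConnection().asInstanceOf[HttpURLConnection]
        conn.setRequestProperty("Authorization", auth)
        conn.setConnectTimeout(5000)
        conn.setReadTimeout(5000)
        val body = try Source.fromInputStream(conn.getInputStream).mkString
                   finally conn.disconnect()
        (id, body)
      }
    }

    profiles.cache()                    // avoid re-issuing the calls on later actions
    println(s"fetched ${profiles.count()} profiles")
    sc.stop()
  }
}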


Re: scala.MatchError on stand-alone cluster mode

2016-07-15 Thread Saisai Shao
The error stack trace is thrown from your code:

Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class
[Ljava.lang.String;)
at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)

I think you should debug the code yourself, it may not be the problem of
Spark.
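
For what it is worth, the MatchError is thrown while matching an Array[String] at
LogAggregator.scala:29, which in the code below looks like the val Array(zkQuorum, ...) = args
destructuring; that pattern only matches exactly nine arguments, so one argument more or fewer
in cluster mode would fail exactly like this. A purely illustrative sketch of a more defensive
parse, not the poster's actual code:

object ArgsCheckSketch {
  def main(args: Array[String]): Unit = {
    args match {
      case Array(zkQuorum, group, topics, numThreads, logFormat,
                 logSeparator, batchDuration, destType, destPath) =>
        // Build the StreamingContext here, as in the original job.
        println(s"parsed 9 arguments, e.g. zkQuorum=$zkQuorum destPath=$destPath")
      case other =>
        System.err.println(s"Expected 9 arguments but got ${other.length}")
        System.exit(1)
    }
  }
}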

On Fri, Jul 15, 2016 at 3:17 PM, Mekal Zheng  wrote:

> Hi,
>
> I have a Spark Streaming job written in Scala and is running well on local
> and client mode, but when I submit it on cluster mode, the driver reported
> an error shown as below.
> Is there anyone know what is wrong here?
> pls help me!
>
> the Job CODE is after
>
> 16/07/14 17:28:21 DEBUG ByteBufUtil:
> -Dio.netty.threadLocalDirectBufferSize: 65536
> 16/07/14 17:28:21 DEBUG NetUtil: Loopback interface: lo (lo,
> 0:0:0:0:0:0:0:1%lo)
> 16/07/14 17:28:21 DEBUG NetUtil: /proc/sys/net/core/somaxconn: 32768
> 16/07/14 17:28:21 DEBUG TransportServer: Shuffle server started on port
> :43492
> 16/07/14 17:28:21 INFO Utils: Successfully started service 'Driver' on
> port 43492.
> 16/07/14 17:28:21 INFO WorkerWatcher: Connecting to worker spark://
> Worker@172.20.130.98:23933
> 16/07/14 17:28:21 DEBUG TransportClientFactory: Creating new connection to
> /172.20.130.98:23933
> Exception in thread "main" java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
> at
> org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
> Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class
> [Ljava.lang.String;)
> at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
> at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
> ... 6 more
>
> ==
> Job CODE:
>
> object LogAggregator {
>
>   val batchDuration = Seconds(5)
>
>   def main(args:Array[String]) {
>
> val usage =
>   """Usage: LogAggregator 
> 
> |  logFormat: fieldName:fieldRole[,fieldName:fieldRole] each field 
> must have both name and role
> |  logFormat.role: can be key|avg|enum|sum|ignore
>   """.stripMargin
>
> if (args.length < 9) {
>   System.err.println(usage)
>   System.exit(1)
> }
>
> val Array(zkQuorum, group, topics, numThreads, logFormat, logSeparator, 
> batchDuration, destType, destPath) = args
>
> println("Start streaming calculation...")
>
> val conf = new SparkConf().setAppName("LBHaproxy-LogAggregator")
> val ssc = new StreamingContext(conf, Seconds(batchDuration.toInt))
>
> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>
> val lines = KafkaUtils.createStream(ssc, zkQuorum, group, 
> topicMap).map(_._2)
>
> val logFields = logFormat.split(",").map(field => {
>   val fld = field.split(":")
>   if (fld.size != 2) {
> System.err.println("Wrong parameters for logFormat!\n")
> System.err.println(usage)
> System.exit(1)
>   }
>   // TODO: ensure the field has both 'name' and 'role'
>   new LogField(fld(0), fld(1))
> })
>
> val keyFields = logFields.filter(logFieldName => {
>   logFieldName.role == "key"
> })
> val keys = keyFields.map(key => {
>   key.name
> })
>
> val logsByKey = lines.map(line => {
>   val log = new Log(logFields, line, logSeparator)
>   log.toMap
> }).filter(log => log.nonEmpty).map(log => {
>   val keys = keyFields.map(logField => {
> log(logField.name).value
>   })
>
>   val key = keys.reduce((key1, key2) => {
> key1.asInstanceOf[String] + key2.asInstanceOf[String]
>   })
>
>   val fullLog = log + ("count" -> new LogSegment("sum", 1))
>
>   (key, fullLog)
> })
>
>
> val aggResults = logsByKey.reduceByKey((log_a, log_b) => {
>
>   log_a.map(logField => {
> val logFieldName = logField._1
> val logSegment_a = logField._2
> val logSegment_b = log_b(logFieldName)
>
> val segValue = logSegment_a.role match {
>   case "avg" => {
> logSegment_a.value.toString.toInt + 
> logSegment_b.value.toString.toInt
>   }
>   case "sum" => {
> logSegment_a.value.toString.toInt + 
> logSegment_b.value.toString.toInt
>   }
>   case "enum" => {
> val list_a = logSegment_a.value.asInstanceOf[List[(String, Int)]]
> val list_b = logSegment_b.value.asInstanceOf[List[(String, Int)]]
> list_a ++ list_b
>   }
>   case _ => logSegment_a.value
> }
> 

Input path does not exist error in giving input file for word count program

2016-07-15 Thread RK Spark
val count = inputfile.flatMap(line => line.split(" ")).map(word =>
(word,1)).reduceByKey(_ + _);
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:


Re: Getting error in inputfile | inputFile

2016-07-15 Thread RK Spark
scala> val count = inputfile.flatMap(line => line.split((" ").map(word => (word,1)).reduceByKey(_ + _)
     | 
     | 
You typed two blank lines.  Starting a new command.

This is what I get; how do I solve it?

Regrads,

Ramkrishna KT


Re: Getting error in inputfile | inputFile

2016-07-15 Thread ram kumar
check the "*inputFile*" variable name
lol

On Fri, Jul 15, 2016 at 12:12 PM, RK Spark  wrote:

> I am using Spark version is 1.5.1, I am getting errors in first program of
> spark,ie.e., word count. Please help me to solve this
>
> *scala> val inputfile = sc.textFile("input.txt")*
> *inputfile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at
> textFile at :21*
>
> *scala> val counts = inputFile.flatMap(line => line.split(" ")).map(word
> => (word,1)).reduceByKey(_ + _);*
> *:19: error: not found: value inputFile*
> *   val counts = inputFile.flatMap(line => line.split(" ")).map(word
> => (word,1)).reduceByKey(_ + _);*
> *^*
>
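
Putting the two fixes from this thread together, a corrected spark-shell session, assuming
the same input.txt: the variable must be referenced with the exact case it was declared with
(inputfile, not inputFile), and the follow-up attempt also had an extra opening parenthesis
in line.split((" "), which is what kept the REPL waiting for more input.

val inputfile = sc.textFile("input.txt")
val counts = inputfile.flatMap(line => line.split(" "))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)
counts.take(10).foreach(println)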
>


scala.MatchError on stand-alone cluster mode

2016-07-15 Thread Mekal Zheng
Hi,

I have a Spark Streaming job written in Scala that runs well in local
and client mode, but when I submit it in cluster mode, the driver reports
the error shown below.
Does anyone know what is wrong here?
Please help me!

The job code follows the stack trace.

16/07/14 17:28:21 DEBUG ByteBufUtil:
-Dio.netty.threadLocalDirectBufferSize: 65536
16/07/14 17:28:21 DEBUG NetUtil: Loopback interface: lo (lo,
0:0:0:0:0:0:0:1%lo)
16/07/14 17:28:21 DEBUG NetUtil: /proc/sys/net/core/somaxconn: 32768
16/07/14 17:28:21 DEBUG TransportServer: Shuffle server started on port
:43492
16/07/14 17:28:21 INFO Utils: Successfully started service 'Driver' on port
43492.
16/07/14 17:28:21 INFO WorkerWatcher: Connecting to worker spark://
Worker@172.20.130.98:23933
16/07/14 17:28:21 DEBUG TransportClientFactory: Creating new connection to /
172.20.130.98:23933
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
at
org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class
[Ljava.lang.String;)
at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
... 6 more

==
Job CODE:

object LogAggregator {

  val batchDuration = Seconds(5)

  def main(args:Array[String]) {

val usage =
  """Usage: LogAggregator

|  logFormat: fieldName:fieldRole[,fieldName:fieldRole] each
field must have both name and role
|  logFormat.role: can be key|avg|enum|sum|ignore
  """.stripMargin

if (args.length < 9) {
  System.err.println(usage)
  System.exit(1)
}

val Array(zkQuorum, group, topics, numThreads, logFormat,
logSeparator, batchDuration, destType, destPath) = args

println("Start streaming calculation...")

val conf = new SparkConf().setAppName("LBHaproxy-LogAggregator")
val ssc = new StreamingContext(conf, Seconds(batchDuration.toInt))

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
topicMap).map(_._2)

val logFields = logFormat.split(",").map(field => {
  val fld = field.split(":")
  if (fld.size != 2) {
System.err.println("Wrong parameters for logFormat!\n")
System.err.println(usage)
System.exit(1)
  }
  // TODO: ensure the field has both 'name' and 'role'
  new LogField(fld(0), fld(1))
})

val keyFields = logFields.filter(logFieldName => {
  logFieldName.role == "key"
})
val keys = keyFields.map(key => {
  key.name
})

val logsByKey = lines.map(line => {
  val log = new Log(logFields, line, logSeparator)
  log.toMap
}).filter(log => log.nonEmpty).map(log => {
  val keys = keyFields.map(logField => {
log(logField.name).value
  })

  val key = keys.reduce((key1, key2) => {
key1.asInstanceOf[String] + key2.asInstanceOf[String]
  })

  val fullLog = log + ("count" -> new LogSegment("sum", 1))

  (key, fullLog)
})


val aggResults = logsByKey.reduceByKey((log_a, log_b) => {

  log_a.map(logField => {
val logFieldName = logField._1
val logSegment_a = logField._2
val logSegment_b = log_b(logFieldName)

val segValue = logSegment_a.role match {
  case "avg" => {
logSegment_a.value.toString.toInt +
logSegment_b.value.toString.toInt
  }
  case "sum" => {
logSegment_a.value.toString.toInt +
logSegment_b.value.toString.toInt
  }
  case "enum" => {
val list_a = logSegment_a.value.asInstanceOf[List[(String, Int)]]
val list_b = logSegment_b.value.asInstanceOf[List[(String, Int)]]
list_a ++ list_b
  }
  case _ => logSegment_a.value
}
(logFieldName, new LogSegment(logSegment_a.role, segValue))
  })
}).map(logRecord => {
  val log = logRecord._2
  val count = log("count").value.toString.toInt


  val logContent = log.map(logField => {
val logFieldName = logField._1
val logSegment = logField._2
val fieldValue = logSegment.role match {
  case "avg" => {
logSegment.value.toString.toInt / count
  }
  case "enum" => {
val enumList = logSegment.value.asInstanceOf[List[(String, Int)]]
val enumJson = enumList.groupBy(_._1).map(el =>
el._2.reduce((e1, e2) => (e1._1, e1._2.toString.toInt +
e2._2.toString.toInt)))
   

Getting error in inputfile | inputFile

2016-07-15 Thread RK Spark
I am using Spark version 1.5.1 and I am getting errors in my first Spark
program, i.e., word count. Please help me solve this.

*scala> val inputfile = sc.textFile("input.txt")*
*inputfile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at
textFile at :21*

*scala> val counts = inputFile.flatMap(line => line.split(" ")).map(word =>
(word,1)).reduceByKey(_ + _);*
*:19: error: not found: value inputFile*
*   val counts = inputFile.flatMap(line => line.split(" ")).map(word =>
(word,1)).reduceByKey(_ + _);*
*^*


find two consective points

2016-07-15 Thread Divya Gehlot
Hi,
I have a huge data set similar to the sample below:
timestamp,fieldid,point_id
1468564189,89,1
1468564090,76,4
1468304090,89,9
1468304090,54,6
1468304090,54,4


I have a configuration file of consecutive point pairs:
1,9
4,6


Here 1 and 9 are consecutive points, and similarly 4 and 6 are consecutive points.

Now I need to group the data on fieldid, keeping only the consecutive points,
so that the sample output looks like:
89, 1,4
54,4,6

Can somebody help me do this in Spark?


Thanks,
Divya
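
A hedged sketch of one way to do this, assuming the intent is to emit, per fieldid, each
configured pair whose two points both occur for that fieldid (so, given the sample data and
the (1,9) and (4,6) configuration, fieldid 89 yields 1 and 9, and fieldid 54 yields 4 and 6;
the sample output above reads slightly differently, so this interpretation is an assumption).
The configured pairs are broadcast and checked against the grouped points:

import org.apache.spark.{SparkConf, SparkContext}

object ConsecutivePointsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("consecutive-points-sketch"))

    // (timestamp, fieldid, point_id) rows, as in the sample data
    val rows = sc.parallelize(Seq(
      (1468564189L, 89, 1), (1468564090L, 76, 4), (1468304090L, 89, 9),
      (1468304090L, 54, 6), (1468304090L, 54, 4)))

    // Configured consecutive pairs, broadcast to the executors
    val pairs = sc.broadcast(Set((1, 9), (4, 6)))

    val result = rows
      .map { case (_, fieldId, pointId) => (fieldId, pointId) }
      .groupByKey()
      .flatMap { case (fieldId, points) =>
        val pointSet = points.toSet
        pairs.value.collect {
          case (a, b) if pointSet.contains(a) && pointSet.contains(b) =>
            (fieldId, a, b)
        }
      }

    result.collect().foreach(println)   // expected: (89,1,9) and (54,4,6)
    sc.stop()
  }
}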