Re: Is Branch 1.0 build broken ?

2014-04-10 Thread Sean Owen
The error is not about the build itself but about an external repo. This almost
always means you are having trouble accessing some of the repos from your
environment. Do you need proxy settings? Are there any other errors in the log
about why you can't access it?
On Apr 11, 2014 12:32 AM, "Chester Chen"  wrote:

> I just updated and got the following:
>
>
> [error] (external-mqtt/*:update) sbt.ResolveException: unresolved
> dependency: org.eclipse.paho#mqtt-client;0.4.0: not found
> [error] Total time: 7 s, completed Apr 10, 2014 4:27:09 PM
> Chesters-MacBook-Pro:spark chester$ git branch
> * branch-1.0
>   master
>
> Looks like the resolver for the "mqtt-client" dependency is not specified.
>
> Chester
>


Re: programmatic way to tell Spark version

2014-04-10 Thread Shixiong Zhu
Hi Patrick,

You should use classOf[SparkContext].getPackage.getImplementationVersion

classOf[SparkContext].getClass.getPackage.getImplementationVersion returns
the package version of java.lang.Class -- that is, the JVM version.
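
The extra .getClass is the trap: it moves the question from SparkContext up to java.lang.Class, whose implementation version belongs to the JVM. A rough Python analogy of that one-level-too-far lookup (illustrative only; the stand-in class below is not the real pyspark one):

```python
# Stand-in for the real SparkContext; used only to illustrate the lookup.
class SparkContext:
    pass

# Asking about the class itself (like classOf[SparkContext].getPackage):
about_class = SparkContext.__module__            # the module that defines SparkContext

# Asking one level too far up (like classOf[SparkContext].getClass.getPackage):
about_metaclass = type(SparkContext).__module__  # the interpreter's own 'builtins'

print(about_class)      # the module we actually asked about
print(about_metaclass)  # builtins -- information about Python itself, not SparkContext
```

The same off-by-one happens in any reflective API: one hop inspects the class you care about, a second hop inspects the runtime's own machinery.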



Best Regards,
Shixiong Zhu


2014-04-11 9:06 GMT+08:00 Nicholas Chammas :

> Looks like it. I'm guessing this didn't make the cut for 0.9.1, and will
> instead be included with 1.0.0.
>
> So would you access it just by calling sc.version from the shell? And will
> this automatically make it into the Python API?
>
> I'll mark the JIRA issue as resolved.
>
>
> On Thu, Apr 10, 2014 at 5:05 PM, Patrick Wendell wrote:
>
>> I think this was solved in a recent merge:
>>
>>
>> https://github.com/apache/spark/pull/204/files#diff-364713d7776956cb8b0a771e9b62f82dR779
>>
>> Is that what you are looking for? If so, mind marking the JIRA as
>> resolved?
>>
>>
>> On Wed, Apr 9, 2014 at 3:30 PM, Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> Hey Patrick,
>>>
>>> I've created SPARK-1458 to
>>> track this request, in case the team/community wants to implement it in the
>>> future.
>>>
>>> Nick
>>>
>>>
>>> On Sat, Feb 22, 2014 at 7:25 PM, Nicholas Chammas <
>>> nicholas.cham...@gmail.com> wrote:
>>>
 No use case at the moment.

 What prompted the question: I was going to ask a different question on
 this list and wanted to note my version of Spark. I assumed there would be
 a getVersion method on SparkContext or something like that, but I couldn't
 find one in the docs. I also couldn't find an environment variable with the
 version. After futzing around a bit I realized it was printed out (quite
 conspicuously) in the shell startup banner.


 On Sat, Feb 22, 2014 at 7:15 PM, Patrick Wendell wrote:

> AFAIK - We don't have any way to do this right now. Maybe we could add
> a getVersion method to SparkContext that would tell you. Just
> wondering - what is the use case here?
>
> - Patrick
>
> On Sat, Feb 22, 2014 at 4:04 PM, nicholas.chammas
>  wrote:
> > Is there a programmatic way to tell what version of Spark I'm
> running?
> >
> > I know I can look at the banner when the Spark shell starts up, but
> I'm
> > curious to know if there's another way.
> >
> > Nick
> >
> >
> > 
> > View this message in context: programmatic way to tell Spark version
> > Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
>


>>>
>>
>


Re: programmatic way to tell Spark version

2014-04-10 Thread Nicholas Chammas
Looks like it. I'm guessing this didn't make the cut for 0.9.1, and will
instead be included with 1.0.0.

So would you access it just by calling sc.version from the shell? And will
this automatically make it into the Python API?

I'll mark the JIRA issue as resolved.


On Thu, Apr 10, 2014 at 5:05 PM, Patrick Wendell  wrote:

> I think this was solved in a recent merge:
>
>
> https://github.com/apache/spark/pull/204/files#diff-364713d7776956cb8b0a771e9b62f82dR779
>
> Is that what you are looking for? If so, mind marking the JIRA as resolved?
>
>
> On Wed, Apr 9, 2014 at 3:30 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> Hey Patrick,
>>
>> I've created SPARK-1458 to
>> track this request, in case the team/community wants to implement it in the
>> future.
>>
>> Nick
>>
>>
>> On Sat, Feb 22, 2014 at 7:25 PM, Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> No use case at the moment.
>>>
>>> What prompted the question: I was going to ask a different question on
>>> this list and wanted to note my version of Spark. I assumed there would be
>>> a getVersion method on SparkContext or something like that, but I couldn't
>>> find one in the docs. I also couldn't find an environment variable with the
>>> version. After futzing around a bit I realized it was printed out (quite
>>> conspicuously) in the shell startup banner.
>>>
>>>
>>> On Sat, Feb 22, 2014 at 7:15 PM, Patrick Wendell wrote:
>>>
 AFAIK - We don't have any way to do this right now. Maybe we could add
 a getVersion method to SparkContext that would tell you. Just
 wondering - what is the use case here?

 - Patrick

 On Sat, Feb 22, 2014 at 4:04 PM, nicholas.chammas
  wrote:
 > Is there a programmatic way to tell what version of Spark I'm running?
 >
 > I know I can look at the banner when the Spark shell starts up, but
 I'm
 > curious to know if there's another way.
 >
 > Nick
 >
 >
 > 
 > View this message in context: programmatic way to tell Spark version
 > Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

>>>
>>>
>>
>


Re: Spark 0.9.1 PySpark ImportError

2014-04-10 Thread Matei Zaharia
Kind of strange because we haven’t updated CloudPickle AFAIK. Is this a package 
you added on the PYTHONPATH? How did you set the path, was it in 
conf/spark-env.sh?

Matei

On Apr 10, 2014, at 7:39 AM, aazout  wrote:

> I am getting a python ImportError on Spark standalone cluster. I have set the
> PYTHONPATH on both worker and slave and the package imports properly when I
> run PySpark command line on both machines. This only happens with Master -
> Slave communication. Here is the error below: 
> 
> 14/04/10 13:40:19 INFO scheduler.TaskSetManager: Loss was due to
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last): 
>  File "/root/spark/python/pyspark/worker.py", line 73, in main 
>command = pickleSer._read_with_length(infile) 
>  File "/root/spark/python/pyspark/serializers.py", line 137, in
> _read_with_length 
>return self.loads(obj) 
>  File "/root/spark/python/pyspark/cloudpickle.py", line 810, in subimport 
>__import__(name) 
> ImportError: ('No module named volatility.atm_impl_vol', <function subimport at 0xa36050>, ('volatility.atm_impl_vol',)) 
> 
> Any ideas?
> 
> 
> 
> -
> CEO / Velos (velos.io)
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-0-9-1-PySpark-ImportError-tp4068.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
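
To narrow down where the import breaks, a small diagnostic can be run under whichever interpreter and PYTHONPATH each machine actually uses (a modern-Python sketch; volatility.atm_impl_vol is the poster's own module and will normally be absent):

```python
import importlib.util
import sys

def can_import(name):
    """Return True if 'name' resolves on this interpreter's import path."""
    try:
        return importlib.util.find_spec(name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package of a dotted name is missing.
        return False

print(sys.executable)                         # which Python is actually running
print(can_import("json"))                     # stdlib should always be visible
print(can_import("volatility.atm_impl_vol"))  # the module the traceback complains about
```

Running this on the master and on each worker (with the same environment the executors get) shows quickly whether the path difference is between machines or between shells.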



Is Branch 1.0 build broken ?

2014-04-10 Thread Chester Chen
I just updated and got the following: 


[error] (external-mqtt/*:update) sbt.ResolveException: unresolved dependency: 
org.eclipse.paho#mqtt-client;0.4.0: not found
[error] Total time: 7 s, completed Apr 10, 2014 4:27:09 PM
Chesters-MacBook-Pro:spark chester$ git branch
* branch-1.0
  master

Looks like the resolver for the "mqtt-client" dependency is not specified. 

Chester
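
If it does turn out that the build is missing a resolver (rather than the repository being unreachable), the shape of the fix in sbt would be something like the sketch below. The repository name and URL are assumptions for illustration, not taken from the Spark build:

```scala
// Hedged sketch: point sbt at a repository that hosts the Paho artifacts so
// org.eclipse.paho#mqtt-client;0.4.0 can resolve. The URL is an assumption.
resolvers += "Eclipse Paho Releases" at
  "https://repo.eclipse.org/content/repositories/paho-releases/"
```

Note that a proxy or network problem produces the same "not found" error, so checking connectivity to the repositories is the cheaper first step.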

Re: SparkR with Sequence Files

2014-04-10 Thread Shivaram Venkataraman
SparkR doesn't support reading SequenceFiles yet. It is an often-requested
feature, though, and we are working on it. Note that supporting sequence
files is tricky; although there were discussions[1], this isn't supported
in PySpark either.

Thanks
Shivaram

[1]
http://mail-archives.apache.org/mod_mbox/spark-dev/201310.mbox/%3ccald+6go4b2t+w6cspxutmjxteipyjzw0fd-1dshtotqtwt9...@mail.gmail.com%3E


On Thu, Apr 10, 2014 at 3:37 PM, Gary Malouf  wrote:

> Has anyone been using SparkR to work with data from sequence files?  We
> use protobuf throughout our system and are considering whether to try out
> SparkR.
>


SparkR with Sequence Files

2014-04-10 Thread Gary Malouf
Has anyone been using SparkR to work with data from sequence files?  We use
protobuf throughout our system and are considering whether to try out
SparkR.


Re: Using pyspark shell in local[n] (single machine) mode unnecessarily tries to connect to HDFS NameNode ...

2014-04-10 Thread Aaron Davidson
This is likely because HDFS's core-site.xml (or something similar) provides
an "fs.default.name" which changes the default FileSystem, and Spark uses
the Hadoop FileSystem API to resolve paths. Anyway, your solution is
definitely a good one -- another would be to remove hdfs from Spark's
classpath if you didn't want it, or to specify an overriding
fs.default.name.


On Thu, Apr 10, 2014 at 2:30 PM, didata.us  wrote:

>  Hi:
>
> I believe I figured out the behavior here:
>
> A file specified to SparkContext like this '*/path/to/some/file*':
>
>- Will be interpreted as '*hdfs://*path/to/some/file', when settings
>for HDFS are present in '*/etc/hadoop/conf/*-site.xml*'.
>- Will be interpreted as '*file:///*path/to/some/file', (i.e. locally)
>otherwise.
>
> I confirmed this behavior by temporarily doing this:
>
>- user$ sudo mv /etc/hadoop/conf /etc/hadoop/conf_
>
> after which I re-ran my commands below. This time the SparkContext did,
> indeed, look for and found, the file locally.
>
> In summary, '*/path/to/some/file*' is interpreted as an in-HDFS relative
> path when an HDFS configuration is found, and interpreted as an absolute
> local UNIX file path when an HDFS configuration is *not* found.
>
> To be on the safe side, it's probably best to qualify local files with
> '*file:///*' when that is what's intended, and with 'hdfs://' when HDFS is
> what's intended.
>
> Hope this helps someone. :)
>
> ---
> Sincerely,
> Noel M. Vega
> DiData
> www.didata.us
>
>  On 2014-04-10 14:53, DiData wrote:
>
> Hi Alton:
>
> Thanks for the reply. I just wanted to build/use it from scratch to get a
> better intuition of what's happening.
>
> Btw, using the binaries provided by Cloudera/CDH5 yielded the same issue
> as my compiled version (i.e. it, too,
> tried to access the HDFS / Name Node. Same exact error).
>
> However, a small breakthrough. Just now I tinkered some more and found
> that this variation works:
>
> REPLACE THIS: >>> distData = sc.textFile('/home/user/Download/ml-10M100K/ratings.dat')
> WITH THIS:    >>> distData = sc.textFile('file:///home/user/Download/ml-10M100K/ratings.dat')
>
> That is, use 'file:///'. I don't know if that is the correct way of specifying
> the URI for local files, or whether this just *happens to work*. The documents
> that I've read thus far haven't shown it specified that way, but I still have
> more to read.
>
>   =:)
>
>
> Thank you,
>
> ~NMV
>
>
> On 04/10/2014 04:20 PM, Alton Alexander wrote:
>
> I am doing the exact same thing for the purpose of learning. I also
> don't have a hadoop cluster and plan to scale on ec2 as soon as I get
> it working locally.
>
> I am having good success just using the binaries on and not compiling
> from source... Is there a reason why you aren't just using the
> binaries?
>
> On Thu, Apr 10, 2014 at 1:30 PM, DiData  
>  wrote:
>
>  Hello friends:
>
> I recently compiled and installed Spark v0.9 from the Apache distribution.
>
> Note: I have the Cloudera/CDH5 Spark RPMs co-installed as well (actually,
> the
> entire big-data suite from CDH is installed), but for the moment I'm using
> my
> manually built Apache Spark for 'ground-up' learning purposes.
>
> Now, prior to compilation (i.e. 'sbt/sbt clean compile') I specified the
> following:
>
>   export SPARK_YARN=true
>   export SPARK_HADOOP_VERSION=2.3.0-cdh5.0.0
>
> The resulting examples ran fine locally as well as on YARN.
>
> I'm not interested in YARN here; just mentioning it for completeness in case
> that matters in
> my upcoming question. Here is my issue / question:
>
> I start pyspark locally -- on one machine for API learning purposes -- as
> shown below, and attempt to
> interact with a local text file (not in HDFS). Unfortunately, the
> SparkContext (sc) tries to connect to
> a HDFS Name Node (which I don't currently have enabled because I don't need
> it).
>
> The SparkContext cleverly inspects the configurations in my
> '/etc/hadoop/conf/' directory to learn
> where my Name Node is, however I don't want it to do that in this case. I
> just want it to run a
> one-machine local version of 'pyspark'.
>
> Did I miss something in my invocation/use of 'pyspark' below? Do I need to
> add something else?
>
> (Btw: I searched but could not find any solutions, and the documentation,
> while good, doesn't
> quite get me there).
>
> See below, and thank you all in advance!
>
>
> user$ export PYSPARK_PYTHON=/usr/bin/bpython
> user$ export MASTER=local[8]
> user$ /home/user/APPS.d/SPARK.d/latest/bin/pyspark
>   #
> ===
>   >>> sc
>   
>   >>>
>   >>> distData = sc.textFile('/home/user/Download/ml-10M100K/ratings.dat')
>   >>> distData.count()
>   [ ... snip ... ]
>   Py4JJavaError: An error occurred while calling o21.collect.
>   : java.net.ConnectException: Call From server01/192.168.0.15 to
> namenode:8020 failed on connection exception:
> java.net.ConnectException: 

Re: programmatic way to tell Spark version

2014-04-10 Thread Patrick Wendell
Pierre - I'm not sure that would work. I just opened a Spark shell and did
this:

scala> classOf[SparkContext].getClass.getPackage.getImplementationVersion
res4: String = 1.7.0_25

It looks like this is the JVM version.

- Patrick


On Thu, Apr 10, 2014 at 2:08 PM, Pierre Borckmans <
pierre.borckm...@realimpactanalytics.com> wrote:

> I see that this was fixed using a fixed string in SparkContext.scala.
> Wouldn't it be better to use something like:
>
> getClass.getPackage.getImplementationVersion
>
> to get the version from the jar manifest (and thus from the sbt
> definition)?
>
> The same holds for SparkILoopInit.scala in the welcome message
> (printWelcome).
>
> This would avoid having to modify these strings at each release.
>
> cheers
>
>
>
> *Pierre Borckmans*
>
> *Real**Impact* Analytics *| *Brussels Office
> www.realimpactanalytics.com *| 
> *pierre.borckm...@realimpactanalytics.com
>
> *FR *+32 485 91 87 31 *| **Skype* pierre.borckmans
>
>
>
>
>
> On 10 Apr 2014, at 23:05, Patrick Wendell  wrote:
>
> I think this was solved in a recent merge:
>
>
> https://github.com/apache/spark/pull/204/files#diff-364713d7776956cb8b0a771e9b62f82dR779
>
> Is that what you are looking for? If so, mind marking the JIRA as resolved?
>
>
> On Wed, Apr 9, 2014 at 3:30 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> Hey Patrick,
>>
>> I've created SPARK-1458 to
>> track this request, in case the team/community wants to implement it in the
>> future.
>>
>> Nick
>>
>>
>> On Sat, Feb 22, 2014 at 7:25 PM, Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> No use case at the moment.
>>>
>>> What prompted the question: I was going to ask a different question on
>>> this list and wanted to note my version of Spark. I assumed there would be
>>> a getVersion method on SparkContext or something like that, but I couldn't
>>> find one in the docs. I also couldn't find an environment variable with the
>>> version. After futzing around a bit I realized it was printed out (quite
>>> conspicuously) in the shell startup banner.
>>>
>>>
>>> On Sat, Feb 22, 2014 at 7:15 PM, Patrick Wendell wrote:
>>>
 AFAIK - We don't have any way to do this right now. Maybe we could add
 a getVersion method to SparkContext that would tell you. Just
 wondering - what is the use case here?

 - Patrick

 On Sat, Feb 22, 2014 at 4:04 PM, nicholas.chammas
  wrote:
 > Is there a programmatic way to tell what version of Spark I'm running?
 >
 > I know I can look at the banner when the Spark shell starts up, but
 I'm
 > curious to know if there's another way.
 >
 > Nick
 >
 >
 > 
 > View this message in context: programmatic way to tell Spark version
 > Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

>>>
>>>
>>
>
>


Re: Using pyspark shell in local[n] (single machine) mode unnecessarily tries to connect to HDFS NameNode ...

2014-04-10 Thread didata.us
 

Hi: 

I believe I figured out the behavior here: 

A file specified to SparkContext like this '/path/to/some/file': 

* Will be interpreted as 'hdfs://path/to/some/file' when settings for
HDFS are present in '/etc/hadoop/conf/*-site.xml'.
* Will be interpreted as 'file:///path/to/some/file' (i.e. locally)
otherwise.

I confirmed this behavior by temporarily doing this: 

* user$ sudo mv /etc/hadoop/conf /etc/hadoop/conf_

after which I re-ran my commands below. This time the SparkContext did,
indeed, look for and found, the file locally. 

In summary, '/path/to/some/file' is interpreted as an in-HDFS relative
path when an HDFS configuration is found, and interpreted as an absolute
local UNIX file path when an HDFS configuration is *not* found. 

To be on the safe side, it's probably best to qualify local files with
'file:///' when that is what's intended, and with 'hdfs://' when HDFS is
what's intended. 
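
The resolution rule above can be sketched as a tiny stand-alone function (illustrative only; the real logic lives in Hadoop's FileSystem and Path classes, driven by fs.default.name in core-site.xml):

```python
from urllib.parse import urlparse

def qualify(path, default_fs="file://"):
    """Illustrative sketch of Hadoop-style path qualification: an explicit
    scheme is kept; an unqualified path inherits the default filesystem
    (what fs.default.name controls)."""
    if urlparse(path).scheme:
        return path            # already qualified, e.g. hdfs:// or file://
    return default_fs + path   # unqualified: prepend the default filesystem

# No Hadoop config found -> local filesystem
print(qualify("/home/user/ratings.dat"))                          # file:///home/user/ratings.dat
# core-site.xml points at a NameNode -> the same path now means HDFS
print(qualify("/home/user/ratings.dat", "hdfs://namenode:8020"))  # hdfs://namenode:8020/home/user/ratings.dat
# An explicit scheme always wins
print(qualify("file:///home/user/ratings.dat"))                   # file:///home/user/ratings.dat
```

This is why the same sc.textFile() call behaves differently depending on whether '/etc/hadoop/conf' is visible: only the default changes, never an explicitly qualified URI.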

Hope this helps someone. :) 

---
Sincerely,
Noel M. Vega
DiData
www.didata.us

On 2014-04-10 14:53, DiData wrote: 

> Hi Alton:
> 
> Thanks for the reply. I just wanted to build/use it from scratch to get a 
> better intuition of what's happening.
> 
> Btw, using the binaries provided by Cloudera/CDH5 yielded the same issue as 
> my compiled version (i.e. it, too,
> tried to access the HDFS / Name Node. Same exact error).
> 
> However, a small breakthrough. Just now I tinkered some more and found that 
> this variation works:
> 
> REPLACE THIS: >>> distData = sc.textFile('/home/user/Download/ml-10M100K/ratings.dat')
> WITH THIS:    >>> distData = sc.textFile('file:///home/user/Download/ml-10M100K/ratings.dat')
>
> That is, use 'file:///'. I don't know if that is the correct way of specifying
> the URI for local files, or whether this just *happens to work*. The documents
> that I've read thus far haven't shown it specified that way, but I still have
> more to read.
> 
> =:)
> 
> Thank you, 
> 
> ~NMV
> 
> On 04/10/2014 04:20 PM, Alton Alexander wrote: 
> 
> I am doing the exact same thing for the purpose of learning. I also
> don't have a hadoop cluster and plan to scale on ec2 as soon as I get
> it working locally.
> 
> I am having good success just using the binaries on and not compiling
> from source... Is there a reason why you aren't just using the
> binaries?
> 
> On Thu, Apr 10, 2014 at 1:30 PM, DiData  wrote:
> 
> Hello friends:
> 
> I recently compiled and installed Spark v0.9 from the Apache distribution.
> 
> Note: I have the Cloudera/CDH5 Spark RPMs co-installed as well (actually,
> the
> entire big-data suite from CDH is installed), but for the moment I'm using
> my
> manually built Apache Spark for 'ground-up' learning purposes.
> 
> Now, prior to compilation (i.e. 'sbt/sbt clean compile') I specified the
> following:
> 
> export SPARK_YARN=true
> export SPARK_HADOOP_VERSION=2.3.0-cdh5.0.0
> 
> The resulting examples ran fine locally as well as on YARN.
> 
> I'm not interested in YARN here; just mentioning it for completeness in case
> that matters in
> my upcoming question. Here is my issue / question:
> 
> I start pyspark locally -- on one machine for API learning purposes -- as
> shown below, and attempt to
> interact with a local text file (not in HDFS). Unfortunately, the
> SparkContext (sc) tries to connect to
> a HDFS Name Node (which I don't currently have enabled because I don't need
> it).
> 
> The SparkContext cleverly inspects the configurations in my
> '/etc/hadoop/conf/' directory to learn
> where my Name Node is, however I don't want it to do that in this case. I
> just want it to run a
> one-machine local version of 'pyspark'.
> 
> Did I miss something in my invocation/use of 'pyspark' below? Do I need to
> add something else?
> 
> (Btw: I searched but could not find any solutions, and the documentation,
> while good, doesn't
> quite get me there).
> 
> See below, and thank you all in advance!
> 
> user$ export PYSPARK_PYTHON=/usr/bin/bpython
> user$ export MASTER=local[8]
> user$ /home/user/APPS.d/SPARK.d/latest/bin/pyspark
> #
> ===
> >>> sc
>
> >>> distData = sc.textFile('/home/user/Download/ml-10M100K/ratings.dat')
> >>> distData.count()
> [ ... snip ... ]
> Py4JJavaError: An error occurred while calling o21.collect.
> : java.net.ConnectException: Call From server01/192.168.0.15 to
> namenode:8020 failed on connection exception:
> java.net.ConnectException: Connection refused; For more details see:
> http://wiki.apache.org/hadoop/ConnectionRefused [1]
> [ ... snip ... ]


> #
> ===
> 
> --
> Sincerely,
> DiData

-- 
Sincerely,
DiData
 

Links:
--
[1] http://wiki.apache.org/hadoop/ConnectionRefused
[2] file:///


Error specifying Kafka params from Java

2014-04-10 Thread Paul Mogren
Hi all,



I get the following exception when trying to build a Kafka input DStream with 
custom properties from Java. I am wondering if it's a problem with the Java to 
Scala binding - I am at a loss for what I could be doing wrong.



14/04/10 16:46:28 ERROR NetworkInputTracker: De-registered receiver for network 
stream 0 with message java.lang.NoSuchMethodException: 
java.lang.Object.<init>(kafka.utils.VerifiableProperties)



Where java.lang.Object is referenced in the error, I expect it should be using 
kafka.serializer.StringDecoder.



Here is my invocation:



  Map<String, String> kafkaParams = new HashMap<>();
  kafkaParams.put("zookeeper.connect", kafkaZooKeepers);
  kafkaParams.put("zookeeper.connection.timeout.ms", "1");
  kafkaParams.put("group.id", kafkaConsumerGroup);
  kafkaParams.put("auto.offset.reset", "smallest");

  // The Spark processing stages
  JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc,
      String.class, String.class, StringDecoder.class, StringDecoder.class,
      kafkaParams, kafkaTopicMap, StorageLevel.MEMORY_AND_DISK_SER_2());





Thanks for any insight,

Paul.





P.S. I would like to see a method in KafkaUtils that accepts kafkaParams 
without requiring to specify the four data/decoder types.





Re: programmatic way to tell Spark version

2014-04-10 Thread Pierre Borckmans
I see that this was fixed using a fixed string in SparkContext.scala.
Wouldn’t it be better to use something like:

getClass.getPackage.getImplementationVersion

to get the version from the jar manifest (and thus from the sbt definition)?

The same holds for SparkILoopInit.scala in the welcome message (printWelcome).

This would avoid having to modify these strings at each release.

cheers



Pierre Borckmans

RealImpact Analytics | Brussels Office
www.realimpactanalytics.com | pierre.borckm...@realimpactanalytics.com

FR +32 485 91 87 31 | Skype pierre.borckmans





On 10 Apr 2014, at 23:05, Patrick Wendell  wrote:

> I think this was solved in a recent merge:
> 
> https://github.com/apache/spark/pull/204/files#diff-364713d7776956cb8b0a771e9b62f82dR779
> 
> Is that what you are looking for? If so, mind marking the JIRA as resolved?
> 
> 
> On Wed, Apr 9, 2014 at 3:30 PM, Nicholas Chammas  
> wrote:
> Hey Patrick, 
> 
> I've created SPARK-1458 to track this request, in case the team/community 
> wants to implement it in the future.
> 
> Nick
> 
> 
> On Sat, Feb 22, 2014 at 7:25 PM, Nicholas Chammas 
>  wrote:
> No use case at the moment.
> 
> What prompted the question: I was going to ask a different question on this 
> list and wanted to note my version of Spark. I assumed there would be a 
> getVersion method on SparkContext or something like that, but I couldn't find 
> one in the docs. I also couldn't find an environment variable with the 
> version. After futzing around a bit I realized it was printed out (quite 
> conspicuously) in the shell startup banner.
> 
> 
> On Sat, Feb 22, 2014 at 7:15 PM, Patrick Wendell  wrote:
> AFAIK - We don't have any way to do this right now. Maybe we could add
> a getVersion method to SparkContext that would tell you. Just
> wondering - what is the use case here?
> 
> - Patrick
> 
> On Sat, Feb 22, 2014 at 4:04 PM, nicholas.chammas
>  wrote:
> > Is there a programmatic way to tell what version of Spark I'm running?
> >
> > I know I can look at the banner when the Spark shell starts up, but I'm
> > curious to know if there's another way.
> >
> > Nick
> >
> >
> > 
> > View this message in context: programmatic way to tell Spark version
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> 
> 



Re: programmatic way to tell Spark version

2014-04-10 Thread Patrick Wendell
I think this was solved in a recent merge:

https://github.com/apache/spark/pull/204/files#diff-364713d7776956cb8b0a771e9b62f82dR779

Is that what you are looking for? If so, mind marking the JIRA as resolved?


On Wed, Apr 9, 2014 at 3:30 PM, Nicholas Chammas  wrote:

> Hey Patrick,
>
> I've created SPARK-1458  to
> track this request, in case the team/community wants to implement it in the
> future.
>
> Nick
>
>
> On Sat, Feb 22, 2014 at 7:25 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> No use case at the moment.
>>
>> What prompted the question: I was going to ask a different question on
>> this list and wanted to note my version of Spark. I assumed there would be
>> a getVersion method on SparkContext or something like that, but I couldn't
>> find one in the docs. I also couldn't find an environment variable with the
>> version. After futzing around a bit I realized it was printed out (quite
>> conspicuously) in the shell startup banner.
>>
>>
>> On Sat, Feb 22, 2014 at 7:15 PM, Patrick Wendell wrote:
>>
>>> AFAIK - We don't have any way to do this right now. Maybe we could add
>>> a getVersion method to SparkContext that would tell you. Just
>>> wondering - what is the use case here?
>>>
>>> - Patrick
>>>
>>> On Sat, Feb 22, 2014 at 4:04 PM, nicholas.chammas
>>>  wrote:
>>> > Is there a programmatic way to tell what version of Spark I'm running?
>>> >
>>> > I know I can look at the banner when the Spark shell starts up, but I'm
>>> > curious to know if there's another way.
>>> >
>>> > Nick
>>> >
>>> >
>>> > 
>>> > View this message in context: programmatic way to tell Spark version
>>> > Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>>
>>
>>
>


Re: hbase scan performance

2014-04-10 Thread Patrick Wendell
This job might still be faster... in MapReduce there will be other
overheads in addition to the fact that doing sequential reads from HBase is
slow. But it's possible the bottleneck is the HBase scan performance.

- Patrick


On Wed, Apr 9, 2014 at 10:10 AM, Jerry Lam  wrote:

> Hi Dave,
>
> This is the HBase solution to the poor scan performance issue:
> https://issues.apache.org/jira/browse/HBASE-8369
>
> I encountered the same issue before.
> To the best of my knowledge, this is not a MapReduce issue. It is an HBase
> issue. If you are planning to swap out MapReduce and replace it with Spark,
> I don't think you can get a lot of performance from scanning HBase unless
> you are caching the results from HBase in Spark and reusing them over and
> over.
>
> HTH,
>
> Jerry
>
>
> On Wed, Apr 9, 2014 at 12:02 PM, David Quigley wrote:
>
>> Hi all,
>>
>> We are currently using hbase to store user data and periodically doing a
>> full scan to aggregate data. The reason we use hbase is that we need a
>> single user's data to be contiguous, so as user data comes in, we need the
>> ability to update a random access store.
>>
>> The performance of a full hbase scan with MapReduce is frustratingly
>> slow, despite implementing recommended optimizations. I see that it is
>> possible to scan hbase with Spark, but am not familiar with how Spark
>> interfaces with hbase. Would you expect the scan to perform similarly if
>> used as a Spark input as it would as a MapReduce input?
>>
>> Thanks,
>> Dave
>>
>
>


Error specifying Kafka params from Java

2014-04-10 Thread Paul Mogren
Hi all,

I get the following exception when trying to build a Kafka input DStream with 
custom properties from Java. I am wondering if it's a problem with the Java to 
Scala binding - I am at a loss for what I could be doing wrong.

14/04/10 16:46:28 ERROR NetworkInputTracker: De-registered receiver for network 
stream 0 with message java.lang.NoSuchMethodException: 
java.lang.Object.<init>(kafka.utils.VerifiableProperties)

Where java.lang.Object is referenced in the error, I expect it should be using 
kafka.serializer.StringDecoder.

Here is my invocation:

  Map<String, String> kafkaParams = new HashMap<>();
  kafkaParams.put("zookeeper.connect", kafkaZooKeepers);
  kafkaParams.put("zookeeper.connection.timeout.ms", "1");
  kafkaParams.put("group.id", kafkaConsumerGroup);
  kafkaParams.put("auto.offset.reset", "smallest");

  // The Spark processing stages
  JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc,
      String.class, String.class, StringDecoder.class, StringDecoder.class,
      kafkaParams, kafkaTopicMap, StorageLevel.MEMORY_AND_DISK_SER_2());


Thanks for any insight,
Paul.


P.S. I would like to see a method in KafkaUtils that accepts kafkaParams 
without requiring to specify the four data/decoder types.



Re: Using pyspark shell in local[n] (single machine) mode unnecessarily tries to connect to HDFS NameNode ...

2014-04-10 Thread DiData

Hi Alton:

Thanks for the reply. I just wanted to build/use it from scratch to get 
a better intuition of what's happening.


Btw, using the binaries provided by Cloudera/CDH5 yielded the same issue 
as my compiled version (i.e. it, too,

tried to access the HDFS / Name Node. Same exact error).

However, a small breakthrough. Just now I tinkered some more and found 
that this variation works:


   REPLACE THIS: >>> distData = sc.textFile('/home/user/Download/ml-10M100K/ratings.dat')
   WITH THIS:    >>> distData = sc.textFile('file:///home/user/Download/ml-10M100K/ratings.dat')

That is, use 'file:///'. I don't know if that is the correct way of specifying
the URI for local files, or whether this just *happens to work*. The documents
that I've read thus far haven't shown it specified that way, but I still have
more to read.   =:)

Thank you,
~NMV


On 04/10/2014 04:20 PM, Alton Alexander wrote:

I am doing the exact same thing for the purpose of learning. I also
don't have a hadoop cluster and plan to scale on ec2 as soon as I get
it working locally.

I am having good success just using the binaries on and not compiling
from source... Is there a reason why you aren't just using the
binaries?

On Thu, Apr 10, 2014 at 1:30 PM, DiData  wrote:

Hello friends:

I recently compiled and installed Spark v0.9 from the Apache distribution.

Note: I have the Cloudera/CDH5 Spark RPMs co-installed as well (actually,
the
entire big-data suite from CDH is installed), but for the moment I'm using
my
manually built Apache Spark for 'ground-up' learning purposes.

Now, prior to compilation (i.e. 'sbt/sbt clean compile') I specified the
following:

   export SPARK_YARN=true
   export SPARK_HADOOP_VERSION=2.3.0-cdh5.0.0

The resulting examples ran fine locally as well as on YARN.

I'm not interested in YARN here; just mentioning it for completeness in case
that matters in
my upcoming question. Here is my issue / question:

I start pyspark locally -- on one machine for API learning purposes -- as
shown below, and attempt to
interact with a local text file (not in HDFS). Unfortunately, the
SparkContext (sc) tries to connect to
a HDFS Name Node (which I don't currently have enabled because I don't need
it).

The SparkContext cleverly inspects the configurations in my
'/etc/hadoop/conf/' directory to learn
where my Name Node is, however I don't want it to do that in this case. I
just want it to run a
one-machine local version of 'pyspark'.

Did I miss something in my invocation/use of 'pyspark' below? Do I need to
add something else?

(Btw: I searched but could not find any solutions, and the documentation,
while good, doesn't
quite get me there).

See below, and thank you all in advance!


user$ export PYSPARK_PYTHON=/usr/bin/bpython
user$ export MASTER=local[8]
user$ /home/user/APPS.d/SPARK.d/latest/bin/pyspark
   #
===
   >>> sc
   
   >>>
   >>> distData = sc.textFile('/home/user/Download/ml-10M100K/ratings.dat')
   >>> distData.count()
   [ ... snip ... ]
   Py4JJavaError: An error occurred while calling o21.collect.
   : java.net.ConnectException: Call From server01/192.168.0.15 to
namenode:8020 failed on connection exception:
 java.net.ConnectException: Connection refused; For more details see:
http://wiki.apache.org/hadoop/ConnectionRefused
   [ ... snip ... ]
   >>>
   >>>
   #
===

--
Sincerely,
DiData


--
Sincerely,
DiData



Re: Using ProtoBuf 2.5 for messages with Spark Streaming

2014-04-10 Thread Patrick Wendell
Okay so I think the issue here is just a conflict between your application
code and the Hadoop code.

Hadoop 2.0.0 depends on protobuf 2.4.0a:
https://svn.apache.org/repos/asf/hadoop/common/tags/release-2.0.0-alpha/hadoop-project/pom.xml

Your code is depending on protobuf 2.5.X

The protobuf library is not binary compatible between these two versions
(unfortunately). This means that your application will have to shade
protobuf 2.5.X or you will have to upgrade to a version of Hadoop that is
compatible.
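For readers who need the shading route, a relocation with the Maven Shade Plugin looks roughly like the following. This is a sketch, not something confirmed in this thread: the plugin version and the `shadedPattern` prefix are illustrative, and the idea is simply to rename the protobuf 2.5.x classes inside your application jar so they stop colliding with Hadoop's protobuf 2.4.0a.

```xml
<!-- Sketch: relocate protobuf 2.5.x classes bundled in your application jar.
     Version number and shadedPattern prefix are illustrative. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.2</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.google.protobuf</pattern>
            <shadedPattern>myapp.shaded.com.google.protobuf</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

Your own code then links against the relocated classes, while Hadoop keeps loading its unmodified 2.4.0a copy.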


On Wed, Apr 9, 2014 at 1:03 PM, Kanwaldeep  wrote:

> Any update on this? We are still facing this issue.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-ProtoBuf-2-5-for-messages-with-Spark-Streaming-tp3396p4015.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Using pyspark shell in local[n] (single machine) mode unnecessarily tries to connect to HDFS NameNode ...

2014-04-10 Thread Alton Alexander
I am doing the exact same thing for the purpose of learning. I also
don't have a hadoop cluster and plan to scale on ec2 as soon as I get
it working locally.

I am having good success just using the binaries and not compiling
from source... Is there a reason why you aren't just using the
binaries?
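(placeholder - removed)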

On Thu, Apr 10, 2014 at 1:30 PM, DiData  wrote:
> Hello friends:
>
> I recently compiled and installed Spark v0.9 from the Apache distribution.
>
> Note: I have the Cloudera/CDH5 Spark RPMs co-installed as well (actually,
> the
> entire big-data suite from CDH is installed), but for the moment I'm using
> my
> manually built Apache Spark for 'ground-up' learning purposes.
>
> Now, prior to compilation (i.e. 'sbt/sbt clean compile') I specified the
> following:
>
>   export SPARK_YARN=true
>   export SPARK_HADOOP_VERSION=2.3.0-cdh5.0.0
>
> The resulting examples ran fine locally as well as on YARN.
>
> I'm not interested in YARN here; just mentioning it for completeness in case
> that matters in
> my upcoming question. Here is my issue / question:
>
> I start pyspark locally -- on one machine for API learning purposes -- as
> shown below, and attempt to
> interact with a local text file (not in HDFS). Unfortunately, the
> SparkContext (sc) tries to connect to
> a HDFS Name Node (which I don't currently have enabled because I don't need
> it).
>
> The SparkContext cleverly inspects the configurations in my
> '/etc/hadoop/conf/' directory to learn
> where my Name Node is, however I don't want it to do that in this case. I
> just want it to run a
> one-machine local version of 'pyspark'.
>
> Did I miss something in my invocation/use of 'pyspark' below? Do I need to
> add something else?
>
> (Btw: I searched but could not find any solutions, and the documentation,
> while good, doesn't
> quite get me there).
>
> See below, and thank you all in advance!
>
>
> user$ export PYSPARK_PYTHON=/usr/bin/bpython
> user$ export MASTER=local[8]
> user$ /home/user/APPS.d/SPARK.d/latest/bin/pyspark
>   #
> ===
>   >>> sc
>   
>   >>>
>   >>> distData = sc.textFile('/home/user/Download/ml-10M100K/ratings.dat')
>   >>> distData.count()
>   [ ... snip ... ]
>   Py4JJavaError: An error occurred while calling o21.collect.
>   : java.net.ConnectException: Call From server01/192.168.0.15 to
> namenode:8020 failed on connection exception:
> java.net.ConnectException: Connection refused; For more details see:
> http://wiki.apache.org/hadoop/ConnectionRefused
>   [ ... snip ... ]
>   >>>
>   >>>
>   #
> ===
>
> --
> Sincerely,
> DiData


Re: Spark - ready for prime time?

2014-04-10 Thread Matei Zaharia
To add onto the discussion about memory working space, 0.9 introduced the 
ability to spill data within a task to disk, and in 1.0 we’re also changing the 
interface to allow spilling data within the same *group* to disk (e.g. when you 
do groupBy and get a key with lots of values). The main reason these weren’t 
there was that for a lot of workloads (everything except the same key having 
lots of values), simply launching more reduce tasks was also a good solution, 
because it results in an external sort across the cluster similar to what would 
happen within a task.
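The "launch more reduce tasks" point can be illustrated outside Spark: with a roughly uniform key distribution, hash-partitioning the same key set across more tasks shrinks the largest working set any single task must hold. The numbers below are illustrative plain Python, not Spark internals.

```python
# Illustrative sketch (plain Python, not Spark internals): hash-partitioning
# the same keys across more reduce tasks shrinks the largest per-task group.
keys = ["key-%d" % i for i in range(10000)]

def max_partition_size(num_tasks):
    # Count how many keys land in each simulated reduce task.
    counts = [0] * num_tasks
    for k in keys:
        counts[hash(k) % num_tasks] += 1
    return max(counts)

# More tasks -> smaller maximum working set for any one task.
print(max_partition_size(4) > max_partition_size(64))  # True
```

This is why, for most workloads, simply raising the number of reduce tasks kept per-task memory bounded without needing within-task spilling.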

Overall, expect to see more work to both explain how things execute 
(http://spark.incubator.apache.org/docs/latest/tuning.html is one example, the 
monitoring UI is another) and try to make things require no configuration out 
of the box. We’re doing a lot of this based on user feedback, so that’s 
definitely appreciated.

Matei

On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov  wrote:

> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash  wrote:
> The biggest issue I've come across is that the cluster is somewhat unstable 
> when under memory pressure.  Meaning that if you attempt to persist an RDD 
> that's too big for memory, even with MEMORY_AND_DISK, you'll often still get 
> OOMs.  I had to carefully modify some of the space tuning parameters and GC 
> settings to get some jobs to even finish.
> 
> The other issue I've observed is if you group on a key that is highly skewed, 
> with a few massively-common keys and a long tail of rare keys, the one 
> massive key can be too big for a single machine and again cause OOMs.
> 
> My take on it -- Spark doesn't believe in sort-and-spill things to enable 
> super long groups, and IMO for a good reason. Here are my thoughts:
> 
> (1) in my work i don't need "sort" in 99% of the cases, i only need "group" 
> which absolutely doesn't need the spill which makes things slow down to a 
> crawl. 
> (2) if that's an aggregate (such as group count), use combine(), not 
> groupByKey -- this will do tons of good on memory use.
> (3) if you really need groups that don't fit into memory, that is always 
> because you want to do something that is other than aggregation, with them. 
> E.g. build an index of that grouped data. we actually had a case just like 
> that. In this case your friend is really not groupBy, but rather PartitionBy. 
> I.e. what happens there you build a quick count sketch, perhaps on 
> downsampled data, to figure which keys have sufficiently "big" count -- and 
> then you build a partitioner that redirects large groups to a dedicated 
> map(). assuming this map doesn't try to load things in memory but rather do 
> something like streaming BTree build, that should be fine. In certain 
> situations such processing may require splitting super large group even into 
> smaller sub groups (e.g. partitioned BTree structure), at which point you 
> should be fine even from uniform load point of view. It takes a little of 
> jiu-jitsu to do it all, but it is not Spark's fault here, it did not promise 
> do this all for you in the groupBy contract.
> 
>  
> 
> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
> 
> Just my personal experience, but I've observed significant improvements in 
> stability since even the 0.7.x days, so I'm confident that things will 
> continue to get better as long as people report what they're seeing so it can 
> get fixed.
> 
> Andrew
> 
> 
> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert  
> wrote:
> I'll provide answers from our own experience at Bizo.  We've been using Spark 
> for 1+ year now and have found it generally better than previous approaches 
> (Hadoop + Hive mostly).
> 
> 
> 
> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth 
>  wrote:
> I. Is it too much magic? Lots of things "just work right" in Spark and it's 
> extremely convenient and efficient when it indeed works. But should we be 
> worried that customization is hard if the built in behavior is not quite 
> right for us? Are we to expect hard to track down issues originating from the 
> black box behind the magic?
> 
> I think it goes back to understanding Spark's architecture, its design 
> constraints and the problems it explicitly set out to address.   If the 
> solution to your problems can be easily formulated in terms of the map/reduce 
> model, then it's a good choice.  You'll want your "customizations" to go with 
> (not against) the grain of the architecture.
>  
> II. Is it mature enough? E.g. we've created a pull request which fixes a 
> problem that we were very surprised no one ever stumbled upon before. So 
> that's why I'm asking: is Spark being already used in professional settings? 
> Can one already trust it being reasonably bug free and reliable?
> 
> There are lots of ways to use Spark; and not all of the features are 
> necessarily at the same level of maturity.   For instance, we put all the 
> jars on the main classpath so we've never run into the issue 

Using pyspark shell in local[n] (single machine) mode unnecessarily tries to connect to HDFS NameNode ...

2014-04-10 Thread DiData

Hello friends:

I recently compiled and installed Spark v0.9 from the Apache distribution.

Note: I have the Cloudera/CDH5 Spark RPMs co-installed as well 
(actually, the
entire big-data suite from CDH is installed), but for the moment I'm 
using my

manually built Apache Spark for 'ground-up' learning purposes.

Now, prior to compilation (i.e. 'sbt/sbt clean compile') I specified the 
following:


  export SPARK_YARN=true
  export SPARK_HADOOP_VERSION=2.3.0-cdh5.0.0

The resulting examples ran fine locally as well as on YARN.

I'm not interested in YARN here; just mentioning it for completeness in 
case that matters in

my upcoming question. Here is my issue / question:

I start pyspark locally -- on one machine for API learning purposes -- 
as shown below, and attempt to
interact with a local text file (not in HDFS). Unfortunately, the 
SparkContext (sc) tries to connect to
a HDFS Name Node (which I don't currently have enabled because I don't 
need it).


The SparkContext cleverly inspects the configurations in my 
'/etc/hadoop/conf/' directory to learn
where my Name Node is, however I don't want it to do that in this case. 
I just want it to run a

one-machine local version of 'pyspark'.

Did I miss something in my invocation/use of 'pyspark' below? Do I need 
to add something else?


(Btw: I searched but could not find any solutions, and the 
documentation, while good, doesn't

quite get me there).

See below, and thank you all in advance!


user$ export PYSPARK_PYTHON=/usr/bin/bpython
user$ export MASTER=local[8]
user$ /home/user/APPS.d/SPARK.d/latest/bin/pyspark
  # 
===

  >>> sc
  
  >>>
  >>> distData = sc.textFile('/home/user/Download/ml-10M100K/ratings.dat')
  >>> distData.count()
  [ ... snip ... ]
  Py4JJavaError: An error occurred while calling o21.collect.
  : java.net.ConnectException: Call From server01/192.168.0.15 to 
namenode:8020 failed on connection exception:
java.net.ConnectException: Connection refused; For more details 
see: http://wiki.apache.org/hadoop/ConnectionRefused

  [ ... snip ... ]
  >>>
  >>>
  # 
===


--
Sincerely,
DiData
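For readers who hit the same NameNode lookup in local mode, two workarounds are commonly suggested; they are sketched here with the poster's paths and should be treated as assumptions rather than something verified in this thread for v0.9.

```shell
# Option 1: hide the Hadoop client configs from this shell session, assuming
# Spark is only discovering the NameNode via HADOOP_CONF_DIR.
unset HADOOP_CONF_DIR

# Option 2: give textFile an explicit local-filesystem URI so the default
# filesystem from /etc/hadoop/conf is never consulted for this path. Inside
# pyspark:
#   >>> distData = sc.textFile('file:///home/user/Download/ml-10M100K/ratings.dat')
#   >>> distData.count()
```

The `file://` scheme is generally the lighter-touch fix, since it leaves the Hadoop configuration in place for jobs that do want HDFS.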


Re: Spark - ready for prime time?

2014-04-10 Thread Brad Miller
> 4. Shuffle on disk
> Is it true - I couldn't find it in official docs, but did see this mentioned
> in various threads - that shuffle _always_ hits disk? (Disregarding OS
> caches.) Why is this the case? Are you planning to add a function to do
> shuffle in memory or are there some intrinsic reasons for this to be
> impossible?
>
> I don't think it's true... as far as I'm concerned Spark doesn't peek into
> the OS and force it to disregard buffer caches. In general, for large
> shuffles, all shuffle files do not fit into memory, so we kind of have to
> write them out to disk. There is an undocumented option to sync writing
> shuffle files to disk every time we write a block, but that is by default
> false and not many people use it (for obvious reasons).

I believe I recently had the experience that for the map portion of
the shuffle all shuffle files seemed to be written into the file
system (albeit potentially on buffer caches).  The size of the shuffle
files on hosts matched the size of the "shuffle write" metric shown in
the UI (pyspark branch-0.9 as of Monday), so there didn't seem to be
any effort to keep the shuffle files in memory.

On Thu, Apr 10, 2014 at 12:43 PM, Andrew Or  wrote:
> Here are answers to a subset of your questions:
>
>> 1. Memory management
>> The general direction of these questions is whether it's possible to take
>> RDD caching related memory management more into our own hands as LRU
>> eviction is nice most of the time but can be very suboptimal in some of our
>> use cases.
>> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
>> really wants to keep. I'm fine with going down in flames if I mark too much
>> data
>
> As far as I am aware, there is currently no other eviction policies for RDD
> blocks other than LRU. Your suggestion of prioritizing RDDs is an
> interesting one and I'm sure other users would like that as well.
>
>> B. Memory "reflection": can you pragmatically get the memory size of a
>> cached rdd and memory sizes available in total/per executor? If we could do
>> this we could indirectly avoid automatic evictions of things we might really
>> want to keep in memory.
>
> All this information should be displayed on the UI under the Storage tab.
>
>> C. Evictions caused by RDD partitions on the driver. I had a setup with
>> huge worker memory and smallish memory on the driver JVM. To my surprise,
>> the system started to cache RDD partitions on the driver as well. As the
>> driver ran out of memory I started to see evictions while there were still
>> plenty of space on workers. This resulted in lengthy recomputations. Can
>> this be avoided somehow?
>
> The amount of space used for RDD storage is only a fraction of the total
> amount of memory available to the JVM. More specifically, it is governed by
> `spark.storage.memoryFraction`, which is by default 60%. This may explain
> why evictions seem to occur prematurely sometimes. In the future, we should
> probably add a table that contains information about evicted RDDs on the UI,
> so it's easier to track them. Right now evicted RDD's disappear from the
> face of the planet completely, sometimes leaving the user somewhat
> confounded. Though with off-heap storage (Tachyon) this may become less
> relevant.
>
>> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
>> waiting for the LRU eviction taking care of it? Can you tell the size of a
>> broadcast programmatically?
>
> In Spark 1.0, the mechanism to unpersist blocks used by a broadcast is
> explicitly added! Under the storage tab of the UI, we could probably also
> have a Broadcast table in the future, seeing that there are users interested
> in this feature.
>
>
>> 3. Recalculation of cached rdds
>> I see the following scenario happening. I load two RDDs A,B from disk,
>> cache them and then do some jobs on them, at the very least a count on each.
>> After these jobs are done I see on the storage panel that 100% of these RDDs
>> are cached in memory.
>> Then I create a third RDD C which is created by multiple joins and maps
>> from A and B, also cache it and start a job on C. When I do this I still see
>> A and B completely cached and also see C slowly getting more and more
>> cached. This is all fine and good, but in the meanwhile I see stages running
>> on the UI that point to code which is used to load A and B. How is this
>> possible? Am I misunderstanding how cached RDDs should behave?
>> And again the general question - how can one debug such issues?
>
> From the fractions of RDDs cached in memory, it seems to me that your
> application is running as expected. If you also cache C, then it will slowly
> add more blocks to storage, possibly evicting A and B if there is memory
> pressure. It's entirely possible that there is a bug on finding the call
> site on the stages page (there were a few PRs that made changes to this
> recently).
>
> 4. Shuffle on disk
> Is it true - I couldn't find it in official docs, bu

Re: Spark - ready for prime time?

2014-04-10 Thread Andrew Or
Here are answers to a subset of your questions:

> 1. Memory management
> The general direction of these questions is whether it's possible to take
RDD caching related memory management more into our own hands as LRU
eviction is nice most of the time but can be very suboptimal in some of our
use cases.
> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
really wants to keep. I'm fine with going down in flames if I mark too much
data

As far as I am aware, there is currently no other eviction policies for RDD
blocks other than LRU. Your suggestion of prioritizing RDDs is an
interesting one and I'm sure other users would like that as well.

> B. Memory "reflection": can you pragmatically get the memory size of a
cached rdd and memory sizes available in total/per executor? If we could do
this we could indirectly avoid automatic evictions of things we might
really want to keep in memory.

All this information should be displayed on the UI under the Storage tab.

> C. Evictions caused by RDD partitions on the driver. I had a setup with
huge worker memory and smallish memory on the driver JVM. To my surprise,
the system started to cache RDD partitions on the driver as well. As the
driver ran out of memory I started to see evictions while there were still
> plenty of space on workers. This resulted in lengthy recomputations. Can
this be avoided somehow?

The amount of space used for RDD storage is only a fraction of the total
amount of memory available to the JVM. More specifically, it is governed by
`spark.storage.memoryFraction`, which is by default 60%. This may explain
why evictions seem to occur prematurely sometimes. In the future, we
should probably add a table that contains information about evicted RDDs on
the UI, so it's easier to track them. Right now evicted RDD's disappear
from the face of the planet completely, sometimes leaving the user somewhat
confounded. Though with off-heap storage (Tachyon) this may become less
relevant.
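As a back-of-the-envelope illustration of that fraction (the heap size here is made up, only the 0.6 default comes from the answer above):

```python
# Illustrative arithmetic only: with the default
# spark.storage.memoryFraction of 0.6, a hypothetical 4 GB executor heap
# leaves roughly this much room for cached RDD blocks before evictions start.
executor_heap_mb = 4096          # hypothetical executor heap
storage_fraction = 0.6           # default spark.storage.memoryFraction
storage_budget_mb = executor_heap_mb * storage_fraction
print(storage_budget_mb)  # 2457.6
```

So an RDD that "fits in executor memory" on paper can still trigger evictions once it exceeds the storage fraction of the heap.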

> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
waiting for the LRU eviction taking care of it? Can you tell the size of a
broadcast programmatically?

In Spark 1.0, the mechanism to unpersist blocks used by a broadcast is
explicitly added! Under the storage tab of the UI, we could probably also
have a Broadcast table in the future, seeing that there are users
interested in this feature.

> 3. Recalculation of cached rdds
> I see the following scenario happening. I load two RDDs A,B from disk,
cache them and then do some jobs on them, at the very least a count on
each. After these jobs are done I see on the storage panel that 100% of
these RDDs are cached in memory.
> Then I create a third RDD C which is created by multiple joins and maps
from A and B, also cache it and start a job on C. When I do this I still
see A and B completely cached and also see C slowly getting more and more
cached. This is all fine and good, but in the meanwhile I see stages
running on the UI that point to code which is used to load A and B. How is
this possible? Am I misunderstanding how cached RDDs should behave?
> And again the general question - how can one debug such issues?

From the fractions of RDDs cached in memory, it seems to me that your
application is running as expected. If you also cache C, then it will
slowly add more blocks to storage, possibly evicting A and B if there is
memory pressure. It's entirely possible that there is a bug on finding the
call site on the stages page (there were a few PRs that made changes to
this recently).

> 4. Shuffle on disk
> Is it true - I couldn't find it in official docs, but did see this
> mentioned in various threads - that shuffle _always_ hits disk?
> (Disregarding OS caches.) Why is this the case? Are you planning to add a
> function to do shuffle in memory or are there some intrinsic reasons for
> this to be impossible?

I don't think it's true... as far as I'm concerned Spark doesn't peek into
the OS and force it to disregard buffer caches. In general, for large
shuffles, all shuffle files do not fit into memory, so we kind of have to
write them out to disk. There is an undocumented option to sync writing
shuffle files to disk every time we write a block, but that is by default
false and not many people use it (for obvious reasons).



On Thu, Apr 10, 2014 at 12:05 PM, Roger Hoover wrote:

> Can anyone comment on their experience running Spark Streaming in
> production?
>
>
> On Thu, Apr 10, 2014 at 10:33 AM, Dmitriy Lyubimov wrote:
>
>>
>>
>>
>> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash  wrote:
>>
>>> The biggest issue I've come across is that the cluster is somewhat
>>> unstable when under memory pressure.  Meaning that if you attempt to
>>> persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll
>>> often still get OOMs.  I had to carefully modify some of the space tuning
>>> parameters and GC settings to get some jobs to even finish.
>>>
>>> The other issue I've observed is if yo

Re: /bin/java not found: JAVA_HOME ignored launching shark executor

2014-04-10 Thread Ken Ellinwood
Sorry, I forgot to mention this is spark-0.9.1 and shark-0.9.1.

Ken


On Thursday, April 10, 2014 9:02 AM, Ken Ellinwood  wrote:
 

14/04/10 08:00:42 INFO AppClient$ClientActor: Executor added: 
app-20140410080041-0017/9 on worker-20140409145028-ken-VirtualBox-39159 
(ken-VirtualBox:39159) with 4 cores
14/04/10 08:00:42 INFO SparkDeploySchedulerBackend: Granted executor ID 
app-20140410080041-0017/9 on hostPort ken-VirtualBox:39159 with 4 cores, 
1024.0 MB RAM
14/04/10 08:00:42 INFO AppClient$ClientActor: Executor updated: 
app-20140410080041-0017/9 is now RUNNING
14/04/10 08:00:42 INFO AppClient$ClientActor: Executor updated: 
app-20140410080041-0017/9 is now FAILED (class java.io.IOException: 
Cannot run program "/bin/java" (in directory 
"/usr/lib/spark/work/app-20140410080041-0017/9"): error=2, No such file or 
directory)


I have JAVA_HOME set in both spark-env.sh and shark-env.sh.

I suspect that something is not passing the JAVA_HOME value correctly, 
because unless I create a symlink from /bin/java to my java executable 
in /usr/lib/jvm/... I get the errors above when starting sharkserver2.

Ideas?

Ken

Re: Spark - ready for prime time?

2014-04-10 Thread Roger Hoover
Can anyone comment on their experience running Spark Streaming in
production?


On Thu, Apr 10, 2014 at 10:33 AM, Dmitriy Lyubimov wrote:

>
>
>
> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash  wrote:
>
>> The biggest issue I've come across is that the cluster is somewhat
>> unstable when under memory pressure.  Meaning that if you attempt to
>> persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll
>> often still get OOMs.  I had to carefully modify some of the space tuning
>> parameters and GC settings to get some jobs to even finish.
>>
>> The other issue I've observed is if you group on a key that is highly
>> skewed, with a few massively-common keys and a long tail of rare keys, the
>> one massive key can be too big for a single machine and again cause OOMs.
>>
>
> My take on it -- Spark doesn't believe in sort-and-spill things to enable
> super long groups, and IMO for a good reason. Here are my thoughts:
>
> (1) in my work i don't need "sort" in 99% of the cases, i only need
> "group" which absolutely doesn't need the spill which makes things slow
> down to a crawl.
> (2) if that's an aggregate (such as group count), use combine(), not
> groupByKey -- this will do tons of good on memory use.
> (3) if you really need groups that don't fit into memory, that is always
> because you want to do something that is other than aggregation, with them.
> E.g. build an index of that grouped data. we actually had a case just like
> that. In this case your friend is really not groupBy, but rather
> PartitionBy. I.e. what happens there you build a quick count sketch,
> perhaps on downsampled data, to figure which keys have sufficiently "big"
> count -- and then you build a partitioner that redirects large groups to a
> dedicated map(). assuming this map doesn't try to load things in memory but
> rather do something like streaming BTree build, that should be fine. In
> certain situations such processing may require splitting super large group
> even into smaller sub groups (e.g. partitioned BTree structure), at which
> point you should be fine even from uniform load point of view. It takes a
> little of jiu-jitsu to do it all, but it is not Spark's fault here, it did
> not promise do this all for you in the groupBy contract.
>
>
>
>>
>> I'm hopeful that off-heap caching (Tachyon) could fix some of these
>> issues.
>>
>> Just my personal experience, but I've observed significant improvements
>> in stability since even the 0.7.x days, so I'm confident that things will
>> continue to get better as long as people report what they're seeing so it
>> can get fixed.
>>
>> Andrew
>>
>>
>> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert 
>> wrote:
>>
>>> I'll provide answers from our own experience at Bizo.  We've been using
>>> Spark for 1+ year now and have found it generally better than previous
>>> approaches (Hadoop + Hive mostly).
>>>
>>>
>>>
>>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
>>> andras.nem...@lynxanalytics.com> wrote:
>>>
 I. Is it too much magic? Lots of things "just work right" in Spark and
 it's extremely convenient and efficient when it indeed works. But should we
 be worried that customization is hard if the built in behavior is not quite
 right for us? Are we to expect hard to track down issues originating from
 the black box behind the magic?

>>>
>>> I think it goes back to understanding Spark's architecture, its design
>>> constraints and the problems it explicitly set out to address.   If the
>>> solution to your problems can be easily formulated in terms of the
>>> map/reduce model, then it's a good choice.  You'll want your
>>> "customizations" to go with (not against) the grain of the architecture.
>>>
>>>
 II. Is it mature enough? E.g. we've created a pull request which fixes a 
 problem that we were very surprised no one ever stumbled upon
 before. So that's why I'm asking: is Spark being already used in
 professional settings? Can one already trust it being reasonably bug free
 and reliable?

>>>
>>> There are lots of ways to use Spark; and not all of the features are
>>> necessarily at the same level of maturity.   For instance, we put all the
>>> jars on the main classpath so we've never run into the issue your pull
>>> request addresses.
>>>
>>> We definitely use and rely on Spark on a professional basis.  We have 5+
>>> spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
>>> Once we got them working with the proper configuration settings, they have
>>> been running reliably since.
>>>
>>> I would characterize our use of Spark as a "better Hadoop", in the sense
>>> that we use it for batch processing only, no streaming yet.   We're happy
>>> it performs better than Hadoop but we don't require/rely on its memory
>>> caching features.  In fact, for most of our jobs it would simplify our
>>> lives if Spark wouldn't cache so many things in memory sin

/bin/java not found: JAVA_HOME ignored launching shark executor

2014-04-10 Thread Ken Ellinwood

14/04/10 08:00:42 INFO AppClient$ClientActor: Executor added: 
app-20140410080041-0017/9 on worker-20140409145028-ken-VirtualBox-39159 
(ken-VirtualBox:39159) with 4 cores
14/04/10 08:00:42 INFO SparkDeploySchedulerBackend: Granted executor ID 
app-20140410080041-0017/9 on hostPort ken-VirtualBox:39159 with 4 cores, 
1024.0 MB RAM
14/04/10 08:00:42 INFO AppClient$ClientActor: Executor updated: 
app-20140410080041-0017/9 is now RUNNING
14/04/10 08:00:42 INFO AppClient$ClientActor: Executor updated: 
app-20140410080041-0017/9 is now FAILED (class java.io.IOException: 
Cannot run program "/bin/java" (in directory 
"/usr/lib/spark/work/app-20140410080041-0017/9"): error=2, No such file or 
directory)


I have JAVA_HOME set in both spark-env.sh and shark-env.sh.

I suspect that something is not passing the JAVA_HOME value correctly, 
because unless I create a symlink from /bin/java to my java executable 
in /usr/lib/jvm/... I get the errors above when starting sharkserver2.

Ideas?

Ken
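One thing worth double-checking in this situation (a sketch; the JVM path below is hypothetical): spark-env.sh is read by the daemons on each machine, so JAVA_HOME has to be exported in the copy on every worker host, not just on the machine launching sharkserver2. The literal "/bin/java" in the error reads like an empty JAVA_HOME being joined with "/bin/java".

```shell
# Sketch for spark-env.sh on EVERY worker host (JVM path is hypothetical).
# The bare "/bin/java" in the error suggests an empty JAVA_HOME being
# concatenated with "/bin/java", so export the variable, don't just set it.
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
export PATH="$JAVA_HOME/bin:$PATH"
```

After editing spark-env.sh, the workers need a restart to pick up the new environment.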

Re: Spark - ready for prime time?

2014-04-10 Thread Dmitriy Lyubimov
On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash  wrote:

> The biggest issue I've come across is that the cluster is somewhat
> unstable when under memory pressure.  Meaning that if you attempt to
> persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll
> often still get OOMs.  I had to carefully modify some of the space tuning
> parameters and GC settings to get some jobs to even finish.
>
> The other issue I've observed is if you group on a key that is highly
> skewed, with a few massively-common keys and a long tail of rare keys, the
> one massive key can be too big for a single machine and again cause OOMs.
>

My take on it -- Spark doesn't believe in sort-and-spill things to enable
super long groups, and IMO for a good reason. Here are my thoughts:

(1) in my work i don't need "sort" in 99% of the cases, i only need "group"
which absolutely doesn't need the spill which makes things slow down to a
crawl.
(2) if that's an aggregate (such as group count), use combine(), not
groupByKey -- this will do tons of good on memory use.
(3) if you really need groups that don't fit into memory, that is always
because you want to do something that is other than aggregation, with them.
E.g. build an index of that grouped data. we actually had a case just like
that. In this case your friend is really not groupBy, but rather
PartitionBy. I.e. what happens there you build a quick count sketch,
perhaps on downsampled data, to figure which keys have sufficiently "big"
count -- and then you build a partitioner that redirects large groups to a
dedicated map(). assuming this map doesn't try to load things in memory but
rather do something like streaming BTree build, that should be fine. In
certain situations such processing may require splitting super large group
even into smaller sub groups (e.g. partitioned BTree structure), at which
point you should be fine even from uniform load point of view. It takes a
little of jiu-jitsu to do it all, but it is not Spark's fault here, it did
not promise do this all for you in the groupBy contract.
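Point (2) can be sketched outside Spark: per-partition combining keeps only one small accumulator per key in memory, instead of materializing every value for a key the way groupByKey does. The plain-Python simulation below is illustrative only, not Spark's actual combineByKey code path.

```python
# Sketch of a per-key count with combineByKey-style semantics, simulated in
# plain Python. Each "partition" folds its values into a small accumulator
# first (map-side combine), and only those accumulators are merged.
from collections import defaultdict

pairs = [("a", 1), ("b", 1), ("a", 1), ("a", 1), ("b", 1)]
partitions = [pairs[:3], pairs[3:]]  # pretend these live on two workers

# Map side: combine within each partition (createCombiner / mergeValue).
partials = []
for part in partitions:
    acc = defaultdict(int)
    for k, v in part:
        acc[k] += v
    partials.append(acc)

# Reduce side: merge the small per-partition accumulators (mergeCombiners).
totals = defaultdict(int)
for acc in partials:
    for k, c in acc.items():
        totals[k] += c

print(dict(totals))  # {'a': 3, 'b': 2}
```

With groupByKey, by contrast, every value for a key would be shipped and held before the count runs, which is exactly where the memory pressure comes from.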



>
> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
>
> Just my personal experience, but I've observed significant improvements in
> stability since even the 0.7.x days, so I'm confident that things will
> continue to get better as long as people report what they're seeing so it
> can get fixed.
>
> Andrew
>
>
> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert wrote:
>
>> I'll provide answers from our own experience at Bizo.  We've been using
>> Spark for 1+ year now and have found it generally better than previous
>> approaches (Hadoop + Hive mostly).
>>
>>
>>
>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
>> andras.nem...@lynxanalytics.com> wrote:
>>
>>> I. Is it too much magic? Lots of things "just work right" in Spark and
>>> it's extremely convenient and efficient when it indeed works. But should we
>>> be worried that customization is hard if the built in behavior is not quite
>>> right for us? Are we to expect hard to track down issues originating from
>>> the black box behind the magic?
>>>
>>
>> I think it goes back to understanding Spark's architecture, its design
>> constraints and the problems it explicitly set out to address.   If the
>> solution to your problems can be easily formulated in terms of the
>> map/reduce model, then it's a good choice.  You'll want your
>> "customizations" to go with (not against) the grain of the architecture.
>>
>>
>>> II. Is it mature enough? E.g. we've created a pull request which fixes a
>>> problem that we were very surprised no one ever stumbled upon
>>> before. So that's why I'm asking: is Spark being already used in
>>> professional settings? Can one already trust it being reasonably bug free
>>> and reliable?
>>>
>>
>> There are lots of ways to use Spark; and not all of the features are
>> necessarily at the same level of maturity.   For instance, we put all the
>> jars on the main classpath so we've never run into the issue your pull
>> request addresses.
>>
>> We definitely use and rely on Spark on a professional basis.  We have 5+
>> spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
>> Once we got them working with the proper configuration settings, they have
>> been running reliably since.
>>
>> I would characterize our use of Spark as a "better Hadoop", in the sense
>> that we use it for batch processing only, no streaming yet.   We're happy
>> it performs better than Hadoop but we don't require/rely on its memory
>> caching features.  In fact, for most of our jobs it would simplify our
>> lives if Spark wouldn't cache so many things in memory since it would make
>> configuration/tuning a lot simpler and jobs would run successfully on the
>> first try instead of having to tweak things (# of partitions and such).
>>
>> So, to the concrete issues. Sorry for the long mail, and let me know if I
>>> should break this 

Re: Spark - ready for prime time?

2014-04-10 Thread Debasish Das
I agree with Andrew... Every time I underestimate the RAM requirement, my
hand calculations are always way less than what the JVM actually allocates...

But I guess I will understand the Scala JVM optimizations as I get more
pain


On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash  wrote:

> The biggest issue I've come across is that the cluster is somewhat
> unstable when under memory pressure.  Meaning that if you attempt to
> persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll
> often still get OOMs.  I had to carefully modify some of the space tuning
> parameters and GC settings to get some jobs to even finish.
>
> The other issue I've observed is if you group on a key that is highly
> skewed, with a few massively-common keys and a long tail of rare keys, the
> one massive key can be too big for a single machine and again cause OOMs.
>
> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
>
> Just my personal experience, but I've observed significant improvements in
> stability since even the 0.7.x days, so I'm confident that things will
> continue to get better as long as people report what they're seeing so it
> can get fixed.
>
> Andrew
>
>
> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert wrote:
>
>> I'll provide answers from our own experience at Bizo.  We've been using
>> Spark for 1+ year now and have found it generally better than previous
>> approaches (Hadoop + Hive mostly).
>>
>>
>>
>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
>> andras.nem...@lynxanalytics.com> wrote:
>>
>>> I. Is it too much magic? Lots of things "just work right" in Spark and
>>> it's extremely convenient and efficient when it indeed works. But should we
>>> be worried that customization is hard if the built in behavior is not quite
>>> right for us? Are we to expect hard to track down issues originating from
>>> the black box behind the magic?
>>>
>>
>> I think it goes back to understanding Spark's architecture, its design
>> constraints and the problems it explicitly set out to address.   If the
>> solution to your problems can be easily formulated in terms of the
>> map/reduce model, then it's a good choice.  You'll want your
>> "customizations" to go with (not against) the grain of the architecture.
>>
>>
>>> II. Is it mature enough? E.g. we've created a pull request which fixes a
>>> problem that we were very surprised no one ever stumbled upon
>>> before. So that's why I'm asking: is Spark being already used in
>>> professional settings? Can one already trust it being reasonably bug free
>>> and reliable?
>>>
>>
>> There are lots of ways to use Spark; and not all of the features are
>> necessarily at the same level of maturity.   For instance, we put all the
>> jars on the main classpath so we've never run into the issue your pull
>> request addresses.
>>
>> We definitely use and rely on Spark on a professional basis.  We have 5+
>> spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
>> Once we got them working with the proper configuration settings, they have
>> been running reliably since.
>>
>> I would characterize our use of Spark as a "better Hadoop", in the sense
>> that we use it for batch processing only, no streaming yet.   We're happy
>> it performs better than Hadoop but we don't require/rely on its memory
>> caching features.  In fact, for most of our jobs it would simplify our
>> lives if Spark wouldn't cache so many things in memory since it would make
>> configuration/tuning a lot simpler and jobs would run successfully on the
>> first try instead of having to tweak things (# of partitions and such).
>>
>> So, to the concrete issues. Sorry for the long mail, and let me know if I
>>> should break this out into more threads or if there is some other way to
>>> have this discussion...
>>>
>>> 1. Memory management
>>> The general direction of these questions is whether it's possible to
>>> take RDD caching related memory management more into our own hands as LRU
>>> eviction is nice most of the time but can be very suboptimal in some of our
>>> use cases.
>>> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
>>> really wants to keep. I'm fine with going down in flames if I mark too much
>>> data essential.
>>> B. Memory "reflection": can you programmatically get the memory size of a
>>> cached rdd and memory sizes available in total/per executor? If we could do
>>> this we could indirectly avoid automatic evictions of things we might
>>> really want to keep in memory.
>>> C. Evictions caused by RDD partitions on the driver. I had a setup with
>>> huge worker memory and smallish memory on the driver JVM. To my surprise,
>>> the system started to cache RDD partitions on the driver as well. As the
>>> driver ran out of memory I started to see evictions while there were still
>>> plenty of space on workers. This resulted in lengthy recomputations. Can
>>> this be avoided 

Re: Spark - ready for prime time?

2014-04-10 Thread Brad Miller
I would echo much of what Andrew has said.

I manage a small/medium sized cluster (48 cores, 512G ram, 512G disk
space dedicated to spark, data storage in separate HDFS shares).  I've
been using spark since 0.7, and as with Andrew I've observed
significant and consistent improvements in stability (and in the
PySpark API) since then.  I have run into some trouble with mesos, and
I have run into some trouble when working with data which is large
relative to the size of my cluster (e.g. 100G), but overall it's
worked well and our group is continuing to build on top of spark.

On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash  wrote:
> The biggest issue I've come across is that the cluster is somewhat unstable
> when under memory pressure.  Meaning that if you attempt to persist an RDD
> that's too big for memory, even with MEMORY_AND_DISK, you'll often still get
> OOMs.  I had to carefully modify some of the space tuning parameters and GC
> settings to get some jobs to even finish.
>
> The other issue I've observed is if you group on a key that is highly
> skewed, with a few massively-common keys and a long tail of rare keys, the
> one massive key can be too big for a single machine and again cause OOMs.
>
> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
>
> Just my personal experience, but I've observed significant improvements in
> stability since even the 0.7.x days, so I'm confident that things will
> continue to get better as long as people report what they're seeing so it
> can get fixed.
>
> Andrew
>
>
> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert 
> wrote:
>>
>> I'll provide answers from our own experience at Bizo.  We've been using
>> Spark for 1+ year now and have found it generally better than previous
>> approaches (Hadoop + Hive mostly).
>>
>>
>>
>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth
>>  wrote:
>>>
>>> I. Is it too much magic? Lots of things "just work right" in Spark and
>>> it's extremely convenient and efficient when it indeed works. But should we
>>> be worried that customization is hard if the built in behavior is not quite
>>> right for us? Are we to expect hard to track down issues originating from
>>> the black box behind the magic?
>>
>>
>> I think it goes back to understanding Spark's architecture, its design
>> constraints and the problems it explicitly set out to address.   If the
>> solution to your problems can be easily formulated in terms of the
>> map/reduce model, then it's a good choice.  You'll want your
>> "customizations" to go with (not against) the grain of the architecture.
>>
>>>
>>> II. Is it mature enough? E.g. we've created a pull request which fixes a
>>> problem that we were very surprised no one ever stumbled upon before. So
>>> that's why I'm asking: is Spark being already used in professional settings?
>>> Can one already trust it being reasonably bug free and reliable?
>>
>>
>> There are lots of ways to use Spark; and not all of the features are
>> necessarily at the same level of maturity.   For instance, we put all the
>> jars on the main classpath so we've never run into the issue your pull
>> request addresses.
>>
>> We definitely use and rely on Spark on a professional basis.  We have 5+
>> spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
>> Once we got them working with the proper configuration settings, they have
>> been running reliably since.
>>
>> I would characterize our use of Spark as a "better Hadoop", in the sense
>> that we use it for batch processing only, no streaming yet.   We're happy it
>> performs better than Hadoop but we don't require/rely on its memory caching
>> features.  In fact, for most of our jobs it would simplify our lives if
>> Spark wouldn't cache so many things in memory since it would make
>> configuration/tuning a lot simpler and jobs would run successfully on the
>> first try instead of having to tweak things (# of partitions and such).
>>
>>> So, to the concrete issues. Sorry for the long mail, and let me know if I
>>> should break this out into more threads or if there is some other way to
>>> have this discussion...
>>>
>>> 1. Memory management
>>> The general direction of these questions is whether it's possible to take
>>> RDD caching related memory management more into our own hands as LRU
>>> eviction is nice most of the time but can be very suboptimal in some of our
>>> use cases.
>>> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
>>> really wants to keep. I'm fine with going down in flames if I mark too much
>>> data essential.
>>> B. Memory "reflection": can you programmatically get the memory size of a
>>> cached rdd and memory sizes available in total/per executor? If we could do
>>> this we could indirectly avoid automatic evictions of things we might really
>>> want to keep in memory.
>>> C. Evictions caused by RDD partitions on the driver. I had a setup with
>>> huge worker memory and smallish memory on the driver JVM. To my

Re: Spark - ready for prime time?

2014-04-10 Thread Andrew Ash
The biggest issue I've come across is that the cluster is somewhat unstable
when under memory pressure.  Meaning that if you attempt to persist an RDD
that's too big for memory, even with MEMORY_AND_DISK, you'll often still
get OOMs.  I had to carefully modify some of the space tuning parameters
and GC settings to get some jobs to even finish.

The other issue I've observed is if you group on a key that is highly
skewed, with a few massively-common keys and a long tail of rare keys, the
one massive key can be too big for a single machine and again cause OOMs.
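For what it's worth, a common mitigation for that kind of skew is key
salting: split the hot key into several salted sub-keys so no single machine
holds the whole group, then merge the partial aggregates. A pure-Python
sketch of the two phases (my illustration, not something from this thread;
function and parameter names are made up):

```python
import random
from collections import defaultdict

def salted_group_count(pairs, hot_keys, salt_buckets=4):
    """Two-phase count for skewed data: append a random salt to hot keys
    so their records spread over several partial groups, then strip the
    salt and merge the partial counts."""
    partial = defaultdict(int)
    for key, value in pairs:
        if key in hot_keys:
            key = (key, random.randrange(salt_buckets))  # spread the hot key
        else:
            key = (key, 0)
        partial[key] += 1
    # Second phase: strip the salt and merge the partial results.
    merged = defaultdict(int)
    for (key, _salt), count in partial.items():
        merged[key] += count
    return dict(merged)

data = [("hot", i) for i in range(1000)] + [("rare", 0), ("rare", 1)]
counts = salted_group_count(data, hot_keys={"hot"})
print(counts["hot"], counts["rare"])  # 1000 2
```

In Spark the same idea works because the salted first phase is a reduce-like
aggregation that fits each sub-group on one machine.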

I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.

Just my personal experience, but I've observed significant improvements in
stability since even the 0.7.x days, so I'm confident that things will
continue to get better as long as people report what they're seeing so it
can get fixed.

Andrew


On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert wrote:

> I'll provide answers from our own experience at Bizo.  We've been using
> Spark for 1+ year now and have found it generally better than previous
> approaches (Hadoop + Hive mostly).
>
>
>
> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
> andras.nem...@lynxanalytics.com> wrote:
>
>> I. Is it too much magic? Lots of things "just work right" in Spark and
>> it's extremely convenient and efficient when it indeed works. But should we
>> be worried that customization is hard if the built in behavior is not quite
>> right for us? Are we to expect hard to track down issues originating from
>> the black box behind the magic?
>>
>
> I think it goes back to understanding Spark's architecture, its design
> constraints and the problems it explicitly set out to address.   If the
> solution to your problems can be easily formulated in terms of the
> map/reduce model, then it's a good choice.  You'll want your
> "customizations" to go with (not against) the grain of the architecture.
>
>
>> II. Is it mature enough? E.g. we've created a pull request which fixes a
>> problem that we were very surprised no one ever stumbled upon
>> before. So that's why I'm asking: is Spark being already used in
>> professional settings? Can one already trust it being reasonably bug free
>> and reliable?
>>
>
> There are lots of ways to use Spark; and not all of the features are
> necessarily at the same level of maturity.   For instance, we put all the
> jars on the main classpath so we've never run into the issue your pull
> request addresses.
>
> We definitely use and rely on Spark on a professional basis.  We have 5+
> spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
> Once we got them working with the proper configuration settings, they have
> been running reliably since.
>
> I would characterize our use of Spark as a "better Hadoop", in the sense
> that we use it for batch processing only, no streaming yet.   We're happy
> it performs better than Hadoop but we don't require/rely on its memory
> caching features.  In fact, for most of our jobs it would simplify our
> lives if Spark wouldn't cache so many things in memory since it would make
> configuration/tuning a lot simpler and jobs would run successfully on the
> first try instead of having to tweak things (# of partitions and such).
>
> So, to the concrete issues. Sorry for the long mail, and let me know if I
>> should break this out into more threads or if there is some other way to
>> have this discussion...
>>
>> 1. Memory management
>> The general direction of these questions is whether it's possible to take
>> RDD caching related memory management more into our own hands as LRU
>> eviction is nice most of the time but can be very suboptimal in some of our
>> use cases.
>> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
>> really wants to keep. I'm fine with going down in flames if I mark too much
>> data essential.
>> B. Memory "reflection": can you programmatically get the memory size of a
>> cached rdd and memory sizes available in total/per executor? If we could do
>> this we could indirectly avoid automatic evictions of things we might
>> really want to keep in memory.
>> C. Evictions caused by RDD partitions on the driver. I had a setup with
>> huge worker memory and smallish memory on the driver JVM. To my surprise,
>> the system started to cache RDD partitions on the driver as well. As the
>> driver ran out of memory I started to see evictions while there were still
>> plenty of space on workers. This resulted in lengthy recomputations. Can
>> this be avoided somehow?
>> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
>> waiting for the LRU eviction taking care of it? Can you tell the size of a
>> broadcast programmatically?
>>
>>
>> 2. Akka lost connections
>> We have quite often experienced lost executors due to akka exceptions -
>> mostly connection lost or similar. It seems to happen when an executor gets
>> extremely 

Re: NPE using saveAsTextFile

2014-04-10 Thread Nick Pentreath
There was a closure over the config object lurking around - but in any case
upgrading to config 1.2.0 did the trick, as it seems to have been a bug
in Typesafe Config.

Thanks Matei!
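For anyone hitting the same thing: the pitfall is a task or closure that
drags a non-serializable object along. A pure-Python illustration with
pickle -- Spark's closure serialization fails in the same spirit -- where
JobConfig and SaveTask are made-up names, and the lock stands in for any
unserializable field:

```python
import pickle
import threading

class JobConfig:
    """Stand-in for a non-serializable config object (it holds a lock,
    which the pickle module refuses to serialize)."""
    def __init__(self):
        self.output_path = "/tmp/out"
        self._lock = threading.Lock()

class SaveTask:
    """Good: carries only the plain value it needs."""
    def __init__(self, path):
        self.path = path

class BadSaveTask:
    """Bad: drags the whole config object along."""
    def __init__(self, config):
        self.config = config

config = JobConfig()

try:
    pickle.dumps(BadSaveTask(config))
except TypeError as e:
    print("cannot serialize:", e)

# Extracting the val before building the task means only the string
# travels over the wire.
blob = pickle.dumps(SaveTask(config.output_path))
print(pickle.loads(blob).path)  # /tmp/out
```

Extracting plain vals from the config before the closure is built, as
described above, avoids the problem.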


On Thu, Apr 10, 2014 at 8:46 AM, Nick Pentreath wrote:

> Ok I thought it may be closing over the config option. I am using config
> for job configuration, but extracting vals from that. So not sure why as I
> thought I'd avoided closing over it. Will go back to source and see where
> it is creeping in.
>
>
>
> On Thu, Apr 10, 2014 at 8:42 AM, Matei Zaharia wrote:
>
>> I haven't seen this but it may be a bug in Typesafe Config, since this is
>> serializing a Config object. We don't actually use Typesafe Config
>> ourselves.
>>
>> Do you have any nulls in the data itself by any chance? And do you know
>> how that Config object is getting there?
>>
>> Matei
>>
>> On Apr 9, 2014, at 11:38 PM, Nick Pentreath 
>> wrote:
>>
>> Anyone have a chance to look at this?
>>
>> Am I just doing something silly somewhere?
>>
>> If it makes any difference, I am using the elasticsearch-hadoop plugin
>> for ESInputFormat. But as I say, I can parse the data (count, first() etc).
>> I just can't save it as text file.
>>
>>
>>
>>
>> On Tue, Apr 8, 2014 at 4:50 PM, Nick Pentreath 
>> wrote:
>>
>>> Hi
>>>
>>> I'm using Spark 0.9.0.
>>>
>>> When calling saveAsTextFile on a custom hadoop inputformat (loaded with
>>> newAPIHadoopRDD), I get the following error below.
>>>
>>> If I call count, I get the correct count of number of records, so the
>>> inputformat is being read correctly... the issue only appears when trying
>>> to use saveAsTextFile.
>>>
>>> If I call first() I get the correct output, also. So it doesn't appear
>>> to be anything with the data or inputformat.
>>>
>>> Any idea what the actual problem is, since this stack trace is not
>>> obvious (though it seems to be in ResultTask which ultimately causes this).
>>>
>>> Is this a known issue at all?
>>>
>>>
>>> ==
>>>
>>> 14/04/08 16:00:46 ERROR OneForOneStrategy:
>>> java.lang.NullPointerException
>>>  at
>>> com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
>>> at
>>> com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
>>>  at
>>> com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>  at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>  at java.lang.reflect.Method.invoke(Method.java:601)
>>> at
>>> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975)
>>>  at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
>>> at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
>>>  at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>> at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
>>>  at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
>>> at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
>>>  at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>> at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
>>>  at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
>>> at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
>>>  at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>> at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
>>>  at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
>>> at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
>>>  at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>> at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
>>>  at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
>>> at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
>>>  at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>> at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
>>>  at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
>>> at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
>>>  at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>> at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
>>>  at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
>>> at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
>>>  at
>>> java.io.ObjectOutputStream.writeOb

Re: Spark operators on Objects

2014-04-10 Thread Flavio Pompermaier
Probably for the XML case the best resources I found are
http://stevenskelton.ca/real-time-data-mining-spark/ and
http://blog.cloudera.com/blog/2014/03/why-apache-spark-is-a-crossover-hit-for-data-scientists/
.
And what about JSON? If I have to work with JSON and want to use the fasterxml
implementation, is there any suggestion about how to start?

On Wed, Apr 9, 2014 at 11:37 PM, Flavio Pompermaier wrote:

> Any help about this...?
> On Apr 9, 2014 9:19 AM, "Flavio Pompermaier"  wrote:
>
>> Hi to everybody,
>>
>> In my current scenario I have complex objects stored as xml in an HBase
>> Table.
>> What's the best strategy to work with them? My final goal would be to
>> define operators on those objects (like filter, equals, append, join,
>> merge, etc) and then work with multiple RDDs to perform some kind of
>> comparison between those objects. What do you suggest me? Is it possible?
>>
>> Best,
>> Flavio
>>
>


Re: Spark on YARN performance

2014-04-10 Thread Flavio Pompermaier
Thank you for the reply Mayur, it would be nice to have a comparison about
that.
I hope one day it will be available, or to have the time to test it myself
:)
So you're using Mesos for the moment, right? Which are the main differences
in your experience? YARN seems to be more flexible and interoperable with
other frameworks... am I wrong?

Best,
Flavio


On Thu, Apr 10, 2014 at 5:55 PM, Mayur Rustagi wrote:

> I've had better luck with standalone in terms of speed & latency. I think
> there is an impact, but not a very high one. The bigger impact is on being
> able to manage resources & share the cluster.
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Wed, Apr 9, 2014 at 12:10 AM, Flavio Pompermaier 
> wrote:
>
>> Hi to everybody,
>>
>> I'm new to Spark and I'd like to know if running Spark on top of YARN or
>> Mesos could affect (and how much) its performance. Is there any doc about
>> this?
>>
>> Best,
>> Flavio
>>
>


Behaviour of caching when dataset does not fit into memory

2014-04-10 Thread Pierre Borckmans
Hi there,

Just playing around in the Spark shell, I am now a bit confused by the 
performance I observe when the dataset does not fit into memory :

- i load a dataset with roughly 500 million rows
- i do a count, it takes about 20 seconds
- now if I cache the RDD and do a count again (which will try to cache the data 
again), it takes roughly 90 seconds (the fraction cached is only 25%).
=> Is it expected to be roughly 5 times slower when caching and not 
enough RAM is available?
- the subsequent calls to count are also really slow : about 90 seconds as well.
=> I can see that the first 25% tasks are fast (the ones dealing with 
data in memory), but then it gets really slow…

Am I missing something?
I thought performance would decrease kind of linearly with the amount of data 
that fits into memory…
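A back-of-the-envelope model of why partial caching can be slower than no
caching at all (my own guess at the mechanism, with made-up constants): the
uncached partitions pay both the full scan and the overhead of
serializing/evicting blocks while trying to cache them.

```python
def count_time(fraction_cached, scan_cost, cache_overhead):
    """Toy cost model for counting a partially cached RDD:
    cached partitions are nearly free to scan; uncached partitions
    pay the full scan plus the overhead of trying (and failing)
    to cache their blocks."""
    cached_part = fraction_cached * 0.1 * scan_cost
    uncached_part = (1 - fraction_cached) * (scan_cost + cache_overhead)
    return cached_part + uncached_part

uncached = count_time(0.0, scan_cost=20, cache_overhead=0)   # plain count
partial = count_time(0.25, scan_cost=20, cache_overhead=90)  # 25% cached
print(round(uncached), round(partial))  # 20 83
```

With any non-trivial caching overhead, a 25% hit rate isn't enough to offset
the extra work on the other 75% of partitions, which matches the 20s vs. 90s
observation above in shape (the constants here are invented).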

Thanks for your help!

Cheers





Pierre Borckmans

RealImpact Analytics | Brussels Office
www.realimpactanalytics.com | pierre.borckm...@realimpactanalytics.com

FR +32 485 91 87 31 | Skype pierre.borckmans







Re: Spark on YARN performance

2014-04-10 Thread Mayur Rustagi
I've had better luck with standalone in terms of speed & latency. I think
there is an impact, but not a very high one. The bigger impact is on being able
to manage resources & share the cluster.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Wed, Apr 9, 2014 at 12:10 AM, Flavio Pompermaier wrote:

> Hi to everybody,
>
> I'm new to Spark and I'd like to know if running Spark on top of YARN or
> Mesos could affect (and how much) its performance. Is there any doc about
> this?
>
> Best,
> Flavio
>


Re: is it possible to initiate Spark jobs from Oozie?

2014-04-10 Thread Mayur Rustagi
I don't think it'll do failure detection etc. of the Spark job in Oozie as of
yet. You should be able to trigger it from Oozie (worst case as a shell
script).

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Thu, Apr 10, 2014 at 2:56 AM, Konstantin Kudryavtsev <
kudryavtsev.konstan...@gmail.com> wrote:

> I believe you need to write custom action or engage java action
> On Apr 10, 2014 12:11 AM, "Segerlind, Nathan L" <
> nathan.l.segerl...@intel.com> wrote:
>
>>  Howdy.
>>
>>
>>
>> Is it possible to initiate Spark jobs from Oozie (presumably as a java
>> action)? If so, are there known limitations to this?  And would anybody
>> have a pointer to an example?
>>
>>
>>
>> Thanks,
>>
>> Nate
>>
>>
>>
>


Re: Pig on Spark

2014-04-10 Thread Mayur Rustagi
Bam !!!
http://docs.sigmoidanalytics.com/index.php/Setting_up_spork_with_spark_0.8.1


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Thu, Apr 10, 2014 at 3:07 AM, Konstantin Kudryavtsev <
kudryavtsev.konstan...@gmail.com> wrote:

> Hi Mayur,
>
> I wondered if you could share your findings in some way (github, blog
> post, etc). I guess your experience will be very interesting/useful for
> many people
>
> sent from Lenovo YogaTablet
> On Apr 8, 2014 8:48 PM, "Mayur Rustagi"  wrote:
>
>> Hi Ankit,
>> Thanx for all the work on Pig.
>> Finally got it working. Couple of high level bugs right now:
>>
>>- Getting it working on Spark 0.9.0
>>- Getting UDF working
>>- Getting generate functionality working
>>- Exhaustive test suite on Spark on Pig
>>
>> are you maintaining a Jira somewhere?
>>
>> I am currently trying to deploy it on 0.9.0.
>>
>> Regards
>> Mayur
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi 
>>
>>
>>
>> On Fri, Mar 14, 2014 at 1:37 PM, Aniket Mokashi wrote:
>>
>>> We will post fixes from our side at - https://github.com/twitter/pig.
>>>
>>> Top on our list are-
>>> 1. Make it work with pig-trunk (execution engine interface) (with 0.8 or
>>> 0.9 spark).
>>> 2. Support for algebraic udfs (this mitigates the group by oom problems).
>>>
>>> Would definitely love more contribution on this.
>>>
>>> Thanks,
>>> Aniket
>>>
>>>
>>> On Fri, Mar 14, 2014 at 12:29 PM, Mayur Rustagi >> > wrote:
>>>
 Dam I am off to NY for Structure Conf. Would it be possible to meet
 anytime after 28th March?
 I am really interested in making it stable & production quality.

 Regards
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi 



 On Fri, Mar 14, 2014 at 11:53 AM, Julien Le Dem wrote:

> Hi Mayur,
> Are you going to the Pig meetup this afternoon?
> http://www.meetup.com/PigUser/events/160604192/
> Aniket and I will be there.
> We would be happy to chat about Pig-on-Spark
>
>
>
> On Tue, Mar 11, 2014 at 8:56 AM, Mayur Rustagi <
> mayur.rust...@gmail.com> wrote:
>
>> Hi Lin,
>> We are working on getting Pig on spark functional with 0.8.0, have
>> you got it working on any spark version ?
>> Also what all functionality works on it?
>> Regards
>> Mayur
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi 
>>
>>
>>
>> On Mon, Mar 10, 2014 at 11:00 PM, Xiangrui Meng wrote:
>>
>>> Hi Sameer,
>>>
>>> Lin (cc'ed) could also give you some updates about Pig on Spark
>>> development on her side.
>>>
>>> Best,
>>> Xiangrui
>>>
>>> On Mon, Mar 10, 2014 at 12:52 PM, Sameer Tilak 
>>> wrote:
>>> > Hi Mayur,
>>> > We are planning to upgrade our distribution MR1> MR2 (YARN) and
>>> the goal is
>>> > to get SPROK set up next month. I will keep you posted. Can you
>>> please keep
>>> > me informed about your progress as well.
>>> >
>>> > 
>>> > From: mayur.rust...@gmail.com
>>> > Date: Mon, 10 Mar 2014 11:47:56 -0700
>>> >
>>> > Subject: Re: Pig on Spark
>>> > To: user@spark.apache.org
>>> >
>>> >
>>> > Hi Sameer,
>>> > Did you make any progress on this. My team is also trying it out
>>> would love
>>> > to know some detail so progress.
>>> >
>>> > Mayur Rustagi
>>> > Ph: +1 (760) 203 3257
>>> > http://www.sigmoidanalytics.com
>>> > @mayur_rustagi
>>> >
>>> >
>>> >
>>> > On Thu, Mar 6, 2014 at 2:20 PM, Sameer Tilak 
>>> wrote:
>>> >
>>> > Hi Aniket,
>>> > Many thanks! I will check this out.
>>> >
>>> > 
>>> > Date: Thu, 6 Mar 2014 13:46:50 -0800
>>> > Subject: Re: Pig on Spark
>>> > From: aniket...@gmail.com
>>> > To: user@spark.apache.org; tgraves...@yahoo.com
>>> >
>>> >
>>> > There is some work to make this work on yarn at
>>> > https://github.com/aniket486/pig. (So, compile pig with ant
>>> > -Dhadoopversion=23)
>>> >
>>> > You can look at
>>> https://github.com/aniket486/pig/blob/spork/pig-spark to
>>> > find out what sort of env variables you need (sorry, I haven't
>>> been able to
>>> > clean this up- in-progress). There are few known issues with this,
>>> I will
>>> > work on fixing them soon.
>>> >
>>> > Known issues-
>>> > 1. Limit does not work (spork-fix)
>>> > 2. Foreach requires to turn off schema-tuple-backend (should be a
>>> pig-jira)
>>> > 3. Algebraic udfs dont work (spork-fix in-progress)
>>> > 4. Grou

Re: Spark - ready for prime time?

2014-04-10 Thread Alex Boisvert
I'll provide answers from our own experience at Bizo.  We've been using
Spark for 1+ year now and have found it generally better than previous
approaches (Hadoop + Hive mostly).


On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
andras.nem...@lynxanalytics.com> wrote:

> I. Is it too much magic? Lots of things "just work right" in Spark and
> it's extremely convenient and efficient when it indeed works. But should we
> be worried that customization is hard if the built in behavior is not quite
> right for us? Are we to expect hard to track down issues originating from
> the black box behind the magic?
>

I think it goes back to understanding Spark's architecture, its design
constraints and the problems it explicitly set out to address.   If the
solution to your problems can be easily formulated in terms of the
map/reduce model, then it's a good choice.  You'll want your
"customizations" to go with (not against) the grain of the architecture.


> II. Is it mature enough? E.g. we've created a pull request which fixes a
> problem that we were very surprised no one ever stumbled upon
> before. So that's why I'm asking: is Spark being already used in
> professional settings? Can one already trust it being reasonably bug free
> and reliable?
>

There are lots of ways to use Spark, and not all of the features are
necessarily at the same level of maturity.   For instance, we put all the
jars on the main classpath so we've never run into the issue your pull
request addresses.

We definitely use and rely on Spark on a professional basis.  We have 5+
spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
Once we got them working with the proper configuration settings, they have
been running reliably since.

I would characterize our use of Spark as a "better Hadoop", in the sense
that we use it for batch processing only, no streaming yet.   We're happy
it performs better than Hadoop but we don't require/rely on its memory
caching features.  In fact, for most of our jobs it would simplify our
lives if Spark wouldn't cache so many things in memory since it would make
configuration/tuning a lot simpler and jobs would run successfully on the
first try instead of having to tweak things (# of partitions and such).

So, to the concrete issues. Sorry for the long mail, and let me know if I
> should break this out into more threads or if there is some other way to
> have this discussion...
>
> 1. Memory management
> The general direction of these questions is whether it's possible to take
> RDD caching related memory management more into our own hands as LRU
> eviction is nice most of the time but can be very suboptimal in some of our
> use cases.
> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
> really wants to keep. I'm fine with going down in flames if I mark too much
> data essential.
> B. Memory "reflection": can you pragmatically get the memory size of a
> cached rdd and memory sizes available in total/per executor? If we could do
> this we could indirectly avoid automatic evictions of things we might
> really want to keep in memory.
> C. Evictions caused by RDD partitions on the driver. I had a setup with
> huge worker memory and smallish memory on the driver JVM. To my surprise,
> the system started to cache RDD partitions on the driver as well. As the
> driver ran out of memory I started to see evictions while there were still
> plenty of space on workers. This resulted in lengthy recomputations. Can
> this be avoided somehow?
> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
> waiting for the LRU eviction taking care of it? Can you tell the size of a
> broadcast programmatically?
>
>
> 2. Akka lost connections
> We have quite often experienced lost executors due to akka exceptions -
> mostly connection lost or similar. It seems to happen when an executor gets
> extremely busy with some CPU intensive work. Our hypothesis is that akka
> network threads get starved and the executor fails to respond within
> timeout limits. Is this plausible? If yes, what can we do with it?
>

We've seen these as well.  In our case, increasing the akka timeouts and
framesize helped a lot.

e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
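For reference, a sketch of how these might look in conf/spark-defaults.conf (or as equivalent SparkConf / system-property settings); the numbers below are purely illustrative assumptions, not recommendations, and the right values depend on your workload:

```
spark.akka.timeout        300    # seconds (illustrative)
spark.akka.askTimeout     120    # seconds (illustrative)
spark.akka.lookupTimeout  120    # seconds (illustrative)
spark.akka.frameSize      100    # MB (illustrative)
```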


>
> In general, these are scary errors in the sense that they come from the
> very core of the framework and it's hard to link it to something we do in
> our own code, and thus hard to find a fix. So a question more for the
> community: how often do you end up scratching your head about cases where
> spark magic doesn't work perfectly?
>

For us, this happens most often for jobs processing TBs of data (instead of
GBs)... which is frustrating of course because these jobs cost a lot more
in $$$ + time to run/debug/diagnose than smaller jobs.

It means we have to comb the logs to understand what happened, interpret
stack traces, dump memory / object allocations, read Spark 

Re: Spark - ready for prime time?

2014-04-10 Thread Sean Owen
Mike Olson's comment:

http://vision.cloudera.com/mapreduce-spark/

Here's the partnership announcement:

http://databricks.com/blog/2013/10/28/databricks-and-cloudera-partner-to-support-spark.html

> On Thu, Apr 10, 2014 at 10:42 AM, Ian Ferreira 
> wrote:
>>
>> Do you have the link to the Cloudera comment?


Re: Spark - ready for prime time?

2014-04-10 Thread Dean Wampler
Here are several good ones:

https://www.google.com/search?q=cloudera+spark&oq=cloudera+spark&aqs=chrome..69i57j69i65l3j69i60l2.4439j0j7&sourceid=chrome&espv=2&es_sm=119&ie=UTF-8



On Thu, Apr 10, 2014 at 10:42 AM, Ian Ferreira wrote:

>  Do you have the link to the Cloudera comment?
>
> Sent from Windows Mail
>
> *From:* Dean Wampler 
> *Sent:* Thursday, April 10, 2014 7:39 AM
> *To:* Spark Users 
> *Cc:* Daniel Darabos , Andras 
> Barjak
>
> Spark has been endorsed by Cloudera as the successor to MapReduce. That
> says a lot...
>
>
> On Thu, Apr 10, 2014 at 10:11 AM, Andras Nemeth <
> andras.nem...@lynxanalytics.com> wrote:
>
>> Hello Spark Users,
>>
>> With the recent graduation of Spark to a top level project (grats, btw!),
>> maybe a well timed question. :)
>>
>> We are at the very beginning of a large scale big data project and after
>> two months of exploration work we'd like to settle on the technologies to
>> use, roll up our sleeves and start to build the system.
>>
>> Spark is one of the forerunners for our technology choice.
>>
>> My question in essence is whether it's a good idea or is Spark too
>> 'experimental' just yet to bet our lives (well, the project's life) on it.
>>
>> The benefits of choosing Spark are numerous and I guess all too obvious
>> for this audience - e.g. we love its powerful abstraction, ease of
>> development and the potential for using a single system for serving and
>> manipulating huge amount of data.
>>
>> This email aims to ask about the risks. I enlist concrete issues we've
>> encountered below, but basically my concern boils down to two philosophical
>> points:
>> I. Is it too much magic? Lots of things "just work right" in Spark and
>> it's extremely convenient and efficient when it indeed works. But should we
>> be worried that customization is hard if the built in behavior is not quite
>> right for us? Are we to expect hard to track down issues originating from
>> the black box behind the magic?
>> II. Is it mature enough? E.g. we've created a pull request which fixes a
>> problem that we were very surprised no one ever stumbled upon
>> before. So that's why I'm asking: is Spark being already used in
>> professional settings? Can one already trust it being reasonably bug free
>> and reliable?
>>
>> I know I'm asking a biased audience, but that's fine, as I want to be
>> convinced. :)
>>
>> So, to the concrete issues. Sorry for the long mail, and let me know if I
>> should break this out into more threads or if there is some other way to
>> have this discussion...
>>
>> 1. Memory management
>> The general direction of these questions is whether it's possible to take
>> RDD caching related memory management more into our own hands as LRU
>> eviction is nice most of the time but can be very suboptimal in some of our
>> use cases.
>> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
>> really wants to keep. I'm fine with going down in flames if I mark too much
>> data essential.
>> B. Memory "reflection": can you pragmatically get the memory size of a
>> cached rdd and memory sizes available in total/per executor? If we could do
>> this we could indirectly avoid automatic evictions of things we might
>> really want to keep in memory.
>> C. Evictions caused by RDD partitions on the driver. I had a setup with
>> huge worker memory and smallish memory on the driver JVM. To my surprise,
>> the system started to cache RDD partitions on the driver as well. As the
>> driver ran out of memory I started to see evictions while there were still
>> plenty of space on workers. This resulted in lengthy recomputations. Can
>> this be avoided somehow?
>> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
>> waiting for the LRU eviction taking care of it? Can you tell the size of a
>> broadcast programmatically?
>>
>>
>> 2. Akka lost connections
>> We have quite often experienced lost executors due to akka exceptions -
>> mostly connection lost or similar. It seems to happen when an executor gets
>> extremely busy with some CPU intensive work. Our hypothesis is that akka
>> network threads get starved and the executor fails to respond within
>> timeout limits. Is this plausible? If yes, what can we do with it?
>>
>> In general, these are scary errors in the sense that they come from the
>> very core of the framework and it's hard to link it to something we do in
>> our own code, and thus hard to find a fix. So a question more for the
>> community: how often do you end up scratching your head about cases where
>> spark magic doesn't work perfectly?
>>
>>
>> 3. Recalculation of cached rdds
>> I see the following scenario happening. I load two RDDs A,B from disk,
>> cache them and then do some jobs on them, at the very least a count on
>> each. After these jobs are done I see on the storage panel that 100% of
>> these RDDs are cached in memory.
>>
>> Then I create a third RDD C which is created 

Re: Spark - ready for prime time?

2014-04-10 Thread Ian Ferreira
Do you have the link to the Cloudera comment?

Sent from Windows Mail

From: Dean Wampler
Sent: Thursday, April 10, 2014 7:39 AM
To: Spark Users
Cc: Daniel Darabos, Andras Barjak

Spark has been endorsed by Cloudera as the successor to MapReduce. That says a
lot...

On Thu, Apr 10, 2014 at 10:11 AM, Andras Nemeth  wrote:

Hello Spark Users,

With the recent graduation of Spark to a top level project (grats, btw!), maybe
a well timed question. :)

We are at the very beginning of a large scale big data project and after two
months of exploration work we'd like to settle on the technologies to use, roll
up our sleeves and start to build the system.

Spark is one of the forerunners for our technology choice.

My question in essence is whether it's a good idea or is Spark too
'experimental' just yet to bet our lives (well, the project's life) on it.

The benefits of choosing Spark are numerous and I guess all too obvious for
this audience - e.g. we love its powerful abstraction, ease of development and
the potential for using a single system for serving and manipulating huge
amount of data.

This email aims to ask about the risks. I enlist concrete issues we've
encountered below, but basically my concern boils down to two philosophical
points:

I. Is it too much magic? Lots of things "just work right" in Spark and it's
extremely convenient and efficient when it indeed works. But should we be
worried that customization is hard if the built in behavior is not quite right
for us? Are we to expect hard to track down issues originating from the black
box behind the magic?

II. Is it mature enough? E.g. we've created a pull request which fixes a
problem that we were very surprised no one ever stumbled upon before. So that's
why I'm asking: is Spark being already used in professional settings? Can one
already trust it being reasonably bug free and reliable?

I know I'm asking a biased audience, but that's fine, as I want to be
convinced. :)

So, to the concrete issues. Sorry for the long mail, and let me know if I
should break this out into more threads or if there is some other way to have
this discussion...

1. Memory management

The general direction of these questions is whether it's possible to take RDD
caching related memory management more into our own hands as LRU eviction is
nice most of the time but can be very suboptimal in some of our use cases.

A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one really
wants to keep. I'm fine with going down in flames if I mark too much data
essential.

B. Memory "reflection": can you pragmatically get the memory size of a cached
rdd and memory sizes available in total/per executor? If we could do this we
could indirectly avoid automatic evictions of things we might really want to
keep in memory.

C. Evictions caused by RDD partitions on the driver. I had a setup with huge
worker memory and smallish memory on the driver JVM. To my surprise, the system
started to cache RDD partitions on the driver as well. As the driver ran out of
memory I started to see evictions while there were still plenty of space on
workers. This resulted in lengthy recomputations. Can this be avoided somehow?

D. Broadcasts. Is it possible to get rid of a broadcast manually, without
waiting for the LRU eviction taking care of it? Can you tell the size of a
broadcast programmatically?

2. Akka lost connections

We have quite often experienced lost executors due to akka exceptions - mostly
connection lost or similar. It seems to happen when an executor gets extremely
busy with some CPU intensive work. Our hypothesis is that akka network threads
get starved and the executor fails to respond within timeout limits. Is this
plausible? If yes, what can we do with it?

In general, these are scary errors in the sense that they come from the very
core of the framework and it's hard to link it to something we do in our own
code, and thus hard to find a fix. So a question more for the community: how
often do you end up scratching your head about cases where spark magic doesn't
work perfectly?

3. Recalculation of cached rdds

I see the following scenario happening. I load two RDDs A,B from disk, cache
them and then do some jobs on them, at the very least a count on each. After
these jobs are done I see on the storage panel that 100% of these RDDs are
cached in memory.

Then I create a third RDD C which is created by multiple joins and maps from A
and B, also cache it and start a job on C. When I do this I still see A and B
completely cached and also see C slowly getting more and more cached. This is
all fine and good, but in the meanwhile I see stages running on the UI that
point to code which is used to load A and B. How is this possible? Am I
misunderstanding how cached RDDs should behave?

And again the general question - how can one debug such issues?


Spark 0.9.1 PySpark ImportError

2014-04-10 Thread aazout
I am getting a Python ImportError on a Spark standalone cluster. I have set the
PYTHONPATH on both the master and the slave, and the package imports properly
when I run the PySpark command line on both machines. This only happens with
master - slave communication. Here is the error below:

14/04/10 13:40:19 INFO scheduler.TaskSetManager: Loss was due to
org.apache.spark.api.python.PythonException: Traceback (most recent call
last): 
  File "/root/spark/python/pyspark/worker.py", line 73, in main 
command = pickleSer._read_with_length(infile) 
  File "/root/spark/python/pyspark/serializers.py", line 137, in
_read_with_length 
return self.loads(obj) 
  File "/root/spark/python/pyspark/cloudpickle.py", line 810, in subimport 
__import__(name) 
ImportError: ('No module named volatility.atm_impl_vol', , ('volatility.atm_impl_vol',)) 

Any ideas?
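For what it's worth, the last frame of that traceback is the subimport helper in pyspark/cloudpickle.py: the worker deserializes the task and re-imports the module by its dotted name, which raises if the package isn't on that machine's sys.path. A minimal plain-Python sketch of the failure mode (no Spark required; the module name is the one from the traceback):

```python
import sys

def subimport(name):
    # same shape as the subimport helper in pyspark/cloudpickle.py:
    # import a module by its dotted name and return it from sys.modules
    __import__(name)
    return sys.modules[name]

try:
    # a worker without the package on its sys.path fails here
    subimport("volatility.atm_impl_vol")
except ImportError as exc:
    print("worker-side failure:", exc)
```

So the package has to be importable on every worker, not just where the driver runs - e.g. by installing/distributing it and extending PYTHONPATH on the workers, or (if applicable) shipping it as a zip/egg via SparkContext's addPyFile.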



-
CEO / Velos (velos.io)
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-0-9-1-PySpark-ImportError-tp4068.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark - ready for prime time?

2014-04-10 Thread Dean Wampler
Spark has been endorsed by Cloudera as the successor to MapReduce. That
says a lot...


On Thu, Apr 10, 2014 at 10:11 AM, Andras Nemeth <
andras.nem...@lynxanalytics.com> wrote:

> Hello Spark Users,
>
> With the recent graduation of Spark to a top level project (grats, btw!),
> maybe a well timed question. :)
>
> We are at the very beginning of a large scale big data project and after
> two months of exploration work we'd like to settle on the technologies to
> use, roll up our sleeves and start to build the system.
>
> Spark is one of the forerunners for our technology choice.
>
> My question in essence is whether it's a good idea or is Spark too
> 'experimental' just yet to bet our lives (well, the project's life) on it.
>
> The benefits of choosing Spark are numerous and I guess all too obvious
> for this audience - e.g. we love its powerful abstraction, ease of
> development and the potential for using a single system for serving and
> manipulating huge amount of data.
>
> This email aims to ask about the risks. I enlist concrete issues we've
> encountered below, but basically my concern boils down to two philosophical
> points:
> I. Is it too much magic? Lots of things "just work right" in Spark and
> it's extremely convenient and efficient when it indeed works. But should we
> be worried that customization is hard if the built in behavior is not quite
> right for us? Are we to expect hard to track down issues originating from
> the black box behind the magic?
> II. Is it mature enough? E.g. we've created a pull request which fixes a
> problem that we were very surprised no one ever stumbled upon
> before. So that's why I'm asking: is Spark being already used in
> professional settings? Can one already trust it being reasonably bug free
> and reliable?
>
> I know I'm asking a biased audience, but that's fine, as I want to be
> convinced. :)
>
> So, to the concrete issues. Sorry for the long mail, and let me know if I
> should break this out into more threads or if there is some other way to
> have this discussion...
>
> 1. Memory management
> The general direction of these questions is whether it's possible to take
> RDD caching related memory management more into our own hands as LRU
> eviction is nice most of the time but can be very suboptimal in some of our
> use cases.
> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
> really wants to keep. I'm fine with going down in flames if I mark too much
> data essential.
> B. Memory "reflection": can you pragmatically get the memory size of a
> cached rdd and memory sizes available in total/per executor? If we could do
> this we could indirectly avoid automatic evictions of things we might
> really want to keep in memory.
> C. Evictions caused by RDD partitions on the driver. I had a setup with
> huge worker memory and smallish memory on the driver JVM. To my surprise,
> the system started to cache RDD partitions on the driver as well. As the
> driver ran out of memory I started to see evictions while there were still
> plenty of space on workers. This resulted in lengthy recomputations. Can
> this be avoided somehow?
> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
> waiting for the LRU eviction taking care of it? Can you tell the size of a
> broadcast programmatically?
>
>
> 2. Akka lost connections
> We have quite often experienced lost executors due to akka exceptions -
> mostly connection lost or similar. It seems to happen when an executor gets
> extremely busy with some CPU intensive work. Our hypothesis is that akka
> network threads get starved and the executor fails to respond within
> timeout limits. Is this plausible? If yes, what can we do with it?
>
> In general, these are scary errors in the sense that they come from the
> very core of the framework and it's hard to link it to something we do in
> our own code, and thus hard to find a fix. So a question more for the
> community: how often do you end up scratching your head about cases where
> spark magic doesn't work perfectly?
>
>
> 3. Recalculation of cached rdds
> I see the following scenario happening. I load two RDDs A,B from disk,
> cache them and then do some jobs on them, at the very least a count on
> each. After these jobs are done I see on the storage panel that 100% of
> these RDDs are cached in memory.
>
> Then I create a third RDD C which is created by multiple joins and maps
> from A and B, also cache it and start a job on C. When I do this I still
> see A and B completely cached and also see C slowly getting more and more
> cached. This is all fine and good, but in the meanwhile I see stages
> running on the UI that point to code which is used to load A and B. How is
> this possible? Am I misunderstanding how cached RDDs should behave?
>
> And again the general question - how can one debug such issues?
>
> 4. Shuffle on disk
> Is it true - I couldn't find it in official docs, but 

RE: Executing spark jobs with predefined Hadoop user

2014-04-10 Thread Shao, Saisai
Hi Asaf,

The user who runs the SparkContext is decided by the code below in SparkContext. 
Normally user.name is the user who started the JVM; you can start your 
application with -Duser.name=xxx to specify the username you want, and that 
username will be the one used to communicate with HDFS.

val sparkUser = Option {
  Option(System.getProperty("user.name")).getOrElse(System.getenv("SPARK_USER"))
}.getOrElse {
  SparkContext.SPARK_UNKNOWN_USER
}
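As a plain-Python sketch of that fallback order (names are illustrative stand-ins; no Spark needed): the user.name system property wins, then the SPARK_USER environment variable, then an unknown-user sentinel.

```python
SPARK_UNKNOWN_USER = "<unknown>"  # stand-in for SparkContext.SPARK_UNKNOWN_USER

def resolve_spark_user(props, environ):
    # mirrors the Scala snippet: -Duser.name wins, then the SPARK_USER
    # environment variable, then the unknown-user sentinel
    return props.get("user.name") or environ.get("SPARK_USER") or SPARK_UNKNOWN_USER

# launching with -Duser.name=hdfs corresponds to:
print(resolve_spark_user({"user.name": "hdfs"}, {}))    # -> hdfs
print(resolve_spark_user({}, {"SPARK_USER": "alice"}))  # -> alice
```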

Thanks
Jerry

From: Asaf Lahav [mailto:asaf.la...@gmail.com]
Sent: Thursday, April 10, 2014 8:15 PM
To: user@spark.apache.org
Subject: Executing spark jobs with predefined Hadoop user

Hi,
We are using Spark with data files on HDFS. The files are owned by the 
predefined Hadoop user ("hdfs").
The folder is permitted with:
* read, write, and execute permission for the hdfs user
* read and execute permission for users in the group
* read permission only for all other users
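That layout follows ordinary POSIX-style permission triples; a small sketch (illustrative only, not Hadoop's actual implementation) of why a non-owner outside the group is denied WRITE on an rwxr-xr-x directory owned by hdfs:hadoop:

```python
def may_write(user, groups, owner, group, mode="rwxr-xr-x"):
    # pick the applicable rwx triple (owner / group / other), POSIX-style,
    # then check its write bit
    if user == owner:
        triple = mode[0:3]
    elif group in groups:
        triple = mode[3:6]
    else:
        triple = mode[6:9]
    return "w" in triple

print(may_write("someuser", set(), "hdfs", "hadoop"))   # -> False
print(may_write("hdfs", {"hadoop"}, "hdfs", "hadoop"))  # -> True
```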

Now the Spark write operation fails due to a mismatch between the Spark 
context's user and the Hadoop permissions.
Is there a way to start the SparkContext with a different user than the one 
configured on the local machine?



Please see the technical details below:





The permission on the hdfs folder "/tmp/Iris" is as follows:
drwxr-xr-x   - hdfs  hadoop  0 2014-04-10 14:12 /tmp/Iris


The Spark context is initiated on my local machine, and according to the 
configured hdfs permission "rwxr-xr-x" there is no problem in loading the 
Hadoop hdfs file into an RDD:
final JavaRDD<String> rdd = sparkContext.textFile(filePath);

But saving the resulting RDD back to Hadoop results in a Hadoop security 
exception:
rdd.saveAsTextFile("/tmp/Iris/output");

Then I receive the following Hadoop security exception:
org.apache.hadoop.security.AccessControlException: 
org.apache.hadoop.security.AccessControlException: Permission denied: 
user=halbani, access=WRITE, inode="/tmp/Iris":hdfs:hadoop:drwxr-xr-x
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
Method)
  at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
  at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
  at 
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:95)
  at 
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:57)
  at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:1428)
  at 
org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:332)
  at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1126)
  at 
org.apache.hadoop.mapred.FileOutputCommitter.setupJob(FileOutputCommitter.java:52)
  at 
org.apache.hadoop.mapred.SparkHadoopWriter.preSetup(SparkHadoopWriter.scala:65)
  at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:713)
  at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:686)
  at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:572)
  at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:894)
  at 
org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:355)
  at org.apache.spark.api.java.JavaRDD.saveAsTextFile(JavaRDD.scala:27)
  at org.apache.spark.reader.FileSpliter.split(FileSpliter.java:73)
  at org.apache.spark.reader.FileReaderMain.main(FileReaderMain.java:17)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:601)
  at 
com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
Caused by: org.apache.hadoop.ipc.RemoteException: 
org.apache.hadoop.security.AccessControlException: Permission denied: 
user=halbani, access=WRITE, inode="/tmp/Iris":hdfs:hadoop:drwxr-xr-x
  at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:225)
  at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:205)
  at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:151)
  at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:5951)
  at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:5924)
  at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:2628)
  at 
org.apache.hadoop.hdfs.server

Re: Spark - ready for prime time?

2014-04-10 Thread Debasish Das
When you say "Spark is one of the forerunners for our technology choice",
what are the other options you are looking into?

I start cross validation runs on a 40 core, 160 GB Spark job using a
script... I woke up in the morning and none of the jobs had crashed! And the
project just came out of incubation.

I wish Spark keeps evolving as a standalone Akka cluster (MPI cluster if you
remember C++ mpiexec :-) where you can plug and play any distributed file
system (HDFS,..,) or distributed caching systems (HBase, Cassandra,..)

I am also confident that Spark as a standalone akka cluster can serve
analytics driven scalable frontend apps... and by analytics I don't mean
sql analytics... but predictive analytics...



On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
andras.nem...@lynxanalytics.com> wrote:

> Hello Spark Users,
>
> With the recent graduation of Spark to a top level project (grats, btw!),
> maybe a well timed question. :)
>
> We are at the very beginning of a large scale big data project and after
> two months of exploration work we'd like to settle on the technologies to
> use, roll up our sleeves and start to build the system.
>
> Spark is one of the forerunners for our technology choice.
>
> My question in essence is whether it's a good idea or is Spark too
> 'experimental' just yet to bet our lives (well, the project's life) on it.
>
> The benefits of choosing Spark are numerous and I guess all too obvious
> for this audience - e.g. we love its powerful abstraction, ease of
> development and the potential for using a single system for serving and
> manipulating huge amount of data.
>
> This email aims to ask about the risks. I enlist concrete issues we've
> encountered below, but basically my concern boils down to two philosophical
> points:
> I. Is it too much magic? Lots of things "just work right" in Spark and
> it's extremely convenient and efficient when it indeed works. But should we
> be worried that customization is hard if the built in behavior is not quite
> right for us? Are we to expect hard to track down issues originating from
> the black box behind the magic?
> II. Is it mature enough? E.g. we've created a pull request which fixes a
> problem that we were very surprised no one ever stumbled upon
> before. So that's why I'm asking: is Spark being already used in
> professional settings? Can one already trust it being reasonably bug free
> and reliable?
>
> I know I'm asking a biased audience, but that's fine, as I want to be
> convinced. :)
>
> So, to the concrete issues. Sorry for the long mail, and let me know if I
> should break this out into more threads or if there is some other way to
> have this discussion...
>
> 1. Memory management
> The general direction of these questions is whether it's possible to take
> RDD caching related memory management more into our own hands as LRU
> eviction is nice most of the time but can be very suboptimal in some of our
> use cases.
> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
> really wants to keep. I'm fine with going down in flames if I mark too much
> data essential.
> B. Memory "reflection": can you pragmatically get the memory size of a
> cached rdd and memory sizes available in total/per executor? If we could do
> this we could indirectly avoid automatic evictions of things we might
> really want to keep in memory.
> C. Evictions caused by RDD partitions on the driver. I had a setup with
> huge worker memory and smallish memory on the driver JVM. To my surprise,
> the system started to cache RDD partitions on the driver as well. As the
> driver ran out of memory I started to see evictions while there were still
> plenty of space on workers. This resulted in lengthy recomputations. Can
> this be avoided somehow?
> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
> waiting for the LRU eviction taking care of it? Can you tell the size of a
> broadcast programmatically?
>
>
> 2. Akka lost connections
> We have quite often experienced lost executors due to akka exceptions -
> mostly connection lost or similar. It seems to happen when an executor gets
> extremely busy with some CPU intensive work. Our hypothesis is that akka
> network threads get starved and the executor fails to respond within
> timeout limits. Is this plausible? If yes, what can we do with it?
>
> In general, these are scary errors in the sense that they come from the
> very core of the framework and it's hard to link it to something we do in
> our own code, and thus hard to find a fix. So a question more for the
> community: how often do you end up scratching your head about cases where
> spark magic doesn't work perfectly?
>
>
> 3. Recalculation of cached rdds
> I see the following scenario happening. I load two RDDs A,B from disk,
> cache them and then do some jobs on them, at the very least a count on
> each. After these jobs are done I see on the storage panel that 100% of
> t

Fwd: Spark - ready for prime time?

2014-04-10 Thread Andras Nemeth
Hello Spark Users,

With the recent graduation of Spark to a top level project (grats, btw!),
maybe a well timed question. :)

We are at the very beginning of a large scale big data project and after
two months of exploration work we'd like to settle on the technologies to
use, roll up our sleeves and start to build the system.

Spark is one of the forerunners for our technology choice.

My question in essence is whether it's a good idea or is Spark too
'experimental' just yet to bet our lives (well, the project's life) on it.

The benefits of choosing Spark are numerous and I guess all too obvious for
this audience - e.g. we love its powerful abstraction, ease of development
and the potential for using a single system for serving and manipulating
huge amount of data.

This email aims to ask about the risks. I enlist concrete issues we've
encountered below, but basically my concern boils down to two philosophical
points:
I. Is it too much magic? Lots of things "just work right" in Spark and it's
extremely convenient and efficient when it indeed works. But should we be
worried that customization is hard if the built in behavior is not quite
right for us? Are we to expect hard to track down issues originating from
the black box behind the magic?
II. Is it mature enough? E.g. we've created a pull request which fixes a
problem that we were very surprised no one ever stumbled upon
before. So that's why I'm asking: is Spark being already used in
professional settings? Can one already trust it being reasonably bug free
and reliable?

I know I'm asking a biased audience, but that's fine, as I want to be
convinced. :)

So, to the concrete issues. Sorry for the long mail, and let me know if I
should break this out into more threads or if there is some other way to
have this discussion...

1. Memory management
The general direction of these questions is whether it's possible to take
RDD caching related memory management more into our own hands as LRU
eviction is nice most of the time but can be very suboptimal in some of our
use cases.
A. Somehow prioritize cached RDDs, e.g. mark some as "essential" that one
really wants to keep. I'm fine with going down in flames if I mark too much
data essential.
B. Memory "reflection": can you programmatically get the memory size of a
cached RDD and the memory sizes available in total/per executor? If we could
do this we could indirectly avoid automatic evictions of things we might
really want to keep in memory.
C. Evictions caused by RDD partitions on the driver. I had a setup with
huge worker memory and smallish memory on the driver JVM. To my surprise,
the system started to cache RDD partitions on the driver as well. As the
driver ran out of memory I started to see evictions while there was still
plenty of space on the workers. This resulted in lengthy recomputations. Can
this be avoided somehow?
D. Broadcasts. Is it possible to get rid of a broadcast manually, without
waiting for LRU eviction to take care of it? Can you tell the size of a
broadcast programmatically?
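[Editor's note: the "pinning" asked for in point A is not a Spark API; the
desired behavior can be sketched as an LRU cache whose pinned entries are
never evicted. This is purely illustrative, not Spark's BlockManager policy.]

```python
from collections import OrderedDict

class PinnedLRU:
    """LRU cache where pinned entries are never evicted.
    Illustrative sketch only -- not how Spark's cache actually works."""
    def __init__(self, capacity):
        self.capacity = capacity
        self.data = OrderedDict()   # key -> value, in LRU order
        self.pinned = set()

    def put(self, key, value, pin=False):
        self.data[key] = value
        self.data.move_to_end(key)
        if pin:
            self.pinned.add(key)
        # evict the least-recently-used *unpinned* entries over capacity
        while len(self.data) > self.capacity:
            victim = next((k for k in self.data if k not in self.pinned), None)
            if victim is None:      # everything pinned: nothing to evict
                break
            del self.data[victim]

    def get(self, key):
        self.data.move_to_end(key)
        return self.data[key]

cache = PinnedLRU(2)
cache.put("essential_rdd", "A", pin=True)
cache.put("rdd_b", "B")
cache.put("rdd_c", "C")             # evicts rdd_b, not the pinned entry
print(sorted(cache.data))           # ['essential_rdd', 'rdd_c']
```

With a policy like this, marking too much data essential simply means
nothing is evictable and the cache "goes down in flames", exactly as the
author says they would accept.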


2. Akka lost connections
We have quite often experienced lost executors due to Akka exceptions -
mostly lost connections or similar. It seems to happen when an executor gets
extremely busy with some CPU-intensive work. Our hypothesis is that the Akka
network threads get starved and the executor fails to respond within the
timeout limits. Is this plausible? If yes, what can we do about it?
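[Editor's note: one mitigation commonly tried for this starvation hypothesis
is raising the Akka timeouts so a briefly unresponsive executor is not
declared dead. The property names below are from Spark 0.9-era
configuration and the values are illustrative only -- verify both against
your version before use.]

```python
# Sketch: these would be passed through SparkConf / system properties.
akka_tuning = {
    "spark.akka.timeout": "300",               # seconds before a peer is considered gone
    "spark.akka.heartbeat.interval": "10000",  # seconds between failure-detector heartbeats
}
print(akka_tuning["spark.akka.timeout"])
```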

In general, these are scary errors in the sense that they come from the
very core of the framework and it's hard to link it to something we do in
our own code, and thus hard to find a fix. So a question more for the
community: how often do you end up scratching your head about cases where
spark magic doesn't work perfectly?


3. Recalculation of cached rdds
I see the following scenario happening. I load two RDDs A,B from disk,
cache them and then do some jobs on them, at the very least a count on
each. After these jobs are done I see on the storage panel that 100% of
these RDDs are cached in memory.

Then I create a third RDD C which is created by multiple joins and maps
from A and B, also cache it and start a job on C. When I do this I still
see A and B completely cached and also see C slowly getting more and more
cached. This is all fine and good, but in the meanwhile I see stages
running on the UI that point to code which is used to load A and B. How is
this possible? Am I misunderstanding how cached RDDs should behave?

And again the general question - how can one debug such issues?
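[Editor's note: the observed re-running of A's and B's stages is consistent
with silent eviction followed by lineage recomputation. A toy model of that
mechanism -- memoized lazy values with parents, not Spark internals:]

```python
class LazyRDD:
    """Toy model of RDD lineage: compute() memoizes into `cache`; dropping a
    cache entry silently triggers recomputation of the lineage, which is one
    way stages for A and B can reappear while building C."""
    def __init__(self, name, compute_fn, parents=()):
        self.name, self.compute_fn, self.parents = name, compute_fn, parents
        self.cache = None

    def compute(self, log):
        if self.cache is not None:
            return self.cache
        log.append(self.name)                       # a stage actually runs
        inputs = [p.compute(log) for p in self.parents]
        self.cache = self.compute_fn(*inputs)
        return self.cache

log = []
a = LazyRDD("A", lambda: [1, 2, 3])
b = LazyRDD("B", lambda: [4, 5, 6])
a.compute(log); b.compute(log)      # jobs on A and B -> both cached
a.cache = None                      # eviction (e.g. LRU under memory pressure)
c = LazyRDD("C", lambda x, y: x + y, parents=(a, b))
c.compute(log)
print(log)                          # ['A', 'B', 'C', 'A'] -- A ran again
```

If something like this is happening, the storage panel can still show A as
"100% cached" at the time you look, because the recomputation re-caches it.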

4. Shuffle on disk
Is it true - I couldn't find it in official docs, but did see this
mentioned in various threads - that shuffle _always_ hits disk?
(Disregarding OS caches.) Why is this the case? Are you planning to add a
function to do shuffle in memory or are there some intrinsic reasons for
this to be impossible?
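[Editor's note: one intrinsic reason shuffles write to disk is that the
shuffle-write side must work within a bounded memory budget: when the buffer
fills, it spills to local files, and the read side merges the spills. A toy
spill-and-merge, assuming nothing about Spark's actual shuffle code:]

```python
import os
import pickle
import tempfile
from collections import defaultdict

def spill_and_merge(records, max_in_memory):
    """Toy shuffle: buffer (key, value) pairs, spill each full buffer to a
    temp file, then merge buffer and spills on the read side."""
    spills, buffer, count = [], defaultdict(list), 0
    for k, v in records:
        buffer[k].append(v)
        count += 1
        if count >= max_in_memory:          # memory budget exhausted: spill
            f = tempfile.NamedTemporaryFile(delete=False)
            pickle.dump(dict(buffer), f)
            f.close()
            spills.append(f.name)
            buffer, count = defaultdict(list), 0
    merged = defaultdict(list)              # the "shuffle read" / merge side
    parts = [dict(buffer)]
    for p in spills:
        with open(p, "rb") as fh:
            parts.append(pickle.load(fh))
        os.unlink(p)
    for part in parts:
        for k, vs in part.items():
            merged[k].extend(vs)
    return dict(merged)

data = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)]
result = spill_and_merge(data, max_in_memory=2)
print({k: sorted(v) for k, v in result.items()})  # {'a': [1, 3, 5], 'b': [2, 4]}
```

Writing the map output to local files also lets reducers be rescheduled and
re-fetch data without re-running the map side, which is harder if the output
lives only in executor memory.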


Sorry again for the giant mail, and thanks for any insights!

Andras


Re: Executing spark jobs with predefined Hadoop user

2014-04-10 Thread Adnan
Then the problem is not on the Spark side. You have three options; choose
any one of them:

1. Change permissions on /tmp/Iris folder from shell on NameNode with "hdfs
dfs -chmod" command.
2. Run your hadoop service with hdfs user.
3. Disable dfs.permissions in conf/hdfs-site.xml.
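[Editor's note: a client-side variant of option 2, sketched here on the
assumption that the cluster uses Hadoop's default SIMPLE authentication
(not Kerberos): set HADOOP_USER_NAME in the process that creates the
SparkContext, so HDFS attributes the writes to the hdfs user.]

```python
import os

# With SIMPLE auth, Hadoop takes the client user from HADOOP_USER_NAME if
# set; it must be set before the SparkContext / FileSystem is created.
# "hdfs" matches the file owner from this thread; adjust for your cluster.
os.environ["HADOOP_USER_NAME"] = "hdfs"

# sc = SparkContext(...)   # hypothetical: create the context only after this
print(os.environ["HADOOP_USER_NAME"])  # hdfs
```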

Regards,
Adnan


avito wrote
> Thanks Adnan for the quick answer. You are absolutely right.
> We are indeed using the entire HDFS URI. Just for the post I have removed
> the name node details.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executing-spark-jobs-with-predefined-Hadoop-user-tp4059p4063.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Executing spark jobs with predefined Hadoop user

2014-04-10 Thread Adnan
You need to use proper HDFS URI with saveAsTextFile.

For Example:

rdd.saveAsTextFile("hdfs://NameNode:Port/tmp/Iris/output.tmp")

Regards,
Adnan


Asaf Lahav wrote
> Hi,
> 
> We are using Spark with data files on HDFS. The files are stored as files
> for predefined hadoop user ("hdfs").
> 
> The folder is permitted with
> 
> · read, write and execute permission for the hdfs user
> 
> · execute and read permission for users in the group
> 
> · just read permission for all other users
> 
> 
> 
> Now the Spark write operation fails due to a user mismatch between the
> Spark context and the Hadoop user permission.
> 
> Is there a way to start the Spark Context with another user than the one
> configured on the local machine?
> 
> 
> 
> 
> 
> 
> 
> Please see the technical details below:
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> The permission on the hdfs folder "/tmp/Iris" is as follows:
> 
> drwxr-xr-x   - hdfs  hadoop  0 2014-04-10 14:12 /tmp/Iris
> 
> 
> 
> 
> 
> The Spark context is initiated on my local machine, and according to the
> configured HDFS permission "rwxr-xr-x" there is no problem loading the
> Hadoop HDFS file into an RDD:
> 
> final JavaRDD rdd = sparkContext.textFile(filePath);
> 
> 
> 
> But saving the resulting RDD back to Hadoop results in a Hadoop security
> exception:
> 
> rdd.saveAsTextFile("/tmp/Iris/output");
> 
> 
> 
> Then I receive the following Hadoop security exception:
> 
> org.apache.hadoop.security.AccessControlException:
> org.apache.hadoop.security.AccessControlException: *Permission denied:
> user=halbani, access=WRITE, inode="/tmp/Iris":hdfs:hadoop:drwxr-xr-x*
> 
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> 
>   at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> 
>   at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 
>   at
> java.lang.reflect.Constructor.newInstance(Constructor.java:525)
> 
>   at
> org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:95)
> 
>   at
> org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:57)
> 
>   at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:1428)
> 
>   at
> org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:332)
> 
>   at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1126)
> 
>   at
> org.apache.hadoop.mapred.FileOutputCommitter.setupJob(FileOutputCommitter.java:52)
> 
>   at
> org.apache.hadoop.mapred.SparkHadoopWriter.preSetup(SparkHadoopWriter.scala:65)
> 
>   at
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:713)
> 
>   at
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:686)
> 
>   at
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:572)
> 
>   at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:894)
> 
>   at
> org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:355)
> 
>   at
> org.apache.spark.api.java.JavaRDD.saveAsTextFile(JavaRDD.scala:27)
> 
>   at
> org.apache.spark.reader.FileSpliter.split(FileSpliter.java:73)
> 
>   at
> org.apache.spark.reader.FileReaderMain.main(FileReaderMain.java:17)
> 
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 
>   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> 
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 
>   at java.lang.reflect.Method.invoke(Method.java:601)
> 
>   at
> com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
> 
> Caused by: org.apache.hadoop.ipc.RemoteException:
> org.apache.hadoop.security.AccessControlException: Permission denied:
> user=halbani, access=WRITE, inode="/tmp/Iris":hdfs:hadoop:drwxr-xr-x
> 
>   at
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:225)
> 
>   at
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:205)
> 
>   at
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:151)
> 
>   at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:5951)
> 
>   at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:5924)
> 
>   at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:2628)
> 
>   at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:2593)
> 
>   at
> org.apache.hadoop.hdfs.server.namenode.NameNode.mkd

Executing spark jobs with predefined Hadoop user

2014-04-10 Thread Asaf Lahav
Hi,

We are using Spark with data files on HDFS. The files are stored as files
for predefined hadoop user ("hdfs").

The folder is permitted with

· read, write and execute permission for the hdfs user

· execute and read permission for users in the group

· just read permission for all other users



Now the Spark write operation fails due to a user mismatch between the
Spark context and the Hadoop user permission.

Is there a way to start the Spark Context with another user than the one
configured on the local machine?







Please see the technical details below:











The permission on the hdfs folder "/tmp/Iris" is as follows:

drwxr-xr-x   - hdfs  hadoop  0 2014-04-10 14:12 /tmp/Iris





The Spark context is initiated on my local machine, and according to the
configured HDFS permission "rwxr-xr-x" there is no problem loading the
Hadoop HDFS file into an RDD:

final JavaRDD rdd = sparkContext.textFile(filePath);



But saving the resulting RDD back to Hadoop results in a Hadoop security
exception:

rdd.saveAsTextFile("/tmp/Iris/output");



Then I receive the following Hadoop security exception:

org.apache.hadoop.security.AccessControlException:
org.apache.hadoop.security.AccessControlException: *Permission denied:
user=halbani, access=WRITE, inode="/tmp/Iris":hdfs:hadoop:drwxr-xr-x*

  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)

  at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)

  at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

  at java.lang.reflect.Constructor.newInstance(Constructor.java:525)

  at
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:95)

  at
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:57)

  at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:1428)

  at
org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:332)

  at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1126)

  at
org.apache.hadoop.mapred.FileOutputCommitter.setupJob(FileOutputCommitter.java:52)

  at
org.apache.hadoop.mapred.SparkHadoopWriter.preSetup(SparkHadoopWriter.scala:65)

  at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:713)

  at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:686)

  at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:572)

  at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:894)

  at
org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:355)

  at
org.apache.spark.api.java.JavaRDD.saveAsTextFile(JavaRDD.scala:27)

  at org.apache.spark.reader.FileSpliter.split(FileSpliter.java:73)

  at
org.apache.spark.reader.FileReaderMain.main(FileReaderMain.java:17)

  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

  at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

  at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

  at java.lang.reflect.Method.invoke(Method.java:601)

  at
com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

Caused by: org.apache.hadoop.ipc.RemoteException:
org.apache.hadoop.security.AccessControlException: Permission denied:
user=halbani, access=WRITE, inode="/tmp/Iris":hdfs:hadoop:drwxr-xr-x

  at
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:225)

  at
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:205)

  at
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:151)

  at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:5951)

  at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:5924)

  at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:2628)

  at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:2593)

  at
org.apache.hadoop.hdfs.server.namenode.NameNode.mkdirs(NameNode.java:927)

  at sun.reflect.GeneratedMethodAccessor132.invoke(Unknown Source)

  at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

  at java.lang.reflect.Method.invoke(Method.java:606)

  at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:587)

  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1444)

  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1440)

  at java

Re: How does Spark handle RDD via HDFS ?

2014-04-10 Thread gtanguy
Yes, that helps me better understand how Spark works. But that was also what
I was afraid of: I think the network communication will take too much time
for my job.

I will continue to look for a trick to avoid network communication.

I saw on the hadoop website that : "To minimize global bandwidth consumption
and read latency, HDFS tries to satisfy a read request from a replica that
is closest to the reader. If there exists a replica on the same rack as the
reader node, then that replica is preferred to satisfy the read request"

Maybe if I manage to combine part of Spark with some of this, it could
work.

Thank you very much for your answer.

Germain.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-does-Spark-handle-RDD-via-HDFS-tp4003p4058.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Pig on Spark

2014-04-10 Thread Konstantin Kudryavtsev
Hi Mayur,

I wondered if you could share your findings in some way (github, blog post,
etc). I guess your experience will be very interesting/useful for many
people

sent from Lenovo YogaTablet
On Apr 8, 2014 8:48 PM, "Mayur Rustagi"  wrote:

> Hi Ankit,
> Thanx for all the work on Pig.
> Finally got it working. Couple of high level bugs right now:
>
>- Getting it working on Spark 0.9.0
>- Getting UDF working
>- Getting generate functionality working
>- Exhaustive test suite on Spark on Pig
>
> are you maintaining a Jira somewhere?
>
> I am currently trying to deploy it on 0.9.0.
>
> Regards
> Mayur
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Fri, Mar 14, 2014 at 1:37 PM, Aniket Mokashi wrote:
>
>> We will post fixes from our side at - https://github.com/twitter/pig.
>>
>> Top on our list are-
>> 1. Make it work with pig-trunk (execution engine interface) (with 0.8 or
>> 0.9 spark).
>> 2. Support for algebraic udfs (this mitigates the group by oom problems).
>>
>> Would definitely love more contribution on this.
>>
>> Thanks,
>> Aniket
>>
>>
>> On Fri, Mar 14, 2014 at 12:29 PM, Mayur Rustagi 
>> wrote:
>>
>>> Dam I am off to NY for Structure Conf. Would it be possible to meet
>>> anytime after 28th March?
>>> I am really interested in making it stable & production quality.
>>>
>>> Regards
>>> Mayur Rustagi
>>> Ph: +1 (760) 203 3257
>>> http://www.sigmoidanalytics.com
>>> @mayur_rustagi 
>>>
>>>
>>>
>>> On Fri, Mar 14, 2014 at 11:53 AM, Julien Le Dem wrote:
>>>
 Hi Mayur,
 Are you going to the Pig meetup this afternoon?
 http://www.meetup.com/PigUser/events/160604192/
 Aniket and I will be there.
 We would be happy to chat about Pig-on-Spark



 On Tue, Mar 11, 2014 at 8:56 AM, Mayur Rustagi >>> > wrote:

> Hi Lin,
> We are working on getting Pig on spark functional with 0.8.0, have you
> got it working on any spark version ?
> Also what all functionality works on it?
> Regards
> Mayur
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Mon, Mar 10, 2014 at 11:00 PM, Xiangrui Meng wrote:
>
>> Hi Sameer,
>>
>> Lin (cc'ed) could also give you some updates about Pig on Spark
>> development on her side.
>>
>> Best,
>> Xiangrui
>>
>> On Mon, Mar 10, 2014 at 12:52 PM, Sameer Tilak 
>> wrote:
>> > Hi Mayur,
>> > We are planning to upgrade our distribution MR1> MR2 (YARN) and the
>> goal is
>> > to get SPROK set up next month. I will keep you posted. Can you
>> please keep
>> > me informed about your progress as well.
>> >
>> > 
>> > From: mayur.rust...@gmail.com
>> > Date: Mon, 10 Mar 2014 11:47:56 -0700
>> >
>> > Subject: Re: Pig on Spark
>> > To: user@spark.apache.org
>> >
>> >
>> > Hi Sameer,
>> > Did you make any progress on this. My team is also trying it out
>> would love
>> > to know some detail so progress.
>> >
>> > Mayur Rustagi
>> > Ph: +1 (760) 203 3257
>> > http://www.sigmoidanalytics.com
>> > @mayur_rustagi
>> >
>> >
>> >
>> > On Thu, Mar 6, 2014 at 2:20 PM, Sameer Tilak 
>> wrote:
>> >
>> > Hi Aniket,
>> > Many thanks! I will check this out.
>> >
>> > 
>> > Date: Thu, 6 Mar 2014 13:46:50 -0800
>> > Subject: Re: Pig on Spark
>> > From: aniket...@gmail.com
>> > To: user@spark.apache.org; tgraves...@yahoo.com
>> >
>> >
>> > There is some work to make this work on yarn at
>> > https://github.com/aniket486/pig. (So, compile pig with ant
>> > -Dhadoopversion=23)
>> >
>> > You can look at
>> https://github.com/aniket486/pig/blob/spork/pig-spark to
>> > find out what sort of env variables you need (sorry, I haven't been
>> able to
>> > clean this up- in-progress). There are few known issues with this,
>> I will
>> > work on fixing them soon.
>> >
>> > Known issues-
>> > 1. Limit does not work (spork-fix)
>> > 2. Foreach requires to turn off schema-tuple-backend (should be a
>> pig-jira)
>> > 3. Algebraic udfs dont work (spork-fix in-progress)
>> > 4. Group by rework (to avoid OOMs)
>> > 5. UDF Classloader issue (requires SPARK-1053, then you can put
>> > pig-withouthadoop.jar as SPARK_JARS in SparkContext along with udf
>> jars)
>> >
>> > ~Aniket
>> >
>> >
>> >
>> >
>> > On Thu, Mar 6, 2014 at 1:36 PM, Tom Graves 
>> wrote:
>> >
>> > I had asked a similar question on the dev mailing list a while back
>> (Jan
>> > 22nd).
>> >
>> > See the archives:
>> >
>> http://mail-arc

Re: is it possible to initiate Spark jobs from Oozie?

2014-04-10 Thread Konstantin Kudryavtsev
I believe you need to write a custom action or use the Java action.
On Apr 10, 2014 12:11 AM, "Segerlind, Nathan L" <
nathan.l.segerl...@intel.com> wrote:

>  Howdy.
>
>
>
> Is it possible to initiate Spark jobs from Oozie (presumably as a java
> action)? If so, are there known limitations to this?  And would anybody
> have a pointer to an example?
>
>
>
> Thanks,
>
> Nate
>
>
>


Re: Shark CDH5 Final Release

2014-04-10 Thread chutium
hi, you can take a look here:

http://www.abcn.net/2014/04/install-shark-on-cdh5-hadoop2-spark.html



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shark-CDH5-Final-Release-tp3826p4055.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Where does println output go?

2014-04-10 Thread wxhsdp
rdd.foreach(p => {
 print(p)
})

The above closure gets executed on workers, you need to look at the logs of
the workers to see the output.

But if I'm in local mode, where are the logs of the local driver? There are
no /logs and /work dirs under SPARK_HOME like the ones set up in standalone
mode.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Where-does-println-output-go-tp2202p4054.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.