Re: Testing spark applications

2015-05-21 Thread Josh Rosen
I think that @holdenk's *spark-testing-base* project publishes some of
these test classes as well as some helper classes for testing streaming
jobs: https://github.com/holdenk/spark-testing-base

On Thu, May 21, 2015 at 10:39 PM, Reynold Xin  wrote:

> It is just 15 lines of code to copy, isn't it?
>
> On Thu, May 21, 2015 at 7:46 PM, Nathan Kronenfeld <
> nkronenfeld@uncharted.software> wrote:
>
>> see discussions about Spark not really liking multiple contexts in the
>>> same JVM
>>>
>>
>> Speaking of this - is there a standard way of writing unit tests that
>> require a SparkContext?
>>
>> We've ended up copying out the code of SharedSparkContext to our own
>> testing hierarchy, but it occurs to me someone would have published a test
>> jar by now if that was the best way.
>>
>>   -Nathan
>>
>>
>
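For reference, the "15 lines of code" in question amount to something like
the following minimal sketch, modeled on Spark's own SharedSparkContext
test trait (ScalaTest assumed; illustrative rather than the exact code in
the Spark repo):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.scalatest.{BeforeAndAfterAll, Suite}

    // Shares one SparkContext across all tests in a suite, sidestepping
    // Spark's dislike of multiple contexts in the same JVM.
    trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
      @transient private var _sc: SparkContext = _
      def sc: SparkContext = _sc

      override def beforeAll() {
        val conf = new SparkConf().setMaster("local[2]").setAppName("test")
        _sc = new SparkContext(conf)
        super.beforeAll()
      }

      override def afterAll() {
        if (_sc != null) {
          _sc.stop() // stop the context so the next suite can create one
          _sc = null
        }
        super.afterAll()
      }
    }

A test suite then mixes in the trait and uses `sc` directly.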


Re: Testing spark applications

2015-05-21 Thread Reynold Xin
It is just 15 lines of code to copy, isn't it?

On Thu, May 21, 2015 at 7:46 PM, Nathan Kronenfeld <
nkronenfeld@uncharted.software> wrote:

> see discussions about Spark not really liking multiple contexts in the
>> same JVM
>>
>
> Speaking of this - is there a standard way of writing unit tests that
> require a SparkContext?
>
> We've ended up copying out the code of SharedSparkContext to our own
> testing hierarchy, but it occurs to me someone would have published a test
> jar by now if that was the best way.
>
>   -Nathan
>
>


Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-05-21 Thread Dibyendu Bhattacharya
Hi Tathagata,

Thanks for looking into this. Investigating further, I found that the issue
is that Tachyon does not support file append. When the streaming receiver
that writes to the WAL fails and is restarted, it cannot append to the same
WAL file after the restart.

I raised this with the Tachyon user group, and Haoyuan said that Tachyon
file append will be ready within three months. I will revisit this issue
then.

Regards,
Dibyendu
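(For readers reproducing this: the WAL under test is enabled purely by
configuration. A minimal sketch follows, where the Tachyon URI is
illustrative, mirroring the logs quoted below, and the flag is the one
introduced in Spark 1.2:)

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("wal-on-tachyon")
      // Receivers write received blocks to a write ahead log under the
      // checkpoint directory when this flag is set
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(10))
    // Illustrative checkpoint URI; the WAL lands under receivedData/
    // inside this directory
    ssc.checkpoint("tachyon-ft://10.252.5.113:19998/tachyon/checkpoint")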


On Fri, May 22, 2015 at 12:24 AM, Tathagata Das  wrote:

> Looks like somehow the file size reported by the FSInputDStream of
> Tachyon's FileSystem interface is returning zero.
>
> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Just to follow up on this thread further:
>>
>> I was doing some fault-tolerance testing of Spark Streaming with Tachyon
>> as the OFF_HEAP block store. As I said in an earlier email, I was able to
>> solve the BlockNotFound exception when I used Tachyon's Hierarchical
>> Storage, which is good.
>>
>> I continued testing with the Spark Streaming WAL and checkpoint files
>> also stored in Tachyon. Here are a few findings:
>>
>>
>> When I store the Spark Streaming checkpoint location in Tachyon, the
>> throughput is much higher. I tested the driver and receiver failure cases,
>> and Spark Streaming is able to recover without any data loss on driver
>> failure.
>>
>> *But on receiver failure, Spark Streaming loses data*, as I see an
>> exception while reading the WAL file from the Tachyon "receivedData"
>> location for the receiver id that just failed.
>>
>> If I change the checkpoint location back to HDFS, Spark Streaming can
>> recover from both driver and receiver failure.
>>
>> Here are the log details from when the Spark Streaming receiver failed.
>> I raised a JIRA for the same issue:
>> https://issues.apache.org/jira/browse/SPARK-7525
>>
>>
>>
>> INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
>> (epoch 1)*
>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
>> remove executor 2 from BlockManagerMaster.
>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
>> block manager BlockManagerId(2, 10.252.5.54, 45789)
>> INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
>> successfully in removeExecutor
>> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
>> receiver for stream 2 from 10.252.5.62*:47255
>> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage
>> 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could
>> not read data from write ahead log record
>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
>> )*
>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>> at
>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>> at
>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>> at scala.Option.getOrElse(Option.scala:120)
>> at
>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:744)
>> Caused by: java.lang.IllegalArgumentException:* Seek position is past
>> EOF: 645603894, fileSize = 0*
>> at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>> at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>> at
>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>> at
>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>> ... 15 more
>>
>> INFO : org.a

Re: Change for submitting to yarn in 1.3.1

2015-05-21 Thread Koert Kuipers
we also launch jobs programmatically, both in standalone mode and
yarn-client mode. in standalone mode it always worked; in yarn-client mode
we ran into some issues and were forced to use spark-submit, but i still
have it on my todo list to move back to a normal java launch without
spark-submit at some point.

for me spark is a library that i use to do distributed computations within
my app, and ideally a library should not tell me how to launch my app. i
mean, if multiple libraries that i use all had their own launch script i
would get stuck very quickly... hadoop jar vs spark-submit vs kiji launch
vs hbase jar... bad idea, i think!

however i do understand the practical reasons why spark-submit came about...
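(For illustration, a minimal sketch of the kind of direct launch described
above: building the SparkContext in-process instead of shelling out to
spark-submit. It assumes the YARN/Hadoop config is on the classpath; the
jar path and app name are hypothetical:)

    import org.apache.spark.{SparkConf, SparkContext}

    // Embed Spark as a library: construct the context directly.
    // "yarn-client" was the 1.x master string for yarn-client mode.
    val conf = new SparkConf()
      .setMaster("yarn-client")
      .setAppName("embedded-app")
      .setJars(Seq("/path/to/my-app.jar")) // hypothetical application jar
    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 100).sum())
    sc.stop()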


On Thu, May 21, 2015 at 10:30 PM, Nathan Kronenfeld <
nkronenfeld@uncharted.software> wrote:

>
>
>> In researching and discussing these issues with Cloudera and others,
>> we've been told that only one mechanism is supported for starting Spark
>> jobs: the *spark-submit* scripts.
>>
>
> Is this new? We've been submitting jobs directly from a programmatically
> created spark context (instead of through spark-submit) from the beginning
> (from 0.7.x to 1.2) - to a local cluster.
>
> In moving to 1.3 on Yarn cluster recently, we've had no end of problems
> trying to switch this over (though I think we're almost there).
>
> Why would one want to eliminate this possibility?
>
>  -Nathan
>
>


Testing spark applications

2015-05-21 Thread Nathan Kronenfeld
>
> see discussions about Spark not really liking multiple contexts in the
> same JVM
>

Speaking of this - is there a standard way of writing unit tests that
require a SparkContext?

We've ended up copying out the code of SharedSparkContext to our own
testing hierarchy, but it occurs to me someone would have published a test
jar by now if that was the best way.

  -Nathan


Re: Change for submitting to yarn in 1.3.1

2015-05-21 Thread Nathan Kronenfeld
Thanks, Marcelo


> Instantiating SparkContext directly works. Well, sorta: it has
> limitations. For example, see discussions about Spark not really liking
> multiple contexts in the same JVM. It also does not work in "cluster"
> deploy mode.
>
That's fine - when one is doing something non-standard, one expects a bit
of Caveat Emptor.


Re: Change for submitting to yarn in 1.3.1

2015-05-21 Thread Marcelo Vanzin
Hi Nathan,

On Thu, May 21, 2015 at 7:30 PM, Nathan Kronenfeld <
nkronenfeld@uncharted.software> wrote:

>
>
>> In researching and discussing these issues with Cloudera and others,
>> we've been told that only one mechanism is supported for starting Spark
>> jobs: the *spark-submit* scripts.
>>
>
> Is this new? We've been submitting jobs directly from a programmatically
> created spark context (instead of through spark-submit) from the beginning
> (from 0.7.x to 1.2) - to a local cluster.
>
> In moving to 1.3 on Yarn cluster recently, we've had no end of problems
> trying to switch this over (though I think we're almost there).
>
>
Instantiating SparkContext directly works. Well, sorta: it has limitations.
For example, see discussions about Spark not really liking multiple
contexts in the same JVM. It also does not work in "cluster" deploy mode.

-- 
Marcelo


Re: Change for submitting to yarn in 1.3.1

2015-05-21 Thread Nathan Kronenfeld
> In researching and discussing these issues with Cloudera and others, we've
> been told that only one mechanism is supported for starting Spark jobs: the
> *spark-submit* scripts.
>

Is this new? We've been submitting jobs directly from a programmatically
created spark context (instead of through spark-submit) from the beginning
(from 0.7.x to 1.2) - to a local cluster.

In moving to 1.3 on Yarn cluster recently, we've had no end of problems
trying to switch this over (though I think we're almost there).

Why would one want to eliminate this possibility?

 -Nathan


Re: Re: Low throughput and effect of GC in SparkSql GROUP BY

2015-05-21 Thread Pramod Biligiri
Hi Zhang,
No, my data is not compressed. I'm trying to minimize the load on the CPU.
The GC time went down for me after enabling codegen.

Pramod
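(For readers wanting to reproduce: in Spark 1.x, codegen is controlled by
the spark.sql.codegen flag. A minimal sketch, assuming an existing
SparkContext `sc` and an already-registered table named `adclick`:)

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    // Enable expression code generation (off by default in Spark 1.x)
    sqlContext.setConf("spark.sql.codegen", "true")
    val top = sqlContext.sql(
      "SELECT ip, COUNT(*) AS c FROM adclick GROUP BY ip ORDER BY c LIMIT 100")
    top.collect().foreach(println)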

On Thu, May 21, 2015 at 3:43 AM, zhangxiongfei 
wrote:

> Hi Pramod
>
>  Is your data compressed? I encountered a similar problem; however, after
> turning codegen on, the GC time was still very long. The input data for my
> map task is about a 100M lzo file.
> My query is "select ip, count(*) as c from stage_bitauto_adclick_d group
> by ip sort by c limit 100"
>
> Thanks
> Zhang Xiongfei
>
>
>
> At 2015-05-21 12:18:35, "Reynold Xin"  wrote:
>
> Does this turn codegen on? I think the performance is fairly different
> when codegen is turned on.
>
> For 1.5, we are investigating having codegen on by default, so users get
> much better performance out of the box.
>
>
On Wed, May 20, 2015 at 5:24 PM, Pramod Biligiri wrote:
>
>> Hi,
>> Somewhat similar to Daniel Mescheder's mail yesterday on SparkSql, I have
>> a data point regarding the performance of Group By, indicating there's
>> excessive GC and it's impacting the throughput. I want to know if the new
>> memory manager for aggregations (
>> https://github.com/apache/spark/pull/5725/) is going to address this
>> kind of issue.
>>
>> I only have a small amount of data on each node (~360MB) with a large
>> heap size (18 Gig). I still see 2-3 minor collections happening whenever I
>> do a Select Sum() with a group by(). I have tried with different sizes for
>> Young Generation without much effect, though not with different GC
>> algorithms (Hm..I ought to try reducing the rdd storage fraction perhaps).
>>
>> I have made a chart of my results [1] by adding timing code to
>> Aggregates.scala. The query is actually Query 2 from Berkeley's AmpLab
>> benchmark, running over 10 million records. The chart is from one of the 4
>> worker nodes in the cluster.
>>
>> I am trying to square this with a claim on the Project Tungsten blog post
>> [2]: "When profiling Spark user applications, we’ve found that a large
>> fraction of the CPU time is spent waiting for data to be fetched from main
>> memory. "
>>
>> Am I correct in assuming that SparkSql is yet to reach that level of
>> efficiency, at least in aggregation operations?
>>
>> Thanks.
>>
>> [1] -
>> https://docs.google.com/spreadsheets/d/1HSqYfic3n5s9i4Wsi1Qg0FKN_AWz2vV7_6RRMrtzplQ/edit#gid=481134174
>> [2]
>> https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html
>>
>> Pramod
>>
>
>
>
>


Re: Change for submitting to yarn in 1.3.1

2015-05-21 Thread Marcelo Vanzin
Hi Kevin,

I read through your e-mail and I see two main things you're talking about.

- You want a public YARN "Client" class and don't really care about
anything else.

In your message you already mention why that's not a good idea. It's much
better to have a standardized submission API. As you noticed by working
with the previous Client API, it's not for the faint of heart; SparkSubmit
hides a lot of the complexity, and does so in a way that is transparent to
the caller. Whether you're submitting a Scala, Python, or R app against
standalone, yarn, mesos, or local, the interface is the same.

You may argue that for your particular use case you don't care about
anything other than Scala apps on YARN cluster mode, but Spark does need to
care about more than that.

I still think that once we have a way to expose more information about the
application being launched (more specifically, the app id), then doing
anything else you may want to do that is specific to YARN is up to you and
pretty easy to do. But I strongly believe that having different ways to
launch apps in Spark is not good design.


- You have some restriction that your app servers cannot fork processes

Honestly, I didn't really understand what that is about. Why can't you fork
processes? Is it a restriction regarding what you can deploy on the server
(e.g. you cannot have a full Spark installation, everything needs to be
contained in a jar that is deployed in the app server)?

I really don't believe this is about the inability to fork a process, so it
must be something else.

The unfortunate reality is that Spark is really not friendly to multiple
things being launched from the same JVM. Doing that is prone to apps
running all over each other and overwriting configs and other things, which
would lead to many, many tears. Once the limitations around that are fixed,
then we can study adding a way to launch multiple Spark apps from the same
JVM, but right now that's just asking for (hard to debug) trouble.

It might be possible to add support for launching subprocesses without
having to invoke the shell scripts; that would have limitations (e.g. no
"spark-env.sh" support). In fact I did something like that in the first
implementation of the launcher library, but was asked to go through the
shell scripts during code review. (I even had a different method that
launched in the same VM, but that one suffered from all the problems
described in the paragraph above.)
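(For concreteness, a minimal sketch of the launcher-library approach
described above, using the SparkLauncher API added for Spark 1.4 in
SPARK-4924; all paths and class names are hypothetical:)

    import org.apache.spark.launcher.SparkLauncher

    // Launches the app as a child process via the spark-submit machinery;
    // the returned java.lang.Process can be monitored for its exit code.
    val process = new SparkLauncher()
      .setSparkHome("/path/to/spark")        // hypothetical install dir
      .setAppResource("/path/to/my-app.jar") // hypothetical app jar
      .setMainClass("com.example.MyApp")     // hypothetical main class
      .setMaster("yarn-cluster")
      .setConf(SparkLauncher.EXECUTOR_MEMORY, "2g")
      .launch()
    val exitCode = process.waitFor()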


On Thu, May 21, 2015 at 5:21 PM, Kevin Markey 
wrote:

>  This is an excellent discussion.  As mentioned in an earlier email, we
> agree with a number of Chester's suggestions, but we have yet other
> concerns.  I've researched this further in the past several days, and I've
> queried my team.  This email attempts to capture those other concerns.
>
> Making *yarn.Client* private has prevented us from moving from Spark
> 1.0.x to Spark 1.2 or 1.3 despite many alluring new features. The
> SparkLauncher, which provides “support for programmatically running Spark
> jobs” (SPARK-3733 and SPARK-4924) will not work in our environment or for
> our use case -- which requires programmatically initiating and monitoring
> Spark jobs on Yarn in cluster mode *from a cloud-based application server*.
>
>
> It is not just that the Yarn *ApplicationId* is no longer directly or
> indirectly available. More critically, it violates constraints imposed by
> any application server and additional constraints imposed by security,
> process, and dynamic resource allocation requirements in our cloud services
> environment.
>
> In Spark 1.0 and 1.1, with *yarn.Client* *public*, our applications' *job
> scheduler* marshalls configuration and environmental resources necessary
> for any Spark job, including cluster-, data- or job-specific parameters,
> makes the appropriate calls to initialize and run *yarn.Client*, which
> together with the other classes in the spark-yarn module requests the Yarn
> resource manager to start and monitor a job (see Figure 1) on the cluster.
> (Our job scheduler is not a Yarn replacement; it leverages Yarn to coordinate
> a variety of different Spark analytic and data enrichment jobs.)
>
> More recent Spark versions make *yarn.Client* *private* and thus remove
> that capability, but the *SparkLauncher*, scheduled for Spark 1.4,
> replaces this simple programmatic solution with one considerably more
> complicated. Based on our understanding, in this scenario, our job
> scheduler marshalls configuration and environmental resources for the
> *SparkLauncher* much as it did for *yarn.Client*. It then calls *launch()*
> to initialize
> a new Linux process to execute the *spark-submit *shell script with the
> specified configuration and environment, which in turn starts a new JVM
> (with the Spark assembly jar in its class path) that executes
> *launcher.Main*. This ultimately calls *yarn.Client* (see Figure 2). This
> is more than an
> arm's-length transaction. There are three legs: job scheduler 
> *S

RE: Using CUDA within Spark / boosting linear algebra

2015-05-21 Thread Ulanov, Alexander
Hi,

There is a major update on the benchmarks. I've performed them on newer
hardware with 2 CPUs and 3 GPUs. The latter can be used by NVBLAS to
parallelize matrix-matrix multiplication. Results are in the same
spreadsheet as previously:
https://docs.google.com/spreadsheets/d/1lWdVSuSragOobb0A_oeouQgHUMx378T9J5r7kwKSPkY/edit?usp=sharing
Previous results are on a separate sheet of the same document. I have also
created a github page with the benchmark source code and a few explanatory
comments:
https://github.com/avulanov/scala-blas

I was able to use all 3 GPUs for NVBLAS but BIDMat used only one GPU. 

John, could you suggest how to force BIDMat to use all GPUs? Also, could you 
suggest how to test Double matrices multiplication in BIDMat-cuda (in GPU and 
with copy from/to main memory)?

Best regards, Alexander


-Original Message-
From: Ulanov, Alexander 
Sent: Wednesday, April 01, 2015 12:11 PM
To: Xiangrui Meng; Sean Owen
Cc: Evan R. Sparks; Sam Halliday; dev@spark.apache.org; jfcanny
Subject: RE: Using CUDA within Spark / boosting linear algebra

FYI, I've added instructions to Netlib-java wiki, Sam added the link to them 
from the project's readme.md https://github.com/fommil/netlib-java/wiki/NVBLAS

Best regards, Alexander
-Original Message-
From: Xiangrui Meng [mailto:men...@gmail.com]
Sent: Monday, March 30, 2015 2:43 PM
To: Sean Owen
Cc: Evan R. Sparks; Sam Halliday; dev@spark.apache.org; Ulanov, Alexander; 
jfcanny
Subject: Re: Using CUDA within Spark / boosting linear algebra

Hi Alex,

Since it is non-trivial to make nvblas work with netlib-java, it would be great 
if you can send the instructions to netlib-java as part of the README. 
Hopefully we don't need to modify netlib-java code to use nvblas.

Best,
Xiangrui

On Thu, Mar 26, 2015 at 9:54 AM, Sean Owen  wrote:
> The license issue is with libgfortran, rather than OpenBLAS.
>
> (FWIW I am going through the motions to get OpenBLAS set up by default 
> on CDH in the near future, and the hard part is just handling
> libgfortran.)
>
> On Thu, Mar 26, 2015 at 4:07 PM, Evan R. Sparks  wrote:
>> Alright Sam - you are the expert here. If the GPL issues are 
>> unavoidable, that's fine - what is the exact bit of code that is GPL?
>>
>> The suggestion to use OpenBLAS is not to say it's the best option, 
>> but that it's a *free, reasonable default* for many users - keep in 
>> mind the most common deployment for Spark/MLlib is on 64-bit linux on EC2[1].
>> Additionally, for many of the problems we're targeting, this 
>> reasonable default can provide a 1-2 orders of magnitude improvement 
>> in performance over the f2jblas implementation that netlib-java falls back 
>> on.
>
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For 
> additional commands, e-mail: dev-h...@spark.apache.org
>


Re: Change for submitting to yarn in 1.3.1

2015-05-21 Thread Kevin Markey

  
  


This is an excellent discussion. As mentioned in an earlier email, we agree
with a number of Chester's suggestions, but we have yet other concerns.
I've researched this further in the past several days, and I've queried my
team. This email attempts to capture those other concerns.

Making yarn.Client private has prevented us from moving from Spark 1.0.x to
Spark 1.2 or 1.3 despite many alluring new features. The SparkLauncher,
which provides “support for programmatically running Spark jobs”
(SPARK-3733 and SPARK-4924), will not work in our environment or for our
use case -- which requires programmatically initiating and monitoring Spark
jobs on Yarn in cluster mode from a cloud-based application server.

It is not just that the Yarn ApplicationId is no longer directly or
indirectly available. More critically, it violates constraints imposed by
any application server and additional constraints imposed by security,
process, and dynamic resource allocation requirements in our cloud services
environment.

In Spark 1.0 and 1.1, with yarn.Client public, our applications' job
scheduler marshalls configuration and environmental resources necessary for
any Spark job, including cluster-, data- or job-specific parameters, makes
the appropriate calls to initialize and run yarn.Client, which together
with the other classes in the spark-yarn module requests the Yarn resource
manager to start and monitor a job (see Figure 1) on the cluster. (Our job
scheduler is not a Yarn replacement; it leverages Yarn to coordinate a
variety of different Spark analytic and data enrichment jobs.)

More recent Spark versions make yarn.Client private and thus remove that
capability, but the SparkLauncher, scheduled for Spark 1.4, replaces this
simple programmatic solution with one considerably more complicated. Based
on our understanding, in this scenario, our job scheduler marshalls
configuration and environmental resources for the SparkLauncher much as it
did for yarn.Client. It then calls launch() to initialize a new Linux
process to execute the spark-submit shell script with the specified
configuration and environment, which in turn starts a new JVM (with the
Spark assembly jar in its class path) that executes launcher.Main. This
ultimately calls yarn.Client (see Figure 2). This is more than an
arm's-length transaction. There are three legs: job scheduler
SparkLauncher.launch() call → spark-submit bash execution → launcher.Main
call to yarn.Client → Yarn resource manager allocation and execution of job
driver and executors.

Not only is this scenario unnecessarily complicated, it will simply not
work. The “programmatic” call to SparkLauncher.launch() starts a new JVM,
which is not allowed in any application server, which must own all its
JVMs. Perhaps spark-submit and the launcher.Main JVM process could be
hosted outside the application server, but in violation of security and
multiple-tenant cloud architectural constraints.

We appreciate that yarn.Client was perhaps never intended to be public.
Configuring it is not for the faint-of-heart, and some of its methods
should indeed be private. We wonder whether there is another option.

In researching and discussing these issues with Cloudera and others, we've
been told that only one mechanism is supported for starting Spark jobs: the
spark-submit scripts. We also have gathered (perhaps mistakenly) from
discussions reaching back 20 months that Spark's intention is to have a
unified job submission interface for all supported platforms. Unfortunately
this doesn't recognize the asymmetries among those platforms. Submitting a
local Spark job or a job to a Spark master in cluster mode may indeed
require initializing a separate process in order to pass configuration
parameters via the environment and command line. But Spark's yarn.Client in
cluster mode already has an arm's-length relationship with the Yarn
resource manager. Configuration may be passed from the job scheduling
application to yarn.Client as Strings or property map variables and method
parameters.

Our request is for a public yarn.Client or some reasonable facsimile.

Thanks.






On 05/13/2015 08:22 PM, Patrick Wendell wrote:

Hey Chester,

Thanks for sending this. It's very helpful to have this list.

The reason we made the Client API private was that it was never
intended to be used by t

Re: Contribute code to MLlib

2015-05-21 Thread Trevor Grant
Thank you Ram and Joseph.

I am also hoping to contribute to MLlib once my Scala gets up to snuff;
this is the guidance I needed for how to proceed when ready.

Best wishes,
Trevor



On Wed, May 20, 2015 at 1:55 PM, Joseph Bradley 
wrote:

> Hi Trevor,
>
> I may be repeating what Ram said, but to 2nd it, a few points:
>
> We do want MLlib to become an extensive and rich ML library; as you said,
> scikit-learn is a great example.  To make that happen, we of course need to
> include important algorithms.  "Important" is hazy, but roughly means being
> useful to a large number of users, improving a large number of use cases
> (above what is currently available), and being well-established and tested.
>
> Others and I may not be familiar with Tarek's algorithm (since it is so
> new), so it will be important to discuss details on JIRA to establish the
> cases in which the algorithm improves over current PCA.  That may require
> discussion, community testing, etc.  If we establish that it is a clear
> improvement in a large domain, then it could be valuable to have in MLlib
> proper.  It's always going to be hard to tell where to draw the line, so
> less common algorithms will require more testing before we commit to
> including them in MLlib.
>
> I like the Spark package suggestion since it would allow users immediately
> start using the code, while the discussion on JIRA happens.  (Plus, if
> package users find it useful, they can report that on the JIRA.)
>
> Joseph
>
> On Wed, May 20, 2015 at 10:01 AM, Ram Sriharsha 
> wrote:
>
>> Hi Trevor
>>
>> I'm attaching the MLlib contribution guideline here:
>>
>> https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-MLlib-specificContributionGuidelines
>>
>> It speaks to widely known and accepted algorithms but not to whether an
>> algorithm has to be better than another in every scenario etc
>>
>> I think the guideline explains what a good contribution to the core
>> library should look like better than I initially attempted to !
>>
>> Sent from my iPhone
>>
>> On May 20, 2015, at 9:31 AM, Ram Sriharsha 
>> wrote:
>>
>> Hi Trevor
>>
>> Good point, I didn't mean that some algorithm has to be clearly better
>> than another in every scenario to be included in MLlib. However, even if
>> someone is willing to be the maintainer of a piece of code, it does not
>> make sense to accept every possible algorithm into the core library.
>>
>> That said, the specific algorithms should be discussed in the JIRA: as
>> you point out, there is no clear way to decide what algorithm to include
>> and what not to, and usually mature algorithms that serve a wide variety of
>> scenarios are easier to argue about but nothing prevents anyone from
>> opening a ticket to discuss any specific machine learning algorithm.
>>
>> My suggestion was simply that for purposes of making experimental or
>> newer algorithms available to Spark users, it doesn't necessarily have to
>> be in the core library. Spark packages are good enough in this respect.
>>
>> Isn't it better for newer algorithms to take this route and prove
>> themselves before we bring them into the core library? Especially given
>> that the barrier to using spark packages is very low.
>>
>> Ram
>>
>>
>>
>> On Wed, May 20, 2015 at 9:05 AM, Trevor Grant 
>> wrote:
>>
>>> Hey Ram,
>>>
>>> I'm not speaking to Tarek's package specifically but to the spirit of
>>> MLlib.  There are a number of methods/algorithms for PCA; I'm not sure by
>>> what criterion the current one is considered 'standard'.
>>>
>>> It is rare to find ANY machine learning algo that is 'clearly better'
>>> than any other.  They are all tools, they have their place and time.  I
>>> agree that it makes sense to field new algorithms as packages and then
>>> integrate into MLlib once they are 'proven' (in terms of
>>> stability/performance/anyone cares).  That being said, if MLlib takes the
>>> stance that 'what we have is good enough unless something is *clearly*
>>> better', then it will never grow into a suite with the depth and richness
>>> of sklearn. From a practitioner's standpoint, it's nice to have everything
>>> I could ever want ready in an 'off-the-shelf' form.
>>>
>>> 'A large number of use cases better than existing' shouldn't be a
>>> criterion when selecting what to include in MLlib.  The important question
>>> should be, 'Are you willing to take on responsibility for maintaining this
>>> because you may be the only person on earth who understands the mechanics
>>> AND how to code it?'.   Obviously we don't want any random junk algo
>>> included.  But trying to say, 'this way of doing PCA is better than that
>>> way in a large class of cases' is like trying to say 'geometry is more
>>> important than calculus in a large class of cases' - maybe it's true, but
>>> geometry won't help you if you are in a case where you need calculus.
>>>
>>> This all relies on the assumption that MLlib is destined to be a rich
>>> data science/machine learn

Re: Adding/Using More Resolution Types on JIRA

2015-05-21 Thread Sean Owen
On Thu, May 21, 2015 at 10:03 PM, Santiago Mola  wrote:
> Sure. That is why I was talking about the Inactive resolution specifically.
> The combination of Priority + other statuses is enough to solve these
> issues. A minor/trivial issue that is incomplete is probably not going to
> hurt much to someone looking for critical open issues.

If you mean you intend to consider them resolved, then yeah we agree a
lot. The names of the resolved states don't matter nearly so much to
me. For instance, in your examples, I could call those CannotReproduce
or Incomplete. I would not want to leave them Open in that state
indefinitely.

> On a side note, I would like to contribute some time to improving this.
> When identifying this kind of issue, should I ask in the issue itself to
> resolve it in a specific way?

I think reviewing JIRAs actually contributes to a better overall
process, so I'd just dive in. Anything to advance a JIRA / PR to a
resolution is very helpful. Ask for more info, investigate a problem
to confirm it or fail to reproduce, propose a fix, identify
duplicates, flag JIRAs for closing, review changes, say you think the
change is good, etc. -- among the most helpful things anyone can do
right now IMHO.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Adding/Using More Resolution Types on JIRA

2015-05-21 Thread Santiago Mola
2015-05-21 22:39 GMT+02:00 Sean Owen :
>
> I don't think sorting helps or that browsing is the issue. What if
> you're searching for Open Critical issues concerning Pyspark? If the
> list is full of issues that are actually out of scope, later, won't
> fix, then that's a problem.

Sure. That is why I was talking about the Inactive resolution specifically.
The combination of Priority + other statuses is enough to solve these
issues. A minor/trivial issue that is incomplete is probably not going to
hurt much to someone looking for critical open issues.

>
> Game for whose benefit? nobody is being evaluated on this stuff. This
> is being proposed for real reasons, not for fun.
>

Sorry. That was unfortunate on my side.

> A bunch of JIRA cruft is a symptom, not a cause. Something is wrong
> somewhere if people file JIRAs and they go nowhere.
>[...]
> I think it's more useful to actually close these to communicate back
> clearly what is not going to be accepted. Things can be reopened if
> needed. Silently ignoring them forever as an Open JIRA seems less
> constructive.
> [...]
> Yes, best to try to make the process better. That's why I started with
> things like a more comprehensive
> https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark
> to make better contributions in the first place. By the time dead
> JIRAs are closed, something's already gone wrong and time has been
> wasted. But we still need that culture of not letting stuff sit
> around.
>

Agreed. That is basically my point.

On a side note, I would like to contribute some time to improving this.
When identifying this kind of issue, should I ask in the issue itself to
resolve it in a specific way?

Best,
-- 
Santiago M. Mola

Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // @stratiobd


Re: Tungsten's Vectorized Execution

2015-05-21 Thread Davies Liu
We have not started to prototype the vectorized one yet; it will be
evaluated in 1.5 and may be targeted for 1.6.

We'd be glad to hear feedback/suggestions/comments from your side!

On Thu, May 21, 2015 at 9:37 AM, Yijie Shen  wrote:
> Hi all,
>
> I’ve seen the Blog of Project Tungsten here, it sounds awesome to me!
>
> I’ve also noticed there is a plan to change the code generation from
> record-at-a-time evaluation to a vectorized one, which interests me most.
>
> What’s the status of vectorized evaluation?  Is this an internal effort of
> Databricks, or is outside involvement welcome?
>
> Since I’ve done similar work on Spark SQL, I would like to get involved if
> that’s possible.
>
>
> Yours,
>
> Yijie

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Spark MOOC - early access

2015-05-21 Thread Marco Shaw
Hi Spark Devs and Users,

BerkeleyX and Databricks are currently developing two Spark-related MOOCs
on edX (intro, ml), the first of which starts on June 1st. Together these
courses have over 75K enrolled students!

To help students perform the course exercises, we have created a Vagrant
box that contains Spark and IPython (running on Ubuntu 32-bit). This will
simplify user setup and help us support them. We are writing to give you
early access to the VM environment and the first assignment, and to
request your help to test out the VM/assignment before we unleash it on
75K people (see instructions below). We're happy to help if you have any
difficulties getting the VM set up; please feel free to contact me
(marco.s...@gmail.com) with any issues, comments, or questions.

Sincerely,
Marco Shaw
Spark MOOC TA

(This is being sent as an HTML formatted email. Some of the links have
been duplicated just in case.)

1. Install VirtualBox on your OS (see Windows tutorial:
https://www.youtube.com/watch?v=06Sf-m64fcY)

2. Install Vagrant on your OS (see Windows tutorial:
https://www.youtube.com/watch?v=LZVS23BaA1I)

3. Install the virtual machine using the following steps (see Windows
tutorial: https://www.youtube.com/watch?v=ZuJCqHC7IYc):
a. Create a custom directory (e.g. c:\users\marco\myvagrant or
/home/marco/myvagrant)
b. Download the Vagrantfile to the custom directory (NOTE: it must be
named exactly "Vagrantfile" with no extension)
c. Open a DOS prompt (Windows) or terminal (Mac/Linux) in the custom
directory and issue the command "vagrant up"

4. Perform basic commands in the VM as described below (see Windows
tutorial: https://www.youtube.com/watch?v=bkteLH77IR0):
a. To start the VM, from a DOS prompt (Windows) or terminal (Mac/Linux),
issue the command "vagrant up".
b. To stop the VM, from a DOS prompt (Windows) or terminal (Mac/Linux),
issue the command "vagrant halt".
c. To erase or delete the VM, from a DOS prompt (Windows) or terminal
(Mac/Linux), issue the command "vagrant destroy".
d. Once the VM is running, to access the notebook, open a web browser to
http://localhost:8001.

5. Use the test notebook as described below (see Windows tutorial:
https://www.youtube.com/watch?v=mlfAmyF3Q-s):
a. To start the VM, from a DOS prompt (Windows) or terminal (Mac/Linux),
issue the command "vagrant up".
b. Once the VM is running, to access the notebook, open a web browser to
http://localhost:8001.
c. Upload this IPython notebook:
https://raw.githubusercontent.com/spark-mooc/mooc-setup/master/vm_test_student.ipynb
d. Run through the notebook.

6. Play around with the first MOOC assignment (email Marco for details
when you get to this point).

7. Please answer the following questions:
a. What machine are you using (OS, RAM, CPU, age)?
b. How long did the entire process take?
c. How long did the VM download take? Relatedly, where are you located?
d. Do you have any other comments/suggestions?


Re: Adding/Using More Resolution Types on JIRA

2015-05-21 Thread Sean Owen
On Thu, May 21, 2015 at 9:06 PM, Santiago Mola  wrote:
>> Inactive - A feature or bug that has had no activity from users or
>> developers in a long time
>
> Why is this needed? Every JIRA listing can be sorted by activity. That gets
> the inactive ones out of your view quickly. I do not see any reason why an
> issue should be closed because of this. If it's inactive, maybe it's because
> it falls into one of the other categories (out of scope, later, won't fix).

I don't think sorting helps or that browsing is the issue. What if
you're searching for Open Critical issues concerning Pyspark? If the
list is full of issues that are actually out of scope, later, won't
fix, then that's a problem.

> That is a much more specific case than "Inactivity", and a lot of large-scale
> open source projects use specific resolutions for this.

Yes, that's "CannotReproduce".

I think the walking-dead JIRAs we have in mind are some combination
of: a JIRA opened without a lot of detail, that might or might not be
a problem, nobody else seemed to have the problem and/or nobody cared
to investigate, much has changed since anyway so might be obsolete.
WontFix, CannotReproduce, NotAProblem are all possibly reasonable
resolutions. If this is just about semantics, I also don't feel a
strong need for a new state.


> On a more general note: what is the problem with open issues / pull requests?
> I see a tendency in the Spark project to do unusual things with issues / PRs
> just to keep the numbers low. For example, closing PRs after a couple of
> weeks of inactivity just to shrink the queue, or closing active issues
> just for the sake of it.
>
> Honestly, this looks a lot like trying to game metrics. But maybe there is
> something that I am missing.

Game for whose benefit? nobody is being evaluated on this stuff. This
is being proposed for real reasons, not for fun.

A bunch of JIRA cruft is a symptom, not a cause. Something is wrong
somewhere if people file JIRAs and they go nowhere. Everyone's time is
wasted and with no conclusion, there's no feedback or learning
anywhere. So it keeps happening. Is it bad JIRAs? scope issues? lack
of follow up from developer or contributor? all of the above?

I actually think it's mostly bad JIRAs: too large, too invasive, not
that useful, hacky fixes to a facet of a problem, incomplete
description, duplicate, etc.

I think it's more useful to actually close these to communicate back
clearly what is not going to be accepted. Things can be reopened if
needed. Silently ignoring them forever as an Open JIRA seems less
constructive.


> Maybe what is actually needed is to improve the lifecycle of an issue
> while it is alive, instead of trying to kill it earlier. Some examples of
> this that are used on other projects are the "incomplete" status, to
> signal that more info is required from the reporter in order to take
> further action, and "confirmed", to acknowledge that a bug is confirmed
> to be present and needs action by a developer.

Yes, best to try to make the process better. That's why I started with
things like a more comprehensive
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark
to make better contributions in the first place. By the time dead
JIRAs are closed, something's already gone wrong and time has been
wasted. But we still need that culture of not letting stuff sit
around.

I don't mind the idea of an Unresolved "InformationNeeded" status,
yeah. I don't actually think that would solve a problem though. The
dead JIRAs are ones that never got any follow up, or, that got a lot
of follow-up from the contributor but aren't going to be merged.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Adding/Using More Resolution Types on JIRA

2015-05-21 Thread Santiago Mola
Some examples to illustrate my point. A couple of issues from the oldest
open issues
in the SQL component:

[SQL] spark-sql exits while encountered an error
https://issues.apache.org/jira/browse/SPARK-4572
This is an incomplete report that nobody can take action on. It can be
resolved as "Incomplete" instead of Inactive. In fact, there is no need to
wait that long to close it as "Incomplete".

Using a field in a WHERE clause that is not in the schema does not throw an
exception
https://issues.apache.org/jira/browse/SPARK-5305
This is also "Incomplete"; it is not currently reproducible and is missing
version info.

Rather than a need for these new statuses, it seems that there is a need
for more people assessing and triaging new issues.

Best,
-- 
Santiago M. Mola

Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // @stratiobd


Re: Adding/Using More Resolution Types on JIRA

2015-05-21 Thread Santiago Mola
2015-05-12 9:50 GMT+02:00 Patrick Wendell :

>
> Inactive - A feature or bug that has had no activity from users or
> developers in a long time
>

Why is this needed? Every JIRA listing can be sorted by activity. That gets
the inactive ones out of your view quickly. I do not see any reason why an
issue should be closed because of this. If it's inactive, maybe it's because
it falls into one of the other categories (out of scope, later, won't fix).

I can only think about a case where closing an inactive issue makes sense:

*  A bug was reported a long time ago. Nobody was able to reproduce it
   (after actively trying), and the reporter is no longer around to
   provide more info.

That is a much more specific case than "Inactivity", and a lot of
large-scale open source projects use specific resolutions for this.

On a more general note: what is the problem with open issues / pull
requests? I see a tendency in the Spark project to do unusual things with
issues / PRs just to keep the numbers low. For example, closing PRs after
a couple of weeks of inactivity just to shrink the queue, or closing
active issues just for the sake of it.

Honestly, this looks a lot like trying to game metrics. But maybe there is
something that I am missing.

Maybe what is actually needed is to improve the lifecycle of an issue
while it is alive, instead of trying to kill it earlier. Some examples of
this that are used on other projects are the "incomplete" status, to
signal that more info is required from the reporter in order to take
further action, and "confirmed", to acknowledge that a bug is confirmed to
be present and needs action by a developer.

Best,
-- 
Santiago M. Mola

Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // @stratiobd


Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-05-21 Thread Tathagata Das
Looks like somehow the file size reported by the FSInputDStream of
Tachyon's FileSystem interface is returning zero.

On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Just to follow up on this thread further:
>
> I was doing some fault-tolerance testing of Spark Streaming with Tachyon
> as the OFF_HEAP block store. As I said in an earlier email, I was able to
> solve the BlockNotFound exception when I used Tachyon's Hierarchical
> Storage, which is good.
>
> I continued testing with the Spark Streaming WAL and checkpoint files
> also stored in Tachyon. Here are a few findings:
>
>
> When I store the Spark Streaming checkpoint location in Tachyon, the
> throughput is much higher. I tested the driver and receiver failure cases,
> and Spark Streaming is able to recover without any data loss on driver
> failure.
>
> *But on receiver failure, Spark Streaming loses data*, as I see an
> exception while reading the WAL file from the Tachyon "receivedData"
> location for the receiver id that just failed.
>
> If I change the checkpoint location back to HDFS, Spark Streaming can
> recover from both driver and receiver failure.
>
> Here are the log details from when the Spark Streaming receiver failed.
> I raised a JIRA for the same issue: https://issues.apache.org/jira/browse/SPARK-7525
>
>
>
> INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2 (epoch
> 1)*
> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
> remove executor 2 from BlockManagerMaster.
> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
> block manager BlockManagerId(2, 10.252.5.54, 45789)
> INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
> successfully in removeExecutor
> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
> receiver for stream 2 from 10.252.5.62*:47255
> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage
> 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could not
> read data from write ahead log record
> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
> )*
> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
> at
> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
> at
> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
> at scala.Option.getOrElse(Option.scala:120)
> at
> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.IllegalArgumentException:* Seek position is past
> EOF: 645603894, fileSize = 0*
> at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
> at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
> ... 15 more
>
> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in
> stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in stage
> 103.0 (TID 422) on executor 10.252.5.61: org.apache.spark.SparkException
> (Could not read data from write ahead log record
> FileBasedWriteAheadLogSegment(tachyon-ft://
> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
> [duplicate 1]
> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3 in
> stage 103.0 (TID 423, 10.2

Customizing Akka configuration for Spark

2015-05-21 Thread Akshat Aranya
Hi,

Is there some way to customize the Akka configuration for Spark?
Specifically, I want to experiment with custom serialization for messages
that are sent between the driver and executors in standalone mode.

Thanks,
Akshat


Tungsten's Vectorized Execution

2015-05-21 Thread Yijie Shen
Hi all,

I’ve seen the Blog of Project Tungsten here, it sounds awesome to me!

I’ve also noticed there is a plan to change the code generation from 
record-at-a-time evaluation to a vectorized one, which interests me most.

What’s the status of vectorized evaluation?  Is this an internal effort of
Databricks, or is outside involvement welcome?

Since I’ve done similar work on Spark SQL, I would like to get involved if
that’s possible.



Yours,

Yijie

Why use "lib_managed" for the Sbt build?

2015-05-21 Thread Iulian Dragoș
I’m trying to understand why Sbt is configured to pull all libs under
lib_managed.

   - it seems like unnecessary duplication (I will have those libraries
   under ~/.m2, via maven, anyway)
   - every time I call make-distribution I lose lib_managed (via mvn clean
   install) and have to wait for all the jars to download again the next
   time I use sbt
   - Eclipse does not handle relative paths very well (source attachments
   from lib_managed don’t always work)

So, what is the advantage of putting all dependencies in there, instead of
using the default `~/.ivy2`?
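(For context, the lib_managed behavior comes from sbt's retrieveManaged
setting; a one-line sketch of the relevant configuration, on the
assumption that this is what Spark's project/SparkBuild.scala enables:)

    // When true, sbt copies every managed dependency out of the Ivy cache
    // into lib_managed/ under the project root.
    retrieveManaged := true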

cheers,
iulian
-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Re: Resource usage of a spark application

2015-05-21 Thread Ryan Williams
On Thu, May 21, 2015 at 5:22 AM Peter Prettenhofer <
peter.prettenho...@gmail.com> wrote:

> Thanks Akhil, Ryan!
>
> @Akhil: YARN can only tell me how many vcores my app has been granted, but
> not actual cpu usage, right? Pulling mem/cpu usage from the OS means I need
> to map JVM executor processes to the context they belong to, right?
>
> @Ryan: what a great blog post -- this is super relevant for me to analyze
> the state of the cluster as a whole. However, it seems to me that those
> metrics are mostly reported globally and not per spark application.
>

Thanks! You can definitely analyze metrics per-application in several ways:

   - If you're running Spark on YARN, use the "app" URL param to specify a
   YARN application ID, which will set the Spark application ID as well as
   parse job start/end times.
   - Set the "prefix" URL param to your Spark app's ID, and all metrics
   will be namespaced to that app ID.
     - You actually have to do one of these two; otherwise it doesn't know
     what app's metrics to look for; it is set up specifically to view
     per-app metrics.
   - There is a dropdown in the upper-left of the page (sorry, don't have a
   screenshot right now) that will let you select from all app IDs that
   graphite has seen metrics from.

Let me know, here or in issues on the repo, if you have any issues with
that or that doesn't make sense!
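(For anyone wiring this up from scratch: exporting Spark's MetricsSystem
to Graphite is a conf/metrics.properties change. A minimal sketch, with
the host and port illustrative; the sink keys are the ones documented in
Spark's monitoring guide:)

    # conf/metrics.properties: send all instances' metrics to Graphite
    *.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
    *.sink.graphite.host=graphite.example.com
    *.sink.graphite.port=2003
    *.sink.graphite.period=10
    *.sink.graphite.unit=seconds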


>
> 2015-05-19 21:43 GMT+02:00 Ryan Williams :
>
>> Hi Peter, a few months ago I was using MetricsSystem to export to
>> Graphite and then view in Grafana; relevant scripts and some
>> instructions are here
>>  if you want to
>> take a look.
>>
>>
>> On Sun, May 17, 2015 at 8:48 AM Peter Prettenhofer <
>> peter.prettenho...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I'm looking for a way to measure the current memory / cpu usage of a
>>> spark application to provide users feedback how much resources are actually
>>> being used.
>>> It seems that the metric system provides this information to some
>>> extent. It logs metrics at the application level (nr of cores granted)
>>> and at the JVM level (memory usage).
>>> Is this the recommended way to gather this kind of information? If so,
>>> how do i best map a spark application to the corresponding JVM processes?
>>>
>>> If not, should i rather request this information from the resource
>>> manager (e.g. Mesos/YARN)?
>>>
>>> thanks,
>>>  Peter
>>>
>>> --
>>> Peter Prettenhofer
>>>
>>
>
>
> --
> Peter Prettenhofer
>


Re: Resource usage of a spark application

2015-05-21 Thread Akhil Das
Yes, Peter, that's correct: you need to identify the processes, and with
that you can pull the actual usage metrics.

Thanks
Best Regards

On Thu, May 21, 2015 at 2:52 PM, Peter Prettenhofer <
peter.prettenho...@gmail.com> wrote:

> Thanks Akhil, Ryan!
>
> @Akhil: YARN can only tell me how many vcores my app has been granted, but
> not actual cpu usage, right? Pulling mem/cpu usage from the OS means I need
> to map JVM executor processes to the context they belong to, right?
>
> @Ryan: what a great blog post -- this is super relevant for me to analyze
> the state of the cluster as a whole. However, it seems to me that those
> metrics are mostly reported globally and not per spark application.
>
> 2015-05-19 21:43 GMT+02:00 Ryan Williams :
>
>> Hi Peter, a few months ago I was using MetricsSystem to export to
>> Graphite and then view in Grafana; relevant scripts and some
>> instructions are here
>>  if you want to
>> take a look.
>>
>>
>> On Sun, May 17, 2015 at 8:48 AM Peter Prettenhofer <
>> peter.prettenho...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I'm looking for a way to measure the current memory / cpu usage of a
>>> spark application to provide users feedback how much resources are actually
>>> being used.
>>> It seems that the metric system provides this information to some
>>> extent. It logs metrics at the application level (nr of cores granted)
>>> and at the JVM level (memory usage).
>>> Is this the recommended way to gather this kind of information? If so,
>>> how do i best map a spark application to the corresponding JVM processes?
>>>
>>> If not, should i rather request this information from the resource
>>> manager (e.g. Mesos/YARN)?
>>>
>>> thanks,
>>>  Peter
>>>
>>> --
>>> Peter Prettenhofer
>>>
>>
>
>
> --
> Peter Prettenhofer
>


Re: Re: Low throughput and effect of GC in SparkSql GROUP BY

2015-05-21 Thread zhangxiongfei
Hi Pramod


Is your data compressed? I encountered a similar problem; however, after
turning codegen on, the GC time was still very long. The input data for my
map task is about a 100M lzo file.
My query is "select ip, count(*) as c from stage_bitauto_adclick_d group by
ip sort by c limit 100"


Thanks
Zhang Xiongfei




At 2015-05-21 12:18:35, "Reynold Xin"  wrote:

Does this turn codegen on? I think the performance is fairly different when 
codegen is turned on.


For 1.5, we are investigating having codegen on by default, so users get much 
better performance out of the box.




On Wed, May 20, 2015 at 5:24 PM, Pramod Biligiri  
wrote:

Hi,
Somewhat similar to Daniel Mescheder's mail yesterday on SparkSql, I have a
data point regarding the performance of GROUP BY, indicating that there's
excessive GC which is impacting the throughput. I want to know if the new
memory manager for aggregations (https://github.com/apache/spark/pull/5725/)
is going to address this kind of issue.


I only have a small amount of data on each node (~360MB) with a large heap size
(18 GB). I still see 2-3 minor collections happening whenever I do a SELECT
SUM() with a GROUP BY. I have tried different sizes for the young generation
without much effect, though not different GC algorithms (hmm, I ought to try
reducing the RDD storage fraction, perhaps).


I have made a chart of my results [1] by adding timing code to 
Aggregates.scala. The query is actually Query 2 from Berkeley's AmpLab 
benchmark, running over 10 million records. The chart is from one of the 4 
worker nodes in the cluster.


I am trying to square this with a claim on the Project Tungsten blog post [2]: 
"When profiling Spark user applications, we’ve found that a large fraction of 
the CPU time is spent waiting for data to be fetched from main memory. "


Am I correct in assuming that SparkSql has yet to reach that level of
efficiency, at least in aggregation operations?



Thanks.


[1] - 
https://docs.google.com/spreadsheets/d/1HSqYfic3n5s9i4Wsi1Qg0FKN_AWz2vV7_6RRMrtzplQ/edit#gid=481134174
[2] - 
https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html


Pramod



Re: Resource usage of a spark application

2015-05-21 Thread Peter Prettenhofer
Thanks Akhil, Ryan!

@Akhil: YARN can only tell me how many vcores my app has been granted but
not actual CPU usage, right? Pulling mem/CPU usage from the OS means I need
to map JVM executor processes to the context they belong to, right?

@Ryan: what a great blog post -- this is super relevant for me to analyze
the state of the cluster as a whole. However, it seems to me that those
metrics are mostly reported globally and not per Spark application.

2015-05-19 21:43 GMT+02:00 Ryan Williams :

> Hi Peter, a few months ago I was using MetricsSystem to export to Graphite
> and then view in Grafana; relevant scripts and some instructions are here
>  if you want to
> take a look.
>
>
> On Sun, May 17, 2015 at 8:48 AM Peter Prettenhofer <
> peter.prettenho...@gmail.com> wrote:
>
>> Hi all,
>>
>> I'm looking for a way to measure the current memory / CPU usage of a
>> Spark application to provide users feedback on how much resources are
>> actually being used.
>> It seems that the metric system provides this information to some extent.
>> It logs metrics on the application level (number of cores granted) and on
>> the JVM level (memory usage).
>> Is this the recommended way to gather this kind of information? If so,
>> how do I best map a Spark application to the corresponding JVM processes?
>>
>> If not, should I rather request this information from the resource
>> manager (e.g. Mesos/YARN)?
>>
>> thanks,
>>  Peter
>>
>> --
>> Peter Prettenhofer
>>
>


-- 
Peter Prettenhofer


Re: Spark Streaming - Design considerations/Knobs

2015-05-21 Thread Hemant Bhanawat
Honestly, given the length of my email, I didn't expect a reply. :-) Thanks
for reading and replying. However, I have a follow-up question:

I don't think I understand block replication completely. Are the blocks
replicated immediately after they are received by the receiver, or are they
kept only on the receiver node and moved only on shuffle? Does the
replication have something to do with locality.wait?

Thanks,
Hemant

On Thu, May 21, 2015 at 2:21 AM, Tathagata Das  wrote:

> Correcting the ones that are incorrect or incomplete. BUT this is a good
> list of things to remember about Spark Streaming.
>
>
> On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat 
> wrote:
>
>> Hi,
>>
>> I have compiled a list (from online sources) of knobs/design
>> considerations that need to be taken care of by applications running on
>> Spark Streaming. Is my understanding correct? Are there any other important
>> design considerations that I should take care of?
>>
>>
>>- A DStream is associated with a single receiver. For attaining read
>>parallelism, multiple receivers, i.e. multiple DStreams, need to be created.
>>- A receiver is run within an executor and occupies one core. Ensure
>>that there are enough cores for processing after receiver slots are booked,
>>i.e. spark.cores.max should take the receiver slots into account.
>>- The receivers are allocated to executors in a round-robin fashion.
>>- When data is received from a stream source, the receiver creates blocks
>>of data. A new block of data is generated every blockInterval
>>milliseconds, so N blocks of data are created during the batchInterval,
>>where N = batchInterval/blockInterval (see the sketch further below).
>>- These blocks are distributed by the BlockManager of the current
>>executor to the block managers of other executors. After that, the Network
>>Input Tracker running on the driver is informed about the block locations
>>for further processing.
>>- An RDD is created on the driver for the blocks created during the
>>batchInterval. The blocks generated during the batchInterval are partitions
>>of the RDD. Each partition is a task in Spark. blockInterval ==
>>batchInterval would mean that a single partition is created and probably
>>it is processed locally.
>>
> The map tasks on the blocks are processed in the executors (one that
> received the block, and another where the block was replicated) that have
> the blocks, irrespective of block interval, unless non-local scheduling
> kicks in (as you observed next).
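
As a concrete illustration of the block/partition arithmetic in the list
above (values and the socket source are hypothetical):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  // A 2s batch with 200ms blocks gives N = 2000 / 200 = 10 blocks,
  // i.e. 10 partitions per receiver in each batch RDD.
  val conf = new SparkConf()
    .setAppName("block-interval-demo")
    .set("spark.streaming.blockInterval", "200")  // milliseconds
  val ssc = new StreamingContext(conf, Seconds(2))
  val lines = ssc.socketTextStream("localhost", 9999)
  lines.foreachRDD(rdd => println(s"partitions = ${rdd.partitions.length}"))
  ssc.start()
  ssc.awaitTermination()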
>
>>
>>- Having a bigger blockInterval means bigger blocks. A high value of
>>spark.locality.wait increases the chance of processing a block on the local
>>node. A balance needs to be found between these two parameters to
>>ensure that the bigger blocks are processed locally.
>>- Instead of relying on batchInterval and blockInterval, you can
>>define the number of partitions by calling dstream.repartition(n). This
>>reshuffles the data in the RDD randomly to create n partitions.
>>
> Yes, for greater parallelism, though it comes at the cost of a shuffle.
>
>>
>>- An RDD's processing is scheduled by the driver's JobScheduler as a job.
>>At a given point in time only one job is active, so if one job is
>>executing, the other jobs are queued.
>>
>>
>>- If you have two DStreams there will be two RDDs formed and there
>>will be two jobs created, which will be scheduled one after the other.
>>
>>
>>- To avoid this, you can union the two DStreams. This will ensure that a
>>single UnionRDD is formed for the two RDDs of the DStreams. This UnionRDD
>>is then considered as a single job. However, the partitioning of the RDDs
>>is not impacted.
>>
> To further clarify, the number of jobs depends on the number of output
> operations (print, foreachRDD, saveAsXFiles) and the number of RDD actions
> in those output operations.
>
> dstream1.union(dstream2).foreachRDD { rdd => rdd.count() }  // ONE Spark
> job per batch
>
> dstream1.union(dstream2).foreachRDD { rdd => { rdd.count(); rdd.count() }
> }  // TWO Spark jobs per batch
>
> dstream1.foreachRDD { rdd => rdd.count() }; dstream2.foreachRDD { rdd =>
> rdd.count() }  // TWO Spark jobs per batch
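
Putting the read-parallelism and union points together, a minimal sketch
(reusing the StreamingContext ssc from the sketch above; the receiver count
and source are placeholders):

  // Several receivers for read parallelism, unioned so that a single job
  // drives each batch instead of one job per DStream.
  val numReceivers = 4
  val streams = (1 to numReceivers).map(_ => ssc.socketTextStream("localhost", 9999))
  val unioned = ssc.union(streams)
  unioned.foreachRDD(rdd => rdd.count())  // one Spark job per batch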
>
>>
>>- If the batch processing time is more than the batchInterval then
>>obviously the receiver's memory will start filling up and will end up
>>throwing exceptions (most probably BlockNotFoundException). Currently there
>>is no way to pause the receiver.
>>
> You can limit the receiver's ingestion rate using the SparkConf setting
> spark.streaming.receiver.maxRate.
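
A minimal sketch (the rate value is illustrative):

  // Cap each receiver at 10,000 records/sec so ingestion cannot outrun
  // processing indefinitely.
  val conf = new SparkConf()
    .set("spark.streaming.receiver.maxRate", "10000")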
>
>>
>>- To be fully fault tolerant, Spark Streaming needs to enable
>>checkpointing. Checkpointing increases the batch processing time.
>>
> Incomplete. There are two types of checkpointing - data and metadata.
> Only data checkpointing, needed by only some operations, increases batch
> processing time. Read -
>
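
A minimal sketch of the two kinds (the checkpoint path and the lines DStream
are assumed): setting a checkpoint directory enables metadata checkpointing,
while a stateful operation such as updateStateByKey additionally triggers
data checkpointing, the part that adds to batch processing time:

  ssc.checkpoint("hdfs:///checkpoints/myapp")  // placeholder path
  val counts = lines
    .map(word => (word, 1))
    .updateStateByKey((values: Seq[Int], state: Option[Int]) =>
      Some(state.getOrElse(0) + values.sum))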