Re: Stages with non-arithmetic numbering & Timing metrics in event logs

2015-06-11 Thread Imran Rashid
That is not exactly correct -- that being said, I'm not 100% sure on these
details either, so I'd appreciate you double-checking and/or another dev
confirming my description.


Spark actually has more threads going than the numCores you specify.
numCores really controls how many threads are actively executing
tasks.  There are more threads for doing the fetching (the details of which
I'm not that familiar with) -- those never cut into the numCores budget for
actively executing tasks.  There also isn't a 1-to-1 correspondence
between one shuffle block and one task -- a shuffle-read task most likely
needs to fetch many shuffle blocks, with some local and some remote (in
most cases).  So, in your lingo above, c is numCores, but c_1 is just an
independent pool of threads.
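
To make the distinction concrete, here is a minimal sketch of the idea (my
own illustration and assumptions, not Spark's actual executor code):

  import java.util.concurrent.Executors

  // Assumption for illustration: an executor configured with 8 cores.
  val numCores = 8

  // Pool whose size is bounded by numCores: only these threads count as
  // "actively executing tasks".
  val taskPool = Executors.newFixedThreadPool(numCores)

  // Independent pool for shuffle fetches (the "c_1" pool in your lingo);
  // its threads mostly wait on the network, not the CPU.
  val fetchPool = Executors.newCachedThreadPool()

  taskPool.submit(new Runnable { def run(): Unit = { /* run a task */ } })
  fetchPool.submit(new Runnable { def run(): Unit = { /* fetch a remote block */ } })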

The obvious follow-up question is: if you actually have more than numCores
threads going at the same time, how come CPU usage is low?  You could still
have your CPUs stuck waiting on I/O, from disk or network, so they aren't
getting fully utilized.  The CPU can also be idle waiting on memory,
if there are a lot of cache misses (I'm not sure how that will show up in
CPU monitoring).  If that were the case, it could even be from having too
many threads, as a lot of time is spent context switching ... but I'm just
guessing now.

hope this helps,
Imran



On Wed, Jun 10, 2015 at 1:41 PM, Mike Hynes 91m...@gmail.com wrote:

 Hi Imran,

 Thank you again for your email.

 I just want to ask one further question to clarify the implementation
 of the shuffle block fetches. When you say that "rather than sitting
 idle, [the executor] will immediately start reading the local block", I
 would guess that, in the implementation, the executor is going to launch
 concurrent threads to read both local and remote blocks, which it
 seems to do in the initialize() method of
 core/.../storage/ShuffleBlockFetcherIterator.scala. Is that the case,
 or would the Executor run all local fetch threads first?

 The reason I ask is that if the slave machine on which the Executor is
 running has some number of cores, c, then I would have thought that
 some of the threads launched would occupy some number, c_1, of the
 cores and conduct the local reads (where c_1 <= c). The other threads
 would occupy the remaining (c - c_1) cores' cycles until *all* necessary
 blocks have been read, so that, depending on c and the number of blocks
 to fetch, none of the cores would be idle if there are many blocks to
 fetch. (I monitor the CPU utilization of our nodes throughout a job,
 and generally find them under-utilized statistically speaking; that
 is, their usage over the whole job is lower than expected, with short
 bursts of high usage, so I ask this question in this specific way for
 this reason, since I can see trends in the probability density
 functions of CPU utilization as the #partitions of our RDDs are increased).

 ShuffleBlockFetcherIterator.scala:

   private[this] def initialize(): Unit = {
     ...
     // Send out initial requests for blocks, up to our maxBytesInFlight
     while (fetchRequests.nonEmpty &&
       (bytesInFlight == 0 ||
        bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
       sendRequest(fetchRequests.dequeue())
     }
     val numFetches = remoteRequests.size - fetchRequests.size
     logInfo("Started " + numFetches + " remote fetches in " +
       Utils.getUsedTimeMs(startTime))

     // Get Local Blocks
     fetchLocalBlocks()
     logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
   }

   private[this] def fetchLocalBlocks() {
     val iter = localBlocks.iterator
     while (iter.hasNext) {
       val blockId = iter.next()
       try {
         val buf = blockManager.getBlockData(blockId)
         shuffleMetrics.incLocalBlocksFetched(1)
         shuffleMetrics.incLocalBytesRead(buf.size)
         buf.retain()
         results.put(new SuccessFetchResult(blockId, 0, buf))
       } catch {
         ...
       }
     }
   }

 Obviously, I will have to sit down with core/.../network/nio/* and
 core/.../shuffle/* and do my own homework on this, but from what I can
 tell, the BlockDataManager relies on either
 NioBlockTransferService.scala or the NettyBlockTransferService.scala
 (which are set in SparkEnv.scala), both of which do the grunt work of
 actually buffering and transferring the blocks' bytes. Finally, the
 tasks in the new stage for which the shuffle outputs have been fetched
 will not commence until all of the block fetching threads (both local
 and remote) have terminated.

 Does the above paint an accurate picture? I would really appreciate
 clarification on the concurrency, since I would like to determine why
 our jobs have under-utilization and poor weak scaling efficiency.

 I will cc this thread over to the dev list. I did not cc them in case
 my previous question was trivial---I didn't want to spam the list
 unnecessarily, since I do not see these kinds of questions posed there
 frequently.

 Thanks a bunch,

Re: [ml] Why all model classes are final?

2015-06-11 Thread Erik Erlandson
I was able to work around this problem in several cases using the class 
'enhancement' or 'extension' pattern to add some functionality to the decision 
tree model data structures.
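
For reference, a minimal sketch of that pattern (my own illustration; the
added method is hypothetical):

  import org.apache.spark.mllib.tree.model.DecisionTreeModel

  object DecisionTreeModelOps {
    // Adds behavior to the final model class without subclassing it.
    implicit class RichDecisionTreeModel(val model: DecisionTreeModel) extends AnyVal {
      // Illustrative added functionality only.
      def summary: String =
        s"DecisionTreeModel with ${model.numNodes} nodes, depth ${model.depth}"
    }
  }

  // Usage: import DecisionTreeModelOps._ and call model.summary on any DecisionTreeModel.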


- Original Message -
 Hi, previously all the models in the ml package were package-private, so
 if I needed to customize some models I inherited them in the org.apache.spark.ml
 package in my project. But now new models
 (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala#L46)
 are final classes. So if I need to customize one line or so, I need to
 redefine the whole class. Any reason to do so? As a developer, I
 understand all the risks of using the Developer/Alpha API. That's why I'm
 using Spark: because it provides building blocks that I can easily
 customize and combine for my needs.
 
 Thanks,
 Peter Rudenko
 
 
 




Re: Stages with non-arithmetic numbering & Timing metrics in event logs

2015-06-11 Thread Kay Ousterhout
Here’s how the shuffle works.  This explains what happens for a single
task; this will happen in parallel for each task running on the machine,
and as Imran said, Spark runs up to “numCores” tasks concurrently on each
machine.  There's also an answer to the original question about why CPU use
is low at the very bottom.

The key data structure used in fetching shuffle data is the “results” queue
in ShuffleBlockFetcherIterator, which buffers data that we have in
serialized (and maybe compressed) form, but haven’t yet deserialized /
processed.  The results queue is filled by many threads fetching data over
the network (the number of concurrent threads fetching data is equal to the
number of remote executors we’re currently fetching data from) [0], and is
consumed by a single thread that deserializes the data and computes some
function over it (e.g., if you’re doing rdd.count(), the thread
deserializes the data and counts the number of items).  As we fetch data
over the network, we track bytesInFlight, which is data that has been
requested (and possibly received) from a remote executor, but that hasn’t
yet been deserialized / processed by the consumer thread.  So, this
includes all of the data in the “results” queue, and possibly more data
that’s currently outstanding over the network.  We always issue as many
requests as we can, with the constraint that bytesInFlight remains less
than a specified maximum [1].
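
To illustrate the producer/consumer structure described above, here is a
simplified sketch with made-up names (my own illustration, not Spark's
actual code):

  import java.util.concurrent.LinkedBlockingQueue

  case class FetchResult(blockId: String, size: Long, bytes: Array[Byte])

  class ResultsBuffer(maxBytesInFlight: Long) {
    // Filled by fetcher threads, drained by the single consumer thread.
    private val results = new LinkedBlockingQueue[FetchResult]()
    private var bytesInFlight = 0L

    // Whether another request may be issued without exceeding the cap.
    def canRequest(requestSize: Long): Boolean = synchronized {
      bytesInFlight == 0 || bytesInFlight + requestSize <= maxBytesInFlight
    }

    def onRequestIssued(requestSize: Long): Unit = synchronized {
      bytesInFlight += requestSize
    }

    // Network callback: the fetched block is simply parked on the queue.
    def onBlockFetched(result: FetchResult): Unit = results.put(result)

    // Consumer side: blocks until a result is available, then updates accounting.
    def next(): FetchResult = {
      val result = results.take()
      synchronized { bytesInFlight -= result.size }
      result
    }
  }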

In a little more detail, here’s exactly what happens when a task begins
reading shuffled data:

(1) Issue requests [1.5] to fetch up to maxBytesInFlight bytes of data [1]
over the network (this happens here
https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L260
).

These requests are all executed asynchronously using a ShuffleClient [2]
via the shuffleClient.fetchBlocks call
https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L149
 [3].  We pass in a callback that, once a block has been successfully
fetched, sticks it on the “results” queue.

(2) Begin processing the local data.  One by one, we request the local data
from the local block manager (which memory maps the file) and then stick
the result onto the results queue
https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L230.
Because we memory map the files, which is speedy, the local data typically
all ends up on the results queue in front of the remote data.

(3) Once the async network requests have been issued (note — issued, but not
finished!) and we’ve “read” (memory-mapped) the local data (i.e., (1) and
(2) have happened), ShuffleBlockFetcherIterator returns an iterator that
gets wrapped too many times to count [4] and eventually gets unrolled [5].
Each time next() is called on the iterator, it blocks waiting for an item
from the results queue.  This may return right away, or if the queue is
empty, will block waiting on new data from the network [6].  Before
returning from next(), we update our accounting for the bytes in flight:
the chunk of data we return is no longer considered in-flight, because it’s
about to be processed, so we update the current bytesInFlight, and if it
won’t result in > maxBytesInFlight outstanding, send some more requests for
data.
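
In spirit, and building on the ResultsBuffer sketch above (again an
illustration, not the real implementation), the returned iterator behaves
roughly like this:

  // Exposes the results queue as an iterator whose next() blocks until a
  // fetched block arrives; totalBlocks is assumed known up front.
  def asIterator(buffer: ResultsBuffer, totalBlocks: Int): Iterator[FetchResult] =
    new Iterator[FetchResult] {
      private var returned = 0
      def hasNext: Boolean = returned < totalBlocks
      def next(): FetchResult = {
        returned += 1
        buffer.next() // blocks if the queue is currently empty
      }
    }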



Notes:

[0] Note that these threads consume almost no CPU resources, because they
just receive data from the OS and then execute a callback that sticks the
data on the results queue.

[1] We limit the data outstanding on the network to avoid using too much
memory to hold the data we’ve fetched over the network but haven’t yet
processed.

[1.5] Each request may include multiple shuffle blocks, where a block
is the data output for this reduce task by a particular map task.  All of
the reduce tasks for a shuffle read a total of # map tasks * # reduce tasks
shuffle blocks; each reduce task reads # map tasks blocks.  We do some hacks
https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L177
to try to size these requests in a good way: we limit each request to
about maxBytesInFlight / 5, so that we can fetch from roughly 5 machines
concurrently without exceeding maxBytesInFlight.  5 is completely a magic
number here that was probably guessed by someone long long ago, and it
seems to work ok.
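
A hedged sketch of that request-sizing idea (illustrative names, not the
actual code behind the link above):

  case class BlockInfo(id: String, size: Long)
  case class FetchRequest(blocks: Seq[BlockInfo]) {
    def size: Long = blocks.map(_.size).sum
  }

  // Pack blocks into requests of at most maxBytesInFlight / 5 bytes each, so
  // roughly 5 requests can be in flight without exceeding maxBytesInFlight.
  def splitIntoRequests(blocks: Seq[BlockInfo], maxBytesInFlight: Long): Seq[FetchRequest] = {
    val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
    val requests = Seq.newBuilder[FetchRequest]
    var current = Vector.empty[BlockInfo]
    var currentSize = 0L
    for (block <- blocks) {
      current :+= block
      currentSize += block.size
      if (currentSize >= targetRequestSize) {
        // Close this request and start a new one.
        requests += FetchRequest(current)
        current = Vector.empty
        currentSize = 0L
      }
    }
    if (current.nonEmpty) requests += FetchRequest(current)
    requests.result()
  }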

[2] The default configuration uses NettyBlockTransferService as the
ShuffleClient implementation (note that this extends BlockTransferService,
which extends ShuffleClient).

[3] If you’re curious how the shuffle client fetches data, the default
Spark configuration results in exactly one TCP connection from an executor
to each other executor.  If executor A is getting shuffle data 

Re: Stages with non-arithmetic numbering & Timing metrics in event logs

2015-06-11 Thread Gerard Maas
Kay,

Excellent write-up. This should be preserved for reference somewhere
searchable.

-Gerard.



On Fri, Jun 12, 2015 at 1:19 AM, Kay Ousterhout k...@eecs.berkeley.edu
wrote:

 Here’s how the shuffle works.  This explains what happens for a single
 task; this will happen in parallel for each task running on the machine,
 and as Imran said, Spark runs up to “numCores” tasks concurrently on each
 machine.  There's also an answer to the original question about why CPU use
 is low at the very bottom.

 The key data structure used in fetching shuffle data is the “results”
 queue in ShuffleBlockFetcherIterator, which buffers data that we have in
 serialized (and maybe compressed) form, but haven’t yet deserialized /
 processed.  The results queue is filled by many threads fetching data over
 the network (the number of concurrent threads fetching data is equal to the
 number of remote executors we’re currently fetching data from) [0], and is
 consumed by a single thread that deserializes the data and computes some
 function over it (e.g., if you’re doing rdd.count(), the thread
 deserializes the data and counts the number of items).  As we fetch data
 over the network, we track bytesInFlight, which is data that has been
 requested (and possibly received) from a remote executor, but that hasn’t
 yet been deserialized / processed by the consumer thread.  So, this
 includes all of the data in the “results” queue, and possibly more data
 that’s currently outstanding over the network.  We always issue as many
 requests as we can, with the constraint that bytesInFlight remains less
 than a specified maximum [1].

 In a little more detail, here’s exactly what happens when a task begins
 reading shuffled data:

 (1) Issue requests [1.5] to fetch up to maxBytesInFlight bytes of data [1]
 over the network (this happens here
 https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L260
 ).

 These requests are all executed asynchronously using a ShuffleClient [2]
 via the shuffleClient.fetchBlocks call
 https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L149
  [3].  We pass in a callback that, once a block has been successfully
 fetched, sticks it on the “results” queue.

 (2) Begin processing the local data.  One by one, we request the local
 data from the local block manager (which memory maps the file) and then stick
 the result onto the results queue
 https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L230.
 Because we memory map the files, which is speedy, the local data typically
 all ends up on the results queue in front of the remote data.

 (3) Once the async network requests have been issued (note — issued, but
 not finished!) and we’ve “read” (memory-mapped) the local data (i.e., (1)
 and (2) have happened), ShuffleBlockFetcherIterator returns an iterator
 that gets wrapped too many times to count [4] and eventually gets unrolled
 [5].  Each time next() is called on the iterator, it blocks waiting for an
 item from the results queue.  This may return right away, or if the queue
 is empty, will block waiting on new data from the network [6].  Before
 returning from next(), we update our accounting for the bytes in flight:
 the chunk of data we return is no longer considered in-flight, because it’s
 about to be processed, so we update the current bytesInFlight, and if it
 won’t result in > maxBytesInFlight outstanding, send some more requests for
 data.

 

 Notes:

 [0] Note that these threads consume almost no CPU resources, because they
 just receive data from the OS and then execute a callback that sticks the
 data on the results queue.

 [1] We limit the data outstanding on the network to avoid using too much
 memory to hold the data we’ve fetched over the network but haven’t yet
 processed.

 [1.5] Each request may include multiple shuffle blocks, where a block
 is the data output for this reduce task by a particular map task.  All of
 the reduce tasks for a shuffle read a total of # map tasks * # reduce tasks
 shuffle blocks; each reduce task reads # map tasks blocks.  We do some
 hacks
 https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L177
 to try to size these requests in a good way: we limit each request to
 about maxBytesInFlight / 5, so that we can fetch from roughly 5 machines
 concurrently without exceeding maxBytesInFlight.  5 is completely a magic
 number here that was probably guessed by someone long long ago, and it
 seems to work ok.

 [2] The default configuration uses NettyBlockTransferService as the
 ShuffleClient implementation (note that this extends 

Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-11 Thread Amit Ramesh
Hi Jerry,

Take a look at this example:
https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2

The offsets are needed because as RDDs get generated within Spark, the
offsets move further along. With direct Kafka mode the current offsets are
no longer persisted in ZooKeeper but rather tracked within Spark itself. If you
want to be able to use ZooKeeper-based monitoring tools to keep track of
progress, then this is needed.
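
For concreteness, a minimal sketch of the Scala pattern that page describes
(directKafkaStream is assumed to be the DStream returned by
KafkaUtils.createDirectStream):

  import org.apache.spark.streaming.kafka.HasOffsetRanges

  directKafkaStream.foreachRDD { rdd =>
    // RDDs produced by the direct stream carry their Kafka offset ranges.
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetRanges.foreach { o =>
      println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      // e.g. persist the range to ZooKeeper or another external store here
    }
  }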

In my specific case we need to persist Kafka offsets externally so that we
can continue from where we left off after a code deployment. In other
words, we need exactly-once processing guarantees across code deployments.
Spark does not support any state persistence across deployments so this is
something we need to handle on our own.

Hope that helps. Let me know if not.

Thanks!
Amit


On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao sai.sai.s...@gmail.com
wrote:

 Hi,

 What do you mean by getting the offsets from the RDD? From my
 understanding, the offsetRange is a parameter you provide to KafkaRDD, so why
 do you still want to get back the one you previously set?

 Thanks
 Jerry

 2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com:


 Congratulations on the release of 1.4!

 I have been trying out the direct Kafka support in python but haven't
 been able to figure out how to get the offsets from the RDD. Looks like the
 documentation is yet to be updated to include Python examples (
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
 I am specifically looking for the equivalent of
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
 I tried digging through the python code but could not find anything
 related. Any pointers would be greatly appreciated.

 Thanks!
 Amit





Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-11 Thread Saisai Shao
OK, I get it. I think the Python-based Kafka direct API currently does not
provide such an equivalent to the Scala one; maybe we should figure out how to
add this to the Python API as well.

2015-06-12 13:48 GMT+08:00 Amit Ramesh a...@yelp.com:


 Hi Jerry,

 Take a look at this example:
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2

 The offsets are needed because as RDDs get generated within Spark, the
 offsets move further along. With direct Kafka mode the current offsets are
 no longer persisted in ZooKeeper but rather tracked within Spark itself. If you
 want to be able to use ZooKeeper-based monitoring tools to keep track of
 progress, then this is needed.

 In my specific case we need to persist Kafka offsets externally so that we
 can continue from where we left off after a code deployment. In other
 words, we need exactly-once processing guarantees across code deployments.
 Spark does not support any state persistence across deployments so this is
 something we need to handle on our own.

 Hope that helps. Let me know if not.

 Thanks!
 Amit


 On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi,

 What do you mean by getting the offsets from the RDD? From my
 understanding, the offsetRange is a parameter you provide to KafkaRDD, so why
 do you still want to get back the one you previously set?

 Thanks
 Jerry

 2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com:


 Congratulations on the release of 1.4!

 I have been trying out the direct Kafka support in python but haven't
 been able to figure out how to get the offsets from the RDD. Looks like the
 documentation is yet to be updated to include Python examples (
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
 I am specifically looking for the equivalent of
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
 I tried digging through the python code but could not find anything
 related. Any pointers would be greatly appreciated.

 Thanks!
 Amit






Re: Contributing to pyspark

2015-06-11 Thread Manoj Kumar
Hi,

Thanks for your interest in PySpark.

The first thing is to have a look at the "how to contribute" guide
(https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark) and
filter the JIRAs using the label PySpark.

If you have your own improvement in mind, you can file a JIRA, discuss it,
and then send a pull request.

HTH

Regards.

On Fri, Jun 12, 2015 at 9:36 AM, Usman Ehtesham uehtesha...@gmail.com
wrote:

 Hello,

 I am currently taking a course in Apache Spark via EdX (
 https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x)
 and at the same time I try to look at the code for pySpark too. I wanted to
 ask, if ideally I would like to contribute to pyspark specifically, how can
 I do that? I do not intend to contribute to core Apache Spark any time soon
 (mainly because I do not know Scala) but I am very comfortable in Python.

 Any tips on how to contribute specifically to pyspark without being
 affected by other parts of Spark would be greatly appreciated.

 P.S.: I ask this because there is a small change/improvement I would like
 to propose. Also since I just started learning Spark, I would like to also
 read and understand the pyspark code as I learn about Spark. :)

 Hope to hear from you soon.

 Usman Ehtesham Gul
 https://github.com/ueg1990




-- 
Godspeed,
Manoj Kumar,
http://manojbits.wordpress.com
http://github.com/MechCoder


Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-11 Thread Saisai Shao
Hi,

What do you mean by getting the offsets from the RDD? From my
understanding, the offsetRange is a parameter you provide to KafkaRDD, so why
do you still want to get back the one you previously set?

Thanks
Jerry

2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com:


 Congratulations on the release of 1.4!

 I have been trying out the direct Kafka support in python but haven't been
 able to figure out how to get the offsets from the RDD. Looks like the
 documentation is yet to be updated to include Python examples (
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html). I
 am specifically looking for the equivalent of
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
 I tried digging through the python code but could not find anything
 related. Any pointers would be greatly appreciated.

 Thanks!
 Amit




Re: How to support dependency jars and files on HDFS in standalone cluster mode?

2015-06-11 Thread Cheng Lian
Oh sorry, I mistook --jars for --files. Yeah, for jars we need to add 
them to classpath, which is different from regular files.


Cheng

On 6/11/15 2:18 PM, Dong Lei wrote:


Thanks Cheng,

If I do not use --jars how can I tell spark to search the jars(and 
files) on HDFS?


Do you mean the driver will not need to setup a HTTP file server for 
this scenario and the worker will fetch the jars and files from HDFS?


Thanks

Dong Lei

*From:*Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* Thursday, June 11, 2015 12:50 PM
*To:* Dong Lei; dev@spark.apache.org
*Cc:* Dianfei (Keith) Han
*Subject:* Re: How to support dependency jars and files on HDFS in 
standalone cluster mode?


Since the jars are already on HDFS, you can access them directly in 
your Spark application without using --jars


Cheng
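
(For illustration, a hedged sketch of that suggestion, with placeholder
paths: SparkContext.addJar accepts an hdfs:// URI, so executors can fetch the
jar themselves. Note that in cluster mode the driver's own classpath may
still need separate handling, as discussed elsewhere in this thread.)

  // Illustration only; app name and paths are placeholders.
  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("hdfs-jars-example"))

  // Executors fetch this jar directly from HDFS; the driver does not
  // have to serve it over its HTTP file server.
  sc.addJar("hdfs://namenode:8020/libs/1.jar")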

On 6/11/15 11:04 AM, Dong Lei wrote:

Hi spark-dev:

I cannot use an HDFS location for the “--jars” or “--files” option
when doing a spark-submit in standalone cluster mode. For example:

Spark-submit  …   --jars hdfs://ip/1.jar  ….
 hdfs://ip/app.jar (standalone cluster mode)

will not download 1.jar to driver’s http file server(but the
app.jar will be downloaded to the driver’s dir).

I figured out that the reason Spark does not download the jars is that
when doing sc.addJar to the HTTP file server, the function called is
Files.copy, which does not support a remote location.

And I think if spark can download the jars and add them to http
file server, the classpath is not correctly set, because the
classpath contains remote location.

So I’m trying to make it work and come up with two options, but
neither of them seems elegant, and I want to hear your advice:

Option 1:

Modify HTTPFileServer.addFileToDir, let it recognize a “hdfs” prefix.

This is not good because I think it breaks the scope of http file
server.

Option 2:

Modify DriverRunner.downloadUserJar, let it download all the
“--jars” and “--files” with the application jar.

This sounds more reasonable than option 1 for downloading files.
But this way I need to read the “spark.jars” and “spark.files” in
downloadUserJar or DriverRunner.start and replace them with a local
path. How can I do that?

Do you have a more elegant solution, or do we have a plan to
support it in the future?

Thanks

Dong Lei





RE: How to support dependency jars and files on HDFS in standalone cluster mode?

2015-06-11 Thread Dong Lei
I think in standalone cluster mode, Spark is supposed to do:

1.   Download jars and files to the driver

2.   Set the driver’s classpath

3.   Driver sets up an HTTP file server to distribute these files

4.   Workers download from the driver and set up their classpath

Right?

But somehow, the first step fails.
Even if I can make the first step work (using option 1), it seems that the 
classpath on the driver is not correctly set.

Thanks
Dong Lei

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Thursday, June 11, 2015 2:32 PM
To: Dong Lei
Cc: Dianfei (Keith) Han; dev@spark.apache.org
Subject: Re: How to support dependency jars and files on HDFS in standalone 
cluster mode?

Oh sorry, I mistook --jars for --files. Yeah, for jars we need to add them to 
classpath, which is different from regular files.

Cheng
On 6/11/15 2:18 PM, Dong Lei wrote:
Thanks Cheng,

If I do not use --jars how can I tell spark to search the jars(and files) on 
HDFS?

Do you mean the driver will not need to setup a HTTP file server for this 
scenario and the worker will fetch the jars and files from HDFS?

Thanks
Dong Lei

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Thursday, June 11, 2015 12:50 PM
To: Dong Lei; dev@spark.apache.orgmailto:dev@spark.apache.org
Cc: Dianfei (Keith) Han
Subject: Re: How to support dependency jars and files on HDFS in standalone 
cluster mode?

Since the jars are already on HDFS, you can access them directly in your Spark 
application without using --jars

Cheng
On 6/11/15 11:04 AM, Dong Lei wrote:
Hi spark-dev:

I cannot use an HDFS location for the “--jars” or “--files” option when doing a 
spark-submit in standalone cluster mode. For example:
Spark-submit  …   --jars hdfs://ip/1.jar  ….  hdfs://ip/app.jar 
(standalone cluster mode)
will not download 1.jar to driver’s http file server(but the app.jar will be 
downloaded to the driver’s dir).

I figured out that the reason Spark does not download the jars is that when doing 
sc.addJar to the HTTP file server, the function called is Files.copy, which does 
not support a remote location.
And I think if spark can download the jars and add them to http file server, 
the classpath is not correctly set, because the classpath contains remote 
location.

So I’m trying to make it work and come up with two options, but neither of them 
seems elegant, and I want to hear your advice:

Option 1:
Modify HTTPFileServer.addFileToDir, let it recognize a “hdfs” prefix.

This is not good because I think it breaks the scope of http file server.

Option 2:
Modify DriverRunner.downloadUserJar, let it download all the “--jars” and 
“--files” with the application jar.

This sounds more reasonable than option 1 for downloading files. But this way I 
need to read the “spark.jars” and “spark.files” in downloadUserJar or 
DriverRunner.start and replace them with a local path. How can I do that?


Do you have a more elegant solution, or do we have a plan to support it in the 
future?

Thanks
Dong Lei




RE: How to support dependency jars and files on HDFS in standalone cluster mode?

2015-06-11 Thread Dong Lei
Thanks Cheng,

If I do not use --jars how can I tell spark to search the jars(and files) on 
HDFS?

Do you mean the driver will not need to setup a HTTP file server for this 
scenario and the worker will fetch the jars and files from HDFS?

Thanks
Dong Lei

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Thursday, June 11, 2015 12:50 PM
To: Dong Lei; dev@spark.apache.org
Cc: Dianfei (Keith) Han
Subject: Re: How to support dependency jars and files on HDFS in standalone 
cluster mode?

Since the jars are already on HDFS, you can access them directly in your Spark 
application without using --jars

Cheng
On 6/11/15 11:04 AM, Dong Lei wrote:
Hi spark-dev:

I cannot use an HDFS location for the --jars or --files option when doing a 
spark-submit in standalone cluster mode. For example:
Spark-submit  ...   --jars hdfs://ip/1.jar    
hdfs://ip/app.jar (standalone cluster mode)
will not download 1.jar to driver's http file server(but the app.jar will be 
downloaded to the driver's dir).

I figured out that the reason Spark does not download the jars is that when doing 
sc.addJar to the HTTP file server, the function called is Files.copy, which does 
not support a remote location.
And I think if spark can download the jars and add them to http file server, 
the classpath is not correctly set, because the classpath contains remote 
location.

So I'm trying to make it work and come up with two options, but neither of them 
seems elegant, and I want to hear your advice:

Option 1:
Modify HTTPFileServer.addFileToDir, let it recognize a hdfs prefix.

This is not good because I think it breaks the scope of http file server.

Option 2:
Modify DriverRunner.downloadUserJar, let it download all the --jars and 
--files with the application jar.

This sounds more reasonable than option 1 for downloading files. But this way I 
need to read the spark.jars and spark.files in downloadUserJar or 
DriverRunner.start and replace them with a local path. How can I do that?


Do you have a more elegant solution, or do we have a plan to support it in the 
future?

Thanks
Dong Lei



Re: Stages with non-arithmetic numbering & Timing metrics in event logs

2015-06-11 Thread Kay Ousterhout
Good idea -- I've added this to the wiki:
https://cwiki.apache.org/confluence/display/SPARK/Shuffle+Internals.  Happy
to stick it elsewhere if folks think there's a more convenient place.

On Thu, Jun 11, 2015 at 4:46 PM, Gerard Maas gerard.m...@gmail.com wrote:

 Kay,

 Excellent write-up. This should be preserved for reference somewhere
 searchable.

 -Gerard.



 On Fri, Jun 12, 2015 at 1:19 AM, Kay Ousterhout k...@eecs.berkeley.edu
 wrote:

 Here’s how the shuffle works.  This explains what happens for a single
 task; this will happen in parallel for each task running on the machine,
 and as Imran said, Spark runs up to “numCores” tasks concurrently on each
 machine.  There's also an answer to the original question about why CPU use
 is low at the very bottom.

 The key data structure used in fetching shuffle data is the “results”
 queue in ShuffleBlockFetcherIterator, which buffers data that we have in
 serialized (and maybe compressed) form, but haven’t yet deserialized /
 processed.  The results queue is filled by many threads fetching data over
 the network (the number of concurrent threads fetching data is equal to the
 number of remote executors we’re currently fetching data from) [0], and is
 consumed by a single thread that deserializes the data and computes some
 function over it (e.g., if you’re doing rdd.count(), the thread
 deserializes the data and counts the number of items).  As we fetch data
 over the network, we track bytesInFlight, which is data that has been
 requested (and possibly received) from a remote executor, but that hasn’t
 yet been deserialized / processed by the consumer thread.  So, this
 includes all of the data in the “results” queue, and possibly more data
 that’s currently outstanding over the network.  We always issue as many
 requests as we can, with the constraint that bytesInFlight remains less
 than a specified maximum [1].

 In a little more detail, here’s exactly what happens when a task begins
 reading shuffled data:

 (1) Issue requests [1.5] to fetch up to maxBytesInFlight bytes of data
 [1] over the network (this happens here
 https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L260
 ).

 These requests are all executed asynchronously using a ShuffleClient [2]
 via the shuffleClient.fetchBlocks call
 https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L149
  [3].  We pass in a callback that, once a block has been successfully
 fetched, sticks it on the “results” queue.

 (2) Begin processing the local data.  One by one, we request the local
 data from the local block manager (which memory maps the file) and then stick
 the result onto the results queue
 https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L230.
 Because we memory map the files, which is speedy, the local data typically
 all ends up on the results queue in front of the remote data.

 (3) Once the async network requests have been issued (note — issued, but
 not finished!) and we’ve “read” (memory-mapped) the local data (i.e., (1)
 and (2) have happened), ShuffleBlockFetcherIterator returns an iterator
 that gets wrapped too many times to count [4] and eventually gets unrolled
 [5].  Each time next() is called on the iterator, it blocks waiting for an
 item from the results queue.  This may return right away, or if the queue
 is empty, will block waiting on new data from the network [6].  Before
 returning from next(), we update our accounting for the bytes in flight:
 the chunk of data we return is no longer considered in-flight, because it’s
 about to be processed, so we update the current bytesInFlight, and if it
 won’t result in > maxBytesInFlight outstanding, send some more requests for
 data.

 

 Notes:

 [0] Note that these threads consume almost no CPU resources, because they
 just receive data from the OS and then execute a callback that sticks the
 data on the results queue.

 [1] We limit the data outstanding on the network to avoid using too much
 memory to hold the data we’ve fetched over the network but haven’t yet
 processed.

 [1.5] Each request may include multiple shuffle blocks, where a
 block is the data output for this reduce task by a particular map task.
 All of the reduce tasks for a shuffle read a total of # map tasks * #
 reduce tasks shuffle blocks; each reduce task reads # map tasks blocks.  We
 do some hacks
 https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L177
 to try to size these requests in a good way: we limit each request to
 about maxBytesInFlight / 5, so that we can fetch from roughly 5 machines
 concurrently without exceeding 

Re: When to expect UTF8String?

2015-06-11 Thread Michael Armbrust
Through the DataFrame API, users should never see UTF8String.

Expression (and any class in the catalyst package) is considered internal
and so uses the internal representation of various types.  Which type we
use here is not stable across releases.

Is there a reason you aren't defining a UDF instead?
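
For example, a minimal sketch of the UDF route (assuming an existing
DataFrame df with a string column "name"):

  import org.apache.spark.sql.functions.udf

  // The UDF works with plain Scala/Java types; Spark handles the conversion
  // from its internal representation (e.g. UTF8String) for you.
  val strLen = udf((s: String) => if (s == null) 0 else s.length)

  // Usage: df.withColumn("nameLen", strLen(df("name")))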

On Thu, Jun 11, 2015 at 8:08 PM, zsampson zsamp...@palantir.com wrote:

 I'm hoping for some clarity about when to expect String vs UTF8String when
 using the Java DataFrames API.

 In upgrading to Spark 1.4, I'm dealing with a lot of errors where what was
 once a String is now a UTF8String. The comments in the file and the related
 commit message indicate that maybe it should be internal to SparkSQL's
 implementation.

 However, when I add a column containing a custom subclass of Expression,
 the
 row passed to the eval method contains instances of UTF8String. Ditto for
 AggregateFunction.update. Is this expected? If so, when should I generally
 know to deal with UTF8String objects?



 --
 View this message in context:
 http://apache-spark-developers-list.1001551.n3.nabble.com/When-to-expect-UTF8String-tp12710.html
 Sent from the Apache Spark Developers List mailing list archive at
 Nabble.com.





When to expect UTF8String?

2015-06-11 Thread zsampson
I'm hoping for some clarity about when to expect String vs UTF8String when
using the Java DataFrames API.

In upgrading to Spark 1.4, I'm dealing with a lot of errors where what was
once a String is now a UTF8String. The comments in the file and the related
commit message indicate that maybe it should be internal to SparkSQL's
implementation. 

However, when I add a column containing a custom subclass of Expression, the
row passed to the eval method contains instances of UTF8String. Ditto for
AggregateFunction.update. Is this expected? If so, when should I generally
know to deal with UTF8String objects?



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/When-to-expect-UTF8String-tp12710.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.




Contributing to pyspark

2015-06-11 Thread Usman Ehtesham
Hello,

I am currently taking a course in Apache Spark via EdX (
https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x)
and at the same time I try to look at the code for pySpark too. I wanted to
ask, if ideally I would like to contribute to pyspark specifically, how can
I do that? I do not intend to contribute to core Apache Spark any time soon
(mainly because I do not know Scala) but I am very comfortable in Python.

Any tips on how to contribute specifically to pyspark without being
affected by other parts of Spark would be greatly appreciated.

P.S.: I ask this because there is a small change/improvement I would like
to propose. Also since I just started learning Spark, I would like to also
read and understand the pyspark code as I learn about Spark. :)

Hope to hear from you soon.

Usman Ehtesham Gul
https://github.com/ueg1990


Re: [DISCUSS] Minimize use of MINOR, BUILD, and HOTFIX w/ no JIRA

2015-06-11 Thread shane knapp
+1, and I know I've been guilty of this in the past.  :)

On Wed, Jun 10, 2015 at 10:20 PM, Joseph Bradley jos...@databricks.com
wrote:

 +1

 On Sat, Jun 6, 2015 at 9:01 AM, Patrick Wendell pwend...@gmail.com
 wrote:

 Hey All,

 Just a request here - it would be great if people could create JIRA's
 for any and all merged pull requests. The reason is that when patches
 get reverted due to build breaks or other issues, it is very difficult
 to keep track of what is going on if there is no JIRA. Here is a list
 of 5 patches we had to revert recently that didn't include a JIRA:

 Revert [MINOR] [BUILD] Use custom temp directory during build.
 Revert [SQL] [TEST] [MINOR] Uses a temporary log4j.properties in
 HiveThriftServer2Test to ensure expected logging behavior
 Revert [BUILD] Always run SQL tests in master build.
 Revert [MINOR] [CORE] Warn users who try to cache RDDs with
 dynamic allocation on.
 Revert [HOT FIX] [YARN] Check whether `/lib` exists before
 listing its files

 The cost overhead of creating a JIRA relative to other aspects of
 development is very small. If it's *really* a documentation change or
 something small, that's okay.

 But anything affecting the build, packaging, etc. These all need to
 have a JIRA to ensure that follow-up can be well communicated to all
 Spark developers.

 Hopefully this is something everyone can get behind, but opened a
 discussion here in case others feel differently.

 - Patrick






[ANNOUNCE] Announcing Spark 1.4

2015-06-11 Thread Patrick Wendell
Hi All,

I'm happy to announce the availability of Spark 1.4.0! Spark 1.4.0 is
the fifth release on the API-compatible 1.X line. It is Spark's
largest release ever, with contributions from 210 developers and more
than 1,000 commits!

A huge thanks go to all of the individuals and organizations involved
in development and testing of this release.

Visit the release notes [1] to read about the new features, or
download [2] the release today.

For errata in the contributions or release notes, please e-mail me
*directly* (not on-list).

Thanks to everyone who helped work on this release!

[1] http://spark.apache.org/releases/spark-release-1-4-0.html
[2] http://spark.apache.org/downloads.html
