Re: Stages with non-arithmetic numbering & Timing metrics in event logs
That is not exactly correct -- that being said, I'm not 100% on these details either, so I'd appreciate you double checking and/or another dev confirming my description. Spark actually has more threads going than the numCores you specify. numCores is really used for how many threads are actively executing tasks. There are more threads for doing the fetching (the details of which I'm not that familiar with) -- that never cuts into the number of actively executing tasks for numCores. There isn't a 1-to-1 correspondence between one shuffle block and one task -- a shuffle-read task most likely needs to fetch many shuffle blocks, with some local and some remote (in most cases). So, in your lingo above, c is numCores, but c_1 is just an independent pool of threads.

The obvious follow-up question is: if you actually have more than numCores threads going at the same time, how come CPU usage is low? You could still have your CPUs stuck waiting on I/O, from disk or network, so they aren't getting fully utilized. And the CPU can also be idle waiting for memory, if there are a lot of cache misses (I'm not sure how that will show up in CPU monitoring). If that were the case, it could even be from too many threads, as a lot of time is spent context switching ... but I'm just guessing now.

hope this helps,
Imran

On Wed, Jun 10, 2015 at 1:41 PM, Mike Hynes 91m...@gmail.com wrote:

Hi Imran,

Thank you again for your email. I just want to ask one further question to clarify the implementation of the shuffle block fetches. When you say that "rather than sitting idle, [the executor] will immediately start reading the local block", I would guess that, in implementation, the executor is going to launch concurrent threads to read both local and remote blocks, which it seems to do in the initialize() method of core/.../storage/ShuffleBlockFetcherIterator.scala. Is that the case, or would the Executor run all local fetch threads first?

The reason I ask is that if the slave machine on which the Executor is running has some number of cores, c, then I would have thought that some of the threads launched would occupy some number, c_1, of the cores and conduct the local reads (where c_1 <= c). The other threads would occupy the other (c - c_1) cores' cycles until *all* necessary blocks have been read, depending on c and the number of blocks to fetch, so that none of the cores are idle if there are many blocks to fetch. (I monitor the CPU utilization of our nodes throughout a job, and generally find them under-utilized statistically speaking; that is, their usage over the whole job is lower than expected, with short bursts of high usage, so I ask this question in a specific way for this reason, since I can see trends in the probability density functions of CPU utilization as the #partitions of our RDDs are increased.)

ShuffleBlockFetcherIterator.scala:

private[this] def initialize(): Unit = {
  ...
  // Send out initial requests for blocks, up to our maxBytesInFlight
  while (fetchRequests.nonEmpty &&
      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
    sendRequest(fetchRequests.dequeue())
  }

  val numFetches = remoteRequests.size - fetchRequests.size
  logInfo("Started " + numFetches + " remote fetches in " + Utils.getUsedTimeMs(startTime))

  // Get Local Blocks
  fetchLocalBlocks()
  logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
}

private[this] def fetchLocalBlocks() {
  val iter = localBlocks.iterator
  while (iter.hasNext) {
    val blockId = iter.next()
    try {
      val buf = blockManager.getBlockData(blockId)
      shuffleMetrics.incLocalBlocksFetched(1)
      shuffleMetrics.incLocalBytesRead(buf.size)
      buf.retain()
      results.put(new SuccessFetchResult(blockId, 0, buf))
    } catch {
      ...
    }
  }
}

Obviously, I will have to sit down with core/.../network/nio/* and core/.../shuffle/* and do my own homework on this, but from what I can tell, the BlockDataManager relies on either NioBlockTransferService.scala or NettyBlockTransferService.scala (which are set in SparkEnv.scala), both of which do the grunt work of actually buffering and transferring the blocks' bytes.

Finally, the tasks in the new stage for which the shuffle outputs have been fetched will not commence until all of the block fetching threads (both local and remote) have terminated. Does the above paint an accurate picture? I would really appreciate clarification on the concurrency, since I would like to determine why our jobs show under-utilization and poor weak scaling efficiency.

I will cc this thread over to the dev list. I did not cc them in case my previous question was trivial---I didn't want to spam the list unnecessarily, since I do not see these kinds of questions posed there frequently.

Thanks a bunch,
Re: [ml] Why all model classes are final?
I was able to work around this problem in several cases using the class 'enhancement' or 'extension' pattern to add some functionality to the decision tree model data structures.

----- Original Message -----

Hi, previously all the models in the ml package were private to the package, so if I needed to customize some models I inherited them in the org.apache.spark.ml package in my project. But now new models (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala#L46) are final classes. So if I need to customize 1 line or so, I need to redefine the whole class. Any reasons to do so? As a developer, I understand all the risks of using Developer/Alpha API. That's why I'm using Spark: because it provides building blocks that I can easily customize and combine for my needs.

Thanks,
Peter Rudenko
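For the archives, here is a minimal sketch of the 'extension' pattern mentioned above, applied to an MLlib tree model. The object and method names (TreeModelExtensions, describe) are made up for illustration; this is the general enrich-my-library shape, not anyone's actual code:

import org.apache.spark.mllib.tree.model.DecisionTreeModel

object TreeModelExtensions {
  // Adds a helper method to the model class without subclassing it,
  // which works even when the class is final.
  implicit class RichTreeModel(val model: DecisionTreeModel) extends AnyVal {
    def describe: String =
      s"DecisionTreeModel with ${model.numNodes} nodes, depth ${model.depth}"
  }
}

Usage: import TreeModelExtensions._ in scope, then call model.describe on any DecisionTreeModel. The obvious limitation, and presumably why the original poster still wants non-final classes, is that this cannot override existing behavior, only add new methods.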
Re: Stages with non-arithmetic numbering & Timing metrics in event logs
Here’s how the shuffle works. This explains what happens for a single task; this will happen in parallel for each task running on the machine, and as Imran said, Spark runs up to “numCores” tasks concurrently on each machine. There's also an answer to the original question about why CPU use is low at the very bottom.

The key data structure used in fetching shuffle data is the “results” queue in ShuffleBlockFetcherIterator, which buffers data that we have in serialized (and maybe compressed) form, but haven’t yet deserialized / processed. The results queue is filled by many threads fetching data over the network (the number of concurrent threads fetching data is equal to the number of remote executors we’re currently fetching data from) [0], and is consumed by a single thread that deserializes the data and computes some function over it (e.g., if you’re doing rdd.count(), the thread deserializes the data and counts the number of items).

As we fetch data over the network, we track bytesInFlight, which is data that has been requested (and possibly received) from a remote executor, but that hasn’t yet been deserialized / processed by the consumer thread. So, this includes all of the data in the “results” queue, and possibly more data that’s currently outstanding over the network. We always issue as many requests as we can, with the constraint that bytesInFlight remains less than a specified maximum [1].

In a little more detail, here’s exactly what happens when a task begins reading shuffled data:

(1) Issue requests [1.5] to fetch up to maxBytesInFlight bytes of data [1] over the network (this happens here: https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L260). These requests are all executed asynchronously using a ShuffleClient [2] via the shuffleClient.fetchBlocks call (https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L149) [3]. We pass in a callback that, once a block has been successfully fetched, sticks it on the “results” queue.

(2) Begin processing the local data. One by one, we request the local data from the local block manager (which memory maps the file) and then stick the result onto the results queue (https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L230). Because we memory map the files, which is speedy, the local data typically all ends up on the results queue in front of the remote data.

(3) Once the async network requests have been issued (note — issued, but not finished!) and we’ve “read” (memory-mapped) the local data (i.e., (1) and (2) have happened), ShuffleBlockFetcherIterator returns an iterator that gets wrapped too many times to count [4] and eventually gets unrolled [5]. Each time next() is called on the iterator, it blocks waiting for an item from the results queue. This may return right away, or if the queue is empty, will block waiting on new data from the network [6]. Before returning from next(), we update our accounting for the bytes in flight: the chunk of data we return is no longer considered in-flight, because it’s about to be processed, so we update the current bytesInFlight, and if it won’t result in maxBytesInFlight outstanding, send some more requests for data.
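To make the shape of this concrete, here is a toy sketch (my own illustration, not Spark's actual code) of the producer/consumer pattern described above: network fetch callbacks fill a blocking "results" queue, and the single task thread blocks in next() until an item arrives:

import java.util.concurrent.LinkedBlockingQueue

object ResultsQueueSketch {
  sealed trait FetchResult
  case class SuccessFetchResult(blockId: String, data: Array[Byte]) extends FetchResult

  // Buffers fetched-but-not-yet-deserialized blocks, like the "results"
  // queue in ShuffleBlockFetcherIterator.
  private val results = new LinkedBlockingQueue[FetchResult]()

  // Invoked from network threads when a block arrives (the fetch callback);
  // it does almost no work, which is why the fetch threads use little CPU.
  def onBlockFetchSuccess(blockId: String, data: Array[Byte]): Unit =
    results.put(SuccessFetchResult(blockId, data))

  // Invoked by the single consumer thread; take() blocks while the queue
  // is empty, mirroring the blocking behavior of next().
  def next(): FetchResult = results.take()
}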
Notes:

[0] Note that these threads consume almost no CPU resources, because they just receive data from the OS and then execute a callback that sticks the data on the results queue.

[1] We limit the data outstanding on the network to avoid using too much memory to hold the data we’ve fetched over the network but haven’t yet processed.

[1.5] Each request may include multiple shuffle blocks, where a block is the data output for this reduce task by a particular map task. All of the reduce tasks for a shuffle read a total of (# map tasks) * (# reduce tasks) shuffle blocks; each reduce task reads # map tasks blocks. We do some hacks (https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L177) to try to size these requests in a good way: we limit each request to about maxBytesInFlight / 5, so that we can fetch from roughly 5 machines concurrently without exceeding maxBytesInFlight. 5 is completely a magic number here that was probably guessed by someone long long ago, and it seems to work ok.

[2] The default configuration uses NettyBlockTransferService as the ShuffleClient implementation (note that this extends BlockTransferService, which extends ShuffleClient).

[3] If you’re curious how the shuffle client fetches data, the default Spark configuration results in exactly one TCP connection from an executor to each other executor. If executor A is getting shuffle data
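To make [1.5] concrete, here is a rough sketch of the request-sizing idea: chunk a list of (blockId, size) pairs into requests of at most roughly maxBytesInFlight / 5 bytes each. This is my own illustration of the heuristic; the real grouping in ShuffleBlockFetcherIterator differs in detail (it also separates blocks by host):

import scala.collection.mutable.ArrayBuffer

def groupIntoRequests(
    blocks: Seq[(String, Long)],
    maxBytesInFlight: Long): Seq[Seq[(String, Long)]] = {
  // Target ~1/5 of the in-flight budget per request so roughly five
  // machines can be fetched from concurrently.
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  val requests = ArrayBuffer[Seq[(String, Long)]]()
  var current = ArrayBuffer[(String, Long)]()
  var currentSize = 0L
  for ((blockId, size) <- blocks) {
    current += ((blockId, size))
    currentSize += size
    if (currentSize >= targetRequestSize) {
      // Close out this request and start a new one.
      requests += current.toSeq
      current = ArrayBuffer[(String, Long)]()
      currentSize = 0L
    }
  }
  if (current.nonEmpty) requests += current.toSeq
  requests.toSeq
}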
Re: Stages with non-arithmetic numbering & Timing metrics in event logs
Kay,

Excellent write-up. This should be preserved for reference somewhere searchable.

-Gerard.
Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?
Hi Jerry,

Take a look at this example: https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2

The offsets are needed because as RDDs get generated within Spark the offsets move further along. With direct Kafka mode the current offsets are no longer persisted in ZooKeeper but rather within Spark itself. If you want to be able to use ZooKeeper-based monitoring tools to keep track of progress, then this is needed.

In my specific case we need to persist Kafka offsets externally so that we can continue from where we left off after a code deployment. In other words, we need exactly-once processing guarantees across code deployments. Spark does not support any state persistence across deployments, so this is something we need to handle on our own.

Hope that helps. Let me know if not. Thanks!
Amit
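For the archives, the Scala pattern being referenced looks roughly like this (adapted from the linked docs; directKafkaStream is assumed to have been created with KafkaUtils.createDirectStream):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

var offsetRanges = Array.empty[OffsetRange]

directKafkaStream.transform { rdd =>
  // Cast to HasOffsetRanges to read the Kafka offsets backing this RDD.
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    // These could be persisted externally (e.g., to ZooKeeper) so a job
    // can resume from the same position after a redeploy.
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}

This is the equivalent I was looking for in the Python API.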
Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?
OK, I get it. I think currently the Python-based Kafka direct API does not provide an equivalent to the Scala one; maybe we should figure out how to add this to the Python API as well.
Re: Contributing to pyspark
Hi,

Thanks for your interest in PySpark. The first thing is to have a look at the how-to-contribute guide (https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark) and filter the JIRAs using the label PySpark. If you have your own improvement in mind, you can file a JIRA, discuss it, and then send a pull request.

HTH,
Regards.

--
Godspeed,
Manoj Kumar,
http://manojbits.wordpress.com
http://github.com/MechCoder
Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?
Hi,

What do you mean by getting the offsets from the RDD? From my understanding, the offsetRange is a parameter you offered to KafkaRDD; why do you still want to get back the one you previously set?

Thanks
Jerry

2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com:

Congratulations on the release of 1.4!

I have been trying out the direct Kafka support in Python but haven't been able to figure out how to get the offsets from the RDD. Looks like the documentation is yet to be updated to include Python examples (https://spark.apache.org/docs/latest/streaming-kafka-integration.html). I am specifically looking for the equivalent of https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2. I tried digging through the Python code but could not find anything related. Any pointers would be greatly appreciated.

Thanks!
Amit
Re: How to support dependency jars and files on HDFS in standalone cluster mode?
Oh sorry, I mistook --jars for --files. Yeah, for jars we need to add them to the classpath, which is different from regular files.

Cheng
RE: How to support dependency jars and files on HDFS in standalone cluster mode?
I think in standalone cluster mode, Spark is supposed to:

1. Download the jars and files to the driver
2. Set the driver's classpath
3. Have the driver set up an HTTP file server to distribute these files
4. Have the workers download from the driver and set up their classpaths

Right? But somehow, the first step fails. Even if I can make the first step work (using option 1), it seems that the classpath on the driver is not correctly set.

Thanks
Dong Lei
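If it helps the discussion, here is a minimal sketch of what the download in step 1 could look like, using the Hadoop FileSystem API to pull an hdfs:// dependency into the driver's local work dir. downloadToLocal is a made-up name for illustration; this is not Spark's actual DriverRunner code:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def downloadToLocal(hdfsUri: String, localDir: String): String = {
  val fs = FileSystem.get(new URI(hdfsUri), new Configuration())
  val src = new Path(hdfsUri)
  val dst = new Path(localDir, src.getName)
  // copyToLocalFile(delSrc = false, ...) keeps the source file on HDFS.
  fs.copyToLocalFile(false, src, dst)
  dst.toString
}

The local path returned here could then replace the hdfs:// entry in spark.jars / spark.files before the classpath is built, which is the part I don't know how to do cleanly.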
RE: How to support dependency jars and files on HDFS in standalone cluster mode?
Thanks Cheng,

If I do not use --jars, how can I tell Spark to search for the jars (and files) on HDFS? Do you mean the driver will not need to set up an HTTP file server for this scenario and the workers will fetch the jars and files from HDFS?

Thanks
Dong Lei

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Thursday, June 11, 2015 12:50 PM
To: Dong Lei; dev@spark.apache.org
Cc: Dianfei (Keith) Han
Subject: Re: How to support dependency jars and files on HDFS in standalone cluster mode?

Since the jars are already on HDFS, you can access them directly in your Spark application without using --jars.

Cheng

On 6/11/15 11:04 AM, Dong Lei wrote:

Hi spark-dev:

I cannot use an HDFS location for the "--jars" or "--files" option when doing a spark-submit in standalone cluster mode. For example:

spark-submit ... --jars hdfs://ip/1.jar ... hdfs://ip/app.jar (standalone cluster mode)

will not download 1.jar to the driver's HTTP file server (but app.jar will be downloaded to the driver's dir).

I figured out that the reason Spark is not downloading the jars is that when doing sc.addJar to the HTTP file server, the function called is Files.copy, which does not support a remote location. And I think even if Spark could download the jars and add them to the HTTP file server, the classpath would still not be correctly set, because the classpath contains the remote location.

So I'm trying to make it work, and have come up with two options, but neither of them seems elegant, and I want to hear your advice:

Option 1: Modify HTTPFileServer.addFileToDir to recognize an "hdfs" prefix. This is not good because I think it breaks the scope of the HTTP file server.

Option 2: Modify DriverRunner.downloadUserJar to download all the "--jars" and "--files" along with the application jar. This sounds more reasonable than option 1 for downloading files. But this way I need to read "spark.jars" and "spark.files" in downloadUserJar or DriverRunner.start and replace them with a local path. How can I do that?

Do you have a more elegant solution, or do we have a plan to support this in the future?

Thanks
Dong Lei
Re: Stages with non-arithmetic numbering & Timing metrics in event logs
Good idea -- I've added this to the wiki: https://cwiki.apache.org/confluence/display/SPARK/Shuffle+Internals. Happy to stick it elsewhere if folks think there's a more convenient place.
Re: When to expect UTF8String?
Through the DataFrame API, users should never see UTF8String. Expression (and any class in the catalyst package) is considered internal and so uses the internal representation of various types. Which type we use here is not stable across releases. Is there a reason you aren't defining a UDF instead?
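A minimal sketch of the UDF route being suggested here, where user code only ever sees java.lang.String and Spark handles the conversion from its internal representation. The DataFrame, column, and function names are made up for illustration:

import org.apache.spark.sql.functions.udf

// The UDF body receives a plain java.lang.String, never a UTF8String.
val shout = udf((s: String) => if (s == null) null else s.toUpperCase)

// Usage, assuming a DataFrame `df` with a string column "name":
val result = df.withColumn("name_upper", shout(df("name")))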
When to expect UTF8String?
I'm hoping for some clarity about when to expect String vs UTF8String when using the Java DataFrames API. In upgrading to Spark 1.4, I'm dealing with a lot of errors where what was once a String is now a UTF8String. The comments in the file and the related commit message indicate that maybe it should be internal to SparkSQL's implementation. However, when I add a column containing a custom subclass of Expression, the row passed to the eval method contains instances of UTF8String. Ditto for AggregateFunction.update. Is this expected? If so, when should I generally know to deal with UTF8String objects?
Contributing to pyspark
Hello,

I am currently taking a course in Apache Spark via EdX (https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x) and at the same time I am trying to look at the code for PySpark too. I wanted to ask: if ideally I would like to contribute to PySpark specifically, how can I do that? I do not intend to contribute to core Apache Spark any time soon (mainly because I do not know Scala), but I am very comfortable in Python. Any tips on how to contribute specifically to PySpark without being affected by other parts of Spark would be greatly appreciated.

P.S.: I ask this because there is a small change/improvement I would like to propose. Also, since I just started learning Spark, I would like to read and understand the PySpark code as I learn about Spark. :)

Hope to hear from you soon.

Usman Ehtesham Gul
https://github.com/ueg1990
Re: [DISCUSS] Minimize use of MINOR, BUILD, and HOTFIX w/ no JIRA
+1, and I know I've been guilty of this in the past. :)

On Wed, Jun 10, 2015 at 10:20 PM, Joseph Bradley jos...@databricks.com wrote:

+1

On Sat, Jun 6, 2015 at 9:01 AM, Patrick Wendell pwend...@gmail.com wrote:

Hey All,

Just a request here - it would be great if people could create JIRAs for any and all merged pull requests. The reason is that when patches get reverted due to build breaks or other issues, it is very difficult to keep track of what is going on if there is no JIRA. Here is a list of 5 patches we had to revert recently that didn't include a JIRA:

Revert [MINOR] [BUILD] Use custom temp directory during build.
Revert [SQL] [TEST] [MINOR] Uses a temporary log4j.properties in HiveThriftServer2Test to ensure expected logging behavior
Revert [BUILD] Always run SQL tests in master build.
Revert [MINOR] [CORE] Warn users who try to cache RDDs with dynamic allocation on.
Revert [HOT FIX] [YARN] Check whether `/lib` exists before listing its files

The cost overhead of creating a JIRA relative to other aspects of development is very small. If it's *really* a documentation change or something small, that's okay. But anything affecting the build, packaging, etc. needs to have a JIRA to ensure that follow-up can be well communicated to all Spark developers.

Hopefully this is something everyone can get behind, but I opened a discussion here in case others feel differently.

- Patrick
[ANNOUNCE] Announcing Spark 1.4
Hi All,

I'm happy to announce the availability of Spark 1.4.0! Spark 1.4.0 is the fifth release on the API-compatible 1.X line. It is Spark's largest release ever, with contributions from 210 developers and more than 1,000 commits!

A huge thanks goes to all of the individuals and organizations involved in development and testing of this release. Visit the release notes [1] to read about the new features, or download [2] the release today.

For errata in the contributions or release notes, please e-mail me *directly* (not on-list).

Thanks to everyone who helped work on this release!

[1] http://spark.apache.org/releases/spark-release-1-4-0.html
[2] http://spark.apache.org/downloads.html