Re: [VOTE] Release Apache Spark 1.4.1
There are 44 issues still targeted for 1.4.1. None are Blockers; 12 are Critical. ~80% were opened and/or set by committers. Compare with 90 issues resolved for 1.4.1. I'm concerned that committers are targeting far more for a release than can realistically go in, even in the short term. On its face, it suggests that an RC is premature. Why is 1.4.1 being put forth for release now? It seems like people are saying they want a fair bit more time to work on 1.4.1. I suspect that in fact people would rather untarget / slip (again) these JIRAs, but it calls into question again how the targeting is consistently off by this much. What unresolved JIRAs targeted for 1.4.1 are *really* still open for 1.4.1? Like, what would go badly if all 32 non-Critical JIRAs were untargeted now? Is the reality that there are a handful of items to get in before the final release, and those are hopefully the ~12 Critical ones? How about some review of that before we ask people to seriously test these bits? On Wed, Jun 24, 2015 at 8:37 AM, Patrick Wendell pwend...@gmail.com wrote: Please vote on releasing the following candidate as Apache Spark version 1.4.1! This release fixes a handful of known issues in Spark 1.4.0, listed here: http://s.apache.org/spark-1.4.1 The tag to be voted on is v1.4.1-rc1 (commit 60e08e5): https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=60e08e50751fe3929156de956d62faea79f5b801 The release files, including signatures, digests, etc. can be found at: http://people.apache.org/~pwendell/spark-releases/spark-1.4.1-rc1-bin/ Release artifacts are signed with the following key: https://people.apache.org/keys/committer/pwendell.asc The staging repository for this release can be found at: [published as version: 1.4.1] https://repository.apache.org/content/repositories/orgapachespark-1118/ [published as version: 1.4.1-rc1] https://repository.apache.org/content/repositories/orgapachespark-1119/ The documentation corresponding to this release can be found at: http://people.apache.org/~pwendell/spark-releases/spark-1.4.1-rc1-docs/ Please vote on releasing this package as Apache Spark 1.4.1! The vote is open until Saturday, June 27, at 06:32 UTC and passes if a majority of at least 3 +1 PMC votes are cast. [ ] +1 Release this package as Apache Spark 1.4.1 [ ] -1 Do not release this package because ... To learn more about Apache Spark, please see http://spark.apache.org/
Re: Python UDF performance at large scale
Fair points; I also like simpler solutions. The overhead of a Python task could be a few milliseconds, which means we should also evaluate them in batches (one Python task per batch). Decreasing the batch size for UDFs sounds reasonable to me, together with other tricks to reduce the data in the socket/pipe buffer. BTW, what do your UDFs look like? How about using Jython to run simple Python UDFs (those without external libraries)? On Tue, Jun 23, 2015 at 8:21 PM, Justin Uang justin.u...@gmail.com wrote: // + punya Thanks for your quick response! I'm not sure that using an unbounded buffer is a good solution to the locking problem. For example, in the situation where I had 500 columns, I am in fact storing 499 extra columns on the java side, which might make me OOM if I have to store many rows. In addition, if I am using an AutoBatchedSerializer, the java side might have to write 1 << 16 == 65536 rows before python starts outputting elements, in which case the Java side has to buffer 65536 complete rows. In general it seems fragile to rely on blocking behavior in the Python coprocess. By contrast, it's very easy to verify the correctness and performance characteristics of the synchronous blocking solution. On Tue, Jun 23, 2015 at 7:21 PM Davies Liu dav...@databricks.com wrote: Thanks for looking into it. I like the idea of having a ForkingIterator. If we have an unlimited buffer in it, then we will not have the deadlock problem, I think. The writing thread will be blocked by the Python process, so there will not be many rows buffered (though that could still be a reason to OOM). At least, this approach is better than the current one. Could you create a JIRA and send out the PR? On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang justin.u...@gmail.com wrote: BLUF: BatchPythonEvaluation's implementation is unusable at large scale, but I have a proof-of-concept implementation that avoids caching the entire dataset. Hi, We have been running into performance problems using Python UDFs with DataFrames at large scale. From the implementation of BatchPythonEvaluation, it looks like the goal was to reuse the PythonRDD code. It caches the entire child RDD so that it can do two passes over the data: one to give to the PythonRDD, then one to join the python lambda results with the original row (which may have java objects that should be passed through). In addition, it caches all the columns, even the ones that don't need to be processed by the Python UDF. In the cases I was working with, I had a 500 column table, and I wanted to use a python UDF for one column, and it ended up caching all 500 columns. I have a working solution over here that does it in one pass over the data, avoiding caching (https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b). With this patch, I go from a job that takes 20 minutes then OOMs, to a job that finishes completely in 3 minutes. It is indeed quite hacky and prone to deadlocks since there is buffering in many locations: - NEW: the ForkingIterator LinkedBlockingDeque - batching the rows before pickling them - os buffers on both sides - pyspark.serializers.BatchedSerializer We can avoid deadlock by being very disciplined. For example, we can have the ForkingIterator always check whether the LinkedBlockingDeque is full, and if so: Java - flush the java pickling buffer - send a flush command to the python process - os.flush the java side Python - flush BatchedSerializer - os.flush() I haven't added this yet. This is getting very complex, however.
Another model would just be to change the protocol between the java side and the worker to be a synchronous request/response. This has the disadvantage that the CPU isn't doing anything when the batch is being sent across, but it has the huge advantage of simplicity. In addition, I imagine that the actual IO between the processes isn't that slow, but rather the serialization of java objects into pickled bytes, and the deserialization/serialization + python loops on the python side. Another advantage is that we won't be taking more than 100% CPU since only one thread is doing CPU work at a time between the executor and the python interpreter. Any thoughts would be much appreciated =) Other improvements: - extract some of the worker code out of PythonRDD so that we can do a mapPartitions directly in BatchedPythonEvaluation without resorting to the hackery in ForkedRDD.compute(), which uses a cache to ensure that the other RDD can get a handle to the same iterator - read elements and use a size estimator to create the BlockingQueue, to make sure that we don't store too many things in memory when batching - patch Unpickler to not use StopException for control flow, which is slowing down unpickling
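For concreteness, here is a rough sketch of what that synchronous request/response loop might look like on the JVM side. This is illustrative only, not Spark's actual PythonRDD API; the length-prefixed framing and all names (evalBatches, the stream parameters) are assumptions.

import java.io.{DataInputStream, DataOutputStream}

// Sketch: send one batch of pre-pickled rows, flush, then block for exactly
// one result per row. Only one side does CPU work at a time, which is the
// simplicity/throughput trade-off discussed above.
def evalBatches(rows: Iterator[Array[Byte]],
                out: DataOutputStream,   // socket to the Python worker
                in: DataInputStream,     // socket from the Python worker
                batchSize: Int): Iterator[Array[Byte]] = {
  rows.grouped(batchSize).flatMap { batch =>
    out.writeInt(batch.length)
    batch.foreach { b => out.writeInt(b.length); out.write(b) }
    out.flush()  // the worker sees the whole batch before we start reading
    batch.map { _ =>
      val len = in.readInt()
      val res = new Array[Byte](len)
      in.readFully(res)
      res
    }
  }
}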
Re: [SparkSQL 1.4]Could not use concat with UDF in where clause
Hi Michael Armbrust, I have filed an issue on JIRA for this: https://issues.apache.org/jira/browse/SPARK-8588
Re: Python UDF performance at large scale
Hi Davies, In general, do we expect people to use CPython only for heavyweight UDFs that invoke an external library? Are there any examples of using Jython, especially performance comparisons to Java/Scala and CPython? When using Jython, do you expect the driver to send code to the executor as a string, or is there a good way to serialize Jython lambdas? (For context, I was unable to serialize Nashorn lambdas when I tried to use them in Spark.) Punya On Wed, Jun 24, 2015 at 2:26 AM Davies Liu dav...@databricks.com wrote: Fair points; I also like simpler solutions. The overhead of a Python task could be a few milliseconds, which means we should also evaluate them in batches (one Python task per batch). Decreasing the batch size for UDFs sounds reasonable to me, together with other tricks to reduce the data in the socket/pipe buffer. BTW, what do your UDFs look like? How about using Jython to run simple Python UDFs (those without external libraries)? On Tue, Jun 23, 2015 at 8:21 PM, Justin Uang justin.u...@gmail.com wrote: // + punya Thanks for your quick response! I'm not sure that using an unbounded buffer is a good solution to the locking problem. For example, in the situation where I had 500 columns, I am in fact storing 499 extra columns on the java side, which might make me OOM if I have to store many rows. In addition, if I am using an AutoBatchedSerializer, the java side might have to write 1 << 16 == 65536 rows before python starts outputting elements, in which case the Java side has to buffer 65536 complete rows. In general it seems fragile to rely on blocking behavior in the Python coprocess. By contrast, it's very easy to verify the correctness and performance characteristics of the synchronous blocking solution. On Tue, Jun 23, 2015 at 7:21 PM Davies Liu dav...@databricks.com wrote: Thanks for looking into it. I like the idea of having a ForkingIterator. If we have an unlimited buffer in it, then we will not have the deadlock problem, I think. The writing thread will be blocked by the Python process, so there will not be many rows buffered (though that could still be a reason to OOM). At least, this approach is better than the current one. Could you create a JIRA and send out the PR? On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang justin.u...@gmail.com wrote: BLUF: BatchPythonEvaluation's implementation is unusable at large scale, but I have a proof-of-concept implementation that avoids caching the entire dataset. Hi, We have been running into performance problems using Python UDFs with DataFrames at large scale. From the implementation of BatchPythonEvaluation, it looks like the goal was to reuse the PythonRDD code. It caches the entire child RDD so that it can do two passes over the data: one to give to the PythonRDD, then one to join the python lambda results with the original row (which may have java objects that should be passed through). In addition, it caches all the columns, even the ones that don't need to be processed by the Python UDF. In the cases I was working with, I had a 500 column table, and I wanted to use a python UDF for one column, and it ended up caching all 500 columns. I have a working solution over here that does it in one pass over the data, avoiding caching (https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b). With this patch, I go from a job that takes 20 minutes then OOMs, to a job that finishes completely in 3 minutes.
It is indeed quite hacky and prone to deadlocks since there is buffering in many locations: - NEW: the ForkingIterator LinkedBlockingDeque - batching the rows before pickling them - os buffers on both sides - pyspark.serializers.BatchedSerializer We can avoid deadlock by being very disciplined. For example, we can have the ForkingIterator always check whether the LinkedBlockingDeque is full, and if so: Java - flush the java pickling buffer - send a flush command to the python process - os.flush the java side Python - flush BatchedSerializer - os.flush() I haven't added this yet. This is getting very complex, however. Another model would just be to change the protocol between the java side and the worker to be a synchronous request/response. This has the disadvantage that the CPU isn't doing anything when the batch is being sent across, but it has the huge advantage of simplicity. In addition, I imagine that the actual IO between the processes isn't that slow, but rather the serialization of java objects into pickled bytes, and the deserialization/serialization + python loops on the python side. Another advantage is that we won't be taking more than 100% CPU since only one thread is doing CPU work at a time between the executor and the python interpreter.
Loss of data due to congestion
How does Spark guarantee that no RDD will fail / be lost during its life cycle? Is there something like ack in Storm, or does it do this by default? -- Thanks & Regards, Anshu Shukla
Re: Python UDF performance at large scale
Correct, I was running with a batch size of about 100 when I did the tests, because I was worried about deadlocks. Do you have any concerns regarding the batched synchronous version of communication between the Java and Python processes, and if not, should I file a ticket and start writing it? On Wed, Jun 24, 2015 at 7:27 PM Davies Liu dav...@databricks.com wrote: From your comment, the 2x improvement only happens when you have a batch size of 1, right? On Wed, Jun 24, 2015 at 12:11 PM, Justin Uang justin.u...@gmail.com wrote: FYI, just submitted a PR to Pyrolite to remove their StopException. https://github.com/irmen/Pyrolite/pull/30 With my benchmark, removing it basically made it about 2x faster. On Wed, Jun 24, 2015 at 8:33 AM Punyashloka Biswal punya.bis...@gmail.com wrote: Hi Davies, In general, do we expect people to use CPython only for heavyweight UDFs that invoke an external library? Are there any examples of using Jython, especially performance comparisons to Java/Scala and CPython? When using Jython, do you expect the driver to send code to the executor as a string, or is there a good way to serialize Jython lambdas? (For context, I was unable to serialize Nashorn lambdas when I tried to use them in Spark.) Punya On Wed, Jun 24, 2015 at 2:26 AM Davies Liu dav...@databricks.com wrote: Fair points; I also like simpler solutions. The overhead of a Python task could be a few milliseconds, which means we should also evaluate them in batches (one Python task per batch). Decreasing the batch size for UDFs sounds reasonable to me, together with other tricks to reduce the data in the socket/pipe buffer. BTW, what do your UDFs look like? How about using Jython to run simple Python UDFs (those without external libraries)? On Tue, Jun 23, 2015 at 8:21 PM, Justin Uang justin.u...@gmail.com wrote: // + punya Thanks for your quick response! I'm not sure that using an unbounded buffer is a good solution to the locking problem. For example, in the situation where I had 500 columns, I am in fact storing 499 extra columns on the java side, which might make me OOM if I have to store many rows. In addition, if I am using an AutoBatchedSerializer, the java side might have to write 1 << 16 == 65536 rows before python starts outputting elements, in which case the Java side has to buffer 65536 complete rows. In general it seems fragile to rely on blocking behavior in the Python coprocess. By contrast, it's very easy to verify the correctness and performance characteristics of the synchronous blocking solution. On Tue, Jun 23, 2015 at 7:21 PM Davies Liu dav...@databricks.com wrote: Thanks for looking into it. I like the idea of having a ForkingIterator. If we have an unlimited buffer in it, then we will not have the deadlock problem, I think. The writing thread will be blocked by the Python process, so there will not be many rows buffered (though that could still be a reason to OOM). At least, this approach is better than the current one. Could you create a JIRA and send out the PR? On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang justin.u...@gmail.com wrote: BLUF: BatchPythonEvaluation's implementation is unusable at large scale, but I have a proof-of-concept implementation that avoids caching the entire dataset. Hi, We have been running into performance problems using Python UDFs with DataFrames at large scale. From the implementation of BatchPythonEvaluation, it looks like the goal was to reuse the PythonRDD code. It caches the entire child RDD so that it can do two passes over the data.
One to give to the PythonRDD, then one to join the python lambda results with the original row (which may have java objects that should be passed through). In addition, it caches all the columns, even the ones that don't need to be processed by the Python UDF. In the cases I was working with, I had a 500 column table, and I wanted to use a python UDF for one column, and it ended up caching all 500 columns. I have a working solution over here that does it in one pass over the data, avoiding caching (https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b). With this patch, I go from a job that takes 20 minutes then OOMs, to a job that finishes completely in 3 minutes. It is indeed quite hacky and prone to deadlocks since there is buffering in many locations: - NEW: the ForkingIterator LinkedBlockingDeque - batching the rows before pickling them - os buffers on both sides - pyspark.serializers.BatchedSerializer We can avoid deadlock by being very disciplined. For example, we can have the ForkingIterator check whether the LinkedBlockingDeque is full and flush both sides if so.
Re: Python UDF performance at large scale
From your comment, the 2x improvement only happens when you have a batch size of 1, right? On Wed, Jun 24, 2015 at 12:11 PM, Justin Uang justin.u...@gmail.com wrote: FYI, just submitted a PR to Pyrolite to remove their StopException. https://github.com/irmen/Pyrolite/pull/30 With my benchmark, removing it basically made it about 2x faster. On Wed, Jun 24, 2015 at 8:33 AM Punyashloka Biswal punya.bis...@gmail.com wrote: Hi Davies, In general, do we expect people to use CPython only for heavyweight UDFs that invoke an external library? Are there any examples of using Jython, especially performance comparisons to Java/Scala and CPython? When using Jython, do you expect the driver to send code to the executor as a string, or is there a good way to serialize Jython lambdas? (For context, I was unable to serialize Nashorn lambdas when I tried to use them in Spark.) Punya On Wed, Jun 24, 2015 at 2:26 AM Davies Liu dav...@databricks.com wrote: Fair points; I also like simpler solutions. The overhead of a Python task could be a few milliseconds, which means we should also evaluate them in batches (one Python task per batch). Decreasing the batch size for UDFs sounds reasonable to me, together with other tricks to reduce the data in the socket/pipe buffer. BTW, what do your UDFs look like? How about using Jython to run simple Python UDFs (those without external libraries)? On Tue, Jun 23, 2015 at 8:21 PM, Justin Uang justin.u...@gmail.com wrote: // + punya Thanks for your quick response! I'm not sure that using an unbounded buffer is a good solution to the locking problem. For example, in the situation where I had 500 columns, I am in fact storing 499 extra columns on the java side, which might make me OOM if I have to store many rows. In addition, if I am using an AutoBatchedSerializer, the java side might have to write 1 << 16 == 65536 rows before python starts outputting elements, in which case the Java side has to buffer 65536 complete rows. In general it seems fragile to rely on blocking behavior in the Python coprocess. By contrast, it's very easy to verify the correctness and performance characteristics of the synchronous blocking solution. On Tue, Jun 23, 2015 at 7:21 PM Davies Liu dav...@databricks.com wrote: Thanks for looking into it. I like the idea of having a ForkingIterator. If we have an unlimited buffer in it, then we will not have the deadlock problem, I think. The writing thread will be blocked by the Python process, so there will not be many rows buffered (though that could still be a reason to OOM). At least, this approach is better than the current one. Could you create a JIRA and send out the PR? On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang justin.u...@gmail.com wrote: BLUF: BatchPythonEvaluation's implementation is unusable at large scale, but I have a proof-of-concept implementation that avoids caching the entire dataset. Hi, We have been running into performance problems using Python UDFs with DataFrames at large scale. From the implementation of BatchPythonEvaluation, it looks like the goal was to reuse the PythonRDD code. It caches the entire child RDD so that it can do two passes over the data: one to give to the PythonRDD, then one to join the python lambda results with the original row (which may have java objects that should be passed through). In addition, it caches all the columns, even the ones that don't need to be processed by the Python UDF. In the cases I was working with, I had a 500 column table, and I wanted to use a python UDF for one column, and it ended up caching all 500 columns.
I have a working solution over here that does it in one pass over the data, avoiding caching (https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b). With this patch, I go from a job that takes 20 minutes then OOMs, to a job that finishes completely in 3 minutes. It is indeed quite hacky and prone to deadlocks since there is buffering in many locations: - NEW: the ForkingIterator LinkedBlockingDeque - batching the rows before pickling them - os buffers on both sides - pyspark.serializers.BatchedSerializer We can avoid deadlock by being very disciplined. For example, we can have the ForkingIterator always check whether the LinkedBlockingDeque is full, and if so: Java - flush the java pickling buffer - send a flush command to the python process - os.flush the java side Python - flush BatchedSerializer - os.flush() I haven't added this yet. This is getting very complex, however. Another model would just be to change the protocol between the java side and the worker to be a synchronous request/response. This has the disadvantage that the CPU isn't doing anything when the batch is being sent across, but it has the huge advantage of simplicity.
parallelize method vs. textFile method
We have a large file. We used to read chunks and then use the parallelize method (distData = sc.parallelize(chunk)) and do the map/reduce chunk by chunk. Recently we read the whole file using the textFile method and found the map/reduce job is much faster. Can anybody help us understand why? We have verified that reading the file is NOT a bottleneck.
Re: Problem with version compatibility
Hi Sean, I'm running a Mesos cluster. My driver app is built using maven against the maven 1.4.0 dependency. The Mesos slave machines have the spark distribution installed from the distribution link. I have a hard time understanding how this isn't a standard app deployment, but maybe I'm missing something. If you build a driver app against 1.4.0 using maven and run it against a mesos cluster that has the 1.4.0 binary distribution installed, your driver won't run right. I meant to publish this question on the user list, so my apologies if it's in the wrong place. Jim
Re: parallelize method vs. textFile method
If you read the file one by one and then use parallelize, it is read by a single thread on a single machine. On Wednesday, June 24, 2015, xing ehomec...@gmail.com wrote: We have a large file. We used to read chunks and then use the parallelize method (distData = sc.parallelize(chunk)) and do the map/reduce chunk by chunk. Recently we read the whole file using the textFile method and found the map/reduce job is much faster. Can anybody help us understand why? We have verified that reading the file is NOT a bottleneck.
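To make the contrast concrete, here is a minimal sketch of the two approaches (file paths and chunk size are illustrative). With parallelize, one driver thread reads the file and ships every chunk over the network; with textFile, only the path is shipped and each executor reads its own partitions in parallel, with data locality.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("parallelize-vs-textFile"))

// Driver-side chunking: the file is read by one thread on the driver,
// and each chunk is serialized and sent to the cluster.
val chunks = scala.io.Source.fromFile("/data/big.txt").getLines().grouped(100000)
val total1 = chunks.map(chunk => sc.parallelize(chunk).count()).sum

// textFile: executors read their partitions (e.g. HDFS blocks) directly,
// in parallel, without funneling the data through the driver.
val total2 = sc.textFile("hdfs:///data/big.txt").count()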
Re: parallelize method vs. textFile method
When we compared the performance, we had already excluded this part of the time difference.
Re: Problem with version compatibility
They are different classes even. Your problem isn't class-not-found though. You're also comparing different builds really. You should not be including Spark code in your app. On Wed, Jun 24, 2015, 9:48 PM jimfcarroll jimfcarr...@gmail.com wrote: These jars are simply incompatible. You can see this by looking at that class in both the maven repo for 1.4.0 here: http://central.maven.org/maven2/org/apache/spark/spark-core_2.10/1.4.0/spark-core_2.10-1.4.0.jar as well as the spark-assembly jar inside the .tgz file you can get from the official download here: http://d3kbcqa49mib13.cloudfront.net/spark-1.4.0-bin-hadoop2.4.tgz Am I missing something? Thanks Jim
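In build terms, "not including Spark code in your app" usually means marking the Spark artifacts as provided dependencies, so the cluster's own assembly supplies those classes at runtime instead of a second copy bundled into the application jar. A minimal sbt sketch (assumption: the version should match the Spark actually installed on the cluster; the Maven equivalent is <scope>provided</scope>):

// build.sbt -- compile against Spark but do not package it into the app jar
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0" % "provided"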
Re: [GraphX] Graph 500 graph generator
Hi Ryan, If you can get past the paperwork, I'm sure this can make a great Spark Package (http://spark-packages.org). People can then use it for benchmarking purposes, and I'm sure people will be looking for graph generators! Best, Burak On Wed, Jun 24, 2015 at 7:55 AM, Carr, J. Ryan ryan.c...@jhuapl.edu wrote: Hi Spark Devs, As part of a project at work, I have written a graph generator for RMAT graphs consistent with the specifications in the Graph 500 benchmark (http://www.graph500.org/specifications). We had originally planned to use the rmatGenerator function in GraphGenerators, but found that it wasn’t suitable for generating graphs with billions of edges; the edges are generated in a single thread and stored in a Set, meaning it can’t generate a graph larger than memory on a single JVM (and I think Sets are limited to Int.MaxValue elements anyway). The generator I have is essentially a more scalable version of rmatGenerator. We have used it to generate a graph with 2^32 vertices and 2^36 edges on our modestly-specced cluster of 16 machines. It seems like other people interested in Spark might want to play with some large RMAT graphs (or run the Graph 500 benchmark), so I would like to contribute my generator. It does have some minor differences from the current generator, though: 1. Vertex IDs are shuffled after the graph structure is generated, so the degree of a vertex cannot be predicted from its ID (without this step vertex 0 would always have the largest degree, followed by vertices 1,2,4,8, etc.). This is per the Graph 500 spec. It could be easily made optional. 2. Duplicate edges are not removed from the resulting graph. This could easily be done with a call to distinct() on the resulting edge list, but then there would be slightly fewer edges than one generated by the current rmatGenerator. Also this process would be very slow on large graphs due to skew. 3. Doesn’t set the out degree as the vertex attribute. Again this would be simple to add, but it could be slow on the super vertices. My question for the Spark Devs is: Is this something you would want as part of GraphX (either as a replacement for the current rmatGenerator or a separate function in GraphGenerators)? Since it was developed at work I need to go through our legal department and QA processes to open-source it, and to fill out the paperwork I need to know whether I’ll be submitting a pull request or standing it up as a separate project on GitHub. Thanks! -Ryan -- J. Ryan Carr, Ph. D. The Johns Hopkins University, Applied Physics Laboratory 11100 Johns Hopkins Rd., Laurel, MD 20723 Office: 240-228-9157 Cell: 443-744-1004 Email: ryan.c...@jhuapl.edu or james.c...@jhuapl.edu
Re: OK to add committers active on JIRA to JIRA admin role?
+1 (partially b/c I would like jira admin myself) On Tue, Jun 23, 2015 at 3:47 AM, Sean Owen so...@cloudera.com wrote: There are some committers who are active on JIRA and sometimes need to do things that require JIRA admin access -- in particular thinking of adding a new person as Contributor in order to assign them a JIRA. We can't change what roles can do what (think that INFRA ticket is dead) but can add to the Admin role. Would anyone object to making a few more committers JIRA Admins for this purpose?
Re: how can I write a language wrapper?
The SparkR code is in the `R` directory, i.e. https://github.com/apache/spark/tree/master/R Shivaram On Wed, Jun 24, 2015 at 8:45 AM, Vasili I. Galchin vigalc...@gmail.com wrote: Matei, Last night I downloaded the Spark bundle. In order to save me time, can you give me the name of the SparkR example and where it is in the Spark tree? Thanks, Bill On Tuesday, June 23, 2015, Matei Zaharia matei.zaha...@gmail.com wrote: Just FYI, it would be easiest to follow SparkR's example and add the DataFrame API first. Other APIs will be designed to work on DataFrames (most notably machine learning pipelines), and the surface of this API is much smaller than that of the RDD API. This API will also give you great performance as we continue to optimize Spark SQL. Matei On Jun 23, 2015, at 1:46 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: Every language has its own quirks / features -- so I don't think there exists a document on how to go about doing this for a new language. The most related write-up I know of is the wiki page on PySpark internals https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals written by Josh Rosen -- it covers some of the issues like closure capture, serialization, and JVM communication that you'll need to handle for a new language. Thanks Shivaram On Tue, Jun 23, 2015 at 1:35 PM, Vasili I. Galchin vigalc...@gmail.com wrote: Hello, I want to add language support for another language (other than Scala, Java, et al.). Where is the documentation that explains how to provide support for a new language? Thank you, Vasili
[GraphX] Graph 500 graph generator
Hi Spark Devs, As part of a project at work, I have written a graph generator for RMAT graphs consistent with the specifications in the Graph 500 benchmark (http://www.graph500.org/specifications). We had originally planned to use the rmatGenerator function in GraphGenerators, but found that it wasn't suitable for generating graphs with billions of edges; the edges are generated in a single thread and stored in a Set, meaning it can't generate a graph larger than memory on a single JVM (and I think Sets are limited to Int.MaxValue elements anyway). The generator I have is essentially a more scalable version of rmatGenerator. We have used it to generate a graph with 2^32 vertices and 2^36 edges on our modestly-specced cluster of 16 machines. It seems like other people interested in Spark might want to play with some large RMAT graphs (or run the Graph 500 benchmark), so I would like to contribute my generator. It does have some minor differences from the current generator, though: 1. Vertex IDs are shuffled after the graph structure is generated, so the degree of a vertex cannot be predicted from its ID (without this step vertex 0 would always have the largest degree, followed by vertices 1,2,4,8, etc.). This is per the Graph 500 spec. It could be easily made optional. 2. Duplicate edges are not removed from the resulting graph. This could easily be done with a call to distinct() on the resulting edge list, but then there would be slightly fewer edges than one generated by the current rmatGenerator. Also this process would be very slow on large graphs due to skew. 3. Doesn't set the out degree as the vertex attribute. Again this would be simple to add, but it could be slow on the super vertices. My question for the Spark Devs is: Is this something you would want as part of GraphX (either as a replacement for the current rmatGenerator or a separate function in GraphGenerators)? Since it was developed at work I need to go through our legal department and QA processes to open-source it, and to fill out the paperwork I need to know whether I'll be submitting a pull request or standing it up as a separate project on GitHub. Thanks! -Ryan -- J. Ryan Carr, Ph. D. The Johns Hopkins University, Applied Physics Laboratory 11100 Johns Hopkins Rd., Laurel, MD 20723 Office: 240-228-9157 Cell: 443-744-1004 Email: ryan.c...@jhuapl.edu or james.c...@jhuapl.edu
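For readers unfamiliar with the technique: an RMAT generator draws each edge independently by recursively descending a 2x2 probability grid, which is what makes it embarrassingly parallel. A minimal distributed sketch follows; it is illustrative only, not the proposed implementation (partition seeding is naive, and the vertex-shuffling step from point 1 is omitted). Graph 500 uses (a, b, c, d) = (0.57, 0.19, 0.19, 0.05).

import org.apache.spark.SparkContext
import org.apache.spark.graphx.Edge
import org.apache.spark.rdd.RDD
import scala.util.Random

// Each partition draws its share of edges independently, so nothing is
// materialized on a single JVM.
def rmatEdges(sc: SparkContext, scale: Int, numEdges: Long, numPartitions: Int,
              a: Double = 0.57, b: Double = 0.19, c: Double = 0.19): RDD[Edge[Int]] = {
  val edgesPerPartition = (numEdges / numPartitions).toInt
  sc.parallelize(0 until numPartitions, numPartitions).flatMap { pid =>
    val rng = new Random(pid)  // illustrative seeding; a real generator wants better seeds
    Iterator.fill(edgesPerPartition) {
      var src = 0L
      var dst = 0L
      for (_ <- 0 until scale) {   // descend the 2x2 grid `scale` times
        val r = rng.nextDouble()
        val (sBit, dBit) =
          if (r < a) (0L, 0L)
          else if (r < a + b) (0L, 1L)
          else if (r < a + b + c) (1L, 0L)
          else (1L, 1L)
        src = (src << 1) | sBit
        dst = (dst << 1) | dBit
      }
      Edge(src, dst, 1)
    }
  }
}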
Re: how can I write a language wrapper?
Matei, Last night I downloaded the Spark bundle. In order to save me time, can you give me the name of the SparkR example and where it is in the Spark tree? Thanks, Bill On Tuesday, June 23, 2015, Matei Zaharia matei.zaha...@gmail.com wrote: Just FYI, it would be easiest to follow SparkR's example and add the DataFrame API first. Other APIs will be designed to work on DataFrames (most notably machine learning pipelines), and the surface of this API is much smaller than that of the RDD API. This API will also give you great performance as we continue to optimize Spark SQL. Matei On Jun 23, 2015, at 1:46 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: Every language has its own quirks / features -- so I don't think there exists a document on how to go about doing this for a new language. The most related write-up I know of is the wiki page on PySpark internals https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals written by Josh Rosen -- it covers some of the issues like closure capture, serialization, and JVM communication that you'll need to handle for a new language. Thanks Shivaram On Tue, Jun 23, 2015 at 1:35 PM, Vasili I. Galchin vigalc...@gmail.com wrote: Hello, I want to add language support for another language (other than Scala, Java, et al.). Where is the documentation that explains how to provide support for a new language? Thank you, Vasili
Re: Loss of data due to congestion
Thanks, I am talking about streaming. On 25 Jun 2015 5:37 am, ayan guha guha.a...@gmail.com wrote: Can you elaborate a little more? Are you talking about the receiver or streaming? On 24 Jun 2015 23:18, anshu shukla anshushuk...@gmail.com wrote: How does Spark guarantee that no RDD will fail / be lost during its life cycle? Is there something like ack in Storm, or does it do this by default? -- Thanks & Regards, Anshu Shukla
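For background on the fault-tolerance side of the question: Spark recomputes lost RDD partitions from their lineage, and for streaming, received data can additionally be protected by checkpointing plus the receiver write-ahead log introduced in Spark 1.2. A minimal sketch of enabling both (app name, paths, and batch interval are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// With the WAL enabled, data received by a receiver is persisted to the
// checkpoint directory before processing, so it can be replayed on failure.
val conf = new SparkConf()
  .setAppName("reliable-streaming")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("hdfs:///checkpoints/my-app")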
Error in invoking a custom StandaloneRecoveryModeFactory in java env (Spark v1.3.0)
Hi all, I'm trying to implement a custom StandaloneRecoveryModeFactory in the Java environment. Please find the implementation here [1]. I'm new to Scala, hence I'm trying to use the Java environment as much as possible. When I start a master with the spark.deploy.recoveryMode.factory property set to CUSTOM, I encounter a NoSuchMethodException for my custom class's constructor. It has the following constructor: public AnalyticsStandaloneRecoveryModeFactory(SparkConf conf, Serialization serializer) but from the Master, it looks for a constructor of org.wso2.carbon.analytics.spark.core.util.master.AnalyticsStandaloneRecoveryModeFactory.<init>(org.apache.spark.SparkConf, akka.serialization.Serialization$) I see in the Spark source code for Master that it uses reflection to get the custom recovery mode factory class: case "CUSTOM" => val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory")) val factory = clazz.getConstructor(conf.getClass, Serialization.getClass) .newInstance(conf, SerializationExtension(context.system)) .asInstanceOf[StandaloneRecoveryModeFactory] (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this)) Here, Serialization.getClass returns an akka.serialization.Serialization$ class, whereas my custom class's constructor accepts an akka.serialization.Serialization object. So I would like to know: 1. does this happen because I'm using this in the Java environment? 2. what is the workaround for this? Thanks. Please find the full stack trace of the error below. [2015-06-25 10:59:01,095] ERROR {akka.actor.OneForOneStrategy} - org.wso2.carbon.analytics.spark.core.util.master.AnalyticsStandaloneRecoveryModeFactory.<init>(org.apache.spark.SparkConf, akka.serialization.Serialization$) akka.actor.ActorInitializationException: exception during creation at akka.actor.ActorInitializationException$.apply(Actor.scala:164) at akka.actor.ActorCell.create(ActorCell.scala:596) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.NoSuchMethodException: org.wso2.carbon.analytics.spark.core.util.master.AnalyticsStandaloneRecoveryModeFactory.<init>(org.apache.spark.SparkConf, akka.serialization.Serialization$) at java.lang.Class.getConstructor0(Class.java:2810) at java.lang.Class.getConstructor(Class.java:1718) at org.apache.spark.deploy.master.Master.preStart(Master.scala:165) at akka.actor.Actor$class.aroundPreStart(Actor.scala:470) at org.apache.spark.deploy.master.Master.aroundPreStart(Master.scala:52) at akka.actor.ActorCell.create(ActorCell.scala:580) ... 
9 more [1] https://github.com/nirandaperera/carbon-analytics/blob/spark_master_persistance/components/analytics-processors/org.wso2.carbon.analytics.spark.core/src/main/java/org/wso2/carbon/analytics/spark/core/util/master/AnalyticsStandaloneRecoveryModeFactory.java -- Niranda @n1r44 https://twitter.com/N1R44 https://pythagoreanscript.wordpress.com/
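On question (1): this is less a Java-environment problem than a Scala/Java interop quirk in the reflective lookup itself. In Scala, a bare `Serialization` in that Master snippet refers to the companion object, so `Serialization.getClass` is the class of the companion object, `akka.serialization.Serialization$`, not `classOf[Serialization]`. The constructor search can therefore never match a constructor declared to take a plain `Serialization`, regardless of which language the factory is written in. A minimal sketch of the mismatch, runnable in a Scala REPL with Akka on the classpath:

import akka.serialization.Serialization

// `Serialization` here is the companion object; its runtime class carries
// the trailing `$`, which is what ends up in the getConstructor lookup.
println(Serialization.getClass)  // class akka.serialization.Serialization$
println(classOf[Serialization])  // class akka.serialization.Serialization

Note also that even a constructor declared to take `Serialization$` would satisfy the lookup but then fail at newInstance, since a `Serialization` instance is passed in; so this looks like something to raise as a Spark JIRA rather than work around in the factory.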
Spark SQL 1.3 Exception
Hi, I have an Impala-created table with the following io format and serde: inputFormat:parquet.hive.DeprecatedParquetInputFormat, outputFormat:parquet.hive.DeprecatedParquetOutputFormat, serdeInfo:SerDeInfo(name:null, serializationLib:parquet.hive.serde.ParquetHiveSerDe, parameters:{}) I am trying to read this table on Spark SQL 1.3 and see if caching improves my query latency, but I am getting an exception: java.lang.ClassNotFoundException: Class parquet.hive.serde.ParquetHiveSerDe not found I understand that in hive 0.13 (which I am using) parquet.hive.serde.ParquetHiveSerDe is deprecated, but it seems Impala still used it to write the table. I also tried to provide the bundle jar with the --jars option to the Spark 1.3 Shell / SQL, which has org.apache.parquet.hive.serde.ParquetHiveSerDe, but I am confused about how to configure the serde in SQLContext. A table with the following io format and serde can be read fine by Spark SQL 1.3: inputFormat=org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat, outputFormat=org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat, serializationLib=org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe Thanks. Deb On Sat, Jun 20, 2015 at 12:21 AM, Debasish Das debasish.da...@gmail.com wrote: Hi, I have some impala created parquet tables which hive 0.13.2 can read fine. Now when I want to read the same table using Spark SQL 1.3, I am getting a class exception that parquet.hive.serde.ParquetHiveSerDe is not found. I am assuming that hive is putting the parquet-hive-bundle.jar somewhere in the hive classpath, but I tried putting the parquet-hive-bundle.jar in spark-1.3/conf/hive-site.xml through the auxiliary jars setting and even that did not work. Any input on fixing this will be really helpful. Thanks. Deb
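One possible workaround to experiment with, assuming the underlying files are ordinary Parquet data: repoint the Impala-created table's metadata at the Hive-native Parquet serde (org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe), which the second table above shows Spark SQL 1.3 can already read. A hedged sketch, table name hypothetical and untested against Impala-written metadata:

// From a HiveContext (or the Hive CLI). In Hive 0.13, PARQUET expands to the
// org.apache.hadoop.hive.ql.io.parquet.* input/output formats and serde.
hiveContext.sql("ALTER TABLE my_impala_table SET FILEFORMAT PARQUET")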
Re: [VOTE] Release Apache Spark 1.4.1
Hey Sean, This is being shipped now because there is a severe bug in 1.4.0 that can cause data corruption for Parquet users. There are no blockers targeted for 1.4.1 - so I don't see that JIRA is inconsistent with shipping a release now. As for the goal of having every single targeted JIRA cleared by the time we start voting: I don't think there is broad consensus and cultural adoption of that principle yet, so I do not take it as a signal this release is premature (the story has been the same for every previous release we've ever done). The fact that we hit 90/124 of issues targeted at this release means we are targeting such that we get around 70% of issues merged. That actually doesn't seem so bad to me, since there is some uncertainty in the process. - Patrick On Wed, Jun 24, 2015 at 1:54 AM, Sean Owen so...@cloudera.com wrote: There are 44 issues still targeted for 1.4.1. None are Blockers; 12 are Critical. ~80% were opened and/or set by committers. Compare with 90 issues resolved for 1.4.1. I'm concerned that committers are targeting far more for a release than can realistically go in, even in the short term. On its face, it suggests that an RC is premature. Why is 1.4.1 being put forth for release now? It seems like people are saying they want a fair bit more time to work on 1.4.1. I suspect that in fact people would rather untarget / slip (again) these JIRAs, but it calls into question again how the targeting is consistently off by this much. What unresolved JIRAs targeted for 1.4.1 are *really* still open for 1.4.1? Like, what would go badly if all 32 non-Critical JIRAs were untargeted now? Is the reality that there are a handful of items to get in before the final release, and those are hopefully the ~12 Critical ones? How about some review of that before we ask people to seriously test these bits? On Wed, Jun 24, 2015 at 8:37 AM, Patrick Wendell pwend...@gmail.com wrote: Please vote on releasing the following candidate as Apache Spark version 1.4.1! This release fixes a handful of known issues in Spark 1.4.0, listed here: http://s.apache.org/spark-1.4.1 The tag to be voted on is v1.4.1-rc1 (commit 60e08e5): https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=60e08e50751fe3929156de956d62faea79f5b801 The release files, including signatures, digests, etc. can be found at: http://people.apache.org/~pwendell/spark-releases/spark-1.4.1-rc1-bin/ Release artifacts are signed with the following key: https://people.apache.org/keys/committer/pwendell.asc The staging repository for this release can be found at: [published as version: 1.4.1] https://repository.apache.org/content/repositories/orgapachespark-1118/ [published as version: 1.4.1-rc1] https://repository.apache.org/content/repositories/orgapachespark-1119/ The documentation corresponding to this release can be found at: http://people.apache.org/~pwendell/spark-releases/spark-1.4.1-rc1-docs/ Please vote on releasing this package as Apache Spark 1.4.1! The vote is open until Saturday, June 27, at 06:32 UTC and passes if a majority of at least 3 +1 PMC votes are cast. [ ] +1 Release this package as Apache Spark 1.4.1 [ ] -1 Do not release this package because ... To learn more about Apache Spark, please see http://spark.apache.org/
Problem with version compatibility
Hello all, I have a strange problem. I have a mesos spark cluster with Spark 1.4.0/Hadoop 2.4.0 installed, and a client application that uses maven to include the same versions. However, I'm getting a serialVersionUID problem on: ERROR Remoting - org.apache.spark.storage.BlockManagerMessages$RegisterBlockManager; local class incompatible: stream classdesc serialVersionUID = 3833981923223309323, local class serialVersionUID = -1833407448843930116 When I look in the jar file of the spark dependency in my maven repo I see: spark-core_2.10-1.4.0.jar contains the line: 2917 10-Jun-2015 12:20:48 org/apache/spark/storage/BlockManagerMessages$RegisterBlockManager$.class However, on my mesos cluster the jar looks like this: spark-assembly-1.4.0-hadoop2.4.0.jar contains the line: 3786 2-Jun-2015 18:23:00 org/apache/spark/storage/BlockManagerMessages$RegisterBlockManager.class Notice the classes aren't the same (different sizes), but I'm getting them both from hosted repositories. One is from maven central and the other is from the download page which points to here: http://d3kbcqa49mib13.cloudfront.net/spark-1.4.0-bin-hadoop2.4.tgz Thanks Jim
Re: Force inner join to shuffle the smallest table
Have you tried shuffle compression? spark.shuffle.compress (true|false) Also, if your filesystem is capable, I've noticed file consolidation helps disk usage a bit: spark.shuffle.consolidateFiles (true|false) Steve On Jun 24, 2015, at 3:27 PM, Ulanov, Alexander alexander.ula...@hp.com wrote: It also fails, as I mentioned in the original question. From: CC GP [mailto:chandrika.gopalakris...@gmail.com] Sent: Wednesday, June 24, 2015 12:08 PM To: Ulanov, Alexander Cc: dev@spark.apache.org Subject: Re: Force inner join to shuffle the smallest table Try below and see if it makes a difference: val result = sqlContext.sql("select big.f1, big.f2 from small inner join big on big.s=small.s and big.d=small.d") On Wed, Jun 24, 2015 at 11:35 AM, Ulanov, Alexander alexander.ula...@hp.com wrote: Hi, I try to inner join two tables on two fields (string and double). One table is 2B rows, the second is 500K. They are stored in HDFS in Parquet. Spark v1.4. val big = sqlContext.parquetFile("hdfs://big") big.registerTempTable("big") val small = sqlContext.parquetFile("hdfs://small") small.registerTempTable("small") val result = sqlContext.sql("select big.f1, big.f2 from big inner join small on big.s=small.s and big.d=small.d") This query fails in the middle due to one of the workers running "disk out of space", with shuffle reported as 1.8TB, which is the maximum size of my spark working dirs (7 worker nodes in total). This is surprising, because the "big" table takes 2TB of disk space (unreplicated) and "small" about 5GB, and I would expect the optimizer to shuffle the small table. How can I force Spark to shuffle the small table? I tried to write "small inner join big", however it also fails with 1.8TB of shuffle. Best regards, Alexander
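Beyond shuffle compression, two more things worth trying, both hedged since the 1.4 planner's choices depend on statistics it may not have: raise spark.sql.autoBroadcastJoinThreshold so the planner can broadcast the small side instead of shuffling either table (whether a ~5GB broadcast is viable depends on your driver and executor memory), and inspect the chosen plan with explain(). A sketch:

// Threshold is in bytes; 6GB here is deliberately above the small table's
// size. This is a knob to experiment with, not a guaranteed fix.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (6L * 1024 * 1024 * 1024).toString)

val result = sqlContext.sql(
  "select big.f1, big.f2 from big inner join small on big.s = small.s and big.d = small.d")
result.explain()  // check whether the plan shows a broadcast join or a shuffled join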
Re: Problem with version compatibility
These jars are simply incompatible. You can see this by looking at that class in both the maven repo for 1.4.0 here: http://central.maven.org/maven2/org/apache/spark/spark-core_2.10/1.4.0/spark-core_2.10-1.4.0.jar as well as the spark-assembly jar inside the .tgz file you can get from the official download here: http://d3kbcqa49mib13.cloudfront.net/spark-1.4.0-bin-hadoop2.4.tgz Am I missing something? Thanks Jim