Re: Package Release Announcement: Spark SQL on HBase Astro
That's awesome Yan. I was considering Phoenix for SQL calls to HBase since Cassandra supports CQL but HBase QL support was lacking. I will get back to you as I start using it on our loads. I am assuming the latencies won't be much different from accessing HBase through tsdb asynchbase, as that's one more option I am looking into. On Mon, Jul 27, 2015 at 10:12 PM, Yan Zhou.sc yan.zhou...@huawei.com wrote: HBase in this case is no different from any other Spark SQL data source, so yes, you should be able to access HBase data through Astro from Spark SQL's JDBC interface. Graphically, the access path is as follows: Spark SQL JDBC Interface -> Spark SQL Parser/Analyzer/Optimizer -> Astro Optimizer -> HBase Scans/Gets -> … -> HBase Region server Regards, Yan From: Debasish Das [mailto:debasish.da...@gmail.com] Sent: Monday, July 27, 2015 10:02 PM To: Yan Zhou.sc Cc: Bing Xiao (Bing); dev; user Subject: RE: Package Release Announcement: Spark SQL on HBase Astro Hi Yan, Is it possible to access the HBase table through the Spark SQL JDBC layer? Thanks. Deb On Jul 22, 2015 9:03 PM, Yan Zhou.sc yan.zhou...@huawei.com wrote: Yes, but not all SQL-standard insert variants. From: Debasish Das [mailto:debasish.da...@gmail.com] Sent: Wednesday, July 22, 2015 7:36 PM To: Bing Xiao (Bing) Cc: user; dev; Yan Zhou.sc Subject: Re: Package Release Announcement: Spark SQL on HBase Astro Does it also support insert operations? On Jul 22, 2015 4:53 PM, Bing Xiao (Bing) bing.x...@huawei.com wrote: We are happy to announce the availability of the Spark SQL on HBase 1.0.0 release. http://spark-packages.org/package/Huawei-Spark/Spark-SQL-on-HBase The main features in this package, dubbed “Astro”, include: · Systematic and powerful handling of data pruning and intelligent scan, based on a partial evaluation technique · HBase pushdown capabilities such as custom filters and coprocessors to support ultra-low-latency processing · SQL, Data Frame support · More SQL capabilities made possible (Secondary index, bloom filter, Primary Key, Bulk load, Update) · Joins with data from other sources · Python/Java/Scala support · Support for the latest Spark 1.4.0 release The tests by the Huawei team and community contributors covered the following areas: bulk load; projection pruning; partition pruning; partial evaluation; code generation; coprocessor; custom filtering; DML; complex filtering on keys and non-keys; join/union with non-HBase data; Data Frame; multi-column family tests. We will post the test results, including performance tests, by the middle of August. You are very welcome to try out or deploy the package, and to help improve the integration tests with various combinations of the settings, extensive Data Frame tests, complex join/union tests and extensive performance tests. Please use the “Issues” and “Pull Requests” links on the package homepage to report bugs or request improvements and features. Special thanks to project owner and technical leader Yan Zhou, the Huawei global team, community contributors and Databricks. Databricks has provided great assistance from the design through to the release. “Astro”, the Spark SQL on HBase package, will be useful for ultra-low-latency query and analytics of large-scale data sets in vertical enterprises. We will continue to work with the community to develop new features and improve the code base. Your comments and suggestions are greatly appreciated. Yan Zhou / Bing Xiao Huawei Big Data team
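A minimal sketch of the JDBC access path Yan describes, for readers who want to try it: it assumes the Spark SQL Thrift JDBC server is running and that an Astro-backed table named hbase_table has already been registered; the driver class, URL, database, and table name are placeholders rather than anything prescribed by Astro.

```scala
// Hypothetical sketch: query an Astro-backed table through the Spark SQL Thrift
// JDBC server. Host, port, database, and table name are placeholders.
import java.sql.DriverManager

object AstroJdbcSketch {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default")
    try {
      val rs = conn.createStatement().executeQuery(
        "SELECT key, value FROM hbase_table WHERE key = 'row1'")
      while (rs.next()) {
        println(s"${rs.getString(1)} -> ${rs.getString(2)}")
      }
    } finally {
      conn.close()
    }
  }
}
```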
Re: ReceiverStream SPARK not able to cope up with 20,000 events /sec .
You need to find the bottleneck here; it could be your network (if the data is huge) or your producer code not pushing at 20k/s. If you are able to produce at 20k/s, then make sure you are able to receive at that rate (try it without Spark). Thanks Best Regards On Sat, Jul 25, 2015 at 3:29 PM, anshu shukla anshushuk...@gmail.com wrote: My eventGen is emitting 20,000 events/sec, and I am using store(s1) in the receive() method to push data to the receiverStream. But this logic only works fine for up to 4,000 events/sec, and no batches are seen emitting for higher rates. CODE: TOPOLOGY - JavaDStream<String> sourcestream = ssc.receiverStream(new TetcCustomEventReceiver(datafilename, spoutlog, argumentClass.getScalingFactor(), datasetType)); CODE: TetcCustomEventReceiver - public void receive(List<String> event) { StringBuffer tuple = new StringBuffer(); msgId++; for (String s : event) { tuple.append(s).append(","); } String s1 = MsgIdAddandRemove.addMessageId(tuple.toString(), msgId); store(s1); } -- Thanks Regards, Anshu Shukla
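Following the "try it without Spark" advice above, one quick check is to time the event generator on its own; the sketch below is illustrative only, with generateEvent() standing in for whatever actually produces one event.

```scala
// Illustrative only: measure the raw emit rate of the generator outside Spark to
// confirm it can sustain 20,000 events/sec before tuning the receiver.
object RateCheck {
  def generateEvent(): String = System.nanoTime().toString // placeholder producer

  def main(args: Array[String]): Unit = {
    val target = 200000
    val start = System.nanoTime()
    var i = 0
    while (i < target) { generateEvent(); i += 1 }
    val secs = (System.nanoTime() - start) / 1e9
    println(f"$target%d events in $secs%.2f s = ${target / secs}%.0f events/sec")
  }
}
```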
RE: Two joins in GraphX Pregel implementation
I've found two PRs (almost identical) for replacing mapReduceTriplets with aggregateMessages: https://github.com/apache/spark/pull/3782 https://github.com/apache/spark/pull/3883 The first was closed at Dave's suggestion; the second is stale. There is also a PR for the new Pregel API, which is also closed. Do you know the reason why this improvement has not been pushed? CC'ing Dave From: Robin East [mailto:robin.e...@xense.co.uk] Sent: Monday, July 27, 2015 9:11 AM To: Ulanov, Alexander Cc: dev@spark.apache.org Subject: Re: Two joins in GraphX Pregel implementation Quite possibly - there is a JIRA open for replacing mapReduceTriplets with aggregateMessages (don't recall the number off the top of my head) Robin On 27 Jul 2015, at 17:01, Ulanov, Alexander alexander.ula...@hp.com wrote: Thank you, your explanation does make sense to me. Do you think that one join will work if `mapReduceTriplets` is replaced by the new `aggregateMessages`? The latter does not return the vertices that did not receive a message. From: Robin East [mailto:robin.e...@xense.co.uk] Sent: Monday, July 27, 2015 8:56 AM To: Ulanov, Alexander Cc: dev@spark.apache.org Subject: Re: Two joins in GraphX Pregel implementation What happens to this line of code: messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache() Part of the Pregel 'contract' is that vertices that don't receive messages from the previous superstep don't get to send messages this superstep. Not sure if there is a test for that, but there ought to be. Robin On 27 Jul 2015, at 16:42, Ulanov, Alexander alexander.ula...@hp.com wrote: Dear Spark developers, Below is the GraphX Pregel code snippet from https://spark.apache.org/docs/latest/graphx-programming-guide.html#pregel-api (it does not contain the caching step): while (activeMessages > 0 && i < maxIterations) { // Receive the messages: // (1st join) Run the vertex program on all vertices that receive messages val newVerts = g.vertices.innerJoin(messages)(vprog).cache() // (2nd join) Merge the new vertex values back into the graph g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache() // Send Messages: // Vertices that didn't receive a message above don't appear in newVerts and therefore don't // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked // on edges in the activeDir of vertices in newVerts messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache() activeMessages = messages.count() i += 1 } It seems that the mentioned two joins can be rewritten as one outer join as follows: g = g.outerJoinVertices(messages) { (vid, old, mess) => mess match { case Some(mess) => vprog(vid, old, mess) case None => old } } This code passes PregelSuite (after removing newVerts). Could you elaborate on why two joins are used instead of one, and why the intermediate variable `newVerts` is needed? Are there some performance considerations? Best regards, Alexander
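For readability, Alexander's proposed single-join rewrite from the message above, reformatted (the same code, not a new implementation):

```scala
// One outerJoinVertices instead of innerJoin + outerJoinVertices: run the vertex
// program where a message arrived, keep the old value otherwise.
g = g.outerJoinVertices(messages) { (vid, old, mess) =>
  mess match {
    case Some(m) => vprog(vid, old, m)
    case None    => old
  }
}
```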
Re: DataFrame#rdd doesn't respect DataFrame#cache, slowing down CrossValidator
Can you add your description of the problem as a comment to that ticket? We'll make sure to test both cases and break it out if the root cause ends up being different. On Tue, Jul 28, 2015 at 2:48 PM, Justin Uang justin.u...@gmail.com wrote: Sweet! Does this cover DataFrame#rdd also using the cached query from DataFrame#cache? I think ticket 9141 is mainly concerned with whether a derived DataFrame (B) of a cached DataFrame (A) uses the cached query of A, not whether the rdd from A.rdd or B.rdd uses the cached query of A. On Tue, Jul 28, 2015 at 11:33 PM Joseph Bradley jos...@databricks.com wrote: Thanks for bringing this up! I talked with Michael Armbrust, and it sounds like this is from a bug in DataFrame caching: https://issues.apache.org/jira/browse/SPARK-9141 It's marked as a blocker for 1.5. Joseph On Tue, Jul 28, 2015 at 2:36 AM, Justin Uang justin.u...@gmail.com wrote: Hey guys, I'm running into some pretty bad performance issues when it comes to using a CrossValidator, because of the caching behavior of DataFrames. The root of the problem is that while I have cached my DataFrame representing the features and labels, it is caching at the DataFrame level, while CrossValidator/LogisticRegression both drop down to the dataset.rdd level, which ignores the caching that I have previously done. This is worsened by the fact that for each combination of a fold and a param set from the grid, it recomputes my entire input dataset because the caching was lost. My current solution is to force the input DataFrame to be based off of a cached RDD, which I did with this horrible hack (I had to drop down to Java from pyspark because of something to do with vectors not being inferred correctly): def checkpoint_dataframe_caching(df): return DataFrame(sqlContext._ssql_ctx.createDataFrame(df._jdf.rdd().cache(), train_data._jdf.schema()), sqlContext) before I pass it into CrossValidator.fit(). If I do this, I still have to cache the underlying rdd once more than necessary (in addition to DataFrame#cache()), but at least in cross validation it doesn't recompute the RDD graph anymore. Note that input_df.rdd.cache() doesn't work, because the Python CrossValidator implementation applies some more dataframe transformations like filter, which then causes filtered_df.rdd to return a completely different rdd that recomputes the entire graph. Is it the intention of Spark SQL that calling DataFrame#rdd removes any caching that was done for the query? Is the fix as simple as getting DataFrame#rdd to reference the cached query, or is there something more subtle going on? Best, Justin
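A minimal Scala illustration of the behaviour being discussed, assuming Spark 1.4.x and a placeholder input path: the DataFrame is cached, and the open question is whether dropping to .rdd reuses that cache or rebuilds the whole lineage.

```scala
// Sketch of the scenario: cache at the DataFrame level, then drop to the RDD level.
// Whether the second count reuses the in-memory columnar cache is exactly the
// question raised in SPARK-9141 and in this thread.
val df = sqlContext.read.parquet("/path/to/features_and_labels").cache()
df.count()        // materializes the DataFrame cache
val rdd = df.rdd  // drop to a Row RDD, as CrossValidator/LogisticRegression do internally
rdd.count()       // may recompute the input if the cached query is not reused
```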
Fwd: Writing streaming data to cassandra creates duplicates
Hi TD, Thanks for the info. My scenario is like this: I am reading data from a Kafka topic. Let's say Kafka has 3 partitions for the topic. In my streaming application, I would configure 3 receivers with 1 thread each, such that they would receive 3 dstreams (from the 3 partitions of the Kafka topic), and I also implement a partitioner. Now there is a possibility of receiving messages with the same primary key twice or more: once at the time the message is created, and again whenever there is an update to any field of the same message. If two messages M1 and M2 with the same primary key are read by 2 receivers, then even with the partitioner, Spark would still end up processing them in parallel, as they are altogether in different dstreams. How do we address this situation? Thanks, Padma Ch On Tue, Jul 28, 2015 at 12:12 PM, Tathagata Das t...@databricks.com wrote: You have to partition the data on the Spark Streaming side by the primary key, and then make sure you insert the data into Cassandra atomically per key, or per set of keys in the partition. You can use the combination of the (batch time, partition id) of the RDD inside foreachRDD as the unique id for the data you are inserting. This will guard against multiple attempts to run the task that inserts into Cassandra. See http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations TD On Sun, Jul 26, 2015 at 11:19 AM, Priya Ch learnings.chitt...@gmail.com wrote: Hi All, I have a problem when writing streaming data to Cassandra. Our existing product is on an Oracle DB, in which, while writing data, locks are maintained such that duplicates in the DB are avoided. But as Spark has a parallel processing architecture, if more than 1 thread is trying to write the same data, i.e. with the same primary key, is there any scope to create duplicates? If yes, how do we address this problem, either from the Spark side or from the Cassandra side? Thanks, Padma Ch
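A hedged sketch of TD's suggestion: derive a deterministic id from (batch time, partition id) inside foreachRDD so that retries of the same task overwrite rather than duplicate. writeToCassandra is a hypothetical placeholder for the actual writer, and making the write idempotent per key is left to the application's data model.

```scala
// Sketch only: (batch time, partition id) gives every task attempt the same id,
// so re-executions of an insert can be made idempotent on the Cassandra side.
dstream.foreachRDD { (rdd, batchTime) =>
  rdd.foreachPartition { records =>
    val partitionId = org.apache.spark.TaskContext.get().partitionId()
    val uniqueId = s"${batchTime.milliseconds}-$partitionId"
    records.foreach { record =>
      writeToCassandra(record, uniqueId) // hypothetical writer, idempotent per (primary key, uniqueId)
    }
  }
}
```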
Reminder about Spark 1.5.0 code freeze deadline of Aug 1st
Hey All, Just a friendly reminder that Aug 1st is the feature freeze for Spark 1.5, meaning major outstanding changes will need to land this week. After Aug 1st we'll package a release for testing and then go into the normal triage process, where bugs are prioritized and some smaller features are allowed on a case-by-case basis (if they are very low-risk/additive/feature-flagged/etc.). As always, I'll invite the community to help participate in code review of patches this week, since review bandwidth is the single biggest determinant of how many features will get in. Please also keep in mind that most active committers are working overtime (nights/weekends) during this period and will try their best to help usher in as many patches as possible, along with their own code. As a reminder, release window dates are always maintained on the wiki and are updated after each release according to our 3-month release cadence: https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage Thanks - and happy coding! - Reynold
Re: Generalised Spark-HBase integration
Thanks Michal, Just to share what I'm working on in a related topic. So a long time ago I built SparkOnHBase and put it into Cloudera Labs, at this link: http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/ Also, recently I have been working on getting this into HBase core. It will hopefully be in HBase core within the next couple of weeks. https://issues.apache.org/jira/browse/HBASE-13992 Then I'm planning on adding dataframe and bulk load support through https://issues.apache.org/jira/browse/HBASE-14149 https://issues.apache.org/jira/browse/HBASE-14150 Also, if you are interested, this is running today at at least half a dozen companies with Spark Streaming. Here is one blog post of a successful implementation: http://blog.cloudera.com/blog/2015/03/how-edmunds-com-used-spark-streaming-to-build-a-near-real-time-dashboard/ And here is an additional example blog I also put together: http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/ Let me know if you have any questions; also let me know if you want to connect to join efforts. Ted Malaska On Tue, Jul 28, 2015 at 11:59 AM, Michal Haris michal.ha...@visualdna.com wrote: Hi all, for the last couple of months I've been working on a large graph analytics project, and along the way I have written an HBase-Spark integration from scratch, as none of the ones out there worked either in terms of scale or in the way they integrated with the RDD interface. This week I have generalised it into an (almost) Spark module, which works with the latest Spark and the new HBase API, so... sharing!: https://github.com/michal-harish/spark-on-hbase -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR
Re: Generalised Spark-HBase integration
Oops, yes, I'm still messing with the repo on a daily basis.. fixed On 28 July 2015 at 17:11, Ted Yu yuzhih...@gmail.com wrote: I got a compilation error: [INFO] /home/hbase/s-on-hbase/src/main/scala:-1: info: compiling [INFO] Compiling 18 source files to /home/hbase/s-on-hbase/target/classes at 1438099569598 [ERROR] /home/hbase/s-on-hbase/src/main/scala/org/apache/spark/hbase/examples/simple/HBaseTableSimple.scala:36: error: type mismatch; [INFO] found : Int [INFO] required: Short [INFO] while (scanner.advance) numCells += 1 [INFO]^ [ERROR] one error found FYI On Tue, Jul 28, 2015 at 8:59 AM, Michal Haris michal.ha...@visualdna.com wrote: Hi all, last couple of months I've been working on a large graph analytics and along the way have written from scratch a HBase-Spark integration as none of the ones out there worked either in terms of scale or in the way they integrated with the RDD interface. This week I have generalised it into an (almost) spark module, which works with the latest spark and the new hbase api, so... sharing! : https://github.com/michal-harish/spark-on-hbase -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR
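The error points at a counter declared as Short, since `numCells += 1` produces an Int in Scala; the fix in the repo is presumably along these lines (an assumption, not the actual commit):

```scala
// Assumed shape of the fix: count cells with an Int so `+= 1` type-checks.
var numCells = 0                        // Int instead of Short
while (scanner.advance) numCells += 1
```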
Re: Generalised Spark-HBase integration
Hi Ted, yes, cloudera blog and your code was my starting point - but I needed something more spark-centric rather than on hbase. Basically doing a lot of ad-hoc transformations with RDDs that were based on HBase tables and then mutating them after series of iterative (bsp-like) steps. On 28 July 2015 at 17:06, Ted Malaska ted.mala...@cloudera.com wrote: Thanks Michal, Just to share what I'm working on in a related topic. So a long time ago I build SparkOnHBase and put it into Cloudera Labs in this link. http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/ Also recently I am working on getting this into HBase core. It will hopefully be in HBase core with in the next couple of weeks. https://issues.apache.org/jira/browse/HBASE-13992 Then I'm planing on adding dataframe and bulk load support through https://issues.apache.org/jira/browse/HBASE-14149 https://issues.apache.org/jira/browse/HBASE-14150 Also if you are interested this is running today a at least a half a dozen companies with Spark Streaming. Here is one blog post of successful implementation http://blog.cloudera.com/blog/2015/03/how-edmunds-com-used-spark-streaming-to-build-a-near-real-time-dashboard/ Also here is an additional example blog I also put together http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/ Let me know if you have any questions, also let me know if you want to connect to join efforts. Ted Malaska On Tue, Jul 28, 2015 at 11:59 AM, Michal Haris michal.ha...@visualdna.com wrote: Hi all, last couple of months I've been working on a large graph analytics and along the way have written from scratch a HBase-Spark integration as none of the ones out there worked either in terms of scale or in the way they integrated with the RDD interface. This week I have generalised it into an (almost) spark module, which works with the latest spark and the new hbase api, so... sharing! : https://github.com/michal-harish/spark-on-hbase -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR
Re: Generalised Spark-HBase integration
Brilliant! Will check it out. Cheers Jules -- The Best Ideas Are Simple Jules Damji Developer Relations Community Outreach jda...@hortonworks.com http://hortonworks.com On 7/28/15, 8:59 AM, Michal Haris michal.ha...@visualdna.commailto:michal.ha...@visualdna.com wrote: Hi all, last couple of months I've been working on a large graph analytics and along the way have written from scratch a HBase-Spark integration as none of the ones out there worked either in terms of scale or in the way they integrated with the RDD interface. This week I have generalised it into an (almost) spark module, which works with the latest spark and the new hbase api, so... sharing! : https://github.com/michal-harish/spark-on-hbase -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.comhttp://www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR
Re: Generalised Spark-HBase integration
Cool, will revisit, is your latest code visible publicly somewhere ? On 28 July 2015 at 17:14, Ted Malaska ted.mala...@cloudera.com wrote: Yup you should be able to do that with the APIs that are going into HBase. Let me know if you need to chat about the problem and how to implement it with the HBase apis. We have tried to cover any possible way to use HBase with Spark. Let us know if we missed anything if we did we will add it. On Tue, Jul 28, 2015 at 12:12 PM, Michal Haris michal.ha...@visualdna.com wrote: Hi Ted, yes, cloudera blog and your code was my starting point - but I needed something more spark-centric rather than on hbase. Basically doing a lot of ad-hoc transformations with RDDs that were based on HBase tables and then mutating them after series of iterative (bsp-like) steps. On 28 July 2015 at 17:06, Ted Malaska ted.mala...@cloudera.com wrote: Thanks Michal, Just to share what I'm working on in a related topic. So a long time ago I build SparkOnHBase and put it into Cloudera Labs in this link. http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/ Also recently I am working on getting this into HBase core. It will hopefully be in HBase core with in the next couple of weeks. https://issues.apache.org/jira/browse/HBASE-13992 Then I'm planing on adding dataframe and bulk load support through https://issues.apache.org/jira/browse/HBASE-14149 https://issues.apache.org/jira/browse/HBASE-14150 Also if you are interested this is running today a at least a half a dozen companies with Spark Streaming. Here is one blog post of successful implementation http://blog.cloudera.com/blog/2015/03/how-edmunds-com-used-spark-streaming-to-build-a-near-real-time-dashboard/ Also here is an additional example blog I also put together http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/ Let me know if you have any questions, also let me know if you want to connect to join efforts. Ted Malaska On Tue, Jul 28, 2015 at 11:59 AM, Michal Haris michal.ha...@visualdna.com wrote: Hi all, last couple of months I've been working on a large graph analytics and along the way have written from scratch a HBase-Spark integration as none of the ones out there worked either in terms of scale or in the way they integrated with the RDD interface. This week I have generalised it into an (almost) spark module, which works with the latest spark and the new hbase api, so... sharing! : https://github.com/michal-harish/spark-on-hbase -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR
ReceiverTrackerSuite failing in master build
Hi, I noticed that ReceiverTrackerSuite is failing in master Jenkins build for both hadoop profiles. The failure seems to start with: https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-Maven-with-YARN/3104/ FYI
Generalised Spark-HBase integration
Hi all, for the last couple of months I've been working on a large graph analytics project, and along the way I have written an HBase-Spark integration from scratch, as none of the ones out there worked either in terms of scale or in the way they integrated with the RDD interface. This week I have generalised it into an (almost) Spark module, which works with the latest Spark and the new HBase API, so... sharing!: https://github.com/michal-harish/spark-on-hbase -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR
Re: Generalised Spark-HBase integration
I got a compilation error: [INFO] /home/hbase/s-on-hbase/src/main/scala:-1: info: compiling [INFO] Compiling 18 source files to /home/hbase/s-on-hbase/target/classes at 1438099569598 [ERROR] /home/hbase/s-on-hbase/src/main/scala/org/apache/spark/hbase/examples/simple/HBaseTableSimple.scala:36: error: type mismatch; [INFO] found : Int [INFO] required: Short [INFO] while (scanner.advance) numCells += 1 [INFO]^ [ERROR] one error found FYI On Tue, Jul 28, 2015 at 8:59 AM, Michal Haris michal.ha...@visualdna.com wrote: Hi all, last couple of months I've been working on a large graph analytics and along the way have written from scratch a HBase-Spark integration as none of the ones out there worked either in terms of scale or in the way they integrated with the RDD interface. This week I have generalised it into an (almost) spark module, which works with the latest spark and the new hbase api, so... sharing! : https://github.com/michal-harish/spark-on-hbase -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR
[Spark SQL]Could not read parquet table after recreating it with the same table name
Hi all, I'm using SparkSQL in Spark 1.4.1. I encounter an error when using a parquet table after recreating it; the error can be reproduced as follows:
```scala
// hc is an instance of HiveContext
hc.sql("select * from b").show() // this is ok, and b is a parquet table
val df = hc.sql("select * from a")
df.write.mode(SaveMode.Overwrite).saveAsTable("b")
hc.sql("select * from b").show() // got error
```
The error is: java.io.FileNotFoundException: File does not exist: /user/hive/warehouse/test.db/b/part-r-4-3abcbb07-e20a-4b5e-a6e5-59356c3d3149.gz.parquet at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:65) at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:55) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1716) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1659) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1639) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1613) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:497) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:322) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1144) at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1132) at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:1182) at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:218) at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:214) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:214) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:206) at org.apache.spark.sql.parquet.FilteringParquetRowInputFormat$$anonfun$getTaskSideSplits$1.apply(ParquetTableOperations.scala:625) at org.apache.spark.sql.parquet.FilteringParquetRowInputFormat$$anonfun$getTaskSideSplits$1.apply(ParquetTableOperations.scala:621) at
scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.getTaskSideSplits(ParquetTableOperations.scala:621) at org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.getSplits(ParquetTableOperations.scala:511) at parquet.hadoop.ParquetInputFormat.getSplits(ParquetInputFormat.java:245) at org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.getSplits(ParquetTableOperations.scala:464) at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$buildScan$1$$anon$1.getPartitions(newParquet.scala:305) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at
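One thing worth trying (an assumption, not a confirmed fix): the FileNotFoundException points at a file from the old version of the table, which suggests stale cached metadata, so asking the HiveContext to refresh the table before re-reading it may help.

```scala
// Possible workaround sketch: invalidate cached metadata/file listings for "b"
// after it has been overwritten, then query it again.
hc.refreshTable("b")
hc.sql("select * from b").show()
```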
Re: PySpark on PyPi
// ping do we have any signoff from the pyspark devs to submit a PR to publish to PyPI? On Fri, Jul 24, 2015 at 10:50 PM Jeremy Freeman freeman.jer...@gmail.com wrote: Hey all, great discussion, just wanted to +1 that I see a lot of value in steps that make it easier to use PySpark as an ordinary python library. You might want to check out this (https://github.com/minrk/findspark), started by Jupyter project devs, that offers one way to facilitate this stuff. I’ve also cced them here to join the conversation. Also, @Jey, I can also confirm that at least in some scenarios (I’ve done it in an EC2 cluster in standalone mode) it’s possible to run PySpark jobs just using `from pyspark import SparkContext; sc = SparkContext(master=“X”)` so long as the environmental variables (PYTHONPATH and PYSPARK_PYTHON) are set correctly on *both* workers and driver. That said, there’s definitely additional configuration / functionality that would require going through the proper submit scripts. On Jul 22, 2015, at 7:41 PM, Punyashloka Biswal punya.bis...@gmail.com wrote: I agree with everything Justin just said. An additional advantage of publishing PySpark's Python code in a standards-compliant way is the fact that we'll be able to declare transitive dependencies (Pandas, Py4J) in a way that pip can use. Contrast this with the current situation, where df.toPandas() exists in the Spark API but doesn't actually work until you install Pandas. Punya On Wed, Jul 22, 2015 at 12:49 PM Justin Uang justin.u...@gmail.com wrote: // + *Davies* for his comments // + Punya for SA For development and CI, like Olivier mentioned, I think it would be hugely beneficial to publish pyspark (only code in the python/ dir) on PyPI. If anyone wants to develop against PySpark APIs, they need to download the distribution and do a lot of PYTHONPATH munging for all the tools (pylint, pytest, IDE code completion). Right now that involves adding python/ and python/lib/py4j-0.8.2.1-src.zip. In case pyspark ever wants to add more dependencies, we would have to manually mirror all the PYTHONPATH munging in the ./pyspark script. With a proper pyspark setup.py which declares its dependencies, and a published distribution, depending on pyspark will just be adding pyspark to my setup.py dependencies. Of course, if we actually want to run parts of pyspark that is backed by Py4J calls, then we need the full spark distribution with either ./pyspark or ./spark-submit, but for things like linting and development, the PYTHONPATH munging is very annoying. I don't think the version-mismatch issues are a compelling reason to not go ahead with PyPI publishing. At runtime, we should definitely enforce that the version has to be exact, which means there is no backcompat nightmare as suggested by Davies in https://issues.apache.org/jira/browse/SPARK-1267. This would mean that even if the user got his pip installed pyspark to somehow get loaded before the spark distribution provided pyspark, then the user would be alerted immediately. *Davies*, if you buy this, should me or someone on my team pick up https://issues.apache.org/jira/browse/SPARK-1267 and https://github.com/apache/spark/pull/464? On Sat, Jun 6, 2015 at 12:48 AM Olivier Girardot o.girar...@lateral-thoughts.com wrote: Ok, I get it. 
Now what can we do to improve the current situation, because right now if I want to set-up a CI env for PySpark, I have to : 1- download a pre-built version of pyspark and unzip it somewhere on every agent 2- define the SPARK_HOME env 3- symlink this distribution pyspark dir inside the python install dir site-packages/ directory and if I rely on additional packages (like databricks' Spark-CSV project), I have to (except if I'm mistaken) 4- compile/assembly spark-csv, deploy the jar in a specific directory on every agent 5- add this jar-filled directory to the Spark distribution's additional classpath using the conf/spark-default file Then finally we can launch our unit/integration-tests. Some issues are related to spark-packages, some to the lack of python-based dependency, and some to the way SparkContext are launched when using pyspark. I think step 1 and 2 are fair enough 4 and 5 may already have solutions, I didn't check and considering spark-shell is downloading such dependencies automatically, I think if nothing's done yet it will (I guess ?). For step 3, maybe just adding a setup.py to the distribution would be enough, I'm not exactly advocating to distribute a full 300Mb spark distribution in PyPi, maybe there's a better compromise ? Regards, Olivier. Le ven. 5 juin 2015 à 22:12, Jey Kottalam j...@cs.berkeley.edu a écrit : Couldn't we have a pip installable pyspark package that just serves as a shim to an existing Spark installation? Or it could even download the latest Spark binary if SPARK_HOME isn't set during installation. Right now, Spark doesn't play very well with
Re: Broadcast variable of size 1 GB fails with negative memory exception
Hi Imran, Thanks for your reply. I have double-checked the code I ran to generate an nxn matrix and nx1 vector for n = 2^27. There was unfortunately a bug in it, where instead of having typed 134,217,728 for n = 2^27, I included a third '7' by mistake, making the size 10x larger. However, even after having corrected this, my question about broadcasting is still whether or not a variable >= 2 GB in size may be transferred. In this case, for n = 2^28, the broadcast variable crashes, and an array of size MAX_INT cannot be broadcast. Looking at Chowdhury's Performance and Scalability of Broadcast in Spark technical report, I realize that the results are reported only for broadcast variables up to 1 GB in physical size. I was hoping, however, that an Array of size MAX_INT would be transferrable via a broadcast (since the previous PR I mentioned seems to have added support for 2GB variables) such that the matrix-vector multiplication would scale to MAX_INT x MAX_INT matrices with a broadcast variable. Would you or anyone on the dev list be able to comment on whether this is possible? Since the (corrected) overflow I'm seeing is for 2^31 physical bytes being transferred, I am guessing that there is still a physical limitation on how many bytes may be sent via broadcasting, at least for a primitive Array[Double]? Thanks, Mike 19176 INFO IndexedRowMatrix Broadcasting vecArray with size 268435456 19177 INFO MemoryStore ensureFreeSpace(-2147483592) called with curMem=6888, maxMem=92610625536 19177 INFO MemoryStore Block broadcast_2 stored as values in memory (estimated size -2147483592.0 B, free 88.3 GB) Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: sizeInBytes was negative: -2147483592 On 7/28/15, Imran Rashid iras...@cloudera.com wrote: Hi Mike, are you sure the size isn't off by 2x somehow? I just tried to reproduce with a simple test in BlockManagerSuite: test("large block") { store = makeBlockManager(4e9.toLong); val arr = new Array[Double](1 << 28); println(arr.size); val blockId = BlockId("rdd_3_10"); val result = store.putIterator(blockId, Iterator(arr), StorageLevel.MEMORY_AND_DISK); result.foreach{println} } It fails at 1 << 28 with nearly the same message, but it's fine for (1 << 28) - 1, with a reported block size of 2147483680. Not exactly the same as what you did, but I expect it to be close enough to exhibit the same error. On Tue, Jul 28, 2015 at 12:37 PM, Mike Hynes 91m...@gmail.com wrote: Hello Devs, I am investigating how matrix-vector multiplication can scale for an IndexedRowMatrix in mllib.linalg.distributed. Currently, I am broadcasting the vector to be multiplied on the right. The IndexedRowMatrix is stored across a cluster with up to 16 nodes, each with 200 GB of memory. The Spark driver is on an identical node, having more than 200 GB of memory. In scaling n, the size of the vector to be broadcast, I find that the maximum size of n that I can use is 2^26. For 2^27, the broadcast will fail. The array being broadcast is of type Array[Double], so the contents have size 2^30 bytes, which is approximately 1 (metric) GB. I have read in PR [SPARK-3721] [PySpark] broadcast objects larger than 2G that this should be supported (I assume this works for Scala as well?). However, when I increase n to 2^27 or above, the program invariably crashes at the broadcast. The problem stems from the size of the result block to be sent in BlockInfo.scala; the size is reportedly negative. An example error log is shown below.
If anyone has more experience or knowledge of why this broadcast is failing, I'd appreciate the input. -- Thanks, Mike 55584:INFO:MemoryStore:ensureFreeSpace(-2147480008) called with curMem=0, maxMem=92610625536: 55584:INFO:MemoryStore:Block broadcast-2 stored as values in memory (estimated size -2147480008.0 B, free 88.3 GB): Exception in thread main java.lang.IllegalArgumentException: requirement failed: sizeInBytes was negative: -2147480008 at scala.Predef$.require(Predef.scala:233) at org.apache.spark.storage.BlockInfo.markReady(BlockInfo.scala:55) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:815) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638) at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:996) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99) at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:85) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1297) at org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix.multiply(IndexedRowMatrix.scala:184) at
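A back-of-envelope check of the sizes involved (plain arithmetic, not a statement about Spark internals): 2^28 doubles occupy 2^31 bytes, just past Int.MaxValue, which is consistent with the negative sizeInBytes in the logs above.

```scala
// 2^27 doubles -> 2^30 bytes, fits in a signed Int; 2^28 doubles -> 2^31 bytes, does not.
val bytesFor2pow27 = (1L << 27) * 8L    // 1073741824
val bytesFor2pow28 = (1L << 28) * 8L    // 2147483648 > Int.MaxValue (2147483647)
println(bytesFor2pow28 > Int.MaxValue)  // true
```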
Re: Generalised Spark-HBase integration
Yup you should be able to do that with the APIs that are going into HBase. Let me know if you need to chat about the problem and how to implement it with the HBase apis. We have tried to cover any possible way to use HBase with Spark. Let us know if we missed anything if we did we will add it. On Tue, Jul 28, 2015 at 12:12 PM, Michal Haris michal.ha...@visualdna.com wrote: Hi Ted, yes, cloudera blog and your code was my starting point - but I needed something more spark-centric rather than on hbase. Basically doing a lot of ad-hoc transformations with RDDs that were based on HBase tables and then mutating them after series of iterative (bsp-like) steps. On 28 July 2015 at 17:06, Ted Malaska ted.mala...@cloudera.com wrote: Thanks Michal, Just to share what I'm working on in a related topic. So a long time ago I build SparkOnHBase and put it into Cloudera Labs in this link. http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/ Also recently I am working on getting this into HBase core. It will hopefully be in HBase core with in the next couple of weeks. https://issues.apache.org/jira/browse/HBASE-13992 Then I'm planing on adding dataframe and bulk load support through https://issues.apache.org/jira/browse/HBASE-14149 https://issues.apache.org/jira/browse/HBASE-14150 Also if you are interested this is running today a at least a half a dozen companies with Spark Streaming. Here is one blog post of successful implementation http://blog.cloudera.com/blog/2015/03/how-edmunds-com-used-spark-streaming-to-build-a-near-real-time-dashboard/ Also here is an additional example blog I also put together http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/ Let me know if you have any questions, also let me know if you want to connect to join efforts. Ted Malaska On Tue, Jul 28, 2015 at 11:59 AM, Michal Haris michal.ha...@visualdna.com wrote: Hi all, last couple of months I've been working on a large graph analytics and along the way have written from scratch a HBase-Spark integration as none of the ones out there worked either in terms of scale or in the way they integrated with the RDD interface. This week I have generalised it into an (almost) spark module, which works with the latest spark and the new hbase api, so... sharing! : https://github.com/michal-harish/spark-on-hbase -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR
Broadcast variable of size 1 GB fails with negative memory exception
Hello Devs, I am investigating how matrix vector multiplication can scale for an IndexedRowMatrix in mllib.linalg.distributed. Currently, I am broadcasting the vector to be multiplied on the right. The IndexedRowMatrix is stored across a cluster with up to 16 nodes, each with 200 GB of memory. The spark driver is on an identical node, having more than 200 Gb of memory. In scaling n, the size of the vector to be broadcast, I find that the maximum size of n that I can use is 2^26. For 2^27, the broadcast will fail. The array being broadcast is of type Array[Double], so the contents have size 2^30 bytes, which is approximately 1 (metric) GB. I have read in PR [SPARK-3721] [PySpark] broadcast objects larger than 2G that this should be supported (I assume this works for scala, as well?). However, when I increase n to 2^27 or above, the program invariably crashes at the broadcast. The problem stems from the size of the result block to be sent in BlockInfo.scala; the size is reportedly negative. An example error log is shown below. If anyone has more experience or knowledge of why this broadcast is failing, I'd appreciate the input. -- Thanks, Mike 55584:INFO:MemoryStore:ensureFreeSpace(-2147480008) called with curMem=0, maxMem=92610625536: 55584:INFO:MemoryStore:Block broadcast-2 stored as values in memory (estimated size -2147480008.0 B, free 88.3 GB): Exception in thread main java.lang.IllegalArgumentException: requirement failed: sizeInBytes was negative: -2147480008 at scala.Predef$.require(Predef.scala:233) at org.apache.spark.storage.BlockInfo.markReady(BlockInfo.scala:55) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:815) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638) at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:996) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99) at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:85) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1297) at org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix.multiply(IndexedRowMatrix.scala:184) at himrod.linalg.KrylovTests$.main(KrylovTests.scala:172) at himrod.linalg.KrylovTests.main(KrylovTests.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:666) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:178) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:203) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:118) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: Generalised Spark-HBase integration
Stuff that people are using is here. https://github.com/cloudera-labs/SparkOnHBase The stuff going into HBase is here https://issues.apache.org/jira/browse/HBASE-13992 If you want to add things to the hbase ticket lets do it in another jira. Like these jira https://issues.apache.org/jira/browse/HBASE-14149 https://issues.apache.org/jira/browse/HBASE-14150 This first jira is mainly getting the Spark dependancies and separate module set up so we can start making additional jiras to add additional functionality. The goal is to have the following in HBase by end of summer: RDD and DStream Functions 1. BulkPut 2. BulkGet 3. BulkDelete 4. Foreach with connection 5. Map with connection 6. Distributed Scan 7. BulkLoad DataFrame Functions 1. BulkPut 2. BulkGet 6. Distributed Scan 7. BulkLoad If you think there should be more let me know Ted Malaska On Tue, Jul 28, 2015 at 12:17 PM, Michal Haris michal.ha...@visualdna.com wrote: Cool, will revisit, is your latest code visible publicly somewhere ? On 28 July 2015 at 17:14, Ted Malaska ted.mala...@cloudera.com wrote: Yup you should be able to do that with the APIs that are going into HBase. Let me know if you need to chat about the problem and how to implement it with the HBase apis. We have tried to cover any possible way to use HBase with Spark. Let us know if we missed anything if we did we will add it. On Tue, Jul 28, 2015 at 12:12 PM, Michal Haris michal.ha...@visualdna.com wrote: Hi Ted, yes, cloudera blog and your code was my starting point - but I needed something more spark-centric rather than on hbase. Basically doing a lot of ad-hoc transformations with RDDs that were based on HBase tables and then mutating them after series of iterative (bsp-like) steps. On 28 July 2015 at 17:06, Ted Malaska ted.mala...@cloudera.com wrote: Thanks Michal, Just to share what I'm working on in a related topic. So a long time ago I build SparkOnHBase and put it into Cloudera Labs in this link. http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/ Also recently I am working on getting this into HBase core. It will hopefully be in HBase core with in the next couple of weeks. https://issues.apache.org/jira/browse/HBASE-13992 Then I'm planing on adding dataframe and bulk load support through https://issues.apache.org/jira/browse/HBASE-14149 https://issues.apache.org/jira/browse/HBASE-14150 Also if you are interested this is running today a at least a half a dozen companies with Spark Streaming. Here is one blog post of successful implementation http://blog.cloudera.com/blog/2015/03/how-edmunds-com-used-spark-streaming-to-build-a-near-real-time-dashboard/ Also here is an additional example blog I also put together http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/ Let me know if you have any questions, also let me know if you want to connect to join efforts. Ted Malaska On Tue, Jul 28, 2015 at 11:59 AM, Michal Haris michal.ha...@visualdna.com wrote: Hi all, last couple of months I've been working on a large graph analytics and along the way have written from scratch a HBase-Spark integration as none of the ones out there worked either in terms of scale or in the way they integrated with the RDD interface. This week I have generalised it into an (almost) spark module, which works with the latest spark and the new hbase api, so... sharing! 
: https://github.com/michal-harish/spark-on-hbase -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR
Re: Generalised Spark-HBase integration
Sorry this is more correct RDD and DStream Functions 1. BulkPut 2. BulkGet 3. BulkDelete 4. Foreach with connection 5. Map with connection 6. Distributed Scan 7. BulkLoad DataFrame Functions 1. BulkPut 2. BulkGet 3. Foreach with connection 4. Map with connection 5. Distributed Scan 6. BulkLoad On Tue, Jul 28, 2015 at 12:23 PM, Ted Malaska ted.mala...@cloudera.com wrote: Stuff that people are using is here. https://github.com/cloudera-labs/SparkOnHBase The stuff going into HBase is here https://issues.apache.org/jira/browse/HBASE-13992 If you want to add things to the hbase ticket lets do it in another jira. Like these jira https://issues.apache.org/jira/browse/HBASE-14149 https://issues.apache.org/jira/browse/HBASE-14150 This first jira is mainly getting the Spark dependancies and separate module set up so we can start making additional jiras to add additional functionality. The goal is to have the following in HBase by end of summer: RDD and DStream Functions 1. BulkPut 2. BulkGet 3. BulkDelete 4. Foreach with connection 5. Map with connection 6. Distributed Scan 7. BulkLoad DataFrame Functions 1. BulkPut 2. BulkGet 6. Distributed Scan 7. BulkLoad If you think there should be more let me know Ted Malaska On Tue, Jul 28, 2015 at 12:17 PM, Michal Haris michal.ha...@visualdna.com wrote: Cool, will revisit, is your latest code visible publicly somewhere ? On 28 July 2015 at 17:14, Ted Malaska ted.mala...@cloudera.com wrote: Yup you should be able to do that with the APIs that are going into HBase. Let me know if you need to chat about the problem and how to implement it with the HBase apis. We have tried to cover any possible way to use HBase with Spark. Let us know if we missed anything if we did we will add it. On Tue, Jul 28, 2015 at 12:12 PM, Michal Haris michal.ha...@visualdna.com wrote: Hi Ted, yes, cloudera blog and your code was my starting point - but I needed something more spark-centric rather than on hbase. Basically doing a lot of ad-hoc transformations with RDDs that were based on HBase tables and then mutating them after series of iterative (bsp-like) steps. On 28 July 2015 at 17:06, Ted Malaska ted.mala...@cloudera.com wrote: Thanks Michal, Just to share what I'm working on in a related topic. So a long time ago I build SparkOnHBase and put it into Cloudera Labs in this link. http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/ Also recently I am working on getting this into HBase core. It will hopefully be in HBase core with in the next couple of weeks. https://issues.apache.org/jira/browse/HBASE-13992 Then I'm planing on adding dataframe and bulk load support through https://issues.apache.org/jira/browse/HBASE-14149 https://issues.apache.org/jira/browse/HBASE-14150 Also if you are interested this is running today a at least a half a dozen companies with Spark Streaming. Here is one blog post of successful implementation http://blog.cloudera.com/blog/2015/03/how-edmunds-com-used-spark-streaming-to-build-a-near-real-time-dashboard/ Also here is an additional example blog I also put together http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/ Let me know if you have any questions, also let me know if you want to connect to join efforts. 
Ted Malaska On Tue, Jul 28, 2015 at 11:59 AM, Michal Haris michal.ha...@visualdna.com wrote: Hi all, last couple of months I've been working on a large graph analytics and along the way have written from scratch a HBase-Spark integration as none of the ones out there worked either in terms of scale or in the way they integrated with the RDD interface. This week I have generalised it into an (almost) spark module, which works with the latest spark and the new hbase api, so... sharing! : https://github.com/michal-harish/spark-on-hbase -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR
Re: Custom UDFs with zero parameters support
I think we do support 0-arg UDFs: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L2165 How are you using UDFs? On Tue, Jul 28, 2015 at 2:15 AM, Sachith Withana swsach...@gmail.com wrote: Hi all, Currently I need to support custom UDFs with SparkSQL queries which have no parameters, e.g. now(), which returns the current time in milliseconds. Spark currently has support for UDFs with 1 or more parameters but does not contain a UDF0 adaptor. Is there a way to implement this? Or is there a way to support custom keywords such as now, which would act as a custom UDF with no parameters? -- Thanks, Sachith Withana
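The overload Michael links to takes a Function0, so a zero-argument UDF can at least be built and used through the DataFrame API; the sketch below assumes Spark 1.4+ and an existing DataFrame df. Whether the same function can be registered by name for use in SQL text, which is what the original question asks for, is the open point in this thread.

```scala
// Sketch: build a zero-argument UDF via functions.udf (the Function0 overload
// linked above) and call it with no columns. `df` is assumed to already exist.
import org.apache.spark.sql.functions.udf

val now = udf(() => System.currentTimeMillis())
df.select(now().as("ts")).show()
```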
Re: ReceiverTrackerSuite failing in master build
Thanks ted for pointing this out. CC to Ryan and TD On Tue, Jul 28, 2015 at 8:25 AM, Ted Yu yuzhih...@gmail.com wrote: Hi, I noticed that ReceiverTrackerSuite is failing in master Jenkins build for both hadoop profiles. The failure seems to start with: https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-Maven-with-YARN/3104/ FYI - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: Two joins in GraphX Pregel implementation
On 27 Jul 2015, at 16:42, Ulanov, Alexander alexander.ula...@hp.com wrote: It seems that the mentioned two joins can be rewritten as one outer join You're right. In fact, the outer join can be streamlined further using a method from GraphOps: g = g.joinVertices(messages)(vprog).cache() Then, instead of passing newVerts as the active set for mapReduceTriplets, we could pass `messages`. If you're interested in proposing a PR for this, I've attached a patch with these changes and updates to the comments. On Tue, Jul 28, 2015 at 1:15 AM, Ulanov, Alexander alexander.ula...@hp.com wrote: I’ve found two PRs (almost identical) for replacing mapReduceTriplets with aggregateMessages [...] Do you know the reason why this improvement is not pushed? There isn't any performance benefit to switching Pregel to use aggregateMessages while preserving its current interface, because the interface uses Iterators and would require us to wrap and unwrap them anyway. The semantics of aggregateMessagesWithActiveSet are otherwise the same as mapReduceTriplets, so there isn't any functionality we are missing out on. And this change seems too small to justify introducing a new version of Pregel, though it would be worthwhile when combined with other improvements https://github.com/apache/spark/pull/1217. Ankur http://www.ankurdave.com/ pregel-simplify-join.patch Description: Binary data - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
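Putting Ankur's two suggestions together, the superstep body would look roughly like this (reassembled from the message above; the complete change is in the attached patch, which is not reproduced here):

```scala
// Sketch of the simplified superstep: joinVertices runs vprog only where a message
// arrived, and `messages` itself becomes the active set for the next round.
g = g.joinVertices(messages)(vprog).cache()
messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((messages, activeDir))).cache()
activeMessages = messages.count()
```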
update on git timeouts for jenkins builds
hey all, i'm just back in from my wedding weekend (woot!) and am working on figuring out what's happening w/the git timeouts for pull request builds. TL;DR: if your build fails due to a timeout, please retrigger your builds. i know this isn't the BEST solution, but until we get some stuff implemented (traffic shaping, git cache for the workers) it's the only thing i can recommend. here's a snapshot of the state of the union: $ get_timeouts.sh 5 timeouts by date: 2015-07-23 -- 3 2015-07-24 -- 1 2015-07-26 -- 7 2015-07-27 -- 18 2015-07-28 -- 9 timeouts by project: 35 SparkPullRequestBuilder 3 Tachyon-Pull-Request-Builder total builds (excepting aborted by a user): 1908 total percentage of builds timing out: 01% nothing has changed on our end AFAIK, our traffic graphs look totally fine, but starting sunday, we started seeing a spike in timeouts, with yesterday being the worst. today is also not looking good either. github is looking OK, but not great: https://status.github.com/ as a solution, we'll be setting up some traffic shaping on our end, as well as implementing a git cache on the workers so that we'll (hopefully) minimize how many hits we make against github. i was planning on doing the git cache months ago, but the timeout issue pretty much went away and i back-burnered that idea until today. other than that, i'll be posting updates as we get them. shane - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: update on git timeouts for jenkins builds
btw, the directory perm issue was only happening on amp-jenkins-worker-04 and -05. both of the broken dirs were clobbered, so we won't be seeing any more of these again. On Tue, Jul 28, 2015 at 12:28 PM, shane knapp skn...@berkeley.edu wrote: ++joshrosen ok, i found out some of what's going on. some builds were failing as such: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38749/console note that it's unable to remove the target/ directory during the build... this is caused by 'git clean -fdx' running, and deep in the target directory there were a couple of dirs that had the wrong permission bits set: dr-xr-xr-x. 2 jenkins jenkins 4096 Jul 27 06:54 /home/jenkins/workspace/SparkPullRequestBuilder/target/tmp/spark-615f93cc-27ad-464b-b0d4-4352c96c22ee note the missing 'w' on the owner bits. this is what was causing those failures. after manually deleting the two entries that i found (using the command below), we've whacked this mole for now. for x in $(cat jenkins_workers.txt); do echo $x; ssh $x find /home/jenkins/workspace/SparkPullRequestBuilder*/target/tmp -maxdepth 3| xargs ls -ld | egrep ^dr-x; echo; echo; done as for what exactly is messing up the perms, i'm not entirely sure. josh, you have any ideas? shane On Tue, Jul 28, 2015 at 11:51 AM, shane knapp skn...@berkeley.edu wrote: hey all, i'm just back in from my wedding weekend (woot!) and am working on figuring out what's happening w/the git timeouts for pull request builds. TL;DR: if your build fails due to a timeout, please retrigger your builds. i know this isn't the BEST solution, but until we get some stuff implemented (traffic shaping, git cache for the workers) it's the only thing i can recommend. here's a snapshot of the state of the union: $ get_timeouts.sh 5 timeouts by date: 2015-07-23 -- 3 2015-07-24 -- 1 2015-07-26 -- 7 2015-07-27 -- 18 2015-07-28 -- 9 timeouts by project: 35 SparkPullRequestBuilder 3 Tachyon-Pull-Request-Builder total builds (excepting aborted by a user): 1908 total percentage of builds timing out: 01% nothing has changed on our end AFAIK, our traffic graphs look totally fine, but starting sunday, we started seeing a spike in timeouts, with yesterday being the worst. today is also not looking good either. github is looking OK, but not great: https://status.github.com/ as a solution, we'll be setting up some traffic shaping on our end, as well as implementing a git cache on the workers so that we'll (hopefully) minimize how many hits we make against github. i was planning on doing the git cache months ago, but the timeout issue pretty much went away and i back-burnered that idea until today. other than that, i'll be posting updates as we get them. shane - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Opinion on spark-class script simplification and posix compliance
Hi, Out of curiosity, I have tried to replace the dependence on bash with sh in the various scripts that launch Spark daemons and jobs. So far, most scripts work with sh, except bin/spark-class. The culprit is the while loop that composes the final command by parsing the output of the launcher library. After a bit of fiddling, I found that the following loop: CMD=(); while IFS= read -d '' -r ARG; do CMD+=("$ARG"); done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@") could be replaced by a single line: CMD=$("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@" | xargs -0) The while loop cannot be executed with sh, while the single line can be. Since on my system, sh is simply a link to bash, with some options activated, I guess this simply means that the while loop syntax is not POSIX compliant. Which spawns two questions: 1. Would it be useful to make sure Spark scripts are POSIX compliant? 2. Is the simplification of spark-class enough to consider making a pull request? Thanks, Felix-Antoine Fortin - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: update on git timeouts for jenkins builds
++joshrosen ok, i found out some of what's going on. some builds were failing as such: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38749/console note that it's unable to remove the target/ directory during the build... this is caused by 'git clean -fdx' running, and deep in the target directory there were a couple of dirs that had the wrong permission bits set: dr-xr-xr-x. 2 jenkins jenkins 4096 Jul 27 06:54 /home/jenkins/workspace/SparkPullRequestBuilder/target/tmp/spark-615f93cc-27ad-464b-b0d4-4352c96c22ee note the missing 'w' on the owner bits. this is what was causing those failures. after manually deleting the two entries that i found (using the command below), we've whacked this mole for now. for x in $(cat jenkins_workers.txt); do echo $x; ssh $x find /home/jenkins/workspace/SparkPullRequestBuilder*/target/tmp -maxdepth 3| xargs ls -ld | egrep ^dr-x; echo; echo; done as for what exactly is messing up the perms, i'm not entirely sure. josh, you have any ideas? shane On Tue, Jul 28, 2015 at 11:51 AM, shane knapp skn...@berkeley.edu wrote: hey all, i'm just back in from my wedding weekend (woot!) and am working on figuring out what's happening w/the git timeouts for pull request builds. TL;DR: if your build fails due to a timeout, please retrigger your builds. i know this isn't the BEST solution, but until we get some stuff implemented (traffic shaping, git cache for the workers) it's the only thing i can recommend. here's a snapshot of the state of the union: $ get_timeouts.sh 5 timeouts by date: 2015-07-23 -- 3 2015-07-24 -- 1 2015-07-26 -- 7 2015-07-27 -- 18 2015-07-28 -- 9 timeouts by project: 35 SparkPullRequestBuilder 3 Tachyon-Pull-Request-Builder total builds (excepting aborted by a user): 1908 total percentage of builds timing out: 01% nothing has changed on our end AFAIK, our traffic graphs look totally fine, but starting sunday, we started seeing a spike in timeouts, with yesterday being the worst. today is also not looking good either. github is looking OK, but not great: https://status.github.com/ as a solution, we'll be setting up some traffic shaping on our end, as well as implementing a git cache on the workers so that we'll (hopefully) minimize how many hits we make against github. i was planning on doing the git cache months ago, but the timeout issue pretty much went away and i back-burnered that idea until today. other than that, i'll be posting updates as we get them. shane - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: Opinion on spark-class script simplification and posix compliance
On Tue, Jul 28, 2015 at 12:13 PM, Félix-Antoine Fortin felix-antoine.for...@calculquebec.ca wrote: The while loop cannot be executed with sh, while the single line can be. Since on my system, sh is simply a link on bash, with some options activated, I guess this simply means that the while loop syntax is not posix compliant. Which spawns two questions: 1. Would it be useful to make sure Spark scripts are POSIX compliant? I guess if people are trying to run Spark on systems where bash is not super common it could help. Not sure how common that use case is. Otherwise, it seems like it would be more noise than actually useful... Also note that bash, when executed as /bin/sh, still allows syntax that other shells do not necessarily understand. A better test would be to try to run the script with dash or something else that is not bash. -- Marcelo
Re: Rebase and Squash Commits to Revise PR?
Thanks Sean. Very helpful! On Tue, Jul 28, 2015 at 1:49 PM, Sean Owen so...@cloudera.com wrote: You only need to rebase if your branch/PR now conflicts with master. you don't need to squash since the merge script will do that in the end for you. You can squash commits and force-push if you think it would help clean up your intent, but, often it's clearer to leave the review and commit history of your branch since the review comments go along with it. On Tue, Jul 28, 2015 at 9:46 PM, Meihua Wu rotationsymmetr...@gmail.com wrote: I am planning to update my PR to incorporate comments from reviewers. Do I need to rebase/squash the commits into a single one? Thanks! -MW - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: Broadcast variable of size 1 GB fails with negative memory exception
Hi Mike, are you sure the size isn't off by 2x somehow? I just tried to reproduce with a simple test in BlockManagerSuite: test("large block") { store = makeBlockManager(4e9.toLong); val arr = new Array[Double](1 << 28); println(arr.size); val blockId = BlockId("rdd_3_10"); val result = store.putIterator(blockId, Iterator(arr), StorageLevel.MEMORY_AND_DISK); result.foreach{println} } It fails at 1 << 28 with nearly the same message, but it's fine for (1 << 28) - 1, with a reported block size of 2147483680. Not exactly the same as what you did, but I expect it to be close enough to exhibit the same error. On Tue, Jul 28, 2015 at 12:37 PM, Mike Hynes 91m...@gmail.com wrote: Hello Devs, I am investigating how matrix-vector multiplication can scale for an IndexedRowMatrix in mllib.linalg.distributed. Currently, I am broadcasting the vector to be multiplied on the right. The IndexedRowMatrix is stored across a cluster with up to 16 nodes, each with 200 GB of memory. The Spark driver is on an identical node, having more than 200 GB of memory. In scaling n, the size of the vector to be broadcast, I find that the maximum size of n that I can use is 2^26. For 2^27, the broadcast will fail. The array being broadcast is of type Array[Double], so the contents have size 2^30 bytes, which is approximately 1 (metric) GB. I have read in PR [SPARK-3721] [PySpark] broadcast objects larger than 2G that this should be supported (I assume this works for Scala as well?). However, when I increase n to 2^27 or above, the program invariably crashes at the broadcast. The problem stems from the size of the result block to be sent in BlockInfo.scala; the size is reportedly negative. An example error log is shown below. If anyone has more experience or knowledge of why this broadcast is failing, I'd appreciate the input.
-- Thanks, Mike 55584:INFO:MemoryStore:ensureFreeSpace(-2147480008) called with curMem=0, maxMem=92610625536: 55584:INFO:MemoryStore:Block broadcast-2 stored as values in memory (estimated size -2147480008.0 B, free 88.3 GB): Exception in thread main java.lang.IllegalArgumentException: requirement failed: sizeInBytes was negative: -2147480008 at scala.Predef$.require(Predef.scala:233) at org.apache.spark.storage.BlockInfo.markReady(BlockInfo.scala:55) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:815) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638) at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:996) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99) at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:85) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1297) at org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix.multiply(IndexedRowMatrix.scala:184) at himrod.linalg.KrylovTests$.main(KrylovTests.scala:172) at himrod.linalg.KrylovTests.main(KrylovTests.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:666) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:178) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:203) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:118) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
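For readers following the numbers in this thread, a quick back-of-the-envelope check (runnable in a Scala REPL) of where an Array[Double] crosses the signed 32-bit boundary; the exact figure in the log (-2147480008) presumably also includes object overhead from Spark's size estimator, so it need not match to the byte:

  // Sizes referenced in this thread, computed with 64-bit arithmetic (8 bytes per Double):
  val mikeFails  = (1L << 27) * 8         // 1073741824 bytes (1 GiB): Mike's first failing vector size
  val imranOk    = ((1L << 28) - 1) * 8   // 2147483640 bytes: just under the 2 GiB line, Imran's passing case
  val imranFails = (1L << 28) * 8         // 2147483648 bytes: one past Int.MaxValue (2147483647)

  println(imranFails.toInt)               // -2147483648: wraps negative when forced into a signed Int,
                                          // matching the failure mode of the negative size in the log above

Mike's crash at a nominal 2^27 doubles (1 GiB) is only half of that limit, which is what Imran's "off by 2x" question is getting at.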
Re: DataFrame#rdd doesn't respect DataFrame#cache, slowing down CrossValidator
Thanks for bringing this up! I talked with Michael Armbrust, and it sounds like this is from a bug in DataFrame caching: https://issues.apache.org/jira/browse/SPARK-9141 It's marked as a blocker for 1.5. Joseph On Tue, Jul 28, 2015 at 2:36 AM, Justin Uang justin.u...@gmail.com wrote: Hey guys, I'm running into some pretty bad performance issues when it comes to using a CrossValidator, because of the caching behavior of DataFrames. The root of the problem is that while I have cached my DataFrame representing the features and labels, it is caching at the DataFrame level, while CrossValidator/LogisticRegression both drop down to the dataset.rdd level, which ignores the caching that I have previously done. This is worsened by the fact that for each combination of a fold and a param set from the grid, it recomputes my entire input dataset because the caching was lost. My current solution is to force the input DataFrame to be based off of a cached RDD, which I did with this horrible hack (had to drop down to Java from PySpark because of something to do with vectors not being inferred correctly): def checkpoint_dataframe_caching(df): return DataFrame(sqlContext._ssql_ctx.createDataFrame(df._jdf.rdd().cache(), train_data._jdf.schema()), sqlContext) before I pass it into CrossValidator.fit(). If I do this, I still have to cache the underlying rdd once more than necessary (in addition to DataFrame#cache()), but at least in cross validation, it doesn't recompute the RDD graph anymore. Note that input_df.rdd.cache() doesn't work, because the Python CrossValidator implementation applies some more DataFrame transformations like filter, which then causes filtered_df.rdd to return a completely different RDD that recomputes the entire graph. Is it the intention of Spark SQL that calling DataFrame#rdd removes any caching that was done for the query? Is the fix as simple as getting DataFrame#rdd to reference the cached query, or is there something more subtle going on? Best, Justin
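For anyone who wants to try the same workaround from Scala instead of PySpark, a rough equivalent might look like this (illustrative sketch only; checkpointDataFrameCaching is a made-up name, and whether it is still needed depends on how SPARK-9141 is resolved):

  import org.apache.spark.sql.{DataFrame, SQLContext}

  // Rebuild the DataFrame on top of an explicitly cached RDD[Row], so that later
  // .rdd accesses (e.g. inside CrossValidator) reuse the cached rows instead of
  // recomputing the whole upstream plan.
  def checkpointDataFrameCaching(df: DataFrame, sqlContext: SQLContext): DataFrame = {
    val cachedRows = df.rdd.cache()                    // cache at the RDD level, not the DataFrame level
    sqlContext.createDataFrame(cachedRows, df.schema)  // new DataFrame whose lineage starts at the cached RDD
  }

As with Justin's Python hack, the data can still end up cached twice if DataFrame#cache() is also called, but later filter/.rdd accesses inside CrossValidator should stop recomputing the full input plan because their lineage bottoms out at the cached RDD.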
Re: update on git timeouts for jenkins builds
git caches are set up on all workers for the pull request builder, and builds are building w/the cache... however in the build logs it doesn't seem to be actually *hitting* the cache, so i guess i'll be doing some more poking and prodding to see wtf is going on. On Tue, Jul 28, 2015 at 12:49 PM, shane knapp skn...@berkeley.edu wrote: btw, the directory perm issue was only happening on amp-jenkins-worker-04 and -05. both of the broken dirs were clobbered, so we won't be seeing any more of these again. On Tue, Jul 28, 2015 at 12:28 PM, shane knapp skn...@berkeley.edu wrote: ++joshrosen ok, i found out some of what's going on. some builds were failing as such: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38749/console note that it's unable to remove the target/ directory during the build... this is caused by 'git clean -fdx' running, and deep in the target directory there were a couple of dirs that had the wrong permission bits set: dr-xr-xr-x. 2 jenkins jenkins 4096 Jul 27 06:54 /home/jenkins/workspace/SparkPullRequestBuilder/target/tmp/spark-615f93cc-27ad-464b-b0d4-4352c96c22ee note the missing 'w' on the owner bits. this is what was causing those failures. after manually deleting the two entries that i found (using the command below), we've whacked this mole for now. for x in $(cat jenkins_workers.txt); do echo $x; ssh $x find /home/jenkins/workspace/SparkPullRequestBuilder*/target/tmp -maxdepth 3| xargs ls -ld | egrep ^dr-x; echo; echo; done as for what exactly is messing up the perms, i'm not entirely sure. josh, you have any ideas? shane On Tue, Jul 28, 2015 at 11:51 AM, shane knapp skn...@berkeley.edu wrote: hey all, i'm just back in from my wedding weekend (woot!) and am working on figuring out what's happening w/the git timeouts for pull request builds. TL;DR: if your build fails due to a timeout, please retrigger your builds. i know this isn't the BEST solution, but until we get some stuff implemented (traffic shaping, git cache for the workers) it's the only thing i can recommend. here's a snapshot of the state of the union: $ get_timeouts.sh 5 timeouts by date: 2015-07-23 -- 3 2015-07-24 -- 1 2015-07-26 -- 7 2015-07-27 -- 18 2015-07-28 -- 9 timeouts by project: 35 SparkPullRequestBuilder 3 Tachyon-Pull-Request-Builder total builds (excepting aborted by a user): 1908 total percentage of builds timing out: 01% nothing has changed on our end AFAIK, our traffic graphs look totally fine, but starting sunday, we started seeing a spike in timeouts, with yesterday being the worst. today is also not looking good either. github is looking OK, but not great: https://status.github.com/ as a solution, we'll be setting up some traffic shaping on our end, as well as implementing a git cache on the workers so that we'll (hopefully) minimize how many hits we make against github. i was planning on doing the git cache months ago, but the timeout issue pretty much went away and i back-burnered that idea until today. other than that, i'll be posting updates as we get them. shane - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Rebase and Squash Commits to Revise PR?
I am planning to update my PR to incorporate comments from reviewers. Do I need to rebase/squash the commits into a single one? Thanks! -MW - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: Rebase and Squash Commits to Revise PR?
You only need to rebase if your branch/PR now conflicts with master. you don't need to squash since the merge script will do that in the end for you. You can squash commits and force-push if you think it would help clean up your intent, but, often it's clearer to leave the review and commit history of your branch since the review comments go along with it. On Tue, Jul 28, 2015 at 9:46 PM, Meihua Wu rotationsymmetr...@gmail.com wrote: I am planning to update my PR to incorporate comments from reviewers. Do I need to rebase/squash the commits into a single one? Thanks! -MW - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: DataFrame#rdd doesn't respect DataFrame#cache, slowing down CrossValidator
Sweet! Does this cover DataFrame#rdd also using the cached query from DataFrame#cache? I think the ticket 9141 is mainly concerned with whether a derived DataFrame (B) of a cached DataFrame (A) uses the cached query of A, not whether the rdd from A.rdd or B.rdd uses the cached query of A. On Tue, Jul 28, 2015 at 11:33 PM Joseph Bradley jos...@databricks.com wrote: Thanks for bringing this up! I talked with Michael Armbrust, and it sounds like this is a from a bug in DataFrame caching: https://issues.apache.org/jira/browse/SPARK-9141 It's marked as a blocker for 1.5. Joseph On Tue, Jul 28, 2015 at 2:36 AM, Justin Uang justin.u...@gmail.com wrote: Hey guys, I'm running into some pretty bad performance issues when it comes to using a CrossValidator, because of caching behavior of DataFrames. The root of the problem is that while I have cached my DataFrame representing the features and labels, it is caching at the DataFrame level, while CrossValidator/LogisticRegression both drop down to the dataset.rdd level, which ignores the caching that I have previously done. This is worsened by the fact that for each combination of a fold and a param set from the grid, it recomputes my entire input dataset because the caching was lost. My current solution is to force the input DataFrame to be based off of a cached RDD, which I did with this horrible hack (had to drop down to java from the pyspark because of something to do with vectors not be inferred correctly): def checkpoint_dataframe_caching(df): return DataFrame(sqlContext._ssql_ctx.createDataFrame(df._jdf.rdd().cache(), train_data._jdf.schema()), sqlContext) before I pass it into the CrossValidator.fit(). If I do this, I still have to cache the underlying rdd once more than necessary (in addition to DataFrame#cache()), but at least in cross validation, it doesn't recompute the RDD graph anymore. Note, that input_df.rdd.cache() doesn't work because the python CrossValidator implementation applies some more dataframe transformations like filter, which then causes filtered_df.rdd to return a completely different rdd that recomputes the entire graph. Is it the intention of Spark SQL that calling DataFrame#rdd removes any caching that was done for the query? Is the fix as simple as getting the DataFrame#rdd to reference the cached query, or is there something more subtle going on. Best, Justin