Re: Minimum cost flow problem solving in Spark
You might be interested in "Maximum Flow implementation on Spark GraphX" done by a Colorado School of Mines grad student a couple of years ago. http://datascienceassn.org/2016-01-27-maximum-flow-implementation-spark-graphx From: Swapnil ShindeTo: user@spark.apache.org; d...@spark.apache.org Sent: Wednesday, September 13, 2017 9:41 AM Subject: Minimum cost flow problem solving in Spark Hello Has anyone used Spark to solve minimum cost flow problems in Spark? I am quite new to combinatorial optimization algorithms so any help or suggestions, libraries are very appreciated. Thanks Swapnil - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Shortest path with directed and weighted graphs
Chapter 6 of my book implements Dijkstra's Algorithm. The source code is available to download for free. https://www.manning.com/books/spark-graphx-in-action From: Brian WilsonTo: user@spark.apache.org Sent: Monday, October 24, 2016 7:11 AM Subject: Shortest path with directed and weighted graphs I have been looking at the ShortestPaths function inbuilt with Spark here. Am I correct in saying there is no support for weighted graphs with this function? By that I mean that it assumes all edges carry a weight = 1 Many thanks Brian
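For reference, the built-in graphx.lib.ShortestPaths does indeed treat every edge as distance 1. A minimal sketch of a weighted variant using the Pregel API, assuming a Graph whose edge attribute is a Double weight (the function name and signature here are my own, not part of GraphX):

import org.apache.spark.graphx._

def weightedShortestPaths[VD](graph: Graph[VD, Double], source: VertexId): Graph[Double, Double] = {
  // Start every vertex at infinity except the source.
  val init = graph.mapVertices((id, _) => if (id == source) 0.0 else Double.PositiveInfinity)
  init.pregel(Double.PositiveInfinity)(
    (_, dist, newDist) => math.min(dist, newDist),          // vertex program: keep the shorter distance
    triplet =>                                              // send message: relax edges that improve the estimate
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      else Iterator.empty,
    (a, b) => math.min(a, b)                                // merge messages
  )
}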
Re: GraphX drawing algorithm
In chapter 10 of Spark GraphX In Action, we describe how to use Zeppelin with d3.js to render graphs using d3's force-directed rendering algorithm. The source code can be downloaded for free from https://www.manning.com/books/spark-graphx-in-action From: agc studioTo: user@spark.apache.org Sent: Sunday, September 11, 2016 5:59 PM Subject: GraphX drawing algorithm Hi all, I was wondering if a force-directed graph drawing algorithm has been implemented for graphX? Thanks
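The general pattern there is to collect a (small) graph's edges on the driver and emit JSON that d3's force layout can consume. A rough sketch, assuming an already-built Graph named graph; the "source"/"target" field names follow d3's convention, not anything in GraphX:

val edgesJson = graph.edges.collect()
  .map(e => s"""{"source": ${e.srcId}, "target": ${e.dstId}}""")
  .mkString("[", ",", "]")
println(edgesJson)   // paste into a d3 force-layout template, or bind it into a Zeppelin %angular paragraph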
Re: Where is DataFrame.scala in 2.0?
It's been reduced to a single line of code. http://technicaltidbit.blogspot.com/2016/03/dataframedataset-swap-places-in-spark-20.html From: Gerhard Fiedler To: "dev@spark.apache.org" Sent: Friday, June 3, 2016 9:01 AM Subject: Where is DataFrame.scala in 2.0? When I look at the sources in Github, I see DataFrame.scala at https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala in the 1.6 branch. But when I change the branch to branch-2.0 or master, I get a 404 error. I also can’t find the file in the directory listings, for example https://github.com/apache/spark/tree/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql (for branch-2.0). It seems that quite a few APIs use the DataFrame class, even in 2.0. Can someone please point me to its location, or otherwise explain why it is not there? Thanks, Gerhard
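For the curious, the single line lives in the sql package object (sql/core/src/main/scala/org/apache/spark/sql/package.scala) on branch-2.0, roughly:

package object sql {
  type DataFrame = Dataset[Row]   // DataFrame is now just a type alias for Dataset of Row
}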
Re: GraphX Java API
Yes, it is possible to use GraphX from Java but it requires 10x the amount of code and involves using obscure typing and pre-defined lambda prototype facilities. I give an example of it in my book, the source code for which can be downloaded for free from https://www.manning.com/books/spark-graphx-in-action The relevant example is EdgeCount.java in chapter 10. As I suggest in my book, likely the only reason you'd want to put yourself through that torture is corporate mandate or compatibility with Java bytecode tools. From: Sean Owen To: Takeshi Yamamuro ; "Kumar, Abhishek (US - Bengaluru)" Cc: "user@spark.apache.org" Sent: Monday, May 30, 2016 7:07 AM Subject: Re: GraphX Java API No, you can call any Scala API in Java. It is somewhat less convenient if the method was not written with Java in mind but does work. On Mon, May 30, 2016, 00:32 Takeshi Yamamuro wrote: These packages are used only for Scala. On Mon, May 30, 2016 at 2:23 PM, Kumar, Abhishek (US - Bengaluru) wrote: Hey, · I see some graphx packages listed here: http://spark.apache.org/docs/latest/api/java/index.html · org.apache.spark.graphx · org.apache.spark.graphx.impl · org.apache.spark.graphx.lib · org.apache.spark.graphx.util Aren’t they meant to be used with JAVA? Thanks From: Santoshakhilesh [mailto:santosh.akhil...@huawei.com] Sent: Friday, May 27, 2016 4:52 PM To: Kumar, Abhishek (US - Bengaluru) ; user@spark.apache.org Subject: RE: GraphX Java API GraphX APIs are available only in Scala. If you need to use GraphX you need to switch to Scala. From: Kumar, Abhishek (US - Bengaluru) [mailto:abhishekkuma...@deloitte.com] Sent: 27 May 2016 19:59 To: user@spark.apache.org Subject: GraphX Java API Hi, We are trying to consume the Java API for GraphX, but there is no documentation available online on the usage or examples. It would be great if we could get some examples in Java. Thanks and regards, Abhishek Kumar This message (including any attachments) contains confidential information intended for a specific individual and purpose, and is protected by law. If you are not the intended recipient, you should delete this message and any disclosure, copying, or distribution of this message, or the taking of any action based on it, by you is strictly prohibited. v.E.1 -- --- Takeshi Yamamuro
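To make the 10x concrete: most GraphX methods take implicit ClassTags that the Scala compiler supplies silently, whereas Java code has to pass each one explicitly (e.g. via scala.reflect.ClassTag$.MODULE$.apply(String.class)) and, pre-Scala-2.12, wrap every lambda in the matching scala.Function interface. For comparison, a sketch of the Scala side (the toy edge data is mine):

import org.apache.spark.graphx.{Edge, Graph}

val edges = sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "follows")))
val graph = Graph.fromEdges(edges, defaultValue = "user")   // implicit ClassTags supplied by the compiler
println(graph.triplets.count())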
Re: Adhoc queries on Spark 2.0 with Structured Streaming
At first glance, it looks like the only streaming data sources available out of the box from the github master branch are https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala and https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala . Out of the Jira epic for Structured Streaming https://issues.apache.org/jira/browse/SPARK-8360 it would seem the still-open https://issues.apache.org/jira/browse/SPARK-10815 "API design: data sources and sinks" is relevant here. In short, it would seem the code is not there yet to create a Kafka-fed Dataframe/Dataset that can be queried with Structured Streaming; or if it is, it's not obvious how to write such code. From: Anthony MayTo: Deepak Sharma ; Sunita Arvind Cc: "user@spark.apache.org" Sent: Friday, May 6, 2016 11:50 AM Subject: Re: Adhoc queries on Spark 2.0 with Structured Streaming Yeah, there isn't even a RC yet and no documentation but you can work off the code base and test suites: https://github.com/apache/spark And this might help: https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala On Fri, 6 May 2016 at 11:07 Deepak Sharma wrote: Spark 2.0 is yet to come out for public release. I am waiting to get hands on it as well. Please do let me know if i can download source and build spark2.0 from github. Thanks Deepak On Fri, May 6, 2016 at 9:51 PM, Sunita Arvind wrote: Hi All, We are evaluating a few real time streaming query engines and spark is my personal choice. The addition of adhoc queries is what is getting me further excited about it, however the talks I have heard so far only mention about it but do not provide details. I need to build a prototype to ensure it works for our use cases. Can someone point me to relevant material for this. regards Sunita -- Thanks Deepak www.bigdatabig.com www.keosha.net
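For anyone following along after the fact, the file source can be exercised along these lines with the API that eventually shipped in the 2.0 release (it was still in flux when this thread was written; spark is the 2.0 SparkSession and the input path is just a placeholder):

val lines = spark.readStream.text("/path/to/incoming")   // FileStreamSource under the hood
val counts = lines.groupBy("value").count()
val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
query.awaitTermination()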
Re: Spark 2.0 forthcoming features
http://go.databricks.com/apache-spark-2.0-presented-by-databricks-co-founder-reynold-xin From: Sourav Mazumder To: user Sent: Wednesday, April 20, 2016 11:07 AM Subject: Spark 2.0 forthcoming features Hi All, Is there somewhere we can get an idea of the upcoming features in Spark 2.0? I got a list for Spark ML from here https://issues.apache.org/jira/browse/SPARK-12626. Are there other links where I can find similar enhancements planned for Spark SQL, Spark Core, Spark Streaming, GraphX etc.? Thanks in advance. Regards, Sourav
Re: Apache Flink
As with all history, "what if"s are not scientifically testable hypotheses, but my speculation is the energy (VCs, startups, big Internet companies, universities) within Silicon Valley contrasted to Germany. From: Mich Talebzadeh <mich.talebza...@gmail.com> To: Michael Malak <michaelma...@yahoo.com>; "user @spark" <user@spark.apache.org> Sent: Sunday, April 17, 2016 3:55 PM Subject: Re: Apache Flink Assuming that both Spark and Flink are contemporaries what are the reasons that Flink has not been adopted widely? (this may sound obvious and or prejudged). I mean Spark has surged in popularity in the past year if I am correct Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com On 17 April 2016 at 22:49, Michael Malak <michaelma...@yahoo.com> wrote: In terms of publication date, a paper on Nephele was published in 2009, prior to the 2010 USENIX paper on Spark. Nephele is the execution engine of Stratosphere, which became Flink. From: Mark Hamstra <m...@clearstorydata.com> To: Mich Talebzadeh <mich.talebza...@gmail.com> Cc: Corey Nolet <cjno...@gmail.com>; "user @spark" <user@spark.apache.org> Sent: Sunday, April 17, 2016 3:30 PM Subject: Re: Apache Flink To be fair, the Stratosphere project from which Flink springs was started as a collaborative university research project in Germany about the same time that Spark was first released as Open Source, so they are near contemporaries rather than Flink having been started only well after Spark was an established and widely-used Apache project. On Sun, Apr 17, 2016 at 2:25 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote: Also it always amazes me why they are so many tangential projects in Big Data space? Would not it be easier if efforts were spent on adding to Spark functionality rather than creating a new product like Flink? Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com On 17 April 2016 at 21:08, Mich Talebzadeh <mich.talebza...@gmail.com> wrote: Thanks Corey for the useful info. I have used Sybase Aleri and StreamBase as commercial CEPs engines. However, there does not seem to be anything close to these products in Hadoop Ecosystem. So I guess there is nothing there? Regards. Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com On 17 April 2016 at 20:43, Corey Nolet <cjno...@gmail.com> wrote: i have not been intrigued at all by the microbatching concept in Spark. I am used to CEP in real streams processing environments like Infosphere Streams & Storm where the granularity of processing is at the level of each individual tuple and processing units (workers) can react immediately to events being received and processed. The closest Spark streaming comes to this concept is the notion of "state" that that can be updated via the "updateStateBykey()" functions which are only able to be run in a microbatch. Looking at the expected design changes to Spark Streaming in Spark 2.0.0, it also does not look like tuple-at-a-time processing is on the radar for Spark, though I have seen articles stating that more effort is going to go into the Spark SQL layer in Spark streaming which may make it more reminiscent of Esper. For these reasons, I have not even tried to implement CEP in Spark. I feel it's a waste of time without immediate tuple-at-a-time processing. 
Without this, they avoid the whole problem of "back pressure" (though keep in mind, it is still very possible to overload the Spark streaming layer with stages that will continue to pile up and never get worked off) but they lose the granular control that you get in CEP environments by allowing the rules & processors to react with the receipt of each tuple, right away. Awhile back, I did attempt to implement an InfoSphere Streams-like API [1] on top of Apache Storm as an example of what such a design may look like. It looks like Storm is going to be replaced in the not so distant future by Twitter's new design called Heron. IIRC, Heron does not have an open source implementation as of yet. [1] https://github.com/calrissian/flowmix On Sun, Apr 17, 2016 at 3:11 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote: Hi Corey, Can you please point me to docs on using Spark for CEP? Do we have a set of CEP libraries somewhere. I am keen on getting hold of adaptor libraries for Spark something like below Thanks Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com On 17 April 2016 at 16:07, Corey Nolet <cjno...@gma
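For readers unfamiliar with the micro-batch state pattern Corey refers to: updateStateByKey folds each batch's new values into per-key running state, but only at batch boundaries, never per-tuple. A minimal sketch (host, port, batch interval, and checkpoint path are all illustrative):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint("/tmp/checkpoints")                      // required for stateful DStream operations
val words = ssc.socketTextStream("localhost", 9999).map(w => (w, 1))
val runningCounts = words.updateStateByKey[Int] { (batch: Seq[Int], state: Option[Int]) =>
  Some(state.getOrElse(0) + batch.sum)                  // state is updated once per micro-batch
}
runningCounts.print()
ssc.start()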
Re: Apache Flink
There have been commercial CEP solutions for decades, including from my employer. From: Mich TalebzadehTo: Mark Hamstra Cc: Corey Nolet ; "user @spark" Sent: Sunday, April 17, 2016 3:48 PM Subject: Re: Apache Flink The problem is that the strength and wider acceptance of a typical Open source project is its sizeable user and development community. When the community is small like Flink, then it is not a viable solution to adopt I am rather disappointed that no big data project can be used for Complex Event Processing as it has wider use in Algorithmic trading among others. Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com On 17 April 2016 at 22:30, Mark Hamstra wrote: To be fair, the Stratosphere project from which Flink springs was started as a collaborative university research project in Germany about the same time that Spark was first released as Open Source, so they are near contemporaries rather than Flink having been started only well after Spark was an established and widely-used Apache project. On Sun, Apr 17, 2016 at 2:25 PM, Mich Talebzadeh wrote: Also it always amazes me why they are so many tangential projects in Big Data space? Would not it be easier if efforts were spent on adding to Spark functionality rather than creating a new product like Flink? Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com On 17 April 2016 at 21:08, Mich Talebzadeh wrote: Thanks Corey for the useful info. I have used Sybase Aleri and StreamBase as commercial CEPs engines. However, there does not seem to be anything close to these products in Hadoop Ecosystem. So I guess there is nothing there? Regards. Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com On 17 April 2016 at 20:43, Corey Nolet wrote: i have not been intrigued at all by the microbatching concept in Spark. I am used to CEP in real streams processing environments like Infosphere Streams & Storm where the granularity of processing is at the level of each individual tuple and processing units (workers) can react immediately to events being received and processed. The closest Spark streaming comes to this concept is the notion of "state" that that can be updated via the "updateStateBykey()" functions which are only able to be run in a microbatch. Looking at the expected design changes to Spark Streaming in Spark 2.0.0, it also does not look like tuple-at-a-time processing is on the radar for Spark, though I have seen articles stating that more effort is going to go into the Spark SQL layer in Spark streaming which may make it more reminiscent of Esper. For these reasons, I have not even tried to implement CEP in Spark. I feel it's a waste of time without immediate tuple-at-a-time processing. Without this, they avoid the whole problem of "back pressure" (though keep in mind, it is still very possible to overload the Spark streaming layer with stages that will continue to pile up and never get worked off) but they lose the granular control that you get in CEP environments by allowing the rules & processors to react with the receipt of each tuple, right away. Awhile back, I did attempt to implement an InfoSphere Streams-like API [1] on top of Apache Storm as an example of what such a design may look like. 
It looks like Storm is going to be replaced in the not so distant future by Twitter's new design called Heron. IIRC, Heron does not have an open source implementation as of yet. [1] https://github.com/calrissian/flowmix On Sun, Apr 17, 2016 at 3:11 PM, Mich Talebzadeh wrote: Hi Corey, Can you please point me to docs on using Spark for CEP? Do we have a set of CEP libraries somewhere. I am keen on getting hold of adaptor libraries for Spark something like below Thanks Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com On 17 April 2016 at 16:07, Corey Nolet wrote: One thing I've noticed about Flink in my following of the project has been that it has established, in a few cases, some novel ideas and improvements over Spark. The problem with it, however, is that both the development team and the community around it are very small and many of those novel improvements have been rolled directly into Spark in subsequent versions. I was considering changing over my architecture to Flink at one point to get better, more real-time CEP streaming support, but in the end I
Re: Apache Flink
In terms of publication date, a paper on Nephele was published in 2009, prior to the 2010 USENIX paper on Spark. Nephele is the execution engine of Stratosphere, which became Flink. From: Mark HamstraTo: Mich Talebzadeh Cc: Corey Nolet ; "user @spark" Sent: Sunday, April 17, 2016 3:30 PM Subject: Re: Apache Flink To be fair, the Stratosphere project from which Flink springs was started as a collaborative university research project in Germany about the same time that Spark was first released as Open Source, so they are near contemporaries rather than Flink having been started only well after Spark was an established and widely-used Apache project. On Sun, Apr 17, 2016 at 2:25 PM, Mich Talebzadeh wrote: Also it always amazes me why they are so many tangential projects in Big Data space? Would not it be easier if efforts were spent on adding to Spark functionality rather than creating a new product like Flink? Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com On 17 April 2016 at 21:08, Mich Talebzadeh wrote: Thanks Corey for the useful info. I have used Sybase Aleri and StreamBase as commercial CEPs engines. However, there does not seem to be anything close to these products in Hadoop Ecosystem. So I guess there is nothing there? Regards. Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com On 17 April 2016 at 20:43, Corey Nolet wrote: i have not been intrigued at all by the microbatching concept in Spark. I am used to CEP in real streams processing environments like Infosphere Streams & Storm where the granularity of processing is at the level of each individual tuple and processing units (workers) can react immediately to events being received and processed. The closest Spark streaming comes to this concept is the notion of "state" that that can be updated via the "updateStateBykey()" functions which are only able to be run in a microbatch. Looking at the expected design changes to Spark Streaming in Spark 2.0.0, it also does not look like tuple-at-a-time processing is on the radar for Spark, though I have seen articles stating that more effort is going to go into the Spark SQL layer in Spark streaming which may make it more reminiscent of Esper. For these reasons, I have not even tried to implement CEP in Spark. I feel it's a waste of time without immediate tuple-at-a-time processing. Without this, they avoid the whole problem of "back pressure" (though keep in mind, it is still very possible to overload the Spark streaming layer with stages that will continue to pile up and never get worked off) but they lose the granular control that you get in CEP environments by allowing the rules & processors to react with the receipt of each tuple, right away. Awhile back, I did attempt to implement an InfoSphere Streams-like API [1] on top of Apache Storm as an example of what such a design may look like. It looks like Storm is going to be replaced in the not so distant future by Twitter's new design called Heron. IIRC, Heron does not have an open source implementation as of yet. [1] https://github.com/calrissian/flowmix On Sun, Apr 17, 2016 at 3:11 PM, Mich Talebzadeh wrote: Hi Corey, Can you please point me to docs on using Spark for CEP? Do we have a set of CEP libraries somewhere. 
I am keen on getting hold of adaptor libraries for Spark something like below Thanks Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com On 17 April 2016 at 16:07, Corey Nolet wrote: One thing I've noticed about Flink in my following of the project has been that it has established, in a few cases, some novel ideas and improvements over Spark. The problem with it, however, is that both the development team and the community around it are very small and many of those novel improvements have been rolled directly into Spark in subsequent versions. I was considering changing over my architecture to Flink at one point to get better, more real-time CEP streaming support, but in the end I decided to stick with Spark and just watch Flink continue to pressure it into improvement. On Sun, Apr 17, 2016 at 11:03 AM, Koert Kuipers wrote: i never found much info that flink was actually designed to be fault tolerant. if fault tolerance is more bolt-on/add-on/afterthought then that doesn't bode well for large scale data processing. spark was designed with fault tolerance in mind from the beginning. On Sun, Apr 17, 2016 at 9:52 AM, Mich Talebzadeh
Re: [discuss] using deep learning to improve Spark
I see you've been burning the midnight oil. From: Reynold XinTo: "dev@spark.apache.org" Sent: Friday, April 1, 2016 1:15 AM Subject: [discuss] using deep learning to improve Spark Hi all, Hope you all enjoyed the Tesla 3 unveiling earlier tonight. I'd like to bring your attention to a project called DeepSpark that we have been working on for the past three years. We realized that scaling software development was challenging. A large fraction of software engineering has been manual and mundane: writing test cases, fixing bugs, implementing features according to specs, and reviewing pull requests. So we started this project to see how much we could automate. After three years of development and one year of testing, we now have enough confidence that this could work well in practice. For example, Matei confessed to me today: "It looks like DeepSpark has a better understanding of Spark internals than I ever will. It updated several pieces of code I wrote long ago that even I no longer understood.” I think it's time to discuss as a community about how we want to continue this project to ensure Spark is stable, secure, and easy to use yet able to progress as fast as possible. I'm still working on a more formal design doc, and it might take a little bit more time since I haven't been able to fully grasp DeepSpark's capabilities yet. Based on my understanding right now, I've written a blog post about DeepSpark here: https://databricks.com/blog/2016/04/01/unreasonable-effectiveness-of-deep-learning-on-spark.html Please take a look and share your thoughts. Obviously, this is an ambitious project and could take many years to fully implement. One major challenge is cost. The current Spark Jenkins infrastructure provided by the AMPLab has only 8 machines, but DeepSpark uses 12000 machines. I'm not sure whether AMPLab or Databricks can fund DeepSpark's operation for a long period of time. Perhaps AWS can help out here. Let me know if you have other ideas.
Re: Spark with Druid
Will Spark 2.0 Structured Streaming obviate some of the Druid/Spark use cases? From: Raymond Honderdors To: "yuzhih...@gmail.com" Cc: "user@spark.apache.org" Sent: Wednesday, March 23, 2016 8:43 AM Subject: Re: Spark with Druid I saw these but I fail to understand how to direct the code to use the index JSON. Sent from Outlook Mobile On Wed, Mar 23, 2016 at 7:19 AM -0700, "Ted Yu" wrote: Please see: https://www.linkedin.com/pulse/combining-druid-spark-interactive-flexible-analytics-scale-butani which references https://github.com/SparklineData/spark-druid-olap On Wed, Mar 23, 2016 at 5:59 AM, Raymond Honderdors wrote: Does anyone have a good overview on how to integrate Spark and Druid? I am now struggling with the creation of a druid data source in spark. Raymond Honderdors Team Lead Analytics BI Business Intelligence developer raymond.honderd...@sizmek.com t +972.7325.3569 Herzliya
Re: [discuss] DataFrame vs Dataset in Spark 2.0
Would it make sense (in terms of feasibility, code organization, and politically) to have a JavaDataFrame, as a way to isolate the 1000+ extra lines to a Java compatibility layer/class? From: Reynold Xin To: "dev@spark.apache.org" Sent: Thursday, February 25, 2016 4:23 PM Subject: [discuss] DataFrame vs Dataset in Spark 2.0 When we first introduced Dataset in 1.6 as an experimental API, we wanted to merge Dataset/DataFrame but couldn't because we didn't want to break the pre-existing DataFrame API (e.g. map function should return Dataset, rather than RDD). In Spark 2.0, one of the main API changes is to merge DataFrame and Dataset. Conceptually, DataFrame is just a Dataset[Row]. In practice, there are two ways to implement this: Option 1. Make DataFrame a type alias for Dataset[Row] Option 2. DataFrame as a concrete class that extends Dataset[Row] I'm wondering what you think about this. The pros and cons I can think of are: Option 1. Make DataFrame a type alias for Dataset[Row] + Cleaner conceptually, especially in Scala. It will be very clear what libraries or applications need to do, and we won't see type mismatches (e.g. a function expects DataFrame, but user is passing in Dataset[Row]). + A lot less code. - Breaks source compatibility for the DataFrame API in Java, and binary compatibility for Scala/Java. Option 2. DataFrame as a concrete class that extends Dataset[Row] The pros/cons are basically the inverse of Option 1. + In most cases, can maintain source compatibility for the DataFrame API in Java, and binary compatibility for Scala/Java. - A lot more code (1000+ loc). - Less clean, and can be confusing when users pass in a Dataset[Row] into a function that expects a DataFrame. The concerns are mostly with Scala/Java. For Python, it is very easy to maintain source compatibility for both (there is no concept of binary compatibility), and for R, we are only supporting the DataFrame operations anyway because that's a more familiar interface for R users outside of Spark.
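A small illustration of the Option 1 upside, assuming the alias is in place (which is what 2.0 ultimately shipped): a method written against DataFrame and a value typed as Dataset[Row] are the same thing to the compiler, so no wrapping or type mismatches arise.

import org.apache.spark.sql.{DataFrame, Dataset, Row}

def describe(df: DataFrame): Unit = df.printSchema()

val ds: Dataset[Row] = spark.range(10).toDF()
describe(ds)   // compiles because DataFrame is literally a type alias for Dataset[Row]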
[jira] [Commented] (SPARK-3789) [GRAPHX] Python bindings for GraphX
[ https://issues.apache.org/jira/browse/SPARK-3789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14990391#comment-14990391 ] Michael Malak commented on SPARK-3789: -- My publisher tells me the MEAP for Spark GraphX In Action has been picking up lately. And just two weeks ago Ion Stoica announced at GraphConnect "We look forward to bringing Cypher's graph pattern matching capabilities into the Spark stack, making graph querying more accessible to the masses" http://www.reuters.com/article/2015/10/21/idUSnMKWNtdbfa+1e0+MKW20151021 . So one has to wonder whether Ion was referring to GraphX or some other graph technology. > [GRAPHX] Python bindings for GraphX > --- > > Key: SPARK-3789 > URL: https://issues.apache.org/jira/browse/SPARK-3789 > Project: Spark > Issue Type: New Feature > Components: GraphX, PySpark >Reporter: Ameet Talwalkar >Assignee: Kushal Datta > Attachments: PyGraphX_design_doc.pdf > > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-11278) PageRank fails with unified memory manager
[ https://issues.apache.org/jira/browse/SPARK-11278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Malak updated SPARK-11278: -- Component/s: GraphX > PageRank fails with unified memory manager > -- > > Key: SPARK-11278 > URL: https://issues.apache.org/jira/browse/SPARK-11278 > Project: Spark > Issue Type: Bug > Components: GraphX, Spark Core >Affects Versions: 1.5.2, 1.6.0 >Reporter: Nishkam Ravi > > PageRank (6-nodes, 32GB input) runs very slow and eventually fails with > ExecutorLostFailure. Traced it back to the 'unified memory manager' commit > from Oct 13th. Took a quick look at the code and couldn't see the problem > (changes look pretty good). cc'ing [~andrewor14][~vanzin] who may be able to > spot the problem quickly. Can be reproduced by running PageRank on a large > enough input dataset if needed. Sorry for not being of much help here. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2365) Add IndexedRDD, an efficient updatable key-value store
[ https://issues.apache.org/jira/browse/SPARK-2365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14950679#comment-14950679 ] Michael Malak commented on SPARK-2365: -- It's off-topic of IndexedRDD, but you can have a look at AMPLab's experimental/alpha Succinct, which is a key/value store over Tachyon. https://github.com/amplab/succinct . I haven't tried it myself. > Add IndexedRDD, an efficient updatable key-value store > -- > > Key: SPARK-2365 > URL: https://issues.apache.org/jira/browse/SPARK-2365 > Project: Spark > Issue Type: New Feature > Components: GraphX, Spark Core >Reporter: Ankur Dave >Assignee: Ankur Dave > Attachments: 2014-07-07-IndexedRDD-design-review.pdf > > > RDDs currently provide a bulk-updatable, iterator-based interface. This > imposes minimal requirements on the storage layer, which only needs to > support sequential access, enabling on-disk and serialized storage. > However, many applications would benefit from a richer interface. Efficient > support for point lookups would enable serving data out of RDDs, but it > currently requires iterating over an entire partition to find the desired > element. Point updates similarly require copying an entire iterator. Joins > are also expensive, requiring a shuffle and local hash joins. > To address these problems, we propose IndexedRDD, an efficient key-value > store built on RDDs. IndexedRDD would extend RDD[(Long, V)] by enforcing key > uniqueness and pre-indexing the entries for efficient joins and point > lookups, updates, and deletions. > It would be implemented by (1) hash-partitioning the entries by key, (2) > maintaining a hash index within each partition, and (3) using purely > functional (immutable and efficiently updatable) data structures to enable > efficient modifications and deletions. > GraphX would be the first user of IndexedRDD, since it currently implements a > limited form of this functionality in VertexRDD. We envision a variety of > other uses for IndexedRDD, including streaming updates to RDDs, direct > serving from RDDs, and as an execution strategy for Spark SQL. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
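For a feel of the gap IndexedRDD aims to close: the closest built-in today is lookup() on a pair RDD that has been hash-partitioned and cached, which at least confines the work to one partition but still walks that partition's iterator rather than consulting an index. A toy sketch:

{code}
import org.apache.spark.HashPartitioner

val kv = sc.parallelize(0L until 1000000L).map(k => (k, k * 2))
val partitioned = kv.partitionBy(new HashPartitioner(16)).cache()
partitioned.lookup(42L)   // Seq(84); only the partition owning key 42 is scanned
{code}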
[jira] [Commented] (SPARK-10939) Misaligned data with RDD.zip after repartition
[ https://issues.apache.org/jira/browse/SPARK-10939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14948596#comment-14948596 ] Michael Malak commented on SPARK-10939: --- Here Matei explains the explicit design decision to prefer shuffle performance arising from randomization over deterministic RDD computation: https://issues.apache.org/jira/browse/SPARK-3098?focusedCommentId=14110183=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14110183 It has made it into the documentation (though perhaps not clearly enough, especially regarding the rationale): https://issues.apache.org/jira/browse/SPARK-3356 https://github.com/apache/spark/pull/2508/files > Misaligned data with RDD.zip after repartition > -- > > Key: SPARK-10939 > URL: https://issues.apache.org/jira/browse/SPARK-10939 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.3.0, 1.4.1, 1.5.0 > Environment: - OSX 10.10.4, java 1.7.0_51, hadoop 2.6.0-cdh5.4.5 > - Ubuntu 12.04, java 1.7.0_80, hadoop 2.6.0-cdh5.4.5 >Reporter: Dan Brown > > Split out from https://issues.apache.org/jira/browse/SPARK-10685: > Here's a weird behavior where {{RDD.zip}} after a {{repartition}} produces > "misaligned" data, meaning different column values in the same row aren't > matched, as if a zip shuffled the collections before zipping them. It's > difficult to reproduce because it's nondeterministic, doesn't occur in local > mode, and requires ≥2 workers (≥3 in one case). I was able to repro it using > pyspark 1.3.0 (cdh5.4.5), 1.4.1 (bin-without-hadoop), and 1.5.0 > (bin-without-hadoop). > Also, this {{DataFrame.zip}} issue is related in spirit, since we were trying > to build it ourselves when we ran into this problem. Let me put in my vote > for reopening the issue and supporting {{DataFrame.zip}} in the standard lib. > - https://issues.apache.org/jira/browse/SPARK-7460 > h3. Repro > Fail: RDD.zip after repartition > {code} > df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(1)) > df = df.repartition(100) > rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, > b=y.b)) > [r for r in rdd.collect() if r.a != r.b][:3] # Should be [] > {code} > Sample outputs (nondeterministic): > {code} > [] > [Row(a=50, b=6947), Row(a=150, b=7047), Row(a=250, b=7147)] > [] > [] > [Row(a=44, b=644), Row(a=144, b=744), Row(a=244, b=844)] > [] > {code} > Test setup: > - local\[8]: {{MASTER=local\[8]}} > - dist\[N]: 1 driver + 1 master + N workers > {code} > "Fail" tests pass? cluster mode spark version > > yes local[8] 1.3.0-cdh5.4.5 > no dist[4] 1.3.0-cdh5.4.5 > yes local[8] 1.4.1 > yes dist[1] 1.4.1 > no dist[2] 1.4.1 > no dist[4] 1.4.1 > yes local[8] 1.5.0 > yes dist[1] 1.5.0 > no dist[2] 1.5.0 > no dist[4] 1.5.0 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
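A workaround that sidesteps the ordering question entirely is to align rows by an explicit key instead of by zip position. A sketch (in Scala rather than the Python of the repro, and assuming the same single long column a):

{code}
val left  = df.rdd.map(r => (r.getLong(0), r.getLong(0)))   // (key, a)
val right = df.rdd.map(r => (r.getLong(0), r.getLong(0)))   // (key, b)
val aligned = left.join(right).values                       // (a, b) matched by key, not by position
aligned.filter { case (a, b) => a != b }.count()            // 0 regardless of repartition
{code}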
[jira] [Created] (SPARK-10972) UDFs in SQL joins
Michael Malak created SPARK-10972: - Summary: UDFs in SQL joins Key: SPARK-10972 URL: https://issues.apache.org/jira/browse/SPARK-10972 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 1.5.1 Reporter: Michael Malak Currently expressions used to .join() in DataFrames are limited to column names plus the operators exposed in org.apache.spark.sql.Column. It would be nice to be able to .join() based on a UDF, such as, say, euclideanDistance() < 0.1. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-10972) UDFs in SQL joins
[ https://issues.apache.org/jira/browse/SPARK-10972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Malak updated SPARK-10972: -- Description: Currently expressions used to .join() in DataFrames are limited to column names plus the operators exposed in org.apache.spark.sql.Column. It would be nice to be able to .join() based on a UDF, such as, say, euclideanDistance(col1, col2) < 0.1. was: Currently expressions used to .join() in DataFrames are limited to column names plus the operators exposed in org.apache.spark.sql.Column. It would be nice to be able to .join() based on a UDF, such as, say, euclideanDistance() < 0.1. > UDFs in SQL joins > - > > Key: SPARK-10972 > URL: https://issues.apache.org/jira/browse/SPARK-10972 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 1.5.1 >Reporter: Michael Malak > > Currently expressions used to .join() in DataFrames are limited to column > names plus the operators exposed in org.apache.spark.sql.Column. > It would be nice to be able to .join() based on a UDF, such as, say, > euclideanDistance(col1, col2) < 0.1. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
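To make the request concrete, the desired usage would look something like the sketch below. euclideanDistance, df1, df2, and their column x are all hypothetical, and whether the planner can execute such a predicate efficiently (or at all) is exactly what this issue is about:

{code}
import org.apache.spark.sql.functions.udf

val euclideanDistance = udf((x: Double, y: Double) => math.abs(x - y))   // toy 1-D stand-in
val joined = df1.join(df2, euclideanDistance(df1("x"), df2("x")) < 0.1)
{code}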
[jira] [Commented] (SPARK-10722) Uncaught exception: RDDBlockId not found in driver-heartbeater
[ https://issues.apache.org/jira/browse/SPARK-10722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14909883#comment-14909883 ] Michael Malak commented on SPARK-10722: --- I have seen this in a small Hello World type program compiled and run from sbt that reads a large text file and calls .cache(). But if instead I do sbt package and then spark-submit (instead of just sbt run), it works. That suggests there may be some dependency omitted from Artifactory for spark-core but that is in spark-assembly. This link suggests slf4j-simple.jar, but adding that to my .sbt didn't help. https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/spark-Exception-in-thread-quot-main-quot-java-lang/td-p/19544 Googling, it seems the problem is more commonly encountered while running unit tests during the build of Spark itself. > Uncaught exception: RDDBlockId not found in driver-heartbeater > -- > > Key: SPARK-10722 > URL: https://issues.apache.org/jira/browse/SPARK-10722 > Project: Spark > Issue Type: Bug > Components: Block Manager >Affects Versions: 1.3.1, 1.4.1, 1.5.0 >Reporter: Simeon Simeonov > > Some operations involving cached RDDs generate an uncaught exception in > driver-heartbeater. If the {{.cache()}} call is removed, processing happens > without the exception. However, not all RDDs trigger the problem, i.e., some > {{.cache()}} operations are fine. > I can see the problem with 1.4.1 and 1.5.0 but I have not been able to create > a reproducible test case. The same exception is [reported on > SO|http://stackoverflow.com/questions/31280355/spark-test-on-local-machine] > for v1.3.1 but the behavior is related to large broadcast variables. > The full stack trace is: > {code} > 15/09/20 22:10:08 ERROR Utils: Uncaught exception in thread driver-heartbeater > java.io.IOException: java.lang.ClassNotFoundException: > org.apache.spark.storage.RDDBlockId > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163) > at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) > at org.apache.spark.util.Utils$.deserialize(Utils.scala:91) > at > org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:440) > at > org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:430) > at scala.Option.foreach(Option.scala:236) > at > org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:430) > at > org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:428) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:428) > at > org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:472) > at > org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:472) > at > org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:472) > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699) > at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:472) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) > at > java.util.concurrent.ScheduledThreadPoolExecutor$
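For anyone trying to reproduce the sbt-run-only failure, the build is nothing exotic; a rough build.sbt follows. fork := true is sometimes suggested for classloader mismatches under sbt run, but I have not verified that it cures this particular exception (the slf4j-simple suggestion above did not).

{code}
name := "hello-cache"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.0"
fork := true   // run the app in a separate JVM, closer to what spark-submit does
{code}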
[jira] [Commented] (SPARK-10489) GraphX dataframe wrapper
[ https://issues.apache.org/jira/browse/SPARK-10489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14739681#comment-14739681 ] Michael Malak commented on SPARK-10489: --- Feynman Liang: Link https://github.com/databricks/spark-df-graph seems to be broken? > GraphX dataframe wrapper > > > Key: SPARK-10489 > URL: https://issues.apache.org/jira/browse/SPARK-10489 > Project: Spark > Issue Type: New Feature > Components: GraphX >Reporter: Feynman Liang > > We want to wrap GraphX Graph using DataFrames and implement basic high-level > algorithms like PageRank. Then we can easily implement Python API, > import/export, and other features. > {code} > val graph = new GraphF(vDF, eDF) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
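While that wrapper gets built, the manual version is straightforward. A sketch assuming vDF has columns (id: Long, attr: String) and eDF has (src: Long, dst: Long, attr: String); the schemas are my assumption, not part of this ticket:

{code}
import org.apache.spark.graphx.{Edge, Graph}

val vertices = vDF.rdd.map(r => (r.getLong(0), r.getString(1)))
val edges    = eDF.rdd.map(r => Edge(r.getLong(0), r.getLong(1), r.getString(2)))
val graph    = Graph(vertices, edges)
graph.pageRank(0.001).vertices.take(5)
{code}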
Re: Build k-NN graph for large dataset
Yes. And a paper that describes using grids (actually varying grids) is http://research.microsoft.com/en-us/um/people/jingdw/pubs%5CCVPR12-GraphConstruction.pdf In the Spark GraphX In Action book that Robin East and I are writing, we implement a drastically simplified version of this in chapter 7, which should become available in the MEAP mid-September. http://www.manning.com/books/spark-graphx-in-action From: Kristina Rogale Plazonic kpl...@gmail.com To: Jaonary Rabarisoa jaon...@gmail.com Cc: user user@spark.apache.org Sent: Wednesday, August 26, 2015 7:24 AM Subject: Re: Build k-NN graph for large dataset If you don't want to compute all N^2 similarities, you need to implement some kind of blocking first. For example, LSH (locally sensitive hashing). A quick search gave this link to a Spark implementation: http://stackoverflow.com/questions/2771/spark-implementation-for-locality-sensitive-hashing On Wed, Aug 26, 2015 at 7:35 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, I'm trying to find an efficient way to build a k-NN graph for a large dataset. Precisely, I have a large set of high dimensional vector (say d 1) and I want to build a graph where those high dimensional points are the vertices and each one is linked to the k-nearest neighbor based on some kind similarity defined on the vertex spaces. My problem is to implement an efficient algorithm to compute the weight matrix of the graph. I need to compute a N*N similarities and the only way I know is to use cartesian operation follow by map operation on RDD. But, this is very slow when the N is large. Is there a more cleaver way to do this for an arbitrary similarity function ? Cheers, Jao
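For completeness, the cartesian-based construction described in the question is the O(N^2) baseline that blocking schemes like LSH try to avoid. A sketch, assuming points is an RDD[(Long, Array[Double])] of (id, feature vector):

def euclidean(a: Array[Double], b: Array[Double]): Double =
  math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

val k = 10
val knnEdges = points.cartesian(points)
  .filter { case ((i, _), (j, _)) => i != j }
  .map { case ((i, a), (j, b)) => (i, (j, euclidean(a, b))) }
  .groupByKey()
  .flatMap { case (i, neighbors) => neighbors.toSeq.sortBy(_._2).take(k).map { case (j, d) => (i, j, d) } }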
RE: What does "Spark is not just MapReduce" mean? Isn't every Spark job a form of MapReduce?
I would also add, from a data locality theoretic standpoint, mapPartitions() provides for node-local computation that plain old map-reduce does not. From my Android phone on T-Mobile. The first nationwide 4G network. Original message From: Ashic Mahtab as...@live.com Date: 06/28/2015 10:51 AM (GMT-07:00) To: YaoPau jonrgr...@gmail.com, Apache Spark user@spark.apache.org Subject: RE: What does "Spark is not just MapReduce" mean? Isn't every Spark job a form of MapReduce? Spark comes with quite a few components. At its core is... surprise... Spark Core. This provides the core things required to run spark jobs. Spark provides a lot of operators out of the box... take a look at https://spark.apache.org/docs/latest/programming-guide.html#transformations https://spark.apache.org/docs/latest/programming-guide.html#actions While all of them can be implemented with variations of rdd.map().reduce(), there are optimisations to be gained in terms of data locality, etc., and the additional operators simply make life simpler. In addition to the core stuff, spark also brings things like Spark Streaming, Spark Sql and data frames, MLLib, GraphX, etc. Spark Streaming gives you microbatches of rdds at periodic intervals. Think "give me the last 15 seconds of events every 5 seconds." You can then program towards the small collection, and the job will run in a fault tolerant manner on your cluster. Spark Sql provides hive like functionality that works nicely with various data sources, and RDDs. MLLib provides a lot of oob machine learning algorithms, and the new Spark ML project provides a nice elegant pipeline api to take care of a lot of common machine learning tasks. GraphX allows you to represent data in graphs, and run graph algorithms on it. e.g. you can represent your data as RDDs of vertexes and edges, and run pagerank on a distributed cluster. And there's more. So, yeah... Spark is definitely not just MapReduce. :) Date: Sun, 28 Jun 2015 09:13:18 -0700 From: jonrgr...@gmail.com To: user@spark.apache.org Subject: What does "Spark is not just MapReduce" mean? Isn't every Spark job a form of MapReduce? I've heard "Spark is not just MapReduce" mentioned during Spark talks, but it seems like every method that Spark has is really doing something like (Map -> Reduce) or (Map -> Map -> Map -> Reduce) etc behind the scenes, with the performance benefit of keeping RDDs in memory between stages. Am I wrong about that? Is Spark doing anything more efficiently than a series of Maps followed by a Reduce in memory? What methods does Spark have that can't easily be mapped (with somewhat similar efficiency) to Map and Reduce in memory? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-does-Spark-is-not-just-MapReduce-mean-Isn-t-every-Spark-job-a-form-of-MapReduce-tp23518.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
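A small example of the mapPartitions() point: per-partition setup runs once per partition, on the node holding the data, rather than once per record as it effectively would inside map(). The date-parsing example is mine, standing in for any costly per-node setup:

import java.text.SimpleDateFormat

val dates = sc.parallelize(Seq("2015-06-28", "2015-06-29"))
val millis = dates.mapPartitions { iter =>
  val fmt = new SimpleDateFormat("yyyy-MM-dd")   // constructed once per partition, node-locally
  iter.map(s => fmt.parse(s).getTime)
}
millis.collect()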
Re: Why Spark is much faster than Hadoop MapReduce even on disk
http://www.datascienceassn.org/content/making-sense-making-sense-performance-data-analytics-frameworks From: bit1...@163.com bit1...@163.com To: user user@spark.apache.org Sent: Monday, April 27, 2015 8:33 PM Subject: Why Spark is much faster than Hadoop MapReduce even on disk Hi, I am frequently asked why Spark is also much faster than Hadoop MapReduce on disk (without the use of memory cache). I have no convincing answer for this question, could you guys elaborate on this? Thanks!
Re: How to restrict foreach on a streaming RDD only once upon receiver completion
You could have your receiver send a magic value when it is done. I discuss this Spark Streaming pattern in my presentation Spark Gotchas and Anti-Patterns. In the PDF version, it's slides 34-36. http://www.datascienceassn.org/content/2014-11-05-spark-gotchas-and-anti-patterns-julia-language YouTube version cued to that place: http://www.youtube.com/watch?v=W5Uece_JmNs&t=23m18s From: Hari Polisetty hpoli...@icloud.com To: Tathagata Das t...@databricks.com Cc: user user@spark.apache.org Sent: Monday, April 6, 2015 2:02 PM Subject: Re: How to restrict foreach on a streaming RDD only once upon receiver completion Yes, I’m using updateStateByKey and it works. But then I need to perform further computation on this Stateful RDD (see code snippet below). I perform forEach on the final RDD and get the top 10 records. I just don’t want the foreach to be performed every time a new batch is received. Only when the receiver is done fetching all the records. My requirements are to programmatically invoke the E.S query (it varies by usecase), get all the records and apply certain transformations and get the top 10 results based on certain criteria back into the driver program for further processing. I’m able to apply the transformations on the batches of records fetched from E.S using streaming. So, I don’t need to wait for all the records to be fetched. The RDD transformations are happening all the time and the top k results are getting updated constantly until all the records are fetched by the receiver. Is there any drawback with this approach? Can you give more pointers on what you mean by creating a custom RDD that reads from ElasticSearch? Here is the relevant portion of my Spark streaming code: //Create a custom streaming receiver to query for relevant data from E.S JavaReceiverInputDStream<String> jsonStrings = ssc.receiverStream( new ElasticSearchResponseReceiver(query…….)); //Apply JSON Paths to extract specific value(s) from each record JavaDStream<String> fieldVariations = jsonStrings.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 465237345751948L; @Override public Iterable<String> call(String jsonString) { List<String> r = JsonPath.read(jsonString, attributeDetail.getJsonPath()); return r; } }); //Perform a stateful map reduce on each variation JavaPairDStream<String, Integer> fieldVariationCounts = fieldVariations.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = -1241276515559408238L; @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }).updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { private static final long serialVersionUID = 7598681835161199865L; public Optional<Integer> call(List<Integer> nums, Optional<Integer> current) { Integer sum = current.or((int) 0L); return (Optional<Integer>) Optional.of(sum + nums.size()); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = -5906059838295609562L; @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); //Swap the Map from <Enum String, Int> to <Int, Enum String>.
This is so that we can sort on frequencies JavaPairDStream<Integer, String> swappedPair = fieldVariationCounts.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { private static final long serialVersionUID = -5889774695187619957L; @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> item) throws Exception { return item.swap(); } }); //Sort based on Key i.e, frequency JavaPairDStream<Integer, String> sortedCounts = swappedPair.transformToPair( new Function<JavaPairRDD<Integer, String>, JavaPairRDD<Integer, String>>() { private static final long serialVersionUID = -4172702039963232779L; public JavaPairRDD<Integer, String> call(JavaPairRDD<Integer, String> in) throws Exception { //False to denote sort in descending order return in.sortByKey(false); } }); //Iterate through the RDD and get the top 20 values in the sorted pair and write to results list sortedCounts.foreach( new Function<JavaPairRDD<Integer, String>, Void>() { private static final long serialVersionUID = 2186144129973051920L; public Void call(JavaPairRDD<Integer, String> rdd) { resultList.clear(); for (Tuple2<Integer, String> t : rdd.take(MainDriver.NUMBER_OF_TOP_VARIATIONS)) { resultList.add(new Tuple3<String, Integer, Double>(t._2(), t._1(), (double) (100*t._1())/totalProcessed.value())); } return null; } } ); On Apr 7, 2015, at 1:14 AM, Tathagata Das t...@databricks.com wrote: So you want to sort based on the total count of the all the records received through receiver? In that case, you have to combine all the counts using updateStateByKey (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala) But stepping back, if you want to
[jira] [Created] (SPARK-6710) Wrong initial bias in GraphX SVDPlusPlus
Michael Malak created SPARK-6710: Summary: Wrong initial bias in GraphX SVDPlusPlus Key: SPARK-6710 URL: https://issues.apache.org/jira/browse/SPARK-6710 Project: Spark Issue Type: Bug Components: GraphX Affects Versions: 1.3.0 Reporter: Michael Malak In the initialization portion of GraphX SVDPlusPlus, the initialization of biases appears to be incorrect. Specifically, in line https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala#L96 instead of (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1)) it should probably be (vd._1, vd._2, msg.get._2 / msg.get._1 - u, 1.0 / scala.math.sqrt(msg.get._1)) That is, the biases bu and bi (both represented as the third component of the Tuple4[] above, depending on whether the vertex is a user or an item), described in equation (1) of the Koren paper, are supposed to be small offsets to the mean (represented by the variable u, signifying the Greek letter mu) to account for peculiarities of individual users and items. Initializing these biases to wrong values should theoretically not matter given enough iterations of the algorithm, but some quick empirical testing shows it has trouble converging at all, even after many orders of magnitude additional iterations. This perhaps could be the source of previously reported trouble with SVDPlusPlus. http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-SVDPlusPlus-problem-td12885.html -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
Wrong initial bias in GraphX SVDPlusPlus?
I believe that in the initialization portion of GraphX SVDPlusPlus, the initialization of biases is incorrect. Specifically, in line https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala#L96 instead of (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1)) it should be (vd._1, vd._2, msg.get._2 / msg.get._1 - u, 1.0 / scala.math.sqrt(msg.get._1)) That is, the biases bu and bi (both represented as the third component of the Tuple4[] above, depending on whether the vertex is a user or an item), described in equation (1) of the Koren paper, are supposed to be small offsets to the mean (represented by the variable u, signifying the Greek letter mu) to account for peculiarities of individual users and items. Initializing these biases to wrong values should theoretically not matter given enough iterations of the algorithm, but some quick empirical testing shows it has trouble converging at all, even after many orders of magnitude additional iterations. This perhaps could be the source of previously reported trouble with SVDPlusPlus. http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-SVDPlusPlus-problem-td12885.html If after a day, no one tells me I'm crazy here, I'll go ahead and create a Jira ticket. - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
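For anyone who wants to reproduce the convergence behavior, a minimal harness looks roughly like the following; the tiny ratings graph and the Conf parameters are illustrative, not the data used in the testing mentioned above:

import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.lib.SVDPlusPlus

val ratings = sc.parallelize(Seq(Edge(1L, 11L, 5.0), Edge(1L, 12L, 1.0), Edge(2L, 12L, 4.0)))
val conf = new SVDPlusPlus.Conf(2, 100, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015)   // rank, iterations, min/max rating, gammas
val (model, mean) = SVDPlusPlus.run(ratings, conf)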
Spark GraphX In Action on documentation page?
Can my new book, Spark GraphX In Action, which is currently in MEAP http://manning.com/malak/, be added to https://spark.apache.org/documentation.html and, if appropriate, to https://spark.apache.org/graphx/ ? Michael Malak - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
[jira] [Commented] (SPARK-6388) Spark 1.3 + Hadoop 2.6 Can't work on Java 8_40
[ https://issues.apache.org/jira/browse/SPARK-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14365758#comment-14365758 ] Michael Malak commented on SPARK-6388: -- Isn't it Hadoop 2.7 that is supposed to provide Java 8 compatibility? Spark 1.3 + Hadoop 2.6 Can't work on Java 8_40 -- Key: SPARK-6388 URL: https://issues.apache.org/jira/browse/SPARK-6388 Project: Spark Issue Type: Bug Components: Block Manager, Spark Submit, YARN Affects Versions: 1.3.0 Environment: 1. Linux version 3.16.0-30-generic (buildd@komainu) (gcc version 4.9.1 (Ubuntu 4.9.1-16ubuntu6) ) #40-Ubuntu SMP Mon Jan 12 22:06:37 UTC 2015 2. Oracle Java 8 update 40 for Linux X64 3. Scala 2.10.5 4. Hadoop 2.6 (pre-build version) Reporter: John Original Estimate: 24h Remaining Estimate: 24h I build Apache Spark 1.3 munally. --- JAVA_HOME=PATH_TO_JAVA8 mvn clean package -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -DskipTests --- Something goes wrong, akka always tell me --- 15/03/17 21:28:10 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkYarnAM@Server2:42161] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. --- I build another version of Spark 1.3 + Hadoop 2.6 under Java 7. Everything goes well. Logs --- 15/03/17 21:27:06 INFO spark.SparkContext: Running Spark version 1.3.0 15/03/17 21:27:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/03/17 21:27:08 INFO spark.SecurityManager: Changing view Servers to: hduser 15/03/17 21:27:08 INFO spark.SecurityManager: Changing modify Servers to: hduser 15/03/17 21:27:08 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui Servers disabled; users with view permissions: Set(hduser); users with modify permissions: Set(hduser) 15/03/17 21:27:08 INFO slf4j.Slf4jLogger: Slf4jLogger started 15/03/17 21:27:08 INFO Remoting: Starting remoting 15/03/17 21:27:09 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@Server3:37951] 15/03/17 21:27:09 INFO util.Utils: Successfully started service 'sparkDriver' on port 37951. 15/03/17 21:27:09 INFO spark.SparkEnv: Registering MapOutputTracker 15/03/17 21:27:09 INFO spark.SparkEnv: Registering BlockManagerMaster 15/03/17 21:27:09 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-0db692bb-cd02-40c8-a8f0-3813c6da18e2/blockmgr-a1d0ad23-ab76-4177-80a0-a6f982a64d80 15/03/17 21:27:09 INFO storage.MemoryStore: MemoryStore started with capacity 265.1 MB 15/03/17 21:27:09 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-502ef3f8-b8cd-45cf-b1df-97df297cdb35/httpd-6303e24d-4b2b-4614-bb1d-74e8d331189b 15/03/17 21:27:09 INFO spark.HttpServer: Starting HTTP Server 15/03/17 21:27:09 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/03/17 21:27:10 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:48000 15/03/17 21:27:10 INFO util.Utils: Successfully started service 'HTTP file server' on port 48000. 15/03/17 21:27:10 INFO spark.SparkEnv: Registering OutputCommitCoordinator 15/03/17 21:27:10 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/03/17 21:27:10 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 15/03/17 21:27:10 INFO util.Utils: Successfully started service 'SparkUI' on port 4040. 
15/03/17 21:27:10 INFO ui.SparkUI: Started SparkUI at http://Server3:4040 15/03/17 21:27:10 INFO spark.SparkContext: Added JAR file:/home/hduser/spark-java2.jar at http://192.168.11.42:48000/jars/spark-java2.jar with timestamp 1426598830307 15/03/17 21:27:10 INFO client.RMProxy: Connecting to ResourceManager at Server3/192.168.11.42:8050 15/03/17 21:27:11 INFO yarn.Client: Requesting a new application from cluster with 3 NodeManagers 15/03/17 21:27:11 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container) 15/03/17 21:27:11 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead 15/03/17 21:27:11 INFO yarn.Client: Setting up container launch context for our AM 15/03/17 21:27:11 INFO yarn.Client: Preparing resources for our AM container 15/03/17 21:27:12 INFO yarn.Client: Uploading resource file:/home/hduser/spark-1.3.0/assembly/target/scala-2.10/spark-assembly-1.3.0-hadoop2.6.0.jar - hdfs://Server3:9000/user/hduser/.sparkStaging/application_1426595477608_0002/spark-assembly-1.3.0-hadoop2.6.0.jar 15/03/17 21:27:21 INFO yarn.Client: Setting up the launch environment for our
textFile() ordering and header rows
Since RDDs are generally unordered, aren't things like textFile().first() not guaranteed to return the first row (such as looking for a header row)? If so, doesn't that make the example in http://spark.apache.org/docs/1.2.1/quick-start.html#basics misleading? - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
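For what it's worth, a common way to make header handling independent of first()'s ordering behavior is to drop the header by partition index. A minimal sketch (not from the thread, and assuming the input is a single file so partition 0 begins with the header line):

val lines    = sc.textFile("data.csv")
val noHeader = lines.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) iter.drop(1) else iter   // drop only the file's first line
}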
[jira] [Commented] (SPARK-4279) Implementing TinkerPop on top of GraphX
[ https://issues.apache.org/jira/browse/SPARK-4279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14309459#comment-14309459 ] Michael Malak commented on SPARK-4279: -- Is there another place where I might be able to track progress on this -- either another Jira or on a mailing list? Implementing TinkerPop on top of GraphX --- Key: SPARK-4279 URL: https://issues.apache.org/jira/browse/SPARK-4279 Project: Spark Issue Type: New Feature Components: GraphX Reporter: Brennon York Priority: Minor [TinkerPop|https://github.com/tinkerpop] is a great abstraction for graph databases and has been implemented across various graph database backends. Has anyone thought about integrating the TinkerPop framework with GraphX to enable GraphX as another backend? Not sure if this has been brought up or not, but would certainly volunteer to spearhead this effort if the community thinks it to be a good idea. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
Word2Vec IndexedRDD
1. Is IndexedRDD planned for 1.3? https://issues.apache.org/jira/browse/SPARK-2365 2. Once IndexedRDD is in, is it planned to convert Word2VecModel to it from its current Map[String,Array[Float]]? https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala#L425 - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: spark challenge: zip with next???
But isn't foldLeft() overkill for the originally stated use case of max diff of adjacent pairs? Isn't foldLeft() for recursive non-commutative non-associative accumulation as opposed to an embarrassingly parallel operation such as this one? This use case reminds me of FIR filtering in DSP. It seems that RDDs could use something that serves the same purpose as scala.collection.Iterator.sliding. From: Koert Kuipers ko...@tresata.com To: Mohit Jaggi mohitja...@gmail.com Cc: Tobias Pfeiffer t...@preferred.jp; Ganelin, Ilya ilya.gane...@capitalone.com; derrickburns derrickrbu...@gmail.com; user@spark.apache.org user@spark.apache.org Sent: Friday, January 30, 2015 7:11 AM Subject: Re: spark challenge: zip with next??? assuming the data can be partitioned then you have many timeseries for which you want to detect potential gaps. also assuming the resulting gaps info per timeseries is much smaller data then the timeseries data itself, then this is a classical example to me of a sorted (streaming) foldLeft, requiring an efficient secondary sort in the spark shuffle. i am trying to get that into spark here: https://issues.apache.org/jira/browse/SPARK-3655 On Fri, Jan 30, 2015 at 12:27 AM, Mohit Jaggi mohitja...@gmail.com wrote: http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3ccalrvtpkn65rolzbetc+ddk4o+yjm+tfaf5dz8eucpl-2yhy...@mail.gmail.com%3E you can use the MLLib function or do the following (which is what I had done): - in first pass over the data, using mapPartitionWithIndex, gather the first item in each partition. you can use collect (or aggregator) for this. “key” them by the partition index. at the end, you will have a map (partition index) -- first item- in the second pass over the data, using mapPartitionWithIndex again, look at two (or in the general case N items at a time, you can use scala’s sliding iterator) items at a time and check the time difference(or any sliding window computation). To this mapParitition, pass the map created in previous step. You will need to use them to check the last item in this partition. If you can tolerate a few inaccuracies then you can just do the second step. You will miss the “boundaries” of the partitions but it might be acceptable for your use case. On Jan 29, 2015, at 4:36 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Fri, Jan 30, 2015 at 6:32 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Make a copy of your RDD with an extra entry in the beginning to offset. The you can zip the two RDDs and run a map to generate an RDD of differences. Does that work? I recently tried something to compute differences between each entry and the next, so I did val rdd1 = ... // null element + rdd val rdd2 = ... // rdd + null elementbut got an error message about zip requiring data sizes in each partition to match. Tobias
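For reference, a minimal sketch of the per-partition sliding idea discussed in this thread, applied to the original use case (max difference of adjacent pairs). It ignores pairs that straddle partition boundaries; the two-pass approach described above would cover those:

val values = sc.parallelize(Seq(1L, 3L, 10L, 12L, 30L), 2)
val maxAdjacentDiff = values
  .mapPartitions(_.sliding(2).collect { case Seq(a, b) => b - a })  // adjacent differences within each partition
  .reduce((a, b) => math.max(a, b))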
Re: renaming SchemaRDD - DataFrame
I personally have no preference DataFrame vs. DataTable, but only wish to lay out the history and etymology simply because I'm into that sort of thing. Frame comes from Marvin Minsky's 1970's AI construct: slots and the data that go in them. The S programming language (precursor to R) adopted this terminology in 1991. R of course became popular with the rise of Data Science around 2012. http://www.google.com/trends/explore#q=%22data%20science%22%2C%20%22r%20programming%22cmpt=qtz= DataFrame would carry the implication that it comes along with its own metadata, whereas DataTable might carry the implication that metadata is stored in a central metadata repository. DataFrame is thus technically more correct for SchemaRDD, but is a less familiar (and thus less accessible) term for those not immersed in data science or AI and thus may have narrower appeal. - Original Message - From: Evan R. Sparks evan.spa...@gmail.com To: Matei Zaharia matei.zaha...@gmail.com Cc: Koert Kuipers ko...@tresata.com; Michael Malak michaelma...@yahoo.com; Patrick Wendell pwend...@gmail.com; Reynold Xin r...@databricks.com; dev@spark.apache.org dev@spark.apache.org Sent: Tuesday, January 27, 2015 9:55 AM Subject: Re: renaming SchemaRDD - DataFrame I'm +1 on this, although a little worried about unknowingly introducing SparkSQL dependencies every time someone wants to use this. It would be great if the interface can be abstract and the implementation (in this case, SparkSQL backend) could be swapped out. One alternative suggestion on the name - why not call it DataTable? DataFrame seems like a name carried over from pandas (and by extension, R), and it's never been obvious to me what a Frame is. On Mon, Jan 26, 2015 at 5:32 PM, Matei Zaharia matei.zaha...@gmail.com wrote: (Actually when we designed Spark SQL we thought of giving it another name, like Spark Schema, but we decided to stick with SQL since that was the most obvious use case to many users.) Matei On Jan 26, 2015, at 5:31 PM, Matei Zaharia matei.zaha...@gmail.com wrote: While it might be possible to move this concept to Spark Core long-term, supporting structured data efficiently does require quite a bit of the infrastructure in Spark SQL, such as query planning and columnar storage. The intent of Spark SQL though is to be more than a SQL server -- it's meant to be a library for manipulating structured data. Since this is possible to build over the core API, it's pretty natural to organize it that way, same as Spark Streaming is a library. Matei On Jan 26, 2015, at 4:26 PM, Koert Kuipers ko...@tresata.com wrote: The context is that SchemaRDD is becoming a common data format used for bringing data into Spark from external systems, and used for various components of Spark, e.g. MLlib's new pipeline API. i agree. this to me also implies it belongs in spark core, not sql On Mon, Jan 26, 2015 at 6:11 PM, Michael Malak michaelma...@yahoo.com.invalid wrote: And in the off chance that anyone hasn't seen it yet, the Jan. 13 Bay Area Spark Meetup YouTube contained a wealth of background information on this idea (mostly from Patrick and Reynold :-). https://www.youtube.com/watch?v=YWppYPWznSQ From: Patrick Wendell pwend...@gmail.com To: Reynold Xin r...@databricks.com Cc: dev@spark.apache.org dev@spark.apache.org Sent: Monday, January 26, 2015 4:01 PM Subject: Re: renaming SchemaRDD - DataFrame One thing potentially not clear from this e-mail, there will be a 1:1 correspondence where you can get an RDD to/from a DataFrame. 
On Mon, Jan 26, 2015 at 2:18 PM, Reynold Xin r...@databricks.com wrote: Hi, We are considering renaming SchemaRDD - DataFrame in 1.3, and wanted to get the community's opinion. The context is that SchemaRDD is becoming a common data format used for bringing data into Spark from external systems, and used for various components of Spark, e.g. MLlib's new pipeline API. We also expect more and more users to be programming directly against SchemaRDD API rather than the core RDD API. SchemaRDD, through its less commonly used DSL originally designed for writing test cases, always has the data-frame like API. In 1.3, we are redesigning the API to make the API usable for end users. There are two motivations for the renaming: 1. DataFrame seems to be a more self-evident name than SchemaRDD. 2. SchemaRDD/DataFrame is actually not going to be an RDD anymore (even though it would contain some RDD functions like map, flatMap, etc), and calling it Schema*RDD* while it is not an RDD is highly confusing. Instead, DataFrame.rdd will return the underlying RDD for all RDD methods. My understanding is that very few users program directly against the SchemaRDD API at the moment, because they are not well documented. However, to maintain backward compatibility, we can
[jira] [Created] (SPARK-5343) ShortestPaths traverses backwards
Michael Malak created SPARK-5343:
Summary: ShortestPaths traverses backwards
Key: SPARK-5343
URL: https://issues.apache.org/jira/browse/SPARK-5343
Project: Spark
Issue Type: Bug
Components: GraphX
Affects Versions: 1.2.0
Reporter: Michael Malak

GraphX ShortestPaths seems to be following edges backwards instead of forwards:

import org.apache.spark.graphx._
val g = Graph(sc.makeRDD(Array((1L,""), (2L,""), (3L,""))),
              sc.makeRDD(Array(Edge(1L,2L,""), Edge(2L,3L,""))))

lib.ShortestPaths.run(g,Array(3)).vertices.collect
res1: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map()), (3,Map(3 -> 0)), (2,Map()))

lib.ShortestPaths.run(g,Array(1)).vertices.collect
res2: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(1 -> 0)), (3,Map(1 -> 2)), (2,Map(1 -> 1)))

The following changes may be what will make it run forward:
Change one occurrence of src to dst in https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala#L64
Change three occurrences of dst to src in https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala#L65
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
Re: GraphX ShortestPaths backwards?
I created https://issues.apache.org/jira/browse/SPARK-5343 for this.
- Original Message - From: Michael Malak michaelma...@yahoo.com To: dev@spark.apache.org dev@spark.apache.org Cc: Sent: Monday, January 19, 2015 5:09 PM Subject: GraphX ShortestPaths backwards?

GraphX ShortestPaths seems to be following edges backwards instead of forwards:

import org.apache.spark.graphx._
val g = Graph(sc.makeRDD(Array((1L,""), (2L,""), (3L,""))),
              sc.makeRDD(Array(Edge(1L,2L,""), Edge(2L,3L,""))))

lib.ShortestPaths.run(g,Array(3)).vertices.collect
res1: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map()), (3,Map(3 -> 0)), (2,Map()))

lib.ShortestPaths.run(g,Array(1)).vertices.collect
res2: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(1 -> 0)), (3,Map(1 -> 2)), (2,Map(1 -> 1)))

If I am not mistaken about my assessment, then I believe the following changes will make it run forward:
Change one occurrence of src to dst in https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala#L64
Change three occurrences of dst to src in https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala#L65
- To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
GraphX ShortestPaths backwards?
GraphX ShortestPaths seems to be following edges backwards instead of forwards:

import org.apache.spark.graphx._
val g = Graph(sc.makeRDD(Array((1L,""), (2L,""), (3L,""))),
              sc.makeRDD(Array(Edge(1L,2L,""), Edge(2L,3L,""))))

lib.ShortestPaths.run(g,Array(3)).vertices.collect
res1: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map()), (3,Map(3 -> 0)), (2,Map()))

lib.ShortestPaths.run(g,Array(1)).vertices.collect
res2: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(1 -> 0)), (3,Map(1 -> 2)), (2,Map(1 -> 1)))

If I am not mistaken about my assessment, then I believe the following changes will make it run forward:
Change one occurrence of src to dst in https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala#L64
Change three occurrences of dst to src in https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala#L65
- To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
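In case it helps anyone who hits this before a fix lands: a possible workaround (my suggestion, not from the thread) is to run ShortestPaths on the reversed graph, so that its backwards traversal matches the original graph's forward direction. Sketch, reusing the toy graph above:

import org.apache.spark.graphx._
val g = Graph(sc.makeRDD(Array((1L,""), (2L,""), (3L,""))),
              sc.makeRDD(Array(Edge(1L,2L,""), Edge(2L,3L,""))))
lib.ShortestPaths.run(g.reverse, Seq(3L)).vertices.collect
// expected, if the hypothesis above is right: Array((1,Map(3 -> 2)), (2,Map(3 -> 1)), (3,Map(3 -> 0)))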
Re: GraphX vertex partition/location strategy
But wouldn't the gain be greater under something similar to EdgePartition1D (but perhaps better load-balanced based on number of edges for each vertex) and an algorithm that primarily follows edges in the forward direction? From: Ankur Dave ankurd...@gmail.com To: Michael Malak michaelma...@yahoo.com Cc: dev@spark.apache.org dev@spark.apache.org Sent: Monday, January 19, 2015 2:08 PM Subject: Re: GraphX vertex partition/location strategy No - the vertices are hash-partitioned onto workers independently of the edges. It would be nice for each vertex to be on the worker with the most adjacent edges, but we haven't done this yet since it would add a lot of complexity to avoid load imbalance while reducing the overall communication by a small factor. We refer to the number of partitions containing adjacent edges for a particular vertex as the vertex's replication factor. I think the typical replication factor for power-law graphs with 100-200 partitions is 10-15, and placing the vertex at the ideal location would only reduce the replication factor by 1. Ankur On Mon, Jan 19, 2015 at 12:20 PM, Michael Malak michaelma...@yahoo.com.invalid wrote: Does GraphX make an effort to co-locate vertices onto the same workers as the majority (or even some) of its edges?
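For readers wondering where the edge-partitioning choice is made at all: GraphX exposes it through Graph.partitionBy. A minimal sketch (not from the thread), assuming an existing graph g:

import org.apache.spark.graphx._
// EdgePartition1D hashes on the source vertex ID, so each vertex's outgoing edges are
// co-located; EdgePartition2D and RandomVertexCut are the other built-in strategies.
val byOutEdges = g.partitionBy(PartitionStrategy.EdgePartition1D)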
GraphX vertex partition/location strategy
Does GraphX make an effort to co-locate vertices onto the same workers as the majority (or even some) of its edges? - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
GraphX doc: triangleCount() requirement overstatement?
According to: https://spark.apache.org/docs/1.2.0/graphx-programming-guide.html#triangle-counting "Note that TriangleCount requires the edges to be in canonical orientation (srcId < dstId)" But isn't this overstating the requirement? Isn't the requirement really that IF there are duplicate edges between two vertices, THEN those edges must all be in the same direction (in order for the groupEdges() at the beginning of triangleCount() to produce the intermediate results that triangleCount() expects)? If so, should I enter a JIRA ticket to clarify the documentation? Or is it the case that https://issues.apache.org/jira/browse/SPARK-3650 will make it into Spark 1.3 anyway? - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
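For context, a minimal sketch (not from the thread, and assuming an existing g: Graph[Int, Int]) of satisfying the documented requirement before calling triangleCount():

import org.apache.spark.graphx._
// Force every edge into canonical orientation (srcId < dstId) so that any duplicate edges
// between the same pair of vertices point the same way, then partition so groupEdges() can
// merge them, and finally count triangles.
val canonical = Graph(
    g.vertices,
    g.edges.map(e => if (e.srcId < e.dstId) e else Edge(e.dstId, e.srcId, e.attr)))
  .partitionBy(PartitionStrategy.RandomVertexCut)
val triangles = canonical.triangleCount().vertices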
Re: GraphX rmatGraph hangs
Thank you. I created https://issues.apache.org/jira/browse/SPARK-5064 - Original Message - From: xhudik xhu...@gmail.com To: dev@spark.apache.org Cc: Sent: Saturday, January 3, 2015 2:04 PM Subject: Re: GraphX rmatGraph hangs Hi Michael, yes, I can confirm the behavior. It get stuck (loop?) and eat all resources, command top gives: 14013 ll 20 0 2998136 489992 19804 S 100.2 12.10 13:29.39 java You might create an issue/bug in jira (https://issues.apache.org/jira/browse/SPARK) best, tomas -- View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/GraphX-rmatGraph-hangs-tp9995p9996.html Sent from the Apache Spark Developers List mailing list archive at Nabble.com. - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
GraphX rmatGraph hangs
The following single line just hangs, when executed in either Spark Shell or standalone: org.apache.spark.graphx.util.GraphGenerators.rmatGraph(sc, 4, 8) It just outputs 0 edges and then locks up. The only other information I've found via Google is: http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3c1408617621830-12570.p...@n3.nabble.com%3E - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
[jira] [Created] (SPARK-5064) GraphX rmatGraph hangs
Michael Malak created SPARK-5064: Summary: GraphX rmatGraph hangs Key: SPARK-5064 URL: https://issues.apache.org/jira/browse/SPARK-5064 Project: Spark Issue Type: Bug Components: GraphX Affects Versions: 1.2.0 Environment: CentOS 7 REPL (no HDFS). Also tried Cloudera 5.2.0 QuickStart standalone compiled Scala with spark-submit. Reporter: Michael Malak org.apache.spark.graphx.util.GraphGenerators.rmatGraph(sc, 4, 8) It just outputs 0 edges and then locks up. A spark-user message reports similar behavior: http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3c1408617621830-12570.p...@n3.nabble.com%3E -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
Re: Rdd of Rdds
On Wednesday, October 22, 2014 9:06 AM, Sean Owen so...@cloudera.com wrote: No, there's no such thing as an RDD of RDDs in Spark. Here though, why not just operate on an RDD of Lists? or a List of RDDs? Usually one of these two is the right approach whenever you feel inclined to operate on an RDD of RDDs. Depending on one's needs, one could also consider the matrix (RDD[Vector]) operations provided by MLLib, such as https://spark.apache.org/docs/latest/mllib-statistics.html - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
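A minimal sketch of the RDD[Vector] route mentioned above (assumes Spark 1.1+ MLlib): each inner collection becomes one Vector row, and the statistics run over the whole RDD at once rather than per nested collection.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 10.0),
  Vectors.dense(2.0, 20.0),
  Vectors.dense(3.0, 30.0)))
val summary = Statistics.colStats(rows)
println(summary.mean)      // column means
println(summary.variance)  // column variances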
Re: UpdateStateByKey - How to improve performance?
Depending on the density of your keys, the alternative signature def updateStateByKey[S](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean)(implicit arg0: ClassTag[S]): DStream[(K, S)] at least iterates by key rather than by (old) value. I believe your thinking is correct that there might be a performance improvement opportunity for your case if there were an updateStateByKey() that instead iterated by (new) value. BTW, my impression from the stock examples is that the signature I pasted above was intended to be the more typically called updateStateByKey(), as opposed to the one you pasted, for which my impression is that it is the more general purpose one. I have used the more general purpose one but only when I needed to peek into the entire set of states for some unusual reason. On Wednesday, August 6, 2014 2:30 PM, Venkat Subramanian vsubr...@gmail.com wrote: The method def updateStateByKey[S: ClassTag] ( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)] takes DStream (K,V) and produces DStream (K,S) in Spark Streaming. We have an input DStream (K,V) that has 40,000 elements. We update on average 1000 of them in every 3 second batch, but based on how this updateStateByKey function is defined, we are looping through 40,000 elements (Seq[V]) to make an update for just 1000 elements and not updating 39000 elements. I think looping through the extra 39000 elements is a waste of performance. Isn't there a better way to update this efficiently by just figuring out a hash map for the 1000 elements that are required to be updated and just updating it (without looping through the unwanted elements)? Shouldn't there be a Streaming update function provided that updates selective members or are we missing some concepts here? I think updateStateByKey may be causing a lot of performance degradation in our app as we keep doing this again and again for every batch. Please let us know if my thought process is correct here. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/UpdateStateByKey-How-to-improve-performance-tp11575.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
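For reference, a minimal sketch of calling the Iterator-based overload quoted above (names and numbers are illustrative, not from the thread; assumes pairs: DStream[(String, Long)] and a pre-1.3 Spark Streaming setup):

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.StreamingContext._  // implicit pair-DStream functions in pre-1.3 Spark

// The update function is handed one (key, new values for this batch, previous state) triple
// per key and emits the new per-key state -- here, a running sum.
val state = pairs.updateStateByKey[Long](
  (iter: Iterator[(String, Seq[Long], Option[Long])]) => iter.map {
    case (key, newValues, oldState) => (key, oldState.getOrElse(0L) + newValues.sum)
  },
  new HashPartitioner(8),
  rememberPartitioner = true)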
Re: relationship of RDD[Array[String]] to Array[Array[String]]
It's really more of a Scala question than a Spark question, but the standard OO (not Scala-specific) way is to create your own custom supertype (e.g. MyCollectionTrait), inherited/implemented by two concrete classes (e.g. MyRDD and MyArray), each of which manually forwards method calls to the corresponding pre-existing library implementations. Writing all those forwarding method calls is tedious, but Scala provides at least one bit of syntactic sugar, which alleviates having to type in twice the parameter lists for each method: http://stackoverflow.com/questions/8230831/is-method-parameter-forwarding-possible-in-scala I'm not seeing a way to utilize implicit conversions in this case. Since Scala is statically (albeit inferred) typed, I don't see a way around having a common supertype. On Monday, July 21, 2014 11:01 AM, Philip Ogren philip.og...@oracle.com wrote: It is really nice that Spark RDD's provide functions that are often equivalent to functions found in Scala collections. For example, I can call: myArray.map(myFx) and equivalently myRdd.map(myFx) Awesome! My question is this. Is it possible to write code that works on either an RDD or a local collection without having to have parallel implementations? I can't tell that RDD or Array share any supertypes or traits by looking at the respective scaladocs. Perhaps implicit conversions could be used here. What I would like to do is have a single function whose body is like this: myData.map(myFx) where myData could be an RDD[Array[String]] (for example) or an Array[Array[String]]. Has anyone had success doing this? Thanks, Philip
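A minimal sketch of the MyCollectionTrait / MyRDD / MyArray idea described above (only map() is forwarded; real code would forward whatever other methods the callers need, and the ClassTag context bound is there so the Array version can build its result):

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

trait MyCollectionTrait[T] {
  def map[U: ClassTag](f: T => U): MyCollectionTrait[U]
}

class MyRDD[T](rdd: RDD[T]) extends MyCollectionTrait[T] {
  override def map[U: ClassTag](f: T => U) = new MyRDD(rdd.map(f))   // forwards to RDD.map
}

class MyArray[T](arr: Array[T]) extends MyCollectionTrait[T] {
  override def map[U: ClassTag](f: T => U) = new MyArray(arr.map(f)) // forwards to Array.map
}

With that in place, myData.map(myFx) works whether myData is a MyRDD or a MyArray.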
15 new MLlib algorithms
At Spark Summit, Patrick Wendell indicated the number of MLlib algorithms would roughly double in 1.1 from the current approx. 15. http://spark-summit.org/wp-content/uploads/2014/07/Future-of-Spark-Patrick-Wendell.pdf What are the planned additional algorithms? In Jira, I only see two when filtering on version 1.1, component MLlib: one on multi-label and another on high dimensionality. https://issues.apache.org/jira/browse/SPARK-2329?jql=issuetype%20in%20(Brainstorming%2C%20Epic%2C%20%22New%20Feature%22%2C%20Story)%20AND%20fixVersion%20%3D%201.1.0%20AND%20component%20%3D%20MLlib http://tinyurl.com/ku7sehu
Re: parallel Reduce within a key
How about a treeReduceByKey? :-) On Friday, June 20, 2014 11:55 AM, DB Tsai dbt...@stanford.edu wrote: Currently, the reduce operation combines the result from mapper sequentially, so it's O(n). Xiangrui is working on treeReduce which is O(log(n)). Based on the benchmark, it dramatically increase the performance. You can test the code in his own branch. https://github.com/apache/spark/pull/1110 Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Jun 20, 2014 at 6:57 AM, ansriniv ansri...@gmail.com wrote: Hi, I am on Spark 0.9.0 I have a 2 node cluster (2 worker nodes) with 16 cores on each node (so, 32 cores in the cluster). I have an input rdd with 64 partitions. I am running sc.mapPartitions(...).reduce(...) I can see that I get full parallelism on the mapper (all my 32 cores are busy simultaneously). However, when it comes to reduce(), the outputs of the mappers are all reduced SERIALLY. Further, all the reduce processing happens only on 1 of the workers. I was expecting that the outputs of the 16 mappers on node 1 would be reduced in parallel in node 1 while the outputs of the 16 mappers on node 2 would be reduced in parallel on node 2 and there would be 1 final inter-node reduce (of node 1 reduced result and node 2 reduced result). Isn't parallel reduce supported WITHIN a key (in this case I have no key) ? (I know that there is parallelism in reduce across keys) Best Regards Anand -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/parallel-Reduce-within-a-key-tp7998.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
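For readers who land here before the linked PR is merged, a rough sketch of the same idea (reduce inside each partition on the executors, shuffle the partial results into a few partitions, reduce again, and only then reduce the handful of survivors on the driver). This is illustrative only, not the PR's implementation:

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

def twoLevelReduce[T: ClassTag](rdd: RDD[T], intermediatePartitions: Int)(f: (T, T) => T): T =
  rdd
    .mapPartitions(it => if (it.hasNext) Iterator(it.reduce(f)) else Iterator.empty)  // reduce each partition locally
    .repartition(intermediatePartitions)                                              // shuffle partials to a few partitions
    .mapPartitions(it => if (it.hasNext) Iterator(it.reduce(f)) else Iterator.empty)  // reduce again on the executors
    .reduce(f)                                                                        // final, small reduce on the driver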
GraphX triplets on 5-node graph
Shouldn't I be seeing N2 and N4 in the output below? (Spark 0.9.0 REPL) Or am I missing something fundamental?

val nodes = sc.parallelize(Array((1L, "N1"), (2L, "N2"), (3L, "N3"), (4L, "N4"), (5L, "N5")))
val edges = sc.parallelize(Array(Edge(1L, 2L, "E1"), Edge(1L, 3L, "E2"), Edge(2L, 4L, "E3"), Edge(3L, 5L, "E4")))
Graph(nodes, edges).triplets.collect
res1: Array[org.apache.spark.graphx.EdgeTriplet[String,String]] = Array(((1,N1),(3,N3),E2), ((1,N1),(3,N3),E2), ((3,N3),(5,N5),E4), ((3,N3),(5,N5),E4))
[jira] [Commented] (SPARK-1199) Type mismatch in Spark shell when using case class defined in shell
[ https://issues.apache.org/jira/browse/SPARK-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14011492#comment-14011492 ] Michael Malak commented on SPARK-1199: -- See also additional test cases in https://issues.apache.org/jira/browse/SPARK-1836 which has now been marked as a duplicate. Type mismatch in Spark shell when using case class defined in shell --- Key: SPARK-1199 URL: https://issues.apache.org/jira/browse/SPARK-1199 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 0.9.0 Reporter: Andrew Kerr Priority: Critical Fix For: 1.1.0 Define a class in the shell: {code} case class TestClass(a:String) {code} and an RDD {code} val data = sc.parallelize(Seq(a)).map(TestClass(_)) {code} define a function on it and map over the RDD {code} def itemFunc(a:TestClass):TestClass = a data.map(itemFunc) {code} Error: {code} console:19: error: type mismatch; found : TestClass = TestClass required: TestClass = ? data.map(itemFunc) {code} Similarly with a mapPartitions: {code} def partitionFunc(a:Iterator[TestClass]):Iterator[TestClass] = a data.mapPartitions(partitionFunc) {code} {code} console:19: error: type mismatch; found : Iterator[TestClass] = Iterator[TestClass] required: Iterator[TestClass] = Iterator[?] Error occurred in an application involving default arguments. data.mapPartitions(partitionFunc) {code} The behavior is the same whether in local mode or on a cluster. This isn't specific to RDDs. A Scala collection in the Spark shell has the same problem. {code} scala Seq(TestClass(foo)).map(itemFunc) console:15: error: type mismatch; found : TestClass = TestClass required: TestClass = ? Seq(TestClass(foo)).map(itemFunc) ^ {code} When run in the Scala console (not the Spark shell) there are no type mismatch errors. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Resolved] (SPARK-1836) REPL $outer type mismatch causes lookup() and equals() problems
[ https://issues.apache.org/jira/browse/SPARK-1836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Malak resolved SPARK-1836. -- Resolution: Duplicate REPL $outer type mismatch causes lookup() and equals() problems --- Key: SPARK-1836 URL: https://issues.apache.org/jira/browse/SPARK-1836 Project: Spark Issue Type: Bug Affects Versions: 0.9.0 Reporter: Michael Malak Anand Avati partially traced the cause to REPL wrapping classes in $outer classes. There are at least two major symptoms: 1. equals() = In REPL equals() (required in custom classes used as a key for groupByKey) seems to have to be written using instanceOf[] instead of the canonical match{} Spark Shell (equals uses match{}): {noformat} class C(val s:String) extends Serializable { override def equals(o: Any) = o match { case that: C = that.s == s case _ = false } } val x = new C(a) val bos = new java.io.ByteArrayOutputStream() val out = new java.io.ObjectOutputStream(bos) out.writeObject(x); val b = bos.toByteArray(); out.close bos.close val y = new java.io.ObjectInputStream(new ava.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C] x.equals(y) res: Boolean = false {noformat} Spark Shell (equals uses isInstanceOf[]): {noformat} class C(val s:String) extends Serializable { override def equals(o: Any) = if (o.isInstanceOf[C]) (o.asInstanceOf[C].s = s) else false } val x = new C(a) val bos = new java.io.ByteArrayOutputStream() val out = new java.io.ObjectOutputStream(bos) out.writeObject(x); val b = bos.toByteArray(); out.close bos.close val y = new java.io.ObjectInputStream(new ava.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C] x.equals(y) res: Boolean = true {noformat} Scala Shell (equals uses match{}): {noformat} class C(val s:String) extends Serializable { override def equals(o: Any) = o match { case that: C = that.s == s case _ = false } } val x = new C(a) val bos = new java.io.ByteArrayOutputStream() val out = new java.io.ObjectOutputStream(bos) out.writeObject(x); val b = bos.toByteArray(); out.close bos.close val y = new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C] x.equals(y) res: Boolean = true {noformat} 2. lookup() = {noformat} class C(val s:String) extends Serializable { override def equals(o: Any) = if (o.isInstanceOf[C]) o.asInstanceOf[C].s == s else false override def hashCode = s.hashCode override def toString = s } val r = sc.parallelize(Array((new C(a),11),(new C(a),12))) r.lookup(new C(a)) console:17: error: type mismatch; found : C required: C r.lookup(new C(a)) ^ {noformat} See http://mail-archives.apache.org/mod_mbox/spark-dev/201405.mbox/%3C1400019424.80629.YahooMailNeo%40web160801.mail.bf1.yahoo.com%3E -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: rdd ordering gets scrambled
Mohit Jaggi: A workaround is to use zipWithIndex (to appear in Spark 1.0, but if you're still on 0.9x you can swipe the code from https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala ), map it to (x => (x._2,x._1)) and then sortByKey. Spark developers: The lack of ordering guarantee for RDDs should be better documented, and the presence of a method called first() is a bit deceiving, in my opinion, if that same first element doesn't survive a map(). On Tuesday, April 29, 2014 3:45 PM, Mohit Jaggi mohitja...@gmail.com wrote: Hi, I started with a text file(CSV) of sorted data (by first column), parsed it into Scala objects using map operation in Scala. Then I used more maps to add some extra info to the data and saved it as text file. The final text file is not sorted. What do I need to do to keep the order from the original input intact? My code looks like:

csvFile = sc.textFile(..)  //file is CSV and ordered by first column
splitRdd = csvFile map { line => line.split(",",-1) }
parsedRdd = rdd map { parts => {
  key = parts(0)  //use first column as key
  value = new MyObject(parts(0), parts(1))  //parse into scala objects
  (key, value)
}
augmentedRdd = parsedRdd map { x =>
  key = x._1
  value = //add extra fields to x._2
  (key, value)
}
augmentedRdd.saveAsFile(...)  //this file is not sorted

Mohit.
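A minimal sketch of the workaround described above (assumes Spark 1.0's zipWithIndex() or the back-ported ZippedWithIndexRDD; the parsing step is a stand-in for the real logic):

import org.apache.spark.SparkContext._   // pair RDD functions

val indexed   = sc.textFile("input.csv").zipWithIndex().map { case (line, i) => (i, line) }  // tag each line with its position
val augmented = indexed.mapValues(line => line.split(",", -1).mkString("|"))                 // stand-in for parsing/augmenting
augmented.sortByKey().values.saveAsTextFile("output")                                        // restore the original order before writing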
[jira] [Commented] (SPARK-1867) Spark Documentation Error causes java.lang.IllegalStateException: unread block data
[ https://issues.apache.org/jira/browse/SPARK-1867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14007565#comment-14007565 ] Michael Malak commented on SPARK-1867: -- Thank you, sam, that fixed it for me! FYI, I had:
{noformat}
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.3.0-cdh5.0.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <version>2.3.0-cdh5.0.1</version>
</dependency>
{noformat}
By removing the second one, textFile().count() now works.

Spark Documentation Error causes java.lang.IllegalStateException: unread block data
---
Key: SPARK-1867
URL: https://issues.apache.org/jira/browse/SPARK-1867
Project: Spark
Issue Type: Bug
Reporter: sam

I've employed two System Administrators on a contract basis (for quite a bit of money), and both contractors have independently hit the following exception. What we are doing is: 1. Installing Spark 0.9.1 according to the documentation on the website, along with CDH4 (and another cluster with CDH5) distros of hadoop/hdfs. 2. Building a fat jar with a Spark app with sbt then trying to run it on the cluster. I've also included code snippets, and sbt deps at the bottom. When I've Googled this, there seem to be two somewhat vague responses: a) Mismatching spark versions on nodes/user code b) Need to add more jars to the SparkConf Now I know that (b) is not the problem having successfully run the same code on other clusters while only including one jar (it's a fat jar). But I have no idea how to check for (a) - it appears Spark doesn't have any version checks or anything - it would be nice if it checked versions and threw a mismatching version exception: you have user code using version X and node Y has version Z. I would be very grateful for advice on this.
The exception: Exception in thread main org.apache.spark.SparkException: Job aborted: Task 0.0:1 failed 32 times (most recent failure: Exception failure: java.lang.IllegalStateException: unread block data) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 14/05/16 18:05:31 INFO scheduler.TaskSetManager: Loss was due to java.lang.IllegalStateException: unread block data [duplicate 59] My code snippet: val conf = new SparkConf() .setMaster(clusterMaster) .setAppName(appName) .setSparkHome(sparkHome) .setJars(SparkContext.jarOfClass(this.getClass)) println(count = + new SparkContext(conf).textFile(someHdfsPath).count()) My SBT dependencies: // relevant org.apache.spark % spark-core_2.10 % 0.9.1, org.apache.hadoop % hadoop-client % 2.3.0-mr1-cdh5.0.0, // standard, probably unrelated com.github.seratch %% awscala % [0.2,), org.scalacheck %% scalacheck % 1.10.1 % test, org.specs2 %% specs2 % 1.14 % test, org.scala-lang % scala-reflect % 2.10.3, org.scalaz %% scalaz-core % 7.0.5, net.minidev % json-smart % 1.2 -- This message was sent by Atlassian JIRA (v6.2
Re: [VOTE] Release Apache Spark 1.0.0 (rc5)
While developers may appreciate 1.0 == API stability, I'm not sure that will be the understanding of the VP who gives the green light to a Spark-based development effort. I fear a bug that silently produces erroneous results will be perceived like the FDIV bug, but in this case without the momentum of an existing large installed base and with a number of competitors (GridGain, H20, Stratosphere). Despite the stated intention of API stability, the perception (which becomes the reality) of 1.0 is that it's ready for production use -- not bullet-proof, but also not with known silent generation of erroneous results. Exceptions and crashes are much more tolerated than silent corruption of data. The result may be a reputation of the Spark team unconcerned about data integrity. I ran into (and submitted) https://issues.apache.org/jira/browse/SPARK-1817 due to the lack of zipWithIndex(). zip() with a self-created partitioned range was the way I was trying to number with IDs a collection of nodes in preparation for the GraphX constructor. For the record, it was a frequent Spark committer who escalated it to blocker; I did not submit it as such. Partitioning a Scala range isn't just a toy example; it has a real-life use. I also wonder about the REPL. Cloudera, for example, touts it as key to making Spark a crossover tool that Data Scientists can also use. The REPL can be considered an API of sorts -- not a traditional Scala or Java API, of course, but the API that a human data analyst would use. With the Scala REPL exhibiting some of the same bad behaviors as the Spark REPL, there is a question of whether the Spark REPL can even be fixed. If the Spark REPL has to be eliminated after 1.0 due to an inability to repair it, that would constitute API instability. On Saturday, May 17, 2014 2:49 PM, Matei Zaharia matei.zaha...@gmail.com wrote: As others have said, the 1.0 milestone is about API stability, not about saying “we’ve eliminated all bugs”. The sooner you declare 1.0, the sooner users can confidently build on Spark, knowing that the application they build today will still run on Spark 1.9.9 three years from now. This is something that I’ve seen done badly (and experienced the effects thereof) in other big data projects, such as MapReduce and even YARN. The result is that you annoy users, you end up with a fragmented userbase where everyone is building against a different version, and you drastically slow down development. With a project as fast-growing as fast-growing as Spark in particular, there will be new bugs discovered and reported continuously, especially in the non-core components. Look at the graph of # of contributors in time to Spark: https://www.ohloh.net/p/apache-spark (bottom-most graph; “commits” changed when we started merging each patch as a single commit). This is not slowing down, and we need to have the culture now that we treat API stability and release numbers at the level expected for a 1.0 project instead of having people come in and randomly change the API. I’ll also note that the issues marked “blocker” were marked so by their reporters, since the reporter can set the priority. I don’t consider stuff like parallelize() not partitioning ranges in the same way as other collections a blocker — it’s a bug, it would be good to fix it, but it only affects a small number of use cases. 
Of course if we find a real blocker (in particular a regression from a previous version, or a feature that’s just completely broken), we will delay the release for that, but at some point you have to say “okay, this fix will go into the next maintenance release”. Maybe we need to write a clear policy for what the issue priorities mean. Finally, I believe it’s much better to have a culture where you can make releases on a regular schedule, and have the option to make a maintenance release in 3-4 days if you find new bugs, than one where you pile up stuff into each release. This is what much large project than us, like Linux, do, and it’s the only way to avoid indefinite stalling with a large contributor base. In the worst case, if you find a new bug that warrants immediate release, it goes into 1.0.1 a week after 1.0.0 (we can vote on 1.0.1 in three days with just your bug fix in it). And if you find an API that you’d like to improve, just add a new one and maybe deprecate the old one — at some point we have to respect our users and let them know that code they write today will still run tomorrow. Matei On May 17, 2014, at 10:32 AM, Kan Zhang kzh...@apache.org wrote: +1 on the running commentary here, non-binding of course :-) On Sat, May 17, 2014 at 8:44 AM, Andrew Ash and...@andrewash.com wrote: +1 on the next release feeling more like a 0.10 than a 1.0 On May 17, 2014 4:38 AM, Mridul Muralidharan mri...@gmail.com wrote: I had echoed similar sentiments a while back when there was a discussion around 0.10 vs 1.0 ... I would have
[jira] [Updated] (SPARK-1836) REPL $outer type mismatch causes lookup() and equals() problems
[ https://issues.apache.org/jira/browse/SPARK-1836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Malak updated SPARK-1836: - Description: Anand Avati partially traced the cause to REPL wrapping classes in $outer classes. There are at least two major symptoms: 1. equals() = In REPL equals() (required in custom classes used as a key for groupByKey) seems to have to be written using instanceOf[] instead of the canonical match{} Spark Shell (equals uses match{}): {noformat} class C(val s:String) extends Serializable { override def equals(o: Any) = o match { case that: C = that.s == s case _ = false } } val x = new C(a) val bos = new java.io.ByteArrayOutputStream() val out = new java.io.ObjectOutputStream(bos) out.writeObject(x); val b = bos.toByteArray(); out.close bos.close val y = new java.io.ObjectInputStream(new ava.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C] x.equals(y) res: Boolean = false {noformat} Spark Shell (equals uses isInstanceOf[]): {noformat} class C(val s:String) extends Serializable { override def equals(o: Any) = if (o.isInstanceOf[C]) (o.asInstanceOf[C].s = s) else false } val x = new C(a) val bos = new java.io.ByteArrayOutputStream() val out = new java.io.ObjectOutputStream(bos) out.writeObject(x); val b = bos.toByteArray(); out.close bos.close val y = new java.io.ObjectInputStream(new ava.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C] x.equals(y) res: Boolean = true {noformat} Scala Shell (equals uses match{}): {noformat} class C(val s:String) extends Serializable { override def equals(o: Any) = o match { case that: C = that.s == s case _ = false } } val x = new C(a) val bos = new java.io.ByteArrayOutputStream() val out = new java.io.ObjectOutputStream(bos) out.writeObject(x); val b = bos.toByteArray(); out.close bos.close val y = new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C] x.equals(y) res: Boolean = true {noformat} 2. lookup() = {noformat} class C(val s:String) extends Serializable { override def equals(o: Any) = if (o.isInstanceOf[C]) o.asInstanceOf[C].s == s else false override def hashCode = s.hashCode override def toString = s } val r = sc.parallelize(Array((new C(a),11),(new C(a),12))) r.lookup(new C(a)) console:17: error: type mismatch; found : C required: C r.lookup(new C(a)) ^ {noformat} See http://mail-archives.apache.org/mod_mbox/spark-dev/201405.mbox/%3C1400019424.80629.YahooMailNeo%40web160801.mail.bf1.yahoo.com%3E was: Anand Avati partially traced the cause to REPL wrapping classes in $outer classes. There are at least two major symptoms: 1. 
equals() = In REPL equals() (required in custom classes used as a key for groupByKey) seems to have to be written using instanceOf[] instead of the canonical match{} Spark Shell (equals uses match{}): class C(val s:String) extends Serializable { override def equals(o: Any) = o match { case that: C = that.s == s case _ = false } } val x = new C(a) val bos = new java.io.ByteArrayOutputStream() val out = new java.io.ObjectOutputStream(bos) out.writeObject(x); val b = bos.toByteArray(); out.close bos.close val y = new java.io.ObjectInputStream(new ava.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C] x.equals(y) res: Boolean = false Spark Shell (equals uses isInstanceOf[]): class C(val s:String) extends Serializable { override def equals(o: Any) = if (o.isInstanceOf[C]) (o.asInstanceOf[C].s = s) else false } val x = new C(a) val bos = new java.io.ByteArrayOutputStream() val out = new java.io.ObjectOutputStream(bos) out.writeObject(x); val b = bos.toByteArray(); out.close bos.close val y = new java.io.ObjectInputStream(new ava.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C] x.equals(y) res: Boolean = true Scala Shell (equals uses match{}): class C(val s:String) extends Serializable { override def equals(o: Any) = o match { case that: C = that.s == s case _ = false } } val x = new C(a) val bos = new java.io.ByteArrayOutputStream() val out = new java.io.ObjectOutputStream(bos) out.writeObject(x); val b = bos.toByteArray(); out.close bos.close val y = new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C] x.equals(y) res: Boolean = true 2. lookup() = class C(val s:String) extends Serializable { override def equals(o: Any) = if (o.isInstanceOf[C]) o.asInstanceOf[C].s == s else false override def hashCode = s.hashCode override def toString = s } val r = sc.parallelize(Array((new C(a),11),(new C(a),12))) r.lookup(new C(a)) console:17: error: type mismatch; found : C required: C r.lookup(new C(a)) ^ See http://mail-archives.apache.org/mod_mbox/spark-dev/201405.mbox/%3C1400019424.80629.YahooMailNeo%40web160801.mail.bf1
[jira] [Commented] (SPARK-1836) REPL $outer type mismatch causes lookup() and equals() problems
[ https://issues.apache.org/jira/browse/SPARK-1836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13998807#comment-13998807 ] Michael Malak commented on SPARK-1836: -- Michael Ambrust: Indeed. Do you think I should add my additional case of equals() (and its workaround) as a comment to SPARK-1199 and mark this one as a duplicate? REPL $outer type mismatch causes lookup() and equals() problems --- Key: SPARK-1836 URL: https://issues.apache.org/jira/browse/SPARK-1836 Project: Spark Issue Type: Bug Affects Versions: 0.9.0 Reporter: Michael Malak Anand Avati partially traced the cause to REPL wrapping classes in $outer classes. There are at least two major symptoms: 1. equals() = In REPL equals() (required in custom classes used as a key for groupByKey) seems to have to be written using instanceOf[] instead of the canonical match{} Spark Shell (equals uses match{}): {noformat} class C(val s:String) extends Serializable { override def equals(o: Any) = o match { case that: C = that.s == s case _ = false } } val x = new C(a) val bos = new java.io.ByteArrayOutputStream() val out = new java.io.ObjectOutputStream(bos) out.writeObject(x); val b = bos.toByteArray(); out.close bos.close val y = new java.io.ObjectInputStream(new ava.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C] x.equals(y) res: Boolean = false {noformat} Spark Shell (equals uses isInstanceOf[]): {noformat} class C(val s:String) extends Serializable { override def equals(o: Any) = if (o.isInstanceOf[C]) (o.asInstanceOf[C].s = s) else false } val x = new C(a) val bos = new java.io.ByteArrayOutputStream() val out = new java.io.ObjectOutputStream(bos) out.writeObject(x); val b = bos.toByteArray(); out.close bos.close val y = new java.io.ObjectInputStream(new ava.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C] x.equals(y) res: Boolean = true {noformat} Scala Shell (equals uses match{}): {noformat} class C(val s:String) extends Serializable { override def equals(o: Any) = o match { case that: C = that.s == s case _ = false } } val x = new C(a) val bos = new java.io.ByteArrayOutputStream() val out = new java.io.ObjectOutputStream(bos) out.writeObject(x); val b = bos.toByteArray(); out.close bos.close val y = new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C] x.equals(y) res: Boolean = true {noformat} 2. lookup() = {noformat} class C(val s:String) extends Serializable { override def equals(o: Any) = if (o.isInstanceOf[C]) o.asInstanceOf[C].s == s else false override def hashCode = s.hashCode override def toString = s } val r = sc.parallelize(Array((new C(a),11),(new C(a),12))) r.lookup(new C(a)) console:17: error: type mismatch; found : C required: C r.lookup(new C(a)) ^ {noformat} See http://mail-archives.apache.org/mod_mbox/spark-dev/201405.mbox/%3C1400019424.80629.YahooMailNeo%40web160801.mail.bf1.yahoo.com%3E -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Created] (SPARK-1817) RDD zip erroneous when partitions do not divide RDD count
Michael Malak created SPARK-1817:
Summary: RDD zip erroneous when partitions do not divide RDD count
Key: SPARK-1817
URL: https://issues.apache.org/jira/browse/SPARK-1817
Project: Spark
Issue Type: Bug
Affects Versions: 0.9.0
Reporter: Michael Malak

Example:

scala> sc.parallelize(1L to 2L,4).zip(sc.parallelize(11 to 12,4)).collect
res1: Array[(Long, Int)] = Array((2,11))

But more generally, it's whenever the number of partitions does not evenly divide the total number of elements in the RDD. See https://groups.google.com/forum/#!msg/spark-users/demrmjHFnoc/Ek3ijiXHr2MJ
-- This message was sent by Atlassian JIRA (v6.2#6252)
Serializable different behavior Spark Shell vs. Scala Shell
Reposting here on dev since I didn't see a response on user:

I'm seeing different Serializable behavior in Spark Shell vs. Scala Shell. In the Spark Shell, equals() fails when I use the canonical equals() pattern of match{}, but works when I substitute with isInstanceOf[]. I am using Spark 0.9.0/Scala 2.10.3. Is this a bug?

Spark Shell (equals uses match{}):

class C(val s:String) extends Serializable {
  override def equals(o: Any) = o match {
    case that: C => that.s == s
    case _ => false
  }
}
val x = new C("a")
val bos = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(bos)
out.writeObject(x);
val b = bos.toByteArray();
out.close
bos.close
val y = new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
x.equals(y)

res: Boolean = false

Spark Shell (equals uses isInstanceOf[]):

class C(val s:String) extends Serializable {
  override def equals(o: Any) = if (o.isInstanceOf[C]) (o.asInstanceOf[C].s == s) else false
}
val x = new C("a")
val bos = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(bos)
out.writeObject(x);
val b = bos.toByteArray();
out.close
bos.close
val y = new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
x.equals(y)

res: Boolean = true

Scala Shell (equals uses match{}):

class C(val s:String) extends Serializable {
  override def equals(o: Any) = o match {
    case that: C => that.s == s
    case _ => false
  }
}
val x = new C("a")
val bos = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(bos)
out.writeObject(x);
val b = bos.toByteArray();
out.close
bos.close
val y = new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
x.equals(y)

res: Boolean = true
Re: Serializable different behavior Spark Shell vs. Scala Shell
Thank you for your investigation into this! Just for completeness, I've confirmed it's a problem only in the REPL, not in compiled Spark programs. But within the REPL, a direct consequence of non-same classes after serialization/deserialization also means that lookup() doesn't work:

scala> class C(val s:String) extends Serializable {
     |   override def equals(o: Any) = if (o.isInstanceOf[C]) o.asInstanceOf[C].s == s else false
     |   override def toString = s
     | }
defined class C

scala> val r = sc.parallelize(Array((new C("a"),11),(new C("a"),12)))
r: org.apache.spark.rdd.RDD[(C, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:14

scala> r.lookup(new C("a"))
<console>:17: error: type mismatch;
 found   : C
 required: C
              r.lookup(new C("a"))
                           ^

On Tuesday, May 13, 2014 3:05 PM, Anand Avati av...@gluster.org wrote:
On Tue, May 13, 2014 at 8:26 AM, Michael Malak michaelma...@yahoo.com wrote:

Reposting here on dev since I didn't see a response on user: I'm seeing different Serializable behavior in Spark Shell vs. Scala Shell. In the Spark Shell, equals() fails when I use the canonical equals() pattern of match{}, but works when I substitute with isInstanceOf[]. I am using Spark 0.9.0/Scala 2.10.3. Is this a bug?

Spark Shell (equals uses match{}):

class C(val s:String) extends Serializable {
  override def equals(o: Any) = o match {
    case that: C => that.s == s
    case _ => false
  }
}
val x = new C("a")
val bos = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(bos)
out.writeObject(x);
val b = bos.toByteArray();
out.close
bos.close
val y = new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
x.equals(y)

res: Boolean = false

Spark Shell (equals uses isInstanceOf[]):

class C(val s:String) extends Serializable {
  override def equals(o: Any) = if (o.isInstanceOf[C]) (o.asInstanceOf[C].s == s) else false
}
val x = new C("a")
val bos = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(bos)
out.writeObject(x);
val b = bos.toByteArray();
out.close
bos.close
val y = new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
x.equals(y)

res: Boolean = true

Scala Shell (equals uses match{}):

class C(val s:String) extends Serializable {
  override def equals(o: Any) = o match {
    case that: C => that.s == s
    case _ => false
  }
}
val x = new C("a")
val bos = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(bos)
out.writeObject(x);
val b = bos.toByteArray();
out.close
bos.close
val y = new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
x.equals(y)

res: Boolean = true

Hmm. I see that this can be reproduced without Spark in Scala 2.11, with and without the -Yrepl-class-based command line flag to the repl. Spark's REPL has the effective behavior of 2.11's -Yrepl-class-based flag. Inspecting the byte code generated, it appears -Yrepl-class-based results in the creation of a $outer field in the generated classes (including class C). The first case match in equals() results in code along the lines of (simplified):

if (o isinstanceof C && this.$outer == that.$outer) { // do string compare // }

$outer is the synthetic field referencing the outer object in which the object was created (in this case, the repl environment). Now obviously, when x is taken through the bytestream and deserialized, it would have a new $outer created (it may have been deserialized in a different jvm or machine for all we know). So the $outer's mismatching is expected.
However I'm still trying to understand why the outers need to be the same for the case match.
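For readers hitting the same issue, here is a minimal sketch of the equals() form that survives the REPL's serialization round trip, as discussed above; the hashCode override is added here for completeness and is not part of the original thread.

  // Field-based equals() instead of the match{} pattern, whose generated
  // $outer comparison fails after serialization/deserialization in the
  // class-based REPL.
  class C(val s: String) extends Serializable {
    override def equals(o: Any): Boolean =
      o.isInstanceOf[C] && o.asInstanceOf[C].s == s
    override def hashCode: Int = s.hashCode  // keep consistent with equals
    override def toString: String = s
  }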
Re: Bug when zip with longs and too many partitions?
I've discovered that it was noticed a year ago that RDD zip() does not work when the number of partitions does not evenly divide the total number of elements in the RDD: https://groups.google.com/forum/#!msg/spark-users/demrmjHFnoc/Ek3ijiXHr2MJ I will enter a JIRA ticket just as soon as the ASF Jira system will let me reset my password. On Sunday, May 11, 2014 4:40 AM, Michael Malak michaelma...@yahoo.com wrote: Is this a bug? scala sc.parallelize(1 to 2,4).zip(sc.parallelize(11 to 12,4)).collect res0: Array[(Int, Int)] = Array((1,11), (2,12)) scala sc.parallelize(1L to 2L,4).zip(sc.parallelize(11 to 12,4)).collect res1: Array[(Long, Int)] = Array((2,11))
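Until zip() is fixed, one possible workaround (a sketch only, assuming a Spark version that provides RDD.zipWithIndex; it incurs a shuffle, unlike zip) is to align the two RDDs by element index rather than by partition layout:

  // Index both RDDs explicitly, join on the index, then restore order.
  val left  = sc.parallelize(1L to 2L, 4)
  val right = sc.parallelize(11 to 12, 4)
  val pairs = left.zipWithIndex.map(_.swap)
    .join(right.zipWithIndex.map(_.swap))
    .sortByKey()
    .values
  pairs.collect()  // expected: Array((1,11), (2,12))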
Re: Opinions stratosphere
looks like Spark outperforms Stratosphere fairly consistently in the experiments There was one exception the paper noted, which was when memory resources were constrained. In that case, Stratosphere seemed to have degraded more gracefully than Spark, but the author did not explore it deeper. The author did insert into his conclusion section, though, However, in our experiments, for iterative algorithms, the Spark programs may show the poor results in performance in the environment of limited memory resources. I recently blogged a fuller list of alternatives/competitors to Spark: http://datascienceassn.org/content/alternatives-spark-memory-distributed-computing On Friday, May 2, 2014 10:39 AM, Philip Ogren philip.og...@oracle.com wrote: Great reference! I just skimmed through the results without reading much of the methodology - but it looks like Spark outperforms Stratosphere fairly consistently in the experiments. It's too bad the data sources only range from 2GB to 8GB. Who knows if the apparent pattern would extend out to 64GB, 128GB, 1TB, and so on... On 05/01/2014 06:02 PM, Christopher Nguyen wrote: Someone (Ze Ni, https://www.sics.se/people/ze-ni) has actually attempted such a comparative study as a Masters thesis: http://www.diva-portal.org/smash/get/diva2:605106/FULLTEXT01.pdf According to this snapshot (c. 2013), Stratosphere is different from Spark in not having an explicit concept of an in-memory dataset (e.g., RDD). In principle this could be argued to be an implementation detail; the operators and execution plan/data flow are of primary concern in the API, and the data representation/materializations are otherwise unspecified. But in practice, for long-running interactive applications, I consider RDDs to be of fundamental, first-class citizen importance, and the key distinguishing feature of Spark's model vs other in-memory approaches that treat memory merely as an implicit cache. -- Christopher T. Nguyen Co-founder CEO, Adatao linkedin.com/in/ctnguyen On Tue, Nov 26, 2013 at 1:26 PM, Matei Zaharia matei.zaha...@gmail.com wrote: I don’t know a lot about it except from the research side, where the team has done interesting optimization stuff for these types of applications. In terms of the engine, one thing I’m not sure of is whether Stratosphere allows explicit caching of datasets (similar to RDD.cache()) and interactive queries (similar to spark-shell). But it’s definitely an interesting project to watch. Matei On Nov 22, 2013, at 4:17 PM, Ankur Chauhan achau...@brightcove.com wrote: Hi, That's what I thought but as per the slides on http://www.stratosphere.eu they seem to know about spark and the scala api does look similar. I found the PACT model interesting. Would like to know if matei or other core comitters have something to weight in on. -- Ankur On 22 Nov 2013, at 16:05, Patrick Wendell pwend...@gmail.com wrote: I've never seen that project before, would be interesting to get a comparison. Seems to offer a much lower level API. For instance this is a wordcount program: https://github.com/stratosphere/stratosphere/blob/master/pact/pact-examples/src/main/java/eu/stratosphere/pact/example/wordcount/WordCount.java On Thu, Nov 21, 2013 at 3:15 PM, Ankur Chauhan achau...@brightcove.com wrote: Hi, I was just curious about https://github.com/stratosphere/stratosphere and how does spark compare to it. Anyone has any experience with it to make any comments? -- Ankur
Re: UDFs with package names
Yup, it was the directory structure com/mystuff/whateverUDF.class that was missing. Thought I had tried that before posting my question, but... Thanks for your help! From: Edward Capriolo edlinuxg...@gmail.com To: user@hive.apache.org user@hive.apache.org; Michael Malak michaelma...@yahoo.com Sent: Tuesday, July 30, 2013 7:06 PM Subject: Re: UDFs with package names It might be a better idea to use your own package com.mystuff.x. You might be running into an issue where java is not finding the file because it assumes the relation between package and jar is 1 to 1. You might also be compiling wrong If your package is com.mystuff that class file should be in a directory structure com/mystuff/whateverUDF.class I am not seeing that from your example. On Tue, Jul 30, 2013 at 8:00 PM, Michael Malak michaelma...@yahoo.com wrote: Thus far, I've been able to create Hive UDFs, but now I need to define them within a Java package name (as opposed to the default Java package as I had been doing), but once I do that, I'm no longer able to load them into Hive. First off, this works: add jar /usr/lib/hive/lib/hive-contrib-0.10.0-cdh4.3.0.jar; create temporary function row_sequence as 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence'; Then I took the source code for UDFRowSequence.java from http://svn.apache.org/repos/asf/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/udf/UDFRowSequence.java and renamed the file and the class inside to UDFRowSequence2.java I compile and deploy it with: javac -cp /usr/lib/hive/lib/hive-exec-0.10.0-cdh4.3.0.jar:/usr/lib/hadoop/hadoop-common.jar UDFRowSequence2.java jar cvf UDFRowSequence2.jar UDFRowSequence2.class sudo cp UDFRowSequence2.jar /usr/local/lib But in Hive, I get the following: hive add jar /usr/local/lib/UDFRowSequence2.jar; Added /usr/local/lib/UDFRowSequence2.jar to class path Added resource: /usr/local/lib/UDFRowSequence2.jar hive create temporary function row_sequence as 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence2'; FAILED: Class org.apache.hadoop.hive.contrib.udf.UDFRowSequence2 not found FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.FunctionTask But if I comment out the package line in UDFRowSequence2.java (to put the UDF into the default Java package), it works: hive add jar /usr/local/lib/UDFRowSequence2.jar; Added /usr/local/lib/UDFRowSequence2.jar to class path Added resource: /usr/local/lib/UDFRowSequence2.jar hive create temporary function row_sequence as 'UDFRowSequence2'; OK Time taken: 0.383 seconds What am I doing wrong? I have a feeling it's something simple.
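To make the fix concrete, here is a sketch of the compile-and-package steps with the class declared in a package (the package name com.mystuff follows Edward's suggestion above; paths and jar versions are copied from the earlier message and may differ in your install):

  # UDFRowSequence2.java starts with:  package com.mystuff;
  javac -cp /usr/lib/hive/lib/hive-exec-0.10.0-cdh4.3.0.jar:/usr/lib/hadoop/hadoop-common.jar \
        -d . UDFRowSequence2.java          # -d . creates com/mystuff/UDFRowSequence2.class
  jar cvf UDFRowSequence2.jar com/          # jar entries must include the package directories
  sudo cp UDFRowSequence2.jar /usr/local/lib

Then in Hive:

  hive> add jar /usr/local/lib/UDFRowSequence2.jar;
  hive> create temporary function row_sequence as 'com.mystuff.UDFRowSequence2';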
UDFs with package names
Thus far, I've been able to create Hive UDFs, but now I need to define them within a Java package name (as opposed to the default Java package as I had been doing), but once I do that, I'm no longer able to load them into Hive. First off, this works: add jar /usr/lib/hive/lib/hive-contrib-0.10.0-cdh4.3.0.jar; create temporary function row_sequence as 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence'; Then I took the source code for UDFRowSequence.java from http://svn.apache.org/repos/asf/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/udf/UDFRowSequence.java and renamed the file and the class inside to UDFRowSequence2.java I compile and deploy it with: javac -cp /usr/lib/hive/lib/hive-exec-0.10.0-cdh4.3.0.jar:/usr/lib/hadoop/hadoop-common.jar UDFRowSequence2.java jar cvf UDFRowSequence2.jar UDFRowSequence2.class sudo cp UDFRowSequence2.jar /usr/local/lib But in Hive, I get the following: hive add jar /usr/local/lib/UDFRowSequence2.jar; Added /usr/local/lib/UDFRowSequence2.jar to class path Added resource: /usr/local/lib/UDFRowSequence2.jar hive create temporary function row_sequence as 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence2'; FAILED: Class org.apache.hadoop.hive.contrib.udf.UDFRowSequence2 not found FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.FunctionTask But if I comment out the package line in UDFRowSequence2.java (to put the UDF into the default Java package), it works: hive add jar /usr/local/lib/UDFRowSequence2.jar; Added /usr/local/lib/UDFRowSequence2.jar to class path Added resource: /usr/local/lib/UDFRowSequence2.jar hive create temporary function row_sequence as 'UDFRowSequence2'; OK Time taken: 0.383 seconds What am I doing wrong? I have a feeling it's something simple.
Re: Best Performance on Large Scale Join
Perhaps you can first create a temp table that contains only the records that will match? See the UNION ALL trick at http://www.mail-archive.com/hive-user@hadoop.apache.org/msg01906.html From: Brad Ruderman bruder...@radiumone.com To: user@hive.apache.org Sent: Monday, July 29, 2013 11:38 AM Subject: Best Performance on Large Scale Join Hi All- I have 2 tables: CREATE TABLE users ( a bigint, b int ) CREATE TABLE products ( a bigint, c int ) Each table has about 8 billion records (roughly 2k files total mappers). I want to know the most performant way to do the following query: SELECT u.b, p.c, count(*) as count FROM users u INNER JOIN products p ON u.a = p.a GROUP BY u.b, p.c Right now the reducing is killing me. Any suggestions on improving performance? Would a mapbucket join be optimal here? Thanks, Brad
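The linked UNION ALL trick is one option; another sketch (not the trick from the link, just a plain pre-filter, using the table and column names from the question) is to shrink one side first so only keys that can match reach the expensive join. Whether this helps depends on how selective the match actually is.

  -- Keep only users whose key appears in products before the big join/aggregation.
  CREATE TABLE users_matched AS
  SELECT u.* FROM users u LEFT SEMI JOIN products p ON (u.a = p.a);

  SELECT u.b, p.c, count(*) AS cnt
  FROM users_matched u INNER JOIN products p ON (u.a = p.a)
  GROUP BY u.b, p.c;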
Re: Oracle to Hive
Untested:

SELECT a.c100, a.c300, b.c400
FROM t1 a
JOIN t2 b ON a.c200 = b.c200
JOIN (SELECT DISTINCT a2.c100
      FROM t1 a2 JOIN t2 b2 ON a2.c200 = b2.c200
      WHERE b2.c400 = SYSDATE - 1) a3 ON a.c100 = a3.c100
WHERE b.c400 = SYSDATE - 1 AND a.c300 = 0

From: Raj Hadoop hadoop...@yahoo.com To: Hive user@hive.apache.org Sent: Wednesday, July 10, 2013 3:30 PM Subject: Oracle to Hive All, Can anyone give me tips on how to convert the following Oracle SQL to a Hive query. SELECT a.c100, a.c300, b.c400 FROM t1 a JOIN t2 b ON a.c200 = b.c200 WHERE a.c100 IN (SELECT DISTINCT a.c100 FROM t1 a JOIN t2 b ON a.c200 = b.c200 WHERE b.c400 = SYSDATE - 1) AND b.c400 = SYSDATE - 1 AND a.c300 = 0 The SYSDATE can be replaced by date_sub(FROM_UNIXTIME(UNIX_TIMESTAMP(),'yyyy-MM-dd'), 1) in Hive. But I wanted to know about the rest of the query. Any pointers or tips so that I can start on my own. Thanks in advance. Regards, Raj
Re: How Can I store the Hive query result in one file ?
I have found that for output larger than a few GB, redirecting stdout results in an incomplete file. For very large output, I do CREATE TABLE MYTABLE AS SELECT ... and then copy the resulting HDFS files directly out of /user/hive/warehouse. From: Bertrand Dechoux decho...@gmail.com To: user@hive.apache.org Sent: Thursday, July 4, 2013 7:09 AM Subject: Re: How Can I store the Hive query result in one file ? The question is what the volume of your output is. There is one file per output task (map or reduce) because that way each task can write independently and in parallel. That's how MapReduce works. And short of forcing the number of tasks to 1, there is no certain way to get one output file. But indeed, if the volume is low enough, you could also capture the standard output into a local file like Nitin described. Bertrand On Thu, Jul 4, 2013 at 12:38 PM, Nitin Pawar nitinpawar...@gmail.com wrote: Will hive -e 'query' > filename or hive -f query.q > filename do? Or do you specifically want it to write into a named file on HDFS only? On Thu, Jul 4, 2013 at 3:12 PM, Matouk IFTISSEN matouk.iftis...@ysance.com wrote: Hello Hive users, Is there a way to store the Hive query result (SELECT * ...) in a specific, single file (given the file name), similar to INSERT OVERWRITE LOCAL DIRECTORY '/directory_path_name/'? Thanks for your answers -- Nitin Pawar -- Bertrand Dechoux
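A sketch of that CTAS-then-copy approach (the warehouse path shown is the default and may differ; hadoop fs -getmerge concatenates the per-task files into one local file):

  hive -e "CREATE TABLE mytable AS SELECT ...;"
  hadoop fs -getmerge /user/hive/warehouse/mytable ./mytable_result.txt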
Re: Fwd: Need urgent help in hive query
Just copy and paste the whole long expressions to their second occurrences. From: dyuti a hadoop.hiv...@gmail.com To: user@hive.apache.org Sent: Friday, June 28, 2013 10:58 AM Subject: Fwd: Need urgent help in hive query Hi Experts, I'm trying with the below SQL query in Hive, which does not support column alias access in subsequent columns as shown below in the query. Is there any other way to rewrite the same without using alias? any of your help are really appreciated. INSERT INTO CAUD ( pst_cc pst_no pst_ber pst_tkn pst_dtm pst_nbr pst_cde pst_dte ) SELECT der.cc der.no der.ber der.tkn der.dtm der.nbr der.cde der.dte FROM (SELECT udp.cc udp.no udp.ber udp.tkn ,CASE WHEN udp.SYSTEM_DTE1600 AND udp.SYSTEM_DTE1 THEN udp.SYSTEM_DTE WHEN udp.DTE_OUT1600 AND udp.DTE_OUT1 THEN udp.DTE_OUT WHEN udp.DTE_IN1600 AND udp.DTE_IN1 THEN udp.DTE_IN ELSE '1231' END AS DTE_OUT ,CASE WHEN udp.TME_OUT 0 THEN udp.TME_OUT WHEN udp.TME_IN 0 THEN udp.TME_IN ELSE 0 END AS TME_OUT ,TRIM(CAST(TME_OUT AS CHAR(6))) AS TME_OUT1 ,CAST(CAST(SUBSTR(TRIM(DTE_OUT),1,8) AS CHAR(8)) AS DATE FORMAT 'mmdd') AS DTE_OUT_O ,CASE WHEN TME_OUT9 THEN CAST(TME_OUT1 AS CHAR(6)) WHEN TME_OUT AND TME_OUT=9 THEN CAST('0'||TME_OUT1 AS CHAR(6)) WHEN TME_OUT999 AND TME_OUT= THEN CAST('00'||TME_OUT1 AS CHAR(6)) WHEN TME_OUT99 AND TME_OUT=999 THEN CAST('000'||TME_OUT1 AS CHAR(6)) WHEN TME_OUT9 AND TME_OUT=99 THEN CAST(''||TME_OUT1 AS CHAR(6)) WHEN TME_OUT0 AND TME_OUT=9 THEN CAST('0'||TME_OUT1 AS CHAR(6)) WHEN TME_OUT=0 THEN '00' END AS TME_OUT2 ,SUBSTR(TRIM(TME_OUT2),1,2)||':'||SUBSTR(TRIM(TME_OUT2),3,2)||':'||SUBSTR(TRIM(TME_OUT2),5,2) AS TME_OUT_O , CAST( DTE_OUT_O||' '||TME_OUT_O AS TIMESTAMP FORMAT 'MMDD:HH: MI:SS') AS DTM ,udp.nbr AS nbr FROM STS_GNCAUDP udp INNER JOIN LOAD_LOG LZ_LL ON udp.LOG_KEY=LZ_LL.LOG_KEY INNER JOIN ESA_LOAD_LOG ESA_LL ON LZ_LL.ESA_LOAD_LOG_KEY=ESA_LL.LOG_KEY AND ESA_LL.PBLSH_IND='$PBLSH_IND' AND ESA_LL.LOAD_END_DTM ='$HIGH_DATE_TIME' AND ESA_LL.SOR_CD= '$CLM_SOR_CD' AND ESA_LL.SUBJ_AREA_NM= '$SUBJ_AREA_NM' AND ESA_LL.WORK_FLOW_NM= '$WORK_FLOW_NM' QUALIFY ROW_NUMBER() OVER (PARTITION BY udp.cc,udp.pst_no, udp.cde,udp.nbr,udp.dte,udp.LOG_KEY ORDER BY DTM DESC)=1) AS der ; Thanks in advance! Dti
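Besides copy-pasting the expressions, another common rewrite (a sketch with illustrative table and column names, not the query above) is to compute the aliased expressions once in an inner SELECT and refer to them by name in the outer one:

  SELECT t.a2, t.a2 + 1 AS a3      -- a2 is now a real column of the subquery
  FROM (SELECT col1 + col2 AS a2 FROM some_table) t;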
Re: INSERT non-static data to array?
I've created https://issues.apache.org/jira/browse/HIVE-4771 to track this issue. - Original Message - From: Michael Malak michaelma...@yahoo.com To: user@hive.apache.org user@hive.apache.org Cc: Sent: Wednesday, June 19, 2013 2:35 PM Subject: Re: INSERT non-static data to array? The example code for inline_table() there has static data. It's not possible to use a subquery inside the inline_table() or array() is it? The SQL1999 way is described here: http://www.postgresql.org/message-id/20041028232152.ga76...@winnie.fuhr.org CREATE TABLE table_a(a int, b int, c int[]); INSERT INTO table_a SELECT a, b, ARRAY(SELECT c FROM table_c WHERE table_c.parent = table_b.id) FROM table_b From: Edward Capriolo edlinuxg...@gmail.com To: user@hive.apache.org user@hive.apache.org; Michael Malak michaelma...@yahoo.com Sent: Wednesday, June 19, 2013 2:06 PM Subject: Re: INSERT non-static data to array? : https://issues.apache.org/jira/browse/HIVE-3238 This might fit the bill. On Wed, Jun 19, 2013 at 3:23 PM, Michael Malak michaelma...@yahoo.com wrote: Is the only way to INSERT data into a column of type array to load data from a pre-existing file, to use hard-coded values in the INSERT statement, or copy an entire array verbatim from another table? I.e. I'm assuming that a) SQL1999 array INSERT via subquery is not (yet) implemented in Hive, and b) there is also no other way to load dynamically generated data into an array column? If my assumption in a) is true, does a Jira item need to be created for it?
Re: INSERT non-static data to array?
My understanding is that LATERAL VIEW goes the other direction: takes an array and makes it into separate rows. I use that a lot. But I also need to go the other way sometimes: take a bunch of rows and squeeze them down into an array. Please correct me if I'm missing something. From: Edward Capriolo edlinuxg...@gmail.com To: user@hive.apache.org user@hive.apache.org; Michael Malak michaelma...@yahoo.com Sent: Thursday, June 20, 2013 9:15 PM Subject: Re: INSERT non-static data to array? i think you could select into as sub query and then use lateral view.not exactly the same but something similar could be done,. On Thursday, June 20, 2013, Michael Malak michaelma...@yahoo.com wrote: I've created https://issues.apache.org/jira/browse/HIVE-4771 to track this issue. - Original Message - From: Michael Malak michaelma...@yahoo.com To: user@hive.apache.org user@hive.apache.org Cc: Sent: Wednesday, June 19, 2013 2:35 PM Subject: Re: INSERT non-static data to array? The example code for inline_table() there has static data. It's not possible to use a subquery inside the inline_table() or array() is it? The SQL1999 way is described here: http://www.postgresql.org/message-id/20041028232152.ga76...@winnie.fuhr.org CREATE TABLE table_a(a int, b int, c int[]); INSERT INTO table_a SELECT a, b, ARRAY(SELECT c FROM table_c WHERE table_c.parent = table_b.id) FROM table_b From: Edward Capriolo edlinuxg...@gmail.com To: user@hive.apache.org user@hive.apache.org; Michael Malak michaelma...@yahoo.com Sent: Wednesday, June 19, 2013 2:06 PM Subject: Re: INSERT non-static data to array? : https://issues.apache.org/jira/browse/HIVE-3238 This might fit the bill. On Wed, Jun 19, 2013 at 3:23 PM, Michael Malak michaelma...@yahoo.com wrote: Is the only way to INSERT data into a column of type array to load data from a pre-existing file, to use hard-coded values in the INSERT statement, or copy an entire array verbatim from another table? I.e. I'm assuming that a) SQL1999 array INSERT via subquery is not (yet) implemented in Hive, and b) there is also no other way to load dynamically generated data into an array column? If my assumption in a) is true, does a Jira item need to be created for it?
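For the rows-to-array direction, one sketch (not what the thread settled on; it assumes a Hive version with the collect_set aggregate, which de-duplicates, and it drops table_b rows with no match, unlike the SQL1999 form):

  INSERT INTO TABLE table_a
  SELECT b.a, b.b, collect_set(c.c)
  FROM table_b b JOIN table_c c ON (c.parent = b.id)
  GROUP BY b.a, b.b;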
[jira] [Created] (HIVE-4771) Support subqueries in INSERT for array types
Michael Malak created HIVE-4771: --- Summary: Support subqueries in INSERT for array types Key: HIVE-4771 URL: https://issues.apache.org/jira/browse/HIVE-4771 Project: Hive Issue Type: Improvement Components: Query Processor Reporter: Michael Malak Since Hive supports SQL1999-style array types for columns, it would be nice for there to be a way to INSERT non-static data into such columns -- i.e. from another table based on a complex query as opposed to loading from a static file, loading from hard-coded values within the INSERT query, or copying complete arrays verbatim from another table. An example can be found at: http://www.postgresql.org/message-id/20041028232152.ga76...@winnie.fuhr.org CREATE TABLE table_a(a int, b int, c int[]); INSERT INTO table_a SELECT a, b, ARRAY(SELECT c FROM table_c WHERE table_c.parent = table_b.id) FROM table_b This should be implemented after regular correlated and uncorrelated subqueries are implemented: https://issues.apache.org/jira/browse/HIVE-784 Support uncorrelated subqueries in the WHERE clause https://issues.apache.org/jira/browse/HIVE-1799 Support correlated subqueries in the WHERE clause -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
Re: INSERT non-static data to array?
The example code for inline_table() there has static data. It's not possible to use a subquery inside the inline_table() or array() is it? The SQL1999 way is described here: http://www.postgresql.org/message-id/20041028232152.ga76...@winnie.fuhr.org CREATE TABLE table_a(a int, b int, c int[]); INSERT INTO table_a SELECT a, b, ARRAY(SELECT c FROM table_c WHERE table_c.parent = table_b.id) FROM table_b From: Edward Capriolo edlinuxg...@gmail.com To: user@hive.apache.org user@hive.apache.org; Michael Malak michaelma...@yahoo.com Sent: Wednesday, June 19, 2013 2:06 PM Subject: Re: INSERT non-static data to array? : https://issues.apache.org/jira/browse/HIVE-3238 This might fit the bill. On Wed, Jun 19, 2013 at 3:23 PM, Michael Malak michaelma...@yahoo.com wrote: Is the only way to INSERT data into a column of type array to load data from a pre-existing file, to use hard-coded values in the INSERT statement, or copy an entire array verbatim from another table? I.e. I'm assuming that a) SQL1999 array INSERT via subquery is not (yet) implemented in Hive, and b) there is also no other way to load dynamically generated data into an array column? If my assumption in a) is true, does a Jira item need to be created for it?
Re: Hive QL - NOT IN, NOT EXIST
--- On Sun, 5/5/13, Peter Chu pete@outlook.com wrote: I am wondering if there is any way to do this without resorting to using left outer join and finding nulls. I have found this to be an acceptable substitute. Is it not working for you?
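For anyone searching the archive, the substitute mentioned above looks like this (illustrative table and column names):

  -- rows of a whose key does not appear in b, i.e. a.key NOT IN (SELECT key FROM b)
  SELECT a.*
  FROM a LEFT OUTER JOIN b ON (a.key = b.key)
  WHERE b.key IS NULL;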
[jira] [Commented] (HIVE-4022) Structs and struct fields cannot be NULL in INSERT statements
[ https://issues.apache.org/jira/browse/HIVE-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13582662#comment-13582662 ] Michael Malak commented on HIVE-4022: - Note that there is a workaround for the case of setting STRUCT fields to NULL, but not for setting the whole STRUCT to a NULL. The following workaround does work: INSERT INT TABLE oc SELECT named_struct('a', cast(null as int), 'b', cast(null as int)) FROM tc; But there is no equivalent workaround to casting the whole STRUCT to NULL, as noted in the first comment of https://issues.apache.org/jira/browse/HIVE-1287 Structs and struct fields cannot be NULL in INSERT statements - Key: HIVE-4022 URL: https://issues.apache.org/jira/browse/HIVE-4022 Project: Hive Issue Type: Bug Components: Serializers/Deserializers Reporter: Michael Malak Originally thought to be Avro-specific, and first noted with respect to HIVE-3528 Avro SerDe doesn't handle serializing Nullable types that require access to a Schema, it turns out even native Hive tables cannot store NULL in a STRUCT field or for the entire STRUCT itself, at least when the NULL is specified directly in the INSERT statement. Again, this affects both Avro-backed tables and native Hive tables. ***For native Hive tables: The following: echo 1,2 twovalues.csv hive CREATE TABLE tc (x INT, y INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; LOAD DATA LOCAL INPATH 'twovalues.csv' INTO TABLE tc; CREATE TABLE oc (z STRUCTa: int, b: int); INSERT INTO TABLE oc SELECT null FROM tc; produces the error FAILED: SemanticException [Error 10044]: Line 1:18 Cannot insert into target table because column number/types are different 'oc': Cannot convert column 0 from void to structa:int,b:int. The following: INSERT INTO TABLE oc SELECT named_struct('a', null, 'b', null) FROM tc; produces the error: FAILED: SemanticException [Error 10044]: Line 1:18 Cannot insert into target table because column number/types are different 'oc': Cannot convert column 0 from structa:void,b:void to structa:int,b:int. ***For Avro: In HIVE-3528, there is in fact a null-struct test case in line 14 of https://github.com/apache/hive/blob/15cc604bf10f4c2502cb88fb8bb3dcd45647cf2c/data/files/csv.txt The test script at https://github.com/apache/hive/blob/12d6f3e7d21f94e8b8490b7c6d291c9f4cac8a4f/ql/src/test/queries/clientpositive/avro_nullable_fields.q does indeed work. But in that test, the query gets all of its data from a test table verbatim: INSERT OVERWRITE TABLE as_avro SELECT * FROM test_serializer; If instead we stick in a hard-coded null for the struct directly into the query, it fails: INSERT OVERWRITE TABLE as_avro SELECT string1, int1, tinyint1, smallint1, bigint1, boolean1, float1, double1, list1, map1, null, enum1, nullableint, bytes1, fixed1 FROM test_serializer; with the following error: FAILED: SemanticException [Error 10044]: Line 1:23 Cannot insert into target table because column number/types are different 'as_avro': Cannot convert column 10 from void to structsint:int,sboolean:boolean,sstring:string. Note, though, that substituting a hard-coded null for string1 (and restoring struct1 into the query) does work: INSERT OVERWRITE TABLE as_avro SELECT null, int1, tinyint1, smallint1, bigint1, boolean1, float1, double1, list1, map1, struct1, enum1, nullableint, bytes1, fixed1 FROM test_serializer; -- This message is automatically generated by JIRA. 
If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
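Written out cleanly, the field-level workaround described in the comment above is (the casts give the NULLs concrete types so they match the struct schema):

  INSERT INTO TABLE oc
  SELECT named_struct('a', cast(null AS int), 'b', cast(null AS int)) FROM tc;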
[jira] [Commented] (HIVE-3528) Avro SerDe doesn't handle serializing Nullable types that require access to a Schema
[ https://issues.apache.org/jira/browse/HIVE-3528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13582664#comment-13582664 ] Michael Malak commented on HIVE-3528: - As noted in the first comment from https://issues.apache.org/jira/browse/HIVE-1287, casting to a STRUCT is not currently supported. However, I did just now try casting individual fields of a STRUCT and that indeed does work. I just now added details to the JIRA that I created last week. https://issues.apache.org/jira/browse/HIVE-4022 Avro SerDe doesn't handle serializing Nullable types that require access to a Schema Key: HIVE-3528 URL: https://issues.apache.org/jira/browse/HIVE-3528 Project: Hive Issue Type: Bug Components: Serializers/Deserializers Reporter: Sean Busbey Assignee: Sean Busbey Labels: avro Fix For: 0.11.0 Attachments: HIVE-3528.1.patch.txt, HIVE-3528.2.patch.txt Deserialization properly handles hiding Nullable Avro types, including complex types like record, map, array, etc. However, when Serialization attempts to write out these types it erroneously makes use of the UNION schema that contains NULL and the other type. This results in Schema mis-match errors for Record, Array, Enum, Fixed, and Bytes. Here's a [review board of unit tests that express the problem|https://reviews.apache.org/r/7431/], as well as one that supports the case that it's only when the schema is needed. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
Re: NULLable STRUCTs
If no one has any objection, I'm going to update HIVE-4022, which I entered a week ago when I thought the behavior was Avro-specific, to indicate it actually affects even native Hive tables. https://issues.apache.org/jira/browse/HIVE-4022 --- On Fri, 2/15/13, Michael Malak michaelma...@yahoo.com wrote: From: Michael Malak michaelma...@yahoo.com Subject: NULLable STRUCTs To: user@hive.apache.org Date: Friday, February 15, 2013, 5:03 PM It seems that all Hive columns (at least those of primitive types) are always NULLable? What about columns of type STRUCT? The following: echo 1,2 twovalues.csv hive CREATE TABLE tc (x INT, y INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; LOAD DATA LOCAL INPATH 'twovalues.csv' INTO TABLE tc; CREATE TABLE oc (z STRUCTa: int, b: int); INSERT INTO TABLE oc SELECT null FROM tc; produces the error FAILED: SemanticException [Error 10044]: Line 1:18 Cannot insert into target table because column number/types are different 'oc': Cannot convert column 0 from void to structa:int,b:int. I initially discovered such behavior with Avro-backed tables, and even entered a JIRA https://issues.apache.org/jira/browse/HIVE-4022 but now I realized it happens with CSV-backed tables as well. Perhaps related, perhaps not, it seems that all members of a STRUCT are always non-NULLable. The following: INSERT INTO TABLE oc SELECT named_struct('a', null, 'b', null) FROM tc; produces the error: FAILED: SemanticException [Error 10044]: Line 1:18 Cannot insert into target table because column number/types are different 'oc': Cannot convert column 0 from structa:void,b:void to structa:int,b:int.
[jira] [Updated] (HIVE-4022) Structs and struct fields cannot be NULL in INSERT statements
[ https://issues.apache.org/jira/browse/HIVE-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Malak updated HIVE-4022: Description: Originally thought to be Avro-specific, and first noted with respect to HIVE-3528 Avro SerDe doesn't handle serializing Nullable types that require access to a Schema, it turns out even native Hive tables cannot store NULL in a STRUCT field or for the entire STRUCT itself, at least when the NULL is specified directly in the INSERT statement. Again, this affects both Avro-backed tables and native Hive tables. ***For native Hive tables: The following: echo 1,2 twovalues.csv hive CREATE TABLE tc (x INT, y INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; LOAD DATA LOCAL INPATH 'twovalues.csv' INTO TABLE tc; CREATE TABLE oc (z STRUCTa: int, b: int); INSERT INTO TABLE oc SELECT null FROM tc; produces the error FAILED: SemanticException [Error 10044]: Line 1:18 Cannot insert into target table because column number/types are different 'oc': Cannot convert column 0 from void to structa:int,b:int. The following: INSERT INTO TABLE oc SELECT named_struct('a', null, 'b', null) FROM tc; produces the error: FAILED: SemanticException [Error 10044]: Line 1:18 Cannot insert into target table because column number/types are different 'oc': Cannot convert column 0 from structa:void,b:void to structa:int,b:int. ***For Avro: In HIVE-3528, there is in fact a null-struct test case in line 14 of https://github.com/apache/hive/blob/15cc604bf10f4c2502cb88fb8bb3dcd45647cf2c/data/files/csv.txt The test script at https://github.com/apache/hive/blob/12d6f3e7d21f94e8b8490b7c6d291c9f4cac8a4f/ql/src/test/queries/clientpositive/avro_nullable_fields.q does indeed work. But in that test, the query gets all of its data from a test table verbatim: INSERT OVERWRITE TABLE as_avro SELECT * FROM test_serializer; If instead we stick in a hard-coded null for the struct directly into the query, it fails: INSERT OVERWRITE TABLE as_avro SELECT string1, int1, tinyint1, smallint1, bigint1, boolean1, float1, double1, list1, map1, null, enum1, nullableint, bytes1, fixed1 FROM test_serializer; with the following error: FAILED: SemanticException [Error 10044]: Line 1:23 Cannot insert into target table because column number/types are different 'as_avro': Cannot convert column 10 from void to structsint:int,sboolean:boolean,sstring:string. Note, though, that substituting a hard-coded null for string1 (and restoring struct1 into the query) does work: INSERT OVERWRITE TABLE as_avro SELECT null, int1, tinyint1, smallint1, bigint1, boolean1, float1, double1, list1, map1, struct1, enum1, nullableint, bytes1, fixed1 FROM test_serializer; was: Related to HIVE-3528, There is in fact a null-struct test case in line 14 of https://github.com/apache/hive/blob/15cc604bf10f4c2502cb88fb8bb3dcd45647cf2c/data/files/csv.txt The test script at https://github.com/apache/hive/blob/12d6f3e7d21f94e8b8490b7c6d291c9f4cac8a4f/ql/src/test/queries/clientpositive/avro_nullable_fields.q does indeed work. 
But in that test, the query gets all of its data from a test table verbatim: INSERT OVERWRITE TABLE as_avro SELECT * FROM test_serializer; If instead we stick in a hard-coded null for the struct directly into the query, it fails: INSERT OVERWRITE TABLE as_avro SELECT string1, int1, tinyint1, smallint1, bigint1, boolean1, float1, double1, list1, map1, null, enum1, nullableint, bytes1, fixed1 FROM test_serializer; with the following error: FAILED: SemanticException [Error 10044]: Line 1:23 Cannot insert into target table because column number/types are different 'as_avro': Cannot convert column 10 from void to structsint:int,sboolean:boolean,sstring:string. Note, though, that substituting a hard-coded null for string1 (and restoring struct1 into the query) does work: INSERT OVERWRITE TABLE as_avro SELECT null, int1, tinyint1, smallint1, bigint1, boolean1, float1, double1, list1, map1, struct1, enum1, nullableint, bytes1, fixed1 FROM test_serializer; Summary: Structs and struct fields cannot be NULL in INSERT statements (was: Avro SerDe queries don't handle hard-coded nulls for optional/nullable structs) Structs and struct fields cannot be NULL in INSERT statements - Key: HIVE-4022 URL: https://issues.apache.org/jira/browse/HIVE-4022 Project: Hive Issue Type: Bug Components: Serializers/Deserializers Reporter: Michael Malak Originally thought to be Avro-specific, and first noted with respect to HIVE-3528 Avro SerDe doesn't handle serializing Nullable types that require access to a Schema, it turns out even native Hive tables cannot store NULL in a STRUCT field or for the entire STRUCT itself, at least when the NULL is specified directly in the INSERT
NULLable STRUCTs
It seems that all Hive columns (at least those of primitive types) are always NULLable? What about columns of type STRUCT? The following:

echo 1,2 > twovalues.csv

hive> CREATE TABLE tc (x INT, y INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
hive> LOAD DATA LOCAL INPATH 'twovalues.csv' INTO TABLE tc;
hive> CREATE TABLE oc (z STRUCT<a: int, b: int>);
hive> INSERT INTO TABLE oc SELECT null FROM tc;

produces the error:

FAILED: SemanticException [Error 10044]: Line 1:18 Cannot insert into target table because column number/types are different 'oc': Cannot convert column 0 from void to struct<a:int,b:int>.

I initially discovered such behavior with Avro-backed tables, and even entered a JIRA https://issues.apache.org/jira/browse/HIVE-4022 but now I realize it happens with CSV-backed tables as well. Perhaps related, perhaps not, it seems that all members of a STRUCT are always non-NULLable. The following:

INSERT INTO TABLE oc SELECT named_struct('a', null, 'b', null) FROM tc;

produces the error:

FAILED: SemanticException [Error 10044]: Line 1:18 Cannot insert into target table because column number/types are different 'oc': Cannot convert column 0 from struct<a:void,b:void> to struct<a:int,b:int>.
[jira] [Commented] (HIVE-3528) Avro SerDe doesn't handle serializing Nullable types that require access to a Schema
[ https://issues.apache.org/jira/browse/HIVE-3528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13578523#comment-13578523 ] Michael Malak commented on HIVE-3528: - I've tried the latest Avro SerDe from GitHub, and it allows me to write NULLs with a simple schema, but not with anything involving STRUCTs. If a STRUCT contains NULLable fields, or the entire STRUCT is NULLable (optional), Hive throws exceptions. Should I create new JIRA item(s)? Avro SerDe doesn't handle serializing Nullable types that require access to a Schema Key: HIVE-3528 URL: https://issues.apache.org/jira/browse/HIVE-3528 Project: Hive Issue Type: Bug Components: Serializers/Deserializers Reporter: Sean Busbey Assignee: Sean Busbey Labels: avro Fix For: 0.11.0 Attachments: HIVE-3528.1.patch.txt, HIVE-3528.2.patch.txt Deserialization properly handles hiding Nullable Avro types, including complex types like record, map, array, etc. However, when Serialization attempts to write out these types it erroneously makes use of the UNION schema that contains NULL and the other type. This results in Schema mis-match errors for Record, Array, Enum, Fixed, and Bytes. Here's a [review board of unit tests that express the problem|https://reviews.apache.org/r/7431/], as well as one that supports the case that it's only when the schema is needed. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (HIVE-3528) Avro SerDe doesn't handle serializing Nullable types that require access to a Schema
[ https://issues.apache.org/jira/browse/HIVE-3528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13578538#comment-13578538 ] Michael Malak commented on HIVE-3528: - Sean: I mean https://github.com/apache/hive/tree/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro which has: AvroSerializer.java 20 days ago HIVE-3528 : Avro SerDe doesn't handle serializing Nullable types that Avro SerDe doesn't handle serializing Nullable types that require access to a Schema Key: HIVE-3528 URL: https://issues.apache.org/jira/browse/HIVE-3528 Project: Hive Issue Type: Bug Components: Serializers/Deserializers Reporter: Sean Busbey Assignee: Sean Busbey Labels: avro Fix For: 0.11.0 Attachments: HIVE-3528.1.patch.txt, HIVE-3528.2.patch.txt Deserialization properly handles hiding Nullable Avro types, including complex types like record, map, array, etc. However, when Serialization attempts to write out these types it erroneously makes use of the UNION schema that contains NULL and the other type. This results in Schema mis-match errors for Record, Array, Enum, Fixed, and Bytes. Here's a [review board of unit tests that express the problem|https://reviews.apache.org/r/7431/], as well as one that supports the case that it's only when the schema is needed. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (HIVE-3528) Avro SerDe doesn't handle serializing Nullable types that require access to a Schema
[ https://issues.apache.org/jira/browse/HIVE-3528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13578783#comment-13578783 ] Michael Malak commented on HIVE-3528: - Sean: OK, I've researched the problem further. There is in fact a null-struct test case in line 14 of https://github.com/apache/hive/blob/15cc604bf10f4c2502cb88fb8bb3dcd45647cf2c/data/files/csv.txt The test script at https://github.com/apache/hive/blob/12d6f3e7d21f94e8b8490b7c6d291c9f4cac8a4f/ql/src/test/queries/clientpositive/avro_nullable_fields.q does indeed work when I tested it locally. But in that test, the query gets all of its data from a test table verbatim: INSERT OVERWRITE TABLE as_avro SELECT * FROM test_serializer; If instead we stick in a hard-coded null for the struct directly into the query, it fails: INSERT OVERWRITE TABLE as_avro SELECT string1, int1, tinyint1, smallint1, bigint1, boolean1, float1, double1, list1, map1, null, enum1, nullableint, bytes1, fixed1 FROM test_serializer; with the following error: FAILED: SemanticException [Error 10044]: Line 1:23 Cannot insert into target table because column number/types are different 'as_avro': Cannot convert column 10 from void to structsint:int,sboolean:boolean,sstring:string. Note, though, that substituting a hard-coded null for string1 (and restoring struct1 to the query) does work: INSERT OVERWRITE TABLE as_avro SELECT null, int1, tinyint1, smallint1, bigint1, boolean1, float1, double1, list1, map1, struct1, enum1, nullableint, bytes1, fixed1 FROM test_serializer; I will be entering an all-new JIRA for this. Avro SerDe doesn't handle serializing Nullable types that require access to a Schema Key: HIVE-3528 URL: https://issues.apache.org/jira/browse/HIVE-3528 Project: Hive Issue Type: Bug Components: Serializers/Deserializers Reporter: Sean Busbey Assignee: Sean Busbey Labels: avro Fix For: 0.11.0 Attachments: HIVE-3528.1.patch.txt, HIVE-3528.2.patch.txt Deserialization properly handles hiding Nullable Avro types, including complex types like record, map, array, etc. However, when Serialization attempts to write out these types it erroneously makes use of the UNION schema that contains NULL and the other type. This results in Schema mis-match errors for Record, Array, Enum, Fixed, and Bytes. Here's a [review board of unit tests that express the problem|https://reviews.apache.org/r/7431/], as well as one that supports the case that it's only when the schema is needed. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Created] (HIVE-4022) Avro SerDe queries don't handle hard-coded nulls for optional/nullable structs
Michael Malak created HIVE-4022: --- Summary: Avro SerDe queries don't handle hard-coded nulls for optional/nullable structs Key: HIVE-4022 URL: https://issues.apache.org/jira/browse/HIVE-4022 Project: Hive Issue Type: Bug Components: Serializers/Deserializers Reporter: Michael Malak Related to HIVE-3528, There is in fact a null-struct test case in line 14 of https://github.com/apache/hive/blob/15cc604bf10f4c2502cb88fb8bb3dcd45647cf2c/data/files/csv.txt The test script at https://github.com/apache/hive/blob/12d6f3e7d21f94e8b8490b7c6d291c9f4cac8a4f/ql/src/test/queries/clientpositive/avro_nullable_fields.q does indeed work. But in that test, the query gets all of its data from a test table verbatim: INSERT OVERWRITE TABLE as_avro SELECT * FROM test_serializer; If instead we stick in a hard-coded null for the struct directly into the query, it fails: INSERT OVERWRITE TABLE as_avro SELECT string1, int1, tinyint1, smallint1, bigint1, boolean1, float1, double1, list1, map1, null, enum1, nullableint, bytes1, fixed1 FROM test_serializer; with the following error: FAILED: SemanticException [Error 10044]: Line 1:23 Cannot insert into target table because column number/types are different 'as_avro': Cannot convert column 10 from void to structsint:int,sboolean:boolean,sstring:string. Note, though, that substituting a hard-coded null for string1 (and restoring struct1 into the query) does work: INSERT OVERWRITE TABLE as_avro SELECT null, int1, tinyint1, smallint1, bigint1, boolean1, float1, double1, list1, map1, struct1, enum1, nullableint, bytes1, fixed1 FROM test_serializer; -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
Re: INSERT INTO table with STRUCT, SELECT FROM
Figured it out from https://cwiki.apache.org/Hive/languagemanual-udf.html#LanguageManualUDF-ComplexTypeConstructors It should be:

INSERT INTO TABLE oc SELECT named_struct('a', x, 'b', y) FROM tc;

--- On Wed, 2/13/13, Dean Wampler dean.wamp...@thinkbiganalytics.com wrote: From: Dean Wampler dean.wamp...@thinkbiganalytics.com Subject: Re: INSERT INTO table with STRUCT, SELECT FROM To: user@hive.apache.org Date: Wednesday, February 13, 2013, 12:47 PM Hmm. I tried the following hacks, but none of them would parse. Ideas? I changed ... select struct(x,y) ... to:

... select struct(x,y) as struct<a:int,b:int> ...
... select cast(struct(x,y) as struct<a:int,b:int>) ...
... select struct(x as a, y as b) ...

Okay, but there is a hack that does work: bypass INSERT INTO and just write to the directory:

INSERT OVERWRITE DIRECTORY '/path/to/table/directory' SELECT ...;

Just be careful it doesn't clobber any files already there. I'm paranoid, so I would write to a different directory and then move the files over... dean

On Wed, Feb 13, 2013 at 1:26 PM, Michael Malak michaelma...@yahoo.com wrote: Is it possible to INSERT INTO TABLE t SELECT ... FROM ... where t has a column with a STRUCT? Based on http://grokbase.com/t/hive/user/109r87hh3e/insert-data-into-a-column-of-complex-type I thought perhaps the following would work:

echo 1,2 > twovalues.csv

hive> CREATE TABLE tc (x INT, y INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
hive> LOAD DATA LOCAL INPATH 'twovalues.csv' INTO TABLE tc;
hive> CREATE TABLE oc (z STRUCT<a: int, b: int>);
hive> INSERT INTO TABLE oc SELECT struct(x,y) FROM tc;

but when I do the above I get: FAILED: SemanticException [Error 10044]: Line 1:18 Cannot insert into target table because column number/types are different 'oc': Cannot convert column 0 from struct<col1:int,col2:int> to struct<a:int,b:int>. -- Dean Wampler, Ph.D. thinkbiganalytics.com +1-312-339-1330
[jira] [Commented] (AVRO-1035) Add the possibility to append to existing avro files
[ https://issues.apache.org/jira/browse/AVRO-1035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13573711#comment-13573711 ] Michael Malak commented on AVRO-1035: - ha...@cloudera.com has provided example code on how to accomplish HDFS Avro append at https://gist.github.com/QwertyManiac/4724582 Add the possibility to append to existing avro files -- Key: AVRO-1035 URL: https://issues.apache.org/jira/browse/AVRO-1035 Project: Avro Issue Type: New Feature Reporter: Vyacheslav Zholudev Currently it is not possible to append to avro files that were written and closed. Here is a Scott Carey's reply on the mailing list: {quote} It is not possible without modifying DataFileWriter. Please open a JIRA ticket. It could not simply append to an OutputStream, since it must either: * Seek to the start to validate the schemas match and find the sync marker, or * Trust that the schemas match and find the sync marker from the last block DataFileWriter cannot refer to Hadoop classes such as FileSystem, but we could add something to the mapred module that takes a Path and FileSystem and returns something that implemements an interface that DataFileWriter can append to. This would be something that is both a http://avro.apache.org/docs/1.6.2/api/java/org/apache/avro/file/SeekableInp ut.html and an OutputStream, or has both an InputStream from the start of the existing file and an OutputStream at the end. {quote} -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
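A sketch of that approach (my own summary of the referenced gist, not a verbatim copy; it assumes an HDFS build on which FileSystem.append() works and an Avro release that ships org.apache.avro.mapred.FsInput):

  import org.apache.avro.file.DataFileWriter;
  import org.apache.avro.generic.GenericDatumWriter;
  import org.apache.avro.generic.GenericRecord;
  import org.apache.avro.mapred.FsInput;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class AvroHdfsAppend {
    public static void append(Path avroPath, GenericRecord record, Configuration conf)
        throws Exception {
      FileSystem fs = FileSystem.get(conf);
      // SeekableInput over the existing file: used to read its schema and sync marker
      FsInput existing = new FsInput(avroPath, conf);
      // OutputStream positioned at the end of the existing file
      FSDataOutputStream out = fs.append(avroPath);
      DataFileWriter<GenericRecord> writer =
          new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
      writer.appendTo(existing, out);   // validates schema, positions after last block
      writer.append(record);
      writer.close();                   // flushes and closes the underlying stream
    }
  }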
Re: Is it possible to append to an already existing avro file
I confess to being a user of rather than a developer of open source, but perhaps you could elaborate on what depends on means and what the consequences are? Isn't it -- or couldn't it be made -- a run-time binding, so that only those who try to use the HDFS append functionality would be required to also include the HDFS Jars in their classpath? Or is the issue more of a bookkeeping one, whereby every update to HDFS will require an Avro regression test? Now that Hive supports Avro as of the Jan. 11 release of Hive 0.10, the use case of ingesting data into Avro on HDFS is only going to get more popular, and appending is very handy for ingesting, especially for live real-time or near-real-time data. So it seems to me that if the inconveniences are minor or can be worked around, that Avro indeed should perhaps depend on HDFS. --- On Thu, 2/7/13, Harsh J ha...@cloudera.com wrote: From: Harsh J ha...@cloudera.com Subject: Re: Is it possible to append to an already existing avro file To: user@avro.apache.org Date: Thursday, February 7, 2013, 9:28 AM I assume by non-trivial you meant the extra Seekable stuff I needed to wrap around the DFS output streams to let Avro take it as append-able? I don't think its possible for Avro to carry it since Avro (core) does not reverse-depend on Hadoop. Should we document it somewhere though? Do you have any ideas on the best place to do that? On Thu, Feb 7, 2013 at 6:12 AM, Michael Malak michaelma...@yahoo.com wrote: Thanks so much for the code -- it works great! Since it is a non-trivial amount of code required to achieve append, I suggest attaching that code to AVRO-1035, in the hopes that someone will come up with an interface that requires just one line of user code to achieve append. --- On Wed, 2/6/13, Harsh J ha...@cloudera.com wrote: From: Harsh J ha...@cloudera.com Subject: Re: Is it possible to append to an already existing avro file To: user@avro.apache.org Date: Wednesday, February 6, 2013, 11:17 AM Hey Michael, It does implement the regular Java OutputStream interface, as seen in the API: http://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FSDataOutputStream.html. Here's a sample program that works on Hadoop 2.x in my tests: https://gist.github.com/QwertyManiac/4724582
Re: Is it possible to append to an already existing avro file
Thanks so much for the code -- it works great! Since it is a non-trivial amount of code required to achieve append, I suggest attaching that code to AVRO-1035, in the hopes that someone will come up with an interface that requires just one line of user code to achieve append. --- On Wed, 2/6/13, Harsh J ha...@cloudera.com wrote: From: Harsh J ha...@cloudera.com Subject: Re: Is it possible to append to an already existing avro file To: user@avro.apache.org Date: Wednesday, February 6, 2013, 11:17 AM Hey Michael, It does implement the regular Java OutputStream interface, as seen in the API: http://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FSDataOutputStream.html. Here's a sample program that works on Hadoop 2.x in my tests: https://gist.github.com/QwertyManiac/4724582 On Wed, Feb 6, 2013 at 9:00 AM, Michael Malak michaelma...@yahoo.com wrote: I don't believe a Hadoop FileSystem is a Java OutputStream? --- On Tue, 2/5/13, Doug Cutting cutt...@apache.org wrote: From: Doug Cutting cutt...@apache.org Subject: Re: Is it possible to append to an already existing avro file To: user@avro.apache.org Date: Tuesday, February 5, 2013, 5:27 PM It will work on an OutputStream that supports append. http://avro.apache.org/docs/current/api/java/org/apache/avro/file/DataFileWriter.html#appendTo(org.apache.avro.file.SeekableInput, java.io.OutputStream) So it depends on how well HDFS implements FileSystem#append(), not on any changes in Avro. http://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#append(org.apache.hadoop.fs.Path) I have no recent personal experience with append in HDFS. Does anyone else here? Doug On Tue, Feb 5, 2013 at 4:10 PM, Michael Malak michaelma...@yahoo.com wrote: My understanding is that will append to a file on the local filesystem, but not to a file on HDFS. --- On Tue, 2/5/13, Doug Cutting cutt...@apache.org wrote: From: Doug Cutting cutt...@apache.org Subject: Re: Is it possible to append to an already existing avro file To: user@avro.apache.org Date: Tuesday, February 5, 2013, 5:08 PM The Jira is: https://issues.apache.org/jira/browse/AVRO-1035 It is possible to append to an existing Avro file: http://avro.apache.org/docs/current/api/java/org/apache/avro/file/DataFileWriter.html#appendTo(java.io.File) Should we close that issue as fixed? Doug On Fri, Feb 1, 2013 at 11:32 AM, Michael Malak michaelma...@yahoo.com wrote: Was a JIRA ticket ever created regarding appending to an existing Avro file on HDFS? What is the status of such a capability, a year out from when the issue below was raised? On Wed, 22 Feb 2012 10:57:48 +0100, Vyacheslav Zholudev vyacheslav.zholu...@gmail.com wrote: Thanks for your reply, I suspected this. I will create a JIRA ticket. Vyacheslav On Feb 21, 2012, at 6:02 PM, Scott Carey wrote: On 2/21/12 7:29 AM, Vyacheslav Zholudev vyacheslav.zholu...@gmail.com wrote: Yep, I saw that method as well as the stackoverflow post. However, I'm interested how to append to a file on the arbitrary file system, not only on the local one. I want to get an OutputStream based on the Path and the FileSystem implementation and then pass it for appending to avro methods. Is that possible? It is not possible without modifying DataFileWriter. Please open a JIRA ticket. 
It could not simply append to an OutputStream, since it must either: * Seek to the start to validate the schemas match and find the sync marker, or * Trust that the schemas match and find the sync marker from the last block DataFileWriter cannot refer to Hadoop classes such as FileSystem, but we could add something to the mapred module that takes a Path and FileSystem and returns something that implemements an interface that DataFileWriter can append to. This would be something that is both a http://avro.apache.org/docs/1.6.2/api/java/org/apache/avro/file/SeekableInput.html and an OutputStream, or has both an InputStream from the start of the existing file and an OutputStream at the end. Thanks, Vyacheslav On Feb 21, 2012, at 5:29 AM, Harsh J wrote: Hi, Use the appendTo feature of the DataFileWriter. See http://avro.apache.org/docs/1.6.2/api/java/org/apache/avro/file/DataFileWriter.html#appendTo(java.io.File) For a quick setup example, read also: http://stackoverflow.com/questions/8806689/can-you-append-data-to-an-existing-avro-data-file On Tue, Feb 21, 2012 at 3:15 AM, Vyacheslav Zholudev vyacheslav.zholu...@gmail.com wrote: Hi, is it possible to append
Re: Is it possible to append to an already existing avro file
I don't believe a Hadoop FileSystem is a Java OutputStream? --- On Tue, 2/5/13, Doug Cutting cutt...@apache.org wrote: From: Doug Cutting cutt...@apache.org Subject: Re: Is it possible to append to an already existing avro file To: user@avro.apache.org Date: Tuesday, February 5, 2013, 5:27 PM It will work on an OutputStream that supports append. http://avro.apache.org/docs/current/api/java/org/apache/avro/file/DataFileWriter.html#appendTo(org.apache.avro.file.SeekableInput, java.io.OutputStream) So it depends on how well HDFS implements FileSystem#append(), not on any changes in Avro. http://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#append(org.apache.hadoop.fs.Path) I have no recent personal experience with append in HDFS. Does anyone else here? Doug On Tue, Feb 5, 2013 at 4:10 PM, Michael Malak michaelma...@yahoo.com wrote: My understanding is that will append to a file on the local filesystem, but not to a file on HDFS. --- On Tue, 2/5/13, Doug Cutting cutt...@apache.org wrote: From: Doug Cutting cutt...@apache.org Subject: Re: Is it possible to append to an already existing avro file To: user@avro.apache.org Date: Tuesday, February 5, 2013, 5:08 PM The Jira is: https://issues.apache.org/jira/browse/AVRO-1035 It is possible to append to an existing Avro file: http://avro.apache.org/docs/current/api/java/org/apache/avro/file/DataFileWriter.html#appendTo(java.io.File) Should we close that issue as fixed? Doug On Fri, Feb 1, 2013 at 11:32 AM, Michael Malak michaelma...@yahoo.com wrote: Was a JIRA ticket ever created regarding appending to an existing Avro file on HDFS? What is the status of such a capability, a year out from when the issue below was raised? On Wed, 22 Feb 2012 10:57:48 +0100, Vyacheslav Zholudev vyacheslav.zholu...@gmail.com wrote: Thanks for your reply, I suspected this. I will create a JIRA ticket. Vyacheslav On Feb 21, 2012, at 6:02 PM, Scott Carey wrote: On 2/21/12 7:29 AM, Vyacheslav Zholudev vyacheslav.zholu...@gmail.com wrote: Yep, I saw that method as well as the stackoverflow post. However, I'm interested how to append to a file on the arbitrary file system, not only on the local one. I want to get an OutputStream based on the Path and the FileSystem implementation and then pass it for appending to avro methods. Is that possible? It is not possible without modifying DataFileWriter. Please open a JIRA ticket. It could not simply append to an OutputStream, since it must either: * Seek to the start to validate the schemas match and find the sync marker, or * Trust that the schemas match and find the sync marker from the last block DataFileWriter cannot refer to Hadoop classes such as FileSystem, but we could add something to the mapred module that takes a Path and FileSystem and returns something that implemements an interface that DataFileWriter can append to. This would be something that is both a http://avro.apache.org/docs/1.6.2/api/java/org/apache/avro/file/SeekableInput.html and an OutputStream, or has both an InputStream from the start of the existing file and an OutputStream at the end. Thanks, Vyacheslav On Feb 21, 2012, at 5:29 AM, Harsh J wrote: Hi, Use the appendTo feature of the DataFileWriter. 
See http://avro.apache.org/docs/1.6.2/api/java/org/apache/avro/file/DataFileWriter.html#appendTo(java.io.File) For a quick setup example, read also: http://stackoverflow.com/questions/8806689/can-you-append-data-to-an-existing-avro-data-file On Tue, Feb 21, 2012 at 3:15 AM, Vyacheslav Zholudev vyacheslav.zholu...@gmail.com wrote: Hi, is it possible to append to an already existing avro file when it was written and closed before? If I use outputStream = fs.append(avroFilePath); then later on I get: java.io.IOException: Invalid sync! Probably because the schema is written twice and some other issues. If I use outputStream = fs.create(avroFilePath); then the avro file gets overwritten. Thanks, Vyacheslav -- Harsh J Customer Ops. Engineer Cloudera | http://tiny.cloudera.com/about On Fri, Feb 1, 2013 at 11:32 AM, Michael Malak michaelma...@yahoo.com wrote: Was a JIRA ticket ever created regarding appending to an existing Avro file on HDFS? What is the status of such a capability, a year out from when the issue below was raised? On Wed, 22 Feb 2012 10:57:48 +0100, Vyacheslav Zholudev vyacheslav.zholu...@gmail.com wrote: Thanks for your reply, I suspected this. I will create a JIRA ticket. Vyacheslav On Feb 21, 2012, at 6:02 PM, Scott Carey wrote: On 2/21/12 7:29 AM, Vyacheslav
Hard-coded inline relations
I'm new to Pig, and it looks like there is no provision to declare relations inline in a Pig script (without LOADing from an external file)? Based on http://pig.apache.org/docs/r0.7.0/piglatin_ref2.html#Constants I would have thought the following would constitute Hello World for Pig: A = {('Hello'),('World')}; DUMP A; But I get a syntax error. The ability to inline relations would be useful for debugging. Is this limitation by design, or is it just not implemented yet?
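In the meantime, one workaround sketch (file name illustrative) is to materialize the constants in a tiny file and LOAD it:

  printf 'Hello\nWorld\n' > hello.txt

Then, in the grunt shell:

  grunt> A = LOAD 'hello.txt' AS (word:chararray);
  grunt> DUMP A;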