Re: Minimum cost flow problem solving in Spark

2017-09-13 Thread Michael Malak
You might be interested in "Maximum Flow implementation on Spark GraphX" done 
by a Colorado School of Mines grad student a couple of years ago.


http://datascienceassn.org/2016-01-27-maximum-flow-implementation-spark-graphx
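
For anyone who wants to experiment before adopting a full implementation, here is a minimal sketch of how a flow network could be modelled as a GraphX property graph; the (capacity, cost) edge-attribute layout and the toy vertices are assumptions for illustration, not something from the linked implementation.

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph}

// Edge attribute = (capacity, costPerUnit); vertex attribute = a label.
def flowNetwork(sc: SparkContext): Graph[String, (Double, Double)] = {
  val vertices = sc.parallelize(Seq((1L, "source"), (2L, "a"), (3L, "sink")))
  val edges = sc.parallelize(Seq(
    Edge(1L, 2L, (10.0, 1.0)),   // capacity 10, cost 1 per unit of flow
    Edge(2L, 3L, (5.0, 2.0))))   // capacity 5, cost 2 per unit of flow
  Graph(vertices, edges)
}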


From: Swapnil Shinde 
To: user@spark.apache.org; d...@spark.apache.org 
Sent: Wednesday, September 13, 2017 9:41 AM
Subject: Minimum cost flow problem solving in Spark



Hello
Has anyone used Spark to solve minimum cost flow problems? I am quite new to combinatorial optimization algorithms, so any help, suggestions, or pointers to libraries would be much appreciated.

Thanks
Swapnil

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Shortest path with directed and weighted graphs

2016-10-24 Thread Michael Malak
Chapter 6 of my book implements Dijkstra's Algorithm. The source code is 
available to download for free. 
https://www.manning.com/books/spark-graphx-in-action



  From: Brian Wilson 
 To: user@spark.apache.org 
 Sent: Monday, October 24, 2016 7:11 AM
 Subject: Shortest path with directed and weighted graphs
   
I have been looking at the ShortestPaths function built into Spark here.
Am I correct in saying there is no support for weighted graphs with this function? By that I mean that it assumes all edges carry a weight of 1.
Many thanks
Brian 
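
For reference, a hedged sketch of weighted single-source shortest paths using GraphX's Pregel API (the built-in ShortestPaths does indeed count every hop as distance 1); this is the standard Pregel SSSP pattern, not the book's Dijkstra code, and the Double edge attribute and sourceId parameter are assumptions:

import org.apache.spark.graphx.{Graph, VertexId}

def weightedSssp[VD](graph: Graph[VD, Double], sourceId: VertexId): Graph[Double, Double] = {
  // Start every vertex at infinity except the source.
  val init = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity)

  init.pregel(Double.PositiveInfinity)(
    (_, dist, newDist) => math.min(dist, newDist),          // keep the shorter distance
    triplet =>                                              // propagate improvements only
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      else Iterator.empty,
    (a, b) => math.min(a, b))                               // merge concurrent messages
}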

   

Re: GraphX drawing algorithm

2016-09-11 Thread Michael Malak
In chapter 10 of Spark GraphX In Action, we describe how to use Zeppelin with 
d3.js to render graphs using d3's force-directed rendering algorithm. The 
source code can be downloaded for free from 
https://www.manning.com/books/spark-graphx-in-action
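
As an aside, a minimal sketch of getting a (small) GraphX graph into the {"nodes": [...], "links": [...]} shape that d3's force layout expects; the String vertex attribute and Int edge attribute are assumptions, and the JSON is built naively by hand:

import org.apache.spark.graphx.Graph

def toD3Json(g: Graph[String, Int]): String = {
  val verts = g.vertices.collect()                    // assumes the graph fits on the driver
  val index = verts.map(_._1).zipWithIndex.toMap      // d3 links reference node array indices
  val nodes = verts.map { case (_, name) => s"""{"name":"$name"}""" }.mkString(",")
  val links = g.edges.collect().map { e =>
    s"""{"source":${index(e.srcId)},"target":${index(e.dstId)},"value":${e.attr}}"""
  }.mkString(",")
  s"""{"nodes":[$nodes],"links":[$links]}"""
}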
  From: agc studio 
 To: user@spark.apache.org 
 Sent: Sunday, September 11, 2016 5:59 PM
 Subject: GraphX drawing algorithm
   
Hi all,
I was wondering if a force-directed graph drawing algorithm has been 
implemented for graphX?

Thanks

   

Re: Where is DataFrame.scala in 2.0?

2016-06-03 Thread Michael Malak
It's been reduced to a single line of code.
http://technicaltidbit.blogspot.com/2016/03/dataframedataset-swap-places-in-spark-20.html
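
For anyone wondering what that single line is: in Spark 2.0, DataFrame became a type alias declared in the org.apache.spark.sql package object, roughly:

package org.apache.spark

package object sql {
  // A DataFrame is now simply a Dataset of Rows.
  type DataFrame = Dataset[Row]
}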




  From: Gerhard Fiedler 
 To: "dev@spark.apache.org"  
 Sent: Friday, June 3, 2016 9:01 AM
 Subject: Where is DataFrame.scala in 2.0?
   
 When I look at the sources on GitHub, I see DataFrame.scala at
https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
in the 1.6 branch. But when I change the branch to branch-2.0 or master, I get a 404 error. I also can’t find the file in the directory listings, for example
https://github.com/apache/spark/tree/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql
(for branch-2.0).

It seems that quite a few APIs use the DataFrame class, even in 2.0. Can someone please point me to its location, or otherwise explain why it is not there?

Thanks, Gerhard

  

Re: GraphX Java API

2016-05-30 Thread Michael Malak
Yes, it is possible to use GraphX from Java but it requires 10x the amount of 
code and involves using obscure typing and pre-defined lambda prototype 
facilities. I give an example of it in my book, the source code for which can 
be downloaded for free from 
https://www.manning.com/books/spark-graphx-in-action The relevant example is 
EdgeCount.java in chapter 10.
As I suggest in my book, likely the only reason you'd want to put yourself 
through that torture is corporate mandate or compatibility with Java bytecode 
tools.

  From: Sean Owen 
 To: Takeshi Yamamuro ; "Kumar, Abhishek (US - 
Bengaluru)"  
Cc: "user@spark.apache.org" 
 Sent: Monday, May 30, 2016 7:07 AM
 Subject: Re: GraphX Java API
   
No, you can call any Scala API in Java. It is somewhat less convenient if the 
method was not written with Java in mind but does work. 

On Mon, May 30, 2016, 00:32 Takeshi Yamamuro  wrote:

These packages are used only for Scala.
On Mon, May 30, 2016 at 2:23 PM, Kumar, Abhishek (US - Bengaluru) 
 wrote:

Hey,

I see some graphx packages listed here: http://spark.apache.org/docs/latest/api/java/index.html
- org.apache.spark.graphx
- org.apache.spark.graphx.impl
- org.apache.spark.graphx.lib
- org.apache.spark.graphx.util
Aren’t they meant to be used with JAVA?
Thanks

From: Santoshakhilesh [mailto:santosh.akhil...@huawei.com]
Sent: Friday, May 27, 2016 4:52 PM
To: Kumar, Abhishek (US - Bengaluru) ; user@spark.apache.org
Subject: RE: GraphX Java API

GraphX APIs are available only in Scala. If you need to use GraphX you need to switch to Scala.

From: Kumar, Abhishek (US - Bengaluru) [mailto:abhishekkuma...@deloitte.com]
Sent: 27 May 2016 19:59
To: user@spark.apache.org
Subject: GraphX Java API

Hi,
We are trying to consume the Java API for GraphX, but there is no documentation available online on the usage or examples. It would be great if we could get some examples in Java.
Thanks and regards,
Abhishek Kumar



-- 
---
Takeshi Yamamuro



  

Re: Adhoc queries on Spark 2.0 with Structured Streaming

2016-05-06 Thread Michael Malak
At first glance, it looks like the only streaming data sources available out of 
the box from the github master branch are 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 and 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
 . Out of the Jira epic for Structured Streaming 
https://issues.apache.org/jira/browse/SPARK-8360 it would seem the still-open 
https://issues.apache.org/jira/browse/SPARK-10815 "API design: data sources and 
sinks" is relevant here.
In short, it would seem the code is not there yet to create a Kafka-fed 
Dataframe/Dataset that can be queried with Structured Streaming; or if it is, 
it's not obvious how to write such code.
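
For a sense of where the API ended up, here is a hedged sketch of the ad hoc query pattern against the Structured Streaming API as it eventually shipped in Spark 2.0 (readStream plus the in-memory sink); the paths, schema handling, and column names are assumptions, and a Kafka source was indeed not available at the time of this thread:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("adhoc-streaming").getOrCreate()

// File stream source; the schema is inferred once from a static sample directory.
val events = spark.readStream
  .schema(spark.read.json("/data/events-sample").schema)
  .json("/data/events")

// Maintain a running aggregate in the in-memory sink so it can be queried ad hoc.
val query = events.groupBy("userId").count()
  .writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("event_counts")
  .start()

// Ad hoc queries against the continuously updated table:
spark.sql("SELECT * FROM event_counts ORDER BY count DESC LIMIT 10").show()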

  From: Anthony May 
 To: Deepak Sharma ; Sunita Arvind 
 
Cc: "user@spark.apache.org" 
 Sent: Friday, May 6, 2016 11:50 AM
 Subject: Re: Adhoc queries on Spark 2.0 with Structured Streaming
   
Yeah, there isn't even a RC yet and no documentation but you can work off the 
code base and test suites:
https://github.com/apache/spark
And this might help:
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala

On Fri, 6 May 2016 at 11:07 Deepak Sharma  wrote:

Spark 2.0 is yet to come out for public release.
I am waiting to get my hands on it as well.
Please do let me know if I can download the source and build Spark 2.0 from GitHub.

Thanks
Deepak

On Fri, May 6, 2016 at 9:51 PM, Sunita Arvind  wrote:

Hi All,

We are evaluating a few real-time streaming query engines, and Spark is my personal choice. The addition of ad hoc queries is what is getting me further excited about it; however, the talks I have heard so far only mention it but do not provide details. I need to build a prototype to ensure it works for our use cases.

Can someone point me to relevant material for this?

regards
Sunita




-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


  

Re: Spark 2.0 forthcoming features

2016-04-20 Thread Michael Malak
http://go.databricks.com/apache-spark-2.0-presented-by-databricks-co-founder-reynold-xin



  From: Sourav Mazumder 
 To: user  
 Sent: Wednesday, April 20, 2016 11:07 AM
 Subject: Spark 2.0 forthcoming features
   
Hi All,

Is there somewhere we can get an idea of the upcoming features in Spark 2.0?

I got a list for Spark ML from here:
https://issues.apache.org/jira/browse/SPARK-12626.

Are there other links where I can find similar enhancements planned for Spark SQL, Spark Core, Spark Streaming, GraphX, etc.?

Thanks in advance.

Regards,
Sourav


   

Re: Apache Flink

2016-04-17 Thread Michael Malak
As with all history, "what if"s are not scientifically testable hypotheses, but 
my speculation is the energy (VCs, startups, big Internet companies, 
universities) within Silicon Valley contrasted to Germany.

  From: Mich Talebzadeh <mich.talebza...@gmail.com>
 To: Michael Malak <michaelma...@yahoo.com>; "user @spark" 
<user@spark.apache.org> 
 Sent: Sunday, April 17, 2016 3:55 PM
 Subject: Re: Apache Flink
   
Assuming that both Spark and Flink are contemporaries, what are the reasons that Flink has not been adopted widely? (This may sound obvious and/or prejudged.) I mean, Spark has surged in popularity in the past year, if I am correct.
Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 
On 17 April 2016 at 22:49, Michael Malak <michaelma...@yahoo.com> wrote:

In terms of publication date, a paper on Nephele was published in 2009, prior 
to the 2010 USENIX paper on Spark. Nephele is the execution engine of 
Stratosphere, which became Flink.

  From: Mark Hamstra <m...@clearstorydata.com>
 To: Mich Talebzadeh <mich.talebza...@gmail.com> 
Cc: Corey Nolet <cjno...@gmail.com>; "user @spark" <user@spark.apache.org>
 Sent: Sunday, April 17, 2016 3:30 PM
 Subject: Re: Apache Flink
  
To be fair, the Stratosphere project from which Flink springs was started as a 
collaborative university research project in Germany about the same time that 
Spark was first released as Open Source, so they are near contemporaries rather 
than Flink having been started only well after Spark was an established and 
widely-used Apache project.
On Sun, Apr 17, 2016 at 2:25 PM, Mich Talebzadeh <mich.talebza...@gmail.com> 
wrote:

Also, it always amazes me why there are so many tangential projects in the Big Data space. Would it not be easier if efforts were spent on adding to Spark functionality rather than creating a new product like Flink?
Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 
On 17 April 2016 at 21:08, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks Corey for the useful info.
I have used Sybase Aleri and StreamBase as commercial CEP engines. However, there does not seem to be anything close to these products in the Hadoop ecosystem. So I guess there is nothing there?
Regards.

Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 
On 17 April 2016 at 20:43, Corey Nolet <cjno...@gmail.com> wrote:

I have not been intrigued at all by the microbatching concept in Spark. I am
used to CEP in real streams processing environments like Infosphere Streams & 
Storm where the granularity of processing is at the level of each individual 
tuple and processing units (workers) can react immediately to events being 
received and processed. The closest Spark streaming comes to this concept is 
the notion of "state" that that can be updated via the "updateStateBykey()" 
functions which are only able to be run in a microbatch. Looking at the 
expected design changes to Spark Streaming in Spark 2.0.0, it also does not 
look like tuple-at-a-time processing is on the radar for Spark, though I have 
seen articles stating that more effort is going to go into the Spark SQL layer 
in Spark streaming which may make it more reminiscent of Esper.
For these reasons, I have not even tried to implement CEP in Spark. I feel it's 
a waste of time without immediate tuple-at-a-time processing. Without this, 
they avoid the whole problem of "back pressure" (though keep in mind, it is 
still very possible to overload the Spark streaming layer with stages that will 
continue to pile up and never get worked off) but they lose the granular 
control that you get in CEP environments by allowing the rules & processors to 
react with the receipt of each tuple, right away. 
A while back, I did attempt to implement an InfoSphere Streams-like API [1] on
top of Apache Storm as an example of what such a design may look like. It looks 
like Storm is going to be replaced in the not so distant future by Twitter's 
new design called Heron. IIRC, Heron does not have an open source 
implementation as of yet. 
[1] https://github.com/calrissian/flowmix
On Sun, Apr 17, 2016 at 3:11 PM, Mich Talebzadeh <mich.talebza...@gmail.com> 
wrote:

Hi Corey,
Can you please point me to docs on using Spark for CEP? Do we have a set of CEP 
libraries somewhere. I am keen on getting hold of adaptor libraries for Spark 
something like below


​Thanks

Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 
On 17 April 2016 at 16:07, Corey Nolet <cjno...@gma

Re: Apache Flink

2016-04-17 Thread Michael Malak
There have been commercial CEP solutions for decades, including from my 
employer.

  From: Mich Talebzadeh 
 To: Mark Hamstra  
Cc: Corey Nolet ; "user @spark" 
 Sent: Sunday, April 17, 2016 3:48 PM
 Subject: Re: Apache Flink
   
The problem is that the strength and wider acceptance of a typical open source project is its sizeable user and development community. When the community is small, as with Flink, then it is not a viable solution to adopt.
I am rather disappointed that no big data project can be used for Complex Event Processing, as it has wide use in algorithmic trading, among others.

Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 
On 17 April 2016 at 22:30, Mark Hamstra  wrote:

To be fair, the Stratosphere project from which Flink springs was started as a 
collaborative university research project in Germany about the same time that 
Spark was first released as Open Source, so they are near contemporaries rather 
than Flink having been started only well after Spark was an established and 
widely-used Apache project.
On Sun, Apr 17, 2016 at 2:25 PM, Mich Talebzadeh  
wrote:

Also, it always amazes me why there are so many tangential projects in the Big Data space. Would it not be easier if efforts were spent on adding to Spark functionality rather than creating a new product like Flink?
Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 
On 17 April 2016 at 21:08, Mich Talebzadeh  wrote:

Thanks Corey for the useful info.
I have used Sybase Aleri and StreamBase as commercial CEP engines. However, there does not seem to be anything close to these products in the Hadoop ecosystem. So I guess there is nothing there?
Regards.

Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 
On 17 April 2016 at 20:43, Corey Nolet  wrote:

I have not been intrigued at all by the microbatching concept in Spark. I am
used to CEP in real streams processing environments like Infosphere Streams & 
Storm where the granularity of processing is at the level of each individual 
tuple and processing units (workers) can react immediately to events being 
received and processed. The closest Spark streaming comes to this concept is 
the notion of "state" that that can be updated via the "updateStateBykey()" 
functions which are only able to be run in a microbatch. Looking at the 
expected design changes to Spark Streaming in Spark 2.0.0, it also does not 
look like tuple-at-a-time processing is on the radar for Spark, though I have 
seen articles stating that more effort is going to go into the Spark SQL layer 
in Spark streaming which may make it more reminiscent of Esper.
For these reasons, I have not even tried to implement CEP in Spark. I feel it's 
a waste of time without immediate tuple-at-a-time processing. Without this, 
they avoid the whole problem of "back pressure" (though keep in mind, it is 
still very possible to overload the Spark streaming layer with stages that will 
continue to pile up and never get worked off) but they lose the granular 
control that you get in CEP environments by allowing the rules & processors to 
react with the receipt of each tuple, right away. 
A while back, I did attempt to implement an InfoSphere Streams-like API [1] on
top of Apache Storm as an example of what such a design may look like. It looks 
like Storm is going to be replaced in the not so distant future by Twitter's 
new design called Heron. IIRC, Heron does not have an open source 
implementation as of yet. 
[1] https://github.com/calrissian/flowmix
On Sun, Apr 17, 2016 at 3:11 PM, Mich Talebzadeh  
wrote:

Hi Corey,
Can you please point me to docs on using Spark for CEP? Do we have a set of CEP 
libraries somewhere. I am keen on getting hold of adaptor libraries for Spark 
something like below


​Thanks

Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 
On 17 April 2016 at 16:07, Corey Nolet  wrote:

One thing I've noticed about Flink in my following of the project has been that 
it has established, in a few cases, some novel ideas and improvements over 
Spark. The problem with it, however, is that both the development team and the 
community around it are very small and many of those novel improvements have 
been rolled directly into Spark in subsequent versions. I was considering 
changing over my architecture to Flink at one point to get better, more 
real-time CEP streaming support, but in the end I 

Re: Apache Flink

2016-04-17 Thread Michael Malak
In terms of publication date, a paper on Nephele was published in 2009, prior 
to the 2010 USENIX paper on Spark. Nephele is the execution engine of 
Stratosphere, which became Flink.

  From: Mark Hamstra 
 To: Mich Talebzadeh  
Cc: Corey Nolet ; "user @spark" 
 Sent: Sunday, April 17, 2016 3:30 PM
 Subject: Re: Apache Flink
   
To be fair, the Stratosphere project from which Flink springs was started as a 
collaborative university research project in Germany about the same time that 
Spark was first released as Open Source, so they are near contemporaries rather 
than Flink having been started only well after Spark was an established and 
widely-used Apache project.
On Sun, Apr 17, 2016 at 2:25 PM, Mich Talebzadeh  
wrote:

Also, it always amazes me why there are so many tangential projects in the Big Data space. Would it not be easier if efforts were spent on adding to Spark functionality rather than creating a new product like Flink?
Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 
On 17 April 2016 at 21:08, Mich Talebzadeh  wrote:

Thanks Corey for the useful info.
I have used Sybase Aleri and StreamBase as commercial CEP engines. However, there does not seem to be anything close to these products in the Hadoop ecosystem. So I guess there is nothing there?
Regards.

Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 
On 17 April 2016 at 20:43, Corey Nolet  wrote:

I have not been intrigued at all by the microbatching concept in Spark. I am
used to CEP in real streams processing environments like Infosphere Streams & 
Storm where the granularity of processing is at the level of each individual 
tuple and processing units (workers) can react immediately to events being 
received and processed. The closest Spark streaming comes to this concept is 
the notion of "state" that that can be updated via the "updateStateBykey()" 
functions which are only able to be run in a microbatch. Looking at the 
expected design changes to Spark Streaming in Spark 2.0.0, it also does not 
look like tuple-at-a-time processing is on the radar for Spark, though I have 
seen articles stating that more effort is going to go into the Spark SQL layer 
in Spark streaming which may make it more reminiscent of Esper.
For these reasons, I have not even tried to implement CEP in Spark. I feel it's 
a waste of time without immediate tuple-at-a-time processing. Without this, 
they avoid the whole problem of "back pressure" (though keep in mind, it is 
still very possible to overload the Spark streaming layer with stages that will 
continue to pile up and never get worked off) but they lose the granular 
control that you get in CEP environments by allowing the rules & processors to 
react with the receipt of each tuple, right away. 
A while back, I did attempt to implement an InfoSphere Streams-like API [1] on
top of Apache Storm as an example of what such a design may look like. It looks 
like Storm is going to be replaced in the not so distant future by Twitter's 
new design called Heron. IIRC, Heron does not have an open source 
implementation as of yet. 
[1] https://github.com/calrissian/flowmix
On Sun, Apr 17, 2016 at 3:11 PM, Mich Talebzadeh  
wrote:

Hi Corey,
Can you please point me to docs on using Spark for CEP? Do we have a set of CEP 
libraries somewhere. I am keen on getting hold of adaptor libraries for Spark 
something like below


​Thanks

Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 
On 17 April 2016 at 16:07, Corey Nolet  wrote:

One thing I've noticed about Flink in my following of the project has been that 
it has established, in a few cases, some novel ideas and improvements over 
Spark. The problem with it, however, is that both the development team and the 
community around it are very small and many of those novel improvements have 
been rolled directly into Spark in subsequent versions. I was considering 
changing over my architecture to Flink at one point to get better, more 
real-time CEP streaming support, but in the end I decided to stick with Spark 
and just watch Flink continue to pressure it into improvement.
On Sun, Apr 17, 2016 at 11:03 AM, Koert Kuipers  wrote:

I never found much info that Flink was actually designed to be fault tolerant. If fault tolerance is more bolt-on/add-on/afterthought, then that doesn't bode well for large-scale data processing. Spark was designed with fault tolerance in mind from the beginning.

On Sun, Apr 17, 2016 at 9:52 AM, Mich Talebzadeh 

Re: [discuss] using deep learning to improve Spark

2016-04-01 Thread Michael Malak
I see you've been burning the midnight oil.

  From: Reynold Xin 
 To: "dev@spark.apache.org"  
 Sent: Friday, April 1, 2016 1:15 AM
 Subject: [discuss] using deep learning to improve Spark
   
Hi all,
Hope you all enjoyed the Tesla 3 unveiling earlier tonight.
I'd like to bring your attention to a project called DeepSpark that we have 
been working on for the past three years. We realized that scaling software 
development was challenging. A large fraction of software engineering has been 
manual and mundane: writing test cases, fixing bugs, implementing features 
according to specs, and reviewing pull requests. So we started this project to 
see how much we could automate.
After three years of development and one year of testing, we now have enough 
confidence that this could work well in practice. For example, Matei confessed 
to me today: "It looks like DeepSpark has a better understanding of Spark 
internals than I ever will. It updated several pieces of code I wrote long ago 
that even I no longer understood.”

I think it's time to discuss as a community about how we want to continue this 
project to ensure Spark is stable, secure, and easy to use yet able to progress 
as fast as possible. I'm still working on a more formal design doc, and it 
might take a little bit more time since I haven't been able to fully grasp 
DeepSpark's capabilities yet. Based on my understanding right now, I've written 
a blog post about DeepSpark here: 
https://databricks.com/blog/2016/04/01/unreasonable-effectiveness-of-deep-learning-on-spark.html

Please take a look and share your thoughts. Obviously, this is an ambitious 
project and could take many years to fully implement. One major challenge is 
cost. The current Spark Jenkins infrastructure provided by the AMPLab has only 
8 machines, but DeepSpark uses 12000 machines. I'm not sure whether AMPLab or 
Databricks can fund DeepSpark's operation for a long period of time. Perhaps 
AWS can help out here. Let me know if you have other ideas.







  

Re: Spark with Druid

2016-03-23 Thread Michael Malak
Will Spark 2.0 Structured Streaming obviate some of the Druid/Spark use cases?

  From: Raymond Honderdors 
 To: "yuzhih...@gmail.com"  
Cc: "user@spark.apache.org" 
 Sent: Wednesday, March 23, 2016 8:43 AM
 Subject: Re: Spark with Druid
   
I saw these but I fail to understand how to direct the code to use the index JSON.
Sent from Outlook Mobile



On Wed, Mar 23, 2016 at 7:19 AM -0700, "Ted Yu"  wrote:

Please see:
https://www.linkedin.com/pulse/combining-druid-spark-interactive-flexible-analytics-scale-butani

which references https://github.com/SparklineData/spark-druid-olap
On Wed, Mar 23, 2016 at 5:59 AM, Raymond Honderdors 
 wrote:

Does anyone have a good overview on how to integrate Spark and Druid? I am now struggling with the creation of a Druid data source in Spark.

Raymond Honderdors
Team Lead Analytics BI
Business Intelligence developer
raymond.honderd...@sizmek.com
t +972.7325.3569
Herzliya



  

Re: [discuss] DataFrame vs Dataset in Spark 2.0

2016-02-25 Thread Michael Malak
Would it make sense (in terms of feasibility, code organization, and 
politically) to have a JavaDataFrame, as a way to isolate the 1000+ extra lines 
to a Java compatibility layer/class?

  From: Reynold Xin 
 To: "dev@spark.apache.org"  
 Sent: Thursday, February 25, 2016 4:23 PM
 Subject: [discuss] DataFrame vs Dataset in Spark 2.0
   
When we first introduced Dataset in 1.6 as an experimental API, we wanted to 
merge Dataset/DataFrame but couldn't because we didn't want to break the 
pre-existing DataFrame API (e.g. map function should return Dataset, rather 
than RDD). In Spark 2.0, one of the main API changes is to merge DataFrame and 
Dataset.
Conceptually, DataFrame is just a Dataset[Row]. In practice, there are two ways 
to implement this:
Option 1. Make DataFrame a type alias for Dataset[Row]
Option 2. DataFrame as a concrete class that extends Dataset[Row]

I'm wondering what you think about this. The pros and cons I can think of are:

Option 1. Make DataFrame a type alias for Dataset[Row]
+ Cleaner conceptually, especially in Scala. It will be very clear what libraries or applications need to do, and we won't see type mismatches (e.g. a function expects DataFrame, but the user is passing in Dataset[Row])
+ A lot less code
- Breaks source compatibility for the DataFrame API in Java, and binary compatibility for Scala/Java

Option 2. DataFrame as a concrete class that extends Dataset[Row]
The pros/cons are basically the inverse of Option 1.
+ In most cases, can maintain source compatibility for the DataFrame API in Java, and binary compatibility for Scala/Java
- A lot more code (1000+ loc)
- Less clean, and can be confusing when users pass in a Dataset[Row] into a function that expects a DataFrame

The concerns are mostly with Scala/Java. For Python, it is very easy to 
maintain source compatibility for both (there is no concept of binary 
compatibility), and for R, we are only supporting the DataFrame operations 
anyway because that's more familiar interface for R users outside of Spark.
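
For illustration, a small sketch of what Option 1 means in practice, written against the API as it eventually shipped (DataFrame as a type alias for Dataset[Row]); the example data is arbitrary:

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Under Option 1 a Dataset[Row] *is* a DataFrame, so no conversion is needed
// and no type mismatch can occur.
def rowCount(df: DataFrame): Long = df.count()

val spark = SparkSession.builder().master("local[*]").appName("alias-demo").getOrCreate()
val ds: Dataset[Row] = spark.range(10).toDF()   // toDF() yields a Dataset[Row]
println(rowCount(ds))                           // compiles because DataFrame = Dataset[Row]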



  

[jira] [Commented] (SPARK-3789) [GRAPHX] Python bindings for GraphX

2015-11-04 Thread Michael Malak (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14990391#comment-14990391
 ] 

Michael Malak commented on SPARK-3789:
--

My publisher tells me the MEAP for Spark GraphX In Action has been picking up 
lately. And just two weeks ago Ion Stoica announced at GraphConnect "We look 
forward to bringing Cypher's graph pattern matching capabilities into the Spark 
stack, making graph querying more accessible to the masses" 
http://www.reuters.com/article/2015/10/21/idUSnMKWNtdbfa+1e0+MKW20151021 . So 
one has to wonder whether Ion was referring to GraphX or some other graph 
technology.

> [GRAPHX] Python bindings for GraphX
> ---
>
> Key: SPARK-3789
> URL: https://issues.apache.org/jira/browse/SPARK-3789
> Project: Spark
>  Issue Type: New Feature
>  Components: GraphX, PySpark
>Reporter: Ameet Talwalkar
>Assignee: Kushal Datta
> Attachments: PyGraphX_design_doc.pdf
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-11278) PageRank fails with unified memory manager

2015-10-23 Thread Michael Malak (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-11278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Malak updated SPARK-11278:
--
Component/s: GraphX

> PageRank fails with unified memory manager
> --
>
> Key: SPARK-11278
> URL: https://issues.apache.org/jira/browse/SPARK-11278
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX, Spark Core
>Affects Versions: 1.5.2, 1.6.0
>Reporter: Nishkam Ravi
>
> PageRank (6-nodes, 32GB input) runs very slow and eventually fails with 
> ExecutorLostFailure. Traced it back to the 'unified memory manager' commit 
> from Oct 13th. Took a quick look at the code and couldn't see the problem 
> (changes look pretty good). cc'ing [~andrewor14][~vanzin] who may be able to 
> spot the problem quickly. Can be reproduced by running PageRank on a large 
> enough input dataset if needed. Sorry for not being of much help here.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2365) Add IndexedRDD, an efficient updatable key-value store

2015-10-09 Thread Michael Malak (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14950679#comment-14950679
 ] 

Michael Malak commented on SPARK-2365:
--

It's off-topic of IndexedRDD, but you can have a look at AMPLab's 
experimental/alpha Succinct, which is a key/value store over Tachyon. 
https://github.com/amplab/succinct . I haven't tried it myself.

> Add IndexedRDD, an efficient updatable key-value store
> --
>
> Key: SPARK-2365
> URL: https://issues.apache.org/jira/browse/SPARK-2365
> Project: Spark
>  Issue Type: New Feature
>  Components: GraphX, Spark Core
>Reporter: Ankur Dave
>Assignee: Ankur Dave
> Attachments: 2014-07-07-IndexedRDD-design-review.pdf
>
>
> RDDs currently provide a bulk-updatable, iterator-based interface. This 
> imposes minimal requirements on the storage layer, which only needs to 
> support sequential access, enabling on-disk and serialized storage.
> However, many applications would benefit from a richer interface. Efficient 
> support for point lookups would enable serving data out of RDDs, but it 
> currently requires iterating over an entire partition to find the desired 
> element. Point updates similarly require copying an entire iterator. Joins 
> are also expensive, requiring a shuffle and local hash joins.
> To address these problems, we propose IndexedRDD, an efficient key-value 
> store built on RDDs. IndexedRDD would extend RDD[(Long, V)] by enforcing key 
> uniqueness and pre-indexing the entries for efficient joins and point 
> lookups, updates, and deletions.
> It would be implemented by (1) hash-partitioning the entries by key, (2) 
> maintaining a hash index within each partition, and (3) using purely 
> functional (immutable and efficiently updatable) data structures to enable 
> efficient modifications and deletions.
> GraphX would be the first user of IndexedRDD, since it currently implements a 
> limited form of this functionality in VertexRDD. We envision a variety of 
> other uses for IndexedRDD, including streaming updates to RDDs, direct 
> serving from RDDs, and as an execution strategy for Spark SQL.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10939) Misaligned data with RDD.zip after repartition

2015-10-08 Thread Michael Malak (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14948596#comment-14948596
 ] 

Michael Malak commented on SPARK-10939:
---

Here Matei explains the explicit design decision to prefer shuffle performance 
arising from randomization over deterministic RDD computation:

https://issues.apache.org/jira/browse/SPARK-3098?focusedCommentId=14110183&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14110183

It has made it into the documentation (though perhaps not clearly enough, 
especially regarding the rationale):

https://issues.apache.org/jira/browse/SPARK-3356
https://github.com/apache/spark/pull/2508/files


> Misaligned data with RDD.zip after repartition
> --
>
> Key: SPARK-10939
> URL: https://issues.apache.org/jira/browse/SPARK-10939
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.3.0, 1.4.1, 1.5.0
> Environment: - OSX 10.10.4, java 1.7.0_51, hadoop 2.6.0-cdh5.4.5
> - Ubuntu 12.04, java 1.7.0_80, hadoop 2.6.0-cdh5.4.5
>Reporter: Dan Brown
>
> Split out from https://issues.apache.org/jira/browse/SPARK-10685:
> Here's a weird behavior where {{RDD.zip}} after a {{repartition}} produces 
> "misaligned" data, meaning different column values in the same row aren't 
> matched, as if a zip shuffled the collections before zipping them. It's 
> difficult to reproduce because it's nondeterministic, doesn't occur in local 
> mode, and requires ≥2 workers (≥3 in one case). I was able to repro it using 
> pyspark 1.3.0 (cdh5.4.5), 1.4.1 (bin-without-hadoop), and 1.5.0 
> (bin-without-hadoop).
> Also, this {{DataFrame.zip}} issue is related in spirit, since we were trying 
> to build it ourselves when we ran into this problem. Let me put in my vote 
> for reopening the issue and supporting {{DataFrame.zip}} in the standard lib.
> - https://issues.apache.org/jira/browse/SPARK-7460
> h3. Repro
> Fail: RDD.zip after repartition
> {code}
> df  = sqlCtx.createDataFrame(Row(a=a) for a in xrange(1))
> df  = df.repartition(100)
> rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, 
> b=y.b))
> [r for r in rdd.collect() if r.a != r.b][:3] # Should be []
> {code}
> Sample outputs (nondeterministic):
> {code}
> []
> [Row(a=50, b=6947), Row(a=150, b=7047), Row(a=250, b=7147)]
> []
> []
> [Row(a=44, b=644), Row(a=144, b=744), Row(a=244, b=844)]
> []
> {code}
> Test setup:
> - local\[8]: {{MASTER=local\[8]}}
> - dist\[N]: 1 driver + 1 master + N workers
> {code}
> "Fail" tests pass?  cluster mode  spark version
> 
> yes local[8]  1.3.0-cdh5.4.5
> no  dist[4]   1.3.0-cdh5.4.5
> yes local[8]  1.4.1
> yes dist[1]   1.4.1
> no  dist[2]   1.4.1
> no  dist[4]   1.4.1
> yes local[8]  1.5.0
> yes dist[1]   1.5.0
> no  dist[2]   1.5.0
> no  dist[4]   1.5.0
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-10972) UDFs in SQL joins

2015-10-07 Thread Michael Malak (JIRA)
Michael Malak created SPARK-10972:
-

 Summary: UDFs in SQL joins
 Key: SPARK-10972
 URL: https://issues.apache.org/jira/browse/SPARK-10972
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 1.5.1
Reporter: Michael Malak


Currently expressions used to .join() in DataFrames are limited to column names 
plus the operators exposed in org.apache.spark.sql.Column.

It would be nice to be able to .join() based on a UDF, such as, say, 
euclideanDistance() < 0.1.
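
For illustration, a sketch of the kind of call this ticket is asking for; the euclideanDistance UDF, the left/right DataFrames, and their x/y columns are hypothetical:

{code}
import org.apache.spark.sql.functions.udf

// left and right are assumed to be existing DataFrames with numeric x/y columns.
val euclideanDistance = udf { (x1: Double, y1: Double, x2: Double, y2: Double) =>
  math.sqrt(math.pow(x1 - x2, 2) + math.pow(y1 - y2, 2))
}

// Join on an arbitrary UDF-based predicate rather than only column operators:
val joined = left.join(right,
  euclideanDistance(left("x"), left("y"), right("x"), right("y")) < 0.1)
{code}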



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-10972) UDFs in SQL joins

2015-10-07 Thread Michael Malak (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-10972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Malak updated SPARK-10972:
--
Description: 
Currently expressions used to .join() in DataFrames are limited to column names 
plus the operators exposed in org.apache.spark.sql.Column.

It would be nice to be able to .join() based on a UDF, such as, say, 
euclideanDistance(col1, col2) < 0.1.

  was:
Currently expressions used to .join() in DataFrames are limited to column names 
plus the operators exposed in org.apache.spark.sql.Column.

It would be nice to be able to .join() based on a UDF, such as, say, 
euclideanDistance() < 0.1.


> UDFs in SQL joins
> -
>
> Key: SPARK-10972
> URL: https://issues.apache.org/jira/browse/SPARK-10972
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 1.5.1
>Reporter: Michael Malak
>
> Currently expressions used to .join() in DataFrames are limited to column 
> names plus the operators exposed in org.apache.spark.sql.Column.
> It would be nice to be able to .join() based on a UDF, such as, say, 
> euclideanDistance(col1, col2) < 0.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10722) Uncaught exception: RDDBlockId not found in driver-heartbeater

2015-09-27 Thread Michael Malak (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14909883#comment-14909883
 ] 

Michael Malak commented on SPARK-10722:
---

I have seen this in a small Hello World type program compiled and run from sbt 
that reads a large text file and calls .cache(). But if instead I do sbt 
package and then spark-submit (instead of just sbt run), it works. That 
suggests there may be some dependency omitted from Artifactory for spark-core 
but that is in spark-assembly.

This link suggests slf4j-simple.jar, but adding that to my .sbt didn't help.
https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/spark-Exception-in-thread-quot-main-quot-java-lang/td-p/19544

Googling, it seems the problem is more commonly encountered while running unit 
tests during the build of Spark itself.

> Uncaught exception: RDDBlockId not found in driver-heartbeater
> --
>
> Key: SPARK-10722
> URL: https://issues.apache.org/jira/browse/SPARK-10722
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 1.3.1, 1.4.1, 1.5.0
>Reporter: Simeon Simeonov
>
> Some operations involving cached RDDs generate an uncaught exception in 
> driver-heartbeater. If the {{.cache()}} call is removed, processing happens 
> without the exception. However, not all RDDs trigger the problem, i.e., some 
> {{.cache()}} operations are fine. 
> I can see the problem with 1.4.1 and 1.5.0 but I have not been able to create 
> a reproducible test case. The same exception is [reported on 
> SO|http://stackoverflow.com/questions/31280355/spark-test-on-local-machine] 
> for v1.3.1 but the behavior is related to large broadcast variables.
> The full stack trace is:
> {code}
> 15/09/20 22:10:08 ERROR Utils: Uncaught exception in thread driver-heartbeater
> java.io.IOException: java.lang.ClassNotFoundException: 
> org.apache.spark.storage.RDDBlockId
>   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
>   at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at org.apache.spark.util.Utils$.deserialize(Utils.scala:91)
>   at 
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:440)
>   at 
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:430)
>   at scala.Option.foreach(Option.scala:236)
>   at 
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:430)
>   at 
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:428)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:428)
>   at 
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:472)
>   at 
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:472)
>   at 
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:472)
>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>   at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:472)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$

[jira] [Commented] (SPARK-10489) GraphX dataframe wrapper

2015-09-10 Thread Michael Malak (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14739681#comment-14739681
 ] 

Michael Malak commented on SPARK-10489:
---

Feynman Liang:

Link https://github.com/databricks/spark-df-graph seems to be broken?

> GraphX dataframe wrapper
> 
>
> Key: SPARK-10489
> URL: https://issues.apache.org/jira/browse/SPARK-10489
> Project: Spark
>  Issue Type: New Feature
>  Components: GraphX
>Reporter: Feynman Liang
>
> We want to wrap GraphX Graph using DataFrames and implement basic high-level 
> algorithms like PageRank. Then we can easily implement Python API, 
> import/export, and other features.
> {code}
> val graph = new GraphF(vDF, eDF)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: Build k-NN graph for large dataset

2015-08-26 Thread Michael Malak
Yes. And a paper that describes using grids (actually varying grids) is 
http://research.microsoft.com/en-us/um/people/jingdw/pubs%5CCVPR12-GraphConstruction.pdf
 In the Spark GraphX In Action book that Robin East and I are writing, we 
implement a drastically simplified version of this in chapter 7, which should 
become available in the MEAP mid-September. 
http://www.manning.com/books/spark-graphx-in-action
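
A drastically simplified, hedged sketch of the grid-blocking idea (a single fixed grid rather than the paper's varying grids, 2-D points only, and it misses neighbors that fall across cell borders); Point, gridSize, and k are assumptions for illustration:

import org.apache.spark.rdd.RDD

case class Point(id: Long, x: Double, y: Double)

def knnByGrid(points: RDD[Point], gridSize: Double, k: Int): RDD[(Long, Seq[(Long, Double)])] = {
  // 1. Coarse blocking: assign each point to a grid cell.
  val cells = points.map { p =>
    ((math.floor(p.x / gridSize).toInt, math.floor(p.y / gridSize).toInt), p)
  }
  // 2. Compare only points that share a cell, instead of the full N*N cartesian.
  cells.join(cells)
    .filter { case (_, (a, b)) => a.id != b.id }
    .map { case (_, (a, b)) =>
      val d = math.sqrt(math.pow(a.x - b.x, 2) + math.pow(a.y - b.y, 2))
      (a.id, (b.id, d))
    }
    .groupByKey()
    .mapValues(_.toSeq.sortBy(_._2).take(k))
}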

  From: Kristina Rogale Plazonic kpl...@gmail.com
 To: Jaonary Rabarisoa jaon...@gmail.com 
Cc: user user@spark.apache.org 
 Sent: Wednesday, August 26, 2015 7:24 AM
 Subject: Re: Build k-NN graph for large dataset
   
If you don't want to compute all N^2 similarities, you need to implement some kind of blocking first. For example, LSH (locality-sensitive hashing). A quick search gave this link to a Spark implementation:
http://stackoverflow.com/questions/2771/spark-implementation-for-locality-sensitive-hashing


On Wed, Aug 26, 2015 at 7:35 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

Dear all,
I'm trying to find an efficient way to build a k-NN graph for a large dataset. Precisely, I have a large set of high-dimensional vectors (say d  1), and I want to build a graph where those high-dimensional points are the vertices and each one is linked to its k nearest neighbors based on some kind of similarity defined on the vertex space.
My problem is to implement an efficient algorithm to compute the weight matrix of the graph. I need to compute N*N similarities, and the only way I know is to use a cartesian operation followed by a map operation on the RDD. But this is very slow when N is large. Is there a cleverer way to do this for an arbitrary similarity function?
Cheers,
Jao



  

RE: What does "Spark is not just MapReduce" mean? Isn't every Spark job a form of MapReduce?

2015-06-28 Thread Michael Malak
I would also add, from a data locality theoretic standpoint, mapPartitions() 
provides for node-local computation that plain old map-reduce does not.
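
A minimal sketch of that point, assuming a SparkContext named sc (as in the shell) and an illustrative file path; the per-partition setup here is a trivial stand-in for something expensive like a parser or a connection:

val lines = sc.textFile("hdfs:///data/input.txt")

// map(): conceptually one function call per element.
val perElement = lines.map(line => line.split(",").length)

// mapPartitions(): one call per partition, so any setup cost is paid once per
// partition, on the node that already holds that partition's data.
val perPartition = lines.mapPartitions { iter =>
  val separator = ","              // stand-in for expensive per-partition setup
  iter.map(line => line.split(separator).length)
}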


From my Android phone on T-Mobile. The first nationwide 4G network.

 Original message 
From: Ashic Mahtab as...@live.com 
Date: 06/28/2015  10:51 AM  (GMT-07:00) 
To: YaoPau jonrgr...@gmail.com,Apache Spark user@spark.apache.org 
Subject: RE: What does Spark is not just MapReduce mean?  Isn't every Spark 
job a form of MapReduce? 
 
Spark comes with quite a few components. At its core is...surprise...Spark Core. This provides the core things required to run Spark jobs. Spark provides a lot of operators out of the box; take a look at
https://spark.apache.org/docs/latest/programming-guide.html#transformations
https://spark.apache.org/docs/latest/programming-guide.html#actions

While all of them can be implemented with variations of rdd.map().reduce(), there are optimisations to be gained in terms of data locality, etc., and the additional operators simply make life simpler.

In addition to the core stuff, Spark also brings things like Spark Streaming, Spark SQL and data frames, MLlib, GraphX, etc. Spark Streaming gives you microbatches of RDDs at periodic intervals. Think "give me the last 15 seconds of events every 5 seconds." You can then program against the small collection, and the job will run in a fault-tolerant manner on your cluster. Spark SQL provides Hive-like functionality that works nicely with various data sources and RDDs. MLlib provides a lot of out-of-the-box machine learning algorithms, and the new Spark ML project provides a nice, elegant pipeline API to take care of a lot of common machine learning tasks. GraphX allows you to represent data in graphs and run graph algorithms on it, e.g. you can represent your data as RDDs of vertices and edges, and run PageRank on a distributed cluster.

And there's more. So yeah...Spark is definitely not just MapReduce. :)
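
To make the point about the built-in operators concrete, a small sketch (again assuming a SparkContext named sc and an illustrative file path): reduceByKey() gives you per-partition (map-side) combining and a single shuffle, which a hand-rolled map()/reduce() formulation would not give you for free.

val counts = sc.textFile("hdfs:///logs/app.log")
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)     // combines locally within each partition before shuffling

counts.take(10).foreach(println)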

 Date: Sun, 28 Jun 2015 09:13:18 -0700
 From: jonrgr...@gmail.com
 To: user@spark.apache.org
 Subject: What does Spark is not just MapReduce mean? Isn't every Spark job 
 a form of MapReduce?
 
 I've heard "Spark is not just MapReduce" mentioned during Spark talks, but it
 seems like every method that Spark has is really doing something like (Map
 -> Reduce) or (Map -> Map -> Map -> Reduce) etc. behind the scenes, with the
 performance benefit of keeping RDDs in memory between stages.
 
 Am I wrong about that? Is Spark doing anything more efficiently than a
 series of Maps followed by a Reduce in memory? What methods does Spark have
 that can't easily be mapped (with somewhat similar efficiency) to Map and
 Reduce in memory?
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/What-does-Spark-is-not-just-MapReduce-mean-Isn-t-every-Spark-job-a-form-of-MapReduce-tp23518.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


Re: Why Spark is much faster than Hadoop MapReduce even on disk

2015-04-27 Thread Michael Malak
http://www.datascienceassn.org/content/making-sense-making-sense-performance-data-analytics-frameworks
  
  From: bit1...@163.com bit1...@163.com
 To: user user@spark.apache.org 
 Sent: Monday, April 27, 2015 8:33 PM
 Subject: Why Spark is much faster than Hadoop MapReduce even on disk
   
Hi,
I am frequently asked why Spark is also much faster than Hadoop MapReduce on disk (without the use of memory cache). I have no convincing answer for this question; could you guys elaborate on this? Thanks!



  

Re: How to restrict foreach on a streaming RDD only once upon receiver completion

2015-04-06 Thread Michael Malak
You could have your receiver send a magic value when it is done. I discuss this Spark Streaming pattern in my presentation "Spark Gotchas and Anti-Patterns." In the PDF version, it's slides 34-36.
http://www.datascienceassn.org/content/2014-11-05-spark-gotchas-and-anti-patterns-julia-language

YouTube version cued to that place:
http://www.youtube.com/watch?v=W5Uece_JmNs&t=23m18s
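
A minimal sketch of that "magic value" (sentinel) pattern; the marker string, the socket source standing in for the custom receiver, and the stop logic are all assumptions for illustration:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DoneMarkerExample {
  val EndMarker = "__ALL_RECORDS_SENT__"   // the "magic value" the receiver emits last

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("done-marker"), Seconds(5))
    // A real custom receiver would call store(EndMarker) after its final record;
    // a socket stream stands in for it here.
    val records = ssc.socketTextStream("localhost", 9999)

    records.foreachRDD { rdd =>
      if (rdd.filter(_ == EndMarker).count() > 0) {
        // The receiver is finished: run the one-time final action (e.g. top 10) here.
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}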
  From: Hari Polisetty hpoli...@icloud.com
 To: Tathagata Das t...@databricks.com 
Cc: user user@spark.apache.org 
 Sent: Monday, April 6, 2015 2:02 PM
 Subject: Re: How to restrict foreach on a streaming RDD only once upon 
receiver completion
   
Yes, I’m using updateStateByKey and it works. But then I need to perform 
further computation on this Stateful RDD (see code snippet below). I perform 
forEach on the final RDD and get the top 10 records. I just don’t want the 
foreach to be performed every time a new batch is received. Only when the 
receiver is done fetching all the records.
My requirements are to programmatically invoke the E.S query (it varies by 
usecase) , get all the records and apply certain transformations and get the 
top 10 results based on certain criteria back into the driver program for 
further processing. I’m able to apply the transformations on the batches of 
records fetched from E.S  using streaming. So, I don’t need to wait for all the 
records to be fetched. The RDD transformations are happening all the time and 
the top k results are getting updated constantly until all the records are 
fetched by the receiver. Is there any drawback with this approach?
Can you give more pointers on what you mean by creating a custom RDD that reads 
from ElasticSearch? 
Here is the relevant portion of my Spark streaming code:
//Create a custom streaming receiver to query for relevant data from E.S
JavaReceiverInputDStream<String> jsonStrings = ssc.receiverStream(
    new ElasticSearchResponseReceiver(query…….));

//Apply JSON Paths to extract specific value(s) from each record
JavaDStream<String> fieldVariations = jsonStrings.flatMap(new FlatMapFunction<String, String>() {
  private static final long serialVersionUID = 465237345751948L;

  @Override
  public Iterable<String> call(String jsonString) {
    List<String> r = JsonPath.read(jsonString, attributeDetail.getJsonPath());
    return r;
  }
});

//Perform a stateful map reduce on each variation
JavaPairDStream<String, Integer> fieldVariationCounts = fieldVariations.mapToPair(
    new PairFunction<String, String, Integer>() {
  private static final long serialVersionUID = -1241276515559408238L;

  @Override
  public Tuple2<String, Integer> call(String s) {
    return new Tuple2<String, Integer>(s, 1);
  }
}).updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
  private static final long serialVersionUID = 7598681835161199865L;

  public Optional<Integer> call(List<Integer> nums, Optional<Integer> current) {
    Integer sum = current.or((int) 0L);
    return (Optional<Integer>) Optional.of(sum + nums.size());
  }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
  private static final long serialVersionUID = -5906059838295609562L;

  @Override
  public Integer call(Integer i1, Integer i2) {
    return i1 + i2;
  }
});

//Swap the Map from <Enum String, Int> to <Int, Enum String>. This is so that we can sort on frequencies
JavaPairDStream<Integer, String> swappedPair = fieldVariationCounts.mapToPair(
    new PairFunction<Tuple2<String, Integer>, Integer, String>() {
  private static final long serialVersionUID = -5889774695187619957L;

  @Override
  public Tuple2<Integer, String> call(Tuple2<String, Integer> item) throws Exception {
    return item.swap();
  }
});

//Sort based on Key, i.e., frequency
JavaPairDStream<Integer, String> sortedCounts = swappedPair.transformToPair(
    new Function<JavaPairRDD<Integer, String>, JavaPairRDD<Integer, String>>() {
  private static final long serialVersionUID = -4172702039963232779L;

  public JavaPairRDD<Integer, String> call(JavaPairRDD<Integer, String> in) throws Exception {
    //False to denote sort in descending order
    return in.sortByKey(false);
  }
});

//Iterate through the RDD and get the top 20 values in the sorted pair and write to results list
sortedCounts.foreach(new Function<JavaPairRDD<Integer, String>, Void>() {
  private static final long serialVersionUID = 2186144129973051920L;

  public Void call(JavaPairRDD<Integer, String> rdd) {
    resultList.clear();
    for (Tuple2<Integer, String> t : rdd.take(MainDriver.NUMBER_OF_TOP_VARIATIONS)) {
      resultList.add(new Tuple3<String, Integer, Double>(t._2(), t._1(),
          (double) (100 * t._1()) / totalProcessed.value()));
    }
    return null;
  }
});



On Apr 7, 2015, at 1:14 AM, Tathagata Das t...@databricks.com wrote:
So you want to sort based on the total count of all the records received through the receiver? In that case, you have to combine all the counts using
updateStateByKey 
(https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala)
 But stepping back, if you want to 

[jira] [Created] (SPARK-6710) Wrong initial bias in GraphX SVDPlusPlus

2015-04-04 Thread Michael Malak (JIRA)
Michael Malak created SPARK-6710:


 Summary: Wrong initial bias in GraphX SVDPlusPlus
 Key: SPARK-6710
 URL: https://issues.apache.org/jira/browse/SPARK-6710
 Project: Spark
  Issue Type: Bug
  Components: GraphX
Affects Versions: 1.3.0
Reporter: Michael Malak


In the initialization portion of GraphX SVDPlusPluS, the initialization of 
biases appears to be incorrect. Specifically, in line 
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala#L96
 
instead of 
(vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1)) 
it should probably be 
(vd._1, vd._2, msg.get._2 / msg.get._1 - u, 1.0 / scala.math.sqrt(msg.get._1)) 

That is, the biases bu and bi (both represented as the third component of the 
Tuple4[] above, depending on whether the vertex is a user or an item), 
described in equation (1) of the Koren paper, are supposed to be small offsets 
to the mean (represented by the variable u, signifying the Greek letter mu) to 
account for peculiarities of individual users and items. 

Initializing these biases to wrong values should theoretically not matter given 
enough iterations of the algorithm, but some quick empirical testing shows it 
has trouble converging at all, even after many orders of magnitude additional 
iterations. 

This perhaps could be the source of previously reported trouble with 
SVDPlusPlus. 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-SVDPlusPlus-problem-td12885.html
 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Wrong initial bias in GraphX SVDPlusPlus?

2015-04-03 Thread Michael Malak
I believe that in the initialization portion of GraphX SVDPlusPluS, the 
initialization of biases is incorrect. Specifically, in line 
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala#L96
 
instead of 
(vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1)) 
it should be 
(vd._1, vd._2, msg.get._2 / msg.get._1 - u, 1.0 / scala.math.sqrt(msg.get._1)) 

That is, the biases bu and bi (both represented as the third component of the 
Tuple4[] above, depending on whether the vertex is a user or an item), 
described in equation (1) of the Koren paper, are supposed to be small offsets 
to the mean (represented by the variable u, signifying the Greek letter mu) to 
account for peculiarities of individual users and items. 

Initializing these biases to wrong values should theoretically not matter given 
enough iterations of the algorithm, but some quick empirical testing shows it 
has trouble converging at all, even after many orders of magnitude additional 
iterations. 

This perhaps could be the source of previously reported trouble with 
SVDPlusPlus. 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-SVDPlusPlus-problem-td12885.html
 

If after a day, no one tells me I'm crazy here, I'll go ahead and create a Jira 
ticket. 

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Spark GraphX In Action on documentation page?

2015-03-24 Thread Michael Malak
Can my new book, Spark GraphX In Action, which is currently in MEAP 
http://manning.com/malak/, be added to 
https://spark.apache.org/documentation.html and, if appropriate, to 
https://spark.apache.org/graphx/ ?

Michael Malak

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[jira] [Commented] (SPARK-6388) Spark 1.3 + Hadoop 2.6 Can't work on Java 8_40

2015-03-17 Thread Michael Malak (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14365758#comment-14365758
 ] 

Michael Malak commented on SPARK-6388:
--

Isn't it Hadoop 2.7 that is supposed to provide Java 8 compatibility?

 Spark 1.3 + Hadoop 2.6 Can't work on Java 8_40
 --

 Key: SPARK-6388
 URL: https://issues.apache.org/jira/browse/SPARK-6388
 Project: Spark
  Issue Type: Bug
  Components: Block Manager, Spark Submit, YARN
Affects Versions: 1.3.0
 Environment: 1. Linux version 3.16.0-30-generic (buildd@komainu) (gcc 
 version 4.9.1 (Ubuntu 4.9.1-16ubuntu6) ) #40-Ubuntu SMP Mon Jan 12 22:06:37 
 UTC 2015
 2. Oracle Java 8 update 40  for Linux X64
 3. Scala 2.10.5
 4. Hadoop 2.6 (pre-build version)
Reporter: John
   Original Estimate: 24h
  Remaining Estimate: 24h

 I built Apache Spark 1.3 manually.
 ---
 JAVA_HOME=PATH_TO_JAVA8
 mvn clean package -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -DskipTests
 ---
 Something goes wrong, akka always tell me 
 ---
 15/03/17 21:28:10 WARN remote.ReliableDeliverySupervisor: Association with 
 remote system [akka.tcp://sparkYarnAM@Server2:42161] has failed, address is 
 now gated for [5000] ms. Reason is: [Disassociated].
 ---
 I build another version of Spark 1.3 + Hadoop 2.6 under Java 7.
 Everything goes well.
 Logs
 ---
 15/03/17 21:27:06 INFO spark.SparkContext: Running Spark version 1.3.0
 15/03/17 21:27:07 WARN util.NativeCodeLoader: Unable to load native-hadoop 
 library for your platform... using builtin-java classes where applicable
 15/03/17 21:27:08 INFO spark.SecurityManager: Changing view Servers to: hduser
 15/03/17 21:27:08 INFO spark.SecurityManager: Changing modify Servers to: 
 hduser
 15/03/17 21:27:08 INFO spark.SecurityManager: SecurityManager: authentication 
 disabled; ui Servers disabled; users with view permissions: Set(hduser); 
 users with modify permissions: Set(hduser)
 15/03/17 21:27:08 INFO slf4j.Slf4jLogger: Slf4jLogger started
 15/03/17 21:27:08 INFO Remoting: Starting remoting
 15/03/17 21:27:09 INFO Remoting: Remoting started; listening on addresses 
 :[akka.tcp://sparkDriver@Server3:37951]
 15/03/17 21:27:09 INFO util.Utils: Successfully started service 'sparkDriver' 
 on port 37951.
 15/03/17 21:27:09 INFO spark.SparkEnv: Registering MapOutputTracker
 15/03/17 21:27:09 INFO spark.SparkEnv: Registering BlockManagerMaster
 15/03/17 21:27:09 INFO storage.DiskBlockManager: Created local directory at 
 /tmp/spark-0db692bb-cd02-40c8-a8f0-3813c6da18e2/blockmgr-a1d0ad23-ab76-4177-80a0-a6f982a64d80
 15/03/17 21:27:09 INFO storage.MemoryStore: MemoryStore started with capacity 
 265.1 MB
 15/03/17 21:27:09 INFO spark.HttpFileServer: HTTP File server directory is 
 /tmp/spark-502ef3f8-b8cd-45cf-b1df-97df297cdb35/httpd-6303e24d-4b2b-4614-bb1d-74e8d331189b
 15/03/17 21:27:09 INFO spark.HttpServer: Starting HTTP Server
 15/03/17 21:27:09 INFO server.Server: jetty-8.y.z-SNAPSHOT
 15/03/17 21:27:10 INFO server.AbstractConnector: Started 
 SocketConnector@0.0.0.0:48000
 15/03/17 21:27:10 INFO util.Utils: Successfully started service 'HTTP file 
 server' on port 48000.
 15/03/17 21:27:10 INFO spark.SparkEnv: Registering OutputCommitCoordinator
 15/03/17 21:27:10 INFO server.Server: jetty-8.y.z-SNAPSHOT
 15/03/17 21:27:10 INFO server.AbstractConnector: Started 
 SelectChannelConnector@0.0.0.0:4040
 15/03/17 21:27:10 INFO util.Utils: Successfully started service 'SparkUI' on 
 port 4040.
 15/03/17 21:27:10 INFO ui.SparkUI: Started SparkUI at http://Server3:4040
 15/03/17 21:27:10 INFO spark.SparkContext: Added JAR 
 file:/home/hduser/spark-java2.jar at 
 http://192.168.11.42:48000/jars/spark-java2.jar with timestamp 1426598830307
 15/03/17 21:27:10 INFO client.RMProxy: Connecting to ResourceManager at 
 Server3/192.168.11.42:8050
 15/03/17 21:27:11 INFO yarn.Client: Requesting a new application from cluster 
 with 3 NodeManagers
 15/03/17 21:27:11 INFO yarn.Client: Verifying our application has not 
 requested more than the maximum memory capability of the cluster (8192 MB per 
 container)
 15/03/17 21:27:11 INFO yarn.Client: Will allocate AM container, with 896 MB 
 memory including 384 MB overhead
 15/03/17 21:27:11 INFO yarn.Client: Setting up container launch context for 
 our AM
 15/03/17 21:27:11 INFO yarn.Client: Preparing resources for our AM container
 15/03/17 21:27:12 INFO yarn.Client: Uploading resource 
 file:/home/hduser/spark-1.3.0/assembly/target/scala-2.10/spark-assembly-1.3.0-hadoop2.6.0.jar
  - 
 hdfs://Server3:9000/user/hduser/.sparkStaging/application_1426595477608_0002/spark-assembly-1.3.0-hadoop2.6.0.jar
 15/03/17 21:27:21 INFO yarn.Client: Setting up the launch environment for our

textFile() ordering and header rows

2015-02-22 Thread Michael Malak
Since RDDs are generally unordered, isn't it the case that textFile().first() is not 
guaranteed to return the first row of the file (such as when looking for a header row)? 
If so, doesn't that make the example in 
http://spark.apache.org/docs/1.2.1/quick-start.html#basics misleading?

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



[jira] [Commented] (SPARK-4279) Implementing TinkerPop on top of GraphX

2015-02-06 Thread Michael Malak (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14309459#comment-14309459
 ] 

Michael Malak commented on SPARK-4279:
--

Is there another place where I might be able to track progress on this -- 
either another Jira or on a mailing list?

 Implementing TinkerPop on top of GraphX
 ---

 Key: SPARK-4279
 URL: https://issues.apache.org/jira/browse/SPARK-4279
 Project: Spark
  Issue Type: New Feature
  Components: GraphX
Reporter: Brennon York
Priority: Minor

 [TinkerPop|https://github.com/tinkerpop] is a great abstraction for graph 
 databases and has been implemented across various graph database backends. 
 Has anyone thought about integrating the TinkerPop framework with GraphX to 
 enable GraphX as another backend? Not sure if this has been brought up or 
 not, but would certainly volunteer to spearhead this effort if the community 
 thinks it to be a good idea.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Word2Vec IndexedRDD

2015-02-01 Thread Michael Malak
1. Is IndexedRDD planned for 1.3? 
https://issues.apache.org/jira/browse/SPARK-2365

2. Once IndexedRDD is in, is it planned to convert Word2VecModel to it from its 
current Map[String,Array[Float]]? 
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala#L425

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: spark challenge: zip with next???

2015-01-30 Thread Michael Malak
But isn't foldLeft() overkill for the originally stated use case of max diff of 
adjacent pairs? Isn't foldLeft() for recursive non-commutative non-associative 
accumulation as opposed to an embarrassingly parallel operation such as this 
one?
This use case reminds me of FIR filtering in DSP. It seems that RDDs could use 
something that serves the same purpose as scala.collection.Iterator.sliding.
  From: Koert Kuipers ko...@tresata.com
 To: Mohit Jaggi mohitja...@gmail.com 
Cc: Tobias Pfeiffer t...@preferred.jp; Ganelin, Ilya 
ilya.gane...@capitalone.com; derrickburns derrickrbu...@gmail.com; 
user@spark.apache.org user@spark.apache.org 
 Sent: Friday, January 30, 2015 7:11 AM
 Subject: Re: spark challenge: zip with next???
   
assuming the data can be partitioned then you have many timeseries for which 
you want to detect potential gaps. also assuming the resulting gaps info per 
timeseries is much smaller data then the timeseries data itself, then this is a 
classical example to me of a sorted (streaming) foldLeft, requiring an 
efficient secondary sort in the spark shuffle. i am trying to get that into 
spark here:
https://issues.apache.org/jira/browse/SPARK-3655



On Fri, Jan 30, 2015 at 12:27 AM, Mohit Jaggi mohitja...@gmail.com wrote:

http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3ccalrvtpkn65rolzbetc+ddk4o+yjm+tfaf5dz8eucpl-2yhy...@mail.gmail.com%3E
you can use the MLLib function or do the following (which is what I had done):
- In the first pass over the data, using mapPartitionsWithIndex, gather the first 
item in each partition. You can use collect (or an aggregator) for this. "Key" 
them by the partition index. At the end, you will have a map of (partition 
index) -> first item.
- In the second pass over the data, using mapPartitionsWithIndex again, look at 
two items at a time (or N items in the general case; you can use Scala's sliding 
iterator) and check the time difference (or any sliding-window computation). Pass 
the map created in the previous step to this mapPartitions; you will need it to 
check the last item in each partition.
If you can tolerate a few inaccuracies then you can just do the second step. 
You will miss the "boundaries" of the partitions but it might be acceptable for 
your use case.
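
A minimal sketch of that two-pass approach (not from the original post; the timestamps and partition count are made up, an existing SparkContext sc is assumed, and empty partitions are ignored for brevity):

val times = sc.parallelize(Seq(1L, 3L, 10L, 11L, 25L, 30L), 3)

// Pass 1: first element of each partition, keyed by partition index.
val firstPerPartition: Map[Int, Long] = times
  .mapPartitionsWithIndex((idx, it) => if (it.hasNext) Iterator((idx, it.next())) else Iterator.empty)
  .collect()
  .toMap

// Pass 2: within each partition, diff adjacent elements; the last element of a
// partition is compared against the first element of the next partition.
val bFirst = sc.broadcast(firstPerPartition)
val gaps = times.mapPartitionsWithIndex { (idx, it) =>
  val headOfNext = bFirst.value.get(idx + 1).iterator
  (it ++ headOfNext).sliding(2).collect { case Seq(a, b) => b - a }
}
gaps.collect().foreach(println)   // differences between adjacent timestamps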



On Jan 29, 2015, at 4:36 PM, Tobias Pfeiffer t...@preferred.jp wrote:
Hi,

On Fri, Jan 30, 2015 at 6:32 AM, Ganelin, Ilya ilya.gane...@capitalone.com 
wrote:

Make a copy of your RDD with an extra entry in the beginning to offset. The you 
can zip the two RDDs and run a map to generate an RDD of differences.


Does that work? I recently tried something to compute differences between each 
entry and the next, so I did
  val rdd1 = ... // null element + rdd
  val rdd2 = ... // rdd + null element
but got an error message about zip requiring data sizes in each partition to match.
Tobias






  

Re: renaming SchemaRDD - DataFrame

2015-01-27 Thread Michael Malak
I personally have no preference DataFrame vs. DataTable, but only wish to lay 
out the history and etymology simply because I'm into that sort of thing.

Frame comes from Marvin Minsky's 1970's AI construct: slots and the data 
that go in them. The S programming language (precursor to R) adopted this 
terminology in 1991. R of course became popular with the rise of Data Science 
around 2012.
http://www.google.com/trends/explore#q=%22data%20science%22%2C%20%22r%20programming%22cmpt=qtz=

DataFrame would carry the implication that it comes along with its own 
metadata, whereas DataTable might carry the implication that metadata is 
stored in a central metadata repository.

DataFrame is thus technically more correct for SchemaRDD, but is a less 
familiar (and thus less accessible) term for those not immersed in data science 
or AI and thus may have narrower appeal.


- Original Message -
From: Evan R. Sparks evan.spa...@gmail.com
To: Matei Zaharia matei.zaha...@gmail.com
Cc: Koert Kuipers ko...@tresata.com; Michael Malak michaelma...@yahoo.com; 
Patrick Wendell pwend...@gmail.com; Reynold Xin r...@databricks.com; 
dev@spark.apache.org dev@spark.apache.org
Sent: Tuesday, January 27, 2015 9:55 AM
Subject: Re: renaming SchemaRDD - DataFrame

I'm +1 on this, although a little worried about unknowingly introducing
SparkSQL dependencies every time someone wants to use this. It would be
great if the interface can be abstract and the implementation (in this
case, SparkSQL backend) could be swapped out.

One alternative suggestion on the name - why not call it DataTable?
DataFrame seems like a name carried over from pandas (and by extension, R),
and it's never been obvious to me what a Frame is.



On Mon, Jan 26, 2015 at 5:32 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 (Actually when we designed Spark SQL we thought of giving it another name,
 like Spark Schema, but we decided to stick with SQL since that was the most
 obvious use case to many users.)

 Matei

  On Jan 26, 2015, at 5:31 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:
 
  While it might be possible to move this concept to Spark Core long-term,
 supporting structured data efficiently does require quite a bit of the
 infrastructure in Spark SQL, such as query planning and columnar storage.
 The intent of Spark SQL though is to be more than a SQL server -- it's
 meant to be a library for manipulating structured data. Since this is
 possible to build over the core API, it's pretty natural to organize it
 that way, same as Spark Streaming is a library.
 
  Matei
 
  On Jan 26, 2015, at 4:26 PM, Koert Kuipers ko...@tresata.com wrote:
 
  The context is that SchemaRDD is becoming a common data format used for
  bringing data into Spark from external systems, and used for various
  components of Spark, e.g. MLlib's new pipeline API.
 
  i agree. this to me also implies it belongs in spark core, not sql
 
  On Mon, Jan 26, 2015 at 6:11 PM, Michael Malak 
  michaelma...@yahoo.com.invalid wrote:
 
  And in the off chance that anyone hasn't seen it yet, the Jan. 13 Bay
 Area
  Spark Meetup YouTube contained a wealth of background information on
 this
  idea (mostly from Patrick and Reynold :-).
 
  https://www.youtube.com/watch?v=YWppYPWznSQ
 
  
  From: Patrick Wendell pwend...@gmail.com
  To: Reynold Xin r...@databricks.com
  Cc: dev@spark.apache.org dev@spark.apache.org
  Sent: Monday, January 26, 2015 4:01 PM
  Subject: Re: renaming SchemaRDD - DataFrame
 
 
  One thing potentially not clear from this e-mail, there will be a 1:1
  correspondence where you can get an RDD to/from a DataFrame.
 
 
  On Mon, Jan 26, 2015 at 2:18 PM, Reynold Xin r...@databricks.com
 wrote:
  Hi,
 
  We are considering renaming SchemaRDD - DataFrame in 1.3, and wanted
 to
  get the community's opinion.
 
  The context is that SchemaRDD is becoming a common data format used
 for
  bringing data into Spark from external systems, and used for various
  components of Spark, e.g. MLlib's new pipeline API. We also expect
 more
  and
  more users to be programming directly against SchemaRDD API rather
 than
  the
  core RDD API. SchemaRDD, through its less commonly used DSL originally
  designed for writing test cases, always has the data-frame like API.
 In
  1.3, we are redesigning the API to make the API usable for end users.
 
 
  There are two motivations for the renaming:
 
  1. DataFrame seems to be a more self-evident name than SchemaRDD.
 
  2. SchemaRDD/DataFrame is actually not going to be an RDD anymore
 (even
  though it would contain some RDD functions like map, flatMap, etc),
 and
  calling it Schema*RDD* while it is not an RDD is highly confusing.
  Instead.
  DataFrame.rdd will return the underlying RDD for all RDD methods.
 
 
  My understanding is that very few users program directly against the
  SchemaRDD API at the moment, because they are not well documented.
  However,
  oo maintain backward compatibility, we can

[jira] [Created] (SPARK-5343) ShortestPaths traverses backwards

2015-01-20 Thread Michael Malak (JIRA)
Michael Malak created SPARK-5343:


 Summary: ShortestPaths traverses backwards
 Key: SPARK-5343
 URL: https://issues.apache.org/jira/browse/SPARK-5343
 Project: Spark
  Issue Type: Bug
  Components: GraphX
Affects Versions: 1.2.0
Reporter: Michael Malak


GraphX ShortestPaths seems to be following edges backwards instead of forwards:

import org.apache.spark.graphx._
val g = Graph(sc.makeRDD(Array((1L, ""), (2L, ""), (3L, ""))),
  sc.makeRDD(Array(Edge(1L, 2L, ""), Edge(2L, 3L, ""))))

lib.ShortestPaths.run(g, Array(3)).vertices.collect
res1: Array[(org.apache.spark.graphx.VertexId, 
org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map()), (3,Map(3 -> 0)), (2,Map()))

lib.ShortestPaths.run(g, Array(1)).vertices.collect

res2: Array[(org.apache.spark.graphx.VertexId, 
org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(1 -> 0)), (3,Map(1 -> 2)), (2,Map(1 -> 1)))

The following changes may be what will make it run forward:

Change one occurrence of src to dst in
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala#L64

Change three occurrences of dst to src in
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala#L65




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: GraphX ShortestPaths backwards?

2015-01-20 Thread Michael Malak
I created https://issues.apache.org/jira/browse/SPARK-5343 for this.


- Original Message -
From: Michael Malak michaelma...@yahoo.com
To: dev@spark.apache.org dev@spark.apache.org
Cc: 
Sent: Monday, January 19, 2015 5:09 PM
Subject: GraphX ShortestPaths backwards?

GraphX ShortestPaths seems to be following edges backwards instead of forwards:

import org.apache.spark.graphx._
val g = Graph(sc.makeRDD(Array((1L, ""), (2L, ""), (3L, ""))),
  sc.makeRDD(Array(Edge(1L, 2L, ""), Edge(2L, 3L, ""))))

lib.ShortestPaths.run(g, Array(3)).vertices.collect
res1: Array[(org.apache.spark.graphx.VertexId, 
org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map()), (3,Map(3 -> 0)), (2,Map()))

lib.ShortestPaths.run(g, Array(1)).vertices.collect

res2: Array[(org.apache.spark.graphx.VertexId, 
org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(1 -> 0)), (3,Map(1 -> 2)), (2,Map(1 -> 1)))

If I am not mistaken about my assessment, then I believe the following changes 
will make it run forward:

Change one occurrence of src to dst in
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala#L64

Change three occurrences of dst to src in
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala#L65
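
(Not part of the original message: until a fix lands, one possible workaround sketch, untested, is to reverse the graph first so that the backwards traversal ends up following the original edge directions.)

// g as defined above
lib.ShortestPaths.run(g.reverse, Array(3)).vertices.collect
// expected to give the forward interpretation: (1,Map(3 -> 2)), (2,Map(3 -> 1)), (3,Map(3 -> 0))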

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



GraphX ShortestPaths backwards?

2015-01-19 Thread Michael Malak
GraphX ShortestPaths seems to be following edges backwards instead of forwards:

import org.apache.spark.graphx._
val g = Graph(sc.makeRDD(Array((1L, ""), (2L, ""), (3L, ""))),
  sc.makeRDD(Array(Edge(1L, 2L, ""), Edge(2L, 3L, ""))))

lib.ShortestPaths.run(g, Array(3)).vertices.collect
res1: Array[(org.apache.spark.graphx.VertexId, 
org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map()), (3,Map(3 -> 0)), (2,Map()))

lib.ShortestPaths.run(g, Array(1)).vertices.collect

res2: Array[(org.apache.spark.graphx.VertexId, 
org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(1 -> 0)), (3,Map(1 -> 2)), (2,Map(1 -> 1)))

If I am not mistaken about my assessment, then I believe the following changes 
will make it run forward:

Change one occurrence of src to dst in
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala#L64

Change three occurrences of dst to src in
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala#L65

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: GraphX vertex partition/location strategy

2015-01-19 Thread Michael Malak
But wouldn't the gain be greater under something similar to EdgePartition1D 
(but perhaps better load-balanced based on number of edges for each vertex) and 
an algorithm that primarily follows edges in the forward direction?
  From: Ankur Dave ankurd...@gmail.com
 To: Michael Malak michaelma...@yahoo.com 
Cc: dev@spark.apache.org dev@spark.apache.org 
 Sent: Monday, January 19, 2015 2:08 PM
 Subject: Re: GraphX vertex partition/location strategy
   
No - the vertices are hash-partitioned onto workers independently of the edges. 
It would be nice for each vertex to be on the worker with the most adjacent 
edges, but we haven't done this yet since it would add a lot of complexity to 
avoid load imbalance while reducing the overall communication by a small factor.
We refer to the number of partitions containing adjacent edges for a particular 
vertex as the vertex's replication factor. I think the typical replication 
factor for power-law graphs with 100-200 partitions is 10-15, and placing the 
vertex at the ideal location would only reduce the replication factor by 1.

Ankur


On Mon, Jan 19, 2015 at 12:20 PM, Michael Malak 
michaelma...@yahoo.com.invalid wrote:

Does GraphX make an effort to co-locate vertices onto the same workers as the 
majority (or even some) of its edges?



   

GraphX vertex partition/location strategy

2015-01-19 Thread Michael Malak
Does GraphX make an effort to co-locate vertices onto the same workers as the 
majority (or even some) of its edges?

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



GraphX doc: triangleCount() requirement overstatement?

2015-01-18 Thread Michael Malak
According to:
https://spark.apache.org/docs/1.2.0/graphx-programming-guide.html#triangle-counting
 

Note that TriangleCount requires the edges to be in canonical orientation 
(srcId < dstId)

But isn't this overstating the requirement? Isn't the requirement really that 
IF there are duplicate edges between two vertices, THEN those edges must all be 
in the same direction (in order for the groupEdges() at the beginning of 
triangleCount() to produce the intermediate results that triangleCount() 
expects)?

If so, should I enter a JIRA ticket to clarify the documentation?

Or is it the case that https://issues.apache.org/jira/browse/SPARK-3650 will 
make it into Spark 1.3 anyway?
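
For anyone needing to satisfy the stated constraint today, here is a minimal sketch (not from the docs or the thread) that forces every edge into canonical orientation and drops duplicates before counting; the Int attribute types are placeholders:

import org.apache.spark.graphx._

def canonicalTriangleCount(g: Graph[Int, Int]): Graph[Int, Int] = {
  // Re-orient each edge so that srcId < dstId, then drop duplicate edges.
  val canonicalEdges = g.edges
    .map(e => if (e.srcId < e.dstId) (e.srcId, e.dstId) else (e.dstId, e.srcId))
    .distinct()
    .map { case (src, dst) => Edge(src, dst, 1) }
  Graph.fromEdges(canonicalEdges, defaultValue = 1).triangleCount()
}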

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: GraphX rmatGraph hangs

2015-01-04 Thread Michael Malak
Thank you. I created 
https://issues.apache.org/jira/browse/SPARK-5064


- Original Message -
From: xhudik xhu...@gmail.com
To: dev@spark.apache.org
Cc: 
Sent: Saturday, January 3, 2015 2:04 PM
Subject: Re: GraphX rmatGraph hangs

Hi Michael,

yes, I can confirm the behavior.
It get stuck (loop?) and eat all resources, command top gives:
14013 ll 20   0 2998136 489992  19804 S 100.2 12.10  13:29.39 java

You might create an issue/bug in jira
(https://issues.apache.org/jira/browse/SPARK)


best, tomas



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/GraphX-rmatGraph-hangs-tp9995p9996.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



GraphX rmatGraph hangs

2015-01-03 Thread Michael Malak
The following single line just hangs, when executed in either Spark Shell or 
standalone:

org.apache.spark.graphx.util.GraphGenerators.rmatGraph(sc, 4, 8)

It just outputs 0 edges and then locks up.
The only other information I've found via Google is:

http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3c1408617621830-12570.p...@n3.nabble.com%3E

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



[jira] [Created] (SPARK-5064) GraphX rmatGraph hangs

2015-01-03 Thread Michael Malak (JIRA)
Michael Malak created SPARK-5064:


 Summary: GraphX rmatGraph hangs
 Key: SPARK-5064
 URL: https://issues.apache.org/jira/browse/SPARK-5064
 Project: Spark
  Issue Type: Bug
  Components: GraphX
Affects Versions: 1.2.0
 Environment: CentOS 7 REPL (no HDFS). Also tried Cloudera 5.2.0 
QuickStart standalone compiled Scala with spark-submit.
Reporter: Michael Malak


org.apache.spark.graphx.util.GraphGenerators.rmatGraph(sc, 4, 8)

It just outputs 0 edges and then locks up.

A spark-user message reports similar behavior:
http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3c1408617621830-12570.p...@n3.nabble.com%3E




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: Rdd of Rdds

2014-10-22 Thread Michael Malak
On Wednesday, October 22, 2014 9:06 AM, Sean Owen so...@cloudera.com wrote:

 No, there's no such thing as an RDD of RDDs in Spark.
 Here though, why not just operate on an RDD of Lists? or a List of RDDs?
 Usually one of these two is the right approach whenever you feel
 inclined to operate on an RDD of RDDs.


Depending on one's needs, one could also consider the matrix (RDD[Vector]) 
operations provided by MLLib, such as
https://spark.apache.org/docs/latest/mllib-statistics.html
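
As a concrete illustration of that suggestion (the data is made up and an existing SparkContext sc is assumed), a RowMatrix over an RDD[Vector] gives per-column statistics without nesting RDDs at all:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Treat the "inner collections" as rows of a distributed matrix instead of inner RDDs.
val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(4.0, 5.0, 6.0)))
val mat = new RowMatrix(rows)
val colStats = mat.computeColumnSummaryStatistics()
println(colStats.mean)   // per-column means across all rows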

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: UpdateStateByKey - How to improve performance?

2014-08-06 Thread Michael Malak
Depending on the density of your keys, the alternative signature

def updateStateByKey[S](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => 
Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: 
Boolean)(implicit arg0: ClassTag[S]): DStream[(K, S)] 

at least iterates by key rather than by (old) value.

I believe your thinking is correct that there might be a performance 
improvement opportunity for your case if there were an updateStateByKey() that 
instead iterated by (new) value.

BTW, my impression from the stock examples is that the signature I pasted above 
was intended to be the more typically called updateStateByKey(), as opposed to 
the one you pasted, for which my impression is that it is the more general 
purpose one. I have used the more general purpose one but only when I needed to 
peek into the entire set of states for some unusual reason.
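
For illustration, a minimal sketch of using that iterator-based signature (the stream, the key/state types, and the partition count are made up for the example, not from the original post):

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.StreamingContext._   // pair-DStream implicits on older versions
import org.apache.spark.streaming.dstream.DStream

// counts: DStream[(String, Int)] of new increments per key in each batch
def runningTotals(counts: DStream[(String, Int)]): DStream[(String, Long)] = {
  // Called with one iterator of (key, new values, old state) per partition, so keys
  // with no new data can be passed through cheaply and partitioning can be preserved.
  val update = (it: Iterator[(String, Seq[Int], Option[Long])]) =>
    it.flatMap { case (key, newValues, state) =>
      if (newValues.isEmpty) state.map(s => (key, s))          // untouched key: keep state as-is
      else Some((key, state.getOrElse(0L) + newValues.sum))    // updated key: fold in new values
    }
  counts.updateStateByKey(update, new HashPartitioner(4), true)   // true = rememberPartitioner
}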



On Wednesday, August 6, 2014 2:30 PM, Venkat Subramanian vsubr...@gmail.com 
wrote:
 


The method

def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]

takes Dstream (K,V) and Produces DStream (K,S)  in Spark Streaming

We have a input Dstream(K,V) that has 40,000 elements. We update on average
of 1000  elements of them in every 3 second batch, but based on how this
updateStateByKey function is defined, we are looping through 40,000 elements
(Seq[V]) to make an update for just 1000 elements and not updating 39000
elements. I think looping through extra 39000 elements is a waste of
performance.

Isn't there a better way to update this efficiently by just figuring out the
a hash map for the 1000 elements that are required to be updated and just
updating it (without looping through the unwanted elements)?  Shouldn't
there be a Streaming update function provided that updates selective members
or are we missing some concepts here?

I think updateStateByKey may be causing lot of performance degradation in
our app as we keep doing this again and again for every batch. Please let us
know if my thought process is correct here.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/UpdateStateByKey-How-to-improve-performance-tp11575.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: relationship of RDD[Array[String]] to Array[Array[String]]

2014-07-21 Thread Michael Malak
It's really more of a Scala question than a Spark question, but the standard OO 
(not Scala-specific) way is to create your own custom supertype (e.g. 
MyCollectionTrait), inherited/implemented by two concrete classes (e.g. MyRDD 
and MyArray), each of which manually forwards method calls to the corresponding 
pre-existing library implementations. Writing all those forwarding method calls 
is tedious, but Scala provides at least one bit of syntactic sugar, which 
alleviates having to type in twice the parameter lists for each method:
http://stackoverflow.com/questions/8230831/is-method-parameter-forwarding-possible-in-scala

I'm not seeing a way to utilize implicit conversions in this case. Since Scala 
is statically (albeit inferred) typed, I don't see a way around having a common 
supertype.
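
A minimal sketch of that forwarding approach (all of these names are made up for illustration, not an existing API):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Hand-rolled common supertype; each concrete class forwards to the real implementation.
trait MyCollection[T] {
  def map[U: ClassTag](f: T => U): MyCollection[U]
}

class MyRDD[T](val rdd: RDD[T]) extends MyCollection[T] {
  def map[U: ClassTag](f: T => U) = new MyRDD(rdd.map(f))      // forward to RDD.map
}

class MyArray[T](val array: Array[T]) extends MyCollection[T] {
  def map[U: ClassTag](f: T => U) = new MyArray(array.map(f))  // forward to Array.map
}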



On Monday, July 21, 2014 11:01 AM, Philip Ogren philip.og...@oracle.com wrote:
It is really nice that Spark RDD's provide functions  that are often 
equivalent to functions found in Scala collections.  For example, I can 
call:

myArray.map(myFx)

and equivalently

myRdd.map(myFx)

Awesome!

My question is this.  Is it possible to write code that works on either 
an RDD or a local collection without having to have parallel 
implementations?  I can't tell that RDD or Array share any supertypes or 
traits by looking at the respective scaladocs. Perhaps implicit 
conversions could be used here.  What I would like to do is have a 
single function whose body is like this:

myData.map(myFx)

where myData could be an RDD[Array[String]] (for example) or an 
Array[Array[String]].

Has anyone had success doing this?

Thanks,
Philip


15 new MLlib algorithms

2014-07-09 Thread Michael Malak
At Spark Summit, Patrick Wendell indicated the number of MLlib algorithms would 
roughly double in 1.1 from the current approx. 15.
http://spark-summit.org/wp-content/uploads/2014/07/Future-of-Spark-Patrick-Wendell.pdf

What are the planned additional algorithms?

In Jira, I only see two when filtering on version 1.1, component MLlib: one on 
multi-label and another on high dimensionality.

https://issues.apache.org/jira/browse/SPARK-2329?jql=issuetype%20in%20(Brainstorming%2C%20Epic%2C%20%22New%20Feature%22%2C%20Story)%20AND%20fixVersion%20%3D%201.1.0%20AND%20component%20%3D%20MLlib

http://tinyurl.com/ku7sehu


Re: parallel Reduce within a key

2014-06-20 Thread Michael Malak
How about a treeReduceByKey? :-)


On Friday, June 20, 2014 11:55 AM, DB Tsai dbt...@stanford.edu wrote:
 


Currently, the reduce operation combines the result from mapper
sequentially, so it's O(n).

Xiangrui is working on treeReduce which is O(log(n)). Based on the
benchmark, it dramatically increases the performance. You can test the
code in his own branch.
https://github.com/apache/spark/pull/1110
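
(For reference, in later Spark releases this is exposed as RDD.treeReduce; a usage sketch, assuming a version where it is available:)

val data = sc.parallelize(1 to 1000000, 64)
// Combine partition results in a tree of the given depth instead of serially on the driver.
val sum = data.treeReduce(_ + _, depth = 2)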

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai



On Fri, Jun 20, 2014 at 6:57 AM, ansriniv ansri...@gmail.com wrote:
 Hi,

 I am on Spark 0.9.0

 I have a 2 node cluster (2 worker nodes) with 16 cores on each node (so, 32
 cores in the cluster).
 I have an input rdd with 64 partitions.

 I am running  sc.mapPartitions(...).reduce(...)

 I can see that I get full parallelism on the mapper (all my 32 cores are
 busy simultaneously). However, when it comes to reduce(), the outputs of the
 mappers are all reduced SERIALLY. Further, all the reduce processing happens
 only on 1 of the workers.

 I was expecting that the outputs of the 16 mappers on node 1 would be
 reduced in parallel in node 1 while the outputs of the 16 mappers on node 2
 would be reduced in parallel on node 2 and there would be 1 final inter-node
 reduce (of node 1 reduced result and node 2 reduced result).

 Isn't parallel reduce supported WITHIN a key (in this case I have no key) ?
 (I know that there is parallelism in reduce across keys)

 Best Regards
 Anand



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/parallel-Reduce-within-a-key-tp7998.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

GraphX triplets on 5-node graph

2014-05-29 Thread Michael Malak
Shouldn't I be seeing N2 and N4 in the output below? (Spark 0.9.0 REPL) Or am I 
missing something fundamental?


val nodes = sc.parallelize(Array((1L, "N1"), (2L, "N2"), (3L, "N3"), (4L, "N4"), (5L, "N5"))) 
val edges = sc.parallelize(Array(Edge(1L, 2L, "E1"), Edge(1L, 3L, "E2"), Edge(2L, 4L, "E3"), Edge(3L, 5L, "E4"))) 
Graph(nodes, edges).triplets.collect 
res1: Array[org.apache.spark.graphx.EdgeTriplet[String,String]] = 
Array(((1,N1),(3,N3),E2), ((1,N1),(3,N3),E2), ((3,N3),(5,N5),E4), 
((3,N3),(5,N5),E4))


[jira] [Commented] (SPARK-1199) Type mismatch in Spark shell when using case class defined in shell

2014-05-28 Thread Michael Malak (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14011492#comment-14011492
 ] 

Michael Malak commented on SPARK-1199:
--

See also additional test cases in 
https://issues.apache.org/jira/browse/SPARK-1836 which has now been marked as a 
duplicate.

 Type mismatch in Spark shell when using case class defined in shell
 ---

 Key: SPARK-1199
 URL: https://issues.apache.org/jira/browse/SPARK-1199
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 0.9.0
Reporter: Andrew Kerr
Priority: Critical
 Fix For: 1.1.0


 Define a class in the shell:
 {code}
 case class TestClass(a:String)
 {code}
 and an RDD
 {code}
 val data = sc.parallelize(Seq("a")).map(TestClass(_))
 {code}
 define a function on it and map over the RDD
 {code}
 def itemFunc(a:TestClass):TestClass = a
 data.map(itemFunc)
 {code}
 Error:
 {code}
 <console>:19: error: type mismatch;
  found   : TestClass => TestClass
  required: TestClass => ?
   data.map(itemFunc)
 {code}
 Similarly with a mapPartitions:
 {code}
 def partitionFunc(a:Iterator[TestClass]):Iterator[TestClass] = a
 data.mapPartitions(partitionFunc)
 {code}
 {code}
 <console>:19: error: type mismatch;
  found   : Iterator[TestClass] => Iterator[TestClass]
  required: Iterator[TestClass] => Iterator[?]
 Error occurred in an application involving default arguments.
   data.mapPartitions(partitionFunc)
 {code}
 The behavior is the same whether in local mode or on a cluster.
 This isn't specific to RDDs. A Scala collection in the Spark shell has the 
 same problem.
 {code}
 scala> Seq(TestClass("foo")).map(itemFunc)
 <console>:15: error: type mismatch;
  found   : TestClass => TestClass
  required: TestClass => ?
   Seq(TestClass("foo")).map(itemFunc)
 ^
 {code}
 When run in the Scala console (not the Spark shell) there are no type 
 mismatch errors.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Resolved] (SPARK-1836) REPL $outer type mismatch causes lookup() and equals() problems

2014-05-28 Thread Michael Malak (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-1836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Malak resolved SPARK-1836.
--

Resolution: Duplicate

 REPL $outer type mismatch causes lookup() and equals() problems
 ---

 Key: SPARK-1836
 URL: https://issues.apache.org/jira/browse/SPARK-1836
 Project: Spark
  Issue Type: Bug
Affects Versions: 0.9.0
Reporter: Michael Malak

 Anand Avati partially traced the cause to REPL wrapping classes in $outer 
 classes. There are at least two major symptoms:
 1. equals()
 =
 In REPL equals() (required in custom classes used as a key for groupByKey) 
 seems to have to be written using instanceOf[] instead of the canonical 
 match{}
 Spark Shell (equals uses match{}):
 {noformat}
 class C(val s:String) extends Serializable {
   override def equals(o: Any) = o match {
  case that: C => that.s == s
  case _ => false
   }
 }
  val x = new C("a")
 val bos = new java.io.ByteArrayOutputStream()
 val out = new java.io.ObjectOutputStream(bos)
 out.writeObject(x);
 val b = bos.toByteArray();
 out.close
 bos.close
  val y = new java.io.ObjectInputStream(new 
  java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
 x.equals(y)
 res: Boolean = false
 {noformat}
 Spark Shell (equals uses isInstanceOf[]):
 {noformat}
 class C(val s:String) extends Serializable {
    override def equals(o: Any) = if (o.isInstanceOf[C]) (o.asInstanceOf[C].s == 
  s) else false
 }
  val x = new C("a")
 val bos = new java.io.ByteArrayOutputStream()
 val out = new java.io.ObjectOutputStream(bos)
 out.writeObject(x);
 val b = bos.toByteArray();
 out.close
 bos.close
  val y = new java.io.ObjectInputStream(new 
  java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
 x.equals(y)
 res: Boolean = true
 {noformat}
 Scala Shell (equals uses match{}):
 {noformat}
 class C(val s:String) extends Serializable {
   override def equals(o: Any) = o match {
  case that: C => that.s == s
  case _ => false
   }
 }
  val x = new C("a")
 val bos = new java.io.ByteArrayOutputStream()
 val out = new java.io.ObjectOutputStream(bos)
 out.writeObject(x);
 val b = bos.toByteArray();
 out.close
 bos.close
 val y = new java.io.ObjectInputStream(new 
 java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
 x.equals(y)
 res: Boolean = true
 {noformat}
 2. lookup()
 =
 {noformat}
 class C(val s:String) extends Serializable {
   override def equals(o: Any) = if (o.isInstanceOf[C]) o.asInstanceOf[C].s == 
 s else false
   override def hashCode = s.hashCode
   override def toString = s
 }
  val r = sc.parallelize(Array((new C("a"),11),(new C("a"),12)))
  r.lookup(new C("a"))
  <console>:17: error: type mismatch;
   found   : C
   required: C
    r.lookup(new C("a"))
^
 {noformat}
 See
 http://mail-archives.apache.org/mod_mbox/spark-dev/201405.mbox/%3C1400019424.80629.YahooMailNeo%40web160801.mail.bf1.yahoo.com%3E



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: rdd ordering gets scrambled

2014-05-28 Thread Michael Malak
Mohit Jaggi:

A workaround is to use zipWithIndex (to appear in Spark 1.0, but if you're 
still on 0.9x you can swipe the code from 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
 ), map it to (x => (x._2,x._1)) and then sortByKey.
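
A minimal sketch of that workaround (assuming Spark 1.0+ for zipWithIndex, an existing SparkContext sc, and a hypothetical input path):

import org.apache.spark.SparkContext._   // pair-RDD implicits on older versions

val lines = sc.textFile("data.csv")          // hypothetical input path
val ordered = lines
  .zipWithIndex()                            // (line, original position)
  .map { case (line, idx) => (idx, line) }   // key by position
  .sortByKey()                               // restore original file order
  .values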

Spark developers:

The lack of ordering guarantee for RDDs should be better documented, and the 
presence of a method called first() is a bit deceiving, in my opinion, if that 
same first element doesn't survive a map().
 


On Tuesday, April 29, 2014 3:45 PM, Mohit Jaggi mohitja...@gmail.com wrote:
 


Hi,
I started with a text file(CSV) of sorted data (by first column), parsed it 
into Scala objects using map operation in Scala. Then I used more maps to add 
some extra info to the data and saved it as text file.
The final text file is not sorted. What do I need to do to keep the order from 
the original input intact?

My code looks like:

csvFile = sc.textFile(..) //file is CSV and ordered by first column
splitRdd = csvFile map { line => line.split(",", -1) }
parsedRdd = splitRdd map { parts => 
  { 
    key = parts(0) //use first column as key
    value = new MyObject(parts(0), parts(1)) //parse into scala objects
    (key, value)
  }
}

augmentedRdd = parsedRdd map { x =>
   key =  x._1
   value = //add extra fields to x._2
   (key, value)
}
augmentedRdd.saveAsFile(...) //this file is not sorted

Mohit.

[jira] [Commented] (SPARK-1867) Spark Documentation Error causes java.lang.IllegalStateException: unread block data

2014-05-23 Thread Michael Malak (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14007565#comment-14007565
 ] 

Michael Malak commented on SPARK-1867:
--

Thank you, sam, that fixed it for me!

FYI, I had:

{noformat}
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.3.0-cdh5.0.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <version>2.3.0-cdh5.0.1</version>
</dependency>
{noformat}

By removing the second one, textfile().count now works.

 Spark Documentation Error causes java.lang.IllegalStateException: unread 
 block data
 ---

 Key: SPARK-1867
 URL: https://issues.apache.org/jira/browse/SPARK-1867
 Project: Spark
  Issue Type: Bug
Reporter: sam

 I've employed two System Administrators on a contract basis (for quite a bit 
 of money), and both contractors have independently hit the following 
 exception.  What we are doing is:
 1. Installing Spark 0.9.1 according to the documentation on the website, 
 along with CDH4 (and another cluster with CDH5) distros of hadoop/hdfs.
 2. Building a fat jar with a Spark app with sbt then trying to run it on the 
 cluster
 I've also included code snippets, and sbt deps at the bottom.
 When I've Googled this, there seems to be two somewhat vague responses:
 a) Mismatching spark versions on nodes/user code
 b) Need to add more jars to the SparkConf
 Now I know that (b) is not the problem having successfully run the same code 
 on other clusters while only including one jar (it's a fat jar).
 But I have no idea how to check for (a) - it appears Spark doesn't have any 
 version checks or anything - it would be nice if it checked versions and 
 threw a mismatching version exception: you have user code using version X 
 and node Y has version Z.
 I would be very grateful for advice on this.
 The exception:
 Exception in thread main org.apache.spark.SparkException: Job aborted: Task 
 0.0:1 failed 32 times (most recent failure: Exception failure: 
 java.lang.IllegalStateException: unread block data)
   at 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
   at 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at 
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
   at 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
   at 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
   at scala.Option.foreach(Option.scala:236)
   at 
 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
   at 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
   at 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at 
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 14/05/16 18:05:31 INFO scheduler.TaskSetManager: Loss was due to 
 java.lang.IllegalStateException: unread block data [duplicate 59]
 My code snippet:
 val conf = new SparkConf()
.setMaster(clusterMaster)
.setAppName(appName)
.setSparkHome(sparkHome)
.setJars(SparkContext.jarOfClass(this.getClass))
  println("count = " + new SparkContext(conf).textFile(someHdfsPath).count())
 My SBT dependencies:
 // relevant
  "org.apache.spark" % "spark-core_2.10" % "0.9.1",
  "org.apache.hadoop" % "hadoop-client" % "2.3.0-mr1-cdh5.0.0",
  // standard, probably unrelated
  "com.github.seratch" %% "awscala" % "[0.2,)",
  "org.scalacheck" %% "scalacheck" % "1.10.1" % "test",
  "org.specs2" %% "specs2" % "1.14" % "test",
  "org.scala-lang" % "scala-reflect" % "2.10.3",
  "org.scalaz" %% "scalaz-core" % "7.0.5",
  "net.minidev" % "json-smart" % "1.2"



--
This message was sent by Atlassian JIRA
(v6.2

Re: [VOTE] Release Apache Spark 1.0.0 (rc5)

2014-05-17 Thread Michael Malak
While developers may appreciate 1.0 == API stability, I'm not sure that will 
be the understanding of the VP who gives the green light to a Spark-based 
development effort.

I fear a bug that silently produces erroneous results will be perceived like 
the FDIV bug, but in this case without the momentum of an existing large 
installed base and with a number of competitors (GridGain, H2O, 
Stratosphere). Despite the stated intention of API stability, the perception 
(which becomes the reality) of 1.0 is that it's ready for production use -- 
not bullet-proof, but also not with known silent generation of erroneous 
results. Exceptions and crashes are much more tolerated than silent corruption 
of data. The result may be a reputation of the Spark team unconcerned about 
data integrity.

I ran into (and submitted) https://issues.apache.org/jira/browse/SPARK-1817 due 
to the lack of zipWithIndex(). zip() with a self-created partitioned range was 
the way I was trying to number with IDs a collection of nodes in preparation 
for the GraphX constructor. For the record, it was a frequent Spark committer 
who escalated it to blocker; I did not submit it as such. Partitioning a 
Scala range isn't just a toy example; it has a real-life use.

I also wonder about the REPL. Cloudera, for example, touts it as key to making 
Spark a crossover tool that Data Scientists can also use. The REPL can be 
considered an API of sorts -- not a traditional Scala or Java API, of course, 
but the API that a human data analyst would use. With the Scala REPL 
exhibiting some of the same bad behaviors as the Spark REPL, there is a 
question of whether the Spark REPL can even be fixed. If the Spark REPL has to 
be eliminated after 1.0 due to an inability to repair it, that would constitute 
API instability.


 
On Saturday, May 17, 2014 2:49 PM, Matei Zaharia matei.zaha...@gmail.com 
wrote:
 
As others have said, the 1.0 milestone is about API stability, not about saying 
“we’ve eliminated all bugs”. The sooner you declare 1.0, the sooner users can 
confidently build on Spark, knowing that the application they build today will 
still run on Spark 1.9.9 three years from now. This is something that I’ve seen 
done badly (and experienced the effects thereof) in other big data projects, 
such as MapReduce and even YARN. The result is that you annoy users, you end up 
with a fragmented userbase where everyone is building against a different 
version, and you drastically slow down development.

With a project as fast-growing as Spark in particular, there 
will be new bugs discovered and reported continuously, especially in the 
non-core components. Look at the graph of # of contributors in time to Spark: 
https://www.ohloh.net/p/apache-spark (bottom-most graph; “commits” changed when 
we started merging each patch as a single commit). This is not slowing down, 
and we need to have the culture now that we treat API stability and release 
numbers at the level expected for a 1.0 project instead of having people come 
in and randomly change the API.

I’ll also note that the issues marked “blocker” were marked so by their 
reporters, since the reporter can set the priority. I don’t consider stuff like 
parallelize() not partitioning ranges in the same way as other collections a 
blocker — it’s a bug, it would be good to fix it, but it only affects a small 
number of use cases. Of course if we find a real blocker (in particular a 
regression from a previous version, or a feature that’s just completely 
broken), we will delay the release for that, but at some point you have to say 
“okay, this fix will go into the next maintenance release”. Maybe we need to 
write a clear policy for what the issue priorities mean.

Finally, I believe it’s much better to have a culture where you can make 
releases on a regular schedule, and have the option to make a maintenance 
release in 3-4 days if you find new bugs, than one where you pile up stuff into 
each release. This is what much large project than us, like Linux, do, and it’s 
the only way to avoid indefinite stalling with a large contributor base. In the 
worst case, if you find a new bug that warrants immediate release, it goes into 
1.0.1 a week after 1.0.0 (we can vote on 1.0.1 in three days with just your bug 
fix in it). And if you find an API that you’d like to improve, just add a new 
one and maybe deprecate the old one — at some point we have to respect our 
users and let them know that code they write today will still run tomorrow.

Matei


On May 17, 2014, at 10:32 AM, Kan Zhang kzh...@apache.org wrote:

 +1 on the running commentary here, non-binding of course :-)
 
 
 On Sat, May 17, 2014 at 8:44 AM, Andrew Ash and...@andrewash.com wrote:
 
 +1 on the next release feeling more like a 0.10 than a 1.0
 On May 17, 2014 4:38 AM, Mridul Muralidharan mri...@gmail.com wrote:
 
 I had echoed similar sentiments a while back when there was a discussion
 around 0.10 vs 1.0 ... I would have 

[jira] [Updated] (SPARK-1836) REPL $outer type mismatch causes lookup() and equals() problems

2014-05-16 Thread Michael Malak (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-1836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Malak updated SPARK-1836:
-

Description: 
Anand Avati partially traced the cause to REPL wrapping classes in $outer 
classes. There are at least two major symptoms:

1. equals()
=

In REPL equals() (required in custom classes used as a key for groupByKey) 
seems to have to be written using instanceOf[] instead of the canonical match{}

Spark Shell (equals uses match{}):

{noformat}
class C(val s:String) extends Serializable {
  override def equals(o: Any) = o match {
case that: C => that.s == s
case _ => false
  }
}

val x = new C("a")
val bos = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(bos)
out.writeObject(x);
val b = bos.toByteArray();
out.close
bos.close
val y = new java.io.ObjectInputStream(new 
java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
x.equals(y)

res: Boolean = false
{noformat}

Spark Shell (equals uses isInstanceOf[]):

{noformat}
class C(val s:String) extends Serializable {
  override def equals(o: Any) = if (o.isInstanceOf[C]) (o.asInstanceOf[C].s == 
s) else false
}

val x = new C("a")
val bos = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(bos)
out.writeObject(x);
val b = bos.toByteArray();
out.close
bos.close
val y = new java.io.ObjectInputStream(new 
java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
x.equals(y)

res: Boolean = true
{noformat}

Scala Shell (equals uses match{}):

{noformat}
class C(val s:String) extends Serializable {
  override def equals(o: Any) = o match {
case that: C => that.s == s
case _ => false
  }
}

val x = new C("a")
val bos = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(bos)
out.writeObject(x);
val b = bos.toByteArray();
out.close
bos.close
val y = new java.io.ObjectInputStream(new 
java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
x.equals(y)

res: Boolean = true
{noformat}

2. lookup()
=

{noformat}
class C(val s:String) extends Serializable {
  override def equals(o: Any) = if (o.isInstanceOf[C]) o.asInstanceOf[C].s == s 
else false
  override def hashCode = s.hashCode
  override def toString = s
}
val r = sc.parallelize(Array((new C("a"),11),(new C("a"),12)))
r.lookup(new C("a"))
<console>:17: error: type mismatch;
 found   : C
 required: C
  r.lookup(new C("a"))
   ^
{noformat}

See
http://mail-archives.apache.org/mod_mbox/spark-dev/201405.mbox/%3C1400019424.80629.YahooMailNeo%40web160801.mail.bf1.yahoo.com%3E

  was:
Anand Avati partially traced the cause to REPL wrapping classes in $outer 
classes. There are at least two major symptoms:

1. equals()
=

In REPL equals() (required in custom classes used as a key for groupByKey) 
seems to have to be written using instanceOf[] instead of the canonical match{}

Spark Shell (equals uses match{}):

class C(val s:String) extends Serializable {
  override def equals(o: Any) = o match {
case that: C => that.s == s
case _ => false
  }
}

val x = new C("a")
val bos = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(bos)
out.writeObject(x);
val b = bos.toByteArray();
out.close
bos.close
val y = new java.io.ObjectInputStream(new 
java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
x.equals(y)

res: Boolean = false

Spark Shell (equals uses isInstanceOf[]):

class C(val s:String) extends Serializable {
  override def equals(o: Any) = if (o.isInstanceOf[C]) (o.asInstanceOf[C].s == 
s) else false
}

val x = new C("a")
val bos = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(bos)
out.writeObject(x);
val b = bos.toByteArray();
out.close
bos.close
val y = new java.io.ObjectInputStream(new 
java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
x.equals(y)

res: Boolean = true

Scala Shell (equals uses match{}):

class C(val s:String) extends Serializable {
  override def equals(o: Any) = o match {
case that: C => that.s == s
case _ => false
  }
}

val x = new C("a")
val bos = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(bos)
out.writeObject(x);
val b = bos.toByteArray();
out.close
bos.close
val y = new java.io.ObjectInputStream(new 
java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
x.equals(y)

res: Boolean = true

2. lookup()
=

class C(val s:String) extends Serializable {
  override def equals(o: Any) = if (o.isInstanceOf[C]) o.asInstanceOf[C].s == s 
else false
  override def hashCode = s.hashCode
  override def toString = s
}
val r = sc.parallelize(Array((new C("a"),11),(new C("a"),12)))
r.lookup(new C("a"))
<console>:17: error: type mismatch;
 found   : C
 required: C
  r.lookup(new C("a"))
   ^

See
http://mail-archives.apache.org/mod_mbox/spark-dev/201405.mbox/%3C1400019424.80629.YahooMailNeo%40web160801.mail.bf1

[jira] [Commented] (SPARK-1836) REPL $outer type mismatch causes lookup() and equals() problems

2014-05-16 Thread Michael Malak (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13998807#comment-13998807
 ] 

Michael Malak commented on SPARK-1836:
--

Michael Armbrust: Indeed. Do you think I should add my additional case of 
equals() (and its workaround) as a comment to SPARK-1199 and mark this one as a 
duplicate?

 REPL $outer type mismatch causes lookup() and equals() problems
 ---

 Key: SPARK-1836
 URL: https://issues.apache.org/jira/browse/SPARK-1836
 Project: Spark
  Issue Type: Bug
Affects Versions: 0.9.0
Reporter: Michael Malak

 Anand Avati partially traced the cause to REPL wrapping classes in $outer 
 classes. There are at least two major symptoms:
 1. equals()
 =
 In REPL equals() (required in custom classes used as a key for groupByKey) 
 seems to have to be written using instanceOf[] instead of the canonical 
 match{}
 Spark Shell (equals uses match{}):
 {noformat}
 class C(val s:String) extends Serializable {
   override def equals(o: Any) = o match {
  case that: C => that.s == s
  case _ => false
   }
 }
  val x = new C("a")
 val bos = new java.io.ByteArrayOutputStream()
 val out = new java.io.ObjectOutputStream(bos)
 out.writeObject(x);
 val b = bos.toByteArray();
 out.close
 bos.close
  val y = new java.io.ObjectInputStream(new 
  java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
 x.equals(y)
 res: Boolean = false
 {noformat}
 Spark Shell (equals uses isInstanceOf[]):
 {noformat}
 class C(val s:String) extends Serializable {
   override def equals(o: Any) = if (o.isInstanceOf[C]) (o.asInstanceOf[C].s == 
 s) else false
 }
 val x = new C("a")
 val bos = new java.io.ByteArrayOutputStream()
 val out = new java.io.ObjectOutputStream(bos)
 out.writeObject(x);
 val b = bos.toByteArray();
 out.close
 bos.close
 val y = new java.io.ObjectInputStream(new 
 java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
 x.equals(y)
 res: Boolean = true
 {noformat}
 Scala Shell (equals uses match{}):
 {noformat}
 class C(val s:String) extends Serializable {
   override def equals(o: Any) = o match {
 case that: C => that.s == s
 case _ => false
   }
 }
 val x = new C("a")
 val bos = new java.io.ByteArrayOutputStream()
 val out = new java.io.ObjectOutputStream(bos)
 out.writeObject(x);
 val b = bos.toByteArray();
 out.close
 bos.close
 val y = new java.io.ObjectInputStream(new 
 java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
 x.equals(y)
 res: Boolean = true
 {noformat}
 2. lookup()
 =
 {noformat}
 class C(val s:String) extends Serializable {
   override def equals(o: Any) = if (o.isInstanceOf[C]) o.asInstanceOf[C].s == 
 s else false
   override def hashCode = s.hashCode
   override def toString = s
 }
 val r = sc.parallelize(Array((new C("a"),11),(new C("a"),12)))
 r.lookup(new C("a"))
 <console>:17: error: type mismatch;
  found   : C
  required: C
   r.lookup(new C("a"))
^
 {noformat}
 See
 http://mail-archives.apache.org/mod_mbox/spark-dev/201405.mbox/%3C1400019424.80629.YahooMailNeo%40web160801.mail.bf1.yahoo.com%3E



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (SPARK-1817) RDD zip erroneous when partitions do not divide RDD count

2014-05-13 Thread Michael Malak (JIRA)
Michael Malak created SPARK-1817:


 Summary: RDD zip erroneous when partitions do not divide RDD count
 Key: SPARK-1817
 URL: https://issues.apache.org/jira/browse/SPARK-1817
 Project: Spark
  Issue Type: Bug
Affects Versions: 0.9.0
Reporter: Michael Malak


Example:

scala> sc.parallelize(1L to 2L,4).zip(sc.parallelize(11 to 12,4)).collect
res1: Array[(Long, Int)] = Array((2,11))

But more generally, it's whenever the number of partitions does not evenly 
divide the total number of elements in the RDD.

See https://groups.google.com/forum/#!msg/spark-users/demrmjHFnoc/Ek3ijiXHr2MJ
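
A workaround sketch, added here for illustration (it is not part of the original report): avoid relying on zip's per-partition alignment by keying both RDDs with an explicit index and joining on it. This assumes RDD.zipWithIndex, which arrived in Spark releases after the 0.9.0 cited above:

val a = sc.parallelize(1L to 2L, 4)
val b = sc.parallelize(11 to 12, 4)
// Pair each element with its global index, join on the index, then drop it.
val aIndexed = a.zipWithIndex().map(_.swap)
val bIndexed = b.zipWithIndex().map(_.swap)
val zipped = aIndexed.join(bIndexed).sortByKey().values
zipped.collect()  // expected: Array((1,11), (2,12))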




--
This message was sent by Atlassian JIRA
(v6.2#6252)


Serializable different behavior Spark Shell vs. Scala Shell

2014-05-13 Thread Michael Malak
Reposting here on dev since I didn't see a response on user:

I'm seeing different Serializable behavior in Spark Shell vs. Scala Shell. In 
the Spark Shell, equals() fails when I use the canonical equals() pattern of 
match{}, but works when I subsitute with isInstanceOf[]. I am using Spark 
0.9.0/Scala 2.10.3.

Is this a bug?

Spark Shell (equals uses match{})
=

class C(val s:String) extends Serializable {
  override def equals(o: Any) = o match {
    case that: C => that.s == s
    case _ => false
  }
}

val x = new C("a")
val bos = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(bos)
out.writeObject(x);
val b = bos.toByteArray();
out.close
bos.close
val y = new java.io.ObjectInputStream(new 
java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
x.equals(y)

res: Boolean = false

Spark Shell (equals uses isInstanceOf[])


class C(val s:String) extends Serializable {
  override def equals(o: Any) = if (o.isInstanceOf[C]) (o.asInstanceOf[C].s == 
s) else false
}

val x = new C("a")
val bos = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(bos)
out.writeObject(x);
val b = bos.toByteArray();
out.close
bos.close
val y = new java.io.ObjectInputStream(new 
java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
x.equals(y)

res: Boolean = true

Scala Shell (equals uses match{})
=

class C(val s:String) extends Serializable {
  override def equals(o: Any) = o match {
    case that: C => that.s == s
    case _ => false
  }
}

val x = new C("a")
val bos = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(bos)
out.writeObject(x);
val b = bos.toByteArray();
out.close
bos.close
val y = new java.io.ObjectInputStream(new 
java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
x.equals(y)

res: Boolean = true


Re: Serializable different behavior Spark Shell vs. Scala Shell

2014-05-13 Thread Michael Malak
Thank you for your investigation into this!

Just for completeness, I've confirmed it's a problem only in REPL, not in 
compiled Spark programs.

But within REPL, a direct consequence of non-same classes after 
serialization/deserialization also means that lookup() doesn't work:

scala> class C(val s:String) extends Serializable {
 |   override def equals(o: Any) = if (o.isInstanceOf[C]) 
o.asInstanceOf[C].s == s else false
 |   override def toString = s
 | }
defined class C

scala> val r = sc.parallelize(Array((new C("a"),11),(new C("a"),12)))
r: org.apache.spark.rdd.RDD[(C, Int)] = ParallelCollectionRDD[3] at parallelize 
at <console>:14

scala> r.lookup(new C("a"))
<console>:17: error: type mismatch;
 found   : C
 required: C
  r.lookup(new C("a"))
   ^



On Tuesday, May 13, 2014 3:05 PM, Anand Avati av...@gluster.org wrote:

On Tue, May 13, 2014 at 8:26 AM, Michael Malak michaelma...@yahoo.com wrote:

Reposting here on dev since I didn't see a response on user:

I'm seeing different Serializable behavior in Spark Shell vs. Scala Shell. In 
the Spark Shell, equals() fails when I use the canonical equals() pattern of 
match{}, but works when I subsitute with isInstanceOf[]. I am using Spark 
0.9.0/Scala 2.10.3.

Is this a bug?

Spark Shell (equals uses match{})
=

class C(val s:String) extends Serializable {
  override def equals(o: Any) = o match {
    case that: C => that.s == s
    case _ => false
  }
}

val x = new C("a")
val bos = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(bos)
out.writeObject(x);
val b = bos.toByteArray();
out.close
bos.close
val y = new java.io.ObjectInputStream(new 
java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
x.equals(y)

res: Boolean = false

Spark Shell (equals uses isInstanceOf[])


class C(val s:String) extends Serializable {
  override def equals(o: Any) = if (o.isInstanceOf[C]) (o.asInstanceOf[C].s == 
s) else false
}

val x = new C("a")
val bos = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(bos)
out.writeObject(x);
val b = bos.toByteArray();
out.close
bos.close
val y = new java.io.ObjectInputStream(new 
java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
x.equals(y)

res: Boolean = true

Scala Shell (equals uses match{})
=

class C(val s:String) extends Serializable {
  override def equals(o: Any) = o match {
    case that: C => that.s == s
    case _ => false
  }
}

val x = new C("a")
val bos = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(bos)
out.writeObject(x);
val b = bos.toByteArray();
out.close
bos.close
val y = new java.io.ObjectInputStream(new 
java.io.ByteArrayInputStream(b)).readObject().asInstanceOf[C]
x.equals(y)

res: Boolean = true



Hmm. I see that this can be reproduced without Spark in Scala 2.11, with and 
without -Yrepl-class-based command line flag to the repl. Spark's REPL has the 
effective behavior of 2.11's -Yrepl-class-based flag. Inspecting the byte code 
generated, it appears -Yrepl-class-based results in the creation of $outer 
field in the generated classes (including class C). The first case match in 
equals() is resulting code along the lines of (simplified):

if (o isinstanceof C && this.$outer == that.$outer) { // do string compare 
// }

$outer is the synthetic field object to the outer object in which the object 
was created (in this case, the repl environment). Now obviously, when x is 
taken through the bytestream and deserialized, it would have a new $outer 
created (it may have deserialized in a different jvm or machine for all we 
know). So the $outer's mismatching is expected. However I'm still trying to 
understand why the outers need to be the same for the case match.
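
To make the workaround concrete, here is a minimal sketch (added for illustration; it simply restates the isInstanceOf approach already shown in this thread) of an equals() that compares only the payload field and therefore never touches the synthetic $outer reference the REPL generates:

class C(val s: String) extends Serializable {
  // Compare only the field we care about; no pattern match, so no $outer comparison.
  override def equals(o: Any) = o.isInstanceOf[C] && o.asInstanceOf[C].s == s
  override def hashCode = s.hashCode
}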


Re: Bug when zip with longs and too many partitions?

2014-05-12 Thread Michael Malak


I've discovered that it was noticed a year ago that RDD zip() does not work 
when the number of partitions does not evenly divide the total number of 
elements in the RDD:

https://groups.google.com/forum/#!msg/spark-users/demrmjHFnoc/Ek3ijiXHr2MJ

I will enter a JIRA ticket just as soon as the ASF Jira system will let me 
reset my password.



On Sunday, May 11, 2014 4:40 AM, Michael Malak michaelma...@yahoo.com wrote:

Is this a bug?

scala> sc.parallelize(1 to 2,4).zip(sc.parallelize(11 to 12,4)).collect
res0: Array[(Int, Int)] = Array((1,11), (2,12))

scala> sc.parallelize(1L to 2L,4).zip(sc.parallelize(11 to 12,4)).collect
res1: Array[(Long, Int)] = Array((2,11))


Re: Opinions stratosphere

2014-05-02 Thread Michael Malak
"looks like Spark outperforms Stratosphere fairly consistently in the 
experiments"

There was one exception the paper noted, which was when memory resources were 
constrained. In that case, Stratosphere seemed to have degraded more gracefully 
than Spark, but the author did not explore it deeper. The author did insert 
into his conclusion section, though, "However, in our experiments, for 
iterative algorithms, the Spark programs may show the poor results in 
performance in the environment of limited memory resources."

I recently blogged a fuller list of alternatives/competitors to Spark:
http://datascienceassn.org/content/alternatives-spark-memory-distributed-computing

 
On Friday, May 2, 2014 10:39 AM, Philip Ogren philip.og...@oracle.com wrote:
 
Great reference!  I just skimmed through the results without reading much of 
the methodology - but it looks like Spark outperforms Stratosphere fairly 
consistently in the experiments.  It's too bad the data sources only range from 
2GB to 8GB.  Who knows if the apparent pattern would extend out to 64GB, 128GB, 
1TB, and so on...




On 05/01/2014 06:02 PM, Christopher Nguyen wrote:

Someone (Ze Ni, https://www.sics.se/people/ze-ni) has actually attempted such a 
comparative study as a Masters thesis: 


http://www.diva-portal.org/smash/get/diva2:605106/FULLTEXT01.pdf



According to this snapshot (c. 2013), Stratosphere is different from Spark in 
not having an explicit concept of an in-memory dataset (e.g., RDD).


In principle this could be argued to be an implementation detail; the 
operators and execution plan/data flow are of primary concern in the API, and 
the data representation/materializations are otherwise unspecified.


But in practice, for long-running interactive applications, I consider RDDs to 
be of fundamental, first-class citizen importance, and the key distinguishing 
feature of Spark's model vs other in-memory approaches that treat memory 
merely as an implicit cache.
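
For readers new to the API, a minimal illustration of that explicit-caching point (a sketch added here; the HDFS path is hypothetical):

val ratings = sc.textFile("hdfs:///data/ratings").map(_.split(","))
ratings.cache()              // name the dataset and pin it in memory
val n = ratings.count()      // the first action materializes the cache
val head = ratings.take(5)   // later actions reuse the in-memory copy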


--

Christopher T. Nguyen
Co-founder  CEO, Adatao
linkedin.com/in/ctnguyen




On Tue, Nov 26, 2013 at 1:26 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

I don’t know a lot about it except from the research side, where the team has 
done interesting optimization stuff for these types of applications. In terms 
of the engine, one thing I’m not sure of is whether Stratosphere allows 
explicit caching of datasets (similar to RDD.cache()) and interactive queries 
(similar to spark-shell). But it’s definitely an interesting project to watch.

Matei
 

On Nov 22, 2013, at 4:17 PM, Ankur Chauhan achau...@brightcove.com wrote:

 Hi,

 That's what I thought but as per the slides on http://www.stratosphere.eu 
 they seem to know about spark and the scala api does look similar.
 I found the PACT model interesting. Would like to
  know if Matei or other core committers have something
  to weigh in on.

 -- Ankur
 On 22 Nov 2013, at 16:05, Patrick Wendell pwend...@gmail.com wrote:

 I've never seen that project before, would be
  interesting to get a
 comparison. Seems to offer a much lower level
  API. For instance this
 is a wordcount program:

 https://github.com/stratosphere/stratosphere/blob/master/pact/pact-examples/src/main/java/eu/stratosphere/pact/example/wordcount/WordCount.java

 On Thu, Nov 21, 2013 at 3:15 PM, Ankur
  Chauhan achau...@brightcove.com wrote:
 Hi,

 I was just curious about https://github.com/stratosphere/stratosphere
 and how does spark compare to it. Anyone
  has any experience with it to make
 any comments?

 -- Ankur





Re: UDFs with package names

2013-07-31 Thread Michael Malak
Yup, it was the directory structure com/mystuff/whateverUDF.class that was 
missing.  Thought I had tried that before posting my question, but...

Thanks for your help!



 From: Edward Capriolo edlinuxg...@gmail.com
To: user@hive.apache.org user@hive.apache.org; Michael Malak 
michaelma...@yahoo.com 
Sent: Tuesday, July 30, 2013 7:06 PM
Subject: Re: UDFs with package names
 


It might be a better idea to use your own package com.mystuff.x. You might be 
running into an issue where java is not finding the file because it assumes the 
relation between package and jar is 1 to 1. You might also be compiling it wrong. 
If your package is com.mystuff, that class file should be in a directory 
structure com/mystuff/whateverUDF.class. I am not seeing that from your example.




On Tue, Jul 30, 2013 at 8:00 PM, Michael Malak michaelma...@yahoo.com wrote:

Thus far, I've been able to create Hive UDFs, but now I need to define them 
within a Java package name (as opposed to the default Java package as I had 
been doing), but once I do that, I'm no longer able to load them into Hive.

First off, this works:

add jar /usr/lib/hive/lib/hive-contrib-0.10.0-cdh4.3.0.jar;
create temporary function row_sequence as 
'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';

Then I took the source code for UDFRowSequence.java from
http://svn.apache.org/repos/asf/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/udf/UDFRowSequence.java

and renamed the file and the class inside to UDFRowSequence2.java

I compile and deploy it with:
javac -cp 
/usr/lib/hive/lib/hive-exec-0.10.0-cdh4.3.0.jar:/usr/lib/hadoop/hadoop-common.jar
 UDFRowSequence2.java
jar cvf UDFRowSequence2.jar UDFRowSequence2.class
sudo cp UDFRowSequence2.jar /usr/local/lib


But in Hive, I get the following:
hive> add jar /usr/local/lib/UDFRowSequence2.jar;
Added /usr/local/lib/UDFRowSequence2.jar to class path
Added resource: /usr/local/lib/UDFRowSequence2.jar
hive> create temporary function row_sequence as 
'org.apache.hadoop.hive.contrib.udf.UDFRowSequence2';
FAILED: Class org.apache.hadoop.hive.contrib.udf.UDFRowSequence2 not found
FAILED: Execution Error, return code 1 from 
org.apache.hadoop.hive.ql.exec.FunctionTask

But if I comment out the package line in UDFRowSequence2.java (to put the UDF 
into the default Java package), it works:
hive> add jar /usr/local/lib/UDFRowSequence2.jar;
Added /usr/local/lib/UDFRowSequence2.jar to class path
Added resource: /usr/local/lib/UDFRowSequence2.jar
hive> create temporary function row_sequence as 'UDFRowSequence2';
OK
Time taken: 0.383 seconds

What am I doing wrong?  I have a feeling it's something simple.



UDFs with package names

2013-07-30 Thread Michael Malak
Thus far, I've been able to create Hive UDFs, but now I need to define them 
within a Java package name (as opposed to the default Java package as I had 
been doing), but once I do that, I'm no longer able to load them into Hive.

First off, this works:

add jar /usr/lib/hive/lib/hive-contrib-0.10.0-cdh4.3.0.jar;
create temporary function row_sequence as 
'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';

Then I took the source code for UDFRowSequence.java from
http://svn.apache.org/repos/asf/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/udf/UDFRowSequence.java

and renamed the file and the class inside to UDFRowSequence2.java

I compile and deploy it with:
javac -cp 
/usr/lib/hive/lib/hive-exec-0.10.0-cdh4.3.0.jar:/usr/lib/hadoop/hadoop-common.jar
 UDFRowSequence2.java
jar cvf UDFRowSequence2.jar UDFRowSequence2.class
sudo cp UDFRowSequence2.jar /usr/local/lib


But in Hive, I get the following:
hive> add jar /usr/local/lib/UDFRowSequence2.jar;
Added /usr/local/lib/UDFRowSequence2.jar to class path
Added resource: /usr/local/lib/UDFRowSequence2.jar
hive> create temporary function row_sequence as 
'org.apache.hadoop.hive.contrib.udf.UDFRowSequence2';
FAILED: Class org.apache.hadoop.hive.contrib.udf.UDFRowSequence2 not found
FAILED: Execution Error, return code 1 from 
org.apache.hadoop.hive.ql.exec.FunctionTask

But if I comment out the package line in UDFRowSequence2.java (to put the UDF 
into the default Java package), it works:
hive> add jar /usr/local/lib/UDFRowSequence2.jar;
Added /usr/local/lib/UDFRowSequence2.jar to class path
Added resource: /usr/local/lib/UDFRowSequence2.jar
hive> create temporary function row_sequence as 'UDFRowSequence2';
OK
Time taken: 0.383 seconds

What am I doing wrong?  I have a feeling it's something simple.



Re: Best Performance on Large Scale Join

2013-07-29 Thread Michael Malak
Perhaps you can first create a temp table that contains only the records that 
will match?  See the UNION ALL trick at
http://www.mail-archive.com/hive-user@hadoop.apache.org/msg01906.html




 From: Brad Ruderman bruder...@radiumone.com
To: user@hive.apache.org 
Sent: Monday, July 29, 2013 11:38 AM
Subject: Best Performance on Large Scale Join
 


Hi All-

I have 2 tables:

CREATE TABLE users (
a bigint,
b int
)

CREATE TABLE products (
a bigint,
c int
)

Each table has about 8 billion records (roughly 2k files total mappers). I want 
to know the most performant way to do the following query:

SELECT u.b,
              p.c,
              count(*) as count
FROM users u
INNER JOIN products p
ON u.a = p.a
GROUP BY u.b, p.c

Right now the reducing is killing me. Any suggestions on improving performance? 
Would a mapbucket join be optimal here?

Thanks,
Brad

Re: Oracle to Hive

2013-07-10 Thread Michael Malak
Untested:

SELECT a.c100, a.c300, b.c400
  FROM t1 a
  JOIN t2 b
  ON a.c200 = b.c200
  JOIN (SELECT DISTINCT a2.c100
          FROM t1 a2
          JOIN t2 b2
          ON a2.c200 = b2.c200
        WHERE b2.c400 = SYSDATE - 1) a3
  ON a.c100 = a3.c100
  WHERE b.c400 = SYSDATE - 1
   AND a.c300 = 0




 From: Raj Hadoop hadoop...@yahoo.com
To: Hive user@hive.apache.org 
Sent: Wednesday, July 10, 2013 3:30 PM
Subject: Oracle to Hive
 


 
All,
 
Can anyone give me tips on how to convert the following Oracle SQL to a Hive 
query.
 
 
SELECT a.c100, a.c300, b.c400
  FROM t1 a JOIN t2 b ON a.c200 = b.c200
 WHERE a.c100 IN (SELECT DISTINCT a.c100
 FROM t1 a JOIN t2 b ON a.c200 = b.c200
    WHERE b.c400 = SYSDATE - 1)
   AND b.c400 = SYSDATE - 1
   AND a.c300 = 0
 
 
The SYSDATE can be replaced by  
date_sub(FROM_UNIXTIME(UNIX_TIMESTAMP(),'yyyy-MM-dd') , 1) in Hive.
 
But I wanted to know the rest of the query. Any pointers or tips so that I can 
start on my own.
 
Thanks in advance.
 
Regards,
Raj

Re: How Can I store the Hive query result in one file ?

2013-07-04 Thread Michael Malak
I have found that for output larger than a few GB, redirecting stdout results 
in an incomplete file.  For very large output, I do CREATE TABLE MYTABLE AS 
SELECT ... and then copy the resulting HDFS files directly out of 
/user/hive/warehouse.
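
For illustration only (a sketch added here, not part of the original reply; the table and local paths are hypothetical), the copy step can also be done programmatically with the Hadoop FileSystem API:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
// Pull the files backing the Hive-managed table down to the local filesystem.
fs.copyToLocalFile(new Path("/user/hive/warehouse/mytable"),
                   new Path("/tmp/mytable"))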
 


 From: Bertrand Dechoux decho...@gmail.com
To: user@hive.apache.org 
Sent: Thursday, July 4, 2013 7:09 AM
Subject: Re: How Can I store the Hive query result in one file ?
  


The question is what is the volume of your output. There is one file per output 
task (map or reduce) because that way each can write it independently and in 
parallel. That's how mapreduce work. And except by forcing the number of tasks 
to 1, there is no certain way to have one output file.

But indeed if the volume is low enough, you could also capture the standard 
output into a local file like Nitin described.

Bertrand



On Thu, Jul 4, 2013 at 12:38 PM, Nitin Pawar nitinpawar...@gmail.com wrote:

will hive -e "query" > filename or hive -f query.q > filename do? 


you specifically want it to write into a named file on hdfs only? 



On Thu, Jul 4, 2013 at 3:12 PM, Matouk IFTISSEN matouk.iftis...@ysance.com 
wrote:

Hello Hive users,
Is there a manner to store the Hive  query result (SELECT *.) in a 
specfique  and alone file (given the file name) like (INSERT OVERWRITE LOCAL 
DIRECTORY '/directory_path_name/')?
Thanks for your answers






-- 
Nitin Pawar
 


-- 
Bertrand Dechoux 

Re: Fwd: Need urgent help in hive query

2013-06-28 Thread Michael Malak
Just copy and paste the whole long expressions to their second occurrences.



 From: dyuti a hadoop.hiv...@gmail.com
To: user@hive.apache.org 
Sent: Friday, June 28, 2013 10:58 AM
Subject: Fwd: Need urgent help in hive query
 


Hi Experts,
I'm trying with the below SQL query in Hive, which does not support column 
alias access in subsequent columns as shown below in the query. Is there any 
other way to rewrite the same without using alias? any of your help are really 
appreciated.
 
INSERT INTO CAUD    
   (
 pst_cc
  pst_no
  pst_ber
  pst_tkn
  pst_dtm
  pst_nbr
  pst_cde
  pst_dte
)
SELECT der.cc
  der.no
  der.ber
  der.tkn
  der.dtm
  der.nbr
  der.cde
  der.dte
  
FROM (SELECT udp.cc
   udp.no
   udp.ber
   udp.tkn
   ,CASE WHEN udp.SYSTEM_DTE1600 AND udp.SYSTEM_DTE1 THEN 
udp.SYSTEM_DTE
    WHEN udp.DTE_OUT1600 AND udp.DTE_OUT1 THEN udp.DTE_OUT
    WHEN udp.DTE_IN1600 AND udp.DTE_IN1 THEN udp.DTE_IN
    ELSE '1231'
    END  AS DTE_OUT
   ,CASE WHEN udp.TME_OUT  0 THEN udp.TME_OUT
    WHEN udp.TME_IN  0 THEN udp.TME_IN
    ELSE 0
    END AS TME_OUT
    ,TRIM(CAST(TME_OUT  AS CHAR(6))) AS TME_OUT1
    ,CAST(CAST(SUBSTR(TRIM(DTE_OUT),1,8)  AS CHAR(8)) AS DATE FORMAT 
'mmdd')  AS DTE_OUT_O
    ,CASE WHEN TME_OUT9 THEN CAST(TME_OUT1 AS CHAR(6))
    WHEN TME_OUT AND TME_OUT=9 THEN CAST('0'||TME_OUT1  AS 
CHAR(6))
    WHEN TME_OUT999 AND TME_OUT= THEN CAST('00'||TME_OUT1 AS 
CHAR(6))
    WHEN TME_OUT99 AND TME_OUT=999 THEN CAST('000'||TME_OUT1 AS 
CHAR(6))
    WHEN TME_OUT9 AND TME_OUT=99 THEN CAST(''||TME_OUT1 AS 
CHAR(6))
    WHEN TME_OUT0 AND TME_OUT=9 THEN CAST('0'||TME_OUT1 AS 
CHAR(6)) 
    WHEN TME_OUT=0 THEN '00'
    END AS TME_OUT2
    
,SUBSTR(TRIM(TME_OUT2),1,2)||':'||SUBSTR(TRIM(TME_OUT2),3,2)||':'||SUBSTR(TRIM(TME_OUT2),5,2)
 AS  
    TME_OUT_O
    , CAST( DTE_OUT_O||' '||TME_OUT_O AS TIMESTAMP FORMAT 'MMDD:HH: 
MI:SS') AS DTM
    ,udp.nbr  AS  nbr 
    
   FROM   STS_GNCAUDP   udp 
   
    INNER JOIN LOAD_LOG LZ_LL  ON udp.LOG_KEY=LZ_LL.LOG_KEY
    INNER JOIN ESA_LOAD_LOG ESA_LL  ON 
LZ_LL.ESA_LOAD_LOG_KEY=ESA_LL.LOG_KEY
    AND ESA_LL.PBLSH_IND='$PBLSH_IND'
    AND ESA_LL.LOAD_END_DTM ='$HIGH_DATE_TIME'
    AND ESA_LL.SOR_CD= '$CLM_SOR_CD'
    AND ESA_LL.SUBJ_AREA_NM= '$SUBJ_AREA_NM'
    AND ESA_LL.WORK_FLOW_NM= '$WORK_FLOW_NM'    
    QUALIFY ROW_NUMBER()  OVER  (PARTITION BY udp.cc,udp.pst_no,
    udp.cde,udp.nbr,udp.dte,udp.LOG_KEY
    ORDER BY DTM DESC)=1) AS der
;
 
 
 
Thanks in advance!
Dti

Re: INSERT non-static data to array?

2013-06-20 Thread Michael Malak
I've created
https://issues.apache.org/jira/browse/HIVE-4771

to track this issue.


- Original Message -
From: Michael Malak michaelma...@yahoo.com
To: user@hive.apache.org user@hive.apache.org
Cc: 
Sent: Wednesday, June 19, 2013 2:35 PM
Subject: Re: INSERT non-static data to array?

The example code for inline_table() there has static data.  It's not possible 
to use a subquery inside the inline_table() or array() is it?

The SQL1999 way is described here:

http://www.postgresql.org/message-id/20041028232152.ga76...@winnie.fuhr.org


CREATE TABLE table_a(a int, b int, c int[]);

INSERT INTO table_a
  SELECT a, b, ARRAY(SELECT c FROM table_c WHERE table_c.parent = table_b.id)
  FROM table_b


From: Edward Capriolo edlinuxg...@gmail.com
To: user@hive.apache.org user@hive.apache.org; Michael Malak 
michaelma...@yahoo.com 
Sent: Wednesday, June 19, 2013 2:06 PM
Subject: Re: INSERT non-static data to array?



: https://issues.apache.org/jira/browse/HIVE-3238


This might fit the bill.




On Wed, Jun 19, 2013 at 3:23 PM, Michael Malak michaelma...@yahoo.com wrote:

Is the only way to INSERT data into a column of type array to load data from 
a pre-existing file, to use hard-coded values in the INSERT statement, or copy 
an entire array verbatim from another table?  I.e. I'm assuming that a) SQL1999 
array INSERT via subquery is not (yet) implemented in Hive, and b) there is 
also no other way to load dynamically generated data into an array column?  
If my assumption in a) is true, does a Jira item need to be created for it?



Re: INSERT non-static data to array?

2013-06-20 Thread Michael Malak
My understanding is that LATERAL VIEW goes the other direction: takes an array 
and makes it into separate rows.  I use that a lot.  But I also need to go the 
other way sometimes: take a bunch of rows and squeeze them down into an array.  
Please correct me if I'm missing something.
 


 From: Edward Capriolo edlinuxg...@gmail.com
To: user@hive.apache.org user@hive.apache.org; Michael Malak 
michaelma...@yahoo.com 
Sent: Thursday, June 20, 2013 9:15 PM
Subject: Re: INSERT non-static data to array?
  


I think you could select into a sub query and then use lateral view. Not 
exactly the same, but something similar could be done.

On Thursday, June 20, 2013, Michael Malak michaelma...@yahoo.com wrote:
 I've created
 https://issues.apache.org/jira/browse/HIVE-4771

 to track this issue.


 - Original Message -
 From: Michael Malak michaelma...@yahoo.com
 To: user@hive.apache.org user@hive.apache.org
 Cc:
 Sent: Wednesday, June 19, 2013 2:35 PM
 Subject: Re: INSERT non-static data to array?

 The example code for inline_table() there has static data.  It's not possible 
 to use a subquery inside the inline_table() or array() is it?

 The SQL1999 way is described here:

 http://www.postgresql.org/message-id/20041028232152.ga76...@winnie.fuhr.org


 CREATE TABLE table_a(a int, b int, c int[]);

 INSERT INTO table_a
   SELECT a, b, ARRAY(SELECT c FROM table_c WHERE table_c.parent = table_b.id)
   FROM table_b

 
 From: Edward Capriolo edlinuxg...@gmail.com
 To: user@hive.apache.org user@hive.apache.org; Michael Malak 
 michaelma...@yahoo.com
 Sent: Wednesday, June 19, 2013 2:06 PM
 Subject: Re: INSERT non-static data to array?



 : https://issues.apache.org/jira/browse/HIVE-3238


 This might fit the bill.




 On Wed, Jun 19, 2013 at 3:23 PM, Michael Malak michaelma...@yahoo.com wrote:

 Is the only way to INSERT data into a column of type array to load data 
 from a pre-existing file, to use hard-coded values in the INSERT statement, 
 or copy an entire array verbatim from another table?  I.e. I'm assuming that 
 a) SQL1999 array INSERT via subquery is not (yet) implemented in Hive, and b) 
 there is also no other way to load dynamically generated data into an array 
 column?  If my assumption in a) is true, does a Jira item need to be created 
 for it?



[jira] [Created] (HIVE-4771) Support subqueries in INSERT for array types

2013-06-20 Thread Michael Malak (JIRA)
Michael Malak created HIVE-4771:
---

 Summary: Support subqueries in INSERT for array types
 Key: HIVE-4771
 URL: https://issues.apache.org/jira/browse/HIVE-4771
 Project: Hive
  Issue Type: Improvement
  Components: Query Processor
Reporter: Michael Malak


Since Hive supports SQL1999-style array types for columns, it would be nice for 
there to be a way to INSERT non-static data into such columns -- i.e. from 
another table based on a complex query as opposed to loading from a static 
file, loading from hard-coded values within the INSERT query, or copying 
complete arrays verbatim from another table.

An example can be found at:
http://www.postgresql.org/message-id/20041028232152.ga76...@winnie.fuhr.org

CREATE TABLE table_a(a int, b int, c int[]);

INSERT INTO table_a
  SELECT a, b, ARRAY(SELECT c FROM table_c WHERE table_c.parent = table_b.id)
  FROM table_b

This should be implemented after regular correlated and uncorrelated subqueries 
are implemented:

https://issues.apache.org/jira/browse/HIVE-784 Support uncorrelated subqueries 
in the WHERE clause

https://issues.apache.org/jira/browse/HIVE-1799 Support correlated subqueries 
in the WHERE clause


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: INSERT non-static data to array?

2013-06-19 Thread Michael Malak
The example code for inline_table() there has static data.  It's not possible 
to use a subquery inside the inline_table() or array() is it?

The SQL1999 way is described here:

http://www.postgresql.org/message-id/20041028232152.ga76...@winnie.fuhr.org


CREATE TABLE table_a(a int, b int, c int[]);

INSERT INTO table_a
  SELECT a, b, ARRAY(SELECT c FROM table_c WHERE table_c.parent = table_b.id)
  FROM table_b


From: Edward Capriolo edlinuxg...@gmail.com
To: user@hive.apache.org user@hive.apache.org; Michael Malak 
michaelma...@yahoo.com 
Sent: Wednesday, June 19, 2013 2:06 PM
Subject: Re: INSERT non-static data to array?



: https://issues.apache.org/jira/browse/HIVE-3238


This might fit the bill.




On Wed, Jun 19, 2013 at 3:23 PM, Michael Malak michaelma...@yahoo.com wrote:

Is the only way to INSERT data into a column of type array to load data from 
a pre-existing file, to use hard-coded values in the INSERT statement, or copy 
an entire array verbatim from another table?  I.e. I'm assuming that a) SQL1999 
array INSERT via subquery is not (yet) implemented in Hive, and b) there is 
also no other way to load dynamically generated data into an array column?  
If my assumption in a) is true, does a Jira item need to be created for it?



Re: Hive QL - NOT IN, NOT EXIST

2013-05-05 Thread Michael Malak

--- On Sun, 5/5/13, Peter Chu pete@outlook.com wrote:

 I am wondering if there is any way to do this without resorting to
 using left outer join and finding nulls.

I have found this to be an acceptable substitute.  Is it not working for you?



[jira] [Commented] (HIVE-4022) Structs and struct fields cannot be NULL in INSERT statements

2013-02-20 Thread Michael Malak (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13582662#comment-13582662
 ] 

Michael Malak commented on HIVE-4022:
-

Note that there is a workaround for the case of setting STRUCT fields to NULL, 
but not for setting the whole STRUCT to a NULL.

The following workaround does work:

INSERT INTO TABLE oc SELECT named_struct('a', cast(null as int), 'b', cast(null 
as int)) FROM tc;

But there is no equivalent workaround to casting the whole STRUCT to NULL, as 
noted in the first comment of https://issues.apache.org/jira/browse/HIVE-1287

 Structs and struct fields cannot be NULL in INSERT statements
 -

 Key: HIVE-4022
 URL: https://issues.apache.org/jira/browse/HIVE-4022
 Project: Hive
  Issue Type: Bug
  Components: Serializers/Deserializers
Reporter: Michael Malak

 Originally thought to be Avro-specific, and first noted with respect to 
 HIVE-3528 Avro SerDe doesn't handle serializing Nullable types that require 
 access to a Schema, it turns out even native Hive tables cannot store NULL 
 in a STRUCT field or for the entire STRUCT itself, at least when the NULL is 
 specified directly in the INSERT statement.
 Again, this affects both Avro-backed tables and native Hive tables.
 ***For native Hive tables:
 The following:
 echo 1,2 > twovalues.csv
 hive
 CREATE TABLE tc (x INT, y INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
 LOAD DATA LOCAL INPATH 'twovalues.csv' INTO TABLE tc;
 CREATE TABLE oc (z STRUCT<a: int, b: int>);
 INSERT INTO TABLE oc SELECT null FROM tc;
 produces the error
 FAILED: SemanticException [Error 10044]: Line 1:18 Cannot insert into target 
 table because column number/types are different 'oc': Cannot convert column 0 
 from void to struct<a:int,b:int>.
 The following:
 INSERT INTO TABLE oc SELECT named_struct('a', null, 'b', null) FROM tc;
 produces the error:
 FAILED: SemanticException [Error 10044]: Line 1:18 Cannot insert into target 
 table because column number/types are different 'oc': Cannot convert column 0 
 from struct<a:void,b:void> to struct<a:int,b:int>.
 ***For Avro:
 In HIVE-3528, there is in fact a null-struct test case in line 14 of
 https://github.com/apache/hive/blob/15cc604bf10f4c2502cb88fb8bb3dcd45647cf2c/data/files/csv.txt
 The test script at
 https://github.com/apache/hive/blob/12d6f3e7d21f94e8b8490b7c6d291c9f4cac8a4f/ql/src/test/queries/clientpositive/avro_nullable_fields.q
 does indeed work.  But in that test, the query gets all of its data from a 
 test table verbatim:
 INSERT OVERWRITE TABLE as_avro SELECT * FROM test_serializer;
 If instead we stick in a hard-coded null for the struct directly into the 
 query, it fails:
 INSERT OVERWRITE TABLE as_avro SELECT string1, int1, tinyint1, smallint1, 
 bigint1, boolean1, float1, double1, list1, map1, null, enum1, nullableint, 
 bytes1, fixed1 FROM test_serializer;
 with the following error:
 FAILED: SemanticException [Error 10044]: Line 1:23 Cannot insert into target 
 table because column number/types are different 'as_avro': Cannot convert 
 column 10 from void to struct<sint:int,sboolean:boolean,sstring:string>.
 Note, though, that substituting a hard-coded null for string1 (and restoring 
 struct1 into the query) does work:
 INSERT OVERWRITE TABLE as_avro SELECT null, int1, tinyint1, smallint1, 
 bigint1, boolean1, float1, double1, list1, map1, struct1, enum1, nullableint, 
 bytes1, fixed1 FROM test_serializer;

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (HIVE-3528) Avro SerDe doesn't handle serializing Nullable types that require access to a Schema

2013-02-20 Thread Michael Malak (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-3528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13582664#comment-13582664
 ] 

Michael Malak commented on HIVE-3528:
-

As noted in the first comment from 
https://issues.apache.org/jira/browse/HIVE-1287, casting to a STRUCT is not 
currently supported.

However, I did just now try casting individual fields of a STRUCT and that 
indeed does work.

I just now added details to the JIRA that I created last week.
https://issues.apache.org/jira/browse/HIVE-4022


 Avro SerDe doesn't handle serializing Nullable types that require access to a 
 Schema
 

 Key: HIVE-3528
 URL: https://issues.apache.org/jira/browse/HIVE-3528
 Project: Hive
  Issue Type: Bug
  Components: Serializers/Deserializers
Reporter: Sean Busbey
Assignee: Sean Busbey
  Labels: avro
 Fix For: 0.11.0

 Attachments: HIVE-3528.1.patch.txt, HIVE-3528.2.patch.txt


 Deserialization properly handles hiding Nullable Avro types, including 
 complex types like record, map, array, etc. However, when Serialization 
 attempts to write out these types it erroneously makes use of the UNION 
 schema that contains NULL and the other type.
 This results in Schema mis-match errors for Record, Array, Enum, Fixed, and 
 Bytes.
 Here's a [review board of unit tests that express the 
 problem|https://reviews.apache.org/r/7431/], as well as one that supports the 
 case that it's only when the schema is needed.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: NULLable STRUCTs

2013-02-19 Thread Michael Malak
If no one has any objection, I'm going to update HIVE-4022, which I entered a 
week ago when I thought the behavior was Avro-specific, to indicate it actually 
affects even native Hive tables.

https://issues.apache.org/jira/browse/HIVE-4022

--- On Fri, 2/15/13, Michael Malak michaelma...@yahoo.com wrote:

 From: Michael Malak michaelma...@yahoo.com
 Subject: NULLable STRUCTs
 To: user@hive.apache.org
 Date: Friday, February 15, 2013, 5:03 PM
 It seems that all Hive columns (at
 least those of primitive types) are always NULLable? 
 What about columns of type STRUCT?
 
 The following:
 
 echo 1,2 > twovalues.csv
 hive
 CREATE TABLE tc (x INT, y INT) ROW FORMAT DELIMITED FIELDS
 TERMINATED BY ',';
 LOAD DATA LOCAL INPATH 'twovalues.csv' INTO TABLE tc;
 CREATE TABLE oc (z STRUCT<a: int, b: int>);
 INSERT INTO TABLE oc SELECT null FROM tc;
 
 produces the error
 
 FAILED: SemanticException [Error 10044]: Line 1:18 Cannot
 insert into target table because column number/types are
 different 'oc': Cannot convert column 0 from void to
 struct<a:int,b:int>.
 
 I initially discovered such behavior with Avro-backed
 tables, and even entered a JIRA
 https://issues.apache.org/jira/browse/HIVE-4022
 but now I realized it happens with CSV-backed tables as
 well.
 
 Perhaps related, perhaps not, it seems that all members of a
 STRUCT are always non-NULLable.  The following:
 
 INSERT INTO TABLE oc SELECT named_struct('a', null, 'b',
 null) FROM tc;
 
 produces the error:
 
 FAILED: SemanticException [Error 10044]: Line 1:18 Cannot
 insert into target table because column number/types are
 different 'oc': Cannot convert column 0 from
 struct<a:void,b:void> to struct<a:int,b:int>.
 



[jira] [Updated] (HIVE-4022) Structs and struct fields cannot be NULL in INSERT statements

2013-02-19 Thread Michael Malak (JIRA)

 [ 
https://issues.apache.org/jira/browse/HIVE-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Malak updated HIVE-4022:


Description: 
Originally thought to be Avro-specific, and first noted with respect to 
HIVE-3528 Avro SerDe doesn't handle serializing Nullable types that require 
access to a Schema, it turns out even native Hive tables cannot store NULL in 
a STRUCT field or for the entire STRUCT itself, at least when the NULL is 
specified directly in the INSERT statement.

Again, this affects both Avro-backed tables and native Hive tables.

***For native Hive tables:

The following:

echo 1,2 > twovalues.csv
hive
CREATE TABLE tc (x INT, y INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
LOAD DATA LOCAL INPATH 'twovalues.csv' INTO TABLE tc;
CREATE TABLE oc (z STRUCT<a: int, b: int>);
INSERT INTO TABLE oc SELECT null FROM tc;

produces the error

FAILED: SemanticException [Error 10044]: Line 1:18 Cannot insert into target 
table because column number/types are different 'oc': Cannot convert column 0 
from void to struct<a:int,b:int>.

The following:

INSERT INTO TABLE oc SELECT named_struct('a', null, 'b', null) FROM tc;

produces the error:

FAILED: SemanticException [Error 10044]: Line 1:18 Cannot insert into target 
table because column number/types are different 'oc': Cannot convert column 0 
from struct<a:void,b:void> to struct<a:int,b:int>.

***For Avro:

In HIVE-3528, there is in fact a null-struct test case in line 14 of
https://github.com/apache/hive/blob/15cc604bf10f4c2502cb88fb8bb3dcd45647cf2c/data/files/csv.txt

The test script at
https://github.com/apache/hive/blob/12d6f3e7d21f94e8b8490b7c6d291c9f4cac8a4f/ql/src/test/queries/clientpositive/avro_nullable_fields.q

does indeed work.  But in that test, the query gets all of its data from a test 
table verbatim:

INSERT OVERWRITE TABLE as_avro SELECT * FROM test_serializer;

If instead we stick in a hard-coded null for the struct directly into the 
query, it fails:

INSERT OVERWRITE TABLE as_avro SELECT string1, int1, tinyint1, smallint1, 
bigint1, boolean1, float1, double1, list1, map1, null, enum1, nullableint, 
bytes1, fixed1 FROM test_serializer;

with the following error:

FAILED: SemanticException [Error 10044]: Line 1:23 Cannot insert into target 
table because column number/types are different 'as_avro': Cannot convert 
column 10 from void to struct<sint:int,sboolean:boolean,sstring:string>.

Note, though, that substituting a hard-coded null for string1 (and restoring 
struct1 into the query) does work:

INSERT OVERWRITE TABLE as_avro SELECT null, int1, tinyint1, smallint1, bigint1, 
boolean1, float1, double1, list1, map1, struct1, enum1, nullableint, bytes1, 
fixed1 FROM test_serializer;


  was:
Related to HIVE-3528,

There is in fact a null-struct test case in line 14 of
https://github.com/apache/hive/blob/15cc604bf10f4c2502cb88fb8bb3dcd45647cf2c/data/files/csv.txt

The test script at
https://github.com/apache/hive/blob/12d6f3e7d21f94e8b8490b7c6d291c9f4cac8a4f/ql/src/test/queries/clientpositive/avro_nullable_fields.q

does indeed work.  But in that test, the query gets all of its data from a test 
table verbatim:

INSERT OVERWRITE TABLE as_avro SELECT * FROM test_serializer;

If instead we stick in a hard-coded null for the struct directly into the 
query, it fails:

INSERT OVERWRITE TABLE as_avro SELECT string1, int1, tinyint1, smallint1, 
bigint1, boolean1, float1, double1, list1, map1, null, enum1, nullableint, 
bytes1, fixed1 FROM test_serializer;

with the following error:

FAILED: SemanticException [Error 10044]: Line 1:23 Cannot insert into target 
table because column number/types are different 'as_avro': Cannot convert 
column 10 from void to struct<sint:int,sboolean:boolean,sstring:string>.

Note, though, that substituting a hard-coded null for string1 (and restoring 
struct1 into the query) does work:

INSERT OVERWRITE TABLE as_avro SELECT null, int1, tinyint1, smallint1, bigint1, 
boolean1, float1, double1, list1, map1, struct1, enum1, nullableint, bytes1, 
fixed1 FROM test_serializer;


Summary: Structs and struct fields cannot be NULL in INSERT statements  
(was: Avro SerDe queries don't handle hard-coded nulls for optional/nullable 
structs)

 Structs and struct fields cannot be NULL in INSERT statements
 -

 Key: HIVE-4022
 URL: https://issues.apache.org/jira/browse/HIVE-4022
 Project: Hive
  Issue Type: Bug
  Components: Serializers/Deserializers
Reporter: Michael Malak

 Originally thought to be Avro-specific, and first noted with respect to 
 HIVE-3528 Avro SerDe doesn't handle serializing Nullable types that require 
 access to a Schema, it turns out even native Hive tables cannot store NULL 
 in a STRUCT field or for the entire STRUCT itself, at least when the NULL is 
 specified directly in the INSERT

NULLable STRUCTs

2013-02-15 Thread Michael Malak
It seems that all Hive columns (at least those of primitive types) are always 
NULLable?  What about columns of type STRUCT?

The following:

echo 1,2 > twovalues.csv
hive
CREATE TABLE tc (x INT, y INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
LOAD DATA LOCAL INPATH 'twovalues.csv' INTO TABLE tc;
CREATE TABLE oc (z STRUCT<a: int, b: int>);
INSERT INTO TABLE oc SELECT null FROM tc;

produces the error

FAILED: SemanticException [Error 10044]: Line 1:18 Cannot insert into target 
table because column number/types are different 'oc': Cannot convert column 0 
from void to struct<a:int,b:int>.

I initially discovered such behavior with Avro-backed tables, and even entered 
a JIRA
https://issues.apache.org/jira/browse/HIVE-4022
but now I realized it happens with CSV-backed tables as well.

Perhaps related, perhaps not, it seems that all members of a STRUCT are always 
non-NULLable.  The following:

INSERT INTO TABLE oc SELECT named_struct('a', null, 'b', null) FROM tc;

produces the error:

FAILED: SemanticException [Error 10044]: Line 1:18 Cannot insert into target 
table because column number/types are different 'oc': Cannot convert column 0 
from struct<a:void,b:void> to struct<a:int,b:int>.



[jira] [Commented] (HIVE-3528) Avro SerDe doesn't handle serializing Nullable types that require access to a Schema

2013-02-14 Thread Michael Malak (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-3528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13578523#comment-13578523
 ] 

Michael Malak commented on HIVE-3528:
-

I've tried the latest Avro SerDe from GitHub, and it allows me to write NULLs 
with a simple schema, but not with anything involving STRUCTs.  If a STRUCT 
contains NULLable fields, or the entire STRUCT is NULLable (optional), Hive 
throws exceptions.  Should I create new JIRA item(s)?

 Avro SerDe doesn't handle serializing Nullable types that require access to a 
 Schema
 

 Key: HIVE-3528
 URL: https://issues.apache.org/jira/browse/HIVE-3528
 Project: Hive
  Issue Type: Bug
  Components: Serializers/Deserializers
Reporter: Sean Busbey
Assignee: Sean Busbey
  Labels: avro
 Fix For: 0.11.0

 Attachments: HIVE-3528.1.patch.txt, HIVE-3528.2.patch.txt


 Deserialization properly handles hiding Nullable Avro types, including 
 complex types like record, map, array, etc. However, when Serialization 
 attempts to write out these types it erroneously makes use of the UNION 
 schema that contains NULL and the other type.
 This results in Schema mis-match errors for Record, Array, Enum, Fixed, and 
 Bytes.
 Here's a [review board of unit tests that express the 
 problem|https://reviews.apache.org/r/7431/], as well as one that supports the 
 case that it's only when the schema is needed.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (HIVE-3528) Avro SerDe doesn't handle serializing Nullable types that require access to a Schema

2013-02-14 Thread Michael Malak (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-3528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13578538#comment-13578538
 ] 

Michael Malak commented on HIVE-3528:
-

Sean:

I mean
https://github.com/apache/hive/tree/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro

which has:
AvroSerializer.java 20 days ago HIVE-3528 : Avro SerDe doesn't 
handle serializing Nullable types that


 Avro SerDe doesn't handle serializing Nullable types that require access to a 
 Schema
 

 Key: HIVE-3528
 URL: https://issues.apache.org/jira/browse/HIVE-3528
 Project: Hive
  Issue Type: Bug
  Components: Serializers/Deserializers
Reporter: Sean Busbey
Assignee: Sean Busbey
  Labels: avro
 Fix For: 0.11.0

 Attachments: HIVE-3528.1.patch.txt, HIVE-3528.2.patch.txt


 Deserialization properly handles hiding Nullable Avro types, including 
 complex types like record, map, array, etc. However, when Serialization 
 attempts to write out these types it erroneously makes use of the UNION 
 schema that contains NULL and the other type.
 This results in Schema mis-match errors for Record, Array, Enum, Fixed, and 
 Bytes.
 Here's a [review board of unit tests that express the 
 problem|https://reviews.apache.org/r/7431/], as well as one that supports the 
 case that it's only when the schema is needed.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (HIVE-3528) Avro SerDe doesn't handle serializing Nullable types that require access to a Schema

2013-02-14 Thread Michael Malak (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-3528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13578783#comment-13578783
 ] 

Michael Malak commented on HIVE-3528:
-

Sean:

OK, I've researched the problem further.

There is in fact a null-struct test case in line 14 of
https://github.com/apache/hive/blob/15cc604bf10f4c2502cb88fb8bb3dcd45647cf2c/data/files/csv.txt

The test script at
https://github.com/apache/hive/blob/12d6f3e7d21f94e8b8490b7c6d291c9f4cac8a4f/ql/src/test/queries/clientpositive/avro_nullable_fields.q

does indeed work when I tested it locally.  But in that test, the query gets 
all of its data from a test table verbatim:

INSERT OVERWRITE TABLE as_avro SELECT * FROM test_serializer;

If instead we stick in a hard-coded null for the struct directly into the 
query, it fails:

INSERT OVERWRITE TABLE as_avro SELECT string1, int1, tinyint1, smallint1, 
bigint1, boolean1, float1, double1, list1, map1, null, enum1, nullableint, 
bytes1, fixed1 FROM test_serializer;

with the following error:

FAILED: SemanticException [Error 10044]: Line 1:23 Cannot insert into target 
table because column number/types are different 'as_avro': Cannot convert 
column 10 from void to struct<sint:int,sboolean:boolean,sstring:string>.

Note, though, that substituting a hard-coded null for string1 (and restoring 
struct1 to the query) does work:

INSERT OVERWRITE TABLE as_avro SELECT null, int1, tinyint1, smallint1, bigint1, 
boolean1, float1, double1, list1, map1, struct1, enum1, nullableint, bytes1, 
fixed1 FROM test_serializer;

I will be entering an all-new JIRA for this.


 Avro SerDe doesn't handle serializing Nullable types that require access to a 
 Schema
 

 Key: HIVE-3528
 URL: https://issues.apache.org/jira/browse/HIVE-3528
 Project: Hive
  Issue Type: Bug
  Components: Serializers/Deserializers
Reporter: Sean Busbey
Assignee: Sean Busbey
  Labels: avro
 Fix For: 0.11.0

 Attachments: HIVE-3528.1.patch.txt, HIVE-3528.2.patch.txt


 Deserialization properly handles hiding Nullable Avro types, including 
 complex types like record, map, array, etc. However, when Serialization 
 attempts to write out these types it erroneously makes use of the UNION 
 schema that contains NULL and the other type.
 This results in Schema mis-match errors for Record, Array, Enum, Fixed, and 
 Bytes.
 Here's a [review board of unit tests that express the 
 problem|https://reviews.apache.org/r/7431/], as well as one that supports the 
 case that it's only when the schema is needed.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (HIVE-4022) Avro SerDe queries don't handle hard-coded nulls for optional/nullable structs

2013-02-14 Thread Michael Malak (JIRA)
Michael Malak created HIVE-4022:
---

 Summary: Avro SerDe queries don't handle hard-coded nulls for 
optional/nullable structs
 Key: HIVE-4022
 URL: https://issues.apache.org/jira/browse/HIVE-4022
 Project: Hive
  Issue Type: Bug
  Components: Serializers/Deserializers
Reporter: Michael Malak


Related to HIVE-3528,

There is in fact a null-struct test case in line 14 of
https://github.com/apache/hive/blob/15cc604bf10f4c2502cb88fb8bb3dcd45647cf2c/data/files/csv.txt

The test script at
https://github.com/apache/hive/blob/12d6f3e7d21f94e8b8490b7c6d291c9f4cac8a4f/ql/src/test/queries/clientpositive/avro_nullable_fields.q

does indeed work.  But in that test, the query gets all of its data from a test 
table verbatim:

INSERT OVERWRITE TABLE as_avro SELECT * FROM test_serializer;

If instead we stick in a hard-coded null for the struct directly into the 
query, it fails:

INSERT OVERWRITE TABLE as_avro SELECT string1, int1, tinyint1, smallint1, 
bigint1, boolean1, float1, double1, list1, map1, null, enum1, nullableint, 
bytes1, fixed1 FROM test_serializer;

with the following error:

FAILED: SemanticException [Error 10044]: Line 1:23 Cannot insert into target 
table because column number/types are different 'as_avro': Cannot convert 
column 10 from void to struct<sint:int,sboolean:boolean,sstring:string>.

Note, though, that substituting a hard-coded null for string1 (and restoring 
struct1 into the query) does work:

INSERT OVERWRITE TABLE as_avro SELECT null, int1, tinyint1, smallint1, bigint1, 
boolean1, float1, double1, list1, map1, struct1, enum1, nullableint, bytes1, 
fixed1 FROM test_serializer;


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: INSERT INTO table with STRUCT, SELECT FROM

2013-02-13 Thread Michael Malak
Figured it out 
from https://cwiki.apache.org/Hive/languagemanual-udf.html#LanguageManualUDF-ComplexTypeConstructors

It should be INSERT INTO TABLE oc SELECT named_struct('a', x, 'b', y) FROM tc;

--- On Wed, 2/13/13, Dean Wampler dean.wamp...@thinkbiganalytics.com wrote:

From: Dean Wampler dean.wamp...@thinkbiganalytics.com
Subject: Re: INSERT INTO table with STRUCT, SELECT FROM
To: user@hive.apache.org
Date: Wednesday, February 13, 2013, 12:47 PM

Hmm. I tried the following hacks, but all wouldn't parse. Ideas?
I changed:
  ... select struct(x,y) ... 
to
  ... select struct(x,y) as struct<a:int,b:int> ... 
  ... select cast(struct(x,y) as struct<a:int,b:int>) ...   ... select struct(x 
as a,y as b) ... 
Okay, but there is a hack that does work: bypass INSERT INTO and just write to 
the directory:

INSERT DIRECTORY '/path/to/table/directory' SELECT ...;
Just be careful it doesn't clobber any files already there. I'm paranoid, so I 
would write to a different directory and then move the files over...

dean

On Wed, Feb 13, 2013 at 1:26 PM, Michael Malak michaelma...@yahoo.com wrote:

Is it possible to INSERT INTO TABLE t SELECT FROM where t has a column with a 
STRUCT?



Based on

http://grokbase.com/t/hive/user/109r87hh3e/insert-data-into-a-column-of-complex-type



I thought perhaps the following would work:



echo 1,2 > twovalues.csv

hive

CREATE TABLE tc (x INT, y INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

LOAD DATA LOCAL INPATH 'twovalues.csv' INTO TABLE tc;

CREATE TABLE oc (z STRUCT<a: int, b: int>);

INSERT INTO TABLE oc SELECT struct(x,y) FROM tc;



but when I do the above I get:



FAILED: SemanticException [Error 10044]: Line 1:18 Cannot insert into target 
table because column number/types are different 'oc': Cannot convert column 0 
from struct<col1:int,col2:int> to struct<a:int,b:int>.







-- 
Dean Wampler, Ph.D. | thinkbiganalytics.com | +1-312-339-1330





[jira] [Commented] (AVRO-1035) Add the possibility to append to existing avro files

2013-02-07 Thread Michael Malak (JIRA)

[ 
https://issues.apache.org/jira/browse/AVRO-1035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13573711#comment-13573711
 ] 

Michael Malak commented on AVRO-1035:
-

ha...@cloudera.com has provided example code on how to accomplish HDFS Avro 
append at https://gist.github.com/QwertyManiac/4724582

 Add the possibility to append to existing avro files  
 --

 Key: AVRO-1035
 URL: https://issues.apache.org/jira/browse/AVRO-1035
 Project: Avro
  Issue Type: New Feature
Reporter: Vyacheslav Zholudev

 Currently it is not possible to append to avro files that were written and 
 closed. 
 Here is a Scott Carey's reply on the mailing list:
 {quote}
 It is not possible without modifying DataFileWriter. Please open a JIRA
 ticket.  
 It could not simply append to an OutputStream, since it must either:
 * Seek to the start to validate the schemas match and find the sync
 marker, or
 * Trust that the schemas match and find the sync marker from the last block
 DataFileWriter cannot refer to Hadoop classes such as FileSystem, but we
 could add something to the mapred module that takes a Path and FileSystem
 and returns
 something that implemements an interface that DataFileWriter can append
 to.  This would be something that is both a
 http://avro.apache.org/docs/1.6.2/api/java/org/apache/avro/file/SeekableInp
 ut.html
 and an OutputStream, or has both an InputStream from the start of the
 existing file and an OutputStream at the end.
 {quote}

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: Is it possible to append to an already existing avro file

2013-02-07 Thread Michael Malak
I confess to being a user of rather than a developer of open source, but 
perhaps you could elaborate on what "depends on" means and what the 
consequences are?

Isn't it -- or couldn't it be made -- a run-time binding, so that only those 
who try to use the HDFS append functionality would be required to also include 
the HDFS Jars in their classpath?

Or is the issue more of a bookkeeping one, whereby every update to HDFS will 
require an Avro regression test?

Now that Hive supports Avro as of the Jan. 11 release of Hive 0.10, the use 
case of ingesting data into Avro on HDFS is only going to get more popular, and 
appending is very handy for ingesting, especially for live real-time or 
near-real-time data.  So it seems to me that if the inconveniences are minor or 
can be worked around, that Avro indeed should perhaps depend on HDFS.
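
Pulling the pieces of this thread together, here is a rough sketch (added for illustration; this is not the gist's code, the path is hypothetical, and it assumes the underlying filesystem actually supports append()) of what an HDFS Avro append can look like:

import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.mapred.FsInput
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
val path = new Path("/data/events.avro")   // hypothetical existing Avro file on HDFS
val fs   = FileSystem.get(conf)

// Re-open the existing file (so the writer can pick up its schema and sync marker)
// and ask the filesystem for an append stream; hand both to appendTo().
val in  = new FsInput(path, conf)
val out = fs.append(path)
val datumWriter = new GenericDatumWriter[GenericRecord]()
val writer = new DataFileWriter[GenericRecord](datumWriter).appendTo(in, out)

// 'record' would be a GenericRecord built against the file's schema:
// writer.append(record)
writer.close()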

--- On Thu, 2/7/13, Harsh J ha...@cloudera.com wrote:

 From: Harsh J ha...@cloudera.com
 Subject: Re: Is it possible to append to an already existing avro file
 To: user@avro.apache.org
 Date: Thursday, February 7, 2013, 9:28 AM
 I assume by non-trivial you meant the
 extra Seekable stuff I needed to
 wrap around the DFS output streams to let Avro take it as
 append-able?
 I don't think its possible for Avro to carry it since Avro
 (core) does
 not reverse-depend on Hadoop. Should we document it
 somewhere though?
 Do you have any ideas on the best place to do that?
 
 On Thu, Feb 7, 2013 at 6:12 AM, Michael Malak michaelma...@yahoo.com
 wrote:
  Thanks so much for the code -- it works great!
 
  Since it is a non-trivial amount of code required to
  achieve append, I suggest attaching that code to AVRO-1035,
  in the hopes that someone will come up with an interface
  that requires just one line of user code to achieve append.
 
  --- On Wed, 2/6/13, Harsh J ha...@cloudera.com
 wrote:
 
  From: Harsh J ha...@cloudera.com
  Subject: Re: Is it possible to append to an already existing avro file
  To: user@avro.apache.org
  Date: Wednesday, February 6, 2013, 11:17 AM
  Hey Michael,
 
  It does implement the regular Java OutputStream interface,
  as seen in
  the API: 
  http://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FSDataOutputStream.html.
 
  Here's a sample program that works on Hadoop 2.x in my
  tests:
  https://gist.github.com/QwertyManiac/4724582



Re: Is it possible to append to an already existing avro file

2013-02-06 Thread Michael Malak
Thanks so much for the code -- it works great!

Since it is a non-trivial amount of code required to achieve append, I suggest 
attaching that code to AVRO-1035, in the hopes that someone will come up with 
an interface that requires just one line of user code to achieve append.

--- On Wed, 2/6/13, Harsh J ha...@cloudera.com wrote:

 From: Harsh J ha...@cloudera.com
 Subject: Re: Is it possible to append to an already existing avro file
 To: user@avro.apache.org
 Date: Wednesday, February 6, 2013, 11:17 AM
 Hey Michael,
 
 It does implement the regular Java OutputStream interface,
 as seen in
 the API: 
 http://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FSDataOutputStream.html.
 
 Here's a sample program that works on Hadoop 2.x in my
 tests:
 https://gist.github.com/QwertyManiac/4724582
 
 On Wed, Feb 6, 2013 at 9:00 AM, Michael Malak michaelma...@yahoo.com
 wrote:
  I don't believe a Hadoop FileSystem is a Java
 OutputStream?
 
  --- On Tue, 2/5/13, Doug Cutting cutt...@apache.org
 wrote:
 
  From: Doug Cutting cutt...@apache.org
  Subject: Re: Is it possible to append to an already
 existing avro file
  To: user@avro.apache.org
  Date: Tuesday, February 5, 2013, 5:27 PM
  It will work on an OutputStream that
  supports append.
 
  http://avro.apache.org/docs/current/api/java/org/apache/avro/file/DataFileWriter.html#appendTo(org.apache.avro.file.SeekableInput,
  java.io.OutputStream)
 
  So it depends on how well HDFS implements
  FileSystem#append(), not on
  any changes in Avro.
 
  http://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#append(org.apache.hadoop.fs.Path)
 
  I have no recent personal experience with append
 in
  HDFS.  Does anyone
  else here?
 
  Doug
 
  On Tue, Feb 5, 2013 at 4:10 PM, Michael Malak
 michaelma...@yahoo.com
  wrote:
   My understanding is that will append to a file
 on the
  local filesystem, but not to a file on HDFS.
  
   --- On Tue, 2/5/13, Doug Cutting cutt...@apache.org
  wrote:
  
   From: Doug Cutting cutt...@apache.org
   Subject: Re: Is it possible to append to
 an already
  existing avro file
   To: user@avro.apache.org
   Date: Tuesday, February 5, 2013, 5:08 PM
   The Jira is:
  
   https://issues.apache.org/jira/browse/AVRO-1035
  
   It is possible to append to an existing
 Avro file:
  
   http://avro.apache.org/docs/current/api/java/org/apache/avro/file/DataFileWriter.html#appendTo(java.io.File)
  
   Should we close that issue as fixed?
  
   Doug
  
   On Fri, Feb 1, 2013 at 11:32 AM, Michael
 Malak
  michaelma...@yahoo.com
   wrote:
Was a JIRA ticket ever created
 regarding
  appending to
   an existing Avro file on HDFS?
   
What is the status of such a
 capability, a
  year out
   from when the issue below was raised?
   
On Wed, 22 Feb 2012 10:57:48 +0100,
  Vyacheslav
   Zholudev vyacheslav.zholu...@gmail.com
   wrote:
   
Thanks for your reply, I
 suspected this.
   
I will create a JIRA ticket.
   
Vyacheslav
   
On Feb 21, 2012, at 6:02 PM,
 Scott Carey
  wrote:
   
   
On 2/21/12 7:29 AM,
 Vyacheslav
  Zholudev
   vyacheslav.zholu...@gmail.com
wrote:
   
Yep, I saw that method as
 well as
  the
   stackoverflow post. However, I'm
interested how to append
 to a file
  on the
   arbitrary file system, not
only on the local one.
   
I want to get an
 OutputStream
  based on the
   Path and the FileSystem
implementation and then
 pass it
  for
   appending to avro methods.
   
Is that possible?
   
It is not possible without
 modifying
   DataFileWriter. Please open a JIRA
ticket.
   
It could not simply append to
 an
  OutputStream,
   since it must either:
* Seek to the start to
 validate the
  schemas
   match and find the sync
marker, or
* Trust that the schemas
 match and
  find the
   sync marker from the last
block
   
DataFileWriter cannot refer
 to Hadoop
  classes
   such as FileSystem, but we
could add something to the
 mapred
  module that
   takes a Path and
FileSystem and returns
 something that
   implemements an interface that
DataFileWriter can append
 to.
  This would
   be something that is both a
http://avro.apache.org/docs/1.6.2/api/java/org/apache/avro/file/SeekableInput.html
and an OutputStream, or has
 both an
  InputStream
   from the start of the
existing file and an
 OutputStream at
  the end.
   
Thanks,
Vyacheslav
   
On Feb 21, 2012, at 5:29
 AM, Harsh
  J
   wrote:
   
Hi,
   
Use the appendTo
 feature of
  the
   DataFileWriter. See
   
http://avro.apache.org/docs/1.6.2/api/java/org/apache/avro/file/DataFileWriter.html#appendTo(java.io.File)
   
For a quick setup
 example,
  read also:
   
http://stackoverflow.com/questions/8806689/can-you-append-data-to-an-existing-avro-data-file
   
On Tue, Feb 21, 2012
 at 3:15
  AM,
   Vyacheslav Zholudev
vyacheslav.zholu...@gmail.com
   wrote:
Hi,
   
is it possible to
 append

Re: Is it possible to append to an already existing avro file

2013-02-05 Thread Michael Malak
I don't believe a Hadoop FileSystem is a Java OutputStream?

--- On Tue, 2/5/13, Doug Cutting cutt...@apache.org wrote:

 From: Doug Cutting cutt...@apache.org
 Subject: Re: Is it possible to append to an already existing avro file
 To: user@avro.apache.org
 Date: Tuesday, February 5, 2013, 5:27 PM
 It will work on an OutputStream that
 supports append.
 
 http://avro.apache.org/docs/current/api/java/org/apache/avro/file/DataFileWriter.html#appendTo(org.apache.avro.file.SeekableInput,
 java.io.OutputStream)
 
 So it depends on how well HDFS implements
 FileSystem#append(), not on
 any changes in Avro.
 
 http://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#append(org.apache.hadoop.fs.Path)
 
 I have no recent personal experience with append in
 HDFS.  Does anyone
 else here?
 
 Doug
 
 On Tue, Feb 5, 2013 at 4:10 PM, Michael Malak michaelma...@yahoo.com
 wrote:
  My understanding is that will append to a file on the
 local filesystem, but not to a file on HDFS.
 
  --- On Tue, 2/5/13, Doug Cutting cutt...@apache.org
 wrote:
 
  From: Doug Cutting cutt...@apache.org
  Subject: Re: Is it possible to append to an already
 existing avro file
  To: user@avro.apache.org
  Date: Tuesday, February 5, 2013, 5:08 PM
  The Jira is:
 
  https://issues.apache.org/jira/browse/AVRO-1035
 
  It is possible to append to an existing Avro file:
 
  http://avro.apache.org/docs/current/api/java/org/apache/avro/file/DataFileWriter.html#appendTo(java.io.File)
 
  Should we close that issue as fixed?
 
  Doug
 
  On Fri, Feb 1, 2013 at 11:32 AM, Michael Malak
 michaelma...@yahoo.com
  wrote:
   Was a JIRA ticket ever created regarding
 appending to
  an existing Avro file on HDFS?
  
   What is the status of such a capability, a
 year out
  from when the issue below was raised?
  
   On Wed, 22 Feb 2012 10:57:48 +0100,
 Vyacheslav
  Zholudev vyacheslav.zholu...@gmail.com
  wrote:
  
   Thanks for your reply, I suspected this.
  
   I will create a JIRA ticket.
  
   Vyacheslav
  
   On Feb 21, 2012, at 6:02 PM, Scott Carey
 wrote:
  
  
   On 2/21/12 7:29 AM, Vyacheslav
 Zholudev
  vyacheslav.zholu...@gmail.com
   wrote:
  
   Yep, I saw that method as well as
 the
  stackoverflow post. However, I'm
   interested how to append to a file
 on the
  arbitrary file system, not
   only on the local one.
  
   I want to get an OutputStream
 based on the
  Path and the FileSystem
   implementation and then pass it
 for
  appending to avro methods.
  
   Is that possible?
  
   It is not possible without modifying
  DataFileWriter. Please open a JIRA
   ticket.
  
   It could not simply append to an
 OutputStream,
  since it must either:
   * Seek to the start to validate the
 schemas
  match and find the sync
   marker, or
   * Trust that the schemas match and
 find the
  sync marker from the last
   block
  
   DataFileWriter cannot refer to Hadoop
 classes
  such as FileSystem, but we
   could add something to the mapred
 module that
  takes a Path and
   FileSystem and returns something that
  implemements an interface that
   DataFileWriter can append to. 
 This would
  be something that is both a
   http://avro.apache.org/docs/1.6.2/api/java/org/apache/avro/file/SeekableInput.html
   and an OutputStream, or has both an
 InputStream
  from the start of the
   existing file and an OutputStream at
 the end.
  
   Thanks,
   Vyacheslav
  
   On Feb 21, 2012, at 5:29 AM, Harsh
 J
  wrote:
  
   Hi,
  
   Use the appendTo feature of
 the
  DataFileWriter. See
  
   http://avro.apache.org/docs/1.6.2/api/java/org/apache/avro/file/DataFileWriter.html#appendTo(java.io.File)
  
   For a quick setup example,
 read also:
  
   http://stackoverflow.com/questions/8806689/can-you-append-data-to-an-existing-avro-data-file
  
   On Tue, Feb 21, 2012 at 3:15
 AM,
  Vyacheslav Zholudev
   vyacheslav.zholu...@gmail.com
  wrote:
   Hi,
  
   is it possible to append
 to an
  already existing avro file when it was
   written and closed
 before?
  
   If I use
   outputStream =
  fs.append(avroFilePath);
  
   then later on I get:
  java.io.IOException: Invalid sync!
  
   Probably because the
 schema is
  written twice and some other issues.
  
   If I use outputStream =
  fs.create(avroFilePath); then the avro file
   gets
   overwritten.
  
   Thanks,
   Vyacheslav
  
   --
   Harsh J
   Customer Ops. Engineer
   Cloudera | http://tiny.cloudera.com/about
  
 
  On Fri, Feb 1, 2013 at 11:32 AM, Michael Malak
 michaelma...@yahoo.com
  wrote:
   Was a JIRA ticket ever created regarding
 appending to
  an existing Avro file on HDFS?
  
   What is the status of such a capability, a
 year out
  from when the issue below was raised?
  
   On Wed, 22 Feb 2012 10:57:48 +0100,
 Vyacheslav
  Zholudev vyacheslav.zholu...@gmail.com
  wrote:
  
   Thanks for your reply, I suspected this.
  
   I will create a JIRA ticket.
  
   Vyacheslav
  
   On Feb 21, 2012, at 6:02 PM, Scott Carey
 wrote:
  
  
   On 2/21/12 7:29 AM, Vyacheslav
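
For the simpler local-filesystem case that Doug points to in the thread above, 
DataFileWriter.appendTo(java.io.File) re-opens a previously closed Avro file and 
appends after its last block. A minimal sketch follows; the file name 
existing.avro and the Pair schema are made up for illustration, and the schema 
passed to the DatumWriter must match the one already stored in the file.

import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class LocalAvroAppend {
  public static void main(String[] args) throws IOException {
    // Illustrative schema; it must match the schema already stored in the file.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Pair\",\"fields\":["
        + "{\"name\":\"x\",\"type\":\"int\"},{\"name\":\"y\",\"type\":\"int\"}]}");

    DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(
        new GenericDatumWriter<GenericRecord>(schema));
    // Re-opens an existing, previously closed Avro file and appends after its last block.
    writer.appendTo(new File("existing.avro"));

    GenericRecord r = new GenericData.Record(schema);
    r.put("x", 1);
    r.put("y", 2);
    writer.append(r);
    writer.close();
  }
}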

Hard-coded inline relations

2013-01-18 Thread Michael Malak
I'm new to Pig, and it looks like there is no provision to declare relations 
inline in a Pig script (that is, without LOADing from an external file). Is that 
correct?

Based on
http://pig.apache.org/docs/r0.7.0/piglatin_ref2.html#Constants
I would have thought the following would constitute Hello World for Pig:

A = {('Hello'),('World')};
DUMP A;

But I get a syntax error.  The ability to inline relations would be useful for 
debugging.  Is this limitation by design, or is it just not implemented yet?
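
As far as I can tell there is no relation-literal syntax in Pig Latin, so the 
usual workaround (a suggestion on my part, not something from this thread) is to 
put the constants in a small file and LOAD it:

-- hypothetical workaround: write the constants to a file first, e.g.
--   printf 'Hello\nWorld\n' > hello.txt
A = LOAD 'hello.txt' USING PigStorage() AS (word:chararray);
DUMP A;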