Re: Alternatives for dataframe collectAsList()

2017-04-04 Thread lucas.g...@gmail.com
As Keith said, it depends on what you want to do with your data.

>From a pipelining perspective the general flow (YMMV) is:

Load dataset(s) -> Transform and / or Join --> Aggregate --> Write dataset

Each step in the pipeline does something distinct with the data.

The end step is usually loading the final data into something that can
display / query it (IE a DBMS of some sort)

That's where you'd start doing your queries etc.

There's generally no 'good' IMO reason to be collecting your data on the
driver except for testing / validation / exploratory work.

I hope that helps.

Gary Lucas

On 4 April 2017 at 12:12, Keith Chapman  wrote:

> As Paul said it really depends on what you want to do with your data,
> perhaps writing it to a file would be a better option, but again it depends
> on what you want to do with the data you collect.
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>
> On Tue, Apr 4, 2017 at 7:38 AM, Eike von Seggern <
> eike.segg...@sevenval.com> wrote:
>
>> Hi,
>>
>> depending on what you're trying to achieve `RDD.toLocalIterator()` might
>> help you.
>>
>> Best
>>
>> Eike
>>
>>
>> 2017-03-29 21:00 GMT+02:00 szep.laszlo.it :
>>
>>> Hi,
>>>
>>> after I created a dataset
>>>
>>> Dataset df = sqlContext.sql("query");
>>>
>>> I need to have a result values and I call a method: collectAsList()
>>>
>>> List list = df.collectAsList();
>>>
>>> But it's very slow, if I work with large datasets (20-30 million
>>> records). I
>>> know, that the result isn't presented in driver app, that's why it takes
>>> long time, because collectAsList() collect all data from worker nodes.
>>>
>>> But then what is the right way to get result values? Is there an other
>>> solution to iterate over a result dataset rows, or get values? Can anyone
>>> post a small & working example?
>>>
>>> Thanks & Regards,
>>> Laszlo Szep
>>>
>>
>


Re: What is the best way to run a scheduled spark batch job on AWS EC2 ?

2017-04-12 Thread lucas.g...@gmail.com
"Building data products is a very different discipline from that of
building software."

That is a fundamentally incorrect assumption.

There will always be a need for figuring out how to apply said principles,
but saying 'we're different' has always turned out to be incorrect and I
have seen no reason to think otherwise for data products.

At some point it always comes down to 'how do I get this to my customer, in
a reliable and repeatable fashion'.  The CI/CD patterns that we've come to
rely on are designed to optimize that process.

I have seen no evidence that 'data products' don't benefit from those
practices and I have definitely seen evidence that not following those
patterns has had substantial costs.

Of course there's always a balancing act in the early phases of discovery,
but at some point the needle swings from: "Do I have a valuable product"
to: "How do I get this to customers"

Gary Lucas

On 12 April 2017 at 10:46, Steve Loughran  wrote:

>
> On 12 Apr 2017, at 17:25, Gourav Sengupta 
> wrote:
>
> Hi,
>
> Your answer is like saying, I know how to code in assembly level language
> and I am going to build the next GUI in assembly level code and I think
> that there is a genuine functional requirement to see a color of a button
> in green on the screen.
>
>
> well, I reserve the right to have incomplete knowledge, and look forward
> to improving it.
>
> Perhaps it may be pertinent to read the first preface of a CI/ CD book and
> realize to what kind of software development disciplines is it applicable
> to.
>
>
> the original introduction on CI was probably Fowler's Cruise Control
> article,
> https://martinfowler.com/articles/originalContinuousIntegration.html
>
> "The key is to automate absolutely everything and run the process so often
> that integration errors are found quickly"
>
> Java Development with Ant, 2003, looks at Cruise Control, Anthill and
> Gump, again, with that focus on team coding and automated regression
> testing, both of unit tests, and, with things like HttpUnit, web UIs.
> There's no discussion of "Data" per-se, though databases are implicit.
>
> Apache Gump [Sam Ruby, 2001] was designed to address a single problem "get
> the entire ASF project portfolio to build and test against the latest build
> of everything else". Lots of finger pointing there, especially when
> something foundational like Ant or Xerces did bad.
>
> AFAIK, The earliest known in-print reference to Continuous Deployme3nt is
> the HP Labs 2002 paper, *Making Web Services that Work*. That introduced
> the concept with a focus on automating deployment, staging testing and
> treating ops problems as use cases for which engineers could often write
> tests for, and, perhaps, even design their applications to support. "We are
> exploring extending this model to one we term Continuous Deployment —after
> passing the local test suite, a service can be automatically deployed to a
> public staging server for stress and acceptance testing by physically
> remote calling parties"
>
> At this time, the applications weren't modern "big data" apps as they
> didn't have affordable storage or the tools to schedule work over it. It
> wasn't that the people writing the books and papers looked at big data and
> said "not for us", it just wasn't on their horizons. 1TB was a lot of
> storage in those days, not a high-end SSD.
>
> Otherwise your approach is just another line of defense in saving your job
> by applying an impertinent, incorrect, and outdated skill and tool to a
> problem.
>
>
> please be a bit more constructive here, the ASF code of conduct encourages
> empathy and coillaboration. https://www.apache.org/foundation/
> policies/conduct . Thanks.,
>
>
> Building data products is a very different discipline from that of
> building software.
>
>
> Which is why we ned to consider how to take what are core methodologies
> for software and apply them, and, where appropriate, supercede them with
> new workflows, ideas, technologies. But doing so with an understanding of
> the reasoning behind today's tools and workflows. I'm really interested in
> how do we get from experimental notebook code to something usable in
> production, pushing it out, finding the dirty-data-problems before it goes
> live, etc, etc. I do think today's tools have been outgrown by the
> applications we now build, and am thinking not so much "which tools to
> use', but one step further, "what are the new tools and techniques to
> use?".
>
> I look forward to whatever insight people have here.
>
>
> My genuine advice to everyone in all spheres of activities will be to
> first understand the problem to solve before solving it and definitely
> before selecting the tools to solve it, otherwise you will land up with a
> bowl of soup and fork in hand and argue that CI/ CD is still applicable to
> building data products and data warehousing.
>
>
> I concur
>
> Regards,
> Gourav
>
>
> -Steve
>
> On 

Re: Does spark 2.1.0 structured streaming support jdbc sink?

2017-04-09 Thread lucas.g...@gmail.com
Interesting, does anyone know if we'll be seeing the JDBC sinks in upcoming
releases?

Thanks!

Gary Lucas

On 9 April 2017 at 13:52, Silvio Fiorito 
wrote:

> JDBC sink is not in 2.1. You can see here for an example implementation
> using the ForEachWriter sink instead: https://databricks.com/blog/20
> 17/04/04/real-time-end-to-end-integration-with-apache-kafka-
> in-apache-sparks-structured-streaming.html
>
>
>
>
>
> *From: *Hemanth Gudela 
> *Date: *Sunday, April 9, 2017 at 4:30 PM
> *To: *"user@spark.apache.org" 
> *Subject: *Does spark 2.1.0 structured streaming support jdbc sink?
>
>
>
> Hello Everyone,
>
> I am new to Spark, especially spark streaming.
>
>
>
> I am trying to read an input stream from Kafka, perform windowed
> aggregations in spark using structured streaming, and finally write
> aggregates to a sink.
>
> -  MySQL as an output sink doesn’t seem to be an option, because
> this block of code throws an error
>
> streamingDF.writeStream.format("jdbc").start("jdbc:mysql…”)
>
> *ava.lang.UnsupportedOperationException*: Data source jdbc does not
> support streamed writing
>
> This is strange because, this
> 
> document shows that jdbc is supported as an output sink!
>
>
>
> -  Parquet doesn’t seem to be an option, because it doesn’t
> support “complete” output mode, but “append” only. As I’m preforming
> windows aggregations in spark streaming, the output mode has to be
> complete, and cannot be “append”
>
>
>
> -  Memory and console sinks are good for debugging, but are not
> suitable for production jobs.
>
>
>
> So, please correct me if I’m missing something in my code to enable jdbc
> output sink.
>
> If jdbc output sink is not option, please suggest me an alternative output
> sink that suits my needs better.
>
>
>
> Or since structured streaming is still ‘alpha’, should I resort to spark
> dstreams to achieve my use case described above.
>
> Please suggest.
>
>
>
> Thanks in advance,
>
> Hemanth
>


Re: SPIP: Spark on Kubernetes

2017-08-15 Thread lucas.g...@gmail.com
>From our perspective, we have invested heavily in Kubernetes as our cluster
manager of choice.

We also make quite heavy use of spark.  We've been experimenting with using
these builds (2.1 with pyspark enabled) quite heavily.  Given that we've
already 'paid the price' to operate Kubernetes in AWS it seems rational to
move our jobs over to spark on k8s.  Having this project merged into the
master will significantly ease keeping our Data Munging toolchain primarily
on Spark.


Gary Lucas
Data Ops Team Lead
Unbounce

On 15 August 2017 at 15:52, Andrew Ash  wrote:

> +1 (non-binding)
>
> We're moving large amounts of infrastructure from a combination of open
> source and homegrown cluster management systems to unify on Kubernetes and
> want to bring Spark workloads along with us.
>
> On Tue, Aug 15, 2017 at 2:29 PM, liyinan926  wrote:
>
>> +1 (non-binding)
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-developers
>> -list.1001551.n3.nabble.com/SPIP-Spark-on-Kubernetes-tp22147p22164.html
>> Sent from the Apache Spark Developers List mailing list archive at
>> Nabble.com.
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>
>


Re: With 2.2.0 PySpark is now available for pip install from PyPI :)

2017-07-12 Thread lucas.g...@gmail.com
That's great!



On 12 July 2017 at 12:41, Felix Cheung  wrote:

> Awesome! Congrats!!
>
> --
> *From:* holden.ka...@gmail.com  on behalf of
> Holden Karau 
> *Sent:* Wednesday, July 12, 2017 12:26:00 PM
> *To:* user@spark.apache.org
> *Subject:* With 2.2.0 PySpark is now available for pip install from PyPI
> :)
>
> Hi wonderful Python + Spark folks,
>
> I'm excited to announce that with Spark 2.2.0 we finally have PySpark
> published on PyPI (see https://pypi.python.org/pypi/pyspark /
> https://twitter.com/holdenkarau/status/885207416173756417). This has been
> a long time coming (previous releases included pip installable artifacts
> that for a variety of reasons couldn't be published to PyPI). So if you (or
> your friends) want to be able to work with PySpark locally on your laptop
> you've got an easier path getting started (pip install pyspark).
>
> If you are setting up a standalone cluster your cluster will still need
> the "full" Spark packaging, but the pip installed PySpark should be able to
> work with YARN or an existing standalone cluster installation (of the same
> version).
>
> Happy Sparking y'all!
>
> Holden :)
>
>
> --
> Cell : 425-233-8271 <(425)%20233-8271>
> Twitter: https://twitter.com/holdenkarau
>


Re: Flatten JSON to multiple columns in Spark

2017-07-18 Thread lucas.g...@gmail.com
I've been wondering about this for awhile.

We wanted to do something similar for generically saving thousands of
individual homogenous events into well formed parquet.

Ultimately I couldn't find something I wanted to own and pushed back on the
requirements.

It seems the canonical answer is that you need to 'own' the schema of the
json and parse it out manually and into your dataframe.  There's nothing
challenging about it.  Just verbose code.  If you're 'info' is a consistent
schema then you'll be fine.  For us it was 12 wildly diverging schemas and
I didn't want to own the transforms.

I also recommend persisting anything that isn't part of your schema in an
'extras field'  So when you parse out your json, if you've got anything
leftover drop it in there for later analysis.

I can provide some sample code but I think it's pretty straightforward /
you can google it.

What you can't seem to do efficiently is dynamically generate a dataframe
from random JSON.


On 18 July 2017 at 01:57, Chetan Khatri  wrote:

> Implicit tried - didn't worked!
>
> from_json - didnt support spark 2.0.1 any alternate solution would be
> welcome please
>
>
> On Tue, Jul 18, 2017 at 12:18 PM, Georg Heiler 
> wrote:
>
>> You need to have spark implicits in scope
>> Richard Xin  schrieb am Di. 18. Juli
>> 2017 um 08:45:
>>
>>> I believe you could use JOLT (bazaarvoice/jolt
>>> ) to flatten it to a json string
>>> and then to dataframe or dataset.
>>>
>>> bazaarvoice/jolt
>>>
>>> jolt - JSON to JSON transformation library written in Java.
>>> 
>>>
>>>
>>>
>>>
>>> On Monday, July 17, 2017, 11:18:24 PM PDT, Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
>>>
>>> Explode is not working in this scenario with error - string cannot be
>>> used in explore either array or map in spark
>>> On Tue, Jul 18, 2017 at 11:39 AM, 刘虓  wrote:
>>>
>>> Hi,
>>> have you tried to use explode?
>>>
>>> Chetan Khatri  于2017年7月18日 周二下午2:06写道:
>>>
>>> Hello Spark Dev's,
>>>
>>> Can you please guide me, how to flatten JSON to multiple columns in
>>> Spark.
>>>
>>> *Example:*
>>>
>>> Sr No Title ISBN Info
>>> 1 Calculus Theory 1234567890
>>>
>>> [{"cert":[{
>>> "authSbmtr":"009415da-c8cd- 418d-869e-0a19601d79fa",
>>> 009415da-c8cd-418d-869e- 0a19601d79fa
>>> "certUUID":"03ea5a1a-5530- 4fa3-8871-9d1ebac627c4",
>>>
>>> "effDt":"2016-05-06T15:04:56. 279Z",
>>>
>>>
>>> "fileFmt":"rjrCsv","status":" live"}],
>>>
>>> "expdCnt":"15",
>>> "mfgAcctNum":"531093",
>>>
>>> "oUUID":"23d07397-4fbe-4897- 8a18-b79c9f64726c",
>>>
>>>
>>> "pgmRole":["RETAILER"],
>>> "pgmUUID":"1cb5dd63-817a-45bc- a15c-5660e4accd63",
>>> "regUUID":"cc1bd898-657d-40dc- af5d-4bf1569a1cc4",
>>> "rtlrsSbmtd":["009415da-c8cd- 418d-869e-0a19601d79fa"]}]
>>>
>>> I want to get single row with 11 columns.
>>>
>>> Thanks.
>>>
>>>
>


Re: Flatten JSON to multiple columns in Spark

2017-07-18 Thread lucas.g...@gmail.com
That's a great link Michael, thanks!

For us it was around attempting to provide for dynamic schemas which is a
bit of an anti-pattern.

Ultimately it just comes down to owning your transforms, all the basic
tools are there.



On 18 July 2017 at 11:03, Michael Armbrust <mich...@databricks.com> wrote:

> Here is an overview of how to work with complex JSON in Spark:
> https://databricks.com/blog/2017/02/23/working-complex-data-formats-
> structured-streaming-apache-spark-2-1.html (works in streaming and batch)
>
> On Tue, Jul 18, 2017 at 10:29 AM, Riccardo Ferrari <ferra...@gmail.com>
> wrote:
>
>> What's against:
>>
>> df.rdd.map(...)
>>
>> or
>>
>> dataset.foreach()
>>
>> https://spark.apache.org/docs/2.0.1/api/scala/index.html#org
>> .apache.spark.sql.Dataset@foreach(f:T=>Unit):Unit
>>
>> Best,
>>
>> On Tue, Jul 18, 2017 at 6:46 PM, lucas.g...@gmail.com <
>> lucas.g...@gmail.com> wrote:
>>
>>> I've been wondering about this for awhile.
>>>
>>> We wanted to do something similar for generically saving thousands of
>>> individual homogenous events into well formed parquet.
>>>
>>> Ultimately I couldn't find something I wanted to own and pushed back on
>>> the requirements.
>>>
>>> It seems the canonical answer is that you need to 'own' the schema of
>>> the json and parse it out manually and into your dataframe.  There's
>>> nothing challenging about it.  Just verbose code.  If you're 'info' is a
>>> consistent schema then you'll be fine.  For us it was 12 wildly diverging
>>> schemas and I didn't want to own the transforms.
>>>
>>> I also recommend persisting anything that isn't part of your schema in
>>> an 'extras field'  So when you parse out your json, if you've got anything
>>> leftover drop it in there for later analysis.
>>>
>>> I can provide some sample code but I think it's pretty straightforward /
>>> you can google it.
>>>
>>> What you can't seem to do efficiently is dynamically generate a
>>> dataframe from random JSON.
>>>
>>>
>>> On 18 July 2017 at 01:57, Chetan Khatri <chetan.opensou...@gmail.com>
>>> wrote:
>>>
>>>> Implicit tried - didn't worked!
>>>>
>>>> from_json - didnt support spark 2.0.1 any alternate solution would be
>>>> welcome please
>>>>
>>>>
>>>> On Tue, Jul 18, 2017 at 12:18 PM, Georg Heiler <
>>>> georg.kf.hei...@gmail.com> wrote:
>>>>
>>>>> You need to have spark implicits in scope
>>>>> Richard Xin <richardxin...@yahoo.com.invalid> schrieb am Di. 18. Juli
>>>>> 2017 um 08:45:
>>>>>
>>>>>> I believe you could use JOLT (bazaarvoice/jolt
>>>>>> <https://github.com/bazaarvoice/jolt>) to flatten it to a json
>>>>>> string and then to dataframe or dataset.
>>>>>>
>>>>>> bazaarvoice/jolt
>>>>>>
>>>>>> jolt - JSON to JSON transformation library written in Java.
>>>>>> <https://github.com/bazaarvoice/jolt>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Monday, July 17, 2017, 11:18:24 PM PDT, Chetan Khatri <
>>>>>> chetan.opensou...@gmail.com> wrote:
>>>>>>
>>>>>>
>>>>>> Explode is not working in this scenario with error - string cannot be
>>>>>> used in explore either array or map in spark
>>>>>> On Tue, Jul 18, 2017 at 11:39 AM, 刘虓 <ipf...@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>> have you tried to use explode?
>>>>>>
>>>>>> Chetan Khatri <chetan.opensou...@gmail.com> 于2017年7月18日 周二下午2:06写道:
>>>>>>
>>>>>> Hello Spark Dev's,
>>>>>>
>>>>>> Can you please guide me, how to flatten JSON to multiple columns in
>>>>>> Spark.
>>>>>>
>>>>>> *Example:*
>>>>>>
>>>>>> Sr No Title ISBN Info
>>>>>> 1 Calculus Theory 1234567890
>>>>>>
>>>>>> [{"cert":[{
>>>>>> "authSbmtr":"009415da-c8cd- 418d-869e-0a19601d79fa",
>>>>>> 009415da-c8cd-418d-869e- 0a19601d79fa
>>>>>> "certUUID":"03ea5a1a-5530- 4fa3-8871-9d1ebac627c4",
>>>>>>
>>>>>> "effDt":"2016-05-06T15:04:56. 279Z",
>>>>>>
>>>>>>
>>>>>> "fileFmt":"rjrCsv","status":" live"}],
>>>>>>
>>>>>> "expdCnt":"15",
>>>>>> "mfgAcctNum":"531093",
>>>>>>
>>>>>> "oUUID":"23d07397-4fbe-4897- 8a18-b79c9f64726c",
>>>>>>
>>>>>>
>>>>>> "pgmRole":["RETAILER"],
>>>>>> "pgmUUID":"1cb5dd63-817a-45bc- a15c-5660e4accd63",
>>>>>> "regUUID":"cc1bd898-657d-40dc- af5d-4bf1569a1cc4",
>>>>>> "rtlrsSbmtd":["009415da-c8cd- 418d-869e-0a19601d79fa"]}]
>>>>>>
>>>>>> I want to get single row with 11 columns.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>>
>>>>
>>>
>>
>


Re: Spark Testing Library Discussion

2017-04-25 Thread lucas.g...@gmail.com
Hi all, whoever (Sam I think) was going to do some work on doing a template
testing pipeline.  I'd love to be involved, I have a current task in my day
job (data engineer) to flesh out our testing how-to / best practices for
Spark jobs and I think I'll be doing something very similar for the next
week or 2.

I'll scrape out what i have now in the next day or so and put it up in a
gist that I can share too.

G

On 25 April 2017 at 13:04, Holden Karau  wrote:

> Urgh hangouts did something frustrating, updated link
> https://hangouts.google.com/hangouts/_/ha6kusycp5fvzei2trhay4uhhqe
>
> On Mon, Apr 24, 2017 at 12:13 AM, Holden Karau 
> wrote:
>
>> The (tentative) link for those interested is https://hangouts.google.com
>> /hangouts/_/oyjvcnffejcjhi6qazf3lysypue .
>>
>> On Mon, Apr 24, 2017 at 12:02 AM, Holden Karau 
>> wrote:
>>
>>> So 14 people have said they are available on Tuesday the 25th at 1PM
>>> pacific so we will do this meeting then ( https://doodle.com/poll/69y6
>>> yab4pyf7u8bn ).
>>>
>>> Since hangouts tends to work ok on the Linux distro I'm running my
>>> default is to host this as a "hangouts-on-air" unless there are alternative
>>> ideas.
>>>
>>> I'll record the hangout and if it isn't terrible I'll post it for those
>>> who weren't able to make it (and for next time I'll include more European
>>> friendly time options - Doodle wouldn't let me update it once posted).
>>>
>>> On Fri, Apr 14, 2017 at 11:17 AM, Holden Karau 
>>> wrote:
>>>
 Hi Spark Users (+ Some Spark Testing Devs on BCC),

 Awhile back on one of the many threads about testing in Spark there was
 some interest in having a chat about the state of Spark testing and what
 people want/need.

 So if you are interested in joining an online (with maybe an IRL
 component if enough people are SF based) chat about Spark testing please
 fill out this doodle - https://doodle.com/poll/69y6yab4pyf7u8bn

 I think reasonable topics of discussion could be:

 1) What is the state of the different Spark testing libraries in the
 different core (Scala, Python, R, Java) and extended languages (C#,
 Javascript, etc.)?
 2) How do we make these more easily discovered by users?
 3) What are people looking for in their testing libraries that we are
 missing? (can be functionality, documentation, etc.)
 4) Are there any examples of well tested open source Spark projects and
 where are they?

 If you have other topics that's awesome.

 To clarify this about libraries and best practices for people testing
 their Spark applications, and less about testing Spark's internals
 (although as illustrated by some of the libraries there is some strong
 overlap in what is required to make that work).

 Cheers,

 Holden :)

 --
 Cell : 425-233-8271 <(425)%20233-8271>
 Twitter: https://twitter.com/holdenkarau

>>>
>>>
>>>
>>> --
>>> Cell : 425-233-8271 <(425)%20233-8271>
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>
>>
>>
>> --
>> Cell : 425-233-8271 <(425)%20233-8271>
>> Twitter: https://twitter.com/holdenkarau
>>
>
>
>
> --
> Cell : 425-233-8271 <(425)%20233-8271>
> Twitter: https://twitter.com/holdenkarau
>


Re: Spark Testing Library Discussion

2017-04-28 Thread lucas.g...@gmail.com
Awesome, thanks.

Just reading your post

A few observations:
1) You're giving out Marius's email: "I have been lucky enough to
build this pipeline with the amazing Marius Feteanu".  A linked or
github link might be more helpful.

2) "If you are in Pyspark world sadly Holden’s test base wont work so
I suggest you check out Pytest and pytest-bdd.".  doesn't read well to
me, on first read I was wondering if Spark-Test-Base wasn't available
in python... It took me about 20 seconds to figure out that you
probably meant it doesn't allow for direct BDD semantics.  My 2nd
observation here is that BDD semantics can be aped in any given
testing framework.  You just need to be flexible :)

3) You're doing a transformation (IE JSON input against a JSON
schema).  You are testing for # of rows which is a good start.  But I
don't think that really exercises a test against your JSON schema. I
tend to view schema as the things that need the most rigorous testing
(it's code after all).  IE I would want to confirm that the output
matches the expected shape and values after being loaded against the
schema.

I saw a few minor spelling and grammatical issues as well.  I put a PR
into your blog for them.  I won't be offended if you squish it :)

I should be getting into our testing 'how-to' stuff this week.  I'll
scrape our org specific stuff and put it up to github this week as
well.  It'll be in python so maybe we'll get both use cases covered
with examples :)

G

On 27 April 2017 at 03:46, Sam Elamin <hussam.ela...@gmail.com> wrote:
> Hi
>
> @Lucas I certainly would love to write an integration testing library for
> workflows, I have a few ideas I would love to share with others and they are
> focused around Airflow since that is what we use
>
>
> As promised here is the first blog post in a series of posts I hope to write
> on how we build data pipelines
>
> Please feel free to retweet my original tweet and share because the more
> ideas we have the better!
>
> Feedback is always welcome!
>
> Regards
> Sam
>
> On Tue, Apr 25, 2017 at 10:32 PM, lucas.g...@gmail.com
> <lucas.g...@gmail.com> wrote:
>>
>> Hi all, whoever (Sam I think) was going to do some work on doing a
>> template testing pipeline.  I'd love to be involved, I have a current task
>> in my day job (data engineer) to flesh out our testing how-to / best
>> practices for Spark jobs and I think I'll be doing something very similar
>> for the next week or 2.
>>
>> I'll scrape out what i have now in the next day or so and put it up in a
>> gist that I can share too.
>>
>> G
>>
>> On 25 April 2017 at 13:04, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>
>>> Urgh hangouts did something frustrating, updated link
>>> https://hangouts.google.com/hangouts/_/ha6kusycp5fvzei2trhay4uhhqe
>>>
>>> On Mon, Apr 24, 2017 at 12:13 AM, Holden Karau <hol...@pigscanfly.ca>
>>> wrote:
>>>>
>>>> The (tentative) link for those interested is
>>>> https://hangouts.google.com/hangouts/_/oyjvcnffejcjhi6qazf3lysypue .
>>>>
>>>> On Mon, Apr 24, 2017 at 12:02 AM, Holden Karau <hol...@pigscanfly.ca>
>>>> wrote:
>>>>>
>>>>> So 14 people have said they are available on Tuesday the 25th at 1PM
>>>>> pacific so we will do this meeting then (
>>>>> https://doodle.com/poll/69y6yab4pyf7u8bn ).
>>>>>
>>>>> Since hangouts tends to work ok on the Linux distro I'm running my
>>>>> default is to host this as a "hangouts-on-air" unless there are 
>>>>> alternative
>>>>> ideas.
>>>>>
>>>>> I'll record the hangout and if it isn't terrible I'll post it for those
>>>>> who weren't able to make it (and for next time I'll include more European
>>>>> friendly time options - Doodle wouldn't let me update it once posted).
>>>>>
>>>>> On Fri, Apr 14, 2017 at 11:17 AM, Holden Karau <hol...@pigscanfly.ca>
>>>>> wrote:
>>>>>>
>>>>>> Hi Spark Users (+ Some Spark Testing Devs on BCC),
>>>>>>
>>>>>> Awhile back on one of the many threads about testing in Spark there
>>>>>> was some interest in having a chat about the state of Spark testing and 
>>>>>> what
>>>>>> people want/need.
>>>>>>
>>>>>> So if you are interested in joining an online (with maybe an IRL
>>>>>> component if enough people are SF based) chat about Spark testing please
>>>>>> fill out this doo

Re: Spark Testing Library Discussion

2017-04-29 Thread lucas.g...@gmail.com
Oh interesting. I did send a PR or thought I did will check this eve.

On Apr 29, 2017 10:04 AM, "Sam Elamin" <hussam.ela...@gmail.com> wrote:

> Hi lucas
>
>
> Thanks for the detailed feedback, that's really useful!
>
> I did suggest Github but my colleague asked for an email
>
> You raise a good point with the grammar, sure I will rephrase it. I am
> more than happy to merge in the PR if you send it
>
>
> Th at said I know you can make BDD tests using any framework but I am a
> lazy developer and would rather use the framework or library defaults to
> make it easier for other devs to pick up.
>
> The number of rows is only a start correct, we can add more tests to check
> the transformed version but I was going to point that out on the future
> part of the series since this one is mainly about raw extracts.
>
>
> Thank you very much for the feedback and I will be sure to add it once I
> have more feedback
>
>
> Maybe we can create a gist of all this or even a tiny book on best
> practices if people find it useful
>
> Looking forward to the PR!
>
> Regards
> Sam
>
>
>
>
>
> On Sat, 29 Apr 2017 at 06:36, lucas.g...@gmail.com <lucas.g...@gmail.com>
> wrote:
>
>> Awesome, thanks.
>>
>> Just reading your post
>>
>> A few observations:
>> 1) You're giving out Marius's email: "I have been lucky enough to
>> build this pipeline with the amazing Marius Feteanu".  A linked or
>> github link might be more helpful.
>>
>> 2) "If you are in Pyspark world sadly Holden’s test base wont work so
>> I suggest you check out Pytest and pytest-bdd.".  doesn't read well to
>> me, on first read I was wondering if Spark-Test-Base wasn't available
>> in python... It took me about 20 seconds to figure out that you
>> probably meant it doesn't allow for direct BDD semantics.  My 2nd
>> observation here is that BDD semantics can be aped in any given
>> testing framework.  You just need to be flexible :)
>>
>> 3) You're doing a transformation (IE JSON input against a JSON
>> schema).  You are testing for # of rows which is a good start.  But I
>> don't think that really exercises a test against your JSON schema. I
>> tend to view schema as the things that need the most rigorous testing
>> (it's code after all).  IE I would want to confirm that the output
>> matches the expected shape and values after being loaded against the
>> schema.
>>
>> I saw a few minor spelling and grammatical issues as well.  I put a PR
>> into your blog for them.  I won't be offended if you squish it :)
>>
>> I should be getting into our testing 'how-to' stuff this week.  I'll
>> scrape our org specific stuff and put it up to github this week as
>> well.  It'll be in python so maybe we'll get both use cases covered
>> with examples :)
>>
>> G
>>
>> On 27 April 2017 at 03:46, Sam Elamin <hussam.ela...@gmail.com> wrote:
>> > Hi
>> >
>> > @Lucas I certainly would love to write an integration testing library
>> for
>> > workflows, I have a few ideas I would love to share with others and
>> they are
>> > focused around Airflow since that is what we use
>> >
>> >
>> > As promised here is the first blog post in a series of posts I hope to
>> write
>> > on how we build data pipelines
>> >
>> > Please feel free to retweet my original tweet and share because the more
>> > ideas we have the better!
>> >
>> > Feedback is always welcome!
>> >
>> > Regards
>> > Sam
>> >
>> > On Tue, Apr 25, 2017 at 10:32 PM, lucas.g...@gmail.com
>> > <lucas.g...@gmail.com> wrote:
>> >>
>> >> Hi all, whoever (Sam I think) was going to do some work on doing a
>> >> template testing pipeline.  I'd love to be involved, I have a current
>> task
>> >> in my day job (data engineer) to flesh out our testing how-to / best
>> >> practices for Spark jobs and I think I'll be doing something very
>> similar
>> >> for the next week or 2.
>> >>
>> >> I'll scrape out what i have now in the next day or so and put it up in
>> a
>> >> gist that I can share too.
>> >>
>> >> G
>> >>
>> >> On 25 April 2017 at 13:04, Holden Karau <hol...@pigscanfly.ca> wrote:
>> >>>
>> >>> Urgh hangouts did something frustrating, updated link
>> >>> https://hangouts.google.com/hangouts/_/ha6kusycp5fvzei2trhay4uhhqe
>> >>

Re: How can i remove the need for calling cache

2017-08-01 Thread lucas.g...@gmail.com
Hi Jeff, that looks sane to me.  Do you have additional details?

On 1 August 2017 at 11:05, jeff saremi  wrote:

> Calling cache/persist fails all our jobs (i have  posted 2 threads on
> this).
>
> And we're giving up hope in finding a solution.
> So I'd like to find a workaround for that:
>
> If I save an RDD to hdfs and read it back, can I use it in more than one
> operation?
>
> Example: (using cache)
> // do a whole bunch of transformations on an RDD
>
> myrdd.cache()
>
> val result1 = myrdd.map(op1(_))
>
> val result2 = myrdd.map(op2(_))
>
> // in the above I am assuming that a call to cache will prevent all
> previous transformation from being calculated twice
>
> I'd like to somehow get result1 and result2 without duplicating work. How
> can I do that?
>
> thanks
>
> Jeff
>


Spark <--> S3 flakiness

2017-05-10 Thread lucas.g...@gmail.com
Hi users, we have a bunch of pyspark jobs that are using S3 for loading /
intermediate steps and final output of parquet files.

We're running into the following issues on a semi regular basis:
* These are intermittent errors, IE we have about 300 jobs that run
nightly... And a fairly random but small-ish percentage of them fail with
the following classes of errors.


*S3 write errors*

> "ERROR Utils: Aborting task
> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS
> Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS Error
> Message: Not Found, S3 Extended Request ID: BlaBlahEtc="
>


> "Py4JJavaError: An error occurred while calling o43.parquet.
> : com.amazonaws.services.s3.model.MultiObjectDeleteException: Status Code:
> 0, AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS Error
> Message: One or more objects could not be deleted, S3 Extended Request ID:
> null"




*S3 Read Errors:*

> [Stage 1:=>   (27 + 4)
> / 31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in stage 1.0
> (TID 11)
> java.net.SocketException: Connection reset
> at java.net.SocketInputStream.read(SocketInputStream.java:196)
> at java.net.SocketInputStream.read(SocketInputStream.java:122)
> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
> at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
> at sun.security.ssl.InputRecord.read(InputRecord.java:509)
> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
> at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
> at
> org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
> at
> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
> at
> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:200)
> at
> org.apache.http.impl.io.ContentLengthInputStream.close(ContentLengthInputStream.java:103)
> at
> org.apache.http.conn.BasicManagedEntity.streamClosed(BasicManagedEntity.java:168)
> at
> org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
> at
> org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:174)
> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> at com.amazonaws.services.s3.model.S3Object.close(S3Object.java:203)
> at org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream.java:187)



We have literally tons of logs we can add but it would make the email
unwieldy big.  If it would be helpful I'll drop them in a pastebin or
something.

Our config is along the lines of:

   - spark-2.1.0-bin-hadoop2.7
   - '--packages
   com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0
   pyspark-shell'

Given the stack overflow / googling I've been doing I know we're not the
only org with these issues but I haven't found a good set of solutions in
those spaces yet.

Thanks!

Gary Lucas


Re: Spark <--> S3 flakiness

2017-05-11 Thread lucas.g...@gmail.com
Looks like this isn't viable in spark 2.0.0 (and greater I presume).  I'm
pretty sure I came across this blog and ignored it due to that.

Any other thoughts?  The linked tickets in:
https://issues.apache.org/jira/browse/SPARK-10063
https://issues.apache.org/jira/browse/HADOOP-13786
https://issues.apache.org/jira/browse/HADOOP-9565 look relevant too.

On 10 May 2017 at 22:24, Miguel Morales <therevolti...@gmail.com> wrote:

> Try using the DirectParquetOutputCommiter:
> http://dev.sortable.com/spark-directparquetoutputcommitter/
>
> On Wed, May 10, 2017 at 10:07 PM, lucas.g...@gmail.com
> <lucas.g...@gmail.com> wrote:
> > Hi users, we have a bunch of pyspark jobs that are using S3 for loading /
> > intermediate steps and final output of parquet files.
> >
> > We're running into the following issues on a semi regular basis:
> > * These are intermittent errors, IE we have about 300 jobs that run
> > nightly... And a fairly random but small-ish percentage of them fail with
> > the following classes of errors.
> >
> > S3 write errors
> >
> >> "ERROR Utils: Aborting task
> >> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404,
> AWS
> >> Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS
> Error
> >> Message: Not Found, S3 Extended Request ID: BlaBlahEtc="
> >
> >
> >>
> >> "Py4JJavaError: An error occurred while calling o43.parquet.
> >> : com.amazonaws.services.s3.model.MultiObjectDeleteException: Status
> Code:
> >> 0, AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS
> Error
> >> Message: One or more objects could not be deleted, S3 Extended Request
> ID:
> >> null"
> >
> >
> >
> > S3 Read Errors:
> >
> >> [Stage 1:=>   (27
> + 4)
> >> / 31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in stage
> 1.0
> >> (TID 11)
> >> java.net.SocketException: Connection reset
> >> at java.net.SocketInputStream.read(SocketInputStream.java:196)
> >> at java.net.SocketInputStream.read(SocketInputStream.java:122)
> >> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
> >> at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
> >> at sun.security.ssl.InputRecord.read(InputRecord.java:509)
> >> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
> >> at sun.security.ssl.SSLSocketImpl.readDataRecord(
> SSLSocketImpl.java:884)
> >> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
> >> at
> >> org.apache.http.impl.io.AbstractSessionInputBuffer.read(
> AbstractSessionInputBuffer.java:198)
> >> at
> >> org.apache.http.impl.io.ContentLengthInputStream.read(
> ContentLengthInputStream.java:178)
> >> at
> >> org.apache.http.impl.io.ContentLengthInputStream.read(
> ContentLengthInputStream.java:200)
> >> at
> >> org.apache.http.impl.io.ContentLengthInputStream.close(
> ContentLengthInputStream.java:103)
> >> at
> >> org.apache.http.conn.BasicManagedEntity.streamClosed(
> BasicManagedEntity.java:168)
> >> at
> >> org.apache.http.conn.EofSensorInputStream.checkClose(
> EofSensorInputStream.java:228)
> >> at
> >> org.apache.http.conn.EofSensorInputStream.close(
> EofSensorInputStream.java:174)
> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> >> at com.amazonaws.services.s3.model.S3Object.close(S3Object.java:203)
> >> at org.apache.hadoop.fs.s3a.S3AInputStream.close(
> S3AInputStream.java:187)
> >
> >
> >
> > We have literally tons of logs we can add but it would make the email
> > unwieldy big.  If it would be helpful I'll drop them in a pastebin or
> > something.
> >
> > Our config is along the lines of:
> >
> > spark-2.1.0-bin-hadoop2.7
> > '--packages
> > com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0
> > pyspark-shell'
> >
> > Given the stack overflow / googling I've been doing I know we're not the
> > only org with these issues but I haven't found a good set of solutions in
> > those spaces yet.
> >
> > Thanks!
> >
> > Gary Lucas
>


Re: Spark <--> S3 flakiness

2017-05-11 Thread lucas.g...@gmail.com
Also, and this is unrelated to the actual question... Why don't these
messages show up in the archive?

http://apache-spark-user-list.1001560.n3.nabble.com/

Ideally I'd want to post a link to our internal wiki for these questions,
but can't find them in the archive.

On 11 May 2017 at 07:16, lucas.g...@gmail.com <lucas.g...@gmail.com> wrote:

> Looks like this isn't viable in spark 2.0.0 (and greater I presume).  I'm
> pretty sure I came across this blog and ignored it due to that.
>
> Any other thoughts?  The linked tickets in: https://issues.apache.org/
> jira/browse/SPARK-10063 https://issues.apache.org/jira/browse/HADOOP-13786
>  https://issues.apache.org/jira/browse/HADOOP-9565 look relevant too.
>
> On 10 May 2017 at 22:24, Miguel Morales <therevolti...@gmail.com> wrote:
>
>> Try using the DirectParquetOutputCommiter:
>> http://dev.sortable.com/spark-directparquetoutputcommitter/
>>
>> On Wed, May 10, 2017 at 10:07 PM, lucas.g...@gmail.com
>> <lucas.g...@gmail.com> wrote:
>> > Hi users, we have a bunch of pyspark jobs that are using S3 for loading
>> /
>> > intermediate steps and final output of parquet files.
>> >
>> > We're running into the following issues on a semi regular basis:
>> > * These are intermittent errors, IE we have about 300 jobs that run
>> > nightly... And a fairly random but small-ish percentage of them fail
>> with
>> > the following classes of errors.
>> >
>> > S3 write errors
>> >
>> >> "ERROR Utils: Aborting task
>> >> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404,
>> AWS
>> >> Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS
>> Error
>> >> Message: Not Found, S3 Extended Request ID: BlaBlahEtc="
>> >
>> >
>> >>
>> >> "Py4JJavaError: An error occurred while calling o43.parquet.
>> >> : com.amazonaws.services.s3.model.MultiObjectDeleteException: Status
>> Code:
>> >> 0, AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS
>> Error
>> >> Message: One or more objects could not be deleted, S3 Extended Request
>> ID:
>> >> null"
>> >
>> >
>> >
>> > S3 Read Errors:
>> >
>> >> [Stage 1:=>   (27
>> + 4)
>> >> / 31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in stage
>> 1.0
>> >> (TID 11)
>> >> java.net.SocketException: Connection reset
>> >> at java.net.SocketInputStream.read(SocketInputStream.java:196)
>> >> at java.net.SocketInputStream.read(SocketInputStream.java:122)
>> >> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>> >> at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>> >> at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>> >> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>> >> at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.
>> java:884)
>> >> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>> >> at
>> >> org.apache.http.impl.io.AbstractSessionInputBuffer.read(Abst
>> ractSessionInputBuffer.java:198)
>> >> at
>> >> org.apache.http.impl.io.ContentLengthInputStream.read(Conten
>> tLengthInputStream.java:178)
>> >> at
>> >> org.apache.http.impl.io.ContentLengthInputStream.read(Conten
>> tLengthInputStream.java:200)
>> >> at
>> >> org.apache.http.impl.io.ContentLengthInputStream.close(Conte
>> ntLengthInputStream.java:103)
>> >> at
>> >> org.apache.http.conn.BasicManagedEntity.streamClosed(BasicMa
>> nagedEntity.java:168)
>> >> at
>> >> org.apache.http.conn.EofSensorInputStream.checkClose(EofSens
>> orInputStream.java:228)
>> >> at
>> >> org.apache.http.conn.EofSensorInputStream.close(EofSensorInp
>> utStream.java:174)
>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> >> at com.amazonaws.services.s3.model.S3Object.close(S3Object.java:203)
>> >> at org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream
>> .java:187)
>> >
>> >
>> >
>> > We have literally tons of logs we can add but it would make the email
>> > unwieldy big.  If it would be helpful I'll drop them in a pastebin or
>> > something.
>> >
>> > Our config is along the lines of:
>> >
>> > spark-2.1.0-bin-hadoop2.7
>> > '--packages
>> > com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0
>> > pyspark-shell'
>> >
>> > Given the stack overflow / googling I've been doing I know we're not the
>> > only org with these issues but I haven't found a good set of solutions
>> in
>> > those spaces yet.
>> >
>> > Thanks!
>> >
>> > Gary Lucas
>>
>
>


Multiple CSV libs causes issues spark 2.1

2017-05-09 Thread lucas.g...@gmail.com
>
> df = spark.sqlContext.read.csv('out/df_in.csv')
>


> 17/05/09 15:51:29 WARN ObjectStore: Version information not found in
> metastore. hive.metastore.schema.verification is not enabled so recording
> the schema version 1.2.0
> 17/05/09 15:51:29 WARN ObjectStore: Failed to get database default,
> returning NoSuchObjectException
> 17/05/09 15:51:30 WARN ObjectStore: Failed to get database global_temp,
> returning NoSuchObjectException
>


> Py4JJavaError: An error occurred while calling o72.csv.
> : java.lang.RuntimeException: Multiple sources found for csv 
> (*com.databricks.spark.csv.DefaultSource15,
> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat*), please
> specify the fully qualified class name.
> at scala.sys.package$.error(package.scala:27)
> at
> org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:591)
> at
> org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86)
> at
> org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86)
> at
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
> at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:415)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:280)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:214) at
> java.lang.Thread.run(Thread.java:745)


When I change our call to:

df = spark.hiveContext.read \
.format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')
\
.load('df_in.csv)

No such issue, I was under the impression (obviously wrongly) that spark
would automatically pick the local lib.  We have the databricks library
because other jobs still explicitly call it.

Is the 'correct answer' to go through and modify so as to remove the
databricks lib / remove it from our deploy?  Or should this just work?

One of the things I find less helpful in the spark docs are when there's
multiple ways to do it but no clear guidance on what those methods are
intended to accomplish.

Thanks!


Re: Multiple CSV libs causes issues spark 2.1

2017-05-09 Thread lucas.g...@gmail.com
I'm a bit confused by that answer, I'm assuming it's spark deciding which
lib to use.

On 9 May 2017 at 14:30, Mark Hamstra <m...@clearstorydata.com> wrote:

> This looks more like a matter for Databricks support than spark-user.
>
> On Tue, May 9, 2017 at 2:02 PM, lucas.g...@gmail.com <lucas.g...@gmail.com
> > wrote:
>
>> df = spark.sqlContext.read.csv('out/df_in.csv')
>>>
>>
>>
>>> 17/05/09 15:51:29 WARN ObjectStore: Version information not found in
>>> metastore. hive.metastore.schema.verification is not enabled so
>>> recording the schema version 1.2.0
>>> 17/05/09 15:51:29 WARN ObjectStore: Failed to get database default,
>>> returning NoSuchObjectException
>>> 17/05/09 15:51:30 WARN ObjectStore: Failed to get database global_temp,
>>> returning NoSuchObjectException
>>>
>>
>>
>>> Py4JJavaError: An error occurred while calling o72.csv.
>>> : java.lang.RuntimeException: Multiple sources found for csv 
>>> (*com.databricks.spark.csv.DefaultSource15,
>>> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat*), please
>>> specify the fully qualified class name.
>>> at scala.sys.package$.error(package.scala:27)
>>> at org.apache.spark.sql.execution.datasources.DataSource$.
>>> lookupDataSource(DataSource.scala:591)
>>> at org.apache.spark.sql.execution.datasources.DataSource.
>>> providingClass$lzycompute(DataSource.scala:86)
>>> at org.apache.spark.sql.execution.datasources.DataSource.
>>> providingClass(DataSource.scala:86)
>>> at org.apache.spark.sql.execution.datasources.DataSource.
>>> resolveRelation(DataSource.scala:325)
>>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
>>> at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:415)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>> ssorImpl.java:57)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>> thodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>> at py4j.Gateway.invoke(Gateway.java:280)
>>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>> at py4j.GatewayConnection.run(GatewayConnection.java:214) at
>>> java.lang.Thread.run(Thread.java:745)
>>
>>
>> When I change our call to:
>>
>> df = spark.hiveContext.read \
>> .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')
>> \
>> .load('df_in.csv)
>>
>> No such issue, I was under the impression (obviously wrongly) that spark
>> would automatically pick the local lib.  We have the databricks library
>> because other jobs still explicitly call it.
>>
>> Is the 'correct answer' to go through and modify so as to remove the
>> databricks lib / remove it from our deploy?  Or should this just work?
>>
>> One of the things I find less helpful in the spark docs are when there's
>> multiple ways to do it but no clear guidance on what those methods are
>> intended to accomplish.
>>
>> Thanks!
>>
>
>


Re: Multiple CSV libs causes issues spark 2.1

2017-05-09 Thread lucas.g...@gmail.com
>
> df = spark.sqlContext.read.csv('out/df_in.csv')
>

shouldn't this be just -
df = spark.read.csv('out/df_in.csv')
sparkSession itself is in entry point to dataframes and SQL functionality .



our bootstrap is a bit messy, in our case no.  In the general case yes.

On 9 May 2017 at 16:56, Pushkar.Gujar <pushkarvgu...@gmail.com> wrote:

> df = spark.sqlContext.read.csv('out/df_in.csv')
>>
>
> shouldn't this be just -
>
> df = spark.read.csv('out/df_in.csv')
>
> sparkSession itself is in entry point to dataframes and SQL functionality .
>
>
> Thank you,
> *Pushkar Gujar*
>
>
> On Tue, May 9, 2017 at 6:09 PM, Mark Hamstra <m...@clearstorydata.com>
> wrote:
>
>> Looks to me like it is a conflict between a Databricks library and Spark
>> 2.1. That's an issue for Databricks to resolve or provide guidance.
>>
>> On Tue, May 9, 2017 at 2:36 PM, lucas.g...@gmail.com <
>> lucas.g...@gmail.com> wrote:
>>
>>> I'm a bit confused by that answer, I'm assuming it's spark deciding
>>> which lib to use.
>>>
>>> On 9 May 2017 at 14:30, Mark Hamstra <m...@clearstorydata.com> wrote:
>>>
>>>> This looks more like a matter for Databricks support than spark-user.
>>>>
>>>> On Tue, May 9, 2017 at 2:02 PM, lucas.g...@gmail.com <
>>>> lucas.g...@gmail.com> wrote:
>>>>
>>>>> df = spark.sqlContext.read.csv('out/df_in.csv')
>>>>>>
>>>>>
>>>>>
>>>>>> 17/05/09 15:51:29 WARN ObjectStore: Version information not found in
>>>>>> metastore. hive.metastore.schema.verification is not enabled so
>>>>>> recording the schema version 1.2.0
>>>>>> 17/05/09 15:51:29 WARN ObjectStore: Failed to get database default,
>>>>>> returning NoSuchObjectException
>>>>>> 17/05/09 15:51:30 WARN ObjectStore: Failed to get database
>>>>>> global_temp, returning NoSuchObjectException
>>>>>>
>>>>>
>>>>>
>>>>>> Py4JJavaError: An error occurred while calling o72.csv.
>>>>>> : java.lang.RuntimeException: Multiple sources found for csv 
>>>>>> (*com.databricks.spark.csv.DefaultSource15,
>>>>>> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat*),
>>>>>> please specify the fully qualified class name.
>>>>>> at scala.sys.package$.error(package.scala:27)
>>>>>> at org.apache.spark.sql.execution.datasources.DataSource$.looku
>>>>>> pDataSource(DataSource.scala:591)
>>>>>> at org.apache.spark.sql.execution.datasources.DataSource.provid
>>>>>> ingClass$lzycompute(DataSource.scala:86)
>>>>>> at org.apache.spark.sql.execution.datasources.DataSource.provid
>>>>>> ingClass(DataSource.scala:86)
>>>>>> at org.apache.spark.sql.execution.datasources.DataSource.resolv
>>>>>> eRelation(DataSource.scala:325)
>>>>>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.sc
>>>>>> ala:152)
>>>>>> at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.sca
>>>>>> la:415)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>>>>> ssorImpl.java:57)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>>>>> thodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>>>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>>>> at py4j.Gateway.invoke(Gateway.java:280)
>>>>>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.j
>>>>>> ava:132)
>>>>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>>>> at py4j.GatewayConnection.run(GatewayConnection.java:214) at
>>>>>> java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>>
>>>>> When I change our call to:
>>>>>
>>>>> df = spark.hiveContext.read \
>>>>> 
>>>>> .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')
>>>>> \
>>>>> .load('df_in.csv)
>>>>>
>>>>> No such issue, I was under the impression (obviously wrongly) that
>>>>> spark would automatically pick the local lib.  We have the databricks
>>>>> library because other jobs still explicitly call it.
>>>>>
>>>>> Is the 'correct answer' to go through and modify so as to remove the
>>>>> databricks lib / remove it from our deploy?  Or should this just work?
>>>>>
>>>>> One of the things I find less helpful in the spark docs are when
>>>>> there's multiple ways to do it but no clear guidance on what those methods
>>>>> are intended to accomplish.
>>>>>
>>>>> Thanks!
>>>>>
>>>>
>>>>
>>>
>>
>


Re: [Spark Core]: Python and Scala generate different DAGs for identical code

2017-05-10 Thread lucas.g...@gmail.com
Any chance of a link to that video :)

Thanks!

G

On 10 May 2017 at 09:49, Holden Karau  wrote:

> So this Python side pipelining happens in a lot of places which can make
> debugging extra challenging. Some people work around this with persist
> which breaks the pipelining during debugging, but if your interested in
> more general Python debugging I've got a YouTube video on the topic which
> could be a good intro (of course I'm pretty biased about that).
>
> On Wed, May 10, 2017 at 9:42 AM Pavel Klemenkov 
> wrote:
>
>> Thanks for the quick answer, Holden!
>>
>> Are there any other tricks with PySpark which are hard to debug using UI
>> or toDebugString?
>>
>> On Wed, May 10, 2017 at 7:18 PM, Holden Karau 
>> wrote:
>>
>>> In PySpark the filter and then map steps are combined into a single
>>> transformation from the JVM point of view. This allows us to avoid copying
>>> the data back to Scala in between the filter and the map steps. The
>>> debugging exeperience is certainly much harder in PySpark and I think is an
>>> interesting area for those interested in contributing :)
>>>
>>> On Wed, May 10, 2017 at 7:33 AM pklemenkov  wrote:
>>>
 This Scala code:
 scala> val logs = sc.textFile("big_data_specialization/log.txt").
  | filter(x => !x.contains("INFO")).
  | map(x => (x.split("\t")(1), 1)).
  | reduceByKey((x, y) => x + y)

 generated obvious lineage:

 (2) ShuffledRDD[4] at reduceByKey at :27 []
  +-(2) MapPartitionsRDD[3] at map at :26 []
 |  MapPartitionsRDD[2] at filter at :25 []
 |  big_data_specialization/log.txt MapPartitionsRDD[1] at textFile
 at
 :24 []
 |  big_data_specialization/log.txt HadoopRDD[0] at textFile at
 :24 []

 But Python code:

 logs = sc.textFile("../log.txt")\
  .filter(lambda x: 'INFO' not in x)\
  .map(lambda x: (x.split('\t')[1], 1))\
  .reduceByKey(lambda x, y: x + y)

 generated something strange which is hard to follow:

 (2) PythonRDD[13] at RDD at PythonRDD.scala:48 []
  |  MapPartitionsRDD[12] at mapPartitions at PythonRDD.scala:422 []
  |  ShuffledRDD[11] at partitionBy at NativeMethodAccessorImpl.java:0
 []
  +-(2) PairwiseRDD[10] at reduceByKey at :1
 []
 |  PythonRDD[9] at reduceByKey at :1
 []
 |  ../log.txt MapPartitionsRDD[8] at textFile at
 NativeMethodAccessorImpl.java:0 []
 |  ../log.txt HadoopRDD[7] at textFile at
 NativeMethodAccessorImpl.java:0 []

 Why is that? Does pyspark do some optimizations under the hood? This
 debug
 string is really useless for debugging.



 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/Spark-Core-Python-and-Scala-
 generate-different-DAGs-for-identical-code-tp28674.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org

 --
>>> Cell : 425-233-8271 <(425)%20233-8271>
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>
>>
>>
>> --
>> Yours faithfully, Pavel Klemenkov.
>>
> --
> Cell : 425-233-8271 <(425)%20233-8271>
> Twitter: https://twitter.com/holdenkarau
>


Re: Spark <--> S3 flakiness

2017-05-17 Thread lucas.g...@gmail.com
Steve, just to clarify:

"FWIW, if you can move up to the Hadoop 2.8 version of the S3A client it is
way better on high-performance reads, especially if you are working with
column data and can set the fs.s3a.experimental.fadvise=random option. "

Are you talking about the hadoop-aws lib or hadoop itself.  I see that
spark is currently only pre-built against hadoop 2.7.

Most of our failures are on write, the other fix I've seen advertised has
been: "fileoutputcommitter.algorithm.version=2"

Still doing some reading and will start testing in the next day or so.

Thanks!

Gary

On 17 May 2017 at 03:19, Steve Loughran <ste...@hortonworks.com> wrote:

>
> On 17 May 2017, at 06:00, lucas.g...@gmail.com wrote:
>
> Steve, thanks for the reply.  Digging through all the documentation now.
>
> Much appreciated!
>
>
>
> FWIW, if you can move up to the Hadoop 2.8 version of the S3A client it is
> way better on high-performance reads, especially if you are working with
> column data and can set the fs.s3a.experimental.fadvise=random option.
>
> That's in apache Hadoop 2.8, HDP 2.5+, and I suspect also the latest
> versions of CDH, even if their docs don't mention it
>
> https://hortonworks.github.io/hdp-aws/s3-performance/
> https://www.cloudera.com/documentation/enterprise/5-9-
> x/topics/spark_s3.html
>
>
> On 16 May 2017 at 10:10, Steve Loughran <ste...@hortonworks.com> wrote:
>
>>
>> On 11 May 2017, at 06:07, lucas.g...@gmail.com wrote:
>>
>> Hi users, we have a bunch of pyspark jobs that are using S3 for loading /
>> intermediate steps and final output of parquet files.
>>
>>
>> Please don't, not without a committer specially written to work against
>> S3 in the presence of failures.You are at risk of things going wrong and
>> you not even noticing.
>>
>> The only one that I trust to do this right now is;
>> https://github.com/rdblue/s3committer
>>
>>
>> see also : https://github.com/apache/spark/blob/master/docs/cloud-int
>> egration.md
>>
>>
>>
>> We're running into the following issues on a semi regular basis:
>> * These are intermittent errors, IE we have about 300 jobs that run
>> nightly... And a fairly random but small-ish percentage of them fail with
>> the following classes of errors.
>>
>>
>> *S3 write errors *
>>
>>> "ERROR Utils: Aborting task
>>> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404,
>>> AWS Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS
>>> Error Message: Not Found, S3 Extended Request ID: BlaBlahEtc="
>>>
>>
>>
>>> "Py4JJavaError: An error occurred while calling o43.parquet.
>>> : com.amazonaws.services.s3.model.MultiObjectDeleteException: Status
>>> Code: 0, AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS
>>> Error Message: One or more objects could not be deleted, S3 Extended
>>> Request ID: null"
>>
>>
>>
>>
>> *S3 Read Errors: *
>>
>>> [Stage 1:=>   (27 +
>>> 4) / 31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in stage
>>> 1.0 (TID 11)
>>> java.net.SocketException: Connection reset
>>> at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>> at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>> at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>> at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>> at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>>> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>> at org.apache.http.impl.io.AbstractSessionInputBuffer.read(Abst
>>> ractSessionInputBuffer.java:198)
>>> at org.apache.http.impl.io.ContentLengthInputStream.read(Conten
>>> tLengthInputStream.java:178)
>>> at org.apache.http.impl.io.ContentLengthInputStream.read(Conten
>>> tLengthInputStream.java:200)
>>> at org.apache.http.impl.io.ContentLengthInputStream.close(Conte
>>> ntLengthInputStream.java:103)
>>> at org.apache.http.conn.BasicManagedEntity.streamClosed(BasicMa
>>> nagedEntity.java:168)
>>> at org.apache.http.conn.EofSensorInputStream.checkClose(EofSens
>>> orInputStream.java:228)
>>> at org.apache.http.conn.EofSensorInputStream.close(EofSensorInp
>>> utStream.java:174)
>&

Re: Spark <--> S3 flakiness

2017-05-16 Thread lucas.g...@gmail.com
Steve, thanks for the reply.  Digging through all the documentation now.

Much appreciated!



On 16 May 2017 at 10:10, Steve Loughran <ste...@hortonworks.com> wrote:

>
> On 11 May 2017, at 06:07, lucas.g...@gmail.com wrote:
>
> Hi users, we have a bunch of pyspark jobs that are using S3 for loading /
> intermediate steps and final output of parquet files.
>
>
> Please don't, not without a committer specially written to work against S3
> in the presence of failures.You are at risk of things going wrong and you
> not even noticing.
>
> The only one that I trust to do this right now is;
> https://github.com/rdblue/s3committer
>
>
> see also : https://github.com/apache/spark/blob/master/docs/cloud-
> integration.md
>
>
>
> We're running into the following issues on a semi regular basis:
> * These are intermittent errors, IE we have about 300 jobs that run
> nightly... And a fairly random but small-ish percentage of them fail with
> the following classes of errors.
>
>
> *S3 write errors *
>
>> "ERROR Utils: Aborting task
>> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS
>> Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS Error
>> Message: Not Found, S3 Extended Request ID: BlaBlahEtc="
>>
>
>
>> "Py4JJavaError: An error occurred while calling o43.parquet.
>> : com.amazonaws.services.s3.model.MultiObjectDeleteException: Status
>> Code: 0, AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS
>> Error Message: One or more objects could not be deleted, S3 Extended
>> Request ID: null"
>
>
>
>
> *S3 Read Errors: *
>
>> [Stage 1:=>   (27 +
>> 4) / 31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in stage
>> 1.0 (TID 11)
>> java.net.SocketException: Connection reset
>> at java.net.SocketInputStream.read(SocketInputStream.java:196)
>> at java.net.SocketInputStream.read(SocketInputStream.java:122)
>> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>> at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>> at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>> at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>> at org.apache.http.impl.io.AbstractSessionInputBuffer.read(
>> AbstractSessionInputBuffer.java:198)
>> at org.apache.http.impl.io.ContentLengthInputStream.read(
>> ContentLengthInputStream.java:178)
>> at org.apache.http.impl.io.ContentLengthInputStream.read(
>> ContentLengthInputStream.java:200)
>> at org.apache.http.impl.io.ContentLengthInputStream.close(
>> ContentLengthInputStream.java:103)
>> at org.apache.http.conn.BasicManagedEntity.streamClosed(
>> BasicManagedEntity.java:168)
>> at org.apache.http.conn.EofSensorInputStream.checkClose(
>> EofSensorInputStream.java:228)
>> at org.apache.http.conn.EofSensorInputStream.close(
>> EofSensorInputStream.java:174)
>> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> at com.amazonaws.services.s3.model.S3Object.close(S3Object.java:203)
>> at org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream.java:187)
>
>
>
> We have literally tons of logs we can add but it would make the email
> unwieldy big.  If it would be helpful I'll drop them in a pastebin or
> something.
>
> Our config is along the lines of:
>
>- spark-2.1.0-bin-hadoop2.7
>- '--packages 
> com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0
>pyspark-shell'
>
>
> You should have the Hadoop 2.7 JARs on your CP, as s3a on 2.6 wasn't ready
> to play with. In particular, in a close() call it reads to the end of the
> stream, which is a performance killer on large files. That stack trace you
> see is from that same phase of operation, so should go away too.
>
> Hadoop 2.7.3 depends on Amazon SDK 1.7.4; trying to use a different one
> will probably cause link errors.
> http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws/2.7.3
>
> Also: make sure Joda time >= 2.8.1 for Java 8
>
> If you go up to 2.8.0, and you still see the errors, file something
> against HADOOP in JIRA
>
>
> Given the stack overflow / googling I've been doing I know we're not the
> only org with these issues but I haven't found a good set of solutions in
> those spaces yet.
>
> Thanks!
>
> Gary Lucas
>
>
>


Re: Deciphering spark warning "Truncated the string representation of a plan since it was too large."

2017-06-12 Thread lucas.g...@gmail.com
AFAIK the process a spark program follows is:

   1. A set of transformations are defined on a given input dataset.
   2. At some point an action is called
  1. In your case this is writing to your parquet file.
   3. When that happens spark creates a logical plan and then a physical
   plan (This is largely where your transformations are optimized) to perform
   the transformations specified.
  1. This is similar to what a sql engine does, it takes your raw SQL
  and turns it into something that it can execute to get the data you
  requested.
  2. There are a set of artifacts generated, one of those artifacts
  would be the plan that you're seeing is being truncated.

The only time I'd be concerned about this would be if I was debugging the
code and needed to see what was being truncated, it is after all a debug
setting ('spark.debug.maxToStringFields')

Good luck!

Gary

On 12 June 2017 at 15:10, Henry M  wrote:

>
>
> I am trying to understand if I should be concerned about this warning:
>
> "WARN  Utils:66 - Truncated the string representation of a plan since it
> was too large. This behavior can be adjusted by setting 
> 'spark.debug.maxToStringFields'
> in SparkEnv.conf"
>
> It occurs while writing a data frame to parquet.
>
> Has any one on this list looked into this warning before and could help
> explain what it means?
>
> Thank you for your help,
> Henry
>


Re: Using SparkContext in Executors

2017-05-31 Thread lucas.g...@gmail.com
+1 to Ayan's answer, I think this is a common distributed anti pattern that
trips us all up at some point or another.

You definitely want to (in most cases) yield and create a new
RDD/Dataframe/Dataset and then perform your save operation on that.

On 28 May 2017 at 21:09, ayan guha  wrote:

> Hi
>
> You can modify your parse function to yield/emit the output record,
> instead of inserting. that way, you can essentially call .toDF to convert
> the outcome to a dataframe and then use driver's cassandra connection to
> save to cassandra (data will still in Executors, but now connector itself
> will create local connections and communicate with cassandra from
> executor).
>
> On Mon, May 29, 2017 at 8:55 AM, Stephen Boesch  wrote:
>
>> You would need to use *native* Cassandra API's in each Executor -   not
>> org.apache.spark.sql.cassandra.CassandraSQLContext -  including to create
>> a separate Cassandra connection on each Executor.
>>
>> 2017-05-28 15:47 GMT-07:00 Abdulfattah Safa :
>>
>>> So I can't run SQL queries in Executors ?
>>>
>>> On Sun, May 28, 2017 at 11:00 PM Mark Hamstra 
>>> wrote:
>>>
 You can't do that. SparkContext and SparkSession can exist only on the
 Driver.

 On Sun, May 28, 2017 at 6:56 AM, Abdulfattah Safa <
 fattah.s...@gmail.com> wrote:

> How can I use SparkContext (to create Spark Session or Cassandra
> Sessions) in executors?
> If I pass it as parameter to the foreach or foreachpartition, then it
> will have a null value.
> Shall I create a new SparkContext in each executor?
>
> Here is what I'm trying to do:
> Read a dump directory with millions of dump files as follows:
>
> dumpFiles = Directory.listFiles(dumpDirectory)
> dumpFilesRDD = sparkContext.parallize(dumpFiles, numOfSlices)
> dumpFilesRDD.foreachPartition(dumpFilePath->parse(dumpFilePath))
> .
> .
> .
>
> In parse(), each dump file is parsed and inserted into database using
> SparlSQL. In order to do that, SparkContext is needed in the function 
> parse
> to use the sql() method.
>


>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: RDD order preservation through transformations

2017-09-13 Thread lucas.g...@gmail.com
I'm wondering why you need order preserved, we've had situations where
keeping the source as an artificial field in the dataset was important and
I had to run contortions to inject that (In this case the datasource had no
unique key).

Is this similar?

On 13 September 2017 at 10:46, Suzen, Mehmet  wrote:

> But what happens if one of the partitions fail, how fault tolarence
> recover elements in other partitions.
>
> On 13 Sep 2017 18:39, "Ankit Maloo"  wrote:
>
>> AFAIK, the order of a rdd is maintained across a partition for Map
>> operations. There is no way a map operation  can change sequence across a
>> partition as partition is local and computation happens one record at a
>> time.
>>
>> On 13-Sep-2017 9:54 PM, "Suzen, Mehmet"  wrote:
>>
>> I think the order has no meaning in RDDs see this post, specially zip
>> methods:
>> https://stackoverflow.com/questions/29268210/mind-blown-rdd-zip-method
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>>


Re: How to pass sparkSession from driver to executor

2017-09-21 Thread lucas.g...@gmail.com
I'm not sure what you're doing.  But I have in the past used spark to
consume a manifest file and then execute a .mapPartition on the result like
this:


def map_key_to_event(s3_events_data_lake):

def _map_key_to_event(event_key_list, s3_client=test_stub):
print("Events in list")
start = time.time()

return_list = []

if s3_client is None:
s3_client = boto3.Session().client('s3')

for event_key in event_key_list:
  try:
response = s3_client.get_object(Bucket=s3_events_data_lake,
Key=event_key)
contents = response['Body'].read().decode('utf-8')
entity = json.loads(contents)
event_type = json.loads(entity["Message"])["type"]
entity["Message"] = json.loads(entity["Message"])
# json.dumps here because Spark doesn't have a good json
datatype.
return_list.append((event_type, json.dumps(entity)))
  except Exception:
print("Key: {k} did not yield a valid object:
{o}".format(k=event_key, o=contents))

end = time.time()
print('time elapsed:')
print(end - start)

return return_list

return _map_key_to_event


pkeys = spark.context.parallelize(full_list_for_time_slice, 32)
print("partitions: ")
print(pkeys.getNumPartitions())
events = pkeys.mapPartitions(map_func)





In this case I'm loading heterogeneous json files with wildly different
schemas, then saving them into parquet file / event type (IE turning one
big heterogeneous dump into numerous smaller homogenous dumps)

I'm sure this isn't the only or even best way to do it.

The underlying issue is that you're trying to violate the programming
model.  The model in this case consists of telling the driver what you want
and then having the executors go do it.

Spark Context is a driver level abstraction, it kind of doesn't make sense
in the executor context, the executor is acting on behalf of the driver and
shouldn't need a back reference to it.  You'd end up with some interesting
execution graphs.

This is a common pattern in spark as far as I can tell.  IE calling a map
and and then doing something with the items in the executor, either
computing or enriching.  My case above is a bit weird and I'm not certain
it's the right mechanism in that I'm literally taking a manifest file and
turning it into 'n' actual records.

Also, if you're going to be constructing a connection string / jdbc call /
s3 client... You really don't want to use a straight .map(func).  You'll
end up instantiating a connection on every iteration.

Hope this is somewhat helpful.

Gary

On 21 September 2017 at 06:31, Weichen Xu  wrote:

> Spark do not allow executor code using `sparkSession`.
> But I think you can move all json files to one directory, and them run:
>
> ```
> spark.read.json("/path/to/jsonFileDir")
> ```
> But if you want to get filename at the same time, you can use
> ```
> spark.sparkContext.wholeTextFiles("/path/to/jsonFileDir")...
> ```
>
> On Thu, Sep 21, 2017 at 9:18 PM, Riccardo Ferrari 
> wrote:
>
>> Depends on your use-case however broadcasting
>> 
>> could be a better option.
>>
>> On Thu, Sep 21, 2017 at 2:03 PM, Chackravarthy Esakkimuthu <
>> chaku.mi...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I want to know how to pass sparkSession from driver to executor.
>>>
>>> I have a spark program (batch job) which does following,
>>>
>>> #
>>>
>>> val spark = SparkSession.builder().appName("SampleJob").config("spark.
>>> master", "local") .getOrCreate()
>>>
>>> val df = this is dataframe which has list of file names (hdfs)
>>>
>>> df.foreach { fileName =>
>>>
>>>   *spark.read.json(fileName)*
>>>
>>>   .. some logic here
>>> }
>>>
>>> #
>>>
>>>
>>> *spark.read.json(fileName) --- this fails as it runs in executor. When I
>>> put it outside foreach, i.e. in driver, it works.*
>>>
>>> As I am trying to use spark (sparkSession) in executor which is not
>>> visible outside driver. But I want to read hdfs files inside foreach, how
>>> do I do it.
>>>
>>> Can someone help how to do this.
>>>
>>> Thanks,
>>> Chackra
>>>
>>
>>
>


Re: Question on partitionColumn for a JDBC read using a timestamp from MySql

2017-09-19 Thread lucas.g...@gmail.com
I ended up doing this for the time being.  It works but I *think* that
timestamp seems like a rational partitionColumn and I'm wondering if
there's a more built in way:


> df = spark.read.jdbc(
>
> url=os.environ["JDBCURL"],
>
> table="schema.table",
>
> predicates=predicates
>
> )
>

where Predicates is a list of:

>  ["timestamp between '2017-08-01 00:00:01' and '2017-09-01 00:00:01'",

 "timestamp between '2017-09-01 00:00:01' and '2017-10-01 00:00:01'",

 "timestamp between '2017-10-01 00:00:01' and '2017-11-01 00:00:01'"]




Gets quite nice performance (better than I expected).

Thanks!

On 18 September 2017 at 13:21, lucas.g...@gmail.com <lucas.g...@gmail.com>
wrote:

>  I'm pretty sure you can use a timestamp as a partitionColumn, It's
> Timestamp type in MySQL.  It's at base a numeric type and Spark requires a
> numeric type passed in.
>
> This doesn't work as the where parameter in MySQL becomes raw numerics
> which won't query against the mysql Timestamp.
>
>
> minTimeStamp = 1325605540 <-- This is wrong, but I'm not sure what to put
>> in here.
>>
>> maxTimeStamp = 1505641420
>>
>> numPartitions = 20*7
>>
>>
>> dt = spark.read \
>>
>> .format("jdbc") \
>>
>> .option("url", os.environ["JDBC_URL"]) \
>>
>> .option("dbtable", "schema.table") \
>>
>> .option("numPartitions", numPartitions) \
>>
>> .option("partitionColumn", "Timestamp") \
>>
>> .option("lowerBound", minTimeStamp) \
>>
>> .option("upperBound", maxTimeStamp) \
>>
>> .load()
>>
>
> mysql DB schema:
>
>> create table table
>>
>> (
>>
>> EventId VARCHAR(50) not null primary key,
>>
>> userid VARCHAR(200) null,
>>
>> Timestamp TIMESTAMP(19) default CURRENT_TIMESTAMP not null,
>>
>> Referrer VARCHAR(4000) null,
>>
>> ViewedUrl VARCHAR(4000) null
>>
>> );
>>
>> create index Timestamp on Fact_PageViewed (Timestamp);
>>
>
> I'm obviously doing it wrong, but couldn't find anything obvious while
> digging around.
>
> The query that gets generated looks like this (not exactly, it's optimized
> to include some upstream query parameters):
>
>>
>> *SELECT *`Timestamp`,`Referrer`,`EventId`,`UserId`,`ViewedUrl`
>> *FROM *schema.table  (*Timestamp*)
>> *WHERE  Timestamp *>= 1452916570 *AND Timestamp *< 1454202540;  <-- this
>> doesn't query against mysql timestamp type meaningfully.
>
>
> Thanks!
>
> Gary Lucas
>
>


Question on partitionColumn for a JDBC read using a timestamp from MySql

2017-09-18 Thread lucas.g...@gmail.com
 I'm pretty sure you can use a timestamp as a partitionColumn, It's
Timestamp type in MySQL.  It's at base a numeric type and Spark requires a
numeric type passed in.

This doesn't work as the where parameter in MySQL becomes raw numerics
which won't query against the mysql Timestamp.


minTimeStamp = 1325605540 <-- This is wrong, but I'm not sure what to put
> in here.
>
> maxTimeStamp = 1505641420
>
> numPartitions = 20*7
>
>
> dt = spark.read \
>
> .format("jdbc") \
>
> .option("url", os.environ["JDBC_URL"]) \
>
> .option("dbtable", "schema.table") \
>
> .option("numPartitions", numPartitions) \
>
> .option("partitionColumn", "Timestamp") \
>
> .option("lowerBound", minTimeStamp) \
>
> .option("upperBound", maxTimeStamp) \
>
> .load()
>

mysql DB schema:

> create table table
>
> (
>
> EventId VARCHAR(50) not null primary key,
>
> userid VARCHAR(200) null,
>
> Timestamp TIMESTAMP(19) default CURRENT_TIMESTAMP not null,
>
> Referrer VARCHAR(4000) null,
>
> ViewedUrl VARCHAR(4000) null
>
> );
>
> create index Timestamp on Fact_PageViewed (Timestamp);
>

I'm obviously doing it wrong, but couldn't find anything obvious while
digging around.

The query that gets generated looks like this (not exactly, it's optimized
to include some upstream query parameters):

>
> *SELECT *`Timestamp`,`Referrer`,`EventId`,`UserId`,`ViewedUrl`
> *FROM *schema.table  (*Timestamp*)
> *WHERE  Timestamp *>= 1452916570 *AND Timestamp *< 1454202540;  <-- this
> doesn't query against mysql timestamp type meaningfully.


Thanks!

Gary Lucas


Re: [Spark-Submit] Where to store data files while running job in cluster mode?

2017-09-29 Thread lucas.g...@gmail.com
We use S3, there are caveats and issues with that but it can be made to
work.

If interested let me know and I'll show you our workarounds.  I wouldn't do
it naively though, there's lots of potential problems.  If you already have
HDFS use that, otherwise all things told it's probably less effort to use
S3.

Gary

On 29 September 2017 at 05:03, Arun Rai  wrote:

> Or you can try mounting that drive to all node.
>
> On Fri, Sep 29, 2017 at 6:14 AM Jörn Franke  wrote:
>
>> You should use a distributed filesystem such as HDFS. If you want to use
>> the local filesystem then you have to copy each file to each node.
>>
>> > On 29. Sep 2017, at 12:05, Gaurav1809  wrote:
>> >
>> > Hi All,
>> >
>> > I have multi node architecture of (1 master,2 workers) Spark cluster,
>> the
>> > job runs to read CSV file data and it works fine when run on local mode
>> > (Local(*)).
>> > However, when the same job is ran in cluster mode(Spark://HOST:PORT),
>> it is
>> > not able to read it.
>> > I want to know how to reference the files Or where to store them?
>> Currently
>> > the CSV data file is on master(from where the job is submitted).
>> >
>> > Following code works fine in local mode but not in cluster mode.
>> >
>> > val spark = SparkSession
>> >  .builder()
>> >  .appName("SampleFlightsApp")
>> >  .master("spark://masterIP:7077") // change it to
>> .master("local[*])
>> > for local mode
>> >  .getOrCreate()
>> >
>> >val flightDF =
>> > spark.read.option("header",true).csv("/home/username/sampleflightdata")
>> >flightDF.printSchema()
>> >
>> > Error: FileNotFoundException: File file:/home/username/sampleflightdata
>> does
>> > not exist
>> >
>> >
>> >
>> > --
>> > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Spark based Data Warehouse

2017-11-17 Thread lucas.g...@gmail.com
We are using Spark on Kubernetes on AWS (it's a long story) but it does
work.  It's still on the raw side but we've been pretty successful.

We configured our cluster primarily with Kube-AWS and auto scaling groups.
There are gotcha's there, but so far we've been quite successful.

Gary Lucas

On 17 November 2017 at 22:20, ashish rawat <dceash...@gmail.com> wrote:

> Thanks everyone for their suggestions. Does any of you take care of auto
> scale up and down of your underlying spark clusters on AWS?
>
> On Nov 14, 2017 10:46 AM, "lucas.g...@gmail.com" <lucas.g...@gmail.com>
> wrote:
>
> Hi Ashish, bear in mind that EMR has some additional tooling available
> that smoothes out some S3 problems that you may / almost certainly will
> encounter.
>
> We are using Spark / S3 not on EMR and have encountered issues with file
> consistency, you can deal with it but be aware it's additional technical
> debt that you'll need to own.  We didn't want to own an HDFS cluster so we
> consider it worthwhile.
>
> Here are some additional resources:  The video is Steve Loughran talking
> about S3.
> https://medium.com/@subhojit20_27731/apache-spark-and-
> amazon-s3-gotchas-and-best-practices-a767242f3d98
> https://www.youtube.com/watch?v=ND4L_zSDqF0
>
> For the record we use S3 heavily but tend to drop our processed data into
> databases so they can be more easily consumed by visualization tools.
>
> Good luck!
>
> Gary Lucas
>
> On 13 November 2017 at 20:04, Affan Syed <as...@an10.io> wrote:
>
>> Another option that we are trying internally is to uses Mesos for
>> isolating different jobs or groups. Within a single group, using Livy to
>> create different spark contexts also works.
>>
>> - Affan
>>
>> On Tue, Nov 14, 2017 at 8:43 AM, ashish rawat <dceash...@gmail.com>
>> wrote:
>>
>>> Thanks Sky Yin. This really helps.
>>>
>>> On Nov 14, 2017 12:11 AM, "Sky Yin" <sky@gmail.com> wrote:
>>>
>>> We are running Spark in AWS EMR as data warehouse. All data are in S3
>>> and metadata in Hive metastore.
>>>
>>> We have internal tools to creat juypter notebook on the dev cluster. I
>>> guess you can use zeppelin instead, or Livy?
>>>
>>> We run genie as a job server for the prod cluster, so users have to
>>> submit their queries through the genie. For better resource utilization, we
>>> rely on Yarn dynamic allocation to balance the load of multiple
>>> jobs/queries in Spark.
>>>
>>> Hope this helps.
>>>
>>> On Sat, Nov 11, 2017 at 11:21 PM ashish rawat <dceash...@gmail.com>
>>> wrote:
>>>
>>>> Hello Everyone,
>>>>
>>>> I was trying to understand if anyone here has tried a data warehouse
>>>> solution using S3 and Spark SQL. Out of multiple possible options
>>>> (redshift, presto, hive etc), we were planning to go with Spark SQL, for
>>>> our aggregates and processing requirements.
>>>>
>>>> If anyone has tried it out, would like to understand the following:
>>>>
>>>>1. Is Spark SQL and UDF, able to handle all the workloads?
>>>>2. What user interface did you provide for data scientist, data
>>>>engineers and analysts
>>>>3. What are the challenges in running concurrent queries, by many
>>>>users, over Spark SQL? Considering Spark still does not provide spill to
>>>>disk, in many scenarios, are there frequent query failures when 
>>>> executing
>>>>concurrent queries
>>>>4. Are there any open source implementations, which provide
>>>>something similar?
>>>>
>>>>
>>>> Regards,
>>>> Ashish
>>>>
>>>
>>>
>>
>
>


Re: Spark based Data Warehouse

2017-11-13 Thread lucas.g...@gmail.com
Hi Ashish, bear in mind that EMR has some additional tooling available that
smoothes out some S3 problems that you may / almost certainly will
encounter.

We are using Spark / S3 not on EMR and have encountered issues with file
consistency, you can deal with it but be aware it's additional technical
debt that you'll need to own.  We didn't want to own an HDFS cluster so we
consider it worthwhile.

Here are some additional resources:  The video is Steve Loughran talking
about S3.
https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98
https://www.youtube.com/watch?v=ND4L_zSDqF0

For the record we use S3 heavily but tend to drop our processed data into
databases so they can be more easily consumed by visualization tools.

Good luck!

Gary Lucas

On 13 November 2017 at 20:04, Affan Syed  wrote:

> Another option that we are trying internally is to uses Mesos for
> isolating different jobs or groups. Within a single group, using Livy to
> create different spark contexts also works.
>
> - Affan
>
> On Tue, Nov 14, 2017 at 8:43 AM, ashish rawat  wrote:
>
>> Thanks Sky Yin. This really helps.
>>
>> On Nov 14, 2017 12:11 AM, "Sky Yin"  wrote:
>>
>> We are running Spark in AWS EMR as data warehouse. All data are in S3 and
>> metadata in Hive metastore.
>>
>> We have internal tools to creat juypter notebook on the dev cluster. I
>> guess you can use zeppelin instead, or Livy?
>>
>> We run genie as a job server for the prod cluster, so users have to
>> submit their queries through the genie. For better resource utilization, we
>> rely on Yarn dynamic allocation to balance the load of multiple
>> jobs/queries in Spark.
>>
>> Hope this helps.
>>
>> On Sat, Nov 11, 2017 at 11:21 PM ashish rawat 
>> wrote:
>>
>>> Hello Everyone,
>>>
>>> I was trying to understand if anyone here has tried a data warehouse
>>> solution using S3 and Spark SQL. Out of multiple possible options
>>> (redshift, presto, hive etc), we were planning to go with Spark SQL, for
>>> our aggregates and processing requirements.
>>>
>>> If anyone has tried it out, would like to understand the following:
>>>
>>>1. Is Spark SQL and UDF, able to handle all the workloads?
>>>2. What user interface did you provide for data scientist, data
>>>engineers and analysts
>>>3. What are the challenges in running concurrent queries, by many
>>>users, over Spark SQL? Considering Spark still does not provide spill to
>>>disk, in many scenarios, are there frequent query failures when executing
>>>concurrent queries
>>>4. Are there any open source implementations, which provide
>>>something similar?
>>>
>>>
>>> Regards,
>>> Ashish
>>>
>>
>>
>


Re: Writing files to s3 with out temporary directory

2017-11-20 Thread lucas.g...@gmail.com
That sounds like allot of work and if I understand you correctly it sounds
like a piece of middleware that already exists (I could be wrong).  Alluxio?

Good luck and let us know how it goes!

Gary

On 20 November 2017 at 14:10, Jim Carroll  wrote:

> Thanks. In the meantime I might just write a custom file system that maps
> writes to parquet file parts to their final locations and then skips the
> move.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Writing files to s3 with out temporary directory

2017-11-20 Thread lucas.g...@gmail.com
You can expect to see some fixes for this sort of issue in the medium term
future (multiple months, probably not years).

As Taylor notes, it's a Hadoop problem, not a spark problem.  So whichever
version of hadoop includes the fix will then wait for a spark release to
get built against it.  Last I checked they were targeting v3.0 for hadoop.

Other's have listed some middle-ware style fixes which we haven't tried.
We've just been writing to the local FS and then using boto to copy them
up.  Our use case has lots of slack in the timeliness though so although we
know it's an issue, it's not something that's a serious enough problem to
try to fix on our own at this point.

Gary

On 20 November 2017 at 12:56, Tayler Lawrence Jones 
wrote:

> It is an open issue with Hadoop file committer, not spark. The simple
> workaround is to write to hdfs then copy to s3. Netflix did a talk about
> their custom output committer at the last spark summit which is a clever
> efficient way of doing that - I’d check it out on YouTube. They have open
> sourced their implementation, but it does not work (out the box) with
> parquet.
>
> -TJ
>
> On Mon, Nov 20, 2017 at 11:48 Jim Carroll  wrote:
>
>> I have this exact issue. I was going to intercept the call in the
>> filesystem
>> if I had to (since we're using the S3 filesystem from Presto anyway) but
>> if
>> there's simply a way to do this correctly I'd much prefer it. This
>> basically
>> doubles the time to write parquet files to s3.
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: What do you pay attention to when validating Spark jobs?

2017-11-21 Thread lucas.g...@gmail.com
I don't think these will blow anyones minds but:

1) Row counts.  Most of our jobs 'recompute the world' nightly so we can
expect to see fairly predictable row variances.
2) Rolling snapshots.  We can also expect that for some critical datasets
we can compute a rolling average for important metrics (revenue, user
count, etc).  We're just starting to investigate this.
3) Job timing:  Jobs should normally take about the same amount of time to
execute (usually).  So we want to alert on things that finish too quickly
(no data in the pipe) or things that take too long.

I'd like to get further into anomaly detection but haven't gotten there yet.

On 21 November 2017 at 15:34, Holden Karau  wrote:

> Hi Folks,
>
> I'm working on updating a talk and I was wondering if any folks in the
> community wanted to share their best practices for validating your Spark
> jobs? Are there any counters folks have found useful for
> monitoring/validating your Spark jobs?
>
> Cheers,
>
> Holden :)
>
> --
> Twitter: https://twitter.com/holdenkarau
>


Re: spark session jdbc performance

2017-10-25 Thread lucas.g...@gmail.com
Gourav, I'm assuming you misread the code.  It's 30 partitions, which isn't
a ridiculous value.  Maybe you misread the upperBound for the partitions?
(That would be ridiculous)

Why not use the PK as the partition column?  Obviously it depends on the
downstream queries.  If you're going to be performing joins (which I assume
is the case) then partitioning on the join column would be advisable, but
what about the case where the join column would be heavily skewed?

Thanks!

Gary

On 24 October 2017 at 23:41, Gourav Sengupta 
wrote:

> Hi Naveen,
>
> I do not think that it is prudent to use the PK as the partitionColumn.
> That is too many partitions for any system to handle. The numPartitions
> will be valid in case of JDBC very differently.
>
> Please keep me updated on how things go.
>
>
> Regards,
> Gourav Sengupta
>
> On Tue, Oct 24, 2017 at 10:54 PM, Naveen Madhire 
> wrote:
>
>>
>> Hi,
>>
>>
>>
>> I am trying to fetch data from Oracle DB using a subquery and
>> experiencing lot of performance issues.
>>
>>
>>
>> Below is the query I am using,
>>
>>
>>
>> *Using Spark 2.0.2*
>>
>>
>>
>> *val *df = spark_session.read.format(*"jdbc"*)
>> .option(*"driver"*,*"*oracle.jdbc.OracleDriver*"*)
>> .option(*"url"*, jdbc_url)
>>.option(*"user"*, user)
>>.option(*"password"*, pwd)
>>.option(*"dbtable"*, *"subquery"*)
>>.option(*"partitionColumn"*, *"id"*)  //primary key column uniformly
>> distributed
>>.option(*"lowerBound"*, *"1"*)
>>.option(*"upperBound"*, *"50"*)
>> .option(*"numPartitions"*, 30)
>> .load()
>>
>>
>>
>> The above query is running using the 30 partitions, but when I see the UI
>> it is only using 1 partiton to run the query.
>>
>>
>>
>> Can anyone tell if I am missing anything or do I need to anything else to
>> tune the performance of the query.
>>
>>  *Thanks*
>>
>
>


Re: spark session jdbc performance

2017-10-25 Thread lucas.g...@gmail.com
Are we seeing the UI is showing only one partition to run the query?  The
original poster hasn't replied yet.

My assumption is that there's only one executor configured / deployed.  But
we only know what the OP stated which wasn't enough to be sure of anything.

Why are you suggesting that partitioning on the PK isn't prudent? and did
you mean to say that 30 partitions were far to many for any system to
handle?  (I'm assuming you misread the original code)

Gary

On 25 October 2017 at 13:21, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi Lucas,
>
> so if I am assuming things, can you please explain why the UI is showing
> only one partition to run the query?
>
>
> Regards,
> Gourav Sengupta
>
> On Wed, Oct 25, 2017 at 6:03 PM, lucas.g...@gmail.com <
> lucas.g...@gmail.com> wrote:
>
>> Gourav, I'm assuming you misread the code.  It's 30 partitions, which
>> isn't a ridiculous value.  Maybe you misread the upperBound for the
>> partitions?  (That would be ridiculous)
>>
>> Why not use the PK as the partition column?  Obviously it depends on the
>> downstream queries.  If you're going to be performing joins (which I assume
>> is the case) then partitioning on the join column would be advisable, but
>> what about the case where the join column would be heavily skewed?
>>
>> Thanks!
>>
>> Gary
>>
>> On 24 October 2017 at 23:41, Gourav Sengupta <gourav.sengu...@gmail.com>
>> wrote:
>>
>>> Hi Naveen,
>>>
>>> I do not think that it is prudent to use the PK as the partitionColumn.
>>> That is too many partitions for any system to handle. The numPartitions
>>> will be valid in case of JDBC very differently.
>>>
>>> Please keep me updated on how things go.
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Tue, Oct 24, 2017 at 10:54 PM, Naveen Madhire <vmadh...@umail.iu.edu>
>>> wrote:
>>>
>>>>
>>>> Hi,
>>>>
>>>>
>>>>
>>>> I am trying to fetch data from Oracle DB using a subquery and
>>>> experiencing lot of performance issues.
>>>>
>>>>
>>>>
>>>> Below is the query I am using,
>>>>
>>>>
>>>>
>>>> *Using Spark 2.0.2*
>>>>
>>>>
>>>>
>>>> *val *df = spark_session.read.format(*"jdbc"*)
>>>> .option(*"driver"*,*"*oracle.jdbc.OracleDriver*"*)
>>>> .option(*"url"*, jdbc_url)
>>>>.option(*"user"*, user)
>>>>.option(*"password"*, pwd)
>>>>.option(*"dbtable"*, *"subquery"*)
>>>>.option(*"partitionColumn"*, *"id"*)  //primary key column
>>>> uniformly distributed
>>>>.option(*"lowerBound"*, *"1"*)
>>>>.option(*"upperBound"*, *"50"*)
>>>> .option(*"numPartitions"*, 30)
>>>> .load()
>>>>
>>>>
>>>>
>>>> The above query is running using the 30 partitions, but when I see the
>>>> UI it is only using 1 partiton to run the query.
>>>>
>>>>
>>>>
>>>> Can anyone tell if I am missing anything or do I need to anything else
>>>> to tune the performance of the query.
>>>>
>>>>  *Thanks*
>>>>
>>>
>>>
>>
>


Re: spark session jdbc performance

2017-10-24 Thread lucas.g...@gmail.com
Did you check the query plan / check the UI?

That code looks same to me.  Maybe you've only configured for one executor?

Gary

On Oct 24, 2017 2:55 PM, "Naveen Madhire"  wrote:

>
> Hi,
>
>
>
> I am trying to fetch data from Oracle DB using a subquery and experiencing
> lot of performance issues.
>
>
>
> Below is the query I am using,
>
>
>
> *Using Spark 2.0.2*
>
>
>
> *val *df = spark_session.read.format(*"jdbc"*)
> .option(*"driver"*,*"*oracle.jdbc.OracleDriver*"*)
> .option(*"url"*, jdbc_url)
>.option(*"user"*, user)
>.option(*"password"*, pwd)
>.option(*"dbtable"*, *"subquery"*)
>.option(*"partitionColumn"*, *"id"*)  //primary key column uniformly
> distributed
>.option(*"lowerBound"*, *"1"*)
>.option(*"upperBound"*, *"50"*)
> .option(*"numPartitions"*, 30)
> .load()
>
>
>
> The above query is running using the 30 partitions, but when I see the UI
> it is only using 1 partiton to run the query.
>
>
>
> Can anyone tell if I am missing anything or do I need to anything else to
> tune the performance of the query.
>
>  *Thanks*
>


Re: Spark streaming for CEP

2017-10-24 Thread lucas.g...@gmail.com
This looks really interesting, thanks for linking!

Gary Lucas

On 24 October 2017 at 15:06, Mich Talebzadeh 
wrote:

> Great thanks Steve
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 24 October 2017 at 22:58, Stephen Boesch  wrote:
>
>> Hi Mich, the github link has a brief intro - including a link to the
>> formal docs http://logisland.readthedocs.io/en/latest/index.html .
>>  They have an architectural overview, developer guide, tutorial, and pretty
>> comprehensive api docs.
>>
>> 2017-10-24 13:31 GMT-07:00 Mich Talebzadeh :
>>
>>> thanks Thomas.
>>>
>>> do you have a summary write-up for this tool please?
>>>
>>>
>>> regards,
>>>
>>>
>>>
>>>
>>> Thomas
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 24 October 2017 at 13:53, Thomas Bailet 
>>> wrote:
>>>
 Hi

 we (@ hurence) have released on open source middleware based on
 SparkStreaming over Kafka to do CEP and log mining, called *logisland*
 (https://github.com/Hurence/logisland/) it has been deployed into
 production for 2 years now and does a great job. You should have a look.


 bye

 Thomas Bailet

 CTO : hurence

 Le 18/10/17 à 22:05, Mich Talebzadeh a écrit :

 As you may be aware the granularity that Spark streaming has is
 micro-batching and that is limited to 0.5 second. So if you have continuous
 ingestion of data then Spark streaming may not be granular enough for CEP.
 You may consider other products.

 Worth looking at this old thread on mine "Spark support for Complex
 Event Processing (CEP)

 https://mail-archives.apache.org/mod_mbox/spark-user/201604.
 mbox/%3CCAJ3fcbB8eaf0JV84bA7XGUK5GajC1yGT3ZgTNCi8arJg56=LbQ@
 mail.gmail.com%3E

 HTH


 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com


 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.



 On 18 October 2017 at 20:52, anna stax  wrote:

> Hello all,
>
> Has anyone used spark streaming for CEP (Complex Event processing).
> Any CEP libraries that works well with spark. I have a use case for CEP 
> and
> trying to see if spark streaming is a good fit.
>
> Currently we have a data pipeline using Kafka, Spark streaming and
> Cassandra for data ingestion and near real time dashboard.
>
> Please share your experience.
> Thanks much.
> -Anna
>
>
>


>>>
>>
>


Re: spark session jdbc performance

2017-10-24 Thread lucas.g...@gmail.com
Sorry, I meant to say: "That code looks SANE to me"

Assuming that you're seeing the query running partitioned as expected then
you're likely configured with one executor.  Very easy to check in the UI.

Gary Lucas

On 24 October 2017 at 16:09, lucas.g...@gmail.com <lucas.g...@gmail.com>
wrote:

> Did you check the query plan / check the UI?
>
> That code looks same to me.  Maybe you've only configured for one executor?
>
> Gary
>
> On Oct 24, 2017 2:55 PM, "Naveen Madhire" <vmadh...@umail.iu.edu> wrote:
>
>>
>> Hi,
>>
>>
>>
>> I am trying to fetch data from Oracle DB using a subquery and
>> experiencing lot of performance issues.
>>
>>
>>
>> Below is the query I am using,
>>
>>
>>
>> *Using Spark 2.0.2*
>>
>>
>>
>> *val *df = spark_session.read.format(*"jdbc"*)
>> .option(*"driver"*,*"*oracle.jdbc.OracleDriver*"*)
>> .option(*"url"*, jdbc_url)
>>.option(*"user"*, user)
>>.option(*"password"*, pwd)
>>.option(*"dbtable"*, *"subquery"*)
>>.option(*"partitionColumn"*, *"id"*)  //primary key column uniformly
>> distributed
>>.option(*"lowerBound"*, *"1"*)
>>.option(*"upperBound"*, *"50"*)
>> .option(*"numPartitions"*, 30)
>> .load()
>>
>>
>>
>> The above query is running using the 30 partitions, but when I see the UI
>> it is only using 1 partiton to run the query.
>>
>>
>>
>> Can anyone tell if I am missing anything or do I need to anything else to
>> tune the performance of the query.
>>
>>  *Thanks*
>>
>


Does Apache Spark take into account JDBC indexes / statistics when optimizing queries?

2017-10-19 Thread lucas.g...@gmail.com
IE:  If my JDBC table has an index on it, will the optimizer consider that
when pushing predicates down?

I noticed in a query like this:

df = spark.hiveContext.read.jdbc(
  url=jdbc_url,
  table="schema.table",
  column="id",
  lowerBound=lower_bound_id,
  upperBound=upper_bound_id,
  numPartitions=numberPartitions
)
df.registerTempTable("df")

filtered_df = spark.hiveContext.sql("""
SELECT
*
FROM
df
WHERE
type = 'type'
AND action = 'action'
AND audited_changes LIKE '---\ncompany_id:\n- %'
""")
filtered_audits.registerTempTable("filtered_df")


The queries sent to the DB look like this:
"Select fields from schema.table where type='type' and action='action' and
id > lower_bound and id <= upper_bound"

And then it does the like ( LIKE '---\ncompany_id:\n- %') in memory, which
is great!

However I'm wondering why it chooses that optimization.  In this case there
aren't any indexes on any of these except ID.

So, does spark take into account JDBC indexes in it's query plan where it
can?

Thanks!

Gary Lucas


Re: Does Apache Spark take into account JDBC indexes / statistics when optimizing queries?

2017-10-19 Thread lucas.g...@gmail.com
Ok, so when Spark is forming queries it's ignorant of the underlying
storage layer index.

If there is an index on a table Spark doesn't take that into account when
doing the predicate push down in optimization. In that case why does spark
push 2 of my conditions (where fieldx = 'action') to the database but then
do the like in memory.  Is that just a function a straightforward LIKE's
are done in memory and simple equalities are pushed to the storage layer?

remember your indexes are in RDBMS


Exactly what I'm asking about, when spark issues the query via the JDBC
reader, that query is / is not ignorant of the underlying indexes?  How
does spark determine which predicates to perform in the RDD and which
predicates to execute in the storage layer?  I guess I should just dig out
the JDBC data-frame reader code and see if I can make sense of that?  Or is
the predicate push-down stage independent of the readers?

Thanks for helping me form a more accurate question!

Gary



On 19 October 2017 at 15:46, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> remember your indexes are in RDBMS. In this case MySQL. When you are
> reading from that table you have an 'id' column which I assume is an
> integer and you are making parallel threads through JDBC connection to that
> table. You can see the threads in MySQL if you query it. You can see
> multiple threads. You stated numPartitions but MySQL will decide how many
> parallel threads it can handle.
>
> So data is read into Spark to RDDs and you can se that through SPAK GUI
> (port 4040 by default). Then you create a DataFrame (DF) and convert it
> into a tempTable. tempTable will not have any indexes. This is happening in
> Spark space not MySQL. Once you start reading in your query and collect
> data then it will try to cache data in Spark memory. You can see this again
> through Spark GUI. You can see the optimizer by using explain() function.
> You will see that no index is used.
>
> Spark uses distributed data in memory to optimize the work. It does not
> use any index. In RDBMS an index is an ordered set of column or columns
> stored on the disk in B-tree format to improve the query where needed.
> Spark tempTable does not follow that method. So in summary your tempTable
> will benefit from more executors and memory if you want to improve the
> query performance.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 19 October 2017 at 23:29, lucas.g...@gmail.com <lucas.g...@gmail.com>
> wrote:
>
>> If the underlying table(s) have indexes on them.  Does spark use those
>> indexes to optimize the query?
>>
>> IE if I had a table in my JDBC data source (mysql in this case) had
>> several indexes and my query was filtering on one of the fields with an
>> index.  Would spark know to push that predicate to the database or is the
>> predicate push-down ignorant of the underlying storage layer details.
>>
>> Apologies if that still doesn't adequately explain my question.
>>
>> Gary Lucas
>>
>> On 19 October 2017 at 15:19, Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> sorry what do you mean my JDBC table has an index on it? Where are you
>>> reading the data from the table?
>>>
>>> I assume you are referring to "id" column on the table that you are
>>> reading through JDBC connection.
>>>
>>> Then you are creating a temp Table called "df". That temp table is
>>> created in temporary work space and does not have any index. That index
>>> "id" is used when doing parallel reads into RDDs not when querying the temp
>>> Table.
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and 

Re: Does Apache Spark take into account JDBC indexes / statistics when optimizing queries?

2017-10-19 Thread lucas.g...@gmail.com
If the underlying table(s) have indexes on them.  Does spark use those
indexes to optimize the query?

IE if I had a table in my JDBC data source (mysql in this case) had several
indexes and my query was filtering on one of the fields with an index.
Would spark know to push that predicate to the database or is the predicate
push-down ignorant of the underlying storage layer details.

Apologies if that still doesn't adequately explain my question.

Gary Lucas

On 19 October 2017 at 15:19, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> sorry what do you mean my JDBC table has an index on it? Where are you
> reading the data from the table?
>
> I assume you are referring to "id" column on the table that you are
> reading through JDBC connection.
>
> Then you are creating a temp Table called "df". That temp table is created
> in temporary work space and does not have any index. That index "id" is
> used when doing parallel reads into RDDs not when querying the temp Table.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 19 October 2017 at 23:10, lucas.g...@gmail.com <lucas.g...@gmail.com>
> wrote:
>
>> IE:  If my JDBC table has an index on it, will the optimizer consider
>> that when pushing predicates down?
>>
>> I noticed in a query like this:
>>
>> df = spark.hiveContext.read.jdbc(
>>   url=jdbc_url,
>>   table="schema.table",
>>   column="id",
>>   lowerBound=lower_bound_id,
>>   upperBound=upper_bound_id,
>>   numPartitions=numberPartitions
>> )
>> df.registerTempTable("df")
>>
>> filtered_df = spark.hiveContext.sql("""
>> SELECT
>> *
>> FROM
>> df
>> WHERE
>> type = 'type'
>> AND action = 'action'
>> AND audited_changes LIKE '---\ncompany_id:\n- %'
>> """)
>> filtered_audits.registerTempTable("filtered_df")
>>
>>
>> The queries sent to the DB look like this:
>> "Select fields from schema.table where type='type' and action='action'
>> and id > lower_bound and id <= upper_bound"
>>
>> And then it does the like ( LIKE '---\ncompany_id:\n- %') in memory,
>> which is great!
>>
>> However I'm wondering why it chooses that optimization.  In this case
>> there aren't any indexes on any of these except ID.
>>
>> So, does spark take into account JDBC indexes in it's query plan where it
>> can?
>>
>> Thanks!
>>
>> Gary Lucas
>>
>
>


Re: Suggestions on using scala/python for Spark Streaming

2017-10-26 Thread lucas.g...@gmail.com
I don't have any specific wisdom for you on that front.  But I've always
been served well by the 'Try both' approach.

Set up your benchmarks, configure both setups...  You don't have to go the
whole hog, but just enough to get a mostly realistic implementation
functional.  Run them both with some captured / fixture data...  And
compare.

I personally haven't come across a situation where you just have to go
scala, but I've come across multiple situations where it was preferable but
not by a big enough margin to retool a team and a product.

On the plus side you'll be well setup for integration tests with whichever
system you end up rolling out!

Good luck!  and i'd love to hear any findings discovery you may come across!

Gary Lucas

On 26 October 2017 at 09:22, umargeek  wrote:

> We are building a spark streaming application which is process and time
> intensive and currently using python API but looking forward for
> suggestions
> whether to use Scala over python such as pro's and con's as we are planning
> to production setup as next step?
>
> Thanks,
> Umar
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Controlling number of spark partitions in dataframes

2017-10-26 Thread lucas.g...@gmail.com
I think we'd need to see the code that loads the df.

Parallelism and partition count are related but they're not the same.  I've
found the documentation fuzzy on this, but it looks like
default.parrallelism is what spark uses for partitioning when it has no
other guidance.  I'm also under the impression (and I could be wrong here)
that the data loading step has some impact on partitioning.

In any case, I think it would be more helpful with the df loading code.

Good luck!

Gary Lucas

On 26 October 2017 at 09:35, Noorul Islam Kamal Malmiyoda  wrote:

> Hi all,
>
> I have the following spark configuration
>
> spark.app.name=Test
> spark.cassandra.connection.host=127.0.0.1
> spark.cassandra.connection.keep_alive_ms=5000
> spark.cassandra.connection.port=1
> spark.cassandra.connection.timeout_ms=3
> spark.cleaner.ttl=3600
> spark.default.parallelism=4
> spark.master=local[2]
> spark.ui.enabled=false
> spark.ui.showConsoleProgress=false
>
> Because I am setting spark.default.parallelism to 4, I was expecting
> only 4 spark partitions. But it looks like it is not the case
>
> When I do the following
>
> df.foreachPartition { partition =>
>   val groupedPartition = partition.toList.grouped(3).toList
>   println("Grouped partition " + groupedPartition)
> }
>
> There are too many print statements with empty list at the top. Only
> the relevant partitions are at the bottom. Is there a way to control
> number of partitions?
>
> Regards,
> Noorul
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Does Apache Spark take into account JDBC indexes / statistics when optimizing queries?

2017-10-20 Thread lucas.g...@gmail.com
Right, that makes sense and I understood that.

The thing I'm wondering about (And i think the answer is 'no' at this
stage).

When the optimizer is running and pushing predicates down, does it take
into account indexing and other storage layer strategies in determining
which predicates are processed in memory and which predicates are pushed to
storage.

Thanks!

Gary Lucas


On 20 October 2017 at 07:32, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> here below Gary
>
> filtered_df = spark.hiveContext.sql("""
> SELECT
> *
> FROM
> df
> WHERE
> type = 'type'
> AND action = 'action'
> AND audited_changes LIKE '---\ncompany_id:\n- %'
> """)
> filtered_audits.registerTempTable("filtered_df")
>
>
> you are using hql to read data from your temporary table "df" and then
> creating a temporary table on the subset of that temptable "df".
>
> What is the  purpose of it?
>
> When you are within Spark itself data is read in. Granted the indexes on
> RDBMS help reading data through the JDBC connection but do not play any
> role later in running the sal in hql.
>
> Does that make sense?
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 20 October 2017 at 00:04, lucas.g...@gmail.com <lucas.g...@gmail.com>
> wrote:
>
>> Ok, so when Spark is forming queries it's ignorant of the underlying
>> storage layer index.
>>
>> If there is an index on a table Spark doesn't take that into account when
>> doing the predicate push down in optimization. In that case why does spark
>> push 2 of my conditions (where fieldx = 'action') to the database but then
>> do the like in memory.  Is that just a function a straightforward LIKE's
>> are done in memory and simple equalities are pushed to the storage layer?
>>
>> remember your indexes are in RDBMS
>>
>>
>> Exactly what I'm asking about, when spark issues the query via the JDBC
>> reader, that query is / is not ignorant of the underlying indexes?  How
>> does spark determine which predicates to perform in the RDD and which
>> predicates to execute in the storage layer?  I guess I should just dig out
>> the JDBC data-frame reader code and see if I can make sense of that?  Or is
>> the predicate push-down stage independent of the readers?
>>
>> Thanks for helping me form a more accurate question!
>>
>> Gary
>>
>>
>>
>> On 19 October 2017 at 15:46, Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> remember your indexes are in RDBMS. In this case MySQL. When you are
>>> reading from that table you have an 'id' column which I assume is an
>>> integer and you are making parallel threads through JDBC connection to that
>>> table. You can see the threads in MySQL if you query it. You can see
>>> multiple threads. You stated numPartitions but MySQL will decide how many
>>> parallel threads it can handle.
>>>
>>> So data is read into Spark to RDDs and you can se that through SPAK GUI
>>> (port 4040 by default). Then you create a DataFrame (DF) and convert it
>>> into a tempTable. tempTable will not have any indexes. This is happening in
>>> Spark space not MySQL. Once you start reading in your query and collect
>>> data then it will try to cache data in Spark memory. You can see this again
>>> through Spark GUI. You can see the optimizer by using explain() function.
>>> You will see that no index is used.
>>>
>>> Spark uses distributed data in memory to optimize the work. It does not
>>> use any index. In RDBMS an index is an ordered set of column or columns
>>> stored on the disk in B-tree format to improve the query where needed.
>>> Spark tempTable does not follow that method. So in summary your tempTable
>>> will benefit from more executors and memory if you want to improve the
>>> query performance.
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
&g

Re: Controlling number of spark partitions in dataframes

2017-10-26 Thread lucas.g...@gmail.com
Thanks Daniel!

I've been wondering that for ages!

IE where my JDBC sourced datasets are coming up with 200 partitions on
write to S3.

What do you mean for (except for the initial read)?

Can you explain that a bit further?

Gary Lucas

On 26 October 2017 at 11:28, Daniel Siegmann  wrote:

> When working with datasets, Spark uses spark.sql.shuffle.partitions. It
> defaults to 200. Between that and the default parallelism you can control
> the number of partitions (except for the initial read).
>
> More info here: http://spark.apache.org/docs/latest/sql-programming-guide.
> html#other-configuration-options
>
> I have no idea why it defaults to a fixed 200 (while default parallelism
> defaults to a number scaled to your number of cores), or why there are two
> separate configuration properties.
>
>
> --
> Daniel Siegmann
> Senior Software Engineer
> *SecurityScorecard Inc.*
> 214 W 29th Street, 5th Floor
> 
> New York, NY 10001
> 
>
>
> On Thu, Oct 26, 2017 at 9:53 AM, Deepak Sharma 
> wrote:
>
>> I guess the issue is spark.default.parallelism is ignored when you are
>> working with Data frames.It is supposed to work with only raw RDDs.
>>
>> Thanks
>> Deepak
>>
>> On Thu, Oct 26, 2017 at 10:05 PM, Noorul Islam Kamal Malmiyoda <
>> noo...@noorul.com> wrote:
>>
>>> Hi all,
>>>
>>> I have the following spark configuration
>>>
>>> spark.app.name=Test
>>> spark.cassandra.connection.host=127.0.0.1
>>> spark.cassandra.connection.keep_alive_ms=5000
>>> spark.cassandra.connection.port=1
>>> spark.cassandra.connection.timeout_ms=3
>>> spark.cleaner.ttl=3600
>>> spark.default.parallelism=4
>>> spark.master=local[2]
>>> spark.ui.enabled=false
>>> spark.ui.showConsoleProgress=false
>>>
>>> Because I am setting spark.default.parallelism to 4, I was expecting
>>> only 4 spark partitions. But it looks like it is not the case
>>>
>>> When I do the following
>>>
>>> df.foreachPartition { partition =>
>>>   val groupedPartition = partition.toList.grouped(3).toList
>>>   println("Grouped partition " + groupedPartition)
>>> }
>>>
>>> There are too many print statements with empty list at the top. Only
>>> the relevant partitions are at the bottom. Is there a way to control
>>> number of partitions?
>>>
>>> Regards,
>>> Noorul
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> Thanks
>> Deepak
>> www.bigdatabig.com
>> www.keosha.net
>>
>
>


Re: Controlling number of spark partitions in dataframes

2017-10-26 Thread lucas.g...@gmail.com
Ok, so for JDBC I presume it defaults to a single partition if you don't
provide partitioning meta data?

Thanks!

Gary

On 26 October 2017 at 13:43, Daniel Siegmann <dsiegm...@securityscorecard.io
> wrote:

> Those settings apply when a shuffle happens. But they don't affect the way
> the data will be partitioned when it is initially read, for example
> spark.read.parquet("path/to/input"). So for HDFS / S3 I think it depends
> on how the data is split into chunks, but if there are lots of small chunks
> Spark will automatically merge them into small partitions. There are going
> to be various settings depending on what you're reading from.
>
> val df = spark.read.parquet("path/to/input") // partitioning will depend
> on the data
> val df2 = df.groupBy("thing").count() // a shuffle happened, so shuffle
> partitioning configuration applies
>
>
> Tip: gzip files can't be split, so if you read a gzip file everything will
> be in one partition. That's a good reason to avoid large gzip files. :-)
>
> If you don't have a shuffle but you want to change how many partitions
> there are, you will need to coalesce or repartition.
>
>
> --
> Daniel Siegmann
> Senior Software Engineer
> *SecurityScorecard Inc.*
> 214 W 29th Street, 5th Floor
> <https://maps.google.com/?q=214+W+29th+Street,+5th+FloorNew+York,+NY+10001=gmail=g>
> New York, NY 10001
> <https://maps.google.com/?q=214+W+29th+Street,+5th+FloorNew+York,+NY+10001=gmail=g>
>
>
> On Thu, Oct 26, 2017 at 11:31 AM, lucas.g...@gmail.com <
> lucas.g...@gmail.com> wrote:
>
>> Thanks Daniel!
>>
>> I've been wondering that for ages!
>>
>> IE where my JDBC sourced datasets are coming up with 200 partitions on
>> write to S3.
>>
>> What do you mean for (except for the initial read)?
>>
>> Can you explain that a bit further?
>>
>> Gary Lucas
>>
>> On 26 October 2017 at 11:28, Daniel Siegmann <
>> dsiegm...@securityscorecard.io> wrote:
>>
>>> When working with datasets, Spark uses spark.sql.shuffle.partitions. It
>>> defaults to 200. Between that and the default parallelism you can control
>>> the number of partitions (except for the initial read).
>>>
>>> More info here: http://spark.apache.org/docs/l
>>> atest/sql-programming-guide.html#other-configuration-options
>>>
>>> I have no idea why it defaults to a fixed 200 (while default parallelism
>>> defaults to a number scaled to your number of cores), or why there are two
>>> separate configuration properties.
>>>
>>>
>>> --
>>> Daniel Siegmann
>>> Senior Software Engineer
>>> *SecurityScorecard Inc.*
>>> 214 W 29th Street, 5th Floor
>>> <https://maps.google.com/?q=214+W+29th+Street,+5th+FloorNew+York,+NY+10001=gmail=g>
>>> New York, NY 10001
>>> <https://maps.google.com/?q=214+W+29th+Street,+5th+FloorNew+York,+NY+10001=gmail=g>
>>>
>>>
>>> On Thu, Oct 26, 2017 at 9:53 AM, Deepak Sharma <deepakmc...@gmail.com>
>>> wrote:
>>>
>>>> I guess the issue is spark.default.parallelism is ignored when you are
>>>> working with Data frames.It is supposed to work with only raw RDDs.
>>>>
>>>> Thanks
>>>> Deepak
>>>>
>>>> On Thu, Oct 26, 2017 at 10:05 PM, Noorul Islam Kamal Malmiyoda <
>>>> noo...@noorul.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have the following spark configuration
>>>>>
>>>>> spark.app.name=Test
>>>>> spark.cassandra.connection.host=127.0.0.1
>>>>> spark.cassandra.connection.keep_alive_ms=5000
>>>>> spark.cassandra.connection.port=1
>>>>> spark.cassandra.connection.timeout_ms=3
>>>>> spark.cleaner.ttl=3600
>>>>> spark.default.parallelism=4
>>>>> spark.master=local[2]
>>>>> spark.ui.enabled=false
>>>>> spark.ui.showConsoleProgress=false
>>>>>
>>>>> Because I am setting spark.default.parallelism to 4, I was expecting
>>>>> only 4 spark partitions. But it looks like it is not the case
>>>>>
>>>>> When I do the following
>>>>>
>>>>> df.foreachPartition { partition =>
>>>>>   val groupedPartition = partition.toList.grouped(3).toList
>>>>>   println("Grouped partition " + groupedPartition)
>>>>> }
>>>>>
>>>>> There are too many print statements with empty list at the top. Only
>>>>> the relevant partitions are at the bottom. Is there a way to control
>>>>> number of partitions?
>>>>>
>>>>> Regards,
>>>>> Noorul
>>>>>
>>>>> -
>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Thanks
>>>> Deepak
>>>> www.bigdatabig.com
>>>> www.keosha.net
>>>>
>>>
>>>
>>
>


Re: Spark Tuning Tool

2018-01-22 Thread lucas.g...@gmail.com
I'd be very interested in anything I can send to my analysts to assist them
with their troubleshooting / optimization... Of course our engineers would
appreciate it as well.

However we'd be way more interested if it was OSS.

Thanks!

Gary Lucas

On 22 January 2018 at 21:16, Holden Karau  wrote:

> That's very interesting, and might also get some interest on the dev@
> list if it was open source.
>
> On Tue, Jan 23, 2018 at 4:02 PM, Roger Marin 
> wrote:
>
>> I'd be very interested.
>>
>> On 23 Jan. 2018 4:01 pm, "Rohit Karlupia"  wrote:
>>
>>> Hi,
>>>
>>> I have been working on making the performance tuning of spark
>>> applications bit easier.  We have just released the beta version of the
>>> tool on Qubole.
>>>
>>> https://www.qubole.com/blog/introducing-quboles-spark-tuning-tool/
>>>
>>> This is not OSS yet but we would like to contribute it to OSS.  Fishing
>>> for some interest in the community if people find this work interesting and
>>> would like to try to it out.
>>>
>>> thanks,
>>> Rohit Karlupia
>>>
>>>
>>>
>
>
> --
> Twitter: https://twitter.com/holdenkarau
>


Re: spark.sql call takes far too long

2018-01-24 Thread lucas.g...@gmail.com
Hi Michael.

I haven't had this particular issue previously, but I have had other
performance issues.

Some questions which may help:

1. Have you checked the Spark Console?
2. Have you isolated the query in question, are you sure it's actually
where the slowdown occurs?
3. How much data are you talking about and how complex is the query?

Usually when debugging spark slowness issues it comes down to ineffective
data ingestion and / or a partition shuffle (and in some cases both).

That can all be seen from the console.

Good luck!

Gary Lucas

On 24 January 2018 at 04:16, Michael Shtelma  wrote:

> Hi all,
>
> I have a problem with the performance of the sparkSession.sql call. It
> takes up to a couple of seconds for me right now. I have a lot of
> generated temporary tables, which are registered within the session
> and also a lot of temporary data frames. Is it possible, that the
> analysis/resolve/analysis phases take far too long? Is there a way to
> figure out, what exactly takes too long?
>
> Does anybody have any ideas on this?
> Any assistance would be greatly appreciated!
>
> Thanks,
> Michael
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Rest API for Spark2.3 submit on kubernetes(version 1.8.*) cluster

2018-03-21 Thread lucas.g...@gmail.com
Speaking from experience, if you're already operating a kubernetes
cluster.  Getting a spark workload operating there is nearly an order of
magnitude simpler than working with / around EMR.

That's not say EMR is excessively hard, just that Kubernetes is easier, all
the steps to getting your application deployed are well documented and
ultimately the whole process is more visible.

Also, thanks for the link Yinan!  I'll be investigating that project!

We have in the past used EMR for larger workloads and as soon as we
announced our users could run those workloads on our k8s cluster everyone
immediately moved their workloads over.  This was despite having Spark on
K8s still in an infant state.  No one has expressed interest in moving back.

G

On 21 March 2018 at 07:32, Gourav Sengupta 
wrote:

> Hi,
>
> just out of curiosity, but since it in AWS, is there any specific reason
> not to use EMR? Or any particular reason to use Kubernetes?
>
>
> Regards,
> Gourav Sengupta
>
> On Wed, Mar 21, 2018 at 2:47 AM, purna pradeep 
> wrote:
>
>> Im using kubernetes cluster on AWS to run spark jobs ,im using spark 2.3
>> ,now i want to run spark-submit from AWS lambda function to k8s
>> master,would like to know if there is any REST interface to run Spark
>> submit on k8s Master
>
>
>


Re: Question on Spark-kubernetes integration

2018-03-02 Thread lucas.g...@gmail.com
Oh interesting, given that pyspark was working in spark on kub 2.2 I
assumed it would be part of what got merged.

Is there a roadmap in terms of when that may get merged up?

Thanks!



On 2 March 2018 at 21:32, Felix Cheung  wrote:

> That’s in the plan. We should be sharing a bit more about the roadmap in
> future releases shortly.
>
> In the mean time this is in the official documentation on what is coming:
> https://spark.apache.org/docs/latest/running-on-kubernetes.
> html#future-work
>
> This supports started as a fork of the Apache Spark project and this fork
> has dynamic scaling support you can check out here:
> https://apache-spark-on-k8s.github.io/userdocs/running-on-
> kubernetes.html#dynamic-executor-scaling
>
>
> --
> *From:* Lalwani, Jayesh 
> *Sent:* Friday, March 2, 2018 8:08:55 AM
> *To:* user@spark.apache.org
> *Subject:* Question on Spark-kubernetes integration
>
>
> Does the Resource scheduler support dynamic resource allocation? Are there
> any plans to add in the future?
>
> --
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>


Re: Schema store for Parquet

2020-03-04 Thread lucas.g...@gmail.com
Or AWS glue catalog if you're in AWS

On Wed, 4 Mar 2020 at 10:35, Magnus Nilsson  wrote:

> Google hive metastore.
>
> On Wed, Mar 4, 2020 at 7:29 PM Ruijing Li  wrote:
>
>> Hi all,
>>
>> Has anyone explored efforts to have a centralized storage of schemas of
>> different parquet files? I know there is schema management for Avro, but
>> couldn’t find solutions for parquet schema management. Thanks!
>> --
>> Cheers,
>> Ruijing Li
>>
>