Re: structured streaming join of streaming dataframe with static dataframe performance

2022-08-04 Thread Koert Kuipers
thats good point about skewness and potential join optimizations. i will
try turning off all skew optimizations, and force a sort-merge-join, and
see if it then re-uses shuffle files on the static side.

unfortunately my static side is too large to broadcast. the streaming side
can be broadcasted indeed and this is faster but it still forces spark to
re-read (but not shuffle) the static side on every join, which in my case
still adds minutes to every microbatch.

On Thu, Aug 4, 2022 at 6:35 AM kant kodali  wrote:

> I suspect it is probably because the incoming rows when I joined with
> static frame can lead to variable degree of skewness over time and if so it
> is probably better to employ different join strategies at run time. But if
> you know your Dataset I believe you can just do broadcast join for your
> case!
>
> Its been a while since I used spark so you might want to wait for more
> authoritative response
>
> Sent from my iPhone
>
> On Jul 17, 2022, at 5:38 PM, Koert Kuipers  wrote:
>
> 
> i was surprised to find out that if a streaming dataframe is joined with a
> static dataframe, that the static dataframe is re-shuffled for every
> microbatch, which adds considerable overhead.
>
> wouldn't it make more sense to re-use the shuffle files?
>
> or if that is not possible then load the static dataframe into the
> statestore? this would turn the join into a lookup (in rocksdb)?
>
>
> CONFIDENTIALITY NOTICE: This electronic communication and any files
> transmitted with it are confidential, privileged and intended solely for
> the use of the individual or entity to whom they are addressed. If you are
> not the intended recipient, you are hereby notified that any disclosure,
> copying, distribution (electronic or otherwise) or forwarding of, or the
> taking of any action in reliance on the contents of this transmission is
> strictly prohibited. Please notify the sender immediately by e-mail if you
> have received this email by mistake and delete this email from your system.
>
> Is it necessary to print this email? If you care about the environment
> like we do, please refrain from printing emails. It helps to keep the
> environment forested and litter-free.
>
>

-- 
CONFIDENTIALITY NOTICE: This electronic communication and any files 
transmitted with it are confidential, privileged and intended solely for 
the use of the individual or entity to whom they are addressed. If you are 
not the intended recipient, you are hereby notified that any disclosure, 
copying, distribution (electronic or otherwise) or forwarding of, or the 
taking of any action in reliance on the contents of this transmission is 
strictly prohibited. Please notify the sender immediately by e-mail if you 
have received this email by mistake and delete this email from your system.


Is it necessary to print this email? If you care about the environment 
like we do, please refrain from printing emails. It helps to keep the 
environment forested and litter-free.


structured streaming join of streaming dataframe with static dataframe performance

2022-07-17 Thread Koert Kuipers
i was surprised to find out that if a streaming dataframe is joined with a
static dataframe, that the static dataframe is re-shuffled for every
microbatch, which adds considerable overhead.

wouldn't it make more sense to re-use the shuffle files?

or if that is not possible then load the static dataframe into the
statestore? this would turn the join into a lookup (in rocksdb)?

-- 
CONFIDENTIALITY NOTICE: This electronic communication and any files 
transmitted with it are confidential, privileged and intended solely for 
the use of the individual or entity to whom they are addressed. If you are 
not the intended recipient, you are hereby notified that any disclosure, 
copying, distribution (electronic or otherwise) or forwarding of, or the 
taking of any action in reliance on the contents of this transmission is 
strictly prohibited. Please notify the sender immediately by e-mail if you 
have received this email by mistake and delete this email from your system.


Is it necessary to print this email? If you care about the environment 
like we do, please refrain from printing emails. It helps to keep the 
environment forested and litter-free.


Re: [DISCUSS] SPIP: Spark Connect - A client and server interface for Apache Spark.

2022-06-03 Thread Koert Kuipers
how would scala udfs be supported in this?

On Fri, Jun 3, 2022 at 1:52 PM Martin Grund
 wrote:

> Hi Everyone,
>
> We would like to start a discussion on the "Spark Connect" proposal.
> Please find the links below:
>
> *JIRA* - https://issues.apache.org/jira/browse/SPARK-39375
> *SPIP Document* -
> https://docs.google.com/document/d/1Mnl6jmGszixLW4KcJU5j9IgpG9-UabS0dcM6PM2XGDc/edit#heading=h.wmsrrfealhrj
>
> *Excerpt from the document: *
>
> We propose to extend Apache Spark by building on the DataFrame API and the
> underlying unresolved logical plans. The DataFrame API is widely used and
> makes it very easy to iteratively express complex logic. We will introduce
> Spark Connect, a remote option of the DataFrame API that separates the
> client from the Spark server. With Spark Connect, Spark will become
> decoupled, allowing for built-in remote connectivity: The decoupled client
> SDK can be used to run interactive data exploration and connect to the
> server for DataFrame operations.
>
> Spark Connect will benefit Spark developers in different ways: The
> decoupled architecture will result in improved stability, as clients are
> separated from the driver. From the Spark Connect client perspective, Spark
> will be (almost) versionless, and thus enable seamless upgradability, as
> server APIs can evolve without affecting the client API. The decoupled
> client-server architecture can be leveraged to build close integrations
> with local developer tooling. Finally, separating the client process from
> the Spark server process will improve Spark’s overall security posture by
> avoiding the tight coupling of the client inside the Spark runtime
> environment.
>
> Spark Connect will strengthen Spark’s position as the modern unified
> engine for large-scale data analytics and expand applicability to use cases
> and developers we could not reach with the current setup: Spark will become
> ubiquitously usable as the DataFrame API can be used with (almost) any
> programming language.
>
> We would like to start a discussion on the document and any feedback is
> welcome!
>
> Thanks a lot in advance,
> Martin
>

-- 
CONFIDENTIALITY NOTICE: This electronic communication and any files 
transmitted with it are confidential, privileged and intended solely for 
the use of the individual or entity to whom they are addressed. If you are 
not the intended recipient, you are hereby notified that any disclosure, 
copying, distribution (electronic or otherwise) or forwarding of, or the 
taking of any action in reliance on the contents of this transmission is 
strictly prohibited. Please notify the sender immediately by e-mail if you 
have received this email by mistake and delete this email from your system.


Is it necessary to print this email? If you care about the environment 
like we do, please refrain from printing emails. It helps to keep the 
environment forested and litter-free.


Re: The draft of the Spark 3.3.0 release notes

2022-06-03 Thread Koert Kuipers
i thought SPARK-36837 didnt make it in? i see it in notes

On Fri, Jun 3, 2022 at 4:31 AM Maxim Gekk 
wrote:

> Hi All,
>
> I am preparing the release notes of Spark 3.3.0. Here is a draft document:
>
> https://docs.google.com/document/d/1gGySrLGvIK8bajKdGjTI_mDqk0-YPvHmPN64YjoWfOQ/edit?usp=sharing
>
> Please take a look and let me know if I missed any major changes or
> something.
>
> Maxim Gekk
>
> Software Engineer
>
> Databricks, Inc.
>

-- 
CONFIDENTIALITY NOTICE: This electronic communication and any files 
transmitted with it are confidential, privileged and intended solely for 
the use of the individual or entity to whom they are addressed. If you are 
not the intended recipient, you are hereby notified that any disclosure, 
copying, distribution (electronic or otherwise) or forwarding of, or the 
taking of any action in reliance on the contents of this transmission is 
strictly prohibited. Please notify the sender immediately by e-mail if you 
have received this email by mistake and delete this email from your system.


Is it necessary to print this email? If you care about the environment 
like we do, please refrain from printing emails. It helps to keep the 
environment forested and litter-free.


Re: Scala 2.13 actual class used for Seq

2020-10-19 Thread Koert Kuipers
i rebuild master for Spark 2.12 and i see it also uses List instead of
WrappedArray. so the change is in master (compared to 3.0.1) and it is not
limited to Scala 2.13.
this might impact user programs somewhat? List has different performance
characteristics than WrappedArray... for starters it is not an IndexedSeq.


On Mon, Oct 19, 2020 at 8:24 AM Sean Owen  wrote:

> Scala 2.13 changed the typedef of Seq to an immutable.Seq, yes. So lots of
> things will now return an immutable Seq. Almost all code doesn't care what
> Seq it returns and we didn't change any of that in the code, so, this is
> just what we're getting as a 'default' from whatever operations produce the
> Seq. (But a user app expecting a Seq in 2.13 will still just work, as it
> will be expecting an immutable.Seq then)
>
> You're right that many things don't necessarily return a WrappedArray
> anymore (I think that doesn't exist anymore in 2.13? ArraySeq now?) so user
> apps may need to change for 2.13, but, there are N things that any 2.13 app
> would have to change.
>
> On Mon, Oct 19, 2020 at 12:29 AM Koert Kuipers  wrote:
>
>> i have gotten used to spark always returning a WrappedArray for Seq. at
>> some point i think i even read this was guaranteed to be the case. not sure
>> if it still is...
>>
>> in spark 3.0.1 with scala 2.12 i get a WrappedArray as expected:
>>
>> scala> val x = Seq((1,2),(1,3)).toDF
>> x: org.apache.spark.sql.DataFrame = [_1: int, _2: int]
>>
>> scala>
>> x.groupBy("_1").agg(collect_list(col("_2")).as("_3")).withColumn("class_of_3",
>> udf{ (s: Seq[Int]) => s.getClass.toString }.apply(col("_3"))).show(false)
>> +---+--+-+
>> |_1 |_3|class_of_3   |
>> +---+--+-+
>> |1  |[2, 3]|class scala.collection.mutable.WrappedArray$ofRef|
>> +---+--+-+
>>
>> but when i build current master with scala 2.13 i get:
>>
>> scala> val x = Seq((1,2),(1,3)).toDF
>> warning: 1 deprecation (since 2.13.3); for details, enable `:setting
>> -deprecation' or `:replay -deprecation'
>> val x: org.apache.spark.sql.DataFrame = [_1: int, _2: int]
>>
>> scala>
>> x.groupBy("_1").agg(collect_list(col("_2")).as("_3")).withColumn("class",
>> udf{ (s: Seq[Int]) => s.getClass.toString }.apply(col("_3"))).show(false)
>> +---+--+-+
>> |_1 |_3|class|
>> +---+--+-+
>> |1  |[2, 3]|class scala.collection.immutable.$colon$colon|
>> +---+--+-+
>>
>> i am curious if we are planning on returning immutable Seq going forward
>> (which is nice)? and if so is List the best choice? i was sort of guessing
>> it would be an immutable ArraySeq perhaps (given it provides efficient ways
>> to wrap an array and access the underlying array)?
>>
>> best
>>
>


Scala 2.13 actual class used for Seq

2020-10-18 Thread Koert Kuipers
i have gotten used to spark always returning a WrappedArray for Seq. at
some point i think i even read this was guaranteed to be the case. not sure
if it still is...

in spark 3.0.1 with scala 2.12 i get a WrappedArray as expected:

scala> val x = Seq((1,2),(1,3)).toDF
x: org.apache.spark.sql.DataFrame = [_1: int, _2: int]

scala>
x.groupBy("_1").agg(collect_list(col("_2")).as("_3")).withColumn("class_of_3",
udf{ (s: Seq[Int]) => s.getClass.toString }.apply(col("_3"))).show(false)
+---+--+-+
|_1 |_3|class_of_3   |
+---+--+-+
|1  |[2, 3]|class scala.collection.mutable.WrappedArray$ofRef|
+---+--+-+

but when i build current master with scala 2.13 i get:

scala> val x = Seq((1,2),(1,3)).toDF
warning: 1 deprecation (since 2.13.3); for details, enable `:setting
-deprecation' or `:replay -deprecation'
val x: org.apache.spark.sql.DataFrame = [_1: int, _2: int]

scala>
x.groupBy("_1").agg(collect_list(col("_2")).as("_3")).withColumn("class",
udf{ (s: Seq[Int]) => s.getClass.toString }.apply(col("_3"))).show(false)
+---+--+-+
|_1 |_3|class|
+---+--+-+
|1  |[2, 3]|class scala.collection.immutable.$colon$colon|
+---+--+-+

i am curious if we are planning on returning immutable Seq going forward
(which is nice)? and if so is List the best choice? i was sort of guessing
it would be an immutable ArraySeq perhaps (given it provides efficient ways
to wrap an array and access the underlying array)?

best


Re: Scala 3 support approach

2020-10-18 Thread Koert Kuipers
i think scala 3.0 will be able to use libraries built with Scala 2.13 (as
long as they dont use macros)

see:
https://www.scala-lang.org/2019/12/18/road-to-scala-3.html

On Sun, Oct 18, 2020 at 9:54 AM Sean Owen  wrote:

> Spark depends on a number of Scala libraries, so needs them all to support
> version X before Spark can. This only happened for 2.13 about 4-5 months
> ago. I wonder if even a fraction of the necessary libraries have 3.0
> support yet?
>
> It can be difficult to test and support multiple Scala versions
> simultaneously. 2.11 has already been dropped and 2.13 is coming, but it
> might be hard to have a code base that works for 2.12, 2.13, and 3.0.
>
> So one dependency could be, when can 2.12 be dropped? And with Spark
> supporting 2.13 only early next year, and user apps migrating over a year
> or more, it seems difficult to do that anytime soon.
>
> I think Spark 3 support is eventually desirable, so maybe the other way to
> resolve that is to show that Spark 3 support doesn't interfere much with
> maintenance of 2.12/2.13 support. I am a little bit skeptical of it, just
> because the 2.11->2.12 and 2.12->2.13 changes were fairly significant, let
> alone 2.13->3.0 I'm sure, but I don't know.
>
> That is, if we start to have to implement workarounds are parallel code
> trees and so on for 3.0 support, and if it can't be completed for a while
> to come because of downstream dependencies, then it may not be worth
> iterating in the code base yet or even considering.
>
> You can file an umbrella JIRA to track it, yes, with a possible target of
> Spark 4.0. Non-intrusive changes can go in anytime. We may not want to get
> into major ones until later.
>
> On Sat, Oct 17, 2020 at 8:49 PM gemelen  wrote:
>
>> Hi all!
>>
>> I'd like to ask for an opinion and discuss the next thing:
>> at this moment in general Spark could be built with Scala 2.11 and 2.12
>> (mostly), and close to the point to have support for Scala 2.13. On the
>> other hand, Scala 3 is going into the pre-release phase (with 3.0.0-M1
>> released at the beginning of October).
>>
>> Previously, support of the current Scala version by Spark was a bit
>> behind of desired state, dictated by all circumstances. To move things
>> differently with Scala 3 I'd like to contribute my efforts (and help others
>> if there would be any) to support it starting as soon as possible (ie to
>> have Spark build compiled with Scala 3 and to have release artifacts when
>> it would be possible).
>>
>> I suggest that it would require to add an experimental profile to the
>> build file so further changes to compile, test and run other tasks could be
>> done in incremental manner (with respect to compatibility with current code
>> for versions 2.12 and 2.13 and backporting where possible). I'd like to do
>> it that way since I do not represent any company, contribute in my own time
>> and thus cannot guarantee consistent time spent on this (so just in case of
>> anything such contribution would not be left in the fork repo).
>>
>> In fact, with recent changes to move Spark build to use the latest SBT,
>> such starting changes are pretty small on the SBT side (about 10 LOC) and I
>> was already able to see how build fails with Scala 3 compiler :)
>>
>> To summarize:
>> 1. Is this approach suitable for the project at this moment, so it would
>> be accepted and accounted for in the release schedule (in 2021 I assume)?
>> 2. how should it be filed, as an umbrella Jira ticket with minor tasks or
>> as a SPIP at first with more thorough analysis?
>>
>


Re: Apache Spark 3.1 Preparation Status (Oct. 2020)

2020-10-07 Thread Koert Kuipers
it seems to me with SPARK-20202 we are no longer planning to support
hadoop2 + hive 1.2. is that correct?

so basically spark 3.1 will no longer run on say CDH 5.x or HDP2.x with
hive?

my use case is building spark 3.1 and launching on these existing clusters
that are not managed by me. e.g. i do not use the spark version provided by
cloudera.
however there are workarounds for me (using older spark version to extract
out of hive, then switch to newer spark version) so i am not too worried
about this. just making sure i understand.

thanks

On Sat, Oct 3, 2020 at 8:17 PM Dongjoon Hyun 
wrote:

> Hi, All.
>
> As of today, master branch (Apache Spark 3.1.0) resolved
> 852+ JIRA issues and 606+ issues are 3.1.0-only patches.
> According to the 3.1.0 release window, branch-3.1 will be
> created on November 1st and enters QA period.
>
> Here are some notable updates I've been monitoring.
>
> *Language*
> 01. SPARK-25075 Support Scala 2.13
>   - Since SPARK-32926, Scala 2.13 build test has
> become a part of GitHub Action jobs.
>   - After SPARK-33044, Scala 2.13 test will be
> a part of Jenkins jobs.
> 02. SPARK-29909 Drop Python 2 and Python 3.4 and 3.5
> 03. SPARK-32082 Project Zen: Improving Python usability
>   - 7 of 16 issues are resolved.
> 04. SPARK-32073 Drop R < 3.5 support
>   - This is done for Spark 3.0.1 and 3.1.0.
>
> *Dependency*
> 05. SPARK-32058 Use Apache Hadoop 3.2.0 dependency
>   - This changes the default dist. for better cloud support
> 06. SPARK-32981 Remove hive-1.2 distribution
> 07. SPARK-20202 Remove references to org.spark-project.hive
>   - This will remove Hive 1.2.1 from source code
> 08. SPARK-29250 Upgrade to Hadoop 3.2.1 (WIP)
>
> *Core*
> 09. SPARK-27495 Support Stage level resource conf and scheduling
>   - 11 of 15 issues are resolved
> 10. SPARK-25299 Use remote storage for persisting shuffle data
>   - 8 of 14 issues are resolved
>
> *Resource Manager*
> 11. SPARK-33005 Kubernetes GA preparation
>   - It is on the way and we are waiting for more feedback.
>
> *SQL*
> 12. SPARK-30648/SPARK-32346 Support filters pushdown
>   to JSON/Avro
> 13. SPARK-32948/SPARK-32958 Add Json expression optimizer
> 14. SPARK-12312 Support JDBC Kerberos w/ keytab
>   - 11 of 17 issues are resolved
> 15. SPARK-27589 DSv2 was mostly completed in 3.0
>   and added more features in 3.1 but still we missed
>   - All built-in DataSource v2 write paths are disabled
> and v1 write is used instead.
>   - Support partition pruning with subqueries
>   - Support bucketing
>
> We still have one month before the feature freeze
> and starting QA. If you are working for 3.1,
> please consider the timeline and share your schedule
> with the Apache Spark community. For the other stuff,
> we can put it into 3.2 release scheduled in June 2021.
>
> Last not but least, I want to emphasize (7) once again.
> We need to remove the forked unofficial Hive eventually.
> Please let us know your reasons if you need to build
> from Apache Spark 3.1 source code for Hive 1.2.
>
> https://github.com/apache/spark/pull/29936
>
> As I wrote in the above PR description, for old releases,
> Apache Spark 2.4(LTS) and 3.0 (~2021.12) will provide
> Hive 1.2-based distribution.
>
> Bests,
> Dongjoon.
>


Re: [FYI] Removing `spark-3.1.0-bin-hadoop2.7-hive1.2.tgz` from Apache Spark 3.1 distribution

2020-10-07 Thread Koert Kuipers
i am a little confused about this. i assumed spark would no longer make a
distribution with hive 1.x, but the hive-1.2 profile remains.

yet i see the hive-1.2 profile has been removed from pom.xml?

On Wed, Sep 23, 2020 at 6:58 PM Dongjoon Hyun 
wrote:

> Hi, All.
>
> Since Apache Spark 3.0.0, Apache Hive 2.3.7 is the default
> Hive execution library. The forked Hive 1.2.1 library is not
> recommended because it's not maintained properly.
>
> In Apache Spark 3.1 on December 2020, we are going to
> remove it from our official distribution.
>
> https://github.com/apache/spark/pull/29856
> SPARK-32981 Remove hive-1.2/hadoop-2.7 from Apache Spark 3.1
> distribution
>
> Of course, the users still can build it from the source because the
> profile `hive-1.2` is still alive.
>
> Please let us know if you are going to build with the forked unofficial
> Hive 1.2.1 library still in Apache Spark 3.1. We want to listen to your
> pain-points before moving forward in this area. Eventually we will remove
> Hive 1.2 as a last piece of migration to Hive 2.3/Hadoop3/Java11+.
>
> Bests,
> Dongjoon.
>


Re: AQE effectiveness

2020-09-29 Thread Koert Kuipers
i have been doing tests with iterative algorithms that do caching/uncaching
at each iteration and i see improvements when i turn on AQE for cache.

now i am wondering... with an iterative algo using AQE it is true that the
output of every iteration can have a slightly different number of
partitions (which i observed), and this introduces extra shuffles in joints
between these outputs. so far this downside seems outweighed by the upsides
of AQE, so its not a big deal. but it would be rather simple to remove many
unnecessary shuffles and still retain all the benefits of AQE if we just
only slightly restricted AQEs choices for the number of partitions. for
example we could force it to use a power of 2? this way you get the
benefits of AQE but you dont have a frustrating situation where one
iteration has 67 partitions and the next has 68 partitions and then a join
introduces a shuffle.





On Fri, Aug 21, 2020 at 12:10 PM Maryann Xue 
wrote:

> It would break CachedTableSuite."A cached table preserves the partitioning
> and ordering of its cached SparkPlan" if AQE was turned on.
>
> Anyway, the chance of this outputPartitioning being useful is rather low
> and should not justify turning off AQE for SQL cache.
>
> On Thu, Aug 20, 2020 at 10:54 PM Koert Kuipers  wrote:
>
>> in our inhouse spark version i changed this without trouble and it didnt
>> even break any tests
>> just some minor changes in CacheManager it seems
>>
>> On Thu, Aug 20, 2020 at 1:12 PM Maryann Xue 
>> wrote:
>>
>>> No. The worst case of enabling AQE in cached data is not losing the
>>> opportunity of using/reusing the cache, but rather just an extra shuffle if
>>> the outputPartitioning happens to match without AQE and not match after
>>> AQE. The chance of this happening is rather low.
>>>
>>> On Thu, Aug 20, 2020 at 12:09 PM Koert Kuipers 
>>> wrote:
>>>
>>>> i see. it makes sense to maximize re-use of cached data. i didn't
>>>> realize we have two potentially conflicting goals here.
>>>>
>>>>
>>>> On Thu, Aug 20, 2020 at 12:41 PM Maryann Xue <
>>>> maryann@databricks.com> wrote:
>>>>
>>>>> AQE has been turned off deliberately so that the `outputPartitioning`
>>>>> of the cached relation won't be changed by AQE partition coalescing or 
>>>>> skew
>>>>> join optimization and the outputPartitioning can potentially be used by
>>>>> relations built on top of the cache.
>>>>>
>>>>> On a second thought, we should probably add a config there and enable
>>>>> AQE by default.
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Maryann
>>>>>
>>>>> On Thu, Aug 20, 2020 at 11:12 AM Koert Kuipers 
>>>>> wrote:
>>>>>
>>>>>> we tend to have spark.sql.shuffle.partitions set very high by default
>>>>>> simply because some jobs need it to be high and it's easier to then just
>>>>>> set the default high instead of having people tune it manually per job. 
>>>>>> the
>>>>>> main downside is lots of part files which leads to pressure on the 
>>>>>> driver,
>>>>>> and dynamic allocation becomes troublesome if every aggregation requires
>>>>>> thousands of tasks... even the simplest aggregation on tiny small data 
>>>>>> will
>>>>>> demand all resources on the cluster.
>>>>>>
>>>>>> because of these issues AQE appeals a lot to me: by automatically
>>>>>> scaling the reducer partitions we avoid these issues. so we have AQE 
>>>>>> turned
>>>>>> on by default. every once in a while i scan through our spark AMs and 
>>>>>> logs
>>>>>> to see how it's doing. i mostly look for stages that have a number of 
>>>>>> tasks
>>>>>> equal to spark.sql.shuffle.partitions, a sign to me that AQE isn't being
>>>>>> effective. unfortunately this seems to be the majority. i suspect it has 
>>>>>> to
>>>>>> do with caching/persisting which we use frequently. a simple reproduction
>>>>>> is below.
>>>>>>
>>>>>> any idea why caching/persisting would interfere with AQE?
>>>>>>
>>>>>> best, koert
>>>>>>
>>>>>> $ hadoop fs -text fruits.csv
>>>>>> fruit,color,quantity
>>>>>> apple,red,5
>>>>>> grape,blue,50
>>>>>> pear,green,3
>>>>>>
>>>>>> # works well using AQE, uses 1 to 3 tasks per job
>>>>>> $ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf
>>>>>> spark.sql.adaptive.enabled=true
>>>>>> scala> val data = spark.read.format("csv").option("header",
>>>>>> true).load("fruits.csv").persist()
>>>>>> scala> data.groupBy("fruit").count().write.format("csv").save("out)
>>>>>>
>>>>>> # does not work well using AQR, uses 200 tasks (e.g.
>>>>>> spark.sql.shuffle.partitions) for certain jobs. the only difference is 
>>>>>> when
>>>>>> persist is called.
>>>>>> $ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf
>>>>>> spark.sql.adaptive.enabled=true
>>>>>> scala> val data = spark.read.format("csv").option("header",
>>>>>> true).load("fruits.csv").groupBy("fruit").count().persist()
>>>>>> scala> data.write.format("csv").save("out)
>>>>>>
>>>>>>


Re: AQE effectiveness

2020-08-20 Thread Koert Kuipers
i see. it makes sense to maximize re-use of cached data. i didn't realize
we have two potentially conflicting goals here.


On Thu, Aug 20, 2020 at 12:41 PM Maryann Xue 
wrote:

> AQE has been turned off deliberately so that the `outputPartitioning` of
> the cached relation won't be changed by AQE partition coalescing or skew
> join optimization and the outputPartitioning can potentially be used by
> relations built on top of the cache.
>
> On a second thought, we should probably add a config there and enable AQE
> by default.
>
>
> Thanks,
> Maryann
>
> On Thu, Aug 20, 2020 at 11:12 AM Koert Kuipers  wrote:
>
>> we tend to have spark.sql.shuffle.partitions set very high by default
>> simply because some jobs need it to be high and it's easier to then just
>> set the default high instead of having people tune it manually per job. the
>> main downside is lots of part files which leads to pressure on the driver,
>> and dynamic allocation becomes troublesome if every aggregation requires
>> thousands of tasks... even the simplest aggregation on tiny small data will
>> demand all resources on the cluster.
>>
>> because of these issues AQE appeals a lot to me: by automatically scaling
>> the reducer partitions we avoid these issues. so we have AQE turned on by
>> default. every once in a while i scan through our spark AMs and logs to see
>> how it's doing. i mostly look for stages that have a number of tasks equal
>> to spark.sql.shuffle.partitions, a sign to me that AQE isn't being
>> effective. unfortunately this seems to be the majority. i suspect it has to
>> do with caching/persisting which we use frequently. a simple reproduction
>> is below.
>>
>> any idea why caching/persisting would interfere with AQE?
>>
>> best, koert
>>
>> $ hadoop fs -text fruits.csv
>> fruit,color,quantity
>> apple,red,5
>> grape,blue,50
>> pear,green,3
>>
>> # works well using AQE, uses 1 to 3 tasks per job
>> $ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf
>> spark.sql.adaptive.enabled=true
>> scala> val data = spark.read.format("csv").option("header",
>> true).load("fruits.csv").persist()
>> scala> data.groupBy("fruit").count().write.format("csv").save("out)
>>
>> # does not work well using AQR, uses 200 tasks (e.g.
>> spark.sql.shuffle.partitions) for certain jobs. the only difference is when
>> persist is called.
>> $ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf
>> spark.sql.adaptive.enabled=true
>> scala> val data = spark.read.format("csv").option("header",
>> true).load("fruits.csv").groupBy("fruit").count().persist()
>> scala> data.write.format("csv").save("out)
>>
>>


AQE effectiveness

2020-08-20 Thread Koert Kuipers
we tend to have spark.sql.shuffle.partitions set very high by default
simply because some jobs need it to be high and it's easier to then just
set the default high instead of having people tune it manually per job. the
main downside is lots of part files which leads to pressure on the driver,
and dynamic allocation becomes troublesome if every aggregation requires
thousands of tasks... even the simplest aggregation on tiny small data will
demand all resources on the cluster.

because of these issues AQE appeals a lot to me: by automatically scaling
the reducer partitions we avoid these issues. so we have AQE turned on by
default. every once in a while i scan through our spark AMs and logs to see
how it's doing. i mostly look for stages that have a number of tasks equal
to spark.sql.shuffle.partitions, a sign to me that AQE isn't being
effective. unfortunately this seems to be the majority. i suspect it has to
do with caching/persisting which we use frequently. a simple reproduction
is below.

any idea why caching/persisting would interfere with AQE?

best, koert

$ hadoop fs -text fruits.csv
fruit,color,quantity
apple,red,5
grape,blue,50
pear,green,3

# works well using AQE, uses 1 to 3 tasks per job
$ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf
spark.sql.adaptive.enabled=true
scala> val data = spark.read.format("csv").option("header",
true).load("fruits.csv").persist()
scala> data.groupBy("fruit").count().write.format("csv").save("out)

# does not work well using AQR, uses 200 tasks (e.g.
spark.sql.shuffle.partitions) for certain jobs. the only difference is when
persist is called.
$ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf
spark.sql.adaptive.enabled=true
scala> val data = spark.read.format("csv").option("header",
true).load("fruits.csv").groupBy("fruit").count().persist()
scala> data.write.format("csv").save("out)


Re: 回复: [DISCUSS] Apache Spark 3.0.1 Release

2020-08-15 Thread Koert Kuipers
i noticed commit today that seems to prepare for 3.0.1-rc1:
commit 05144a5c10cd37ebdbb55fde37d677def49af11f
Author: Ruifeng Zheng 
Date:   Sat Aug 15 01:37:47 2020 +

Preparing Spark release v3.0.1-rc1

so i tried to build spark on that commit and i get failure in sql:

09:36:57.371 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in
stage 77.0 failed 1 times; aborting job
[info] - SPARK-28224: Aggregate sum big decimal overflow *** FAILED ***
(306 milliseconds)
[info]   org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 77.0 failed 1 times, most recent failure: Lost task 0.0 in
stage 77.0 (TID 197, 192.168.11.17, executor driver):
java.lang.ArithmeticException:
Decimal(expanded,0.246000,39,18}) cannot be
represented as Decimal(38, 18).
[info] at org.apache.spark.sql.types.Decimal.toPrecision(Decimal.scala:369)
[info] at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregate_sum_0$(Unknown
Source)
[info] at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doConsume_0$(Unknown
Source)
[info] at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithoutKey_0$(Unknown
Source)
[info] at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
Source)
[info] at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
[info] at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
[info] at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
[info] at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
[info] at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1804)
[info] at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1227)
[info] at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1227)
[info] at
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2138)
[info] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
[info] at org.apache.spark.scheduler.Task.run(Task.scala:127)
[info] at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
[info] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
[info] at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
[info] at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info] at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info] at java.lang.Thread.run(Thread.java:748)

[error] Failed tests:
[error] org.apache.spark.sql.DataFrameSuite

On Thu, Aug 13, 2020 at 8:19 PM Jason Moore
 wrote:

> Thank you so much!  Any update on getting the RC1 up for vote?
>
> Jason.
>
>
> --
> *From:* 郑瑞峰 
> *Sent:* Wednesday, 5 August 2020 12:54 PM
> *To:* Jason Moore ; Spark dev list <
> dev@spark.apache.org>
> *Subject:* 回复: [DISCUSS] Apache Spark 3.0.1 Release
>
> Hi all,
> I am going to prepare the realease of 3.0.1 RC1, with the help of Wenchen.
>
>
> -- 原始邮件 --
> *发件人:* "Jason Moore" ;
> *发送时间:* 2020年7月30日(星期四) 上午10:35
> *收件人:* "dev";
> *主题:* Re: [DISCUSS] Apache Spark 3.0.1 Release
>
> Hi all,
>
>
>
> Discussion around 3.0.1 seems to have trickled away.  What was blocking
> the release process kicking off?  I can see some unresolved bugs raised
> against 3.0.0, but conversely there were quite a few critical correctness
> fixes waiting to be released.
>
>
>
> Cheers,
>
> Jason.
>
>
>
> *From: *Takeshi Yamamuro 
> *Date: *Wednesday, 15 July 2020 at 9:00 am
> *To: *Shivaram Venkataraman 
> *Cc: *"dev@spark.apache.org" 
> *Subject: *Re: [DISCUSS] Apache Spark 3.0.1 Release
>
>
>
> > Just wanted to check if there are any blockers that we are still waiting
> for to start the new release process.
>
> I don't see any on-going blocker in my area.
>
> Thanks for the notification.
>
>
>
> Bests,
>
> Tkaeshi
>
>
>
> On Wed, Jul 15, 2020 at 4:03 AM Dongjoon Hyun 
> wrote:
>
> Hi, Yi.
>
>
>
> Could you explain why you think that is a blocker? For the given example
> from the JIRA description,
>
>
>
> spark.udf.register("key", udf((m: Map[String, String]) => m.keys.head.toInt))
>
> Seq(Map("1" -> "one", "2" -> "two")).toDF("a").createOrReplaceTempView("t")
>
> checkAnswer(sql("SELECT key(a) AS k FROM t GROUP BY key(a)"), Row(1) :: Nil)
>
>
>
> Apache Spark 3.0.0 seems to work like the following.
>
>
>
> scala> spark.version
>
> res0: String = 3.0.0
>
>
>
> scala> spark.udf.register("key", udf((m: Map[String, String]) =>
> m.keys.head.toInt))
>
> res1: org.apache.spark.sql.expressions.UserDefinedFunction =
> SparkUserDefinedFunction($Lambda$1958/948653928@5d6bed7b,IntegerType,List(Some(class[value[0]:
> map])),None,false,true)
>
>
>
> scala> Seq(Map("1" -> "one", "2" ->
> 

Re: [VOTE] Apache Spark 3.0 RC2

2020-05-22 Thread Koert Kuipers
i would like to point out that SPARK-27194 is a fault tolerance bug that
causes jobs to fail when any single task is retried. for us this is a major
headache because we have to keep restarting jobs (and explain that spark is
really fault tolerant generally, just not here).
https://issues.apache.org/jira/browse/SPARK-27194
this is not a regression and its not a blocker but if it could make it into
spark 3.0.0 that would be a win i think. pullreq is waiting for review.
thanks!
best, koert

On Thu, May 21, 2020 at 11:06 PM Jungtaek Lim 
wrote:

> Looks like there're new blocker issues newly figured out.
>
> * https://issues.apache.org/jira/browse/SPARK-31786
> * https://issues.apache.org/jira/browse/SPARK-31761 (not yet marked as
> blocker but according to JIRA comment it's a regression issue as well as
> correctness issue IMHO)
>
> Let's collect the list of blocker issues so that RC3 won't miss them.
>
> On Thu, May 21, 2020 at 2:12 AM Ryan Blue 
> wrote:
>
>> Okay, I took a look at the PR and I think it should be okay. The new
>> classes are unfortunately public, but are in catalyst which is considered
>> private. So this is the approach we discussed.
>>
>> I'm fine with the commit, other than the fact that it violated ASF norms
>>  to commit without
>> waiting for a review.
>>
>> On Wed, May 20, 2020 at 10:00 AM Ryan Blue  wrote:
>>
>>> Why was https://github.com/apache/spark/pull/28523 merged with a -1? We
>>> discussed this months ago and concluded that it was a bad idea to introduce
>>> a new v2 API that cannot have reliable behavior across sources.
>>>
>>> The last time I checked that PR, the approach I discussed with Tathagata
>>> was to not add update mode to DSv2. Instead, Tathagata gave a couple of
>>> reasonable options to avoid it. Why were those not done?
>>>
>>> This is the second time this year that a PR with a -1 was merged. Does
>>> the Spark community not follow the convention to build consensus before
>>> merging changes?
>>>
>>> On Wed, May 20, 2020 at 12:13 AM Wenchen Fan 
>>> wrote:
>>>
 Seems the priority of SPARK-31706 is incorrectly marked, and it's a
 blocker now. The fix was merged just a few hours ago.

 This should be a -1 for RC2.

 On Wed, May 20, 2020 at 2:42 PM rickestcode <
 matthias.harder...@gmail.com> wrote:

> +1
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>


Re: spark lacks fault tolerance with dynamic partition overwrite

2020-04-07 Thread Koert Kuipers
ah ok i was not aware of that jira issue. i will follow the progress there.
thanks for letting me known

On Tue, Apr 7, 2020 at 11:20 AM wuyi  wrote:

> Hi,  Koert,
>
> The community is back to this issue to recently and there's already a fix
> https://github.com/apache/spark/pull/26339 for it.
>
> You can track and review it there.
>
> Best,
>
> Yi Wu
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


spark lacks fault tolerance with dynamic partition overwrite

2020-04-02 Thread Koert Kuipers
i wanted to highlight here the issue we are facing with dynamic partition
overwrite.

it seems that any tasks that writes to disk using this feature and that
need to be retried fails upon retry, leading to a failure for the entire
job.

we have seen this issue show up with preemption (task gets killed by
pre-emption, and when it gets rescheduled it fails consistently). it can
also show up if a hardware issue causes your task to fail, or if you have
speculative execution enabled.

relevant jiras are SPARK-30320 and SPARK-29302

this affects spark 2.4.x and spark 3.0.0-SNAPSHOT
writing to hive does not seem to be impacted.

best,
koert


Re: Use Hadoop-3.2 as a default Hadoop profile in 3.0.0?

2019-11-04 Thread Koert Kuipers
i get that cdh and hdp backport a lot and in that way left 2.7 behind. but
they kept the public apis stable at the 2.7 level, because thats kind of
the point. arent those the hadoop apis spark uses?

On Mon, Nov 4, 2019 at 10:07 AM Steve Loughran 
wrote:

>
>
> On Mon, Nov 4, 2019 at 12:39 AM Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> On Fri, Nov 1, 2019 at 8:41 AM Steve Loughran 
>> wrote:
>>
>>> It would be really good if the spark distributions shipped with later
>>> versions of the hadoop artifacts.
>>>
>>
>> I second this. If we need to keep a Hadoop 2.x profile around, why not
>> make it Hadoop 2.8 or something newer?
>>
>
> go for 2.9
>
>>
>> Koert Kuipers  wrote:
>>
>>> given that latest hdp 2.x is still hadoop 2.7 bumping hadoop 2 profile
>>> to latest would probably be an issue for us.
>>
>>
>> When was the last time HDP 2.x bumped their minor version of Hadoop? Do
>> we want to wait for them to bump to Hadoop 2.8 before we do the same?
>>
>
> The internal builds of CDH and HDP are not those of ASF 2.7.x. A really
> large proportion of the later branch-2 patches are backported. 2,7 was left
> behind a long time ago
>
>
>
>


Re: Use Hadoop-3.2 as a default Hadoop profile in 3.0.0?

2019-11-02 Thread Koert Kuipers
yes i am not against hadoop 3 becoming the default. i was just questioning
the statement that we are close to dropping support for hadoop 2.

we build our own spark releases that we deploy on the clusters of our
clients. these clusters are hdp 2.x, cdh 5, emr, dataproc, etc.

i am aware that hadoop 2.6 profile was dropped and we are handling this
in-house.

given that latest hdp 2.x is still hadoop 2.7 bumping hadoop 2 profile to
latest would probably be an issue for us.

On Sat, Nov 2, 2019, 15:47 Dongjoon Hyun  wrote:

> Hi, Koert.
>
> Could you be more specific to your Hadoop version requirement?
>
> Although we will have Hadoop 2.7 profile, Hadoop 2.6 and older support is
> officially already dropped in Apache Spark 3.0.0. We can not give you the
> answer for Hadoop 2.6 and older version clusters because we are not testing
> at all.
>
> Also, Steve already pointed out that Hadoop 2.7 is also EOL. According to
> his advice, we might need to upgrade our Hadoop 2.7 profile to the latest
> 2.x. I'm wondering you are against on that because of Hadoop 2.6 or older
> version support.
>
> BTW, I'm the one of the users of Hadoop 3.x clusters. It's used already
> and we are migrating more. Apache Spark 3.0 will arrive 2020 (not today).
> We need to consider that, too. Do you have any migration plan in 2020?
>
> In short, for the clusters using Hadoop 2.6 and older versions, Apache
> Spark 2.4 is supported as a LTS version. You can get the bug fixes. For
> Hadoop 2.7, Apache Spark 3.0 will have the profile and the binary release.
> Making Hadoop 3.2 profile as a default is irrelevant to that.
>
> Bests,
> Dongjoon.
>
>
> On Sat, Nov 2, 2019 at 09:35 Koert Kuipers  wrote:
>
>> i dont see how we can be close to the point where we dont need to support
>> hadoop 2.x. this does not agree with the reality from my perspective, which
>> is that all our clients are on hadoop 2.x. not a single one is on hadoop
>> 3.x currently. this includes deployments of cloudera distros, hortonworks
>> distros, and cloud distros like emr and dataproc.
>>
>> forcing us to be on older spark versions would be unfortunate for us, and
>> also bad for the community (as deployments like ours help find bugs in
>> spark).
>>
>> On Mon, Oct 28, 2019 at 3:51 PM Sean Owen  wrote:
>>
>>> I'm OK with that, but don't have a strong opinion nor info about the
>>> implications.
>>> That said my guess is we're close to the point where we don't need to
>>> support Hadoop 2.x anyway, so, yeah.
>>>
>>> On Mon, Oct 28, 2019 at 2:33 PM Dongjoon Hyun 
>>> wrote:
>>> >
>>> > Hi, All.
>>> >
>>> > There was a discussion on publishing artifacts built with Hadoop 3 .
>>> > But, we are still publishing with Hadoop 2.7.3 and `3.0-preview` will
>>> be the same because we didn't change anything yet.
>>> >
>>> > Technically, we need to change two places for publishing.
>>> >
>>> > 1. Jenkins Snapshot Publishing
>>> >
>>> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20Packaging/job/spark-master-maven-snapshots/
>>> >
>>> > 2. Release Snapshot/Release Publishing
>>> >
>>> https://github.com/apache/spark/blob/master/dev/create-release/release-build.sh
>>> >
>>> > To minimize the change, we need to switch our default Hadoop profile.
>>> >
>>> > Currently, the default is `hadoop-2.7 (2.7.4)` profile and `hadoop-3.2
>>> (3.2.0)` is optional.
>>> > We had better use `hadoop-3.2` profile by default and `hadoop-2.7`
>>> optionally.
>>> >
>>> > Note that this means we use Hive 2.3.6 by default. Only `hadoop-2.7`
>>> distribution will use `Hive 1.2.1` like Apache Spark 2.4.x.
>>> >
>>> > Bests,
>>> > Dongjoon.
>>>
>>> -
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>
>>>


Re: Use Hadoop-3.2 as a default Hadoop profile in 3.0.0?

2019-11-02 Thread Koert Kuipers
i dont see how we can be close to the point where we dont need to support
hadoop 2.x. this does not agree with the reality from my perspective, which
is that all our clients are on hadoop 2.x. not a single one is on hadoop
3.x currently. this includes deployments of cloudera distros, hortonworks
distros, and cloud distros like emr and dataproc.

forcing us to be on older spark versions would be unfortunate for us, and
also bad for the community (as deployments like ours help find bugs in
spark).

On Mon, Oct 28, 2019 at 3:51 PM Sean Owen  wrote:

> I'm OK with that, but don't have a strong opinion nor info about the
> implications.
> That said my guess is we're close to the point where we don't need to
> support Hadoop 2.x anyway, so, yeah.
>
> On Mon, Oct 28, 2019 at 2:33 PM Dongjoon Hyun 
> wrote:
> >
> > Hi, All.
> >
> > There was a discussion on publishing artifacts built with Hadoop 3 .
> > But, we are still publishing with Hadoop 2.7.3 and `3.0-preview` will be
> the same because we didn't change anything yet.
> >
> > Technically, we need to change two places for publishing.
> >
> > 1. Jenkins Snapshot Publishing
> >
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20Packaging/job/spark-master-maven-snapshots/
> >
> > 2. Release Snapshot/Release Publishing
> >
> https://github.com/apache/spark/blob/master/dev/create-release/release-build.sh
> >
> > To minimize the change, we need to switch our default Hadoop profile.
> >
> > Currently, the default is `hadoop-2.7 (2.7.4)` profile and `hadoop-3.2
> (3.2.0)` is optional.
> > We had better use `hadoop-3.2` profile by default and `hadoop-2.7`
> optionally.
> >
> > Note that this means we use Hive 2.3.6 by default. Only `hadoop-2.7`
> distribution will use `Hive 1.2.1` like Apache Spark 2.4.x.
> >
> > Bests,
> > Dongjoon.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: concurrent writes with dynamic partition overwrite mode

2019-09-01 Thread Koert Kuipers
https://issues.apache.org/jira/browse/SPARK-28945

On Sun, Sep 1, 2019 at 2:53 PM Koert Kuipers  wrote:

> hi,
> i am struggling to understand if concurrent writes to same basedir but
> different partitions are save with file sources such as parquet.
>
> i tested this in spark 2.4 and spark 3.0.0-SNAPSHOT with real concurrent
> jobs on hdfs and it seemed to work fine. admittedly this was a rather
> limited test.
> as the jobs are running i see on hdfs:
>
> drwxr-xr-x   - koert koert  0 2019-08-29 18:18
> out/.spark-staging-be40030e-8eef-4680-85ac-b55e6519df60/partition=2
> Found 1 items
> drwxr-xr-x   - koert koert  0 2019-08-29 18:18
> out/.spark-staging-d25f16d3-8f2d-4cf4-89bd-09256469b5e5/partition=1
> Found 1 items
> drwxr-xr-x   - koert koert  0 2019-08-29 18:17 out/_temporary/0
>
> it seems each job has its own temporary directory it writes to, set to
> ".spark-staging-" + jobId
> this is consistent with the stagingDir i found in
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol, and i can find
> it being used specifically for dynamic partition overwrite mode.
> so it thought with this i am all set.
> i dont really know what this _temporary/0 is for but it did not seem to
> get in the way.
>
> but then i added some unit tests that also do concurrent writes to
> different partitions with dynamic partition overwrite mode (these test are
> much more rigorous than my ad-hoc tests on hdfs), and now i see errors like
> this:
>
> java.io.FileNotFoundException: File file:/some/base/dir/_temporary/0 does
> not exist
> at
> org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:376)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
> at
> org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:570)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:269)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:309)
> at
> org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
> at
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:215)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:172)
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:123)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:168)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:192)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:189)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:164)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:105)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:105)
> at
> org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:718)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
> at
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:718)
> at
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:330)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:314)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
> at com.tresata.spark.sql.source.ParquetSource.writeBatchImpl
>
> so this seems to hint that the issue is with that _temporary/0. why is it
> trying to do a listStatus on this _temporary/0?
> did i always have this issue and was just lucky enough to not run into it
> on hdfs, or is it specific to RawLocalFileSystem?
>
> thanks!
> koert
>


concurrent writes with dynamic partition overwrite mode

2019-09-01 Thread Koert Kuipers
hi,
i am struggling to understand if concurrent writes to same basedir but
different partitions are save with file sources such as parquet.

i tested this in spark 2.4 and spark 3.0.0-SNAPSHOT with real concurrent
jobs on hdfs and it seemed to work fine. admittedly this was a rather
limited test.
as the jobs are running i see on hdfs:

drwxr-xr-x   - koert koert  0 2019-08-29 18:18
out/.spark-staging-be40030e-8eef-4680-85ac-b55e6519df60/partition=2
Found 1 items
drwxr-xr-x   - koert koert  0 2019-08-29 18:18
out/.spark-staging-d25f16d3-8f2d-4cf4-89bd-09256469b5e5/partition=1
Found 1 items
drwxr-xr-x   - koert koert  0 2019-08-29 18:17 out/_temporary/0

it seems each job has its own temporary directory it writes to, set to
".spark-staging-" + jobId
this is consistent with the stagingDir i found in
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol, and i can find
it being used specifically for dynamic partition overwrite mode.
so it thought with this i am all set.
i dont really know what this _temporary/0 is for but it did not seem to get
in the way.

but then i added some unit tests that also do concurrent writes to
different partitions with dynamic partition overwrite mode (these test are
much more rigorous than my ad-hoc tests on hdfs), and now i see errors like
this:

java.io.FileNotFoundException: File file:/some/base/dir/_temporary/0 does
not exist
at
org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:376)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
at
org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:570)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:269)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:309)
at
org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
at
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:215)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:172)
at
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at
org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:123)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:168)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:192)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:189)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:164)
at
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:105)
at
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:105)
at
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:718)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
at
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:718)
at
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:330)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:314)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
at com.tresata.spark.sql.source.ParquetSource.writeBatchImpl

so this seems to hint that the issue is with that _temporary/0. why is it
trying to do a listStatus on this _temporary/0?
did i always have this issue and was just lucky enough to not run into it
on hdfs, or is it specific to RawLocalFileSystem?

thanks!
koert


Re: Hadoop version(s) compatible with spark-2.4.3-bin-without-hadoop-scala-2.12

2019-05-20 Thread Koert Kuipers
its somewhat weird because avro-mapred-1.8.2-hadoop2.jar is included in the
hadoop-provided distro, but avro-1.8.2.jar is not. i tried to fix it but i
am not too familiar with the pom file.

regarding jline you only run into this if you use spark-shell (and it isnt
always reproducible it seems). see SPARK-25783
<https://issues.apache.org/jira/browse/SPARK-25783>
best,
koert




On Mon, May 20, 2019 at 5:43 PM Sean Owen  wrote:

> Re: 1), I think we tried to fix that on the build side and it requires
> flags that not all tar versions (i.e. OS X) have. But that's
> tangential.
>
> I think the Avro + Parquet dependency situation is generally
> problematic -- see JIRA for some details. But yes I'm not surprised if
> Spark has a different version from Hadoop 2.7.x and that would cause
> problems -- if using Avro. I'm not sure the mistake is that the JARs
> are missing, as I think this is supposed to be a 'provided'
> dependency, but I haven't looked into it. If there's any easy obvious
> correction to be made there, by all means.
>
> Not sure what the deal is with jline... I'd expect that's in the
> "hadoop-provided" distro? That one may be a real issue if it's
> considered provided but isn't used that way.
>
>
> On Mon, May 20, 2019 at 4:15 PM Koert Kuipers  wrote:
> >
> > we run it without issues on hadoop 2.6 - 2.8 on top of my head.
> >
> > we however do some post-processing on the tarball:
> > 1) we fix the ownership of the files inside the tar.gz file (should be
> uid/gid 0/0, otherwise untarring by root can lead to ownership by unknown
> user).
> > 2) add avro-1.8.2.jar and jline-2.14.6.jar to jars folder. i believe
> these jars missing in provided profile is simply a mistake.
> >
> > best,
> > koert
> >
> > On Mon, May 20, 2019 at 3:37 PM Michael Heuer  wrote:
> >>
> >> Hello,
> >>
> >> Which Hadoop version or versions are compatible with Spark 2.4.3 and
> Scala 2.12?
> >>
> >> The binary distribution spark-2.4.3-bin-without-hadoop-scala-2.12.tgz
> is missing avro-1.8.2.jar, so when attempting to run with Hadoop 2.7.7
> there are classpath conflicts at runtime, as Hadoop 2.7.7 includes
> avro-1.7.4.jar.
> >>
> >> https://issues.apache.org/jira/browse/SPARK-27781
> >>
> >>michael
>


Re: Hadoop version(s) compatible with spark-2.4.3-bin-without-hadoop-scala-2.12

2019-05-20 Thread Koert Kuipers
we run it without issues on hadoop 2.6 - 2.8 on top of my head.

we however do some post-processing on the tarball:
1) we fix the ownership of the files inside the tar.gz file (should be
uid/gid 0/0, otherwise untarring by root can lead to ownership by unknown
user).
2) add avro-1.8.2.jar and jline-2.14.6.jar to jars folder. i believe these
jars missing in provided profile is simply a mistake.

best,
koert

On Mon, May 20, 2019 at 3:37 PM Michael Heuer  wrote:

> Hello,
>
> Which Hadoop version or versions are compatible with Spark 2.4.3 and Scala
> 2.12?
>
> The binary distribution spark-2.4.3-bin-without-hadoop-scala-2.12.tgz is
> missing avro-1.8.2.jar, so when attempting to run with Hadoop 2.7.7 there
> are classpath conflicts at runtime, as Hadoop 2.7.7 includes avro-1.7.4.jar.
>
> https://issues.apache.org/jira/browse/SPARK-27781
>
>michael
>


Re: Is there value in publishing nightly snapshots?

2019-04-16 Thread Koert Kuipers
we have used it at times to detect any breaking changes, since it allows us
to run out internal unit tests against spark snapshot binaries

but we can also build these snapshots in-house if you want to turn it off

On Tue, Apr 16, 2019 at 9:29 AM Sean Owen  wrote:

> I noticed recently ...
>
> https://github.com/apache/spark-website/pull/194/files#diff-d95d573366135f01d4fbae2d64522500R466
>
> ... that we stopped publishing nightly releases a long while ago. That's
> fine.
>
> What about turning off the job that builds -SNAPSHOTs of the artifacts
> each night? does anyone use that? It's no big deal to leave it on, but
> I recall it is always a little bit controversial to publish binaries
> from the project that aren't PMC releases (it is allowed if only
> something developers would find and understand is not an official
> release).
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Do you use single-quote syntax for the DataFrame API?

2019-03-31 Thread Koert Kuipers
i don't care much about the symbol class but i find 'a much easier on the
eye than $"a" or "a" and we use it extensively as such in many DSLs
including spark.
so its the syntax i would like to preserve not the class, which seems to be
the opposite of what they are suggesting.





On Sun, Mar 31, 2019 at 10:07 AM Rubén Berenguel 
wrote:

> I favour using either $”foo” or columnar expressions, but know of several
> developers who prefer single quote syntax and consider it a better practice.
>
> R
>
> On 31 March 2019 at 15:15:00, Sean Owen (sro...@apache.org) wrote:
>
>> FWIW I use "foo" in Pyspark or col("foo") where necessary, and $"foo" in
>> Scala
>>
>> On Sun, Mar 31, 2019 at 1:58 AM Reynold Xin  wrote:
>>
>>> As part of evolving the Scala language, the Scala team is considering
>>> removing single-quote syntax for representing symbols. Single-quote syntax
>>> is one of the ways to represent a column in Spark's DataFrame API. While I
>>> personally don't use them (I prefer just using strings for column names, or
>>> using expr function), I see them used quite a lot by other people's code,
>>> e.g.
>>>
>>> df.select('id, 'name).show()
>>>
>>> I want to bring this to more people's attention, in case they are
>>> depending on this. The discussion thread is:
>>> https://contributors.scala-lang.org/t/proposal-to-deprecate-and-remove-symbol-literals/2953
>>>
>>>
>>>
>>>


Re: [DISCUSS] Upgrade built-in Hive to 2.3.4

2019-02-01 Thread Koert Kuipers
introducing hive serdes in sql core sounds a bit like a step back to me.
how can you build spark without hive support if there are imports for org.
apache.hadoop.hive.serde2 in sql core? are these imports very limited in
scope (and not suck all of hive into it)?

On Fri, Feb 1, 2019 at 3:03 PM Felix Cheung 
wrote:

> What’s the update and next step on this?
>
> We have real users getting blocked by this issue.
>
>
> --
> *From:* Xiao Li 
> *Sent:* Wednesday, January 16, 2019 9:37 AM
> *To:* Ryan Blue
> *Cc:* Marcelo Vanzin; Hyukjin Kwon; Sean Owen; Felix Cheung; Yuming Wang;
> dev
> *Subject:* Re: [DISCUSS] Upgrade built-in Hive to 2.3.4
>
> Thanks for your feedbacks!
>
> Working with Yuming to reduce the risk of stability and quality. Will keep
> you posted when the proposal is ready.
>
> Cheers,
>
> Xiao
>
> Ryan Blue  于2019年1月16日周三 上午9:27写道:
>
>> +1 for what Marcelo and Hyukjin said.
>>
>> In particular, I agree that we can't expect Hive to release a version
>> that is now more than 3 years old just to solve a problem for Spark. Maybe
>> that would have been a reasonable ask instead of publishing a fork years
>> ago, but I think this is now Spark's problem.
>>
>> On Tue, Jan 15, 2019 at 9:02 PM Marcelo Vanzin 
>> wrote:
>>
>>> +1 to that. HIVE-16391 by itself means we're giving up things like
>>> Hadoop 3, and we're also putting the burden on the Hive folks to fix a
>>> problem that we created.
>>>
>>> The current PR is basically a Spark-side fix for that bug. It does
>>> mean also upgrading Hive (which gives us Hadoop 3, yay!), but I think
>>> it's really the right path to take here.
>>>
>>> On Tue, Jan 15, 2019 at 6:32 PM Hyukjin Kwon 
>>> wrote:
>>> >
>>> > Resolving HIVE-16391 means Hive to release 1.2.x that contains the
>>> fixes of our Hive fork (correct me if I am mistaken).
>>> >
>>> > Just to be honest by myself and as a personal opinion, that basically
>>> says Hive to take care of Spark's dependency.
>>> > Hive looks going ahead for 3.1.x and no one would use the newer
>>> release of 1.2.x. In practice, Spark doesn't make a release 1.6.x anymore
>>> for instance,
>>> >
>>> > Frankly, my impression was that it's, honestly, our mistake to fix.
>>> Since Spark community is big enough, I was thinking we should try to fix it
>>> by ourselves first.
>>> > I am not saying upgrading is the only way to get through this but I
>>> think we should at least try first, and see what's next.
>>> >
>>> > It does, yes, sound more risky to upgrade it in our side but I think
>>> it's worth to check and try it and see if it's possible.
>>> > I think this is a standard approach to upgrade the dependency than
>>> using the fork or letting Hive side to release another 1.2.x.
>>> >
>>> > If we fail to upgrade it for critical or inevitable reasons somehow,
>>> yes, we could find an alternative but that basically means
>>> > we're going to stay in 1.2.x for, at least, a long time (say .. until
>>> Spark 4.0.0?).
>>> >
>>> > I know somehow it happened to be sensitive but to be just literally
>>> honest to myself, I think we should make a try.
>>> >
>>>
>>>
>>> --
>>> Marcelo
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>


Re: [VOTE] SPARK 2.4.0 (RC3)

2018-10-19 Thread Koert Kuipers
i deployed 2.4.0 RC3 on our dev cluster and ran into issue with spark shell
and jline. there is probably a simple workaround so this is not a serious
issue but just wanted to let you know.

https://issues.apache.org/jira/browse/SPARK-25783

On Mon, Oct 15, 2018 at 4:59 PM Imran Rashid 
wrote:

> I just discovered https://issues.apache.org/jira/browse/SPARK-25738 with
> some more testing.  I only marked it as critical, but seems pretty bad --
> I'll defer to others opinion
>
> On Sat, Oct 13, 2018 at 4:15 PM Dongjoon Hyun 
> wrote:
>
>> Yes. From my side, it's -1 for RC3.
>>
>> Bests,
>> Dongjoon.
>>
>> On Sat, Oct 13, 2018 at 1:24 PM Holden Karau 
>> wrote:
>>
>>> So if it's a blocker would you think this should be a -1?
>>>
>>> On Fri, Oct 12, 2018 at 3:52 PM Dongjoon Hyun 
>>> wrote:
>>>
 Hi, Holden.

 Since that's a performance at 2.4.0, I marked as `Blocker` four days
 ago.

 Bests,
 Dongjoon.


 On Fri, Oct 12, 2018 at 11:45 AM Holden Karau 
 wrote:

> Following up I just wanted to make sure this new blocker that Dongjoon
> designated is surfaced -
> https://jira.apache.org/jira/browse/SPARK-25579?filter=12340409=affectedVersion%20%3D%202.4.0%20AND%20cf%5B12310320%5D%20is%20EMPTY%20AND%20project%20%3D%20spark%20AND%20(status%20%3D%20%22In%20Progress%22%20OR%20resolution%20%3D%20Unresolved)%20AND%20priority%20%3D%20Blocker
>
> On Thu, Oct 11, 2018 at 2:05 PM Xiao Li  wrote:
>
>> -1. We have two correctness bugs:
>> https://issues.apache.org/jira/browse/SPARK-25714 and
>> https://issues.apache.org/jira/browse/SPARK-25708.
>>
>> Let us fix all the three issues in ScalaUDF, as mentioned by Sean.
>>
>> Xiao
>>
>>
>> Sean Owen  于2018年10月11日周四 上午9:04写道:
>>
>>> This is a legitimate question about the behavior of ScalaUDF after
>>> the
>>> change to support 2.12:
>>> https://github.com/apache/spark/pull/22259#discussion_r224295469
>>> Not quite a blocker I think, but a potential gotcha we definitely
>>> need
>>> to highlight in release notes. There may be an argument for changing
>>> ScalaUDF again before the release. Have a look, anyone familiar with
>>> catalyst.
>>> On Wed, Oct 10, 2018 at 3:00 PM Sean Owen  wrote:
>>> >
>>> > +1. I tested the source build against Scala 2.12 and common build
>>> > profiles. License and sigs look OK.
>>> >
>>> > No blockers; one critical:
>>> >
>>> > SPARK-25378 ArrayData.toArray(StringType) assume UTF8String in 2.4
>>> >
>>> > I think this one is "won't fix" though? not trying to restore the
>>> behavior?
>>> >
>>> > Other items open for 2.4.0:
>>> >
>>> > SPARK-25347 Document image data source in doc site
>>> > SPARK-25584 Document libsvm data source in doc site
>>> > SPARK-25179 Document the features that require Pyarrow 0.10
>>> > SPARK-25507 Update documents for the new features in 2.4 release
>>> > SPARK-25346 Document Spark builtin data sources
>>> > SPARK-24464 Unit tests for MLlib's Instrumentation
>>> > SPARK-23197 Flaky test:
>>> spark.streaming.ReceiverSuite."receiver_life_cycle"
>>> > SPARK-22809 pyspark is sensitive to imports with dots
>>> > SPARK-21030 extend hint syntax to support any expression for
>>> Python and R
>>> >
>>> > Anyone know enough to close or retarget them? they don't look
>>> critical
>>> > for 2.4, SPARK-25507 has no content, itself. SPARK-25179 "Document
>>> the
>>> > features that require Pyarrow 0.10" however sounds like it could
>>> have
>>> > been important for 2.4? if not a blocker.
>>> >
>>> > PS I don't think that SPARK-25150 is an issue; see JIRA. At least
>>> > there is some ongoing discussion there.
>>> >
>>> > I am evaluating
>>> > https://github.com/apache/spark/pull/22259#discussion_r224252642
>>> right
>>> > now.
>>> >
>>> >
>>> > On Wed, Oct 10, 2018 at 9:47 AM Wenchen Fan 
>>> wrote:
>>> > >
>>> > > Please vote on releasing the following candidate as Apache Spark
>>> version 2.4.0.
>>> > >
>>> > > The vote is open until October 1 PST and passes if a majority +1
>>> PMC votes are cast, with
>>> > > a minimum of 3 +1 votes.
>>> > >
>>> > > [ ] +1 Release this package as Apache Spark 2.4.0
>>> > > [ ] -1 Do not release this package because ...
>>> > >
>>> > > To learn more about Apache Spark, please see
>>> http://spark.apache.org/
>>> > >
>>> > > The tag to be voted on is v2.4.0-rc3 (commit
>>> 8e4a99bd201b9204fec52580f19ae70a229ed94e):
>>> > > https://github.com/apache/spark/tree/v2.4.0-rc3
>>> > >
>>> > > The release files, including signatures, digests, etc. can be
>>> found at:
>>> > > https://dist.apache.org/repos/dist/dev/spark/v2.4.0-rc3-bin/
>>> > >
>>> > > Signatures used for Spark RCs can be found 

Re: Coalesce behaviour

2018-10-15 Thread Koert Kuipers
i realize it is unlikely all data will be local to tasks, so placement will
not be optimal and there will be some network traffic, but is this the same
as a shuffle?

in CoalesceRDD it shows a NarrowDependency, which i thought meant it could
be implemented without a shuffle.

On Mon, Oct 15, 2018 at 2:49 AM Jörn Franke  wrote:

> This is not fully correct. If you have less files then you need to move
> some data to some other nodes, because not all the data is there for
> writing (even the case for the same node, but then it is easier from a
> network perspective). Hence a shuffling is needed.
>
>
> Am 15.10.2018 um 05:04 schrieb Koert Kuipers :
>
> sure, i understand currently the workaround is to add a shuffle. but
> that's just a workaround, not a satisfactory solution: we shouldn't have to
> introduce another shuffle (an expensive operation) just to reduce the
> number of files.
>
> logically all you need is a map-phase with less tasks after the reduce
> phase with many tasks to reduce the number of files, but there is currently
> no way to express this in spark. it seems the map operation always gets
> tagged on to the end of the previous reduce operation, which is generally a
> reasonable optimization, but not here since it causes the tasks for the
> reduce to go down which is unacceptable.
>
> On Sun, Oct 14, 2018 at 10:06 PM Wenchen Fan  wrote:
>
>> You have a heavy workload, you want to run it with many tasks for better
>> performance and stability(no OMM), but you also want to run it with few
>> tasks to avoid too many small files. The reality is, mostly you can't reach
>> these 2 goals together, they conflict with each other. The solution I can
>> think of is to sacrifice performance a little: run the workload with many
>> tasks at first, and then merge the many small files. Generally this is how
>> `coalesce(n, shuffle = true)` does.
>>
>> On Sat, Oct 13, 2018 at 10:05 PM Koert Kuipers  wrote:
>>
>>> we have a collection of programs in dataframe api that all do big
>>> shuffles for which we use 2048+ partitions. this works fine but it produces
>>> a lot of (small) output files, which put pressure on the memory of the
>>> drivers programs of any spark program that reads this data in again.
>>>
>>> so one of our developers stuck in a .coalesce at the end of every
>>> program just before writing to disk to reduce the output files thinking
>>> this would solve the many files issue. to his surprise the coalesce caused
>>> the existing shuffles to run with less tasks, leading to unacceptable
>>> slowdowns and OOMs. so this is not a solution.
>>>
>>> how can we insert a coalesce as a new map-phase (new job on application
>>> manager with narrow dependency) instead of modifying the existing reduce
>>> phase? i am saying map-phase because it should not introduce a new shuffle:
>>> this is wasteful and unnecessary.
>>>
>>>
>>> On Sat, Oct 13, 2018 at 1:39 AM Wenchen Fan  wrote:
>>>
>>>> In your first example, the root RDD has 1000 partitions, then you do a
>>>> shuffle (with repartitionAndSortWithinPartitions), and shuffles data
>>>> to 1000 reducers. Then you do coalesce, which asks Spark to launch
>>>> only 20 reducers to process the data which were prepared for 1
>>>> reducers. since the reducers have heavy work(sorting), so you OOM. In
>>>> general, your work flow is: 1000 mappers -> 20 reducers.
>>>>
>>>> In your second example, the coalesce introduces shuffle, so your work
>>>> flow is: 1000 mappers -> 1000 reducers(also mappers) -> 20 reducers. The
>>>> sorting is done by 1000 tasks so no OOM.
>>>>
>>>> BTW have you tried DataFrame API? With Spark SQL, the memory management
>>>> is more precise, so even we only have 20 tasks to do the heavy sorting, the
>>>> system should just have more disk spills instead of OOM.
>>>>
>>>>
>>>> On Sat, Oct 13, 2018 at 11:35 AM Koert Kuipers 
>>>> wrote:
>>>>
>>>>> how can i get a shuffle with 2048 partitions and 2048 tasks and then a
>>>>> map phase with 10 partitions and 10 tasks that writes to hdfs?
>>>>>
>>>>> every time i try to do this using coalesce the shuffle ends up having
>>>>> 10 tasks which is unacceptable due to OOM. this makes coalesce somewhat
>>>>> useless.
>>>>>
>>>>> On Wed, Oct 10, 2018 at 9:06 AM Wenchen Fan 
>>>>> wrote:
>>>>>
>>>>>> Note that, RDD

Re: Coalesce behaviour

2018-10-14 Thread Koert Kuipers
sure, i understand currently the workaround is to add a shuffle. but that's
just a workaround, not a satisfactory solution: we shouldn't have to
introduce another shuffle (an expensive operation) just to reduce the
number of files.

logically all you need is a map-phase with less tasks after the reduce
phase with many tasks to reduce the number of files, but there is currently
no way to express this in spark. it seems the map operation always gets
tagged on to the end of the previous reduce operation, which is generally a
reasonable optimization, but not here since it causes the tasks for the
reduce to go down which is unacceptable.

On Sun, Oct 14, 2018 at 10:06 PM Wenchen Fan  wrote:

> You have a heavy workload, you want to run it with many tasks for better
> performance and stability(no OMM), but you also want to run it with few
> tasks to avoid too many small files. The reality is, mostly you can't reach
> these 2 goals together, they conflict with each other. The solution I can
> think of is to sacrifice performance a little: run the workload with many
> tasks at first, and then merge the many small files. Generally this is how
> `coalesce(n, shuffle = true)` does.
>
> On Sat, Oct 13, 2018 at 10:05 PM Koert Kuipers  wrote:
>
>> we have a collection of programs in dataframe api that all do big
>> shuffles for which we use 2048+ partitions. this works fine but it produces
>> a lot of (small) output files, which put pressure on the memory of the
>> drivers programs of any spark program that reads this data in again.
>>
>> so one of our developers stuck in a .coalesce at the end of every program
>> just before writing to disk to reduce the output files thinking this would
>> solve the many files issue. to his surprise the coalesce caused the
>> existing shuffles to run with less tasks, leading to unacceptable slowdowns
>> and OOMs. so this is not a solution.
>>
>> how can we insert a coalesce as a new map-phase (new job on application
>> manager with narrow dependency) instead of modifying the existing reduce
>> phase? i am saying map-phase because it should not introduce a new shuffle:
>> this is wasteful and unnecessary.
>>
>>
>> On Sat, Oct 13, 2018 at 1:39 AM Wenchen Fan  wrote:
>>
>>> In your first example, the root RDD has 1000 partitions, then you do a
>>> shuffle (with repartitionAndSortWithinPartitions), and shuffles data to
>>> 1000 reducers. Then you do coalesce, which asks Spark to launch only 20
>>> reducers to process the data which were prepared for 1 reducers. since
>>> the reducers have heavy work(sorting), so you OOM. In general, your work
>>> flow is: 1000 mappers -> 20 reducers.
>>>
>>> In your second example, the coalesce introduces shuffle, so your work
>>> flow is: 1000 mappers -> 1000 reducers(also mappers) -> 20 reducers. The
>>> sorting is done by 1000 tasks so no OOM.
>>>
>>> BTW have you tried DataFrame API? With Spark SQL, the memory management
>>> is more precise, so even we only have 20 tasks to do the heavy sorting, the
>>> system should just have more disk spills instead of OOM.
>>>
>>>
>>> On Sat, Oct 13, 2018 at 11:35 AM Koert Kuipers 
>>> wrote:
>>>
>>>> how can i get a shuffle with 2048 partitions and 2048 tasks and then a
>>>> map phase with 10 partitions and 10 tasks that writes to hdfs?
>>>>
>>>> every time i try to do this using coalesce the shuffle ends up having
>>>> 10 tasks which is unacceptable due to OOM. this makes coalesce somewhat
>>>> useless.
>>>>
>>>> On Wed, Oct 10, 2018 at 9:06 AM Wenchen Fan 
>>>> wrote:
>>>>
>>>>> Note that, RDD partitions and Spark tasks are not always 1-1 mapping.
>>>>>
>>>>> Assuming `rdd1` has 100 partitions, and `rdd2 = rdd1.coalesce(10)`.
>>>>> Then `rdd2` has 10 partitions, and there is no shuffle between `rdd1` and
>>>>> `rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, and
>>>>> this stage has 10 tasks (decided by the last RDD). This means, each Spark
>>>>> task will process 10 partitions of `rdd1`.
>>>>>
>>>>> Looking at your example, I don't see where is the problem. Can you
>>>>> describe what is not expected?
>>>>>
>>>>> On Wed, Oct 10, 2018 at 2:11 PM Sergey Zhemzhitsky 
>>>>> wrote:
>>>>>
>>>>>> Well, it seems that I can still extend the CoalesceRDD to make it
>>>>>> preserve the total number of partitions from the pa

Re: Coalesce behaviour

2018-10-13 Thread Koert Kuipers
we have a collection of programs in dataframe api that all do big shuffles
for which we use 2048+ partitions. this works fine but it produces a lot of
(small) output files, which put pressure on the memory of the drivers
programs of any spark program that reads this data in again.

so one of our developers stuck in a .coalesce at the end of every program
just before writing to disk to reduce the output files thinking this would
solve the many files issue. to his surprise the coalesce caused the
existing shuffles to run with less tasks, leading to unacceptable slowdowns
and OOMs. so this is not a solution.

how can we insert a coalesce as a new map-phase (new job on application
manager with narrow dependency) instead of modifying the existing reduce
phase? i am saying map-phase because it should not introduce a new shuffle:
this is wasteful and unnecessary.


On Sat, Oct 13, 2018 at 1:39 AM Wenchen Fan  wrote:

> In your first example, the root RDD has 1000 partitions, then you do a
> shuffle (with repartitionAndSortWithinPartitions), and shuffles data to
> 1000 reducers. Then you do coalesce, which asks Spark to launch only 20
> reducers to process the data which were prepared for 1 reducers. since
> the reducers have heavy work(sorting), so you OOM. In general, your work
> flow is: 1000 mappers -> 20 reducers.
>
> In your second example, the coalesce introduces shuffle, so your work
> flow is: 1000 mappers -> 1000 reducers(also mappers) -> 20 reducers. The
> sorting is done by 1000 tasks so no OOM.
>
> BTW have you tried DataFrame API? With Spark SQL, the memory management is
> more precise, so even we only have 20 tasks to do the heavy sorting, the
> system should just have more disk spills instead of OOM.
>
>
> On Sat, Oct 13, 2018 at 11:35 AM Koert Kuipers  wrote:
>
>> how can i get a shuffle with 2048 partitions and 2048 tasks and then a
>> map phase with 10 partitions and 10 tasks that writes to hdfs?
>>
>> every time i try to do this using coalesce the shuffle ends up having 10
>> tasks which is unacceptable due to OOM. this makes coalesce somewhat
>> useless.
>>
>> On Wed, Oct 10, 2018 at 9:06 AM Wenchen Fan  wrote:
>>
>>> Note that, RDD partitions and Spark tasks are not always 1-1 mapping.
>>>
>>> Assuming `rdd1` has 100 partitions, and `rdd2 = rdd1.coalesce(10)`. Then
>>> `rdd2` has 10 partitions, and there is no shuffle between `rdd1` and
>>> `rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, and
>>> this stage has 10 tasks (decided by the last RDD). This means, each Spark
>>> task will process 10 partitions of `rdd1`.
>>>
>>> Looking at your example, I don't see where is the problem. Can you
>>> describe what is not expected?
>>>
>>> On Wed, Oct 10, 2018 at 2:11 PM Sergey Zhemzhitsky 
>>> wrote:
>>>
>>>> Well, it seems that I can still extend the CoalesceRDD to make it
>>>> preserve the total number of partitions from the parent RDD, reduce some
>>>> partitons in the same way as the original coalesce does for map-only jobs
>>>> and fill the gaps (partitions which should reside on the positions of the
>>>> coalesced ones) with just a special kind of partitions which do not have
>>>> any parent dependencies and always return an empty iterator.
>>>>
>>>> I believe this should work as desired (at least the previous
>>>> ShuffleMapStage will think that the number of partitons in the next stage,
>>>> it generates shuffle output for, is not changed).
>>>>
>>>> There are few issues though - existence of empty partitions which can
>>>> be evaluated almost for free and empty output files from these empty
>>>> partitons which can be beaten by means of LazyOutputFormat in case of RDDs.
>>>>
>>>>
>>>>
>>>> On Mon, Oct 8, 2018, 23:57 Koert Kuipers  wrote:
>>>>
>>>>> although i personally would describe this as a bug the answer will be
>>>>> that this is the intended behavior. the coalesce "infects" the shuffle
>>>>> before it, making a coalesce useless for reducing output files after a
>>>>> shuffle with many partitions b design.
>>>>>
>>>>> your only option left is a repartition for which you pay the price in
>>>>> that it introduces another expensive shuffle.
>>>>>
>>>>> interestingly if you do a coalesce on a map-only job it knows how to
>>>>> reduce the partitions and output files without introducing a shuffle, so
>>>>> clearly it is possible, bu

Re: Coalesce behaviour

2018-10-12 Thread Koert Kuipers
how can i get a shuffle with 2048 partitions and 2048 tasks and then a map
phase with 10 partitions and 10 tasks that writes to hdfs?

every time i try to do this using coalesce the shuffle ends up having 10
tasks which is unacceptable due to OOM. this makes coalesce somewhat
useless.

On Wed, Oct 10, 2018 at 9:06 AM Wenchen Fan  wrote:

> Note that, RDD partitions and Spark tasks are not always 1-1 mapping.
>
> Assuming `rdd1` has 100 partitions, and `rdd2 = rdd1.coalesce(10)`. Then
> `rdd2` has 10 partitions, and there is no shuffle between `rdd1` and
> `rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, and
> this stage has 10 tasks (decided by the last RDD). This means, each Spark
> task will process 10 partitions of `rdd1`.
>
> Looking at your example, I don't see where is the problem. Can you
> describe what is not expected?
>
> On Wed, Oct 10, 2018 at 2:11 PM Sergey Zhemzhitsky 
> wrote:
>
>> Well, it seems that I can still extend the CoalesceRDD to make it
>> preserve the total number of partitions from the parent RDD, reduce some
>> partitons in the same way as the original coalesce does for map-only jobs
>> and fill the gaps (partitions which should reside on the positions of the
>> coalesced ones) with just a special kind of partitions which do not have
>> any parent dependencies and always return an empty iterator.
>>
>> I believe this should work as desired (at least the previous
>> ShuffleMapStage will think that the number of partitons in the next stage,
>> it generates shuffle output for, is not changed).
>>
>> There are few issues though - existence of empty partitions which can be
>> evaluated almost for free and empty output files from these empty partitons
>> which can be beaten by means of LazyOutputFormat in case of RDDs.
>>
>>
>>
>> On Mon, Oct 8, 2018, 23:57 Koert Kuipers  wrote:
>>
>>> although i personally would describe this as a bug the answer will be
>>> that this is the intended behavior. the coalesce "infects" the shuffle
>>> before it, making a coalesce useless for reducing output files after a
>>> shuffle with many partitions b design.
>>>
>>> your only option left is a repartition for which you pay the price in
>>> that it introduces another expensive shuffle.
>>>
>>> interestingly if you do a coalesce on a map-only job it knows how to
>>> reduce the partitions and output files without introducing a shuffle, so
>>> clearly it is possible, but i dont know how to get this behavior after a
>>> shuffle in an existing job.
>>>
>>> On Fri, Oct 5, 2018 at 6:34 PM Sergey Zhemzhitsky 
>>> wrote:
>>>
>>>> Hello guys,
>>>>
>>>> Currently I'm a little bit confused with coalesce behaviour.
>>>>
>>>> Consider the following usecase - I'd like to join two pretty big RDDs.
>>>> To make a join more stable and to prevent it from failures by OOM RDDs
>>>> are usually repartitioned to redistribute data more evenly and to
>>>> prevent every partition from hitting 2GB limit. Then after join with a
>>>> lot of partitions.
>>>>
>>>> Then after successful join I'd like to save the resulting dataset.
>>>> But I don't need such a huge amount of files as the number of
>>>> partitions/tasks during joining. Actually I'm fine with such number of
>>>> files as the total number of executor cores allocated to the job. So
>>>> I've considered using a coalesce.
>>>>
>>>> The problem is that coalesce with shuffling disabled prevents join
>>>> from using the specified number of partitions and instead forces join
>>>> to use the number of partitions provided to coalesce
>>>>
>>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5,
>>>> false).toDebugString
>>>> res5: String =
>>>> (5) CoalescedRDD[15] at coalesce at :25 []
>>>>  |  MapPartitionsRDD[14] at repartition at :25 []
>>>>  |  CoalescedRDD[13] at repartition at :25 []
>>>>  |  ShuffledRDD[12] at repartition at :25 []
>>>>  +-(20) MapPartitionsRDD[11] at repartition at :25 []
>>>> |   ParallelCollectionRDD[10] at makeRDD at :25 []
>>>>
>>>> With shuffling enabled everything is ok, e.g.
>>>>
>>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5,
>>>> true).toDebugString
>>>> res6: String =
>>>> (5) MapPartitionsRDD[24] at coalesce at :25 []
>>>>  |  CoalescedRDD[23]

Re: Coalesce behaviour

2018-10-08 Thread Koert Kuipers
although i personally would describe this as a bug the answer will be that
this is the intended behavior. the coalesce "infects" the shuffle before
it, making a coalesce useless for reducing output files after a shuffle
with many partitions b design.

your only option left is a repartition for which you pay the price in that
it introduces another expensive shuffle.

interestingly if you do a coalesce on a map-only job it knows how to reduce
the partitions and output files without introducing a shuffle, so clearly
it is possible, but i dont know how to get this behavior after a shuffle in
an existing job.

On Fri, Oct 5, 2018 at 6:34 PM Sergey Zhemzhitsky 
wrote:

> Hello guys,
>
> Currently I'm a little bit confused with coalesce behaviour.
>
> Consider the following usecase - I'd like to join two pretty big RDDs.
> To make a join more stable and to prevent it from failures by OOM RDDs
> are usually repartitioned to redistribute data more evenly and to
> prevent every partition from hitting 2GB limit. Then after join with a
> lot of partitions.
>
> Then after successful join I'd like to save the resulting dataset.
> But I don't need such a huge amount of files as the number of
> partitions/tasks during joining. Actually I'm fine with such number of
> files as the total number of executor cores allocated to the job. So
> I've considered using a coalesce.
>
> The problem is that coalesce with shuffling disabled prevents join
> from using the specified number of partitions and instead forces join
> to use the number of partitions provided to coalesce
>
> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5,
> false).toDebugString
> res5: String =
> (5) CoalescedRDD[15] at coalesce at :25 []
>  |  MapPartitionsRDD[14] at repartition at :25 []
>  |  CoalescedRDD[13] at repartition at :25 []
>  |  ShuffledRDD[12] at repartition at :25 []
>  +-(20) MapPartitionsRDD[11] at repartition at :25 []
> |   ParallelCollectionRDD[10] at makeRDD at :25 []
>
> With shuffling enabled everything is ok, e.g.
>
> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5,
> true).toDebugString
> res6: String =
> (5) MapPartitionsRDD[24] at coalesce at :25 []
>  |  CoalescedRDD[23] at coalesce at :25 []
>  |  ShuffledRDD[22] at coalesce at :25 []
>  +-(100) MapPartitionsRDD[21] at coalesce at :25 []
>  |   MapPartitionsRDD[20] at repartition at :25 []
>  |   CoalescedRDD[19] at repartition at :25 []
>  |   ShuffledRDD[18] at repartition at :25 []
>  +-(20) MapPartitionsRDD[17] at repartition at :25 []
> |   ParallelCollectionRDD[16] at makeRDD at :25 []
>
> In that case the problem is that for pretty huge datasets additional
> reshuffling can take hours or at least comparable amount of time as
> for the join itself.
>
> So I'd like to understand whether it is a bug or just an expected
> behaviour?
> In case it is expected is there any way to insert additional
> ShuffleMapStage into an appropriate position of DAG but without
> reshuffling itself?
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Naming policy for packages

2018-08-15 Thread Koert Kuipers
ok it doesnt sound so bad if the maven identifier can have spark it in. no
big deal!

otherwise i was going to suggest "kraps". like kraps-xml

scala> "spark".reverse
res0: String = kraps


On Wed, Aug 15, 2018 at 2:43 PM, Sean Owen  wrote:

> I'd refer you again to the trademark policy. In the first link I see
> projects whose software ID is like "spark-foo" but title/subtitle is like
> "Foo for Apache Spark". This is OK. 'sparklyr' is in a gray area we've
> talked about before; see https://www.apache.org/foundation/marks/ as
> well. I think it's in a gray area, myself.
>
> My best advice to anyone is to avoid this entirely by just not naming your
> project anything like 'spark'.
>
> On Wed, Aug 15, 2018 at 10:39 AM <0xf0f...@protonmail.com> wrote:
>
>> Does it mean that majority of Spark related projects, including top
>> Datatbricks (https://github.com/databricks?utf8=%E2%9C%93=
>> spark==) or RStudio (sparklyr) contributions, violate the
>> trademark?
>>
>>
>> Sent with ProtonMail  Secure Email.
>>
>> ‐‐‐ Original Message ‐‐‐
>> On August 15, 2018 5:51 PM, Sean Owen  wrote:
>>
>> You might be interested in the full policy: https://spark.apache.
>> org/trademarks.html
>>
>> What it is trying to prevent is confusion. Is spark-xml from the Spark
>> project? Sounds like it but who knows ? What is a vendor releases ASFSpark
>> 3.0? Are people going to think this is an official real project release?
>>
>> You can release 'Foo for Apache Spark'. You can use shorthand like
>> foo-spark in software identifiers like Maven coordinates.
>>
>> Keeping trademark rights is essential in OSS and part of it is making an
>> effort to assert that right.
>>
>>
>>


Re: Naming policy for packages

2018-08-15 Thread Koert Kuipers
mhhh thats somewhat unfortunate?

its helpful to me that something is called say spark-xml, it tells me its
xml for spark! any other name would probably be less informative.

or is this still allowed?


On Wed, Aug 15, 2018 at 11:35 AM, Reynold Xin  wrote:

> Unfortunately that’s an Apache foundation policy and the Spark community
> has no power to change it. My understanding: The reason Spark can’t be in
> the name is because if it is used frequently enough, the foundation would
> lose the Spark trademark. Cheers.
>
> On Wed, Aug 15, 2018 at 7:19 AM Simon Dirmeier 
> wrote:
>
>> Hey,
>> thanks for clearning that up.
>> Imho this is somewhat unfortunate, because package names that contain
>> "spark", somewhat promote and advertise Apache Spark, right?
>>
>> Best,
>> Simon
>>
>> Am 15.08.18 um 14:00 schrieb Sean Owen:
>>
>> You raise a great point, and we were just discussing this. The page is
>> old and contains many projects that were listed before the trademarks we're
>> being enforced. Some have renamed themselves. We will update the page and
>> remove stale or noncompliant projects and ask those that need to change to
>> do so.
>>
>> You are correct that the guidance you quote is current and should be
>> followed.
>>
>> Note there is an exception for software identifiers.
>>
>> On Wed, Aug 15, 2018, 6:13 AM Simon Dirmeier 
>> wrote:
>>
>>> Dear all,
>>>
>>> I am currently developing two OSS extension packages for spark; one
>>> related to machine learning; one related to biological applications.
>>> According to the trademark guidelines (https://spark.apache.org/
>>> trademarks.html) I am not allowed to use
>>> *Names derived from “Spark”, such as “sparkly”. *
>>> My question is if that is really the case or how stringent these
>>> guidelines are, given that so many spark packages (
>>> https://spark.apache.org/third-party-projects.html) contain Spark as
>>> name already. I already contacted the official email for questions like
>>> these, but didn't hear back until now.
>>>
>>> Can anyone please shed light on this?
>>> Thanks in advance!
>>>
>>> Best,
>>> Simon
>>>
>>
>>


Re: [DISCUSS][SQL] Control the number of output files

2018-08-10 Thread Koert Kuipers
we have found that to make shuffles reliable without OOMs we need to have
spark.sql.shuffle.partitions at a high number, bigger than 2000 at least.
yet this leads to a large amount of part files, which puts big pressure on
spark driver programs.

i tried to mitigate this with dataframe.coalesce to reduce the number of
files, but this is not acceptable. coalesce changes the tasks for the last
shuffle before it, bringing back the issues we tried to mitigate with a
high number for spark.sql.shuffle.partitions in the first place. doing a
dataframe.repartition before every write is also not an unacceptable
approach, it is too high a price to pay just to bring down the number of
files.

so i am very excited about any approach that efficiently merges files when
writing.



On Mon, Aug 6, 2018 at 5:28 PM, lukas nalezenec  wrote:

> Hi Koert,
> There is no such Jira yet. We need SPARK-23889 before. You can find some
> mentions in the design document inside 23889.
> Best regards
> Lukas
>
> 2018-08-06 18:34 GMT+02:00 Koert Kuipers :
>
>> i went through the jiras targeting 2.4.0 trying to find a feature where
>> spark would coalesce/repartition by size (so merge small files
>> automatically), but didn't find it.
>> can someone point me to it?
>> thank you.
>> best,
>> koert
>>
>> On Sun, Aug 5, 2018 at 9:06 PM, Koert Kuipers  wrote:
>>
>>> lukas,
>>> what is the jira ticket for this? i would like to follow it's activity.
>>> thanks!
>>> koert
>>>
>>> On Wed, Jul 25, 2018 at 5:32 PM, lukas nalezenec 
>>> wrote:
>>>
>>>> Hi,
>>>> Yes, This feature is planned - Spark should be soon able to repartition
>>>> output by size.
>>>> Lukas
>>>>
>>>>
>>>> Dne st 25. 7. 2018 23:26 uživatel Forest Fang 
>>>> napsal:
>>>>
>>>>> Has there been any discussion to simply support Hive's merge small
>>>>> files configuration? It simply adds one additional stage to inspect size 
>>>>> of
>>>>> each output file, recompute the desired parallelism to reach a target 
>>>>> size,
>>>>> and runs a map-only coalesce before committing the final files. Since 
>>>>> AFAIK
>>>>> SparkSQL already stages the final output commit, it seems feasible to
>>>>> respect this Hive config.
>>>>>
>>>>> https://community.hortonworks.com/questions/106987/hive-mult
>>>>> iple-small-files.html
>>>>>
>>>>>
>>>>> On Wed, Jul 25, 2018 at 1:55 PM Mark Hamstra 
>>>>> wrote:
>>>>>
>>>>>> See some of the related discussion under https://github.com/apach
>>>>>> e/spark/pull/21589
>>>>>>
>>>>>> If feels to me like we need some kind of user code mechanism to
>>>>>> signal policy preferences to Spark. This could also include ways to 
>>>>>> signal
>>>>>> scheduling policy, which could include things like scheduling pool and/or
>>>>>> barrier scheduling. Some of those scheduling policies operate at 
>>>>>> inherently
>>>>>> different levels currently -- e.g. scheduling pools at the Job level
>>>>>> (really, the thread local level in the current implementation) and 
>>>>>> barrier
>>>>>> scheduling at the Stage level -- so it is not completely obvious how to
>>>>>> unify all of these policy options/preferences/mechanism, or whether it is
>>>>>> possible, but I think it is worth considering such things at a fairly 
>>>>>> high
>>>>>> level of abstraction and try to unify and simplify before making things
>>>>>> more complex with multiple policy mechanisms.
>>>>>>
>>>>>> On Wed, Jul 25, 2018 at 1:37 PM Reynold Xin 
>>>>>> wrote:
>>>>>>
>>>>>>> Seems like a good idea in general. Do other systems have similar
>>>>>>> concepts? In general it'd be easier if we can follow existing 
>>>>>>> convention if
>>>>>>> there is any.
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jul 25, 2018 at 11:50 AM John Zhuge 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> Many Spark users in my company are asking for a way to control the
>>>>>>>> number of output files in Spark SQL. There are use cases to either 
>>>>>>>> reduce
>>>>>>>> or increase the number. The users prefer not to use function
>>>>>>>> *repartition*(n) or *coalesce*(n, shuffle) that require them to
>>>>>>>> write and deploy Scala/Java/Python code.
>>>>>>>>
>>>>>>>> Could we introduce a query hint for this purpose (similar to
>>>>>>>> Broadcast Join Hints)?
>>>>>>>>
>>>>>>>> /*+ *COALESCE*(n, shuffle) */
>>>>>>>>
>>>>>>>> In general, is query hint is the best way to bring DF functionality
>>>>>>>> to SQL without extending SQL syntax? Any suggestion is highly 
>>>>>>>> appreciated.
>>>>>>>>
>>>>>>>> This requirement is not the same as SPARK-6221 that asked for
>>>>>>>> auto-merging output files.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> John Zhuge
>>>>>>>>
>>>>>>>
>>>
>>
>


Re: [DISCUSS][SQL] Control the number of output files

2018-08-06 Thread Koert Kuipers
i went through the jiras targeting 2.4.0 trying to find a feature where
spark would coalesce/repartition by size (so merge small files
automatically), but didn't find it.
can someone point me to it?
thank you.
best,
koert

On Sun, Aug 5, 2018 at 9:06 PM, Koert Kuipers  wrote:

> lukas,
> what is the jira ticket for this? i would like to follow it's activity.
> thanks!
> koert
>
> On Wed, Jul 25, 2018 at 5:32 PM, lukas nalezenec  wrote:
>
>> Hi,
>> Yes, This feature is planned - Spark should be soon able to repartition
>> output by size.
>> Lukas
>>
>>
>> Dne st 25. 7. 2018 23:26 uživatel Forest Fang 
>> napsal:
>>
>>> Has there been any discussion to simply support Hive's merge small files
>>> configuration? It simply adds one additional stage to inspect size of each
>>> output file, recompute the desired parallelism to reach a target size, and
>>> runs a map-only coalesce before committing the final files. Since AFAIK
>>> SparkSQL already stages the final output commit, it seems feasible to
>>> respect this Hive config.
>>>
>>> https://community.hortonworks.com/questions/106987/hive-mult
>>> iple-small-files.html
>>>
>>>
>>> On Wed, Jul 25, 2018 at 1:55 PM Mark Hamstra 
>>> wrote:
>>>
>>>> See some of the related discussion under https://github.com/apach
>>>> e/spark/pull/21589
>>>>
>>>> If feels to me like we need some kind of user code mechanism to signal
>>>> policy preferences to Spark. This could also include ways to signal
>>>> scheduling policy, which could include things like scheduling pool and/or
>>>> barrier scheduling. Some of those scheduling policies operate at inherently
>>>> different levels currently -- e.g. scheduling pools at the Job level
>>>> (really, the thread local level in the current implementation) and barrier
>>>> scheduling at the Stage level -- so it is not completely obvious how to
>>>> unify all of these policy options/preferences/mechanism, or whether it is
>>>> possible, but I think it is worth considering such things at a fairly high
>>>> level of abstraction and try to unify and simplify before making things
>>>> more complex with multiple policy mechanisms.
>>>>
>>>> On Wed, Jul 25, 2018 at 1:37 PM Reynold Xin 
>>>> wrote:
>>>>
>>>>> Seems like a good idea in general. Do other systems have similar
>>>>> concepts? In general it'd be easier if we can follow existing convention 
>>>>> if
>>>>> there is any.
>>>>>
>>>>>
>>>>> On Wed, Jul 25, 2018 at 11:50 AM John Zhuge  wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> Many Spark users in my company are asking for a way to control the
>>>>>> number of output files in Spark SQL. There are use cases to either reduce
>>>>>> or increase the number. The users prefer not to use function
>>>>>> *repartition*(n) or *coalesce*(n, shuffle) that require them to
>>>>>> write and deploy Scala/Java/Python code.
>>>>>>
>>>>>> Could we introduce a query hint for this purpose (similar to
>>>>>> Broadcast Join Hints)?
>>>>>>
>>>>>> /*+ *COALESCE*(n, shuffle) */
>>>>>>
>>>>>> In general, is query hint is the best way to bring DF functionality
>>>>>> to SQL without extending SQL syntax? Any suggestion is highly 
>>>>>> appreciated.
>>>>>>
>>>>>> This requirement is not the same as SPARK-6221 that asked for
>>>>>> auto-merging output files.
>>>>>>
>>>>>> Thanks,
>>>>>> John Zhuge
>>>>>>
>>>>>
>


Re: [DISCUSS][SQL] Control the number of output files

2018-08-05 Thread Koert Kuipers
lukas,
what is the jira ticket for this? i would like to follow it's activity.
thanks!
koert

On Wed, Jul 25, 2018 at 5:32 PM, lukas nalezenec  wrote:

> Hi,
> Yes, This feature is planned - Spark should be soon able to repartition
> output by size.
> Lukas
>
>
> Dne st 25. 7. 2018 23:26 uživatel Forest Fang 
> napsal:
>
>> Has there been any discussion to simply support Hive's merge small files
>> configuration? It simply adds one additional stage to inspect size of each
>> output file, recompute the desired parallelism to reach a target size, and
>> runs a map-only coalesce before committing the final files. Since AFAIK
>> SparkSQL already stages the final output commit, it seems feasible to
>> respect this Hive config.
>>
>> https://community.hortonworks.com/questions/106987/hive-
>> multiple-small-files.html
>>
>>
>> On Wed, Jul 25, 2018 at 1:55 PM Mark Hamstra 
>> wrote:
>>
>>> See some of the related discussion under https://github.com/
>>> apache/spark/pull/21589
>>>
>>> If feels to me like we need some kind of user code mechanism to signal
>>> policy preferences to Spark. This could also include ways to signal
>>> scheduling policy, which could include things like scheduling pool and/or
>>> barrier scheduling. Some of those scheduling policies operate at inherently
>>> different levels currently -- e.g. scheduling pools at the Job level
>>> (really, the thread local level in the current implementation) and barrier
>>> scheduling at the Stage level -- so it is not completely obvious how to
>>> unify all of these policy options/preferences/mechanism, or whether it is
>>> possible, but I think it is worth considering such things at a fairly high
>>> level of abstraction and try to unify and simplify before making things
>>> more complex with multiple policy mechanisms.
>>>
>>> On Wed, Jul 25, 2018 at 1:37 PM Reynold Xin  wrote:
>>>
 Seems like a good idea in general. Do other systems have similar
 concepts? In general it'd be easier if we can follow existing convention if
 there is any.


 On Wed, Jul 25, 2018 at 11:50 AM John Zhuge  wrote:

> Hi all,
>
> Many Spark users in my company are asking for a way to control the
> number of output files in Spark SQL. There are use cases to either reduce
> or increase the number. The users prefer not to use function
> *repartition*(n) or *coalesce*(n, shuffle) that require them to write
> and deploy Scala/Java/Python code.
>
> Could we introduce a query hint for this purpose (similar to Broadcast
> Join Hints)?
>
> /*+ *COALESCE*(n, shuffle) */
>
> In general, is query hint is the best way to bring DF functionality to
> SQL without extending SQL syntax? Any suggestion is highly appreciated.
>
> This requirement is not the same as SPARK-6221 that asked for
> auto-merging output files.
>
> Thanks,
> John Zhuge
>



Re: eager execution and debuggability

2018-05-08 Thread Koert Kuipers
yeah we run into this all the time with new hires. they will send emails
explaining there is an error in the .write operation and they are debugging
the writing to disk, focusing on that piece of code :)

unrelated, but another frequent cause for confusion is cascading errors.
like the FetchFailedException. they will be debugging the reducer task not
realizing the error happened before that, and the FetchFailedException is
not the root cause.


On Tue, May 8, 2018 at 2:52 PM, Reynold Xin  wrote:

> Similar to the thread yesterday about improving ML/DL integration, I'm
> sending another email on what I've learned recently from Spark users. I
> recently talked to some educators that have been teaching Spark in their
> (top-tier) university classes. They are some of the most important users
> for adoption because of the multiplicative effect they have on the future
> generation.
>
> To my surprise the single biggest ask they want is to enable eager
> execution mode on all operations for teaching and debuggability:
>
> (1) Most of the students are relatively new to programming, and they need
> multiple iterations to even get the most basic operation right. In these
> cases, in order to trigger an error, they would need to explicitly add
> actions, which is non-intuitive.
>
> (2) If they don't add explicit actions to every operation and there is a
> mistake, the error pops up somewhere later where an action is triggered.
> This is in a different position from the code that causes the problem, and
> difficult for students to correlate the two.
>
> I suspect in the real world a lot of Spark users also struggle in similar
> ways as these students. While eager execution is really not practical in
> big data, in learning environments or in development against small, sampled
> datasets it can be pretty helpful.
>
>
>
>
>
>
>
>
>
>


Re: Drop the Hadoop 2.6 profile?

2018-02-08 Thread Koert Kuipers
w
​ire compatibility is relevant if hadoop is included in spark build


for those of us that build spark without hadoop included hadoop (binary)
api compatibility matters. i wouldn't want to build against hadoop 2.7 and
deploy on hadoop 2.6, but i am ok the other way around. so to get the
compatibility with all the major distros and cloud providers building
against hadoop 2.6 is currently the way to go.


On Thu, Feb 8, 2018 at 5:09 PM, Marcelo Vanzin <van...@cloudera.com> wrote:

> I think it would make sense to drop one of them, but not necessarily 2.6.
>
> It kinda depends on what wire compatibility guarantees the Hadoop
> libraries have; can a 2.6 client talk to 2.7 (pretty certain it can)?
> Is the opposite safe (not sure)?
>
> If the answer to the latter question is "no", then keeping 2.6 and
> dropping 2.7 makes more sense. Those who really want a
> Hadoop-version-specific package can override the needed versions in
> the command line, or use the "without hadoop" package.
>
> But in the context of trying to support 3.0 it makes sense to drop one
> of them, at least from jenkins.
>
>
> On Thu, Feb 8, 2018 at 2:03 PM, Sean Owen <so...@cloudera.com> wrote:
> > That would still work with a Hadoop-2.7-based profile, as there isn't
> > actually any code difference in Spark that treats the two versions
> > differently (nor, really, much different between 2.6 and 2.7 to begin
> with).
> > This practice of different profile builds was pretty unnecessary after
> 2.2;
> > it's mostly vestigial now.
> >
> > On Thu, Feb 8, 2018 at 3:57 PM Koert Kuipers <ko...@tresata.com> wrote:
> >>
> >> CDH 5 is still based on hadoop 2.6
> >>
> >> On Thu, Feb 8, 2018 at 2:03 PM, Sean Owen <so...@cloudera.com> wrote:
> >>>
> >>> Mostly just shedding the extra build complexity, and builds. The
> primary
> >>> little annoyance is it's 2x the number of flaky build failures to
> examine.
> >>> I suppose it allows using a 2.7+-only feature, but outside of YARN, not
> >>> sure there is anything compelling.
> >>>
> >>> It's something that probably gains us virtually nothing now, but isn't
> >>> too painful either.
> >>> I think it will not make sense to distinguish them once any Hadoop
> >>> 3-related support comes into the picture, and maybe that will start
> soon;
> >>> there were some more pings on related JIRAs this week. You could view
> it as
> >>> early setup for that move.
> >>>
> >>>
> >>> On Thu, Feb 8, 2018 at 12:57 PM Reynold Xin <r...@databricks.com>
> wrote:
> >>>>
> >>>> Does it gain us anything to drop 2.6?
> >>>>
> >>>> > On Feb 8, 2018, at 10:50 AM, Sean Owen <so...@cloudera.com> wrote:
> >>>> >
> >>>> > At this point, with Hadoop 3 on deck, I think hadoop 2.6 is both
> >>>> > fairly old, and actually, not different from 2.7 with respect to
> Spark. That
> >>>> > is, I don't know if we are actually maintaining anything here but a
> separate
> >>>> > profile and 2x the number of test builds.
> >>>> >
> >>>> > The cost is, by the same token, low. However I'm floating the idea
> of
> >>>> > removing the 2.6 profile and just requiring 2.7+ as of Spark 2.4?
> >>
> >>
> >
>
>
>
> --
> Marcelo
>


Re: Drop the Hadoop 2.6 profile?

2018-02-08 Thread Koert Kuipers
oh nevermind i am used to spark builds without hadoop included. but i
realize that if hadoop is included it matters if its 2.6 or 2.7...

On Thu, Feb 8, 2018 at 5:06 PM, Koert Kuipers <ko...@tresata.com> wrote:

> wouldn't hadoop 2.7 profile means someone by introduces usage of some
> hadoop apis that dont exist in hadoop 2.6?
>
> why not keep 2.6 and ditch 2.7 given that hadoop 2.7 is backwards
> compatible with 2.6? what is the added value of having a 2.7 profile?
>
> On Thu, Feb 8, 2018 at 5:03 PM, Sean Owen <so...@cloudera.com> wrote:
>
>> That would still work with a Hadoop-2.7-based profile, as there isn't
>> actually any code difference in Spark that treats the two versions
>> differently (nor, really, much different between 2.6 and 2.7 to begin
>> with). This practice of different profile builds was pretty unnecessary
>> after 2.2; it's mostly vestigial now.
>>
>> On Thu, Feb 8, 2018 at 3:57 PM Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> CDH 5 is still based on hadoop 2.6
>>>
>>> On Thu, Feb 8, 2018 at 2:03 PM, Sean Owen <so...@cloudera.com> wrote:
>>>
>>>> Mostly just shedding the extra build complexity, and builds. The
>>>> primary little annoyance is it's 2x the number of flaky build failures to
>>>> examine.
>>>> I suppose it allows using a 2.7+-only feature, but outside of YARN, not
>>>> sure there is anything compelling.
>>>>
>>>> It's something that probably gains us virtually nothing now, but isn't
>>>> too painful either.
>>>> I think it will not make sense to distinguish them once any Hadoop
>>>> 3-related support comes into the picture, and maybe that will start soon;
>>>> there were some more pings on related JIRAs this week. You could view it as
>>>> early setup for that move.
>>>>
>>>>
>>>> On Thu, Feb 8, 2018 at 12:57 PM Reynold Xin <r...@databricks.com>
>>>> wrote:
>>>>
>>>>> Does it gain us anything to drop 2.6?
>>>>>
>>>>> > On Feb 8, 2018, at 10:50 AM, Sean Owen <so...@cloudera.com> wrote:
>>>>> >
>>>>> > At this point, with Hadoop 3 on deck, I think hadoop 2.6 is both
>>>>> fairly old, and actually, not different from 2.7 with respect to Spark.
>>>>> That is, I don't know if we are actually maintaining anything here but a
>>>>> separate profile and 2x the number of test builds.
>>>>> >
>>>>> > The cost is, by the same token, low. However I'm floating the idea
>>>>> of removing the 2.6 profile and just requiring 2.7+ as of Spark 2.4?
>>>>>
>>>>
>>>
>


Re: Drop the Hadoop 2.6 profile?

2018-02-08 Thread Koert Kuipers
wouldn't hadoop 2.7 profile means someone by introduces usage of some
hadoop apis that dont exist in hadoop 2.6?

why not keep 2.6 and ditch 2.7 given that hadoop 2.7 is backwards
compatible with 2.6? what is the added value of having a 2.7 profile?

On Thu, Feb 8, 2018 at 5:03 PM, Sean Owen <so...@cloudera.com> wrote:

> That would still work with a Hadoop-2.7-based profile, as there isn't
> actually any code difference in Spark that treats the two versions
> differently (nor, really, much different between 2.6 and 2.7 to begin
> with). This practice of different profile builds was pretty unnecessary
> after 2.2; it's mostly vestigial now.
>
> On Thu, Feb 8, 2018 at 3:57 PM Koert Kuipers <ko...@tresata.com> wrote:
>
>> CDH 5 is still based on hadoop 2.6
>>
>> On Thu, Feb 8, 2018 at 2:03 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> Mostly just shedding the extra build complexity, and builds. The primary
>>> little annoyance is it's 2x the number of flaky build failures to examine.
>>> I suppose it allows using a 2.7+-only feature, but outside of YARN, not
>>> sure there is anything compelling.
>>>
>>> It's something that probably gains us virtually nothing now, but isn't
>>> too painful either.
>>> I think it will not make sense to distinguish them once any Hadoop
>>> 3-related support comes into the picture, and maybe that will start soon;
>>> there were some more pings on related JIRAs this week. You could view it as
>>> early setup for that move.
>>>
>>>
>>> On Thu, Feb 8, 2018 at 12:57 PM Reynold Xin <r...@databricks.com> wrote:
>>>
>>>> Does it gain us anything to drop 2.6?
>>>>
>>>> > On Feb 8, 2018, at 10:50 AM, Sean Owen <so...@cloudera.com> wrote:
>>>> >
>>>> > At this point, with Hadoop 3 on deck, I think hadoop 2.6 is both
>>>> fairly old, and actually, not different from 2.7 with respect to Spark.
>>>> That is, I don't know if we are actually maintaining anything here but a
>>>> separate profile and 2x the number of test builds.
>>>> >
>>>> > The cost is, by the same token, low. However I'm floating the idea of
>>>> removing the 2.6 profile and just requiring 2.7+ as of Spark 2.4?
>>>>
>>>
>>


Re: Drop the Hadoop 2.6 profile?

2018-02-08 Thread Koert Kuipers
CDH 5 is still based on hadoop 2.6

On Thu, Feb 8, 2018 at 2:03 PM, Sean Owen  wrote:

> Mostly just shedding the extra build complexity, and builds. The primary
> little annoyance is it's 2x the number of flaky build failures to examine.
> I suppose it allows using a 2.7+-only feature, but outside of YARN, not
> sure there is anything compelling.
>
> It's something that probably gains us virtually nothing now, but isn't too
> painful either.
> I think it will not make sense to distinguish them once any Hadoop
> 3-related support comes into the picture, and maybe that will start soon;
> there were some more pings on related JIRAs this week. You could view it as
> early setup for that move.
>
>
> On Thu, Feb 8, 2018 at 12:57 PM Reynold Xin  wrote:
>
>> Does it gain us anything to drop 2.6?
>>
>> > On Feb 8, 2018, at 10:50 AM, Sean Owen  wrote:
>> >
>> > At this point, with Hadoop 3 on deck, I think hadoop 2.6 is both fairly
>> old, and actually, not different from 2.7 with respect to Spark. That is, I
>> don't know if we are actually maintaining anything here but a separate
>> profile and 2x the number of test builds.
>> >
>> > The cost is, by the same token, low. However I'm floating the idea of
>> removing the 2.6 profile and just requiring 2.7+ as of Spark 2.4?
>>
>


Re: no-reopen-closed?

2018-01-28 Thread Koert Kuipers
what would prevent a person who kept reopening a ticket from now creating a
new ticket every time it is closed?


On Sat, Jan 27, 2018 at 8:41 PM, Sean Owen  wrote:

> Yeah you'd have to create a new one. You could link the two.
>
>
> On Sat, Jan 27, 2018, 7:07 PM Xiao Li  wrote:
>
>> Hi, Sean,
>>
>> Thanks for your quick reply. For example, https://issues.apache.org/
>> jira/browse/SPARK-17282
>>
>> To reopen it, I need to create a new one, right?
>>
>> Thanks,
>>
>> Xiao
>>
>>
>>
>> 2018-01-27 17:02 GMT-08:00 Sean Owen :
>>
>>> Yes this happened about 6 months ago when we had a person reopen a JIRA
>>> over and over despite being told not to. We changed the workflow such that
>>> Closed can't become Reopened.
>>>
>>> I would not move anything to Closed unless you need it to be permanent
>>> for reasons like that. Resolved is the normal end state of JIRAs.
>>>
>>> On Sat, Jan 27, 2018 at 7:01 PM Xiao Li  wrote:
>>>
 Unable to reopen the closed JIRA? I am wondering if anybody changed the
 workflow?

 Thanks,

 Xiao





>>


Re: Spark 3

2018-01-19 Thread Koert Kuipers
i was expecting to be able to move to scala 2.12 sometime this year

if this cannot be done in spark 2.x then that could be a compelling reason
to move spark 3 up to 2018 i think

hadoop 3 sounds great but personally i have no use case for it yet

On Fri, Jan 19, 2018 at 12:31 PM, Sean Owen  wrote:

> Forking this thread to muse about Spark 3. Like Spark 2, I assume it would
> be more about making all those accumulated breaking changes and updating
> lots of dependencies. Hadoop 3 looms large in that list as well as Scala
> 2.12.
>
> Spark 1 was release in May 2014, and Spark 2 in July 2016. If Spark 2.3 is
> out in Feb 2018 and it takes the now-usual 6 months until a next release,
> Spark 3 could reasonably be next.
>
> However the release cycles are naturally slowing down, and it could also
> be said that 2019 would be more on schedule for Spark 3.
>
> Nothing particularly urgent about deciding, but I'm curious if anyone had
> an opinion on whether to move on to Spark 3 next or just continue with 2.4
> later this year.
>
> On Fri, Jan 19, 2018 at 11:13 AM Sean Owen  wrote:
>
>> Yeah, if users are using Kryo directly, they should be insulated from a
>> Spark-side change because of shading.
>> However this also entails updating (unshaded) Chill from 0.8.x to 0.9.x.
>> I am not sure if that causes problems for apps.
>>
>> Normally I'd avoid any major-version change in a minor release. This one
>> looked potentially entirely internal.
>> I think if there are any doubts, we can leave it for Spark 3. There was a
>> bug report that needed a fix from Kryo 4, but it might be minor after all.
>>
>>>
>>>


Re: Kryo 4 serialized form changes -- a problem?

2018-01-19 Thread Koert Kuipers
i think its probably fine, but i remember updating kryo and chill to be a
major issue with scalding historically exactly because kryo was also used
for serialized data on disk by some major users.

On Fri, Jan 19, 2018 at 12:13 PM, Sean Owen <so...@cloudera.com> wrote:

> Yeah, if users are using Kryo directly, they should be insulated from a
> Spark-side change because of shading.
> However this also entails updating (unshaded) Chill from 0.8.x to 0.9.x. I
> am not sure if that causes problems for apps.
>
> Normally I'd avoid any major-version change in a minor release. This one
> looked potentially entirely internal.
> I think if there are any doubts, we can leave it for Spark 3. There was a
> bug report that needed a fix from Kryo 4, but it might be minor after all.
>
>
> On Fri, Jan 19, 2018 at 11:05 AM Koert Kuipers <ko...@tresata.com> wrote:
>
>> it is mainly a problem because for reasons of sanity one wants to keep
>> single kryo/chill version, and kryo/chill could be used in other places for
>> somewhat persistent serialization by the user.
>>
>> i know, this is not spark's problem... it is the users problem. but i
>> would find it odd to change kryo in a minor upgrade in general. not that it
>> cannot be done.
>>
>>
>>
>> On Fri, Jan 19, 2018 at 8:55 AM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> See:
>>>
>>> https://issues.apache.org/jira/browse/SPARK-23131
>>> https://github.com/apache/spark/pull/20301#issuecomment-358473199
>>>
>>> I expected a major Kryo upgrade to be problematic, but it worked fine.
>>> It picks up a number of fixes:
>>> https://github.com/EsotericSoftware/kryo/releases/tag/kryo-parent-4.0.0
>>>
>>> It might be good for Spark 2.4.
>>>
>>> Its serialized format isn't entirely compatible though. I'm trying to
>>> recall whether this is a problem in practice. We don't guarantee wire
>>> compatibility across mismatched Spark versions, right?
>>>
>>> But does the Kryo serialized form show up in any persistent stored form?
>>> I don't believe any normal output, even that of saveAsObjectFile, uses it.
>>>
>>> I'm wondering if I am not recalling why this would be a problem to
>>> update?
>>>
>>> Sean
>>>
>>
>>


Re: [VOTE] Apache Spark 2.2.0 (RC4)

2017-06-19 Thread Koert Kuipers
If a feature added recently breaks using kryo serializer with 2000+
partitions then how can it not be a regression? I mean I use kryo with more
than 2000 partitions all the time, and it worked before. Or was I simply
not hitting this bug because there are other conditions that also need to
be satisfied besides kryo and 2000+ partitions?

On Jun 19, 2017 2:20 AM, "Liang-Chi Hsieh"  wrote:


I think it's not. This is a feature added recently.


Hyukjin Kwon wrote
> Is this a regression BTW? I am just curious.
>
> On 19 Jun 2017 1:18 pm, "Liang-Chi Hsieh" 

> viirya@

>  wrote:
>
> -1. When using kyro serializer and partition number is greater than 2000.
> There seems a NPE issue needed to fix.
>
> SPARK-21133 https://issues.apache.org/jira/browse/SPARK-21133;
>
>
>
>
> -
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
> --
> View this message in context: http://apache-spark-
> developers-list.1001551.n3.nabble.com/VOTE-Apache-Spark-
> 2-2-0-RC4-tp21677p21784.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe e-mail:

> dev-unsubscribe@.apache





-
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/
--
View this message in context: http://apache-spark-
developers-list.1001551.n3.nabble.com/VOTE-Apache-Spark-
2-2-0-RC4-tp21677p21786.html
Sent from the Apache Spark Developers List mailing list archive at
Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org


Re: [VOTE] Apache Spark 2.2.0 (RC1)

2017-05-01 Thread Koert Kuipers
sounds like you are running into the fact that you cannot really put your
classes before spark's on classpath? spark's switches to support this never
really worked for me either.

inability to control the classpath + inconsistent jars => trouble ?

On Mon, May 1, 2017 at 2:36 PM, Frank Austin Nothaft 
wrote:

> Hi Ryan,
>
> We do set Avro to 1.8 in our downstream project. We also set Spark as a
> provided dependency, and build an überjar. We run via spark-submit, which
> builds the classpath with our überjar and all of the Spark deps. This leads
> to avro 1.7.1 getting picked off of the classpath at runtime, which causes
> the no such method exception to occur.
>
> Regards,
>
> Frank Austin Nothaft
> fnoth...@berkeley.edu
> fnoth...@eecs.berkeley.edu
> 202-340-0466 <(202)%20340-0466>
>
> On May 1, 2017, at 11:31 AM, Ryan Blue  wrote:
>
> Frank,
>
> The issue you're running into is caused by using parquet-avro with Avro
> 1.7. Can't your downstream project set the Avro dependency to 1.8? Spark
> can't update Avro because it is a breaking change that would force users to
> rebuilt specific Avro classes in some cases. But you should be free to use
> Avro 1.8 to avoid the problem.
>
> On Mon, May 1, 2017 at 11:08 AM, Frank Austin Nothaft <
> fnoth...@berkeley.edu> wrote:
>
>> Hi Ryan et al,
>>
>> The issue we’ve seen using a build of the Spark 2.2.0 branch from a
>> downstream project is that parquet-avro uses one of the new Avro 1.8.0
>> methods, and you get a NoSuchMethodError since Spark puts Avro 1.7.7 as a
>> dependency. My colleague Michael (who posted earlier on this thread)
>> documented this in Spark-19697
>> . I know that Spark
>> has unit tests that check this compatibility issue, but it looks like there
>> was a recent change that sets a test scope dependency on Avro 1.8.0
>> ,
>> which masks this issue in the unit tests. With this error, you can’t use
>> the ParquetAvroOutputFormat from a application running on Spark 2.2.0.
>>
>> Regards,
>>
>> Frank Austin Nothaft
>> fnoth...@berkeley.edu
>> fnoth...@eecs.berkeley.edu
>> 202-340-0466 <(202)%20340-0466>
>>
>> On May 1, 2017, at 10:02 AM, Ryan Blue > > wrote:
>>
>> I agree with Sean. Spark only pulls in parquet-avro for tests. For
>> execution, it implements the record materialization APIs in Parquet to go
>> directly to Spark SQL rows. This doesn't actually leak an Avro 1.8
>> dependency into Spark as far as I can tell.
>>
>> rb
>>
>> On Mon, May 1, 2017 at 8:34 AM, Sean Owen  wrote:
>>
>>> See discussion at https://github.com/apache/spark/pull/17163 -- I think
>>> the issue is that fixing this trades one problem for a slightly bigger one.
>>>
>>>
>>> On Mon, May 1, 2017 at 4:13 PM Michael Heuer  wrote:
>>>
 Version 2.2.0 bumps the dependency version for parquet to 1.8.2 but
 does not bump the dependency version for avro (currently at 1.7.7).  Though
 perhaps not clear from the issue I reported [0], this means that Spark is
 internally inconsistent, in that a call through parquet (which depends on
 avro 1.8.0 [1]) may throw errors at runtime when it hits avro 1.7.7 on the
 classpath.  Avro 1.8.0 is not binary compatible with 1.7.7.

 [0] - https://issues.apache.org/jira/browse/SPARK-19697
 [1] - https://github.com/apache/parquet-mr/blob/apache-parquet-1.8
 .2/pom.xml#L96

 On Sun, Apr 30, 2017 at 3:28 AM, Sean Owen  wrote:

> I have one more issue that, if it needs to be fixed, needs to be fixed
> for 2.2.0.
>
> I'm fixing build warnings for the release and noticed that checkstyle
> actually complains there are some Java methods named in TitleCase, like
> `ProcessingTimeTimeout`:
>
> https://github.com/apache/spark/pull/17803/files#r113934080
>
> Easy enough to fix and it's right, that's not conventional. However I
> wonder if it was done on purpose to match a class name?
>
> I think this is one for @tdas
>
> On Thu, Apr 27, 2017 at 7:31 PM Michael Armbrust <
> mich...@databricks.com> wrote:
>
>> Please vote on releasing the following candidate as Apache Spark
>> version 2.2.0. The vote is open until Tues, May 2nd, 2017 at 12:00
>> PST and passes if a majority of at least 3 +1 PMC votes are cast.
>>
>> [ ] +1 Release this package as Apache Spark 2.2.0
>> [ ] -1 Do not release this package because ...
>>
>>
>> To learn more about Apache Spark, please see http://spark.apache.org/
>>
>> The tag to be voted on is v2.2.0-rc1
>>  (8ccb4a57c82146c
>> 1a8f8966c7e64010cf5632cb6)
>>
>> List of JIRA tickets resolved can be found with this filter

Re: [VOTE] Apache Spark 2.2.0 (RC1)

2017-04-28 Thread Koert Kuipers
we have been testing the 2.2.0 snapshots in the last few weeks for inhouse
unit tests, integration tests and real workloads and we are very happy with
it. the only issue i had so far (some encoders not being serialize anymore)
has already been dealt with by wenchen.

On Thu, Apr 27, 2017 at 6:49 PM, Sean Owen  wrote:

> By the way the RC looks good. Sigs and license are OK, tests pass with
> -Phive -Pyarn -Phadoop-2.7. +1 from me.
>
> On Thu, Apr 27, 2017 at 7:31 PM Michael Armbrust 
> wrote:
>
>> Please vote on releasing the following candidate as Apache Spark version
>> 2.2.0. The vote is open until Tues, May 2nd, 2017 at 12:00 PST and
>> passes if a majority of at least 3 +1 PMC votes are cast.
>>
>> [ ] +1 Release this package as Apache Spark 2.2.0
>> [ ] -1 Do not release this package because ...
>>
>>
>> To learn more about Apache Spark, please see http://spark.apache.org/
>>
>> The tag to be voted on is v2.2.0-rc1
>>  (8ccb4a57c82146c
>> 1a8f8966c7e64010cf5632cb6)
>>
>> List of JIRA tickets resolved can be found with this filter
>> 
>> .
>>
>> The release files, including signatures, digests, etc. can be found at:
>> http://home.apache.org/~pwendell/spark-releases/spark-2.2.0-rc1-bin/
>>
>> Release artifacts are signed with the following key:
>> https://people.apache.org/keys/committer/pwendell.asc
>>
>> The staging repository for this release can be found at:
>> https://repository.apache.org/content/repositories/orgapachespark-1235/
>>
>> The documentation corresponding to this release can be found at:
>> http://people.apache.org/~pwendell/spark-releases/spark-2.2.0-rc1-docs/
>>
>>
>> *FAQ*
>>
>> *How can I help test this release?*
>>
>> If you are a Spark user, you can help us test this release by taking an
>> existing Spark workload and running on this release candidate, then
>> reporting any regressions.
>>
>> *What should happen to JIRA tickets still targeting 2.2.0?*
>>
>> Committers should look at those and triage. Extremely important bug
>> fixes, documentation, and API tweaks that impact compatibility should be
>> worked on immediately. Everything else please retarget to 2.3.0 or 2.2.1.
>>
>> *But my bug isn't fixed!??!*
>>
>> In order to make timely releases, we will typically not hold the release
>> unless the bug in question is a regression from 2.1.1.
>>
>


Re: [VOTE] Apache Spark 2.2.0 (RC1)

2017-04-28 Thread Koert Kuipers
this is column names containing dots that do not target fields inside
structs? so not a.b as in field b inside struct a, but somehow a field
called a.b? i didnt even know it is supported at all. its something i would
never try because it sounds like a bad idea to go there...

On Fri, Apr 28, 2017 at 12:17 PM, Andrew Ash  wrote:

> -1 due to regression from 2.1.1
>
> In 2.2.0-rc1 we bumped the Parquet version from 1.8.1 to 1.8.2 in commit
> 26a4cba3ff .  Parquet
> 1.8.2 includes a backport from 1.9.0: PARQUET-389
>  in commit 2282c22c
> 
>
> This backport caused a regression in Spark, where filtering on columns
> containing dots in the column name pushes the filter down into Parquet
> where Parquet incorrectly handles the predicate.  Spark pushes the String
> "col.dots" as the column name, but Parquet interprets this as
> "struct.field" where the predicate is on a field of a struct.  The ultimate
> result is that the predicate always returns zero results, causing a data
> correctness issue.
>
> This issue is filed in Spark as SPARK-20364
>  and has a PR fix up
> at PR #17680 .
>
> I nominate SPARK-20364  as
> a release blocker due to the data correctness regression.
>
> Thanks!
> Andrew
>
> On Thu, Apr 27, 2017 at 6:49 PM, Sean Owen  wrote:
>
>> By the way the RC looks good. Sigs and license are OK, tests pass with
>> -Phive -Pyarn -Phadoop-2.7. +1 from me.
>>
>> On Thu, Apr 27, 2017 at 7:31 PM Michael Armbrust 
>> wrote:
>>
>>> Please vote on releasing the following candidate as Apache Spark
>>> version 2.2.0. The vote is open until Tues, May 2nd, 2017 at 12:00 PST
>>> and passes if a majority of at least 3 +1 PMC votes are cast.
>>>
>>> [ ] +1 Release this package as Apache Spark 2.2.0
>>> [ ] -1 Do not release this package because ...
>>>
>>>
>>> To learn more about Apache Spark, please see http://spark.apache.org/
>>>
>>> The tag to be voted on is v2.2.0-rc1
>>>  (8ccb4a57c82146c
>>> 1a8f8966c7e64010cf5632cb6)
>>>
>>> List of JIRA tickets resolved can be found with this filter
>>> 
>>> .
>>>
>>> The release files, including signatures, digests, etc. can be found at:
>>> http://home.apache.org/~pwendell/spark-releases/spark-2.2.0-rc1-bin/
>>>
>>> Release artifacts are signed with the following key:
>>> https://people.apache.org/keys/committer/pwendell.asc
>>>
>>> The staging repository for this release can be found at:
>>> https://repository.apache.org/content/repositories/orgapachespark-1235/
>>>
>>> The documentation corresponding to this release can be found at:
>>> http://people.apache.org/~pwendell/spark-releases/spark-2.2.0-rc1-docs/
>>>
>>>
>>> *FAQ*
>>>
>>> *How can I help test this release?*
>>>
>>> If you are a Spark user, you can help us test this release by taking an
>>> existing Spark workload and running on this release candidate, then
>>> reporting any regressions.
>>>
>>> *What should happen to JIRA tickets still targeting 2.2.0?*
>>>
>>> Committers should look at those and triage. Extremely important bug
>>> fixes, documentation, and API tweaks that impact compatibility should be
>>> worked on immediately. Everything else please retarget to 2.3.0 or 2.2.1.
>>>
>>> *But my bug isn't fixed!??!*
>>>
>>> In order to make timely releases, we will typically not hold the release
>>> unless the bug in question is a regression from 2.1.1.
>>>
>>
>


Re: distributed computation of median

2017-04-17 Thread Koert Kuipers
Also q-tree is implemented in algebird, not hard to get it going in spark.
That is another probabilistic data structure that is useful for this.

On Apr 17, 2017 11:27, "Jason White"  wrote:

> Have you looked at t-digests?
>
> Calculating percentiles (including medians) is something that is inherently
> difficult/inefficient to do in a distributed system. T-digests provide a
> useful probabilistic structure to allow you to compute any percentile with
> a
> known (and tunable) margin of error.
>
> https://github.com/tdunning/t-digest
>
>
>
>
> --
> View this message in context: http://apache-spark-
> developers-list.1001551.n3.nabble.com/distributed-computation-of-median-
> tp21356p21357.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


2.2 branch

2017-04-13 Thread Koert Kuipers
i see there is no 2.2 branch yet for spark. has this been pushed out until
after 2.1.1 is done?


thanks!


Re: internal unit tests failing against the latest spark master

2017-04-12 Thread Koert Kuipers
i confirmed that an Encoder[Array[Int]] is no longer serializable, and with
my spark build from march 7 it was.

i believe the issue is commit 295747e59739ee8a697ac3eba485d3439e4a04c3 and
i send wenchen an email about it.

On Wed, Apr 12, 2017 at 4:31 PM, Koert Kuipers <ko...@tresata.com> wrote:

> i believe the error is related to an 
> org.apache.spark.sql.expressions.Aggregator
> where the buffer type (BUF) is Array[Int]
>
> On Wed, Apr 12, 2017 at 4:19 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> hey all,
>> today i tried upgrading the spark version we use internally by creating a
>> new internal release from the spark master branch. last time i did this was
>> march 7.
>>
>> with this updated spark i am seeing some serialization errors in the unit
>> tests for our own libraries. looks like a scala reflection type that is not
>> serializable is getting sucked into serialization for the encoder?
>> see below.
>> best,
>> koert
>>
>> [info]   org.apache.spark.SparkException: Task not serializable
>> [info]   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(Clo
>> sureCleaner.scala:298)
>> [info]   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$
>> ClosureCleaner$$clean(ClosureCleaner.scala:288)
>> [info]   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.
>> scala:108)
>> [info]   at org.apache.spark.SparkContext.clean(SparkContext.scala:2284)
>> [info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2058)
>> ...
>> [info] Serialization stack:
>> [info] - object not serializable (class:
>> scala.reflect.internal.BaseTypeSeqs$BaseTypeSeq, value:
>> BTS(Int,AnyVal,Any))
>> [info] - field (class: scala.reflect.internal.Types$TypeRef, name:
>> baseTypeSeqCache, type: class scala.reflect.internal.BaseTyp
>> eSeqs$BaseTypeSeq)
>> [info] - object (class scala.reflect.internal.Types$ClassNoArgsTypeRef,
>> Int)
>> [info] - field (class: 
>> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6,
>> name: elementType$2, type: class scala.reflect.api.Types$TypeApi)
>> [info] - object (class 
>> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6,
>> )
>> [info] - field (class: org.apache.spark.sql.catalyst.
>> expressions.objects.UnresolvedMapObjects, name: function, type:
>> interface scala.Function1)
>> [info] - object (class org.apache.spark.sql.catalyst.
>> expressions.objects.UnresolvedMapObjects, unresolvedmapobjects(,
>> getcolumnbyordinal(0, ArrayType(IntegerType,false)), Some(interface
>> scala.collection.Seq)))
>> [info] - field (class: org.apache.spark.sql.catalyst.
>> expressions.objects.WrapOption, name: child, type: class
>> org.apache.spark.sql.catalyst.expressions.Expression)
>> [info] - object (class org.apache.spark.sql.catalyst.
>> expressions.objects.WrapOption, wrapoption(unresolvedmapobjects(,
>> getcolumnbyordinal(0, ArrayType(IntegerType,false)), Some(interface
>> scala.collection.Seq)), ObjectType(interface scala.collection.Seq)))
>> [info] - writeObject data (class: scala.collection.immutable.Lis
>> t$SerializationProxy)
>> [info] - object (class 
>> scala.collection.immutable.List$SerializationProxy,
>> scala.collection.immutable.List$SerializationProxy@69040c85)
>> [info] - writeReplace data (class: scala.collection.immutable.Lis
>> t$SerializationProxy)
>> [info] - object (class scala.collection.immutable.$colon$colon,
>> List(wrapoption(unresolvedmapobjects(, getcolumnbyordinal(0,
>> ArrayType(IntegerType,false)), Some(interface scala.collection.Seq)),
>> ObjectType(interface scala.collection.Seq
>> [info] - field (class: org.apache.spark.sql.catalyst.
>> expressions.objects.NewInstance, name: arguments, type: interface
>> scala.collection.Seq)
>> [info] - object (class org.apache.spark.sql.catalyst.
>> expressions.objects.NewInstance, newInstance(class scala.Tuple1))
>> [info] - field (class: 
>> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder,
>> name: deserializer, type: class org.apache.spark.sql.catalyst.
>> expressions.Expression)
>> [info] - object (class 
>> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder,
>> class[_1[0]: array])
>> ...
>>
>>
>


Re: internal unit tests failing against the latest spark master

2017-04-12 Thread Koert Kuipers
i believe the error is related to an
org.apache.spark.sql.expressions.Aggregator where the buffer type (BUF) is
Array[Int]

On Wed, Apr 12, 2017 at 4:19 PM, Koert Kuipers <ko...@tresata.com> wrote:

> hey all,
> today i tried upgrading the spark version we use internally by creating a
> new internal release from the spark master branch. last time i did this was
> march 7.
>
> with this updated spark i am seeing some serialization errors in the unit
> tests for our own libraries. looks like a scala reflection type that is not
> serializable is getting sucked into serialization for the encoder?
> see below.
> best,
> koert
>
> [info]   org.apache.spark.SparkException: Task not serializable
> [info]   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(
> ClosureCleaner.scala:298)
> [info]   at org.apache.spark.util.ClosureCleaner$.org$apache$
> spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
> [info]   at org.apache.spark.util.ClosureCleaner$.clean(
> ClosureCleaner.scala:108)
> [info]   at org.apache.spark.SparkContext.clean(SparkContext.scala:2284)
> [info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2058)
> ...
> [info] Serialization stack:
> [info] - object not serializable (class: 
> scala.reflect.internal.BaseTypeSeqs$BaseTypeSeq,
> value: BTS(Int,AnyVal,Any))
> [info] - field (class: scala.reflect.internal.Types$TypeRef, name:
> baseTypeSeqCache, type: class scala.reflect.internal.
> BaseTypeSeqs$BaseTypeSeq)
> [info] - object (class scala.reflect.internal.Types$ClassNoArgsTypeRef,
> Int)
> [info] - field (class: 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6,
> name: elementType$2, type: class scala.reflect.api.Types$TypeApi)
> [info] - object (class 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6,
> )
> [info] - field (class: org.apache.spark.sql.catalyst.
> expressions.objects.UnresolvedMapObjects, name: function, type: interface
> scala.Function1)
> [info] - object (class org.apache.spark.sql.catalyst.
> expressions.objects.UnresolvedMapObjects, unresolvedmapobjects(,
> getcolumnbyordinal(0, ArrayType(IntegerType,false)), Some(interface
> scala.collection.Seq)))
> [info] - field (class: org.apache.spark.sql.catalyst.
> expressions.objects.WrapOption, name: child, type: class
> org.apache.spark.sql.catalyst.expressions.Expression)
> [info] - object (class org.apache.spark.sql.catalyst.
> expressions.objects.WrapOption, wrapoption(unresolvedmapobjects(,
> getcolumnbyordinal(0, ArrayType(IntegerType,false)), Some(interface
> scala.collection.Seq)), ObjectType(interface scala.collection.Seq)))
> [info] - writeObject data (class: scala.collection.immutable.
> List$SerializationProxy)
> [info] - object (class scala.collection.immutable.List$SerializationProxy,
> scala.collection.immutable.List$SerializationProxy@69040c85)
> [info] - writeReplace data (class: scala.collection.immutable.
> List$SerializationProxy)
> [info] - object (class scala.collection.immutable.$colon$colon,
> List(wrapoption(unresolvedmapobjects(, getcolumnbyordinal(0,
> ArrayType(IntegerType,false)), Some(interface scala.collection.Seq)),
> ObjectType(interface scala.collection.Seq
> [info] - field (class: org.apache.spark.sql.catalyst.
> expressions.objects.NewInstance, name: arguments, type: interface
> scala.collection.Seq)
> [info] - object (class org.apache.spark.sql.catalyst.
> expressions.objects.NewInstance, newInstance(class scala.Tuple1))
> [info] - field (class: 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder,
> name: deserializer, type: class org.apache.spark.sql.catalyst.
> expressions.Expression)
> [info] - object (class 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder,
> class[_1[0]: array])
> ...
>
>


internal unit tests failing against the latest spark master

2017-04-12 Thread Koert Kuipers
hey all,
today i tried upgrading the spark version we use internally by creating a
new internal release from the spark master branch. last time i did this was
march 7.

with this updated spark i am seeing some serialization errors in the unit
tests for our own libraries. looks like a scala reflection type that is not
serializable is getting sucked into serialization for the encoder?
see below.
best,
koert

[info]   org.apache.spark.SparkException: Task not serializable
[info]   at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
[info]   at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
[info]   at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
[info]   at org.apache.spark.SparkContext.clean(SparkContext.scala:2284)
[info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2058)
...
[info] Serialization stack:
[info] - object not serializable (class:
scala.reflect.internal.BaseTypeSeqs$BaseTypeSeq, value: BTS(Int,AnyVal,Any))
[info] - field (class: scala.reflect.internal.Types$TypeRef, name:
baseTypeSeqCache, type: class
scala.reflect.internal.BaseTypeSeqs$BaseTypeSeq)
[info] - object (class scala.reflect.internal.Types$ClassNoArgsTypeRef,
Int)
[info] - field (class:
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name:
elementType$2, type: class scala.reflect.api.Types$TypeApi)
[info] - object (class
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, )
[info] - field (class:
org.apache.spark.sql.catalyst.expressions.objects.UnresolvedMapObjects,
name: function, type: interface scala.Function1)
[info] - object (class
org.apache.spark.sql.catalyst.expressions.objects.UnresolvedMapObjects,
unresolvedmapobjects(, getcolumnbyordinal(0,
ArrayType(IntegerType,false)), Some(interface scala.collection.Seq)))
[info] - field (class:
org.apache.spark.sql.catalyst.expressions.objects.WrapOption, name: child,
type: class org.apache.spark.sql.catalyst.expressions.Expression)
[info] - object (class
org.apache.spark.sql.catalyst.expressions.objects.WrapOption,
wrapoption(unresolvedmapobjects(, getcolumnbyordinal(0,
ArrayType(IntegerType,false)), Some(interface scala.collection.Seq)),
ObjectType(interface scala.collection.Seq)))
[info] - writeObject data (class:
scala.collection.immutable.List$SerializationProxy)
[info] - object (class
scala.collection.immutable.List$SerializationProxy,
scala.collection.immutable.List$SerializationProxy@69040c85)
[info] - writeReplace data (class:
scala.collection.immutable.List$SerializationProxy)
[info] - object (class scala.collection.immutable.$colon$colon,
List(wrapoption(unresolvedmapobjects(, getcolumnbyordinal(0,
ArrayType(IntegerType,false)), Some(interface scala.collection.Seq)),
ObjectType(interface scala.collection.Seq
[info] - field (class:
org.apache.spark.sql.catalyst.expressions.objects.NewInstance, name:
arguments, type: interface scala.collection.Seq)
[info] - object (class
org.apache.spark.sql.catalyst.expressions.objects.NewInstance,
newInstance(class scala.Tuple1))
[info] - field (class:
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, name:
deserializer, type: class
org.apache.spark.sql.catalyst.expressions.Expression)
[info] - object (class
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, class[_1[0]:
array])
...


Re: Spark Improvement Proposals

2017-03-09 Thread Koert Kuipers
gonna end up with a stackoverflow on recursive votes here

On Thu, Mar 9, 2017 at 1:17 PM, Mark Hamstra 
wrote:

> -0 on voting on whether we need a vote.
>
> On Thu, Mar 9, 2017 at 9:00 AM, Reynold Xin  wrote:
>
>> I'm fine without a vote. (are we voting on wether we need a vote?)
>>
>>
>> On Thu, Mar 9, 2017 at 8:55 AM, Sean Owen  wrote:
>>
>>> I think a VOTE is over-thinking it, and is rarely used, but, can't hurt.
>>> Nah, anyone can call a vote. This really isn't that formal. We just want to
>>> declare and document consensus.
>>>
>>> I think SPIP is just a remix of existing process anyway, and don't think
>>> it will actually do much anyway, which is why I am sanguine about the whole
>>> thing.
>>>
>>> To bring this to a conclusion, I will just put the contents of the doc
>>> in an email tomorrow for a VOTE. Raise any objections now.
>>>
>>> On Thu, Mar 9, 2017 at 3:39 PM Cody Koeninger 
>>> wrote:
>>>
 I started this idea as a fork with a merge-able change to docs.
 Reynold moved it to his google doc, and has suggested during this
 email thread that a vote should occur.
 If a vote needs to occur, I can't see anything on
 http://apache.org/foundation/voting.html suggesting that I can call
 for a vote, which is why I'm asking PMC members to do it since they're
 the ones who would vote anyway.
 Now Sean is saying this is a code/doc change that can just be reviewed
 and merged as usual...which is what I tried to do to begin with.

 The fact that you haven't agreed on a process to agree on your process
 is, I think, an indication that the process really does need
 improvement ;)


>>
>


Re: Spark join over sorted columns of dataset.

2017-03-03 Thread Koert Kuipers
For RDD the shuffle is already skipped but the sort is not. In spark-sorted
we track partitioning and sorting within partitions for key-value RDDs and
can avoid the sort. See:
https://github.com/tresata/spark-sorted

For Dataset/DataFrame such optimizations are done automatically, however
it's currently not always working for Dataset, see:
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-19468

On Mar 3, 2017 11:06 AM, "Rohit Verma"  wrote:

Sending it to dev’s.
Can you please help me providing some ideas for below.

Regards
Rohit
> On Feb 23, 2017, at 3:47 PM, Rohit Verma 
wrote:
>
> Hi
>
> While joining two columns of different dataset, how to optimize join if
both the columns are pre sorted within the dataset.
> So that when spark do sort merge join the sorting phase can skipped.
>
> Regards
> Rohit


Re: Straw poll: dropping support for things like Scala 2.10

2017-03-02 Thread Koert Kuipers
given the issues with scala 2.10 and java 8 i am in favor of dropping scala
2.10 in next release

On Sat, Feb 25, 2017 at 2:10 PM, Sean Owen  wrote:

> I want to bring up the issue of Scala 2.10 support again, to see how
> people feel about it. Key opinions from the previous responses, I think:
>
> Cody: only drop 2.10 support when 2.12 support is added
> Koert: we need all dependencies to support 2.12; Scala updates are pretty
> transparent to IT/ops
> Ofir: make sure to deprecate 2.10 in Spark 2.1
> Reynold: let’s maybe remove support for Scala 2.10 and Java 7 in Spark 2.2
> Matei: let’s not remove things unless they’re burdensome for the project;
> some people are still on old environments that their IT can’t easily update
>
> Scala 2.10 support was deprecated in 2.1, and we did remove Java 7 support
> for 2.2. https://issues.apache.org/jira/browse/SPARK-14220 tracks the
> work to support 2.12, and there is progress, especially in dependencies
> supporting 2.12.
>
> It looks like 2.12 support may even entail a breaking change as documented
> in https://issues.apache.org/jira/browse/SPARK-14643 and will mean
> dropping Kafka 0.8, for example. In any event it’s going to take some
> surgery and a few hacks to make one code base work across 2.11 and 2.12. I
> don’t see this happening for Spark 2.2.0 because there are just a few weeks
> left.
>
> Supporting three versions at once is probably infeasible, so dropping 2.10
> should precede 2.12 support. Right now, I would like to make progress
> towards changes that 2.12 will require but that 2.11/2.10 can support. For
> example, we have to update scalatest, breeze, chill, etc. and can do that
> before 2.12 is enabled. However I’m finding making those changes tricky or
> maybe impossible in one case while 2.10 is still supported.
>
> For 2.2.0, I’m wondering if it makes sense to go ahead and drop 2.10
> support, and even get in additional prep work for 2.12, into the 2.2.0
> release. The move to support 2.12 in 2.3.0 would then be a smaller change.
> It isn’t strictly necessary. We could delay all of that until after 2.2.0
> and get it all done between 2.2.0 and 2.3.0. But I wonder if 2.10 is legacy
> enough at this stage to drop for Spark 2.2.0?
>
> I don’t feel strongly about it but there are some reasonable arguments for
> dropping it:
>
> - 2.10 doesn’t technically support Java 8, though we do have it working
> still even after requiring Java 8
> - Safe to say virtually all common _2.10 libraries has a _2.11 counterpart
> at this point?
> - 2.10.x was “EOL” in September 2015 with the final 2.10.6 release
> - For a vendor viewpoint: CDH only supports Scala 2.11 with Spark 2.x
>
> Before I open a JIRA, just soliciting opinions.
>
>
> On Tue, Oct 25, 2016 at 4:36 PM Sean Owen  wrote:
>
>> I'd like to gauge where people stand on the issue of dropping support for
>> a few things that were considered for 2.0.
>>
>> First: Scala 2.10. We've seen a number of build breakages this week
>> because the PR builder only tests 2.11. No big deal at this stage, but, it
>> did cause me to wonder whether it's time to plan to drop 2.10 support,
>> especially with 2.12 coming soon.
>>
>> Next, Java 7. It's reasonably old and out of public updates at this
>> stage. It's not that painful to keep supporting, to be honest. It would
>> simplify some bits of code, some scripts, some testing.
>>
>> Hadoop versions: I think the the general argument is that most anyone
>> would be using, at the least, 2.6, and it would simplify some code that has
>> to reflect to use not-even-that-new APIs. It would remove some moderate
>> complexity in the build.
>>
>>
>> "When" is a tricky question. Although it's a little aggressive for minor
>> releases, I think these will all happen before 3.x regardless. 2.1.0 is not
>> out of the question, though coming soon. What about ... 2.2.0?
>>
>>
>> Although I tend to favor dropping support, I'm mostly asking for current
>> opinions.
>>
>


Re: Request for comments: Java 7 removal

2017-02-14 Thread Koert Kuipers
what about the conversation about dropping scala 2.10?

On Fri, Feb 10, 2017 at 11:47 AM, Sean Owen  wrote:

> As you have seen, there's a WIP PR to implement removal of Java 7 support:
> https://github.com/apache/spark/pull/16871
>
> I have heard several +1s at https://issues.apache.org/
> jira/browse/SPARK-19493 but am asking for concerns too, now that there's
> a concrete change to review.
>
> If this goes in for 2.2 it can be followed by more extensive update of the
> Java code to take advantage of Java 8; this is more or less the baseline
> change.
>
> We also just removed Hadoop 2.5 support. I know there was talk about
> removing Python 2.6. I have no opinion on that myself, but, might be time
> to revive that conversation too.
>


Re: benefits of code gen

2017-02-10 Thread Koert Kuipers
yes agreed. however i believe nullSafeEval is not used for codegen?

On Fri, Feb 10, 2017 at 4:56 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Function1 is specialized, but nullSafeEval is Any => Any, so that's still
> going to box in the non-codegened execution path.
>
> On Fri, Feb 10, 2017 at 1:32 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> based on that i take it that math functions would be primary
>> beneficiaries since they work on primitives.
>>
>> so if i take UnaryMathExpression as an example, would i not get the same
>> benefit if i change it to this?
>>
>> abstract class UnaryMathExpression(val f: Double => Double, name: String)
>>   extends UnaryExpression with Serializable with ImplicitCastInputTypes {
>>
>>   override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
>>   override def dataType: DataType = DoubleType
>>   override def nullable: Boolean = true
>>   override def toString: String = s"$name($child)"
>>   override def prettyName: String = name
>>
>>   protected override def nullSafeEval(input: Any): Any = {
>> f(input.asInstanceOf[Double])
>>   }
>>
>>   // name of function in java.lang.Math
>>   def funcName: String = name.toLowerCase
>>
>>   def function(d: Double): Double = f(d)
>>
>>   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
>> val self = ctx.addReferenceObj(name, this, getClass.getName)
>> defineCodeGen(ctx, ev, c => s"$self.function($c)")
>>   }
>> }
>>
>> admittedly in this case the benefit in terms of removing complex codegen
>> is not there (the codegen was only one line), but if i can remove codegen
>> here i could also remove it in lots of other places where it does get very
>> unwieldy simply by replacing it with calls to methods.
>>
>> Function1 is specialized, so i think (or hope) that my version does no
>> extra boxes/unboxing.
>>
>> On Fri, Feb 10, 2017 at 2:24 PM, Reynold Xin <r...@databricks.com> wrote:
>>
>>> With complex types it doesn't work as well, but for primitive types the
>>> biggest benefit of whole stage codegen is that we don't even need to put
>>> the intermediate data into rows or columns anymore. They are just variables
>>> (stored in CPU registers).
>>>
>>> On Fri, Feb 10, 2017 at 8:22 PM, Koert Kuipers <ko...@tresata.com>
>>> wrote:
>>>
>>>> so i have been looking for a while now at all the catalyst expressions,
>>>> and all the relative complex codegen going on.
>>>>
>>>> so first off i get the benefit of codegen to turn a bunch of chained
>>>> iterators transformations into a single codegen stage for spark. that makes
>>>> sense to me, because it avoids a bunch of overhead.
>>>>
>>>> but what i am not so sure about is what the benefit is of converting
>>>> the actual stuff that happens inside the iterator transformations into
>>>> codegen.
>>>>
>>>> say if we have an expression that has 2 children and creates a struct
>>>> for them. why would this be faster in codegen by re-creating the code to do
>>>> this in a string (which is complex and error prone) compared to simply have
>>>> the codegen call the normal method for this in my class?
>>>>
>>>> i see so much trivial code be re-created in codegen. stuff like this:
>>>>
>>>>   private[this] def castToDateCode(
>>>>   from: DataType,
>>>>   ctx: CodegenContext): CastFunction = from match {
>>>> case StringType =>
>>>>   val intOpt = ctx.freshName("intOpt")
>>>>   (c, evPrim, evNull) => s"""
>>>> scala.Option $intOpt =
>>>>   org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToDat
>>>> e($c);
>>>> if ($intOpt.isDefined()) {
>>>>   $evPrim = ((Integer) $intOpt.get()).intValue();
>>>> } else {
>>>>   $evNull = true;
>>>> }
>>>>"""
>>>>
>>>> is this really faster than simply calling an equivalent functions from
>>>> the codegen, and keeping the codegen logic restricted to the "unrolling" of
>>>> chained iterators?
>>>>
>>>>
>>>
>>
>


Re: benefits of code gen

2017-02-10 Thread Koert Kuipers
based on that i take it that math functions would be primary beneficiaries
since they work on primitives.

so if i take UnaryMathExpression as an example, would i not get the same
benefit if i change it to this?

abstract class UnaryMathExpression(val f: Double => Double, name: String)
  extends UnaryExpression with Serializable with ImplicitCastInputTypes {

  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
  override def dataType: DataType = DoubleType
  override def nullable: Boolean = true
  override def toString: String = s"$name($child)"
  override def prettyName: String = name

  protected override def nullSafeEval(input: Any): Any = {
f(input.asInstanceOf[Double])
  }

  // name of function in java.lang.Math
  def funcName: String = name.toLowerCase

  def function(d: Double): Double = f(d)

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val self = ctx.addReferenceObj(name, this, getClass.getName)
defineCodeGen(ctx, ev, c => s"$self.function($c)")
  }
}

admittedly in this case the benefit in terms of removing complex codegen is
not there (the codegen was only one line), but if i can remove codegen here
i could also remove it in lots of other places where it does get very
unwieldy simply by replacing it with calls to methods.

Function1 is specialized, so i think (or hope) that my version does no
extra boxes/unboxing.

On Fri, Feb 10, 2017 at 2:24 PM, Reynold Xin <r...@databricks.com> wrote:

> With complex types it doesn't work as well, but for primitive types the
> biggest benefit of whole stage codegen is that we don't even need to put
> the intermediate data into rows or columns anymore. They are just variables
> (stored in CPU registers).
>
> On Fri, Feb 10, 2017 at 8:22 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> so i have been looking for a while now at all the catalyst expressions,
>> and all the relative complex codegen going on.
>>
>> so first off i get the benefit of codegen to turn a bunch of chained
>> iterators transformations into a single codegen stage for spark. that makes
>> sense to me, because it avoids a bunch of overhead.
>>
>> but what i am not so sure about is what the benefit is of converting the
>> actual stuff that happens inside the iterator transformations into codegen.
>>
>> say if we have an expression that has 2 children and creates a struct for
>> them. why would this be faster in codegen by re-creating the code to do
>> this in a string (which is complex and error prone) compared to simply have
>> the codegen call the normal method for this in my class?
>>
>> i see so much trivial code be re-created in codegen. stuff like this:
>>
>>   private[this] def castToDateCode(
>>   from: DataType,
>>   ctx: CodegenContext): CastFunction = from match {
>> case StringType =>
>>   val intOpt = ctx.freshName("intOpt")
>>   (c, evPrim, evNull) => s"""
>> scala.Option $intOpt =
>>   org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToDat
>> e($c);
>> if ($intOpt.isDefined()) {
>>   $evPrim = ((Integer) $intOpt.get()).intValue();
>> } else {
>>   $evNull = true;
>> }
>>"""
>>
>> is this really faster than simply calling an equivalent functions from
>> the codegen, and keeping the codegen logic restricted to the "unrolling" of
>> chained iterators?
>>
>>
>


benefits of code gen

2017-02-10 Thread Koert Kuipers
so i have been looking for a while now at all the catalyst expressions, and
all the relative complex codegen going on.

so first off i get the benefit of codegen to turn a bunch of chained
iterators transformations into a single codegen stage for spark. that makes
sense to me, because it avoids a bunch of overhead.

but what i am not so sure about is what the benefit is of converting the
actual stuff that happens inside the iterator transformations into codegen.

say if we have an expression that has 2 children and creates a struct for
them. why would this be faster in codegen by re-creating the code to do
this in a string (which is complex and error prone) compared to simply have
the codegen call the normal method for this in my class?

i see so much trivial code be re-created in codegen. stuff like this:

  private[this] def castToDateCode(
  from: DataType,
  ctx: CodegenContext): CastFunction = from match {
case StringType =>
  val intOpt = ctx.freshName("intOpt")
  (c, evPrim, evNull) => s"""
scala.Option $intOpt =
  org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToDate($c);
if ($intOpt.isDefined()) {
  $evPrim = ((Integer) $intOpt.get()).intValue();
} else {
  $evNull = true;
}
   """

is this really faster than simply calling an equivalent functions from the
codegen, and keeping the codegen logic restricted to the "unrolling" of
chained iterators?


when is doGenCode called?

2017-02-08 Thread Koert Kuipers
hello all,
i am trying to add an Expression to catalyst.

my Expression compiles fine and has:

override def eval(input: InternalRow): Any = ...

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ...

it also seems to run fine. but i only ever see eval get called. how do i
tell spark to use doGenCode instead of eval?

thanks! koert


Re: Executors exceed maximum memory defined with `--executor-memory` in Spark 2.1.0

2017-01-22 Thread Koert Kuipers
could this be related to SPARK-18787?

On Sun, Jan 22, 2017 at 1:45 PM, Reynold Xin  wrote:

> Are you using G1 GC? G1 sometimes uses a lot more memory than the size
> allocated.
>
>
> On Sun, Jan 22, 2017 at 12:58 AM StanZhai  wrote:
>
>> Hi all,
>>
>>
>>
>> We just upgraded our Spark from 1.6.2 to 2.1.0.
>>
>>
>>
>> Our Spark application is started by spark-submit with config of
>>
>> `--executor-memory 35G` in standalone model, but the actual use of memory
>> up
>>
>> to 65G after a full gc(jmap -histo:live $pid) as follow:
>>
>>
>>
>> test@c6 ~ $ ps aux | grep CoarseGrainedExecutorBackend
>>
>> test  181941  181 34.7 94665384 68836752 ?   Sl   09:25 711:21
>>
>> /home/test/service/jdk/bin/java -cp
>>
>> /home/test/service/hadoop/share/hadoop/common/hadoop-
>> lzo-0.4.20-SNAPSHOT.jar:/home/test/service/hadoop/share/
>> hadoop/common/hadoop-lzo-0.4.20-SNAPSHOT.jar:/home/test/
>> service/spark/conf/:/home/test/service/spark/jars/*:/
>> home/test/service/hadoop/etc/hadoop/
>>
>> -Xmx35840M -Dspark.driver.port=47781 -XX:+PrintGCDetails
>>
>> -XX:+PrintGCDateStamps -Xloggc:./gc.log -verbose:gc
>>
>> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
>>
>> spark://coarsegrainedschedu...@xxx.xxx.xxx.xxx:47781 --executor-id 1
>>
>> --hostname test-192 --cores 36 --app-id app-20170122092509-0017
>> --worker-url
>>
>> spark://Worker@test-192:33890
>>
>>
>>
>> Our Spark jobs are all sql.
>>
>>
>>
>> The exceed memory looks like off-heap memory, but the default value of
>>
>> `spark.memory.offHeap.enabled` is `false`.
>>
>>
>>
>> We didn't find the problem in Spark 1.6.x, what causes this in Spark
>> 2.1.0?
>>
>>
>>
>> Any help is greatly appreicated!
>>
>>
>>
>> Best,
>>
>> Stan
>>
>>
>>
>>
>>
>>
>>
>> --
>>
>> View this message in context: http://apache-spark-
>> developers-list.1001551.n3.nabble.com/Executors-exceed-
>> maximum-memory-defined-with-executor-memory-in-Spark-2-1-0-tp20697.html
>>
>> Sent from the Apache Spark Developers List mailing list archive at
>> Nabble.com.
>>
>>
>>
>> -
>>
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>
>>
>>


Re: shapeless in spark 2.1.0

2016-12-29 Thread Koert Kuipers
we also use spray for webservices that execute on spark, and spray depends
on even older (and incompatible) shapeless 1.x
to get rid of the old shapeless i would have to upgrade from spray to
akka-http, which means going to java 8

this might also affect spark-job-server, which it seems uses spray.

On Thu, Dec 29, 2016 at 1:17 PM, Maciej Szymkiewicz <mszymkiew...@gmail.com>
wrote:

> Breeze 0.13 (RC-1 right now) bumps shapeless to 2.2.0 and 2.2.5 for Scala
> 2.10 and 2.11 respectively:
>
> https://github.com/scalanlp/breeze/pull/509
>
>
> On 12/29/2016 07:13 PM, Ryan Williams wrote:
>
> Other option would presumably be for someone to make a release of breeze
> with old-shapeless shaded... unless shapeless classes are exposed in
> breeze's public API, in which case you'd have to copy the relevant
> shapeless classes into breeze and then publish that?
>
> On Thu, Dec 29, 2016, 1:05 PM Sean Owen <so...@cloudera.com> wrote:
>
>> It is breeze, but, what's the option? It can't be excluded. I think this
>> falls in the category of things an app would need to shade in this
>> situation.
>>
>> On Thu, Dec 29, 2016, 16:49 Koert Kuipers <ko...@tresata.com> wrote:
>>
>> i just noticed that spark 2.1.0 bring in a new transitive dependency on
>> shapeless 2.0.0
>>
>> shapeless is a popular library for scala users, and shapeless 2.0.0 is
>> old (2014) and not compatible with more current versions.
>>
>> so this means a spark user that uses shapeless in his own development
>> cannot upgrade safely from 2.0.0 to 2.1.0, i think.
>>
>> wish i had noticed this sooner
>>
>>
> --
> Maciej Szymkiewicz
>
>


shapeless in spark 2.1.0

2016-12-29 Thread Koert Kuipers
i just noticed that spark 2.1.0 bring in a new transitive dependency on
shapeless 2.0.0

shapeless is a popular library for scala users, and shapeless 2.0.0 is old
(2014) and not compatible with more current versions.

so this means a spark user that uses shapeless in his own development
cannot upgrade safely from 2.0.0 to 2.1.0, i think.

wish i had noticed this sooner


Re: Aggregating over sorted data

2016-12-22 Thread Koert Kuipers
yes it's less optimal because an abstraction is missing and with
mapPartitions it is done without optimizations. but aggregator is not the
right abstraction to begin with, is assumes a monoid which means no
ordering guarantees. you need a fold operation.

On Dec 22, 2016 02:20, "Liang-Chi Hsieh" <vii...@gmail.com> wrote:

>
> You can't use existing aggregation functions with that. Besides, the
> execution plan of `mapPartitions` doesn't support wholestage codegen.
> Without that and some optimization around aggregation, that might be
> possible performance degradation. Also when you have more than one keys in
> a
> partition, you will need to take care of that in your function applied to
> each partition.
>
>
> Koert Kuipers wrote
> > it can also be done with repartition + sortWithinPartitions +
> > mapPartitions.
> > perhaps not as convenient but it does not rely on undocumented behavior.
> > i used this approach in spark-sorted. see here:
> > https://github.com/tresata/spark-sorted/blob/master/src/
> main/scala/com/tresata/spark/sorted/sql/GroupSortedDataset.scala
> >
> > On Wed, Dec 21, 2016 at 9:44 PM, Liang-Chi Hsieh 
>
> > viirya@
>
> >  wrote:
> >
> >>
> >> I agreed that to make sure this work, you might need to know the Spark
> >> internal implementation for APIs such as `groupBy`.
> >>
> >> But without any more changes to current Spark implementation, I think
> >> this
> >> is the one possible way to achieve the required function to aggregate on
> >> sorted data per key.
> >>
> >>
> >>
> >>
> >>
> >> -
> >> Liang-Chi Hsieh | @viirya
> >> Spark Technology Center
> >> http://www.spark.tc/
> >> --
> >> View this message in context: http://apache-spark-
> >> developers-list.1001551.n3.nabble.com/Aggregating-over-
> >> sorted-data-tp1p20331.html
> >> Sent from the Apache Spark Developers List mailing list archive at
> >> Nabble.com.
> >>
> >> -
> >> To unsubscribe e-mail:
>
> > dev-unsubscribe@.apache
>
> >>
> >>
>
>
>
>
>
> -
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
> --
> View this message in context: http://apache-spark-
> developers-list.1001551.n3.nabble.com/Aggregating-over-
> sorted-data-tp1p20333.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Aggregating over sorted data

2016-12-21 Thread Koert Kuipers
it can also be done with repartition + sortWithinPartitions + mapPartitions.
perhaps not as convenient but it does not rely on undocumented behavior.
i used this approach in spark-sorted. see here:
https://github.com/tresata/spark-sorted/blob/master/src/main/scala/com/tresata/spark/sorted/sql/GroupSortedDataset.scala

On Wed, Dec 21, 2016 at 9:44 PM, Liang-Chi Hsieh  wrote:

>
> I agreed that to make sure this work, you might need to know the Spark
> internal implementation for APIs such as `groupBy`.
>
> But without any more changes to current Spark implementation, I think this
> is the one possible way to achieve the required function to aggregate on
> sorted data per key.
>
>
>
>
>
> -
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
> --
> View this message in context: http://apache-spark-
> developers-list.1001551.n3.nabble.com/Aggregating-over-
> sorted-data-tp1p20331.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Aggregating over sorted data

2016-12-21 Thread Koert Kuipers
i think this works but it relies on groupBy and agg respecting the sorting.
the api provides no such guarantee, so this could break in future versions.
i would not rely on this i think...

On Dec 20, 2016 18:58, "Liang-Chi Hsieh"  wrote:

Hi,

Can you try the combination of `repartition` + `sortWithinPartitions` on the
dataset?

E.g.,

val df = Seq((2, "b c a"), (1, "c a b"), (3, "a c b")).toDF("number",
"letters")
val df2 =
  df.explode('letters) {
case Row(letters: String) => letters.split(" ").map(Tuple1(_)).toSeq
  }

df2
  .select('number, '_1 as 'letter)
  .repartition('number)
  .sortWithinPartitions('number, 'letter)
  .groupBy('number)
  .agg(collect_list('letter))
  .show()

+--++
|number|collect_list(letter)|
+--++
| 3|   [a, b, c]|
| 1|   [a, b, c]|
| 2|   [a, b, c]|
+--++

I think it should let you do aggregate on sorted data per key.




-
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/
--
View this message in context: http://apache-spark-
developers-list.1001551.n3.nabble.com/Aggregating-over-
sorted-data-tp1p20310.html
Sent from the Apache Spark Developers List mailing list archive at
Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org


Re: Aggregating over sorted data

2016-12-19 Thread Koert Kuipers
take a look at:
https://issues.apache.org/jira/browse/SPARK-15798


On Dec 19, 2016 00:17, "Robin East"  wrote:

This is also a feature we need for our time-series processing



> On 19 Dec 2016, at 04:07, Liang-Chi Hsieh  wrote:
>
>
> Hi,
>
> As I know, Spark SQL doesn't provide native support for this feature now.
> After searching, I found only few database systems support it, e.g.,
> PostgreSQL.
>
> Actually based on the Spark SQL's aggregate system, I think it is not very
> difficult to add the support for this feature. The problem is how
frequently
> this feature is needed for Spark SQL users and if it is worth adding this,
> because as I see, this feature is not very common.
>
> Alternative possible to achieve this in current Spark SQL, is to use
> Aggregator with Dataset API. You can write your custom Aggregator which
has
> an user-defined JVM object as buffer to hold the input data into your
> aggregate function. But you may need to write necessary encoder for the
> buffer object.
>
> If you really need this feature, you may open a Jira to ask others'
opinion
> about this feature.
>
>
>
>
>
>
> -
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
> --
> View this message in context: http://apache-spark-
developers-list.1001551.n3.nabble.com/Aggregating-over-
sorted-data-tp1p20273.html
> Sent from the Apache Spark Developers List mailing list archive at
Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>


-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org


Re: [VOTE] Apache Spark 2.1.0 (RC1)

2016-12-04 Thread Koert Kuipers
with the current branch-2.1 after rc1 i am now also seeing this error in
our unit tests:

 java.lang.UnsupportedOperationException: Cannot create encoder for Option
of Product type, because Product type is represented as a row, and the
entire row can not be null in Spark SQL like normal databases. You can wrap
your type with Tuple1 if you do want top level null Product objects, e.g.
instead of creating `Dataset[Option[MyClass]]`, you can do something like
`val ds: Dataset[Tuple1[MyClass]] = Seq(Tuple1(MyClass(...)),
Tuple1(null)).toDS`

the issue is that we have Aggregator[String, Option[SomeCaseClass], String]
and it doesn't like creating the Encoder for that Option[SameCaseClass]
anymore.

this is related to SPARK-18251
<https://issues.apache.org/jira/browse/SPARK-18251>
we have a workaround for this: we will wrap all buffer encoder types in
Tuple1. a little inefficient but its okay with me.

On Sun, Dec 4, 2016 at 11:16 PM, Koert Kuipers <ko...@tresata.com> wrote:

> somewhere between rc1 and the current head of branch-2.1 i started seeing
> an NPE in our in-house unit tests for Dataset + Aggregator. i created
> SPARK-18711 <https://issues.apache.org/jira/browse/SPARK-18711> for this.
> <https://issues.apache.org/jira/browse/SPARK-18711>
>
> On Mon, Nov 28, 2016 at 8:25 PM, Reynold Xin <r...@databricks.com> wrote:
>
>> Please vote on releasing the following candidate as Apache Spark version
>> 2.1.0. The vote is open until Thursday, December 1, 2016 at 18:00 UTC and
>> passes if a majority of at least 3 +1 PMC votes are cast.
>>
>> [ ] +1 Release this package as Apache Spark 2.1.0
>> [ ] -1 Do not release this package because ...
>>
>>
>> To learn more about Apache Spark, please see http://spark.apache.org/
>>
>> The tag to be voted on is v2.1.0-rc1 (80aabc0bd33dc5661a90133156247
>> e7a8c1bf7f5)
>>
>> The release files, including signatures, digests, etc. can be found at:
>> http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc1-bin/
>>
>> Release artifacts are signed with the following key:
>> https://people.apache.org/keys/committer/pwendell.asc
>>
>> The staging repository for this release can be found at:
>> https://repository.apache.org/content/repositories/orgapachespark-1216/
>>
>> The documentation corresponding to this release can be found at:
>> http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc1-docs/
>>
>>
>> ===
>> How can I help test this release?
>> ===
>> If you are a Spark user, you can help us test this release by taking an
>> existing Spark workload and running on this release candidate, then
>> reporting any regressions.
>>
>> ===
>> What should happen to JIRA tickets still targeting 2.1.0?
>> ===
>> Committers should look at those and triage. Extremely important bug
>> fixes, documentation, and API tweaks that impact compatibility should be
>> worked on immediately. Everything else please retarget to 2.1.1 or 2.2.0.
>>
>>
>>
>


Re: [VOTE] Apache Spark 2.1.0 (RC1)

2016-12-04 Thread Koert Kuipers
somewhere between rc1 and the current head of branch-2.1 i started seeing
an NPE in our in-house unit tests for Dataset + Aggregator. i created
SPARK-18711  for this.


On Mon, Nov 28, 2016 at 8:25 PM, Reynold Xin  wrote:

> Please vote on releasing the following candidate as Apache Spark version
> 2.1.0. The vote is open until Thursday, December 1, 2016 at 18:00 UTC and
> passes if a majority of at least 3 +1 PMC votes are cast.
>
> [ ] +1 Release this package as Apache Spark 2.1.0
> [ ] -1 Do not release this package because ...
>
>
> To learn more about Apache Spark, please see http://spark.apache.org/
>
> The tag to be voted on is v2.1.0-rc1 (80aabc0bd33dc5661a90133156247e
> 7a8c1bf7f5)
>
> The release files, including signatures, digests, etc. can be found at:
> http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc1-bin/
>
> Release artifacts are signed with the following key:
> https://people.apache.org/keys/committer/pwendell.asc
>
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapachespark-1216/
>
> The documentation corresponding to this release can be found at:
> http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc1-docs/
>
>
> ===
> How can I help test this release?
> ===
> If you are a Spark user, you can help us test this release by taking an
> existing Spark workload and running on this release candidate, then
> reporting any regressions.
>
> ===
> What should happen to JIRA tickets still targeting 2.1.0?
> ===
> Committers should look at those and triage. Extremely important bug fixes,
> documentation, and API tweaks that impact compatibility should be worked on
> immediately. Everything else please retarget to 2.1.1 or 2.2.0.
>
>
>


Re: [VOTE] Apache Spark 2.1.0 (RC1)

2016-11-30 Thread Koert Kuipers
after seeing Hyukjin Kwon's comment in SPARK-17583 i think its safe to say
that what i am seeing with csv is not bug or regression. it was unintended
and/or unreliable behavior in spark 2.0.x

On Wed, Nov 30, 2016 at 5:56 PM, Koert Kuipers <ko...@tresata.com> wrote:

> running our inhouse unit-tests (that work with spark 2.0.2) against spark
> 2.1.0-rc1 i see the following issues.
>
> any test that use avro (spark-avro 3.1.0) have this error:
> java.lang.AbstractMethodError
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$
> SingleDirectoryWriteTask.(FileFormatWriter.scala:232)
> at org.apache.spark.sql.execution.datasources.
> FileFormatWriter$.org$apache$spark$sql$execution$
> datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:182)
> at org.apache.spark.sql.execution.datasources.
> FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(
> FileFormatWriter.scala:129)
> at org.apache.spark.sql.execution.datasources.
> FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(
> FileFormatWriter.scala:128)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(
> Executor.scala:282)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
>
> so looks like some api got changed or broken. i dont know if this is an
> issue or if this is OK.
>
> also a bunch of unit test related to reading and writing csv files fail.
> the issue seems to be newlines inside quoted values. this worked before and
> now it doesnt work anymore. i dont know if this was an accidentally
> supported feature and its ok to be broken? i am not even sure it is a good
> idea to support newlines inside quoted values. anyhow they still get
> written out the same way as before, but now when reading it back in things
> break down.
>
>
> On Mon, Nov 28, 2016 at 8:25 PM, Reynold Xin <r...@databricks.com> wrote:
>
>> Please vote on releasing the following candidate as Apache Spark version
>> 2.1.0. The vote is open until Thursday, December 1, 2016 at 18:00 UTC and
>> passes if a majority of at least 3 +1 PMC votes are cast.
>>
>> [ ] +1 Release this package as Apache Spark 2.1.0
>> [ ] -1 Do not release this package because ...
>>
>>
>> To learn more about Apache Spark, please see http://spark.apache.org/
>>
>> The tag to be voted on is v2.1.0-rc1 (80aabc0bd33dc5661a90133156247
>> e7a8c1bf7f5)
>>
>> The release files, including signatures, digests, etc. can be found at:
>> http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc1-bin/
>>
>> Release artifacts are signed with the following key:
>> https://people.apache.org/keys/committer/pwendell.asc
>>
>> The staging repository for this release can be found at:
>> https://repository.apache.org/content/repositories/orgapachespark-1216/
>>
>> The documentation corresponding to this release can be found at:
>> http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc1-docs/
>>
>>
>> ===
>> How can I help test this release?
>> ===
>> If you are a Spark user, you can help us test this release by taking an
>> existing Spark workload and running on this release candidate, then
>> reporting any regressions.
>>
>> ===
>> What should happen to JIRA tickets still targeting 2.1.0?
>> ===
>> Committers should look at those and triage. Extremely important bug
>> fixes, documentation, and API tweaks that impact compatibility should be
>> worked on immediately. Everything else please retarget to 2.1.1 or 2.2.0.
>>
>>
>>
>


Re: [SparkStreaming] 1 SQL tab for each SparkStreaming batch in SparkUI

2016-11-22 Thread Koert Kuipers
you are creating a new hive context per microbatch? is that a good idea?

On Tue, Nov 22, 2016 at 8:51 AM, Dirceu Semighini Filho <
dirceu.semigh...@gmail.com> wrote:

> Has anybody seen this behavior (see tha attached picture) in Spark
> Streaming?
> It started to happen here after I changed the HiveContext creation to
> stream.foreachRDD {
> rdd =>
> val hiveContext = new HiveContext(rdd.sparkContext)
> }
>
> Is this expected?
>
> Kind Regards,
> Dirceu
>
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>


Re: getting encoder implicits to be more accurate

2016-11-14 Thread Koert Kuipers
sorry this message by me was confusing. i was frustrated about how hard it
is to use the Encoder machinery myself directly on Row objects, this is
unrelated to the question if a shapeless based approach like sam suggest
would be better way to do encoders in general

On Mon, Nov 14, 2016 at 3:03 PM, Koert Kuipers <ko...@tresata.com> wrote:

> that makes sense. we have something like that inhouse as well, but not as
> nice and not using shapeless (we simply relied on sbt-boilerplate to handle
> all tuples and do not support case classes).
>
> however the frustrating part is that spark sql already has this more or
> less. look for example at ExpressionEncoder.fromRow and
> ExpressionEncoder.toRow. but these methods use InternalRow while the rows
> exposed to me as a user are not that.
>
> at this point i am more tempted to simply open up InternalRow at a few
> places strategically than to maintain another inhouse row marshalling
> class. once i have InternalRows looks of good stuff is available to me to
> use.
>
>
>
>
> On Wed, Nov 2, 2016 at 12:15 AM, Sam Goodwin <sam.goodwi...@gmail.com>
> wrote:
>
>> You don't need compiler time macros for this, you can do it quite easily
>> using shapeless. I've been playing with a project which borrows ideas from
>> spray-json and spray-json-shapeless to implement Row marshalling for
>> arbitrary case classes. It's checked and generated at compile time,
>> supports arbitrary/nested case classes, and allows custom types. It is also
>> entirely pluggable meaning you can bypass the default implementations and
>> provide your own, just like any type class.
>>
>> https://github.com/upio/spark-sql-formats
>>
>>
>> *From:* Michael Armbrust <mich...@databricks.com>
>> *Date:* October 26, 2016 at 12:50:23 PM PDT
>> *To:* Koert Kuipers <ko...@tresata.com>
>> *Cc:* Ryan Blue <rb...@netflix.com>, "dev@spark.apache.org" <
>> dev@spark.apache.org>
>> *Subject:* *Re: getting encoder implicits to be more accurate*
>>
>> Sorry, I realize that set is only one example here, but I don't think
>> that making the type of the implicit more narrow to include only ProductN
>> or something eliminates the issue.  Even with that change, we will fail to
>> generate an encoder with the same error if you, for example, have a field
>> of your case class that is an unsupported type.
>>
>>
>>
>> Short of changing this to compile-time macros, I think we are stuck with
>> this class of errors at runtime.  The simplest solution seems to be to
>> expand the set of thing we can handle as much as possible and allow users
>> to turn on a kryo fallback for expression encoders.  I'd be hesitant to
>> make this the default though, as behavior would change with each release
>> that adds support for more types.  I would be very supportive of making
>> this fallback a built-in option though.
>>
>>
>>
>> On Wed, Oct 26, 2016 at 11:47 AM, Koert Kuipers <ko...@tresata.com>
>> wrote:
>>
>> yup, it doesnt really solve the underlying issue.
>>
>> we fixed it internally by having our own typeclass that produces encoders
>> and that does check the contents of the products, but we did this by simply
>> supporting Tuple1 - Tuple22 and Option explicitly, and not supporting
>> Product, since we dont have a need for case classes
>>
>> if case classes extended ProductN (which they will i think in scala
>> 2.12?) then we could drop Product and support Product1 - Product22 and
>> Option explicitly while checking the classes they contain. that would be
>> the cleanest.
>>
>>
>>
>> On Wed, Oct 26, 2016 at 2:33 PM, Ryan Blue <rb...@netflix.com> wrote:
>>
>> Isn't the problem that Option is a Product and the class it contains
>> isn't checked? Adding support for Set fixes the example, but the problem
>> would happen with any class there isn't an encoder for, right?
>>
>>
>>
>> On Wed, Oct 26, 2016 at 11:18 AM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>> Hmm, that is unfortunate.  Maybe the best solution is to add support for
>> sets?  I don't think that would be super hard.
>>
>>
>>
>> On Tue, Oct 25, 2016 at 8:52 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>> i am trying to use encoders as a typeclass where if it fails to find an
>> ExpressionEncoder it falls back to KryoEncoder.
>>
>> the issue seems to be that ExpressionEncoder claims a little more than it
>> can handle here:
>>   implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] =
>> Encoders.product[T]
>>
>> this "claims" to handle for example Option[Set[Int]], but it really
>> cannot handle Set so it leads to a runtime exception.
>>
>> would it be useful to make this a little more specific? i guess the
>> challenge is going to be case classes which unfortunately dont extend
>> Product1, Product2, etc.
>>
>>
>>
>>
>>
>>
>>
>> --
>>
>> Ryan Blue
>>
>> Software Engineer
>>
>> Netflix
>>
>>
>>
>>
>


Re: getting encoder implicits to be more accurate

2016-11-14 Thread Koert Kuipers
just taking it for a quick spin it looks great, with correct support for
nested rows and using option for nullability.

scala> val format = implicitly[RowFormat[(String, Seq[(String,
Option[Int])])]]
format: com.github.upio.spark.sql.RowFormat[(String, Seq[(String,
Option[Int])])] = com.github.upio.spark.sql.FamilyFormats$$anon$3@2c0961e2

scala> format.schema
res12: org.apache.spark.sql.types.StructType =
StructType(StructField(_1,StringType,false),
StructField(_2,ArrayType(StructType(StructField(_1,StringType,false),
StructField(_2,IntegerType,true)),true),false))

scala> val x = format.read(Row("a", Seq(Row("a", 5
x: (String, Seq[(String, Option[Int])]) = (a,List((a,Some(5

scala> format.write(x)
res13: org.apache.spark.sql.Row = [a,List([a,5])]



On Mon, Nov 14, 2016 at 3:10 PM, Koert Kuipers <ko...@tresata.com> wrote:

> agreed on your point that this can be done without macros
>
> On Wed, Nov 2, 2016 at 12:15 AM, Sam Goodwin <sam.goodwi...@gmail.com>
> wrote:
>
>> You don't need compiler time macros for this, you can do it quite easily
>> using shapeless. I've been playing with a project which borrows ideas from
>> spray-json and spray-json-shapeless to implement Row marshalling for
>> arbitrary case classes. It's checked and generated at compile time,
>> supports arbitrary/nested case classes, and allows custom types. It is also
>> entirely pluggable meaning you can bypass the default implementations and
>> provide your own, just like any type class.
>>
>> https://github.com/upio/spark-sql-formats
>>
>>
>> *From:* Michael Armbrust <mich...@databricks.com>
>> *Date:* October 26, 2016 at 12:50:23 PM PDT
>> *To:* Koert Kuipers <ko...@tresata.com>
>> *Cc:* Ryan Blue <rb...@netflix.com>, "dev@spark.apache.org" <
>> dev@spark.apache.org>
>> *Subject:* *Re: getting encoder implicits to be more accurate*
>>
>> Sorry, I realize that set is only one example here, but I don't think
>> that making the type of the implicit more narrow to include only ProductN
>> or something eliminates the issue.  Even with that change, we will fail to
>> generate an encoder with the same error if you, for example, have a field
>> of your case class that is an unsupported type.
>>
>>
>>
>> Short of changing this to compile-time macros, I think we are stuck with
>> this class of errors at runtime.  The simplest solution seems to be to
>> expand the set of thing we can handle as much as possible and allow users
>> to turn on a kryo fallback for expression encoders.  I'd be hesitant to
>> make this the default though, as behavior would change with each release
>> that adds support for more types.  I would be very supportive of making
>> this fallback a built-in option though.
>>
>>
>>
>> On Wed, Oct 26, 2016 at 11:47 AM, Koert Kuipers <ko...@tresata.com>
>> wrote:
>>
>> yup, it doesnt really solve the underlying issue.
>>
>> we fixed it internally by having our own typeclass that produces encoders
>> and that does check the contents of the products, but we did this by simply
>> supporting Tuple1 - Tuple22 and Option explicitly, and not supporting
>> Product, since we dont have a need for case classes
>>
>> if case classes extended ProductN (which they will i think in scala
>> 2.12?) then we could drop Product and support Product1 - Product22 and
>> Option explicitly while checking the classes they contain. that would be
>> the cleanest.
>>
>>
>>
>> On Wed, Oct 26, 2016 at 2:33 PM, Ryan Blue <rb...@netflix.com> wrote:
>>
>> Isn't the problem that Option is a Product and the class it contains
>> isn't checked? Adding support for Set fixes the example, but the problem
>> would happen with any class there isn't an encoder for, right?
>>
>>
>>
>> On Wed, Oct 26, 2016 at 11:18 AM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>> Hmm, that is unfortunate.  Maybe the best solution is to add support for
>> sets?  I don't think that would be super hard.
>>
>>
>>
>> On Tue, Oct 25, 2016 at 8:52 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>> i am trying to use encoders as a typeclass where if it fails to find an
>> ExpressionEncoder it falls back to KryoEncoder.
>>
>> the issue seems to be that ExpressionEncoder claims a little more than it
>> can handle here:
>>   implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] =
>> Encoders.product[T]
>>
>> this "claims" to handle for example Option[Set[Int]], but it really
>> cannot handle Set so it leads to a runtime exception.
>>
>> would it be useful to make this a little more specific? i guess the
>> challenge is going to be case classes which unfortunately dont extend
>> Product1, Product2, etc.
>>
>>
>>
>>
>>
>>
>>
>> --
>>
>> Ryan Blue
>>
>> Software Engineer
>>
>> Netflix
>>
>>
>>
>>
>


Re: getting encoder implicits to be more accurate

2016-11-14 Thread Koert Kuipers
agreed on your point that this can be done without macros

On Wed, Nov 2, 2016 at 12:15 AM, Sam Goodwin <sam.goodwi...@gmail.com>
wrote:

> You don't need compiler time macros for this, you can do it quite easily
> using shapeless. I've been playing with a project which borrows ideas from
> spray-json and spray-json-shapeless to implement Row marshalling for
> arbitrary case classes. It's checked and generated at compile time,
> supports arbitrary/nested case classes, and allows custom types. It is also
> entirely pluggable meaning you can bypass the default implementations and
> provide your own, just like any type class.
>
> https://github.com/upio/spark-sql-formats
>
>
> *From:* Michael Armbrust <mich...@databricks.com>
> *Date:* October 26, 2016 at 12:50:23 PM PDT
> *To:* Koert Kuipers <ko...@tresata.com>
> *Cc:* Ryan Blue <rb...@netflix.com>, "dev@spark.apache.org" <
> dev@spark.apache.org>
> *Subject:* *Re: getting encoder implicits to be more accurate*
>
> Sorry, I realize that set is only one example here, but I don't think that
> making the type of the implicit more narrow to include only ProductN or
> something eliminates the issue.  Even with that change, we will fail to
> generate an encoder with the same error if you, for example, have a field
> of your case class that is an unsupported type.
>
>
>
> Short of changing this to compile-time macros, I think we are stuck with
> this class of errors at runtime.  The simplest solution seems to be to
> expand the set of thing we can handle as much as possible and allow users
> to turn on a kryo fallback for expression encoders.  I'd be hesitant to
> make this the default though, as behavior would change with each release
> that adds support for more types.  I would be very supportive of making
> this fallback a built-in option though.
>
>
>
> On Wed, Oct 26, 2016 at 11:47 AM, Koert Kuipers <ko...@tresata.com> wrote:
>
> yup, it doesnt really solve the underlying issue.
>
> we fixed it internally by having our own typeclass that produces encoders
> and that does check the contents of the products, but we did this by simply
> supporting Tuple1 - Tuple22 and Option explicitly, and not supporting
> Product, since we dont have a need for case classes
>
> if case classes extended ProductN (which they will i think in scala 2.12?)
> then we could drop Product and support Product1 - Product22 and Option
> explicitly while checking the classes they contain. that would be the
> cleanest.
>
>
>
> On Wed, Oct 26, 2016 at 2:33 PM, Ryan Blue <rb...@netflix.com> wrote:
>
> Isn't the problem that Option is a Product and the class it contains isn't
> checked? Adding support for Set fixes the example, but the problem would
> happen with any class there isn't an encoder for, right?
>
>
>
> On Wed, Oct 26, 2016 at 11:18 AM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
> Hmm, that is unfortunate.  Maybe the best solution is to add support for
> sets?  I don't think that would be super hard.
>
>
>
> On Tue, Oct 25, 2016 at 8:52 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
> i am trying to use encoders as a typeclass where if it fails to find an
> ExpressionEncoder it falls back to KryoEncoder.
>
> the issue seems to be that ExpressionEncoder claims a little more than it
> can handle here:
>   implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] =
> Encoders.product[T]
>
> this "claims" to handle for example Option[Set[Int]], but it really cannot
> handle Set so it leads to a runtime exception.
>
> would it be useful to make this a little more specific? i guess the
> challenge is going to be case classes which unfortunately dont extend
> Product1, Product2, etc.
>
>
>
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>
>
>
>


Re: Straw poll: dropping support for things like Scala 2.10

2016-10-28 Thread Koert Kuipers
thats correct in my experience: we have found a scala update to be
straightforward and basically somewhat invisible to ops, but a java upgrade
a pain because it is managed and "certified" by ops.

On Fri, Oct 28, 2016 at 9:44 AM, Steve Loughran 
wrote:

> Twitter just led the release of Hadoop 2.6.5 precisely because they wanted
> to keep a Java 6 cluster up: the bigger your cluster, the less of a rush to
> upgrade.
>
> HDP? I believe we install & prefer (openjdk) Java 8, but the Hadoop
> branch-2 line is intended to build/run on Java 7 too. There's always a
> conflict between us developers "shiny new features" and ops "keep cluster
> alive". That's actually where Scala has an edge: no need to upgrade the
> cluster-wide JVM just for an update, or play games configuring your
> deployed application to use a different JVM from the Hadoop services (which
> you can do, after all: it's just path setup). Thinking about it, knowing
> what can be done there —including documenting it in the spark docs, could
> be a good migration strategy.
>
> me? I look forward to when we can use Java 9 to isolate transitive
> dependencies; the bane of everyone's life. Someone needs to start on
> preparing everything for that to work though.
>
>
> On 28 Oct 2016, at 11:47, Chris Fregly  wrote:
>
> i seem to remember a large spark user (tencent, i believe) chiming in late
> during these discussions 6-12 months ago and squashing any sort of
> deprecation given the massive effort that would be required to upgrade
> their environment.
>
> i just want to make sure these convos take into consideration large spark
> users - and reflect the real world versus ideal world.
>
> otherwise, this is all for naught like last time.
>
> On Oct 28, 2016, at 10:43 AM, Sean Owen  wrote:
>
> If the subtext is vendors, then I'd have a look at what recent distros
> look like. I'll write about CDH as a representative example, but I think
> other distros are naturally similar.
>
> CDH has been on Java 8, Hadoop 2.6, Python 2.7 for almost two years (CDH
> 5.3 / Dec 2014). Granted, this depends on installing on an OS with that
> Java / Python version. But Java 8 / Python 2.7 is available for all of the
> supported OSes. The population that isn't on CDH 4, because that supported
> was dropped a long time ago in Spark, and who is on a version released
> 2-2.5 years ago, and won't update, is a couple percent of the installed
> base. They do not in general want anything to change at all.
>
> I assure everyone that vendors too are aligned in wanting to cater to the
> crowd that wants the most recent version of everything. For example, CDH
> offers both Spark 2.0.1 and 1.6 at the same time.
>
> I wouldn't dismiss support for these supporting components as a relevant
> proxy for whether they are worth supporting in Spark. Java 7 is long since
> EOL (no, I don't count paying Oracle for support). No vendor is supporting
> Hadoop < 2.6. Scala 2.10 was EOL at the end of 2014. Is there a criteria
> here that reaches a different conclusion about these things just for Spark?
> This was roughly the same conversation that happened 6 months ago.
>
> I imagine we're going to find that in about 6 months it'll make more sense
> all around to remove these. If we can just give a heads up with deprecation
> and then kick the can down the road a bit more, that sounds like enough for
> now.
>
> On Fri, Oct 28, 2016 at 8:58 AM Matei Zaharia 
> wrote:
>
>> Deprecating them is fine (and I know they're already deprecated), the
>> question is just whether to remove them. For example, what exactly is the
>> downside of having Python 2.6 or Java 7 right now? If it's high, then we
>> can remove them, but I just haven't seen a ton of details. It also sounded
>> like fairly recent versions of CDH, HDP, RHEL, etc still have old versions
>> of these.
>>
>> Just talking with users, I've seen many of people who say "we have a
>> Hadoop cluster from $VENDOR, but we just download Spark from Apache and run
>> newer versions of that". That's great for Spark IMO, and we need to stay
>> compatible even with somewhat older Hadoop installs because they are
>> time-consuming to update. Having the whole community on a small set of
>> versions leads to a better experience for everyone and also to more of a
>> "network effect": more people can battle-test new versions, answer
>> questions about them online, write libraries that easily reach the majority
>> of Spark users, etc.
>>
>
>


Re: encoders for more complex types

2016-10-27 Thread Koert Kuipers
https://issues.apache.org/jira/browse/SPARK-18147

On Thu, Oct 27, 2016 at 4:55 PM, Koert Kuipers <ko...@tresata.com> wrote:

> ok will do
>
> On Thu, Oct 27, 2016 at 4:51 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> I would categorize these as bugs.  We should (but probably don't fully
>> yet) support arbitrary nesting as long as you use basic collections / case
>> classes / primitives.  Please do open JIRAs as you find problems.
>>
>> On Thu, Oct 27, 2016 at 1:05 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> well i was using Aggregators that returned sequences of structs, or
>>> structs with sequences inside etc. and got compilation errors on the
>>> codegen.
>>>
>>> i didnt bother trying to reproduce it so far, or post it, since what i
>>> did was beyond the supposed usage anyhow.
>>>
>>>
>>> On Thu, Oct 27, 2016 at 4:02 PM, Herman van Hövell tot Westerflier <
>>> hvanhov...@databricks.com> wrote:
>>>
>>>> What kind of difficulties are you experiencing?
>>>>
>>>> On Thu, Oct 27, 2016 at 9:57 PM, Koert Kuipers <ko...@tresata.com>
>>>> wrote:
>>>>
>>>>> i have been pushing my luck a bit and started using ExpressionEncoder
>>>>> for more complex types like sequences of case classes etc. (where the case
>>>>> classes only had primitives and Strings).
>>>>>
>>>>> it all seems to work but i think the wheels come off in certain cases
>>>>> in the code generation. i guess this is not unexpected, after all what i 
>>>>> am
>>>>> doing is not yet supported.
>>>>>
>>>>> is there a planned path forward to support more complex types with
>>>>> encoders? it would be nice if we can at least support all types that
>>>>> spark-sql supports in general for DataFrame.
>>>>>
>>>>> best, koert
>>>>>
>>>>
>>>>
>>>
>>
>


Re: encoders for more complex types

2016-10-27 Thread Koert Kuipers
ok will do

On Thu, Oct 27, 2016 at 4:51 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> I would categorize these as bugs.  We should (but probably don't fully
> yet) support arbitrary nesting as long as you use basic collections / case
> classes / primitives.  Please do open JIRAs as you find problems.
>
> On Thu, Oct 27, 2016 at 1:05 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> well i was using Aggregators that returned sequences of structs, or
>> structs with sequences inside etc. and got compilation errors on the
>> codegen.
>>
>> i didnt bother trying to reproduce it so far, or post it, since what i
>> did was beyond the supposed usage anyhow.
>>
>>
>> On Thu, Oct 27, 2016 at 4:02 PM, Herman van Hövell tot Westerflier <
>> hvanhov...@databricks.com> wrote:
>>
>>> What kind of difficulties are you experiencing?
>>>
>>> On Thu, Oct 27, 2016 at 9:57 PM, Koert Kuipers <ko...@tresata.com>
>>> wrote:
>>>
>>>> i have been pushing my luck a bit and started using ExpressionEncoder
>>>> for more complex types like sequences of case classes etc. (where the case
>>>> classes only had primitives and Strings).
>>>>
>>>> it all seems to work but i think the wheels come off in certain cases
>>>> in the code generation. i guess this is not unexpected, after all what i am
>>>> doing is not yet supported.
>>>>
>>>> is there a planned path forward to support more complex types with
>>>> encoders? it would be nice if we can at least support all types that
>>>> spark-sql supports in general for DataFrame.
>>>>
>>>> best, koert
>>>>
>>>
>>>
>>
>


Re: encoders for more complex types

2016-10-27 Thread Koert Kuipers
well i was using Aggregators that returned sequences of structs, or structs
with sequences inside etc. and got compilation errors on the codegen.

i didnt bother trying to reproduce it so far, or post it, since what i did
was beyond the supposed usage anyhow.


On Thu, Oct 27, 2016 at 4:02 PM, Herman van Hövell tot Westerflier <
hvanhov...@databricks.com> wrote:

> What kind of difficulties are you experiencing?
>
> On Thu, Oct 27, 2016 at 9:57 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> i have been pushing my luck a bit and started using ExpressionEncoder for
>> more complex types like sequences of case classes etc. (where the case
>> classes only had primitives and Strings).
>>
>> it all seems to work but i think the wheels come off in certain cases in
>> the code generation. i guess this is not unexpected, after all what i am
>> doing is not yet supported.
>>
>> is there a planned path forward to support more complex types with
>> encoders? it would be nice if we can at least support all types that
>> spark-sql supports in general for DataFrame.
>>
>> best, koert
>>
>
>


encoders for more complex types

2016-10-27 Thread Koert Kuipers
i have been pushing my luck a bit and started using ExpressionEncoder for
more complex types like sequences of case classes etc. (where the case
classes only had primitives and Strings).

it all seems to work but i think the wheels come off in certain cases in
the code generation. i guess this is not unexpected, after all what i am
doing is not yet supported.

is there a planned path forward to support more complex types with
encoders? it would be nice if we can at least support all types that
spark-sql supports in general for DataFrame.

best, koert


Re: [VOTE] Release Apache Spark 2.0.2 (RC1)

2016-10-27 Thread Koert Kuipers
+1 non binding

compiled and unit tested in-house libraries against 2.0.2-rc1 successfully

was able to build, deploy and launch on cdh 5.7 yarn cluster

on a side note... these artifacts on staging repo having version 2.0.2
instead of 2.0.2-rc1 makes it somewhat dangerous to test against it in
existing project. i can add a sbt resolver for staging repo and change my
spark version to 2.0.2, but now it starts downloading and caching these
artifacts as version 2.0.2 which means i am now hunting down local cache
locations afterwards like ~/.ivy2/cache to make sure i wipe them or run the
risk of in the future compiling against the rc instead of the actual
release by accident. not sure if i should be doing something different?


On Thu, Oct 27, 2016 at 3:18 AM, Reynold Xin  wrote:

> Greetings from Spark Summit Europe at Brussels.
>
> Please vote on releasing the following candidate as Apache Spark version
> 2.0.2. The vote is open until Sun, Oct 30, 2016 at 00:30 PDT and passes if
> a majority of at least 3+1 PMC votes are cast.
>
> [ ] +1 Release this package as Apache Spark 2.0.2
> [ ] -1 Do not release this package because ...
>
>
> The tag to be voted on is v2.0.2-rc1 (1c2908eeb8890fdc91413a3f5bad2b
> b3d114db6c)
>
> This release candidate resolves 75 issues: https://s.apache.org/spark-2.
> 0.2-jira
>
> The release files, including signatures, digests, etc. can be found at:
> http://people.apache.org/~pwendell/spark-releases/spark-2.0.2-rc1-bin/
>
> Release artifacts are signed with the following key:
> https://people.apache.org/keys/committer/pwendell.asc
>
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapachespark-1208/
>
> The documentation corresponding to this release can be found at:
> http://people.apache.org/~pwendell/spark-releases/spark-2.0.2-rc1-docs/
>
>
> Q: How can I help test this release?
> A: If you are a Spark user, you can help us test this release by taking an
> existing Spark workload and running on this release candidate, then
> reporting any regressions from 2.0.1.
>
> Q: What justifies a -1 vote for this release?
> A: This is a maintenance release in the 2.0.x series. Bugs already present
> in 2.0.1, missing features, or bugs related to new features will not
> necessarily block this release.
>
> Q: What fix version should I use for patches merging into branch-2.0 from
> now on?
> A: Please mark the fix version as 2.0.3, rather than 2.0.2. If a new RC
> (i.e. RC2) is cut, I will change the fix version of those patches to 2.0.2.
>
>
>


Re: getting encoder implicits to be more accurate

2016-10-26 Thread Koert Kuipers
if kryo could transparently be used for subtrees without narrowing the
implicit that would be great

On Wed, Oct 26, 2016 at 5:10 PM, Koert Kuipers <ko...@tresata.com> wrote:

> i use kryo for the whole thing currently
>
> it would be better to use it for the subtree
>
> On Wed, Oct 26, 2016 at 5:06 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> You use kryo encoder for the whole thing?  Or just the subtree that we
>> don't have specific encoders for?
>>
>> Also, I'm saying I like the idea of having a kryo fallback.  I don't see
>> the point of narrowing the the definition of the implicit.
>>
>> On Wed, Oct 26, 2016 at 1:07 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> for example (the log shows when it creates a kryo encoder):
>>>
>>> scala> implicitly[EncoderEvidence[Option[Seq[String.encoder
>>> res5: org.apache.spark.sql.Encoder[Option[Seq[String]]] =
>>> class[value[0]: array]
>>>
>>> scala> implicitly[EncoderEvidence[Option[Set[String.encoder
>>> dataframe.EncoderEvidence$: using kryo encoder for
>>> scala.Option[Set[String]]
>>> res6: org.apache.spark.sql.Encoder[Option[Set[String]]] =
>>> class[value[0]: binary]
>>>
>>>
>>>
>>>
>>> On Wed, Oct 26, 2016 at 4:00 PM, Koert Kuipers <ko...@tresata.com>
>>> wrote:
>>>
>>>> why would generating implicits for ProductN where you also require the
>>>> elements in the Product to have an expression encoder not work?
>>>>
>>>> we do this. and then we have a generic fallback where it produces a
>>>> kryo encoder.
>>>>
>>>> for us the result is that say an implicit for Seq[(Int, Seq[(String,
>>>> Int)])] will create a new ExpressionEncoder(), while an implicit for
>>>> Seq[(Int, Set[(String, Int)])] produces a Encoders.kryoEncoder()
>>>>
>>>> On Wed, Oct 26, 2016 at 3:50 PM, Michael Armbrust <
>>>> mich...@databricks.com> wrote:
>>>>
>>>>> Sorry, I realize that set is only one example here, but I don't think
>>>>> that making the type of the implicit more narrow to include only ProductN
>>>>> or something eliminates the issue.  Even with that change, we will fail to
>>>>> generate an encoder with the same error if you, for example, have a field
>>>>> of your case class that is an unsupported type.
>>>>>
>>>>> Short of changing this to compile-time macros, I think we are stuck
>>>>> with this class of errors at runtime.  The simplest solution seems to be 
>>>>> to
>>>>> expand the set of thing we can handle as much as possible and allow users
>>>>> to turn on a kryo fallback for expression encoders.  I'd be hesitant to
>>>>> make this the default though, as behavior would change with each release
>>>>> that adds support for more types.  I would be very supportive of making
>>>>> this fallback a built-in option though.
>>>>>
>>>>> On Wed, Oct 26, 2016 at 11:47 AM, Koert Kuipers <ko...@tresata.com>
>>>>> wrote:
>>>>>
>>>>>> yup, it doesnt really solve the underlying issue.
>>>>>>
>>>>>> we fixed it internally by having our own typeclass that produces
>>>>>> encoders and that does check the contents of the products, but we did 
>>>>>> this
>>>>>> by simply supporting Tuple1 - Tuple22 and Option explicitly, and not
>>>>>> supporting Product, since we dont have a need for case classes
>>>>>>
>>>>>> if case classes extended ProductN (which they will i think in scala
>>>>>> 2.12?) then we could drop Product and support Product1 - Product22 and
>>>>>> Option explicitly while checking the classes they contain. that would be
>>>>>> the cleanest.
>>>>>>
>>>>>>
>>>>>> On Wed, Oct 26, 2016 at 2:33 PM, Ryan Blue <rb...@netflix.com> wrote:
>>>>>>
>>>>>>> Isn't the problem that Option is a Product and the class it contains
>>>>>>> isn't checked? Adding support for Set fixes the example, but the problem
>>>>>>> would happen with any class there isn't an encoder for, right?
>>>>>>>
>>>>>>> On Wed, Oct 26, 2016 at 11:18 AM, Michael Armbrust <
>>>>>>> m

Re: getting encoder implicits to be more accurate

2016-10-26 Thread Koert Kuipers
i use kryo for the whole thing currently

it would be better to use it for the subtree

On Wed, Oct 26, 2016 at 5:06 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> You use kryo encoder for the whole thing?  Or just the subtree that we
> don't have specific encoders for?
>
> Also, I'm saying I like the idea of having a kryo fallback.  I don't see
> the point of narrowing the the definition of the implicit.
>
> On Wed, Oct 26, 2016 at 1:07 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> for example (the log shows when it creates a kryo encoder):
>>
>> scala> implicitly[EncoderEvidence[Option[Seq[String.encoder
>> res5: org.apache.spark.sql.Encoder[Option[Seq[String]]] =
>> class[value[0]: array]
>>
>> scala> implicitly[EncoderEvidence[Option[Set[String.encoder
>> dataframe.EncoderEvidence$: using kryo encoder for
>> scala.Option[Set[String]]
>> res6: org.apache.spark.sql.Encoder[Option[Set[String]]] =
>> class[value[0]: binary]
>>
>>
>>
>>
>> On Wed, Oct 26, 2016 at 4:00 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> why would generating implicits for ProductN where you also require the
>>> elements in the Product to have an expression encoder not work?
>>>
>>> we do this. and then we have a generic fallback where it produces a kryo
>>> encoder.
>>>
>>> for us the result is that say an implicit for Seq[(Int, Seq[(String,
>>> Int)])] will create a new ExpressionEncoder(), while an implicit for
>>> Seq[(Int, Set[(String, Int)])] produces a Encoders.kryoEncoder()
>>>
>>> On Wed, Oct 26, 2016 at 3:50 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> Sorry, I realize that set is only one example here, but I don't think
>>>> that making the type of the implicit more narrow to include only ProductN
>>>> or something eliminates the issue.  Even with that change, we will fail to
>>>> generate an encoder with the same error if you, for example, have a field
>>>> of your case class that is an unsupported type.
>>>>
>>>> Short of changing this to compile-time macros, I think we are stuck
>>>> with this class of errors at runtime.  The simplest solution seems to be to
>>>> expand the set of thing we can handle as much as possible and allow users
>>>> to turn on a kryo fallback for expression encoders.  I'd be hesitant to
>>>> make this the default though, as behavior would change with each release
>>>> that adds support for more types.  I would be very supportive of making
>>>> this fallback a built-in option though.
>>>>
>>>> On Wed, Oct 26, 2016 at 11:47 AM, Koert Kuipers <ko...@tresata.com>
>>>> wrote:
>>>>
>>>>> yup, it doesnt really solve the underlying issue.
>>>>>
>>>>> we fixed it internally by having our own typeclass that produces
>>>>> encoders and that does check the contents of the products, but we did this
>>>>> by simply supporting Tuple1 - Tuple22 and Option explicitly, and not
>>>>> supporting Product, since we dont have a need for case classes
>>>>>
>>>>> if case classes extended ProductN (which they will i think in scala
>>>>> 2.12?) then we could drop Product and support Product1 - Product22 and
>>>>> Option explicitly while checking the classes they contain. that would be
>>>>> the cleanest.
>>>>>
>>>>>
>>>>> On Wed, Oct 26, 2016 at 2:33 PM, Ryan Blue <rb...@netflix.com> wrote:
>>>>>
>>>>>> Isn't the problem that Option is a Product and the class it contains
>>>>>> isn't checked? Adding support for Set fixes the example, but the problem
>>>>>> would happen with any class there isn't an encoder for, right?
>>>>>>
>>>>>> On Wed, Oct 26, 2016 at 11:18 AM, Michael Armbrust <
>>>>>> mich...@databricks.com> wrote:
>>>>>>
>>>>>>> Hmm, that is unfortunate.  Maybe the best solution is to add support
>>>>>>> for sets?  I don't think that would be super hard.
>>>>>>>
>>>>>>> On Tue, Oct 25, 2016 at 8:52 PM, Koert Kuipers <ko...@tresata.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> i am trying to use encoders as a typeclass where if it fails to
>>>>>>>> find an ExpressionEncoder it falls back to KryoEncoder.
>>>>>>>>
>>>>>>>> the issue seems to be that ExpressionEncoder claims a little more
>>>>>>>> than it can handle here:
>>>>>>>>   implicit def newProductEncoder[T <: Product : TypeTag]:
>>>>>>>> Encoder[T] = Encoders.product[T]
>>>>>>>>
>>>>>>>> this "claims" to handle for example Option[Set[Int]], but it really
>>>>>>>> cannot handle Set so it leads to a runtime exception.
>>>>>>>>
>>>>>>>> would it be useful to make this a little more specific? i guess the
>>>>>>>> challenge is going to be case classes which unfortunately dont extend
>>>>>>>> Product1, Product2, etc.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: getting encoder implicits to be more accurate

2016-10-26 Thread Koert Kuipers
for example (the log shows when it creates a kryo encoder):

scala> implicitly[EncoderEvidence[Option[Seq[String.encoder
res5: org.apache.spark.sql.Encoder[Option[Seq[String]]] = class[value[0]:
array]

scala> implicitly[EncoderEvidence[Option[Set[String.encoder
dataframe.EncoderEvidence$: using kryo encoder for scala.Option[Set[String]]
res6: org.apache.spark.sql.Encoder[Option[Set[String]]] = class[value[0]:
binary]




On Wed, Oct 26, 2016 at 4:00 PM, Koert Kuipers <ko...@tresata.com> wrote:

> why would generating implicits for ProductN where you also require the
> elements in the Product to have an expression encoder not work?
>
> we do this. and then we have a generic fallback where it produces a kryo
> encoder.
>
> for us the result is that say an implicit for Seq[(Int, Seq[(String,
> Int)])] will create a new ExpressionEncoder(), while an implicit for
> Seq[(Int, Set[(String, Int)])] produces a Encoders.kryoEncoder()
>
> On Wed, Oct 26, 2016 at 3:50 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> Sorry, I realize that set is only one example here, but I don't think
>> that making the type of the implicit more narrow to include only ProductN
>> or something eliminates the issue.  Even with that change, we will fail to
>> generate an encoder with the same error if you, for example, have a field
>> of your case class that is an unsupported type.
>>
>> Short of changing this to compile-time macros, I think we are stuck with
>> this class of errors at runtime.  The simplest solution seems to be to
>> expand the set of thing we can handle as much as possible and allow users
>> to turn on a kryo fallback for expression encoders.  I'd be hesitant to
>> make this the default though, as behavior would change with each release
>> that adds support for more types.  I would be very supportive of making
>> this fallback a built-in option though.
>>
>> On Wed, Oct 26, 2016 at 11:47 AM, Koert Kuipers <ko...@tresata.com>
>> wrote:
>>
>>> yup, it doesnt really solve the underlying issue.
>>>
>>> we fixed it internally by having our own typeclass that produces
>>> encoders and that does check the contents of the products, but we did this
>>> by simply supporting Tuple1 - Tuple22 and Option explicitly, and not
>>> supporting Product, since we dont have a need for case classes
>>>
>>> if case classes extended ProductN (which they will i think in scala
>>> 2.12?) then we could drop Product and support Product1 - Product22 and
>>> Option explicitly while checking the classes they contain. that would be
>>> the cleanest.
>>>
>>>
>>> On Wed, Oct 26, 2016 at 2:33 PM, Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> Isn't the problem that Option is a Product and the class it contains
>>>> isn't checked? Adding support for Set fixes the example, but the problem
>>>> would happen with any class there isn't an encoder for, right?
>>>>
>>>> On Wed, Oct 26, 2016 at 11:18 AM, Michael Armbrust <
>>>> mich...@databricks.com> wrote:
>>>>
>>>>> Hmm, that is unfortunate.  Maybe the best solution is to add support
>>>>> for sets?  I don't think that would be super hard.
>>>>>
>>>>> On Tue, Oct 25, 2016 at 8:52 PM, Koert Kuipers <ko...@tresata.com>
>>>>> wrote:
>>>>>
>>>>>> i am trying to use encoders as a typeclass where if it fails to find
>>>>>> an ExpressionEncoder it falls back to KryoEncoder.
>>>>>>
>>>>>> the issue seems to be that ExpressionEncoder claims a little more
>>>>>> than it can handle here:
>>>>>>   implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T]
>>>>>> = Encoders.product[T]
>>>>>>
>>>>>> this "claims" to handle for example Option[Set[Int]], but it really
>>>>>> cannot handle Set so it leads to a runtime exception.
>>>>>>
>>>>>> would it be useful to make this a little more specific? i guess the
>>>>>> challenge is going to be case classes which unfortunately dont extend
>>>>>> Product1, Product2, etc.
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
>>>
>>
>


Re: getting encoder implicits to be more accurate

2016-10-26 Thread Koert Kuipers
why would generating implicits for ProductN where you also require the
elements in the Product to have an expression encoder not work?

we do this. and then we have a generic fallback where it produces a kryo
encoder.

for us the result is that say an implicit for Seq[(Int, Seq[(String,
Int)])] will create a new ExpressionEncoder(), while an implicit for
Seq[(Int, Set[(String, Int)])] produces a Encoders.kryoEncoder()

On Wed, Oct 26, 2016 at 3:50 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Sorry, I realize that set is only one example here, but I don't think that
> making the type of the implicit more narrow to include only ProductN or
> something eliminates the issue.  Even with that change, we will fail to
> generate an encoder with the same error if you, for example, have a field
> of your case class that is an unsupported type.
>
> Short of changing this to compile-time macros, I think we are stuck with
> this class of errors at runtime.  The simplest solution seems to be to
> expand the set of thing we can handle as much as possible and allow users
> to turn on a kryo fallback for expression encoders.  I'd be hesitant to
> make this the default though, as behavior would change with each release
> that adds support for more types.  I would be very supportive of making
> this fallback a built-in option though.
>
> On Wed, Oct 26, 2016 at 11:47 AM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> yup, it doesnt really solve the underlying issue.
>>
>> we fixed it internally by having our own typeclass that produces encoders
>> and that does check the contents of the products, but we did this by simply
>> supporting Tuple1 - Tuple22 and Option explicitly, and not supporting
>> Product, since we dont have a need for case classes
>>
>> if case classes extended ProductN (which they will i think in scala
>> 2.12?) then we could drop Product and support Product1 - Product22 and
>> Option explicitly while checking the classes they contain. that would be
>> the cleanest.
>>
>>
>> On Wed, Oct 26, 2016 at 2:33 PM, Ryan Blue <rb...@netflix.com> wrote:
>>
>>> Isn't the problem that Option is a Product and the class it contains
>>> isn't checked? Adding support for Set fixes the example, but the problem
>>> would happen with any class there isn't an encoder for, right?
>>>
>>> On Wed, Oct 26, 2016 at 11:18 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> Hmm, that is unfortunate.  Maybe the best solution is to add support
>>>> for sets?  I don't think that would be super hard.
>>>>
>>>> On Tue, Oct 25, 2016 at 8:52 PM, Koert Kuipers <ko...@tresata.com>
>>>> wrote:
>>>>
>>>>> i am trying to use encoders as a typeclass where if it fails to find
>>>>> an ExpressionEncoder it falls back to KryoEncoder.
>>>>>
>>>>> the issue seems to be that ExpressionEncoder claims a little more than
>>>>> it can handle here:
>>>>>   implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] =
>>>>> Encoders.product[T]
>>>>>
>>>>> this "claims" to handle for example Option[Set[Int]], but it really
>>>>> cannot handle Set so it leads to a runtime exception.
>>>>>
>>>>> would it be useful to make this a little more specific? i guess the
>>>>> challenge is going to be case classes which unfortunately dont extend
>>>>> Product1, Product2, etc.
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>
>


Re: Straw poll: dropping support for things like Scala 2.10

2016-10-26 Thread Koert Kuipers
that sounds good to me

On Wed, Oct 26, 2016 at 2:26 PM, Reynold Xin  wrote:

> We can do the following concrete proposal:
>
> 1. Plan to remove support for Java 7 / Scala 2.10 in Spark 2.2.0 (Mar/Apr
> 2017).
>
> 2. In Spark 2.1.0 release, aggressively and explicitly announce the
> deprecation of Java 7 / Scala 2.10 support.
>
> (a) It should appear in release notes, documentations that mention how to
> build Spark
>
> (b) and a warning should be shown every time SparkContext is started using
> Scala 2.10 or Java 7.
>
>
>
> On Wed, Oct 26, 2016 at 7:50 PM, Dongjoon Hyun 
> wrote:
>
>> Hi, Daniel.
>>
>> I guess that kind of works will start sufficiently in 2.1.0 after PMC's
>> annoucement/reminder on mailing list.
>>
>> Bests,
>> Dongjoon.
>>
>>
>> On Wednesday, October 26, 2016, Daniel Siegmann <
>> dsiegm...@securityscorecard.io> wrote:
>>
>>> Is the deprecation of JDK 7 and Scala 2.10 documented anywhere outside
>>> the release notes for Spark 2.0.0? I do not consider release notes to be
>>> sufficient public notice for deprecation of supported platforms - this
>>> should be noted in the documentation somewhere. Here are on the only
>>> mentions I could find:
>>>
>>> At http://spark.apache.org/downloads.html it says:
>>>
>>> "*Note: Starting version 2.0, Spark is built with Scala 2.11 by
>>> default. Scala 2.10 users should download the Spark source package and
>>> build with Scala 2.10 support
>>> ."*
>>>
>>> At http://spark.apache.org/docs/latest/#downloading it says:
>>>
>>> "Spark runs on Java 7+, Python 2.6+/3.4+ and R 3.1+. For the Scala API,
>>> Spark 2.0.1 uses Scala 2.11. You will need to use a compatible Scala
>>> version (2.11.x)."
>>>
>>> At http://spark.apache.org/docs/latest/programming-guide.html#l
>>> inking-with-spark it says:
>>>
>>>- "Spark 2.0.1 is built and distributed to work with Scala 2.11 by
>>>default. (Spark can be built to work with other versions of Scala, too.) 
>>> To
>>>write applications in Scala, you will need to use a compatible Scala
>>>version (e.g. 2.11.X)."
>>>- "Spark 2.0.1 works with Java 7 and higher. If you are using Java
>>>8, Spark supports lambda expressions
>>>
>>> 
>>>for concisely writing functions, otherwise you can use the classes in the
>>>org.apache.spark.api.java.function
>>>
>>> 
>>>package."
>>>- "Spark 2.0.1 works with Python 2.6+ or Python 3.4+. It can use the
>>>standard CPython interpreter, so C libraries like NumPy can be used. It
>>>also works with PyPy 2.3+."
>>>
>>>
>>>
>


Re: getting encoder implicits to be more accurate

2016-10-26 Thread Koert Kuipers
yup, it doesnt really solve the underlying issue.

we fixed it internally by having our own typeclass that produces encoders
and that does check the contents of the products, but we did this by simply
supporting Tuple1 - Tuple22 and Option explicitly, and not supporting
Product, since we dont have a need for case classes

if case classes extended ProductN (which they will i think in scala 2.12?)
then we could drop Product and support Product1 - Product22 and Option
explicitly while checking the classes they contain. that would be the
cleanest.


On Wed, Oct 26, 2016 at 2:33 PM, Ryan Blue <rb...@netflix.com> wrote:

> Isn't the problem that Option is a Product and the class it contains isn't
> checked? Adding support for Set fixes the example, but the problem would
> happen with any class there isn't an encoder for, right?
>
> On Wed, Oct 26, 2016 at 11:18 AM, Michael Armbrust <mich...@databricks.com
> > wrote:
>
>> Hmm, that is unfortunate.  Maybe the best solution is to add support for
>> sets?  I don't think that would be super hard.
>>
>> On Tue, Oct 25, 2016 at 8:52 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> i am trying to use encoders as a typeclass where if it fails to find an
>>> ExpressionEncoder it falls back to KryoEncoder.
>>>
>>> the issue seems to be that ExpressionEncoder claims a little more than
>>> it can handle here:
>>>   implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] =
>>> Encoders.product[T]
>>>
>>> this "claims" to handle for example Option[Set[Int]], but it really
>>> cannot handle Set so it leads to a runtime exception.
>>>
>>> would it be useful to make this a little more specific? i guess the
>>> challenge is going to be case classes which unfortunately dont extend
>>> Product1, Product2, etc.
>>>
>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


getting encoder implicits to be more accurate

2016-10-25 Thread Koert Kuipers
i am trying to use encoders as a typeclass where if it fails to find an
ExpressionEncoder it falls back to KryoEncoder.

the issue seems to be that ExpressionEncoder claims a little more than it
can handle here:
  implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] =
Encoders.product[T]

this "claims" to handle for example Option[Set[Int]], but it really cannot
handle Set so it leads to a runtime exception.

would it be useful to make this a little more specific? i guess the
challenge is going to be case classes which unfortunately dont extend
Product1, Product2, etc.


Re: Straw poll: dropping support for things like Scala 2.10

2016-10-25 Thread Koert Kuipers
it will take time before all libraries that spark depends on are available
for scala 2.12, so we are not talking spark 2.1.x and probably also not
2.2.x for scala 2.12

it technically makes sense to drop java 7 and scala 2.10 around the same
time as scala 2.12 is introduced

we are still heavily dependent on java 7 (and python 2.6 if we used python
but we dont). i am surprised to see new clusters installed in last few
months (CDH and HDP latest versions) to still be running on java 7. even
getting java 8 installed on these clusters so we can use them in yarn is
often not an option. it beats me as to why this is still happening.

we do not use scala 2.10 at all anymore.

On Tue, Oct 25, 2016 at 12:31 PM, Ofir Manor  wrote:

> I think that 2.1 should include a visible deprecation message about Java
> 7, Scala 2.10 and older Hadoop versions (plus python if there is a
> consensus on that), to give users / admins early warning, followed by
> dropping them from trunk for 2.2 once 2.1 is released.
> Personally, we use only Scala 2.11 on JDK8.
> Cody - Scala 2.12 will likely be released before Spark 2.1, maybe even
> later this week: http://scala-lang.org/news/2.12.0-RC2
>
> Ofir Manor
>
> Co-Founder & CTO | Equalum
>
> Mobile: +972-54-7801286 | Email: ofir.ma...@equalum.io
>
> On Tue, Oct 25, 2016 at 7:28 PM, Cody Koeninger 
> wrote:
>
>> I think only supporting 1 version of scala at any given time is not
>> sufficient, 2 probably is ok.
>>
>> I.e. don't drop 2.10 before 2.12 is out + supported
>>
>> On Tue, Oct 25, 2016 at 10:56 AM, Sean Owen  wrote:
>> > The general forces are that new versions of things to support emerge,
>> and
>> > are valuable to support, but have some cost to support in addition to
>> old
>> > versions. And the old versions become less used and therefore less
>> valuable
>> > to support, and at some point it tips to being more cost than value.
>> It's
>> > hard to judge these costs and benefits.
>> >
>> > Scala is perhaps the trickiest one because of the general mutual
>> > incompatibilities across minor versions. The cost of supporting multiple
>> > versions is high, and a third version is about to arrive. That's
>> probably
>> > the most pressing question. It's actually biting with some regularity
>> now,
>> > with compile errors on 2.10.
>> >
>> > (Python I confess I don't have an informed opinion about.)
>> >
>> > Java, Hadoop are not as urgent because they're more
>> backwards-compatible.
>> > Anecdotally, I'd be surprised if anyone today would "upgrade" to Java 7
>> or
>> > an old Hadoop version. And I think that's really the question. Even if
>> one
>> > decided to drop support for all this in 2.1.0, it would not mean people
>> > can't use Spark with these things. It merely means they can't
>> necessarily
>> > use Spark 2.1.x. This is why we have maintenance branches for 1.6.x,
>> 2.0.x.
>> >
>> > Tying Scala 2.11/12 support to Java 8 might make sense.
>> >
>> > In fact, I think that's part of the reason that an update in master,
>> perhaps
>> > 2.1.x, could be overdue, because it actually is just the beginning of
>> the
>> > end of the support burden. If you want to stop dealing with these in ~6
>> > months they need to stop being supported in minor branches by right
>> about
>> > now.
>> >
>> >
>> >
>> >
>> > On Tue, Oct 25, 2016 at 4:47 PM Mark Hamstra 
>> > wrote:
>> >>
>> >> What's changed since the last time we discussed these issues, about 7
>> >> months ago?  Or, another way to formulate the question: What are the
>> >> threshold criteria that we should use to decide when to end Scala 2.10
>> >> and/or Java 7 support?
>> >>
>> >> On Tue, Oct 25, 2016 at 8:36 AM, Sean Owen  wrote:
>> >>>
>> >>> I'd like to gauge where people stand on the issue of dropping support
>> for
>> >>> a few things that were considered for 2.0.
>> >>>
>> >>> First: Scala 2.10. We've seen a number of build breakages this week
>> >>> because the PR builder only tests 2.11. No big deal at this stage,
>> but, it
>> >>> did cause me to wonder whether it's time to plan to drop 2.10 support,
>> >>> especially with 2.12 coming soon.
>> >>>
>> >>> Next, Java 7. It's reasonably old and out of public updates at this
>> >>> stage. It's not that painful to keep supporting, to be honest. It
>> would
>> >>> simplify some bits of code, some scripts, some testing.
>> >>>
>> >>> Hadoop versions: I think the the general argument is that most anyone
>> >>> would be using, at the least, 2.6, and it would simplify some code
>> that has
>> >>> to reflect to use not-even-that-new APIs. It would remove some
>> moderate
>> >>> complexity in the build.
>> >>>
>> >>>
>> >>> "When" is a tricky question. Although it's a little aggressive for
>> minor
>> >>> releases, I think these will all happen before 3.x regardless. 2.1.0
>> is not
>> >>> out of the question, though coming soon. What about ... 2.2.0?
>> >>>
>> >>>
>> >>> 

incorrect message that path appears to be local

2016-10-12 Thread Koert Kuipers
i see this warning when running jobs on cluster:

2016-10-12 14:46:47 WARN spark.SparkContext: Spark is not running in local
mode, therefore the checkpoint directory must not be on the local
filesystem. Directory '/tmp' appears to be on the local filesystem.

however the checkpoint "directory" that it warns about is a hadoop path. i
use an unqualified path, which means a path on the default filesystem by
hadoop convention. when running on the cluster my default filesystem is
hdfs (and it correctly uses hdfs).

how about if we change the method that does this check
(Utils.nonLocalPaths) to be aware of the default filesystem instead of
incorrectly assuming its local if not specified?


Re: Structured Streaming with Kafka sources/sinks

2016-08-27 Thread Koert Kuipers
thats great

is this effort happening anywhere that is publicly visible? github?

On Tue, Aug 16, 2016 at 2:04 AM, Reynold Xin  wrote:

> We (the team at Databricks) are working on one currently.
>
>
> On Mon, Aug 15, 2016 at 7:26 PM, Cody Koeninger 
> wrote:
>
>> https://issues.apache.org/jira/browse/SPARK-15406
>>
>> I'm not working on it (yet?), never got an answer to the question of
>> who was planning to work on it.
>>
>> On Mon, Aug 15, 2016 at 9:12 PM, Guo, Chenzhao 
>> wrote:
>> > Hi all,
>> >
>> >
>> >
>> > I’m trying to write Structured Streaming test code and will deal with
>> Kafka
>> > source. Currently Spark 2.0 doesn’t support Kafka sources/sinks.
>> >
>> >
>> >
>> > I found some Databricks slides saying that Kafka sources/sinks will be
>> > implemented in Spark 2.0, so is there anybody working on this? And when
>> will
>> > it be released?
>> >
>> >
>> >
>> > Thanks,
>> >
>> > Chenzhao Guo
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>
>


Re: PySpark: Make persist() return a context manager

2016-08-05 Thread Koert Kuipers
i think it limits the usability of with statement. and it could be somewhat
confusing because of this, so i would mention it in docs.

i like the idea though.

On Fri, Aug 5, 2016 at 7:04 PM, Nicholas Chammas <nicholas.cham...@gmail.com
> wrote:

> Good point.
>
> Do you think it's sufficient to note this somewhere in the documentation
> (or simply assume that user understanding of transformations vs. actions
> means they know this), or are there other implications that need to be
> considered?
>
> On Fri, Aug 5, 2016 at 6:50 PM Koert Kuipers <ko...@tresata.com> wrote:
>
>> The tricky part is that the action needs to be inside the with block, not
>> just the transformation that uses the persisted data.
>>
>> On Aug 5, 2016 1:44 PM, "Nicholas Chammas" <nicholas.cham...@gmail.com>
>> wrote:
>>
>> Okie doke, I've filed a JIRA for this here: https://issues.apache.
>> org/jira/browse/SPARK-16921
>>
>> On Fri, Aug 5, 2016 at 2:08 AM Reynold Xin <r...@databricks.com> wrote:
>>
>>> Sounds like a great idea!
>>>
>>> On Friday, August 5, 2016, Nicholas Chammas <nicholas.cham...@gmail.com>
>>> wrote:
>>>
>>>> Context managers
>>>> <https://docs.python.org/3/reference/datamodel.html#context-managers>
>>>> are a natural way to capture closely related setup and teardown code in
>>>> Python.
>>>>
>>>> For example, they are commonly used when doing file I/O:
>>>>
>>>> with open('/path/to/file') as f:
>>>> contents = f.read()
>>>> ...
>>>>
>>>> Once the program exits the with block, f is automatically closed.
>>>>
>>>> Does it make sense to apply this pattern to persisting and unpersisting
>>>> DataFrames and RDDs? I feel like there are many cases when you want to
>>>> persist a DataFrame for a specific set of operations and then unpersist it
>>>> immediately afterwards.
>>>>
>>>> For example, take model training. Today, you might do something like
>>>> this:
>>>>
>>>> labeled_data.persist()
>>>> model = pipeline.fit(labeled_data)
>>>> labeled_data.unpersist()
>>>>
>>>> If persist() returned a context manager, you could rewrite this as
>>>> follows:
>>>>
>>>> with labeled_data.persist():
>>>> model = pipeline.fit(labeled_data)
>>>>
>>>> Upon exiting the with block, labeled_data would automatically be
>>>> unpersisted.
>>>>
>>>> This can be done in a backwards-compatible way since persist() would
>>>> still return the parent DataFrame or RDD as it does today, but add two
>>>> methods to the object: __enter__() and __exit__()
>>>>
>>>> Does this make sense? Is it attractive?
>>>>
>>>> Nick
>>>> ​
>>>>
>>>
>>


Re: PySpark: Make persist() return a context manager

2016-08-05 Thread Koert Kuipers
The tricky part is that the action needs to be inside the with block, not
just the transformation that uses the persisted data.

On Aug 5, 2016 1:44 PM, "Nicholas Chammas" 
wrote:

Okie doke, I've filed a JIRA for this here: https://issues.apache.
org/jira/browse/SPARK-16921

On Fri, Aug 5, 2016 at 2:08 AM Reynold Xin  wrote:

> Sounds like a great idea!
>
> On Friday, August 5, 2016, Nicholas Chammas 
> wrote:
>
>> Context managers
>> 
>> are a natural way to capture closely related setup and teardown code in
>> Python.
>>
>> For example, they are commonly used when doing file I/O:
>>
>> with open('/path/to/file') as f:
>> contents = f.read()
>> ...
>>
>> Once the program exits the with block, f is automatically closed.
>>
>> Does it make sense to apply this pattern to persisting and unpersisting
>> DataFrames and RDDs? I feel like there are many cases when you want to
>> persist a DataFrame for a specific set of operations and then unpersist it
>> immediately afterwards.
>>
>> For example, take model training. Today, you might do something like this:
>>
>> labeled_data.persist()
>> model = pipeline.fit(labeled_data)
>> labeled_data.unpersist()
>>
>> If persist() returned a context manager, you could rewrite this as
>> follows:
>>
>> with labeled_data.persist():
>> model = pipeline.fit(labeled_data)
>>
>> Upon exiting the with block, labeled_data would automatically be
>> unpersisted.
>>
>> This can be done in a backwards-compatible way since persist() would
>> still return the parent DataFrame or RDD as it does today, but add two
>> methods to the object: __enter__() and __exit__()
>>
>> Does this make sense? Is it attractive?
>>
>> Nick
>> ​
>>
>


Re: drop java 7 support for spark 2.1.x or spark 2.2.x

2016-07-23 Thread Koert Kuipers
i care about signalling it in advance mostly. and given the performance
differences we do have some interest in pushing towards java 8

On Jul 23, 2016 6:10 PM, "Mark Hamstra" <m...@clearstorydata.com> wrote:

Why the push to remove Java 7 support as soon as possible (which is how I
read your "cluster admins plan to migrate by date X, so Spark should end
Java 7 support then, too")?  First, I don't think we should be removing
Java 7 support until some time after all or nearly all relevant clusters
are actually no longer running on Java 6, and that targeting removal of
support at our best guess about when admins are just *planning* to migrate
isn't a very good idea.  Second, I don't see the significant difficulty or
harm in continuing to support Java 7 for a while longer.

On Sat, Jul 23, 2016 at 2:54 PM, Koert Kuipers <ko...@tresata.com> wrote:

> dropping java 7 support was considered for spark 2.0.x but we decided
> against it.
>
> ideally dropping support for a java version should be communicated far in
> advance to facilitate the transition.
>
> is this the right time to make that decision and start communicating it
> (mailing list, jira, etc.)? perhaps for spark 2.1.x or spark 2.2.x?
>
> my general sense is that most cluster admins have plans to migrate to java
> 8 before end of year. so that could line up nicely with spark 2.2
>
>


drop java 7 support for spark 2.1.x or spark 2.2.x

2016-07-23 Thread Koert Kuipers
dropping java 7 support was considered for spark 2.0.x but we decided
against it.

ideally dropping support for a java version should be communicated far in
advance to facilitate the transition.

is this the right time to make that decision and start communicating it
(mailing list, jira, etc.)? perhaps for spark 2.1.x or spark 2.2.x?

my general sense is that most cluster admins have plans to migrate to java
8 before end of year. so that could line up nicely with spark 2.2


Re: spark git commit: [SPARK-15204][SQL] improve nullability inference for Aggregator

2016-07-05 Thread Koert Kuipers
oh you mean instead of:
assert(ds3.select(NameAgg.toColumn).schema.head.nullable === true)
just do:
assert(ds3.select(NameAgg.toColumn).schema.head.nullable)

i did mostly === true because i also had === false, and i liked the
symmetry, but sure this can be fixed if its not the norm

On Tue, Jul 5, 2016 at 4:07 PM, Jacek Laskowski <ja...@japila.pl> wrote:

> On Mon, Jul 4, 2016 at 6:14 AM,  <wenc...@apache.org> wrote:
> > Repository: spark
> > Updated Branches:
> >   refs/heads/master 88134e736 -> 8cdb81fa8
> >
> >
> > [SPARK-15204][SQL] improve nullability inference for Aggregator
> >
> > ## What changes were proposed in this pull request?
> >
> > TypedAggregateExpression sets nullable based on the schema of the
> outputEncoder
> >
> > ## How was this patch tested?
> >
> > Add test in DatasetAggregatorSuite
> >
> > Author: Koert Kuipers <ko...@tresata.com>
> ...
> > +assert(ds1.select(typed.sum((i: Int) => i)).schema.head.nullable
> === false)
> > +val ds2 = Seq(AggData(1, "a"), AggData(2, "a")).toDS()
> > +assert(ds2.select(SeqAgg.toColumn).schema.head.nullable === true)
> > +val ds3 = sql("SELECT 'Some String' AS b, 1279869254 AS
> a").as[AggData]
> > +assert(ds3.select(NameAgg.toColumn).schema.head.nullable === true)
>
> Why do we assert predicates? If it's true, it's true already (no need
> to compare whether it's true or not). I'd vote to "fix" it.
>
> Jacek
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Dataset and Aggregator API pain points

2016-07-02 Thread Koert Kuipers
after working with the Dataset and Aggregator apis for a few weeks porting
some fairly complex RDD algos (an overall pleasant experience) i wanted to
summarize the pain points and some suggestions for improvement given my
experience. all of these are already mentioned on mailing list or jira, but
i figured its good to put them in one place.
see below.
best,
koert

*) a lot of practical aggregation functions do not have a zero. this can be
dealt with correctly using null or None as the zero for Aggregator. in
algebird for example this is expressed as converting an algebird.Aggregator
(which does not have a zero) into a algebird.MonoidAggregator (which does
have a zero, so similar to spark Aggregator) by lifting it. see:
https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala#L420
something similar should be possible in spark. however currently Aggregator
does not like its zero to be null or an Option, making this approach
difficult. see:
https://www.mail-archive.com/user@spark.apache.org/msg53106.html
https://issues.apache.org/jira/browse/SPARK-15810

*) KeyValueGroupedDataset.reduceGroups needs to be efficient, probably
using an Aggregator (with null or None as the zero) under the hood. the
current implementation does a flatMapGroups which is suboptimal.

*) KeyValueGroupedDataset needs mapValues. without this porting many algos
from RDD to Dataset is difficult and clumsy. see:
https://issues.apache.org/jira/browse/SPARK-15780

*) Aggregators need to also work within DataFrames (so
RelationalGroupedDataset) without having to fall back on using Row objects
as input. otherwise all code ends up being written twice, once for
Aggregator and once for UserDefinedAggregateFunction/UDAF. this doesn't
make sense to me. my attempt at addressing this:
https://issues.apache.org/jira/browse/SPARK-15769
https://github.com/apache/spark/pull/13512

best,
koert


Re: branch-2.0 is now 2.0.1-SNAPSHOT?

2016-07-02 Thread Koert Kuipers
that helps, now i know i simply need to look at master

On Sat, Jul 2, 2016 at 1:37 PM, Sean Owen <so...@cloudera.com> wrote:

> So, on the one hand I think branch-2.0 should really still be on
> 2.0.0-SNAPSHOT but is on 2.0.1-SNAPSHOT, and while master should
> technically be on 2.1.0-SNAPSHOT but we can't quite because of MiMa
> right now, I do see that both snapshots are being produced still:
>
>
> https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-core_2.11/
>
> 2.0.0-SNAPSHOT is actually from master, kinda confusingly. Not sure if
> that helps.
>
> On Sat, Jul 2, 2016 at 5:25 PM, Koert Kuipers <ko...@tresata.com> wrote:
> > You do, snapshots for spark 2.0.0-SNAPSHOT are updated daily on the
> apache
> > snapshot repo. I use them in our own unit tests to find regressions etc.
> in
> > spark and report them back
> >
> > On Jul 2, 2016 3:35 AM, "Sean Owen" <so...@cloudera.com> wrote:
> >>
> >> Yeah, interesting question about whether it should be 2.0.1-SNAPSHOT
> >> at this stage because 2.0.0 is not yet released. But I'm not sure we
> >> publish snapshots anyway?
> >>
> >> On Sat, Jul 2, 2016 at 5:41 AM, Koert Kuipers <ko...@tresata.com>
> wrote:
> >> > is that correct?
> >> > where do i get the latest 2.0.0-SNAPSHOT?
> >> > thanks,
> >> > koert
>


Re: branch-2.0 is now 2.0.1-SNAPSHOT?

2016-07-02 Thread Koert Kuipers
You do, snapshots for spark 2.0.0-SNAPSHOT are updated daily on the apache
snapshot repo. I use them in our own unit tests to find regressions etc. in
spark and report them back
On Jul 2, 2016 3:35 AM, "Sean Owen" <so...@cloudera.com> wrote:

> Yeah, interesting question about whether it should be 2.0.1-SNAPSHOT
> at this stage because 2.0.0 is not yet released. But I'm not sure we
> publish snapshots anyway?
>
> On Sat, Jul 2, 2016 at 5:41 AM, Koert Kuipers <ko...@tresata.com> wrote:
> > is that correct?
> > where do i get the latest 2.0.0-SNAPSHOT?
> > thanks,
> > koert
>


  1   2   3   >