Re: Pandas UDF cogroup.applyInPandas with multiple dataframes

2023-02-07 Thread Li Jin
I am not a Spark committer and haven't been working on Spark for a while.
However, I was heavily involved in the original cogroup work, we use the
cogroup functionality pretty heavily, and I want to give my two cents
here.

I think this is a nice improvement and I hope someone from the PySpark side
can take a look at this.

On Mon, Feb 6, 2023 at 5:29 AM Santosh Pingale
 wrote:

> Created  a PR: https://github.com/apache/spark/pull/39902
>
>
> On 24 Jan 2023, at 15:04, Santosh Pingale 
> wrote:
>
> Hey all
>
> I have an interesting problem at hand. We have cases where we want to pass
> multiple (20 to 30) data frames to the cogroup.applyInPandas function.
>
> RDD currently supports cogroup with up to 4 dataframes
> (ZippedPartitionsRDD4), whereas cogroup with pandas can handle only 2
> dataframes (with ZippedPartitionsRDD2). In our use case, we do not have
> much control over how many data frames we may need in the
> cogroup.applyInPandas function.
>
> To achieve this, we can:
> (a) Implement ZippedPartitionsRDD5,
> ZippedPartitionsRDD..ZippedPartitionsRDD30..ZippedPartitionsRDD50 with
> respective iterators, serializers and so on. This ensures we keep type
> safety intact but a lot more boilerplate code has to be written to achieve
> this.
> (b) Do not use cogroup.applyInPandas, rather use RDD.keyBy.cogroup and
> then getItem in a nested fashion. Then convert data to pandas df in the
> python function. This looks like a good workaround, but mistakes are very
> easy to make. We also lose type safety here from the user's point of
> view.
> (c) Implement ZippedPartitionsRDDN and NaryLike with childrenNodes type
> set to Seq[T] which allows for arbitrary number of children to be set. Here
> we have very little boilerplate but we sacrifice type safety.
> (d) ... some new suggestions... ?
>
> I have done preliminary work on option (c). It works like a charm, but
> before I proceed: is my concern about sacrificed type safety overblown, and
> do we have an approach (d)?
> (a) is too much of an investment to be useful.
> (b) is an okay enough workaround, but it is not very efficient.
>
>
>
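For reference, here is a rough sketch of what workaround (b) above can look like on the Python side, using PySpark's groupWith (the multi-RDD alias for RDD cogroup) rather than manually nested cogroup/getItem calls. The key column name, the three-DataFrame setup, and the per-key function body are illustrative assumptions, not code from the thread:

```python
import pandas as pd

# Assumes DataFrames df1, df2, df3 that share a key column named "key".
def key_by(df):
    return df.rdd.keyBy(lambda row: row["key"])

# groupWith cogroups an arbitrary number of keyed RDDs.
cogrouped = key_by(df1).groupWith(key_by(df2), key_by(df3))

def apply_fn(item):
    key, (it1, it2, it3) = item
    pdf1 = pd.DataFrame([r.asDict() for r in it1])
    pdf2 = pd.DataFrame([r.asDict() for r in it2])
    pdf3 = pd.DataFrame([r.asDict() for r in it3])
    # ... per-key pandas logic over pdf1/pdf2/pdf3 goes here ...
    return []  # rows (e.g. dicts) of the per-key result

result_rdd = cogrouped.flatMap(apply_fn)
```

This keeps everything in Python but, as noted above, gives up type safety and pays a Row-to-pandas conversion cost per key.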


Re: How does PySpark send "import" to the worker when executing Python UDFs?

2022-07-19 Thread Li Jin
Aha I see. Thanks Hyukjin!

On Tue, Jul 19, 2022 at 9:09 PM Hyukjin Kwon  wrote:

> This is done by cloudpickle. It pickles the global variables referenced
> within the func together with it, and registers them to the globally
> imported modules.
>
> On Wed, 20 Jul 2022 at 00:55, Li Jin  wrote:
>
>> Hi,
>>
>> I have a question about how "imports" get sent to the Python worker.
>>
>> For example, I have
>>
>> def foo(x):
>>     return np.abs(x)
>>
>> If I run this code directly, it obviously fails (because np is undefined
>> on the driver process):
>>
>> sc.parallelize([1, 2, 3]).map(foo).collect()
>>
>> However, if I add the import statement "import numpy as np" on the
>> driver, it works. So somehow the driver is sending those "imports" to the
>> worker when executing foo on the worker, but I cannot seem to find the code
>> that does this - can someone please send me a pointer?
>>
>> Thanks,
>> Li
>>
>
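For readers curious about the mechanics, here is a minimal sketch (not Spark's actual code path) of how cloudpickle captures a module referenced from a function's globals; it assumes cloudpickle and numpy are installed:

```python
import cloudpickle
import numpy as np

def foo(x):
    return np.abs(x)

# cloudpickle serializes foo together with references to the globals it uses,
# including the np module (pickled by reference and re-imported on load).
payload = cloudpickle.dumps(foo)

# In a fresh process (e.g. a Python worker), loading the payload re-imports
# numpy and restores a working function.
restored = cloudpickle.loads(payload)
print(restored(-3))  # 3
```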


How does PySpark send "import" to the worker when executing Python UDFs?

2022-07-19 Thread Li Jin
Hi,

I have a question about how "imports" get sent to the Python worker.

For example, I have

def foo(x):
    return np.abs(x)

If I run this code directly, it obviously fails (because np is undefined
on the driver process):

sc.parallelize([1, 2, 3]).map(foo).collect()

However, if I add the import statement "import numpy as np" on the driver,
it works. So somehow the driver is sending those "imports" to the worker when
executing foo on the worker, but I cannot seem to find the code that does
this - can someone please send me a pointer?

Thanks,
Li


Checkpoint and recomputation

2020-01-03 Thread Li Jin
Hi dear devs,

I recently came across the checkpoint functionality in Spark and found (a
little surprisingly) that checkpoint causes the DataFrame to be computed
twice unless cache is called before checkpoint.

My guess is that this is probably hard to fix, and/or maybe the checkpoint
feature is not used very frequently?
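For anyone hitting the same surprise, a minimal sketch of the cache-before-checkpoint workaround described above; the checkpoint directory and the toy query are illustrative assumptions:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  # illustrative path

df = spark.range(1000000).selectExpr("id", "id * 2 AS doubled")

# Without cache(), checkpoint() computes the lineage once for the eager
# checkpoint action and again when writing the checkpoint data; with cache()
# the second pass reads the cached blocks instead.
checkpointed = df.cache().checkpoint()
checkpointed.count()
```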


Re: Revisiting Python / pandas UDF (new proposal)

2020-01-02 Thread Li Jin
I am going to review this carefully today. Thanks for the work!

Li

On Wed, Jan 1, 2020 at 10:34 PM Hyukjin Kwon  wrote:

> Thanks for comments Maciej - I am addressing them.
> adding Li Jin too.
>
> I plan to proceed with this late this week or early next week to make it on
> time before code freeze.
> I am going to pretty actively respond so please give feedback if there's
> any :-).
>
>
>
> On Mon, Dec 30, 2019 at 6:45 PM, Hyukjin Kwon wrote:
>
>> Hi all,
>>
>> I happened to come up with another idea about the pandas redesign.
>> Thanks Reynold, Bryan, Xiangrui, Takuya and Tim for offline discussions
>> and
>> helping me to write this proposal.
>>
>> Please take a look and let me know what you guys think.
>>
>> -
>> https://docs.google.com/document/d/1-kV0FS_LF2zvaRh_GhkV32Uqksm_Sq8SvnBBmRyxm30/edit?usp=sharing
>> - https://issues.apache.org/jira/browse/SPARK-28264
>>
>> I know it's the holiday season, but please find some time to take a look so
>> we can make it on time before code freeze (31st Jan).
>>
>>


Debug "Java gateway process exited before sending the driver its port number"

2019-11-27 Thread Li Jin
Dear Spark devs,

I am debugging a weird "Java gateway process exited before sending the
driver its port number" error when creating a SparkSession with PySpark. I am
running the following simple code with pytest:

"

from pyspark.sql import SparkSession


def test_spark():

spark = SparkSession.builder.getOrCreate()

spark.range(10).show()
"

(I've set up a conda environment and conda-installed pyspark version 2.4.4.)

I am wondering if there is a way for me to find the stdout/stderr/log for
the Java driver process to see what's going on?

Help much appreciated!
Thanks,
Li


Re: Spark 3.0 preview release feature list and major changes

2019-10-08 Thread Li Jin
Thanks for the summary!

I have a question that is semi-related - What's the process to propose a
feature to be included in the final Spark 3.0 release?

In particular, I am interested in
https://issues.apache.org/jira/browse/SPARK-28006.  I am happy to do the
work, so I want to make sure I don't miss the "cut" date.

On Tue, Oct 8, 2019 at 4:53 PM Xingbo Jiang  wrote:

> Hi all,
>
> Thanks for all the feedbacks, here is the updated feature list:
>
> SPARK-11215  Multiple
> columns support added to various Transformers: StringIndexer
>
> SPARK-11150  Implement
> Dynamic Partition Pruning
>
> SPARK-13677  Support
> Tree-Based Feature Transformation
>
> SPARK-16692  Add
> MultilabelClassificationEvaluator
>
> SPARK-19591  Add
> sample weights to decision trees
>
> SPARK-19712  Pushing
> Left Semi and Left Anti joins through Project, Aggregate, Window, Union etc.
>
> SPARK-19827  R API for
> Power Iteration Clustering
>
> SPARK-20286  Improve
> logic for timing out executors in dynamic allocation
>
> SPARK-20636  Eliminate
> unnecessary shuffle with adjacent Window expressions
>
> SPARK-22148  Acquire
> new executors to avoid hang because of blacklisting
>
> SPARK-22796  Multiple
> columns support added to various Transformers: PySpark QuantileDiscretizer
>
> SPARK-23128  A new
> approach to do adaptive execution in Spark SQL
>
> SPARK-23155  Apply
> custom log URL pattern for executor log URLs in SHS
>
> SPARK-23539  Add
> support for Kafka headers
>
> SPARK-23674  Add Spark
> ML Listener for Tracking ML Pipeline Status
>
> SPARK-23710  Upgrade
> the built-in Hive to 2.3.5 for hadoop-3.2
>
> SPARK-24333  Add fit
> with validation set to Gradient Boosted Trees: Python API
>
> SPARK-24417  Build and
> Run Spark on JDK11
>
> SPARK-24615 
> Accelerator-aware task scheduling for Spark
>
> SPARK-24920  Allow
> sharing Netty's memory pool allocators
>
> SPARK-25250  Fix race
> condition with tasks running when new attempt for same stage is created
> leads to other task in the next attempt running on the same partition id
> retry multiple times
>
> SPARK-25341  Support
> rolling back a shuffle map stage and re-generate the shuffle files
>
> SPARK-25348  Data
> source for binary files
>
> SPARK-25501  Add kafka
> delegation token support
>
> SPARK-25603 
> Generalize Nested Column Pruning
>
> SPARK-26132  Remove
> support for Scala 2.11 in Spark 3.0.0
>
> SPARK-26215  define
> reserved keywords after SQL standard
>
> SPARK-26412  Allow
> Pandas UDF to take an iterator of pd.DataFrames
>
> SPARK-26759  Arrow
> optimization in SparkR's interoperability
>
> SPARK-26785  data
> source v2 API refactor: streaming write
>
> SPARK-26848  Introduce
> new option to Kafka source: offset by timestamp (starting/ending)
>
> SPARK-26956  remove
> streaming output mode from data source v2 APIs
>
> SPARK-27064  create
> StreamingWrite at the beginning of streaming execution
>
> SPARK-27119  Do not
> infer schema when reading Hive serde table with native data source
>
> SPARK-27225  Implement
> join strategy hints
>
> SPARK-27240  Use
> pandas DataFrame for struct type argument in Scalar Pandas UDF
>
> SPARK-27338 

Re: Welcoming some new committers and PMC members

2019-09-17 Thread Li Jin
Congrats to all!

On Tue, Sep 17, 2019 at 6:51 PM Bryan Cutler  wrote:

> Congratulations, all well deserved!
>
> On Thu, Sep 12, 2019, 3:32 AM Jacek Laskowski  wrote:
>
>> Hi,
>>
>> What great news! Congrats to all awarded and to the community for voting
>> them in!
>>
>> p.s. I think it should go to the user mailing list too.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> The Internals of Spark SQL https://bit.ly/spark-sql-internals
>> The Internals of Spark Structured Streaming
>> https://bit.ly/spark-structured-streaming
>> The Internals of Apache Kafka https://bit.ly/apache-kafka-internals
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>>
>> On Tue, Sep 10, 2019 at 2:32 AM Matei Zaharia 
>> wrote:
>>
>>> Hi all,
>>>
>>> The Spark PMC recently voted to add several new committers and one PMC
>>> member. Join me in welcoming them to their new roles!
>>>
>>> New PMC member: Dongjoon Hyun
>>>
>>> New committers: Ryan Blue, Liang-Chi Hsieh, Gengliang Wang, Yuming Wang,
>>> Weichen Xu, Ruifeng Zheng
>>>
>>> The new committers cover lots of important areas including ML, SQL, and
>>> data sources, so it’s great to have them here. All the best,
>>>
>>> Matei and the Spark PMC
>>>
>>>
>>> -
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>
>>>


Re: Thoughts on dataframe cogroup?

2019-04-17 Thread Li Jin
I have left some comments. This looks like a good proposal to me.

As a heavy PySpark user, this is a pattern that we see over and over again,
and I think it could be of pretty high value to other PySpark users as well.
The fact that Chris and I came to the same idea sort of verifies my intuition.
Also, this isn't really something new; RDD has had a cogroup function from
very early on.

With that being said, I'd like to call out again for the community's feedback
on the proposal.

On Mon, Apr 15, 2019 at 4:57 PM Chris Martin  wrote:

> Ah sorry- I've updated the link which should give you access.  Can you try
> again now?
>
> thanks,
>
> Chris
>
>
>
> On Mon, Apr 15, 2019 at 9:49 PM Li Jin  wrote:
>
>> Hi Chris,
>>
>> Thanks! The permissions on the Google doc are maybe not set up properly. I
>> cannot view the doc by default.
>>
>> Li
>>
>> On Mon, Apr 15, 2019 at 3:58 PM Chris Martin 
>> wrote:
>>
>>> I've updated the jira so that the main body is now inside a google doc.
>>> Anyone should be able to comment- if you want/need write access please drop
>>> me a mail and I can add you.
>>>
>>> Ryan- regarding your specific point regarding why I'm not proposing to
>>> add this to the Scala API, I think the main point is that Scala users can
>>> already use Cogroup for Datasets.  For Scala this is probably a better
>>> solution as (as far as I know) there is no Scala DataFrame library that
>>> could be used in place of Pandas for manipulating  local DataFrames. As a
>>> result you'd probably be left with dealing with Iterators of Row objects,
>>> which almost certainly isn't what you'd want. This is similar to the
>>> existing grouped map Pandas Udfs for which there is no equivalent Scala Api.
>>>
>>> I do think there might be a place for allowing a (Scala) DataSet Cogroup
>>> to take some sort of grouping expression as the grouping key  (this would
>>> mean that you wouldn't have to marshal the key into a JVM object and could
>>> possible lend itself to some catalyst optimisations) but I don't think that
>>> this should be done as part of this SPIP.
>>>
>>> thanks,
>>>
>>> Chris
>>>
>>> On Mon, Apr 15, 2019 at 6:27 PM Ryan Blue  wrote:
>>>
>>>> I agree, it would be great to have a document to comment on.
>>>>
>>>> The main thing that stands out right now is that this is only for
>>>> PySpark and states that it will not be added to the Scala API. Why not make
>>>> this available since most of the work would be done?
>>>>
>>>> On Mon, Apr 15, 2019 at 7:50 AM Li Jin  wrote:
>>>>
>>>>> Thank you Chris, this looks great.
>>>>>
>>>>> Would you mind share a google doc version of the proposal? I believe
>>>>> that's the preferred way of discussing proposals (Other people please
>>>>> correct me if I am wrong).
>>>>>
>>>>> Li
>>>>>
>>>>> On Mon, Apr 15, 2019 at 8:20 AM  wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>  As promised I’ve raised SPARK-27463 for this.
>>>>>>
>>>>>> All feedback welcome!
>>>>>>
>>>>>> Chris
>>>>>>
>>>>>> On 9 Apr 2019, at 13:22, Chris Martin  wrote:
>>>>>>
>>>>>> Thanks Bryan and Li, that is much appreciated.  Hopefully should have
>>>>>> the SPIP ready in the next couple of days.
>>>>>>
>>>>>> thanks,
>>>>>>
>>>>>> Chris
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler 
>>>>>> wrote:
>>>>>>
>>>>>>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't
>>>>>>> be too difficult to extend the current functionality to transfer
>>>>>>> multiple DataFrames.  For the SPIP, I would keep it more high-level and
>>>>>>> I don't think it's necessary to include details of the Python worker,
>>>>>>> we can hash that out after the SPIP is approved.
>>>>>>>
>>>>>>> Bryan
>>>>>>>
>>>>>>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin 
>>>>>>> wrote:
>>>>>>>

Re: Thoughts on dataframe cogroup?

2019-04-15 Thread Li Jin
Hi Chris,

Thanks! The permissions on the Google doc are maybe not set up properly. I
cannot view the doc by default.

Li

On Mon, Apr 15, 2019 at 3:58 PM Chris Martin  wrote:

> I've updated the jira so that the main body is now inside a google doc.
> Anyone should be able to comment- if you want/need write access please drop
> me a mail and I can add you.
>
> Ryan- regarding your specific point regarding why I'm not proposing to add
> this to the Scala API, I think the main point is that Scala users can
> already use Cogroup for Datasets.  For Scala this is probably a better
> solution as (as far as I know) there is no Scala DataFrame library that
> could be used in place of Pandas for manipulating  local DataFrames. As a
> result you'd probably be left with dealing with Iterators of Row objects,
> which almost certainly isn't what you'd want. This is similar to the
> existing grouped map Pandas Udfs for which there is no equivalent Scala Api.
>
> I do think there might be a place for allowing a (Scala) DataSet Cogroup
> to take some sort of grouping expression as the grouping key  (this would
> mean that you wouldn't have to marshal the key into a JVM object and could
> possible lend itself to some catalyst optimisations) but I don't think that
> this should be done as part of this SPIP.
>
> thanks,
>
> Chris
>
> On Mon, Apr 15, 2019 at 6:27 PM Ryan Blue  wrote:
>
>> I agree, it would be great to have a document to comment on.
>>
>> The main thing that stands out right now is that this is only for PySpark
>> and states that it will not be added to the Scala API. Why not make this
>> available since most of the work would be done?
>>
>> On Mon, Apr 15, 2019 at 7:50 AM Li Jin  wrote:
>>
>>> Thank you Chris, this looks great.
>>>
>>> Would you mind share a google doc version of the proposal? I believe
>>> that's the preferred way of discussing proposals (Other people please
>>> correct me if I am wrong).
>>>
>>> Li
>>>
>>> On Mon, Apr 15, 2019 at 8:20 AM  wrote:
>>>
>>>> Hi,
>>>>
>>>>  As promised I’ve raised SPARK-27463 for this.
>>>>
>>>> All feedback welcome!
>>>>
>>>> Chris
>>>>
>>>> On 9 Apr 2019, at 13:22, Chris Martin  wrote:
>>>>
>>>> Thanks Bryan and Li, that is much appreciated.  Hopefully should have
>>>> the SPIP ready in the next couple of days.
>>>>
>>>> thanks,
>>>>
>>>> Chris
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler  wrote:
>>>>
>>>>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't be
>>>>> too difficult to extend the current functionality to transfer multiple
>>>>> DataFrames.  For the SPIP, I would keep it more high-level and I don't
>>>>> think it's necessary to include details of the Python worker, we can hash
>>>>> that out after the SPIP is approved.
>>>>>
>>>>> Bryan
>>>>>
>>>>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin  wrote:
>>>>>
>>>>>> Thanks Chris, look forward to it.
>>>>>>
>>>>>> I think sending multiple dataframes to the python worker requires
>>>>>> some changes but shouldn't be too difficult. We can probably sth like:
>>>>>>
>>>>>>
>>>>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>>>>
>>>>>> In:
>>>>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>>>>
>>>>>> And have ArrowPythonRunner take multiple input iterator/schema.
>>>>>>
>>>>>> Li
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 8, 2019 at 5:55 AM  wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Just to say, I really do think this is useful and am currently
>>>>>>> working on a SPIP to formally propose this. One concern I do have, 
>>>>>>> however,
>>>>>>> is that the current arrow serialization code is tied to passing through 
>>>>>>> a
>>>>>>> single dataframe as the udf parameter and so any modification to allow

Re: Thoughts on dataframe cogroup?

2019-04-15 Thread Li Jin
Thank you Chris, this looks great.

Would you mind sharing a Google doc version of the proposal? I believe that's
the preferred way of discussing proposals (other people please correct me
if I am wrong).

Li

On Mon, Apr 15, 2019 at 8:20 AM  wrote:

> Hi,
>
>  As promised I’ve raised SPARK-27463 for this.
>
> All feedback welcome!
>
> Chris
>
> On 9 Apr 2019, at 13:22, Chris Martin  wrote:
>
> Thanks Bryan and Li, that is much appreciated.  Hopefully should have the
> SPIP ready in the next couple of days.
>
> thanks,
>
> Chris
>
>
>
>
> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler  wrote:
>
>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't be too
>> difficult to extend the current functionality to transfer multiple
>> DataFrames.  For the SPIP, I would keep it more high-level and I don't
>> think it's necessary to include details of the Python worker, we can hash
>> that out after the SPIP is approved.
>>
>> Bryan
>>
>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin  wrote:
>>
>>> Thanks Chris, look forward to it.
>>>
>>> I think sending multiple dataframes to the python worker requires some
>>> changes but shouldn't be too difficult. We can probably do something like:
>>>
>>>
>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>
>>> In:
>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>
>>> And have ArrowPythonRunner take multiple input iterator/schema.
>>>
>>> Li
>>>
>>>
>>> On Mon, Apr 8, 2019 at 5:55 AM  wrote:
>>>
>>>> Hi,
>>>>
>>>> Just to say, I really do think this is useful and am currently working
>>>> on a SPIP to formally propose this. One concern I do have, however, is that
>>>> the current arrow serialization code is tied to passing through a single
>>>> dataframe as the udf parameter and so any modification to allow multiple
>>>> dataframes may not be straightforward.  If anyone has any ideas as to how
>>>> this might be achieved in an elegant manner I’d be happy to hear them!
>>>>
>>>> Thanks,
>>>>
>>>> Chris
>>>>
>>>> On 26 Feb 2019, at 14:55, Li Jin  wrote:
>>>>
>>>> Thank you both for the reply. Chris and I have very similar use cases
>>>> for cogroup.
>>>>
>>>> One of the goals for groupby apply + pandas UDF was to avoid things
>>>> like collect list and reshaping data between Spark and Pandas. Cogroup
>>>> feels very similar and can be an extension to the groupby apply + pandas
>>>> UDF functionality.
>>>>
>>>> I wonder if any PMC/committers have any thoughts/opinions on this?
>>>>
>>>> On Tue, Feb 26, 2019 at 2:17 AM  wrote:
>>>>
>>>>> Just to add to this I’ve also implemented my own cogroup previously
>>>>> and would welcome a cogroup for datafame.
>>>>>
>>>>> My specific use case was that I had a large amount of time series
>>>>> data. Spark has very limited support for time series (specifically as-of
>>>>> joins), but pandas has good support.
>>>>>
>>>>> My solution was to take my two dataframes and perform a group by and
>>>>> collect list on each. The resulting arrays could be passed into a udf 
>>>>> where
>>>>> they could be marshaled into a couple of pandas dataframes and processed
>>>>> using pandas excellent time series functionality.
>>>>>
>>>>> If cogroup was available natively on dataframes this would have been a
>>>>> bit nicer. The ideal would have been some pandas udf version of cogroup
>>>>> that gave me a pandas dataframe for each spark dataframe in the cogroup!
>>>>>
>>>>> Chris
>>>>>
>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy 
>>>>> wrote:
>>>>>
>>>>> For info, in our team have defined our own cogroup on dataframe in the
>>>>> past on different projects using different methods (rdd[row] based or 
>>>>> union
>>>>> all collect list based).
>>>>>
>>>>> I might be biased, but find the approach very useful in project to
>>>>> simplify and speed up transformations, and re

Re: Thoughts on dataframe cogroup?

2019-04-08 Thread Li Jin
Thanks Chris, look forward to it.

I think sending multiple dataframes to the python worker requires some
changes but shouldn't be too difficult. We can probably do something like:

[numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]

In:
https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70

And have ArrowPythonRunner take multiple input iterator/schema.

Li


On Mon, Apr 8, 2019 at 5:55 AM  wrote:

> Hi,
>
> Just to say, I really do think this is useful and am currently working on
> a SPIP to formally propose this. One concern I do have, however, is that
> the current arrow serialization code is tied to passing through a single
> dataframe as the udf parameter and so any modification to allow multiple
> dataframes may not be straightforward.  If anyone has any ideas as to how
> this might be achieved in an elegant manner I’d be happy to hear them!
>
> Thanks,
>
> Chris
>
> On 26 Feb 2019, at 14:55, Li Jin  wrote:
>
> Thank you both for the reply. Chris and I have very similar use cases for
> cogroup.
>
> One of the goals for groupby apply + pandas UDF was to avoid things like
> collect list and reshaping data between Spark and Pandas. Cogroup feels
> very similar and can be an extension to the groupby apply + pandas UDF
> functionality.
>
> I wonder if any PMC/committers have any thoughts/opinions on this?
>
> On Tue, Feb 26, 2019 at 2:17 AM  wrote:
>
>> Just to add to this I’ve also implemented my own cogroup previously and
>> would welcome a cogroup for dataframes.
>>
>> My specific use case was that I had a large amount of time series data.
>> Spark has very limited support for time series (specifically as-of joins),
>> but pandas has good support.
>>
>> My solution was to take my two dataframes and perform a group by and
>> collect list on each. The resulting arrays could be passed into a udf where
>> they could be marshaled into a couple of pandas dataframes and processed
>> using pandas excellent time series functionality.
>>
>> If cogroup was available natively on dataframes this would have been a
>> bit nicer. The ideal would have been some pandas udf version of cogroup
>> that gave me a pandas dataframe for each spark dataframe in the cogroup!
>>
>> Chris
>>
>> On 26 Feb 2019, at 00:38, Jonathan Winandy 
>> wrote:
>>
>> For info, in our team have defined our own cogroup on dataframe in the
>> past on different projects using different methods (rdd[row] based or union
>> all collect list based).
>>
>> I might be biased, but find the approach very useful in project to
>> simplify and speed up transformations, and remove a lot of intermediate
>> stages (distinct + join => just cogroup).
>>
>> Plus spark 2.4 introduced a lot of new operator for nested data. That's a
>> win!
>>
>>
>> On Thu, 21 Feb 2019, 17:38 Li Jin,  wrote:
>>
>>> I am wondering do other people have opinion/use case on cogroup?
>>>
>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin  wrote:
>>>
>>>> Alessandro,
>>>>
>>>> Thanks for the reply. I assume by "equi-join", you mean "equality  full
>>>> outer join" .
>>>>
>>>> Two issues I see with equity outer join is:
>>>> (1) equity outer join will give n * m rows for each key (n and m being
>>>> the corresponding number of rows in df1 and df2 for each key)
>>>> (2) User needs to do some extra processing to transform n * m back to
>>>> the desired shape (two sub dataframes with n and m rows)
>>>>
>>>> I think full outer join is an inefficient way to implement cogroup. If
>>>> the end goal is to have two separate dataframes for each key, why joining
>>>> them first and then unjoin them?
>>>>
>>>>
>>>>
>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>>> alessandro.solima...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>> I fail to see how an equi-join on the key columns is different than
>>>>> the cogroup you propose.
>>>>>
>>>>> I think the accepted answer can shed some light:
>>>>>
>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>
>>>>> Now you apply an udf on each iterable, one per key value (obtained
>>>>> with cogroup).
>>>>>
>>>

Re: [pyspark] dataframe map_partition

2019-03-08 Thread Li Jin
Hi,

Pandas UDFs support struct type as input. However, note that it will be
turned into a Python dict because pandas itself does not have a native struct
type.
On Fri, Mar 8, 2019 at 2:55 PM peng yu  wrote:

> Yeah, that seems most likely i have wanted, does the scalar Pandas UDF
> support input is a StructType too ?
>
> On Fri, Mar 8, 2019 at 2:25 PM Bryan Cutler  wrote:
>
>> Hi Peng,
>>
>> I just added support for scalar Pandas UDF to return a StructType as a
>> Pandas DataFrame in https://issues.apache.org/jira/browse/SPARK-23836.
>> Is that the functionality you are looking for?
>>
>> Bryan
>>
>> On Thu, Mar 7, 2019 at 1:13 PM peng yu  wrote:
>>
>>> right now, i'm using the columns-at-a-time mapping
>>> https://github.com/yupbank/tf-spark-serving/blob/master/tss/utils.py#L129
>>>
>>>
>>>
>>>
>>> On Thu, Mar 7, 2019 at 4:00 PM Sean Owen  wrote:
>>>
 Maybe, it depends on what you're doing. It sounds like you are trying
 to do row-at-a-time mapping, even on a pandas DataFrame. Is what
 you're doing vectorized? may not help much.
 Just make the pandas Series into a DataFrame if you want? and a single
 col back to Series?

 On Thu, Mar 7, 2019 at 2:45 PM peng yu  wrote:
 >
 > pandas/arrow is for the memory efficiency, and mapPartitions is only
 available to rdds, for sure i can do everything in rdd.
 >
 > But i thought that's the whole point of having pandas_udf, so my
 program run faster and consumes less memory ?
 >
 > On Thu, Mar 7, 2019 at 3:40 PM Sean Owen  wrote:
 >>
 >> Are you just applying a function to every row in the DataFrame? you
 >> don't need pandas at all. Just get the RDD of Row from it and map a
 >> UDF that makes another Row, and go back to DataFrame. Or make a UDF
 >> that operates on all columns and returns a new value. mapPartitions
 is
 >> also available if you want to transform an iterator of Row to another
 >> iterator of Row.
 >>
 >> On Thu, Mar 7, 2019 at 2:33 PM peng yu  wrote:
 >> >
 >> > it is very similar to SCALAR, but for SCALAR the output can't be
 struct/row and the input has to be pd.Series, which doesn't support a row.
 >> >
 >> > I'm doing tensorflow batch inference in spark,
 https://github.com/yupbank/tf-spark-serving/blob/master/tss/serving.py#L108
 >> >
 >> > Which i have to do the groupBy in order to use the apply function,
 i'm wondering why not just enable apply to df ?
 >> >
 >> > On Thu, Mar 7, 2019 at 3:15 PM Sean Owen  wrote:
 >> >>
 >> >> Are you looking for SCALAR? that lets you map one row to one row,
 but
 >> >> do it more efficiently in batch. What are you trying to do?
 >> >>
 >> >> On Thu, Mar 7, 2019 at 2:03 PM peng yu  wrote:
 >> >> >
 >> >> > I'm looking for a mapPartition(pandas_udf) for  a
 pyspark.Dataframe.
 >> >> >
 >> >> > ```
 >> >> > @pandas_udf(df.schema, PandasUDFType.MAP)
 >> >> > def do_nothing(pandas_df):
 >> >> > return pandas_df
 >> >> >
 >> >> >
 >> >> > new_df = df.mapPartition(do_nothing)
 >> >> > ```
 >> >> > pandas_udf only supports SCALAR or GROUPED_MAP.  Why not support
 just Map?
 >> >> >
 >> >> > On Thu, Mar 7, 2019 at 2:57 PM Sean Owen 
 wrote:
 >> >> >>
 >> >> >> Are you looking for @pandas_udf in Python? Or just
 mapPartition? Those exist already
 >> >> >>
 >> >> >> On Thu, Mar 7, 2019, 1:43 PM peng yu 
 wrote:
 >> >> >>>
 >> >> >>> There is a nice map_partition function in R `dapply`.  so
 that user can pass a row to udf.
 >> >> >>>
 >> >> >>> I'm wondering why we don't have that in python?
 >> >> >>>
 >> >> >>> I'm trying to have a map_partition function with pandas_udf
 supported
 >> >> >>>
 >> >> >>> thanks!

>>>
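For reference, a rough sketch of the RDD mapPartitions route suggested in the quoted discussion, i.e. converting each partition of Row objects to pandas by hand; the input DataFrame, column names, and per-partition logic are illustrative assumptions:

```python
import pandas as pd
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(100).selectExpr("id", "id * 2 AS value")  # illustrative input

def transform_partition(rows):
    # Marshal the iterator of Row objects into a pandas DataFrame...
    pdf = pd.DataFrame([r.asDict() for r in rows])
    if pdf.empty:
        return []
    pdf["value_plus_one"] = pdf["value"] + 1  # illustrative per-partition logic
    # ...and hand rows back to Spark.
    return [Row(**d) for d in pdf.to_dict("records")]

new_df = spark.createDataFrame(df.rdd.mapPartitions(transform_partition))
new_df.show()
```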


Re: Thoughts on dataframe cogroup?

2019-02-26 Thread Li Jin
Thank you both for the reply. Chris and I have very similar use cases for
cogroup.

One of the goals for groupby apply + pandas UDF was to avoid things like
collect list and reshaping data between Spark and Pandas. Cogroup feels
very similar and can be an extension to the groupby apply + pandas UDF
functionality.

I wonder if any PMC/committers have any thoughts/opinions on this?

On Tue, Feb 26, 2019 at 2:17 AM  wrote:

> Just to add to this I’ve also implemented my own cogroup previously and
> would welcome a cogroup for dataframes.
>
> My specific use case was that I had a large amount of time series data.
> Spark has very limited support for time series (specifically as-of joins),
> but pandas has good support.
>
> My solution was to take my two dataframes and perform a group by and
> collect list on each. The resulting arrays could be passed into a udf where
> they could be marshaled into a couple of pandas dataframes and processed
> using pandas excellent time series functionality.
>
> If cogroup was available natively on dataframes this would have been a bit
> nicer. The ideal would have been some pandas udf version of cogroup that
> gave me a pandas dataframe for each spark dataframe in the cogroup!
>
> Chris
>
> On 26 Feb 2019, at 00:38, Jonathan Winandy 
> wrote:
>
> For info, in our team have defined our own cogroup on dataframe in the
> past on different projects using different methods (rdd[row] based or union
> all collect list based).
>
> I might be biased, but find the approach very useful in project to
> simplify and speed up transformations, and remove a lot of intermediate
> stages (distinct + join => just cogroup).
>
> Plus spark 2.4 introduced a lot of new operator for nested data. That's a
> win!
>
>
> On Thu, 21 Feb 2019, 17:38 Li Jin,  wrote:
>
>> I am wondering do other people have opinion/use case on cogroup?
>>
>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin  wrote:
>>
>>> Alessandro,
>>>
>>> Thanks for the reply. I assume by "equi-join", you mean "equality  full
>>> outer join" .
>>>
>>> Two issues I see with equity outer join is:
>>> (1) equity outer join will give n * m rows for each key (n and m being
>>> the corresponding number of rows in df1 and df2 for each key)
>>> (2) User needs to do some extra processing to transform n * m back to
>>> the desired shape (two sub dataframes with n and m rows)
>>>
>>> I think full outer join is an inefficient way to implement cogroup. If
>>> the end goal is to have two separate dataframes for each key, why joining
>>> them first and then unjoin them?
>>>
>>>
>>>
>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>> alessandro.solima...@gmail.com> wrote:
>>>
>>>> Hello,
>>>> I fail to see how an equi-join on the key columns is different than the
>>>> cogroup you propose.
>>>>
>>>> I think the accepted answer can shed some light:
>>>>
>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>
>>>> Now you apply an udf on each iterable, one per key value (obtained with
>>>> cogroup).
>>>>
>>>> You can achieve the same by:
>>>> 1) join df1 and df2 on the key you want,
>>>> 2) apply "groupby" on such key
>>>> 3) finally apply a udaf (you can have a look here if you are not
>>>> familiar with them
>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>>>> that will process each group "in isolation".
>>>>
>>>> HTH,
>>>> Alessandro
>>>>
>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We have been using Pyspark's groupby().apply() quite a bit and it has
>>>>> been very helpful in integrating Spark with our existing pandas-heavy
>>>>> libraries.
>>>>>
>>>>> Recently, we have found more and more cases where groupby().apply() is
>>>>> not sufficient - In some cases, we want to group two dataframes by the 
>>>>> same
>>>>> key, and apply a function which takes two pd.DataFrame (also returns a
>>>>> pd.DataFrame) for each key. This feels very much like the "cogroup"
>>>>> operation in the RDD API.
>>>>>
>>>>> It would be great to be able to do sth like this: (not actual API,
>>>>> just to explain the use case):
>>>>>
>>>>> @pandas_udf(return_schema, ...)
>>>>> def my_udf(pdf1, pdf2)
>>>>>  # pdf1 and pdf2 are the subset of the original dataframes that is
>>>>> associated with a particular key
>>>>>  result = ... # some code that uses pdf1 and pdf2
>>>>>  return result
>>>>>
>>>>> df3  = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>
>>>>> I have searched around the problem and some people have suggested to
>>>>> join the tables first. However, it's often not the same pattern and hard 
>>>>> to
>>>>> get it to work by using joins.
>>>>>
>>>>> I wonder what are people's thought on this?
>>>>>
>>>>> Li
>>>>>
>>>>>


Re: Thoughts on dataframe cogroup?

2019-02-21 Thread Li Jin
I am wondering, do other people have opinions/use cases on cogroup?

On Wed, Feb 20, 2019 at 5:03 PM Li Jin  wrote:

> Alessandro,
>
> Thanks for the reply. I assume by "equi-join", you mean "equality  full
> outer join" .
>
> Two issues I see with equity outer join is:
> (1) equity outer join will give n * m rows for each key (n and m being the
> corresponding number of rows in df1 and df2 for each key)
> (2) User needs to do some extra processing to transform n * m back to the
> desired shape (two sub dataframes with n and m rows)
>
> I think full outer join is an inefficient way to implement cogroup. If the
> end goal is to have two separate dataframes for each key, why joining them
> first and then unjoin them?
>
>
>
> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
> alessandro.solima...@gmail.com> wrote:
>
>> Hello,
>> I fail to see how an equi-join on the key columns is different than the
>> cogroup you propose.
>>
>> I think the accepted answer can shed some light:
>>
>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>
>> Now you apply an udf on each iterable, one per key value (obtained with
>> cogroup).
>>
>> You can achieve the same by:
>> 1) join df1 and df2 on the key you want,
>> 2) apply "groupby" on such key
>> 3) finally apply a udaf (you can have a look here if you are not familiar
>> with them
>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>> that will process each group "in isolation".
>>
>> HTH,
>> Alessandro
>>
>> On Tue, 19 Feb 2019 at 23:30, Li Jin  wrote:
>>
>>> Hi,
>>>
>>> We have been using Pyspark's groupby().apply() quite a bit and it has
>>> been very helpful in integrating Spark with our existing pandas-heavy
>>> libraries.
>>>
>>> Recently, we have found more and more cases where groupby().apply() is
>>> not sufficient - In some cases, we want to group two dataframes by the same
>>> key, and apply a function which takes two pd.DataFrame (also returns a
>>> pd.DataFrame) for each key. This feels very much like the "cogroup"
>>> operation in the RDD API.
>>>
>>> It would be great to be able to do sth like this: (not actual API, just
>>> to explain the use case):
>>>
>>> @pandas_udf(return_schema, ...)
>>> def my_udf(pdf1, pdf2)
>>>  # pdf1 and pdf2 are the subset of the original dataframes that is
>>> associated with a particular key
>>>  result = ... # some code that uses pdf1 and pdf2
>>>  return result
>>>
>>> df3  = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>
>>> I have searched around the problem and some people have suggested to
>>> join the tables first. However, it's often not the same pattern and hard to
>>> get it to work by using joins.
>>>
>>> I wonder what are people's thought on this?
>>>
>>> Li
>>>
>>>


Re: Thoughts on dataframe cogroup?

2019-02-20 Thread Li Jin
Alessandro,

Thanks for the reply. I assume by "equi-join", you mean "equality full
outer join".

Two issues I see with an equality outer join are:
(1) an equality outer join will give n * m rows for each key (n and m being the
corresponding number of rows in df1 and df2 for each key)
(2) the user needs to do some extra processing to transform the n * m rows back
to the desired shape (two sub-dataframes with n and m rows)

I think a full outer join is an inefficient way to implement cogroup. If the
end goal is to have two separate dataframes for each key, why join them
first and then un-join them?



On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
alessandro.solima...@gmail.com> wrote:

> Hello,
> I fail to see how an equi-join on the key columns is different than the
> cogroup you propose.
>
> I think the accepted answer can shed some light:
>
> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>
> Now you apply an udf on each iterable, one per key value (obtained with
> cogroup).
>
> You can achieve the same by:
> 1) join df1 and df2 on the key you want,
> 2) apply "groupby" on such key
> 3) finally apply a udaf (you can have a look here if you are not familiar
> with them
> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html), that
> will process each group "in isolation".
>
> HTH,
> Alessandro
>
> On Tue, 19 Feb 2019 at 23:30, Li Jin  wrote:
>
>> Hi,
>>
>> We have been using Pyspark's groupby().apply() quite a bit and it has
>> been very helpful in integrating Spark with our existing pandas-heavy
>> libraries.
>>
>> Recently, we have found more and more cases where groupby().apply() is
>> not sufficient - In some cases, we want to group two dataframes by the same
>> key, and apply a function which takes two pd.DataFrame (also returns a
>> pd.DataFrame) for each key. This feels very much like the "cogroup"
>> operation in the RDD API.
>>
>> It would be great to be able to do sth like this: (not actual API, just
>> to explain the use case):
>>
>> @pandas_udf(return_schema, ...)
>> def my_udf(pdf1, pdf2)
>>  # pdf1 and pdf2 are the subset of the original dataframes that is
>> associated with a particular key
>>  result = ... # some code that uses pdf1 and pdf2
>>  return result
>>
>> df3  = cogroup(df1, df2, key='some_key').apply(my_udf)
>>
>> I have searched around the problem and some people have suggested to join
>> the tables first. However, it's often not the same pattern and hard to get
>> it to work by using joins.
>>
>> I wonder what are people's thought on this?
>>
>> Li
>>
>>


Thoughts on dataframe cogroup?

2019-02-19 Thread Li Jin
Hi,

We have been using Pyspark's groupby().apply() quite a bit and it has been
very helpful in integrating Spark with our existing pandas-heavy libraries.

Recently, we have found more and more cases where groupby().apply() is not
sufficient - In some cases, we want to group two dataframes by the same
key, and apply a function which takes two pd.DataFrame (also returns a
pd.DataFrame) for each key. This feels very much like the "cogroup"
operation in the RDD API.

It would be great to be able to do something like this (not actual API, just
to explain the use case):

@pandas_udf(return_schema, ...)
def my_udf(pdf1, pdf2):
    # pdf1 and pdf2 are the subsets of the original dataframes that are
    # associated with a particular key
    result = ...  # some code that uses pdf1 and pdf2
    return result

df3 = cogroup(df1, df2, key='some_key').apply(my_udf)

I have searched around this problem, and some people have suggested joining
the tables first. However, it's often not the same pattern, and it is hard to
get it to work using joins.

I wonder what people's thoughts are on this?

Li
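As a point of reference, the closest equivalent today is the RDD cogroup mentioned above. A rough sketch, with pandas doing the per-key work; the key/time column names and the merge_asof call are illustrative assumptions, echoing the as-of-join use case mentioned elsewhere in the thread:

```python
import pandas as pd

# Assumes df1 and df2 each have a "some_key" column and a sortable "time" column.
kv1 = df1.rdd.keyBy(lambda row: row["some_key"])
kv2 = df2.rdd.keyBy(lambda row: row["some_key"])

def apply_per_key(item):
    key, (rows1, rows2) = item
    pdf1 = pd.DataFrame([r.asDict() for r in rows1])
    pdf2 = pd.DataFrame([r.asDict() for r in rows2])
    if pdf1.empty or pdf2.empty:
        return []
    # Illustrative per-key pandas logic: an as-of style merge.
    result = pd.merge_asof(pdf1.sort_values("time"),
                           pdf2.sort_values("time"), on="time")
    return result.to_dict("records")

result_rdd = kv1.cogroup(kv2).flatMap(apply_per_key)
# result_rdd can be turned back into a DataFrame with spark.createDataFrame(...)
```

A cogroup + pandas UDF would express the same thing without leaving the DataFrame API or paying the row-by-row conversion cost.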


Re: proposal for expanded & consistent timestamp types

2018-12-11 Thread Li Jin
Of course. I added some comments in the doc.

On Tue, Dec 11, 2018 at 12:01 PM Imran Rashid  wrote:

> Hi Li,
>
> thanks for the comments!  I admit I had not thought very much about python
> support, its a good point.  But I'd actually like to clarify one thing
> about the doc -- though it discusses java types, the point is actually
> about having support for these logical types at the SQL level.  The doc
> uses java names instead of SQL names just because there is so much
> confusion around the SQL names, as they haven't been implemented
> consistently.  Once there is support for the additional logical types, then
> we'd absolutely want to get the same support in python.
>
> Its great to hear there are existing python types we can map each behavior
> to.  Could you add a comment on the doc on each of the types, mentioning
> the equivalent in python?
>
> thanks,
> Imran
>
> On Fri, Dec 7, 2018 at 1:33 PM Li Jin  wrote:
>
>> Imran,
>>
>> Thanks for sharing this. When working on interop between Spark and
>> Pandas/Arrow in the past, we also faced some issues due to the different
>> definitions of timestamp in Spark and Pandas/Arrow, because Spark timestamp
>> has Instant semantics and Pandas/Arrow timestamp has either LocalDateTime
>> or OffsetDateTime semantics. (Detailed discussion is in the PR:
>> https://github.com/apache/spark/pull/18664#issuecomment-316554156.)
>>
>> For one I am excited to see this effort going but also would love to see
>> interop of Python to be included/considered in the picture. I don't think
>> it adds much to what has already been proposed already because Python
>> timestamps are basically LocalDateTime or OffsetDateTime.
>>
>> Li
>>
>>
>>
>> On Thu, Dec 6, 2018 at 11:03 AM Imran Rashid 
>> wrote:
>>
>>> Hi,
>>>
>>> I'd like to discuss the future of timestamp support in Spark, in
>>> particular with respect of handling timezones in different SQL types.   In
>>> a nutshell:
>>>
>>> * There are at least 3 different ways of handling the timestamp type
>>> across timezone changes
>>> * We'd like Spark to clearly distinguish the 3 types (it currently
>>> implements 1 of them), in a way that is backwards compatible, and also
>>> compliant with the SQL standard.
>>> * We'll get agreement across Spark, Hive, and Impala.
>>>
>>> Zoltan Ivanfi (Parquet PMC, also my coworker) has written up a detailed
>>> doc, describing the problem in more detail, the state of various SQL
>>> engines, and how we can get to a better state without breaking any current
>>> use cases.  The proposal is good for Spark by itself.  We're also going to
>>> the Hive & Impala communities with this proposal, as its better for
>>> everyone if everything is compatible.
>>>
>>> Note that this isn't proposing a specific implementation in Spark as
>>> yet, just a description of the overall problem and our end goal.  We're
>>> going to each community to get agreement on the overall direction.  Then
>>> each community can figure out specifics as they see fit.  (I don't think
>>> there are any technical hurdles with this approach eg. to decide whether
>>> this would be even possible in Spark.)
>>>
>>> Here's a link to the doc Zoltan has put together.  It is a bit long, but
>>> it explains how such a seemingly simple concept has become such a mess and
>>> how we can get to a better state.
>>>
>>>
>>> https://docs.google.com/document/d/1gNRww9mZJcHvUDCXklzjFEQGpefsuR_akCDfWsdE35Q/edit#heading=h.dq3b1mwkrfky
>>>
>>> Please review the proposal and let us know your opinions, concerns and
>>> suggestions.
>>>
>>> thanks,
>>> Imran
>>>
>>


Re: proposal for expanded & consistent timestamp types

2018-12-07 Thread Li Jin
Imran,

Thanks for sharing this. When working on interop between Spark and
Pandas/Arrow in the past, we also faced some issues due to the different
definitions of timestamp in Spark and Pandas/Arrow, because Spark timestamp
has Instant semantics and Pandas/Arrow timestamp has either LocalDateTime
or OffsetDateTime semantics. (Detailed discussion is in the PR:
https://github.com/apache/spark/pull/18664#issuecomment-316554156.)

For one, I am excited to see this effort going, but I would also love to see
Python interop included/considered in the picture. I don't think it adds much
to what has already been proposed, because Python timestamps are basically
LocalDateTime or OffsetDateTime.
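For concreteness, a small illustration of what those Python-side semantics look like; this is generic Python/pandas, not Spark code:

```python
from datetime import datetime, timezone
import pandas as pd

# LocalDateTime-like: a naive timestamp, no zone or offset attached.
naive = datetime(2018, 12, 7, 12, 0)

# OffsetDateTime-like: an aware timestamp carrying an explicit offset.
aware = datetime(2018, 12, 7, 12, 0, tzinfo=timezone.utc)

# pandas mirrors the same split with tz-naive and tz-aware Timestamps.
pd_naive = pd.Timestamp("2018-12-07 12:00")
pd_aware = pd.Timestamp("2018-12-07 12:00", tz="UTC")

print(naive.tzinfo, aware.tzinfo, pd_naive.tz, pd_aware.tz)
```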

Li



On Thu, Dec 6, 2018 at 11:03 AM Imran Rashid 
wrote:

> Hi,
>
> I'd like to discuss the future of timestamp support in Spark, in
> particular with respect of handling timezones in different SQL types.   In
> a nutshell:
>
> * There are at least 3 different ways of handling the timestamp type
> across timezone changes
> * We'd like Spark to clearly distinguish the 3 types (it currently
> implements 1 of them), in a way that is backwards compatible, and also
> compliant with the SQL standard.
> * We'll get agreement across Spark, Hive, and Impala.
>
> Zoltan Ivanfi (Parquet PMC, also my coworker) has written up a detailed
> doc, describing the problem in more detail, the state of various SQL
> engines, and how we can get to a better state without breaking any current
> use cases.  The proposal is good for Spark by itself.  We're also going to
> the Hive & Impala communities with this proposal, as its better for
> everyone if everything is compatible.
>
> Note that this isn't proposing a specific implementation in Spark as yet,
> just a description of the overall problem and our end goal.  We're going to
> each community to get agreement on the overall direction.  Then each
> community can figure out specifics as they see fit.  (I don't think there
> are any technical hurdles with this approach eg. to decide whether this
> would be even possible in Spark.)
>
> Here's a link to the doc Zoltan has put together.  It is a bit long, but
> it explains how such a seemingly simple concept has become such a mess and
> how we can get to a better state.
>
>
> https://docs.google.com/document/d/1gNRww9mZJcHvUDCXklzjFEQGpefsuR_akCDfWsdE35Q/edit#heading=h.dq3b1mwkrfky
>
> Please review the proposal and let us know your opinions, concerns and
> suggestions.
>
> thanks,
> Imran
>


Re: Helper methods for PySpark discussion

2018-10-26 Thread Li Jin
> (2) If the method forces evaluation and this matches the most obvious way it
> would be implemented, then we should add it with a note in the docstring

I am not sure about this, because forcing evaluation could be something that
has side effects. For example, df.count() can realize a cache, and if we
implement __len__ to call df.count(), then len(df) would end up populating
some cache, which can be unintuitive.
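To make the concern concrete, a small hypothetical sketch (this is not actual or proposed PySpark code) of what such a helper would look like and why it can surprise users:

```python
class LenWrapper:
    """Hypothetical wrapper illustrating the side effect of __len__ -> count()."""

    def __init__(self, df):
        self._df = df

    def __len__(self):
        # An innocent-looking len(wrapped_df) triggers a full Spark job and,
        # if the DataFrame was marked with cache(), materializes that cache.
        return self._df.count()
```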

On Fri, Oct 26, 2018 at 1:21 PM Leif Walsh  wrote:

> That all sounds reasonable but I think in the case of 4 and maybe also 3 I
> would rather see it implemented to raise an error message that explains
> what’s going on and suggests the explicit operation that would do the most
> equivalent thing. And perhaps raise a warning (using the warnings module)
> for things that might be unintuitively expensive.
> On Fri, Oct 26, 2018 at 12:15 Holden Karau  wrote:
>
>> Coming out of https://github.com/apache/spark/pull/21654 it was agreed
>> the helper methods in question made sense but there was some desire for a
>> plan as to which helper methods we should use.
>>
>> I'd like to propose a lightweight solution to start with for helper
>> methods that match either Pandas or general Python collection helper
>> methods:
>> 1) If the helper method doesn't collect the DataFrame back or force
>> evaluation to the driver then we should add it without discussion
>> 2) If the method forces evaluation and this matches the most obvious way it
>> would be implemented, then we should add it with a note in the docstring
>> 3) If the method does collect the DataFrame back to the driver and that
>> is the most obvious way it would implemented (e.g. calling list to get back
>> a list would have to collect the DataFrame) then we should add it with a
>> warning in the docstring
>> 4) If the method collects the DataFrame but a reasonable Python developer
>> wouldn't expect that behaviour, then not implementing the helper method
>> would be better
>>
>> What do folks think?
>> --
>> Twitter: https://twitter.com/holdenkarau
>> Books (Learning Spark, High Performance Spark, etc.):
>> https://amzn.to/2MaRAG9  
>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>
> --
> --
> Cheers,
> Leif
>


Re: DataSourceV2 hangouts sync

2018-10-25 Thread Li Jin
Although I am not specifically involved in DSv2, I think having this kind
of meeting is definitely helpful for discussing things, moving certain efforts
forward, and keeping people on the same page. Glad to see this kind of working
group happening.

On Thu, Oct 25, 2018 at 5:58 PM John Zhuge  wrote:

> Great idea!
>
> On Thu, Oct 25, 2018 at 1:10 PM Ryan Blue 
> wrote:
>
>> Hi everyone,
>>
>> There's been some great discussion for DataSourceV2 in the last few
>> months, but it has been difficult to resolve some of the discussions and I
>> don't think that we have a very clear roadmap for getting the work done.
>>
>> To coordinate better as a community, I'd like to start a regular sync-up
>> over google hangouts. We use this in the Parquet community to have more
>> effective community discussions about thorny technical issues and to get
>> aligned on an overall roadmap. It is really helpful in that community and I
>> think it would help us get DSv2 done more quickly.
>>
>> Here's how it works: people join the hangout, we go around the list to
>> gather topics, have about an hour-long discussion, and then send a summary
>> of the discussion to the dev list for anyone that couldn't participate.
>> That way we can move topics along, but we keep the broader community in the
>> loop as well for further discussion on the mailing list.
>>
>> I'll volunteer to set up the sync and send invites to anyone that wants
>> to attend. If you're interested, please reply with the email address you'd
>> like to put on the invite list (if there's a way to do this without
>> specific invites, let me know). Also for the first sync, please note what
>> times would work for you so we can try to account for people in different
>> time zones.
>>
>> For the first one, I was thinking some day next week (time TBD by those
>> interested) and starting off with a general roadmap discussion before
>> diving into specific technical topics.
>>
>> Thanks,
>>
>> rb
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>
> --
> John Zhuge
>


Re: [DISCUSS] PySpark Window UDF

2018-09-05 Thread Li Jin
Hello again!

I recently implemented a proof of concept of the proposal above. I think the
results are pretty exciting, so I want to share my findings with the
community. I have implemented two variants of the pandas window UDF -
one that takes a pandas.Series as input and one that takes a numpy array as
input. I benchmarked a rolling mean on 1M doubles and here are some
results:

Spark SQL window function: 20s
Pandas variant: ~60s
Numpy variant: 10s
Numpy variant with numba: 4s

You can see the benchmark code here:
https://gist.github.com/icexelloss/845beb3d0d6bfc3d51b3c7419edf0dcb
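For readers who just want the flavor of the numpy-with-numba variant, here is a rough sketch of a numba-jitted rolling mean kernel over a numpy array; it is illustrative only, not the gist's exact code, and assumes numpy and numba are installed:

```python
import numpy as np
from numba import njit

@njit
def rolling_mean(values, window):
    out = np.empty(len(values))
    out[: window - 1] = np.nan  # not enough history yet
    acc = 0.0
    for i in range(len(values)):
        acc += values[i]
        if i >= window:
            acc -= values[i - window]  # slide the window forward
        if i >= window - 1:
            out[i] = acc / window
    return out

print(rolling_mean(np.arange(10, dtype=np.float64), 3))
```

The win comes from the UDF receiving a plain numpy array plus window indices, so the jitted loop runs without per-window Python overhead.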

I think the results are quite exciting because:
(1) the numpy variant even outperforms the Spark SQL window function
(2) the numpy variant with numba has the best performance as well as the
flexibility to allow users to write window functions in pure python

The Pandas variant is not bad either (1.5x faster than existing UDF with
collect_list) but the numpy variant definitely has much better performance.

So far all Pandas UDFs interact with pandas data structures rather than
numpy data structures, but the window UDF results might be a good reason to
open up numpy variants of Pandas UDFs. What do people think? I'd love to
hear the community's feedback.


Links:
You can reproduce benchmark with numpy variant by using the branch:
https://github.com/icexelloss/spark/tree/window-udf-numpy

PR link:
https://github.com/apache/spark/pull/22305

On Wed, May 16, 2018 at 3:34 PM Li Jin  wrote:

> Hi All,
>
> I have been looking into leverage the Arrow and Pandas UDF work we have
> done so far for Window UDF in PySpark. I have done some investigation and
> believe there is a way to do PySpark window UDF efficiently.
>
> The basic idea is instead of passing each window to Python separately, we
> can pass a "batch of windows" as an Arrow Batch of rows + begin/end indices
> for each window (indices are computed on the Java side), and then rolling
> over the begin/end indices in Python and applies the UDF.
>
> I have written my investigation in more details here:
>
> https://docs.google.com/document/d/14EjeY5z4-NC27-SmIP9CsMPCANeTcvxN44a7SIJtZPc/edit#
>
> I think this is a pretty promising and hope to get some feedback from the
> community about this approach. Let's discuss! :)
>
> Li
>


Re: [DISCUSS] move away from python doctests

2018-08-29 Thread Li Jin
Hi Imran,

My understanding is that doctests and unittests are orthogonal - doctests
are used to make sure docstring examples are correct and are not meant to
replace unittests.
Functionality is covered by unit tests to ensure correctness, and
doctests are used to test the docstrings, not the functionality itself.
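For anyone less familiar with the mechanism, a generic illustration (not Spark code) of what a doctest is: the example embedded in the docstring is executed and its output is compared against the expected text.

```python
def add(a, b):
    """Return the sum of a and b.

    >>> add(1, 2)
    3
    """
    return a + b

if __name__ == "__main__":
    import doctest
    doctest.testmod()  # runs the >>> examples found in docstrings
```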

There are issues with doctests (for example, we cannot test Arrow-related
functions in doctests because pyarrow is an optional dependency), but I think
that's a separate issue.

Does this make sense?

Li

On Wed, Aug 29, 2018 at 6:35 PM Imran Rashid 
wrote:

> Hi,
>
> I'd like to propose that we move away from such heavy reliance on doctests
> in python, and move towards more traditional unit tests.  The main reason
> is that its hard to share test code in doc tests.  For example, I was just
> looking at
>
> https://github.com/apache/spark/commit/82c18c240a6913a917df3b55cc5e22649561c4dd
>  and wondering if we had any tests for some of the pyspark changes.
> SparkSession.createDataFrame has doctests, but those are just run with one
> standard spark configuration, which does not enable arrow.  Its hard to
> easily reuse that test, just with another spark context with a different
> conf.  Similarly I've wondered about reusing test cases but with
> local-cluster instead of local mode.  I feel like they also discourage
> writing a test which tries to get more exhaustive coverage on corner cases.
>
> I'm not saying we should stop using doctests -- I see why they're nice.  I
> just think they should really only be when you want that code snippet in
> the doc anyway, so you might as well test it.
>
> Admittedly, I'm not really a python-developer, so I could be totally wrong
> about the right way to author doctests -- pushback welcome!
>
> Thoughts?
>
> thanks,
> Imran
>


Re: [discuss][minor] impending python 3.x jenkins upgrade... 3.5.x? 3.6.x?

2018-08-20 Thread Li Jin
Thanks for looking into this Shane. If we can only have a single python 3
version, I agree 3.6 would be better than 3.5. Otherwise, ideally I think
it would be nice to test all supported 3.x versions (latest micros should
be fine).

On Mon, Aug 20, 2018 at 7:07 PM shane knapp  wrote:

> initially, i'd like to just choose one version to have the primary tests
> against, but i'm also not opposed to supporting more of a matrix.  the
> biggest problem i see w/this approach, however, is that of build monitoring
> and long-term ownership.  this is why we have a relatively restrictive
> current deployment.
>
> another thing i've been noticing during this project is that we have a lot
> of flaky tests...  for instance, i'm literally having every other build
> fail on my (relatively) up-to-date PRB fork:
> https://amplab.cs.berkeley.edu/jenkins/job/ubuntuSparkPRB/
>
> (i'm testing more than python here, otherwise i could just build a spark
> distro and run the python tests against that)
>
> i'll also set up a 3.6 env tomorrow and start testing against that.  i'm
> pretty confident about 3.5, tho.
>
> shane
>
> On Mon, Aug 20, 2018 at 11:33 AM, Bryan Cutler  wrote:
>
>> Thanks for looking into this Shane!  If we are choosing a single python
>> 3.x, I think 3.6 would be good. It might still be nice to test against
>> other versions too, so we can catch any issues. Is it possible to have more
>> exhaustive testing as part of a nightly or scheduled build? As a point of
>> reference for Python 3.6, Arrow is using this version for CI.
>>
>> Bryan
>>
>> On Sun, Aug 19, 2018 at 9:49 PM Hyukjin Kwon  wrote:
>>
>>> Actually Python 3.7 is released (
>>> https://www.python.org/downloads/release/python-370/) too and I fixed
>>> the compatibility issues accordingly -
>>> https://github.com/apache/spark/pull/21714
>>> There has been an issue for 3.6 (comparing to lower versions of Python
>>> including 3.5) - https://github.com/apache/spark/pull/16429
>>>
>>> I am not yet sure what's the best matrix for it actually. In case of R,
>>> we test lowest version in Jenkins and highest version via AppVeyor FWIW.
>>> I don't have a strong preference opinion on this since we have been
>>> having compatibility issues for each Python version.
>>>
>>>
>>> 2018년 8월 14일 (화) 오전 4:15, shane knapp 님이 작성:
>>>
 hey everyone!

 i was checking out the EOL/release cycle for python 3.5 and it looks
 like we'll have 3.5.6 released in early 2019.

 this got me to thinking:  instead of 3.5, what about 3.6?

 i looked around, and according to the 'docs' and 'collective wisdom of
 the internets', 3.5 and 3.6 should be fully backwards-compatible w/3.4.

 of course, this needs to be taken w/a grain of salt, as we're mostly
 focused on actual python package requirements, rather than worrying about
 core python functionality.

 thoughts?  comments?

 thanks in advance,

 shane
 --
 Shane Knapp
 UC Berkeley EECS Research / RISELab Staff Technical Lead
 https://rise.cs.berkeley.edu

>>>
>
>
> --
> Shane Knapp
> UC Berkeley EECS Research / RISELab Staff Technical Lead
> https://rise.cs.berkeley.edu
>


Re: code freeze and branch cut for Apache Spark 2.4

2018-08-10 Thread Li Jin
I agree with Bryan. If it's acceptable to have another job to test with
Python 3.5 and pyarrow 0.10.0, I am leaning towards upgrading arrow.

Arrow 0.10.0 has tons of bug fixes and improvements over 0.8.0, including
important memory leak fixes such as
https://issues.apache.org/jira/browse/ARROW-1973. I think releasing with
0.10.0 will improve the overall experience of Arrow-related features quite a
bit.

I also think it's a good idea to test against newer Python versions. But I
don't know how difficult it is and whether or not it's feasible to resolve
that between branch cut and RC cut.

On Fri, Aug 10, 2018 at 5:44 PM, shane knapp  wrote:

> see:  https://github.com/apache/spark/pull/21939#issuecomment-412154343
>
> yes, i can set up a build.  have some Qs in the PR about building the
> spark package before running the python tests.
>
> On Fri, Aug 10, 2018 at 10:41 AM, Bryan Cutler  wrote:
>
>> I agree that we should hold off on the Arrow upgrade if it requires major
>> changes to our testing. I did have another thought that maybe we could just
>> add another job to test against Python 3.5 and pyarrow 0.10.0 and keep all
>> current testing the same? I'm not sure how doable that is right now and
>> don't want to make a ton of extra work, so no objections from me to hold
>> off on things for now.
>>
>> On Fri, Aug 10, 2018 at 9:48 AM, shane knapp  wrote:
>>
>>> On Fri, Aug 10, 2018 at 9:47 AM, Wenchen Fan 
>>> wrote:
>>>
 It seems safer to skip the arrow 0.10.0 upgrade for Spark 2.4 and leave
 it to Spark 3.0, so that we have more time to test. Any objections?

>>>
>>> none here.
>>>
>>> --
>>> Shane Knapp
>>> UC Berkeley EECS Research / RISELab Staff Technical Lead
>>> https://rise.cs.berkeley.edu
>>>
>>
>>
>
>
> --
> Shane Knapp
> UC Berkeley EECS Research / RISELab Staff Technical Lead
> https://rise.cs.berkeley.edu
>


Re: [SPARK][SQL] Distributed createDataframe from many pandas DFs using Arrow

2018-07-08 Thread Li Jin
Hi Linar,

This seems useful. But perhaps reusing the same function name is better?

http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.SparkSession.createDataFrame

Currently createDataFrame takes an RDD of any kind of SQL data
representation (e.g. row, tuple, int, boolean, etc.), or list, or
pandas.DataFrame.

Perhaps we can support taking an RDD of *pandas.DataFrame* as the "data"
arg too?

What do other people think?

Li

On Sun, Jul 8, 2018 at 1:13 PM, Linar Savion 
wrote:

> We've created a snippet that creates a Spark DF from a RDD of many pandas
> DFs in a distributed manner that does not require the driver to collect the
> entire dataset.
>
> Early tests show a performance improvement of x6-x10 over using
> pandasDF->Rows>sparkDF.
>
> I've seen that there are some open pull requests that change the way arrow
> serialization work, Should I open a pull request to add this functionality
> to SparkSession? (`createFromPandasDataframesRDD`)
>
> https://gist.github.com/linar-jether/7dd61ed6fa89098ab9c58a1ab428b2b5
>
> Thanks,
> Linar
>
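
As a point of reference for the discussion above, the closest thing that works today
without an API change is to explode each pandas DataFrame into plain rows and reuse the
existing createDataFrame. This is only a sketch (column names and schema are
assumptions), and it goes through Python rows rather than Arrow, which is exactly the
overhead the proposed createFromPandasDataframesRDD avoids.

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# An RDD whose elements are pandas DataFrames (illustrative data).
pdf_rdd = spark.sparkContext.parallelize([
    pd.DataFrame({"id": [1, 2], "v": [1.0, 2.0]}),
    pd.DataFrame({"id": [3, 4], "v": [3.0, 4.0]}),
])

# Flatten each pandas DataFrame into plain tuples so the existing
# createDataFrame(RDD, schema) path can be reused.
row_rdd = pdf_rdd.flatMap(lambda pdf: pdf.itertuples(index=False, name=None))
df = spark.createDataFrame(row_rdd, ["id", "v"])
df.show()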


Re: [SPARK-24579] SPIP: Standardize Optimized Data Exchange between Spark and DL/AI frameworks

2018-06-29 Thread Li Jin
Hi Xiangrui,

Thanks for sending this out. I have left some comments on the google doc:
https://docs.google.com/document/d/1dFOFV3LOu6deSNd8Ndp87-wsdxQqA9cnkuj35jUlrmQ/edit#heading=h.84jotgsrp6bj

Look forward to your response.

Li

On Mon, Jun 18, 2018 at 11:33 AM, Xiangrui Meng  wrote:

> Hi all,
>
> I posted a new SPIP on optimized data exchange between Spark and DL/AI
> frameworks at SPARK-24579
> . It took inputs from
> offline conversations with several Spark committers and contributors at
> Spark+AI summit conference. Please take a look and let me know your
> thoughts in JIRA comments. Thanks!
>
> Best,
> Xiangrui
> --
>
> Xiangrui Meng
>
> Software Engineer
>
> Databricks Inc. [image: http://databricks.com] 
>


Re: Missing HiveConf when starting PySpark from head

2018-06-14 Thread Li Jin
Sounds good. Thanks all for the quick reply.

https://issues.apache.org/jira/browse/SPARK-24563


On Thu, Jun 14, 2018 at 12:19 PM, Xiao Li  wrote:

> Thanks for catching this. Please feel free to submit a PR. I do not think
> Vanzin wants to introduce the behavior changes in that PR. We should do the
> code review more carefully.
>
> Xiao
>
> 2018-06-14 9:18 GMT-07:00 Li Jin :
>
>> Are there objection to restore the behavior for PySpark users? I am happy
>> to submit a patch.
>>
>> On Thu, Jun 14, 2018 at 12:15 PM Reynold Xin  wrote:
>>
>>> The behavior change is not good...
>>>
>>> On Thu, Jun 14, 2018 at 9:05 AM Li Jin  wrote:
>>>
>>>> Ah, looks like it's this change:
>>>> https://github.com/apache/spark/commit/b3417b731d4e323398a0d
>>>> 7ec6e86405f4464f4f9#diff-3b5463566251d5b09fd328738a9e9bc5
>>>>
>>>> It seems strange that by default Spark doesn't build with Hive but by
>>>> default PySpark requires it...
>>>>
>>>> This might also be a behavior change to PySpark users that build Spark
>>>> without Hive. The old behavior is "fall back to non-hive support" and the
>>>> new behavior is "program won't start".
>>>>
>>>> On Thu, Jun 14, 2018 at 11:51 AM, Sean Owen  wrote:
>>>>
>>>>> I think you would have to build with the 'hive' profile? but if so
>>>>> that would have been true for a while now.
>>>>>
>>>>>
>>>>> On Thu, Jun 14, 2018 at 10:38 AM Li Jin  wrote:
>>>>>
>>>>>> Hey all,
>>>>>>
>>>>>> I just did a clean checkout of github.com/apache/spark but failed to
>>>>>> start PySpark, this is what I did:
>>>>>>
>>>>>> git clone g...@github.com:apache/spark.git; cd spark; build/sbt
>>>>>> package; bin/pyspark
>>>>>>
>>>>>> And got this exception:
>>>>>>
>>>>>> (spark-dev) Lis-MacBook-Pro:spark icexelloss$ bin/pyspark
>>>>>>
>>>>>> Python 3.6.3 |Anaconda, Inc.| (default, Nov  8 2017, 18:10:31)
>>>>>>
>>>>>> [GCC 4.2.1 Compatible Clang 4.0.1 (tags/RELEASE_401/final)] on darwin
>>>>>>
>>>>>> Type "help", "copyright", "credits" or "license" for more information.
>>>>>>
>>>>>> 18/06/14 11:34:14 WARN NativeCodeLoader: Unable to load native-hadoop
>>>>>> library for your platform... using builtin-java classes where applicable
>>>>>>
>>>>>> Using Spark's default log4j profile: org/apache/spark/log4j-default
>>>>>> s.properties
>>>>>>
>>>>>> Setting default log level to "WARN".
>>>>>>
>>>>>> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
>>>>>> setLogLevel(newLevel).
>>>>>>
>>>>>> /Users/icexelloss/workspace/upstream2/spark/python/pyspark/shell.py:45:
>>>>>> UserWarning: Failed to initialize Spark session.
>>>>>>
>>>>>>   warnings.warn("Failed to initialize Spark session.")
>>>>>>
>>>>>> Traceback (most recent call last):
>>>>>>
>>>>>>   File 
>>>>>> "/Users/icexelloss/workspace/upstream2/spark/python/pyspark/shell.py",
>>>>>> line 41, in 
>>>>>>
>>>>>> spark = SparkSession._create_shell_session()
>>>>>>
>>>>>>   File 
>>>>>> "/Users/icexelloss/workspace/upstream2/spark/python/pyspark/sql/session.py",
>>>>>> line 564, in _create_shell_session
>>>>>>
>>>>>> SparkContext._jvm.org.apache.hadoop.hive.conf.HiveConf()
>>>>>>
>>>>>> TypeError: 'JavaPackage' object is not callable
>>>>>>
>>>>>> I also tried to delete hadoop deps from my ivy2 cache and reinstall
>>>>>> them but no luck. I wonder:
>>>>>>
>>>>>>
>>>>>>1. I have not seen this before, could this be caused by recent
>>>>>>change to head?
>>>>>>2. Am I doing something wrong in the build process?
>>>>>>
>>>>>>
>>>>>> Thanks much!
>>>>>> Li
>>>>>>
>>>>>>
>>>>
>


Re: Missing HiveConf when starting PySpark from head

2018-06-14 Thread Li Jin
Are there objections to restoring the behavior for PySpark users? I am happy
to submit a patch.
On Thu, Jun 14, 2018 at 12:15 PM Reynold Xin  wrote:

> The behavior change is not good...
>
> On Thu, Jun 14, 2018 at 9:05 AM Li Jin  wrote:
>
>> Ah, looks like it's this change:
>>
>> https://github.com/apache/spark/commit/b3417b731d4e323398a0d7ec6e86405f4464f4f9#diff-3b5463566251d5b09fd328738a9e9bc5
>>
>> It seems strange that by default Spark doesn't build with Hive but by
>> default PySpark requires it...
>>
>> This might also be a behavior change to PySpark users that build Spark
>> without Hive. The old behavior is "fall back to non-hive support" and the
>> new behavior is "program won't start".
>>
>> On Thu, Jun 14, 2018 at 11:51 AM, Sean Owen  wrote:
>>
>>> I think you would have to build with the 'hive' profile? but if so that
>>> would have been true for a while now.
>>>
>>>
>>> On Thu, Jun 14, 2018 at 10:38 AM Li Jin  wrote:
>>>
>>>> Hey all,
>>>>
>>>> I just did a clean checkout of github.com/apache/spark but failed to
>>>> start PySpark, this is what I did:
>>>>
>>>> git clone g...@github.com:apache/spark.git; cd spark; build/sbt
>>>> package; bin/pyspark
>>>>
>>>> And got this exception:
>>>>
>>>> (spark-dev) Lis-MacBook-Pro:spark icexelloss$ bin/pyspark
>>>>
>>>> Python 3.6.3 |Anaconda, Inc.| (default, Nov  8 2017, 18:10:31)
>>>>
>>>> [GCC 4.2.1 Compatible Clang 4.0.1 (tags/RELEASE_401/final)] on darwin
>>>>
>>>> Type "help", "copyright", "credits" or "license" for more information.
>>>>
>>>> 18/06/14 11:34:14 WARN NativeCodeLoader: Unable to load native-hadoop
>>>> library for your platform... using builtin-java classes where applicable
>>>>
>>>> Using Spark's default log4j profile:
>>>> org/apache/spark/log4j-defaults.properties
>>>>
>>>> Setting default log level to "WARN".
>>>>
>>>> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
>>>> setLogLevel(newLevel).
>>>>
>>>> /Users/icexelloss/workspace/upstream2/spark/python/pyspark/shell.py:45:
>>>> UserWarning: Failed to initialize Spark session.
>>>>
>>>>   warnings.warn("Failed to initialize Spark session.")
>>>>
>>>> Traceback (most recent call last):
>>>>
>>>>   File
>>>> "/Users/icexelloss/workspace/upstream2/spark/python/pyspark/shell.py", line
>>>> 41, in 
>>>>
>>>> spark = SparkSession._create_shell_session()
>>>>
>>>>   File
>>>> "/Users/icexelloss/workspace/upstream2/spark/python/pyspark/sql/session.py",
>>>> line 564, in _create_shell_session
>>>>
>>>> SparkContext._jvm.org.apache.hadoop.hive.conf.HiveConf()
>>>>
>>>> TypeError: 'JavaPackage' object is not callable
>>>>
>>>> I also tried to delete hadoop deps from my ivy2 cache and reinstall
>>>> them but no luck. I wonder:
>>>>
>>>>
>>>>1. I have not seen this before, could this be caused by recent
>>>>change to head?
>>>>2. Am I doing something wrong in the build process?
>>>>
>>>>
>>>> Thanks much!
>>>> Li
>>>>
>>>>
>>


Re: Missing HiveConf when starting PySpark from head

2018-06-14 Thread Li Jin
Ah, looks like it's this change:
https://github.com/apache/spark/commit/b3417b731d4e323398a0d7ec6e86405f4464f4f9#diff-3b5463566251d5b09fd328738a9e9bc5

It seems strange that by default Spark doesn't build with Hive but by
default PySpark requires it...

This might also be a behavior change to PySpark users that build Spark
without Hive. The old behavior is "fall back to non-hive support" and the
new behavior is "program won't start".

On Thu, Jun 14, 2018 at 11:51 AM, Sean Owen  wrote:

> I think you would have to build with the 'hive' profile? but if so that
> would have been true for a while now.
>
>
> On Thu, Jun 14, 2018 at 10:38 AM Li Jin  wrote:
>
>> Hey all,
>>
>> I just did a clean checkout of github.com/apache/spark but failed to
>> start PySpark, this is what I did:
>>
>> git clone g...@github.com:apache/spark.git; cd spark; build/sbt package;
>> bin/pyspark
>>
>> And got this exception:
>>
>> (spark-dev) Lis-MacBook-Pro:spark icexelloss$ bin/pyspark
>>
>> Python 3.6.3 |Anaconda, Inc.| (default, Nov  8 2017, 18:10:31)
>>
>> [GCC 4.2.1 Compatible Clang 4.0.1 (tags/RELEASE_401/final)] on darwin
>>
>> Type "help", "copyright", "credits" or "license" for more information.
>>
>> 18/06/14 11:34:14 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>>
>> Using Spark's default log4j profile: org/apache/spark/log4j-
>> defaults.properties
>>
>> Setting default log level to "WARN".
>>
>> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
>> setLogLevel(newLevel).
>>
>> /Users/icexelloss/workspace/upstream2/spark/python/pyspark/shell.py:45:
>> UserWarning: Failed to initialize Spark session.
>>
>>   warnings.warn("Failed to initialize Spark session.")
>>
>> Traceback (most recent call last):
>>
>>   File "/Users/icexelloss/workspace/upstream2/spark/python/pyspark/shell.py",
>> line 41, in 
>>
>> spark = SparkSession._create_shell_session()
>>
>>   File 
>> "/Users/icexelloss/workspace/upstream2/spark/python/pyspark/sql/session.py",
>> line 564, in _create_shell_session
>>
>> SparkContext._jvm.org.apache.hadoop.hive.conf.HiveConf()
>>
>> TypeError: 'JavaPackage' object is not callable
>>
>> I also tried to delete hadoop deps from my ivy2 cache and reinstall them
>> but no luck. I wonder:
>>
>>
>>1. I have not seen this before, could this be caused by recent change
>>to head?
>>2. Am I doing something wrong in the build process?
>>
>>
>> Thanks much!
>> Li
>>
>>


Re: Missing HiveConf when starting PySpark from head

2018-06-14 Thread Li Jin
I can work around it by using:

bin/pyspark --conf spark.sql.catalogImplementation=in-memory

for now, but I still wonder what's going on with HiveConf...

On Thu, Jun 14, 2018 at 11:37 AM, Li Jin  wrote:

> Hey all,
>
> I just did a clean checkout of github.com/apache/spark but failed to
> start PySpark, this is what I did:
>
> git clone g...@github.com:apache/spark.git; cd spark; build/sbt package;
> bin/pyspark
>
> And got this exception:
>
> (spark-dev) Lis-MacBook-Pro:spark icexelloss$ bin/pyspark
>
> Python 3.6.3 |Anaconda, Inc.| (default, Nov  8 2017, 18:10:31)
>
> [GCC 4.2.1 Compatible Clang 4.0.1 (tags/RELEASE_401/final)] on darwin
>
> Type "help", "copyright", "credits" or "license" for more information.
>
> 18/06/14 11:34:14 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
>
> Using Spark's default log4j profile: org/apache/spark/log4j-
> defaults.properties
>
> Setting default log level to "WARN".
>
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
> setLogLevel(newLevel).
>
> /Users/icexelloss/workspace/upstream2/spark/python/pyspark/shell.py:45:
> UserWarning: Failed to initialize Spark session.
>
>   warnings.warn("Failed to initialize Spark session.")
>
> Traceback (most recent call last):
>
>   File "/Users/icexelloss/workspace/upstream2/spark/python/pyspark/shell.py",
> line 41, in 
>
> spark = SparkSession._create_shell_session()
>
>   File 
> "/Users/icexelloss/workspace/upstream2/spark/python/pyspark/sql/session.py",
> line 564, in _create_shell_session
>
> SparkContext._jvm.org.apache.hadoop.hive.conf.HiveConf()
>
> TypeError: 'JavaPackage' object is not callable
>
> I also tried to delete hadoop deps from my ivy2 cache and reinstall them
> but no luck. I wonder:
>
>
>1. I have not seen this before, could this be caused by recent change
>to head?
>2. Am I doing something wrong in the build process?
>
>
> Thanks much!
> Li
>
>


Missing HiveConf when starting PySpark from head

2018-06-14 Thread Li Jin
Hey all,

I just did a clean checkout of github.com/apache/spark but failed to start
PySpark. This is what I did:

git clone g...@github.com:apache/spark.git; cd spark; build/sbt package;
bin/pyspark

And got this exception:

(spark-dev) Lis-MacBook-Pro:spark icexelloss$ bin/pyspark

Python 3.6.3 |Anaconda, Inc.| (default, Nov  8 2017, 18:10:31)

[GCC 4.2.1 Compatible Clang 4.0.1 (tags/RELEASE_401/final)] on darwin

Type "help", "copyright", "credits" or "license" for more information.

18/06/14 11:34:14 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable

Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).

/Users/icexelloss/workspace/upstream2/spark/python/pyspark/shell.py:45:
UserWarning: Failed to initialize Spark session.

  warnings.warn("Failed to initialize Spark session.")

Traceback (most recent call last):

  File
"/Users/icexelloss/workspace/upstream2/spark/python/pyspark/shell.py", line
41, in 

spark = SparkSession._create_shell_session()

  File
"/Users/icexelloss/workspace/upstream2/spark/python/pyspark/sql/session.py",
line 564, in _create_shell_session

SparkContext._jvm.org.apache.hadoop.hive.conf.HiveConf()

TypeError: 'JavaPackage' object is not callable

I also tried to delete hadoop deps from my ivy2 cache and reinstall them
but no luck. I wonder:


   1. I have not seen this before, could this be caused by recent change to
   head?
   2. Am I doing something wrong in the build process?


Thanks much!
Li


Re: Optimizer rule ConvertToLocalRelation causes expressions to be eager-evaluated in Planning phase

2018-06-08 Thread Li Jin
Sorry, I am confused now... My UDF gets executed for each row anyway
(because I am doing withColumn and want to execute the UDF on each row).
The difference is that with the optimization "ConvertToLocalRelation" it
gets executed for each row on the driver in the optimization stage?

On Fri, Jun 8, 2018 at 3:57 PM, Herman van Hövell tot Westerflier <
her...@databricks.com> wrote:

> But that is still cheaper than executing that expensive UDF for each row
> in your dataset right?
>
> On Fri, Jun 8, 2018 at 9:51 PM Li Jin  wrote:
>
>> I see. Thanks for the clarification. It's not a a big issue but I am
>> surprised my UDF can be executed in planning phase. If my UDF is doing
>> something expensive it could get weird.
>>
>>
>>
>> On Fri, Jun 8, 2018 at 3:44 PM, Reynold Xin  wrote:
>>
>>> But from the user's perspective, optimization is not run right? So it is
>>> still lazy.
>>>
>>>
>>> On Fri, Jun 8, 2018 at 12:35 PM Li Jin  wrote:
>>>
>>>> Hi All,
>>>>
>>>> Sorry for the long email title. I am a bit surprised to find that the
>>>> current optimizer rule "ConvertToLocalRelation" causes expressions to be
>>>> eager-evaluated in planning phase, this can be demonstrated with the
>>>> following code:
>>>>
>>>> scala> val myUDF = udf((x: String) => { println("UDF evaled"); "result"
>>>> })
>>>>
>>>> myUDF: org.apache.spark.sql.expressions.UserDefinedFunction =
>>>> UserDefinedFunction(,StringType,Some(List(StringType)))
>>>>
>>>>
>>>> scala> val df = Seq(("test", 1)).toDF("s", "i").select(myUDF($"s"))
>>>>
>>>> df: org.apache.spark.sql.DataFrame = [UDF(s): string]
>>>>
>>>>
>>>> scala> println(df.queryExecution.optimizedPlan)
>>>>
>>>> UDF evaled
>>>>
>>>> LocalRelation [UDF(s)#9]
>>>>
>>>>  This is somewhat unexpected to me because of Spark's lazy execution
>>>> model.
>>>>
>>>> I am wondering if this behavior is by design?
>>>>
>>>> Thanks!
>>>> Li
>>>>
>>>>
>>>>
>>


Re: Optimizer rule ConvertToLocalRelation causes expressions to be eager-evaluated in Planning phase

2018-06-08 Thread Li Jin
I see. Thanks for the clarification. It's not a big issue, but I am
surprised my UDF can be executed in the planning phase. If my UDF is doing
something expensive it could get weird.



On Fri, Jun 8, 2018 at 3:44 PM, Reynold Xin  wrote:

> But from the user's perspective, optimization is not run right? So it is
> still lazy.
>
>
> On Fri, Jun 8, 2018 at 12:35 PM Li Jin  wrote:
>
>> Hi All,
>>
>> Sorry for the long email title. I am a bit surprised to find that the
>> current optimizer rule "ConvertToLocalRelation" causes expressions to be
>> eager-evaluated in planning phase, this can be demonstrated with the
>> following code:
>>
>> scala> val myUDF = udf((x: String) => { println("UDF evaled"); "result" })
>>
>> myUDF: org.apache.spark.sql.expressions.UserDefinedFunction =
>> UserDefinedFunction(,StringType,Some(List(StringType)))
>>
>>
>> scala> val df = Seq(("test", 1)).toDF("s", "i").select(myUDF($"s"))
>>
>> df: org.apache.spark.sql.DataFrame = [UDF(s): string]
>>
>>
>> scala> println(df.queryExecution.optimizedPlan)
>>
>> UDF evaled
>>
>> LocalRelation [UDF(s)#9]
>>
>>  This is somewhat unexpected to me because of Spark's lazy execution
>> model.
>>
>> I am wondering if this behavior is by design?
>>
>> Thanks!
>> Li
>>
>>
>>


Optimizer rule ConvertToLocalRelation causes expressions to be eager-evaluated in Planning phase

2018-06-08 Thread Li Jin
Hi All,

Sorry for the long email title. I am a bit surprised to find that the
current optimizer rule "ConvertToLocalRelation" causes expressions to be
eager-evaluated in the planning phase. This can be demonstrated with the
following code:

scala> val myUDF = udf((x: String) => { println("UDF evaled"); "result" })

myUDF: org.apache.spark.sql.expressions.UserDefinedFunction =
UserDefinedFunction(,StringType,Some(List(StringType)))


scala> val df = Seq(("test", 1)).toDF("s", "i").select(myUDF($"s"))

df: org.apache.spark.sql.DataFrame = [UDF(s): string]


scala> println(df.queryExecution.optimizedPlan)

UDF evaled

LocalRelation [UDF(s)#9]

 This is somewhat unexpected to me because of Spark's lazy execution model.

I am wondering if this behavior is by design?

Thanks!
Li


Re: MatrixUDT and VectorUDT in Spark ML

2018-05-31 Thread Li Jin
Please see https://issues.apache.org/jira/browse/SPARK-24258
On Wed, May 30, 2018 at 10:40 PM Dongjin Lee  wrote:

> How is this issue going? Is there any Jira ticket about this?
>
> Thanks,
> Dongjin
>
> On Sat, Mar 24, 2018 at 1:39 PM, Himanshu Mohan <
> himanshu.mo...@aexp.com.invalid> wrote:
>
>> I agree
>>
>>
>>
>>
>>
>>
>>
>> Thanks
>>
>> Himanshu
>>
>>
>>
>> *From:* Li Jin [mailto:ice.xell...@gmail.com]
>> *Sent:* Friday, March 23, 2018 8:24 PM
>> *To:* dev 
>> *Subject:* MatrixUDT and VectorUDT in Spark ML
>>
>>
>>
>> Hi All,
>>
>>
>>
>> I came across these two types MatrixUDT and VectorUDF in Spark ML when
>> doing feature extraction and preprocessing with PySpark. However, when
>> trying to do some basic operations, such as vector multiplication and
>> matrix multiplication, I had to go down to Python UDF.
>>
>>
>>
>> It seems to be it would be very useful to have built-in operators on
>> these types just like first class Spark SQL types, e.g.,
>>
>>
>>
>> df.withColumn('v', df.matrix_column * df.vector_column)
>>
>>
>>
>> I wonder what are other people's thoughts on this?
>>
>>
>>
>> Li
>>
>> --
>> American Express made the following annotations
>> --
>>
>> "This message and any attachments are solely for the intended recipient
>> and may contain confidential or privileged information. If you are not the
>> intended recipient, any disclosure, copying, use, or distribution of the
>> information included in this message and any attachments is prohibited. If
>> you have received this communication in error, please notify us by reply
>> e-mail and immediately and permanently delete this message and any
>> attachments. Thank you."
>>
>> American Express a ajouté le commentaire suivant le
>> Ce courrier et toute pièce jointe qu'il contient sont réservés au seul
>> destinataire indiqué et peuvent renfermer des renseignements confidentiels
>> et privilégiés. Si vous n'êtes pas le destinataire prévu, toute
>> divulgation, duplication, utilisation ou distribution du courrier ou de
>> toute pièce jointe est interdite. Si vous avez reçu cette communication par
>> erreur, veuillez nous en aviser par courrier et détruire immédiatement le
>> courrier et les pièces jointes. Merci.
>> --
>>
>>
>
>
> --
> *Dongjin Lee*
>
> *A hitchhiker in the mathematical world.*
>
> *github:  <http://goog_969573159/>github.com/dongjinleekr
> <http://github.com/dongjinleekr>linkedin: kr.linkedin.com/in/dongjinleekr
> <http://kr.linkedin.com/in/dongjinleekr>slideshare: 
> www.slideshare.net/dongjinleekr
> <http://www.slideshare.net/dongjinleekr>*
>


Re: [VOTE] Spark 2.3.1 (RC2)

2018-05-24 Thread Li Jin
I'd like to bring https://issues.apache.org/jira/browse/SPARK-24373 to
people's attention because it could be a regression from 2.2.

I will leave it to more experienced people to decide whether this should be
a blocker or not.

On Wed, May 23, 2018 at 12:54 PM, Marcelo Vanzin 
wrote:

> Sure. Also, I'd appreciate if these bugs were properly triaged and
> targeted, so that we could avoid creating RCs when we know there are
> blocking bugs that will prevent the RC vote from succeeding.
>
> On Wed, May 23, 2018 at 9:02 AM, Xiao Li  wrote:
> > -1
> >
> > Yeah, we should fix it in Spark 2.3.1.
> > https://issues.apache.org/jira/browse/SPARK-24257 is a correctness bug.
> The
> > PR can be merged soon. Thus, let us have another RC?
> >
> > Thanks,
> >
> > Xiao
> >
> >
> > 2018-05-23 8:04 GMT-07:00 chenliang613 :
> >>
> >> Hi
> >>
> >> Agree with Wenchen, it is better to fix this issue.
> >>
> >> Regards
> >> Liang
> >>
> >>
> >> cloud0fan wrote
> >> > We found a critical bug in tungsten that can lead to silent data
> >> > corruption: https://github.com/apache/spark/pull/21311
> >> >
> >> > This is a long-standing bug that starts with Spark 2.0(not a
> >> > regression),
> >> > but since we are going to release 2.3.1, I think it's a good chance to
> >> > include this fix.
> >> >
> >> > We will also backport this fix to Spark 2.0, 2.1, 2.2, and then we can
> >> > discuss if we should do a new release for 2.0, 2.1, 2.2 later.
> >> >
> >> > Thanks,
> >> > Wenchen
> >> >
> >> > On Wed, May 23, 2018 at 9:54 PM, Sean Owen 
> >>
> >> > srowen@
> >>
> >> >  wrote:
> >> >
> >> >> +1 Same result for me as with RC1.
> >> >>
> >> >>
> >> >> On Tue, May 22, 2018 at 2:45 PM Marcelo Vanzin 
> >>
> >> > vanzin@
> >>
> >> > 
> >> >> wrote:
> >> >>
> >> >>> Please vote on releasing the following candidate as Apache Spark
> >> >>> version
> >> >>> 2.3.1.
> >> >>>
> >> >>> The vote is open until Friday, May 25, at 20:00 UTC and passes if
> >> >>> at least 3 +1 PMC votes are cast.
> >> >>>
> >> >>> [ ] +1 Release this package as Apache Spark 2.3.1
> >> >>> [ ] -1 Do not release this package because ...
> >> >>>
> >> >>> To learn more about Apache Spark, please see
> http://spark.apache.org/
> >> >>>
> >> >>> The tag to be voted on is v2.3.1-rc2 (commit 93258d80):
> >> >>> https://github.com/apache/spark/tree/v2.3.1-rc2
> >> >>>
> >> >>> The release files, including signatures, digests, etc. can be found
> >> >>> at:
> >> >>> https://dist.apache.org/repos/dist/dev/spark/v2.3.1-rc2-bin/
> >> >>>
> >> >>> Signatures used for Spark RCs can be found in this file:
> >> >>> https://dist.apache.org/repos/dist/dev/spark/KEYS
> >> >>>
> >> >>> The staging repository for this release can be found at:
> >> >>>
> >> >>> https://repository.apache.org/content/repositories/
> orgapachespark-1270/
> >> >>>
> >> >>> The documentation corresponding to this release can be found at:
> >> >>> https://dist.apache.org/repos/dist/dev/spark/v2.3.1-rc2-docs/
> >> >>>
> >> >>> The list of bug fixes going into 2.3.1 can be found at the following
> >> >>> URL:
> >> >>> https://issues.apache.org/jira/projects/SPARK/versions/12342432
> >> >>>
> >> >>> FAQ
> >> >>>
> >> >>> =
> >> >>> How can I help test this release?
> >> >>> =
> >> >>>
> >> >>> If you are a Spark user, you can help us test this release by taking
> >> >>> an existing Spark workload and running on this release candidate,
> then
> >> >>> reporting any regressions.
> >> >>>
> >> >>> If you're working in PySpark you can set up a virtual env and
> install
> >> >>> the current RC and see if anything important breaks, in the
> Java/Scala
> >> >>> you can add the staging repository to your projects resolvers and
> test
> >> >>> with the RC (make sure to clean up the artifact cache before/after
> so
> >> >>> you don't end up building with a out of date RC going forward).
> >> >>>
> >> >>> ===
> >> >>> What should happen to JIRA tickets still targeting 2.3.1?
> >> >>> ===
> >> >>>
> >> >>> The current list of open tickets targeted at 2.3.1 can be found at:
> >> >>> https://s.apache.org/Q3Uo
> >> >>>
> >> >>> Committers should look at those and triage. Extremely important bug
> >> >>> fixes, documentation, and API tweaks that impact compatibility
> should
> >> >>> be worked on immediately. Everything else please retarget to an
> >> >>> appropriate release.
> >> >>>
> >> >>> ==
> >> >>> But my bug isn't fixed?
> >> >>> ==
> >> >>>
> >> >>> In order to make timely releases, we will typically not hold the
> >> >>> release unless the bug in question is a regression from the previous
> >> >>> release. That being said, if there is something which is a
> regression
> >> >>> that has not been correctly targeted please ping me or a committer
> to
> >> >>> help target the issue.
> >> >>>
> >> >>>

[DISCUSS] PySpark Window UDF

2018-05-16 Thread Li Jin
Hi All,

I have been looking into leveraging the Arrow and Pandas UDF work we have
done so far for Window UDF in PySpark. I have done some investigation and
believe there is a way to do PySpark window UDFs efficiently.

The basic idea is instead of passing each window to Python separately, we
can pass a "batch of windows" as an Arrow Batch of rows + begin/end indices
for each window (indices are computed on the Java side), and then rolling
over the begin/end indices in Python and applies the UDF.

I have written my investigation in more details here:
https://docs.google.com/document/d/14EjeY5z4-NC27-SmIP9CsMPCANeTcvxN44a7SIJtZPc/edit#

I think this is a pretty promising approach and I hope to get some feedback
from the community. Let's discuss! :)

Li


Re: Clarify window behavior in Spark SQL

2018-04-03 Thread Li Jin
Thanks all for the explanation. I am happy to update the API doc.

https://issues.apache.org/jira/browse/SPARK-23861

On Tue, Apr 3, 2018 at 8:54 PM, Reynold Xin <r...@databricks.com> wrote:

> Ah ok. Thanks for commenting. Everyday I learn something new about SQL.
>
> For others to follow, SQL Server has a good explanation of the behavior:
> https://docs.microsoft.com/en-us/sql/t-sql/queries
> /select-over-clause-transact-sql
>
>
> Can somebody (Li?) update the API documentation to specify the gotchas, in
> case users are not familiar with SQL window function semantics?
>
>
>
> General Remarks
> <https://docs.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql#general-remarks>
>
> More than one window function can be used in a single query with a single
> FROM clause. The OVER clause for each function can differ in partitioning
> and ordering.
>
> If PARTITION BY is not specified, the function treats all rows of the
> query result set as a single group.
> Important!
> <https://docs.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql#important>
>
> If ROWS/RANGE is specified and  is used for
>  (short syntax) then this specification is used for
> the window frame boundary starting point and CURRENT ROW is used for the
> boundary ending point. For example “ROWS 5 PRECEDING” is equal to “ROWS
> BETWEEN 5 PRECEDING AND CURRENT ROW”.
>
> Note+
>
> If ORDER BY is not specified entire partition is used for a window frame.
> This applies only to functions that do not require ORDER BY clause. If
> ROWS/RANGE is not specified but ORDER BY is specified, RANGE UNBOUNDED
> PRECEDING AND CURRENT ROW is used as default for window frame. This applies
> only to functions that have can accept optional ROWS/RANGE specification.
> For example, ranking functions cannot accept ROWS/RANGE, therefore this
> window frame is not applied even though ORDER BY is present and ROWS/RANGE
> is not.
>
>
>
>
>
> On Tue, Apr 3, 2018 at 5:50 PM, Xingbo Jiang <jiangxb1...@gmail.com>
> wrote:
>
>> This is actually by design, without a `ORDER BY` clause, all rows are
>> considered as the peer row of the current row, which means that the frame
>> is effectively the entire partition. This behavior follows the window
>> syntax of PGSQL.
>> You can refer to the comment by yhuai: https://github.com/apac
>> he/spark/pull/5604#discussion_r157931911
>> :)
>>
>> 2018-04-04 6:27 GMT+08:00 Reynold Xin <r...@databricks.com>:
>>
>>> Do other (non-Hive) SQL systems do the same thing?
>>>
>>> On Tue, Apr 3, 2018 at 3:16 PM, Herman van Hövell tot Westerflier <
>>> her...@databricks.com> wrote:
>>>
>>>> This is something we inherited from Hive: https://cwiki.apache.org
>>>> /confluence/display/Hive/LanguageManual+WindowingAndAnalytics
>>>>
>>>> When ORDER BY is specified with missing WINDOW clause, the WINDOW
>>>>> specification defaults to RANGE BETWEEN UNBOUNDED PRECEDING AND
>>>>> CURRENT ROW.
>>>>
>>>> When both ORDER BY and WINDOW clauses are missing, the WINDOW
>>>>> specification defaults to ROW BETWEEN UNBOUNDED PRECEDING AND
>>>>> UNBOUNDED FOLLOWING.
>>>>
>>>>
>>>> It sort of makes sense if you think about it. If there is no ordering
>>>> there is no way to have a bound frame. If there is ordering we default to
>>>> the most commonly used deterministic frame.
>>>>
>>>>
>>>> On Tue, Apr 3, 2018 at 11:09 PM, Reynold Xin <r...@databricks.com>
>>>> wrote:
>>>>
>>>>> Seems like a bug.
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Apr 3, 2018 at 1:26 PM, Li Jin <ice.xell...@gmail.com> wrote:
>>>>>
>>>>>> Hi Devs,
>>>>>>
>>>>>> I am seeing some behavior with window functions that is a bit
>>>>>> unintuitive and would like to get some clarification.
>>>>>>
>>>>>> When using aggregation function with window, the frame boundary seems
>>>>>> to change depending on the order of the window.
>>>>>>
>>>>>> Example:
>>>>>> (1)
>>>>>>
>>>>>> df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')
>>>>>>
>>>>>> w1 = Window.partitionBy('id')
>>>>>>
>>>>>> df.withColumn('v2', mean(df.v).over(w1)).show()
>>>>>

Re: Clarify window behavior in Spark SQL

2018-04-03 Thread Li Jin
Here is the original code and comments:
https://github.com/apache/spark/commit/b6b50efc854f298d5b3e11c05dca995a85bec962#diff-4a8f00ca33a80744965463dcc6662c75L277

Seems this is intentional, although I am not really sure why - maybe to
match other SQL systems' behavior?

On Tue, Apr 3, 2018 at 5:09 PM, Reynold Xin <r...@databricks.com> wrote:

> Seems like a bug.
>
>
>
> On Tue, Apr 3, 2018 at 1:26 PM, Li Jin <ice.xell...@gmail.com> wrote:
>
>> Hi Devs,
>>
>> I am seeing some behavior with window functions that is a bit unintuitive
>> and would like to get some clarification.
>>
>> When using aggregation function with window, the frame boundary seems to
>> change depending on the order of the window.
>>
>> Example:
>> (1)
>>
>> df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')
>>
>> w1 = Window.partitionBy('id')
>>
>> df.withColumn('v2', mean(df.v).over(w1)).show()
>>
>> +---+---+---+
>>
>> | id|  v| v2|
>>
>> +---+---+---+
>>
>> |  0|  1|2.0|
>>
>> |  0|  2|2.0|
>>
>> |  0|  3|2.0|
>>
>> +---+---+---+
>>
>> (2)
>> df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')
>>
>> w2 = Window.partitionBy('id').orderBy('v')
>>
>> df.withColumn('v2', mean(df.v).over(w2)).show()
>>
>> +---+---+---+
>>
>> | id|  v| v2|
>>
>> +---+---+---+
>>
>> |  0|  1|1.0|
>>
>> |  0|  2|1.5|
>>
>> |  0|  3|2.0|
>>
>> +---+---+---+
>>
>> Seems like orderBy('v') in the example (2) also changes the frame
>> boundaries from (
>>
>> unboundedPreceding, unboundedFollowing) to (unboundedPreceding,
>> currentRow).
>>
>>
>> I found this behavior a bit unintuitive. I wonder if this behavior is by
>> design and if so, what's the specific rule that orderBy() interacts with
>> frame boundaries?
>>
>>
>> Thanks,
>>
>> Li
>>
>>
>


Clarify window behavior in Spark SQL

2018-04-03 Thread Li Jin
Hi Devs,

I am seeing some behavior with window functions that is a bit unintuitive
and would like to get some clarification.

When using aggregation function with window, the frame boundary seems to
change depending on the order of the window.

Example:
(1)

df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')

w1 = Window.partitionBy('id')

df.withColumn('v2', mean(df.v).over(w1)).show()

+---+---+---+

| id|  v| v2|

+---+---+---+

|  0|  1|2.0|

|  0|  2|2.0|

|  0|  3|2.0|

+---+---+---+

(2)
df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')

w2 = Window.partitionBy('id').orderBy('v')

df.withColumn('v2', mean(df.v).over(w2)).show()

+---+---+---+

| id|  v| v2|

+---+---+---+

|  0|  1|1.0|

|  0|  2|1.5|

|  0|  3|2.0|

+---+---+---+

Seems like orderBy('v') in the example (2) also changes the frame boundaries
from (unboundedPreceding, unboundedFollowing) to (unboundedPreceding, currentRow).


I found this behavior a bit unintuitive. I wonder if this behavior is by
design and, if so, what is the specific rule for how orderBy() interacts with
frame boundaries?


Thanks,

Li
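
One practical takeaway from this thread: if the intent is an aggregate over the whole
partition while still ordering rows within it, the frame can be spelled out explicitly
so that the ORDER BY default does not kick in. A minimal sketch using the standard
PySpark window API:

from pyspark.sql import SparkSession
from pyspark.sql.functions import mean
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')

# Ordered window with an explicit unbounded frame, so the mean is computed
# over the whole partition instead of (unboundedPreceding, currentRow).
w = (Window.partitionBy('id')
           .orderBy('v')
           .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

df.withColumn('v2', mean(df.v).over(w)).show()
# v2 is 2.0 for every row, matching the unordered window in example (1).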


MatrixUDT and VectorUDT in Spark ML

2018-03-23 Thread Li Jin
Hi All,

I came across these two types, MatrixUDT and VectorUDT, in Spark ML when
doing feature extraction and preprocessing with PySpark. However, when
trying to do some basic operations, such as vector multiplication and
matrix multiplication, I had to go down to a Python UDF.

It seems to me it would be very useful to have built-in operators on these
types, just like first-class Spark SQL types, e.g.,

df.withColumn('v', df.matrix_column * df.vector_column)

I wonder what are other people's thoughts on this?

Li
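
For completeness, this is roughly the kind of Python UDF fallback described above. It
is a sketch only: the column names and data are assumptions, and dropping down to numpy
per row is exactly the overhead that built-in operators on these types would avoid.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Matrices, Vectors, VectorUDT

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(Matrices.dense(2, 2, [1.0, 0.0, 0.0, 2.0]), Vectors.dense([3.0, 4.0]))],
    ['matrix_column', 'vector_column'])

# Matrix-vector multiplication has to go through a Python UDF and numpy today.
mat_vec_multiply = udf(
    lambda m, v: Vectors.dense(m.toArray().dot(v.toArray())),
    VectorUDT())

df.withColumn('v', mat_vec_multiply('matrix_column', 'vector_column')).show(truncate=False)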


Re: Time Series Functionality with Spark

2018-03-12 Thread Li Jin
Thanks for the pointer!

On Mon, Mar 12, 2018 at 1:40 PM, Sean Owen <sro...@gmail.com> wrote:

> (There was also https://github.com/sryza/spark-timeseries -- might be
> another point of reference for you.)
>
> On Mon, Mar 12, 2018 at 10:33 AM Li Jin <ice.xell...@gmail.com> wrote:
>
>> Hi All,
>>
>> This is Li Jin. We (me and my fellow colleagues at Two Sigma) have been
>> using Spark for time series analysis for the past two years and it has been
>> a success to scale up our time series analysis.
>>
>> Recently, we start a conversation with Reynold about potential
>> opportunities to collaborate and improve time series functionalities with
>> Spark in general. For that, first we'd like to get some feedbacks from the
>> community w.r.t. requirements for time series analysis.
>>
>> I have created a doc here to cover the what I think are the most common
>> patterns for time series analysis:
>> https://docs.google.com/document/d/1j69F4LLjetPykdfzedDN-EmP2f-I-
>> 5MPvVY4fugp2xs/edit?usp=sharing
>>
>> If you have needs/use cases for time series analysis with Spark, we'd
>> appreciate your feedback.
>>
>> Thank you
>>
>


Time Series Functionality with Spark

2018-03-12 Thread Li Jin
Hi All,

This is Li Jin. My colleagues at Two Sigma and I have been
using Spark for time series analysis for the past two years, and it has been
a success in scaling up our time series analysis.

Recently, we started a conversation with Reynold about potential
opportunities to collaborate and improve time series functionality in
Spark in general. For that, we'd first like to get some feedback from the
community w.r.t. requirements for time series analysis.

I have created a doc here to cover what I think are the most common
patterns for time series analysis:
https://docs.google.com/document/d/1j69F4LLjetPykdfzedDN-EmP2f-I-5MPvVY4fugp2xs/edit?usp=sharing

If you have needs/use cases for time series analysis with Spark, we'd
appreciate your feedback.

Thank you


Re: Welcoming some new committers

2018-03-02 Thread Li Jin
Congrats!
On Fri, Mar 2, 2018 at 5:49 PM Holden Karau  wrote:

> Congratulations and welcome everyone! So excited to see the project grow
> our committer base.
>
> On Mar 2, 2018 2:42 PM, "Reynold Xin"  wrote:
>
>> Congrats and welcome!
>>
>>
>> On Fri, Mar 2, 2018 at 10:41 PM, Matei Zaharia 
>> wrote:
>>
>>> Hi everyone,
>>>
>>> The Spark PMC has recently voted to add several new committers to the
>>> project, based on their contributions to Spark 2.3 and other past work:
>>>
>>> - Anirudh Ramanathan (contributor to Kubernetes support)
>>> - Bryan Cutler (contributor to PySpark and Arrow support)
>>> - Cody Koeninger (contributor to streaming and Kafka support)
>>> - Erik Erlandson (contributor to Kubernetes support)
>>> - Matt Cheah (contributor to Kubernetes support and other parts of Spark)
>>> - Seth Hendrickson (contributor to MLlib and PySpark)
>>>
>>> Please join me in welcoming Anirudh, Bryan, Cody, Erik, Matt and Seth as
>>> committers!
>>>
>>> Matei
>>> -
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>
>>>
>>


[SPIP] as-of join in Spark SQL

2018-01-03 Thread Li Jin
Hi community,

Following the instructions on https://spark.apache.org/improvement-proposals.html,
I'd like to propose a SPIP: as-of join in Spark SQL.

Here is the Jira:
https://issues.apache.org/jira/browse/SPARK-22947

If you are interested, please take a look and let me know what you think. I
am looking forward to your feedback.

Thank you
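
For readers unfamiliar with the term, pandas' merge_asof gives the flavor of an as-of
join: each left row is matched with the most recent right row whose key is at or before
the left key. Whether the SPIP follows exactly these semantics is described in the
JIRA, so treat this as an illustration only.

import pandas as pd

trades = pd.DataFrame({
    "time": pd.to_datetime(["2018-01-01 09:30:00.023", "2018-01-01 09:30:00.038"]),
    "price": [51.95, 52.00],
})
quotes = pd.DataFrame({
    "time": pd.to_datetime(["2018-01-01 09:30:00.015", "2018-01-01 09:30:00.030"]),
    "bid": [51.90, 51.92],
})

# Both frames must be sorted on the key; each trade picks up the latest quote
# at or before its timestamp.
print(pd.merge_asof(trades, quotes, on="time"))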


Re: Spark Data Frame. PreSorded partitions

2017-12-04 Thread Li Jin
Sorry, s/ordered distributed/ordered distribution/g

On Mon, Dec 4, 2017 at 10:37 AM, Li Jin <ice.xell...@gmail.com> wrote:

> Just to give another data point: most of the data we use with Spark are
> sorted on disk, having a way to allow data source to pass ordered
> distributed to DataFrames is really useful for us.
>
> On Mon, Dec 4, 2017 at 9:12 AM, Николай Ижиков <nizhikov@gmail.com>
> wrote:
>
>> Hello, guys.
>>
>> Thank you for answers!
>>
>> > I think pushing down a sort  could make a big difference.
>> > You can however proposes to the data source api 2 to be included.
>>
>> Jörn, are you talking about this jira issue? -
>> https://issues.apache.org/jira/browse/SPARK-15689
>> Is there any additional documentation I has to learn before making any
>> proposition?
>>
>>
>>
>> 04.12.2017 14:05, Holden Karau пишет:
>>
>>> I think pushing down a sort (or really more in the case where the data
>>> is already naturally returned in sorted order on some column) could make a
>>> big difference. Probably the simplest argument for a lot of time being
>>> spent sorting (in some use cases) is the fact it's still one of the
>>> standard benchmarks.
>>>
>>> On Mon, Dec 4, 2017 at 1:55 AM, Jörn Franke <jornfra...@gmail.com
>>> <mailto:jornfra...@gmail.com>> wrote:
>>>
>>> I do not think that the data source api exposes such a thing. You
>>> can however proposes to the data source api 2 to be included.
>>>
>>> However there are some caveats , because sorted can mean two
>>> different things (weak vs strict order).
>>>
>>> Then, is really a lot of time lost because of sorting? The best
>>> thing is to not read data that is not needed at all (see min/max indexes in
>>> orc/parquet or bloom filters in Orc). What is not read
>>> does not need to be sorted. See also predicate pushdown.
>>>
>>>  > On 4. Dec 2017, at 07:50, Николай Ижиков <nizhikov@gmail.com
>>> <mailto:nizhikov@gmail.com>> wrote:
>>>  >
>>>  > Cross-posting from @user.
>>>  >
>>>  > Hello, guys!
>>>  >
>>>  > I work on implementation of custom DataSource for Spark Data
>>> Frame API and have a question:
>>>  >
>>>  > If I have a `SELECT * FROM table1 ORDER BY some_column` query I
>>> can sort data inside a partition in my data source.
>>>  >
>>>  > Do I have a built-in option to tell spark that data from each
>>> partition already sorted?
>>>  >
>>>  > It seems that Spark can benefit from usage of already sorted
>>> partitions.
>>>  > By using of distributed merge sort algorithm, for example.
>>>  >
>>>  > Does it make sense for you?
>>>  >
>>>  >
>>>  > 28.11.2017 18:42, Michael Artz пишет:
>>>  >> I'm not sure other than retrieving from a hive table that is
>>> already sorted.  This sounds cool though, would be interested to know this
>>> as well
>>>  >> On Nov 28, 2017 10:40 AM, "Николай Ижиков" <
>>> nizhikov@gmail.com <mailto:nizhikov@gmail.com> >> nizhikov@gmail.com <mailto:nizhikov@gmail.com>>> wrote:
>>>  >>Hello, guys!
>>>  >>I work on implementation of custom DataSource for Spark Data
>>> Frame API and have a question:
>>>  >>If I have a `SELECT * FROM table1 ORDER BY some_column` query
>>> I can sort data inside a partition in my data source.
>>>  >>Do I have a built-in option to tell spark that data from each
>>> partition already sorted?
>>>  >>It seems that Spark can benefit from usage of already sorted
>>> partitions.
>>>  >>By using of distributed merge sort algorithm, for example.
>>>  >>Does it make sense for you?
>>>  >>
>>> -
>>>  >>To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> <mailto:user-unsubscr...@spark.apache.org> >> user-unsubscr...@spark.apache.org <mailto:user-unsubscribe@spark
>>> .apache.org>>
>>>  >
>>>  > 
>>> -
>>>  > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org >> dev-unsubscr...@spark.apache.org>
>>>  >
>>>
>>> 
>>> -
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org >> dev-unsubscr...@spark.apache.org>
>>>
>>>
>>>
>>>
>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>
>


Re: Spark Data Frame. PreSorded partitions

2017-12-04 Thread Li Jin
Just to give another data point: most of the data we use with Spark are
sorted on disk, having a way to allow data source to pass ordered
distributed to DataFrames is really useful for us.

On Mon, Dec 4, 2017 at 9:12 AM, Николай Ижиков 
wrote:

> Hello, guys.
>
> Thank you for answers!
>
> > I think pushing down a sort  could make a big difference.
> > You can however proposes to the data source api 2 to be included.
>
> Jörn, are you talking about this jira issue? -
> https://issues.apache.org/jira/browse/SPARK-15689
> Is there any additional documentation I has to learn before making any
> proposition?
>
>
>
> 04.12.2017 14:05, Holden Karau пишет:
>
>> I think pushing down a sort (or really more in the case where the data is
>> already naturally returned in sorted order on some column) could make a big
>> difference. Probably the simplest argument for a lot of time being spent
>> sorting (in some use cases) is the fact it's still one of the standard
>> benchmarks.
>>
>> On Mon, Dec 4, 2017 at 1:55 AM, Jörn Franke > > wrote:
>>
>> I do not think that the data source api exposes such a thing. You can
>> however proposes to the data source api 2 to be included.
>>
>> However there are some caveats , because sorted can mean two
>> different things (weak vs strict order).
>>
>> Then, is really a lot of time lost because of sorting? The best thing
>> is to not read data that is not needed at all (see min/max indexes in
>> orc/parquet or bloom filters in Orc). What is not read
>> does not need to be sorted. See also predicate pushdown.
>>
>>  > On 4. Dec 2017, at 07:50, Николай Ижиков > > wrote:
>>  >
>>  > Cross-posting from @user.
>>  >
>>  > Hello, guys!
>>  >
>>  > I work on implementation of custom DataSource for Spark Data Frame
>> API and have a question:
>>  >
>>  > If I have a `SELECT * FROM table1 ORDER BY some_column` query I
>> can sort data inside a partition in my data source.
>>  >
>>  > Do I have a built-in option to tell spark that data from each
>> partition already sorted?
>>  >
>>  > It seems that Spark can benefit from usage of already sorted
>> partitions.
>>  > By using of distributed merge sort algorithm, for example.
>>  >
>>  > Does it make sense for you?
>>  >
>>  >
>>  > 28.11.2017 18:42, Michael Artz пишет:
>>  >> I'm not sure other than retrieving from a hive table that is
>> already sorted.  This sounds cool though, would be interested to know this
>> as well
>>  >> On Nov 28, 2017 10:40 AM, "Николай Ижиков" <
>> nizhikov@gmail.com  > nizhikov@gmail.com >> wrote:
>>  >>Hello, guys!
>>  >>I work on implementation of custom DataSource for Spark Data
>> Frame API and have a question:
>>  >>If I have a `SELECT * FROM table1 ORDER BY some_column` query
>> I can sort data inside a partition in my data source.
>>  >>Do I have a built-in option to tell spark that data from each
>> partition already sorted?
>>  >>It seems that Spark can benefit from usage of already sorted
>> partitions.
>>  >>By using of distributed merge sort algorithm, for example.
>>  >>Does it make sense for you?
>>  >>
>> -
>>  >>To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>  > .apache.org >
>>  >
>>  > 
>> -
>>  > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org > dev-unsubscr...@spark.apache.org>
>>  >
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org > dev-unsubscr...@spark.apache.org>
>>
>>
>>
>>
>> --
>> Twitter: https://twitter.com/holdenkarau
>>
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: [discuss][PySpark] Can we drop support old Pandas (<0.19.2) or what version should we support?

2017-11-14 Thread Li Jin
I think this makes sense. PySpark/Pandas interops in 2.3 are new anyway, so I
don't think we need to support the new functionality with older versions of
pandas (Takuya's reason 3).

One thing I am not sure about is how complicated it is to support pandas < 0.19.2
with the old non-Arrow interops and require pandas >= 0.19.2 for the new Arrow
interops. Maybe it makes sense to allow users to keep using their PySpark code
if they don't want to use any of the new stuff. If this is still
complicated, I would lean towards not supporting < 0.19.2.


On Tue, Nov 14, 2017 at 6:04 AM, Hyukjin Kwon  wrote:

> +0 to drop it as I said in the PR. I am seeing It brings a lot of hard
> time to get the cool changes through, and is slowing down them to get
> pushed.
>
> My only worry is, users who depends on lower pandas versions (Pandas
> 0.19.2 seems released less then a year before. In the similar time, Spark
> 2.1.0 was released).
>
> If this worry is less than I expected, I definitely support it. It should
> speed up those cool changes.
>
>
> On 14 Nov 2017 7:14 pm, "Takuya UESHIN"  wrote:
>
> Hi all,
>
> I'd like to raise a discussion about Pandas version.
> Originally we are discussing it at https://github.com/apache/s
> park/pull/19607 but we'd like to ask for feedback from community.
>
>
> Currently we don't explicitly specify the Pandas version we are supporting
> but we need to decide what version we should support because:
>
>   - There have been a number of API evolutions around extension dtypes
> that make supporting pandas 0.18.x and lower challenging.
>
>   - Sometimes Pandas older than 0.19.2 doesn't handle timestamp values
> properly. We want to provide properer support for timestamp values.
>
>   - If users want to use vectorized UDFs, or toPandas / createDataFrame
> from Pandas DataFrame with Arrow which will be released in Spark 2.3, users
> have to upgrade Pandas 0.19.2 or upper anyway because we need pyarrow
> internally, which supports only 0.19.2 or upper.
>
>
> The point I'd like to ask is:
>
> Can we drop support old Pandas (<0.19.2)?
> If not, what version should we support?
>
>
> References:
>
> - vectorized UDF
>   - https://github.com/apache/spark/pull/18659
>   - https://github.com/apache/spark/pull/18732
> - toPandas with Arrow
>   - https://github.com/apache/spark/pull/18459
> - createDataFrame from pandas DataFrame with Arrow
>   - https://github.com/apache/spark/pull/19646
>
>
> Any comments are welcome!
>
> Thanks.
>
> --
> Takuya UESHIN
> Tokyo, Japan
>
> http://twitter.com/ueshin
>
>
>