[jira] [Created] (SPARK-34259) Reading a partitioned dataset with a partition value of NOW causes the value to be parsed as a timestamp.

2021-01-27 Thread Chris Martin (Jira)
Chris Martin created SPARK-34259:


 Summary: Reading a partitioned dataset with a partition value of 
NOW causes the value to be parsed as a timestamp.
 Key: SPARK-34259
 URL: https://issues.apache.org/jira/browse/SPARK-34259
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.1
Reporter: Chris Martin


*Problem*

Reading a partitioned dataset where one of the column values matches a special 
datetime value (NOW, TODAY, etc.) causes the value to be interpreted as a 
timestamp rather than a string. 

*Example Code (Scala)*
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object TestBug {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    val df = spark.range(1, 2).withColumn("partition", lit("NOW"))
    df.write.mode("overwrite").partitionBy("partition").parquet("bug")

    spark.read.parquet("bug").show(truncate = false)
  }

}
{code}
 The above program prints out:
{noformat}
+---+--------------------------+
|id |partition                 |
+---+--------------------------+
|1  |2021-01-27 08:53:23.650039|
+---+--------------------------+
{noformat}
 

*Analysis*

This happens because in PartitioningUtils.inferPartitionColumnValue we try to 
cast the partition value to a timestamp in order to determine whether timestamp 
is a valid interpretation.  As NOW etc. are literals which are valid to cast to 
timestamps, the code ends up interpreting the value as a timestamp.

I think what we want to do here is change 
PartitioningUtils.inferPartitionColumnValue so that, when it attempts the 
timestamp interpretation, it ignores the special values. This looks difficult 
to do if we continue to use cast, but one other option is to add an option to 
DateTimeUtils.stringToDate telling it to ignore special values, and instead use 
that to do the conversion.
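
A rough sketch of the idea (illustrative only, not the actual Spark code; the 
helper name and the set of special literals here are assumptions):
{code:java}
import java.sql.Timestamp
import scala.util.Try

// Hypothetical guard: treat the special datetime literals as plain strings
// instead of letting a timestamp cast succeed on them during inference.
val specialLiterals = Set("epoch", "now", "today", "yesterday", "tomorrow")

def inferTimestamp(raw: String): Option[Timestamp] = {
  if (specialLiterals.contains(raw.trim.toLowerCase)) {
    None // "NOW" etc. stay as strings
  } else {
    Try(Timestamp.valueOf(raw)).toOption // plain parse, no special values
  }
}
{code}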






[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs

2019-06-17 Thread Chris Martin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865433#comment-16865433
 ] 

Chris Martin commented on SPARK-27463:
--

Sounds good to me too.

> Support Dataframe Cogroup via Pandas UDFs 
> --
>
> Key: SPARK-27463
> URL: https://issues.apache.org/jira/browse/SPARK-27463
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Chris Martin
>Priority: Major
>
> Recent work on Pandas UDFs in Spark has allowed for improved 
> interoperability between Pandas and Spark.  This proposal aims to extend this 
> by introducing a new Pandas UDF type which would allow for a cogroup 
> operation to be applied to two PySpark DataFrames.
> Full details are in the Google document linked below.
>  






[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs

2019-06-13 Thread Chris Martin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862828#comment-16862828
 ] 

Chris Martin commented on SPARK-27463:
--

Hi [~hyukjin.kwon]

Ah, I see your concern now.  I think it's fair to say that the cogrouping 
functionality proposed has no analogous API in Pandas.  In my opinion that's 
understandable, as Pandas is fundamentally a library for manipulating local 
data, so the problems of colocating multiple DataFrames don't apply as they do 
in Spark.  That said, the inspiration behind the proposed API is clearly that 
of the Pandas groupby().apply(), so I'd argue it is not without precedent.

I think the more direct comparison here is with the existing Dataset cogroup, 
where the high-level functionality is almost exactly the same (partition two 
distinct DataFrames such that partitions are cogrouped, and apply a flatmap 
operation over them), with the differences being in the cogroup key definition 
(typed for Datasets, untyped for pandas-udf), the input (Iterables for 
Datasets, Pandas DataFrames for pandas-udf) and the output (Iterable for 
Datasets, Pandas DataFrame for pandas-udf). At this point one might observe 
that we have two different language-specific implementations of the same 
high-level functionality.  This is true, however it's been the case since the 
introduction of Pandas UDFs (see groupBy().apply() vs 
groupByKey().flatMapGroups()) and is IMHO a good thing; it allows us to provide 
functionality that plays to the strengths of each individual language, given 
that what is simple and idiomatic in Python is not in Scala, and vice versa.
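
For reference, the existing typed Scala API looks roughly like this (a minimal 
sketch; the case classes, the clicks/purchases Datasets and the active 
SparkSession `spark` are made up for illustration):
{code:java}
import spark.implicits._

case class Click(id: Long, url: String)
case class Purchase(id: Long, amount: Double)

// Existing typed Dataset cogroup: the function receives the key plus an
// Iterator of rows from each side, rather than two materialised DataFrames.
val joined = clicks.groupByKey(_.id)
  .cogroup(purchases.groupByKey(_.id)) { (id, clickRows, purchaseRows) =>
    Iterator((id, clickRows.size, purchaseRows.map(_.amount).sum))
  }
{code}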

If, considering this, we agree that this cogroup functionality is both useful 
and suitable for exposing via a Pandas UDF (and I hope we do, but please say if 
you disagree), the question now becomes what we would like the API to be. At 
this point let's consider the API as currently proposed in the design doc.

 
{code:java}
result = df1.cogroup(df2, on='id').apply(my_pandas_udf)
{code}

This API is concise and consistent with existing groupby.apply().  The 
disadvantage is that it isn’t consistent with Dataset’s cogroup and, as this 
API doesn’t exist in Pandas, it can’t be consistent with that (although I would 
argue that if Pandas did introduce such an API it would look a lot like this).

The alternative would be to implement something on RelationalGroupedDataset as 
described by Li in the post above (I think we can discount something based on 
KeyValueGroupedDataset as, if my reading of the code is correct, that would 
only apply to typed APIs, which this isn't).  The big advantage here is that 
this is much more consistent with the existing Dataset cogroup.  On the flip 
side it comes at the cost of a little more verbosity and IMHO is a little less 
Pythonic/in the style of Pandas.  That being the case, I'm slightly in favour 
of the API as currently proposed in the design doc, but am happy to be swayed 
to something else if the majority have a different opinion.

> Support Dataframe Cogroup via Pandas UDFs 
> --
>
> Key: SPARK-27463
> URL: https://issues.apache.org/jira/browse/SPARK-27463
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Chris Martin
>Priority: Major
>
> Recent work on Pandas UDFs in Spark has allowed for improved 
> interoperability between Pandas and Spark.  This proposal aims to extend this 
> by introducing a new Pandas UDF type which would allow for a cogroup 
> operation to be applied to two PySpark DataFrames.
> Full details are in the Google document linked below.
>  






[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs

2019-06-12 Thread Chris Martin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861995#comment-16861995
 ] 

Chris Martin commented on SPARK-27463:
--

Also my assumption is that the most difficult part of this is extending the 
UDF functionality such that multiple DataFrames can be passed as arguments to 
a given UDF.  I have a fairly rough design proposal for how this might be 
achieved.  Once it has been refined slightly I'll post it up so that people 
can comment. 

> Support Dataframe Cogroup via Pandas UDFs 
> --
>
> Key: SPARK-27463
> URL: https://issues.apache.org/jira/browse/SPARK-27463
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Chris Martin
>Priority: Major
>
> Recent work on Pandas UDFs in Spark has allowed for improved 
> interoperability between Pandas and Spark.  This proposal aims to extend this 
> by introducing a new Pandas UDF type which would allow for a cogroup 
> operation to be applied to two PySpark DataFrames.
> Full details are in the Google document linked below.
>  






[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs

2019-06-12 Thread Chris Martin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861993#comment-16861993
 ] 

Chris Martin commented on SPARK-27463:
--

Hi [~hyukjin.kwon] - I've just started working on the code side of this (as an 
aside, I seem unable to assign this Jira to myself - do you know how I can do 
this?). 

Regarding your questions - I don't think there is an analogous API in Pandas, 
although perhaps [~icexelloss] knows of one.  In terms of comparison to the 
Dataset cogroup there are obviously a number of similarities, but the biggest 
difference is that in the Scala version you end up operating on a couple of 
Scala Iterators, whereas in this proposal you would operate on a couple of 
Pandas DataFrames.  This means that the Scala version doesn't necessarily need 
to be able to store the entire cogroup in memory, but on the other hand gives 
you a much less rich data structure (a Scala Iterator as opposed to a Pandas 
DataFrame).  I think this distinction is basically analogous to that between 
the Python groupby().apply() and the Scala groupByKey().flatMapGroups(). In 
each case you end up operating on a data structure which is more in keeping 
with the language at hand. 

> Support Dataframe Cogroup via Pandas UDFs 
> --
>
> Key: SPARK-27463
> URL: https://issues.apache.org/jira/browse/SPARK-27463
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Chris Martin
>Priority: Major
>
> Recent work on Pandas UDFs in Spark has allowed for improved 
> interoperability between Pandas and Spark.  This proposal aims to extend this 
> by introducing a new Pandas UDF type which would allow for a cogroup 
> operation to be applied to two PySpark DataFrames.
> Full details are in the Google document linked below.
>  






[jira] [Updated] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs

2019-05-18 Thread Chris Martin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Martin updated SPARK-27463:
-
Summary: Support Dataframe Cogroup via Pandas UDFs   (was: SPIP: Support 
Dataframe Cogroup via Pandas UDFs )

> Support Dataframe Cogroup via Pandas UDFs 
> --
>
> Key: SPARK-27463
> URL: https://issues.apache.org/jira/browse/SPARK-27463
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Chris Martin
>Priority: Major
>
> Recent work on Pandas UDFs in Spark has allowed for improved 
> interoperability between Pandas and Spark.  This proposal aims to extend this 
> by introducing a new Pandas UDF type which would allow for a cogroup 
> operation to be applied to two PySpark DataFrames.
> Full details are in the Google document linked below.
>  






[jira] [Updated] (SPARK-27463) SPIP: Support Dataframe Cogroup via Pandas UDFs

2019-05-18 Thread Chris Martin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Martin updated SPARK-27463:
-
Labels:   (was: SPIP)

> SPIP: Support Dataframe Cogroup via Pandas UDFs 
> 
>
> Key: SPARK-27463
> URL: https://issues.apache.org/jira/browse/SPARK-27463
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Chris Martin
>Priority: Major
>
> Recent work on Pandas UDFs in Spark has allowed for improved 
> interoperability between Pandas and Spark.  This proposal aims to extend this 
> by introducing a new Pandas UDF type which would allow for a cogroup 
> operation to be applied to two PySpark DataFrames.
> Full details are in the Google document linked below.
>  






[jira] [Updated] (SPARK-27463) SPIP: Support Dataframe Cogroup via Pandas UDFs

2019-04-15 Thread Chris Martin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Martin updated SPARK-27463:
-
Description: 
Recent work on Pandas UDFs in Spark has allowed for improved interoperability 
between Pandas and Spark.  This proposal aims to extend this by introducing a 
new Pandas UDF type which would allow for a cogroup operation to be applied to 
two PySpark DataFrames.

Full details are in the Google document linked below.

 

  was:
h2. *Background and Motivation*

Recently there has been a great deal of work in PySpark to improve 
interoperability with the Pandas library. This work has allowed users to write 
User Defined Functions (UDFs) in Pandas which can then be applied to a Spark 
DataFrame. The benefit here is that it allows users to combine the 
functionality of Pandas with the parallelisation abilities of Spark. In 
addition, these new Pandas UDFs have significantly lower overhead than 
traditional UDFs as they operate on a batch of data at a time (i.e. they are 
vectorised) and they use Apache Arrow for serialisation between the JVM and 
Python processes.

As of Spark 2.3 two types of Pandas UDF are offered. Scalar UDFs effectively 
offer a map operation at the row level, while Grouped Map UDFs allow a map 
operation on a group of data. This functionality has proved successful in 
allowing users to integrate Spark with existing Pandas workflows, however there 
are situations where the existing functionality offered is not sufficient. One 
such case is analogous to the existing Cogroup functionality available on RDDs 
and DataSets and was proposed by Li Jin on the Spark-Dev mailing list[1] . In 
this case, the user would like to group two Spark DataFrames by a common key 
and then apply a python function to each group. This python function would take 
two pandas DataFrames as its arguments and would return an arbitrary length 
Pandas DataFrame.

To give a concrete example of the usefulness of this functionality, consider 
the use case of performing an as-of join between two distinct DataFrames. This 
is something that has traditionally been very difficult to do in Spark (and 
indeed in SQL in general)[2] but which has good support in Pandas[3]. If 
Cogroup-like functionality was available in PySpark then one could simply write 
a Pandas function to perform the as-of joining which could then be applied to 
two (appropriately grouped) DataFrames.

This proposal therefore advocates introducing a new API call which would allow 
for a Cogrouped Pandas UDF.

[1][http://mail-archives.apache.org/mod_mbox/spark-dev/201902.mbox/%3ccagy9duxt569bpgp0wsc2esjgcoo5+hbfihfbkofcocclmjh...@mail.gmail.com%3e]

[2]see https://issues.apache.org/jira/browse/SPARK-22947 for a SPIP that aims 
to add asof join functionality to Spark.

[3][https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.merge_asof.html]
h2. *API Changes*

The public API changes would all be on the PySpark side. In terms of the API 
itself there are a couple of options depending on whether the goal is syntactic 
brevity or consistency with the Dataset version of cogroup. If brevity is 
the aim then a new method can be added to the DataFrame class:

 
{code:java}
# other is another DataFrame, on is the cogroup key, udf is the function to apply.
def cogroup(self, other, on, udf)
{code}
 

Alternatively, to be consistent with the Dataset version of cogroup, a new 
method could be added to the GroupedData class.

 
{code:java}
# other is another GroupedData, udf is the function to apply.
def cogroup(self, other, udf){code}
 

The exact API can be worked out as part of this SPIP and the document will be 
updated once a decision has been reached.

In addition, a new PandasUDFType, COGROUPED_MAP, will be defined to identify 
this new type of UDF. Functions annotated with this decorator should take two 
Pandas DataFrames and return a single Pandas DataFrame. Here is an example of 
usage, using the as-of join use case described earlier and the first option for 
the API syntax.

 
{code:java}
@pandas_udf(return_schema, PandasUDFType.COGROUPED_MAP)
# df1, df2 and function return are all pandas.DataFrames
def asof_join(df1, df2):
  return pd.merge_asof(df1, df2, on='time')

df1.cogroup(df2, on='product_id', apply=asof_join){code}
 
h2. *Target Personas*

Data scientists, data engineers, library developers.
h2. *Scope*
 * Initial implementation will only consider the case of Cogrouping exactly two 
DataFrames. Further work may extend this to the case of multiple DataFrames
 * API call is to be made available via PySpark only. No equivalent 
R/Java/Scala functionality will be offered.

h2. *Design*
 * New UDF type, PandasUDFType.COGROUPED_MAP, to be defined in PySpark
 * New public method to be added to either GroupedData or DataFrame to expose 
cogroup in PySpark
 * New package private method to be added to RelationalGroupedDataset to allow 

[jira] [Updated] (SPARK-27463) SPIP: Support Dataframe Cogroup via Pandas UDFs

2019-04-15 Thread Chris Martin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Martin updated SPARK-27463:
-
Description: 
h2. *Background and Motivation*

Recently there has been a great deal of work in PySpark to improve 
interoperability with the Pandas library. This work has allowed users to write 
User Defined Functions (UDFs) in Pandas which can then be applied to a Spark 
DataFrame. The benefit here is that it allows users to combine the 
functionality of Pandas with the parallelisation abilities of Spark. In 
addition, these new Pandas UDFs have significantly lower overhead than 
traditional UDFs as they operate on a batch of data at a time (i.e. they are 
vectorised) and they use Apache Arrow for serialisation between the JVM and 
Python processes.

As of Spark 2.3 two types of Pandas UDF are offered. Scalar UDFs effectively 
offer a map operation at the row level, while Grouped Map UDFs allow a map 
operation on a group of data. This functionality has proved successful in 
allowing users to integrate Spark with existing Pandas workflows, however there 
are situations where the existing functionality offered is not sufficient. One 
such case is analogous to the existing Cogroup functionality available on RDDs 
and DataSets and was proposed by Li Jin on the Spark-Dev mailing list[1] . In 
this case, the user would like to group two Spark DataFrames by a common key 
and then apply a python function to each group. This python function would take 
two pandas DataFrames as its arguments and would return an arbitrary length 
Pandas DataFrame.

To give a concrete example of the usefulness of this functionality, consider 
the use case of performing an as-of join between two distinct DataFrames. This 
is something that has traditionally been very difficult to do in Spark (and 
indeed in SQL in general)[2] but which has good support in Pandas[3]. If 
Cogroup-like functionality was available in PySpark then one could simply write 
a Pandas function to perform the as-of joining which could then be applied to 
two (appropriately grouped) DataFrames.

This proposal therefore advocates introducing a new API call which would allow 
for a Cogrouped Pandas UDF.

[1][http://mail-archives.apache.org/mod_mbox/spark-dev/201902.mbox/%3ccagy9duxt569bpgp0wsc2esjgcoo5+hbfihfbkofcocclmjh...@mail.gmail.com%3e]

[2]see https://issues.apache.org/jira/browse/SPARK-22947 for a SPIP that aims 
to add asof join functionality to Spark.

[3][https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.merge_asof.html]
h2. *API Changes*

The public API changes would all be on the PySpark side. In terms of the API 
itself there are a couple of options depending on whether the goal is syntactic 
brevity or consistency with the Dataset version of cogroup. If brevity is 
the aim then a new method can be added to the DataFrame class:

 
{code:java}
# other is another DataFrame, on is the cogroup key, udf is the function to apply.
def cogroup(self, other, on, udf)
{code}
 

Alternatively, to be consistent with the Dataset version of cogroup, a new 
method could be added to the GroupedData class.

 
{code:java}
# other is another GroupedData, udf is the function to apply.
def cogroup(self, other, udf){code}
 

The exact API can be worked out as part of this SPIP and the document will be 
updated once a decision has been reached.

In addition, a new PandasUDFType, COGROUPED_MAP, will be defined to identify 
this new type of UDF. Functions annotated with this decorator should take two 
Pandas DataFrames and return a single Pandas DataFrame. Here is an example of 
usage, using the as-of join use case described earlier and the first option for 
the API syntax.

 
{code:java}
@pandas_udf(return_schema, PandasUDFType.COGROUPED_MAP)
# df1, df2 and function return are all pandas.DataFrames
def asof_join(df1, df2):
  return pd.merge_asof(df1, df2, on='time')

df1.cogroup(df2, on='product_id', apply=asof_join){code}
 
h2. *Target Personas*

Data scientists, data engineers, library developers.
h2. *Scope*
 * Initial implementation will only consider the case of Cogrouping exactly two 
DataFrames. Further work may extend this to the case of multiple DataFrames
 * API call is to be made available via PySpark only. No equivalent 
R/Java/Scala functionality will be offered.

h2. *Design*
 * New UDF type, PandasUDFType.COGROUPED_MAP, to be defined in PySpark
 * New public method to be added to either GroupedData or DataFrame to expose 
cogroup in PySpark
 * New package private method to be added to RelationalGroupedDataset to allow 
cogroup in Scala
 * New logical node to be added representing cogroup.
 * New physical node to be added to implement cogroup. This node will ensure 
correct partitioning of input DataFrames and create two groupedIterators which 
will be piped into the Python process for UDF execution.
 * Extend ArrowPythonRunner such that it can 

[jira] [Created] (SPARK-27463) SPIP: Support Dataframe Cogroup via Pandas UDFs

2019-04-15 Thread Chris Martin (JIRA)
Chris Martin created SPARK-27463:


 Summary: SPIP: Support Dataframe Cogroup via Pandas UDFs 
 Key: SPARK-27463
 URL: https://issues.apache.org/jira/browse/SPARK-27463
 Project: Spark
  Issue Type: Improvement
  Components: PySpark, SQL
Affects Versions: 3.0.0
Reporter: Chris Martin


h2. *Background and Motivation*

Recently there has been a great deal of work in PySpark to improve 
interoperability with the Pandas library. This work has allowed users to write 
User Defined Functions (UDFs) in Pandas which can then be applied to a Spark 
DataFrame. The benefit here is that it allows users to combine the 
functionality of Pandas with the parallelisation abilities of Spark. In 
addition, these new Pandas UDFs have significantly lower overhead than 
traditional UDFs as they operate on a batch of data at a time (i.e. they are 
vectorised) and they use Apache Arrow for serialisation between the JVM and 
Python processes.

As of Spark 2.3 two types of Pandas UDF are offered. Scalar UDFs effectively 
offer a map operation at the row level, while Grouped Map UDFs allow a map 
operation on a group of data. This functionality has proved successful in 
allowing users to integrate Spark with existing Pandas workflows, however there 
are situations where the existing functionality offered is not sufficient. One 
such case is analogous to the existing Cogroup functionality available on RDDs 
and DataSets and was proposed by Li Jin on the Spark-Dev mailing list[1] . In 
this case, the user would like to group two Spark DataFrames by a common key 
and then apply a python function to each group. This python function would take 
two pandas DataFrames as its arguments and would return an arbitrary length 
Pandas DataFrame.

To give a concrete example of the usefulness of this functionality, consider 
the use case of performing an as-of join between two distinct DataFrames. This 
is something that has traditionally been very difficult to do in Spark (and 
indeed in SQL in general)[2] but which has good support in Pandas[3]. If 
Cogroup-like functionality was available in PySpark then one could simply write 
a Pandas function to perform the as-of joining which could then be applied to 
two (appropriately grouped) DataFrames.

This proposal therefore advocates introducing a new API call which would allow 
for a Cogrouped Pandas UDF.

[1][http://mail-archives.apache.org/mod_mbox/spark-dev/201902.mbox/%3ccagy9duxt569bpgp0wsc2esjgcoo5+hbfihfbkofcocclmjh...@mail.gmail.com%3e]

[2]see https://issues.apache.org/jira/browse/SPARK-22947 for a SPIP that aims 
to add asof join functionality to Spark.

[3][https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.merge_asof.html]
h2. *API Changes*

The public API changes would all be on the PySpark side. In terms of the API 
itself there are a couple of options depending on whether the goal is syntactic 
brevity or consistency with the Dataset version of cogroup. If brevity is 
the aim then a new method can be added to the DataFrame class:

 
{code:java}
# other is another DataFrame, on is the cogroup key, udf is the function to apply.
def cogroup(self, other, on, udf)
{code}
 

Alternatively, to be consistent with the Dataset version of cogroup, a new 
method could be added to the GroupedData class.

 
{code:java}
# other is another GroupedData, udf is the function to apply.
def cogroup(self, other, udf){code}
 

The exact API can be worked out as part of this SPIP and the document will be 
updated once a decision has been reached.

In addition, a new PandasUDFType, COGROUPED_MAP, will be defined to identify 
this new type of UDF. Functions annotated with this decorator should take two 
Pandas DataFrames and return a single Pandas DataFrame. Here is an example of 
usage, using the as-of join use case described earlier and the first option for 
the API syntax.

 
{code:java}
@pandas_udf(return_schema, PandasUDFType.COGROUPED_MAP)
# df1, df2 and function return are all pandas.DataFrames
def asof_join(df1, df2):
  return pd.merge_asof(df1, df2, on='time')

df1.cogroup(df2, on='product_id', apply=asof_join){code}
 
h2. *Target Personas*

Data scientists, data engineers, library developers.
h2. *Scope*
 * Initial implementation will only consider the case of Cogrouping exactly two 
DataFrames. Further work may extend this to the case of multiple DataFrames
 * API call is to be made available via PySpark only. No equivalent 
R/Java/Scala functionality will be offered.

h2. *Design*
 * New UDF type, PandasUDFType.COGROUPED_MAP, to be defined in PySpark
 * New public method to be added to either GroupedData or DataFrame to expose 
cogroup in PySpark
 * New package private method to be added to RelationalGroupedDataset to allow 
cogroup in Scala
 * New logical node to be added representing cogroup.
 * New physical node to be added to implement cogroup. This node 

[jira] [Commented] (SPARK-6305) Add support for log4j 2.x to Spark

2018-08-22 Thread Chris Martin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-6305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16589311#comment-16589311
 ] 

Chris Martin commented on SPARK-6305:
-

I don't think that's such a big deal so long as Spark can have a hard 
dependency on log4j directly (as opposed to depending on slf4j with log4j as 
the default backend).  At the moment I'm assuming the former, and it's 
relatively easy (although not as easy as with log4j 1.x) to adjust the log 
level as needed.  If one needs to consider other logging backends (e.g. JUL) 
then all this gets more complicated, as the log level manipulation needs to 
work for each backend we support, and if we need to consider log4j 1.x as yet 
another backend then my approach is not going to work!
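
For illustration (not Spark code, and assuming log4j-core 2.x is on the 
classpath), the difference in level manipulation looks roughly like this:
{code:java}
// log4j 1.x: the level is mutable directly on the Logger instance.
// org.apache.log4j.LogManager.getLogger("org.apache.spark").setLevel(org.apache.log4j.Level.DEBUG)

// log4j 2.x: levels live in the Configuration, so the Configurator helper is used.
import org.apache.logging.log4j.Level
import org.apache.logging.log4j.core.config.Configurator

Configurator.setLevel("org.apache.spark", Level.DEBUG) // a single logger
Configurator.setRootLevel(Level.WARN)                  // the root logger
{code}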

For what it's worth, this is one of the reasons why I'm a bit confused by 
Logging.scala.  There's some code (and a comment) in there which implies that 
users should be able to swap out for JUL if they want.  I can't see how this 
will ever work though, as all the code for switching log levels assumes log4j 
loggers!

> Add support for log4j 2.x to Spark
> --
>
> Key: SPARK-6305
> URL: https://issues.apache.org/jira/browse/SPARK-6305
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Reporter: Tal Sliwowicz
>Priority: Minor
>
> log4j 2 requires replacing the slf4j binding and adding the log4j jars in the 
> classpath. Since there are shaded jars, it must be done during the build.






[jira] [Commented] (SPARK-6305) Add support for log4j 2.x to Spark

2018-08-22 Thread Chris Martin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-6305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16589293#comment-16589293
 ] 

Chris Martin commented on SPARK-6305:
-

Thanks [~srowen] and [~ste...@apache.org] for the feedback.

So far my strategy has been to exclude log4j 1.x (log4j:log4j) and the log4j 
1.x slf4j binding (org.slf4j:slf4j-log4j12) from transitive dependencies.  In 
their place I'm adding the log4j-1.2-api bridge, which should provide the 
log4j 1.x classes they expect with the output redirected to log4j2.  Hopefully 
this should avoid the stacktrace issue that Steve mentions, but that will 
depend on whether any of the dependencies are doing anything funky. 
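
Expressed in sbt syntax purely for illustration (Spark's build is Maven, and 
the dependency and version numbers below are placeholders), the shape of the 
change is:
{code:java}
// Strip log4j 1.x and its slf4j binding from a transitive dependency, then add
// the log4j-1.2-api bridge so code compiled against log4j 1.x logs via log4j 2.
libraryDependencies ++= Seq(
  ("org.apache.hadoop" % "hadoop-client" % "2.7.4") // placeholder dependency
    .exclude("log4j", "log4j")
    .exclude("org.slf4j", "slf4j-log4j12"),
  "org.apache.logging.log4j" % "log4j-api"     % "2.11.1",
  "org.apache.logging.log4j" % "log4j-core"    % "2.11.1",
  "org.apache.logging.log4j" % "log4j-1.2-api" % "2.11.1"  // the 1.x bridge
)
{code}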

The only problems I foresee with this are:

1) There's a bunch of stuff going on in Logging.scala to do with lifecycle 
management and potential use of JUL, and I'm genuinely unsure what it's trying 
to achieve.  I might have to ask on the developer mailing list to find out 
what's going on here, but if anyone here understands then do let me know.  
From what I've seen there's no need to shade any of this, but it's perfectly 
possible I might be missing something.

2) I'm less familiar with the projects in external/ and I'm not entirely sure 
under what environments they should run.  I'm going to leave these till the 
end, when hopefully I'll understand this a bit more!

3) As has been mentioned, if and when we decide to move to log4j2, everyone's 
existing properties files will need to change (and from what I've read on the 
log4j Jira, they will never have perfect backwards compatibility).  For now 
I'm just seeing if we can make Spark use log4j2.

thanks,

Chris

> Add support for log4j 2.x to Spark
> --
>
> Key: SPARK-6305
> URL: https://issues.apache.org/jira/browse/SPARK-6305
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Reporter: Tal Sliwowicz
>Priority: Minor
>
> log4j 2 requires replacing the slf4j binding and adding the log4j jars in the 
> classpath. Since there are shaded jars, it must be done during the build.






[jira] [Commented] (SPARK-6305) Add support for log4j 2.x to Spark

2018-08-21 Thread Chris Martin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-6305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587814#comment-16587814
 ] 

Chris Martin commented on SPARK-6305:
-

[~srowen]

I've taken a look at this and I'd like to sync up to see your perspective 
(given that you already looked at this).  From what I can see:

1) As you say there's a load of pom manipulation to do in order to exclude 
log4j1 and related components from dependencies. 

2) There are a bunch of unit tests that rely on hooking into log4j internals in 
order to capture log output and make assertions against them.

3) The org.apache.spark.internal.Logging trait has some fairly low level log4j 
logic.

4) There are a few log4j.properties files that would have to be converted to 
the log4j2 format.

Of these, 1) is a pain but it's fairly mechanical, and I would hope we could 
write some sort of automated check to tell us if log4j1 is still being brought 
in.  2) is fairly easily solvable; I have some code to do this (a rough sketch 
of the idea is below).  3) worries me, as this class is doing some fairly hairy 
stuff and I'm not sure of the use cases - it would be good to have a chat about 
this. 4) is simple enough as far as Spark goes, but the fly in the ointment is 
that all existing user log configuration would need to be changed.
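
Roughly the kind of test helper meant in 2) - an illustrative sketch against 
the log4j 2.x API rather than the actual code, with the names made up:
{code:java}
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.core.LogEvent
import org.apache.logging.log4j.core.appender.AbstractAppender
import scala.collection.mutable.ArrayBuffer

// Collect formatted log messages in memory so a test can assert on them,
// replacing the current hooks into log4j 1.x internals.
class CapturingAppender extends AbstractAppender("capture", null, null, true) {
  val messages = ArrayBuffer[String]()
  override def append(event: LogEvent): Unit =
    messages += event.getMessage.getFormattedMessage
}

val appender = new CapturingAppender
appender.start()
LogManager.getRootLogger
  .asInstanceOf[org.apache.logging.log4j.core.Logger]
  .addAppender(appender)
{code}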

thoughts?

Chris

> Add support for log4j 2.x to Spark
> --
>
> Key: SPARK-6305
> URL: https://issues.apache.org/jira/browse/SPARK-6305
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Reporter: Tal Sliwowicz
>Priority: Minor
>
> log4j 2 requires replacing the slf4j binding and adding the log4j jars in the 
> classpath. Since there are shaded jars, it must be done during the build.






[jira] [Commented] (SPARK-24950) scala DateTimeUtilsSuite daysToMillis and millisToDays fails w/java 8 181-b13

2018-07-27 Thread Chris Martin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560312#comment-16560312
 ] 

Chris Martin commented on SPARK-24950:
--

Hi,

Just to say that I looked at this and came to the same conclusion as Shane.  
I've submitted a PR which excludes both New Year's Eve and New Year's Day from 
the test, which should mean it will work on both old and new JVMs.
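
Roughly the shape of the change (an illustration of the idea only, not the 
actual PR; the range of days tested is made up):
{code:java}
import java.time.LocalDate

// Skip New Year's Eve and New Year's Day when round-tripping days through
// daysToMillis/millisToDays, so the test passes on both old and new JVM tzdata.
val daysToTest = (-20000 to 20000).filterNot { day =>
  val d = LocalDate.ofEpochDay(day.toLong)
  (d.getMonthValue == 12 && d.getDayOfMonth == 31) ||
  (d.getMonthValue == 1 && d.getDayOfMonth == 1)
}
{code}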

> scala DateTimeUtilsSuite daysToMillis and millisToDays fails w/java 8 181-b13
> -
>
> Key: SPARK-24950
> URL: https://issues.apache.org/jira/browse/SPARK-24950
> Project: Spark
>  Issue Type: Bug
>  Components: Build, Tests
>Affects Versions: 2.4.0
>Reporter: shane knapp
>Priority: Major
>
> during my travails to port the spark builds to run on ubuntu 16.04LTS, i have 
> encountered a strange and apparently java version-specific failure on *one* 
> specific unit test.
> the failure is here:
> [https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.6-ubuntu-test/868/testReport/junit/org.apache.spark.sql.catalyst.util/DateTimeUtilsSuite/daysToMillis_and_millisToDays/]
> the java version on this worker is:
> sknapp@ubuntu-testing:~$ java -version
>  java version "1.8.0_181"
>  Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
>  Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
> however, when i run this exact build on the other ubuntu workers, it passes.  
> the systems are set up (for the most part) identically except for the java 
> version:
> sknapp@amp-jenkins-staging-worker-02:~$ java -version
>  java version "1.8.0_171"
>  Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
>  Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)
> there are some minor kernel and other package differences on these ubuntu 
> workers, but nothing that (in my opinion) would affect this test.  i am 
> willing to help investigate this, however.
> the test also passes on the centos 6.9 workers, which have the following java 
> version installed:
> [sknapp@amp-jenkins-worker-05 ~]$ java -version
> java version "1.8.0_60"
> Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
> Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)
> my guess is that either:
> sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
> or
> sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
> is doing something wrong.  i am not a scala expert by any means, so i'd 
> really like some help in trying to un-block the project to port the builds to 
> ubuntu.






[jira] [Comment Edited] (SPARK-24950) scala DateTimeUtilsSuite daysToMillis and millisToDays fails w/java 8 181-b13

2018-07-27 Thread Chris Martin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560312#comment-16560312
 ] 

Chris Martin edited comment on SPARK-24950 at 7/27/18 8:48 PM:
---

Hi,

Just to say that I looked at this and came to the same conclusion as Sean.  
I've submitted a PR which excludes both New Year's Eve and New Year's Day from 
the test, which should mean it will work on both old and new JVMs.


was (Author: d80tb7):
Hi,

Just to say that I looked at this and came to the same conclusion as Shane.  
I've submitted a PR which excludes both New Year's Eve and New Year's Day from 
the test, which should mean it will work on both old and new JVMs.

> scala DateTimeUtilsSuite daysToMillis and millisToDays fails w/java 8 181-b13
> -
>
> Key: SPARK-24950
> URL: https://issues.apache.org/jira/browse/SPARK-24950
> Project: Spark
>  Issue Type: Bug
>  Components: Build, Tests
>Affects Versions: 2.4.0
>Reporter: shane knapp
>Priority: Major
>
> during my travails to port the spark builds to run on ubuntu 16.04LTS, i have 
> encountered a strange and apparently java version-specific failure on *one* 
> specific unit test.
> the failure is here:
> [https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.6-ubuntu-test/868/testReport/junit/org.apache.spark.sql.catalyst.util/DateTimeUtilsSuite/daysToMillis_and_millisToDays/]
> the java version on this worker is:
> sknapp@ubuntu-testing:~$ java -version
>  java version "1.8.0_181"
>  Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
>  Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
> however, when i run this exact build on the other ubuntu workers, it passes.  
> the systems are set up (for the most part) identically except for the java 
> version:
> sknapp@amp-jenkins-staging-worker-02:~$ java -version
>  java version "1.8.0_171"
>  Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
>  Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)
> there are some minor kernel and other package differences on these ubuntu 
> workers, but nothing that (in my opinion) would affect this test.  i am 
> willing to help investigate this, however.
> the test also passes on the centos 6.9 workers, which have the following java 
> version installed:
> [sknapp@amp-jenkins-worker-05 ~]$ java -version
> java version "1.8.0_60"
> Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
> Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)
> my guess is that either:
> sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
> or
> sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
> is doing something wrong.  i am not a scala expert by any means, so i'd 
> really like some help in trying to un-block the project to port the builds to 
> ubuntu.


