[jira] [Closed] (SPARK-15251) Cannot apply PythonUDF to aggregated column

2019-03-13 Thread Matthew Livesey (JIRA)



Matthew Livesey closed SPARK-15251.
---

Cannot be reproduced on master, and is now years old. Closing as no longer 
relevant.
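
For anyone re-checking, a minimal sketch of the equivalent reproduction against a recent
Spark build (the SparkSession API is the modern replacement for the 1.x SQLContext; the
session name "spark" and the snake_case function name are assumptions, not taken from the
original report):

{code}
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName("spark-15251-check").getOrCreate()

def times_two(x):
    return x * 2

spark.udf.register("timesTwo", times_two, IntegerType())
spark.createDataFrame([(1, 'a'), (2, 'b')], ["x", "y"]).createOrReplaceTempView("my_data")

# Per the closing comment above, this no longer reproduces: the query should run and return t = 6.
spark.sql("SELECT timesTwo(Sum(x)) t FROM my_data").show()
{code}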

> Cannot apply PythonUDF to aggregated column
> ---
>
> Key: SPARK-15251
> URL: https://issues.apache.org/jira/browse/SPARK-15251
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 1.6.1
>Reporter: Matthew Livesey
>Priority: Major
>
> In Scala it is possible to define a UDF and apply it to an aggregated value in 
> an expression, for example:
> {code}
> def timesTwo(x: Int): Int = x * 2
> sqlContext.udf.register("timesTwo", timesTwo _)
> case class Data(x: Int, y: String)
> val data = List(Data(1, "a"), Data(2, "b"))
> val rdd = sc.parallelize(data)
> val df = rdd.toDF
> df.registerTempTable("my_data")
> sqlContext.sql("SELECT timesTwo(Sum(x)) t FROM my_data").show()
> +---+
> |  t|
> +---+
> |  6|
> +---+
> {code}
> Performing the same computation in pyspark:
> {code}
> def timesTwo(x):
>     return x * 2
> sqlContext.udf.register("timesTwo", timesTwo)
> data = [(1, 'a'), (2, 'b')]
> rdd = sc.parallelize(data)
> df = sqlContext.createDataFrame(rdd, ["x", "y"])
> df.registerTempTable("my_data")
> sqlContext.sql("SELECT timesTwo(Sum(x)) t FROM my_data").show()
> {code}
> Gives the following:
> {code}
> AnalysisException: u"expression 'pythonUDF' is neither present in the group 
> by, nor is it an aggregate function. Add to group by or wrap in first() (or 
> first_value) if you don't care which value you get.;"
> {code}
> Using a lambda rather than a named function gives the same error.






[jira] [Created] (SPARK-16191) Code-Generated SpecificColumnarIterator fails for wide pivot with caching

2016-06-24 Thread Matthew Livesey (JIRA)
Matthew Livesey created SPARK-16191:
---

 Summary: Code-Generated SpecificColumnarIterator fails for wide 
pivot with caching
 Key: SPARK-16191
 URL: https://issues.apache.org/jira/browse/SPARK-16191
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.6.1
Reporter: Matthew Livesey


When caching a pivot of more than 2260 columns, the SpecificColumnarIterator instance 
produced by code generation fails to compile with:

bq. failed to compile: org.codehaus.janino.JaninoRuntimeException: Code of method \"()Z\" of class \"org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator\" grows beyond 64 KB

This can be reproduced in PySpark with the following (it took some trial and 
error to find that 2261 is the number of pivot columns at which the generated 
class breaks the 64 KB limit):

{code}
def build_pivot(width):
    categories = ["cat_%s" % i for i in range(0, width)]
    customers = ["cust_%s" % i for i in range(0, 10)]
    rows = []
    for cust in customers:
        for cat in categories:
            for i in range(0, 4):
                row = (cust, cat, i, 7.0)
                rows.append(row)
    rdd = sc.parallelize(rows)
    df = sqlContext.createDataFrame(rdd, ["customer", "category", "instance", "value"])
    pivot_value_rows = df.select("category").distinct().orderBy("category").collect()
    pivot_values = [r.category for r in pivot_value_rows]
    import pyspark.sql.functions as func
    pivot = df.groupBy('customer').pivot("category", pivot_values).agg(func.sum(df.value)).cache()
    pivot.write.save('my_pivot', mode='overwrite')

for i in [2260, 2261]:
    try:
        build_pivot(i)
        print "Succeeded for %s" % i
    except:
        print "Failed for %s" % i
{code}

Removing the `cache()` call avoids the problem and allows wider pivots; since the 
SpecificColumnarIterator is specific to caching, it is not generated when caching is 
not used. Where the pivot result needs to be reused, one cache-free alternative is 
sketched below.

This could be symptomatic of a more general problem, namely that generated code can 
exceed the 64 KB bytecode limit per method, so the same failure may occur in other 
cases as well.






[jira] [Created] (SPARK-15877) DataSource executed twice when using ORDER BY

2016-06-10 Thread Matthew Livesey (JIRA)
Matthew Livesey created SPARK-15877:
---

 Summary: DataSource executed twice when using ORDER BY
 Key: SPARK-15877
 URL: https://issues.apache.org/jira/browse/SPARK-15877
 Project: Spark
  Issue Type: Bug
Affects Versions: 1.6.1
Reporter: Matthew Livesey


When executing a query against a custom DataSource, I observed that when an ORDER BY is 
used, the underlying DataSource is executed twice. A small example demonstrating this 
is here: 

https://github.com/mattinbits/spark-sql-sort-double-execution

From debugging, I find that when there is a "Sort" in the logical plan (and therefore in 
the resulting physical plan), an "Exchange" object is inserted into the plan by the method 
"EnsureRequirements.ensureDistributionAndOrdering()". This Exchange has a RangePartitioner. 
At the point the DataFrame is converted to an RDD (which is before computation of that RDD 
should occur), the RangePartitioner triggers execution of the RDD in order to sample it and 
gather statistics to guide partitioning. This is done by "Exchange.prepareShuffleDependency()", 
which in turn calls "SamplingUtils.reservoirSampleAndCount()".

In some cases this causes a significant performance degradation, wherever the cost of 
computing the RDD is high. The RDD gets executed twice: once during the conversion of the 
DataFrame to an RDD, and again when the RDD is eventually computed, e.g. by a call to 
"collect()". There doesn't appear to be any configuration setting to control whether an 
RDD should be executed for sampling, and I haven't been able to determine whether the 
sampling is necessary for correct results or is just aimed at improving performance. 
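
One possible mitigation, sketched below under assumptions (the format string 
"com.example.expensive" and the column name are placeholders, not taken from the linked 
example), is to cache and materialise the DataFrame before sorting, so that the sampling 
pass triggered by the RangePartitioner reads cached data rather than re-running the 
expensive DataSource:

{code}
# Placeholder for the expensive custom DataSource.
df = sqlContext.read.format("com.example.expensive").load()

df.cache()   # keep the scanned rows around
df.count()   # force the scan once, before the sort is planned

# Both the RangePartitioner's sampling pass and the final collect() should now
# read from the cache, so the underlying DataSource is scanned only once.
result = df.orderBy("some_column").collect()
{code}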






[jira] [Created] (SPARK-15251) Cannot apply PythonUDF to aggregated column

2016-05-10 Thread Matthew Livesey (JIRA)
Matthew Livesey created SPARK-15251:
---

 Summary: Cannot apply PythonUDF to aggregated column
 Key: SPARK-15251
 URL: https://issues.apache.org/jira/browse/SPARK-15251
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 1.6.1
Reporter: Matthew Livesey


In Scala it is possible to define a UDF and apply it to an aggregated value in 
an expression, for example:

{code}
def timesTwo(x: Int): Int = x * 2
sqlContext.udf.register("timesTwo", timesTwo _)
case class Data(x: Int, y: String)
val data = List(Data(1, "a"), Data(2, "b"))
val rdd = sc.parallelize(data)
val df = rdd.toDF
df.registerTempTable("my_data")
sqlContext.sql("SELECT timesTwo(Sum(x)) t FROM my_data").show()

+---+
|  t|
+---+
|  6|
+---+
{code}

Performing the same computation in pyspark:

{code}
def timesTwo(x):
    return x * 2

sqlContext.udf.register("timesTwo", timesTwo)
data = [(1, 'a'), (2, 'b')]
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, ["x", "y"])

df.registerTempTable("my_data")
sqlContext.sql("SELECT timesTwo(Sum(x)) t FROM my_data").show()
{code}

Gives the following:

{code}
AnalysisException: u"expression 'pythonUDF' is neither present in the group by, 
nor is it an aggregate function. Add to group by or wrap in first() (or 
first_value) if you don't care which value you get.;"
{code}

Using a lambda rather than a named function gives the same error.
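
A two-step workaround (a hedged sketch against the same 1.6-era API; the temporary table 
name my_data_agg is illustrative) is to compute the aggregate first and only then apply 
the Python UDF to the already-aggregated column:

{code}
# Aggregate first, without involving the Python UDF...
agg_df = sqlContext.sql("SELECT Sum(x) s FROM my_data")
agg_df.registerTempTable("my_data_agg")

# ...then apply the Python UDF to the plain (already aggregated) column.
sqlContext.sql("SELECT timesTwo(s) t FROM my_data_agg").show()
{code}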



