[jira] [Closed] (SPARK-15251) Cannot apply PythonUDF to aggregated column
[ https://issues.apache.org/jira/browse/SPARK-15251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthew Livesey closed SPARK-15251.
---

Cannot be reproduced on master, and is now years old. Closing as no longer relevant.

> Cannot apply PythonUDF to aggregated column
> ---
>
> Key: SPARK-15251
> URL: https://issues.apache.org/jira/browse/SPARK-15251
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 1.6.1
> Reporter: Matthew Livesey
> Priority: Major
>
> In Scala it is possible to define a UDF and apply it to an aggregated value in
> an expression, for example:
> {code}
> def timesTwo(x: Int): Int = x * 2
> sqlContext.udf.register("timesTwo", timesTwo _)
> case class Data(x: Int, y: String)
> val data = List(Data(1, "a"), Data(2, "b"))
> val rdd = sc.parallelize(data)
> val df = rdd.toDF
> df.registerTempTable("my_data")
> sqlContext.sql("SELECT timesTwo(Sum(x)) t FROM my_data").show()
> +---+
> |  t|
> +---+
> |  6|
> +---+
> {code}
> Performing the same computation in PySpark:
> {code}
> def timesTwo(x):
>     return x * 2
> sqlContext.udf.register("timesTwo", timesTwo)
> data = [(1, 'a'), (2, 'b')]
> rdd = sc.parallelize(data)
> df = sqlContext.createDataFrame(rdd, ["x", "y"])
> df.registerTempTable("my_data")
> sqlContext.sql("SELECT timesTwo(Sum(x)) t FROM my_data").show()
> {code}
> Gives the following:
> {code}
> AnalysisException: u"expression 'pythonUDF' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;"
> {code}
> Using a lambda rather than a named function gives the same error.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-16191) Code-Generated SpecificColumnarIterator fails for wide pivot with caching
Matthew Livesey created SPARK-16191:
---

Summary: Code-Generated SpecificColumnarIterator fails for wide pivot with caching
Key: SPARK-16191
URL: https://issues.apache.org/jira/browse/SPARK-16191
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 1.6.1
Reporter: Matthew Livesey

When caching a pivot of more than 2260 columns, the instance of SpecificColumnarIterator which is generated by code-generation fails to compile with:

bq. failed to compile: org.codehaus.janino.JaninoRuntimeException: Code of method \"()Z\" of class \"org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator\" grows beyond 64 KB

This can be reproduced in PySpark with the following (it took some trial and error to find that 2261 is the magic number at which the generated class breaks the 64KB limit).

{code}
import pyspark.sql.functions as func

def build_pivot(width):
    # 10 customers x `width` categories x 4 instances, every value 7.0
    categories = ["cat_%s" % i for i in range(0, width)]
    customers = ["cust_%s" % i for i in range(0, 10)]
    rows = []
    for cust in customers:
        for cat in categories:
            for i in range(0, 4):
                row = (cust, cat, i, 7.0)
                rows.append(row)
    rdd = sc.parallelize(rows)
    df = sqlContext.createDataFrame(rdd, ["customer", "category", "instance", "value"])
    pivot_value_rows = df.select("category").distinct().orderBy("category").collect()
    pivot_values = [r.category for r in pivot_value_rows]
    # cache() is what triggers generation of SpecificColumnarIterator
    pivot = df.groupBy('customer').pivot("category", pivot_values).agg(func.sum(df.value)).cache()
    pivot.write.save('my_pivot', mode='overwrite')

for i in [2260, 2261]:
    try:
        build_pivot(i)
        print("Succeeded for %s" % i)
    except Exception:
        print("Failed for %s" % i)
{code}

Removing the `cache()` call avoids the problem and allows wider pivots: ColumnarIterator is specific to caching, so it is not generated when caching is not used. This could be symptomatic of a general problem, namely that generated code can exceed the 64KB bytecode limit, and so may occur in other cases as well.
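The trial and error used to find the threshold can be shortened with a binary search. The following is only a sketch, not part of the original report: `first_failing_width` and `stub_fails` are hypothetical names, and the search assumes that once a width fails, every larger width fails too. In a real session the predicate would wrap a call to `build_pivot` in a try/except; here a stub using the reported threshold stands in for it so that the snippet runs without Spark.

```python
def first_failing_width(fails, lo, hi):
    """Return the smallest width in (lo, hi] for which fails(width) is True.

    Assumes fails(lo) is False, fails(hi) is True, and that failure is
    monotonic: once a width fails, every larger width fails as well.
    """
    while hi - lo > 1:
        mid = (lo + hi) // 2
        if fails(mid):
            hi = mid   # the threshold is at mid or below
        else:
            lo = mid   # the threshold is above mid
    return hi


def stub_fails(width):
    # Hypothetical stand-in for "build_pivot(width) raised an exception".
    # The 2261 threshold is the one reported in this issue.
    return width >= 2261


threshold = first_failing_width(stub_fails, 1, 4096)
print("first failing width: %s" % threshold)
```

With bounds of 1 and 4096 this needs about a dozen probes rather than a linear scan, which matters when each probe builds and caches a wide pivot.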
[jira] [Created] (SPARK-15877) DataSource executed twice when using ORDER BY
Matthew Livesey created SPARK-15877:
---

Summary: DataSource executed twice when using ORDER BY
Key: SPARK-15877
URL: https://issues.apache.org/jira/browse/SPARK-15877
Project: Spark
Issue Type: Bug
Affects Versions: 1.6.1
Reporter: Matthew Livesey

When executing a query against a custom DataSource, I observed that if the query uses an ORDER BY, the underlying DataSource is executed twice. A small example demonstrating this is here:

https://github.com/mattinbits/spark-sql-sort-double-execution

From debugging, I find that when there is a "Sort" in the logical plan (and therefore in the resulting physical plan), an "Exchange" object is inserted into the plan by the method "EnsureRequirements.ensureDistributionAndOrdering()". This Exchange has a RangePartitioner. At the point that the DataFrame is converted to an RDD (which is before computation of that RDD should occur), the RangePartitioner causes execution of the RDD for the purpose of sampling, to get statistics to guide partitioning. This is done by "Exchange.prepareShuffleDependency()", which in turn calls "SamplingUtils.reservoirSampleAndCount()".

In some cases this causes a significant performance degradation, namely wherever the cost of computing the RDD is high. The RDD gets executed twice: once during the conversion of the DataFrame to an RDD, and then again when the RDD is eventually computed, e.g. by a call to "collect()".

There doesn't appear to be any configuration setting to control whether an RDD should be executed for sampling, and I haven't been able to determine whether the sampling is necessary to get correct results or is just aimed at improving performance.
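The double execution described above can be illustrated without Spark. The following is a simplified pure-Python sketch, not Spark's implementation: `reservoir_sample_and_count` is a textbook reservoir sampler written for this example, and `CountingSource` is a hypothetical stand-in for an expensive data source that counts how often it is scanned. The real RangePartitioner samples per partition and is more involved.

```python
import random


def reservoir_sample_and_count(items, k, seed=0):
    """Return (sample, count): up to k items chosen uniformly at random
    from the iterable, plus the total number of items seen."""
    rng = random.Random(seed)
    reservoir = []
    count = 0
    for item in items:
        count += 1
        if len(reservoir) < k:
            reservoir.append(item)
        else:
            # Keep the new item with probability k / count.
            j = rng.randrange(count)
            if j < k:
                reservoir[j] = item
    return reservoir, count


class CountingSource(object):
    """Hypothetical expensive data source that records each full scan."""

    def __init__(self, n):
        self.n = n
        self.scans = 0

    def scan(self):
        self.scans += 1
        return iter(range(self.n))


source = CountingSource(1000)

# Pass 1: sample the data to pick range boundaries for the sort.
sample, total = reservoir_sample_and_count(source.scan(), k=20)
bounds = sorted(sample)

# Pass 2: the actual computation reads the source again.
result = sorted(source.scan())

print("rows seen while sampling: %d, scans of the source: %d" % (total, source.scans))
```

Because reservoir sampling has to visit every row to give each one an equal chance of selection, the sampling pass costs as much as a full read of the source, which is consistent with the degradation reported here.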
[jira] [Created] (SPARK-15251) Cannot apply PythonUDF to aggregated column
Matthew Livesey created SPARK-15251:
---

Summary: Cannot apply PythonUDF to aggregated column
Key: SPARK-15251
URL: https://issues.apache.org/jira/browse/SPARK-15251
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 1.6.1
Reporter: Matthew Livesey

In Scala it is possible to define a UDF and apply it to an aggregated value in an expression, for example:

{code}
def timesTwo(x: Int): Int = x * 2
sqlContext.udf.register("timesTwo", timesTwo _)
case class Data(x: Int, y: String)
val data = List(Data(1, "a"), Data(2, "b"))
val rdd = sc.parallelize(data)
val df = rdd.toDF
df.registerTempTable("my_data")
sqlContext.sql("SELECT timesTwo(Sum(x)) t FROM my_data").show()
+---+
|  t|
+---+
|  6|
+---+
{code}

Performing the same computation in PySpark:

{code}
def timesTwo(x):
    return x * 2
sqlContext.udf.register("timesTwo", timesTwo)
data = [(1, 'a'), (2, 'b')]
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, ["x", "y"])
df.registerTempTable("my_data")
sqlContext.sql("SELECT timesTwo(Sum(x)) t FROM my_data").show()
{code}

Gives the following:

{code}
AnalysisException: u"expression 'pythonUDF' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;"
{code}

Using a lambda rather than a named function gives the same error.