Re: Dataframe's .drop in PySpark doesn't accept Column
I understand the rationale, but when you need to reference a column whose name is not unique, for example when using a join, it can be confusing in terms of API. However, I figured out that you can use a qualified name for the column using the other_dataframe.column_name syntax; maybe we just need to document this well...

On Sun, May 31, 2015 at 12:18, 范文臣 cloud0...@163.com wrote:

`Column` in `DataFrame` is a general concept. `field1` is a column, `field1 + 1` is a column, `field1 > field2` is also a column. For an API like `select`, it should accept `Column`, as we need general expressions. But `drop` can only drop existing columns, which are not general expressions. So I think it makes sense to only allow a String column name in `drop`.

At 2015-05-31 02:41:52, Reynold Xin r...@databricks.com wrote:

Name resolution is not that easy, I think. Wenchen can maybe give you some advice on resolution for this one.

On Sat, May 30, 2015 at 9:37 AM, Yijie Shen henry.yijies...@gmail.com wrote:

I think it's enough to just match the Column's expr as an UnresolvedAttribute and use the UnresolvedAttribute's name to match the schema's field name. Seems there's no need to regard the expr as a more general one. :)

On May 30, 2015 at 11:14:05 PM, Girardot Olivier (o.girar...@lateral-thoughts.com) wrote:

Jira done: https://issues.apache.org/jira/browse/SPARK-7969
I've already started working on it, but it's less trivial than it seems because I don't exactly know the inner workings of the catalog, and how to get the qualified name of a column to match it against the schema/catalog.
Regards,
Olivier.

On Sat, May 30, 2015 at 09:54, Reynold Xin r...@databricks.com wrote:

Yea, it would be great to support a Column. Can you create a JIRA, and possibly a pull request?

On Fri, May 29, 2015 at 2:45 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote:

Actually, the Scala API too is only based on column name.

On Fri, May 29, 2015 at 11:23, Olivier Girardot o.girar...@lateral-thoughts.com wrote:

Hi,
Testing 1.4 a bit more, it seems that the .drop() method in PySpark doesn't accept a Column as input datatype:

    .join(only_the_best, only_the_best.pol_no == df.pol_no, "inner").drop(only_the_best.pol_no)

      File "/usr/local/lib/python2.7/site-packages/pyspark/sql/dataframe.py", line 1225, in drop
        jdf = self._jdf.drop(colName)
      File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 523, in __call__
        (new_args, temp_args) = self._get_args(args)
      File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 510, in _get_args
        temp_arg = converter.convert(arg, self.gateway_client)
      File "/usr/local/lib/python2.7/site-packages/py4j/java_collections.py", line 490, in convert
        for key in object.keys():
    TypeError: 'Column' object is not callable

It doesn't seem very consistent with the rest of the API, and it is especially annoying when executing joins, because drop("my_key") is not a qualified reference to the column. What do you think about changing that? Or what is the best practice as a workaround?
Regards,
Olivier.
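To make the workaround concrete, here is a minimal PySpark sketch using the DataFrames from the example above (the premium column is hypothetical, added just to have something to keep from the right side):

    # Both df and only_the_best carry a pol_no column, so after the join a
    # plain string-based drop("pol_no") would be ambiguous. One option is to
    # select the qualified columns explicitly instead of dropping:
    joined = df.join(only_the_best, df.pol_no == only_the_best.pol_no, "inner")
    result = joined.select(df.pol_no, only_the_best.premium)

    # Another option is to rename the duplicate column before the join, so
    # that drop() with a plain string is unambiguous:
    right = only_the_best.withColumnRenamed("pol_no", "pol_no_right")
    result = (df.join(right, df.pol_no == right.pol_no_right, "inner")
                .drop("pol_no_right"))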
Re: Catalyst: Reusing already computed expressions within a projection
Thanks for pointing to that link! It looks useful, but it does look more complicated than the case I'm trying to address. In my case, we set y = f(x), then we use y later on in future projections (z = g(y)). In that case, the analysis is trivial in that we aren't trying to find equivalent expressions; we actually know that z is based off of y. In addition, we are already storing y because it's one of the projections, so there's no time-vs-memory tradeoff.

Perf gains

I believe that the performance gains can be quite substantial, but can you check that the case I bring up below would indeed benefit from such an optimization? For example, suppose I have a date column (unclean_date) that is stored in some strange string format. I then use a UDF or a Hive function that converts it to the Catalyst date type (cleaned_date). Next, I want to extract one column with the month and another with the year, so I can do groupBys/aggregations later. Currently, every projection/expression based off of cleaned_date has to do the expensive parsing again if I avoid caching and prefer to do everything in one pass.

Code generation phase vs optimization

Is there a reason why doing it at the optimization phase is the wrong approach? It sounds like we're actually logically changing the order of computation if we do my proposed approach. I do agree, however, that if there are lower-hanging fruits, we should tackle those first =)

On Sat, May 30, 2015 at 10:00 PM, Michael Armbrust mich...@databricks.com wrote:

I think this is likely something that we'll want to do during the code generation phase. Though it's probably not the lowest-hanging fruit at this point.

On Sun, May 31, 2015 at 5:02 AM, Reynold Xin r...@databricks.com wrote:

I think you are looking for http://en.wikipedia.org/wiki/Common_subexpression_elimination in the optimizer. One thing to note is that as we do more and more optimizations like this, the optimization time might increase. Do you see a case where this can bring you substantial performance gains?

On Sat, May 30, 2015 at 9:02 AM, Justin Uang justin.u...@gmail.com wrote:

On second thought, perhaps this can be done by writing a rule that builds the DAG of dependencies between expressions, then converts it into several layers of projections, where each new layer is allowed to depend on expression results from previous projections? Are there any pitfalls to this approach?

On Sat, May 30, 2015 at 11:30 AM, Justin Uang justin.u...@gmail.com wrote:

If I do the following:

    df2 = df.withColumn('y', df['x'] * 7)
    df3 = df2.withColumn('z', df2.y * 3)
    df3.explain()

then the result is:

    Project [date#56,id#57,timestamp#58,x#59,(x#59 * 7.0) AS y#64,((x#59 * 7.0) AS y#64 * 3.0) AS z#65]
     PhysicalRDD [date#56,id#57,timestamp#58,x#59], MapPartitionsRDD[125] at mapPartitions at SQLContext.scala:1163

Effectively I want to compute:

    y = f(x)
    z = g(y)

The Catalyst optimizer realizes that y#64 is the same expression as the one previously computed; however, when building the projection, it ignores the fact that it has already computed y, so it calculates x * 7 twice:

    y = x * 7
    z = x * 7 * 3

If I wanted to make this fix, would it be possible to do the logic in the optimizer phase? I imagine that it's difficult, because the expressions in InterpretedMutableProjection don't have access to the previous expression results, only the input row, and the design doesn't seem to be catered for this.
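To make the unclean_date scenario concrete, here is a rough PySpark sketch (the date format, UDFs, and column names are purely illustrative):

    from pyspark.sql.functions import udf
    from pyspark.sql.types import DateType, IntegerType
    import datetime

    # Hypothetical expensive parser for the strange string format.
    parse_strange = udf(
        lambda s: datetime.datetime.strptime(s, "%Y|%m|%d").date(), DateType())
    month_of = udf(lambda d: d.month, IntegerType())
    year_of = udf(lambda d: d.year, IntegerType())

    df2 = df.withColumn("cleaned_date", parse_strange(df["unclean_date"]))
    # Both derived columns reference cleaned_date, but with collapsed
    # projections like the one in the explain output above, each gets inlined
    # as parse_strange(unclean_date), so the expensive parse runs twice per row:
    df3 = (df2.withColumn("month", month_of(df2["cleaned_date"]))
              .withColumn("year", year_of(df2["cleaned_date"])))

Under the layered-projection rule sketched above, the same query would instead compile into an inner projection that computes cleaned_date once and an outer projection that reads it, rather than re-inlining the parse.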
Re: Catalyst: Reusing already computed expressions within a projection
I think Michael's bringing up code gen because the compiler (not Spark, but javac and the JVM JIT) already does common subexpression elimination, so we might get it for free during code gen.

On Sun, May 31, 2015 at 11:48 AM, Justin Uang justin.u...@gmail.com wrote: [...]
Re: [VOTE] Release Apache Spark 1.4.0 (RC3)
+1 (non-binding)

Launched against a pseudo-distributed YARN cluster running Hadoop 2.6.0 and ran some jobs.

-Sandy

On Sat, May 30, 2015 at 3:44 PM, Krishna Sankar ksanka...@gmail.com wrote:

+1 (non-binding, of course)

1. Compiled on OS X 10.10 (Yosemite) OK. Total time: 17:07 min
   mvn clean package -Pyarn -Dyarn.version=2.6.0 -Phadoop-2.4 -Dhadoop.version=2.6.0 -DskipTests
2. Tested pyspark, MLlib - running as well as comparing results with 1.3.1
2.1. statistics (min, max, mean, Pearson, Spearman) OK
2.2. Linear/Ridge/Lasso Regression OK
2.3. Decision Tree, Naive Bayes OK
2.4. KMeans OK; center and scale OK
2.5. RDD operations OK (State of the Union texts - MapReduce, Filter, sortByKey (word count))
2.6. Recommendation (MovieLens medium dataset, ~1M ratings) OK; model evaluation/optimization (rank, numIter, lambda) with itertools OK
3. Scala - MLlib
3.1. statistics (min, max, mean, Pearson, Spearman) OK
3.2. LinearRegressionWithSGD OK
3.3. Decision Tree OK
3.4. KMeans OK
3.5. Recommendation (MovieLens medium dataset, ~1M ratings) OK
3.6. saveAsParquetFile OK
3.7. Read and verify the 3.6 save (above) - sqlContext.parquetFile, registerTempTable, sql OK
3.8. result = sqlContext.sql("SELECT OrderDetails.OrderID, ShipCountry, UnitPrice, Qty, Discount FROM Orders INNER JOIN OrderDetails ON Orders.OrderID = OrderDetails.OrderID") OK
4.0. Spark SQL from Python OK
4.1. result = sqlContext.sql("SELECT * FROM people WHERE State = 'WA'") OK

Cheers
k/

On Fri, May 29, 2015 at 4:40 PM, Patrick Wendell pwend...@gmail.com wrote:

Please vote on releasing the following candidate as Apache Spark version 1.4.0!

The tag to be voted on is v1.4.0-rc3 (commit dd109a8):
https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=dd109a8746ec07c7c83995890fc2c0cd7a693730

The release files, including signatures, digests, etc., can be found at:
http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc3-bin/

Release artifacts are signed with the following key:
https://people.apache.org/keys/committer/pwendell.asc

The staging repositories for this release can be found at:
[published as version: 1.4.0] https://repository.apache.org/content/repositories/orgapachespark-1109/
[published as version: 1.4.0-rc3] https://repository.apache.org/content/repositories/orgapachespark-1110/

The documentation corresponding to this release can be found at:
http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc3-docs/

Please vote on releasing this package as Apache Spark 1.4.0! The vote is open until Tuesday, June 02, at 00:32 UTC and passes if a majority of at least 3 +1 PMC votes are cast.

[ ] +1 Release this package as Apache Spark 1.4.0
[ ] -1 Do not release this package because ...

To learn more about Apache Spark, please see http://spark.apache.org/

== What has changed since RC1 ==
Below is a list of bug fixes that went into this RC: http://s.apache.org/vN

== How can I help test this release? ==
If you are a Spark user, you can help us test this release by taking a Spark 1.3 workload, running it on this release candidate, and reporting any regressions.

== What justifies a -1 vote for this release? ==
This vote is happening towards the end of the 1.4 QA period, so -1 votes should only occur for significant regressions from 1.3.1. Bugs already present in 1.3.x, minor regressions, or bugs related to new features will not block this release.
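For anyone who wants to reproduce the "Spark SQL from Python" check in items 4.0/4.1 above, a minimal sketch (the input file and its schema are hypothetical):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext(appName="rc3-smoke-test")
    sqlContext = SQLContext(sc)

    # people.json: a hypothetical line-delimited JSON file with a State field.
    people = sqlContext.read.json("people.json")
    people.registerTempTable("people")
    sqlContext.sql("SELECT * FROM people WHERE State = 'WA'").show()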