Re: Dataframe's .drop in PySpark doesn't accept Column

2015-05-31 Thread Olivier Girardot
I understand the rationale, but when you need to reference a column whose name
is not unique, for example when using a join, it can be confusing in terms of
API.
However, I figured out that you can use a qualified name for the column
using the *other-dataframe.column_name* syntax; maybe we just need to
document this well...


On Sun, May 31, 2015 at 12:18, 范文臣 cloud0...@163.com wrote:

 `Column` in `DataFrame` is a general concept. `field1` is a column, `field1
 + 1` is a column, `field1 > field2` is also a column. For an API like
 `select`, it should accept `Column` since we need general expressions. But for
 `drop`, we can only drop existing columns, which are not general expressions. So
 I think it makes sense to only allow a String column name in `drop`.
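 A quick PySpark illustration of the distinction (the column names here are
 made up):

     # select accepts arbitrary Column expressions...
     df.select(df.field1, df.field1 + 1, df.field1 > df.field2)
     # ...while drop only removes an existing column, so a plain name suffices
     df.drop("field1")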



 At 2015-05-31 02:41:52, Reynold Xin r...@databricks.com wrote:

 Name resolution is not that easy, I think.  Wenchen can maybe give you some
 advice on resolution for this one.


 On Sat, May 30, 2015 at 9:37 AM, Yijie Shen henry.yijies...@gmail.com
 wrote:

 I think just matching the Column’s expr as an UnresolvedAttribute and using the
 UnresolvedAttribute’s name to match against the schema’s field names is enough.

 There seems to be no need to regard the expr as a more general one. :)

 On May 30, 2015 at 11:14:05 PM, Girardot Olivier (
 o.girar...@lateral-thoughts.com) wrote:

 JIRA done: https://issues.apache.org/jira/browse/SPARK-7969
 I've already started working on it, but it's less trivial than it seems
 because I don't exactly know the inner workings of the catalog,
 and how to get the qualified name of a column to match it against the
 schema/catalog.

 Regards,

 Olivier.

  On Sat, May 30, 2015 at 09:54, Reynold Xin r...@databricks.com wrote:

 Yea, it would be great to support a Column. Can you create a JIRA, and
 possibly a pull request?


 On Fri, May 29, 2015 at 2:45 AM, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 Actually, the Scala API too is based only on the column name

  On Fri, May 29, 2015 at 11:23, Olivier Girardot
 o.girar...@lateral-thoughts.com wrote:

 Hi,
 Testing 1.4 a bit more, it seems that the .drop() method in PySpark
 doesn't accept a Column as its input type:


 .join(only_the_best, only_the_best.pol_no == df.pol_no, "inner").drop(only_the_best.pol_no)\
   File "/usr/local/lib/python2.7/site-packages/pyspark/sql/dataframe.py", line 1225, in drop
     jdf = self._jdf.drop(colName)
   File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 523, in __call__
     (new_args, temp_args) = self._get_args(args)
   File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 510, in _get_args
     temp_arg = converter.convert(arg, self.gateway_client)
   File "/usr/local/lib/python2.7/site-packages/py4j/java_collections.py", line 490, in convert
     for key in object.keys():
 TypeError: 'Column' object is not callable

 It doesn't seem very consistent with the rest of the APIs - and it is
 especially annoying when executing joins - because drop(my_key) is not a
 qualified reference to the column.

 What do you think about changing that? Or what is the best practice
 as a workaround?

 Regards,

 Olivier.






Re: Catalyst: Reusing already computed expressions within a projection

2015-05-31 Thread Justin Uang
Thanks for pointing to that link! It looks like it’s useful, but it does
look more complicated than the case I’m trying to address.

In my case, we set y = f(x), then we use y later on in future projections (z
= g(y)). In that case, the analysis is trivial in that we aren’t trying to
find equivalent expressions, we actually know that z is based off of y. In
addition, we are already storing off y because it’s one of the projections,
so there’s no tradeoff between time and memory.
Perf gains

I believe that the performance gains can be quite substantial, but can you
check that the case I bring up below will indeed benefit from such an
optimization?

For example, suppose I have a date column (unclean_date) that is stored in
some strange string format. I then use a UDF or a Hive function that
converts it to the Catalyst date type (cleaned_date). Next, I want to
extract one column with the month, and another with the year, so I can do
groupBys/aggregations later. Currently, every projection/expression based
off of cleaned_date will have to do the expensive parsing again if I
avoid caching and prefer to do everything in one pass.
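A rough sketch of that scenario (assuming a DataFrame df with a string
column unclean_date; the parsing format and helper UDFs are made up for
illustration):

    from datetime import datetime
    from pyspark.sql.functions import udf
    from pyspark.sql.types import DateType, IntegerType

    # hypothetical expensive parse of a strange string format into a date
    parse_date = udf(lambda s: datetime.strptime(s, "%d@%m@%Y").date(), DateType())
    month_of = udf(lambda d: d.month, IntegerType())
    year_of = udf(lambda d: d.year, IntegerType())

    cleaned = df.withColumn("cleaned_date", parse_date(df["unclean_date"]))
    report = cleaned.withColumn("month", month_of(cleaned["cleaned_date"])) \
                    .withColumn("year", year_of(cleaned["cleaned_date"]))
    # In a single pass, the month and year expressions each re-run the parse,
    # because cleaned_date gets inlined into every derived expression; today
    # the escape hatch is to cache/materialize cleaned first.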
Code generation phase vs optimization

Is there a reason why doing it at the optimization phase is the wrong
approach? It sounds like we're actually logically changing the order of
computation if we do my proposed approach. I do agree, however, that if there
are lower-hanging fruits, then we should tackle those first =)

On Sat, May 30, 2015 at 10:00 PM Michael Armbrust mich...@databricks.com
wrote:

 I think this is likely something that we'll want to do during the code
 generation phase.  Though it's probably not the lowest-hanging fruit at this
 point.

 On Sun, May 31, 2015 at 5:02 AM, Reynold Xin r...@databricks.com wrote:

 I think you are looking for
 http://en.wikipedia.org/wiki/Common_subexpression_elimination in the
 optimizer.

 One thing to note is that as we do more and more optimization like this,
 the optimization time might increase. Do you see a case where this can
 bring you substantial performance gains?


 On Sat, May 30, 2015 at 9:02 AM, Justin Uang justin.u...@gmail.com
 wrote:

 On second thought, perhaps this can be done by writing a rule that
 builds the DAG of dependencies between expressions, then converts it into
 several layers of projections, where each new layer is allowed to depend on
 expression results from previous projections?

 Are there any pitfalls to this approach?

 On Sat, May 30, 2015 at 11:30 AM Justin Uang justin.u...@gmail.com
 wrote:

 If I do the following

 df2 = df.withColumn('y', df['x'] * 7)
 df3 = df2.withColumn('z', df2.y * 3)
 df3.explain()

 Then the result is

  Project [date#56,id#57,timestamp#58,x#59,(x#59 * 7.0) AS
 y#64,((x#59 * 7.0) AS y#64 * 3.0) AS z#65]
   PhysicalRDD [date#56,id#57,timestamp#58,x#59],
 MapPartitionsRDD[125] at mapPartitions at SQLContext.scala:1163

 Effectively I want to compute

 y = f(x)
 z = g(y)

 The Catalyst optimizer realizes that y#64 is the same as the one
 previously computed; however, when building the projection, it ignores
 the fact that it has already computed y, so it calculates `x * 7` twice.

 y = x * 7
 z = x * 7 * 3

 If I wanted to make this fix, would it be possible to do the logic in
 the optimizer phase? I imagine that it's difficult because the expressions
 in InterpretedMutableProjection don't have access to the previous
 expression results, only the input row, and the design doesn't seem to
 cater for this.






Re: Catalyst: Reusing already computed expressions within a projection

2015-05-31 Thread Reynold Xin
I think Michael's bringing up code gen because the compiler (not Spark, but
javac and JVM JIT) already does common subexpression elimination, so we
might get it for free during code gen.


On Sun, May 31, 2015 at 11:48 AM, Justin Uang justin.u...@gmail.com wrote:

 Thanks for pointing to that link! It looks like it’s useful, but it does
 look more complicated than the case I’m trying to address.

 In my case, we set y = f(x), then we use y later on in future projections
 (z = g(y)). In that case, the analysis is trivial in that we aren’t
 trying to find equivalent expressions, we actually know that z is based
 off of y. In addition, we are already storing off y because it’s one of
 the projections, so there’s no tradeoff between time and memory.
 Perf gains

 I believe that the performance gains can be quite substantial, but can you
 check that the case I bring up below will indeed benefit from such an
 optimization?

 For example, suppose I have a date column (unclean_date) that is stored
 in some strange string format. I then use a UDF or a Hive function that
 converts it to the Catalyst date type (cleaned_date). Next, I want to
 extract one column with the month, and another with the year, so I can do
 groupBys/aggregations later. Currently, every projection/expression based
 off of cleaned_date will have to do the expensive parsing again if I
 avoid caching and prefer to do everything in one pass.
 Code generation phase vs optimization

 Is there a reason why doing it at the optimization phase is the wrong
 approach? It sounds like we're actually logically changing the order of
 computation if we do my proposed approach. I do agree, however, that if there
 are lower-hanging fruits, then we should tackle those first =)

 On Sat, May 30, 2015 at 10:00 PM Michael Armbrust mich...@databricks.com
 wrote:

 I think this is likely something that we'll want to do during the code
 generation phase.  Though it's probably not the lowest-hanging fruit at this
 point.

 On Sun, May 31, 2015 at 5:02 AM, Reynold Xin r...@databricks.com wrote:

 I think you are looking for
 http://en.wikipedia.org/wiki/Common_subexpression_elimination in the
 optimizer.

 One thing to note is that as we do more and more optimization like this,
 the optimization time might increase. Do you see a case where this can
 bring you substantial performance gains?


 On Sat, May 30, 2015 at 9:02 AM, Justin Uang justin.u...@gmail.com
 wrote:

 On second thought, perhaps this can be done by writing a rule that
 builds the DAG of dependencies between expressions, then converts it into
 several layers of projections, where each new layer is allowed to depend on
 expression results from previous projections?

 Are there any pitfalls to this approach?

 On Sat, May 30, 2015 at 11:30 AM Justin Uang justin.u...@gmail.com
 wrote:

 If I do the following

 df2 = df.withColumn('y', df['x'] * 7)
 df3 = df2.withColumn('z', df2.y * 3)
 df3.explain()

 Then the result is

  Project [date#56,id#57,timestamp#58,x#59,(x#59 * 7.0) AS
 y#64,((x#59 * 7.0) AS y#64 * 3.0) AS z#65]
   PhysicalRDD [date#56,id#57,timestamp#58,x#59],
 MapPartitionsRDD[125] at mapPartitions at SQLContext.scala:1163

 Effectively I want to compute

 y = f(x)
 z = g(y)

 The Catalyst optimizer realizes that y#64 is the same as the one
 previously computed; however, when building the projection, it ignores
 the fact that it has already computed y, so it calculates `x * 7` twice.

 y = x * 7
 z = x * 7 * 3

 If I wanted to make this fix, would it be possible to do the logic in
 the optimizer phase? I imagine that it's difficult because the expressions
 in InterpretedMutableProjection don't have access to the previous
 expression results, only the input row, and the design doesn't seem to
 cater for this.






Re: [VOTE] Release Apache Spark 1.4.0 (RC3)

2015-05-31 Thread Sandy Ryza
+1 (non-binding)

Launched against a pseudo-distributed YARN cluster running Hadoop 2.6.0 and
ran some jobs.

-Sandy

On Sat, May 30, 2015 at 3:44 PM, Krishna Sankar ksanka...@gmail.com wrote:

 +1 (non-binding, of course)

 1. Compiled OSX 10.10 (Yosemite) OK Total time: 17:07 min
  mvn clean package -Pyarn -Dyarn.version=2.6.0 -Phadoop-2.4
 -Dhadoop.version=2.6.0 -DskipTests
 2. Tested pyspark, MLlib - running as well as comparing results with 1.3.1
 2.1. statistics (min,max,mean,Pearson,Spearman) OK
 2.2. Linear/Ridge/Lasso Regression OK
 2.3. Decision Tree, Naive Bayes OK
 2.4. KMeans OK
Center And Scale OK
 2.5. RDD operations OK
   State of the Union Texts - MapReduce, Filter,sortByKey (word count)
 2.6. Recommendation (Movielens medium dataset ~1 M ratings) OK
Model evaluation/optimization (rank, numIter, lambda) with
 itertools OK
 3. Scala - MLlib
 3.1. statistics (min,max,mean,Pearson,Spearman) OK
 3.2. LinearRegressionWithSGD OK
 3.3. Decision Tree OK
 3.4. KMeans OK
 3.5. Recommendation (Movielens medium dataset ~1 M ratings) OK
 3.6. saveAsParquetFile OK
 3.7. Read and verify the 4.3 save(above) - sqlContext.parquetFile,
 registerTempTable, sql OK
 3.8. result = sqlContext.sql("SELECT
 OrderDetails.OrderID,ShipCountry,UnitPrice,Qty,Discount FROM Orders INNER
 JOIN OrderDetails ON Orders.OrderID = OrderDetails.OrderID") OK
 4.0. Spark SQL from Python OK
 4.1. result = sqlContext.sql("SELECT * from people WHERE State = 'WA'") OK

 Cheers
 k/

 On Fri, May 29, 2015 at 4:40 PM, Patrick Wendell pwend...@gmail.com
 wrote:

 Please vote on releasing the following candidate as Apache Spark version
 1.4.0!

 The tag to be voted on is v1.4.0-rc3 (commit dd109a8):

 https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=dd109a8746ec07c7c83995890fc2c0cd7a693730

 The release files, including signatures, digests, etc. can be found at:
 http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc3-bin/

 Release artifacts are signed with the following key:
 https://people.apache.org/keys/committer/pwendell.asc

 The staging repository for this release can be found at:
 [published as version: 1.4.0]
 https://repository.apache.org/content/repositories/orgapachespark-1109/
 [published as version: 1.4.0-rc3]
 https://repository.apache.org/content/repositories/orgapachespark-1110/

 The documentation corresponding to this release can be found at:
 http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc3-docs/

 Please vote on releasing this package as Apache Spark 1.4.0!

 The vote is open until Tuesday, June 02, at 00:32 UTC and passes
 if a majority of at least 3 +1 PMC votes are cast.

 [ ] +1 Release this package as Apache Spark 1.4.0
 [ ] -1 Do not release this package because ...

 To learn more about Apache Spark, please see
 http://spark.apache.org/

 == What has changed since RC1 ==
 Below is a list of bug fixes that went into this RC:
 http://s.apache.org/vN

 == How can I help test this release? ==
 If you are a Spark user, you can help us test this release by
 taking a Spark 1.3 workload and running on this release candidate,
 then reporting any regressions.

 == What justifies a -1 vote for this release? ==
 This vote is happening towards the end of the 1.4 QA period,
 so -1 votes should only occur for significant regressions from 1.3.1.
 Bugs already present in 1.3.X, minor regressions, or bugs related
 to new features will not block this release.

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org