Re: [SQL] Unresolved reference with chained window functions.

2017-03-24 Thread Herman van Hövell tot Westerflier
This is definitely a bug in the CollapseWindow optimizer rule. I think we
can use SPARK-20086 to track this.
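
Roughly, the rule seems to be missing a guard against merging two adjacent
Window operators when the upper one references the lower one's output. A
minimal sketch of that extra condition (illustrative only, not the actual
patch for SPARK-20086):

  import org.apache.spark.sql.catalyst.expressions.AttributeSet
  import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Window}
  import org.apache.spark.sql.catalyst.rules.Rule

  object CollapseWindowSketch extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
      case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild))
          if ps1 == ps2 && os1 == os2 &&
             // only collapse when w1 does not consume w2's window output
             w1.references.intersect(
               AttributeSet(we2.map(_.toAttribute))).isEmpty =>
        w1.copy(windowExpressions = we2 ++ we1, child = grandChild)
    }
  }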

On Fri, Mar 24, 2017 at 9:28 PM, Maciej Szymkiewicz wrote:

> Forwarded from SO (http://stackoverflow.com/q/43007433). Looks like a
> regression compared to 2.0.2.
>
> scala> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.expressions.Window
>
> scala> val win_spec_max =
> Window.partitionBy("x").orderBy("AmtPaid").rowsBetween(Window.
> unboundedPreceding,
> 0)
> win_spec_max: org.apache.spark.sql.expressions.WindowSpec =
> org.apache.spark.sql.expressions.WindowSpec@3433e418
>
> scala> val df = Seq((1, 2.0), (1, 3.0), (1, 1.0), (1, -2.0), (1,
> -1.0)).toDF("x", "AmtPaid")
> df: org.apache.spark.sql.DataFrame = [x: int, AmtPaid: double]
>
> scala> val df_with_sum = df.withColumn("AmtPaidCumSum",
> sum(col("AmtPaid")).over(win_spec_max))
> df_with_sum: org.apache.spark.sql.DataFrame = [x: int, AmtPaid: double
> ... 1 more field]
>
> scala> val df_with_max = df_with_sum.withColumn("AmtPaidCumSumMax",
> max(col("AmtPaidCumSum")).over(win_spec_max))
> df_with_max: org.apache.spark.sql.DataFrame = [x: int, AmtPaid: double
> ... 2 more fields]
>
> scala> df_with_max.explain
> == Physical Plan ==
> !Window [sum(AmtPaid#361) windowspecdefinition(x#360, AmtPaid#361 ASC
> NULLS FIRST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS
> AmtPaidCumSum#366, max(AmtPaidCumSum#366) windowspecdefinition(x#360,
> AmtPaid#361 ASC NULLS FIRST, ROWS BETWEEN UNBOUNDED PRECEDING AND
> CURRENT ROW) AS AmtPaidCumSumMax#372], [x#360], [AmtPaid#361 ASC NULLS
> FIRST]
> +- *Sort [x#360 ASC NULLS FIRST, AmtPaid#361 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(x#360, 200)
>       +- LocalTableScan [x#360, AmtPaid#361]
>
> scala> df_with_max.printSchema
> root
>  |-- x: integer (nullable = false)
>  |-- AmtPaid: double (nullable = false)
>  |-- AmtPaidCumSum: double (nullable = true)
>  |-- AmtPaidCumSumMax: double (nullable = true)
>
> scala> df_with_max.show
> 17/03/24 21:22:32 ERROR Executor: Exception in task 0.0 in stage 19.0
> (TID 234)
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding
> attribute, tree: AmtPaidCumSum#366
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>...
> Caused by: java.lang.RuntimeException: Couldn't find AmtPaidCumSum#366
> in [sum#385,max#386,x#360,AmtPaid#361]
>...
>
> Is it a known issue or do we need a JIRA?
>
> --
> Best,
> Maciej Szymkiewicz
>
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


-- 

Herman van Hövell

Software Engineer

Databricks Inc.

hvanhov...@databricks.com

+31 6 420 590 27

databricks.com



[SQL] Unresolved reference with chained window functions.

2017-03-24 Thread Maciej Szymkiewicz
Forwarded from SO (http://stackoverflow.com/q/43007433). Looks like a
regression compared to 2.0.2.

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val win_spec_max =
Window.partitionBy("x").orderBy("AmtPaid").rowsBetween(Window.unboundedPreceding,
0)
win_spec_max: org.apache.spark.sql.expressions.WindowSpec =
org.apache.spark.sql.expressions.WindowSpec@3433e418

scala> val df = Seq((1, 2.0), (1, 3.0), (1, 1.0), (1, -2.0), (1,
-1.0)).toDF("x", "AmtPaid")
df: org.apache.spark.sql.DataFrame = [x: int, AmtPaid: double]

scala> val df_with_sum = df.withColumn("AmtPaidCumSum",
sum(col("AmtPaid")).over(win_spec_max))
df_with_sum: org.apache.spark.sql.DataFrame = [x: int, AmtPaid: double
... 1 more field]

scala> val df_with_max = df_with_sum.withColumn("AmtPaidCumSumMax",
max(col("AmtPaidCumSum")).over(win_spec_max))
df_with_max: org.apache.spark.sql.DataFrame = [x: int, AmtPaid: double
... 2 more fields]

scala> df_with_max.explain
== Physical Plan ==
!Window [sum(AmtPaid#361) windowspecdefinition(x#360, AmtPaid#361 ASC
NULLS FIRST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS
AmtPaidCumSum#366, max(AmtPaidCumSum#366) windowspecdefinition(x#360,
AmtPaid#361 ASC NULLS FIRST, ROWS BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW) AS AmtPaidCumSumMax#372], [x#360], [AmtPaid#361 ASC NULLS
FIRST]
+- *Sort [x#360 ASC NULLS FIRST, AmtPaid#361 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(x#360, 200)
      +- LocalTableScan [x#360, AmtPaid#361]

scala> df_with_max.printSchema
root
 |-- x: integer (nullable = false)
 |-- AmtPaid: double (nullable = false)
 |-- AmtPaidCumSum: double (nullable = true)
 |-- AmtPaidCumSumMax: double (nullable = true)

scala> df_with_max.show
17/03/24 21:22:32 ERROR Executor: Exception in task 0.0 in stage 19.0
(TID 234)
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding
attribute, tree: AmtPaidCumSum#366
at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
   ...
Caused by: java.lang.RuntimeException: Couldn't find AmtPaidCumSum#366
in [sum#385,max#386,x#360,AmtPaid#361]
   ...

Is it a known issue or do we need a JIRA?

-- 
Best,
Maciej Szymkiewicz


-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: RFC: deprecate SparkStatusTracker, remove JobProgressListener

2017-03-24 Thread Marcelo Vanzin
On Fri, Mar 24, 2017 at 1:18 PM, Ryan Blue  wrote:
> For the status tracker, I'd like to see it replaced with something better
> before deprecating it. I've been looking at it for implementing better
> feedback in notebooks, and it looks sufficient for that at the moment. Is
> there a reason to remove it other than that it differs from the rest API?

My original thinking was to eventually replace it with a programmatic
API that exposes all the REST API information. Basically the REST API
without the need to have an HTTP client.

That still might be worth it in the future (just to avoid having sort
of duplicate APIs), but at this point I'm happy with just changing the
backend of the status tracker to not use JobProgressListener.
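
Purely as an illustration of the shape such a programmatic API could take
(nothing below exists in Spark today; the v1 classes are the existing public
REST types), something like:

  import org.apache.spark.status.api.v1.{ApplicationInfo, ExecutorSummary,
    JobData, StageData}

  // Hypothetical read-only facade over the same data the REST endpoints serve.
  trait AppStatusApi {
    def applicationInfo(): ApplicationInfo
    def jobs(): Seq[JobData]
    def stages(): Seq[StageData]
    def executors(activeOnly: Boolean = true): Seq[ExecutorSummary]
  }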


-- 
Marcelo

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: RFC: deprecate SparkStatusTracker, remove JobProgressListener

2017-03-24 Thread Marcelo Vanzin
On Fri, Mar 24, 2017 at 1:13 PM, Josh Rosen  wrote:
> Let's not deprecate SparkStatusTracker (at least not until there's a
> suitable replacement that's just as easy to use). Deprecating it now would
> leave users with an unactionable warning and that's not great for users who
> try to keep their build warnings clean.

I'm not planning on changing that API right now; we can look at it
later if desired.

> If you want to be really proactive in notifying anyone who might be affected
> by JobProgressListener's removal we could mark JobProgressListener

I'll make a change for that later. It will add warnings in the Spark
build, but then we have a ton of those already...

-- 
Marcelo

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: RFC: deprecate SparkStatusTracker, remove JobProgressListener

2017-03-24 Thread Marcelo Vanzin
On Fri, Mar 24, 2017 at 12:07 PM, Josh Rosen  wrote:
> I think that it should be safe to remove JobProgressListener but I'd like to
> keep the SparkStatusTracker API.

Thanks Josh. I can work with that. My main concern would be keeping
the listener around.

Is it worth it to add a deprecated annotation to it?

-- 
Marcelo

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: RFC: deprecate SparkStatusTracker, remove JobProgressListener

2017-03-24 Thread Josh Rosen
I think that it should be safe to remove JobProgressListener but I'd like
to keep the SparkStatusTracker API.

SparkStatusTracker was originally developed to provide a stable
programmatic status API for use by Hive on Spark. SparkStatusTracker
predated the Spark REST APIs for status tracking, which is why there's some
overlap of functionality between those APIs. Given that SparkStatusTracker
is a longstanding, stable public API, I'd prefer not to remove it because
there may be a lot of existing user code that depends on it. It's also a
relatively easy-to-support API because it presents a clean query
abstraction and doesn't expose mutable data structures via its public
interface, so we should be able to support this interface with an
implementation based on the new UI database.
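
For reference, this is the kind of read-only polling it already supports from
user code (a rough sketch using the existing SparkStatusTracker methods; error
handling omitted):

  import org.apache.spark.SparkContext

  def reportProgress(sc: SparkContext): Unit = {
    val tracker = sc.statusTracker
    for {
      jobId   <- tracker.getActiveJobIds()
      job     <- tracker.getJobInfo(jobId)
      stageId <- job.stageIds()
      stage   <- tracker.getStageInfo(stageId)
    } println(s"job $jobId / stage $stageId: " +
        s"${stage.numCompletedTasks()} of ${stage.numTasks()} tasks done")
  }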

JobProgressListener, on the other hand, has a messy interface which was not
designed for use by code outside of Spark. This interface was marked as
@DeveloperAPI as part of Spark 1.0 (see
https://github.com/apache/spark/pull/648) but I think that decision was a
mistake because the interface exposes mutable internal state. For example,
if a user wanted to query completed stages using JobProgressListener then
they would access a field declared as

  val completedStages = ListBuffer[StageInfo]()

which requires the user to explicitly synchronize on the
JobProgressListener instance in order to safely access this field. This is
a bad API and it's not really possible to cleanly present this same
interface with a database-backed implementation. In addition, this
interface has not been fully stable over time and there's currently no
public / DeveloperAPI mechanism to get access to the Spark-constructed
instance of JobProgressListener.
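
To make the synchronization point concrete, here is a hedged illustration of
the access pattern described above (it assumes the caller somehow obtained the
listener instance, which is exactly what no public API provides):

  import org.apache.spark.scheduler.StageInfo
  import org.apache.spark.ui.jobs.JobProgressListener

  def snapshotCompletedStages(listener: JobProgressListener): Seq[StageInfo] =
    listener.synchronized {
      // copy while holding the listener's lock, since completedStages is a
      // mutable ListBuffer that the listener keeps appending to
      listener.completedStages.toList
    }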

Given all of this, I think that it's unlikely that users are relying
on JobProgressListener since Spark has other APIs for status tracking which
are more stable and easier to work with. If anyone is relying on this then
they could inline the JobProgressListener source in their own project and
instantiate and register the listener themselves.

Thus I think it's fine to remove JobProgressListener but think we should
keep SparkStatusTracker. I think that the decision of whether we want to
make a next-generation "V2" programmatic status API based on the REST API
types can happen later / independently.

On Thu, Mar 23, 2017 at 1:32 PM Marcelo Vanzin  wrote:

> Hello all,
>
> For those not following, I'm working on SPARK-18085, where my goal is
> to decouple the storage of UI data from the actual UI implementation.
> This is mostly targeted at the history server, so that it's possible
> to quickly load a "database" with UI information instead of the
> existing way of re-parsing event logs, but I think it also helps with
> the live UI, since it doesn't require storing UI information in memory
> and thus relieves some memory pressure on the driver. (I may still add
> an in-memory database in that project, but that's digressing from the
> topic at hand.)
>
> One of my (unwritten) goals in that project was to get rid of
> JobProgressListener. Now that I'm at a point where I can do that from
> the UI's p.o.v., I ran into SparkStatusTracker. So I'd like to get
> people's views on two topics.
>
> (i) deprecate SparkStatusTracker, provide a new API based on the
> public REST types.
>
> SparkStatusTracker provides yet another way of getting job, stage and
> executor information (aside from the UI and the API). It has its own
> types that model those, which are based on the existing UI types but
> not the same. It could be replaced by making REST calls to the UI
> endpoint, but that's sub-optimal since it doesn't make a lot of sense
> to do that when you already have an instance of SparkContext to play
> with.
>
> Since that's a public, stable API, it can't be removed right away. But
> I'd like to propose that we deprecate it, and provide a new API that
> is based on the REST types (which, with my work, are also used in the
> UI). The existing "SparkStatusTracker" would still exist until we can
> remove it, of course.
>
> What do people think about this approach? Another option is to not add
> the new API, but keep SparkStatusTracker around using the new UI
> database to back it.
>
> (ii) Remove JobProgressListener
>
> I didn't notice it before, but JobProgressListener is public-ish
> (@DeveloperApi). I'm not sure why that is, and it's a weird thing
> because it exposes non-public types (from UIData.scala) in its API.
> With the work I'm doing, and the above suggestion about
> SparkStatusTracker, JobProgressListener becomes unused in Spark
> itself, and keeping it would just mean the driver keeps using unneeded
> memory.
>
> Are there concerns about removing that class? Its functionality is
> available in both SparkStatusTracker and the REST API, so it's mostly
> redundant.
>
>
> So, thoughts?
>
>
> Note to self: (i) above means I'd have to scale back some of my goals
> for SPARK-18085.

Re: planning & discussion for larger scheduler changes

2017-03-24 Thread Reynold Xin
On Fri, Mar 24, 2017 at 4:41 PM, Imran Rashid  wrote:

> Kay and I were discussing some of the bigger scheduler changes getting
> proposed lately, and realized there is a broader discussion to have with
> the community, outside of any single JIRA.  I'll start by sharing my
> initial thoughts; I know Kay has thoughts on this too, but it would be good
> to get input from everyone.
>
> In particular, SPARK-14649 & SPARK-13669 have got me thinking.  These are
> proposed changes in behavior that are not fixes for *correctness* in fault
> tolerance, but improvements to performance when there are faults.  The
> changes make some intuitive sense, but it's also hard to judge whether they
> are necessarily better; it's hard to verify the correctness of the changes;
> and it's hard to even know that we haven't broken the old behavior (because
> of how brittle the scheduler seems to be).
>
> So I'm wondering:
>
> 1) in the short-term, can we find ways to get these changes merged, but
> turned off by default, in a way that we feel confident won't break existing
> code?
>


+1

For risky features that's how we often do it. Feature flag it and turn it
on later.
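
Concretely, that usually just means a boolean config whose default preserves
today's behavior; the key name below is made up for illustration:

  import org.apache.spark.SparkConf

  // Hypothetical flag guarding the new scheduler behavior; off by default.
  def newFaultHandlingEnabled(conf: SparkConf): Boolean =
    conf.getBoolean("spark.scheduler.experimentalFaultHandling.enabled", false)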



>
> 2) a bit longer-term -- should we be considering bigger rewrites to the
> scheduler?  Particularly, to improve testability?  E.g., maybe if it were
> rewritten to more completely follow the actor model and eliminate shared
> state, the code would be cleaner and more testable.  Or maybe this is a
> crazy idea, and we'd just lose everything we'd learned so far and be stuck
> fixing just as many bugs in the new version.
>


This of course depends. Refactoring a large, complicated piece of code is
one of the most challenging tasks in engineering. It is extremely difficult
to ensure things are correct even after the rewrite, especially in areas
that don't have amazing test coverage.


Re: Fwd: [SparkSQL] Project using NamedExpression

2017-03-24 Thread Aviral Agarwal
Hi,
Can you please point me to how to resolve the expression?
I was looking into LogicalPlan.resolveExpressions(), which takes a
PartialFunction, but I am not sure how to use it.

Thanks,
Aviral Agarwal

On Mar 24, 2017 09:20, "Liang-Chi Hsieh"  wrote:


Hi,

You need to resolve the expressions before passing them in to create the
UnsafeProjection.
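
For the simple case where the parsed expressions only reference top-level
columns of the input, a rough sketch of what resolution could look like
(illustrative only; the real resolution is done by the analyzer and covers
far more cases such as nested fields and functions):

  import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
  import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}

  // Replace each unresolved column reference with the matching input attribute.
  def resolveAgainst(input: Seq[Attribute], expr: Expression): Expression =
    expr transformUp {
      case u: UnresolvedAttribute =>
        input.find(_.name.equalsIgnoreCase(u.name)).getOrElse(
          sys.error(s"Could not resolve ${u.name} among ${input.map(_.name)}"))
    }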



Aviral Agarwal wrote
> Hi guys,
>
> I want to transform a Row using NamedExpression.
>
> Below is the code snippet that I am using:
>
>
> def apply(dataFrame: DataFrame, selectExpressions:
> java.util.List[String]): RDD[UnsafeRow] = {
>
> val exprArray = selectExpressions.map(s =>
>   Column(SqlParser.parseExpression(s)).named
> )
>
> val inputSchema = dataFrame.logicalPlan.output
>
> val transformedRDD = dataFrame.mapPartitions(
>   iter => {
> val project = UnsafeProjection.create(exprArray,inputSchema)
> iter.map{
>   row =>
> project(InternalRow.fromSeq(row.toSeq))
> }
> })
>
> transformedRDD
>   }
>
>
> The problem is that the expression becomes unevaluable:
>
> Caused by: java.lang.UnsupportedOperationException: Cannot evaluate
> expression: 'a
> at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.
> genCode(Expression.scala:233)
> at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.g
> enCode(unresolved.scala:53)
> at org.apache.spark.sql.catalyst.expressions.Expression$$anonfu
> n$gen$2.apply(Expression.scala:106)
> at org.apache.spark.sql.catalyst.expressions.Expression$$anonfu
> n$gen$2.apply(Expression.scala:102)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.sql.catalyst.expressions.Expression.gen(Exp
> ression.scala:102)
> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenCon
> text$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:464)
> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenCon
> text$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:464)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(
> TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(
> TraversableLike.scala:244)
> at scala.collection.mutable.ResizableArray$class.foreach(Resiza
> bleArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.
> scala:47)
> at scala.collection.TraversableLike$class.map(TraversableLike.
> scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenCon
> text.generateExpressions(CodeGenerator.scala:464)
> at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUn
> safeProjection$.createCode(GenerateUnsafeProjection.scala:281)
> at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUn
> safeProjection$.create(GenerateUnsafeProjection.scala:324)
> at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUn
> safeProjection$.create(GenerateUnsafeProjection.scala:317)
> at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUn
> safeProjection$.create(GenerateUnsafeProjection.scala:32)
> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenera
> tor.generate(CodeGenerator.scala:635)
> at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.
> create(Projection.scala:125)
> at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.
> create(Projection.scala:135)
> at org.apache.spark.sql.ScalaTransform$$anonfun$3.apply(
> ScalaTransform.scala:31)
> at org.apache.spark.sql.ScalaTransform$$anonfun$3.apply(
> ScalaTransform.scala:30)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$
> apply$20.apply(RDD.scala:710)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$
> apply$20.apply(RDD.scala:710)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
> DD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.sca
> la:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.
> scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
> Executor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
> lExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> This might be because the Expression is unresolved.
>
> Any help would be appreciated.
>
> Thanks and Regards,
> Aviral Agarwal





-
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/