Github user hbutani commented on the pull request:

    https://github.com/apache/spark/pull/5604#issuecomment-96220742
  
    Hi,
    
    I spent some time on this patch. This is a good start,
    but there are several semantic issues, and I have some
    comments/suggestions about the execution. Hope you don't mind my comments.
    
    Semantic Issues:
    1. The ORDER BY clause in windowing implies an ordering within each logical
partition. Strictly speaking, a global order (across all logical partitions
within a physical partition, i.e. all rows that end up at the same node) is not
quite the same thing, especially when rows with the same values on the order
expressions appear in different partitions. This way of implementing it also
forces you to read all the rows in the WindowAggregate execution before applying
any aggregations, which precludes any 'streaming' style of execution.
Effectively you are repartitioning in WindowAggregate, albeit within each
WindowAggregate invocation. More on this below, in the Execution comments.
    
    2. 'Current Row' does not mean an offset of 0 for range-based windowing.
It implies including the rows before or after (depending on the direction of the
boundary) that have the same order-expression values.
    
    3. The absence of a partition clause does not imply no partitioning; it
implies that all rows should be treated as a single partition.
    
    Execution comments/suggestions:
    1. You want to consider holding onto as few rows as possible. There are
several things to consider:
      a. A relatively easy first step is to read only one logical partition at a
time. But this requires that the rows come in in the correct order.
      b. My suggestion would be to introduce UDAF-level streaming as early as
possible. That way, for many common cases, the memory footprint can be very low.
See the work done in Hive around
'org.apache.hadoop.hive.ql.udf.generic.ISupportStreamingModeForWindowing'. This
also has a big performance impact; in Hive we encountered users who work with
partitions of considerable size (tens of thousands of rows), and the
straightforward O(n*n) way of computing window aggregates was a huge bottleneck.
    
    2. Why are you translating all window invocations to HiveUDAF? This way you
take a hit converting from Spark values to Hive values and back. At least for
the aggregate functions already in Spark, why not use them?
    
    3. The Exchange operator has a parameter to specify ordering within a
partition. Is there a way to generate a physical plan that utilizes this? You
wouldn't need a Sort operator above the Exchange then. (I am still learning
about Spark SQL, so I am not sure of the implications of this.)
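
    As a sketch of the streaming idea in 1.b (my own simplified illustration of
a sliding ROWS frame, not Hive's ISupportStreamingModeForWindowing API):
maintaining a running aggregate and retiring rows as they leave the frame turns
the O(n*n) per-row recomputation into a single O(n) pass, holding at most
preceding + following + 1 rows in memory:

```python
def streaming_window_sum(values, preceding, following):
    """SUM over a ROWS BETWEEN preceding PRECEDING AND following FOLLOWING
    frame, computed in one streaming pass instead of re-summing each frame."""
    n = len(values)
    out = []
    running = 0
    hi = 0  # next index not yet added to the running sum
    for i in range(n):
        # extend the frame to include rows up to i + following
        while hi < n and hi <= i + following:
            running += values[hi]
            hi += 1
        # retire the row that just fell out of the frame, if any
        if i - preceding - 1 >= 0:
            running -= values[i - preceding - 1]
        out.append(running)
    return out

# frames for (preceding=1, following=0): [1], [1,2], [2,3], [3,4]
result = streaming_window_sum([1, 2, 3, 4], preceding=1, following=0)
```

The same shape works for any aggregate that supports retraction (sum, count,
avg); min/max need a slightly richer structure such as a monotonic deque.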
    
    regards,
    Harish Butani.


