GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/21126

    [SPARK-24050][SS] Calculate input / processing rates correctly for 
DataSourceV2 streaming sources

    ## What changes were proposed in this pull request?
    
    In some streaming queries, the input and processing rates are not
calculated at all (they show up as zero) because MicroBatchExecution fails to
associate metrics from the executed plan of a trigger with the sources in the
logical plan of the trigger. This executed-plan-leaf-to-logical-source
attribution works as follows. With V1 sources, there was no way to identify
which execution plan leaves were generated by a streaming source, so we made a
best-effort attempt to match logical and execution plan leaves when the number
of leaves was the same. In cases where the number of leaves differs, we just
give up and report zero rates. An example of where this may happen:
    
    ```
    val cachedStaticDF = someStaticDF.union(anotherStaticDF).cache()
    val streamingInputDF = ...
    
    val query = streamingInputDF.join(cachedStaticDF).writeStream....
    ```
    In this case, `cachedStaticDF` has multiple logical leaves, but in the
trigger's execution plan it has only one leaf, because a cached subplan is
represented as a single `InMemoryTableScanExec` leaf. This leads to a mismatch in
the number of leaves, causing the input rates to be computed as zero.
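
    The leaf-mismatch failure mode described above can be sketched as follows.
This is an illustrative sketch only; the class and method names
(`LeafMatchingSketch`, `attributeMetrics`, etc.) are hypothetical and do not
correspond to Spark's actual internals.

    ```scala
    // Hypothetical sketch of the V1 best-effort leaf matching.
    object LeafMatchingSketch {
      case class LogicalLeaf(name: String)
      case class PhysicalLeaf(name: String, numInputRows: Long)

      // Attribute per-leaf metrics to logical leaves only when the leaf
      // counts line up; otherwise give up and report zero for every source.
      def attributeMetrics(
          logical: Seq[LogicalLeaf],
          physical: Seq[PhysicalLeaf]): Map[LogicalLeaf, Long] = {
        if (logical.size == physical.size) {
          logical.zip(physical.map(_.numInputRows)).toMap
        } else {
          // Mismatch (e.g. a cached subplan collapsed into a single
          // InMemoryTableScanExec leaf): fall back to zero rates.
          logical.map(_ -> 0L).toMap
        }
      }

      def main(args: Array[String]): Unit = {
        val logical = Seq(
          LogicalLeaf("streamingSource"),
          LogicalLeaf("someStaticDF"),
          LogicalLeaf("anotherStaticDF"))
        // The two static leaves collapsed into one cached scan: 3 vs 2 leaves.
        val physical = Seq(
          PhysicalLeaf("streamingScan", 100L),
          PhysicalLeaf("inMemoryTableScan", 42L))
        val rates = attributeMetrics(logical, physical)
        println(rates.values.sum) // all zero due to the mismatch
      }
    }
    ```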
    
    With DataSourceV2, all inputs are represented in the executed plan by
`DataSourceV2ScanExec` nodes, each of which holds a reference to the associated
logical `DataSource` and `DataSourceReader`. So it is easy to associate the
metrics with the original streaming sources.
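
    The direct association can be sketched roughly like this. Again, this is
an illustrative sketch, not Spark's implementation: the node and reader types
below are hypothetical stand-ins for `DataSourceV2ScanExec` and
`DataSourceReader`.

    ```scala
    // Hedged sketch: each V2 physical scan node carries a reference to its
    // reader, so metrics can be keyed by that reference directly, with no
    // positional leaf matching needed.
    object V2MetricSketch {
      trait Reader
      case class MemoryReader(id: Int) extends Reader

      sealed trait PhysicalNode { def numOutputRows: Long }
      case class V2ScanNode(reader: Reader, numOutputRows: Long) extends PhysicalNode
      case class OtherNode(numOutputRows: Long) extends PhysicalNode

      // Collect the V2 scan leaves and key their row counts by reader.
      def metricsBySource(leaves: Seq[PhysicalNode]): Map[Reader, Long] =
        leaves.collect { case V2ScanNode(r, n) => r -> n }.toMap

      def main(args: Array[String]): Unit = {
        val r1 = MemoryReader(1)
        val leaves = Seq(V2ScanNode(r1, 250L), OtherNode(10L))
        println(metricsBySource(leaves)(r1)) // 250
      }
    }
    ```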
    
    In this PR, the solution is as follows: if all the streaming sources in a
streaming query are v2 sources, use a new code path where the
execution-metrics-to-source mapping is done directly; otherwise, fall back to
the existing matching logic.
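
    The path selection above boils down to a simple predicate over the query's
sources. A minimal sketch, with hypothetical `V1Source`/`V2Source` types
standing in for the real source classes:

    ```scala
    object PathSelectionSketch {
      sealed trait Source
      case class V1Source(name: String) extends Source
      case class V2Source(name: String) extends Source

      // The direct metrics-to-source mapping is safe only when every
      // streaming source in the query is a V2 source; any V1 source forces
      // the existing best-effort leaf-matching path.
      def useDirectV2Path(sources: Seq[Source]): Boolean =
        sources.nonEmpty && sources.forall(_.isInstanceOf[V2Source])

      def main(args: Array[String]): Unit = {
        println(useDirectV2Path(Seq(V2Source("a"), V2Source("b")))) // true
        println(useDirectV2Path(Seq(V2Source("a"), V1Source("b")))) // false
      }
    }
    ```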
    
    ## How was this patch tested?
    - New unit tests using V2 memory source
    - Existing unit tests using V1 source


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-24050

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21126.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21126
    
----
commit d485db8ec70a8bd8e2fff488e75be08d384ceef0
Author: Tathagata Das <tathagata.das1565@...>
Date:   2018-04-23T06:20:14Z

    SPARK-24050

----


---
