GitHub user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11301#discussion_r62431691
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---
    @@ -37,6 +39,16 @@ private[sql] object Column {
       def apply(expr: Expression): Column = new Column(expr)
     
       def unapply(col: Column): Option[Expression] = Some(col.expr)
    +
    +  @scala.annotation.varargs
    +  def updateExpressionsOrigin(cols: Column*): Unit = {
    +    // Update Expression.origin using the callSite of an operation
    +    val callSite = org.apache.spark.util.Utils.getCallSite().shortForm
    +    cols.map(col => col.expr.foreach(e => e.origin.callSite = Some(callSite)))
    +    // Update CurrentOrigin for setting origin for LogicalPlan node
    +    CurrentOrigin.set(
    +      Origin(Some(callSite), CurrentOrigin.get.line, CurrentOrigin.get.startPosition))
    --- End diff --
    
    I think it's better to set CurrentOrigin directly because the origin state changes globally within the same thread.
    Otherwise, we run into some problems.
    
    1. Call sites associated with different queries can get mixed up.
    
    For example, suppose we execute the following two queries: one is a filter and the other is an orderBy.
    
    ```
    val df = sc.parallelize(1 to 10, 1).toDF
    df.filter($"value" + 10 > 4).show        // query1 filter
    df.orderBy($"value" + 13).show        // query2 orderBy
    ```
    
    Part of the code generated for query2 is shown below.
    query2 has no filter operation, yet one call site includes `@ filter`, which comes from the previous query (query1).
    
    ```
    /* 006 */ class SpecificOrdering extends org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering {
    /* 007 */   
    
    ...
    
    /* 024 */       /* (input[0, int] + 13) @ orderBy at <console>:27 */
    /* 025 */       /* input[0, int] @ filter at <console>:27 */
    /* 026 */       int value1 = i.getInt(0);
    ```
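    
    To illustrate the mechanism behind this kind of leak, here is a minimal toy sketch. It is not Spark code: `ToyOrigin`, `ToyCurrentOrigin`, `OriginLeakDemo` and `newExpr` are hypothetical names, and the sketch only assumes that an expression captures whatever thread-local origin is current when it is created. An unscoped `set` leaves the call site installed for later, unrelated work on the same thread, while a scoped helper restores the previous value.
    
    ```scala
    // Toy model only: every name in this block is hypothetical, not Spark's API.
    final case class ToyOrigin(callSite: Option[String] = None)
    
    object ToyCurrentOrigin {
      // One origin per thread, empty by default.
      private val value = new ThreadLocal[ToyOrigin] {
        override def initialValue(): ToyOrigin = ToyOrigin()
      }
    
      def get: ToyOrigin = value.get()
      def set(o: ToyOrigin): Unit = value.set(o)
      def reset(): Unit = value.set(ToyOrigin())
    
      // Install `o` only while `body` runs, then restore the previous origin.
      def withOrigin[A](o: ToyOrigin)(body: => A): A = {
        val previous = get
        set(o)
        try body finally set(previous)
      }
    }
    
    object OriginLeakDemo {
      // Assumption: an "expression" records the origin current at creation time.
      def newExpr(desc: String): String =
        ToyCurrentOrigin.get.callSite.fold(desc)(cs => s"$desc @ $cs")
    
      // Unscoped set: the "filter" call site is still installed afterwards.
      def leaky(): (String, String) = {
        ToyCurrentOrigin.reset()
        ToyCurrentOrigin.set(ToyOrigin(Some("filter")))
        val first = newExpr("value + 10 > 4")
        val second = newExpr("input[0, int]") // created later for unrelated work
        (first, second)
      }
    
      // Scoped set: the call site is visible only inside the block.
      def scoped(): (String, String) = {
        ToyCurrentOrigin.reset()
        val first = ToyCurrentOrigin.withOrigin(ToyOrigin(Some("filter"))) {
          newExpr("value + 10 > 4")
        }
        val second = newExpr("input[0, int]")
        (first, second)
      }
    }
    ```
    
    In this toy model, `OriginLeakDemo.leaky()` tags the second expression with `@ filter` as well, whereas `OriginLeakDemo.scoped()` leaves it untagged.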
    
    2. Call sites should not be associated with `BaseProjection`, because it is not directly related to any individual expression.
    
    When we write queries without column objects, like `df.filter("value + 10 > 3")`, call sites are not associated with `BaseProjection`.
    
    ```
    /* 006 */ class SpecificSafeProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
    /* 007 */   
    /* 008 */   private Object[] references;
    /* 009 */   private MutableRow mutableRow;
    /* 010 */   private Object[] values;
    /* 011 */   private org.apache.spark.sql.types.StructType schema;
    /* 012 */   
    /* 013 */   
    /* 014 */   public SpecificSafeProjection(Object[] references) {
    /* 015 */     this.references = references;
    /* 016 */     mutableRow = (MutableRow) references[references.length - 1];
    /* 017 */     
    /* 018 */     this.schema = (org.apache.spark.sql.types.StructType) references[0];
    /* 019 */   }
    /* 020 */   
    /* 021 */   public java.lang.Object apply(java.lang.Object _i) {
    /* 022 */     InternalRow i = (InternalRow) _i;
    /* 023 */     /* createexternalrow(if (isnull(input[0, int])) null else input[0, int], StructField(value,IntegerType,false)) */
    /* 024 */     values = new Object[1];
    /* 025 */     /* if (isnull(input[0, int])) null else input[0, int] */
    /* 026 */     /* isnull(input[0, int]) */
    /* 027 */     /* input[0, int] */
    /* 028 */     int value3 = i.getInt(0);
    /* 029 */     boolean isNull1 = false;
    /* 030 */     int value1 = -1;
    /* 031 */     if (!false && false) {
    /* 032 */       /* null */
    /* 033 */       final int value4 = -1;
    /* 034 */       isNull1 = true;
    /* 035 */       value1 = value4;
    /* 036 */     } else {
    /* 037 */       /* input[0, int] */
    /* 038 */       int value5 = i.getInt(0);
    /* 039 */       isNull1 = false;
    /* 040 */       value1 = value5;
    /* 041 */     }
    /* 042 */     if (isNull1) {
    /* 043 */       values[0] = null;
    /* 044 */     } else {
    /* 045 */       values[0] = value1;
    /* 046 */     }
    /* 047 */     
    /* 048 */     final org.apache.spark.sql.Row value = new org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema(values, this.schema);
    /* 049 */     if (false) {
    /* 050 */       mutableRow.setNullAt(0);
    /* 051 */     } else {
    /* 052 */       
    /* 053 */       mutableRow.update(0, value);
    /* 054 */     }
    /* 055 */     
    /* 056 */     return mutableRow;
    /* 057 */   }
    /* 058 */ }
    /* 059 */ 
    ```
    
    But if we write queries with column objects, call sites do get associated with `BaseProjection`.
    
    ```
    /* 006 */ class SpecificSafeProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
    /* 007 */   
    /* 008 */   private Object[] references;
    /* 009 */   private MutableRow mutableRow;
    /* 010 */   private Object[] values;
    /* 011 */   private org.apache.spark.sql.types.StructType schema;
    /* 012 */   
    /* 013 */   
    /* 014 */   public SpecificSafeProjection(Object[] references) {
    /* 015 */     this.references = references;
    /* 016 */     mutableRow = (MutableRow) references[references.length - 1];
    /* 017 */     
    /* 018 */     this.schema = (org.apache.spark.sql.types.StructType) references[0];
    /* 019 */   }
    /* 020 */   
    /* 021 */   public java.lang.Object apply(java.lang.Object _i) {
    /* 022 */     InternalRow i = (InternalRow) _i;
    /* 023 */     /* createexternalrow(if (isnull(input[0, int])) null else input[0, int], StructField(value,IntegerType,false)) @ filter at <console... */
    /* 024 */     values = new Object[1];
    /* 025 */     /* if (isnull(input[0, int])) null else input[0, int] @ filter at <console>:27 */
    /* 026 */     /* isnull(input[0, int]) @ filter at <console>:27 */
    /* 027 */     /* input[0, int] @ filter at <console>:27 */
    /* 028 */     int value3 = i.getInt(0);
    /* 029 */     boolean isNull1 = false;
    /* 030 */     int value1 = -1;
    /* 031 */     if (!false && false) {
    /* 032 */       /* null @ filter at <console>:27 */
    /* 033 */       final int value4 = -1;
    /* 034 */       isNull1 = true;
    /* 035 */       value1 = value4;
    /* 036 */     } else {
    /* 037 */       /* input[0, int] @ filter at <console>:27 */
    /* 038 */       int value5 = i.getInt(0);
    /* 039 */       isNull1 = false;
    /* 040 */       value1 = value5;
    /* 041 */     }
    /* 042 */     if (isNull1) {
    /* 043 */       values[0] = null;
    /* 044 */     } else {
    /* 045 */       values[0] = value1;
    /* 046 */     }
    /* 047 */     
    /* 048 */     final org.apache.spark.sql.Row value = new org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema(values, this.schema);
    /* 049 */     if (false) {
    /* 050 */       mutableRow.setNullAt(0);
    /* 051 */     } else {
    /* 052 */       
    /* 053 */       mutableRow.update(0, value);
    /* 054 */     }
    /* 055 */     
    /* 056 */     return mutableRow;
    /* 057 */   }
    /* 058 */ }
    /* 059 */ 
    ```

