Re: Serialization or internal functions?

2020-04-09 Thread Vadim Semenov
You can take a look at the code that Spark generates:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug.codegenString
import org.apache.spark.sql.functions._

// In spark-shell the session already exists; otherwise create one.
val spark: SparkSession = SparkSession.builder().getOrCreate()
import spark.implicits._

val data = Seq("A", "b", "c").toDF("col")
data.write.parquet("/tmp/data")

val df = spark.read.parquet("/tmp/data")

// Built-in functions: the concatenation runs on Spark's internal representation.
val df1 = df.withColumn("valueconcat", concat(col(data.columns.head), lit(" "), lit("concat")))
  .select("valueconcat")
println(codegenString(df1.queryExecution.executedPlan))

// Typed map: each row is deserialized to a JVM object and serialized back.
val df2 = df.map(e => s"$e concat")
println(codegenString(df2.queryExecution.executedPlan))


It shows that for df1 Spark internally uses
org.apache.spark.unsafe.types.UTF8String#concat, whereas for df2 it has to
deserialize each row into a JVM object, run the map function, and serialize
the result back.

Using Spark's native functions is, in most cases, the most efficient option
in terms of performance.
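
As a minimal sketch (assuming the df2 defined above), the same thing can be
checked programmatically by looking for object (de)serialization nodes in
the physical plan:

// Hedged sketch: true if the plan leaves Spark's internal row format,
// i.e. contains SerializeFromObject / DeserializeToObject nodes.
val usesObjectSerde = df2.queryExecution.executedPlan.collect {
  case p if p.nodeName.contains("SerializeFromObject") ||
            p.nodeName.contains("DeserializeToObject") => p
}.nonEmpty
println(s"df2 round-trips through JVM objects: $usesObjectSerde")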

On Sat, Apr 4, 2020 at 2:07 PM  wrote:


Re: Serialization or internal functions?

2020-04-07 Thread Som Lima
While the SparkSession is running, go to localhost:4040.

Select Stages from the menu, then select the job you are interested in.

You can select additional metrics, including the DAG visualisation.
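
A small sketch (assuming spark is the running session, as in spark-shell):
port 4040 is only the default, and Spark picks the next free port if 4040 is
already taken, so the exact URL can also be read programmatically:

// Hedged sketch: print the web UI URL of the active SparkSession.
println(spark.sparkContext.uiWebUrl.getOrElse("Spark UI is disabled"))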





On Tue, 7 Apr 2020, 17:14 yeikel valdes,  wrote:

> Thanks for your input, Soma, but I am actually looking to understand the
> differences, not only the performance.


Re: Serialization or internal functions?

2020-04-07 Thread yeikel valdes
Thanks for your input, Soma, but I am actually looking to understand the
differences, not only the performance.


On Sun, 05 Apr 2020 02:21:07 -0400, somplastic...@gmail.com wrote:



Re: Serialization or internal functions?

2020-04-05 Thread Som Lima
If you want to measure optimisation in terms of time taken, then here is
an idea :)


public class MyClass {
    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();

        // replace with your add-column code (use enough data to measure)
        Thread.sleep(5000);

        long end = System.currentTimeMillis();
        long timeTaken = end - start;

        System.out.println("Time taken (ms): " + timeTaken);
    }
}
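
One Spark-specific caveat for this kind of measurement: transformations are
lazy, so the timed block should end in an action, otherwise the work is not
actually executed inside the measured window. A short Scala sketch, assuming
df1 is the built-in-functions version and df2 is the map version of the
concatenation (both are placeholders here):

// Hedged sketch: time a block of Spark code, forcing execution with an
// action (count) so the transformation actually runs while we measure.
def timed[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  val elapsedMs = (System.nanoTime() - start) / 1000000
  println(s"$label took $elapsedMs ms")
  result
}

timed("built-in concat")(df1.count())   // stays in Spark's internal format
timed("typed map")(df2.count())         // deserializes/serializes each row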

On Sat, 4 Apr 2020, 19:07 ,  wrote:



Serialization or internal functions?

2020-04-04 Thread email
Dear Community,

Recently, I had to solve the following problem: "for every entry of a
Dataset[String], concatenate a constant value". To solve it, I used built-in
functions:

 

val data = Seq("A","b","c").toDS

 

scala> data.withColumn("valueconcat", concat(col(data.columns.head), lit(" "), lit("concat"))).select("valueconcat").explain()

== Physical Plan ==

LocalTableScan [valueconcat#161]

 

As an alternative, a much simpler version of the program is to use map, but
it adds a serialization step that does not seem to be present in the
version above:

 

scala> data.map(e=> s"$e concat").explain

== Physical Plan ==

*(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#92]
+- *(1) MapElements , obj#91: java.lang.String
   +- *(1) DeserializeToObject value#12.toString, obj#90: java.lang.String
      +- LocalTableScan [value#12]

 

Is this over-optimization or is this the right way to go?  

 

As a follow-up, is there a better API to get the one and only column
available in a Dataset[String] when using built-in functions?
"col(data.columns.head)" works, but it is not ideal.

 

Thanks!