Re: Serialization or internal functions?
You can take a look at the code that Spark generates:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug.codegenString
import org.apache.spark.sql.functions._

val spark: SparkSession // the active session, e.g. the spark value in spark-shell
import spark.implicits._

val data = Seq("A", "b", "c").toDF("col")
data.write.parquet("/tmp/data")
val df = spark.read.parquet("/tmp/data")

// Built-in functions: the concat is compiled into the generated code
val df1 = df
  .withColumn("valueconcat", concat(col(data.columns.head), lit(" "), lit("concat")))
  .select("valueconcat")
println(codegenString(df1.queryExecution.executedPlan))

// Typed map: each row is deserialized to a JVM object and serialized back
val df2 = df.map(e => s"$e concat")
println(codegenString(df2.queryExecution.executedPlan))

It shows that df1 internally uses org.apache.spark.unsafe.types.UTF8String#concat, whereas df2 goes through deserialization/serialization around the map function. Using Spark's native functions is, in most cases, the most effective way in terms of performance.

On Sat, Apr 4, 2020 at 2:07 PM wrote:
> [...]
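As a side note, the same generated code can also be printed through the debug implicits instead of calling codegenString directly. A minimal sketch, reusing the df1 and df2 defined above (this relies on the debugCodegen helper from the debug package object, as far as I recall):

import org.apache.spark.sql.execution.debug._

// Prints the Java code generated for each whole-stage-codegen subtree of the plan.
df1.debugCodegen()
df2.debugCodegen()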
Re: Serialization or internal functions?
Go to localhost:4040 while the SparkSession is running, then select Stages from the menu and pick the job you are interested in. You can also select additional metrics, including the DAG visualisation.

On Tue, 7 Apr 2020, 17:14 yeikel valdes wrote:
> [...]
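If you compare the two approaches this way, it helps to label the jobs before running them so they are easy to tell apart in the UI. A minimal sketch, assuming a SparkSession value named spark is in scope (as in spark-shell); the descriptions are arbitrary labels, and count() is only there to trigger a job:

import org.apache.spark.sql.functions._
import spark.implicits._

val data = Seq("A", "b", "c").toDS

// The label shows up next to the job on the Jobs/Stages pages at localhost:4040.
spark.sparkContext.setJobDescription("concat via built-in functions")
data.withColumn("valueconcat", concat(col("value"), lit(" "), lit("concat"))).count()

spark.sparkContext.setJobDescription("concat via typed map")
data.map(e => s"$e concat").count()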
Re: Serialization or internal functions?
Thanks for your input Soma, but I am actually looking to understand the differences, not only the performance.

On Sun, 05 Apr 2020 02:21:07 -0400 somplastic...@gmail.com wrote:
> [...]
Re: Serialization or internal functions?
If you want to measure the optimisation in terms of time taken, then here is an idea :)

public class MyClass {
    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();

        // replace with your add-column code
        // (use enough data to make the difference measurable)
        Thread.sleep(5000);

        long end = System.currentTimeMillis();
        int timeTaken = (int) (end - start);
        System.out.println("Time taken " + timeTaken);
    }
}

On Sat, 4 Apr 2020, 19:07, wrote:
> [...]
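For the Spark code in question, a rough Scala equivalent of the same idea is sketched below. The timeIt helper is hypothetical, it assumes a SparkSession value named spark is in scope (as in spark-shell), and the count() calls matter because transformations are lazy, so without an action nothing actually runs:

import org.apache.spark.sql.functions._
import spark.implicits._

// Hypothetical helper: run a block and report the elapsed wall-clock time.
def timeIt[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(s"$label took ${(System.nanoTime() - start) / 1000000} ms")
  result
}

// Use a much larger Dataset than this to get a meaningful measurement.
val data = Seq("A", "b", "c").toDS

timeIt("built-in concat") {
  data.withColumn("valueconcat", concat(col(data.columns.head), lit(" "), lit("concat"))).count()
}
timeIt("typed map") {
  data.map(e => s"$e concat").count()
}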
Serialization or internal functions?
Dear Community,

Recently, I had to solve the following problem: "for every entry of a Dataset[String], concat a constant value". To solve it, I used built-in functions:

val data = Seq("A","b","c").toDS

scala> data.withColumn("valueconcat", concat(col(data.columns.head), lit(" "), lit("concat"))).select("valueconcat").explain()
== Physical Plan ==
LocalTableScan [valueconcat#161]

As an alternative, a much simpler version of the program is to use map, but it adds a serialization step that does not seem to be present in the version above:

scala> data.map(e => s"$e concat").explain
== Physical Plan ==
*(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#92]
+- *(1) MapElements , obj#91: java.lang.String
   +- *(1) DeserializeToObject value#12.toString, obj#90: java.lang.String
      +- LocalTableScan [value#12]

Is this over-optimization, or is this the right way to go?

As a follow-up, is there any better API to get the one and only column available in a Dataset[String] when using built-in functions? "col(data.columns.head)" works but it is not ideal.

Thanks!
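Regarding the follow-up: a Dataset[String] created with toDS exposes its single column under the default name "value", so the column can also be referenced directly instead of going through data.columns.head. A minimal sketch, assuming a SparkSession value named spark is in scope (as in spark-shell):

import org.apache.spark.sql.functions._
import spark.implicits._

val data = Seq("A", "b", "c").toDS

// All three expressions refer to the same single column of the Dataset[String].
data.select(concat(col("value"), lit(" "), lit("concat"))).show()
data.select(concat($"value", lit(" "), lit("concat"))).show()
data.select(concat(col(data.columns.head), lit(" "), lit("concat"))).show()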