Measure performance time in some Spark transformations
I want to measure how long different Spark transformations take, such as map, joinWithCassandraTable and so on. What is the best way to do it?

    def time[R](block: => R): R = {
      val t0 = System.nanoTime()
      val result = block
      val t1 = System.nanoTime()
      println("Elapsed time: " + (t1 - t0) + "ns")
      result
    }

Could I use something like this? I guess that System.nanoTime will be executed in the driver before and after the workers execute the maps, joins and so on. Is that right? Any other ideas?
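One point worth keeping in mind with a driver-side timer like the one above: Spark transformations are lazy, so wrapping only a map or a join measures the time to build the plan, not the time to execute it. The block being timed needs to end with an action (count, collect, a write and so on). Below is a minimal sketch in plain Scala that returns the elapsed time along with the result instead of printing it; the names TimingSketch and timed are hypothetical, and the summed range is only a stand-in for a Spark action.

```scala
object TimingSketch {
  // Evaluates `block` once and returns its result together with the
  // elapsed wall-clock time in nanoseconds, measured on the calling JVM
  // (the driver, when used from a Spark application).
  def timed[R](block: => R): (R, Long) = {
    val t0 = System.nanoTime()
    val result = block
    val t1 = System.nanoTime()
    (result, t1 - t0)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in workload. With Spark, the block should end with an action,
    // e.g. timed { rdd.map(f).count() }, because transformations alone
    // are lazy and would return almost immediately.
    val (sum, nanos) = timed { (1 to 1000000).map(_.toLong).sum }
    println(s"sum=$sum elapsed=${nanos / 1e6} ms")
  }
}
```

Measured this way, the figure covers the whole job triggered by the action, including scheduling and any upstream stages that are not cached, so it is an end-to-end number rather than the cost of a single transformation.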
Having issues when running Spark with S3
Hi, I am writing data using the spark-redshift connector ("com.databricks" %% "spark-redshift" % "1.1.0"), which uses S3. When I set the Hadoop property fs.s3a.fast.upload=true, my program works fine. But if this property is false, it fails with a NullPointerException. It tries to look up the property ${hadoop.tmp.dir}/s3a here: https://github.com/gnudeep/hadoop-2.6/blob/master/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java#L267

Here, contextCfgItemName is ${hadoop.tmp.dir}/s3a. It looks strange that the property name has a forward slash in it. Can someone explain why this is happening? Please find the long stack trace below:

org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
    at com.databricks.spark.redshift.RedshiftWriter.unloadData(RedshiftWriter.scala:278)
    at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:346)
    at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:106)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at com.goibibo.sqlshift.commons.MySQLToRedshiftMigrator$.storeToRedshift(MySQLToRedshiftMigrator.scala:325)
    at com.goibibo.sqlshift.SQLShift$$anonfun$run$2.apply(SQLShift.scala:156)
    at com.goibibo.sqlshift.SQLShift$$anonfun$run$2.apply(SQLShift.scala:127)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at com.goibibo.sqlshift.SQLShift$.run(SQLShift.scala:127)
    at com.goibibo.sqlshift.SQLShift$.start(SQLShift.scala:196)
    at com.goibibo.sqlshift.FullDump.startSqlShift(FullDump.scala:47)
    at com.goibibo.sqlshift.FullDump$$anonfun$1.apply$mcV$sp(FullDump.scala:52)
    at com.goibibo.sqlshift.FullDump$$anonfun$1.apply(FullDump.scala:50)
    at com.goibibo.sqlshift.FullDump$$anonfun$1.apply(FullDump.scala:50)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1682)
    at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
    at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1685)
    at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1679)
    at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
    at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
    at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1692)
    at org.scalatest.FlatSpec.runTest(FlatSpec.scala:1685)
    at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
    at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:373)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:4
Re: Spark Structured Streaming is giving error “org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;”
Thanks for the quick response. I'm able to inner join the DataFrames with a regular Spark session; the issue occurs only with the Spark streaming session. BTW, I'm using Spark 2.2.0.

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Spark Structured Streaming is giving error “org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;”
Perhaps this link might help you: https://stackoverflow.com/questions/48699445/inner-join-not-working-in-dataframe-using-spark-2-1

Best,
Passion

On Sat, May 12, 2018, 10:57 AM ThomasThomas wrote:
> Hi There,
>
> Our use case is like this.
>
> We have a nested(multiple) JSON message flowing through Kafka Queue. Read
> the message from Kafka using Spark Structured Streaming(SSS) and explode
> the data and flatten all data into single record using DataFrame joins and
> land into a relational database table(DB2).
>
> But we are getting the following error when we write into db using JDBC.
>
> “org.apache.spark.sql.AnalysisException: Inner join between two streaming
> DataFrames/Datasets is not supported;”
>
> Any help would be greatly appreciated.
>
> Thanks,
> Thomas Thomas
> Mastermind Solutions LLC.
Spark Structured Streaming is giving error “org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;”
Hi There,

Our use case is like this. We have a nested (multiple) JSON message flowing through a Kafka queue. We read the message from Kafka using Spark Structured Streaming (SSS), explode the data, flatten it all into a single record using DataFrame joins, and land it in a relational database table (DB2).

But we are getting the following error when we write to the database using JDBC:

“org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;”

Any help would be greatly appreciated.

Thanks,
Thomas Thomas
Mastermind Solutions LLC.
Dataset error with Encoder
Hi, I have the following issue:

    case class Item(c1: String, c2: String, c3: Option[BigDecimal])

    import sparkSession.implicits._
    val result = df.as[Item].groupByKey(_.c1).mapGroups((key, value) => { value })

But I get the following error at compile time:

    Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

What am I missing? Thanks
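A likely reading of this error, offered as a sketch rather than a confirmed diagnosis: the function passed to mapGroups returns the group's Iterator[Item], so Spark needs an encoder for Iterator[Item], and the implicits import does not provide one. If the intent is to get the grouped items back as rows, flatMapGroups accepts a function that returns the iterator and yields a Dataset[Item]. The object name EncoderSketch, the helper regroup and the sample rows are hypothetical, and the example assumes Spark SQL 2.x or later on the classpath with a local master.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Declared at top level so Spark can derive an encoder for it.
case class Item(c1: String, c2: String, c3: Option[BigDecimal])

object EncoderSketch {
  // Groups by c1 and emits every item of each group unchanged.
  // flatMapGroups only needs an Encoder[Item], which the implicits supply.
  def regroup(ds: Dataset[Item]): Dataset[Item] = {
    import ds.sparkSession.implicits._
    ds.groupByKey(_.c1).flatMapGroups((key, values) => values)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is enough for illustration.
    val sparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("encoder-sketch")
      .getOrCreate()
    import sparkSession.implicits._

    // Hypothetical sample data standing in for the original df.
    val df = Seq(
      Item("a", "x", Some(BigDecimal(1))),
      Item("a", "y", None),
      Item("b", "z", Some(BigDecimal(2)))
    ).toDF()

    regroup(df.as[Item]).show()
    sparkSession.stop()
  }
}
```

If one output row per group is wanted instead, mapGroups still works as long as the function returns a value of an encodable type, for example a count or a case class built from the group, rather than the iterator itself.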