Github user ljwagerfield commented on the pull request:

https://github.com/apache/spark/pull/8710#issuecomment-220395710

We're seeing this exception too. We're also running our operations serially (at least on the surface it appears we are). If we execute a `df.save` operation in a `Future` and wait for that `Future` to complete, then all `df.save` operations we perform within subsequent `Future`s fail.

This specifically happens when we load Avro files from S3 and save them back to S3 as Parquet. The load works fine, but the save fails on the second attempt. Furthermore, if we simply generate the `DataFrame` from an in-memory list (so we're only saving to S3, not loading from it), the error goes away. I'm not sure how helpful that is.

We're using Java 1.8 and Scala 2.10.5, with our Spark codebase at commit https://github.com/apache/spark/commit/15de51c238a7340fa81cb0b80d029a05d97bfc5c.

Our exact reproduction steps are:

**1. Run a Spark shell with the appropriate dependencies**

```
./spark-shell --packages com.amazonaws:aws-java-sdk:1.10.75,org.apache.hadoop:hadoop-aws:2.7.2,com.databricks:spark-avro_2.10:2.0.1
```

**2. Run the following setup code within the shell**

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import sqlContext.implicits._
import org.apache.spark.sql._

implicit val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", "...")
hadoopConf.set("fs.s3.awsSecretAccessKey", "...")

val df = sqlContext.read.format("com.databricks.spark.avro").load("s3://bucket/input.avro")

def doWrite() {
  df.write.format("org.apache.spark.sql.parquet").mode(SaveMode.Overwrite).save("s3://bucket/output")
}
```

**3. Run this _twice_ - but leaving time for the first execution to finish (so the operations are serialised)**

```scala
Future {
  doWrite()
  println("SUCCEEDED")
}.recover {
  case e: Throwable =>
    println("FAILED: " + e.getMessage())
    e.printStackTrace()
}
```

**Result:**

```
spark.sql.execution.id is already set
java.lang.IllegalArgumentException: spark.sql.execution.id is already set
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
	at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
	at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
	at $line38.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.doWrite(<console>:41)
	at $line40.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply$mcV$sp(<console>:43)
	at $line40.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:43)
	at $line40.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:43)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```
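For what it's worth, my working theory (unverified against this exact build) is that the execution id is kept in an inheritable thread-local: a ForkJoin worker thread spawned while the first write is in flight inherits the parent thread's `spark.sql.execution.id`, so when a later `Future` lands on that reused worker, `withNewExecutionId` finds the stale id already set and throws. The inheritance mechanism itself can be demonstrated on the plain JVM (shown in Java rather than Scala so it stands alone; `ExecutionIdLeakDemo` and `inheritedValue` are names invented for this sketch, not Spark APIs):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutionIdLeakDemo {
    // Inheritable thread-locals copy the parent's value into any child
    // thread at thread-creation time -- the suspected leak mechanism.
    static final InheritableThreadLocal<String> executionId =
        new InheritableThreadLocal<>();

    /** Returns what a lazily created pool thread observes in the thread-local. */
    static String inheritedValue() throws Exception {
        executionId.set("42"); // pretend an execution id is set on the calling thread
        ExecutorService pool = Executors.newFixedThreadPool(1);
        // The worker thread is created during this submit, i.e. while the
        // caller's thread-local is still set, so the worker inherits "42".
        String seen = pool.submit(() -> executionId.get()).get();
        pool.shutdown();
        executionId.remove();
        return seen;
    }

    public static void main(String[] args) throws Exception {
        // A later, unrelated task on the reused pool thread still sees the
        // stale id -- which is what withNewExecutionId appears to reject.
        System.out.println("pool thread sees execution id: " + inheritedValue());
    }
}
```

If that theory is right, clearing the property before each write (e.g. `sc.setLocalProperty("spark.sql.execution.id", null)`) or running each write on a freshly created thread might work around it - though I haven't confirmed either against this commit.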