Hi Spark Community,

Saving a data frame into a file on S3 using:

    df.write.csv(s3_location)

If the job runs for longer than 30 minutes, the following error persists:

    The provided token has expired. (Service: Amazon S3; Status Code: 400; Error Code: ExpiredToken;)

Potentially this is because there is a hard-coded session limit on the temporary S3 credentials that Spark's connection uses. The duration can be specified as described here:
https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html

One can, of course, chunk the data into sub-30-minute writes. However, is there a way to change the token expiry parameter directly in Spark before calling write.csv?
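For illustration, this is roughly what I am hoping is possible before the write. It is only a sketch: it assumes the s3a connector together with Hadoop's assumed-role credential provider from newer hadoop-aws releases, and the property names and role ARN below are my assumptions, not something I have verified. spark and df are the existing session and data frame from the job.

    // Sketch only: ask the s3a connector for longer-lived session credentials.
    // Assumes a hadoop-aws build that ships
    // org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.s3a.aws.credentials.provider",
      "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider")
    hadoopConf.set("fs.s3a.assumed.role.arn",
      "arn:aws:iam::123456789012:role/spark-writer")             // placeholder ARN
    hadoopConf.set("fs.s3a.assumed.role.session.duration", "2h") // longer than the ~30 min I seem to hit

    df.write.csv(s3_location)  // with s3_location pointing at an s3a:// path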
Thanks a lot for any help!
Vasyl

On Tue, Jan 23, 2018 at 2:46 PM, Toy <noppani...@gmail.com> wrote:

> Thanks, I get this error when I switched to s3a://
>
> Exception in thread "streaming-job-executor-0"
> java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManager.<init>(Lcom/amazonaws/services/s3/AmazonS3;Ljava/util/concurrent/ThreadPoolExecutor;)V
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:287)
>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
>
> On Tue, 23 Jan 2018 at 15:05 Patrick Alwell <palw...@hortonworks.com> wrote:
>
>> Spark cannot read from S3 locally without the s3a protocol; you'll more
>> than likely need a local copy of the data, or you'll need the proper jars
>> to enable S3 communication from the edge to the datacenter.
>>
>> https://stackoverflow.com/questions/30385981/how-to-access-s3a-files-from-apache-spark
>>
>> Here are the jars: https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
>>
>> It looks like you already have them, in which case you'll only have to make
>> small configuration changes, e.g. s3 -> s3a.
>>
>> Keep in mind: *The Amazon JARs have proven very brittle: the version of
>> the Amazon libraries must match the versions against which the Hadoop
>> binaries were built.*
>>
>> https://hortonworks.github.io/hdp-aws/s3-s3aclient/index.html#using-the-s3a-filesystem-client
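>>
>> As a rough sketch (only to illustrate; the property names come from the
>> s3a client in hadoop-aws, and your exact versions may differ), the change
>> would look something like this:
>>
>>     // switch the credential properties and the URI scheme from s3 to s3a
>>     sc.hadoopConfiguration.set("fs.s3a.access.key", "key")
>>     sc.hadoopConfiguration.set("fs.s3a.secret.key", "secret")
>>
>>     sparrowCastedDf.write
>>       .mode("append")
>>       .format("orc")
>>       .option("compression", "zlib")
>>       .save("s3a://bucket/orc/dt=2018-01-23")  // s3a:// rather than s3://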
>>
>> *From: *Toy <noppani...@gmail.com>
>> *Date: *Tuesday, January 23, 2018 at 11:33 AM
>> *To: *"user@spark.apache.org" <user@spark.apache.org>
>> *Subject: *I can't save DataFrame from running Spark locally
>>
>> Hi,
>>
>> First of all, my Spark application runs fine in AWS EMR. However, I'm
>> trying to run it locally to debug an issue. My application just parses
>> log files, converts them to a DataFrame, then converts to ORC and saves
>> to S3. However, when I run locally I get this error:
>>
>> java.io.IOException: /orc/dt=2018-01-23 doesn't exist
>>     at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
>>     at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>     at com.sun.proxy.$Proxy22.retrieveINode(Unknown Source)
>>     at org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:340)
>>     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
>>     at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:77)
>>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
>>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
>>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
>>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
>>     at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
>>     at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
>>     at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
>>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
>>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
>>     at Vivace$$anonfun$processStream$1.apply(vivace.scala:193)
>>     at Vivace$$anonfun$processStream$1.apply(vivace.scala:170)
>>
>> Here's what I have in sbt:
>>
>> scalaVersion := "2.11.8"
>>
>> val sparkVersion = "2.1.0"
>> val hadoopVersion = "2.7.3"
>> val awsVersion = "1.11.155"
>>
>> lazy val sparkAndDependencies = Seq(
>>   "org.apache.spark" %% "spark-core" % sparkVersion,
>>   "org.apache.spark" %% "spark-sql" % sparkVersion,
>>   "org.apache.spark" %% "spark-hive" % sparkVersion,
>>   "org.apache.spark" %% "spark-streaming" % sparkVersion,
>>
>>   "org.apache.hadoop" % "hadoop-aws" % hadoopVersion,
>>   "org.apache.hadoop" % "hadoop-common" % hadoopVersion
>> )
>>
>> And this is where the code fails:
>>
>> val sparrowWriter = sparrowCastedDf.write.mode("append").format("orc").option("compression", "zlib")
>> sparrowWriter.save(sparrowOutputPath)
>>
>> sparrowOutputPath is something like s3://bucket/folder, and it exists; I
>> checked it with the AWS command line.
>>
>> I put a breakpoint there and the full path looks like this:
>> s3://bucket/orc/dt=2018-01-23, which exists.
>>
>> I have also set up the credentials like this:
>>
>> sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "key")
>> sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "secret")
>>
>> My confusion is that this code runs fine on the cluster, but I get this
>> error running locally.