Hi Spark Community,

I am saving a DataFrame to a file on S3 using:

*df.write.csv(s3_location)*

If the write runs for longer than 30 minutes, it fails with the following error:

*The provided token has expired. (Service: Amazon S3; Status Code: 400;
Error Code: ExpiredToken;)*

This is possibly because there is a hardcoded session limit on the temporary
S3 connection that Spark uses.

The duration can be specified when requesting temporary credentials, as described here:

https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html

One can, of course, chunk the data into writes that each take less than 30
minutes. However, is there a way to change the token expiry parameter
directly in Spark before calling "write.csv"?
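
For context, here is a minimal sketch of how temporary credentials can be
handed to the S3A connector from Spark. It is an illustration under
assumptions, not a confirmed fix: it assumes Hadoop 2.8 or later (where
TemporaryAWSCredentialsProvider is available), an s3a:// path, and that the
credentials were already requested from STS with a long enough duration and
exported in the standard AWS environment variables. The bucket and prefix are
hypothetical. As far as I know, the lifetime is fixed when STS issues the
token, so these settings only pass the token along; they do not extend it.

```scala
import org.apache.spark.sql.SparkSession

object S3aSessionCredentials {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("s3a-session-credentials")
      .getOrCreate()
    val hadoopConf = spark.sparkContext.hadoopConfiguration

    // Tell S3A to authenticate with an access key, secret key and session token.
    hadoopConf.set("fs.s3a.aws.credentials.provider",
      "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")

    // Assumption: the three values come from an earlier STS request whose
    // duration covers the whole write; they are read from the environment here.
    hadoopConf.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
    hadoopConf.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
    hadoopConf.set("fs.s3a.session.token", sys.env("AWS_SESSION_TOKEN"))

    // Hypothetical output location; replace with a real bucket and prefix.
    val df = spark.range(10).toDF("id")
    df.write.csv("s3a://example-bucket/example-prefix/")

    spark.stop()
  }
}
```

If the token is issued by something outside my control (an instance profile
or an assumed role with a fixed maximum session duration), this would not
help, which is why I am asking about a Spark-side setting.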

Thanks a lot for any help!
Vasyl





On Tue, Jan 23, 2018 at 2:46 PM, Toy <noppani...@gmail.com> wrote:

> Thanks, I get this error when I switched to s3a://
>
> Exception in thread "streaming-job-executor-0"
> java.lang.NoSuchMethodError: com.amazonaws.services.s3.
> transfer.TransferManager.<init>(Lcom/amazonaws/services/
> s3/AmazonS3;Ljava/util/concurrent/ThreadPoolExecutor;)V
> at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(
> S3AFileSystem.java:287)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
>
> On Tue, 23 Jan 2018 at 15:05 Patrick Alwell <palw...@hortonworks.com>
> wrote:
>
>> Spark cannot read locally from S3 without an S3a protocol; you’ll more
>> than likely need a local copy of the data or you’ll need to utilize the
>> proper jars to enable S3 communication from the edge to the datacenter.
>>
>>
>>
>> https://stackoverflow.com/questions/30385981/how-to-
>> access-s3a-files-from-apache-spark
>>
>>
>>
>> Here are the jars: https://mvnrepository.com/artifact/org.apache.hadoop/
>> hadoop-aws
>>
>>
>>
>> Looks like you already have them, in which case you’ll have to make
>> small configuration changes, e.g. s3 → s3a
>>
>>
>>
>> Keep in mind: *The Amazon JARs have proven very brittle: the version of
>> the Amazon libraries must match the versions against which the Hadoop
>> binaries were built.*
>>
>>
>>
>> https://hortonworks.github.io/hdp-aws/s3-s3aclient/index.
>> html#using-the-s3a-filesystem-client
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> *From: *Toy <noppani...@gmail.com>
>> *Date: *Tuesday, January 23, 2018 at 11:33 AM
>> *To: *"user@spark.apache.org" <user@spark.apache.org>
>> *Subject: *I can't save DataFrame from running Spark locally
>>
>>
>>
>> Hi,
>>
>>
>>
>> First of all, my Spark application runs fine in AWS EMR. However, I'm
>> trying to run it locally to debug an issue. The application just parses
>> log files, converts them to a DataFrame, and saves the result to S3 as
>> ORC. However, when I run it locally I get this error:
>>
>>
>>
>> java.io.IOException: /orc/dt=2018-01-23 doesn't exist
>>
>> at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(
>> Jets3tFileSystemStore.java:170)
>>
>> at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(
>> Jets3tFileSystemStore.java:221)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke(
>> NativeMethodAccessorImpl.java:62)
>>
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>> DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:497)
>>
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(
>> RetryInvocationHandler.java:191)
>>
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(
>> RetryInvocationHandler.java:102)
>>
>> at com.sun.proxy.$Proxy22.retrieveINode(Unknown Source)
>>
>> at org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(
>> S3FileSystem.java:340)
>>
>> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
>>
>> at org.apache.spark.sql.execution.datasources.
>> InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationComm
>> and.scala:77)
>>
>> at org.apache.spark.sql.execution.command.ExecutedCommandExec.
>> sideEffectResult$lzycompute(commands.scala:58)
>>
>> at org.apache.spark.sql.execution.command.ExecutedCommandExec.
>> sideEffectResult(commands.scala:56)
>>
>> at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(
>> commands.scala:74)
>>
>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
>> execute$1.apply(SparkPlan.scala:114)
>>
>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
>> execute$1.apply(SparkPlan.scala:114)
>>
>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
>> executeQuery$1.apply(SparkPlan.scala:135)
>>
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(
>> RDDOperationScope.scala:151)
>>
>> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:
>> 132)
>>
>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
>>
>> at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(
>> QueryExecution.scala:87)
>>
>> at org.apache.spark.sql.execution.QueryExecution.
>> toRdd(QueryExecution.scala:87)
>>
>> at org.apache.spark.sql.execution.datasources.
>> DataSource.write(DataSource.scala:492)
>>
>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
>>
>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
>>
>> at Vivace$$anonfun$processStream$1.apply(vivace.scala:193)
>>
>> at Vivace$$anonfun$processStream$1.apply(vivace.scala:170)
>>
>>
>>
>> Here's what I have in sbt
>>
>>
>>
>> scalaVersion := "2.11.8"
>>
>>
>>
>> val sparkVersion = "2.1.0"
>>
>> val hadoopVersion = "2.7.3"
>>
>> val awsVersion = "1.11.155"
>>
>>
>>
>> lazy val sparkAndDependencies = Seq(
>>
>>   "org.apache.spark" %% "spark-core" % sparkVersion,
>>
>>   "org.apache.spark" %% "spark-sql" % sparkVersion,
>>
>>   "org.apache.spark" %% "spark-hive" % sparkVersion,
>>
>>   "org.apache.spark" %% "spark-streaming" % sparkVersion,
>>
>>
>>
>>   "org.apache.hadoop" % "hadoop-aws" % hadoopVersion,
>>
>>   "org.apache.hadoop" % "hadoop-common" % hadoopVersion
>>
>> )
>>
>>
>>
>> And this is where the code failed
>>
>>
>>
>> val sparrowWriter = sparrowCastedDf.write.mode("append").format("orc").option("compression", "zlib")
>>
>> sparrowWriter.save(sparrowOutputPath)
>>
>>
>>
>> sparrowOutputPath is something like s3://bucket/folder, and it exists; I
>> checked it with the AWS command line.
>>
>>
>>
>> I put a breakpoint there and the full path looks like this
>> s3://bucket/orc/dt=2018-01-23 which exists.
>>
>>
>>
>> I have also set up the credentials like this
>>
>>
>>
>> sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "key")
>>
>> sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "secret")
>>
>>
>>
>> What confuses me is that this code runs fine on the cluster, but I get
>> this error when running locally.
>>
>>
>>
>>
>>
>
