unsubscribe

On Sat, Feb 17, 2024 at 3:04 AM Рамик И <ramik...@gmail.com> wrote:

>
> Hi
> I'm using Spark Streaming to read from Kafka and write to S3. Sometimes the
> write fails with org.apache.hadoop.fs.FileAlreadyExistsException.
>
> Spark version: 3.5.0
> Scala version: 2.13.8
> Cluster: k8s
>
> libraryDependencies
> org.apache.hadoop.hadoop-aws    3.3.4
> com.amazonaws.aws-java-sdk-s3    1.12.600
>
>
>
> code:
> df
>   .coalesce(1)
>   .write
>   .option("fs.s3a.committer.require.uuid", "true")
>   .option("fs.s3a.committer.generate.uuid", "true")
>   .option("fs.s3a.committer.name", "magic")
>   .option("fs.s3a.committer.magic.enabled", "true")
>   .option("orc.compress", "zlib")
>   .mode(SaveMode.Append)
>   .orc(path)
>
>
>
> executor 9
>
> 24/02/16 13:05:25 INFO AbstractS3ACommitter: Job UUID
> 6188aaf6-78a2-4c5a-bafc-0e285d8b89f3 source fs.s3a.committer.uuid
> 24/02/16 13:05:25 INFO AbstractS3ACommitterFactory: Using committer magic
> to output data to s3a://mybucket/test
> 24/02/16 13:05:25 INFO AbstractS3ACommitterFactory: Using Committer
> MagicCommitter{AbstractS3ACommitter{role=Task committer
> attempt_202402161305112153373254688311399_0367_m_000000_13217, name=magic,
> outputPath=s3a://mybucket/test,
> workPath=s3a://mybucket/test/__magic/job-6188aaf6-78a2-4c5a-bafc-0e285d8b89f3/tasks/attempt_202402161305112153373254688311399_0367_m_000000_13217/__base,
> uuid='6188aaf6-78a2-4c5a-bafc-0e285d8b89f3', uuid
> source=JobUUIDSource{text='fs.s3a.committer.uuid'}}} for s3a://mybucket/test
> 24/02/16 13:05:25 INFO SQLHadoopMapReduceCommitProtocol: Using output
> committer class org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter
> 24/02/16 13:05:25 INFO AbstractS3ACommitter: Starting: Setup Task
> attempt_202402161305112153373254688311399_0367_m_000000_13217
> 24/02/16 13:05:25 INFO AbstractS3ACommitter: Setup Task
> attempt_202402161305112153373254688311399_0367_m_000000_13217: duration
> 0:00.061s
> 24/02/16 13:05:25 ERROR Executor: Exception in task 0.2 in stage 367.1
> (TID 13217)
> org.apache.hadoop.fs.FileAlreadyExistsException:
> s3a://mybucket/test/part-00000-bce21fe2-4e56-4075-aafe-6160b3b0334a-c000.zlib.orc
> already exists
>
>
> executor 10
> 24/02/16 13:05:24 INFO AbstractS3ACommitter: Job UUID
> 6188aaf6-78a2-4c5a-bafc-0e285d8b89f3 source fs.s3a.committer.uuid
> 24/02/16 13:05:24 INFO AbstractS3ACommitterFactory: Using committer magic
> to output data to s3a://mybucket/test
> 24/02/16 13:05:24 INFO AbstractS3ACommitterFactory: Using Committer
> MagicCommitter{AbstractS3ACommitter{role=Task committer
> attempt_202402161305112153373254688311399_0367_m_000000_13216, name=magic,
> outputPath=s3a://mybucket/test,
> workPath=s3a://mybucket/test/__magic/job-6188aaf6-78a2-4c5a-bafc-0e285d8b89f3/tasks/attempt_202402161305112153373254688311399_0367_m_000000_13216/__base,
> uuid='6188aaf6-78a2-4c5a-bafc-0e285d8b89f3', uuid
> source=JobUUIDSource{text='fs.s3a.committer.uuid'}}} for s3a://mybucket/test
> 24/02/16 13:05:24 INFO SQLHadoopMapReduceCommitProtocol: Using output
> committer class org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter
> 24/02/16 13:05:24 INFO AbstractS3ACommitter: Starting: Setup Task
> attempt_202402161305112153373254688311399_0367_m_000000_13216
> 24/02/16 13:05:24 INFO AbstractS3ACommitter: Setup Task
> attempt_202402161305112153373254688311399_0367_m_000000_13216: duration
> 0:00.112s
> 24/02/16 13:05:24 ERROR Executor: Exception in task 0.1 in stage 367.1
> (TID 13216)
> org.apache.hadoop.fs.FileAlreadyExistsException:
> s3a://mybucket/test/part-00000-bce21fe2-4e56-4075-aafe-6160b3b0334a-c000.zlib.orc
> already exists
>
>
>
> How can I fix it?
>
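
A possible explanation, offered as a sketch rather than a confirmed diagnosis: the logs above show SQLHadoopMapReduceCommitProtocol driving the magic committer. As far as I know, that protocol only uses the committer's work path for FileOutputCommitter subclasses, so task output may be going straight to s3a://mybucket/test, and a retried task attempt (0.1 and 0.2 in the logs) would then collide with the file left by an earlier attempt. Spark's cloud integration documentation describes binding the S3A committers through PathOutputCommitProtocol at the session level. The snippet below assumes the spark-hadoop-cloud module matching the Spark build is on the classpath; the application name is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: assumes the spark-hadoop-cloud module for this Spark
// version is on the classpath, which the original post does not list.
val spark = SparkSession.builder()
  .appName("kafka-to-s3") // hypothetical name, for illustration
  // Select the S3A magic committer for the whole session.
  .config("spark.hadoop.fs.s3a.committer.name", "magic")
  .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
  // Route Spark SQL writes through the path-output commit protocol
  // so the committer's work path is used for task output.
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .getOrCreate()
```

With session-level settings like these, the per-write fs.s3a.committer.* options may no longer be needed. Whether this resolves the error on a given cluster would need to be tested.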


-- 
Zhang Xin(张欣)
Email:josseph.zh...@gmail.com
