Hi All,

We have been using Gobblin on MapReduce for a while, and we hit a strange issue
that can cause duplicates when shipping logs from Kafka to S3.

Here is what we observed:
- On S3, the shipped logs contained duplicates like this:
2017-10-02 06:05       756
s3://somefancy-bucket/somepath/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.4.gz
2017-10-02 06:05       756
s3://somefancy-bucket/somepath/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.gz

As you can see, there are two files: one with the record count (4) appended to
the name and one without it.
I dug into the YARN logs, and for this file I found the following:

2017-10-02 06:05:02,491 INFO [Task-committing-pool-0]
org.apache.gobblin.writer.FsDataWriter: Moving data from
/gobblin/gobblin/task/job_RawLogShippingToS3_1506924118242/task-staging/somepath/attempt_1503408808112_128711_m_000016_1/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.gz
to
/gobblin/gobblin/task/job_RawLogShippingToS3_1506924118242/task-output/somepath/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.gz

2017-10-02 06:05:02,495 ERROR [Task-committing-pool-0]
org.apache.gobblin.runtime.Task: Failed to close all open resources

org.apache.hadoop.fs.FileAlreadyExistsException: Failed to rename
/gobblin/gobblin/task/job_RawLogShippingToS3_1506924118242/task-output/somepath/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.gz
to
/gobblin/gobblin/task/job_RawLogShippingToS3_1506924118242/task-output/somepath/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.4.gz:
dst already exists

        at org.apache.gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:243)
        at org.apache.gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:227)
        at org.apache.gobblin.writer.FsDataWriter.addRecordCountToFileName(FsDataWriter.java:295)
        at org.apache.gobblin.writer.FsDataWriter.close(FsDataWriter.java:276)
        at org.apache.gobblin.writer.CloseOnFlushWriterWrapper.close(CloseOnFlushWriterWrapper.java:83)
        at com.google.common.io.Closer.close(Closer.java:214)
        at org.apache.gobblin.instrumented.writer.InstrumentedDataWriterBase.close(InstrumentedDataWriterBase.java:215)
        at org.apache.gobblin.instrumented.writer.InstrumentedDataWriterDecorator.close(InstrumentedDataWriterDecorator.java:43)
        at com.google.common.io.Closer.close(Closer.java:214)
        at org.apache.gobblin.writer.PartitionedDataWriter.close(PartitionedDataWriter.java:225)
        at org.apache.gobblin.writer.RetryWriter.close(RetryWriter.java:109)
        at com.google.common.io.Closer.close(Closer.java:214)
        at org.apache.gobblin.runtime.fork.Fork.close(Fork.java:422)
        at com.google.common.io.Closer.close(Closer.java:214)
        at org.apache.gobblin.runtime.Task.commit(Task.java:854)
        at org.apache.gobblin.runtime.GobblinMultiTaskAttempt$1$1.call(GobblinMultiTaskAttempt.java:166)
        at org.apache.gobblin.runtime.GobblinMultiTaskAttempt$1$1.call(GobblinMultiTaskAttempt.java:161)

And later in the logs I found the following:

2017-10-02 06:04:03,555 INFO [Task-committing-pool-0]
org.apache.gobblin.writer.FsDataWriter: Moving data from
/gobblin/gobblin/task/job_RawLogShippingToS3_1506924118242/task-staging/somepath/attempt_1503408808112_128711_m_000016_0/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.gz
to
/gobblin/gobblin/task/job_RawLogShippingToS3_1506924118242/task-output/somepath/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.gz

2017-10-02 06:04:03,572 INFO [Task-committing-pool-0]
org.apache.gobblin.writer.FsDataWriter: Renaming
/gobblin/gobblin/task/job_RawLogShippingToS3_1506924118242/task-output/somepath/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.gz
to
/gobblin/gobblin/task/job_RawLogShippingToS3_1506924118242/task-output/somepath/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.4.gz

Since the timestamp of the later log line is earlier than the one above, I
assume this is caused by speculative execution: two attempts tried to perform
the same move/rename, the first one succeeded, and the other mapper's attempt
failed.

The rename failure itself is probably expected (due to speculative execution),
and overall the MapReduce job finished successfully. What was not expected is
the duplicate files: it seems that even though the rename failed because the
file name with the record count already existed, the file without the record
count in its name was still published to the final location.
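To spell out how I think the duplicate ends up in the final location, here is
the sequence I infer from the logs above (again simplified and hypothetical,
not the actual FsDataWriter code):

import java.io.IOException;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CommitSequenceSketch {
  static void moveAndAddRecordCount(FileSystem fs, Path staging, Path output,
      long recordCount) throws IOException {
    // Step 1: move the staging file into task-output. Attempt _1 succeeds
    // here because attempt _0's part...0.gz was already renamed away to the
    // record-count name, so there is no conflict.
    if (!fs.rename(staging, output)) {
      throw new IOException("Failed to move " + staging + " to " + output);
    }

    // Step 2: append the record count to the file name. Attempt _1 throws
    // here because part...0.4.gz already exists -- but its own un-renamed
    // part...0.gz is left behind in task-output, so the publisher later
    // ships BOTH files to the final location.
    String name = output.getName();
    Path withCount = new Path(output.getParent(),
        name.substring(0, name.lastIndexOf(".gz")) + "." + recordCount + ".gz");
    if (fs.exists(withCount)) {
      throw new FileAlreadyExistsException(withCount + " already exists");
    }
    fs.rename(output, withCount);
  }
}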

Is it possible that this is caused by speculative execution, or is it due to
some other issue?
It happens rarely, but unfortunately it does happen. For now I'm disabling
speculative execution to see whether that solves the problem.
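For reference, these are the standard Hadoop MR switches I'm setting to turn
speculation off (assuming the Gobblin MR launcher passes job properties
through to the underlying Hadoop configuration):

# Hadoop 2.x property names; Hadoop 1.x uses
# mapred.map.tasks.speculative.execution / mapred.reduce.tasks.speculative.execution
mapreduce.map.speculative=false
mapreduce.reduce.speculative=false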

Have you seen an issue like this before?
What do you think?

I'm not sure whether it is relevant, but we use our own JSON writer (I plan to
open a pull request for it at some point):
https://github.com/prezi/gobblin/blob/json_module/gobblin-modules/gobblin-json/src/main/java/org/apache/gobblin/writer/SimpleCompressionDataWriter.java
And our own partitioner:
https://github.com/prezi/gobblin/blob/json_module/gobblin-modules/gobblin-json/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedJsonWriterPartitioner.java

Thanks,
Tamas
