Hi all,

We have been using Gobblin MapReduce for a while, and we ran into a strange issue that can cause duplicate files when shipping logs from Kafka to S3.
We saw the following issue. On S3 there were duplicates among the shipped logs, like this:

2017-10-02 06:05   756 s3://somefancy-bucket/somepath/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.4.gz
2017-10-02 06:05   756 s3://somefancy-bucket/somepath/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.gz

As you can see, there are two files: one with the record count (4) in its name and one without it. I dug into the YARN logs and found the following for this file:

2017-10-02 06:05:02,491 INFO [Task-committing-pool-0] org.apache.gobblin.writer.FsDataWriter: Moving data from /gobblin/gobblin/task/job_RawLogShippingToS3_1506924118242/task-staging/somepath/attempt_1503408808112_128711_m_000016_1/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.gz to /gobblin/gobblin/task/job_RawLogShippingToS3_1506924118242/task-output/somepath/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.gz
2017-10-02 06:05:02,495 ERROR [Task-committing-pool-0] org.apache.gobblin.runtime.Task: Failed to close all open resources
org.apache.hadoop.fs.FileAlreadyExistsException: Failed to rename /gobblin/gobblin/task/job_RawLogShippingToS3_1506924118242/task-output/somepath/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.gz to /gobblin/gobblin/task/job_RawLogShippingToS3_1506924118242/task-output/somepath/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.4.gz: dst already exists
        at org.apache.gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:243)
        at org.apache.gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:227)
        at org.apache.gobblin.writer.FsDataWriter.addRecordCountToFileName(FsDataWriter.java:295)
        at org.apache.gobblin.writer.FsDataWriter.close(FsDataWriter.java:276)
        at org.apache.gobblin.writer.CloseOnFlushWriterWrapper.close(CloseOnFlushWriterWrapper.java:83)
        at com.google.common.io.Closer.close(Closer.java:214)
        at org.apache.gobblin.instrumented.writer.InstrumentedDataWriterBase.close(InstrumentedDataWriterBase.java:215)
        at org.apache.gobblin.instrumented.writer.InstrumentedDataWriterDecorator.close(InstrumentedDataWriterDecorator.java:43)
        at com.google.common.io.Closer.close(Closer.java:214)
        at org.apache.gobblin.writer.PartitionedDataWriter.close(PartitionedDataWriter.java:225)
        at org.apache.gobblin.writer.RetryWriter.close(RetryWriter.java:109)
        at com.google.common.io.Closer.close(Closer.java:214)
        at org.apache.gobblin.runtime.fork.Fork.close(Fork.java:422)
        at com.google.common.io.Closer.close(Closer.java:214)
        at org.apache.gobblin.runtime.Task.commit(Task.java:854)
        at org.apache.gobblin.runtime.GobblinMultiTaskAttempt$1$1.call(GobblinMultiTaskAttempt.java:166)
        at org.apache.gobblin.runtime.GobblinMultiTaskAttempt$1$1.call(GobblinMultiTaskAttempt.java:161)

And later in the logs:

2017-10-02 06:04:03,555 INFO [Task-committing-pool-0] org.apache.gobblin.writer.FsDataWriter: Moving data from /gobblin/gobblin/task/job_RawLogShippingToS3_1506924118242/task-staging/somepath/attempt_1503408808112_128711_m_000016_0/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.gz to /gobblin/gobblin/task/job_RawLogShippingToS3_1506924118242/task-output/somepath/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.gz
2017-10-02 06:04:03,572 INFO [Task-committing-pool-0] org.apache.gobblin.writer.FsDataWriter: Renaming /gobblin/gobblin/task/job_RawLogShippingToS3_1506924118242/task-output/somepath/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.gz to /gobblin/gobblin/task/job_RawLogShippingToS3_1506924118242/task-output/somepath/raw/staging/dt=2017-10-02/hr=07/part.task_RawLogShippingToS3_1506924118242_233_0.4.gz

Since the timestamp of this second pair of log lines is earlier than that of the first one, I assume the cause is speculative execution: the same move/rename was attempted twice, once by attempt _m_000016_0 and once by attempt _m_000016_1; the first one succeeded and the other one failed. I would consider that expected behaviour with speculative execution enabled, and the MapReduce job as a whole finished successfully. What was not expected is the duplicate output: even though the rename failed because the file name with the record count already existed, the file without the record count in its name was still published to the final location.

Is it possible that this is caused by speculative execution, or is it due to some other issue? It happens rarely, but it unfortunately does happen. For now I am disabling speculative execution to see whether that fixes it. Have you seen an issue like this before? What do you think?

I'm not sure whether it's relevant, but we use our own JSON writer (at some point I plan to open a pull request for it as well):
https://github.com/prezi/gobblin/blob/json_module/gobblin-modules/gobblin-json/src/main/java/org/apache/gobblin/writer/SimpleCompressionDataWriter.java
and our own partitioner:
https://github.com/prezi/gobblin/blob/json_module/gobblin-modules/gobblin-json/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedJsonWriterPartitioner.java

Thanks,
Tamas
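P.S. To double-check my understanding of the race, I put together a minimal standalone sketch of the commit pattern I believe is happening. It uses plain java.nio, nothing Gobblin-specific, and every name in it is made up for illustration: each attempt moves its staged file to a shared output name and then renames it to add the record count, and the losing attempt's failed rename leaves the un-renamed copy behind.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of the suspected race between two speculative task attempts.
public class RenameRaceSketch {

    static void commitAttempt(Path outputDir, String attempt) throws IOException {
        // Each attempt writes its own staging file...
        Path staged = Files.write(outputDir.resolve("part_0.gz." + attempt),
                "payload".getBytes());
        // ...moves it to the shared output name (the "Moving data from ... to ..." step)...
        Path shared = Files.move(staged, outputDir.resolve("part_0.gz"),
                StandardCopyOption.REPLACE_EXISTING);
        // ...then renames it to add the record count; this rename must not overwrite.
        try {
            Files.move(shared, outputDir.resolve("part_0.4.gz"));
        } catch (FileAlreadyExistsException e) {
            // The losing attempt fails here, just like the "dst already exists"
            // error in the YARN logs, and its "part_0.gz" is left behind
            // next to the winner's "part_0.4.gz".
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("rename-race");
        commitAttempt(dir, "m_000016_0"); // first attempt wins the rename
        commitAttempt(dir, "m_000016_1"); // second attempt loses it
        if (Files.exists(dir.resolve("part_0.gz"))
                && Files.exists(dir.resolve("part_0.4.gz"))) {
            System.out.println("both files left behind");
        }
    }
}
```

If both leftovers are then published to the final location, that would explain the pair of files we see on S3.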
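P.S. For anyone hitting the same thing: these are the settings I am using to turn speculative execution off. They are the stock Hadoop 2 MapReduce property names, not anything Gobblin-specific, and I am assuming they get passed through to the underlying MR job configuration; the Gobblin MR job is map-only as far as I know, but I am setting both to be safe.

```
# Standard Hadoop 2.x keys (on Hadoop 1.x the equivalents are
# mapred.map.tasks.speculative.execution / mapred.reduce.tasks.speculative.execution)
mapreduce.map.speculative=false
mapreduce.reduce.speculative=false
```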