Hi,
are you taking the “.valid-length” files into account? The problem with doing “exactly-once” with HDFS is that before Hadoop 2.7 it was not possible to truncate files. So the trick we’re using is to write a companion file that records the length up to which a file is valid whenever we would normally need to truncate it. (If the job fails in the middle of writing, the output files have to be truncated back to a valid position.) For example, say you have an output file part-8-0. If there also exists a file part-8-0.valid-length, that file tells you up to which position part-8-0 is valid, so you should only read up to that point.
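To make that concrete, here is a rough, untested sketch (plain Hadoop FileSystem API) of how a downstream reader could honour the valid-length file. The paths are only examples, and I am assuming the length is stored as a writeUTF-encoded decimal string; please double-check the exact encoding and the configured suffix against the RollingSink version you are running:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ValidLengthAwareReader {

    public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());

        // Example paths; adjust to your output directory and configured suffix.
        Path partFile = new Path("hdfs://our.machine.com:8020/hdfs/dir/outbound/part-8-0");
        Path validLengthFile = new Path(partFile.toString() + ".valid-length");

        long validLength;
        if (fs.exists(validLengthFile)) {
            // The companion file records how many bytes of part-8-0 are valid.
            // Assumption: it contains the length as a writeUTF-encoded decimal string.
            try (FSDataInputStream in = fs.open(validLengthFile)) {
                validLength = Long.parseLong(in.readUTF());
            }
        } else {
            // No companion file, so the whole part file is valid.
            validLength = fs.getFileStatus(partFile).getLen();
        }

        // Only consume the first validLength bytes of the part file.
        // (Loading everything into one array is fine for a sketch, not for big files.)
        byte[] validBytes = new byte[(int) validLength];
        try (FSDataInputStream in = fs.open(partFile)) {
            in.readFully(0, validBytes, 0, (int) validLength);
        }
        System.out.println("Read " + validBytes.length + " valid bytes from " + partFile);
    }
}

In a real job you would of course stream the bytes instead of buffering them, but the important part is that the read stops at validLength.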
The name of the “.valid-length” suffix can also be configured, by the way, as can the other file name prefixes and suffixes. If this is not the problem, then I definitely have to investigate further. I’ll also look into the Hadoop 2.4.1 build problem.

Cheers,
Aljoscha

> On 08 Mar 2016, at 10:26, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
> 
> Hi Aljoscha,
> thanks again for getting back to me. I built from your branch and the exception is not occurring anymore. The RollingSink state can be restored.
> 
> Still, the exactly-once guarantee seems not to be fulfilled; there are always some extra records after killing either a task manager or the job manager. Do you have an idea where this behavior might be coming from? (I guess concrete numbers will not help greatly, as there are so many parameters influencing them. Still, in our test scenario we produce 2 million records in a Kafka queue, but in the final output files there are on the order of 2.1 million records, so a 5% error. The job is running in a per-job YARN session with n=3, s=4 and a checkpointing interval of 10s.)
> 
> On another (maybe unrelated) note: when I pulled your branch, the Travis build did not go through for -Dhadoop.version=2.4.1. I have not looked into this further as of now; is this one of the tests known to fail sometimes?
> 
> Cheers,
> Max
> <travis.log>
> —
> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> 
>> On 07.03.2016 at 17:20, Aljoscha Krettek <aljos...@apache.org> wrote:
>> 
>> Hi Maximilian,
>> sorry for the delay, we were very busy with the release last week. I had a hunch about the problem and I think I found a fix now. The problem is in snapshot restore. When restoring, the sink tries to clean up any files that were previously in progress. If Flink restores to the same snapshot twice in a row, it will try to clean up the leftover files twice, but they are not there anymore; this causes the exception.
>> 
>> I have a fix in my branch: https://github.com/aljoscha/flink/tree/rolling-sink-fix
>> 
>> Could you maybe try whether this solves your problem? Which version of Flink are you using? You would have to build from source to try it out. Alternatively, I could build it and put it onto a Maven snapshot repository for you to try out.
>> 
>> Cheers,
>> Aljoscha
>>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> 
>>> Hi,
>>> did you check whether there are any files at your specified HDFS output location? If yes, which files are there?
>>> 
>>> Cheers,
>>> Aljoscha
>>>> On 03 Mar 2016, at 14:29, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
>>>> 
>>>> Just for the sake of completeness: this also happens when killing a task manager and is therefore probably unrelated to job manager HA.
>>>> 
>>>>> On 03.03.2016 at 14:17, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
>>>>> 
>>>>> Hi everyone,
>>>>> 
>>>>> unfortunately, I am running into another problem trying to establish exactly-once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).
>>>>> 
>>>>> When using
>>>>> 
>>>>> RollingSink<Tuple3<Integer,Integer,String>> sink = new RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>>>>> sink.setBucketer(new NonRollingBucketer());
>>>>> output.addSink(sink);
>>>>> 
>>>>> and then killing the job manager, the new job manager is unable to restore the old state, throwing
>>>>> ---
>>>>> java.lang.Exception: Could not restore checkpointed state to operators and functions
>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>   at java.lang.Thread.run(Thread.java:744)
>>>>> Caused by: java.lang.Exception: Failed to restore state to function: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>>>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>>>>>   ... 3 more
>>>>> Caused by: java.lang.RuntimeException: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>>>   at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>>>>>   at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>>>>>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>>>>>   ... 4 more
>>>>> ---
>>>>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using 2.4.0 – might this be the same issue?
>>>>> 
>>>>> Another thing I could think of is that the job is not configured correctly and there is some sort of timing issue. The checkpoint interval is 10 seconds; everything else was left at its default value. Then again, as the NonRollingBucketer is used, there should not be any timing issues, right?
>>>>> 
>>>>> Cheers,
>>>>> Max
>>>>> 
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-2979
>>>>> 
>>>>> —
>>>>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>> 
>>>> 
>>> 
>> 
> 