Hi,
a missing part file for one of the parallel sinks is not necessarily a problem. This can happen if that parallel instance of the sink never received data after the job successfully restarted.
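
For example, with parallelism 12 you would normally expect the files part-0-x through part-11-x, but a subtask that never sees an element after the restore does not open a part file at all. If you want to double-check which subtasks actually wrote something, something like this should work (a minimal sketch, assuming the default “part-<subtaskIndex>-<rollCounter>” naming and the host/path from your mails):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ListSubtasks {
    public static void main(String[] args) throws Exception {
        // Collect the subtask indices that produced at least one part file.
        FileSystem fs = FileSystem.get(new URI("hdfs://our.machine.com:8020"), new Configuration());
        Pattern partFile = Pattern.compile("part-(\\d+)-\\d+");
        TreeSet<Integer> subtasks = new TreeSet<>();
        for (FileStatus status : fs.listStatus(new Path("/hdfs/dir/outbound"))) {
            Matcher m = partFile.matcher(status.getPath().getName());
            if (m.matches()) {
                subtasks.add(Integer.parseInt(m.group(1)));
            }
        }
        System.out.println("Subtasks that wrote output: " + subtasks);
    }
}

If every subtask received data, this should print [0, 1, ..., 11]; a missing index just means that instance had nothing to write.
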
Missing data, however, is a problem. Maybe I need some more information about your setup:
- When are you inspecting the part files?
- Do you shut down the Flink job before checking? If so, how do you shut it down?
- How do you know that all the data from Kafka was consumed by Flink and has passed through the pipeline into HDFS?

Cheers,
Aljoscha

> On 08 Mar 2016, at 13:19, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
> 
> Hi Aljoscha,
> 
> Oh, I see. I was under the impression this file was used internally and that the output would be completed at the end. OK, so I extracted the relevant lines using
> 
> for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > "$i.final"; done
> 
> which seems to do the trick.
> 
> Unfortunately, now some records are missing again. In particular, there are the files
> part-0-0, part-1-0, ..., part-10-0, part-11-0, each with a corresponding .valid-length file, as well as the files
> part-0-1, part-1-1, ..., part-10-1
> in the bucket, where job parallelism=12. So it looks to us as if one of the files was not even created in the second attempt. This behavior seems to be somewhat reproducible, cf. my earlier email where the part-11 file disappeared as well.
> 
> Thanks again for your help.
> 
> Cheers,
> Max
> —
> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Managing directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Registered office: Unterföhring * Munich Local Court * HRB 135082
> 
>> On 08.03.2016, at 11:05, Aljoscha Krettek <aljos...@apache.org> wrote:
>> 
>> Hi,
>> are you taking the “.valid-length” files into account? The problem with doing “exactly-once” with HDFS is that before Hadoop 2.7 it was not possible to truncate files. So the trick we’re using is to write the length up to which a file is valid if we would normally need to truncate it. (If the job fails in the middle of writing, the output files have to be truncated to a valid position.) For example, say you have an output file part-8-0. Now, if there exists a file part-8-0.valid-length, this file tells you up to which position the file part-8-0 is valid. So you should only read up to this point.
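>> 
>> If you need to do this programmatically, a reader has to do roughly the following (again just a sketch, assuming the default “_” prefix and “.valid-length” suffix for the length files; the digit filter is only there in case the stored length is surrounded by a few framing bytes):
>> 
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.FSDataInputStream;
>> import org.apache.hadoop.fs.FileSystem;
>> import org.apache.hadoop.fs.Path;
>> import java.net.URI;
>> import java.nio.charset.StandardCharsets;
>> 
>> public class ValidLength {
>>     // Returns how many bytes of the given part file are valid.
>>     static long validLength(FileSystem fs, Path part) throws Exception {
>>         Path lengthFile = new Path(part.getParent(), "_" + part.getName() + ".valid-length");
>>         if (!fs.exists(lengthFile)) {
>>             return fs.getFileStatus(part).getLen(); // no truncation necessary
>>         }
>>         byte[] buf = new byte[(int) fs.getFileStatus(lengthFile).getLen()];
>>         try (FSDataInputStream in = fs.open(lengthFile)) {
>>             in.readFully(buf);
>>         }
>>         // Keep only the decimal digits of the stored length.
>>         return Long.parseLong(new String(buf, StandardCharsets.UTF_8).replaceAll("\\D", ""));
>>     }
>> 
>>     public static void main(String[] args) throws Exception {
>>         FileSystem fs = FileSystem.get(new URI("hdfs://our.machine.com:8020"), new Configuration());
>>         Path part = new Path("/hdfs/dir/outbound/part-8-0");
>>         System.out.println(part + " is valid up to byte " + validLength(fs, part));
>>     }
>> }
>> 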
>> The name of the “.valid-length” suffix can also be configured, by the way, as can all the other stuff.
>> 
>> If this is not the problem then I definitely have to investigate further. I’ll also look into the Hadoop 2.4.1 build problem.
>> 
>> Cheers,
>> Aljoscha
>> 
>>> On 08 Mar 2016, at 10:26, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
>>> 
>>> Hi Aljoscha,
>>> thanks again for getting back to me. I built from your branch and the exception is not occurring anymore. The RollingSink state can be restored.
>>> 
>>> Still, the exactly-once guarantee seems not to be fulfilled; there are always some extra records after killing either a task manager or the job manager. Do you have an idea where this behavior might be coming from? (I guess concrete numbers will not help greatly as there are so many parameters influencing them. Still, in our test scenario, we produce 2 million records in a Kafka queue, but in the final output files there are on the order of 2.1 million records, so a 5% error. The job is running in a per-job YARN session with n=3, s=4 and a checkpointing interval of 10s.)
>>> 
>>> On another (maybe unrelated) note: when I pulled your branch, the Travis build did not go through for -Dhadoop.version=2.4.1. I have not looked into this further as of now. Is this one of the tests known to fail sometimes?
>>> 
>>> Cheers,
>>> Max
>>> <travis.log>
>>> —
>>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> Managing directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>> Registered office: Unterföhring * Munich Local Court * HRB 135082
>>> 
>>>> On 07.03.2016, at 17:20, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>> 
>>>> Hi Maximilian,
>>>> sorry for the delay, we were very busy with the release last week. I had a hunch about the problem but I think I found a fix now. The problem is in snapshot restore. When restoring, the sink tries to clean up any files that were previously in progress. If Flink restores to the same snapshot twice in a row, then it will try to clean up the leftover files twice, but they are not there anymore; this causes the exception.
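>>>> 
>>>> In other words, the restore-time cleanup has to be idempotent: if the leftover file is already gone, that should count as success. Schematically (just a sketch of the idea, assuming the default “_<name>.in-progress” naming; this is not the actual patch):
>>>> 
>>>> import org.apache.hadoop.fs.FileSystem;
>>>> import org.apache.hadoop.fs.Path;
>>>> 
>>>> class RestoreCleanupSketch {
>>>>     // On restore: remove a leftover in-progress file if it exists. A second
>>>>     // restore of the same snapshot finds nothing and must NOT throw, because
>>>>     // the first restore already cleaned it up.
>>>>     static void cleanupInProgress(FileSystem fs, Path basePath, String partName) throws Exception {
>>>>         Path inProgress = new Path(basePath, "_" + partName + ".in-progress");
>>>>         if (fs.exists(inProgress)) {
>>>>             fs.delete(inProgress, false);
>>>>         }
>>>>         // else: already cleaned up by an earlier restore, nothing to do
>>>>     }
>>>> }
>>>> 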
>>>> I have a fix in my branch: https://github.com/aljoscha/flink/tree/rolling-sink-fix
>>>> 
>>>> Could you maybe try it and see if it solves your problem? Which version of Flink are you using? You would have to build from source to try it out. Alternatively, I could build it and put it onto a Maven snapshot repository for you to try it out.
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>> 
>>>>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>> 
>>>>> Hi,
>>>>> did you check whether there are any files at your specified HDFS output location? If yes, which files are there?
>>>>> 
>>>>> Cheers,
>>>>> Aljoscha
>>>>> 
>>>>>> On 03 Mar 2016, at 14:29, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
>>>>>> 
>>>>>> Just for the sake of completeness: this also happens when killing a task manager and is therefore probably unrelated to job manager HA.
>>>>>> 
>>>>>>> On 03.03.2016, at 14:17, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
>>>>>>> 
>>>>>>> Hi everyone,
>>>>>>> 
>>>>>>> unfortunately, I am running into another problem trying to establish exactly-once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).
>>>>>>> 
>>>>>>> When using
>>>>>>> 
>>>>>>> RollingSink<Tuple3<Integer,Integer,String>> sink = new RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>>>>>>> sink.setBucketer(new NonRollingBucketer());
>>>>>>> output.addSink(sink);
>>>>>>> 
>>>>>>> and then killing the job manager, the new job manager is unable to restore the old state, throwing
>>>>>>> ---
>>>>>>> java.lang.Exception: Could not restore checkpointed state to operators and functions
>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>>>> Caused by: java.lang.Exception: Failed to restore state to function: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>>>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>>>>>>> ... 3 more
>>>>>>> Caused by: java.lang.RuntimeException: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>>>>> at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>>>>>>> at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>>>>>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>>>>>>> ... 4 more
>>>>>>> ---
>>>>>>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using 2.4.0 – might this be the same issue?
>>>>>>> 
>>>>>>> Another thing I could think of is that the job is not configured correctly and there is some sort of timing issue. The checkpoint interval is 10 seconds, everything else was left at default values. Then again, as the NonRollingBucketer is used, there should not be any timing issues, right?
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Max
>>>>>>> 
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-2979
>>>>>>> 
>>>>>>> —
>>>>>>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>> Managing directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>> Registered office: Unterföhring * Munich Local Court * HRB 135082