Hi, thanks for the fast answer. Answers inline.

> On 08.03.2016, at 13:31, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi,
> a missing part file for one of the parallel sinks is not necessarily a
> problem. This can happen if that parallel instance of the sink never
> received data after the job successfully restarted.
>
> Missing data, however, is a problem. Maybe I need some more information
> about your setup:
>
> - When are you inspecting the part files?

Some time after the cluster is shut down.

> - Do you shut down the Flink job before checking? If so, how do you shut
> it down?

Via 'cancel' in the JobManager Web Interface. Some records seem to be
written only after cancelling the job, right?

> - When do you know whether all the data from Kafka was consumed by Flink
> and has passed through the pipeline into HDFS?

I have an accumulator in a map right before writing into HDFS. Also, the
RollingSink has a DateTimeBucketer, which makes it easy to see when no new
data is arriving anymore, as the last bucket is from some minutes ago.

> Cheers,
> Aljoscha
>
>> On 08 Mar 2016, at 13:19, Maximilian Bode <maximilian.b...@tngtech.com>
>> wrote:
>>
>> Hi Aljoscha,
>>
>> oh I see. I was under the impression this file was used internally and
>> the output would be completed at the end. Ok, so I extracted the
>> relevant lines using
>>
>> for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > "$i.final"; done
>>
>> which seems to do the trick.
>>
>> Unfortunately, now some records are missing again. In particular, there
>> are the files
>> part-0-0, part-1-0, ..., part-10-0, part-11-0, each with corresponding
>> .valid-length files, and
>> part-0-1, part-1-1, ..., part-10-1
>> in the bucket, where job parallelism=12. So it looks to us as if one of
>> the files was not even created in the second attempt. This behavior
>> seems to be somewhat reproducible, cf. my earlier email where the
>> part-11 file disappeared as well.
>>
>> Thanks again for your help.
>>
>> Cheers,
>> Max
>> —
>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>
>>> On 08.03.2016, at 11:05, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> Hi,
>>> are you taking the ".valid-length" files into account? The problem with
>>> doing "exactly-once" with HDFS is that before Hadoop 2.7 it was not
>>> possible to truncate files. So the trick we're using is to write the
>>> length up to which a file is valid if we would normally need to
>>> truncate it. (If the job fails in the middle of writing, the output
>>> files have to be truncated to a valid position.) For example, say you
>>> have an output file part-8-0. Now, if there exists a file
>>> part-8-0.valid-length, this file tells you up to which position the
>>> file part-8-0 is valid. So you should only read up to this point.
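>>>
>>> In code, "read only up to this point" boils down to something like the
>>> following sketch (just an illustration, not the actual sink code; it
>>> assumes the valid-length marker uses the default "_" prefix and stores
>>> the length as a printable decimal string):
>>>
>>> import java.io.ByteArrayOutputStream;
>>> import java.nio.charset.StandardCharsets;
>>> import org.apache.hadoop.conf.Configuration;
>>> import org.apache.hadoop.fs.FSDataInputStream;
>>> import org.apache.hadoop.fs.FileSystem;
>>> import org.apache.hadoop.fs.Path;
>>> import org.apache.hadoop.io.IOUtils;
>>>
>>> public class ValidLengthReader {
>>>   public static void main(String[] args) throws Exception {
>>>     Path part = new Path("hdfs:///hdfs/dir/outbound/part-8-0");
>>>     FileSystem fs = part.getFileSystem(new Configuration());
>>>
>>>     // Marker file next to the part file: "_part-8-0.valid-length"
>>>     Path marker = new Path(part.getParent(), "_" + part.getName() + ".valid-length");
>>>
>>>     // Without a marker, the whole file is valid.
>>>     long validLength = fs.getFileStatus(part).getLen();
>>>     if (fs.exists(marker)) {
>>>       ByteArrayOutputStream bos = new ByteArrayOutputStream();
>>>       IOUtils.copyBytes(fs.open(marker), bos, 1024, true);
>>>       // Strip any non-digit bytes (e.g. a binary length prefix) before parsing.
>>>       String digits = new String(bos.toByteArray(), StandardCharsets.UTF_8)
>>>           .replaceAll("[^0-9]", "");
>>>       validLength = Long.parseLong(digits);
>>>     }
>>>
>>>     // Consume only the first validLength bytes of the part file.
>>>     try (FSDataInputStream in = fs.open(part)) {
>>>       byte[] buf = new byte[8192];
>>>       long remaining = validLength;
>>>       int n;
>>>       while (remaining > 0
>>>           && (n = in.read(buf, 0, (int) Math.min(buf.length, remaining))) > 0) {
>>>         System.out.write(buf, 0, n); // bytes past validLength are not trustworthy
>>>         remaining -= n;
>>>       }
>>>     }
>>>   }
>>> }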
>>>
>>> The name of the ".valid-length" suffix can also be configured, by the
>>> way, as can all the other stuff.
>>>
>>> If this is not the problem then I definitely have to investigate
>>> further. I'll also look into the Hadoop 2.4.1 build problem.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>>> On 08 Mar 2016, at 10:26, Maximilian Bode <maximilian.b...@tngtech.com>
>>>> wrote:
>>>>
>>>> Hi Aljoscha,
>>>> thanks again for getting back to me. I built from your branch and the
>>>> exception is not occurring anymore. The RollingSink state can be
>>>> restored.
>>>>
>>>> Still, the exactly-once guarantee seems not to be fulfilled; there are
>>>> always some extra records after killing either a task manager or the
>>>> job manager. Do you have an idea where this behavior might be coming
>>>> from? (I guess concrete numbers will not help greatly as there are so
>>>> many parameters influencing them. Still, in our test scenario, we
>>>> produce 2 million records in a Kafka queue, but in the final output
>>>> files there are on the order of 2.1 million records, i.e. a 5% error.
>>>> The job is running in a per-job YARN session with n=3, s=4 and a
>>>> checkpointing interval of 10s.)
>>>>
>>>> On another (maybe unrelated) note: when I pulled your branch, the
>>>> Travis build did not go through for -Dhadoop.version=2.4.1. I have not
>>>> looked into this further as of now; is this one of the tests known to
>>>> fail sometimes?
>>>>
>>>> Cheers,
>>>> Max
>>>> <travis.log>
>>>> —
>>>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>
>>>>> On 07.03.2016, at 17:20, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>
>>>>> Hi Maximilian,
>>>>> sorry for the delay, we were very busy with the release last week. I
>>>>> had a hunch about the problem and I think I found a fix now. The
>>>>> problem is in snapshot restore. When restoring, the sink tries to
>>>>> clean up any files that were previously in progress. If Flink
>>>>> restores to the same snapshot twice in a row, it will try to clean up
>>>>> the leftover files twice, but they are not there anymore; this causes
>>>>> the exception.
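>>>>>
>>>>> Conceptually, the change makes that cleanup tolerate files that are
>>>>> already gone, along these lines (a simplified sketch, not the actual
>>>>> patch; the path is made up for illustration):
>>>>>
>>>>> // Hadoop classes: org.apache.hadoop.fs.{FileSystem, Path},
>>>>> // org.apache.hadoop.conf.Configuration.
>>>>> // On restore, only delete leftovers that still exist, so restoring
>>>>> // the same snapshot twice does not fail on the second pass.
>>>>> Path leftover = new Path("hdfs:///hdfs/dir/outbound/part-5-0");
>>>>> FileSystem fs = leftover.getFileSystem(new Configuration());
>>>>> if (fs.exists(leftover)) {
>>>>>     fs.delete(leftover, false); // non-recursive delete
>>>>> }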
>>>>>
>>>>> I have a fix in my branch:
>>>>> https://github.com/aljoscha/flink/tree/rolling-sink-fix
>>>>>
>>>>> Could you maybe try whether this solves your problem? Which version
>>>>> of Flink are you using? You would have to build from source to try it
>>>>> out. Alternatively, I could build it and put it into a Maven snapshot
>>>>> repository for you to try out.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <aljos...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>> Hi,
>>>>>> did you check whether there are any files at your specified HDFS
>>>>>> output location? If yes, which files are there?
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>
>>>>>>> On 03 Mar 2016, at 14:29, Maximilian Bode
>>>>>>> <maximilian.b...@tngtech.com> wrote:
>>>>>>>
>>>>>>> Just for the sake of completeness: this also happens when killing
>>>>>>> a task manager and is therefore probably unrelated to job manager
>>>>>>> HA.
>>>>>>>
>>>>>>>> On 03.03.2016, at 14:17, Maximilian Bode
>>>>>>>> <maximilian.b...@tngtech.com> wrote:
>>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>>
>>>>>>>> unfortunately, I am running into another problem trying to
>>>>>>>> establish exactly-once guarantees (Kafka -> Flink 1.0.0-rc3 ->
>>>>>>>> HDFS).
>>>>>>>>
>>>>>>>> When using
>>>>>>>>
>>>>>>>> RollingSink<Tuple3<Integer,Integer,String>> sink = new
>>>>>>>> RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>>>>>>>> sink.setBucketer(new NonRollingBucketer());
>>>>>>>> output.addSink(sink);
>>>>>>>>
>>>>>>>> and then killing the job manager, the new job manager is unable
>>>>>>>> to restore the old state, throwing
>>>>>>>> ---
>>>>>>>> java.lang.Exception: Could not restore checkpointed state to
>>>>>>>> operators and functions
>>>>>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>>>>>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>>>>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>> 	at java.lang.Thread.run(Thread.java:744)
>>>>>>>> Caused by: java.lang.Exception: Failed to restore state to
>>>>>>>> function: In-Progress file
>>>>>>>> hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was
>>>>>>>> neither moved to pending nor is still in progress.
>>>>>>>> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>>>>>>>> 	... 3 more
>>>>>>>> Caused by: java.lang.RuntimeException: In-Progress file
>>>>>>>> hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was
>>>>>>>> neither moved to pending nor is still in progress.
>>>>>>>> 	at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>>>>>>>> 	at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>>>>>>>> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>>>>>>>> 	... 4 more
>>>>>>>> ---
>>>>>>>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in
>>>>>>>> fact using 2.4.0 – might this be the same issue?
>>>>>>>>
>>>>>>>> Another thing I could think of is that the job is not configured
>>>>>>>> correctly and there is some sort of timing issue. The checkpoint
>>>>>>>> interval is 10 seconds; everything else was left at its default
>>>>>>>> value. Then again, as the NonRollingBucketer is used, there
>>>>>>>> should not be any timing issues, right?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Max
>>>>>>>>
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-2979
>>>>>>>>
>>>>>>>> —
>>>>>>>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
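>>>>>>>>
>>>>>>>> PS: In case it matters, the checkpointing setup is nothing
>>>>>>>> special; it boils down to the following (sketch of our job
>>>>>>>> skeleton, details omitted):
>>>>>>>>
>>>>>>>> // org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>> env.enableCheckpointing(10000); // checkpoint every 10 seconds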