Hi,
thanks for the fast answer. Answers inline.

> On 08.03.2016 at 13:31, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi,
> a missing part file for one of the parallel sinks is not necessarily a 
> problem. This can happen if that parallel instance of the sink never received 
> data after the job successfully restarted.
> 
> Missing data, however, is a problem. Maybe I need some more information about 
> your setup:
> 
> - When are you inspecting the part files?
Some time after the cluster is shut down
> - Do you shut down the Flink job before checking? If so, how do you shut it 
> down?
Via 'cancel' in the Jobmanager Web Interface. Some records seem to be written 
only after cancelling the job, right?
> - When do you know whether all the data from Kafka was consumed by Flink and 
> has passed through the pipeline into HDFS?
I have an accumulator in a map right before writing into HDFS. Also, the 
RollingSink has a DateTimeBucketer, which makes it obvious when no new data 
is arriving anymore, as the last bucket is from some minutes ago.
> 
> Cheers,
> Aljoscha
>> On 08 Mar 2016, at 13:19, Maximilian Bode <maximilian.b...@tngtech.com> 
>> wrote:
>> 
>> Hi Aljoscha,
>> 
>> oh I see. I was under the impression that this file was only used internally 
>> and that the output would be completed at the end. OK, so I extracted the 
>> relevant lines using
>>      for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > "$i.final"; done
>> which seems to do the trick.
>> 
>> Unfortunately, now some records are missing again. In particular, there are 
>> the files
>>      part-0-0, part-1-0, ..., part-10-0, part-11-0, each with a corresponding 
>>      .valid-length file
>>      part-0-1, part-1-1, ..., part-10-1
>> in the bucket, where job parallelism=12. So it looks to us as if one of the 
>> files was not even created in the second attempt. This behavior seems to be 
>> somewhat reproducible, cf. my earlier email where the part-11 file 
>> disappeared as well.
>> 
>> Thanks again for your help.
>> 
>> Cheers,
>> Max
>> —
>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Registered office: Unterföhring * Amtsgericht München * HRB 135082
>> 
>>> On 08.03.2016 at 11:05, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> 
>>> Hi,
>>> are you taking the “.valid-length” files into account? The problem with 
>>> doing “exactly-once” with HDFS is that before Hadoop 2.7 it was not 
>>> possible to truncate files. So the trick we’re using is to write out the 
>>> length up to which a file is valid whenever we would normally need to 
>>> truncate it. (If the job fails in the middle of writing, the output files 
>>> have to be truncated to a valid position.) For example, say you have an 
>>> output file part-8-0. Now, if there exists a file part-8-0.valid-length, 
>>> this file tells you up to which position the file part-8-0 is valid. So you 
>>> should only read up to this point.
>>> 
>>> The name of the “.valid-length” suffix can also be configured, by the way, 
>>> as can all the other stuff.
>>> 
>>> If this is not the problem then I definitely have to investigate further. 
>>> I’ll also look into the Hadoop 2.4.1 build problem.
>>> 
>>> Cheers,
>>> Aljoscha
>>>> On 08 Mar 2016, at 10:26, Maximilian Bode <maximilian.b...@tngtech.com> 
>>>> wrote:
>>>> 
>>>> Hi Aljoscha,
>>>> thanks again for getting back to me. I built from your branch and the 
>>>> exception is not occurring anymore. The RollingSink state can be restored.
>>>> 
>>>> Still, the exactly-once guarantee does not seem to be fulfilled: there are 
>>>> always some extra records after killing either a task manager or the job 
>>>> manager. Do you have an idea where this behavior might be coming from? (I 
>>>> guess concrete numbers will not help greatly, as there are so many 
>>>> parameters influencing them. Still, in our test scenario, we produce 2 
>>>> million records in a Kafka queue, but the final output files contain 
>>>> on the order of 2.1 million records, so a 5% error. The job is running in 
>>>> a per-job YARN session with n=3, s=4 and a checkpointing interval of 10s.)
>>>> 
>>>> On another (maybe unrelated) note: when I pulled your branch, the Travis 
>>>> build did not go through for -Dhadoop.version=2.4.1. I have not looked 
>>>> into this further so far. Is this one of the tests known to fail 
>>>> sometimes?
>>>> 
>>>> Cheers,
>>>> Max
>>>> <travis.log>
>>>> 
>>>>> On 07.03.2016 at 17:20, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>> 
>>>>> Hi Maximilian,
>>>>> sorry for the delay, we were very busy with the release last week. I had 
>>>>> a hunch about the problem, and I think I have found a fix now. The problem 
>>>>> is in snapshot restore. When restoring, the sink tries to clean up any 
>>>>> files that were previously in progress. If Flink restores to the same 
>>>>> snapshot twice in a row, it will try to clean up the leftover files twice, 
>>>>> but they are no longer there the second time; this causes the exception.
>>>>> 
>>>>> I have a fix in my branch: 
>>>>> https://github.com/aljoscha/flink/tree/rolling-sink-fix
>>>>> 
>>>>> Could you check whether this solves your problem? Which version of Flink 
>>>>> are you using? You would have to build from source to try it out. 
>>>>> Alternatively, I could build it and put it into a Maven snapshot 
>>>>> repository for you to try out.
>>>>> 
>>>>> Cheers,
>>>>> Aljoscha
>>>>>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>> 
>>>>>> Hi,
>>>>>> did you check whether there are any files at your specified HDFS output 
>>>>>> location? If yes, which files are there?
>>>>>> 
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>> On 03 Mar 2016, at 14:29, Maximilian Bode <maximilian.b...@tngtech.com> 
>>>>>>> wrote:
>>>>>>> 
>>>>>>> Just for the sake of completeness: this also happens when killing a 
>>>>>>> task manager and is therefore probably unrelated to job manager HA.
>>>>>>> 
>>>>>>>> On 03.03.2016 at 14:17, Maximilian Bode 
>>>>>>>> <maximilian.b...@tngtech.com> wrote:
>>>>>>>> 
>>>>>>>> Hi everyone,
>>>>>>>> 
>>>>>>>> unfortunately, I am running into another problem trying to establish 
>>>>>>>> exactly-once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).
>>>>>>>> 
>>>>>>>> When using
>>>>>>>> 
>>>>>>>> RollingSink<Tuple3<Integer,Integer,String>> sink = new 
>>>>>>>> RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>>>>>>>> sink.setBucketer(new NonRollingBucketer());
>>>>>>>> output.addSink(sink);
>>>>>>>> 
>>>>>>>> and then killing the job manager, the new job manager is unable to 
>>>>>>>> restore the old state and throws
>>>>>>>> ---
>>>>>>>> java.lang.Exception: Could not restore checkpointed state to operators and functions
>>>>>>>>        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>>>>>>>>        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>>>>>        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>        at java.lang.Thread.run(Thread.java:744)
>>>>>>>> Caused by: java.lang.Exception: Failed to restore state to function: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>>>>>>        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>>>>>        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>>>>>>>>        ... 3 more
>>>>>>>> Caused by: java.lang.RuntimeException: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>>>>>>        at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>>>>>>>>        at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>>>>>>>>        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>>>>>>>>        ... 4 more
>>>>>>>> ---
>>>>>>>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact 
>>>>>>>> using 2.4.0 – might this be the same issue?
>>>>>>>> 
>>>>>>>> Another thing I could think of is that the job is not configured 
>>>>>>>> correctly and there is some sort of timing issue. The checkpoint 
>>>>>>>> interval is 10 seconds, everything else was left at default value. 
>>>>>>>> Then again, as the NonRollingBucketer is used, there should not be any 
>>>>>>>> timing issues, right?
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Max
>>>>>>>> 
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-2979
>>>>>>>> 
> 
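
For anyone repeating the extraction step quoted above, here is a slightly more defensive sketch of the same loop. It rests on assumptions that are not confirmed in this thread: that the marker for part-X-Y is called _part-X-Y.valid-length (as in the one-liner) and that it contains the valid byte count as decimal digits, possibly surrounded by a few non-printable bytes. The function name finalize_parts is made up for illustration. Part files without a marker are copied unchanged, on the assumption that they never needed truncating.

```shell
#!/bin/sh
# Sketch: write "<part>.final" copies that honour the valid-length markers.
# Assumed layout (hypothetical, taken from the one-liner in the thread):
#   <dir>/part-X-Y                 data file
#   <dir>/_part-X-Y.valid-length   marker holding the valid byte count

finalize_parts() {
    dir=$1
    for part in "$dir"/part-*; do
        [ -f "$part" ] || continue                 # glob matched nothing
        case $part in *.final) continue ;; esac    # skip output of an earlier run
        name=${part##*/}
        marker="$dir/_$name.valid-length"
        if [ -f "$marker" ]; then
            # Keep only the digits. Unlike `strings`, whose default minimum
            # is four printable characters, this also handles short lengths.
            len=$(tr -cd '0-9' < "$marker")
            if [ -z "$len" ]; then
                echo "unreadable marker: $marker" >&2
                continue
            fi
            head -c "$len" "$part" > "$part.final"
        else
            # No marker: treat the whole file as valid (assumption).
            cp "$part" "$part.final"
        fi
    done
}
```

Calling finalize_parts on a local copy of the bucket directory leaves a .final file next to every part file. If the records are newline-delimited (again an assumption), the total of `wc -l` over the .final files gives a number to compare against the accumulator.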
