Hi,
a missing part file for one of the parallel sinks is not necessarily a problem. 
This can happen if that parallel instance of the sink never received data after 
the job successfully restarted. 

Missing data, however, is a problem. Maybe I need some more information about 
your setup:

 - When are you inspecting the part files?
 - Do you shutdown the Flink Job before checking? If so, how do you shut it 
down.
 - When do you know whether all the data from Kafka was consumed by Flink and 
has passed through the pipeline into HDFS?

Cheers,
Aljoscha
> On 08 Mar 2016, at 13:19, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
> 
> Hi Aljoscha,
> 
> oh I see. I was under the impression this file was used internally and the 
> output being completed at the end. Ok, so I extracted the relevant lines using
>       for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > 
> "$i.final"; done
> which seems to do the trick.
> 
> Unfortunately, now some records are missing again. In particular, there are 
> the files
>       part-0-0, part-1-0, ..., part-10-0, part-11-0, each with corresponding 
> .valid-length files
>       part-0-1, part-1-1, ..., part-10-0
> in the bucket, where job parallelism=12. So it looks to us as if one of the 
> files was not even created in the second attempt. This behavior seems to be 
> what somewhat reproducible, cf. my earlier email where the part-11 file 
> disappeared as well.
> 
> Thanks again for your help.
> 
> Cheers,
>  Max
> — 
> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> 
>> Am 08.03.2016 um 11:05 schrieb Aljoscha Krettek <aljos...@apache.org>:
>> 
>> Hi,
>> are you taking the “.valid-length” files into account. The problem with 
>> doing “exactly-once” with HDFS is that before Hadoop 2.7 it was not possible 
>> to truncate files. So the trick we’re using is to write the length up to 
>> which a file is valid if we would normally need to truncate it. (If the job 
>> fails in the middle of writing the output files have to be truncated to a 
>> valid position.) For example, say you have an output file part-8-0. Now, if 
>> there exists a file part-8-0.valid-length this file tells you up to which 
>> position the file part-8-0 is valid. So you should only read up to this 
>> point.
>> 
>> The name of the “.valid-length” suffix can also be configured, by the way, 
>> as can all the other stuff.
>> 
>> If this is not the problem then I definitely have to investigate further. 
>> I’ll also look into the Hadoop 2.4.1 build problem.
>> 
>> Cheers,
>> Aljoscha
>>> On 08 Mar 2016, at 10:26, Maximilian Bode <maximilian.b...@tngtech.com> 
>>> wrote:
>>> 
>>> Hi Aljoscha,
>>> thanks again for getting back to me. I built from your branch and the 
>>> exception is not occurring anymore. The RollingSink state can be restored.
>>> 
>>> Still, the exactly-once guarantee seems not to be fulfilled, there are 
>>> always some extra records after killing either a task manager or the job 
>>> manager. Do you have an idea where this behavior might be coming from? (I 
>>> guess concrete numbers will not help greatly as there are so many 
>>> parameters influencing them. Still, in our test scenario, we produce 2 
>>> million records in a Kafka queue but in the final output files there are on 
>>> the order of 2.1 million records, so a 5% error. The job is running in a 
>>> per-job YARN session with n=3, s=4 with a checkpointing interval of 10s.)
>>> 
>>> On another (maybe unrelated) note: when I pulled your branch, the Travis 
>>> build did not go through for -Dhadoop.version=2.4.1. I have not looked into 
>>> this further as of now, is this one of the tests known to fail sometimes?
>>> 
>>> Cheers,
>>> Max
>>> <travis.log>
>>> — 
>>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>> 
>>>> Am 07.03.2016 um 17:20 schrieb Aljoscha Krettek <aljos...@apache.org>:
>>>> 
>>>> Hi Maximilian,
>>>> sorry for the delay, we where very busy with the release last week. I had 
>>>> a hunch about the problem but I think I found a fix now. The problem is in 
>>>> snapshot restore. When restoring, the sink tries to clean up any files 
>>>> that where previously in progress. If Flink restores to the same snapshot 
>>>> twice in a row then it will try to clean up the leftover files twice but 
>>>> they are not there anymore, this causes the exception.
>>>> 
>>>> I have a fix in my branch: 
>>>> https://github.com/aljoscha/flink/tree/rolling-sink-fix
>>>> 
>>>> Could you maybe try if this solves your problem? Which version of Flink 
>>>> are you using? You would have to build from source to try it out. 
>>>> Alternatively I could build it and put it onto a maven snapshot repository 
>>>> for you to try it out.
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>> 
>>>>> Hi,
>>>>> did you check whether there are any files at your specified HDFS output 
>>>>> location? If yes, which files are there?
>>>>> 
>>>>> Cheers,
>>>>> Aljoscha
>>>>>> On 03 Mar 2016, at 14:29, Maximilian Bode <maximilian.b...@tngtech.com> 
>>>>>> wrote:
>>>>>> 
>>>>>> Just for the sake of completeness: this also happens when killing a task 
>>>>>> manager and is therefore probably unrelated to job manager HA.
>>>>>> 
>>>>>>> Am 03.03.2016 um 14:17 schrieb Maximilian Bode 
>>>>>>> <maximilian.b...@tngtech.com>:
>>>>>>> 
>>>>>>> Hi everyone,
>>>>>>> 
>>>>>>> unfortunately, I am running into another problem trying to establish 
>>>>>>> exactly once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).
>>>>>>> 
>>>>>>> When using
>>>>>>> 
>>>>>>> RollingSink<Tuple3<Integer,Integer,String>> sink = new 
>>>>>>> RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>>>>>>> sink.setBucketer(new NonRollingBucketer());
>>>>>>> output.addSink(sink);
>>>>>>> 
>>>>>>> and then killing the job manager, the new job manager is unable to 
>>>>>>> restore the old state throwing
>>>>>>> ---
>>>>>>> java.lang.Exception: Could not restore checkpointed state to operators 
>>>>>>> and functions
>>>>>>>         at 
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>>>>>>>         at 
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>         at java.lang.Thread.run(Thread.java:744)
>>>>>>> Caused by: java.lang.Exception: Failed to restore state to function: 
>>>>>>> In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 
>>>>>>> was neither moved to pending nor is still in progress.
>>>>>>>         at 
>>>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>>>>         at 
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>>>>>>>         ... 3 more
>>>>>>> Caused by: java.lang.RuntimeException: In-Progress file 
>>>>>>> hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither 
>>>>>>> moved to pending nor is still in progress.
>>>>>>>         at 
>>>>>>> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>>>>>>>         at 
>>>>>>> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>>>>>>>         at 
>>>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>>>>>>>         ... 4 more
>>>>>>> ---
>>>>>>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact 
>>>>>>> using 2.4.0 – might this be the same issue?
>>>>>>> 
>>>>>>> Another thing I could think of is that the job is not configured 
>>>>>>> correctly and there is some sort of timing issue. The checkpoint 
>>>>>>> interval is 10 seconds, everything else was left at default value. Then 
>>>>>>> again, as the NonRollingBucketer is used, there should not be any 
>>>>>>> timing issues, right?
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Max
>>>>>>> 
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-2979
>>>>>>> 
>>>>>>> — 
>>>>>>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 

Reply via email to