Hi,
are you taking the “.valid-length” files into account. The problem with doing 
“exactly-once” with HDFS is that before Hadoop 2.7 it was not possible to 
truncate files. So the trick we’re using is to write the length up to which a 
file is valid if we would normally need to truncate it. (If the job fails in 
the middle of writing the output files have to be truncated to a valid 
position.) For example, say you have an output file part-8-0. Now, if there 
exists a file part-8-0.valid-length this file tells you up to which position 
the file part-8-0 is valid. So you should only read up to this point.

The name of the “.valid-length” suffix can also be configured, by the way, as 
can all the other stuff.

If this is not the problem then I definitely have to investigate further. I’ll 
also look into the Hadoop 2.4.1 build problem.

Cheers,
Aljoscha
> On 08 Mar 2016, at 10:26, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
> 
> Hi Aljoscha,
> thanks again for getting back to me. I built from your branch and the 
> exception is not occurring anymore. The RollingSink state can be restored.
> 
> Still, the exactly-once guarantee seems not to be fulfilled, there are always 
> some extra records after killing either a task manager or the job manager. Do 
> you have an idea where this behavior might be coming from? (I guess concrete 
> numbers will not help greatly as there are so many parameters influencing 
> them. Still, in our test scenario, we produce 2 million records in a Kafka 
> queue but in the final output files there are on the order of 2.1 million 
> records, so a 5% error. The job is running in a per-job YARN session with 
> n=3, s=4 with a checkpointing interval of 10s.)
> 
> On another (maybe unrelated) note: when I pulled your branch, the Travis 
> build did not go through for -Dhadoop.version=2.4.1. I have not looked into 
> this further as of now, is this one of the tests known to fail sometimes?
> 
> Cheers,
>  Max
> <travis.log>
> — 
> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> 
>> Am 07.03.2016 um 17:20 schrieb Aljoscha Krettek <aljos...@apache.org>:
>> 
>> Hi Maximilian,
>> sorry for the delay, we where very busy with the release last week. I had a 
>> hunch about the problem but I think I found a fix now. The problem is in 
>> snapshot restore. When restoring, the sink tries to clean up any files that 
>> where previously in progress. If Flink restores to the same snapshot twice 
>> in a row then it will try to clean up the leftover files twice but they are 
>> not there anymore, this causes the exception.
>> 
>> I have a fix in my branch: 
>> https://github.com/aljoscha/flink/tree/rolling-sink-fix
>> 
>> Could you maybe try if this solves your problem? Which version of Flink are 
>> you using? You would have to build from source to try it out. Alternatively 
>> I could build it and put it onto a maven snapshot repository for you to try 
>> it out.
>> 
>> Cheers,
>> Aljoscha
>>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> 
>>> Hi,
>>> did you check whether there are any files at your specified HDFS output 
>>> location? If yes, which files are there?
>>> 
>>> Cheers,
>>> Aljoscha
>>>> On 03 Mar 2016, at 14:29, Maximilian Bode <maximilian.b...@tngtech.com> 
>>>> wrote:
>>>> 
>>>> Just for the sake of completeness: this also happens when killing a task 
>>>> manager and is therefore probably unrelated to job manager HA.
>>>> 
>>>>> Am 03.03.2016 um 14:17 schrieb Maximilian Bode 
>>>>> <maximilian.b...@tngtech.com>:
>>>>> 
>>>>> Hi everyone,
>>>>> 
>>>>> unfortunately, I am running into another problem trying to establish 
>>>>> exactly once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).
>>>>> 
>>>>> When using
>>>>> 
>>>>> RollingSink<Tuple3<Integer,Integer,String>> sink = new 
>>>>> RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>>>>> sink.setBucketer(new NonRollingBucketer());
>>>>> output.addSink(sink);
>>>>> 
>>>>> and then killing the job manager, the new job manager is unable to 
>>>>> restore the old state throwing
>>>>> ---
>>>>> java.lang.Exception: Could not restore checkpointed state to operators 
>>>>> and functions
>>>>>   at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>>>>>   at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>   at java.lang.Thread.run(Thread.java:744)
>>>>> Caused by: java.lang.Exception: Failed to restore state to function: 
>>>>> In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 
>>>>> was neither moved to pending nor is still in progress.
>>>>>   at 
>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>>   at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>>>>>   ... 3 more
>>>>> Caused by: java.lang.RuntimeException: In-Progress file 
>>>>> hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved 
>>>>> to pending nor is still in progress.
>>>>>   at 
>>>>> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>>>>>   at 
>>>>> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>>>>>   at 
>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>>>>>   ... 4 more
>>>>> ---
>>>>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact 
>>>>> using 2.4.0 – might this be the same issue?
>>>>> 
>>>>> Another thing I could think of is that the job is not configured 
>>>>> correctly and there is some sort of timing issue. The checkpoint interval 
>>>>> is 10 seconds, everything else was left at default value. Then again, as 
>>>>> the NonRollingBucketer is used, there should not be any timing issues, 
>>>>> right?
>>>>> 
>>>>> Cheers,
>>>>> Max
>>>>> 
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-2979
>>>>> 
>>>>> — 
>>>>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>> 
>>>> 
>>> 
>> 
> 

Reply via email to