Hi Maximilian, sorry for the delay, we where very busy with the release last week. I had a hunch about the problem but I think I found a fix now. The problem is in snapshot restore. When restoring, the sink tries to clean up any files that where previously in progress. If Flink restores to the same snapshot twice in a row then it will try to clean up the leftover files twice but they are not there anymore, this causes the exception.
I have a fix in my branch: https://github.com/aljoscha/flink/tree/rolling-sink-fix Could you maybe try if this solves your problem? Which version of Flink are you using? You would have to build from source to try it out. Alternatively I could build it and put it onto a maven snapshot repository for you to try it out. Cheers, Aljoscha > On 03 Mar 2016, at 14:50, Aljoscha Krettek <[email protected]> wrote: > > Hi, > did you check whether there are any files at your specified HDFS output > location? If yes, which files are there? > > Cheers, > Aljoscha >> On 03 Mar 2016, at 14:29, Maximilian Bode <[email protected]> >> wrote: >> >> Just for the sake of completeness: this also happens when killing a task >> manager and is therefore probably unrelated to job manager HA. >> >>> Am 03.03.2016 um 14:17 schrieb Maximilian Bode >>> <[email protected]>: >>> >>> Hi everyone, >>> >>> unfortunately, I am running into another problem trying to establish >>> exactly once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS). >>> >>> When using >>> >>> RollingSink<Tuple3<Integer,Integer,String>> sink = new >>> RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound"); >>> sink.setBucketer(new NonRollingBucketer()); >>> output.addSink(sink); >>> >>> and then killing the job manager, the new job manager is unable to restore >>> the old state throwing >>> --- >>> java.lang.Exception: Could not restore checkpointed state to operators and >>> functions >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454) >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) >>> at java.lang.Thread.run(Thread.java:744) >>> Caused by: java.lang.Exception: Failed to restore state to function: >>> In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was >>> neither moved to pending nor is still in progress. >>> at >>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168) >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446) >>> ... 3 more >>> Caused by: java.lang.RuntimeException: In-Progress file >>> hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to >>> pending nor is still in progress. >>> at >>> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686) >>> at >>> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122) >>> at >>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165) >>> ... 4 more >>> --- >>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using >>> 2.4.0 – might this be the same issue? >>> >>> Another thing I could think of is that the job is not configured correctly >>> and there is some sort of timing issue. The checkpoint interval is 10 >>> seconds, everything else was left at default value. Then again, as the >>> NonRollingBucketer is used, there should not be any timing issues, right? >>> >>> Cheers, >>> Max >>> >>> [1] https://issues.apache.org/jira/browse/FLINK-2979 >>> >>> — >>> Maximilian Bode * Junior Consultant * [email protected] >>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring >>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke >>> Sitz: Unterföhring * Amtsgericht München * HRB 135082 >>> >> >
