Hi everyone,

Unfortunately, I am running into another problem while trying to establish
exactly-once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).

When using

RollingSink<Tuple3<Integer,Integer,String>> sink =
    new RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
sink.setBucketer(new NonRollingBucketer());
output.addSink(sink);

and then killing the job manager, the new job manager is unable to restore the
old state, throwing:
---
java.lang.Exception: Could not restore checkpointed state to operators and functions
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.Exception: Failed to restore state to function: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
        ... 3 more
Caused by: java.lang.RuntimeException: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
        at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
        at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
        ... 4 more
---
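
The message in the innermost exception suggests a restore-time check along the
following lines. This is only a rough model on the local file system using
java.nio.file, not the actual RollingSink code: the class, enum and method
names are made up for illustration, and the "_" prefix and the ".in-progress" /
".pending" suffixes are assumptions about the sink's naming scheme.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Simplified local-file-system model of the restore-time check (not Flink code). */
public class RestoreCheckSketch {

    // Assumed naming scheme for files that are not yet final.
    static final String PREFIX = "_";
    static final String IN_PROGRESS_SUFFIX = ".in-progress";
    static final String PENDING_SUFFIX = ".pending";

    enum FileState { IN_PROGRESS, PENDING }

    /**
     * Reports in which state the given part file is found on disk.
     * Throws if neither the in-progress nor the pending variant exists.
     */
    static FileState locate(Path partFile) {
        Path dir = partFile.getParent();
        String name = partFile.getFileName().toString();
        Path inProgress = dir.resolve(PREFIX + name + IN_PROGRESS_SUFFIX);
        Path pending = dir.resolve(PREFIX + name + PENDING_SUFFIX);

        if (Files.exists(inProgress)) {
            return FileState.IN_PROGRESS;
        }
        if (Files.exists(pending)) {
            return FileState.PENDING;
        }
        throw new RuntimeException("In-Progress file " + partFile
                + " was neither moved to pending nor is still in progress.");
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("outbound");
        Path part = dir.resolve("part-5-0");

        // Neither variant exists yet, so the check fails like in the trace.
        try {
            locate(part);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }

        // Once the in-progress variant is present, the check succeeds.
        Files.createFile(dir.resolve("_part-5-0.in-progress"));
        System.out.println(locate(part));
    }
}
```

In this model, the exception above corresponds to the case where neither
variant of part-5-0 can be found at restore time.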
I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using 
2.4.0 – might this be the same issue?

Another thing I could think of is that the job is not configured correctly and
there is some sort of timing issue. The checkpoint interval is 10 seconds;
everything else was left at its default value. Then again, since the
NonRollingBucketer is used, there should not be any timing issues, right?

Cheers,
 Max

[1] https://issues.apache.org/jira/browse/FLINK-2979

—
Maximilian Bode * Junior Consultant * [email protected]
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
