Hi everyone, unfortunately I am running into another problem while trying to establish exactly-once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).
When using

    RollingSink<Tuple3<Integer, Integer, String>> sink =
            new RollingSink<Tuple3<Integer, Integer, String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
    sink.setBucketer(new NonRollingBucketer());
    output.addSink(sink);

and then killing the JobManager, the new JobManager is unable to restore the
old state and throws:
---
java.lang.Exception: Could not restore checkpointed state to operators and functions
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.Exception: Failed to restore state to function: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
    ... 3 more
Caused by: java.lang.RuntimeException: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
    at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
    at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
    ... 4 more
---
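To make the message above concrete: as far as I understand it, on restore the sink takes the part file that the checkpoint recorded as in progress and looks for it under its pending name and under its in-progress name, failing if it finds neither. The following is a simplified, hypothetical model of that check in plain Java, not the actual RollingSink code. The class name RestoreCheckModel is made up, "files" are just strings in an in-memory set, and the "_" prefix with ".in-progress"/".pending" suffixes (as well as the order of the two checks) are my assumptions about the defaults.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical, simplified model of the restore check behind the error above.
 * Not Flink code: the "file system" is an in-memory set of path strings.
 */
public class RestoreCheckModel {

    // Assumed default naming for in-progress and pending part files.
    static final String IN_PROGRESS_PREFIX = "_";
    static final String IN_PROGRESS_SUFFIX = ".in-progress";
    static final String PENDING_PREFIX = "_";
    static final String PENDING_SUFFIX = ".pending";

    /** Turns ".../dir/part-5-0" into ".../dir/" + prefix + "part-5-0" + suffix. */
    static String decorate(String partPath, String prefix, String suffix) {
        int slash = partPath.lastIndexOf('/');
        String dir = partPath.substring(0, slash + 1);
        String name = partPath.substring(slash + 1);
        return dir + prefix + name + suffix;
    }

    /**
     * Given the part file recorded as in progress in the checkpoint and the files
     * that currently exist, returns the file that would be renamed back to the
     * part path, or throws the same kind of error as in the trace.
     */
    static String restore(String partPath, String... existingFiles) {
        Set<String> fs = new HashSet<>(Arrays.asList(existingFiles));
        String pending = decorate(partPath, PENDING_PREFIX, PENDING_SUFFIX);
        String inProgress = decorate(partPath, IN_PROGRESS_PREFIX, IN_PROGRESS_SUFFIX);
        if (fs.contains(pending)) {
            return pending;     // file was rolled to pending after the checkpoint
        } else if (fs.contains(inProgress)) {
            return inProgress;  // file was still being written when the job failed
        }
        throw new RuntimeException("In-Progress file " + partPath
                + " was neither moved to pending nor is still in progress.");
    }

    public static void main(String[] args) {
        String part = "/hdfs/dir/outbound/part-5-0";
        // Still in progress: the check succeeds.
        System.out.println(restore(part, "/hdfs/dir/outbound/_part-5-0.in-progress"));
        try {
            // Only a file under the plain part name exists: neither branch matches.
            restore(part, part);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, any situation in which neither the pending nor the in-progress variant of part-5-0 exists at restore time produces exactly the exception from the trace.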
I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using
2.4.0 – might this be the same issue?
Another possibility I can think of is that the job is not configured correctly and
there is some sort of timing issue. The checkpoint interval is 10 seconds;
everything else was left at its default value. Then again, since the
NonRollingBucketer is used, there should not be any timing issues, right?
Cheers,
Max
[1] https://issues.apache.org/jira/browse/FLINK-2979
—
Maximilian Bode * Junior Consultant * [email protected]
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Registered office: Unterföhring * Amtsgericht München * HRB 135082
