Hi Maximilian,
I’m currently running some tests on a cluster again to try to pinpoint the
problem. Just to make sure: you are using Hadoop 2.4.1 with YARN and Kafka 0.8,
correct?
In the meantime, could you maybe run a test where you completely bypass Kafka,
just so we can see whether the problem is in Kafka or in the RollingSink? For my
tests I created this source:
public static class LongSource extends RichSourceFunction<Long> implements Checkpointed<Long> {

    private static final long serialVersionUID = 1L;

    private final long numElements;
    private final int sleepInterval;

    private volatile boolean running = true;

    // position in the sequence; checkpointed so the source can resume after a restart
    private long index = 0;

    public LongSource(long numElements, int sleepInterval) {
        this.numElements = numElements;
        this.sleepInterval = sleepInterval;
    }

    @Override
    public void run(SourceContext<Long> out) throws Exception {
        while (running && index < numElements) {
            out.collect(index);
            Thread.sleep(sleepInterval);
            index++;
        }
        // keep the source alive after emitting everything so the job does not finish on its own
        while (running) {
            Thread.sleep(100);
        }
    }

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        return index;
    }

    @Override
    public void restoreState(Long state) throws Exception {
        this.index = state;
    }

    @Override
    public void cancel() {
        running = false;
    }
}
It’s a fault-tolerant source that emits a fixed number of elements; the sleep
interval keeps the job from running too fast, so I can kill it before it
finishes. My test job is the following, which should be quite similar to yours:
DataStream<Long> inputStream = env.addSource(new DataGenerator.LongSource(2000000, 1));

DataStream<String> result = inputStream.map(new RichMapFunction<Long, String>() {

    private LongCounter count;

    @Override
    public void open(Configuration parameters) throws Exception {
        count = getRuntimeContext().getLongCounter("count");
    }

    @Override
    public String map(Long aLong) throws Exception {
        count.add(1L);
        return "" + aLong;
    }
});
RollingSink<String> sink = new RollingSink<>(sinkPath);
sink.setBucketer(new NonRollingBucketer());
result.addSink(sink);
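
For completeness, this is roughly how I wire it up and run it. Just a sketch: the
checkpoint interval, parallelism and job name here are arbitrary, and sinkPath is
a placeholder HDFS path.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000); // e.g. 10s, same as your checkpoint interval
env.setParallelism(12);
// ... source, map and sink from above ...
env.execute("RollingSink test without Kafka");
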
Cheers,
Aljoscha
> On 09 Mar 2016, at 08:31, Maximilian Bode <[email protected]> wrote:
>
> Hi Aljoscha,
>
> yeah, I should have been clearer. I did mean those accumulators, but I am not
> trusting them as a total count (as you said, they are reset on failure). On the
> other hand, if they do not change for a while, it is pretty obvious that the
> job has ingested everything in the queue. But you are right, this is kind of a
> heuristic. In combination with the fact that the DateTimeBucketer does not
> create new folders, I believe this should be sufficient to decide when the job
> has basically finished, though.
>
> So the setup is the following: the Flink job consists of a
> FlinkKafkaConsumer08, a map containing just an IntCounter accumulator, and
> finally a rolling sink writing to HDFS. I start it in a per-job YARN session
> with n=3, s=4. Then I pour 2 million records into the Kafka queue the
> application is reading from. If no job/task managers are killed, the behavior
> is exactly as expected: the output files in HDFS grow over time and I can
> monitor exactly via the accumulator when every record has been ingested from
> Kafka. After that, I give the job a few seconds and then cancel it via the web
> interface. Some time later still (to give the job the chance to output the few
> records still hanging around), a wc -l on the output files yields exactly the
> expected 2 million.
>
> On the other hand, if I kill a task manager while the job is in progress, one
> of the 12 output files seems to be missing as described before. A wc -l on
> only the relevant bytes as I mentioned in an earlier mail then leads to a
> number smaller than 2 million.
>
> We are using an FsStateBackend in HDFS with a checkpoint interval of 10s.
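>
> For reference, the configuration looks roughly like this (just a sketch; the
> checkpoint directory here is a placeholder):
>
> env.setStateBackend(new FsStateBackend("hdfs://our.machine.com:8020/flink/checkpoints"));
> env.enableCheckpointing(10000); // 10s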
>
> Cheers,
> Max
> —
> Maximilian Bode * Junior Consultant * [email protected]
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
>> Am 08.03.2016 um 17:46 schrieb Aljoscha Krettek <[email protected]>:
>>
>> Hi,
>> with “accumulator”, do you mean the ones you get from
>> RuntimeContext.addAccumulator/getAccumulator? I’m afraid these are not
>> fault-tolerant, which means that the count in them probably doesn’t reflect
>> the actual number of elements that were processed. When a job fails and
>> restarts, the accumulators should start from scratch. This makes me wonder
>> how yours ever reaches the required 2 million for the job to be considered “done”.
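>>
>> Just to illustrate the difference (only a sketch, not something you need to
>> change): a count that should survive restarts would have to go through the
>> checkpointing mechanism instead, e.g. a map function that also implements
>> Checkpointed:
>>
>> public static class CountingMap extends RichMapFunction<Long, String>
>>         implements Checkpointed<Long> {
>>
>>     // local count that is snapshotted on every checkpoint and restored after a failure
>>     private long count = 0;
>>
>>     @Override
>>     public String map(Long value) throws Exception {
>>         count++;
>>         return String.valueOf(value);
>>     }
>>
>>     @Override
>>     public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
>>         return count;
>>     }
>>
>>     @Override
>>     public void restoreState(Long state) throws Exception {
>>         count = state;
>>     }
>> }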
>>
>> This keeps getting more mysterious…
>>
>> By the way, what are you using as StateBackend and checkpoint interval?
>>
>> Cheers,
>> Aljoscha
>>> On 08 Mar 2016, at 13:38, Maximilian Bode <[email protected]>
>>> wrote:
>>>
>>> Hi,
>>> thanks for the fast answer. Answers inline.
>>>
>>>> Am 08.03.2016 um 13:31 schrieb Aljoscha Krettek <[email protected]>:
>>>>
>>>> Hi,
>>>> a missing part file for one of the parallel sinks is not necessarily a
>>>> problem. This can happen if that parallel instance of the sink never
>>>> received data after the job successfully restarted.
>>>>
>>>> Missing data, however, is a problem. Maybe I need some more information
>>>> about your setup:
>>>>
>>>> - When are you inspecting the part files?
>>> Some time after the cluster is shut down.
>>>> - Do you shutdown the Flink Job before checking? If so, how do you shut it
>>>> down.
>>> Via 'cancel' in the JobManager web interface. Some records seem to be
>>> written only after cancelling the job, right?
>>>> - When do you know whether all the data from Kafka was consumed by Flink
>>>> and has passed through the pipeline into HDFS?
>>> I have an accumulator in a map right before writing into HDFS. Also, the
>>> RollingSink has a DateTimeBucketer, which makes it apparent when no new
>>> data is arriving anymore, as the last bucket is from some minutes ago.
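>>> For reference, that part is configured roughly like this (a sketch; the
>>> exact date format we use may differ):
>>> sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")); // one bucket per minute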
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>> On 08 Mar 2016, at 13:19, Maximilian Bode <[email protected]>
>>>>> wrote:
>>>>>
>>>>> Hi Aljoscha,
>>>>>
>>>>> oh I see. I was under the impression this file was used internally and
>>>>> that the output would be completed at the end. OK, so I extracted the
>>>>> relevant bytes using
>>>>> for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > "$i.final"; done
>>>>> which seems to do the trick.
>>>>>
>>>>> Unfortunately, now some records are missing again. In particular, there
>>>>> are the files
>>>>> part-0-0, part-1-0, ..., part-10-0, part-11-0, each with corresponding
>>>>> .valid-length files
>>>>> part-0-1, part-1-1, ..., part-10-0
>>>>> in the bucket, where job parallelism=12. So it looks to us as if one of
>>>>> the files was not even created in the second attempt. This behavior seems
>>>>> to be somewhat reproducible, cf. my earlier email where the part-11
>>>>> file disappeared as well.
>>>>>
>>>>> Thanks again for your help.
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>> —
>>>>> Maximilian Bode * Junior Consultant * [email protected]
>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>
>>>>>> Am 08.03.2016 um 11:05 schrieb Aljoscha Krettek <[email protected]>:
>>>>>>
>>>>>> Hi,
>>>>>> are you taking the “.valid-length” files into account? The problem with
>>>>>> doing “exactly-once” with HDFS is that before Hadoop 2.7 it was not
>>>>>> possible to truncate files. So the trick we’re using is to write the
>>>>>> length up to which a file is valid whenever we would normally need to
>>>>>> truncate it. (If the job fails in the middle of writing, the output files
>>>>>> have to be truncated to a valid position.) For example, say you have an
>>>>>> output file part-8-0. Now, if there exists a file part-8-0.valid-length,
>>>>>> this file tells you up to which position the file part-8-0 is valid, so
>>>>>> you should only read up to that point.
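>>>>>>
>>>>>> In code, trimming a part file down to its valid length could look roughly
>>>>>> like this. This is just a sketch using the plain Hadoop FileSystem API: the
>>>>>> host and paths are placeholders, and the exact name of the valid-length
>>>>>> file depends on the configured prefix/suffix.
>>>>>>
>>>>>> // needs org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.{FileSystem, Path},
>>>>>> // org.apache.hadoop.io.IOUtils, java.io.*, java.net.URI, java.nio.charset.StandardCharsets
>>>>>> FileSystem fs = FileSystem.get(URI.create("hdfs://our.machine.com:8020"), new Configuration());
>>>>>> Path part = new Path("/hdfs/dir/outbound/part-8-0");
>>>>>> Path validLengthFile = new Path("/hdfs/dir/outbound/part-8-0.valid-length");
>>>>>>
>>>>>> // read the valid-length file and keep only the digits, in case the file
>>>>>> // contains anything besides the plain number
>>>>>> ByteArrayOutputStream buffer = new ByteArrayOutputStream();
>>>>>> try (InputStream in = fs.open(validLengthFile)) {
>>>>>>     IOUtils.copyBytes(in, buffer, 1024, false);
>>>>>> }
>>>>>> long validLength = Long.parseLong(
>>>>>>         new String(buffer.toByteArray(), StandardCharsets.UTF_8).replaceAll("[^0-9]", ""));
>>>>>>
>>>>>> // copy only the valid prefix of the part file
>>>>>> try (InputStream in = fs.open(part);
>>>>>>         OutputStream out = fs.create(new Path("/hdfs/dir/outbound/part-8-0.trimmed"))) {
>>>>>>     IOUtils.copyBytes(in, out, validLength, false);
>>>>>> }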
>>>>>>
>>>>>> The name of the “.valid-length” suffix can also be configured, by the
>>>>>> way, as can all the other stuff.
>>>>>>
>>>>>> If this is not the problem then I definitely have to investigate
>>>>>> further. I’ll also look into the Hadoop 2.4.1 build problem.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>> On 08 Mar 2016, at 10:26, Maximilian Bode <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Aljoscha,
>>>>>>> thanks again for getting back to me. I built from your branch and the
>>>>>>> exception is not occurring anymore. The RollingSink state can be
>>>>>>> restored.
>>>>>>>
>>>>>>> Still, the exactly-once guarantee seems not to be fulfilled; there are
>>>>>>> always some extra records after killing either a task manager or the
>>>>>>> job manager. Do you have an idea where this behavior might be coming
>>>>>>> from? (I guess concrete numbers will not help greatly, as there are so
>>>>>>> many parameters influencing them. Still, in our test scenario, we
>>>>>>> produce 2 million records into a Kafka queue, but in the final output
>>>>>>> files there are on the order of 2.1 million records, so a 5% error. The
>>>>>>> job is running in a per-job YARN session with n=3, s=4 and a
>>>>>>> checkpointing interval of 10s.)
>>>>>>>
>>>>>>> On another (maybe unrelated) note: when I pulled your branch, the
>>>>>>> Travis build did not go through for -Dhadoop.version=2.4.1. I have not
>>>>>>> looked into this further as of now; is this one of the tests known to
>>>>>>> fail sometimes?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Max
>>>>>>> <travis.log>
>>>>>>> —
>>>>>>> Maximilian Bode * Junior Consultant * [email protected]
>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>>>
>>>>>>>> Am 07.03.2016 um 17:20 schrieb Aljoscha Krettek <[email protected]>:
>>>>>>>>
>>>>>>>> Hi Maximilian,
>>>>>>>> sorry for the delay, we were very busy with the release last week. I
>>>>>>>> had a hunch about the problem but I think I found a fix now. The
>>>>>>>> problem is in snapshot restore. When restoring, the sink tries to
>>>>>>>> clean up any files that were previously in progress. If Flink
>>>>>>>> restores to the same snapshot twice in a row, it will try to clean up
>>>>>>>> the leftover files twice, but they are not there anymore, which
>>>>>>>> causes the exception.
>>>>>>>>
>>>>>>>> I have a fix in my branch:
>>>>>>>> https://github.com/aljoscha/flink/tree/rolling-sink-fix
>>>>>>>>
>>>>>>>> Could you maybe check whether this solves your problem? Which version
>>>>>>>> of Flink are you using? You would have to build from source to try it
>>>>>>>> out. Alternatively, I could build it and put it into a Maven snapshot
>>>>>>>> repository for you to try out.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> did you check whether there are any files at your specified HDFS
>>>>>>>>> output location? If yes, which files are there?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>>> On 03 Mar 2016, at 14:29, Maximilian Bode
>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>> Just for the sake of completeness: this also happens when killing a
>>>>>>>>>> task manager and is therefore probably unrelated to job manager HA.
>>>>>>>>>>
>>>>>>>>>>> Am 03.03.2016 um 14:17 schrieb Maximilian Bode
>>>>>>>>>>> <[email protected]>:
>>>>>>>>>>>
>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>
>>>>>>>>>>> unfortunately, I am running into another problem trying to
>>>>>>>>>>> establish exactly once guarantees (Kafka -> Flink 1.0.0-rc3 ->
>>>>>>>>>>> HDFS).
>>>>>>>>>>>
>>>>>>>>>>> When using
>>>>>>>>>>>
>>>>>>>>>>> RollingSink<Tuple3<Integer,Integer,String>> sink = new
>>>>>>>>>>> RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>>>>>>>>>>> sink.setBucketer(new NonRollingBucketer());
>>>>>>>>>>> output.addSink(sink);
>>>>>>>>>>>
>>>>>>>>>>> and then killing the job manager, the new job manager is unable to
>>>>>>>>>>> restore the old state throwing
>>>>>>>>>>> ---
>>>>>>>>>>> java.lang.Exception: Could not restore checkpointed state to
>>>>>>>>>>> operators and functions
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>>>>>>>> Caused by: java.lang.Exception: Failed to restore state to
>>>>>>>>>>> function: In-Progress file
>>>>>>>>>>> hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither
>>>>>>>>>>> moved to pending nor is still in progress.
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>>>>>>>>>>> ... 3 more
>>>>>>>>>>> Caused by: java.lang.RuntimeException: In-Progress file
>>>>>>>>>>> hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither
>>>>>>>>>>> moved to pending nor is still in progress.
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>>>>>>>>>>> ... 4 more
>>>>>>>>>>> ---
>>>>>>>>>>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in
>>>>>>>>>>> fact using 2.4.0 – might this be the same issue?
>>>>>>>>>>>
>>>>>>>>>>> Another thing I could think of is that the job is not configured
>>>>>>>>>>> correctly and there is some sort of timing issue. The checkpoint
>>>>>>>>>>> interval is 10 seconds; everything else was left at its default value.
>>>>>>>>>>> Then again, as the NonRollingBucketer is used, there should not be
>>>>>>>>>>> any timing issues, right?
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Max
>>>>>>>>>>>
>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-2979
>>>>>>>>>>>
>>>>>>>>>>> —
>>>>>>>>>>> Maximilian Bode * Junior Consultant * [email protected]
>>>>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>