Hi Maximilian,
I’m currently running some tests again on a cluster to try to pinpoint the 
problem. Just to make sure: you are using Hadoop 2.4.1 with YARN and Kafka 0.8, 
correct?

In the meantime, could you maybe run a test where you completely bypass Kafka, 
just so we can see whether the problem is in Kafka or in the RollingSink? For my 
tests I created this source:

// imports needed in DataGenerator.java:
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

public static class LongSource extends RichSourceFunction<Long> implements Checkpointed<Long> {
   private static final long serialVersionUID = 1L;

   private final long numElements;

   private final int sleepInterval;

   private volatile boolean running = true;

   private long index = 0;

   public LongSource(long numElements, int sleepInterval) {
      this.numElements = numElements;
      this.sleepInterval = sleepInterval;
   }

   @Override
   public void run(SourceContext<Long> out) throws Exception {

      while (running && index < numElements) {
         // emit and advance the index atomically with respect to checkpoints
         synchronized (out.getCheckpointLock()) {
            out.collect(index);
            index++;
         }
         Thread.sleep(sleepInterval);
      }

      // keep the source alive so the job does not finish on its own
      while (running) {
         Thread.sleep(100);
      }
   }

   @Override
   public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
      return index;
   }

   @Override
   public void restoreState(Long state) throws Exception {
      this.index = state;
   }

   @Override
   public void cancel() {
      running = false;
   }
}

It’s a fault-tolerant source that emits a fixed number of elements. The sleep 
interval keeps the job from finishing too quickly, so that I can kill it while it 
is still running.

My testing job is this, which should be quite similar to yours:

DataStream<Long> inputStream =
      env.addSource(new DataGenerator.LongSource(2000000, 1));

DataStream<String> result = inputStream.map(new RichMapFunction<Long, String>() {
   // accumulator used to monitor how many elements have passed through
   LongCounter count;
   @Override
   public void open(Configuration parameters) throws Exception {
      count = getRuntimeContext().getLongCounter("count");
   }

   @Override
   public String map(Long aLong) throws Exception {
      count.add(1L);
      return "" + aLong;
   }
});

RollingSink<String> sink = new RollingSink<>(sinkPath);
sink.setBucketer(new NonRollingBucketer());
result.addSink(sink);
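
For comparison, checkpointing in my test job is enabled roughly like this (the 
checkpoint directory below is just a placeholder; the 10s interval matches what 
you described):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000); // checkpoint every 10s, as in your setup
env.setStateBackend(new FsStateBackend("hdfs://our.machine.com:8020/hdfs/dir/checkpoints"));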

Cheers,
Aljoscha
> On 09 Mar 2016, at 08:31, Maximilian Bode <[email protected]> wrote:
> 
> Hi Aljoscha,
> 
> yeah, I should have been clearer. I did mean those accumulators, but I am not 
> trusting them for the total count (as you said, they are reset on 
> failure). On the other hand, if they do not change for a while, it is pretty 
> obvious that the job has ingested everything in the queue. But you are right, 
> this is kind of a heuristic. In combination with the fact that the 
> DateTimeBucketer does not create new folders, I believe this should be 
> sufficient to decide when the job has basically finished, though.
> 
> So the setup is the following: The Flink job consists of a 
> FlinkKafkaConsumer08, a map containing just an IntCounter accumulator and 
> finally a rolling sink writing to HDFS. I start it in a per-job YARN session 
> with n=3, s=4. Then I pour 2 million records into the Kafka queue that the 
> application is reading from. If no job/task managers are killed, the behavior 
> is exactly as expected: the output files in HDFS grow with time and I can 
> exactly monitor via the accumulator when every record has been ingested from 
> Kafka. After that time, I give the job a few seconds and then cancel it via 
> the web interface. Then still some time later (to give the job the chance to 
> output the few records still hanging around) a wc -l on the output files 
> yields exactly the expected 2 million.
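> 
> For reference, the job is wired together roughly like this (topic name, 
> consumer properties and the record type are simplified placeholders, and env 
> is our StreamExecutionEnvironment):
> 
> Properties props = new Properties();
> props.setProperty("zookeeper.connect", "...");   // placeholder
> props.setProperty("bootstrap.servers", "...");   // placeholder
> props.setProperty("group.id", "...");            // placeholder
> 
> DataStream<String> input = env.addSource(
>       new FlinkKafkaConsumer08<String>("our-topic", new SimpleStringSchema(), props));
> 
> DataStream<String> counted = input.map(new RichMapFunction<String, String>() {
>    // accumulator we use to monitor progress in the web interface
>    IntCounter count;
> 
>    @Override
>    public void open(Configuration parameters) throws Exception {
>       count = getRuntimeContext().getIntCounter("count");
>    }
> 
>    @Override
>    public String map(String value) throws Exception {
>       count.add(1);
>       return value;
>    }
> });
> 
> RollingSink<String> sink = new RollingSink<>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
> sink.setBucketer(new DateTimeBucketer());
> counted.addSink(sink);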
> 
> On the other hand, if I kill a task manager while the job is in progress, one 
> of the 12 output files seems to be missing as described before. A wc -l on 
> only the relevant bytes as I mentioned in an earlier mail then leads to a 
> number smaller than 2 million.
> 
> We are using an FsStateBackend in HDFS with a checkpoint interval of 10s.
> 
> Cheers,
>  Max
> — 
> Maximilian Bode * Junior Consultant * [email protected]
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> 
>> On 08.03.2016 at 17:46, Aljoscha Krettek <[email protected]> wrote:
>> 
>> Hi,
>> by accumulator, do you mean the ones you get from 
>> RuntimeContext.addAccumulator/getAccumulator? I’m afraid these are not 
>> fault-tolerant, which means that the count in them probably doesn’t reflect 
>> the actual number of elements that were processed. When a job fails and 
>> restarts, the accumulators start from scratch again. This makes me wonder 
>> how yours ever reaches the required 2 million for the job to be considered “done”.
>> 
>> This keeps getting more mysterious… 
>> 
>> By the way, what are you using as StateBackend and checkpoint interval?
>> 
>> Cheers,
>> Aljoscha
>>> On 08 Mar 2016, at 13:38, Maximilian Bode <[email protected]> 
>>> wrote:
>>> 
>>> Hi,
>>> thanks for the fast answer. Answers inline.
>>> 
>>>> On 08.03.2016 at 13:31, Aljoscha Krettek <[email protected]> wrote:
>>>> 
>>>> Hi,
>>>> a missing part file for one of the parallel sinks is not necessarily a 
>>>> problem. This can happen if that parallel instance of the sink never 
>>>> received data after the job successfully restarted.
>>>> 
>>>> Missing data, however, is a problem. Maybe I need some more information 
>>>> about your setup:
>>>> 
>>>> - When are you inspecting the part files?
>>> Some time after the cluster is shut down
>>>> - Do you shut down the Flink job before checking? If so, how do you shut it 
>>>> down?
>>> Via 'cancel' in the Jobmanager Web Interface. Some records seem to be 
>>> written only after cancelling the job, right?
>>>> - When do you know whether all the data from Kafka was consumed by Flink 
>>>> and has passed through the pipeline into HDFS?
>>> I have an accumulator in a map right before writing into HDFS. Also, the 
>>> RollingSink has a DateTimeBucketer, which makes it obvious when no new 
>>> data is arriving anymore, as the last bucket is from a few minutes ago.
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>>> On 08 Mar 2016, at 13:19, Maximilian Bode <[email protected]> 
>>>>> wrote:
>>>>> 
>>>>> Hi Aljoscha,
>>>>> 
>>>>> oh I see. I was under the impression this file was used internally and 
>>>>> that the output would be completed at the end. OK, so I extracted the relevant 
>>>>> lines using
>>>>>   for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > 
>>>>> "$i.final"; done
>>>>> which seems to do the trick.
>>>>> 
>>>>> Unfortunately, now some records are missing again. In particular, there 
>>>>> are the files
>>>>>   part-0-0, part-1-0, ..., part-10-0, part-11-0, each with corresponding 
>>>>> .valid-length files, and
>>>>>   part-0-1, part-1-1, ..., part-10-1
>>>>> in the bucket, where the job parallelism is 12. So it looks to us as if one of 
>>>>> the files was not even created in the second attempt. This behavior seems 
>>>>> to be somewhat reproducible, cf. my earlier email where the part-11 
>>>>> file disappeared as well.
>>>>> 
>>>>> Thanks again for your help.
>>>>> 
>>>>> Cheers,
>>>>> Max
>>>>> —
>>>>> Maximilian Bode * Junior Consultant * [email protected]
>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>> 
>>>>>> On 08.03.2016 at 11:05, Aljoscha Krettek <[email protected]> wrote:
>>>>>> 
>>>>>> Hi,
>>>>>> are you taking the “.valid-length” files into account? The problem with 
>>>>>> doing “exactly-once” with HDFS is that before Hadoop 2.7 it was not 
>>>>>> possible to truncate files. So the trick we’re using is to write the 
>>>>>> length up to which a file is valid whenever we would normally need to 
>>>>>> truncate it. (If the job fails in the middle of writing, the output files 
>>>>>> have to be truncated to a valid position.) For example, say you have an 
>>>>>> output file part-8-0. Now, if there exists a file part-8-0.valid-length, 
>>>>>> this file tells you up to which position the file part-8-0 is valid. So 
>>>>>> you should only read up to this point.
>>>>>> 
>>>>>> The name of the “.valid-length” suffix can also be configured, by the 
>>>>>> way, as can all the other stuff.
>>>>>> 
>>>>>> If this is not the problem then I definitely have to investigate 
>>>>>> further. I’ll also look into the Hadoop 2.4.1 build problem.
>>>>>> 
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>> On 08 Mar 2016, at 10:26, Maximilian Bode <[email protected]> 
>>>>>>> wrote:
>>>>>>> 
>>>>>>> Hi Aljoscha,
>>>>>>> thanks again for getting back to me. I built from your branch and the 
>>>>>>> exception is not occurring anymore. The RollingSink state can be 
>>>>>>> restored.
>>>>>>> 
>>>>>>> Still, the exactly-once guarantee seems not to be fulfilled; there are 
>>>>>>> always some extra records after killing either a task manager or the 
>>>>>>> job manager. Do you have an idea where this behavior might be coming 
>>>>>>> from? (I guess concrete numbers will not help greatly as there are so 
>>>>>>> many parameters influencing them. Still, in our test scenario, we 
>>>>>>> produce 2 million records in a Kafka queue but in the final output 
>>>>>>> files there are on the order of 2.1 million records, so a 5% error. The 
>>>>>>> job is running in a per-job YARN session with n=3, s=4 with a 
>>>>>>> checkpointing interval of 10s.)
>>>>>>> 
>>>>>>> On another (maybe unrelated) note: when I pulled your branch, the 
>>>>>>> Travis build did not go through for -Dhadoop.version=2.4.1. I have not 
>>>>>>> looked into this further as of now; is this one of the tests known to 
>>>>>>> fail sometimes?
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Max
>>>>>>> <travis.log>
>>>>>>> —
>>>>>>> Maximilian Bode * Junior Consultant * [email protected]
>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>>> 
>>>>>>>> On 07.03.2016 at 17:20, Aljoscha Krettek <[email protected]> wrote:
>>>>>>>> 
>>>>>>>> Hi Maximilian,
>>>>>>>> sorry for the delay; we were very busy with the release last week. I 
>>>>>>>> had a hunch about the problem and I think I have found a fix now. The 
>>>>>>>> problem is in snapshot restore: when restoring, the sink tries to 
>>>>>>>> clean up any files that were previously in progress. If Flink 
>>>>>>>> restores to the same snapshot twice in a row, it tries to clean 
>>>>>>>> up the leftover files twice, but they are no longer there the second 
>>>>>>>> time, which causes the exception.
>>>>>>>> 
>>>>>>>> I have a fix in my branch: 
>>>>>>>> https://github.com/aljoscha/flink/tree/rolling-sink-fix
>>>>>>>> 
>>>>>>>> Could you maybe check whether this solves your problem? Which version of 
>>>>>>>> Flink are you using? You would have to build from source to try it 
>>>>>>>> out. Alternatively, I could build it and put it into a Maven snapshot 
>>>>>>>> repository for you.
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <[email protected]> 
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>> Hi,
>>>>>>>>> did you check whether there are any files at your specified HDFS 
>>>>>>>>> output location? If yes, which files are there?
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>>> On 03 Mar 2016, at 14:29, Maximilian Bode 
>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>> 
>>>>>>>>>> Just for the sake of completeness: this also happens when killing a 
>>>>>>>>>> task manager and is therefore probably unrelated to job manager HA.
>>>>>>>>>> 
>>>>>>>>>>> On 03.03.2016 at 14:17, Maximilian Bode 
>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi everyone,
>>>>>>>>>>> 
>>>>>>>>>>> unfortunately, I am running into another problem trying to 
>>>>>>>>>>> establish exactly once guarantees (Kafka -> Flink 1.0.0-rc3 -> 
>>>>>>>>>>> HDFS).
>>>>>>>>>>> 
>>>>>>>>>>> When using
>>>>>>>>>>> 
>>>>>>>>>>> RollingSink<Tuple3<Integer,Integer,String>> sink = new 
>>>>>>>>>>> RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>>>>>>>>>>> sink.setBucketer(new NonRollingBucketer());
>>>>>>>>>>> output.addSink(sink);
>>>>>>>>>>> 
>>>>>>>>>>> and then killing the job manager, the new job manager is unable to 
>>>>>>>>>>> restore the old state throwing
>>>>>>>>>>> ---
>>>>>>>>>>> java.lang.Exception: Could not restore checkpointed state to 
>>>>>>>>>>> operators and functions
>>>>>>>>>>>     at 
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>>>>>>>>>>>     at 
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:744)
>>>>>>>>>>> Caused by: java.lang.Exception: Failed to restore state to 
>>>>>>>>>>> function: In-Progress file 
>>>>>>>>>>> hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither 
>>>>>>>>>>> moved to pending nor is still in progress.
>>>>>>>>>>>     at 
>>>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>>>>>>>>     at 
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>>>>>>>>>>>     ... 3 more
>>>>>>>>>>> Caused by: java.lang.RuntimeException: In-Progress file 
>>>>>>>>>>> hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither 
>>>>>>>>>>> moved to pending nor is still in progress.
>>>>>>>>>>>     at 
>>>>>>>>>>> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>>>>>>>>>>>     at 
>>>>>>>>>>> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>>>>>>>>>>>     at 
>>>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>>>>>>>>>>>     ... 4 more
>>>>>>>>>>> ---
>>>>>>>>>>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in 
>>>>>>>>>>> fact using 2.4.0 – might this be the same issue?
>>>>>>>>>>> 
>>>>>>>>>>> Another thing I could think of is that the job is not configured 
>>>>>>>>>>> correctly and there is some sort of timing issue. The checkpoint 
>>>>>>>>>>> interval is 10 seconds, everything else was left at default value. 
>>>>>>>>>>> Then again, as the NonRollingBucketer is used, there should not be 
>>>>>>>>>>> any timing issues, right?
>>>>>>>>>>> 
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Max
>>>>>>>>>>> 
>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-2979
>>>>>>>>>>> 
>>>>>>>>>>> —
>>>>>>>>>>> Maximilian Bode * Junior Consultant * [email protected]
>>>>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 
