Hi Stefan,

Sorry but it doesn't seem immediately clear to me what's a good way to use
https://github.com/king/bravo.

How are people using it? Would you, for example, modify build.gradle to
publish Bravo as a library locally/internally? Or add code directly to the
Bravo project (locally) and run it from there (using an IDE, for example)?
Also, it doesn't seem like the Bravo Gradle project supports building a
Flink job jar, but if it does, how do I do it?

Thanks.

On Thu, Oct 4, 2018 at 9:30 PM Juho Autio <juho.au...@rovio.com> wrote:

> Good then, I'll try to analyze the savepoints with Bravo. Thanks!
>
> > How would you assume that backpressure would influence your updates?
> Updates to each local state still happen event-by-event, in a single
> reader/writing thread.
>
> Sure, just an ignorant guess by me. I'm not familiar with most of Flink's
> internals. Anyway, high backpressure is not seen on this job after it has
> caught up with the lag, so I thought it would be worth mentioning.
>
> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter <s.rich...@data-artisans.com>
> wrote:
>
>> Hi,
>>
>> Am 04.10.2018 um 16:08 schrieb Juho Autio <juho.au...@rovio.com>:
>>
>> > you could take a look at Bravo [1] to query your savepoints and to
>> check if the state in the savepoint is complete w.r.t. your expectations
>>
>> Thanks. I'm not 100% sure if this is the case, but to me it seemed like the
>> missed ids were being logged by the reducer soon after the job had started
>> (after restoring a savepoint). But on the other hand, after that I also
>> made another savepoint & restored that, so what I could check is: does that
>> next savepoint have the missed ids that were logged (a couple of minutes
>> before the savepoint was created, so there should've been more than enough
>> time to add them to the state before the savepoint was triggered) or not.
>> Anyway, if I were able to verify with Bravo that the ids are missing
>> from the savepoint (even though the reducer logged that it saw them), would
>> that help in figuring out where they are lost? Is there some major
>> difference compared to just looking at the final output after the window
>> has been triggered?
>>
>>
>>
>> I think that makes a difference. For example, you can investigate if
>> there is a state loss or a problem with the windowing. In the savepoint you
>> could see which keys exist and to which windows they are assigned. Also
>> just to make sure there is no misunderstanding: only elements that are in
>> the state at the start of a savepoint are expected to be part of the
>> savepoint; all elements between start and completion of the savepoint are
>> not expected to be part of the savepoint.
>>
>>
>> > I also doubt that the problem is about backpressure after restore,
>> because the job will only continue running after the state restore is
>> already completed.
>>
>> Yes, I'm not suspecting that the state restoring would be the problem
>> either. My concern was about backpressure possibly messing with the updates
>> of reducing state? I would tend to suspect that updating the state
>> consistently is what fails, where heavy load / backpressure might be a
>> factor.
>>
>>
>>
>> How would you assume that backpressure would influence your updates?
>> Updates to each local state still happen event-by-event, in a single
>> reader/writing thread.
>>
>>
>> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> you could take a look at Bravo [1] to query your savepoints and to check
>>> if the state in the savepoint is complete w.r.t. your expectations. I
>>> somewhat doubt that there is a general problem with the state/savepoints
>>> because many users are successfully running it on a large state and I am
>>> not aware of any data loss problems, but nothing is impossible. What the
>>> savepoint does is also straightforward: iterate a db snapshot and write all
>>> key/value pairs to disk, so all data that was in the db at the time of the
>>> savepoint should show up. I also doubt that the problem is about
>>> backpressure after restore, because the job will only continue running
>>> after the state restore is already completed. Did you check if you are
>>> using exactly-once semantics or at-least-once semantics? Also did you check
>>> that the Kafka consumer start position is configured properly [2]? Are
>>> watermarks generated as expected after restore?
>>>
>>> One more unrelated high-level comment that I have: for a granularity of
>>> 24h windows, I wonder if it would not make sense to use a batch job instead?
>>>
>>> Best,
>>> Stefan
>>>
>>> [1] https://github.com/king/bravo
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>>
>>> Am 04.10.2018 um 14:53 schrieb Juho Autio <juho.au...@rovio.com>:
>>>
>>> Thanks for the suggestions!
>>>
>>> > In general, it would be tremendously helpful to have a minimal working
>>> example which allows to reproduce the problem.
>>>
>>> Definitely. The problem with reproducing has been that this only seems
>>> to happen in the bigger production data volumes.
>>>
>>> That's why I'm hoping to find a way to debug this with the production
>>> data. With that it seems to consistently cause some misses every time the
>>> job is killed/restored.
>>>
>>> > check if it happens for shorter windows, like 1h etc
>>>
>>> What would be the benefit of that compared to 24h window?
>>>
>>> > simplify the job to not use a reduce window but simply a time window
>>> which outputs the window events. Then counting the input and output events
>>> should allow you to verify the results. If you are not seeing missing
>>> events, then it could have something to do with the reducing state used in
>>> the reduce function.
>>>
>>> Hm, maybe, but not sure how useful that would be, because it wouldn't
>>> yet prove that it's related to reducing, because not having a reduce
>>> function could also mean smaller load on the job, which might alone be
>>> enough to make the problem not manifest.
>>>
>>> Is there a way to debug what goes into the reducing state (including
>>> what gets removed or overwritten and what restored), if that makes sense..?
>>> Maybe some suitable logging could be used to prove that the lost data is
>>> written to the reducing state (or at least asked to be written), but not
>>> found any more when the window closes and state is flushed?
>>>
>>> On configuration once more, we're using RocksDB state backend with
>>> asynchronous incremental checkpointing. The state is restored from
>>> savepoints though, we haven't been using those checkpoints in these tests
>>> (although they could be used in case of crashes – but we haven't had those
>>> now).
>>>
>>> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Juho,
>>>>
>>>> another idea to further narrow down the problem could be to simplify
>>>> the job to not use a reduce window but simply a time window which outputs
>>>> the window events. Then counting the input and output events should allow
>>>> you to verify the results. If you are not seeing missing events, then it
>>>> could have something to do with the reducing state used in the reduce
>>>> function.
>>>>
>>>> In general, it would be tremendously helpful to have a minimal working
>>>> example which allows to reproduce the problem.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin <
>>>> and...@data-artisans.com> wrote:
>>>>
>>>>> Hi Juho,
>>>>>
>>>>> can you try to reduce the job to a minimal reproducible example and
>>>>> share the job and input?
>>>>>
>>>>> For example:
>>>>> - some simple records as input, e.g. tuples of primitive types saved
>>>>> as CSV
>>>>> - minimal deduplication job which processes them and misses records
>>>>> - check if it happens for shorter windows, like 1h etc
>>>>> - setup which you use for the job, ideally locally reproducible or
>>>>> cloud
>>>>>
>>>>> Best,
>>>>> Andrey
>>>>>
>>>>> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>
>>>>> Sorry to insist, but we seem to be blocked for any serious usage of
>>>>> state in Flink if we can't rely on it to not miss data in case of restore.
>>>>>
>>>>> Would anyone have suggestions for how to troubleshoot this? So far I
>>>>> have verified with DEBUG logs that our reduce function also gets to
>>>>> process the data that is missing from the window output.
>>>>>
>>>>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <juho.au...@rovio.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Andrey,
>>>>>>
>>>>>> To rule out for good any questions about sink behaviour, the job was
>>>>>> killed and started with an additional Kafka sink.
>>>>>>
>>>>>> The same number of ids were missed in both outputs: KafkaSink &
>>>>>> BucketingSink.
>>>>>>
>>>>>> I wonder what would be the next steps in debugging?
>>>>>>
>>>>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <juho.au...@rovio.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks, Andrey.
>>>>>>>
>>>>>>> > so it means that the savepoint does not lose at least some
>>>>>>> dropped records.
>>>>>>>
>>>>>>> I'm not sure what you mean by that? I mean, it was known from the
>>>>>>> beginning that not everything is lost before/after restoring a
>>>>>>> savepoint, just some records around the time of restoration. It's not
>>>>>>> 100% clear whether records are lost before making a savepoint or after
>>>>>>> restoring it. Although, based on the new DEBUG logs, it seems more like
>>>>>>> losing some records that are seen soon after restoring. It seems like
>>>>>>> Flink would be somehow confused about the restored state vs. new
>>>>>>> inserts to state. This could also be somehow linked to the high
>>>>>>> backpressure on the Kafka source while the stream is catching up.
>>>>>>>
>>>>>>> > If it is feasible for your setup, I suggest to insert one more map
>>>>>>> function after reduce and before sink.
>>>>>>> > etc.
>>>>>>>
>>>>>>> Isn't that the same thing that we discussed before? Nothing is sent
>>>>>>> to BucketingSink before the window closes, so I don't see how it would
>>>>>>> make any difference if we replace the BucketingSink with a map function
>>>>>>> or another sink type. We don't create or restore savepoints during the
>>>>>>> time when BucketingSink gets input or has open buckets – that happens at
>>>>>>> a much later time of day. I would focus on figuring out why the records
>>>>>>> are lost while the window is open. But I don't know how to do that.
>>>>>>> Would you have any additional suggestions?
>>>>>>>
>>>>>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <
>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>
>>>>>>>> Hi Juho,
>>>>>>>>
>>>>>>>> so it means that the savepoint does not lose at least some dropped
>>>>>>>> records.
>>>>>>>>
>>>>>>>> If it is feasible for your setup, I suggest inserting one more map
>>>>>>>> function after reduce and before sink.
>>>>>>>> The map function should be called right after the window is triggered
>>>>>>>> but before flushing to s3.
>>>>>>>> The result of reduce (deduped record) could be logged there.
>>>>>>>> This should allow checking whether the processed distinct records
>>>>>>>> were buffered in the state after the restoration from the savepoint or
>>>>>>>> not. If they were buffered, we should see that there was an attempt to
>>>>>>>> write them to the sink from the state.
>>>>>>>>
>>>>>>>> Another suggestion is to try to write records to some other sink or
>>>>>>>> to both.
>>>>>>>> E.g. if you can access file system of workers, maybe just into
>>>>>>>> local files and check whether the records are also dropped there.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Andrey
>>>>>>>>
>>>>>>>> On 20 Sep 2018, at 15:37, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>>>
>>>>>>>> Hi Andrey!
>>>>>>>>
>>>>>>>> I was finally able to gather the DEBUG logs that you suggested. In
>>>>>>>> short, the reducer logged that it processed at least some of the ids
>>>>>>>> that were missing from the output.
>>>>>>>>
>>>>>>>> "At least some", because I didn't have the job running with DEBUG
>>>>>>>> logs for the full 24-hour window period. So I was only able to check
>>>>>>>> whether I could find *some* of the missing ids in the DEBUG logs. Which
>>>>>>>> I did indeed.
>>>>>>>>
>>>>>>>> I changed the DistinctFunction.java to do this:
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>>>>>>         LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
>>>>>>>>         return value1;
>>>>>>>>     }
>>>>>>>>
>>>>>>>> Then:
>>>>>>>>
>>>>>>>> vi flink-1.6.0/conf/log4j.properties
>>>>>>>>
>>>>>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>>>>>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>>>>>>>>
>>>>>>>> Then I ran the following kind of test:
>>>>>>>>
>>>>>>>> - Cancelled the on-going job with savepoint created at ~Sep 18
>>>>>>>> 08:35 UTC 2018
>>>>>>>> - Started a new cluster & job with DEBUG enabled at ~09:13,
>>>>>>>> restored from that previous cluster's savepoint
>>>>>>>> - Ran until caught up offsets
>>>>>>>> - Cancelled the job with a new savepoint
>>>>>>>> - Started a new job _without_ DEBUG, which restored the new
>>>>>>>> savepoint, let it keep running so that it will eventually write the
>>>>>>>> output
>>>>>>>>
>>>>>>>> Then on the next day, after results had been flushed when the
>>>>>>>> 24-hour window closed, I compared the results again with a batch
>>>>>>>> version's output. And found some missing ids as usual.
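
The stream-vs-batch comparison described above boils down to a set difference. A minimal sketch in plain Java, assuming both outputs have already been loaded into sets of id strings; the class and method names are hypothetical, and every id other than the AN12345 placeholder is made up for illustration:

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

public class MissingIds {

    /** Returns the ids present in the batch output but absent from the stream output. */
    static Set<String> missingFromStream(Set<String> batchIds, Set<String> streamIds) {
        Set<String> missing = new TreeSet<>(batchIds); // copy, so the input is not modified
        missing.removeAll(streamIds);
        return missing;
    }

    public static void main(String[] args) {
        // Hypothetical sample data; real ids would be read from the two job outputs.
        Set<String> batch = new TreeSet<>(Arrays.asList("AN12345", "AN12346", "AN12347"));
        Set<String> stream = new TreeSet<>(Arrays.asList("AN12346", "AN12347"));
        System.out.println(missingFromStream(batch, stream)); // prints [AN12345]
    }
}
```

Each id in the result can then be searched for in the DEBUG logs.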
>>>>>>>>
>>>>>>>> I drilled down to one specific missing id (I'm replacing the actual
>>>>>>>> value with AN12345 below), which was not found in the stream output,
>>>>>>>> but was found in batch output & Flink DEBUG logs.
>>>>>>>>
>>>>>>>> Related to that id, I gathered the following information:
>>>>>>>>
>>>>>>>> 2018-09-18~09:13:21,000 job started & savepoint is restored
>>>>>>>>
>>>>>>>> 2018-09-18 09:14:29,085 missing id is processed for the first time,
>>>>>>>> proved by this log line:
>>>>>>>> 2018-09-18 09:14:29,085 DEBUG
>>>>>>>> com.rovio.ds.flink.uniqueid.DistinctFunction                  -
>>>>>>>> DistinctFunction.reduce returns: s.aid1=AN12345
>>>>>>>>
>>>>>>>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint
>>>>>>>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint
>>>>>>>>
>>>>>>>> (
>>>>>>>> more occurrences of checkpoints (~1 min checkpointing time + ~1 min
>>>>>>>> delay before next)
>>>>>>>> /
>>>>>>>> more occurrences of DistinctFunction.reduce
>>>>>>>> )
>>>>>>>>
>>>>>>>> 2018-09-18 09:23:45,053 missing id is processed for the last time
>>>>>>>>
>>>>>>>> 2018-09-18~10:20:00,000 savepoint created & job cancelled
>>>>>>>>
>>>>>>>> To be noted, there was high backpressure after restoring from the
>>>>>>>> savepoint until the stream caught up with the Kafka offsets. However,
>>>>>>>> our job assigns timestamps & watermarks on the Flink Kafka consumer
>>>>>>>> itself, so event time of all partitions is synchronized. As expected,
>>>>>>>> we don't get any late data in the late data side output.
>>>>>>>>
>>>>>>>> From this we can see that the missing ids are processed by the
>>>>>>>> reducer, but they must get lost somewhere before the 24-hour window is
>>>>>>>> triggered.
>>>>>>>>
>>>>>>>> I think it's worth mentioning once more that the stream doesn't
>>>>>>>> miss any ids if we let it run without interruptions / state
>>>>>>>> restoring.
>>>>>>>>
>>>>>>>> What's next?
>>>>>>>>
>>>>>>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <
>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Juho,
>>>>>>>>>
>>>>>>>>> > only when the 24-hour window triggers, BucketingSink gets a
>>>>>>>>> burst of input
>>>>>>>>>
>>>>>>>>> This is of course totally true, my understanding is the same. We
>>>>>>>>> cannot exclude a problem there for sure; it is just that savepoints
>>>>>>>>> are used a lot w/o problem reports and BucketingSink is known to be
>>>>>>>>> problematic with s3. That is why I asked you:
>>>>>>>>>
>>>>>>>>> > You also wrote that the timestamps of lost events are 'probably'
>>>>>>>>> around the time of the savepoint, if it is not yet for sure I would
>>>>>>>>> also check it.
>>>>>>>>>
>>>>>>>>> Although, bucketing sink might lose any data at the end of the
>>>>>>>>> day (also from the middle). The fact that it is always around the
>>>>>>>>> time of taking a savepoint and not random is surely suspicious, and
>>>>>>>>> possible savepoint failures need to be investigated.
>>>>>>>>>
>>>>>>>>> Regarding the s3 problem, s3 doc says:
>>>>>>>>>
>>>>>>>>> > The caveat is that if you make a HEAD or GET request to the key
>>>>>>>>> name (to find if the object exists) before creating the object,
>>>>>>>>> Amazon S3 provides 'eventual consistency' for read-after-write.
>>>>>>>>>
>>>>>>>>> The algorithm you suggest is roughly how it is implemented now
>>>>>>>>> (BucketingSink.openNewPartFile). My understanding is that
>>>>>>>>> 'eventual consistency' means that even if you just created a file
>>>>>>>>> (its name is the key), it can be that you do not get it in the list,
>>>>>>>>> or exists (HEAD) returns false, and you risk overwriting the
>>>>>>>>> previous part.
>>>>>>>>>
>>>>>>>>> The BucketingSink was designed for a standard file system. s3 is
>>>>>>>>> used over a file system wrapper atm but does not always provide
>>>>>>>>> normal file system guarantees. See also last example in [1].
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Andrey
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82
>>>>>>>>>
>>>>>>>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>>>>
>>>>>>>>> Andrey, thank you very much for the debugging suggestions, I'll
>>>>>>>>> try them.
>>>>>>>>>
>>>>>>>>> In the meanwhile two more questions, please:
>>>>>>>>>
>>>>>>>>> > Just to keep in mind this problem with s3 and exclude it for
>>>>>>>>> sure. I would also check whether the size of missing events is around
>>>>>>>>> the batch size of BucketingSink or not.
>>>>>>>>>
>>>>>>>>> Fair enough, but I also want to focus on debugging the most
>>>>>>>>> probable subject first. So what do you think about this – true or
>>>>>>>>> false: only when the 24-hour window triggers, BucketingSink gets a
>>>>>>>>> burst of input. Around the state restoring point (middle of the day)
>>>>>>>>> it doesn't get any input, so it can't lose anything either. Isn't this
>>>>>>>>> true, or have I totally missed how Flink works in triggering window
>>>>>>>>> results? I would not expect there to be any optimization that
>>>>>>>>> speculatively triggers early results of a regular time window to the
>>>>>>>>> downstream operators.
>>>>>>>>>
>>>>>>>>> > The old BucketingSink in general has a problem with s3. Internally
>>>>>>>>> BucketingSink queries s3 as a file system to list already written file
>>>>>>>>> parts (batches) and determine the index of the next part to start. Due
>>>>>>>>> to eventual consistency of checking file existence in s3 [1], the
>>>>>>>>> BucketingSink can rewrite the previously written part and basically
>>>>>>>>> lose it.
>>>>>>>>>
>>>>>>>>> I was wondering what S3's "read-after-write consistency"
>>>>>>>>> (mentioned on the page you linked) actually means. It seems that this
>>>>>>>>> might be possible:
>>>>>>>>> - LIST keys, find current max index
>>>>>>>>> - choose next index = max + 1
>>>>>>>>> - HEAD next index: if it exists, keep adding + 1 until key doesn't
>>>>>>>>> exist on S3
>>>>>>>>>
>>>>>>>>> But definitely sounds easier if a sink keeps track of files in a
>>>>>>>>> way that's guaranteed to be consistent.
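
The LIST / max + 1 / HEAD probing steps listed above can be sketched as follows. This is only an illustration in plain Java: `FakeStore` is a hypothetical in-memory stand-in for a bucket, not an S3 client, and the `part-` key prefix is an assumption. A stale LIST is simulated by writing a key after the listing was taken.

```java
import java.util.HashSet;
import java.util.Set;

public class PartIndexProbe {

    /** Hypothetical stand-in for a bucket: just a set of existing keys. */
    static final class FakeStore {
        private final Set<String> keys = new HashSet<>();

        void put(String key) { keys.add(key); }

        /** Plays the role of a HEAD request: does the key exist right now? */
        boolean head(String key) { return keys.contains(key); }

        /** Plays the role of a LIST request: returns a snapshot copy of matching keys. */
        Set<String> list(String prefix) {
            Set<String> out = new HashSet<>();
            for (String k : keys) {
                if (k.startsWith(prefix)) out.add(k);
            }
            return out;
        }
    }

    /** Picks max listed index + 1, then probes with HEAD until a free key is found. */
    static int nextPartIndex(FakeStore store, Set<String> listing, String prefix) {
        int max = -1;
        for (String key : listing) {
            max = Math.max(max, Integer.parseInt(key.substring(prefix.length())));
        }
        int candidate = max + 1;
        while (store.head(prefix + candidate)) {
            candidate++; // key already exists, keep adding + 1
        }
        return candidate;
    }

    public static void main(String[] args) {
        FakeStore store = new FakeStore();
        store.put("part-0");
        store.put("part-1");
        Set<String> staleListing = store.list("part-"); // listing taken here
        store.put("part-2");                            // written later, so the listing misses it
        System.out.println(nextPartIndex(store, staleListing, "part-")); // prints 3
    }
}
```

Note that the sketch assumes HEAD always reflects the latest writes. If HEAD can also return a stale answer, the probe gives no guarantee.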
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Juho
>>>>>>>>>
>>>>>>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <
>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> true, StreamingFileSink does not support s3 in 1.6.0, it is
>>>>>>>>>> planned for the next 1.7 release, sorry for the confusion.
>>>>>>>>>> The old BucketingSink in general has a problem with s3.
>>>>>>>>>> Internally BucketingSink queries s3 as a file system
>>>>>>>>>> to list already written file parts (batches) and determine the index
>>>>>>>>>> of the next part to start. Due to eventual consistency of checking
>>>>>>>>>> file existence in s3 [1], the BucketingSink can rewrite the
>>>>>>>>>> previously written part and basically lose it. It should be fixed for
>>>>>>>>>> StreamingFileSink in 1.7, where Flink keeps its own track of written
>>>>>>>>>> parts and does not rely on s3 as a file system.
>>>>>>>>>> I also include Kostas, he might add more details.
>>>>>>>>>>
>>>>>>>>>> Just to keep in mind this problem with s3 and exclude it for
>>>>>>>>>> sure. I would also check whether the size of missing events is
>>>>>>>>>> around the batch size of BucketingSink or not. You also wrote that
>>>>>>>>>> the timestamps of lost events are 'probably' around the time of the
>>>>>>>>>> savepoint, if it is not yet for sure I would also check it.
>>>>>>>>>>
>>>>>>>>>> Have you already checked the log files of job manager and task
>>>>>>>>>> managers for the job running before and after the restore from the
>>>>>>>>>> checkpoint? Is everything successful there, no errors, relevant
>>>>>>>>>> warnings or exceptions?
>>>>>>>>>>
>>>>>>>>>> As the next step, I would suggest logging all encountered events
>>>>>>>>>> in DistinctFunction.reduce if possible for production data and
>>>>>>>>>> checking whether the missed events are eventually processed before or
>>>>>>>>>> after the savepoint. The following log message indicates a border
>>>>>>>>>> between the events that should be included into the savepoint (logged
>>>>>>>>>> before) or not:
>>>>>>>>>> "{} ({}, synchronous part) in thread {} took {} ms" (template)
>>>>>>>>>> Also check if the savepoint has been overall completed:
>>>>>>>>>> "{} ({}, asynchronous part) in thread {} took {} ms."
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Andrey
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
>>>>>>>>>>
>>>>>>>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Using StreamingFileSink is not a convenient option for production
>>>>>>>>>> use for us as it doesn't support s3*. I could use StreamingFileSink
>>>>>>>>>> just to verify, but I don't see much point in doing so. Please
>>>>>>>>>> consider my previous comment:
>>>>>>>>>>
>>>>>>>>>> > I realized that BucketingSink must not play any role in this
>>>>>>>>>> problem. This is because only when the 24-hour window triggers,
>>>>>>>>>> BucketingSink gets a burst of input. Around the state restoring point
>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't lose
>>>>>>>>>> anything either (right?).
>>>>>>>>>>
>>>>>>>>>> I could also use a Kafka sink instead, but I can't imagine how
>>>>>>>>>> there could be any difference. It's very real that the sink doesn't
>>>>>>>>>> get any input for a long time until the 24-hour window closes, and
>>>>>>>>>> then it quickly writes out everything because it's not that much data
>>>>>>>>>> eventually for the distinct values.
>>>>>>>>>>
>>>>>>>>>> Any ideas for debugging what's happening around the savepoint &
>>>>>>>>>> restoration time?
>>>>>>>>>>
>>>>>>>>>> *) I actually implemented StreamingFileSink as an alternative
>>>>>>>>>> sink. This was before I came to realize that most likely the sink
>>>>>>>>>> component has nothing to do with the data loss problem. I tried it
>>>>>>>>>> with an s3n:// path just to see an exception being thrown. In the
>>>>>>>>>> source code I indeed then found an explicit check for the target path
>>>>>>>>>> scheme to be "hdfs://".
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <
>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ok, I think before further debugging the window reduced state,
>>>>>>>>>>> could you try the new 'StreamingFileSink' [1] introduced in
>>>>>>>>>>> Flink 1.6.0 instead of the previous 'BucketingSink'?
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Andrey
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>>>>>>>>>>
>>>>>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Yes, sorry for my confusing comment. I just meant that it seems
>>>>>>>>>>> like there's a bug somewhere now that the output is missing some
>>>>>>>>>>> data.
>>>>>>>>>>>
>>>>>>>>>>> > I would wait and check the actual output in s3 because it is
>>>>>>>>>>> the main result of the job
>>>>>>>>>>>
>>>>>>>>>>> Yes, and that's what I have already done. There seems to be
>>>>>>>>>>> always some data loss with the production data volumes, if the job
>>>>>>>>>>> has been restarted on that day.
>>>>>>>>>>>
>>>>>>>>>>> Would you have any suggestions for how to debug this further?
>>>>>>>>>>>
>>>>>>>>>>> Many thanks for stepping in.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <
>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>
>>>>>>>>>>>> So it is a per key deduplication job.
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, I would wait and check the actual output in s3 because it
>>>>>>>>>>>> is the main result of the job and
>>>>>>>>>>>>
>>>>>>>>>>>> > The late data around the time of taking savepoint might be
>>>>>>>>>>>> not included into the savepoint but it should be behind the
>>>>>>>>>>>> snapshotted offset in Kafka.
>>>>>>>>>>>>
>>>>>>>>>>>> is not a bug, it is a possible behaviour.
>>>>>>>>>>>>
>>>>>>>>>>>> The savepoint is a snapshot of the data in transit which is
>>>>>>>>>>>> already consumed from Kafka.
>>>>>>>>>>>> Basically the full contents of the window result is split
>>>>>>>>>>>> between the savepoint and what can come after the savepoint'ed
>>>>>>>>>>>> offset in Kafka but before the window result is written into s3.
>>>>>>>>>>>>
>>>>>>>>>>>> Allowed lateness should not affect it, I am just saying that
>>>>>>>>>>>> the final result in s3 should include all records after it.
>>>>>>>>>>>> This is what should be guaranteed but not the contents of the
>>>>>>>>>>>> intermediate savepoint.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Andrey
>>>>>>>>>>>>
>>>>>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your answer!
>>>>>>>>>>>>
>>>>>>>>>>>> I check for the missed data from the final output on s3. So I
>>>>>>>>>>>> wait until the next day, then run the same thing re-implemented in
>>>>>>>>>>>> batch, and compare the output.
>>>>>>>>>>>>
>>>>>>>>>>>> > The late data around the time of taking savepoint might be
>>>>>>>>>>>> not included into the savepoint but it should be behind the
>>>>>>>>>>>> snapshotted offset in Kafka.
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, I would definitely expect that. It seems like there's a
>>>>>>>>>>>> bug somewhere.
>>>>>>>>>>>>
>>>>>>>>>>>> > Then it should just come later after the restore and should
>>>>>>>>>>>> be reduced within the allowed lateness into the final result which
>>>>>>>>>>>> is saved into s3.
>>>>>>>>>>>>
>>>>>>>>>>>> Well, as far as I know, allowed lateness doesn't play any role
>>>>>>>>>>>> here, because I started running the job with allowedLateness=0,
>>>>>>>>>>>> and still get the data loss, while my late data output doesn't
>>>>>>>>>>>> receive anything.
>>>>>>>>>>>>
>>>>>>>>>>>> > Also, is this `DistinctFunction.reduce` just an example or
>>>>>>>>>>>> the actual implementation, basically saving just one of records
>>>>>>>>>>>> inside the 24h window in s3? then what is missing there?
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, it's the actual implementation. Note that there's a keyBy
>>>>>>>>>>>> before the DistinctFunction. So there's one record for each key
>>>>>>>>>>>> (which is the combination of a couple of fields). In practice I've
>>>>>>>>>>>> seen that we're missing ~2000-4000 elements on each restore, and the
>>>>>>>>>>>> total output is obviously much more than that.
>>>>>>>>>>>>
>>>>>>>>>>>> Here's the full code for the key selector:
>>>>>>>>>>>>
>>>>>>>>>>>> public class MapKeySelector implements KeySelector<Map<String,String>, Object> {
>>>>>>>>>>>>
>>>>>>>>>>>>     private final String[] fields;
>>>>>>>>>>>>
>>>>>>>>>>>>     public MapKeySelector(String... fields) {
>>>>>>>>>>>>         this.fields = fields;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public Object getKey(Map<String, String> event) throws Exception {
>>>>>>>>>>>>         Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>>>>>>>>>>>>         for (int i = 0; i < fields.length; i++) {
>>>>>>>>>>>>             key.setField(event.getOrDefault(fields[i], ""), i);
>>>>>>>>>>>>         }
>>>>>>>>>>>>         return key;
>>>>>>>>>>>>     }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> And a more exact example on how it's used:
>>>>>>>>>>>>
>>>>>>>>>>>>                 .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>>>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>>>>                 .reduce(new DistinctFunction())
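
For reference, what the keyBy + reduce combination is expected to produce for a single window can be approximated without Flink. The sketch below is plain Java and ignores windowing, state and parallelism entirely; the class and method names are hypothetical. It assumes the reduce function keeps its first argument (as the DistinctFunction shown elsewhere in this thread does) and that the first argument is the previously accumulated value.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DistinctSketch {

    /** Builds a composite key from the given fields, defaulting missing fields to "". */
    static List<String> key(Map<String, String> event, String... fields) {
        List<String> key = new ArrayList<>();
        for (String field : fields) {
            key.add(event.getOrDefault(field, ""));
        }
        return key;
    }

    /** Keeps the first event seen for each composite key, in arrival order. */
    static Collection<Map<String, String>> distinct(List<Map<String, String>> events, String... fields) {
        Map<List<String>, Map<String, String>> firstPerKey = new LinkedHashMap<>();
        for (Map<String, String> event : events) {
            firstPerKey.putIfAbsent(key(event, fields), event); // later duplicates are dropped
        }
        return firstPerKey.values();
    }
}
```

Running the same events through such a function and through the streaming job should give the same number of output records per window, which is the kind of check the batch comparison relies on.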
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <
>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Where exactly does the data go missing? When do you notice that?
>>>>>>>>>>>>> Do you check it:
>>>>>>>>>>>>> - debugging `DistinctFunction.reduce` right after resume in
>>>>>>>>>>>>> the middle of the day
>>>>>>>>>>>>> or
>>>>>>>>>>>>> - some distinct records are missing in the final output
>>>>>>>>>>>>> of BucketingSink in s3 after window result is actually triggered
>>>>>>>>>>>>> and saved into s3 at the end of the day? is this the main output?
>>>>>>>>>>>>>
>>>>>>>>>>>>> The late data around the time of taking savepoint might be not
>>>>>>>>>>>>> included into the savepoint but it should be behind the
>>>>>>>>>>>>> snapshotted offset in Kafka. Then it should just come later after
>>>>>>>>>>>>> the restore and should be reduced within the allowed lateness into
>>>>>>>>>>>>> the final result which is saved into s3.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Also, is this `DistinctFunction.reduce` just an example or the
>>>>>>>>>>>>> actual implementation, basically saving just one of records
>>>>>>>>>>>>> inside the 24h window in s3? then what is missing there?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> I changed to allowedLateness=0, no change, still missing data
>>>>>>>>>>>>> when restoring from savepoint.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <
>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I realized that BucketingSink must not play any role in this
>>>>>>>>>>>>>> problem. This is because BucketingSink gets a burst of input
>>>>>>>>>>>>>> only when the 24-hour window triggers. Around the state
>>>>>>>>>>>>>> restoring point (middle of the day) it doesn't get any input,
>>>>>>>>>>>>>> so it can't lose anything either (right?).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I will next try removing the allowedLateness entirely from
>>>>>>>>>>>>>> the equation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In the meantime, please let me know if you have any
>>>>>>>>>>>>>> suggestions for debugging the lost data, for example what
>>>>>>>>>>>>>> logs to enable.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any known issues
>>>>>>>>>>>>>> with that, that could contribute to lost data when restoring
>>>>>>>>>>>>>> a savepoint?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <
>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Some data is silently lost on my Flink stream job when state
>>>>>>>>>>>>>>> is restored from a savepoint.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Do you have any debugging hints to find out where exactly
>>>>>>>>>>>>>>> the data gets dropped?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> My job gathers distinct values using a 24-hour window. It
>>>>>>>>>>>>>>> doesn't have any custom state management.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> When I cancel the job with a savepoint and restore from that
>>>>>>>>>>>>>>> savepoint, some data is missed. It seems to be losing just a
>>>>>>>>>>>>>>> small amount of data. The event time of the lost data is
>>>>>>>>>>>>>>> probably around the time of the savepoint. In other words,
>>>>>>>>>>>>>>> the rest of the time window is not entirely missed:
>>>>>>>>>>>>>>> collection also works correctly for (most of the) events
>>>>>>>>>>>>>>> that come in after restoring.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> When the job processes a full 24-hour window without
>>>>>>>>>>>>>>> interruptions it doesn't miss anything.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Usually the problem doesn't happen in test environments that
>>>>>>>>>>>>>>> have smaller parallelism and smaller data volumes. But with
>>>>>>>>>>>>>>> production volumes the job seems to be consistently missing
>>>>>>>>>>>>>>> at least something on every restore.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This issue has consistently happened since the job was
>>>>>>>>>>>>>>> initially created. It was at first run on an older version
>>>>>>>>>>>>>>> of Flink 1.5-SNAPSHOT and it still happens on both Flink
>>>>>>>>>>>>>>> 1.5.2 & 1.6.0.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm wondering if this could be for example some
>>>>>>>>>>>>>>> synchronization issue between the kafka consumer offsets vs.
>>>>>>>>>>>>>>> what's been written by BucketingSink?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1. Job content, simplified
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         kafkaStream
>>>>>>>>>>>>>>>                 .flatMap(new ExtractFieldsFunction())
>>>>>>>>>>>>>>>                 .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>>>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>>>>>>>                 .allowedLateness(allowedLateness)
>>>>>>>>>>>>>>>                 .sideOutputLateData(lateDataTag)
>>>>>>>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>>>>>>>                 .addSink(sink)
>>>>>>>>>>>>>>>                 // use a fixed number of output partitions
>>>>>>>>>>>>>>>                 .setParallelism(8);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> import java.util.Map;
>>>>>>>>>>>>>>> import org.apache.flink.api.common.functions.ReduceFunction;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>>>>>>>>>>>>>>  */
>>>>>>>>>>>>>>> public class DistinctFunction
>>>>>>>>>>>>>>>         implements ReduceFunction<Map<String, String>> {
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public Map<String, String> reduce(
>>>>>>>>>>>>>>>             Map<String, String> value1, Map<String, String> value2) {
>>>>>>>>>>>>>>>         // Keep the first record seen for the key; drop later duplicates.
>>>>>>>>>>>>>>>         return value1;
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2. State configuration
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> boolean enableIncrementalCheckpointing = true;
>>>>>>>>>>>>>>> String statePath = "s3n://bucket/savepoints";
>>>>>>>>>>>>>>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Checkpointing Mode: Exactly Once
>>>>>>>>>>>>>>> Interval: 1m 0s
>>>>>>>>>>>>>>> Timeout: 10m 0s
>>>>>>>>>>>>>>> Minimum Pause Between Checkpoints: 1m 0s
>>>>>>>>>>>>>>> Maximum Concurrent Checkpoints: 1
>>>>>>>>>>>>>>> Persist Checkpoints Externally: Enabled (retain on cancellation)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 3. BucketingSink configuration
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We use BucketingSink. I don't think there's anything special
>>>>>>>>>>>>>>> here, apart from the fact that we're writing to S3.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         String outputPath = "s3://bucket/output";
>>>>>>>>>>>>>>>         BucketingSink<Map<String, String>> sink =
>>>>>>>>>>>>>>>                 new BucketingSink<Map<String, String>>(outputPath)
>>>>>>>>>>>>>>>                 .setBucketer(new ProcessdateBucketer())
>>>>>>>>>>>>>>>                 .setBatchSize(batchSize)
>>>>>>>>>>>>>>>                 .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>>>>>>>>>>>                 .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>>>>>>>>>>>>         sink.setWriter(new IdJsonWriter());
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 4. Kafka & event time
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> My flink job reads the data from Kafka, using a
>>>>>>>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer
>>>>>>>>>>>>>>> to synchronize watermarks across all kafka partitions. We
>>>>>>>>>>>>>>> also write late data to a side output, but nothing is
>>>>>>>>>>>>>>> written there. If something were, it could explain the
>>>>>>>>>>>>>>> missed data in the main output. (I'm also sure that our late
>>>>>>>>>>>>>>> data writing works, because we previously had some actual
>>>>>>>>>>>>>>> late data which ended up there.)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 5. allowedLateness
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It may or may not be relevant that I have also enabled
>>>>>>>>>>>>>>> allowedLateness with 1 minute lateness on the 24-hour window.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If that makes sense, I could try removing allowedLateness
>>>>>>>>>>>>>>> entirely? That would be just to rule out that Flink has a
>>>>>>>>>>>>>>> bug related to restoring state in combination with the
>>>>>>>>>>>>>>> allowedLateness feature. After all, all of our data should
>>>>>>>>>>>>>>> be in a good enough order to not be late, given the max
>>>>>>>>>>>>>>> out-of-orderness used on the kafka consumer timestamp
>>>>>>>>>>>>>>> extractor.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thank you in advance!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>
>>>
>>
>>
>
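P.S. On the quoted setup (24-hour window, 1 minute allowedLateness, late data sent to a side output): as far as I understand it, an element is treated as late once the watermark has reached the last timestamp of its window plus the allowed lateness. Below is a minimal plain-Java sketch of that rule, only to make the reasoning concrete. It does not use Flink at all; the class and method names are hypothetical, and windows are assumed to be aligned to the epoch with no offset.

```java
import java.time.Duration;

public class LatenessSketch {
    // Size of the tumbling window from the quoted job: 24 hours.
    static final long WINDOW_SIZE_MS = Duration.ofDays(1).toMillis();

    // Start of the epoch-aligned tumbling window containing the timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    // Last timestamp that still belongs to the window (the end is exclusive).
    static long windowMaxTimestamp(long timestampMs) {
        return windowStart(timestampMs) + WINDOW_SIZE_MS - 1;
    }

    // Assumed rule: late once the watermark has reached
    // (last timestamp of the window + allowed lateness).
    static boolean isLate(long timestampMs, long watermarkMs, long allowedLatenessMs) {
        return windowMaxTimestamp(timestampMs) + allowedLatenessMs <= watermarkMs;
    }

    public static void main(String[] args) {
        long noon = Duration.ofHours(12).toMillis();       // event at 12:00 on day 0
        long lateness = Duration.ofMinutes(1).toMillis();  // allowedLateness from the job
        long watermark = Duration.ofHours(13).toMillis();  // watermark still inside day 0
        System.out.println("late in the middle of the day? " + isLate(noon, watermark, lateness));
    }
}
```

Under this rule, an event whose window has not yet closed cannot be classified as late in the middle of the day, which would match the observation that nothing shows up in the late-data side output.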
