Hi Stefan, Sorry, but it doesn't seem immediately clear to me what's a good way to use https://github.com/king/bravo. How are people using it? Would you for example modify build.gradle somehow to publish bravo as a library locally/internally? Or add code directly in the bravo project (locally) and run it from there (using an IDE, for example)? Also, it doesn't seem like the bravo gradle project supports building a Flink job jar, but if it does, how do I do it?
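For reference, the closest I've gotten from bravo's README so far is something like the below value-state query. This is only my guess at the intended usage – the class names (StateMetadataUtils, OperatorStateReader, KeyedStateReader) and the "CountPerKey"/"Count" identifiers are copied from the README example, possibly incorrectly, and I don't know yet how this would map to the reducing state of a time window:

// assumes bravo (com.king.bravo.*) and Flink's batch API on the classpath
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// load the metadata of an existing savepoint
Savepoint savepoint = StateMetadataUtils.loadSavepoint(savepointPath);

// read the keyed state of one operator, identified by its uid
OperatorStateReader reader = new OperatorStateReader(env, savepoint, "CountPerKey");
DataSet<Tuple2<Integer, Integer>> countState = reader.readKeyedStates(
        KeyedStateReader.forValueStateKVPairs("Count", new TypeHint<Tuple2<Integer, Integer>>() {}));

countState.print();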
Thanks. On Thu, Oct 4, 2018 at 9:30 PM Juho Autio <juho.au...@rovio.com> wrote: > Good then, I'll try to analyze the savepoints with Bravo. Thanks! > > > How would you assume that backpressure would influence your updates? > Updates to each local state still happen event-by-event, in a single > reader/writing thread. > > Sure, just an ignorant guess by me. I'm not familiar with most of Flink's > internals. Anyway, high backpressure is not seen on this job after it has > caught up the lag, so I thought it would be worth mentioning. > > On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter <s.rich...@data-artisans.com> > wrote: > >> Hi, >> >> Am 04.10.2018 um 16:08 schrieb Juho Autio <juho.au...@rovio.com>: >> >> > you could take a look at Bravo [1] to query your savepoints and to >> check if the state in the savepoint is complete w.r.t. your expectations >> >> Thanks. I'm not 100% sure if this is the case, but to me it seemed like the >> missed ids were being logged by the reducer soon after the job had started >> (after restoring a savepoint). But on the other hand, after that I also >> made another savepoint & restored that, so what I could check is: does that >> next savepoint have the missed ids that were logged (a couple of minutes >> before the savepoint was created, so there should've been more than enough >> time to add them to the state before the savepoint was triggered) or not. >> Anyway, if I were able to verify with Bravo that the ids are missing >> from the savepoint (even though the reducer logged that it saw them), would >> that help in figuring out where they are lost? Is there some major >> difference compared to just looking at the final output after the window has >> been triggered? >> >> >> >> I think that makes a difference. For example, you can investigate if >> there is a state loss or a problem with the windowing. In the savepoint you >> could see which keys exist and to which windows they are assigned. Also >> just to make sure there is no misunderstanding: only elements that are in >> the state at the start of a savepoint are expected to be part of the >> savepoint; all elements between start and completion of the savepoint are >> not expected to be part of the savepoint. >> >> >> > I also doubt that the problem is about backpressure after restore, >> because the job will only continue running after the state restore is >> already completed. >> >> Yes, I'm not suspecting that the state restoring would be the problem >> either. My concern was about backpressure possibly messing with the updates >> of the reducing state. I would tend to suspect that updating the state >> consistently is what fails, where heavy load / backpressure might be a >> factor. >> >> >> >> How would you assume that backpressure would influence your updates? >> Updates to each local state still happen event-by-event, in a single >> reader/writing thread. >> >> >> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter < >> s.rich...@data-artisans.com> wrote: >> >>> Hi, >>> >>> you could take a look at Bravo [1] to query your savepoints and to check >>> if the state in the savepoint is complete w.r.t. your expectations.
I somewhat >>> doubt that there is a general problem with the state/savepoints because >>> many users are successfully running it on a large state and I am not aware >>> of any data loss problems, but nothing is impossible. What the savepoint >>> does is also straightforward: iterate a db snapshot and write all >>> key/value pairs to disk, so all data that was in the db at the time of the >>> savepoint should show up. I also doubt that the problem is about >>> backpressure after restore, because the job will only continue running >>> after the state restore is already completed. Did you check if you are >>> using exactly-once or at-least-once semantics? Also did you check >>> that the kafka consumer start position is configured properly [2]? Are >>> watermarks generated as expected after restore? >>> >>> One more unrelated high-level comment that I have: for a granularity of >>> 24h windows, I wonder if it would not make sense to use a batch job instead? >>> >>> Best, >>> Stefan >>> >>> [1] https://github.com/king/bravo >>> [2] >>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration >>> >>> Am 04.10.2018 um 14:53 schrieb Juho Autio <juho.au...@rovio.com>: >>> >>> Thanks for the suggestions! >>> >>> > In general, it would be tremendously helpful to have a minimal working >>> example which allows to reproduce the problem. >>> >>> Definitely. The problem with reproducing has been that this only seems >>> to happen in the bigger production data volumes. >>> >>> That's why I'm hoping to find a way to debug this with the production >>> data. With that it seems to consistently cause some misses every time the >>> job is killed/restored. >>> >>> > check if it happens for shorter windows, like 1h etc >>> >>> What would be the benefit of that compared to the 24h window? >>> >>> > simplify the job to not use a reduce window but simply a time window >>> which outputs the window events. Then counting the input and output events >>> should allow you to verify the results. If you are not seeing missing >>> events, then it could have something to do with the reducing state used in >>> the reduce function. >>> >>> Hm, maybe, but not sure how useful that would be, because it wouldn't >>> yet prove that it's related to reducing, since not having a reduce >>> function could also mean a smaller load on the job, which alone might be >>> enough to make the problem not manifest. >>> >>> Is there a way to debug what goes into the reducing state (including >>> what gets removed or overwritten and what gets restored), if that makes sense? >>> Maybe some suitable logging could be used to prove that the lost data is >>> written to the reducing state (or at least asked to be written), but not >>> found any more when the window closes and the state is flushed? >>> >>> On configuration once more, we're using the RocksDB state backend with >>> asynchronous incremental checkpointing. The state is restored from >>> savepoints though, we haven't been using those checkpoints in these tests >>> (although they could be used in case of crashes – but we haven't had those >>> now). >>> >>> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <trohrm...@apache.org> >>> wrote: >>> >>>> Hi Juho, >>>> >>>> another idea to further narrow down the problem could be to simplify >>>> the job to not use a reduce window but simply a time window which outputs >>>> the window events. Then counting the input and output events should allow >>>> you to verify the results.
If you are not seeing missing events, then it >>>> could have something to do with the reducing state used in the reduce >>>> function. >>>> >>>> In general, it would be tremendously helpful to have a minimal working >>>> example which allows to reproduce the problem. >>>> >>>> Cheers, >>>> Till >>>> >>>> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin < >>>> and...@data-artisans.com> wrote: >>>> >>>>> Hi Juho, >>>>> >>>>> can you try to reduce the job to a minimal reproducible example and >>>>> share the job and input? >>>>> >>>>> For example: >>>>> - some simple records as input, e.g. tuples of primitive types saved >>>>> as csv >>>>> - minimal deduplication job which processes them and misses records >>>>> - check if it happens for shorter windows, like 1h etc >>>>> - setup which you use for the job, ideally locally reproducible or >>>>> cloud >>>>> >>>>> Best, >>>>> Andrey >>>>> >>>>> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com> wrote: >>>>> >>>>> Sorry to insist, but we seem to be blocked for any serious usage of >>>>> state in Flink if we can't rely on it to not miss data in case of restore. >>>>> >>>>> Would anyone have suggestions for how to troubleshoot this? So far I >>>>> have verified with DEBUG logs that our reduce function also gets to process >>>>> the data that is missing from the window output. >>>>> >>>>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <juho.au...@rovio.com> >>>>> wrote: >>>>> >>>>>> Hi Andrey, >>>>>> >>>>>> To rule out for good any questions about sink behaviour, the job was >>>>>> killed and started with an additional Kafka sink. >>>>>> >>>>>> The same number of ids were missed in both outputs: KafkaSink & >>>>>> BucketingSink. >>>>>> >>>>>> I wonder what would be the next steps in debugging? >>>>>> >>>>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <juho.au...@rovio.com> >>>>>> wrote: >>>>>> >>>>>>> Thanks, Andrey. >>>>>>> >>>>>>> > so it means that the savepoint does not lose at least some >>>>>>> dropped records. >>>>>>> >>>>>>> I'm not sure what you mean by that? I mean, it was known from the >>>>>>> beginning that not everything is lost before/after restoring a >>>>>>> savepoint, >>>>>>> just some records around the time of restoration. It's not 100% clear >>>>>>> whether records are lost before making a savepoint or after restoring >>>>>>> it. >>>>>>> Although, based on the new DEBUG logs it seems more like losing some >>>>>>> records that are seen ~soon after restoring. It seems like Flink is >>>>>>> somehow confused about the restored state vs. new inserts to >>>>>>> the state. >>>>>>> This could also be somehow linked to the high backpressure on the kafka >>>>>>> source while the stream is catching up. >>>>>>> >>>>>>> > If it is feasible for your setup, I suggest to insert one more map >>>>>>> function after reduce and before sink. >>>>>>> > etc. >>>>>>> >>>>>>> Isn't that the same thing that we discussed before? Nothing is sent >>>>>>> to BucketingSink before the window closes, so I don't see how it would >>>>>>> make >>>>>>> any difference if we replace the BucketingSink with a map function or >>>>>>> another sink type. We don't create or restore savepoints during the time >>>>>>> when BucketingSink gets input or has open buckets – that happens at a >>>>>>> much >>>>>>> later time of day. I would focus on figuring out why the records are >>>>>>> lost >>>>>>> while the window is open. But I don't know how to do that. Would you >>>>>>> have >>>>>>> any additional suggestions?
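>>>>>>> For the record, I understood your map-function suggestion as roughly
>>>>>>> the following (my own untested sketch; the log format just mimics the
>>>>>>> DEBUG logging I already have in DistinctFunction):
>>>>>>>
>>>>>>> .timeWindow(Time.days(1))
>>>>>>> .reduce(new DistinctFunction())
>>>>>>> // debug only: log each deduped record right after the window is
>>>>>>> // triggered, before it is flushed to the sink
>>>>>>> .map(new MapFunction<Map<String, String>, Map<String, String>>() {
>>>>>>>     @Override
>>>>>>>     public Map<String, String> map(Map<String, String> record) {
>>>>>>>         LOG.info("window emitted: {}={}", record.get("field"), record.get("id"));
>>>>>>>         return record;
>>>>>>>     }
>>>>>>> })
>>>>>>> .addSink(sink);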
>>>>>>> >>>>>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin < >>>>>>> and...@data-artisans.com> wrote: >>>>>>> >>>>>>>> Hi Juho, >>>>>>>> >>>>>>>> so it means that the savepoint does not lose at least some dropped >>>>>>>> records. >>>>>>>> >>>>>>>> If it is feasible for your setup, I suggest to insert one more map >>>>>>>> function after reduce and before sink. >>>>>>>> The map function should be called right after the window is triggered >>>>>>>> but before flushing to s3. >>>>>>>> The result of reduce (deduped record) could be logged there. >>>>>>>> This should allow you to check whether the processed distinct records >>>>>>>> were buffered in the state after the restoration from the savepoint or >>>>>>>> not. >>>>>>>> If they were buffered we should see that there was an attempt to write >>>>>>>> them >>>>>>>> to the sink from the state. >>>>>>>> >>>>>>>> Another suggestion is to try to write records to some other sink or >>>>>>>> to both. >>>>>>>> E.g. if you can access the file system of the workers, maybe write just into >>>>>>>> local files and check whether the records are also dropped there. >>>>>>>> >>>>>>>> Best, >>>>>>>> Andrey >>>>>>>> >>>>>>>> On 20 Sep 2018, at 15:37, Juho Autio <juho.au...@rovio.com> wrote: >>>>>>>> >>>>>>>> Hi Andrey! >>>>>>>> >>>>>>>> I was finally able to gather the DEBUG logs that you suggested. In >>>>>>>> short, the reducer logged that it processed at least some of the ids >>>>>>>> that >>>>>>>> were missing from the output. >>>>>>>> >>>>>>>> "At least some", because I didn't have the job running with DEBUG >>>>>>>> logs for the full 24-hour window period. So I was only able to check >>>>>>>> whether I could find *some* of the missing ids in the DEBUG logs. Which I did >>>>>>>> indeed. >>>>>>>> >>>>>>>> I changed the DistinctFunction.java to do this: >>>>>>>> >>>>>>>> @Override >>>>>>>> public Map<String, String> reduce(Map<String, String> value1, >>>>>>>> Map<String, String> value2) { >>>>>>>> LOG.debug("DistinctFunction.reduce returns: {}={}", >>>>>>>> value1.get("field"), value1.get("id")); >>>>>>>> return value1; >>>>>>>> } >>>>>>>> >>>>>>>> Then: >>>>>>>> >>>>>>>> vi flink-1.6.0/conf/log4j.properties >>>>>>>> >>>>>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG >>>>>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG >>>>>>>> >>>>>>>> Then I ran the following kind of test: >>>>>>>> >>>>>>>> - Cancelled the on-going job with savepoint created at ~Sep 18 >>>>>>>> 08:35 UTC 2018 >>>>>>>> - Started a new cluster & job with DEBUG enabled at ~09:13, >>>>>>>> restored from that previous cluster's savepoint >>>>>>>> - Ran until caught up with the offsets >>>>>>>> - Cancelled the job with a new savepoint >>>>>>>> - Started a new job _without_ DEBUG, which restored the new >>>>>>>> savepoint, let it keep running so that it will eventually write the >>>>>>>> output >>>>>>>> >>>>>>>> Then on the next day, after results had been flushed when the >>>>>>>> 24-hour window closed, I compared the results again with a batch >>>>>>>> version's >>>>>>>> output. And found some missing ids as usual. >>>>>>>> >>>>>>>> I drilled down to one specific missing id (I'm replacing the actual >>>>>>>> value with AN12345 below), which was not found in the stream output, >>>>>>>> but >>>>>>>> was found in batch output & flink DEBUG logs.
>>>>>>>> >>>>>>>> Related to that id, I gathered the following information: >>>>>>>> >>>>>>>> 2018-09-18~09:13:21,000 job started & savepoint is restored >>>>>>>> >>>>>>>> 2018-09-18 09:14:29,085 missing id is processed for the first time, >>>>>>>> proved by this log line: >>>>>>>> 2018-09-18 09:14:29,085 DEBUG >>>>>>>> com.rovio.ds.flink.uniqueid.DistinctFunction - >>>>>>>> DistinctFunction.reduce returns: s.aid1=AN12345 >>>>>>>> >>>>>>>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint >>>>>>>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint >>>>>>>> >>>>>>>> ( >>>>>>>> more occurrences of checkpoints (~1 min checkpointing time + ~1 min >>>>>>>> delay before next) >>>>>>>> / >>>>>>>> more occurrences of DistinctFunction.reduce >>>>>>>> ) >>>>>>>> >>>>>>>> 2018-09-18 09:23:45,053 missing id is processed for the last time >>>>>>>> >>>>>>>> 2018-09-18~10:20:00,000 savepoint created & job cancelled >>>>>>>> >>>>>>>> To be noted, there was high backpressure after restoring from the >>>>>>>> savepoint until the stream caught up with the kafka offsets. Although our >>>>>>>> job assigns timestamps & watermarks on the flink kafka consumer >>>>>>>> itself, >>>>>>>> so the event time of all partitions is synchronized. As expected, we don't >>>>>>>> get >>>>>>>> any late data in the late data side output. >>>>>>>> >>>>>>>> From this we can see that the missing ids are processed by the >>>>>>>> reducer, but they must get lost somewhere before the 24-hour window is >>>>>>>> triggered. >>>>>>>> >>>>>>>> I think it's worth mentioning once more that the stream doesn't >>>>>>>> miss any ids if we let it run without interruptions / state >>>>>>>> restoring. >>>>>>>> >>>>>>>> What's next? >>>>>>>> >>>>>>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin < >>>>>>>> and...@data-artisans.com> wrote: >>>>>>>> >>>>>>>>> Hi Juho, >>>>>>>>> >>>>>>>>> > only when the 24-hour window triggers, BucketingSink gets a >>>>>>>>> burst of input >>>>>>>>> >>>>>>>>> This is of course totally true, my understanding is the same. We >>>>>>>>> cannot exclude a problem there for sure, just savepoints are used a lot >>>>>>>>> w/o >>>>>>>>> problem reports and BucketingSink is known to be problematic with s3. >>>>>>>>> That >>>>>>>>> is why I asked you: >>>>>>>>> >>>>>>>>> > You also wrote that the timestamps of lost events are 'probably' >>>>>>>>> around the time of the savepoint; if it is not yet for sure I would >>>>>>>>> also >>>>>>>>> check it. >>>>>>>>> >>>>>>>>> Although, the bucketing sink might lose data at the end of the >>>>>>>>> day (also from the middle). The fact that it is always around the >>>>>>>>> time of >>>>>>>>> taking a savepoint, and not random, is surely suspicious, and possible >>>>>>>>> savepoint failures need to be investigated. >>>>>>>>> >>>>>>>>> Regarding the s3 problem, the s3 doc says: >>>>>>>>> >>>>>>>>> > The caveat is that if you make a HEAD or GET request to the key >>>>>>>>> name (to find if the object exists) before creating the object, >>>>>>>>> Amazon S3 >>>>>>>>> provides 'eventual consistency' for read-after-write. >>>>>>>>> >>>>>>>>> The algorithm you suggest is roughly how it is implemented now >>>>>>>>> (BucketingSink.openNewPartFile). My understanding is that >>>>>>>>> 'eventual consistency' means that even if you just created a file (its >>>>>>>>> name >>>>>>>>> is the key) it can be that you do not get it in the list, or exists (HEAD) >>>>>>>>> returns false, and you risk rewriting the previous part.
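>>>>>>>>> Schematically the part-index probing looks like this – my simplified
>>>>>>>>> sketch of the shape of the race, not the actual BucketingSink code
>>>>>>>>> (see openNewPartFile for the real logic):
>>>>>>>>>
>>>>>>>>> // find the next free part index by probing the file system
>>>>>>>>> int partIndex = 0;
>>>>>>>>> while (fs.exists(new Path(bucketPath, partPrefix + "-" + partIndex))) {
>>>>>>>>>     partIndex++;
>>>>>>>>> }
>>>>>>>>> // on s3, exists() is only eventually consistent: it can return false
>>>>>>>>> // for a part that was just written, so the same index is reused and
>>>>>>>>> // the new part overwrites the previous one under the same key
>>>>>>>>> openPartFile(new Path(bucketPath, partPrefix + "-" + partIndex));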
>>>>>>>>> >>>>>>>>> The BucketingSink was designed for a standard file system. s3 is >>>>>>>>> used over a file system wrapper atm but does not always provide the normal file >>>>>>>>> system guarantees. See also the last example in [1]. >>>>>>>>> >>>>>>>>> Cheers, >>>>>>>>> Andrey >>>>>>>>> >>>>>>>>> [1] >>>>>>>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82 >>>>>>>>> >>>>>>>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.au...@rovio.com> wrote: >>>>>>>>> >>>>>>>>> Andrey, thank you very much for the debugging suggestions, I'll >>>>>>>>> try them. >>>>>>>>> >>>>>>>>> In the meanwhile two more questions, please: >>>>>>>>> >>>>>>>>> > Just to keep this problem with s3 in mind and exclude it for >>>>>>>>> sure, I would also check whether the size of missing events is around >>>>>>>>> the >>>>>>>>> batch size of BucketingSink or not. >>>>>>>>> >>>>>>>>> Fair enough, but I also want to focus on debugging the most >>>>>>>>> probable subject first. So what do you think about this – true or >>>>>>>>> false: >>>>>>>>> only when the 24-hour window triggers, BucketingSink gets a burst of >>>>>>>>> input. >>>>>>>>> Around the state restoring point (middle of the day) it doesn't get >>>>>>>>> any >>>>>>>>> input, so it can't lose anything either. Isn't this true, or have I >>>>>>>>> totally >>>>>>>>> missed how Flink works in triggering window results? I would not >>>>>>>>> expect >>>>>>>>> there to be any optimization that speculatively triggers early >>>>>>>>> results of a >>>>>>>>> regular time window to the downstream operators. >>>>>>>>> >>>>>>>>> > The old BucketingSink has in general a problem with s3. Internally >>>>>>>>> BucketingSink queries s3 as a file system to list already written file >>>>>>>>> parts (batches) and determine the index of the next part to start. Due to >>>>>>>>> the eventual consistency of checking file existence in s3 [1], the >>>>>>>>> BucketingSink can rewrite the previously written part and basically >>>>>>>>> lose >>>>>>>>> it. >>>>>>>>> >>>>>>>>> I was wondering, what does S3's "read-after-write consistency" >>>>>>>>> (mentioned on the page you linked) actually mean. It seems that this >>>>>>>>> might >>>>>>>>> be possible: >>>>>>>>> - LIST keys, find current max index >>>>>>>>> - choose next index = max + 1 >>>>>>>>> - HEAD next index: if it exists, keep adding + 1 until the key doesn't >>>>>>>>> exist on S3 >>>>>>>>> >>>>>>>>> But it definitely sounds easier if a sink keeps track of files in a >>>>>>>>> way that's guaranteed to be consistent. >>>>>>>>> >>>>>>>>> Cheers, >>>>>>>>> Juho >>>>>>>>> >>>>>>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin < >>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>> >>>>>>>>>> Hi, >>>>>>>>>> >>>>>>>>>> true, StreamingFileSink does not support s3 in 1.6.0, it is >>>>>>>>>> planned for the next 1.7 release, sorry for the confusion. >>>>>>>>>> The old BucketingSink has in general a problem with s3. >>>>>>>>>> Internally BucketingSink queries s3 as a file system >>>>>>>>>> to list already written file parts (batches) and determine the index >>>>>>>>>> of the next part to start. Due to the eventual consistency of checking >>>>>>>>>> file >>>>>>>>>> existence in s3 [1], the BucketingSink can rewrite the previously >>>>>>>>>> written >>>>>>>>>> part and basically lose it. It should be fixed for >>>>>>>>>> StreamingFileSink in >>>>>>>>>> 1.7 where Flink keeps its own track of written parts and does not >>>>>>>>>> rely on >>>>>>>>>> s3 as a file system. >>>>>>>>>> I also include Kostas, he might add more details.
>>>>>>>>>> >>>>>>>>>> Just to keep this problem with s3 in mind and exclude it for >>>>>>>>>> sure, I would also check whether the size of missing events is >>>>>>>>>> around the >>>>>>>>>> batch size of BucketingSink or not. You also wrote that the >>>>>>>>>> timestamps of >>>>>>>>>> lost events are 'probably' around the time of the savepoint; if it is >>>>>>>>>> not >>>>>>>>>> yet for sure I would also check it. >>>>>>>>>> >>>>>>>>>> Have you already checked the log files of the job manager and task >>>>>>>>>> managers for the job running before and after the restore from the >>>>>>>>>> checkpoint? Is everything successful there, no errors, relevant warnings >>>>>>>>>> or >>>>>>>>>> exceptions? >>>>>>>>>> >>>>>>>>>> As the next step, I would suggest logging all encountered events >>>>>>>>>> in DistinctFunction.reduce if possible for production data and checking >>>>>>>>>> whether the missed events are eventually processed before or after >>>>>>>>>> the >>>>>>>>>> savepoint. The following log message indicates a border between the >>>>>>>>>> events >>>>>>>>>> that should be included into the savepoint (logged before) or not: >>>>>>>>>> “{} ({}, synchronous part) in thread {} took {} ms” (template) >>>>>>>>>> Also check if the savepoint has been overall completed: >>>>>>>>>> "{} ({}, asynchronous part) in thread {} took {} ms." >>>>>>>>>> >>>>>>>>>> Best, >>>>>>>>>> Andrey >>>>>>>>>> >>>>>>>>>> [1] >>>>>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html >>>>>>>>>> >>>>>>>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>> Hi, >>>>>>>>>> >>>>>>>>>> Using StreamingFileSink is not a convenient option for production >>>>>>>>>> use for us as it doesn't support s3*. I could use StreamingFileSink >>>>>>>>>> just to >>>>>>>>>> verify, but I don't see much point in doing so. Please consider my >>>>>>>>>> previous >>>>>>>>>> comment: >>>>>>>>>> >>>>>>>>>> > I realized that BucketingSink must not play any role in this >>>>>>>>>> problem. This is because only when the 24-hour window triggers, >>>>>>>>>> BucketingSink gets a burst of input. Around the state restoring point >>>>>>>>>> (middle of the day) it doesn't get any input, so it can't lose >>>>>>>>>> anything >>>>>>>>>> either (right?). >>>>>>>>>> >>>>>>>>>> I could also use a kafka sink instead, but I can't imagine how >>>>>>>>>> there could be any difference. It's very real that the sink doesn't >>>>>>>>>> get any >>>>>>>>>> input for a long time until the 24-hour window closes, and then it >>>>>>>>>> quickly >>>>>>>>>> writes out everything, because it's not that much data eventually for >>>>>>>>>> the >>>>>>>>>> distinct values. >>>>>>>>>> >>>>>>>>>> Any ideas for debugging what's happening around the savepoint & >>>>>>>>>> restoration time? >>>>>>>>>> >>>>>>>>>> *) I actually implemented StreamingFileSink as an alternative >>>>>>>>>> sink. This was before I came to realize that most likely the sink >>>>>>>>>> component >>>>>>>>>> has nothing to do with the data loss problem. I tried it with an s3n:// >>>>>>>>>> path >>>>>>>>>> just to see an exception being thrown. In the source code I indeed >>>>>>>>>> then >>>>>>>>>> found an explicit check for the target path scheme to be "hdfs://".
>>>>>>>>>> >>>>>>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin < >>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>> >>>>>>>>>>> Ok, I think before further debugging the window reduced state, >>>>>>>>>>> could you try the new ‘StreamingFileSink’ [1] introduced in >>>>>>>>>>> Flink 1.6.0 instead of the previous 'BucketingSink’? >>>>>>>>>>> >>>>>>>>>>> Cheers, >>>>>>>>>>> Andrey >>>>>>>>>>> >>>>>>>>>>> [1] >>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html >>>>>>>>>>> >>>>>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>> Yes, sorry for my confusing comment. I just meant that it seems >>>>>>>>>>> like there's a bug somewhere now that the output is missing some >>>>>>>>>>> data. >>>>>>>>>>> >>>>>>>>>>> > I would wait and check the actual output in s3 because it is >>>>>>>>>>> the main result of the job >>>>>>>>>>> >>>>>>>>>>> Yes, and that's what I have already done. There always seems to be >>>>>>>>>>> some data loss with the production data volumes, if the job has been >>>>>>>>>>> restarted on that day. >>>>>>>>>>> >>>>>>>>>>> Would you have any suggestions for how to debug this further? >>>>>>>>>>> >>>>>>>>>>> Many thanks for stepping in. >>>>>>>>>>> >>>>>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin < >>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Juho, >>>>>>>>>>>> >>>>>>>>>>>> So it is a per key deduplication job. >>>>>>>>>>>> >>>>>>>>>>>> Yes, I would wait and check the actual output in s3 because it >>>>>>>>>>>> is the main result of the job and >>>>>>>>>>>> >>>>>>>>>>>> > The late data around the time of taking the savepoint might >>>>>>>>>>>> not be included in the savepoint but it should be behind the >>>>>>>>>>>> snapshotted >>>>>>>>>>>> offset in Kafka. >>>>>>>>>>>> >>>>>>>>>>>> is not a bug, it is a possible behaviour. >>>>>>>>>>>> >>>>>>>>>>>> The savepoint is a snapshot of the in-flight data which >>>>>>>>>>>> has already been consumed from Kafka. >>>>>>>>>>>> Basically the full contents of the window result are split >>>>>>>>>>>> between the savepoint and what can come after the savepoint'ed >>>>>>>>>>>> offset in >>>>>>>>>>>> Kafka but before the window result is written into s3. >>>>>>>>>>>> >>>>>>>>>>>> Allowed lateness should not affect it, I am just saying that >>>>>>>>>>>> the final result in s3 should include all records that come after it. >>>>>>>>>>>> This is what should be guaranteed but not the contents of the >>>>>>>>>>>> intermediate savepoint. >>>>>>>>>>>> >>>>>>>>>>>> Cheers, >>>>>>>>>>>> Andrey >>>>>>>>>>>> >>>>>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> Thanks for your answer! >>>>>>>>>>>> >>>>>>>>>>>> I check for the missed data from the final output on s3. So I >>>>>>>>>>>> wait until the next day, then run the same thing re-implemented in >>>>>>>>>>>> batch, >>>>>>>>>>>> and compare the output. >>>>>>>>>>>> >>>>>>>>>>>> > The late data around the time of taking the savepoint might >>>>>>>>>>>> not be included in the savepoint but it should be behind the >>>>>>>>>>>> snapshotted >>>>>>>>>>>> offset in Kafka. >>>>>>>>>>>> >>>>>>>>>>>> Yes, I would definitely expect that. It seems like there's a >>>>>>>>>>>> bug somewhere. >>>>>>>>>>>> >>>>>>>>>>>> > Then it should just come later after the restore and should >>>>>>>>>>>> be reduced within the allowed lateness into the final result which >>>>>>>>>>>> is saved >>>>>>>>>>>> into s3.
>>>>>>>>>>>> >>>>>>>>>>>> Well, as far as I know, allowed lateness doesn't play any role >>>>>>>>>>>> here, because I started running the job with allowedLateness=0, >>>>>>>>>>>> and still >>>>>>>>>>>> get the data loss, while my late data output doesn't receive >>>>>>>>>>>> anything. >>>>>>>>>>>> >>>>>>>>>>>> > Also, is this `DistinctFunction.reduce` just an example or >>>>>>>>>>>> the actual implementation, basically saving just one of the records >>>>>>>>>>>> inside the >>>>>>>>>>>> 24h window in s3? Then what is missing there? >>>>>>>>>>>> >>>>>>>>>>>> Yes, it's the actual implementation. Note that there's a keyBy >>>>>>>>>>>> before the DistinctFunction. So there's one record for each key >>>>>>>>>>>> (which is >>>>>>>>>>>> the combination of a couple of fields). In practice I've seen that >>>>>>>>>>>> we're >>>>>>>>>>>> missing ~2000-4000 elements on each restore, and the total output >>>>>>>>>>>> is >>>>>>>>>>>> obviously much more than that. >>>>>>>>>>>> >>>>>>>>>>>> Here's the full code for the key selector: >>>>>>>>>>>> >>>>>>>>>>>> public class MapKeySelector implements >>>>>>>>>>>> KeySelector<Map<String,String>, Object> { >>>>>>>>>>>> >>>>>>>>>>>> private final String[] fields; >>>>>>>>>>>> >>>>>>>>>>>> public MapKeySelector(String... fields) { >>>>>>>>>>>> this.fields = fields; >>>>>>>>>>>> } >>>>>>>>>>>> >>>>>>>>>>>> @Override >>>>>>>>>>>> public Object getKey(Map<String, String> event) throws >>>>>>>>>>>> Exception { >>>>>>>>>>>> Tuple key = >>>>>>>>>>>> Tuple.getTupleClass(fields.length).newInstance(); >>>>>>>>>>>> for (int i = 0; i < fields.length; i++) { >>>>>>>>>>>> key.setField(event.getOrDefault(fields[i], ""), i); >>>>>>>>>>>> } >>>>>>>>>>>> return key; >>>>>>>>>>>> } >>>>>>>>>>>> } >>>>>>>>>>>> >>>>>>>>>>>> And a more exact example of how it's used: >>>>>>>>>>>> >>>>>>>>>>>> .keyBy(new MapKeySelector("ID", "PLAYER_ID", >>>>>>>>>>>> "FIELD", "KEY_NAME", "KEY_VALUE")) >>>>>>>>>>>> .timeWindow(Time.days(1)) >>>>>>>>>>>> .reduce(new DistinctFunction()) >>>>>>>>>>>> >>>>>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin < >>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>> >>>>>>>>>>>>> Where exactly does the data go missing? When do you notice that? >>>>>>>>>>>>> Do you check it: >>>>>>>>>>>>> - by debugging `DistinctFunction.reduce` right after resume in >>>>>>>>>>>>> the middle of the day, >>>>>>>>>>>>> or >>>>>>>>>>>>> - by noticing that some distinct records are missing in the final output >>>>>>>>>>>>> of BucketingSink in s3 after the window result is actually triggered >>>>>>>>>>>>> and saved >>>>>>>>>>>>> into s3 at the end of the day? Is this the main output? >>>>>>>>>>>>> >>>>>>>>>>>>> The late data around the time of taking the savepoint might not be >>>>>>>>>>>>> included in the savepoint but it should be behind the >>>>>>>>>>>>> snapshotted offset >>>>>>>>>>>>> in Kafka. Then it should just come later after the restore and >>>>>>>>>>>>> should be >>>>>>>>>>>>> reduced within the allowed lateness into the final result which >>>>>>>>>>>>> is saved >>>>>>>>>>>>> into s3. >>>>>>>>>>>>> >>>>>>>>>>>>> Also, is this `DistinctFunction.reduce` just an example or the >>>>>>>>>>>>> actual implementation, basically saving just one of the records >>>>>>>>>>>>> inside the 24h >>>>>>>>>>>>> window in s3? Then what is missing there?
>>>>>>>>>>>>> >>>>>>>>>>>>> Cheers, >>>>>>>>>>>>> Andrey >>>>>>>>>>>>> >>>>>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> I changed to allowedLateness=0, no change, still missing data >>>>>>>>>>>>> when restoring from savepoint. >>>>>>>>>>>>> >>>>>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio < >>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> I realized that BucketingSink must not play any role in this >>>>>>>>>>>>>> problem. This is because only when the 24-hour window triggers, >>>>>>>>>>>>>> BucketingSink gets a burst of input. Around the state restoring >>>>>>>>>>>>>> point >>>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't lose >>>>>>>>>>>>>> anything >>>>>>>>>>>>>> either (right?). >>>>>>>>>>>>>> >>>>>>>>>>>>>> I will next try removing the allowedLateness entirely from >>>>>>>>>>>>>> the equation. >>>>>>>>>>>>>> >>>>>>>>>>>>>> In the meanwhile, please let me know if you have any >>>>>>>>>>>>>> suggestions for debugging the lost data, for example what logs >>>>>>>>>>>>>> to enable. >>>>>>>>>>>>>> >>>>>>>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any known issues >>>>>>>>>>>>>> with that which could contribute to lost data when restoring a >>>>>>>>>>>>>> savepoint? >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio < >>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Some data is silently lost on my Flink stream job when state >>>>>>>>>>>>>>> is restored from a savepoint. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Do you have any debugging hints to find out where exactly >>>>>>>>>>>>>>> the data gets dropped? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> My job gathers distinct values using a 24-hour window. It >>>>>>>>>>>>>>> doesn't have any custom state management. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> When I cancel the job with a savepoint and restore from that >>>>>>>>>>>>>>> savepoint, some data is missed. It seems to be losing just a >>>>>>>>>>>>>>> small amount >>>>>>>>>>>>>>> of data. The event time of the lost data is probably around the >>>>>>>>>>>>>>> time of the >>>>>>>>>>>>>>> savepoint. In other words the rest of the time window is not >>>>>>>>>>>>>>> entirely >>>>>>>>>>>>>>> missed – collection works correctly also for (most of the) >>>>>>>>>>>>>>> events that come >>>>>>>>>>>>>>> in after restoring. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> When the job processes a full 24-hour window without >>>>>>>>>>>>>>> interruptions it doesn't miss anything. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Usually the problem doesn't happen in test environments that >>>>>>>>>>>>>>> have smaller parallelism and smaller data volumes. But in >>>>>>>>>>>>>>> production >>>>>>>>>>>>>>> volumes the job seems to be consistently missing at least >>>>>>>>>>>>>>> something on >>>>>>>>>>>>>>> every restore. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> This issue has consistently happened since the job was >>>>>>>>>>>>>>> initially created. It was at first run on an older version of >>>>>>>>>>>>>>> Flink >>>>>>>>>>>>>>> 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I'm wondering if this could be for example some >>>>>>>>>>>>>>> synchronization issue between the kafka consumer offsets vs. >>>>>>>>>>>>>>> what's been >>>>>>>>>>>>>>> written by BucketingSink? >>>>>>>>>>>>>>>
1. Job content, simplified >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> kafkaStream >>>>>>>>>>>>>>> .flatMap(new ExtractFieldsFunction()) >>>>>>>>>>>>>>> .keyBy(new MapKeySelector(1, 2, 3, 4)) >>>>>>>>>>>>>>> .timeWindow(Time.days(1)) >>>>>>>>>>>>>>> .allowedLateness(allowedLateness) >>>>>>>>>>>>>>> .sideOutputLateData(lateDataTag) >>>>>>>>>>>>>>> .reduce(new DistinctFunction()) >>>>>>>>>>>>>>> .addSink(sink) >>>>>>>>>>>>>>> // use a fixed number of output partitions >>>>>>>>>>>>>>> .setParallelism(8); >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> /** >>>>>>>>>>>>>>> * Usage: .keyBy("the", "distinct", "fields").reduce(new >>>>>>>>>>>>>>> DistinctFunction()) >>>>>>>>>>>>>>> */ >>>>>>>>>>>>>>> public class DistinctFunction implements >>>>>>>>>>>>>>> ReduceFunction<java.util.Map<String, String>> { >>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>> public Map<String, String> reduce(Map<String, String> >>>>>>>>>>>>>>> value1, Map<String, String> value2) { >>>>>>>>>>>>>>> return value1; >>>>>>>>>>>>>>> } >>>>>>>>>>>>>>> } >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 2. State configuration >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> boolean enableIncrementalCheckpointing = true; >>>>>>>>>>>>>>> String statePath = "s3n://bucket/savepoints"; >>>>>>>>>>>>>>> new RocksDBStateBackend(statePath, >>>>>>>>>>>>>>> enableIncrementalCheckpointing); >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Checkpointing Mode Exactly Once >>>>>>>>>>>>>>> Interval 1m 0s >>>>>>>>>>>>>>> Timeout 10m 0s >>>>>>>>>>>>>>> Minimum Pause Between Checkpoints 1m 0s >>>>>>>>>>>>>>> Maximum Concurrent Checkpoints 1 >>>>>>>>>>>>>>> Persist Checkpoints Externally Enabled (retain on >>>>>>>>>>>>>>> cancellation) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 3. BucketingSink configuration >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> We use BucketingSink, I don't think there's anything special >>>>>>>>>>>>>>> here, other than the fact that we're writing to S3. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> String outputPath = "s3://bucket/output"; >>>>>>>>>>>>>>> BucketingSink<Map<String, String>> sink = new >>>>>>>>>>>>>>> BucketingSink<Map<String, String>>(outputPath) >>>>>>>>>>>>>>> .setBucketer(new ProcessdateBucketer()) >>>>>>>>>>>>>>> .setBatchSize(batchSize) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> .setInactiveBucketThreshold(inactiveBucketThreshold) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval); >>>>>>>>>>>>>>> sink.setWriter(new IdJsonWriter()); >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 4. Kafka & event time >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> My flink job reads the data from Kafka, using a >>>>>>>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to >>>>>>>>>>>>>>> synchronize watermarks across all kafka partitions. We also >>>>>>>>>>>>>>> write late >>>>>>>>>>>>>>> data to a side output, but nothing is written there – if it >>>>>>>>>>>>>>> were, it could >>>>>>>>>>>>>>> explain missed data in the main output (I'm also sure that our >>>>>>>>>>>>>>> late data >>>>>>>>>>>>>>> writing works, because we previously had some actual late data >>>>>>>>>>>>>>> which ended >>>>>>>>>>>>>>> up there). >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 5. allowedLateness >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> It may or may not be relevant that I have also enabled >>>>>>>>>>>>>>> allowedLateness with 1 minute lateness on the 24-hour window. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> If that makes sense, I could try removing allowedLateness?
That would be just to rule out that Flink has a bug >>>>>>>>>>>>>>> that's related to restoring state in combination with the >>>>>>>>>>>>>>> allowedLateness >>>>>>>>>>>>>>> feature. After all, all of our data should be in a good enough >>>>>>>>>>>>>>> order to not >>>>>>>>>>>>>>> be late, given the max out-of-orderness used on the kafka consumer >>>>>>>>>>>>>>> timestamp >>>>>>>>>>>>>>> extractor. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thank you in advance!