Hi, as an addition: I don't have a solution yet for the general problem of what happens when a parallel instance of a source never receives elements. This watermark business is very tricky...

Cheers,
Aljoscha
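For readers running into the same issue, here is a minimal sketch (not from the original thread) of the workaround that follows from the diagnosis below: pin the parallelism of the Kafka source to the number of partitions in the topic (assumed to be 1 here), so that no consumer subtask sits idle and holds back the downstream watermark. Topic name and Kafka properties are placeholders, the Pojo and AvroPojoDeserializationSchema types are the ones used in the thread, and API names are as of the Flink 0.10 era and may need adjusting.

import java.util.Properties;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;

public class ParallelismMatchingJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(500);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", "localhost:2181"); // placeholder
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        kafkaProps.setProperty("group.id", "watermark-test");          // placeholder

        DataStream<Pojo> stream = env
                .addSource(new FlinkKafkaConsumer082<Pojo>(
                        "pojo-topic",                          // placeholder topic with 1 partition
                        new AvroPojoDeserializationSchema(),
                        kafkaProps))
                // Match the source parallelism to the partition count, so that every
                // parallel instance of the consumer (and of the timestamp extractor
                // behind it) actually receives elements and emits watermarks.
                .setParallelism(1);

        stream.print();
        env.execute("parallelism-matching sketch");
    }
}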
> On 30 Nov 2015, at 17:20, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi Konstantin,
> I finally nailed down the problem. :-)
>
> The basis of the problem is a mismatch between the parallelism of the Flink Kafka Consumer and the number of partitions in the Kafka stream. I would assume that in your case the Kafka stream has 1 partition. This means that only one of the parallel instances of the Flink Kafka Consumer ever receives elements, which in turn means that only one of the parallel instances of the timestamp extractor ever receives elements. Therefore no watermarks get emitted for the other parallel instances, which in turn means that the watermark does not advance downstream, because the watermark at an operator is the minimum over all upstream watermarks. This explains why ExampleTimestampExtractor1 only works in the case with parallelism=1.
>
> The reason why ExampleTimestampExtractor2 works in all parallelism settings is not very obvious. The secret is in this method:
>
> @Override
> public long getCurrentWatermark() {
>     return lastTimestamp - maxDelay;
> }
>
> In the parallel instances that never receive any element, lastTimestamp stays at Long.MIN_VALUE. So "lastTimestamp - maxDelay" underflows to Long.MAX_VALUE - maxDelay + 1. Now, because the watermark at an operator is always the minimum over all watermarks from upstream operators, the watermark at the window operator always tracks the watermark of the parallel instance that receives elements.
>
> I hope this helps, but please let me know if I should provide more explanation. This is a very tricky topic.
>
> Cheers,
> Aljoscha
>
>> On 29 Nov 2015, at 21:18, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
>>
>> Hi Aljoscha,
>>
>> I have put together a gist [1] with two classes: a short processing pipeline which shows the behavior, and a data generator to write records into Kafka. I hope I remembered everything we discussed correctly.
>>
>> So basically, in the example it works with "TimestampExtractor1" only for parallelism 1; with "TimestampExtractor2" it works regardless of the parallelism. Run from the IDE.
>>
>> Let me know if you need anything else.
>>
>> Cheers,
>>
>> Konstantin
>>
>> [1] https://gist.github.com/knaufk/d57b5c3c7db576f3350d
>>
>> On 25.11.2015 21:15, Konstantin Knauf wrote:
>>> Hi Aljoscha,
>>>
>>> sure, will do. I haven't found a solution either. I won't have time to put a minimal example together before the weekend, though.
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>> On 25.11.2015 19:10, Aljoscha Krettek wrote:
>>>> Hi Konstantin,
>>>> I still didn't come up with an explanation for the behavior. Could you maybe send me example code (and example data, if it is necessary to reproduce the problem)? This would really help me pinpoint the problem.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>> On 17 Nov 2015, at 21:42, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
>>>>>
>>>>> Hi Aljoscha,
>>>>>
>>>>> Are you sure? I am running the job from my IDE at the moment.
>>>>>
>>>>> If I set
>>>>>
>>>>> StreamExecutionEnvironment.setParallelism(1);
>>>>>
>>>>> it works with the old TimestampExtractor (returning Long.MIN_VALUE from getCurrentWatermark() and emitting a watermark at every record).
>>>>>
>>>>> If I set
>>>>>
>>>>> StreamExecutionEnvironment.setParallelism(5);
>>>>>
>>>>> it does not work.
>>>>>
>>>>> So, if I understood you correctly, it is the opposite of what you were expecting?!
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Konstantin
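To spell out the arithmetic Aljoscha describes above, here is a sketch (not from the thread; the class name is illustrative and the import path is as I remember it for Flink 0.10) of a TimestampExtractor whose getCurrentWatermark() returns a maximal watermark as long as the instance has not seen any element. This is effectively what the underflow of "Long.MIN_VALUE - maxDelay" produces, just made explicit. It only papers over instances that never receive elements, so the general problem mentioned at the top of the thread remains open.

import org.apache.flink.streaming.api.functions.TimestampExtractor;

public class ExplicitIdleTimestampExtractor implements TimestampExtractor<Pojo> {

    private final long maxDelay;
    // Stays at Long.MIN_VALUE until this parallel instance sees its first element.
    private long lastTimestamp = Long.MIN_VALUE;

    public ExplicitIdleTimestampExtractor(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    @Override
    public long extractTimestamp(Pojo pojo, long currentTimestamp) {
        lastTimestamp = pojo.getTime();
        return pojo.getTime();
    }

    @Override
    public long extractWatermark(Pojo pojo, long currentTimestamp) {
        // No per-element watermarks; getCurrentWatermark() drives the watermark
        // on the auto-watermark interval.
        return Long.MIN_VALUE;
    }

    @Override
    public long getCurrentWatermark() {
        if (lastTimestamp == Long.MIN_VALUE) {
            // No element seen yet: report a maximal watermark so that the instances
            // which do receive elements determine the downstream minimum. This is
            // what "Long.MIN_VALUE - maxDelay" does by accident via overflow.
            return Long.MAX_VALUE - maxDelay;
        }
        return lastTimestamp - maxDelay;
    }
}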
>>>>>
>>>>>
>>>>> On 17.11.2015 11:32, Aljoscha Krettek wrote:
>>>>>> Hi,
>>>>>> actually, the bug is more subtle. Normally, it is not a problem that the TimestampExtractor sometimes emits a watermark that is lower than the one before. (This is the result of the bug with Long.MIN_VALUE I mentioned before.) The stream operators wait for watermarks from all upstream operators and only advance the watermark monotonically in lockstep with them. This way, the watermark cannot decrease at an operator.
>>>>>>
>>>>>> In your case, you have a topology with parallelism 1, I assume. In that case the operators are chained. (There are no separate operators but basically only one operator, and element transmission happens in function calls.) In this setting the watermarks are directly forwarded to operators without going through the logic I mentioned above.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>> On 16 Nov 2015, at 18:13, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
>>>>>>>
>>>>>>> Hi Aljoscha,
>>>>>>>
>>>>>>> I changed the TimestampExtractor to save the lastSeenTimestamp and only emit with getCurrentWatermark [1], as you suggested. So basically I do the opposite of before (only watermarks per event vs. only watermarks per auto-watermark interval). And now it works :). The question remains why it did not work before. As far as I can see, it is an issue with the first TimestampExtractor itself?!
>>>>>>>
>>>>>>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Konstantin
>>>>>>>
>>>>>>> [1]
>>>>>>>
>>>>>>> private final long maxDelay;
>>>>>>> private long lastTimestamp = Long.MIN_VALUE;
>>>>>>>
>>>>>>> public PojoTimestampExtractor(long maxDelay) {
>>>>>>>     this.maxDelay = maxDelay;
>>>>>>> }
>>>>>>>
>>>>>>> @Override
>>>>>>> public long extractTimestamp(Pojo pojo, long l) {
>>>>>>>     lastTimestamp = pojo.getTime();
>>>>>>>     return pojo.getTime();
>>>>>>> }
>>>>>>>
>>>>>>> @Override
>>>>>>> public long extractWatermark(Pojo pojo, long l) {
>>>>>>>     return Long.MIN_VALUE;
>>>>>>> }
>>>>>>>
>>>>>>> @Override
>>>>>>> public long getCurrentWatermark() {
>>>>>>>     return lastTimestamp - maxDelay;
>>>>>>> }
>>>>>>>
>>>>>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
>>>>>>>> Hi,
>>>>>>>> yes, at your data rate emitting a watermark for every element should not be a problem. It could become a problem with higher data rates, since the system can get overwhelmed if every element also generates a watermark. In that case I would suggest storing the latest element timestamp in an internal field and only emitting in getCurrentWatermark(), since then the watermark interval can be tuned using the auto-watermark interval setting.
>>>>>>>>
>>>>>>>> But that should not be the cause of the problem that you currently have. Would you maybe be willing to send me some (mock) example data and the code so that I can reproduce the problem and have a look at it? To aljoscha at apache.org.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Aljoscha,
>>>>>>>>>
>>>>>>>>> ok, now I at least understand why it works with fromElements(...). For the rest I am not so sure.
>>>>>>>>>
>>>>>>>>>> What this means in your case is that the watermark can only advance if a new element arrives, because only then is the watermark updated.
>>>>>>>>>
>>>>>>>>> But new elements arrive all the time, about 50/s. Or do you mean something else?
>>>>>>>>>
>>>>>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an ok choice, if I understand the semantics correctly. It just affects watermarking in the absence of events, right?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Konstantin
>>>>>>>>>
>>>>>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>>>>>>>>> Hi,
>>>>>>>>>> it could be what Gyula mentioned. Let me first go a bit into how the TimestampExtractor works internally.
>>>>>>>>>>
>>>>>>>>>> First, the timestamp extractor internally keeps the value of the last emitted watermark. Then, the semantics of the TimestampExtractor are as follows:
>>>>>>>>>> - the result of extractTimestamp is taken and it replaces the internal timestamp of the element
>>>>>>>>>> - if the result of extractWatermark is larger than the last watermark, the new value is emitted as a watermark and the value is stored
>>>>>>>>>> - getCurrentWatermark is called on the specified auto-watermark interval; if the returned value is larger than the last watermark, it is emitted and stored as the last watermark
>>>>>>>>>>
>>>>>>>>>> What this means in your case is that the watermark can only advance if a new element arrives, because only then is the watermark updated.
>>>>>>>>>>
>>>>>>>>>> The reason why you see results if you use fromElements is that the window operator also emits all the windows that it currently has buffered when the program closes. This happens in the case of fromElements because only a finite number of elements is emitted, after which the source closes, thereby finishing the whole program.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Aljoscha
>>>>>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Could this part of the extractor be the problem, Aljoscha?
>>>>>>>>>>>
>>>>>>>>>>> @Override
>>>>>>>>>>> public long getCurrentWatermark() {
>>>>>>>>>>>     return Long.MIN_VALUE;
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> Gyula
>>>>>>>>>>>
>>>>>>>>>>> On 16 Nov 2015, at 10:39, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>
>>>>>>>>>>> thanks for your answer. Yes, I am using the same TimestampExtractor class.
>>>>>>>>>>>
>>>>>>>>>>> The timestamps look good to me. Here is an example:
>>>>>>>>>>>
>>>>>>>>>>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
>>>>>>>>>>>
>>>>>>>>>>> The order now is
>>>>>>>>>>>
>>>>>>>>>>> stream
>>>>>>>>>>>     .map(dummyMapper)
>>>>>>>>>>>     .assignTimestamps(...)
>>>>>>>>>>>     .timeWindow(...)
>>>>>>>>>>>
>>>>>>>>>>> Is there a way to print out the assigned timestamps after stream.assignTimestamps(...)?
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>>
>>>>>>>>>>> Konstantin
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> are you also using the timestamp extractor when you are using env.fromCollection()?
>>>>>>>>>>>>
>>>>>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source that just prints the element and forwards it? To see if the elements come with a good timestamp from Kafka.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am using a very simple TimestampExtractor like [1], which just extracts a millis timestamp from a POJO. In my streaming job, I read in these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<Pojo>(
>>>>>>>>>>>>>     parameterTool.getRequired("topic"),
>>>>>>>>>>>>>     new AvroPojoDeserializationSchema(),
>>>>>>>>>>>>>     parameterTool.getProperties()));
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have timestamps enabled, the TimeCharacteristic is EventTime, and the AutoWatermarkInterval is 500.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The problem is, when I do something like:
>>>>>>>>>>>>>
>>>>>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>>>>>>>>>>     .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>>>>>>>>>>>>>     .sum(..)
>>>>>>>>>>>>>     .print();
>>>>>>>>>>>>>
>>>>>>>>>>>>> env.execute();
>>>>>>>>>>>>>
>>>>>>>>>>>>> the windows never get triggered.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If I use ProcessingTime, it works.
>>>>>>>>>>>>> If I use env.fromCollection(...) instead of the Kafka source, it works with EventTime, too.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any ideas what I could be doing wrong are highly appreciated.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Konstantin
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]:
>>>>>>>>>>>>>
>>>>>>>>>>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>>>>>>>>>>>>
>>>>>>>>>>>>>     private final long maxDelay;
>>>>>>>>>>>>>
>>>>>>>>>>>>>     public PojoTimestampExtractor(long maxDelay) {
>>>>>>>>>>>>>         this.maxDelay = maxDelay;
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>     public long extractTimestamp(Pojo pojo, long l) {
>>>>>>>>>>>>>         return pojo.getTime();
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>     public long extractWatermark(Pojo pojo, long l) {
>>>>>>>>>>>>>         return pojo.getTime() - maxDelay;
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>     public long getCurrentWatermark() {
>>>>>>>>>>>>>         return Long.MIN_VALUE;
>>>>>>>>>>>>>     }
>>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>>>>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>
>>>>
>>>
>>
>> --
>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
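Appended for completeness: a sketch of the "dummy mapper" Aljoscha suggests above, which prints every element and forwards it unchanged, so that one can check whether records leave the Kafka source with sensible event-time fields. Pojo and its getTime() accessor are the types used throughout the thread.

import org.apache.flink.api.common.functions.MapFunction;

public class PrintingMapper implements MapFunction<Pojo, Pojo> {

    @Override
    public Pojo map(Pojo pojo) throws Exception {
        // Print the event-time field together with the element, then forward it.
        System.out.println("time=" + pojo.getTime() + " element=" + pojo);
        return pojo;
    }
}

It would be used right after the source, for example stream.map(new PrintingMapper()).assignTimestamps(new PojoTimestampExtractor(6000)), as in the pipeline Konstantin posted.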