Hi,
as an addition: I don’t have a solution yet for the general problem of what 
happens when a parallel instance of a source never receives elements. This 
watermark business is very tricky...

Cheers,
Aljoscha
> On 30 Nov 2015, at 17:20, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi Konstantin,
> I finally nailed down the problem. :-)
> 
> The basis of the problem is a mismatch between the parallelism of the Flink 
> Kafka Consumer and the number of partitions in the Kafka topic. I would 
> assume that in your case the Kafka topic has 1 partition. This means that 
> only one of the parallel instances of the Flink Kafka Consumer ever receives 
> elements, which in turn means that only one of 
> the parallel instances of the timestamp extractor ever receives elements. 
> This means that no watermarks get emitted for the other parallel instances 
> which in turn means that the watermark does not advance downstream because 
> the watermark at an operator is the minimum over all upstream watermarks. 
> This explains why ExampleTimestampExtractor1 only works in the case with 
> parallelism=1. 
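
A minimal sketch of the minimum rule described above. This is a simplified model and not Flink's actual operator code: the names MinWatermarkDemo and combinedWatermark are made up for illustration, and Long.MIN_VALUE stands in for "no watermark emitted yet".

```java
// Illustrative model only; not Flink's actual operator code.
final class MinWatermarkDemo {

    // Watermark an operator would see: the minimum over all upstream channels.
    static long combinedWatermark(long[] upstreamWatermarks) {
        long min = Long.MAX_VALUE;
        for (long wm : upstreamWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        // Five parallel extractor instances; none has emitted a watermark yet.
        long[] upstream = new long[5];
        java.util.Arrays.fill(upstream, Long.MIN_VALUE);

        // Only instance 0 reads from the single partition and advances.
        upstream[0] = 1447666537260L - 6000L;

        // The four idle instances hold the combined watermark back.
        System.out.println(combinedWatermark(upstream) == Long.MIN_VALUE);

        // With parallelism 1 there is only one channel, so the watermark advances.
        System.out.println(combinedWatermark(new long[] {upstream[0]}));
    }
}
```

With five channels the combined watermark stays at Long.MIN_VALUE no matter how far instance 0 advances, which is why the windows never fire.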
> 
> The reason why ExampleTimestampExtractor2 works in all parallelism settings 
> is not very obvious. The secret is in this method:
> 
> @Override
> public long getCurrentWatermark() {
>   return lastTimestamp - maxDelay;
> }
> 
> In the parallel instances that never receive any element, lastTimestamp is 
> still Long.MIN_VALUE. So “lastTimestamp - maxDelay” overflows and wraps 
> around to a very large positive value (Long.MAX_VALUE - maxDelay + 1). Now, 
> because the watermark at an operator is always the 
> minimum over all watermarks from upstream operators the watermark at the 
> window operator always tracks the watermark of the parallel instance that 
> receives elements. 
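
The arithmetic behind this can be checked with a small sketch. The class name WatermarkOverflowDemo and the sample values (a maxDelay of 6000 and the timestamp from earlier in the thread) are only for illustration; the subtraction is the same expression as in getCurrentWatermark() above.

```java
// Illustrative check of the wrap-around; not part of the original extractor.
final class WatermarkOverflowDemo {

    // Same expression as getCurrentWatermark() in the quoted extractor.
    static long currentWatermark(long lastTimestamp, long maxDelay) {
        return lastTimestamp - maxDelay; // plain long arithmetic wraps silently
    }

    public static void main(String[] args) {
        long maxDelay = 6000L;

        // Instance that never saw an element: lastTimestamp is still Long.MIN_VALUE.
        long idle = currentWatermark(Long.MIN_VALUE, maxDelay);
        System.out.println(idle == Long.MAX_VALUE - maxDelay + 1);

        // Instance that did see an element.
        long active = currentWatermark(1447666537260L, maxDelay);

        // The minimum over both is the active instance's watermark.
        System.out.println(Math.min(idle, active) == active);

        // Math.subtractExact surfaces the overflow instead of hiding it.
        try {
            Math.subtractExact(Long.MIN_VALUE, maxDelay);
        } catch (ArithmeticException e) {
            System.out.println("overflow detected: " + e.getMessage());
        }
    }
}
```

So the second extractor works with higher parallelism only because of the wrap-around, not by design.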
> 
> I hope this helps, but please let me know if I should provide more 
> explanation. This is a very tricky topic.
> 
> Cheers,
> Aljoscha
> 
>> On 29 Nov 2015, at 21:18, Konstantin Knauf <konstantin.kn...@tngtech.com> 
>> wrote:
>> 
>> Hi Aljoscha,
>> 
>> I have put together a gist [1] with two classes, a short processing
>> pipeline, which shows the behavior and a data generator to write records
>> into Kafka. I hope I remembered everything we discussed correctly.
>> 
>> So basically in the example it works with "TimestampExtractor1" only for
>> parallelism 1, with "TimestampExtractor2" it works regardless of the
>> parallelism. Run from the IDE.
>> 
>> Let me know if you need anything else.
>> 
>> Cheers,
>> 
>> Konstantin
>> 
>> [1] https://gist.github.com/knaufk/d57b5c3c7db576f3350d
>> 
>> On 25.11.2015 21:15, Konstantin Knauf wrote:
>>> Hi Aljoscha,
>>> 
>>> sure, will do. I have not found a solution either. I won't have time to put
>>> a minimal example together before the weekend though.
>>> 
>>> Cheers,
>>> 
>>> Konstantin
>>> 
>>> On 25.11.2015 19:10, Aljoscha Krettek wrote:
>>>> Hi Konstantin,
>>>> I still haven’t come up with an explanation for the behavior. Could you 
>>>> maybe send me example code (and example data, if it is necessary to 
>>>> reproduce the problem)? This would really help me pinpoint the problem.
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>>> On 17 Nov 2015, at 21:42, Konstantin Knauf <konstantin.kn...@tngtech.com> 
>>>>> wrote:
>>>>> 
>>>>> Hi Aljoscha,
>>>>> 
>>>>> Are you sure? I am running the job from my IDE at the moment.
>>>>> 
>>>>> If I set
>>>>> 
>>>>> StreamExecutionEnvironment.setParallelism(1);
>>>>> 
>>>>> It works with the old TimestampExtractor (returning Long.MIN_VALUE from
>>>>> getCurrentWatermark() and emitting a watermark at every record).
>>>>> 
>>>>> If I set
>>>>> 
>>>>> StreamExecutionEnvironment.setParallelism(5);
>>>>> 
>>>>> it does not work.
>>>>> 
>>>>> So, if I understood you correctly, it is the opposite of what you were
>>>>> expecting?!
>>>>> 
>>>>> Cheers,
>>>>> 
>>>>> Konstantin
>>>>> 
>>>>> 
>>>>> On 17.11.2015 11:32, Aljoscha Krettek wrote:
>>>>>> Hi,
>>>>>> actually, the bug is more subtle. Normally, it is not a problem that the 
>>>>>> TimestampExtractor sometimes emits a watermark that is lower than the 
>>>>>> one before. (This is the result of the bug with Long.MIN_VALUE I 
>>>>>> mentioned before). The stream operators wait for watermarks from all 
>>>>>> upstream operators and only advance the watermark monotonically in 
>>>>>> lockstep with them. This way, the watermark cannot decrease at an 
>>>>>> operator.
>>>>>> 
>>>>>> In your case, you have a topology with parallelism 1, I assume. In that 
>>>>>> case the operators are chained. (There are no separate operators, but 
>>>>>> basically only one operator, and element transmission happens via 
>>>>>> function calls.) In this setting the watermarks are directly forwarded to 
>>>>>> operators without going through the logic I mentioned above.
>>>>>> 
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>> On 16 Nov 2015, at 18:13, Konstantin Knauf 
>>>>>>> <konstantin.kn...@tngtech.com> wrote:
>>>>>>> 
>>>>>>> Hi Aljoscha,
>>>>>>> 
>>>>>>> I changed the TimestampExtractor to save the lastSeenTimestamp and only
>>>>>>> emit with getCurrentWatermark [1] as you suggested. So basically I do
>>>>>>> the opposite of before (only watermarks per event vs. only watermarks
>>>>>>> per auto-watermark). And now it works :). The question remains why it
>>>>>>> did not work before. As far as I can see, it is an issue with the first
>>>>>>> TimestampExtractor itself?!
>>>>>>> 
>>>>>>> Does getCurrentWatermark(..) somehow "overpower" the extracted 
>>>>>>> watermarks?
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> 
>>>>>>> Konstantin
>>>>>>> 
>>>>>>> [1]
>>>>>>> 
>>>>>>> final private long maxDelay;
>>>>>>> private long lastTimestamp = Long.MIN_VALUE;
>>>>>>> 
>>>>>>> public PojoTimestampExtractor(long maxDelay) {
>>>>>>>     this.maxDelay = maxDelay;
>>>>>>> }
>>>>>>> 
>>>>>>> @Override
>>>>>>> public long extractTimestamp(Pojo pojo, long l) {
>>>>>>>     lastTimestamp = pojo.getTime();
>>>>>>>     return pojo.getTime();
>>>>>>> }
>>>>>>> 
>>>>>>> @Override
>>>>>>> public long extractWatermark(Pojo pojo, long l) {
>>>>>>>     return Long.MIN_VALUE;
>>>>>>> }
>>>>>>> 
>>>>>>> @Override
>>>>>>> public long getCurrentWatermark() {
>>>>>>>     return lastTimestamp - maxDelay;
>>>>>>> }
>>>>>>> 
>>>>>>> 
>>>>>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
>>>>>>>> Hi,
>>>>>>>> yes, at your data-rate emitting a watermark for every element should 
>>>>>>>> not be a problem. It could become a problem with higher data-rates 
>>>>>>>> since the system can get overwhelmed if every element also generates a 
>>>>>>>> watermark. In that case I would suggest storing the latest 
>>>>>>>> element-timestamp in an internal field and only emitting in 
>>>>>>>> getCurrentWatermark(), since then the watermark interval can be 
>>>>>>>> tuned using the auto-watermark interval setting.
>>>>>>>> 
>>>>>>>> But that should not be the cause of the problem that you currently 
>>>>>>>> have. Would you maybe be willing to send me some (mock) example data 
>>>>>>>> and the code so that I can reproduce the problem and have a look at 
>>>>>>>> it? You can send it to aljoscha at apache.org.
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf 
>>>>>>>>> <konstantin.kn...@tngtech.com> wrote:
>>>>>>>>> 
>>>>>>>>> Hi Aljoscha,
>>>>>>>>> 
>>>>>>>>> ok, now I at least understand why it works with fromElements(...). For
>>>>>>>>> the rest I am not so sure.
>>>>>>>>> 
>>>>>>>>>> What this means in your case is that the watermark can only advance 
>>>>>>>>>> if a new element arrives, because only then is the watermark updated.
>>>>>>>>> 
>>>>>>>>> But new elements arrive all the time, about 50/s, or do you mean
>>>>>>>>> something else?
>>>>>>>>> 
>>>>>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an ok
>>>>>>>>> choice, if I understand the semantics correctly. It just affects
>>>>>>>>> watermarking in the absence of events, right?
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> 
>>>>>>>>> Konstantin
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>>>>>>>>> Hi,
>>>>>>>>>> it could be what Gyula mentioned. Let me first go a bit into how the 
>>>>>>>>>> TimestampExtractor works internally.
>>>>>>>>>> 
>>>>>>>>>> First, the timestamp extractor internally keeps the value of the 
>>>>>>>>>> last emitted watermark. Then, the semantics of the 
>>>>>>>>>> TimestampExtractor are as follows:
>>>>>>>>>> - the result of extractTimestamp is taken and it replaces the 
>>>>>>>>>> internal timestamp of the element
>>>>>>>>>> - if the result of extractWatermark is larger than the last 
>>>>>>>>>> watermark the new value is emitted as a watermark and the value is 
>>>>>>>>>> stored
>>>>>>>>>> - getCurrentWatermark is called on the specified auto-watermark 
>>>>>>>>>> interval, if the returned value is larger than the last watermark it 
>>>>>>>>>> is emitted and stored as last watermark
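
The three rules above can be modeled roughly as follows. This is a hypothetical, simplified sketch and not Flink's real classes: the Extractor interface, the Operator class, and perElementExtractor are invented for illustration, and an element is reduced to its event time (a bare long) instead of a Pojo.

```java
// Simplified model of the semantics described above; not Flink's real classes.
final class ExtractorSemanticsDemo {

    // Hypothetical stand-in for the three extractor methods in this thread.
    interface Extractor {
        long extractTimestamp(long elementTime);
        long extractWatermark(long elementTime);
        long getCurrentWatermark();
    }

    // Hypothetical operator wrapping an extractor and tracking the last watermark.
    static final class Operator {
        private final Extractor extractor;
        private long lastWatermark = Long.MIN_VALUE;

        Operator(Extractor extractor) {
            this.extractor = extractor;
        }

        // First two bullets: replace the element's timestamp, then maybe emit.
        long onElement(long elementTime) {
            long timestamp = extractor.extractTimestamp(elementTime);
            emitIfLarger(extractor.extractWatermark(elementTime));
            return timestamp;
        }

        // Third bullet: called once per auto-watermark interval.
        void onAutoWatermarkInterval() {
            emitIfLarger(extractor.getCurrentWatermark());
        }

        private void emitIfLarger(long candidate) {
            if (candidate > lastWatermark) {
                lastWatermark = candidate; // "emit" and remember the value
            }
        }

        long lastWatermark() {
            return lastWatermark;
        }
    }

    // Extractor shaped like the first PojoTimestampExtractor in this thread.
    static Extractor perElementExtractor(final long maxDelay) {
        return new Extractor() {
            @Override public long extractTimestamp(long t) { return t; }
            @Override public long extractWatermark(long t) { return t - maxDelay; }
            @Override public long getCurrentWatermark() { return Long.MIN_VALUE; }
        };
    }

    public static void main(String[] args) {
        Operator op = new Operator(perElementExtractor(6000L));
        op.onAutoWatermarkInterval();   // no elements yet: nothing is emitted
        op.onElement(1447666537260L);   // watermark advances
        op.onElement(1447666537000L);   // older element: watermark stays put
        System.out.println(op.lastWatermark());
    }
}
```

In this model an extractor that returns Long.MIN_VALUE from getCurrentWatermark() never advances the watermark on the periodic path, so progress depends entirely on arriving elements.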
>>>>>>>>>> 
>>>>>>>>>> What this means in your case is that the watermark can only advance 
>>>>>>>>>> if a new element arrives, because only then is the watermark updated.
>>>>>>>>>> 
>>>>>>>>>> The reason why you see results if you use fromElements is that the 
>>>>>>>>>> window-operator also emits all the windows that it currently has 
>>>>>>>>>> buffered if the program closes. This happens in the case of 
>>>>>>>>>> fromElements because only a finite number of elements is emitted, 
>>>>>>>>>> after which the source closes, thereby finishing the whole program.
>>>>>>>>>> 
>>>>>>>>>> Cheers,
>>>>>>>>>> Aljoscha
>>>>>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Could this part of the extractor be the problem, Aljoscha?
>>>>>>>>>>> 
>>>>>>>>>>> @Override
>>>>>>>>>>> public long getCurrentWatermark() {
>>>>>>>>>>>    return Long.MIN_VALUE;
>>>>>>>>>>> }
>>>>>>>>>>> 
>>>>>>>>>>> Gyula
>>>>>>>>>>> 
>>>>>>>>>>> Konstantin Knauf <konstantin.kn...@tngtech.com> wrote (on Mon, 
>>>>>>>>>>> 16 Nov 2015, 10:39):
>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>> 
>>>>>>>>>>> thanks for your answer. Yes I am using the same 
>>>>>>>>>>> TimestampExtractor-Class.
>>>>>>>>>>> 
>>>>>>>>>>> The timestamps look good to me. Here is an example.
>>>>>>>>>>> 
>>>>>>>>>>> {"time": 1447666537260, ...} And parsed: 
>>>>>>>>>>> 2015-11-16T10:35:37.260+01:00
>>>>>>>>>>> 
>>>>>>>>>>> The order now is
>>>>>>>>>>> 
>>>>>>>>>>> stream
>>>>>>>>>>> .map(dummyMapper)
>>>>>>>>>>> .assignTimestamps(...)
>>>>>>>>>>> .timeWindow(...)
>>>>>>>>>>> 
>>>>>>>>>>> Is there a way to print out the assigned timestamps after
>>>>>>>>>>> stream.assignTimestamps(...)?
>>>>>>>>>>> 
>>>>>>>>>>> Cheers,
>>>>>>>>>>> 
>>>>>>>>>>> Konstantin
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> are you also using the timestamp extractor when you are using 
>>>>>>>>>>>> env.fromCollection()?
>>>>>>>>>>>> 
>>>>>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source that 
>>>>>>>>>>>> just prints the element and forwards it, to see if the elements 
>>>>>>>>>>>> come with a good timestamp from Kafka?
>>>>>>>>>>>> 
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf 
>>>>>>>>>>>>> <konstantin.kn...@tngtech.com> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I am using a very simple TimestampExtractor like [1], which just
>>>>>>>>>>>>> extracts a millis timestamp from a POJO. In my streaming job, I 
>>>>>>>>>>>>> read in
>>>>>>>>>>>>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
>>>>>>>>>>>>>           parameterTool.getRequired("topic"),
>>>>>>>>>>>>>           new AvroPojoDeserializationSchema(),
>>>>>>>>>>>>>           parameterTool.getProperties()));
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I have timestamps enabled, the TimeCharacteristic is EventTime, 
>>>>>>>>>>>>> and the AutoWatermarkInterval is 500.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> The problem is, when I do something like:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>>>>>>>>>> .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>>>>>>>>>>>>> .sum(..)
>>>>>>>>>>>>> .print()
>>>>>>>>>>>>> 
>>>>>>>>>>>>> env.execute();
>>>>>>>>>>>>> 
>>>>>>>>>>>>> the windows never get triggered.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> If I use ProcessingTime it works.
>>>>>>>>>>>>> If I use env.fromCollection(...) instead of the KafkaSource it 
>>>>>>>>>>>>> works
>>>>>>>>>>>>> with EventTime, too.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Any ideas what I could be doing wrong are highly appreciated.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Konstantin
>>>>>>>>>>>>> 
>>>>>>>>>>>>> [1]:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> public class PojoTimestampExtractor implements 
>>>>>>>>>>>>> TimestampExtractor<Pojo> {
>>>>>>>>>>>>> 
>>>>>>>>>>>>> private final long maxDelay;
>>>>>>>>>>>>> 
>>>>>>>>>>>>> public PojoTimestampExtractor(long maxDelay) {
>>>>>>>>>>>>>   this.maxDelay = maxDelay;
>>>>>>>>>>>>> }
>>>>>>>>>>>>> 
>>>>>>>>>>>>> // Event time of the element, in epoch milliseconds.
>>>>>>>>>>>>> @Override
>>>>>>>>>>>>> public long extractTimestamp(Pojo pojo, long l) {
>>>>>>>>>>>>>   return pojo.getTime();
>>>>>>>>>>>>> }
>>>>>>>>>>>>> 
>>>>>>>>>>>>> // Per-element watermark: lags the element's timestamp by maxDelay.
>>>>>>>>>>>>> @Override
>>>>>>>>>>>>> public long extractWatermark(Pojo pojo, long l) {
>>>>>>>>>>>>>   return pojo.getTime() - maxDelay;
>>>>>>>>>>>>> }
>>>>>>>>>>>>> 
>>>>>>>>>>>>> // Periodic watermark: Long.MIN_VALUE never advances the watermark.
>>>>>>>>>>>>> @Override
>>>>>>>>>>>>> public long getCurrentWatermark() {
>>>>>>>>>>>>>   return Long.MIN_VALUE;
>>>>>>>>>>>>> }
>>>>>>>>>>>>> }
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> --
>>>>>>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>>>>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
