Hi, Bellow is my code 

 splitStream.select(duringTime + "")
                .map(new KeyMapFunc())
                .assignTimestampsAndWatermarks(new DelaySaltWatermarks())
                .setParallelism(300)
                .keyBy(_SQL, _KEY, _SALT)
               
.window(TumblingEventTimeWindows.of(Time.seconds(duringTime/10)))
                .apply(new WindowSaltFunc())
                .keyBy(_SQL, _KEY)
               
.window(TumblingEventTimeWindows.of(Time.seconds(duringTime)))
                .apply(new WindowFunc())
                .addSink(new FlinkKafkaProducer010<>("topic", new
SimpleSerializationSchema(), this.properties));

and 

public class DelaySaltWatermarks implements
AssignerWithPeriodicWatermarks<ContentMessage> {

    private long currentMaxTimestamp;

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDER);
    }

    @Override
    public long extractTimestamp(ContentMessage contentMessage, long l) {
        long timestamp = contentMessage.getTimestamp();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }
}

and when i changed the Parallelism(300) of assigntimestampandwatermarks ,
the window can be fired.

thanks,
aitozi


Aljoscha Krettek wrote
> Hi,
> 
> So I understood that you have roughly this pipeline:
> 
> Input 1 --\
>            |- CoFlatMap - TimestampAndWatermarkAssigner - KeyBy - Window    
> Input 2 --/
> 
> If the timestamp assigner is after the CoFlatMap the processInput() method
> of the extractor should still be called. Not by the StreamInputProcessor
> but by ChainingOutput [1], which basically connects the Two-Input
> CoFlatMap to the one-input operator that comes after that. The could still
> be a bug in there somewhere, however.
> 
> Could you maybe send me the relevant parts of your code, so that I can
> have a look. Or provide a minimal example.
> 
> Best,
> Aljoscha
> 
> [1]
> https://github.com/apache/flink/blob/6f5fa7f741538207244368c275bee9958c43a25a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L394
> 
>> On 7. Aug 2017, at 19:21, aitozi &lt;

> gjying1314@

> &gt; wrote:
>> 
>> 
>> Hi,
>> 
>> my flink version is 1.2
>> 
>> i am work on this problem these days. Below is my found.
>> 
>> when i use "assignTimestampsAndWatermark" with same parallelism as 240 as
>> the before operator, the before operator has two input(it is a
>> "connected"
>> Co-FlatMap operator with parallelism 240), it runs into that the
>> watermark
>> didn't update.
>> 
>> the i look into the source code, that the
>> StreamTwoInputProcessor.java#processInput called by TwoInputStreamTask
>> has
>> method with processElement1() and processElement2() method, but all of
>> them
>> do not run processElement in StreamInputProcessor to
>> extractTimestamp(shown
>> in TimestampsAndPeriodicWatermarksOperator)
>> 
>> so that, the timestamp is not update, and my waterMark is update just
>> like
>> the class BoundedOutOfOrdernessTimestampExtractor .
>> 
>> So, is it a bug that the timestamp is not update when deal with a two
>> input
>> stream.
>> 
>> Ps: my English is not very good , i dont know can you understand me :)
>> 
>> thanks,
>> aitozi
>> 
>> 
>> 
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WaterMark-Eventwindow-not-fired-correctly-tp14668p14727.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WaterMark-Eventwindow-not-fired-correctly-tp14668p14753.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to