Hi Soheil, Hequn has given you the usage of this method, see here : https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L639
Thanks, vino. 2018-07-31 17:56 GMT+08:00 Soheil Pourbafrani <soheil.i...@gmail.com>: > Hi vino, > > Could you please show markAsTemporary usage by a simple example? > Thanks > > On Tue, Jul 31, 2018 at 2:10 PM, vino yang <yanghua1...@gmail.com> wrote: > >> Hi Soheil, >> >> The documentation of markAsTemporarilyIdle method is here : >> https://ci.apache.org/projects/flink/flink-docs-release-1. >> 5/api/java/org/apache/flink/streaming/api/functions/ >> source/SourceFunction.SourceContext.html#markAsTemporarilyIdle-- >> >> Thanks, vino. >> >> 2018-07-31 17:14 GMT+08:00 Hequn Cheng <chenghe...@gmail.com>: >> >>> Hi Soheil, >>> >>> You can set parallelism to 1 to solve the problem. >>> Or use markAsTemporarilyIdle() as Fabian said(the link maybe is >>> https://github.com/apache/flink/blob/master/flink-connectors >>> /flink-connector-kafka-base/src/main/java/org/apache/flink/ >>> streaming/connectors/kafka/FlinkKafkaConsumerBase.java line639). >>> >>> On Tue, Jul 31, 2018 at 4:51 PM, Fabian Hueske <fhue...@gmail.com> >>> wrote: >>> >>>> Hi, >>>> >>>> If you are using a custom source, you can call >>>> SourceContext.markAsTemporarilyIdle() to indicate that a task is >>>> currently not producing new records [1]. >>>> >>>> Best, Fabian >>>> >>>> 2018-07-31 8:50 GMT+02:00 Reza Sameei <reza.sa...@gmail.com>: >>>> >>>>> It's not a real solution; but why you don't change the parallelism for >>>>> your `SourceFunction`? >>>>> >>>>> On Tue, Jul 31, 2018 at 10:51 AM Soheil Pourbafrani < >>>>> soheil.i...@gmail.com> wrote: >>>>> >>>>>> In Flink Event time mode, I use the periodic watermark to advance >>>>>> event time. Every slot extract event time from the incoming message and >>>>>> to >>>>>> emit watermark, subtract it a network delay, say 3000ms. >>>>>> >>>>>> public Watermark getCurrentWatermark() { >>>>>> return new Watermark(MAX_TIMESTAMP - DELEY); >>>>>> } >>>>>> >>>>>> I have 4 active slots. The problem is just two slots get incoming >>>>>> data but all of them call the method getCurrentWatermark(). So in >>>>>> this situation consider a case that thread 1 and 2 get incoming data and >>>>>> thread 3 and 4 will not. >>>>>> >>>>>> Thread-1-watermark ---> 1541217659806 >>>>>> Thread-2-watermark ---> 1541217659810 >>>>>> Thread-3-watermark ---> (0 - 3000 = -3000) >>>>>> Thread-4-watermark ---> (0 - 3000 = -3000) >>>>>> >>>>>> So as Flink set the lowest watermark as the general watermark, time >>>>>> doesn't go on! If I change the getCurrentWatermark() method as: >>>>>> >>>>>> public Watermark getCurrentWatermark() { >>>>>> return new Watermark(System.currentTimeMillis() - DELEY); >>>>>> } >>>>>> >>>>>> it will solve the problem, but I don't want to use machine's >>>>>> timestamp! How can I fix the problem? >>>>>> >>>>>> >>>>> >>>>> -- >>>>> رضا سامعی | Reza Sameei | Software Developer | 09126662695 >>>>> >>>> >>>> >>> >> >