Hi Soheil,

Hequn has given you the usage of this method, see here :
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L639

Thanks, vino.

2018-07-31 17:56 GMT+08:00 Soheil Pourbafrani <soheil.i...@gmail.com>:

> Hi vino,
>
> Could you please show markAsTemporary usage by a simple example?
> Thanks
>
> On Tue, Jul 31, 2018 at 2:10 PM, vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Soheil,
>>
>> The documentation of markAsTemporarilyIdle method is here :
>> https://ci.apache.org/projects/flink/flink-docs-release-1.
>> 5/api/java/org/apache/flink/streaming/api/functions/
>> source/SourceFunction.SourceContext.html#markAsTemporarilyIdle--
>>
>> Thanks, vino.
>>
>> 2018-07-31 17:14 GMT+08:00 Hequn Cheng <chenghe...@gmail.com>:
>>
>>> Hi Soheil,
>>>
>>> You can set parallelism to 1 to solve the problem.
>>> Or use markAsTemporarilyIdle() as Fabian said(the link maybe is
>>> https://github.com/apache/flink/blob/master/flink-connectors
>>> /flink-connector-kafka-base/src/main/java/org/apache/flink/
>>> streaming/connectors/kafka/FlinkKafkaConsumerBase.java line639).
>>>
>>> On Tue, Jul 31, 2018 at 4:51 PM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> If you are using a custom source, you can call
>>>> SourceContext.markAsTemporarilyIdle() to indicate that a task is
>>>> currently not producing new records [1].
>>>>
>>>> Best, Fabian
>>>>
>>>> 2018-07-31 8:50 GMT+02:00 Reza Sameei <reza.sa...@gmail.com>:
>>>>
>>>>> It's not a real solution; but why you don't change the parallelism for
>>>>> your `SourceFunction`?
>>>>>
>>>>> On Tue, Jul 31, 2018 at 10:51 AM Soheil Pourbafrani <
>>>>> soheil.i...@gmail.com> wrote:
>>>>>
>>>>>> In Flink Event time mode, I use the periodic watermark to advance
>>>>>> event time. Every slot extract event time from the incoming message and 
>>>>>> to
>>>>>> emit watermark, subtract it a network delay, say 3000ms.
>>>>>>
>>>>>> public Watermark getCurrentWatermark() {
>>>>>>             return new Watermark(MAX_TIMESTAMP - DELEY);
>>>>>>         }
>>>>>>
>>>>>> I have 4 active slots. The problem is just two slots get incoming
>>>>>> data but all of them call the method getCurrentWatermark(). So in
>>>>>> this situation consider a case that thread 1 and 2 get incoming data and
>>>>>> thread 3 and 4 will not.
>>>>>>
>>>>>> Thread-1-watermark ---> 1541217659806
>>>>>> Thread-2-watermark ---> 1541217659810
>>>>>> Thread-3-watermark ---> (0 - 3000 = -3000)
>>>>>> Thread-4-watermark ---> (0 - 3000 = -3000)
>>>>>>
>>>>>> So as Flink set the lowest watermark as the general watermark, time
>>>>>> doesn't go on! If I change the getCurrentWatermark() method as:
>>>>>>
>>>>>> public Watermark getCurrentWatermark() {
>>>>>>             return new Watermark(System.currentTimeMillis() - DELEY);
>>>>>>         }
>>>>>>
>>>>>> it will solve the problem, but I don't want to use machine's
>>>>>> timestamp! How can I fix the problem?
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> رضا سامعی  | Reza Sameei | Software Developer | 09126662695
>>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to