Thanks a lot. Calling env.setParallelism(1) before defining the source works (I had set it just before env.execute, so it did not take effect).
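To make the failure mode in this thread concrete: an event-time window [start, end) fires only once the operator's watermark reaches the window end, and with watermark = maxEventTime - lag the watermark always trails the newest record. A minimal plain-Java sketch (illustrative names, not Flink API), using the numbers from this thread:

```java
// Illustrative sketch (not Flink API): an event-time window [start, end)
// fires only when the operator's watermark has reached the window end.
class WindowFiring {
    // Watermark as generated in this thread: max event time minus a lag.
    static long watermark(long maxEventTime, long lagMillis) {
        long wm = maxEventTime - lagMillis;
        return wm < 0 ? 0 : wm;
    }

    // A window [end - size, end) fires once the watermark passes `end`.
    static boolean fires(long watermark, long windowEnd) {
        return watermark >= windowEnd;
    }

    public static void main(String[] args) {
        long lag = 3600 * 1000L;             // 1 hour, as in the emitter below
        long maxSeen = 1_520_059_217_832L;   // newest test record
        long windowEnd = 1_520_059_220_000L; // end of its 10-second window
        // All 10000 test records sit inside this one window, so the
        // watermark stays an hour behind and the window never fires:
        System.out.println(fires(watermark(maxSeen, lag), windowEnd)); // false
    }
}
```

With all records inside one 10-second window and a 1-hour lag, the watermark cannot pass the window end until much newer data arrives.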
> On 3 Mar 2018, at 16:02, sundy <543950...@qq.com> wrote:
>
> Hi Hequn Cheng,
>
> Finally I got the problem and find the way to define the correct WaterMark by
> your advice, thank you very much.
>
> The problem is that I set the watermark to
>
> waterMark = maxEventTime - lag
>
> The time window is 10 seconds, but I generated the test records so quickly
> that all 10000 records fall within one window duration (my bad).
> So Flink was waiting for more records to close the window.
>
> One more question: why, when I set 'env.setParallelism(1)' and run the code
> in IDEA (mini Flink cluster), is getCurrentWatermark called from 4 different
> threads?
> When is getCurrentWatermark called? After the keyBy operation, or after the
> source operation?
>
>
>
>> On 3 Mar 2018, at 15:28, Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>> Hi sundy,
>>
>> The default parallelism is 4. It seems that your configuration did not take
>> effect. You can try 'env.setParallelism(1)' to set the job parallelism.
>> For watermark, you can refer to [1] and [2].
>>
>> PS: Others cannot see your reply because you replied only to me. Try
>> replying to all so that others can help you too :-)
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
>>
>> On Sat, Mar 3, 2018 at 3:03 PM, sundy <543950...@qq.com> wrote:
>>
>> Hi Hequn Cheng,
>>
>> Thanks for your advice; I think I found the problem, but I don't know why
>> it happens.
>>
>> First, let me describe my setup. I run the code in IDEA with the mini Flink
>> cluster, I set the env parallelism to 1, and the Kafka topic 'stream_test'
>> has one partition. Then I send 10000 records with the current timestamp to
>> Kafka; the format is '4281_783_1520059217832', and the last field is the
>> eventTime.
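As an aside, here is a tiny parser for these test records. The thread only states that the last underscore-separated field is the event time, so how the remaining fields map onto BitRate is left out as an assumption:

```java
// Hypothetical parser for the test records ("4281_783_1520059217832").
// Only the last field is documented in the thread (the event time in
// epoch milliseconds); the other fields are treated as opaque here.
class RecordParser {
    static long eventTimeOf(String record) {
        String[] parts = record.split("_");
        return Long.parseLong(parts[parts.length - 1]);
    }

    public static void main(String[] args) {
        System.out.println(eventTimeOf("4281_783_1520059217832")); // 1520059217832
    }
}
```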
>>
>> I added this debug code to print the thread id:
>>
>> [attachment: PastedGraphic-1.png — screenshot of the debug code]
>>
>>
>> The result is that it prints from 4 threads in each period (3 seconds),
>> such as:
>>
>> 50:water mark:0
>> 52:water mark:0
>> 51:water mark:1520056427871
>> 49:water mark:0
>>
>> So this results in a watermark of 0. But why does this happen with
>> parallelism 1? Maybe it is caused by the keyBy operation? I am new to Flink
>> and would like to know the right way to set the watermark.
>>
>>
>>
>>
>>
>>> On 3 Mar 2018, at 13:52, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>
>>> Hi sundy,
>>>
>>> 1. Some partitions of your input Kafka topic may have no data. Since an
>>> operator's watermark is the minimum over all of its inputs, if one input
>>> delivers no data, the window will never be triggered. You can set the
>>> parallelism of your job to 1 to avoid this problem (PS: maybe this issue
>>> is fixed now, but it is worth a try).
>>> 2. There is only one record in the input. In this case the window cannot
>>> be triggered either; you can think of it as time having stopped. To
>>> trigger the window, you should read more data with watermarks larger than
>>> the window end.
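Point 1 can be sketched in plain Java (an illustration, not Flink's actual implementation): a downstream operator's event-time clock is the minimum watermark across all of its input channels, so a single idle channel pins it at 0:

```java
import java.util.Arrays;

// Illustration only (not Flink's actual code): a downstream operator's
// event-time clock is the minimum watermark across all input channels,
// so one channel without data holds the whole window back.
class MinWatermark {
    static long operatorWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // The four per-thread values reported in this thread:
        long[] channels = {0L, 0L, 1_520_056_427_871L, 0L};
        System.out.println(operatorWatermark(channels)); // 0
    }
}
```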
>>>
>>> Hope it helps you.
>>> Best, Hequn
>>>
>>> 2018-03-03 13:06 GMT+08:00 sundy <543950...@qq.com>:
>>> Hi, thanks for your reply.
>>>
>>> I searched Stack Overflow, and someone there has the same problem:
>>>
>>> https://stackoverflow.com/questions/40993753/flink-streaming-program-runs-correctly-with-processing-time-but-will-not-produc
>>>
>>>
>>> Following your advice, I tried this code:
>>>
>>> env.getConfig().setAutoWatermarkInterval(3 * 1000);
>>>
>>> It calls the getCurrentWatermark function every 3 seconds, but still no
>>> results come out.
>>> From the output ('water mark1520049229163') I can see that the add method
>>> is called, but there is no result from the sink function.
>>>
>>>
>>>
>>>
>>>> On 3 Mar 2018, at 12:47, Xingcan Cui <xingc...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> for periodically generated watermarks, you should use
>>>> `ExecutionConfig.setAutoWatermarkInterval()` to set an interval.
>>>>
>>>> Hope that helps.
>>>>
>>>> Best,
>>>> Xingcan
>>>>
>>>>> On 3 Mar 2018, at 12:08 PM, sundy <543950...@qq.com> wrote:
>>>>>
>>>>>
>>>>>
>>>>> Hi, I ran into a problem with Flink and need your help.
>>>>>
>>>>> I tried to use TimeCharacteristic.EventTime, but the sink function is
>>>>> never executed.
>>>>>
>>>>> public class StreamingJob {
>>>>>   public static void main(String[] args) throws Exception {
>>>>>     // set up the streaming execution environment
>>>>>     final StreamExecutionEnvironment env =
>>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>     ObjectMapper jsonMapper = new ObjectMapper();
>>>>>
>>>>>     Properties properties = new Properties();
>>>>>     // String brokers = "172.27.138.8:9092";
>>>>>     String brokers = "localhost:9092";
>>>>>     properties.setProperty("bootstrap.servers", brokers);
>>>>>     properties.setProperty("group.id", "test_fink");
>>>>>     String topic = "stream_test";
>>>>>
>>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>     FlinkKafkaConsumer010<BitRate> myConsumer =
>>>>>         new FlinkKafkaConsumer010<>(topic,
>>>>>             new BitRate.BitRateDeserializtionSchema(), properties);
>>>>>
>>>>>     DataStream<BitRate> stream = env.addSource(myConsumer)
>>>>>         .assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
>>>>>     DataStream<BitRate> reduceItems = stream
>>>>>         .keyBy(a -> a.gameId)
>>>>>         .timeWindow(Time.seconds(10))
>>>>>         .reduce((a, b) -> a.add(b));
>>>>>
>>>>>     reduceItems.addSink(new FlinkKafkaProducer010<>(brokers,
>>>>>         "stream_sink", (tuple) -> {
>>>>>           try {
>>>>>             tuple.end();
>>>>>             System.out.println(tuple.rate + "\t" + tuple.user);
>>>>>             return jsonMapper.writeValueAsBytes(tuple);
>>>>>           } catch (JsonProcessingException e) {
>>>>>             e.printStackTrace();
>>>>>             return "".getBytes();
>>>>>           }
>>>>>         }));
>>>>>
>>>>>     env.execute("Flink Streaming Java API Skeleton");
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>> Here is the CustomWatermarkEmitter. I tried increasing the lag, but it
>>>>> did not help.
>>>>>
>>>>> public class CustomWatermarkEmitter implements
>>>>>     AssignerWithPeriodicWatermarks<BitRate> {
>>>>>
>>>>>   private long currentMax = 0;
>>>>>   private long lag = 3600 * 1000; // did not help, even with a very big lag
>>>>>
>>>>>   @Nullable
>>>>>   @Override
>>>>>   public Watermark getCurrentWatermark() {
>>>>>     long atLeastTime = currentMax - lag;
>>>>>     System.out.println("water mark" + atLeastTime);
>>>>>     return new Watermark(atLeastTime < 0 ? 0 : atLeastTime);
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public long extractTimestamp(BitRate bitRate, long l) {
>>>>>     currentMax = Long.max(bitRate.eventTime, currentMax);
>>>>>     return bitRate.eventTime;
>>>>>   }
>>>>> }
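Stripped of the Flink types, the emitter's logic reduces to the following framework-free restatement (for illustration only). It shows why a 1-hour lag keeps the watermark an hour behind the newest event, so a window cannot fire until some record with a timestamp at least `lag` past the window end has been read:

```java
// Framework-free restatement of CustomWatermarkEmitter's logic, for
// illustration: the watermark trails the largest timestamp seen by
// `lag`, so a window [start, end) cannot fire until a record with
// timestamp >= end + lag has been read.
class PeriodicWatermark {
    private long currentMax = 0;
    private final long lag;

    PeriodicWatermark(long lagMillis) { this.lag = lagMillis; }

    void onRecord(long eventTime) {
        currentMax = Long.max(eventTime, currentMax);
    }

    long currentWatermark() {
        long atLeastTime = currentMax - lag;
        return atLeastTime < 0 ? 0 : atLeastTime;
    }

    public static void main(String[] args) {
        PeriodicWatermark gen = new PeriodicWatermark(3600 * 1000L);
        gen.onRecord(1_520_047_769_115L); // the sample log's event time
        // The watermark stays a full hour behind the newest event:
        System.out.println(gen.currentWatermark()); // 1520044169115
    }
}
```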
>>>>>
>>>>>
>>>>> Here is the entity BitRate. The logs are generated in time; a sample log
>>>>> is `4281_783_1520047769115`.
>>>>>
>>>>>
>>>>> public BitRate(long eventTime, long gameId, long rate, long user) {
>>>>>   this.eventTime = eventTime;
>>>>>   this.gameId = gameId;
>>>>>   this.rate = rate;
>>>>>   this.user = user;
>>>>>   this.startTs = System.currentTimeMillis();
>>>>>   this.type = 0;
>>>>> }
>>>>>
>>>>> public void end() {
>>>>>   this.endTs = System.currentTimeMillis();
>>>>> }
>>>>>
>>>>> public BitRate add(BitRate b) {
>>>>>   System.out.println("Add:" + b.rate);
>>>>>   this.rate += b.rate;
>>>>>   this.user += b.user;
>>>>>   return this;
>>>>> }
>>>>>
>>>>
>>>
>>>
>>
>>
>