Thanks a lot. Calling env.setParallelism(1) before defining the source works (I had 
set it just before env.execute, so it did not take effect).
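
For anyone who finds this thread later, here is why I think the four threads kept the watermark at 0, as a plain-Java sketch with no Flink dependency. It assumes, as Hequn explains further down, that a window operator takes the minimum watermark over all of its inputs. The class and method names are made up for illustration; the values are the ones from my debug output.

```java
import java.util.Arrays;

// Plain-Java sketch, no Flink dependency. All names here are hypothetical.
class MinWatermarkSketch {

    // Assumption from the thread: an operator with several inputs uses the
    // minimum watermark across all of them as its own event-time clock.
    static long combinedWatermark(long[] perInputWatermarks) {
        return Arrays.stream(perInputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Four assigner instances; only one sees data from the single partition.
        long[] fourInstances = {0L, 0L, 1520056427871L, 0L};
        // One assigner instance that sees all the data.
        long[] oneInstance = {1520056427871L};

        System.out.println("parallelism 4: " + combinedWatermark(fourInstances));
        System.out.println("parallelism 1: " + combinedWatermark(oneInstance));
    }
}
```

With four instances the combined value stays at 0, so no window end is ever reached; with one instance it follows the data.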
 
> On 3 Mar 2018, at 16:02, sundy <543950...@qq.com> wrote:
> 
> Hi Hequn Cheng,
> 
> I finally found the problem and, following your advice, a way to define the 
> watermark correctly. Thank you very much.
> 
> The problem was that I set the watermark to
> 
>     waterMark = maxEventTime - lag
> 
> and the timeWindow is 10 seconds, but I generated the test records so 
> quickly that all 10000 records fell within a single window (my bad). 
> So Flink was waiting for newer records to close the window.
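
A rough plain-Java sketch of that situation, with no Flink dependency. The class and method names, the 1-second lag, and the 2-second burst are assumptions for illustration. It also assumes that a tumbling event-time window fires once the watermark reaches the window's last millisecond, which is my understanding of Flink's event-time trigger.

```java
// Plain-Java sketch of the firing rule, no Flink dependency.
// All names and the lag/burst values are hypothetical.
class WindowFiringSketch {
    static final long WINDOW_MS = 10_000L;

    // Start of the 10-second tumbling window that contains the timestamp.
    static long windowStart(long timestamp) {
        return timestamp - (timestamp % WINDOW_MS);
    }

    // waterMark = maxEventTime - lag, clamped at 0 as in CustomWatermarkEmitter.
    static long watermark(long maxEventTime, long lag) {
        return Math.max(0L, maxEventTime - lag);
    }

    // Assumed rule: the window [start, start + size) fires once the
    // watermark reaches its last millisecond.
    static boolean fires(long windowStart, long watermark) {
        return watermark >= windowStart + WINDOW_MS - 1;
    }

    public static void main(String[] args) {
        long lag = 1_000L;                 // assumed lag
        long first = 1520059217832L;       // sample event time from the thread
        long start = windowStart(first);

        // All records arrive in a short burst, still inside the same window.
        long lastInBurst = first + 2_000L;
        System.out.println(fires(start, watermark(lastInBurst, lag)));  // false

        // A later record, past the window end plus the lag, closes the window.
        long later = start + WINDOW_MS + lag;
        System.out.println(fires(start, watermark(later, lag)));        // true
    }
}
```

As long as no record arrives with an event time beyond the window end plus the lag, the watermark never reaches the window end and the reduce result is never emitted.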
> 
> One more question: why, when I set 'env.setParallelism(1)' and run the code 
> in IDEA (mini Flink cluster), is getCurrentWatermark called in 4 different 
> threads?
> When is the getCurrentWatermark function called? After the keyBy operation or 
> after the source operation?
> 
> 
> 
>> On 3 Mar 2018, at 15:28, Hequn Cheng <chenghe...@gmail.com> wrote:
>> 
>> Hi sundy,
>> 
>> The default parallelism is 4. It seems that your configuration does not take 
>> effect. You can try 'env.setParallelism(1)' to set the job parallelism.
>> For watermark, you can refer to [1] and [2].
>> 
>> PS: Others cannot see your reply because you replied only to me. Try replying to 
>> all so that others can help you too :-) 
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
>> 
>> On Sat, Mar 3, 2018 at 3:03 PM, sundy <543950...@qq.com> wrote:
>> 
>> Hi Hequn Cheng,
>> 
>> Thanks for your advice. I think I found the problem, but I don't know why it happens.
>> 
>> First, let me describe my setup. I run the code in IDEA with a 
>> mini Flink cluster, I set the env parallelism to 1, and the Kafka 
>> topic 'stream_test' has one partition. I then send 10000 records with the current 
>> timestamp to Kafka. The format is '4281_783_1520059217832', where the last field 
>> is the eventTime.
>> 
>> I added this debug code to print the thread ID.
>> 
>> <PastedGraphic-1.png>
>> 
>> 
>> The result is that it prints from 4 threads in each period (3 
>> seconds), such as
>> 
>> 50:water mark:0
>> 52:water mark:0
>> 51:water mark:1520056427871
>> 49:water mark:0
>> 
>> So the resulting watermark is 0. But why does this happen with a parallelism of 1? 
>> Maybe it is caused by the keyBy operation? I am new to Flink and would like to 
>> know how to set the watermark the right way.
>> 
>> 
>>   
>>  
>> 
>>> On 3 Mar 2018, at 13:52, Hequn Cheng <chenghe...@gmail.com> wrote:
>>> 
>>> Hi sundy,
>>> 
>>> 1. Some partitions of your input Kafka topic don't have data. Since the window 
>>> watermark is the minimum over all of its inputs, if there is no data from 
>>> one of its inputs, the window will never be triggered. You can set the parallelism 
>>> of your job to 1 to avoid this problem (PS: maybe this bug is fixed now, but 
>>> it is worth a try).
>>> 2. There is only one record in the input. In this case, the window cannot be triggered 
>>> either. You can think of it as time having stopped. To trigger the 
>>> window, you need to read more data with a watermark greater than the window end.
>>> 
>>> Hope it helps you.
>>> Best, Hequn
>>> 
>>> 2018-03-03 13:06 GMT+08:00 sundy <543950...@qq.com>:
>>> Hi, thanks for your reply. 
>>> 
>>> I searched on Stack Overflow, and someone there has the same 
>>> problem.
>>> 
>>> https://stackoverflow.com/questions/40993753/flink-streaming-program-runs-correctly-with-processing-time-but-will-not-produc
>>> 
>>> 
>>> Following your advice, I tried this code:
>>> 
>>>  env.getConfig().setAutoWatermarkInterval(3 * 1000);
>>> 
>>> It now calls the getCurrentWatermark function every 3 seconds, but still no 
>>> result comes out.
>>> From the output ('water mark1520049229163') I can see that the add 
>>> method is called, but there is no result from the sink function.
>>> 
>>> 
>>> 
>>> 
>>>> On 3 Mar 2018, at 12:47, Xingcan Cui <xingc...@gmail.com> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> for periodically generated watermarks, you should use 
>>>> `ExecutionConfig.setAutoWatermarkInterval()` to set an interval.
>>>> 
>>>> Hope that helps.
>>>> 
>>>> Best,
>>>> Xingcan
>>>> 
>>>>> On 3 Mar 2018, at 12:08 PM, sundy <543950...@qq.com> wrote:
>>>>> 
>>>>> 
>>>>> 
>>>>> Hi, I have a problem with Flink and need your help.
>>>>> 
>>>>> I tried to use TimeCharacteristic.EventTime, but the sink function is never 
>>>>> executed.
>>>>> 
>>>>> public class StreamingJob {
>>>>>   public static void main(String[] args) throws Exception {
>>>>>     // set up the streaming execution environment
>>>>>     final StreamExecutionEnvironment env =
>>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>     ObjectMapper jsonMapper = new ObjectMapper();
>>>>> 
>>>>>     Properties properties = new Properties();
>>>>> //    String brokers = "172.27.138.8:9092";
>>>>>     String brokers = "localhost:9092";
>>>>>     properties.setProperty("bootstrap.servers", brokers);
>>>>>     properties.setProperty("group.id", "test_fink");
>>>>>     String topic = "stream_test";
>>>>> 
>>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>     FlinkKafkaConsumer010<BitRate> myConsumer =
>>>>>         new FlinkKafkaConsumer010<>(topic,
>>>>>             new BitRate.BitRateDeserializtionSchema(), properties);
>>>>> 
>>>>>     DataStream<BitRate> stream = env.addSource(myConsumer)
>>>>>         .assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
>>>>>     DataStream<BitRate>
>>>>>         reduceItems =
>>>>>         stream
>>>>>             .keyBy(a -> a.gameId)
>>>>>             .timeWindow(Time.seconds(10))
>>>>>             .reduce((a, b) -> a.add(b));
>>>>> 
>>>>>     reduceItems.addSink(new FlinkKafkaProducer010<>(brokers,
>>>>>         "stream_sink", (tuple) -> {
>>>>>       try {
>>>>>         tuple.end();
>>>>>         System.out.println(tuple.rate + "\t" + tuple.user);
>>>>>         return jsonMapper.writeValueAsBytes(tuple);
>>>>>       } catch (JsonProcessingException e) {
>>>>>         e.printStackTrace();
>>>>>         return "".getBytes();
>>>>>       }
>>>>>     }));
>>>>> 
>>>>>     env.execute("Flink Streaming Java API Skeleton");
>>>>>   }
>>>>> 
>>>>> }
>>>>> 
>>>>> 
>>>>> Here is the CustomWatermarkEmitter. I tried increasing the lag, 
>>>>> but it did not work.
>>>>> 
>>>>> public class CustomWatermarkEmitter
>>>>>     implements AssignerWithPeriodicWatermarks<BitRate> {
>>>>> 
>>>>>   private long currentMax = 0;
>>>>>   private long lag = 3600 * 1000; // did not work, even though the lag is very big
>>>>> 
>>>>>   @Nullable
>>>>>   @Override
>>>>>   public Watermark getCurrentWatermark() {
>>>>>     long atLeastTime = currentMax - lag;
>>>>>     System.out.println("water mark" + atLeastTime);
>>>>>     return new Watermark(atLeastTime < 0 ? 0 : atLeastTime);
>>>>>   }
>>>>> 
>>>>>   @Override
>>>>>   public long extractTimestamp(BitRate bitRate, long l) {
>>>>>     currentMax = Long.max(bitRate.eventTime, currentMax);
>>>>>     return bitRate.eventTime;
>>>>>   }
>>>>> }
>>>>> 
>>>>> 
>>>>> Here is the entity BitRate. The logs are generated with the current timestamp; a sample log is 
>>>>> `4281_783_1520047769115`
>>>>> 
>>>>> 
>>>>> public BitRate(long eventTime, long gameId, long rate, long user) {
>>>>>   this.eventTime = eventTime;
>>>>> 
>>>>>   this.gameId = gameId;
>>>>>   this.rate = rate;
>>>>>   this.user = user;
>>>>>   this.startTs = System.currentTimeMillis();
>>>>>   this.type = 0;
>>>>> }
>>>>> 
>>>>> public void end() {
>>>>>   this.endTs = System.currentTimeMillis();
>>>>> }
>>>>> 
>>>>> public BitRate add(BitRate b) {
>>>>>   System.out.println("Add:" + b.rate);
>>>>>   this.rate += b.rate;
>>>>>   this.user += b.user;
>>>>>   return this;
>>>>> }
>>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 
