Hi sundy,

It is strange that your configuration does not take effect. Do you set the parallelism somewhere else? You could refer to the Kafka test case [1]: in that test, line 229 sets the parallelism to 1, and it works fine.

Hope it helps you.
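To illustrate why the number of assigner instances matters, here is a rough sketch in plain Java (not Flink code; the class and method names are made up for illustration). It assumes the rule from the watermark documentation that a downstream operator's event time is the minimum of the watermarks on all of its inputs, and it uses the four values from the debug output quoted further down in this thread.

```java
import java.util.Arrays;

/** Illustrative only: mimics how a downstream operator combines watermarks from parallel inputs. */
class MinWatermarkSketch {

    /** The operator's event-time clock is the minimum of the latest watermark on each input. */
    static long combinedWatermark(long[] perInputWatermarks) {
        return Arrays.stream(perInputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Values printed by the four threads in the quoted debug output.
        long[] fourAssigners = {0L, 0L, 1520056427871L, 0L};
        // A single assigner instance, as expected with parallelism 1.
        long[] oneAssigner = {1520056427871L};

        System.out.println("4 inputs -> " + combinedWatermark(fourAssigners)); // 4 inputs -> 0
        System.out.println("1 input  -> " + combinedWatermark(oneAssigner));   // 1 input  -> 1520056427871
    }
}
```

With four instances, three of them never see a record, so the combined watermark stays at 0 no matter how far the fourth one advances.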
[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java

On Sat, Mar 3, 2018 at 4:02 PM, sundy <543950...@qq.com> wrote:

> Hi Hequn Cheng,
>
> Finally I got the problem and found the way to define the correct watermark
> with your advice, thank you very much.
>
> The problem is that I set the watermark to
>
> waterMark = maxEventTime - lag
>
> and the timeWindow is 10 seconds, but I generated the test records too
> quickly, so the 10000 records all fall within one window duration (my bad).
> So Flink is waiting for more records to close the window.
>
> Another question: why, when I set 'env.setParallelism(1)' and run the
> code in IDEA (mini Flink cluster), is getCurrentWatermark called in 4
> different threads?
> When is the getCurrentWatermark function called? After the keyBy operation
> or after the source operation?
>
> On 3 Mar 2018, at 15:28, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi sundy,
>
> The default parallelism is 4 here (in a local run it defaults to the
> number of CPU cores). It seems that your configuration does not
> take effect. You can try 'env.setParallelism(1)' to set the job
> parallelism.
> For watermarks, you can refer to [1] and [2].
>
> PS: Others cannot see your reply because you only replied to me. Try reply
> to all so that others can help you too :-)
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
>
> On Sat, Mar 3, 2018 at 3:03 PM, sundy <543950...@qq.com> wrote:
>
>> Hi Hequn Cheng,
>>
>> Thanks for your advice, I think I found the problem. But I don't know why.
>>
>> First, let me describe what I did. I run the code in IDEA with
>> a mini Flink cluster, I set the env parallelism to 1, and the Kafka
>> topic 'stream_test' has one partition.
>> Then I send 10000 records with the
>> current timestamp to Kafka. The format is '4281_783_1520059217832'; the
>> last field is the eventTime.
>>
>> I added this debug code to print the thread id.
>>
>> <PastedGraphic-1.png>
>>
>> The result is that it prints in 4 threads in each period (3
>> seconds), such as:
>>
>> 50:water mark:0
>> 52:water mark:0
>> 51:water mark:1520056427871
>> 49:water mark:0
>>
>> So this results in the watermark being 0. But why does it happen with
>> parallelism 1? Maybe it is caused by the keyBy operation? I am new to
>> Flink, and I hope to learn how to set the watermark in the right way.
>>
>> On 3 Mar 2018, at 13:52, Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>> Hi sundy,
>>
>> 1. Some partitions of your input Kafka topic don't have data. Since the
>> window watermark is the min value of all its inputs, if there is no data
>> from one of its inputs, the window will never be triggered. You can set
>> the parallelism of your job to 1 to avoid this problem (PS: Maybe this bug
>> is fixed now, but worth a try).
>> 2. Only one record in the input. In this case, the window cannot be
>> triggered either. You might think of it as if time has stopped. To
>> trigger the window, you should read more data with a watermark bigger than
>> the window end.
>>
>> Hope it helps you.
>> Best, Hequn
>>
>> 2018-03-03 13:06 GMT+08:00 sundy <543950...@qq.com>:
>>
>>> Hi, thanks for your reply.
>>>
>>> I have searched for it on Stack Overflow, and there is someone who has
>>> the same problem.
>>>
>>> https://stackoverflow.com/questions/40993753/flink-streaming-program-runs-correctly-with-processing-time-but-will-not-produc
>>>
>>> Following your advice, I tried this code:
>>>
>>> env.getConfig().setAutoWatermarkInterval(3 * 1000);
>>>
>>> And it calls the getCurrentWatermark function every 3 seconds, but still
>>> no result comes out.
>>> From the outputs ('water mark1520049229163'), I could see that the add
>>> method is called. But there is no result from the sink function.
>>>
>>> On 3 Mar 2018, at 12:47, Xingcan Cui <xingc...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> for periodically generated watermarks, you should use
>>> `ExecutionConfig.setAutoWatermarkInterval()` to set an interval.
>>>
>>> Hope that helps.
>>>
>>> Best,
>>> Xingcan
>>>
>>> On 3 Mar 2018, at 12:08 PM, sundy <543950...@qq.com> wrote:
>>>
>>> Hi, I got a problem in Flink and need your help.
>>>
>>> I tried to use TimeCharacteristic.EventTime, but the sink function is
>>> never executed.
>>>
>>> public class StreamingJob {
>>>     public static void main(String[] args) throws Exception {
>>>         // set up the streaming execution environment
>>>         final StreamExecutionEnvironment env =
>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>         ObjectMapper jsonMapper = new ObjectMapper();
>>>
>>>         Properties properties = new Properties();
>>>         // String brokers = "172.27.138.8:9092";
>>>         String brokers = "localhost:9092";
>>>         properties.setProperty("bootstrap.servers", brokers);
>>>         properties.setProperty("group.id", "test_fink");
>>>         String topic = "stream_test";
>>>
>>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>         FlinkKafkaConsumer010<BitRate> myConsumer =
>>>             new FlinkKafkaConsumer010<>(topic,
>>>                 new BitRate.BitRateDeserializtionSchema(), properties);
>>>
>>>         DataStream<BitRate> stream = env.addSource(myConsumer)
>>>             .assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
>>>         DataStream<BitRate> reduceItems =
>>>             stream
>>>                 .keyBy(a -> a.gameId)
>>>                 .timeWindow(Time.seconds(10))
>>>                 .reduce((a, b) -> a.add(b));
>>>
>>>         reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink",
>>>             (tuple) -> {
>>>                 try {
>>>                     tuple.end();
>>>                     System.out.println(tuple.rate + "\t" + tuple.user);
>>>                     return jsonMapper.writeValueAsBytes(tuple);
>>>                 } catch (JsonProcessingException e) {
>>>                     e.printStackTrace();
>>>                     return "".getBytes();
>>>                 }
>>>             }));
>>>
>>>         env.execute("Flink Streaming Java API Skeleton");
>>>     }
>>> }
>>>
>>> Here is the CustomWatermarkEmitter. I tried to increase the lag, but it
>>> did not work.
>>>
>>> public class CustomWatermarkEmitter implements
>>>         AssignerWithPeriodicWatermarks<BitRate> {
>>>
>>>     private long currentMax = 0;
>>>     // did not work, even though the lag is very big
>>>     private long lag = 3600 * 1000;
>>>
>>>     @Nullable
>>>     @Override
>>>     public Watermark getCurrentWatermark() {
>>>         long atLeastTime = currentMax - lag;
>>>         System.out.println("water mark" + atLeastTime);
>>>         return new Watermark(atLeastTime < 0 ? 0 : atLeastTime);
>>>     }
>>>
>>>     @Override
>>>     public long extractTimestamp(BitRate bitRate, long l) {
>>>         currentMax = Long.max(bitRate.eventTime, currentMax);
>>>         return bitRate.eventTime;
>>>     }
>>> }
>>>
>>> Here is the entity BitRate. The logs are generated in real time; a sample
>>> log is `4281_783_1520047769115`.
>>>
>>> public BitRate(long eventTime, long gameId, long rate, long user) {
>>>     this.eventTime = eventTime;
>>>
>>>     this.gameId = gameId;
>>>     this.rate = rate;
>>>     this.user = user;
>>>     this.startTs = System.currentTimeMillis();
>>>     this.type = 0;
>>> }
>>>
>>> public void end() {
>>>     this.endTs = System.currentTimeMillis();
>>> }
>>>
>>> public BitRate add(BitRate b) {
>>>     System.out.println("Add:" + b.rate);
>>>     this.rate += b.rate;
>>>     this.user += b.user;
>>>     return this;
>>> }
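
The watermark rule in the emitter above (max event time minus lag) and the 10-second window can be checked with plain arithmetic. The following is only a sketch in plain Java, not Flink code: the class and method names are hypothetical, the 500 ms spread of the test records is an assumed value, and it assumes the usual event-time rule that a tumbling window without offset fires once the watermark reaches its last timestamp (window end minus 1).

```java
/** Illustrative only: plain-Java stand-in for the watermark and window arithmetic. */
class WindowFiringSketch {

    static final long WINDOW_SIZE_MS = 10_000L; // matches timeWindow(Time.seconds(10))

    /** Start of the tumbling window that contains the timestamp (no offset assumed). */
    static long windowStart(long timestamp) {
        return timestamp - (timestamp % WINDOW_SIZE_MS);
    }

    /** Same rule as getCurrentWatermark() in the emitter: max event time minus lag, floored at 0. */
    static long watermark(long maxEventTime, long lag) {
        return Math.max(0L, maxEventTime - lag);
    }

    /** A window fires once the watermark reaches its last timestamp (end - 1). */
    static boolean fires(long windowStart, long watermark) {
        return watermark >= windowStart + WINDOW_SIZE_MS - 1;
    }

    public static void main(String[] args) {
        long first = 1520047769115L; // event time from the sample log line
        long last = first + 500L;    // assumed: all test records produced within half a second
        long start = windowStart(first);

        // Even with lag 0 the watermark stays inside the window, so nothing is emitted.
        System.out.println(fires(start, watermark(last, 0L)));                           // false
        // A later record, one full window ahead, pushes the watermark past the window end.
        System.out.println(fires(start, watermark(first + WINDOW_SIZE_MS, 0L)));         // true
        // With the one-hour lag from the emitter, that same record is still not enough.
        System.out.println(fires(start, watermark(first + WINDOW_SIZE_MS, 3_600_000L))); // false
    }
}
```

So a larger lag makes the window fire later, not sooner; what closes the window is a record whose event time is at least lag plus the window end minus 1.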