Hi sundy,

It is strange that your configuration does not take effect. Did you set
the parallelism somewhere else? You could refer to the Kafka test case [1]:
line 229 of that test sets the parallelism to 1, and it works fine.
Hope that helps.
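
To illustrate why it matters that four threads report watermarks, here is a
minimal plain-Java sketch (not Flink API; the class and method names are
made up for illustration). It assumes the rule mentioned earlier in this
thread: a downstream operator's watermark is the minimum of the watermarks
of all its inputs. The input values are the four lines from your debug
output.

```java
import java.util.Arrays;

// Hypothetical illustration only; this is not a Flink class.
final class MinWatermarkSketch {

    // A downstream operator can only advance to the smallest watermark
    // reported by any of its inputs.
    static long combinedWatermark(long[] inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Values printed by the four threads (49, 50, 51, 52) in the debug output.
        long[] perThread = {0L, 0L, 1520056427871L, 0L};
        System.out.println("combined watermark: " + combinedWatermark(perThread));
    }
}
```

If the job really does run with four parallel watermark assigners and only
one of them receives records, the combined value stays at 0, so an
event-time window downstream would not be triggered.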

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
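
On the window question quoted below, the following plain-Java sketch (again
not Flink API; names and the 3-second lag are assumptions for illustration)
shows why a burst of records inside one 10-second window never closes it.
It assumes a tumbling window with no offset, aligned to multiples of its
size, that fires once the watermark reaches the window's last timestamp
(end - 1), and a watermark of maxEventTime - lag as in your emitter.

```java
// Hypothetical illustration only; this is not a Flink class.
final class WindowFiringSketch {

    // Start of the tumbling window (no offset) that contains the timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // The window [start, start + size) fires once the watermark reaches
    // its last timestamp, start + size - 1.
    static boolean fires(long watermark, long start, long size) {
        return watermark >= start + size - 1;
    }

    public static void main(String[] args) {
        long size = 10_000L;              // 10-second window, as in the job
        long lag = 3_000L;                // assumed lag, chosen for illustration
        long first = 1520059217832L;      // sample event time from the thread
        long last = first + 2_000L;       // burst ends 2 seconds later (assumed)
        long start = windowStart(first, size);

        // All records fall into the same window, so the watermark stays behind its end.
        System.out.println("after burst: " + fires(last - lag, start, size));

        // A later record (assumed 15 seconds after the first) pushes the watermark past the end.
        long later = first + 15_000L;
        System.out.println("after later record: " + fires(later - lag, start, size));
    }
}
```

With these assumed numbers the window does not fire after the burst and
does fire once a sufficiently later record arrives, which matches what you
observed.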

On Sat, Mar 3, 2018 at 4:02 PM, sundy <543950...@qq.com> wrote:

> Hi Hequn Cheng,
>
> I finally found the problem and, following your advice, worked out how to
> define the watermark correctly. Thank you very much.
>
> The problem is that I set the watermark to
>
>     waterMark =  maxEventTime - lag
>
> and the time window is 10 seconds, but I generated the test records so
> quickly that all 10000 records fell within a single window (my bad).
> So Flink was waiting for newer records before it could close the window.
>
> Another question: why, when I set 'env.setParallelism(1)' and run the
> code in IDEA (mini Flink cluster), is getCurrentWatermark called in 4
> different threads?
> And when is getCurrentWatermark called: after the keyBy operation or
> after the source operation?
>
>
>
> On 3 Mar 2018, at 15:28, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi sundy,
>
> The default parallelism is 4. It seems that your configuration does not
> take effect. You can try 'env.setParallelism(1)' to set the job
> parallelism.
> For watermark, you can refer to [1] and [2].
>
> PS: Others cannot see your reply because you replied only to me. Please
> reply to all so that others can help you too :-)
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
>
> On Sat, Mar 3, 2018 at 3:03 PM, sundy <543950...@qq.com> wrote:
>
>>
>> Hi Hequn Cheng,
>>
>> Thanks for your advice. I think I found the problem, but I don't know
>> why it happens.
>>
>> First, let me describe my setup. I run the code in IDEA with a mini
>> Flink cluster, I set the env parallelism to 1, and the Kafka topic
>> 'stream_test' has one partition. Then I send 10000 records with the
>> current timestamp to Kafka. The format is '4281_783_1520059217832'; the
>> last field is the event time.
>>
>> I added this debug code to print the thread ID.
>>
>> <PastedGraphic-1.png>
>>
>>
>> The result is that it prints from 4 threads in each period (3 seconds),
>> for example:
>>
>> 50:water mark:0
>> 52:water mark:0
>> 51:water mark:1520056427871
>> 49:water mark:0
>>
>> So this results in a watermark of 0. But why does it happen with a
>> parallelism of 1? Maybe it is caused by the keyBy operation? I am new to
>> Flink, and I hope to learn how to set the watermark in the right way.
>>
>>
>>
>>
>>
>> On 3 Mar 2018, at 13:52, Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>> Hi sundy,
>>
>> 1. Some partitions of your input Kafka topic don't have data. Since the
>> window's watermark is the minimum over all its inputs, the window will
>> never be triggered if one of its inputs delivers no data. You can set the
>> parallelism of your job to 1 to avoid this problem (PS: maybe this bug is
>> fixed now, but it is worth a try).
>> 2. There is only one record in the input. In this case, the window cannot
>> be triggered either. You can think of it as time having stopped. To
>> trigger the window, you need to read more data so that the watermark
>> becomes bigger than the window end.
>>
>> Hope it helps you.
>> Best, Hequn
>>
>> 2018-03-03 13:06 GMT+08:00 sundy <543950...@qq.com>:
>>
>>> Hi, thanks for your reply.
>>>
>>> I searched on Stack Overflow, and someone there has the same
>>> problem.
>>>
>>> https://stackoverflow.com/questions/40993753/flink-streaming-program-runs-correctly-with-processing-time-but-will-not-produc
>>>
>>>
>>> Following your advice, I tried this code:
>>>
>>>  env.getConfig().setAutoWatermarkInterval(3 * 1000);
>>>
>>> It now calls the getCurrentWatermark function every 3 seconds, but
>>> still no result comes out.
>>> From the output ('water mark1520049229163'), I can see that the add
>>> method is called, but there is no result from the sink function.
>>>
>>>
>>>
>>>
>>> On 3 Mar 2018, at 12:47, Xingcan Cui <xingc...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> for periodically generated watermarks, you should use
>>> `ExecutionConfig.setAutoWatermarkInterval()` to set an interval.
>>>
>>> Hope that helps.
>>>
>>> Best,
>>> Xingcan
>>>
>>> On 3 Mar 2018, at 12:08 PM, sundy <543950...@qq.com> wrote:
>>>
>>>
>>>
>>> Hi, I have a problem with Flink and need your help.
>>>
>>> I tried to use TimeCharacteristic.EventTime, but the sink function is
>>> never executed.
>>>
>>> public class StreamingJob {
>>>   public static void main(String[] args) throws Exception {
>>>     // set up the streaming execution environment
>>>     final StreamExecutionEnvironment env =
>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>     ObjectMapper jsonMapper = new ObjectMapper();
>>>
>>>     Properties properties = new Properties();
>>> //    String brokers = "172.27.138.8:9092";
>>>     String brokers = "localhost:9092";
>>>     properties.setProperty("bootstrap.servers", brokers);
>>>     properties.setProperty("group.id", "test_fink");
>>>     String topic = "stream_test";
>>>
>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>     FlinkKafkaConsumer010<BitRate> myConsumer =
>>>         new FlinkKafkaConsumer010<>(
>>>             topic, new BitRate.BitRateDeserializtionSchema(), properties);
>>>
>>>     DataStream<BitRate> stream = env.addSource(myConsumer)
>>>         .assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
>>>     DataStream<BitRate>
>>>         reduceItems =
>>>         stream
>>>             .keyBy(a -> a.gameId)
>>>             .timeWindow(Time.seconds(10))
>>>             .reduce((a, b) -> a.add(b));
>>>
>>>     reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink",
>>>         (tuple) -> {
>>>       try {
>>>         tuple.end();
>>>         System.out.println(tuple.rate + "\t" + tuple.user);
>>>         return jsonMapper.writeValueAsBytes(tuple);
>>>       } catch (JsonProcessingException e) {
>>>         e.printStackTrace();
>>>         return "".getBytes();
>>>       }
>>>     }));
>>>
>>>     env.execute("Flink Streaming Java API Skeleton");
>>>   }
>>>
>>> }
>>>
>>>
>>>
>>> Here is the CustomWatermarkEmitter. I tried increasing the lag, but
>>> it did not work.
>>>
>>> public class CustomWatermarkEmitter
>>>     implements AssignerWithPeriodicWatermarks<BitRate> {
>>>
>>>   private long currentMax = 0;
>>>   // Did not work, even though the lag is very big.
>>>   private long lag = 3600 * 1000;
>>>
>>>   @Nullable
>>>   @Override
>>>   public Watermark getCurrentWatermark() {
>>>     long atLeastTime = currentMax - lag;
>>>     System.out.println("water mark" + atLeastTime);
>>>     return new Watermark(atLeastTime < 0 ? 0 : atLeastTime);
>>>   }
>>>
>>>   @Override
>>>   public long extractTimestamp(BitRate bitRate, long l) {
>>>     currentMax = Long.max(bitRate.eventTime, currentMax);
>>>     return bitRate.eventTime;
>>>   }
>>> }
>>>
>>>
>>>
>>> Here is the BitRate entity. The logs are generated in real time; a
>>> sample log is `4281_783_1520047769115`.
>>>
>>>
>>> public BitRate(long eventTime, long gameId, long rate, long user) {
>>>   this.eventTime = eventTime;
>>>
>>>   this.gameId = gameId;
>>>   this.rate = rate;
>>>   this.user = user;
>>>   this.startTs = System.currentTimeMillis();
>>>   this.type = 0;
>>> }
>>>
>>> public void end() {
>>>   this.endTs = System.currentTimeMillis();
>>> }
>>>
>>> public BitRate add(BitRate b) {
>>>   System.out.println("Add:" + b.rate);
>>>   this.rate += b.rate;
>>>   this.user += b.user;
>>>   return this;
>>> }
>>>
>>>
>>>
>>>
>>>
>>
>>
>
>
