Hi sundy,

1. Some partition of your input kafka don't have data. Since window
watermark is the min value of all it's inputs, if there are no data from
one of it's inputs, window will never be triggered. You can set parallelism
of your job to 1 to avoid this problem(PS: Maybe this bug is fixed now, but
worth a try).
2. Only one record in the input. In this case, window can not be triggered
either. You might think of it like the time has be stopped. To trigger the
widow, you should read more data with watermark bigger than the window end.

Hope it helps you.
Best, Hequn

2018-03-03 13:06 GMT+08:00 sundy <543950...@qq.com>:

> Hi, thanks for your reply.
>
> I have searched it in stackoverflow, and there is someone who has the some
> problem.
>
> https://stackoverflow.com/questions/40993753/flink-streaming-program-runs-
> correctly-with-processing-time-but-will-not-produc
>
>
> From your advice, I tried the code.
>
>  env.getConfig().setAutoWatermarkInterval(3 * 1000);
>
> And it calls the getCurrentWaterMark function each 3 seconds,  but still
> no result come out.
> From the outputs   ('water mark1520049229163'), I could see that the add
> method is called. But the no result from the sink function.
>
>
>
>
> On 3 Mar 2018, at 12:47, Xingcan Cui <xingc...@gmail.com> wrote:
>
> Hi,
>
> for periodically generated watermarks, you should use 
> `ExecutionConfig.setAutoWatermarkInterval()`
> to set an interval.
>
> Hope that helps.
>
> Best,
> Xingcan
>
> On 3 Mar 2018, at 12:08 PM, sundy <543950...@qq.com> wrote:
>
>
>
> Hi, I got a problem in Flink  and need your help.
>
> I tried to use TimeCharacteristic.EvenTime, but the sink function never be
> executed.
>
> public class StreamingJob {
>   public static void main(String[] args) throws Exception {
>     // set up the streaming execution environment
>     final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>     ObjectMapper jsonMapper = new ObjectMapper();
>
>     Properties properties = new Properties();
> //    String brokers = "172.27.138.8:9092";
>     String brokers = "localhost:9092";
>     properties.setProperty("bootstrap.servers", brokers);
>     properties.setProperty("group.id", "test_fink");
>     String topic = "stream_test";
>
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>     FlinkKafkaConsumer010<BitRate> myConsumer =
>         new FlinkKafkaConsumer010(topic, new 
> BitRate.BitRateDeserializtionSchema(), properties);
>
>     DataStream<BitRate> stream = env.addSource(myConsumer)
>         .assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
>     DataStream<BitRate>
>         reduceItems =
>         stream
>             .keyBy(a -> a.gameId)
>             .timeWindow(Time.seconds(10))
>             .reduce((a, b) -> a.add(b));
>
>     reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink", 
> (tuple) -> {
>       try {
>         tuple.end();
>         System.out.println(tuple.rate + "\t" + tuple.user);
>         return jsonMapper.writeValueAsBytes(tuple);
>       } catch (JsonProcessingException e) {
>         e.printStackTrace();
>         return "".getBytes();
>       }
>     }));
>
>     env.execute("Flink Streaming Java API Skeleton");
>   }
>
> }
>
>
>
> Here is the CustomWatermarkEmitter. I tried to increase the lag number,
>  but not worked.
>
> public class CustomWatermarkEmitter implements 
> AssignerWithPeriodicWatermarks<BitRate> {
>
>   private long currentMax = 0;
>   private long lag = 3600 * 1000; //not worked ,even though the lag is very 
> big
>
>   @Nullable
>   @Override
>   public Watermark getCurrentWatermark() {
>     long atLeastTime = currentMax - lag;
>     System.out.println("water mark" + atLeastTime);
>     return new Watermark(atLeastTime < 0 ? 0 : atLeastTime);
>   }
>
>   @Override
>   public long extractTimestamp(BitRate bitRate, long l) {
>     currentMax = Long.max(bitRate.eventTime, currentMax);
>     return bitRate.eventTime;
>   }
> }
>
>
>
> Here is the entity BitRate, the logs are generated in time , sample log
> `4281_783_1520047769115`
>
>
> public BitRate(long eventTime, long gameId, long rate, long user) {
>   this.eventTime = eventTime;
>
>   this.gameId = gameId;
>   this.rate = rate;
>   this.user = user;
>   this.startTs = System.currentTimeMillis();
>   this.type = 0;
> }
>
> public void end() {
>   this.endTs = System.currentTimeMillis();
> }
>
> public BitRate add(BitRate b) {
>   System.out.println("Add:" + b.rate);
>   this.rate += b.rate;
>   this.user += b.user;
>   return this;
> }
>
>
>
>
>

Reply via email to