You mean you want to output some data when you know that you don’t have any counts for a given time window?
This is not (easily) possible in Flink right now because this would require an operation with parallelism one that determines that there is no data across all keys.

Best,
Aljoscha

> On 24. Jun 2017, at 18:22, G.S.Vijay Raajaa <[email protected]> wrote:
>
> Hi,
>
> I am trying to implement a flink job which takes the twitter as the source
> and collects tweets from a list of hashtags. The flink job basically
> aggregates the volume of tweets per hashtag in a given time frame. I have
> implemented them successfully, but then if there is no tweet across all the
> hashtags I need to send out a default value of 0 across all hashtags. Not
> sure how to implement this functionality.
>
> Code Snippet :
>
> env.addSource(source)
>     .flatMap(new ExtractHashTagsSymbols(tickers))
>     .keyBy(0)
>     .timeWindow(Time.seconds(Integer.parseInt(window_time)))
>     .sum(1)
>     .timeWindowAll(Time.seconds(Integer.parseInt(window_time)))
>     .apply(new GetVolume(tickerVolumeMap))
>     .addSink(new SinkFunction<JSONObject>() {
>         public void invoke(JSONObject value) throws Exception {
>             System.out.println("Twitter Volume:" + value.toString());
>             //JsonParser jsonParser = new JsonParser();
>             //JsonObject gsonObject = (JsonObject) jsonParser.parse(value.toString());
>             pushToSocket(value, socket_url);
>         }
>     });
>
> The above code waits for window_time time frame and computes the tweet volume
> and sends out a json.
>
> Regards,
> Vijay Raajaa GS
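
A partial workaround, sketched under assumptions: the job above already funnels the per-hashtag sums through a timeWindowAll step, which runs with parallelism one, so that step could fill in a 0 for every known hashtag that is missing from the window. The names VolumeDefaults and fillMissing below are hypothetical, and this is plain Java rather than the actual GetVolume function, which the thread does not show. It does not cover the case where no tweet arrives for any hashtag, because Flink only evaluates a window once at least one element has been assigned to it.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink or of the original job.
class VolumeDefaults {

    // Returns one entry per known hashtag, in the order given.
    // Hashtags with no observed count in this window get 0.
    // Observed hashtags that are not in the known list are ignored.
    static Map<String, Long> fillMissing(Collection<String> hashtags,
                                         Map<String, Long> observed) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String tag : hashtags) {
            Long count = observed.get(tag);
            result.put(tag, count == null ? 0L : count);
        }
        return result;
    }
}
```

Inside the all-window function one would collect the (hashtag, count) pairs of the window into a map, call VolumeDefaults.fillMissing(tickers, observed), and build the JSON from the result, assuming tickers is the same hashtag list that is passed to ExtractHashTagsSymbols.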
