Hi,
I am trying to implement a Flink job that uses Twitter as its source and
collects tweets matching a list of hashtags. The job aggregates the
volume of tweets per hashtag over a given time window. That part works,
but when there are no tweets for any of the hashtags in a window, I need
to emit a default value of 0 for every hashtag. I am not sure how to
implement this.
Code snippet:
env.addSource(source)
    .flatMap(new ExtractHashTagsSymbols(tickers))
    .keyBy(0)
    .timeWindow(Time.seconds(Integer.parseInt(window_time)))
    .sum(1)
    .timeWindowAll(Time.seconds(Integer.parseInt(window_time)))
    .apply(new GetVolume(tickerVolumeMap))
    .addSink(new SinkFunction<JSONObject>() {
        public void invoke(JSONObject value) throws Exception {
            System.out.println("Twitter Volume:" + value.toString());
            //JsonParser jsonParser = new JsonParser();
            //JsonObject gsonObject = (JsonObject)jsonParser.parse(value.toString());
            pushToSocket(value, socket_url);
        }
    });
The above code waits for the window_time interval, computes the tweet
volume per hashtag, and sends out a JSON object.
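To make the desired output concrete, here is a minimal plain-Java sketch of
the zero-fill step, independent of Flink. The class name VolumeDefaults and
the method withZeroDefaults are hypothetical and not part of the job above;
the sketch only assumes that the tracked hashtags are available as a list
and that the per-window counts arrive as a map.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the job above.
class VolumeDefaults {

    // Returns one entry per tracked hashtag, in the order given.
    // Hashtags with no count in this window are reported as 0.
    static Map<String, Integer> withZeroDefaults(List<String> trackedHashtags,
                                                 Map<String, Integer> windowCounts) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String tag : trackedHashtags) {
            result.put(tag, windowCounts.getOrDefault(tag, 0));
        }
        return result;
    }

    public static void main(String[] args) {
        // Example hashtags are placeholders, not the real list.
        List<String> tracked = Arrays.asList("#flink", "#kafka", "#spark");
        // A window in which no tracked hashtag was seen.
        Map<String, Integer> quiet = Collections.emptyMap();
        System.out.println(withZeroDefaults(tracked, quiet));
    }
}
```

This covers only the fill-in once a window result exists. It does not cover
the case I am stuck on, where no tweet arrives at all and, as far as I can
tell, the window never fires.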
Regards,
Vijay Raajaa GS