The approach could work, but if an event from stream A is never matched by an event from stream B, you will have lingering state that never goes away. For such cases it might be better to write a custom CoProcessFunction, as sketched here: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
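The bookkeeping such a CoProcessFunction would hold in keyed state can be sketched without any Flink dependencies. This is a minimal illustration under stated assumptions: String payloads joined on event time, a fixed timeout, and invented names (`JoinBuffer`, `onA`, `onB`, `onTimer`); in real Flink code the maps would live in `ValueState` and the deadline would be a registered timer, not a manually invoked sweep.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the per-key state a join-style CoProcessFunction maintains:
// buffer each side until the matching event (same event time) arrives from
// the other side, and bound how long an unmatched event may linger.
class JoinBuffer {
    private final Map<Long, String> pendingA = new HashMap<>();
    private final Map<Long, String> pendingB = new HashMap<>();
    private final Map<Long, Long> deadlines = new HashMap<>();
    private final long timeoutMs;

    JoinBuffer(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Called for an element from stream A: emit the joined result if B is
    // already buffered, otherwise buffer A and arm a cleanup deadline.
    String onA(long eventTime, String a, long nowMs) {
        String b = pendingB.remove(eventTime);
        if (b != null) {
            deadlines.remove(eventTime);
            return a + "|" + b;
        }
        pendingA.put(eventTime, a);
        deadlines.put(eventTime, nowMs + timeoutMs); // the "cleanup timer"
        return null;
    }

    // Symmetric handler for an element from stream B.
    String onB(long eventTime, String b, long nowMs) {
        String a = pendingA.remove(eventTime);
        if (a != null) {
            deadlines.remove(eventTime);
            return a + "|" + b;
        }
        pendingB.put(eventTime, b);
        deadlines.put(eventTime, nowMs + timeoutMs);
        return null;
    }

    // What the timer callback would do: drop state that was never matched.
    void onTimer(long nowMs) {
        deadlines.entrySet().removeIf(e -> {
            if (e.getValue() <= nowMs) {
                pendingA.remove(e.getKey());
                pendingB.remove(e.getKey());
                return true;
            }
            return false;
        });
    }

    int pendingCount() {
        return pendingA.size() + pendingB.size();
    }
}
```

When both sides arrive with the same event time, the result is emitted immediately and the state for that key is dropped; the timer sweep guarantees an unmatched event cannot linger forever, which is exactly the leak the windowed approach suffers from.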
The idea is to keep events from each side in state and emit a result when you get the event from the other side. You also set a cleanup timer, in case no other event arrives, to make sure that state eventually goes away.

Best,
Aljoscha

> On 3. May 2017, at 11:47, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>
> Sure. Thanks for the pointer, let me reorder the same. Any comments about the approach followed for merging topics and creating a single JSON?
>
> Regards,
> Vijay Raajaa G S
>
> On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> An AllWindow operator requires an AllWindowFunction instead of a WindowFunction. In your case, the keyBy() seems to be in the wrong place; to get a keyed window you have to write something akin to:
>
>     inputStream
>         .keyBy(…)
>         .window(…)
>         .apply(…) // or reduce()
>
> In your case, you key the stream and then the keying is "lost" again because you apply a flatMap(). That's why you have an all-window and not a keyed window.
>
> Best,
> Aljoscha
>
>> On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>>
>> Hi,
>>
>> I am trying to combine two Kafka topics using a single Kafka consumer on a list of topics, and then convert the JSON string in the stream to a POJO. Then, I join them via keyBy (on the event time field) and, to merge them into a single fat JSON, I was planning to use a window stream and apply a window function on it. The assumption is that Topic-A & Topic-B can be joined on event time, and only one pair (Topic A (JSON), Topic B (JSON)) will be present with the same eventTime. Hence I was planning to use a countWindow(2) after keyBy on eventTime.
>>
>> I have a couple of questions:
>>
>> 1. Is the approach fine for merging topics and creating a single JSON?
>> 2.
The window function on the AllWindow stream doesn't seem to work; any pointers will be greatly appreciated.
>>
>> Code snippet:
>>
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>     logger.info("Flink Stream Window Charger has started");
>>
>>     Properties properties = new Properties();
>>     properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
>>     properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
>>     properties.setProperty("group.id", "group-0011");
>>     properties.setProperty("auto.offset.reset", "smallest");
>>
>>     List<String> names = new ArrayList<>();
>>     names.add("Topic-A");
>>     names.add("Topic-B");
>>
>>     DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
>>     DataStream<TopicPojo> pojo = stream.map(new Deserializer()).keyBy((eventTime) -> TopicPojo.getEventTime());
>>     List<String> where = new ArrayList<String>();
>>     AllWindowedStream<String, GlobalWindow> data_window = pojo.flatMap(new Tokenizer()).countWindowAll(2);
>>     DataStream<String> data_charging = data_window.apply(new MyWindowFunction());
>>     data_charging.addSink(new SinkFunction<String>() {
>>         public void invoke(String value) throws Exception {
>>             // Yet to be implemented - Merge two POJO into one
>>         }
>>     });
>>
>>     try {
>>         env.execute();
>>     } catch (Exception e) {
>>         return;
>>     }
>>     }
>>     }
>>
>>     class Tokenizer implements FlatMapFunction<TopicPojo, String> {
>>         private static final long serialVersionUID = 1L;
>>
>>         @Override
>>         public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
>>             ObjectMapper mapper = new ObjectMapper();
out.collect(mapper.writeValueAsString(value));
>>         }
>>     }
>>
>>     class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
>>         @Override
>>         public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2, Collector<String> out) throws Exception {
>>             int count = 0;
>>             for (TopicPojo in : arg2) {
>>                 count++;
>>             }
>>             // Test result - to be modified
>>             out.collect("Window: " + window + " count: " + count);
>>         }
>>     }
>>
>>     class Deserializer implements MapFunction<String, TopicPojo> {
>>         private static final long serialVersionUID = 1L;
>>
>>         @Override
>>         public TopicPojo map(String value) throws IOException {
>>             ObjectMapper mapper = new ObjectMapper();
>>             TopicPojo obj = null;
>>             try {
>>                 System.out.println(value);
>>                 obj = mapper.readValue(value, TopicPojo.class);
>>             } catch (IOException e) {
>>                 // JsonParseException and JsonMappingException are subclasses of IOException
>>                 throw new IOException("Failed to deserialize JSON object.");
>>             }
>>             return obj;
>>         }
>>     }
>>
>> I am getting "The method apply(AllWindowFunction<String,R,GlobalWindow>) in the type AllWindowedStream<String,GlobalWindow> is not applicable for the arguments (MyWindowFunction)" error.
>>
>> Kindly give your input.
>>
>> Regards,
>> Vijay Raajaa GS
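The sink in the quoted snippet leaves "Merge two POJO into one" unimplemented. Stripped of Flink and Jackson, one way to produce the single fat JSON is to treat each decoded event as a map of fields, copy both into one map, and render it as a flat JSON object. This is only a sketch; the class name `FatJson`, the topic-name prefixing scheme, and the String-valued fields are assumptions, not anything from the original thread:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Merge the fields of an event from each topic into one flat JSON object,
// prefixing each field with its topic so colliding names (e.g. eventTime)
// both survive in the merged output.
class FatJson {
    static String merge(Map<String, String> topicA, Map<String, String> topicB) {
        Map<String, String> fat = new LinkedHashMap<>();
        topicA.forEach((k, v) -> fat.put("topicA." + k, v));
        topicB.forEach((k, v) -> fat.put("topicB." + k, v));

        // Render the merged map as a flat JSON object of string fields.
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : fat.entrySet()) {
            if (!first) sb.append(",");
            sb.append("\"").append(e.getKey()).append("\":\"").append(e.getValue()).append("\"");
            first = false;
        }
        return sb.append("}").toString();
    }
}
```

In the actual job this merge would more naturally live in the window (or CoProcessFunction) that sees both paired events, rather than in the sink, since the sink only receives one element at a time.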