The approach could work, but if an event from stream A can go unmatched by an
event in stream B, you will have lingering state that never goes away. For
such cases it might be better to write a custom CoProcessFunction, as
sketched here:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html

The idea is to keep events from each side in state and emit a result when the
event from the other side arrives. You also set a cleanup timer so that, if no
matching event ever arrives, the state still goes away eventually.
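To make the buffer-and-cleanup idea concrete without pulling in Flink, here is a minimal, single-threaded model of it. Everything in it (PairBuffer, onA, onB, onTimer, the timeout value and the string "merge") is a hypothetical stand-in, not Flink API: onA and onB play the roles of CoProcessFunction's processElement1 and processElement2, onTimer plays the role of the cleanup timer, and the two maps stand in for per-key state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of a two-input join with cleanup timers.
// K is the join key (e.g. the event time); A and B are the two record types.
// In a real CoProcessFunction the two maps would be keyed state and the
// deadlines would be timers registered through ctx.timerService().
class PairBuffer<K, A, B> {
    // A buffered element waiting for its partner, plus the time it expires.
    private static final class Pending<T> {
        final T value;
        final long deadline;

        Pending(T value, long deadline) {
            this.value = value;
            this.deadline = deadline;
        }
    }

    private final long timeout;
    private final Map<K, Pending<A>> pendingA = new HashMap<>();
    private final Map<K, Pending<B>> pendingB = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    PairBuffer(long timeout) {
        this.timeout = timeout;
    }

    // Role of processElement1: emit if the B side is already buffered, else buffer A.
    void onA(K key, A a, long now) {
        Pending<B> b = pendingB.remove(key);
        if (b != null) {
            output.add(merge(key, a, b.value));
        } else {
            pendingA.put(key, new Pending<>(a, now + timeout));
        }
    }

    // Role of processElement2: the mirror image of onA.
    void onB(K key, B b, long now) {
        Pending<A> a = pendingA.remove(key);
        if (a != null) {
            output.add(merge(key, a.value, b));
        } else {
            pendingB.put(key, new Pending<>(b, now + timeout));
        }
    }

    // Role of onTimer: drop every unmatched element whose deadline has passed.
    void onTimer(long now) {
        pendingA.values().removeIf(p -> p.deadline <= now);
        pendingB.values().removeIf(p -> p.deadline <= now);
    }

    // Stand-in for building the merged "fat" JSON record.
    private String merge(K key, A a, B b) {
        return key + ": " + a + " + " + b;
    }

    List<String> output() {
        return output;
    }

    int pendingCount() {
        return pendingA.size() + pendingB.size();
    }
}
```

In an actual CoProcessFunction the buffered element would live in keyed state and the deadline would be registered with ctx.timerService().registerEventTimeTimer(...). A timer may fire after its pair has already been emitted, so onTimer should only clear state that has really expired; the CountWithTimeoutFunction example in the linked documentation does this by comparing the firing timestamp with a timestamp kept in state.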

Best,
Aljoscha
> On 3. May 2017, at 11:47, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
> 
> Sure, thanks for the pointer; let me reorder the operations. Any comments
> about the approach I followed for merging the topics and creating a single
> JSON?
> 
> Regards,
> Vijay Raajaa G S
> 
> On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> An AllWindow operator requires an AllWindowFunction instead of a
> WindowFunction. In your case, the keyBy() seems to be in the wrong place; to
> get a keyed window you have to write something akin to:
> 
> inputStream
>   .keyBy(…)
>   .window(…)
>   .apply(…) // or reduce()
> 
> In your case, you key the stream and then the keying is “lost” again because 
> you apply a flatMap(). That’s why you have an all-window and not a keyed 
> window.
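To see why the keying matters for the countWindow(2) plan in this thread, here is a hypothetical, Flink-free model contrasting a non-keyed count window of size 2 (what countWindowAll(2) does) with a keyed one (what keyBy(...).countWindow(2) does). CountWindowDemo, Event and both methods are illustrative names only; the model assumes the keyed window is purged after it fires, as a count window is.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of non-keyed vs. keyed count windows of size 2.
class CountWindowDemo {
    static final class Event {
        final String key;     // e.g. the eventTime both topics share
        final String payload; // e.g. the JSON from Topic-A or Topic-B

        Event(String key, String payload) {
            this.key = key;
            this.payload = payload;
        }
    }

    // Non-keyed: every two consecutive elements form one window, whatever their key.
    static List<List<String>> countWindowAll2(List<Event> events) {
        List<List<String>> windows = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (Event e : events) {
            current.add(e.payload);
            if (current.size() == 2) {
                windows.add(current);
                current = new ArrayList<>();
            }
        }
        return windows;
    }

    // Keyed: each key gets its own buffer, which fires once that key has two elements.
    static List<List<String>> keyedCountWindow2(List<Event> events) {
        List<List<String>> windows = new ArrayList<>();
        Map<String, List<String>> perKey = new HashMap<>();
        for (Event e : events) {
            List<String> buffer = perKey.computeIfAbsent(e.key, k -> new ArrayList<>());
            buffer.add(e.payload);
            if (buffer.size() == 2) {
                windows.add(new ArrayList<>(buffer));
                buffer.clear(); // purge after firing
            }
        }
        return windows;
    }
}
```

With input interleaved as A@t1, A@t2, B@t1, B@t2, the non-keyed version pairs the two Topic-A records with each other, while the keyed version yields one (Topic-A, Topic-B) pair per key. As with the real operator, a key that only ever sees one element keeps its buffer forever, which is the lingering-state problem raised at the top of this thread.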
> 
> Best,
> Aljoscha
> 
>> On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>> 
>> Hi,
>> 
>> I am trying to combine two Kafka topics using a single Kafka consumer
>> subscribed to a list of topics, and then convert the JSON strings in the
>> stream to POJOs. I then want to join them via keyBy() on the event-time
>> field. To merge each pair into a single fat JSON, I was planning to use a
>> windowed stream and apply a window function to it. The assumption is that
>> Topic-A and Topic-B can be joined on event time and that only one pair
>> (Topic-A JSON, Topic-B JSON) will be present with the same eventTime. Hence
>> I was planning to use countWindow(2) after keyBy() on eventTime.
>> 
>> I have a couple of questions:
>>
>> 1. Is the approach fine for merging topics and creating a single JSON?
>> 2. The window function on the all-window stream doesn't seem to work. Any
>> pointers will be greatly appreciated.
>> 
>> Code Snippet:
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> logger.info("Flink Stream Window Charger has started");
>>
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
>> properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
>> properties.setProperty("group.id", "group-0011");
>> properties.setProperty("auto.offset.reset", "smallest");
>>
>> List<String> names = new ArrayList<>();
>> names.add("Topic-A");
>> names.add("Topic-B");
>>
>> DataStream<String> stream = env.addSource(
>>     new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
>>
>> DataStream<TopicPojo> pojo = stream
>>     .map(new Deserializer())
>>     .keyBy(value -> value.getEventTime());
>>
>> List<String> where = new ArrayList<String>();
>>
>> AllWindowedStream<String, GlobalWindow> data_window =
>>     pojo.flatMap(new Tokenizer()).countWindowAll(2);
>>
>> DataStream<String> data_charging = data_window.apply(new MyWindowFunction());
>>
>> data_charging.addSink(new SinkFunction<String>() {
>>     public void invoke(String value) throws Exception {
>>         // Yet to be implemented - Merge two POJO into one
>>     }
>> });
>>
>> try {
>>     env.execute();
>> } catch (Exception e) {
>>     return;
>> }
>> }
>>
>> }
>> 
>> class Tokenizer implements FlatMapFunction<TopicPojo, String> {
>>     private static final long serialVersionUID = 1L;
>>
>>     @Override
>>     public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
>>         ObjectMapper mapper = new ObjectMapper();
>>         out.collect(mapper.writeValueAsString(value));
>>     }
>> }
>> 
>> class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
>>     @Override
>>     public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2,
>>                       Collector<String> out) throws Exception {
>>         int count = 0;
>>         for (TopicPojo in : arg2) {
>>             count++;
>>         }
>>         // Test Result - TO be modified
>>         out.collect("Window: " + window + "count: " + count);
>>     }
>> }
>> 
>> class Deserializer implements MapFunction<String, TopicPojo> {
>>     private static final long serialVersionUID = 1L;
>>
>>     @Override
>>     public TopicPojo map(String value) throws IOException {
>>         ObjectMapper mapper = new ObjectMapper();
>>         try {
>>             System.out.println(value);
>>             return mapper.readValue(value, TopicPojo.class);
>>         } catch (IOException e) {
>>             // JsonParseException and JsonMappingException are both IOExceptions,
>>             // so one handler covers them; keep the cause for debugging.
>>             throw new IOException("Failed to deserialize JSON object.", e);
>>         }
>>     }
>> }
>> 
>> 
>> I am getting the following error: The method
>> apply(AllWindowFunction<String,R,GlobalWindow>) in the type
>> AllWindowedStream<String,GlobalWindow> is not applicable for the arguments
>> (MyWindowFunction).
>> 
>> Kindly give your input.
>> 
>> Regards,
>> Vijay Raajaa GS 
>> 
> 
> 
