Hi,

I am trying to combine two Kafka topics using a single Kafka consumer
subscribed to a list of topics, and then convert the JSON strings in the
stream to POJOs. I then want to join them via keyBy (on the event time
field) and merge each pair into a single fat JSON. For that I was planning
to use a windowed stream and apply a window function to it. The assumption
is that Topic-A and Topic-B can be joined on event time, and that only one
pair (one JSON from Topic-A, one JSON from Topic-B) will be present with the
same eventTime. Hence I was planning to use countWindow(2) after keyBy on
eventTime.
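
To make the intended behaviour concrete, here is a minimal plain-Java sketch (no Flink involved) of what I expect "keyBy on eventTime, then a count window of 2" to do: buffer the first record seen for an event time and emit one merged record when the second one arrives. The class PairByEventTime, the Event type, and the field names are hypothetical and only for illustration. Note that, like a count window, it pairs any two records with the same event time, even if both came from the same topic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PairByEventTime {

    // Hypothetical stand-in for a deserialized record from either topic.
    static final class Event {
        final String topic;
        final long eventTime;
        final Map<String, String> fields;

        Event(String topic, long eventTime, Map<String, String> fields) {
            this.topic = topic;
            this.eventTime = eventTime;
            this.fields = fields;
        }
    }

    // Buffers one event per eventTime; when a second event with the same
    // eventTime arrives, merges both field sets into one "fat" record.
    static List<Map<String, String>> mergePairs(List<Event> events) {
        Map<Long, Event> pending = new HashMap<>();
        List<Map<String, String>> merged = new ArrayList<>();
        for (Event e : events) {
            Event first = pending.remove(e.eventTime);
            if (first == null) {
                // First record for this key: wait for its partner.
                pending.put(e.eventTime, e);
            } else {
                // Second record for this key: the "window" of 2 is full.
                Map<String, String> fat = new LinkedHashMap<>();
                fat.put("eventTime", Long.toString(e.eventTime));
                fat.putAll(first.fields);
                fat.putAll(e.fields);
                merged.add(fat);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("Topic-A", 100L, Collections.singletonMap("a", "1")));
        events.add(new Event("Topic-B", 200L, Collections.singletonMap("b", "9")));
        events.add(new Event("Topic-B", 100L, Collections.singletonMap("b", "2")));
        // Only eventTime 100 has both halves, so one merged record is printed.
        System.out.println(mergePairs(events));
    }
}
```

In this sketch a record whose partner never arrives simply stays buffered, which is also my concern with a pure count-based window.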

I have a couple of questions about this:

1. Is this approach fine for merging the topics and creating a single JSON?
2. The window function on the AllWindowedStream does not seem to work (see
the error at the end of this mail). Any pointers will be greatly appreciated.

Code snippet:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
logger.info("Flink Stream Window Charger has started");

// Kafka 0.8 consumer configuration
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
properties.setProperty("group.id", "group-0011");
properties.setProperty("auto.offset.reset", "smallest");

// One consumer subscribed to both topics
List<String> names = new ArrayList<>();
names.add("Topic-A");
names.add("Topic-B");

DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));

// Deserialize the JSON strings and key the stream by event time
DataStream<TopicPojo> pojo = stream.map(new Deserializer())
    .keyBy(p -> p.getEventTime());

List<String> where = new ArrayList<String>();

// Convert back to JSON strings, then window every two elements (non-keyed)
AllWindowedStream<String, GlobalWindow> data_window =
    pojo.flatMap(new Tokenizer()).countWindowAll(2);

// This is the call that fails to compile
DataStream<String> data_charging = data_window.apply(new MyWindowFunction());

data_charging.addSink(new SinkFunction<String>() {
    public void invoke(String value) throws Exception {
        // Yet to be implemented - Merge two POJO into one
    }
});

try {
    env.execute();
} catch (Exception e) {
    return;
}

}

}

class Tokenizer implements FlatMapFunction<TopicPojo, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
        // Serialize the POJO back to a JSON string
        ObjectMapper mapper = new ObjectMapper();
        out.collect(mapper.writeValueAsString(value));
    }
}

class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
    @Override
    public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2,
            Collector<String> out) throws Exception {
        int count = 0;
        for (TopicPojo in : arg2) {
            count++;
        }
        // Test result - to be modified
        out.collect("Window: " + window + "count: " + count);
    }
}

class Deserializer implements MapFunction<String, TopicPojo> {
    private static final long serialVersionUID = 1L;

    @Override
    public TopicPojo map(String value) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        try {
            System.out.println(value);
            return mapper.readValue(value, TopicPojo.class);
        } catch (IOException e) {
            // JsonParseException and JsonMappingException are both IOExceptions
            throw new IOException("Failed to deserialize JSON object.", e);
        }
    }
}

I am getting the following error:

The method apply(AllWindowFunction<String,R,GlobalWindow>) in the type
AllWindowedStream<String,GlobalWindow> is not applicable for the arguments
(MyWindowFunction)
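
My reading of the message, which may well be wrong, is that apply on a non-keyed window of String elements wants a function with three type parameters and String input, whereas MyWindowFunction is a keyed window function with four type parameters and TopicPojo input. The plain-Java sketch below uses hypothetical, simplified stand-in interfaces (they are not Flink's classes, and the window type is reduced to a String) just to show the shape the compiler seems to be asking for.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AllWindowShapeSketch {

    // Hypothetical stand-in for an output collector; not Flink's Collector.
    interface Collector<T> {
        void collect(T record);
    }

    // Hypothetical stand-in for the non-keyed shape named in the error:
    // three type parameters (input, output, window) and no key argument.
    interface AllWindowFunction<IN, OUT, W> {
        void apply(W window, Iterable<IN> values, Collector<OUT> out);
    }

    // Input type String matches a window whose elements are JSON strings.
    static final class CountStrings implements AllWindowFunction<String, String, String> {
        @Override
        public void apply(String window, Iterable<String> values, Collector<String> out) {
            int count = 0;
            for (String ignored : values) {
                count++;
            }
            out.collect("Window: " + window + " count: " + count);
        }
    }

    // Mimics a non-keyed window of String elements: it only accepts a
    // function whose input type is String.
    static List<String> applyToWindow(AllWindowFunction<String, String, String> fn,
                                      List<String> windowContents) {
        List<String> results = new ArrayList<>();
        fn.apply("GlobalWindow", windowContents, results::add);
        return results;
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("{\"a\":1}", "{\"b\":2}");
        System.out.println(applyToWindow(new CountStrings(), window));
    }
}
```

If that reading is right, the alternative would be to keep the stream keyed and use a keyed count window with the four-parameter function, but I have not been able to confirm which of the two is the intended usage.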

Kindly give your input.

Regards,
Vijay Raajaa GS
