Sure. Thanks for the pointer, I will reorder the operators accordingly. Any
comments on the approach for merging the topics and creating a single JSON?
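
To make the question concrete, here is a minimal plain-Java sketch (no Flink
dependency) of the pairing I am aiming for. The Event class is a hypothetical
stand-in for TopicPojo, and mergePairs is an illustrative helper, not part of
any Flink API. It assumes exactly one record per topic for a given eventTime.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for TopicPojo: only the fields needed for pairing.
class Event {
    final long eventTime;
    final String topic;
    final String json;

    Event(long eventTime, String topic, String json) {
        this.eventTime = eventTime;
        this.topic = topic;
        this.json = json;
    }
}

class PairMerger {
    // Buffers one event per eventTime. When a second event with the same
    // eventTime arrives, a merged JSON string is emitted and the buffered
    // entry is dropped. This mimics keying by eventTime followed by a
    // count window of size 2.
    static List<String> mergePairs(List<Event> events) {
        Map<Long, Event> pending = new HashMap<>();
        List<String> merged = new ArrayList<>();
        for (Event e : events) {
            Event first = pending.remove(e.eventTime);
            if (first == null) {
                // First record for this eventTime: wait for its partner.
                pending.put(e.eventTime, e);
            } else {
                // Second record: build the single "fat" JSON.
                merged.add("{\"eventTime\":" + e.eventTime
                        + ",\"" + first.topic + "\":" + first.json
                        + ",\"" + e.topic + "\":" + e.json + "}");
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Event> input = Arrays.asList(
                new Event(100L, "Topic-A", "{\"a\":1}"),
                new Event(200L, "Topic-A", "{\"a\":2}"),
                new Event(100L, "Topic-B", "{\"b\":1}"));
        for (String s : mergePairs(input)) {
            System.out.println(s);
        }
    }
}
```

In this sketch the record with eventTime 200 stays buffered because its
partner never arrives. I would expect a count window of size 2 to behave
similarly for unmatched records, which is one of the things I am unsure about.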

Regards,
Vijay Raajaa G S

On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> An AllWindow operator requires an AllWindowFunction, instead of a
> WindowFunction. In your case, the keyBy() seems to be in the wrong place,
> to get a keyed window you have to write something akin to:
>
> inputStream
>   .keyBy(…)
>   .window(…)
>   .apply(…) // or reduce()
>
> In your case, you key the stream and then the keying is “lost” again
> because you apply a flatMap(). That’s why you have an all-window and not a
> keyed window.
>
> Best,
> Aljoscha
>
> On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraa...@gmail.com>
> wrote:
>
> Hi,
>
> I am trying to combine two Kafka topics using a single Kafka consumer
> on a list of topics, and then convert the JSON strings in the stream to
> POJOs. I then want to join them via keyBy (on the event time field) and
> merge them into a single fat JSON. For that, I was planning to use a
> windowed stream and apply a window function to it. The assumption is that
> Topic-A and Topic-B can be joined on event time and that only one pair
> (Topic A (JSON), Topic B (JSON)) will be present with the same eventTime.
> Hence I was planning to use a countWindow(2) after keyBy on eventTime.
>
> I have a couple of questions about this:
>
> 1. Is the approach fine for merging topics and creating a single JSON?
> 2. The window function on the AllWindowedStream doesn't seem to work; any
> pointers will be greatly appreciated.
>
> Code Snippet :
>
> StreamExecutionEnvironment env =
>   StreamExecutionEnvironment.getExecutionEnvironment();
>
> logger.info("Flink Stream Window Charger has started");
>
> Properties properties = new Properties();
>
> properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
>
> properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
>
> properties.setProperty("group.id", "group-0011");
>
> properties.setProperty("auto.offset.reset", "smallest");
>
>
> List < String > names = new ArrayList < > ();
>
>
> names.add("Topic-A");
>
> names.add("Topic-B");
>
>
> DataStream < String > stream = env.addSource(new FlinkKafkaConsumer08 < >
> (names, new SimpleStringSchema(), properties));
>
> DataStream < TopicPojo > pojo = stream.map(new Deserializer())
>   .keyBy((eventTime) -> TopicPojo.getEventTime());
>
> List < String > where = new ArrayList < String > ();
>
> AllWindowedStream < String, GlobalWindow > data_window = pojo.flatMap(new
> Tokenizer()).countWindowAll(2);
>
> DataStream < String > data_charging = data_window.apply(new
> MyWindowFunction());
>
> data_charging.addSink(new SinkFunction < String > () {
>
>
> public void invoke(String value) throws Exception {
>
>
>   // Yet to be implemented - Merge two POJO into one
>
>  }
>
> });
>
>
> try
>
> {
>
>  env.execute();
>
> } catch (Exception e)
>
> {
>
>  return;
>
> }
>
> }
>
> }
>
> class Tokenizer implements FlatMapFunction < TopicPojo, String > {
>
>  private static final long serialVersionUID = 1L;
>
>  @Override
>
>  public void flatMap(TopicPojo value, Collector < String > out) throws
> Exception {
>
>   ObjectMapper mapper = new ObjectMapper();
>
>   out.collect(mapper.writeValueAsString(value));
>
>  }
>
> }
>
> class MyWindowFunction implements WindowFunction < TopicPojo, String,
> String, GlobalWindow > {
>
>  @Override
>
>  public void apply(String key, GlobalWindow window, Iterable < TopicPojo >
> arg2, Collector < String > out)
>
>  throws Exception {
>
>   int count = 0;
>
>   for (TopicPojo in : arg2) {
>
>    count++;
>
>   }
>
>   // Test Result - TO be modified
>
>   out.collect("Window: " + window + "count: " + count);
>
>
>  }
>
> }
>
> class Deserializer implements MapFunction < String, TopicPojo > {
>
>  private static final long serialVersionUID = 1L;
>
>  @Override
>
>  public TopicPojo map(String value) throws IOException {
>
>   // TODO Auto-generated method stub
>
>   ObjectMapper mapper = new ObjectMapper();
>
>   TopicPojo obj = null;
>
>   try {
>
>
>    System.out.println(value);
>
>
>    obj = mapper.readValue(value, TopicPojo.class);
>
>
>   } catch (JsonParseException e) {
>
>
>    // TODO Auto-generated catch block
>
>
>    throw new IOException("Failed to deserialize JSON object.");
>
>
>   } catch (JsonMappingException e) {
>
>
>    // TODO Auto-generated catch block
>
>
>    throw new IOException("Failed to deserialize JSON object.");
>
>   } catch (IOException e) {
>
>
>    // TODO Auto-generated catch block
>
>
>    throw new IOException("Failed to deserialize JSON object.");
>
>   }
>
>   return obj;
>
>  }
>
> }
>
> I am getting the following error: The method
> apply(AllWindowFunction<String,R,GlobalWindow>) in the type
> AllWindowedStream<String,GlobalWindow> is not applicable for the
> arguments (MyWindowFunction).
>
> Kindly give your input.
>
> Regards,
> Vijay Raajaa GS
>
>
>
