It seems that eventTime is a static field in TopicPojo, and the key selector 
also just reads that static field via TopicPojo.getEventTime(). Why is that? 
With a static field the event time basically has nothing to do with the 
individual record: every instance reports whatever value was set last, so the 
extracted key can change while elements are in flight.
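A stripped-down sketch of why this breaks keying (the class and field names here are illustrative, not taken from the actual job): because the field is static, there is one shared slot for all instances, so the "key" of every record is whatever was written last.

```java
import java.util.Date;

// Minimal sketch of the issue: a static timestamp field is shared by ALL
// instances of the POJO, so it cannot serve as a per-record key.
class StaticTimePojo {
    private static Date eventTime; // one JVM-wide slot, not per record

    StaticTimePojo(Date t) {
        eventTime = t; // overwrites the value seen by every other instance
    }

    static Date getEventTime() {
        return eventTime;
    }
}
```

Constructing a second instance silently changes the "key" of the first, which is exactly the kind of key instability that can later surface as an "Unexpected key group index" error.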

> On 5. May 2017, at 10:32, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
> 
> I tried the timestamp field as a String datatype as well as a Date object; 
> I get the same error in both cases.
> 
> Please find the POJO file:
> 
> import java.text.DateFormat;
> import java.text.ParseException;
> import java.text.SimpleDateFormat;
> import java.util.Date;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> 
> import com.fasterxml.jackson.annotation.JsonAnyGetter;
> import com.fasterxml.jackson.annotation.JsonAnySetter;
> import com.fasterxml.jackson.annotation.JsonFormat;
> import com.fasterxml.jackson.annotation.JsonIgnore;
> import com.fasterxml.jackson.annotation.JsonInclude;
> import com.fasterxml.jackson.annotation.JsonProperty;
> import com.fasterxml.jackson.annotation.JsonPropertyOrder;
> import org.apache.commons.lang.builder.ToStringBuilder;
> 
> @JsonPropertyOrder({
>     "data",
>     "label",
>     "eventTime"
> })
> public class TopicPojo {
> 
>     @JsonProperty("data")
>     private List<List<Double>> data = null;
>     @JsonProperty("label")
>     private List<String> label = null;
>     @JsonProperty("eventTime")
>     private static Date eventTime;
> 
>     /**
>      * No-args constructor for use in serialization
>      */
>     public TopicPojo() {
>     }
> 
>     /**
>      * @param data
>      * @param label
>      * @param eventTime
>      */
>     public TopicPojo(List<List<Double>> data, List<String> label, Date eventTime) {
>         super();
>         this.data = data;
>         this.label = label;
>         this.eventTime = eventTime;
>     }
> 
>     @JsonProperty("data")
>     public List<List<Double>> getData() {
>         return data;
>     }
> 
>     @JsonProperty("data")
>     public void setData(List<List<Double>> data) {
>         this.data = data;
>     }
> 
>     @JsonProperty("label")
>     public List<String> getLabel() {
>         return label;
>     }
> 
>     @JsonProperty("label")
>     public void setLabel(List<String> label) {
>         this.label = label;
>     }
> 
>     @JsonProperty("eventTime")
>     public static Date getEventTime() {
>         return eventTime;
>     }
> 
>     @JsonProperty("eventTime")
>     public void setEventTime(Date eventTime) {
>         TopicPojo.eventTime = eventTime;
>     }
> 
>     @Override
>     public String toString() {
>         return ToStringBuilder.reflectionToString(this);
>     }
> }
> 
> 
> 
> The above code declares eventTime as a Date object; I tried it as a String 
> as well.
> 
> Regards,
> 
> Vijay Raajaa G S
> 
> 
> On Fri, May 5, 2017 at 1:59 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> What’s the KeySelector you’re using? To me, this indicates that the timestamp 
> field is somehow changing after the original keying or in transit.
> 
> Best,
> Aljoscha
>> On 4. May 2017, at 22:01, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>> 
>> I tried reordering and the window function works fine. But after 
>> processing a few records from Topic A and Topic B, the window function 
>> throws the error below. The keyBy is on the eventTime field.
>> 
>> java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
>> 	at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
>> 	at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
>> 	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
>> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
>> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>> 	at java.lang.Thread.run(Thread.java:745)
>> 
>> 
>> 
>> Regards,
>> 
>> Vijay Raajaa GS 
>> 
>> 
>> On Wed, May 3, 2017 at 3:50 PM, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>> Thanks for your input, will try to incorporate them in my implementation.
>> 
>> Regards,
>> Vijay Raajaa G S
>> 
>> On Wed, May 3, 2017 at 3:28 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> The approach could work, but if it can happen that an event from stream A is 
>> not matched by an event in stream B you will have lingering state that never 
>> goes away. For such cases it might be better to write a custom 
>> CoProcessFunction as sketched here: 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
>> 
>> The idea is to keep events from each side in state and emit a result when 
>> you get the event from the other side. You also set a cleanup timer in case 
>> no other event arrives to make sure that state eventually goes away.
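For illustration, the buffering idea described above can be simulated in plain Java, stripped of the Flink API; `PairBuffer`, its method names, and the string-based merge are all made up for this sketch (a real job would use Flink's keyed state for the maps and a registered timer for the cleanup):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the CoProcessFunction buffering idea: hold the
// first-arriving event per key, emit a merged result when the other side
// shows up, and allow expiry so unmatched keys do not linger forever.
class PairBuffer {
    private final Map<String, String> pendingA = new HashMap<>();
    private final Map<String, String> pendingB = new HashMap<>();

    /** Returns the merged result, or null if the other side has not arrived yet. */
    String onA(String eventTime, String jsonA) {
        String jsonB = pendingB.remove(eventTime);
        if (jsonB != null) {
            return merge(jsonA, jsonB);
        }
        pendingA.put(eventTime, jsonA); // buffer until B arrives (or expiry)
        return null;
    }

    String onB(String eventTime, String jsonB) {
        String jsonA = pendingA.remove(eventTime);
        if (jsonA != null) {
            return merge(jsonA, jsonB);
        }
        pendingB.put(eventTime, jsonB);
        return null;
    }

    /** Stands in for the cleanup timer firing: drop state for unmatched keys. */
    void expire(String eventTime) {
        pendingA.remove(eventTime);
        pendingB.remove(eventTime);
    }

    int pendingCount() {
        return pendingA.size() + pendingB.size();
    }

    private static String merge(String a, String b) {
        // Placeholder merge: a real job would combine the two JSON payloads.
        return "{\"a\":" + a + ",\"b\":" + b + "}";
    }
}
```

The `expire` call stands in for the registered cleanup timer: without it, an event that never gets a partner from the other topic would stay buffered forever, which is exactly the lingering-state problem mentioned above.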
>> 
>> Best,
>> Aljoscha
>> 
>>> On 3. May 2017, at 11:47, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>>> 
>>> Sure. Thanks for the pointer, let me reorder the same. Any comments about 
>>> the approach followed for merging topics and creating a single JSON?
>>> 
>>> Regards,
>>> Vijay Raajaa G S
>>> 
>>> On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> Hi,
>>> An AllWindow operator requires an AllWindowFunction, instead of a 
>>> WindowFunction. In your case, the keyBy() seems to be in the wrong place, 
>>> to get a keyed window you have to write something akin to:
>>> 
>>> inputStream
>>>   .keyBy(…)
>>>   .window(…)
>>>   .apply(…) // or reduce()
>>> 
>>> In your case, you key the stream and then the keying is “lost” again 
>>> because you apply a flatMap(). That’s why you have an all-window and not a 
>>> keyed window.
>>> 
>>> Best,
>>> Aljoscha
>>> 
>>>> On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> I am trying to combine two Kafka topics using a single Kafka consumer 
>>>> on a list of topics, and to convert the JSON strings in the stream to 
>>>> POJOs. Then I join them via keyBy (on the event time field) and, to merge 
>>>> them into a single fat JSON, I was planning to use a windowed stream and 
>>>> apply a window function on it. The assumption is that Topic-A & Topic-B 
>>>> can be joined on event time, and that only one pair (Topic A (JSON), 
>>>> Topic B (JSON)) will be present with the same eventTime. Hence I was 
>>>> planning to use countWindow(2) after the keyBy on eventTime.
>>>> 
>>>> I have a couple of questions:
>>>> 
>>>> 1. Is the approach fine for merging topics and creating a single JSON?
>>>> 2. The window function on the AllWindowedStream doesn't seem to work; any 
>>>> pointers would be greatly appreciated.
>>>> 
>>>> Code snippet:
>>>> 
>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>> 
>>>> logger.info("Flink Stream Window Charger has started");
>>>> 
>>>> Properties properties = new Properties();
>>>> properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
>>>> properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
>>>> properties.setProperty("group.id", "group-0011");
>>>> properties.setProperty("auto.offset.reset", "smallest");
>>>> 
>>>> List<String> names = new ArrayList<>();
>>>> names.add("Topic-A");
>>>> names.add("Topic-B");
>>>> 
>>>> DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
>>>> DataStream<TopicPojo> pojo = stream.map(new Deserializer()).keyBy((eventTime) -> TopicPojo.getEventTime());
>>>> List<String> where = new ArrayList<String>();
>>>> AllWindowedStream<String, GlobalWindow> data_window = pojo.flatMap(new Tokenizer()).countWindowAll(2);
>>>> DataStream<String> data_charging = data_window.apply(new MyWindowFunction());
>>>> 
>>>> data_charging.addSink(new SinkFunction<String>() {
>>>> 
>>>>     public void invoke(String value) throws Exception {
>>>>         // Yet to be implemented - merge two POJOs into one
>>>>     }
>>>> });
>>>> 
>>>> try {
>>>>     env.execute();
>>>> } catch (Exception e) {
>>>>     return;
>>>> }
>>>> }
>>>> }
>>>> 
>>>> class Tokenizer implements FlatMapFunction<TopicPojo, String> {
>>>> 
>>>>     private static final long serialVersionUID = 1L;
>>>> 
>>>>     @Override
>>>>     public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
>>>>         ObjectMapper mapper = new ObjectMapper();
>>>>         out.collect(mapper.writeValueAsString(value));
>>>>     }
>>>> }
>>>> 
>>>> class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
>>>> 
>>>>     @Override
>>>>     public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2, Collector<String> out)
>>>>             throws Exception {
>>>>         int count = 0;
>>>>         for (TopicPojo in : arg2) {
>>>>             count++;
>>>>         }
>>>>         // Test result - to be modified
>>>>         out.collect("Window: " + window + " count: " + count);
>>>>     }
>>>> }
>>>> 
>>>> class Deserializer implements MapFunction<String, TopicPojo> {
>>>> 
>>>>     private static final long serialVersionUID = 1L;
>>>> 
>>>>     @Override
>>>>     public TopicPojo map(String value) throws IOException {
>>>>         ObjectMapper mapper = new ObjectMapper();
>>>>         TopicPojo obj = null;
>>>>         try {
>>>>             System.out.println(value);
>>>>             obj = mapper.readValue(value, TopicPojo.class);
>>>>         } catch (JsonParseException e) {
>>>>             throw new IOException("Failed to deserialize JSON object.");
>>>>         } catch (JsonMappingException e) {
>>>>             throw new IOException("Failed to deserialize JSON object.");
>>>>         } catch (IOException e) {
>>>>             throw new IOException("Failed to deserialize JSON object.");
>>>>         }
>>>>         return obj;
>>>>     }
>>>> }
>>>> 
>>>> 
>>>> I am getting the error: The method apply(AllWindowFunction<String,R,GlobalWindow>) 
>>>> in the type AllWindowedStream<String,GlobalWindow> is not applicable for 
>>>> the arguments (MyWindowFunction).
>>>> 
>>>> Kindly give your input.
>>>> 
>>>> Regards,
>>>> Vijay Raajaa GS 
>>>> 
>>> 
>>> 
>> 
>> 
>> 
> 
> 
