Hi, I am trying to join two Kafka data streams and write the result to another Kafka topic, but the joined stream does not output any data. After some time the program runs out of memory and crashes, which I think is a result of the join not working. The code doesn't throw any errors; the join simply produces no output. My join logic is below, please suggest possible solutions. P.S. Things I have tried so far:
1. Increased task slots on the task manager
2. Added watermarks to my Kafka sources

    DataStream<Enhanced> joinedStream = EntriesStream.join(historyStream)
        .where(new KeySelector<GenericRecord, String>() {
            @Override
            public String getKey(GenericRecord value) throws Exception {
                return value.get("la_id").toString();
            }
        })
        .equalTo(new KeySelector<GenericRecord, String>() {
            @Override
            public String getKey(GenericRecord value) throws Exception {
                return value.get("id").toString();
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(30)))
        .apply(new JoinFunction<GenericRecord, GenericRecord, Enhanced>() {
            @Override
            public Enhanced join(GenericRecord first, GenericRecord second) throws Exception {
                return new Enhanced(
                    Long.parseLong(first.get("c_at").toString()),
                    first.get("c_type").toString(),
                    first.get("id").toString(),
                    Integer.parseInt(first.get("d_cts").toString()),
                    Integer.parseInt(first.get("c_cts").toString()),
                    second.get("prov").toString(),
                    second.get("bb_S_T").toString(),
                    second.get("p_id").toString(),
                    second.get("s_ccurr").toString()
                );
            }
        });
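
To make clear what I expect the join to do, here is a small plain-Java sketch (no Flink dependency) of how I understand a 30-second tumbling event-time window join to pair records: two records should match only when they have the same key and their event timestamps fall into the same window. The names `WindowJoinSketch`, `Event`, `windowStart` and `join` are hypothetical and only for illustration, and the sample timestamps are made up. This is my mental model, not Flink's actual implementation, and it assumes windows aligned to the epoch with no offset.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowJoinSketch {

    // Hypothetical record: a join key plus an event-time timestamp in milliseconds.
    public static final class Event {
        public final String key;
        public final long timestampMs;

        public Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    // Start of the tumbling window containing the timestamp
    // (assumption: windows are aligned to the epoch, no offset).
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    // Emit one "key@windowStart" string per pair that shares a key and a window.
    public static List<String> join(List<Event> left, List<Event> right, long windowSizeMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            long leftWindow = windowStart(l.timestampMs, windowSizeMs);
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow = leftWindow == windowStart(r.timestampMs, windowSizeMs);
                if (sameKey && sameWindow) {
                    out.add(l.key + "@" + leftWindow);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long size = 30_000L; // 30-second windows, as in my job
        List<Event> entries = List.of(new Event("a", 5_000L), new Event("b", 29_000L));
        List<Event> history = List.of(new Event("a", 12_000L), new Event("b", 31_000L));
        // "a" matches (both in window 0); "b" does not (windows 0 and 30000).
        System.out.println(join(entries, history, size)); // prints [a@0]
    }
}
```

In this model a pair is dropped as soon as the two timestamps land in different 30-second windows, even when the keys are equal. So if the matching records in my two topics are more than a window apart in event time, that alone could explain the empty output.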