Hi,
I am trying to join two Kafka Data Streams from and output to another Kafka
topic, however my joined stream does not output any data.  After some time,
my program crashes and runs out of memory, which I think is a result of the
join not working. My code doesn't throw any errors, but the joins don't
produce any output. My join logic is below, please suggest possible
solutions.P.S:
Things I have tried so far:

   1. Increased task slots on the task manager
   2. Added Watermarks to my Kafka sources

 DataStream<Enhanced> joinedStream = EntriesStream.join(historyStream)
                                .where(new KeySelector<GenericRecord, String>() 
{

                                        @Override
                                        public String getKey(GenericRecord 
value) throws Exception {
                                                return 
value.get("la_id").toString();

                                        }
                                }).equalTo(new KeySelector<GenericRecord, 
String>() {

                                        @Override
                                        public String getKey(GenericRecord 
value) throws Exception {
                                                return 
value.get("id").toString();
                                        }
                                
}).window(TumblingEventTimeWindows.of(Time.seconds(30)))
                                .apply(new JoinFunction<GenericRecord, 
GenericRecord, Enhanced>() {


                                        @Override
                                        public Enhanced join(GenericRecord 
first, GenericRecord second)
throws Exception {
                                                return new Enhanced(
                                                                
Long.parseLong(first.get("c_at").toString()),
                                                                
first.get("c_type").toString(),
                                                                
first.get("id").toString(),
                                                                
Integer.parseInt(first.get("d_cts").toString()),
                                                                
Integer.parseInt(first.get("c_cts").toString()),
                                                                
second.get("prov").toString(),
                                                                
second.get("bb_S_T").toString(),
                                                                
second.get("p_id").toString(),
                                                                
second.get("s_ccurr").toString()
                                                );
                                        }
                                });

Reply via email to