Hi All,

I need your input on the scenario below:

Source: Kafka (the actual source is an Oracle DB; its data is pushed to Kafka)
SDK: Java
Runner: Flink
Problem: Subscribe to 5 topics (tables), join them on different keys, and
group by a few columns.
Existing solution: A session window with a 20-second gap, with a separate
transform for every two queries, each result feeding the next transform.

Below is the sample code:

        // Session window with a 20-second gap, shared by all transforms below.
        Sessions sessionWindow =
                Sessions.withGapDuration(Duration.standardSeconds(20L));

        // Join TABLE1 and TABLE2, then window the joined rows.
        PCollection<Row> stream1 =
                PCollectionTuple
                        .of(new TupleTag<>("TABLE1"), rows1)
                        .and(new TupleTag<>("TABLE2"), rows2)
                        .apply("rows1-rows2", SqlTransform.query(
                                "select t1.col1, t1.col2, t2.col5 from " +
                                        "TABLE1 t1 join TABLE2 t2 \n" +
                                        "on t1.col5 = t2.col7 "))
                        .apply("window", Window.into(sessionWindow));

        // Pivot the joined rows per (col1, col2) using conditional aggregation.
        // The tag uses an underscore because a hyphenated name such as
        // MERGE-TABLE is not a valid unquoted SQL identifier.
        PCollection<Row> mergedStream =
                PCollectionTuple
                        .of(new TupleTag<>("MERGE_TABLE"), stream1)
                        .apply("merge", SqlTransform.query(
                                "select col1, col2, \n" +
                                "max(case when col3='D' then col8 end) as D_col3,\n" +
                                "max(case when col3='C' then col8 end) as C_col3,\n" +
                                "max(case when col6='CP' then col10 end) as CP_col6,\n" +
                                "max(case when col6='DP' then col10 end) as DP_col6\n" +
                                "from MERGE_TABLE " +
                                "group by col1, col2\n "))
                        .apply("merge-window", Window.into(sessionWindow));

        // Join the merged result with the next stream.
        PCollection<Row> stream2 =
                PCollectionTuple
                        .of(new TupleTag<>("TABLE3"), mergedStream)
                        .and(new TupleTag<>("TABLE4"), stream22)
                        .apply(SqlTransform.query(
                                "select distinct c1, c2, c4 from " +
                                "TABLE3 d1 join TABLE4 d2\n" +
                                " on d1.num = d2.tr_num "))
                        .apply("e-window", Window.into(sessionWindow));
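
To make the 20-second session gap concrete, here is a minimal plain-Java
sketch (not Beam code, and not part of the pipeline). It assumes that events
fall into the same session when consecutive timestamps are less than the gap
apart, which is how I understand session windows to merge. The class and
method names (SessionGapSketch, sessions) are made up for illustration, and
timestamps are plain seconds rather than Beam Instants.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SessionGapSketch {

    // Groups timestamps (in seconds) into sessions: a new session starts
    // whenever an event is at least gapSeconds after the previous event.
    static List<List<Long>> sessions(List<Long> timestamps, long gapSeconds) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);

        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            boolean gapExceeded = !current.isEmpty()
                    && ts - current.get(current.size() - 1) >= gapSeconds;
            if (gapExceeded) {
                // Close the current session and start a new one.
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical event times, in seconds.
        List<Long> events = Arrays.asList(0L, 5L, 24L, 50L, 60L);
        System.out.println(sessions(events, 20)); // [[0, 5, 24], [50, 60]]
    }
}
```

Note that a session keeps growing as long as events keep arriving within the
gap, so results for a busy key are only emitted once that key has been quiet
for 20 seconds.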


Is there a better approach?

Looking forward to your suggestions.
        
Thanks!!
