Hi all,
I need your input on the scenario below:
Source : Kafka (the actual source is an Oracle DB; its data is pushed to Kafka)
SDK : Java
Runner : Flink
Problem: Subscribe to 5 topics (tables), join them on different keys, and
group by a few columns.
Existing solution: Use a session window with a 20-second gap, with a separate
transform for every 2 queries, each consuming the previous result.
Below is the sample code:
Sessions sessionWindow = Sessions.withGapDuration(Duration.standardSeconds(20));
PCollection<Row> stream1 =
    PCollectionTuple
        .of(new TupleTag<>("TABLE1"), rows1)
        .and(new TupleTag<>("TABLE2"), rows2)
        .apply("rows1-rows2", SqlTransform.query(
            "select t1.col1, t1.col2, t2.col5 from " +
            "TABLE1 t1 join TABLE2 t2 " +
            "on t1.col5 = t2.col7"))
        .apply("window", Window.into(sessionWindow));
PCollection<Row> mergedStream =
    PCollectionTuple
        // Renamed from MERGE-TABLE: a hyphen is not valid in an unquoted SQL identifier.
        .of(new TupleTag<>("MERGE_TABLE"), stream1)
        .apply("merge", SqlTransform.query(
            "select col1, col2,\n" +
            "max(case when col3='D' then col8 end) as D_col3,\n" +
            "max(case when col3='C' then col8 end) as C_col3,\n" +
            "max(case when col6='CP' then col10 end) as CP_col6,\n" +
            "max(case when col6='DP' then col10 end) as DP_col6\n" +
            "from MERGE_TABLE\n" +
            "group by col1, col2"))
        .apply("merge-window", Window.into(sessionWindow));
PCollection<Row> stream2 =
    PCollectionTuple
        .of(new TupleTag<>("TABLE3"), mergedStream)
        .and(new TupleTag<>("TABLE4"), stream22)
        .apply(SqlTransform.query(
            "select distinct c1, c2, c4 from " +
            "TABLE3 d1 join TABLE4 d2 " +
            "on d1.num = d2.tr_num"))
        .apply("e-window", Window.into(sessionWindow));
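For readers less familiar with session windows, the following is a minimal plain-Java sketch (not Beam code) of the grouping a 20-second gap is assumed to produce for a single key. The class and method names (`SessionGapSketch`, `sessionize`) and the sample timestamps are hypothetical, and the sketch assumes two elements share a session only when they are strictly less than the gap apart:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; this is not part of the Beam API.
class SessionGapSketch {

    // Groups event timestamps (in seconds) into sessions. A new session starts
    // whenever the pause since the previous element is at least gapSeconds
    // (assumption: elements exactly one gap apart do not share a session).
    static List<List<Long>> sessionize(List<Long> timestamps, long gapSeconds) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);

        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            boolean gapExceeded = !current.isEmpty()
                && ts - current.get(current.size() - 1) >= gapSeconds;
            if (gapExceeded) {
                // Close the running session and start a new one.
                sessions.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        // Made-up arrival times for one key, in seconds.
        List<Long> arrivals = Arrays.asList(0L, 5L, 18L, 50L, 60L, 100L);
        System.out.println(sessionize(arrivals, 20));
        // prints [[0, 5, 18], [50, 60], [100]]
    }
}
```

Here the arrivals at 0, 5 and 18 seconds fall into one session, while the 32-second pause before 50 starts a new one. Rows from two tables can therefore only meet in a join if they land in the same session.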
Is there a better approach?
Looking forward to your suggestions.
Thanks!!