Hi, we're trying to use the Flink 1.11 Java Table API for a streaming use case:
We have 2 streams, each with a different structure. The events on both streams come from Kafka and can be:
- A new event (not yet in the system)
- An update to an event that was inserted previously
so we only want to keep the latest version of each event in the Table.
We need to join the 2 resulting Tables so that all this data is stored in the Flink system. We think the best way is to store the joined data as a Table as well: a third Flink Table that joins the 2 tables on a common key.
To sum up, we have:
- Stream 1 (coming from Kafka topic) -> Flink Table 1
- Stream 2 (coming from Kafka topic) -> Flink Table 2
- Table 3 = Table 1 join Table 2
- A retract DataStream of Table 3 (via toRetractStream)
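For reference, here is a minimal plain-Java sketch of what this pipeline computes. It has no Flink dependency. Two maps stand in for Table 1 and Table 2 and hold the latest value per id. Each upsert that changes a joined row produces the kind of messages a retract stream of Table 3 carries: a retraction ("-") of the old joined row followed by an addition ("+") of the new one.

Some assumptions apply to this sketch:
- The class and method names are hypothetical.
- Rows are simplified to strings.
- The exact message sequence Flink emits may differ in details.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink dependency) of:
//   Table 1 = latest value per id from stream 1
//   Table 2 = latest value per id from stream 2
//   Table 3 = Table 1 JOIN Table 2 ON id, read as a retract stream
// "+" marks an added joined row, "-" marks a retraction of an earlier one.
final class RetractJoinSketch {
    private final Map<String, String> table1 = new HashMap<>(); // id -> latest value (stream 1)
    private final Map<String, String> table2 = new HashMap<>(); // id -> latest value (stream 2)
    private final List<String> changelog = new ArrayList<>();   // messages for Table 3

    void onStream1(String id, String value) {
        String previous = table1.put(id, value); // a new event inserts, an update overwrites
        emit(id, previous, value, table2.get(id), true);
    }

    void onStream2(String id, String value) {
        String previous = table2.put(id, value);
        emit(id, previous, value, table1.get(id), false);
    }

    private void emit(String id, String previous, String current, String other, boolean fromTable1) {
        if (other == null) {
            return; // inner join: no matching row on the other side yet, nothing to emit
        }
        if (previous != null) {
            changelog.add("-" + row(id, previous, other, fromTable1)); // retract old joined row
        }
        changelog.add("+" + row(id, current, other, fromTable1));      // add new joined row
    }

    // Joined row rendered as "id,table1Value,table2Value".
    private static String row(String id, String mine, String other, boolean mineIsTable1) {
        return mineIsTable1 ? id + "," + mine + "," + other : id + "," + other + "," + mine;
    }

    List<String> changelog() {
        return changelog;
    }

    public static void main(String[] args) {
        RetractJoinSketch sketch = new RetractJoinSketch();
        sketch.onStream1("1", "a"); // no match in Table 2 yet: nothing emitted
        sketch.onStream2("1", "x"); // +1,a,x
        sketch.onStream1("1", "b"); // -1,a,x then +1,b,x
        System.out.println(sketch.changelog()); // [+1,a,x, -1,a,x, +1,b,x]
    }
}
```

An inner join is assumed here, so an id emits nothing until both sides have a row for it.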
To keep only the latest element in Table 1 and Table 2, we are using an aggregate function (LastValueAggFunction):
    streamTableEnvironment.registerFunction("LAST_VALUE_STRING",
            new LastValueAggFunction.StringLastValueAggFunction());
    ...
    streamTableEnvironment.fromDataStream(inputDataStream)
            .groupBy($("id"))
            .select(
                    $("id").as("o_id"),
                    call("LAST_VALUE_STRING", $("title")).as("o_title"),
                    call("LAST_VALUE_STRING", $("description")).as("o_description")
            );
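This helps show why an aggregate such as LAST_VALUE is used here. On its own, fromDataStream yields an append-only table that keeps every version of an event. A non-windowed groupBy(...).select(...) instead keeps one row per id and turns each update into a retraction of the old result plus the new result.

The plain-Java sketch below simulates that behaviour without Flink, under these assumptions:
- LastValueRetractSketch and Message are hypothetical names.
- Message only mirrors the role of the Tuple2<Boolean, Row> elements returned by toRetractStream.
- Titles are non-null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink dependency) of a non-windowed
// GROUP BY id with a last-value aggregate, consumed as a retract stream.
final class LastValueRetractSketch {
    // One retract-stream message; plays the role of Tuple2<Boolean, Row>.
    static final class Message {
        final boolean add; // true = add row, false = retract row
        final String row;

        Message(boolean add, String row) {
            this.add = add;
            this.row = row;
        }

        @Override
        public String toString() {
            return (add ? "+" : "-") + row;
        }
    }

    private final Map<String, String> lastTitleById = new HashMap<>(); // aggregate state per key
    private final List<Message> messages = new ArrayList<>();

    // Titles are assumed non-null in this sketch.
    void onEvent(String id, String title) {
        String previous = lastTitleById.put(id, title); // last value: newest title wins
        if (previous != null) {
            messages.add(new Message(false, id + "," + previous)); // retract old result row
        }
        messages.add(new Message(true, id + "," + title));         // emit new result row
    }

    List<Message> messages() {
        return messages;
    }

    public static void main(String[] args) {
        LastValueRetractSketch sketch = new LastValueRetractSketch();
        sketch.onEvent("1", "t1"); // new event
        sketch.onEvent("2", "u1"); // new event
        sketch.onEvent("1", "t2"); // update of id 1
        System.out.println(sketch.messages()); // [+1,t1, +2,u1, -1,t1, +1,t2]
    }
}
```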
The questions are:
- Is our approach correct for keeping this data stored in the Flink system?
- Is it necessary to use *LastValueAggFunction* in our case? We want to convert the retract stream to our custom POJO instead of *Row*, but we're getting the error in the attached *stack_trace.log*.
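Without having seen the stack trace, one thing that may be worth checking when converting with toRetractStream(table, SomePojo.class) is whether the POJO follows Flink's POJO rules. Those rules are:
- a public class
- a public no-argument constructor
- fields that are public or reachable through getters and setters

It is also worth checking that the POJO field names match the column names (o_id, o_title, o_description here), since conversion to a POJO type maps fields by name.

The plain-Java sketch below has no Flink dependency. Offer, Change and materialize are hypothetical names. It shows such a POJO and one way a consumer might apply the Boolean flag of each Tuple2<Boolean, POJO> element: true adds or replaces the row, false retracts it.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RetractToPojoSketch {
    // Hypothetical POJO: public class, public no-argument constructor, public
    // fields whose names match the column names produced by select(...).
    public static class Offer {
        public String o_id;
        public String o_title;
        public String o_description;

        public Offer() {}

        public Offer(String id, String title, String description) {
            this.o_id = id;
            this.o_title = title;
            this.o_description = description;
        }
    }

    // Stand-in for one element of a DataStream<Tuple2<Boolean, Offer>>.
    static final class Change {
        final boolean add; // true = accumulate, false = retract
        final Offer offer;

        Change(boolean add, Offer offer) {
            this.add = add;
            this.offer = offer;
        }
    }

    // Folds retract messages into the current state: one Offer per o_id.
    // Assumes a key's retraction arrives before the row that replaces it,
    // as a grouped aggregation emits them.
    static Map<String, Offer> materialize(List<Change> changes) {
        Map<String, Offer> current = new LinkedHashMap<>();
        for (Change change : changes) {
            if (change.add) {
                current.put(change.offer.o_id, change.offer);
            } else {
                current.remove(change.offer.o_id);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<Change> changes = Arrays.asList(
                new Change(true, new Offer("1", "t1", "d1")),
                new Change(false, new Offer("1", "t1", "d1")),
                new Change(true, new Offer("1", "t2", "d2")),
                new Change(true, new Offer("2", "u1", "e1")));
        Map<String, Offer> current = materialize(changes);
        System.out.println(current.get("1").o_title + " " + current.get("2").o_title); // t2 u1
    }
}
```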
Abdelilah Choukdi,
Backend dev at ManoMano.
