Hi Xuyang,
Thank you for the explanation. The table.exec.sink.upsert-materialize = FORCE
setting was unnecessary: I just redeployed the job and confirmed that with the
default AUTO the materializer is still enabled.
Thank you also for the example you provided. My understanding of the upsert key
was exactly the same as yours, but I have not been able to reproduce it.
When executing an EXPLAIN CHANGELOG_MODE statement, only the information that
the materializer is used is visible; nothing about the upsert key is printed.
After slightly modifying the Flink lib to add log statements in
SinkUpsertMaterializer, the TM logs show at runtime that the variable
inputUpsertKey is empty and the boolean hasUpsertKey is false.
Let me describe my observations based on two examples.
EXAMPLE 1
First example is taken directly from this article [1]:
> -- CDC source tables: s1 & s2
> s1: id BIGINT, level BIGINT, PRIMARY KEY(id)
> s2: id BIGINT, attr VARCHAR, PRIMARY KEY(id)
> -- sink table: t1
> t1: id BIGINT, level BIGINT, attr VARCHAR, PRIMARY KEY(id)
> -- join s1 and s2 and insert the result into t1
> INSERT INTO t1
> SELECT
> s1.*, s2.attr
> FROM s1 JOIN s2
> ON s1.level = s2.id
When I run this simplified example, which just joins two tables, I get an *empty
upsert key*, exactly as stated in the article. Again, the TM logs show at
runtime that inputUpsertKey is empty and hasUpsertKey is false. I do not see
any information about the upsert key when running the EXPLAIN statement;
however, the information about the join inputs seems relevant:
> leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]
EXAMPLE 2
The second example is slightly modified: unique keys are used in the join, and
the sink PK differs from the join key:
> s1: id BIGINT, level BIGINT, PRIMARY KEY(id)
> s2: id BIGINT, attr VARCHAR, PRIMARY KEY(id)
> t1: id BIGINT, level BIGINT, attr VARCHAR, PRIMARY KEY(level)
> INSERT INTO t1
> SELECT
> s1.*, s2.attr
> FROM s1 JOIN s2
> ON s1.id = s2.id
This time the *upsert key is defined*. In this example inputUpsertKey is
'id' and hasUpsertKey is true. Additionally, the plan printed by EXPLAIN shows
that both join inputs use a unique key:
> leftInputSpec=[JoinKeyContainsUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey]
That seems to be the correct behavior. SinkUpsertMaterializer will use that
upsert key when comparing an incoming event with the historical events in its
state.
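
The two join specs printed by EXPLAIN in both examples can be reproduced with a
small model. This is only a sketch of how the classification appears to work,
not Flink's planner code: the function name join_input_spec and the use of
plain Python sets are assumptions made for illustration, and the rule itself is
inferred from the two plans above.

```python
from typing import Iterable, Set


def join_input_spec(unique_keys: Iterable[Set[str]], join_key: Set[str]) -> str:
    """Classify one join input from its unique keys and the join key columns.

    Hypothetical helper: the returned strings mirror the labels seen in the
    EXPLAIN output, but the rule is an assumption, not Flink's actual code.
    """
    keys = [frozenset(k) for k in unique_keys]
    if not keys:
        # The input has no unique key at all.
        return "NoUniqueKey"
    if any(k <= frozenset(join_key) for k in keys):
        # Some unique key is fully covered by the join key columns.
        return "JoinKeyContainsUniqueKey"
    # The input has a unique key, but the join key does not cover it.
    return "HasUniqueKey"


# EXAMPLE 1: ON s1.level = s2.id
print(join_input_spec([{"id"}], {"level"}))  # left input s1: HasUniqueKey
print(join_input_spec([{"id"}], {"id"}))     # right input s2: JoinKeyContainsUniqueKey

# EXAMPLE 2: ON s1.id = s2.id
print(join_input_spec([{"id"}], {"id"}))     # both inputs: JoinKeyContainsUniqueKey
```

Under this reading, the upsert key survives the join only when both inputs are
JoinKeyContainsUniqueKey, which matches what the logs show for the two examples.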
According to the SinkUpsertMaterializer code, whenever the upsert key is empty
(first example), a whole-row equaliser is used to find the last matching value
in state. If the whole row needs to match, some jobs may end up with an
undesirable final ordering, for example because of temporal joins (some state
gets cleared as the watermark progresses, even if the global TTL = 0) and/or
non-deterministic calculations in some columns (such as adding a column with
the value LOCALTIMESTAMP just before the sink). At the same time, the sink
upsert materializer is still turned on automatically, which may suggest to the
user that it is ordering events correctly. Maybe we could add more
documentation on those use cases?
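
To make the concern concrete, here is a toy model of the matching step. It is a
simplified sketch and not the real SinkUpsertMaterializer: the class
MaterializerModel, the replay helper, and the row layout (id, level, attr, ts)
are all hypothetical, with ts standing in for a non-deterministic column such
as LOCALTIMESTAMP.

```python
from typing import List, Optional, Sequence, Tuple

Row = Tuple[object, ...]  # (id, level, attr, ts); ts is non-deterministic


class MaterializerModel:
    """Toy state for ONE sink primary-key value (hypothetical, simplified)."""

    def __init__(self, upsert_key: Optional[Sequence[int]] = None) -> None:
        # Column positions of the upsert key; empty means "no upsert key".
        self.upsert_key = list(upsert_key or [])
        self.rows: List[Row] = []

    def _matches(self, a: Row, b: Row) -> bool:
        if self.upsert_key:
            return all(a[i] == b[i] for i in self.upsert_key)
        return a == b  # no upsert key: compare the whole row

    def _find(self, row: Row) -> int:
        for i, stored in enumerate(self.rows):
            if self._matches(stored, row):
                return i
        return -1

    def add(self, row: Row) -> None:
        # +I / +U: replace the row with the same upsert key, else append.
        i = self._find(row) if self.upsert_key else -1
        if i >= 0:
            self.rows[i] = row
        else:
            self.rows.append(row)

    def retract(self, row: Row) -> bool:
        # -U / -D: remove the first matching row; report whether one was found.
        i = self._find(row)
        if i < 0:
            return False
        del self.rows[i]
        return True

    def current(self) -> Optional[Row]:
        return self.rows[-1] if self.rows else None


def replay(model: MaterializerModel):
    model.add((1, 10, "a", 100))                # +I, ts evaluated as 100
    matched = model.retract((1, 10, "a", 101))  # -U, ts re-evaluated as 101
    return matched, model.current()


print(replay(MaterializerModel()))     # (False, (1, 10, 'a', 100))
print(replay(MaterializerModel([0])))  # (True, None)
```

With whole-row matching the retraction finds nothing and the stale row stays in
state, while matching on the upsert key (column 0, id) removes it as expected.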
I am eager to hear what you think.
Best regards,
Marek
[1]
https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness
On Fri, 2 Feb 2024 at 12:30, Xuyang wrote:
> Hi, Maj.
>
> > 1. Does the materializer support jobs containing different types of
> > joins (more specifically regular and temporal joins)?
> > 2. Does the materializer support different types of input connectors:
> > kafka with both debezium-avro-confluent and avro-confluent formats and
> > upsert-kafka with avro-confluent format? All with well defined primary
> > key (PK)
>
> The common answer to both questions is "no." The upsert materializer is
> only related to the sink and the node before the sink (usually a join or an
> aggregation, etc.).
>
> By default (with table.exec.sink.upsert-materialize = AUTO), the upsert
> materializer will appear when the upsert key of the upstream node before
> the sink and the pk of the sink do not match. Usually, we do not need to
> manually set this parameter to FORCE.
>
>
> Suppose we have a source table T1, with a schema of "a", "b", "c", and "a"
> is the pk. Downstream, "b" is used as the join key to join with table T2,
> and the result is written into table T3, where "a" is also the pk. The
> global parallelism is set to 2.
>
> The source will issue (+I, a1, b1, c1), (-U, a1, b1, c1), (+U, a1, b2,
> c2). Because the join key is "b", the upsert key for the join becomes "b",
> which does not match the sink's pk "a", hence a sink materializer is
> produced.
>
> Since the join key is "b", (+I, a1, b1, c1) and (-U, a1, b1, c1) will be
> sent to the first parallel instance of the join "join1", and (+U, a1, b2,
> c2) will be sent to the second parallel instance of the join "join2". At
> the same time, since the sink's pk is "a", these three pieces of data are
> actually related in sequence at the sink.
>
>