Re: Request to provide sample codes on Data stream using flink, Spring Boot & kafka

2024-02-08 Thread David Anderson
For a collection of several complete sample applications using Flink with
Kafka, see https://github.com/confluentinc/flink-cookbook.

And I agree with Marco -- in fact, I would go further and say that using
Spring Boot with Flink is an anti-pattern.

David

On Wed, Feb 7, 2024 at 4:37 PM Marco Villalobos 
wrote:

> Hi Nida,
>
> You can find sample code for using Kafka here:
> https://kafka.apache.org/documentation/
> You can find sample code for using Flink here:
> https://nightlies.apache.org/flink/flink-docs-stable/
> You can find sample code for using Flink with Kafka here:
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kafka/
>
> You can find sample code for using Spring Boot here:
> https://docs.spring.io/spring-boot/docs/3.2.2/reference/htmlsingle/
> You can find sample code for using Spring Boot with Kafka here:
> https://docs.spring.io/spring-boot/docs/3.2.2/reference/htmlsingle/#messaging.kafka
>
> As for sample code that uses Spring Boot with Apache Flink in the same
> process, you won't find any, because the two technologies solve different
> problems. Apache Flink is a stream-processing framework: your code is
> submitted to and runs on a Flink cluster.
>
> Spring Boot is a microservices, IoC, and integration application framework
> for building stand-alone applications (it doesn't run on a cluster).
>
> You don't need Spring Boot in an Apache Flink application, and there is no
> way to use Apache Flink within a Spring Boot application. (A minimal sketch
> of a Flink job reading from Kafka without Spring Boot follows at the end of
> this thread.)
>
> But maybe you can elaborate on why you think it is necessary to use Spring
> Boot with Apache Flink?
>
>
> Why would you need Spring Boot for a Flink job?
>
> > On Feb 6, 2024, at 3:22 AM, Fidea Lidea  wrote:
> >
> > Hi Team,
> >
> > I request you to provide sample codes on data streaming using flink,
> kafka and spring boot.
> >
> > Awaiting your response.
> >
> > Thanks & Regards
> > Nida Shaikh
>
>
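
For reference, a Flink DataStream job that consumes from Kafka needs no Spring
Boot at all. Below is a minimal sketch; the broker address, topic, and group id
are placeholder assumptions, and a flink-connector-kafka dependency matching
your Flink version is assumed to be on the classpath.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaStreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder broker, topic, and consumer group -- adjust to your setup.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("input-topic")
                .setGroupId("sample-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // No Spring Boot involved: the job is packaged as a jar and submitted to a Flink cluster.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .map(String::toUpperCase)
                .print();

        env.execute("Kafka streaming sample");
    }
}

The resulting jar is submitted to the cluster with the flink CLI or the REST
API; no application framework is needed around it.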


RE: 退订 (Unsubscribe)

2024-02-08 Thread Jiabao Sun
Please send an email with any content to user-unsubscr...@flink.apache.org if
you want to unsubscribe from the user@flink.apache.org mailing list, and you
can refer to [1][2] for more details on managing your mailing list
subscription.

Best,
Jiabao

[1] https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

On 2024/02/06 04:15:48 杨作青 wrote:
>   
> 
> 

Re: sink upsert materializer in SQL job

2024-02-08 Thread Marek Maj
Hi Xuyang,
Thank you for the explanation. The table.exec.sink.upsert-materialize = FORCE
config was set unnecessarily; I just redeployed the job and confirmed that
when using the default AUTO, the materializer is still on.

Thank you for the example you provided. My understanding of the upsert key
used to match yours exactly, but I have not been able to reproduce that
behavior.

When executing an EXPLAIN CHANGELOG_MODE statement, only the fact that the
materializer is used is visible; no information about the upsert key is
printed. After slightly modifying the Flink lib to add log statements in
SinkUpsertMaterializer, the TM logs show at runtime that the variable
inputUpsertKey is empty and the boolean hasUpsertKey is false.
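
For reference, here is a minimal sketch of how that plan can be obtained
programmatically via the Table API; it assumes the tables used in the examples
below are already registered, and is only meant to show where the
SinkUpsertMaterializer appears in the EXPLAIN output.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainChangelogMode {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // DDL for s1, s2 and t1 is omitted; the tables are assumed to exist already.
        String plan = tEnv.explainSql(
                "INSERT INTO t1 SELECT s1.*, s2.attr FROM s1 JOIN s2 ON s1.id = s2.id",
                ExplainDetail.CHANGELOG_MODE);

        // The optimized plan shows the changelog mode of each operator and whether a
        // SinkUpsertMaterializer is inserted before the sink.
        System.out.println(plan);
    }
}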

Let me describe my observations based on two examples

EXAMPLE 1
The first example is taken directly from this article [1]:

> -- CDC source tables: s1 & s2
> s1: id BIGINT, level BIGINT, PRIMARY KEY(id)
> s2: id BIGINT, attr VARCHAR, PRIMARY KEY(id)
>
> -- sink table: t1
> t1: id BIGINT, level BIGINT, attr VARCHAR, PRIMARY KEY(id)
>
> -- join s1 and s2 and insert the result into t1
> INSERT INTO t1
> SELECT
>   s1.*, s2.attr
> FROM s1 JOIN s2
>   ON s1.level = s2.id


When I run this simplified example, which just joins two tables, I get an
*empty upsert key*, exactly as stated in the article. Again, the TM logs show
at runtime that the variable inputUpsertKey is empty and the boolean
hasUpsertKey is false. I do not see information about the upsert key when
running the EXPLAIN statement; however, one piece of information about the
join seems important:

> leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]


EXAMPLE 2
The second example is slightly modified: unique keys are used in the join, and
the sink PK differs from the join key:

> s1: id BIGINT, level BIGINT, PRIMARY KEY(id)
> s2: id BIGINT, attr VARCHAR, PRIMARY KEY(id)

> t1: id BIGINT, level BIGINT, attr VARCHAR, PRIMARY KEY(level)


> INSERT INTO t1
> SELECT
>   s1.*, s2.attr
> FROM s1 JOIN s2
>   ON s1.id = s2.id


This time the *upsert key is defined*. In this case inputUpsertKey is set to
'id' and the boolean hasUpsertKey is true. Additionally, the information that
the join uses a unique key is printed in the plan when executing EXPLAIN:

> leftInputSpec=[JoinKeyContainsUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey]


That seems to be correct behavior: SinkUpsertMaterializer will use that upsert
key when comparing an incoming event with the historical events in its state.

According to the SinkUpsertMaterializer code, whenever the upsert key is empty
(as in the first example), a whole-row equaliser is used to find the last
matched value in state. If the whole row needs to be matched, some jobs may
end up with an undesirable final ordering, due to temporal joins (some state
gets cleared as the watermark progresses, even with a global ttl of 0) and/or
non-deterministic calculations for some columns (like adding a column with the
value LOCALTIMESTAMP just before the sink). At the same time, the sink upsert
materializer will still be turned on automatically, which may suggest to the
user that it is ordering events correctly. Maybe we could add more
documentation on those use cases?
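
To make the non-deterministic column case concrete, here is a purely
hypothetical variant of the first example, reusing a TableEnvironment tEnv as
in the earlier sketch and assuming a sink t1_ts that adds an updated_at column:

// Hypothetical: the same join as EXAMPLE 1, plus a LOCALTIMESTAMP column right before the sink.
// With an empty upsert key, SinkUpsertMaterializer matches retractions against its state by
// comparing whole rows; since LOCALTIMESTAMP is re-evaluated per row, a retraction row can
// differ from the row stored earlier, and the final ordering may end up wrong.
tEnv.executeSql(
        "INSERT INTO t1_ts "
                + "SELECT s1.id, s1.level, s2.attr, LOCALTIMESTAMP AS updated_at "
                + "FROM s1 JOIN s2 ON s1.level = s2.id");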

I am eager to hear what you think

best regards
Marek

[1]
https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness

On Fri, Feb 2, 2024 at 12:30 Xuyang  wrote:

> Hi, Maj.
>
> > 1. Does the materializer support jobs containing different types of
> joins (more specifically regular and temporal joins)?
> > 2. Does the materializer support different types of input connectors:
> kafka with both debezium-avro-confluent and avro-confluent formats and
> upsert-kafka with avro-confluent format? All with well defined primary
> key (PK)
>
> The common answer to both questions is "no." The upsert materializer is
> only related to the sink and the node before the sink (usually a join or an
> aggregation, etc.).
>
> By default (with table.exec.sink.upsert-materialize = AUTO), the upsert
> materializer will appear when the upsert key of the upstream node before
> the sink and the pk of the sink do not match. Usually, we do not need to
> manually set this parameter to FORCE.
>
>
> Suppose we have a source table T1, with a schema of "a", "b", "c", and "a"
> is the pk. Downstream, "b" is used as the join key to join with table T2,
> and the result is written into table T3, where "a" is also the pk. The
> global parallelism is set to 2.
>
> The source will issue (+I, a1, b1, c1), (-U, a1, b1, c1), (+U, a1, b2,
> c2). Because the join key is "b", the upsert key for the join becomes "b",
> which does not match the sink's pk "a", hence a sink materializer is
> produced.
>
> Since the join key is "b", (+I, a1, b1, c1) and (-U, a1, b1, c1) will be
> sent to the first parallel instance of the join "join1", and (+U, a1, b2,
> c2) will be sent to the second parallel instance of the join "join2". At
> the same time, since the sink's pk is "a", these three pieces of data are
> actually related in sequence at the sink.
>
>
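
As a side note, the table.exec.sink.upsert-materialize option mentioned above
can also be set programmatically. A minimal sketch follows; the string-based
setter is assumed to be available in the Flink version in use, and the value
shown is only an illustration (AUTO is the default and is usually sufficient):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertMaterializeConfig {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // AUTO (default): the planner adds a SinkUpsertMaterializer only when the upstream
        // upsert key and the sink primary key do not match; FORCE and NONE override that.
        tEnv.getConfig().set("table.exec.sink.upsert-materialize", "AUTO");
    }
}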

flink作业实时数据质量监控告警要如何实现? (How can real-time data quality monitoring and alerting be implemented for Flink jobs?)

2024-02-08 Thread casel.chen
We are using Flink to build a real-time data warehouse and would like to know
how data quality monitoring and alerting is done for Flink jobs, covering data
timeliness, completeness, consistency, accuracy, and so on.
From our research, Spark Streaming has the Amazon Deequ and Apache Griffin
frameworks for this; is there a similar DQC framework for Flink jobs? Ideally
one that is non-intrusive, or only minimally intrusive, to existing jobs.
If not, how is real-time data quality usually implemented?
If every production job needs its own separately configured DQC job, isn't the
cost too high? Is there a way to expose data quality information through
metrics? (A rough sketch of this idea follows the Deequ example below.)


Below is an example of using Deequ, which checks whether each micro-batch of
data satisfies the rule requirements. We have similar data quality check needs.


VerificationSuite().onData(df)
  .addCheck(Check(CheckLevel.Error, "this a unit test")
    .hasSize(_ == 5) // check that the row count is 5
    .isComplete("id") // check that this column has no null values
    .isUnique("id") // check that this field is unique
    .isComplete("productName") // check that this field has no null values
    .isContainedIn("priority", Array("high", "low")) // this field only contains these two values
    .isNonNegative("numViews") // this field contains no negative values
    .containsURL("description", _ >= 0.5) // at least half of the records contain a URL
    .hasApproxQuantile("numViews", 0.5, _ <= 10)
  )
  .run()
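
One low-intrusion option hinted at above is to expose per-record quality
counters through Flink's metrics system and alert on them externally (for
example via a Prometheus reporter). A rough sketch follows; the Order type and
the completeness rule are purely illustrative.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Hypothetical record type, used only for illustration.
class Order {
    public String id;
    public long numViews;
}

// Counts records that violate a simple completeness rule and exposes the count as a metric,
// so an external monitoring system can alert on it without a separate DQC job.
public class CompletenessCheck extends RichMapFunction<Order, Order> {

    private transient Counter nullIdCounter;

    @Override
    public void open(Configuration parameters) {
        nullIdCounter = getRuntimeContext()
                .getMetricGroup()
                .counter("nullIdRecords");
    }

    @Override
    public Order map(Order order) {
        if (order.id == null) {
            nullIdCounter.inc(); // completeness violation: id must not be null
        }
        return order; // pass the record through unchanged
    }
}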