Re: Unsubscribe

2024-03-10 Thread Hang Ruan
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe from
the user-zh-unsubscr...@flink.apache.org mailing list. You can refer to [1][2] to manage your
mailing-list subscriptions.

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

王新隆 wrote on Mon, Mar 11, 2024 at 10:18:

> Unsubscribe


Re: Entering a busy loop when adding a new sink to the graph + checkpoint enabled

2024-03-10 Thread Hang Ruan
Hi, Xuyang & Daniel.

I have checked this part of the code, and I think it is expected behavior.
As noted in the code comments, this loop makes sure that the transactions
before this checkpoint id are re-created.

The situation Daniel mentioned will happen only when all checkpoints between
1 and 2 fail. If so, we should check why these checkpoints failed.
The transactional producer is only used when the DeliveryGuarantee is
EXACTLY_ONCE. If another DeliveryGuarantee is acceptable, you could switch to
it to skip this logic.
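
For example, a minimal sketch of that option (the broker address, topic, and serializer below are
placeholders, not taken from Daniel's report): a KafkaSink built with AT_LEAST_ONCE never goes
through the transactional-producer path.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class AtLeastOnceSinkSketch {
    public static KafkaSink<String> build() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")              // placeholder address
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("my-topic")            // placeholder topic
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                // AT_LEAST_ONCE creates no transactional producers, so the
                // getTransactionalProducer() loop is never executed
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}

Note that AT_LEAST_ONCE may emit duplicates after a failover, so this only fits if downstream
consumers can tolerate them.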

I think it is better to check whether many checkpoints have failed, and to
look at the flame graph to confirm that this code is causing the busyness.

Best,
Hang

Xuyang wrote on Mon, Mar 11, 2024 at 09:58:

> Hi, Danny.
> When the problem occurs, can you use a flame graph to confirm whether the
> loop in this code is causing the busyness?
> Since I'm not particularly familiar with the Kafka connector, I can't give
> you an accurate reply. I think Hang Ruan is an expert in this field :).
>
> Hi, Ruan Hang. Can you take a look at this strange situation?
>
>
> --
> Best!
> Xuyang
>
>
> On 2024-03-10 16:49:16, "Daniel Peled" wrote:
>
> Hello,
>
> I am sorry for addressing you personally.
> I have tried sending this request to the user group and got no response.
>
> If you can't help me, please let me know,
> and please tell me who can help us.
>
> The problem is as follows:
>
> We have noticed that when we add a *new Kafka sink* operator to the
> graph, *and start from the last savepoint*, the operator is 100% busy
> for several minutes and *even 1/2-1 hour*!!!
>
> The problematic code seems to be the following for-loop in the
> getTransactionalProducer() method:
>
>
> *org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer*
>
> private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
>     checkState(
>             checkpointId > lastCheckpointId,
>             "Expected %s > %s",
>             checkpointId,
>             lastCheckpointId);
>     FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>     // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
>     // this loop ensures that all gaps are filled with initialized (empty) transactions
>     for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
>         String transactionalId =
>                 TransactionalIdFactory.buildTransactionalId(
>                         transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
>         producer = getOrCreateTransactionalProducer(transactionalId);
>     }
>     this.lastCheckpointId = checkpointId;
>     assert producer != null;
>     LOG.info("Created new transactional producer {}", producer.getTransactionalId());
>     return producer;
> }
>
>
> Since we added a new sink operator, the lastCheckpointId is 1,
> and if, for example, the checkpointId is 2,
> the loop will be executed 2 times!!!
>
>
> We have several questions:
> 1. Is this behaviour expected?
> 2. Are we doing something wrong?
> 3. Is there a way to avoid this behaviour?
>
> Best Regards,
> Danny
>
>


Unsubscribe

2024-03-10 Thread 王新隆
Unsubscribe

Re: Entering a busy loop when adding a new sink to the graph + checkpoint enabled

2024-03-10 Thread Xuyang
Hi, Danny.
When the problem occurs, can you use a flame graph to confirm whether the
loop in this code is causing the busyness?
Since I'm not particularly familiar with the Kafka connector, I can't give
you an accurate reply. I think Hang Ruan is an expert in this field :).


Hi, Ruan Hang. Can you take a look at this strange situation?

--

Best!
Xuyang

On 2024-03-10 16:49:16, "Daniel Peled" wrote:

Hello,


I am sorry for addressing you personally.
I have tried sending this request to the user group and got no response.


If you can't help me, please let me know,
and please tell me who can help us.


The problem is as follows:


We have noticed that when we add a new Kafka sink operator to the graph, and
start from the last savepoint, the operator is 100% busy for several minutes
and even 1/2-1 hour!!!


The problematic code seems to be the following for-loop in the
getTransactionalProducer() method:


org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer



private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
    checkState(
            checkpointId > lastCheckpointId,
            "Expected %s > %s",
            checkpointId,
            lastCheckpointId);
    FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
    // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
    // this loop ensures that all gaps are filled with initialized (empty) transactions
    for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
        String transactionalId =
                TransactionalIdFactory.buildTransactionalId(
                        transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
        producer = getOrCreateTransactionalProducer(transactionalId);
    }
    this.lastCheckpointId = checkpointId;
    assert producer != null;
    LOG.info("Created new transactional producer {}", producer.getTransactionalId());
    return producer;
}

Since we added a new sink operator, the lastCheckpointId is 1,
and if, for example, the checkpointId is 2,
the loop will be executed 2 times!!!


We have several questions:
1. Is this behaviour expected?
2. Are we doing something wrong?
3. Is there a way to avoid this behaviour?

Best Regards,
Danny


Re: FlinkSQL connector jdbc: can the database username/password be encrypted/hidden?

2024-03-10 Thread gongzhongqiang
Hi, 东树,
Hiding sensitive information in SQL like this needs to be handled by an external big-data platform.
For example, with StreamPark's variable management you can maintain the configuration in advance and
reference it as variables when writing SQL; when the platform submits the SQL to Flink, it parses
the SQL and substitutes the variables.
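
To illustrate the substitution idea only (the ${...} placeholder syntax and all names below are
hypothetical, not StreamPark's actual implementation): the platform keeps the real credentials and
rewrites the SQL text right before submitting it to Flink.

import java.util.Map;

public class SqlVariableSubstitution {

    // Replace ${var} placeholders with values held by the platform, so the
    // user-facing SQL text never contains the real password.
    static String substitute(String sql, Map<String, String> vars) {
        for (Map.Entry<String, String> e : vars.entrySet()) {
            sql = sql.replace("${" + e.getKey() + "}", e.getValue());
        }
        return sql;
    }

    public static void main(String[] args) {
        String sql =
                "CREATE TABLE wordcount_sink (...) WITH ("
                        + "'connector' = 'jdbc', 'password' = '${db.password}')";
        // assumes DB_PASSWORD is set in the platform's environment
        String resolved = substitute(sql, Map.of("db.password", System.getenv("DB_PASSWORD")));
        // submit `resolved` to Flink; editors and logs only ever see the placeholder
    }
}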

Best,
Zhongqiang Gong

杨东树 wrote on Sun, Mar 10, 2024 at 21:50:

> Hi all,
> Considering the security of database usernames and passwords: when using the FlinkSQL connector
> jdbc, how can the database username/password be encrypted/hidden? For example, the password in
> the following common SQL:
> CREATE TABLE wordcount_sink (
>  word String,
>  cnt BIGINT,
>  primary key (word) not enforced
> ) WITH (
>  'connector' = 'jdbc',
>  'url' = 'jdbc:mysql://localhost:3306/flink',
>  'username' = 'root',
>  'password' = '123456',
>  'table-name' = 'wordcount_sink'
> );


Re: FlinkSQL connector jdbc: can the database username/password be encrypted/hidden?

2024-03-10 Thread Feng Jin
1. Currently the JDBC connector itself does not support encryption. I think you could
encrypt/decrypt the SQL text before submitting it, or do some variable substitution to hide the
password.

2. You could consider creating a jdbc catalog in advance, to avoid exposing the password in the DDL.
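
For the second point, a minimal sketch assuming the JdbcCatalog shipped with flink-connector-jdbc
(the 5-argument constructor follows the Flink documentation; newer connector versions may also take
a ClassLoader argument). The credentials are read from the environment, so no DDL containing a
password is ever written:

import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcCatalogExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // credentials come from the environment, not from any SQL text
        JdbcCatalog catalog =
                new JdbcCatalog(
                        "my_catalog",
                        "flink",                        // default database
                        System.getenv("DB_USER"),
                        System.getenv("DB_PASSWORD"),
                        "jdbc:mysql://localhost:3306"); // base URL without database name
        tEnv.registerCatalog("my_catalog", catalog);
        tEnv.useCatalog("my_catalog");

        // existing tables are directly queryable; no CREATE TABLE DDL with a password
        tEnv.executeSql("SELECT word, cnt FROM wordcount_sink").print();
    }
}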


Best,
Feng

On Sun, Mar 10, 2024 at 9:50 PM 杨东树  wrote:

> Hi all,
> Considering the security of database usernames and passwords: when using the FlinkSQL connector
> jdbc, how can the database username/password be encrypted/hidden? For example, the password in
> the following common SQL:
> CREATE TABLE wordcount_sink (
>  word String,
>  cnt BIGINT,
>  primary key (word) not enforced
> ) WITH (
>  'connector' = 'jdbc',
>  'url' = 'jdbc:mysql://localhost:3306/flink',
>  'username' = 'root',
>  'password' = '123456',
>  'table-name' = 'wordcount_sink'
> );


Re: Re: Schema Evolution & Json Schemas

2024-03-10 Thread Jensen
Unsubscribe

At 2024-02-26 20:55:19, "Salva Alcántara"  wrote:

Awesome Andrew, thanks a lot for the info!


On Sun, Feb 25, 2024 at 4:37 PM Andrew Otto  wrote:

>  the following code generator
Oh, and FWIW we avoid code generation and POJOs, and instead rely on Flink's 
Row or RowData abstractions.

On Sun, Feb 25, 2024 at 10:35 AM Andrew Otto  wrote:

Hi! 


I'm not sure if this is totally relevant for you, but we use JSONSchema and
JSON with Flink at the Wikimedia Foundation. 
We explicitly disallow the use of additionalProperties, unless it is to define 
Map type fields (where additionalProperties itself is a schema).


We have JSONSchema converters and JSON Serdes to be able to use our JSONSchemas 
and JSON records with both the DataStream API (as Row) and Table API (as 
RowData).


See:
- https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json
- https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/#managing-a-object


State schema evolution is supported via the EventRowTypeInfo wrapper.


Less directly about Flink: I gave a talk at Confluent's Current conf in 2022 
about why we use JSONSchema. See also this blog post series if you are 
interested!


-Andrew Otto
 Wikimedia Foundation




On Fri, Feb 23, 2024 at 1:58 AM Salva Alcántara  wrote:

I'm facing some issues related to schema evolution in combination with the 
usage of Json Schemas and I was just wondering whether there are any 
recommended best practices.


In particular, I'm using the following code generator:


- https://github.com/joelittlejohn/jsonschema2pojo



Main gotchas so far relate to the `additionalProperties` field. When setting
that to true, the resulting POJO is not valid according to Flink's rules because
the generated getter/setter methods don't follow the Java Beans naming
conventions, e.g., see here:


- https://github.com/joelittlejohn/jsonschema2pojo/issues/1589


This means that the Kryo fallback is used for serialization purposes, which is 
not only bad for performance but also breaks state schema evolution.


So, because of that, setting `additionalProperties` to `false` looks like a
good idea, but then your job will break if an upstream/producer service adds a
property to the messages you are reading. To solve this problem, the POJOs for
your job (as a reader) can be generated to ignore the `additionalProperties`
field (via the `@JsonIgnore` Jackson annotation). This seems to be a good
overall solution to the problem, but it looks a bit convoluted to me and didn't
come without some trial & error (= pain & frustration).
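
As an illustration of the reader-side approach (the class and field names below are invented; this
sketch uses the class-level @JsonIgnoreProperties(ignoreUnknown = true) variant of the same idea):

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

// Unknown upstream properties are silently dropped at deserialization time, and
// with only public fields (plus the implicit no-arg constructor) the class stays
// a valid Flink POJO, so the PojoSerializer, which supports state schema
// evolution, is used instead of the Kryo fallback.
@JsonIgnoreProperties(ignoreUnknown = true)
public class SensorReading {
    public String id;
    public long timestamp;
    public double value;
}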


Is there anyone here facing similar issues? It would be good to hear your 
thoughts on this!


BTW, this is a very interesting article that touches on the above-mentioned
difficulties:
- https://www.creekservice.org/articles/2024/01/09/json-schema-evolution-part-2.html



FlinkSQL connector jdbc: can the database username/password be encrypted/hidden?

2024-03-10 Thread 杨东树
Hi all,
Considering the security of database usernames and passwords: when using the FlinkSQL connector
jdbc, how can the database username/password be encrypted/hidden? For example, the password in the
following common SQL:
CREATE TABLE wordcount_sink (
 word String,
 cnt BIGINT,
 primary key (word) not enforced
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://localhost:3306/flink',
 'username' = 'root',
 'password' = '123456',
 'table-name' = 'wordcount_sink'
);