Re: Unsubscribe

To unsubscribe, send an email with any content to user-zh-unsubscr...@flink.apache.org; that will stop the emails from the user-zh mailing list. You can refer to [1][2] for managing your mailing list subscriptions.

Best,
Hang

[1] https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

王新隆 wrote on Mon, Mar 11, 2024 at 10:18:
> Unsubscribe
Re: Entering a busy loop when adding a new sink to the graph + checkpoint enabled
Hi, Xuyang & Daniel,

I have checked this part of the code and I think it is expected behavior. As the code comment says, this loop makes sure that the transactions before this checkpoint id are re-created. The situation Daniel describes will happen only when all checkpoints between 1 and 2 fail; if so, we should first find out why those checkpoints failed.

The transactional producer is only used when the DeliveryGuarantee is EXACTLY_ONCE. If another DeliveryGuarantee is acceptable for your job, you can switch to it and skip this logic entirely.

I think it is better to first check whether many checkpoints have failed, and to use a flame graph to confirm that this code is actually what causes the busyness.

Best,
Hang

Xuyang wrote on Mon, Mar 11, 2024 at 09:58:
> Hi, Danny.
> When the problem occurs, can you use a flame graph to confirm whether the
> loop in this code is causing the busyness?
> Since I'm not particularly familiar with the kafka connector, I can't give you
> an accurate reply. I think Hang Ruan is an expert in this field :).
>
> Hi, Ruan Hang. Can you take a look at this strange situation?
>
> --
> Best!
> Xuyang
>
> On 2024-03-10 16:49:16, "Daniel Peled" wrote:
>
> Hello,
>
> I am sorry for addressing you personally.
> I have tried sending the request to the user group and got no response.
>
> If you can't help me, please let me know,
> and please tell me who can help us.
>
> The problem is as follows:
>
> We have noticed that when we add a *new kafka sink* operator to the
> graph, *and start from the last savepoint*, the operator is 100% busy
> for several minutes and *even 1/2-1 hour* !!!
>
> The problematic code seems to be the following for-loop in the
> getTransactionalProducer() method:
>
> org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer
>
>     private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
>         checkState(
>                 checkpointId > lastCheckpointId,
>                 "Expected %s > %s",
>                 checkpointId,
>                 lastCheckpointId);
>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>         // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
>         // this loop ensures that all gaps are filled with initialized (empty) transactions
>         for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
>             String transactionalId =
>                     TransactionalIdFactory.buildTransactionalId(
>                             transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
>             producer = getOrCreateTransactionalProducer(transactionalId);
>         }
>         this.lastCheckpointId = checkpointId;
>         assert producer != null;
>         LOG.info("Created new transactional producer {}", producer.getTransactionalId());
>         return producer;
>     }
>
> Since we added a new sink operator, the lastCheckpointId is 1,
> and if for example the checkpointId is 2,
> the loop will be executed 2 times !!!
>
> We have several questions:
> 1. Is this behaviour expected?
> 2. Are we doing something wrong?
> 3. Is there a way to avoid this behavior?
>
> Best Regards,
> Danny
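The cost of the loop quoted above is proportional to the gap between the restored lastCheckpointId and the current checkpointId, which is why a freshly added sink restored into a job with a long checkpoint history can stay busy for so long. A minimal sketch of the arithmetic (the helper and the numbers are illustrative, not Flink's actual state handling):

```java
public class TransactionGapDemo {

    // Counts how many times the KafkaWriter loop would call
    // getOrCreateTransactionalProducer(...) when moving from
    // lastCheckpointId to checkpointId. Illustrative helper, not Flink code.
    static long producersCreated(long lastCheckpointId, long checkpointId) {
        long count = 0;
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            count++; // one transactional producer created (or fetched) per id
        }
        return count;
    }

    public static void main(String[] args) {
        // A freshly added sink starts with a small lastCheckpointId, while the
        // restored job's next checkpoint id can be very large, so the loop may
        // spin tens of thousands of times, keeping the operator 100% busy.
        System.out.println(producersCreated(1, 100_000));
    }
}
```

With EXACTLY_ONCE this gap-filling cannot simply be skipped, which is why Hang's suggestion is to either verify the checkpoint failures or use a weaker DeliveryGuarantee if the job can tolerate it.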
Unsubscribe

Unsubscribe
Re: Entering a busy loop when adding a new sink to the graph + checkpoint enabled
Hi, Danny.

When the problem occurs, can you use a flame graph to confirm whether the loop in this code is causing the busyness?

Since I'm not particularly familiar with the kafka connector, I can't give you an accurate reply. I think Hang Ruan is an expert in this field :).

Hi, Ruan Hang. Can you take a look at this strange situation?

--
Best!
Xuyang

On 2024-03-10 16:49:16, "Daniel Peled" wrote:

Hello,

I am sorry for addressing you personally. I have tried sending the request to the user group and got no response. If you can't help me, please let me know, and please tell me who can help us.

The problem is as follows: we have noticed that when we add a new kafka sink operator to the graph, and start from the last savepoint, the operator is 100% busy for several minutes and even 1/2-1 hour !!!

The problematic code seems to be the following for-loop in the getTransactionalProducer() method:

org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer

    private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
        checkState(
                checkpointId > lastCheckpointId,
                "Expected %s > %s",
                checkpointId,
                lastCheckpointId);
        FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
        // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
        // this loop ensures that all gaps are filled with initialized (empty) transactions
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            String transactionalId =
                    TransactionalIdFactory.buildTransactionalId(
                            transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
            producer = getOrCreateTransactionalProducer(transactionalId);
        }
        this.lastCheckpointId = checkpointId;
        assert producer != null;
        LOG.info("Created new transactional producer {}", producer.getTransactionalId());
        return producer;
    }

Since we added a new sink operator, the lastCheckpointId is 1, and if for example the checkpointId is 2, the loop will be executed 2 times !!!

We have several questions:
1. Is this behaviour expected?
2. Are we doing something wrong?
3. Is there a way to avoid this behavior?

Best Regards,
Danny
Re: FlinkSQL connector jdbc: can the database username/password be encrypted/hidden?
Hi 东树,

Hiding sensitive information in SQL has to be done by an external big-data platform. For example, with StreamPark's variable management you can maintain the configuration in advance and reference the variables when writing SQL; when the platform submits the job to Flink, it parses the SQL and substitutes the variables.

Best,
Zhongqiang Gong

杨东树 wrote on Sun, Mar 10, 2024 at 21:50:
> Hi all,
> Considering the security of database usernames and passwords, how can they be encrypted/hidden when using the FlinkSQL connector jdbc? For example, the password in this common SQL:
> CREATE TABLE wordcount_sink (
>   word STRING,
>   cnt BIGINT,
>   PRIMARY KEY (word) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost:3306/flink',
>   'username' = 'root',
>   'password' = '123456',
>   'table-name' = 'wordcount_sink'
> );
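The variable-substitution approach described above can be sketched as follows. The `${...}` placeholder syntax and the `render` helper are illustrative assumptions, not StreamPark's actual API; a real platform would pull the values from its secret store at submission time:

```java
import java.util.Map;

public class SqlTemplate {

    // Replaces ${name} placeholders in a DDL string before it is handed to
    // Flink. Hypothetical helper for illustration; platforms like StreamPark
    // do this kind of substitution internally.
    static String render(String template, Map<String, String> vars) {
        String out = template;
        for (Map.Entry<String, String> e : vars.entrySet()) {
            out = out.replace("${" + e.getKey() + "}", e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        // The SQL a user writes (or checks in) contains no secrets,
        // only placeholders.
        String ddl = "CREATE TABLE wordcount_sink (...) WITH ("
                + "'connector' = 'jdbc', "
                + "'username' = '${mysql.user}', "
                + "'password' = '${mysql.password}')";
        // Values are injected at submission time from the platform's
        // configuration / secret store.
        System.out.println(render(ddl, Map.of("mysql.user", "root", "mysql.password", "123456")));
    }
}
```

This keeps the plaintext password out of the SQL text entirely, which is the point of both suggestions in this thread.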
Re: FlinkSQL connector jdbc: can the database username/password be encrypted/hidden?
1. The JDBC connector itself does not support encryption at the moment. I think you can encrypt/decrypt the SQL text before submitting it, or use variable substitution to hide the password.
2. You can also consider creating a jdbc catalog in advance, so that the DDL never exposes the password.

Best,
Feng

On Sun, Mar 10, 2024 at 9:50 PM 杨东树 wrote:
> Hi all,
> Considering the security of database usernames and passwords, how can they be encrypted/hidden when using the FlinkSQL connector jdbc? For example, the password in this common SQL:
> CREATE TABLE wordcount_sink (
>   word STRING,
>   cnt BIGINT,
>   PRIMARY KEY (word) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost:3306/flink',
>   'username' = 'root',
>   'password' = '123456',
>   'table-name' = 'wordcount_sink'
> );
Re: Re: Schema Evolution & Json Schemas
Unsubscribe

At 2024-02-26 20:55:19, "Salva Alcántara" wrote:

Awesome Andrew, thanks a lot for the info!

On Sun, Feb 25, 2024 at 4:37 PM Andrew Otto wrote:

> the following code generator

Oh, and FWIW we avoid code generation and POJOs, and instead rely on Flink's Row or RowData abstractions.

On Sun, Feb 25, 2024 at 10:35 AM Andrew Otto wrote:

Hi! I'm not sure if this is totally relevant for you, but we use JSONSchema and JSON with Flink at the Wikimedia Foundation. We explicitly disallow the use of additionalProperties, unless it is to define Map type fields (where additionalProperties itself is a schema).

We have JSONSchema converters and JSON Serdes to be able to use our JSONSchemas and JSON records with both the DataStream API (as Row) and the Table API (as RowData). See:
- https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json
- https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/#managing-a-object

State schema evolution is supported via the EventRowTypeInfo wrapper.

Less directly about Flink: I gave a talk at Confluent's Current conf in 2022 about why we use JSONSchema. See also this blog post series if you are interested!

-Andrew Otto
Wikimedia Foundation

On Fri, Feb 23, 2024 at 1:58 AM Salva Alcántara wrote:

I'm facing some issues related to schema evolution in combination with the usage of Json Schemas, and I was wondering whether there are any recommended best practices. In particular, I'm using the following code generator:

- https://github.com/joelittlejohn/jsonschema2pojo

The main gotchas so far relate to the `additionalProperties` field. When setting it to true, the resulting POJO is not valid according to Flink's rules, because the generated getter/setter methods don't follow the Java Beans naming conventions, e.g., see here:

- https://github.com/joelittlejohn/jsonschema2pojo/issues/1589

This means that the Kryo fallback is used for serialization purposes, which is not only bad for performance but also breaks state schema evolution.

So, because of that, setting `additionalProperties` to `false` looks like a good idea, but then your job will break if an upstream/producer service adds a property to the messages you are reading. To solve this problem, the POJOs for your job (as a reader) can be generated to ignore the `additionalProperties` field (via the `@JsonIgnore` Jackson annotation). This seems to be a good overall solution, but it looks a bit convoluted to me and didn't come without some trial & error (= pain & frustration).

Is there anyone here facing similar issues? It would be good to hear your thoughts on this!

BTW, this is a very interesting article that touches on the above-mentioned difficulties:

- https://www.creekservice.org/articles/2024/01/09/json-schema-evolution-part-2.html
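The naming-convention problem discussed in this thread can be checked with the JDK's own java.beans Introspector: a getter with no matching single-argument setter yields a read-only bean property, which is the kind of mismatch that makes Flink's type extraction fall back to Kryo. A minimal sketch, where GeneratedPojo mimics the shape jsonschema2pojo emits (class and member names here are illustrative):

```java
import java.beans.BeanInfo;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.HashMap;
import java.util.Map;

public class BeanCheck {

    // Mimics what jsonschema2pojo emits for additionalProperties: the getter
    // is getAdditionalProperties(), but the "setter" takes (String, Object),
    // so the pair does not match the Java Beans conventions.
    public static class GeneratedPojo {
        private Map<String, Object> additionalProperties = new HashMap<>();

        public Map<String, Object> getAdditionalProperties() {
            return additionalProperties;
        }

        public void setAdditionalProperty(String name, Object value) {
            additionalProperties.put(name, value);
        }
    }

    public static void main(String[] args) throws Exception {
        BeanInfo info = Introspector.getBeanInfo(GeneratedPojo.class, Object.class);
        for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
            // additionalProperties is reported with a read method but no
            // write method, i.e. it is not a fully settable bean property.
            System.out.println(pd.getName() + " writable=" + (pd.getWriteMethod() != null));
        }
    }
}
```

This is only a stand-in for Flink's actual POJO analysis, but it makes the getter/setter mismatch from the linked jsonschema2pojo issue visible without any Flink dependency.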
FlinkSQL connector jdbc: can the database username/password be encrypted/hidden?
Hi all,

Considering the security of database usernames and passwords, how can they be encrypted/hidden when using the FlinkSQL connector jdbc? For example, the password in this common SQL:

CREATE TABLE wordcount_sink (
  word STRING,
  cnt BIGINT,
  PRIMARY KEY (word) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/flink',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'wordcount_sink'
);