Re: Re: Flink SQL: question about the join conditions of dimension-table lookup joins in the execution plan

2024-03-14 Thread Jane Chan
Hi iasiuide,

Thanks for the question. Let me answer the last question first.

Why do some of the restriction conditions on the dimension table become join (lookup) conditions while others do not? Is there some rule behind this?
>

The ON condition of a lookup join goes through a series of rewrites during
optimization. Here is a brief explanation of the parts that affect lookup and where.

1. In the logical phase, FlinkFilterJoinRule splits the ON condition into
predicates that reference only one side (left/right table) and predicates that
reference both sides. **Single-side filters are pushed down below the join node
whenever possible** (which means an extra Filter node may be generated); what
happens to that Filter node later depends on whether the filter can be pushed
down into the source. If it cannot, it becomes, in the physical phase, the
condition inside the Calc node on top of the dimension table (denoted by
calcOnTemporalTable).

2. When CommonPhysicalLookupJoin resolves allLookupKeys, it tries to extract
constant conditions from calcOnTemporalTable to form the final lookup keys
(i.e. the content of lookup=[...] in the explain plan). During explain,
where=[...] is printed whenever calcOnTemporalTable exists.
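As a minimal illustration, consider the following sketch (table and column
names are made up for the example and are not taken from your job; whether a
predicate can be pushed into the source also depends on the connector):

SELECT o.order_id, d.name
FROM orders AS o
JOIN dim_table FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.item_id = d.id                                    -- equality between the two tables: lookup=[id=item_id]
 AND d.source = 'web'                                    -- equality with a constant: also extracted into the lookup keys
 AND d.dt = DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyyMMdd');  -- single-side, non-pushable (and non-deterministic) condition:
                                                         -- evaluated in the Calc on the dimension table, shown as where=[...]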

Back to the concrete case.

Why is the restriction condition AND b.trans_date = DATE_FORMAT
> (CURRENT_TIMESTAMP, 'MMdd') on the first dimension table
> dim_ymfz_prod_sys_trans_log not used as a lookup condition in the execution
> plan ==> lookup=[bg_rel_trans_id=bg_rel_trans_id],
>

Because b.trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'MMdd') is a
single-side condition on the dimension table and cannot be pushed down into
the source. Note also that a non-deterministic function is used here [1];
please pay attention to the correctness of the results.


> For the second dimension table dim_ptfz_ymfz_merchant_info, the conditions
> ON b.member_id = c.pk_id AND c.data_source = 'merch' both become lookup
> conditions in the execution plan ==> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id],
>

In this case the constant can be extracted.


> For the third dimension table dim_ptfz_ymfz_merchant_info, in the condition
> ON c.agent_id = d.pk_id AND (d.data_source = 'ex_agent' OR d.data_source = 'agent'),
> the part about data_source is not a lookup condition in the execution plan
> ==> lookup=[pk_id=agent_id],
>

As far as I know, lookup keys currently do not support SARGable predicates
(such as OR / range conditions), so only pk_id = agent_id is used.

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/determinism/

Best,
Jane

On Fri, Mar 8, 2024 at 11:19 AM iasiuide  wrote:

> OK, I have pasted the SQL snippet.
>
> On 2024-03-08 11:02:34, "Xuyang" wrote:
> >Hi, your image did not come through; please use an image host or paste the SQL directly.
> >
> >
> >
> >
> >--
> >
> >Best!
> >Xuyang
> >
> >
> >
> >
> >On 2024-03-08 10:54:19, "iasiuide" wrote:
> >
> >
> >
> >
> >
> >In the SQL snippet below,
> >ods_ymfz_prod_sys_divide_order  is the Kafka source table
> >dim_ymfz_prod_sys_trans_log   is a MySQL dimension table
> >dim_ptfz_ymfz_merchant_info   is a MySQL dimension table
> >
> >
> >
> >The execution plan fragment from the Flink web UI is as follows:
> >
> > [1]:TableSourceScan(table=[[default_catalog, default_database,
> ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time),
> 1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))),
> 5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id,
> order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
> >+- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time,
> IF(SEARCH(row_kind, Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 *
> divide_fee_amt), divide_fee_amt) AS div_fee_amt,
> Reinterpret(CASE(create_time IS NULL, 1970-01-01 00:00:00, CAST(create_time
> AS TIMESTAMP(3 AS ts], where=[((order_state = '2') AND (divide_fee_amt
>  0) AND (sys_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS
> TIMESTAMP(9)), '-MM-dd')))])
> >   +-
> [3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log],
> joinType=[LeftOuterJoin], async=[false],
> lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date =
> DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'MMdd'))],
> select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts,
> bg_rel_trans_id, pay_type, member_id, mer_name])
> >  +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts,
> pay_type, member_id, mer_name], where=[(CHAR_LENGTH(member_id)  1)])
> > +-
> [5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
> joinType=[LeftOuterJoin], async=[false],
> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id], where=[(data_source
> = 'merch')], select=[sys_date, create_time, div_fee_amt, ts, pay_type,
> member_id, mer_name, pk_id, agent_id, bagent_id])
> >+- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts,
> pay_type, member_id, mer_name, agent_id, bagent_id])
> >   +-
> [7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
> joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id],
> where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])],
> select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id,
> mer_name, agent_id, bagent_id, pk_id, bagent_id, fagent_id])
> >  +- [8]:Calc(select=[sys_date, create_time, div_fee_amt,
> ts, pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS
> fagent_id0])
> > +-
> [9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
> joinType=[LeftOuterJoin], async=[false],
> lookup=[data_source=_UTF-16LE'agent', pk_id=fagent_id0],
> where=[(data_source = 'agent')], select=[sys_date, create_time,
> div_fee_amt, ts, pay_type, member_id, mer_name, bagent_id, bagent_id0,
> fagent_id0, pk_id, agent_name, bagent_name])
> >  
> >
> >
> >Why is the restriction condition AND b.trans_date = DATE_FORMAT
> (CURRENT_TIMESTAMP, 'MMdd') on the first dimension table
> dim_ymfz_prod_sys_trans_log not used as a lookup condition in the execution
> plan ==> lookup=[bg_rel_trans_id=bg_rel_trans_id],
> >For the second dimension table dim_ptfz_ymfz_merchant_info, the restriction condition ON b.member_id = c.pk_id AND
> 

Re: emitValueWithRetract issue

2024-01-14 Thread Jane Chan
Hi Adam,

Thanks for reporting this issue. The bug you identified has been fixed in
the release-1.18 and master branches, which will be released in v1.18.2 and
v1.19.0. You can now use the `emitUpdateWithRetract` method with the
expected behavior. More details can be found here[1].


[1] https://issues.apache.org/jira/browse/FLINK-31788

Best,
Jane

On Wed, Apr 12, 2023 at 11:17 PM Feng Jin  wrote:

> hi Adam
>
> As far as I know, there is currently no similar API available,
> but I believe that this feature was accidentally removed and we should add
> it back.
> I have created a Jira to track the progress of this feature.
> https://issues.apache.org/jira/browse/FLINK-31788
>
>
>
> On Tue, Apr 11, 2023 at 12:10 AM Adam Augusta  wrote:
>
>> Many thanks for the sanity check, Feng.
>>
>> It’s a shame this well-documented feature was silently removed.
>> emitValue() creates an unreasonable amount of unnecessary and disruptive
>> chatter on the changelog stream, as evidenced by putting a print table
>> after the flatAggregate. Lots of -D/+I RowData pairs with identical fields.
>>
>> Is there any clean way to set up a stateful group aggregation in the 1.18
>> Table API that doesn’t misbehave in this fashion?
>>
>> On Mon, Apr 10, 2023 at 11:43 AM Feng Jin  wrote:
>>
>>> hi Adam
>>>
>>> I have checked the code and indeed this feature is not available in the
>>> latest version of Flink code.
>>>
>>> This feature was originally implemented in the old planner:
>>> 
>>> https://github.com/apache/flink/pull/8550/files
>>>
>>> However, this logic was not implemented in the new planner, i.e. the Blink
>>> planner.
>>>
>>> With the removal of the old planner in version 1.14
>>> https://github.com/apache/flink/pull/16080 , this code was also removed.
>>>
>>>
>>>
>>> Best
>>>
>>> Feng
>>>
>>> On Sat, Apr 8, 2023 at 4:17 AM Adam Augusta  wrote:
>>>
 The TableAggregateFunction javadocs indicate that either "emitValue" or
 "emitUpdateWithRetract" is required.

 But if I implement my TableAggregateFunction with
 "emitUpdateWithRetract", I get a validation error. If I implement both
 methods it works, but emitUpdateWithRetract is not used.

 Peering into the Flink source code, I see that
 ImperativeAggCodeGen validates the presence of emitValue, but is agnostic
 to emitUpdateWithRetract.
 More curiously, Flink's source code doesn't have a single test with a
 TableAggregateFunction that uses emitUpdateWithRetract.

 Is this a ghost feature?

 Thanks,
 Adam

>>>


Re: [DISCUSS][FLINK-32993] Datagen connector handles length-constrained fields according to the schema definition by default

2023-11-21 Thread Jane Chan
Hi Yubin,

Thanks for driving this discussion. Perhaps a specific example can better
illustrate the current issue.

Consider the following DDL: f0 will always be generated with the default
length of 100, regardless of CHAR(5), because the connector option
'fields.f0.length' is not specified [1].

> CREATE TABLE foo (
>f0 CHAR(5)
> ) WITH ('connector' = 'datagen');
>

Since a fixed-length type usually has its length specified explicitly in the
DDL, the current design can be confusing for users to some extent.
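As a side note, until the behavior changes, the length has to be kept in sync
manually via the documented 'fields.#.length' option [1], e.g. (a sketch):

CREATE TABLE foo (
    f0 CHAR(5)
) WITH (
    'connector' = 'datagen',
    'fields.f0.length' = '5'  -- currently must be set by hand to match CHAR(5)
);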

However, for the proposed changes, it would be preferable to provide
specific details on how to handle the "not be user-defined" scenario. For
example, should it be ignored or should an exception be thrown?

To be more specific,
1. For fixed-length data types, what happens for the following two DDLs

> CREATE TABLE foo (
>f0 CHAR(5)
> ) WITH ('connector' = 'datagen', 'fields.f0.length' = '10');
>
> CREATE TABLE bar (
>f0 CHAR(5)
> ) WITH ('connector' = 'datagen', 'fields.f0.length' = '1');
>

2. For variable-length data types, what happens for the following two DDLs

> CREATE TABLE meow (
>f0 VARCHAR(20)
> ) WITH ('connector' = 'datagen', 'fields.f0.length' = '10');
>
> CREATE TABLE purr (
>f0 STRING
> ) WITH ('connector' = 'datagen', 'fields.f0.length' = '10');
>

Best,
Jane

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/datagen/#fields-length


On Mon, Nov 20, 2023 at 8:46 PM 李宇彬  wrote:

> Hi everyone,
>
>
> Currently, the Datagen connector generates data that doesn't match the
> schema definition
> when dealing with fixed-length and variable-length fields. It defaults to
> a unified length of 100
> and requires manual configuration by the user. This violates the
> correctness of schema constraints
> and hampers ease of use.
>
>
> Jane Chan and I have discussed offline and I will summarize our discussion
> below.
>
>
> To enhance the datagen connector to automatically generate data that
> conforms to the schema
> definition without additional manual configuration, we propose handling
> the following data types
> appropriately [1]:
>   1. For fixed-length data types (char, binary), the length should be
> defined by the schema definition
>  and not be user-defined.
>   2. For variable-length data types (varchar, varbinary), the length
> should be defined by the schema
>   definition, but allow for user-defined lengths that are smaller
> than the schema definition.
>
>
>
> Looking forward to your feedback :)
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-32993
>
>
> Best,
> Yubin
>
>


Re: State cleanup in Flink SQL

2023-10-17 Thread Jane Chan
Hi,

If you are using a standalone session cluster and want to see the option
printed in the JM/TM logs, you need to configure table.exec.state.ttl: '${TTL}'
in flink-conf.yaml before starting the cluster, and then start the cluster.
If you change it after the cluster has started, it will not show up in the logs,
but you can check the options currently configured in the SQL CLI with the SET; command.
Also, you need to execute SET 'table.exec.state.ttl' = '${TTL}' first and then
submit the job; please double-check that the order of operations is correct.
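For example, in the SQL CLI (a sketch; the value '1 d' is arbitrary and only
for illustration):

SET 'table.exec.state.ttl' = '1 d';  -- takes effect for statements submitted afterwards
SET;                                 -- lists the options currently set in this CLI session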

Best,
Jane

On Mon, Oct 9, 2023 at 6:01 PM 小昌同学  wrote:

> Hi, I set it the same way. On my side it is the Flink SQL client, but I do not see this configuration take effect in the Flink web UI.
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
> ---- Original message ----
> | From | Jane Chan |
> | Date | 2023-09-25 11:24 |
> | To |  |
> | Subject | Re: State cleanup in Flink SQL |
> Hi,
>
> You can control the state TTL of stateful operators by setting table.exec.state.ttl. See [1] for more information.
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/overview/#%e7%8a%b6%e6%80%81%e7%ae%a1%e7%90%86
>
> Best,
> Jane
>
> On Thu, Sep 21, 2023 at 5:17 PM faronzz  wrote:
>
> Try this: t_env.get_config().set("table.exec.state.ttl", "86400 s")
>
>
>
>
> | |
> faronzz
> |
> |
> faro...@163.com
> |
>
>
> ---- Original message ----
> | From | 小昌同学 |
> | Date | 2023-09-21 17:06 |
> | To | user-zh |
> | Subject | State cleanup in Flink SQL |
>
>
> Hi all, I would like to ask about state cleanup in Flink SQL. Searching online I could only find the mini-batch settings; does SQL have no option for configuring a state TTL?
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
>


Re: State cleanup in Flink SQL

2023-09-24 Thread Jane Chan
Hi,

You can control the state TTL of stateful operators by setting table.exec.state.ttl. See [1] for more information.

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/overview/#%e7%8a%b6%e6%80%81%e7%ae%a1%e7%90%86

Best,
Jane

On Thu, Sep 21, 2023 at 5:17 PM faronzz  wrote:

> Try this: t_env.get_config().set("table.exec.state.ttl", "86400 s")
>
>
>
>
> | |
> faronzz
> |
> |
> faro...@163.com
> |
>
>
> ---- Original message ----
> | From | 小昌同学 |
> | Date | 2023-09-21 17:06 |
> | To | user-zh |
> | Subject | State cleanup in Flink SQL |
>
>
> Hi all, I would like to ask about state cleanup in Flink SQL. Searching online I could only find the mini-batch settings; does SQL have no option for configuring a state TTL?
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Re: Install Flink Document as a PDF Book

2023-09-14 Thread Jane Chan
Hi Yunhui,

If you need to access the documentation offline, you can clone the Flink
project locally and build the documentation following the instructions in
the readme file. Once built, you can view it by opening localhost:1313 in
your browser. However, if you prefer a PDF version, you may find the
following link [1] helpful.

[1] https://discourse.gohugo.io/t/generate-hugo-website-as-a-pdf/22855/4

Best,
Jane

On Thu, Sep 14, 2023 at 11:44 PM Yunhui Han  wrote:

> Hi, all
>
> How can I install the latest Flink Document as a PDF book?
>
> Best
>


Re: Unsubscribe

2023-09-03 Thread Jane Chan
To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org

Best,
Jane

On Sun, Sep 3, 2023 at 6:15 PM lei-tian  wrote:

> Unsubscribe
>
>
> | |
> lei-tian
> |
> |
> totorobabyf...@163.com
> |


Re: Query on Flink SQL primary key for nested field

2023-07-11 Thread Jane Chan
Hi Elakiya,

Did you encounter a ParserException when executing the DDL? AFAIK, Flink
SQL does not support declaring a nested column (a compound identifier) as the
primary key at the syntax level.

A possible workaround is to change the schema so that it does not contain a
record type; then you can change the DDL to the following:

CREATE TABLE Employee (
  id STRING PRIMARY KEY NOT ENFORCED,
  name STRING
) WITH (
  ...
)
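For completeness, a sketch of what the full DDL could look like, reusing the
upsert-kafka options from your original statement (the options and the
flattened Avro value schema, with id and name as top-level fields, are
assumptions for illustration):

CREATE TABLE Employee (
  id STRING,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'employee',
  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',
  'key.format' = 'raw',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'
);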

Best regards,
Jane

On Mon, Jul 10, 2023 at 7:32 PM elakiya udhayanan 
wrote:

> Hi Hang,
>  Once again thanks for your response, but I think you have misunderstood
> my question. At present we are only using the DDL format of Table API and
> the only issue we face is , we are unable to set the primary key field for
> the Flink table since the value we want to use as primary key is present
> inside the object as mentioned in my question earlier.
>
> Re-posting my question again here:
>
> I have a Kafka topic named employee which uses a Confluent Avro schema and
> will emit the payload as below:
>
> {"employee": {"id": "123456", "name": "sampleName"}}
>
> I am using the upsert-kafka connector to consume the events from the above
> Kafka topic as below using the Flink SQL DDL statement; here I want to use
> the id field as the primary key. But I am unable to use the id field since
> it is inside the object.
>
> DDL Statement:
>
> String statement = "CREATE TABLE Employee (\r\n" +
>     "  employee  ROW(id STRING, name STRING\r\n" +
>     "  ),\r\n" +
>     "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
>     ") WITH (\r\n" +
>     "  'connector' = 'upsert-kafka',\r\n" +
>     "  'topic' = 'employee',\r\n" +
>     "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>     "  'key.format' = 'raw',\r\n" +
>     "  'value.format' = 'avro-confluent',\r\n" +
>     "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
>     ")";
>
> On Mon, Jul 10, 2023 at 2:27 PM Hang Ruan  wrote:
>
>> Hi, Elakiya.
>>
>> If everything is right for the KafkaTable, I think there must be a
>> `user_id` field in the Kafka message key.
>> We could see the code in the method `createKeyValueProjections` of
>> `UpsertKafkaDynamicTableFactory` as follows.
>>
>> ```
>> private Tuple2
>> createKeyValueProjections(ResolvedCatalogTable catalogTable) {
>> ResolvedSchema schema = catalogTable.getResolvedSchema();
>> // primary key should validated earlier
>> List keyFields =
>> schema.getPrimaryKey().get().getColumns();
>> DataType physicalDataType = schema.toPhysicalRowDataType();
>>
>> Configuration tableOptions =
>> Configuration.fromMap(catalogTable.getOptions());
>> // upsert-kafka will set key.fields to primary key fields by
>> default
>> tableOptions.set(KEY_FIELDS, keyFields);
>>
>> int[] keyProjection = createKeyFormatProjection(tableOptions,
>> physicalDataType);
>> int[] valueProjection = createValueFormatProjection(tableOptions,
>> physicalDataType);
>>
>> return Tuple2.of(keyProjection, valueProjection);
>> }
>> ```
>> The primary keys will be put in the KEY_FIELDS option to create the key
>> format projection, which will be used to get fields from Kafka message key.
>>
>> Best,
>> Hang
>>
>> elakiya udhayanan  于2023年7月10日周一 16:41写道:
>>
>>> Hi Hang,
>>>
>>>  The select query works absolutely fine; we have also implemented join
>>> queries, which also work without any issues.
>>>
>>> Thanks,
>>> Elakiya
>>>
>>> On Mon, Jul 10, 2023 at 2:03 PM Hang Ruan 
>>> wrote:
>>>
 Hi, Elakiya.

 Maybe this DDL could be executed. Please execute the select sql `select
 * from KafkaTable`. Then I think there will be some error or the `user_id`
 will not be read correctly.

 Best,
 Hang

 elakiya udhayanan  于2023年7月10日周一 16:25写道:

> Hi Hang Ruan,
>
> Thanks for your response. But in the documentation, they have an
> example of defining the Primary Key for the DDL statement (code below). In
> that case we should be able to define the primary key for the DDL rite. We
> have defined the primary key in our earlier use cases when it wasn't a
> nested field. Please correct me , If I have misunderstood anything.
>
> CREATE TABLE KafkaTable (
>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
>   `user_id` BIGINT,
>   `item_id` BIGINT,
>   `behavior` STRING,
>   PRIMARY KEY (`user_id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   ...
>   'key.format' = 'json',
>   'key.json.ignore-parse-errors' = 'true',
>   'value.format' = 'json',
>   'value.json.fail-on-missing-field' = 'false',
>   'value.fields-include' = 'EXCEPT_KEY'
> )
>
> Thanks,
> Elakiya
>
> On Mon, Jul 10, 2023 at 1:09 PM Hang Ruan 
> wrote:
>
>> Hi, elakiya.
>>
>> The upsert-kafka connector will read the primary keys from the Kafka
>> message keys. We cannot define the fields in the Kafka message values as
>> 

Re: table.exec.state.ttl not working as expected

2023-06-25 Thread Jane Chan
Hi,

From the attachment, I saw there are IntervalJoin and GroupWindowAggregate
operators. AFAIK the state retention for such operators is not controlled
by `table.exec.state.ttl`.
Could you share the operator-level state metrics to help identify the
issue?

Best,
Jane

On Sun, Jun 25, 2023 at 10:38 AM Hangxiang Yu  wrote:

> Hi, neha.
>
> Could you share more information:
>
>1. Which State Backend are you using? If it's RocksDB, is incremental
>checkpointing enabled?
>2. Which specific operator is experiencing an increase in Checkpoint
>data size? (You can check the Checkpoint size changes of different subtasks
>from the Checkpoint History in the Flink UI)
>3. Has there been any change in data flow and input data during this
>time?
>
>
> On Fri, Jun 23, 2023 at 2:01 PM neha goyal  wrote:
>
>> Hello,
>>
>> I have assigned env.getConfig().set("table.exec.state.ttl", "180 s") to
>> my table environment. Even after that, I can see continuous growth in
>> savepoint size.
>>
>> I am attaching the screenshot of the job graph and savepoint metric.
>> I am also adding the query that I am running on Kafka streams, It is a
>> lengthy query. Any help would be highly appreciated.
>>
>> SELECT
>>   *
>> from
>>   (
>> With Actuals as (
>>   SELECT
>> a1.orderId,
>> a1.zoneId,
>> a3.cityId,
>> case
>>   when a2.status = 'delivered' then round(
>> CAST(
>>   (
>> Cast(a2.server_time_stamp AS BIGINT) -
>> Cast(a1.server_time_stamp AS BIGINT)
>>   ) AS DOUBLE
>> ) / CAST(6 AS DOUBLE),
>> 4
>>   )
>>   when CAST(
>> CURRENT_TIMESTAMP - (Cast(a1.server_time_stamp AS BIGINT)) AS
>> DOUBLE
>>   ) / CAST(60 * 1000 AS DOUBLE) > cast(lmTime as DOUBLE) then
>> CAST(
>> CURRENT_TIMESTAMP - (Cast(a1.server_time_stamp AS BIGINT)) AS
>> DOUBLE
>>   ) / CAST(60 * 1000 AS DOUBLE)
>>   else null
>> end AS P2D_inclusive,
>> case
>>   when a2.status = 'delivered' then round(
>> CAST(
>>   (
>> Cast(a2.server_time_stamp AS BIGINT) -
>> Cast(a1.server_time_stamp AS BIGINT)
>>   ) AS DOUBLE
>> ) / CAST(6 AS DOUBLE),
>> 4
>>   )
>>   else null
>> end as P2D_exclusive,
>> cast(lmTime as DOUBLE) as PP2D,
>> case
>>   when a2.status = 'delivered' then exp(
>> (
>>   (
>> Cast(a2.server_time_stamp AS BIGINT) - CURRENT_TIMESTAMP
>>   ) /(60 * 1000)
>> ) / 100
>>   )
>>   else 1
>> end as recency_wt,
>> case
>>   when a2.status = 'delivered' then 1
>>   else 0
>> end as delivered_flag,
>> case
>>   when a2.status = 'delivered' then a2.proctime
>>   else a1.proctime
>> end as proctime
>>   FROM
>> (
>>   select
>> distinct orderId,
>> zoneId,
>> server_time_stamp,
>> proctime
>>   from
>> my_streamtable
>>   where
>> status = 'pickedup'
>> ) a1
>> LEFT JOIN (
>>   select
>> distinct orderId,
>> zoneId,
>> status,
>> server_time_stamp,
>> proctime
>>   from
>> my_streamtable
>>   where
>> status = 'delivered'
>> ) a2 ON a1.orderId = a2.orderId
>> AND a2.proctime BETWEEN a1.proctime - interval '60' minute
>> AND a1.proctime + interval '60' minute
>> INNER JOIN (
>>   select
>> distinct orderId,
>> cityId,
>> lmTime,
>> proctime
>>   from
>> my_streamtable2
>>   where
>> orderId is not null
>> ) a3 ON cast(a1.orderId as VARCHAR) = cast(a3.orderId as VARCHAR)
>> AND a3.proctime BETWEEN a1.proctime - interval '60' minute
>> AND a1.proctime + interval '60' minute
>> ),
>> zone_count as(
>>   select
>> zoneId,
>> proctime() as proctime,
>> COUNT(orderId) as counts_inclusive,
>> sum(delivered_flag) as counts_exclusive,
>> AVG(cityId) as cityId
>>   from
>> Actuals
>>   where
>> P2D_inclusive is not null
>>   group by
>> HOP(
>>   proctime(),
>>   interval '5' minute,
>>   interval '60' minute
>> ),
>> zoneId
>> ),
>> zone_agg as (
>>   select
>> zoneId,
>> sum(recency_wt *(P2D_inclusive - PP2D)) / sum(recency_wt) as
>> zone_quotient_inclusive,
>> sum(recency_wt *(P2D_exclusive - PP2D)) / sum(recency_wt) as
>> zone_quotient_exclusive,
>> avg(cityId) as cityId,
>> proctime() as proctime
>>   

Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

2023-03-30 Thread Jane Chan
Congratulations!

Best regards,
Jane

On Thu, Mar 30, 2023 at 1:38 PM Jiadong Lu  wrote:

> Congratulations !!!
>
> Best,
> Jiadong Lu
>
> On 2023/3/27 17:23, Yu Li wrote:
> > Dear Flinkers,
> >
> >
> >
> As you may have noticed, we are pleased to announce that Flink Table Store 
> has joined the Apache Incubator as a separate project called Apache 
> Paimon(incubating) [1] [2] [3]. The new project still aims at building a 
> streaming data lake platform for high-speed data ingestion, change data 
> tracking and efficient real-time analytics, with the vision of supporting a 
> larger ecosystem and establishing a vibrant and neutral open source community.
> >
> >
> >
> We would like to thank everyone for their great support and efforts for the 
> Flink Table Store project, and warmly welcome everyone to join the 
> development and activities of the new project. Apache Flink will continue to 
> be one of the first-class citizens supported by Paimon, and we believe that 
> the Flink and Paimon communities will maintain close cooperation.
> >
> >
> > 亲爱的Flinkers,
> >
> >
> > 正如您可能已经注意到的,我们很高兴地宣布,Flink Table Store 已经正式加入
> > Apache 孵化器独立孵化 [1] [2] [3]。新项目的名字是
> > Apache Paimon(incubating),仍致力于打造一个支持高速数据摄入、流式数据订
> > 阅和高效实时分析的新一代流式湖仓平台。此外,新项目将支持更加丰富的生态,
> > 并建立一个充满活力和中立的开源社区。
> >
> >
> > 在这里我们要感谢大家对 Flink Table Store 项目的大力支持和投入,并热烈欢
> > 迎大家加入新项目的开发和社区活动。Apache Flink 将继续作为 Paimon 支持的
> > 主力计算引擎之一,我们也相信 Flink 和 Paimon 社区将继续保持密切合作。
> >
> >
> > Best Regards,
> >
> > Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)
> >
> >
> > 致礼,
> >
> > 李钰(谨代表 Apache Flink PMC 和 Apache Paimon PPMC)
> >
> >
> > [1] https://paimon.apache.org/ 
> >
> > [2] https://github.com/apache/incubator-paimon
> > 
> >
> > [3] https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal
> > 
> >
>


Re: Data anomaly when writing to MySQL with Flink

2023-03-23 Thread Jane Chan
I still have not received the attachment.

Flink SQL supports INSERT INTO table_identifier (column_identifier1 [,
column_identifier2, ...]) for inserting into specified columns; see [1] for
the exact syntax.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/insert/#insert-from-select-queries
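For example (a sketch with made-up table and column names; only the listed
columns are written by the statement):

INSERT INTO target_table (id, phone)
SELECT id, phone FROM source_table;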

On Thu, Mar 23, 2023 at 5:35 PM 小昌同学  wrote:

> Hi, I have just re-uploaded the attachment. Yes, Flink SQL already supports
> upsert mode, but those updates are row-level updates; what I want is to change
> only some of the fields of a row. Hoping for your guidance.
> 小昌同学
> ccc0606fight...@163.com
>
> ---- Original message ----
> From: Jane Chan
> Date: 2023-03-23 15:42
> To:
> Subject: Re: Data anomaly when writing to MySQL with Flink
> Hi,
>
> I did not see the attachment. Back to your question: Flink SQL currently supports writing to MySQL in upsert mode, provided that the sink table's DDL declares a primary key
> and that it matches the primary key of the physical table in the database. See [1].
>
> [1]
>
> https://github.com/apache/flink-connector-jdbc/blob/main/docs/content.zh/docs/connectors/table/jdbc.md#%E9%94%AE%E5%A4%84%E7%90%86
>
> On Thu, Mar 23, 2023 at 2:54 PM 小昌同学  wrote:
>
> Hi, the code is uploaded in the attachment.
> What I want to know is whether Flink SQL can support updates when writing to MySQL, similar to the ON DUPLICATE KEY UPDATE syntax?
>
> 小昌同学
> ccc0606fight...@163.com
>
> ---- Original message ----
> From: Jane Chan
> Date: 2023-03-23 14:23
> To:
> Subject: Re: Data anomaly when writing to MySQL with Flink
> Could you post the complete SQL so we can take a look?
>
> Best,
> Jane
>
> On Thu, Mar 23, 2023 at 1:39 PM 小昌同学  wrote:
>
> I am using Flink SQL to join multiple tables and write the data to MySQL in
> real time. A composite primary key is defined in MySQL. Looking at the logs,
> I see that for the same data written to the MySQL table there is one insert
> and one delete. What I want is an upsert; how should I do that?
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
>
>
>


Re: Data anomaly when writing to MySQL with Flink

2023-03-23 Thread Jane Chan
Hi,

I did not see the attachment. Back to your question: Flink SQL currently supports
writing to MySQL in upsert mode, provided that the sink table's DDL declares a
primary key and that it matches the primary key of the physical table in the
database. See [1].

[1]
https://github.com/apache/flink-connector-jdbc/blob/main/docs/content.zh/docs/connectors/table/jdbc.md#%E9%94%AE%E5%A4%84%E7%90%86
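For example, a sketch of a JDBC sink DDL (table and column names are made up;
the declared primary key must match the primary key of the physical MySQL table):

CREATE TABLE mysql_sink (
  id   VARCHAR(64),
  name VARCHAR(255),
  PRIMARY KEY (id) NOT ENFORCED  -- same key as the physical table, so writes become upserts
) WITH (
  'connector' = 'jdbc',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'my_table',
  'username' = '...',
  'password' = '...'
);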

On Thu, Mar 23, 2023 at 2:54 PM 小昌同学  wrote:

> Hi, the code is uploaded in the attachment.
> What I want to know is whether Flink SQL can support updates when writing to MySQL, similar to the ON DUPLICATE KEY UPDATE syntax?
>
> 小昌同学
> ccc0606fight...@163.com
>
> ---- Original message ----
> From: Jane Chan
> Date: 2023-03-23 14:23
> To:
> Subject: Re: Data anomaly when writing to MySQL with Flink
> Could you post the complete SQL so we can take a look?
>
> Best,
> Jane
>
> On Thu, Mar 23, 2023 at 1:39 PM 小昌同学  wrote:
>
> I am using Flink SQL to join multiple tables and write the data to MySQL in
> real time. A composite primary key is defined in MySQL. Looking at the logs,
> I see that for the same data written to the MySQL table there is one insert
> and one delete. What I want is an upsert; how should I do that?
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
>
>


Re: Re: What should I do if nobody handles the SQL bug I reported?

2023-03-22 Thread Jane Chan
Hi,

As mentioned in the reply, if you do not want to switch versions, on 1.15 you
can work around the problem by manually casting the 'abc' field to a string type:
map ['msg_code','0', 'msg_reason', cast('abc' as string)]

If you do not want to modify the SQL, the only option for now is to build the
release-1.17 branch yourself; see [1] for how to build it.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/

Best,
Jane

On Wed, Mar 22, 2023 at 6:04 PM Jeff  wrote:

> By reading the relevant Calcite 1.27.0 source code I found that it has already been fixed there, but I am on Flink 1.15 and cannot use 1.27.0 directly, so is a locally built version my only option?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2023-03-22 10:41:42, "Shuo Cheng" wrote:
> >Hi,
> >
> >If you know where the problem is, you are welcome to open a PR yourself.
> >
> >Sincerely,
> >Shuo
> >
> >On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
> >
> >> Copy and run the two SQL statements I provided and it will definitely reproduce!
> >> Every Flink version has this problem, because they all use Calcite 1.26.0.
> >> The issue is caused by that Calcite version.
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> On 2023-03-22 09:28:17, "Jeff" wrote:
> >> >Bug link:
> >> >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
> >> >
> >> >
> >> >Bug details:
> >> >the values of map are truncated by the CASE WHEN
> function.
> >> >// sql
> >> >create table test (a map) with ('connector'='print');
> >> >insert into test  select * from (values(case when true then
> >> map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc']
> >> end));
> >> >
> >> >the result:
> >> >
> >> >+I[{test=123}]
> >> >
> >> >We hope the value of result is '123456789', but I get '123', the length
> >> is limited by 'abc'.
> >>
>


Re: Setting a separate TTL per stream for regular joins

2023-03-22 Thread Jane Chan
Hi,

I have started a discussion in the community about configuring state TTL at
the operator level [1], which supports setting a separate TTL for each input
stream. You are welcome to join the discussion :)

[1] https://lists.apache.org/thread/ffmc96gv8ofoskbxlhtm7w8oxv8nqzct

Best,
Jane

On Wed, Feb 15, 2023 at 1:26 PM Jane Chan  wrote:

> Hi,
>
> Flink SQL does not yet support setting a separate state TTL for each stream, but the community plans to support this; a FLIP will be proposed soon, and you are welcome to join the discussion.
>
> Best regards,
> Jane
>
> On Wed, Feb 15, 2023 at 11:13 AM Jason_H  wrote:
>
>> Hi all,
>> I have run into a problem. When doing a two-stream join with Flink SQL I use a regular
>> join, with one data stream and one dimension-table stream. Now I want to set a TTL for the data stream only. How can I do that? The Flink version is 1.15.2.
>>
>>
>> | |
>> Jason_H
>> |
>> |
>> hyb_he...@163.com
>> |
>
>


Re: What should I do if nobody handles the SQL bug I reported?

2023-03-22 Thread Jane Chan
Hi,

This is a Calcite bug [1] that has been fixed in 1.27.0. However, since Flink
1.15.1, 1.15.2 and 1.16.1 all depend on Calcite 1.26.0, the only option for now
is the workaround below; once release-1.17 is out you can upgrade to the new
version and the problem should be gone.

select * from (values(case when true then map['test','123456789'] else
map ['msg_code','0', 'msg_reason', cast('abc' as string)] end));


[1] https://issues.apache.org/jira/browse/CALCITE-4603

Best,
Jane

On Wed, Mar 22, 2023 at 11:49 AM tison  wrote:

> You can follow the release progress and test the RC
> https://lists.apache.org/thread/d9o0tgnv0fl9goqsdo8wmq9121b9wolv
>
> Best,
> tison.
>
>
> tison  于2023年3月22日周三 11:47写道:
>
> > The Calcite version on Flink master is 1.29; it looks like it will be shipped in the Flink 1.17 release
> >
> > Best,
> > tison.
> >
> >
> >> Shuo Cheng wrote on Wed, Mar 22, 2023 at 11:42:
> >
> >> Hi,
> >>
> >> If you know where the problem is, you are welcome to open a PR yourself.
> >>
> >> Sincerely,
> >> Shuo
> >>
> >> On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
> >>
> >> > Copy and run the two SQL statements I provided and it will definitely reproduce!
> >> > Every Flink version has this problem, because they all use Calcite 1.26.0.
> >> > The issue is caused by that Calcite version.
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > On 2023-03-22 09:28:17, "Jeff" wrote:
> >> > >Bug link:
> >> > >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
> >> > >
> >> > >
> >> > >Bug details:
> >> > >the values of map are truncated by the CASE WHEN
> >> function.
> >> > >// sql
> >> > >create table test (a map) with
> ('connector'='print');
> >> > >insert into test  select * from (values(case when true then
> >> > map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc']
> >> > end));
> >> > >
> >> > >the result:
> >> > >
> >> > >+I[{test=123}]
> >> > >
> >> > >We hope the value of result is '123456789', but I get '123', the
> length
> >> > is limited by 'abc'.
> >> >
> >>
> >
>


Re: Re: Re: Re: Syncing Kafka data to MySQL with Flink SQL: when a delete happens on the source Kafka, MySQL receives the op=d record and deletes it, but at the same time an older version of that row is inserted again.

2023-03-08 Thread Jane Chan
From the plan it looks like a SinkUpsertMaterializer [1] was added at the sink
node because the upsert key could not be derived; it shuffles (keyBy) on the
primary key defined on the sink table [2] and can only guarantee eventual
consistency. Also, in your description of the operations the schema has three
columns, but the DDL has four, and the formatting is broken.

Some possible suggestions (see the sketch after this list):

1. If the upstream data has a primary key and it is rowid, declare the PK on
the Flink source table to avoid generating the extra materializer node; also,
do not include metadata columns (such as op) when declaring the Flink source
table, as that leads to non-deterministic updates [3].
2. Check that the PK columns of the physical table in MySQL match the PK
columns of the Flink SQL sink table.
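For illustration, a rough sketch of the idea (the column names follow your
DDL; the connector options are omitted, and the whole thing is an assumption
about your setup rather than a verified configuration):

-- source: declare the PK and leave out the metadata 'op' column
CREATE TABLE `电话` (
  `rowid` VARCHAR(255),
  `63fd65fb36521f81a2cfab90` VARCHAR(255),
  `63fd660536521f81a2cfabad` VARCHAR(255),
  `63fd660536521f81a2cfabae` VARCHAR(255),
  PRIMARY KEY (`rowid`) NOT ENFORCED
) WITH ( ... );

-- sink: the declared PK must match the primary key of the physical MySQL table
CREATE TABLE `电话_1` (
  `rowID` VARCHAR(255),
  `名称` STRING,
  `手机` STRING,
  `座机` VARCHAR(255),
  PRIMARY KEY (`rowID`) NOT ENFORCED
) WITH ( ... );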

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
[2]
https://github.com/apache/flink/blob/3ea83baad0c8413f8e1f4a027866335d13789538/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L378
[3]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/determinism/#31-%e6%b5%81%e4%b8%8a%e7%9a%84%e4%b8%8d%e7%a1%ae%e5%ae%9a%e6%80%a7

Best,
Jane

On Mon, Mar 6, 2023 at 11:24 AM 陈佳豪  wrote:

> I just ran a test.
> Suppose there are 3 rows of data to sync (full load):
>
> | ID | Phone       | Landline |
> | 1  | 1311313     | 123      |
> | 2  | 1311313     | 456      |
> | 3  | 1311313     | 789      |
>
> Then I modify two fields of the fourth row (incremental):
>
> | ID | Phone       | Landline |
> | 1  | 1311313     | 123      |
> | 2  | 1311313     | 456      |
> | 3  | 13113133110 | 888      |
>
> After the modification I delete row 2; checking MySQL, row 2 is correctly deleted and nothing new is inserted (correct behavior).
> Then I go on to delete row 3, and now it goes wrong: Flink has the cached data of the two modifications, so while deleting it also inserts the old version of the data back into MySQL (incorrect behavior).
>
> The above is the result of my test on Flink 1.16.1. I do not know whether I need to configure something in Flink or in the downstream operator, and if so what exactly. This problem has been troubling me for 3 weeks, and none of my tests and adjustments have worked.
>
>
>
>
>
>
>
>
> On 2023-03-06 10:54:23, "陈佳豪" wrote:
> >Hi, good morning.
> >I upgraded Flink to 1.16.1 and ran the Kafka-to-MySQL sync job, and found that the same problem still exists. I ran EXPLAIN locally and the output is as follows:
> >
> >
> >== Abstract Syntax Tree ==
> >LogicalSink(table=[default_catalog.default_database.电话_1], fields=[rowID,
> 名称, 手机, 座机])
> >+- LogicalProject(rowID=[CAST($0):VARCHAR(255) CHARACTER SET "UTF-16LE"],
> 名称=[$1], 手机=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"],
> 座机=[CAST($3):VARCHAR(255) CHARACTER SET "UTF-16LE"])
> >   +- LogicalTableScan(table=[[default_catalog, default_database, 电话]])
> >
> >
> >== Optimized Physical Plan ==
> >Sink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称,
> 手机, 座机], upsertMaterialize=[true])
> >+- Calc(select=[CAST(rowid AS VARCHAR(255) CHARACTER SET "UTF-16LE") AS
> rowID, 63fd65fb36521f81a2cfab90 AS 名称, CAST(63fd660536521f81a2cfabad AS
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS 手机,
> CAST(63fd660536521f81a2cfabae AS VARCHAR(255) CHARACTER SET "UTF-16LE") AS
> 座机])
> >   +- TableSourceScan(table=[[default_catalog, default_database, 电话]],
> fields=[rowid, 63fd65fb36521f81a2cfab90, 63fd660536521f81a2cfabad,
> 63fd660536521f81a2cfabae])
> >
> >
> >== Optimized Execution Plan ==
> >Sink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称,
> 手机, 座机], upsertMaterialize=[true])
> >+- Calc(select=[CAST(rowid AS VARCHAR(255)) AS rowID,
> 63fd65fb36521f81a2cfab90 AS 名称, CAST(63fd660536521f81a2cfabad AS
> VARCHAR(2147483647)) AS 手机, CAST(63fd660536521f81a2cfabae AS VARCHAR(255))
> AS 座机])
> >   +- TableSourceScan(table=[[default_catalog, default_database, 电话]],
> fields=[rowid, 63fd65fb36521f81a2cfab90, 63fd660536521f81a2cfabad,
> 63fd660536521f81a2cfabae])
> >
> >
> >
> >On 2023-03-05 15:37:53, "Jane Chan" wrote:
> >>Hi,
> >>
> >>Sorry, that was a typo; it should be 1.16.1. I verified the query you sent
> >>earlier on 1.16.1 and deletes work correctly there. You can try it on 1.16.1,
> >>or try EXPLAIN CHANGELOG_MODE INSERT INTO...[1] on 1.15.2 to print out the
> >>plan and take a look.
> >>
> >>[1]
> >>
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/
> >>
> >>Best,
> >>Jane
> >>
> >>On Sun, Mar 5, 2023 at 2:36 PM 陈佳豪  wrote:
> >>
> >>> Hi,
> >>> There is no 1.16.2 release yet, right? On the Flink website I only see 1.16.0 and 1.16.1.
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> On 2023-03-02 11:52:41, "Jane Chan" wrote:
> >>> >Hi,
> >>> >
> >>> >You can try EXPLAIN CHANGELOG_MODE [1] to print out the plan and take a look, or try upgrading to Flink 1.16;
> >>> >this query was verified to work fine on 1.16.2.
> >>> >
> >>> >[1]
> >>> >
> >>>
> https://nightlies

Re: Re: Syncing Kafka data to MySQL with Flink SQL: when a delete happens on the source Kafka, MySQL receives the op=d record and deletes it, but at the same time an older version of that row is inserted again.

2023-03-04 Thread Jane Chan
Hi,

Sorry, that was a typo; it should be 1.16.1. I verified the query you sent
earlier on 1.16.1 and deletes work correctly there. You can try it on 1.16.1,
or try EXPLAIN CHANGELOG_MODE INSERT INTO...[1] on 1.15.2 to print out the
plan and take a look.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/
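For example, prefixing the INSERT statement from your earlier mail (a sketch):

EXPLAIN CHANGELOG_MODE
INSERT INTO `电话_1`
SELECT `t_1`.`rowID` AS `rowID`, `t_1`.`名称` AS `名称`, `t_1`.`手机` AS `手机`, `t_1`.`座机` AS `座机`
FROM (
  SELECT `rowid` AS `rowID`,
         `63fd65fb36521f81a2cfab90` AS `名称`,
         `63fd660536521f81a2cfabad` AS `手机`,
         `63fd660536521f81a2cfabae` AS `座机`
  FROM `电话`
) AS t_1;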

Best,
Jane

On Sun, Mar 5, 2023 at 2:36 PM 陈佳豪  wrote:

> Hi,
> There is no 1.16.2 release yet, right? On the Flink website I only see 1.16.0 and 1.16.1.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2023-03-02 11:52:41, "Jane Chan" wrote:
> >Hi,
> >
> >You can try EXPLAIN CHANGELOG_MODE [1] to print out the plan and take a look, or try upgrading to Flink 1.16;
> >this query was verified to work fine on 1.16.2.
> >
> >[1]
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/
> >
> >Best,
> >Jane
> >
> >On Wed, Mar 1, 2023 at 6:22 PM 陈佳豪  wrote:
> >
> >> Flink, the Kafka connector and the JDBC connector are all version 1.15.2
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> On 2023-03-01 18:14:35, "陈佳豪" wrote:
> >> >The problem is as described in the subject: on delete operations the data in the MySQL table is wrong; each time, an old version of the row with the current primary key gets inserted again.
> >> >String kafka = "CREATE TABLE `电话` (`rowid`
> >> VARCHAR(2147483647),`63fd65fb36521f81a2cfab90`
> >> VARCHAR(2147483647),`63fd660536521f81a2cfabad`
> >> VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535)  ) WITH (
> >> 'connector' = 'kafka', 'topic' =
> >> 'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f',
> >> 'properties.bootstrap.servers' = '132.232.27.116:9092',
> >> 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json' )";
> >> >
> >> >String mysql = "CREATE TABLE `电话_1` (`rowID` VARCHAR(255),`名称`
> >> STRING,`手机` STRING,`座机` VARCHAR(255),PRIMARY KEY (`rowID`) NOT
> >> ENFORCED  )  WITH ('connector' = 'jdbc','driver' =
> >> 'com.mysql.cj.jdbc.Driver','url' = 'jdbc:mysql://
> >> 43.136.128.102:6506/meihua_test','username' = 'root',
> 'password' =
> >> '123456','table-name' = '电话2'  )";
> >> >
> >> >String insert = "insert into `电话_1` select `t_1`.`rowID` as
> >> `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from (
> >> select `rowid` as `rowID`,`63fd65fb36521f81a2cfab90` as
> >> `名称`,`63fd660536521f81a2cfabad` as `手机`,`63fd660536521f81a2cfabae` as
> `座机`
> >> from `电话` ) as t_1";
> >> >
> >> >The executed statements are shown above. Could someone take a look and help clarify? Is it a problem with my syntax, or is it a bug in the Flink connector itself?
> >>
>


Re: Syncing Kafka data to MySQL with Flink SQL: when a delete happens on the source Kafka, MySQL receives the op=d record and deletes it, but at the same time an older version of that row is inserted again.

2023-03-01 Thread Jane Chan
Hi,

You can try EXPLAIN CHANGELOG_MODE [1] to print out the plan and take a look,
or try upgrading to Flink 1.16; this query was verified to work fine on 1.16.2.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/

Best,
Jane

On Wed, Mar 1, 2023 at 6:22 PM 陈佳豪  wrote:

> Flink, the Kafka connector and the JDBC connector are all version 1.15.2
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2023-03-01 18:14:35, "陈佳豪" wrote:
> >The problem is as described in the subject: on delete operations the data in the MySQL table is wrong; each time, an old version of the row with the current primary key gets inserted again.
> >String kafka = "CREATE TABLE `电话` (`rowid`
> VARCHAR(2147483647),`63fd65fb36521f81a2cfab90`
> VARCHAR(2147483647),`63fd660536521f81a2cfabad`
> VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535)  ) WITH (
> 'connector' = 'kafka', 'topic' =
> 'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f',
> 'properties.bootstrap.servers' = '132.232.27.116:9092',
> 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json' )";
> >
> >String mysql = "CREATE TABLE `电话_1` (`rowID` VARCHAR(255),`名称`
> STRING,`手机` STRING,`座机` VARCHAR(255),PRIMARY KEY (`rowID`) NOT
> ENFORCED  )  WITH ('connector' = 'jdbc','driver' =
> 'com.mysql.cj.jdbc.Driver','url' = 'jdbc:mysql://
> 43.136.128.102:6506/meihua_test','username' = 'root','password' =
> '123456','table-name' = '电话2'  )";
> >
> >String insert = "insert into `电话_1` select `t_1`.`rowID` as
> `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from (
> select `rowid` as `rowID`,`63fd65fb36521f81a2cfab90` as
> `名称`,`63fd660536521f81a2cfabad` as `手机`,`63fd660536521f81a2cfabae` as `座机`
> from `电话` ) as t_1";
> >
> >The executed statements are shown above. Could someone take a look and help clarify? Is it a problem with my syntax, or is it a bug in the Flink connector itself?
>


Re:

2023-02-25 Thread Jane Chan
To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org

Best,
Jane

On Fri, Feb 24, 2023 at 7:43 PM LITA LITA  wrote:

> Unsubscribe
>
> <704669...@qq.com.invalid> wrote on Fri, Feb 24, 2023 at 07:58:
>
> > Unsubscribe
> >
> >
>


Re: Unable to delete rows when syncing Kafka data to MySQL with Flink SQL

2023-02-25 Thread Jane Chan
Hi,

In the original question the String variables kafka and mysql are assigned the
wrong way around. Also, could you share which Flink version you are using? I
could not reproduce the problem on 1.16.1.

payload

{
  "before": {
"rowid": "f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d",
"63f73b332e77497da91286f0": "Jerry",
"63f73b3f2e77497da91286fb": "mobile number",
"63f73b3f2e77497da91286fc": "telephone number"
  },
  "after": null,

"source": {...},
  "op": "d",
  "ts_ms": 1677342340042,
  "transaction": null
}

flink sql

Flink SQL> insert into `电话` select `t_1`.`rowID` as `rowID`,`t_1`.`名称` as
`名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `手机` from (select `rowid` as `rowID`,
`63f73b332e77497da91286f0` as `名称`,`63f73b3f2e77497da91286fb` as `手机`,
`63f73b3f2e77497da91286fc` as `座机` from `电话_1`) as t_1;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 8490c9530d3a97e73aeedfe9745f2fe3

mysql output

mysql> select * from 电话;
+--------------------------------------+--------+---------------+------------------+
| rowID                                | 名称   | 手机          | 座机             |
+--------------------------------------+--------+---------------+------------------+
| f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d | Tom    | mobile number | telephone number |
+--------------------------------------+--------+---------------+------------------+
1 row in set (0.00 sec)

mysql> select * from 电话;
+--------------------------------------+--------+---------------+------------------+
| rowID                                | 名称   | 手机          | 座机             |
+--------------------------------------+--------+---------------+------------------+
| f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d | Jerry  | mobile number | telephone number |
+--------------------------------------+--------+---------------+------------------+
1 row in set (0.00 sec)

mysql> select * from 电话;
Empty set (0.00 sec)

Best,
Jane

On Fri, Feb 24, 2023 at 2:21 PM 陈佳豪  wrote:

> - The CREATE TABLE statements are as follows
> String kafka = "CREATE TABLE `电话` " +
> "(`rowID` VARCHAR(255),`名称` STRING,`手机` VARCHAR(255),`座机`
> VARCHAR(255),  " +
> "  PRIMARY KEY (`rowID`) NOT ENFORCED  ) " +
> " WITH " +
> "('connector' = 'jdbc',   " +
> " 'driver' = 'com.mysql.cj.jdbc.Driver',   " +
> " 'url' = 'jdbc:mysql://XX:6506/meihua_test',  " +
> "  'username' = 'root',  " +
> "  'password' = '123456',  " +
> "  'table-name' = '电话'  )";
>
> String mysql = "CREATE TABLE `电话_1` " +
> "(`rowid` VARCHAR(100)," +
> "`63f73b332e77497da91286f0` VARCHAR(100)," +
> "`63f73b3f2e77497da91286fb` VARCHAR(100)," +
> "`63f73b3f2e77497da91286fc` VARCHAR(100)," +
> "`op` STRING ," +
> " PRIMARY KEY (rowid) NOT ENFORCED )" +
> " WITH " +
> "( 'connector' = 'kafka', " +
> "'topic' =
> 'sz_worksheet-63f82984f3ec743e45b0d561-63f73b332e77497da91286ef'," +
> " 'properties.bootstrap.servers' = 'XX:9092'," +
> " 'scan.startup.mode' = 'earliest-offset', " +
> "'format' = 'debezium-json' )";
> - The executed INSERT statement is as follows
> String insert = "insert into `电话` select `t_1`.`rowID` as
> `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from" +
> " ( select `rowid` as `rowID`,`63f73b332e77497da91286f0` as
> `名称`,`63f73b3f2e77497da91286fb` as `手机`,`63f73b3f2e77497da91286fc` as `座机`
> from `电话_1` ) as t_1";
> - The operation data is as follows
>
>
> String insert = "insert into `电话` select `t_1`.`rowID` as
> `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from" +
> " ( select `rowid` as `rowID`,`63f73b332e77497da91286f0` as
> `名称`,`63f73b3f2e77497da91286fb` as `手机`,`63f73b3f2e77497da91286fc` as `座机`
> from `电话_1` ) as t_1";
> - The executed statement (delete payload) is as follows
> {
> "op":"d",
> "before":{
> "rowid":"f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d"
> },
> "after":null
> }
> The current conclusion is that inserts and updates work, but deletes do not. Is this something the INSERT INTO statement cannot handle? The format used is debezium-json serialization.
> Could someone please take a look? Thanks.


Re: Unsubscribe

2023-02-23 Thread Jane Chan
Please refer to https://flink.apache.org/community/ and send the email to
user-unsubscr...@flink.apache.org

Best,
Jane

On Fri, Feb 24, 2023 at 3:17 PM zhangjunjie  wrote:

> Unsubscribe
>
>
>


Re: Issue with de-serializing CompiledPlan and UNNEST_ROWS in Table API

2023-02-22 Thread Jane Chan
Hi Daniel,

Thanks for reporting this issue. According to the FLIP [1], this should be
a bug, and I've created a Jira ticket [2] to track this.

> We will introduce a declarative concept to `BuiltInFunctionDefinitions`
> and `FlinkSqlOperatorTable` that maintain a function name + version to
> instance mapping.
>

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489#FLIP190:SupportVersionUpgradesforTableAPI
[2] https://issues.apache.org/jira/browse/FLINK-31182

Best,
Jane

On Wed, Feb 22, 2023 at 9:55 AM yuxia  wrote:

> Hi, Daniel Henneberger.
> Thanks for reporting.  It seems a bug to me. Could you please help create
> a Jira[1] for it?
> As a workaround, is it possible not to use UNNEST? May be you can try to
> use EXPLODE function for the Flink planner will rewrites UNNEST to explode
> function
> in implementation[2].
>
> [1] https://issues.apache.org/jira/projects/FLINK/issues/
> [2]
> https://github.com/apache/flink/blob/bf342d2f67a46e5266c3595734574db270f1b48c/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.scala
>
> Best regards,
> Yuxia
>
> --
> *From: *"Daniel Henneberger"
> *To: *"User"
> *Sent: *Wednesday, February 22, 2023, 5:35:02 AM
> *Subject: *Issue with de-serializing CompiledPlan and UNNEST_ROWS in Table API
>
> Dear Apache Flink community,
>
> I could use some help with a serialization issue I'm having while using
> the Table API. Specifically, I'm trying to deserialize a serialized
> CompiledPlan, but I'm running into trouble with the UNNEST_ROWS operation.
> It seems that the CompilePlan deserializer isn't looking up any functions
> in the BuiltInFunctionDefinitions class, which is causing the
> de-serialization to fail.
>
> Do any of you have experience with this issue or know of a workaround for
> serializing a Table API plan?
>
> Below is code to replicate.
>
> Thanks,
> Daniel Henneberger
>
> private void test() {
>   EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().inStreamingMode().build();
>   TableEnvironment tEnv = TableEnvironment.create(settings);
>
>   // Create a table of values
>   Table table = tEnv.fromValues(createNestedDatatype(),
>   Row.of(List.of(Row.of("nested")), "name"));
>   tEnv.createTemporaryView("table1", table);
>
>   // Invoke the unnest operation
>   Table unnested = tEnv.sqlQuery("SELECT name, nested\n"
>   + "FROM table1 CROSS JOIN UNNEST(arr) AS t (nested)");
>
>   StatementSet statementSet = tEnv.createStatementSet();
>   statementSet.addInsert(TableDescriptor.forConnector("print").build(), 
> unnested);
>
>   // Serialize the plan
>   CompiledPlan plan = statementSet.compilePlan();
>   String json = plan.asJsonString();
>
>   // Attempt to load the plan
>   // This fails with the error 'Could not resolve internal system function 
> '$UNNEST_ROWS$1'. This is a bug, please file an issue.'
>   CompiledPlan plan2 = tEnv.loadPlan(PlanReference.fromJsonString(json));
>   plan2.execute().print();
> }
>
> private DataType createNestedDatatype() {
>   return DataTypes.ROW(
>   DataTypes.FIELD("arr", DataTypes.ARRAY(DataTypes.ROW(
>   DataTypes.FIELD("nested", DataTypes.STRING())
>   ))),
>   DataTypes.FIELD("name", DataTypes.STRING()));
> }
>
>
>


Re: regular join每条流单独设置ttl

2023-02-14 Thread Jane Chan
你好,

目前 Flink SQL 还不支持为每条流单独设置 state TTL, 不过社区计划支持这个功能, 最近就会有 FLIP 提出, 也欢迎参与讨论.

Best regards,
Jane

On Wed, Feb 15, 2023 at 11:13 AM Jason_H  wrote:

> 大家好,
> 我遇到一个问题,在使用flinksql做双流join时,我用的是常规的regular
> join,分别是一条数据流,一条维表流,现在我想单独给数据流设置ttl,请教一下可以怎么做,flink版本是1.15.2
>
>
> | |
> Jason_H
> |
> |
> hyb_he...@163.com
> |


Re: [ANNOUNCE] Apache Flink Table Store 0.2.0 released

2022-08-28 Thread Jane Chan
Congrats! Thanks Jingsong for driving this release, and thanks to all
contributors!

Best,
Jane

On Mon, Aug 29, 2022 at 11:35 AM Jingsong Li  wrote:

> The Apache Flink community is very happy to announce the release of
> Apache Flink Table Store 0.2.0.
>
> Apache Flink Table Store is a unified storage to build dynamic tables
> for both streaming and batch processing in Flink, supporting
> high-speed data ingestion and timely data query.
>
> Please check out the release blog post for an overview of the release:
> https://flink.apache.org/news/2022/08/29/release-table-store-0.2.0.html
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Flink Table Store can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20table-store
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351570
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Regards,
> Jingsong Lee
>


Re: [ANNOUNCE] Apache Flink Table Store 0.2.0 released

2022-08-28 Thread Jane Chan
Congrats! Thanks Jingsong for driving this release, and thanks to all
contributors!

Best,
Jane

On Mon, Aug 29, 2022 at 11:35 AM Jingsong Li  wrote:

> The Apache Flink community is very happy to announce the release of
> Apache Flink Table Store 0.2.0.
>
> Apache Flink Table Store is a unified storage to build dynamic tables
> for both streaming and batch processing in Flink, supporting
> high-speed data ingestion and timely data query.
>
> Please check out the release blog post for an overview of the release:
> https://flink.apache.org/news/2022/08/29/release-table-store-0.2.0.html
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Flink Table Store can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20table-store
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351570
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Regards,
> Jingsong Lee
>