Re: pyflink execute_insert问题求解答

2020-09-09 文章 Dian Fu
这两个看起来是同一个问题,1.11是支持的,可以看一下TableEnvironment.create_statement_set(): https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/table_environment.html#executeexplain-jobs > 在 2020年9月9日,上午11:31,whh_960101 写道: > > 您好,我使用pyflink时的代码如下,有如下两个问题: > 1. > source = st_

localtimestamp??current_timestamp????mysql????????

2020-09-09 文章 xuzh
Dear all??   > CREATE TABLE sink ( >   id INT, >   prod_nm STRING, >  dtm timestamp, >  primary key(id)  NOT ENFORCED --  '??' > )  > WITH ( >     'connector' = 'jdbc', >     'url' = 'jdbc:mysql://10.0.0.0:3306/rs_report?useUnicode=true&characterEncoding=UTF-8', >     'table-name

flink sql 1.11.1 insert data to hive from kafka split into two jobs

2020-09-09 文章 大罗
Hi,我遇到一个问题,在代码中利用flink sql 1.11 从kafka插入数据到hive表,过程如下: 首先,从kafka读取json字符串数组数据,比如[{"pid":"a", "val":1, "data_type": 1, "app_type" :2}, {"pid":"a", "val":1, "data_type": 1, "app_type" :2}] 然后,把这个数据使用flatMap转化为单个对象runDataStream,{"pid":"a", "val":1, "data_type": 1, "app_type" :2} 把runDataStream输出到re

Re: pyflink execute_insert问题求解答

2020-09-09 文章 nicholasjiang
1.最终发现只insert了main_table,请问遇见这种情况该怎么处理,就是如何插入多个表? 针对Multiple Sink的话推荐通过Statement Set方式: statement_set = TableEnvironment.create_statement_set() main_table = source.select("...") sub_table = source.select("...") statement_set.add_insert("main_table", main_table) statement_set.add_insert("sub

Re: flink sql client 如何同时执行多条 sql 语句

2020-09-09 文章 nicholasjiang
目前Flink SQL Client不支持同时执行多条SQL语句,可以写个程序读SQL文件调用Table API的createStatementSet方式同时执行多条SQL语句。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql 1.11.1 如何只插入一个列簇的数据到hbase

2020-09-09 文章 大罗
Hi,我遇到一个问题,我在hive catalog里定义一个hbase connector表,如下: CREATE TABLE t_sems_second ( rowkey string, status row (val VARCHAR, dqf VARCHAR), analog row (val VARCHAR, dqf VARCHAR) ) WITH ( 'connector' = 'hbase-1.4', 'table-name' = 't_sems_second', 'zookeeper.quorum' = 'dev-hadoop-node-c:2181,

flink-sql????????on kafka??flink table??????select????flink table????????????group id??????

2020-09-09 文章 ??????
??kafka??flink table??select??selectgroup id??

Re:Re: pyflink execute_insert问题求解答

2020-09-09 文章 whh_960101
问题1: 我已经生成了一个Table对象main_table,我如何才能取其中的字段'data'中的值,加入条件判断语句中呢,没找到有合适的api 例如: if main_table.to_pandas()['data'].iloc[0] == '': #我现在是通过转成pandas来操作,还有其他好用的方法吗 .. 问题2: full_outer_join(right, join_predicate)[source]¶ Joins two Table. Similar to a SQL full outer join. The fields of the two j

flink table Kafka 重新连接的问题

2020-09-09 文章 marble.zh...@coinflex.com.INVALID
你好。 当connector连接kafka,如果某个message出现exception时,task就停了, 没有自动重新连接, 看了kafka connector的配置,没有这方面的设置,这个有什么重连机制吗? Thanks. -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink-cdc sink mysql 问题

2020-09-09 文章 杨帅统
公司希望将MySQLA库的数据实时同步到B库中,我想通过fink1.11的CDC功能不知道是否可行。 在做测试的时候定义一张cdc源表和一张sink表 CREATE TABLE pvuv_test ( id INT, dt STRING, pv STRING, uv STRING , proc_time AS PROCTIME() --使用维表时需要指定该字段 ) WITH ( 'connector' = 'mysql-cdc', -- 连接器 'hostname' = 'localhost', --mysql地址 'port' = '3306', --

回复: flink-cdc sink mysql 问题

2020-09-09 文章 wdmcode
Hi 杨帅统 MySQL-MySQL数据实时同步也可以使用阿里开源的otter https://github.com/alibaba/otter/ 发件人: 杨帅统 发送时间: Wednesday, September 9, 2020 4:49 PM 收件人: user-zh@flink.apache.org 主题: flink-cdc sink mysql 问题 公司希望将MySQLA库的数据实时同步到B库中,我想通过fink1.11的CDC功能不知道是否可行。 在做测试的时候定义一张cdc源表和一张sink表 CREATE TABLE pvuv_test ( id INT,

Re: pyflink execute_insert问题求解答

2020-09-09 文章 Dian Fu
针对问题1: 你的需求是这样的吗:先获取表中字段'data'的值(第一行的值),根据'data'的值,再构造不同的作业逻辑? 针对问题2:现在join不支持两个表的字段名重复,可以看一下JIRA [1],所以目前必须保证两个表的字段名不重复。 [1] https://issues.apache.org/jira/browse/FLINK-18679 > 在 2020年9月9日,下午4:27,whh_960101 写道: > > 问题1: > 我已经生成了一个Table对象main

Re: 问题跟踪

2020-09-09 文章 Dian Fu
因为你需要根据不同'data'的数据,构造不同的作业逻辑,就必然需要执行作业,获取'data'的值,所以你目前的做法是对的。 1.12发布之后,还可以通过以下方式: 1)table.limit(1).to_pandas:可以只取表中的第一条数据 2)table.limit(1).collect(): limit以及collect 1.12会支持。 > 在 2020年9月9日,下午5:28,whh_960101 写道: > > 回答:对的,我的需求是根据'data'的值,再构造不同的作业逻辑,如何实现,求解答!谢谢!

回复:flink1.9.3 on yarn 提交任务问题

2020-09-09 文章 宁吉浩
我使用如下两条命令提交,发现两个任务共使用一个flink-ui 在ui里取消任意一个job,将会使整个集群被取消,这个应该不是正常情况吧? 或者说,是我的提交命令有问题吗? ps: -d 参数也增加过,没作用 nohup /data2/workspace/flink/flink-1.9.3/bin/flink run -m yarn-cluster \ -yn 1 -ys 3 -p 3 -ytm 2048m -ynm test \ -c aa.class aaa.jar \ & nohup /data2/workspace/flink/flink-1.9.3/bin/flink ru

回复:flink1.9.3 on yarn 提交任务问题

2020-09-09 文章 Jun Zhang
这个应该是和你配置的HA有关,你去掉HA配置试试,或者检查一下HA配置 Best  Jun -- 原始邮件 -- 发件人: 宁吉浩

Yarn-Session长时间运行Kerberos问题

2020-09-09 文章 hua mulan
Session长时间运行提交Job会出现Kerberos的Keytab认证失败,这种情况我应该如何排查 2020-09-08 14:58:39,030 INFO org.apache.flink.yarn.YarnResourceManager - Received new container: container_1594117729227_83900_01_39 - Remaining pending container requests: 1 2020-09-08 14:58:39,030 INFO org.apache.fl

sql-client提交报错UpsertStreamTableSink requires that Table has a full primary keys if it is updated

2020-09-09 文章 kandy.wang
自实现了kudu connector报错: 2020-09-09 18:34:59,442 WARN org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement. org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL statement. at org.apache.flink.table.client.gateway.local.

Re: sql-client提交报错UpsertStreamTableSink requires that Table has a full primary keys if it is updated

2020-09-09 文章 Leonard Xu
Hi 这个错误一般是你的query 是upsert的query,没能推断出PK,所以报错了 。 如果是自定义的connector, 应该实现 DynamicTableSink 接口而不是 老的 UpsertStreamTableSink接口, 实现DynamicTableSink接口可以支持在表上定义PK,不用推导。 看这个报错,kudu的connector实现的是 老的UpsertStreamTableSink, 绕过的办法是改写下你的query,让query可以推导出pk。 祝好 Leonard > 在 2020年9月9日,20:27,kandy.wang 写道: >

Re: flink-cdc sink mysql 问题

2020-09-09 文章 Leonard Xu
Hi 这个错误是jar包没有正确地加载,看代码应该没啥问题,添加jar包后需要重启下集群,你测试的时候重启了吗? 祝好 Leonard > 在 2020年9月9日,16:48,杨帅统 写道: > > 公司希望将MySQLA库的数据实时同步到B库中,我想通过fink1.11的CDC功能不知道是否可行。 > 在做测试的时候定义一张cdc源表和一张sink表 > CREATE TABLE pvuv_test ( > id INT, > dt STRING, > pv STRING, > uv STRING , > proc_time AS PROCTIME() --使用维表时

Re: Row和RowData的区别

2020-09-09 文章 Danny Chan
Row 是暴露给 DataStream 用户用的,里面可以设置 RowKind,RowData 是 Table 内部的数据结构,在一些场景序列化会有提升,使用 Flink SQL 会直接应用上 RowData,当然高级用户想直接用 RowData 也是可以的,1.11 的新版 connector API 就是将 RowData 暴露给了 connector 开发者。 Best, Danny Chan > 在 2020年9月9日,下午1:51,刘首维 写道: > > Hi all, > > > 请问`org.apache.flink.types.Row`和`org.a

Re: flink-sql消费基于on kafka的flink table,每次select这个flink table相当于是不同group id了吗?

2020-09-09 文章 Leonard Xu
Hi 可以看下贴下你Kafka table的option 和 作业的 checkpoint配置吗? 可以确定的是,用的都是同一个group id,。 如果你没有配置 checkpoint, Flink Kafka consumer 的 enable.auto.commit 默认设置为 false,就不会提交对应group 的offset, 此时你两个作业只是用 group id 确定一个起始消费offset,得到的数据就是一致的。 你可以看看[1][2]里面对这个机制的解释。 Best Leonard [1] https://ci.apache.org/projects/flin

Re: localtimestamp和current_timestamp插入mysql时区错乱

2020-09-09 文章 Leonard Xu
Hi, > 这样插入mysql 后dtm时区是乱的, 应该插入的是当前时间减8个小时的,变成了当前时间减21小时 变成当前时间减21小时这个感觉好奇怪,方便贴下完整的代码和数据吗? Best Leonard

?????? flink????????GMV,??????????????????????????????

2020-09-09 文章 ????????
??1? ??1.11+??sqlupdate_beforeupdate_after? ???Flinkafter??before?? . --  -- ??:

flink????spring

2020-09-09 文章 1115098...@qq.com
??spring boot??flink??spring boot??flink??spring boot

退订

2020-09-09 文章 harveyer
退订, 谢谢

Re: flink实时统计GMV,如果订单金额下午变了该怎么处理

2020-09-09 文章 lec ssmi
直接根据订单的id进行retract(使用last_value group by id ),然后sum就可以了吧。只要你设置的状态保存期是的大于你订单金额的冷却时间就行。 忝忝向仧 <153488...@qq.com> 于2020年9月9日周三 下午10:54写道: > 请问第1点是有实际的案例使用了么? > 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录? > 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的 > 谢谢. > > > > > --

答复: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

2020-09-09 文章 范超
Transient 都不参与序列化了,怎么可能从checkopont里恢复? -邮件原件- 发件人: Yun Tang [mailto:myas...@live.com] 发送时间: 2020年9月7日 星期一 12:50 收件人: user-zh@flink.apache.org 主题: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取 Hi 首先,transient ListState 这种用法绝对是没有问题的,你可以在Flink源码中找到大量的使用例子 [1] 可以排查的思路 1. 你的state是否开

Flink 1.11 jdbc查pg失败

2020-09-09 文章 Jimmy Zhang
flink 1.11用jdbc查询pg表时,pg表的字段是大写 flink会把字段转成小写,而导致查询失败,有大佬知道这个问题吗 | Best, Jimmy | Signature is customized by Netease Mail Master

回复: Flink 1.11 jdbc查pg失败

2020-09-09 文章 wdmcode
Hi Jimmy 给字段加双引号试试呢 Select “F1”,”F2” from xxx.xxx; 发件人: Jimmy Zhang 发送时间: Thursday, September 10, 2020 9:41 AM 收件人: user-zh@flink.apache.org 主题: Flink 1.11 jdbc查pg失败 flink 1.11用jdbc查询pg表时,pg表的字段是大写 flink会把字段转成小写,而导致查询失败,有大佬知道这个问题吗 Best, Jimmy Signature is customized by Netease Mail Master

Re: flink集成spring

2020-09-09 文章 Jeff Zhang
可以看看这个zeppelin sdk,,也许适合你 https://www.yuque.com/jeffzhangjianfeng/gldg8w/pz2xoh 1115098...@qq.com 于2020年9月10日周四 上午9:09写道: > 大家好,我在将spring boot集成到flink的过程中,遇到很多问题,感觉不太兼容。看官方文档也没有集成spring > boot的介绍,是不是flink设计的时候就没有考虑与spring boot的集成? -- Best Regards Jeff Zhang

退订

2020-09-09 文章 邢明浩
退订,谢谢

??????flink????spring

2020-09-09 文章 ??????
flinkjar | | ?? | | ??15927482...@163.com | ?? ??2020??09??10?? 09:09??1115098...@qq.com ?? ??spring boot??flink??spring boot??

退订

2020-09-09 文章 邢明浩
退订

Re: 退订

2020-09-09 文章 Xingbo Huang
Hi, 退订请发邮件到 user-zh-unsubscr...@flink.apache.org 详细的可以参考 [1] [1] https://flink.apache.org/zh/community.html#section-1 Best, Xingbo 邢明浩 于2020年9月10日周四 上午10:03写道: > 退订

Re: 退订

2020-09-09 文章 Xintong Song
你好, 退订需要发邮件到 user-zh-unsubscr...@flink.apache.org Thank you~ Xintong Song On Thu, Sep 10, 2020 at 10:03 AM 邢明浩 wrote: > 退订

flink 1.11 taskmanager????????????????????????

2020-09-09 文章 Z-Z
Hi ?? flink docker sessiontaskmanager taskmanager.memory.process.size: 5120m taskmanager.memory.jvm-metaspace.size: 1024m taskmanager7.5G??taskmanager INFO  [] - Final TaskExecutor Memory configurat

?????? flink-sql????????on kafka??flink table??????select????flink table????????????group id??????

2020-09-09 文章 ??????
CREATE TABLE ODS_PaymentOrdert (     orderId INT,      memberId INT, orderAmount DECIMAL(10, 2), paymentStatus SMALLINT, orderDate VARCHAR, payDate VARCHAR, paymentIP VARCHAR, orderSrc VARCHAR, channelType SMALLINT, productId SMALLINT,

Re: flink实时统计GMV,如果订单金额下午变了该怎么处理

2020-09-09 文章 Benchao Li
1.11中中新增了changelog的支持。目前内置有canal[1]和debezium[2]两个format可以读取binlog数据形成changelog。 如果还有自己的binlog格式,也可以自定义format来实现。 只要source端产生了changelog数据,后面的算子是可以自动处理update消息的,简单理解,你可以认为 1. append / update_after 消息会累加到聚合指标上 2. delete / update_before 消息会从聚合指标上进行retract [1] https://ci.apache.org/projects/flink/fli

Flink 1.5.0 savepoint 失败

2020-09-09 文章 likai
hi all. 我使用flink 1.5.0 在触发 savepoint失败。 共享目录:/data/emr_flink_savepoint_share/ 触发命令:bin/flink savepoint feaab3ec9031bce4eab0b677693ab9f0 file:///data/emr_flink_savepoint_share Hadoop conf 默认文件系统是 hdfs://flink-hdfs 报错: Caused by: java.lang.Exception: Could not materialize checkpoint 9381 for oper

Re: flink sql 1.11.1 insert data to hive from kafka split into two jobs

2020-09-09 文章 Qishang
Hi. 大罗 试一下这个方法 org.apache.flink.table.api.StatementSet#execute ss.execute(); 大罗 于2020年9月9日周三 下午3:13写道: > Hi,我遇到一个问题,在代码中利用flink sql 1.11 从kafka插入数据到hive表,过程如下: > > 首先,从kafka读取json字符串数组数据,比如[{"pid":"a", "val":1, "data_type": 1, "app_type" > :2}, > {"pid":"a", "val":1, "data_type": 1, "app_type" :

Re: Flink Plan Visualizer

2020-09-09 文章 zilong xiao
有可以画job graph的方法吗? 黄潇 于2020年9月8日周二 下午8:32写道: > Hi, > > 据我所知,使用 env.getExecutionPlan() 得到的 json 字符串[1]只包含 stream graph > 的信息,所以这样画出来的图是 stream graph。 > 在 job 提交之后的 web ui 中可以看到经过 operator chain 之后的图信息。 > > [1] > > https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/execution_plans.html

Re: Flink Plan Visualizer

2020-09-09 文章 JasonLee
hi Flink plan visualizer 应该是只能画stream graph stream graph 是一个逻辑上的 DAG 图 你把任务提交到集群上 在 Flink WEB UI 上面就可以看到 job graph 了 stream graph 和 job graph 的区别是 job graph 优化了 operator chain job graph 是在调用 env.execute 方法之后才生成的 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

??????flink 1.11 taskmanager????????????????????????

2020-09-09 文章 Z-Z
rocksdb?? --  -- ??: "Z-Z"

Re: flink 1.11 taskmanager实际占用内存超出配置太多

2020-09-09 文章 Xintong Song
Flink 是无法完全控制所有内存开销的,这是 java 应用程序自身特点决定的。 - 对于 java heap/direct/metaspace 等 JVM 可以控制的内存,Flink 会设置 JVM 参数控制其不能超用 - 对于 Flink 自己维护的固定大小的缓冲池,如 network buffer pool, managed memory 等,Flink 也会 严格限制申请内存的大小。 - 对于其他一些开销,如 JVM 的线程栈、用户代码及第三方依赖中的 native 方法等,Flink 是无法限制这部分内存使用的大小的,只能是根据配置从总内存中预留出一部分来。如果预留的不够多,就回

退订

2020-09-09 文章 程 婕
退订,谢谢

Re: flink实时统计GMV,如果订单金额下午变了该怎么处理

2020-09-09 文章 lec ssmi
上述说的这种特性,应该也是要依赖于状态把。如果变化的间隔时间超过了状态的保存时长,还能生效吗? 感觉底层和 last_value() group by id是一样的。 Benchao Li 于2020年9月10日周四 上午10:34写道: > > 1.11中中新增了changelog的支持。目前内置有canal[1]和debezium[2]两个format可以读取binlog数据形成changelog。 > 如果还有自己的binlog格式,也可以自定义format来实现。 > > 只要source端产生了changelog数据,后面的算子是可以自动处理update消息的,简单理解,你可

Re: 退订

2020-09-09 文章 黄潇
Hi, 退订需要发邮件到 user-zh-unsubscr...@flink.apache.org 可以参考 https://flink.apache.org/zh/community.html#section-1 程 婕 于2020年9月10日周四 下午1:56写道: > 退订,谢谢 > > >

Flink 1.11.1 job申请到PhysicalSlot的ResourceProfile问题

2020-09-09 文章 黄潇
Hi all, 最近在看Flink资源管理部分的源码,Flink 1.11.1 版本在本地IDE调试的时候发现 slotPool.requestNewAllocatedSlot 方法申请到 PhysicalSlot 的 ResourceProfile 信息比较奇怪(见图)。 请问这是因为本地IDE运行的原因,还是说缺少某些配置信息? Thanks!