Re: A group window expects a time attribute for grouping in a stream environment谢谢

2020-12-10 文章 Leonard Xu
k-docs-master/zh/dev/event_time.html> > - > 昨天的完整代码是: > https://paste.ubuntu.com/p/9JsFDKC5V8/ > > > ~!!! > > > > > > > > > > > > 在 2020-12-10 12:02:31,"Leonard Xu"

Re: flink jdbc connector 在checkpoint的时候如何确保buffer中的数据全都sink到数据库

2020-12-09 文章 Leonard Xu
你们分析是对的,这是个bug,这里应该用SinkFunctionProvider, 用GenericJdbcSinkFunction再wrap一层,不用OutputFormatProvider,因为OutputFormatSinkFunction没有继承CheckpointedFunction, 没法保证在cp时将buffer数据刷到数据库, 也可以说是OutputFormat不会参与cp, 所以at-least-once都不一定能保证。 修复应该很简单的,@jie mei 你有兴趣帮忙修复吗? 祝好, Leonard > 在 2020年12月10日,11:22,jie

Re: A group window expects a time attribute for grouping in a stream environment谢谢

2020-12-09 文章 Leonard Xu
Hi, 补充下昨天线下提供的答疑,在从datastream 转换到Table时,如果希望转换后的Table上能够继续使用watermark, 需要(1)让datastream中包含watermark (2)在table上声明event time 属性. 文档可以参考[1] 给出文档中省略的watermark生成部分code: // 老版本 //Table orders = tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(new AscendingTimestampExtractor()

Re: FlinkSQL中创建表,视图等一些元数据信息都是存放在什么地方(没看到像Hive那样使用mysql存储元数据信息)

2020-12-07 文章 Leonard Xu
Hi, Flink 的元数据存放在catalog中的,也支持多种catalog(embedded, HIve,JDBC,自定义catalog),默认Flink使用内置的GenericInMemoryCatalog,这个是in memory的catalog,元数据都在这里,生产环境上可以使用HiveCatalog 祝好 Leonard [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/catalogs.html

Re: ScanTableSource 为什么不支持SupportsFilterPushDown的问题

2020-12-07 文章 Leonard Xu
Hi, 在1.11中,planner 并没有支持下表中的各种PushDown, 所以这里做了check,这是planner层面不支持的。在1.12里,planner层面已经支持了这些PushDown,所以这些check都没有了,用户可以自定义 connector 并实现各种PushDown,比如,1.12中已经支持了kafka source上的watermarkPushdown。 因此,有这类需求建议基于1.12开发。 祝好, Leonard > > Hi: > 由于业务需要,我想在flink1.11.2中自定义一个ScanTableSource,

Re: flink 使用关系型数据库的默认事务是否可以做到端对端的精确一次,还是需要实现2p2提交

2020-12-07 文章 Leonard Xu
> 你的意思是 自己实现sink 提交的过程中抛出所有异常并且rollback,是可以做到端对端精确一次的; 不是,我是在回答你Flink 在写入关系数据库是可以实现端到端一致的,需要的方式需要实现两阶段提交,这个思路是可行的。你说的简单地回滚是没法保证exactly-once语义的。 > hdxg1101300...@163.com > > 发件人: Leonard Xu > 发送时间: 2020-12-07 17:00 > 收件人: user-zh > 主题: Re: flink 使用关系型数据库的默认事务是否可以做到端对端的

Re: flink sql实时计算UpsertStreamTableSink requires that Table has a full primary keys if it is updated

2020-12-07 文章 Leonard Xu
Hi, 你是不是没有订阅flink的用户邮件列表,所以有些邮件你看不到。 你可以发送任意内容的邮件到user-zh-subscr...@flink.apache.org <mailto:user-zh-subscr...@flink.apache.org> 即可订阅用户邮件列表,订阅后邮件列表里大家的提问和回答你都可以看见了。 [1] https://flink.apache.org/zh/community.html > 在 2020年12月7日,16:50,Leonard Xu 写道: > > Hi, > >>

Re: flink 使用关系型数据库的默认事务是否可以做到端对端的精确一次,还是需要实现2p2提交

2020-12-07 文章 Leonard Xu
Hi, > 在 2020年12月7日,16:46,hdxg1101300...@163.com 写道: > >flink 使用关系型数据库的默认事务是否可以做到端对端的精确一次,还是需要实现2p2提交; >自己实现sink开启数据库事务,遇到错误回滚并抛出异常,是否可以实现数据精确一次 Flink 写入关系型数据库是可以做到端到端的一致性的,默认是不支持的,需要实现两阶段提交,按照你的思路是可行的。另外社区也有人在做这个feature[1],已经有PR了,你可以参考,预计会在1.13里支持。 祝好, Leonard [1]

Re: flink sql实时计算UpsertStreamTableSink requires that Table has a full primary keys if it is updated

2020-12-07 文章 Leonard Xu
Hi, > 在 2020年12月7日,16:41,爱成绕指柔 <1194803...@qq.com> 写道: > > Exception in thread "main" org.apache.flink.table.api.TableException: > UpsertStreamTableSink requires that Table has a full primary keys if it is > updated. 这个错误是在query 没法推断出主键,而 hbase sink 是一个upsert sink, 需要PK来实现upsert语义。

Re: flinksql1.11长时间没有数据写入mysql,会报ava.sql.SQLException: No operations allowed after statement closed.

2020-12-02 文章 Leonard Xu
Hi,yanzi 可以贴下完整的报错信息吗? 祝好, Leonard Xu > 在 2020年12月3日,10:36,yanzi 写道: > > 使用flinksql 1.11版本,运行计算好的指标落入mysql,长时间没有数据会报错,导致任务会失败。 > 针对:https://issues.apache.org/jira/browse/FLINK-16681,已经修复此问题,但是我们使用1.11最新版本,运行一段时间后,发现还是会有此问题,如何解决 > > > > -- > Sent from: ht

Re: 【Flink SQL】无法启动 env.yaml

2020-12-01 文章 Leonard Xu
Hi, 李轲 这是因为yml只支持1.10之前老的connector,写法是connector.type=‘filesystem’, 1.11之后的新connector都是 connetor=‘filesystem’, 除了简化了写法外,前者的工厂方法和后者的也不一样, 所以通过yml定义的新的connector是不能被老的工厂 SPI 发现的。而在yml中定义表从1.11开始就是不推荐了,因为已经支持了用DDL这种纯SQL的方式定义表。 推荐你可以拉起sql-client后,用DDL的方式建表 祝好 Leonard > 在 2020年12月1日,21:43,李轲 写道:

Re: 分组查询时,select的字段是否一定要都在group by中吗?

2020-11-30 文章 Leonard Xu
Hi, bulterman 你的分析是对的,group by pk的query是可以优化到筛选全部列的,这可以是个优化点,只是flink 现在还没有做, 和 Flink pk的 NOT ENFORCED 并没有关系,NOT NEOFRCED是说Flink不持有数据,不像数据库持有数据可以在读取时做校验。 个人感觉这是个小的优化点,如果很急需可以考虑在社区开个issue. 祝好, Leonard Xu > 在 2020年12月1日,13:40,bulterman <15618338...@163.com> 写道: > > Hi ALL, >

Re: canal-json 分库分表场景应用

2020-11-26 文章 Leonard Xu
Hi, air23 国内用户用Canal-json还是很多的,我建了个issue 来支持, 你可以关注下。 > 你好 请问这个debezium-json 这个value.source.table 功能 在1.12的canal-json会实现吗, > 看到canal-json代码里面 好像是有这部分代码。 Best, Leonard [1]https://issues.apache.org/jira/browse/FLINK-20385 > > > > > >

Re: 发现flinksql写hive比写hdfs慢很多

2020-11-26 文章 Leonard Xu
Hi Hive 使用的 BulkWriter 目前有两个问题,一个是支持的数据类型没有MR writer的全,第二个是,BulkWriter 支持parquet和orc,但只支持orc的最新版本,写入低版本有兼容性问题 主要因为这两个问题,所以默认使用MR writer。 祝好, Leonard > 在 2020年11月26日,20:05,admin <17626017...@163.com> 写道: > > BulkWriter

Re: flink1.11编译失败

2020-11-26 文章 Leonard Xu
HI 这两个类是 codegen 生成的,所以源码里没有,你编译下flink-sql-parser模块就会自动生成这几个类。 祝好, Leonard > 在 2020年11月26日,19:43,zhy 写道: > > hi、flink1.11 release source编译为什么会缺失类文件,去github仓库也没找到,如何解决这个问题~ > > > import org.apache.flink.sql.parser.impl.ParseException; > > import

Re: 发现flinksql写hive比写hdfs慢很多

2020-11-26 文章 Leonard Xu
Hi, admin 谢谢验证, > 在 2020年11月26日,17:43,admin <17626017...@163.com> 写道: > > 默认true的情况下 两个任务同时写30分钟,写hive的任务就已经落后了3分钟 此时,写hive用MR writer,写HDFS只支持Flink BulkWriter, 如果单独测试sink的话,Flink BulkWriter 应该不止10%的性能提升。 > false的情况,两个写30多分钟,差异不大 false时,两个作业都用 Flink BulkWriter,差异是不大的。 另外

Re: 发现flinksql写hive比写hdfs慢很多

2020-11-26 文章 Leonard Xu
Hi, admin 结合这个 issue 和你的对比结果, 我觉得应该是这个bug,这个问题在最新的分支已经修复,今天社区cut branch了,你可以帮忙在1.12的分支或master的分支上验证下吗? 祝好, Leonard [1] https://github.com/apache/flink/tree/release-1.12 [2] https://github.com/apache/flink/tree/master

Re: flink1.11.2 一直不入hbase问题

2020-11-26 文章 Leonard Xu
HI 图挂了,Flink 1.11.x 版本hbase的 identifier 都是 hbase-1.4, 不是 hbase-1.4.3, 应该会报错才对,可以确认下日志里也没有报错吗? 祝好 Leonard > 在 2020年11月26日,14:05,simon 写道: > > hbase-1.4.3

Re: flink sql 连接mysql 无数据输出

2020-11-25 文章 Leonard Xu
Hi 调用了executeSql,作业就已经执行了,不用再调用下面的这个执行方法,看你已经配置了cp,确认下mysql的参数是否正确。 > 在 2020年11月25日,18:42,冯草纸 写道: > > env.execute("sql test"); > // bsTableEnv.execute("sql test");

Re: 用flink 1.11.2 查询hive表自关联(self inner join) 结果不正确

2020-11-24 文章 Leonard Xu
Hi, 可以去社区jira上建个issue吗?如果有问题在1.11的版本里也需要修复的。 祝好 Leonard [1] https://issues.apache.org/jira/projects/FLINK/issues/ > 在 2020年11月24日,01:03,macdoor 写道: > > 自己回答一下,供其他人参考。 > > 换成flink 1.12.0-rc1,用相同sql处理相同数据,结果跟 hive 计算的结果相同,确认是 1.11.2 > 的一个bug,1.12应该已经改正了 > > > > -- > Sent from:

Re: flink读mysql分库分表

2020-11-23 文章 Leonard Xu
Hi, 我没理解错的话你是想一次读出所有表(分库分表)的所有数据, 用一个DDL建表语句搞定,目前还不支持 祝好, Leonard > 在 2020年11月23日,17:22,酷酷的浑蛋 写道: > > > > flink读mysql分库分表可以自动识别吗? 还是只能一个一个读? >

Re: flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效

2020-11-16 文章 Leonard Xu
Hi, 你确定是在Flink SQL 里使用 upsert 语法? 我理解是不支持的 另外你flink里声明connector DDL 中的主键应该和你在Mysql表的主键一致。 祝好 Leonard > 在 2020年11月17日,09:12,鱼子酱 <384939...@qq.com> 写道: > > upsert

Re: Flink未来会弃用TableSourceFactory吗

2020-11-16 文章 Leonard Xu
Hi, 据我了解会弃用的,新的connector都会用DynamicTableSourceFactory,一般稳定一两个版本后社区会弃用, 另外这个是比较底层的实现,sql用户应该感知不到,如果有自定义connector的开发都建议用DynamicTableSourceFactory。 祝好 Leonard Xu > 在 2020年11月16日,19:54,Luna Wong 写道: > > FLIP-95都实现后有了DynamicTableSourceFactory那么TableSourceFactory会弃用吗?

Re: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()

2020-11-02 文章 Leonard Xu
Hi, 你好,你用的版本是什么? 在1.12前, temporal join 一个view 是不支持的, 只能直接关联一个可以lookup的table 祝好 Leonard > 在 2020年11月2日,19:40,史 正超 写道: > > Source表定义的proctime,在view中select后,不能用于Temporal table join. > sql如下: > ```sql > > CREATE TABLE SourceA ( >id STRING, >procTime AS PROCTIME() > ) WITH (

Re: StreamSQL 进行窗口计算的疑问

2020-10-25 文章 Leonard Xu
Hi 看起来这个query应该是没问题的,语法也是flink支持的,能贴点结果数据吗?可以简化下query能复现就行。 祝好 Leonard

Re: 请教 table /sql API, 窗口frist/last value

2020-10-25 文章 Leonard Xu
Hi > 在 2020年10月21日,16:27,marble.zh...@coinflex.com.invalid > 写道: > > select marketCode as market_code, > first_value(matchedPrice) over (partition by marketCode ORDER BY > transTime) as vopen > from TickData > where action = 'OrderMatched' and side = 'BUY' > group by marketCode,

Re: flink sql添加 null值 字段

2020-10-14 文章 Leonard Xu
Hi 你需要指定下数据类型,可以用: cast(null as varchar) as person_uuid Best Leonard > 在 2020年10月15日,12:18,Dream-底限 写道: > > hi、 > 我现在使用flink sql完成如下sql语句,但是程序无法运行,请问这个功能要怎么实现: > select null as person_uuid from tablename > 抛出异常: > org.apache.flink.client.program.ProgramInvocationException: The main method >

Re: flink sink kafka过慢问题

2020-10-13 文章 Leonard Xu
Hi 看起来你经确定了是在写入kafka时过慢导致了高峰期sink反压, 生产环境上1万的tps不算高的,可以先确定下是否是kafka集群写入瓶颈问题,kafka 自带的 perf 脚本可以测试下6个分区的topic写入的能力,测试时数据大小和tpoic配置可以和生产一致,如果是kafka写入瓶颈问题,那就需要增加分区,对应修改flink作业的写入并发。 另外,如果开启exactly-once配置,写入速度会慢一些,没有特别的业务需求可以用at-least-once. 祝好 Leonard > 在 2020年10月13日,19:38,xyq 写道: > > kafka

Re: flink点查时态表支持子查询

2020-10-13 文章 Leonard Xu
Hi, 我理解网络开销更多来自于当前的lookup实现每次都需要访问外部系统,如果能做一些cache机制,这样能省掉较多的开销。 你说的点查sql子表可以节省开销,不是很理解,是指同一个key关联多张维表,然后查询外部系统时一个key带出多个表的数据吗?这个应该和目前flink的实现机制不太一致。 > 在 2020年10月13日,10:03,Dream-底限 写道: > > hi、 >

Re: flink sql 更新mysql字段

2020-09-28 文章 Leonard Xu
Hi Insert 到指定字段是个通用的需求,社区已经有一个issue[1] 在跟踪了,你可以关注下 祝好 Leonard [1] https://issues.apache.org/jira/browse/FLINK-18726 > 在 2020年9月28日,17:46,lemon 写道: > > hi各位: > 请问一下,如果mysql表中有20个字段,现在有多个insert into的语句分别更新指定字段,即同一条记录可能有多个insert语句去分别更新不同字段 >

Re: sql-cli执行sql报错

2020-09-28 文章 Leonard Xu
Hi benchao的回复是的对的, 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包 flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。 > 相关lib包: > flink-connector-kafka_2.12-1.10.2.jar > kafka-clients-0.11.0.3.jar 祝好 Leonard

Re: Field types of query result and registered TableSink 'table' do not match.

2020-09-28 文章 Leonard Xu
Hi 报错信息是query的schema和sink table 的schema信息不匹配, > tumble_start(proctimeField, INTERVAL '10' SECOND) as tum 是保留时间属性的,对应的类型是TimeIndicatorTypeInfo, 如果要转到TIMSTAMP,你可以在输出前, cast下tum字段的类型到TIMESTAMP(3) 祝好 Leonard

Re: Flink 1.11.1 JDBC Sink 使用旧版配置 数据丢失

2020-09-28 文章 Leonard Xu
> 在 2020年9月15日,16:52,LittleFall <1578166...@qq.com> 写道: > > 谢谢,请问有相关的 issue 链接吗 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ To @LItteFall : 没有对应的issue,因为是在修复changlog issue[1]时在这个issue里一起修复的,代码可以看下TableBufferReducedStatementExecutor里reduceBuffer保证是对同key上得不同操作顺序执行的。

Re: flink在校验DDL属性是否合格时候代码在哪里?

2020-09-28 文章 Leonard Xu
HI > 在 2020年9月26日,13:26,kcz <573693...@qq.com> 写道: > > 自己写类似的,有太多的if 嵌套了,想学习下,模仿下。 DDL的属性是指WITH里面的属性吗?这个是各个connector校验的,具体的代码在可以先看下各个connector的实现,参考: DynamicTableSourceFactory DynamicTableSinkFactory 的实现类, 比如:JdbcDynamicTableFactory 祝好 Leonard

Re: flink sql 1.11.2 jdbc connector 按月分表读取

2020-09-28 文章 Leonard Xu
Hi > 是我这边建issue不? > > 这里还发现一个问题 select count(*) from mysql_table 不能执行。 > 是的,用户都可以在JIRA[1]上建issue的。 不能执行是报错吗?可以把错误信息贴下吗? 祝好 Leonard [1]https://issues.apache.org/jira/projects/FLINK/issues

Re: 查询hbase sink结果表,有时查到数据,有时查不到

2020-09-26 文章 Leonard Xu
会被框架推断后省略的,这是upsert sink里的一个小优化,但是upsert sink是能收到DELETE消息的,根据query的changlog推导,部分query会产生DELETE消息的,所以upsert sink会处理 INSERT,UPDATE_AFTER, DELETE三种消息,如果有DELETE消息就会出现查询时刚好被删除的情况。 祝好 Leonard Xu

Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-24 文章 Leonard Xu
Hi >看debug日志99%是CalcMergeRule , 我看blink用的是FlinkCalcMergeRule , > 在matches方法里加了些对none-deterministic表达式的过滤,, > 于是我将CalcMergeRule替换成FlinkCalcMergeRule, 并在FlinkRuleSets里做了更新 , > 重跑后debug日志是99%是更新过的FlinkCalcMergeRule 虽然debug日志看是CalcMergeRule一直在触发,但替换CalcMergeRule后也没有改变, 推测是其他rule引起的。 有特别的需要要使用old

Re: 查询hbase sink结果表,有时查到数据,有时查不到

2020-09-24 文章 Leonard Xu
Hi > 通过sql查hbase时,有时查到数据,有时候查不到数据。是不是group操作,会有下游算子 发送撤回消息,导致在delete > hbase的某条rowkey数据,导致客户端查不到数据? > 我理解 hbase sink 应该是upsert数据吧。会不会先delete 再put 导致这样的现象 ? 是的,group by 算子会像下游 hbase sink发retract消息,hbase sink处理retract消息的实现就是先delete再insert,所以去查hbase的时候就会碰到你说的有时查不到的情况。 祝好 Leonard

Re: Flink-1.11 Table API &符号 语法问题

2020-09-21 文章 Leonard Xu
Hi, 可以的,我看了下,你可以把你异常和可以复现的代码贴下吗? 祝好, Leonard > 在 2020年9月22日,09:44,nashcen <2415370...@qq.com> 写道: > > 语法提示错误,就没有运行。在你们的IDEA里面,1.11 Table API 的& 写法,是否显示正常? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink-1.11 Table API &符号 语法问题

2020-09-21 文章 Leonard Xu
Hi > > 在Flink-1.11 中,关于Table API的 select、filter方法,官方推荐语法,用$符号替换原来的" > > > > 但是我在IDEA里,按照官方API的示例去写,$符号不生效,这是什么原因? > >

Re: 请教一下Flink和主流数据湖集成的情况

2020-09-15 文章 Leonard Xu
Hello Flink 集成iceberg, iceberg 社区应该有 jingsong 和 openin <mailto:open...@gmail.com>x 在做,可以直接问下他们。 祝好 Leonard Xu > 在 2020年9月14日,16:52,dixingxing85 写道: > > 谢谢,是的是有iceberg的sink。我看之信在做flink读取iceberg这块的工作,主要想知道下进展和社区未来会重点支持哪个数据湖框架 > > Sent from my iPhone > >> On Se

Re: Flink sql权限

2020-09-15 文章 Leonard Xu
Hi, 据我所知,目前是不支持的,社区没找到对应的issue, 这块短期内应该没计划整,CC了熟悉这块的小伙伴。 祝好 Leonard Xu > 在 2020年9月11日,14:33,163 写道: > > > 请问,flink sql支持元数据的权限校验吗?例如使用hive catalog时,支持hive的权限检查?如果目前不支持,未来是否会考虑? >

Re: flink table Kafka 重新连接的问题

2020-09-15 文章 Leonard Xu
HI > 在 2020年9月9日,16:37,marble.zh...@coinflex.com.invalid > 写道: > > 如果某个message出现exception时 table的connector, 你用的format是什么?一般这种某条message解析异常引起的失败都是可以跳过,比如配置下json.ignore-parse-errors(json) 或 csv.ignore-parse-errors(csv)。 [1]

Re: flink-cdc sink mysql 问题

2020-09-14 文章 Leonard Xu
陈韬说的是对的,cdc connector只有source的, 你写入的是mysql表,需要加下jdbc connector 的依赖 祝好 Leonard > >> 2020年9月10日 下午3:54,杨帅统 写道: >> >> pvuv_test_back >

Re: localtimestamp和current_timestamp插入mysql时区错乱

2020-09-10 文章 Leonard Xu
Hi > insert into sk > values(localtimestamp,current_timestamp,localtimestamp,current_timestamp); 1. 你flink里声明的字段类型和数据库的类型不匹配,需要保持一致,数据库里是varchar,flink是timestamp,完整类型映射可以参考[1] 2. 你插入的两个字段(ocaltimestamp,current_timestamp)的值可以贴出来看看? Best Leonard

Re: localtimestamp和current_timestamp插入mysql时区错乱

2020-09-09 文章 Leonard Xu
Hi, > 这样插入mysql 后dtm时区是乱的, 应该插入的是当前时间减8个小时的,变成了当前时间减21小时 变成当前时间减21小时这个感觉好奇怪,方便贴下完整的代码和数据吗? Best Leonard

Re: flink-sql消费基于on kafka的flink table,每次select这个flink table相当于是不同group id了吗?

2020-09-09 文章 Leonard Xu
Hi 可以看下贴下你Kafka table的option 和 作业的 checkpoint配置吗? 可以确定的是,用的都是同一个group id,。 如果你没有配置 checkpoint, Flink Kafka consumer 的 enable.auto.commit 默认设置为 false,就不会提交对应group 的offset, 此时你两个作业只是用 group id 确定一个起始消费offset,得到的数据就是一致的。 你可以看看[1][2]里面对这个机制的解释。 Best Leonard [1]

Re: flink-cdc sink mysql 问题

2020-09-09 文章 Leonard Xu
Hi 这个错误是jar包没有正确地加载,看代码应该没啥问题,添加jar包后需要重启下集群,你测试的时候重启了吗? 祝好 Leonard > 在 2020年9月9日,16:48,杨帅统 写道: > > 公司希望将MySQLA库的数据实时同步到B库中,我想通过fink1.11的CDC功能不知道是否可行。 > 在做测试的时候定义一张cdc源表和一张sink表 > CREATE TABLE pvuv_test ( > id INT, > dt STRING, > pv STRING, > uv STRING , > proc_time AS PROCTIME()

Re: sql-client提交报错UpsertStreamTableSink requires that Table has a full primary keys if it is updated

2020-09-09 文章 Leonard Xu
Hi 这个错误一般是你的query 是upsert的query,没能推断出PK,所以报错了 。 如果是自定义的connector, 应该实现 DynamicTableSink 接口而不是 老的 UpsertStreamTableSink接口, 实现DynamicTableSink接口可以支持在表上定义PK,不用推导。 看这个报错,kudu的connector实现的是 老的UpsertStreamTableSink, 绕过的办法是改写下你的query,让query可以推导出pk。 祝好 Leonard > 在 2020年9月9日,20:27,kandy.wang 写道: >

Re: 1.11版本,创建视图后,根据表名称获取视图表对象,表名为临时名称的问题

2020-09-07 文章 Leonard Xu
m datagen),本身是没有表名的。createTemporaryView("table1", table); 只是 将 table所对应 queryOperation 对应到了 表名的path(table1)下, 所以 table1 对象拿到的还是同一个query operation。 如果想看创建的临时表,可以用bsTableEnv.listTables()查看。 祝好 Leonard Xu

Re: flink-sql1.11写入mysql重复问题

2020-09-07 文章 Leonard Xu
Hi 这个原因在于flink 和 mysql主键是不一致导致的重复数据,你可以把当前mysql表中设置成flink主键的字段添加个unique key 约束,这样和pk也是等价的。 Best Leonard > 在 2020年9月6日,21:21,凌天荣 <466792...@qq.com> 写道: > > 使用flink-sql1.11时,insert进connect为mysql的flink > table时,由于mysql里面的id是主键自增的,1.11版本upsert模式设置主键在插入的时候又不能指定为mysql的主键id,只能设置别的字段为flink >

Re: flink sql多层view嵌套,字段not found

2020-09-06 文章 Leonard Xu
Hi, linhou 使用方式没问题,这个看起来像是个bug, 你可以去社区[1]建一个issue, issue上贴个可以复现这个问题的demo Best Leonard [1] https://issues.apache.org/jira/projects/FLINK/summary > 在 2020年9月3日,18:40,Lin Hou 写道: > > activity_property

Re: flink1.11连接mysql问题

2020-08-31 文章 Leonard Xu
> 在 2020年8月28日,15:02,酷酷的浑蛋 写道: > > com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet > successfully received from the server was 52,445,041 milliseconds ago. The > last packet sent successfully to the server was 52,445,045 milliseconds ago. > is longer than the server

Re: 来自李国鹏的邮件

2020-08-31 文章 Leonard Xu
> 在 2020年8月31日,15:55,李国鹏 写道: > > 退订 Hi 是指取消订阅邮件吗? 可以发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org取消订阅来自 user-zh@flink.apache.org 邮件列表的邮件 邮件列表的订阅管理,可以参考[1] 祝好, Leonard Xu [1] https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list <https://flin

Re: flink1.11时间函数

2020-08-29 文章 Leonard Xu
补充下哈, 可能是function这个词翻译后理解问题,功能没有确定性/不确定性这一说法,那个文档里的function都应理解为函数,note里讲的是函数的返回值是确定性的还是不确定性。 祝好 Leonard > 在 2020年8月29日,18:22,Dream-底限 写道: > > 哦哦,好吧,我昨天用NOW的时候直接报错告诉我这是个bug,让我提交issue,我以为这种标示的都是函数功能有问题的 > > Benchao Li 于2020年8月28日周五 下午8:01写道: > >> 不确定的意思是,这个函数的返回值是动态的,每次调用返回可能不同。 >>

Re: flink sql 计算列不支持comment

2020-08-29 文章 Leonard Xu
Hi, sllence 这是个bug, 看起来是支持计算列时漏掉了comment的解析,我开了个issue去修复[1]. 祝好 Leonard [1] https://issues.apache.org/jira/browse/FLINK-19092 > 在 2020年8月29日,13:37, > 写道: > > Flink版本:1.11.1 > > > > 官网文档中定义如下: > > : > > column_name AS

Re: flink-sql 1.11整合hbase的查询性能问题

2020-08-28 文章 Leonard Xu
Hi > 另外,HbaseTableSource 有没有计划什么时候支持 SupportsFilterPushDown. 我搜了下社区还没相关的issue,如果是强需求你可以去社区建个issue[1],让社区支持下 第二个异常栈,如果确认”org.apache.hive:hive-hbase-handler:2.1.1” 已经加载,我感觉是个bug, cc Rui Li 确认下 祝好 Leonard [1] https://issues.apache.org/jira/projects/FLINK/summary

Re: flink-sql 1.11整合hbase的查询性能问题

2020-08-27 文章 Leonard Xu
Hi, 大罗 > 在 2020年8月28日,11:57,大罗 写道: > > > 你好,我想提两个问题,关于flink-sql整合hbase,问题列表如下: > 问题一:flink-sql通过hive catalog整合hbase后的查询执行计划,可否使用rowkey做精确filter而不是整表扫描。 > > 问题二:flink-sql > 查询hive的hbase外部表,是否没有通过HBaseStorageHandler正确解析INPUTFORMAT和OUTPUTFORMAT > 问题一: HbaseTableSource 目前没有支持

Re: tidb Binlog 整库同步到 hive

2020-08-27 文章 Leonard Xu
Hi > 多路输出是个动态的,因为是整库的binlog ,不确定库中是否有新增表,作业要触发hive里面去创建表的动作。hive catalog create > table 是否可以在运行中来调用吗? > 程序启动之后生成的jobgraph运行中是不是不可以调整的?如果可以的话,是否有类似的案例可以参考。 用dataStream是会更灵活些,思路也差不多,在运行中可以调用的建表动作的,但是运行的拓扑是不可以动态调整的,不管DataStream 还是 SQL 的拓扑。 祝好 Leonard

Re: flink1.11读取kafka问题

2020-08-27 文章 Leonard Xu
Hi 是不是下游(sink mysql)反压了导致上游不再消费了,可以通过看看webui的指标[1]确定下。 祝好 Leonard [1]https://ci.apache.org/projects/flink/flink-docs-master/zh/monitoring/back_pressure.html > 在 2020年8月28日,09:22,小学生 <201782...@qq.com>

Re: tidb Binlog 整库同步到 hive

2020-08-27 文章 Leonard Xu
Hi, qishang > 1. 好像 Flink SQL 里面只能一个数据源 kafka 到一张Hive表,一个Topic是否可以根据解析的表不同插入到不同的Hive表吗 ? > 我看了一些邮件好像说不可以的,在问一下。 在一个SQL作业中是不行的,因为SQL是强依赖Schema的,schema需要事先声明。 > 2. 或者有什么好的解决方式吗?因为数据量都不是很大,表比较多,每个表都要维护一个任务的话,代价比较大。 除了多个sql作业的方式,如果需要在一个SQL作业中可以试试在一个作业里把所有表的binlog 格式统一用一个字段(如string)

Re: Flink 维表延迟join

2020-08-27 文章 Leonard Xu
多谢 Jark 提议 Issue[1] 建好了, 大家可以在issue下讨论。 祝好 Leonard [1] https://issues.apache.org/jira/browse/FLINK-19063 <https://issues.apache.org/jira/browse/FLINK-19063> > 在 2020年8月27日,19:54,Jark Wu 写道: > > @Leonard 可以先建个 issue,收集下大家的需求,大家也可以在 issue 下讨论下解决思路。 > > On Thu, 27 Aug 2

Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 文章 Leonard Xu
Congrats, Dian! Well deserved. Best Leonard > 在 2020年8月27日,19:34,Kurt Young 写道: > > Congratulations Dian! > > Best, > Kurt > > > On Thu, Aug 27, 2020 at 7:28 PM Rui Li wrote: > >> Congratulations Dian! >> >> On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei wrote: >> >>> Congrats! >>> >>> On

Re: Flink 维表延迟join

2020-08-26 文章 Leonard Xu
Hi, all 看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了, 感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗? Best Leonard > 在 2020年8月27日,10:39,china_tao 写道: > > 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left > join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。 >

Re: Flink SQL Map类型字段大小写不敏感支持

2020-08-26 文章 Leonard Xu
Hi,zilong 之前我建了一个issue[1]支持大小写敏感, 有了个初步的PR,但是社区想做全套,字段名,表名,catalog名都统一解决,所以还没支持 祝好 Leonard [1] https://issues.apache.org/jira/browse/FLINK-16175?filter=12347488 > 在 2020年8月26日,20:47,zilong xiao 写道: > > 这个有相关的issue可以follow吗?

Re: [ANNOUNCE] Apache Flink 1.10.2 released

2020-08-26 文章 Leonard Xu
Thanks ZhuZhu for being the release manager and everyone who contributed to this release. Best, Leonard

Re: 请问一下,flink 1.11 的cdc历史数据问题

2020-08-24 文章 Leonard Xu
Hello > Flink1.11 的cdc是支持加载历史数据的,有两个问题想求证一下: > 1.底层是使用了debezium来加载历史数据的吗? Flink支持两种CDC格式,debezium json和 canal json, debezium 和 canal都是CDC系统,简单说他们可以把数据库表的binlog以对应的json写入到消息队列如Kafka, 作为下游系统的Flink 支持去消费对应的cdc数据,两个CDC工作都支持加载历史数据的。 另外Jark 在Veverica 开源了一个Flink CDC connector

Re: flink1.11 mysql 分页查询

2020-08-17 文章 Leonard Xu
Hi 可以跟下这个issue[1], 在1.12会支持用于自定义query Best Leonard https://issues.apache.org/jira/browse/FLINK-17826 > 在 2020年8月18日,09:50,china_tao 写道: > > 那肯定不行啊,我mysql表里面内容很多。FlinkSQL有没有直接分页查询的方法么?望赐教。类似于spark > dataframe中的dbtable,万分感谢 > > > > -- >

Re: flink1.11 mysql 分页查询

2020-08-17 文章 Leonard Xu
Hi > 在 2020年8月17日,20:46,china_tao 写道: > > 您好,请教一个问题,flink sql 读取mysql如何分页查询。 > 在spark中,dataframe可以通过dbtable,传入分页查询的语句。 > val resultDF = session.read.format("jdbc") > .option("url",jdbcUrl) > .option("dbtable" , selectSql ) > .option("user",user) > .options(writeOpts) >

Re: flink-1.10.1 想用 DDL 入 ES5.6

2020-08-13 文章 Leonard Xu
Hi, 我贴的链接里有对应的PR[1], 你可以看看这个PR里的代码,代码入口可以从 Elasticsearch6DynamicSink.java 开始 比如你自己实现了Elasticsearch5DynamicSink

Re: 请问在 flink sql 中建立的多张表应当怎样分辨接收 kafka 传来的 canal-json?

2020-08-13 文章 Leonard Xu
Hello 现在只支持一个topic里包含单张表的changelog, 你这个case相当于用了一个topic里包含多张表的changelog,只是twocol在解析binlog时 a,b 字段找不到,你配置ignore-parse-errors就会返回(null,null) 建议每张chagnelog表和一个topic对应就可以了 祝好 Leonard > 在 2020年8月13日,19:55,LittleFall <1578166...@qq.com> 写道: > > 这是我在 flink sql 中建立的两张表: > create table base ( >id

Re: 关于FlinkSQL的一些最佳实践咨询

2020-08-12 文章 Leonard Xu
FLIP-129 [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API > 在 2020年8月13日,11:26,zhao liang 写道:

Re: pyflink 1.11.1 execute_sql sql_update执行问题,下面三种情况只有第一种情况执行成功, 但是该方法是被Deprecated

2020-08-12 文章 Leonard Xu
Hi 现象是正常的, execute_sql是一个异步的方法,提交后就退出了,如果需要等待执行结果,可以调用如下方法显式地等待 sql_result = t_env.execute_sql("insert into print_sink select wd,count(wd) cnt from sc group by wd") sql_result.get_job_client().get_job_execution_result().result() 祝好 Leonard Xu > 在 2020年8月12日,16:00,徐振华

Re: 使用Kafka Sink时报错:AppendStreamTableSink doesn't support consuming update changes

2020-08-12 文章 Leonard Xu
Hi Group by 和 left join 都是会有 retract 消息的,这类消息需要UpsertStreamTableSink才能处理, Kafka connetor 目前的实现是AppendStreamTableSink,所以不能处理 社区已经有一个issue在处理这个问题了,应该1.12能提供这个功能。 Best Leonard [1]https://issues.apache.org/jira/browse/FLINK-18826 > 在

Re: flink sql client:cdc 至elasticsearch 有问题(版本1.11.0),提交任务成功,但web dashbord上却看不到任务!求看是不是bug?还是我配置有问题?

2020-08-11 文章 Leonard Xu
Hello 1. 使用CDC功能的话请用1.11.1,该版本修复了一个CDC的bug[1] 2. 另外你这个异常栈是没有找到对应的 connector jar,确认下用的是 flink-sql-connector-elasticsearch6_2.11-1.11.0 这个jar. 祝好 Leonard [1] https://issues.apache.org/jira/browse/FLINK-18461 > 在 2020年8月12日,13:31,jindy_liu

Re: flink-1.10.1 想用 DDL 入 ES5.6

2020-08-11 文章 Leonard Xu
Hi, ES5 没有SQL jar,所以不支持,可以参考[1] 支持ES6 的sql connector 的实现,这是在1.11里支持的。 Best Leonard Xu [1] https://issues.apache.org/jira/browse/FLINK-1 <https://issues.apache.org/jira/browse/FLINK-1>7027 > 在 2020年8月11日,11:18,kcz <573693...@qq.com> 写道: > > 查看了文档之后,DDL只支持ES6以上,如果我想ES5也用

Re: 请教flink计算一些报表需求的实现

2020-08-10 文章 Leonard Xu
Hi, 关键的图挂了,邮件里上传图片经常挂,可以用图床工具发个链接。 Best Leonard > 在 2020年8月10日,23:35,lfgy <15029270...@163.com> 写道: > > 最近在做一个报表的项目,5分钟和小时的报表采用Flink计算,遇到下面几个问题,请大佬帮忙解答下,谢谢。 > 输入的原始数据流包含了几十个维度和指标字段,然后会抽取其中的2~3个维度和若干指标进行汇聚计算, > 有些还需要计算分组TOPN,还有任务依赖,先计算3个维度,然后从3个维度计算两个维度。 > 我当前的实现流图是: >

Re: 关于FlinkSQL的文档中flinksql的connector和catalog划分以及hive catalog和hive connector是否必须配合使用的问题。

2020-08-10 文章 Leonard Xu
。难道不使用hivecatalog就不能读取hive数据?是这个意思嘛。感觉好像意思是hive读取特别,这种情况必须使用hive > catalog。不可以使用jdbc catalog,但使用hive connector嘛? 是的,不是特别,而是HiveCatalog 就是用来管理 Hive中表、库、函数的元数据中心,用这个HiveCatalog也是很自然的事情。目前不可以使用JDBC catalog,很长一段时间也应该不行,Jdbc catalog 里存的表都是DB里的表,不支持存放Hive的表。 祝好 Leonard Xu

Re: flink sql csv格式字段分隔符问题

2020-08-07 文章 Leonard Xu
Hi 试下这个 'csv.line-delimiter' = U&'\\0009' 注意后面不要再加单引号了 祝好 Leonard > 在 2020年8月7日,16:27,kandy.wang 写道: > > 设置 'csv.field-delimiter'='\t' > ,查询数据的时候,报错:org.apache.flink.table.api.ValidationException: Option > 'csv.field-delimiter' must be a string with single character, but was: \t > 请问,该怎么搞?

Re: flinksql jdbc异步维表

2020-08-06 文章 Leonard Xu
Hi 社区已经有一个issue在跟进了, 你可以关注下 Best Leonard [1] https://issues.apache.org/jira/browse/FLINK-14902 > 在 2020年8月7日,11:03,todd 写道: > > JdbcDynamicTableSource默认以同步方式加载JDBC数据,未来是否有计划提供异步接口,用户可以自主选择第三方异步框架。 > > > > -- > Sent from:

Re: flink-sql-connector-elasticsearch6_2.11_1.10.0 与 flink-connector-elasticsearch6_2.11_1.10.0 并存问题

2020-08-06 文章 Leonard Xu
Hi 你这种场景使用sql jar就可以了,sql jar 里面包含了 es connector 相关的依赖,两者确实不能共存,因为sql jar 对 es的依赖做了shade,而 es connector jar 没有对es的依赖做shade. 另外,你的异常栈应该是一个已知的lambda表达式序列化问题[1], 在1.11.0已经修复,可以升级1.11.1试下? Best Leonard [1] https://issues.apache.org/jira/browse/FLINK-18006

Re: Flink Mysql sink按时间分库分表

2020-08-05 文章 Leonard Xu
Hi 我理解这个除了指定表名,关键是要在数据库中自动建表吧,JDBC 这边之前有个相关issue我跟进过[2],不过代码还没进,暂时还没有好的办法。Es connector 是支持类似功能的,如果数据可以放在es可以使用下。 祝好 Leonard [1] https://issues.apache.org/jira/browse/FLINK-16294 > 在 2020年8月5日,20:36,张健 写道: > > > > > 大家好: > > > 想问下目前

Re: flink sql eos

2020-08-05 文章 Leonard Xu
Hi > 目前仅有kafka实现了TwoPhaseCommitSinkFunction,但kafka的ddl中也没有属性去设 > 置Semantic为EXACTLY_ONCE 除了Kafka还有filesystem connector也是支持 EXACTLY ONCE的,kafka 的已经在1.12支持了[1] > 当开启全局EXACTLY_ONCE并且所有使用的connector都支持EXACTLY_ONCE,是否整个应 > 用程序就可以做到端到端的精确一致性 是的。 祝好 Leonard [1]

Re: 退订

2020-08-04 文章 Leonard Xu
Hi 是指取消订阅邮件吗? 可以发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org <mailto:user-zh-unsubscr...@flink.apache.org> 取消订阅来自 user-zh@flink.apache.org <mailto:user-zh@flink.apache.org> 邮件列表的邮件 Flink 邮件列表的订阅管理,可以参考[1] 祝好, Leonard Xu [1] https://flink.apache.org/community.html#how-

Re: slot计算问题

2020-08-04 文章 Leonard Xu
Hi, 图挂了,你可以用个图床工具上传图片链接看看 Best Leonard > 在 2020年8月4日,19:53,★猛★ 写道: > > hi 你好, > > > > 我的flink是1.9,在streamgraph上设置最大并行度是780,并且所有算子在同一个分组下,但是实际使用的slot却是964,按理说应该是780,为什么会出现这种情况。 > 下图是flinkui里的graph > > > > 实际使用的slot > >

Re: 退订

2020-08-03 文章 Leonard Xu
Hi 是需要取消订阅邮件吗? 可以发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org 取消订阅来自 user-zh@flink.apache.org 邮件组的邮件 邮件组的订阅管理,可以参考[1] 祝好, Leonard Xu [1] https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list > 在 2020年8月3日,16:27,baiyg25...@hundsun.com 写道: > > 退订 > >

Re: flink 1.11.0 conenctor-jdbc 报错

2020-08-02 文章 Leonard Xu
Hi > 在 2020年8月3日,10:16,song wang 写道: > > 查询 integer 如果MySQL中数据类型是 INT UNSIGNED,Flink 中 对应的类型是 BIGINT, 你检查下是不是这个原因,类型映射可以参考[1] Best Leonard [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#data-type-mapping

Re: flinksql insert overwrite 报错 INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-01 文章 Leonard Xu
Hi, 这个错是因为JDBC connector 不支持INSERT OVERWRITE, 你看的文档是列出了目前 Flink SQL 支持的INSERT语法,但是不是所有的 connector 都支持 INSERT OVERWRITE, 目前支持的只有 Filesystem connector 和 Hive table, 这些表一般不会有主键。其他connector 如 JDBC\ES\HBase 目前不支持 INSERT OVERWRITE,现在 JDBC\ES\HBase connector都是支持upsert 插入的[1], 就是在connector

Re: flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题

2020-07-31 文章 Leonard Xu
Hello > 在 2020年7月31日,21:13,chenxuying 写道: > > 但是不太明白 "使用老的option参数还是需要根据query推导主键" 这里话是什么意思,需要怎么做 简单来讲,如果使用的是老版本(1.10)的option参数,代码执行的路径就和1.10版本一样的,1.10版本里是不支持定义 PRIMARY KEY 的, 是通过用户的query来决定写入的模式是upsert 还是 append , 你可以看下1.10的文档关于用query 推导 写入模式的文档[1], 如果已经在用1.11了,1.10的文档可以不用看的。

Re: Flink sql 转义字符问题

2020-07-31 文章 Leonard Xu
Hi, zilong SPLIT_INDEX(${xxx}, ‘;’, 0) ‘;’ 分号不是特殊字符,编译时应该不会报错的,我在Flink 1.11.1 用DDL 测试了下, 能够work的,不知道你的环境是怎样的。 U&'\003B' 是 ; 的 unicode编码,所以用这个unicode编码是可以的,但一般这种用法是在需要用不可见字符分割时我们这样使用, 比如 \n 对应的s是 U&'\\000A’ ,\r 对应的是 U&'\\000D’, 对于分号这种可见字符来讲,不需要用unicode编码就可以的。 祝好 Leonard > 在

Re: flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题

2020-07-31 文章 Leonard Xu
Hi, chenxuying 看你还是用的还是 " 'connector.type' = 'jdbc', …. " ,这是老的option,使用老的option参数还是需要根据query推导主键, 需要使用新的属性[1]:" 'connector' = 'jdbc’,…." 才能配合 主键 决定 upsert 模式. Best Leonard [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options

Re: flink kafka SQL Connectors 传递kerberos 参数

2020-07-30 文章 Leonard Xu
; > > > > 谢谢 ,我试试 > > > > > > > > > > > > > > > 在 2020-07-30 17:34:41,"Leonard Xu" 写道: >> Hi, >> kafka properties 的参数是可以透传的,你试试下面: >> >> ‘properties.security.protocol'='SASL_PLAINTEXT', >> ‘properties.sasl.mechanism'='GSSAP

Re: flink kafka SQL Connectors 传递kerberos 参数

2020-07-30 文章 Leonard Xu
Hi, kafka properties 的参数是可以透传的,你试试下面: ‘properties.security.protocol'='SASL_PLAINTEXT', ‘properties.sasl.mechanism'='GSSAPI’, ‘properties.sasl.kerberos.service.name'='kafka', 祝好 Leonard > 在 2020年7月30日,17:00,lydata 写道: > > > > 是否需要这3个参数,或者下面参数是否支持? > > > > >

Re: flink on yarn 读取 hbase数据时 ,Task失败,具体描述如下

2020-07-29 文章 Leonard Xu
Hi,张锴 这个描述看起来没有用的信息呢,既然有任务失败,失败的日志和异常信息可以贴出来看看。或者贴一个可以复现这个失败的case. > 在 2020年7月29日,17:02,张锴 写道: > > flink获取Hbase数据并计算 > 在本地测试没问题,提交到Yarn上出现Task任务失败,无相关日志输出,task任务一直重启。任务失败的地方在数据计算部分。 > 语言:Scala,无堆栈信息输出 Best Leonard

Re: 使用datagen connector生成无界数据,一秒时间窗口的聚合操作,一直没有数据输出打印

2020-07-29 文章 Leonard Xu
Hi > >bsTableEnv.executeSql("SELECT f_random, count(1) " + >"FROM datagen " + >"GROUP BY TUMBLE(ts, INTERVAL '1' second), f_random").print(); TableResult.print() 方法目前只支持了 exactly-once 语义,在 streaming 模式下必须设置checkpoint才能work, 你配置下checkpoint之后再试下,支持 At

Re: flink-1.11 hive-1.2.1 ddl 无法写入数据

2020-07-29 文章 Leonard Xu
Hi, kcz 看connector的properties还是1.10的格式,你换成1.11试试[1]. > 在 2020年7月29日,15:07,kcz <573693...@qq.com> 写道: > > tableEnv.executeSql("CREATE TABLE situation.source_table (\n" + >"\thost STRING,\n" + >"\turl STRING,\n" + >"\tpublic_date STRING\n" + >

Re: Flink SQL 解析复杂(嵌套)JSON的问题 以及写入到hive类型映射问题

2020-07-28 文章 Leonard Xu
Hello > 问题是: > 如果json array 里还有一个array 也是继续嵌套定义吗? 这个数据是要写入到hive,该怎么映射,array > ,怎么映射成Hive类型,比如映射成array,这种情况的json该如何处理? 有没有什么办法直接把json > array,直接映射成array,试了一下发现不行,该如何处理这种复杂类型。 Json format有一个issue在解这个问题[1],可以把jsonNode强制转成 string, 1.12里会支持,可以看下. Best Leonard [1]

Re: kafka-connect json格式适配问题?

2020-07-27 文章 Leonard Xu
> 我把TIMESTAMP(6)修改为TIMESTAMP(3)之后,就没有报错了,所以Flink里面窗口的时间精度只是3位吗? 窗口里的时间用来做time attribute 列了吧,只能是TIMESTAMP(3), 其TIMESTAMP字段Flink是可以支持到TIMESTAMP(9)的 祝好 Leonard > 在 2020年7月27日,20:05,RS 写道: > > Hi, > 改了下sql,遇到一个新的问题: > Caused by: org.apache.flink.table.planner.codegen.CodeGenException: >

Re: flink sql 读取mysql

2020-07-24 文章 Leonard Xu
Hello 这个报错一般是sql格式错误,比如中英文逗号等,你可以检查下你的SQL语句 祝好 Leonard Xu > 在 2020年7月24日,16:20,liunaihua521 写道: > > org.apache.flink.table.api.SqlParserExcption:Sql parse failed.Encountered > "timestamp,"at line > Was expecting one of: > "CURSOR"...

Re: flink 1.11 cdc相关问题

2020-07-24 文章 Leonard Xu
Hi amenhub 针对这个问题,我建了个issue来跟踪这个问题[1], 另外你可以在你的PG 里面把表的IDENTITY设置为FULL,这样 debezium 同步的UPDATE数据就会有完整的信息, DB命令是:ALTER TABLE yourTable REPLICA IDENTITY FULL, 可以参考debezium官网文档[2] Best Leonard Xu [1] https://issues.apache.org/jira/browse/FLINK-18700 <https://issues.apache.org/jira/browse/FLINK-18

<    1   2   3   4   >