使用了什么state backend?能描述一下产生上述问题的步骤吗?
是直接跑作业就产生上述错误,还是作业有基于sp启动,或者是中途重启过?
zhangbin 于2022年5月27日周五 13:34写道:
>
> Retry
> 回复的原邮件
> 发件人 zhangbin
> 发送日期 2022年05月27日 10:11
> 收件人 godfre...@gmail.com
> 抄送人 user-zh
> 主题 回复:Flink-1.15.0 在使用cumulate window报key group 不在范围内的错误
>
把异常栈也发出来吧
zhangbin 于2022年5月26日周四 22:54写道:
r-3.4.14.jar
> flink-sql-connector-mysql-cdc-2.1.1.jar
> flink-table_2.11-1.13.0.jar
> flink-table-blink_2.11-1.13.0.jar
> log4j-1.2-api-2.12.1.jar
> log4j-api-2.12.1.jar
> log4j-core-2.12.1.jar
> log4j-slf4j-impl-2.12.1.jar
> -邮件原件-
> 发件人: godfrey he [mailto:godfre...
Hi liangjinghong,
原因是 blink planner 中引入并修改了 SqlTableRef 类, 而 Legacy planner 中没有引入 SqlTableRef
类,从而导致加载到了Calcite 中 SqlTableRef (该类有问题)。
解决方案:如果只使用到了blink planner,可以把legacy planner 的包冲lib下移除。
Best,
Godfrey
liangjinghong 于2022年2月14日周一 17:26写道:
> 各位老师们好,以下代码在开发环境中可以执行,打包部署后报错:
>
> 代码:
>
> CREATE
Hi Ada,
sql-gateway之前没有维护起来,确实是一个遗憾。
最近我们也关注到大家对batch的兴趣越来越浓,sql-gateway还会继续维护。
btw,非常欢迎分享一下你们使用Flink替换Spark遇到的一些痛点,我们会逐渐去解决这些痛点
Best,
Godfrey
Ada Wong 于2022年1月12日周三 10:09写道:
>
> cc tsreaper and Godfrey He
>
> 文末丶 <809097...@qq.com.invalid> 于2022年1月10日周一 19:39写道:
可以把具体的sql发出来看看
yidan zhao 于2021年11月2日周二 下午7:06写道:
>
> 如题,我原先基于flink1.11和1.12貌似没这个问题。目前基于1.13出现这个问题。
> 问题描述如下:
> 我t1是kafka表,其中有个属性是event_time是row time属性,然后创建了view v1,通过select ,
> event_time from t1这样创建。 现在问题是这么创建之后,我基于v1查询报错说aggre.. window只能在time
> attributes上定义。
>
上传的图片没法显示,通过图床工具或纯文本方式重新发一遍
lyh1067341434 于2021年11月1日周一 上午10:42写道:
> 您好!
>
> 这样好像还是不行,因为group by id ,id还是密文字符串,还是会把id当成字符串处理,所以还是不能正确分组;
> 为了更清楚表达,下面为图示:
>
> 谢谢您!
>
>
>
>
>
>
>
> 在 2021-10-29 10:49:35,"Caizhi Weng" 写道:
> >Hi!
> >
> >你是不是想写这样的 SQL:
> >
> >SELECT id, sum(price) AS total_price
我建了一个jira,建议sql client把作业提交到各种集群的方式在文档里写清楚,
可以关注 https://issues.apache.org/jira/browse/FLINK-23483
Best,
Godfrey
Caizhi Weng 于2021年7月23日周五 上午10:12写道:
> Hi!
>
> 可以考虑把 k8s session 的 flink rest api 地址暴露出来,然后客户端把 execution.target 设为
> remote,rest.address 和 rest.port 设为相应地址。
>
>
例如 calc merge rule,还有calc,agg等其他相关rule,点比较散。得具体看
jun su 于2020年9月23日周三 上午10:22写道:
> hi godfrey,
> 方便说下是哪些rule fix了这个问题么? 我对这个比较好奇 , 想看下是什么原因导致的
>
> godfrey he 于2020年9月23日周三 上午10:09写道:
>
> > Hi Jun,
> >
> > 可能是old planner缺少一些rule导致遇到了corner case,
> > b
ano, 而blink貌似没用hep,
> > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因
> >
> > godfrey he 于2020年9月22日周二 下午8:58写道:
> >
> >> blink planner 有这个问题吗?
> >>
> >> jun su 于2020年9月22日周二 下午3:27写道:
> >>
> >> > hi all,
blink planner 有这个问题吗?
jun su 于2020年9月22日周二 下午3:27写道:
> hi all,
>
> 环境: flink-1.9.2 flink table planner
> 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
>
> 发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, 导致进程OOM
> ---
> 代码:
>
>
据我所知,目前flink是大小写不敏感,但是pg是大小写敏感。这问题暂时没法解
wdmcode 于2020年9月10日周四 上午9:44写道:
> Hi Jimmy
>
> 给字段加双引号试试呢
> Select “F1”,”F2” from xxx.xxx;
>
>
> 发件人: Jimmy Zhang
> 发送时间: Thursday, September 10, 2020 9:41 AM
> 收件人: user-zh@flink.apache.org
> 主题: Flink 1.11 jdbc查pg失败
>
> flink 1.11用jdbc查询pg表时,pg表的字段是大写
能提供完整的demo吗?
me 于2020年9月11日周五 下午6:54写道:
> 1.flink 版本是1.11.1
> streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamBlinkSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> streamTableEnv = StreamTableEnvironment.create(streamEnv,
>
TableEnvironment 不是多线程安全的。
btw, 你能描述一下你在多线程情况下怎么使用 TableEnvironment 的吗?
Jeff Zhang 于2020年9月14日周一 下午12:10写道:
> 参考zeppelin的做法,每个线程里都调用这个
>
>
> https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java#L111
>
>
> jun su
cc @Rui Li
李佳宸 于2020年9月14日周一 下午5:11写道:
> 大家好~我执行batch table的作业写入hive时,会出现FileNotFoundException的错误。找不到.staging文件
> 版本是1.11.1
> Caused by: java.io.FileNotFoundException: File
>
> hdfs://gykjcluster/user/hive/warehouse/etl_et_flink_sink.db/ods_et_es_financialestimate/.staging_1600070419144
> does
sql
client的默认并发为1,如果没有在sql-client-defaults.yaml显示设置parallelism,代码里面的默认并发为1.因此需要显示的设置
sql-client-defaults.yaml的parallelism
Jark Wu 于2020年9月15日周二 上午11:43写道:
> Hi,
>
> 请问
> 1. 有完整的异常栈吗? 你是怎么从 ck 恢复的呢? 用的什么命令?
> 2. 是的。因为 source 只能并发1。先写到 kafka,再从 kafka 同步是可以的。
>
> Best,
> Jark
>
> On Fri, 11 Sep
已知问题,已fix:https://issues.apache.org/jira/browse/FLINK-18750
guaishushu1...@163.com 于2020年9月16日周三 下午2:32写道:
> 当create_view和LATERAL TABLE 共用时 会出现字段找不到异常
>
> 语法:
> CREATE TABLE billing_data_test (
> message STRING
>
>
> create view v1 as
> select T.*
> from billing_data_test,
> LATERAL
blink 根据每个算子的digest信息来判断是否可以reuse(只有digest完全一样才可以reuse),
例如table source节点,算子信息包括:表名,select的字段信息,其他push down之后的信息等。
你可以通过explain的方式把plan打印出来看看,source的digest是否一样
Jingsong Li 于2020年9月17日周四 下午2:45写道:
> 你仔细看看这两个数据源是不是有什么不同
> 只要有一点不同,Blink 就 reuse 不了
>
> On Thu, Sep 17, 2020 at 11:10 AM Kevin Dai
可以用flink提供的“去重"语法来支持
[1]
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html#%E5%8E%BB%E9%87%8D
Shengkai Fang 于2020年9月15日周二 下午4:02写道:
> hi, 我对于使用upsert
>
> kafka能够省state感到疑惑。金竹老师提供的实现只是丢掉了delete消息,你的下游表依旧需要手动去重才可以得到准确的结果才对啊。如果每个下游表都手动去重这样子还能省state吗?
>
> star
已更新至flink1.11.1
godfrey he 于2020年8月24日周一 下午9:45写道:
> 我们会在这周让flink-sql-gateway支持1.11,请关注
> 另外,sql-client支持gateway模式,据我所知目前还没计划。
>
> shougou <80562...@qq.com> 于2020年8月24日周一 上午9:48写道:
>
>> 也有同样的问题,同时也问一下,sql client 计划在哪个版本支持gateway模式?多谢
>>
>>
>>
>
如果是通过TableEnvironment#execute方法提交需要设置execution.attached=true, 或者是通过flink
cli的 加上-d
如果是通过TableEnvironment#executeSql方法提交,需要代码里显示的等待作业结束:
TableResult tableResult = tEnv.executeSql(xxx);
// wait job finished
我们会在这周让flink-sql-gateway支持1.11,请关注
另外,sql-client支持gateway模式,据我所知目前还没计划。
shougou <80562...@qq.com> 于2020年8月24日周一 上午9:48写道:
> 也有同样的问题,同时也问一下,sql client 计划在哪个版本支持gateway模式?多谢
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
检查一下log4j2相关的版本,参考[1]
[1]
https://stackoverflow.com/questions/50970960/facing-issue-with-log4j2-java-lang-exceptionininitializererror
guaishushu1...@163.com 于2020年8月24日周一 上午11:18写道:
> SQL提交会出现这种问题???
> Caused by: java.lang.IllegalArgumentException: Initial capacity must be at
> least one but
看看deduplication语法[1] 是否满足你的需求
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
forideal 于2020年8月17日周一 下午12:13写道:
> Hi,
>
> 最近我有一个使用 Flink SQL 做简单的数据去重的需求,想使用 Flink 的 `ScalarFunction`,通过阅读
> API 发现 FunctionContext context 并不支持访问
> GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)
是要1个小时的window结束才会出结果。
你可以通过把window缩小或者设置early-fire来提早看到数据
table.exec.emit.early-fire.enabled=true
table.exec.emit.early-fire.delay=xx
> 手动拿到那个executeSql的返回的TableResult,然后去 wait job finished
这个是为了防止本地ide里执行时executeSql执行结束后进程退出导致job也强制结束
DanielGu
可以把原始的计算结果打印出来,执行 set execution.result-mode=changelog
(如果source有delete消息,可能会出现null值)
LittleFall <1578166...@qq.com> 于2020年8月13日周四 下午3:33写道:
> mysql 的建表语句
> use test;
> create table base (
> id int primary key,
> location varchar(20)
> );
> create table stuff(
> id int
sql client 中通过 -j 或者 -l 的指定的包会被随着job提交的时候一起上传到jm。
Zhao,Yi(SEC) 于2020年8月13日周四 下午5:11写道:
> A是10机器集群(HA模式,独立集群),B作为提交机器。
> 从我实验效果来看,我是先启动一个sql-client的cli,如下命令:
> ./bin/sql-client.sh embedded -l $(pwd)/libs_sql -l $(pwd)/libs_udf
> 其中libs_sql中有:flink-connector-kafka_2.12-1.10.0.jar
>
tEnv.executeSql(insertSql); 是异步提交完任务就返回了,
如果是IDE里运行的话话,进程就直接退出导致job也就结束了。需要需要等到job结束,
目前可以通过下面这种方式
TableResult result = tEnv.executeSql(insertSql);
result..getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
另外 tEnv.executeSql(insertSql);
配置了 TableConfig 中的 minIdleStateRetentionTime 和 maxIdleStateRetentionTime 吗?
Benchao Li 于2020年8月10日周一 上午10:36写道:
> Hi,
>
> 最好也说一下使用的Flink版本以及对应的SQL,这样可以让其他同学容易复现这个问题。
>
> op <520075...@qq.com> 于2020年8月10日周一 上午10:27写道:
>
> > Hi
> > 在使用flink sql的过程中遇到如下情况,在配置了如下选项后:
> > val config =
BatchTableEnvironmentImpl 属于 old planner,
缺少 flink-table-planner_${scala.binary.version}.jar 的依赖
郭华威 于2020年8月10日周一 上午10:21写道:
> flink1.11.1 使用tableApi 报错:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Create BatchTableEnvironment failed.
> at
>
目前sql-client还不支持。关于纯SQL文本statement set的支持,
目前社区已经达成语法的一致意见,应该后续会慢慢的支持。
kandy.wang 于2020年8月5日周三 下午10:43写道:
>
>
>
>
>
>
> @ godfrey
> 你说的这种StatementSet 提交方式,在sql-client提交任务的时候不支持吧? 可以给加上么。
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-08-04 19:36:5
端是没有复用呢?理论上source +
> Deduplicate 都是view里的逻辑,都应该复用才对。就是感觉复用的还不够多呢。
>
>
> 在 2020-08-04 17:26:02,"godfrey he" 写道:
> >blink planner支持将多sink的query优化成尽量复用重复计算部分。
> >1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务
> >
> >kandy.wang 于2020
blink planner支持将多sink的query优化成尽量复用重复计算部分。
1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务
kandy.wang 于2020年8月4日周二 下午5:20写道:
> FLINK SQL view相关问题:
> create view order_source
>
> as
>
> select order_id, order_goods_id, user_id,...
>
> from (
>
> ..
feMemoryBudget.verifyEmpty(UnsafeMemoryBudget.java:64)
> ...
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
> ... 21 more
> [CIRCULAR REFERENCE:java.lang.NoClassDefFoundError: Could not initialize
> class org.apache.flink.util.JavaGcCleanerWrapper]
>
逻辑上批产生的结果是Table,流产生的结果是Changelog。
你在例子中Table的结果和changelog的结果是一样的,所以你感觉差不多。
最简单的方式可以将query改为带group by的,再看结果的差异。
更多关于Table和Changelog的概念可以参考 [1]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html
chenxuying 于2020年8月4日周二 上午11:44写道:
> hi :
>
parquet.compression=SNAPPY,更多信息可参考[1]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/parquet.html#format-options
lydata 于2020年8月4日周二 上午11:39写道:
> Flink DDL的方式 写 Hive parquet 格式 ,是否支持 snappy压缩,如果支持 请问下参数是什么?
你把Map换为Map试试
zilong xiao 于2020年8月3日周一 下午4:56写道:
> 目前转List可以用数组代替,Map貌似没法成功运行
>
> zilong xiao 于2020年8月3日周一 上午10:43写道:
>
> > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type is
> not
> > supported:
> >
>
如果你想在client端拿到query的结果做preview的话,目前API层面支持直接collect或者print执行结果,可以参考 [1]
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#execute-a-query
Jeff Zhang 于2020年8月1日周六 下午11:01写道:
> Apache Zeppelin有自己的rest api,你可以用rest api来提交flink sql
>
StatementSet 中的多个insert会被编译成一个job提交。
你能提供一下对应的代码样例吗?
op <520075...@qq.com> 于2020年7月30日周四 下午3:57写道:
> 大家好,我发现StatementSet 里添加2个insertsql执行的时候会启动两个application,
> 这两个任务除了sink都是一样的吗?这样是不是会重复计算和浪费资源,而且两边的数据可能不同步,
> 有什么办法能解决?
> 谢谢
t suitable for
> executing sql files
>
> godfrey he 于2020年7月29日周三 上午9:56写道:
>
>> hi Jun,
>>
>> Currently, sql client has supported -u option, just like:
>> ./bin/sql-client.sh embedded -u "insert_statement".
>>
>> There is already a JIRA [
VIDERS
>
> .set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()))后确实不报这个错了。
>
> 题外话,个人认为flink不应该将这样的异常抛给用户去解决,除非我去深入研究源码,要不然根本无法搞清楚具体发生了什么,在封装性上还有待改善。
> ________
> 发件人: godfrey he
> 发送时间: 2020年7月28日 13:55
> 收件人: user
一次都会创建一个TableEnvironment
>
> 发件人: godfrey he
> 发送时间: 2020年7月28日 9:58
> 收件人: user-zh
> 主题: Re: flink1.11.0 执行sqlQuery时报NullPointException
>
> 你们是否在多线程环境下使用 TableEnvironment ?
> TableEnvironment 不是线程安全的,多线程情况使用可能出现一些莫名其妙的问题。
>
> g
你的包是完整的flink-1.11.1的包吗?
例如 check一下 ClusterClientJobClientAdapter 这个类是否继承 CoordinationRequestGateway
?
shimin huang 于2020年7月28日周二 上午11:21写道:
> Hi,all:
> 本人基于Flink1.11.1的table API使用Hive方言,调用executSql方法后报错,堆栈信息如下:
> org.apache.flink.client.program.ProgramInvocationException: The main method
>
你们是否在多线程环境下使用 TableEnvironment ?
TableEnvironment 不是线程安全的,多线程情况使用可能出现一些莫名其妙的问题。
godfrey he 于2020年7月28日周二 上午9:55写道:
> hi 能给出详细的schema信息吗?
>
> wind.fly@outlook.com 于2020年7月27日周一
> 下午7:02写道:
>
>> 补充一下,执行的sql如下:
>>
>>
hi 能给出详细的schema信息吗?
wind.fly@outlook.com 于2020年7月27日周一 下午7:02写道:
> 补充一下,执行的sql如下:
>
> select order_no, order_time from
> x.ods.ods_binlog_test_trip_create_t_order_1
>
>
> 发件人: wind.fly@outlook.com
> 发送时间: 2020年7月27日 18:49
> 收件人: user-zh@flink.apache.org
和hive结合下,filesystem是支持流式读取的,可以参考 [1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/hive/hive_streaming.html#streaming-reading
Leonard Xu 于2020年7月23日周四 下午10:28写道:
> Hi,
>
> Filesystem connector 支持streaming 写入,streaming 读取
> 还未支持,所以读取完了就停止。支持streaming
1.10 也是支持的
Michael Ran 于2020年7月22日周三 下午9:07写道:
> 1.tableEvn.from(xx).getSchema() 我确实通过这个拿到了schema,2.with
> properties属性很重要 ,关系我自定义的一些参数设定。3.关于 catalog 这个东西,是不是只有1.11
> 版本才能从catalog 获取 with properties 哦? 1.10 you 有支持吗
> 在 2020-07-22 18:22:22,"godfrey he"
你观察到有sink写不过来导致反压吗?
或者你调大flush interval试试,让每个buffer攒更多的数据
曹武 <14701319...@163.com> 于2020年7月23日周四 下午4:48写道:
> 我使用fink 1.11.1 做cdc,发现一秒钟只能写100条左右数据到mysql,请问有优化方案,或者是其他的批量写入的方案建议嘛
> 代码如下:
> String sourceDdl =" CREATE TABLE debezium_source " +
> "( " +
> "id
hi,
目前没有解决办法,insert job根据sink表名自动生成job name。
后续解法关注 https://issues.apache.org/jira/browse/FLINK-18545
Weixubin <18925434...@163.com> 于2020年7月23日周四 下午6:07写道:
> Hi,
> 我想请教下,使用streamExecutionEnv.execute("from kafka sink hbase") 是可以指定Job的名称。
> 而当改用streamTableEnv.executeSql(sql)的方式时,似乎无法定义Job的名称。
>
867#event-3578490750>
Best,
Godfrey
wind.fly@outlook.com 于2020年7月23日周四 下午7:34写道:
> Hi,Godfrey:
> 加了checkpoint后确实可以了,能具体讲一下原理吗?print是在完成快照的时候顺便把结果输出了吗?或者有没有相关文档?
>
> Best,
> Junbao Zhang
> ________
> 发件人: godfrey he
> 发送时间: 2020年7月23日
1.11的 TableResult.collect() 和 TableResult.print() 方法在流模式下,
都是exactly once语义,需要配置checkpoint才能得到结果。
Best,
Godfrey
wind.fly@outlook.com 于2020年7月23日周四 下午7:22写道:
> Hi, all:
>
> 本人当前使用flink版本1.11.0,但是在执行executeSql后,print时没有在console打印出结果(查看kafka是一直有数据产生的),
> sql如下:
>
>
>
这个问题的已经有一个issue:https://issues.apache.org/jira/browse/FLINK-18545,请关注
WeiXubin <18925434...@163.com> 于2020年7月23日周四 下午6:00写道:
> Hi,
> 我想请问下使用 streamExecutionEnv.execute("from kafka sink
> hbase"),通过这种方式可以给Job指定名称。
> 而当使用streamTableEnv.executeSql(sql)之后似乎无法给Job定义名称。
> 请问有什么解决方案吗?谢谢
>
>
>
> --
>
tableEnv 中 可以通过
tableEvn.from(xx).getSchema() 拿到该表的schema信息,但是没法拿到对应的properties。
如果要拿到properties,可以通过catalog的接口得到 [1]。
如果要自定义实现source/sink,可以参考 [2]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/catalogs.html
[2]
;
>诚然,DataStream的灵活性确实会是一把双刃剑,但就像@leonard提到的,平台层和应用层的目的和开发重点可能也不太一样,对Flink
> API使用侧重点也不同。我个人还是希望可以在享受全新API设计优势同时,
>
> 可以继续使用DataStream(Transformation)的灵活性,助力Flink组件在我们组的开落地
>
>
> 再次感谢各位的回复!
>
>
> 发件人: Jark Wu
> 发送时间: 2020年7月2
hi Xingxing,
1. Flink 提供了一套catalog的接口,提提供了几个内置的实现:in-memory catalog, hive catalog,
postgres catalog,
可以根据自己的需求选择。也可以实现自定义的catalog。参考 [1]
2. hive catalog 主要是对接 hive,方便读取现有的hive catalog的meta信息。当然也可以往hive
catalog写新的meta。
是否会转为默认catalog,据我所知,目前没有。
3. 一般没什么问题。在和其他区分大小写的db对接的时候,可能有问题。
Best,
Godfrey
sql-client.sh的-u是指update语句,目前只支持insert。
Jark Wu 于2020年7月21日周二 下午6:47写道:
> Hi,
>
> 你想要的是类似于 sql-client.sh -u 的功能,直接通过命令行去执行 ddl 是么?非常抱歉,目前这是不支持的。
> 社区的e2e测试目前也是通过 Java 代码来调用 sql-client.sh 来实现执行 ddl 的。
> 不过社区是有计划支持 sql-client.sh 执行一个 sql 文件的,可以关注下FLINK-12828.
>
> Best,
> Jark
>
> On Thu, 16 Jul
可以,定义清楚 getResultType 和 getParameterTypes, 可以参考[1]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
Dream-底限 于2020年7月21日周二 下午7:25写道:
> hi
>
>
你可以先只跑第一个insert,然后check一下g_sink_unit是否有数据。
另外,你可以把query 改为都读取 kafka_source_tab 再分别写到两个不同的sink:
sql1='''Insert into g_sink_unit select alarm_id,trck_id from
kafka_source_tab'''
sql2='''Insert into g_summary_base select alarm_id,trck_id from
kafka_source_tab;'''
小学生 <201782...@qq.com>
http://apache-flink.147419.n8.nabble.com/flink-1-10-sql-kafka-format-json-schema-json-object-tt4665.html
这个邮件里提到了类似的问题。
https://issues.apache.org/jira/browse/FLINK-18002 这个issue完成后(1.12),你可以将
“data”,“mysqlType”等格式不确定的字段定义为String类型,
下游通过udf自己再解析对应的json
Best,
Godfrey
jindy_liu
;
>>
>> On Fri, Jul 17, 2020 at 2:54 PM godfrey he wrote:
>>
>>> udf_xxx的逻辑是啥?
>>>
>>>
>>> Luan Cooper 于2020年7月17日周五 下午2:40写道:
>>>
>>> > Hi
>>> >
>>> > 我有这么一个 SQL
>>> > INSERT INTO es
>
如果你是用flink sql的,可以通过DDL的方式来定义kafka sink,参考 [1]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
Best,
Godfrey
smq <374060...@qq.com> 于2020年7月19日周日 下午9:36写道:
> 大家好,我想通过avro格式sink到kafka,请问该怎么实现,官网上没找到相关方法。
hi Evan,
感谢反馈,目前已经有一个issue [1]在跟踪该问题,可以关注后续进展
[1] https://issues.apache.org/jira/browse/FLINK-18545
是的。目前按照你的写法做不到只提交一个job了
sunfulin 于2020年7月17日周五 下午3:11写道:
>
>
>
> hi,
> 再问下,这个方案还是会提交两个job吧?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-17 14:36:19,"godfrey he" 写道:
> >做不到,1.11里把 StreamExecutionEnvironment.execute 和
> &
第二个问题的异常栈是啥?
kcz <573693...@qq.com> 于2020年7月17日周五 下午2:17写道:
> 第一个bug提示只需要
> classloader.resolve-order: parent-first
> 第二个bug采用了parquet还没解决
>
>
> --原始邮件--
> 发件人:
> "kcz"
>
udf_xxx的逻辑是啥?
Luan Cooper 于2020年7月17日周五 下午2:40写道:
> Hi
>
> 我有这么一个 SQL
> INSERT INTO es
> SELECT
> a,
> udf_xxx(b)
> FROM mongo_oplog -- 自定义 TableFactory
>
> Job 提交后 fail 了,从 Job 提交到 Fail 只有一处来自非业务代码的 NPE 如下,没有任何业务代码 Exception,可以稳定重现
>
> LUE _UTF-16LE'v2'))) AS return_received_time]) (1/1)
>
>
>
>
> hi,
> 感谢回复。这个机制我理解了。想了解下,有办法在1.11里仍然使用1.10版本的作业提交机制么?我现在虽然把代码回滚到1.10版本的逻辑,但是提交作业仍然有问题:比如我如果不执行env.execute,那么table
> to DataStream的语句不会生成拓扑。
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-17 12:09:20,"godfrey he" 写道:
> >hi sunfulin,
>
hi sunfulin,
目前这个做不到。executeSQL 和 table to DataStream 是分别优化和提交作业的。
即使在1.11 之前,table to DataStream 也不会和 sqlUpdate 或者 insertInto 的语句一起优化,
虽然只提交了一个job,但是是两个独立的pipeline,也没有计算复用,和两个job没啥差别。
Best,
Godfrey
Leonard Xu 于2020年7月17日周五 上午12:12写道:
> Hi,
>
> 我理解目前好像做不到, cc: godfrey 大佬看看
>
> 祝好,
> Leonard Xu
为什么要 GROUP BY id,name ,description, weight ?
直接 "INSERT INTO sink SELECT id,name ,description, weight FROM
debezium_source" 不能满足需求?
曹武 <14701319...@163.com> 于2020年7月16日周四 下午9:30写道:
> 我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候
> 从checkpoint恢复以后,新来op=d的数据会删除失败
> 重启命令:./bin/flink run -m
通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
Best,
Godfrey
wangl...@geekplus.com.cn 于2020年7月16日周四 下午4:02写道:
> 比如:
>
> CREATE TABLE my_table (
> id BIGINT,
> first_name STRING,
> last_name
现在还不支持在sql-client-defaults.yaml 里配置 checkpointing.interval,
你可以配置在flink-conf.yaml里
Harold.Miao 于2020年7月16日周四 下午1:27写道:
> hi flink users
>
> 通过sql-client提交sql怎么设置checkpointing.interval?
> 我看了一下sql-client-defaults.yaml中的execution, 并没有发现这个参数。请教大家一下。
> 谢谢
>
>
>
> --
>
> Best Regards,
> Harold Miao
目前 1.11 版本中的 tableResult.print 只支持 exactly once 语义,需要配置 checkpoint。
1.12 里准备支持 at least once 语义,用户可以不用配置 checkpoint。目前 pr [1] 正在reivew 。
[1] https://github.com/apache/flink/pull/12867
Best,
Godfrey
Jingsong Li 于2020年7月16日周四 上午11:36写道:
> tableResult.print需要有checkpoint
>
> Best,
> Jingsong
>
1.11 对 StreamTableEnvironment.execute()
和 StreamExecutionEnvironment.execute() 的执行方式有所调整,
简单概述为:
1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
3. 新引入的 TableEnvironment.executeSql() 和
hi sunfulin,
1.11 对 StreamTableEnvironment.execute()
和 StreamExecutionEnvironment.execute() 的执行方式有所调整,
简单概述为:
1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
3. 新引入的 TableEnvironment.executeSql() 和
可以通过 StreamExecutionEnvironment#executeAsync 提交作业,返回 JobClient [1], 通过
JobClient 可以 cancel 作业,获取 job status。
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API
Best,
Godfrey
Evan 于2020年7月9日周四 上午9:40写道:
> 这个问题之前看到过有人在问,但是没有看到答案,我想问一下,Flink Streaming
>
select a.table_tmp1.r1 / a.table_tmp2.r2
这个是对同一行的数据进行操作,所以你需要先对table_tmp1和table_tmp2做一个join,将两个表的数据根据条件合并成一张表。
zilong xiao 于2020年7月8日周三 下午8:55写道:
> 列如下面这样,需要查询table1 & table2,分别查询不同的字段
> 在最外层做比值,flink貌似语法检查不通过,应该怎么写这样的SQL呢,有前辈可以指导下不~
> select a.table_tmp1.r1 / a.table_tmp2.r2 as value0 from
>
1.11 对 StreamTableEnvironment.execute()
和 StreamExecutionEnvironment.execute() 的执行方式有所调整,
简单概述为:
1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
3. 新引入的 TableEnvironment.executeSql() 方法是直接执行sql作业
hi jack,jincheng
Flink 1.11 支持直接将select的结果collect到本地,例如:
CloseableIterator it = tEnv.executeSql("select ...").collect();
while(it.hasNext()) {
it.next()
}
但是 pyflink 还没有引入 collect() 接口。(后续会完善?@jincheng)
但是1.11的TableResult#collect实现对流的query支持不完整(只支持append
only的query),master已经完整支持。
可以参照
hi,如 benchao 所说,SELECT XX AS YY 和Table API的renameColumns等价。
而且这些名字仅用于sql解析和优化阶段,实际执行的时候不会使用字段名。
Best,
Godfrey
Benchao Li 于2020年6月12日周五 下午6:36写道:
> 直接用SELECT XX AS YY就等价于Table API的renameColumns了吧。
>
> naisili Yuan 于2020年6月12日周五 下午6:23写道:
>
> > Hi all
> >
hi chenkaibit
欢迎将fix贡献回社区
chenkaibit 于2020年6月9日周二 上午10:34写道:
> 我也遇到过这个问题,这个可能是 checkpoint 的 bug,我修改了下 flink 源码后不报错了,可以参考下这个patch
> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
>
>
> 在 2020-06-05 15:06:48,"hb" <343122...@163.com> 写道:
> >Flink SQL 作业,
hi 请问你用的flink是哪个版本?StreamTask这个类里报了NPE,感觉是bug。
hb <343122...@163.com> 于2020年6月5日周五 下午3:07写道:
> Flink SQL 作业, 开始运行正常, checkpoint也正常, 没有大状态(状态19.7KB),
> 但是执行一段时间后,开始报错, 然后恢复执行个几秒,也马上报这个错,然后报错恢复不停循环.
> 哪位帮忙看看,不胜感激.
>
>
> 2020-06-05 14:13:19,791 INFO org.apache.flink.runtime.taskmanager.Task -
>
chao Yang
>
>
>
>
>
> -- 原始邮件 --
> 发件人: godfrey he 发送时间: 2020年6月4日 21:17
> 收件人: user-zh 主题: 回复:Flink SQL 子查询优化问题
>
>
>
> hi Yichao,
>
> 目前 planner 会 try best 的将两个相邻的 project 节点合并在一起,除非两个project被分开。
> 就像你上面的那种做法。但是加一个group by的执行代价比较高。
&g
hi yihan,
如 Leonard 所说,你可以考虑使用 first_value, last_value 等聚合函数和赛选其他字段。
1.11开始支持ddl定义pk信息, 如果id在source表中也是pk字段,可以直接定义,
planner会利用该信息传递pk到sink表。
Bests,
Godfrey
Leonard Xu 于2020年6月4日周四 下午9:01写道:
> Hi,
>
> > 但这样子不能通过calcite的sqlvalidation,select后面不能有非聚合项,
>
> select后费聚合值可以通过max()或sum()来取,因为已经按照key
hi 可以考虑使用 temporal table join [1]
Best,
Godfrey
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#join-with-a-temporal-table
小屁孩 <932460...@qq.com> 于2020年6月4日周四 下午5:51写道:
> 您的意思是open 全量预加载吗?我目前的逻辑是自己自定义的source 广播出去
> 这是我的source
>
>
> import
hi Yichao,
目前 planner 会 try best 的将两个相邻的 project 节点合并在一起,除非两个project被分开。
就像你上面的那种做法。但是加一个group by的执行代价比较高。
对于blink planner 而言,*有一个绕的办法*,可以在子查询的结果加一个print sink(可以ignore输出),
利用多sink的优化特性,将两个project分开,从而阻止优化器将两个project合并。
Best,
Godfrey
1048262223 <1048262...@qq.com> 于2020年6月4日周四 下午4:56写道:
> Hi
hi greemqq...@163.com,15701181132mr@gmail.com
能详细介绍一下 幂等 处理的场景吗? 例如通过upsert语句能做到幂等
hi hdxg1101300...@163.com,
你的sink function的一个statement batch里有insert,delete等语句混合的情况?是用的是flink
sql,还是datastream?
Bests,
Godfrey
Bests,
Godfrey
Michael Ran 于2020年6月3日周三 下午8:07写道:
> 我们也会用幂等处理类似的东西。
>
hi 元灵,
这个是个已知bug: https://issues.apache.org/jira/browse/FLINK-17753,目前正在fix。
Bests,
Godfrey
元灵 于2020年6月3日周三 下午5:39写道:
> 大家好,请教个问题:
> 我在pyflink中使用SQL DDL创建kafka源,如下:
> kafka_source_ddl = """
> CREATE TABLE kafka_source_tb (
> name VARCHAR,
> number INT,
> msgtime
hi star,
Flink 1.11 开始已经支持 table source 读取 retract 消息,update 消息。
目前支持 Debezium format,Canal format [1],其他的情况目前需要自己实现。
Best,
Godfrey
[1]
hi shangwen, flink
支持配置 env.java.opts,env.java.opts.jobmanager,env.java.opts.taskmanager
等来配置JVM
详细请见:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jvm-and-logging-options
Best,
Godfrey
shangwen <583767...@qq.com> 于2020年6月2日周二 下午9:03写道:
>
hi john,请问你用sql client跑的是yarn per job 还是 yarn session 模式?
security 相关的配置需要放到 flink-conf.yaml 中,是因为 sql client不负责启动一个flink cluster,
只负责提交sql job。
Best,
Godfrey
john <506269...@qq.com> 于2020年5月29日周五 下午12:09写道:
> flink version: 1.10.1
> Hive versos: 2.11
> flink-conf.yaml 安全配置:
> # security
>
Hi, 夏帅
感谢反馈问题,我建了一个issue https://issues.apache.org/jira/browse/FLINK-18055
,应该今天就可以fix
Best,
Godfrey
Leonard Xu 于2020年6月2日周二 下午12:13写道:
> Hi, 夏帅
>
> 感谢反馈,这应该是个bug,我 这边本地也复现了,我先看下哈
>
> 祝好,
> Leonard Xu
>
> > 在 2020年6月2日,11:57,夏帅 写道:
> >
> > 是我编译的问题么,在window下编译的
>
>
目前StreamTableEnvironment和TableEnvironment在DAG优化方面的行为自1.11开始都是一样的了,建议都使用StatementSet来支持多insert。TableEnvironment以后还会进一步的简化和整合。
Best,
Godfrey
wind.fly@outlook.com 于2020年5月28日周四 下午5:45写道:
> Hi,
>
>
第一个问题:set execution.parallelism=10;
这样命令设置的job的默认并发度。一些算子有自己并发度的设置逻辑,不受默认并发度的影响(例如
hive的source,是根据partition数来的)。你可以在提交作业之前调用set命令来修改每个job的默认并发度。
第二个问题:1.11在 FLINK-16822[1] 被fix后,你可以通过配置项方式设置checkpoint [2]。例如:
set execution.checkpointing.mode=EXACTLY_ONCE。
[1]
目前社区已经在讨论 release-1.10.1 RC [1] 的发布
[1]
http://mail-archives.apache.org/mod_mbox/flink-dev/202004.mbox/%3CCAM7-19K0YsejvZpfVJrvEX6_DOJ7sUViEn9nB-5zfhX8P28_9A%40mail.gmail.com%3E
Best,
Godfrey
Benchao Li 于2020年4月16日周四 下午3:06写道:
> Hi,
> Flikn 1.10.1还没有正式发布,暂时还没有地方可以直接下载。可以从源码直接编译一下~
>
>
Hi Even,
1. 目前 SQL CLI 支持通过在 sql-client-default.yaml 里设置 parallelism
和 max-parallelism 来控制 CLI 任务的默认并发。或者通过 set 命令,如 set
execution.parallelism=10;放方式动态设置。例外,对于如果使用 blink
planner,可以用 table.exec.resource.default-parallelism 来配置默认并发。[1]
另外 SQL CLI 还不支持 checkpoint 的设置。
2. 目前 SQL CLI 默认是 in-memory
Hi Xinghalo,
欢迎向 sql gateway 贡献~
Best,
Godfrey
111 于2020年4月2日周四 上午11:10写道:
> Hi,
> 了解了,那我知道怎么解决了。我这边使用的是sql-gateway,看样子得在sql-gateway里面加一种表定义的语法了。
> 多谢多谢
>
>
> Best,
> Xinghalo
> 在2020年04月2日 10:52,Benchao Li 写道:
> 你写的不是维表join的语法,维表join现在用的是temporal table[1] 的方式来实现的,需要特殊的join语法:
>
> SELECT
>
目前 flink 支持 Scala 2.11 和 Scala 2.12, 默认情况下,通过 mvn package 打包出来的是包含 Scala
2.11 的包,例如 flink-table-blink_*2.11*-1.10.0.jar。
可以通过 -Dscala-2.12 指定 Scala 的版本是 2.12, 打出来的包是 flink-table-blink_*2.12*
-1.10.0.jar 这样的。
Best,
Godfrey
wangl...@geekplus.com.cn 于2020年3月26日周四 下午6:34写道:
>
>
还有一种方式是sql gateway 支持 --jar 和 --library 指定用户的jar,这种方式不需要用户将jar放到flink的lib下
godfrey he 于2020年3月25日周三 下午6:24写道:
> hi 赵峰,
>
> 你出现的这个问题,是在classpath中找不到Kafka相关TableFactory,按照zhenghua说的方式可以解决。但是现在Flink
> JDBC Driver只支持Batch模式,而Kafka table source目前只支持stream模式。
>
> Best,
> Godfre
hi 赵峰,
你出现的这个问题,是在classpath中找不到Kafka相关TableFactory,按照zhenghua说的方式可以解决。但是现在Flink
JDBC Driver只支持Batch模式,而Kafka table source目前只支持stream模式。
Best,
Godfrey
Zhenghua Gao 于2020年3月25日周三 下午4:26写道:
> 请确认一下 kafka connector 的jar包是否在 flink/lib 下。
> 目前的报错看起来是找不到kafka connector的jar包。
>
> *Best Regards,*
>
Hi,你可以尝试在yarn上去拿历史作业的日志
Best,
Godfrey
111 于2020年3月25日周三 上午10:53写道:
> Hi,
>
> 目前确实通过sql-gateway拿到了jobId,并且获取到了状态。不过由于使用的是yarn-session,导致失败后taskmanager回收,taskmanager上的日志也就丢失了。
> 如果连接到Jobmanager上,是拿不到taskmanager曾经的日志吧?
>
>
> Best,
> xinghalo
> 在2020年03月25日 10:47,godfre
on的机制呢?
>
>
> | |
> xinghalo
> |
> |
> xingh...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年03月25日 10:32,godfrey he 写道:
> 目前sql gateway只负责提交作业,不负责跟踪作业的状态。如果作业没提交成功,sql gateway会返回相关的错误;如果是执行时报错,sql
> gateway不会返回错误。你看看flink web ui作业是否提交成功
>
> Best,
> Godfrey
>
> 111
共有 106 项搜索結果,以下是第 1 - 100 matches
Mail list logo