Re:Re:Blink 1.11 create view是不是没有办法把rowtime带下去?

2020-11-15 文章 周虓岗
不是,我值得是table api可以带event time。 如果整个使用sql表达,怎么把time attribute待下去 在 2020-11-16 15:53:44,"hailongwang" <18868816...@163.com> 写道: >Hi zhou, > 你是指的 createTemporaryView 这个方法吗,这个方法上也可以指定字段,例子可以查看[1]。 >其中 createTemporaryView 的实现也是间接调用了 fromDataStream 方法[2]。 > > >[1]

Re:Blink 1.11 create view是不是没有办法把rowtime带下去?

2020-11-15 文章 hailongwang
Hi zhou, 你是指的 createTemporaryView 这个方法吗,这个方法上也可以指定字段,例子可以查看[1]。 其中 createTemporaryView 的实现也是间接调用了 fromDataStream 方法[2]。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-view-from-a-datastream-or-dataset [2]

Re: Flink sql 无法用!=

2020-11-15 文章 Danny Chan
是的 <> 是 SQL 标准推荐的用法。 jindy_liu <286729...@qq.com> 于2020年11月16日周一 下午2:24写道: > 用<> > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >

Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象

2020-11-15 文章 Danny Chan
DataSet 已经是社区准备 deprecate 的 API 了,不建议再使用。1.12 版本后推荐统一使用 DataStream,使用 sqlQuery 接口拿到 table 对象后转成 DataStream。 Asahi Lee <978466...@qq.com> 于2020年11月13日周五 下午4:05写道: > BatchTableEnvironment对象可以进行table to dataset; dataset to table > > > > > --原始邮件-- > 发件人: >

Blink 1.11 create view是不是没有办法把rowtime带下去?

2020-11-15 文章 周虓岗
通过table api的// declare an additional logical field as an event time attribute Tabletable=tEnv.fromDataStream(stream,$("user_name"),$("data"),$("user_action_time").rowtime()"); 可以把eventtime往后传, 如果使用createview的话怎么把这个time attribute往后带吗? 不往后传的话可能会 这个有什么方法吗?

Re: flink算子类在多个subtask中是各自初始化1个实例对象吗?

2020-11-15 文章 tison
可以这么认为,大体上你可以认为每个并发有自己的环境。 技术上,算子对象是每个并发会实例化一个,而 static 变量的【共享】程度跟你设置的 slot per TM 值还有其他一些调度相关的参数有关,但是最好不要依赖这种实现层面的东西。 一种常见的误解是我创建一个 static HashMap 就神奇地拥有了全局的键值存储,这当然是不对的,只有在同一个 JVM 实例上也就是同一个 TM 上的任务才会看到同一个 HashMap 对象,而这几乎是不可控的。 可以看一下这篇文档[1]对物理部署的实际情况有一个基本的认知。 Best, tison. [1]

Re: Reduce等函数的对下reuse问题

2020-11-15 文章 赵一旦
再具体点,reduce中return的对象作为reduce之后输出(这里是否涉及立即序列化)。 reduce(new ReduceFunction{ @Override public ObjCls reduce( ObjCls ele1, ObjCls ele2 ){ long resultPv = ele1.getPv() + ele2.getPv(); ele1.setPv(999); // 此处如果加这么一句setPv,会影响到什么算子呢?(各种可能DAG情况下) ele1.setPv( resultPv ); return

Reduce等函数的对下reuse问题

2020-11-15 文章 赵一旦
如题,想知道reduce函数实现的时候,什么情况复用对下可能导致问题呢?or永远不可能导致问题呢? 比如计算图中存在很多重复计算: streamA.reduce(reduceFunction1,); streamA.reduce(reduceFunction2,); streamA.

Re: 关于去重(Deduplication)

2020-11-15 文章 macia kk
好的,明白了,谢谢 Jark Wu 于2020年11月16日周一 上午10:27写道: > 关于2, 你理解反了。性能上 deduplicate with first row 比 first_value 更优。 因为deduplicate > with first row 在 state 里面只存了 key,value 只用了一个空字节来表示。 > > On Sun, 15 Nov 2020 at 21:47, macia kk wrote: > > > 感谢 Jark 回复, 一直有看你的博客,收益匪浅。 > > > > 关于2,性能上是 first_value

changlog-json??????????

2020-11-15 文章 DovE?g
Hi,?? ?? SQL select memberid, shop_time, workerid, resource_type, proctime from( select memberid, shop_time, workerid, resource_type, proctime from inviteShop where shop_time = DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') ) t0

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-15 文章 Jark Wu
1. https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication 2. 这是1.12 的功能,定义在 sink DDL with 属性里的。 On Mon, 16 Nov 2020 at 14:18, jindy_liu <286729...@qq.com> wrote: > 哦,这样啊 > 1、加上一个 deduplicate by sink key 节点在sql中是怎么写的? > 2、另外sql 中有关键字能单独指定一条sink

Re: Flink sql 无法用!=

2020-11-15 文章 jindy_liu
用<> -- Sent from: http://apache-flink.147419.n8.nabble.com/

Flink sql 无法用!=

2020-11-15 文章 丁浩浩
我想在where条件下用不等号报错,难道flink sql不等号不是!=这个吗? [ERROR] Could not execute SQL statement. Reason: org.apache.calcite.runtime.CalciteException: Bang equal '!=' is not allowed under the current SQL conformance level

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-15 文章 jindy_liu
哦,这样啊 1、加上一个 deduplicate by sink key 节点在sql中是怎么写的? 2、另外sql 中有关键字能单独指定一条sink sql的并发度吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink算子类在多个subtask中是各自初始化1个实例对象吗?

2020-11-15 文章 hl9...@126.com
Hi,all: flink算子在多个并行度的job中,每个算子(假如1个算子都有一个对应的java类)在多个subtask中会共享1个java类实例吗?还是每个subtask都会各自的实例? 我做了一个简单测试,写了一个flatmap类,在构造方法和open方法中打印类的toString方法,发现输出都不同,是否可以证明每个subtask都初始化了自己的类实例? 希望有朋友能解释下算子在job运行中初始化的过程。 测试相关代码如下: // flink 1.10.2版本,并行度为3 @Slf4j public class PersonFlatMap extends

Re: Flink中readFile中如何只读取增量文件

2020-11-15 文章 Jark Wu
可以去 JIRA 中开个 issue 反馈下这个功能。 On Mon, 16 Nov 2020 at 12:45, hectorhedev wrote: > 试过了的。但 FileProcessingMode.PROCESS_CONTINUOUSLY > 并不是增量读取,只要文件内容发生变化,就会重新全量读取,与之前的FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED > 增量方式不一样,也没有提供增量读取的模式。按官网文档的说法,这是break the “exactly-once” semantics。 > >

flink-1.11.2 job启动不起来,

2020-11-15 文章 史 正超
启动命令:run -d -m yarn-cluster -p 12 -yjm 1600 -ytm 12288 -ys 12 -ynm xxx -yqu flink-critical -j /app/flink-1.11.2/executor/fcbox-streaming-sql-platform-1.11.jar --sqlid 17 --jobName realtime_app_kpi_dis_day_16 12个并行度, 12个slot,启动不了 Caused by: java.util.concurrent.CompletionException:

回复:Flink中readFile中如何只读取增量文件

2020-11-15 文章 hectorhedev
试过了的。但 FileProcessingMode.PROCESS_CONTINUOUSLY 并不是增量读取,只要文件内容发生变化,就会重新全量读取,与之前的FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED 增量方式不一样,也没有提供增量读取的模式。按官网文档的说法,这是break the “exactly-once” semantics。 这个问题之前的邮件里有讨论过,见:http://apache-flink.147419.n8.nabble.com/Flink-readFile-tt142.html

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-15 文章 jindy_liu
图片png格式,怕看不了,我文字补充下: 1、print的最后几行。 32> +I(191,jindy191,2020-07-03T18:04:22,0,statu0) 32> +I(192,jindy192,2020-07-03T18:04:22,0,statu0) 32> +I(193,jindy193,2020-07-03T18:04:22,0,statu0) 32> +I(194,jindy194,2020-07-03T18:04:22,0,statu0) 32>

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-15 文章 Jark Wu
如果你是改了test表上的 status 关联字段,那么是会出现这个现象的。你一开始的 example 不是改 status 字段的。 这个问题的本质是 join key 和你最终的 sink key 不一致,导致可能出现乱序。 这个只需要在 sink 前显式按照 sink key shuffle 应该就能解决,比如加上一个 deduplicate by sink key 节点。 或者在 1.12 版本中,只需要 sink 并发与前面节点的并发不一样,框架也会自动加上一个 sink key shuffle。 关于你说的 join 节点热点问题,那是因为你的 status key

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-15 文章 jindy_liu
怕图片看不清, 我文字补充下: 1、print的最后几行。 32> +I(191,jindy191,2020-07-03T18:04:22,0,statu0) 32> +I(192,jindy192,2020-07-03T18:04:22,0,statu0) 32> +I(193,jindy193,2020-07-03T18:04:22,0,statu0) 32> +I(194,jindy194,2020-07-03T18:04:22,0,statu0) 32>

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-15 文章 jindy_liu
图片是屏幕截图,png格式的。忘记加后缀了。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

回复: Flink中readFile中如何只读取增量文件

2020-11-15 文章 Hector He
试过了的。但 FileProcessingMode.PROCESS_CONTINUOUSLY 并不是增量读取,只要文件内容发生变化,就会重新全量读取,与之前的FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED 增量方式不一样,也没有提供增量读取的模式。按官网文档的说法,这是break the “exactly-once” semantics。 这个问题之前的邮件里有讨论过,见:http://apache-flink.147419.n8.nabble.com/Flink-readFile-tt142.html

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-15 文章 jindy_liu
我又重试了次,不用重启job也会有问题,就是把并行度大于1会有问题!。 1、直接在sql-client里,启动/data/home/jindyliu/flink-demo/flink-1.11.2/bin/sql-client.sh embedded -d /data/home/jindyliu/flink-demo/flink-1.11.2//conf/sql-client-defaults.yaml sql-client-defaults.yaml的并行度设置为40. 数据一样,其中test表规模是200w条,status表11条。 源表test: CREATE TABLE

??????flink sql hive streaming??????????????????bug

2020-11-15 文章 Excalibur
??hive table read blinkStreamTableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true); Table table=blinkStreamTableEnv.sqlQuery("SELECT * FROM test.table_config /*+ OPTIONS('streaming-source.enable'='true','streaming-source.monitor-interval'

flink sql hive streaming??????????????????bug

2020-11-15 文章 Excalibur
??1.11.2 ??java.lang.IllegalArgumentException: TheContinuousFileMonitoringFunctionhas already restored froma previous Flinkversion. at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:176) at

flink sql hive streaming??????????????????bug

2020-11-15 文章 Excalibur
??1.11.2 ??java.lang.IllegalArgumentException:TheContinuousFileMonitoringFunctionhas already restoredfroma previousFlinkversion. at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:176) at

flink sql hive streaming??????????????????bug

2020-11-15 文章 Excalibur
??1.11.2 ??java.lang.IllegalArgumentException: The ContinuousFileMonitoringFunction has already restored from a previous Flink version. at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:176) at

Re: Flink中readFile中如何只读取增量文件

2020-11-15 文章 Jark Wu
你试了 FileProcessingMode.PROCESS_CONTINUOUSLY 了么? On Mon, 16 Nov 2020 at 09:23, hepingtao wrote: > 我也遇到类似的需求,需要增量读取日志文件内容,最终发现用一个已废弃Deprecated的方法 readFileStream 是可以实现的,代码如下: > > val stream = env.readFileStream(inputPath, 10, > FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED) > > 源码里说明可以用

Re: Flink cdc 多表关联处理延迟很大

2020-11-15 文章 Jark Wu
瓶颈应该在两个 not exists 上面,not exists 目前只能单并发来做,所以无法水平扩展性能。 可以考虑把 not exists 替换成其他方案,比如 udf,维表 join。 Best, Jark On Mon, 16 Nov 2020 at 10:05, 丁浩浩 <18579099...@163.com> wrote: > select > ri.sub_clazz_number, > prcrs.rounds, > count(*) as num > from > subclazz gs > JOIN > (SELECT

Re: 关于去重(Deduplication)

2020-11-15 文章 Jark Wu
关于2, 你理解反了。性能上 deduplicate with first row 比 first_value 更优。 因为deduplicate with first row 在 state 里面只存了 key,value 只用了一个空字节来表示。 On Sun, 15 Nov 2020 at 21:47, macia kk wrote: > 感谢 Jark 回复, 一直有看你的博客,收益匪浅。 > > 关于2,性能上是 first_value 更优,因为只保存了 key 和 对应的 value,而窗口函数保留了整行数据? > > > > Jark Wu

flink 1.11.2运行时出错

2020-11-15 文章 曹武
flink on yarn 模式,任务在yarn中跑了两天后出错,错误信息如下: org.apache.flink.util.FlinkException: JobManager responsible for fa0e8f776be3b5cd6573e1922da67c1f lost the leadership. at org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1415)

Re: Flink cdc 多表关联处理延迟很大

2020-11-15 文章 丁浩浩
select ri.sub_clazz_number, prcrs.rounds, count(*) as num from subclazz gs JOIN (SELECT gce.number, min( gce.extension_value ) AS grade FROM course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4 GROUP BY gce.number) AS temp ON temp.number =

flink 任务动态编译异常

2020-11-15 文章 iwodetianna
hi 遇到了一个问题, 本地调测正常,扔服务器上就会报错 环境: flink: 1.10.1 scala: 2.11 业务需求flink动态生成规则, 我用一个模板文件根据配置生成完整的规则, 然后动态编译,在Pattern的filter函数中调用 任务jar包放到了flink的lib目录下 请大神帮忙分析一下原因,谢谢!!! 报错信息: Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: e1MyTest ClassLoader

batch模式broadcast hash join为什么会有数据丢失

2020-11-15 文章

Re: Flink中readFile中如何只读取增量文件

2020-11-15 文章 hepingtao
我也遇到类似的需求,需要增量读取日志文件内容,最终发现用一个已废弃Deprecated的方法 readFileStream 是可以实现的,代码如下: val stream = env.readFileStream(inputPath, 10, FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED) 源码里说明可以用 readFile(FileInputFormat, String, FileProcessingMode, long) 方法替代,但事实上FileProcessingMode并没有对应的

Re: 关于去重(Deduplication)

2020-11-15 文章 macia kk
感谢 Jark 回复, 一直有看你的博客,收益匪浅。 关于2,性能上是 first_value 更优,因为只保存了 key 和 对应的 value,而窗口函数保留了整行数据? Jark Wu 于2020年11月15日周日 下午8:45写道: > 主要两个区别: > 1. 在语义上,deduplicate 是整行去重,而 first_value, last_value 是列去重。比如 deduplicate > with last row,是保留最后一行,如果最后一行中有 null 值,也会保留。而 last_value 是保留该列的最后非 null 值。 > 2. 性能上

jobmanager与taskmanager之间用rpc通信,为什么taskmanager之间用netty通信?

2020-11-15 文章 Jeff
如题! jobmanager与taskmanager之前通信也用netty通信不行吗?

Re: 关于去重(Deduplication)

2020-11-15 文章 Jark Wu
主要两个区别: 1. 在语义上,deduplicate 是整行去重,而 first_value, last_value 是列去重。比如 deduplicate with last row,是保留最后一行,如果最后一行中有 null 值,也会保留。而 last_value 是保留该列的最后非 null 值。 2. 性能上 deduplicate 更优,比如 first row, 只保存了 key 的state信息。 Best, Jark On Sun, 15 Nov 2020 at 19:23, macia kk wrote: > 各位大佬: > >

关于去重(Deduplication)

2020-11-15 文章 macia kk
各位大佬: 我看文档上建议使用的去重方式是用窗口函数 SELECT [column_list]FROM ( SELECT [column_list], ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS