Re: Re: Blink 1.11: is there no way to carry the rowtime forward with create view?

2020-11-15 Post by 周虓岗









No, what I meant is that the Table API can carry the event time. If everything is expressed in SQL, how do I carry the time attribute forward?










On 2020-11-16 15:53:44, "hailongwang" <18868816...@163.com> wrote:
>Hi zhou,
>   Do you mean the createTemporaryView method? That method can also take field expressions; see [1] for an example.
>The implementation of createTemporaryView also indirectly calls the fromDataStream method [2].
>
>
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-view-from-a-datastream-or-dataset
>[2] 
>https://github.com/apache/flink/blob/c24185d1c2853d5c56eed6c40e5960d2398474ca/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java#L253
>
>
>Best,
>Hailong
>
>
>
>
>On 2020-11-16 13:48:35, "周虓岗" wrote:
>
>With the Table API's "declare an additional logical field as an event time attribute" approach:

>Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());
>
>
>the event time can be passed downstream.
>If I use create view, how can I carry this time attribute forward?


>If it is not passed forward, it may ...


>Is there any way to do this?
>
>
>
>
>
>
> 


Re: Blink 1.11: is there no way to carry the rowtime forward with create view?

2020-11-15 Post by hailongwang
Hi zhou,
   Do you mean the createTemporaryView method? That method can also take field expressions; see [1] for an example. A minimal sketch follows after the links.
The implementation of createTemporaryView also indirectly calls the fromDataStream method [2].


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-view-from-a-datastream-or-dataset
[2] 
https://github.com/apache/flink/blob/c24185d1c2853d5c56eed6c40e5960d2398474ca/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java#L253
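For example, a minimal sketch of carrying the rowtime attribute through a view (the view name user_actions, the field names, and the windowed query are made up for illustration; it assumes a StreamTableEnvironment tableEnv and a DataStream stream that already has timestamps and watermarks assigned):

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Register the stream as a view and declare the event-time attribute, just like fromDataStream.
tableEnv.createTemporaryView(
        "user_actions",
        stream,
        $("user_name"), $("data"), $("user_action_time").rowtime());

// The view keeps the time attribute, so a pure-SQL query on it can still use event-time windows.
Table result = tableEnv.sqlQuery(
        "SELECT user_name, TUMBLE_START(user_action_time, INTERVAL '1' HOUR) AS window_start, COUNT(*) AS cnt " +
        "FROM user_actions " +
        "GROUP BY user_name, TUMBLE(user_action_time, INTERVAL '1' HOUR)");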


Best,
Hailong




On 2020-11-16 13:48:35, "周虓岗" wrote:

With the Table API's "declare an additional logical field as an event time attribute" approach:

Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());


the event time can be passed downstream.
If I use create view, how can I carry this time attribute forward?


If it is not passed forward, it may ...


Is there any way to do this?






 

Re: Flink SQL cannot use !=

2020-11-15 Post by Danny Chan
Yes, <> is the form recommended by the SQL standard.
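For instance (the table and column names here are made up purely to illustrate the operator):

-- '!=' is rejected under the default SQL conformance level; use '<>' instead:
SELECT * FROM orders WHERE order_status <> 'CANCELLED';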

jindy_liu <286729...@qq.com> wrote on Mon, Nov 16, 2020 at 2:24 PM:

> Use <>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink 1.11.2: how to obtain a BatchTableEnvironment object under the blink planner

2020-11-15 Post by Danny Chan
The DataSet API is already slated for deprecation by the community, so it is no longer recommended. From 1.12 on, the recommendation is to use DataStream uniformly: get a Table object via the
sqlQuery interface and then convert it into a DataStream, for example as sketched below.
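A minimal sketch of that conversion (the table name my_source and its columns are placeholders assumed to be registered via DDL beforehand; in 1.11/1.12 the retract-stream bridge is one way to do it):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SqlQueryToDataStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Run the query through the SQL interface ...
        Table result = tEnv.sqlQuery("SELECT id, name FROM my_source");

        // ... then bridge it back to a DataStream; the Boolean flag marks insert (true) vs. retract (false).
        DataStream<Tuple2<Boolean, Row>> stream = tEnv.toRetractStream(result, Row.class);
        stream.print();

        env.execute("sql-query-to-datastream");
    }
}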

Asahi Lee <978466...@qq.com> wrote on Fri, Nov 13, 2020 at 4:05 PM:

> A BatchTableEnvironment object can do table-to-dataset and dataset-to-table conversions
>
>
>
>
> -- Original message --
> From:
>   "user-zh"
> <
> danny0...@apache.org>;
> Sent: Tuesday, Nov 10, 2020, 2:43 PM
> To: "user-zh"
> Subject: Re: flink 1.11.2: how to obtain a BatchTableEnvironment object under the blink planner
>
>
>
> What do you want to use the BatchTableEnvironment object for? We have TableEnvironmentInternal, but it is not recommended.
>
> Asahi Lee <978466...@qq.com> wrote on Mon, Nov 9, 2020 at 5:09 PM:
>
> > Yes, the BatchTableEnvironment object
> >
> >
> >
> >
> > -- Original message --
> > From:
> >  
> "user-zh"
> >
> <
> > danny0...@apache.org>;
> > Sent: Monday, Nov 9, 2020, 12:34 PM
> > To: "user-zh" >
> > Subject: Re: flink 1.11.2: how to obtain a BatchTableEnvironment object under the blink planner
> >
> >
> >
> > >
> > > the BatchTableEnvironment environment
> >
> >
> > Do you mean the BatchTableEnvironment object?
> >
> > Asahi Lee <978466...@qq.com> wrote on Mon, Nov 9, 2020 at 10:48 AM:
> >
> > > Hello!
> > > I am using Flink 1.11.2. The official documentation says the blink batch
> > > execution environment is obtained as follows:
> > > // ****** BLINK BATCH QUERY ******
> > > import org.apache.flink.table.api.EnvironmentSettings;
> > > import org.apache.flink.table.api.TableEnvironment;
> > > EnvironmentSettings bbSettings =
> > >     EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> > > TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
> > > Besides the approach above, is there any other way to obtain the blink batch
> > > execution environment? What I actually need is a BatchTableEnvironment; how do I get one?


Blink 1.11: is there no way to carry the rowtime forward with create view?

2020-11-15 Post by 周虓岗
With the Table API's "declare an additional logical field as an event time attribute" approach:

Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());


the event time can be passed downstream.
If I use create view, how can I carry this time attribute forward?


If it is not passed forward, it may ...


Is there any way to do this?



Re: Do Flink operator classes get their own instance in each subtask?

2020-11-15 Post by tison
You can think of it that way; roughly speaking, each parallel instance has its own environment.

Technically, one operator object is instantiated per parallel instance. How much a static variable is actually "shared" depends on your slots-per-TaskManager setting
and some other scheduling-related parameters, but it is best not to rely on such implementation-level details.

A common misconception is that creating a static HashMap magically gives you a global key-value store. That is of course not true: only tasks running in the same JVM instance,
i.e. on the same TaskManager, see the same HashMap object, and which tasks those are is almost impossible to control. (See the sketch below.)
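A tiny illustrative sketch of that misconception (class and field names are made up; the comment describes the sharing behavior explained above):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.functions.RichMapFunction;

public class StaticCacheMap extends RichMapFunction<String, String> {

    // NOT a global key-value store: this map is shared only by subtasks that
    // happen to run inside the same TaskManager JVM (and the same classloader);
    // every other TaskManager process has its own independent copy.
    private static final Map<String, Long> LOCAL_CACHE = new ConcurrentHashMap<>();

    @Override
    public String map(String value) {
        long count = LOCAL_CACHE.merge(value, 1L, Long::sum);
        return value + " -> " + count;
    }
}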

You can look at this documentation page [1] to get a basic picture of the actual physical deployment.

Best,
tison.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html


hl9...@126.com wrote on Mon, Nov 16, 2020 at 1:55 PM:

> Hi, all:
>
> In a job with multiple parallel instances, does each operator (assuming every operator has a corresponding Java class) share one Java class instance across its subtasks, or does each subtask get its own instance?
>
> I did a simple test: I wrote a flatmap class and logged the class's toString() in both the constructor and the open() method, and the outputs were all different. Does that prove that each subtask initializes its own instance of the class?
> I hope someone can explain how operators are initialized while a job is running.
>
> The test code is as follows:
> // Flink 1.10.2, parallelism = 3
> @Slf4j
> public class PersonFlatMap extends RichFlatMapFunction<Tuple2<String, String>, Person> {
> private transient ValueState<Integer> state;
>
> public PersonFlatMap(){
> log.info(String.format("PersonFlatMap【%s】:
> 创建实例",this.toString()));
> }
>
> @Override
> public void open(Configuration parameters) throws IOException {
> // irrelevant code omitted...
> log.info(String.format("PersonFlatMap【%s】:初始化状态!",
> this.toString()));
> }
>
> @Override
> public void flatMap(Tuple2<String, String> t, Collector<Person>
> collector) throws Exception {
> Person p = JSONUtil.toObject(t.f1,Person.class);
> collector.collect(p);
> if(state.value() == null){state.update(0);}
> state.update(state.value() + 1);
> log.info("state: "+state.value());
> }
> }
>
> // test log output
> ...
> flink-10.2 - [2020-11-16 13:41:54.360] - INFO  [main]
> com.toonyoo.operator.PersonFlatMap  -
> PersonFlatMap【com.toonyoo.operator.PersonFlatMap@ba8d91c】: 创建实例
> // irrelevant logs omitted here...
> flink-10.2 - [2020-11-16 13:42:00.326] - INFO  [Flat Map -> Sink: Print to
> Std. Out (1/3)] org.apache.flink.runtime.state.heap.HeapKeyedStateBackend
> - Initializing heap keyed state backend with stream factory.
> flink-10.2 - [2020-11-16 13:42:00.351] - INFO  [Flat Map -> Sink: Print to
> Std. Out (1/3)] com.toonyoo.operator.PersonFlatMap  -
> PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c9d895d】:初始化状态!
> flink-10.2 - [2020-11-16 13:42:00.354] - INFO  [Flat Map -> Sink: Print to
> Std. Out (3/3)] com.toonyoo.operator.PersonFlatMap  -
> PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c489c40】:初始化状态!
> flink-10.2 - [2020-11-16 13:42:00.356] - INFO  [Flat Map -> Sink: Print to
> Std. Out (2/3)] com.toonyoo.operator.PersonFlatMap  -
> PersonFlatMap【com.toonyoo.operator.PersonFlatMap@1cd4f5c9】:初始化状态!
> ...
>
>
>
>
> hl9...@126.com
>


Re: Object reuse issues in reduce-like functions

2020-11-15 Post by 赵一旦
To be more specific: the object returned from reduce is emitted as the output of the reduce (does this involve immediate serialization?).

reduce(new ReduceFunction<ObjCls>() {

  @Override
  public ObjCls reduce(ObjCls ele1, ObjCls ele2) {
    long resultPv = ele1.getPv() + ele2.getPv();

    ele1.setPv(999);   // If this extra setPv call is added here, which operators could it affect? (under the various possible DAG shapes)

    ele1.setPv(resultPv);
    return ele1;
  }

})

赵一旦 wrote on Mon, Nov 16, 2020 at 2:40 PM:

> As in the subject: when implementing a reduce function, in which situations can reusing the object cause problems, or can it never cause problems?
>
>
> For example, when the computation graph contains a lot of repeated computation:
>
> streamA.reduce(reduceFunction1,);
>
> streamA.reduce(reduceFunction2,);
>
> streamA.
>


Object reuse issues in reduce-like functions

2020-11-15 Post by 赵一旦
As in the subject: when implementing a reduce function, in which situations can reusing the object cause problems, or can it never cause problems?


For example, when the computation graph contains a lot of repeated computation:

streamA.reduce(reduceFunction1,);

streamA.reduce(reduceFunction2,);

streamA.


Re: About deduplication

2020-11-15 Post by macia kk
OK, understood. Thanks.

Jark Wu wrote on Mon, Nov 16, 2020 at 10:27 AM:

> About 2, you have it backwards. Performance-wise, deduplicate with first row is better than first_value, because deduplicate
> with first row only stores the key in state; the value is represented by a single empty byte.
>
> On Sun, 15 Nov 2020 at 21:47, macia kk  wrote:
>
> > Thanks for the reply, Jark. I have been following your blog and have learned a lot from it.
> >
> > About 2: is first_value better performance-wise, because it only stores the key and the corresponding value, while the window function keeps the whole row?
> >
> >
> >
> > Jark Wu wrote on Sun, Nov 15, 2020 at 8:45 PM:
> >
> > > Two main differences:
> > > 1. Semantically, deduplicate is whole-row deduplication, while first_value and last_value are per-column. For example, deduplicate
> > > with last row keeps the last row; if the last row contains null values, those are kept too. last_value, in contrast, keeps the column's last
> > non-null value.
> > > 2. Performance-wise, deduplicate is better; for first row, for example, only the key's state information is stored.
> > >
> > > Best,
> > > Jark
> > >
> > > On Sun, 15 Nov 2020 at 19:23, macia kk  wrote:
> > >
> > > > Hi everyone:
> > > >
> > > >   I see that the recommended deduplication approach in the documentation uses a window (OVER) function
> > > > <
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
> > > > >
> > > >
> > > > SELECT [column_list]FROM (
> > > >SELECT [column_list],
> > > >  ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
> > > >ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
> > > >FROM table_name)WHERE rownum <= N [AND conditions]
> > > >
> > > >
> > > > But I see that Flink SQL also has first_value and last_value, which can achieve the same goal.
> > > > What are the differences between the two, especially with regard to watermarks and state management?
> > > >
> > >
> >
>


changlog-json??????????

2020-11-15 文章 DovE?g
Hi,

??
SQL
select memberid, shop_time, workerid, resource_type, proctime
from(
    select memberid, shop_time, workerid, resource_type, proctime
    from inviteShop
    where shop_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
) t0
inner join
(
    select memberid, min(shop_time) as shop_time
    from inviteShop
    where shop_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
    group by memberid
) t1 on t0.memberid = t1.memberid and t0.shop_time = t1.shop_time 



The inviteShop table here is a Kafka source in changelog-json format.
Best

SELECT [column_list] FROM (
   SELECT [column_list], ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name) WHERE rownum = 1

org.apache.flink.table.api.TableException:
Deduplicate doesn't support consuming update and delete changes which is
produced by node TableSourceScan

Re: flink 1.11.2 cdc: sink ordering problem for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Post by Jark Wu
1.
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
2. That is a 1.12 feature; it is defined in the sink DDL WITH properties, roughly as sketched below.
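A rough sketch of what that could look like in 1.12 (assuming the chosen connector supports the per-sink option; the connector, URL and table name below are made up):

CREATE TABLE test_status_sink (
  id INT,
  name VARCHAR(255),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'test_status',
  -- 1.12 per-sink parallelism option, only for connectors that implement it
  'sink.parallelism' = '1'
);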

On Mon, 16 Nov 2020 at 14:18, jindy_liu <286729...@qq.com> wrote:

> Oh, I see.
> 1. How do I write an "add a deduplicate by sink key node" in SQL?
> 2. Also, is there a keyword in SQL to set the parallelism of a single sink SQL statement individually?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL cannot use !=

2020-11-15 Post by jindy_liu
Use <>



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink SQL cannot use !=

2020-11-15 Post by 丁浩浩
I want to use the not-equal operator in a WHERE condition, but it throws an error. Isn't != the not-equal operator in Flink SQL?
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.runtime.CalciteException: Bang equal '!=' is not allowed 
under the current SQL conformance level

Re: flink 1.11.2 cdc: sink ordering problem for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Post by jindy_liu
Oh, I see.
1. How do I write an "add a deduplicate by sink key node" in SQL?
2. Also, is there a keyword in SQL to set the parallelism of a single sink SQL statement individually?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Do Flink operator classes get their own instance in each subtask?

2020-11-15 Post by hl9...@126.com
Hi, all:
In a job with multiple parallel instances, does each operator (assuming every operator has a corresponding Java class) share one Java class instance across its subtasks, or does each subtask get its own instance?
I did a simple test: I wrote a flatmap class and logged the class's toString() in both the constructor and the open() method, and the outputs were all different. Does that prove that each subtask initializes its own instance of the class?
I hope someone can explain how operators are initialized while a job is running.

The test code is as follows:
// Flink 1.10.2, parallelism = 3
@Slf4j
public class PersonFlatMap extends RichFlatMapFunction<Tuple2<String, String>, 
Person> {
private transient ValueState<Integer> state;

public PersonFlatMap(){
log.info(String.format("PersonFlatMap【%s】: 创建实例",this.toString()));
}

@Override
public void open(Configuration parameters) throws IOException {
// irrelevant code omitted...
log.info(String.format("PersonFlatMap【%s】:初始化状态!", this.toString()));
}

@Override
public void flatMap(Tuple2<String, String> t, Collector<Person> collector) 
throws Exception {
Person p = JSONUtil.toObject(t.f1,Person.class);
collector.collect(p);
if(state.value() == null){state.update(0);}
state.update(state.value() + 1);
log.info("state: "+state.value());
}
}

// test log output
...
flink-10.2 - [2020-11-16 13:41:54.360] - INFO  [main] 
com.toonyoo.operator.PersonFlatMap  - 
PersonFlatMap【com.toonyoo.operator.PersonFlatMap@ba8d91c】: 创建实例
// irrelevant logs omitted here...
flink-10.2 - [2020-11-16 13:42:00.326] - INFO  [Flat Map -> Sink: Print to Std. 
Out (1/3)] org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  - 
Initializing heap keyed state backend with stream factory.
flink-10.2 - [2020-11-16 13:42:00.351] - INFO  [Flat Map -> Sink: Print to Std. 
Out (1/3)] com.toonyoo.operator.PersonFlatMap  - 
PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c9d895d】:初始化状态!
flink-10.2 - [2020-11-16 13:42:00.354] - INFO  [Flat Map -> Sink: Print to Std. 
Out (3/3)] com.toonyoo.operator.PersonFlatMap  - 
PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c489c40】:初始化状态!
flink-10.2 - [2020-11-16 13:42:00.356] - INFO  [Flat Map -> Sink: Print to Std. 
Out (2/3)] com.toonyoo.operator.PersonFlatMap  - 
PersonFlatMap【com.toonyoo.operator.PersonFlatMap@1cd4f5c9】:初始化状态!
...




hl9...@126.com


Re: How to read only incremental file content with readFile in Flink

2020-11-15 Post by Jark Wu
You could open an issue in JIRA to request this feature.

On Mon, 16 Nov 2020 at 12:45, hectorhedev 
wrote:

> I have tried it. But FileProcessingMode.PROCESS_CONTINUOUSLY
> is not incremental reading: whenever the file content changes, the whole file is re-read from scratch, which is different from the previous incremental behaviour of FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED,
> and no incremental reading mode is provided. In the words of the official documentation, this can break the "exactly-once" semantics.
>
> This was discussed in an earlier thread on this list, see:
> http://apache-flink.147419.n8.nabble.com/Flink-readFile-tt142.html
> The official documentation also explains it:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sources
>
> IMPORTANT NOTES:
> If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a
> file is modified, its contents are re-processed entirely. This can break
> the “exactly-once” semantics, as appending data at the end of a file will
> lead to all its contents being re-processed.
>
> val stream = env.readFile(textInputFormat, inputPath,
> FileProcessingMode.PROCESS_CONTINUOUSLY, 10)
>
>
> --
> From: Jark [via Apache Flink 中文用户邮件列表]
> Sent: Monday, Nov 16, 2020, 10:36
> To: hepingtao
> Subject: Re: How to read only incremental file content with readFile in Flink
>
>  Have you tried FileProcessingMode.PROCESS_CONTINUOUSLY?
>
> On Mon, 16 Nov 2020 at 09:23, hepingtao <[hidden email]>
> wrote:
>
> > I ran into a similar requirement: I need to read log file contents incrementally. In the end I found that the deprecated method readFileStream
> can do it; the code is as follows:
> >
> > val stream = env.readFileStream(inputPath, 10,
> > FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
> >
> > The source code says it can be replaced by readFile(FileInputFormat, String, FileProcessingMode, long),
> > but in fact FileProcessingMode has no
> > watchType equivalent to PROCESS_ONLY_APPENDED. Once the readFileStream source
> > function is eventually removed, if no built-in replacement is provided, I will have to implement it myself.
> >
> > So my question is: since there used to be a way to read incrementally, why was it suddenly deprecated without offering a replacement? Is this kind of requirement unreasonable? I really don't understand this, and I hope someone can explain. Thanks!
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
>
>


flink-1.11.2 job fails to start

2020-11-15 Post by 史 正超
Launch command: run -d -m yarn-cluster -p 12 -yjm 1600 -ytm 12288 -ys 12 -ynm xxx -yqu 
flink-critical -j 
/app/flink-1.11.2/executor/fcbox-streaming-sql-platform-1.11.jar --sqlid 17 
--jobName realtime_app_kpi_dis_day_16

Parallelism 12, 12 slots, and the job cannot start.

Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No 
pooled slot available and request to ResourceManager for new slot failed
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
... 29 common frames omitted
Caused by: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No 
pooled slot available and request to ResourceManager for new slot failed
... 27 common frames omitted
Caused by: 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: 
Could not fulfill slot request d0d4eb5366379663f826d9cbf1eb3ff0.
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:391)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.requestSlot(ResourceManager.java:457)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
... 20 common frames omitted
Caused by: 
org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException:
 Could not fulfill slot request d0d4eb5366379663f826d9cbf1eb3ff0. Requested 
resource profile (ResourceProfile{UNKNOWN}) is unfulfillable.
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$fulfillPendingSlotRequestWithPendingTaskManagerSlot$11(SlotManagerImpl.java:882)
at 
org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot(SlotManagerImpl.java:878)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$internalRequestSlot$9(SlotManagerImpl.java:865)
at 
org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:865)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:386)
... 27 common frames omitted




Re: How to read only incremental file content with readFile in Flink

2020-11-15 Post by hectorhedev
I have tried it. But FileProcessingMode.PROCESS_CONTINUOUSLY
is not incremental reading: whenever the file content changes, the whole file is re-read from scratch, which is different from the previous incremental behaviour of FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED,
and no incremental reading mode is provided. In the words of the official documentation, this can break the "exactly-once" semantics.

This was discussed in an earlier thread on this list, see: http://apache-flink.147419.n8.nabble.com/Flink-readFile-tt142.html
The official documentation also explains it: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sources

IMPORTANT NOTES:
If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a file 
is modified, its contents are re-processed entirely. This can break the 
“exactly-once” semantics, as appending data at the end of a file will lead to 
all its contents being re-processed.

val stream = env.readFile(textInputFormat, inputPath, 
FileProcessingMode.PROCESS_CONTINUOUSLY, 10)


--
From: Jark [via Apache Flink 中文用户邮件列表]
Sent: Monday, Nov 16, 2020, 10:36
To: hepingtao
Subject: Re: How to read only incremental file content with readFile in Flink

 Have you tried FileProcessingMode.PROCESS_CONTINUOUSLY?

On Mon, 16 Nov 2020 at 09:23, hepingtao <[hidden email]> 
wrote: 

> I ran into a similar requirement: I need to read log file contents incrementally. In the end I found that the deprecated method readFileStream can do it; the code is as follows:
>
> val stream = env.readFileStream(inputPath, 10,
> FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>
> The source code says it can be replaced by readFile(FileInputFormat, String, FileProcessingMode, long),
> but in fact FileProcessingMode has no
> watchType equivalent to PROCESS_ONLY_APPENDED. Once the readFileStream source
> function is eventually removed, if no built-in replacement is provided, I will have to implement it myself.
>
> So my question is: since there used to be a way to read incrementally, why was it suddenly deprecated without offering a replacement? Is this kind of requirement unreasonable? I really don't understand this, and I hope someone can explain. Thanks!
> 
> 
> 
> -- 
> Sent from: http://apache-flink.147419.n8.nabble.com/






Re: flink 1.11.2 cdc: sink ordering problem for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Post by jindy_liu
The image is in PNG format; in case it cannot be viewed, let me add the details in text:
1. The last few lines of the print output.

32> +I(191,jindy191,2020-07-03T18:04:22,0,statu0)
32> +I(192,jindy192,2020-07-03T18:04:22,0,statu0)
32> +I(193,jindy193,2020-07-03T18:04:22,0,statu0)
32> +I(194,jindy194,2020-07-03T18:04:22,0,statu0)
32> +I(195,jindy195,2020-07-03T18:04:22,0,statu0)
32> +I(196,jindy196,2020-07-03T18:04:22,0,statu0)
32> +I(197,jindy197,2020-07-03T18:04:22,0,statu0)
32> +I(198,jindy198,2020-07-03T18:04:22,0,statu0)
32> +I(199,jindy199,2020-07-03T18:04:22,0,statu0)
36> +I(0,jindy0,2020-07-06T20:01:15,3,statu3)
32> -D(0,jindy0,2020-07-06T20:01:15,0,statu0)
30> -D(1,jindy1,2020-11-12T00:00:02,1,statu1)
36> +I(1,jindy1,2020-11-12T00:00:02,3,statu3)
36> +I(2,jindy2,2020-07-03T18:04:22,3,statu3)
30> -D(2,jindy2,2020-07-03T18:04:22,2,statu2)

2. It can also be seen that most of the data was processed on subtask 32 of the join. All 2 million rows were processed on a single subtask!

Also correcting a typo from my earlier mail: ===>
After the snapshot, the data in test_status is correct:
0, jindy0, 2020-07-06T20:01:15 , 0, statu0
1, jindy2, 2020-11-12T00:00:02 , 1, statu1
2, jindy2, 2020-07-03T18:04:22 , 2, statu2




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.11.2 cdc: sink ordering problem for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Post by Jark Wu
If what you changed is the status join field on the test table, then this phenomenon can indeed occur. Your original example did not change the status field.

The essence of the problem is that the join key and your final sink key are not the same, so out-of-order results are possible.
Explicitly shuffling by the sink key before the sink should solve it, for example by adding a deduplicate by sink key node (a rough sketch follows below).
Alternatively, in version 1.12, as long as the sink parallelism differs from that of the preceding node, the framework automatically adds a shuffle by sink key.

As for the join hotspot you mentioned: that is because you have too few distinct status keys, which causes severe data skew.
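A rough sketch of the deduplicate-by-sink-key idea, reusing the table names from this thread. It assumes a processing-time attribute proc_time has been added to the test source (e.g. proc_time AS PROCTIME() in its DDL); also note that whether the planner accepts it depends on the version, since in 1.11 the deduplicate node cannot consume update and delete changes (as another thread in this digest shows):

INSERT INTO test_status
SELECT id, name, `time`, status, status_name
FROM (
  SELECT t.id, t.name, t.`time`, t.status, s.name AS status_name,
         -- partition by the sink primary key so that all changes for one id
         -- go through the same task before reaching the sink
         ROW_NUMBER() OVER (PARTITION BY t.id ORDER BY t.proc_time DESC) AS rownum
  FROM test AS t
  LEFT JOIN status AS s ON t.status = s.id
)
WHERE rownum = 1;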





On Mon, 16 Nov 2020 at 12:03, jindy_liu <286729...@qq.com> wrote:

> In case the image is hard to read,
> let me add the details in text:
> 1. The last few lines of the print output.
>
> 32> +I(191,jindy191,2020-07-03T18:04:22,0,statu0)
> 32> +I(192,jindy192,2020-07-03T18:04:22,0,statu0)
> 32> +I(193,jindy193,2020-07-03T18:04:22,0,statu0)
> 32> +I(194,jindy194,2020-07-03T18:04:22,0,statu0)
> 32> +I(195,jindy195,2020-07-03T18:04:22,0,statu0)
> 32> +I(196,jindy196,2020-07-03T18:04:22,0,statu0)
> 32> +I(197,jindy197,2020-07-03T18:04:22,0,statu0)
> 32> +I(198,jindy198,2020-07-03T18:04:22,0,statu0)
> 32> +I(199,jindy199,2020-07-03T18:04:22,0,statu0)
> 36> +I(0,jindy0,2020-07-06T20:01:15,3,statu3)
> 32> -D(0,jindy0,2020-07-06T20:01:15,0,statu0)
> 30> -D(1,jindy1,2020-11-12T00:00:02,1,statu1)
> 36> +I(1,jindy1,2020-11-12T00:00:02,3,statu3)
> 36> +I(2,jindy2,2020-07-03T18:04:22,3,statu3)
> 30> -D(2,jindy2,2020-07-03T18:04:22,2,statu2)
>
> 2. It can also be seen that most of the data was processed on subtask 32 of the join. All 2 million rows were processed on a single subtask!
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink 1.11.2 cdc: sink ordering problem for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Post by jindy_liu
In case the image is hard to read,
let me add the details in text:
1. The last few lines of the print output.

32> +I(191,jindy191,2020-07-03T18:04:22,0,statu0)
32> +I(192,jindy192,2020-07-03T18:04:22,0,statu0)
32> +I(193,jindy193,2020-07-03T18:04:22,0,statu0)
32> +I(194,jindy194,2020-07-03T18:04:22,0,statu0)
32> +I(195,jindy195,2020-07-03T18:04:22,0,statu0)
32> +I(196,jindy196,2020-07-03T18:04:22,0,statu0)
32> +I(197,jindy197,2020-07-03T18:04:22,0,statu0)
32> +I(198,jindy198,2020-07-03T18:04:22,0,statu0)
32> +I(199,jindy199,2020-07-03T18:04:22,0,statu0)
36> +I(0,jindy0,2020-07-06T20:01:15,3,statu3)
32> -D(0,jindy0,2020-07-06T20:01:15,0,statu0)
30> -D(1,jindy1,2020-11-12T00:00:02,1,statu1)
36> +I(1,jindy1,2020-11-12T00:00:02,3,statu3)
36> +I(2,jindy2,2020-07-03T18:04:22,3,statu3)
30> -D(2,jindy2,2020-07-03T18:04:22,2,statu2)

2. It can also be seen that most of the data was processed on subtask 32 of the join. All 2 million rows were processed on a single subtask!





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.11.2 cdc: sink ordering problem for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Post by jindy_liu
The image is a screenshot in PNG format. I forgot to add the file extension.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: How to read only incremental file content with readFile in Flink

2020-11-15 Post by Hector He
I have tried it. But FileProcessingMode.PROCESS_CONTINUOUSLY
is not incremental reading: whenever the file content changes, the whole file is re-read from scratch, which is different from the previous incremental behaviour of FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED,
and no incremental reading mode is provided. In the words of the official documentation, this can break the "exactly-once" semantics.


This was discussed in an earlier thread on this list, see: http://apache-flink.147419.n8.nabble.com/Flink-readFile-tt142.html
The official documentation also explains it: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sources

IMPORTANT NOTES:

  1.  If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when 
a file is modified, its contents are re-processed entirely. This can break the 
“exactly-once” semantics, as appending data at the end of a file will lead to 
all its contents being re-processed.


val stream = env.readFile(textInputFormat, inputPath, 
FileProcessingMode.PROCESS_CONTINUOUSLY, 10)

From: Jark Wu
Sent: Nov 16, 2020, 10:36
To: user-zh
Subject: Re: How to read only incremental file content with readFile in Flink

Have you tried FileProcessingMode.PROCESS_CONTINUOUSLY?

On Mon, 16 Nov 2020 at 09:23, hepingtao 
wrote:

> I ran into a similar requirement: I need to read log file contents incrementally. In the end I found that the deprecated method readFileStream can do it; the code is as follows:
>
> val stream = env.readFileStream(inputPath, 10,
> FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>
> The source code says it can be replaced by readFile(FileInputFormat, String, FileProcessingMode, long),
> but in fact FileProcessingMode has no
> watchType equivalent to PROCESS_ONLY_APPENDED. Once the readFileStream source
> function is eventually removed, if no built-in replacement is provided, I will have to implement it myself.
>
> So my question is: since there used to be a way to read incrementally, why was it suddenly deprecated without offering a replacement? Is this kind of requirement unreasonable? I really don't understand this, and I hope someone can explain. Thanks!
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: flink 1.11.2 cdc: sink ordering problem for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Post by jindy_liu
I tried again, and the problem also occurs without restarting the job; it appears as soon as the parallelism is greater than 1!

1. Directly in the sql-client, started with /data/home/jindyliu/flink-demo/flink-1.11.2/bin/sql-client.sh
embedded -d 
/data/home/jindyliu/flink-demo/flink-1.11.2//conf/sql-client-defaults.yaml
with the parallelism in sql-client-defaults.yaml set to 40.

The data is the same: the test table has 2 million rows and the status table has 11 rows.

Source table test:
CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test'
)
Source table status:
CREATE TABLE status (
`id` INT,
`name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (  
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'status'
);

// output
CREATE TABLE test_status_print (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
`status_name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'print'
);

// join
INSERT into test_status_print 
SELECT t.*, s.name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;

Steps to reproduce: after the mysql-cdc snapshot finishes, change the status field in the test table and the ordering problem appears. I printed the output with print.
After the snapshot, the data in test_status is correct:
0, jindy0, 2020-07-06T20:01:15 , 0, statu0
1, jindy2, 2020-11-12T00:00:02 , 1, statu2
2, jindy2, 2020-07-03T18:04:22 , 2, statu3

After the snapshot, change the status value of the rows with id = 0, 1, 2 in the MySQL table to 3. Expected result:
0, jindy0, 2020-07-06T20:01:15 , 3, statu3
1, jindy2, 2020-11-12T00:00:02 , 3, statu3
2, jindy2, 2020-07-03T18:04:22 , 3, statu3
But the output order is wrong, which causes the two records with id = 0 and 2 to be missing from the test_status table.

1. The print output:
 


ps:
Another problem I observed: when the source data is sent into the join operator, there seems to be hardly any hash distribution; basically everything is crowded onto one node for processing. Why is that? It feels like the join operator will become a bottleneck!!! And back pressure will kick in easily?!
 

@jark, could you take a look? My version is Version: 1.11.2 Commit: fe36135 @
2020-09-09T16:19:03+02:00, downloaded from the official website.







--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql hive streaming bug

2020-11-15 Post by Excalibur
hive table read:
      
      blinkStreamTableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);
      Table table = blinkStreamTableEnv.sqlQuery("SELECT * FROM test.table_config /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.monitor-interval' = '30 min') */");


The corresponding Flink source code (ContinuousFileMonitoringFunction.initializeState):

// given that the parallelism of the function is 1, we can only have 1 or 0 retrieved items.
// the 0 is for the case that we are migrating from a previous Flink version.
Preconditions.checkArgument(retrievedStates.size() <= 1,
        getClass().getSimpleName() + " retrieved invalid state.");

if (retrievedStates.size() == 1 && globalModificationTime != Long.MIN_VALUE) {
        // this is the case where we have both legacy and new state.
        // The two should be mutually exclusive for the operator, thus we throw the exception.
        throw new IllegalArgumentException(
                "The " + getClass().getSimpleName() + " has already restored from a previous Flink version.");

} else if (retrievedStates.size() == 1) {
        this.globalModificationTime = retrievedStates.get(0);
        if (LOG.isDebugEnabled()) {
                LOG.debug("{} retrieved a global mod time of {}.",
                        getClass().getSimpleName(), globalModificationTime);
        }
}



------------------ Original message ------------------
From: 
   "Excalibur"  
  
<972263...@qq.com>;
Sent: Monday, Nov 16, 2020, 10:54
To: "user-zh"

flink sql hive streaming bug

2020-11-15 Post by Excalibur
Flink 1.11.2.
The job fails with java.lang.IllegalArgumentException:
The ContinuousFileMonitoringFunction has already restored from a
previous Flink version.
    at 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:176)
    at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
    at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
    at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
??



??

flink sql hive streaming bug

2020-11-15 Post by Excalibur
Flink 1.11.2.
The job fails with java.lang.IllegalArgumentException: The ContinuousFileMonitoringFunction has
 already restored from a previous Flink version.
    at 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:176)
    at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
    at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
    at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
    at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)

flink sql hive streaming bug

2020-11-15 Post by Excalibur
Flink 1.11.2.
The job fails with java.lang.IllegalArgumentException: The 
ContinuousFileMonitoringFunction has already restored from a previous Flink 
version.
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:176)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)


??



??

Re: How to read only incremental file content with readFile in Flink

2020-11-15 Post by Jark Wu
Have you tried FileProcessingMode.PROCESS_CONTINUOUSLY?

On Mon, 16 Nov 2020 at 09:23, hepingtao 
wrote:

> I ran into a similar requirement: I need to read log file contents incrementally. In the end I found that the deprecated method readFileStream can do it; the code is as follows:
>
> val stream = env.readFileStream(inputPath, 10,
> FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>
> The source code says it can be replaced by readFile(FileInputFormat, String, FileProcessingMode, long),
> but in fact FileProcessingMode has no
> watchType equivalent to PROCESS_ONLY_APPENDED. Once the readFileStream source
> function is eventually removed, if no built-in replacement is provided, I will have to implement it myself.
>
> So my question is: since there used to be a way to read incrementally, why was it suddenly deprecated without offering a replacement? Is this kind of requirement unreasonable? I really don't understand this, and I hope someone can explain. Thanks!
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink cdc multi-table join has very high processing latency

2020-11-15 Post by Jark Wu
The bottleneck should be the two NOT EXISTS clauses; NOT EXISTS can currently only run with a single parallelism, so its performance cannot be scaled out horizontally.
You could consider replacing the NOT EXISTS with another approach, such as a UDF or a dimension (lookup) table join; see the sketch below.
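A rough sketch of the lookup-join variant for one of the two anti-joins. It assumes internal_staff is re-declared as a lookup-capable table (for example with the 'jdbc' connector) and that right_info exposes a processing-time attribute proc_time (e.g. proc_time AS PROCTIME() in its DDL); both are assumptions, not part of the original job:

-- replaces: not exists (select 1 from internal_staff gis where gis.user_id = ri.user_id)
SELECT ri.sub_clazz_number, ri.order_number
FROM right_info AS ri
LEFT JOIN internal_staff FOR SYSTEM_TIME AS OF ri.proc_time AS gis
  ON gis.user_id = ri.user_id
WHERE gis.user_id IS NULL;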

Best,
Jark

On Mon, 16 Nov 2020 at 10:05, 丁浩浩 <18579099...@163.com> wrote:

> select
> ri.sub_clazz_number,
> prcrs.rounds,
> count(*) as num
> from
> subclazz gs
> JOIN
> (SELECT gce.number, min( gce.extension_value ) AS grade FROM
> course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4 GROUP
> BY gce.number) AS temp
> ON
> temp.number = gs.course_number AND temp.grade>30
> JOIN
> right_info ri
> ON
> gs.number = ri.sub_clazz_number
> join
> wide_subclazz ws
> on
> ws.number = ri.sub_clazz_number
> join
> course gc
> on
> gc.number = ws.course_number and gc.course_category_id in (30,40)
> left join
> performance_regular_can_renewal_sign prcrs
> on prcrs.order_number = ri.order_number and prcrs.rounds in (1,2)
> where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null
> and not exists (select 1 from internal_staff gis where gis.user_id =
> ri.user_id)
> and not exists (select 1 from clazz_extension ce where ws.clazz_number =
> ce.number
> and ce.extension_type = 3 and ce.isdel = 0
> and ce.extension_value in (1,3,4,7,8,11))
> group by ri.sub_clazz_number, prcrs.rounds
> That is the SQL code.
> The bottleneck is on all the join nodes; the nodes whose checkpoints never complete are always the join nodes.
>
> > On Nov 14, 2020, at 5:53 PM, Jark Wu wrote:
> >
> > Can you show your code? Are you using the dimension-table join syntax (FOR SYSTEM_TIME AS OF)?
> > We need to pin down which node is actually slow.
> >
> > On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:
> >
> >> I use flink cdc to run join queries across MySQL tables, and I found that Flink can only join two tables first and then join the result with the next table, which makes the final write-to-database latency very high.
> >> Is there a good optimization approach to alleviate this problem?
>
>
>


Re: About deduplication

2020-11-15 Post by Jark Wu
About 2, you have it backwards. Performance-wise, deduplicate with first row is better than first_value, because deduplicate
with first row only stores the key in state; the value is represented by a single empty byte.

On Sun, 15 Nov 2020 at 21:47, macia kk  wrote:

> Thanks for the reply, Jark. I have been following your blog and have learned a lot from it.
>
> About 2: is first_value better performance-wise, because it only stores the key and the corresponding value, while the window function keeps the whole row?
>
>
>
> Jark Wu wrote on Sun, Nov 15, 2020 at 8:45 PM:
>
> > Two main differences:
> > 1. Semantically, deduplicate is whole-row deduplication, while first_value and last_value are per-column. For example, deduplicate
> > with last row keeps the last row; if the last row contains null values, those are kept too. last_value, in contrast, keeps the column's last non-null value.
> > 2. Performance-wise, deduplicate is better; for first row, for example, only the key's state information is stored.
> >
> > Best,
> > Jark
> >
> > On Sun, 15 Nov 2020 at 19:23, macia kk  wrote:
> >
> > > Hi everyone:
> > >
> > >   I see that the recommended deduplication approach in the documentation uses a window (OVER) function
> > > <
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
> > > >
> > >
> > > SELECT [column_list]FROM (
> > >SELECT [column_list],
> > >  ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
> > >ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
> > >FROM table_name)WHERE rownum <= N [AND conditions]
> > >
> > >
> > > But I see that Flink SQL also has first_value and last_value, which can achieve the same goal.
> > > What are the differences between the two, especially with regard to watermarks and state management?
> > >
> >
>


flink 1.11.2 error at runtime

2020-11-15 Post by 曹武
Flink on YARN mode; the job had been running in YARN for two days and then failed with the following error:
org.apache.flink.util.FlinkException: JobManager responsible for
fa0e8f776be3b5cd6573e1922da67c1f lost the leadership.
at
org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1415)
[DataHub-2.0-SNAPSHOT.jar:na]
at
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1300(TaskExecutor.java:173)
[DataHub-2.0-SNAPSHOT.jar:na]
at
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:1852)
[DataHub-2.0-SNAPSHOT.jar:na]
at java.util.Optional.ifPresent(Optional.java:159) ~[na:1.8.0_181]
at
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:1851)
[DataHub-2.0-SNAPSHOT.jar:na]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
~[DataHub-2.0-SNAPSHOT.jar:na]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
~[DataHub-2.0-SNAPSHOT.jar:na]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
~[DataHub-2.0-SNAPSHOT.jar:na]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
~[DataHub-2.0-SNAPSHOT.jar:na]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
~[DataHub-2.0-SNAPSHOT.jar:na]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[DataHub-2.0-SNAPSHOT.jar:na]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[DataHub-2.0-SNAPSHOT.jar:na]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[DataHub-2.0-SNAPSHOT.jar:na]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[DataHub-2.0-SNAPSHOT.jar:na]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[DataHub-2.0-SNAPSHOT.jar:na]
Caused by: java.lang.Exception: Job leader for job id
fa0e8f776be3b5cd6573e1922da67c1f lost leadership.
... 24 common frames omitted





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink cdc multi-table join has very high processing latency

2020-11-15 Post by 丁浩浩
select 
ri.sub_clazz_number, 
prcrs.rounds, 
count(*) as num 
from 
subclazz gs 
JOIN 
(SELECT gce.number, min( gce.extension_value ) AS grade FROM 
course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4 GROUP BY 
gce.number) AS temp 
ON 
temp.number = gs.course_number AND temp.grade>30 
JOIN 
right_info ri
ON  
gs.number = ri.sub_clazz_number 
join
wide_subclazz ws  
on
ws.number = ri.sub_clazz_number 
join 
course gc 
on 
gc.number = ws.course_number and gc.course_category_id in (30,40)  
left join 
performance_regular_can_renewal_sign prcrs 
on prcrs.order_number = ri.order_number and prcrs.rounds in (1,2) 
where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null 
and not exists (select 1 from internal_staff gis where gis.user_id = ri.user_id)
and not exists (select 1 from clazz_extension ce where ws.clazz_number = 
ce.number  
and ce.extension_type = 3 and ce.isdel = 0  
and ce.extension_value in (1,3,4,7,8,11))  
group by ri.sub_clazz_number, prcrs.rounds 
That is the SQL code.
The bottleneck is on all the join nodes; the nodes whose checkpoints never complete are always the join nodes.

> On Nov 14, 2020, at 5:53 PM, Jark Wu wrote:
> 
> Can you show your code? Are you using the dimension-table join syntax (FOR SYSTEM_TIME AS OF)?
> We need to pin down which node is actually slow.
> 
> On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:
> 
>> I use flink cdc to run join queries across MySQL tables, and I found that Flink can only join two tables first and then join the result with the next table, which makes the final write-to-database latency very high.
>> Is there a good optimization approach to alleviate this problem?




Flink job dynamic compilation exception

2020-11-15 Post by iwodetianna
Hi,
I ran into a problem: it works fine when debugging locally, but throws an error once deployed to the server.

Environment:
flink: 1.10.1
scala: 2.11

The business requirement is for Flink to generate rules dynamically. I use a template file to generate the complete rule from configuration, then compile it dynamically and call it inside the Pattern's filter function.

The job jar is placed in Flink's lib directory.
Could someone help analyze the cause? Thanks!!!

Error message: 
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
load user class: e1MyTest
ClassLoader info: URL ClassLoader:
file: 
'/tmp/blobStore-d31dca76-0497-40a3-8e86-9df357cf90eb/job_2aa246bfd0d9e56ff448a8ecd1f57316/blob_p-07a73cda0b33b2be6e2db11c35dd37629fbe23f7-1f123a4f3e2d59eeddc0062b149e5abd'
 (valid JAR)
Class not resolvable through given classloader.
 at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:266)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:115)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:432)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: e1MyTest
 at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:348)
 at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
 at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
 at java.util.HashMap.readObject(HashMap.java:1412)
 at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
 at java.util.ArrayList.readObject(ArrayList.java:797)
 at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
 at java.util.ArrayList.readObject(ArrayList.java:797)
 at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)

Why does broadcast hash join lose data in batch mode?

2020-11-15 Post by


Re: How to read only incremental file content with readFile in Flink

2020-11-15 Post by hepingtao
I ran into a similar requirement: I need to read log file contents incrementally. In the end I found that the deprecated method readFileStream can do it; the code is as follows:

val stream = env.readFileStream(inputPath, 10,
FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)

The source code says it can be replaced by readFile(FileInputFormat, String, FileProcessingMode, long),
but in fact FileProcessingMode has no
watchType equivalent to PROCESS_ONLY_APPENDED. Once the readFileStream source
function is eventually removed, if no built-in replacement is provided, I will have to implement it myself.

So my question is: since there used to be a way to read incrementally, why was it suddenly deprecated without offering a replacement? Is this kind of requirement unreasonable? I really don't understand this, and I hope someone can explain. Thanks!



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: About deduplication

2020-11-15 Post by macia kk
Thanks for the reply, Jark. I have been following your blog and have learned a lot from it.

About 2: is first_value better performance-wise, because it only stores the key and the corresponding value, while the window function keeps the whole row?



Jark Wu wrote on Sun, Nov 15, 2020 at 8:45 PM:

> Two main differences:
> 1. Semantically, deduplicate is whole-row deduplication, while first_value and last_value are per-column. For example, deduplicate
> with last row keeps the last row; if the last row contains null values, those are kept too. last_value, in contrast, keeps the column's last non-null value.
> 2. Performance-wise, deduplicate is better; for first row, for example, only the key's state information is stored.
>
> Best,
> Jark
>
> On Sun, 15 Nov 2020 at 19:23, macia kk  wrote:
>
> > Hi everyone:
> >
> >   I see that the recommended deduplication approach in the documentation uses a window (OVER) function
> > <
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
> > >
> >
> > SELECT [column_list]FROM (
> >SELECT [column_list],
> >  ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
> >ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
> >FROM table_name)WHERE rownum <= N [AND conditions]
> >
> >
> > But I see that Flink SQL also has first_value and last_value, which can achieve the same goal.
> > What are the differences between the two, especially with regard to watermarks and state management?
> >
>


The JobManager and TaskManagers communicate via RPC; why do TaskManagers communicate with each other via Netty?

2020-11-15 Post by Jeff
As in the subject! Couldn't the communication between the JobManager and the TaskManagers also use Netty?

Re: About deduplication

2020-11-15 Post by Jark Wu
Two main differences:
1. Semantically, deduplicate is whole-row deduplication, while first_value and last_value are per-column. For example, deduplicate
with last row keeps the last row; if the last row contains null values, those are kept too. last_value, in contrast, keeps the column's last non-null value.
2. Performance-wise, deduplicate is better; for first row, for example, only the key's state information is stored. A sketch of the two forms follows.
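For illustration, the two forms side by side (the table name t and the columns id, name, item, proc_time are made up; proc_time stands for a time attribute, which the deduplicate pattern requires in its ORDER BY):

-- whole-row deduplication: keep the first row per id
SELECT id, name, item
FROM (
  SELECT id, name, item,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY proc_time ASC) AS rownum
  FROM t
) WHERE rownum = 1;

-- column-level "deduplication": each column independently keeps its first non-null value per id
SELECT id, FIRST_VALUE(name) AS name, FIRST_VALUE(item) AS item
FROM t
GROUP BY id;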

Best,
Jark

On Sun, 15 Nov 2020 at 19:23, macia kk  wrote:

> Hi everyone:
>
>   I see that the recommended deduplication approach in the documentation uses a window (OVER) function
> <
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
> >
>
> SELECT [column_list]FROM (
>SELECT [column_list],
>  ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
>ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
>FROM table_name)WHERE rownum <= N [AND conditions]
>
>
> But I see that Flink SQL also has first_value and last_value, which can achieve the same goal.
> What are the differences between the two, especially with regard to watermarks and state management?
>


About deduplication

2020-11-15 Post by macia kk
Hi everyone:

  I see that the recommended deduplication approach in the documentation uses a window (OVER) function


SELECT [column_list]FROM (
   SELECT [column_list],
 ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
   ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name)WHERE rownum <= N [AND conditions]


But I see that Flink SQL also has first_value and last_value, which can achieve the same goal.
What are the differences between the two, especially with regard to watermarks and state management?