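Re: How to resolve the version conflict between Flink ORC and Hive 2.1.1

2020-08-20 Thread abc15606
I tried that and the result is the same; under the hood it also just writes files.

Sent from my iPhone

> On Aug 21, 2020, at 13:35, Jingsong Li wrote:
> 
> Yes.
> 
>> On Fri, Aug 21, 2020 at 1:30 PM wrote:
>> 
>> What do you mean by "the Flink Hive table approach"? Hive streaming?
>> 
>> Sent from my iPhone
>> 
>>> On Aug 21, 2020, at 13:27, Jingsong Li wrote:
>>> 
>>> The Flink filesystem connector and the DataStream API use flink-orc, which is built on a relatively new ORC version, so older ORC versions cannot read its output.
>>> 
>>> I suggest writing ORC through a Flink Hive table instead.
>>> 
>>>> On Fri, Aug 21, 2020 at 12:25 PM wrote:
>>>> 
>>>> Flink pins its own ORC version and does not use Hive's, so Hive cannot read the data once it is written.
>>>> 
>>>> Sent from my iPhone
>>>> 
>>>>> On Aug 21, 2020, at 12:15, Jingsong Li wrote:
>>>>> 
>>>>> When writing to a Hive table, Flink SQL uses the ORC version that matches the Hive version, so in theory the result is the same as writing ORC with Hive SQL.
>>>>> Are you sure that data written by this version of Hive itself can be read?
>>>>> 
>>>>>> On Fri, Aug 21, 2020 at 10:17 AM wrote:
>>>>>> 
>>>>>> Versions in use: Flink 1.11 and Hive 2.1.1.
>>>>>> After writing ORC with Flink SQL, I created an external table and found that it cannot be read properly. How can I solve this?
>>>>> 
>>>>> --
>>>>> Best, Jingsong Lee
>>> 
>>> --
>>> Best, Jingsong Lee
> 
> -- 
> Best, Jingsong Lee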



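Re: How to resolve the version conflict between Flink ORC and Hive 2.1.1

2020-08-20 Thread Jingsong Li
Yes.

On Fri, Aug 21, 2020 at 1:30 PM wrote:

> What do you mean by "the Flink Hive table approach"? Hive streaming?
>
> Sent from my iPhone
>
> > On Aug 21, 2020, at 13:27, Jingsong Li wrote:
> >
> > The Flink filesystem connector and the DataStream API use flink-orc, which is built on a relatively new ORC version, so older ORC versions cannot read its output.
> >
> > I suggest writing ORC through a Flink Hive table instead.
> >
> >> On Fri, Aug 21, 2020 at 12:25 PM wrote:
> >>
> >> Flink pins its own ORC version and does not use Hive's, so Hive cannot read the data once it is written.
> >>
> >> Sent from my iPhone
> >>
> >>> On Aug 21, 2020, at 12:15, Jingsong Li wrote:
> >>>
> >>> When writing to a Hive table, Flink SQL uses the ORC version that matches the Hive version, so in theory the result is the same as writing ORC with Hive SQL.
> >>> Are you sure that data written by this version of Hive itself can be read?
> >>>
> >>>> On Fri, Aug 21, 2020 at 10:17 AM wrote:
> >>>>
> >>>> Versions in use: Flink 1.11 and Hive 2.1.1.
> >>>> After writing ORC with Flink SQL, I created an external table and found that it cannot be read properly. How can I solve this?
> >>>
> >>> --
> >>> Best, Jingsong Lee
> >
> > --
> > Best, Jingsong Lee

-- 
Best, Jingsong Lee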


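Re: How to resolve the version conflict between Flink ORC and Hive 2.1.1

2020-08-20 Thread abc15606
What do you mean by "the Flink Hive table approach"? Hive streaming?

Sent from my iPhone

> On Aug 21, 2020, at 13:27, Jingsong Li wrote:
> 
> The Flink filesystem connector and the DataStream API use flink-orc, which is built on a relatively new ORC version, so older ORC versions cannot read its output.
> 
> I suggest writing ORC through a Flink Hive table instead.
> 
>> On Fri, Aug 21, 2020 at 12:25 PM wrote:
>> 
>> Flink pins its own ORC version and does not use Hive's, so Hive cannot read the data once it is written.
>> 
>> Sent from my iPhone
>> 
>>> On Aug 21, 2020, at 12:15, Jingsong Li wrote:
>>> 
>>> When writing to a Hive table, Flink SQL uses the ORC version that matches the Hive version, so in theory the result is the same as writing ORC with Hive SQL.
>>> Are you sure that data written by this version of Hive itself can be read?
>>> 
>>>> On Fri, Aug 21, 2020 at 10:17 AM wrote:
>>>> 
>>>> Versions in use: Flink 1.11 and Hive 2.1.1.
>>>> After writing ORC with Flink SQL, I created an external table and found that it cannot be read properly. How can I solve this?
>>> 
>>> --
>>> Best, Jingsong Lee
> 
> -- 
> Best, Jingsong Lee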



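Re: How to resolve the version conflict between Flink ORC and Hive 2.1.1

2020-08-20 Thread Jingsong Li
The Flink filesystem connector and the DataStream API use flink-orc, which is built on a relatively new ORC version, so older ORC versions cannot read its output.

I suggest writing ORC through a Flink Hive table instead.

On Fri, Aug 21, 2020 at 12:25 PM wrote:

> Flink pins its own ORC version and does not use Hive's, so Hive cannot read the data once it is written.
>
> Sent from my iPhone
>
> > On Aug 21, 2020, at 12:15, Jingsong Li wrote:
> >
> > When writing to a Hive table, Flink SQL uses the ORC version that matches the Hive version, so in theory the result is the same as writing ORC with Hive SQL.
> > Are you sure that data written by this version of Hive itself can be read?
> >
> >> On Fri, Aug 21, 2020 at 10:17 AM wrote:
> >>
> >> Versions in use: Flink 1.11 and Hive 2.1.1.
> >> After writing ORC with Flink SQL, I created an external table and found that it cannot be read properly. How can I solve this?
> >
> > --
> > Best, Jingsong Lee

-- 
Best, Jingsong Lee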


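Re: How to resolve the version conflict between Flink ORC and Hive 2.1.1

2020-08-20 Thread abc15606
Flink pins its own ORC version and does not use Hive's, so Hive cannot read the data once it is written.

Sent from my iPhone

> On Aug 21, 2020, at 12:15, Jingsong Li wrote:
> 
> When writing to a Hive table, Flink SQL uses the ORC version that matches the Hive version, so in theory the result is the same as writing ORC with Hive SQL.
> Are you sure that data written by this version of Hive itself can be read?
> 
>> On Fri, Aug 21, 2020 at 10:17 AM wrote:
>> 
>> Versions in use: Flink 1.11 and Hive 2.1.1.
>> After writing ORC with Flink SQL, I created an external table and found that it cannot be read properly. How can I solve this?
> 
> -- 
> Best, Jingsong Lee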


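Re: How to resolve the version conflict between Flink ORC and Hive 2.1.1

2020-08-20 Thread Jingsong Li
When writing to a Hive table, Flink SQL uses the ORC version that matches the Hive version, so in theory the result is the same as writing ORC with Hive SQL.
Are you sure that data written by this version of Hive itself can be read?

On Fri, Aug 21, 2020 at 10:17 AM wrote:

> Versions in use: Flink 1.11 and Hive 2.1.1.
> After writing ORC with Flink SQL, I created an external table and found that it cannot be read properly. How can I solve this?

-- 
Best, Jingsong Lee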


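Re: The Hive module in Flink 1.10 lacks functions such as plus and greaterThan

2020-08-20 Thread faaron zheng
Thanks, it turns out I was using it the wrong way.

On Aug 21, 2020, at 11:17, Rui Li wrote:

> Are you using only the Hive module? The recommended approach is to load both the Hive module and the core module; when a function is resolved, each module is searched in the order in which the modules were loaded.
>
> On Fri, Aug 21, 2020 at 11:06 AM faaron zheng wrote:
>
> > Hi all, while using the Flink 1.10 sql-client I found that with the Hive module some built-in functions of the core module, such as plus and greaterThan, are missing. As a result, the same SQL runs successfully with the core module but fails with the Hive module, for example when using row_number() over(). What is the reason?
>
> -- 
> Best regards!
> Rui Li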

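Re: Can table data be read when Hive is used only for metadata management?

2020-08-20 Thread Rui Li
The Hive catalog only manages metadata; reading the actual data is not done by the Hive catalog. Which tables you can read therefore depends on whether Flink has a matching connector. According to the documentation, the JDBC connector does not yet support Oracle.

On Fri, Aug 21, 2020 at 11:16 AM Bruce wrote:

> A question for the experts:
>
> Our Flink platform uses Hive only for metadata management, and the actual data is not stored on HDFS. Can the actual data be read through HiveCatalog?
>
> For example, if Hive stores the metadata of an Oracle table t_log, can Flink read the actual rows of t_log through HiveCatalog?
>
> Sent from my iPhone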



-- 
Best regards!
Rui Li


Re: Flink 1.11 startup problem

2020-08-20 Thread Yang Wang
This looks like an error from the YARN NodeManager. Does it happen every time you start the job, or did it happen only once?

If it was a one-off, the NodeManager most likely happened to restart just as Flink called stopContainer.


Best,
Yang

酷酷的浑蛋 wrote on Thu, Aug 20, 2020 at 10:59 AM:

> After startup, Flink 1.11 reports this error and then the job dies on its own. There are no other errors, and nothing points to my code.
>
> org.apache.hadoop.yarn.exceptions.YarnException: Container container_1590424616102_807478_01_02 is not handled by this NodeManager
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_191]
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_191]
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_191]
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_191]
> at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168) ~[release-sql-flink-1.11-v2.2.1.jar:?]
> at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106) ~[release-sql-flink-1.11-v2.2.1.jar:?]
> at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.stopContainerInternal(NMClientImpl.java:297) ~[release-sql-flink-1.11-v2.2.1.jar:?]
> at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.stopContainer(NMClientImpl.java:247) ~[release-sql-flink-1.11-v2.2.1.jar:?]
> at org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl$StatefulContainer$StopContainerTransition.transition(NMClientAsyncImpl.java:422) [release-sql-flink-1.11-v2.2.1.jar:?]
> at org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl$StatefulContainer$StopContainerTransition.transition(NMClientAsyncImpl.java:413) [release-sql-flink-1.11-v2.2.1.jar:?]
> at org.apache.hadoop.yarn.state.StateMachineFactory$MultipleInternalArc.doTransition(StateMachineFactory.java:385) [release-sql-flink-1.11-v2.2.1.jar:?]
> at org.apache.hadoop.yarn.state.StateMachineFactory.doTransition(StateMachineFactory.java:302) [release-sql-flink-1.11-v2.2.1.jar:?]
> at org.apache.hadoop.yarn.state.StateMachineFactory.access$300(StateMachineFactory.java:46) [release-sql-flink-1.11-v2.2.1.jar:?]
> at org.apache.hadoop.yarn.state.StateMachineFactory$InternalStateMachine.doTransition(StateMachineFactory.java:448) [release-sql-flink-1.11-v2.2.1.jar:?]
> at org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl$StatefulContainer.handle(NMClientAsyncImpl.java:498) [release-sql-flink-1.11-v2.2.1.jar:?]
> at org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl$ContainerEventProcessor.run(NMClientAsyncImpl.java:557) [release-sql-flink-1.11-v2.2.1.jar:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_191]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_191]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]
>


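Re: The Hive module in Flink 1.10 lacks functions such as plus and greaterThan

2020-08-20 Thread Rui Li
Are you using only the Hive module? The recommended approach is to load both the Hive module and the core
module; when a function is resolved, each module is searched in the order in which the modules were loaded.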
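
The ordered lookup described above can be pictured with a small plain-Java model. This is only a toy sketch and not Flink's actual module manager: the class `ModuleLookupSketch`, its methods, and the function sets are hypothetical and exist purely to show why a function such as plus cannot be resolved when only the Hive module is loaded, and why it resolves once the core module is loaded as well.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Toy model of ordered module lookup; NOT Flink's actual module manager. */
class ModuleLookupSketch {
    // Module name -> function names it provides, kept in load order.
    private final Map<String, Set<String>> modules = new LinkedHashMap<>();

    /** Registers a module; modules loaded earlier are searched first. */
    ModuleLookupSketch load(String moduleName, Set<String> functions) {
        modules.put(moduleName, functions);
        return this;
    }

    /** Returns the first loaded module that defines the function, if any. */
    Optional<String> resolve(String functionName) {
        for (Map.Entry<String, Set<String>> module : modules.entrySet()) {
            if (module.getValue().contains(functionName)) {
                return Optional.of(module.getKey());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Illustrative placeholder function sets, not real module contents.
        Set<String> hiveFunctions = Set.of("get_json_object");
        Set<String> coreFunctions = Set.of("plus", "greaterThan");

        ModuleLookupSketch hiveOnly = new ModuleLookupSketch().load("hive", hiveFunctions);
        System.out.println("hive only, plus resolved: " + hiveOnly.resolve("plus").isPresent());

        ModuleLookupSketch both = new ModuleLookupSketch()
                .load("hive", hiveFunctions)
                .load("core", coreFunctions);
        System.out.println("hive + core, plus found in: " + both.resolve("plus").orElse("unresolved"));
    }
}
```

In this model the load order only matters when two modules define the same function name; the first module loaded wins.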

On Fri, Aug 21, 2020 at 11:06 AM faaron zheng  wrote:

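> Hi all, while using the Flink 1.10 sql-client I found that with the Hive module some built-in
> functions of the core module, such as plus and greaterThan, are missing. As a result, the same SQL
> runs successfully with the core module but fails with the Hive module, for example when using
> row_number() over(). What is the reason?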



-- 
Best regards!
Rui Li


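Question about TaskManagers sharing a common JDBC connection pool

2020-08-20 Thread Bruce
A question for the experts:
Flink on YARN allocated 2 TaskManagers based on the parallelism. Our base project contains a JDBC connection pool with a maximum of 50 connections, and the logic of one operator uses that pool for business processing.

Do the 2 TaskManagers share those 50 connections, or does each TaskManager get up to 50, for 50 * 2 = 100 connections in total?

Sent from my iPhone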

1.11??????????TableEnvironment.executeSql("insert into ...")??job??????????????

2020-08-20 Thread Asahi Lee
??
  insert 
into??job


??
EnvironmentSettings bbSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings);

String sourceDDL = "CREATE TABLE datagen (  " +
" f_random INT,  " +
" f_random_str STRING,  " +
" ts AS localtimestamp,  " +
" WATERMARK FOR ts AS ts  " +
") WITH (  " +
" 'connector' = 'datagen',  " +
" 'rows-per-second'='10',  " +
" 'fields.f_random.min'='1',  " +
" 'fields.f_random.max'='5',  " +
" 'fields.f_random_str.length'='10'  " +
")";

bsTableEnv.executeSql(sourceDDL);
Table datagen = bsTableEnv.from("datagen");

System.out.println(datagen.getSchema());

String sinkDDL = "CREATE TABLE print_table (" +
" f_random int," +
" c_val bigint, " +
" wStart TIMESTAMP(3) " +
") WITH ('connector' = 'print') ";
bsTableEnv.executeSql(sinkDDL);

System.out.println(bsTableEnv.from("print_table").getSchema());

Table table = bsTableEnv.sqlQuery("select f_random, count(f_random_str), " +
"TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by " +
"TUMBLE(ts, INTERVAL '5' second), f_random");
bsTableEnv.executeSql("insert into print_table select * from " + table);

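Can table data be read when Hive is used only for metadata management?

2020-08-20 Thread Bruce
A question for the experts:

Our Flink platform uses Hive only for metadata management, and the actual data is not stored on HDFS. Can the actual data be read through HiveCatalog?

For example, if Hive stores the metadata of an Oracle table t_log, can Flink read the actual rows of t_log through HiveCatalog?

Sent from my iPhone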

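The Hive module in Flink 1.10 lacks functions such as plus and greaterThan

2020-08-20 Thread faaron zheng
Hi all, while using the Flink 1.10 sql-client I found that with the Hive module some built-in
functions of the core module, such as plus and greaterThan, are missing. As a result, the same SQL
runs successfully with the core module but fails with the Hive module, for example when using
row_number() over(). What is the reason?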

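Case-insensitive key support for Map-typed fields in Flink SQL

2020-08-20 Thread zilong xiao
As the subject says: in our business data, a Map-typed field sometimes contains keys that mean the same thing but differ in case. For example, a Map field my_map may contain the keys
aB, ab, and Ab. Can lookups in SQL be made case-insensitive, so that my_map['ab'] returns the values of all matching keys?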
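
To make the requested behavior concrete, here is a minimal plain-Java sketch of the lookup itself. It assumes the logic would live in something like a user-defined function; that wiring is not shown, and the thread does not establish whether Flink SQL offers this natively. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch of a case-insensitive map lookup; names are illustrative only. */
class CaseInsensitiveMapLookup {

    /** Collects the values of every key that equals the given key ignoring case. */
    static <V> List<V> valuesIgnoreCase(Map<String, V> map, String key) {
        List<V> matches = new ArrayList<>();
        for (Map.Entry<String, V> entry : map.entrySet()) {
            String candidate = entry.getKey();
            if (candidate != null && candidate.equalsIgnoreCase(key)) {
                matches.add(entry.getValue());
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order so the result order is predictable.
        Map<String, Integer> myMap = new LinkedHashMap<>();
        myMap.put("aB", 1);
        myMap.put("ab", 2);
        myMap.put("Ab", 3);
        myMap.put("other", 4);
        System.out.println(valuesIgnoreCase(myMap, "ab"));
    }
}
```

Returning a list rather than a single value reflects the question's requirement that all keys differing only in case contribute a value.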


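Re: Flink startup question

2020-08-20 Thread zilong xiao
-yt should accept only a single directory. What problem are you trying to solve?

guaishushu1...@163.com wrote on Thu, Aug 20, 2020 at 8:40 PM:

> Does anyone know whether Flink's -yt option really does not support multiple directories, and whether it can only upload .jar files to the cluster?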
>
>
>
> guaishushu1...@163.com
>


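Re: State serialization question

2020-08-20 Thread shizk233
Sorry, I did not express myself clearly. ListState> was only an example, not the state my application actually uses.
In practice I want to use MapState to hold a series of special counter Maps, that is, MapState>, mainly to implement a pseudo-window in which the key is the window start time.

What I mainly want to know is: when declaring the type information in the MapStateDescriptor, should I declare the inner Map as the concrete HashMap type rather than as Map?

Yun Tang wrote on Fri, Aug 21, 2020 at 12:13 AM:

> Hi
>
> If you want to store Strings in the list, that is, each element of the list is a String, the state should be declared as
> ListState<String> rather than
> ListState<List<String>>; the latter means a list in which every element is itself a list.
>
> ListState itself is not a Java collection, so the distinction between ArrayList and LinkedList does not apply.
>
> Best regards,
> Yun Tang
> 
> From: shizk233 
> Sent: Thursday, August 20, 2020 18:00
> To: user-zh@flink.apache.org 
> Subject: State serialization question
>
> Hi all,
>
> As I understand it, State is serialized and deserialized using the type information extracted from the StateDescriptor.
> If the state is declared with an interface type, such as ListState>, but what is actually stored is an ArrayList or a LinkedList,
> will that adversely affect type information extraction?
>
> My understanding is that ArrayList and LinkedList have different byte layouts when serialized,
> yet both can be declared as List>.
>
> Any passing expert, please help!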
>


Re: Flink checkpointing with Azure block storage

2020-08-20 Thread Yun Tang
Hi Boris

I think the official guide [1] should be enough to show you how to configure this.
However, your changes to flink-conf.yaml might not have taken effect: you
configured the state backend as 'filesystem', while the logs still tell us that 
"No state backend has been configured, using default (Memory / JobManager) 
MemoryStateBackend".

You can check whether your changes were picked up by searching the log for "Loading 
configuration property".

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/azure.html#credentials-configuration

Best
Yun Tang


From: Boris Lublinsky 
Sent: Friday, August 21, 2020 7:18
To: user 
Subject: Re: Flink checkpointing with Azure block storage

To test it, I created flink-conf.yaml file and put it in resource directory of 
my project
The file contains the following:


#==
# Fault tolerance and checkpointing
#==

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# .
#
state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
state.checkpoints.dir: 
wasb://@$.blob.core.windows.net/

fs.azure.account.key..blob.core.windows.net:
 

# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that

Which should have produced an error,

But what I see is that it does not seem to take effect:


313 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - No state backend has been 
configured, using default (Memory / JobManager) MemoryStateBackend (data in 
heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 
'null', asynchronous: TRUE, maxStateSize: 5242880)
3327 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - Using failover strategy 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5ef4eacb
 for Flink Streaming Job (427dae12e8f7243742ae8bd152467edc).
3329 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  
- Proposing leadership to contender akka://flink/user/rpc/jobmanager_3


On Aug 20, 2020, at 5:14 PM, Boris Lublinsky <boris.lublin...@lightbend.com> wrote:

Is there somewhere a complete configuration example for such option?



Monitor the usage of keyed state

2020-08-20 Thread Mu Kong
Hi community,

I have a Flink job running with RichMapFunction that uses keyed state.
Although the TTL is enabled, I wonder if there is a way that I can monitor
the memory usage of the keyed state. I'm using RocksDB as the state backend.

Best regards,
Mu


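Re: A question about Hive

2020-08-20 Thread Harold.Miao
hi

The Hive catalog only stores metadata. You can obtain the Hive
Table through the Hive client and then read the metadata via table.getParameters().
As for the actual data, that depends on the storage system your metadata points to; you have to query that storage directly.

Bruce wrote on Thu, Aug 20, 2020 at 7:52 PM:

> hi, all.
>
> Hive is used only for metadata management, and the actual data is not stored on HDFS. Can the actual data be read through HiveCatalog?
>
> For example, if Hive stores the metadata of MySQL and Oracle tables, can the actual table data be read through HiveCatalog?
>
> Sent from my iPhone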



-- 

Best Regards,
Harold Miao


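Re: Re: Is it normal that the JDBC connector writes only about 200 KB per second to MySQL?

2020-08-20 Thread 刘大龙
Agreed. A single MySQL instance writing about 10,000 rows per second is already quite a lot. I once had a business requirement where I wrote with plain JDBC rather than the Flink JDBC
connector, in batches of 5,000 rows, and measured roughly the same 10,000 rows per second. The bottleneck is most likely MySQL, not the connector.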
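
The figures in this thread can be checked with a little arithmetic. The sketch below assumes about 10,000 rows per second, which is what the 200 KB/s in the subject line and the 20 B per row assumed in the original question imply, and compares a batch size of 100 (the default quoted for sink.buffer-flush.max-rows) with the 5,000-row batches mentioned above. The class and method names are hypothetical, and the flush count ignores any time-based flush interval.

```java
/** Back-of-the-envelope throughput arithmetic; names are illustrative only. */
class JdbcThroughputEstimate {

    /** Payload bytes written per second for a given row rate and row size. */
    static long bytesPerSecond(long rowsPerSecond, long bytesPerRow) {
        return rowsPerSecond * bytesPerRow;
    }

    /** Number of size-triggered flushes per second (rounded up). */
    static long flushesPerSecond(long rowsPerSecond, long maxRowsPerFlush) {
        return (rowsPerSecond + maxRowsPerFlush - 1) / maxRowsPerFlush;
    }

    public static void main(String[] args) {
        long rows = 10_000L;   // rows per second implied by the thread
        long rowSize = 20L;    // bytes per row assumed in the original question

        System.out.println("bytes/s: " + bytesPerSecond(rows, rowSize));
        System.out.println("flushes/s at 100 rows per batch: " + flushesPerSecond(rows, 100L));
        System.out.println("flushes/s at 5000 rows per batch: " + flushesPerSecond(rows, 5_000L));
    }
}
```

Larger batches reduce the number of round trips per second, but as noted above the database itself may remain the limiting factor.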


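> -----Original Message-----
> From: "Benchao Li" 
> Sent: 2020-08-20 22:11:53 (Thursday)
> To: user-zh 
> Cc: 
> Subject: Re: Is it normal that the JDBC connector writes only about 200 KB per second to MySQL?
> 
> A bit over 10,000 rows per second is not low. If you want more, you can increase sink.buffer-flush.max-rows; the default is 100.
> 
> LittleFall <1578166...@qq.com> wrote on Thu, Aug 20, 2020 at 7:56 PM:
> 
> > Here is my code; it simply writes data from a datagen source to a JDBC sink.
> >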
> > ```java
> > package main;
> >
> > import
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> > import org.apache.flink.table.api.EnvironmentSettings;
> >
> > public class Main {
> >
> > public static void main(String[] args) {
> > StreamTableEnvironment tEnv = StreamTableEnvironment.create(
> > StreamExecutionEnvironment.getExecutionEnvironment(),
> >
> >
> > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> > );
> >
> > tEnv.executeSql("CREATE TABLE gen_stuff (\n" +
> > "\tstuff_id int,\n" +
> > "\tstuff_base_id int,\n" +
> > "\tstuff_name varchar(20)\n" +
> > ") WITH (\n" +
> > " 'connector' = 'datagen'," +
> > "'rows-per-second'='1000'," +
> > "'fields.stuff_id.kind'='sequence'," +
> > "'fields.stuff_id.start'='1'," +
> > "'fields.stuff_id.end'='1000'," +
> > "'fields.stuff_name.length'='15'" +
> > ")"
> > );
> > tEnv.executeSql("CREATE TABLE result_stuff (\n" +
> > "\tstuff_id int,\n" +
> > "\tstuff_base_id int,\n" +
> > "\tstuff_name varchar(20)\n" +
> > ") WITH (\n" +
> > "\t'connector'  = 'jdbc',\n" +
> > "\t'url'=
> > 'jdbc:mysql://127.0.0.1:3306/test?rewritebatchedstatements=true',\n" +
> > "\t'table-name' = 'result_stuff',\n" +
> > "\t'username'   = 'root',\n" +
> > "\t'password'   = ''\n" +
> > ")"
> > );
> >
> > tEnv.executeSql("insert into result_stuff select stuff_id,
> > stuff_base_id, stuff_name from gen_stuff");
> > }
> > }
> > ```
> >
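> > However, MySQL only gains about 10,000 rows per second. At 20 B per row, that is a write speed of 200 KB/s, which does not meet my needs.
> >
> > Is something wrong with my configuration, or is there a better way to write to the database? Thanks to anyone who offers advice.
> >
> > The JDBC-related dependencies I use are:
> >
> > ```xml
> > <dependency>
> >     <groupId>org.apache.flink</groupId>
> >     <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
> >     <version>${flink.version}</version>
> > </dependency>
> > <dependency>
> >     <groupId>mysql</groupId>
> >     <artifactId>mysql-connector-java</artifactId>
> >     <version>8.0.21</version>
> > </dependency>
> > ```
> >
> > (For comparison, on my machine, generating data with datagen and writing it to a filesystem sink achieves about 23 MB/s.)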
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
> 
> 
> -- 
> 
> Best,
> Benchao Li


--
刘大龙

浙江大学 控制系 智能系统与控制研究所 工控新楼217
地址:浙江省杭州市浙大路38号浙江大学玉泉校区
Tel:18867547281


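How to resolve the version conflict between Flink ORC and Hive 2.1.1

2020-08-20 Thread abc15606
Versions in use: Flink 1.11 and Hive 2.1.1.
After writing ORC with Flink SQL, I created an external table and found that it cannot be read properly. How can I solve this?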




Re: TumblingProcessingTimeWindows triggers the aggregate function on every incoming message

2020-08-20 Thread yobdcdoll
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).
*apply(...)*.addSink(new TemplateMySQLSink());
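
The goal in this thread, one merged list per one-second window so that the database is written once per window, can be illustrated without Flink. The sketch below is plain Java and only models the bucketing; the class and method names are hypothetical, and the alignment formula is the usual tumbling-window rule with no offset rather than code taken from Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of tumbling-window bucketing; not Flink code. */
class TumblingWindowSketch {

    /** Start of the tumbling window that contains the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Groups messages by window start, yielding one list per window. */
    static Map<Long, List<String>> groupByWindow(long[] timestamps, String[] messages, long sizeMillis) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long start = windowStart(timestamps[i], sizeMillis);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(messages[i]);
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] timestamps = {100L, 900L, 1200L};
        String[] messages = {"a", "b", "c"};
        // Each map entry corresponds to one database write in the intended design.
        groupByWindow(timestamps, messages, 1000L)
                .forEach((start, batch) -> System.out.println(start + " -> " + batch));
    }
}
```

A window function in the style of apply(...) receives all elements of a window together when the window fires, which matches the one-list-per-window shape modeled here.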

On Wed, Aug 19, 2020 at 6:27 PM wangl...@geekplus.com 
wrote:

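>
> I receive Kafka messages, keyBy the field in each message that holds the tableName, and then write to a database.
>
> keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new
> ListAggregate()).addSink(new TemplateMySQLSink());
>
>
> ListAggregate just does a simple merge. My goal is to merge all messages for one table within one second into a single List, so that the database is touched only once afterwards.
>
> But in my tests, when several messages arrive within one second, every message triggers the aggregate computation and a database write.
>
> Is there a way to make a window perform the aggregate only once?
>
> Thanks,
> 王磊
>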
>
>
> wangl...@geekplus.com.cn
>
>


Flink 1.10.1 on Yarn

2020-08-20 Thread xuhaiLong


Hi


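I convert a DataStream to a Table and write to MySQL with `JDBCOutputFormat.buildJDBCOutputFormat()`.
Exception [1] occurs and the job fails over; it started at 2:58 and recurs once an hour.
This in turn leads to exception [2]. Metaspace is 256 MB; my guess is that the frequent restarts, combined with the same ClassLoader being reused, cause it.


Questions:
Regarding exception [1], what causes it, and is there a suitable fix? Does Flink 1.10 have another connector for writing to MySQL?
Regarding exception [2], is my guess about the cause right? Does Flink have a solution for this?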


[1] java.lang.RuntimeException: Execution of JDBC statement failed.
at 
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:102)
at 
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:93)
at 
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:40)
at 
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
at 
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at 
com.netease.wm.trace.RecTraceV2$JdbcEnrichProcessFunction.processElement(RecTraceV2.scala:540)
at 
com.netease.wm.trace.RecTraceV2$JdbcEnrichProcessFunction.processElement(RecTraceV2.scala:451)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at 

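Re: Is it normal that the JDBC connector writes only about 200 KB per second to MySQL?

2020-08-20 Thread 赵一旦
Is that the JDBC connector from 1.11? Previously I had to wrap it myself, which got quite complicated: batching, timeouts, retries, metrics, and so on.

引领 wrote on Fri, Aug 21, 2020 at 9:41 AM:

> Hello, has your write throughput improved? I have run into the same issue; writes feel rather slow.
>
>
> 引领
> yrx73...@163.com
> Signature customized by NetEase Mail Master
>
>
> On Aug 20, 2020, at 22:52, LittleFall<1578166...@qq.com> wrote:
> Thanks for your reply; it really helped.
>
> I found another problem:
>
> rewriteBatchedStatements=true
>
> It should be camelCase, but I had written it in all lowercase, so batched writes did not take effect.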
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


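Re: Is it normal that the JDBC connector writes only about 200 KB per second to MySQL?

2020-08-20 Thread 引领
Hello, has your write throughput improved? I have run into the same issue; writes feel rather slow.


引领
yrx73...@163.com
Signature customized by NetEase Mail Master


On Aug 20, 2020, at 22:52, LittleFall<1578166...@qq.com> wrote:
Thanks for your reply; it really helped.

I found another problem:

rewriteBatchedStatements=true

It should be camelCase, but I had written it in all lowercase, so batched writes did not take effect.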
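
Because the fix reported here comes down to the exact casing of one URL property, a small guard can catch the mistake before a job is deployed. The following is a minimal plain-Java sketch, assuming (as this thread reports) that the property only takes effect when spelled rewriteBatchedStatements. The class and method names are hypothetical, and the parser handles only simple `?name=value&...` query strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of a casing check for a JDBC URL property; names are illustrative. */
class JdbcUrlParamCheck {
    static final String BATCH_REWRITE = "rewriteBatchedStatements";

    /** Extracts name/value pairs from the part of the URL after '?'. */
    static Map<String, String> queryParams(String jdbcUrl) {
        Map<String, String> params = new LinkedHashMap<>();
        int queryStart = jdbcUrl.indexOf('?');
        if (queryStart < 0 || queryStart == jdbcUrl.length() - 1) {
            return params;
        }
        for (String pair : jdbcUrl.substring(queryStart + 1).split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int eq = pair.indexOf('=');
            String name = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            params.put(name, value);
        }
        return params;
    }

    /** True if the property is present but with casing other than the expected one. */
    static boolean hasMiscasedBatchRewrite(String jdbcUrl) {
        for (String name : queryParams(jdbcUrl).keySet()) {
            if (name.equalsIgnoreCase(BATCH_REWRITE) && !name.equals(BATCH_REWRITE)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String lower = "jdbc:mysql://127.0.0.1:3306/test?rewritebatchedstatements=true";
        String camel = "jdbc:mysql://127.0.0.1:3306/test?rewriteBatchedStatements=true";
        System.out.println("lowercase URL flagged: " + hasMiscasedBatchRewrite(lower));
        System.out.println("camelCase URL flagged: " + hasMiscasedBatchRewrite(camel));
    }
}
```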



--
Sent from: http://apache-flink.147419.n8.nabble.com/

JSON to Parquet

2020-08-20 Thread Averell
Hello,

I have a stream in which each message is a JSON string with a fairly complex
schema (multiple fields, multiple nested layers), and I need to write it
to Parquet files after some slight modifications/enrichment.

I wonder what options are available for doing that. I'm thinking of JSON
-> AVRO (GenericRecord) -> Parquet. Is that an option? I would like to be
able to change the JSON schema quickly/dynamically (with as little code
change as possible).

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink checkpointing with Azure block storage

2020-08-20 Thread Boris Lublinsky
To test it, I created flink-conf.yaml file and put it in resource directory of 
my project
The file contains the following:

#==
# Fault tolerance and checkpointing
#==

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# .
#
state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
state.checkpoints.dir: 
wasb://@$.blob.core.windows.net/

fs.azure.account.key..blob.core.windows.net: 

# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that

Which should have produced an error,

But what I see is that it does not seem to take effect:


313 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - No state backend has been 
configured, using default (Memory / JobManager) MemoryStateBackend (data in 
heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 
'null', asynchronous: TRUE, maxStateSize: 5242880)
3327 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - Using failover strategy 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5ef4eacb
 for Flink Streaming Job (427dae12e8f7243742ae8bd152467edc).
3329 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  
- Proposing leadership to contender akka://flink/user/rpc/jobmanager_3


> On Aug 20, 2020, at 5:14 PM, Boris Lublinsky  
> wrote:
> 
> Is there somewhere a complete configuration example for such option?



Flink checkpointing with Azure block storage

2020-08-20 Thread Boris Lublinsky
Is there somewhere a complete configuration example for such option?


Re: No space left on device exception

2020-08-20 Thread Chesnay Schepler

That should work as well.
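
When changing -Djava.io.tmpdir as discussed in this thread, it can help to confirm which directory the JVM actually uses and how much room it has. The following is a small sketch using only the Java standard library; the class and method names are hypothetical, and it reports on whatever JVM runs it, so it would need to run with the same options as the TaskManager to be meaningful.

```java
import java.io.File;

/** Sketch: report the JVM temp directory and its usable space. */
class TmpDirSpaceCheck {

    /** Directory the JVM uses for temporary files (system property java.io.tmpdir). */
    static File jvmTmpDir() {
        return new File(System.getProperty("java.io.tmpdir"));
    }

    /** Usable bytes on the partition holding the directory (0 if it cannot be determined). */
    static long usableBytes(File dir) {
        return dir.getUsableSpace();
    }

    /** True if the partition currently has at least the required number of bytes free. */
    static boolean fits(File dir, long requiredBytes) {
        return usableBytes(dir) >= requiredBytes;
    }

    public static void main(String[] args) {
        File tmp = jvmTmpDir();
        long twoGiB = 2L * 1024 * 1024 * 1024; // roughly the checkpoint size mentioned in the thread
        System.out.println("java.io.tmpdir = " + tmp.getPath());
        System.out.println("usable bytes   = " + usableBytes(tmp));
        System.out.println("room for 2 GiB = " + fits(tmp, twoGiB));
    }
}
```

A single reading like this is only a snapshot; as noted elsewhere in this thread, the temporary files may be removed again right after the failure, so continuous monitoring is more reliable.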

On 20/08/2020 22:46, Vishwas Siravara wrote:

Thank you Chesnay.
Yes, but I could also change the staging directory by adding 
-Djava.io.tmpdir=/data/flink-1.7.2/tmp to env.java.opts in the 
flink-conf.yaml file. Do you see any problem with that?


Best,
Vishwas

On Thu, Aug 20, 2020 at 2:01 PM Chesnay Schepler wrote:


Could you try adding this to your flink-conf.yaml?

s3.staging-directory: /usr/mware/flink/tmp

On 20/08/2020 20:50, Vishwas Siravara wrote:

Hi Piotr,
I did some analysis and realised that the temp files for s3
checkpoints are staged in /tmp although io.tmp.dirs is set
to a different directory.

ls -lrth
drwxr-xr-x. 2 was  was 32 Aug 20 17:52 hsperfdata_was
-rw---. 1 was  was   505M Aug 20 18:45 
presto-s3-8158855975833379228.tmp
-rw---. 1 was  was   505M Aug 20 18:45 
presto-s3-7048419193714606532.tmp
drwxr-xr--. 2 root root 6 Aug 20 18:46 hsperfdata_root
[was@sl73rspapd031 tmp]$
flink-conf.yaml configuration
io.tmp.dirs: /usr/mware/flink/tmp
The /tmp has only 2GB, is it possible to change the staging
directory for s3 checkpoints ?
Best,
Vishwas

On Thu, Aug 20, 2020 at 10:27 AM Vishwas Siravara <vsirav...@gmail.com> wrote:

Hi Piotr,
Thank you for your suggestion. I will try that. Are the
temporary files created in the directory set in
io.tmp.dirs in the flink-conf.yaml? Would these files be
the same size as checkpoints?


Thanks,
Vishwas

On Thu, Aug 20, 2020 at 8:35 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi,

As far as I know, when uploading a file to S3 the writer
first needs to create some temporary files on the local
disks. I would suggest double-checking all of the
partitions on the local machine and monitoring available
disk space continuously while the job is running. If you
only check the free space manually, you can
easily miss the moment when those temporary
files grow large enough to approach the available disk
space, as I'm pretty sure those temporary files are
cleaned up immediately after the exception that
you see is thrown.

Piotrek

czw., 20 sie 2020 o 00:56 Vishwas Siravara <vsirav...@gmail.com> napisał(a):

Hi guys,
I have a deduplication job that runs on flink 1.7,
that has some state which uses FsState backend. My TM
heap size is 16 GB. I see the below error while
trying to checkpoint a state of size 2GB. There is
enough space available in s3, I tried to upload
larger files and they were all successful. There is
also enough disk space in the local file system, the
disk utility tool does not show anything suspicious.
Whenever I try to start my job from the last
successful checkpoint , it runs into the same error.
Can someone tell me what is the cause of this issue?
Many thanks.


Note: This error goes away when I delete io.tmp.dirs
and restart the job from last checkpoint , but the
disk utility tool does not show much usage before
deletion, so I am not able to figure out what
the problem is.

2020-08-19 21:12:01,909 WARN

org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory
- Could not close the state stream for
s3p://featuretoolkit.c

heckpoints/dev_dedup/9b64aafadcd6d367cfedef84706abcba/chk-189/f8e668dd-8019-4830-ab12-d48940ff5353.
1363 java.io.IOException: No space left on device
1364 at java.io.FileOutputStream.writeBytes(Native
Method)
1365 at
java.io.FileOutputStream.write(FileOutputStream.java:326)
1366 at

java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
1367 at

java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
1368 at
java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
1369 at
java.io.FilterOutputStream.close(FilterOutputStream.java:158)
1370 at

org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:986)
1371 at


Re: No space left on device exception

2020-08-20 Thread Vishwas Siravara
Thank you Chesnay.
Yes but I could change the staging directory by adding
-Djava.io.tmpdir=/data/flink-1.7.2/tmp
to *env.java.opts *in the flink-conf.yaml file. Do you see any problem with
that?

Best,
Vishwas

On Thu, Aug 20, 2020 at 2:01 PM Chesnay Schepler  wrote:

> Could you try adding this to your flink-conf.yaml?
>
> s3.staging-directory: /usr/mware/flink/tmp
>
> On 20/08/2020 20:50, Vishwas Siravara wrote:
>
> Hi Piotr,
> I did some analysis and realised that the temp files for s3
> checkpoints are staged in /tmp although the  *io.tmp.dirs *is set to a
> different directory.
>
> ls -lrth
>
> drwxr-xr-x. 2 was  was 32 Aug 20 17:52 hsperfdata_was
> -rw---. 1 was  was   505M Aug 20 18:45 
> presto-s3-8158855975833379228.tmp
> -rw---. 1 was  was   505M Aug 20 18:45 
> presto-s3-7048419193714606532.tmp
> drwxr-xr--. 2 root root 6 Aug 20 18:46 hsperfdata_root
> [was@sl73rspapd031 tmp]$
>
> flink-conf.yaml configuration
>
> io.tmp.dirs: /usr/mware/flink/tmp
>
> The /tmp has only 2GB, is it possible to change the staging directory for s3 
> checkpoints ?
>
> Best,
>
> Vishwas
>
>
> On Thu, Aug 20, 2020 at 10:27 AM Vishwas Siravara 
> wrote:
>
>> Hi Piotr,
>> Thank you for your suggestion. I will try that, are the temporary files
>> created in the directory set in *io.tmp.dirs* in the flink-conf.yaml ?
>> Would these files be the same size as checkpoints ?
>>
>>
>> Thanks,
>> Vishwas
>>
>> On Thu, Aug 20, 2020 at 8:35 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> As far as I know, when uploading a file to S3 the writer first needs to
>>> create some temporary files on the local disks. I would suggest
>>> double-checking all of the partitions on the local machine and monitoring
>>> available disk space continuously while the job is running. If you only check
>>> the free space manually, you can easily miss the moment when those
>>> temporary files grow large enough to approach the available disk space,
>>> as I'm pretty sure those temporary files are cleaned up immediately after
>>> the exception that you see is thrown.
>>>
>>> Piotrek
>>>
>>> On Thu, 20 Aug 2020 at 00:56, Vishwas Siravara wrote:
>>>
 Hi guys,
 I have a deduplication job that runs on flink 1.7, that has some state
 which uses FsState backend. My TM heap size is 16 GB. I see the below error
 while trying to checkpoint a state of size 2GB. There is enough space
 available in s3, I tried to upload larger files and they were all
 successful. There is also enough disk space in the local file system, the
 disk utility tool does not show anything suspicious. Whenever I try to
 start my job from the last successful checkpoint , it runs into the same
 error. Can someone tell me what is the cause of this issue? Many thanks.


 Note: This error goes away when I delete io.tmp.dirs and restart the
 job from last checkpoint , but the disk utility tool does not show much
 usage before deletion, so I am not able to figure out what the problem is.

 2020-08-19 21:12:01,909 WARN
 org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could
 not close the state stream for s3p://featuretoolkit.c
 heckpoints/dev_dedup/9b64aafadcd6d367cfedef84706abcba/chk-189/f8e668dd-8019-4830-ab12-d48940ff5353.
 1363 java.io.IOException: No space left on device
 1364 at java.io.FileOutputStream.writeBytes(Native Method)
 1365 at java.io.FileOutputStream.write(FileOutputStream.java:326)
 1366 at
 java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
 1367 at
 java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
 1368 at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
 1369 at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
 1370 at
 org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:986)
 1371 at
 org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
 1372 at
 org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
 1373 at
 org.apache.flink.fs.s3.common.hadoop.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
 1374 at
 org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
 1375 at
 org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:269)
 1376 at
 org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.close(CheckpointStreamWithResultProvider.java:58)
 1377 at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263)
 1378 at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250)
 1379 at
 

Re: No space left on device exception

2020-08-20 Thread Chesnay Schepler

Could you try adding this to your flink-conf.yaml?

s3.staging-directory: /usr/mware/flink/tmp

On 20/08/2020 20:50, Vishwas Siravara wrote:

Hi Piotr,
I did some analysis and realised that the temp files for S3
checkpoints are staged in /tmp although io.tmp.dirs is set to a
different directory.


ls -lrth
drwxr-xr-x. 2 was  was 32 Aug 20 17:52 hsperfdata_was
-rw---. 1 was  was   505M Aug 20 18:45 presto-s3-8158855975833379228.tmp
-rw---. 1 was  was   505M Aug 20 18:45 presto-s3-7048419193714606532.tmp
drwxr-xr--. 2 root root 6 Aug 20 18:46 hsperfdata_root
[was@sl73rspapd031 tmp]$
flink-conf.yaml configuration
io.tmp.dirs: /usr/mware/flink/tmp
The /tmp has only 2GB, is it possible to change the staging directory 
for s3 checkpoints ?

Best,
Vishwas

On Thu, Aug 20, 2020 at 10:27 AM Vishwas Siravara wrote:


Hi Piotr,
Thank you for your suggestion. I will try that. Are the temporary
files created in the directory set in io.tmp.dirs in the
flink-conf.yaml? Would these files be the same size as the checkpoints?


Thanks,
Vishwas

On Thu, Aug 20, 2020 at 8:35 AM Piotr Nowojski wrote:

Hi,

As far as I know when uploading a file to S3, the writer needs
to first create some temporary files on the local disks. I
would suggest to double check all of the partitions on the
local machine and monitor available disk space continuously
while the job is running. If you are just checking the free
space manually, you can easily miss a point of time when you
those temporary files are too big and approaching the
available disk space usage, as I'm pretty sure those temporary
files are cleaned up immediately after throwing this exception
that you see.

Piotrek

On Thu, 20 Aug 2020 at 00:56, Vishwas Siravara wrote:

Hi guys,
I have a deduplication job that runs on flink 1.7, that
has some state which uses FsState backend. My TM heap size
is 16 GB. I see the below error while trying to checkpoint
a state of size 2GB. There is enough space available in
s3, I tried to upload larger files and they were all
successful. There is also enough disk space in the local
file system, the disk utility tool does not show anything
suspicious. Whenever I try to start my job from the last
successful checkpoint , it runs into the same error. Can
someone tell me what is the cause of this issue? Many thanks.


Note: This error goes away when I delete io.tmp.dirs and
restart the job from last checkpoint , but the disk
utility tool does not show much usage before deletion, so
I am not able to figure out what the problem is.

2020-08-19 21:12:01,909 WARN
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory
- Could not close the state stream for
s3p://featuretoolkit.c

heckpoints/dev_dedup/9b64aafadcd6d367cfedef84706abcba/chk-189/f8e668dd-8019-4830-ab12-d48940ff5353.
1363 java.io.IOException: No space left on device
1364 at java.io.FileOutputStream.writeBytes(Native Method)
1365 at
java.io.FileOutputStream.write(FileOutputStream.java:326)
1366 at

java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
1367 at
java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
1368 at
java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
1369 at
java.io.FilterOutputStream.close(FilterOutputStream.java:158)
1370 at

org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:986)
1371 at

org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
1372 at

org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
1373 at

org.apache.flink.fs.s3.common.hadoop.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
1374 at

org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
1375 at

org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:269)
1376 at

org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.close(CheckpointStreamWithResultProvider.java:58)
1377 at

Re: No space left on device exception

2020-08-20 Thread Vishwas Siravara
Hi Piotr,
I did some analysis and realised that the temp files for S3 checkpoints are
staged in /tmp although io.tmp.dirs is set to a different directory.

ls -lrth

drwxr-xr-x. 2 was  was 32 Aug 20 17:52 hsperfdata_was
-rw---. 1 was  was   505M Aug 20 18:45 presto-s3-8158855975833379228.tmp
-rw---. 1 was  was   505M Aug 20 18:45 presto-s3-7048419193714606532.tmp
drwxr-xr--. 2 root root 6 Aug 20 18:46 hsperfdata_root
[was@sl73rspapd031 tmp]$

flink-conf.yaml configuration

io.tmp.dirs: /usr/mware/flink/tmp


/tmp has only 2 GB. Is it possible to change the staging directory
for S3 checkpoints?


Best,

Vishwas
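
For anyone following along: since the presto-s3 staging files come and go quickly, a one-off manual check can miss the moment the partition fills up. Below is a minimal sketch, not taken from Flink, that polls free space and remembers the lowest value seen per path. The function names and the `probe` parameter are hypothetical helpers introduced only for this illustration.

```python
import shutil
import time


def free_bytes(path):
    """Return the free bytes on the partition that holds `path`."""
    return shutil.disk_usage(path).free


def watch_low_water_mark(paths, interval_s=1.0, samples=5, probe=free_bytes):
    """Poll every path `samples` times and keep the lowest free space seen per path."""
    lowest = {}
    for i in range(samples):
        for path in paths:
            free = probe(path)
            if path not in lowest or free < lowest[path]:
                lowest[path] = free
        # Sleep between rounds, but not after the last one.
        if i + 1 < samples:
            time.sleep(interval_s)
    return lowest
```

Calling `watch_low_water_mark(["/tmp", "/usr/mware/flink/tmp"], interval_s=1.0, samples=600)` while a checkpoint is running would show how close each partition gets to full. The two paths are the ones mentioned in this thread, so adjust them to your machine.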


On Thu, Aug 20, 2020 at 10:27 AM Vishwas Siravara 
wrote:

> Hi Piotr,
> Thank you for your suggestion. I will try that, are the temporary files
> created in the directory set in *io.tmp.dirs* in the flink-conf.yaml ?
> Would these files be the same size as checkpoints ?
>
>
> Thanks,
> Vishwas
>
> On Thu, Aug 20, 2020 at 8:35 AM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> As far as I know when uploading a file to S3, the writer needs to first
>> create some temporary files on the local disks. I would suggest to double
>> check all of the partitions on the local machine and monitor available disk
>> space continuously while the job is running. If you are just checking the
>> free space manually, you can easily miss a point of time when you those
>> temporary files are too big and approaching the available disk space usage,
>> as I'm pretty sure those temporary files are cleaned up immediately after
>> throwing this exception that you see.
>>
>> Piotrek
>>
>> On Thu, 20 Aug 2020 at 00:56, Vishwas Siravara wrote:
>>
>>> Hi guys,
>>> I have a deduplication job that runs on flink 1.7, that has some state
>>> which uses FsState backend. My TM heap size is 16 GB. I see the below error
>>> while trying to checkpoint a state of size 2GB. There is enough space
>>> available in s3, I tried to upload larger files and they were all
>>> successful. There is also enough disk space in the local file system, the
>>> disk utility tool does not show anything suspicious. Whenever I try to
>>> start my job from the last successful checkpoint , it runs into the same
>>> error. Can someone tell me what is the cause of this issue? Many thanks.
>>>
>>>
>>> Note: This error goes away when I delete io.tmp.dirs and restart the
>>> job from last checkpoint , but the disk utility tool does not show much
>>> usage before deletion, so I am not able to figure out what the problem is.
>>>
>>> 2020-08-19 21:12:01,909 WARN
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could
>>> not close the state stream for s3p://featuretoolkit.c
>>> heckpoints/dev_dedup/9b64aafadcd6d367cfedef84706abcba/chk-189/f8e668dd-8019-4830-ab12-d48940ff5353.
>>> 1363 java.io.IOException: No space left on device
>>> 1364 at java.io.FileOutputStream.writeBytes(Native Method)
>>> 1365 at java.io.FileOutputStream.write(FileOutputStream.java:326)
>>> 1366 at
>>> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>>> 1367 at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>>> 1368 at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
>>> 1369 at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
>>> 1370 at
>>> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:986)
>>> 1371 at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>>> 1372 at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
>>> 1373 at
>>> org.apache.flink.fs.s3.common.hadoop.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>>> 1374 at
>>> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>>> 1375 at
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:269)
>>> 1376 at
>>> org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.close(CheckpointStreamWithResultProvider.java:58)
>>> 1377 at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263)
>>> 1378 at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250)
>>> 1379 at
>>> org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122)
>>> 1380 at
>>> org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:185)
>>> 1381 at
>>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:84)
>>> 1382 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> 1383 at
>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>>> 1384 at
>>> 

Debezium Flink EMR

2020-08-20 Thread Rex Fenley
Hi,

I'm trying to set up Flink with Debezium CDC Connector on AWS EMR, however,
EMR only supports Flink 1.10.0, whereas Debezium Connector arrived in Flink
1.11.0, from looking at the documentation.

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html

I'm wondering what alternative solutions are available for connecting
Debezium to Flink? Is there an open source Debezium connector that works
with Flink 1.10.0? Could I potentially pull the code out for the 1.11.0
Debezium connector and compile it in my project using the Flink 1.10.0 API?

For context, I plan on doing some fairly complicated long lived stateful
joins / materialization using the Table API over data ingested from
Postgres and possibly MySQL.

Appreciate any help, thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend





Re: Decompose failure recovery time

2020-08-20 Thread Zhinan Cheng
Hi Piotr,

Thanks a lot.
I will try your suggestion and see what happens.

Regards,
Zhinan

On Fri, 21 Aug 2020 at 00:40, Piotr Nowojski  wrote:
>
> Hi Zhinan,
>
> It's hard to say, but my guess is that it takes that long for the tasks to
> respond to cancellation, which consists of a couple of steps. If a task is
> currently busy processing something, it has to respond to interruption
> (`java.lang.Thread#interrupt`). If it takes 30 seconds for a task to react to
> the interruption and clean up its resources, that can cause problems and
> there is very little that Flink can do.
>
> If you want to debug it further, I would suggest collecting stack traces 
> during cancellation (or even better: profile the code during cancellation). 
> This would help you answer the question, what are the task threads busy with.
>
> Probably not a solution, but I'm mentioning it just in case, you can shorten 
> the `task.cancellation.timeout` period.  By default it's 180s. After that, 
> whole TaskManager will be killed. If you have spare TaskManagers or you can 
> restart them very quickly, lowering this timeout might help to some extent 
> (in an exchange for dirty shutdown, without cleaning up the resources).
>
> Piotrek
>
> On Thu, 20 Aug 2020 at 18:00, Zhinan Cheng wrote:
>>
>> Hi Piotr,
>>
>> Thanks a lot for your help.
>> Yes, I finally realize that I can only approximate the time for [1]
>> and [3] and measure [2] by monitoring the uptime and downtime metric
>> provided by Flink.
>>
>> And now my problem is that I found the time in [2] can be up to 40s, I
>> wonder why it takes so long to restart the job.
>> The log actually shows that the time to switch all operator instances
>> from CANCELING to CANCELED is around 30s, do you have any ideas about
>> this?
>>
>> Many thanks.
>>
>> Regards,
>> Zhinan
>>
>> On Thu, 20 Aug 2020 at 21:26, Piotr Nowojski  wrote:
>> >
>> > Hi,
>> >
>> > > I want to decompose the recovery time into different parts, say
>> > > (1) the time to detect the failure,
>> > > (2) the time to restart the job,
>> > > (3) and the time to restore the checkpointing.
>> >
>> > 1. Maybe I'm missing something, but as far as I can tell, Flink can not
>> > help you with that. Time to detect the failure, would be a time between the
>> > failure occurred, and the time when JobManager realises about this failure.
>> > If we could reliably measure/check when the first one happened, then we
>> > could immediately trigger failover. You are interested in this exactly
>> > because there is no reliable way to detect the failure immediately. You
>> > could approximate this via analysing the logs.
>> >
>> > 2. Maybe there are some metrics that you could use; if not, you can use
>> > the REST API [1] to monitor the job status. Again, you could also do it
>> > via analysing the logs.
>> >
>> > 3. In the future this might be measurable using the REST API (similar as
>> > the point 2.), but currently there is no way to do it that way. There is a
>> > ticket for that [2]. I think currently the only way is to do it is via
>> > analysing the logs.
>> >
>> > If you just need to do this once, I would analyse the logs manually. If you
>> > want to do it many times or monitor this continuously, I would write some
>> > simple script (python?) to mix checking REST API calls for 2. with logs
>> > analysing.
>> >
>> > Piotrek
>> >
>> >
>> > [1]
>> > https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs
>> > [2] https://issues.apache.org/jira/browse/FLINK-17012
>> > On Tue, 18 Aug 2020 at 04:07, Zhinan Cheng wrote:
>> >
>> > > Hi all,
>> > >
>> > > I am working on measuring the failure recovery time of Flink and I
>> > > want to decompose the recovery time into different parts, say the time
>> > > to detect the failure, the time to restart the job, and the time to
>> > > restore the checkpointing.
>> > >
>> > > Unfortunately, I cannot find  any information in Flink doc to solve
>> > > this, Is there any way that Flink has provided for this, otherwise,
>> > > how can I solve this?
>> > >
>> > > Thanks a lot for your help.
>> > >
>> > > Regards,
>> > > Juno
>> > >

Re: ERROR : RocksDBStateBackend

2020-08-20 Thread Vijayendra Yadav
Hi Till/ Piotr,

My process was working with FsStateBackend, but when I switched
to RocksDBStateBackend I faced this problem. My classpath is below.

Related jar in classpath:
/usr/lib/hadoop-yarn/hadoop-yarn-api-2.8.5-amzn-6.jar:/usr/lib/hadoop-yarn/hadoop-yarn-api.jar:


Classpath:

Re: How Flink distinguishes between late and in-time events?

2020-08-20 Thread Piotr Nowojski
Hi Ori,

No, Flink does it differently. Operators that keep track of late
events remember the latest watermark. If a new element arrives with an
event time lower than the latest watermark, it is marked as a late
event [1].

Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/timely-stream-processing.html#lateness
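
The rule described above can be sketched in a few lines. This is a simplified model and not Flink's actual operator code: it ignores allowed lateness and window boundaries, and the class and method names are hypothetical. The point is that only the latest watermark has to be remembered, not the set of closed windows.

```python
class LatenessTracker:
    """Simplified model: an element is late if its event time is below the latest watermark."""

    def __init__(self):
        # No watermark seen yet, so nothing can be late.
        self.current_watermark = float("-inf")

    def on_watermark(self, timestamp):
        # Watermarks only move forward; an older one is ignored.
        self.current_watermark = max(self.current_watermark, timestamp)

    def is_late(self, event_time):
        return event_time < self.current_watermark
```

Because the check depends only on a single timestamp, deleting the window state does not remove the information needed to classify later elements.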

On Thu, 20 Aug 2020 at 17:13, Ori Popowski wrote:

> In the documentation it states that:
>
> *[…], Flink keeps the state of windows until their allowed lateness
> expires. Once this happens, Flink removes the window and deletes its state,
> as also described in the Window Lifecycle section.*
>
> However, something doesn't make sense to me.
>
> If Flink deletes the window state, then how can it know that subsequent
> events are late? I.e., if the state is deleted, then Flink has no way of
> knowing that an event is late, because it can think it's just a new event,
> unless it keeps track of which keyed windows are closed forever.
>
> Does Flink remember which keyed windows are closed forever?
>
> Thanks.
>


Re: Decompose failure recovery time

2020-08-20 Thread Piotr Nowojski
Hi Zhinan,

It's hard to say, but my guess is that it takes that long for the tasks to
respond to cancellation, which consists of a couple of steps. If a task is
currently busy processing something, it has to respond to interruption
(`java.lang.Thread#interrupt`). If it takes 30 seconds for a task to react
to the interruption and clean up its resources, that can cause problems
and there is very little that Flink can do.

If you want to debug it further, I would suggest collecting stack traces
during cancellation (or even better: profiling the code during cancellation).
This would help you answer the question of what the task threads are busy
with.

Probably not a solution, but I'm mentioning it just in case: you can
shorten the `task.cancellation.timeout` period. By default it's 180s.
After that, the whole TaskManager will be killed. If you have spare
TaskManagers or you can restart them very quickly, lowering this timeout
might help to some extent (in exchange for a dirty shutdown, without
cleaning up the resources).

Piotrek

On Thu, 20 Aug 2020 at 18:00, Zhinan Cheng wrote:

> Hi Piotr,
>
> Thanks a lot for your help.
> Yes, I finally realize that I can only approximate the time for [1]
> and [3] and measure [2] by monitoring the uptime and downtime metric
> provided by Flink.
>
> And now my problem is that I found the time in [2] can be up to 40s, I
> wonder why it takes so long to restart the job.
> The log actually shows that the time to switch all operator instances
> from CANCELING to CANCELED is around 30s, do you have any ideas about
> this?
>
> Many thanks.
>
> Regards,
> Zhinan
>
> On Thu, 20 Aug 2020 at 21:26, Piotr Nowojski  wrote:
> >
> > Hi,
> >
> > > I want to decompose the recovery time into different parts, say
> > > (1) the time to detect the failure,
> > > (2) the time to restart the job,
> > > (3) and the time to restore the checkpointing.
> >
> > 1. Maybe I'm missing something, but as far as I can tell, Flink cannot
> > help you with that. The time to detect the failure would be the time between
> > when the failure occurred and when the JobManager learns about it.
> > If we could reliably measure/check when the first one happened, then we
> > could immediately trigger failover. You are interested in this exactly
> > because there is no reliable way to detect the failure immediately. You
> > could approximate this via analysing the logs.
> >
> > 2. Maybe there are some metrics that you could use; if not, you can use
> > the REST API [1] to monitor the job status. Again, you could also do it
> > via analysing the logs.
> >
> > 3. In the future this might be measurable using the REST API (similar to
> > point 2.), but currently there is no way to do it that way. There is a
> > ticket for that [2]. I think currently the only way to do it is via
> > analysing the logs.
> >
> > If you just need to do this once, I would analyse the logs manually. If you
> > want to do it many times or monitor this continuously, I would write some
> > simple script (python?) to mix checking REST API calls for 2. with log
> > analysis.
> > Piotrek
> >
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs
> > [2] https://issues.apache.org/jira/browse/FLINK-17012
> > On Tue, 18 Aug 2020 at 04:07, Zhinan Cheng wrote:
> >
> > > Hi all,
> > >
> > > I am working on measuring the failure recovery time of Flink and I
> > > want to decompose the recovery time into different parts, say the time
> > > to detect the failure, the time to restart the job, and the time to
> > > restore the checkpointing.
> > >
> > > Unfortunately, I cannot find  any information in Flink doc to solve
> > > this, Is there any way that Flink has provided for this, otherwise,
> > > how can I solve this?
> > >
> > > Thanks a lot for your help.
> > >
> > > Regards,
> > > Juno
> > >

Re: Flink checkpoint recovery time

2020-08-20 Thread Zhinan Cheng
Hi Till,

Thanks for the quick reply.

Yes, the job actually restarts twice; the fullRestarts metric also
indicates this, as its value is 2.
But the job does take around 30s to switch from CANCELLING to RESTARTING
in its first restart.
I just wonder why it takes so long here.

Also, even when I lower the heartbeat timeout from the default 50s to 5s, this
time is similar, so I think it is unrelated to the heartbeat timeout.

Regards,
Zhinan
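
One way to put numbers on the job-level transitions is to read the per-state timestamps from the REST API and compute the gaps between them. The sketch below rests on assumptions not confirmed in this thread: that `GET /jobs/<jobid>` returns a `timestamps` object mapping state names to epoch milliseconds, and that states never entered are reported as 0. The function names are hypothetical. Note that if a state is entered more than once (as with two restarts), only one timestamp per state is available, so the result describes a single pass.

```python
import json
import urllib.request


def fetch_job_timestamps(base_url, job_id):
    """Fetch the per-state timestamps of a job (assumed response field: "timestamps")."""
    with urllib.request.urlopen(f"{base_url}/jobs/{job_id}") as response:
        return json.load(response)["timestamps"]


def state_durations(timestamps):
    """Turn {state: epoch_millis} into [(state, next_state, millis_between)] in time order."""
    # Assumption: a value of 0 means the state was never entered.
    visited = sorted((ts, state) for state, ts in timestamps.items() if ts > 0)
    return [
        (a_state, b_state, b_ts - a_ts)
        for (a_ts, a_state), (b_ts, b_state) in zip(visited, visited[1:])
    ]
```

With a JobManager reachable at, say, `http://localhost:8081` (a placeholder address), `state_durations(fetch_job_timestamps("http://localhost:8081", job_id))` would list how long the job spent between consecutive states.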

On Fri, 21 Aug 2020 at 00:02, Till Rohrmann  wrote:

> Hi Zhinan,
>
> the logs show that the cancellation does not take 30s. What happens is
> that the job gets restarted a couple of times. The problem seems to be that
> one TaskManager died permanently but it takes the heartbeat timeout
> (default 50s) until it is detected as dead. In the meantime the system
> tries to redeploy tasks which will cause the job to fail again and again.
>
> Cheers,
> Till
>
> On Thu, Aug 20, 2020 at 4:41 PM Zhinan Cheng 
> wrote:
>
>> Hi Till,
>>
>> Sorry for the late reply.
>> Attached is the log of jobmanager.
>> I notice that while canceling the job, the jobmanager also warns that
>> the connection to the failed taskmanager is lost.
>> This lasts for about 30s, and then the jobmanager
>> successfully cancels the operator instances related to the
>> failed taskmanager and restarts the job.
>> Is there any way to reduce the restart time?
>>
>> Thanks a lot.
>>
>> Regards,
>> Zhinan
>>
>> On Wed, 19 Aug 2020 at 16:37, Till Rohrmann  wrote:
>>
>>> Could you share the logs with us? This might help to explain why the
>>> cancellation takes so long. Flink is no longer using Akka's death watch
>>> mechanism.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Aug 19, 2020 at 10:01 AM Zhinan Cheng 
>>> wrote:
>>>
 Hi Till,

 Thanks for the quick response.

 > for i) the cancellation depends on the user code. If the user code
 does a blocking operation, Flink needs to wait until it returns from there
 before it can move the Task's state to CANCELED.
 for this, my code just includes a map operation and then aggregates the
 results into a tumbling window. So I think in this case the time is not
 attributed to the code.
 I looked into the log, during the period, I observed that the
 jobmanager continues warning that its connection to the failed
 taskmanager is confused.
 I am not sure if this is the reason that delays the canceling, do you
 have any ideas about this?

 I am also looking into the death watch mechanism [1] of Akka to see if this
 is the reason.

 For (ii), I will open the JIRA issue as you suggested.

 Thanks.


 [1] https://cwiki.apache.org/confluence/display/FLINK/Akka+and+Actors

 Regards.
 Zhinan

 On Wed, 19 Aug 2020 at 15:39, Till Rohrmann 
 wrote:

> Hi Zhinan,
>
> for i) the cancellation depends on the user code. If the user code
> does a blocking operation, Flink needs to wait until it returns from there
> before it can move the Task's state to CANCELED.
>
> for ii) I think your observation is correct. Could you please open a
> JIRA issue for this problem so that it can be fixed in Flink? Thanks a 
> lot!
>
> For the time to restore the checkpoints it could also be interesting
> to add a proper metric to Flink. Hence, you could also create a JIRA issue
> for it.
>
> Cheers,
> Till
>
> On Wed, Aug 19, 2020 at 8:43 AM Zhinan Cheng 
> wrote:
>
>> Hi Yun,
>>
>> Thanks a lot for your help. Seems hard to measure the checkpointing
>> restore time currently.
>> I do monitor the "fullRestarts" metric and others like "uptime" and
>> "downtime" to observe some information about failure recovery.
>>
>> Still some confusions:
>> i) I found the time for the jobmanager to make the job from status
>> CANCELING to status CANCELED up to 30s?
>>  Is there any reason why it takes so long? Can I reduce this time?
>> ii) Currently the way to calculate the "downtime"  is not consistent
>> with the description in the doc, now the downtime is actually the current
>> timestamp minus the time timestamp when the job started.
>> But I think the doc obviously only want to measure the current
>> timestamp minus the timestamp when the job failed.
>>
>> I think I need to measure these times by adding specified metrics
>> myself.
>>
>> Regards,
>> Zhinan
>>
>>
>>
>>
>> On Wed, 19 Aug 2020 at 01:45, Yun Tang  wrote:
>>
>>> Hi Zhinan,
>>>
>>> For the time to detect the failure, you could refer to the time when
>>> 'fullRestarts' increase. That could give you information about the time 
>>> of
>>> job failure.
>>>
>>> For the checkpoint recovery time, there actually exist two parts:
>>>
>>>1. The time to read checkpoint meta in JM. However, this
>>>

Re: State serialization question

2020-08-20 Thread Yun Tang
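Hi

If you want to store Strings in the list, i.e. each element of the list is a
String, the state should be declared as ListState<String>, not
ListState<List<String>>; the latter means a list in which every element is
itself a list.

ListState itself is not a Java collection, so the distinction between ArrayList
and LinkedList does not apply.

Best regards,
Yun Tang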

From: shizk233 
Sent: Thursday, August 20, 2020 18:00
To: user-zh@flink.apache.org 
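Subject: State serialization question

Hi all,

A question: as I understand it, State is serialized/deserialized using the type
information extracted from the StateDescriptor. If the state is declared with an
interface type, such as ListState<List<String>>, but what is actually stored is
an ArrayList/LinkedList, does that adversely affect type information extraction?

My understanding is that ArrayList and LinkedList have different byte layouts
when serialized, yet both can be declared with the List interface type.

Hoping an expert passing by can help!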


Re: Incremental checkpoint

2020-08-20 Thread Yun Tang
Hi

Incremental checkpoints are not directly related to what the web UI shows. The
bookkeeping for incremental checkpoints is reference-counted by the
SharedStateRegistry [1] in the CheckpointCoordinator, while the number of
retained checkpoints is managed by the CheckpointStore [2].
With 2 retained checkpoints, execution proceeds as follows:
chk-1 completed --> register chk-1 in state registry --> add to checkpoint store
chk-2 completed --> register chk-2 in state registry --> add to checkpoint store
chk-3 completed --> register chk-3 in state registry --> add to checkpoint store --> chk-1 subsumed --> unregister chk-1 in state registry --> discard state with reference=0
chk-4 completed --> register chk-4 in state registry --> add to checkpoint store --> chk-2 subsumed --> unregister chk-2 in state registry --> discard state with reference=0

The flow above shows the whole process: when chk-3 still depends on some data
from chk-1, the reference count of that state does not drop to 0 when chk-1 is
unregistered, so it is not deleted, and it does not need to be copied into the
current checkpoint either.


[1]
https://github.com/apache/flink/blob/f8ce30a50b8dd803d4476ea5d83e7d48708d54fa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L192
[2]
https://github.com/apache/flink/blob/f8ce30a50b8dd803d4476ea5d83e7d48708d54fa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java#L41

Best regards,
Yun Tang
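
The register / subsume / discard flow can be modelled with a small reference-counting sketch. This is an illustration only, not the SharedStateRegistry implementation: the class name, method name, and the use of plain strings as state handles are all assumptions made for the example.

```python
from collections import deque


class CheckpointRetention:
    """Toy model of retained checkpoints sharing reference-counted state handles."""

    def __init__(self, max_retained):
        self.max_retained = max_retained
        self.retained = deque()  # (checkpoint_id, set of state handles), oldest first
        self.ref_counts = {}     # state handle -> number of retained checkpoints using it

    def complete(self, checkpoint_id, handles):
        """Register a completed checkpoint and return the handles discarded as a result."""
        handles = set(handles)
        for handle in handles:
            self.ref_counts[handle] = self.ref_counts.get(handle, 0) + 1
        self.retained.append((checkpoint_id, handles))

        discarded = set()
        while len(self.retained) > self.max_retained:
            # Subsume the oldest checkpoint and unregister its handles.
            _, old_handles = self.retained.popleft()
            for handle in old_handles:
                self.ref_counts[handle] -= 1
                if self.ref_counts[handle] == 0:
                    del self.ref_counts[handle]
                    discarded.add(handle)
        return discarded
```

In this model a handle first written by chk-1 survives the removal of chk-1 as long as a retained checkpoint still lists it, which matches the behaviour described in the message.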



From: 赵一旦 
Sent: Thursday, August 20, 2020 10:50
To: user-zh@flink.apache.org 
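Subject: Re: Incremental checkpoint

Waiting for someone else to give the authoritative answer. Here is my guess:
Retaining 2 checkpoints means the web UI keeps 2 checkpoints; with incremental
checkpoints, none of the historical checkpoints referenced by these 2 will be
deleted.
So at the third checkpoint, as long as checkpoints 2 and 3 still reference
checkpoint 1, checkpoint 1 will not be deleted.

On Thu, Aug 20, 2020 at 10:46 AM superainbower wrote:

> Hi, a question for everyone: with incremental checkpoints enabled and the
> number of retained checkpoints set to 2, what happens if the third checkpoint
> still depends on the first one? Is the first checkpoint's data copied into the
> current one? If the first is not deleted, doesn't that violate the limit of
> retaining 2?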


Re: Decompose failure recovery time

2020-08-20 Thread Zhinan Cheng
Hi Piotr,

Thanks a lot for your help.
Yes, I finally realized that I can only approximate the times for [1]
and [3], and measure [2] by monitoring the uptime and downtime metrics
provided by Flink.

My problem now is that the time in [2] can be up to 40s, and I
wonder why it takes so long to restart the job.
The log actually shows that switching all operator instances
from CANCELING to CANCELED takes around 30s. Do you have any idea
why?

Many thanks.

Regards,
Zhinan

On Thu, 20 Aug 2020 at 21:26, Piotr Nowojski  wrote:
>
> Hi,
>
> > I want to decompose the recovery time into different parts, say
> > (1) the time to detect the failure,
> > (2) the time to restart the job,
> > (3) and the time to restore the checkpointing.
>
> 1. Maybe I'm missing something, but as far as I can tell, Flink cannot
> help you with that. The time to detect the failure would be the time between
> the moment the failure occurred and the moment the JobManager notices it.
> If we could reliably measure/check when the first one happened, then we
> could immediately trigger failover. You are interested in this exactly
> because there is no reliable way to detect the failure immediately. You
> could approximate this by analysing the logs.
>
> 2. Maybe there are some metrics that you could use; if not, you can use
> the REST API [1] to monitor the job status. Again, you could also do it
> by analysing the logs.
>
> 3. In the future this might be measurable using the REST API (similar to
> point 2.), but currently there is no way to do it that way. There is a
> ticket for that [2]. I think currently the only way to do it is by
> analysing the logs.
>
> If you just need to do this once, I would analyse the logs manually. If you
> want to do it many times or monitor this continuously, I would write a
> simple script (python?) that combines REST API calls for 2. with log
> analysis.
>
> Piotrek
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs
> [2] https://issues.apache.org/jira/browse/FLINK-17012
>
> On Tue, 18 Aug 2020 at 04:07, Zhinan Cheng wrote:
>
> > Hi all,
> >
> > I am working on measuring the failure recovery time of Flink and I
> > want to decompose the recovery time into different parts, say the time
> > to detect the failure, the time to restart the job, and the time to
> > restore the checkpointing.
> >
> > Unfortunately, I cannot find any information in the Flink docs about
> > this. Does Flink provide any way to do this? If not,
> > how can I solve this?
> >
> > Thanks a lot for your help.
> >
> > Regards,
> > Juno
> >
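
As a rough illustration of the polling approach suggested for point 2, the sketch below estimates the restart time from a series of job-status samples. It assumes the samples were collected elsewhere (for example by periodically querying the REST API). The class RestartTimeSketch, the Sample type, and the sample values are hypothetical, and the result is only accurate to within the polling interval.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: estimates restart time from polled job-status samples.
final class RestartTimeSketch {

    // One observation of the job status at a given wall-clock time.
    static final class Sample {
        final long timestampMillis;
        final String status;

        Sample(long timestampMillis, String status) {
            this.timestampMillis = timestampMillis;
            this.status = status;
        }
    }

    // Returns the milliseconds between the first sample that is not RUNNING
    // and the next sample that is RUNNING again, or -1 if no such pair exists.
    static long restartMillis(List<Sample> samples) {
        long leftRunningAt = -1;
        for (Sample sample : samples) {
            boolean running = "RUNNING".equals(sample.status);
            if (!running && leftRunningAt < 0) {
                leftRunningAt = sample.timestampMillis;
            } else if (running && leftRunningAt >= 0) {
                return sample.timestampMillis - leftRunningAt;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Made-up samples; real ones would come from polling the job status.
        List<Sample> samples = Arrays.asList(
                new Sample(0L, "RUNNING"),
                new Sample(1_000L, "RUNNING"),
                new Sample(2_000L, "RESTARTING"),
                new Sample(3_000L, "RESTARTING"),
                new Sample(42_000L, "RUNNING"));
        System.out.println("restart took ~" + restartMillis(samples) + " ms");
    }
}
```

A shorter polling interval tightens the estimate, at the cost of more requests against the JobManager.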



Re: Flink checkpoint recovery time

2020-08-20 Thread Till Rohrmann
Hi Zhinan,

the logs show that the cancellation does not take 30s. What happens is that
the job gets restarted a couple of times. The problem seems to be that one
TaskManager died permanently but it takes the heartbeat timeout (default
50s) until it is detected as dead. In the meantime the system tries to
redeploy tasks which will cause the job to fail again and again.

Cheers,
Till

On Thu, Aug 20, 2020 at 4:41 PM Zhinan Cheng 
wrote:

> Hi Till,
>
> Sorry for the late reply.
> Attached is the log of jobmanager.
> I noticed that while canceling the job, the jobmanager also warns that the
> connection to the failed taskmanager is lost.
> This lasts for about 30s, and then the jobmanager successfully cancels
> the operator instances related to the failed taskmanager and restarts
> the job.
> Is there any way to reduce the restart time?
>
> Thanks a lot.
>
> Regards,
> Zhinan
>
> On Wed, 19 Aug 2020 at 16:37, Till Rohrmann  wrote:
>
>> Could you share the logs with us? This might help to explain why the
>> cancellation takes so long. Flink is no longer using Akka's death watch
>> mechanism.
>>
>> Cheers,
>> Till
>>
>> On Wed, Aug 19, 2020 at 10:01 AM Zhinan Cheng 
>> wrote:
>>
>>> Hi Till,
>>>
>>> Thanks for the quick response.
>>>
>>> > for i) the cancellation depends on the user code. If the user code
>>> does a blocking operation, Flink needs to wait until it returns from there
>>> before it can move the Task's state to CANCELED.
>>> for this, my code just includes a map operation and then aggregates the
>>> results into a tumbling window. So I think in this case the time is not
>>> attributed to the code.
>>> I looked into the log. During that period, I observed that the jobmanager
>>> keeps warning that its connection to the failed taskmanager is confused.
>>> I am not sure whether this is what delays the canceling. Do you
>>> have any ideas about this?
>>>
>>> I am also looking at the DeathWatch mechanism [1] of Akka to see if this
>>> is the reason.
>>>
>>> For (ii), I will open the JIRA issue as you suggested.
>>>
>>> Thanks.
>>>
>>>
>>> [1] https://cwiki.apache.org/confluence/display/FLINK/Akka+and+Actors
>>>
>>> Regards.
>>> Zhinan
>>>
>>> On Wed, 19 Aug 2020 at 15:39, Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Zhinan,
>>>>
>>>> for i) the cancellation depends on the user code. If the user code does
>>>> a blocking operation, Flink needs to wait until it returns from there
>>>> before it can move the Task's state to CANCELED.
>>>>
>>>> for ii) I think your observation is correct. Could you please open a
>>>> JIRA issue for this problem so that it can be fixed in Flink? Thanks a lot!
>>>>
>>>> For the time to restore the checkpoints it could also be interesting to
>>>> add a proper metric to Flink. Hence, you could also create a JIRA issue for
>>>> it.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Aug 19, 2020 at 8:43 AM Zhinan Cheng
>>>> wrote:

>>>>> Hi Yun,
>>>>>
>>>>> Thanks a lot for your help. It seems hard to measure the checkpoint
>>>>> restore time currently.
>>>>> I do monitor the "fullRestarts" metric and others like "uptime" and
>>>>> "downtime" to observe some information about failure recovery.
>>>>>
>>>>> A few things are still unclear to me:
>>>>> i) I found that it takes the jobmanager up to 30s to move the job from
>>>>> status CANCELING to status CANCELED.
>>>>> Is there any reason why it takes so long? Can I reduce this time?
>>>>> ii) Currently the way "downtime" is calculated is not consistent
>>>>> with the description in the docs. The downtime is actually the current
>>>>> timestamp minus the timestamp when the job started.
>>>>> But I think the docs clearly intend it to be the current
>>>>> timestamp minus the timestamp when the job failed.
>>>>>
>>>>> I think I need to measure these times by adding my own metrics.
>>>>>
>>>>> Regards,
>>>>> Zhinan
>>>>>
>>>>> On Wed, 19 Aug 2020 at 01:45, Yun Tang wrote:
>>>>>
>>>>>> Hi Zhinan,
>>>>>>
>>>>>> For the time to detect the failure, you could refer to the time when
>>>>>> 'fullRestarts' increases. That could give you information about the time
>>>>>> of job failure.
>>>>>>
>>>>>> For the checkpoint recovery time, there are actually two parts:
>>>>>>
>>>>>>    1. The time to read the checkpoint metadata in the JM. This duration
>>>>>>    has no explicit metric currently, as it amounts to little more than
>>>>>>    reading a 1 MB file remotely from DFS.
>>>>>>    2. The time for tasks to restore state. This should be treated as
>>>>>>    the real time for checkpoint recovery and could even be 10 minutes+
>>>>>>    when restoring a savepoint. Unfortunately, this time is also not
>>>>>>    recorded in metrics now.
>>>>>>    If you find that a task is in RUNNING state but does not consume any
>>>>>>    records, it might be stuck restoring the checkpoint/savepoint.
>>>>>>
>>>>>>
>>>>>> Best
>>>>>> Yun Tang
>>>>>>

Re: ERROR : RocksDBStateBackend

2020-08-20 Thread Till Rohrmann
Here is a link to information on how to integrate Flink with Hadoop [1]. In
the latest version you only need to point Flink to the Hadoop libraries via
setting the HADOOP_CLASSPATH environment variable.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html

Cheers,
Till

On Thu, Aug 20, 2020 at 5:50 PM Till Rohrmann  wrote:

> I agree with Piotr's analysis. It should not matter whether you are using
> RocksDBStateBackend or not. It seems as if you have a Hadoop dependency
> clash. Could you check which dependencies are on the class path?
>
> Cheers,
> Till
>
> On Thu, Aug 20, 2020 at 3:52 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> It looks more like a dependency convergence issue - you have two
>> conflicting versions of
>> `org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest`
>> on the class path. Or you built your jar with one version and are trying to
>> execute it with a different one.
>>
>> Till, is this some kind of known issue?
>>
>> Piotrek
>>
>>
>> On Thu, 20 Aug 2020 at 06:48, Vijayendra Yadav
>> wrote:
>>
>>> Hi Team,
>>>
>>> Getting the following error when using RocksDBStateBackend on yarn/EMR.
>>> Am I missing any dependencies?
>>>
>>>
>>> 2020-08-20 04:37:00,713 ERROR org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Exception on heartbeat
>>> java.lang.NoSuchMethodError: org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
>>>     at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:280)
>>>     at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)
>>> 2020-08-20 04:37:00,714 INFO  org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Interrupted while waiting for queue
>>> java.lang.InterruptedException
>>>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
>>>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
>>>     at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>>>     at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:323)
>>> 2020-08-20 04:37:00,714 ERROR org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Stopping callback due to:
>>> java.lang.NoSuchMethodError: org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
>>>     at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:280)
>>>     at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)
>>> 2020-08-20 04:37:00,714 ERROR org.apache.flink.yarn.YarnResourceManager - Fatal error occurred in ResourceManager.
>>> java.lang.NoSuchMethodError: org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
>>>     at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:280)
>>>     at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)
>>> 2020-08-20 04:37:00,714 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error occurred in the cluster entrypoint.
>>> java.lang.NoSuchMethodError: org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
>>>     at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:280)
>>>     at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)
>>> 2020-08-20 04:37:00,718 INFO  org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:45627
>>>
>>>
>>> Regards,
>>>
>>> Vijay
>>>
>>>


Re: ERROR : RocksDBStateBackend

2020-08-20 Thread Till Rohrmann
I agree with Piotr's analysis. It should not matter whether you are using
RocksDBStateBackend or not. It seems as if you have a Hadoop dependency
clash. Could you check which dependencies are on the class path?

Cheers,
Till

On Thu, Aug 20, 2020 at 3:52 PM Piotr Nowojski 
wrote:

> Hi,
>
> It looks more like a dependency convergence issue - you have two
> conflicting versions of
> `org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest`
> on the class path. Or you built your jar with one version and are trying to
> execute it with a different one.
>
> Till, is this some kind of known issue?
>
> Piotrek
>
>
> On Thu, 20 Aug 2020 at 06:48, Vijayendra Yadav
> wrote:
>
>> Hi Team,
>>
>> Getting the following error when using RocksDBStateBackend on yarn/EMR.
>> Am I missing any dependencies?
>>
>>
>> 2020-08-20 04:37:00,713 ERROR org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Exception on heartbeat
>> java.lang.NoSuchMethodError: org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
>>     at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:280)
>>     at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)
>> 2020-08-20 04:37:00,714 INFO  org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Interrupted while waiting for queue
>> java.lang.InterruptedException
>>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
>>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
>>     at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>>     at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:323)
>> 2020-08-20 04:37:00,714 ERROR org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Stopping callback due to:
>> java.lang.NoSuchMethodError: org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
>>     at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:280)
>>     at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)
>> 2020-08-20 04:37:00,714 ERROR org.apache.flink.yarn.YarnResourceManager - Fatal error occurred in ResourceManager.
>> java.lang.NoSuchMethodError: org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
>>     at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:280)
>>     at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)
>> 2020-08-20 04:37:00,714 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error occurred in the cluster entrypoint.
>> java.lang.NoSuchMethodError: org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
>>     at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:280)
>>     at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)
>> 2020-08-20 04:37:00,718 INFO  org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:45627
>>
>>
>> Regards,
>>
>> Vijay
>>
>>


Re: No space left on device exception

2020-08-20 Thread Vishwas Siravara
Hi Piotr,
Thank you for your suggestion, I will try that. Are the temporary files
created in the directory set in *io.tmp.dirs* in the flink-conf.yaml?
Would these files be the same size as the checkpoints?


Thanks,
Vishwas
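
For the continuous monitoring suggested in the quoted reply below, a minimal sketch could periodically print the usable space of a directory. Which directory the S3 writer actually stages its temporary files in is not confirmed in this thread, so the directory is passed as an argument and falls back to the JVM's java.io.tmpdir as an assumption. The class name TmpSpaceSketch is hypothetical.

```java
import java.io.File;

// Hypothetical helper: samples the usable disk space of one directory.
final class TmpSpaceSketch {

    // Usable bytes on the partition that holds the given directory
    // (0 if the path does not name a partition).
    static long usableBytes(String directory) {
        return new File(directory).getUsableSpace();
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: pass the directory to watch, e.g. the io.tmp.dirs value.
        String directory = args.length > 0 ? args[0] : System.getProperty("java.io.tmpdir");
        for (int i = 0; i < 5; i++) {
            long freeMiB = usableBytes(directory) / (1024 * 1024);
            System.out.println(System.currentTimeMillis() + " " + directory + " free MiB: " + freeMiB);
            Thread.sleep(1_000L); // sample once per second
        }
    }
}
```

Sampling often enough during a checkpoint should reveal a short-lived dip in free space that a manual check would miss.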

On Thu, Aug 20, 2020 at 8:35 AM Piotr Nowojski  wrote:

> Hi,
>
> As far as I know, when uploading a file to S3, the writer first needs to
> create some temporary files on the local disks. I would suggest
> double-checking all of the partitions on the local machine and monitoring
> available disk space continuously while the job is running. If you are just
> checking the free space manually, you can easily miss the point in time when
> those temporary files grow too big and approach the available disk space,
> as I'm pretty sure those temporary files are cleaned up immediately after
> the exception you see is thrown.
>
> Piotrek
>
> On Thu, 20 Aug 2020 at 00:56, Vishwas Siravara
> wrote:
>
>> Hi guys,
>> I have a deduplication job that runs on Flink 1.7 and keeps some state
>> in the FsStateBackend. My TM heap size is 16 GB. I see the error below
>> while trying to checkpoint state of size 2 GB. There is enough space
>> available in S3; I tried to upload larger files and they were all
>> successful. There is also enough disk space in the local file system; the
>> disk utility tool does not show anything suspicious. Whenever I try to
>> start my job from the last successful checkpoint, it runs into the same
>> error. Can someone tell me what the cause of this issue is? Many thanks.
>>
>>
>> Note: This error goes away when I delete io.tmp.dirs and restart the job
>> from the last checkpoint, but the disk utility tool does not show much usage
>> before deletion, so I am not able to figure out what the problem is.
>>
>> 2020-08-19 21:12:01,909 WARN org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could not close the state stream for s3p://featuretoolkit.checkpoints/dev_dedup/9b64aafadcd6d367cfedef84706abcba/chk-189/f8e668dd-8019-4830-ab12-d48940ff5353.
>> java.io.IOException: No space left on device
>>     at java.io.FileOutputStream.writeBytes(Native Method)
>>     at java.io.FileOutputStream.write(FileOutputStream.java:326)
>>     at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>>     at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>>     at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
>>     at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
>>     at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:986)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
>>     at org.apache.flink.fs.s3.common.hadoop.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>>     at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:269)
>>     at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.close(CheckpointStreamWithResultProvider.java:58)
>>     at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263)
>>     at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250)
>>     at org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122)
>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:185)
>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:84)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>>     Suppressed: java.io.IOException: No space left on device
>>         at java.io.FileOutputStream.writeBytes(Native Method)
>>         at java.io.FileOutputStream.write(FileOutputStream.java:326)
>>         at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>>         at

How Flink distinguishes between late and in-time events?

2020-08-20 Thread Ori Popowski
The documentation states that:

*[…], Flink keeps the state of windows until their allowed lateness
expires. Once this happens, Flink removes the window and deletes its state,
as also described in the Window Lifecycle section.*

However, something doesn't make sense to me.

If Flink deletes the window state, then how can it know that subsequent
events are late? I.e., if the state is deleted, then Flink has no way of
knowing that an event is late, because it could think it is just a new event,
unless it keeps track of which keyed windows are closed forever.

Does Flink remember which keyed windows are closed forever?

Thanks.
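
As far as I understand the window operator, Flink does not need a window's state to decide that an element is late. The window an element belongs to is derived from the element's timestamp, and lateness is then a comparison between that window's last timestamp plus the allowed lateness and the current watermark. A minimal sketch of that comparison follows. The class and method names are hypothetical, tumbling windows with offset 0 are assumed, and this is not Flink's actual code.

```java
// Hypothetical sketch of an event-time lateness check (not Flink's code).
final class LatenessSketch {

    // Start of the tumbling window containing the timestamp
    // (assumes non-negative timestamps and a window offset of 0).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // A window [start, end) has end - 1 as its last timestamp. It counts as
    // late once the watermark has reached that timestamp plus the allowed
    // lateness, whether or not any state for the window still exists.
    static boolean isWindowLate(long windowEnd, long allowedLateness, long currentWatermark) {
        long cleanupTime = (windowEnd - 1) + allowedLateness;
        return cleanupTime <= currentWatermark;
    }

    public static void main(String[] args) {
        long size = 10_000L;
        long start = windowStart(12_345L, size); // window [10000, 20000)
        long end = start + size;
        System.out.println(isWindowLate(end, 5_000L, 24_998L)); // watermark below cleanup time
        System.out.println(isWindowLate(end, 5_000L, 24_999L)); // watermark at cleanup time
    }
}
```

Because the check needs only the element's timestamp, the window size, the allowed lateness, and the watermark, no per-window bookkeeping of "closed forever" windows is required in this model.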


Re: Is it normal for the JDBC connector to write only about 200 KB per second to MySQL?

2020-08-20 Thread LittleFall
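Thanks for your reply, it really helped me.

I found another problem:

rewriteBatchedStatements=true

The option must be written in camelCase. I had used all lowercase before, so batched writes did not take effect.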
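
A small sketch of building a JDBC URL with the option spelled in camelCase. The host, port, and database name are placeholders, and the helper class JdbcUrlSketch is hypothetical.

```java
// Hypothetical helper that appends the batch-rewrite option to a MySQL JDBC URL.
final class JdbcUrlSketch {

    // Appends rewriteBatchedStatements=true, spelled exactly in camelCase.
    static String withBatchRewrite(String baseUrl) {
        String separator = baseUrl.contains("?") ? "&" : "?";
        return baseUrl + separator + "rewriteBatchedStatements=true";
    }

    public static void main(String[] args) {
        // Placeholder host and database.
        System.out.println(withBatchRewrite("jdbc:mysql://localhost:3306/test"));
    }
}
```

The resulting string would then go into the 'url' option of the JDBC sink table.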



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-20 Thread Kostas Kloudas
Hi all,

Thanks for the comments!

@Dawid: "execution.mode" can be a nice alternative and from a quick
look it is not used currently by any configuration option. I will
update the FLIP accordingly.

@David: Given that having the option to allow timers to fire at the
end of the job is already in the FLIP, I will leave it as is and I
will update the default policy to be "ignore processing time timers
set by the user". This will allow existing dataStream programs to run
on bounded inputs. This update will affect point 2 in the "Processing
Time Support in Batch" section.

If these changes cover your proposals, then I would like to start a
voting thread tomorrow evening if this is ok with you.

Please let me know until then.

Kostas

On Tue, Aug 18, 2020 at 3:54 PM David Anderson  wrote:
>
> Being able to optionally fire registered processing time timers at the end of 
> a job would be interesting, and would help in (at least some of) the cases I 
> have in mind. I don't have a better idea.
>
> David
>
> On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas  wrote:
>>
>> Hi Kurt and David,
>>
>> Thanks a lot for the insightful feedback!
>>
>> @Kurt: For the topic of checkpointing with Batch Scheduling, I totally
>> agree with you that it requires a lot more work and careful thinking
>> on the semantics. This FLIP was written under the assumption that if
>> the user wants to have checkpoints on bounded input, he/she will have
>> to go with STREAMING as the scheduling mode. Checkpointing for BATCH
>> can be handled as a separate topic in the future.
>>
>> In the case of MIXED workloads and for this FLIP, the scheduling mode
>> should be set to STREAMING. That is why the AUTOMATIC option sets
>> scheduling to BATCH only if all the sources are bounded. I am not sure
>> what are the plans there at the scheduling level, as one could imagine
>> in the future that in mixed workloads, we schedule first all the
>> bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
>> subgraph per application, which is going to be scheduled after all
>> Bounded ones have finished. Essentially the bounded subgraphs will be
>> used to bootstrap the unbounded one. But, I am not aware of any plans
>> towards that direction.
>>
>>
>> @David: The processing time timer handling is a topic that has also
>> been discussed in the community in the past, and I do not remember any
>> final conclusion unfortunately.
>>
>> In the current context and for bounded input, we chose to favor
>> reproducibility of the result, as this is expected in batch processing
>> where the whole input is available in advance. This is why this
>> proposal suggests to not allow processing time timers. But I
>> understand your argument that the user may want to be able to run the
>> same pipeline on batch and streaming this is why we added the two
>> options under future work, namely (from the FLIP):
>>
>> ```
>> Future Work: In the future we may consider adding as options the capability of:
>> * firing all the registered processing time timers at the end of a job
>> (at close()) or,
>> * ignoring all the registered processing time timers at the end of a job.
>> ```
>>
>> Conceptually, we are essentially saying that batch
>> execution is assumed to be instantaneous and refers to a single
>> "point" in time, and any processing-time timers for the future may fire
>> at the end of execution or be ignored (but not throw an exception). I
>> could also see ignoring the timers in batch as the default, if this
>> makes more sense.
>>
>> By the way, do you have any usecases in mind that will help us better
>> shape our processing time timer handling?
>>
>> Kostas
>>
>> On Mon, Aug 17, 2020 at 2:52 PM David Anderson  wrote:
>> >
>> > Kostas,
>> >
>> > I'm pleased to see some concrete details in this FLIP.
>> >
>> > I wonder if the current proposal goes far enough in the direction of 
>> > recognizing the need some users may have for "batch" and "bounded 
>> > streaming" to be treated differently. If I've understood it correctly, the 
>> > section on scheduling allows me to choose STREAMING scheduling even if I 
>> > have bounded sources. I like that approach, because it recognizes that 
>> > even though I have bounded inputs, I don't necessarily want batch 
>> > processing semantics. I think it makes sense to extend this idea to 
>> > processing time support as well.
>> >
>> > My thinking is that sometimes in development and testing it's reasonable 
>> > to run exactly the same job as in production, except with different 
>> > sources and sinks. While it might be a reasonable default, I'm not 
>> > convinced that switching a processing time streaming job to read from a 
>> > bounded source should always cause it to fail.
>> >
>> > David
>> >
>> > On Wed, Aug 12, 2020 at 5:22 PM Kostas Kloudas  wrote:
>> >>
>> >> Hi all,
>> >>
>> >> As described in FLIP-131 [1], we are aiming at deprecating the DataSet
>> >> API in favour of the DataStream API and the Table 

Re: Watermark generation issues with File sources in Flink 1.11.1

2020-08-20 Thread Arti Pande
Hi Till,

Thank you for your quick response. Both the AssignerWithPeriodicWatermarks
and WatermarkStrategy I am using are very simple ones.

*Code for AssignerWithPeriodicWatermarks:*

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class CustomEventTimeWatermarkGenerator
        implements AssignerWithPeriodicWatermarks<MyPojo> {

    // No out-of-orderness is tolerated.
    private final long maxOutOfOrderness = 0;
    // Highest event timestamp seen so far.
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyPojo myPojo, long previousTimestamp) {
        long timestamp = myPojo.getInitiationTime().toEpochMilli();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}


*Code for WatermarkStrategy :*

WatermarkStrategy<MyPojo> watermarkStrategy =
        WatermarkStrategy.<MyPojo>forBoundedOutOfOrderness(Duration.ofMillis(0))
                .withTimestampAssigner((event, timestamp) ->
                        event.getInitiationTime().toEpochMilli());
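
For reference, a bounded-out-of-orderness strategy boils down to tracking the highest timestamp seen and emitting that value minus the bound minus one millisecond. The following standalone sketch models that arithmetic without any Flink dependency. The class name BoundedWatermarkSketch is hypothetical, and the periodic emission and idleness handling of the real generator are left out.

```java
// Hypothetical, dependency-free model of bounded-out-of-orderness watermarks.
final class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestamp;

    BoundedWatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Chosen so that the first watermark is Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    // Called for every event; only the highest timestamp matters.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // The watermark only advances when events with higher timestamps arrive.
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch generator = new BoundedWatermarkSketch(0L);
        generator.onEvent(1_000L);
        generator.onEvent(900L); // out of order, does not move the watermark
        System.out.println(generator.currentWatermark());
    }
}
```

In this model the watermark stays at its initial value until the first event is observed, so a source that never hands events to the generator would never advance it.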


Thanks & regards,
Arti


On Thu, Aug 20, 2020 at 11:43 AM Till Rohrmann  wrote:

> Hi Arti,
>
> thanks for sharing this feedback with us. The WatermarkStrategy has been
> introduced quite recently and might have some rough edges. I am pulling in
> Aljoscha and Klou who have worked on this feature and might be able to help
> you. For better understanding your problem, it would be great if you could
> share the AssignerWithPeriodicWatermarks/WatermarkStrategy code with us.
>
> For the file source, the Flink community has recently introduced a new
> source abstraction which will also support checkpoints for file sources
> once the file source connector has been migrated to the new interfaces. The
> community is currently working on it.
>
> Cheers,
> Till
>
> On Wed, Aug 19, 2020 at 5:38 PM Arti Pande  wrote:
>
>> Hi,
>>
>> When migrating Stream API based Flink application from 1.9.2 to 1.11.1
>> the watermark generation has issues with file source alone. It works well
>> with Kafka source.
>>
>> With 1.9.2 a custom watermark generator implementation of
>> AssignerWithPeriodicWatermarks was to be used. Starting 1.11.1 it is
>> deprecated and to be replaced with WatermarkStrategy (that combines both
>> WatermarkGenerator and TimestampAssigner).
>>
>> With Flink 1.11.1 when using Kafka source both the above options (i.e.
>> old  AssignerWithPeriodicWatermarks  and new WatermarkStrategy) work
>> perfectly well but with file source none of them works. The watermark
>> assigner never increments the watermarks resulting in stateful operators
>> not clearing their state ever, leading to erroneous results and
>> continuously increasing memory usage.
>>
>> Same code works well with Kafka source. Is this a known issue? If so, any
>> fix planned shortly?
>>
>> A side note (and probably a candidate for separate email, but I will
>> write it here) even checkpoints do not work with File Source since 1.9.2
>> and it is still the problem with 1.11.1. Just wondering if File source with
>> stream API is not a priority in Flink development? If so we can rethink our
>> sources.
>>
>> Thanks & regards,
>> Arti
>>
>


Re: Is about 200 KB per second normal when writing to MySQL with the JDBC connector?

2020-08-20 Thread Benchao Li
A rate of over 1 rows per second is not low, I think. If you want it higher, you can increase the sink.buffer-flush.max-rows setting; the default is 100.
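
A rough, Flink-free sketch of why a larger sink.buffer-flush.max-rows helps: rows are buffered and written in one batch each time the buffer fills, so a bigger buffer means fewer round trips to the database. The class `BufferFlushSketch` and its methods are hypothetical names for this illustration, not the connector's implementation, and time-based flushing is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

public class BufferFlushSketch {
    private final int maxRows;
    private final List<String> buffer = new ArrayList<>();
    private int flushCount = 0;

    public BufferFlushSketch(int maxRows) {
        this.maxRows = maxRows;
    }

    /** Buffers one row and flushes once the buffer holds maxRows rows. */
    public void write(String row) {
        buffer.add(row);
        if (buffer.size() >= maxRows) {
            flush();
        }
    }

    /** Stands in for one batched INSERT; an empty buffer costs nothing. */
    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushCount++;
        buffer.clear();
    }

    public int getFlushCount() {
        return flushCount;
    }

    /** Counts the batches needed to write totalRows rows with the given buffer size. */
    public static int flushesFor(int totalRows, int maxRows) {
        BufferFlushSketch sink = new BufferFlushSketch(maxRows);
        for (int i = 0; i < totalRows; i++) {
            sink.write("row-" + i);
        }
        sink.flush(); // write out whatever is left
        return sink.getFlushCount();
    }

    public static void main(String[] args) {
        System.out.println(flushesFor(10_000, 100));   // 100
        System.out.println(flushesFor(10_000, 1_000)); // 10
    }
}
```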

On Thu, Aug 20, 2020 at 7:56 PM LittleFall <1578166...@qq.com> wrote:

> This is my code; it simply writes data from a datagen source to a jdbc sink.
>
> ```java
> package main;
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
>
> public class Main {
>
>     public static void main(String[] args) {
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(
>             StreamExecutionEnvironment.getExecutionEnvironment(),
>             EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>         );
>
>         // Source table backed by the datagen connector.
>         tEnv.executeSql("CREATE TABLE gen_stuff (\n" +
>             "\tstuff_id int,\n" +
>             "\tstuff_base_id int,\n" +
>             "\tstuff_name varchar(20)\n" +
>             ") WITH (\n" +
>             " 'connector' = 'datagen'," +
>             "'rows-per-second'='1000'," +
>             "'fields.stuff_id.kind'='sequence'," +
>             "'fields.stuff_id.start'='1'," +
>             "'fields.stuff_id.end'='1000'," +
>             "'fields.stuff_name.length'='15'" +
>             ")"
>         );
>         // Sink table backed by the jdbc connector.
>         tEnv.executeSql("CREATE TABLE result_stuff (\n" +
>             "\tstuff_id int,\n" +
>             "\tstuff_base_id int,\n" +
>             "\tstuff_name varchar(20)\n" +
>             ") WITH (\n" +
>             "\t'connector'  = 'jdbc',\n" +
>             "\t'url'        = 'jdbc:mysql://127.0.0.1:3306/test?rewritebatchedstatements=true',\n" +
>             "\t'table-name' = 'result_stuff',\n" +
>             "\t'username'   = 'root',\n" +
>             "\t'password'   = ''\n" +
>             ")"
>         );
>
>         tEnv.executeSql("insert into result_stuff select stuff_id, " +
>             "stuff_base_id, stuff_name from gen_stuff");
>     }
> }
> ```
>
> However, the row count in MySQL grows by only about 1 per second. Assuming 20 B per row, the write speed is 200 KB/s, which does not meet my needs...
>
> Is something wrong with my configuration, or is there a better way to write to the database? Thanks to anyone who offers suggestions.
>
> The JDBC-related dependencies I use are as follows:
>
> ```xml
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
>     <version>${flink.version}</version>
> </dependency>
> <dependency>
>     <groupId>mysql</groupId>
>     <artifactId>mysql-connector-java</artifactId>
>     <version>8.0.21</version>
> </dependency>
> ```
>
> (For comparison, on my machine, generating data with datagen and writing it to a filesystem sink runs at about 23 MB/s.)
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: Flink Job cluster in HA mode - recovery vs upgrade

2020-08-20 Thread Piotr Nowojski
Thank you for the clarification, Chesnay, and sorry for my incorrect
previous answer.

Piotrek

On Thu, 20 Aug 2020 at 15:59, Chesnay Schepler wrote:

> This is incorrect; we do store the JobGraph in ZooKeeper. If you just
> delete the deployment the cluster will recover the previous JobGraph
> (assuming you aren't changing the Zookeeper configuration).
>
> If you wish to update the job, then you should cancel it (along with
> creating a savepoint), which will clear the Zookeeper state, and then
> create a new deployment
>
> On 20/08/2020 15:43, Piotr Nowojski wrote:
>
> Hi Alexey,
>
> I might be wrong (I don't know this side of Flink very well), but as far
> as I know JobGraph is never stored in the ZK. It's always recreated from
> the job's JAR. So you should be able to upgrade the job by replacing the
> JAR with a newer version, as long as the operator UIDs are the same before
> and after the upgrade (for operator state to match before and after the
> upgrade).
>
> Best, Piotrek
>
> On Thu, 20 Aug 2020 at 06:34, Alexey Trenikhun wrote:
>
>> Hello,
>>
>> Let's say I run Flink Job cluster with persistent storage and Zookeeper
>> HA on k8s with single  JobManager and use externalized checkpoints. When JM
>> crashes, k8s will restart JM pod, and JM will read JobId and JobGraph from
>> ZK and restore from latest checkpoint. Now let's say I want to upgrade job
>> binary, I delete deployments, create new deployments referring to newer
>> image, will JM still read JobGraph from ZK or will create new one from new
>> job jar?
>>
>> Thanks,
>> Alexey
>>
>
>


Re: Stream job with Event Timestamp (Backfill getting incorrect results)

2020-08-20 Thread Piotr Nowojski
Hi,

It's hard for me to help you debug your code, but as long as:
- you are using event time for processing records (in operators like
`WindowOperator`)
- you do not have late records
- you are replaying the same records
- your code is deterministic
- you do not rely on the order of the records

Flink should behave deterministically and the results should be the same.

Maybe try writing unit tests/integration tests for your operators/logic
and feeding them some precomputed input? Or try to reproduce the problem,
narrow it down to a single user_id/key, create a unit test/IT case for it,
and debug your code in a debugger on a local machine?

One thing to note: are you sure you are reprocessing the same records?
Kafka, for example, has the concept of retention time, after which it can
drop older records from the topic.
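
To illustrate the "do not rely on the order of the records" condition, here is a minimal plain-Java sketch (no Flink involved; the class `ReplayOrderSketch` and its helpers are made up for this example). It closes a session as soon as an end-trip event is seen, the way order-dependent logic would, and shows how an end-trip event that is read early yields a session with identical start and end time, while sorting by event time first gives the expected session.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ReplayOrderSketch {
    static final class Event {
        final String name;
        final long ts;

        Event(String name, long ts) {
            this.name = name;
            this.ts = ts;
        }
    }

    /** Closes a session whenever an end-trip event is seen; returns "start-end" pairs. */
    static List<String> sessionize(List<Event> events) {
        List<String> sessions = new ArrayList<>();
        Long start = null;
        for (Event e : events) {
            if (start == null) {
                start = e.ts; // the first event seen opens the session
            }
            if (e.name.equals("keyless_end_trip")) {
                sessions.add(start + "-" + e.ts);
                start = null;
            }
        }
        return sessions;
    }

    /** Returns a copy of the events ordered by event timestamp. */
    static List<Event> sortedByEventTime(List<Event> events) {
        List<Event> copy = new ArrayList<>(events);
        copy.sort(Comparator.comparingLong((Event e) -> e.ts));
        return copy;
    }

    public static void main(String[] args) {
        List<Event> arrival = new ArrayList<>();
        arrival.add(new Event("keyless_end_trip", 300L)); // read early, e.g. from another topic
        arrival.add(new Event("search_list_keyless", 100L));
        arrival.add(new Event("search_list_keyless", 200L));

        System.out.println(sessionize(arrival));                    // [300-300]
        System.out.println(sessionize(sortedByEventTime(arrival))); // [100-300]
    }
}
```

In a Flink job the counterpart of the sorting step would be to buffer events in state and act on them only once the watermark has passed them, which this sketch does not attempt to show.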

Piotrek

On Thu, 20 Aug 2020 at 12:14, aj wrote:

> I have a streaming job where I am doing window operation on *"user_id" *and
> then doing some summarization based on some time bases logic like :
>
> 1.  end the session based on 30 mins inactivity of the user.
> 2.  The End_trip event or cancellation event has arrived for the user.
>
> I am trying to rerun the job with some old offset for backfilling then I
> am getting wrong results. Some of the sessions is ending with same start
> and end time.  How to control the streaming job when lot of data get
> accumulated in Kafka and I have to replay the job. Please help me what is
> going wrong.
>
> My assumption is it may be due to:
> 1. Out of order events
> 2. I am reading data from multiple topics so the end_trip event that is
> happening at a later time can be read before and end the session.
>
> I am using keyedProcessFunction like this :
>
> public class DemandFunnelProcessFunction extends
>
> KeyedProcessFunction, 
> DemandFunnelSummaryTuple> {
>
>   private static final Logger LOGGER = 
> LoggerFactory.getLogger(DemandFunnelProcessFunction.class);
>
>   private transient ValueState sessionSummary;
>   private transient ValueState> distanceListState;
>
>   @SuppressWarnings("checkstyle:LocalVariableName")
>   @Override
>   public void processElement(Tuple2 recordTuple2, 
> Context context,
>   Collector collector) throws Exception {
>
> GenericRecord record = recordTuple2.f1;
>
>
> String event_name = record.get("event_name").toString();
> long event_ts = (Long) record.get("event_ts");
>
> DemandFunnelSummaryTuple currentTuple = sessionSummary.value();
> ArrayList distanceList =
> distanceListState.value() != null ? distanceListState.value() : new 
> ArrayList();
>
> try {
>
>   if (currentTuple == null) {
> currentTuple = new DemandFunnelSummaryTuple();
> String demandSessionId = UUID.randomUUID().toString();
> currentTuple.setDemandSessionId(demandSessionId);
> currentTuple.setStartTime(event_ts);
> currentTuple.setUserId(recordTuple2.f0);
> currentTuple.setEventName("demand_funnel_summary");
> int geo_id = record.get("geo_id") != null ? (int) 
> record.get("geo_id") : 0;
> currentTuple.setGeoId(geo_id);
>   }
>   long endTime = currentTuple.getEndTime();
>
>   if (event_name.equals("search_list_keyless")) {
> //System.out.println("inside search_list_keyless " + recordTuple2.f0);
> currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 1);
> SearchEventUtil.searchSummaryCalculation(record, currentTuple, 
> distanceList);
>   }
>
>
>   currentTuple.setEndTime(event_ts);
>   sessionSummary.update(currentTuple);
>   distanceListState.update(distanceList);
>
>   if (event_name.equals("keyless_booking_cancellation") || event_name
>   .equals("keyless_end_trip")) {
> try {
>
>   DemandFunnelSummaryTuple sessionSummaryTuple = 
> sessionSummary.value();
>
>   if (sessionSummaryTuple != null) {
> sessionSummaryTuple.setAvgResultCount(
> (double) distanceList.size() / 
> sessionSummaryTuple.getTotalSearch());
> if (distanceList.size() > 0) {
>   int distanceSum = distanceList.stream()
>   .collect(Collectors.summingInt(Integer::intValue));
>   sessionSummaryTuple.setAvgBikeDistance((double) distanceSum / 
> distanceList.size());
>   sessionSummaryTuple
>   
> .setP50DistNearestBike(SearchEventUtil.percentile(distanceList, 50));
>   sessionSummaryTuple
>   
> .setP90DistNearestBike(SearchEventUtil.percentile(distanceList, 90));
> }
> sessionSummaryTuple.setEndTime(event_ts);
> collector.collect(sessionSummaryTuple);
>   }
> } catch (Exception e) {
>   DemandFunnelSummaryTuple sessionSummaryTuple = 
> sessionSummary.value();
>   LOGGER.info("Error in collecting event for user_id " + 
> sessionSummaryTuple.getUserId());
>   

Re: Flink Job cluster in HA mode - recovery vs upgrade

2020-08-20 Thread Chesnay Schepler
This is incorrect; we do store the JobGraph in ZooKeeper. If you just 
delete the deployment the cluster will recover the previous JobGraph 
(assuming you aren't changing the Zookeeper configuration).


If you wish to update the job, then you should cancel it (along with 
creating a savepoint), which will clear the Zookeeper state, and then 
create a new deployment.


On 20/08/2020 15:43, Piotr Nowojski wrote:

Hi Alexey,

I might be wrong (I don't know this side of Flink very well), but as 
far as I know JobGraph is never stored in the ZK. It's always 
recreated from the job's JAR. So you should be able to upgrade the job 
by replacing the JAR with a newer version, as long as the operator 
UIDs are the same before and after the upgrade (for operator state to 
match before and after the upgrade).


Best, Piotrek

On Thu, 20 Aug 2020 at 06:34, Alexey Trenikhun wrote:


Hello,

Let's say I run Flink Job cluster with persistent storage and
Zookeeper HA on k8s with single  JobManager and use externalized
checkpoints. When JM crashes, k8s will restart JM pod, and JM will
read JobId and JobGraph from ZK and restore from latest
checkpoint. Now let's say I want to upgrade job binary, I delete
deployments, create new deployments referring to newer image, will
JM still read JobGraph from ZK or will create new one from new job
jar?

Thanks,
Alexey





Re: ERROR : RocksDBStateBackend

2020-08-20 Thread Piotr Nowojski
Hi,

It looks more like a dependency convergence issue: you have two
conflicting versions of
`org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest`
on the class path. Or you built your jar with one version and are trying to
execute it with a different one.
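
One way to check which jar a class is actually loaded from is to ask the JVM for its code source. The following is a minimal sketch using only the standard library; `ClassOriginSketch` and `originOf` are names invented for this example. It would have to run with the same class path as the failing process to be meaningful.

```java
import java.security.CodeSource;

public class ClassOriginSketch {
    /** Returns the location a class was loaded from, if the JVM exposes one. */
    static String originOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            // Typically the case for classes that ship with the JDK itself.
            return "bootstrap or unknown";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) throws Exception {
        // Pass a fully qualified class name, e.g. the AllocateRequest class from the stack trace.
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(name + " loaded from " + originOf(Class.forName(name)));
    }
}
```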

Till, is this some kind of known issue?

Piotrek


On Thu, 20 Aug 2020 at 06:48, Vijayendra Yadav wrote:

> Hi Team,
>
> Getting the following error when using RocksDBStateBackend on yarn/EMR. Am
> I missing any dependencies?
>
>
> 2020-08-20 04:37:00,713 ERROR 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Exception 
> on heartbeat
> java.lang.NoSuchMethodError: 
> org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
>   at 
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:280)
>   at 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)
> 2020-08-20 04:37:00,714 INFO  
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - 
> Interrupted while waiting for queue
> java.lang.InterruptedException
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
>   at 
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>   at 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:323)
> 2020-08-20 04:37:00,714 ERROR 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Stopping 
> callback due to:
> java.lang.NoSuchMethodError: 
> org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
>   at 
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:280)
>   at 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)
> 2020-08-20 04:37:00,714 ERROR org.apache.flink.yarn.YarnResourceManager   
>   - Fatal error occurred in ResourceManager.
> java.lang.NoSuchMethodError: 
> org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
>   at 
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:280)
>   at 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)
> 2020-08-20 04:37:00,714 ERROR 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error 
> occurred in the cluster entrypoint.
> java.lang.NoSuchMethodError: 
> org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
>   at 
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:280)
>   at 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)
> 2020-08-20 04:37:00,718 INFO  org.apache.flink.runtime.blob.BlobServer
>
> - Stopped BLOB server at 0.0.0.0:45627
>
>
> Regards,
>
> Vijay
>
>


Re: Flink Job cluster in HA mode - recovery vs upgrade

2020-08-20 Thread Piotr Nowojski
Hi Alexey,

I might be wrong (I don't know this side of Flink very well), but as far as
I know JobGraph is never stored in the ZK. It's always recreated from the
job's JAR. So you should be able to upgrade the job by replacing the JAR
with a newer version, as long as the operator UIDs are the same before and
after the upgrade (for operator state to match before and after the
upgrade).

Best, Piotrek

On Thu, 20 Aug 2020 at 06:34, Alexey Trenikhun wrote:

> Hello,
>
> Let's say I run Flink Job cluster with persistent storage and Zookeeper HA
> on k8s with single  JobManager and use externalized checkpoints. When JM
> crashes, k8s will restart JM pod, and JM will read JobId and JobGraph from
> ZK and restore from latest checkpoint. Now let's say I want to upgrade job
> binary, I delete deployments, create new deployments referring to newer
> image, will JM still read JobGraph from ZK or will create new one from new
> job jar?
>
> Thanks,
> Alexey
>


Re: No space left on device exception

2020-08-20 Thread Piotr Nowojski
Hi,

As far as I know, when uploading a file to S3, the writer first needs to
create some temporary files on the local disks. I would suggest
double-checking all of the partitions on the local machine and monitoring
the available disk space continuously while the job is running. If you are
only checking the free space manually, you can easily miss the point in time
when those temporary files grow large enough to approach the available disk
space, as I'm pretty sure those temporary files are cleaned up immediately
after the exception you see is thrown.
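
A minimal sketch of such continuous monitoring, using only the standard library: it samples the usable space of a directory once per second and remembers the minimum. `DiskSpaceSketch` is a made-up name, and the directory to watch (for example the one configured as io.tmp.dirs) and the sampling interval are assumptions to adapt.

```java
import java.io.File;

public class DiskSpaceSketch {
    /** Usable space of the partition holding dir, in megabytes. */
    static long usableMegabytes(File dir) {
        return dir.getUsableSpace() / (1024L * 1024L);
    }

    public static void main(String[] args) throws InterruptedException {
        // First argument: directory to watch; second argument: number of samples.
        File dir = new File(args.length > 0 ? args[0] : System.getProperty("java.io.tmpdir"));
        int samples = args.length > 1 ? Integer.parseInt(args[1]) : 3;

        long minSeen = Long.MAX_VALUE;
        for (int i = 0; i < samples; i++) {
            long free = usableMegabytes(dir);
            minSeen = Math.min(minSeen, free);
            System.out.println(dir + " usable MB: " + free + " (min so far: " + minSeen + ")");
            Thread.sleep(1000L);
        }
    }
}
```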

Piotrek

On Thu, 20 Aug 2020 at 00:56, Vishwas Siravara wrote:

> Hi guys,
> I have a deduplication job that runs on flink 1.7, that has some state
> which uses FsState backend. My TM heap size is 16 GB. I see the below error
> while trying to checkpoint a state of size 2GB. There is enough space
> available in s3, I tried to upload larger files and they were all
> successful. There is also enough disk space in the local file system, the
> disk utility tool does not show anything suspicious. Whenever I try to
> start my job from the last successful checkpoint , it runs into the same
> error. Can someone tell me what is the cause of this issue? Many thanks.
>
>
> Note: This error goes away when I delete io.tmp.dirs and restart the job
> from last checkpoint , but the disk utility tool does not show much usage
> before deletion, so I am not able to figure out what the problem is.
>
> 2020-08-19 21:12:01,909 WARN
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could
> not close the state stream for s3p://featuretoolkit.c
> heckpoints/dev_dedup/9b64aafadcd6d367cfedef84706abcba/chk-189/f8e668dd-8019-4830-ab12-d48940ff5353.
> 1363 java.io.IOException: No space left on device
> 1364 at java.io.FileOutputStream.writeBytes(Native Method)
> 1365 at java.io.FileOutputStream.write(FileOutputStream.java:326)
> 1366 at
> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> 1367 at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> 1368 at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
> 1369 at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
> 1370 at
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:986)
> 1371 at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> 1372 at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
> 1373 at
> org.apache.flink.fs.s3.common.hadoop.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
> 1374 at
> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
> 1375 at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:269)
> 1376 at
> org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.close(CheckpointStreamWithResultProvider.java:58)
> 1377 at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263)
> 1378 at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250)
> 1379 at
> org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122)
> 1380 at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:185)
> 1381 at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:84)
> 1382 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 1383 at
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
> 1384 at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
> 1385 at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> 1386 at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 1387 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 1388 at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 1389 at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 1390 at java.lang.Thread.run(Thread.java:748)
> 1391 Suppressed: java.io.IOException: No space left on device
> 1392 at java.io.FileOutputStream.writeBytes(Native Method)
> 1393 at java.io.FileOutputStream.write(FileOutputStream.java:326)
> 1394 at
> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> 1395 at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> 1396 at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
> 1397 at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
> 1398 ... 21 more
>
>
> Thanks,
> Vishwas
>


Re: Decompose failure recovery time

2020-08-20 Thread Piotr Nowojski
Hi,

> I want to decompose the recovery time into different parts, say
> (1) the time to detect the failure,
> (2) the time to restart the job,
> (3) and the time to restore the checkpointing.

1. Maybe I'm missing something, but as far as I can tell, Flink cannot
help you with that. The time to detect the failure would be the time between
when the failure occurred and when the JobManager becomes aware of it.
If we could reliably measure/check when the former happened, then we
could immediately trigger failover. You are interested in this exactly
because there is no reliable way to detect the failure immediately. You
could approximate this by analysing the logs.

2. Maybe there are some metrics that you could use; if not, you could use
the REST API [1] to monitor the job status. Again, you could also do it
by analysing the logs.

3. In the future this might be measurable using the REST API (similar to
point 2), but currently there is no way to do it that way. There is a
ticket for that [2]. I think currently the only way to do it is by
analysing the logs.

If you just need to do this once, I would analyse the logs manually. If you
want to do it many times or monitor this continuously, I would write a
simple script (Python?) that combines REST API calls for 2. with log
analysis.
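
The log-analysis part could look roughly like the sketch below, written in plain Java to match the other code in this digest. It assumes log lines start with a timestamp in the `yyyy-MM-dd HH:mm:ss,SSS` layout; the marker strings `FAILURE_DETECTED`, `JOB_RESTARTED` and `STATE_RESTORED` are placeholders invented for this example, not actual Flink log messages, and would have to be replaced by whatever lines your logs contain for each phase.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;

public class RecoveryTimelineSketch {
    private static final DateTimeFormatter LOG_TIME =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Timestamp of the first line containing the marker, or null if none matches. */
    static LocalDateTime firstTimestamp(List<String> lines, String marker) {
        for (String line : lines) {
            if (line.contains(marker) && line.length() >= 23) {
                return LocalDateTime.parse(line.substring(0, 23), LOG_TIME);
            }
        }
        return null;
    }

    /** Milliseconds between the first occurrences of two markers. */
    static long millisBetween(List<String> lines, String fromMarker, String toMarker) {
        LocalDateTime from = firstTimestamp(lines, fromMarker);
        LocalDateTime to = firstTimestamp(lines, toMarker);
        if (from == null || to == null) {
            throw new IllegalArgumentException("marker not found in log");
        }
        return Duration.between(from, to).toMillis();
    }

    public static void main(String[] args) {
        // Hypothetical log excerpt; real markers depend on your logs.
        List<String> log = Arrays.asList(
            "2020-08-18 04:00:00,000 INFO  FAILURE_DETECTED",
            "2020-08-18 04:00:02,500 INFO  JOB_RESTARTED",
            "2020-08-18 04:00:10,250 INFO  STATE_RESTORED");

        System.out.println("restart ms: " + millisBetween(log, "FAILURE_DETECTED", "JOB_RESTARTED"));
        System.out.println("restore ms: " + millisBetween(log, "JOB_RESTARTED", "STATE_RESTORED"));
    }
}
```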

Piotrek


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs
[2] https://issues.apache.org/jira/browse/FLINK-17012

On Tue, 18 Aug 2020 at 04:07, Zhinan Cheng wrote:

> Hi all,
>
> I am working on measuring the failure recovery time of Flink and I
> want to decompose the recovery time into different parts, say the time
> to detect the failure, the time to restart the job, and the time to
> restore the checkpointing.
>
> Unfortunately, I cannot find  any information in Flink doc to solve
> this, Is there any way that Flink has provided for this, otherwise,
> how can I solve this?
>
> Thanks a lot for your help.
>
> Regards,
> Juno
>


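Flink startup question

2020-08-20 Thread guaishushu1...@163.com
Does anyone know whether Flink's -yt option does not support multiple directories, and whether it can only upload .jar files to the cluster?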



guaishushu1...@163.com


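Re: flink 1.11.1 sql client: streaming join produces unexpected rows containing null

2020-08-20 Thread LittleFall
This problem ultimately comes down to the following question: how should multiple tables created in Flink SQL tell apart the canal-json messages arriving from Kafka that each of them should receive?

  

And it has been resolved.

Thanks for your help.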



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Is about 200 KB per second normal when writing to MySQL with the JDBC connector?

2020-08-20 Thread LittleFall
This is my code; it simply writes data from a datagen source to a jdbc sink.

```java
package main;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;

public class Main {

    public static void main(String[] args) {
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
            StreamExecutionEnvironment.getExecutionEnvironment(),
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
        );

        // Source table backed by the datagen connector.
        tEnv.executeSql("CREATE TABLE gen_stuff (\n" +
            "\tstuff_id int,\n" +
            "\tstuff_base_id int,\n" +
            "\tstuff_name varchar(20)\n" +
            ") WITH (\n" +
            " 'connector' = 'datagen'," +
            "'rows-per-second'='1000'," +
            "'fields.stuff_id.kind'='sequence'," +
            "'fields.stuff_id.start'='1'," +
            "'fields.stuff_id.end'='1000'," +
            "'fields.stuff_name.length'='15'" +
            ")"
        );
        // Sink table backed by the jdbc connector.
        tEnv.executeSql("CREATE TABLE result_stuff (\n" +
            "\tstuff_id int,\n" +
            "\tstuff_base_id int,\n" +
            "\tstuff_name varchar(20)\n" +
            ") WITH (\n" +
            "\t'connector'  = 'jdbc',\n" +
            "\t'url'        = 'jdbc:mysql://127.0.0.1:3306/test?rewritebatchedstatements=true',\n" +
            "\t'table-name' = 'result_stuff',\n" +
            "\t'username'   = 'root',\n" +
            "\t'password'   = ''\n" +
            ")"
        );

        tEnv.executeSql("insert into result_stuff select stuff_id, " +
            "stuff_base_id, stuff_name from gen_stuff");
    }
}
```

However, the row count in MySQL grows by only about 1 per second. Assuming 20 B per row, the write speed is 200 KB/s, which does not meet my needs...

Is something wrong with my configuration, or is there a better way to write to the database? Thanks to anyone who offers suggestions.

The JDBC-related dependencies I use are as follows:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.21</version>
</dependency>
```

(For comparison, on my machine, generating data with datagen and writing it to a filesystem sink runs at about 23 MB/s.)



--
Sent from: http://apache-flink.147419.n8.nabble.com/


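A question about Hive

2020-08-20 Thread Bruce
hi, all.

If Hive is used only for metadata management and the actual data is not stored on HDFS, can the actual data be read through HiveCatalog?

For example, if Hive stores the table metadata for MySQL and Oracle tables, can HiveCatalog be used to read the actual table data?

Sent from my iPhone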

Stream job with Event Timestamp (Backfill getting incorrect results)

2020-08-20 Thread aj
I have a streaming job where I am doing a window operation on *"user_id"* and
then doing some summarization based on time-based logic such as:

1.  End the session after 30 minutes of inactivity of the user.
2.  End the session when the End_trip event or cancellation event has arrived
    for the user.

When I rerun the job from an old offset for backfilling, I get wrong
results. Some of the sessions end with the same start and end time. How can
I control the streaming job when a lot of data has accumulated in Kafka and
I have to replay it? Please help me understand what is going wrong.

My assumption is that it may be due to:
1. Out-of-order events
2. I am reading data from multiple topics, so an end_trip event that
happens at a later time can be read earlier and end the session.

I am using a KeyedProcessFunction like this:

public class DemandFunnelProcessFunction extends

KeyedProcessFunction,
DemandFunnelSummaryTuple> {

  private static final Logger LOGGER =
LoggerFactory.getLogger(DemandFunnelProcessFunction.class);

  private transient ValueState sessionSummary;
  private transient ValueState> distanceListState;

  @SuppressWarnings("checkstyle:LocalVariableName")
  @Override
  public void processElement(Tuple2 recordTuple2,
Context context,
  Collector collector) throws Exception {

GenericRecord record = recordTuple2.f1;


String event_name = record.get("event_name").toString();
long event_ts = (Long) record.get("event_ts");

DemandFunnelSummaryTuple currentTuple = sessionSummary.value();
ArrayList distanceList =
distanceListState.value() != null ? distanceListState.value()
: new ArrayList();

try {

  if (currentTuple == null) {
currentTuple = new DemandFunnelSummaryTuple();
String demandSessionId = UUID.randomUUID().toString();
currentTuple.setDemandSessionId(demandSessionId);
currentTuple.setStartTime(event_ts);
currentTuple.setUserId(recordTuple2.f0);
currentTuple.setEventName("demand_funnel_summary");
int geo_id = record.get("geo_id") != null ? (int)
record.get("geo_id") : 0;
currentTuple.setGeoId(geo_id);
  }
  long endTime = currentTuple.getEndTime();

  if (event_name.equals("search_list_keyless")) {
//System.out.println("inside search_list_keyless " + recordTuple2.f0);
currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 1);
SearchEventUtil.searchSummaryCalculation(record, currentTuple,
distanceList);
  }


  currentTuple.setEndTime(event_ts);
  sessionSummary.update(currentTuple);
  distanceListState.update(distanceList);

  if (event_name.equals("keyless_booking_cancellation") || event_name
  .equals("keyless_end_trip")) {
try {

  DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value();

  if (sessionSummaryTuple != null) {
sessionSummaryTuple.setAvgResultCount(
(double) distanceList.size() /
sessionSummaryTuple.getTotalSearch());
if (distanceList.size() > 0) {
  int distanceSum = distanceList.stream()
  .collect(Collectors.summingInt(Integer::intValue));
  sessionSummaryTuple.setAvgBikeDistance((double)
distanceSum / distanceList.size());
  sessionSummaryTuple

.setP50DistNearestBike(SearchEventUtil.percentile(distanceList, 50));
  sessionSummaryTuple

.setP90DistNearestBike(SearchEventUtil.percentile(distanceList, 90));
}
sessionSummaryTuple.setEndTime(event_ts);
collector.collect(sessionSummaryTuple);
  }
} catch (Exception e) {
  DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value();
  LOGGER.info("Error in collecting event for user_id " +
sessionSummaryTuple.getUserId());
  e.printStackTrace();
}
sessionSummary.clear();
distanceListState.clear();
  }
} catch (Exception e) {
  LOGGER.info("error in processing event --" + recordTuple2.f1.toString());
  LOGGER.info(e.toString());
  e.printStackTrace();

}
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx,
Collector out)
  throws Exception {

try {
  DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value();
  if (sessionSummaryTuple != null) {
System.out.println(
"calling on timer" + sessionSummaryTuple.getUserId() + " "
+ sessionSummaryTuple
.getEndTime() + "  " + timestamp);
ArrayList distanceList = distanceListState.value();
if (distanceList != null && distanceList.size() > 0) {
  sessionSummaryTuple
  .setAvgResultCount(
  (double) distanceList.size() /
sessionSummaryTuple.getTotalSearch());
  int distanceSum =
distanceList.stream().collect(Collectors.summingInt(Integer::intValue));
  sessionSummaryTuple.setAvgBikeDistance((double) 

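Re: Can state data in a KeyedCoProcessFunction be subject to resource contention?

2020-08-20 Thread shizk233
Got it! Thanks!

On Thu, Aug 20, 2020 at 6:00 PM Benchao Li wrote:

> Different keys should be using the same state instance, but underneath, the state handles the state belonging to each key, so the key is transparent to the user.
> For example, if you use a MapState, it is roughly equivalent to each key having its own Map instance, and different keys are independent of each other.
>
> On Thu, Aug 20, 2020 at 5:03 PM shizk233 wrote:
>
> > Thanks for the answer.
> > One more question: with parallelism 1, there should be only one MapState instance in an operator,
> > so how is it guaranteed that different keys on the stream (not the hash keys inside the map) only get their own portion of the map state data?
> >
> > As I understand it, after the stream is partitioned by key, each substream should only have its own state, but from the operator instance's point of view there seems to be only one state instance.
> >
> > On Thu, Aug 20, 2020 at 4:40 PM Benchao Li wrote:
> >
> > > Hi,
> > >
> > > Neither question 1 nor question 2 involves a multithreading problem. Flink guarantees under the hood that these methods are all executed serially in the same thread.
> > >
> > > On Thu, Aug 20, 2020 at 2:22 PM shizk233 wrote:
> > >
> > > > Hi all,
> > > >
> > > > A question: KeyedCoProcessFunction is a bit special in that it has two inputs, with two corresponding ProcessElement methods.
> > > > Question 1:
> > > > If both Process methods modify the same MapState, is there resource contention between them?
> > > > Or are the two methods executed sequentially?
> > > >
> > > > Question 2:
> > > > Although there are different keys, there is only one function instance, so its MapState should also be a single instance. Is the
> > > > Process logic for different keys executed concurrently or sequentially, and will it contend for the MapState?
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>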


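State serialization question

2020-08-20 Thread shizk233
Hi all,

A question: State should be serialized/deserialized using the type information extracted through the StateDescriptor.
So if it is declared with an interface type, such as ListState>, but what is actually stored is an ArrayList/LinkedList,
will this adversely affect type information extraction?

As I understand it, ArrayList and LinkedList should have somewhat different byte layouts when serialized,
yet both can be declared as List>.

Hoping some expert out there can help!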


Re: Can state data in a KeyedCoProcessFunction be subject to resource contention?

2020-08-20 Thread Benchao Li
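Different keys should be using the same state instance, but underneath, the state handles the state belonging to each key, so the key is transparent to the user.
For example, if you use a MapState, it is roughly equivalent to each key having its own Map instance, and different keys are independent of each other.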
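
The idea can be pictured with a simplified model in plain Java. This is only an illustration, not Flink's actual implementation: `KeyedMapStateSketch` and `setCurrentKey` are names made up for the example. A single state object holds one inner map per stream key, and every access goes through whichever key is current.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedMapStateSketch<K, UK, UV> {
    // One inner map per stream key; the user only ever sees the one for the current key.
    private final Map<K, Map<UK, UV>> perKey = new HashMap<>();
    private K currentKey;

    /** In this model the runtime calls this before handing an element to user code. */
    public void setCurrentKey(K key) {
        this.currentKey = key;
    }

    public void put(UK userKey, UV value) {
        perKey.computeIfAbsent(currentKey, k -> new HashMap<>()).put(userKey, value);
    }

    public UV get(UK userKey) {
        Map<UK, UV> map = perKey.get(currentKey);
        return map == null ? null : map.get(userKey);
    }

    public static void main(String[] args) {
        KeyedMapStateSketch<String, String, Integer> state = new KeyedMapStateSketch<>();

        state.setCurrentKey("user-a");
        state.put("count", 1);

        state.setCurrentKey("user-b");
        System.out.println(state.get("count")); // null: user-b has its own, empty map

        state.setCurrentKey("user-a");
        System.out.println(state.get("count")); // 1
    }
}
```

Because elements are processed one at a time in a single thread, switching the current key between elements is enough to keep the per-key data separate without any locking.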

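On Thu, Aug 20, 2020 at 5:03 PM shizk233 wrote:

> Thanks for the answer.
> One more question: with parallelism 1, there should be only one MapState instance in an operator,
> so how is it guaranteed that different keys on the stream (not the hash keys inside the map) only get their own portion of the map state data?
>
> As I understand it, after the stream is partitioned by key, each substream should only have its own state, but from the operator instance's point of view there seems to be only one state instance.
>
> On Thu, Aug 20, 2020 at 4:40 PM Benchao Li wrote:
>
> > Hi,
> >
> > Neither question 1 nor question 2 involves a multithreading problem. Flink guarantees under the hood that these methods are all executed serially in the same thread.
> >
> > On Thu, Aug 20, 2020 at 2:22 PM shizk233 wrote:
> >
> > > Hi all,
> > >
> > > A question: KeyedCoProcessFunction is a bit special in that it has two inputs, with two corresponding ProcessElement methods.
> > > Question 1:
> > > If both Process methods modify the same MapState, is there resource contention between them?
> > > Or are the two methods executed sequentially?
> > >
> > > Question 2:
> > > Although there are different keys, there is only one function instance, so its MapState should also be a single instance. Is the
> > > Process logic for different keys executed concurrently or sequentially, and will it contend for the MapState?
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>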


-- 

Best,
Benchao Li


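Re: Can state data in a KeyedCoProcessFunction be subject to resource contention?

2020-08-20 Thread shizk233
Thanks Cayden, I'll take a look.

On Thu, Aug 20, 2020 at 4:58 PM Cayden chen <1193216...@qq.com> wrote:

> hi, you can take a look at this; it explains things in quite some detail
> https://blog.csdn.net/yuchuanchen/article/details/105677408
>
>
> -- Original message --
> From: "user-zh" <libenc...@apache.org>;
> Sent: Thursday, August 20, 2020, 4:39 PM
> To: "user-zh"
> Subject: Re: Can state data in a KeyedCoProcessFunction be subject to resource contention?
>
>
>
> Hi,
>
> Neither question 1 nor question 2 involves a multithreading problem. Flink guarantees under the hood that these methods are all executed serially in the same thread.
>
> On Thu, Aug 20, 2020 at 2:22 PM shizk233 wrote:
>
> > Hi all,
> >
> > A question: KeyedCoProcessFunction is a bit special in that it has two inputs, with two corresponding ProcessElement methods.
> > Question 1:
> > If both Process methods modify the same MapState, is there resource contention between them?
> > Or are the two methods executed sequentially?
> >
> > Question 2:
> > Although there are different keys, there is only one function instance, so its MapState should also be a single instance. Is the
> > Process logic for different keys executed concurrently or sequentially, and will it contend for the MapState?
> >
>
>
> --
>
> Best,
> Benchao Li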

Declaring a primary key in DDL reports a type mismatch

2020-08-20 Thread xiao cai
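Hi:
Flink version 1.11.0, with the Kafka connector.
When a field is declared as the primary key in the DDL, a type mismatch is reported; removing the primary key constraint lets the statement run normally.
Declaring shop_id as varchar not null does not help either.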


org.apache.flink.table.api.ValidationException: Type STRING NOT NULL of table 
field 'shop_id' does not match with the physical type STRING of the 'shop_id' 
field of the TableSource return type.


The SQL is as follows:
create table source_0 (  
  `shop_id` varchar,  
  `user_id` bigint,  
  `category_id` int,  
  `ts` bigint,  
  `proc_time` as PROCTIME(),  
  `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000, 'yyyy-MM-dd HH:mm:ss')),
  watermark for event_time AS event_time, 
  PRIMARY KEY (shop_id, user_id) NOT ENFORCED 
  ) with (  
  'connector.type' = 'kafka',  


  )

Re: Can state data in KeyedCoProcessFunction be subject to resource contention?

2020-08-20 Thread shizk233
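Thanks for the answer.
One more question: with a parallelism of 1, there should be only one MapState instance in an operator,
so how is it guaranteed that different keys on the data stream (not the hash keys inside the map) only get their own part of the map state data?

As I understand it, after the stream is partitioned by key, each sub-stream should have only its own state, but looking at the operator instance, there seems to be just one state instance.

On Thu, Aug 20, 2020 at 4:40 PM, Benchao Li wrote:

> Hi,
>
> Neither question 1 nor question 2 involves multithreading. Flink's runtime guarantees that these methods all run serially on the same thread.
>
> On Thu, Aug 20, 2020 at 2:22 PM, shizk233 wrote:
>
> > Hi all,
> >
> > I have a question. KeyedCoProcessFunction is a bit special: it has two inputs, handled by two processElement methods.
> > Question 1:
> > If both of these process methods modify the same MapState, is there any resource contention?
> > Or are the two methods executed sequentially?
> >
> > Question 2:
> > Although there are different keys, there is only one function instance, and the MapState in it should also be a single instance. So, for different keys,
> > is processing executed concurrently or sequentially, and does it contend for the MapState?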
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: Can state data in KeyedCoProcessFunction be subject to resource contention?

2020-08-20 Thread Cayden chen
hi, you can take a look at this; it explains things in detail:
https://blog.csdn.net/yuchuanchen/article/details/105677408


Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-20 Thread Stephan Ewen
We have removed some public methods in the past, after a careful
deprecation period, if they no longer worked well.

The sentiment I got from users is that careful cleanup is in fact
appreciated, otherwise things get confusing over time (the deprecated
methods cause noise in the API).
Still, we need to be very careful here.

I would suggest that we
  - start with the non-public breaking methods
  - remove fold() (deprecated for a very long time)
  - remove split() (buggy)

Removing the env.socketStream() and env.fileStream() methods would probably
be good, too. They have been deprecated for a very long time and they don't
work well (with checkpoints), and the sources are the first thing a user needs
to understand when starting with Flink, so removing noise there is super helpful.
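
For readers wondering what code written against fold() would look like afterwards: fold() pairs an initial value with a step function, and that shape maps onto an accumulator-style aggregate. The sketch below is Flink-free and only illustrative. AggregateSketch is a hypothetical interface loosely modeled on the shape of Flink's AggregateFunction (createAccumulator / add / getResult); the real interface also has a merge method, which is omitted here.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical interface, loosely modeled on Flink's AggregateFunction (merge() omitted).
interface AggregateSketch<IN, ACC, OUT> {
    ACC createAccumulator();            // plays the role of fold's initial value

    ACC add(IN value, ACC accumulator); // plays the role of the fold step function

    OUT getResult(ACC accumulator);     // final projection, which fold did not have
}

class FoldToAggregateDemo {
    // Equivalent in spirit to fold("", (acc, v) -> acc + v): concatenate all values.
    static final AggregateSketch<Integer, StringBuilder, String> CONCAT =
        new AggregateSketch<Integer, StringBuilder, String>() {
            public StringBuilder createAccumulator() {
                return new StringBuilder();
            }

            public StringBuilder add(Integer value, StringBuilder acc) {
                return acc.append(value);
            }

            public String getResult(StringBuilder acc) {
                return acc.toString();
            }
        };

    // Drives the aggregate over a finite list, the way a window would over its elements.
    static <IN, ACC, OUT> OUT apply(AggregateSketch<IN, ACC, OUT> agg, List<IN> values) {
        ACC acc = agg.createAccumulator();
        for (IN v : values) {
            acc = agg.add(v, acc);
        }
        return agg.getResult(acc);
    }

    static String run() {
        return apply(CONCAT, Arrays.asList(1, 2, 3));
    }
}
```

The separate accumulator and result types are the main difference from fold: the intermediate state no longer has to be the same type as the output.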


On Thu, Aug 20, 2020 at 8:53 AM Dawid Wysakowicz 
wrote:

> Hey Till,
>
> You've got a good point here. Removing some of the methods would break
> the stability guarantees. I do understand if we decide not to
> remove them for that reason; let me explain, though, why I think it
> might make sense to remove them already. First of all, I am a bit afraid it
> might take a long time before we arrive at the 2.0 version. We have
> never discussed that in the community. At the same time, a lot of the methods
> already don't work or are buggy, and we do not fix them any more.
>
> Methods whose removal would not break the Public guarantees:
>
>- ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9)
>- RuntimeContext#getAllAccumulators (deprecated in 0.10)
>- ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7)
>- StreamExecutionEnvironment#setNumberOfExecutionRetries/getNumberOfExecutionRetries
>(not the equivalent in the ExecutionConfig)
>- StreamExecutionEnvironment#setStateBackend(AbstractStateBackend)
>(deprecated in 1.5)
>
> Methods whose removal would break the Public guarantees:
>
> which have no effect:
>
>- ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10)
>- ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9)
>
> which are buggy or discouraged and thus we do not support fixing them:
>
>- DataStream#split (deprecated in 1.8)
>- DataStream#fold and all related classes and methods such as
>FoldFunction, FoldingState, FoldingStateDescriptor ... (deprecated in
>1.3/1.4)
>
> The methods like:
>
>- StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),
>- methods in (Connected)DataStream that specify keys as either indices
>or field names
>- ExecutionConfig#setNumberOfExecutionRetries/getNumberOfExecutionRetries
>
> should be working just fine and I feel the least eager to remove those.
>
> I'd suggest that I open PRs for removing the methods that will not
> break the Public guarantees, as the general feedback was rather
> positive. For the rest, I do understand the reluctance to do so and will not
> do it in the 1.x branch. Still, I think it is valuable to have the
> discussion.
>
> Best,
>
> Dawid
>
>
> On 18/08/2020 09:26, Till Rohrmann wrote:
>
> Having looked at the proposed set of methods to remove I've noticed that
> some of them are actually annotated with @Public. According to our
> stability guarantees, only major releases (1.0, 2.0, etc.) can break APIs
> with this annotation. Hence, I believe that we cannot simply remove them
> unless the community decides to change the stability guarantees we give or
> by making the next release a major release (Flink 2.0).
>
> Cheers,
> Till
>
> On Tue, Aug 18, 2020 at 5:57 AM Yun Gao  wrote:
>
>> +1 for removing the methods that are deprecated for a while & have
>> alternative methods.
>>
>> One specific thing is that if we remove the DataStream#split, do we
>> consider enabling side-output in more operators in the future ? Currently
>> it should be only available in ProcessFunctions, but not available to other
>> commonly used UDF like Source or AsyncFunction[1].
>>
>> One temporary solution occurs to me is to add a ProcessFunction after the
>> operators want to use side-output. But I think the solution is not very
>> direct to come up with and if it really works we might add it to the
>> document of side-output.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-7954
>>
>> Best,
>>  Yun
>>
>> --Original Mail --
>> *Sender:*Kostas Kloudas 
>> *Send Date:*Tue Aug 18 03:52:44 2020
>> *Recipients:*Dawid Wysakowicz 
>> *CC:*dev , user 
>> *Subject:*Re: [DISCUSS] Removing deprecated methods from DataStream API
>>
>>> +1 for removing them.
>>>
>>>
>>>
>>> From a quick look, most of them (not all) have been deprecated a long
>>> time ago.
>>>
>>>
>>>
>>> Cheers,
>>>
>>> Kostas
>>>
>>>
>>>
>>> On Mon, Aug 17, 2020 at 9:37 PM Dawid Wysakowicz wrote:
>>>
>>> >
>>>
>>> > @David Yes, my idea was to remove any use of fold method and all
>>> related classes including WindowedStream#fold
>>>
>>> >

Re: Can state data in KeyedCoProcessFunction be subject to resource contention?

2020-08-20 Thread Benchao Li
Hi,

Neither question 1 nor question 2 involves multithreading. Flink's runtime guarantees that these methods all run serially on the same thread.
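
As a rough illustration of why serial execution removes the need for locking, here is a minimal Flink-free sketch. It assumes a simplified model in which one loop on one thread drains two input queues; the class SerialTwoInputSketch and its alternating loop are hypothetical and do not reproduce Flink's actual task scheduling. The shared fields are plain, non-thread-safe Java, which is only safe because the two process methods never overlap.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical two-input function driven by a single thread; not Flink code.
class SerialTwoInputSketch {
    // Plain, unsynchronized state shared by both process methods.
    private final List<String> log = new ArrayList<>();
    private int counter = 0;

    void processElement1(String value) {
        counter++;
        log.add("in1:" + value + "#" + counter);
    }

    void processElement2(String value) {
        counter++;
        log.add("in2:" + value + "#" + counter);
    }

    static List<String> run() {
        SerialTwoInputSketch fn = new SerialTwoInputSketch();
        Queue<String> input1 = new ArrayDeque<>(Arrays.asList("a", "b"));
        Queue<String> input2 = new ArrayDeque<>(Arrays.asList("x"));
        // One loop on one thread: calls from the two inputs interleave but never run at the same time.
        while (!input1.isEmpty() || !input2.isEmpty()) {
            if (!input1.isEmpty()) {
                fn.processElement1(input1.poll());
            }
            if (!input2.isEmpty()) {
                fn.processElement2(input2.poll());
            }
        }
        return fn.log;
    }
}
```

The order in which records from the two inputs interleave is not something user code should rely on; only the absence of concurrent calls is the point here.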

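On Thu, Aug 20, 2020 at 2:22 PM, shizk233 wrote:

> Hi all,
>
> I have a question. KeyedCoProcessFunction is a bit special: it has two inputs, handled by two processElement methods.
> Question 1:
> If both of these process methods modify the same MapState, is there any resource contention?
> Or are the two methods executed sequentially?
>
> Question 2:
> Although there are different keys, there is only one function instance, and the MapState in it should also be a single instance. So, for different keys,
> is processing executed concurrently or sequentially, and does it contend for the MapState?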
>


-- 

Best,
Benchao Li


Re: Is there a way to run Flink jobs directly on a YARN cluster (deploying only YARN, not Flink)?

2020-08-20 Thread caozhen
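赵一旦 wrote
> I don't quite follow. I have never deployed YARN myself. Once the YARN cluster is deployed (say, on 5 machines), is it fine not to deploy any Flink dist package on those 5 machines?
> For example, I use an additional machine 6 to submit jobs, and submit a Flink job to the YARN cluster. But my jar only contains the user jar, so where does the Flink dist package used by the jobmanager/taskmanager processes running in the YARN containers come from?
> 
> 
> On Wed, Aug 19, 2020 at 7:58 PM, 徐骁 ffxrqyzby@ wrote:
> 
>> Flink on YARN has a per-job deployment mode:
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
>> Just submit the package; a Flink cluster is brought up automatically to run the job and is torn down automatically when the job finishes.
>>
>> On Wed, Aug 19, 2020 at 5:54 PM, 赵一旦 hinobleyd@ wrote:
>>
>> > As the subject says, I deployed a YARN cluster directly on 5 machines, with no Flink deployed on them.
>> > Can I submit Flink jobs directly to that cluster?
>> > The effect would be as if the submitted package contained the complete Flink distribution.
>> >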
>>





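In YARN per-job mode, put a Flink client on one machine and submit the job to the YARN cluster through that client. During submission, the classpath Flink depends on is uploaded to HDFS and distributed to the TMs when needed.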



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Table API Kafka Connector Sink Key Definition

2020-08-20 Thread Till Rohrmann
This is indeed not optimal. Could you file a JIRA issue to add this
functionality? Thanks a lot Yuval.

Cheers,
Till

On Thu, Aug 20, 2020 at 9:47 AM Yuval Itzchakov  wrote:

> Hi Till,
> KafkaSerializationSchema is only pluggable for the DataStream API, not for
> the Table API. KafkaTableSink hard codes a KeyedSerializationSchema that
> uses a null key, and this behavior can't be overridden.
>
> I have to say I was quite surprised by this behavior, as publishing events
> to Kafka using a key to keep order inside a given partition is usually a
> very common requirement.
>
> On Thu, Aug 20, 2020 at 10:26 AM Till Rohrmann 
> wrote:
>
>> Hi Yuval,
>>
>> it looks as if the KafkaTableSink only supports writing out rows without
>> a key. Pulling in Timo for verification.
>>
>> If you want to use a Kafka producer which writes the records out with a
>> key, then please take a look at KafkaSerializationSchema. It supports this
>> functionality.
>>
>> Cheers,
>> Till
>>
>> On Wed, Aug 19, 2020 at 6:36 PM Yuval Itzchakov 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm running Flink 1.9.0 and I'm trying to set the key to be published by
>>> the Table API's Kafka Connector. I've searched the documentation but
>>> could find no reference for such an ability.
>>>
>>> Additionally, while browsing the code of the KafkaTableSink, it looks
>>> like it creates a KeyedSerializationSchemaWrapper which just sets the key
>>> to null?
>>>
>>> Would love some help.
>>>
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>>
>>
>
> --
> Best Regards,
> Yuval Itzchakov.
>


Same kafka partition being consumed by multiple task managers.

2020-08-20 Thread Deshpande, Omkar
Hello,

I am running a streaming Beam app with the Flink runner (Java).

  *   Beam 2.19

  *   Flink 1.9

Checkpoints and savepoints are configured to go to s3 and HA is enabled using 
Zookeeper.

I was running the app with 3 task managers. I took a savepoint and started the 
app with 6 task managers. My input topic has 12 partitions. With 3 pods, the 
partitions were distributed evenly. After restarting with the increased number 
of task managers, some partitions are being consumed by 2 task managers.
partition   task manager
0           4
1           4
2           1
3           1
4           3
5           3
6           4, 0
7           4, 0
8           1, 2
9           1, 2
10          3, 5
11          3, 5

It looks like there were 3 task managers [1,3,4], and they correctly distributed
the partitions between them.
Then 3 new task managers were added [0,2,5], and the partitions were not properly
re-distributed.

Where could this metadata be coming from?

Omkar


Re: Is there a way to run Flink jobs directly on a YARN cluster (deploying only YARN, not Flink)?

2020-08-20 Thread 徐骁
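This command uploads flink-dist to HDFS.

On Wed, Aug 19, 2020 at 10:10 PM, 赵一旦 wrote:

> I don't quite follow. I have never deployed YARN myself. Once the YARN cluster is deployed (say, on 5 machines), is it fine not to deploy any Flink dist package on those 5 machines?
>
> For example, I use an additional machine 6 to submit jobs, and submit a Flink job to the YARN cluster. But my jar only contains the user jar, so where does the Flink dist package used by the jobmanager/taskmanager processes running in the YARN containers come from?
>
>
> On Wed, Aug 19, 2020 at 7:58 PM, 徐骁 wrote:
>
> > Flink on YARN has a per-job deployment mode:
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
> > Just submit the package; a Flink cluster is brought up automatically to run the job and is torn down automatically when the job finishes.
> >
> > On Wed, Aug 19, 2020 at 5:54 PM, 赵一旦 wrote:
> >
> > > As the subject says, I deployed a YARN cluster directly on 5 machines, with no Flink deployed on them.
> > > Can I submit Flink jobs directly to that cluster?
> > > The effect would be as if the submitted package contained the complete Flink distribution.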
> > >
> >
>


Re: Table API Kafka Connector Sink Key Definition

2020-08-20 Thread Yuval Itzchakov
Hi Till,
KafkaSerializationSchema is only pluggable for the DataStream API, not for
the Table API. KafkaTableSink hard codes a KeyedSerializationSchema that
uses a null key, and this behavior can't be overridden.

I have to say I was quite surprised by this behavior, as publishing events
to Kafka using a key to keep order inside a given partition is usually a
very common requirement.
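
To spell out why a record key matters for ordering, here is a small self-contained sketch. It is an assumption-laden illustration, not Kafka or Flink code: partitionFor is a hypothetical helper using a simplified hash (Kafka's default partitioner hashes the serialized key differently), but the property it shows is the same. Records with an equal key always land in the same partition, so their relative order is preserved, whereas records without a key have no such affinity.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyPartitionSketch {
    // Simplified stand-in for a key-hash partitioner; the hash function here is an assumption.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        // Mask the sign bit so the result of the modulo is never negative.
        return (Arrays.hashCode(bytes) & 0x7fffffff) % numPartitions;
    }

    // The same key maps to the same partition on every call, within the valid range.
    static boolean sameKeySamePartition() {
        int first = partitionFor("order-42", 12);
        int second = partitionFor("order-42", 12);
        return first == second && first >= 0 && first < 12;
    }
}
```

With a null key, as the hard-coded schema described above produces, there is nothing to hash, so events for the same entity can end up in different partitions.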

On Thu, Aug 20, 2020 at 10:26 AM Till Rohrmann  wrote:

> Hi Yuval,
>
> it looks as if the KafkaTableSink only supports writing out rows without a
> key. Pulling in Timo for verification.
>
> If you want to use a Kafka producer which writes the records out with a
> key, then please take a look at KafkaSerializationSchema. It supports this
> functionality.
>
> Cheers,
> Till
>
> On Wed, Aug 19, 2020 at 6:36 PM Yuval Itzchakov  wrote:
>
>> Hi,
>>
>> I'm running Flink 1.9.0 and I'm trying to set the key to be published by
>> the Table API's Kafka Connector. I've searched the documentation but
>> could find no reference for such an ability.
>>
>> Additionally, while browsing the code of the KafkaTableSink, it looks
>> like it creates a KeyedSerializationSchemaWrapper which just sets the key
>> to null?
>>
>> Would love some help.
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>

-- 
Best Regards,
Yuval Itzchakov.


Re: Table API Kafka Connector Sink Key Definition

2020-08-20 Thread Dawid Wysakowicz
Hi Yuval,

Unfortunately setting the key or timestamp (or other metadata) from the
SQL API is not supported yet. There is an ongoing discussion to support
it[1].

Right now your option would be to change the code of KafkaTableSink and
write your own version of KafkaSerializationSchema as Till mentioned.

Best,

Dawid


[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-107-Reading-table-columns-from-different-parts-of-source-records-td38277.html

On 20/08/2020 09:26, Till Rohrmann wrote:
> Hi Yuval,
>
> it looks as if the KafkaTableSink only supports writing out rows
> without a key. Pulling in Timo for verification.
>
> If you want to use a Kafka producer which writes the records out with
> a key, then please take a look at KafkaSerializationSchema. It
> supports this functionality.
>
> Cheers,
> Till
>
> On Wed, Aug 19, 2020 at 6:36 PM Yuval Itzchakov wrote:
>
> Hi,
>
> I'm running Flink 1.9.0 and I'm trying to set the key to be
> published by the Table API's Kafka Connector. I've searched the
> documentation but could find no reference for such an ability.
>
> Additionally, while browsing the code of the KafkaTableSink, it
> looks like it creates a KeyedSerializationSchemaWrapper which just
> sets the key to null?
>
> Would love some help.
>
> -- 
> Best Regards,
> Yuval Itzchakov.
>




Re: Table API Kafka Connector Sink Key Definition

2020-08-20 Thread Till Rohrmann
Hi Yuval,

it looks as if the KafkaTableSink only supports writing out rows without a
key. Pulling in Timo for verification.

If you want to use a Kafka producer which writes the records out with a
key, then please take a look at KafkaSerializationSchema. It supports this
functionality.

Cheers,
Till

On Wed, Aug 19, 2020 at 6:36 PM Yuval Itzchakov  wrote:

> Hi,
>
> I'm running Flink 1.9.0 and I'm trying to set the key to be published by
> the Table API's Kafka Connector. I've searched the documentation but
> could find no reference for such an ability.
>
> Additionally, while browsing the code of the KafkaTableSink, it looks like
> it creates a KeyedSerializationSchemaWrapper which just sets the key to
> null?
>
> Would love some help.
>
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: Question about ORC files

2020-08-20 Thread abc15606
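I've found the cause: with Hive on Spark, Hive cannot read the ORC files.

Sent from my iPhone

> On Aug 20, 2020, at 14:50, Jingsong Li wrote:
> 
> Could you paste the exception stack trace?
> It is probably an ORC version issue: if you use the filesystem connector's ORC writer, it uses a relatively new version.
> I suggest writing through a Hive table instead, so that you can choose the version.
> 
> Best,
> Jingsong
> 
>> On Thu, Aug 20, 2020 at 12:10 PM  wrote:
>> 
>> After writing ORC files with Flink SQL, Flink can read them back, but neither Spark nor Hive can; Impala can.
>> 
>> Sent from my iPhone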
> 
> 
> 
> -- 
> Best, Jingsong Lee


Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-20 Thread Dawid Wysakowicz
Hey Till,

You've got a good point here. Removing some of the methods would break
the stability guarantees. I do understand if we decide not to
remove them for that reason; let me explain, though, why I think it
might make sense to remove them already. First of all, I am a bit afraid
it might take a long time before we arrive at the 2.0 version. We have
never discussed that in the community. At the same time, a lot of the
methods already don't work or are buggy, and we do not fix them any more.

Methods whose removal would not break the Public guarantees:

  * ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9)
  * RuntimeContext#getAllAccumulators (deprecated in 0.10)
  * ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7)
  * StreamExecutionEnvironment#setNumberOfExecutionRetries/getNumberOfExecutionRetries
    (not the equivalent in the ExecutionConfig)
  * StreamExecutionEnvironment#setStateBackend(AbstractStateBackend)
    (deprecated in 1.5)

Methods whose removal would break the Public guarantees:

which have no effect:

  * ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10)
  * ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9)

which are buggy or discouraged and thus we do not support fixing them:

  * DataStream#split (deprecated in 1.8)
  * DataStream#fold and all related classes and methods such as
FoldFunction, FoldingState, FoldingStateDescriptor ... (deprecated
in 1.3/1.4)

The methods like:

  * StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),
  * methods in (Connected)DataStream that specify keys as either indices
    or field names
  * ExecutionConfig#setNumberOfExecutionRetries/getNumberOfExecutionRetries

should be working just fine and I feel the least eager to remove those.

I'd suggest that I open PRs for removing the methods that will not
break the Public guarantees, as the general feedback was rather
positive. For the rest, I do understand the reluctance to do so and will
not do it in the 1.x branch. Still, I think it is valuable to have the
discussion.

Best,

Dawid


On 18/08/2020 09:26, Till Rohrmann wrote:
> Having looked at the proposed set of methods to remove I've noticed
> that some of them are actually annotated with @Public. According to
> our stability guarantees, only major releases (1.0, 2.0, etc.) can
> break APIs with this annotation. Hence, I believe that we cannot
> simply remove them unless the community decides to change the
> stability guarantees we give or by making the next release a major
> release (Flink 2.0).
>
> Cheers,
> Till
>
> On Tue, Aug 18, 2020 at 5:57 AM Yun Gao wrote:
>
> +1 for removing the methods that are deprecated for a while & have
> alternative methods.
>
> One specific thing is that if we remove the DataStream#split, do
> we consider enabling side-output in more operators in the future ?
> Currently it should be only available in ProcessFunctions, but not
> available to other commonly used UDF like Source or AsyncFunction[1].
>
> One temporary solution occurs to me is to add a ProcessFunction
> after the operators want to use side-output. But I think the
> solution is not very direct to come up with and if it really works
> we might add it to the document of side-output. 
>
> [1] https://issues.apache.org/jira/browse/FLINK-7954
>
> Best,
>  Yun
>
> --Original Mail --
> *Sender:*Kostas Kloudas
> *Send Date:*Tue Aug 18 03:52:44 2020
> *Recipients:*Dawid Wysakowicz
> *CC:*dev <d...@flink.apache.org>,
> user <user@flink.apache.org>
> *Subject:*Re: [DISCUSS] Removing deprecated methods from
> DataStream API
>
> +1 for removing them.
>
>
>
> From a quick look, most of them (not all) have been
> deprecated a long time ago.
>
>
>
> Cheers,
>
> Kostas
>
>
>
> On Mon, Aug 17, 2020 at 9:37 PM Dawid Wysakowicz wrote:
>
> >
>
> > @David Yes, my idea was to remove any use of fold method
> and all related classes including WindowedStream#fold
>
> >
>
> > @Klou Good idea to also remove the deprecated
> enableCheckpointing() &
> StreamExecutionEnvironment#readFile and alike. I did
> another pass over some of the classes and thought we could
> also drop:
>
> >
>
> > ExecutionConfig#set/getCodeAnalysisMode
>
> > ExecutionConfig#disable/enableSysoutLogging
>
> > ExecutionConfig#set/isFailTaskOnCheckpointError
>
> > ExecutionConfig#isLatencyTrackingEnabled
>
> >
>
> > As for the 

Re: [bug] In Flink 1.11, the schema loses the physical type information specified by DataType when using a connector

2020-08-20 Thread 赵 建云
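Yes, the new DynamicTable work is in progress. Not being able to bind a physical type under the Table API caused some type compatibility problems; there is now a solution.

赵建云
August 20, 2020

On Aug 20, 2020, at 2:27 PM, Jingsong Li <jingsongl...@gmail.com> wrote:

For 1.11, just use the new source/sink interfaces.

On Wed, Aug 19, 2020 at 12:43 AM 赵 建云 <zhaojianyu...@outlook.com> wrote:

Adding the image links
Creating the connector
http://image.zhaojianyun.com/mweb/bug1.png
Creating the sink in TableSourceSinkFactory
http://image.zhaojianyun.com/mweb/bug2.png
Runtime physical information of the TableSchema
http://image.zhaojianyun.com/mweb/bug3.png



On Aug 18, 2020, at 10:09 PM, 赵 建云 <zhaojianyu...@outlook.com> wrote:

hello all:
I ran into a problem while developing a new connector for Flink 1.11.
The old version of the connector supported Flink 1.9. After the recent upgrade to Flink 1.11, I found in tests that creating the connector requires declaring a schema, and the schema uses TableSchema information. The TableSchema contains DataTypes, and after a physical type is specified on a DataType, the schema obtained in TableSourceSinkFactory has lost that physical type.
This affects both sources and sinks, and causes the type check at startup to fail.
For example,
with DataTypes.DATE().bridgedTo(java.sql.Date.class);, the physical type java.sql.Date is lost at runtime, and java.time.LocalDate is actually used.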
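
For connector code that still expects java.sql.Date but is handed java.time.LocalDate at runtime, a conversion at the boundary is one possible stopgap. The sketch below uses only JDK methods (Date.valueOf(LocalDate) and Date#toLocalDate); the class DateBridgeSketch and its helper names are hypothetical, and this is not a fix for the schema issue described above, only an illustration of the two representations being interchangeable.

```java
import java.sql.Date;
import java.time.LocalDate;

// Hypothetical helper; illustrates JDK-level conversion only, not Flink's type bridging.
class DateBridgeSketch {
    // Convert the LocalDate handed over at runtime into the java.sql.Date older code expects.
    static Date toSqlDate(LocalDate value) {
        return value == null ? null : Date.valueOf(value);
    }

    // Convert back when passing values to code that works with java.time.
    static LocalDate toLocalDate(Date value) {
        return value == null ? null : value.toLocalDate();
    }

    static boolean roundTrips() {
        LocalDate day = LocalDate.of(2020, 8, 18);
        return toLocalDate(toSqlDate(day)).equals(day);
    }
}
```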

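* Creating the connector [image]
* Creating the sink in TableSourceSinkFactory [image]
* Runtime physical information of the TableSchema [image]

I couldn't find a button for filing an issue in Flink's JIRA, so I'm posting the problem to the Chinese list. Please help with this issue~
赵建云
August 18, 2020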



--
Best, Jingsong Lee



Re: Re: Re:Re: Problem writing ORC files with Hive streaming

2020-08-20 Thread Jingsong Li
This is a bug. It has already been fixed and is awaiting release.

On Fri, Aug 14, 2020 at 6:05 PM flink小猪 <18579099...@163.com> wrote:

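> In my tests in the IDE, when writing to a Parquet table without the code you sent, the program keeps running and prints Parquet-related logs every checkpoint-interval;
> when writing to an ORC table, however, there is no log output at all, although the program keeps running. In addition, when I submit the same job through the SQL client,
> the Parquet table still has no problems, while the ORC table job restarts endlessly and reports the following error.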
>
> java.io.FileNotFoundException: File does not exist:
> hdfs://nspt-cs/hive/warehouse/hive_user_orc/ts_dt=2020-08-14/ts_hour=17/ts_minute=55/.part-650c3d36-328a-4d8d-8bdd-c170109edfba-0-0.inprogress.398158d9-eaf7-4863-855e-238c7069e298
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
> ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
> ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
> ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
> at
> org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54)
> ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84)
> ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at StreamExecCalc$21.processElement(Unknown Source) ~[?:?]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at 

Re:Re: Flink StreamingFileSink rolling policy

2020-08-20 Thread guoliang_wang1335
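I'll give it a try, thanks.

At 2020-08-20 14:19:41, "Jingsong Li" wrote:
>As long as you extend CheckpointRollingPolicy, you can implement shouldRollOnEvent and shouldRollOnProcessingTime however you like.
>
>On Wed, Aug 19, 2020 at 6:20 PM guoliang_wang1335 
>wrote:
>
>> Question: when Flink StreamingFileSink bulk-writes the Hadoop SequenceFile
>> format, can the rolling policy be customized? I want to roll based on file size, the maximum time a file has gone without updates, and checkpoints. Can this be customized by implementing the RollingPolicy interface? Thanks!
>>
>>
>> According to a note in the documentation <
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html
>> >, bulk encoding by default only has OnCheckpointRollingPolicy, which rolls on every checkpoint. If the checkpoint interval is not set sensibly, this produces quite a lot of small files.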
>>
>>
>>
>>
>>
>>
>
>-- 
>Best, Jingsong Lee
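
The advice above can be sketched without any Flink dependency. In this illustration, CheckpointRollingSketch is a hypothetical abstract class that only mimics the idea of a checkpoint-based rolling policy: rolling on checkpoint is fixed, while the size and inactivity checks are left to the subclass. The real Flink methods receive part-file information objects rather than plain long values, so the signatures here are simplifying assumptions.

```java
// Hypothetical, Flink-free model of a checkpoint-based rolling policy.
abstract class CheckpointRollingSketch {
    // Rolling on every checkpoint is fixed in this model, as it is for bulk formats.
    final boolean shouldRollOnCheckpoint() {
        return true;
    }

    // Called per written element; decides based on the current part-file size.
    abstract boolean shouldRollOnEvent(long currentFileSizeBytes);

    // Called periodically; decides based on how long the part file has been idle.
    abstract boolean shouldRollOnProcessingTime(long lastUpdateMillis, long nowMillis);
}

class SizeAndInactivityPolicy extends CheckpointRollingSketch {
    private final long maxBytes;
    private final long maxInactivityMillis;

    SizeAndInactivityPolicy(long maxBytes, long maxInactivityMillis) {
        this.maxBytes = maxBytes;
        this.maxInactivityMillis = maxInactivityMillis;
    }

    @Override
    boolean shouldRollOnEvent(long currentFileSizeBytes) {
        return currentFileSizeBytes >= maxBytes;
    }

    @Override
    boolean shouldRollOnProcessingTime(long lastUpdateMillis, long nowMillis) {
        return nowMillis - lastUpdateMillis >= maxInactivityMillis;
    }
}
```

Note that a policy like this can only add extra roll points; it does not reduce the number of files created by checkpoint-triggered rolling, so the small-file concern from the question still depends on the checkpoint interval.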


Re: Integrating Flink into CDH

2020-08-20 Thread Jingsong Li
What exactly is the error?

On Tue, Aug 18, 2020 at 8:34 PM smq <374060...@qq.com> wrote:

>
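> Hi everyone, I found a tool online for building parcels. After packaging Flink 1.9, it installs and runs normally through CM, but 1.10 and 1.11 fail to start after installation. Does anyone have experience with this to share? Much appreciated!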



-- 
Best, Jingsong Lee


Re: Re: Flink on Yarn: after a job restart, the taskmanager seems to lose the jar files in lib

2020-08-20 Thread xiao cai
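Hi:
Thanks for the reply, that is indeed one approach.
Personally, though, I think that if the jar files in the local lib directory were uploaded to HDFS when the first container starts, and the containers started later on failover all fetched them from HDFS, this problem should not occur. It seems the community optimized jar copying in 1.11; I'm still reading up on this and will follow up when I make progress.


Best,
xiao cai


 Original message 
From: 范超
To: user-zh@flink.apache.org
Sent: Thursday, August 20, 2020, 09:11
Subject: Re: Flink on Yarn: after a job restart, the taskmanager seems to lose the jar files in lib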


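I previously enabled failover restart for my job and likewise found that YARN simply requested a new container while the old container was not handled any further, which kept producing the same error you are seeing: the old container had no task executor bound to it (No TaskExecutor registered under containe_.).
In the end I just wrote a script that reloads the application via a savepoint. Hope this helps.

-Original message-
From: xiao cai [mailto:flin...@163.com]
Sent: Wednesday, August 19, 2020, 12:50
To: user-zh
Subject: Flink on Yarn: after a job restart, the taskmanager seems to lose the jar files in lib

As the subject says: with Flink on Yarn, after a job restart the taskmanager seems to lose the jar files in lib.
My job is kafka source -> hbase sink.
After the job obtains a new container, classes that were originally present are missing when the job starts. I suspect the newly requested container did not get the resources in lib. Should the resources in lib be placed on HDFS? How should this be configured?

Best
xiao cai

Error stack:
2020-08-19 11:23:08,099 INFO 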
org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers. 
2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] - 
Received 1 containers with resource , 1 pending 
container requests. 2020-08-19 11:23:08,100 INFO 
org.apache.flink.yarn.YarnResourceManager [] - TaskExecutor 
container_e07_1596440446172_0094_01_69 will be started on 10.3.15.22 with 
TaskExecutorProcessSpec {cpuCores=4.0, frameworkHeapSize=128.000mb (134217728 
bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), 
taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 bytes, 
networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb 
(536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), 
jvmOverheadSize=192.000mb (201326592 bytes)}. 2020-08-19 11:23:08,101 INFO 
org.apache.flink.yarn.YarnResourceManager [] - Creating container launch 
context for TaskManagers 2020-08-19 11:23:08,101 INFO 
org.apache.flink.yarn.YarnResourceManager [] - Starting TaskManagers 2020-08-19 
11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] - Removing 
container request Capability[]Priority[1]. 2020-08-19 
11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] - Accepted 1 
requested containers, returned 0 excess containers, 0 pending container 
requests of resource . 2020-08-19 11:23:08,102 INFO 
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Processing 
Event EventType: START_CONTAINER for Container 
container_e07_1596440446172_0094_01_69 2020-08-19 11:23:10,851 ERROR 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler [] 
- Unhandled exception. 
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: No TaskExecutor registered under container_e07_1596440446172_0094_01_68.
    at org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.11.0.jar:1.11.0]

Re: [bug] Flink 1.11: schema loses the physical type specified via DataType when using a connector

2020-08-20 Thread Jingsong Li
For 1.11, just use the new source/sink interfaces.

On Wed, Aug 19, 2020 at 12:43 AM 赵 建云  wrote:

> Adding the image links:
> Creating the connector
> http://image.zhaojianyun.com/mweb/bug1.png
> Creating the sink in TableSourceSinkFactory
> http://image.zhaojianyun.com/mweb/bug2.png
> Runtime physical information of the TableSchema
> http://image.zhaojianyun.com/mweb/bug3.png
>
>
>
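> On Aug 18, 2020, at 10:09 PM, 赵 建云  zhaojianyu...@outlook.com>> wrote:
>
> hello all:
> I ran into a problem while developing a new connector for Flink 1.11.
> The old version of the connector supported Flink 1.9. After recently
> upgrading to Flink 1.11, I found in tests that creating the connector
> requires declaring a schema. The schema is built from a TableSchema, whose
> fields carry DataTypes. After a physical type is specified on a DataType,
> the schema obtained in TableSourceSinkFactory has lost that physical type.
> This affects both sources and sinks, and causes the type check at startup
> to fail.
> For example, with DataTypes.DATE().bridgedTo(java.sql.Date.class), the
> physical type java.sql.Date is lost at runtime and java.time.LocalDate is
> used instead.
>
>  * Creating the connector [image]
>  * Creating the sink in TableSourceSinkFactory [image]
>  * Runtime physical information of the TableSchema [image]
>
> I could not find a button for filing an issue in Flink's JIRA, so I am
> posting the problem to the Chinese mailing list. Please help with this
> issue.
> 赵建云
> Aug 18, 2020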
>
>

-- 
Best, Jingsong Lee
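
On the DATE example from this thread: java.time.LocalDate is the default conversion class for DATE in the new type system, so one possible workaround, while a connector still uses java.sql.Date internally, is to convert at the connector boundary. The following is a minimal sketch that uses only JDK classes. DateBridging and its method names are hypothetical and not part of Flink, and the sketch does not address why the bridged class is dropped in the legacy factory path.

```java
import java.sql.Date;
import java.time.LocalDate;

/** Hypothetical helper (not part of Flink): normalizes DATE values at a connector boundary. */
final class DateBridging {
    private DateBridging() {}

    /** Accepts either representation and returns java.sql.Date (null stays null). */
    static Date toSqlDate(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Date) {
            return (Date) value;
        }
        if (value instanceof LocalDate) {
            return Date.valueOf((LocalDate) value);
        }
        throw new IllegalArgumentException(
                "Unsupported DATE class: " + value.getClass().getName());
    }

    /** Accepts either representation and returns java.time.LocalDate (null stays null). */
    static LocalDate toLocalDate(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof LocalDate) {
            return (LocalDate) value;
        }
        if (value instanceof Date) {
            return ((Date) value).toLocalDate();
        }
        throw new IllegalArgumentException(
                "Unsupported DATE class: " + value.getClass().getName());
    }
}
```

A sink written this way accepts whichever class the planner hands over, so the startup type check no longer has to depend on the bridged class surviving.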


Re: Flink SQL lineage

2020-08-20 Thread Jingsong Li
It depends on why you want lineage in the first place.

On Wed, Aug 19, 2020 at 1:17 AM guaishushu1...@163.com <
guaishushu1...@163.com> wrote:

> Does anyone know: to build lineage for Flink SQL, is it better to extract
> the relationships between tables from the sqlNode, or from the
> Transformation operators?
>
>
>
> guaishushu1...@163.com
>


-- 
Best, Jingsong Lee


Can state in a KeyedCoProcessFunction be subject to race conditions?

2020-08-20 Thread shizk233
Hi all,

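A question: KeyedCoProcessFunction is a bit special in that it has two
inputs, with two corresponding processElement methods.
Question 1:
If both process methods modify the same MapState, can there be a race on it?
Or are the two methods executed sequentially?

Question 2:
Although there are different keys, there is only one function instance, and
presumably only one MapState instance. Is the processing for different keys
executed concurrently or sequentially, and does it contend for the MapState?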
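
On both questions: as far as I know, one parallel instance of a Flink operator handles its records one at a time on a single task thread, so processElement1, processElement2, and onTimer of that instance are not invoked concurrently. Keyed state such as MapState is also scoped to the key of the record currently being processed. The toy model below only illustrates that reading. It is not Flink code, and KeyedTwoInputModel and all of its members are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of ONE parallel instance of a two-input keyed function (not Flink's implementation). */
final class KeyedTwoInputModel {
    // Stand-in for the keyed state backend: one independent map per key.
    private final Map<String, Map<String, Long>> stateByKey = new HashMap<>();
    // The key of the record currently being processed; set before each call.
    private String currentKey;

    /** The "MapState" handle: always resolves to the map of the current key. */
    private Map<String, Long> mapState() {
        return stateByKey.computeIfAbsent(currentKey, k -> new HashMap<>());
    }

    /** First input: adds a value to a field of the current key's map. */
    void processElement1(String key, String field, long value) {
        currentKey = key;
        mapState().merge(field, value, Long::sum);
    }

    /** Second input: removes a field from the current key's map. */
    void processElement2(String key, String field) {
        currentKey = key;
        mapState().remove(field);
    }

    /** Returns a copy of the state stored under the given key. */
    Map<String, Long> snapshot(String key) {
        return new HashMap<>(stateByKey.getOrDefault(key, new HashMap<>()));
    }
}
```

Because the calls are made one after another by a single caller, no locking is needed, and an update made under key "a" never touches the map stored under key "b", even though there is only one function object.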


Re: Flink StreamingFileSink rolling policy

2020-08-20 Thread Jingsong Li
As long as you extend CheckpointRollingPolicy, you can implement shouldRollOnEvent and shouldRollOnProcessingTime however you like.

On Wed, Aug 19, 2020 at 6:20 PM guoliang_wang1335 
wrote:

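> Can a custom rolling policy be used when Flink's StreamingFileSink
> bulk-writes in the Hadoop SequenceFile format? I would like to roll part
> files based on file size, the maximum time since the file was last updated,
> and checkpoints. Can this be done by implementing the RollingPolicy
> interface? Thanks!
>
>
> The documentation <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html
> > notes that bulk encoding by default only has OnCheckpointRollingPolicy,
> which rolls on every checkpoint. With a poorly chosen checkpoint interval,
> this produces a lot of small files.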
>
>
>
>
>
>

-- 
Best, Jingsong Lee
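
The decision logic asked about in this thread (size, inactivity, checkpoint) can be sketched in plain Java as below. This is only a model of the three decisions. In Flink the two non-checkpoint checks would go into the shouldRollOnEvent and shouldRollOnProcessingTime overrides of a CheckpointRollingPolicy subclass, reading the size and last update time from the PartFileInfo argument. PartFileSnapshot and SizeAndInactivityRollingRule are hypothetical names, not Flink classes.

```java
/** Hypothetical stand-in for the part-file facts a rolling policy gets to see. */
final class PartFileSnapshot {
    final long sizeBytes;
    final long lastUpdateTimeMs;

    PartFileSnapshot(long sizeBytes, long lastUpdateTimeMs) {
        this.sizeBytes = sizeBytes;
        this.lastUpdateTimeMs = lastUpdateTimeMs;
    }
}

/** Hypothetical rule: roll on every checkpoint, on a size limit, or after inactivity. */
final class SizeAndInactivityRollingRule {
    private final long maxPartSizeBytes;
    private final long inactivityIntervalMs;

    SizeAndInactivityRollingRule(long maxPartSizeBytes, long inactivityIntervalMs) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.inactivityIntervalMs = inactivityIntervalMs;
    }

    /** Mirrors CheckpointRollingPolicy: a checkpoint always rolls the part file. */
    boolean shouldRollOnCheckpoint(PartFileSnapshot part) {
        return true;
    }

    /** Checked for each incoming record: roll once the file has reached the size limit. */
    boolean shouldRollOnEvent(PartFileSnapshot part) {
        return part.sizeBytes >= maxPartSizeBytes;
    }

    /** Checked periodically: roll if the file has not been written to for too long. */
    boolean shouldRollOnProcessingTime(PartFileSnapshot part, long currentTimeMs) {
        return currentTimeMs - part.lastUpdateTimeMs >= inactivityIntervalMs;
    }
}
```

Note that rolling on checkpoint stays mandatory for bulk formats, so a rule like this can only add roll points. If small files come from frequent checkpoints, lengthening the checkpoint interval is still the relevant knob.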


Re: Watermark generation issues with File sources in Flink 1.11.1

2020-08-20 Thread Till Rohrmann
Hi Arti,

thanks for sharing this feedback with us. The WatermarkStrategy has been
introduced quite recently and might have some rough edges. I am pulling in
Aljoscha and Klou who have worked on this feature and might be able to help
you. To better understand your problem, it would be great if you could
share the AssignerWithPeriodicWatermarks/WatermarkStrategy code with us.

For the file source, the Flink community has recently introduced a new
source abstraction which will also support checkpoints for file sources
once the file source connector has been migrated to the new interfaces. The
community is currently working on it.

Cheers,
Till

On Wed, Aug 19, 2020 at 5:38 PM Arti Pande  wrote:

> Hi,
>
> When migrating a Stream API based Flink application from 1.9.2 to 1.11.1,
> watermark generation has issues with the file source alone. It works well
> with the Kafka source.
>
> With 1.9.2 a custom watermark generator implementation of
> AssignerWithPeriodicWatermarks was to be used. Starting 1.11.1 it is
> deprecated and to be replaced with WatermarkStrategy (that combines both
> WatermarkGenerator and TimestampAssigner).
>
> With Flink 1.11.1, when using the Kafka source, both of the above options
> (i.e. the old AssignerWithPeriodicWatermarks and the new WatermarkStrategy)
> work perfectly well, but with the file source neither of them works. The
> watermark assigner never advances the watermark, so stateful operators
> never clear their state, leading to erroneous results and
> continuously increasing memory usage.
>
> Same code works well with Kafka source. Is this a known issue? If so, any
> fix planned shortly?
>
> A side note (and probably a candidate for separate email, but I will write
> it here) even checkpoints do not work with File Source since 1.9.2 and it
> is still the problem with 1.11.1. Just wondering if File source with stream
> API is not a priority in Flink development? If so we can rethink our
> sources.
>
> Thanks & regards,
> Arti
>
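
For readers unfamiliar with the two pieces that WatermarkStrategy combines, here is a plain-Java model of a periodic bounded-out-of-orderness generator: the timestamp assigner supplies the event time of each record, and the generator derives the watermark from the highest timestamp seen so far. It is a sketch of the idea behind WatermarkStrategy.forBoundedOutOfOrderness, not Flink's code. BoundedOutOfOrdernessModel and its method names are hypothetical, and the sketch says nothing about why the file source behaves differently from Kafka.

```java
/** Toy model of a periodic bounded-out-of-orderness watermark generator (not Flink code). */
final class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs;

    BoundedOutOfOrdernessModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that the first watermark is Long.MIN_VALUE without underflow.
        this.maxTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Called per record with the timestamp extracted by the timestamp assigner. */
    void onEvent(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    /** Called periodically; the watermark trails the highest timestamp by the bound. */
    long currentWatermark() {
        return maxTimestampMs - maxOutOfOrdernessMs - 1;
    }
}
```

In this model the watermark only moves when onEvent is fed newer timestamps and currentWatermark is actually polled. A watermark that never advances therefore points at one of those two steps not happening.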


Re: How to access state in TimestampAssigner in Flink 1.11?

2020-08-20 Thread Till Rohrmann
Hi Theo,

thanks for reaching out to the community. I am pulling in Aljoscha and Klou
who have worked on the new WatermarkStrategy/WatermarkGenerator abstraction
and might be able to help you with your problem. At the moment, it looks to
me that there is no way to combine state with the new WatermarkGenerator
abstraction.

Cheers,
Till

On Wed, Aug 19, 2020 at 3:37 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi there,
>
> Right now I'm in the process of upgrading our Flink 1.9 jobs to Flink 1.11.
>
> In Flink 1.9, I was able to write an AssignerWithPeriodicWatermarks which
> also extended AbstractRichFunction and could thus utilize state and
> getRuntimeContext() in there. This worked as the
> TimestampsAndWatermarksOperator was an AbstractUdfStreamOperator and passed
> my assigner in as the userFunction to that operator.
>
> I used this feature for some "per partition processing", which Flink
> isn't ideally suited for at the moment, I guess. We have ascending
> watermarks per Kafka partition and do some processing on that. In order to
> maintain state per Kafka partition, I now keyBy the Kafka partition in our
> stream (not ideal, but better than operator state in terms of rescaling) but
> afterwards need to emulate the watermark strategy from the initial Kafka
> source, i.e. reassign watermarks the same way as the Kafka source did (per
> Kafka partition within the operator). Via getRuntimeContext() I was able
> to identify the Kafka partitions one operator instance was responsible for
> and could produce the output watermark accordingly (the minimum over all
> responsible partitions).
>
> In Flink 1.11, how can I rebuild this behavior? Do I really need to build
> my own TimestampsAndWatermarksOperator which works like the old one? Or is
> there a better approach?
>
> Best regards
> Theo
>
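
The combination rule described in the message above, the minimum over all partitions an operator instance is responsible for, can be sketched in plain Java as follows. This only models the bookkeeping. How the instance learns its partitions, or where this state would live in Flink 1.11, is exactly the open question of the thread and is not answered here. PerPartitionWatermarkModel is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model: output watermark = minimum of the per-partition progress (not Flink code). */
final class PerPartitionWatermarkModel {
    // Highest timestamp seen per assigned partition; MIN_VALUE means "no data yet".
    private final Map<Integer, Long> maxTimestampByPartition = new HashMap<>();

    PerPartitionWatermarkModel(int... assignedPartitions) {
        for (int partition : assignedPartitions) {
            maxTimestampByPartition.put(partition, Long.MIN_VALUE);
        }
    }

    /** Records progress for one partition; timestamps are assumed ascending per partition. */
    void onEvent(int partition, long timestampMs) {
        Long previous = maxTimestampByPartition.get(partition);
        if (previous == null) {
            throw new IllegalArgumentException("Partition not assigned: " + partition);
        }
        maxTimestampByPartition.put(partition, Math.max(previous, timestampMs));
    }

    /** The slowest partition holds the watermark back; no partitions means no progress. */
    long currentWatermark() {
        if (maxTimestampByPartition.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long timestampMs : maxTimestampByPartition.values()) {
            min = Math.min(min, timestampMs);
        }
        return min;
    }
}
```

A partition that has not produced any record yet keeps the output at Long.MIN_VALUE, which matches the usual caveat that an idle partition stalls the combined watermark.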


Re: Question about ORC files

2020-08-20 Thread Jingsong Li
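Could you paste the exception stack trace?
It is probably an ORC version issue: if you use the filesystem connector's ORC writer, that is a relatively new ORC version.
I suggest writing through a Hive table instead, so that you can choose the version.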

Best,
Jingsong

On Thu, Aug 20, 2020 at 12:10 PM  wrote:

> After writing ORC files with Flink SQL, Flink can read them back, but neither Spark nor Hive can; Impala can read them.
>
> Sent from my iPhone



-- 
Best, Jingsong Lee