Re: Re: Flink1.10 sink to mysql SocketException

2020-10-09 Thread sh_0...@126.com
感谢~



sh_0...@126.com
 
From: WeiXubin
Date: 2020-10-10 11:45
To: user-zh
Subject: Re: Flink1.10 sink to mysql SocketException
Hi,
你可以看下这个 Jira , https://issues.apache.org/jira/browse/FLINK-16681,似乎与你问题相关。
由于长时间没有数据导致 connection 断开问题,该问题已经在1.11版本修复。
 
Best,
Weixubin
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


?????? how to simulate the scene "back pressure" in flink?Thanks~!

2020-10-09 Thread ??????
Thanks for both of your help...
but...


I can not understand both:

DearDavid Anderson:
Is the whole command like this?
flink run--backpressure-c wordcount_increstate 
datastream_api-1.0-SNAPSHOT.jar



Dear Arvid Heise:
Forconf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
will this settings work to sleep when the output stream is generating?



apologise for my poor basic knowledge of flink~
Thanks for both of your help~






----
??: 
   "David Anderson" 
   
https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/flink-operations-playground.html#variants






On Fri, Oct 9, 2020 at 10:02 AM Arvid Heise http://localhost:8081, but you can really choose any 
other free port.



[1] 
https://github.com/apache/flink/blob/61a997364b020b44bd26df76208e76106c6390b5/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java#L198
On Fri, Oct 9, 2020 at 9:24 AM ?? https://github.com/apache/flink/blob/61a997364b020b44bd26df76208e76106c6390b5/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java#L536


On Tue, Oct 6, 2020 at 4:13 PM ?? 

?????? Flink 1.10.1 checkpoint????????

2020-10-09 Thread ??????
jdk8jdk261??




----
??: 
   "user-zh"

http://apache-flink.147419.n8.nabble.com/

Re: 动态加载table和udf的方法

2020-10-09 Thread Jeff Zhang
zeppelin 支持直接写udf,参考 https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2
或者加入钉钉群讨论:32803524

Zeahoo Z  于2020年10月10日周六 上午9:04写道:

> 你好,在开发中遇到了下面这个困难。
>
> 目前将定义的table和udf函数写在了 conf/sql-client-defaults.yaml
>
> 文件中。程序运行没有问题,但是带来一个问题:如果我需要添加或者修改table或者udf文件的时候,需要重启flink程序。有没有办法可以让我动态地添加或者更新我的table和udf(table新增可以通过sql-client来添加,但是更倾向于将table的定义记录在文件,udf存在修改和新增的情况)。这样依赖可以保证flink不重启。
>


-- 
Best Regards

Jeff Zhang


Re: Flink1.10 sink to mysql SocketException

2020-10-09 Thread WeiXubin
Hi,
你可以看下这个 Jira , https://issues.apache.org/jira/browse/FLINK-16681,似乎与你问题相关。
由于长时间没有数据导致 connection 断开问题,该问题已经在1.11版本修复。

Best,
Weixubin



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11.2基于官网在k8s上部署是正常的,但是加了volume配置之后报错Read-only file system

2020-10-09 Thread cxydeve...@163.com
问题已经解决,挂载的lib目录是之前1.10版本的jar包,然后简单把flink1.10的一些jar包换成1.11版本, 结果没有替换干净,
所以导致启动不了. 把挂载目录里面全部删除,换成标准的lib, 再添加自己需要的jar就可以正常启动.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

checkpoint fail

2020-10-09 Thread Song Wu
Summary
I'm hitting an error when running a  job, it happens several times, and I dont 
know why. 

Any help would be appreciated.  Thanks!
  
Details

flink version: 1.4.2-1700

java.lang.Exception: Could not complete snapshot 158 for operator 
asyncio_by_transform -> flatmap_by_action_list_flat -> 
order_source_kafka_sink-preprocessing (3/10).
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:370)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1285)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1223)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:707)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:622)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:575)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:380)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:283)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:185)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:217)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
at java.util.HashMap$EntryIterator.next(HashMap.java:1463)
at java.util.HashMap$EntryIterator.next(HashMap.java:1461)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:198)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:107)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:48)
at 
org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.(DefaultOperatorStateBackend.java:453)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.deepCopy(DefaultOperatorStateBackend.java:465)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:220)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:355)





Re:回复: Re: flink savepoint和checkpoint相关事项

2020-10-09 Thread izual






我理解是一样的,关于两者的不同点在这里[1]有介绍。
恢复方法是启动任务时 -s 指定从哪个路径恢复,例如 -s
file:///tmp/test/db262ffab6b00db9820c54f25a3f956f/chk-61


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint

在 2020-10-10 08:43:38,"zjfpla...@hotmail.com"  写道:
>请问savepoint也是一样吗?
>
>
>
>zjfpla...@hotmail.com
> 
>发件人: Yun Tang
>发送时间: 2020-10-10 01:35
>收件人: user-zh
>主题: Re: flink savepoint和checkpoint相关事项
>Hi
> 
>在修改作业的情况下,从checkpoint恢复的主要措施是确保相关operator设置了uid[1],恢复state的时候添加了允许non-restored 
>state [2] 恢复
> 
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
>[2] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state
> 
>祝好
>唐云
>
>From: zjfpla...@hotmail.com 
>Sent: Friday, October 9, 2020 8:59
>To: user-zh 
>Subject: flink savepoint和checkpoint相关事项
> 
>Hi,
>flink的savepoint,checkpoint在任务如何修改的情况下还能沿用,如何修改下就只能重新消费了
> 
> 
> 
>zjfpla...@hotmail.com


Flink1.10 sink to mysql SocketException

2020-10-09 Thread sh_0...@126.com
各位好:
请教一个问题
场景:
1.实时监控业务流向Kafka的binlog数据,使用Flink进行解析出来同步到对应的MySQL环境
2.业务数据并不是一直都有的,可能N天才有一条,使用Flink就是想第一时间同步到MySQL环境,便于后续使用,降低业务影响
3.Flink使用的版本为:1.10.0

问题:
1.由于业务binlog数据不定时出现,导致Flink sink to MySQL的链接N天就会出现问题,报下面的问题(已使用MySQL 
pool)
2.错误信息如下:
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: 
The last packet successfully received from the server was 
   159,043,262 milliseconds ago.  The last packet sent successfully to 
the server was 159,043,262 milliseconds ago. is longer than the server  
configured value of 'wait_timeout'. You should consider either expiring 
and/or testing connection validity before use in your application,  
increasing the server configured values for client timeouts, or using 
the Connector/J connection property 'autoReconnect=true' to avoid this  
problem.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:988)
at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3739)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2508)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2549)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861)
at 
com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2073)
at 
com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2009)
at 
com.mysql.jdbc.PreparedStatement.executeLargeUpdate(PreparedStatement.java:5098)
at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:1994)
at 
org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:98)
at 
org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:98)
at com.jiuye.mdh.fla.MySQLSinkFunc.invoke(MySQLSinkFunc.java:34)
at com.jiuye.mdh.fla.MySQLSinkFunc.invoke(MySQLSinkFunc.java:17)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:711)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:664)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at SinkConversion$31.processElement(Unknown Source)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 

Re: Flink的table-api不支持.

2020-10-09 Thread Benchao Li
user是关键字,需要用`user`来处理一下~

Kyle Zhang  于2020年10月9日周五 上午8:34写道:

> 试一试select *  from  OrderA orderA join OrderB orderB on
> orderA.user=orderB.user
>
> On Sun, Oct 4, 2020 at 5:09 PM 忝忝向仧 <153488...@qq.com> wrote:
>
> > Hi,all:
> >
> >
> > Table api的sql查询里面join的时候不能写"."么?
> > 这样写就会报错 如下
> > Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> > SQL parse failed. Encountered "." at line 1, column 36.
> > Was expecting one of:
> >>   "EXCEPT" ...
> >   "FETCH" ...
> >   "FROM" ...
> >   "INTERSECT" ...
> >   "LIMIT" ...
> >   "OFFSET" ...
> >   "ORDER" ...
> >   "MINUS" ...
> >   "UNION" ...
> >   "," ...
> >
> >
> >
> > Table result = tEnv.sqlQuery("select *  from  OrderA join OrderB on
> > OrderA.user=OrderB.user");
>


-- 

Best,
Benchao Li


FlinkJobNotFoundException????

2020-10-09 Thread ??????
mysql??MySQLlinux??
py4j.protocol.Py4JJavaError: An error occurred while calling o6.execute.
: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.rest.handler.RestHandlerException: 
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
Flink job (4d6b1273229e0e16fa433c652b5cb74d)





??


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
batch_source_ddl = """
CREATE TABLE mh_source_tab (
lid INT,
dir INT,
posid BIGINT,
km BIGINT,
poleId BIGINT,
ts BIGINT,
rt Decimal(6,2),
time1 VARCHAR 
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://**', 
'table-name' = 'nj_mh_test', --??
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'root',
--'sink.buffer-flush.max-rows' = '100',
'sink.buffer-flush.interval' = '1s'


)


warn_alarm_mh_ddl = """
CREATE TABLE warn_alarm_mh_sink (
lid INT,
dir INT,
posid BIGINT,
km BIGINT,
poleId BIGINT,
extremum Decimal(6,2),
PRIMARY KEY (lid,dir,posid,poleId) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://**',
'table-name' = 'warn_mh_alarm_result', --??
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'root',
--'sink.buffer-flush.max-rows' = '100',
'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
t_env.execute_sql(batch_source_ddl )
t_env.execute_sql(warn_alarm_mh_ddl)


def threshold_alarm(delta_thres):
  source = t_env.from_path("mh_source_tab") \
.where("rt < -1")\
.group_by("lid, dir, posid, km, poleId")\
.select("lid, dir, posid, km, poleId, max(rt) as 
max_rt, min(rt) as min_rt, max(rt)-min(rt) as extremum")\
.where("extremum "+str(delta_thres))\
.select("lid, dir, posid, km, poleId, extremum")
  source.execute_insert("warn_alarm_mh_sink") \
 .get_job_client() \
 .get_job_execution_result() \
 .result()
if __name__ == '__main__':
  threshold_alarm(delta_thres=0.5)

动态加载table和udf的方法

2020-10-09 Thread Zeahoo Z
你好,在开发中遇到了下面这个困难。

目前将定义的table和udf函数写在了 conf/sql-client-defaults.yaml
文件中。程序运行没有问题,但是带来一个问题:如果我需要添加或者修改table或者udf文件的时候,需要重启flink程序。有没有办法可以让我动态地添加或者更新我的table和udf(table新增可以通过sql-client来添加,但是更倾向于将table的定义记录在文件,udf存在修改和新增的情况)。这样依赖可以保证flink不重启。


Re: Flink 1.10.1 checkpoint失败问题

2020-10-09 Thread Storm☀️
尝试了将jdk升级到了261,报错依然还有。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复: Re: flink savepoint和checkpoint相关事项

2020-10-09 Thread zjfpla...@hotmail.com
请问savepoint也是一样吗?



zjfpla...@hotmail.com
 
发件人: Yun Tang
发送时间: 2020-10-10 01:35
收件人: user-zh
主题: Re: flink savepoint和checkpoint相关事项
Hi
 
在修改作业的情况下,从checkpoint恢复的主要措施是确保相关operator设置了uid[1],恢复state的时候添加了允许non-restored 
state [2] 恢复
 
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state
 
祝好
唐云

From: zjfpla...@hotmail.com 
Sent: Friday, October 9, 2020 8:59
To: user-zh 
Subject: flink savepoint和checkpoint相关事项
 
Hi,
flink的savepoint,checkpoint在任务如何修改的情况下还能沿用,如何修改下就只能重新消费了
 
 
 
zjfpla...@hotmail.com


Re: Any testing issues when using StreamTableEnvironment.createTemporaryView?

2020-10-09 Thread Dan Hill
I figured out my issue.  I needed to assign watermarks (e.g.
assignTimestampsAndWatermarks) after the fromElements.  I could not figure
out how the auto-generated code worked.  I hooked up a debugger and guessed
at the issue.

On Thu, Oct 8, 2020 at 11:09 PM Dan Hill  wrote:

> *Summary*
> I'm hitting an error when running a test that is related to using
> createTemporaryView to convert a Protobuf input stream to Flink Table API.
> I'm not sure how to debug "SourceConversion$5.processElement(Unknown
> Source)" line.  Is this generated code?  How can I debug this?
>
> Any help would be appreciated.  Thanks! - Dan
>
> *Details*
> My current input is a protocol buffer stream.  I convert it to the Table
> API spec using createTemporaryView.  The code is hacky.  I want to get some
> tests implemented before cleaning it up.
>
> KeyedStream batchLogStream =
> env.fromElements(BatchLog.class, new
> LogGenerator.BatchLogIterator().next())
> .keyBy((logRequest) -> logRequest.getUserId());
>
> tableEnv.createTemporaryView(
> "input_user",
> batchLogStream.flatMap(new ToUsers()),
> $("userId"),
> $("timeEpochMillis"),
> $("userTime").rowtime());
>
> This appears to work in my prototype (maybe serialization is broken).  In
> a Flink test, I hit the following error.
>
> org.apache.flink.runtime.taskmanager.Task: Flat Map -> Map ->
> SourceConversion(table=[default.mydb.input_user], fields=[userId,
> timeEpochMillis, userTime]) -> Calc(select=[userId, timeEpochMillis]) ->
> StreamingFileWriter (2/7) (ae67114dd4175c6fd87063f73706c8ec) switched from
> RUNNING to FAILED. java.lang.NullPointerException
> at SourceConversion$5.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
> at
> ai.promoted.metrics.logprocessor.common.functions.ToUsers.flatMap(ToUsers.java:18)
> at
> ai.promoted.metrics.logprocessor.common.functions.ToUsers.flatMap(ToUsers.java:11)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.base/java.lang.Thread.run(Thread.java:834)
>
>
> I wasn't able to find this exact stacktrace when looking on Google.
>


Re: NPE when checkpointing

2020-10-09 Thread Piotr Nowojski
Hi Binh,

Could you try upgrading Flink's Java runtime? It was previously reported
that upgrading to jdk1.8.0_251 was solving the problem.

Piotrek

pt., 9 paź 2020 o 19:41 Binh Nguyen Van  napisał(a):

> Hi,
>
> Thank you for helping me!
> The code is compiled on
>
> java version "1.8.0_161"
> Java(TM) SE Runtime Environment (build 1.8.0_161-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)
>
> But I just checked our Hadoop and its Java version is
>
> java version "1.8.0_77"
> Java(TM) SE Runtime Environment (build 1.8.0_77-b03)
> Java HotSpot(TM) 64-Bit Server VM (build 25.77-b03, mixed mode)
>
> Thanks
> -Binh
>
> On Fri, Oct 9, 2020 at 10:23 AM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> One more thing. It looks like it's not a Flink issue, but some JDK bug.
>> Others reported that upgrading JDK version (for example to  jdk1.8.0_251)
>> seemed to be solving this problem. What JDK version are you using?
>>
>> Piotrek
>>
>> pt., 9 paź 2020 o 17:59 Piotr Nowojski  napisał(a):
>>
>>> Hi,
>>>
>>> Thanks for reporting the problem. I think this is a known issue [1] on
>>> which we are working to fix.
>>>
>>> Piotrek
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-18196
>>>
>>> pon., 5 paź 2020 o 08:54 Binh Nguyen Van 
>>> napisał(a):
>>>
 Hi,

 I have a streaming job that is written in Apache Beam and uses Flink as
 its runner. The job is working as expected for about 15 hours and then it
 started to have checkpointing error. The error message looks like this

 java.lang.Exception: Could not perform checkpoint 910 for operator Source: 
  (8/60).
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:785)
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:760)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
 at 
 org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
 at 
 org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
 at 
 org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.lang.NullPointerException
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1394)
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:776)
 ... 11 more

 When this happened, I have to stop the job and then start it again, and
 then 15 hours later the issue happens again.

 Here are some additional information

- Flink version is 1.10.1
- Job reads data from Kafka, transform, and then writes to Kafka
- There are 6 tasks with the parallelism of 60 each (each task
reads from 1 Kafka topic)
- The job is deployed to run on YARN with 60 task managers and each
task manager has 1 slot
- The State backend is filesystem and HDFS is the storage (Doesn’t
seem to related to the type of state backend since the issue also 
 happened
when I use memory as the state backend)
- The checkpointing interval is 60 seconds (The longest duration of
the normal checkpoint as shown in Flink UI is 14 seconds)
- The minimum pause between checkpoints is 30 seconds
- Hadoop cluster is Kerberized but Kafka is not. Keytab and
principal are set in the Flink configuration file

 Can someone please help?

 Thanks
 -Binh

>>>


Re: NPE when checkpointing

2020-10-09 Thread Binh Nguyen Van
Hi,

Thank you for helping me!
The code is compiled on

java version "1.8.0_161"
Java(TM) SE Runtime Environment (build 1.8.0_161-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)

But I just checked our Hadoop and its Java version is

java version "1.8.0_77"
Java(TM) SE Runtime Environment (build 1.8.0_77-b03)
Java HotSpot(TM) 64-Bit Server VM (build 25.77-b03, mixed mode)

Thanks
-Binh

On Fri, Oct 9, 2020 at 10:23 AM Piotr Nowojski  wrote:

> Hi,
>
> One more thing. It looks like it's not a Flink issue, but some JDK bug.
> Others reported that upgrading JDK version (for example to  jdk1.8.0_251)
> seemed to be solving this problem. What JDK version are you using?
>
> Piotrek
>
> pt., 9 paź 2020 o 17:59 Piotr Nowojski  napisał(a):
>
>> Hi,
>>
>> Thanks for reporting the problem. I think this is a known issue [1] on
>> which we are working to fix.
>>
>> Piotrek
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-18196
>>
>> pon., 5 paź 2020 o 08:54 Binh Nguyen Van  napisał(a):
>>
>>> Hi,
>>>
>>> I have a streaming job that is written in Apache Beam and uses Flink as
>>> its runner. The job is working as expected for about 15 hours and then it
>>> started to have checkpointing error. The error message looks like this
>>>
>>> java.lang.Exception: Could not perform checkpoint 910 for operator Source: 
>>>  (8/60).
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:785)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:760)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.NullPointerException
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1394)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:776)
>>> ... 11 more
>>>
>>> When this happened, I have to stop the job and then start it again, and
>>> then 15 hours later the issue happens again.
>>>
>>> Here are some additional information
>>>
>>>- Flink version is 1.10.1
>>>- Job reads data from Kafka, transform, and then writes to Kafka
>>>- There are 6 tasks with the parallelism of 60 each (each task reads
>>>from 1 Kafka topic)
>>>- The job is deployed to run on YARN with 60 task managers and each
>>>task manager has 1 slot
>>>- The State backend is filesystem and HDFS is the storage (Doesn’t
>>>seem to related to the type of state backend since the issue also 
>>> happened
>>>when I use memory as the state backend)
>>>- The checkpointing interval is 60 seconds (The longest duration of
>>>the normal checkpoint as shown in Flink UI is 14 seconds)
>>>- The minimum pause between checkpoints is 30 seconds
>>>- Hadoop cluster is Kerberized but Kafka is not. Keytab and
>>>principal are set in the Flink configuration file
>>>
>>> Can someone please help?
>>>
>>> Thanks
>>> -Binh
>>>
>>


Re: flink savepoint和checkpoint相关事项

2020-10-09 Thread Yun Tang
Hi

在修改作业的情况下,从checkpoint恢复的主要措施是确保相关operator设置了uid[1],恢复state的时候添加了允许non-restored 
state [2] 恢复

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state

祝好
唐云

From: zjfpla...@hotmail.com 
Sent: Friday, October 9, 2020 8:59
To: user-zh 
Subject: flink savepoint和checkpoint相关事项

Hi,
flink的savepoint,checkpoint在任务如何修改的情况下还能沿用,如何修改下就只能重新消费了



zjfpla...@hotmail.com


Re: NPE when checkpointing

2020-10-09 Thread Piotr Nowojski
Hi,

One more thing. It looks like it's not a Flink issue, but some JDK bug.
Others reported that upgrading JDK version (for example to  jdk1.8.0_251)
seemed to be solving this problem. What JDK version are you using?

Piotrek

pt., 9 paź 2020 o 17:59 Piotr Nowojski  napisał(a):

> Hi,
>
> Thanks for reporting the problem. I think this is a known issue [1] on
> which we are working to fix.
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-18196
>
> pon., 5 paź 2020 o 08:54 Binh Nguyen Van  napisał(a):
>
>> Hi,
>>
>> I have a streaming job that is written in Apache Beam and uses Flink as
>> its runner. The job is working as expected for about 15 hours and then it
>> started to have checkpointing error. The error message looks like this
>>
>> java.lang.Exception: Could not perform checkpoint 910 for operator Source: 
>>  (8/60).
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:785)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:760)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
>> at 
>> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>> at 
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>> at 
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.NullPointerException
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1394)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:776)
>> ... 11 more
>>
>> When this happened, I have to stop the job and then start it again, and
>> then 15 hours later the issue happens again.
>>
>> Here are some additional information
>>
>>- Flink version is 1.10.1
>>- Job reads data from Kafka, transform, and then writes to Kafka
>>- There are 6 tasks with the parallelism of 60 each (each task reads
>>from 1 Kafka topic)
>>- The job is deployed to run on YARN with 60 task managers and each
>>task manager has 1 slot
>>- The State backend is filesystem and HDFS is the storage (Doesn’t
>>seem to related to the type of state backend since the issue also happened
>>when I use memory as the state backend)
>>- The checkpointing interval is 60 seconds (The longest duration of
>>the normal checkpoint as shown in Flink UI is 14 seconds)
>>- The minimum pause between checkpoints is 30 seconds
>>- Hadoop cluster is Kerberized but Kafka is not. Keytab and principal
>>are set in the Flink configuration file
>>
>> Can someone please help?
>>
>> Thanks
>> -Binh
>>
>


[PyFlink] register udf functions with different versions of the same library in the same job

2020-10-09 Thread Sharipov, Rinat
Hi mates !

I've just read an amazing article

about PyFlink and I'm absolutely delighted.
I got some questions about udf registration, and it seems that it's
possible to specify the list of libraries that should be used to evaluate
udf functions.

As far as I understand, each udf function is a separate process, that is
managed by Beam (but I'm not sure I got it right).
Does it mean that I can register multiple udf functions with different
versions of the same library or what would be even better with different
python environments and they won't clash ?

A few words about the task that I'm trying to solve: I would like to build
a recommendation pipeline that will accumulate features as a table and make
recommendations using models from Ml flow registry. Since I don't want to
limit data analysts from usage in all libraries that they won't, the best
solution
for me - assemble the environment using conda descriptor and register a UDF
function.

Kubernetes and Kubeflow are not an option for us yet, so we are trying to
include models into existing pipelines.

thx !


Re: Network issue leading to "No pooled slot available"

2020-10-09 Thread Dan Diephouse
Quick update: it appears to work outside my test case too. I have not
encountered this issue post update at all.

On Thu, Oct 8, 2020 at 11:15 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Thanks for checking this workaround!
>
> I've created a jira issue [1] to check if AWS SDK version can be upgraded
> in Flink distribution.
>
> Regards,
> Roman
>
>
> On Fri, Oct 9, 2020 at 12:54 AM Dan Diephouse  wrote:
>
>> Well, I just dropped in the latest Amazon 1.11.878 SDK and now it
>> appears to respect interrupts in a test case I created. (the test fails
>> with the SDK that is in use by Flink)
>>
>> I will try it in a full fledged Flink environment and report back.
>>
>> On Thu, Oct 8, 2020 at 3:41 PM Dan Diephouse  wrote:
>>
>>> Did some digging... definitely appears that the Amazon SDK definitely is
>>> not picking up the interrupt.  I will try playing with the connection
>>> timeout. Hadoop defaults it to 20 ms, which may be part of the problem.
>>> Anyone have any other ideas?
>>>
>>> In theory this should be fixed by SDK v2 which uses NIO, but I don't
>>> think I'm up for all the changes that would involve in the downstream
>>> components.
>>>
>>> On Thu, Oct 8, 2020 at 8:36 AM Dan Diephouse  wrote:
>>>
 Using the latest - 1.11.2.

 I would assume the interruption is being ignored in the Hadoop / S3
 layer. I was looking at the defaults and (if I understood correctly) the
 client will retry 20 times. Which would explain why it never gets
 cancelled...

 On Thu, Oct 8, 2020 at 1:27 AM Khachatryan Roman <
 khachatryan.ro...@gmail.com> wrote:

> Hi Dan Diephouse,
>
> From the logs you provided indeed it looks like 1 causes 2 => 3 => 4,
> where 2 is a bug.
> It's unclear though where the interruption is ignored (Flink/Hadoop
> FS/S3 client).
>
> What version of Flink are you using?
>
> Regards,
> Roman
>
>
> On Wed, Oct 7, 2020 at 11:16 PM Dan Diephouse 
> wrote:
>
>> I am now using the S3 StreamingFileSink to send data to an S3 bucket.
>> If/when the network connection has issues, it seems to put Flink into an
>> irrecoverable state. Am I understanding this correctly? Any suggestions 
>> on
>> how to troubleshoot / fix?
>>
>> Here is what I'm observing:
>>
>> *1. Network is dropped *
>>
>> *2. S3 connections do not exit gracefully*
>>
>> 2020-10-07 20:58:07.468  WARN 1 --- [aa565930b86fb).]
>> o.apache.flink.runtime.taskmanager.Task  : Task 'Sink: Unnamed (1/12)' 
>> did
>> not react to cancelling signal for 30 seconds, but is stuck in method:
>>  java.base@14.0.2/sun.nio.ch.Net.poll(Native Method)
>> java.base@14.0.2
>> /sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:181)
>> java.base@14.0.2
>> /sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:285)
>> java.base@14.0.2
>> /sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
>> java.base@14.0.2
>> /sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
>> java.base@14.0.2
>> /sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
>> java.base@14.0.2
>> /java.net.Socket$SocketInputStream.read(Socket.java:982)
>> java.base@14.0.2
>> /sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:469)
>> java.base@14.0.2
>> /sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:463)
>> java.base@14.0.2
>> /sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
>> java.base@14.0.2
>> /sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
>> java.base@14.0.2
>> /sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1475)
>> java.base@14.0.2
>> /sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1381)
>> java.base@14.0.2
>> /sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:441)
>> java.base@14.0.2
>> /sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:412)
>>
>> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:436)
>>
>> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:384)
>>
>> app//com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
>>
>> app//org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
>>
>> app//org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
>> jdk.internal.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
>> java.base@14.0.2
>> /jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> java.base@14.0.2/java.lang.reflect.Method.invoke(Method.java:564)

[Announce] Flink Forward Global Is Just Around the Corner

2020-10-09 Thread Seth Wiesman
Hi Everyone,

Flink Forward Global is just around the corner - October 19th to the 22nd
-  and our amazing line up of keynotes has been announced.


   - Peter Chalif from Citi will explore how Apache Flink has been
   fundamentally changing Big Data in the financial services industry
   - Ricky Saltzer from Epic Games will share exactly how Flink helps his
   team solve one the data engineering problems an online video game faces at
   Fornite scale
   - Renu Jiang from LinkedIn will explain how their team is handling
   extremely large stateful jobs, simplifying the APIs with SQL, and exploring
   new directions of unifying batch and stream processing
   - Xiaowei Jiang from Alibaba will explore how the combination of Hybrid
   Serving / Analytical Processing and Flink can support various use cases
   such as real-time data warehouse, real-time machine learning and more
   - One final Keynote for Ververica will be announced next week.

As a reminder, the event is virtual and free to attend[1]. There are also a
limited number of paid training slots available. Looking forward to seeing
everyone virtually soon!

https://www.flink-forward.org/

Seth Wiesman
- Committer Apache Flink
- Program Chair Flink Forward Virtual 2020


Re: NPE when checkpointing

2020-10-09 Thread Piotr Nowojski
Hi,

Thanks for reporting the problem. I think this is a known issue [1] on
which we are working to fix.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18196

pon., 5 paź 2020 o 08:54 Binh Nguyen Van  napisał(a):

> Hi,
>
> I have a streaming job that is written in Apache Beam and uses Flink as
> its runner. The job is working as expected for about 15 hours and then it
> started to have checkpointing error. The error message looks like this
>
> java.lang.Exception: Could not perform checkpoint 910 for operator Source: 
>  (8/60).
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:785)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:760)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1394)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:776)
> ... 11 more
>
> When this happened, I have to stop the job and then start it again, and
> then 15 hours later the issue happens again.
>
> Here are some additional information
>
>- Flink version is 1.10.1
>- Job reads data from Kafka, transform, and then writes to Kafka
>- There are 6 tasks with the parallelism of 60 each (each task reads
>from 1 Kafka topic)
>- The job is deployed to run on YARN with 60 task managers and each
>task manager has 1 slot
>- The State backend is filesystem and HDFS is the storage (Doesn’t
>seem to related to the type of state backend since the issue also happened
>when I use memory as the state backend)
>- The checkpointing interval is 60 seconds (The longest duration of
>the normal checkpoint as shown in Flink UI is 14 seconds)
>- The minimum pause between checkpoints is 30 seconds
>- Hadoop cluster is Kerberized but Kafka is not. Keytab and principal
>are set in the Flink configuration file
>
> Can someone please help?
>
> Thanks
> -Binh
>


Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-09 Thread Austin Cawley-Edwards
Hey Timo,

Hah, that's a fair point about using time. I guess I should update my
statement to "as a user, I don't want to worry about *manually managing*
time".

That's a nice suggestion with the KeyedProcessFunction and no windows, I'll
give that a shot. If I don't want to emit any duplicates, I'd have to
essentially buffer the "last seen duplicate" for each key in that process
function until the MAX_WATERMARK is sent through though, right? I could
emit early results if I assume the max number of possible duplicates, but
for records with no duplicates, I'd have to wait until no more records are
coming -- am I missing something?

Thanks so much,
Austin

On Fri, Oct 9, 2020 at 10:44 AM Timo Walther  wrote:

> Hi Austin,
>
> if you don't want to worry about time at all, you should probably not
> use any windows because those are a time-based operation.
>
> A solution that would look a bit nicer could be to use a pure
> KeyedProcessFunction and implement the deduplication logic without
> reusing windows. In ProcessFunctions you can register an event-time
> timer. The timer would be triggered by the MAX_WATERMARK when the
> pipeline shuts down even without having a timestamp assigned in the
> StreamRecord. Watermark will leave SQL also without a time attribute as
> far as I know.
>
> Regards,
> Timo
>
>
> On 08.10.20 17:38, Austin Cawley-Edwards wrote:
> > Hey Timo,
> >
> > Sorry for the delayed reply. I'm using the Blink planner and using
> > non-time-based joins. I've got an example repo here that shows my query/
> > setup [1]. It's got the manual timestamp assignment commented out for
> > now, but that does indeed solve the issue.
> >
> > I'd really like to not worry about time at all in this job hah -- I
> > started just using processing time, but Till pointed out that processing
> > time timers won't be fired when input ends, which is the case for this
> > streaming job processing CSV files, so I should be using event time.
> > With that suggestion, I switched to ingestion time, where I then
> > discovered the issue converting from SQL to data stream.
> >
> > IMO, as a user manually assigning timestamps on conversion makes sense
> > if you're using event time and already handling time attributes
> > yourself, but for ingestion time you really don't want to think about
> > time at all, which is why it might make sense to propigate the
> > automatically assigned timestamps in that case. Though not sure how
> > difficult that would be. Let me know what you think!
> >
> >
> > Best + thanks again,
> > Austin
> >
> > [1]: https://github.com/austince/flink-1.10-sql-windowing-error
> >
> > On Mon, Oct 5, 2020 at 4:24 AM Timo Walther  > > wrote:
> >
> > Btw which planner are you using?
> >
> > Regards,
> > Timo
> >
> > On 05.10.20 10:23, Timo Walther wrote:
> >  > Hi Austin,
> >  >
> >  > could you share some details of your SQL query with us? The
> > reason why
> >  > I'm asking is because I guess that the rowtime field is not
> inserted
> >  > into the `StreamRecord` of DataStream API. The rowtime field is
> only
> >  > inserted if there is a single field in the output of the query
> > that is a
> >  > valid "time attribute".
> >  >
> >  > Esp. after non-time-based joins and aggregations, time attributes
> > loose
> >  > there properties and become regular timestamps. Because timestamp
> > and
> >  > watermarks might have diverged.
> >  >
> >  > If you know what you're doing, you can also assign the timestamp
> >  > manually after `toRetractStream.assignTimestampAndWatermarks` and
> >  > reinsert the field into the stream record. But before you do
> that, I
> >  > think it is better to share more information about the query with
> us.
> >  >
> >  > I hope this helps.
> >  >
> >  > Regards,
> >  > Timo
> >  >
> >  >
> >  >
> >  > On 05.10.20 09:25, Till Rohrmann wrote:
> >  >> Hi Austin,
> >  >>
> >  >> thanks for offering to help. First I would suggest asking Timo
> > whether
> >  >> this is an aspect which is still missing or whether we
> > overlooked it.
> >  >> Based on that we can then take the next steps.
> >  >>
> >  >> Cheers,
> >  >> Till
> >  >>
> >  >> On Fri, Oct 2, 2020 at 7:05 PM Austin Cawley-Edwards
> >  >> mailto:austin.caw...@gmail.com>
> > >>
> > wrote:
> >  >>
> >  >> Hey Till,
> >  >>
> >  >> Thanks for the notes. Yeah, the docs don't mention anything
> > specific
> >  >> to this case, not sure if it's an uncommon one. Assigning
> > timestamps
> >  >> on conversion does solve the issue. I'm happy to take a stab
> at
> >  >> implementing the feature if it is indeed missing and you all
> > think
> >  >> it'd be worthwhile. I think it's definitely a 

Re: autoCommit for postgres jdbc streaming in Table/SQL API

2020-10-09 Thread Jark Wu
Thanks for the explanation. I will review the pull request. Let's move the
discussion to the PR.

Best,
Jark

On Fri, 9 Oct 2020 at 21:06, Dylan Forciea  wrote:

> Jark,
>
>
>
> Thank you! I had actually mistyped the JIRA issue; autoCommit needs to be
> set to false for streaming to work. The default on the driver is true when
> the option isn’t specified. I’ve updated the issue accordingly.
>
>
>
> Setting this to false automatically on the read path would fix my issue.
> However, I’m only certain that this is proper for Postgres. I’m not sure if
> this should be done for other drivers, although my gut would say it should
> be fine if it’s only done for reading. My patch as it is will set the
> builder to not specify whether to set autoCommit if the option is not
> specified, which means it would then be left at the default of true. That
> would conflict with the 1.11 patch you suggested. Let me know if you think
> I should make the default true in the SQL API.
>
>
>
> https://github.com/apache/flink/pull/13570
>
>
>
> Regards,
>
> Dylan
>
>
>
> *From: *Jark Wu 
> *Date: *Thursday, October 8, 2020 at 10:15 PM
> *To: *Dylan Forciea 
> *Cc: *Till Rohrmann , dev ,
> Shengkai Fang , "user@flink.apache.org" <
> user@flink.apache.org>, Leonard Xu 
> *Subject: *Re: autoCommit for postgres jdbc streaming in Table/SQL API
>
>
>
> Hi Dylan,
>
>
>
> Sorry for the late reply. We've just come back from a long holiday.
>
>
>
> Thanks for reporting this problem. First, I think this is a bug that
> `autoCommit` is false by default (JdbcRowDataInputFormat.Builder).
>
> We can fix the default to true in 1.11 series, and I think this can solve
> your problem in a short term?
>
> Besides, we should expose the connector options to set auto commit and
> this can be another issue to be implemented in master.
>
> I'm glad to review the code.
>
>
>
> What do you think?
>
>
>
> Regarding to the failed JMXReporterFactoryTest, I think this is a known
> issue, see FLINK-19539 [1]
>
>
>
> Best,
>
> Jark
>
>
>
> [1]: https://issues.apache.org/jira/browse/FLINK-19539
>
>
>
> On Fri, 9 Oct 2020 at 01:29, Dylan Forciea  wrote:
>
> I’ve updated the unit tests and documentation, and I was running the azure
> test pipeline as described in the instructions. However, it appears that
> what seems to be an unrelated test for the JMX code failed. Is this a
> matter of me not setting things up correctly? I wanted to ensure everything
> looked good before I submitted the PR.
>
>
>
> [ERROR] Failures:
>
> [ERROR]   JMXReporterFactoryTest.testPortRangeArgument:46
>
> Expected: (a value equal to or greater than <9000> and a value less than
> or equal to <9010>)
>
>  but: a value less than or equal to <9010> <9040> was greater than
> <9010>
>
> [ERROR]   JMXReporterFactoryTest.testWithoutArgument:60
>
> [INFO]
>
> [ERROR] Tests run: 10, Failures: 2, Errors: 0, Skipped: 0
>
>
>
> Thanks,
>
> Dylan Forciea
>
>
>
> *From: *Till Rohrmann 
> *Date: *Thursday, October 8, 2020 at 2:29 AM
> *To: *Dylan Forciea 
> *Cc: *dev , Shengkai Fang , "
> user@flink.apache.org" , "j...@apache.org" <
> j...@apache.org>, Leonard Xu 
> *Subject: *Re: autoCommit for postgres jdbc streaming in Table/SQL API
>
>
>
> This sounds good. Maybe there are others in the community who can help
> with the review before the Jark and Leonard are back.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Wed, Oct 7, 2020 at 7:33 PM Dylan Forciea  wrote:
>
> Actually…. It looks like what I did covers both cases. I’ll see about
> getting some unit tests and documentation updated.
>
>
>
> Dylan
>
>
>
> *From: *Dylan Forciea 
> *Date: *Wednesday, October 7, 2020 at 11:47 AM
> *To: *Till Rohrmann , dev 
> *Cc: *Shengkai Fang , "user@flink.apache.org" <
> user@flink.apache.org>, "j...@apache.org" , Leonard Xu <
> xbjt...@gmail.com>
> *Subject: *Re: autoCommit for postgres jdbc streaming in Table/SQL API
>
>
>
> Ok, I have created FLINK-19522 describing the issue. I have the code I
> made so far checked in at
> https://github.com/apache/flink/compare/master...dforciea:FLINK-19522 but
> this only fixes the SQL API. It sounds like there may be another change
> needed for the Table API… I’ll look into that and see if I can figure it
> out on my own while they’re out. I will also need to add some unit tests
> and update some documentation to get this ready for a PR.
>
>
>
> Thanks,
>
> Dylan
>
>
>
> *From: *Till Rohrmann 
> *Date: *Wednesday, October 7, 2020 at 10:55 AM
> *To: *dev 
> *Cc: *Shengkai Fang , "user@flink.apache.org" <
> user@flink.apache.org>, "j...@apache.org" , Leonard Xu <
> xbjt...@gmail.com>
> *Subject: *Re: autoCommit for postgres jdbc streaming in Table/SQL API
>
>
>
> Hi Dylan,
>
>
>
> thanks for reaching out to the Flink community and excuse our late
> response. I am not an expert for the Table API and its JDBC connector but
> what you describe sounds like a missing feature. Also given that
> FLINK-12198 enabled this feature for the JDBCInputFormat indicates that we
> might simply need to 

Re: Best way to test Table API and SQL

2020-10-09 Thread Timo Walther

Hi Rex,

let me copy paste my answer from a similar thread 2 months ago:

Hi,

this might be helpful as well:

https://lists.apache.org/thread.html/rfe3b45a10fc58cf19d2f71c6841515eb7175ba731d5055b06f236b3f%40%3Cuser.flink.apache.org%3E

First of all, it is important to know if you are interested in 
end-to-end tests (incl. connectors) or excluding connectors. If you just 
like to test your operators, you can use a lot of the testing 
infrastructure of Flink.


If your are NOT using event-time, you can simply use 
`TableEnvironment.fromValues()` and `Table.execute().collect()`. This 
test uses it for example [1] (it is one of the newer test generations).


Otherwise you can use or implement your own testing connectors, like in 
org.apache.flink.table.planner.runtime.stream.sql.FunctionITCase#testStructuredScalarFunction 
[2].


I hope this helps.

Regards,
Timo

[1] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/expressions/MathFunctionsITCase.java


[2] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java#L700



Let me know if you need more information.

Regards,
Timo

On 09.10.20 07:39, Rex Fenley wrote:

Hello

I'd like to write a unit test for my Flink Job. It consists mostly of 
the Table API and SQL using a StreamExecutionEnvironment with the blink 
planner, from source to sink.

What's the best approach for testing Table API/SQL?

I read 
https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html 
however that seems to cover more for specialized functions with 
DataStreams vs entire Table API constructs. What I think I'd like is to 
be able to have some stubbed input sources and mocked out sinks which I 
use to test against my Tables.


Does this seem reasonable?

I did find TestStreamEnvironment and maybe that would be useful at least 
for running the tests locally it seems? 
https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/util/TestStreamEnvironment.html


Any help appreciated. Thanks!

--

Rex Fenley|Software Engineer - Mobile and Backend


Remind.com | BLOG  | 
FOLLOW US  | LIKE US 







Re: sql/table configuration naming guide/style/spec

2020-10-09 Thread Timo Walther

Hi Luan,

we haven't updated all config parameters to string-based options. This 
is still on going. The idle state retention will be configurable in 1.12:


https://issues.apache.org/jira/browse/FLINK-18555

I hope this helps.

Regards,
Timo


On 09.10.20 15:33, Luan Cooper wrote:

Hi

I've read Sql Configuration guide 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#execution-options

*which is very helpful to write sql like hive*

*example on our flink sql ide: *
set table.exec.async-lookup.timeout = 3s;
insert into xxx
select *
from xxx

*what if I want to add new configuration for idleState retention time*
java style:
tableConfig.setIdleStateRetentionTime(Time.seconds(minSeconds), 
Time.seconds(maxSeconds));


*sql style we're need to support:*
set sqlstate.idle.time.retention.mintime = 1min;
insert into xxx
select *
from xxx

*So my question is what's the suggested naming spc/guide/style for new 
tableConfiguration ?*

*Is there any FLIP or doc for this?*

Thanks





Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-09 Thread Timo Walther

Hi Austin,

if you don't want to worry about time at all, you should probably not 
use any windows because those are a time-based operation.


A solution that would look a bit nicer could be to use a pure 
KeyedProcessFunction and implement the deduplication logic without 
reusing windows. In ProcessFunctions you can register an event-time 
timer. The timer would be triggered by the MAX_WATERMARK when the 
pipeline shuts down even without having a timestamp assigned in the 
StreamRecord. Watermark will leave SQL also without a time attribute as 
far as I know.


Regards,
Timo


On 08.10.20 17:38, Austin Cawley-Edwards wrote:

Hey Timo,

Sorry for the delayed reply. I'm using the Blink planner and using 
non-time-based joins. I've got an example repo here that shows my query/ 
setup [1]. It's got the manual timestamp assignment commented out for 
now, but that does indeed solve the issue.


I'd really like to not worry about time at all in this job hah -- I 
started just using processing time, but Till pointed out that processing 
time timers won't be fired when input ends, which is the case for this 
streaming job processing CSV files, so I should be using event time. 
With that suggestion, I switched to ingestion time, where I then 
discovered the issue converting from SQL to data stream.


IMO, as a user manually assigning timestamps on conversion makes sense 
if you're using event time and already handling time attributes 
yourself, but for ingestion time you really don't want to think about 
time at all, which is why it might make sense to propigate the 
automatically assigned timestamps in that case. Though not sure how 
difficult that would be. Let me know what you think!



Best + thanks again,
Austin

[1]: https://github.com/austince/flink-1.10-sql-windowing-error

On Mon, Oct 5, 2020 at 4:24 AM Timo Walther > wrote:


Btw which planner are you using?

Regards,
Timo

On 05.10.20 10:23, Timo Walther wrote:
 > Hi Austin,
 >
 > could you share some details of your SQL query with us? The
reason why
 > I'm asking is because I guess that the rowtime field is not inserted
 > into the `StreamRecord` of DataStream API. The rowtime field is only
 > inserted if there is a single field in the output of the query
that is a
 > valid "time attribute".
 >
 > Esp. after non-time-based joins and aggregations, time attributes
loose
 > there properties and become regular timestamps. Because timestamp
and
 > watermarks might have diverged.
 >
 > If you know what you're doing, you can also assign the timestamp
 > manually after `toRetractStream.assignTimestampAndWatermarks` and
 > reinsert the field into the stream record. But before you do that, I
 > think it is better to share more information about the query with us.
 >
 > I hope this helps.
 >
 > Regards,
 > Timo
 >
 >
 >
 > On 05.10.20 09:25, Till Rohrmann wrote:
 >> Hi Austin,
 >>
 >> thanks for offering to help. First I would suggest asking Timo
whether
 >> this is an aspect which is still missing or whether we
overlooked it.
 >> Based on that we can then take the next steps.
 >>
 >> Cheers,
 >> Till
 >>
 >> On Fri, Oct 2, 2020 at 7:05 PM Austin Cawley-Edwards
 >> mailto:austin.caw...@gmail.com>
>>
wrote:
 >>
 >>     Hey Till,
 >>
 >>     Thanks for the notes. Yeah, the docs don't mention anything
specific
 >>     to this case, not sure if it's an uncommon one. Assigning
timestamps
 >>     on conversion does solve the issue. I'm happy to take a stab at
 >>     implementing the feature if it is indeed missing and you all
think
 >>     it'd be worthwhile. I think it's definitely a confusing
aspect of
 >>     working w/ the Table & DataStream APIs together.
 >>
 >>     Best,
 >>     Austin
 >>
 >>     On Fri, Oct 2, 2020 at 6:05 AM Till Rohrmann
mailto:trohrm...@apache.org>
 >>     >>
wrote:
 >>
 >>     Hi Austin,
 >>
 >>     yes, it should also work for ingestion time.
 >>
 >>     I am not entirely sure whether event time is preserved when
 >>     converting a Table into a retract stream. It should be
possible
 >>     and if it is not working, then I guess it is a missing
feature.
 >>     But I am sure that @Timo Walther
 >>     > knows more about it. In doubt, you
 >>     could assign a new watermark generator when having
obtained the
 >>     retract stream.
 >>
 >>     Here is also a link to some information about event time and
 >>     watermarks [1]. Unfortunately, it 

Re: Flink Kuberntes Libraries

2020-10-09 Thread Till Rohrmann
Hi Saksham,

if you want to extend the Flink Docker image you can find here more details
[1].

If you want to include the library in your user jar, then you have to add
the library as a dependency to your pom.xml file and enable the shade
plugin for building an uber jar [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html#advanced-customization
[2]
https://maven.apache.org/plugins/maven-shade-plugin/examples/includes-excludes.html

Cheers,
Till

On Fri, Oct 9, 2020 at 3:22 PM saksham sapra 
wrote:

> Thanks Till for helping out,
>
> The way you suggested, is it possible to copy libs which is in D directory
> to FLINK_HOME/libs. I tried to run a copy command : copy D:/data/libs to
> FLINK_HOME/libs and it gets copied but i dont how can i check where it gets
> copied and this libs is taken by flink?
>
>
> Thanks,
> Saksham Sapra
>
> On Wed, Oct 7, 2020 at 9:40 PM Till Rohrmann  wrote:
>
>> HI Saksham,
>>
>> the easiest approach would probably be to include the required libraries
>> in your user code jar which you submit to the cluster. Using maven's shade
>> plugin should help with this task. Alternatively, you could also create a
>> custom Flink Docker image where you add the required libraries to the
>> FLINK_HOME/libs directory. This would however mean that every job you
>> submit to the Flink cluster would see these libraries in the system class
>> path.
>>
>> Cheers,
>> Till
>>
>> On Wed, Oct 7, 2020 at 2:08 PM saksham sapra 
>> wrote:
>>
>>> Hi ,
>>>
>>> i have made some configuration using this link page :
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html
>>> .
>>> and i am able to run flink on UI , but i need to submit a job using :
>>> http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:webui/proxy/#/submit
>>> through POstman, and i have some libraries which in local i can add in libs
>>> folder but in this how can i add my libraries so that it works properly.
>>>
>>> [image: image.png]
>>>
>>


Re: flink session job retention time

2020-10-09 Thread Andrey Zagrebin
Hi Richard,

If you mean the retention of completed jobs, there are following options:
jobstore.cache-size [1]
jobstore.expiration-time [2]
jobstore.max-capacity [3]

Best,
Andrey

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#jobstore-cache-size
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#jobstore-expiration-time
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#jobstore-max-capacity

On Fri, Oct 9, 2020 at 9:45 AM Richard Moorhead 
wrote:

> Is there a configuration that controls how long jobs are retained in a
> flink session?
>


Re:Flink SQL Create View

2020-10-09 Thread hailongwang
Hi mulan,
 是的,1.11 才支持,相关 issue 可查看:
https://issues.apache.org/jira/browse/FLINK-17106


Best,
Hailong Wang




在 2020-10-09 20:17:11,"hua mulan"  写道:
>在代码中执行 CREATE VIEW语句是不是1.11才支持,我自己试了下。还想再发邮件确认下。
>
>String DML =
>"INSERT INTO dwd_foo SELECT id, name FROM v";
>String V_SQL =
>"CREATE VIEW v AS SELECT id, name FROM ods_foo";
>tEnv.sqlUpdate(ODS_DDL);
>tEnv.sqlUpdate(V_SQL);
>
>来自 Outlook


sql/table configuration naming guide/style/spec

2020-10-09 Thread Luan Cooper
Hi

I've read Sql Configuration guide
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#execution-options
*which is very helpful to write sql like hive*

*example on our flink sql ide: *
set table.exec.async-lookup.timeout = 3s;
insert into xxx
select *
from xxx

*what if I want to add new configuration for idleState retention time*
java style:
tableConfig.setIdleStateRetentionTime(Time.seconds(minSeconds),
Time.seconds(maxSeconds));

*sql style we're need to support:*
set sqlstate.idle.time.retention.mintime = 1min;
insert into xxx
select *
from xxx

*So my question is what's the suggested naming spc/guide/style for new
tableConfiguration ?*
*Is there any FLIP or doc for this?*

Thanks


Flink SQL Create View

2020-10-09 Thread hua mulan
在代码中执行 CREATE VIEW语句是不是1.11才支持,我自己试了下。还想再发邮件确认下。

String DML =
"INSERT INTO dwd_foo SELECT id, name FROM v";
String V_SQL =
"CREATE VIEW v AS SELECT id, name FROM ods_foo";
tEnv.sqlUpdate(ODS_DDL);
tEnv.sqlUpdate(V_SQL);

来自 Outlook


Re: autoCommit for postgres jdbc streaming in Table/SQL API

2020-10-09 Thread Dylan Forciea
Jark,

Thank you! I had actually mistyped the JIRA issue; autoCommit needs to be set 
to false for streaming to work. The default on the driver is true when the 
option isn’t specified. I’ve updated the issue accordingly.

Setting this to false automatically on the read path would fix my issue. 
However, I’m only certain that this is proper for Postgres. I’m not sure if 
this should be done for other drivers, although my gut would say it should be 
fine if it’s only done for reading. My patch as it is will set the builder to 
not specify whether to set autoCommit if the option is not specified, which 
means it would then be left at the default of true. That would conflict with 
the 1.11 patch you suggested. Let me know if you think I should make the 
default true in the SQL API.

https://github.com/apache/flink/pull/13570

Regards,
Dylan

From: Jark Wu 
Date: Thursday, October 8, 2020 at 10:15 PM
To: Dylan Forciea 
Cc: Till Rohrmann , dev , Shengkai 
Fang , "user@flink.apache.org" , 
Leonard Xu 
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Hi Dylan,

Sorry for the late reply. We've just come back from a long holiday.

Thanks for reporting this problem. First, I think this is a bug that 
`autoCommit` is false by default (JdbcRowDataInputFormat.Builder).
We can fix the default to true in 1.11 series, and I think this can solve your 
problem in a short term?
Besides, we should expose the connector options to set auto commit and this can 
be another issue to be implemented in master.
I'm glad to review the code.

What do you think?

Regarding to the failed JMXReporterFactoryTest, I think this is a known issue, 
see FLINK-19539 [1]

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-19539

On Fri, 9 Oct 2020 at 01:29, Dylan Forciea 
mailto:dy...@oseberg.io>> wrote:
I’ve updated the unit tests and documentation, and I was running the azure test 
pipeline as described in the instructions. However, it appears that what seems 
to be an unrelated test for the JMX code failed. Is this a matter of me not 
setting things up correctly? I wanted to ensure everything looked good before I 
submitted the PR.

[ERROR] Failures:
[ERROR]   JMXReporterFactoryTest.testPortRangeArgument:46
Expected: (a value equal to or greater than <9000> and a value less than or 
equal to <9010>)
 but: a value less than or equal to <9010> <9040> was greater than <9010>
[ERROR]   JMXReporterFactoryTest.testWithoutArgument:60
[INFO]
[ERROR] Tests run: 10, Failures: 2, Errors: 0, Skipped: 0

Thanks,
Dylan Forciea

From: Till Rohrmann mailto:trohrm...@apache.org>>
Date: Thursday, October 8, 2020 at 2:29 AM
To: Dylan Forciea mailto:dy...@oseberg.io>>
Cc: dev mailto:d...@flink.apache.org>>, Shengkai Fang 
mailto:fskm...@gmail.com>>, 
"user@flink.apache.org" 
mailto:user@flink.apache.org>>, 
"j...@apache.org" 
mailto:j...@apache.org>>, Leonard Xu 
mailto:xbjt...@gmail.com>>
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

This sounds good. Maybe there are others in the community who can help with the 
review before the Jark and Leonard are back.

Cheers,
Till

On Wed, Oct 7, 2020 at 7:33 PM Dylan Forciea 
mailto:dy...@oseberg.io>> wrote:
Actually…. It looks like what I did covers both cases. I’ll see about getting 
some unit tests and documentation updated.

Dylan

From: Dylan Forciea mailto:dy...@oseberg.io>>
Date: Wednesday, October 7, 2020 at 11:47 AM
To: Till Rohrmann mailto:trohrm...@apache.org>>, dev 
mailto:d...@flink.apache.org>>
Cc: Shengkai Fang mailto:fskm...@gmail.com>>, 
"user@flink.apache.org" 
mailto:user@flink.apache.org>>, 
"j...@apache.org" 
mailto:j...@apache.org>>, Leonard Xu 
mailto:xbjt...@gmail.com>>
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Ok, I have created FLINK-19522 describing the issue. I have the code I made so 
far checked in at 
https://github.com/apache/flink/compare/master...dforciea:FLINK-19522 but this 
only fixes the SQL API. It sounds like there may be another change needed for 
the Table API… I’ll look into that and see if I can figure it out on my own 
while they’re out. I will also need to add some unit tests and update some 
documentation to get this ready for a PR.

Thanks,
Dylan

From: Till Rohrmann mailto:trohrm...@apache.org>>
Date: Wednesday, October 7, 2020 at 10:55 AM
To: dev mailto:d...@flink.apache.org>>
Cc: Shengkai Fang mailto:fskm...@gmail.com>>, 
"user@flink.apache.org" 
mailto:user@flink.apache.org>>, 
"j...@apache.org" 
mailto:j...@apache.org>>, Leonard Xu 
mailto:xbjt...@gmail.com>>
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Hi Dylan,

thanks for reaching out to the Flink community and excuse our late response. I 
am not an expert for the Table API and its JDBC connector but what you describe 
sounds like a missing 

Re: Native State in Python Stateful Functions?

2020-10-09 Thread Clements, Danial C
Hi Gordan,

Glad to hear this is all inline with the patterns for StateFun!  I think one 
thing that still trips me up in understanding the relationship between Flink 
and StateFun is how masters and workers come into play.  In the case of remote 
functions, what are the scaling requirements for the Flink workers?  How do you 
know when you’ll need to add more instances?

Thanks,
Dan

From: "Tzu-Li (Gordon) Tai" 
Date: Friday 9 October 2020 at 10:28
To: "Clements, Danial C" 
Cc: user 
Subject: Re: Native State in Python Stateful Functions?

Hi,
On Fri, Oct 9, 2020, 4:20 PM Clements, Danial C 
mailto:danial.cleme...@optum.com>> wrote:
Hi,

This makes sense and I can appreciate the stateless aspect for the remote 
functions.  We have a number of components that need access to quite a bit of 
data, the idea was to key the incoming stream in a way that would minimize 
calls to a reference DB and then store that result set in the state so it would 
be readily available for subsequent messages with the same key.  Additionally, 
I had hoped to use delayed messages as a way of invalidating cache after a 
certain amount of time.  Please tell me if this is an antipattern as this 
project is really my first foray into stream processing.

This is definitely not an antipattern!
Co-sharding state and message streams so that compute may benefit from local 
state access instead of requiring remote queries is one of the key principles 
of distributed stateful stream processing.

Your idea of invalidating old state is also very sane. You can actually just 
set a state TTL for that [1].

For us, python is a hard requirement so I was hoping that the state would be 
similar to the other Flink jobs where its local to the processor, however given 
the remote stateful architecture, it completely makes sense why it
Using non-JVM languages, state access must always somehow be transported out 
from the Flink JVM processes (where the state is maintained) to the functions, 
whether it's over a local or remote network.

This is the same for all Python libraries on top of Flink, such as Flink's 
Python Table API, or Apache Beam's Python API.
Both of these require transporting state over a local network.

If you'd like to use StateFun because of the programming constructs and dynamic 
messaging flexibility it provides, you actually have many different function 
deployment options.

For example, the remote deployment approach I explained in my previous email, 
in which functions are deployed as services separate to the Flink StateFun 
cluster and can benefit from rapid scalability and zero downtime upgrades / 
live reloads.

Alternatively, if you prefer performance over operational flexibility, you can 
consider the sidecar / co-location deployment approach [2].

Cheers,
Gordon


[1] 
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/index.html#defining-functions

[2] 
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/concepts/distributed_architecture.html#co-located-functions

On a separate topic, is anyone using StateFun in production?

Thanks,
Dan

From: "Tzu-Li (Gordon) Tai" mailto:tzuli...@apache.org>>
Date: Friday 9 October 2020 at 06:54
To: "Clements, Danial C" 
mailto:danial.cleme...@optum.com>>
Cc: user mailto:user@flink.apache.org>>
Subject: Re: Native State in Python Stateful Functions?

Hi,

Nice to hear that you are trying out StateFun!

It is by design that function state is attached to each HTTP invocation 
request, as defined by StateFun's remote invocation request-reply protocol.
This decision was made with typical application cloud-native architectures in 
mind - having function deployments be stateless and require no session 
dependencies between the StateFun runtime and the functions services allows the 
functions to scale out very easily.

There are some discussions on potentially adding a bi-directional protocol in 
the future so that state can be lazily fetched on demand instead of every 
invocation, but that is still in very early stages of discussion.

Could you briefly describe what the state access pattern in your application 
looks like?
Maybe this can provide some insight for us in figuring out how a more advanced 
/ efficient protocol should be designed in future releases.

On Thu, Oct 8, 2020, 6:20 PM Clements, Danial C 
mailto:danial.cleme...@optum.com>> wrote:
Hi,

In doing some testing with Flink stateful functions in Python and I’ve gotten a 
small POC working.  One of our key requirements for our stream processors is 
that they be written in python due to the skillset of our team.  Given that the 
Python DataStreams api seems to be under development in Flink 1.12, we’ve 
implemented our business logic as a stateful function using the remote pattern. 
 In some testing, it seems the state object is getting serialized and sent 
along with each HTTP request
One clarification here:
StateFun does not serialize or deserialize state, everything is maintained and 

Re: how to simulate the scene "back pressure" in flink?Thanks~!

2020-10-09 Thread David Anderson
The Flink Operations Playground includes an optional backpressure
simulation you can experiment with. It is described at the end of [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/flink-operations-playground.html#variants


On Fri, Oct 9, 2020 at 10:02 AM Arvid Heise  wrote:

> You can add
>
> conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
>
> at any place before creating the environment [1]. Default value is 8081,
> so you can access web ui through http://localhost:8081, but you can
> really choose any other free port.
>
> [1]
> https://github.com/apache/flink/blob/61a997364b020b44bd26df76208e76106c6390b5/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java#L198
> On Fri, Oct 9, 2020 at 9:24 AM 大森林  wrote:
>
>>  Thanks for your repies,
>> could you tell me where to set RestOption.POPT?in configuration
>> what's the value should I set for RestOption.PORT?
>>
>> Thanks.
>>
>>
>> -- 原始邮件 --
>> *发件人:* "Arvid Heise" ;
>> *发送时间:* 2020年10月9日(星期五) 下午3:00
>> *收件人:* "大森林";
>> *抄送:* "David,Anderson";"user"<
>> user@flink.apache.org>;
>> *主题:* Re: how to simulate the scene "back pressure" in flink?Thanks~!
>>
>> The easiest way to see backpressure is to add some sleep to your sink,
>> check [1] for an example.
>> If you execute that unit test with a RestOption.PORT set in the
>> configuration, you can even load the Web UI and watch the backpressure
>> accumulate and finally go away at the end of the test.
>>
>> [1]
>> https://github.com/apache/flink/blob/61a997364b020b44bd26df76208e76106c6390b5/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java#L536
>>
>> On Tue, Oct 6, 2020 at 4:13 PM 大森林  wrote:
>>
>>>
>>> I want to learn the concept "back pressure".
>>> but I can not find the datastream generator example to generate a lot of
>>> data.
>>>
>>> besides,
>>> is there any example on how to simulate the scene "back pressure"  in
>>> WEB UI?
>>>
>>> Thanks for your help~
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> 
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2020-10-09 Thread Matthias
Reviving this thread again after I came across FLINK-12214 [1] since there
are use cases which might benefit from this feature. Was there some
conclusion on public APIs in the meantime? Should we proceed with the
discussion here?

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-12214



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re:Re: checkpoint rocksdb hdfs 如何协调,保证数据不丢失

2020-10-09 Thread Michael Ran
好的,非常感谢。
在 2020-09-29 14:23:10,"王冶"  写道:
>Hi~ 按你的问题顺序回答如下:
> 1. Flink中的RocksDB是支持保存到hdfs的,且支持的非常好,将rocksdb的存储路径设置为hdfs路径即可。
> 2.
>in-flight的数据是保存在本地磁盘的,仅当checkpoint的时候,才会将本地的状态拷贝到hdfs。而且checkpoint本身不会因为远程拷贝影响计算速度。
> 3.
>多久备份一次,取决于你配置的checkpoint的间隔。每次checkpoint都会备份&远程拷贝。但请注意,默认配置下checkpoint会在作业停止后删除,这时候你需要手动触发savepoint,你当然也可以在作业运行过程中出发保存savepoint,savepoint的好处是不会随作业停止而删除,且可以让新作业基于savepoint启动,从而实现exactly-once或at-least的语义。
> 4. Flink提供多种状态后端,需要根据你的实际场景选择。但对于大状态和高可用场景,推荐rocksdb。具体的推荐还是多读下文档。
>
>文档:
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/state_backends.html#rocksdb-state-backend-details
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/large_state_tuning.html
>
>祝好,
>By Soda
>
>
>On Tue, 29 Sep 2020 at 11:06, Michael Ran  wrote:
>
>> dear all :
>> 我们checkpoint 信息默认保存在rocksdb上,但是rocksdb
>> 是一个单机系统,性能OK,要做到不丢失还是要备份到hdfs分布式文件系统上。
>>
>>
>>问题是:
>>1. 如果仅保存在rocksdb,那么单机物理损坏,数据是会丢失的。
>>2. 如果仅保存hdfs,那么性能可能跟不上
>>3.如果先保存到rocksdb,再定时备份到hdfs,那么是多久备份一次?中间出现物理损坏,还是会出现一端时间的丢失。
>>4. 这块的详细设计,和具体流程、场景有合适的文档推荐吗?怎么再性能和数据完整性上做考虑的


Re:Re: flink1.11流式写入hive速度慢的问题

2020-10-09 Thread me
flink1.11大家有没有遇到写入hive速度慢的问题,加到并行度之后,写入速度1000条/秒,写入性能还是很差劲,完全不满足需要,要怎么把实时的数据写入hive中?
flink1.11 将datastream转为table之后,再流式的写入hive表中,发现写入速度只有1000条/秒,
Datastream 是直接读取的kafka数据,速度现在是1条每秒,现在只能写入1000条/每秒
val chaitin_test = tableEnv.fromDataStream(dataStream,'test)
chaitin_test.printSchema()
tableEnv.executeSql("insert into chaitin_test select test from " + chaitin_test)


 原始邮件 
发件人: me
收件人: user-zh
发送时间: 2020年10月9日(周五) 17:33
主题: Re:Re: flink1.11流式写入hive速度慢的问题


您的回复感觉一点关系也没有,有点打广告的嫌疑 原始邮件 发件人: Michael Ran 收件人: 
user-zh 发送时间: 2020年10月9日(周五) 17:14 主题: Re:Re: 
flink1.11流式写入hive速度慢的问题 不知道现在flink 能否直接获取hive 文件写入。以前直接用jdbc 写hive 
速度本来就快不起来,每次都要生成文件。 如果先写文件,文件写好了再进行一次load 就会快很多 在 2020-10-09 15:55:15,"Jingsong 
Li"  写道: >Hi, >是Hive表吧? 
>https://issues.apache.org/jira/browse/FLINK-19121 在1.11.3中才fix,这个是影响性能的 > 
>可以下载最新的1.11分支的Hive依赖来试下: 
>https://repository.apache.org/snapshots/org/apache/flink/ 
>(比如你用hive-1.2.2依赖,你可以下载 
>https://repository.apache.org/content/groups/snapshots/org/apache/flink/flink-sql-connector-hive-1.2.2_2.11/1.11-SNAPSHOT/flink-sql-connector-hive-1.2.2_2.11-1.11-20201008.202441-190.jar
 >) > >Best, >Jingsong > >On Fri, Oct 9, 2020 at 3:50 PM me  
wrote: > >> dataStream读取速度在5000条每秒,没有其他的处理逻辑,仅仅是将datastream 转换为table >> >> >> 
原始邮件 >> 发件人: me >> 收件人: user-zh >> 
发送时间: 2020年10月9日(周五) 15:34 >> 主题: flink1.11流式写入hive速度慢的问题 >> >> >> flink1.11 
将datastream转为table之后,再流式的写入hive表中,发现写入速度只有几十条每秒 >> val chaitin_test = 
tableEnv.fromDataStream(dataStream,'test) >> chaitin_test.printSchema() >> 
tableEnv.executeSql("insert into chaitin_test select test from " + >> 
chaitin_test) > > > >-- >Best, Jingsong Lee

Re:Re: flink1.11流式写入hive速度慢的问题

2020-10-09 Thread me
您的回复感觉一点关系也没有,有点打广告的嫌疑


 原始邮件 
发件人: Michael Ran
收件人: user-zh
发送时间: 2020年10月9日(周五) 17:14
主题: Re:Re: flink1.11流式写入hive速度慢的问题


不知道现在flink 能否直接获取hive 文件写入。以前直接用jdbc 写hive 速度本来就快不起来,每次都要生成文件。 
如果先写文件,文件写好了再进行一次load 就会快很多 在 2020-10-09 15:55:15,"Jingsong Li" 
 写道: >Hi, >是Hive表吧? 
>https://issues.apache.org/jira/browse/FLINK-19121 在1.11.3中才fix,这个是影响性能的 > 
>可以下载最新的1.11分支的Hive依赖来试下: 
>https://repository.apache.org/snapshots/org/apache/flink/ 
>(比如你用hive-1.2.2依赖,你可以下载 
>https://repository.apache.org/content/groups/snapshots/org/apache/flink/flink-sql-connector-hive-1.2.2_2.11/1.11-SNAPSHOT/flink-sql-connector-hive-1.2.2_2.11-1.11-20201008.202441-190.jar
 >) > >Best, >Jingsong > >On Fri, Oct 9, 2020 at 3:50 PM me  
wrote: > >> dataStream读取速度在5000条每秒,没有其他的处理逻辑,仅仅是将datastream 转换为table >> >> >> 
原始邮件 >> 发件人: me >> 收件人: user-zh >> 
发送时间: 2020年10月9日(周五) 15:34 >> 主题: flink1.11流式写入hive速度慢的问题 >> >> >> flink1.11 
将datastream转为table之后,再流式的写入hive表中,发现写入速度只有几十条每秒 >> val chaitin_test = 
tableEnv.fromDataStream(dataStream,'test) >> chaitin_test.printSchema() >> 
tableEnv.executeSql("insert into chaitin_test select test from " + >> 
chaitin_test) > > > >-- >Best, Jingsong Lee

Re: Native State in Python Stateful Functions?

2020-10-09 Thread Tzu-Li (Gordon) Tai
Hi,

On Fri, Oct 9, 2020, 4:20 PM Clements, Danial C 
wrote:

> Hi,
>
>
>
> This makes sense and I can appreciate the stateless aspect for the remote
> functions.  We have a number of components that need access to quite a bit
> of data, the idea was to key the incoming stream in a way that would
> minimize calls to a reference DB and then store that result set in the
> state so it would be readily available for subsequent messages with the
> same key.  Additionally, I had hoped to use delayed messages as a way of
> invalidating cache after a certain amount of time.  Please tell me if this
> is an antipattern as this project is really my first foray into stream
> processing.
>

This is definitely not an antipattern!
Co-sharding state and message streams so that compute may benefit from
local state access instead of requiring remote queries is one of the key
principles of distributed stateful stream processing.

Your idea of invalidating old state is also very sane. You can actually
just set a state TTL for that [1].

>
>
> For us, python is a hard requirement so I was hoping that the state would
> be similar to the other Flink jobs where its local to the processor,
> however given the remote stateful architecture, it completely makes sense
> why it
>
Using non-JVM languages, state access must always somehow be transported
out from the Flink JVM processes (where the state is maintained) to the
functions, whether it's over a local or remote network.

This is the same for all Python libraries on top of Flink, such as Flink's
Python Table API, or Apache Beam's Python API.
Both of these require transporting state over a local network.

If you'd like to use StateFun because of the programming constructs and
dynamic messaging flexibility it provides, you actually have many different
function deployment options.

For example, the remote deployment approach I explained in my previous
email, in which functions are deployed as services separate to the Flink
StateFun cluster and can benefit from rapid scalability and zero downtime
upgrades / live reloads.

Alternatively, if you prefer performance over operational flexibility, you
can consider the sidecar / co-location deployment approach [2].

Cheers,
Gordon


[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/index.html#defining-functions

[2]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/concepts/distributed_architecture.html#co-located-functions

On a separate topic, is anyone using StateFun in production?
>
>
>
> Thanks,
>
> Dan
>
>
>
> *From: *"Tzu-Li (Gordon) Tai" 
> *Date: *Friday 9 October 2020 at 06:54
> *To: *"Clements, Danial C" 
> *Cc: *user 
> *Subject: *Re: Native State in Python Stateful Functions?
>
>
>
> Hi,
>
>
>
> Nice to hear that you are trying out StateFun!
>
>
>
> It is by design that function state is attached to each HTTP invocation
> request, as defined by StateFun's remote invocation request-reply protocol.
>
> This decision was made with typical application cloud-native architectures
> in mind - having function deployments be stateless and require no session
> dependencies between the StateFun runtime and the functions services allows
> the functions to scale out very easily.
>
>
>
> There are some discussions on potentially adding a bi-directional protocol
> in the future so that state can be lazily fetched on demand instead of
> every invocation, but that is still in very early stages of discussion.
>
>
>
> Could you briefly describe what the state access pattern in your
> application looks like?
>
> Maybe this can provide some insight for us in figuring out how a more
> advanced / efficient protocol should be designed in future releases.
>
>
>
> On Thu, Oct 8, 2020, 6:20 PM Clements, Danial C 
> wrote:
>
> Hi,
>
>
>
> In doing some testing with Flink stateful functions in Python and I’ve
> gotten a small POC working.  One of our key requirements for our stream
> processors is that they be written in python due to the skillset of our
> team.  Given that the Python DataStreams api seems to be under development
> in Flink 1.12, we’ve implemented our business logic as a stateful function
> using the remote pattern.  In some testing, it seems the state object is
> getting serialized and sent along with each HTTP request
>
> One clarification here:
>
> StateFun does not serialize or deserialize state, everything is maintained
> and provided to functions as byte arrays.
>
> Serialization / deserialization happens in user code (i.e. the functions).
>
>
>
> Cheers,
>
> Gordon
>
> and given that we’re storing quite a bit of data in this state, this seems
> to contribute to the latency of the application in a linear fashion.  Is
> there any way around this?  Is there a way to store the state local to the
> python application?
>
>
>
> Thanks,
>
> Dan
>
>
> This e-mail, including attachments, may include confidential and/or
> proprietary information, and may be used only by the person or entity
> to 

FlinkJobNotFoundException????

2020-10-09 Thread ??????
mysql??MySQLlinux??
py4j.protocol.Py4JJavaError: An error occurred while calling o6.execute.
: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.rest.handler.RestHandlerException: 
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
Flink job (4d6b1273229e0e16fa433c652b5cb74d)





??


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
batch_source_ddl = """
CREATE TABLE mh_source_tab (
lid INT,
dir INT,
posid BIGINT,
km BIGINT,
poleId BIGINT,
ts BIGINT,
rt Decimal(6,2),
time1 VARCHAR 
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://**', 
'table-name' = 'nj_mh_test', --??
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'root',
--'sink.buffer-flush.max-rows' = '100',
'sink.buffer-flush.interval' = '1s'


)


warn_alarm_mh_ddl = """
CREATE TABLE warn_alarm_mh_sink (
lid INT,
dir INT,
posid BIGINT,
km BIGINT,
poleId BIGINT,
extremum Decimal(6,2),
PRIMARY KEY (lid,dir,posid,poleId) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://**',
'table-name' = 'warn_mh_alarm_result', --??
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'root',
--'sink.buffer-flush.max-rows' = '100',
'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
t_env.execute_sql(batch_source_ddl )
t_env.execute_sql(warn_alarm_mh_ddl)


def threshold_alarm(delta_thres):
  source = t_env.from_path("mh_source_tab") \
.where("rt < -1")\
.group_by("lid, dir, posid, km, poleId")\
.select("lid, dir, posid, km, poleId, max(rt) as 
max_rt, min(rt) as min_rt, max(rt)-min(rt) as extremum")\
.where("extremum "+str(delta_thres))\
.select("lid, dir, posid, km, poleId, extremum")
  source.execute_insert("warn_alarm_mh_sink") \
 .get_job_client() \
 .get_job_execution_result() \
 .result()
if __name__ == '__main__':
  threshold_alarm(delta_thres=0.5)

FlinkJobNotFoundException????

2020-10-09 Thread ??????
mysql??MySQLlinux??
py4j.protocol.Py4JJavaError: An error occurred while calling o6.execute.
: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.rest.handler.RestHandlerException: 
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
Flink job (4d6b1273229e0e16fa433c652b5cb74d)





??


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
batch_source_ddl = """
CREATE TABLE mh_source_tab (
lid INT,
dir INT,
posid BIGINT,
km BIGINT,
poleId BIGINT,
ts BIGINT,
rt Decimal(6,2),
time1 VARCHAR 
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://**', 
'table-name' = 'nj_mh_test', --??
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'root',
--'sink.buffer-flush.max-rows' = '100',
'sink.buffer-flush.interval' = '1s'


)


warn_alarm_mh_ddl = """
CREATE TABLE warn_alarm_mh_sink (
lid INT,
dir INT,
posid BIGINT,
km BIGINT,
poleId BIGINT,
extremum Decimal(6,2),
PRIMARY KEY (lid,dir,posid,poleId) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://**',
'table-name' = 'warn_mh_alarm_result', --??
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'root',
--'sink.buffer-flush.max-rows' = '100',
'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
t_env.execute_sql(batch_source_ddl)
t_env.execute_sql(warn_alarm_mh_ddl)


def threshold_alarm(delta_thres):
  source = t_env.from_path("mh_source_tab") \
.where("rt < -1")\
.group_by("lid, dir, posid, km, poleId")\
.select("lid, dir, posid, km, poleId, max(rt) as 
max_rt, min(rt) as min_rt, max(rt)-min(rt) as extremum")\
.where("extremum "+str(delta_thres))\
.select("lid, dir, posid, km, poleId, extremum")
  source.execute_insert("warn_alarm_mh_sink") \
 .get_job_client() \
 .get_job_execution_result() \
 .result()
if __name__ == '__main__':
  threshold_alarm(delta_thres=0.5)

Re:Re: flink1.11流式写入hive速度慢的问题

2020-10-09 Thread Michael Ran
不知道现在flink 能否直接获取hive 文件写入。以前直接用jdbc 写hive 速度本来就快不起来,每次都要生成文件。  
如果先写文件,文件写好了再进行一次load  就会快很多
在 2020-10-09 15:55:15,"Jingsong Li"  写道:
>Hi,
>是Hive表吧?
>https://issues.apache.org/jira/browse/FLINK-19121 在1.11.3中才fix,这个是影响性能的
>
>可以下载最新的1.11分支的Hive依赖来试下:
>https://repository.apache.org/snapshots/org/apache/flink/
>(比如你用hive-1.2.2依赖,你可以下载
>https://repository.apache.org/content/groups/snapshots/org/apache/flink/flink-sql-connector-hive-1.2.2_2.11/1.11-SNAPSHOT/flink-sql-connector-hive-1.2.2_2.11-1.11-20201008.202441-190.jar
>)
>
>Best,
>Jingsong
>
>On Fri, Oct 9, 2020 at 3:50 PM me  wrote:
>
>> dataStream读取速度在5000条每秒,没有其他的处理逻辑,仅仅是将datastream 转换为table
>>
>>
>>  原始邮件
>> 发件人: me
>> 收件人: user-zh
>> 发送时间: 2020年10月9日(周五) 15:34
>> 主题: flink1.11流式写入hive速度慢的问题
>>
>>
>> flink1.11 将datastream转为table之后,再流式的写入hive表中,发现写入速度只有几十条每秒
>> val chaitin_test = tableEnv.fromDataStream(dataStream,'test)
>> chaitin_test.printSchema()
>> tableEnv.executeSql("insert into chaitin_test select test from " +
>> chaitin_test)
>
>
>
>-- 
>Best, Jingsong Lee


Re: Flink 1.11 submit job timed out

2020-10-09 Thread DONG, Weike
Hi,

我们也遇到了同样的问题,并行度增加后,JobManager 卡住的时间越来越长,直到所有的 TaskManager 都被迫超时了。目前来看和 GC
无关,网络这里嫌疑更大。

On Fri, Jul 31, 2020 at 7:55 PM Matt Wang  wrote:

> 遇到了同样的问题,也是启动了 taskmanager-query-state-service.yaml
> 这个服务后,作业才能正常提交的,另外我是在本地装的 k8s 集群进行测试的,如果是 GC 的问题,启不启动 TM service 应该不会有影响的
>
>
> --
>
> Best,
> Matt Wang
>
>
> On 07/27/2020 15:01,Yang Wang wrote:
> 建议先配置heartbeat.timeout的值大一些,然后把gc log打出来
> 看看是不是经常发生fullGC,每次持续时间是多长,从你目前提供的log看,进程内JM->RM都会心跳超时
> 怀疑还是和GC有关的
>
> env.java.opts.jobmanager: -Xloggc:/jobmanager-gc.log
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=2 -XX:GCLogFileSize=512M
>
>
> Best,
> Yang
>
> SmileSmile  于2020年7月27日周一 下午1:50写道:
>
> Hi,Yang Wang
>
> 因为日志太长了,删了一些重复的内容。
> 一开始怀疑过jm gc的问题,将jm的内存调整为10g也是一样的情况。
>
> Best
>
>
>
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> On 07/27/2020 11:36, Yang Wang wrote:
> 看你这个任务,失败的根本原因并不是“No hostname could be resolved
> ”,这个WARNING的原因可以单独讨论(如果在1.10里面不存在的话)。
> 你可以本地起一个Standalone的集群,也会有这样的WARNING,并不影响正常使用
>
>
> 失败的原因是slot 5分钟申请超时了,你给的日志里面2020-07-23 13:55:45,519到2020-07-23
> 13:58:18,037是空白的,没有进行省略吧?
> 这段时间按理应该是task开始deploy了。在日志里看到了JM->RM的心跳超时,同一个Pod里面的同一个进程通信也超时了
> 所以怀疑JM一直在FullGC,这个需要你确认一下
>
>
> Best,
> Yang
>
> SmileSmile  于2020年7月23日周四 下午2:43写道:
>
> Hi Yang Wang
>
> 先分享下我这边的环境版本
>
>
> kubernetes:1.17.4.   CNI: weave
>
>
> 1 2 3 是我的一些疑惑
>
> 4 是JM日志
>
>
> 1. 去掉taskmanager-query-state-service.yaml后确实不行  nslookup
>
> kubectl exec -it busybox2 -- /bin/sh
> / # nslookup 10.47.96.2
> Server:  10.96.0.10
> Address: 10.96.0.10:53
>
> ** server can't find 2.96.47.10.in-addr.arpa: NXDOMAIN
>
>
>
> 2. Flink1.11和Flink1.10
>
> detail subtasks taskmanagers xxx x 这行
>
>
> 1.11变成了172-20-0-50。1.10是flink-taskmanager-7b5d6958b6-sfzlk:36459。这块的改动是?(目前这个集群跑着1.10和1.11,1.10可以正常运行,如果coredns有问题,1.10版本的flink应该也有一样的情况吧?)
>
> 3. coredns是否特殊配置?
>
> 在容器中解析域名是正常的,只是反向解析没有service才会有问题。coredns是否有什么需要配置?
>
>
> 4. time out时候的JM日志如下:
>
>
>
> 2020-07-23 13:53:00,228 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> ResourceManager akka.tcp://flink@flink-jobmanager
> :6123/user/rpc/resourcemanager_0
> was granted leadership with fencing token
> 
> 2020-07-23 13:53:00,232 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService [] -
> Starting
> RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> at akka://flink/user/rpc/dispatcher_1 .
> 2020-07-23 13:53:00,233 INFO
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl []
> -
> Starting the SlotManager.
> 2020-07-23 13:53:03,472 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Registering TaskManager with ResourceID 1f9ae0cd95a28943a73be26323588696
> (akka.tcp://flink@10.34.128.9:6122/user/rpc/taskmanager_0) at
> ResourceManager
> 2020-07-23 13:53:03,777 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Registering TaskManager with ResourceID cac09e751264e61615329c20713a84b4
> (akka.tcp://flink@10.32.160.6:6122/user/rpc/taskmanager_0) at
> ResourceManager
> 2020-07-23 13:53:03,787 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Registering TaskManager with ResourceID 93c72d01d09f9ae427c5fc980ed4c1e4
> (akka.tcp://flink@10.39.0.8:6122/user/rpc/taskmanager_0) at
> ResourceManager
> 2020-07-23 13:53:04,044 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Registering TaskManager with ResourceID 8adf2f8e81b77a16d5418a9e252c61e2
> (akka.tcp://flink@10.38.64.7:6122/user/rpc/taskmanager_0) at
> ResourceManager
> 2020-07-23 13:53:04,099 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Registering TaskManager with ResourceID 23e9d2358f6eb76b9ae718d879d4f330
> (akka.tcp://flink@10.42.160.6:6122/user/rpc/taskmanager_0) at
> ResourceManager
> 2020-07-23 13:53:04,146 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Registering TaskManager with ResourceID 092f8dee299e32df13db3111662b61f8
> (akka.tcp://flink@10.33.192.14:6122/user/rpc/taskmanager_0) at
> ResourceManager
>
>
> 2020-07-23 13:55:44,220 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] -
> Received
> JobGraph submission 99a030d0e3f428490a501c0132f27a56 (JobTest).
> 2020-07-23 13:55:44,222 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] -
> Submitting job 99a030d0e3f428490a501c0132f27a56 (JobTest).
> 2020-07-23 13:55:44,251 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService [] -
> Starting
> RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at
> akka://flink/user/rpc/jobmanager_2 .
> 2020-07-23 13:55:44,260 INFO
> org.apache.flink.runtime.jobmaster.JobMaster
> [] - Initializing job JobTest
> (99a030d0e3f428490a501c0132f27a56).
> 2020-07-23 13:55:44,278 INFO
> org.apache.flink.runtime.jobmaster.JobMaster
> [] - Using restart back off time strategy
> NoRestartBackoffTimeStrategy for 

Re: Native State in Python Stateful Functions?

2020-10-09 Thread Clements, Danial C
Hi,

This makes sense and I can appreciate the stateless aspect for the remote 
functions.  We have a number of components that need access to quite a bit of 
data, the idea was to key the incoming stream in a way that would minimize 
calls to a reference DB and then store that result set in the state so it would 
be readily available for subsequent messages with the same key.  Additionally, 
I had hoped to use delayed messages as a way of invalidating cache after a 
certain amount of time.  Please tell me if this is an antipattern as this 
project is really my first foray into stream processing.

For us, python is a hard requirement so I was hoping that the state would be 
similar to the other Flink jobs where its local to the processor, however given 
the remote stateful architecture, it completely makes sense why it isn’t.

On a separate topic, is anyone using StateFun in production?

Thanks,
Dan

From: "Tzu-Li (Gordon) Tai" 
Date: Friday 9 October 2020 at 06:54
To: "Clements, Danial C" 
Cc: user 
Subject: Re: Native State in Python Stateful Functions?

Hi,

Nice to hear that you are trying out StateFun!

It is by design that function state is attached to each HTTP invocation 
request, as defined by StateFun's remote invocation request-reply protocol.
This decision was made with typical application cloud-native architectures in 
mind - having function deployments be stateless and require no session 
dependencies between the StateFun runtime and the functions services allows the 
functions to scale out very easily.

There are some discussions on potentially adding a bi-directional protocol in 
the future so that state can be lazily fetched on demand instead of every 
invocation, but that is still in very early stages of discussion.

Could you briefly describe what the state access pattern in your application 
looks like?
Maybe this can provide some insight for us in figuring out how a more advanced 
/ efficient protocol should be designed in future releases.

On Thu, Oct 8, 2020, 6:20 PM Clements, Danial C 
mailto:danial.cleme...@optum.com>> wrote:
Hi,

In doing some testing with Flink stateful functions in Python and I’ve gotten a 
small POC working.  One of our key requirements for our stream processors is 
that they be written in python due to the skillset of our team.  Given that the 
Python DataStreams api seems to be under development in Flink 1.12, we’ve 
implemented our business logic as a stateful function using the remote pattern. 
 In some testing, it seems the state object is getting serialized and sent 
along with each HTTP request
One clarification here:
StateFun does not serialize or deserialize state, everything is maintained and 
provided to functions as byte arrays.
Serialization / deserialization happens in user code (i.e. the functions).

Cheers,
Gordon
and given that we’re storing quite a bit of data in this state, this seems to 
contribute to the latency of the application in a linear fashion.  Is there any 
way around this?  Is there a way to store the state local to the python 
application?

Thanks,
Dan

This e-mail, including attachments, may include confidential and/or
proprietary information, and may be used only by the person or entity
to which it is addressed. If the reader of this e-mail is not the intended
recipient or his or her authorized agent, the reader is hereby notified
that any dissemination, distribution or copying of this e-mail is
prohibited. If you have received this e-mail in error, please notify the
sender by replying to this message and delete this e-mail immediately.

This e-mail, including attachments, may include confidential and/or
proprietary information, and may be used only by the person or entity
to which it is addressed. If the reader of this e-mail is not the intended
recipient or his or her authorized agent, the reader is hereby notified
that any dissemination, distribution or copying of this e-mail is
prohibited. If you have received this e-mail in error, please notify the
sender by replying to this message and delete this e-mail immediately.


Re: how to simulate the scene "back pressure" in flink?Thanks~!

2020-10-09 Thread Arvid Heise
You can add

conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());

at any place before creating the environment [1]. Default value is 8081, so
you can access web ui through http://localhost:8081, but you can really
choose any other free port.

[1]
https://github.com/apache/flink/blob/61a997364b020b44bd26df76208e76106c6390b5/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java#L198
On Fri, Oct 9, 2020 at 9:24 AM 大森林  wrote:

>  Thanks for your repies,
> could you tell me where to set RestOption.POPT?in configuration
> what's the value should I set for RestOption.PORT?
>
> Thanks.
>
>
> -- 原始邮件 --
> *发件人:* "Arvid Heise" ;
> *发送时间:* 2020年10月9日(星期五) 下午3:00
> *收件人:* "大森林";
> *抄送:* "David,Anderson";"user"<
> user@flink.apache.org>;
> *主题:* Re: how to simulate the scene "back pressure" in flink?Thanks~!
>
> The easiest way to see backpressure is to add some sleep to your sink,
> check [1] for an example.
> If you execute that unit test with a RestOption.PORT set in the
> configuration, you can even load the Web UI and watch the backpressure
> accumulate and finally go away at the end of the test.
>
> [1]
> https://github.com/apache/flink/blob/61a997364b020b44bd26df76208e76106c6390b5/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java#L536
>
> On Tue, Oct 6, 2020 at 4:13 PM 大森林  wrote:
>
>>
>> I want to learn the concept "back pressure".
>> but I can not find the datastream generator example to generate a lot of
>> data.
>>
>> besides,
>> is there any example on how to simulate the scene "back pressure"  in WEB
>> UI?
>>
>> Thanks for your help~
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: How can I increase the parallelism on the Table API for Streaming Aggregation?

2020-10-09 Thread Felipe Gutierrez
thanks! I will test
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Thu, Oct 8, 2020 at 6:19 PM Khachatryan Roman
 wrote:
>
> Hi Felipe,
>
> Your source is not parallel so it doesn't make sense to make local group 
> operator parallel.
> If the source implemented ParallelSourceFunction, subsequent operators would 
> be parallelized too.
>
> Regards,
> Roman
>
>
> On Thu, Oct 8, 2020 at 5:00 PM Felipe Gutierrez 
>  wrote:
>>
>> Hi community,
>>
>> I was implementing the stream aggregation using Table API [1] and
>> trying out the local aggregation plan to optimize the query. Basically
>> I had to configure it like this:
>>
>> Configuration configuration = tableEnv.getConfig().getConfiguration();
>> // set low-level key-value options
>> configuration.setInteger("table.exec.resource.default-parallelism", 4);
>> // local-global aggregation depends on mini-batch is enabled
>> configuration.setString("table.exec.mini-batch.enabled", "true");
>> configuration.setString("table.exec.mini-batch.allow-latency", "1 s");
>> configuration.setString("table.exec.mini-batch.size", "1000");
>> // enable two-phase, i.e. local-global aggregation
>> configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
>>
>> and when I saw the query plan on the dashboard I realized that the
>> LocalGroupAggregate is with parallelism 1 while the
>> GlobalGroupAggregate is with parallelism 4. Why was the
>> LocalGroupAggregate also with parallelism 4 since I set it on the
>> property ("table.exec.resource.default-parallelism"? Here is my code
>> [2].
>>
>> Thanks,
>> Felipe
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html
>> [2] 
>> https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/table/TaxiRideCountTable.java
>>
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>> -- https://felipeogutierrez.blogspot.com


Re: flink1.11流式写入hive速度慢的问题

2020-10-09 Thread Jingsong Li
Hi,
是Hive表吧?
https://issues.apache.org/jira/browse/FLINK-19121 在1.11.3中才fix,这个是影响性能的

可以下载最新的1.11分支的Hive依赖来试下:
https://repository.apache.org/snapshots/org/apache/flink/
(比如你用hive-1.2.2依赖,你可以下载
https://repository.apache.org/content/groups/snapshots/org/apache/flink/flink-sql-connector-hive-1.2.2_2.11/1.11-SNAPSHOT/flink-sql-connector-hive-1.2.2_2.11-1.11-20201008.202441-190.jar
)

Best,
Jingsong

On Fri, Oct 9, 2020 at 3:50 PM me  wrote:

> dataStream读取速度在5000条每秒,没有其他的处理逻辑,仅仅是将datastream 转换为table
>
>
>  原始邮件
> 发件人: me
> 收件人: user-zh
> 发送时间: 2020年10月9日(周五) 15:34
> 主题: flink1.11流式写入hive速度慢的问题
>
>
> flink1.11 将datastream转为table之后,再流式的写入hive表中,发现写入速度只有几十条每秒
> val chaitin_test = tableEnv.fromDataStream(dataStream,'test)
> chaitin_test.printSchema()
> tableEnv.executeSql("insert into chaitin_test select test from " +
> chaitin_test)



-- 
Best, Jingsong Lee


转发:flink1.11流式写入hive速度慢的问题

2020-10-09 Thread me
dataStream读取速度在5000条每秒,没有其他的处理逻辑,仅仅是将datastream 转换为table


 原始邮件 
发件人: me
收件人: user-zh
发送时间: 2020年10月9日(周五) 15:34
主题: flink1.11流式写入hive速度慢的问题


flink1.11 将datastream转为table之后,再流式的写入hive表中,发现写入速度只有几十条每秒
val chaitin_test = tableEnv.fromDataStream(dataStream,'test)
chaitin_test.printSchema()
tableEnv.executeSql("insert into chaitin_test select test from " + chaitin_test)

【公告】Flink Forward 2020 亚洲峰会议题提交时间延长

2020-10-09 Thread Kurt Young
大家好,

希望大家都过了一个美好充实的国庆。由于长假的影响,我们也决定将 Flink Forward 2020 亚洲峰会的议题提交截止时间延长到
*2020年10月22日*,提交链接:https://sourl.cn/ZEXM2Y

期待您的投递和参会!如果您有任何问题欢迎与我联系。

谢谢,
Kurt


?????? how to simulate the scene "back pressure" in flink?Thanks~!

2020-10-09 Thread ??????
Thanks for your repies,
could you tell me where to set RestOption.POPT?in configuration
what's the value should I set for RestOption.PORT?


Thanks.




----
??: 
   "Arvid Heise"

https://github.com/apache/flink/blob/61a997364b020b44bd26df76208e76106c6390b5/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java#L536


On Tue, Oct 6, 2020 at 4:13 PM ?? 

Re: how to simulate the scene "back pressure" in flink?Thanks~!

2020-10-09 Thread Arvid Heise
The easiest way to see backpressure is to add some sleep to your sink,
check [1] for an example.
If you execute that unit test with a RestOption.PORT set in the
configuration, you can even load the Web UI and watch the backpressure
accumulate and finally go away at the end of the test.

[1]
https://github.com/apache/flink/blob/61a997364b020b44bd26df76208e76106c6390b5/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java#L536

On Tue, Oct 6, 2020 at 4:13 PM 大森林  wrote:

>
> I want to learn the concept "back pressure".
> but I can not find the datastream generator example to generate a lot of
> data.
>
> besides,
> is there any example on how to simulate the scene "back pressure"  in WEB
> UI?
>
> Thanks for your help~
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


flink1.11流式写入hive速度慢的问题

2020-10-09 Thread me
flink1.11 将datastream转为table之后,再流式的写入hive表中,发现写入速度只有几十条每秒
val chaitin_test = tableEnv.fromDataStream(dataStream,'test)
chaitin_test.printSchema()
tableEnv.executeSql("insert into chaitin_test select test from " + chaitin_test)

flink session job retention time

2020-10-09 Thread Richard Moorhead
Is there a configuration that controls how long jobs are retained in a
flink session?


Re: checkpoint失败导致watermark不更新问题

2020-10-09 Thread restart
感谢回复,关于反压问题,是有关注过的,一般checkpoint失败次数增多时,通过flink-web看到的反压确实都是high,因为有过这样的想法,就是那怕下游反压大,最终sink
es的通道是通的,理论上数据还是会流向下游的,只是快慢问题,watermark迟早会更新的,所以一直把注意力放在checkpoint这块上,我按你的建议,先解决下反压问题,在看下情况如何。
再次感谢。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: NoResourceAvailableException

2020-10-09 Thread Khachatryan Roman
I assume that before submitting a job you started a cluster with default
settings with ./bin/start-cluster.sh.

Did you submit any other jobs?
Can you share the logs from log folder?

Regards,
Roman


On Wed, Oct 7, 2020 at 11:03 PM Alexander Semeshchenko 
wrote:

>
> 
>
> Installing (download & tar zxf) Apache Flink 1.11.1 and running: ./bin/flink
> run examples/streaming/WordCount.jar it show on the nice message after
> more less 5 min. the trying of submitting:  Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate the required slot within slot request timeout. Please
> make sure that the cluster has enough resources. at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
> ... 45 more Caused by: java.util.concurrent.CompletionException:
> java.util.concurrent.TimeoutException at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>
> It's Flink default configuration.
>
> Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Byte Order: Little
> Endian CPU(s): 8 On-line CPU(s) list: 0-7 Thread(s) per core: 1 Core(s) per
> socket: 1
> free -g total used free shared buff/cache available
>
> Mem: 62 1 23 3 37 57 Swap: 7 0 7
>
> are there some advices about what is happened?
>


Flink Sql client on yarn 问题

2020-10-09 Thread air23
你好。我在sql client 配置的yarn  yarn日志报错如下
2020-10-09 14:17:37,721 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - 

2020-10-09 14:17:37,726 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Starting 
YarnJobClusterEntrypoint (Version: 1.11.1, Scala: 2.11, Rev:7eb514a, 
Date:2020-07-15T07:02:09+02:00)
2020-10-09 14:17:37,726 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  OS current 
user: yarn
2020-10-09 14:17:38,101 FATAL org.apache.hadoop.conf.Configuration  
   [] - error parsing conf core-site.xml
java.io.FileNotFoundException: /etc/hadoop/conf.cloudera.yarn/core-site.xml 
(Permission denied)
at java.io.FileInputStream.open0(Native Method) ~[?:1.8.0_231]
at java.io.FileInputStream.open(FileInputStream.java:195) ~[?:1.8.0_231]
at java.io.FileInputStream.(FileInputStream.java:138) 
~[?:1.8.0_231]
at java.io.FileInputStream.(FileInputStream.java:93) 
~[?:1.8.0_231]
at 
sun.net.www.protocol.file.FileURLConnection.connect(FileURLConnection.java:90) 
~[?:1.8.0_231]
at 
sun.net.www.protocol.file.FileURLConnection.getInputStream(FileURLConnection.java:188)
 ~[?:1.8.0_231]
at org.apache.hadoop.conf.Configuration.parse(Configuration.java:2629) 
~[hadoop-common-2.6.0-cdh5.16.2.jar:?]
at 
org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2703) 
[hadoop-common-2.6.0-cdh5.16.2.jar:?]
at 
org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2653) 
[hadoop-common-2.6.0-cdh5.16.2.jar:?]
at 
org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2559) 
[hadoop-common-2.6.0-cdh5.16.2.jar:?]
at org.apache.hadoop.conf.Configuration.get(Configuration.java:1078) 
[hadoop-common-2.6.0-cdh5.16.2.jar:?]
at 
org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1132) 
[hadoop-common-2.6.0-cdh5.16.2.jar:?]
at 
org.apache.hadoop.conf.Configuration.getBoolean(Configuration.java:1540) 
[hadoop-common-2.6.0-cdh5.16.2.jar:?]
at 
org.apache.hadoop.security.SecurityUtil.setConfigurationInternal(SecurityUtil.java:85)
 [hadoop-common-2.6.0-cdh5.16.2.jar:?]
at 
org.apache.hadoop.security.SecurityUtil.(SecurityUtil.java:74) 
[hadoop-common-2.6.0-cdh5.16.2.jar:?]
at 
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:320)
 [hadoop-common-2.6.0-cdh5.16.2.jar:?]
at 
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:308)
 [hadoop-common-2.6.0-cdh5.16.2.jar:?]
at 
org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:895)
 [hadoop-common-2.6.0-cdh5.16.2.jar:?]
at 
org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:861)
 [hadoop-common-2.6.0-cdh5.16.2.jar:?]
at 
org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:728)
 [hadoop-common-2.6.0-cdh5.16.2.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_231]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_231]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_231]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_231]
at 
org.apache.flink.runtime.util.EnvironmentInformation.getHadoopUser(EnvironmentInformation.java:201)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.util.EnvironmentInformation.logEnvironmentInfo(EnvironmentInformation.java:399)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:67)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
2020-10-09 14:17:38,113 WARN  
org.apache.flink.runtime.util.EnvironmentInformation [] - Error while 
accessing user/group information via Hadoop utils.
java.lang.reflect.InvocationTargetException: null
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_231]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_231]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_231]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_231]
at 
org.apache.flink.runtime.util.EnvironmentInformation.getHadoopUser(EnvironmentInformation.java:201)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.util.EnvironmentInformation.logEnvironmentInfo(EnvironmentInformation.java:399)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:67)
 

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-09 Thread Yun Gao
Hi, devs & users

Very sorry for the spoiled formats, I resent the discussion as follows.

As discussed in FLIP-131[1], Flink will make DataStream the unified API for 
processing bounded and unbounded data in both streaming and blocking modes. 
However, one long-standing problem for the streaming mode is that currently 
Flink does not s​upport checkpoints after some tasks finished, which causes 
some problems for bounded or mixed jobs:
1. Flink exactly-once sinks rely on checkpoints to ensure data won’t be 
replayed before committed to external systems in streaming mode. If sources are 
bounded and checkpoints are disabled after some tasks are finished, the data 
sent after the last checkpoint would always not be able to be committed. This 
issue has already been reported some times in the user ML[2][3][4] and is 
future brought up when working on FLIP-143: Unified Sink API [5]. 
2. The jobs with both bounded and unbounded sources might have to 
replay a large amount of records after failover due to no periodic checkpoints 
are taken after the bounded sources finished.

Therefore, we propose to also support checkpoints after some tasks finished. 
Your Could find more details in FLIP-147[6]. 

Best,
Yun

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2] 
https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
[3] 
https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
[4] 
https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
[5] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
[6] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
 --Original Mail --
Sender:Yun Gao 
Send Date:Fri Oct 9 14:16:52 2020
Recipients:Flink Dev , User-Flink 
Subject:[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi, devs & users

As discussed in FLIP-131 [1], Flink will make DataStream the unified API for 
processing bounded and unbounded data in both streaming and blocking modes. 
However, one long-standing problem for the streaming mode is that currently 
Flink does not support checkpoints after some tasks finished, which causes some 
problems for bounded or mixed jobs:
Flink exactly-once sinks rely on checkpoints to ensure data won’t be replayed 
before committed to external systems in streaming mode. If sources are bounded 
and checkpoints are disabled after some tasks are finished, the data sent after 
the last checkpoint would always not be able to be committed. This issue has 
already been reported some times in the user ML[2][3][4] and is future brought 
up when working on FLIP-143: Unified Sink API [5]. 
The jobs with both bounded and unbounded sources might have to replay a large 
amount of records after failover due to no periodic checkpoints are taken after 
the bounded sources finished.
Therefore, we propose to also support checkpoints after some tasks finished. 
Your Could find more details in FLIP-147[6]. 
Best,
Yun

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2] 
https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
[3] 
https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
[4] 
https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
[5] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
[6] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished

FlinkJobNotFoundException????

2020-10-09 Thread ??????
mysql??MySQLlinux??
py4j.protocol.Py4JJavaError: An error occurred while calling o6.execute.
: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.rest.handler.RestHandlerException: 
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
Flink job (4d6b1273229e0e16fa433c652b5cb74d)





??


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
batch_source_ddl = """
CREATE TABLE mh_source_tab (
lid INT,
dir INT,
posid BIGINT,
km BIGINT,
poleId BIGINT,
ts BIGINT,
rt Decimal(6,2),
time1 VARCHAR
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://**', 
'table-name' = 'nj_mh_test', --??
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'root',
--'sink.buffer-flush.max-rows' = '100',
'sink.buffer-flush.interval' = '1s'


)


warn_alarm_mh_ddl = """
CREATE TABLE warn_alarm_mh_sink (
lid INT,
dir INT,
posid BIGINT,
km BIGINT,
poleId BIGINT,
extremum Decimal(6,2),
PRIMARY KEY (lid,dir,posid,poleId) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://**',
'table-name' = 'warn_mh_alarm_result', --??
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'root',
--'sink.buffer-flush.max-rows' = '100',
'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
t_env.execute_sql(mh_source_tab)
t_env.execute_sql(warn_alarm_mh_ddl)


def threshold_alarm(delta_thres):
  source = t_env.from_path("mh_source_tab") \
.where("rt < -1")\
.group_by("lid, dir, posid, km, poleId")\
.select("lid, dir, posid, km, poleId, max(rt) as 
max_rt, min(rt) as min_rt, max(rt)-min(rt) as extremum")\
.where("extremum "+str(delta_thres))\
.select("lid, dir, posid, km, poleId, extremum")
  source.execute_insert("warn_alarm_mh_sink") \
 .get_job_client() \
 .get_job_execution_result() \
 .result()
if __name__ == '__main__':
  threshold_alarm(delta_thres=0.5)

[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-09 Thread Yun Gao
Hi, devs & users

As discussed in FLIP-131 [1], Flink will make DataStream the unified API for 
processing bounded and unbounded data in both streaming and blocking modes. 
However, one long-standing problem for the streaming mode is that currently 
Flink does not support checkpoints after some tasks finished, which causes some 
problems for bounded or mixed jobs:
Flink exactly-once sinks rely on checkpoints to ensure data won’t be replayed 
before committed to external systems in streaming mode. If sources are bounded 
and checkpoints are disabled after some tasks are finished, the data sent after 
the last checkpoint would always not be able to be committed. This issue has 
already been reported some times in the user ML[2][3][4] and is future brought 
up when working on FLIP-143: Unified Sink API [5]. 
The jobs with both bounded and unbounded sources might have to replay a large 
amount of records after failover due to no periodic checkpoints are taken after 
the bounded sources finished.
Therefore, we propose to also support checkpoints after some tasks finished. 
Your Could find more details in FLIP-147[6]. 
Best,
Yun

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2] 
https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
[3] 
https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
[4] 
https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
[5] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
[6] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished

Re: Network issue leading to "No pooled slot available"

2020-10-09 Thread Khachatryan Roman
Thanks for checking this workaround!

I've created a jira issue [1] to check if AWS SDK version can be upgraded
in Flink distribution.

Regards,
Roman


On Fri, Oct 9, 2020 at 12:54 AM Dan Diephouse  wrote:

> Well, I just dropped in the latest Amazon 1.11.878 SDK and now it
> appears to respect interrupts in a test case I created. (the test fails
> with the SDK that is in use by Flink)
>
> I will try it in a full fledged Flink environment and report back.
>
> On Thu, Oct 8, 2020 at 3:41 PM Dan Diephouse  wrote:
>
>> Did some digging... definitely appears that the Amazon SDK definitely is
>> not picking up the interrupt.  I will try playing with the connection
>> timeout. Hadoop defaults it to 20 ms, which may be part of the problem.
>> Anyone have any other ideas?
>>
>> In theory this should be fixed by SDK v2 which uses NIO, but I don't
>> think I'm up for all the changes that would involve in the downstream
>> components.
>>
>> On Thu, Oct 8, 2020 at 8:36 AM Dan Diephouse  wrote:
>>
>>> Using the latest - 1.11.2.
>>>
>>> I would assume the interruption is being ignored in the Hadoop / S3
>>> layer. I was looking at the defaults and (if I understood correctly) the
>>> client will retry 20 times. Which would explain why it never gets
>>> cancelled...
>>>
>>> On Thu, Oct 8, 2020 at 1:27 AM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
 Hi Dan Diephouse,

 From the logs you provided indeed it looks like 1 causes 2 => 3 => 4,
 where 2 is a bug.
 It's unclear though where the interruption is ignored (Flink/Hadoop
 FS/S3 client).

 What version of Flink are you using?

 Regards,
 Roman


 On Wed, Oct 7, 2020 at 11:16 PM Dan Diephouse  wrote:

> I am now using the S3 StreamingFileSink to send data to an S3 bucket.
> If/when the network connection has issues, it seems to put Flink into an
> irrecoverable state. Am I understanding this correctly? Any suggestions on
> how to troubleshoot / fix?
>
> Here is what I'm observing:
>
> *1. Network is dropped *
>
> *2. S3 connections do not exit gracefully*
>
> 2020-10-07 20:58:07.468  WARN 1 --- [aa565930b86fb).]
> o.apache.flink.runtime.taskmanager.Task  : Task 'Sink: Unnamed (1/12)' did
> not react to cancelling signal for 30 seconds, but is stuck in method:
>  java.base@14.0.2/sun.nio.ch.Net.poll(Native Method)
> java.base@14.0.2/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:181)
> java.base@14.0.2
> /sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:285)
> java.base@14.0.2
> /sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
> java.base@14.0.2/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
> java.base@14.0.2
> /sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
> java.base@14.0.2
> /java.net.Socket$SocketInputStream.read(Socket.java:982)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:469)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:463)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
> java.base@14.0.2
> /sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1475)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1381)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:441)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:412)
>
> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:436)
>
> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:384)
>
> app//com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
>
> app//org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
>
> app//org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
> jdk.internal.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
> java.base@14.0.2
> /jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.base@14.0.2/java.lang.reflect.Method.invoke(Method.java:564)
>
> app//com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
> app//com.amazonaws.http.conn.$Proxy89.connect(Unknown Source)
>
> app//org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
>
> 

Any testing issues when using StreamTableEnvironment.createTemporaryView?

2020-10-09 Thread Dan Hill
*Summary*
I'm hitting an error when running a test that is related to using
createTemporaryView to convert a Protobuf input stream to Flink Table API.
I'm not sure how to debug "SourceConversion$5.processElement(Unknown
Source)" line.  Is this generated code?  How can I debug this?

Any help would be appreciated.  Thanks! - Dan

*Details*
My current input is a protocol buffer stream.  I convert it to the Table
API spec using createTemporaryView.  The code is hacky.  I want to get some
tests implemented before cleaning it up.

KeyedStream batchLogStream =
env.fromElements(BatchLog.class, new
LogGenerator.BatchLogIterator().next())
.keyBy((logRequest) -> logRequest.getUserId());

tableEnv.createTemporaryView(
"input_user",
batchLogStream.flatMap(new ToUsers()),
$("userId"),
$("timeEpochMillis"),
$("userTime").rowtime());

This appears to work in my prototype (maybe serialization is broken).  In a
Flink test, I hit the following error.

org.apache.flink.runtime.taskmanager.Task: Flat Map -> Map ->
SourceConversion(table=[default.mydb.input_user], fields=[userId,
timeEpochMillis, userTime]) -> Calc(select=[userId, timeEpochMillis]) ->
StreamingFileWriter (2/7) (ae67114dd4175c6fd87063f73706c8ec) switched from
RUNNING to FAILED. java.lang.NullPointerException
at SourceConversion$5.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at
ai.promoted.metrics.logprocessor.common.functions.ToUsers.flatMap(ToUsers.java:18)
at
ai.promoted.metrics.logprocessor.common.functions.ToUsers.flatMap(ToUsers.java:11)
at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:834)


I wasn't able to find this exact stacktrace when looking on Google.


Re: checkpoint失败导致watermark不更新问题

2020-10-09 Thread shizk233
hi,这种情况似乎像是反压造成的,数据流反压会导致算子来不及处理checkpoint事件,watermark消息也会因为反压无法发送到下游算子。

建议观察下反压的情况[1],如果是这样的话,再针对反压源头进行优化处理。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/back_pressure.html


restart  于2020年10月9日周五 上午11:48写道:

> 大家好,请假一个问题:
>场景是这样的,flink消费kafka,清洗后按分、时、天的维度(其中小时的聚合数据源来自分钟的聚合输出,天类似)进行聚合后sink
>
> es,但是经常会在job跑几个小时后,出现大量的checkpoint失败,同时watermark不更新的现象,中间多次调整checkpoint的相关参数,也参照了网上的相关分析(
> http://pangongsen.com/2018/04/25/Flink%E6%B0%B4%E4%BD%8D%E7%BA%BF%E4%B8%8D%E8%A7%A6%E5%8F%91%E7%9A%84%E5%87%A0%E7%82%B9%E6%80%BB%E7%BB%93/
> ),
> 仍然没找到问题点,flink接触时间不长,有点束手无策了。
> task manager log:
> 2020-09-30 15:03:19,708 INFO  org.apache.hadoop.hdfs.DFSClient
>
> - Could not complete
>
> /flink/state/error/5d895e1de420b44791b0850c23004b0e/chk-41/5991fd64-9d2c-4ac2-b08f-859efba879ee
> retrying...
> 2020-09-30 15:13:25,236 WARN  org.apache.hadoop.hdfs.DataStreamer
>
> - Slow waitForAckedSeqno took 38022ms (threshold=3ms). File being
> written:
>
> /flink/state/infrastructure_dep/error/5d895e1de420b44791b0850c23004b0e/shared/57f53e0d-3d58-4a5a-893d-50667065d975,
> block: BP-1864147273-172.20.3.102-1555051764064:blk_2307424648_1233776433,
> Write pipeline datanodes:
> [DatanodeInfoWithStorage[172.20.1.61:1004
> ,DS-e84bae2e-224a-4826-8f65-3a4bd3bd481a,DISK],
> DatanodeInfoWithStorage[172.20.1.27:1004
> ,DS-c69e89a7-88cd-45f0-b261-57fe9fb1c7d8,DISK],
> DatanodeInfoWithStorage[172.20.1.24:1004
> ,DS-cd0a08e6-ca5c-4d56-8c9d-fdd331d0b270,DISK]].
> checkpoint相关代码:
> RocksDBStateBackend rocksDBStateBackend = new
> RocksDBStateBackend(stateBackendConfig.getCheckpointDataUri(), true);
>
>
> rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStateBackend((StateBackend)rocksDBStateBackend);
> env.enableCheckpointing(5*6, CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(90);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6);
> env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> watermark使用的是BoundedOutOfOrdernessTimestampExtractor。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>