flink on yarn: log4j configuration

2021-07-22 Thread comsir
hi all
How should log4j be configured for Flink running on YARN?

Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-22 Thread Gen Luo
Thanks for sharing the thoughts Chesnay, and I overall agree with you. We
can't give a default value suitable for all jobs, but we can figure out
whether the current default value is too large for most of the jobs, and
that is the guideline for this topic. Configurability is reserved for the
others.

But maybe we should list the benefits of the change again. The motivation
presented in the FLIP is mostly about why we can, but may not be enough
about why we should, considering that FLINK-23216 will be fixed.

By the way, I'd like to confirm whether the behavior of jobs using the default
values will change when they upgrade their Flink version and resume
from savepoints. If so, we have to give a big warning to users if we
finally decide to change this, since the change is silent but can be
critical in some cases.

@Till
> I am, however, not so sure whether we can give hard threshold
like 5000 tasks
A hard threshold can't apply to all cases indeed. I meant to suggest a
configuration validation phase, where we can check whether the configuration is
potentially unsuitable for the job, or whether some options cannot be used
together with other options. Users would be warned, and if anything goes
wrong, this would be one of the debugging guides. This is a little like
Chesnay's suggestion, but at compile time. Large-scale jobs with too short
heartbeat intervals are one of the cases, but I agree it's hard to decide
when we should show the warning.

On Thu, Jul 22, 2021 at 11:09 PM Chesnay Schepler 
wrote:

> I'm wondering if this discussion isn't going in the wrong direction.
> It is clear that we cannot support all use-cases with the defaults, so
> let's not try that. We won't find them.
> And I would argue that is also not their purpose; they are configurable
> for a reason.
> I would say the defaults should provide a good experience to users
> starting out with Flink.
>
> Because, users with heavy workloads don't deploy Flink over night. They
> approach a production deployment step-by-step, for the very purpose of
> working out kinks in the configuration/stability.
> If in this process the default heartbeat configuration ends up being too
> harsh, *then that is not a problem*.
> *If *there is sufficient information presented to the user on how to
> transition to a working setup, that is.
> So far the purpose of that was the production checklist to some extent,
> but maybe we should also add a separate page for working under bigger loads.
>
> A far greater issue in my opinion is that users don't get warnings that
> something is about to go wrong.
> The heartbeat system right now is fairly binary. It works fine while the
> configuration is suitable, until it no longer is and everything goes up in
> flames.
>
> If we were to log warnings if Flink was close to hitting the heartbeat
> timeout, or even expose metrics for heartbeat round-trip times or similar,
> I think we could alleviate many concerns that people have.
>
> Or we just provide 2 configs with the distribution, one for starting out,
> one for more serious workloads. ¯\_(ツ)_/¯
> On 22/07/2021 13:22, 刘建刚 wrote:
>
> Thanks, Till. There are many reasons to reduce the heartbeat interval and
> timeout, but I am not sure what values are suitable. In our cases, the GC
> time and large job size can be related factors. Since most Flink jobs are pipelined
> and a full failover can take some time, we should tolerate some stop-the-world
> situations. Also, I think FLINK-23216 should be solved so that lost containers
> are detected quickly and reacted to. For my part, I suggest
> reducing the values gradually.
>
Till Rohrmann  wrote on Thu, Jul 22, 2021 at 5:33 PM:
>
>> Thanks for your inputs Gen and Arnaud.
>>
>> I do agree with you, Gen, that we need better guidance for our users on
>> when to change the heartbeat configuration. I think this should happen in
>> any case. I am, however, not so sure whether we can give hard threshold
>> like 5000 tasks, for example, because as Arnaud said it strongly depends
>> on
>> the workload. Maybe we can explain it based on symptoms a user might
>> experience and what to do then.
>>
>> Concerning your workloads, Arnaud, I'd be interested to learn a bit more.
>> The user code runs in its own thread. This means that its operation won't
>> block the main thread/heartbeat. The only thing that can happen is that
>> the
>> user code starves the heartbeat in terms of CPU cycles or causes a lot of
>> GC pauses. If you are observing the former problem, then we might think
>> about changing the priorities of the respective threads. This should then
>> improve Flink's stability for these workloads and a shorter heartbeat
>> timeout should be possible.
>>
>> Also for the RAM-cached repositories, what exactly is causing the
>> heartbeat
>> to time out? Is it because you have a lot of GC or that the heartbeat
>> thread does not get enough CPU cycles?
>>
>> Cheers,
>> Till
>>
>> On Thu, Jul 22, 2021 at 9:16 AM LINZ, Arnaud 
>> wrote:
>>
>> > Hello,
>> >
>> >
>> >
>> > From a user 

Re: flink sql dependency isolation

2021-07-22 Thread silence

We are still mainly on YARN here. The current pain point is that a single SQL job uses several UDFs, and the UDFs are loaded via -C http://xxx.jar.
There can be dependency conflicts between a UDF and the SQL jar, and between the UDFs themselves.
The preliminary idea is to bind each UDF to its own jar path and load it with a separate classloader, avoiding dependency conflicts with the main jar and with the other UDFs.
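As a rough illustration of that idea (this is not an existing Flink feature; the class below and its method are made up for the sketch), each UDF jar would be opened with its own classloader before the function is instantiated:

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

public final class IsolatedUdfLoader {

    /**
     * Loads a UDF class from its own jar with a dedicated classloader, so that
     * dependencies bundled only in that jar are resolved per UDF instead of all
     * landing on one shared classpath. (URLClassLoader delegates parent-first;
     * for conflicts with classes that also exist on the parent classpath, a
     * child-first loader such as Flink's ChildFirstClassLoader would be needed.)
     */
    public static Object loadUdf(String jarPath, String udfClassName) throws Exception {
        URLClassLoader udfClassLoader = new URLClassLoader(
                new URL[] {new File(jarPath).toURI().toURL()},
                Thread.currentThread().getContextClassLoader());
        Class<?> udfClass = Class.forName(udfClassName, true, udfClassLoader);
        return udfClass.getDeclaredConstructor().newInstance();
    }
}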
--
From: Michael Ran 
Sent: Thursday, July 22, 2021 20:07
To: user-zh ; silence 
Subject: Re: flink sql dependency isolation

Just isolate the dependencies per job. You folks at Meituan are already on k8s, right?
On 2021-07-05 14:06:53, "silence" wrote:
>Is there currently any way to achieve dependency isolation in Flink SQL,
>for example for connectors, formats, and (most importantly) UDFs?
>In many cases components defined by different users get mixed together, causing many dependency conflicts that are hard to resolve.
>Is there a way to isolate UDF dependencies (e.g. each UDF using its own jar and classloader), or does the community have any plans for this?



Re: Flink temporal table: error when left-joining two HBase tables

2021-07-22 Thread Michael Ran
java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: 
org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils: a jar is missing.
On 2021-07-14 09:39:53, "xie_guo...@163.com" wrote:
>Hello, I ran into a problem with a temporal-table left join in Flink SQL.
>The scenario:
>
> Two HBase tables are joined as a temporal table. The left table defines a processing-time attribute, the right table is unchanged (rowkey, cf). Flink version is 1.13.0. The error is below. Has anyone hit a similar problem, and how should it be handled?
>
>2021-07-14 09:22:20.592 WARN  org.apache.flink.runtime.taskmanager.Task  --- 
>2021-07-14 09:22:20.596 WARN  org.apache.flink.runtime.taskmanager.Task  --- 
>LookupJoin(table=[default_catalog.default_database.hbase_source_pollution_dwb_enterprise_wodhz],
> joinType=[LeftOuterJoin], async=[true], lookup=[code=$f4], select=[code, 
>data1, data2, p, $f4, code0, data]) -> Calc(select=[code, 
>ROW(,,data.activ) -> NotNullEnforcer(fields=[EXPR$1, EXPR$2, p, EXPR$4]) 
>(3/3)#3 (4ada188e117c67ccd9bd6488ae95216a) switched from RUNNING to FAILED 
>with failure cause: java.util.concurrent.ExecutionException: 
>java.lang.NoClassDefFoundError: 
>org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
>at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928
>at 
>org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:168)
>at 
>org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:131)
>at 
>org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:448)
>at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:671)
>at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:629)
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>at java.lang.Thread.run(Thread.java:748)
>Caused by: java.lang.NoClassDefFoundError: 
>org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
>at 
>org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncConnectionImpl.close(AsyncConnectionImpl.java:193)
>at 
>org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.close(HBaseRowDataAsyncLookupFunction.java:251)
>at LookupFunction$3.close(Unknown Source
>
>PS: with the same code, switching the left table to Kafka works fine. Searching online for the error suggests a missing package; the pom currently includes hbase-client, hbase-common, and flink-sql-connector-hbase.
>
>
>
>Sincerely,
>xie_guo...@163.com


Re: Flink: triggering a savepoint fails with a timeout

2021-07-22 Thread Michael Ran
On 2021-07-13 17:31:19, <1510603...@qq.com.INVALID> wrote:
>Hi All,
>
>
>  We have a Flink job whose checkpoints take about 2 minutes. Triggering a
>savepoint for it fails after roughly 2 minutes with the exception below. How
>should this be handled?
>
>
>
>The program finished with the following exception:
>
>
>org.apache.flink.util.FlinkException: Triggering a savepoint for the job 
> failed.
>   at 
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:777)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:754)
>   at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
>   at 
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:751)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1072)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
>   at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>Caused by: java.util.concurrent.TimeoutException
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
>   at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)


Re: Flink TaskManager container got restarted by K8S very frequently

2021-07-22 Thread Yang Wang
David's suggestion makes a lot of sense. You need to check whether the
TaskManager is killed by Kubernetes via `kubectl describe pod` for exit
code or the kubelet logs.

If it is not killed by Kubernetes, then it might have crashed internally. Please
use `kubectl logs  --previous` to check the logs.
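For example (namespace and pod name are placeholders):

kubectl -n <namespace> describe pod <taskmanager-pod>     # last state / exit reason, e.g. OOMKilled
kubectl -n <namespace> logs <taskmanager-pod> --previous  # logs of the previous, crashed container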

Best,
Yang

David Morávek  wrote on Thu, Jul 22, 2021 at 8:04 PM:

> If you run `kubectl describe pod ...` on the affected pod, you should see
> a reason why the previous pod has terminated (eg. OOM killed by Kubernetes).
>
> Best,
> D.
>
> On Thu, Jul 22, 2021 at 9:30 AM Fabian Paul 
> wrote:
>
>> CC user ML
>>
>>


Re: Advice on collecting logs and metrics for monitoring on-YARN per-job deployments

2021-07-22 Thread Michael Ran
A simple start is to monitor what you can read directly from the API: job status, restart count, message latency, and so on. Other metrics are more troublesome, especially once there are many tasks and operators, because you also have to aggregate them.
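If you go with the prometheus+grafana route mentioned in the quoted mail below, a minimal reporter setup in flink-conf.yaml looks roughly like this (option names as in the Flink 1.13 metrics documentation; the port range is just an example, and the Prometheus reporter jar has to be available to Flink):

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260

Prometheus then scrapes each JobManager/TaskManager on the chosen port, and alerting is configured in Prometheus/Grafana on top of those metrics.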
On 2021-07-21 11:32:31, "yihan xu" wrote:
>Our jobs have basically been running half-blind. After a recent small production incident, we are looking at how to collect job logs or metrics in real time and set up alerting on top of them.
>A first search online suggests prometheus+grafana or ELK.
>
>What approaches are your projects using? At our small company I am the only person working on Flink, self-taught and with limited experience, so could you recommend an approach that is easy to maintain and has few pitfalls? Thanks.
>
>Sent from my iPhone


Re: Recover from savepoints with Kubernetes HA

2021-07-22 Thread Yang Wang
Please note that when the job is canceled, the HA data(including the
checkpoint pointers) stored in the ConfigMap/ZNode will be deleted.

But it is strange that the "-s/--fromSavepoint" does not take effect when
redeploying the Flink application. The JobManager logs could help a lot to
find the root cause.

Best,
Yang

Austin Cawley-Edwards  wrote on Thu, Jul 22, 2021 at 11:09 PM:

> Hey Thomas,
>
> Hmm, I see no reason why you should not be able to update the checkpoint
> interval at runtime, and don't believe that information is stored in a
> savepoint. Can you share the JobManager logs of the job where this is
> ignored?
>
> Thanks,
> Austin
>
> On Wed, Jul 21, 2021 at 11:47 AM Thms Hmm  wrote:
>
>> Hey Austin,
>>
>> Thanks for your help.
>>
>> I tried to change the checkpoint interval as example. The value for it
>> comes from an additional config file and is read and set within main() of
>> the job.
>>
>> The job is running in Application mode. Basically the same configuration
>> as from the official Flink website but instead of running the JobManager as
>> job it is created as deployment.
>>
>> For the redeployment of the job the REST API is triggered to create a
>> savepoint and cancel the job. After completion the deployment is updated
>> and the pods are recreated. The -s  is always added as a
>> parameter to start the JobManager (standalone-job.sh). CLI is not involved.
>> We have automated these steps. But I tried the steps manually and have the
>> same results.
>>
>> I also tried to trigger a savepoint, scale the pods down, update the
>> start parameter with the recent savepoint and renamed
>> ‚kubernetes.cluster-id‘ as well as ‚high-availability.storageDir‘.
>>
>> When I trigger a savepoint with cancel, I also see that the HA config
>> maps are cleaned up.
>>
>>
>> Kr Thomas
>>
>> Austin Cawley-Edwards  wrote on Wed, Jul 21,
>> 2021 at 16:52:
>>
>>> Hi Thomas,
>>>
>>> I've got a few questions that will hopefully help get to find an answer:
>>>
>>> What job properties are you trying to change? Something like parallelism?
>>>
>>> What mode is your job running in? i.e., Session, Per-Job, or
>>> Application?
>>>
>>> Can you also describe how you're redeploying the job? Are you using the
>>> Native Kubernetes integration or Standalone (i.e. writing k8s  manifest
>>> files yourself)? It sounds like you are using the Flink CLI as well, is
>>> that correct?
>>>
>>> Thanks,
>>> Austin
>>>
>>> On Wed, Jul 21, 2021 at 4:05 AM Thms Hmm  wrote:
>>>
 Hey,

 we have some application clusters running on Kubernetes and explore the
 HA mode which is working as expected. When we try to upgrade a job, e.g.
 trigger a savepoint, cancel the job and redeploy, Flink is not restarting
 from the savepoint we provide using the -s parameter. So all state is lost.

 If we just trigger the savepoint without canceling the job and redeploy
 the HA mode picks up from the latest savepoint.

 But this way we can not upgrade job properties as they were picked up
 from the savepoint as it seems.

 Is there any advice on how to do upgrades with HA enabled?

 Flink version is 1.12.2.

 Thanks for your help.

 Kr thomas

>>>


Re: Custom Flink Kafka metrics fail to parse in InfluxDB

2021-07-22 Thread Caizhi Weng
Hi!

How do you obtain the sink table name? From the error message it looks like what you get is not a table name like "a" but the digest of the whole insert statement, so the metric measurement name becomes very complex and fails to parse.

Of course, you could also choose to escape the metric measurement name.
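As a rough sketch of that escaping idea (the sink class below is illustrative, not the questioner's actual code; only the metric registration matters), everything outside [A-Za-z0-9_] is replaced before the counter is registered:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CountingSink<T> extends RichSinkFunction<T> {

    private final String sinkTableName;
    private transient Counter receiveCount;

    public CountingSink(String sinkTableName) {
        this.sinkTableName = sinkTableName;
    }

    @Override
    public void open(Configuration parameters) {
        // Strip characters the InfluxDB line protocol may choke on (spaces, quotes,
        // commas, ...) before using the name as part of the metric identifier.
        String safeName = sinkTableName.replaceAll("[^A-Za-z0-9_]", "_");
        receiveCount = getRuntimeContext().getMetricGroup().counter(safeName + "_receive_count");
    }

    @Override
    public void invoke(T value, Context context) {
        receiveCount.inc();
        // ... actual write to the sink goes here ...
    }
}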

Jimmy Zhang wrote on Fri, Jul 23, 2021 at 10:11 AM:

> Hi all. Flink version 1.13.1.
> In FlinkKafkaProducer's invoke method I create a counter whose final metric
> name is sinkTableName + "_receive_count", with InfluxDB as the reporting backend. sinkTableName is taken from the catalog information and passed down through the constructors.
>
> I noticed a strange problem: with a simple SQL statement of the form insert into a select from b where
> b.c='d', the measurement is created in InfluxDB successfully;
> but if a UDF is added, for example
> insert into a select
> CreateJsonOuter('tablename',CaptTimeSlewTime(Record.CAPTURE_TIME,
> 'YEAR_,MONTH'),'data',CreateJsonInner(true)) from source where
> Record.datasource='xx', InfluxDB reports the parse error below. Has anyone seen this? It has been bothering me for a whole day, and any advice is much appreciated!
> org.influxdb.InfluxDBException$UnableToParseException: partial write:
> unable to parse
> 'taskmanager_job_task_operator_xx_ODS_INFO_receive_count,host=xx,job_name=
> insert \ into\ ODS_INFO\ select \ CreateJsonOuter(' :missing fields
> unable to parse ''tablename'\,
> 'ODS_INFO'\,CaptTimeSlewTime(Record.CAPTURE_TIME\,
> 'YEAR_,MONTH')\,'data'\,CreateJsonInner(true\,': missing fields)
>
> |
> Best,
> Jimmy
> |
>
> Signature is customized by Netease Mail Master


Re: Re: flink sql dependency isolation

2021-07-22 Thread Michael Ran
As far as I know, Alibaba uploads the jars to OSS, and when each job's image is pulled the jars are pulled in along with it. Fully isolated, and the jars are easy to manage.
On 2021-07-22 23:45:14, "Jeff Zhang" wrote:
>Zeppelin supports dynamic loading of dependencies
>https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
>https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2
>
>
>Michael Ran wrote on Thu, Jul 22, 2021 at 8:07 PM:
>
>> Just isolate the dependencies per job. You folks at Meituan are already on k8s, right?
>> On 2021-07-05 14:06:53, "silence" wrote:
>> >Is there currently any way to achieve dependency isolation in Flink SQL,
>> >for example for connectors, formats, and (most importantly) UDFs?
>> >In many cases components defined by different users get mixed together, causing many dependency conflicts that are hard to resolve.
>> >Is there a way to isolate UDF dependencies (e.g. each UDF using its own jar and classloader), or does the community have any plans for this?
>>
>
>
>-- 
>Best Regards
>
>Jeff Zhang


Re: flink 1.13.1: error when querying a column generated by row(a, b) again

2021-07-22 Thread Caizhi Weng
Hi!

For question one (how to set the field names of the row), you can use a cast, e.g.:
select cast(row(f_sequence, f_random) as row<c1 int, c2 int>) as c from
datagen

For question two, it does indeed look like a bug. You can open a ticket at https://issues.apache.org/jira/projects/FLINK/issues

Asahi Lee <978466...@qq.com.invalid> wrote on Thu, Jul 22, 2021 at 8:44 PM:

> hi!
> 1. With flink 1.13.1, querying a column generated by row(a,b) again raises an error. Is this a bug?
> 2. When generating a row-typed column with the row function, the names of the fields inside the row cannot be specified. Could such a name configuration be supported?
> My example program is as follows:
>
>
> package test;
>
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.catalog.ResolvedSchema;
>
>
> public class DataGenTest {
>
>
>   public static void main(String[] args) {
> StreamExecutionEnvironment
> streamExecutionEnvironment =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnvironment =
> StreamTableEnvironment.create(streamExecutionEnvironment);
>
>
> tableEnvironment.executeSql("CREATE TABLE
> datagen (\n" +
> " f_sequence
> INT,\n" +
> " f_random INT,\n"
> +
> " f_random_str
> STRING,\n" +
> " ts AS
> localtimestamp,\n" +
> " WATERMARK FOR ts
> AS ts\n" +
> ") WITH (\n" +
> " 'connector' =
> 'datagen',\n" +
> "
> 'rows-per-second'='5',\n" +
> "
> 'fields.f_sequence.kind'='sequence',\n" +
> "
> 'fields.f_sequence.start'='1',\n" +
> "
> 'fields.f_sequence.end'='1000',\n" +
> "
> 'fields.f_random.min'='1',\n" +
> "
> 'fields.f_random.max'='1000',\n" +
> "
> 'fields.f_random_str.length'='10'\n" +
> ")");
>
>
> Table table =
> tableEnvironment.sqlQuery("select row(f_sequence, f_random) as c from
> datagen");
> ResolvedSchema resolvedSchema =
> table.getResolvedSchema();
> System.out.println(resolvedSchema);
> /**
> * Prints the following:
> * (
> * `c` ROW<`EXPR$0` INT, `EXPR$1` INT> NOT NULL
> * )
> *
> Question 1: using the row function I put two columns into a row type;
> how can I configure the names of the fields inside the row, e.g. c1 and c2 as below:
> * (
> * `c` ROW<`c1` INT, `c2` INT> NOT NULL
> * )
> */
>
>
> Table table1 =
> tableEnvironment.sqlQuery("select * from " + table);
> /**
> * Question 2: the query SQL fails with:
> * Exception in thread "main"
> java.lang.AssertionError: Conversion to relational algebra failed to
> preserve datatypes:
> * validated type:
> *
> RecordType(RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER EXPR$1) NOT
> NULL c) NOT NULL
> * converted type:
> * RecordType(RecordType(INTEGER EXPR$0,
> INTEGER EXPR$1) NOT NULL c) NOT NULL
> * rel:
> * LogicalProject(c=[ROW($0, $1)])
> *
> LogicalWatermarkAssigner(rowtime=[ts], watermark=[$3])
> * 
> LogicalProject(f_sequence=[$0], f_random=[$1], f_random_str=[$2],
> ts=[LOCALTIMESTAMP])
> *  
> LogicalTableScan(table=[[default_catalog, default_database, datagen]])
> */
> ResolvedSchema resolvedSchema1 =
> table1.getResolvedSchema();
> System.out.println(resolvedSchema1);
>
>
> table.execute().print();
>
>
>
>
>   }
>
>
> }


Re: How to connect the SQL client to a k8s session cluster

2021-07-22 Thread Caizhi Weng
Hi!

You can expose the Flink REST API address of the k8s session cluster, and then on the client side set execution.target to remote and point rest.address and rest.port at that address.
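For example, in the flink-conf.yaml picked up by the SQL client (host and port are placeholders for wherever the session cluster's REST endpoint is exposed; 8081 is the default REST port):

execution.target: remote
rest.address: <exposed-jobmanager-host>
rest.port: 8081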

maker_d...@foxmail.com wrote on Thu, Jul 22, 2021 at 9:46 PM:

> Hi all,
> I deployed Flink on a k8s cluster in session mode following the official documentation, and jobs can be submitted normally.
> Now I want to use the SQL client, but submitting a statement gives:
> [ERROR] Could not execute SQL statement. Reason:
> java.net.UnknownHostException: flink-cluster
> How can the SQL client connect to a Flink session running on k8s?
> Flink version 1.12.4.
>
>
>
> maker_d...@foxmail.com
>


Custom Flink Kafka metrics fail to parse in InfluxDB

2021-07-22 Thread Jimmy Zhang
Hi all. Flink version 1.13.1.
In FlinkKafkaProducer's invoke method I create a counter whose final metric name is sinkTableName + "_receive_count", with InfluxDB as the reporting backend. sinkTableName is taken from the catalog information and passed down through the constructors.

I noticed a strange problem: with a simple SQL statement of the form insert into a select from b where
b.c='d', the measurement is created in InfluxDB successfully;
but if a UDF is added, for example
insert into a select 
CreateJsonOuter('tablename',CaptTimeSlewTime(Record.CAPTURE_TIME, 
'YEAR_,MONTH'),'data',CreateJsonInner(true)) from source where 
Record.datasource='xx', InfluxDB reports the following parse error. Has anyone seen this? It has been bothering me for a whole day, and any advice is much appreciated!
org.influxdb.InfluxDBException$UnableToParseException: partial write: unable to 
parse 
'taskmanager_job_task_operator_xx_ODS_INFO_receive_count,host=xx,job_name= 
insert \ into\ ODS_INFO\ select \ CreateJsonOuter(' :missing fields
unable to parse ''tablename'\, 
'ODS_INFO'\,CaptTimeSlewTime(Record.CAPTURE_TIME\, 
'YEAR_,MONTH')\,'data'\,CreateJsonInner(true\,': missing fields)

|
Best,
Jimmy
|

Signature is customized by Netease Mail Master

Observability around Flink Pipeline/stateful functions

2021-07-22 Thread Deepak Sharma
@d...@spark.apache.org  @user 
I am looking for an example around the observability framework for Apache
Flink pipelines.
This could be message tracing across multiple flink pipelines or query on
the past state of a message that was processed by any flink pipeline.
If anyone has done similar work and can share any
pointers(blogs/books/writeup) , it would really help.

Thanks a lot in advance.

--Deepak


Re: flink sql dependency isolation

2021-07-22 Thread Jeff Zhang
Zeppelin supports dynamic loading of dependencies
https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2


Michael Ran wrote on Thu, Jul 22, 2021 at 8:07 PM:

> Just isolate the dependencies per job. You folks at Meituan are already on k8s, right?
> On 2021-07-05 14:06:53, "silence" wrote:
> >Is there currently any way to achieve dependency isolation in Flink SQL,
> >for example for connectors, formats, and (most importantly) UDFs?
> >In many cases components defined by different users get mixed together, causing many dependency conflicts that are hard to resolve.
> >Is there a way to isolate UDF dependencies (e.g. each UDF using its own jar and classloader), or does the community have any plans for this?
>


-- 
Best Regards

Jeff Zhang


Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-22 Thread Chesnay Schepler

I'm wondering if this discussion isn't going in the wrong direction.
It is clear that we cannot support all use-cases with the defaults, so 
let's not try that. We won't find them.
And I would argue that is also not their purpose; they are configurable 
for a reason.
I would say the defaults should provide a good experience to users 
starting out with Flink.


Because, users with heavy workloads don't deploy Flink over night. They 
approach a production deployment step-by-step, for the very purpose of 
working out kinks in the configuration/stability.
If in this process the default heartbeat configuration ends up being 
too harsh, /then that is not a problem/.
*If *there is sufficient information presented to the user on how to 
transition to a working setup, that is.
So far the purpose of that was the production checklist to some extent, 
but maybe we should also add a separate page for working under bigger loads.


A far greater issue in my opinion is that users don't get warnings that 
something is about to go wrong.
The heartbeat system right now is fairly binary. It works fine while the 
configuration is suitable, until it no longer is and everything goes up 
in flames.


If we were to log warnings if Flink was close to hitting the heartbeat 
timeout, or even expose metrics for heartbeat round-trip times or 
similar, I think we could alleviate many concerns that people have.


Or we just provide 2 configs with the distribution, one for starting 
out, one for more serious workloads. ¯\_(ツ)_/¯
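For reference, the two settings under discussion live in flink-conf.yaml; the values below are the current defaults (in milliseconds), shown only as the baseline the FLIP proposes to reduce:

heartbeat.interval: 10000
heartbeat.timeout: 50000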


On 22/07/2021 13:22, 刘建刚 wrote:
Thanks, Till. There are many reasons to reduce the heartbeat interval 
and timeout, but I am not sure what values are suitable. In our cases, 
the GC time and large job size can be related factors. Since most Flink jobs 
are pipelined and a full failover can take some time, we should 
tolerate some stop-the-world situations. Also, I think that 
FLINK-23216 should be solved so that lost containers are detected quickly 
and reacted to. For my part, I suggest reducing the values gradually.


Till Rohrmann  wrote on Thu, Jul 22, 2021 at 5:33 PM:


Thanks for your inputs Gen and Arnaud.

I do agree with you, Gen, that we need better guidance for our
users on
when to change the heartbeat configuration. I think this should
happen in
any case. I am, however, not so sure whether we can give hard
threshold
like 5000 tasks, for example, because as Arnaud said it strongly
depends on
the workload. Maybe we can explain it based on symptoms a user might
experience and what to do then.

Concerning your workloads, Arnaud, I'd be interested to learn a
bit more.
The user code runs in its own thread. This means that its
operation won't
block the main thread/heartbeat. The only thing that can happen is
that the
user code starves the heartbeat in terms of CPU cycles or causes a
lot of
GC pauses. If you are observing the former problem, then we might
think
about changing the priorities of the respective threads. This
should then
improve Flink's stability for these workloads and a shorter heartbeat
timeout should be possible.

Also for the RAM-cached repositories, what exactly is causing the
heartbeat
to time out? Is it because you have a lot of GC or that the heartbeat
thread does not get enough CPU cycles?

Cheers,
Till

On Thu, Jul 22, 2021 at 9:16 AM LINZ, Arnaud
mailto:al...@bouyguestelecom.fr>>
wrote:

> Hello,
>
>
>
> From a user perspective: we have some (rare) use cases where we use
> “coarse grain” datasets, with big beans and tasks that do
lengthy operation
> (such as ML training). In these cases we had to increase the
time out to
> huge values (heartbeat.timeout: 50) so that our app is not
killed.
>
> I’m aware this is not the way Flink was meant to be used, but it’s a
> convenient way to distribute our workload on datanodes without
having to
> use another concurrency framework (such as M/R) that would
require the
> recoding of sources and sinks.
>
>
>
> In some other (most common) cases, our tasks do some R/W accesses to
> RAM-cached repositories backed by a key-value storage such as
Kudu (or
> Hbase). If most of those calls are very fast, sometimes when the
system is
> under heavy load they may block more than a few seconds, and
having our app
> killed because of a short timeout is not an option.
>
>
>
> That’s why I’m not in favor of very short timeouts… Because in my
> experience it really depends on what user code does in the tasks. (I
> understand that normally, as user code is not a JVM-blocking
activity such
> as a GC, it should have no impact on heartbeats, but from
experience, it
> really does)
>
>
>
> Cheers,
>
> Arnaud
>
>
>
>
>
> *De :* Gen Luo 

Re: Recover from savepoints with Kubernetes HA

2021-07-22 Thread Austin Cawley-Edwards
Hey Thomas,

Hmm, I see no reason why you should not be able to update the checkpoint
interval at runtime, and don't believe that information is stored in a
savepoint. Can you share the JobManager logs of the job where this is
ignored?

Thanks,
Austin

On Wed, Jul 21, 2021 at 11:47 AM Thms Hmm  wrote:

> Hey Austin,
>
> Thanks for your help.
>
> I tried to change the checkpoint interval as example. The value for it
> comes from an additional config file and is read and set within main() of
> the job.
>
> The job is running in Application mode. Basically the same configuration
> as from the official Flink website but instead of running the JobManager as
> job it is created as deployment.
>
> For the redeployment of the job the REST API is triggered to create a
> savepoint and cancel the job. After completion the deployment is updated
> and the pods are recreated. The -s  is always added as a
> parameter to start the JobManager (standalone-job.sh). CLI is not involved.
> We have automated these steps. But I tried the steps manually and have the
> same results.
>
> I also tried to trigger a savepoint, scale the pods down, update the start
> parameter with the recent savepoint and renamed ‚kubernetes.cluster-id‘ as
> well as ‚high-availability.storageDir‘.
>
> When I trigger a savepoint with cancel, I also see that the HA config maps
> are cleaned up.
>
>
> Kr Thomas
>
> Austin Cawley-Edwards  wrote on Wed, Jul 21,
> 2021 at 16:52:
>
>> Hi Thomas,
>>
>> I've got a few questions that will hopefully help get to find an answer:
>>
>> What job properties are you trying to change? Something like parallelism?
>>
>> What mode is your job running in? i.e., Session, Per-Job, or Application?
>>
>> Can you also describe how you're redeploying the job? Are you using the
>> Native Kubernetes integration or Standalone (i.e. writing k8s  manifest
>> files yourself)? It sounds like you are using the Flink CLI as well, is
>> that correct?
>>
>> Thanks,
>> Austin
>>
>> On Wed, Jul 21, 2021 at 4:05 AM Thms Hmm  wrote:
>>
>>> Hey,
>>>
>>> we have some application clusters running on Kubernetes and explore the
>>> HA mode which is working as expected. When we try to upgrade a job, e.g.
>>> trigger a savepoint, cancel the job and redeploy, Flink is not restarting
>>> from the savepoint we provide using the -s parameter. So all state is lost.
>>>
>>> If we just trigger the savepoint without canceling the job and redeploy
>>> the HA mode picks up from the latest savepoint.
>>>
>>> But this way we can not upgrade job properties as they were picked up
>>> from the savepoint as it seems.
>>>
>>> Is there any advice on how to do upgrades with HA enabled?
>>>
>>> Flink version is 1.12.2.
>>>
>>> Thanks for your help.
>>>
>>> Kr thomas
>>>
>>


Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-07-22 Thread Timo Walther
Thanks, this should definitely work with the pre-packaged connectors of 
Ververica platform.


I guess we have to investigate what is going on. Until then, a 
workaround could be to add Hadoop manually and set the HADOOP_CLASSPATH 
environment variable. The root cause seems to be that Hadoop cannot be found.
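If a Hadoop installation is available inside the image, the usual way to set that variable (as described in Flink's Hadoop integration docs) is:

export HADOOP_CLASSPATH=`hadoop classpath`

so that the Hadoop jars are on the classpath before the Flink processes start.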


Alternatively, you could also build a custom image and include Hadoop in 
the lib folder of Flink:


https://docs.ververica.com/v1.3/platform/installation/custom_images.html

I hope this helps. I will get back to you if we have a fix ready.

Regards,
Timo



On 22.07.21 14:30, Natu Lauchande wrote:

Sure.

That's what the table DDL looks like:

CREATE TABLE tablea (
  `a` BIGINT,
  `b` BIGINT,
  `c` BIGINT
)
COMMENT ''
WITH (
  'auto-compaction' = 'false',
  'connector' = 'filesystem',
  'format' = 'parquet',
  'parquet.block.size' = '134217728',
  'parquet.compression' = 'SNAPPY',
  'parquet.dictionary.page.size' = '1048576',
  'parquet.enable.dictionary' = 'true',
  'parquet.page.size' = '1048576',
  'parquet.writer.max-padding' = '2097152',
  'path' = 's3a://test/test',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'success-file',
  'sink.partition-commit.success-file.name' = '_SUCCESS',
  'sink.partition-commit.trigger' = 'process-time',
  'sink.rolling-policy.check-interval' = '20 min',
  'sink.rolling-policy.file-size' = '128MB',
  'sink.rolling-policy.rollover-interval' = '2 h'
);



When I change the connector to a blackhole it immediately works without 
errors. I have redacted the names and paths.




Thanks,
Natu


On Thu, Jul 22, 2021 at 2:24 PM Timo Walther wrote:


Maybe you can share also which connector/format you are using? What is
the DDL?

Regards,
Timo


On 22.07.21 14:11, Natu Lauchande wrote:
 > Hey Timo,
 >
 > Thanks for the reply.
 >
 > No custom file as we are using Flink SQL and submitting the job
directly
 > through the SQL Editor UI. We are using Flink 1.13.1 as the
supported
 > flink version. No custom code all through Flink SQL on UI no jars.
 >
 > Thanks,
 > Natu
 >
 > On Thu, Jul 22, 2021 at 2:08 PM Timo Walther mailto:twal...@apache.org>
 > >> wrote:
 >
 >     Hi Natu,
 >
 >     Ververica Platform 2.5 has updated the bundled Hadoop version
but this
 >     should not result in a NoClassDefFoundError exception. How
are you
 >     submitting your SQL jobs? You don't use Ververica's SQL
service but
 >     have
 >     built a regular JAR file, right? If this is the case, can you
share
 >     your
 >     pom.xml file with us? The Flink version stays constant at 1.12?
 >
 >     Regards,
 >     Timo
 >
 >     On 22.07.21 12:22, Natu Lauchande wrote:
 >      > Good day Flink community,
 >      >
 >      > Apache Flink/Ververica Community Edition - Question
 >      >
 >      >
 >      > I am having an issue with my Flink SQL jobs since updating
 >     from Flink
 >      > 1.12/Ververica 2.4 to Ververica 2.5 . For all the jobs
running on
 >      > parquet and S3 i am getting the following error continuously:
 >      >
 >      > INITIALIZING to FAILED on 10.243.3.0:42337-2a3224 @
 >      > 10-243-3-0.flink-metrics.vvp-jobs.svc.cluster.local
(dataPort=39309).
 >      >
 >      > java.lang.NoClassDefFoundError:
org/apache/hadoop/conf/Configuration
 >      >
 >      > at java.lang.Class.getDeclaredConstructors0(Native Method)
 >     ~[?:1.8.0_292]
 >      >
 >      > at
java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
 >      > ~[?:1.8.0_292]
 >      >
 >      > at java.lang.Class.getDeclaredConstructors(Class.java:2020)
 >     ~[?:1.8.0_292]
 >      >
 >      > **
 >      >
 >      > at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
 >      > ~[?:1.8.0_292]
 >      >
 >      > at
 >      >
 >   
  org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)

 >
 >      > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >      >
 >      > at
 >      >
 >   
  org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)

 >
 >      > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >      >
 >      > at
 >      >
 >   
  org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)

 >
 >      > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >      >
 >      > at
 >      >
 >   
  org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)

 >
 >      > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >      >

How to connect the SQL client to a k8s session cluster

2021-07-22 Thread maker_d...@foxmail.com
Hi all,
I deployed Flink on a k8s cluster in session mode following the official documentation, and jobs can be submitted normally.
Now I want to use the SQL client, but submitting a statement gives:
[ERROR] Could not execute SQL statement. Reason:
java.net.UnknownHostException: flink-cluster
How can the SQL client connect to a Flink session running on k8s?
Flink version 1.12.4.



maker_d...@foxmail.com


flink 1.13.1: error when querying a column generated by row(a, b) again

2021-07-22 Thread Asahi Lee
hi!
1. With flink 1.13.1, querying a column generated by row(a,b) again raises an error. Is this a bug?
2. When generating a row-typed column with the row function, the names of the fields inside the row cannot be specified. Could such a name configuration be supported?
My example program is as follows:


package test;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ResolvedSchema;


public class DataGenTest {

    public static void main(String[] args) {
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment);

        tableEnvironment.executeSql("CREATE TABLE datagen (\n" +
                " f_sequence INT,\n" +
                " f_random INT,\n" +
                " f_random_str STRING,\n" +
                " ts AS localtimestamp,\n" +
                " WATERMARK FOR ts AS ts\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='5',\n" +
                " 'fields.f_sequence.kind'='sequence',\n" +
                " 'fields.f_sequence.start'='1',\n" +
                " 'fields.f_sequence.end'='1000',\n" +
                " 'fields.f_random.min'='1',\n" +
                " 'fields.f_random.max'='1000',\n" +
                " 'fields.f_random_str.length'='10'\n" +
                ")");

        Table table = tableEnvironment.sqlQuery("select row(f_sequence, f_random) as c from datagen");
        ResolvedSchema resolvedSchema = table.getResolvedSchema();
        System.out.println(resolvedSchema);
        /**
         * Prints the following:
         * (
         *   `c` ROW<`EXPR$0` INT, `EXPR$1` INT> NOT NULL
         * )
         * Question 1: using the row function I put two columns into a row type;
         * how can I configure the names of the fields inside the row, e.g. c1 and c2 as below:
         * (
         *   `c` ROW<`c1` INT, `c2` INT> NOT NULL
         * )
         */

        Table table1 = tableEnvironment.sqlQuery("select * from " + table);
        /**
         * Question 2: the query SQL fails with:
         * Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
         * validated type:
         * RecordType(RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER EXPR$1) NOT NULL c) NOT NULL
         * converted type:
         * RecordType(RecordType(INTEGER EXPR$0, INTEGER EXPR$1) NOT NULL c) NOT NULL
         * rel:
         * LogicalProject(c=[ROW($0, $1)])
         *   LogicalWatermarkAssigner(rowtime=[ts], watermark=[$3])
         *     LogicalProject(f_sequence=[$0], f_random=[$1], f_random_str=[$2], ts=[LOCALTIMESTAMP])
         *       LogicalTableScan(table=[[default_catalog, default_database, datagen]])
         */
        ResolvedSchema resolvedSchema1 = table1.getResolvedSchema();
        System.out.println(resolvedSchema1);

        table.execute().print();
    }
}

Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-07-22 Thread Natu Lauchande
Sure.

That's what the table DDL looks like:

CREATE TABLE tablea (

  `a` BIGINT,

  `b` BIGINT,

  `c` BIGINT

)

COMMENT ''

WITH (

  'auto-compaction' = 'false',

  'connector' = 'filesystem',

  'format' = 'parquet',

  'parquet.block.size' = '134217728',

  'parquet.compression' = 'SNAPPY',

  'parquet.dictionary.page.size' = '1048576',

  'parquet.enable.dictionary' = 'true',

  'parquet.page.size' = '1048576',

  'parquet.writer.max-padding' = '2097152',

  'path' = 's3a://test/test',

  'sink.partition-commit.delay' = '1 h',

  'sink.partition-commit.policy.kind' = 'success-file',

  'sink.partition-commit.success-file.name' = '_SUCCESS',

  'sink.partition-commit.trigger' = 'process-time',

  'sink.rolling-policy.check-interval' = '20 min',

  'sink.rolling-policy.file-size' = '128MB',

  'sink.rolling-policy.rollover-interval' = '2 h'

);



When I change the connector to a blackhole it immediately works without
errors. I have redacted the names and paths.



Thanks,
Natu

On Thu, Jul 22, 2021 at 2:24 PM Timo Walther  wrote:

> Maybe you can share also which connector/format you are using? What is
> the DDL?
>
> Regards,
> Timo
>
>
> On 22.07.21 14:11, Natu Lauchande wrote:
> > Hey Timo,
> >
> > Thanks for the reply.
> >
> > No custom file as we are using Flink SQL and submitting the job directly
> > through the SQL Editor UI. We are using Flink 1.13.1 as the supported
> > flink version. No custom code all through Flink SQL on UI no jars.
> >
> > Thanks,
> > Natu
> >
> > On Thu, Jul 22, 2021 at 2:08 PM Timo Walther  > > wrote:
> >
> > Hi Natu,
> >
> > Ververica Platform 2.5 has updated the bundled Hadoop version but
> this
> > should not result in a NoClassDefFoundError exception. How are you
> > submitting your SQL jobs? You don't use Ververica's SQL service but
> > have
> > built a regular JAR file, right? If this is the case, can you share
> > your
> > pom.xml file with us? The Flink version stays constant at 1.12?
> >
> > Regards,
> > Timo
> >
> > On 22.07.21 12:22, Natu Lauchande wrote:
> >  > Good day Flink community,
> >  >
> >  > Apache Flink/Ververica Community Edition - Question
> >  >
> >  >
> >  > I am having an issue with my Flink SQL jobs since updating
> > from Flink
> >  > 1.12/Ververica 2.4 to Ververica 2.5 . For all the jobs running on
> >  > parquet and S3 i am getting the following error continuously:
> >  >
> >  > INITIALIZING to FAILED on 10.243.3.0:42337-2a3224 @
> >  > 10-243-3-0.flink-metrics.vvp-jobs.svc.cluster.local
> (dataPort=39309).
> >  >
> >  > java.lang.NoClassDefFoundError:
> org/apache/hadoop/conf/Configuration
> >  >
> >  > at java.lang.Class.getDeclaredConstructors0(Native Method)
> > ~[?:1.8.0_292]
> >  >
> >  > at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
> >  > ~[?:1.8.0_292]
> >  >
> >  > at java.lang.Class.getDeclaredConstructors(Class.java:2020)
> > ~[?:1.8.0_292]
> >  >
> >  > **
> >  >
> >  > at
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
> >  > ~[?:1.8.0_292]
> >  >
> >  > at
> >  >
> >
>  
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
> >
> >  > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >  >
> >  > at
> >  >
> >
>  
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
> >
> >  > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >  >
> >  > at
> >  >
> >
>  
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
> >
> >  > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >  >
> >  > at
> >  >
> >
>  
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
> >
> >  > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >  >
> >  > at
> >  >
> >
>  
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
> >
> >  > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >  >
> >  > at
> >  >
> >
>  
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:653)
> >
> >  > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >  >
> >  > at
> >  >
> >
>  
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:626)
> >
> >  > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >  >
> >  > at
> >  >
> >
>  
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
> >
> >  > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >  >
> >  > at
> >  >
> >
>  
> 

Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-07-22 Thread Timo Walther
Maybe you can share also which connector/format you are using? What is 
the DDL?


Regards,
Timo


On 22.07.21 14:11, Natu Lauchande wrote:

Hey Timo,

Thanks for the reply.

No custom file as we are using Flink SQL and submitting the job directly 
through the SQL Editor UI. We are using Flink 1.13.1 as the supported 
flink version. No custom code all through Flink SQL on UI no jars.


Thanks,
Natu

On Thu, Jul 22, 2021 at 2:08 PM Timo Walther wrote:


Hi Natu,

Ververica Platform 2.5 has updated the bundled Hadoop version but this
should not result in a NoClassDefFoundError exception. How are you
submitting your SQL jobs? You don't use Ververica's SQL service but
have
built a regular JAR file, right? If this is the case, can you share
your
pom.xml file with us? The Flink version stays constant at 1.12?

Regards,
Timo

On 22.07.21 12:22, Natu Lauchande wrote:
 > Good day Flink community,
 >
 > Apache Flink/Ververica Community Edition - Question
 >
 >
 > I am having an issue with my Flink SQL jobs since updating
from Flink
 > 1.12/Ververica 2.4 to Ververica 2.5 . For all the jobs running on
 > parquet and S3 i am getting the following error continuously:
 >
 > INITIALIZING to FAILED on 10.243.3.0:42337-2a3224 @
 > 10-243-3-0.flink-metrics.vvp-jobs.svc.cluster.local (dataPort=39309).
 >
 > java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
 >
 > at java.lang.Class.getDeclaredConstructors0(Native Method)
~[?:1.8.0_292]
 >
 > at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
 > ~[?:1.8.0_292]
 >
 > at java.lang.Class.getDeclaredConstructors(Class.java:2020)
~[?:1.8.0_292]
 >
 > **
 >
 > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
 > ~[?:1.8.0_292]
 >
 > at
 >

org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:653)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:626)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:181)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >


Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-07-22 Thread Natu Lauchande
Hey Timo,

Thanks for the reply.

No custom file as we are using Flink SQL and submitting the job directly
through the SQL Editor UI. We are using Flink 1.13.1 as the supported flink
version. No custom code all through Flink SQL on UI no jars.

Thanks,
Natu

On Thu, Jul 22, 2021 at 2:08 PM Timo Walther  wrote:

> Hi Natu,
>
> Ververica Platform 2.5 has updated the bundled Hadoop version but this
> should not result in a NoClassDefFoundError exception. How are you
> submitting your SQL jobs? You don't use Ververica's SQL service but have
> built a regular JAR file, right? If this is the case, can you share your
> pom.xml file with us? The Flink version stays constant at 1.12?
>
> Regards,
> Timo
>
> On 22.07.21 12:22, Natu Lauchande wrote:
> > Good day Flink community,
> >
> > Apache Flink/Ververica Community Edition - Question
> >
> >
> > I am having an issue with my Flink SQL jobs since updating from Flink
> > 1.12/Ververica 2.4 to Ververica 2.5 . For all the jobs running on
> > parquet and S3 i am getting the following error continuously:
> >
> > INITIALIZING to FAILED on 10.243.3.0:42337-2a3224 @
> > 10-243-3-0.flink-metrics.vvp-jobs.svc.cluster.local (dataPort=39309).
> >
> > java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
> >
> > at java.lang.Class.getDeclaredConstructors0(Native Method) ~[?:1.8.0_292]
> >
> > at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
> > ~[?:1.8.0_292]
> >
> > at java.lang.Class.getDeclaredConstructors(Class.java:2020)
> ~[?:1.8.0_292]
> >
> > **
> >
> > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
> > ~[?:1.8.0_292]
> >
> > at
> >
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:653)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:626)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:181)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at 

Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-07-22 Thread Timo Walther

Hi Natu,

Ververica Platform 2.5 has updated the bundled Hadoop version but this 
should not result in a NoClassDefFoundError exception. How are you 
submitting your SQL jobs? You don't use Ververica's SQL service but have 
built a regular JAR file, right? If this is the case, can you share your 
pom.xml file with us? The Flink version stays constant at 1.12?


Regards,
Timo

On 22.07.21 12:22, Natu Lauchande wrote:

Good day Flink community,

Apache Flink/Ververica Community Edition - Question


I am having an issue with my Flink SQL jobs since updating from Flink 
1.12/Ververica 2.4 to Ververica 2.5 . For all the jobs running on 
parquet and S3 i am getting the following error continuously:


INITIALIZING to FAILED on 10.243.3.0:42337-2a3224 @ 
10-243-3-0.flink-metrics.vvp-jobs.svc.cluster.local (dataPort=39309).


java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

at java.lang.Class.getDeclaredConstructors0(Native Method) ~[?:1.8.0_292]

at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671) 
~[?:1.8.0_292]


at java.lang.Class.getDeclaredConstructors(Class.java:2020) ~[?:1.8.0_292]

**

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) 
~[?:1.8.0_292]


at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:653) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:626) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:181) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]

Caused by: java.lang.ClassNotFoundException: 
org.apache.hadoop.conf.Configuration


at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_292]

at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_292]

at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) 

Re: flink sql dependency isolation

2021-07-22 Thread Michael Ran
Just isolate the dependencies per job. You folks at Meituan are already on k8s, right?
On 2021-07-05 14:06:53, "silence" wrote:
>Is there currently any way to achieve dependency isolation in Flink SQL,
>for example for connectors, formats, and (most importantly) UDFs?
>In many cases components defined by different users get mixed together, causing many dependency conflicts that are hard to resolve.
>Is there a way to isolate UDF dependencies (e.g. each UDF using its own jar and classloader), or does the community have any plans for this?


Re: Flink TaskManager container got restarted by K8S very frequently

2021-07-22 Thread David Morávek
If you run `kubectl describe pod ...` on the affected pod, you should see a
reason why the previous pod has terminated (eg. OOM killed by Kubernetes).

Best,
D.

On Thu, Jul 22, 2021 at 9:30 AM Fabian Paul 
wrote:

> CC user ML
>
>


Re:Re: Performance issue with large Flink windows

2021-07-22 Thread Michael Ran
Increase the parallelism and use a smaller window size.
On 2021-07-15 11:52:12, "Wanghui (HiCampus)" wrote:
>Does increasing the parallelism also help?
>
>On 2021/07/15 02:45:18, "Michael Ran" <g...@163.com> wrote:
>
>> Either increase the memory or the parallelism, or make the window smaller and keep the data for a shorter time.
>
>> On 2021-07-15 10:23:25, "Hui Wang" <46...@qq.com.INVALID> wrote:
>
>> >A large Flink window buffers so much data that the JVM does frequent full GCs, throughput becomes extremely low, and the job eventually runs out of memory. How should this be tuned?
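To make the advice above concrete, here is a minimal sketch (the parallelism, window size, and sample data are made-up values, and the incremental reduce is an extra assumption of this sketch rather than part of the advice): a shorter window bounds the buffered state, and incremental aggregation keeps one value per key instead of every element.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SmallerWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8); // made-up value; raise it to match your available slots

        env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 2L), Tuple2.of("b", 3L))
                .keyBy(value -> value.f0)
                // a shorter window bounds how much data is buffered before it fires
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                // incremental aggregation keeps a single running value per key
                // instead of buffering every element until the window closes
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
                .print();

        env.execute("smaller-window-sketch");
    }
}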


Re: Need help of deploying Flink HA on kubernetes cluster

2021-07-22 Thread Fabian Paul
Hi Dhiru,

No worries, I completely understand your point. Usually all the executable 
scripts from Flink can be found in the main repository [1].
We also provide a community edition of our commercial product [2], which manages 
the lifecycle of the cluster, so you do not have to use these scripts and can 
manage your application through an API [3].

Best,
Fabian

[1] https://github.com/apache/flink/tree/master/flink-dist/src/main/flink-bin 

[2] https://www.ververica.com/getting-started 

[3] https://docs.ververica.com/getting_started/flink_operations.html

Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-22 Thread 刘建刚
Thanks, Till. There are many reasons to reduce the heartbeat interval and
timeout. But I am not sure what values are suitable. In our cases, the GC
time and big job can be related factors. Since most flink jobs are pipeline
and a total failover can cost some time, we should tolerate some stop-world
situations. Also, I think FLINK-23216 should be solved so that lost
containers are detected fast and reacted to. For my part, I suggest
reducing the values gradually.

Till Rohrmann wrote on Thursday, July 22, 2021 at 5:33 PM:

> Thanks for your inputs Gen and Arnaud.
>
> I do agree with you, Gen, that we need better guidance for our users on
> when to change the heartbeat configuration. I think this should happen in
> any case. I am, however, not so sure whether we can give hard threshold
> like 5000 tasks, for example, because as Arnaud said it strongly depends on
> the workload. Maybe we can explain it based on symptoms a user might
> experience and what to do then.
>
> Concerning your workloads, Arnaud, I'd be interested to learn a bit more.
> The user code runs in its own thread. This means that its operation won't
> block the main thread/heartbeat. The only thing that can happen is that the
> user code starves the heartbeat in terms of CPU cycles or causes a lot of
> GC pauses. If you are observing the former problem, then we might think
> about changing the priorities of the respective threads. This should then
> improve Flink's stability for these workloads and a shorter heartbeat
> timeout should be possible.
>
> Also for the RAM-cached repositories, what exactly is causing the heartbeat
> to time out? Is it because you have a lot of GC or that the heartbeat
> thread does not get enough CPU cycles?
>
> Cheers,
> Till
>
> On Thu, Jul 22, 2021 at 9:16 AM LINZ, Arnaud 
> wrote:
>
> > Hello,
> >
> >
> >
> > From a user perspective: we have some (rare) use cases where we use
> > “coarse grain” datasets, with big beans and tasks that do lengthy
> operation
> > (such as ML training). In these cases we had to increase the time out to
> > huge values (heartbeat.timeout: 50) so that our app is not killed.
> >
> > I’m aware this is not the way Flink was meant to be used, but it’s a
> > convenient way to distribute our workload on datanodes without having to
> > use another concurrency framework (such as M/R) that would require the
> > recoding of sources and sinks.
> >
> >
> >
> > In some other (most common) cases, our tasks do some R/W accesses to
> > RAM-cached repositories backed by a key-value storage such as Kudu (or
> > Hbase). If most of those calls are very fast, sometimes when the system
> is
> > under heavy load they may block more than a few seconds, and having our
> app
> > killed because of a short timeout is not an option.
> >
> >
> >
> > That’s why I’m not in favor of very short timeouts… Because in my
> > experience it really depends on what user code does in the tasks. (I
> > understand that normally, as user code is not a JVM-blocking activity
> such
> > as a GC, it should have no impact on heartbeats, but from experience, it
> > really does)
> >
> >
> >
> > Cheers,
> >
> > Arnaud
> >
> >
> >
> >
> >
> > *From:* Gen Luo 
> > *Sent:* Thursday, July 22, 2021 05:46
> > *To:* Till Rohrmann 
> > *Cc:* Yang Wang ; dev ;
> > user 
> > *Subject:* Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval
> > default values
> >
> >
> >
> > Hi,
> >
> > Thanks for driving this @Till Rohrmann  . I would
> > give +1 on reducing the heartbeat timeout and interval, though I'm not
> sure
> > if 15s and 3s would be enough either.
> >
> >
> >
> > IMO, except for the standalone cluster, where the heartbeat mechanism in
> > Flink is totally relied, reducing the heartbeat can also help JM to find
> > out faster TaskExecutors in abnormal conditions that can not respond to
> the
> > heartbeat requests, e.g., continuously Full GC, though the process of
> > TaskExecutor is alive and may not be known by the deployment system.
> Since
> > there are cases that can benefit from this change, I think it could be
> done
> > if it won't break the experience in other scenarios.
> >
> >
> >
> > If we can address what will block the main threads from processing
> > heartbeats, or enlarge the GC costs, we can try to get rid of them to
> have
> > a more predictable response time of heartbeat, or give some advices to
> > users if their jobs may encounter these issues. For example, as far as I
> > know the JM of a large-scale job will be busier and may not be able to
> > process
> > heartbeats in time, then we can give advice that users working with jobs
> > larger than 5000 tasks should enlarge their heartbeat interval to 10s and
> > timeout to 50s. The numbers are written casually.
> >
> >
> >
> > As for the issue in FLINK-23216, I think it should be fixed and may not
> be
> > a main concern for this case.
> >
> >
> >
> > On Wed, Jul 21, 2021 at 6:26 PM Till Rohrmann 
> > wrote:
> >
> > Thanks for sharing these insights.
> >
> >
> >
> > I think it is 

Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-07-22 Thread Natu Lauchande
Good day Flink community,

Apache Flink/Ververica Community Edition - Question


I am having an issue with my Flink SQL jobs since updating from Flink
1.12/Ververica 2.4 to Ververica 2.5. For all the jobs running on Parquet
and S3 I am getting the following error continuously:

INITIALIZING to FAILED on 10.243.3.0:42337-2a3224 @
10-243-3-0.flink-metrics.vvp-jobs.svc.cluster.local (dataPort=39309).

java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
at java.lang.Class.getDeclaredConstructors0(Native Method) ~[?:1.8.0_292]
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671) ~[?:1.8.0_292]
at java.lang.Class.getDeclaredConstructors(Class.java:2020) ~[?:1.8.0_292]
**
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) ~[?:1.8.0_292]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:653) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:626) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:181) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_292]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_292]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_292]
... 57 more

2021-07-22 09:38:43,095 DEBUG org.apache.flink.runtime.scheduler.SharedSlot
  [] - Remove logical slot
(SlotRequestId{4297879e795d0516e36a7c26ccc795b2}) for execution vertex (id
cbc357ccb763df2852fee8c4fc7d55f2_0) from the 
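The trace above says that org.apache.hadoop.conf.Configuration is not visible to the user-code classloader. As a quick sanity check, assuming you can run a small probe with the same classpath the job uses, something like the following tells you whether hadoop-common is present at all; whether the remedy is then bundling the Hadoop dependency with the job, adding it to the image, or exporting HADOOP_CLASSPATH depends on the setup.

public class HadoopOnClasspathProbe {
    public static void main(String[] args) {
        try {
            Class.forName("org.apache.hadoop.conf.Configuration");
            System.out.println("org.apache.hadoop.conf.Configuration is visible");
        } catch (ClassNotFoundException e) {
            // Matches the "Caused by" above: the Hadoop classes are simply not on this classpath.
            System.out.println("hadoop-common is missing from this classpath");
        }
    }
}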

Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-22 Thread Till Rohrmann
Thanks for your inputs Gen and Arnaud.

I do agree with you, Gen, that we need better guidance for our users on
when to change the heartbeat configuration. I think this should happen in
any case. I am, however, not so sure whether we can give hard threshold
like 5000 tasks, for example, because as Arnaud said it strongly depends on
the workload. Maybe we can explain it based on symptoms a user might
experience and what to do then.

Concerning your workloads, Arnaud, I'd be interested to learn a bit more.
The user code runs in its own thread. This means that its operation won't
block the main thread/heartbeat. The only thing that can happen is that the
user code starves the heartbeat in terms of CPU cycles or causes a lot of
GC pauses. If you are observing the former problem, then we might think
about changing the priorities of the respective threads. This should then
improve Flink's stability for these workloads and a shorter heartbeat
timeout should be possible.

Also for the RAM-cached repositories, what exactly is causing the heartbeat
to time out? Is it because you have a lot of GC or that the heartbeat
thread does not get enough CPU cycles?

Cheers,
Till

On Thu, Jul 22, 2021 at 9:16 AM LINZ, Arnaud 
wrote:

> Hello,
>
>
>
> From a user perspective: we have some (rare) use cases where we use
> “coarse grain” datasets, with big beans and tasks that do lengthy operation
> (such as ML training). In these cases we had to increase the time out to
> huge values (heartbeat.timeout: 50) so that our app is not killed.
>
> I’m aware this is not the way Flink was meant to be used, but it’s a
> convenient way to distribute our workload on datanodes without having to
> use another concurrency framework (such as M/R) that would require the
> recoding of sources and sinks.
>
>
>
> In some other (most common) cases, our tasks do some R/W accesses to
> RAM-cached repositories backed by a key-value storage such as Kudu (or
> Hbase). If most of those calls are very fast, sometimes when the system is
> under heavy load they may block more than a few seconds, and having our app
> killed because of a short timeout is not an option.
>
>
>
> That’s why I’m not in favor of very short timeouts… Because in my
> experience it really depends on what user code does in the tasks. (I
> understand that normally, as user code is not a JVM-blocking activity such
> as a GC, it should have no impact on heartbeats, but from experience, it
> really does)
>
>
>
> Cheers,
>
> Arnaud
>
>
>
>
>
> *From:* Gen Luo 
> *Sent:* Thursday, July 22, 2021 05:46
> *To:* Till Rohrmann 
> *Cc:* Yang Wang ; dev ;
> user 
> *Subject:* Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval
> default values
>
>
>
> Hi,
>
> Thanks for driving this @Till Rohrmann  . I would
> give +1 on reducing the heartbeat timeout and interval, though I'm not sure
> if 15s and 3s would be enough either.
>
>
>
> IMO, except for the standalone cluster, where the heartbeat mechanism in
> Flink is totally relied, reducing the heartbeat can also help JM to find
> out faster TaskExecutors in abnormal conditions that can not respond to the
> heartbeat requests, e.g., continuously Full GC, though the process of
> TaskExecutor is alive and may not be known by the deployment system. Since
> there are cases that can benefit from this change, I think it could be done
> if it won't break the experience in other scenarios.
>
>
>
> If we can address what will block the main threads from processing
> heartbeats, or enlarge the GC costs, we can try to get rid of them to have
> a more predictable response time of heartbeat, or give some advices to
> users if their jobs may encounter these issues. For example, as far as I
> know the JM of a large-scale job will be busier and may not be able to process
> heartbeats in time, then we can give advice that users working with jobs
> larger than 5000 tasks should enlarge their heartbeat interval to 10s and
> timeout to 50s. The numbers are written casually.
>
>
>
> As for the issue in FLINK-23216, I think it should be fixed and may not be
> a main concern for this case.
>
>
>
> On Wed, Jul 21, 2021 at 6:26 PM Till Rohrmann 
> wrote:
>
> Thanks for sharing these insights.
>
>
>
> I think it is no longer true that the ResourceManager notifies the
> JobMaster about lost TaskExecutors. See FLINK-23216 [1] for more details.
>
>
>
> Given the GC pauses, would you then be ok with decreasing the heartbeat
> timeout to 20 seconds? This should give enough time to do the GC and then
> still send/receive a heartbeat request.
>
>
>
> I also wanted to add that we are about to get rid of one big cause of
> blocking I/O operations from the main thread. With FLINK-22483 [2] we will
> get rid of Filesystem accesses to retrieve completed checkpoints. This
> leaves us with one additional file system access from the main thread which
> is the one completing a pending checkpoint. I think it should be possible
> to get rid of this access because as Stephan said it 

Re: Questions about keyed streams

2021-07-22 Thread Fabian Paul
Hi Dan,

1) In general, there is no guarantee that your downstream operator is on the 
same TM even though it works on the same key group. Nevertheless, you can try to 
force this kind of behaviour and avoid the network transfer by either chaining the 
two operators (if no shuffle is in between) or configuring a slot sharing 
group [1]. A very naive approach is to use only one large TM, but this often does 
not suffice.
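A minimal sketch of the two options mentioned above (the operator bodies and the group name are placeholders): with the default settings the two maps below have the same parallelism and a forward connection, so they are chained into one task and run in the same slot; slotSharingGroup shows where you would pin operators to a dedicated group instead.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingAndSlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
                .keyBy(value -> value)
                // same parallelism + forward connection: these two maps are chained
                // into one task, so no network transfer happens between them
                .map(String::toUpperCase)
                .map(value -> value + "!")
                // alternatively, pin operators to a dedicated slot sharing group
                .slotSharingGroup("post-keyby")
                .print();

        env.execute("chaining-and-slot-sharing-sketch");
    }
}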

2) Unfortunately, I do not fully understand your second question. From a Flink 
Source perspective reading from Kafka currently does not preserve the Kafka 
partitioning, meaning that you need to regroup your data again. We currently 
investigate different solutions to allow mapping a Kafka partition to a Flink 
key group. I’d need some more information about the writing to Kafka scenario 
you are describing to give a satisfying answer.

Best,
Fabian

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/concepts/flink-architecture/#task-slots-and-resources

Re: Need help of deploying Flink HA on kubernetes cluster

2021-07-22 Thread Fabian Paul
Hi Dhirendra,

Thanks for reaching out. A good way to start is to have a look at [1] and [2]. 
Once you have everything setup it should be possible to delete the pod of the 
JobManager while an application is running and the job successfully recovers. 
You can use one of the example Flink application to do so [3].

Please let us know if this works for you.

Best,
Fabian








[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/ha/kubernetes_ha/
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/
 

[3] https://github.com/apache/flink/tree/master/flink-examples

Re: Flink TaskManager container got restarted by K8S very frequently

2021-07-22 Thread Fabian Paul
CC user ML



RE: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-22 Thread LINZ, Arnaud
Hello,

From a user perspective: we have some (rare) use cases where we use “coarse 
grain” datasets, with big beans and tasks that do lengthy operation (such as ML 
training). In these cases we had to increase the time out to huge values 
(heartbeat.timeout: 50) so that our app is not killed.
I’m aware this is not the way Flink was meant to be used, but it’s a convenient 
way to distribute our workload on datanodes without having to use another 
concurrency framework (such as M/R) that would require the recoding of sources 
and sinks.

In some other (most common) cases, our tasks do some R/W accesses to RAM-cached 
repositories backed by a key-value storage such as Kudu (or Hbase). If most of 
those calls are very fast, sometimes when the system is under heavy load they 
may block more than a few seconds, and having our app killed because of a short 
timeout is not an option.

That’s why I’m not in favor of very short timeouts… Because in my experience it 
really depends on what user code does in the tasks. (I understand that 
normally, as user code is not a JVM-blocking activity such as a GC, it should 
have no impact on heartbeats, but from experience, it really does)

Cheers,
Arnaud
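For reference, the two settings this thread is about can also be set programmatically when building a local or test environment; a minimal sketch follows (the 3 s / 15 s values are just the candidates discussed in this thread, not a recommendation). In a cluster deployment the same keys, heartbeat.interval and heartbeat.timeout (in milliseconds), go into flink-conf.yaml.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HeartbeatConfigSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // candidate defaults discussed in this thread: 3 s interval, 15 s timeout
        conf.set(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 3_000L);
        conf.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 15_000L);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(2, conf);

        env.fromElements(1, 2, 3).print();
        env.execute("heartbeat-config-sketch");
    }
}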


From: Gen Luo 
Sent: Thursday, July 22, 2021 05:46
To: Till Rohrmann 
Cc: Yang Wang ; dev ; user 
Subject: Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default 
values

Hi,
Thanks for driving this @Till Rohrmann . I would 
give +1 on reducing the heartbeat timeout and interval, though I'm not sure if 
15s and 3s would be enough either.

IMO, besides the standalone cluster, where the heartbeat mechanism in Flink 
is fully relied upon, reducing the heartbeat can also help the JM find out faster 
about TaskExecutors in abnormal conditions that cannot respond to heartbeat 
requests, e.g., continuous full GC, even though the TaskExecutor process is alive 
and the deployment system may not notice anything wrong. Since there are cases that 
can benefit from this change, I think it could be done if it won't break the 
experience in other scenarios.

If we can address what blocks the main threads from processing heartbeats, 
or what enlarges the GC cost, we can try to get rid of those causes to get a more 
predictable heartbeat response time, or give some advice to users if their 
jobs may encounter these issues. For example, as far as I know the JM of a large-scale 
job will be busier and may not be able to process heartbeats in time, 
so we can give advice that users working with jobs larger than 5000 tasks 
should enlarge their heartbeat interval to 10s and timeout to 50s. The numbers 
are written casually.

As for the issue in FLINK-23216, I think it should be fixed and may not be a 
main concern for this case.

On Wed, Jul 21, 2021 at 6:26 PM Till Rohrmann <trohrm...@apache.org> wrote:
Thanks for sharing these insights.

I think it is no longer true that the ResourceManager notifies the JobMaster 
about lost TaskExecutors. See FLINK-23216 [1] for more details.

Given the GC pauses, would you then be ok with decreasing the heartbeat timeout 
to 20 seconds? This should give enough time to do the GC and then still 
send/receive a heartbeat request.

I also wanted to add that we are about to get rid of one big cause of blocking 
I/O operations from the main thread. With FLINK-22483 [2] we will get rid of 
Filesystem accesses to retrieve completed checkpoints. This leaves us with one 
additional file system access from the main thread which is the one completing 
a pending checkpoint. I think it should be possible to get rid of this access 
because as Stephan said it only writes information to disk that is already 
written before. Maybe solving these two issues could ease concerns about long 
pauses of unresponsiveness of Flink.

[1] https://issues.apache.org/jira/browse/FLINK-23216
[2] https://issues.apache.org/jira/browse/FLINK-22483

Cheers,
Till

On Wed, Jul 21, 2021 at 4:58 AM Yang Wang <danrtsey...@gmail.com> wrote:
Thanks @Till Rohrmann  for starting this discussion

Firstly, I try to understand the benefit of a shorter heartbeat timeout. IIUC, it 
will make the JobManager aware of a lost
TaskManager faster. However, it seems that only the standalone cluster could 
benefit from this. For Yarn and
native Kubernetes deployments, the Flink ResourceManager should get the 
TaskManager-lost event in a very short time.

* About 8 seconds, 3s for Yarn NM -> Yarn RM, 5s for Yarn RM -> Flink RM
* Less than 1 second, Flink RM has a watch for all the TaskManager pods

Secondly, I am not very confident about decreasing the timeout to 15s. I have 
quickly checked the TaskManager GC logs
of our internal Flink workloads over the past week and found more than 100 
full GC entries of around 10 seconds, but none longer than 15s.
We are using the CMS GC for the old generation.


Best,
Yang

Till Rohrmann <trohrm...@apache.org> wrote on Saturday, July 17, 2021 at 1:05 AM:
Hi everyone,

Since Flink 1.5 we have 

Re: How to guarantee robustness when the union operator unions multiple source streams

2021-07-22 Thread Caizhi Weng
Hi!

By "some streams fail abnormally without affecting the union of the other streams", do you mean
that if a few of the streams run into problems, their input is ignored and only the input of the
healthy streams keeps being processed?
If so, Flink currently does not offer such a feature. You could consider writing your own MQ
connector that makes the source stop producing data when the MQ has an error, instead of
throwing an exception.
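A minimal sketch of that idea, assuming a hypothetical pollFromMq() client call (Flink itself does not ship such a connector): instead of letting a broker failure propagate and restart the whole job, the source backs off and simply produces nothing until the MQ recovers, so the other inputs of the union keep flowing.

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class TolerantMqSource implements SourceFunction<String> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            try {
                String record = pollFromMq(); // hypothetical MQ client call
                if (record == null) {
                    Thread.sleep(100L); // nothing available, avoid busy-spinning
                    continue;
                }
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(record);
                }
            } catch (Exception mqDown) {
                // instead of rethrowing (which would restart the whole job, union included),
                // back off and retry until the MQ cluster is reachable again
                Thread.sleep(5_000L);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String pollFromMq() {
        return null; // placeholder; replace with a real client call
    }
}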

Fisher Xiang wrote on Wednesday, July 21, 2021 at 11:14 PM:

>
> When using the union operator to union multiple streams, e.g. stream1.union(stream2, stream3, … stream
> n), where streams 1 to n come from different MQ clusters MQ1, MQ2 … MQ n: when a few of those clusters go down,
> the whole Flink application restarts. In that scenario, how can we make sure that when some streams fail abnormally,
> the union of the other streams is not affected and the whole Flink job keeps running?
>
> [image: image.png]
>
> BR
> Fisher
>