flink on yarn??????????log4j????

2021-07-22 文章 comsir
hi all
flink??log4jlog4j
??
 ??

回复:flink sql 依赖隔离

2021-07-22 文章 silence

这边目前主要还是yarn,目前痛点是同一个sql任务会用到多个udf,udf通过-C http://xxx.jar进行加载
udf和sql jar之间、udf和udf之间都可能会有依赖冲突,
目前初步想法是每个udf绑定一个jarpath,使用独立的classloader进行加载,避免和主jar以及其他udf之间的依赖冲突
--
发件人:Michael Ran 
发送时间:2021年7月22日(星期四) 20:07
收件人:user-zh ; silence 
主 题:Re:flink sql 依赖隔离

通过任务进行隔离引用呗。你们美团已经是k8s了吧?
在 2021-07-05 14:06:53,"silence"  写道:
>请教大家目前flink sql有没有办法做到依赖隔离
>比如connector,format,udf(这个最重要)等,
>很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。
>目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划



Re:flink时态表:两个Hbase左关联有报错情况

2021-07-22 文章 Michael Ran
java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: 
org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils缺jar
在 2021-07-14 09:39:53,"xie_guo...@163.com"  写道:
>您好,有关flinkSQL时态表左关联时遇到了问题。
>具体场景:
>
> 两个Hbase表做时态表关联,左表定义了处理时间,右表原本不变(rowkey,cf),FLINK版本1.13.0,报错情况如下,麻烦问一下大家又遇到类似问题嘛,该怎么处理!
>
>2021-07-14 09:22:20.592 WARN  org.apache.flink.runtime.taskmanager.Task  --- 
>2021-07-14 09:22:20.596 WARN  org.apache.flink.runtime.taskmanager.Task  --- 
>LookupJoin(table=[default_catalog.default_database.hbase_source_pollution_dwb_enterprise_wodhz],
> joinType=[LeftOuterJoin], async=[true], lookup=[code=$f4], select=[code, 
>data1, data2, p, $f4, code0, data]) -> Calc(select=[code, 
>ROW(,,data.activ) -> NotNullEnforcer(fields=[EXPR$1, EXPR$2, p, EXPR$4]) 
>(3/3)#3 (4ada188e117c67ccd9bd6488ae95216a) switched from RUNNING to FAILED 
>with failure cause: java.util.concurrent.ExecutionException: 
>java.lang.NoClassDefFoundError: 
>org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
>at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928
>at 
>org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:168)
>at 
>org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:131)
>at 
>org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:448)
>at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:671)
>at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:629)
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>at java.lang.Thread.run(Thread.java:748)
>Caused by: java.lang.NoClassDefFoundError: 
>org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
>at 
>org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncConnectionImpl.close(AsyncConnectionImpl.java:193)
>at 
>org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.close(HBaseRowDataAsyncLookupFunction.java:251)
>at LookupFunction$3.close(Unknown Source
>
>ps:同样的代码,左表换成Kafka,能够正常运行。网上搜索了一下相关报错,好像时缺包,目前pom文件是有hbase-client、hbase-commen、flink-sql-connector-hbase。
>
>
>
>Sincerely,
>xie_guo...@163.com


Re:flink ??????????????

2021-07-22 文章 Michael Ran
??
?? 2021-07-13 17:31:19??"" <1510603...@qq.com.INVALID> ??
>Hi All??
>
>
>  ??Flink 
>checkpoint??2min??
>??2min??  ??
> 
>
>
>
>The program finished with the following exception:
>
>
>org.apache.flink.util.FlinkException: Triggering a savepoint for the job 
> failed.
>   at 
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:777)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:754)
>   at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
>   at 
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:751)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1072)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
>   at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>Caused by: java.util.concurrent.TimeoutException
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
>   at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)


Re:请教on yarn per job 作业采集日志进行监控方案

2021-07-22 文章 Michael Ran
简单的可以先监控任务状态,重启次数这种,消息延迟等这种能直接api拿到值的, 其他指标的比较麻烦,特别是task多了,算子多,还要合并
在 2021-07-21 11:32:31,"yihan xu"  写道:
>原本作业基本处于半裸奔的状态,最近线上出了一次小事故后,在考虑如何实时采集作业日志或者metric再配置告警。
>网上初步搜了一下,好像就是prometheus+grafana或者elk。
>
>请教各位大佬的项目目前都是用什么方式,我们小公司就我一个人搞flink,半路出家水平也有限,请大佬们推荐个易维护坑少点的方式?谢谢。
>
>发自我的iPhone
>
>
>发自我的iPhone


Re: Flink kafka自定义metrics在influxdb上解析失败

2021-07-22 文章 Caizhi Weng
Hi!

是通过什么方式拿到 sink table name 呢?从报错信息来看,拿到的可能并不是“a” 这样的 table name,而是一个 insert
语句的 digest,因此 metric 表名会变得比较复杂,导致解析错误。

当然也可以选择对 metric 表名进行转义。

Jimmy Zhang  于2021年7月23日周五 上午10:11写道:

> 大家好,Flink版本1.13.1。
> 我在FlinkKafkaProducer的invork方法中,创建了一个counter,它采用sinkTableName+_receive_count作为最终的metric
> 表名,并且采用influxDB作为上报终端,其中,sinkTableName是从catalog信息里拿到的,我从构造中一层层传递进去。
>
> 但是发现一个奇怪的问题:当我采用简单的形如insert into a select from b where
> b.c='d'的SQL语句时,influxDB中的表可以成功被建出来;
> 但如果加上UDF,比如
> insert into a select
> CreateJsonOuter('tablename',CaptTimeSlewTime(Record.CAPTURE_TIME,
> 'YEAR_,MONTH'),'data',CreateJsonInner(true)) from source where
> Record.datasource='xx'时,influxDB就会报这样的解析错误,不知道大佬们有没有遇见过,困扰一天了!任何建议都不胜感激!
> org.influxdb.InfluxDBException$UnableToParseException: partial write:
> unable to parse
> 'taskmanager_job_task_operator_xx_ODS_INFO_receive_count,host=xx,job_name=
> insert \ into\ ODS_INFO\ select \ CreateJsonOuter(' :missing fields
> unable to parse ''tablename'\,
> 'ODS_INFO'\,CaptTimeSlewTime(Record.CAPTURE_TIME\,
> 'YEAR_,MONTH')\,'data'\,CreateJsonInner(true\,': missing fields)
>
> |
> Best,
> Jimmy
> |
>
> Signature is customized by Netease Mail Master


Re:Re: flink sql 依赖隔离

2021-07-22 文章 Michael Ran
我看阿里不是传到OSS,然后每个任务 image  拉取下来的时候顺便就把jar 拉进来了。完全隔离的,jar 也方便管理
在 2021-07-22 23:45:14,"Jeff Zhang"  写道:
>Zeppelin 支持依赖的动态加载
>https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
>https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2
>
>
>Michael Ran  于2021年7月22日周四 下午8:07写道:
>
>> 通过任务进行隔离引用呗。你们美团已经是k8s了吧?
>> 在 2021-07-05 14:06:53,"silence"  写道:
>> >请教大家目前flink sql有没有办法做到依赖隔离
>> >比如connector,format,udf(这个最重要)等,
>> >很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。
>> >目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划
>>
>
>
>-- 
>Best Regards
>
>Jeff Zhang


Re: flink 1.13.1 再次查询row(a, b)生成的列时报错

2021-07-22 文章 Caizhi Weng
Hi!

关于问题一(如何配置 row 的字段名),可以通过 cast 语句:
select cast(row(f_sequence, f_random) as row) as c from
datagen

关于问题二,看起来确实是一个 bug,可以去 https://issues.apache.org/jira/projects/FLINK/issues
上开一个 ticket

Asahi Lee <978466...@qq.com.invalid> 于2021年7月22日周四 下午8:44写道:

> hi!
> 1. 我在使用flink 1.13.1 对通过row(a,b)生成的列再次查询时,发生错误,是否是一个bug?
> 2. 通过 row函数生成row类型的列时,无法指定row中字段的name,是否考虑支持name的配置?
> 我的示例程序如下:
>
>
> package test;
>
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.catalog.ResolvedSchema;
>
>
> public class DataGenTest {
>
>
>   public static void main(String[] args) {
> StreamExecutionEnvironment
> streamExecutionEnvironment =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnvironment =
> StreamTableEnvironment.create(streamExecutionEnvironment);
>
>
> tableEnvironment.executeSql("CREATE TABLE
> datagen (\n" +
> " f_sequence
> INT,\n" +
> " f_random INT,\n"
> +
> " f_random_str
> STRING,\n" +
> " ts AS
> localtimestamp,\n" +
> " WATERMARK FOR ts
> AS ts\n" +
> ") WITH (\n" +
> " 'connector' =
> 'datagen',\n" +
> "
> 'rows-per-second'='5',\n" +
> "
> 'fields.f_sequence.kind'='sequence',\n" +
> "
> 'fields.f_sequence.start'='1',\n" +
> "
> 'fields.f_sequence.end'='1000',\n" +
> "
> 'fields.f_random.min'='1',\n" +
> "
> 'fields.f_random.max'='1000',\n" +
> "
> 'fields.f_random_str.length'='10'\n" +
> ")");
>
>
> Table table =
> tableEnvironment.sqlQuery("select row(f_sequence, f_random) as c from
> datagen");
> ResolvedSchema resolvedSchema =
> table.getResolvedSchema();
> System.out.println(resolvedSchema);
> /**
> * 打印如下:
> * (
> * `c` ROW<`EXPR$0` INT,
> `EXPR$1` INT NOT NULL
> * )
> *
> 问题一,通过使用row函数,我将两个列放入row类型中,那我如何配置row中字段的名称呢?,如下中的c1, c2:
> * (
> * `c` ROW<`c1` INT, `c2`
> INT NOT NULL
> * )
> */
>
>
> Table table1 =
> tableEnvironment.sqlQuery("select * from " + table);
> /**
> * 问题二,查询sql报错:
> * Exception in thread "main"
> java.lang.AssertionError: Conversion to relational algebra failed to
> preserve datatypes:
> * validated type:
> *
> RecordType(RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER EXPR$1) NOT
> NULL c) NOT NULL
> * converted type:
> * RecordType(RecordType(INTEGER EXPR$0,
> INTEGER EXPR$1) NOT NULL c) NOT NULL
> * rel:
> * LogicalProject(c=[ROW($0, $1)])
> *
> LogicalWatermarkAssigner(rowtime=[ts], watermark=[$3])
> * 
> LogicalProject(f_sequence=[$0], f_random=[$1], f_random_str=[$2],
> ts=[LOCALTIMESTAMP])
> *  
> LogicalTableScan(table=[[default_catalog, default_database, datagen]])
> */
> ResolvedSchema resolvedSchema1 =
> table1.getResolvedSchema();
> System.out.println(resolvedSchema1);
>
>
> table.execute().print();
>
>
>
>
>   }
>
>
> }


Re: k8s session模式SQLclient怎样连接

2021-07-22 文章 Caizhi Weng
Hi!

可以考虑把 k8s session 的 flink rest api 地址暴露出来,然后客户端把 execution.target 设为
remote,rest.address 和 rest.port 设为相应地址。

maker_d...@foxmail.com  于2021年7月22日周四 下午9:46写道:

> 大家好,
> 我将flink部署在k8s集群上,使用官方文档上的session模式进行部署,可以正常提交任务。
> 现在我想使用sqlclient,在提交任务时提示 :
> [ERROR] Could not execute SQL statement. Reason:
> java.net.UnknownHostException: flink-cluster
> 请问大家,如何使用sqlclient连接k8s上的flink session。
> flink版本 1.12.4.
>
>
>
> maker_d...@foxmail.com
>


Flink kafka自定义metrics在influxdb上解析失败

2021-07-22 文章 Jimmy Zhang
大家好,Flink版本1.13.1。
我在FlinkKafkaProducer的invork方法中,创建了一个counter,它采用sinkTableName+_receive_count作为最终的metric
 表名,并且采用influxDB作为上报终端,其中,sinkTableName是从catalog信息里拿到的,我从构造中一层层传递进去。

但是发现一个奇怪的问题:当我采用简单的形如insert into a select from b where 
b.c='d'的SQL语句时,influxDB中的表可以成功被建出来;
但如果加上UDF,比如
insert into a select 
CreateJsonOuter('tablename',CaptTimeSlewTime(Record.CAPTURE_TIME, 
'YEAR_,MONTH'),'data',CreateJsonInner(true)) from source where 
Record.datasource='xx'时,influxDB就会报这样的解析错误,不知道大佬们有没有遇见过,困扰一天了!任何建议都不胜感激!
org.influxdb.InfluxDBException$UnableToParseException: partial write: unable to 
parse 
'taskmanager_job_task_operator_xx_ODS_INFO_receive_count,host=xx,job_name= 
insert \ into\ ODS_INFO\ select \ CreateJsonOuter(' :missing fields
unable to parse ''tablename'\, 
'ODS_INFO'\,CaptTimeSlewTime(Record.CAPTURE_TIME\, 
'YEAR_,MONTH')\,'data'\,CreateJsonInner(true\,': missing fields)

|
Best,
Jimmy
|

Signature is customized by Netease Mail Master

Re: flink sql 依赖隔离

2021-07-22 文章 Jeff Zhang
Zeppelin 支持依赖的动态加载
https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2


Michael Ran  于2021年7月22日周四 下午8:07写道:

> 通过任务进行隔离引用呗。你们美团已经是k8s了吧?
> 在 2021-07-05 14:06:53,"silence"  写道:
> >请教大家目前flink sql有没有办法做到依赖隔离
> >比如connector,format,udf(这个最重要)等,
> >很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。
> >目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划
>


-- 
Best Regards

Jeff Zhang


k8s session模式SQLclient怎样连接

2021-07-22 文章 maker_d...@foxmail.com
大家好,
我将flink部署在k8s集群上,使用官方文档上的session模式进行部署,可以正常提交任务。
现在我想使用sqlclient,在提交任务时提示 :
[ERROR] Could not execute SQL statement. Reason:
java.net.UnknownHostException: flink-cluster
请问大家,如何使用sqlclient连接k8s上的flink session。
flink版本 1.12.4.



maker_d...@foxmail.com


flink 1.13.1 ????????row(a, b)??????????????

2021-07-22 文章 Asahi Lee
hi??
1. flink 1.13.1 
??row(a,b)bug??
2.  
rowrowrowname??name
??


package test;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ResolvedSchema;


public class DataGenTest {


  public static void main(String[] args) {
StreamExecutionEnvironment 
streamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.create(streamExecutionEnvironment);


tableEnvironment.executeSql("CREATE TABLE datagen 
(\n" +
" f_sequence INT,\n" +
" f_random INT,\n" +
" f_random_str 
STRING,\n" +
" ts AS 
localtimestamp,\n" +
" WATERMARK FOR ts AS 
ts\n" +
") WITH (\n" +
" 'connector' = 
'datagen',\n" +
" 
'rows-per-second'='5',\n" +
" 
'fields.f_sequence.kind'='sequence',\n" +
" 
'fields.f_sequence.start'='1',\n" +
" 
'fields.f_sequence.end'='1000',\n" +
" 
'fields.f_random.min'='1',\n" +
" 
'fields.f_random.max'='1000',\n" +
" 
'fields.f_random_str.length'='10'\n" +
")");


Table table = tableEnvironment.sqlQuery("select 
row(f_sequence, f_random) as c from datagen");
ResolvedSchema resolvedSchema = 
table.getResolvedSchema();
System.out.println(resolvedSchema);
/**
* ??
* (
* `c` ROW<`EXPR$0` INT, `EXPR$1` 
INT NOT NULL
* )
* 
rowrowrow??c1,
 c2??
* (
* `c` ROW<`c1` INT, `c2` INT 
NOT NULL
* )
*/


Table table1 = tableEnvironment.sqlQuery("select * 
from " + table);
/**
* sql??
* Exception in thread "main" 
java.lang.AssertionError: Conversion to relational algebra failed to preserve 
datatypes:
* validated type:
* 
RecordType(RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER EXPR$1) NOT NULL 
c) NOT NULL
* converted type:
* RecordType(RecordType(INTEGER EXPR$0, 
INTEGER EXPR$1) NOT NULL c) NOT NULL
* rel:
* LogicalProject(c=[ROW($0, $1)])
* 
LogicalWatermarkAssigner(rowtime=[ts], watermark=[$3])
*  
LogicalProject(f_sequence=[$0], f_random=[$1], f_random_str=[$2], 
ts=[LOCALTIMESTAMP])
*   
LogicalTableScan(table=[[default_catalog, default_database, datagen]])
*/
ResolvedSchema resolvedSchema1 = 
table1.getResolvedSchema();
System.out.println(resolvedSchema1);


table.execute().print();




  }


}

Re:flink sql 依赖隔离

2021-07-22 文章 Michael Ran
通过任务进行隔离引用呗。你们美团已经是k8s了吧?
在 2021-07-05 14:06:53,"silence"  写道:
>请教大家目前flink sql有没有办法做到依赖隔离
>比如connector,format,udf(这个最重要)等,
>很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。
>目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划


Re:Re: flink大窗口性能问题

2021-07-22 文章 Michael Ran
并行度改大,窗口时间小点呗
在 2021-07-15 11:52:12,"Wanghui (HiCampus)"  写道:
>并行度增大也可以吗?
>
>
>
>On 2021/07/15 02:45:18, "Michael Ran" mailto:g...@163.com>> 
>wrote:
>
>> 要么内存增大,或者并行增大,要么窗口改小,同时保留数据时间减少>
>
>> 在 2021-07-15 10:23:25,"Hui Wang" 
>> <46...@qq.com.INVALID> 写道:>
>
>> >flink大窗口缓存数据量过大导致jvm频烦full gc,并且处理速度极低,最终OOM,该如何调优>
>
>>
>
>>


Re: 请教union算子union多个source 流时的健壮性如何保证

2021-07-22 文章 Caizhi Weng
Hi!

“某几条 stream 异常挂掉后,而不影响其他流的 union”,指的是如果其中几个 stream 出现问题,则无视这些 stream
的输入,只处理正常 stream 的输入吗?
如果是的话,目前 Flink 应该暂时没有这样的功能。可以考虑写一个自己的 mq connector,当 mq 有异常发生时则让 source
不再产生数据,而不是抛出错误。

Fisher Xiang  于2021年7月21日周三 下午11:14写道:

>
> 请问大家在使用 union算子union多个 stream时,比如 stream1.union(stream2, stream3, … stream
> n) ,其中1到n分别来自不同的MQ 集群MQ1, MQ2… MQ n, 当其中几个集群挂掉时,
> 整个flink 应用都会重启,那么该场景下怎么可以做到 某几条stream 异常挂掉后,而不影响其他流的 union,让整个 flink继续运行呢?
>
> [image: image.png]
>
> BR
> Fisher
>