Flink sql 流批一体的应用

2021-01-05 文章 Jacob
Hi all

现有一个场景:

消费kafka消息,逐条处理加工每条kafka数据,每隔15分钟将不同数据写进hive表(多张表)
之后,对上面的多张表进行一系列join merge等操作写到新表,生成最终的数据。


这样的场景如果用Flink去处理,是不是需要启动两个flink job,一个处理流数据,一个处理批数据
因为两个执行环境不一样
流处理:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
批处理:
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

有没有可能让这两部分合二为一呢,放在同一个job执行?



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re:回复: flink sql消费kafka sink到mysql问题

2021-01-05 文章 air23
发现是flink sql 消费kafka 不管有没有解析成功。先去提交offset到kafka 但是实际 是解析失败了。
在 2021-01-06 14:01:34,"Evan"  写道:
>flinksql 貌似是目前做不到你说的这样
>
>
>
> 
>发件人: air23
>发送时间: 2021-01-06 12:29
>收件人: user-zh
>主题: flink sql消费kafka sink到mysql问题
>你好。我这边在跑任务时候 发现使用flink sql消费kafka如果报错了
>然后再重启 发现报错的数据 会丢失
>采用的scan.startup.mode' = 'group-offsets'
>按理说 不是要重新消费 失败的那条数据 开始消费吗?
>请问如何配置 可以不丢失数据
> 
> 
>CREATE TABLE source1 (
>id BIGINT   ,
>username STRING ,
>password STRING  ,
>AddTime TIMESTAMP  ,
>origin_table STRING METADATA FROM 'value.table' VIRTUAL,
>origin_sql_type MAP METADATA FROM 'value.sql-type' VIRTUAL
>) WITH (
>'connector' = 'kafka',
>'topic' = 'plink_canal',
>'properties.bootstrap.servers' = '***',
>'properties.group.id' = 'canal1',
>'scan.startup.mode' = 'group-offsets',
>'canal-json.table.include' = 'test.*',
>-- 'canal-json.ignore-parse-errors' = 'true', -- 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 
>false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
>'format' = 'canal-json'
>);


hive表已创建的情况下从checkpoint恢复作业

2021-01-05 文章 amen...@163.com
hi everyone,

flink version: 1.12.0
job dag: kafka ->hive

今天碰到一个问题,我在第一次启动作业的时候,通过hivecatalog成功在hive中创建hive 
table,并正常入数,正常做checkpoint,但由于kafka含有脏数据,导致作业在重启3次仍无法跳过脏数据后变为Failed状态,于是修改作业kafka配置,开启可跳过解析异常行参数,再通过-s
 
hdfs:///xxx/checkpoints/chk-122去从checkpoint恢复作业时,首先报出来的异常是以前的kafka和hive表已经在指定catalog.database中创建,也确实是如此,但是我的疑问是:

1.任务失败后我调整作业重新从最近的chk恢复上线作业,按理来说chk state应该记录了我的表创建信息,从而不会再重新在hive中建表,但显然并没有如此
2.从错误情况来看,是先创建表,再从chk中恢复作业状态,那hive表已创建的异常 Caused by: 
AlreadyExistsException(message:Table kafka_source_table already exists) 该如何避免呢?

best,
amenhub





Flink SQL 如何保证多个sql 语句按顺序执行

2021-01-05 文章 Jacob
Dear All,在Flink SQL
job中,如果有多个sql语句,需要按顺序执行,即下一个sql的执行依赖上一个sql的执行结果。由于tableEnv.executeSql(sql)是*异步*提交的,那么如何保证多个sql是*顺序执行*?eg:在一个main函数中,有如下代码:String
sql1 = "";tableEnv.executeSql(sql1 );String sql2 =
"";tableEnv.executeSql(sql2 );问题:如何保证sql1先执行完成,再执行sql2



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复: flink sql消费kafka sink到mysql问题

2021-01-05 文章 Evan
flinksql 貌似是目前做不到你说的这样



 
发件人: air23
发送时间: 2021-01-06 12:29
收件人: user-zh
主题: flink sql消费kafka sink到mysql问题
你好。我这边在跑任务时候 发现使用flink sql消费kafka如果报错了
然后再重启 发现报错的数据 会丢失
采用的scan.startup.mode' = 'group-offsets'
按理说 不是要重新消费 失败的那条数据 开始消费吗?
请问如何配置 可以不丢失数据
 
 
CREATE TABLE source1 (
id BIGINT   ,
username STRING ,
password STRING  ,
AddTime TIMESTAMP  ,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
origin_sql_type MAP METADATA FROM 'value.sql-type' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'plink_canal',
'properties.bootstrap.servers' = '***',
'properties.group.id' = 'canal1',
'scan.startup.mode' = 'group-offsets',
'canal-json.table.include' = 'test.*',
-- 'canal-json.ignore-parse-errors' = 'true', -- 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 
false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
'format' = 'canal-json'
);


??????????flink-sql????????????????????????????State????

2021-01-05 文章 ??????
flink??flink-on-yarn??jobTimeStampcurrent_dateenv.setStateBackend(new
 
MemoryStateBackend())??job??State??connector??upsert-kafka??dwd??sqlsqldwdupsert-kafka
|  select
|   TO_DATE(cast(doi.DeliveryTime as String),'-MM-dd') 
as  days,
|   doi.UserId,
|   count(doi.Code) as   SendTime,
|   sum(doi.PayAmount / 100) as SendCashcharge,
|   sum(doi.PayAmount / 100 - ChargeAmount / 100 + 
UseBalance / 100) as  SendCashuse,
|   sum(doi.CashMoney / 100)as  SendCash
|from dwd_order_info doi
|where doi.DeliveryTime cast(current_date AS TIMESTAMP) 
and doi.OrderType = 29 and doi.Status = 50 and doi.Status < 60
|group by TO_DATE(cast(doi.DeliveryTime as 
String),'-MM-dd'), doi.UserId

自定义InputFormat在发生异常时状态如何保存

2021-01-05 文章 automths
Hi:
我自定义一个InputFormat,在处理数据的过程中,发生异常,我想要将发生异常的上一个状态保存下来,以便于在问题修复后重启时能接着已经保存的状态点继续处理对应split剩余的数据,但是我又不需要像checkpoint那样,每个隔一段时间保存一下状态。这样的需求该怎么就现有的flink去实现呢?
我用的flink版本:1.12.0




祝好!
automths

Re: Flink SQL>查询的hive表数据全部为NULL

2021-01-05 文章 Jacob
谢谢回复

这个问题困扰了很久
已经解决
原因是写orc时候指定的字段名是column0、column1.、column33
而hive创建表的字段是实际字段的名字,两个不匹配,因此在flink sql中读不到
数据



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的

2021-01-05 文章 Carmen Free
好的,非常感谢。

赵一旦  于2021年1月6日周三 下午1:08写道:

> 这个的话去看看KafkaConnector相关的参数,比较新的版本支持配置解析错误忽略。
>
> Carmen Free  于2021年1月6日周三 上午10:58写道:
>
> > 感谢帮忙解决问题,确实包的路径有问题,换成这个包就解决了这个问题。
> >
> > 紧接着我这边出现了新的异常
> >
> >
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
> > No content to map due to end-of-input at [Source:UNKONWN; line: -1,
> column:
> > -1;]
> >
> > 这个问题的原因,主要是由于kafka消息为空导致的,只要kafka消息不为空,就可以正常消费。
> >
> > 但是如果遇到了kafka消息为空的情况,这边不能处理吗?
> >
> > 赵一旦  于2021年1月5日周二 下午9:18写道:
> >
> > > 我感觉还是jar的问题。如下尝试下,我懒得去试了。
> > > 将 org.apache.kafka.common.security.plain.PlainLoginModule 替换为
> > > org.apache.flink.kafka.shaded.org.apache.kafka.common.securi
> > > ty.plain.PlainLoginModule
> > >
> > > 因为你用的是sql-connector-kafka,这个包把kafka-clients的包shade了。
> > >
> > > Carmen Free  于2021年1月5日周二 下午5:09写道:
> > >
> > > > flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的
> > > >
> > > > 1、版本说明
> > > > flink版本:1.10.2
> > > > kafka版本:1.1.0
> > > >
> > > > 2、kafka鉴权说明
> > > > 仅使用了sasl鉴权方式
> > > > 在kafka客户端有配置 kafka_server-jass.conf、
> > > > server.properties、producer.properties、consumer.properties
> > > >
> > > > 3、主要配置参数
> > > > sasl.mechanism=PLAIN
> > > > security.protocol=SASL_PLAINTEXT
> > > >
> sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule
> > > > required username="xx" password="xx-secret";
> > > > 当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。
> > > >
> > > > 4、用于flink SQL连接的jar包
> > > > flink-sql-connector-kafka_2.11-1.10.2.jar
> > > > flink-jdbc_2.11-1.10.2.jar
> > > > flink-csv-1.10.2-sql-jar.jar
> > > >
> > > >
> > > > 5、我的思路
> > > > 类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka
> > > > table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。
> > > >
> > > > 6、启动客户端
> > > > ./bin/sql-client.sh embedded -l sql_lib/
> > > > 其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包
> > > >
> > > >
> > > > 7、建表语句:
> > > > create table test_hello (
> > > > name string
> > > > ) with (
> > > > ...
> > > > ...
> > > > 'connector.properties.sasl.mechanism' = 'PLAIN',
> > > > 'connector.properties.security.protocol' = 'SASL_PLAINTEXT',
> > > > 'connector.properties.sasl.jaas.config' =
> > > > 'org.apache.kafka.comon.security.plain.PlainLoginModule required
> > > > username="xx" password="xx-secret";',
> > > > 'format.type' = 'csv'
> > > > );
> > > >
> > > > 建表没有问题,可以正常建表。
> > > >
> > > > 查询表的时候,就会报错,select * from test_hello;
> > > > 报错如下:
> > > > could not execute sql statement. Reason:
> > > > javax.security.auth.login.loginException: unable to find loginModule
> > > class:
> > > > org.apache.kafka.common.security.plain.PlainLoginModule
> > > > 但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因?
> > > >
> > > > kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。
> > > >
> > >
> >
>


Re: flink timestamp 解析问题

2021-01-05 文章 赵一旦
可以看下文档去,配置忽略解析错误。

air23  于2021年1月6日周三 上午10:41写道:

> 你好 这边使用flink sql有如下问题;
>
>
>
>
>
>
> CREATE TABLE source1 (
> id BIGINT   ,
> username STRING ,
> password STRING  ,
> AddTime TIMESTAMP  ,
> origin_table STRING METADATA FROM 'value.table' VIRTUAL
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'plink_canal',
> 'properties.bootstrap.servers' = '',
> 'properties.group.id' = 'canal1',
> 'scan.startup.mode' = 'group-offsets',
> 'canal-json.table.include' = 'test.*',
> 'format' = 'canal-json'
> );
>
>
>
> 当binlog 数据为非法日期时候。会出现如下报错
> Caused by: java.time.format.DateTimeParseException: Text '-00-00
> 00:00:00' could not be parsed: Invalid value for MonthOfYear (valid values
> 1 - 12): 0
> 这个有办法解决吗。我们上游业务库 数据 就会有这样的非法日期,
> 谢谢回答
>
>
>
>


Re: flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的

2021-01-05 文章 赵一旦
这个的话去看看KafkaConnector相关的参数,比较新的版本支持配置解析错误忽略。

Carmen Free  于2021年1月6日周三 上午10:58写道:

> 感谢帮忙解决问题,确实包的路径有问题,换成这个包就解决了这个问题。
>
> 紧接着我这边出现了新的异常
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
> No content to map due to end-of-input at [Source:UNKONWN; line: -1, column:
> -1;]
>
> 这个问题的原因,主要是由于kafka消息为空导致的,只要kafka消息不为空,就可以正常消费。
>
> 但是如果遇到了kafka消息为空的情况,这边不能处理吗?
>
> 赵一旦  于2021年1月5日周二 下午9:18写道:
>
> > 我感觉还是jar的问题。如下尝试下,我懒得去试了。
> > 将 org.apache.kafka.common.security.plain.PlainLoginModule 替换为
> > org.apache.flink.kafka.shaded.org.apache.kafka.common.securi
> > ty.plain.PlainLoginModule
> >
> > 因为你用的是sql-connector-kafka,这个包把kafka-clients的包shade了。
> >
> > Carmen Free  于2021年1月5日周二 下午5:09写道:
> >
> > > flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的
> > >
> > > 1、版本说明
> > > flink版本:1.10.2
> > > kafka版本:1.1.0
> > >
> > > 2、kafka鉴权说明
> > > 仅使用了sasl鉴权方式
> > > 在kafka客户端有配置 kafka_server-jass.conf、
> > > server.properties、producer.properties、consumer.properties
> > >
> > > 3、主要配置参数
> > > sasl.mechanism=PLAIN
> > > security.protocol=SASL_PLAINTEXT
> > > sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule
> > > required username="xx" password="xx-secret";
> > > 当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。
> > >
> > > 4、用于flink SQL连接的jar包
> > > flink-sql-connector-kafka_2.11-1.10.2.jar
> > > flink-jdbc_2.11-1.10.2.jar
> > > flink-csv-1.10.2-sql-jar.jar
> > >
> > >
> > > 5、我的思路
> > > 类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka
> > > table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。
> > >
> > > 6、启动客户端
> > > ./bin/sql-client.sh embedded -l sql_lib/
> > > 其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包
> > >
> > >
> > > 7、建表语句:
> > > create table test_hello (
> > > name string
> > > ) with (
> > > ...
> > > ...
> > > 'connector.properties.sasl.mechanism' = 'PLAIN',
> > > 'connector.properties.security.protocol' = 'SASL_PLAINTEXT',
> > > 'connector.properties.sasl.jaas.config' =
> > > 'org.apache.kafka.comon.security.plain.PlainLoginModule required
> > > username="xx" password="xx-secret";',
> > > 'format.type' = 'csv'
> > > );
> > >
> > > 建表没有问题,可以正常建表。
> > >
> > > 查询表的时候,就会报错,select * from test_hello;
> > > 报错如下:
> > > could not execute sql statement. Reason:
> > > javax.security.auth.login.loginException: unable to find loginModule
> > class:
> > > org.apache.kafka.common.security.plain.PlainLoginModule
> > > 但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因?
> > >
> > > kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。
> > >
> >
>


Re: 修改flink的任务调度

2021-01-05 文章 赵一旦
我不是很清楚,不过难度应该很大,不然社区早改了。当前任务经常导致机器资源不均衡,这个问题很常见。

penguin.  于2021年1月6日周三 上午11:15写道:

> Hi,请问大家知道怎么更改flink默认的任务调度方式吗?


flink sql消费kafka sink到mysql问题

2021-01-05 文章 air23
你好。我这边在跑任务时候 发现使用flink sql消费kafka如果报错了
然后再重启 发现报错的数据 会丢失
采用的scan.startup.mode' = 'group-offsets'
按理说 不是要重新消费 失败的那条数据 开始消费吗?
请问如何配置 可以不丢失数据


CREATE TABLE source1 (
id BIGINT   ,
username STRING ,
password STRING  ,
AddTime TIMESTAMP  ,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
origin_sql_type MAP METADATA FROM 'value.sql-type' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'plink_canal',
'properties.bootstrap.servers' = '***',
'properties.group.id' = 'canal1',
'scan.startup.mode' = 'group-offsets',
'canal-json.table.include' = 'test.*',
-- 'canal-json.ignore-parse-errors' = 'true', -- 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 
false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
'format' = 'canal-json'
);

Re: PyFlink on Yarn, Per-Job模式,如何增加多个外部依赖jar包?

2021-01-05 文章 Zhizhao Shangguan
HI Wei Zhong,

  感谢您的回复!

  发现是软链的问题(lib目录下的jar包不能用软链),去掉后就可以了。
  


在 2021/1/6 上午11:06,“Wei 
Zhong” 写入:

Hi Zhizhao,

能检查一下'file://' 后面跟的是绝对路径吗?这个报错是因为对应的路径在本地磁盘上找不到导致的。

> 在 2021年1月6日,10:23,Zhizhao Shangguan  写道:
> 
> Hi:
>   PyFlink on Yarn, 
Per-Job模式,如何增加多个外部依赖jar包?比如flink-sql-connector-kafka、flink-connector-jdbc等。
>  
>  环境信息
>  Flink 版本:1.11.0
>  Os: mac
>  
>  尝试了如下方案,遇到了一些问题
> 1、  按照官网cli说明[1]:-j 可以指定jar包,但只能添加一个,后面在加-j不生效。
> 2、 按照依赖管理说明[2]:使用pipeline.jars,会报找不到文件的错误
> 配置信息
> t_env.get_config().get_configuration().set_string("pipeline.jars", 
"file:///path/flink-sql-connector-kafka_2.11-1.11.0.jar;file:///path/flink-connector-jdbc_2.11-1.11.0.jar;file:///path/mysql-connector-java-5.1.38.jar
 
")
>  
> 启动命令
> # flink run -m yarn-cluster -pyarch venv.zip -pyexec 
venv.zip/venv/bin/Python -py StreamingKafkaToMysql.py
>  
> 错误信息
> 
>  
> [1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html 

> [2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/dependency_management.html
 






修改flink的任务调度

2021-01-05 文章 penguin.
Hi,请问大家知道怎么更改flink默认的任务调度方式吗?

Re: PyFlink on Yarn, Per-Job模式,如何增加多个外部依赖jar包?

2021-01-05 文章 Wei Zhong
Hi Zhizhao,

能检查一下'file://' 后面跟的是绝对路径吗?这个报错是因为对应的路径在本地磁盘上找不到导致的。

> 在 2021年1月6日,10:23,Zhizhao Shangguan  写道:
> 
> Hi:
>   PyFlink on Yarn, 
> Per-Job模式,如何增加多个外部依赖jar包?比如flink-sql-connector-kafka、flink-connector-jdbc等。
>  
>  环境信息
>  Flink 版本:1.11.0
>  Os: mac
>  
>  尝试了如下方案,遇到了一些问题
> 1、  按照官网cli说明[1]:-j 可以指定jar包,但只能添加一个,后面在加-j不生效。
> 2、 按照依赖管理说明[2]:使用pipeline.jars,会报找不到文件的错误
> 配置信息
> t_env.get_config().get_configuration().set_string("pipeline.jars", 
> "file:///path/flink-sql-connector-kafka_2.11-1.11.0.jar;file:///path/flink-connector-jdbc_2.11-1.11.0.jar;file:///path/mysql-connector-java-5.1.38.jar
>  
> ")
>  
> 启动命令
> # flink run -m yarn-cluster -pyarch venv.zip -pyexec venv.zip/venv/bin/Python 
> -py StreamingKafkaToMysql.py
>  
> 错误信息
> 
>  
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html 
> 
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/dependency_management.html
>  
> 


Re: flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的

2021-01-05 文章 Carmen Free
感谢帮忙解决问题,确实包的路径有问题,换成这个包就解决了这个问题。

紧接着我这边出现了新的异常
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
No content to map due to end-of-input at [Source:UNKONWN; line: -1, column:
-1;]

这个问题的原因,主要是由于kafka消息为空导致的,只要kafka消息不为空,就可以正常消费。

但是如果遇到了kafka消息为空的情况,这边不能处理吗?

赵一旦  于2021年1月5日周二 下午9:18写道:

> 我感觉还是jar的问题。如下尝试下,我懒得去试了。
> 将 org.apache.kafka.common.security.plain.PlainLoginModule 替换为
> org.apache.flink.kafka.shaded.org.apache.kafka.common.securi
> ty.plain.PlainLoginModule
>
> 因为你用的是sql-connector-kafka,这个包把kafka-clients的包shade了。
>
> Carmen Free  于2021年1月5日周二 下午5:09写道:
>
> > flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的
> >
> > 1、版本说明
> > flink版本:1.10.2
> > kafka版本:1.1.0
> >
> > 2、kafka鉴权说明
> > 仅使用了sasl鉴权方式
> > 在kafka客户端有配置 kafka_server-jass.conf、
> > server.properties、producer.properties、consumer.properties
> >
> > 3、主要配置参数
> > sasl.mechanism=PLAIN
> > security.protocol=SASL_PLAINTEXT
> > sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule
> > required username="xx" password="xx-secret";
> > 当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。
> >
> > 4、用于flink SQL连接的jar包
> > flink-sql-connector-kafka_2.11-1.10.2.jar
> > flink-jdbc_2.11-1.10.2.jar
> > flink-csv-1.10.2-sql-jar.jar
> >
> >
> > 5、我的思路
> > 类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka
> > table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。
> >
> > 6、启动客户端
> > ./bin/sql-client.sh embedded -l sql_lib/
> > 其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包
> >
> >
> > 7、建表语句:
> > create table test_hello (
> > name string
> > ) with (
> > ...
> > ...
> > 'connector.properties.sasl.mechanism' = 'PLAIN',
> > 'connector.properties.security.protocol' = 'SASL_PLAINTEXT',
> > 'connector.properties.sasl.jaas.config' =
> > 'org.apache.kafka.comon.security.plain.PlainLoginModule required
> > username="xx" password="xx-secret";',
> > 'format.type' = 'csv'
> > );
> >
> > 建表没有问题,可以正常建表。
> >
> > 查询表的时候,就会报错,select * from test_hello;
> > 报错如下:
> > could not execute sql statement. Reason:
> > javax.security.auth.login.loginException: unable to find loginModule
> class:
> > org.apache.kafka.common.security.plain.PlainLoginModule
> > 但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因?
> >
> > kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。
> >
>


来自penguin.的邮件

2021-01-05 文章 penguin.
hello,请问大家知道怎么更改flink默认的任务调度方式吗?

flink任务调度

2021-01-05 文章 penguin.
hello,请问大家知道怎么更改flink默认的任务调度方式吗?


flink timestamp 解析问题

2021-01-05 文章 air23
你好 这边使用flink sql有如下问题;






CREATE TABLE source1 (
id BIGINT   ,
username STRING ,
password STRING  ,
AddTime TIMESTAMP  ,
origin_table STRING METADATA FROM 'value.table' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'plink_canal',
'properties.bootstrap.servers' = '',
'properties.group.id' = 'canal1',
'scan.startup.mode' = 'group-offsets',
'canal-json.table.include' = 'test.*',
'format' = 'canal-json'
);



当binlog 数据为非法日期时候。会出现如下报错
Caused by: java.time.format.DateTimeParseException: Text '-00-00 00:00:00' 
could not be parsed: Invalid value for MonthOfYear (valid values 1 - 12): 0
这个有办法解决吗。我们上游业务库 数据 就会有这样的非法日期,
谢谢回答





Re: checkpoint失败怎么排查

2021-01-05 文章 赵一旦
那为什么没有日志呢,去机器看日志呗。

 于2021年1月6日周三 上午10:11写道:

> 应该是状态大,超时设了10分钟,还没有达到超时时间。到处找不到相关日志。
>
> 发自我的iPhone
>
> > 在 2021年1月6日,10:03,赵一旦  写道:
> >
> > 没日志咋排查,为啥失败总得说下。超时的话可能就是任务压力大,状态大等。
> >
> >  于2021年1月6日周三 上午9:53写道:
> >
> >> flink 1.11.2环境下,三个kafka topic 的数据进行join,出现checkpoint失败,没有日志,请问怎么排查?
> >>
> >> 发自我的iPhone
>


Re: 请教Flink中关于窗口的问题

2021-01-05 文章 赵一旦
你这个方法就可以的哈,至于第二个窗口又聚到一个结点的问题本身就是原始问题,基于你的方法缓解即可,第二层不可避免的。
你需要做的是调整合理的参数,使得第二层的数据虽然不均衡,但数据量以及足够低就可以了。
此外,还需要注意,当前key数量假设1w,加10随机就是10w,加100随机就是100w。这个key的膨胀也很严重的。最好的做法是仅针对高数据量的key分拆。

syumialiu  于2021年1月5日周二 下午11:53写道:

>
> 我在一个job中有一些很大的数据(key的种类很少,但是单个key下的数据数量很多),基本要实现的是一个时间滑动窗口结束时,当某个key的数量大于一个固定值后,将该key下的所有原数据输出。我现在的方法是将key加后缀,然后keyBy做窗口,但是这个做完之后还是需要再次keyBy把数据还原回去,并且这个过程又将全量数据拉到了一个节点上,请问有没有一些别的解决方法?
>
>
> | |
> syumialiu
> |
> |
> syumia...@163.com
> |
> 签名由网易邮箱大师定制


Re: 回复: flinksql1.11 查询phoenix维表报错Caused by: org.apache.calcite.avatica.NoSuchStatementException

2021-01-05 文章 hoose
@chengyanan1...@foxmail.com 你那里也遇过吗,这个要是bug,也不会修复呀



--
Sent from: http://apache-flink.147419.n8.nabble.com/


hive模块依赖orc版本与flink-orc版本不一致问题

2021-01-05 文章 奔跑的小飞袁
hello
 
目前我碰到一个问题,当我同时使用flink-orc_2.11-1.11.1.jar与flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar时发现针对orc这种数据格式所依赖的版本不同,我们hive版本是cdh
1.1.0,官网看到依赖的orc版本是1.4.3并且无需orc-shims这个依赖,但是flink-orc这个模块需要同时依赖orc-core
1.5.6与orc-shims 1.5.6,这两个模块如何同时使用



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: checkpoint失败怎么排查

2021-01-05 文章 abc15606
应该是状态大,超时设了10分钟,还没有达到超时时间。到处找不到相关日志。

发自我的iPhone

> 在 2021年1月6日,10:03,赵一旦  写道:
> 
> 没日志咋排查,为啥失败总得说下。超时的话可能就是任务压力大,状态大等。
> 
>  于2021年1月6日周三 上午9:53写道:
> 
>> flink 1.11.2环境下,三个kafka topic 的数据进行join,出现checkpoint失败,没有日志,请问怎么排查?
>> 
>> 发自我的iPhone


Re: checkpoint失败怎么排查

2021-01-05 文章 赵一旦
没日志咋排查,为啥失败总得说下。超时的话可能就是任务压力大,状态大等。

 于2021年1月6日周三 上午9:53写道:

> flink 1.11.2环境下,三个kafka topic 的数据进行join,出现checkpoint失败,没有日志,请问怎么排查?
>
> 发自我的iPhone


Re: flink cpu 利用率

2021-01-05 文章 LakeShen
看下 Flink 任务运行,是否是其他机器上的资源先达到瓶颈,而不是 CPU,比如 IO,同时看下你的 flatmap 处理单条记录的时间。
同时也参考上面同学的,是否存在反压,如果 flatmap 逻辑比较复杂,也有这个可能。

Best,
LakeShen

赵一旦  于2021年1月5日周二 下午9:13写道:

>
> 可以看看是否反压。反压说明并行度还是不够,不反压的话看处理速度是否符合预期。符合预期就不用调了,说明你的任务不复杂,那点cpu占用就够了。如果不符合预期,也没有任何反压,那就是source消费速度太慢。
>
>
>
> housezhang  于2021年1月5日周二 下午5:44写道:
>
> > 有可能是cpu能够处理得过来,网络io处理不过来了,看看网络使用情况
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>


?????? crontab????????????flink-job????,flink-sql-parquet_2.11-1.12.0.jar does not exist

2021-01-05 文章 ??????
thank you


----
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html
gt; <
gt; 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.htmlgt;;
gt;
gt;
gt;
gt;
gt; --
gt; Sent from: http://apache-flink.147419.n8.nabble.com/
gt;

checkpoint失败怎么排查

2021-01-05 文章 abc15606
flink 1.11.2环境下,三个kafka topic 的数据进行join,出现checkpoint失败,没有日志,请问怎么排查?

发自我的iPhone

?????? crontab????????????flink-job????,flink-sql-parquet_2.11-1.12.0.jar does not exist

2021-01-05 文章 ????
  /etc/profile




export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera/
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar


export 
ZOOKEEPER_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/zookeeper
export 
HADOOP_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop
export 
YARN_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop-yarn
export HIVE_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive
export 
HBASE_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hbase
export 
SPARK_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/spark
export 
OOZIE_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/oozie
export 
SQOOP_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/sqoop
export 
KAFKA_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/kafka
export 
IMPALA_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/impala
export HADOOP_CONF_DIR=/etc/hadoop/conf
export 
HADOOP_COMMON_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop
export 
HADOOP_HDFS_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop-hdfs
export 
HADOOP_MAPRED_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/bin


export HADOOP_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"


export 
PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$HADOOP_HOME/bin:$YARN_HOME/bin:HADOOP_MAPRED_HOME
export 
PATH=$PATH:$HBASE_HOME/bin:$HIVE_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin
export 
PATH=$PATH:$OOZIE_HOME/bin:$IMPALA_HOME/bin:$IMPALA_HOME/sbin:$SQOOP_HOME/bin:$KAFKA_HOME/bin


export  PHOENIX_HOME=/opt/apache-phoenix-5.0.0-HBase-2.0-bin/
export PHOENIX_CLASSPATH=$PHOENIX_HOME
export PATH=$PATH:$PHOENIX_HOME/bin


#flink
export YARN_CONF_DIR=/etc/hadoop/conf
export FLINK_HOME=/opt/flink-1.12.0
export PATH=$PATH:$FLINK_HOME/bin
export HADOOP_CLASSPATH=`hadoop classpath`




| |

|
|
liuha...@163.com
|
??
??2021??1??6?? 09:32<25977...@qq.com> ??
-??crontabflink-jobazkaban??



----
??: "zhisheng"https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html
 <
 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html;;




 --
 Sent from: http://apache-flink.147419.n8.nabble.com/


????: flinksql1.11 ????phoenix????????Caused by: org.apache.calcite.avatica.NoSuchStatementException

2021-01-05 文章 Evan
??bug



 
 
?? 2021-01-05 20:20
 user-zh
?? flinksql1.11 phoenixCaused by: 
org.apache.calcite.avatica.NoSuchStatementException
??
flinkv1.11phoneix 1.14.1

CREATE TABLE pe_login_kafka (
 id INT,
region_id INT,
 ts TIMESTAMP(3),
 proc_time AS PROCTIME()
) WITH (
 ??connector?? = ??kafka??,
 ??topic?? = ??t-region,
 ??properties.bootstrap.servers?? = ????,
 ??properties.group.id?? = gid??);
CREATE TABLE region_dim(
  id INT,
  region_name STRING
 ) WITH (
 ??connector?? = ??jdbc??,
  ??url?? = 
??jdbc:phoenix:thin:url=http://172.168.1.15:8765;serialization=PROTOBUF??,
  ??table-name?? = ph_region??,
  ??lookup.cache.max-rows?? = ??5000??,
  ??lookup.cache.ttl?? = ??600s??,
  ??lookup.max-retries?? = ??3??);
 
--sink
INSERT INTO 
FROM pe_login_kafka k LEFT JOIN region_dim FOR SYSTEM_TIME AS OF 
k.proc_time AS u
ON k.region_id = u.id;
??,
Caused by: java.sql.SQLException
at org.apache.calcite.avatica.Helper.createException(Helper.java:56) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at org.apache.calcite.avatica.Helper.createException(Helper.java:41) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:152)
 ~[flink-connector-jdbc_2.11-1.11.1.jar:1.11.1]
... 18 more
 
 
 
Caused by: org.apache.calcite.avatica.NoSuchStatementException
at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:349) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:343) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.AvaticaConnection.invokeWithRetries(AvaticaConnection.java:793)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at org.apache.calcite.avatica.remote.RemoteMeta.execute(RemoteMeta.java:342) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:548)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
... 20 more
2021-01-05 19:40:07,469 ERROR 
org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction [] - JDBC 
executeBatch error, retry times = 2
java.sql.SQLException: null
at org.apache.calcite.avatica.Helper.createException(Helper.java:56) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at org.apache.calcite.avatica.Helper.createException(Helper.java:41) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:152)
 [flink-connector-jdbc_2.11-1.11.1.jar:1.11.1]
 
 
 
??jdbc??org.apache.calcite.avatica.AvaticaConnection??connection??Statement
??
 
 



?????? crontab????????????flink-job????,flink-sql-parquet_2.11-1.12.0.jar does not exist

2021-01-05 文章 ??????
$HADOOP_CLASSPATH??/home/xjia/opt/module/hadoop3.2.1/lib/native??




----
??: "zhisheng"https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html
 <
 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html;;




 --
 Sent from: http://apache-flink.147419.n8.nabble.com/


?????? crontab????????????flink-job????,flink-sql-parquet_2.11-1.12.0.jar does not exist

2021-01-05 文章 ??????
-??crontabflink-jobazkaban??



----
??: "zhisheng"https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html
 <
 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html;;




 --
 Sent from: http://apache-flink.147419.n8.nabble.com/


请教Flink中关于窗口的问题

2021-01-05 文章 syumialiu
我在一个job中有一些很大的数据(key的种类很少,但是单个key下的数据数量很多),基本要实现的是一个时间滑动窗口结束时,当某个key的数量大于一个固定值后,将该key下的所有原数据输出。我现在的方法是将key加后缀,然后keyBy做窗口,但是这个做完之后还是需要再次keyBy把数据还原回去,并且这个过程又将全量数据拉到了一个节点上,请问有没有一些别的解决方法?


| |
syumialiu
|
|
syumia...@163.com
|
签名由网易邮箱大师定制

请教Flink中关于窗口的问题

2021-01-05 文章 syumialiu
我在一个job中有一些很大的数据(key的种类很少,但是单个key下的数据数量很多),基本要实现的是一个时间滑动窗口结束时,当某个key的数量大于一个固定值后,将该key下的所有原数据输出。我现在的方法是将key加后缀,然后keyBy做窗口,但是这个做完之后还是需要再次keyBy把数据还原回去,并且这个过程又将全量数据拉到了一个节点上,请问有没有一些别的解决方法?


| |
syumialiu
|
|
syumia...@163.com
|
签名由网易邮箱大师定制

Re: flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的

2021-01-05 文章 赵一旦
我感觉还是jar的问题。如下尝试下,我懒得去试了。
将 org.apache.kafka.common.security.plain.PlainLoginModule 替换为
org.apache.flink.kafka.shaded.org.apache.kafka.common.securi
ty.plain.PlainLoginModule

因为你用的是sql-connector-kafka,这个包把kafka-clients的包shade了。

Carmen Free  于2021年1月5日周二 下午5:09写道:

> flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的
>
> 1、版本说明
> flink版本:1.10.2
> kafka版本:1.1.0
>
> 2、kafka鉴权说明
> 仅使用了sasl鉴权方式
> 在kafka客户端有配置 kafka_server-jass.conf、
> server.properties、producer.properties、consumer.properties
>
> 3、主要配置参数
> sasl.mechanism=PLAIN
> security.protocol=SASL_PLAINTEXT
> sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule
> required username="xx" password="xx-secret";
> 当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。
>
> 4、用于flink SQL连接的jar包
> flink-sql-connector-kafka_2.11-1.10.2.jar
> flink-jdbc_2.11-1.10.2.jar
> flink-csv-1.10.2-sql-jar.jar
>
>
> 5、我的思路
> 类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka
> table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。
>
> 6、启动客户端
> ./bin/sql-client.sh embedded -l sql_lib/
> 其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包
>
>
> 7、建表语句:
> create table test_hello (
> name string
> ) with (
> ...
> ...
> 'connector.properties.sasl.mechanism' = 'PLAIN',
> 'connector.properties.security.protocol' = 'SASL_PLAINTEXT',
> 'connector.properties.sasl.jaas.config' =
> 'org.apache.kafka.comon.security.plain.PlainLoginModule required
> username="xx" password="xx-secret";',
> 'format.type' = 'csv'
> );
>
> 建表没有问题,可以正常建表。
>
> 查询表的时候,就会报错,select * from test_hello;
> 报错如下:
> could not execute sql statement. Reason:
> javax.security.auth.login.loginException: unable to find loginModule class:
> org.apache.kafka.common.security.plain.PlainLoginModule
> 但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因?
>
> kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。
>


Re: flink cpu 利用率

2021-01-05 文章 赵一旦
可以看看是否反压。反压说明并行度还是不够,不反压的话看处理速度是否符合预期。符合预期就不用调了,说明你的任务不复杂,那点cpu占用就够了。如果不符合预期,也没有任何反压,那就是source消费速度太慢。



housezhang  于2021年1月5日周二 下午5:44写道:

> 有可能是cpu能够处理得过来,网络io处理不过来了,看看网络使用情况
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 空指针警告

2021-01-05 文章 lp
好的,谢谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flinksql1.11 ????phoenix????????Caused by: org.apache.calcite.avatica.NoSuchStatementException

2021-01-05 文章 ????
??
flinkv1.11phoneix 1.14.1
CREATE TABLE pe_login_kafka (
id INT,
region_id INT,
ts TIMESTAMP(3),
proc_time AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 't-region,
'properties.bootstrap.servers' = '',
'properties.group.id' = gid');



CREATE TABLE region_dim(
 id INT,
 region_name STRING
) WITH (
'connector' = 'jdbc',
 'url' = 
'jdbc:phoenix:thin:url=http://172.168.1.15:8765;serialization=PROTOBUF',
 'table-name' = ph_region',
 'lookup.cache.max-rows' = '5000',
 'lookup.cache.ttl' = '600s',
 'lookup.max-retries' = '3');

--sink
INSERT INTO 
FROM pe_login_kafka k LEFT JOIN region_dim FOR SYSTEM_TIME AS OF 
k.proc_time AS u
ON k.region_id = u.id;
??,
Caused by: java.sql.SQLException
at org.apache.calcite.avatica.Helper.createException(Helper.java:56) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at org.apache.calcite.avatica.Helper.createException(Helper.java:41) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:152)
 ~[flink-connector-jdbc_2.11-1.11.1.jar:1.11.1]
... 18 more



Caused by: org.apache.calcite.avatica.NoSuchStatementException
at 
org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:349) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:343) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.AvaticaConnection.invokeWithRetries(AvaticaConnection.java:793)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.remote.RemoteMeta.execute(RemoteMeta.java:342) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:548)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
... 20 more
2021-01-05 19:40:07,469 ERROR 
org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction [] - JDBC 
executeBatch error, retry times = 2
java.sql.SQLException: null
at org.apache.calcite.avatica.Helper.createException(Helper.java:56) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at org.apache.calcite.avatica.Helper.createException(Helper.java:41) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:152)
 [flink-connector-jdbc_2.11-1.11.1.jar:1.11.1]



??jdbc??org.apache.calcite.avatica.AvaticaConnection??connection??Statement
??




Re: flink 空指针警告

2021-01-05 文章 赵一旦
这个问题en...出在如下地方:

KeyedStream keyByStream =
signoutTimeAndWM.keyBy(new KeySelector() {
@Override
public String getKey(ShareRealTimeData value) throws Exception {
return DateUtilMinutes.timeStampToDate(new
Date().getTime());  //  此处,不可以使用new Date这种当前时间。
}
});

修改,如果非要实现这种效果,可以先通过flatMap方式,针对每个元素 new Date
然后将这个date设置到ShareRealTimeData类中的一个字段(比如叫做key)。
然后再 keyBy(e->e.getKey()) 基于key这个字段做keyBy,效果一样,但不会出你这个问题。

原理比较复杂,和Flink的key分发机制有关,你这种写法会导致一个元素的key不稳定,因为实际就是<随机>的key。

lp <973182...@qq.com> 于2021年1月5日周二 下午8:11写道:

> operator操作:processWindowFunction的代码如下:
>
> class MyProcessWindowFuncation extends
> ProcessWindowFunction Tuple2String, String>>, String, TimeWindow>{
> private transient MapState>
> eveShareNoMaxPrice;
> private transient ValueState String>>> shareAndMaxPrice;
>
>
> @Override
> public void process(String s, Context context,
> Iterable elements, Collector Tuple2String, String>>> out) throws Exception {
> Iterator iterator = elements.iterator();
>
> //得到每trigger周期内每个shareNo的最大值
> while (iterator.hasNext()) {
> ShareRealTimeData next = iterator.next();
> Tuple2 t2 =
> eveShareNoMaxPrice.get(next.getShareNo());
> if (t2 == null || t2.f1 < next.getCurrentPrice()) {
> eveShareNoMaxPrice.put(next.getShareNo(),
> Tuple2.of(next.getShareName(), next.getCurrentPrice()));
> }
> }
>
>
> TreeMap> shareAndMaxPriceV =
> shareAndMaxPrice.value();
> if (shareAndMaxPriceV == null) {
> shareAndMaxPriceV = new TreeMap(new Comparator() {
> @Override
> public int compare(Double o1, Double o2) {
> return Double.compare(o2, o1);
> }
> });
> }
> Iterator>>
> keysAndMaxPrice = eveShareNoMaxPrice.entries().iterator();
> while (keysAndMaxPrice.hasNext()) {
> Map.Entry> next =
> keysAndMaxPrice.next();
>
> shareAndMaxPriceV.put(next.getValue().f1,
> Tuple2.of(next.getKey(), next.getValue().f0));
> if (shareAndMaxPriceV.size() > 20) {
> shareAndMaxPriceV.pollLastEntry();
> }
> }
>
> eveShareNoMaxPrice.clear();
> shareAndMaxPrice.clear();
>
> out.collect(shareAndMaxPriceV);
> }
>
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> eveShareNoMaxPrice = getRuntimeContext().getMapState(new
> MapStateDescriptor>("eveShareNoMaxPrice",
> TypeInformation.of(new TypeHint() {
> }), TypeInformation.of(new TypeHint Double>>()
> {
> })));
> shareAndMaxPrice = getRuntimeContext().getState(new
> ValueStateDescriptor String>>>("shareAndMaxPrice", TypeInformation.of(new
> TypeHint>>() {
> })));
> }
> }
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink on k8s 1.11.3版本,使用 hdfs 的任务, taskmanager 无法启动的问题

2021-01-05 文章 龙逸尘
按@Yang Wang 的指导,在 flink 脚本中设置了 HADOOP_CONF_DIR 可以成功运行了,感谢!

Yang Wang  于2021年1月4日周一 下午9:12写道:

> 1.11版本以后可以直接在Flink Client的机器上export HADOOP_CONF_DIR
> 然后运行flink run-application或者kubernetes_session.sh启动Flink任务,这样Flink
> Client会自动通过ConfigMap将Hadoop配置ship到JobManager和TaskManager pod
> 并且加到classpath的
>
> Best,
> Yang
>
> 龙逸尘  于2021年1月4日周一 下午4:39写道:
>
> > 各位下午好,目前我正在使用 Flink on k8s application-mode 构建构建一个消费 kafka 写入 hive 的
> > demo,使用 hdfs 作为 statebackend,遇到了一些问题,希望能得到帮助。这里我描述一下问题与 debug 的过程。
> >
> > Dockerfile 如下
> >
> > FROM flink:1.11.3-scala_2.11
> > RUN mkdir -p $FLINK_HOME/usrlib
> > RUN mkdir -p /opt/hadoop/conf
> > COPY flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
> > $FLINK_HOME/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
> > COPY flink-on-k8s-1.0-SNAPSHOT.jar
> > $FLINK_HOME/usrlib/flink-on-k8s-1.0-SNAPSHOT.jar
> > COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/TopSpeedWindowing.jar
> > ENV HADOOP_CONF_DIR /opt/hadoop/conf
> > ENV YARN_CONF_DIR /opt/hadoop/conf
> > COPY yarn-site.xml /opt/hadoop/conf/yarn-site.xml
> > COPY hdfs-site.xml /opt/hadoop/conf/hdfs-site.xml
> > COPY core-site.xml /opt/hadoop/conf/core-site.xml
> >
> > 启动命令如下
> >
> > flink-1.11.3/bin/flink run-application -p 1 -t kubernetes-application
> > -Dkubernetes.cluster-id=my-first-application-cluster-demo7-4
> > -Dkubernetes.jobmanager.service-account=flink
> > -Dtaskmanager.memory.process.size=1024m   -Dkubernetes.taskmanager.cpu=1
> > -Dtaskmanager.numberOfTaskSlots=1
> > -Dkubernetes.container-start-command-template="%java% %classpath%
> %jvmmem%
> > %jvmopts% %logging% %class% %args%"
> > -Dkubernetes.container.image=flink:demo7-4
> > -Dkubernetes.rest-service.exposed.type=NodePort
> > local:///opt/flink/usrlib/flink-on-k8s-1.0-SNAPSHOT.jar
> >
> > flink-on-k8s-1.0-SNAPSHOT.jar 这个 jar 包仅仅是消费 kafka,使用 hdfs 作为 statebackend
> > 记录状态。
> >
> > 一开始尝试只把 yarn-site.xml 等三个文件放在 usrlib 目录下,JobManager 无法启动,报错是
> > UnknownHost。参考邮件列表中的信息,设置HADOOP_CONF_DIR之后,JobManager 成功启动且没有报错日志,但是
> > TaskManager 一直处于 ContainerCreating 状态,7-8 分钟后 deployment 自动退出。使用 describe
> > pod 获取 tm 报错信息如下:
> >
> > Events:
> >   Type Reason   Age   From
> >  Message
> >    --     
> >  ---
> >   Normal   Scheduled default-scheduler
> > Successfully assigned
> > default/my-first-application-cluster-demo7-4-taskmanager-1-1 to
> > k8s-node0002
> >   Warning  FailedMount  37s (x10 over 4m46s)  kubelet,
> > k8s-ci-dcn-bigdata-node0002  MountVolume.SetUp failed for volume
> > "hadoop-config-volume" : configmap
> > "hadoop-config-my-first-application-cluster-demo7-4" not found
> >   Warning  FailedMount  29s (x2 over 2m44s)   kubelet, k8s-node0002
> Unable
> > to attach or mount volumes: unmounted volumes=[hadoop-config-volume],
> > unattached volumes=[hadoop-config-volume flink-config-volume
> > default-token-fhkhf]: timed out waiting for the condition
> >
> > 请问我是否配置有误,还是需要别的配置来启用 hdfs。
> > 期待您的回复~
> >
> > ---
> > Best Regards!
> >
> > Yichen
> >
>


Re: flink 空指针警告

2021-01-05 文章 lp
operator操作:processWindowFunction的代码如下:

class MyProcessWindowFuncation extends
ProcessWindowFunction>, String, TimeWindow>{
private transient MapState>
eveShareNoMaxPrice;
private transient ValueState>> shareAndMaxPrice;


@Override
public void process(String s, Context context,
Iterable elements, Collector>> out) throws Exception {
Iterator iterator = elements.iterator();

//得到每trigger周期内每个shareNo的最大值
while (iterator.hasNext()) {
ShareRealTimeData next = iterator.next();
Tuple2 t2 =
eveShareNoMaxPrice.get(next.getShareNo());
if (t2 == null || t2.f1 < next.getCurrentPrice()) {
eveShareNoMaxPrice.put(next.getShareNo(),
Tuple2.of(next.getShareName(), next.getCurrentPrice()));
}
}


TreeMap> shareAndMaxPriceV =
shareAndMaxPrice.value();
if (shareAndMaxPriceV == null) {
shareAndMaxPriceV = new TreeMap(new Comparator() {
@Override
public int compare(Double o1, Double o2) {
return Double.compare(o2, o1);
}
});
}
Iterator>>
keysAndMaxPrice = eveShareNoMaxPrice.entries().iterator();
while (keysAndMaxPrice.hasNext()) {
Map.Entry> next =
keysAndMaxPrice.next();

shareAndMaxPriceV.put(next.getValue().f1,
Tuple2.of(next.getKey(), next.getValue().f0));
if (shareAndMaxPriceV.size() > 20) {
shareAndMaxPriceV.pollLastEntry();
}
}

eveShareNoMaxPrice.clear();
shareAndMaxPrice.clear();

out.collect(shareAndMaxPriceV);
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
eveShareNoMaxPrice = getRuntimeContext().getMapState(new
MapStateDescriptor>("eveShareNoMaxPrice",
TypeInformation.of(new TypeHint() {
}), TypeInformation.of(new TypeHint>()
{
})));
shareAndMaxPrice = getRuntimeContext().getState(new
ValueStateDescriptor>>("shareAndMaxPrice", TypeInformation.of(new
TypeHint>>() {
})));
}
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink 空指针警告

2021-01-05 文章 lp
我有如下代码,从kafka消费数据,然后根据数据所在的秒(服务器时钟)进行keyby,获取数据所在的分钟的代码:

public static String timeStampToDate(Long timestamp){
ThreadLocal threadLocal =
ThreadLocal.withInitial(() -> new SimpleDateFormat("-MM-dd HH:mm:ss"));
String format = threadLocal.get().format(new Date(timestamp));
return format.substring(0,19);
}



根据数据所在的分钟keyBy后,我用了一个1min的滚动窗口,每500ms trigger一次,如下:

.
.
.
//根据数据所在的分钟(processingTime) keyBy
KeyedStream keyByStream =
signoutTimeAndWM.keyBy(new KeySelector() {
@Override
public String getKey(ShareRealTimeData value) throws Exception {
return DateUtilMinutes.timeStampToDate(new
Date().getTime());
}
});


SingleOutputStreamOperator>> topNforEveWindow = keyByStream
   
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(1000)))
   
.trigger(ContinuousProcessingTimeTrigger.of(Time.milliseconds(500)))
//.evictor(TimeEvictor.of(Time.seconds(0), true))
.process(new MyProcessWindowFuncation());


//sink
topNforEveWindow.printToErr("topNforEveWindow");

.
.
.

程序运行时,随机在某些整分钟时抛出以下空指针警告:
19:49:22,001 WARN  org.apache.flink.runtime.taskmanager.Task   
[] - Window(TumblingProcessingTimeWindows(1000),
ContinuousProcessingTimeTrigger, TimeEvictor, ProcessWindowFunction$4) ->
Sink: Print to Std. Err (3/8) (222821e43f98390a2f5e3baeb5b542a8) switched
from RUNNING to FAILED.
java.lang.NullPointerException: null
at 
org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336)
~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:99)
~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:203)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
[flink-runtime_2.11-1.11.2.jar:1.11.2]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]


请帮忙查看是什么原因?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: crontab通过脚本启动flink-job失败,flink-sql-parquet_2.11-1.12.0.jar does not exist

2021-01-05 文章 zhisheng
hi

可以检查一下提交任务的 flink 客户端的 lib 目录下面是否有 flink-sql-parquet_2.11-1.12.0.jar 依赖

Best
zhisheng

冯嘉伟 <1425385...@qq.com> 于2021年1月4日周一 上午9:58写道:

> hi!
>
> java.io.FileNotFoundException: File file:/home/xjia/.flink/...
> 可以看出,从本地加载jar包,而不是hdfs。
>
> 我觉得可能是hadoop环境的问题,导致读取的scheme是file,使用 echo $HADOOP_CLASSPATH 检查你的环境。
>
> Important Make sure that the HADOOP_CLASSPATH environment variable is set
> up
> (it can be checked by running echo $HADOOP_CLASSPATH). If not, set it up
> using
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Some questions about limit push down

2021-01-05 文章 Arvid Heise
This is most likely a bug, could you reiterate a bit how it is invalid?
I'm also CCing Jark since he is one of the SQL experts.

On Mon, Dec 28, 2020 at 10:37 AM Jun Zhang 
wrote:

> when I query hive table by sql, like this `select * from hivetable where
> id = 1 limit 1`,   I found that the limit push down is invalid, is it a bug
> or was it designed like this?
>
> if the sql is  'select * from hivetable  limit 1'  ,it is ok
>
> thanks
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


回复:apache flink

2021-01-05 文章
请问需要在flink源码的哪些地方修改才能实现自己的任务调度呢 1214316932 邮箱:1214316...@qq.com 签名由 网易邮箱大师 定制 
在2021年01月05日 11:27,Waldeinsamkeit. 写道: 是的,目前是想重写任务调度器,按自己的方式来将任务调度到集群的节点中。 
--原始邮件-- 发件人:                       
                                                                                
                 "user-zh"                                                      
                              <573693...@qq.com; 发送时间:2021年1月5日(星期二) 
中午11:19 收件人:"user-zh"

Re: Flink SQL>查询的hive表数据全部为NULL

2021-01-05 文章 housezhang
可以看下flink jobmanager 上的日志,会不会有什么异常出现了。s



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink 1.10在udf中传入array类型的解析异常

2021-01-05 文章 sunfulin
hi,
我遇到一个问题,消费的source里有字段定义为array>这种类型,然后想通过一个udf将它处理成一个字符串。udf的入参定义如下:


public String eval(Row[] item, String char1, String char2);


但是在函数处理时,debug发现拿到的item里的row信息始终为null。也通过DataTypeHint注解给出了item的实际类型。这是不是1.10的bug呀?如果有相关的issue单的话,烦请有知道的发我下哈。
我在1.11里验证同样的逻辑,是没这个问题的。

Re: flink cpu 利用率

2021-01-05 文章 housezhang
有可能是cpu能够处理得过来,网络io处理不过来了,看看网络使用情况



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的

2021-01-05 文章 Carmen Free
flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的

1、版本说明
flink版本:1.10.2
kafka版本:1.1.0

2、kafka鉴权说明
仅使用了sasl鉴权方式
在kafka客户端有配置 kafka_server-jass.conf、
server.properties、producer.properties、consumer.properties

3、主要配置参数
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule
required username="xx" password="xx-secret";
当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。

4、用于flink SQL连接的jar包
flink-sql-connector-kafka_2.11-1.10.2.jar
flink-jdbc_2.11-1.10.2.jar
flink-csv-1.10.2-sql-jar.jar


5、我的思路
类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka
table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。

6、启动客户端
./bin/sql-client.sh embedded -l sql_lib/
其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包


7、建表语句:
create table test_hello (
name string
) with (
...
...
'connector.properties.sasl.mechanism' = 'PLAIN',
'connector.properties.security.protocol' = 'SASL_PLAINTEXT',
'connector.properties.sasl.jaas.config' =
'org.apache.kafka.comon.security.plain.PlainLoginModule required
username="xx" password="xx-secret";',
'format.type' = 'csv'
);

建表没有问题,可以正常建表。

查询表的时候,就会报错,select * from test_hello;
报错如下:
could not execute sql statement. Reason:
javax.security.auth.login.loginException: unable to find loginModule class:
org.apache.kafka.common.security.plain.PlainLoginModule
但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因?

kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。


Re:Re: Re: flink cpu 利用率

2021-01-05 文章 爱吃鱼
我在测试的时候12个并行度,16,24都测试了但启任务后的cpu利用率还是 140%左右,不管并行度设置为多少。

















在 2021-01-05 16:49:02,"赵一旦"  写道:
>不纠结几核。如果任务结点本身不多的话,可以提一提再,只要network buffer数量够就好。
>
>爱吃鱼  于2021年1月5日周二 下午4:39写道:
>
>> 24核的机器,已经加到了24的并行度了,然后会 24个并行度的cpu利用率加起来也是140%左右
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2021-01-05 16:24:30,"赵一旦"  写道:
>> >加大并行度。
>> >
>> >爱吃鱼  于2021年1月5日周二 下午4:18写道:
>> >
>> >> 怎么提高flink cpu利用率。
>> >> 业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
>> >> flatmap处理出来的数据转换成表,并用sql 对表的数据进行 max(time),min(time),count等处理。
>> >> cpu利用率只有140%左右,机器还有很多资源,请问怎么可以提高flink cpu的利用率。
>>
>>
>>
>>
>>
>>


flink cpu 利用率

2021-01-05 文章 爱吃鱼
怎么提高flink cpu利用率。
业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
flatmap处理出来的数据转换成表,并用sql 对表的数据进行 max(time),min(time),count等处理。
cpu利用率只有140%左右,机器还有很多资源,请问怎么可以提高flink cpu的利用率。




 

Re: Re: flink cpu 利用率

2021-01-05 文章 赵一旦
不纠结几核。如果任务结点本身不多的话,可以提一提再,只要network buffer数量够就好。

爱吃鱼  于2021年1月5日周二 下午4:39写道:

> 24核的机器,已经加到了24的并行度了,然后会 24个并行度的cpu利用率加起来也是140%左右
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-01-05 16:24:30,"赵一旦"  写道:
> >加大并行度。
> >
> >爱吃鱼  于2021年1月5日周二 下午4:18写道:
> >
> >> 怎么提高flink cpu利用率。
> >> 业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
> >> flatmap处理出来的数据转换成表,并用sql 对表的数据进行 max(time),min(time),count等处理。
> >> cpu利用率只有140%左右,机器还有很多资源,请问怎么可以提高flink cpu的利用率。
>
>
>
>
>
>


回复:flink cpu 利用率

2021-01-05 文章 爱吃鱼


在2021年01月05日 16:37,爱吃鱼 写道:

24核的机器,已经加到了24的并行度了,然后会 24个并行度的cpu利用率加起来也是140%左右

















在 2021-01-05 16:24:30,"赵一旦"  写道:
>加大并行度。
>
>爱吃鱼  于2021年1月5日周二 下午4:18写道:
>
>> 怎么提高flink cpu利用率。
>> 业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
>> flatmap处理出来的数据转换成表,并用sql 对表的数据进行 max(time),min(time),count等处理。
>> cpu利用率只有140%左右,机器还有很多资源,请问怎么可以提高flink cpu的利用率。





 





 

Re:Re: flink cpu 利用率

2021-01-05 文章 爱吃鱼
24核的机器,已经加到了24的并行度了,然后会 24个并行度的cpu利用率加起来也是140%左右

















在 2021-01-05 16:24:30,"赵一旦"  写道:
>加大并行度。
>
>爱吃鱼  于2021年1月5日周二 下午4:18写道:
>
>> 怎么提高flink cpu利用率。
>> 业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
>> flatmap处理出来的数据转换成表,并用sql 对表的数据进行 max(time),min(time),count等处理。
>> cpu利用率只有140%左右,机器还有很多资源,请问怎么可以提高flink cpu的利用率。





 

Re:Re: flink cpu 利用率

2021-01-05 文章 爱吃鱼
24核的机器,已经加到了24的并行度了,然后会 24个并行度的cpu利用率加起来也是140%左右

















在 2021-01-05 16:24:30,"赵一旦"  写道:
>加大并行度。
>
>爱吃鱼  于2021年1月5日周二 下午4:18写道:
>
>> 怎么提高flink cpu利用率。
>> 业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
>> flatmap处理出来的数据转换成表,并用sql 对表的数据进行 max(time),min(time),count等处理。
>> cpu利用率只有140%左右,机器还有很多资源,请问怎么可以提高flink cpu的利用率。


?????? flink 1.12 Cancel Job??????????(??)

2021-01-05 文章 ??????
flink-sql,with-upsertMemoryStateBackendenv.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)??



----
??: ""

flink cpu 利用率

2021-01-05 文章 爱吃鱼
怎么提高flink cpu利用率。
业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
flatmap处理出来的数据转换成表,并用sql 对表的数据进行 max(time),min(time),count等处理。
cpu利用率只有140%左右,机器还有很多资源,请问怎么可以提高flink cpu的利用率。




 





 

Re: flink cpu 利用率

2021-01-05 文章 赵一旦
加大并行度。

爱吃鱼  于2021年1月5日周二 下午4:18写道:

> 怎么提高flink cpu利用率。
> 业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
> flatmap处理出来的数据转换成表,并用sql 对表的数据进行 max(time),min(time),count等处理。
> cpu利用率只有140%左右,机器还有很多资源,请问怎么可以提高flink cpu的利用率。


flink cpu 利用率

2021-01-05 文章 爱吃鱼
怎么提高flink cpu利用率。
业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
flatmap处理出来的数据转换成表,并用sql 对表的数据进行 max(time),min(time),count等处理。
cpu利用率只有140%左右,机器还有很多资源,请问怎么可以提高flink cpu的利用率。




 

flink cpu 利用率

2021-01-05 文章 爱吃鱼
怎么提高flink cpu利用率。
业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
flatmap处理出来的数据转换成表,并用sql 对表的数据进行 max(time),min(time),count等处理。
cpu利用率只有140%左右,机器还有很多资源,请问怎么可以提高flink cpu的利用率。

flink????????????

2021-01-05 文章 Waldeinsamkeit.