Re: flink 1.11.2 yarn-session: some classpath entries are not loaded

2021-06-01 Posted by datayangl
Starting the session itself is fine; the problem is that sql-client throws an error when used with yarn-session.
/opt/flink-1.11.2/bin/sql-client.sh embedded -s yarn-session
After starting sql-client and running a query against Hive data, it fails with the following error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server
error., 
SinkConversionToRow -> Sink: Unnamed': Loading the input/output formats
failed:
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
at
org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
at
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
at
org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:229)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:119)
at
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
at
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
at 
org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:272)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:140)
at
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
... 7 more
Caused by: java.lang.Exception: Loading the input/output formats failed:
at
org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:155)
at
org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:59)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:212)
... 19 more
Caused by: java.lang.RuntimeException: Deserializing the input/output
formats failed: org/apache/hadoop/mapred/JobConf
at
org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.(InputOutputFormatContainer.java:68)
at
org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:152)
... 21 more
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/JobConf
at java.lang.Class.getDeclaredFields0(Native Method)
at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
at java.lang.Class.getDeclaredField(Class.java:2068)
at 
java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.(ObjectStreamClass.java:468)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at java.util.HashMap.readObject(HashMap.java:1396)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 

Re: flink 1.12: logs not visible in the Flink web UI in yarn-application mode

2021-06-01 Posted by smq
Did you ever solve this? I'm running into the same problem.






Re: flink 1.11.2 yarn-session: some classpath entries are not loaded

2021-06-01 Posted by Zhiwen Sun
You shouldn't need the mapreduce-related libraries.

Looking at my own jobs, there is no mapreduce on the loaded classpath either.

Zhiwen Sun



On Wed, Jun 2, 2021 at 11:56 AM datayangl  wrote:

> After starting a yarn-session on flink 1.11.2, I found that some jars are never added to the classpath.
> The environment variable configuration is as follows:
> <http://apache-flink.147419.n8.nabble.com/file/t919/66604010-2A08-4A68-8478-70A27D61224B.png>
>
>
> The TaskManager log is as follows:
> tm.log
>
> The hadoop-mapreduce-client jars in particular are never loaded onto the classpath. Any pointers would be appreciated.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


flink 1.11.2 yarn-session: some classpath entries are not loaded

2021-06-01 Posted by datayangl
After starting a yarn-session on flink 1.11.2, I found that some jars are never added to the classpath.
The environment variable configuration is as follows:
<http://apache-flink.147419.n8.nabble.com/file/t919/66604010-2A08-4A68-8478-70A27D61224B.png>

The TaskManager log is as follows:
tm.log

The hadoop-mapreduce-client jars in particular are never loaded onto the classpath. Any pointers would be appreciated.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Unsubscribe

2021-06-01 Posted by Fighting
Unsubscribe

Re: Flink SQL: the /checkpoint/shared/ directory keeps growing with no surge in the source data — how can it be kept under control?

2021-06-01 Posted by HunterXHunter
So will it just keep growing? I've been running for 4 days and the checkpoint keeps getting bigger with no sign of leveling off. Do I need to tune the compaction configuration?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Does Flink support custom rate limiting?

2021-06-01 Posted by suisuimu
Yes. We write to Elasticsearch, and ES cannot push back dynamically, so I'd like to know whether there is a configurable static rate-limiting approach.
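
This isn't a built-in Flink feature as far as this thread goes, but a static limit can be implemented in user code just before the sink. A minimal sketch, assuming Guava is on the classpath (the class name RateLimitedMap and the 500 records/s rate are made up for illustration): the blocking acquire() slows the subtask down, and Flink's backpressure then throttles the upstream accordingly.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import com.google.common.util.concurrent.RateLimiter;

public class RateLimitedMap<T> extends RichMapFunction<T, T> {

    private final double permitsPerSecond;
    private transient RateLimiter rateLimiter;

    public RateLimitedMap(double permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
    }

    @Override
    public void open(Configuration parameters) {
        // One limiter per parallel subtask, so the effective global rate is
        // permitsPerSecond * parallelism.
        rateLimiter = RateLimiter.create(permitsPerSecond);
    }

    @Override
    public T map(T value) {
        rateLimiter.acquire(); // blocks until a permit is available
        return value;
    }
}

// Hypothetical usage right before the ES sink:
// stream.map(new RateLimitedMap<>(500)).addSink(esSink);

For per-field flow-control rules like the ones in the original question, the same idea could be applied per key, or a third-party library such as Sentinel could be wrapped in the same kind of map function.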



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Unsubscribe

2021-06-01 Posted by Yu Wang



Re: Which time type do window functions need?

2021-06-01 Posted by guoyb
Thanks! I'll debug it again when I'm back at the office tomorrow.




Unsubscribe

2021-06-01 Posted by 5599


Re: Which time type do window functions need?

2021-06-01 Posted by MOBIN
Also, you can refer to Alibaba's demo:
https://help.aliyun.com/document_detail/62512.html?spm=a2c4g.11186623.6.827.49531b09XfgsU7




MOBIN


On June 1, 2021 at 19:37, guoyb<861277...@qq.com> wrote:
OK, thanks!


I'll give it a try




Re: Which time type do window functions need?

2021-06-01 Posted by guoyb
OK, thanks!


I'll give it a try




Re: Which time type do window functions need?

2021-06-01 Posted by Shuo Cheng
A window in a SQL streaming job can be defined over two kinds of time attribute columns:
1) event time: the DDL must define a watermark on the timestamp column
2) processing time: use PROCTIME()
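
A minimal sketch of both options (the table name, columns, and the datagen connector are placeholders chosen for illustration, not taken from the original job):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class WindowTimeAttributeExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'ts' becomes an event-time attribute through the WATERMARK clause;
        // 'pt' is a processing-time attribute defined with PROCTIME().
        tEnv.executeSql(
                "CREATE TABLE orders (" +
                "  id STRING," +
                "  amount DOUBLE," +
                "  ts TIMESTAMP(3)," +
                "  pt AS PROCTIME()," +
                "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
                ") WITH ('connector' = 'datagen')");

        // TUMBLE must reference the declared time attribute column (ts or pt),
        // not a plain TIMESTAMP(3)/DATETIME/TIME column.
        tEnv.executeSql(
                "SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) AS w_start, SUM(amount) AS total " +
                "FROM orders " +
                "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)").print();
    }
}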

On 6/1/21, guoyb <861277...@qq.com> wrote:
> Yes.
>
>
> Could some expert point me in the right direction on this?
>
>
> --- Original message ---
> From: "MOBIN"<18814118...@163.com
> Sent: Tuesday, June 1, 2021, 7:09 PM
> To: "user-zh@flink.apache.org"  Subject: Re: Which time type do window functions need?
>
>
> Is the error you get something like the following?
> Window aggregate can only be defined over a time attribute column, but
> TIMESTAMP(3) encountered
>
>
> MOBIN
>
>
> On June 1, 2021 at 19:00, guoyb<861277...@qq.com wrote:
> For a tumble() window, what time type does the event-time column actually need? It keeps complaining the time type is wrong.
> timestamp(3)
> datetime
> time
> I've tried them all and none of them works.


Re: Pyflink jdbc related

2021-06-01 Posted by 琴师
(The body of this reply and the quoted text below it were lost to character-encoding corruption in the archive.)

Re: Pyflink jdbc related

2021-06-01 Posted by Dian Fu
Try this — change the "\" to "/":

file:///D:/Pyproject/flink-connector-jdbc_2.11-1.13.1.jar



> On June 1, 2021 at 5:40 PM, 琴师 <1129656...@qq.com> wrote:
>
> One more question: when I reference a Windows path in PyCharm, e.g. file:///D:\Pyproject\flink-connector-jdbc_2.11-1.13.1.jar;
>   it doesn't get picked up. Has anyone gotten this to work?
> 
> 
> -- Original message --
> From: "琴师" <1129656...@qq.com;
> Sent: Tuesday, June 1, 2021, 5:30 PM
> To: "user-zh"
> Subject: Re: Pyflink jdbc related
>
>
> Thanks — switching to the 2.11 jar indeed works.
>
>
> -- Original message --
> From: "user-zh"
> Sent: Tuesday, June 1, 2021, 5:04 PM
> To: "user-zh"
> Subject: Re: Pyflink jdbc related
>
>
> Hi,
>
> Local execution:
> 1) Try flink-connector-jdbc_2.11-1.13.1.jar — the JARs bundled with PyFlink by default are built against Scala 2.11.
>
>
> flink run:
> 1) The sink table you registered is named "print", not "table_sink", but the SQL statement refers to table_sink.
> 
> 

Re: Which time type do window functions need?

2021-06-01 Posted by guoyb
Yes.


Could some expert point me in the right direction on this?




Re: Which time type do window functions need?

2021-06-01 Posted by MOBIN
Is the error you get something like the following?
Window aggregate can only be defined over a time attribute column, but 
TIMESTAMP(3) encountered


MOBIN


On June 1, 2021 at 19:00, guoyb<861277...@qq.com> wrote:
For a tumble() window, what time type does the event-time column actually need? It keeps complaining the time type is wrong.
timestamp(3)
datetime
time
I've tried them all and none of them works.

Which time type do window functions need?

2021-06-01 Posted by guoyb
For a tumble() window, what time type does the event-time column actually need? It keeps complaining the time type is wrong.
timestamp(3)
datetime
time
I've tried them all and none of them works.

Re: kafka exactly-once semantics: error when restoring from a savepoint

2021-06-01 Posted by r pp
'properties.transaction.timeout.ms' = '3' is configured far too short — the transactionalId expires after 30 s; it probably expires before the job even manages to start.
The relevant passage from the official docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions
that were started before taking a checkpoint, after recovering from the
said checkpoint. If the time between Flink application crash and completed
restart is larger than Kafka’s transaction timeout there will be data loss
(Kafka will automatically abort transactions that exceeded timeout time).
Having this in mind, please configure your transaction timeout
appropriately to your expected down times.
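
In other words, the producer-side transaction timeout should comfortably exceed the checkpoint interval plus any expected downtime, while staying within the broker-side cap (transaction.max.timeout.ms, 15 minutes by default). A minimal sketch of the sink DDL with a longer timeout — topic, broker address, and format are placeholders, not the values from the original job:

// Assumes an existing StreamTableEnvironment `tableEnv`, as in the snippet quoted below.
tableEnv.executeSql(
        "CREATE TABLE KafkaTable (\n" +
        "  source_value STRING\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka',\n" +
        "  'topic' = 'my_topic',\n" +
        "  'properties.bootstrap.servers' = 'broker:9092',\n" +
        "  'sink.semantic' = 'exactly-once',\n" +
        // 15 minutes: well above the checkpoint interval, at the broker's default cap
        "  'properties.transaction.timeout.ms' = '900000',\n" +
        "  'format' = 'json'\n" +
        ")");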

周瑞 wrote on Tuesday, June 1, 2021 at 3:45 PM:

>
> Hello: with Kafka under exactly-once semantics, restoring from a savepoint fails. The initial investigation suggests the Kafka transaction is using an old epoch. How should this be handled?
> //todo pass this in via configuration
> env.setParallelism(1);
> env.enableCheckpointing(60L, CheckpointingMode.EXACTLY_ONCE);
>
> // checkpoint retention policy (keep the checkpoint even if the job is explicitly cancelled)
> env.getCheckpointConfig()
>
> .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.getCheckpointConfig().setCheckpointTimeout(6);
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
>
> //TODO production must use HDFS
> env.setStateBackend(new FsStateBackend("hdfs://
> 10.10.98.226:8020/tmp/checkpoint66"));
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> public static final  String TABLE_NAME = "KafkaTable";
> public static final  String COLUMN_NAME = "source_value";
>
> public static final String KAFKA_TABLE_FORMAT =
> "CREATE TABLE "+TABLE_NAME+" (\n" +
> "  "+COLUMN_NAME+" STRING\n" +
> ") WITH (\n" +
> "   'connector' = 'kafka',\n" +
> "   'topic' = '%s',\n" +
> "   'properties.bootstrap.servers' = '%s',\n" +
> "   'sink.semantic' = 'exactly-once',\n" +
> "   'properties.transaction.timeout.ms' = '3',\n" +
> "   'format' = 'dbz-json'\n" +
> ")\n";
> org.apache.kafka.common.errors.ProducerFencedException: Producer attempted
> an operation with an old epoch. Either there is a newer producer with the
> same transactionalId,
>  or the producer's transaction has been expired by the broker. while
> recovering transaction KafkaTransactionState [transactionalId=Source:
> TableSourceScan(table=[[default_catalog, default_database,
> debezium_source]], fields=[data]) - Sink:
> Sink(table=[default_catalog.default_database.KafkaTable],
> fields=[data])-7df19f87deec5680128845fd9a6ca18d-0, producerId=239009,
> epoch=216]. Presumably this transaction has been already committed before



-- 
Best,
  pp


Re: Does Flink support custom rate limiting?

2021-06-01 Posted by r pp
Isn't Flink's backpressure mechanism already a form of rate limiting?

suisuimu <726400...@qq.com> wrote on Tuesday, June 1, 2021 at 5:37 PM:

> When Flink reads from Kafka, does it support user-defined rate-limiting policies?
> For example, flow-control rules keyed on the value of a particular field in the message.
> Is this supported, or do we need to build it ourselves with a third-party component (e.g. Sentinel)?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best,
  pp


Re: Pyflink jdbc related

2021-06-01 Posted by 琴师
One more question: when I reference a Windows path in PyCharm, e.g. file:///D:\Pyproject\flink-connector-jdbc_2.11-1.13.1.jar; it doesn't get picked up. Has anyone gotten this to work?

Re: Different programs hit the same exception during the same time window

2021-06-01 Posted by r pp
What does your network environment look like? Are you running on Docker, or something else?
From the error it looks like Netty failed to decode a frame — but why would that happen?
It might help if you posted the problem in a bit more detail.

5599 <673313...@qq.com> wrote on Tuesday, June 1, 2021 at 2:32 PM:

> Unsubscribe
>
>
>
>
> -- Original message --
> From: "r pp"  Sent: Tuesday, June 1, 2021, 2:07 PM
> To: "user-zh"  Subject: Re: Different programs hit the same exception during the same time window
>
>
>
> Did your program actually crash?
>
> mq sun 
>  Hi all:
>  Recently in production, two Flink jobs from different project teams both reported the exception below during the same time window
>  ERROR org.apache.flink.runtime.blob.BlobServerConnection -Error
> while
>  excuting Blob connection
>  .
>  .
>  .
> 
> 
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException
>  :Adjusted frame length exceeds 10485760: 1347375960 -discarded
> 
> 
> 
> When only one of the jobs hit this error, we suspected the state had grown too large; but then another job reported the same error, so now we're not sure what is causing it. What could be going on here?
> 
>
>
> --
> Best,
>  pp



-- 
Best,
  pp


Does Flink support custom rate limiting?

2021-06-01 Posted by suisuimu
When Flink reads from Kafka, does it support user-defined rate-limiting policies?
For example, flow-control rules keyed on the value of a particular field in the message.
Is this supported, or do we need to build it ourselves with a third-party component (e.g. Sentinel)?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

A custom UDF with state

2021-06-01 Posted by 阿华田
My custom UDF implements CheckpointedFunction; pseudo-code below.
I found that initializeState is never executed.






public class ClusterInfoCollectUdf extends ScalarFunction implements
CheckpointedFunction {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterInfoCollectUdf.class);
private transient MapState<String, Integer> mapState;
private MapStateDescriptor<String, Integer> mapStateDescriptor;
   。



@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) 
throws Exception {


LOGGER.info("the snapshotStateis  started ");


}

@Override
public void initializeState(FunctionInitializationContext context) throws 
Exception {
mapStateDescriptor = new MapStateDescriptor<>(
"app-status-map",
String.class,
Integer.class);

mapState = context.getKeyedStateStore().getMapState(mapStateDescriptor);
LOGGER.info("the initializeStateis  started ");




}





阿华田
a15733178...@163.com



Re: Pyflink jdbc related

2021-06-01 Posted by 琴师
Thanks — switching to the 2.11 jar indeed works.

How to write a custom UDF with state

2021-06-01 Posted by 阿华田
My custom UDF implements CheckpointedFunction; pseudo-code below.
I found that initializeState is never executed.






public class ClusterInfoCollectUdf extends ScalarFunction implements
CheckpointedFunction {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterInfoCollectUdf.class);
private transient MapState<String, Integer> mapState;
private MapStateDescriptor<String, Integer> mapStateDescriptor;
   。



@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) 
throws Exception {


LOGGER.info("the snapshotStateis  started ");


}

@Override
public void initializeState(FunctionInitializationContext context) throws 
Exception {
mapStateDescriptor = new MapStateDescriptor<>(
"app-status-map",
String.class,
Integer.class);

mapState = context.getKeyedStateStore().getMapState(mapStateDescriptor);
LOGGER.info("the initializeStateis  started ");




}



阿华田
a15733178...@163.com



Re: Pyflink jdbc related

2021-06-01 Posted by Dian Fu
Hi,

Local execution:
1) Try flink-connector-jdbc_2.11-1.13.1.jar — the JARs bundled with PyFlink by default are built against Scala 2.11.


flink run:
1) The sink table you registered is named "print", not "table_sink", but the SQL statement refers to table_sink.


> On June 1, 2021 at 4:33 PM, 琴师 <1129656...@qq.com> wrote:
>
> Hi,
> Following the WeChat post https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q I tried PyFlink and ran into a problem: I reference the JDBC jar, but it still reports a JDBC error. My Flink version is 1.13.1.
> My original code is as follows:
> 
> 
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
> env = StreamExecutionEnvironment.get_execution_environment()
> table_env = StreamTableEnvironment.create( 
> env,environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
> table_env.get_config().get_configuration().set_string("pipeline.classpaths", 
> "file:///home/flink/lib/flink-connector-jdbc_2.12-1.13.1.jar;file:///home/flink/lib/mysql-connector-java-8.0.25.jar")
> 
> 
> # 2. create source Table
> table_env.execute_sql("""
> 
> 
> CREATE TABLE table_source (
>  e string
> ) WITH (
> 'connector' = 'jdbc',
>  'url' = 'jdbc:mysql://:3306/test',
>  'driver' = 'com.mysql.cj.jdbc.Driver',
>  'table-name' = 'enum_test',
>  'username' = 'pms_etl',
>  'password' = 'pms_etl_q'
> )
> 
> 
> """)
> 
> 
> # 3. create sink Table
> table_env.execute_sql("""
>   CREATE TABLE print (
> e string
>   ) WITH (
> 'connector' = 'print'
>   )
> """)
>  
> 
> 
> table_env.execute_sql("INSERT INTO table_sink SELECT * FROM 
> table_source").wait()
> 
> 
> 
> When I run it directly with python, the error returned is as follows:
> 
> 
> Traceback (most recent call last):
>  File "demo.py", line 41, intable_env.execute_sql("INSERT INTO table_sink SELECT * FROM 
> table_source").wait()
>  File 
> "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", 
> line 804, in execute_sql
>   return TableResult(self._j_tenv.executeSql(stmt))
>  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", 
> line 1286, in __call__
>   answer, self.gateway_client, self.target_id, self.name)
>  File 
> "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 
> 146, in deco
>   return f(*a, **kw)
>  File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 
> 328, in get_return_value
>   format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
> : org.apache.flink.table.api.ValidationException: Unable to create a source 
> for reading table 'default_catalog.default_database.table_source'.
> 
> 
> Table options are:
> 
> 
> 'connector'='jdbc'
> 'driver'='com.mysql.cj.jdbc.Driver'
> 'password'='pms_etl_q'
> 'table-name'='enum_test'
> 'url'='jdbc:mysql://***:3306/test'
> 'username'='pms_etl'
> at 
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
> at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
> at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:170)
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:162)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:967)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:936)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:275)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:595)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:268)
> at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
> at 
> 

Re: Flink SQL: the /checkpoint/shared/ directory keeps growing with no surge in the source data — how can it be kept under control?

2021-06-01 Posted by Yun Tang
Hi,

Incremental checkpoints upload the SST files themselves, and part of that space may be occupied by stale data; you can think of an incremental checkpoint as uploading RocksDB data that is still subject to space amplification. If the data volume on a single instance is small and compaction has not been triggered yet, the remote checkpoint directory can indeed grow larger than the actual live state. With incremental checkpoints turned off, what gets uploaded is key/value pairs in the same format as a savepoint: Flink scans the whole DB and writes only the currently valid data to remote storage. So if you disable incremental checkpoints and the checkpoint directory stays at a roughly constant size, the space taken by the truly live data is stable.

Also, I would not recommend disabling incremental checkpoints in day-to-day production. For large jobs, full checkpoints both increase the amount of data that has to be uploaded to the underlying DFS each time and lengthen the end-to-end duration of every checkpoint, which brings a risk of checkpoint timeout failures.

Best regards,
Yun Tang
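
For reference, a minimal sketch of the two variants discussed above, written against the 1.11/1.12-era API (the HDFS path is a placeholder; newer releases expose the same switch through EmbeddedRocksDBStateBackend or the state.backend.incremental option):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointModeExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Incremental checkpoints: upload changed SST files; the remote size can exceed
        // the live state size until compaction catches up.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        // Full checkpoints: dump the live key/value pairs (savepoint-like format); the size
        // tracks the effective state, but each checkpoint uploads more data and takes longer.
        // env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", false));

        env.enableCheckpointing(60_000);
        // ... build and execute the job
    }
}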

From: HunterXHunter <1356469...@qq.com>
Sent: Tuesday, June 1, 2021 11:44
To: user-zh@flink.apache.org 
Subject: Re: Flink SQL: the /checkpoint/shared/ directory keeps growing with no surge in the source data — how can it be kept under control?

The problem I ran into was that with incremental checkpoints enabled the checkpoints kept growing; after I turned them off, the checkpoint size went back to normal.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Pyflink jdbc related

2021-06-01 Posted by 琴师
Hi,
Following the WeChat post https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q I tried PyFlink and ran into a problem: I reference the JDBC jar, but it still reports a JDBC error. My Flink version is 1.13.1.
My original code is as follows:



from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create( 
env,environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
table_env.get_config().get_configuration().set_string("pipeline.classpaths", 
"file:///home/flink/lib/flink-connector-jdbc_2.12-1.13.1.jar;file:///home/flink/lib/mysql-connector-java-8.0.25.jar")


# 2. create source Table
table_env.execute_sql("""


CREATE TABLE table_source (
 e string
) WITH (
'connector' = 'jdbc',
 'url' = 'jdbc:mysql://:3306/test',
 'driver' = 'com.mysql.cj.jdbc.Driver',
 'table-name' = 'enum_test',
 'username' = 'pms_etl',
 'password' = 'pms_etl_q'
)


""")


# 3. create sink Table
table_env.execute_sql("""
  CREATE TABLE print (
e string
  ) WITH (
'connector' = 'print'
  )
""")
 


table_env.execute_sql("INSERT INTO table_sink SELECT * FROM 
table_source").wait()



When I run it directly with python, the error returned is as follows:


Traceback (most recent call last):
 File "demo.py", line 41, in 

flink sql 1.13.1: unexpected results when joining changelog streams

2021-06-01 Posted by (sender name garbled in the archive)
|insert into dwd_order_detail
|select
|   ord.Id,
|   ord.Code,
|   Status
| concat(cast(ord.Id as String),if(oed.Id is null,'oed_null',cast(oed.Id
as STRING)),DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd'))  as uuids,
| TO_DATE(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as As_Of_Date
|from
|orders ord
|left join order_extend oed on  ord.Id=oed.OrderId and oed.IsDeleted=0 and
oed.CreateTimeCAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP)
|where ( ord.OrderTimeCAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS
TIMESTAMP)
|or ord.ReviewTimeCAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS
TIMESTAMP)
|or ord.RejectTimeCAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS
TIMESTAMP)
|) and
ord.IsDeleted=0;

(The remainder of this message was garbled by an encoding error; the comparison operators and parts of the date-format strings in the SQL above also appear to have been stripped. What is still recoverable: the source tables orders and order_extend are Kafka canal-json tables, the join result is written to an upsert-kafka table, and the behaviour differs between Flink 1.12 and 1.13. The author shows sample output for order XJ0120210531004794, including a series of +U rows with value 50 and, for that order, +I followed by -U rows.)

Unsubscribe

2021-06-01 Posted by on the way


kafka exactly-once semantics: error when restoring from a savepoint

2021-06-01 Posted by 周瑞
Hello: with Kafka under exactly-once semantics, restoring from a savepoint fails. The initial investigation suggests the Kafka transaction is using an old epoch. How should this be handled?
//todo pass this in via configuration
env.setParallelism(1);
env.enableCheckpointing(60L, CheckpointingMode.EXACTLY_ONCE);

// checkpoint retention policy (keep the checkpoint even if the job is explicitly cancelled)
env.getCheckpointConfig()

.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointTimeout(6);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);

//TODO production must use HDFS
env.setStateBackend(new 
FsStateBackend("hdfs://10.10.98.226:8020/tmp/checkpoint66"));

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
public static final  String TABLE_NAME = "KafkaTable";
public static final  String COLUMN_NAME = "source_value";

public static final String KAFKA_TABLE_FORMAT =
"CREATE TABLE "+TABLE_NAME+" (\n" +
"  "+COLUMN_NAME+" STRING\n" +
") WITH (\n" +
"   'connector' = 'kafka',\n" +
"   'topic' = '%s',\n" +
"   'properties.bootstrap.servers' = '%s',\n" +
"   'sink.semantic' = 'exactly-once',\n" +
"   'properties.transaction.timeout.ms' = '3',\n" +
"   'format' = 'dbz-json'\n" +
")\n";
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an 
operation with an old epoch. Either there is a newer producer with the same 
transactionalId,
 or the producer's transaction has been expired by the broker. while recovering 
transaction KafkaTransactionState [transactionalId=Source: 
TableSourceScan(table=[[default_catalog, default_database, debezium_source]], 
fields=[data]) - Sink: 
Sink(table=[default_catalog.default_database.KafkaTable], 
fields=[data])-7df19f87deec5680128845fd9a6ca18d-0, producerId=239009, 
epoch=216]. Presumably this transaction has been already committed before

Re: Different programs hit the same exception during the same time window

2021-06-01 Posted by 5599
Unsubscribe

Re: Different programs hit the same exception during the same time window

2021-06-01 Posted by mq sun
Some of the programs crashed, some didn't.

r pp wrote on Tuesday, June 1, 2021 at 2:07 PM:

> Did your program actually crash?
>
> mq sun wrote on Monday, May 31, 2021 at 7:23 PM:
>
> > Hi all:
> >   Recently in production, two Flink jobs from different project teams both reported the exception below during the same time window
> > ERROR org.apache.flink.runtime.blob.BlobServerConnection  -Error while
> > excuting Blob connection
> > .
> > .
> > .
> >
> >
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException
> > :Adjusted frame length exceeds 10485760: 1347375960 -discarded
> >
> >
> >
> When only one of the jobs hit this error, we suspected the state had grown too large; but then another job reported the same error, so now we're not sure what is causing it. What could be going on here?
> >
>
>
> --
> Best,
>   pp
>


Re: Different programs hit the same exception during the same time window

2021-06-01 Posted by r pp
Did your program actually crash?

mq sun wrote on Monday, May 31, 2021 at 7:23 PM:

> Hi all:
>   Recently in production, two Flink jobs from different project teams both reported the exception below during the same time window
> ERROR org.apache.flink.runtime.blob.BlobServerConnection  -Error while
> excuting Blob connection
> .
> .
> .
>
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException
> :Adjusted frame length exceeds 10485760: 1347375960 -discarded
>
>
> When only one of the jobs hit this error, we suspected the state had grown too large; but then another job reported the same error, so now we're not sure what is causing it. What could be going on here?
>


-- 
Best,
  pp