Re: Using HepPlanner to optimize Flink SQL compilation

2021-07-13 Post by terry Huang
Compared with batch processing, Flink SQL seems to have relatively few non-deterministic optimizations, and the
version we use (flink-1.8) has no statistics in its implementation, so we are trying to use the Hep Planner to
speed up compilation. We are not sure whether this would cause other problems, such as semantic changes.

Caizhi Weng wrote on Wed, Jul 14, 2021 at 10:08 AM:

> Hi!
>
> Hep planner is a rule based planner, mostly used for deterministic optimizations. Volcano planner is a cost
> based planner, mostly used for non-deterministic optimizations (for example choosing the join strategy or the
> build side), which rely on statistics to make decisions. Flink currently uses both.
>
terry Huang wrote on Tue, Jul 13, 2021 at 7:31 PM:
>
> > Hi all, Flink SQL currently uses Calcite's Volcano
> >
> >
> > Planner for logical plan optimization, but in our experience the compilation time is rather long, so we plan
> > to use HepPlanner instead. Will this cause any fatal problems, and what is the reason Flink SQL uses the
> > Volcano planner?
> >
>


Re: Flink fails to trigger a savepoint

2021-07-13 Post by Yun Tang
Hi,

This looks like the client failing to trigger the savepoint rather than the savepoint itself timing out end-to-end.
I suggest checking the JobManager log: at the time of triggering, see whether the JM log contains entries about
triggering the savepoint. You can also check in the Flink web UI whether the corresponding savepoint shows up in
the history on the checkpoint tab.

Best,
Yun Tang
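
For reference, a minimal sketch of the two timeouts involved (an illustration only, not taken from the thread;
exact option names vary across Flink versions): the end-to-end checkpoint/savepoint timeout is configured on the
job, while the TimeoutException in the stack trace below is raised on the client side, which has its own timeout.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class SavepointTimeoutSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);                        // checkpoint every 60 s
        // End-to-end timeout for checkpoints (savepoints go through the same
        // coordinator): 2 minutes in this sketch.
        env.getCheckpointConfig().setCheckpointTimeout(120_000L);
        // Note: the client-side trigger timeout is a separate setting in
        // flink-conf.yaml (client.timeout, or akka.client.timeout in older
        // releases -- the exact key depends on the Flink version in use).
        env.fromElements(1, 2, 3).print();                       // trivial pipeline so the job can run
        env.execute("savepoint-timeout-sketch");
    }
}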

From: 仙剑……情动人间 <1510603...@qq.com.INVALID>
Sent: Tuesday, July 13, 2021 17:31
To: flink mailing list
Subject: Flink fails to trigger a savepoint

Hi All,


  I keep failing to trigger a Flink savepoint; the error is below. It always reports a timeout, but there is no
further information to look at. I read that the checkpoint timeout can be configured and set it to 2 min, but the
savepoint trigger fails before 2 min have passed. Also, my state is not large.



The program finished with the following exception:


org.apache.flink.util.FlinkException: Triggering a savepoint for the job 
 failed.
at 
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:777)
at 
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:754)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
at 
org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:751)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1072)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
at 
org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


Re: Flink temporal table: error when left-joining two HBase tables

2021-07-13 Post by Caizhi Weng
Hi!

Is the hbase connector jar on the flink classpath? If the user code does not use any hbase classes or methods
directly, these dependencies are actually not needed in the user code's pom file.
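
A quick way to verify (a diagnostic sketch, not from the original thread; the class name is copied from the
NoClassDefFoundError below) is to try resolving the shaded class on the classpath in question, e.g. with the same
classpath the job is launched with:

public final class HBaseClasspathCheck {
    public static void main(String[] args) throws Exception {
        // Throws ClassNotFoundException when the flink-sql-connector-hbase jar
        // (which bundles the shaded commons-io) is not on the tested classpath.
        Class.forName("org.apache.flink.hbase.shaded.org.apache.commons.io.IOUtils");
        System.out.println("Shaded HBase connector classes are resolvable.");
    }
}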

xie_guo...@163.com wrote on Wed, Jul 14, 2021 at 9:43 AM:

> Hello, I ran into a problem with a flinkSQL temporal table left join.
> Scenario:
>
> Two HBase tables are joined as a temporal table; the left table defines a processing-time attribute and the right table stays as-is (rowkey, cf). The Flink version is 1.13.0. The error is shown below. Has anyone run into a similar problem, and how should it be handled?
>
> 2021-07-14 09:22:20.592 WARN  org.apache.flink.runtime.taskmanager.Task
> --- 2021-07-14 09:22:20.596 WARN
> org.apache.flink.runtime.taskmanager.Task  ---
> LookupJoin(table=[default_catalog.default_database.hbase_source_pollution_dwb_enterprise_wodhz],
> joinType=[LeftOuterJoin], async=[true], lookup=[code=$f4], select=[code,
> data1, data2, p, $f4, code0, data]) -> Calc(select=[code,
> ROW(,,data.activ) -> NotNullEnforcer(fields=[EXPR$1, EXPR$2, p,
> EXPR$4]) (3/3)#3 (4ada188e117c67ccd9bd6488ae95216a) switched from RUNNING
> to FAILED with failure cause: java.util.concurrent.ExecutionException:
> java.lang.NoClassDefFoundError:
> org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:168)
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:131)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:448)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:671)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:629)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoClassDefFoundError:
> org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
> at
> org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncConnectionImpl.close(AsyncConnectionImpl.java:193)
> at
> org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.close(HBaseRowDataAsyncLookupFunction.java:251)
> at LookupFunction$3.close(Unknown Source
>
>
> PS: with the same code, if the left table is replaced with Kafka, it runs fine. Searching online for this error suggests a missing jar; the pom file currently includes hbase-client, hbase-common, and flink-sql-connector-hbase.
>
>
>
> Sincerely,
> xie_guo...@163.com
>


Re: Using HepPlanner to optimize Flink SQL compilation

2021-07-13 Post by Caizhi Weng
Hi!

Hep planner is a rule based planner, mostly used for deterministic optimizations. Volcano planner is a cost
based planner, mostly used for non-deterministic optimizations (for example choosing the join strategy or the
build side), which rely on statistics to make decisions. Flink currently uses both.
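
For context, a minimal sketch of how a rule-based pass is assembled with Calcite's HepPlanner (an illustration
only, assuming a recent Calcite release where common rules are collected in CoreRules; older releases, such as the
one bundled with Flink 1.8, expose them on the rule classes instead, e.g. FilterJoinRule.FILTER_ON_JOIN). The
outcome depends only on the rule set and the order they are applied in, not on statistics:

import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.CoreRules;

public final class HepPassSketch {

    /** Applies a fixed list of rewrite rules to a logical plan until a fixpoint is reached. */
    public static RelNode runHepPass(RelNode logicalPlan) {
        HepProgram program = new HepProgramBuilder()
                .addRuleInstance(CoreRules.FILTER_INTO_JOIN) // push filters below joins
                .addRuleInstance(CoreRules.PROJECT_MERGE)    // merge adjacent projections
                .build();

        HepPlanner planner = new HepPlanner(program);
        planner.setRoot(logicalPlan);
        return planner.findBestExp();
    }
}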

terry Huang wrote on Tue, Jul 13, 2021 at 7:31 PM:

> Hi all, Flink SQL currently uses Calcite's Volcano
>
> Planner for logical plan optimization, but in our experience the compilation time is rather long, so we plan to
> use HepPlanner instead. Will this cause any fatal problems, and what is the reason Flink SQL uses the Volcano
> planner?
>


Flink temporal table: error when left-joining two HBase tables

2021-07-13 Post by xie_guo...@163.com
Hello, I ran into a problem with a flinkSQL temporal table left join.
Scenario:

Two HBase tables are joined as a temporal table; the left table defines a processing-time attribute and the right table stays as-is (rowkey, cf). The Flink version is 1.13.0. The error is shown below. Has anyone run into a similar problem, and how should it be handled?

2021-07-14 09:22:20.592 WARN  org.apache.flink.runtime.taskmanager.Task  --- 
2021-07-14 09:22:20.596 WARN  org.apache.flink.runtime.taskmanager.Task  --- 
LookupJoin(table=[default_catalog.default_database.hbase_source_pollution_dwb_enterprise_wodhz],
 joinType=[LeftOuterJoin], async=[true], lookup=[code=$f4], select=[code, 
data1, data2, p, $f4, code0, data]) -> Calc(select=[code, ROW(,,data.activ) 
-> NotNullEnforcer(fields=[EXPR$1, EXPR$2, p, EXPR$4]) (3/3)#3 
(4ada188e117c67ccd9bd6488ae95216a) switched from RUNNING to FAILED with failure 
cause: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: 
org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:168)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:131)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:448)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:671)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:629)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: 
org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
at 
org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncConnectionImpl.close(AsyncConnectionImpl.java:193)
at 
org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.close(HBaseRowDataAsyncLookupFunction.java:251)
at LookupFunction$3.close(Unknown Source

PS: with the same code, if the left table is replaced with Kafka, it runs fine. Searching online for this error suggests a missing jar; the pom file currently includes hbase-client, hbase-common, and flink-sql-connector-hbase.



Sincerely,
xie_guo...@163.com


Using HepPlanner to optimize Flink SQL compilation

2021-07-13 Post by terry Huang
Hi all, Flink SQL currently uses Calcite's Volcano
Planner for logical plan optimization, but in our experience the compilation time is rather long, so we plan to
use HepPlanner instead. Will this cause any fatal problems, and what is the reason Flink SQL uses the Volcano planner?


Flink fails to trigger a savepoint

2021-07-13 Post by 仙剑……情动人间
Hi All,


  I keep failing to trigger a Flink savepoint; the error is below. It always reports a timeout, but there is no
further information to look at. I read that the checkpoint timeout can be configured and set it to 2 min, but the
savepoint trigger fails before 2 min have passed. Also, my state is not large.



The program finished with the following exception:


org.apache.flink.util.FlinkException: Triggering a savepoint for the job 
 failed.
at 
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:777)
at 
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:754)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
at 
org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:751)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1072)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
at 
org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Participating in the development community

2021-07-13 Post by 沉黙dē羔羊
Hi all, a quick question: writing data with flink hudi fails with the following error:
Caused by: org.apache.flink.util.FlinkException: Error from OperatorCoordinator
... 41 more
Caused by: java.lang.AbstractMethodError: Method 
org/apache/hudi/sink/StreamWriteOperatorCoordinator.subtaskReady(ILorg/apache/flink/runtime/operators/coordination/OperatorCoordinator$SubtaskGateway;)V
 is abstract
at 
org.apache.hudi.sink.StreamWriteOperatorCoordinator.subtaskReady(StreamWriteOperatorCoordinator.java)
at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.notifySubtaskReady(OperatorCoordinatorHolder.java:416)
... 40 more

Re: Re:Re: How to dynamically change the logging configuration for Flink on native k8s?

2021-07-13 Post by Yang Wang
Jobs running inside a session cannot have their logging configured independently.
The only exception is if each job's user code uses a different package, so you can configure log4j to write different packages to different files; the Flink framework logs still cannot be separated, though.

Best,
Yang

东东 wrote on Tue, Jul 13, 2021 at 12:56 PM:

> Yes, the logging configuration applies to the JM and the TM.
>
> casel.chen wrote at 2021-07-13 12:37:20:
> > For session mode, does the logging configuration file apply to every job running on the session? Can each
> > job running on the session configure its logging independently? Thanks!
> >
> > Yang Wang wrote at 2021-07-12 10:57:08:
> >> You can directly edit the log4j-console.properties stored in the ConfigMap and it takes effect immediately;
> >> see [1] for details.
> >>
> >>[1].
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#changing-the-log-level-dynamically
> >>
> >>Best,
> >>Yang
> >>
> >> casel.chen wrote on Fri, Jul 9, 2021 at 8:29 PM:
> >>
> >>> Flink runs on native k8s. We want to change the Root Logger Level, dynamically add Logger Name -> Logger
> >>> Level entries, and let users pass in a custom log template. Is there currently a way to do this?
>


Re: Partitioning issue after key_by when using ConnectedStream in PyFlink

2021-07-13 Post by Fei Zhao
Hi,

I also posted this question to the u...@flink.apache.org list and got a reply saying the cause is as follows:

The root cause is the wrong mapping of the state key to the state. This
> kind of wrong mapping occurs when the key is switched, but the state is not
> used. As you wrote in the example, the `data` you declared is not used in
> `process_element2`
>

An issue has been opened to track this problem, and the fix will be included in the next release, 1.13.2.

赵飞 wrote on Mon, Jul 12, 2021 at 4:53 PM:

> Hi,
>
> Here is the complete code. One thing to add: running the same code under the same conditions may produce
> different results; sometimes it is fine, sometimes the problem reproduces.
>
> import random
>
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import KeyedCoProcessFunction
> from pyflink.datastream.state import MapStateDescriptor
> from pyflink.datastream import RuntimeContext
>
>
> def test(data):
>     product_ids = set()
>     for key, value in data.items():
>         product_ids.add(value[0])
>     return list(product_ids)
>
>
> class MyFunction(KeyedCoProcessFunction):
>     def open(self, ctx):
>         data_desc = MapStateDescriptor('data', Types.STRING(),
>                                        Types.ROW([Types.INT()]))
>         self.data = ctx.get_map_state(data_desc)
>
>         rule_desc = MapStateDescriptor('rule', Types.STRING(),
>                                        Types.ROW([Types.INT()]))
>         self.rules = ctx.get_map_state(rule_desc)
>
>     def process_element1(self, data_value, ctx):
>         row_id, others = data_value[0], data_value[1:]
>         self.data.put(row_id, others)
>         result = []
>         for key, value_list in self.rules.items():
>             product_id, random_0, random_1 = value_list
>             # Do some calculations
>             product_ids_of_state_data = test(self.data)
>             result.append([random_0, random_1, product_id,
>                            product_ids_of_state_data])
>         return result
>
>     def process_element2(self, rule_value, ctx):
>         row_id, others = rule_value[0], rule_value[1:]
>         self.rules.put(row_id, others)
>
>
> def generate_data1(count):
>     collection = []
>     for i in range(count):
>         collection.append(['row_%d' % i, i % 2 + 1, 'a_%d' % i, i * 2])
>     return collection
>
>
> def generate_data2(count):
>     collection = []
>     for i in range(count):
>         collection.append(['row_%d' % i, random.choice([1, 2]), 'a_%d' % i,
>                            i * 2])
>     return collection
>
>
> def main():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>
>     data = env.from_collection(generate_data1(50))
>     rules = env.from_collection([
>         ['row_0', 1, 'rule1_value0', 'rule1_value1'],
>         ['row_1', 2, 'rule2_value0', 'rule2_value1']
>     ], type_info=Types.ROW([Types.STRING(), Types.INT(), Types.STRING(),
>                             Types.STRING()]))
>     results = data.connect(rules).key_by(lambda x: x[1],
>                                          lambda y: y[1]).process(MyFunction())
>     results.print()
>
>     env.execute("test_job")
>
>
> if __name__ == "__main__":
>     main()
>
>
Dian Fu wrote on Mon, Jul 12, 2021 at 4:48 PM:
>
>> Hi,
>>
>> Could you share a complete example that reproduces the problem?
>>
>> Regards,
>> Dian
>>
>> > On Jul 10, 2021, at 7:44 PM, 赵飞 wrote:
>> >
>> > Hi all, I have a question.
>> >
>> >
>> > Recently I have been developing a module with pyflink whose main function is to compute and evaluate user
>> > data based on rules. Two streams are involved: a data stream (data) and a rule stream (rule); both contain a
>> > product id, so that value is used as the key for partitioning. The processing code is roughly as follows:
>> >
>> > ---
>> > results = data.connect(rules).key_by('product_id',
>> >                                      'product_id').process(MyFunction())
>> > results.print()
>> >
>> > class MyFunction(KeyedCoProcessFunction):
>> >     def open(self, ctx):
>> >         data_desc = MapStateDescriptor('data', key_type, value_type)
>> >         self.data = ctx.get_map_state(data_desc)
>> >
>> >         rule_desc = MapStateDescriptor('rule', key_type, value_type)
>> >         self.rules = ctx.get_map_state(rule_desc)
>> >
>> >     def process_element1(self, data_value, ctx):
>> >         row_id, others = data_value[0], data_value[1:]
>> >         self.data.put(row_id, others)
>> >         result = []
>> >         for key, value_list in self.rules.items():
>> >             product_id, random_0, random_1 = value_list
>> >             # Do some calculations
>> >             product_ids_of_state_data = OtherFunction(self.data)
>> >             result.append([random_0, random_1, product_id,
>> >                            product_ids_of_state_data])
>> >         return result
>> >
>> >     def process_element2(self, rule_value, ctx):
>> >         row_id, others = rule_value[0], rule_value[1:]
>> >         self.rules.put(row_id, others)
>> > --
>> >
>> > The data looks roughly like this:
>> > # data stream (assume the second element is the product id)
>> > [
>> >     ['row_0', 1, 'a_0', 2],
>> >     ['row_1', 2, 'a_1', 3],
>> >     ['row_2', 1, 'a_2', 4],
>> >     ['row_4', 2, 'a_3', 5]
>> > ]
>> >
>> > # rule stream (assume the second element is the product id)
>> > [
>> >     ['row_0', 1, 'rule1_value0', 'rule1_value1'],
>> >     ['row_1', 2, 'rule2_value0', 'rule2_value1']
>> > ]
>> >
>> > After running the program (with global parallelism set to 1), the output is something like:
>> > ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>> > ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>> >
>> > Judging from the output, when data for a given product arrives, only that product's rules are used for processing, which shows the rules are indeed partitioned by product id; however, the MapState that holds the data contains data for multiple product ids.
>> >
>> > Further observations:
>> > 0. If the data in the data stream always arrives in the order product 1 first, then product 2, partitioning works correctly. But if this order cannot be guaranteed, the problem described above appears.
>> > 1.

Aggregating after deduplicating Kafka data in Flink

2021-07-13 Post by yanyunpeng
create view distinct_view as
select val, ptime from (
    select *, ROW_NUMBER() OVER (PARTITION BY MN ORDER BY ptime) as rowNum
    from test_udf
) where rowNum = 1


select avg(val) as avg_var, STDDEV_POP(val) as spddev_pop_var
from distinct_view
GROUP BY HOP(ptime, INTERVAL '2' SECONDS, INTERVAL '1' DAY);

The Kafka data contains duplicates, but writing it this way causes problems. How should this case be written?