Re: Calculation of UI's maximum non-heap memory

2023-02-20 Thread Alexis Sarda-Espinosa
Hi Weihua,

Thanks for your response. I am familiar with those calculations; the one I
don't understand is the Maximum Non-Heap value.

Regards,
Alexis.

On Tue, 21 Feb 2023, 04:45 Weihua Hu,  wrote:

> Hi, Alexis
>
> 1. With those configurations, Flink will set the JVM parameters -Xms and -Xmx
> to 673185792 (642m), -XX:MaxDirectMemorySize to 67108864 (64m), and
> -XX:MaxMetaspaceSize to 157286400 (150m); you can find more information in [1].
> 2. As the hint in the Flink UI says: "The maximum heap displayed might differ
> from the configured values depending on the used GC algorithm for this
> process." This [2] shows how the JVM calculates the max heap memory from the
> configured -Xms/-Xmx.
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_jobmanager/
> [2]
> https://stackoverflow.com/questions/52980629/runtime-getruntime-maxmemory-calculate-method
>
> Best,
> Weihua
>
>
> On Tue, Feb 21, 2023 at 12:15 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I have configured a job manager with the following settings (Flink
>> 1.16.1):
>>
>> jobmanager.memory.process.size: 1024m
>> jobmanager.memory.jvm-metaspace.size: 150m
>> jobmanager.memory.off-heap.size: 64m
>> jobmanager.memory.jvm-overhead.min: 168m
>> jobmanager.memory.jvm-overhead.max: 168m
>> jobmanager.memory.enable-jvm-direct-memory-limit: "true"
>>
>> However, when I look at the job manager dashboard in the UI, I see that
>> the value of Non-Heap Maximum is reported as 532 MB. Could someone clarify
>> how this value is calculated?
>>
>> In case it's relevant, the effective configuration for JVM Heap is
>> reported as 642 MB, with the reported maximum being 621 MB.
>>
>> Regards,
>> Alexis.
>>
>>


Whether Flink SQL window operations support "Allow Lateness and SideOutput"?

2023-02-20 Thread wang
Hi dear engineers,


  One question, as in the title: do Flink SQL window operations support "Allow
Lateness and SideOutput"?


  Just as supported in the DataStream API (allowedLateness and
sideOutputLateData), like:


SingleOutputStreamOperator<...> sumStream = dataStream
    .keyBy(...)
    .timeWindow(...)
    .allowedLateness(Time.minutes(1))
    .sideOutputLateData(outputTag)
    .sum(...);


Thanks && Regards,
Hunk



Re: Metrics or runtimeContext in global commit

2023-02-20 Thread Tobias Fröhlich
Dear Yuxia,

thank you for your answer! This is also our conclusion and my colleague has 
already proposed this feature.

Best regards,
Tobias


- Original Message -
From: "yuxia" 
To: "Dr. Tobias Fröhlich" 
Cc: "User" , "dev" 
Sent: Monday, 20 February 2023, 03:31:22
Subject: Re: Metrics or runtimeContext in global commit

It seems there is no other way to get the runtimeContext in a global commit. For me, I
think it's reasonable to propose the feature.
I added the flink-dev channel for more attention/discussion among the Flink devs.

Best regards,
Yuxia

- Original Message -
From: "Tobias Fröhlich" 
To: "User" 
Sent: Tuesday, 14 February 2023, 21:26:34
Subject: Metrics or runtimeContext in global commit

Dear flink team,

I would like to use metrics (which are then written to an InfluxDB) in the method
org.apache.flink.api.connector.sink2.Committer::commit(Collection<CommitRequest<CommT>>
committables) that I use for the global commit. I use the helper method
StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit
topology.

The problem is: when I implement the Committer interface, I cannot get the
runtimeContext that I need for the metrics, because a Committer is not an operator.

The only solution I found was to clone the Flink source code and amend it
in the following way:

 1. declaring an abstract class "CommitterWithRuntimeContext" that
implements Committer and has:
- an additional field for the runtimeContext
- a setter and a getter for this field
- an abstract method "void init()"

 2. in the setup() method of GlobalCommitterOperator (which is an operator and
thus has a runtimeContext), adding the following lines at the end:

if (committer instanceof CommitterWithRuntimeContext) {
    ((CommitterWithRuntimeContext) committer).setRuntimeContext(getRuntimeContext());
    ((CommitterWithRuntimeContext) committer).init();
}

I can then implement the method CommitterWithRuntimeContext::init() in our code 
and call the method CommitterWithRuntimeContext::getRuntimeContext() when I 
need the runtimeContext.
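
For reference, a minimal sketch of the abstract class from step 1 (Committer and
RuntimeContext are Flink interfaces; the class itself is only the workaround
described above, not existing Flink API):

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.connector.sink2.Committer;

// Workaround sketch: a Committer that can receive the operator's
// RuntimeContext so that metrics can be registered in init().
public abstract class CommitterWithRuntimeContext<CommT> implements Committer<CommT> {

    private RuntimeContext runtimeContext;

    public void setRuntimeContext(RuntimeContext runtimeContext) {
        this.runtimeContext = runtimeContext;
    }

    public RuntimeContext getRuntimeContext() {
        return runtimeContext;
    }

    // Called by the patched GlobalCommitterOperator.setup() after the
    // context has been injected, e.g. to create metric groups.
    public abstract void init();
}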

Is there another way to get the runtimeContext in a global commit? If not, is 
it justified to propose a feature request for a future release, where the 
global commit method can be implemented in a way that the user has access to 
the runtimeContext?

Best regards and thanks in advance
Tobias Fröhlich


Re: Broadcast vs. non-broadcast stream data arrival order

2023-02-20 Thread Weihua Hu
Hi,

I can see your question. You need to subscribe to the user-zh mailing list to
receive the replies; see:
https://flink.apache.org/community.html

Best,
Weihua


On Tue, Feb 21, 2023 at 12:17 PM 知而不惑  wrote:

> Did you receive my question?
>
>
>
>
> ------ Original Message ------
> From:
>   "user-zh"
>
>  Sent: Tuesday, 21 February 2023, 09:37 AM
> To: "user-zh"
> Subject: Broadcast vs. non-broadcast stream data arrival order
>
>
>
> Hi all,
> I connect a broadcast stream with a non-broadcast stream and implement a
> BroadcastProcessFunction. At runtime I find that, in my custom
> BroadcastProcessFunction, the regular stream's elements can arrive before the
> broadcast data, so the broadcast state read in processElement() is empty. Is
> there a pattern or mechanism that solves this? I searched Google and ChatGPT;
> the suggested answer was to buffer elements in a list initialized in open(),
> but initializing it in open() gave me an error saying it must be used after
> keyBy().
> Below is a minimal working example of processElement and the main method:
> @Override
> public void processElement(FileEventOuterClass.FileEvent value,
>         BroadcastProcessFunction<FileEventOuterClass.FileEvent,
>                 List<SensitiveRule>,
>                 FileEventOuterClass.FileEvent>.ReadOnlyContext ctx,
>         Collector<FileEventOuterClass.FileEvent> out) {
>     try {
>         ReadOnlyBroadcastState<Void, List<SensitiveRule>> broadcastState =
>                 ctx.getBroadcastState(ruleDescriptor);
>         List<SensitiveRule> sensitiveRules = broadcastState.get(null);
>         if (CollectionUtils.isEmpty(sensitiveRules)) {
>             return;
>         }
>         // ...
>     } catch (Exception e) {
>         log.error("SensitiveDataClassify err:", e);
>     }
> }
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>
>     MapStateDescriptor<Void, List<SensitiveRule>> ruleDescriptor =
>             new MapStateDescriptor<>("ruleBroadcastState", Types.VOID,
>                     new ListTypeInfo<>(SensitiveRule.class));
>
>     // broadcast stream
>     BroadcastStream<List<SensitiveRule>> broadcast =
>             sensitiveRule.broadcast(ruleDescriptor);
>
>     DataStreamSource<String> localhost = env.socketTextStream("localhost", 11451);
>     SingleOutputStreamOperator<FileEventOuterClass.FileEvent> stream =
>             localhost.map((MapFunction<String, FileEventOuterClass.FileEvent>) value ->
>                     FileEventOuterClass.FileEvent.newBuilder().setChannel("").build());
>
>     SingleOutputStreamOperator<FileEventOuterClass.FileEvent> streamOperator =
>             stream.connect(broadcast).process(new SensitiveDataClassify());
>     streamOperator.print("qqq");
>     env.execute();
> }


Re: Calculation of UI's maximum non-heap memory

2023-02-20 Thread Weihua Hu
Hi, Alexis

1. With those configurations, Flink will set the JVM parameters -Xms and -Xmx to
673185792 (642m), -XX:MaxDirectMemorySize to 67108864 (64m), and
-XX:MaxMetaspaceSize to 157286400 (150m); you can find more information in [1].
2. As the hint in the Flink UI says: "The maximum heap displayed might differ from
the configured values depending on the used GC algorithm for this
process." This [2] shows how the JVM calculates the max heap memory from the
configured -Xms/-Xmx.


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_jobmanager/
[2]
https://stackoverflow.com/questions/52980629/runtime-getruntime-maxmemory-calculate-method
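
As a cross-check, the UI figure can be compared against what the JVM itself
reports for its non-heap pools (Metaspace, Compressed Class Space, CodeHeap).
A minimal sketch using plain JMX, run inside the JVM in question (not
Flink-specific; pool names vary by JVM):

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;

public class NonHeapPools {
    public static void main(String[] args) {
        long total = 0;
        // Print every non-heap pool and sum the ones with a defined max.
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if (pool.getType() == MemoryType.NON_HEAP) {
                long max = pool.getUsage().getMax(); // -1 if no max is defined
                System.out.printf("%-30s max=%d%n", pool.getName(), max);
                if (max > 0) {
                    total += max;
                }
            }
        }
        System.out.printf("sum of defined maxima: %d bytes (~%d MB)%n",
                total, total / (1024 * 1024));
    }
}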

Best,
Weihua


On Tue, Feb 21, 2023 at 12:15 AM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hello,
>
> I have configured a job manager with the following settings (Flink 1.16.1):
>
> jobmanager.memory.process.size: 1024m
> jobmanager.memory.jvm-metaspace.size: 150m
> jobmanager.memory.off-heap.size: 64m
> jobmanager.memory.jvm-overhead.min: 168m
> jobmanager.memory.jvm-overhead.max: 168m
> jobmanager.memory.enable-jvm-direct-memory-limit: "true"
>
> However, when I look at the job manager dashboard in the UI, I see that
> the value of Non-Heap Maximum is reported as 532 MB. Could someone clarify
> how this value is calculated?
>
> In case it's relevant, the effective configuration for JVM Heap is
> reported as 642 MB, with the reported maximum being 621 MB.
>
> Regards,
> Alexis.
>
>


Re: Re: Cannot take a memory dump of a Flink program

2023-02-20 Thread Biao Geng
Hi,
The error sun.jvm.hotspot.debugger.UnmappedAddressException: 7f74efa5d410
is probably not closely related to Flink.
We ran into a similar problem when dumping memory in production; it turned out
that the Linux user running the jmap command was not the same as the Linux user
running the Flink job.
I don't know whether your issue is the same, but you could try top -p to find
the USER, su to that user, and then retry jmap
-dump:format=b,file=/tmp/dump.hprof.
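
For example, assuming the pid 26326 mentioned later in this thread, the steps
would look roughly like this (the dump path is illustrative):

# find the Linux user that owns the JVM process
top -p 26326          # or: ps -o user= -p 26326
# switch to that user, then retry the dump
su - <that user>
jmap -dump:format=b,file=/tmp/dump.hprof 26326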

Best,
Biao Geng

Weihua Hu  wrote on Mon, 20 Feb 2023 at 14:03:

> Hi,
>
> You can also increase the heartbeat timeout (heartbeat.timeout) [1] and then
> try the memory dump again.
>
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-fault-tolerance-options
>
> Best,
> Weihua
>
>
> On Mon, Feb 20, 2023 at 1:58 PM lxk  wrote:
>
> > I tried adjusting the parameter, with the following value:
> >
> > akka.ask.timeout: 900s
> >
> > But I still get the same error.
> >
> > On 2023-02-17 17:32:51, "Guo Thompson"  wrote:
> > >It may be that the heartbeat interval between the JM and TM is too short;
> > >the dump process stops the world, so the TM stops responding to the JM's
> > >heartbeat;
> > >
> > >lxk  wrote on Tue, 14 Feb 2023 at 14:32:
> > >
> > >> Flink version: 1.16
> > >> Java version: jdk1.8.0_251
> > >> Problem: our recently deployed Flink job triggers frequent young GCs,
> > >> every few seconds. Adjusting the young generation size did not solve it;
> > >> the whole JVM heap is currently 3.57g. We therefore wanted to analyze
> > >> where the program's memory problem lies. Using the YARN applicationId,
> > >> we run ps -ef|grep 1666758697316_2639108 to find the corresponding pid,
> > >> and finally run
> > >> jmap -dump:format=b,file=user.dump 26326
> > >> to generate the dump file. But for every program we tested, as soon as
> > >> the dump starts it affects the running job: the program's container dies
> > >> for no obvious reason and the job restarts. The error after running the
> > >> command is:
> > >> sun.jvm.hotspot.debugger.UnmappedAddressException: 7f74efa5d410
> > >> https://pic.imgdb.cn/item/63eb2a46f144a010071899ba.png
> > >> Has anyone run into this problem? Are we doing it wrong, or is there an
> > >> issue with the version we use? We'd appreciate any suggestions.
> >
>


Re:Re: Flink SQL support array transform function

2023-02-20 Thread Xuekui
Hi YuXia,


Thanks for your advice.


By adding the hint, the type validation passes.
But I still can't pass the lambda function to this UDF.
Here is my query:


select array_transform(ids, id -> id + 1) from tmp_table


The lambda function id -> id + 1 can't be passed because "->"
is not supported in Calcite now.


Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL
parse failed. Encountered "->" at line 3, column 40.
  at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
  at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:74)
  at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)


------------------ Original Email ------------------

Sender: "yuxia" <luoyu...@alumni.sjtu.edu.cn>

Sent Time: 2023/2/20 10:00

To: "Xuekui" <baixue...@foxmail.com>

Cc: "fskmine" <fskm...@gmail.com>; "Caizhi Weng" <tsreape...@gmail.com>; "User" <user@flink.apache.org>

Subject: Re: Flink SQL support array transform function


Hi, Xuekui.
As said in the exception stack, maybe you can try to provide a hint for
the function's parameters.




class ArrayTransformFunction extends ScalarFunction {

  def eval(@DataTypeHint("ARRAY<...>") ...
}

See the type inference docs for details:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference
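
For illustration, a minimal Java sketch of a scalar function with an explicit
hint on the array parameter; since the SQL parser rejects lambda arguments,
this variant hard-codes the increment (class and function names are
illustrative, not from this thread):

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

public class ArrayIncrementFunction extends ScalarFunction {
    // Hard-codes "id -> id + 1", since Calcite cannot parse a lambda argument.
    public Long[] eval(@DataTypeHint("ARRAY<BIGINT>") Long[] ids) {
        if (ids == null) {
            return null;
        }
        Long[] result = new Long[ids.length];
        for (int i = 0; i < ids.length; i++) {
            result[i] = ids[i] == null ? null : ids[i] + 1;
        }
        return result;
    }
}

Registered with tableEnv.createTemporarySystemFunction("array_increment",
ArrayIncrementFunction.class), it could then be called as
select array_increment(ids) from tmp_table.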




Best regards,
Yuxia



From: "Xuekui"
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/


Xuekui, about Spark SQL's transform function:
https://spark.apache.org/docs/latest/api/sql/index.html#transform

I found it's not supported by Flink SQL; is there any plan for it?


Thank you

Re: Broadcast vs. non-broadcast stream data arrival order

2023-02-20 Thread Weihua Hu
Hi,

Could you paste the exact error message? Also, I don't see the part of the code
that buffers elements in list state.
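
For reference, a minimal sketch of that buffering idea (the FileEvent and
SensitiveRule types are taken from your code below; note the buffer is a plain
field, so it is not checkpointed and would be lost on failure):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: buffer events until the broadcast rules have arrived.
public class BufferingClassify extends BroadcastProcessFunction<
        FileEventOuterClass.FileEvent, List<SensitiveRule>, FileEventOuterClass.FileEvent> {

    private final MapStateDescriptor<Void, List<SensitiveRule>> ruleDescriptor;

    // Plain in-memory buffer: not checkpointed, lost on restart.
    private transient List<FileEventOuterClass.FileEvent> pending;

    public BufferingClassify(MapStateDescriptor<Void, List<SensitiveRule>> ruleDescriptor) {
        this.ruleDescriptor = ruleDescriptor;
    }

    @Override
    public void open(Configuration parameters) {
        pending = new ArrayList<>();
    }

    @Override
    public void processElement(FileEventOuterClass.FileEvent value, ReadOnlyContext ctx,
            Collector<FileEventOuterClass.FileEvent> out) throws Exception {
        List<SensitiveRule> rules = ctx.getBroadcastState(ruleDescriptor).get(null);
        if (rules == null || rules.isEmpty()) {
            pending.add(value); // rules not broadcast yet: buffer the element
            return;
        }
        for (FileEventOuterClass.FileEvent buffered : pending) {
            out.collect(buffered); // classify buffered events against rules here
        }
        pending.clear();
        out.collect(value); // classify the current element against rules here
    }

    @Override
    public void processBroadcastElement(List<SensitiveRule> rules, Context ctx,
            Collector<FileEventOuterClass.FileEvent> out) throws Exception {
        ctx.getBroadcastState(ruleDescriptor).put(null, rules);
    }
}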

Best,
Weihua


On Tue, Feb 21, 2023 at 9:38 AM 知而不惑  wrote:

> Hi all,
> I connect a broadcast stream with a non-broadcast stream and implement a
> BroadcastProcessFunction. At runtime I find that, in my custom
> BroadcastProcessFunction, the regular stream's elements can arrive before the
> broadcast data, so the broadcast state read in processElement() is empty. Is
> there a pattern or mechanism that solves this? I searched Google and ChatGPT;
> the suggested answer was to buffer elements in a list initialized in open(),
> but initializing it in open() gave me an error saying it must be used after
> keyBy().
> Below is a minimal working example of processElement and the main method:
> @Override
> public void processElement(FileEventOuterClass.FileEvent value,
>         BroadcastProcessFunction<FileEventOuterClass.FileEvent,
>                 List<SensitiveRule>,
>                 FileEventOuterClass.FileEvent>.ReadOnlyContext ctx,
>         Collector<FileEventOuterClass.FileEvent> out) {
>     try {
>         ReadOnlyBroadcastState<Void, List<SensitiveRule>> broadcastState =
>                 ctx.getBroadcastState(ruleDescriptor);
>         List<SensitiveRule> sensitiveRules = broadcastState.get(null);
>         if (CollectionUtils.isEmpty(sensitiveRules)) {
>             return;
>         }
>         // ...
>     } catch (Exception e) {
>         log.error("SensitiveDataClassify err:", e);
>     }
> }
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>
>     MapStateDescriptor<Void, List<SensitiveRule>> ruleDescriptor =
>             new MapStateDescriptor<>("ruleBroadcastState", Types.VOID,
>                     new ListTypeInfo<>(SensitiveRule.class));
>
>     // broadcast stream
>     BroadcastStream<List<SensitiveRule>> broadcast =
>             sensitiveRule.broadcast(ruleDescriptor);
>
>     DataStreamSource<String> localhost = env.socketTextStream("localhost", 11451);
>     SingleOutputStreamOperator<FileEventOuterClass.FileEvent> stream =
>             localhost.map((MapFunction<String, FileEventOuterClass.FileEvent>) value ->
>                     FileEventOuterClass.FileEvent.newBuilder().setChannel("").build());
>
>     SingleOutputStreamOperator<FileEventOuterClass.FileEvent> streamOperator =
>             stream.connect(broadcast).process(new SensitiveDataClassify());
>     streamOperator.print("qqq");
>     env.execute();
> }


flink cep: matching events A, B, C within a period of time, in any order

2023-02-20 Thread Peihui He
hi, all

  As in the title: after reading the article at
https://mp.weixin.qq.com/s/PT8ImeOOheXR295gQRsN8w
I found that no solution is given there for the fourth problem (matching
events A, B and C within a period of time, in any order).
   Does anyone have a good approach for this?
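
One possible workaround sketch (not from the article): track which of the three
event types have been seen per key in keyed state, and emit a match once all
three arrive within the window. Event and its getType() are hypothetical
placeholders for your own type, and this assumes event-time timestamps are
assigned upstream:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

interface Event { String getType(); }

public class UnorderedAbcMatcher extends KeyedProcessFunction<String, Event, String> {

    private static final long WINDOW_MS = 60_000L; // "a period of time"; adjust as needed

    private transient MapState<String, Long> seen; // event type -> first-seen timestamp

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("seen", Types.STRING, Types.LONG));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<String> out)
            throws Exception {
        if (!seen.contains(event.getType())) {
            seen.put(event.getType(), ctx.timestamp());
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + WINDOW_MS);
        }
        if (seen.contains("A") && seen.contains("B") && seen.contains("C")) {
            out.collect("matched A, B, C for key " + ctx.getCurrentKey());
            seen.clear();
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        // Simplified cleanup: when the earliest pending window expires without
        // a full match, the whole buffer is reset.
        seen.clear();
    }
}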


Best Wishes!


Calculation of UI's maximum non-heap memory

2023-02-20 Thread Alexis Sarda-Espinosa
Hello,

I have configured a job manager with the following settings (Flink 1.16.1):

jobmanager.memory.process.size: 1024m
jobmanager.memory.jvm-metaspace.size: 150m
jobmanager.memory.off-heap.size: 64m
jobmanager.memory.jvm-overhead.min: 168m
jobmanager.memory.jvm-overhead.max: 168m
jobmanager.memory.enable-jvm-direct-memory-limit: "true"

However, when I look at the job manager dashboard in the UI, I see that the
value of Non-Heap Maximum is reported as 532 MB. Could someone clarify how
this value is calculated?

In case it's relevant, the effective configuration for JVM Heap is reported
as 642 MB, with the reported maximum being 621 MB.

Regards,
Alexis.


Re: Job Cancellation Failing

2023-02-20 Thread Matthias Pohl via user
What do you mean by "earlier it used to fail due to ExecutionGraphStore not
existing in the /tmp folder"? Did you get the error message "Could not create
executionGraphStorage directory in /tmp.", and did creating this folder fix
the issue?

It also looks like the stacktrace doesn't match any of the 1.15 versions in
terms of line numbers, or I might be missing something here. Could you provide
the exact Flink version you're using?

It might also help to share the JobManager logs to understand the context in
which the cancel operation was triggered.

Matthias

On Mon, Feb 20, 2023 at 1:53 AM Puneet Duggal 
wrote:

> Flink Cluster Context:
>
>
>- Flink Version - 1.15
>- Deployment Mode - Session
>- Number of Job Managers - 3 (HA)
>- Number of Task Managers - 1
>
>
> Cancellation of the job fails due to the following:
>
> org.apache.flink.runtime.rest.NotFoundException: Job
> 1cb2185d4d72c8c6f0a3a549d7de4ef0 not found
> at
> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$1(AbstractExecutionGraphHandler.java:99)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
> at
> org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.lambda$getExecutionGraphInternal$0(DefaultExecutionGraphCache.java:109)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:252)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
> at
> org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1387)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
> at
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
> at akka.dispatch.OnComplete.internal(Future.scala:299)
> at akka.dispatch.OnComplete.internal(Future.scala:297)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
> at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:118)
> at
> akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:1144)
> at akka.actor.Actor.aroundReceive(Actor.scala:537)
> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:540)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
> at akka.actor.ActorCell.invoke(ActorCell.scala:548)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at