Re: Calculation of UI's maximum non-heap memory
Hi Weihua,

Thanks for your response. I am familiar with those calculations; the one I don't understand is the Maximum Non-Heap value.

Regards,
Alexis.

On Tue, 21 Feb 2023, 04:45 Weihua Hu wrote:
> Hi, Alexis
>
> 1. With this configuration, Flink will set the JVM parameters -Xms and -Xmx
> to 673185792 (642m), -XX:MaxDirectMemorySize to 67108864 (64m), and
> -XX:MaxMetaspaceSize to 157286400 (150m). You can find more information in [1].
> 2. As the hint in the Flink UI says: "The maximum heap displayed might differ
> from the configured values depending on the used GC algorithm for this
> process." [2] shows how the JVM calculates the max heap memory from the
> configured -Xms/-Xmx.
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_jobmanager/
> [2] https://stackoverflow.com/questions/52980629/runtime-getruntime-maxmemory-calculate-method
>
> Best,
> Weihua
>
> On Tue, Feb 21, 2023 at 12:15 AM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I have configured a job manager with the following settings (Flink 1.16.1):
>>
>> jobmanager.memory.process.size: 1024m
>> jobmanager.memory.jvm-metaspace.size: 150m
>> jobmanager.memory.off-heap.size: 64m
>> jobmanager.memory.jvm-overhead.min: 168m
>> jobmanager.memory.jvm-overhead.max: 168m
>> jobmanager.memory.enable-jvm-direct-memory-limit: "true"
>>
>> However, when I look at the job manager dashboard in the UI, I see that
>> the value of Non-Heap Maximum is reported as 532 MB. Could someone clarify
>> how this value is calculated?
>>
>> In case it's relevant, the effective configuration for JVM Heap is
>> reported as 642 MB, with the reported maximum being 621 MB.
>>
>> Regards,
>> Alexis.
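The effective heap that both mails reference can be reproduced with simple arithmetic. A sketch (values in MiB, taken from the configuration above); note that it reproduces only the 642 MB "JVM Heap" figure, not the 532 MB Non-Heap Maximum that the question asks about:

```java
// Derives the JobManager heap from the configured memory components:
// process.size = heap + off-heap + metaspace + JVM overhead.
class JobManagerHeapMath {
    static long effectiveHeapMib(long processMib, long offHeapMib,
                                 long metaspaceMib, long overheadMib) {
        return processMib - offHeapMib - metaspaceMib - overheadMib;
    }

    public static void main(String[] args) {
        // 1024m - 64m - 150m - 168m = 642m, i.e. the -Xmx of 673185792 bytes
        // mentioned in Weihua's reply (642 * 1024 * 1024 = 673185792).
        System.out.println(effectiveHeapMib(1024, 64, 150, 168) + " MiB");
    }
}
```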
Whether Flink SQL window operations support "Allow Lateness and SideOutput"?
Hi dear engineers,

One question, as in the title: do Flink SQL window operations support "allowed lateness and side output", as supported in the DataStream API (allowedLateness and sideOutputLateData)? For example:

SingleOutputStreamOperator<...> sumStream = dataStream
    .keyBy(...)
    .timeWindow(...)
    .allowedLateness(Time.minutes(1))
    .sideOutputLateData(outputTag)
    .sum(...);

Thanks && Regards,
Hunk
Re: Metrics or runtimeContext in global commit
Dear Yuxia,

thank you for your answer! This is also our conclusion, and my colleague has already proposed this feature.

Best regards,
Tobias

----- Original message -----
From: "yuxia"
To: "Dr. Tobias Fröhlich"
CC: "User", "dev"
Sent: Monday, 20 February 2023, 03:31:22
Subject: Re: Metrics or runtimeContext in global commit

It seems there is no other way to get the runtimeContext in a global commit. For me, I think it's reasonable to propose the feature. I added the flink-dev channel for more attention/discussion among the Flink devs.

Best regards,
Yuxia

----- Original message -----
From: "Tobias Fröhlich"
To: "User"
Sent: Tuesday, 14 February 2023, 21:26:34
Subject: Metrics or runtimeContext in global commit

Dear Flink team,

I would like to use metrics (which are then written to an InfluxDB) in the method org.apache.flink.api.connector.sink2.Committer::commit(Collection<CommitRequest<CommT>> committables) that I use for the global commit. I use the helper method StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit topology.

The problem is: when I implement the interface Committer, I cannot get the runtimeContext that I need for the metrics, because a Committer is not an operator. The only solution I found was cloning the Flink source code and amending it in the following way:

1. declaring an abstract class "CommitterWithRuntimeContext" that implements Committer and has:
   - an additional field for the runtimeContext
   - a setter and getter for this field
   - an abstract method "void init()"

2. adding the following lines at the end of the setup() method of GlobalCommitterOperator (which is an operator and thus has a runtimeContext):

if (committer instanceof CommitterWithRuntimeContext) {
    ((CommitterWithRuntimeContext) committer).setRuntimeContext(getRuntimeContext());
    ((CommitterWithRuntimeContext) committer).init();
}

I can then implement the method CommitterWithRuntimeContext::init() in our code and call CommitterWithRuntimeContext::getRuntimeContext() whenever I need the runtimeContext.
Is there another way to get the runtimeContext in a global commit? If not, is it justified to propose a feature request for a future release, where the global commit method can be implemented in a way that the user has access to the runtimeContext? Best regards and thanks in advance Tobias Fröhlich
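The workaround described above can be sketched in plain Java. The RuntimeContext and Committer interfaces below are simplified hypothetical stand-ins (the real ones live in org.apache.flink.api.connector.sink2 and the Flink runtime), so the sketch stays self-contained:

```java
import java.util.Collection;

// Simplified stand-ins for the Flink types, for illustration only.
interface RuntimeContext { String getTaskName(); }
interface Committer<CommT> { void commit(Collection<CommT> committables) throws Exception; }

// The proposed abstract class: a Committer that can receive the
// operator's RuntimeContext before commit() is first called.
abstract class CommitterWithRuntimeContext<CommT> implements Committer<CommT> {
    private RuntimeContext runtimeContext;

    public void setRuntimeContext(RuntimeContext runtimeContext) { this.runtimeContext = runtimeContext; }
    public RuntimeContext getRuntimeContext() { return runtimeContext; }

    // Called once the context is available, e.g. to register metrics.
    public abstract void init();
}

// What the amended GlobalCommitterOperator.setup() would do at the end.
class OperatorSetupSketch {
    static void wire(Committer<?> committer, RuntimeContext operatorContext) {
        if (committer instanceof CommitterWithRuntimeContext) {
            ((CommitterWithRuntimeContext<?>) committer).setRuntimeContext(operatorContext);
            ((CommitterWithRuntimeContext<?>) committer).init();
        }
    }
}

// A tiny demo committer: init() would be the place to set up metrics.
class LoggingCommitter extends CommitterWithRuntimeContext<String> {
    boolean initialized;
    @Override public void init() { initialized = true; }
    @Override public void commit(Collection<String> committables) {}
}
```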
Re: Ordering of broadcast and non-broadcast stream data
Hi,

We can see your question. You need to subscribe to the user-zh mailing list in order to receive the replies; see: https://flink.apache.org/community.html

Best,
Weihua

On Tue, Feb 21, 2023 at 12:17 PM 知而不惑 wrote:

> Did you receive my question?
>
> ----- Original message -----
> From: "user-zh"
> Sent: Tuesday, Feb 21, 2023, 09:37
> To: "user-zh"
> Subject: Ordering of broadcast and non-broadcast stream data
>
> Hi all,
>
> I connect a broadcast stream with a non-broadcast stream and implement a
> BroadcastProcessFunction. At runtime, the regular stream's elements can
> arrive before the broadcast data, so the broadcast state read in
> processElement() is empty. Is there a pattern or mechanism to solve this?
> I searched Google and asked ChatGPT; the suggested fix was to cache the
> elements in a list initialized in open(), but when I initialize the state
> in open() I get an error saying it can only be used after keyBy().
>
> Below is a minimal working example of processElement() and the main method:
>
> @Override
> public void processElement(FileEventOuterClass.FileEvent value,
>         BroadcastProcessFunction<FileEventOuterClass.FileEvent, List<SensitiveRule>,
>                 FileEventOuterClass.FileEvent>.ReadOnlyContext ctx,
>         Collector<FileEventOuterClass.FileEvent> out) {
>     try {
>         ReadOnlyBroadcastState<Void, List<SensitiveRule>> broadcastState =
>                 ctx.getBroadcastState(ruleDescriptor);
>         List<SensitiveRule> sensitiveRules = broadcastState.get(null);
>         if (CollectionUtils.isEmpty(sensitiveRules)) {
>             return;
>         }
>     } catch (Exception e) {
>         log.error("SensitiveDataClassify err:", e);
>     }
> }
>
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>
>     MapStateDescriptor<Void, List<SensitiveRule>> ruleDescriptor =
>             new MapStateDescriptor<>("ruleBroadcastState", Types.VOID,
>                     new ListTypeInfo<>(SensitiveRule.class));
>
>     // broadcast stream
>     BroadcastStream<List<SensitiveRule>> broadcast = sensitiveRule.broadcast(ruleDescriptor);
>
>     DataStreamSource<String> localhost = env.socketTextStream("localhost", 11451);
>
>     SingleOutputStreamOperator<FileEventOuterClass.FileEvent> stream =
>             localhost.map((MapFunction<String, FileEventOuterClass.FileEvent>) value ->
>                     FileEventOuterClass.FileEvent.newBuilder().setChannel("").build());
>
>     SingleOutputStreamOperator<FileEventOuterClass.FileEvent> streamOperator =
>             stream.connect(broadcast).process(new SensitiveDataClassify());
>     streamOperator.print("qqq");
>     env.execute();
> }
Re: Calculation of UI's maximum non-heap memory
Hi, Alexis

1. With this configuration, Flink will set the JVM parameters -Xms and -Xmx to 673185792 (642m), -XX:MaxDirectMemorySize to 67108864 (64m), and -XX:MaxMetaspaceSize to 157286400 (150m). You can find more information in [1].
2. As the hint in the Flink UI says: "The maximum heap displayed might differ from the configured values depending on the used GC algorithm for this process." [2] shows how the JVM calculates the max heap memory from the configured -Xms/-Xmx.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_jobmanager/
[2] https://stackoverflow.com/questions/52980629/runtime-getruntime-maxmemory-calculate-method

Best,
Weihua

On Tue, Feb 21, 2023 at 12:15 AM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:

> Hello,
>
> I have configured a job manager with the following settings (Flink 1.16.1):
>
> jobmanager.memory.process.size: 1024m
> jobmanager.memory.jvm-metaspace.size: 150m
> jobmanager.memory.off-heap.size: 64m
> jobmanager.memory.jvm-overhead.min: 168m
> jobmanager.memory.jvm-overhead.max: 168m
> jobmanager.memory.enable-jvm-direct-memory-limit: "true"
>
> However, when I look at the job manager dashboard in the UI, I see that
> the value of Non-Heap Maximum is reported as 532 MB. Could someone clarify
> how this value is calculated?
>
> In case it's relevant, the effective configuration for JVM Heap is
> reported as 642 MB, with the reported maximum being 621 MB.
>
> Regards,
> Alexis.
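The gap between the configured 642 MB and the reported 621 MB maximum can be reproduced with the survivor-space rule from [2]: Runtime.getRuntime().maxMemory() excludes one survivor space for generational collectors. A sketch, assuming the HotSpot defaults NewRatio=2 and SurvivorRatio=8 (the exact numbers depend on the GC algorithm in use):

```java
// Approximates the heap maximum the UI reports from the configured -Xmx.
class MaxHeapMath {
    static long reportedMaxMib(long xmxMib, long newRatio, long survivorRatio) {
        long youngMib = xmxMib / (newRatio + 1);            // young gen = Xmx / (NewRatio + 1)
        long survivorMib = youngMib / (survivorRatio + 2);  // one survivor = young / (SurvivorRatio + 2)
        return xmxMib - survivorMib;                        // maxMemory() leaves out one survivor space
    }

    public static void main(String[] args) {
        // 642 - (642 / 3) / 10 = 642 - 21 = 621 MB, matching the reported maximum
        System.out.println(reportedMaxMib(642, 2, 8) + " MB");
    }
}
```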
Re: Re: Unable to take a memory dump of a Flink program
Hi,

The error sun.jvm.hotspot.debugger.UnmappedAddressException: 7f74efa5d410 is probably not closely related to Flink. We ran into a similar problem when dumping memory in production; it turned out that the Linux user running the jmap command was different from the Linux user running the Flink job. I'm not sure whether this matches your case, but you can try top -p <pid> to find the USER, su to that user, and then run jmap -dump:format=b,file=/tmp/dump.hprof <pid> again.

Best,
Biao Geng

Weihua Hu wrote on Mon, Feb 20, 2023, 14:03:

> Hi,
>
> You can also increase the heartbeat timeout (heartbeat.timeout) [1] and then try the memory dump again.
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-fault-tolerance-options
>
> Best,
> Weihua
>
> On Mon, Feb 20, 2023 at 1:58 PM lxk wrote:
>
>> I tried adjusting the parameter, specifically:
>>
>> akka.ask.timeout: 900s
>>
>> but I still get the same error.
>>
>> On 2023-02-17 17:32:51, "Guo Thompson" wrote:
>>
>>> It may be that the heartbeat interval between the JM and the TM is too
>>> short. The dump process stops the world, so the TM stops responding to
>>> the JM's heartbeat.
>>>
>>> lxk wrote on Tue, Feb 14, 2023:
>>>
>>>> Flink version: 1.16
>>>> Java version: jdk1.8.0_251
>>>> Problem: a recently deployed Flink job triggers frequent young GCs
>>>> (every few seconds), even after adjusting the young-generation size;
>>>> the total JVM heap is 3.57 GB. We therefore wanted to analyze the
>>>> program's memory usage. Using the YARN applicationId, we ran
>>>> ps -ef | grep 1666758697316_2639108 to find the pid, then ran
>>>> jmap -dump:format=b,file=user.dump 26326 to generate the dump file.
>>>> However, for every program we tested, the moment the dump starts the
>>>> job's container dies unexpectedly and the job restarts. The error after
>>>> running the command:
>>>> sun.jvm.hotspot.debugger.UnmappedAddressException: 7f74efa5d410
>>>> https://pic.imgdb.cn/item/63eb2a46f144a010071899ba.png
>>>> Has anyone run into this? Are we doing something wrong, or is there a
>>>> problem with this version? Any suggestions or opinions are welcome.
Re:Re: Flink SQL support array transform function
Hi YuXia,

Thanks for your advice. After adding the hint, the type validation passes, but I still can't pass the function to this UDF. Here is my query:

select array_transform(ids, id -> id + 1) from tmp_table

The lambda function id -> id + 1 can't be passed because "->" is not supported in Calcite:

Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "->" at line 3, column 40.
    at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:74)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)

----- Original message -----
From: "yuxia" <luoyu...@alumni.sjtu.edu.cn>
Sent: 2023/2/20 10:00
To: "Xuekui" <baixue...@foxmail.com>
CC: "fskmine" <fskm...@gmail.com>, "Caizhi Weng" <tsreape...@gmail.com>, "User" <user@flink.apache.org>
Subject: Re: Flink SQL support array transform function

Hi, Xuekui.
As said in the exception stack, maybe you can try to provide a hint for the function's parameters:

class ArrayTransformFunction extends ScalarFunction {
  def eval(@DataTypeHint("ARRAY...

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference

Best regards,
Yuxia

From: "Xuekui"
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/

Xuekui
https://spark.apache.org/docs/latest/api/sql/index.html#transform
I found it's not supported by Flink SQL, is there any plan for it? Thank you
Re: Ordering of broadcast and non-broadcast stream data
Hi,

Please paste the exact error message. Also, I don't see the part of the code that caches elements in a ListState.

Best,
Weihua

On Tue, Feb 21, 2023 at 9:38 AM 知而不惑 wrote:

> Hi all,
>
> I connect a broadcast stream with a non-broadcast stream and implement a
> BroadcastProcessFunction. At runtime, the regular stream's elements can
> arrive before the broadcast data, so the broadcast state read in
> processElement() is empty. Is there a pattern or mechanism to solve this?
> I searched Google and asked ChatGPT; the suggested fix was to cache the
> elements in a list initialized in open(), but when I initialize the state
> in open() I get an error saying it can only be used after keyBy().
>
> Below is a minimal working example of processElement() and the main method:
>
> @Override
> public void processElement(FileEventOuterClass.FileEvent value,
>         BroadcastProcessFunction<FileEventOuterClass.FileEvent, List<SensitiveRule>,
>                 FileEventOuterClass.FileEvent>.ReadOnlyContext ctx,
>         Collector<FileEventOuterClass.FileEvent> out) {
>     try {
>         ReadOnlyBroadcastState<Void, List<SensitiveRule>> broadcastState =
>                 ctx.getBroadcastState(ruleDescriptor);
>         List<SensitiveRule> sensitiveRules = broadcastState.get(null);
>         if (CollectionUtils.isEmpty(sensitiveRules)) {
>             return;
>         }
>     } catch (Exception e) {
>         log.error("SensitiveDataClassify err:", e);
>     }
> }
>
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>
>     MapStateDescriptor<Void, List<SensitiveRule>> ruleDescriptor =
>             new MapStateDescriptor<>("ruleBroadcastState", Types.VOID,
>                     new ListTypeInfo<>(SensitiveRule.class));
>
>     // broadcast stream
>     BroadcastStream<List<SensitiveRule>> broadcast = sensitiveRule.broadcast(ruleDescriptor);
>
>     DataStreamSource<String> localhost = env.socketTextStream("localhost", 11451);
>
>     SingleOutputStreamOperator<FileEventOuterClass.FileEvent> stream =
>             localhost.map((MapFunction<String, FileEventOuterClass.FileEvent>) value ->
>                     FileEventOuterClass.FileEvent.newBuilder().setChannel("").build());
>
>     SingleOutputStreamOperator<FileEventOuterClass.FileEvent> streamOperator =
>             stream.connect(broadcast).process(new SensitiveDataClassify());
>     streamOperator.print("qqq");
>     env.execute();
> }
Flink CEP: matching events A, B and C within a time window regardless of order
Hi all,

As in the title: after reading https://mp.weixin.qq.com/s/PT8ImeOOheXR295gQRsN8w, I found that the article does not give a solution to its fourth problem (matching events A, B and C within a period of time, regardless of order). Does anyone have a good approach?

Best Wishes!
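One common approach to this unordered-match problem is to track which event types have been seen per key within the window, e.g. in a KeyedProcessFunction with state plus a timer, rather than a CEP pattern (CEP patterns are order-sensitive). The following plain-Java sketch shows only the matching logic; the class and method names are invented for illustration:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Fires when events A, B and C have all arrived within a time window, in any order.
class UnorderedMatcher {
    private final long windowMillis;
    private final Map<String, Long> firstSeen = new HashMap<>();

    UnorderedMatcher(long windowMillis) { this.windowMillis = windowMillis; }

    /** Returns true once A, B and C have all been seen within the window. */
    boolean onEvent(String type, long timestampMillis) {
        // Evict types whose first occurrence fell out of the window.
        firstSeen.values().removeIf(ts -> timestampMillis - ts > windowMillis);
        firstSeen.putIfAbsent(type, timestampMillis);
        return firstSeen.keySet().containsAll(Arrays.asList("A", "B", "C"));
    }
}
```

In a real Flink job, firstSeen would live in keyed MapState and the eviction would be driven by a registered timer instead of the inline removeIf.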
Calculation of UI's maximum non-heap memory
Hello,

I have configured a job manager with the following settings (Flink 1.16.1):

jobmanager.memory.process.size: 1024m
jobmanager.memory.jvm-metaspace.size: 150m
jobmanager.memory.off-heap.size: 64m
jobmanager.memory.jvm-overhead.min: 168m
jobmanager.memory.jvm-overhead.max: 168m
jobmanager.memory.enable-jvm-direct-memory-limit: "true"

However, when I look at the job manager dashboard in the UI, I see that the value of Non-Heap Maximum is reported as 532 MB. Could someone clarify how this value is calculated?

In case it's relevant, the effective configuration for JVM Heap is reported as 642 MB, with the reported maximum being 621 MB.

Regards,
Alexis.
Re: Job Cancellation Failing
What do you mean by "earlier it used to fail due to ExecutionGraphStore not existing in /tmp folder"? Did you get the error message "Could not create executionGraphStorage directory in /tmp.", and did creating this folder fix the issue?

It also looks like the stack trace doesn't match any of the 1.15 versions in terms of line numbers. Or I might be missing something here. Could you provide the exact Flink version you're using? It might also help to share the JobManager logs to understand the context in which the cancel operation was triggered.

Matthias

On Mon, Feb 20, 2023 at 1:53 AM Puneet Duggal wrote:

> Flink cluster context:
>
>    - Flink Version - 1.15
>    - Deployment Mode - Session
>    - Number of Job Managers - 3 (HA)
>    - Number of Task Managers - 1
>
> Cancellation of the job fails with the following:
>
> org.apache.flink.runtime.rest.NotFoundException: Job
> 1cb2185d4d72c8c6f0a3a549d7de4ef0 not found
>     at org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$1(AbstractExecutionGraphHandler.java:99)
>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>     at org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.lambda$getExecutionGraphInternal$0(DefaultExecutionGraphCache.java:109)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:252)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>     at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1387)
>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>     at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
>     at akka.dispatch.OnComplete.internal(Future.scala:299)
>     at akka.dispatch.OnComplete.internal(Future.scala:297)
>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
>     at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
>     at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
>     at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>     at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
>     at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:118)
>     at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:1144)
>     at akka.actor.Actor.aroundReceive(Actor.scala:537)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>     at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:540)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>     at