Serialize and Parse ResolvedExpression

2023-09-06 Thread Xianxun Ye
Hi Team, I want to serialize the ResolvedExpression to a String or byte[], transmit it into the LookupFunction, and parse it back to a ResolvedExpression in the LookupFunction. For my case: Select * from left_stream as s join dim_table for system_time as of s.proc_time as d on s
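A minimal sketch of the String/byte[] round trip uses plain Java serialization plus Base64. It assumes the value being shipped implements java.io.Serializable. This sketch does not confirm whether Flink's ResolvedExpression implementations do. The hypothetical FilterSpec class below stands in for a small, serializable description of the lookup condition that the LookupFunction could decode and use instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

public class ExpressionCodec {

    // Hypothetical stand-in for a pushed-down lookup condition such as "d.id = 42".
    public static final class FilterSpec implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String field;
        public final String op;
        public final long literal;

        public FilterSpec(String field, String op, long literal) {
            this.field = field;
            this.op = op;
            this.literal = literal;
        }

        @Override
        public String toString() {
            return field + " " + op + " " + literal;
        }
    }

    // Serializes any Serializable value with Java serialization and Base64-encodes the bytes.
    public static String encode(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    // Decodes a string produced by encode() and casts the result to the expected type.
    public static <T> T decode(String encoded, Class<T> type) {
        byte[] raw = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return type.cast(in.readObject());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String wire = encode(new FilterSpec("d.id", "=", 42L));
        FilterSpec back = decode(wire, FilterSpec.class);
        System.out.println(back); // prints "d.id = 42"
    }
}
```

The class ships as a Base64 String rather than a raw byte[] so that it survives channels that only carry text.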

RE: Re: Re: How to read flinkSQL job state

2023-09-06 Thread Yifan He via user
Hi Hangxiang, We are using Flink 1.14, the state backend is EmbeddedRocksDBStateBackend, and the checkpoint storage is filesystem. This is the checkpoint configuration from our running jobs:
Checkpointing Mode: Exactly Once
Checkpoint Storage: FileSystemCheckpointStorage
State Backend: EmbeddedRocksD

Re: Kubernetes operator listing jobs TimeoutException

2023-09-06 Thread Evgeniy Lyutikov
Hi all! We updated the operator to version 1.6.0 and the problem remains. According to the logs, there are even more of these errors than there were in version 1.4.0. How can this be fixed? 2023-09-07 06:11:37,742 o.a.f.k.o.FlinkOperator[INFO ] Starting Flink Kubernetes Operator (Version

Re: Re: How to read flinkSQL job state

2023-09-06 Thread Hangxiang Yu
Hi, Yifan. Which Flink version are you using? You are using filesystem instead of RocksDB, so your checkpoints may not be incremental, IIUC. On Thu, Sep 7, 2023 at 10:52 AM Yifan He via user wrote: > Hi Shammon, > > We are using RocksDB, and the configuration is below: > execution.checkpo

Re: Re: How to read flinkSQL job state

2023-09-06 Thread Hangxiang Yu
Hi, Yifan. If you enable DEBUG-level logging, you can see log lines like 'Generated hash xxx for node xxx'. I haven't found any other way to find the operator ID of SQL jobs; maybe I missed something, or we should expose this info more directly. Unfortunately, there is no default state name for an ope
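Collecting those hashes from a DEBUG log can be scripted. The sketch below assumes lines shaped like "Generated hash <hex> for node <description>", with optional single quotes around either value. That shape is taken from the pattern quoted in this reply. The exact wording in your Flink version is an assumption to check against the real log. The sample log line in main is made up.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OperatorHashExtractor {

    // Assumed shape: "Generated hash <hex> for node <description>", optional single quotes;
    // the node description ends at a quote, a '{' or the end of the line.
    private static final Pattern HASH_LINE =
            Pattern.compile("Generated hash '?([0-9a-fA-F]+)'? for node '?([^'{]+)");

    // Returns node description -> hex hash, in the order the lines were seen.
    public static Map<String, String> extract(List<String> logLines) {
        Map<String, String> nodeToHash = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = HASH_LINE.matcher(line);
            if (m.find()) {
                nodeToHash.put(m.group(2).trim(), m.group(1));
            }
        }
        return nodeToHash;
    }

    public static void main(String[] args) {
        // Hypothetical sample lines; only the first one matches.
        List<String> lines = List.of(
                "DEBUG StreamGraphHasherV2 - Generated hash '0a1b2c3d' for node 'Calc[3]' {id: 3}",
                "INFO  some unrelated line");
        extract(lines).forEach((node, hash) -> System.out.println(node + " -> " + hash));
    }
}
```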

Flink job reading from s3 path

2023-09-06 Thread Hou, Lijuan via user
Hi team, I want to implement a Flink job that reads Avro files from an S3 path and outputs to a Kafka topic. Currently, I am using AvroInputFormat like this: AvroInputFormat<Session> avroInputFormat = new AvroInputFormat<>(new Path(S3PathString), Session.class); TypeInformation<Session> typeInfo = TypeInformation.

Re: Keytab Setup on Kubernetes

2023-09-06 Thread Chirag Dewan via user
Thanks Greg, this is a really helpful reply. > Any kind of Kerberos usage is starting with a "create a KDC server in your environment". That server must be set. When I say that, I am referring to Windows users with a built-in KDC and AD. That would require kinit for the AS. I was wondering h

Async IO metrics for tps

2023-09-06 Thread patricia lee
Hi Flink users, I used Async IO (RichAsyncFunction) for sending 100 txns to a 3rd party. I checked the runtimeContext and saw that it has a numRecordsSent metric. We want to expose this metric to our Prometheus server so that we can monitor how many records we are sending per second. The reason why we ne
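Records per second are usually derived from a counter as "counter delta divided by elapsed time". In Flink, that is typically a Meter, for example `org.apache.flink.metrics.MeterView` wrapping a Counter in the operator's metric group. Alternatively, Prometheus can compute `rate()` over an exported counter. The dependency-free sketch below only illustrates the sliding-window arithmetic. Its class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class RateFromCounter {
    private final long windowSeconds;
    // Each sample is {timestampSeconds, counterValue}; oldest first.
    private final Deque<long[]> samples = new ArrayDeque<>();

    public RateFromCounter(long windowSeconds) {
        this.windowSeconds = windowSeconds;
    }

    // Records the counter value seen at nowSeconds and drops samples older than the window.
    public void sample(long nowSeconds, long counterValue) {
        samples.addLast(new long[] {nowSeconds, counterValue});
        while (samples.peekFirst()[0] < nowSeconds - windowSeconds) {
            samples.removeFirst();
        }
    }

    // Average events per second between the oldest and newest samples in the window.
    public double ratePerSecond() {
        if (samples.size() < 2) {
            return 0.0;
        }
        long[] first = samples.peekFirst();
        long[] last = samples.peekLast();
        long elapsed = last[0] - first[0];
        return elapsed == 0 ? 0.0 : (double) (last[1] - first[1]) / elapsed;
    }

    public static void main(String[] args) {
        RateFromCounter meter = new RateFromCounter(60);
        meter.sample(0, 0);
        meter.sample(10, 500);
        meter.sample(20, 1000);
        System.out.println(meter.ratePerSecond()); // prints 50.0
    }
}
```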

RE: Re: How to read flinkSQL job state

2023-09-06 Thread Yifan He via user
Hi Hangxiang, Thanks for your answer! We are using the RocksDB state backend with incremental checkpoints enabled, and it is the incremental size that keeps increasing. We didn't add any custom checkpoint configuration in our Flink SQL jobs. Where can I see the log of StreamGraphHasherV2.generateDetermin

RE: Re: How to read flinkSQL job state

2023-09-06 Thread Yifan He via user
Hi Shammon, We are using RocksDB, and the configuration is below:
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpo

Re: Uneven TM Distribution of Flink on YARN

2023-09-06 Thread Lu Niu
These are our YARN-related settings:
yarn.scheduler.fair.assignmultiple: "true"
yarn.scheduler.fair.dynamic.max.assign: "false"
yarn.scheduler.fair.max.assign: 1
Any suggestions? Best Lu On Wed, Sep 6, 2023 at 9:16 AM Lu Niu wrote: > Hi, Thanks for all your help. Are there any other insights? >

Re: Strange exception in UnitTest when migrate Flink v1.16.2 to v1.17.1

2023-09-06 Thread Teunissen, F.G.J. (Fred) via user
Thanks for the insight, it did help indeed. I’ve added `org/apache/calcite**` and `org/apache/flink**` to the exclude-list in the `prepare-agent` goal of the jacoco-maven-plugin and that did the trick. Regards, Fred From: Aniket Sule Date: Wednesday, 6 September 2023 at 21:07 To: Teunissen, F.

RE: Strange exception in UnitTest when migrate Flink v1.16.2 to v1.17.1

2023-09-06 Thread Aniket Sule
Hello, You could look at https://github.com/hazelcast/hazelcast/issues/20945 to see if the workaround in the linked commit helps. I faced test failures when upgrading from Flink 1.16 to 1.17, and that workaround resolved them. Hope this helps. Regards Aniket From: Teunissen, F.G

Re: Uneven TM Distribution of Flink on YARN

2023-09-06 Thread Lu Niu
Thanks, I'll check it out! Best Lu On Wed, Sep 6, 2023 at 10:09 AM Biao Geng wrote: > Hi, > > > > If your YARN cluster uses fair scheduler, maybe you can check if the > yarn.scheduler.fair.assignmultiple > >

Re: Disabling Job Submissions through Flink UI

2023-09-06 Thread Muazim Wani
Hi Team, I came across this thread, Jar Not Found, in which it is discussed that "If you configure `web.submit.enable=false` in your cluster, you could not upload a jar job, but you can still submit jobs via rest endpoint. You can cr

Re: Uneven TM Distribution of Flink on YARN

2023-09-06 Thread Biao Geng
Hi, If your YARN cluster uses fair scheduler, maybe you can check if the yarn.scheduler.fair.assignmultiple config is set. If that’s the case, then adjusting yarn.scheduler.fair.dynamic.max.assign and yarn.sc

Re: Uneven TM Distribution of Flink on YARN

2023-09-06 Thread Lu Niu
Hi, Thanks for all your help. Are there any other insights? Best Lu On Wed, Aug 30, 2023 at 11:29 AM Lu Niu wrote: > No. we don't use yarn.taskmanager.node-label > > Best > Lu > > On Tue, Aug 29, 2023 at 12:17 AM Geng Biao wrote: > >> Maybe you can check if you have set yarn.taskmanager.node-

Re: Memory Leak

2023-09-06 Thread Neha . via user
We also faced the same issue with Flink 1.16.1. Please enable jemalloc as the memory allocator; it fixed the issue for us. On Wed, Sep 6, 2023 at 9:07 PM Kenan Kılıçtepe wrote: > Hi, > Thanks for the answer. > I will try the documents you have shared. > But still it would be great if you can take

Re: Memory Leak

2023-09-06 Thread Kenan Kılıçtepe
Hi, Thanks for the answer. I will try the documents you have shared. But it would still be great if you could take a look at the numbers below and give some tips. At the moment RSS is 46.6GB although taskmanager.memory.process.size is set to 4m. GC Statistics: 2023-09-06 15:15:03,785 INFO org.

Flink KafkaSource failure on empty partitions

2023-09-06 Thread David Clutter
I am using Flink 1.13.1 on AWS EMR and I seem to have hit this bug: https://issues.apache.org/jira/browse/FLINK-27041. My job fails when there are empty partitions. I see it is fixed in a newer version of Flink, but I cannot upgrade Flink at this time. Any suggestions for a workaround? I a

Re: Memory Leak

2023-09-06 Thread Biao Geng
Hi Kenan, If you have confirmed that heap memory is OK (e.g. no Java OOM exception and no frequent GC), then the cause may be excessive off-heap memory usage, especially when your Flink job uses a native library. To diagnose such a problem, you can refer to [1][2] for more details about using NMT and jep
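Before turning to NMT or jeprof, a quick first check is to compare what the JVM itself accounts for with the RSS shown by `top`. The sketch below uses only the standard java.lang.management beans. It reports on whatever JVM runs it, so in practice the same values would have to be read from the TaskManager JVM, for example over JMX. If RSS is far above these totals, the remaining memory was allocated outside the JVM's pools. That is consistent with the off-heap explanation in this reply.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

public class JvmMemoryReport {
    private static final long MIB = 1024L * 1024L;

    // Committed heap plus JVM-managed non-heap pools (e.g. Metaspace, code cache).
    public static long committedHeapAndNonHeapBytes() {
        MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
        return bean.getHeapMemoryUsage().getCommitted()
                + bean.getNonHeapMemoryUsage().getCommitted();
    }

    // Capacity of NIO buffer pools (e.g. "direct", "mapped"), which the figure above excludes.
    public static long bufferPoolBytes() {
        long total = 0;
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            total += Math.max(0L, pool.getTotalCapacity());
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.printf("heap + non-heap committed: %d MiB%n", committedHeapAndNonHeapBytes() / MIB);
        System.out.printf("NIO buffer pools:          %d MiB%n", bufferPoolBytes() / MIB);
        // Memory visible in RSS beyond these totals (thread stacks, GC structures,
        // native libraries, allocator fragmentation) is not covered by these beans.
    }
}
```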

Re: backpressured metrics doesnt work

2023-09-06 Thread Kenan Kılıçtepe
Hi Ron, Thanks for your answer. The problem was with a job in my job graph. As it was locked immediately, no backpressure metrics were emitted. I think all jobs need to be in a free state at least once. Kenan On Wed, Sep 6, 2023 at 12:35 PM liu ron wrote: > Hi, Kenan > > I think you need to prov

Memory Leak

2023-09-06 Thread Kenan Kılıçtepe
Hi, I have Flink 1.16.2 on a single server with 64GB RAM. Although taskmanager.memory.process.size is set to 4m, I can see the memory usage of the task manager exceed 59GB, and the OS kills it because of OOM. I check the RSS column of the top application for memory usage. I don't see any heap memory p

Re: Help needed on stack overflow query

2023-09-06 Thread Feng Jin
Hi Nihar, Have you tried using the following configuration: metrics.reporter.my_reporter.filter.includes: jobmanager:*:*;taskmanager:*:* Please note that the default delimiter for the List parameter in Flink is ";". Best regards, Feng On Thu, Aug 24, 2023 at 11:36 PM Nihar Rao wrote: > Hello,
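The delimiter point can be shown with a small illustrative parser. This is not Flink's implementation. It treats ';' as the list separator, as stated in the reply. Each entry is assumed to follow a `<scope>[:<name>[,<name>][:<type>[,<type>]]]` shape, which is worth verifying against the metric reporter documentation for your Flink version. With ',' between the two entries, everything collapses into a single malformed filter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ReporterFilterSketch {

    // One parsed entry; fieldCount is how many ':'-separated fields were found.
    public static final class Filter {
        public final String scope;
        public final List<String> names;
        public final List<String> types;
        public final int fieldCount;

        Filter(String scope, List<String> names, List<String> types, int fieldCount) {
            this.scope = scope;
            this.names = names;
            this.types = types;
            this.fieldCount = fieldCount;
        }
    }

    public static List<Filter> parse(String value) {
        List<Filter> filters = new ArrayList<>();
        for (String entry : value.split(";")) { // list items are separated by ';'
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            String[] fields = trimmed.split(":", -1);
            List<String> names = fields.length > 1 ? Arrays.asList(fields[1].split(",")) : Collections.emptyList();
            List<String> types = fields.length > 2 ? Arrays.asList(fields[2].split(",")) : Collections.emptyList();
            filters.add(new Filter(fields[0], names, types, fields.length));
        }
        return filters;
    }

    public static void main(String[] args) {
        // Two entries, as in the suggested configuration value.
        for (Filter f : parse("jobmanager:*:*;taskmanager:*:*")) {
            System.out.println(f.scope + " names=" + f.names + " types=" + f.types);
        }
        // Using ',' between entries yields one entry with too many fields.
        Filter wrong = parse("jobmanager:*:*,taskmanager:*:*").get(0);
        System.out.println("fields seen: " + wrong.fieldCount); // prints "fields seen: 5"
    }
}
```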

Strange exception in UnitTest when migrate Flink v1.16.2 to v1.17.1

2023-09-06 Thread Teunissen, F.G.J. (Fred) via user
Hi community, I would like to ask for some help in solving a strange failure in a Unit Test when code coverage (jacoco) is enabled. We have a project with a custom UDF that uses the MiniClusterExtension in a Unit Test. The Unit Test works fine when built for Flink v1.16.2, but it fails when bu

Re: How to read flinkSQL job state

2023-09-06 Thread liu ron
Hi, Yifan. Flink SQL & Table API currently don't support reading the state directly. Best, Ron Yifan He via user wrote on Wed, Sep 6, 2023 at 13:11: > Hi team, > > We are investigating why the checkpoint size of our FlinkSQL jobs keeps > growing and we want to look into the checkpoint file to know what is

Re: backpressured metrics doesnt work

2023-09-06 Thread liu ron
Hi, Kenan. I think you need to provide more context, which may help find the root cause. Best, Ron Kenan Kılıçtepe wrote on Mon, Sep 4, 2023 at 21:49: > Hi, > > Any idea why backpressured metrics are not working and how I can fix it? > > Thanks > Kenan > >

Re: Send data asynchronously to a 3rd party via SinkFunction

2023-09-06 Thread liu ron
Hi, patricia. If you want to use a SinkFunction, maybe you should use `RichSinkFunction` [1]; you can close the resources in its close method. [1] https://github.com/apache/flink/blob/7d8f9821d2b3ed9876eae4ffe2e3c8b86af2d88a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink
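The open/close lifecycle that `RichSinkFunction` provides can be sketched without the Flink dependency. The class below only mirrors the method names open, invoke and close in plain Java. The thread pool, its size and the placeholder send are hypothetical. This fire-and-forget pattern does not coordinate in-flight sends with checkpoints, so its delivery guarantees are weaker than those of the Async I/O operator.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThirdPartySenderSketch {
    private ExecutorService executor;
    private final AtomicInteger sent = new AtomicInteger();

    // Mirrors RichSinkFunction#open: acquire resources once per parallel instance.
    public void open() {
        executor = Executors.newFixedThreadPool(4);
    }

    // Mirrors SinkFunction#invoke: hand the record to the pool and return immediately.
    public void invoke(String record) {
        executor.submit(() -> {
            // Hypothetical placeholder for the real 3rd-party call using `record`.
            sent.incrementAndGet();
        });
    }

    // Mirrors RichSinkFunction#close: stop accepting work, wait for in-flight sends, release threads.
    public void close() {
        if (executor == null) {
            return;
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public int sentCount() {
        return sent.get();
    }

    public static void main(String[] args) {
        ThirdPartySenderSketch sink = new ThirdPartySenderSketch();
        sink.open();
        for (int i = 0; i < 100; i++) {
            sink.invoke("txn-" + i);
        }
        sink.close();
        System.out.println(sink.sentCount()); // prints 100
    }
}
```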

Re: Question regarding asyncIO timeout

2023-09-06 Thread liu ron
Hi, Leon. > Besides that, do you know if the async timeout is actually a global timeout? Meaning it accounts for the time of each attempt call plus any interval time in between. Yes, the timeout is a total timeout; you can see [1][2] for more details. [1] https://cwiki.apache.org/confluence/pages/v
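Taking the reply at face value, one timeout covers every attempt plus the intervals between them, so the budget is simple addition. The sketch below assumes a fixed retry interval and hypothetical attempt durations. It is plain arithmetic, not Flink's retry implementation.

```java
public class AsyncTimeoutBudget {

    // Total time charged against one overall timeout: every attempt plus the
    // retry interval waited before each attempt after the first.
    public static long elapsedMs(long[] attemptDurationsMs, long retryIntervalMs) {
        long total = 0;
        for (int i = 0; i < attemptDurationsMs.length; i++) {
            if (i > 0) {
                total += retryIntervalMs;
            }
            total += attemptDurationsMs[i];
        }
        return total;
    }

    public static boolean timesOut(long timeoutMs, long[] attemptDurationsMs, long retryIntervalMs) {
        return elapsedMs(attemptDurationsMs, retryIntervalMs) > timeoutMs;
    }

    public static void main(String[] args) {
        long[] attempts = {300, 300, 300};
        // 300 + 100 + 300 + 100 + 300 = 1100 ms exceeds a 1000 ms overall timeout,
        // even though each single attempt is well below it.
        System.out.println(elapsedMs(attempts, 100));      // prints 1100
        System.out.println(timesOut(1000, attempts, 100)); // prints true
    }
}
```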

Re: How to read flinkSQL job state

2023-09-06 Thread Shammon FY
Hi Yifan, Besides reading job state, I would like to know which state backend you are using. Can you share the state and checkpoint configurations for your job? Maybe you can first check these configuration items to confirm they are correct. Best, Shammon FY On Wed, Sep 6, 2023 at 3:17 PM

Re: How to read flinkSQL job state

2023-09-06 Thread Hang Ruan
Hi, Yifan. I think the document[1] means to let us convert the DataStream to the Table[2]. Then we could handle the state with the Table API & SQL. Best, Hang [1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/libs/state_processor_api/ [2] https://nightlies.apache.org/flink/flin

Re: How to read flinkSQL job state

2023-09-06 Thread Hangxiang Yu
Hi, Yifan. Unfortunately, the State Processor API currently only supports DataStream. But you could still use it to read your SQL job state. The most difficult part is that you have to get the operator ID, which you can find in the log of StreamGraphHasherV2.generateDeterministicHash, and state n