Help in designing the Flink usecase

2024-02-20 Thread neha goyal
Hi, I have a use case involving calculating the lifetime order count of a customer in real-time. To reduce the memory footprint, I plan to run a batch job on stored data every morning (let's say at 5 am) to calculate the total order count up to that moment. Additionally,
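The snippet above is cut off, so the rest of the design is not visible here. As a rough sketch only, and assuming the intent is to add orders that arrive after the batch cutoff on top of the batch total, the idea might look like the following in plain Python. The class name `LifetimeOrderCounter` and its methods are hypothetical and are not Flink APIs; in a real job the increments would live in Flink keyed state.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict


@dataclass
class LifetimeOrderCounter:
    """Hypothetical helper: batch baseline plus orders seen after the cutoff."""
    cutoff: datetime                      # time the batch job counted up to (e.g. 5 am)
    baseline: Dict[str, int] = field(default_factory=dict)    # batch totals per customer
    increments: Dict[str, int] = field(default_factory=dict)  # streaming counts after cutoff

    def on_order(self, customer_id: str, order_time: datetime) -> None:
        # Orders at or before the cutoff are assumed to be in the batch total already.
        if order_time <= self.cutoff:
            return
        self.increments[customer_id] = self.increments.get(customer_id, 0) + 1

    def lifetime_count(self, customer_id: str) -> int:
        # Lifetime count = batch total + orders streamed in since the cutoff.
        return self.baseline.get(customer_id, 0) + self.increments.get(customer_id, 0)


# Example usage with made-up data.
counter = LifetimeOrderCounter(cutoff=datetime(2024, 2, 20, 5, 0), baseline={"c1": 10})
counter.on_order("c1", datetime(2024, 2, 20, 4, 59))  # before cutoff, ignored
counter.on_order("c1", datetime(2024, 2, 20, 6, 0))   # after cutoff, counted
counter.on_order("c2", datetime(2024, 2, 20, 7, 0))   # new customer, no baseline
print(counter.lifetime_count("c1"), counter.lifetime_count("c2"))
```

Only the per-day increments need to be held by the streaming side in this sketch, which is where the memory saving would come from.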

Query around Rocksdb

2023-07-01 Thread neha goyal
Hello, I am trying to debug unbounded memory consumption by the Flink process. The heap size of the process stays the same, but its RSS keeps increasing. I suspect it might be because of RocksDB. We have the default value for state.backend.rocksdb.memory.managed

Flink Sql erroring at runtime Flink 1.16

2023-05-17 Thread neha goyal
Hello, It looks like there is a bug with Flink 1.16's IF operator. If I use the UPPER or TRIM functions (there might be more such functions), I get an exception. These functions used to work fine with Flink 1.13. select if( address_id = 'a', 'default', upper(address_id) ) as

Re: Flink Job Failure for version 1.16

2023-05-12 Thread neha goyal
:35 AM neha goyal wrote: > I have recently migrated from 1.13.6 to 1.16.1, I can see there is a > performance degradation for the Flink pipeline which is using Flink's > managed state ListState, MapState, etc. Pipelines are frequently failing > with the Exception: > > 06:59:42.021

Re: Flink Sql erroring at runtime

2023-05-11 Thread neha goyal
utput.pushToOperator(CopyingChainingOutput.java:82) ... 28 more On Mon, May 8, 2023 at 11:48 AM Hang Ruan wrote: > Hi, neha, > > I think the error occurred because of the deserialization. Is there some > example data and runnable SQLs to reproduce the problem? > > Best, >

Flink Job Failure for version 1.16

2023-05-10 Thread neha goyal
I recently migrated from 1.13.6 to 1.16.1 and can see a performance degradation in the Flink pipeline that uses Flink's managed state (ListState, MapState, etc.). Pipelines are frequently failing with the exception: 06:59:42.021 [Checkpoint Timer] WARN

Question about Flink metrics

2023-05-05 Thread neha goyal
Hello, I have a question about the Prometheus metrics. I am able to fetch the metrics with the following expression: sum(flink_jobmanager_job_numRestarts{job_name="$job_name"}) by (job_name) Now I am interested in only a few jobs and I want to give them a label. How can I achieve this? How to give an

Flink Sql erroring at runtime

2023-05-02 Thread neha goyal
Hello, I am using Flink 1.16.1 and observing a different behavior from Flink 1.13.6. SELECT if(some_string_field is null, 'default', 'some_string_field') from my_stream This SQL flink job in the streaming environment is erroring out during runtime with the exception mentioned below. There are

Re: Flink not releasing the reference to a deleted jar file

2023-04-19 Thread neha goyal
ot > be deleted when one flink job exists. > > Best, > Shammon FY > > > On Wed, Apr 19, 2023 at 1:37 PM neha goyal wrote: > >> Adding to the above query, I have tried dropping the tables and the >> function as well but no luck. >> >> On Wed, Apr 19,

Re: Flink not releasing the reference to a deleted jar file

2023-04-18 Thread neha goyal
Adding to the above query, I have tried dropping the tables and the function as well but no luck. On Wed, Apr 19, 2023 at 11:01 AM neha goyal wrote: > Hello, > > I am attaching a sample code and screenshot where Flink is holding the > reference to a jar file even a

Flink not releasing the reference to a deleted jar file

2023-04-18 Thread neha goyal
Hello, I am attaching sample code and a screenshot where Flink is holding the reference to a jar file even after I close the StreamExecutionEnvironment. Due to this, the deleted file is not getting cleaned up from the disk and we are getting disk space alerts. When we restart our application

Is there any API method for dynamic loading of the UDF jar

2023-02-26 Thread neha goyal
Hello, In Flink 1.16, the CREATE FUNCTION USING JAR functionality has been introduced, where we can specify the jar resources and the jar can be located in a remote file system such as HDFS/S3. I don't see an alternative method for the same functionality using TableEnvironment method calls, for