StateMigrationException while using stateTTL

2024-05-22 Thread irakli.keshel...@sony.com
Hello,

I'm using Flink 1.17.1 and have stateTTL enabled in one of my Flink jobs, where I'm
using the RocksDB state backend for checkpointing. I have a value state of a POJO
class (generated from an Avro schema). I added a new field to the schema along with
a default value to keep it backwards compatible; however, when I redeployed the job
I got a StateMigrationException. I have a similar setup in other Flink jobs where
adding a field doesn't cause any trouble.

This is my stateTTL config:

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

StateTtlConfig
 .newBuilder(Time.days(7))
 .cleanupInRocksdbCompactFilter(1000) // refresh the TTL timestamp after every 1000 entries processed by the compaction filter
 .build

This is how I enable it:

val myStateDescriptor: ValueStateDescriptor[MyPojoClass] =
 new ValueStateDescriptor[MyPojoClass](
   "test-name",
   classOf[MyPojoClass])

myStateDescriptor.enableTimeToLive(initStateTTLConfig())
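
For completeness: the descriptor above lets Flink infer the serializer from classOf.
An alternative I'm aware of is to pin Avro type information explicitly (a sketch,
assuming MyPojoClass extends SpecificRecordBase and flink-avro is on the classpath):

import org.apache.flink.formats.avro.typeutils.AvroTypeInfo

val avroDescriptor: ValueStateDescriptor[MyPojoClass] =
 new ValueStateDescriptor[MyPojoClass](
   "test-name",
   new AvroTypeInfo(classOf[MyPojoClass])) // Avro-aware serializer instead of the inferred one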

This is the exception I end up with:

Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a13bf51) must not be incompatible with the old state serializer (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a13bf51).
  at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:755)
  at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:667)
  at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:883)
  at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:870)
  at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:222)
  at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:145)
  at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:129)
  at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:69)
  at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:362)
  at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:413)
  at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
  at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
  ... 25 more

Does anyone know what is causing the issue?

Cheers,
Irakli




Re: Flink Batch Execution Mode

2024-03-13 Thread irakli.keshel...@sony.com
Hi Feng,

I'm using flink-connector-kafka 3.0.1-1.17. I see that 1.17 is affected, but the
ticket is marked as fixed, so I'm not sure if that is actually the issue.

Best,
Irakli

From: Feng Jin 
Sent: 12 March 2024 18:28
To: Keshelava, Irakli 
Cc: user@flink.apache.org 
Subject: Re: Flink Batch Execution Mode

Hi Irakli

What version of flink-connector-kafka are you using?
You may have encountered a bug [1] in older versions that prevents the source task
from entering the FINISHED state.


[1] https://issues.apache.org/jira/browse/FLINK-31319

Best,
Feng


On Tue, Mar 12, 2024 at 7:21 PM irakli.keshel...@sony.com wrote:
Hello,

I have a Flink job that is running in Batch mode. The source for the job is a Kafka
topic with a limited number of events. I can see that the job starts fine and
consumes the events, but it never makes it past the first task and becomes idle.
The Kafka source is defined as bounded with the following call:
"KafkaSource.builder().setBounded(OffsetsInitializer.latest())".
I expect the job to consume all the events in the Kafka topic and then move on to
the next task, but I'm not sure if "OffsetsInitializer.latest()" is the right
OffsetsInitializer. Can anyone help me out here? Thanks!

Cheers,
Irakli


Flink Batch Execution Mode

2024-03-12 Thread irakli.keshel...@sony.com
Hello,

I have a Flink job that is running in Batch mode. The source for the job is a Kafka
topic with a limited number of events. I can see that the job starts fine and
consumes the events, but it never makes it past the first task and becomes idle.
The Kafka source is defined as bounded with the following call:
"KafkaSource.builder().setBounded(OffsetsInitializer.latest())".
I expect the job to consume all the events in the Kafka topic and then move on to
the next task, but I'm not sure if "OffsetsInitializer.latest()" is the right
OffsetsInitializer. Can anyone help me out here? Thanks!
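
For reference, the full source definition looks roughly like this (a sketch; the
broker, topic, group id, and the String deserializer are placeholders, not the
actual job code):

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

val source = KafkaSource.builder[String]()
 .setBootstrapServers("broker:9092") // placeholder
 .setTopics("my-topic")              // placeholder
 .setGroupId("my-group")             // placeholder
 .setStartingOffsets(OffsetsInitializer.earliest())
 // latest() is resolved once at startup: the source reads up to the offsets
 // present when the job starts, then finishes
 .setBounded(OffsetsInitializer.latest())
 .setValueOnlyDeserializer(new SimpleStringSchema())
 .build()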

Cheers,
Irakli


Re: Batch mode execution

2024-03-04 Thread irakli.keshel...@sony.com
Thank you both! I'll try to switch the scheduler to "AdaptiveBatchScheduler".

Best,
Irakli

From: Junrui Lee 
Sent: 05 March 2024 03:50
To: user 
Subject: Re: Batch mode execution

Hello Irakli,

The error is due to the fact that the Adaptive Scheduler doesn't support batch
jobs, as detailed in the Flink documentation [1]. When operating in reactive mode,
Flink automatically decides which scheduler to use. For batch execution, the
default scheduler is the AdaptiveBatchScheduler, not the AdaptiveScheduler used in
the streaming case.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#limitations
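
For reference, reactive mode is controlled by the `scheduler-mode` setting in
flink-conf.yaml; an illustrative fragment:

# Enables reactive mode, which forces the AdaptiveScheduler (streaming only).
# Leave this unset for batch jobs so the AdaptiveBatchScheduler default applies.
scheduler-mode: reactive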

Best regards,
Junrui

lorenzo.affetti.ververica.com via user <user@flink.apache.org> wrote on Mon, Mar 4, 2024 at 23:24:
Hello Irakli and thank you for your question.

I guess that somehow Flink enters "reactive" mode while the adaptive scheduler is
not configured.

I would go with two options to isolate your issue:

  *   Try forcing the scheduler
(https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/elastic_scaling/#adaptive-scheduler)
in your configuration: `jobmanager.scheduler: adaptive`
  *   Let Flink decide the runtime mode: if the source is bounded, you don't need
`env.setRuntimeMode(RuntimeExecutionMode.BATCH)`; Flink will work that out
correctly.

Can you try one of the two and see if that helps?

For the rest: "running it in the "BATCH" mode was better as I don't have to 
deal with the Watermark Strategy". Still, you could opt for a simple 
watermarking strategy and stay with the streaming mode (for example, 
'BoundedOutOfOrcerness': 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector>).
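
A minimal sketch of such a strategy (the event type, its timestamp field, and the
5-second bound are placeholder assumptions):

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

case class MyEvent(eventTime: Long) // placeholder event type

val strategy: WatermarkStrategy[MyEvent] = WatermarkStrategy
 .forBoundedOutOfOrderness[MyEvent](Duration.ofSeconds(5)) // tolerate 5 s of disorder
 .withTimestampAssigner(new SerializableTimestampAssigner[MyEvent] {
   override def extractTimestamp(e: MyEvent, recordTs: Long): Long = e.eventTime
 })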
On Mar 4, 2024 at 15:54 +0100, irakli.keshel...@sony.com wrote:
Hello,

I have a Flink job which processes a bounded number of events. Initially I was
running the job in "STREAMING" mode, but I realized that running it in "BATCH" mode
was better, as I don't have to deal with the watermark strategy. The job reads its
data from a Kafka topic and was running fine in "STREAMING" mode.
I switched the job to "BATCH" mode by setting
"env.setRuntimeMode(RuntimeExecutionMode.BATCH)". I changed the Kafka source to be
bounded by adding ".setBounded(OffsetsInitializer.latest())" to the source builder,
and I updated the watermark strategy to "WatermarkStrategy.noWatermarks()".
After making these changes and deploying the job I end up with the following error:
"java.lang.IllegalStateException","error.message":"Adaptive Scheduler is required
for reactive mode". I couldn't find any documentation connecting the "Adaptive
Scheduler" to "BATCH" processing. Does anyone know where this error comes from and
how I can deal with it?

Cheers,
Irakli


Batch mode execution

2024-03-04 Thread irakli.keshel...@sony.com
Hello,

I have a Flink job which processes a bounded number of events. Initially I was
running the job in "STREAMING" mode, but I realized that running it in "BATCH" mode
was better, as I don't have to deal with the watermark strategy. The job reads its
data from a Kafka topic and was running fine in "STREAMING" mode.
I switched the job to "BATCH" mode by setting
"env.setRuntimeMode(RuntimeExecutionMode.BATCH)". I changed the Kafka source to be
bounded by adding ".setBounded(OffsetsInitializer.latest())" to the source builder,
and I updated the watermark strategy to "WatermarkStrategy.noWatermarks()".
After making these changes and deploying the job I end up with the following error:
"java.lang.IllegalStateException","error.message":"Adaptive Scheduler is required
for reactive mode". I couldn't find any documentation connecting the "Adaptive
Scheduler" to "BATCH" processing. Does anyone know where this error comes from and
how I can deal with it?

Cheers,
Irakli


Dealing with stale Watermark

2023-10-20 Thread irakli.keshel...@sony.com
Hello,

I have a Flink application that consumes events from a Kafka topic and builds
sessions from them, using a keyed stream. The application runs fine initially, but
after some time it gets "stuck". I can see that the "processElement" function is
processing incoming events, but the onTimer function is never called. I can also
see that the watermark is getting stale (I guess that's why onTimer is not called).
My window duration is one minute, and the logic lives in the onTimer function.
Every minute I delete the old timer and create a new one (again with a one-minute
duration). Once a certain condition is fulfilled, I stop creating new timers and
clear the state.
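
Roughly, the pattern looks like this (a sketch with placeholder types and without
the real session logic; the key point is that event-time timers only fire once the
watermark passes their timestamp):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

case class Event(key: String, ts: Long) // placeholder
case class Session(key: String)         // placeholder

class SessionFn extends KeyedProcessFunction[String, Event, Session] {
 lazy val timer: ValueState[java.lang.Long] = getRuntimeContext.getState(
   new ValueStateDescriptor("timer", classOf[java.lang.Long]))

 override def processElement(e: Event,
     ctx: KeyedProcessFunction[String, Event, Session]#Context,
     out: Collector[Session]): Unit = {
   // replace the pending one-minute timer on every event
   Option(timer.value()).foreach(t => ctx.timerService().deleteEventTimeTimer(t))
   val next = ctx.timestamp() + 60000L
   ctx.timerService().registerEventTimeTimer(next) // fires only once the watermark reaches `next`
   timer.update(next)
 }

 override def onTimer(ts: Long,
     ctx: KeyedProcessFunction[String, Event, Session]#OnTimerContext,
     out: Collector[Session]): Unit = {
   // session logic lives here; if the watermark stalls, this is never invoked
 }
}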

The same application runs fine in another environment (this one just gets more
data), which is why I believe the windowing logic has no flaws. I can't find any
anomalies in CPU/memory consumption, and checkpoints complete successfully as well.

Has anyone run into similar issues?

Best,
Irakli