Hi Stefan, Vino,
Thanks for your answers.

We are using full checkpointing, not incremental. We are using custom
serializers for the operator state classes. The serializers encrypt
before writing and decrypt when reading, and they are stateless.
We register the serializers using:
env.getConfig()
      .registerTypeWithKryoSerializer(ProcessState.class, ProcessStateSerializer.class);
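
To make "stateless, encrypt on write, decrypt on read" concrete, here is a minimal sketch of that pattern using only the JDK. It is not our actual ProcessStateSerializer: the class name StatelessCrypto, the choice of AES/GCM, and the IV-prefix layout are all assumptions made for illustration. The point is that every call builds its own Cipher and no mutable fields are kept, so concurrent use cannot share state.

```java
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;

// Hypothetical helper, not the real serializer from this thread.
class StatelessCrypto {
    private static final int IV_LEN = 12;    // bytes, assumed layout: IV || ciphertext
    private static final int TAG_BITS = 128; // GCM authentication tag length

    // Generates a fresh AES key (illustration only; key management is out of scope).
    static SecretKey newKey() {
        try {
            KeyGenerator gen = KeyGenerator.getInstance("AES");
            gen.init(128);
            return gen.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Encrypts with a fresh random IV and a fresh Cipher per call (no shared state).
    static byte[] encrypt(SecretKey key, byte[] plain) {
        try {
            byte[] iv = new byte[IV_LEN];
            new SecureRandom().nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            byte[] ct = cipher.doFinal(plain);
            byte[] out = new byte[IV_LEN + ct.length];
            System.arraycopy(iv, 0, out, 0, IV_LEN);
            System.arraycopy(ct, 0, out, IV_LEN, ct.length);
            return out;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Reads the IV from the first 12 bytes, then decrypts and verifies the rest.
    static byte[] decrypt(SecretKey key, byte[] data) {
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key,
                    new GCMParameterSpec(TAG_BITS, data, 0, IV_LEN));
            return cipher.doFinal(data, IV_LEN, data.length - IV_LEN);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SecretKey key = newKey();
        byte[] plain = "process-state".getBytes(StandardCharsets.UTF_8);
        byte[] roundTrip = decrypt(key, encrypt(key, plain));
        System.out.println(Arrays.equals(plain, roundTrip));
    }
}
```

In a sketch like this, a truncated or otherwise damaged payload fails inside decrypt (GCM verifies the tag) rather than later during field-by-field deserialization.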

In normal cases the serialization works correctly, even after recovering
from a failure. We get this error only when the TaskManager fails due to
memory problems.
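
For context on the innermost frames of the trace quoted below (DataInputStream.readUTF calling readFully and throwing EOFException): this is the generic behaviour when the available bytes are shorter than the length prefix that readUTF reads first. The sketch below reproduces only that JDK behaviour; the class name TruncatedUtfDemo and the sample string are hypothetical, and it says nothing about why the bytes read from RocksDB would be short or mismatched in our job.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical demo class, JDK only.
class TruncatedUtfDemo {

    // Encodes a string the way DataOutputStream.writeUTF does:
    // a 2-byte length prefix followed by the modified UTF-8 bytes.
    static byte[] writeUtf(String s) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeUTF(s);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true if readUTF runs out of bytes before the declared length is read.
    static boolean readHitsEof(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            in.readUTF();
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] full = writeUtf("someFieldName");
        byte[] truncated = Arrays.copyOf(full, full.length - 1); // drop the last byte
        System.out.println("full hits EOF: " + readHitsEof(full));
        System.out.println("truncated hits EOF: " + readHitsEof(truncated));
    }
}
```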

Thanks again for your help,
Edward

On Fri, 7 Sept 2018 at 11:51, Stefan Richter (<s.rich...@data-artisans.com>) wrote:

> Hi,
>
> what I can say is that any failures like OOMs should not corrupt
> checkpoint files, because only successfully completed checkpoints are used
> for recovery by the job manager. Just to get a bit more info, are you using
> full or incremental checkpoints? Unfortunately, it is a bit hard to say
> from the given information what the cause of the problem is. Typically,
> these problems have been observed when something was wrong with a
> serializer or a stateful serializer was used from multiple threads.
>
> Best,
> Stefan
>
> On 07.09.2018 at 05:04, vino yang <yanghua1...@gmail.com> wrote:
>
> Hi Edward,
>
> From this log: Caused by: java.io.EOFException, it seems that the state
> metadata file has been corrupted.
> But I can't confirm it. Maybe Stefan knows more details, so I am pinging him for you.
>
> Thanks, vino.
>
> Edward Rojas <edward.roja...@gmail.com> wrote on Fri, 7 Sept 2018 at 1:22 AM:
>
>> Hello all,
>>
>> We are running Flink 1.5.3 on Kubernetes with RocksDB as the state backend.
>> When performing some load testing we got an /OutOfMemoryError: native
>> memory exhausted/, causing the job to fail and be restarted.
>>
>> After the TaskManager is restarted, the job is recovered from a
>> checkpoint, but there seems to be a problem when trying to access the
>> state. The error comes from the *onTimer* function, invoked through
>> *onProcessingTime*.
>>
>> Is it possible that the OOM error caused a corrupted state to be
>> checkpointed?
>>
>> We get Exceptions like:
>>
>> TimerException{java.lang.RuntimeException: Error while retrieving data from RocksDB.}
>>         at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:277)
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:191)
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>>         at java.lang.Thread.run(Thread.java:811)
>> Caused by: java.lang.RuntimeException: Error while retrieving data from RocksDB.
>>         at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
>>         at com.xxx.ProcessFunction.*onTimer*(ProcessFunction.java:279)
>>         at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
>>         at org.apache.flink.streaming.api.operators.KeyedProcessOperator.*onProcessingTime*(KeyedProcessOperator.java:78)
>>         at org.apache.flink.streaming.api.operators.HeapInternalTimerService.*onProcessingTime*(HeapInternalTimerService.java:266)
>>         at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>         ... 7 more
>> Caused by: java.io.EOFException
>>         at java.io.DataInputStream.readFully(DataInputStream.java:208)
>>         at java.io.DataInputStream.readUTF(DataInputStream.java:618)
>>         at java.io.DataInputStream.readUTF(DataInputStream.java:573)
>>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
>>         at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:87)
>>         ... 12 more
>>
>>
>> Thanks in advance for any help
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>

-- 
*Edward Alexander Rojas Clavijo*
*Software Engineer*
*Hybrid Cloud*
*IBM France*
