Not exactly reproducible. I cannot reproduce the problem in a test
environment with the same settings; it only shows up in production (PRD). My
guess is that it has something to do with the state itself.


> Where did you add the debugging log? In RocksDBListState#get()?

I added a debug log in
org.apache.flink.contrib.streaming.state.RocksDBListState#migrateSerializedValue
that prints: before migration object size 480 :after object size :481
and another debug log in
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.ListElementFilter#nextUnexpiredOffset
that prints: object size 480
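
Concretely, the two added lines look roughly like this (a sketch, not the
exact patch; the logger and variable names are illustrative):

    // In RocksDBListState#migrateSerializedValue, after the old bytes are
    // deserialized with the prior serializer and re-serialized with the new one:
    LOG.debug("before migration object size {} :after object size :{}",
            oldValueBytes.length, migratedValueBytes.length);

    // In RocksDbTtlCompactFiltersManager.ListElementFilter#nextUnexpiredOffset,
    // before the list element is deserialized for the TTL timestamp check:
    LOG.debug("object size {}", bytes.length);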

If I understand the code correctly, the first debug log verifies that the
object was successfully serialized back to RocksDB with the new schema;
however, the second debug log suggests the object is still in the old schema
when the state is queried.
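
For context, the state in question is set up roughly like this (a minimal
sketch matching the conditions you list below; MyPojo and the variable names
are placeholders, while the descriptor name and TTL come from the job):

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;

    StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.hours(24))            // 24h TTL, as configured in the job
            .cleanupInRocksdbCompactFilter(1000L)  // cleanup via the RocksDB compaction filter
            .build();

    ListStateDescriptor<MyPojo> descriptor =
            new ListStateDescriptor<>("novimp", MyPojo.class); // MyPojo: the evolving POJO
    descriptor.enableTimeToLive(ttlConfig);

    // in open() of the stateful operator:
    ListState<MyPojo> listState = getRuntimeContext().getListState(descriptor);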

On Mon, Aug 8, 2022 at 12:28 PM Hangxiang Yu <master...@gmail.com> wrote:

> Hi,
> IIUC, the conditions to reproduce it are:
> 1. Using RocksDBStateBackend with the incremental strategy
> 2. Using ListState in a stateful operator
> 3. Enabling TTL with cleanupInRocksdbCompactFilter
> 4. Adding a field to make the job trigger schema evolution
> Then the exception will be thrown, right?
>
> As for the next question about why the value is not updated in RocksDB:
> where did you add the debugging log? In RocksDBListState#get()?
>
> Best,
> Hangxiang.
>
> On Wed, Aug 3, 2022 at 5:32 PM tao xiao <xiaotao...@gmail.com> wrote:
>
>> Hi team,
>>
>> I encountered the exception below after I added a new field to a POJO used
>> in list state and resumed the job from a checkpoint:
>>
>>
>>
>>> Exception in thread "Thread-23" org.apache.flink.util.FlinkRuntimeException: Failed to deserialize list element for TTL compaction filter
>>> at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:214)
>>> Caused by: java.lang.IllegalArgumentException: Can not set int field xxxxx.hr to null value
>>> at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
>>> at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
>>> at sun.reflect.UnsafeIntegerFieldAccessorImpl.set(UnsafeIntegerFieldAccessorImpl.java:80)
>>> at java.lang.reflect.Field.set(Field.java:764)
>>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:412)
>>> at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
>>> at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:243)
>>> at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:193)
>>>
>>> # A fatal error has been detected by the Java Runtime Environment:
>>> #
>>> #  SIGSEGV (0xb) at pc=0x00007f6a2ad5d95f, pid=1, tid=0x00007f69bac9f700
>>> #
>>> # JRE version: OpenJDK Runtime Environment (8.0_302-b08) (build 1.8.0_302-b08)
>>> # Java VM: OpenJDK 64-Bit Server VM (25.302-b08 mixed mode linux-amd64 compressed oops)
>>> # Problematic frame:
>>> # V  [libjvm.so+0x57595f]  Exceptions::_throw(Thread*, char const*, int, Handle, char const*)+0x1ef
>>> #
>>> # Core dump written. Default location: /opt/flink/core or core.1
>>> #
>>> # An error report file with more information is saved as:
>>> # /opt/flink/hs_err_pid1.log
>>> #
>>> # If you would like to submit a bug report, please visit:
>>> #   http://bugreport.java.com/bugreport/crash.jsp
>>> #
>>> [error occurred during error reporting, id 0xb]
>>
>> I verified that Flink recognized the state change:
>>
>> Performing state migration for state ListStateDescriptor{name=novimp,
>>> defaultValue=null,
>>> serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6fe249e5}
>>> because the state serializer's schema, i.e. serialization format, has
>>> changed.
>>>
>>
>> and successfully migrated the state with the new POJO class (I updated the
>> Flink source code with my own debugging logs):
>>
>> before migration object size 480 :after object size :481
>>
>>
>> However, when the exception occurred, the object read from state had the
>> original byte length, not the updated one. It appears that the state
>> migration during the state recovery phase didn't take effect or wasn't
>> persisted to RocksDB. Can you please give me some pointers to debug this
>> problem further?
>>
>>> object size 480
>>>
>>
>>
>> Flink version is 1.13.2
>> RocksDB state backend with incremental, aligned checkpoints
>> List state with TTL (24h) enabled
>>
>> --
>> Regards,
>> Tao
>>
>

-- 
Regards,
Tao
