Hello Yun,
Thank you very much for your response; that is what I thought.
However, it does not seem possible to remove a single state with the State
Processor API.
We use it a lot, and we can only remove all of an operator's states, not one
specific state.
Am I missing something?

Best Regards,
Bastien

------------------

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Tue, Dec 8, 2020 at 08:54, Yun Tang <[email protected]> wrote:

> Hi Bastien,
>
> Flink supports registering state via a state descriptor when
> calling runtimeContext.getState(). However, once the state is registered,
> it cannot be removed anymore. And when you restore from a savepoint, the
> previous state is registered again [1]. Flink does not support dropping state
> directly, but you could use the State Processor API [2] to remove the related
> state.
>
>
> [1]
> https://github.com/apache/flink/blob/d94c7a451d22f861bd3f79435f777b427020eba0/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java#L171
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> ------------------------------
> *From:* bastien dine <[email protected]>
> *Sent:* Tuesday, December 8, 2020 0:28
> *To:* user <[email protected]>
> *Subject:* Problem when restoring from savepoint with missing state &
> POJO modification
>
> Hello,
> We have experienced some odd issues with a POJO MapState in a streaming
> job: checkpointing fails after we remove a state, then modify the state POJO
> and restore the job.
>
> Caused by: java.lang.NullPointerException
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:123)
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)
>
> Reproduced in Flink 1.10 & 1.11
> (full stack below)
>
>
> *Context:* We have a streaming job with a state named "buffer", holding the
> POJO Buffer, inside a CoFlatMap function.
>
> MyCoFlat:
> public class MyCoFlat extends RichCoFlatMapFunction<Pojo1, Pojo1, v> {
>     transient MapState<String, Buffer> buffer;
>
>     @Override
>     public void open(Configuration parameters) {
>         buffer = getRuntimeContext().getMapState(
>             new MapStateDescriptor<>("buffer", String.class, Buffer.class));
>     }
>     ....
>
> Buffer:
>
> public class Buffer {
>     private String field1;
>     private String field2;
>     private String field3;
>     ...
>     // + empty constructor
>     // + getter / setter for POJO consideration
> }
>
> We had some trouble with our job, so we reworked two things:
>  - we removed field2 from the Buffer class,
>  - we stopped using the "buffer" state.
>
> When restoring from the savepoint (--allowNonRestoredState) we get the
> exception below.
> The job is submitted to the cluster but fails on checkpointing, and the job
> is totally stuck.
>
>
> *Debug:* Debugging showed that the exception is raised here (as
> expected):
>
> public PojoSerializer(
>         Class<T> clazz,
>         TypeSerializer<?>[] fieldSerializers,
>         Field[] fields,
>         ExecutionConfig executionConfig) {
>     this.clazz = checkNotNull(clazz);
>     this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
>     this.fields = checkNotNull(fields);
>     this.numFields = fieldSerializers.length;
>     this.executionConfig = checkNotNull(executionConfig);
>     for (int i = 0; i < numFields; i++) {
>         this.fields[i].setAccessible(true); // <---- HERE
>     }
> }
>
> In our fields array, we have field[0] and field[2], but field[1] is missing
> from the array, which is why we get the NPE here when i=1.
>
> So what we did was put this state back in our streaming job (with
> the missing field and the POJO) and redeploy from the old savepoint, and this
> went fine.
> Then we redeployed the job without this state.
> This was a two-step deployment for our job (1 -> modify the POJO, 2 ->
> remove the state using this POJO).
> But the no-longer-used state (at least its serializer) is still in the
> savepoints, so we could face this problem again when we
> modify the Buffer POJO later.
> Finally, we just modified a savepoint with the API, removed this state once
> and for all, and restarted from it.
>
> I have a couple of questions here:
> Why does Flink keep an unused state in a savepoint (even if it cannot
> map it to the new topology and allowNonRestoredState is set)?
> Why does Flink not handle this case? Behaviour seems to differ
> between an existing POJO state and this unused POJO state.
> How can I clean my savepoints? I don't want them to contain unused state.
>
> If anybody has experienced an issue like this before or knows how to
> handle it, I would be glad to discuss!
> Best regards,
>
> ------------------
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
>
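
For anyone reading the quoted message later: the NPE mechanism it describes (a Field[] with a gap at index 1, then a loop calling setAccessible on every entry) can be reproduced outside Flink with plain reflection. The sketch below is only an illustration under assumptions. NullFieldDemo, resolveFields and firstFailingIndex are hypothetical names, and the way the array is built here is not claimed to be how Flink restores its serializer; it only shows why a missing entry fails exactly at the index of the removed field.

```java
import java.lang.reflect.Field;

public class NullFieldDemo {

    // Stand-in for the modified POJO: field2 has been removed from the class.
    public static class Buffer {
        private String field1;
        private String field3;
    }

    // Looks up each recorded field name on the current class. A name that no
    // longer exists leaves a null entry (assumption made for illustration).
    public static Field[] resolveFields(Class<?> clazz, String... names) {
        Field[] fields = new Field[names.length];
        for (int i = 0; i < names.length; i++) {
            try {
                fields[i] = clazz.getDeclaredField(names[i]);
            } catch (NoSuchFieldException e) {
                fields[i] = null; // the field was removed from the class
            }
        }
        return fields;
    }

    // Same loop shape as the constructor in the quoted message. Returns the
    // index at which setAccessible throws an NPE, or -1 if every entry is set.
    public static int firstFailingIndex(Field[] fields) {
        for (int i = 0; i < fields.length; i++) {
            try {
                fields[i].setAccessible(true);
            } catch (NullPointerException e) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // The old field list still contains "field2".
        Field[] fields = resolveFields(Buffer.class, "field1", "field2", "field3");
        System.out.println("NPE at index " + firstFailingIndex(fields));
    }
}
```

Running main prints "NPE at index 1", which matches the i=1 failure seen in the debugger.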
