Re: Question about modifying operator state on Flink 1.7 with the State Processor API

2019-11-26 Thread Kaihao Zhao
Hi Vino/Seth,

Thanks Vino and Seth. Changing the UID and setting the offsets manually is a
solution, but the pain point is that we have tons of applications (owned by
other users) running on our platform, so doing it manually would be
inefficient, and the most difficult part is getting users to change their code.
After digging a little into the Flink source code, I think I found a workaround:
1. I modified flink-core 1.9's code in our migration service to skip the
snapshot *Compatibility Check* and use *TupleSerializerConfigSnapshot*/
*KryoSerializerConfigSnapshot* instead of
*TupleSerializerSnapshot*/*KryoSerializerSnapshot*.
2. I added the State Processor API library to the lib directory of our
production Flink clusters.
With these changes it works: 1.7 apps can resume from the modified savepoint.
Here is the code change I made:
https://github.com/mcgG/flink/commit/47661f97ad5ab30ee948805600c1d7f372ae85b2
I would appreciate any further suggestions.

Another interesting thing we ran into while modifying the savepoint is that
the current State Processor API only supports modifying state by *UID*, but we
found that some of our users do not set UIDs (I know it's not a good practice).
In this case, the Kafka connector has operator state named
*kafka-partition-offset-states*; I can get its operator ID and then hack
into the internal data structure to modify the operatorStateIndex, but it's a
little tricky. So I suggest we could add an overload like
*public void addOperator(byte[] operatorID, BootstrapTransformation transformation)*
to make this kind of non-UID case easier.
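To make the proposal concrete: Flink identifies operators internally by a 128-bit operator ID (wrapped in its `OperatorID` class), which is derived from the UID when one is set and generated from the job graph otherwise. A plain Java `byte[]` uses identity-based `equals`/`hashCode`, so an overload that accepts a raw `byte[]` would have to compare IDs by content. The following is a minimal, stdlib-only sketch of that keying; `OperatorIdIndexSketch`, its methods, and the in-memory map are hypothetical stand-ins for savepoint metadata and are not part of the State Processor API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a savepoint's operator-state index; not Flink's API.
class OperatorIdIndexSketch {

    // Wraps a raw 16-byte (128-bit) operator ID so map lookups compare bytes, not array identity.
    static final class RawOperatorId {
        private final byte[] bytes;

        RawOperatorId(byte[] bytes) {
            if (bytes.length != 16) {
                throw new IllegalArgumentException("expected 16 bytes, got " + bytes.length);
            }
            // Defensive copy so later mutations of the caller's array cannot corrupt the key.
            this.bytes = bytes.clone();
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof RawOperatorId && Arrays.equals(bytes, ((RawOperatorId) o).bytes);
        }

        @Override
        public int hashCode() {
            return Arrays.hashCode(bytes);
        }
    }

    private final Map<RawOperatorId, String> stateByOperator = new HashMap<>();

    // Hypothetical analogue of the proposed addOperator(byte[] operatorID, ...) overload;
    // the String value stands in for whatever a BootstrapTransformation would produce.
    void addOperator(byte[] operatorId, String newState) {
        stateByOperator.put(new RawOperatorId(operatorId), newState);
    }

    // Returns the stored state for an operator ID, or null if none was added.
    String stateFor(byte[] operatorId) {
        return stateByOperator.get(new RawOperatorId(operatorId));
    }
}
```

Looking up with a different array holding the same 16 bytes finds the entry, which a plain `Map<byte[], ...>` would not.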

Best,
Kaihao Zhao

On Tue, Nov 26, 2019 at 10:29 AM Seth Wiesman  wrote:

> The state proc api makes the same guarantees around savepoint
> compatibility as the rest of Flink. It is backwards compatible up to 3
> versions but there are no guarantees around forwards compatibility. A 1.9
> savepoint is not guaranteed to be resumable on a 1.7 cluster.
>
> That being said, the state proc api is overkill for this situation.
>
> All you need to do is change the UID of your source operator and set your
> offsets when configuring the source [1, 2]. When resuming your job, pass the
> --allowNonRestoredState flag [3]; your offsets will be reset, but all other
> state will be retained.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#matching-operator-state
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>
> [3]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#what-happens-if-i-delete-an-operator-that-has-state-from-my-job
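The recipe above resets only the offsets because, on restore, Flink maps savepoint state back to operators by ID. The following toy Java model of that matching rule is a sketch under stated assumptions, not Flink's implementation: it matches by UID string (Flink uses the operator ID derived from the UID), and the UIDs `kafka-source`, `kafka-source-v2`, and `window-agg` used with it are hypothetical. It shows that renaming the source's UID leaves its old offsets unmatched, so they are dropped when `--allowNonRestoredState` is set and make the restore fail otherwise, while all other state is restored.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Toy model of savepoint-to-job state matching; not Flink's implementation.
// Simplification: operators are matched by UID string, whereas Flink matches
// by the operator ID it derives from the UID.
final class RestoreMatchingSketch {

    private RestoreMatchingSketch() {}

    // Returns the state each operator of the new job receives.
    // Operators with no matching savepoint entry get nothing and start fresh
    // (for a Kafka source, from its configured start offsets).
    // Savepoint entries with no matching operator are dropped if
    // allowNonRestoredState is true; otherwise the restore fails.
    static Map<String, String> restore(Map<String, String> savepointState,
                                       Set<String> jobUids,
                                       boolean allowNonRestoredState) {
        Map<String, String> restored = new HashMap<>();
        for (Map.Entry<String, String> entry : savepointState.entrySet()) {
            if (jobUids.contains(entry.getKey())) {
                restored.put(entry.getKey(), entry.getValue());
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException(
                        "savepoint state for '" + entry.getKey() + "' cannot be mapped to the new job");
            }
            // Otherwise the unmatched state is skipped.
        }
        return restored;
    }
}
```

For example, with savepoint state for `kafka-source` and `window-agg` and a new job whose UIDs are `kafka-source-v2` and `window-agg`, `restore(..., true)` returns only the `window-agg` entry; the renamed source receives no state and would start from the offsets configured on it.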
>
> --
> Seth Wiesman | Solutions Architect
>


-- 
Zhao Kaihao


Re: Question about modifying operator state on Flink 1.7 with the State Processor API

2019-11-25 Thread vino yang
Hi Kaihao,

Pinging @Aljoscha Krettek and @Tzu-Li (Gordon) Tai
for more expert suggestions.

Also, we may need to state explicitly whether the State Processor API
can process snapshots generated by jobs running older Flink versions. WDYT?

Best,
Vino



Question about modifying operator state on Flink 1.7 with the State Processor API

2019-11-25 Thread Kaihao Zhao
Hi,

We are running Flink 1.7, and due to a recent Kafka cluster migration we
need to find a way to modify the Kafka offsets in the Flink Kafka connector's
state. We found that Flink 1.9's State Processor API is exactly the tool we
need, and we are able to modify the operator state with it. However, when we
tried to resume the app from the modified savepoint, it failed with
ClassNotFoundException: *TupleSerializerSnapshot*. These
*TypeSerializerSnapshots* are new in Flink 1.9 and do not exist in 1.7. Is
there any suggestion or workaround for modifying 1.7 state?

-- 
Thanks & Regards
Zhao Kaihao