Hey all,

I've run into an issue with the State Processor API. To make it easier to
reproduce, I've put together a reference repository that demonstrates the
problem: https://github.com/segmentio/flink-state-management.

The current implementation of the pipeline has left us with keyed state
that we no longer need, and we no longer have references to some of the old
keys. My plan was to:
1. create a savepoint
2. read the keys from each operator (using the State Processor API)
3. filter out all the keys that are no longer used
4. bootstrap a new savepoint that contains the filtered state
A rough sketch of steps 2-4 follows below.
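
For context, here is roughly what steps 2-4 look like in code. This is a
simplified sketch rather than the exact code from the repository: the
operator uid, paths, state name, and the isStillUsed predicate are all
placeholders, and it uses the basic String key that worked.

import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.state.api.{OperatorTransformation, Savepoint}
import org.apache.flink.state.api.functions.{KeyedStateBootstrapFunction, KeyedStateReaderFunction}
import org.apache.flink.util.Collector

// Step 2: emit every (key, value) pair held in one operator's ValueState.
class KeyReader extends KeyedStateReaderFunction[String, (String, String)] {
  private var state: ValueState[String] = _

  override def open(parameters: Configuration): Unit =
    state = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("state", classOf[String]))

  override def readKey(key: String,
                       ctx: KeyedStateReaderFunction.Context,
                       out: Collector[(String, String)]): Unit =
    out.collect((key, state.value()))
}

// Step 4: write the surviving pairs back into an identical ValueState.
class StateWriter extends KeyedStateBootstrapFunction[String, (String, String)] {
  private var state: ValueState[String] = _

  override def open(parameters: Configuration): Unit =
    state = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("state", classOf[String]))

  override def processElement(
      value: (String, String),
      ctx: KeyedStateBootstrapFunction[String, (String, String)]#Context): Unit =
    state.update(value._2)
}

object PruneState {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val backend = new RocksDBStateBackend("file:///tmp/checkpoints") // placeholder

    // Step 2: load the existing savepoint and read out the keyed state.
    val existing = Savepoint.load(env, "file:///tmp/old-savepoint", backend)
    val pairs: DataSet[(String, String)] =
      existing.readKeyedState("my-operator-uid", new KeyReader)

    // Step 3: keep only the keys that are still in use.
    val live = pairs.filter(new FilterFunction[(String, String)] {
      override def filter(kv: (String, String)): Boolean = isStillUsed(kv._1)
    })

    // Step 4: bootstrap a new savepoint containing only the filtered state.
    val transformation = OperatorTransformation
      .bootstrapWith(live)
      .keyBy(new KeySelector[(String, String), String] {
        override def getKey(kv: (String, String)): String = kv._1
      })
      .transform(new StateWriter)

    Savepoint
      .create(backend, 128) // max parallelism of the original job
      .withOperator("my-operator-uid", transformation)
      .write("file:///tmp/new-savepoint")

    env.execute("prune-stale-keys")
  }

  // Placeholder predicate; the real check consults our set of live keys.
  def isStillUsed(key: String): Boolean = true
}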

I managed to get this working with a sample pipeline and a very basic key
(a string), but when I switched the key to something more complex (a case
class of two strings), I started seeing this exception:
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
    at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
    ... 13 more
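
For reference, the only change between the working run and the failing one
is the key type. Roughly (the names here are placeholders; the real
definitions are in the linked repository):

// Worked: state keyed by a plain String
events.keyBy(e => e.id)

// Fails on restore with the StateMigrationException above
case class CompositeKey(sourceId: String, eventType: String)
events.keyBy(e => CompositeKey(e.sourceId, e.eventType))

The reader function's key type changes from String to CompositeKey to
match.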

Has anyone come across this before and figured out a fix? Any help you can
give would be greatly appreciated!

Thanks,
-- 
Mark Niehe · Software Engineer, Integrations
Segment (http://segment.com/)
