Hey. I just hit a similar error in production when trying to savepoint. We also use protobufs.
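In case it helps others hitting this: the root cause described further down the thread (a key type whose hashCode transitively depends on `Enum.hashCode`, which is identity-based and so differs between JVM runs) can be avoided with a key class whose hashCode depends only on primitive values. A minimal sketch, with hypothetical class and field names:

```java
import java.util.Objects;

// Hypothetical stand-in for a key built from the two protobuf fields
// (the long id and the enum's numeric value) instead of the generated
// message itself. java.lang.Enum.hashCode() is final and identity-based,
// so it can differ between JVM runs; a key whose hashCode transitively
// depends on it may map to a different key group after a restore.
class EnrichmentKey {
    private final long id;
    private final int opValue; // the enum's number, not the enum constant

    EnrichmentKey(long id, int opValue) {
        this.id = id;
        this.opValue = opValue;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof EnrichmentKey)) return false;
        EnrichmentKey k = (EnrichmentKey) o;
        return id == k.id && opValue == k.opValue;
    }

    @Override
    public int hashCode() {
        // Deterministic across JVM runs: depends only on primitive values.
        return Objects.hash(id, opValue);
    }
}
```

In a Flink job this would be used in the key selector, e.g. `keyBy(msg -> new EnrichmentKey(msg.getId(), msg.getOpValue()))` (getter names hypothetical) -- same idea as the `Tuple[Long, Int]` fix below, just as an explicit class.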
Has anyone found a better fix for this?

On Fri, Oct 23, 2020 at 5:21 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Glad to hear that you solved your problem. Afaik Flink should not read the
> fields of messages and call hashCode on them.
>
> Cheers,
> Till
>
> On Fri, Oct 23, 2020 at 2:18 PM Radoslav Smilyanov <
> radoslav.smilya...@smule.com> wrote:
>
>> Hi Till,
>>
>> I found my problem. It was indeed related to a mutable hashCode.
>>
>> I was using a protobuf message in the key selector function, and one of
>> the protobuf fields was an enum. I checked the implementation of the
>> hashCode of the generated message: it uses the int value field of the
>> protobuf message, so I assumed it was ok and immutable.
>>
>> I changed the key selector function to use Tuple[Long, Int] (my protobuf
>> message has only these two fields, where the int stands for the enum's
>> value field). After changing my code to use the Tuple it worked.
>>
>> I am not sure if Flink somehow reads the protobuf message fields and uses
>> the hashCode of the fields directly, since the generated protobuf enum
>> indeed has a mutable hashCode (Enum.hashCode).
>>
>> Nevertheless, it's ok with the Tuple key.
>>
>> Thanks for your response!
>>
>> Best Regards,
>> Rado
>>
>> On Fri, Oct 23, 2020 at 2:39 PM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Rado,
>>>
>>> it is hard to tell the reason without more details. Could you share the
>>> complete logs of the problematic run? The job you are running, the types
>>> of state you store in RocksDB, and the types you use as events are also
>>> very important. In the linked SO question, the problem was a type whose
>>> hashCode was not immutable.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Oct 21, 2020 at 6:24 PM Radoslav Smilyanov <
>>> radoslav.smilya...@smule.com> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I am running a Flink job that performs data enrichment.
My job has 7 Kafka consumers that receive messages for DML statements
>>>> performed on 7 db tables.
>>>>
>>>> Job setup:
>>>>
>>>> - Flink runs in k8s, set up similarly to what is described here
>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#job-cluster-resource-definitions>.
>>>> - 1 job manager and 2 task managers
>>>> - parallelism is set to 4, with 2 task slots
>>>> - RocksDB as state backend
>>>> - protobuf for serialization
>>>>
>>>> Whenever I try to trigger a savepoint after my state is bootstrapped, I
>>>> get the following error for different operators:
>>>>
>>>> Caused by: java.lang.IllegalArgumentException: Key group 0 is not in
>>>> KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
>>>>   at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>>>>   at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>>>>   at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
>>>>   at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)
>>>>
>>>> Note: the key group might vary.
>>>>
>>>> I found this
>>>> <https://stackoverflow.com/questions/49140654/flink-error-key-group-is-not-in-keygrouprange>
>>>> question on Stack Overflow which relates to such an exception (my job
>>>> graph looks similar to the one described there, except that my job has
>>>> more joins). I double-checked my hashCodes and I think they are fine.
>>>>
>>>> I tried reducing the parallelism to 1 with 1 task slot per task manager,
>>>> and this configuration seems to work. This points me in the direction of
>>>> some concurrency issue.
>>>>
>>>> I would like to understand what is causing the savepoint failure. Do you
>>>> have any suggestions about what I might be missing?
>>>>
>>>> Thanks in advance!
>>>>
>>>> Best Regards,
>>>> Rado
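For completeness, here is a rough sketch of why an unstable hashCode produces exactly this exception: Flink derives a key's key group from `key.hashCode()` (it applies a murmur hash on top, which is omitted here), and each operator subtask owns a fixed key-group range. If the hashCode changes between when state was written and when the snapshot reads it back, the recomputed group can fall outside the owning subtask's range. This is a simplified sketch, not Flink's actual code:

```java
// Simplified sketch of Flink's key-group assignment (compare
// org.apache.flink.runtime.state.KeyGroupRangeAssignment). Flink actually
// murmur-hashes key.hashCode() before taking the modulo; that step is
// omitted here. The point: the key group depends entirely on
// key.hashCode() being stable between runs.
class KeyGroupSketch {

    // Which key group a key lands in (simplified: no murmur step).
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key-group range owned by one operator subtask. With the default
    // maxParallelism of 128 and parallelism 4, subtask 1 owns [32, 63].
    static int rangeStart(int maxParallelism, int parallelism, int subtask) {
        return (subtask * maxParallelism + parallelism - 1) / parallelism;
    }

    static int rangeEnd(int maxParallelism, int parallelism, int subtask) {
        return ((subtask + 1) * maxParallelism - 1) / parallelism;
    }
}
```

With maxParallelism 128 and parallelism 4, subtask 1's range is [32, 63]; a key whose hashCode changed since the state was written can recompute to, say, group 0, yielding the "Key group 0 is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}" error from the thread above.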