It seems that the key's hashCode is not stable. Could you show the details of `TraceKeyOuterClass.TraceKey`?
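To illustrate what I mean by "not stable", here is a minimal Kotlin sketch. It is not Flink's actual code: `keyGroupFor` is a simplified stand-in (Flink additionally applies a murmur hash to `key.hashCode()`), the two key classes are hypothetical, and the max parallelism of 256 is only an assumption based on the range printed in your exception. The point is that the sender and the receiver each compute the key group from `hashCode()`, so if the hash of the deserialized key differs from the hash on the sending side, the record arrives at a subtask that does not own its key group.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable

// Hypothetical key that inherits Object.hashCode(), so its hash is identity-based.
class IdentityHashedKey(val traceId: String) : Serializable

// Hypothetical key whose hashCode() depends only on its fields (data class).
data class FieldHashedKey(val traceId: String) : Serializable

// Simplified stand-in for key-group assignment (assumption: Flink also
// murmur-hashes key.hashCode() before taking the modulo).
fun keyGroupFor(key: Any, maxParallelism: Int): Int =
    Math.floorMod(key.hashCode(), maxParallelism)

// Subtask index that owns a given key group.
fun subtaskFor(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
    keyGroup * parallelism / maxParallelism

// Java-serialization round trip, standing in for the network hop between subtasks.
@Suppress("UNCHECKED_CAST")
fun <T : Serializable> roundTrip(value: T): T {
    val bytes = ByteArrayOutputStream()
    ObjectOutputStream(bytes).use { it.writeObject(value) }
    return ObjectInputStream(ByteArrayInputStream(bytes.toByteArray())).use { it.readObject() as T }
}

fun main() {
    val maxParallelism = 256 // assumption, inferred from "key group from 128 to 256"
    val parallelism = 2

    // Field-based hash: sender and receiver agree on the key group.
    val stable = FieldHashedKey("trace-1")
    println(keyGroupFor(stable, maxParallelism) == keyGroupFor(roundTrip(stable), maxParallelism))

    // Identity-based hash: the deserialized copy will usually land in another key group.
    val unstable = IdentityHashedKey("trace-1")
    println("sender key group:   ${keyGroupFor(unstable, maxParallelism)}")
    println("receiver key group: ${keyGroupFor(roundTrip(unstable), maxParallelism)}")

    // Under these assumptions, key group 89 belongs to subtask 0 and 128 to subtask 1.
    println(subtaskFor(89, maxParallelism, parallelism))
    println(subtaskFor(128, maxParallelism, parallelism))
}
```

With parallelism 1 a single subtask owns every key group, so a mismatch like this would go unnoticed, which would explain why the exception only appears at parallelism 2.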
Best,
Guowei

On Sun, Mar 20, 2022 at 3:21 PM Prashant Deva <[email protected]> wrote:

> here is the key code (in kotlin)
>
> val ks = object: KeySelector<Tuple2<TraceKeyOuterClass.TraceKey, TraceFragmentOuterClass.TraceFragment>, TraceKeyOuterClass.TraceKey> {
>     override fun getKey(it: Tuple2<TraceKeyOuterClass.TraceKey, TraceFragmentOuterClass.TraceFragment>): TraceKeyOuterClass.TraceKey {
>         return it.f0
>     }
> }
>
> and here is the code that uses it:
>
> env.addSource(kafkaConsumer, name_source)
>     .name(name_source).uid(name_source).setMaxParallelism(Config.MAX_PARALLELISM)
>     .keyBy (ks)
>     .window(EventTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.seconds(60)))
>     .process(MyProcessor())
>     .name(name_processor).uid(name_processor).setMaxParallelism(Config.MAX_PARALLELISM)
>     .addSink(kafkaProducer)
>     .uid(name_sink).name(name_sink)
>
> i am using protobufserializer from chill-protobuf library for serde. its configured as follows:
>
> env.config.registerTypeWithKryoSerializer(TraceFragmentOuterClass.TraceFragment::class.java, ProtobufSerializer::class.java)
> env.config.registerTypeWithKryoSerializer(TraceKeyOuterClass.TraceKey::class.java, ProtobufSerializer::class.java)
> env.config.registerTypeWithKryoSerializer(FullTraceOuterClass.FullTrace::class.java, ProtobufSerializer::class.java)
> env.config.registerTypeWithKryoSerializer(SpanOuterClass.Span::class.java, ProtobufSerializer::class.java)
>
> On Sun, Mar 20, 2022 at 12:15 AM caoyu <[email protected]> wrote:
>
>> Would you like copy the key code here to help debugging.
>>
>> ---- Replied Message ----
>> From Prashant Deva<[email protected]> <[email protected]>
>> Date 03/20/2022 12:24
>> To user<[email protected]> <[email protected]>
>> Subject exception when parallelizing application
>>
>> using flink 1.13.2. When i increase the parallelization of my application from 1 to 2, i see the following exceptions. what do they mean? how can i possibly fix this?
>>
>> java.lang.IllegalArgumentException: key group from 128 to 256 does not contain 89
>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerEventTimeTimer(WindowOperator.java:922)
>>     at org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:44)
>>     at org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:30)
>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:936)
>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>
