… didn’t mean to hit the send button so soon 😊

I guess we are getting closer to a solution


Thias



From: Schwalbe Matthias
Sent: Friday, 15 October 2021 08:49
To: 'Dan Hill' <quietgol...@gmail.com>; user <user@flink.apache.org>
Subject: RE: Any issues with reinterpretAsKeyedStream when scaling partitions?

Hi Dan again 😊,

I took a second look … from your call stack I conclude that you do indeed have
a network shuffle between your two operators, in which case
reinterpretAsKeyedStream wouldn’t work.

(StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
in the call stack indicates that the two operators are not chained.)
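To make the failure mode concrete, here is a stdlib-only Java sketch of the key-group arithmetic. It assumes the formulas I understand Flink's KeyGroupRangeAssignment to use: subtask i of p owns key groups ceil(i*max/p) through floor(((i+1)*max - 1)/p), and key group k belongs to subtask k*p/max. In Flink the key group itself comes from a murmur hash of key.hashCode() modulo the max parallelism; the sketch simply takes key groups as given. The class and method names are hypothetical, not Flink API. A forward hop after keyBy keeps every record on the subtask that owns its key group. A round-robin hop, standing in for any shuffle that ignores the key, trips the same kind of "key group from … to … does not contain …" check that appears in the stack trace.

```java
/**
 * Hypothetical, stdlib-only model of key-group routing (not Flink's API).
 * The arithmetic follows my reading of Flink's KeyGroupRangeAssignment.
 */
public class KeyGroupRouting {

    /** Subtask that owns a key group: keyGroup * parallelism / maxParallelism. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** First key group owned by a subtask: ceil(i * max / p). */
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group owned by a subtask (inclusive). */
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** Mimics the precondition that fails in the stack trace. */
    static void checkOwned(int maxParallelism, int parallelism, int operatorIndex, int keyGroup) {
        int start = rangeStart(maxParallelism, parallelism, operatorIndex);
        int end = rangeEnd(maxParallelism, parallelism, operatorIndex);
        if (keyGroup < start || keyGroup > end) {
            throw new IllegalArgumentException(
                    "key group from " + start + " to " + end + " does not contain " + keyGroup);
        }
    }

    /** Forward edge: the record stays on the subtask index it had after keyBy. */
    static int forward(int upstreamIndex) {
        return upstreamIndex;
    }

    /** Round-robin edge: the target subtask ignores the key entirely. */
    static int roundRobin(int recordNumber, int parallelism) {
        return recordNumber % parallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        int[] keyGroups = {3, 40, 77, 120}; // pretend these came from hashing keys

        // keyBy -> forward -> keyed operator: every record lands on its owning subtask.
        for (int kg : keyGroups) {
            int upstream = operatorIndexForKeyGroup(maxParallelism, parallelism, kg);
            checkOwned(maxParallelism, parallelism, forward(upstream), kg);
        }
        System.out.println("forward: all key groups owned");

        // keyBy -> round-robin -> keyed operator: ownership breaks on the first record.
        try {
            for (int i = 0; i < keyGroups.length; i++) {
                checkOwned(maxParallelism, parallelism, roundRobin(i + 1, parallelism), keyGroups[i]);
            }
        } catch (IllegalArgumentException e) {
            System.out.println("round-robin: " + e.getMessage());
        }
    }
}
```

Running main prints "forward: all key groups owned" and then "round-robin: key group from 32 to 63 does not contain 3". Changing the parallelism moves the range boundaries, which is why such a mismatch can stay hidden until the job is rescaled.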


… just as a double-check, could you please share both your

  *   Execution plan (call println(env.getExecutionPlan) right before your call to env.execute) (JSON), and
  *   Job plan (screenshot from the Flink dashboard)

There are a number of preconditions for two operators to be chained, and
probably one of them fails (see [1]):

  *   Each operator's chaining strategy needs to allow chaining with the other (see [2] for the chaining strategy check)
  *   We need a ForwardPartitioner in between
  *   We need to be in streaming mode
  *   Both operators need the same parallelism
  *   Chaining needs to be enabled for the streaming environment
  *   The second operator needs to be single-input (i.e. no two-input operator and no union() before it)
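As a rough illustration, the checklist above can be written as a predicate over one edge of the job graph. This is a hypothetical model: the Edge record, the Partitioner enum and failedPreconditions are made up for illustration, and the chaining-strategy check is simplified to a yes/no flag per operator. The real logic lives in the StreamingJobGraphGenerator code linked in [1]. Feeding in an edge whose downstream side is a KeyedCoProcessFunction flags only the single-input precondition, which fits the diagnosis from the call stack.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the chaining preconditions; not Flink's API. */
public class ChainingChecklist {

    enum Partitioner { FORWARD, HASH, REBALANCE }

    /** Illustrative description of an edge between an upstream and a downstream operator. */
    record Edge(
            boolean upstreamAllowsChaining,
            boolean downstreamAllowsChaining,
            Partitioner partitioner,
            boolean streamingMode,
            int upstreamParallelism,
            int downstreamParallelism,
            boolean chainingEnabled,
            int downstreamInputCount) {}

    /** Returns the preconditions that fail; an empty list means the edge could be chained. */
    static List<String> failedPreconditions(Edge e) {
        List<String> failed = new ArrayList<>();
        if (!e.upstreamAllowsChaining() || !e.downstreamAllowsChaining()) {
            failed.add("chaining strategy");
        }
        if (e.partitioner() != Partitioner.FORWARD) {
            failed.add("not a ForwardPartitioner");
        }
        if (!e.streamingMode()) {
            failed.add("not streaming mode");
        }
        if (e.upstreamParallelism() != e.downstreamParallelism()) {
            failed.add("parallelism differs");
        }
        if (!e.chainingEnabled()) {
            failed.add("chaining disabled");
        }
        if (e.downstreamInputCount() != 1) {
            failed.add("downstream is not single-input");
        }
        return failed;
    }

    public static void main(String[] args) {
        // A map feeding one input of a KeyedCoProcessFunction: everything else holds,
        // but the downstream operator has two inputs.
        Edge mapToCoProcess = new Edge(true, true, Partitioner.FORWARD, true, 4, 4, true, 2);
        System.out.println(failedPreconditions(mapToCoProcess));
    }
}
```

This prints [downstream is not single-input].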


[1] https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L861-L873
[2] https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L903-L932


From: Dan Hill <quietgol...@gmail.com>
Sent: Thursday, 14 October 2021 17:50
To: user <user@flink.apache.org>
Subject: Any issues with reinterpretAsKeyedStream when scaling partitions?

I have a job that uses reinterpretAsKeyedStream across a simple map to avoid a
shuffle. When I change the number of partitions, registerEventTimeTimer fails
with "key group from 110 to 119 does not contain 186". I'm using Flink v1.12.3.

Any thoughts on this?  I don't know if there is a known issue with 
reinterpretAsKeyedStream.

Rough steps:
1. I have a raw input stream of View records. I keyBy the View using
Tuple2<Long, String>(platform_id, log_user_id).
2. I do a small transformation of View to a TinyView. I reinterpretAsKeyedStream
the TinyView as a KeyedStream with the same key (the key values are identical).
3. I use the TinyView in a KeyedCoProcessFunction.

When I take a savepoint and restart with a different number of partitions, my
KeyedCoProcessFunction fails in registerEventTimeTimer with "key group from 110
to 119 does not contain 186". I verified that the key does not change and that
we use a Tuple2 of the simple types Long and String.
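This check matters because Flink locates keyed state and timers through the key's hashCode() (the key group is a murmur hash of it modulo the max parallelism), so the hash has to be value-based and identical after a restore. A small stdlib sketch follows. tupleHash is a hypothetical helper that I assume has the same shape as Flink's Tuple2.hashCode() (31 * f0.hashCode() + f1.hashCode()). Long.hashCode() and String.hashCode() are fixed by the Java API specification, so the result does not depend on the JVM instance.

```java
/**
 * Sketch: a (Long, String) key hashes the same way in every JVM.
 * tupleHash is hypothetical and only assumed to mirror Flink's Tuple2.hashCode().
 */
public class StableKeyHash {

    static int tupleHash(Long f0, String f1) {
        int result = f0 != null ? f0.hashCode() : 0;
        result = 31 * result + (f1 != null ? f1.hashCode() : 0);
        return result;
    }

    public static void main(String[] args) {
        long platformId = 120L;
        String logUserId = "3bfd5b19-9d86-4455-a5a1-480f8596a174";

        // Both component hashes depend only on the value (Long: (int)(v ^ (v >>> 32)),
        // String: s[0]*31^(n-1) + ... + s[n-1]), so this prints the same number on every run.
        System.out.println(tupleHash(platformId, logUserId));

        // By contrast, a key class that does not override hashCode() falls back to
        // Object's identity hash, which is not derived from the value and can change
        // between runs, so a restored job could compute different key groups.
    }
}
```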



2021-10-14 08:17:07
java.lang.IllegalArgumentException: view x insertion issue with registerEventTimeTimer for key=(120,3bfd5b19-9d86-4455-a5a1-480f8596a174), flat=platform_id: 120
log_user_id: "3bfd5b19-9d86-4455-a5a1-480f8596a174"
log_timestamp: 1634224329606
view_id: "8fcdf922-7c79-4902-9778-3f20f39b0bc2"

        at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318)
        at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59)
        at ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTimer.java:36)
        at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: key group from 110 to 119 does not contain 186
        at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
        at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
        at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
        at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
        at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
        at org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
        at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:315)
        ... 17 more

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.
