Hi Viacheslav,

Certainly, I can …

There are two parts to it:

  *   Setting up such a MultipleInputStreamOperator, which is documented (sort 
of), but not completely
     *   I can prepare some boilerplate, not today, but in the next few days (if 
you are interested)
  *   The second part is about how to put all joins and other operations into a 
single operator implementation (well, that is exactly what you do 😊 ):
     *   Equi-joins on the key: you can process them in each Input 
implementation, using the state kept from the other inputs
     *   Windowing is restricted to a single window key type (a Namespace in 
Flink-speak) for your operator
        *   Windowing can be implemented manually and modelled after the 
official Flink windowing operators
     *   Should you absolutely need more than one windowing namespace, then you 
need to get creative with state primitives
  *   You also mentioned broadcast streams, which means that in the end you will 
have more than 2 input streams: the keyed ones plus the broadcast streams
     *   This is where MultipleInputStreamOperator comes into play, because you 
are not restricted to only 2 input streams as in the KeyedCoProcessFunction case
     *   That gives you more freedom to combine data in a single operator 
instead of being forced to split/chain multiple operators
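
The equi-join point above can be sketched in plain Java. This is only a model of the idea, not Flink code: the two maps stand in for keyed state, and KeyedJoinSketch, processLeft and processRight are hypothetical names chosen for illustration. Each input handler stores its own element and joins it against whatever the other input has already stored for the same key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a keyed equi-join inside one operator (hypothetical names, no Flink API).
final class KeyedJoinSketch {
    // Stand-ins for keyed list state: one buffer per key and per input.
    private final Map<String, List<String>> leftByKey = new HashMap<>();
    private final Map<String, List<String>> rightByKey = new HashMap<>();
    // Collected output; a real operator would emit downstream instead.
    final List<String> joined = new ArrayList<>();

    // Handler for the first input: remember the element, then join with stored right elements.
    void processLeft(String key, String value) {
        leftByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        for (String right : rightByKey.getOrDefault(key, Collections.emptyList())) {
            joined.add(key + ":" + value + "|" + right);
        }
    }

    // Handler for the second input: remember the element, then join with stored left elements.
    void processRight(String key, String value) {
        rightByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        for (String left : leftByKey.getOrDefault(key, Collections.emptyList())) {
            joined.add(key + ":" + left + "|" + value);
        }
    }
}
```

Every matching pair is emitted exactly once, by whichever side arrives second. The buffers in this sketch grow without bound, so a real implementation would presumably need some form of state cleanup.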

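The manual windowing mentioned in the list can be sketched in the same way. This plain-Java model assumes tumbling event-time windows and uses the window start as the single namespace. ManualWindowSketch and its methods are hypothetical names, and the firing rule (fire once the watermark reaches window end minus one) is modelled after Flink's event-time windows rather than copied from them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of manual tumbling windows with one namespace (hypothetical names, no Flink API).
final class ManualWindowSketch {
    private final long windowSizeMs;
    // window start (the namespace) -> key -> running sum; sorted so the oldest window comes first.
    private final TreeMap<Long, Map<String, Long>> sums = new TreeMap<>();
    // Collected output in the form "windowStart/key=sum".
    final List<String> fired = new ArrayList<>();

    ManualWindowSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Assign the element to its tumbling window and add it to that window's sum for the key.
    void processElement(String key, long timestampMs, long value) {
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        sums.computeIfAbsent(windowStart, w -> new TreeMap<>()).merge(key, value, Long::sum);
    }

    // Emit and drop every window whose last timestamp (end - 1) is covered by the watermark.
    void advanceWatermark(long watermarkMs) {
        while (!sums.isEmpty() && sums.firstKey() + windowSizeMs - 1 <= watermarkMs) {
            Map.Entry<Long, Map<String, Long>> window = sums.pollFirstEntry();
            for (Map.Entry<String, Long> e : window.getValue().entrySet()) {
                fired.add(window.getKey() + "/" + e.getKey() + "=" + e.getValue());
            }
        }
    }
}
```

A second namespace, for example a sliding window next to the tumbling one, would need another layer in this map, which is roughly what getting creative with state primitives amounts to.
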
Kind regards

Thias




From: Viacheslav Chernyshev <v.chernys...@outlook.com>
Sent: Tuesday, February 28, 2023 3:42 PM
To: user@flink.apache.org
Subject: Re: Is it possible to preserve chaining for multi-input operators?

Hi Matthias,

Thank you for the reply. You are absolutely right, the first keyBy is 
unavoidable, but after that we fix the parallelism and maintain the same key 
throughout the pipeline.

The MultipleInputStreamOperator approach that you've described looks very 
interesting! Unfortunately, I have never used it before. Would you be able to 
share the details for how to force the chaining with e.g. two input streams?

Kind regards,
Viacheslav
________________________________
From: Schwalbe Matthias <matthias.schwa...@viseca.ch>
Sent: 28 February 2023 14:12
To: Viacheslav Chernyshev <v.chernys...@outlook.com>; user@flink.apache.org
Subject: RE: Is it possible to preserve chaining for multi-input operators?




Hi Viacheslav,



These are two very interesting questions…



You have run into the restriction that only single-input operators can be 
chained. It also does not help to union() multiple streams into a single 
input; they still count as multiple inputs.



  *   The harder way to go would be to patch the relevant parts of Flink to 
allow chaining with multiple inputs

     *   This is very complicated to get right, especially because the resulting 
multiple inputs and outputs need to be hidden behind a façade
     *   We once did it (successfully) and abandoned the idea because of its 
complexity and maintenance cost

  *   The other way might be to implement everything in one 
org.apache.flink.streaming.api.operators.MultipleInputStreamOperator, which 
allows any (reasonable) number of inputs: keyed, non-keyed, broadcast, or 
mixed. Let me explain:

     *   From what you say, I assume that after the Kafka source you need to 
.keyBy() the instrument-id anyway, which means a shuffle and 
(de-/)serialization … unavoidable.
     *   However, after that shuffle, the MultipleInputStreamOperator could 
force-chain all your logic as long as it stays in the same key/partition 
domain
     *   Integration of broadcast inputs is a no-brainer there
     *   We do these things all the time and it really helps cutting down 
serialization cost, among other things
     *   This approach does not necessarily help with keeping latency down, as 
more inputs mean more time spent round-robining over the available inputs
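
The overall shape of such a single operator can be modelled in plain Java. This is only a sketch under stated assumptions, not the Flink API: MultiInputSketch, its nested Input interface, Quote, Trade and the notional limit rule are all hypothetical. Two inputs on the same key and one broadcast-style input share state inside one object, and the follow-up logic is an ordinary method call, so nothing has to be serialised between the steps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of one operator with three inputs (hypothetical names, no Flink API).
final class MultiInputSketch {
    // One handler per logical input.
    interface Input<T> {
        void processElement(T element);
    }

    static final class Quote {
        final String instrumentId;
        final double price;
        Quote(String instrumentId, double price) {
            this.instrumentId = instrumentId;
            this.price = price;
        }
    }

    static final class Trade {
        final String instrumentId;
        final long size;
        Trade(String instrumentId, long size) {
            this.instrumentId = instrumentId;
            this.size = size;
        }
    }

    // Stand-ins for keyed state: one entry per instrument id.
    private final Map<String, Double> lastPrice = new HashMap<>();
    private final Map<String, Long> tradedSize = new HashMap<>();
    // Stand-in for broadcast state: one value shared by all keys.
    private double notionalLimit = Double.POSITIVE_INFINITY;
    // Collected output: instrument ids that breached the limit.
    final List<String> alerts = new ArrayList<>();

    // Keyed input 1: remember the latest price, then re-evaluate the key.
    final Input<Quote> quoteInput = q -> {
        lastPrice.put(q.instrumentId, q.price);
        evaluate(q.instrumentId);
    };
    // Keyed input 2: accumulate the traded size, then re-evaluate the key.
    final Input<Trade> tradeInput = t -> {
        tradedSize.merge(t.instrumentId, t.size, Long::sum);
        evaluate(t.instrumentId);
    };
    // Broadcast-style input: replace the rule that applies to every key.
    final Input<Double> ruleInput = limit -> notionalLimit = limit;

    // Follow-up logic that would otherwise live in a separate downstream operator.
    private void evaluate(String instrumentId) {
        Double price = lastPrice.get(instrumentId);
        Long size = tradedSize.get(instrumentId);
        if (price != null && size != null && price * size > notionalLimit) {
            alerts.add(instrumentId);
        }
    }
}
```

In this model a rule change only takes effect on the next keyed element. Whether existing keys should be re-evaluated when a rule arrives is a design decision the sketch leaves open.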



I hope this helps



What do you think?



Regards



Thias











From: Viacheslav Chernyshev <v.chernys...@outlook.com>
Sent: Tuesday, February 28, 2023 1:06 PM
To: user@flink.apache.org
Subject: Is it possible to preserve chaining for multi-input operators?



Hi everyone,



My team is developing a streaming pipeline for analytics on top of market data. 
The ultimate goal is to be able to handle tens of millions of events per second 
distributed across the cluster according to the unique ID of a particular 
financial instrument. Unfortunately, we struggle with achieving acceptable 
performance. As far as I can see, Flink forcibly breaks operator chaining when 
it encounters a job graph node with multiple inputs. Consequently, performance 
suffers severely because a network boundary is enforced, and every event is 
forcibly serialised and deserialised.
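
To make this cost concrete, here is a minimal plain-Java sketch, not Flink code. It contrasts handing an event to the next step by a direct method call (roughly what happens inside a chain) with pushing it through a serialise/deserialise round trip (roughly what happens when the chain is broken). Tick, HandoffSketch, chained and unchained are hypothetical names, and Java serialisation is only a stand-in for Flink's own serialisers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical event type used only for this illustration.
final class Tick implements Serializable {
    private static final long serialVersionUID = 1L;
    final String instrumentId;
    final double price;

    Tick(String instrumentId, double price) {
        this.instrumentId = instrumentId;
        this.price = price;
    }
}

final class HandoffSketch {
    // Chained hand-off: the downstream function receives the very same object.
    static <I, O> O chained(I event, Function<I, O> downstream) {
        return downstream.apply(event);
    }

    // Unchained hand-off: the event is written to bytes and rebuilt before downstream sees it.
    static <I extends Serializable, O> O unchained(I event, Function<I, O> downstream) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(event);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                I copy = (I) in.readObject();
                return downstream.apply(copy);
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialisation round trip failed", e);
        }
    }
}
```

The direct call is the best case: as far as I know, Flink still copies records between chained operators unless object reuse is enabled.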



From the pipeline graph perspective, the requirements are:

  *   Read data from multiple Kafka topics that are connected to different 
nodes in the graph.
  *   Broadcast a number of dynamic rules to the pipeline.

The cleanest way to achieve the first goal is to have a bunch of 
KeyedCoProcessFunction operations. This design didn't work for us because the 
SerDe overhead added by broken chains was too high; we had to completely 
flatten the pipeline instead. Unfortunately, I can't find any way to solve the 
second problem. As soon as the broadcast stream is introduced into the 
pipeline, the performance tanks.



Is there any technique that I could possibly utilise to preserve the chaining?



Kind regards,

Viacheslav

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.