Hi all,
Many thanks, Jark, for the new scenarios. Based on these new scenarios, I
think they, together with iteration, should be able to represent a class of
scenarios that requires broadcasting events.
I also totally agree with Piotr that all the scenarios we have discussed should
be clearly
Hi all,
Thanks Yun for bringing this topic. I missed this discussion because of the
"multicast" title.
After reading the design, if I understand correctly, it is proposing a
custom event mechanism, i.e. broadcasting custom events.
It is a topic orthogonal to multicasting. So I would
Hi,
Before starting work on the design doc, I would suggest finding someone to
shepherd this project. Otherwise this effort might drown among other parallel
things. I could take care of that from the runtime perspective, however most of
the changes are about the API, which are
Hi Yun Gao,
Thanks a lot for your clarification.
Given that the notification of broadcast events requires alignment, whose
implementation will, in my opinion, affect the correctness of synchronous
iterations, I prefer to postpone the discussion until you have completed
the design of the new
Hi Xiaogang,
Many thanks for also considering the iteration case! :) These points are
really important for iteration. As a whole, we are implementing a new iteration
library on top of the Stream API. As a library, most of its implementation does
not need to touch the Runtime layer, but it
Hi, Yun Gao
The discussion seems to move in a different direction, changing from
supporting multicasting to implementing new iteration libraries on data
streams.
Regarding the broadcast events in iterations, many details of the new
iteration libraries are unclear:
1. How the iteration progress is
Hi,
Many thanks for all the points raised!
@Piotr For using another edge to broadcast the event, I think it may not be
able to address the iteration case. The primary problem is that with two edges
we cannot ensure the order of records. However, in the iteration case, the
broadcasted
Yes, glad to see that there is already a PR for such optimization.
Best,
Kurt
On Mon, Aug 26, 2019 at 6:59 PM Piotr Nowojski wrote:
> Hi,
>
> Xiaogang, those things worry me the most.
>
> 1. Regarding the broadcasting, doesn’t the BroadcastState [1] cover our
> issues? Can we not construct a
Hi,
Xiaogang, those things worry me the most.
1. Regarding the broadcasting, doesn’t the BroadcastState [1] cover our issues?
Can we not construct a job graph, where one operator has two outputs, one keyed,
another broadcasted, which are wired together back to the
KeyedBroadcastProcessFunction
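[Editor's note] The wiring Piotr describes — one operator with a keyed output and a broadcast output feeding the same downstream function — can be illustrated with a toy partitioner model. This is plain Python, not Flink code; the parallelism, hash scheme, and record shape are illustrative assumptions:

```python
from collections import defaultdict

PARALLELISM = 4

def keyed_channel(key, parallelism=PARALLELISM):
    # A keyed edge sends each record to exactly one downstream channel.
    # Flink hashes keys into key groups; this plain modulo hash is a stand-in.
    return hash(key) % parallelism

def route(records):
    """Route (mode, key, value) records to downstream channels.

    mode == "keyed":     one channel, chosen by key hash.
    mode == "broadcast": a copy to every channel (the BroadcastState-style side).
    """
    channels = defaultdict(list)
    for mode, key, value in records:
        if mode == "broadcast":
            for ch in range(PARALLELISM):
                channels[ch].append(value)
        else:
            channels[keyed_channel(key)].append(value)
    return channels

channels = route([
    ("keyed", "user-1", "event-a"),
    ("broadcast", None, "rule-update"),
    ("keyed", "user-2", "event-b"),
])
```

Every subtask sees all broadcast records while each keyed record lands on exactly one subtask. Note that when the keyed and broadcast records arrive over two separate edges like this, their relative order is not guaranteed, which is the concern raised for the iteration case elsewhere in this thread.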
From SQL's perspective, distributed cross join is a valid feature but not very
urgent. Actually this discussion reminds me of another useful feature (sorry
for the distraction):
when doing broadcast in batch shuffle mode, we can make each producer only
write one copy of the output data, but not
Thanks Yun for bringing up this discussion and many thanks for all the deep
thoughts!
For now, I think this discussion contains two scenarios: one is for
iteration library support and the other is for SQL join support. I think
both of the two scenarios are useful but they seem to have different
Hi all,
I also think that multicasting is a necessity in Flink, but more details
need to be considered.
Currently network is tightly coupled with states in Flink to achieve
automatic scaling. We can only access keyed states in keyed streams and
operator states in all streams.
In the
Hi Piotr,
Thanks for the explanation.
Agreed that broadcastEmit(record) is a better choice for broadcasting
in the iterations.
As broadcasting for the iterations is the first motivation, let's support
it first.
Thanks,
Zhu Zhu
On Fri, Aug 23, 2019 at 11:56 PM, Yun Gao wrote:
> Hi Piotr,
>
>
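[Editor's note] The broadcastEmit(record) semantics under discussion — one record copied to every downstream channel of the same edge, so it stays ordered relative to regular records — can be sketched as follows. The class and method names are illustrative, not Flink's actual Output/RecordWriter interfaces:

```python
class RecordWriter:
    """Toy writer with one queue per downstream channel.

    `emit` appends a record to a single chosen channel;
    `broadcast_emit` appends a copy of the record to every channel,
    so on each channel the broadcast record is seen after all records
    emitted before it and before all records emitted after it.
    """
    def __init__(self, num_channels):
        self.channels = [[] for _ in range(num_channels)]

    def emit(self, record, channel):
        self.channels[channel].append(record)

    def broadcast_emit(self, record):
        for queue in self.channels:
            queue.append(record)

w = RecordWriter(3)
w.emit("r1", 0)
w.broadcast_emit("epoch-end")   # e.g. an iteration progress marker
w.emit("r2", 0)
```

Because the marker travels on the same edge as the data, every channel observes it in emission order relative to the data records, which is why this fits the iteration progress-notification case better than a second broadcast edge would.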
Hi Piotr,
Many thanks for the suggestions!
Totally agree that we could first focus on the broadcast scenarios
and expose the broadcastEmit method first, considering the semantics and
performance. For the keyed stream, I also agree that broadcasting keyed
Hi,
If the primary motivation is broadcasting (for the iterations) and we have no
immediate need for multicast (cross join), I would prefer to first expose
broadcast via the DataStream API and only later, once we finally need it,
support multicast. As I wrote, multicast would be more
Thanks Piotr,
Users asked for this feature some time ago when they were migrating batch jobs
to Flink (Blink).
It's not very urgent as they have adopted some workarounds to solve it (like
partitioning the data set to different job vertices).
So it's fine not to make it a top priority.
Anyway, as a commonly
Hi,
Thanks for the answers :) Ok I understand the full picture now. +1 from my side
on solving this issue somehow. But before we start discussing how to solve it
one last control question:
I guess this multicast is intended to be used in blink planner, right? Assuming
that we implement the
Hi Piotr,
Thanks a lot for sharing the thoughts!
For the iteration, agreed that multicasting is not necessary.
Exposing the broadcast interface on the Output of the operators in some way
should also solve this issue, and I think it should be even more convenient to have
Hi Piotr,
Yes you are right it's a distributed cross join requirement.
Broadcast join can help with cross join cases. But users cannot use it if
the data set to join is too large to fit into one subtask.
Sorry for leaving some details out.
Thanks,
Zhu Zhu
On Fri, Aug 23, 2019, Piotr Nowojski wrote:
Hi Yun and Zhu Zhu,
Thanks for the more detailed example Zhu Zhu.
As far as I understand, for the iterations example we do not need multicasting.
Regarding the Join example, I don’t fully understand it. The example that Zhu
Zhu presented has a drawback of sending both tables to multiple nodes.
Hi Piotr,
Thanks a lot for the suggestions!
The core motivation of this discussion is to implement a new iteration
library on the DataStream, and it requires inserting special records into the
stream to notify the progress of the iteration. The mechanism of such records
is
Hi Piotr,
The case is about a broadcast join:
A--\
+--(join)--> C
B--/
Usually we can broadcast A(the result that JobVertex A produces) to all
subtasks of C.
But in this case the size of A is too large to fit in one subtask of C.
Thus we have to partition A to (A_0, A_1, A_2, ..., A_m-1).
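[Editor's note] The workaround Zhu Zhu describes can be sketched as: split A into m slices, ship slice A_i to subtask i of C, and send every record of B to all m subtasks, so each subtask joins its slice of A against the whole of B. The sketch below is plain Python, not Flink code, and the round-robin slicing is a hypothetical stand-in for the actual partitioner:

```python
from itertools import product

def partitioned_broadcast_join(a, b, m):
    """Cross join A x B when A is too large to broadcast whole.

    Split A into m slices (A_0 .. A_{m-1}), give slice i to subtask i,
    and replicate every record of B to all m subtasks; the union of the
    per-subtask partial joins equals the full cross join.
    """
    slices = [a[i::m] for i in range(m)]      # round-robin slicing of A
    result = []
    for a_i in slices:                        # one subtask of C per slice
        result.extend((x, y) for x, y in product(a_i, b))
    return result

full = partitioned_broadcast_join([1, 2, 3, 4, 5], ["x", "y"], m=3)
```

Note the cost: every record of B is replicated m times, which is exactly the broadcast/multicast traffic being discussed in this thread.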
Hi,
Yun:
Thanks for proposing the idea. I have checked the document and left a couple of
questions there, but it might be better to answer them here.
What is the exact motivation and what problems do you want to solve? We have
dropped multicast support from the network stack [1] for two
Thanks Yun for starting this discussion.
I think multicasting can be very helpful in certain cases.
I have received requirements from users that they want to do broadcast
join, while the data set to broadcast is too large to fit in one task.
Thus the requirement turned out to be to support