Re: [DISCUSS] Improve broadcast serialization

2018-10-19 Thread Zhijiang(wangzhijiang999)
I agree with the additional thoughts of a), b) and c).

In all the current implementations of ChannelSelector, the selected channels 
are either one or all, so it makes sense to change the interface as you 
suggested, as long as we do not extend other selectors to partial channels in 
the future. The single-channel implementation would also remove some overhead 
from array creation and looping. For the broadcast selector, there is no need 
to return channels from the selector at all, and we can take a shortcut for 
this special implementation, as in the sketch below.
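A minimal sketch of that single-channel path; every class and method name below is an illustrative assumption, not Flink's actual API:

import java.io.IOException;

abstract class SingleChannelRecordWriter<T> {
    interface SingleChannelSelector<T> {
        int selectChannel(T record, int numChannels);
    }

    private final SingleChannelSelector<T> selector;
    private final int numChannels;

    SingleChannelRecordWriter(SingleChannelSelector<T> selector, int numChannels) {
        this.selector = selector;
        this.numChannels = numChannels;
    }

    void emit(T record) throws IOException {
        // old style: loop over the int[] returned by selectChannels(...)
        // new style: exactly one target channel, no array allocation, no loop
        sendToTarget(record, selector.selectChannel(record, numChannels));
    }

    abstract void sendToTarget(T record, int channel) throws IOException;
}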

Comparing 3 vs 5, I still prefer 3 for now, because it can reuse the current 
network stack. We only create one BufferBuilder for all the channels and build 
a separate BufferConsumer for every channel, all sharing the same BufferBuilder. 
To do so, we just need a few changes on the RecordWriter side, without touching 
the downstream components of the network stack. And it already gains most of the 
performance benefit, because the temporary serialization buffer is copied only 
once into one BufferBuilder.
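As a minimal sketch, assuming a BufferBuilder that can hand out one BufferConsumer per channel over the same underlying memory segment (all helper names below are illustrative, not Flink's actual API):

abstract class BroadcastWriterSketch<T> {
    interface BufferConsumer {}
    interface BufferBuilder {
        BufferConsumer createBufferConsumer();
    }

    void broadcastEmit(T record) throws java.io.IOException {
        serializeRecord(record);                    // serialize the record once
        BufferBuilder bufferBuilder = requestNewBufferBuilder();
        for (int channel = 0; channel < numChannels(); channel++) {
            // every channel reads the same shared buffer via its own consumer
            addBufferConsumer(bufferBuilder.createBufferConsumer(), channel);
        }
        copyToBufferBuilder(bufferBuilder);         // copy the serialized bytes once
    }

    abstract void serializeRecord(T record) throws java.io.IOException;
    abstract BufferBuilder requestNewBufferBuilder() throws java.io.IOException;
    abstract void addBufferConsumer(BufferConsumer consumer, int channel);
    abstract void copyToBufferBuilder(BufferBuilder builder);
    abstract int numChannels();
}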

I can first create the JIRA for the single-channel interface if you have not 
done so already, and then continue with the copying improvement step by step. :)

Best,
Zhijiang
--
From: Piotr Nowojski
Date: Thursday, October 18, 2018, 17:47
To: Zhijiang(wangzhijiang999)
Cc: Nico Kruber; dev
Subject: Re: [DISCUSS] Improve broadcast serialization

Hey,

I also think that the 3rd option is the most promising; however, the logic of 
“dirty” channels might cause some overhead. I was also thinking about another 
option:

5. In case of ‘emit’ called on BroadcastRecordWriter, we could write the record 
to the common/shared BufferBuilder, but somehow mark it as targeted to only one 
channel - we would send it over the network to all of the receivers, but all 
except one would ignore it. This might be easier to implement in 
BroadcastRecordWriter, but would require extra logic on the receiver side. With 
respect to performance, it might also be better than 3.
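For illustration only, the receiver side could then filter records like the hypothetical sketch below; the 4-byte target-channel header and the -1 broadcast sentinel are assumptions, not Flink's actual wire format:

import java.nio.ByteBuffer;

final class TargetedRecordFilter {
    private static final int BROADCAST = -1;  // assumed sentinel for "all channels"

    // keep the record only if it is addressed to this channel or is a broadcast
    static boolean shouldKeep(ByteBuffer serializedRecord, int myChannelIndex) {
        int targetChannel = serializedRecord.getInt(0);  // assumed header position
        return targetChannel == BROADCAST || targetChannel == myChannelIndex;
    }
}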

A couple more thoughts:

a) if we select BroadcastRecordWriter, literally the only way it can be 
polluted by non-broadcast writes is latency markers via `randomEmit`. So when 
choosing 3 vs 5: mixing broadcast and non-broadcast writes happens very rarely, 
so we shouldn’t optimise for it, but pick whichever is easiest to implement.
b) there are no use cases where `ChannelSelector` returns anything besides a 
single channel or broadcast.

Point b) brings me to one more thing. I was once playing with simplifying the 
`ChannelSelector` interface by adding a new `SingleChannelSelector` one with 
the method:

`int selectChannel(T record, int numChannels);`

And it resulted in a ~10% performance speedup for the network stack alone (the 
overhead of creating singleton arrays and iterating over them). I didn’t 
follow up on this, because the performance gain wasn’t huge, while it 
complicated `RecordWriter`, since it had to handle either 
`SingleChannelSelector` or `ChannelSelector`. Now that I have realised that 
there are no use cases for selecting more than one, but not all, of the 
channels, and that we will have to special-case `BroadcastPartitioner` for 
broadcasting anyway, this is the perfect occasion to actually simplify the 
implementation and drop the multi-channel ChannelSelector.

I think we should do this as a first step, in preparation for either 3. or 
5. (changing the ChannelSelector signature to:

int selectChannel(T record, int numChannels);

)
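For reference, a hedged before/after sketch of that signature change; the old multi-channel signature follows the ChannelSelector of the time, though parameter names may differ:

interface ChannelSelectorBefore<T> {
    int[] selectChannels(T record, int numChannels); // a singleton array in practice
}

interface ChannelSelectorAfter<T> {
    int selectChannel(T record, int numChannels);    // no array allocation, no loop
}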

What do you think?

Piotrek

On 18 Oct 2018, at 06:12, Zhijiang(wangzhijiang999) wrote:
Hi Piotr,

Thanks for your replies and suggestions!

For my rough idea of a skip index list, I agree with your concerns about the 
performance in the non-broadcast case and the complicated implementation. 
Although I think this idea seems more unified in semantics across the "emit", 
"broadcastEmit" and "randomEmit" APIs, it is probably not worth going deeper 
into such global changes currently.

Currently RecordWriter provides three main methods to write elements with 
different semantics:

"broadcastEmit" writes the element to all the channels; currently used for 
watermarks.
"randomEmit" writes the element to one random channel; currently used for 
latency markers.
"emit" writes the element to some channels via the ChannelSelector; currently 
used for normal records. The selected channels may be one, some or all.

If we want to retain these APIs for different requirements, then the 
RecordWriter should not be aware of which kind of elements are written via 
these APIs, so we should not make any assumptions in the implementation. In 
detail, I know "randomEmit" is only used for latency markers currently, but we 
cannot confirm whether this API will be used for other elements in the future, 
so we cannot estimate how frequently this API will be used for different 
possible elements, which is my concern above. I do not want to limit any future 
possibilities for these APIs because of this improvement.

Re: Sharing state between subtasks

2018-10-18 Thread Zhijiang(wangzhijiang999)
Not yet. We only have some initial thoughts and have not worked on it yet. We 
will update this discussion with our progress, if any.

Best,
Zhijiang
--
From: Aljoscha Krettek
Date: Thursday, October 18, 2018, 17:53
To: dev; Zhijiang(wangzhijiang999)
Cc: Till Rohrmann
Subject: Re: Sharing state between subtasks

Hi Zhijiang,

do you already have working code or a design doc for the second approach?

Best,
Aljoscha

> On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) wrote:
> 
> Just noticed this discussion from @Till Rohrmann's weekly community update 
> and I want to share some thoughts from our experiences.
> 
> We also encountered the source consumption skew issue before, and we 
> focused on improving it in two possible ways.
> 
> 1. Control the read strategy on the downstream side. In detail, every input 
> channel in the downstream task corresponds to the consumption of one upstream 
> source task, and we tag each input channel with a watermark to find the 
> lowest channel to read with high priority. In essence, we actually rely on 
> the backpressure mechanism. If the channel with the highest timestamp is not 
> read by the downstream task for a while, it will block the corresponding 
> source task from reading once the buffers are exhausted. There is no need to 
> change the source interface in this way, but there are two major concerns: 
> first, it affects the barrier alignment, resulting in delayed or expired 
> checkpoints. Second, it cannot align source consumption very precisely; it is 
> just a best-effort approach. So we finally gave up on this way.
> 
> 2. Add a new component, the SourceCoordinator, to coordinate the source 
> consumption in a distributed way. For example, we can start this component in 
> the JobManager like the current CheckpointCoordinator. Then every source task 
> would communicate with the JobManager via the current RPC mechanism; maybe we 
> can rely on the heartbeat messages to attach the consumption progress as the 
> payload. The JobManager will accumulate all the reported progress and then 
> give responses to the different source tasks. We can define a protocol, for 
> example for telling a fast source task to sleep for a specific time. This 
> way, the coordinator has the global information to make the proper decision 
> for individual tasks, so it seems more precise. And it will not affect the 
> barrier alignment, because a sleeping fast source can release the lock to 
> emit barriers as normal. The only concern is the change to the source 
> interface, which may affect all related source implementations.
> 
> Currently we prefer to implement the second way, and we will incorporate the 
> other good points from above. :)
> 
> Best,
> Zhijiang
> --
> From: Jamie Grier
> Date: Wednesday, October 17, 2018, 03:28
> To: dev
> Subject: Re: Sharing state between subtasks
> 
> Here's a doc I started describing some changes we would like to make,
> starting with the Kinesis Source. It describes a refactoring of that code
> specifically and also hopefully a pattern and some reusable code we can use
> in the other sources as well.  The end goal would be best-effort event-time
> synchronization across all Flink sources but we are going to start with the
> Kinesis Source first.
> 
> Please take a look and please provide thoughts and opinions about the best
> state sharing mechanism to use -- that section is left blank and we're
> especially looking for input there.
> 
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
> 
> -Jamie
> 
> 
> On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann  wrote:
> 
>> But on the Kafka source level it should be perfectly fine to do what Elias
>> proposed. This of course is not the perfect solution but could bring us
>> forward quite a bit. The changes required for this should also be minimal.
>> This would become obsolete once we have something like shared state. But
>> until then, I think it would be worth a try.
>> 
>> Cheers,
>> Till
>> 
>> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek wrote:
>> 
>>> The reason this selective reading doesn't work well in Flink at the
>>> moment is because of checkpointing. For checkpointing, checkpoint barriers
>>> travel within the streams. If we selectively read from inputs based on
>>> timestamps, this is akin to blocking an input if that input is very far
>>> ahead in event time, which can happen when you have a very fast source and
>>> a slow source (in event time), maybe because you're in a catchup phase. In
>>> those cases it's better to simply not read the data at the sources, as
>>> Thomas said.

Re: Sharing state between subtasks

2018-10-18 Thread Zhijiang(wangzhijiang999)
Just noticed this discussion from @Till Rohrmann's weekly community update, and 
I want to share some thoughts from our experiences.

We also encountered the source consumption skew issue before, and we focused on 
improving it in two possible ways.

1. Control the read strategy on the downstream side. In detail, every input 
channel in the downstream task corresponds to the consumption of one upstream 
source task, and we tag each input channel with a watermark to find the lowest 
channel to read with high priority. In essence, we actually rely on the 
backpressure mechanism. If the channel with the highest timestamp is not read 
by the downstream task for a while, it will block the corresponding source task 
from reading once the buffers are exhausted. There is no need to change the 
source interface in this way, but there are two major concerns: first, it 
affects the barrier alignment, resulting in delayed or expired checkpoints. 
Second, it cannot align source consumption very precisely; it is just a 
best-effort approach. So we finally gave up on this way (a sketch follows 
below anyway).

2. Add a new component, the SourceCoordinator, to coordinate the source 
consumption in a distributed way. For example, we can start this component in 
the JobManager like the current CheckpointCoordinator. Then every source task 
would communicate with the JobManager via the current RPC mechanism; maybe we 
can rely on the heartbeat messages to attach the consumption progress as the 
payload. The JobManager will accumulate all the reported progress and then give 
responses to the different source tasks. We can define a protocol, for example 
for telling a fast source task to sleep for a specific time. This way, the 
coordinator has the global information to make the proper decision for 
individual tasks, so it seems more precise. And it will not affect the barrier 
alignment, because a sleeping fast source can release the lock to emit barriers 
as normal. The only concern is the change to the source interface, which may 
affect all related source implementations.
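Purely as an illustration, the channel choice in the first way could look like the following sketch; class and method names are illustrative, not actual Flink code:

final class WatermarkPriorityReader {
    // among all input channels, prefer the one whose watermark is lowest,
    // so the slowest upstream source is drained with the highest priority
    static int selectNextChannel(long[] channelWatermarks) {
        int lowest = 0;
        for (int i = 1; i < channelWatermarks.length; i++) {
            if (channelWatermarks[i] < channelWatermarks[lowest]) {
                lowest = i;
            }
        }
        return lowest;
    }
}

And a hedged sketch of the protocol the second way implies; the gateway, payload and sleep hint below are illustrative assumptions, not an actual Flink interface:

interface SourceCoordinatorGateway {
    // called by every source task, e.g. piggybacked on the heartbeat payload
    ThrottleDecision reportProgress(int subtaskIndex, long currentEventTime);

    final class ThrottleDecision {
        final long sleepMillis;  // 0 means "keep reading at full speed"

        ThrottleDecision(long sleepMillis) {
            this.sleepMillis = sleepMillis;
        }
    }
}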

Currently we prefer to implement the second way, and we will incorporate the 
other good points from above. :)

Best,
Zhijiang
--
From: Jamie Grier
Date: Wednesday, October 17, 2018, 03:28
To: dev
Subject: Re: Sharing state between subtasks

Here's a doc I started describing some changes we would like to make,
starting with the Kinesis Source. It describes a refactoring of that code
specifically and also hopefully a pattern and some reusable code we can use
in the other sources as well.  The end goal would be best-effort event-time
synchronization across all Flink sources but we are going to start with the
Kinesis Source first.

Please take a look and please provide thoughts and opinions about the best
state sharing mechanism to use -- that section is left blank and we're
especially looking for input there.

https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing

-Jamie


On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann  wrote:

> But on the Kafka source level it should be perfectly fine to do what Elias
> proposed. This of course is not the perfect solution but could bring us
> forward quite a bit. The changes required for this should also be minimal.
> This would become obsolete once we have something like shared state. But
> until then, I think it would be worth a try.
>
> Cheers,
> Till
>
> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek wrote:
>
> > The reason this selective reading doesn't work well in Flink at the
> > moment is because of checkpointing. For checkpointing, checkpoint barriers
> > travel within the streams. If we selectively read from inputs based on
> > timestamps, this is akin to blocking an input if that input is very far
> > ahead in event time, which can happen when you have a very fast source and
> > a slow source (in event time), maybe because you're in a catchup phase. In
> > those cases it's better to simply not read the data at the sources, as
> > Thomas said. This is also because with Kafka Streams, each operator is
> > basically its own job: it's reading from Kafka and writing to Kafka and
> > there is not a complex graph of different operations with network shuffles
> > in between, as you have with Flink.
> >
> > This different nature of Flink is also why I think that readers need
> > awareness of other readers to do the event-time alignment, and this is
> > where shared state comes in.
> >
> > > On 10. Oct 2018, at 20:47, Elias Levy wrote:
> > >
> > > On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske wrote:
> > >
> > >> I think the new source interface would be designed to be able to
> > >> leverage shared state to achieve time alignment.
> > >> I don't think this would be possible without some kind of shared state.
> > >>
> > >> The problem of tasks that are far ahead in time cannot be solved with
> > >> back-pressure.
> > >> That's because a task cannot choose from 

Re: [DISCUSS] Improve broadcast serialization

2018-10-17 Thread Zhijiang(wangzhijiang999)
Hi Piotr,

Thanks for your replies and suggestions!

For my rough idea of a skip index list, I agree with your concerns about the 
performance in the non-broadcast case and the complicated implementation. 
Although I think this idea seems more unified in semantics across the "emit", 
"broadcastEmit" and "randomEmit" APIs, it is probably not worth going deeper 
into such global changes currently.

Currently RecordWriter provides three main methods to write elements with 
different semantics:

"broadcastEmit" writes the element to all the channels; currently used for 
watermarks.
"randomEmit" writes the element to one random channel; currently used for 
latency markers.
"emit" writes the element to some channels via the ChannelSelector; currently 
used for normal records. The selected channels may be one, some or all.
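For clarity, the three paths can be summarized in the following sketch; the method names match the RecordWriter methods discussed in this thread, while the exact signatures in Flink may differ:

interface RecordWriterApi<T> {
    void emit(T record) throws java.io.IOException;          // via ChannelSelector: one, some, or all channels
    void broadcastEmit(T record) throws java.io.IOException; // all channels, e.g. watermarks
    void randomEmit(T record) throws java.io.IOException;    // one random channel, e.g. latency markers
}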

If we want to retain these APIs for different requirements, then the 
RecordWriter should not be aware of which kind of elements are written via 
these APIs, so we should not make any assumptions in the implementation. In 
detail, I know "randomEmit" is only used for latency markers currently, but we 
cannot confirm whether this API will be used for other elements in the future, 
so we cannot estimate how frequently this API will be used for different 
possible elements, which is my concern above. I do not want to limit any future 
possibilities for these APIs because of this improvement.

Considering the suggestions below:

1. Inserting elements via "randomEmit" in front of an unfinished broadcast 
buffer would change the current ordering semantics. That may not matter for 
latency markers currently, but it may not extend to other future elements.

2. If we simply implement "randomEmit" the same way as broadcast, I worry about 
a broadcast storm in special cases, and we would also change the semantics by 
sending unnecessary elements to some channels.

3. I currently prefer this way, and it is similar to our previous discussion. 
The implementation would resemble the current "broadcastEvent", which creates a 
new broadcast buffer for the event and finishes the current buffer for all the 
channels before enqueuing this event buffer (a sketch follows after point 4).

4. Yes, your statement is right for the current mode. And I want to pass a 
boolean parameter "isBroadcast" in the constructor of RecordWriter to indicate 
broadcast writes in the specific processes, because the RecordWriter cannot 
check the ChannelSelector instance type due to module dependencies.
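As promised above, a minimal sketch of the flow from point 3; the helper names are illustrative rather than Flink's actual API:

abstract class MixedWriteSketch<T> {
    private final java.util.Random rng = new java.util.Random();

    void randomEmit(T record) throws java.io.IOException {
        // finish the shared broadcast BufferBuilder first, so every channel
        // sees a consistent end position, just like "broadcastEvent" does
        finishSharedBufferBuilder();
        emitToChannel(record, rng.nextInt(numChannels()));
        // the next broadcastEmit starts a fresh shared BufferBuilder
    }

    abstract void finishSharedBufferBuilder() throws java.io.IOException;
    abstract void emitToChannel(T record, int channel) throws java.io.IOException;
    abstract int numChannels();
}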

In conclusion, based on my current thinking I want to implement this 
improvement following the third point, which keeps the same behavior as normal 
"emit" mixed with "broadcastEvent".

Best,
Zhijiang
--
From: Piotr Nowojski
Date: Wednesday, October 17, 2018, 19:25
To: Zhijiang(wangzhijiang999)
Cc: Nico Kruber; dev
Subject: Re: [DISCUSS] Improve broadcast serialization

Hi,

Regarding the second idea with the skip index list, I would guess it might have 
a bad performance impact in non-broadcast cases or would seriously complicate 
our Buffer implementation. It would also make reading, data copying, slicing 
and other big-chunk byte operations much more costly: instead of memcpy-ing the 
whole buffer, we would have to manually select the correct bits.

 > But I am just wondering if the switch is frequent between broadcasting and 
 > non-broadcasting operations

I haven't researched this topic any further than before. However, my first 
guess would be that this switch doesn’t happen at all EXCEPT for `randomEmit`, 
which is used for the latency markers (this statement requires further 
research/validation). Assuming that’s true:

1. We probably can not/should not flush the broadcast buffer, serialise the 
randomEmit record and flush again, because this would prematurely emit the 
latency marker - defeating its purpose and skewing the measured time. 
LatencyMarkers are expected to travel through the pipeline at the exact same 
speed as regular records would.

2. Maybe we could just always broadcast the latency markers as well? This would 
be a nice solution, except that at the level of the RecordWriter we do not know 
whether something is a latency marker or not - we only know that we were asked 
to “emit”, “randomEmit” or “broadcastEmit”, and we have to handle them somehow 
(throw an exception?).

3. Assuming `randomEmit` or `emit` is rare, maybe we copy the broadcast 
`BufferBuilder` into a new one and append the record/latency marker there, so 
that all channels except one share the “broadcast” BufferBuilder. Once we need 
to flush any of the buffers (either the broadcast one or the “dirty” one), we 
flush them all and restart with all channels sharing a fresh new “broadcast” 
BufferBuilder?

4. For streaming, isn't broadcast currently realised by passing 
`BroadcastPartitioner` to the RecordWriter and using the standard `emit` 
method? That would need to be changed

Re: [DISCUSS] Flink Cluster Overview Dashboard Improvement Proposal

2018-10-10 Thread Zhijiang(wangzhijiang999)
Thanks Fabian for proposing this topic.

It is well worth improving the web dashboard to show more useful information, 
which can benefit Flink users a lot.

Just two small personal concerns:
1. The start time and end time are already given, so it is easy to estimate the 
rough duration. Is it necessary to show the duration information and occupy 
that space?
2. The job name given by users can be used for identification, and the job id 
is generated randomly. I am not sure whether this id is useful for further 
debugging. If not, maybe we can omit the job id from the dashboard?

Best,
Zhijiang
--
From: Jin Sun
Date: Wednesday, October 10, 2018, 01:10
To: dev
Subject: Re: [DISCUSS] Flink Cluster Overview Dashboard Improvement Proposal

Great job! That would be very helpful for debugging.


I would suggest using small icons for the Job Manager/Task Managers when there 
are too many instances (like a thousand).
Maybe we can also introduce locality, so that task managers belonging to the 
same rack are shown together?




Small icons can be like this: [inline images omitted]




On Oct 9, 2018, at 8:49 PM, Till Rohrmann wrote:
…information on the front page. Your mock looks really promising to me since 
it shows some basic metrics and cluster information at a glance. Apart from 
the source input and sink output metrics, all other required information 
should be available to display in the dashboard. Thus, your proposal should 
only affect flink-runtime-web, which should make it easier to realize.

I'm in favour of adding this feature to Flink's dashboard to make it
available to the whole community.



Re: [DISCUSS] [Contributing] (2) - Review Steps

2018-10-09 Thread Zhijiang(wangzhijiang999)
+1
--
From: vino yang
Date: Tuesday, October 9, 2018, 14:08
To: dev
Subject: Re: [DISCUSS] [Contributing] (2) - Review Steps

+1

Peter Huang wrote on Tuesday, October 9, 2018 at 1:54 PM:

> +1
>
> On Mon, Oct 8, 2018 at 7:47 PM Thomas Weise  wrote:
>
> > +1
> >
> >
> > On Mon, Oct 8, 2018 at 7:36 PM Tzu-Li Chen  wrote:
> >
> > > +1
> > >
> > > Jin Sun wrote on Tuesday, October 9, 2018 at 2:10 AM:
> > >
> > > > +1, looking forward to seeing the change.
> > > >
> > > > > On Oct 9, 2018, at 12:07 AM, Fabian Hueske wrote:
> > > > >
> > > > > Hi everyone,
> > > > >
> > > > > Since we have addressed all comments (please raise your voice if
> > > > > not!), I would like to move forward and convert the proposal [1]
> > > > > into a page for Flink's website [2].
> > > > > I will create a pull request against the website repo [3].
> > > > >
> > > > > Once the page got merged, we can start posting the review form on
> > > > > new pull requests.
> > > > >
> > > > > Best, Fabian
> > > > >
> > > > > [1] https://docs.google.com/document/d/1yaX2b9LNh-6LxrAmE23U3D2cRbocGlGKCYnvJd9lVhk
> > > > > [2] https://flink.apache.org
> > > > > [3] https://github.com/apache/flink-web
> > > > >
> > > > > On Tue, Sep 25, 2018 at 17:56, Tzu-Li Chen <wander4...@gmail.com> wrote:
> > > > >
> > > > >> I agree with Chesnay that we don't guarantee (quick) review of a
> > > > >> PR at the project level. As the ASF statement[1] says:
> > > > >>
> > > > >>> Please show some patience with the developers if your patch is
> > > > >>> not applied as fast as you'd like or a developer asks you to make
> > > > >>> changes to the patch. If you do not receive any feedback in a
> > > > >>> reasonable amount of time (say a week or two), feel free to send
> > > > >>> a follow-up e-mail to the developer list. Open Source developers
> > > > >>> are all volunteers, often doing the development in their spare time.
> > > > >>
> > > > >> However, an open source community shows its friendliness to
> > > > >> contributors. Thus contributors believe their contribution will be
> > > > >> taken care of, even if rejected with a reason; project members are
> > > > >> kind enough to provide help with the process.
> > > > >>
> > > > >> Just like the way this thread kicked off, it is good to see that
> > > > >> the Flink community tries its best to help its contributors and
> > > > >> committers, and thereby takes advantage of "open source".
> > > > >>
> > > > >> Best,
> > > > >> tison.
> > > > >>
> > > > >> [1] http://www.apache.org/dev/contributors#patches
> > > > >>
> > > > >>
> > > > >> Chesnay Schepler wrote on Tuesday, September 25, 2018 at 11:21 PM:
> > > > >>
> > > > >>> There is no guarantee that a PR will be looked at, nor is it
> > > > >>> possible to provide this in any way on the project level.
> > > > >>>
> > > > >>> As far as Apache is concerned all contributors/committers etc.
> > > > >>> work voluntarily, and as such assigning work (which includes
> > > > >>> ownership if it implies such) or similar is simply not feasible.
> > > > >>>
> > > > >>> On 25.09.2018 16:54, Thomas Weise wrote:
> > > > >>>> I think that all discussion/coordination related to a
> > > > >>>> contribution / PR should be handled through the official project
> > > > >>>> channel.
> > > > >>>>
> > > > >>>> I would also prefer that there are no designated "owners" and
> > > > >>>> "experts", for the reasons Fabian mentioned.
> > > > >>>>
> > > > >>>> Ideally there is no need to have "suggested reviewers" either,
> > > > >>>> but then what will be the process to ensure that PRs will be
> > > > >>>> looked at?
> > > > >>>>
> > > > >>>> Thanks,
> > > > >>>> Thomas
> > > > >>>>
> > > > >>>> On Tue, Sep 25, 2018 at 6:17 AM Tzu-Li Chen <wander4...@gmail.com>
> > > > >>>> wrote:
> > > > >>>>
> > > > > Hi Fabian,
> > > > >
> > > > > You convinced me. I missed the advantage we can take from mailing
> > > > > lists.
> > > > >
> > > > > Now I am of the same opinion.
> > > > >
> > > > > Best,
> > > > > tison.
> > > > >
> > > > >
> > > > > Fabian Hueske wrote on Tuesday, September 25, 2018 at 3:01 PM:
> > > > >
> > > > >> Hi,
> > > > >>
> > > > >> I think questions about Flink should be posted on the public
> > > > >> mailing lists instead of asking just a single expert.
> > > > >>
> > > > >> There are many reasons for that:
> > > > >> * usually more than one person can answer the question (what if
> > > > >> the expert is not available?)
> > > > >> * non-committers can join the discussion and contribute to the
> > > > >> community (how can they become experts otherwise?)
> > > > >> * the knowledge is shared on the mailing list (helps in cases
> > > > >> when only one
> > > > >> 

Re: [DISCUSS] Dropping flink-storm?

2018-09-28 Thread Zhijiang(wangzhijiang999)
I very much agree with dropping it. +1
--
From: Jeff Carter
Date: Saturday, September 29, 2018, 10:18
To: dev
Cc: chesnay; Till Rohrmann; user
Subject: Re: [DISCUSS] Dropping flink-storm?

+1 to drop it.

On Fri, Sep 28, 2018, 7:25 PM Hequn Cheng  wrote:

> Hi,
>
> +1 to drop it. It seems that few people use it.
>
> Best, Hequn
>
> On Fri, Sep 28, 2018 at 10:22 PM Chesnay Schepler wrote:
>
> > I'm very much in favor of dropping it.
> >
> > Flink has been continually growing in terms of features, and IMO we've
> > reached the point where we should cull some of the more obscure ones.
> > flink-storm, while interesting from a theoretical standpoint, offers too
> > little value.
> >
> > Note that the bolt/spout wrapper parts are still compatible;
> > it's only topologies that aren't working.
> >
> > IMO compatibility layers only add value if they ease the migration to
> > Flink APIs.
> > * bolt/spout wrappers do this, but they will continue to work even if we
> > drop it
> > * topologies don't do this, so I'm not interested in them.
> >
> > On 28.09.2018 15:22, Till Rohrmann wrote:
> > > Hi everyone,
> > >
> > > I would like to discuss how to proceed with Flink's Storm
> > > compatibility layer flink-storm.
> > >
> > > While working on removing Flink's legacy mode, I noticed that some
> > > parts of flink-storm rely on the legacy Flink client. In fact, at the
> > > moment flink-storm does not work together with Flink's new distributed
> > > architecture.
> > >
> > > I'm also wondering how many people are actually using Flink's Storm
> > > compatibility layer and whether it would be worth porting it.
> > >
> > > I see two options how to proceed:
> > >
> > > 1) Commit to maintain flink-storm and port it to Flink's new
> architecture
> > > 2) Drop flink-storm
> > >
> > > I doubt that we can contribute it to Apache Bahir [1], because once we
> > > remove the legacy mode, this module will no longer work with all newer
> > > Flink versions.
> > >
> > > Therefore, I would like to hear your opinion on this and in particular
> > > if you are using or planning to use flink-storm in the future.
> > >
> > > [1] https://github.com/apache/bahir-flink
> > >
> > > Cheers,
> > > Till
> >
> >
> >
>



Re: Codespeed deployment for Flink

2018-09-25 Thread Zhijiang(wangzhijiang999)
Thanks @Piotr Nowojski  and @Nico Kruber for the good job!

I have already benefited from these benchmarks in previous PRs. I hope the 
visualization tool keeps getting stronger, to the benefit of the whole 
community!

Best,
Zhijiang
--
From: Piotr Nowojski
Date: Friday, September 21, 2018, 22:59
To: dev
Cc: Nico Kruber
Subject: Codespeed deployment for Flink

Hello community,

For almost a year at data Artisans, Nico and I have been maintaining a setup
that continuously evaluates Flink with the benchmarks defined at
https://github.com/dataArtisans/flink-benchmarks. With growing interest
and after proving useful a couple of times, we have finally decided to
publish the web UI layer of this setup. Currently it is accessible via
the following (maybe not so?) temporary URL:

http://codespeed.dak8s.net:8000 

This is a simple web UI to present performance changes over past and
present commits to Apache Flink. It only has a couple of views and the
most useful ones are:

1. Timeline
2. Comparison (I recommend to use normalization)

Timeline is useful for spotting unintended regressions or unexpected
improvements. It is being updated every six hours.
Comparison is useful for comparing a given branch (for example a pending
PR) with the master branch. More about that later.

The codespeed project on its own is just a presentation layer. As
mentioned before, the only currently available benchmarks are defined in
the flink-benchmarks repository, and they are executed periodically or on
demand by Jenkins on a single bare-metal machine. The current setup
limits us to micro benchmarks (they are easier to
setup/develop/maintain and have a quicker feedback loop compared to
cluster benchmarks), but there is no reason preventing us from setting up 
other kinds of benchmarks and uploading their results to our codespeed 
instance as well.

Regarding the comparison view: currently data Artisans’ Flink mirror
repository at https://github.com/dataArtisans/flink is configured to
trigger benchmark runs on every commit/change that happens on the
benchmark-request branch (we chose to use dataArtisans' repository here
because we needed a custom GitHub hook that we couldn’t add to the
apache/flink repository). Benchmarking usually takes between one and two
hours. One obvious limitation at the moment is that there is only one
comparison view, with one comparison branch, so trying to compare two
PRs at the same time is impossible. However, we can tackle
this problem once it becomes a real issue, not only a theoretical one.

Piotrek & Nico



Re: [PROPOSAL] [community] A more structured approach to reviews and contributions

2018-09-17 Thread Zhijiang(wangzhijiang999)
From my personal experience as a contributor for three years, the contributing 
and reviewing experience feels better than before, although we still have some 
points to improve further.

I reviewed the proposal doc, and it gives very constructive and meaningful 
guidance which could help both contributors and reviewers. I agree with the 
above suggestions and hope they can be put into practice well!

Best,
Zhijiang
--
From: Till Rohrmann
Date: Monday, September 17, 2018, 16:27
To: dev
Subject: Re: [PROPOSAL] [community] A more structured approach to reviews and contributions

Thanks for writing this up Stephan. I like the steps and hope that it will
help the community to make the review process better. Thus, +1 for putting
your proposal to practice.

Cheers,
Till

On Mon, Sep 17, 2018 at 10:00 AM Stephan Ewen  wrote:

> Hi Flink community members!
>
> As many of you will have noticed, the Flink project activity has gone up
> again quite a bit.
> There are many more contributions, which is an absolutely great thing to
> have :-)
>
> However, we see a continuously growing backlog of pull requests and JIRA
> issues.
> To make sure the community will be able to handle the increased volume, I
> think we need to revisit some
> approaches and processes. I believe there are a few opportunities to
> structure things a bit better, which
> should help to scale the development.
>
> The first thing I would like to bring up is *Pull Request Reviews*. Even
> though more community members are
> active in reviews (which is a really great thing!), the Pull Request backlog
> is increasing quite a bit.
>
> Why are pull requests still not merged faster? Looking at the reviews, one
> thing I noticed is that most reviews deal
> immediately with detailed code issues, and leave out most of the core
> questions that need to be answered
> before a Pull Request can be merged, like "is this a desired feature?" or
> "does this align well with other developments?".
> I think that we even make things slightly worse that way: From my personal
> experience, I have often thought "oh, this
> PR has a review already" and rather looked at another PR, only to find
> later that the first review never decided whether
> this PR is actually a good fit for Flink.
>
> There has never been a proper documentation of how to answer these
> questions, what to evaluate in reviews,
> guidelines for how to evaluate pull requests, other than code quality. I
> suspect that this is why so many reviewers
> do not address the "is this a good contribution" questions, making pull
> requests linger until another committers joins
> the review.
>
> Below is an idea for a guide *"How to Review Contributions"*. It outlines
> five core aspects to be checked in every
> pull request, and suggests a priority for clarifying those. The idea is
> that this helps us to better structure reviews, and
> to make each reviewer aware what we look for in a review and where to best
> bring in their help.
>
> Looking forward to comments!
>
> Best,
> Stephan
>
> 
>
> The draft is in this Google Doc. Please add small textual comments to the
> doc, and bigger principle discussions as replies to this mail.
>
>
> https://docs.google.com/document/d/1yaX2b9LNh-6LxrAmE23U3D2cRbocGlGKCYnvJd9lVhk/edit?usp=sharing
>
> *How to Review Contributions*
>
> This guide is for all committers and contributors that want to help with
> reviewing contributions. Thank you for your effort - good reviews are one of
> the most important and crucial parts of an open source project. This guide
> should help the community to make reviews such that:
> - Contributors have a good contribution experience
> - Reviews are structured and check all important aspects of a contribution
> - We keep a high code quality in Flink
> - We avoid situations where contributors and reviewers spend a lot of time
> refining a contribution that gets rejected later
>
> *Review Checklist*
>
> Every review needs to check the following five aspects. We encourage to
> check these aspects in order, to avoid spending time on detailed code
> quality reviews when there is not yet consensus that a feature or change
> should actually be added.
>
> (1) Is there consensus whether the change or feature should go into Flink?
>
> For bug fixes, this needs to be checked only in case it requires bigger
> changes or might break existing programs and setups.
> Ideally, this question is already answered from a JIRA issue or the
> dev-list discussion, except in cases of bug fixes and small lightweight
> additions/extensions. In that case, this question can be immediately marked
> as resolved. For pull requests that are created without prior consensus,
> this question needs to be answered as part of the review.
> The decision whether the change should go into Flink 

Re: [DISCUSS] Proposal of external shuffle service

2018-09-11 Thread Zhijiang(wangzhijiang999)
Many thanks Till!

I will create a JIRA for this feature and attach a design document to it. I 
will let you know once it is ready! :)

Best,
Zhijiang


--
From: Till Rohrmann
Date: Friday, September 7, 2018, 22:01
To: Zhijiang(wangzhijiang999)
Cc: dev
Subject: Re: [DISCUSS] Proposal of external shuffle service

The rough plan sounds good Zhijiang. I think we should continue with what
you've proposed: Open a JIRA issue and creating a design document which
outlines the required changes a little bit more in detail. Once this is
done, we should link the design document in the JIRA issue and post it here
for further discussion.

Cheers,
Till

On Wed, Aug 29, 2018 at 6:04 PM Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com> wrote:

> Glad to receive your positive feedback, Till!
>
> Actually our motivation is to support batch jobs well, as you mentioned.
>
> For the output level, Flink already has the Subpartition abstraction (writer),
> and currently there are PipelinedSubpartition (memory output) and
> SpillableSubpartition (one-sp-one-file output) implementations. We can
> extend this abstraction to realize other persistent outputs (e.g.
> sort-merge-file).
>
> For the transport level (shuffle service), the current SubpartitionView
> abstraction (reader) acts as the bridge to the output level, so the view can
> understand and read the different output formats. The current
> NetworkEnvironment seems to take the role of an internal shuffle service in
> the TaskManager, with the transport server realized by Netty inside. This
> component could also be started in other external containers like the
> NodeManager of YARN to take the role of an external shuffle service. Further,
> we can abstract and extend the shuffle service to transport outputs via HTTP
> or RDMA instead of the current Netty. This abstraction should provide a way
> for output registration in order to read the results correctly, similar to
> the current SubpartitionView.
>
> The above is still a rough idea. Next, I plan to create a feature JIRA to
> cover the related changes if possible. It would be great to get help from
> the related committers to review the detailed designs together.
>
> Best,
> Zhijiang
>
> ------
> From: Till Rohrmann
> Date: Wednesday, August 29, 2018, 17:36
> To: dev; Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>
> Subject: Re: [DISCUSS] Proposal of external shuffle service
>
> Thanks for starting this design discussion Zhijiang!
>
> I really like the idea to introduce a ShuffleService abstraction which
> allows to have different implementations depending on the actual use case.
> Especially for batch jobs I can clearly see the benefits of persisting the
> results somewhere else.
>
> Do you already know which interfaces we need to extend and where to
> introduce new abstractions?
>
> Cheers,
> Till
>
> On Mon, Aug 27, 2018 at 1:57 PM Zhijiang(wangzhijiang999) wrote:
> Hi all!
>
> The shuffle service is responsible for transporting upstream produced data
> to the downstream side. In Flink, the NettyServer is used for the network
> transport service, and this component is started in the TaskManager process.
> That means the TaskManager only supports an internal shuffle service, which
> raises some concerns:
> 1. If a task finishes, the ResultPartition of this task remains registered
> in the TaskManager, because the output buffers have to be transported by the
> internal shuffle service in the TaskManager. That means the TaskManager
> cannot be released by the ResourceManager until the ResultPartition is
> released. It may waste container resources and does not support dynamic
> resource scenarios well.
> 2. If we want to plug in another shuffle service implementation, the current
> mechanism cannot easily handle it, because the output level (result
> partition) and transport level (shuffle service) are not divided clearly
> and lack an abstraction that could be extended.
>
> For the above considerations, we propose an external shuffle service which
> can be deployed in any other external containers, e.g. a NodeManager
> container in YARN. Then the TaskManager can be released ASAP, if needed, when
> all its internal tasks have finished. The persistent output files of these
> finished tasks can be served and transported by the external shuffle service
> on the same machine.
>
> Further, we can abstract both the output level and the transport level to
> support different implementations. E.g., we realized merging the data of all
> the subpartitions into a limited number of persistent local files for disk
> improvements in some scenarios, instead of one-subpartition-one-file.
>
> I know it may be a big piece of work; I am just pointing out some ideas and
> would appreciate any feedback from you!
>
> Best,
> Zhijiang
>
>
>



Re: [ANNOUNCE] New committer Gary Yao

2018-09-11 Thread Zhijiang(wangzhijiang999)
Congratulations Gary!  :)
--
From: Piotr Nowojski
Date: Monday, September 10, 2018, 16:31
To: dev
Subject: Re: [ANNOUNCE] New committer Gary Yao

Congratulations :)

> On 8 Sep 2018, at 07:03, Rong Rong  wrote:
> 
> Congratulations Gary!!
> 
> On Fri, Sep 7, 2018 at 7:38 PM bupt_ljy  wrote:
> 
>> Congratulations Gary!
>> 
>> 
>> Jiayi
>> 
>> 
>> Original Message
>> Sender: vino yang yanghua1...@gmail.com
>> Recipient: dev...@flink.apache.org
>> Date: Saturday, Sep 8, 2018 10:11
>> Subject: Re: [ANNOUNCE] New committer Gary Yao
>>
>> Congratulations Gary!
>>
>> Chen Qin qinnc...@gmail.com wrote on Sat, Sep 8, 2018 at 2:07 AM: Congrats! Chen
>> On Sep 7, 2018, at 10:51, Xingcan Cui xingc...@gmail.com wrote: Congratulations, Gary! Xingcan
>> On Sep 7, 2018, at 11:20 PM, Hequn Cheng chenghe...@gmail.com wrote: Congratulations Gary! Hequn
>> On Fri, Sep 7, 2018 at 11:16 PM, Matthias J. Sax mj...@apache.org wrote: Congrats!
>> On 09/07/2018 08:15 AM, Timo Walther wrote: Congratulations, Gary! Timo
>> On 07.09.18 at 16:46, Ufuk Celebi wrote: Great addition to the committers. Congrats, Gary! – Ufuk
>> On Sep 7, 2018, at 4:45 PM, Kostas Kloudas k.klou...@data-artisans.com wrote: Congratulations Gary! Well deserved! Cheers, Kostas
>> On Sep 7, 2018, at 4:43 PM, Fabian Hueske fhue...@gmail.com wrote: Congratulations Gary!
>> On 2018-09-07 at 16:29 GMT+02:00, Thomas Weise t...@apache.org wrote: Congrats, Gary!
>> On Fri, Sep 7, 2018 at 4:17 PM, Dawid Wysakowicz dwysakow...@apache.org wrote: Congratulations Gary! Well deserved!
>> On 07/09/18 16:00, zhangmingleihe wrote: Congrats Gary! Cheers, Minglei
>> On Sep 7, 2018, at 9:59 PM, Andrey Zagrebin and...@data-artisans.com wrote: Congratulations Gary!
>> On 7 Sep 2018, at 15:45, Stefan Richter s.rich...@data-artisans.com wrote: Congrats Gary!
>> On 07.09.2018 at 15:14, Till Rohrmann trohrm...@apache.org wrote:
>>
>> Hi everybody,
>>
>> On behalf of the PMC I am delighted to announce Gary Yao as a new Flink
>> committer!
>>
>> Gary started contributing to the project in June 2017. He helped with the
>> Flip-6 implementation, implemented many of the new REST handlers, fixed
>> Mesos issues and initiated the Jepsen-based distributed test suite which
>> uncovered several serious issues. Moreover, he actively helps community
>> members on the mailing list and with PR reviews.
>>
>> Please join me in congratulating Gary for becoming a Flink committer!
>>
>> Cheers,
>> Till



Re: [DISCUSS] Proposal of external shuffle service

2018-08-29 Thread Zhijiang(wangzhijiang999)
Glad to receive your positive feedback, Till!

Actually our motivation is to support batch jobs well, as you mentioned.

For the output level, Flink already has the Subpartition abstraction (writer), 
and currently there are PipelinedSubpartition (memory output) and 
SpillableSubpartition (one-sp-one-file output) implementations. We can extend 
this abstraction to realize other persistent outputs (e.g. sort-merge-file).

For the transport level (shuffle service), the current SubpartitionView 
abstraction (reader) acts as the bridge to the output level, so the view can 
understand and read the different output formats. The current 
NetworkEnvironment seems to take the role of an internal shuffle service in the 
TaskManager, with the transport server realized by Netty inside. This component 
could also be started in other external containers like the NodeManager of 
YARN to take the role of an external shuffle service. Further, we can abstract 
and extend the shuffle service to transport outputs via HTTP or RDMA instead of 
the current Netty. This abstraction should provide a way for output 
registration in order to read the results correctly, similar to the current 
SubpartitionView.

The above is still a rough idea. Next, I plan to create a feature JIRA to cover 
the related changes if possible. It would be great to get help from the related 
committers to review the detailed designs together.

Best,
Zhijiang


--
From: Till Rohrmann
Date: Wednesday, August 29, 2018, 17:36
To: dev; Zhijiang(wangzhijiang999)
Subject: Re: [DISCUSS] Proposal of external shuffle service

Thanks for starting this design discussion Zhijiang!

I really like the idea to introduce a ShuffleService abstraction which allows 
to have different implementations depending on the actual use case. Especially 
for batch jobs I can clearly see the benefits of persisting the results 
somewhere else.

Do you already know which interfaces we need to extend and where to introduce 
new abstractions?

Cheers,
Till
On Mon, Aug 27, 2018 at 1:57 PM Zhijiang(wangzhijiang999) wrote:
Hi all!

The shuffle service is responsible for transporting upstream produced data to 
the downstream side. In Flink, the NettyServer is used for the network 
transport service, and this component is started in the TaskManager process. 
That means the TaskManager only supports an internal shuffle service, which 
raises some concerns:
1. If a task finishes, the ResultPartition of this task remains registered in 
the TaskManager, because the output buffers have to be transported by the 
internal shuffle service in the TaskManager. That means the TaskManager cannot 
be released by the ResourceManager until the ResultPartition is released. It 
may waste container resources and does not support dynamic resource scenarios 
well.
2. If we want to plug in another shuffle service implementation, the current 
mechanism cannot easily handle it, because the output level (result partition) 
and transport level (shuffle service) are not divided clearly and lack an 
abstraction that could be extended.

For the above considerations, we propose an external shuffle service which can 
be deployed in any other external containers, e.g. a NodeManager container in 
YARN. Then the TaskManager can be released ASAP, if needed, when all its 
internal tasks have finished. The persistent output files of these finished 
tasks can be served and transported by the external shuffle service on the 
same machine.

Further, we can abstract both the output level and the transport level to 
support different implementations. E.g., we realized merging the data of all 
the subpartitions into a limited number of persistent local files for disk 
improvements in some scenarios, instead of one-subpartition-one-file.

I know it may be a big piece of work; I am just pointing out some ideas and 
would appreciate any feedback from you!

 Best,
 Zhijiang



[DISCUSS] Proposal of external shuffle service

2018-08-27 Thread Zhijiang(wangzhijiang999)
Hi all!

The shuffle service is responsible for transporting upstream produced data to 
the downstream side. In Flink, the NettyServer is used for the network 
transport service, and this component is started in the TaskManager process. 
That means the TaskManager only supports an internal shuffle service, which 
raises some concerns:
1. If a task finishes, the ResultPartition of this task remains registered in 
the TaskManager, because the output buffers have to be transported by the 
internal shuffle service in the TaskManager. That means the TaskManager cannot 
be released by the ResourceManager until the ResultPartition is released. It 
may waste container resources and does not support dynamic resource scenarios 
well.
2. If we want to plug in another shuffle service implementation, the current 
mechanism cannot easily handle it, because the output level (result partition) 
and transport level (shuffle service) are not divided clearly and lack an 
abstraction that could be extended.

For the above considerations, we propose an external shuffle service which can 
be deployed in any other external containers, e.g. a NodeManager container in 
YARN. Then the TaskManager can be released ASAP, if needed, when all its 
internal tasks have finished. The persistent output files of these finished 
tasks can be served and transported by the external shuffle service on the 
same machine.

Further, we can abstract both the output level and the transport level to 
support different implementations. E.g., we realized merging the data of all 
the subpartitions into a limited number of persistent local files for disk 
improvements in some scenarios, instead of one-subpartition-one-file.
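To make the idea concrete, here is a hedged sketch of such a split; every name below is illustrative and would be settled in the actual design document:

interface ExternalShuffleService {
    // register a produced partition so its persistent files can still be
    // served after the producing TaskManager has been released
    void registerPartition(String partitionId, java.nio.file.Path persistentFiles);

    // create a reader for one subpartition, analogous to the current
    // SubpartitionView, hiding the concrete output format
    // (one-subpartition-one-file, sort-merge-file, ...)
    SubpartitionReader createReader(String partitionId, int subpartitionIndex)
            throws java.io.IOException;

    interface SubpartitionReader extends AutoCloseable {
        java.nio.ByteBuffer nextBuffer() throws java.io.IOException;
    }
}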

I know it may be a big piece of work; I am just pointing out some ideas and 
would appreciate any feedback from you!

Best,
Zhijiang

Re: [DISCUSS] Improve broadcast serialization

2018-07-19 Thread Zhijiang(wangzhijiang999)
Ok, that is fine. :)

I will create the JIRA today and submit the PR next week.

Zhijiang
--
From: Piotr Nowojski
Date: Thursday, July 19, 2018, 17:52
To: Zhijiang(wangzhijiang999)
Cc: Nico Kruber; dev
Subject: Re: [DISCUSS] Improve broadcast serialization

Hi,

I only noticed your second response after sending my email :)

Ok, now I think we are on the same page :) I think you can work on 2.1 and 
later on 2.2 if you think that 2.1 is not enough. Once you create the JIRA 
issues/PRs, please CC me.

Piotrek  

On 19 Jul 2018, at 04:51, Zhijiang(wangzhijiang999) wrote:
Hi Piotr

1. I agree that we should discuss the higher level first and focus on the 
implementation in the JIRA/PR. As long as the RecordSerializer does not 
maintain the BufferBuilder, it can become stateless; then it can get the 
BufferBuilder from the RecordWriter at any time. And I think this is the 
precondition for serializing only once for multiple channels; otherwise we 
would have to select the serializer based on the target channel index.

2. I already corrected this thought in my last reply, maybe you had not seen it 
before you replied. :)
We can break the broadcast improvement into two steps:
2.1 Serialize the record into the temporary byte buffer only once for the 
multiple selected channels. (currently it is serialized many times)
2.2 Copy the temporary byte buffer into the BufferBuilder only once and create 
different BufferConsumers based on the same BufferBuilder for each channel. 
(currently it is copied many times)
Regarding 2.1, it is just the same as your proposal [c]; it is worth doing now 
and can bring good benefits, I think.
Regarding 2.2, considering mixed broadcast/non-broadcast writes, it has to 
flush/finish the last broadcast BufferBuilder for the current non-broadcast 
write, and vice versa. I agree with your proposal [2] for this issue, and we 
can consider it further in the future; maybe there are other, better ways to 
avoid it.

4. My previous thought was to realize both 2.1 and 2.2 above. 2.1 is your 
proposal [c], which has no problem with the mixed write mode, so no additional 
flush is needed. 2.2 is just like your proposal [2], which raises the concern 
about the additional flush. Maybe my last reply made you misunderstand.

I can submit a JIRA for 2.1 first if there are no other concerns. Thanks for 
the helpful advice. :)

Best,

Zhijiang
--
From: Piotr Nowojski
Date: Wednesday, July 18, 2018, 20:04
To: Zhijiang(wangzhijiang999); Nico Kruber
Cc: dev
Subject: Re: [DISCUSS] Improve broadcast serialization


Hi 

1. I want to define a new AbstractRecordWriter as a base class which defines 
some abstract methods and utility code. The current RecordWriter, used for the 
other partitioners, and the new BroadcastRecordWriter, used only for the 
BroadcastPartitioner, will both extend AbstractRecordWriter. The fields in 
BroadcastRecordWriter are exactly as you showed below, but the current 
RecordWriter also needs only one RecordSerializer if we make the 
RecordSerializer keep no internal state.

Let's first discuss what we would like to have/implement on a higher level and 
later focus on implementation details. Regarding making the RecordSerializer 
stateless, there were some discussions about it previously and it was on our 
TODO list, but I don’t remember what was holding us back. Maybe Nico will 
remember?


2. You pointed out the key problem of how to handle `randomEmit` in 
BroadcastRecordWriter, and I think this process may reuse the `emit` logic in 
the current RecordWriter. Then the `emit` and `broadcastEmit` logic in 
BroadcastRecordWriter will serialize data only once and copy it to a 
BufferBuilder only once. So this improvement is deterministic for the 
BroadcastPartitioner.


What logic to reuse do you have in mind? 
4. As for the `broadcastEmit` improvement in the RecordWriter for non-broadcast 
partitioners, we can also do as you suggested in option [2], but it has to 
finish/flush the previous BufferBuilder generated by the common `emit` 
operation. So it may negatively impact buffer utilization, which was much 
improved by the event-driven flush feature. So I am not sure whether it is 
worth doing the `broadcastEmit` improvement in the RecordWriter.


The whole point of my proposal [c] was to avoid the need to flush. The code 
would need a little bit more refactoring, but it should look something like 
this:

void broadcastEmit(record):
    serializedRecord = serializer.serialize(record)
    for bufferBuilder in bufferBuilders:
        bufferBuilder.append(serializedRecord)
        // if we overfilled bufferBuilder, finish it, request a new one and
        // continue writing

void emit(record, channel):
    serializedRecord = serializer.serialize(record)
    bufferBuilders[channel].append(serializedRecord)
    // if we overfilled bufferBuilder, finish it, request a new one and
    // continue writing

I do not see a need for additional flushes here, and it should be a strict 
improvement over the current code base.


I already implemented a demo covering points 1, 2 and 5 above. I can create 
JIRAs after we reach a final agreement.

Re: [DISCUSS] Improve broadcast serialization

2018-07-18 Thread Zhijiang(wangzhijiang999)
Hi Piotr

1. I agree that we should discuss the higher level first and focus on the 
implementation in the JIRA/PR. As long as the RecordSerializer does not 
maintain the BufferBuilder, it can become stateless; then it can get the 
BufferBuilder from the RecordWriter at any time. And I think this is the 
precondition for serializing only once for multiple channels; otherwise we 
would have to select the serializer based on the target channel index.

2. I already corrected this thought in my last reply, maybe you had not seen it 
before you replied. :)
We can break the broadcast improvement into two steps:
2.1 Serialize the record into the temporary byte buffer only once for the 
multiple selected channels. (currently it is serialized many times)
2.2 Copy the temporary byte buffer into the BufferBuilder only once and create 
different BufferConsumers based on the same BufferBuilder for each channel. 
(currently it is copied many times)
Regarding 2.1, it is just the same as your proposal [c]; it is worth doing now 
and can bring good benefits, I think.
Regarding 2.2, considering mixed broadcast/non-broadcast writes, it has to 
flush/finish the last broadcast BufferBuilder for the current non-broadcast 
write, and vice versa. I agree with your proposal [2] for this issue, and we 
can consider it further in the future; maybe there are other, better ways to 
avoid it.

4. My previous thought was to realize both 2.1 and 2.2 above. 2.1 is your 
proposal [c], which has no problem with the mixed write mode, so no additional 
flush is needed. 2.2 is just like your proposal [2], which raises the concern 
about the additional flush. Maybe my last reply made you misunderstand.

I can submit a JIRA for 2.1 first if there are no other concerns. Thanks for 
the helpful advice. :)

Best,

Zhijiang
--
From: Piotr Nowojski
Date: Wednesday, July 18, 2018, 20:04
To: Zhijiang(wangzhijiang999); Nico Kruber
Cc: dev
Subject: Re: [DISCUSS] Improve broadcast serialization


Hi 

1. I want to define a new AbstractRecordWriter as a base class which defines 
some abstract methods and utility code. The current RecordWriter, used for the 
other partitioners, and the new BroadcastRecordWriter, used only for the 
BroadcastPartitioner, will both extend AbstractRecordWriter. The fields in 
BroadcastRecordWriter are exactly as you showed below, but the current 
RecordWriter also needs only one RecordSerializer if we make the 
RecordSerializer keep no internal state.

Let's first discuss what we would like to have/implement on a higher level and 
later focus on implementation details. Regarding making the RecordSerializer 
stateless, there were some discussions about it previously and it was on our 
TODO list, but I don’t remember what was holding us back. Maybe Nico will 
remember?


2. You pointed out the key problem of how to handle `randomEmit` in 
BroadcastRecordWriter, and I think this process may reuse the `emit` logic in 
the current RecordWriter. Then the `emit` and `broadcastEmit` logic in 
BroadcastRecordWriter will serialize data only once and copy it to a 
BufferBuilder only once. So this improvement is deterministic for the 
BroadcastPartitioner.


What logic to reuse do you have in mind? 
4. As for the `broadcastEmit` improvement in RecordWriter for non-broadcast partitioners, we can also do as you suggested in option [2], but it has to finish/flush the previous BufferBuilder generated by a common `emit` operation. So it may hurt the buffer utilization that was improved by the event-driven flush feature. So I am not sure whether the `broadcastEmit` improvement in RecordWriter is worth doing.


The whole point of my proposal [c] was to avoid the need to flush. Code would 
need a little bit more refactoring but it should look something like this:

void broadcastEmit(record):
    serializedRecord = serializer.serialize(record)
    for bufferBuilder in bufferBuilders:
        bufferBuilder.append(serializedRecord)
        // if we overfilled bufferBuilder, finish it, request a new one and continue writing

void emit(record, channel):
    serializedRecord = serializer.serialize(record)
    bufferBuilders[channel].append(serializedRecord)
    // if we overfilled bufferBuilder, finish it, request a new one and continue writing

I do not see a need for additional flushes here, and it should be a strict improvement over the current code base.


I already implemented a demo covering 1, 2 and 5 above. I can create JIRAs after we reach a final agreement, and then maybe you can help review the PRs if you have time. :)


Sure :)

Piotrek

Best,

Zhijiang
--
From: Piotr Nowojski
Sent: Wednesday, 18 July 2018, 16:37
To: dev; Zhijiang(wangzhijiang999)
Subject: Re: [DISCUSS] Improve broadcast serialization

Hi,

A couple more thoughts:

a) I'm not sure if you would have to modify the current RecordWriter at all. You could extract an interface from the current RecordWriter and just provide two implementations: the current one and BroadcastRecordWriter. I'm not sure, but it doesn't seem like they would duplicate/share a lot of code

Re: [DISCUSS] Improve broadcast serialization

2018-07-18 Thread Zhijiang(wangzhijiang999)
Hi Piotr,

Your thoughts give me more inspiration and possibilities.

1. I want to define a new AbstractRecordWriter as a base class which defines some abstract methods and utility code. The current RecordWriter, used for the other partitioners, and the new BroadcastRecordWriter, used only for the BroadcastPartitioner, will both extend AbstractRecordWriter. The fields in BroadcastRecordWriter are exactly as you showed below, but the current RecordWriter also only needs one RecordSerializer if we make the RecordSerializer stateless.

2. You pointed out the key problem of how to handle `randomEmit` in BroadcastRecordWriter, and I think this process may reuse the `emit` logic in the current RecordWriter. Then the `emit` and `broadcastEmit` logic in BroadcastRecordWriter will serialize data only once and copy it to the BufferBuilder only once. So this improvement is deterministic for the BroadcastPartitioner.

3. As for the `emit` improvement in RecordWriter for non-broadcast partitioners: if the record is emitted to multiple channels (not only all channels), we can serialize the data only once and then copy it to multiple BufferBuilders as you suggested in option [c]. I think this improvement is also deterministic in saving serialization cost.

4. As for the `broadcastEmit` improvement in RecordWriter for non-broadcast partitioners, we can also do as you suggested in option [2], but it has to finish/flush the previous BufferBuilder generated by a common `emit` operation. So it may hurt the buffer utilization that was improved by the event-driven flush feature. So I am not sure whether the `broadcastEmit` improvement in RecordWriter is worth doing.

5. The current StreamRecordWriter is redundant to some extent. Compared with RecordWriter it only maintains the `flusher`, and this flusher could be maintained by RecordWriter as well, because the current `flushAlways` maintained by RecordWriter and the `flusher` in StreamRecordWriter are both inferred from the timeout parameter. But this may be out of the scope of the serialization improvement. :) The motivation is that the new BroadcastRecordWriter can be used for both stream and batch jobs.

In summary, the above 1-3 are clear to do in theory. But considering the specific implementation, we may need to think more about graceful code, especially for (2) above.

I already implemented a demo covering 1, 2 and 5 above. I can create JIRAs after we reach a final agreement, and then maybe you can help review the PRs if you have time. :)


Best,

Zhijiang
--
From: Piotr Nowojski
Sent: Wednesday, 18 July 2018, 16:37
To: dev; Zhijiang(wangzhijiang999)
Subject: Re: [DISCUSS] Improve broadcast serialization

Hi,

A couple more thoughts:

a) I'm not sure if you would have to modify the current RecordWriter at all. You could extract an interface from the current RecordWriter and just provide two implementations: the current one and BroadcastRecordWriter. I'm not sure, but it doesn't seem like they would duplicate/share a lot of code.
BroadcastRecordWriter would have fields:

private final RecordSerializer<T> serializer;

private final Optional<BufferBuilder> bufferBuilder;

Compared to RecordWriter’s arrays.

b) One thing that I noticed now is latency markers and the randomEmit method. They prevent us from implementing option [1]: BroadcastRecordWriter would have to flush all channels on randomEmit (as I proposed in option [2]).

c) Another option to optimise broadcast writes (or, for that matter, all multi-channel writes) would be to serialise the record only once to SpanningRecordSerializer#serializationBuffer, but copy it multiple times to separate BufferBuilders. That would save us much more than half of the overhead (serialisation is more costly compared to data copying), while we would avoid problems with the uneven state of channels. There would be no problems with mixed broadcast/non-broadcast writes; this option could support both of them at the same time - in other words, it would be as generic as the current one.
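A minimal sketch of [c] for illustration (hypothetical code, not the actual RecordWriter: bufferBuilderFor and finishAndRequestNew are placeholder helpers that manage the per-channel builders):

void emitToChannels(T record, int[] targetChannels) throws IOException {
    ByteBuffer serialized = serializer.serialize(record);      // pay the serialization cost once

    for (int channel : targetChannels) {
        ByteBuffer view = serialized.duplicate();              // independent read position per channel
        while (view.hasRemaining()) {
            BufferBuilder builder = bufferBuilderFor(channel); // placeholder: channel's current builder
            builder.appendAndCommit(view);                     // copy only; cheaper than re-serializing
            if (view.hasRemaining()) {
                finishAndRequestNew(channel);                  // builder full: roll over
            }
        }
    }
}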

d) Regarding StreamRecordWriter, another option is that it could be refactored into a class implementing the extracted RecordWriter interface and acting as a proxy/wrapper around another RecordWriter instance:

class StreamRecordWriter implements RecordWriter {
  private final RecordWriter recordWriter; // either broadcast or non-broadcast
  public void foo() {
    recordWriter.foo();
  }
}

To be honest, I'm not sure at the moment which one would be better, [2] or [c]. In an ideal world, we might want to replace the current RecordWriter with [c] and after that (if that's not enough) implement [2] on top of [c].

Piotrek

> On 18 Jul 2018, at 05:36, Zhijiang(wangzhijiang999) 
>  wrote:
> 
> Hi Piotr,
> 
> Thanks for your replies and professional suggestions!
> 
> My initial thought is just as you said in first suggestion. The current 
> RecordWriter will emit StreamRecord to some subpartition via ChannelSelector 
> or broadcast events/watermark

Re: [DISCUSS] Improve broadcast serialization

2018-07-17 Thread Zhijiang(wangzhijiang999)
Hi Piotr,

Thanks for your replies and professional suggestions!

My initial thought is just as you said in your first suggestion. The current RecordWriter will emit a StreamRecord to some subpartition via the ChannelSelector, or broadcast events/watermarks to all subpartitions directly.
If the ChannelSelector implementation is BroadcastPartitioner, then we can create a specialized BroadcastRecordWriter to handle `emit`, `broadcastEmit`, `broadcastEvent`, etc.
To make it seem less tricky, I want to abstract the RecordWriter as a plugin, then implement a BroadcastRecordWriter and a NonBroadcastRecordWriter separately, extending the abstract RecordWriter. That means we divide the RecordWriter by ChannelSelector, and we may also remove the current StreamRecordWriter to unify the RecordWriter for both stream and batch mode.

Considering the specific implementation, I think one RecordSerializer can work for both BroadcastRecordWriter and NonBroadcastRecordWriter, but the precondition is making the RecordSerializer stateless, so we have to remove the BufferBuilder variable from SpanningRecordSerializer and pass it in via the addRecord/continueWritingWithNextBufferBuilder methods from the RecordWriter. BroadcastRecordWriter only needs to maintain one BufferBuilder for all subpartitions, while NonBroadcastRecordWriter may need to maintain one BufferBuilder per subpartition.
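For illustration, one possible shape of such a stateless serializer (a hedged sketch: the signatures below are assumptions for this discussion, not the current SpanningRecordSerializer API):

// The serializer keeps only its reusable intermediate serialization buffer;
// all BufferBuilders are owned by the RecordWriter and passed in per call.
interface StatelessRecordSerializer<T extends IOReadableWritable> {

    // Serialize the record into the internal, reusable byte buffer.
    void serializeRecord(T record) throws IOException;

    // Copy the serialized bytes into the given BufferBuilder; the result tells
    // the caller whether the record is complete or the builder is full.
    SerializationResult copyToBufferBuilder(BufferBuilder bufferBuilder);
}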

Another issue is whether this improvement is suitable for broadcastEmit(watermark) in NonBroadcastRecordWriter, as you said in suggestions 2 and 3. I worry it may decrease buffer utilization when switching between broadcast and non-broadcast modes, and it may also be more tricky to implement. I am still thinking about it.

Maybe we can implement the improvement for BroadcastPartitioner as a first step and ensure one RecordSerializer for all subpartitions. That can reduce the memory overhead of the RecordSerializers and the time cost in broadcast serialization scenarios.

Best,

Zhijiang


--
From: Piotr Nowojski
Sent: Tuesday, 17 July 2018, 23:31
To: dev; Zhijiang(wangzhijiang999)
Subject: Re: [DISCUSS] Improve broadcast serialization

Hi

Generally speaking this would be a nice optimisation; however, it might be tricky to implement. The thing to keep in mind is that the current interface allows interleaving broadcasting and normal sending, and because of that, at any given time some serialisers can have more data than others. For example, when we have two output channels and we are looping over the following writes:

Write sth to 1. Channel
Broadcast to all channels
Write sth to 1. Channel
Broadcast to all channels
Write sth to 1. Channel
Broadcast to all channels
(…)

Thus the buffers of different channels can fill up at different rates.

> In theory every record can be serialized only once and referenced for all the 
> subpartitions in broadcast mode.

The problem here is that after serialising records, the only unit that can be referenced afterwards is a "Buffer". So that would leave us with a couple of options:

1. Create a specialised BroadcastRecordWriter that supports ONLY broadcasting, 
guaranteeing that all channels always receive the same data. Here you could 
serialise records only once, to one BufferBuilder that could be shared and 
referenced by multiple BufferConsumers from different channels. Any non 
broadcast write would have to fail.

2. Similar to the above, but specialised in MOSTLY broadcasting. Operate as in 1. for broadcasts, but for any non-broadcast write: finish the current broadcasting BufferBuilder, flush all data on all channels, serialise the single record to the single channel using a newly created BufferBuilder and immediately finish/flush it as well, so that any subsequent broadcasts will work again as in 1.

3. Similar to 2, but lazily switching between broadcasting and non-broadcasting modes. It would have two modes of operation that could be switched back and forth: the same as currently implemented for non-broadcast writes, and the optimised broadcast mode:

Broadcast to all channels
Broadcast to all channels
Broadcast to all channels
Broadcast to all channels
Write sth to X Channel // this flushes all channels and clears/finishes the previous BufferBuilder
Write sth to Y Channel
Write sth to Y Channel
Write sth to Y Channel
Write sth to X Channel 
Broadcast to all channels // this flushes all channels and clears/finishes the previous BufferBuilders
Broadcast to all channels
Broadcast to all channels
(…)

However, both in 2. and 3. there would be a very big penalty for mixing broadcast with normal writes.
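For illustration, the lazy switching in 3. could look roughly like this (a sketch with hypothetical helper names; finishCurrentBuilders stands for finishing/flushing whatever BufferBuilders the previous mode left behind):

enum Mode { BROADCAST, NON_BROADCAST }

void emit(T record, int channel) throws IOException {
    if (mode == Mode.BROADCAST) {
        finishCurrentBuilders();         // finish the shared broadcast BufferBuilder
        mode = Mode.NON_BROADCAST;
    }
    writeToChannel(record, channel);     // per-channel BufferBuilders, as today
}

void broadcastEmit(T record) throws IOException {
    if (mode == Mode.NON_BROADCAST) {
        finishCurrentBuilders();         // finish every channel's private BufferBuilder
        mode = Mode.BROADCAST;
    }
    writeToSharedBuilder(record);        // one BufferBuilder referenced by all channels
}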

Piotrek

> On 13 Jul 2018, at 09:44, Zhijiang(wangzhijiang999) 
>  wrote:
> 
> Hi all,
> 
> In current implementation, the RecordSerializer is created separately for 
> each subpartition in RecordWriter, that means the number of serializers 
> equals to the number of subpartitions.
> For broadcast partitioner, every record will be serialized many times in all 
> t

[DISCUSS] Improve broadcast serialization

2018-07-13 Thread Zhijiang(wangzhijiang999)
Hi all,

In the current implementation, the RecordSerializer is created separately for each subpartition in RecordWriter; that means the number of serializers equals the number of subpartitions.
For the broadcast partitioner, every record will be serialized many times, once per subpartition, and this may hurt performance to some extent.
In theory, every record can be serialized only once and referenced by all the subpartitions in broadcast mode.

To do so, I propose the following changes:
1. Create and maintain only one serializer in RecordWriter, which will serialize the record for all the subpartitions. This makes sense for any partitioner, and the memory overhead can also be decreased, because every serializer maintains some separate byte buffers internally.
2. Maybe we can abstract the RecordWriter into a base class used for the other partitioner modes and implement a BroadcastRecordWriter for BroadcastPartitioner. This new implementation would add buffer references based on the number of subpartitions before adding the buffer into the subpartition queues (see the sketch after this list).
3. Maybe we can remove StreamRecordWriter by migrating the flusher from it to RecordWriter; then the uniform RecordWriter can be used for both stream and batch. The above BroadcastRecordWriter can also be used uniformly for both stream and batch.
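For illustration, a minimal sketch of the reference counting in 2. (Buffer#retainBuffer() matches Flink's reference-counted network buffers; the add signature below is simplified for the example):

// Share one finished buffer across all subpartitions via reference counting
// instead of copying it per subpartition; the memory segment is recycled only
// after every subpartition has released its reference.
void addToAllSubpartitions(Buffer buffer, ResultSubpartition[] subpartitions) throws IOException {
    for (int i = 1; i < subpartitions.length; i++) {
        buffer.retainBuffer();        // one extra reference per additional subpartition
    }
    for (ResultSubpartition subpartition : subpartitions) {
        subpartition.add(buffer);     // simplified signature for illustration
    }
}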

I am not sure whether this improvement has been proposed before; what do you think of it?
If necessary I can create JIRAs to contribute it, and I may need one committer to cooperate with me.

Best,

Zhijiang

Re: [DISCUSS] Release Flink 1.5.1

2018-07-05 Thread Zhijiang(wangzhijiang999)
I have reviewed FLINK-9676; I hope it gets merged soon.

Zhijiang
--
From: Chesnay Schepler
Sent: Thursday, 5 July 2018, 15:58
To: dev@flink.apache.org; wangzhijiang999; zjffdu; Till Rohrmann
Subject: Re: [DISCUSS] Release Flink 1.5.1

Building the binary releases overnight failed due to a configuration 
mistake on my side.

Till has informed me that FLINK-9676 might occur more often than we 
initially suspected.
A PR to address the issue was already opened by Nico.
Given that I have to restart the process anyway today I'm delaying the 
release for a few hours
so we have a chance to get the fix in.

On 04.07.2018 11:21, Chesnay Schepler wrote:
> Alrighty, looks like we're in agreement for a soon release.
>
> Let's take inventory of issues to still include:
>
> - [FLINK-9554] flink scala shell doesn't work in yarn mode
> I'm currently taking a look and will probably merge it by the end of 
> today.
>
> - [FLINK-9676] Deadlock during canceling task and recycling exclusive 
> buffer
> Till is taking a look to gauge how critical/fixable it is; we may not 
> fix it in this release.
>
> - [FLINK-9707] LocalFileSystem does not support concurrent directory 
> creations
> Till opened a PR that I'm reviewing at the moment.
>
> - [FLINK-9693] Set Execution#taskRestore to null after deployment
> Till opened a PR, I guess I can take a look unless anyone else wants to.
>
> I will cut the release-branch this evening; this should be enough time 
> to fix the above issues. (except maybe FLINK-9676 of course)
>
> On 02.07.2018 12:19, Chesnay Schepler wrote:
>> Hello,
>>
>> it has been a little over a month since we released 1.5.0. Since 
>> then we've addressed 56 JIRAs [1] for the 1.5 branch, including 
>> stability enhancement to the new execution mode (FLIP-6), fixes for 
>> critical issues in the metric system, but also features that didn't 
>> quite make it into 1.5.0 like FLIP-6 support for the scala-shell.
>>
>> I think now is a good time to start thinking about a 1.5.1 release, 
>> for which I would volunteer as the release manager.
>>
>> There are a few issues that I'm aware of that we should include in 
>> the release [3], but I believe these should be resolved within the 
>> next days.
>> So that we don't overlap with the proposed 1.6 release [2] we 
>> ideally start the release process this week.
>>
>> What do you think?
>>
>> [1] https://issues.apache.org/jira/projects/FLINK/versions/12343053
>>
>> [2] 
>> https://lists.apache.org/thread.html/1b8b0e627739d1f01b760fb722a1aeb2e786eec09ddd47b8303faadb@%3Cdev.flink.apache.org%3E
>>
>> [3]
>>
>> - https://issues.apache.org/jira/browse/FLINK-9280
>> - https://issues.apache.org/jira/browse/FLINK-8785
>> - https://issues.apache.org/jira/browse/FLINK-9567
>>
>
>



Re: [DISCUSS] Release Flink 1.5.1

2018-07-02 Thread Zhijiang(wangzhijiang999)
Hi Chesnay,

I agree with your proposal. I submitted a JIRA, FLINK-9676, related to a deadlock issue.
I think it needs to be confirmed whether it will be covered in this release or a later one.

Zhijiang
--
From: Chesnay Schepler
Sent: Monday, 2 July 2018, 18:19
To: dev@flink.apache.org
Subject: [DISCUSS] Release Flink 1.5.1

Hello,

it has been a little over a month since we released 1.5.0. Since then
we've addressed 56 JIRAs [1] for the 1.5 branch, including stability
enhancements to the new execution mode (FLIP-6), fixes for critical
issues in the metric system, but also features that didn't quite make it 
into 1.5.0 like FLIP-6 support for the scala-shell.

I think now is a good time to start thinking about a 1.5.1 release, for 
which I would volunteer as the release manager.

There are a few issues that I'm aware of that we should include in the 
release [3], but I believe these should be resolved within the next days.
So that we don't overlap with the proposed 1.6 release [2] we ideally 
start the release process this week.

What do you think?

[1] https://issues.apache.org/jira/projects/FLINK/versions/12343053

[2] 
https://lists.apache.org/thread.html/1b8b0e627739d1f01b760fb722a1aeb2e786eec09ddd47b8303faadb@%3Cdev.flink.apache.org%3E

[3]

- https://issues.apache.org/jira/browse/FLINK-9280
- https://issues.apache.org/jira/browse/FLINK-8785
- https://issues.apache.org/jira/browse/FLINK-9567



Re: [ANNOUNCE] New committer Piotr Nowojski

2018-06-26 Thread Zhijiang(wangzhijiang999)
Congrats Piotr!
It has been good cooperating with you. :)

Zhijiang
--
From: Ufuk Celebi
Sent: Sunday, 24 June 2018, 01:25
To: dev
Subject: Re: [ANNOUNCE] New committer Piotr Nowojski

Congrats and welcome Piotr! :-)

– Ufuk


On Sat, Jun 23, 2018 at 3:54 AM, zhangminglei <18717838...@163.com> wrote:
> Congrats Piotr!
>
> Cheers
> Minglei
>> On 23 June 2018, at 03:26, Till Rohrmann wrote:
>>
>> Hi everybody,
>>
>> On behalf of the PMC I am delighted to announce Piotr Nowojski as a new
>> Flink
>> committer!
>>
>> Piotr has been an active member of our community for more than a year.
>> Among other things, he contributed the TwoPhaseCommitSink, worked
>> extensively on improving Flink's network stack and is now contributing to
>> stream SQL. He is also helping the community by reviewing PRs, answering
>> questions and driving discussions on the mailing list.
>>
>> Please join me in congratulating Piotr for becoming a Flink committer!
>>
>> Cheers,
>> Till
>
>



Re: [ANNOUNCE] Two new committers: Xingcan Cui and Nico Kruber

2018-05-09 Thread Zhijiang(wangzhijiang999)
Congratulations, Xingcan and Nico!
Nico is a good PR reviewer and I gained a lot from him. :)
------------------------------------------------------------------
From: Fabian Hueske
Sent: Wednesday, 9 May 2018, 02:53
To: dev
Subject: [ANNOUNCE] Two new committers: Xingcan Cui and Nico Kruber
Hi everyone,

I'm happy to announce that two members of the Flink community accepted the
offer of the PMC to become committers.

* Xingcan Cui has been contributing to Flink for about a year, focusing on
Flink's relational APIs (SQL & Table API). In the past year, Xingcan has
started design discussions, helped reviewing several pull requests, and
replied to questions on the user mailing list.

* Nico Kruber has been an active contributor for 1.5 years and worked mostly on
internal features, such as the blob manager and a new network stack. Nico
answers many questions on the user mailing list, reports lots of bugs and
is a very active PR reviewer.

Please join me in congratulating Xingcan and Nico.

Cheers,
Fabian



Re: [DISCUSS] Releasing Flink 1.4

2017-10-18 Thread Zhijiang(wangzhijiang999)
Hi Stephan!
Thanks for replying.
It is clear to me now: all the big new features will be covered in 1.5. And it is very necessary to fully test big changes before a release.
Zhijiang
------------------------------------------------------------------
From: Stephan Ewen <step...@data-artisans.com>
Sent: Wednesday, 18 October 2017, 22:54
To: Till Rohrmann <trohrm...@apache.org>
Cc: dev <dev@flink.apache.org>; Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>
Subject: Re: [DISCUSS] Releasing Flink 1.4
Hi Zhijiang!
The proposal is to release 1.4 asap and have the master switch to 1.5 soon (early November or so). The network code would be merged directly after that - the 1.5 release would come after FLIP-6, the network stack, and local state caching are merged and tested.
That gives us a better QA window.
The reason is that we got feedback from Netflix and some other users that 1.3 had too many bugs, because we merged big changes directly before the release. We are worried that users will lose trust in Flink if we release these big new features too fast and without intensively testing them.
What do you think?
Stephan
On Oct 18, 2017 09:27, "Till Rohrmann" <trohrm...@apache.org> wrote:
Thanks for driving this effort Aljoscha. I would be in favour of a quick feature freeze (31st of October). However, looking at the list of open and in-progress blockers we still have 53 blocker issues. I think it is unrealistic to complete all of them within the next two weeks. I would therefore propose to go again over the list of blockers to decide what is really a blocker for 1.4 and what is not.

@Zhijiang, my gut feeling is that we will have this feature in Flink 1.5. There are simply too many other open issues which we have to address first in order to finish the 1.4 release. Moreover, merging it early in the release cycle of 1.5 will give the barrier alignment improvements a good amount of exposure to spot potential problems before releasing it officially.

Cheers,
Till
On Wed, Oct 18, 2017 at 10:00 AM, Zhijiang(wangzhijiang999) 
<wangzhijiang...@aliyun.com> wrote:
@Stephan Ewen, do you think the barrier alignment improvement should be covered in release-1.4 or release-1.5? It can avoid spilling data on the receiver side for exactly-once and speed up the barrier alignment.

We have implemented this feature based on the new credit-based flow control and verified the good performance. If needed, I will try to submit the related PRs after the network stack is merged into 1.4 ASAP.

------------------------------------------------------------------
From: Aljoscha Krettek <aljos...@apache.org>
Sent: Wednesday, 18 October 2017, 03:04
To: dev <dev@flink.apache.org>
Subject: Re: [DISCUSS] Releasing Flink 1.4
It seems we all agree on the general plan. Now we have to talk actual dates. I 
propose an aggressive feature-freeze date of October 31st (that would be in 2 
weeks) so that we can release 1.4 early and can go full-speed with the features 
moved to 1.5.

We can still merge in new features during the next two weeks but I propose that 
we then cut a release branch, start testing, and only merge bug fixes until the 
release. What do you think?


> On 17. Oct 2017, at 08:37, Piotr Nowojski <pi...@data-artisans.com> wrote:
> 
> +1
> 
> I would only try to merge as many of the smaller network stack improvements as possible for 1.4, since they give quite big performance improvement.
> 
> Piotrek
> 
>> On 16 Oct 2017, at 17:42, Eron Wright <eronwri...@gmail.com> wrote:
>> 
>> +1 from our side on this plan.
>> 
>> On Mon, Oct 16, 2017 at 3:33 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>> 
>>> OK, sounds good to me.
>>> 
>>> We have a couple of bugs to fix for the Table API / SQL but have PRs for most of them.
>>> 
>>> There's only one major issue that I'd like to include in 1.4.0 which is a refactoring of the TableSource interface.
>>> This effort has already started and is currently waiting for reviews / comments.
>>> I'm quite confident that we can get it in within the next two weeks.
>>> 
>>> Cheers, Fabian
>>> 
>>> 2017-10-16 10:22 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>>> 
>>>> @Bowen I started marking essential stuff as blocking (with fixVersion 1.4.0). You're right, that we should start moving things to 1.5.0 that are not blocking and that we don't think will make it into 1.4.0. I think we can only release 1.4.0 if there are 0 (zero) unresolved issues with fixVersion 1.4.0.
>>>> 
>>>>> On 14. Oct 2017, at 07:34, Alexandru Gutan <alex.guta...@gmail.com> wrote:
>>>>> 
>>

Re: [DISCUSS] Releasing Flink 1.4

2017-10-18 Thread Zhijiang(wangzhijiang999)
Thanks for replying @Till Rohrmann, and I agree with your suggestion.
Cheers,
Zhijiang
------------------------------------------------------------------
From: Till Rohrmann <trohrm...@apache.org>
Sent: Wednesday, 18 October 2017, 16:27
To: dev <dev@flink.apache.org>; Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>
Cc: Stephan Ewen <step...@data-artisans.com>
Subject: Re: [DISCUSS] Releasing Flink 1.4
Thanks for driving this effort Aljoscha. I would be in favour of a quick
feature freeze (31st of October). However, looking at the list of open and
in progress blockers we still have 53 blocker issues. I think it is
unrealistic to complete all of them within the next two weeks. I would
therefore propose to go again over the list of blockers to decide what is
really a blocker for 1.4 and what not.

@Zhijiang, my gut feeling is that we will have this feature in Flink 1.5.
There are simply too many other open issues which we have to address first
in order to finish the 1.4 release. Moreover, merging it early in the
release cycle of 1.5 will give the barrier alignment improvements a good
amount of exposure to spot potential problems before releasing it
officially.

Cheers,
Till

On Wed, Oct 18, 2017 at 10:00 AM, Zhijiang(wangzhijiang999) <
wangzhijiang...@aliyun.com> wrote:

> @Stephan Ewen, do you think the barrier alignment improvement should be
> covered in release-1.4 or release-1.5? It can avoid spilling data on the
> receiver side for exactly-once and speed up the barrier alignment.
> We have implemented this feature based on the new credit-based flow control
> and verified the good performance. If needed, I will try to submit the
> related PRs after the network stack is merged into 1.4 ASAP.
> ------------------------------------------------------------------
> From: Aljoscha Krettek <aljos...@apache.org>
> Sent: Wednesday, 18 October 2017, 03:04
> To: dev <dev@flink.apache.org>
> Subject: Re: [DISCUSS] Releasing Flink 1.4
> It seems we all agree on the general plan. Now we have to
> talk actual dates. I propose an aggressive feature-freeze
> date of October 31st (that would be in 2 weeks) so that
> we can release 1.4 early and can go full-speed with the
> features moved to 1.5.
>
> We can still merge in new features during the next two
> weeks but I propose that we then cut a release branch,
> start testing, and only merge bug fixes until the release.
> What do you think?
>
> > On 17. Oct 2017, at 08:37, Piotr Nowojski <pi...@data-artisans.com
> > wrote:
> >
> > +1
> >
> > I would only try to merge as many of the smaller
> network stack improvements as possible for 1.4, since they
> give quite big performance improvement.
> >
> > Piotrek
> >
> >> On 16 Oct 2017, at 17:42, Eron Wright <eronwri...@gmail.com> wrote:
> >>
> >> +1 from our side on this plan.
> >>
> >> On Mon, Oct 16, 2017 at 3:33 AM, Fabian Hueske <fhue...@gmail.com
> > wrote:
> >>
> >>> OK, sounds good to me.
> >>>
> >>> We have a couple of bugs to fix for the Table API /
> SQL but have PRs for
> >>> most of them.
> >>>
> >>> There's only one major issue that I'd like to
> include in 1.4.0 which is a
> >>> refactoring of the TableSource interface.
> >>> This effort has already started and is currently waiting for reviews /
> >>> comments.
> >>> I'm quite confident that we can get it in within the next two weeks.
> >>>
> >>> Cheers, Fabian
> >>>
> >>> 2017-10-16 10:22 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
> >>>
> >>>> @Bowen I started marking essential stuff as blocking (with fixVersion
> >>>> 1.4.0). You're right, that we should start moving
> things to 1.5.0 that
> >>> are
> >>>> not blocking and that we don't think will make it into
> 1.4.0. I think we
> >>>> can only release 1.4.0 if there are 0 (zero) unresolved issues with
> >>>> fixVersion 1.4.0.
> >>>>
> >>>>> On 14. Oct 2017, at 07:34, Alexandru Gutan <alex.guta...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>> great
> >>>>>
> >>>>> On 13 October 2017 at 18:02, Zhijiang(wangzhijiang999) <
> >>>>> wangzhijiang...@aliyun.com> wrote:
> >>>>>
> >>>>>> totally agree with the way.
> >>>>>> ------------------------------------------------------------------
> >>>>>> From: Stephan Ewen <se...@apache.org>
> >>>>>> Sent: Friday, 13 October 2017, 21:29
> >>>>>> To: dev@flink.apach

Re: [DISCUSS] Releasing Flink 1.4

2017-10-18 Thread Zhijiang(wangzhijiang999)
@Stephan Ewen, do you think the barrier alignment improvement should be covered in release-1.4 or release-1.5? It can avoid spilling data on the receiver side for exactly-once and speed up the barrier alignment.
We have implemented this feature based on the new credit-based flow control and verified the good performance. If needed, I will try to submit the related PRs after the network stack is merged into 1.4 ASAP.
------------------------------------------------------------------
From: Aljoscha Krettek <aljos...@apache.org>
Sent: Wednesday, 18 October 2017, 03:04
To: dev <dev@flink.apache.org>
Subject: Re: [DISCUSS] Releasing Flink 1.4
It seems we all agree on the general plan. Now we have to talk actual dates. I 
propose an aggressive feature-freeze date of October 31st (that would be in 2 
weeks) so that we can release 1.4 early and can go full-speed with the features 
moved to 1.5.

We can still merge in new features during the next two weeks but I propose that 
we then cut a release branch, start testing, and only merge bug fixes until the 
release. What do you think?

> On 17. Oct 2017, at 08:37, Piotr Nowojski <pi...@data-artisans.com> wrote:
> 
> +1
> 
> I would only try to merge as many of the smaller network stack improvements 
>as possible for 1.4, since they give quite big performance improvement.
> 
> Piotrek
> 
>> On 16 Oct 2017, at 17:42, Eron Wright <eronwri...@gmail.com> wrote:
>> 
>> +1 from our side on this plan.
>> 
>> On Mon, Oct 16, 2017 at 3:33 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>> 
>>> OK, sounds good to me.
>>> 
>>> We have a couple of bugs to fix for the Table API / SQL but have PRs for
>>> most of them.
>>> 
>>> There's only one major issue that I'd like to include in 1.4.0 which is a
>>> refactoring of the TableSource interface.
>>> This effort has already started and is currently waiting for reviews /
>>> comments.
>>> I'm quite confident that we can get it in within the next two weeks.
>>> 
>>> Cheers, Fabian
>>> 
>>> 2017-10-16 10:22 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>>> 
>>>> @Bowen I started marking essential stuff as blocking (with fixVersion
>>>> 1.4.0). You're right, that we should start moving things to 1.5.0 that
>>> are
>>>> not blocking and that we don't think will make it into 1.4.0. I think we
>>>> can only release 1.4.0 if there are 0 (zero) unresolved issues with
>>>> fixVersion 1.4.0.
>>>> 
>>>>> On 14. Oct 2017, at 07:34, Alexandru Gutan <alex.guta...@gmail.com>
>>>> wrote:
>>>>> 
>>>>> great
>>>>> 
>>>>> On 13 October 2017 at 18:02, Zhijiang(wangzhijiang999) <
>>>>> wangzhijiang...@aliyun.com> wrote:
>>>>> 
>>>>>> totally agree with the way.
>>>>>> ------------------------------------------------------------------
>>>>>> From: Stephan Ewen <se...@apache.org>
>>>>>> Sent: Friday, 13 October 2017, 21:29
>>>>>> To: dev@flink.apache.org <dev@flink.apache.org>
>>>>>> Subject: Re: [DISCUSS] Releasing Flink 1.4
>>>>>> I am in favor of doing this, if we can set it up in the following way.
>>>>>> 
>>>>>> - We put out the 1.4 release now, as Till and Aljoscha suggested. A
>>>>>> stable cut before the fundamental changes go in.
>>>>>> 
>>>>>> - We merge the very big changes (FLIP-6, Network stack, localized
>>> state
>>>>>> restore, etc). directly (or very soon) after.
>>>>>> - We try to stabilize these changes and release 1.5 asap after that.
>>>>>> Ideally Around end of year or so.
>>>>>> 
>>>>>> The reason I am bringing this up is that I know various users waiting
>>>> very
>>>>>> much for FLIP-6 and Network Stack enhancements. Given that these
>>> issues
>>>>>> were flagged for release 1.4, the users were planning to have them
>>>> rather
>>>>>> soon.
>>>>>> 
>>>>>> Stephan
>>>>>> 
>>>>>> 
>>>>>> On Fri, Oct 13, 2017 at 2:35 PM, Aljoscha Krettek <
>>> aljos...@apache.org>
>>>>>> wrote:
>>>>>> 
>>>>>>> +1 Excellent
>>>>>>> 
>>>>>>> I'd like to volunteer as release manager. I already set up a Kanban board
>>>>>>> to m

Re: [DISCUSS] Releasing Flink 1.4

2017-10-13 Thread Zhijiang(wangzhijiang999)
Totally agree with the way.
------------------------------------------------------------------
From: Stephan Ewen
Sent: Friday, 13 October 2017, 21:29
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Releasing Flink 1.4
I am in favor of doing this, if we can set it up in the following way.

  - We put out the 1.4 release now, as Till and Aljoscha suggested. A
stable cut before the fundamental changes go in.

  - We merge the very big changes (FLIP-6, Network stack, localized state
restore, etc). directly (or very soon) after.
  - We try to stabilize these changes and release 1.5 asap after that.
Ideally Around end of year or so.

The reason I am bringing this up is that I know various users waiting very
much for FLIP-6 and Network Stack enhancements. Given that these issues
were flagged for release 1.4, the users were planning to have them rather
soon.

Stephan


On Fri, Oct 13, 2017 at 2:35 PM, Aljoscha Krettek 
wrote:

> +1 Excellent
>
> I'd like to volunteer as release manager. I already set up a Kanban board
> to monitor the open blocking (and non-blocking) issues for 1.4, though this
> is independent of me volunteering as release manager. We should all go over
> these issues and see which ones should actually be blocking and which ones
> are not yet on that list.
>
> > On 13. Oct 2017, at 12:24, Renjie Liu  wrote:
> >
> > Cool!!!
> >
> > On Fri, Oct 13, 2017 at 5:49 PM Till Rohrmann 
> wrote:
> >
> >> Hi all,
> >>
> >> I want to revive the discussion about releasing Flink 1.4 [1] and the
> set
> >> of features to include.
> >>
> >> The gist of the previous discussion was that we postponed the feature
> >> freeze for 1.4 in order to include some more features which were being
> >> developed. By now, we have completed a good set of features such as
> exactly
> >> once Kafka producer, reduced dependency footprint, Hadoop-free Flink and
> >> many bug fixes. I believe that these features will make good release and
> >> users are already waiting for them.
> >>
> >> Some of the other features which we wanted to include, mainly Flip-6, to
> >> some extent the network stack enhancements and the state decoupling
> still
> >> need some more time. Since these features are major changes to Flink's
> >> runtime, it would be in my opinion a good idea to cut a stable release
> with
> >> the above-mentioned feature set now and give the engine features a bit
> more
> >> time to ripen and be properly tested.
> >>
> >> Therefore, I would actually be in favour of aiming for a quick release
> >> meaning that we now concentrate mainly on fixing bugs and critical
> issues.
> >> Moreover, I'm optimistic that the delayed features will be completed
> soon
> >> such that we can deliver them with the next release. What do you think?
> >>
> >> [1]
> >>
> >> http://apache-flink-mailing-list-archive.1008284.n3.
> nabble.com/DISCUSS-Flink-1-4-and-time-based-release-td19331.html
> >>
> >> Cheers,
> >> Till
> >>
> > --
> > Liu, Renjie
> > Software Engineer, MVAD
>
>



Re: [DISCUSS] A more thorough Pull Request check list and template

2017-07-18 Thread Zhijiang(wangzhijiang999)
From my side, I also like the new template, which contains more useful information; both the reviewer and the contributor may benefit from it.
For my previous pull requests as a contributor, I always listed the purpose of the change and the change log, but rarely mentioned how to verify the change or the affected components. These are really necessary and should not be ignored in the pull request. So the template can indeed help contributors think more when submitting a pull request.
If I review other pull requests, I also want to see these sections before the code review.
Zhijiang
------------------------------------------------------------------
From: Stephan Ewen
Sent: Tuesday, 18 July 2017, 22:59
To: dev@flink.apache.org
Subject: Re: [DISCUSS] A more thorough Pull Request check list and template
My thinking was exactly as echoed by Gordon and Ufuk:

  - The yes/no sections are also for reviewers a reminder of what to watch
out for.
Let's face it, probably half of the committers are not aware that these
things need to be checked implicitly against every change.
A good part of the recent issues came from exactly that. Changes get
merged (because the pull request lingered or the number of open PRs is
high) and these implications are not thought through.

  - This is to me a tradeoff between requiring explicit +1s from certain
people (maintainers) for certain components, and getting an awareness into
everybody's mind.

  - It also makes all users aware that these things are considered and
implicitly manages expectations in how fast can things get merged.


Concerning the long text: I think it is fine to play the ball a bit more to
the contributors.
Making it easy, yes. But also making it correct and well. We need to make
contributors aware of what it means to contribute to a system to runs
highly available critical infrastructure. There is quite often still the
mindset of "hey, cool, open source, let me throw something out there".

My take is that anyone who is serious about contributing and serious about
quality is not put off by this template.

Concerning the introductory text: I bet that rarely anyone reads the "how
to contribute" guide. Before the template, virtually no new pull request
had even the required naming.
That text needs to be in the template, or we might as well not have it
anywhere at all.



Just for reference: Below is the introductory text of the JDK ;-)

5. Know what to expect

Only the best patches submitted will actually make it all the way into a
JDK code base. The goal is not to take in the maximum number of
contributions possible, but rather to accept only the highest-quality
contributions. The JDK is used daily by millions of people and thousands of
businesses, often in mission-critical applications, and so we can't afford
to accept anything less than the very best.

If you're relatively new to the Java platform then we recommend that you
gain more experience writing Java applications before you attempt to work
on the JDK itself. The purpose of the sponsored-contribution process is to
bring developers who already have the skills required to work on the JDK
into the existing development community. The members of that community have
neither the time nor the patience required to teach basic Java programming
skills or platform implementation techniques.





On Tue, Jul 18, 2017 at 12:15 PM, Ufuk Celebi  wrote:

> On Tue, Jul 18, 2017 at 10:47 AM, Fabian Hueske  wrote:
> > For example even if the question about changed dependencies is answered
> > with "no", the reviewer still has to check that.
>
> But having it as a required option/text in the PR descriptions helps
> reviewers to actually remember to check that. I think we should be
> more realistic here and assume that reviewers will also overlook
> things etc.
>
> To me, keeping the questions is more important than the intro text.
> Therefore, I would be OK with moving the text to the contrib guide,
> but I would definitely keep the detailed yes/nos and not go with high
> level questions that everyone will answer differently.
>
> – Ufuk
>



Re: An addition to Netty's memory footprint

2017-06-30 Thread Zhijiang(wangzhijiang999)
Based on Kurt's scenario, if the cumulator allocates a big ByteBuf from the ByteBufAllocator during expansion, it can easily result in creating a new PoolChunk (16M) because there is no contiguous memory in the current PoolChunks. And this will cause the total used direct memory to go beyond the estimate.

For further explanation:
1. Each PoolArena maintains a list of PoolChunks, and the PoolChunks are grouped into different lists based on memory usage.
2. Each PoolChunk contains a list of subpages (8K) which form a complete balanced binary tree for allocating memory easily.
3. When allocating memory of a given length from the ByteBufAllocator, the PoolArena will loop over all the current internal PoolChunks to find enough contiguous memory. If none is found, it will create a new chunk.

For example, if the memory usage of a chunk is 50%, that means there is 8M of room available in this chunk. If the requested length is small, this chunk can satisfy it in most cases. But if the length is big, like 1M, the remaining 50% of the space may not satisfy it, because the available subpages may not all be under the same parent node in the tree.
After the network improvement mentioned in Stephan's FLIP, the direct memory usage by Netty's PooledByteBuf can be largely reduced and controlled easily.
Cheers,
Zhijiang
------------------------------------------------------------------
From: Kurt Young
Sent: Friday, 30 June 2017, 15:51
To: dev; user
Subject: An addition to Netty's memory footprint
Hi,
Ufuk wrote up an excellent document about Netty's memory allocation [1] inside Flink, and I want to add one more note after running some large-scale jobs.
The only inaccurate thing about [1] is how much memory LengthFieldBasedFrameDecoder will use. From our observations, it will cost at most 4M for each physical connection.
Why (tl;dr): ByteToMessageDecoder, the base class of LengthFieldBasedFrameDecoder, uses a Cumulator to save the bytes for further decoding. The Cumulator will try to discard some read bytes to make room in the buffer when channelReadComplete is triggered. In most cases, channelReadComplete will only be triggered by AbstractNioByteChannel after it has read "maxMessagesPerRead" times. The default value for maxMessagesPerRead is 16, so in the worst case the Cumulator will hold up to 1M (64K * 16) of data. And due to the logic of ByteBuf's discardSomeReadBytes, the Cumulator will expand to 4M.
We added an option to tune maxMessagesPerRead, set it to 2, and everything works fine. I know Stephan is working on network improvements, and it would be a good choice to replace the whole Netty pipeline with Flink's own implementation. But I think we will face some similar logic when implementing it, so be careful about this.
BTW, should we open a JIRA to add this config?

[1] https://cwiki.apache.org/confluence/display/FLINK/Netty+memory+allocation
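For reference, a minimal sketch of the tuning described above (assuming Netty 4, where this knob is exposed as ChannelOption.MAX_MESSAGES_PER_READ; the bootstrap variable is illustrative):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;

// Lower maxMessagesPerRead from the default of 16 so that the cumulator in
// ByteToMessageDecoder can discard read bytes more often
// (worst case per read loop: 16 reads * 64K receive buffer = 1M).
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.MAX_MESSAGES_PER_READ, 2);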


Re: [ANNOUNCE] New Flink committer Shaoxuan Wang

2017-06-22 Thread Zhijiang(wangzhijiang999)
Congratulations, Shaoxuan!
Cheers,
Zhijiang
------------------------------------------------------------------
From: Fabian Hueske
Sent: Thursday, 22 June 2017, 04:19
To: dev@flink.apache.org
Subject: [ANNOUNCE] New Flink committer Shaoxuan Wang
Hi everybody,

On behalf of the PMC, I'm very happy to announce that Shaoxuan Wang has
accepted the invitation of the PMC to become a Flink committer.

Shaoxuan has contributed several major features to the Table API / SQL and
is very engaged in discussions about the design of new features and the
future direction of Flink's relational APIs.

Please join me in congratulating Shaoxuan for becoming a Flink committer.

Thanks, Fabian



Re: [DISCUSS] Feature Freeze

2017-04-27 Thread Zhijiang(wangzhijiang999)
Hi Ufuk,
Thank you for launching this topic!
I wish for my latest refinement of the buffer provider (https://issues.apache.org/jira/browse/FLINK-6337) to be included in 1.3; most jobs can benefit from it. And I think it can be completed with the help of your reviews this week.
Cheers,
Zhijiang
------------------------------------------------------------------
From: Ufuk Celebi
Sent: Thursday, 27 April 2017, 22:25
To: dev
Cc: Robert Metzger
Subject: [DISCUSS] Feature Freeze
Hey devs! :-)

We decided to follow a time-based release model with the upcoming 1.3
release and the planned feature freeze is on Monday, May 1st.

I wanted to start a discussion to get a quick overview of the current
state of things.

- Is everyone on track and aware of the feature freeze? ;)
- Are there any major features we want in 1.3 that have not been merged yet?
- Do we need to extend the feature freeze, because of an important feature?

Would be great to gather a list of features/PRs that we want in the
1.3 release. This could be a good starting point for the release
manager (@Robert?).

Best,

Ufuk


Re: [jira] [Created] (FLINK-6057) Better default needed for num network buffers

2017-03-15 Thread Zhijiang(wangzhijiang999)
Hi Luke,
Yes, it is really not user-friendly to have to set the number of network buffers manually, and it may cause an OOM exception if the proper amount is not set for a large-scale job.
The proper amount should be calculated automatically by the framework based on the number of input and output channels of each task, and @Nico Kruber is working on this feature now. You can check this JIRA for some details: https://issues.apache.org/jira/browse/FLINK-4545.
Cheers
Zhijiang
------------------------------------------------------------------
From: Luke Hutchison (JIRA)
Sent: Wednesday, 15 March 2017, 17:01
To: dev
Subject: [jira] [Created] (FLINK-6057) Better default needed for num network buffers
Luke Hutchison created FLINK-6057:
-

 Summary: Better default needed for num network buffers
 Key: FLINK-6057
 URL: https://issues.apache.org/jira/browse/FLINK-6057
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.2.0
Reporter: Luke Hutchison


Using the default environment,

{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
{code}

my code will sometimes fail with an error that Flink ran out of network 
buffers. To fix this, I have to do:

{code}
int numTasks = Runtime.getRuntime().availableProcessors();
config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, numTasks);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTasks);
config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 
numTasks * 2048);
{code}

The default value of 2048 fails when I increase the degree of parallelism for a 
large Flink pipeline (hence the fix to set the number of buffers to numTasks * 
2048).

This is particularly problematic because a pipeline can work fine on one 
machine, and when you start the pipeline on a machine with more cores, it can 
fail.

The default execution environment should pick a saner default based on the 
level of parallelism (or whatever is needed to ensure that the number of 
network buffers is not going to be exceeded for a given execution environment).




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Dataset and eager scheduling

2017-03-02 Thread Zhijiang(wangzhijiang999)
Hi,
From my understanding, if you do not care about wasting resources and can confirm there are enough resources in the cluster, you can set the EAGER schedule mode for a batch job.
From the optimizer's aspect: if the PIPELINED_FORCED hint is not set for the ExecutionMode, then for some special topology cases the optimizer will choose the BATCH DataExchangeMode to avoid deadlock risks. That means the producer tasks are deployed first and output their data; after the producer tasks finish, the consumer tasks are scheduled and start to consume the data. This is exactly the case for the LAZY_FROM_SOURCES schedule mode. In this case, if EAGER mode is used instead, a consumer task may do nothing after startup until the producer tasks finish, which wastes resources. But for the PIPELINED DataExchangeMode, the EAGER schedule mode can make sense, because the consumer task can request data as soon as the producer task outputs its first data.
Maybe my understanding is not very accurate; any discussion is welcome!
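As a concrete illustration of the PIPELINED_FORCED hint mentioned above, on the DataSet API:

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.java.ExecutionEnvironment;

// Force pipelined data exchanges so that eagerly scheduled consumer tasks can
// start consuming as soon as producers emit their first records, instead of
// idling behind BATCH-mode exchanges.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setExecutionMode(ExecutionMode.PIPELINED_FORCED);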

Cheers,
zhijiang
------------------------------------------------------------------
From: CPC
Sent: Thursday, 2 March 2017, 18:52
To: dev
Subject: Dataset and eager scheduling
Hi all,

Currently our team is trying to implement a runtime operator and is also playing with the scheduler. We are trying to understand the batch optimizer, but it will take some time. What we want to know is whether changing the batch scheduling mode from LAZY_FROM_SOURCES to EAGER could affect the optimizer. I mean, does the optimizer have some strong assumptions that the batch job scheduling mode is always LAZY_FROM_SOURCES?

Thanks...