Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-10 Thread Eron Wright
Jingsong, regarding the LogStore abstraction, I understand that you want to
retain some flexibility as the implementation evolves.  It makes sense that
the abstract interfaces would be @Internal for now.  Would you kindly
ensure the minimal extensibility is in place, so that the Pulsar dev
community may hack on a prototype implementation?

I believe this is important for maintaining the perception that Flink
doesn't unduly favor Kafka.

-Eron

On Tue, Nov 9, 2021 at 6:53 PM Jingsong Li  wrote:

> Hi all,
>
> I have started the voting thread [1]. Please cast your vote there or
> ask additional questions here.
>
> [1] https://lists.apache.org/thread/v3fzx0p6n2jogn86sptzr30kr3yw37sq
>
> Best,
> Jingsong
>
> On Mon, Nov 1, 2021 at 5:41 PM Jingsong Li  wrote:
> >
> > Hi Till,
> >
> > Thanks for your suggestion.
> >
> > At present, we do not want users to use other storage implementations,
> > which will undoubtedly require us to propose interfaces and APIs with
> > compatibility guarantee, which will complicate our design. And some
> > designs are constantly changing, we will constantly adjust according
> > to the needs of end users.
> >
> > However, this does not prevent us from proposing some internal
> > interfaces, such as ManagedTableStorageProvider you said, which can
> > make our code more robust and testable. However, these interfaces will
> > not be public, which means that we have no compatibility burden.
> >
> > Best,
> > Jingsong
> >
> > On Mon, Nov 1, 2021 at 3:57 PM Till Rohrmann 
> wrote:
> > >
> > > Hi Kurt,
> > >
> > > Thanks a lot for the detailed explanation. I do see that implementing
> this
> > > feature outside of Flink will be a bigger effort because we probably
> have
> > > to think more about the exact interfaces and contracts. On the other
> hand,
> > > I can also imagine that users might want to use different storage
> > > implementations (e.g. Pulsar instead of Kafka for the changelog
> storage) at
> > > some point.
> > >
> > > Starting with a feature branch and keeping this question in mind is
> > > probably a good compromise. Getting this feature off the ground in
> order to
> > > evaluate it with users is likely more important than thinking of all
> > > possible storage implementations and how to arrange the code. In case
> we
> > > should split it, maybe we need something like a
> ManagedTableStorageProvider
> > > that encapsulates the logic where to store the managed tables.
> > >
> > > Looking forward to this feature and the improvements it will add to
> Flink's
> > > SQL usability :-)
> > >
> > > Cheers,
> > > Till
> > >
> > > On Mon, Nov 1, 2021 at 2:46 AM Kurt Young  wrote:
> > >
> > > > Hi Till,
> > > >
> > > > We have discussed the possibility of putting this FLIP into another
> > > > repository offline
> > > > with Stephan and Timo. This looks similar with another under going
> effort
> > > > which trying
> > > > to put all connectors outside the Flink core repository.
> > > >
> > > > From the motivation and scope of this FLIP, it's quite different from
> > > > current connectors in
> > > > some aspects. What we are trying to offer is some kind of built-in
> storage,
> > > > or we can call it
> > > > internal/managed tables, compared with current connectors, they kind
> of
> > > > express external
> > > > tables of Flink SQL. Functionality-wise, this managed table would
> have more
> > > > ability than
> > > > all these connectors, since we controlled the implementation of such
> > > > storage. Thus this table
> > > > storage will interact with lots SQL components, like metadata
> handling,
> > > > optimization, execution,
> > > > etc.
> > > >
> > > > However we do see some potential benefits if we choose to put it
> outside
> > > > Flink:
> > > > - We may achieve more rapid development speed and maybe more frequent
> > > > release.
> > > > - Force us to think really clearly about the interfaces it should be,
> > > > because we don't have
> > > > the shortcut to modify core & connector codes all at the same time.
> > > >
> > > > But we also can't ignore the overhead:
> > > > - We almost need everything that is discussed in the splitting
> connectors
> > > > thread.
> > > > - We have to create lots more interface than TableSource/TableSink
> to make
> > > > it just work at the first
> > > > place, e.g. interfaces to express such tables should be managed by
> Flink,
> > > > interfaces to express the
> > > > physical capability of the storage then it can be bridged to SQL
> optimizer
> > > > and executor.
> > > > - If we create lots of interfaces with only one implementation, that
> sounds
> > > > overengineering to me.
> > > >
> > > > Combining the pros and cons above, what we are trying to do is
> firstly
> > > > implement it in a feature branch,
> > > > and also keep good engineering and design in mind. At some point we
> > > > re-evaluate the decision whether
> > > > to put it inside or outside the Flink core. What do you think?
> > > >
> > > > Best,
> > > > Kurt
> > > 

Re: [DISCUSS] FLIP-180: Adjust StreamStatus and Idleness definition

2021-07-20 Thread Eron Wright
This proposal to narrow the definition of idleness to focus on the
event-time clock is great.

Please mention that the "temporary status toggle" code will be removed.

I agree with adding the markActive() functionality, for symmetry.  Speaking
of symmetry, could we now include the minor enhancement we discussed in
FLIP-167, the exposure of watermark status changes on the Sink interface.
I drafted a PR and would be happy to revisit it.
https://github.com/streamnative/flink/pull/2/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R62-R70

The flip mentions a 'watermarkstatus' package for the WatermarkStatus
class.  Should it be 'eventtime' package?

Regarding the change of 'streamStatus' to 'watermarkStatus', could you
spell out what the new method names will be on each interface? May I
suggest that Input.emitStreamStatus be Input.processStreamStatus?  This is
to help decouple the input's watermark status from the output's watermark
status.

I observe that AbstractStreamOperator is hardcoded to derive the output
channel's status from the input channel's status.  May I suggest
we refactor "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)" to
allow for the operator subclass to customize the processing of the
aggregated watermark and watermark status.

Maybe the FLIP should spell out the expected behavior of the generic
watermark generator (TimestampsAndWatermarksOperator).  Should the
generator ignore the upstream idleness signal?  I believe it propagates the
signal, even though it also generates its own signals.   Given that
source-based and generic watermark generation shouldn't be combined, one
could argue that the generic watermark generator should activate only when
its input channel's watermark status is idle.

Thanks again for this effort!
-Eron


On Sun, Jul 18, 2021 at 11:53 PM Arvid Heise  wrote:

> Dear devs,
>
> We recently discovered that StreamStatus and Idleness is insufficiently
> defined [1], so I drafted a FLIP [3] to amend that situation. It would be
> good to hear more opinions on that matter. Ideally, we can make the changes
> to 1.14 as some newly added methods are affected.
>
> Best,
>
> Arvid
>
> [1]
>
> https://lists.apache.org/thread.html/r5194e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e%40%3Cdev.flink.apache.org%3E
> [2]
>
> https://lists.apache.org/thread.html/rb871f5aecbca6e5d786303557a6cdb3d425954385cbdb1b777f2fcf5%40%3Cdev.flink.apache.org%3E
> [3]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>


Re: [DISCUSS] FLIP-182: Watermark alignment

2021-07-12 Thread Eron Wright
The notion of per-split watermarks seems quite interesting.  I think the
idleness feature could benefit from a per-split approach too, because
idleness is typically related to whether any splits are assigned to a given
operator instance.


On Mon, Jul 12, 2021 at 3:06 AM 刘建刚  wrote:

> +1 for the source watermark alignment.
> In the previous flink version, the source connectors are different in
> implementation and it is hard to make this feature. When the consumed data
> is not aligned or consuming history data, it is very easy to cause the
> unalignment. Source alignment can resolve many unstable problems.
>
> Seth Wiesman  于2021年7月9日周五 下午11:25写道:
>
> > +1
> >
> > In my opinion, this limitation is perfectly fine for the MVP. Watermark
> > alignment is a long-standing issue and this already moves the ball so far
> > forward.
> >
> > I don't expect this will cause many issues in practice, as I understand
> it
> > the FileSource always processes one split at a time, and in my
> experience,
> > 90% of Kafka users have a small number of partitions scale their
> pipelines
> > to have one reader per partition. Obviously, there are larger-scale Kafka
> > topics and more sources that will be ported over in the future but I
> think
> > there is an implicit understanding that aligning sources adds latency to
> > pipelines, and we can frame the follow-up "per-split" alignment as an
> > optimization.
> >
> > On Fri, Jul 9, 2021 at 6:40 AM Piotr Nowojski 
> > wrote:
> >
> > > Hey!
> > >
> > > A couple of weeks ago me and Arvid Heise played around with an idea to
> > > address a long standing issue of Flink: lack of watermark/event time
> > > alignment between different parallel instances of sources, that can
> lead
> > to
> > > ever growing state size for downstream operators like WindowOperator.
> > >
> > > We had an impression that this is relatively low hanging fruit that can
> > be
> > > quite easily implemented - at least partially (the first part mentioned
> > in
> > > the FLIP document). I have written down our proposal [1] and you can
> also
> > > check out our PoC that we have implemented [2].
> > >
> > > We think that this is a quite easy proposal, that has been in large
> part
> > > already implemented. There is one obvious limitation of our PoC. Namely
> > we
> > > can only easily block individual SourceOperators. This works perfectly
> > fine
> > > as long as there is at most one split per SourceOperator. However it
> > > doesn't work with multiple splits. In that case, if a single
> > > `SourceOperator` is responsible for processing both the least and the
> > most
> > > advanced splits, we won't be able to block this most advanced split for
> > > generating new records. I'm proposing to solve this problem in the
> future
> > > in another follow up FLIP, as a solution that works with a single split
> > per
> > > operator is easier and already valuable for some of the users.
> > >
> > > What do you think about this proposal?
> > > Best, Piotrek
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> > > [2] https://github.com/pnowojski/flink/commits/aligned-sources
> > >
> >
>


PR: "Propagate watermarks to sink API"

2021-06-21 Thread Eron Wright
Would someone be willing and able to review the PR which adds watermarks to
the sink API?

https://github.com/apache/flink/pull/15950

Thanks!
Eron


Re: [DISCUSS] Definition of idle partitions

2021-06-10 Thread Eron Wright
I quickly updated the draft PR that would propagate idleness information to
the Sink function, based on the recent improvement provided by
FLINK-18934.  For illustration purposes.
https://github.com/streamnative/flink/pull/2

On Thu, Jun 10, 2021 at 11:34 AM Eron Wright 
wrote:

> Regarding records vs watermarks, I feel it is wrong to include records in
> the considerations, because the clearest definition of idleness (IMO) is
> 'active participation in advancing the event-time clock', and records don't
> directly affect the clock.  Of course, records indirectly influence the
> clock by stimulating a generator.
>
> Let's focus on the problem that Arvid mentioned about the need to briefly
> toggle idleness (as implemented by the AnnouncedStatus class).  Seems to me
> that the idleness of an operator's inputs need not strictly determine
> whether its output is idle.  The operator should be able to react to status
> changes on a given input (implemented in FLINK-18934), and this MAY cause a
> change to the output status at the operator's discretion.  The default
> behavior would be passthrough.  Meanwhile, when a given operator emits a
> watermark, it is re-asserting itself as a participant in advancing the
> downstream event time clock, and its output channel should transition to
> active and remain active.  An operator should also be able to mark its
> output channel(s) as idle, to complete the framework.
>
> In concept, a watermark generator somewhere in the pipeline could 'take
> control' of the event time clock when its input channel transitions to
> idle.  The upstream source is relinquishing control of the clock in that
> situation.
>
> BTW, I recommend looking at the PR of FLINK-18934 because it lays bare the
> whole pipeline.  Nice work there Dawid!  To better reflect the decoupling
> of input from output idleness, "AbstractStreamOperator::emitStreamStatus"
> should be named "processStreamStatus" and call an overridable method to
> emit the status change whenever the combined idleness flips.  This would
> facilitate an idleness-aware watermark generator and an idleness-aware sink.
>
>
> On Thu, Jun 10, 2021 at 3:31 AM Till Rohrmann 
> wrote:
>
>> Thanks for providing these details Gordon. I have to admit that I do not
>> fully follow the reasoning why periodic watermark generators forced us to
>> define idleness for records. Is it because the idleness was generated
>> based
>> on the non-availability of more data in the sources and not in the
>> watermark generators which are executed after the records have been read
>> from the external system? So was the problem where the stream status was
>> decided in the end?
>>
>> If there is a periodic watermark generator somewhere in the pipeline that
>> periodically generates watermarks, then we don't have to mark its output
>> channels as watermark idle because watermarks are being sent. Hence, given
>> that the watermark generation logic makes sense, the overall job should be
>> able to make progress. If the watermark generator is informed about its
>> input channel status, it could even decide whether to propagate the
>> watermark idleness and stop generating watermarks or not. Of course, this
>> leaves room for people shooting themselves into their feet.
>>
>> Cheers,
>> Till
>>
>> On Thu, Jun 10, 2021 at 5:44 AM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>> > Forgot to provide the link to the [1] reference:
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-5017
>> >
>> > On Thu, Jun 10, 2021 at 11:43 AM Tzu-Li (Gordon) Tai <
>> tzuli...@apache.org>
>> > wrote:
>> >
>> > > Hi everyone,
>> > >
>> > > Sorry for chiming in late here.
>> > >
>> > > Regarding the topic of changing the definition of StreamStatus and
>> > > changing the name as well:
>> > > After digging into some of the roots of this implementation [1],
>> > initially
>> > > the StreamStatus was actually defined to mark "watermark idleness",
>> and
>> > not
>> > > "record idleness" (in fact, the alternative name "WatermarkStatus" was
>> > > considered at the time).
>> > >
>> > > The concern at the time causing us to alter the definition to be
>> "record
>> > > idleness" in the final implementation was due to the existence of
>> > periodic
>> > > timestamp / watermark generators within the pipeline. Those would
>> > continue
>> > > to generate non-increasing watermarks in the absence of any input

Re: [DISCUSS] Definition of idle partitions

2021-06-10 Thread Eron Wright
rwise, I don't see other problems with changing the definition
> here.
> > >
> > > Thanks,
> > > Gordon
> > >
> > > On Wed, Jun 9, 2021 at 3:06 PM Arvid Heise  wrote:
> > >
> > >> Hi Eron,
> > >>
> > >> again to recap from the other thread:
> > >> - You are right that idleness is correct with static assignment and
> > fully
> > >> active partitions. In this case, the source defines idleness. (case A)
> > >> - For the more pressing use cases of idle, assigned partitions, the
> user
> > >> defines an idleness threshold, and it becomes potentially incorrect,
> > when
> > >> the partition becomes active again. (case B)
> > >> - Same holds for dynamic assignment of splits. If a source without a
> > split
> > >> gets a split assigned dynamically, there is a realistic chance that
> the
> > >> watermark advanced past the first record of the newly assigned split.
> > >> (case
> > >> C)
> > >> You can certainly insist that only the first case is valid (as it's
> > >> correct) but we know that users use it in other ways and that was also
> > the
> > >> intent of the devs.
> > >>
> > >> Now the question could be if it makes sense to distinguish these
> cases.
> > >> Would you treat the idleness information differently (especially in
> the
> > >> sink/source that motivated FLIP-167) if you knew that the idleness is
> > >> guaranteed correct?
> > >> We could have some WatermarkStatus with ACTIVE, IDLE (case A), TIMEOUT
> > >> (case B).
> > >>
> > >> However, that would still leave case C, which probably would need to
> be
> > >> solved completely differently. I could imagine that a source with
> > dynamic
> > >> assignments should never have IDLE subtasks and rather manage the
> > idleness
> > >> itself. For example, it could emit a watermark per second/minute that
> is
> > >> directly fetched from the source system. I'm just not sure if the
> > current
> > >> WatermarkAssigner interface suffices in that regard...
> > >>
> > >>
> > >> On Wed, Jun 9, 2021 at 7:31 AM Piotr Nowojski <
> piotr.nowoj...@gmail.com
> > >
> > >> wrote:
> > >>
> > >> > Hi Eron,
> > >> >
> > >> > Can you elaborate a bit more what do you mean? I don’t understand
> what
> > >> do
> > >> > you mean by more general solution.
> > >> >
> > >> > As of now, stream is marked idle by a source/watermark generator,
> > which
> > >> > has an effect of temporarily ignoring this stream/partition from
> > >> > calculating min watermark in the downstream tasks. However stream is
> > >> > switching back to active when any record is emitted. This is what’s
> > >> causing
> > >> > problems described by Arvid.
> > >> >
> > >> > The core of our proposal is very simple. Keep everything as it is
> > except
> > >> > stating that stream will be changed back to active only once a
> > >> watermark is
> > >> > emitted again - not record. In other words disconnecting idleness
> from
> > >> > presence of records and connecting it only to presence or lack of
> > >> > watermarks and allowing to emit records while “stream status” is
> > “idle”
> > >> >
> > >> > Piotrek
> > >> >
> > >> >
> > >> > > Wiadomość napisana przez Eron Wright  > >> .invalid>
> > >> > w dniu 09.06.2021, o godz. 06:01:
> > >> > >
> > >> > > It seems to me that idleness was introduced to deal with a very
> > >> specific
> > >> > > issue.  In the pipeline, watermarks are aggregated not on a
> > per-split
> > >> > basis
> > >> > > but on a per-subtask basis.  This works well when each subtask has
> > >> > exactly
> > >> > > one split.  When a sub-task has multiple splits, various
> > complications
> > >> > > occur involving the commingling of watermarks.  And when a
> sub-task
> > >> has
> > >> > no
> > >> > > splits, the pipeline stalls altogether.  To deal with the latter
> > >> problem,
> > >> > > idleness was introduced.  The

Re: [VOTE] Watermark propagation with Sink API

2021-06-10 Thread Eron Wright
We have 7 binding votes and no objections, so FLIP-167 passed.

Here's the implementation issue and PR which is ready for review.
https://issues.apache.org/jira/browse/FLINK-22700
https://github.com/apache/flink/pull/15950

Thank-you everyone.

On Thu, Jun 10, 2021 at 3:32 AM Arvid Heise  wrote:

> Reconfirming my +1 (binding).
>
> On Thu, Jun 10, 2021 at 12:14 PM Till Rohrmann 
> wrote:
>
> > +1,
> >
> > Cheers,
> > Till
> >
> > On Thu, Jun 10, 2021 at 4:59 AM Tzu-Li (Gordon) Tai  >
> > wrote:
> >
> > > +1
> > >
> > > On Thu, Jun 10, 2021 at 9:04 AM jincheng sun  >
> > > wrote:
> > >
> > > > +1 (binding)  // Sorry for the late reply.
> > > >
> > > > Best,
> > > > Jincheng
> > > >
> > > >
> > > > Piotr Nowojski  于2021年6月9日周三 下午10:04写道:
> > > >
> > > > > Thanks for driving this Eron, and sorry for causing the delay.
> > > > >
> > > > > +1 (binding) from my side
> > > > >
> > > > > Piotrek
> > > > >
> > > > > wt., 8 cze 2021 o 23:48 Eron Wright  > .invalid>
> > > > > napisał(a):
> > > > >
> > > > > > Voting is re-open for FLIP-167 as-is (without idleness support as
> > was
> > > > the
> > > > > > point of contention).
> > > > > >
> > > > > > On Fri, Jun 4, 2021 at 10:45 AM Eron Wright <
> > ewri...@streamnative.io
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Little update on this, more good discussion over the last few
> > days,
> > > > and
> > > > > > > the FLIP will probably be amended to incorporate idleness.
> >  Voting
> > > > > will
> > > > > > > remain open until, let's say, mid-next week.
> > > > > > >
> > > > > > > On Thu, Jun 3, 2021 at 8:00 AM Piotr Nowojski <
> > > pnowoj...@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> I would like to ask you to hold on with counting the votes
> > until I
> > > > get
> > > > > > an
> > > > > > >> answer for my one question in the dev mailing list thread
> (sorry
> > > if
> > > > it
> > > > > > was
> > > > > > >> already covered somewhere).
> > > > > > >>
> > > > > > >> Best, Piotrek
> > > > > > >>
> > > > > > >> czw., 3 cze 2021 o 16:12 Jark Wu 
> napisał(a):
> > > > > > >>
> > > > > > >> > +1 (binding)
> > > > > > >> >
> > > > > > >> > Best,
> > > > > > >> > Jark
> > > > > > >> >
> > > > > > >> > On Thu, 3 Jun 2021 at 21:34, Dawid Wysakowicz <
> > > > > dwysakow...@apache.org
> > > > > > >
> > > > > > >> > wrote:
> > > > > > >> >
> > > > > > >> > > +1 (binding)
> > > > > > >> > >
> > > > > > >> > > Best,
> > > > > > >> > >
> > > > > > >> > > Dawid
> > > > > > >> > >
> > > > > > >> > > On 03/06/2021 03:50, Zhou, Brian wrote:
> > > > > > >> > > > +1 (non-binding)
> > > > > > >> > > >
> > > > > > >> > > > Thanks Eron, looking forward to seeing this feature
> soon.
> > > > > > >> > > >
> > > > > > >> > > > Thanks,
> > > > > > >> > > > Brian
> > > > > > >> > > >
> > > > > > >> > > > -Original Message-
> > > > > > >> > > > From: Arvid Heise 
> > > > > > >> > > > Sent: Wednesday, June 2, 2021 15:44
> > > > > > >> > > > To: dev
> > > > > > >> > > > Subject: Re: [VOTE] Watermark propagation with Sink API
> > > > > > >> > > >
> > > > > > >> 

Re: [DISCUSS] Definition of idle partitions

2021-06-08 Thread Eron Wright
It seems to me that idleness was introduced to deal with a very specific
issue.  In the pipeline, watermarks are aggregated not on a per-split basis
but on a per-subtask basis.  This works well when each subtask has exactly
one split.  When a sub-task has multiple splits, various complications
occur involving the commingling of watermarks.  And when a sub-task has no
splits, the pipeline stalls altogether.  To deal with the latter problem,
idleness was introduced.  The sub-task simply declares itself to be idle to
be taken out of consideration for purposes of watermark aggregation.

If we're looking for a more general solution, I would suggest we discuss
how to track watermarks on a per-split basis.  Or, as Till mentioned
recently, an alternate solution may be to dynamically adjust the
parallelism of the task.

I don't agree with the notion that idleness involves a correctness
tradeoff.  The facility I described above has no impact on correctness.
Meanwhile, various watermark strategies rely on heuristics involving the
processing-time domain, and the term idleness seems to have found purchase
there too.  The connection among the concepts seems tenuous.

-E



On Tue, Jun 8, 2021 at 8:58 AM Piotr Nowojski  wrote:

> Hi Arvid,
>
> Thanks for writing down this summary and proposal. I think this was the
> foundation of the disagreement in FLIP-167 discussion. Dawid was arguing
> that idleness is intermittent, strictly a task local concept and as such
> shouldn't be exposed in for example sinks. While me and Eron thought that
> it's a concept strictly connected to watermarks.
>
> 1. I'm big +1 for changing the StreamStatus definition to stream "providing
> watermark" and "not providing watermark". With respect to that I agree with
> Dawid that record bound idleness *(if we would ever need to define/expose
> it)* should be an intermittent concept, like for example the existing in
> the Task/runtime input availability (StreamTaskInput#isAvailable).
> 3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But
> I also don't have any good ideas.
> `WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?
>
> Best,
> Piotrek
>
> wt., 8 cze 2021 o 16:35 Arvid Heise  napisał(a):
>
> > Hi devs,
> >
> > While discussing "Watermark propagation with Sink API" and during
> > "[FLINK-18934] Idle stream does not advance watermark in connected
> stream",
> > we noticed some drawbacks on how Flink defines idle partitions currently.
> >
> > To recap, idleness was always considered as a means to achieve progress
> in
> > window operators with idle partition in the source at the risk of losing
> a
> > bit of correctness. In particular, records could be considered late,
> simply
> > because of that idleness timeout and not because they arrived out of
> order.
> > A potential reprocessing would not be causing these records to be
> > considered late and we may end up with a different (correct) result.
> >
> > The drawbacks that we discovered are as follows:
> > - We currently only use idleness to exclude respective upstream tasks
> from
> > participating in watermark generation.
> > - However, the definition is bound to records. [1] In particular, while a
> > partition is idle, no records should be produced.
> > - That brings us into quite a few edge cases, where operators emit
> records,
> > while they are actually idling: Think of timers, asyncIO operators,
> window
> > operators based on timeouts, etc. that trigger on an operator ingesting
> an
> > idle partition.
> > - The proper solution would be to turn the operator active while emitting
> > and to return to being idle afterwards (but when?). However, this has
> some
> > unintended side-effects depending on when you switch back:
> >   - If you toggle stream status for each record, you get a huge overhead
> on
> > stream status records and quite a bit of processing in downstream
> operators
> > (that code path is not much optimized since switching is considered a
> rare
> > thing).
> >   - If you toggle after a certain time, you may get delays>idleness in
> the
> > downstream window operators.
> >   - You could turn back when you processed all pending mails, but if you
> > have a self-replicating mail that would be never. Self-enqueueing, low
> > timer would also produce a flood similar to the first case.
> >
> > All in all, the situation is quite unsatisfying because idleness implies
> no
> > records. However, currently there is no need to have that implication:
> > since we only use it for watermarks, we can easily allow records to be
> > emitted (in fact that was the old behavior before FLINK-18934 in many
> > cases) and still get the intended behavior in respect to watermarks:
> > - A channel that is active is providing watermarks.
> > - An idle channel is not providing any watermarks but can deliver
> records.
> >
> > Ultimately, that would mean that we are actually not talking idle/active
> > partitions anymore. We are talking more about whether a particular
> 

Re: [VOTE] Watermark propagation with Sink API

2021-06-08 Thread Eron Wright
Voting is re-open for FLIP-167 as-is (without idleness support as was the
point of contention).

On Fri, Jun 4, 2021 at 10:45 AM Eron Wright  wrote:

> Little update on this, more good discussion over the last few days, and
> the FLIP will probably be amended to incorporate idleness.   Voting will
> remain open until, let's say, mid-next week.
>
> On Thu, Jun 3, 2021 at 8:00 AM Piotr Nowojski 
> wrote:
>
>> I would like to ask you to hold on with counting the votes until I get an
>> answer for my one question in the dev mailing list thread (sorry if it was
>> already covered somewhere).
>>
>> Best, Piotrek
>>
>> czw., 3 cze 2021 o 16:12 Jark Wu  napisał(a):
>>
>> > +1 (binding)
>> >
>> > Best,
>> > Jark
>> >
>> > On Thu, 3 Jun 2021 at 21:34, Dawid Wysakowicz 
>> > wrote:
>> >
>> > > +1 (binding)
>> > >
>> > > Best,
>> > >
>> > > Dawid
>> > >
>> > > On 03/06/2021 03:50, Zhou, Brian wrote:
>> > > > +1 (non-binding)
>> > > >
>> > > > Thanks Eron, looking forward to seeing this feature soon.
>> > > >
>> > > > Thanks,
>> > > > Brian
>> > > >
>> > > > -Original Message-
>> > > > From: Arvid Heise 
>> > > > Sent: Wednesday, June 2, 2021 15:44
>> > > > To: dev
>> > > > Subject: Re: [VOTE] Watermark propagation with Sink API
>> > > >
>> > > >
>> > > > [EXTERNAL EMAIL]
>> > > >
>> > > > +1 (binding)
>> > > >
>> > > > Thanks Eron for driving this effort; it will open up new exciting
>> use
>> > > cases.
>> > > >
>> > > > On Tue, Jun 1, 2021 at 7:17 PM Eron Wright > > > .invalid>
>> > > > wrote:
>> > > >
>> > > >> After some good discussion about how to enhance the Sink API to
>> > > >> process watermarks, I believe we're ready to proceed with a vote.
>> > > >> Voting will be open until at least Friday, June 4th, 2021.
>> > > >>
>> > > >> Reference:
>> > > >>
>> > > >>
>> > https://urldefense.com/v3/__https://cwiki.apache.org/confluence/displa
>> > > >>
>> y/FLINK/FLIP-167*3A*Watermarks*for*Sink*API__;JSsrKys!!LpKI!zkBBhuqEEi
>> > > >> fxF_fDQqAjqsbuWWFmnAvwRfEAWxeC63viFWXPLul-GCBb-PTq$
>> > > >> [cwiki[.]apache[.]org]
>> > > >>
>> > > >> Discussion thread:
>> > > >>
>> > > >>
>> > https://urldefense.com/v3/__https://lists.apache.org/thread.html/r5194
>> > > >>
>> e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e*40*3Cdev.flink.ap
>> > > >> ache.org
>> > *3E__;JSUl!!LpKI!zkBBhuqEEifxF_fDQqAjqsbuWWFmnAvwRfEAWxeC63viF
>> > > >> WXPLul-GJXlxwqN$ [lists[.]apache[.]org]
>> > > >>
>> > > >> Implementation Issue:
>> > > >>
>> > https://urldefense.com/v3/__https://issues.apache.org/jira/browse/FLIN
>> > > >>
>> K-22700__;!!LpKI!zkBBhuqEEifxF_fDQqAjqsbuWWFmnAvwRfEAWxeC63viFWXPLul-G
>> > > >> N6AJm4h$ [issues[.]apache[.]org]
>> > > >>
>> > > >> Thanks,
>> > > >> Eron Wright
>> > > >> StreamNative
>> > > >>
>> > >
>> > >
>> >
>>
>


Re: [DISCUSS] Watermark propagation with Sink API

2021-06-08 Thread Eron Wright
Thanks, the narrowed FLIP-167 is fine for now.  I'll re-activate the vote
process.  Thanks!

On Tue, Jun 8, 2021 at 3:01 AM Till Rohrmann  wrote:

> Hi everyone,
>
> I do agree that Flink's definition of idleness is not fully thought through
> yet. Consequently, I would feel a bit uneasy to make it part of Flink's API
> right now. Instead, defining the proper semantics first and then exposing
> it sounds like a good approach forward. Hence, +1 for option number 1,
> which will also allow FLIP-167 to make progress.
>
> Concerning subtasks with no partitions assigned, would it make sense to
> terminate these tasks at some point? That way, the stream would be closed
> and there is no need to maintain a stream status. Of course, this also
> requires at some point that Flink can start new sources when new partitions
> appear.
>
> Cheers,
> Till
>
> On Tue, Jun 8, 2021 at 9:26 AM Piotr Nowojski 
> wrote:
>
> > Hi Eron,
> >
> > The FLIP-167 is narrow, but we recently discovered some problems with
> > current idleness semantics as Arvid explained. We are planning to
> present a
> > new proposal to redefine them. Probably as a part of it, we would need to
> > rename them. Given that, I think it doesn't make sense to expose idleness
> > to the sinks before we rename and define it properly. In other words:
> >
> > > 2. When the sink operator is idled, tell the sink function.
> >
> > We shouldn't expose stream status as a part of public API until it's
> > properly defined.
> >
> > I would propose one of the two things:
> > 1. Proceed with FLIP-167, without exposing idleness in the sinks YET.
> > Exposing idleness could be part of this next/future FLIP that would
> define
> > idleness in the first place.
> > 2. Block FLIP-167, until the idleness is fixed.
> >
> > I would vote for option number 1.
> >
> > Piotrek
> >
> > pon., 7 cze 2021 o 18:08 Eron Wright 
> > napisał(a):
> >
> > > Piotr, David, and Arvid, we've had an expansive discussion but
> ultimately
> > > the proposal is narrow.  It is:
> > > 1. When a watermark arrives at the sink operator, tell the sink
> function.
> > > 2. When the sink operator is idled, tell the sink function.
> > >
> > > With these enhancements, we will significantly improve correctness in
> > > multi-stage flows, and facilitate an exciting project in the Pulsar
> > > community.  Would you please lend your support to FLIP-167 so that we
> can
> > > land this enhancement for 1.14?  My deepest thanks!
> > >
> > > -Eron
> > >
> > >
> > >
> > >
> > > On Mon, Jun 7, 2021 at 4:45 AM Arvid Heise  wrote:
> > >
> > > > Hi Eron,
> > > >
> > > > you either have very specific use cases in mind or have a
> misconception
> > > > about idleness in Flink with the new sources. The basic idea is that
> > you
> > > > have watermark generators only at the sources and the user supplies
> > them.
> > > > As a source author, you have no option to limit that. Here a bit of
> > > > background:
> > > >
> > > > We observed that many users that read from Kafka were confused about
> no
> > > > visible progress in their Flink applications because of some idle
> > > partition
> > > > and we introduced idleness subsequently. Idleness was always
> considered
> > > as
> > > > a means to achieve progress at the risk of losing a bit of
> correctness.
> > > > So especially in the case that you describe with a Pulsar partition
> > that
> > > is
> > > > empty but indefinitely active, the user needs to be able to use
> > idleness
> > > > such that downstream window operators progress.
> > > >
> > > > I hope to have clarified that "I wouldn't recommend using
> > withIdleness()
> > > > with source-based watermarks." would pretty much make the intended
> use
> > > case
> > > > not work anymore.
> > > >
> > > > ---
> > > >
> > > > Nevertheless, from the discussion with you and some offline
> discussion
> > > with
> > > > Piotr and Dawid, we actually found quite a bit of drawbacks from the
> > > > current definition of idleness:
> > > > - We currently only use idleness to exclude respective upstream tasks
> > > from
> > > > participating in watermark generation (as you have eloquently put
> > further
> > > 

Re: [DISCUSS] Watermark propagation with Sink API

2021-06-07 Thread Eron Wright
Piotr, David, and Arvid, we've had an expansive discussion but ultimately
the proposal is narrow.  It is:
1. When a watermark arrives at the sink operator, tell the sink function.
2. When the sink operator is idled, tell the sink function.

With these enhancements, we will significantly improve correctness in
multi-stage flows, and facilitate an exciting project in the Pulsar
community.  Would you please lend your support to FLIP-167 so that we can
land this enhancement for 1.14?  My deepest thanks!

-Eron




On Mon, Jun 7, 2021 at 4:45 AM Arvid Heise  wrote:

> Hi Eron,
>
> you either have very specific use cases in mind or have a misconception
> about idleness in Flink with the new sources. The basic idea is that you
> have watermark generators only at the sources and the user supplies them.
> As a source author, you have no option to limit that. Here a bit of
> background:
>
> We observed that many users that read from Kafka were confused about no
> visible progress in their Flink applications because of some idle partition
> and we introduced idleness subsequently. Idleness was always considered as
> a means to achieve progress at the risk of losing a bit of correctness.
> So especially in the case that you describe with a Pulsar partition that is
> empty but indefinitely active, the user needs to be able to use idleness
> such that downstream window operators progress.
>
> I hope to have clarified that "I wouldn't recommend using withIdleness()
> with source-based watermarks." would pretty much make the intended use case
> not work anymore.
>
> ---
>
> Nevertheless, from the discussion with you and some offline discussion with
> Piotr and Dawid, we actually found quite a bit of drawbacks from the
> current definition of idleness:
> - We currently only use idleness to exclude respective upstream tasks from
> participating in watermark generation (as you have eloquently put further
> up in the thread).
> - However, the definition is bound to records. So while a partition is
> idle, no records should be produced.
> - That brings us into quite a few edge cases, where operators emit records,
> while they are actually idling: Think of timers, asyncIO operators, window
> operators based on timeouts, etc.
> - The solution would be to turn the operator active while emitting and
> returning to being idle afterwards (but when?). However, this has some
> unintended side-effects depending on when you switch back.
>
> We are currently thinking that we should rephrase the definition to what
> you described:
> - A channel that is active is providing watermarks.
> - An idle channel is not providing any watermarks but can deliver records.
> - Then we are not talking about idle partitions anymore but explicit and
> implicit watermark generation and should probably rename the concepts.
> - This would probably mean that we also need an explicit markActive in
> source/sink to express that the respective entity now needs to wait for
> explicit watermarks.
>
> I'll open a proper discussion thread tomorrow.
>
> Note that we probably shouldn't rush this FLIP until we have clarified the
> semantics of idleness. We could also cut the scope of the FLIP to exclude
> idleness and go ahead without it (there should be enough binding votes
> already).
>
> On Sat, Jun 5, 2021 at 12:09 AM Eron Wright  .invalid>
> wrote:
>
> > I understand your scenario but I disagree with its assumptions:
> >
> > "However, the partition of A is empty and thus A is temporarily idle." -
> > you're assuming that the behavior of the source is to mark itself idle if
> > data isn't available, but that's clearly source-specific and not behavior
> > we expect to have in Pulsar source.  A partition may be empty
> indefinitely
> > while still being active.  Imagine that the producer is defending a
> lease -
> > "I'm here, there's no data, please don't advance the clock".
> >
> > "we bind idleness to wall clock time" - you're characterizing a specific
> > strategy (WatermarkStrategy.withIdleness()), not the inherent behavior of
> > the pipeline.  I wouldn't recommend using withIdleness() with
> source-based
> > watermarks.
> >
> > I do agree that dynamism in partition assignment can wreak havoc on
> > watermark correctness.  We have some ideas on the Pulsar side about that
> > too.  I would ask that we focus on the Flink framework and pipeline
> > behavior.  By offering a more powerful framework, we encourage stream
> > storage systems to "rise to the occasion" - treat event time in a
> > first-class way, optimize for correctness, etc.  In this case, FLIP-167
> is
> > setting the stage for evolution in Puls

Re: [DISCUSS] Watermark propagation with Sink API

2021-06-04 Thread Eron Wright
I understand your scenario but I disagree with its assumptions:

"However, the partition of A is empty and thus A is temporarily idle." -
you're assuming that the behavior of the source is to mark itself idle if
data isn't available, but that's clearly source-specific and not behavior
we expect to have in Pulsar source.  A partition may be empty indefinitely
while still being active.  Imagine that the producer is defending a lease -
"I'm here, there's no data, please don't advance the clock".

"we bind idleness to wall clock time" - you're characterizing a specific
strategy (WatermarkStrategy.withIdleness()), not the inherent behavior of
the pipeline.  I wouldn't recommend using withIdleness() with source-based
watermarks.

I do agree that dynamism in partition assignment can wreak havoc on
watermark correctness.  We have some ideas on the Pulsar side about that
too.  I would ask that we focus on the Flink framework and pipeline
behavior.  By offering a more powerful framework, we encourage stream
storage systems to "rise to the occasion" - treat event time in a
first-class way, optimize for correctness, etc.  In this case, FLIP-167 is
setting the stage for evolution in Pulsar.

Thanks again Arvid for the great discussion.





On Fri, Jun 4, 2021 at 2:06 PM Arvid Heise  wrote:

> At least one big motivation is having (temporary) empty partitions. Let me
> give you an example, why imho idleness is only approximate in this case:
> Assume you have source subtask A, B, C that correspond to 3 source
> partitions and a downstream keyed window operator W.
>
> W would usually trigger on min_watermark(A, B, C). However, the partition
> of A is empty and thus A is temporarily idle. So W triggers on
> min_watermark(B, C). When A is now active again, the watermark implicitly
> is min_watermark(B, C) for A!
>
> Let's further assume that the source is filled by another pipeline before.
> This pipeline experiences technical difficulties for X minutes and could
> not produce into the partition of A, hence the idleness. When the upstream
> pipeline resumes it fills A with some records that are before
> min_watermark(B, C). Any watermark generated from these records is
> discarded as the watermark is monotonous. Therefore, these records will be
> considered late by W and discarded.
>
> Without idleness, we would have simply bocked W until the upstream pipeline
> fully recovers and we would not have had any late records. The same holds
> for any reprocessing where the data of partition A is continuous.
>
> If you look deeper, the issue is that we bind idleness to wall clock time
> (e.g. advance watermark after X seconds without data). Then we assume the
> watermark of the idle partition to be in sync with the slowest partition.
> However, in the case of hiccups, this assumption does not hold at all.
> I don't see any fix for that (easy or not easy) and imho it's inherent to
> the design of idleness.
> We lack information (why is no data coming) and have a heuristic to fix it.
>
> In the case of partition assignment where one subtask has no partition, we
> are probably somewhat safe. We know why no data is coming (no partition)
> and as long as we do not have dynamic partition assignment, there will
> never be a switch to active without restart (for the foreseeable future).
>
> On Fri, Jun 4, 2021 at 10:34 PM Eron Wright  .invalid>
> wrote:
>
> > Yes I'm talking about an implementation of idleness that is unrelated to
> > processing time.  The clear example is partition assignment to subtasks,
> > which probably motivated Flink's idleness functionality in the first
> place.
> >
> > On Fri, Jun 4, 2021 at 12:53 PM Arvid Heise  wrote:
> >
> > > Hi Eron,
> > >
> > > Are you referring to an implementation of idleness that does not rely
> on
> > a
> > > wall clock but on some clock baked into the partition information of
> the
> > > source system?
> > > If so, you are right that it invalidates my points.
> > > Do you have an example on where this is used?
> > >
> > > With a wall clock, you always run into the issues that I describe since
> > you
> > > are effectively mixing event time and processing time...
> > >
> > >
> > > On Fri, Jun 4, 2021 at 6:28 PM Eron Wright  > > .invalid>
> > > wrote:
> > >
> > > > Dawid, I think you're mischaracterizing the idleness signal as
> > > inherently a
> > > > heuristic, but Flink does not impose that.  A source-based watermark
> > (and
> > > > corresponding idleness signal) may well be entirely data-driven,
> > entirely
> > > > deterministic.  Basically you're underselli

Re: [DISCUSS] Watermark propagation with Sink API

2021-06-04 Thread Eron Wright
Yes I'm talking about an implementation of idleness that is unrelated to
processing time.  The clear example is partition assignment to subtasks,
which probably motivated Flink's idleness functionality in the first place.

On Fri, Jun 4, 2021 at 12:53 PM Arvid Heise  wrote:

> Hi Eron,
>
> Are you referring to an implementation of idleness that does not rely on a
> wall clock but on some clock baked into the partition information of the
> source system?
> If so, you are right that it invalidates my points.
> Do you have an example on where this is used?
>
> With a wall clock, you always run into the issues that I describe since you
> are effectively mixing event time and processing time...
>
>
> On Fri, Jun 4, 2021 at 6:28 PM Eron Wright  .invalid>
> wrote:
>
> > Dawid, I think you're mischaracterizing the idleness signal as
> inherently a
> > heuristic, but Flink does not impose that.  A source-based watermark (and
> > corresponding idleness signal) may well be entirely data-driven, entirely
> > deterministic.  Basically you're underselling what the pipeline is
> capable
> > of, based on painful experiences with using the generic, heuristics-based
> > watermark assigner.  Please don't let those experiences overshadow what's
> > possible with source-based watermarking.
> >
> > The idleness signal does have a strict definition, it indicates whether
> the
> > stream is actively participating in advancing the event time clock.  The
> > status of all participants is considered when aggregating watermarks.  A
> > source subtask generally makes the determination based on data, e.g.
> > whether a topic is assigned to that subtask.
> >
> > We have here a modest proposal to add callbacks to the sink function for
> > information that the sink operator already receives.  The practical
> result
> > is improved correctness when used with streaming systems that have
> > first-class support for event time.  The specific changes may be
> previewed
> > here:
> > https://github.com/apache/flink/pull/15950
> > https://github.com/streamnative/flink/pull/2
> >
> > Thank you all for the robust discussion. Do I have your support to
> proceed
> > to enhance FLIP-167 with idleness callbacks and to proceed to a vote?
> >
> > Eron
> >
> >
> > On Fri, Jun 4, 2021 at 9:08 AM Arvid Heise  wrote:
> >
> > > While everything I wrote before is still valid, upon further
> rethinking,
> > I
> > > think that the conclusion is not necessarily correct:
> > > - If the user wants to have pipeline A and B behaving as if A+B was
> > jointly
> > > executed in the same pipeline without the intermediate Pulsar topic,
> > having
> > > the idleness in that topic is to only way to guarantee consistency.
> > > - We could support the following in the respective sources: If the user
> > > that wants to use a different definition of idleness in B, they can
> just
> > > provide a new idleness definition. At that point, we should discard the
> > > idleness in the intermediate topic while reading.
> > >
> > > If we would agree on the latter way, I think having the idleness in the
> > > topic is of great use because it's a piece of information that cannot
> be
> > > inferred as stated by others. Consequently, we would be able to support
> > all
> > > use cases and can give the user the freedom to express his intent.
> > >
> > >
> > > On Fri, Jun 4, 2021 at 3:43 PM Arvid Heise  wrote:
> > >
> > > > I think the core issue in this discussion is that we kind of assume
> > that
> > > > idleness is something universally well-defined. But it's not. It's a
> > > > heuristic to advance data processing in event time where we would
> lack
> > > data
> > > > to do so otherwise.
> > > > Keep in mind that idleness has no real definition in terms of event
> > time
> > > > and leads to severe unexpected results: If you reprocess a data
> stream
> > > with
> > > > temporarily idle partitions, these partitions would not be deemed
> idle
> > on
> > > > reprocessing and there is a realistic chance that records that were
> > > deemed
> > > > late in the live processing case are now perfectly fine records in
> the
> > > > reprocessing case. (I can expand on that if that was too short)
> > > >
> > > > With that in mind, why would a downstream process even try to
> calculate
> > > > the same idleness state as the upstream process? I don'

Re: [VOTE] Watermark propagation with Sink API

2021-06-04 Thread Eron Wright
Little update on this, more good discussion over the last few days, and the
FLIP will probably be amended to incorporate idleness.   Voting will remain
open until, let's say, mid-next week.

On Thu, Jun 3, 2021 at 8:00 AM Piotr Nowojski  wrote:

> I would like to ask you to hold on with counting the votes until I get an
> answer for my one question in the dev mailing list thread (sorry if it was
> already covered somewhere).
>
> Best, Piotrek
>
> czw., 3 cze 2021 o 16:12 Jark Wu  napisał(a):
>
> > +1 (binding)
> >
> > Best,
> > Jark
> >
> > On Thu, 3 Jun 2021 at 21:34, Dawid Wysakowicz 
> > wrote:
> >
> > > +1 (binding)
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 03/06/2021 03:50, Zhou, Brian wrote:
> > > > +1 (non-binding)
> > > >
> > > > Thanks Eron, looking forward to seeing this feature soon.
> > > >
> > > > Thanks,
> > > > Brian
> > > >
> > > > -Original Message-
> > > > From: Arvid Heise 
> > > > Sent: Wednesday, June 2, 2021 15:44
> > > > To: dev
> > > > Subject: Re: [VOTE] Watermark propagation with Sink API
> > > >
> > > >
> > > > [EXTERNAL EMAIL]
> > > >
> > > > +1 (binding)
> > > >
> > > > Thanks Eron for driving this effort; it will open up new exciting use
> > > cases.
> > > >
> > > > On Tue, Jun 1, 2021 at 7:17 PM Eron Wright  > > .invalid>
> > > > wrote:
> > > >
> > > >> After some good discussion about how to enhance the Sink API to
> > > >> process watermarks, I believe we're ready to proceed with a vote.
> > > >> Voting will be open until at least Friday, June 4th, 2021.
> > > >>
> > > >> Reference:
> > > >>
> > > >>
> > https://urldefense.com/v3/__https://cwiki.apache.org/confluence/displa
> > > >>
> y/FLINK/FLIP-167*3A*Watermarks*for*Sink*API__;JSsrKys!!LpKI!zkBBhuqEEi
> > > >> fxF_fDQqAjqsbuWWFmnAvwRfEAWxeC63viFWXPLul-GCBb-PTq$
> > > >> [cwiki[.]apache[.]org]
> > > >>
> > > >> Discussion thread:
> > > >>
> > > >>
> > https://urldefense.com/v3/__https://lists.apache.org/thread.html/r5194
> > > >>
> e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e*40*3Cdev.flink.ap
> > > >> ache.org
> > *3E__;JSUl!!LpKI!zkBBhuqEEifxF_fDQqAjqsbuWWFmnAvwRfEAWxeC63viF
> > > >> WXPLul-GJXlxwqN$ [lists[.]apache[.]org]
> > > >>
> > > >> Implementation Issue:
> > > >>
> > https://urldefense.com/v3/__https://issues.apache.org/jira/browse/FLIN
> > > >>
> K-22700__;!!LpKI!zkBBhuqEEifxF_fDQqAjqsbuWWFmnAvwRfEAWxeC63viFWXPLul-G
> > > >> N6AJm4h$ [issues[.]apache[.]org]
> > > >>
> > > >> Thanks,
> > > >> Eron Wright
> > > >> StreamNative
> > > >>
> > >
> > >
> >
>


Re: [DISCUSS] Watermark propagation with Sink API

2021-06-04 Thread Eron Wright
 Yes, I understand this point. But it can also be the other way around.
> >> There might be a large gap between record2 and record3, and users might
> >> prefer or might be not able to duplicate idleness detection logic. The
> >> downstream system might be lacking some kind of information (that is
> only
> >> available in the top level/ingesting system) to correctly set the idle
> >> status.
> >>
> >> Piotrek
> >>
> >> pt., 4 cze 2021 o 12:30 Dawid Wysakowicz 
> >> napisał(a):
> >>
> >> >
> >> > Same as Eron I don't follow this point. Any streaming sink can be used
> >> as
> >> > this kind of transient channel. Streaming sinks, like Kafka, are also
> >> used
> >> > to connect one streaming system with another one, also for an
> immediate
> >> > consumption.
> >> >
> >> > Sure it can, but imo it is rarely the primary use case why you want to
> >> > offload the channels to an external persistent system. Again in my
> >> > understanding StreamStatus is something transient, e.g. part of our
> >> > external system went offline. I think those kind of events should not
> be
> >> > persisted.
> >> >
> >> > Both watermarks and idleness status can be some
> >> > inherent property of the underlying data stream. if an
> >> upstream/ingesting
> >> > system knows that this particular stream/partition of a stream is
> going
> >> > idle (for example for a couple of hours), why does this information
> >> have to
> >> > be re-created in the downstream system using some heuristic? It could
> be
> >> > explicitly encoded.
> >> >
> >> > Because it's most certainly not true in the downstream. The idleness
> >> works
> >> > usually according to a heuristic: "We have not seen records for 5
> >> minutes,
> >> > so there is a fair chance we won't see records for the next 5 minutes,
> >> so
> >> > let's not wait for watermarks for now." That heuristic most certainly
> >> won't
> >> > hold for a downstream persistent storage.
> >> >
> >> > Imagine you're starting consuming from the result channel in a
> situation
> >> > were you have:
> >> >
> >> > record4, record3, StreamStatus.ACTIVE, StreamStatus.IDLE record2,
> >> record1,
> >> > record0
> >> >
> >> > Switching to the encoded StreamStatus.IDLE is unnecessary, and might
> >> cause
> >> > the record3 and record4 to be late depending on how the watermark
> >> > progressed in other partitions.
> >> >
> >> > I understand Eron's use case, which is not about storing the
> >> StreamStatus,
> >> > but performing an immediate aggregation or said differently changing
> the
> >> > partitioning/granularity of records and watermarks externally to
> Flink.
> >> The
> >> > produced by Flink partitioning is actually never persisted in that
> >> case. In
> >> > this case I agree exposing the StreamStatus makes sense. I am still
> >> > concerned it will lead to storing the StreamStatus which can lead to
> >> many
> >> > subtle problems.
> >> > On 04/06/2021 11:53, Piotr Nowojski wrote:
> >> >
> >> > Hi,
> >> >
> >> > Thanks for picking up this discussion. For the record, I also think we
> >> > shouldn't expose latency markers.
> >> >
> >> > About the stream status
> >> >
> >> >
> >> >  Persisting the StreamStatus
> >> >
> >> > I don't agree with the view that sinks are "storing" the data/idleness
> >> > status. This nomenclature makes only sense if we are talking about
> >> > streaming jobs producing batch data.
> >> >
> >> >
> >> > In my understanding a StreamStatus makes sense only when talking about
> >> > immediately consumed transient channels such as between operators
> within
> >> > a single job.
> >> >
> >> > Same as Eron I don't follow this point. Any streaming sink can be used
> >> as
> >> > this kind of transient channel. Streaming sinks, like Kafka, are also
> >> used
> >> > to connect one streaming system with another one, also for an
> immediate
> >> > consumption.
> >> >
> >> > You cou

Re: [DISCUSS] Watermark propagation with Sink API

2021-06-04 Thread Eron Wright
I believe that the correctness of watermarks and stream status markers is
determined entirely by the source (ignoring the generic assigner).  Such
stream elements are known not to overtake records, and aren't transient
from a pipeline perspective.  I do agree that recoveries may be lossy if
some operator state is transient (e.g. valve state).

Consider that status markers already affect the flow of watermarks (e.g.
suppression), and thus affect operator behavior.  Seems to me that exposing
the idleness state is no different than exposing a watermark.

The high-level story is, there is a need for the Flink job to be
transparent or neutral with respect to the event time clock.  I believe
this is possible if time flows with high fidelity from source to sink.  Of
course, one always has the choice as to whether to use source-based
watermarks; as you mentioned, requirements vary.

Regarding the Pulsar specifics, we're working on a community proposal that
I'm anxious to share.  To answer your question, the broker aggregates
watermarks from multiple producers who are writing to a single topic.
Each sink
subtask is a producer.  The broker considers each producer's assertions
(watermarks, idleness) to be independent inputs, much like the case with
the watermark valve.

On your concern about idleness causing false late events, I understand your
point but don't think it applies if the keyspace assignments are stable.

I hope this explains to your satisfaction.

- Eron





On Fri, Jun 4, 2021, 12:07 AM Dawid Wysakowicz 
wrote:

> Hi Eron,
>
> I might be missing some background on Pulsar partitioning but something
> seems off to me. What is the chunk/batch/partition that Pulsar brokers
> will additionally combine watermarks for? Isn't it the case that only a
> single Flink sub-task would write to such a chunk and thus will produce
> an aggregated watermark already via the writeWatermark method?
>
> Personally I am really skeptical about exposing the StreamStatus in any
> Producer API. In my understanding the StreamStatus is a transient
> setting of a consumer of data. StreamStatus is a mechanism for making a
> tradeoff between correctness (how many late elements that are behind
> watermark we have) vs making progress. IMO one has to be extra cautious
> when it comes to persistent systems. Again I might be missing the exact
> use case you are trying to solve here, but I can imagine multiple jobs
> reading from such a stream which might have different correctness
> requirements. Just quickly throwing an idea out of my head you might
> want to have an entirely correct results which can be delayed for
> minutes, and a separate task that produces quick insights within
> seconds. Another thing to consider is that by the time the downstream
> job starts consuming the upstream one might have produced records to the
> previously idle chunk. Persisting the StreamStatus in such a scenario
> would add unnecessary false late events.
>
> In my understanding a StreamStatus makes sense only when talking about
> immediately consumed transient channels such as between operators within
> a single job.
>
> Best,
>
> Dawid
>
> On 03/06/2021 23:31, Eron Wright wrote:
> > I think the rationale for end-to-end idleness (i.e. between pipelines) is
> > the same as the rationale for idleness between operators within a
> > pipeline.   On the 'main issue' you mentioned, we entrust the source with
> > adapting to Flink's notion of idleness (e.g. in Pulsar source, it means
> > that no topics/partitions are assigned to a given sub-task); a similar
> > adaption would occur in the sink.  In other words, I think it reasonable
> > that a sink for a watermark-aware storage system has need for the
> idleness
> > signal.
> >
> > Let me explain how I would use it in Pulsar's sink.  Each sub-task is a
> > Pulsar producer, and is writing watermarks to a configured topic via the
> > Producer API.  The Pulsar broker aggregates the watermarks that are
> written
> > by each producer into a global minimum (similar to StatusWatermarkValve).
> > The broker keeps track of which producers are actively producing
> > watermarks, and a producer may mark itself as idle to tell the broker not
> > to wait for watermarks from it, e.g. when a producer is going offline.  I
> > had intended to mark the producer as idle when the sub-task is closing,
> but
> > now I see that it would be insufficient; the producer should also be
> idled
> > if the sub-task is idled.  Otherwise, the broker would wait indefinitely
> > for the idled sub-task to produce a watermark.
> >
> > Arvid, I think your original instincts were correct about idleness
> > propagation, and I hope I've demonstrated a practical use case.
> >
> >
> >
&g

Re: [DISCUSS] Watermark propagation with Sink API

2021-06-03 Thread Eron Wright
I think the rationale for end-to-end idleness (i.e. between pipelines) is
the same as the rationale for idleness between operators within a
pipeline.   On the 'main issue' you mentioned, we entrust the source with
adapting to Flink's notion of idleness (e.g. in Pulsar source, it means
that no topics/partitions are assigned to a given sub-task); a similar
adaption would occur in the sink.  In other words, I think it reasonable
that a sink for a watermark-aware storage system has need for the idleness
signal.

Let me explain how I would use it in Pulsar's sink.  Each sub-task is a
Pulsar producer, and is writing watermarks to a configured topic via the
Producer API.  The Pulsar broker aggregates the watermarks that are written
by each producer into a global minimum (similar to StatusWatermarkValve).
The broker keeps track of which producers are actively producing
watermarks, and a producer may mark itself as idle to tell the broker not
to wait for watermarks from it, e.g. when a producer is going offline.  I
had intended to mark the producer as idle when the sub-task is closing, but
now I see that it would be insufficient; the producer should also be idled
if the sub-task is idled.  Otherwise, the broker would wait indefinitely
for the idled sub-task to produce a watermark.

Arvid, I think your original instincts were correct about idleness
propagation, and I hope I've demonstrated a practical use case.



On Thu, Jun 3, 2021 at 12:49 PM Arvid Heise  wrote:

> When I was rethinking the idleness issue, I came to the conclusion that it
> should be inferred at the source of the respective downstream pipeline
> again.
>
> The main issue on propagating idleness is that you would force the same
> definition across all downstream pipelines, which may not be what the user
> intended.
> On the other hand, I don't immediately see a technical reason why the
> downstream source wouldn't be able to infer that.
>
>
> On Thu, Jun 3, 2021 at 9:14 PM Eron Wright  .invalid>
> wrote:
>
> > Thanks Piotr for bringing this up.  I reflected on this and I agree we
> > should expose idleness, otherwise a multi-stage flow could stall.
> >
> > Regarding the latency markers, I don't see an immediate need for
> > propagating them, because they serve to estimate latency within a
> pipeline,
> > not across pipelines.  One would probably need to enhance the source
> > interface also to do e2e latency.  Seems we agree this aspect is out of
> > scope.
> >
> > I took a look at the code to get a sense of how to accomplish this.  The
> > gist is a new `markIdle` method on the `StreamOperator` interface, that
> is
> > called when the stream status maintainer (the `OperatorChain`)
> transitions
> > to idle state.  Then, a new `markIdle` method on the `SinkFunction` and
> > `SinkWriter` that is called by the respective operators.   Note that
> > StreamStatus is an internal class.
> >
> > Here's a draft PR (based on the existing PR of FLINK-22700) to highlight
> > this new aspect:
> > https://github.com/streamnative/flink/pull/2/files
> >
> > Please let me know if you'd like me to proceed to update the FLIP with
> > these details.
> >
> > Thanks again,
> > Eron
> >
> > On Thu, Jun 3, 2021 at 7:56 AM Piotr Nowojski 
> > wrote:
> >
> > > Hi,
> > >
> > > Sorry for chipping in late in the discussion, but I would second this
> > point
> > > from Arvid:
> > >
> > > > 4. Potentially, StreamStatus and LatencyMarker would also need to be
> > > encoded.
> > >
> > > It seems like this point was asked, but not followed? Or did I miss it?
> > > Especially the StreamStatus part. For me it sounds like exposing
> > watermarks
> > > without letting the sink know that the stream can be idle is an
> > incomplete
> > > feature and can be very problematic/confusing for potential users.
> > >
> > > Best,
> > > Piotrek
> > >
> > > pon., 31 maj 2021 o 08:34 Arvid Heise  napisał(a):
> > >
> > > > Afaik everyone can start a [VOTE] thread [1]. For example, here a
> > > > non-committer started a successful thread [2].
> > > > If you start it, I can already cast a binding vote and we just need 2
> > > more
> > > > for the FLIP to be accepted.
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120731026#FlinkBylaws-Voting
> > > > [2]
> > > >
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Depre

Re: [DISCUSS] Watermark propagation with Sink API

2021-06-03 Thread Eron Wright
Thanks Piotr for bringing this up.  I reflected on this and I agree we
should expose idleness, otherwise a multi-stage flow could stall.

Regarding the latency markers, I don't see an immediate need for
propagating them, because they serve to estimate latency within a pipeline,
not across pipelines.  One would probably need to enhance the source
interface also to do e2e latency.  Seems we agree this aspect is out of
scope.

I took a look at the code to get a sense of how to accomplish this.  The
gist is a new `markIdle` method on the `StreamOperator` interface, that is
called when the stream status maintainer (the `OperatorChain`) transitions
to idle state.  Then, a new `markIdle` method on the `SinkFunction` and
`SinkWriter` that is called by the respective operators.   Note that
StreamStatus is an internal class.

Here's a draft PR (based on the existing PR of FLINK-22700) to highlight
this new aspect:
https://github.com/streamnative/flink/pull/2/files

Please let me know if you'd like me to proceed to update the FLIP with
these details.

Thanks again,
Eron

On Thu, Jun 3, 2021 at 7:56 AM Piotr Nowojski  wrote:

> Hi,
>
> Sorry for chipping in late in the discussion, but I would second this point
> from Arvid:
>
> > 4. Potentially, StreamStatus and LatencyMarker would also need to be
> encoded.
>
> It seems like this point was asked, but not followed? Or did I miss it?
> Especially the StreamStatus part. For me it sounds like exposing watermarks
> without letting the sink know that the stream can be idle is an incomplete
> feature and can be very problematic/confusing for potential users.
>
> Best,
> Piotrek
>
> pon., 31 maj 2021 o 08:34 Arvid Heise  napisał(a):
>
> > Afaik everyone can start a [VOTE] thread [1]. For example, here a
> > non-committer started a successful thread [2].
> > If you start it, I can already cast a binding vote and we just need 2
> more
> > for the FLIP to be accepted.
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120731026#FlinkBylaws-Voting
> > [2]
> >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Deprecating-Mesos-support-td50142.html
> >
> > On Fri, May 28, 2021 at 8:17 PM Eron Wright  > .invalid>
> > wrote:
> >
> > > Arvid,
> > > Thanks for the feedback.  I investigated the japicmp configuration,
> and I
> > > see that SinkWriter is marked Experimental (not Public or
> > PublicEvolving).
> > > I think this means that SinkWriter need not be excluded.  As you
> > mentioned,
> > > SinkFunction is already excluded.  I've updated the FLIP with an
> > > explanation.
> > >
> > > I believe all issues are resolved.  May we proceed to a vote now?  And
> > are
> > > you able to drive the vote process?
> > >
> > > Thanks,
> > > Eron
> > >
> > >
> > > On Fri, May 28, 2021 at 4:40 AM Arvid Heise  wrote:
> > >
> > > > Hi Eron,
> > > >
> > > > 1. fair point. It still feels odd to have writeWatermark in the
> > > > SinkFunction (it's supposed to be functional as you mentioned), but I
> > > agree
> > > > that invokeWatermark is not better. So unless someone has a better
> > idea,
> > > > I'm fine with it.
> > > > 2.+3. I tried to come up with scenarios for a longer time. In
> general,
> > it
> > > > seems as if the new SinkWriter interface encourages more injection
> (see
> > > > processing time service in InitContext), such that the need for the
> > > context
> > > > is really just context information of that particular record and I
> > don't
> > > > see any use beyond timestamp and watermark. For SinkFunction, I'd not
> > > > over-engineer as it's going to be deprecated soonish. So +1 to leave
> it
> > > > out.
> > > > 4. Okay so I double-checked: from an execution perspective, it works.
> > > > However, japicmp would definitely complain. I propose to add it to
> the
> > > > compatibility section like this. We need to add an exception to
> > > SinkWriter
> > > > then. (SinkFunction is already on the exception list)
> > > > 5.+6. Awesome, I was also sure but wanted to double check.
> > > >
> > > > Best,
> > > >
> > > > Arvid
> > > >
> > > >
> > > > On Wed, May 26, 2021 at 7:29 PM Eron Wright  > > > .invalid>
> > > > wrote:
> > > >
> > > > > Arvid,
> > > > >
> > > > > 1.

[VOTE] Watermark propagation with Sink API

2021-06-01 Thread Eron Wright
After some good discussion about how to enhance the Sink API to process
watermarks, I believe we're ready to proceed with a vote.  Voting will be
open until at least Friday, June 4th, 2021.

Reference:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API

Discussion thread:
https://lists.apache.org/thread.html/r5194e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e%40%3Cdev.flink.apache.org%3E

Implementation Issue:
https://issues.apache.org/jira/browse/FLINK-22700

Thanks,
Eron Wright
StreamNative


Re: [DISCUSS] Watermark propagation with Sink API

2021-05-28 Thread Eron Wright
Arvid,
Thanks for the feedback.  I investigated the japicmp configuration, and I
see that SinkWriter is marked Experimental (not Public or PublicEvolving).
I think this means that SinkWriter need not be excluded.  As you mentioned,
SinkFunction is already excluded.  I've updated the FLIP with an
explanation.

I believe all issues are resolved.  May we proceed to a vote now?  And are
you able to drive the vote process?

Thanks,
Eron


On Fri, May 28, 2021 at 4:40 AM Arvid Heise  wrote:

> Hi Eron,
>
> 1. fair point. It still feels odd to have writeWatermark in the
> SinkFunction (it's supposed to be functional as you mentioned), but I agree
> that invokeWatermark is not better. So unless someone has a better idea,
> I'm fine with it.
> 2.+3. I tried to come up with scenarios for a longer time. In general, it
> seems as if the new SinkWriter interface encourages more injection (see
> processing time service in InitContext), such that the need for the context
> is really just context information of that particular record and I don't
> see any use beyond timestamp and watermark. For SinkFunction, I'd not
> over-engineer as it's going to be deprecated soonish. So +1 to leave it
> out.
> 4. Okay so I double-checked: from an execution perspective, it works.
> However, japicmp would definitely complain. I propose to add it to the
> compatibility section like this. We need to add an exception to SinkWriter
> then. (SinkFunction is already on the exception list)
> 5.+6. Awesome, I was also sure but wanted to double check.
>
> Best,
>
> Arvid
>
>
> On Wed, May 26, 2021 at 7:29 PM Eron Wright  .invalid>
> wrote:
>
> > Arvid,
> >
> > 1. I assume that the method name `invoke` stems from considering the
> > SinkFunction to be a functional interface, but is otherwise meaningless.
> > Keeping it as `writeWatermark` does keep it symmetric with SinkWriter.
> My
> > vote is to leave it.  You decide.
> >
> > 2+3. I too considered adding a `WatermarkContext`, but it would merely
> be a
> > placeholder.  I don't anticipate any context info in future.  As we see
> > with invoke, it is possible to add a context later in a
> > backwards-compatible way.  My vote is to not introduce a context.  You
> > decide.
> >
> > 4. No anticipated compatibility issues.
> >
> > 5. Short answer, it works as expected.  The new methods are invoked
> > whenever the underlying operator receives a watermark.  I do believe that
> > batch and ingestion time applications receive watermarks. Seems the
> > programming model is more unified in that respect since 1.12 (FLIP-134).
> >
> > 6. The failure behavior is the same as for elements.
> >
> > Thanks,
> > Eron
> >
> > On Tue, May 25, 2021 at 12:42 PM Arvid Heise  wrote:
> >
> > > Hi Eron,
> > >
> > > I think the FLIP is crisp and mostly good to go. Some smaller
> > > things/questions:
> > >
> > >1. SinkFunction#writeWatermark could be named
> > >SinkFunction#invokeWatermark or invokeOnWatermark to keep it
> > symmetric.
> > >2. We could add the context parameter to both. For
> SinkWriter#Context,
> > >we currently do not gain much. SinkFunction#Context also exposes
> > > processing
> > >time, which may or may not be handy and is currently mostly used for
> > >StreamingFileSink bucket policies. We may add that processing time
> > flag
> > >also to SinkWriter#Context in the future.
> > >3. Alternatively, we could also add a different context parameter
> just
> > >to keep the API stable while allowing additional information to be
> > > passed
> > >in the future.
> > >4. Would we run into any compatibility issue if we use Flink 1.13
> > source
> > >in Flink 1.14 (with this FLIP) or vice versa?
> > >5. What happens with sinks that use the new methods in applications
> > that
> > >do not have watermarks (batch mode, processing time)? Does this also
> > > work
> > >with ingestion time sufficiently?
> > >6. How do exactly once sinks deal with written watermarks in case of
> > >failure? I guess it's the same as normal records. (Either rollback
> of
> > >transaction or deduplication on resumption)
> > >
> > > Best,
> > >
> > > Arvid
> > >
> > > On Tue, May 25, 2021 at 6:44 PM Eron Wright  > > .invalid>
> > > wrote:
> > >
> > > > Does anyone have further comment on FLIP-167?
> > > >
> > > >
> > >
> >
&g

Re: [DISCUSS] Watermark propagation with Sink API

2021-05-26 Thread Eron Wright
Arvid,

1. I assume that the method name `invoke` stems from considering the
SinkFunction to be a functional interface, but is otherwise meaningless.
Keeping it as `writeWatermark` does keep it symmetric with SinkWriter.  My
vote is to leave it.  You decide.

2+3. I too considered adding a `WatermarkContext`, but it would merely be a
placeholder.  I don't anticipate any context info in future.  As we see
with invoke, it is possible to add a context later in a
backwards-compatible way.  My vote is to not introduce a context.  You
decide.

4. No anticipated compatibility issues.

5. Short answer, it works as expected.  The new methods are invoked
whenever the underlying operator receives a watermark.  I do believe that
batch and ingestion time applications receive watermarks. Seems the
programming model is more unified in that respect since 1.12 (FLIP-134).

6. The failure behavior is the same as for elements.

Thanks,
Eron

On Tue, May 25, 2021 at 12:42 PM Arvid Heise  wrote:

> Hi Eron,
>
> I think the FLIP is crisp and mostly good to go. Some smaller
> things/questions:
>
>1. SinkFunction#writeWatermark could be named
>SinkFunction#invokeWatermark or invokeOnWatermark to keep it symmetric.
>2. We could add the context parameter to both. For SinkWriter#Context,
>we currently do not gain much. SinkFunction#Context also exposes
> processing
>time, which may or may not be handy and is currently mostly used for
>StreamingFileSink bucket policies. We may add that processing time flag
>also to SinkWriter#Context in the future.
>3. Alternatively, we could also add a different context parameter just
>to keep the API stable while allowing additional information to be
> passed
>in the future.
>4. Would we run into any compatibility issue if we use Flink 1.13 source
>in Flink 1.14 (with this FLIP) or vice versa?
>5. What happens with sinks that use the new methods in applications that
>do not have watermarks (batch mode, processing time)? Does this also
> work
>with ingestion time sufficiently?
>6. How do exactly once sinks deal with written watermarks in case of
>failure? I guess it's the same as normal records. (Either rollback of
>transaction or deduplication on resumption)
>
> Best,
>
> Arvid
>
> On Tue, May 25, 2021 at 6:44 PM Eron Wright  .invalid>
> wrote:
>
> > Does anyone have further comment on FLIP-167?
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> >
> > Thanks,
> > Eron
> >
> >
> > On Thu, May 20, 2021 at 5:02 PM Eron Wright 
> > wrote:
> >
> > > Filed FLIP-167: Watermarks for Sink API:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> > >
> > > I'd like to call a vote next week, is that reasonable?
> > >
> > >
> > > On Wed, May 19, 2021 at 6:28 PM Zhou, Brian  wrote:
> > >
> > >> Hi Arvid and Eron,
> > >>
> > >> Thanks for the discussion and I read through Eron's pull request and I
> > >> think this can benefit Pravega Flink connector as well.
> > >>
> > >> Here is some background. Pravega had the watermark concept through the
> > >> event stream since two years ago, and here is a blog introduction[1]
> for
> > >> Pravega watermark.
> > >> Pravega Flink connector also had this watermark integration last year
> > >> that we wanted to propagate the Flink watermark to Pravega in the
> > >> SinkFunction, and at that time we just used the existing Flink API
> that
> > we
> > >> keep the last watermark in memory and check if watermark changes for
> > each
> > >> event[2] which is not efficient. With such new interface, we can also
> > >> manage the watermark propagation much more easily.
> > >>
> > >> [1] https://pravega.io/blog/2019/11/08/pravega-watermarking-support/
> > >> [2]
> > >>
> >
> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L465
> > >>
> > >> -Original Message-
> > >> From: Arvid Heise 
> > >> Sent: Wednesday, May 19, 2021 16:06
> > >> To: dev
> > >> Subject: Re: [DISCUSS] Watermark propagation with Sink API
> > >>
> > >>
> > >> [EXTERNAL EMAIL]
> > >>
> > >> Hi Eron,
> > >>
> > >> Thanks for pushing that topic. I can now see that the benefit is even
> > >> bigger than 

Re: [DISCUSS] Watermark propagation with Sink API

2021-05-25 Thread Eron Wright
Does anyone have further comment on FLIP-167?
https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API

Thanks,
Eron


On Thu, May 20, 2021 at 5:02 PM Eron Wright  wrote:

> Filed FLIP-167: Watermarks for Sink API:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
>
> I'd like to call a vote next week, is that reasonable?
>
>
> On Wed, May 19, 2021 at 6:28 PM Zhou, Brian  wrote:
>
>> Hi Arvid and Eron,
>>
>> Thanks for the discussion and I read through Eron's pull request and I
>> think this can benefit Pravega Flink connector as well.
>>
>> Here is some background. Pravega had the watermark concept through the
>> event stream since two years ago, and here is a blog introduction[1] for
>> Pravega watermark.
>> Pravega Flink connector also had this watermark integration last year
>> that we wanted to propagate the Flink watermark to Pravega in the
>> SinkFunction, and at that time we just used the existing Flink API that we
>> keep the last watermark in memory and check if watermark changes for each
>> event[2] which is not efficient. With such new interface, we can also
>> manage the watermark propagation much more easily.
>>
>> [1] https://pravega.io/blog/2019/11/08/pravega-watermarking-support/
>> [2]
>> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L465
>>
>> -Original Message-
>> From: Arvid Heise 
>> Sent: Wednesday, May 19, 2021 16:06
>> To: dev
>> Subject: Re: [DISCUSS] Watermark propagation with Sink API
>>
>>
>> [EXTERNAL EMAIL]
>>
>> Hi Eron,
>>
>> Thanks for pushing that topic. I can now see that the benefit is even
>> bigger than I initially thought. So it's worthwhile anyways to include that.
>>
>> I also briefly thought about exposing watermarks to all UDFs, but here I
>> really have an issue to see specific use cases. Could you maybe take a few
>> minutes to think about it as well? I could only see someone misusing Async
>> IO as a sink where a real sink would be more appropriate. In general, if
>> there is not a clear use case, we shouldn't add the functionality as it's
>> just increased maintenance for no value.
>>
>> If we stick to the plan, I think your PR is already in a good shape. We
>> need to create a FLIP for it though, since it changes Public interfaces
>> [1]. I was initially not convinced that we should also change the old
>> SinkFunction interface, but seeing how little the change is, I wouldn't
>> mind at all to increase consistency. Only when we wrote the FLIP and
>> approved it (which should be minimal and fast), we should actually look at
>> the PR ;).
>>
>> The only thing which I would improve is the name of the function.
>> processWatermark sounds as if the sink implementer really needs to
>> implement it (as you would need to do it on a custom operator). I would
>> make them symmetric to the record writing/invoking method (e.g.
>> writeWatermark and invokeWatermark).
>>
>> As a follow-up PR, we should then migrate KafkaShuffle to the new API.
>> But that's something I can do.
>>
>> [1]
>>
>> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/Flink*Improvement*Proposals__;Kys!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMnp6nc7o$
>> [cwiki[.]apache[.]org]
>>
>> On Wed, May 19, 2021 at 3:34 AM Eron Wright > .invalid>
>> wrote:
>>
>> > Update: opened an issue and a PR.
>> >
>> > https://urldefense.com/v3/__https://issues.apache.org/jira/browse/FLIN
>> > K-22700__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dM
>> > plbgRO4$ [issues[.]apache[.]org]
>> > https://urldefense.com/v3/__https://github.com/apache/flink/pull/15950
>> > __;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMtScmG7a
>> > $ [github[.]com]
>> >
>> >
>> > On Tue, May 18, 2021 at 10:03 AM Eron Wright 
>> > wrote:
>> >
>> > > Thanks Arvid and David for sharing your ideas on this subject.  I'm
>> > > glad to hear that you're seeing use cases for watermark propagation
>> > > via an enhanced sink interface.
>> > >
>> > > As you've guessed, my interest is in Pulsar and am exploring some
>> > > options for brokering watermarks across stream processing pipelines.
>> > > I think
>> > Arvid
>> > > is speaking to a high-fidelity solution where the difference between

Re: [DISCUSS] Watermark propagation with Sink API

2021-05-20 Thread Eron Wright
Filed FLIP-167: Watermarks for Sink API:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API

I'd like to call a vote next week, is that reasonable?


On Wed, May 19, 2021 at 6:28 PM Zhou, Brian  wrote:

> Hi Arvid and Eron,
>
> Thanks for the discussion and I read through Eron's pull request and I
> think this can benefit Pravega Flink connector as well.
>
> Here is some background. Pravega had the watermark concept through the
> event stream since two years ago, and here is a blog introduction[1] for
> Pravega watermark.
> Pravega Flink connector also had this watermark integration last year that
> we wanted to propagate the Flink watermark to Pravega in the SinkFunction,
> and at that time we just used the existing Flink API that we keep the last
> watermark in memory and check if watermark changes for each event[2] which
> is not efficient. With such new interface, we can also manage the watermark
> propagation much more easily.
>
> [1] https://pravega.io/blog/2019/11/08/pravega-watermarking-support/
> [2]
> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L465
>
> -Original Message-
> From: Arvid Heise 
> Sent: Wednesday, May 19, 2021 16:06
> To: dev
> Subject: Re: [DISCUSS] Watermark propagation with Sink API
>
>
> [EXTERNAL EMAIL]
>
> Hi Eron,
>
> Thanks for pushing that topic. I can now see that the benefit is even
> bigger than I initially thought. So it's worthwhile anyways to include that.
>
> I also briefly thought about exposing watermarks to all UDFs, but here I
> really have an issue to see specific use cases. Could you maybe take a few
> minutes to think about it as well? I could only see someone misusing Async
> IO as a sink where a real sink would be more appropriate. In general, if
> there is not a clear use case, we shouldn't add the functionality as it's
> just increased maintenance for no value.
>
> If we stick to the plan, I think your PR is already in a good shape. We
> need to create a FLIP for it though, since it changes Public interfaces
> [1]. I was initially not convinced that we should also change the old
> SinkFunction interface, but seeing how little the change is, I wouldn't
> mind at all to increase consistency. Only when we wrote the FLIP and
> approved it (which should be minimal and fast), we should actually look at
> the PR ;).
>
> The only thing which I would improve is the name of the function.
> processWatermark sounds as if the sink implementer really needs to
> implement it (as you would need to do it on a custom operator). I would
> make them symmetric to the record writing/invoking method (e.g.
> writeWatermark and invokeWatermark).
>
> As a follow-up PR, we should then migrate KafkaShuffle to the new API. But
> that's something I can do.
>
> [1]
>
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/Flink*Improvement*Proposals__;Kys!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMnp6nc7o$
> [cwiki[.]apache[.]org]
>
> On Wed, May 19, 2021 at 3:34 AM Eron Wright  .invalid>
> wrote:
>
> > Update: opened an issue and a PR.
> >
> > https://urldefense.com/v3/__https://issues.apache.org/jira/browse/FLIN
> > K-22700__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dM
> > plbgRO4$ [issues[.]apache[.]org]
> > https://urldefense.com/v3/__https://github.com/apache/flink/pull/15950
> > __;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMtScmG7a
> > $ [github[.]com]
> >
> >
> > On Tue, May 18, 2021 at 10:03 AM Eron Wright 
> > wrote:
> >
> > > Thanks Arvid and David for sharing your ideas on this subject.  I'm
> > > glad to hear that you're seeing use cases for watermark propagation
> > > via an enhanced sink interface.
> > >
> > > As you've guessed, my interest is in Pulsar and am exploring some
> > > options for brokering watermarks across stream processing pipelines.
> > > I think
> > Arvid
> > > is speaking to a high-fidelity solution where the difference between
> > intra-
> > > and inter-pipeline flow is eliminated.  My goal is more limited; I
> > > want
> > to
> > > write the watermark that arrives at the sink to Pulsar.  Simply
> > > imagine that Pulsar has native support for watermarking in its
> > > producer/consumer API, and we'll leave the details to another forum.
> > >
> > > David, I like your invariant.  I see lateness as stemming from the
> > problem
> > > domain and from system dynamics (e.g. scheduling, batching, lag).
> > > When
> 

Re: [DISCUSS] Watermark propagation with Sink API

2021-05-18 Thread Eron Wright
Update: opened an issue and a PR.

https://issues.apache.org/jira/browse/FLINK-22700
https://github.com/apache/flink/pull/15950


On Tue, May 18, 2021 at 10:03 AM Eron Wright 
wrote:

> Thanks Arvid and David for sharing your ideas on this subject.  I'm glad
> to hear that you're seeing use cases for watermark propagation via an
> enhanced sink interface.
>
> As you've guessed, my interest is in Pulsar and am exploring some options
> for brokering watermarks across stream processing pipelines.  I think Arvid
> is speaking to a high-fidelity solution where the difference between intra-
> and inter-pipeline flow is eliminated.  My goal is more limited; I want to
> write the watermark that arrives at the sink to Pulsar.  Simply imagine
> that Pulsar has native support for watermarking in its producer/consumer
> API, and we'll leave the details to another forum.
>
> David, I like your invariant.  I see lateness as stemming from the problem
> domain and from system dynamics (e.g. scheduling, batching, lag).  When one
> depends on order-of-observation to generate watermarks, the app may become
> unduly sensitive to dynamics which bear on order-of-observation.  My goal
> is to factor out the system dynamics from lateness determination.
>
> Arvid, to be most valuable (at least for my purposes) the enhancement is
> needed on SinkFunction.  This will allow us to easily evolve the existing
> Pulsar connector.
>
> Next step, I will open a PR to advance the conversation.
>
> Eron
>
> On Tue, May 18, 2021 at 5:06 AM David Morávek 
> wrote:
>
>> Hi Eron,
>>
>> Thanks for starting this discussion. I've been thinking about this
>> recently
>> as we've run into "watermark related" issues, when chaining multiple
>> pipelines together. My to cents to the discussion:
>>
>> How I like to think about the problem, is that there should an invariant
>> that holds for any stream processing pipeline: "NON_LATE element entering
>> the system, should never become LATE"
>>
>> Unfortunately this is exactly what happens in downstream pipelines,
>> because
>> the upstream one can:
>> - break ordering (especially with higher parallelism)
>> - emit elements that are ahead of output watermark
>>
>> There is not enough information to re-construct upstream watermark in
>> latter stages (it's always just an estimate based on previous pipeline's
>> output).
>>
>> It would be great, if we could have a general abstraction, that is
>> reusable
>> for various sources / sinks (not just Kafka / Pulsar, thought this would
>> probably cover most of the use-cases) and systems.
>>
>> Is there any other use-case then sharing watermark between pipelines, that
>> you're trying to solve?
>>
>> Arvid:
>>
>> 1. Watermarks are closely coupled to the used system (=Flink). I have a
>> > hard time imagining that it's useful to use a different stream processor
>> > downstream. So for now, I'm assuming that both upstream and downstream
>> are
>> > Flink applications. In that case, we probably define both parts of the
>> > pipeline in the same Flink job similar to KafkaStream's #through.
>> >
>>
>> I'd slightly disagree here. For example we're "materializing" change-logs
>> produced by Flink pipeline into serving layer (random access db / in
>> memory
>> view / ..) and we need to know, whether responses we serve meet the
>> "freshness" requirements (eg. you may want to respond differently, when
>> watermark is lagging way too much behind processing time). Also not every
>> stream processor in the pipeline needs to be Flink. It can as well be a
>> simple element-wise transformation that reads from Kafka and writes back
>> into separate topic (that's what we do for example with ML models, that
>> have special hardware requirements).
>>
>> Best,
>> D.
>>
>>
>> On Tue, May 18, 2021 at 8:30 AM Arvid Heise  wrote:
>>
>> > Hi Eron,
>> >
>> > I think this is a useful addition for storage systems that act as
>> > pass-through for Flink to reduce recovery time. It is only useful if you
>> > combine it with regional fail-over as only a small part of the pipeline
>> is
>> > restarted.
>> >
>> > A couple of thoughts on the implications:
>> > 1. Watermarks are closely coupled to the used system (=Flink). I have a
>> > hard time imagining that it's useful to use a different stream processor
>> > downstream. So for now, I'm assuming that both upstream and downstream
>> are
>> > Flink

[jira] [Created] (FLINK-22700) Propagate watermarks to Sink API

2021-05-18 Thread Eron Wright (Jira)
Eron Wright created FLINK-22700:
---

 Summary: Propagate watermarks to Sink API
 Key: FLINK-22700
 URL: https://issues.apache.org/jira/browse/FLINK-22700
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Eron Wright
Assignee: Eron Wright


Make it possible for sink functions / sink writers to propagate watermarks to 
external storage systems.

Note that sink functions obtain the current watermark upon receiving a record.  
This issue is about obtaining the watermark as it is received from upstream 
(i.e. not dependent on receipt of an event).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Watermark propagation with Sink API

2021-05-18 Thread Eron Wright
o need to be
> > encoded.
> > 5. It's important to have some way to transport backpressure from the
> > downstream to the upstream. Or else you would have the same issue as
> > KafkaStreams where two separate pipelines can drift so far away that you
> > experience data loss if the data retention period is smaller than the
> > drift.
> > 6. It's clear that you trade a huge chunk of throughput for lower overall
> > latency in case of failure. So it's an interesting feature for use cases
> > with SLAs.
> >
> > Since we are phasing out SinkFunction, I'd prefer to only support
> > SinkWriter. Having a no-op default sounds good to me.
> >
> > We have some experimental feature for Kafka [1], which pretty much
> reflects
> > your idea. Here we have an ugly workaround to be able to process the
> > watermark by using a custom StreamSink task. We could also try to create
> a
> > FLIP that abstracts the actual system away and then we could use the
> > approach for both Pulsar and Kafka.
> >
> > [1]
> >
> >
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java#L103
> >
> >
> > On Mon, May 17, 2021 at 10:44 PM Eron Wright
> >  wrote:
> >
> > > I would like to propose an enhancement to the Sink API, the ability to
> > > receive upstream watermarks.   I'm aware that the sink context provides
> > the
> > > current watermark for a given record.  I'd like to be able to write a
> > sink
> > > function that is invoked whenever the watermark changes.  Out of scope
> > > would be event-time timers (since sinks aren't keyed).
> > >
> > > For context, imagine that a stream storage system had the ability to
> > > persist watermarks in addition to ordinary elements, e.g. to serve as
> > > source watermarks in a downstream processor.  Ideally one could
> compose a
> > > multi-stage, event-driven application, with watermarks flowing
> end-to-end
> > > without need for a heuristics-based watermark at each stage.
> > >
> > > The specific proposal would be a new method on `SinkFunction` and/or on
> > > `SinkWriter`, called 'processWatermark' or 'writeWatermark', with a
> > default
> > > implementation that does nothing.
> > >
> > > Thoughts?
> > >
> > > Thanks!
> > > Eron Wright
> > > StreamNative
> > >
> >
>


-- 

Eron Wright   Cloud Engineering Lead

p: +1 425 922 8617 <18163542939>

streamnative.io |  Meet with me
<https://calendly.com/eronwright/regular-1-hour>

<https://github.com/streamnative>
<https://www.linkedin.com/company/streamnative/>
<https://twitter.com/streamnativeio/>


[DISCUSS] Watermark propagation with Sink API

2021-05-17 Thread Eron Wright
I would like to propose an enhancement to the Sink API, the ability to
receive upstream watermarks.   I'm aware that the sink context provides the
current watermark for a given record.  I'd like to be able to write a sink
function that is invoked whenever the watermark changes.  Out of scope
would be event-time timers (since sinks aren't keyed).

For context, imagine that a stream storage system had the ability to
persist watermarks in addition to ordinary elements, e.g. to serve as
source watermarks in a downstream processor.  Ideally one could compose a
multi-stage, event-driven application, with watermarks flowing end-to-end
without need for a heuristics-based watermark at each stage.

The specific proposal would be a new method on `SinkFunction` and/or on
`SinkWriter`, called 'processWatermark' or 'writeWatermark', with a default
implementation that does nothing.

Thoughts?

Thanks!
Eron Wright
StreamNative


Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2019-01-06 Thread Eron Wright
Thanks Timo for merging a couple of the PRs.   Are you also able to review
the others that I mentioned?  Xuefu I would like to incorporate your
feedback too.

Check out this short demonstration of using a catalog in SQL Client:
https://asciinema.org/a/C8xuAjmZSxCuApgFgZQyeIHuo

Thanks again!

On Thu, Jan 3, 2019 at 9:37 AM Eron Wright  wrote:

> Would a couple folks raise their hand to make a review pass thru the 6 PRs
> listed above?  It is a lovely stack of PRs that is 'all green' at the
> moment.   I would be happy to open follow-on PRs to rapidly align with
> other efforts.
>
> Note that the code is agnostic to the details of the ExternalCatalog
> interface; the code would not be obsolete if/when the catalog interface is
> enhanced as per the design doc.
>
>
>
> On Wed, Jan 2, 2019 at 1:35 PM Eron Wright  wrote:
>
>> I propose that the community review and merge the PRs that I posted, and
>> then evolve the design thru 1.8 and beyond.   I think having a basic
>> infrastructure in place now will accelerate the effort, do you agree?
>>
>> Thanks again!
>>
>> On Wed, Jan 2, 2019 at 11:20 AM Zhang, Xuefu 
>> wrote:
>>
>>> Hi Eron,
>>>
>>> Happy New Year!
>>>
>>> Thank you very much for your contribution, especially during the
>>> holidays. Wile I'm encouraged by your work, I'd also like to share my
>>> thoughts on how to move forward.
>>>
>>> First, please note that the design discussion is still finalizing, and
>>> we expect some moderate changes, especially around TableFactories. Another
>>> pending change is our decision to shy away from scala, which our work will
>>> be impacted by.
>>>
>>> Secondly, while your work seemed about plugging in catalogs definitions
>>> to the execution environment, which is less impacted by TableFactory
>>> change, I did notice some duplication of your work and ours. This is no big
>>> deal, but going forward, we should probable have a better communication on
>>> the work assignment so as to avoid any possible duplication of work. On the
>>> other hand, I think some of your work is interesting and valuable for
>>> inclusion once we finalize the overall design.
>>>
>>> Thus, please continue your research and experiment and let us know when
>>> you start working on anything so we can better coordinate.
>>>
>>> Thanks again for your interest and contributions.
>>>
>>> Thanks,
>>> Xuefu
>>>
>>>
>>>
>>> --
>>> From:Eron Wright 
>>> Sent At:2019 Jan. 1 (Tue.) 18:39
>>> To:dev ; Xuefu 
>>> Cc:Xiaowei Jiang ; twalthr ;
>>> piotr ; Fabian Hueske ;
>>> suez1224 ; Bowen Li 
>>> Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
>>>
>>> Hi folks, there's clearly some incremental steps to be taken to
>>> introduce catalog support to SQL Client, complementary to what is proposed
>>> in the Flink-Hive Metastore design doc.  I was quietly working on this over
>>> the holidays.   I posted some new sub-tasks, PRs, and sample code
>>> to FLINK-10744.
>>>
>>> What inspired me to get involved is that the catalog interface seems
>>> like a great way to encapsulate a 'library' of Flink tables and functions.
>>> For example, the NYC Taxi dataset (TaxiRides, TaxiFares, various UDFs) may
>>> be nicely encapsulated as a catalog (TaxiData).   Such a library should be
>>> fully consumable in SQL Client.
>>>
>>> I implemented the above.  Some highlights:
>>>
>>> 1. A fully-worked example of using the Taxi dataset in SQL Client via an
>>> environment file.
>>> - an ASCII video showing the SQL Client in action:
>>> https://asciinema.org/a/C8xuAjmZSxCuApgFgZQyeIHuo
>>>
>>> - the corresponding environment file (will be even more concise once
>>> 'FLINK-10696 Catalog UDFs' is merged):
>>> *https://github.com/EronWright/flink-training-exercises/blob/3be008d64be975ced0f1a7e3901a8c5353f72a7e/src/main/dist/conf/sql-client-defaults.yaml
>>> <https://github.com/EronWright/flink-training-exercises/blob/3be008d64be975ced0f1a7e3901a8c5353f72a7e/src/main/dist/conf/sql-client-defaults.yaml>*
>>>
>>> - the typed API for standalone table applications:
>>> *https://github.com/EronWright/flink-training-exercises/blob/3be008d64be975ced0f1a7e3901a8c5353f72a7e/src/main/java/com/dataartisans/flinktraining/examples/table_java/examples/ViaCatalog.java#L50
>>

Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2019-01-03 Thread Eron Wright
Would a couple folks raise their hand to make a review pass thru the 6 PRs
listed above?  It is a lovely stack of PRs that is 'all green' at the
moment.   I would be happy to open follow-on PRs to rapidly align with
other efforts.

Note that the code is agnostic to the details of the ExternalCatalog
interface; the code would not be obsolete if/when the catalog interface is
enhanced as per the design doc.



On Wed, Jan 2, 2019 at 1:35 PM Eron Wright  wrote:

> I propose that the community review and merge the PRs that I posted, and
> then evolve the design thru 1.8 and beyond.   I think having a basic
> infrastructure in place now will accelerate the effort, do you agree?
>
> Thanks again!
>
> On Wed, Jan 2, 2019 at 11:20 AM Zhang, Xuefu 
> wrote:
>
>> Hi Eron,
>>
>> Happy New Year!
>>
>> Thank you very much for your contribution, especially during the
>> holidays. Wile I'm encouraged by your work, I'd also like to share my
>> thoughts on how to move forward.
>>
>> First, please note that the design discussion is still finalizing, and we
>> expect some moderate changes, especially around TableFactories. Another
>> pending change is our decision to shy away from scala, which our work will
>> be impacted by.
>>
>> Secondly, while your work seemed about plugging in catalogs definitions
>> to the execution environment, which is less impacted by TableFactory
>> change, I did notice some duplication of your work and ours. This is no big
>> deal, but going forward, we should probable have a better communication on
>> the work assignment so as to avoid any possible duplication of work. On the
>> other hand, I think some of your work is interesting and valuable for
>> inclusion once we finalize the overall design.
>>
>> Thus, please continue your research and experiment and let us know when
>> you start working on anything so we can better coordinate.
>>
>> Thanks again for your interest and contributions.
>>
>> Thanks,
>> Xuefu
>>
>>
>>
>> --
>> From:Eron Wright 
>> Sent At:2019 Jan. 1 (Tue.) 18:39
>> To:dev ; Xuefu 
>> Cc:Xiaowei Jiang ; twalthr ;
>> piotr ; Fabian Hueske ;
>> suez1224 ; Bowen Li 
>> Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
>>
>> Hi folks, there's clearly some incremental steps to be taken to introduce
>> catalog support to SQL Client, complementary to what is proposed in the
>> Flink-Hive Metastore design doc.  I was quietly working on this over the
>> holidays.   I posted some new sub-tasks, PRs, and sample code
>> to FLINK-10744.
>>
>> What inspired me to get involved is that the catalog interface seems like
>> a great way to encapsulate a 'library' of Flink tables and functions.  For
>> example, the NYC Taxi dataset (TaxiRides, TaxiFares, various UDFs) may be
>> nicely encapsulated as a catalog (TaxiData).   Such a library should be
>> fully consumable in SQL Client.
>>
>> I implemented the above.  Some highlights:
>>
>> 1. A fully-worked example of using the Taxi dataset in SQL Client via an
>> environment file.
>> - an ASCII video showing the SQL Client in action:
>> https://asciinema.org/a/C8xuAjmZSxCuApgFgZQyeIHuo
>>
>> - the corresponding environment file (will be even more concise once
>> 'FLINK-10696 Catalog UDFs' is merged):
>> *https://github.com/EronWright/flink-training-exercises/blob/3be008d64be975ced0f1a7e3901a8c5353f72a7e/src/main/dist/conf/sql-client-defaults.yaml
>> <https://github.com/EronWright/flink-training-exercises/blob/3be008d64be975ced0f1a7e3901a8c5353f72a7e/src/main/dist/conf/sql-client-defaults.yaml>*
>>
>> - the typed API for standalone table applications:
>> *https://github.com/EronWright/flink-training-exercises/blob/3be008d64be975ced0f1a7e3901a8c5353f72a7e/src/main/java/com/dataartisans/flinktraining/examples/table_java/examples/ViaCatalog.java#L50
>> <https://github.com/EronWright/flink-training-exercises/blob/3be008d64be975ced0f1a7e3901a8c5353f72a7e/src/main/java/com/dataartisans/flinktraining/examples/table_java/examples/ViaCatalog.java#L50>*
>>
>> 2. Implementation of the core catalog descriptor and factory.  I realize
>> that some renames may later occur as per the design doc, and would be happy
>> to do that as a follow-up.
>> https://github.com/apache/flink/pull/7390
>>
>> 3. Implementation of a connect-style API on TableEnvironment to use
>> catalog descriptor.
>> https://github.com/apache/flink/pull/7392
>>
>> 4. Integration into SQL-Client's environment file:
&

[jira] [Created] (FLINK-11247) Fix DESCRIBE command to support catalog tables

2019-01-01 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-11247:


 Summary: Fix DESCRIBE command to support catalog tables
 Key: FLINK-11247
 URL: https://issues.apache.org/jira/browse/FLINK-11247
 Project: Flink
  Issue Type: Sub-task
Reporter: Eron Wright 


When the {{DESCRIBE}} command is applied to a catalog table, it fails with an 
error:

{code}
Flink SQL> DESCRIBE nyc.TaxiRides;  

  
[ERROR] Could not execute SQL statement. Reason:

  
org.apache.flink.table.api.TableException: Table 'nyc.TaxiRides' was not found.
{code}

The reason appears to be that {{LocalExecutor}} calls 
{{TableEnvironment::scan}} with the fully-qualified table name as a parameter 
(e.g. {{scan("nyc.TaxiRides")}}) rather than with an array of components (e.g. 
{{scan("nyc", "TaxiRides")}}).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2019-01-01 Thread Eron Wright
Hi folks, there's clearly some incremental steps to be taken to introduce
catalog support to SQL Client, complementary to what is proposed in the
Flink-Hive Metastore design doc.  I was quietly working on this over the
holidays.   I posted some new sub-tasks, PRs, and sample code
to FLINK-10744.

What inspired me to get involved is that the catalog interface seems like a
great way to encapsulate a 'library' of Flink tables and functions.  For
example, the NYC Taxi dataset (TaxiRides, TaxiFares, various UDFs) may be
nicely encapsulated as a catalog (TaxiData).   Such a library should be
fully consumable in SQL Client.

I implemented the above.  Some highlights:

1. A fully-worked example of using the Taxi dataset in SQL Client via an
environment file.
- an ASCII video showing the SQL Client in action:
https://asciinema.org/a/C8xuAjmZSxCuApgFgZQyeIHuo

- the corresponding environment file (will be even more concise once
'FLINK-10696 Catalog UDFs' is merged):
*https://github.com/EronWright/flink-training-exercises/blob/3be008d64be975ced0f1a7e3901a8c5353f72a7e/src/main/dist/conf/sql-client-defaults.yaml
*

- the typed API for standalone table applications:
*https://github.com/EronWright/flink-training-exercises/blob/3be008d64be975ced0f1a7e3901a8c5353f72a7e/src/main/java/com/dataartisans/flinktraining/examples/table_java/examples/ViaCatalog.java#L50
*

2. Implementation of the core catalog descriptor and factory.  I realize
that some renames may later occur as per the design doc, and would be happy
to do that as a follow-up.
https://github.com/apache/flink/pull/7390

3. Implementation of a connect-style API on TableEnvironment to use catalog
descriptor.
https://github.com/apache/flink/pull/7392

4. Integration into SQL-Client's environment file:
https://github.com/apache/flink/pull/7393

I realize that the overall Hive integration is still evolving, but I
believe that these PRs are a good stepping stone. Here's the list (in
bottom-up order):
- https://github.com/apache/flink/pull/7386
- https://github.com/apache/flink/pull/7388
- https://github.com/apache/flink/pull/7389
- https://github.com/apache/flink/pull/7390
- https://github.com/apache/flink/pull/7392
- https://github.com/apache/flink/pull/7393

Thanks and enjoy 2019!
Eron W


On Sun, Nov 18, 2018 at 3:04 PM Zhang, Xuefu 
wrote:

> Hi Xiaowei,
>
> Thanks for bringing up the question. In the current design, the properties
> for meta objects are meant to cover anything that's specific to a
> particular catalog and agnostic to Flink. Anything that is common (such as
> schema for tables, query text for views, and udf classname) are abstracted
> as members of the respective classes. However, this is still in discussion,
> and Timo and I will go over this and provide an update.
>
> Please note that UDF is a little more involved than what the current
> design doc shows. I'm still refining this part.
>
> Thanks,
> Xuefu
>
>
> --
> Sender:Xiaowei Jiang 
> Sent at:2018 Nov 18 (Sun) 15:17
> Recipient:dev 
> Cc:Xuefu ; twalthr ; piotr <
> pi...@data-artisans.com>; Fabian Hueske ; suez1224 <
> suez1...@gmail.com>
> Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
>
> Thanks Xuefu for the detailed design doc! One question on the properties
> associated with the catalog objects. Are we going to leave them completely
> free form or we are going to set some standard for that? I think that the
> answer may depend on if we want to explore catalog specific optimization
> opportunities. In any case, I think that it might be helpful for
> standardize as much as possible into strongly typed classes and use leave
> these properties for catalog specific things. But I think that we can do it
> in steps.
>
> Xiaowei
> On Fri, Nov 16, 2018 at 4:00 AM Bowen Li  wrote:
> Thanks for keeping on improving the overall design, Xuefu! It looks quite
>  good to me now.
>
>  Would be nice that cc-ed Flink committers can help to review and confirm!
>
>
>
>  One minor suggestion: Since the last section of design doc already touches
>  some new sql statements, shall we add another section in our doc and
>  formalize the new sql statements in SQL Client and TableEnvironment that
>  are gonna come along naturally with our design? Here are some that the
>  design doc mentioned and some that I came up with:
>
>  To be added:
>
> - USE  - set default catalog
> - USE  - set default schema
> - SHOW CATALOGS - show all registered catalogs
> - SHOW SCHEMAS [FROM catalog] - list schemas in the current default
> catalog or the specified catalog
> - DESCRIBE VIEW 

[jira] [Created] (FLINK-11241) Enhance TableEnvironment to connect to a catalog via a descriptor

2018-12-31 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-11241:


 Summary: Enhance TableEnvironment to connect to a catalog via a 
descriptor
 Key: FLINK-11241
 URL: https://issues.apache.org/jira/browse/FLINK-11241
 Project: Flink
  Issue Type: Sub-task
  Components: Table API  SQL
Reporter: Eron Wright 
Assignee: Eron Wright 
 Fix For: 1.8.0


Given FLINK-11240, extend {{TableEnvironment}} to connect to an external 
catalog via an {{ExternalCatalogDescriptor}}.   Consider extending the existing 
{{connect()}} method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11240) Implement external catalog factory and descriptor

2018-12-31 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-11240:


 Summary: Implement external catalog factory and descriptor
 Key: FLINK-11240
 URL: https://issues.apache.org/jira/browse/FLINK-11240
 Project: Flink
  Issue Type: Sub-task
  Components: Table API  SQL
Reporter: Eron Wright 
Assignee: Eron Wright 
 Fix For: 1.8.0


Similar to the efforts done in FLINK-8240 and FLINK-8866, implement 
descriptor-based loading of external catalogs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11239) Enhance SQL-Client to recursively list UDFs

2018-12-31 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-11239:


 Summary: Enhance SQL-Client to recursively list UDFs
 Key: FLINK-11239
 URL: https://issues.apache.org/jira/browse/FLINK-11239
 Project: Flink
  Issue Type: Sub-task
  Components: Table API  SQL
Reporter: Eron Wright 
 Fix For: 1.8.0


The SQL Client provides a "SHOW FUNCTIONS" to show all registered functions.  
Enhance it to show functions produced by an external catalog.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11238) Enhance SQL-Client to recursively list tables

2018-12-31 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-11238:


 Summary: Enhance SQL-Client to recursively list tables
 Key: FLINK-11238
 URL: https://issues.apache.org/jira/browse/FLINK-11238
 Project: Flink
  Issue Type: Sub-task
  Components: Table API  SQL
Reporter: Eron Wright 
Assignee: Eron Wright 
 Fix For: 1.8.0


The SQL Client provides a "SHOW TABLES" command.   Tables that are added via an 
external catalog should be listed (presently, only the root schema is listed).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11237) Enhance LocalExecutor to wrap TableEnvironment w/ user classloader

2018-12-31 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-11237:


 Summary: Enhance LocalExecutor to wrap TableEnvironment w/ user 
classloader
 Key: FLINK-11237
 URL: https://issues.apache.org/jira/browse/FLINK-11237
 Project: Flink
  Issue Type: Sub-task
Reporter: Eron Wright 
Assignee: Eron Wright 


The SQL Client's {{LocalExecutor}} calls into the table environment to execute 
queries, explain statements, and much more.   Any call that involves resolving 
a descriptor to a factory implementation must be wrapped in the user 
classloader.   Some of the calls already are wrapped (for resolving UDFs).  
With new functionality coming for resolving external catalogs with a 
descriptor, other call sites must be wrapped.

Note that the {{TableEnvironment}} resolves the tables defined within an 
external catalog lazily (at query time).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11234) ExternalTableCatalogBuilder unable to build a batch-only table

2018-12-30 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-11234:


 Summary: ExternalTableCatalogBuilder unable to build a batch-only 
table
 Key: FLINK-11234
 URL: https://issues.apache.org/jira/browse/FLINK-11234
 Project: Flink
  Issue Type: Bug
  Components: Table API  SQL, Tests
Reporter: Eron Wright 
Assignee: Eron Wright 


The `ExternalTableCatalogBuilder::supportsBatch` method should set `isBatch` to 
`true and `isStreaming` to `false`, but the logic is presently inverted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [jira] [Created] (FLINK-10225) Cannot access state from a empty taskmanager

2018-08-28 Thread Eron Wright
Thanks for posting this issue, I also recently saw this.  The cause appears
to be that the TaskExecutor's queryable state proxy knows only about jobs
that are using slot(s) from that executor.

Note FLINK-10117 calls for queryable state to be exposed via the REST
endpoint, which may indirectly address this issue.

On Mon, Aug 27, 2018 at 5:15 AM Pierre Zemb (JIRA)  wrote:

> Pierre Zemb created FLINK-10225:
> ---
>
>  Summary: Cannot access state from a empty taskmanager
>  Key: FLINK-10225
>  URL: https://issues.apache.org/jira/browse/FLINK-10225
>  Project: Flink
>   Issue Type: Bug
>   Components: Queryable State
> Affects Versions: 1.6.0, 1.5.3
>  Environment: 4tm and 1jm for now on 1.6.0
> Reporter: Pierre Zemb
>
>
> Hi!
>
> I've started to deploy a small Flink cluster (4tm and 1jm for now on
> 1.6.0), and deployed a small job on it. Because of the current load, job is
> completely handled by a single tm. I've created a small proxy that is using
> [QueryableStateClient|
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/queryablestate/client/QueryableStateClient.html]
> to access the current state. It is working nicely, except under certain
> circumstances. It seems to me that I can only access the state through a
> node that is holding a part of the job. Here's an example:
>  * job on tm1. Pointing QueryableStateClient to tm1. State accessible
>
>  * job still on tm1. Pointing QueryableStateClient to tm2 (for example).
> State inaccessible
>
>  * killing tm1, job is now on tm2. State accessible
>
>  * job still on tm2. Pointing QueryableStateClient to tm3. State
> inaccessible
>
>  * adding some parallelism to spread job on tm1 and tm2. Pointing
> QueryableStateClient to either tm1 and tm2 is working
>
>  * job still on tm1 and tm2. Pointing QueryableStateClient to tm3. State
> inaccessible
>
> When the state is inaccessible, I can see this (generated [here|
> https://github.com/apache/flink/blob/release-1.6/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java#L228]
> ):
>
> {{java.lang.RuntimeException: Failed request 0. Caused by:
> org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could
> not retrieve location of state=repo-status of
> job=3ac3bc00b2d5bc0752917186a288d40a. Potential reasons are: i) the state
> is not ready, or ii) the job does not exist. at
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
> at
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
> at
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
> at
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
> at
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)}}
>
>
>
> Went a bit through the (master branch) code. Class KvStateClientProxy is
> holding {color:#33}kvStateLocationOracle the key-value state location
> oracle for the given JobID. Here's the usage{color}{color:#33}:{color}
>
>
>  * {color:#33}updateKvStateLocationOracle() in
> registerQueryableState() (TaskExecutor.java){color}
>  * {color:#33}registerQueryableState() in associateWithJobManager()
> (TaskExecutor.java){color}
>  * {color:#33}associateWithJobManager in establishJobManagerConnection
> (TaskExecutor.java)
> {color}
>  * {color:#33}establishJobManagerConnection in
> jobManagerGainedLeadership (TaskExecutor.java)
> {color}
>  * {color:#33}jobManagerGainedLeadership in onRegistrationSuccess
> (JobLeaderService.java){color}
>
> {color:#33}It seems that the KvStateLocationOracle map is updated only
> when the task manager is part of the job. {color}
>
> {color:#33}For now, we are creating a List> and
> getting the first CompletableFuture.succeeded future, but that is a
> workaround.{color}
>
>
>
> --
> This message was sent by Atlassian JIRA
> (v7.6.3#76005)
>


Re: SQL Client Limitations

2018-08-25 Thread Eron Wright
I'd like to better understand how catalogs will work in SQL Client.   I
assume we'll be able to reference catalog classes from the environment file
(e.g. FLINK-9172).

Thanks

On Tue, Aug 21, 2018 at 4:56 AM Fabian Hueske  wrote:

> Hi Dominik,
>
> The SQL Client supports the same subset of SQL that you get with Java /
> Scala embedded queries.
> The documentation [1] covers all supported operations.
>
> There are some limitations because certain operators require special time
> attributes (row time or processing time attributes) which are monotonically
> increasing.
> Some operators such as a regular join (in contrast to a time-windowed join)
> remove the monotonicity property of time attributes such that time-based
> operations cannot be applied anymore.
>
> Best,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.6/dev/table/sql.html
>
>
>
> 2018-08-21 13:27 GMT+02:00 Till Rohrmann :
>
> > Hi Dominik,
> >
> > I think such a list would be really helpful. I've pulled Timo and Fabian
> > into this conversation because they probably know more.
> >
> > Cheers,
> > Till
> >
> > On Mon, Aug 20, 2018 at 12:43 PM Dominik Wosiński 
> > wrote:
> >
> >> Hey,
> >>
> >> Do we have any list of current limitations of SQL Client available
> >> somewhere or the only way is to go through JIRA issues?
> >>
> >> For example:
> >> I tried to make Group By Tumble Window and Inner Join in one query and
> it
> >> seems that it is not possible currently and I was wondering whether it's
> >> and issue with my query or known limitation.
> >>
> >> Thanks,
> >> Best Regards,
> >> Dominik.
> >>
> >
>


Re: [Discuss] FLIP-26 - SSL Mutual Authentication

2018-07-27 Thread Eron Wright
 Kubernetes setups, where exposing the BlobServer
> and
> > querying the blob port causes quite some friction.
> >
> >   - Treating queryable state as "internal connectivity" is fine for now.
> > We should treat it as "external" connectivity in the future if we move it
> > to HTTP/REST.
> >
> >
> > *Internal Connectivity and SSL Mutual Authentication*
> >
> > Simply activating SSL mutual authentication for the internal
> communication
> > is a really low hanging fruit.
> >
> > Activating client authentication for Akka, network stack Netty (and Blob
> > Server/Client in Flink 1.6) should require no change in the
> configurations
> > with respect to Flink 1.4. All processes are, with respect to internal
> > communication, simultaneously server and client endpoints. Because of
> that,
> > they already need KeyStore and TrustStore files for SSL handshakes, where
> > the TrustStore needs to trust the KeyStore Certificate.
> >
> > I personally favor the suggestion made to have a script that generates a
> > self-signed certificate and adds it to "conf" and updates the
> > configuration. That should be picked up by the Yarn and Mesos clients
> > anyways.
> >
> >
> > *External Connectivity*
> >
> > There is a huge surface area and I think we need to give users a way to
> > plug in their own tools.
> > From what I see (and after some discussions with Patrick and Gary) I
> think
> > it makes sense to look at proxies in a broad way, similar to the approach
> > Eron outlined.
> >
> > The basic approach could be like that:
> >
> >   - Everything goes through HTTPS, so the proxy can work with HTTP
> headers.
> >   - The proxy handles authentication and possibly authorization. The
> proxy
> > adds some header, for example a user name, a group id, an authorization
> > token.
> >   - Flink can configure an implementation of an 'authorizer' or validator
> > on the headers to decide whether the request is valid.
> >
> >   - Example 1: The proxy does authentication and adds the user name /
> > group as a header. The the Flink-side authorizer simply checks whether
> the
> > name is in the config (simple ACL-style) scheme.
> >   - Example 2: The proxy adds an JSON Web Token and the authorizer
> > validates that token.
> >
> > For secure connections between the Proxy and the Flink Endpoint I would
> > follow Eron's suggestion, to use separate KeyStores and TrustStores than
> > for internal communication.
> >
> > For Yarn and Mesos, I would like to see if we could handle those again as
> > a special case of the proxies above:
> >   - DCOS Admin Router forwards the user authentication token, so that
> > could be another authorizer implementation.
> >   - In YARN we could see if can implement the IP filter via such an
> > authorizer.
> >
> >
> > *Hostname Verification*
> >
> > For internal communication, and especially on dynamic environments like
> > Kubernetes, it is very hard to work with certificates and have hostname
> > verification on.
> >
> > If we assume internal communication works strictly with a shared secret
> > certificate and with client authentication, does hostname verification
> > actually still add security in that particular setup? My understanding
> was
> > that hostname verification is important to not have some valid
> certificate
> > presented, but the one bound to the server you want to talk to. If we
> have
> > anyways one trusted certificate only, isn't that already implied?
> >
> > On the other hand, it is still possible (and potentially valuable) for
> > users in standalone mode to use keystores and truststores from a PKI, in
> > which case there may still be an argument in favor of hostname
> verification.
> >
> > On Thu, May 10, 2018, 02:30 Eron Wright  wrote:
> >
> >> Hello,
> >>
> >> Given that some SSL enhancement bugs have been posted lately, I took
> some
> >> time to revise FLIP-26 which explores how to harden both external and
> >> internal communication.
> >>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80453255
> >>
> >> Some recent related issues:
> >> - FLINK-9312 - mutual auth for intra-cluster communication
> >> - FLINK-5030 - original SSL feature work
> >>
> >> There's also some recent discussion of how to use Flink SSL effectively
> in
> >> a Kubernetes environment.   The issue is about hostname verification.
> The
> >> proposal that I've put forward in FLIP-26 is to not use hostname
> >> verification for intra-cluster communication, but rather to rely in a
> >> cluster-internal certificate and a truststore consisting only of that
> >> certificate.   Meanwhile, a new "external" certificate would be
> >> configurable for the web/api endpoint and associated with a well-known
> DNS
> >> name as provided by a K8s Service resource.
> >>
> >> Stephan is this in-line with your thinking re FLINK-9312?
> >>
> >> Thanks
> >> Eron
> >>
> >
>


[Discuss] FLIP-26 - SSL Mutual Authentication

2018-05-09 Thread Eron Wright
Hello,

Given that some SSL enhancement bugs have been posted lately, I took some
time to revise FLIP-26 which explores how to harden both external and
internal communication.

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80453255

Some recent related issues:
- FLINK-9312 - mutual auth for intra-cluster communication
- FLINK-5030 - original SSL feature work

There's also some recent discussion of how to use Flink SSL effectively in
a Kubernetes environment.   The issue is about hostname verification.  The
proposal that I've put forward in FLIP-26 is to not use hostname
verification for intra-cluster communication, but rather to rely in a
cluster-internal certificate and a truststore consisting only of that
certificate.   Meanwhile, a new "external" certificate would be
configurable for the web/api endpoint and associated with a well-known DNS
name as provided by a K8s Service resource.

Stephan is this in-line with your thinking re FLINK-9312?

Thanks
Eron


[jira] [Created] (FLINK-9234) Commons Logging is missing from shaded Flink Table library

2018-04-22 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-9234:
---

 Summary: Commons Logging is missing from shaded Flink Table library
 Key: FLINK-9234
 URL: https://issues.apache.org/jira/browse/FLINK-9234
 Project: Flink
  Issue Type: Bug
  Components: Table API  SQL
Affects Versions: 1.4.2
 Environment: jdk1.8.0_172
flink 1.4.2
Mac High Sierra
Reporter: Eron Wright 
 Attachments: repro.scala

The flink-table shaded library seems to be missing some classes from 
{{org.apache.commons.logging}} that are required by 
{{org.apache.commons.configuration}}.  Ran into the problem while using the 
external catalog support, on Flink 1.4.2.

See attached a repro, which produces:
{code}
Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/flink/table/shaded/org/apache/commons/logging/Log
at 
org.apache.flink.table.catalog.ExternalTableSourceUtil$.parseScanPackagesFromConfigFile(ExternalTableSourceUtil.scala:153)
at 
org.apache.flink.table.catalog.ExternalTableSourceUtil$.(ExternalTableSourceUtil.scala:55)
at 
org.apache.flink.table.catalog.ExternalTableSourceUtil$.(ExternalTableSourceUtil.scala)
at 
org.apache.flink.table.catalog.ExternalCatalogSchema.getTable(ExternalCatalogSchema.scala:78)
at 
org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:82)
at 
org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:256)
at 
org.apache.calcite.jdbc.CalciteSchema$SchemaPlusImpl.getTable(CalciteSchema.java:561)
at 
org.apache.flink.table.api.TableEnvironment.scanInternal(TableEnvironment.scala:497)
at 
org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:485)
at Repro$.main(repro.scala:17)
at Repro.main(repro.scala)
{code}

Dependencies:
{code}
compile 'org.slf4j:slf4j-api:1.7.25'
compile 'org.slf4j:slf4j-log4j12:1.7.25'
runtime 'log4j:log4j:1.2.17'

compile 'org.apache.flink:flink-scala_2.11:1.4.2'
compile 'org.apache.flink:flink-streaming-scala_2.11:1.4.2'
compile 'org.apache.flink:flink-clients_2.11:1.4.2'
compile 'org.apache.flink:flink-table_2.11:1.4.2'
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] Flink security improvements

2018-03-21 Thread Eron Wright
Please accept my apologies also.

On Mon, Mar 19, 2018 at 2:46 AM, Till Rohrmann  wrote:

> Hi Shuyi,
>
> sorry for the unresponsiveness on your proposal. Currently, the community
> is strongly focused on fixing and testing Flink 1.5 so that we can release
> it soon. My gut feeling is that the community will pick up the security
> improvements thread once most of the blocking issues are resolved. Please
> bear with us until then.
>
> Cheers,
> Till
>
> On Fri, Mar 9, 2018 at 1:05 AM, Shuyi Chen  wrote:
>
> > Ping :)
> >
> > On Wed, Feb 21, 2018 at 7:16 PM, Shuyi Chen  wrote:
> >
> > > Hi Eron, thanks a lot for taking a look at the proposal, the comments
> are
> > > very useful. I've updated the document to address your concerns. Could
> > you
> > > please help take another look, and suggest what the next step is?
> Highly
> > > appreciated.
> > >
> > > Shuyi
> > >
> > > On Thu, Feb 15, 2018 at 4:19 AM, Shuyi Chen 
> wrote:
> > >
> > >> Hi community,
> > >>
> > >> I would like to propose a few improvements in Flink security regarding
> > >> scalability and extensibility. Here is the proposal:
> > >>
> > >> https://docs.google.com/document/d/10V7LiNlUJKeKZ58mkR7oVv1t
> > >> 6BrC6TZi3FGf2Dm6-i8/edit?usp=sharing
> > >>
> > >> Comments are highly appreciated. Please let me know what the next step
> > >> will be.
> > >>
> > >> Thanks a lot
> > >> Shuyi
> > >>
> > >> --
> > >> "So you have to trust that the dots will somehow connect in your
> > future."
> > >>
> > >
> > >
> > >
> > > --
> > > "So you have to trust that the dots will somehow connect in your
> future."
> > >
> >
> >
> >
> > --
> > "So you have to trust that the dots will somehow connect in your future."
> >
>


Re: FLIP-6 / Kubernetes

2018-03-21 Thread Eron Wright
Thanks Stephan for the information, it's exciting.   This is an aspect for
which the FLIP-6 design doc is lacking detail; somewhere we should work
through how this will work (e.g. from a job recovery and upgrade
perspective).   If there's a doc or an open issue, please share, thanks!

On Wed, Mar 21, 2018 at 5:41 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi Christophe!
>
> I could imagine 1.5.1 or 1.6.
>
> It is important to point out that you can use Flink with Kubernetes quite
> well already. That addition is mainly intended to make it more idiomatic
> for Kubernetes  users.
>
> Best,
> Stephan
>
>
> On Wed, Mar 21, 2018 at 12:03 PM, Christophe Jolif <cjo...@gmail.com>
> wrote:
>
> > Hi Stephan,
> >
> > > This feature did not make it for 1.5, but should come very
> > soon after.
> >
> > Are you talking about a 1.5.1 here? What is the envisioned timeline?
> >
> > Thanks!
> >
> >
> > On Tue, Mar 20, 2018 at 9:30 PM, Stephan Ewen <se...@apache.org> wrote:
> >
> > > @Eron That is definitely the way we want to suggest as the way to use
> k8s
> > > in the future. This feature did not make it for 1.5, but should come
> very
> > > soon after.
> > >
> > > @Thomas An implementation of a ResourceManager for k8s should come in
> the
> > > near future. Would be happy to jump on a joint FLIP, after the 1.5
> > release
> > > and Flink Forward (in three weeks or so).
> > >
> > > On Tue, Mar 20, 2018 at 4:25 PM, Eron Wright <eronwri...@gmail.com>
> > wrote:
> > >
> > > >  Till, is it possible to package a Flink application as a
> > self-contained
> > > > deployment on Kubernetes?  I mean, run a Flink application using
> 'flink
> > > > run' such that it launches its own RM/JM and waits for a sufficient #
> > of
> > > > TMs to join?
> > > >
> > > > Thanks!
> > > >
> > > > On Mon, Mar 19, 2018 at 2:57 AM, Till Rohrmann <trohrm...@apache.org
> >
> > > > wrote:
> > > >
> > > > > I forgot to add Flink's K8 documentation [1] which might also be
> > > helpful
> > > > > with getting started.
> > > > >
> > > > > [1]
> > > > > https://ci.apache.org/projects/flink/flink-docs-
> > master/ops/deployment/
> > > > > kubernetes.html
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Mon, Mar 19, 2018 at 10:54 AM, Till Rohrmann <
> > trohrm...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Hi Thomas,
> > > > > >
> > > > > > I think the one way to get started would be to adapt the Flink
> > docker
> > > > > > images [1,2] to run with Flink 1.5. Per default, they will use
> the
> > > > Flip-6
> > > > > > components.
> > > > > >
> > > > > > Flink 1.5 won't come with a dedicated integration with Kubernetes
> > > which
> > > > > is
> > > > > > able to start new pods. However, it should work that you manually
> > or
> > > by
> > > > > the
> > > > > > virtues of an external system start new pods which can be used.
> > Once
> > > > the
> > > > > > new pods have been started and the TaskExecutors have registered
> > one
> > > > > would
> > > > > > have to rescale the job manually to the new parallelism.
> > > > > >
> > > > > > [1] https://flink.apache.org/news/2017/05/16/official-docker-
> > > > image.html
> > > > > > [2] https://hub.docker.com/r/_/flink/
> > > > > >
> > > > > > Cheers,
> > > > > > Till
> > > > > >
> > > > > > On Sun, Mar 18, 2018 at 9:05 PM, Thomas Weise <t...@apache.org>
> > > wrote:
> > > > > >
> > > > > >> Hi,
> > > > > >>
> > > > > >> What would be a good starting point to try out Flink on
> Kubernetes
> > > > (any
> > > > > >> examples / tutorials)?
> > > > > >>
> > > > > >> Also, will the FLIP-6 work in 1.5 enable dynamic scaling on
> > > > Kubernetes?
> > > > > >>
> > > > > >> Thanks,
> > > > > >> Thomas
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > Christophe
> >
>


Re: FLIP-6 / Kubernetes

2018-03-20 Thread Eron Wright
 Till, is it possible to package a Flink application as a self-contained
deployment on Kubernetes?  I mean, run a Flink application using 'flink
run' such that it launches its own RM/JM and waits for a sufficient # of
TMs to join?

Thanks!

On Mon, Mar 19, 2018 at 2:57 AM, Till Rohrmann  wrote:

> I forgot to add Flink's K8 documentation [1] which might also be helpful
> with getting started.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/
> kubernetes.html
>
> Cheers,
> Till
>
> On Mon, Mar 19, 2018 at 10:54 AM, Till Rohrmann 
> wrote:
>
> > Hi Thomas,
> >
> > I think the one way to get started would be to adapt the Flink docker
> > images [1,2] to run with Flink 1.5. Per default, they will use the Flip-6
> > components.
> >
> > Flink 1.5 won't come with a dedicated integration with Kubernetes which
> is
> > able to start new pods. However, it should work that you manually or by
> the
> > virtues of an external system start new pods which can be used. Once the
> > new pods have been started and the TaskExecutors have registered one
> would
> > have to rescale the job manually to the new parallelism.
> >
> > [1] https://flink.apache.org/news/2017/05/16/official-docker-image.html
> > [2] https://hub.docker.com/r/_/flink/
> >
> > Cheers,
> > Till
> >
> > On Sun, Mar 18, 2018 at 9:05 PM, Thomas Weise  wrote:
> >
> >> Hi,
> >>
> >> What would be a good starting point to try out Flink on Kubernetes (any
> >> examples / tutorials)?
> >>
> >> Also, will the FLIP-6 work in 1.5 enable dynamic scaling on Kubernetes?
> >>
> >> Thanks,
> >> Thomas
> >>
> >
> >
>


Re: Timestamp/watermark support in Kinesis consumer

2018-02-12 Thread Eron Wright
It is valuable to consider the behavior of a consumer in both a real-time
processing context, which consists mostly of tail reads, and a historical
processing context, where there's an abundance of backlogged data.   In the
historical processing context, system internals (e.g. shard selection
logic) have an outsized influence on the order of observation and
potentially the progression of the event time clock.  In a real-time
context, the order of observation is, by definition, mostly determined by
the order in which events are produced.

My point is, it would be good to explore the efficacy of these improvements
in both contexts.




On Mon, Feb 12, 2018 at 5:10 PM, Thomas Weise <t...@apache.org> wrote:

> I don't think there is a generic solution to the problem you are
> describing; we don't know how long it will take for resharding to take
> effect and those changes to become visible to the connector. Depending on
> how latency sensitive the pipeline is, possibly a configurable watermark
> hold period could be used to cushion the event time chaos introduced by
> resharding.
>
> This isn't the primary motivation for the connector customization I'm
> working on though. We face issues with restart from older checkpoints where
> parent and child shards are consumed in parallel.
>
>
> --
> sent from mobile
>
>
> On Feb 12, 2018 4:36 PM, "Eron Wright" <eronwri...@gmail.com> wrote:
>
> I'd like to know how you envision dealing with resharding in relation to
> the watermark state.   Imagine that a given shard S1 has a watermark of T1,
> and is then split into two shards S2 and S3.   The new shards are assigned
> to subtasks according to a hash function.  The current watermarks of those
> subtasks could be far ahead of T1, and thus the events in S2/S3 will be
> considered late.
>
> The problem of a chaotic event time clock is exacerbated by any source that
> uses dynamic partitioning.  Would a per-shard watermark generator really
> solve the problem that is motivating you?
>
> Thanks,
> Eron
>
> On Mon, Feb 12, 2018 at 10:35 AM, Thomas Weise <t...@apache.org> wrote:
>
> > Based on my draft implementation, the changes that are needed in the
> Flink
> > connector are as follows:
> >
> > I need to be able to override the following to track last record
> timestamp
> > and idle time per shard.
> >
> > protected final void emitRecordAndUpdateState(T record, long
> > recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber)
> {
> > synchronized (checkpointLock) {
> > sourceContext.collectWithTimestamp(record, recordTimestamp);
> > updateState(shardStateIndex, lastSequenceNumber);
> > }
> > }
> >
> > Any objection removing final from it?
> >
> > Also, why is sourceContext.collectWithTimestamp in the synchronized
> block?
> > My custom class will need to emit watermarks - I assume there is no need
> to
> > acquire checkpointLock for that? Otherwise I would also need to add
> > emitWatermark() to the base class.
> >
> > Let me know if anything else should be considered, I will open a JIRA and
> > PR otherwise.
> >
> > Thanks,
> > Thomas
> >
> >
> > On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise <t...@apache.org> wrote:
> >
> > > -->
> > >
> > > On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai <
> tzuli...@apache.org
> > >
> > > wrote:
> > >
> > >> Regarding the two hooks you would like to be available:
> > >>
> > >>
> > >>- Provide hook to override discovery (not to hit Kinesis from every
> > >>subtask)
> > >>
> > >> Yes, I think we can easily provide a way, for example setting -1 for
> > >> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
> > >> Though, the user would then have to savepoint and restore in order to
> > >> pick up new shards after a Kinesis stream reshard (which is in
> practice
> > the
> > >> best way to by-pass the Kinesis API rate limitations).
> > >> +1 to provide that.
> > >>
> > >
> > > I'm considering a customization of KinesisDataFetcher with override for
> > > discoverNewShardsToSubscribe. We still want shards to be discovered,
> just
> > > not by hitting Kinesis from every subtask.
> > >
> > >
> > >>
> > >>
> > >>- Provide hook to support custom watermark generation (somewhere
> > >>around KinesisDataFetcher.emitRecordAndUpdateState)
> > &

Re: Timestamp/watermark support in Kinesis consumer

2018-02-12 Thread Eron Wright
I'd like to know how you envision dealing with resharding in relation to
the watermark state.   Imagine that a given shard S1 has a watermark of T1,
and is then split into two shards S2 and S3.   The new shards are assigned
to subtasks according to a hash function.  The current watermarks of those
subtasks could be far ahead of T1, and thus the events in S2/S3 will be
considered late.

The problem of a chaotic event time clock is exacerbated by any source that
uses dynamic partitioning.  Would a per-shard watermark generator really
solve the problem that is motivating you?

Thanks,
Eron

On Mon, Feb 12, 2018 at 10:35 AM, Thomas Weise  wrote:

> Based on my draft implementation, the changes that are needed in the Flink
> connector are as follows:
>
> I need to be able to override the following to track last record timestamp
> and idle time per shard.
>
> protected final void emitRecordAndUpdateState(T record, long
> recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
> synchronized (checkpointLock) {
> sourceContext.collectWithTimestamp(record, recordTimestamp);
> updateState(shardStateIndex, lastSequenceNumber);
> }
> }
>
> Any objection removing final from it?
>
> Also, why is sourceContext.collectWithTimestamp in the synchronized block?
> My custom class will need to emit watermarks - I assume there is no need to
> acquire checkpointLock for that? Otherwise I would also need to add
> emitWatermark() to the base class.
>
> Let me know if anything else should be considered, I will open a JIRA and
> PR otherwise.
>
> Thanks,
> Thomas
>
>
> On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise  wrote:
>
> > -->
> >
> > On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai  >
> > wrote:
> >
> >> Regarding the two hooks you would like to be available:
> >>
> >>
> >>- Provide hook to override discovery (not to hit Kinesis from every
> >>subtask)
> >>
> >> Yes, I think we can easily provide a way, for example setting -1 for
> >> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
> >> Though, the user would then have to savepoint and restore in order to
> >> pick up new shards after a Kinesis stream reshard (which is in practice
> the
> >> best way to by-pass the Kinesis API rate limitations).
> >> +1 to provide that.
> >>
> >
> > I'm considering a customization of KinesisDataFetcher with override for
> > discoverNewShardsToSubscribe. We still want shards to be discovered, just
> > not by hitting Kinesis from every subtask.
> >
> >
> >>
> >>
> >>- Provide hook to support custom watermark generation (somewhere
> >>around KinesisDataFetcher.emitRecordAndUpdateState)
> >>
> >> Per-partition watermark generation on the Kinesis side is slightly more
> >> complex than Kafka, due to how Kinesis’s dynamic resharding works.
> >> I think we need to additionally allow new shards to be consumed only
> >> after its parent shard is fully read, otherwise “per-shard time
> >> characteristics” can be broken because of this out-of-orderness
> consumption
> >> across the boundaries of a closed parent shard and its child.
> >> There theses JIRAs [1][2] which has a bit more details on the topic.
> >> Otherwise, in general I’m also +1 to providing this also in the Kinesis
> >> consumer.
> >>
> >
> > Here I'm thinking to customize emitRecordAndUpdateState (method would
> need
> > to be made non-final). Using getSubscribedShardsState with additional
> > transient state to keep track of watermark per shard and emit watermark
> as
> > appropriate.
> >
> > That's the idea - haven't written any code for it yet.
> >
> > Thanks,
> > Thomas
> >
> >
>


[jira] [Created] (FLINK-8541) Mesos RM should recover from failover timeout

2018-01-31 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-8541:
---

 Summary: Mesos RM should recover from failover timeout
 Key: FLINK-8541
 URL: https://issues.apache.org/jira/browse/FLINK-8541
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management, Mesos
Affects Versions: 1.3.0
Reporter: Eron Wright 
Assignee: Eron Wright 


When a framework disconnects unexpectedly from Mesos, the framework's Mesos 
tasks continue to run for a configurable period of time known as the failover 
timeout.   If the framework reconnects to Mesos after the timeout has expired, 
Mesos rejects the connection attempt.   It is expected that the framework 
discard the previous framework ID and then connect as a new framework.

When Flink is in this situation, the only recourse is to manually delete the ZK 
state where the framework ID kept.   Let's improve the logic of the Mesos RM to 
automate that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-01-31 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-8533:
---

 Summary: Support MasterTriggerRestoreHook state reinitialization
 Key: FLINK-8533
 URL: https://issues.apache.org/jira/browse/FLINK-8533
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.3.0
Reporter: Eron Wright 
Assignee: Eron Wright 


{{MasterTriggerRestoreHook}} enables coordination with an external system for 
taking or restoring checkpoints. When execution is restarted from a checkpoint, 
{{restoreCheckpoint}} is called to restore or reinitialize the external system 
state. There's an edge case where the external state is not adequately 
reinitialized, that is when execution fails _before the first checkpoint_. In 
that case, the hook is not invoked and has no opportunity to restore the 
external state to initial conditions.

The impact is a loss of exactly-once semantics in this case. For example, in 
the Pravega source function, the reader group state (e.g. stream position data) 
is stored externally. In the normal restore case, the reader group state is 
forcibly rewound to the checkpointed position. In the edge case where no 
checkpoint has yet been successful, the reader group state is not rewound and 
consequently some amount of stream data is not reprocessed.

A possible fix would be to introduce an {{initializeState}} method on the hook 
interface. Similar to {{CheckpointedFunction::initializeState}}, this method 
would be invoked unconditionally upon hook initialization. The Pravega hook 
would, for example, initialize or forcibly reinitialize the reader group state. 
   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Time-stamp of output from a window operator

2017-12-22 Thread Eron Wright
That is correct, the window operator happily accepts late elements into the
applicable window(s).  Though the element is late (with respect to the
current watermark) it might still be included in the main firing, or
provoke a late firing.

Elements occurring downstream of an operator that uses late firing must
undergo de-duplication by whatever means.  Other solutions: leverage the
side output feature of the window operator, or be more conservative with
your watermarks (decreasing complexity at the expense of latency).

-Eron

On Fri, Dec 22, 2017 at 3:59 PM, Jagadish  wrote:

> Thanks for the super-helpful reply!
>
> Would the late-firing from the "hourly" window be considered an on-time
> arrival for the subsequent "daily" window operator?   Since, its windowing
> duration is much longer (1 day), I'd expect the "on-time/default" result
> emitted from the "daily" window to include the late-firings previous hourly
> windows?
>
>
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>


Re: Flink-Yarn-Kerberos integration

2017-12-22 Thread Eron Wright
I agree that it is reasonable to use Hadoop DTs as you describe.  That
approach is even recommended in YARN's documentation (see Securing
Long-lived YARN Services on the YARN Application Security page).   But one
of the goals of Kerberos integration is to support Kerberized data access
for connectors other than HDFS, such as Kafka, Cassandra, and
Elasticsearch.   So your second point makes sense too, suggesting a general
architecture for managing secrets (DTs, keytabs, certificates, oauth
tokens, etc.) within the cluster.

There's quite a few aspects to Flink security, including:
1. data access (e.g. how a connector authenticates to a data source)
2. service authorization and network security (e.g. how a Flink cluster
protects itself from unauthorized access)
3. multi-user support (e.g. multi-user Flink clusters, RBAC)

I mention these aspects to clarify your point about AuthN, which I took to
be related to (1).   Do tell if I misunderstood.

Eron


On Wed, Dec 20, 2017 at 11:21 AM, Shuyi Chen  wrote:

> Hi community,
>
> We are working on secure Flink on YARN. The current Flink-Yarn-Kerberos
> integration will require each container of a job to log in Kerberos via
> keytab every say, 24 hours, and does not use any Hadoop delegation token
> mechanism except when localizing the container. As I fixed the current
> Flink-Yarn-Kerberos (FLINK-8275) and tried to add more
> features(FLINK-7860), I have some concern regarding the current
> implementation. It can pose a scalability issue to the KDC, e.g., if YARN
> cluster is restarted and all 10s of thousands of containers suddenly DDOS
> KDC.
>
> I would like to propose to improve the current Flink-YARN-Kerberos
> integration by doing something like the following:
> 1) AppMaster (JobManager) periodically authenticate the KDC, get all
> required DTs for the job.
> 2) all other TM or TE containers periodically retrieve new DTs from the
> AppMaster (either through a secure HDFS folder, or a secure Akka channel)
>
> Also, we want to extend Flink to support pluggable AuthN mechanism, because
> we have our own internal AuthN mechanism. We would like add support in
> Flink to authenticate periodically to our internal AuthN service as well
> through, e.g., dynamic class loading, and use similar mechanism to
> distribute the credential from the appMaster to containers.
>
> I would like to get comments and feedbacks. I can also write a design doc
> or create a Flip if needed. Thanks a lot.
>
> Shuyi
>
>
>
> --
> "So you have to trust that the dots will somehow connect in your future."
>


Re: Time-stamp of output from a window operator

2017-12-22 Thread Eron Wright
Each event in the stream has an associated timestamp as metadata.  A
timestamp assigner simply extracts the timestamp from the user object for
that purpose.

There is no per-operator watermark, but with
`assignTimestampsAndWatermarks` you may insert an operator that overrides
upstream watermarks.  Watermarks are markers that flow through the stream
to inform operators about the progression of time.   Time-sensitive
operators (e.g. window operators) use the watermark to take action at
specific points in time, such as at the time boundary of a window.

When a window function is invoked for a given window, any elements emitted
by the function take the 'max timestamp' of the window.   For example, if a
given window spans 11:00 to 12:00, any elements produced by the window
function will have a timestamp of 12:00.   The window function is fired
when the current time (as indicated by the watermark) reaches 12:00.   Now,
imagine that an event arrives after 12:00 that has a timestamp of, say
11:55.   That record would be considered late, but logically still belongs
in the 11:00-12:00 window.   Assuming the window operator was configured
with allowed lateness of at least 5 minutes, the window would be
re-evaluated, and any elements produced would have a timestamp of 12:00.

You can chain together window operators.  For example, we described an
"hourly" window above.  A subsequent "daily" window could further aggregate
the results of each hour.If a late firing were to occur in the "hourly"
window operator, the subsequent "daily" window operator would observe a
late element and apply its own lateness logic.

Hope this helps!
Eron



On Wed, Dec 20, 2017 at 11:10 PM, Jagadish Venkatraman <
jagadish1...@gmail.com> wrote:

> Hey Flink developers,
>
> I'm trying to understand the behavior of timestamp assignments in Apache
> Flink.
>
> Here's my current understanding:
>
> 1. A *TimestampAssigner* returns the timestamp of each incoming message,
> while a *WatermarkAssigner* emits the watermark for a source. (If
> per-operator watermarks are defined, they over-ride the watermarks
> generated from the source)
> 2. For each operator, its watermark is determined as the minimum of all the
> incoming watermarks from its prior operators.
>
> I was not sure on how the *timestamp* of the output from an operator is
> determined? For instance, let's say, for a pane emitted from an
> *EventTimeTumblingWindow*, what is its "timestamp"?
>
> What is the timestamp of a late-arrival?
>
> If I perform another EventTime window, on the output from a previous
> window, is it defined?
>
> If there are pointers to the source code I need to look at, I'd appreciate
> that as well. Thank you for the help.
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University
>


Re: [DISCUSS] Service Authorization (SSL client authentication)

2017-12-19 Thread Eron Wright
Folks, what is the next step to formally submit a FLIP?i.e. assign a
number, drive to 'accepted' state?

Given the introduction of new RPC and REST endpoints with FLIP-6, now is a
good time to agree on the approach to securing Flink.

Thanks!
Eron

On Mon, Nov 27, 2017 at 11:52 AM, Eron Wright <eronwri...@gmail.com> wrote:

> I'd like to make some progress on hardening Flink using SSL client
> authentication.   Here's the FLIP proposal:
> https://docs.google.com/document/d/13IRPb2GdL842rIzMgEn0ibOQHNku6
> W8aMf1p7gCPJjg/edit?usp=sharing
>
> 1. What is the next step to have this FLIP be accepted?
> 2. Does anyone have any objections to the technical plan?
>
> Thanks!
> Eron Wright
> Dell EMC
>
>
>


[jira] [Created] (FLINK-8265) Missing jackson dependency for flink-mesos

2017-12-14 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-8265:
---

 Summary: Missing jackson dependency for flink-mesos
 Key: FLINK-8265
 URL: https://issues.apache.org/jira/browse/FLINK-8265
 Project: Flink
  Issue Type: Bug
  Components: Mesos
Affects Versions: 1.4.0
Reporter: Eron Wright 
Assignee: Eron Wright 
Priority: Critical
 Fix For: 1.4.1


The Jackson library that is required by Fenzo is missing from the Flink 
distribution jar-file.

This manifests as an exception in certain circumstances when a hard constraint 
is configured ("mesos.constraints.hard.hostattribute").

{code}
NoClassDefFoundError: 
org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
at com.netflix.fenzo.ConstraintFailure.(ConstraintFailure.java:35)
at 
com.netflix.fenzo.AssignableVirtualMachine.findFailedHardConstraints(AssignableVirtualMachine.java:784)
at 
com.netflix.fenzo.AssignableVirtualMachine.tryRequest(AssignableVirtualMachine.java:581)
at com.netflix.fenzo.TaskScheduler.evalAssignments(TaskScheduler.java:796)
at com.netflix.fenzo.TaskScheduler.access$1500(TaskScheduler.java:70)
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8247) Support Hadoop-free variant of Flink on Mesos

2017-12-12 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-8247:
---

 Summary: Support Hadoop-free variant of Flink on Mesos
 Key: FLINK-8247
 URL: https://issues.apache.org/jira/browse/FLINK-8247
 Project: Flink
  Issue Type: Bug
  Components: Mesos
Affects Versions: 1.4.0
Reporter: Eron Wright 


In Hadoop-free mode, Hadoop isn't on the classpath.  The Mesos job manager 
normally uses the Hadoop UserGroupInformation class to overlay a user context 
(`HADOOP_USER_NAME`) for the task managers.  

Detect the absence of Hadoop and skip over the `HadoopUserOverlay`, similar to 
the logic in `HadoopModuleFactory`.This may require the introduction of an 
overlay factory.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: TaskManager job lifecycle hooks

2017-12-07 Thread Eron Wright
Could you speak to whether the lifecycle provided by RichFunction
(open/close) would fit the requirement?

https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/api/common/functions/RichFunction.html#open-org.apache.flink.configuration.Configuration-

On Thu, Dec 7, 2017 at 1:57 PM, Ben Sidhom 
wrote:

> Hey,
>
> I'm working on the Apache Beam  portability
> story
> and trying to figure out how we can get the Flink runner to support
> the new portability
> API .
>
> In order to get the runner to work with portable SDKs, we need to be able
> to spin up and manage user containers from the TaskManagers themselves. All
> communication with user code (effectively user-defined functions) happens
> over RPC endpoints between the container and the Flink worker threads.
> Unfortunately, we cannot assume that the container images themselves are
> small or that they are cheap to start up. For this reason, we cannot
> reasonably start and stop these external services once per task (e.g., by
> wrapping service lifetimes within mapPartions). In order to support
> multiple jobs per JVM (either due to multiple task slots per manager or
> multiple jobs submitted to a cluster serially) , we need to know when to
> clean up resources associated with a particular job.
>
> Is there a way to do this in user code? Ideally, this would be something
> like a set of per-job startup and shutdown hooks that execute on each
> TaskManager that a particular job runs on. If this does not currently
> exist, how reasonable would it be to introduce client-facing APIs that
> would allow it? Is there a better approach for this lifecycle management
> that better fits into the Flink execution model?
>
> Thanks
> --
> -Ben
>


Re: [VOTE] Release 1.4.0, release candidate #3

2017-12-07 Thread Eron Wright
Just discovered:  the removal of Flink's Future (FLINK-7252) causes a
breaking change in connectors that use
`org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook`, because
`Future` is a type on one of the methods.

To my knowledge, this affects only the Pravega connector.  Curious to know
whether any other connectors are affected.  I don't think we (Dell EMC)
consider it a blocker but it will mean that the connector is Flink 1.4+.

Eron


On Thu, Dec 7, 2017 at 12:25 PM, Aljoscha Krettek 
wrote:

> I just noticed that I did a copy-and-paste error and the last paragraph
> about voting period should be this:
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Best,
> Aljoscha
>
> > On 7. Dec 2017, at 19:24, Bowen Li  wrote:
> >
> > I agree that it shouldn't block the release. The doc website part is even
> > better!
> >
> > On Thu, Dec 7, 2017 at 1:09 AM, Aljoscha Krettek 
> > wrote:
> >
> >> Good catch, yes. This shouldn't block the release, though, since the doc
> >> is always built form the latest state of a release branch, i.e. the 1.4
> doc
> >> on the website will update as soon as the doc on the release-1.4 branch
> is
> >> updated.
> >>
> >>> On 6. Dec 2017, at 20:47, Bowen Li  wrote:
> >>>
> >>> Hi Aljoscha,
> >>>
> >>> I found Flink's State doc and javaDoc are very ambiguous on what the
> >>> replacement of FoldingState is, which will confuse a lot of users. We
> >> need
> >>> to fix it in 1.4 release.
> >>>
> >>> I have submitted a PR at https://github.com/apache/flink/pull/5129
> >>>
> >>> Thanks,
> >>> Bowen
> >>>
> >>>
> >>> On Wed, Dec 6, 2017 at 5:56 AM, Aljoscha Krettek 
> >>> wrote:
> >>>
>  Hi everyone,
> 
>  Please review and vote on release candidate #3 for the version 1.4.0,
> as
>  follows:
>  [ ] +1, Approve the release
>  [ ] -1, Do not approve the release (please provide specific comments)
> 
> 
>  The complete staging area is available for your review, which
> includes:
>  * JIRA release notes [1],
>  * the official Apache source release and binary convenience releases
> to
> >> be
>  deployed to dist.apache.org[2], which are signed with the key with
>  fingerprint F2A67A8047499BBB3908D17AA8F4FD97121D7293 [3],
>  * all artifacts to be deployed to the Maven Central Repository [4],
>  * source code tag "release-1.4.0-rc1" [5],
>  * website pull request listing the new release [6].
> 
>  Please have a careful look at the website PR because I changed some
>  wording and we're now also releasing a binary without Hadoop
> >> dependencies.
> 
>  Please use this document for coordinating testing efforts: [7]
> 
>  The only change between RC1 and this RC2 is that the source release
>  package does not include the erroneously included binary Ruby
> >> dependencies
>  of the documentation anymore. Because of this I would like to propose
> a
>  shorter voting time and close the vote around the time that RC1 would
> >> have
>  closed. This would mean closing by end of Wednesday. Please let me
> know
> >> if
>  you disagree with this. The vote is adopted by majority approval, with
> >> at
>  least 3 PMC affirmative votes.
> 
>  Thanks,
>  Your friendly Release Manager
> 
>  [1] https://issues.apache.org/jira/secure/ReleaseNote.jspa?
>  projectId=12315522=12340533
>  [2] http://people.apache.org/~aljoscha/flink-1.4.0-rc3/
>  [3] https://dist.apache.org/repos/dist/release/flink/KEYS
>  [4] https://repository.apache.org/content/repositories/
> >> orgapacheflink-1141
>  [5] https://git-wip-us.apache.org/repos/asf?p=flink.git;a=tag;h=
>  8fb9635dd2e64dbb20887c84f646f02034b57cb1
>  [6] https://github.com/apache/flink-web/pull/95
>  [7] https://docs.google.com/document/d/1cOkycJwEKVjG_
>  onnpl3bQNTq7uebh48zDtIJxceyU2E/edit?usp=sharing
> 
>  Pro-tip: you can create a settings.xml file with these contents:
> 
>  
>  
>  flink-1.4.0
>  
>  
>  
>  flink-1.4.0
>  
>    
>  flink-1.4.0
>  
>  https://repository.apache.org/content/repositories/
>  orgapacheflink-1141/
>  
>    
>    
>  archetype
>  
>  https://repository.apache.org/content/repositories/
>  orgapacheflink-1141/
>  
>    
>  
>  
>  
>  
> 
>  And reference that in you maven commands via --settings
>  path/to/settings.xml. This is useful for creating a quickstart based
> on
> >> the
>  staged release and for building against the staged jars.
> >>
> >>
>
>


Re: [VOTE] Release 1.4.0, release candidate #2

2017-12-03 Thread Eron Wright
Update: PR for "FLINK-8174 - Mesos RM unable to accept offers for
unreserved resources" is ready for review.  We were able to solve (3),
supporting any combination of reserved and unreserved resources.

On Fri, Dec 1, 2017 at 2:24 PM, Eron Wright <eronwri...@gmail.com> wrote:

> There are three levels of support we could land on.
> 1. Flink works with unreserved resources (revert FLINK-7294).
> 2. Flink works with unreserved resources, and correctly ignores reserved
> resources (revert FLINK-7294 and mitigate Fenzo bug).
> 3. Flink works with unreserved resources and reserved resources.
>
> 3 is a moon shot.  Striving for 2.  Fallback on 1.
>
>
>
> On Fri, Dec 1, 2017 at 2:10 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Thanks for the update!
>>
>> Just to be clear, you're proposing going forward with the "simple fix" of
>> reverting FLINK-7294?
>>
>> > On 1. Dec 2017, at 18:39, Eron Wright <eronwri...@gmail.com> wrote:
>> >
>> > Update on reported Mesos issue (FLINK-8174):
>> >
>> > TLDR; a PR will be ready within 24 hours that will undo reservation
>> support.
>> >
>> > A couple of months ago, a fix (FLINK-7294) was merged related to how
>> Flink
>> > accepts Mesos resource offers.  The intention was to allow Flink to make
>> > use of so-called +reserved+ resources, a Mesos feature which makes it
>> > possible to reserve hosts for use by a specific framework/role.  The fix
>> > inadvertently regressed the ability to use +unreserved+ resources.
>> This is
>> > a serious regression because unreserved resources are the common case.
>> >
>> > The simple solution is to revert the earlier fix, deferring support for
>> > reservations to another release.   We are spending some time to find a
>> fix
>> > that works for all scenarios, but seems unlikely at this time.   I am
>> > reaching out to the original contributor to get their feedback.
>> >
>> > In the course of the investigation, a related flaw was discovered in
>> Fenzo
>> > that causes Flink to misinterpret offers that contain a mix of reserved
>> and
>> > unreserved resources.   I believe that a small fix is possible purely
>> > within Flink; an update to Fenzo does not appear necessary.
>> >
>> > Going forward, we will contribute an improved integration test suite
>> with
>> > which to test Flink under diverse Mesos conditions (e.g. reservations).
>> >
>> > Thanks,
>> > Eron
>> >
>> > On Thu, Nov 30, 2017 at 9:47 PM, Tzu-Li (Gordon) Tai <
>> tzuli...@apache.org>
>> > wrote:
>> >
>> >> Hi,
>> >>
>> >> I’ve noticed a behavioral regression in the Kafka producer, that should
>> >> also be considered a blocker: https://issues.apache.org/
>> >> jira/browse/FLINK-8181
>> >> There’s already a PR for the issue here: https://github.com/
>> >> apache/flink/pull/5108
>> >>
>> >> Best,
>> >> Gordon
>> >>
>> >> On 30 November 2017 at 5:27:22 PM, Fabian Hueske (fhue...@gmail.com)
>> >> wrote:
>> >>
>> >> I've created a JIRA issue for the the Hadoop 2.9.0 build problem [1].
>> >>
>> >> Best, Fabian
>> >>
>> >> [1] https://issues.apache.org/jira/browse/FLINK-8177
>> >>
>> >> 2017-11-30 4:35 GMT+01:00 Eron Wright <eronwri...@gmail.com>:
>> >>
>> >>> Unfortunately we've identified a blocker bug for Flink on Mesos -
>> >>> FLINK-8174. We'll have a patch ready on Thursday.
>> >>>
>> >>> Thanks,
>> >>> Eron
>> >>>
>> >>> On Wed, Nov 29, 2017 at 3:40 PM, Eron Wright <eronwri...@gmail.com>
>> >> wrote:
>> >>>
>> >>>> On Dell EMC side, we're testing the RC2 on DCOS 1.10.0. Seeing a
>> >>>> potential issue with offer acceptance and we'll update the thread
>> with
>> >> a
>> >>> +1
>> >>>> or with a more concrete issue within 24 hours.
>> >>>>
>> >>>> Thanks,
>> >>>> Eron
>> >>>>
>> >>>> On Wed, Nov 29, 2017 at 6:54 AM, Chesnay Schepler <
>> ches...@apache.org>
>> >>>> wrote:
>> >>>>
>> >>>>> I don't think anyone has taken a look yet, nor was there a
>> discussion
>> >&g

Re: [VOTE] Release 1.4.0, release candidate #2

2017-12-01 Thread Eron Wright
There are three levels of support we could land on.
1. Flink works with unreserved resources (revert FLINK-7294).
2. Flink works with unreserved resources, and correctly ignores reserved
resources (revert FLINK-7294 and mitigate Fenzo bug).
3. Flink works with unreserved resources and reserved resources.

3 is a moon shot.  Striving for 2.  Fallback on 1.



On Fri, Dec 1, 2017 at 2:10 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Thanks for the update!
>
> Just to be clear, you're proposing going forward with the "simple fix" of
> reverting FLINK-7294?
>
> > On 1. Dec 2017, at 18:39, Eron Wright <eronwri...@gmail.com> wrote:
> >
> > Update on reported Mesos issue (FLINK-8174):
> >
> > TLDR; a PR will be ready within 24 hours that will undo reservation
> support.
> >
> > A couple of months ago, a fix (FLINK-7294) was merged related to how
> Flink
> > accepts Mesos resource offers.  The intention was to allow Flink to make
> > use of so-called +reserved+ resources, a Mesos feature which makes it
> > possible to reserve hosts for use by a specific framework/role.  The fix
> > inadvertently regressed the ability to use +unreserved+ resources.  This
> is
> > a serious regression because unreserved resources are the common case.
> >
> > The simple solution is to revert the earlier fix, deferring support for
> > reservations to another release.   We are spending some time to find a
> fix
> > that works for all scenarios, but seems unlikely at this time.   I am
> > reaching out to the original contributor to get their feedback.
> >
> > In the course of the investigation, a related flaw was discovered in
> Fenzo
> > that causes Flink to misinterpret offers that contain a mix of reserved
> and
> > unreserved resources.   I believe that a small fix is possible purely
> > within Flink; an update to Fenzo does not appear necessary.
> >
> > Going forward, we will contribute an improved integration test suite with
> > which to test Flink under diverse Mesos conditions (e.g. reservations).
> >
> > Thanks,
> > Eron
> >
> > On Thu, Nov 30, 2017 at 9:47 PM, Tzu-Li (Gordon) Tai <
> tzuli...@apache.org>
> > wrote:
> >
> >> Hi,
> >>
> >> I’ve noticed a behavioral regression in the Kafka producer, that should
> >> also be considered a blocker: https://issues.apache.org/
> >> jira/browse/FLINK-8181
> >> There’s already a PR for the issue here: https://github.com/
> >> apache/flink/pull/5108
> >>
> >> Best,
> >> Gordon
> >>
> >> On 30 November 2017 at 5:27:22 PM, Fabian Hueske (fhue...@gmail.com)
> >> wrote:
> >>
> >> I've created a JIRA issue for the the Hadoop 2.9.0 build problem [1].
> >>
> >> Best, Fabian
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-8177
> >>
> >> 2017-11-30 4:35 GMT+01:00 Eron Wright <eronwri...@gmail.com>:
> >>
> >>> Unfortunately we've identified a blocker bug for Flink on Mesos -
> >>> FLINK-8174. We'll have a patch ready on Thursday.
> >>>
> >>> Thanks,
> >>> Eron
> >>>
> >>> On Wed, Nov 29, 2017 at 3:40 PM, Eron Wright <eronwri...@gmail.com>
> >> wrote:
> >>>
> >>>> On Dell EMC side, we're testing the RC2 on DCOS 1.10.0. Seeing a
> >>>> potential issue with offer acceptance and we'll update the thread with
> >> a
> >>> +1
> >>>> or with a more concrete issue within 24 hours.
> >>>>
> >>>> Thanks,
> >>>> Eron
> >>>>
> >>>> On Wed, Nov 29, 2017 at 6:54 AM, Chesnay Schepler <ches...@apache.org
> >
> >>>> wrote:
> >>>>
> >>>>> I don't think anyone has taken a look yet, nor was there a discussion
> >> as
> >>>>> to postponing it.
> >>>>>
> >>>>> It just slipped through the cracks i guess...
> >>>>>
> >>>>>
> >>>>> On 29.11.2017 15:47, Gyula Fóra wrote:
> >>>>>
> >>>>>> Hi guys,
> >>>>>> I ran into this again while playing with savepoint/restore
> >> parallelism:
> >>>>>>
> >>>>>> https://issues.apache.org/jira/browse/FLINK-7595
> >>>>>> https://github.com/apache/flink/pull/4651
> >>>>>>
> >>>>>> Anyone has some idea about the status of this PR or w

Re: [VOTE] Release 1.4.0, release candidate #2

2017-12-01 Thread Eron Wright
Update on reported Mesos issue (FLINK-8174):

TLDR; a PR will be ready within 24 hours that will undo reservation support.

A couple of months ago, a fix (FLINK-7294) was merged related to how Flink
accepts Mesos resource offers.  The intention was to allow Flink to make
use of so-called +reserved+ resources, a Mesos feature which makes it
possible to reserve hosts for use by a specific framework/role.  The fix
inadvertently regressed the ability to use +unreserved+ resources.  This is
a serious regression because unreserved resources are the common case.

The simple solution is to revert the earlier fix, deferring support for
reservations to another release.   We are spending some time to find a fix
that works for all scenarios, but seems unlikely at this time.   I am
reaching out to the original contributor to get their feedback.

In the course of the investigation, a related flaw was discovered in Fenzo
that causes Flink to misinterpret offers that contain a mix of reserved and
unreserved resources.   I believe that a small fix is possible purely
within Flink; an update to Fenzo does not appear necessary.

Going forward, we will contribute an improved integration test suite with
which to test Flink under diverse Mesos conditions (e.g. reservations).

Thanks,
Eron

On Thu, Nov 30, 2017 at 9:47 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi,
>
> I’ve noticed a behavioral regression in the Kafka producer, that should
> also be considered a blocker: https://issues.apache.org/
> jira/browse/FLINK-8181
> There’s already a PR for the issue here: https://github.com/
> apache/flink/pull/5108
>
> Best,
> Gordon
>
> On 30 November 2017 at 5:27:22 PM, Fabian Hueske (fhue...@gmail.com)
> wrote:
>
> I've created a JIRA issue for the the Hadoop 2.9.0 build problem [1].
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-8177
>
> 2017-11-30 4:35 GMT+01:00 Eron Wright <eronwri...@gmail.com>:
>
> > Unfortunately we've identified a blocker bug for Flink on Mesos -
> > FLINK-8174. We'll have a patch ready on Thursday.
> >
> > Thanks,
> > Eron
> >
> > On Wed, Nov 29, 2017 at 3:40 PM, Eron Wright <eronwri...@gmail.com>
> wrote:
> >
> > > On Dell EMC side, we're testing the RC2 on DCOS 1.10.0. Seeing a
> > > potential issue with offer acceptance and we'll update the thread with
> a
> > +1
> > > or with a more concrete issue within 24 hours.
> > >
> > > Thanks,
> > > Eron
> > >
> > > On Wed, Nov 29, 2017 at 6:54 AM, Chesnay Schepler <ches...@apache.org>
> > > wrote:
> > >
> > >> I don't think anyone has taken a look yet, nor was there a discussion
> as
> > >> to postponing it.
> > >>
> > >> It just slipped through the cracks i guess...
> > >>
> > >>
> > >> On 29.11.2017 15:47, Gyula Fóra wrote:
> > >>
> > >>> Hi guys,
> > >>> I ran into this again while playing with savepoint/restore
> parallelism:
> > >>>
> > >>> https://issues.apache.org/jira/browse/FLINK-7595
> > >>> https://github.com/apache/flink/pull/4651
> > >>>
> > >>> Anyone has some idea about the status of this PR or were we planning
> to
> > >>> postpone this to 1.5?
> > >>>
> > >>> Thanks,
> > >>> Gyula
> > >>>
> > >>>
> > >>> Fabian Hueske <fhue...@gmail.com> ezt írta (időpont: 2017. nov. 29.,
> > >>> Sze,
> > >>> 13:10):
> > >>>
> > >>> OK, the situation is the following:
> > >>>>
> > >>>> The test class (org.apache.flink.yarn.UtilsTest) implements a
> Hadoop
> > >>>> interface (Container) that was extended in Hadoop 2.9.0 by a getter
> > and
> > >>>> setter.
> > >>>> By adding the methods, we can compile Flink for Hadoop 2.9.0.
> However,
> > >>>> the
> > >>>> getter/setter add a dependency on a class that was also added in
> > Hadoop
> > >>>> 2.9.0.
> > >>>> Therefore, the implementation is not backwards compatible with
> Hadoop
> > >>>> versions < 2.9.0.
> > >>>>
> > >>>> Not sure how we can fix the problem. We would need two version of
> the
> > >>>> class
> > >>>> that are chosen based on the Hadoop version. Do we have something
> like
> > >>>> that
> > >>>> somewhere else?
> > >>>>

Re: [VOTE] Release 1.4.0, release candidate #2

2017-11-29 Thread Eron Wright
Unfortunately we've identified a blocker bug for Flink on Mesos -
FLINK-8174.We'll have a patch ready on Thursday.

Thanks,
Eron

On Wed, Nov 29, 2017 at 3:40 PM, Eron Wright <eronwri...@gmail.com> wrote:

> On Dell EMC side, we're testing the RC2 on DCOS 1.10.0.   Seeing a
> potential issue with offer acceptance and we'll update the thread with a +1
> or with a more concrete issue within 24 hours.
>
> Thanks,
> Eron
>
> On Wed, Nov 29, 2017 at 6:54 AM, Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> I don't think anyone has taken a look yet, nor was there a discussion as
>> to postponing it.
>>
>> It just slipped through the cracks i guess...
>>
>>
>> On 29.11.2017 15:47, Gyula Fóra wrote:
>>
>>> Hi guys,
>>> I ran into this again while playing with savepoint/restore parallelism:
>>>
>>> https://issues.apache.org/jira/browse/FLINK-7595
>>> https://github.com/apache/flink/pull/4651
>>>
>>> Anyone has some idea about the status of this PR or were we planning to
>>> postpone this to 1.5?
>>>
>>> Thanks,
>>> Gyula
>>>
>>>
>>> Fabian Hueske <fhue...@gmail.com> ezt írta (időpont: 2017. nov. 29.,
>>> Sze,
>>> 13:10):
>>>
>>> OK, the situation is the following:
>>>>
>>>> The test class (org.apache.flink.yarn.UtilsTest) implements a Hadoop
>>>> interface (Container) that was extended in Hadoop 2.9.0 by a getter and
>>>> setter.
>>>> By adding the methods, we can compile Flink for Hadoop 2.9.0. However,
>>>> the
>>>> getter/setter add a dependency on a class that was also added in Hadoop
>>>> 2.9.0.
>>>> Therefore, the implementation is not backwards compatible with Hadoop
>>>> versions < 2.9.0.
>>>>
>>>> Not sure how we can fix the problem. We would need two version of the
>>>> class
>>>> that are chosen based on the Hadoop version. Do we have something like
>>>> that
>>>> somewhere else?
>>>>
>>>> Since this is only a problem in a test class, Flink 1.4.0 might still
>>>> work
>>>> very well with Hadoop 2.9.0.
>>>> However, this has not been tested AFAIK.
>>>>
>>>> Cheers, Fabian
>>>>
>>>> 2017-11-29 12:47 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>>>
>>>> I just tried to build the release-1.4 branch for Hadoop 2.9.0 (released
>>>>> a
>>>>> few days ago) and got a compilation failure in a test class.
>>>>>
>>>>> Right now, I'm assessing how much we need to fix to support Hadoop
>>>>> 2.9.0.
>>>>> I'll report later.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2017-11-29 11:16 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>>>>>
>>>>> Agreed, this is a regression compared to the previous functionality. I
>>>>>> updated the issue to "Blocker".
>>>>>>
>>>>>> On 29. Nov 2017, at 10:01, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I have found the following issue:
>>>>>>> https://issues.apache.org/jira/browse/FLINK-8165
>>>>>>>
>>>>>>> I would say this is a blocker (I personally pass the ParameterTool
>>>>>>> all
>>>>>>>
>>>>>> over
>>>>>>
>>>>>>> the place in my production apps), but a pretty trivial issue to fix,
>>>>>>>
>>>>>> we
>>>>
>>>>> can
>>>>>>
>>>>>>> wait a little to find other potential problems.
>>>>>>>
>>>>>>> I can submit a fix in a little bit.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Gyula
>>>>>>>
>>>>>>> Tzu-Li (Gordon) Tai <tzuli...@apache.org> ezt írta (időpont: 2017.
>>>>>>>
>>>>>> nov.
>>>>
>>>>> 29., Sze, 9:23):
>>>>>>>
>>>>>>> +1
>>>>>>>>
>>>>>>>> Verified:
>>>>>>>> - No missing release Maven artifacts
>>>>>>>> - Staged Apache sourc

[jira] [Created] (FLINK-8174) Mesos RM unable to accept offers for unreserved resources

2017-11-29 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-8174:
---

 Summary: Mesos RM unable to accept offers for unreserved resources
 Key: FLINK-8174
 URL: https://issues.apache.org/jira/browse/FLINK-8174
 Project: Flink
  Issue Type: Bug
  Components: Mesos
Affects Versions: 1.4.0, 1.3.3
Reporter: Eron Wright 
Assignee: Eron Wright 
Priority: Blocker
 Fix For: 1.4.0


Flink has suffered a regression due to FLINK-7294.   Any attempt to accept a 
resource offer that is based on unreserved resources will fail, because Flink 
(as of FLINK-7294) erroneously insists that the resource come from a prior 
reservation.

Looking at the original issue, the problem may have been misdiagnosed.  Ideally 
Flink should work with both reserved and unreserved resources, but the latter 
is a more common situation that is now broken.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[DISCUSS] Service Authorization (SSL client authentication)

2017-11-27 Thread Eron Wright
I'd like to make some progress on hardening Flink using SSL client
authentication.   Here's the FLIP proposal:
https://docs.google.com/document/d/13IRPb2GdL842rIzMgEn0ibOQHNku6W8aMf1p7gCPJjg/edit?usp=sharing

1. What is the next step to have this FLIP be accepted?
2. Does anyone have any objections to the technical plan?

Thanks!
Eron Wright
Dell EMC


[jira] [Created] (FLINK-7869) Don't use PipelineErrorHandler on the client

2017-10-18 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-7869:
---

 Summary: Don't use PipelineErrorHandler on the client
 Key: FLINK-7869
 URL: https://issues.apache.org/jira/browse/FLINK-7869
 Project: Flink
  Issue Type: Sub-task
Reporter: Eron Wright 


The `PipelineErrorHandler` is being used in both the server and client 
pipeline, but produces an HTTP response upon error (which obviously doesn't 
make sense on the client).

I believe that the `AbstractRestHandler` is best suited to producing an HTTP 
error response.   Keep in mind that the channel might not be in a state where 
an HTTP response makes sense, e.g. after a web socket has been handshaked.   I 
suggest that the last handler only log and close the channel.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] Releasing Flink 1.4

2017-10-16 Thread Eron Wright
+1 from our side on this plan.

On Mon, Oct 16, 2017 at 3:33 AM, Fabian Hueske  wrote:

> OK, sounds good to me.
>
> We have a couple of bugs to fix for the Table API / SQL but have PRs for
> most of them.
>
> There's only one major issue that I'd like to include in 1.4.0 which is a
> refactoring of the TableSource interface.
> This effort has already started and is currently waiting for reviews /
> comments.
> I'm quite confident that we can get it in within the next two weeks.
>
> Cheers, Fabian
>
> 2017-10-16 10:22 GMT+02:00 Aljoscha Krettek :
>
> > @Bowen I started marking essential stuff as blocking (with fixVersion
> > 1.4.0). You're right, that we should start moving things to 1.5.0 that
> are
> > not blocking and that we don't think will make it into 1.4.0. I think we
> > can only release 1.4.0 if there are 0 (zero) unresolved issues with
> > fixVersion 1.4.0.
> >
> > > On 14. Oct 2017, at 07:34, Alexandru Gutan 
> > wrote:
> > >
> > > great
> > >
> > > On 13 October 2017 at 18:02, Zhijiang(wangzhijiang999) <
> > > wangzhijiang...@aliyun.com> wrote:
> > >
> > >> totally agree with the way.--
> > >> 发件人:Stephan Ewen <
> > se...@apache.org
> > >>> 发送时间:2017年10月13日(星期五) 21:29收件人:dev@flink.apache.org <
> > dev@flink.apache.org>主
> > >> 题:Re: [DISCUSS] Releasing Flink 1.4
> > >> I am in favor of doing this, if we can set it up in the following way.
> > >>
> > >>  - We put out the 1.4 release now, as Till and Aljoscha suggested. A
> > >> stable cut before the fundamental changes go in.
> > >>
> > >>  - We merge the very big changes (FLIP-6, Network stack, localized
> state
> > >> restore, etc). directly (or very soon) after.
> > >>  - We try to stabilize these changes and release 1.5 asap after that.
> > >> Ideally Around end of year or so.
> > >>
> > >> The reason I am bringing this up is that I know various users waiting
> > very
> > >> much for FLIP-6 and Network Stack enhancements. Given that these
> issues
> > >> were flagged for release 1.4, the users were planning to have them
> > rather
> > >> soon.
> > >>
> > >> Stephan
> > >>
> > >>
> > >> On Fri, Oct 13, 2017 at 2:35 PM, Aljoscha Krettek <
> aljos...@apache.org>
> > >> wrote:
> > >>
> > >>> +1 Excellent
> > >>>
> > >>> I'd like to volunteer as release manager. I already set
> > >> up a Kanban board
> > >>> to monitor the open blocking (and non-blocking) issues
> > >> for 1.4, though this
> > >>> is independent of me volunteering as release manager. We
> > >> should all go over
> > >>> these issues and see which ones should actually be blockin
> > >> g and which ones
> > >>> are not yet on that list.
> > >>>
> >  On 13. Oct 2017, at 12:24, Renjie Liu 
> > wrote:
> > 
> >  Cool!!!
> > 
> >  On Fri, Oct 13, 2017 at 5:49 PM Till Rohrmann  >
> > >>> wrote:
> > 
> > > Hi all,
> > >
> > > I want to revive the discussion about releasing Flink 1.4 [1] and
> the
> > >>> set
> > > of features to include.
> > >
> > > The gist of the previous discussion was that we postponed the
> feature
> > > freeze for 1.4 in order to include some more features w
> > >> hich were being
> > > developed. By now, we have completed a good set of features such as
> > >>> exactly
> > > once Kafka producer, reduced dependency footprint, Hado
> > >> op-free Flink and
> > > many bug fixes. I believe that these features will
> > >> make good release and
> > > users are already waiting for them.
> > >
> > > Some of the other features which we wanted to include,
> > >> mainly Flip-6, to
> > > some extent the network stack enhancements and the state decoupling
> > >>> still
> > > need some more time. Since these features are major
> > >> changes to Flink's
> > > runtime, it would be in my opinion a good idea to cut a
> > >> stable release
> > >>> with
> > > the above-mentioned feature set now and give the engine
> > >> features a bit
> > >>> more
> > > time to ripen and be properly tested.
> > >
> > > Therefore, I would actually be in favour of aiming for
> > >> a quick release
> > > meaning that we now concentrate mainly on fixing bugs and critical
> > >>> issues.
> > > Moreover, I'm optimistic that the delayed features will be
> completed
> > >>> soon
> > > such that we can deliver them with the next release. Wh
> > >> at do you think?
> > >
> > > [1]
> > >
> > > http://apache-flink-mailing-list-archive.1008284.n3.
> > >>> nabble.com/DISCUSS-Flink-1-4-and-time-based-release-td19331.html
> > >
> > > Cheers,
> > > Till
> > >
> >  --
> >  Liu, Renjie
> >  Software Engineer, MVAD
> > >>>
> > >>>
> > >>
> > >>
> >
> >
>


[jira] [Created] (FLINK-7753) HandlerUtils should close the channel on error responses

2017-10-02 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-7753:
---

 Summary: HandlerUtils should close the channel on error responses
 Key: FLINK-7753
 URL: https://issues.apache.org/jira/browse/FLINK-7753
 Project: Flink
  Issue Type: Sub-task
Reporter: Eron Wright 
Assignee: Eron Wright 
Priority: Minor


Unexpected errors in the server pipeline correctly cause a 500 error response.  
 I suggest that such responses also close the channel rather than allowing 
keep-alive.   This would be a better security posture too since we don't know 
if the pipeline is corrupt following an unexpected error.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7752) RedirectHandler should execute on the IO thread

2017-10-02 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-7752:
---

 Summary: RedirectHandler should execute on the IO thread
 Key: FLINK-7752
 URL: https://issues.apache.org/jira/browse/FLINK-7752
 Project: Flink
  Issue Type: Sub-task
Reporter: Eron Wright 
Priority: Minor


The redirect handler executes much of its logic (including 'respondAsLeader') 
on an arbitrary thread, but it would be cleaner to execute on the I/O thread by 
using `channelHandlerContext.executor()`.   Note that Netty allows writes on 
any thread but reads on the I/O thread.   



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7738) Create WebSocket handler (server)

2017-09-28 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-7738:
---

 Summary: Create WebSocket handler (server)
 Key: FLINK-7738
 URL: https://issues.apache.org/jira/browse/FLINK-7738
 Project: Flink
  Issue Type: Sub-task
Reporter: Eron Wright 
Assignee: Eron Wright 


An abstract handler is needed to support websocket communication.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] Service Authorization (redux)

2017-09-26 Thread Eron Wright
Hi folks, I'm happy to share with you a draft of a FLIP for service
authorization.   As I mentioned at the top of this thread, the goal is to
protect a deployed Flink cluster/session from unauthorized use.   In the
doc, I propose the use of SSL client authentication for internal
communication, plus YARN/Kubernetes/Mesos-specific functionality to achieve
single sign-on for the web interface/API.

FLIP-? - Service Authorization
<https://docs.google.com/document/d/13IRPb2GdL842rIzMgEn0ibOQHNku6W8aMf1p7gCPJjg/edit?usp=sharing>

Feel free to make comments and to give feedback on whether this feature
would be useful to you.

Thanks!
Eron


On Thu, Aug 3, 2017 at 11:11 AM, Eron Wright <eronwri...@gmail.com> wrote:

> Till, with (c) are you suggesting that we'd use Akka 2.3 for Scala 2.10
> and Akka 2.4+ for Scala 2.11+?   Sounds reasonable but I don't know how
> feasible it is.   I will say I'm optimistic because a) Akka 2.4 is said to
> be binary compatible, and b) the Flakka fork appears to be subsumed by 2.4.
>
> Let us then take (c) as the tentative plan.
>
> I agree the community should discuss dropping Scala 2.10 but I don't want
> to drive that conversation.
>
> Thanks
>
>
> On Thu, Aug 3, 2017 at 6:24 AM, Ufuk Celebi <u...@apache.org> wrote:
>
>> I haven't followed this discussion in detail nor am I familiar with
>> the service authorization topic or Flakka, but a) sounds like a lot of
>> maintenance work to me.
>>
>> If possible I would go with c) and maybe start a discussion about
>> dropping Scala 2.10 support to check whether that is a viable option
>> or not.
>>
>> – Ufuk
>>
>>
>> On Thu, Aug 3, 2017 at 1:59 PM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>> > Alternatively there would also be an option
>> >
>> > c) only support mutual auth for Akka 2.4+ if the backport is
>> unrealistic to
>> > do
>> >
>> > But this of course would break security for Scala 2.10. On the other
>> hand
>> > people are already using Flink without this feature.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Wed, Aug 2, 2017 at 7:21 PM, Eron Wright <eronwri...@gmail.com>
>> wrote:
>> >
>> >> Thanks Till and Aljoscha for the feedback.
>> >>
>> >> Seems there are two ways to proceed here, if we accept mutual SSL as
>> the
>> >> basis.
>> >>
>> >> a) Backport mutual-auth support from Akka 2.4 to Flakka.
>> >> b) Drop support for Scala 2.10 (FLINK-?), move to Akka 2.4
>> (FLINK-3662).
>> >>
>> >> Let's assume (a) for now.
>> >>
>> >>
>> >>
>> >> On Tue, Aug 1, 2017 at 3:05 PM, Till Rohrmann <trohrm...@apache.org>
>> >> wrote:
>> >>
>> >> > Dropping Java 7 alone is not enough to move to Akka 2.4+. For that we
>> >> need
>> >> > at least Scala 2.11.
>> >> >
>> >> > Cheers,
>> >> > Till
>> >> >
>> >> > On Tue, Aug 1, 2017 at 4:22 PM, Aljoscha Krettek <
>> aljos...@apache.org>
>> >> > wrote:
>> >> >
>> >> > > Hi Eron,
>> >> > >
>> >> > > I think after Dropping support for Java 7 we will move to Akka
>> 2.4+, so
>> >> > we
>> >> > > should be good there. I think quite some users should find a (more)
>> >> > secure
>> >> > > Flink interesting.
>> >> > >
>> >> > > Best,
>> >> > > Aljoscha
>> >> > > > On 24. Jul 2017, at 03:11, Eron Wright <eronwri...@gmail.com>
>> wrote:
>> >> > > >
>> >> > > > Hello, now might be a good time to revisit an important
>> enhancement
>> >> to
>> >> > > > Flink security, so-called service authorization.   This means the
>> >> > > hardening
>> >> > > > of a Flink cluster against unauthorized use with some sort of
>> >> > > > authentication and authorization scheme.   Today, Flink relies
>> >> entirely
>> >> > > on
>> >> > > > network isolation to protect itself from unauthorized job
>> submission
>> >> > and
>> >> > > > control, and to protect the secrets contained within a Flink
>> cluster.
>> >> > > > This is a problem in multi-user environments like YARN/Mesos/K8.
>> >> > > >

Re: Run apache flink from intellij or eclipse

2017-09-15 Thread Eron Wright
When you run in the IDE, an embedded ("local") Flink environment is
created.  It doesn't make use of your Flink cluster as "flink run" would.

I believe you can activate the Web UI by using the following API:
StreamExecutionEnvironment::createLocalEnvironmentWithWebUI



On Wed, Sep 13, 2017 at 4:52 AM, Erdem erdfem 
wrote:

> Hello,
>
> I am running flink from intellij or eclipse, but i dont see my job on the
> flink dashboard
> localhost:8081
>
> What should i do?
>


Re: Using Maven Archetype Plugin 3.0+ to create flink-quickstart-java project

2017-09-11 Thread Eron Wright
Michael, an alternative is to start your project with the released version
of Flink (1.3) rather than 1.4-SNAPSHOT:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#create-project

Afterwards you could update your dependencies (and add the Apache snapshot
repository) if you truly need the latest.

Hope this helps!


On Sun, Sep 10, 2017 at 8:49 AM, Michael Fong  wrote:

> Sorry about that.
>
> Here is the pastebin link :
>
>
> Regards,
>
> Michael
>
> On Sun, Sep 10, 2017 at 10:38 PM, Ted Yu  wrote:
>
> > The error was in an image which didn't come thru.
> >
> > Please use pastebin to convey the error.
> >
> > Cheers
> >
> > On Sun, Sep 10, 2017 at 6:57 AM, Michael Fong 
> > wrote:
> >
> > > Hi, all,
> > >
> > > While trying out the examples from here
> > >  > release-1.4/quickstart/java_api_quickstart.html>
> > > and create maven project w/ Maven Archetype Plugin. I've got a error
> > > complaining about
> > >
> > > [image: Inline image 1]
> > >
> > > This seems has something to do with Maven Archetype Plugin is not
> > > possible anymore to specify the repository via the commandline since
> > v3.0.
> > > Instead, dev could choose to specify repository information in
> > > settings.xml, as official doc suggested
> > >  > plugin/archetype-repository.html>
> > >
> > >
> > > I am just wondering if such information is noted somewhere in the
> > > document? Thanks in advance!
> > >
> > > Regards,
> > >
> > > Michael
> > >
> > >
> >
>


[DISCUSS] Watermark boundary condition

2017-08-30 Thread Eron Wright
Hello,

I think I see a bug in a few places related to determining whether an input
is to be considered late.  Some components use the logic that (timestamp <=
watermark) is considered late.  Others use (timestamp < watermark).  I
think the former is correct according to the definition in Watermark.java.

Compare:
https://github.com/apache/flink/blob/df7452d9811b0aa88919d7e3c1f6c34b36ac9b38/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L574

https://github.com/apache/flink/blob/df7452d9811b0aa88919d7e3c1f6c34b36ac9b38/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala#L116

https://github.com/apache/flink/blob/df7452d9811b0aa88919d7e3c1f6c34b36ac9b38/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java#L168

I might be misunderstanding the above snippets, it was a cursory look.

-Eron


[jira] [Created] (FLINK-7472) Release task managers gracefully

2017-08-17 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-7472:
---

 Summary: Release task managers gracefully
 Key: FLINK-7472
 URL: https://issues.apache.org/jira/browse/FLINK-7472
 Project: Flink
  Issue Type: Sub-task
Reporter: Eron Wright 


When a task manager is no longer needed (e.g. due to idle timeout in slot 
manager), the RM should gracefully stop it without spurious warnings.   While 
implies some actions should be taken before the TM is actually killed.   
Proactive steps include stopping the heartbeat monitor and sending a disconnect 
message.   

It is unclear whether `RM::closeTaskManagerConnection` method should be called 
proactively (when we plan to kill a TM), reactively (after the TM is killed), 
or both.  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7470) Acquire RM leadership before registering with Mesos

2017-08-17 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-7470:
---

 Summary: Acquire RM leadership before registering with Mesos
 Key: FLINK-7470
 URL: https://issues.apache.org/jira/browse/FLINK-7470
 Project: Flink
  Issue Type: Sub-task
Reporter: Eron Wright 



Mesos doesn't support fencing tokens in the scheduler protocol; it assumes 
external leader election among scheduler instances.   The last connection wins; 
prior connections for a given framework ID are closed.

The Mesos RM should not register as a framework until it has acquired RM 
leadership.   Evolve the ResourceManager as necessary.   One option is to 
introduce an ResourceManagerRunner that acquires leadership before starting the 
RM.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7469) Handle slot requests occuring before RM registration completes

2017-08-17 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-7469:
---

 Summary: Handle slot requests occuring before RM registration 
completes
 Key: FLINK-7469
 URL: https://issues.apache.org/jira/browse/FLINK-7469
 Project: Flink
  Issue Type: Sub-task
Reporter: Eron Wright 
Priority: Minor



Occasionally the TM-to-RM registration ask times out, causing the TM to pause 
registration for 10 seconds.  Meanwhile the registration may actually have 
succeeded in the RM.   Slot requests may then arrive at the TM while RM 
registration is incomplete.   

The current behavior appears to be that the TM honors the slot request.   
Please determine whether this is a feature or a bug.   If a feature, maybe a 
slot request should implicitly complete the registration.

See attached a log showing a certain TM exhibiting the described behavior.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7463) Release nascent workers on slot request timeout

2017-08-16 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-7463:
---

 Summary: Release nascent workers on slot request timeout 
 Key: FLINK-7463
 URL: https://issues.apache.org/jira/browse/FLINK-7463
 Project: Flink
  Issue Type: Sub-task
Reporter: Eron Wright 


A slot request causes a new worker to be allocated.   If the slot request times 
out or is cancelled before the worker is launched, cancel the worker request if 
possible.

This is an optimization because an idle slot is eventually released anyway.   
However, I observe that a lot of worker requests pile up in the launch 
coordinator, as the JM keeps making request after request.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] Service Authorization (redux)

2017-08-03 Thread Eron Wright
Till, with (c) are you suggesting that we'd use Akka 2.3 for Scala 2.10 and
Akka 2.4+ for Scala 2.11+?   Sounds reasonable but I don't know how
feasible it is.   I will say I'm optimistic because a) Akka 2.4 is said to
be binary compatible, and b) the Flakka fork appears to be subsumed by 2.4.

Let us then take (c) as the tentative plan.

I agree the community should discuss dropping Scala 2.10 but I don't want
to drive that conversation.

Thanks


On Thu, Aug 3, 2017 at 6:24 AM, Ufuk Celebi <u...@apache.org> wrote:

> I haven't followed this discussion in detail nor am I familiar with
> the service authorization topic or Flakka, but a) sounds like a lot of
> maintenance work to me.
>
> If possible I would go with c) and maybe start a discussion about
> dropping Scala 2.10 support to check whether that is a viable option
> or not.
>
> – Ufuk
>
>
> On Thu, Aug 3, 2017 at 1:59 PM, Till Rohrmann <trohrm...@apache.org>
> wrote:
> > Alternatively there would also be an option
> >
> > c) only support mutual auth for Akka 2.4+ if the backport is unrealistic
> to
> > do
> >
> > But this of course would break security for Scala 2.10. On the other hand
> > people are already using Flink without this feature.
> >
> > Cheers,
> > Till
> >
> > On Wed, Aug 2, 2017 at 7:21 PM, Eron Wright <eronwri...@gmail.com>
> wrote:
> >
> >> Thanks Till and Aljoscha for the feedback.
> >>
> >> Seems there are two ways to proceed here, if we accept mutual SSL as the
> >> basis.
> >>
> >> a) Backport mutual-auth support from Akka 2.4 to Flakka.
> >> b) Drop support for Scala 2.10 (FLINK-?), move to Akka 2.4 (FLINK-3662).
> >>
> >> Let's assume (a) for now.
> >>
> >>
> >>
> >> On Tue, Aug 1, 2017 at 3:05 PM, Till Rohrmann <trohrm...@apache.org>
> >> wrote:
> >>
> >> > Dropping Java 7 alone is not enough to move to Akka 2.4+. For that we
> >> need
> >> > at least Scala 2.11.
> >> >
> >> > Cheers,
> >> > Till
> >> >
> >> > On Tue, Aug 1, 2017 at 4:22 PM, Aljoscha Krettek <aljos...@apache.org
> >
> >> > wrote:
> >> >
> >> > > Hi Eron,
> >> > >
> >> > > I think after Dropping support for Java 7 we will move to Akka
> 2.4+, so
> >> > we
> >> > > should be good there. I think quite some users should find a (more)
> >> > secure
> >> > > Flink interesting.
> >> > >
> >> > > Best,
> >> > > Aljoscha
> >> > > > On 24. Jul 2017, at 03:11, Eron Wright <eronwri...@gmail.com>
> wrote:
> >> > > >
> >> > > > Hello, now might be a good time to revisit an important
> enhancement
> >> to
> >> > > > Flink security, so-called service authorization.   This means the
> >> > > hardening
> >> > > > of a Flink cluster against unauthorized use with some sort of
> >> > > > authentication and authorization scheme.   Today, Flink relies
> >> entirely
> >> > > on
> >> > > > network isolation to protect itself from unauthorized job
> submission
> >> > and
> >> > > > control, and to protect the secrets contained within a Flink
> cluster.
> >> > > > This is a problem in multi-user environments like YARN/Mesos/K8.
> >> > > >
> >> > > > Last fall, an effort was made to implement service authorization
> but
> >> > the
> >> > > PR
> >> > > > was ultimately rejected.   The idea was to add a simple secret
> key to
> >> > all
> >> > > > network communication between the client, JM, and TM.   Akka
> itself
> >> has
> >> > > > such a feature which formed the basis of the solution.  There are
> >> > > usability
> >> > > > challenges with this solution, including a dependency on SSL.
> >> > > >
> >> > > > Since then, the situation has evolved somewhat, and the use of SSL
> >> > mutual
> >> > > > authentication is more viable.   Mutual auth is supported in Akka
> >> > 2.4.12+
> >> > > > (or could be backported to Flakka).  My proposal is:
> >> > > >
> >> > > > 1. Upgrade Akka or backport the functionality to Flakka (see
> commit
> >> > > > 5d03902c5ec3212cd28f26c9b3ef7c3b628b9451).
> >> > > > 2. Implement SSL on any endpoint that doesn't yet support it (e.g.
> >> > > > queryable state).
> >> > > > 3. Enable mutual auth in Akka and implement it on non-Akka
> endpoints.
> >> > > > 4. Implement a simple authorization layer that accepts any
> >> > authenticated
> >> > > > connection.
> >> > > > 5. (stretch) generate and store a certificate automatically in
> YARN
> >> > mode.
> >> > > > 6. (stretch) Develop an alternate authentication method for the
> Web
> >> UI.
> >> > > >
> >> > > > Are folks interested in this capability?  Thoughts on the use of
> SSL
> >> > > mutual
> >> > > > auth versus something else?  Thanks!
> >> > > >
> >> > > > -Eron
> >> > >
> >> > >
> >> >
> >>
>


Re: [DISCUSS] Service Authorization (redux)

2017-08-02 Thread Eron Wright
Thanks Till and Aljoscha for the feedback.

Seems there are two ways to proceed here, if we accept mutual SSL as the
basis.

a) Backport mutual-auth support from Akka 2.4 to Flakka.
b) Drop support for Scala 2.10 (FLINK-?), move to Akka 2.4 (FLINK-3662).

Let's assume (a) for now.



On Tue, Aug 1, 2017 at 3:05 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> Dropping Java 7 alone is not enough to move to Akka 2.4+. For that we need
> at least Scala 2.11.
>
> Cheers,
> Till
>
> On Tue, Aug 1, 2017 at 4:22 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > Hi Eron,
> >
> > I think after Dropping support for Java 7 we will move to Akka 2.4+, so
> we
> > should be good there. I think quite some users should find a (more)
> secure
> > Flink interesting.
> >
> > Best,
> > Aljoscha
> > > On 24. Jul 2017, at 03:11, Eron Wright <eronwri...@gmail.com> wrote:
> > >
> > > Hello, now might be a good time to revisit an important enhancement to
> > > Flink security, so-called service authorization.   This means the
> > hardening
> > > of a Flink cluster against unauthorized use with some sort of
> > > authentication and authorization scheme.   Today, Flink relies entirely
> > on
> > > network isolation to protect itself from unauthorized job submission
> and
> > > control, and to protect the secrets contained within a Flink cluster.
> > > This is a problem in multi-user environments like YARN/Mesos/K8.
> > >
> > > Last fall, an effort was made to implement service authorization but
> the
> > PR
> > > was ultimately rejected.   The idea was to add a simple secret key to
> all
> > > network communication between the client, JM, and TM.   Akka itself has
> > > such a feature which formed the basis of the solution.  There are
> > usability
> > > challenges with this solution, including a dependency on SSL.
> > >
> > > Since then, the situation has evolved somewhat, and the use of SSL
> mutual
> > > authentication is more viable.   Mutual auth is supported in Akka
> 2.4.12+
> > > (or could be backported to Flakka).  My proposal is:
> > >
> > > 1. Upgrade Akka or backport the functionality to Flakka (see commit
> > > 5d03902c5ec3212cd28f26c9b3ef7c3b628b9451).
> > > 2. Implement SSL on any endpoint that doesn't yet support it (e.g.
> > > queryable state).
> > > 3. Enable mutual auth in Akka and implement it on non-Akka endpoints.
> > > 4. Implement a simple authorization layer that accepts any
> authenticated
> > > connection.
> > > 5. (stretch) generate and store a certificate automatically in YARN
> mode.
> > > 6. (stretch) Develop an alternate authentication method for the Web UI.
> > >
> > > Are folks interested in this capability?  Thoughts on the use of SSL
> > mutual
> > > auth versus something else?  Thanks!
> > >
> > > -Eron
> >
> >
>


[jira] [Created] (FLINK-7341) Update code snippets in documentation

2017-08-01 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-7341:
---

 Summary: Update code snippets in documentation 
 Key: FLINK-7341
 URL: https://issues.apache.org/jira/browse/FLINK-7341
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Eron Wright 
Priority: Minor


Consider updating the documentation to use Java 8 lambdas where possible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: Class Cache

2017-07-31 Thread Eron Wright
A Flink program is typically packaged as an 'uber-jar' containing its
dependencies.  The Flink quickstart project illustrates this (see the use
of the shading plugin in pom.xml).   Based on your description, the classes
of mylib2.jar were copied into mylib1.jar when the latter was built.  Try
rebuilding mylib1.jar to effect the change.

-Eron

On Mon, Jul 31, 2017 at 11:18 AM, Mike Accola  wrote:

> Are classes cached somewhere in flink?  I am running in a very basic,
> local environment on Linux (start_local.sh).  I've somehow gotten my
> environment into a strange state that I don't understand.  I feel like I
> am overlooking something simple, but I've checked everything I can think
> of.
>
> My main flink application with a ProcessFunction is embedded in
> mylib1.jar.  Within my ProcessFunction I use another class that is
> embedded in mylib2.jar.
>
> When I made changes to function in mylib2.jar and rebuilt the jar, I
> realized the changes weren't taking affect.  In fact, I then delete
> mylib2.jar entirely and my application still worked.  I can't figure out
> where my application is picking up the function contained in mylib2.jar. I
> have checked any temp directories, library paths, etc.  I have repeatedly
> stopped/started my flink environment just to be safe.
>
> I tried adding -verbose:class to env.java.opts.  It output a lot of class
> loading info to the stdout log, but there were no references to my class
> in mylib2.jar.
>
> This has to be caching this code somehow whether it is in flink or in the
> jvm.  Any ideas what could be happening or how to debug this further?
>
> Thanks
>
>
>


Re: [DISCUSS] A more thorough Pull Request check list and template

2017-07-24 Thread Eron Wright
I think a combination of techniques would be effective:
- identifying focus areas for the next release (e.g. see Robert's thread
about 1.4)
- emphasizing design discussion in advance of a PR
- assigning reviewers and a steward in a structured way
- using a label or assignment field to 'pass the baton'
- closing rejected PRs decisively

I don't mean to co-opt this thread for a broader process question, but
figured that the PR template could provide additional process guidance.

On Mon, Jul 24, 2017 at 11:55 AM, Stephan Ewen <se...@apache.org> wrote:

> @Eron Review timeliness would be great to improve.
>
> Some observation from the past year:
> There were periods where some components in Flink were making slow progress
> because all committers knowledgeable in those components were busy handling
> pull requests that were opened against those components, but were not in
> good shape, were adding not discussed designs, etc.
>
> I think the only way to ensure timely handling of pull requests is to be
> very strict in the handling. For example any non-trivial change needs prior
> discussion, agreement that this should be fixed now, and an agreed upon
> design doc. Otherwise the PR is not considered and simply rejected. Same
> for presence of docs, proper tests, ...
>
> But, I fear that introducing such strictness will scare off many in the
> community. So I would be very reluctant to do this.
> After all, many pull requests do bring in a good piece of perspective, at
> least, even if the code is not immediately suited for contribution...
>
>
> On Mon, Jul 24, 2017 at 8:18 PM, Eron Wright <eronwri...@gmail.com> wrote:
>
> > This seems like a good step in establishing a better PR process.  I
> believe
> > the process could be improved to ensure timely and targeted review by
> > component experts and committers.
> >
> > On Mon, Jul 17, 2017 at 9:36 AM, Stephan Ewen <se...@apache.org> wrote:
> >
> > > Hi all!
> > >
> > > I have reflected a bit on the pull requests and on some of the recent
> > > changes to Flink and some of the introduced bugs / regressions that we
> > have
> > > fixed.
> > >
> > > One thing that I think would have helped is to have more explicit
> > > information about what the pull request does and how the contributor
> > would
> > > suggest to verify it. I have seen this when contributing to some other
> > > project and really liked the approach.
> > >
> > > It requires that a contributor takes a minute to reflect on what was
> > > touched, and what would be ways to verify that the changes work
> properly.
> > > Besides being a help to the reviewer, it also makes contributors aware
> of
> > > what is important during the review process.
> > >
> > >
> > > I suggest a new pull request template, as attached below, with a
> preview
> > > here:
> > > https://github.com/StephanEwen/incubator-flink/
> > > blob/pr_template/.github/PULL_REQUEST_TEMPLATE.md
> > >
> > > Don't be scared, it looks long, but a big part is the introductory text
> > > (only relevant for new contributors) and the examples contents for the
> > > description.
> > >
> > > Filling this out for code that is in shape should be a quick thing:
> > Remove
> > > the into and checklist, write a few sentences on what the PR does (one
> > > should do that anyways) and then pick some yes/no in the classification
> > > section.
> > >
> > > Curious to hear what you think!
> > >
> > > Best,
> > > Stephan
> > >
> > >
> > > 
> > >
> > > Full suggested pull request template:
> > >
> > >
> > >
> > > *Thank you very much for contributing to Apache Flink - we are happy
> that
> > > you want to help us improve Flink. To help the community review you
> > > contribution in the best possible way, please go through the checklist
> > > below, which will get the contribution into a shape in which it can be
> > best
> > > reviewed.*
> > >
> > > *Please understand that we do not do this to make contributions to
> Flink
> > a
> > > hassle. In order to uphold a high standard of quality for code
> > > contributions, while at the same time managing a large number of
> > > contributions, we need contributors to prepare the contributions well,
> > and
> > > give reviewers enough contextual information for the review. Please
> also
> > > understand that contributions that do not

Re: [DISCUSS] A more thorough Pull Request check list and template

2017-07-24 Thread Eron Wright
This seems like a good step in establishing a better PR process.  I believe
the process could be improved to ensure timely and targeted review by
component experts and committers.

On Mon, Jul 17, 2017 at 9:36 AM, Stephan Ewen  wrote:

> Hi all!
>
> I have reflected a bit on the pull requests and on some of the recent
> changes to Flink and some of the introduced bugs / regressions that we have
> fixed.
>
> One thing that I think would have helped is to have more explicit
> information about what the pull request does and how the contributor would
> suggest to verify it. I have seen this when contributing to some other
> project and really liked the approach.
>
> It requires that a contributor takes a minute to reflect on what was
> touched, and what would be ways to verify that the changes work properly.
> Besides being a help to the reviewer, it also makes contributors aware of
> what is important during the review process.
>
>
> I suggest a new pull request template, as attached below, with a preview
> here:
> https://github.com/StephanEwen/incubator-flink/
> blob/pr_template/.github/PULL_REQUEST_TEMPLATE.md
>
> Don't be scared, it looks long, but a big part is the introductory text
> (only relevant for new contributors) and the examples contents for the
> description.
>
> Filling this out for code that is in shape should be a quick thing: Remove
> the into and checklist, write a few sentences on what the PR does (one
> should do that anyways) and then pick some yes/no in the classification
> section.
>
> Curious to hear what you think!
>
> Best,
> Stephan
>
>
> 
>
> Full suggested pull request template:
>
>
>
> *Thank you very much for contributing to Apache Flink - we are happy that
> you want to help us improve Flink. To help the community review you
> contribution in the best possible way, please go through the checklist
> below, which will get the contribution into a shape in which it can be best
> reviewed.*
>
> *Please understand that we do not do this to make contributions to Flink a
> hassle. In order to uphold a high standard of quality for code
> contributions, while at the same time managing a large number of
> contributions, we need contributors to prepare the contributions well, and
> give reviewers enough contextual information for the review. Please also
> understand that contributions that do not follow this guide will take
> longer to review and thus typically be picked up with lower priority by the
> community.*
>
> ## Contribution Checklist
>
>   - Make sure that the pull request corresponds to a [JIRA issue](
> https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made
> for typos in JavaDoc or documentation files, which need no JIRA issue.
>
>   - Name the pull request in the form "[FLINK-1234] [component] Title of
> the pull request", where *FLINK-1234* should be replaced by the actual
> issue number. Skip *component* if you are unsure about which is the best
> component.
>   Typo fixes that have no associated JIRA issue should be named following
> this pattern: `[hotfix] [docs] Fix typo in event time introduction` or
> `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.
>
>   - Fill out the template below to describe the changes contributed by the
> pull request. That will give reviewers the context they need to do the
> review.
>
>   - Make sure that the change passes the automated tests, i.e., `mvn clean
> verify`
>
>   - Each pull request should address only one issue, not mix up code from
> multiple issues.
>
>   - Each commit in the pull request has a meaningful commit message
> (including the JIRA id)
>
>   - Once all items of the checklist are addressed, remove the above text
> and this checklist, leaving only the filled out template below.
>
>
> **(The sections below can be removed for hotfixes of typos)**
>
> ## What is the purpose of the change
>
> *(For example: This pull request makes task deployment go through the blob
> server, rather than through RPC. That way we avoid re-transferring them on
> each deployment (during recovery).)*
>
>
> ## Brief change log
>
> *(for example:)*
>   - *The TaskInfo is stored in the blob store on job creation time as a
> persistent artifact*
>   - *Deployments RPC transmits only the blob storage reference*
>   - *TaskManagers retrieve the TaskInfo from the blob cache*
>
>
> ## Verifying this change
>
> *(Please pick either of the following options)*
>
> This change is a trivial rework / code cleanup without any test coverage.
>
> *(or)*
>
> This change is already covered by existing tests, such as *(please describe
> tests)*.
>
> *(or)*
>
> This change added tests and can be verified as follows:
>
> *(example:)*
>   - *Added integration tests for end-to-end deployment with large payloads
> (100MB)*
>   - *Extended integration test for recovery after master (JobManager)
> failure*
>   - *Added test that validates that TaskInfo is transferred only once
> 

[DISCUSS] Service Authorization (redux)

2017-07-23 Thread Eron Wright
Hello, now might be a good time to revisit an important enhancement to
Flink security, so-called service authorization.   This means the hardening
of a Flink cluster against unauthorized use with some sort of
authentication and authorization scheme.   Today, Flink relies entirely on
network isolation to protect itself from unauthorized job submission and
control, and to protect the secrets contained within a Flink cluster.
This is a problem in multi-user environments like YARN/Mesos/K8.

Last fall, an effort was made to implement service authorization but the PR
was ultimately rejected.   The idea was to add a simple secret key to all
network communication between the client, JM, and TM.   Akka itself has
such a feature which formed the basis of the solution.  There are usability
challenges with this solution, including a dependency on SSL.

Since then, the situation has evolved somewhat, and the use of SSL mutual
authentication is more viable.   Mutual auth is supported in Akka 2.4.12+
(or could be backported to Flakka).  My proposal is:

1. Upgrade Akka or backport the functionality to Flakka (see commit
5d03902c5ec3212cd28f26c9b3ef7c3b628b9451).
2. Implement SSL on any endpoint that doesn't yet support it (e.g.
queryable state).
3. Enable mutual auth in Akka and implement it on non-Akka endpoints.
4. Implement a simple authorization layer that accepts any authenticated
connection.
5. (stretch) generate and store a certificate automatically in YARN mode.
6. (stretch) Develop an alternate authentication method for the Web UI.

Are folks interested in this capability?  Thoughts on the use of SSL mutual
auth versus something else?  Thanks!

-Eron


Re: Dropping Java 7 support

2017-07-21 Thread Eron Wright
Opened FLINK-7242 as an umbrella issue for this.

On Wed, Jul 19, 2017 at 3:09 AM, Chesnay Schepler 
wrote:

> Are the specific things we want to change right away? (build profiles
> would be one thing)
>
> Would be neat to collect them in an umbrella issue.
>
>
> On 18.07.2017 16:49, Timo Walther wrote:
>
>> Hurray! Finally IntStreams, LongStreams, etc. in our stream processor ;-)
>>
>> Timo
>>
>> Am 18.07.17 um 16:31 schrieb Stephan Ewen:
>>
>>> Hi all!
>>>
>>> Over the last days, there was a longer poll running concerning dropping
>>> the
>>> support for Java 7.
>>>
>>> The feedback from users was unanimous - in favor of dropping Java 7 and
>>> going ahead with Java 8.
>>>
>>> So let's do that!
>>>
>>> Greetings,
>>> Stephan
>>>
>>> -- Forwarded message --
>>> From: Stephan Ewen 
>>> Date: Tue, Jul 18, 2017 at 4:29 PM
>>> Subject: Re: [POLL] Who still uses Java 7 with Flink ?
>>> To: user 
>>>
>>>
>>> All right, thanks everyone.
>>>
>>> I think the consensus here is clear :-)
>>>
>>> On Thu, Jul 13, 2017 at 5:17 PM, nragon >> es.com
>>>
 wrote:
 +1 dropping java 7



 --
 View this message in context: http://apache-flink-user-maili
 ng-list-archive.2336050.n4.nabble.com/POLL-Who-still-uses
 -Java-7-with-Flink-tp12216p14266.html
 Sent from the Apache Flink User Mailing List archive. mailing list
 archive
 at Nabble.com.


>>
>>
>


[jira] [Created] (FLINK-7242) Drop Java 7 Support

2017-07-21 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-7242:
---

 Summary: Drop Java 7 Support
 Key: FLINK-7242
 URL: https://issues.apache.org/jira/browse/FLINK-7242
 Project: Flink
  Issue Type: Task
Reporter: Eron Wright 
Priority: Critical


This is the umbrella issue for dropping Java 7 support.   The decision was 
taken following a vote 
[here|http://mail-archives.apache.org/mod_mbox/flink-dev/201707.mbox/%3CCANC1h_tawd90CU12v%2BfQ%2BQU2ORsh%3Dnob7AehT11jGHs1g5Hqtg%40mail.gmail.com%3E]
 and announced 
[here|http://mail-archives.apache.org/mod_mbox/flink-dev/201707.mbox/%3CCANC1h_vnxpiBnAB0OmQPD6NMH6L_PLCyWYsX32mZ0H%2BXP3%2BheQ%40mail.gmail.com%3E].
  Reasons cited include new language features and compatibility with Akka 2.4 
and Scala 2.12.

Please open sub-tasks as necessary.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: Using native library in Flink

2017-07-18 Thread Eron Wright
The solution mentioned by Timo works well with a standalone Flink cluster
but might not work with a YARN or Mesos cluster.  An alternative is to have
your Java library contain the native library within itself, and to extract
it to a temporary directory before calling `System.loadLibrary(...)`.
Note that you lose the advantages of using the native OS's packaging system
(e.g. security patches, dependency management).   The TensorFlow Java
library demonstrates the technique:

https://github.com/tensorflow/tensorflow/blob/v1.2.1/tensorflow/java/src/main/java/org/tensorflow/NativeLibrary.java

-Eron

On Tue, Jul 18, 2017 at 8:02 AM, Timo Walther  wrote:

> Hi Mike,
>
> do you run Flink locally or in a cluster? You have to make sure that VM
> argument -Djava.library.path is set for all Flink JVMs. Job Manager and
> Task Managers might run in separate JVMs. Make also sure that the library
> is accessible from all node. I don't know what happens if the file is
> accessed by multiple processes/threads at the same time. It might also
> important where you put the static { ... } loading. It should be in the
> Function, because these classes get deserialized on the TaskManager.
>
> I hope this helps.
>
> Timo
>
>
> Am 17.07.17 um 21:30 schrieb Mike Accola:
>
> I am new Flink user just trying to learn a little bit.  I am trying to
>> incorporate an existing C++ library into a new Flink application.  I am
>> seeing some strange behavior when trying to link in the native (C++)
>> library using java via JNI.
>>   I am running this on Linux (RHEL6)
>>   I can run my application once without error.  Sometimes it will run
>> successfully a 2nd or 3rd time.  However, eventually on a subsequent run,
>> I get an exception about the the native library not being found:
>>   java.lang.UnsatisfiedLinkError: no dummy2native in java.library.path
>>  at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
>>  at java.lang.Runtime.loadLibrary0(Runtime.java:870)
>>  at java.lang.System.loadLibrary(System.java:1122)
>>  at com.att.flink.tdata.spss.TinyLoader.loadNative(Dummy2.java:
>> 10)
>>   For debugging purposes for now, my native library does not have any
>> external references.  It really contains 1 method that essentially does
>> nothing.
>>   The behavior seems to indicate that there is some kind of cleanup being
>> done that "unloads" the native library.  I suspect this is somehow related
>> to Flink's implementation of its library cache manager, but I have not
>> been able to prove this yet.
>>   A few more details:
>>   - I have a c++ library libdummy2native.so that contains a method that
>> can
>> be invoked via JNI.
>> - I have a jar containing a class, called Dummy2.  The Dummy2 constructor
>> will invoke the JNI method.
>> - The libdummy2native.so library is invoked with System.loadLibrary() like
>> this:
>>   static {System.loadLibrary("dummy2native"); }
>> - In my simple Flink application, I have extended the ProcessFunction
>> class.  Within this class, I have overriden processElement method that
>> declares a Dummy2 object.
>> - The Dummy2 class can be called and invoked without error when used in a
>> standalone java program.
>>   Any thoughts or ideas on what to try next would be appreciated.
>> Initially,
>> I'd be happy to be able to just explain this behavior.  I will worry about
>> fixing it afterwards.
>>   Thanks.
>>
>>
>>
>>
>>
>


[jira] [Created] (FLINK-6631) Implement FLIP-6 MesosTaskExecutorRunner

2017-05-18 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-6631:
---

 Summary: Implement FLIP-6 MesosTaskExecutorRunner
 Key: FLINK-6631
 URL: https://issues.apache.org/jira/browse/FLINK-6631
 Project: Flink
  Issue Type: Sub-task
Reporter: Eron Wright 


Develop a Mesos task executor runner.   As with FLINK-6630, consider a general 
solution.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner

2017-05-18 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-6630:
---

 Summary: Implement FLIP-6 MesosAppMasterRunner
 Key: FLINK-6630
 URL: https://issues.apache.org/jira/browse/FLINK-6630
 Project: Flink
  Issue Type: Sub-task
Reporter: Eron Wright 
Assignee: Eron Wright 


A new runner must be developed for the FLIP-6 RM.  Target the "single job" 
scenario.

Take some time to consider a general solution or a base implementation that is 
shared with the old implementation.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] Release 1.3.0 RC1 (Non voting, testing release candidate)

2017-05-17 Thread Eron Wright
Robert, please add FLINK-6606 to the list of JIRAs that you're tracking,
thanks.

On Tue, May 16, 2017 at 8:30 AM, Robert Metzger  wrote:

> I totally forgot to post a document with testing tasks in the RC0 thread,
> so I'll do it in the RC1 thread.
>
> Please use this document:
> https://docs.google.com/document/d/11WCfV15VwQNF-
> Rar4E0RtWiZw1ddEbg5WWf4RFSQ_2Q/edit#
>
> If I have the feeling that not enough people are seeing the document, I'll
> write a dedicated email to user@ and dev@ :)
>
>
> On Tue, May 16, 2017 at 9:26 AM, Robert Metzger 
> wrote:
>
> > Thanks for the pointer. I'll keep an eye on the JIRA.
> >
> > I've gone through the JIRAs tagged with 1.3.0 yesterday to create a list
> > of new features in 1.3. Feel free to add more / change it in the wiki:
> > https://cwiki.apache.org/confluence/display/FLINK/
> > Flink+Release+and+Feature+Plan#FlinkReleaseandFeaturePlan-Flink1.3
> >
> > On Mon, May 15, 2017 at 10:29 PM, Gyula Fóra 
> wrote:
> >
> >> Thanks Robert,
> >>
> >> Just for the record I think there are still some problems with
> incremental
> >> snapshots, I think Stefan is still working on it.
> >>
> >> I added some comments to https://issues.apache.org/
> jira/browse/FLINK-6537
> >>
> >> Gyula
> >>
> >> Robert Metzger  ezt írta (időpont: 2017. máj. 15.,
> >> H,
> >> 19:41):
> >>
> >> > Hi Devs,
> >> >
> >> > This is the second non-voting RC. The last RC had some big issues,
> >> making
> >> > it hard to start Flink locally. I hope this RC proves to be more
> stable.
> >> >
> >> > I hope to create the first voting RC by end of this week.
> >> >
> >> > -
> >> >
> >> > The release commit is 3659a82f553fedf8afe8b5fae75922075fe17e85
> >> >
> >> > The artifacts are located here:
> >> > http://people.apache.org/~rmetzger/flink-1.3.0-rc1/
> >> >
> >> > The maven staging repository is here:
> >> > https://repository.apache.org/content/repositories/
> orgapacheflink-1119
> >> >
> >> > -
> >> >
> >> > Happy testing!
> >> >
> >> > Regards,
> >> > Robert
> >> >
> >>
> >
> >
>


[jira] [Created] (FLINK-6606) Create checkpoint hook with user classloader

2017-05-16 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-6606:
---

 Summary: Create checkpoint hook with user classloader
 Key: FLINK-6606
 URL: https://issues.apache.org/jira/browse/FLINK-6606
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Eron Wright 
Assignee: Eron Wright 
Priority: Blocker


Flink should set the thread's classloader when calling the checkpoint hook 
factory's `create` method.   Without that, the hook is likely to fail during 
initialization (e.g. using ServiceLoader). 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] Changing Flink's shading model

2017-05-11 Thread Eron Wright
In my opinion, the ideal approach to mitigating conflicts between
application code and Flink itself is to relocate all of Flink's
dependencies.  Rationale is to avoid placing the burden of relocation on
the app developer, and ultimately to eliminate the need for an app uber-jar.

For example, imagine enhancing Flink to directly support Maven
repositories, e.g.
```
$ flink run --package org.example:app:1.0
...
Downloading: https://repo1.maven.org/maven2/org/example/app/1.0/app.pom
...
```

>From that perspective, FLINK-6529 is another good step in that direction.
But it seems like we'd be forking more libraries ("fetty"!).   Would we
need to alter the source code or rely on the shading plugin?  As Chesnay
mentioned, what's the impact in the IDE?

In the future, could the entire flink-runtime be made an uber-jar,
performing the relocation at that stage?


On Thu, May 11, 2017 at 3:36 AM, Ufuk Celebi  wrote:

> On Thu, May 11, 2017 at 10:59 AM, Till Rohrmann 
> wrote:
> > Have we somewhere documented how to publish
> > artifacts on Maven central?
>
> Pulling in Robert who published frocks. @Robert: Would you like to
> volunteer for this? Would really help to combine this with some docs
> about publishing Maven artefacts in the flink-shaded-deps README. :-)
> In general, I'm curious to hear your opinion on this proposal.
>
> – Ufuk
>


[jira] [Created] (FLINK-6532) Mesos version check

2017-05-10 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-6532:
---

 Summary: Mesos version check
 Key: FLINK-6532
 URL: https://issues.apache.org/jira/browse/FLINK-6532
 Project: Flink
  Issue Type: Improvement
  Components: Mesos
Reporter: Eron Wright 


The minimum requirement for the Mesos subsystem of Flink is 1.0.   We should 
enforce the requirement with a version check upon connection.  This may be 
accomplished by checking the 'version' property of the 'MesosInfo' structure.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6531) Deserialize checkpoint hooks with user classloader

2017-05-10 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-6531:
---

 Summary: Deserialize checkpoint hooks with user classloader
 Key: FLINK-6531
 URL: https://issues.apache.org/jira/browse/FLINK-6531
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Eron Wright 
Assignee: Eron Wright 
Priority: Blocker
 Fix For: 1.3.0


The checkpoint hooks introduced in FLINK-6390 aren't being deserialized with 
the user classloader, breaking remote execution.

Remote execution produces a `ClassNotFoundException` as the job graph is 
transferred from the client to the JobManager.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] Feature Freeze

2017-05-02 Thread Eron Wright
Robert, I'd like to see FLINK-5974
 (Mesos DNS support)
added to the list of important issues.  A PR is ready.

On Tue, May 2, 2017 at 4:30 AM, Kostas Kloudas 
wrote:

> The only thing that I want to add in the features to be added for 1.3
> is the NOT pattern for the CEP library.
>
> There is an open PR here: https://github.com/apache/flink/pull/3798 <
> https://github.com/apache/flink/pull/3798>
> which is not far from getting in.
>
> Kostas
>
> > On May 2, 2017, at 12:10 PM, Robert Metzger  wrote:
> >
> > Thanks a lot Ufuk for starting the discussion regarding the 1.3 feature
> > freeze.
> >
> > I didn't feature freeze yesterday (Monday) because it was a public
> holiday
> > here in Germany.
> >
> > I haven't made up my mind whether to do the feature freeze today or not.
> > Many important features seem to be close to completion.
> > I don't think we can pick features and wait with the release until they
> are
> > finished. The only thing I can imagine we could do is extend the
> deadline a
> > little bit to give people more time to complete.
> > But I'm strictly against multiple extensions.
> >
> > To make the discussion a bit easier, I've collected all the JIRAs
> mentioned
> > in this thread. I decided to group them into "really important" and
> > "important". I hope nobody is offended by my selection and I'm very open
> to
> > discuss it.
> > It doesn't really matter anyways because we will not block the release on
> > the completion of certain features.
> >
> >
> > Completed Features for 1.3
> > - TODO!
> >
> > Blockers:
> > - TODO!
> >
> >
> > Really important (open):
> > - FLINK-6364: Implement incremental checkpointing in RocksDBStateBackend
> > (pending PR)
> > - FLINK-5906: Add support to register user defined aggregates in
> > TableEnvironment (no PR)
> > - FLINK-6047: Add support for Retraction in Table API / SQL (depends on
> > FLINK-6093)
> > - FLINK-6093: Implement and turn on retraction for table sink (pending
> PR)
> > - FLINK-6334: Refactoring UDTF interface (pending PR, under review)
> > - FLINK-5998: Un-fat Hadoop from Flink fat jar (pending PR, under review)
> > - FLINK-4545: Flink automatically manages TM network buffer (pending PR,
> > under review)
> > - FLINK-6178: Allow upgrades to state serializers (pending PR, no review
> )
> >
> >
> > Really important (closed):
> > - FLINK-5892: Recover job state at the granularity of operator (merged)
> >
> >
> >
> > Important (open):
> > - FLINK-6013: Add Datadog HTTP metrics reporter (pending PR, under
> review)
> > - FLINK-6337: Remove the buffer provider from
> PartitionRequestServerHandler
> > (pending PR, under review)
> > - FLINK-6033: Support UNNEST query in the stream SQL API (no PR)
> > - FLINK-6335: Parse DISTINCT over grouped window in stream SQL (pending
> PR)
> > - FLINK-6373: Add runtime support for distinct aggregation over grouped
> > windows (pending PR, under review)
> > - FLINK-6281: Create TableSink for JDBC (pending PR, under review)
> > - FLINK-6225: Support Row Stream for CassandraSink (pending PR, under
> > review)
> > - FLINK-6196: Support dynamic schema in Table Function (pending PR, under
> > review)
> > - FLINK-4022: Partition and topic discovery for FlinkKafkaConsumer
> (pending
> > PR, no review)
> > - FLINK-4821: Implement rescalable non-partitioned state for Kinesis
> > Connector (pending PR, under review)
> > Important (closed):
> > - FLINK-6377: Support map types in the Table / SQL API (merged)
> > - FLINK-6398: RowSerializer's duplicate should always return a new
> instance
> > (merged)
> >
> >
> > I'll now collect the list of finished features in 1.3 and the blockers
> and
> > update above list.
> >
> > Stephan's email just came in, and I like the idea of freezing the feature
> > set now but extending the deadline to Friday for the branching. (Friday
> 3pm
> > CEST)
> >
> > Does everybody agree to that proposal?
> >
> >
> > On Tue, May 2, 2017 at 12:05 PM, Stephan Ewen  wrote:
> >
> >> Thanks all for the lively discussion about the feature freeze and how to
> >> proceed.
> >> Because we committed to a time-based release schedule, we should not
> break
> >> the feature freeze too badly, or we would just disable the
> >> time-based-release-policy at the very first time it would trigger.
> >>
> >> Here are a few thoughts about what we can do:
> >>
> >>  - First of all, please note that feature freeze does not mean bug fix
> >> freeze. Important bug fixes can and should go in as part of the 1.3.0
> >> release testing period
> >>
> >>  - We should probably not add brand new features to the list at this
> point
> >> (that would most likely break the release schedule completely)
> >>
> >>  - I think there is a case to include some features which are partially
> >> (but not fully) in master already, or exist basically completely as Pull
> >> Requests at this time.
> >>
> >> How about we do 

[jira] [Created] (FLINK-6391) fix build for scala 2.11 (gelly-examples)

2017-04-26 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-6391:
---

 Summary: fix build for scala 2.11 (gelly-examples)
 Key: FLINK-6391
 URL: https://issues.apache.org/jira/browse/FLINK-6391
 Project: Flink
  Issue Type: Bug
  Components: Build System
Reporter: Eron Wright 
Assignee: Eron Wright 


After switching the build to Scala 2.11 (using 
`tools/change-scala-version.sh`), the build fails in flink-dist module.

{code}
...
[INFO] flink-dist . FAILURE [ 19.337 s]
[INFO] flink-fs-tests . SKIPPED
[INFO] 
[INFO] BUILD FAILURE
[INFO] 
[INFO] Total time: 31:16 min
[INFO] Finished at: 2017-04-26T15:17:43-07:00
[INFO] Final Memory: 380M/1172M
[INFO] 
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-assembly-plugin:2.6:single (bin) on project 
flink-dist_2.11: Failed to create assembly: Error adding file to archive: 
/Users/wrighe3/Projects/flink/flink-dist/../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-1.3-SNAPSHOT.jar
 -> [Help 1]
{code}

The root cause appears to be that the change-scala-version tool should update 
flink-dist/.../assemblies/bin.xml to use the correct version of 
flink-gelly-examples.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6379) Implement FLIP-6 Mesos Resource Manager

2017-04-25 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-6379:
---

 Summary: Implement FLIP-6 Mesos Resource Manager
 Key: FLINK-6379
 URL: https://issues.apache.org/jira/browse/FLINK-6379
 Project: Flink
  Issue Type: Sub-task
  Components: Mesos
Reporter: Eron Wright 
Assignee: Eron Wright 


Given the new ResourceManager of FLIP-6, implement a new MesosResourceManager.  
 

The minimal effort would be to implement a new resource manager while 
continuing to use the various local actors (launch coordinator, task monitor, 
etc.) which implement the various FSMs associated with Mesos scheduling. 

The Fenzo library would continue to solve the packing problem of matching 
resource offers to slot requests.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6378) Implement FLIP-6 Flink-on-Mesos

2017-04-25 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-6378:
---

 Summary: Implement FLIP-6 Flink-on-Mesos
 Key: FLINK-6378
 URL: https://issues.apache.org/jira/browse/FLINK-6378
 Project: Flink
  Issue Type: New Feature
  Components: Mesos
Reporter: Eron Wright 


This is the parent issue for implementing Flink on Mesos using the new FLIP-6 
architecture.

This covers individual jobs running as Mesos frameworks, where the framework 
and job lifetime are coupled.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Mesos component status for 1.3

2017-04-21 Thread Eron Wright
Hello!

Here's a list of related PRs that I'm tracking for 1.3 release:

- FLINK-5974 - Mesos-DNS hostname support
- FLINK-6336 - Mesos placement constraints
- FLINK-5975 - Mesos volume support

We need a committer to push these over the finish line.

Also, I'd like to nominate myself as a shepherd of the Mesos component.

Thanks,
Eron



[jira] [Created] (FLINK-6322) Mesos task labels

2017-04-18 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-6322:
---

 Summary: Mesos task labels
 Key: FLINK-6322
 URL: https://issues.apache.org/jira/browse/FLINK-6322
 Project: Flink
  Issue Type: Improvement
  Components: Mesos
Reporter: Eron Wright 
Priority: Minor



Task labels serve many purposes in Mesos, such a tagging tasks for 
log-aggregation purposes.   

I propose a new configuration setting for a list of 'key=value' labels to be 
applied to TM instances.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5974) Support Mesos DNS

2017-03-06 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-5974:
---

 Summary: Support Mesos DNS
 Key: FLINK-5974
 URL: https://issues.apache.org/jira/browse/FLINK-5974
 Project: Flink
  Issue Type: Improvement
  Components: Cluster Management, Mesos
Reporter: Eron Wright 
Assignee: Eron Wright 



In certain Mesos/DCOS environments, the slave hostnames aren't resolvable.  For 
this and other reasons, Mesos DNS names would ideally be used for communication 
within the Flink cluster, not the hostname discovered via 
`InetAddress.getLocalHost`.

Some parts of Flink are already configurable in this respect, notably 
`jobmanager.rpc.address`.  However, the Mesos AppMaster doesn't use that 
setting for non-JM addresses (e.g. artifact server), it uses the hostname.

Similarly, the `taskmanager.hostname` setting isn't used in Mesos deployment 
mode.   To effectively use Mesos DNS, the TM should use 
`..mesos` as its hostname.   This could be derived 
from an interpolate configuration string.





--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


  1   2   >