Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-04-12 Thread Jamie Grier
sult in its inability to
> support query rewriting.
>
> Our proposal is indeed similar to the capabilities of a materialized view, but
> considering the following two factors: firstly, we should try to follow the
> standard as much as possible without conflicting with it, and secondly,
> materialized view does not directly satisfy the scenario of modifying data,
> so using Table would be more appropriate.
>
> Although materialized view is also one of the candidates, it is not the most
> suitable option.
>
>
> > I'm actually against all of the other proposed names so I rank them
> equally
> last.  I don't think we need yet another new concept for this.  I think
> that will just add to users' confusion and learning curve which is already
> substantial with Flink.  We need to make things easier rather than harder.
>
> Also, just to clarify (and sorry if my previous statement was not that
> accurate): this is not a new concept, it is just a new type of table,
> similar in capabilities to a materialized view, but it simplifies the data
> processing pipeline, which is also aligned with the long term vision of
> Flink SQL.
>
>
> Best,
> Ron
>
>
> Jamie Grier  wrote on Thu, Apr 11, 2024 at 05:59:
>
> > Sorry for coming very late to this thread.  I have not contributed much
> to
> > Flink publicly for quite some time but I have been involved with Flink,
> > daily, for years now and I'm keenly interested in where we take Flink SQL
> > going forward.
> >
> > Thanks for the proposal!!  I think it's definitely a step in the right
> > direction and I'm thrilled this is happening.  The end state I have in
> mind
> > is that we get rid of execution modes as something users have to think
> > about and instead make sure the SQL a user writes completely describes
> > their intent.  In the case of this proposal the intent a user has is that
> > the system continually maintains an object (whatever we decide to call
> it)
> > that is the result of their query and further that these can be easily
> > chained together into declarative data pipelines.
> >
> > I would think it would be very unsurprising to users to call this a
> > MATERIALIZED VIEW, except for the fact that this object can also accept
> > updates via one-off DML statements.  However, I don't think this object
> > *should* accept updates via one-off DML statements so I may be the odd
> man
> > out here.   I would like to dive into this a little more if at all
> > possible.  The reasoning I've seen mentioned is GDPR requirements so can
> we
> > dig into that specifically?  I am not terribly familiar with the exact
> GDPR
> > requirements but I should think that the solution to deleting data is to
> > delete it in the upstream tables which would appropriately update any
> > downstream MVs (or whatever we call it).
> >
> > So, with that context and the desire to explore the GDPR requirements a
> > little more I would vote like so:
> >
> > (1) Materialized View, it should work as expected for SQL users, no new
> > concept, no direct updates, dig into GDPR requirements though.
> > (2) Dynamic Table, just follow the Snowflake precedent.
> >
> > I'm actually against all of the other proposed names so I rank them
> equally
> > last.  I don't think we need yet another new concept for this.  I think
> > that will just add to users' confusion and learning curve which is
> already
> > substantial with Flink.  We need to make things easier rather than
> harder.
> >
> > All of that said, I think these discussions may be a bit easier if they
> > were part of a shared longer term vision for Flink SQL overall.  You can
> > see this from the little bits of side discussion that come up even in
> this
> > thread.  I'm not quite sure how to address that though.  I will however
> > give an example.
> >
> > I think that longer term the Flink SQL query text alone should dictate
> > everything the system should do and we shouldn't rely on things like
> > runtime execution modes at all.  This means, for example, that a SELECT
> > statement should always be a point in time query and immediately return
> > results over the current data set and terminate.   This also holds for an
> > INSERT INTO query for that matter, and CTAS.  A continuous query that
> > perpetually maintains some view in the background should really have a
> > distinct syntax.  Basically Flink SQL should behave in a way that is
> > unsurprising to users of existing database systems.
> >
> > Anyway, the point is that maybe we need a high level sketch of where
> we're
> > going so we can

Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-04-10 Thread Jamie Grier
Sorry for coming very late to this thread.  I have not contributed much to
Flink publicly for quite some time but I have been involved with Flink,
daily, for years now and I'm keenly interested in where we take Flink SQL
going forward.

Thanks for the proposal!!  I think it's definitely a step in the right
direction and I'm thrilled this is happening.  The end state I have in mind
is that we get rid of execution modes as something users have to think
about and instead make sure the SQL a user writes completely describes
their intent.  In the case of this proposal the intent a user has is that
the system continually maintains an object (whatever we decide to call it)
that is the result of their query and further that these can be easily
chained together into declarative data pipelines.

I would think it would be very unsurprising to users to call this a
MATERIALIZED VIEW, except for the fact that this object can also accept
updates via one-off DML statements.  However, I don't think this object
*should* accept updates via one-off DML statements so I may be the odd man
out here.   I would like to dive into this a little more if at all
possible.  The reasoning I've seen mentioned is GDPR requirements so can we
dig into that specifically?  I am not terribly familiar with the exact GDPR
requirements but I should think that the solution to deleting data is to
delete it in the upstream tables which would appropriately update any
downstream MVs (or whatever we call it).

So, with that context and the desire to explore the GDPR requirements a
little more I would vote like so:

(1) Materialized View, it should work as expected for SQL users, no new
concept, no direct updates, dig into GDPR requirements though.
(2) Dynamic Table, just follow the Snowflake precedent.

I'm actually against all of the other proposed names so I rank them equally
last.  I don't think we need yet another new concept for this.  I think
that will just add to users' confusion and learning curve which is already
substantial with Flink.  We need to make things easier rather than harder.

All of that said, I think these discussions may be a bit easier if they
were part of a shared longer term vision for Flink SQL overall.  You can
see this from the little bits of side discussion that come up even in this
thread.  I'm not quite sure how to address that though.  I will however
give an example.

I think that longer term the Flink SQL query text alone should dictate
everything the system should do and we shouldn't rely on things like
runtime execution modes at all.  This means, for example, that a SELECT
statement should always be a point in time query and immediately return
results over the current data set and terminate.   This also holds for an
INSERT INTO query for that matter, and CTAS.  A continuous query that
perpetually maintains some view in the background should really have a
distinct syntax.  Basically Flink SQL should behave in a way that is
unsurprising to users of existing database systems.
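To make that distinction concrete, here is a rough Table API sketch of the two
behaviors (the `orders` table and the CREATE MATERIALIZED TABLE statement are
purely illustrative of the syntax this thread is debating, not existing
features):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IntentInSqlSketch {
    public static void main(String[] args) {
        // Today the execution mode lives outside the query text, e.g. in the settings.
        TableEnvironment batchEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        // A plain SELECT is then a point-in-time query: it returns a result and terminates.
        // (Assumes an `orders` table has already been registered in the catalog.)
        batchEnv.executeSql("SELECT user_id, COUNT(*) FROM orders GROUP BY user_id").print();

        // Under the proposal, continuous maintenance would be stated in the SQL itself;
        // the keyword (MATERIALIZED TABLE / MATERIALIZED VIEW / DYNAMIC TABLE) is exactly
        // what is being voted on here, so treat this statement as a placeholder.
        TableEnvironment streamEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        streamEnv.executeSql(
                "CREATE MATERIALIZED TABLE order_counts AS "
                        + "SELECT user_id, COUNT(*) FROM orders GROUP BY user_id");
    }
}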

Anyway, the point is that maybe we need a high level sketch of where we're
going so we can make sure it all hangs together nicely.

That all said I do think CREATE MATERIALIZED is a step in the right
direction but we should figure out the GDPR stuff and the overall direction
for Flink SQL going forward as well.





On Wed, Apr 10, 2024 at 6:16 AM Dawid Wysakowicz 
wrote:

> Hi all,
> I thought I'd cast my vote as well to give extra data:
>
>
>1. Materialized Table
>2. Materialized View (generally speaking I am not too concerned with
>using a View here, but since there are concerns around updating a view I
>put it second)
>
> I think what is suggested in this FLIP is really close to what MATERIALIZED
> VIEWS do already, that's why I very much prefer any of the two options
> above over any of the remaining candidates, but if I were to order them it
> would be:
>
> 3. Refresh Table (it says what it does)
> 4. Live Table - a new concept to explain, "live" can be interpreted in many
> ways
> 5. Derived Table - does not say much
>
> Best,
> Dawid
>
> On Wed, 10 Apr 2024 at 04:50, Jark Wu  wrote:
>
> > I have been following up on the discussion, it's a great FLIP to further
> > unify stream and batch ETL pipelines. Thanks for the proposal!
> >
> > Here is my ranking:
> >
> > 1. Materialized Table  -> "The table materializes the results of a query
> > that you specify", this can reflect what we want and doesn't conflict
> with
> > any SQL standard.
> > 2. Derived Table -> easy to understand and write, but need to extend the
> > standard
> > 3. Live Table ->  looks too much like Databrick's Delta Live Table.
> > 4. Materialized View -> looks weird to insert/update a view.
> >
> >
> > Best,
> > Jark
> >
> >
> >
> >
> > On Wed, 10 Apr 2024 at 09:57, Becket Qin  wrote:
> >
> > > Thanks for the proposal. I like the FLIP.
> > >
> > > My ranking:
> > >
> > > 1. Refresh(ing) / Live Table -> easy to understand and implies the
> > dynamic
> > > 

Re: SQL Gateway and SQL Client

2022-11-04 Thread Jamie Grier
Hi Shengkai,

We're doing more and more Flink development at Confluent these days and we're 
currently trying to bootstrap a prototype that relies on the SQL Client and 
Gateway.  We will be using the components in some of our projects and would 
like to co-develop them with you and the rest of the Flink community.

As of right now it's a pretty big blocker for our upcoming milestone that the 
SQL Client has not yet been modified to talk to the SQL Gateway and we want to 
help with this effort ASAP!  We would even be willing to take over the work if 
it's not yet started, but I suspect it already is.

Anyway, rather than start working immediately on the SQL Client and adding the 
new Gateway mode ourselves, we wanted to start a conversation with you, see 
where you're at with things, and offer to help.

Do you have anything you can already share so we can start from your work or 
just play around with it?  Like I said, we just want to get started and are 
well positioned to help in this area.  We see both the SQL Client and the Gateway as 
very important for us and have a good team to help develop them.

Let me know if there is a branch you can share, etc.  It would be much 
appreciated!

-Jamie Grier


On 2022/10/28 06:06:49 Shengkai Fang wrote:
> Hi.
> 
> > Is there a possibility for us to get engaged and at least introduce
> initial changes to support authentication/authorization?
> 
> Yes. You can write a FLIP about the design and the changes. We can discuss this
> on the dev mailing list. If the FLIP passes, we can develop it together.
> 
> > Another question about persistent Gateway: did you have any specific
> thoughts about it or some draft design?
> 
> We don't have any detailed plan about this. But I know Livy has a similar
> feature.
> 
> Best,
> Shengkai
> 
> 
> Alexey Leonov-Vendrovskiy  wrote on Thu, Oct 27, 2022 at 15:12:
> 
> > Apologies for the delayed response on my side.
> >
> >  I think the authentication module is not part of our plan for 1.17 because
> >> of the heavy workload. I think we'll start the design at the end of the
> >> 1.17 release cycle.
> >
> >
> > Is there a possibility for us to get engaged and at least introduce
> > initial changes to support authentication/authorization? Specifically,
> > changes in the API and in SQL Client.
> >
> > We expect the following authentication flow:
> >
> > On the SQL gateway we want to be able to use a delegation token.
> > SQL client should be able to supply an API key.
> > The SQL Gateway *would not* be submitting jobs on behalf of the client.
> >
> > Ideally it would be nice to introduce some interfaces in the SQL Gateway
> > that would allow implementing custom authentication and authorization.
> >
> > Another question about persistent Gateway: did you have any specific
> > thoughts about it or some draft design?
> >
> > Thanks,
> > Alexey
> >
> >
> > On Fri, Oct 21, 2022 at 1:13 AM Shengkai Fang  wrote:
> >
> >> Sorry for the late response.
> >>
> >> In the next version (Flink 1.17), we plan to support the SQL Client
> >> submitting statements to the Flink SQL Gateway. The FLINK-29486
> >> <https://issues.apache.org/jira/browse/FLINK-29486> is the first step to
> >> remove the usage of the `Parser` on the client side, which needs to read
> >> the table schema when converting SQL nodes to operations. I think the
> >> authentication module is not part of our plan for 1.17 because of the
> >> heavy workload. I think we'll start the design at the end of the 1.17
> >> release cycle.
> >> But could you share more details about the requirements of the
> >> authentication?
> >> - Do you use the kerberos or delegation token or password to do the
> >> authentication?
> >> - After the authentication, do you need the sql gateway to submit the
> >> job on behalf of the client?
> >> - ...
> >>
> >> For detailed implementation, I think Hive and Presto are good examples to
> >> dig in.  If you have some thoughts about the authentication module,
> >> please let me know.
> >>
> >> Best,
> >> Shengkai
> >>
> >> Alexey Leonov-Vendrovskiy  wrote on Wed, Oct 19, 2022 at 00:37:
> >>
> >>> Thank you for the response, Yuxia!
> >>>
> >>> Shengkai, I would like to learn more about the near-term and somewhat more
> >>> distant plans for the development of the SQL Gateway and the SQL Client.
> >>> Do you have a description of the planned work, or can you maybe share general
> >>> thoughts about the authentication module or a persistent Gateway?
> >>

Re: Checkpoint metrics.

2019-09-14 Thread Jamie Grier
Thanks Konstantin,

Refining this a little bit..  All the checkpoints for all the subtasks
upstream of the sink complete in seconds.  Most of the subtasks of the sink
itself also complete in seconds other than these very few "slow" ones.  So,
somehow we are taking at worst 29 minutes to clear the data between the
slow sink subtask and the tasks just upstream.  It's very hard to see how
this can be the case.  I think I need to dig further into the operation of
the StreamingFileSink to see if I can make sense out of how this can
happen.  I don't see any signs of major data/partition skew in the metrics
so this is pretty strange.

Has anyone seen this sort of behavior with the StreamingFileSink before?

-Jamie




On Sat, Sep 14, 2019 at 2:30 AM Konstantin Knauf 
wrote:

> Hi Jamie,
>
> I think, your interpretation is correct. It takes a long time until the
> first barrier reaches the "slow" subtask and in case of the screenshot
> another 3m 22s until the last barrier reaches the subtask. Regarding the
> total amount of data: depending on your checkpoint configuration
> (especially the min time between checkpoints), there might also still be
> alignment buffers, which need to be processed while the next checkpoint has
> already started.
>
> Cheers,
>
> Konstantin
>
>
>
> On Sat, Sep 14, 2019 at 1:35 AM Jamie Grier 
> wrote:
>
> > Here's the second screenshot I forgot to include:
> > https://pasteboard.co/IxhNIhc.png
> >
> > On Fri, Sep 13, 2019 at 4:34 PM Jamie Grier  wrote:
> >
> > > Alright, here's another case where this is very pronounced.  Here's a
> > link
> > > to a couple of screenshots showing the overall stats for a slow task as
> > > well as a zoom in on the slowest of them:
> > > https://pasteboard.co/IxhGWXz.png
> > >
> > > This is the sink stage of a pipeline with 3 upstream tasks.  All the
> > > upstream subtasks complete their checkpoints end-to-end in under 10
> > > seconds.  Most of the sink subtasks also complete end-to-end in under a
> > few
> > > seconds.  There are a few that take a minute or so (which is also
> > > indicative of a problem) but then there is one that takes 29 minutes.
> > >
> > > The sink here is the StreamingFileSink.
> > >
> > > It does seem that each subtask that has a high end-to-end time also
> has a
> > > substantially higher alignment time but the end-to-end time is much
> > larger
> > > than just alignment.
> > >
> > > I suppose the correct interpretation of this is that the end-to-end
> time
> > > alone indicates heavy backpressure / slow progress making on the slow
> > > subtasks and since they are moving so slowly that also explains how
> there
> > > could be a relatively high alignment time as well.  The skew in the
> > barrier
> > > arrival times is essentially amplified since the subtasks are making
> > their
> > > way through the data so darn slowly.
> > >
> > > It's still very hard to understand how this sink could be taking so
> long
> > > to make progress.  I mean unless I misunderstand something the total
> > amount
> > > of data that has to be worked through to receive a barrier can't be
> more
> > > than what is buffered in Flink's network stack in the worst case,
> right?
> > > How could it take 29 minutes to consume this data in the sink?
> > >
> > > Anyway, I'd appreciate any feedback or insights.
> > >
> > > Thanks :)
> > >
> > > -Jamie
> > >
> > >
> > > On Fri, Sep 13, 2019 at 12:11 PM Jamie Grier  wrote:
> > >
> > >> Thanks Seth and Stephan,
> > >>
> > >> Yup, I had intended to upload an image.  Here it is:
> > >> https://pasteboard.co/Ixg0YP2.png
> > >>
> > >> This one is very simple and I suppose can be explained by heavy
> > >> backpressure.  The more complex version of this problem I run into
> > >> frequently is where a single (or a couple of) sub-task(s) in a highly
> > >> parallel job takes up to an order of magnitude longer than others with
> > >> regard to end-to-end time and usually ends up causing checkpoint
> > timeouts.
> > >> This often occurs in the sink task but that's not the only case I've
> > seen.
> > >>
> > >> The only metric that shows a high value will still be end-to-end time.
> > >> Sync, async, and alignment times are all negligible.  This, to me, is
> > very
> > >> hard to understand, especially when this task wil

Re: Checkpoint metrics.

2019-09-13 Thread Jamie Grier
Alright, here's another case where this is very pronounced.  Here's a link
to a couple of screenshots showing the overall stats for a slow task as
well as a zoom in on the slowest of them:  https://pasteboard.co/IxhGWXz.png

This is the sink stage of a pipeline with 3 upstream tasks.  All the
upstream subtasks complete their checkpoints end-to-end in under 10
seconds.  Most of the sink subtasks also complete end-to-end in under a few
seconds.  There are a few that take a minute or so (which is also
indicative of a problem) but then there is one that takes 29 minutes.

The sink here is the StreamingFileSink.

It does seem that each subtask that has a high end-to-end time also has a
substantially higher alignment time but the end-to-end time is much larger
than just alignment.

I suppose the correct interpretation of this is that the end-to-end time
alone indicates heavy backpressure / slow progress making on the slow
subtasks and since they are moving so slowly that also explains how there
could be a relatively high alignment time as well.  The skew in the barrier
arrival times is essentially amplified since the subtasks are making their
way through the data so darn slowly.

It's still very hard to understand how this sink could be taking so long to
make progress.  I mean unless I misunderstand something the total amount of
data that has to be worked through to receive a barrier can't be more than
what is buffered in Flink's network stack in the worst case, right?  How
could it take 29 minutes to consume this data in the sink?

Anyway, I'd appreciate any feedback or insights.

Thanks :)

-Jamie


On Fri, Sep 13, 2019 at 12:11 PM Jamie Grier  wrote:

> Thanks Seth and Stephan,
>
> Yup, I had intended to upload an image.  Here it is:
> https://pasteboard.co/Ixg0YP2.png
>
> This one is very simple and I suppose can be explained by heavy
> backpressure.  The more complex version of this problem I run into
> frequently is where a single (or a couple of) sub-task(s) in a highly
> parallel job takes up to an order of magnitude longer than others with
> regard to end-to-end time and usually ends up causing checkpoint timeouts.
> This often occurs in the sink task but that's not the only case I've seen.
>
> The only metric that shows a high value will still be end-to-end time.
> Sync, async, and alignment times are all negligible.  This, to me, is very
> hard to understand, especially when this task will take 10 minutes+ to
> complete and everything else takes seconds.
>
> Rather than speak hypothetically on this I'll post some data to this
> thread as this situation occurs again.  Maybe we can make sense of it
> together.
>
> Thanks a lot for the help.
>
> -Jamie
>
>
> .
>
> On Thu, Sep 12, 2019 at 10:57 AM Stephan Ewen  wrote:
>
>> Hi Jamie!
>>
>> Did you mean to attach a screenshot? If yes, you need to share that
>> through
>> a different channel, the mailing list does not support attachments,
>> unfortunately.
>>
>> Seth is right about how the time is measured.
>> One important bit to add to the interpretation:
>>   - For non-source tasks, the time includes the "travel of the barriers",
>> which can take long under back pressure
>>   - For source tasks, it includes the "time to acquire the checkpoint
>> lock", which can be long if the source is blocked in trying to emit data
>> (again, backpressure).
>>
>> As part of FLIP-27 we will eliminate the checkpoint lock (have a mailbox
>> instead) which should lead to faster lock acquisition.
>>
>> The "unaligned checkpoints" discussion is looking at ways to make
>> checkpoints much less susceptible to back pressure.
>>
>>
>> https://lists.apache.org/thread.html/fd5b6cceb4bffb635e26e7ec0787a8db454ddd64aadb40a0d08a90a8@%3Cdev.flink.apache.org%3E
>>
>> Hope that helps understanding what is going on.
>>
>> Best,
>> Stephan
>>
>>
>> On Thu, Sep 12, 2019 at 1:25 AM Seth Wiesman  wrote:
>>
>> > Great timing, I just debugged this on Monday. E2e time is checkpoint
>> > coordinator to checkpoint coordinator, so it includes RPC to the source
>> and
>> > RPC from the operator back to the JM.
>> >
>> > Seth
>> >
>> > > On Sep 11, 2019, at 6:17 PM, Jamie Grier 
>> > wrote:
>> > >
>> > > Hey all,
>> > >
>> > > I need to make sense of this behavior.  Any help would be appreciated.
>> > >
>> > > Here’s an example of a set of Flink checkpoint metrics I don’t
>> > understand.  This is the first operator in a job and as you can see the
>> > end-to-end time for the checkpoint is long, but it’s not explained by
>>

Re: Checkpoint metrics.

2019-09-13 Thread Jamie Grier
Here's the second screenshot I forgot to include:
https://pasteboard.co/IxhNIhc.png

On Fri, Sep 13, 2019 at 4:34 PM Jamie Grier  wrote:

> Alright, here's another case where this is very pronounced.  Here's a link
> to a couple of screenshots showing the overall stats for a slow task as
> well as a zoom in on the slowest of them:
> https://pasteboard.co/IxhGWXz.png
>
> This is the sink stage of a pipeline with 3 upstream tasks.  All the
> upstream subtasks complete their checkpoints end-to-end in under 10
> seconds.  Most of the sink subtasks also complete end-to-end in under a few
> seconds.  There are a few that take a minute or so (which is also
> indicative of a problem) but then there is one that takes 29 minutes.
>
> The sink here is the StreamingFileSink.
>
> It does seem that each subtask that has a high end-to-end time also has a
> substantially higher alignment time but the end-to-end time is much larger
> than just alignment.
>
> I suppose the correct interpretation of this is that the end-to-end time
> alone indicates heavy backpressure / slow progress making on the slow
> subtasks and since they are moving so slowly that also explains how there
> could be a relatively high alignment time as well.  The skew in the barrier
> arrival times is essentially amplified since the subtasks are making their
> way through the data so darn slowly.
>
> It's still very hard to understand how this sink could be taking so long
> to make progress.  I mean unless I misunderstand something the total amount
> of data that has to be worked through to receive a barrier can't be more
> than what is buffered in Flink's network stack in the worst case, right?
> How could it take 29 minutes to consume this data in the sink?
>
> Anyway, I'd appreciate any feedback or insights.
>
> Thanks :)
>
> -Jamie
>
>
> On Fri, Sep 13, 2019 at 12:11 PM Jamie Grier  wrote:
>
>> Thanks Seth and Stephan,
>>
>> Yup, I had intended to upload an image.  Here it is:
>> https://pasteboard.co/Ixg0YP2.png
>>
>> This one is very simple and I suppose can be explained by heavy
>> backpressure.  The more complex version of this problem I run into
>> frequently is where a single (or a couple of) sub-task(s) in a highly
>> parallel job takes up to an order of magnitude longer than others with
>> regard to end-to-end time and usually ends up causing checkpoint timeouts.
>> This often occurs in the sink task but that's not the only case I've seen.
>>
>> The only metric that shows a high value will still be end-to-end time.
>> Sync, async, and alignment times are all negligible.  This, to me, is very
>> hard to understand, especially when this task will take 10 minutes+ to
>> complete and everything else takes seconds.
>>
>> Rather than speak hypothetically on this I'll post some data to this
>> thread as this situation occurs again.  Maybe we can make sense of it
>> together.
>>
>> Thanks a lot for the help.
>>
>> -Jamie
>>
>>
>> .
>>
>> On Thu, Sep 12, 2019 at 10:57 AM Stephan Ewen  wrote:
>>
>>> Hi Jamie!
>>>
>>> Did you mean to attach a screenshot? If yes, you need to share that
>>> through
>>> a different channel, the mailing list does not support attachments,
>>> unfortunately.
>>>
>>> Seth is right about how the time is measured.
>>> One important bit to add to the interpretation:
>>>   - For non-source tasks, the time includes the "travel of the barriers",
>>> which can take long under back pressure
>>>   - For source tasks, it includes the "time to acquire the checkpoint
>>> lock", which can be long if the source is blocked in trying to emit data
>>> (again, backpressure).
>>>
>>> As part of FLIP-27 we will eliminate the checkpoint lock (have a mailbox
>>> instead) which should lead to faster lock acquisition.
>>>
>>> The "unaligned checkpoints" discussion is looking at ways to make
>>> checkpoints much less susceptible to back pressure.
>>>
>>>
>>> https://lists.apache.org/thread.html/fd5b6cceb4bffb635e26e7ec0787a8db454ddd64aadb40a0d08a90a8@%3Cdev.flink.apache.org%3E
>>>
>>> Hope that helps understanding what is going on.
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Thu, Sep 12, 2019 at 1:25 AM Seth Wiesman 
>>> wrote:
>>>
>>> > Great timing, I just debugged this on Monday. E2e time is checkpoint
>>> > coordinator to checkpoint coordinator, so it includes RPC to the
>>> source and
>>> > RPC f

Re: Checkpoint metrics.

2019-09-13 Thread Jamie Grier
Thanks Seth and Stephan,

Yup, I had intended to upload an image.  Here it is:
https://pasteboard.co/Ixg0YP2.png

This one is very simple and I suppose can be explained by heavy
backpressure.  The more complex version of this problem I run into
frequently is where a single (or a couple of) sub-task(s) in a highly
parallel job takes up to an order of magnitude longer than others with
regard to end-to-end time and usually ends up causing checkpoint timeouts.
This often occurs in the sink task but that's not the only case I've seen.

The only metric that shows a high value will still be end-to-end time.
Sync, async, and alignment times are all negligible.  This, to me, is very
hard to understand, especially when this task will take 10 minutes+ to
complete and everything else takes seconds.

Rather than speak hypothetically on this I'll post some data to this thread
as this situation occurs again.  Maybe we can make sense of it together.

Thanks a lot for the help.

-Jamie


.

On Thu, Sep 12, 2019 at 10:57 AM Stephan Ewen  wrote:

> Hi Jamie!
>
> Did you mean to attach a screenshot? If yes, you need to share that through
> a different channel, the mailing list does not support attachments,
> unfortunately.
>
> Seth is right about how the time is measured.
> One important bit to add to the interpretation:
>   - For non-source tasks, the time includes the "travel of the barriers",
> which can take long under back pressure
>   - For source tasks, it includes the "time to acquire the checkpoint
> lock", which can be long if the source is blocked in trying to emit data
> (again, backpressure).
>
> As part of FLIP-27 we will eliminate the checkpoint lock (have a mailbox
> instead) which should lead to faster lock acquisition.
>
> The "unaligned checkpoints" discussion is looking at ways to make
> checkpoints much less susceptible to back pressure.
>
>
> https://lists.apache.org/thread.html/fd5b6cceb4bffb635e26e7ec0787a8db454ddd64aadb40a0d08a90a8@%3Cdev.flink.apache.org%3E
>
> Hope that helps understanding what is going on.
>
> Best,
> Stephan
>
>
> On Thu, Sep 12, 2019 at 1:25 AM Seth Wiesman  wrote:
>
> > Great timing, I just debugged this on Monday. E2e time is checkpoint
> > coordinator to checkpoint coordinator, so it includes RPC to the source
> and
> > RPC from the operator back to the JM.
> >
> > Seth
> >
> > > On Sep 11, 2019, at 6:17 PM, Jamie Grier 
> > wrote:
> > >
> > > Hey all,
> > >
> > > I need to make sense of this behavior.  Any help would be appreciated.
> > >
> > > Here’s an example of a set of Flink checkpoint metrics I don’t
> > understand.  This is the first operator in a job and as you can see the
> > end-to-end time for the checkpoint is long, but it’s not explained by
> > either sync, async, or alignment times.  I’m not sure what to make of
> > this.  It makes me think I don’t understand the meaning of the metrics
> > themselves.  In my interpretation the end-to-end time should always be,
> > roughly, the sum of the other components — certainly in the case of a
> > source task such as this.
> > >
> > > Any thoughts or clarifications anyone can provide on this?  We have
> many
> > jobs with slow checkpoints that suffer from this sort of thing with
> metrics
> > that look similar.
> > >
> > > -Jamie
> > >
> >
>


Checkpoint metrics.

2019-09-11 Thread Jamie Grier
Hey all,

I need to make sense of this behavior.  Any help would be appreciated.

Here’s an example of a set of Flink checkpoint metrics I don’t understand.
This is the first operator in a job and as you can see the end-to-end time
for the checkpoint is long, but it’s not explained by either sync, async,
or alignment times.  I’m not sure what to make of this.  It makes me think
I don’t understand the meaning of the metrics themselves.  In my
interpretation the end-to-end time should always be, roughly, the sum of
the other components — certainly in the case of a source task such as this.

Any thoughts or clarifications anyone can provide on this?  We have many
jobs with slow checkpoints that suffer from this sort of thing with metrics
that look similar.

-Jamie


Re: JobManager scale limitation - Slow S3 checkpoint deletes

2019-03-07 Thread Jamie Grier
All of the suggestions in this thread are good ones.  I had also considered
farming the actual cleanup work out to all the TaskManagers as well, but I
didn't realize how easy the fix might be for this.  Thanks, Till.

With Till's change https://github.com/apache/flink/pull/7924 the problem
we're actually experiencing should be fixed so I'm not going to pursue this
any further right now.

-Jamie


On Thu, Mar 7, 2019 at 2:29 AM Till Rohrmann  wrote:

> I think part of the problem is that we currently use the executor of the
> common RpcService to run the I/O operations as Stephan suspected [1]. I
> will be fixing this problem for 1.8.0 and 1.7.3.
>
> This should resolve the problem but supporting different means of clean up
> might still be interesting to add.
>
> [1] https://issues.apache.org/jira/browse/FLINK-11851
>
> Cheers,
> Till
>
> On Thu, Mar 7, 2019 at 8:56 AM Yun Tang  wrote:
>
> > Sharing the communication pressure of a single node across multiple task
> > managers would be a good idea. From my point of view, letting task managers
> > know that some specific checkpoint has already been aborted could
> > benefit a lot of things:
> >
> >   *   Let task managers clean up the files, which is the topic of this
> > thread.
> >   *   Let `StreamTask` cancel an aborted running checkpoint on the
> > task side, just as https://issues.apache.org/jira/browse/FLINK-8871 wants
> > to achieve.
> >   *   Let the local state store prune local checkpoints as soon as
> > possible without waiting for the next `notifyCheckpointComplete` to arrive.
> >   *   Let the state backend on the task manager side do something on its
> > side, which would be really helpful for state backends that disaggregate
> > computation and storage.
> >
> > Best
> > Yun Tang
> > 
> > From: Thomas Weise 
> > Sent: Thursday, March 7, 2019 12:06
> > To: dev@flink.apache.org
> > Subject: Re: JobManager scale limitation - Slow S3 checkpoint deletes
> >
> > Nice!
> >
> > Perhaps for file systems without TTL/expiration support (AFAIK includes
> > HDFS), cleanup could be performed in the task managers?
> >
> >
> > On Wed, Mar 6, 2019 at 6:01 PM Jamie Grier 
> > wrote:
> >
> > > Yup, it looks like the actor threads are spending all of their time
> > > communicating with S3.  I've attached a picture of a typical stack
> trace
> > > for one of the actor threads [1].  At the end of that call stack what
> > > you'll see is the thread blocking on synchronous communication with the
> > S3
> > > service.  This is for one of the flink-akka.actor.default-dispatcher
> > > threads.
> > >
> > > I've also attached a link to a YourKit snapshot if you'd like to
> explore
> > > the profiling data in more detail [2]
> > >
> > > [1]
> > >
> > >
> >
> https://drive.google.com/open?id=0BzMcf4IvnGWNNjEzMmRJbkFiWkZpZDFtQWo4LXByUXpPSFpR
> > > [2] https://drive.google.com/open?id=1iHCKJT-PTQUcDzFuIiacJ1MgAkfxza3W
> > >
> > >
> > >
> > > On Wed, Mar 6, 2019 at 7:41 AM Stephan Ewen  wrote:
> > >
> > > > I think having an option to not actively delete checkpoints (but
> rather
> > > > have the TTL feature of the file system take care of it) sounds like
> a
> > > good
> > > > idea.
> > > >
> > > > I am curious why you get heartbeat misses and akka timeouts during
> > > deletes.
> > > > Are some parts of the deletes happening synchronously in the actor
> > thread?
> > > >
> > > > On Wed, Mar 6, 2019 at 3:40 PM Jamie Grier 
> > > > wrote:
> > > >
> > > > > We've run into an issue that limits the max parallelism of jobs we
> > can
> > > > run
> > > > > and what it seems to boil down to is that the JobManager becomes
> > > > > unresponsive while essentially spending all of its time discarding
> > > > > checkpoints from S3.  This results in sluggish UI, sporadic
> > > > > AkkaAskTimeouts, heartbeat misses, etc.
> > > > >
> > > > > Since S3 (and I assume HDFS) have policies that can be used to
> discard
> > > old
> > > > > objects without Flink actively deleting them I think it would be a
> > > useful
> > > > > feature to add the option to Flink to not ever discard checkpoints.
> > I
> > > > > believe this will solve the problem.
> > > > >
> > > > > Any objections or other known solutions to this problem?
> > > > >
> > > > > -Jamie
> > > > >
> > > >
> > >
> >
>


Re: JobManager scale limitation - Slow S3 checkpoint deletes

2019-03-06 Thread Jamie Grier
Yup, it looks like the actor threads are spending all of their time
communicating with S3.  I've attached a picture of a typical stack trace
for one of the actor threads [1].  At the end of that call stack what
you'll see is the thread blocking on synchronous communication with the S3
service.  This is for one of the flink-akka.actor.default-dispatcher
threads.

I've also attached a link to a YourKit snapshot if you'd like to explore
the profiling data in more detail [2]

[1]
https://drive.google.com/open?id=0BzMcf4IvnGWNNjEzMmRJbkFiWkZpZDFtQWo4LXByUXpPSFpR
[2] https://drive.google.com/open?id=1iHCKJT-PTQUcDzFuIiacJ1MgAkfxza3W



On Wed, Mar 6, 2019 at 7:41 AM Stephan Ewen  wrote:

> I think having an option to not actively delete checkpoints (but rather
> have the TTL feature of the file system take care of it) sounds like a good
> idea.
>
> I am curious why you get heartbeat misses and akka timeouts during deletes.
> Are some parts of the deletes happening synchronously in the actor thread?
>
> On Wed, Mar 6, 2019 at 3:40 PM Jamie Grier 
> wrote:
>
> > We've run into an issue that limits the max parallelism of jobs we can
> run
> > and what it seems to boil down to is that the JobManager becomes
> > unresponsive while essentially spending all of its time discarding
> > checkpoints from S3.  This results in sluggish UI, sporadic
> > AkkaAskTimeouts, heartbeat misses, etc.
> >
> > Since S3 (and I assume HDFS) have policies that can be used to discard old
> > objects without Flink actively deleting them I think it would be a useful
> > feature to add the option to Flink to not ever discard checkpoints.  I
> > believe this will solve the problem.
> >
> > Any objections or other known solutions to this problem?
> >
> > -Jamie
> >
>


JobManager scale limitation - Slow S3 checkpoint deletes

2019-03-06 Thread Jamie Grier
We've run into an issue that limits the max parallelism of jobs we can run
and what it seems to boil down to is that the JobManager becomes
unresponsive while essentially spending all of its time discarding
checkpoints from S3.  This results in sluggish UI, sporadic
AkkaAskTimeouts, heartbeat misses, etc.

Since S3 (and I assume HDFS) have policies that can be used to discard old
objects without Flink actively deleting them I think it would be a useful
feature to add the option to Flink to not ever discard checkpoints.  I
believe this will solve the problem.
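For illustration, a rough sketch of what the S3-side expiration could look like
with the AWS SDK, assuming checkpoints are written under a "checkpoints/"
prefix in a hypothetical bucket (bucket and prefix names are made up; Flink
itself would simply skip the delete calls):

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.BucketLifecycleConfiguration;
import com.amazonaws.services.s3.model.lifecycle.LifecycleFilter;
import com.amazonaws.services.s3.model.lifecycle.LifecyclePrefixPredicate;

public class CheckpointExpiryPolicy {
    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        // Expire everything under the checkpoint prefix after 7 days instead of
        // having the JobManager issue the deletes itself.
        BucketLifecycleConfiguration.Rule rule = new BucketLifecycleConfiguration.Rule()
                .withId("expire-old-checkpoints")
                .withFilter(new LifecycleFilter(new LifecyclePrefixPredicate("checkpoints/")))
                .withExpirationInDays(7)
                .withStatus(BucketLifecycleConfiguration.ENABLED);
        s3.setBucketLifecycleConfiguration("my-flink-bucket",
                new BucketLifecycleConfiguration().withRules(rule));
    }
}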

Any objections or other known solutions to this problem?

-Jamie


Re: [DISCUSS] Flink framework and user log separation

2019-02-28 Thread Jamie Grier
I think maybe if I understood this correctly this design is going in the
wrong direction.  The problem with Flink logging, when you are running
multiple jobs in the same TMs, is not just about separating out the
business level logging into separate files.  The Flink framework itself
logs many things where there is clearly a single job in context but that
all ends up in the same log file and with no clear separation amongst the
log lines.

Also, I don't think shooting to have multiple log files is a very good idea
either.  It's common, especially on container-based deployments, that the
expectation is that a process (like Flink) logs everything to stdout and
the surrounding tooling takes care of routing that log data somewhere.  I
think we should stick with that model and expect that there will be a
single log stream coming out of each Flink process.

Instead, I think it would be better to enhance Flink's logging capability
such that the appropriate context can be added to each log line with the
exact format controlled by the end user.  It might make sense to take a
look at MDC, for example, as a way to approach this.
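A minimal sketch of the MDC idea, assuming a per-job key such as "flinkJobId"
(an assumption for illustration, not an existing Flink feature): any framework
code running on behalf of a particular job sets the job id in the mapped
diagnostic context, the log pattern includes it via %X{flinkJobId}, and a
single stdout stream can then still be filtered per job downstream.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class JobScopedLogging {
    private static final Logger LOG = LoggerFactory.getLogger(JobScopedLogging.class);

    public static void runForJob(String jobId, Runnable work) {
        // Every log line emitted on this thread now carries the job id.
        MDC.put("flinkJobId", jobId);
        try {
            LOG.info("Deploying task for job");
            work.run();
        } finally {
            MDC.remove("flinkJobId");
        }
    }
}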


On Thu, Feb 28, 2019 at 4:24 AM vino yang  wrote:

> Dear devs,
>
> Currently, for log output, Flink does not explicitly distinguish between
> framework logs and user logs. In Task Manager, logs from the framework are
> intermixed with the user's business logs. In some deployment models, such
> as Standalone or YARN session, there are different task instances of
> different jobs deployed in the same Task Manager. It makes the log event
> flow more confusing unless the users explicitly use tags to distinguish
> them and it makes locating problems more difficult and inefficient. For
> the YARN job cluster deployment model, this problem will not be very serious,
> but we still need to manually distinguish between the framework and the
> business logs. Overall, we found that Flink's existing log model has the
> following problems:
>
>
>- Framework logs and business logs are mixed in the same log file. There
>  is no way to make a clear distinction, which is not conducive to problem
>  location and analysis;
>- Not conducive to the independent collection of business logs;
>
>
> Therefore, we propose a mechanism to separate the framework and business
> log. It can split existing log files for Task Manager.
>
> Currently, it is associated with two JIRA issue:
>
>- FLINK-11202[1]: Split log file per job
>- FLINK-11782[2]: Enhance TaskManager log visualization by listing all
>  log files for the Flink web UI
>
>
> We have implemented and validated it in standalone and Flink on YARN (job
> cluster) mode.
>
> sketch 1:
>
> [image: flink-web-ui-taskmanager-log-files.png]
>
> sketch 2:
> [image: flink-web-ui-taskmanager-log-files-2.png]
>
> Design documentation :
> https://docs.google.com/document/d/1TTYAtFoTWaGCveKDZH394FYdRyNyQFnVoW5AYFvnr5I/edit?usp=sharing
>
> Best,
> Vino
>
> [1]: https://issues.apache.org/jira/browse/FLINK-11202
> [2]: https://issues.apache.org/jira/browse/FLINK-11782
>


Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-26 Thread Jamie Grier
This is awesome, Stephan!  Thanks for doing this.

-Jamie


On Tue, Feb 26, 2019 at 9:29 AM Stephan Ewen  wrote:

> Here is the pull request with a draft of the roadmap:
> https://github.com/apache/flink-web/pull/178
>
> Best,
> Stephan
>
> On Fri, Feb 22, 2019 at 5:18 AM Hequn Cheng  wrote:
>
>> Hi Stephan,
>>
>> Thanks for summarizing the great roadmap! It is very helpful for users
>> and developers to track the direction of Flink.
>> +1 for putting the roadmap on the website and update it per release.
>>
>> Besides, it would be great if the roadmap could add the UpsertSource
>> feature (maybe put it under 'Batch Streaming Unification').
>> It has been discussed a long time ago[1,2] and is moving forward step by
>> step.
>> Currently, Flink can only emit upsert results. With the UpsertSource, we
>> can make our system a more complete one.
>>
>> Best, Hequn
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-TABLE-How-to-handle-empty-delete-for-UpsertSource-td23856.html#a23874
>> [2] https://issues.apache.org/jira/browse/FLINK-8545
>> 
>>
>>
>>
>> On Fri, Feb 22, 2019 at 3:31 AM Rong Rong  wrote:
>>
>>> Hi Stephan,
>>>
>>> Yes. I completely agree. Jincheng & Jark gave some very valuable
>>> feedbacks and suggestions and I think we can definitely move the
>>> conversation forward to reach a more concrete doc first before we put in to
>>> the roadmap. Thanks for reviewing it and driving the roadmap effort!
>>>
>>> --
>>> Rong
>>>
>>> On Thu, Feb 21, 2019 at 8:50 AM Stephan Ewen  wrote:
>>>
 Hi Rong Rong!

 I would add the security / kerberos threads to the roadmap. They seem
 to be advanced enough in the discussions so that there is clarity what will
 come.

 For the window operator with slicing, I would personally like to see
 the discussion advance and have some more clarity and consensus on the
 feature before adding it to the roadmap. Not having that in the first
 version of the roadmap does not mean there will be no activity. And when
 the discussion advances well in the next weeks, we can update the roadmap
 soon.

 What do you think?

 Best,
 Stephan


 On Thu, Feb 14, 2019 at 5:46 PM Rong Rong  wrote:

> Hi Stephan,
>
> Thanks for the clarification, yes I think these issues have already
> been discussed in previous mailing list threads [1,2,3].
>
> I also agree that updating the "official" roadmap every release is a
> very good idea to avoid frequent updates.
> One question where I might've been a bit confused is: are we suggesting to
> keep one roadmap on the documentation site (e.g. [4]) per release, or
> simply one most up-to-date roadmap on the main website [5]?
> Just like the release notes for every release, the former would probably
> provide a good way for users to look back at previous roadmaps as well,
> I am assuming.
>
> Thanks,
> Rong
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
> [3]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html
>
> [4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/
> [5] https://flink.apache.org/
>
> On Thu, Feb 14, 2019 at 2:26 AM Stephan Ewen  wrote:
>
>> I think the website is better as well.
>>
>> I agree with Fabian that the wiki is not so visible, and visibility
>> is the main motivation.
>> This type of roadmap overview would not be updated by everyone -
>> letting committers update the roadmap means the listed threads are 
>> actually
>> happening at the moment.
>>
>>
>> On Thu, Feb 14, 2019 at 11:14 AM Fabian Hueske 
>> wrote:
>>
>>> Hi,
>>>
>>> I like the idea of putting the roadmap on the website because it is
>>> much more visible (and IMO more credible, obligatory) there.
>>> However, I share the concerns about frequent updates.
>>>
>>> I think it would be great to update the "official" roadmap on the
>>> website once per release (excluding bugfix releases), i.e., every three months.
>>> We can use the wiki to collect and draft the roadmap for the next
>>> update.
>>>
>>> Best, Fabian
>>>
>>>
 On Thu, Feb 14, 2019 at 11:03 AM, Jeff Zhang <
 zjf...@gmail.com> wrote:
>>>
 Hi Stephan,

 Thanks for this proposal. It is a good idea to track the roadmap.
 One suggestion is that it might be better to put it onto a wiki page first,
 because it is easier to update the roadmap on the wiki than on the Flink
 website.

[jira] [Created] (FLINK-11617) CLONE - Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

2019-02-14 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-11617:
---

 Summary: CLONE - Handle AmazonKinesisException gracefully in 
Kinesis Streaming Connector
 Key: FLINK-11617
 URL: https://issues.apache.org/jira/browse/FLINK-11617
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Reporter: Jamie Grier
Assignee: Scott Kidder


My Flink job that consumes from a Kinesis stream must be restarted at least 
once daily due to an uncaught AmazonKinesisException when reading from Kinesis. 
The complete stacktrace looks like:

{noformat}
com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

It's interesting that the Kinesis endpoint returned a 500 status code, but 
that's outside the scope of this issue.

I think we can handle this exception in the same manner as a 
ProvisionedThroughputException: performing an exponential backoff and retrying 
a finite number of times before throwing an exception.
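A rough sketch of that retry behavior (class name, constants, and the exact
retry policy are illustrative assumptions, not the connector's actual code):

{noformat}
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.AmazonKinesisException;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;

class RetryingGetRecords {
    private static final int MAX_RETRIES = 5;
    private static final long BASE_BACKOFF_MS = 500L;

    private final AmazonKinesis kinesisClient;

    RetryingGetRecords(AmazonKinesis kinesisClient) {
        this.kinesisClient = kinesisClient;
    }

    GetRecordsResult getRecordsWithRetry(GetRecordsRequest request) throws InterruptedException {
        AmazonKinesisException lastError = null;
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            try {
                return kinesisClient.getRecords(request);
            } catch (AmazonKinesisException e) {
                if (e.getStatusCode() < 500) {
                    throw e; // only retry server-side (5xx) failures like the one above
                }
                lastError = e;
                Thread.sleep(BASE_BACKOFF_MS << attempt); // exponential: 0.5s, 1s, 2s, 4s, 8s
            }
        }
        throw lastError;
    }
}
{noformat}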



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Ratelimiting in the Flink Kafka connector

2019-01-31 Thread Jamie Grier
I had the same reaction initially as some of the others on this thread --
which is "Use Kafka quotas"..  I agree that in general a service should
protect itself with it's own rate limiting rather than building it into
clients like the FlinkKafkaConsumer.

However, there are a few reasons we need to do this in our company
currently:
  - We can't use Kafka quotas right now because the Kafka vendor we're
using doesn't support them
  - RPC services called by Flink jobs are frequently DDOS'd by those apps,
and we simply need to slow the jobs down when processing a
backlog to protect the external services.  You could argue those services
should protect themselves, and I agree, but for various technical reasons
that's not possible ATM.
  - If you are going to artificially rate limit a Flink job the best place
to do it is definitely in the source -- otherwise you end up with issues
with backpressure and checkpointing.

So, that said I suspect other users have the same issue so I think it's a
good general feature to add to the Kafka consumer.  It already exists in
the Kinesis consumer as well.

In terms of code bloat -- well the code is dead simple.  It's just adding a
Guava RateLimiter to the poll() loop and it's opt-in.  The code has already
been implemented for this.
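For illustration only, an opt-in poll loop with a Guava RateLimiter might look
roughly like this (a sketch, not the actual internal implementation; the
bytes-per-second policy and names are assumptions):

import java.time.Duration;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class RateLimitedPollLoop {
    void run(KafkaConsumer<byte[], byte[]> consumer, double maxBytesPerSecond) {
        // Permits are bytes per second; acquire after each batch based on its size.
        RateLimiter rateLimiter = RateLimiter.create(maxBytesPerSecond);
        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
            int batchBytes = 0;
            for (ConsumerRecord<byte[], byte[]> record : records) {
                batchBytes += Math.max(0, record.serializedKeySize())
                        + Math.max(0, record.serializedValueSize());
                // ... emit the record downstream ...
            }
            if (batchBytes > 0) {
                rateLimiter.acquire(batchBytes); // blocks until the batch fits the budget
            }
        }
    }
}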

@Lakshmi Gururaja Rao   Can you put up an apache/flink PR for
this since it's already finished internally?

Anyway, I'm not opposed to making the KafkaConsumer a little more
customizable via adding some hooks if that's what others prefer -- however,
let's also make the rate limited KafkaConsumer available in the Flink
project at large rather than keeping it internal at Lyft.  I think it's
generally useful.

-Jamie


On Tue, Jan 29, 2019 at 8:57 PM Thomas Weise  wrote:

> It is preferred for the service to rate limit. The problem is that not all
> Kafka setups have that control enabled / support for it.
>
> Even when rate limiting was enabled, it may still be *nice* for the client
> to gracefully handle it.
>
> There was discussion in the past that we should not bloat the Kafka
> consumer further and I agree with that.
>
> On the other hand it would be good if the consumer can be augmented a bit
> to provide hooks for customization (we had done that for the Kinesis
> consumer also).
>
> Thanks,
> Thomas
>
>
> On Mon, Jan 28, 2019 at 3:14 AM Becket Qin  wrote:
>
> > Hi Lakshmi,
> >
> > As Nagajun mentioned, you might want to configure quota on the Kafka
> broker
> > side for your Flink connector client.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sat, Jan 26, 2019 at 10:44 AM Ning Shi  wrote:
> >
> > > > We have a Flink job reading from Kafka (specifically it uses
> > > > FlinkKafkaConsumer011). There are instances when the job is
> processing
> > a
> > > > backlog and it ends up reading at a significantly high throughput and
> > > > degrades the underlying Kafka cluster. If there was a way to rate
> limit
> > > the
> > > > calls to Kafka (by controlling how often the *consumer*.poll() is
> > > called),
> > > > it would be a useful feature for our use case.
> > > >
> > > > Has anyone has run into a similar issue? Are there are any
> > > efforts/thoughts
> > > > on implementing a rate-limiting feature in the Flink Kafka connector?
> > >
> > > We had a similar problem and ended up putting a Guava rate limiter
> > > inside the Kafka consumer to limit the consumption rate. Since we use
> > > POJO, this is easily done by putting the rate limiter inside the POJO
> > > deserializer, which runs in the Kafka source.
> > >
> > > This has the benefit of not slowing down checkpoints because the
> > > source doesn't have to do alignment. If you don't care about
> > > checkpoint alignment, you can also add a map function with a Guava
> > > rate limiter immediately after the Kafka source. When it throttles,
> > > back pressure should eventually cause the Kafka source to slowdown
> > > consumption.
> > >
> > > Ning
> > >
> >
>
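A sketch of the deserializer-based workaround Ning describes above (the wrapper
class is illustrative, not Flink or Lyft code); the limiter is created lazily
because Guava's RateLimiter is not serializable:

import java.io.IOException;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

class RateLimitedDeserializationSchema<T> implements DeserializationSchema<T> {
    private final DeserializationSchema<T> inner;
    private final double recordsPerSecond;
    private transient RateLimiter rateLimiter;

    RateLimitedDeserializationSchema(DeserializationSchema<T> inner, double recordsPerSecond) {
        this.inner = inner;
        this.recordsPerSecond = recordsPerSecond;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        if (rateLimiter == null) {
            rateLimiter = RateLimiter.create(recordsPerSecond);
        }
        // Blocks the source thread, which throttles consumption from Kafka.
        rateLimiter.acquire();
        return inner.deserialize(message);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return inner.isEndOfStream(nextElement);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return inner.getProducedType();
    }
}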


Re: [DISCUSS] Towards a leaner flink-dist

2019-01-18 Thread Jamie Grier
I'm not sure if this is required.  It's quite convenient to be able to just
grab a single tarball and you've got everything you need.

I just did this for the latest binary release and it was 273MB and took
about 25 seconds to download.  Of course I know connection speeds vary
quite a bit but I don't think 273 MB seems onerous to download and I like
the simplicity of it the way it is.



On Fri, Jan 18, 2019 at 3:34 AM Fabian Hueske  wrote:

> Hi Chesnay,
>
> Thank you for the proposal.
> I think this is a good idea.
> We follow a similar approach already for Hadoop dependencies and
> connectors (although in application space).
>
> +1
>
> Fabian
>
> On Fri, Jan 18, 2019 at 10:59 AM, Chesnay Schepler <
> ches...@apache.org> wrote:
>
>> Hello,
>>
>> the binary distribution that we release by now contains quite a lot of
>> optional components, including various filesystems, metric reporters and
>> libraries. Most users will only use a fraction of these, and as such
>> pretty much only increase the size of flink-dist.
>>
>> With Flink growing more and more in scope I don't believe it to be
>> feasible to ship everything we have with every distribution, and instead
>> suggest more of a "pick-what-you-need" model, where flink-dist is rather
>> lean and additional components are downloaded separately and added by
>> the user.
>>
>> This would primarily affect the /opt directory, but could also be
>> extended to cover flink-dist. For example, the yarn and mesos code could
>> be spliced out into separate jars that could be added to lib manually.
>>
>> Let me know what you think.
>>
>> Regards,
>>
>> Chesnay
>>
>>


Re: [DISCUSS] Bot for stale PRs on GitHub

2019-01-11 Thread Jamie Grier
+1 to try the bot solution and see how it goes.

On Fri, Jan 11, 2019 at 6:54 AM jincheng sun 
wrote:

> +1 for the bot solution!
> and I think Timo‘s suggestion is very useful!
> Thanks,
> Jincheng
>
>
> Timo Walther wrote on Fri, Jan 11, 2019 at 22:44:
>
> > Thanks for bringing up this discussion again. +1 for a bot solution.
> > However, we should discuss a good process for closing PRs.
> >
> > In many cases, PRs are closed not because the contributor did not
> > respond but because no committer prioritizes the PR high enough. Or the
> > PR has issues that might not have been communicated clear enough (e.g.
> > bad code quality, big contribution that requires a big amount of time by
> > a reviewer).
> >
> > So maybe we can first introduce labels for better communication. Right
> > now, we don't use the label feature at all.
> >
> > For example, we could add an "Ownership needed" label by default. After all,
> > why should a PR be closed if not a single committer has even opened the
> > description?
> >
> > Regards,
> >
> > Timo
> >
> >
> >
> > Am 11.01.19 um 12:36 schrieb qi luo:
> > > +1 for the stale bot, as it will help bring valuable PRs out to be
> > reviewed.
> > >
> > >> On Jan 11, 2019, at 6:26 PM, Driesprong, Fokko 
> > wrote:
> > >>
> > >> +1 I'm in favor of the Stale bot.
> > >>
> > >> We use the Stalebot at Apache Airflow as well, and it really helps
> > >> smooth the reviewing process. Keep in mind that the number of PRs
> > >> processed by the Stalebot is limited at each run. So you won't get a
> > >> gazillion notifications, but just a few every couple of days. Just
> > >> enough to prune the list of PRs.
> > >> Most of the really old PRs are not relevant anymore, so it's good
> > >> practice to close these. If the person who opened a PR still thinks it
> > >> is relevant, it will be revisited and can still be considered for
> > >> merging. Otherwise, the PR will be closed by the bot. There is no value
> > >> in having the old PRs hanging around. Having 500 open PRs doesn't
> > >> reflect well on the project, in my opinion.
> > >> My suggestion would be to give it a try.
> > >>
> > >> Cheers, Fokko
> > >>
> > >> On Thu, Jan 10, 2019 at 12:45, Chesnay Schepler <
> > ches...@apache.org>:
> > >>
> >  The bot will remind both reviewers and contributors that they have
> to
> > >>> be active on a PR, I found that useful on some PRs that I had open at
> > Beam
> > >>>
> > >>> I don't think we really want every contributor bumping their PR
> > >>> regularly. This will create unbearable noise and, if they actually
> > >>> update it, will lead to them wasting a lot of time since we won't
> > >>> suddenly start reviewing it.
> > >>>
> > >>> On 10.01.2019 12:06, Aljoscha Krettek wrote:
> >  For reference, this is the older staleness discussion:
> > >>>
> >
> https://lists.apache.org/thread.html/d53bee8431776f38ebaf8f5678b1ffd9513cd65ce15d821bbdca95aa@%3Cdev.flink.apache.org%3E
> > >>> <
> > >>>
> >
> https://lists.apache.org/thread.html/d53bee8431776f38ebaf8f5678b1ffd9513cd65ce15d821bbdca95aa@%3Cdev.flink.apache.org%3E
> > 
> >  My main arguments for automatic closing of PRs are:
> > 
> >    - This will eventually close out old, stale PRs, making the number
> > we
> > >>> see in Github better reflect the actual state
> >    - The bot will remind both reviewers and contributors that they
> have
> > >>> to be active on a PR, I found that useful on some PRs that I had open
> > at
> > >>> Beam
> >  Aljoscha
> > 
> > > On 10. Jan 2019, at 11:21, Chesnay Schepler 
> > wrote:
> > >
> > > Without any new argument for doing so, I'm still against it.
> > >
> > > On 10.01.2019 09:54, Aljoscha Krettek wrote:
> > >> Hi,
> > >>
> > >> I know we had similar discussions in the past but I’d like to
> bring
> > up
> > >>> this topic again.
> > >> What do you think about adding a stale bot (
> > >>> https://probot.github.io/apps/stale/ <
> > https://probot.github.io/apps/stale/>)
> > >>> to our Github Repo? This would automatically nag about stale PRs and
> > close
> > >>> them after a (configurable) time of inactivity. This would do two
> > things:
> > >> (1) Clean up old PRs that truly are outdated and stale
> > >> (2) Remind both contributor and reviewers about PRs that are still
> > >>> good and are on the verge of getting stale, thus potentially speeding
> > up
> > >>> review or facilitating it in the first place
> > >> Best,
> > >> Aljoscha
> > >>>
> >
> >
>


Re: [DISCUSS] Detection Flink Backpressure

2019-01-03 Thread Jamie Grier
..  Maybe we should add a way to register those threads such that they are
also sampled.  Thoughts?

On Thu, Jan 3, 2019 at 10:25 AM Jamie Grier  wrote:

> One unfortunate problem with the current back-pressure detection mechanism
> is that it doesn't work well with all of our sources.  The problem is that
> some sources (Kinesis for sure) emit elements from threads Flink knows
> nothing about and therefore those stack traces aren't sampled.  The result
> is that you never see back-pressure detected in the first chain of a Flink
> job containing that source.
>
> On Thu, Jan 3, 2019 at 2:12 AM Piotr Nowojski 
> wrote:
>
>> Hi all,
>>
>> peiliping: I think your idea could be problematic for a couple of reasons.
>> Probably a minor concern is that checkpoint time could be affected not only
>> by the back pressure, but also by how long it takes to actually perform the
>> checkpoint. A bigger issue is that this bottleneck detection would be
>> limited to only during checkpointing (what if one has checkpoints only once
>> every 1 hour? Or none at all?) AND performance/bottlenecks may change
>> significantly during checkpointing (for example writing state for the first
>> operator to DFS can indirectly affect downstream operators).
>>
>> The idea of detecting back pressure/bottlenecks using output/input
>> buffers is much more natural. Because in the end, almost by definition, if
>> the output buffers are full, that means that the given task is back
>> pressured.
>>
>> Both input and output queue lengths are already exposed via metrics, so
>> developers have access to the raw data to manually calculate/detect
>> bottlenecks. It would actually be nice to automatically aggregate those
>> metrics and provide ready-to-use metrics: boolean flags indicating whether a
>> task/stage/job is back pressured or not.
>>
>> Replacing the current back pressure detection mechanism that probes the
>> threads and checks which of them are waiting for buffers is another issue.
>> Functionally it is equivalent to monitoring whether the output queues are
>> full. This might be more hacky, but will give the same results, thus it
>> wasn’t high on my priority list to change/refactor. It would be nice to
>> clean this up a little bit and unify, but using metrics can also mean some
>> additional work, since there are some known metrics-related performance
>> issues.
>>
>> Piotrek
>>
>> > On 3 Jan 2019, at 10:35, peiliping  wrote:
>> >
>> > I have some ideas about detecting backpressure (the blocking operators)
>> > via the checkpoint barrier.
>> >
>> > I have some Flink jobs with checkpointing, but their checkpoints take a
>> > long time to complete.
>> >
>> > I need to find out the blocking operators, which is essentially the same
>> > as backpressure detection.
>> >
>> > From a checkpoint object, I can get a timestamp that marks the start
>> > time, and then I compute a metric in
>> >
>> > org.apache.flink.streaming.runtime.tasks.StreamTask.executeCheckpointing().
>> >
>> > The metric is the delta between checkpoint.timestamp and the time when
>> > StreamTask.executeCheckpointing is invoked,
>> >
>> > and I named it checkpoint-delay-time.
>> >
>> > It looks like the end-to-end time metric of a checkpoint, but it does not
>> > include the async handles.
>> >
>> > For example, take a list of tasks A (parallelism: 2) ---> B
>> > (parallelism: 3) ---> C (parallelism: 1).
>> >
>> > Checkpoint-delay-value-A: the max checkpoint-delay-time from A (there are
>> > 2 instances)
>> >
>> > Checkpoint-delay-value-B: the max checkpoint-delay-time from B (there are
>> > 3 instances)
>> >
>> > Checkpoint-delay-value-C: the max checkpoint-delay-time from C (there is
>> > 1 instance)
>> >
>> > Then I can compute 3 more deltas from the checkpoint-delay-values:
>> >
>> > result-0-->A = Checkpoint-delay-value-A - 0
>> >
>> > result-A-->B = Checkpoint-delay-value-B - Checkpoint-delay-value-A
>> >
>> > result-B-->C = Checkpoint-delay-value-C - Checkpoint-delay-value-B
>> >
>> > Any result-X-->Y that is longer than 5s (or some other threshold) should
>> > be the culprit.
>> >
>> >
>> >
>> >
>> >
>> >> On 2019/1/3 at 2:43 PM, Yun Gao wrote:
>> >> Hello liping,
>> >>
>> >>Thank you for proposing to optimize the backpressure detection!
>> F

Re: [DISCUSS] Detection Flink Backpressure

2019-01-03 Thread Jamie Grier
One unfortunate problem with the current back-pressure detection mechanism
is that it doesn't work well with all of our sources.  The problem is that
some sources (Kinesis for sure) emit elements from threads Flink knows
nothing about and therefore those stack traces aren't sampled.  The result
is that you never see back-pressure detected in the first chain of a Flink
job containing that source.

On Thu, Jan 3, 2019 at 2:12 AM Piotr Nowojski  wrote:

> Hi all,
>
> peiliping: I think your idea could be problematic for a couple of reasons.
> Probably a minor concern is that checkpoint time could be affected not only
> by the back pressure, but also by how long it takes to actually perform the
> checkpoint. A bigger issue is that this bottleneck detection would be
> limited to only during checkpointing (what if one has checkpoints only once
> every 1 hour? Or none at all?) AND performance/bottlenecks may change
> significantly during checkpointing (for example writing state for the first
> operator to DFS can indirectly affect downstream operators).
>
> The idea of detecting back pressure/bottlenecks using output/input buffers
> is much more natural. Because in the end, almost by definition, if the
> output buffers are full, that means that the given task is back pressured.
>
> Both input and output queue lengths are already exposed via metrics, so
> developers have access to the raw data to manually calculate/detect
> bottlenecks. It would actually be nice to automatically aggregate those
> metrics and provide ready-to-use metrics: boolean flags indicating whether a
> task/stage/job is back pressured or not.
>
> Replacing the current back pressure detection mechanism that probes the
> threads and checks which of them are waiting for buffers is another issue.
> Functionally it is equivalent to monitoring whether the output queues are
> full. This might be more hacky, but will give the same results, thus it
> wasn’t high on my priority list to change/refactor. It would be nice to
> clean this up a little bit and unify, but using metrics can also mean some
> additional work, since there are some known metrics-related performance
> issues.
>
> Piotrek
>
> > On 3 Jan 2019, at 10:35, peiliping  wrote:
> >
> > I have some ideas about detecting backpressure (the blocking operators)
> > via the checkpoint barrier.
> >
> > I have some Flink jobs with checkpointing, but their checkpoints take a
> > long time to complete.
> >
> > I need to find out the blocking operators, which is essentially the same
> > as backpressure detection.
> >
> > From a checkpoint object, I can get a timestamp that marks the start
> > time, and then I compute a metric in
> >
> > org.apache.flink.streaming.runtime.tasks.StreamTask.executeCheckpointing().
> >
> > The metric is the delta between checkpoint.timestamp and the time when
> > StreamTask.executeCheckpointing is invoked,
> >
> > and I named it checkpoint-delay-time.
> >
> > It looks like the end-to-end time metric of a checkpoint, but it does not
> > include the async handles.
> >
> > For example, take a list of tasks A (parallelism: 2) ---> B
> > (parallelism: 3) ---> C (parallelism: 1).
> >
> > Checkpoint-delay-value-A: the max checkpoint-delay-time from A (there are
> > 2 instances)
> >
> > Checkpoint-delay-value-B: the max checkpoint-delay-time from B (there are
> > 3 instances)
> >
> > Checkpoint-delay-value-C: the max checkpoint-delay-time from C (there is
> > 1 instance)
> >
> > Then I can compute 3 more deltas from the checkpoint-delay-values:
> >
> > result-0-->A = Checkpoint-delay-value-A - 0
> >
> > result-A-->B = Checkpoint-delay-value-B - Checkpoint-delay-value-A
> >
> > result-B-->C = Checkpoint-delay-value-C - Checkpoint-delay-value-B
> >
> > Any result-X-->Y that is longer than 5s (or some other threshold) should
> > be the culprit.
> >
> >
> >
> >
> >
> > On 2019/1/3 at 2:43 PM, Yun Gao wrote:
> >> Hello liping,
> >>
> >>Thank you for proposing to optimize the backpressure detection!
> From our previous experience, we think the InputBufferPoolUsageGauge and
> OutputBufferPoolUsageGauge may be useful for detecting backpressure: for a
> list of tasks A ---> B > C, if we found that the OutputBufferPoolUsage
> of task A and InputBufferPoolUsage of task B is 100%, but the
> OutputBufferPoolUsage of task B is less than 100%, then it should be the
> task B that causes the backpressure.
> >>
> >>   However, currently we think that the InputBufferPoolUsage and
> OutputBufferPoolUsage require some modification to be more accurate:
> >>1. When there are multiple inputs or outputs, the
> InputBufferPoolUsage and OutputBufferPoolUsage  should show the maximum
> usage instead of the average usage [1].
> >>  2. Currently the sender side will report backlog right before
> fulfilling the output Buffer. Together with the pre-allocate logic in the
> receiver side, the InputBufferPoolUsage may be 100% even if the data have
> not been received yet [2].
> >>
> >> 

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-15 Thread Jamie Grier
Thanks Aljoscha for getting this effort going!

There's been plenty of discussion here already and I'll add my big +1 to
making this interface very simple to implement for a new
Source/SplitReader.  Writing a new production quality connector for Flink
is very difficult today and requires a lot of detailed knowledge about
Flink, event time progress, watermarking, idle shard detection, etc., and it
would be good to move almost all of this type of code into Flink itself and
out of source implementations.  I also think this is totally doable and I'm
really excited to see this happening.

I do have a couple of thoughts about the API and the implementation..

In a perfect world there would be a single thread per Flink source sub-task
and no additional threads for SplitReaders -- but this assumes a world
where you have true async IO APIs for the upstream systems (like Kafka and
Kinesis, S3, HDFS, etc).  If that world did exist the single thread could
just sit in an efficient select() call waiting for new data to arrive on
any Split.  That'd be awesome..

But, that world doesn't exist and given that practical consideration I
would think the next best implementation is going to be, in practice,
probably a thread per SplitReader that does nothing but call the source API
and drop whatever it reads into a (blocking) queue -- as Aljoscha mentioned
(calling it N+1) and as we started to describe here:
https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit#heading=h.lfi2u4fv9cxa

I guess my point is that I think we should strive to move as much of
something like the diagram referenced in the above doc into Flink itself
and out of sources and simplify the SplitReader API as much as possible as
well.

With the above in mind and with regard to the discussion about blocking,
etc..  I'm not sure I agree with some of the discussion so far with regard
to this API design.  The calls to the upstream systems (kafka/kinesis) are
in fact going to be blocking calls.  So a simple API without the constraint
that the methods must be implemented in a non-blocking way seems better to
me from the point of view of somebody writing a new source implementation.
My concern is that if you force the implementer of the SplitReader
interface to do so in a non-blocking way you're just going to make it
harder to write those implementations.  Those calls to read the next bit of
data are going to be blocking calls with most known important sources -- at
least Kafka/Kinesis/HDFS -- so I think maybe we should just deal with that
head on and work around it a higher level so the SplitReader interface
stays super simple to implement.  This means we manage all the threading in
Flink core, the API stays pull-based, and the implementer is allowed to
simply block until they have data to return.
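
To make that concrete, here is a rough, hypothetical sketch -- the names and
signatures are mine, purely for illustration, and not a proposal for the
actual FLIP-27 interfaces -- of how simple the connector-facing side could be
if Flink owned the fetcher thread and the hand-off queue:

import java.util.concurrent.BlockingQueue;

// Hypothetical connector-facing interface: the implementer is allowed to block.
interface SplitReader<T, SplitT> {
    void open(SplitT split) throws Exception;
    T readNext() throws Exception;   // may block until data arrives
    void close() throws Exception;
}

// Flink-side code (not the connector) would own the threading, e.g. one
// fetcher thread per reader that drops records into a blocking queue
// consumed by the task's main thread.
class BlockingSplitFetcher<T, SplitT> implements Runnable {
    private final SplitReader<T, SplitT> reader;
    private final BlockingQueue<T> handover;
    private volatile boolean running = true;

    BlockingSplitFetcher(SplitReader<T, SplitT> reader, BlockingQueue<T> handover) {
        this.reader = reader;
        this.handover = handover;
    }

    public void run() {
        try {
            while (running) {
                T record = reader.readNext();  // blocking call into the external system
                handover.put(record);          // back-pressures the fetcher when full
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    void stop() { running = false; }
}

The point of the sketch is only that the queueing and threading would live in
Flink core, while the connector author writes nothing but the three blocking
methods.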

I maybe would change my mind about this if truly asynchronous APIs to the
upstream source systems were likely to be available in the near future or
are now and I'm just ignorant of it.  But even then the supporting code in
Flink to drive async and sync sources would be different and in fact they
might just have different APIs altogether -- SplitReader vs
AsyncSplitReader maybe.

In the end I think playing with the implementation, across more than one
source, and moving as much common code into Flink itself will reveal the
best API of course.

One other interesting note is that you need to preserve per-partition
ordering, so if the implementation were based on a thread pool and futures
you would have to take care not to reorder the reads.

Anyway, I'm thrilled to see this starting to move forward and I'd very much
like to help with the implementation wherever I can.  We're doing a
simplified internal version of some of this at Lyft for just Kinesis
because we need a solution for event time alignment in the very short term
but we'd like to immediately start helping to do this properly in Flink
after that.  One of the end goals for us is event time alignment across
heterogeneous sources.  Another is making it possible for non-expert users
to have a high probability of being able to write their own, correct,
connectors.

-Jamie

On Thu, Nov 15, 2018 at 3:43 AM Aljoscha Krettek 
wrote:

> Hi,
>
> I thought I had sent this mail a while ago but I must have forgotten to
> send it.
>
> There is another thing we should consider for splits: the range of
> timestamps that it can contain. For example, the splits of a file source
> would know what the minimum and maximum timestamp in the splits is,
> roughly. For infinite splits, such as Kafka partitions, the minimum would
> be meaningful but the maximum would be +Inf. If the splits expose the
> interval of time that they contain the readers, or the component that
> manages the readers can make decisions about which splits to forward and
> read first. And it can also influence the minimum watermark that a reader
> forwards: it should never emit a watermark if it knows there are splits to
> 

Re: Sharing state between subtasks

2018-11-14 Thread Jamie Grier
urce
> > > consumption distributedly. For example we can start this component in
> the
> > > JobManager like the current role of CheckpointCoordinator. Then every
> > > source task would commnicate with JobManager via current RPC mechanism,
> > > maybe we can rely on the heartbeat message to attach the consumption
> > > progress as the payloads. The JobManagerwill accumulator or state all
> the
> > > reported progress and then give responses for different source tasks.
> We
> > > can define a protocol for indicating the fast soruce task to sleep for
> > > specific time for example. To do so, the coordinator has the global
> > > information to give the proper decision for individuals, so it seems
> > more
> > > precise. And it will not affect the barrier alignment, because the
> > sleeping
> > > fast source can release the lock to emit barrier as normal. The only
> > > concern is the changes for source interface and may affect all related
> > > source implementations.
> > > >
> > > > Currently we prefer to the second way to implement and will refer to
> > > other good points above. :)
> > > >
> > > > Best,
> > > > Zhijiang
> > > > --
> > > > From: Jamie Grier
> > > > Sent: Wednesday, October 17, 2018, 03:28
> > > > To: dev
> > > > Subject: Re: Sharing state between subtasks
> > > >
> > > > Here's a doc I started describing some changes we would like to make
> > > > starting with the Kinesis Source.. It describes a refactoring of that
> > > code
> > > > specifically and also hopefully a pattern and some reusable code we
> can
> > > use
> > > > in the other sources as well.  The end goal would be best-effort
> > > event-time
> > > > synchronization across all Flink sources but we are going to start
> with
> > > the
> > > > Kinesis Source first.
> > > >
> > > > Please take a look and please provide thoughts and opinions about the
> > > best
> > > > state sharing mechanism to use -- that section is left blank and
> we're
> > > > especially looking for input there.
> > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
> > > >
> > > > -Jamie
> > > >
> > > >
> > > > On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann  >
> > > wrote:
> > > >
> > > >> But on the Kafka source level it should be perfectly fine to do what
> > > Elias
> > > >> proposed. This is of course not the perfect solution but could
> > bring
> > > us
> > > >> forward quite a bit. The changes required for this should also be
> > > minimal.
> > > >> This would become obsolete once we have something like shared state.
> > But
> > > >> until then, I think it would be worth a try.
> > > >>
> > > >> Cheers,
> > > >> Till
> > > >>
> > > >> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <
> aljos...@apache.org
> > >
> > > >> wrote:
> > > >>
> > > >>> The reason this selective reading doesn't work well in Flink in the
> > > >> moment
> > > >>> is because of checkpointing. For checkpointing, checkpoint barriers
> > > >> travel
> > > >>> within the streams. If we selectively read from inputs based on
> > > >> timestamps
> > > >>> this is akin to blocking an input if that input is very far ahead
> in
> > > >> event
> > > >>> time, which can happen when you have a very fast source and a slow
> > > source
> > > >>> (in event time), maybe because you're in a catchup phase. In those
> > > cases
> > > >>> it's better to simply not read the data at the sources, as Thomas
> > said.
> > > >>> This is also because with Kafka Streams, each operator is basically
> > its
> > > >> own
> > > >>> job: it's reading from Kafka and writing to Kafka and there is not
> a
> > > >>> complex graph of different operations with network shuffles in
> > between,
> > > >> as
> > > >>> you have with Flink.
> > > >>>
> > > &g

[jira] [Created] (FLINK-10888) Expose new global watermark RPC to sources

2018-11-14 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-10888:
---

 Summary: Expose new global watermark RPC to sources
 Key: FLINK-10888
 URL: https://issues.apache.org/jira/browse/FLINK-10888
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Reporter: Jamie Grier
Assignee: Jamie Grier


Expose new JobMaster RPC for watermark tracking to Source implementations so it 
can be used to align reads across sources.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10887) Add source watermarking tracking to the JobMaster

2018-11-14 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-10887:
---

 Summary: Add source watermarking tracking to the JobMaster
 Key: FLINK-10887
 URL: https://issues.apache.org/jira/browse/FLINK-10887
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager
Reporter: Jamie Grier
Assignee: Jamie Grier


We need to add a new RPC to the JobMaster such that the current watermark for 
every source sub-task can be reported and the current global minimum/maximum 
watermark can be retrieved so that each source can adjust their partition read 
rates in an attempt to keep sources roughly aligned in event time.
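
A minimal sketch of what such an interface could look like (all names here are
hypothetical and purely for illustration, not the final API):

// Hypothetical sketch of the JobMaster-side service -- names are illustrative only.
public interface SourceWatermarkTrackingService {

    // Called periodically by each source sub-task to report its current watermark
    // and, in the same round trip, learn the aggregate across all source sub-tasks.
    WatermarkAggregate reportWatermark(int subtaskIndex, long currentWatermark);

    class WatermarkAggregate {
        public final long globalMinWatermark;
        public final long globalMaxWatermark;

        public WatermarkAggregate(long globalMinWatermark, long globalMaxWatermark) {
            this.globalMinWatermark = globalMinWatermark;
            this.globalMaxWatermark = globalMaxWatermark;
        }
    }
}

A source sub-task could then compare its own watermark against the global
minimum and throttle its partition reads whenever it is more than some
threshold ahead.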

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10886) Event time synchronization across sources

2018-11-14 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-10886:
---

 Summary: Event time synchronization across sources
 Key: FLINK-10886
 URL: https://issues.apache.org/jira/browse/FLINK-10886
 Project: Flink
  Issue Type: Improvement
  Components: Streaming Connectors
Reporter: Jamie Grier
Assignee: Jamie Grier


When reading from a source with many parallel partitions, especially when 
reading lots of historical data (or recovering from downtime and there is a 
backlog to read), it's quite common for an event-time skew to develop 
across those partitions.
 
When doing event-time windowing -- or in fact any event-time driven processing 
-- the event time skew across partitions results directly in increased 
buffering in Flink and of course the corresponding state/checkpoint size growth.
 
As the event-time skew and state size grow larger, this can have a major effect 
on application performance and in some cases result in a "death spiral" where 
the application performance gets worse and worse as the state size grows and 
grows.
 
So, one solution to this problem, outside of core changes in Flink itself, 
seems to be to try to coordinate sources across partitions so that they make 
progress through event time at roughly the same rate.  In fact if there is 
large skew the idea would be to slow or even stop reading from some partitions 
with newer data while first reading the partitions with older data.  Anyway, to 
do this we need to share state somehow amongst sub-tasks.
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Sharing state between subtasks

2018-10-16 Thread Jamie Grier
Here's a doc I started describing some changes we would like to make
starting with the Kinesis Source.. It describes a refactoring of that code
specifically and also hopefully a pattern and some reusable code we can use
in the other sources as well.  The end goal would be best-effort event-time
synchronization across all Flink sources but we are going to start with the
Kinesis Source first.

Please take a look and please provide thoughts and opinions about the best
state sharing mechanism to use -- that section is left blank and we're
especially looking for input there.

https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing

-Jamie


On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann  wrote:

> But on the Kafka source level it should be perfectly fine to do what Elias
> proposed. This is of course not the perfect solution but could bring us
> forward quite a bit. The changes required for this should also be minimal.
> This would become obsolete once we have something like shared state. But
> until then, I think it would be worth a try.
>
> Cheers,
> Till
>
> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek 
> wrote:
>
> > The reason this selective reading doesn't work well in Flink in the
> moment
> > is because of checkpointing. For checkpointing, checkpoint barriers
> travel
> > within the streams. If we selectively read from inputs based on
> timestamps
> > this is akin to blocking an input if that input is very far ahead in
> event
> > time, which can happen when you have a very fast source and a slow source
> > (in event time), maybe because you're in a catchup phase. In those cases
> > it's better to simply not read the data at the sources, as Thomas said.
> > This is also because with Kafka Streams, each operator is basically its
> own
> > job: it's reading from Kafka and writing to Kafka and there is not a
> > complex graph of different operations with network shuffles in between,
> as
> > you have with Flink.
> >
> > This different nature of Flink is also why I think that readers need
> > awareness of other readers to do the event-time alignment, and this is
> > where shared state comes in.
> >
> > > On 10. Oct 2018, at 20:47, Elias Levy 
> > wrote:
> > >
> > > On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske 
> wrote:
> > >
> > >> I think the new source interface would be designed to be able to
> > leverage
> > >> shared state to achieve time alignment.
> > >> I don't think this would be possible without some kind of shared
> state.
> > >>
> > >> The problem of tasks that are far ahead in time cannot be solved with
> > >> back-pressure.
> > >> That's because a task cannot choose from which source task it accepts
> > >> events and from which doesn't.
> > >> If it blocks an input, all downstream tasks that are connected to the
> > >> operator are affected. This can easily lead to deadlocks.
> > >> Therefore, all operators need to be able to handle events when they
> > arrive.
> > >> If they cannot process them yet because they are too far ahead in
> time,
> > >> they are put in state.
> > >>
> > >
> > > The idea I was suggesting is not for operators to block an input.
> > Rather,
> > > it is that they selectively choose from which input to process the next
> > > message from based on their timestamp, so long as there are buffered
> > > messages waiting to be processed.  That is a best-effort alignment
> > > strategy.  Seems to work relatively well in practice, at least within
> > Kafka
> > > Streams.
> > >
> > > E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for
> both
> > > its inputs.  Instead, it could keep them separate and selectively
> consume
> > > from the one that had a buffer available, and if both have buffers
> > > available, from the buffer with the messages with a lower timestamp.
> >
> >
>


Re: Sharing state between subtasks

2018-10-10 Thread Jamie Grier
Also, I'm afraid I derailed this thread just a bit..  So also back to
Thomas's original question..

If we decide state-sharing across source subtasks is the way forward for
now -- does anybody have thoughts to share on what form this should take?

Thomas mentioned Akka or JGroups.  Other thoughts?


On Wed, Oct 10, 2018 at 6:58 AM Jamie Grier  wrote:

> Okay, so I think there is a lot of agreement here about (a) This is a real
> issue for people, and (b) an ideal long-term approach to solving it.
>
> As Aljoscha and Elias said a full solution to this would be to also
> redesign the source interface such that individual partitions are exposed
> in the API and not hidden inside sources like now -- then we could be much
> smarter about the way we read from the individual partitions.  We would
> also have to modify the stream task code such that it also reads in a
> time-aligned way throughout the data flow to solve the full problem --
> either that or use some shared state between sources to keep them
> time-aligned across sub-tasks just at the source.
>
> With regard to this question of state sharing between source sub-tasks
> versus modifying Flink to do time-aligned reads throughout the system --
> does anybody have a strong opinion on this?
>
> We're basically looking for a way forward and our initial approach, though
> ugly because it requires modification to all of the sources we use, was
> going to be to share state between source sub-tasks in order to keep them
> time-aligned with no further modifications required to Flink's core.
>
> However, if it seems reasonable to do and there is consensus on the best
> way forward maybe we should be looking at introducing the time-alignment
> properly instead of hacking the sources.
>
>
>
>
> On Tue, Oct 9, 2018 at 12:01 PM Elias Levy 
> wrote:
>
>> On Tue, Oct 9, 2018 at 1:25 AM Aljoscha Krettek 
>> wrote:
>>
>> > @Elias Do you know if Kafka Consumers do this alignment across multiple
>> > consumers or only within one Consumer across the partitions that it
>> reads
>> > from.
>> >
>>
>> The behavior is part of Kafka Streams
>> <
>> https://github.com/apache/kafka/blob/96132e2dbb69a0c6c11cb183bb05cefef4e30557/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java#L65
>> >,
>> not the Kafka consumer.  The alignment does not occur across Kafka
>> consumers, but that is because Kafka Streams, unlikely Flink, uses a
>> single
>> consumer to fetch records from multiple sources / topics.  The alignment
>> occurs with the stream task.  Stream tasks keep queues per topic-partition
>> (which may be from different topics), and select the next record to be
>> processed by picking the queue with the lowest timestamp.
>>
>> The equivalent in Flink would be for the Kafka connector source to select
>> the message among partitions with the lowest timestamp to emit next, and
>> for multiple input stream operators to select the message among inputs
>> with
>> the lowest timestamp to process.
>>
>


Re: Sharing state between subtasks

2018-10-10 Thread Jamie Grier
Okay, so I think there is a lot of agreement here about (a) This is a real
issue for people, and (b) an ideal long-term approach to solving it.

As Aljoscha and Elias said a full solution to this would be to also
redesign the source interface such that individual partitions are exposed
in the API and not hidden inside sources like now -- then we could be much
smarter about the way we read from the individual partitions.  We would
also have to modify the stream task code such that it also reads in a
time-aligned way throughout the data flow to solve the full problem --
either that or use some shared state between sources to keep them
time-aligned across sub-tasks just at the source.

With regard to this question of state sharing between source sub-tasks
versus modifying Flink to do time-aligned reads throughout the system --
does anybody have a strong opinion on this?

We're basically looking for a way forward and our initial approach, though
ugly because it requires modification to all of the sources we use, was
going to be to share state between source sub-tasks in order to keep them
time-aligned with no further modifications required to Flink's core.

However, if it seems reasonable to do and there is consensus on the best
way forward maybe we should be looking at introducing the time-alignment
properly instead of hacking the sources.




On Tue, Oct 9, 2018 at 12:01 PM Elias Levy 
wrote:

> On Tue, Oct 9, 2018 at 1:25 AM Aljoscha Krettek 
> wrote:
>
> > @Elias Do you know if Kafka Consumers do this alignment across multiple
> > consumers or only within one Consumer across the partitions that it reads
> > from.
> >
>
> The behavior is part of Kafka Streams
> <
> https://github.com/apache/kafka/blob/96132e2dbb69a0c6c11cb183bb05cefef4e30557/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java#L65
> >,
> not the Kafka consumer.  The alignment does not occur across Kafka
> consumers, but that is because Kafka Streams, unlike Flink, uses a single
> consumer to fetch records from multiple sources / topics.  The alignment
> occurs with the stream task.  Stream tasks keep queues per topic-partition
> (which may be from different topics), and select the next record to be
> processed by picking the queue with the lowest timestamp.
>
> The equivalent in Flink would be for the Kafka connector source to select
> the message among partitions with the lowest timestamp to emit next, and
> for multiple input stream operators to select the message among inputs with
> the lowest timestamp to process.
>


Re: Sharing state between subtasks

2018-10-08 Thread Jamie Grier
I'll add to what Thomas already said..  The larger issue driving this is
that when reading from a source with many parallel partitions, especially
when reading lots of historical data (or recovering from downtime and there
is a backlog to read), it's quite common for there to develop an event-time
skew across those partitions.

When doing event-time windowing -- or in fact any event-time driven
processing -- the event time skew across partitions results directly in
increased buffering in Flink and of course the corresponding
state/checkpoint size growth.

As the event-time skew and state size grow larger, this can have a major
effect on application performance and in some cases result in a "death
spiral" where the application performance gets worse and worse as the
state size grows and grows.

So, one solution to this problem, outside of core changes in Flink itself,
seems to be to try to coordinate sources across partitions so that they
make progress through event time at roughly the same rate.  In fact if
there is large skew the idea would be to slow or even stop reading from
some partitions with newer data while first reading the partitions with
older data.  Anyway, to do this we need to share state somehow amongst
sub-tasks.
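
As a very rough illustration of the kind of coordination I mean -- with all
names hypothetical, and deliberately ignoring how the shared view is actually
distributed, since that is exactly the open question -- see the sketch below:

// Hypothetical sketch: a source sub-task throttling itself against a shared
// global minimum watermark. How the shared view is implemented (ZooKeeper,
// JGroups, Akka, a JobManager RPC, ...) is the open question.
interface SharedWatermarkView {
    long getGlobalMinWatermark();         // min across all source sub-tasks
    void reportLocalWatermark(long wm);   // this sub-task's current watermark
}

class AlignedReadStep {
    private static final long MAX_AHEAD_MILLIS = 30_000;

    static void runOnce(SharedWatermarkView view, long localWatermark) throws InterruptedException {
        view.reportLocalWatermark(localWatermark);
        if (localWatermark > view.getGlobalMinWatermark() + MAX_AHEAD_MILLIS) {
            // Too far ahead in event time: leave the data buffered upstream
            // (Kafka/Kinesis/filesystem) and let the slow partitions catch up.
            Thread.sleep(500);
        } else {
            // ...otherwise read and emit the next batch from this partition.
        }
    }
}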

The common sense view of this is the following:  Why would we want to pull
data from a perfectly good buffer (like a filesystem, Kinesis, or Kafka)
into Flink state just to manage and checkpoint it while waiting to be able
to complete event time computations?  The completion of computations is
held up by the partitions with the oldest data so it's of no value to read
the newer data until you've read the old.  It seems much better to leave
the newer data buffered upstream.

I'd be very curious to hear others' thoughts on this..  I would expect many
people to have run into similar issues.  I also wonder if anybody has
already been working on similar issues.  It seems there is room for some
core Flink changes to address this as well and I'm guessing people have
already thought about it.

-Jamie



On Sun, Oct 7, 2018 at 12:58 PM Thomas Weise  wrote:

> I'm looking to implement a state sharing mechanism between subtasks (of one
> or multiple tasks). Our use case is to align watermarks between subtasks of
> one or multiple sources to prevent some data fetchers to race ahead of
> others and cause massive state buffering in Flink.
>
> Each subtask would share a small state (probably just a key and couple
> longs). The state would be updated periodically (perhaps every 30s). Other
> subtasks should see these changes with similar latency. It is essentially a
> hash table to which every node contributes a distinct key.
>
> An initial idea was to implement this using ZooKeeper ephemeral nodes. But
> since there is no way to read all child nodes in one sweep, state access
> becomes very chatty. With, let's say, 512 subtasks we would end up with 512
> * 512 reads per interval (1 to list children, N-1 to fetch data, per
> subtask).
>
> My next stop will be a group communication mechanism like JGroups or Akka
> (following looks like a potential good fit:
> https://doc.akka.io/docs/akka/2.5/distributed-data.html?language=java).
> But
> before that I wanted to check if others already had a similar need and
> possibly experience/implementation to share?
>
> There are probably more use cases related to discovery etc. Perhaps Flink
> could provide a state primitive, if there is broader interest in the
> community?
>
> Thanks,
> Thomas
>


[jira] [Created] (FLINK-10484) New latency tracking metrics format causes metrics cardinality explosion

2018-10-02 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-10484:
---

 Summary: New latency tracking metrics format causes metrics 
cardinality explosion
 Key: FLINK-10484
 URL: https://issues.apache.org/jira/browse/FLINK-10484
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.5.4, 1.6.1, 1.6.0
Reporter: Jamie Grier
Assignee: Jamie Grier


The new metrics format for latency tracking causes huge metrics cardinality 
explosion due to the format and the fact that there is a metric reported for 
every combination of source subtask index and operator subtask index.  Yikes!

This format is actually responsible for basically taking down our metrics 
system by DDoSing our metrics servers (at Lyft).

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [Proposal] Utilities for reading, transforming and creating Streaming savepoints

2018-08-17 Thread Jamie Grier
This is great, Gyula!  A colleague here at Lyft has also done some work
around bootstrapping DataStream programs and we've also talked a bit about
doing this by running DataSet programs.

On Fri, Aug 17, 2018 at 3:28 AM, Gyula Fóra  wrote:

> Hi All!
>
> I want to share with you a little project we have been working on at King
> (with some help from some dataArtisans folks). I think this would be a
> valuable addition to Flink and solve a bunch of outstanding production
> use-cases and headaches around state bootstrapping and state analytics.
>
> We have built a quick and dirty POC implementation on top of Flink 1.6,
> please check the README for some nice examples to get a quick idea:
>
> https://github.com/king/bravo
>
> *Short story*
> Bravo is a convenient state reader and writer library leveraging the
> Flink’s batch processing capabilities. It supports processing and writing
> Flink streaming savepoints. At the moment it only supports processing
> RocksDB savepoints but this can be extended in the future for other state
> backends and checkpoint types.
>
> Our goal is to cover a few basic features:
>
>- Converting keyed states to Flink DataSets for processing and analytics
>- Reading/Writing non-keyed operators states
>- Bootstrap keyed states from Flink DataSets and create new valid
>savepoints
>- Transform existing savepoints by replacing/changing some states
>
>
> Some example use-cases:
>
>- Point-in-time state analytics across all operators and keys
>- Bootstrap state of a streaming job from external resources such as
>reading from database/filesystem
>- Validate and potentially repair corrupted state of a streaming job
>- Change max parallelism of a job
>
>
> Our main goal is to start working together with other Flink production
> users and make this something useful that can be part of Flink. So if you
> have use-cases please talk to us :)
> I have also started a google doc which contains a little bit more info than
> the readme and could be a starting place for discussions:
>
> https://docs.google.com/document/d/103k6wPX20kMu5H3SOOXSg5PZIaYpw
> dhqBMr-ppkFL5E/edit?usp=sharing
>
> I know there are a bunch of rough edges and bugs (and no tests) but our
> motto is: If you are not embarrassed, you released too late :)
>
> Please let me know what you think!
>
> Cheers,
> Gyula
>


[jira] [Created] (FLINK-10154) Make sure we always read at least one record in KinesisConnector

2018-08-15 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-10154:
---

 Summary: Make sure we always read at least one record in 
KinesisConnector
 Key: FLINK-10154
 URL: https://issues.apache.org/jira/browse/FLINK-10154
 Project: Flink
  Issue Type: Bug
  Components: Kinesis Connector
Affects Versions: 1.6.0
Reporter: Jamie Grier
Assignee: Jamie Grier


It's possible in some cases to request zero records from Kinesis in the Kinesis 
connector.  This can happen when the "adaptive reads" feature is enabled.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-22 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-9061:
--

 Summary: S3 checkpoint data not partitioned well -- causes errors 
and poor performance
 Key: FLINK-9061
 URL: https://issues.apache.org/jira/browse/FLINK-9061
 Project: Flink
  Issue Type: Bug
  Components: FileSystem, State Backends, Checkpointing
Affects Versions: 1.4.2
Reporter: Jamie Grier


I think we need to modify the way we write checkpoints to S3 for high-scale 
jobs (those with many total tasks).  The issue is that we are writing all the 
checkpoint data under a common key prefix.  This is the worst case scenario for 
S3 performance since the key is used as a partition key.
 
In the worst case checkpoints fail with a 500 status code coming back from S3 
and an internal error type of TooBusyException.

 
One possible solution would be to add a hook in the Flink filesystem code that 
allows me to "rewrite" paths.  For example say I have the checkpoint directory 
set to:
 
s3://bucket/flink/checkpoints
 
I would hook that and rewrite that path to:
 
s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
path
 
This would distribute the checkpoint write load around the S3 cluster evenly.
 
For reference: 
https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
 
Any other people hit this issue?  Any other ideas for solutions?  This is a 
pretty serious problem for people trying to checkpoint to S3.
 
-Jamie
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Errors checkpointing to S3 for high-scale jobs

2018-03-22 Thread Jamie Grier
I think we need to modify the way we write checkpoints to S3 for high-scale
jobs (those with many total tasks).  The issue is that we are writing all
the checkpoint data under a common key prefix.  This is the worst case
scenario for S3 performance since the key is used as a partition key.

In the worst case checkpoints fail with a 500 status code coming back from
S3 and an internal error type of TooBusyException.

One possible solution would be to add a hook in the Flink filesystem code
that allows me to "rewrite" paths.  For example say I have the checkpoint
directory set to:

s3://bucket/flink/checkpoints

I would hook that and rewrite that path to:

s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the
original path

This would distribute the checkpoint write load around the S3 cluster
evenly.
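
Something along these lines is what I mean by the rewrite hook (a sketch only;
the hash function choice here is arbitrary):

import java.net.URI;

// Sketch: prefix the checkpoint path with a hash of the original path so that
// keys spread across S3 partitions instead of piling up behind one hot prefix.
class EntropyInjectingPath {
    static URI rewrite(URI original) {
        // e.g. s3://bucket/flink/checkpoints -> s3://bucket/5f3a9c12/flink/checkpoints
        String hash = Integer.toHexString(original.getPath().hashCode());
        return URI.create(original.getScheme() + "://" + original.getAuthority()
                + "/" + hash + original.getPath());
    }
}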

For reference:
https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/

Any other people hit this issue?  Any other ideas for solutions?  This is a
pretty serious problem for people trying to checkpoint to S3.

-Jamie


Re: Does `flink-connector-filesystem` work with Hadoop-Free Flink?

2018-02-23 Thread Jamie Grier
Yeah, I meant the latter..  but it sounds like it could be just asking for
trouble.  I just like the idea of keeping the set of un-shaded JARs in the
flink/lib directory to a minimum..

Thanks.

On Fri, Feb 23, 2018 at 5:29 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> You mean putting the Flink-native S3 filesystem in the user jar or Hadoop
> in the user jar. The former wouldn't work, I think, because the FileSystems
> are being initialised before the user-jar is loaded. The latter might work
> but only if you don't have Hadoop in the classpath, i.e. not on YARN and
> only on a Hadoop-free cluster. Maybe...
>
> > On 23. Feb 2018, at 13:32, Jamie Grier <jgr...@lyft.com> wrote:
> >
> > Thanks, Aljoscha :)
> >
> > > So is it possible to continue to use the new "native" filesystems along
> > with the BucketingSink by including the Hadoop dependencies only in the
> > user's uber jar? Or is that asking for trouble?  Has anyone tried that
> > successfully?
> >
> > -Jamie
> >
> >
> > On Fri, Feb 23, 2018 at 12:39 AM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> >> Hi,
> >>
> >> I'm afraid not, since the BucketingSink uses the Hadoop FileSystem
> >> directly and not the Flink FileSystem abstraction. The flink-s3-fs-*
> >> modules only provide Flink FileSystems.
> >>
> >> One of the goals for 1.6 is to provide a BucketingSink that uses the
> Flink
> >> FileSystem and also works well with eventually consistent file systems.
> >>
> >> --
> >> Aljoscha
> >>
> >>> On 23. Feb 2018, at 06:31, Jamie Grier <jgr...@lyft.com> wrote:
> >>>
> >>> Is the `flink-connector-filesystem` connector supposed to work with the
> >>> latest hadoop-free Flink releases, say along with the
> >> `flink-s3-fs-presto`
> >>> filesystem implementation?
> >>>
> >>> -Jamie
> >>
> >>
>
>


Re: Does `flink-connector-filesystem` work with Hadoop-Free Flink?

2018-02-23 Thread Jamie Grier
Thanks, Aljoscha :)

So is it possible to continue to use the new "native" filesystems along
with the BucketingSink by including the Hadoop dependencies only in the
user's uber jar? Or is that asking for trouble?  Has anyone tried that
successfully?

-Jamie


On Fri, Feb 23, 2018 at 12:39 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> I'm afraid not, since the BucketingSink uses the Hadoop FileSystem
> directly and not the Flink FileSystem abstraction. The flink-s3-fs-*
> modules only provide Flink FileSystems.
>
> One of the goals for 1.6 is to provide a BucketingSink that uses the Flink
> FileSystem and also works well with eventually consistent file systems.
>
> --
> Aljoscha
>
> > On 23. Feb 2018, at 06:31, Jamie Grier <jgr...@lyft.com> wrote:
> >
> > Is the `flink-connector-filesystem` connector supposed to work with the
> > latest hadoop-free Flink releases, say along with the
> `flink-s3-fs-presto`
> > filesystem implementation?
> >
> > -Jamie
>
>


Does `flink-connector-filesystem` work with Hadoop-Free Flink?

2018-02-22 Thread Jamie Grier
Is the `flink-connector-filesystem` connector supposed to work with the
latest hadoop-free Flink releases, say along with the `flink-s3-fs-presto`
filesystem implementation?

-Jamie


Re: Timestamp/watermark support in Kinesis consumer

2018-02-21 Thread Jamie Grier
I know this is a very simplistic idea but...

In general the issue Eron is describing occurs whenever two (or more)
parallel partitions are assigned to the same Flink sub-task and there is
large time delta between them.  This problem exists though largely because
we are not making any decisions about which of these partitions to read and
when but rather just treating them all the same.  However, this isn't the
only way to approach the problem.

Think instead of each partition as a "roughly time sorted" file and the
function of the connector as roughly a merge-sort-type process.  In other
words, just read the older data first by peeking at each partition and
deciding what to read next.  The output of the connector would be a roughly
time-ordered stream that way..
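
A toy sketch of the selection step (purely illustrative, not tied to the
actual Kinesis consumer code):

import java.util.List;

// Toy sketch: pick the partition whose buffered head record has the oldest
// event timestamp and read from that one next.
interface PartitionBuffer {
    boolean hasBufferedRecord();
    long peekHeadTimestamp();   // event time of the next buffered record
}

class OldestFirstSelector {
    static PartitionBuffer selectNext(List<PartitionBuffer> partitions) {
        PartitionBuffer oldest = null;
        for (PartitionBuffer p : partitions) {
            if (p.hasBufferedRecord()
                    && (oldest == null || p.peekHeadTimestamp() < oldest.peekHeadTimestamp())) {
                oldest = p;
            }
        }
        return oldest;   // null means nothing is buffered right now
    }
}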

However to really solve the whole problem you'd have to carry this idea
throughout Flink and be more selective about which data you read and when
throughout the whole data flow graph.  Similar problem I think and just
something I've been thinking a bit about lately.




On Mon, Feb 12, 2018 at 7:12 PM, Eron Wright  wrote:

> It is valuable to consider the behavior of a consumer in both a real-time
> processing context, which consists mostly of tail reads, and a historical
> processing context, where there's an abundance of backlogged data.   In the
> historical processing context, system internals (e.g. shard selection
> logic) have an outsized influence on the order of observation and
> potentially the progression of the event time clock.  In a real-time
> context, the order of observation is, by definition, mostly determined by
> the order in which events are produced.
>
> My point is, it would be good to explore the efficacy of these improvements
> in both contexts.
>
>
>
>
> On Mon, Feb 12, 2018 at 5:10 PM, Thomas Weise  wrote:
>
> > I don't think there is a generic solution to the problem you are
> > describing; we don't know how long it will take for resharding to take
> > effect and those changes to become visible to the connector. Depending on
> > how latency sensitive the pipeline is, possibly a configurable watermark
> > hold period could be used to cushion the event time chaos introduced by
> > resharding.
> >
> > This isn't the primary motivation for the connector customization I'm
> > working on though. We face issues with restart from older checkpoints
> where
> > parent and child shards are consumed in parallel.
> >
> >
> > --
> > sent from mobile
> >
> >
> > On Feb 12, 2018 4:36 PM, "Eron Wright"  wrote:
> >
> > I'd like to know how you envision dealing with resharding in relation to
> > the watermark state.   Imagine that a given shard S1 has a watermark of
> T1,
> > and is then split into two shards S2 and S3.   The new shards are
> assigned
> > to subtasks according to a hash function.  The current watermarks of
> those
> > subtasks could be far ahead of T1, and thus the events in S2/S3 will be
> > considered late.
> >
> > The problem of a chaotic event time clock is exacerbated by any source
> that
> > uses dynamic partitioning.  Would a per-shard watermark generator really
> > solve the problem that is motivating you?
> >
> > Thanks,
> > Eron
> >
> > On Mon, Feb 12, 2018 at 10:35 AM, Thomas Weise  wrote:
> >
> > > Based on my draft implementation, the changes that are needed in the
> > Flink
> > > connector are as follows:
> > >
> > > I need to be able to override the following to track last record
> > timestamp
> > > and idle time per shard.
> > >
> > > protected final void emitRecordAndUpdateState(T record, long recordTimestamp,
> > >         int shardStateIndex, SequenceNumber lastSequenceNumber) {
> > >     synchronized (checkpointLock) {
> > >         sourceContext.collectWithTimestamp(record, recordTimestamp);
> > >         updateState(shardStateIndex, lastSequenceNumber);
> > >     }
> > > }
> > >
> > > Any objection removing final from it?
> > >
> > > Also, why is sourceContext.collectWithTimestamp in the synchronized
> > block?
> > > My custom class will need to emit watermarks - I assume there is no
> need
> > to
> > > acquire checkpointLock for that? Otherwise I would also need to add
> > > emitWatermark() to the base class.
> > >
> > > Let me know if anything else should be considered, I will open a JIRA
> and
> > > PR otherwise.
> > >
> > > Thanks,
> > > Thomas
> > >
> > >
> > > On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise  wrote:
> > >
> > > > -->
> > > >
> > > > On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai <
> > tzuli...@apache.org
> > > >
> > > > wrote:
> > > >
> > > >> Regarding the two hooks you would like to be available:
> > > >>
> > > >>
> > > >>- Provide hook to override discovery (not to hit Kinesis from
> every
> > > >>subtask)
> > > >>
> > > >> Yes, I think we can easily provide a way, for example setting -1 for
> > > >> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard 

Re: Timestamp/watermark support in Kinesis consumer

2018-02-21 Thread Jamie Grier
Big +1 on trying to come up with a common framework for partition-based,
replayable sources.  There is so much common code to be written that makes
it possible to write correct connectors and Gordon's bullet points are
exactly those -- and it's not just Kinesis and Kafka.  It's also true for
reading data out of something like S3.  If your data is organized as a
bunch of parallel, roughly time-ordered files, there really isn't much
difference in the kind of code you have to write for all the hard bits
mentioned above.

The good news is that the potential outcome of this sort of effort could be
that production quality, correct, parallel connectors are much easier to
implement.  Ideally everything other than the code you write to discover
partitions and the code to consume data from a single simple partition
could be mostly common.




On Thu, Feb 8, 2018 at 2:01 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Thomas,
>
> It’s great that you’ve brought out these issues, which IMO are all very
> valid. They have also been in my head for a while.
>
> Here’s a list of things, out of the top of my head, that I would really
> like to improve as part of a major Kafka / Kinesis connector rework.
> Some have JIRAs for them already, or were discussed in some other
> indirectly related JIRA. It might make sense to open an umbrella ticket and
> consolidate all of them there.
>
> - Common abstraction for partition-based, replayable sources, which
> handles 1) position checkpointing, 2) partition discovery / topic
> subscription (using the file source pattern), 3) custom startup positions,
> 4) per-partition watermarks, and 5) partition idleness.
> - Configuration for the connectors are not well-defined. Some go through
> provided properties, some requires using setter methods, etc. Moreover, it
> is actually very confusing for some users that we share the Properties to
> carry Flink connector-specific configurations, as well as the internally
> used client configuration [1]. I think in this aspect, Beam’s KafkaIO has a
> nice API [2] when it comes to this.
> - Some default behaviors of the current connectors, such as partitioning
> and flushing on the producer sides, and offset-committing for the Kafka
> consumer, do not make sense [3] [4].
> - The deserialization / serialization schema together with the partitioner
> interfaces don’t really play well together. For example, the
> `getTargetTopic` method should really be part of the partitioner [5].
>
> I think we are now in a good position to try making this happen for 1.6.
> Once 1.5 is out of the way, I can try opening an umbrella JIRA and collect
> everything there so we can discuss more there.
>
> Cheers,
> Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-4280?
> focusedCommentId=15399648=com.atlassian.jira.
> plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15399648
> [2] https://github.com/apache/beam/blob/master/sdks/java/io/
> kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L171
> [3] https://issues.apache.org/jira/browse/FLINK-5728
> [4] https://issues.apache.org/jira/browse/FLINK-5704
> [5] https://issues.apache.org/jira/browse/FLINK-6288
>
> On 8 February 2018 at 1:48:23 AM, Thomas Weise (t...@apache.org) wrote:
>
> Generalizing the pattern would be great. I was also wondering if there
> aren't other commonalities between sources that would benefit from a shared
> framework. Kafka and Kinesis don't look all that different from a consumer
> perspective: replayable source, topic -> stream, partition -> shard, offset
> -> sequence, dynamic discovery, state saving - shouldn't there be more
> common code?
>
> Meanwhile, we need to find a way to address shortcomings in the current
> Kinesis connector to enable the use case. I would prefer to do that without
> permanently forking the connector code, so here are some more thoughts:
> Provide hook to override discovery (not to hit Kinesis from every subtask)
> Provide hook to support custom watermark generation (somewhere around
> KinesisDataFetcher.emitRecordAndUpdateState)
> If we can accomplish these in short order, it would be great. The current
> implementation makes it hard/impossible to override certain behaviors
> (final protected methods and the like). If there is agreement then I would
> like to address those as a quick PR.
>
> Thanks,
> Thomas
>
>
> On Wed, Feb 7, 2018 at 7:59 AM, Aljoscha Krettek 
> wrote:
> Hi,
>
> That last point is very valid. For a while now I've wanted to generalise
> the pattern of our file source to other sources. (This is related to how
> Beam sources are being refactored to use Splittable DoFn.)
>
> I'm very eager for design work to start on this once 1.5 is out the door.
> There are some other folks (cc'ed) who have also talked/thought about this
> before.
>
> Best,
> Aljoscha
>
> > On 7. Feb 2018, at 01:44, Thomas Weise  wrote:
> >
> > In addition to lack of watermark support, the Kinesis 

[jira] [Created] (FLINK-6199) Single outstanding Async I/O operation per key

2017-03-27 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-6199:
--

 Summary: Single outstanding Async I/O operation per key
 Key: FLINK-6199
 URL: https://issues.apache.org/jira/browse/FLINK-6199
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Jamie Grier


I would like to propose we extend the Async I/O semantics a bit such that a 
user can guarantee a single outstanding async request per key.

This would allow a user to order async requests per key while still achieving 
the throughput benefits of using Async I/O in the first place.

This is essential for operations where stream order is important but we still 
need to use Async operations to interact with an external system in a 
performant way.
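
A minimal sketch (plain Java, not a Flink API) of the intended semantics -- at most one
outstanding request per key, with later requests for the same key queued and issued in
arrival order; result emission is omitted to keep it short:

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class PerKeyAsyncGate<K, IN, OUT> {

    private final Function<IN, CompletableFuture<OUT>> asyncCall;
    private final Map<K, Queue<IN>> pending = new HashMap<>();
    private final Map<K, Boolean> inFlight = new HashMap<>();

    public PerKeyAsyncGate(Function<IN, CompletableFuture<OUT>> asyncCall) {
        this.asyncCall = asyncCall;
    }

    /** Issue the request now, or queue it behind the one already in flight for this key. */
    public synchronized void submit(K key, IN value) {
        if (Boolean.TRUE.equals(inFlight.get(key))) {
            pending.computeIfAbsent(key, k -> new ArrayDeque<>()).add(value);
        } else {
            issue(key, value);
        }
    }

    private void issue(K key, IN value) {
        inFlight.put(key, true);
        // only completion matters for the per-key ordering guarantee sketched here
        asyncCall.apply(value).whenComplete((result, error) -> onComplete(key));
    }

    private synchronized void onComplete(K key) {
        Queue<IN> queue = pending.get(key);
        if (queue != null && !queue.isEmpty()) {
            issue(key, queue.poll());   // next request for this key, preserving per-key order
        } else {
            inFlight.put(key, false);
        }
    }
}
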



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-09 Thread Jamie Grier
> > >
> > > On Mon, Mar 6, 2017 at 3:50 PM, Gyula Fóra <gyula.f...@gmail.com>
> wrote:
> > >
> > > > Hi Aljoscha,
> > > >
> > > > Thank you for the nice proposal!
> > > >
> > > > I think it would make sense to allow users to affect the readiness of the
> > > > side input. I think making it ready when the first element arrives is only
> > > > slightly better than making it always ready from a usability perspective.
> > > > For instance, if I am joining against a static data set I want to wait for
> > > > the whole set before making it ready. This could be exposed as a
> > > > user-defined condition that could also recognize bounded inputs, maybe.
> > > >
> > > > Maybe we could also add an aggregating (merging) side input type,
> that
> > > > could work as a broadcast state.
> > > >
> > > > What do you think?
> > > >
> > > > Gyula
> > > >
> > > > Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2017.
> márc.
> > > 6.,
> > > > H, 15:18):
> > > >
> > > > > Hi Folks,
> > > > >
> > > > > I would like to finally agree on a plan for implementing side
> inputs in
> > > > > Flink. There has already been an attempt to come to consensus [1],
> > > which
> > > > > resulted in two design documents. I tried to consolidate those two
> and
> > > > > also added a section about implementation plans. This is the
> resulting
> > > > > FLIP:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> > > > 17+Side+Inputs+for+DataStream+API
> > > > >
> > > > >
> > > > > In terms of semantics I tried to go with the minimal viable
> solution.
> > > > > The part that needs discussing is how we want to implement this. I
> > > > > outlined three possible implementation plans in the FLIP but what
> it
> > > > > boils down to is that we need to introduce some way of getting
> several
> > > > > inputs into an operator/task.
> > > > >
> > > > >
> > > > > Please have a look at the doc and let us know what you think.
> > > > >
> > > > >
> > > > >
> > > > > Best,
> > > > >
> > > > > Aljoscha
> > > > >
> > > > >
> > > > >
> > > > > [1]
> > > > > https://lists.apache.org/thread.html/
> 797df0ba066151b77c7951fd7d603a
> > > > 8afd7023920d0607a0c6337db3@1462181294@%3Cdev.flink.apache.org%3E
> > > > >
> > > >
> > >
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: [DISCUSS] Side Outputs and Split/Select

2017-03-08 Thread Jamie Grier
+1

On Sat, Mar 4, 2017 at 12:25 AM, Kostas Kloudas <k.klou...@data-artisans.com
> wrote:

> +1
>
> > On Mar 2, 2017, at 1:08 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > +1
> >
> > 2017-03-02 12:11 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> >
> >> Ok, so it seems we have to go with the OutputTag variant for windows as
> >> well, for now.
> >>
> >> For Flink 2.0 we can change that. Would everyone be OK with that?
> >>
> >> On Thu, Mar 2, 2017 at 12:05 PM, Robert Metzger <rmetz...@apache.org>
> >> wrote:
> >>
> >>> Flink enforces binary compatibility for all classes tagged with the
> >> @Public
> >>> annotation.
> >>> Binary compatibility allows users to execute a job against a newer
> Flink
> >>> version without recompiling their job jar.
> >>> Your change alters the return type of some methods (apply()). I think
> >>> there's no way to do that in a binary compatible way.
> >>>
> >>> The only thing we could do is keep the return type as is, but return a
> >>> WindowedOperation instance.
> >>> Users could then manually cast the returned object to access the late
> >>> stream.
> >>>
> >>> Downgrading to "source compatibility" only should fix the issue, but
> then
> >>> users have to recompile their Flink jobs when upgrading the Flink
> >> version.
> >>>
> >>> On Tue, Feb 28, 2017 at 9:37 PM, Fabian Hueske <fhue...@gmail.com>
> >> wrote:
> >>>
> >>>> Hi Chen and Aljoscha,
> >>>>
> >>>> thanks for the great proposal and work.
> >>>>
> >>>> I prefer the WindowedOperator.getLateStream() variant without
> explicit
> >>>> tags.
> >>>> I think it is fine to start adding side output to ProcessFunction
> >> (keyed
> >>>> and non-keyed) and window operators and see how it is picked up by
> >> users.
> >>>>
> >>>> Best, Fabian
> >>>>
> >>>>
> >>>> 2017-02-28 15:42 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> >>>>
> >>>>> Quick update: I created a branch where I make the result type of
> >>>>> WindowedStream operations more specific:
> >>>>> https://github.com/aljoscha/flink/blob/windowed-stream-
> >>>>> result-specific/flink-streaming-java/src/main/java/
> >>>>> org/apache/flink/streaming/api/datastream/WindowedStream.java
> >>>>>
> >>>>> We would need this for the "lateStream()" API without the explicit
> >>>>> OutputTag.
> >>>>>
> >>>>> It seems the backwards compatibility checker doesn't like this and
> >>>>> complains about breaking binary backwards compatibility. +Robert
> >>> Metzger
> >>>>> <rmetz...@apache.org> Do you have an idea what we could do there?
> >>>>>
> >>>>> On Tue, 28 Feb 2017 at 12:39 Ufuk Celebi <u...@apache.org> wrote:
> >>>>>
> >>>>>> On Tue, Feb 28, 2017 at 11:38 AM, Aljoscha Krettek <
> >>>> aljos...@apache.org>
> >>>>>> wrote:
> >>>>>>> I see the ProcessFunction as a bit of the generalised future of
> >>>>> FlatMap,
> >>>>>> so
> >>>>>>> to me it makes sense to only allow side outputs on the
> >>>> ProcessFunction
> >>>>>> but
> >>>>>>> I'm open for anything. If we decide for this I'm happy with an
> >>>>> additional
> >>>>>>> method on Collector.
> >>>>>>
> >>>>>> I think it's best to restrict this to ProcessFunction after all
> >>> (given
> >>>>>> that we allow it for non-keyed streams, etc.). ;-)
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: [DISCUSS] Per-key event time

2017-02-28 Thread Jamie Grier
to O2 per
> > > watermark (of course it depends on how fast the watermarks arrive),
> > > total of 2. Although, if we keep track of per-key watermarks, S1 would
> > > need to send watermarks for every key directed to O1, also for O2. So
> if
> > > 10 keys go from S1 to O1, and 10 keys from S1 to O2, then (if
> watermarks
> > > arrive at the same rate per-key as per-source in the previous case) then
> > > S1 would send a total of 20 watermarks.
> > >
> > > Another question is whether how large the state-per-key is? If it's
> > > really small (an integer maybe, or state of a small state machine),
> then
> > > the overhead of keeping track of a (Long) watermark is large
> > > memory-wise. E.g. Int state vs. Long watermark results in 3x as large
> > > state. Also, the checkpointing would be ~3x as slow. Of course, for
> > > large states a Long watermark would not mean much overhead.
> > >
> > > We could resolve the memory issue by using some kind of sketch data
> > > structure. Right now the granularity of watermark handling is
> > > per-operator-instance. On the other hand, per-key granularity might be
> > > costly. What if we increased the granularity of watermarks inside an
> > > operator by keeping more than one watermark tracker in one operator?
> > > This could be quite simply done with a hash table. With a hash table of
> > > size 1, we would yield the current semantics (per-operator-instance
> > > granularity). With a hash table large enough to have at most one key
> per
> > > bucket, we would yield per-key watermark tracking. In between lies the
> > > trade-off between handling time-skew and a lot of memory overhead. This
> > > does not seem hard to implement.
> > >
> > > Of course, at some point we would still need to take care of watermarks
> > > per-key. Imagine that keys A and B would go to the same bucket of the
> > > hash table, and watermarks are coming in like this: (B,20), (A,10),
> > > (A,15), (A,40). Then the watermark of the bucket should be the minimum
> > > as time passes, i.e. 0, 10, 15, 20. For this we need to keep track of
> > > the watermarks of A and B separately. But after we have a correct
> > > watermark for the bucket, all we need to care about is the bucket
> > > watermarks. So somewhere (most probably at the source) we would have to
> > > pay memory overhead of tracking every key, but nowhere else in the
> > > topology.
> > >
> > > Regarding the potentially large network overhead, the same compression
> > > could be useful. I.e. we would not send watermarks from one operator
> > > per-key, but rather per-hash. Again, the trade-off between time skew
> and
> > > memory consumption is configurable by the size of the hash table used.
> > >
> > > Cheers,
> > > Gabor
> > >
> > > On 2017-02-23 08:57, Paris Carbone wrote:
> > >
> > > > Hey Jamie!
> > > >
> > > > Key-based progress tracking sounds like local-only progress tracking
> to
> > > me, there is no need to use a low watermarking mechanism at all since
> all
> > > streams of a key are handled by a single partition at a time (per
> > operator).
> > > > Thus, this could be much easier to implement and support (i.e., no
> need
> > > to broadcast the progress state of each partition all the time).
> > > > State-wise it should be fine too if it is backed by rocksdb,
> especially
> > > if we have MapState in the future.
> > > >
> > > > Just my quick thoughts on this, to get the discussion going :)
> > > >
> > > > cheers
> > > > Paris
> > > >
> > > >> On 23 Feb 2017, at 01:01, Jamie Grier <ja...@data-artisans.com>
> > wrote:
> > > >>
> > > >> Hi Flink Devs,
> > > >>
> > > >> Use cases that I see quite frequently in the real world would
> benefit
> > > from
> > > >> a different watermarking / event time model than the one currently
> > > >> implemented in Flink.
> > > >>
> > > >> I would call Flink's current approach partition-based watermarking
> or
> > > maybe
> > > >> subtask-based watermarking. In this model the current "event time"
> is
> > a
> > > >> property local to each subtask instance in a dataflow graph. The
> event
> > > >> time at any subtask is the minimum of the watermarks it has

Re: [DISCUSS] Side Outputs and Split/Select

2017-02-27 Thread Jamie Grier
Aljoscha,

Ahh, that is much better.  As long as it's explicitly referring to late
data I think it's fine.  I also like the second variant where a user
doesn't have to explicitly create the OutputTag.



On Mon, Feb 27, 2017 at 8:45 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> @Jamie I must have mistyped my last API proposal. This piece of code:
> WindowedOperator windowedResult = input
>   .keyBy(...)
>   .window(...)
>   .apply(...)
>
> DataStream lateData = windowedResult.getSideOutput();
>
> should actually have been:
>
> WindowedOperator windowedResult = input
>   .keyBy(...)
>   .window(...)
>   .apply(...)
>
> DataStream lateData = windowedResult.getLateDataSideOutput();
>
> So apart from the naming it's pretty much the same as your suggestion,
> right? The reason why I preferred the explicit OutputTag is that we
> otherwise have to create another layer of OutputTags that are internal to
> the system so that users cannot accidentally also send data to the same
> side output. It just means writing more code for use and introducing the
> more concrete return type for the WindowedStream operations. But that's
> fine if y'all prefer that variant. :-)
>
> On Sat, 25 Feb 2017 at 04:19 Chen Qin <qinnc...@gmail.com> wrote:
>
> > Hi Jamie,
> >
> > I think it does make consuming late-arriving events more explicit! At the
> > cost of a predefined OutputTag which the user neither controls nor defines,
> > and an extra UDF which essentially filters out all main outputs and only
> > lets the side output pass (like a FilterFunction).
> >
> > Thanks,
> > Chen
> >
> > > On Feb 24, 2017, at 1:17 PM, Jamie Grier <ja...@data-artisans.com>
> > wrote:
> > >
> > > I prefer the ProcessFunction and side outputs solution over split() and
> > > select() which I've never liked primarily due to the lack of type
> safety
> > > and it also doesn't really seem to fit with the rest of Flink's API.
> > >
> > > On the late data question I strongly prefer the late data concept being
> > > explicit in the API.  Could we not also do something like:
> > >
> > > WindowedStream<> windowedStream = input
> > >  .keyBy(...)
> > >  .window(...);
> > >
> > > DataStream<> mainOutput = windowedStream
> > >  .apply(...);
> > >
> > > DataStream<> lateOutput = windowStream
> > >  .lateStream();
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Thu, Feb 23, 2017 at 7:08 AM, Gyula Fóra <gyf...@apache.org> wrote:
> > >
> > >> Hi,
> > >>
> > >> Thanks for the nice proposal, I like the idea of side outputs, and it
> > would
> > >> make a lot of topologies much simpler.
> > >>
> > >> Regarding the API I think we should come up with a way of making side
> > >> otuputs accessible from all sort of operators in a similar way. For
> > >> instance through the RichFunction interface with a special collector
> > that
> > >> we invalidate when the user should not be collecting to it. (just a
> > quick
> > >> idea)
> > >>
> > >> I personally wouldn't deprecate the "universal" Split/Select API that
> > can
> > >> be used on any  DataStream in favor of functionality that is only
> > >> accessible trhough the process function/ or a few select operators. I
> > think
> > >> the Split/Select pattern is also very nice and I use it in many
> > different
> > >> contexts to get efficient multiway filtering (after map/co operators
> for
> > >> examples).
> > >>
> > >> Regards,
> > >> Gyula
> > >>
> > >> Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2017. febr.
> > 23.,
> > >> Cs, 15:42):
> > >>
> > >>> Hi Folks,
> > >>> Chen and I have been working for a while now on making FLIP-13 (side
> > >>> outputs) [1] a reality. We think we have a pretty good internal
> > >>> implementation and also a proposal for an API but now we need to
> > discuss
> > >>> how we want to go forward with this, especially how we should deal
> with
> > >>> split/select which does some of the same things side outputs can do.
> > I'll
> > >>> first quickly describe what the split/select API looks like, so that
> > >> we're
> > >>> all on the same

Re: [DISCUSS] Side Outputs and Split/Select

2017-02-24 Thread Jamie Grier
etSideOutput(sideOutput1);
> > DataStream sideOutputStream2 =
> > mainOutputStream.getSideOutput(sideOutput2);
> >
> > Notice how the OutputTags are anonymous inner classes, similar to
> TypeHint.
> > We need this to be able to analyse the type of the side-output streams.
> > Also notice, how the types of the side-output streams can be independent
> of
> > the main-output stream, also notice how everything is correctly type
> > checked by the Java Compiler.
> >
> > This change requires making ProcessFunction an abstract base class so
> that
> > not every user has to implement the onTimer() method. We would also need
> to
> > allow ProcessFunction on a non-keyed stream.
> >
> > Chen also implemented an API based on FlatMapFunction that looks like the
> > one proposed in the FLIP. This relies on CollectorWrapper, which can be
> > used to "pimp" a Collector to also allow emitting to side outputs.
> >
> > For WindowedStream we have two proposals: make OutputTag visible on the
> > WindowedStream API or make the result type of WindowedStream operations
> > more specific to allow a getDroppedDataSideOutput() method. For the first
> > proposal it would look like this:
> >
> > final OutputTag lateDataTag = new OutputTag<>("side-output-1"){}
> ;
> >
> > DataStream windowedResult = input
> >   .keyBy(...)
> >   .window(...)
> >   .sideOutputLateData(lateDataTag)
> >   .apply(...)
> >
> > DataStream lateData = windowedResult.getSideOutput(lateDataTag);
> >
> > For the second proposal it would look like this:
> >
> > WindowedOperator windowedResult = input
> >   .keyBy(...)
> >   .window(...)
> >   .apply(...)
> >
> > DataStream lateData = windowedResult.getSideOutput();
> >
> > Right now, the result of window operations is a
> > SingleOutputStreamOperator, same as it is for all DataStream
> operations.
> > Making the result type more specific, i.e. a WindowedOperator, would
> allow
> > us to add extra methods there. This would require wrapping a
> > SingleOutputStreamOperator and forwarding all the method calls to the
> > wrapped operator which can be a bit of a hassle for future changes. The
> > first proposal requires additional boilerplate code.
> >
> > Sorry for the long mail but I think it's necessary to get everyone on the
> > same page. The question is now: how should we proceed with the proposed
> API
> > and the old split/select API? I propose to deprecate split/select and
> only
> > have side outputs, going forward. Of course, I'm a bit biased on this.
> ;-)
> > If we decide to do this, we also need to decide on what the side output
> API
> > should look like.
> >
> > Happy discussing! Feedback very welcome. :-)
> >
> > Best,
> > Aljoscha
> >
> > [1]
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> 13+Side+Outputs+in+Flink
> >
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


[DISCUSS] Per-key event time

2017-02-22 Thread Jamie Grier
Hi Flink Devs,

Use cases that I see quite frequently in the real world would benefit from
a different watermarking / event time model than the one currently
implemented in Flink.

I would call Flink's current approach partition-based watermarking or maybe
subtask-based watermarking.  In this model the current "event time" is a
property local to each subtask instance in a dataflow graph.  The event
time at any subtask is the minimum of the watermarks it has received on
each of it's input streams.

There are a couple of issues with this model that are not optimal for some
(maybe many) use cases.

1) A single slow subtask (or say source partition) anywhere in the dataflow
can mean no progress can be made on the computation at all.

2) In many real world scenarios the time skew across keys can be *many*
times greater than the time skew within the data with the same key.

In this discussion I'll use "time skew" to refer to the out-of-orderness
with respect to timestamp of the data.  Out-of-orderness is a mouthful ;)

Anyway, let me provide an example or two.

In IoT applications the source of events is a particular device out in the
world, let's say a device in a connected car application.  The data for
some particular device may be very bursty and we will certainly get events
from these devices in Flink out-of-order just because of things like
partitions in Kafka, shuffles in Flink, etc.  However, the time skew in the
data for a single device should likely be very small (milliseconds or maybe
seconds)..

However, in the same application the time skew across different devices can
be huge (hours or even days).  An obvious example of this, again using
connected cars as a representative example is the following:  Car A is
recording data locally at 12:00 pm on Saturday but doesn't currently have a
network connection.  Car B is doing the same thing but does have a network
connection.  Car A will transmit its data when the network comes back
online.  Let's say this is at 4pm.  Car B was transmitting its data
immediately.  This creates a huge time skew (4 hours) in the observed
datastream when looked at as a whole.  However, the time skew in that data
for Car A or Car B alone could be tiny.  It will be out of order of course
but maybe by only milliseconds or seconds.

What the above means in the end for Flink is that the watermarks must be
delayed by up to 4 hours or more because we're looking at the data stream
as a whole -- otherwise the data for Car A will be considered late.  The
time skew in the data stream when looked at as a whole is large even though
the time skew for any key may be tiny.

This is the problem I would like to see a solution for.  The basic idea of
keeping track of watermarks and event time "per-key" rather than per
partition or subtask would solve I think both of these problems stated
above and both of these are real issues for production applications.

The obvious downside of trying to do this per-key is that the amount of
state you have to track is much larger and potentially unbounded.  However,
I could see this approach working if the keyspace isn't growing rapidly but
is stable or grows slowly.  The saving grace here is that this may actually
be true of the types of applications where this would be especially
useful.  Think IoT use cases.  Another approach to keeping state size in
check would be a configurable TTL for a key.
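
To make the bookkeeping concrete, here is a toy sketch (plain Java, not a Flink API) of
per-key watermark tracking with a TTL along the lines described above; the names are
illustrative only:

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class PerKeyWatermarks {

    private static class Entry {
        long watermark;
        long lastSeen;
    }

    private final Map<String, Entry> byKey = new HashMap<>();
    private final long ttlMillis;

    public PerKeyWatermarks(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Advances the watermark for one key (watermarks never move backwards). */
    public void update(String key, long watermark, long now) {
        Entry e = byKey.computeIfAbsent(key, k -> new Entry());
        e.watermark = Math.max(e.watermark, watermark);
        e.lastSeen = now;
    }

    /** Returns the current watermark for a key, or null if the key is unknown/expired. */
    public Long watermarkFor(String key) {
        Entry e = byKey.get(key);
        return e == null ? null : e.watermark;
    }

    /** Drops keys not seen within the TTL, keeping state roughly proportional to the live keyspace. */
    public void expire(long now) {
        for (Iterator<Entry> it = byKey.values().iterator(); it.hasNext(); ) {
            if (now - it.next().lastSeen > ttlMillis) {
                it.remove();
            }
        }
    }
}
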

Anyway, I'm throwing this out here on the mailing list in case anyone is
interested in this discussion, has thought about the problem deeply
already, has use cases of their own they've run into or has ideas for a
solution to this problem.

Thanks for reading..

-Jamie


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


[jira] [Created] (FLINK-5635) Improve Docker tooling to make it easier to build images and launch Flink via Docker tools

2017-01-24 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-5635:
--

 Summary: Improve Docker tooling to make it easier to build images 
and launch Flink via Docker tools
 Key: FLINK-5635
 URL: https://issues.apache.org/jira/browse/FLINK-5635
 Project: Flink
  Issue Type: Improvement
  Components: Docker
Affects Versions: 1.2.0
Reporter: Jamie Grier
 Fix For: 1.2.0


This is a bit of a catch-all ticket for general improvements to the Flink on 
Docker experience.

Things to improve:
  - Make it possible to build a Docker image from your own flink-dist directory 
as well as official releases.
  - Make it possible to override the image name so a user can more easily 
publish these images to their Docker repository
  - Provide scripts that show how to properly run on Docker Swarm or similar 
environments with overlay networking (Kubernetes) without using host networking.
  - Log to stdout rather than to files.
  - Work properly with docker-compose for local deployment as well as 
production deployments (Swarm/k8s)




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Categorize or GroupBy datastream data and process with CEP separately

2017-01-04 Thread Jamie Grier
If you are trying to do the same CEP computation for each event type that's
exactly what will happen for a KeyedStream.  For example if you key by
event type you can think of this like creating a separate substream for
each key/eventType and then applying the CEP operations to each substream
individually.  Is this not what you're trying to do?
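
For example (a hedged Java sketch, imports omitted): assuming `events` is a
DataStream<MonitoringEvent> with getEventType() and getValue(), and the
FilterFunction-based Pattern API of the 1.1/1.2 CEP library, something along these lines
evaluates the rule independently for every event type, without knowing the types up front:

// keying by event type gives one logical substream per type
KeyedStream<MonitoringEvent, String> byType = events
    .keyBy(new KeySelector<MonitoringEvent, String>() {
        @Override
        public String getKey(MonitoringEvent e) {
            return e.getEventType();
        }
    });

// placeholder rule -- stands in for the real "over threshold" CEP pattern
Pattern<MonitoringEvent, ?> overThreshold = Pattern.<MonitoringEvent>begin("high")
    .where(new FilterFunction<MonitoringEvent>() {
        @Override
        public boolean filter(MonitoringEvent e) {
            return e.getValue() > 90;
        }
    });

// the pattern is matched per key (temp, pressure, humidity, ...)
PatternStream<MonitoringEvent> matches = CEP.pattern(byType, overThreshold);
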



On Wed, Jan 4, 2017 at 3:32 AM, madhairsilence <harish.kum...@tcs.com>
wrote:

>
> This is my code.
>
> SplitStream<MonitoringEvent> splitStream = inputStream.split(new
> OutputSelector<MonitoringEvent>() {
>
> @Override
> public Iterable<String> select(MonitoringEvent me) {
>
> List<String> ml = new ArrayList<>();
> ml.add(me.getEventType());
> return ml;
> }
> I have stream of Monitoring Events coming in random order temp : 80,
> pressure : 70 , humidity :80, temp:30...
>
> With the above code, am splitting the stream , eventType wise i.e
> temperatureStream, pressureStream.
>
> The problem is , if I know the eventType, i can select it from the
> splitStream like
>
> *splitStream.select('temperatureStream')*
> but the eventType is dynamic and not pre-defined.
>
> How will I apply CEP for this dynamic stream. The CEP would be like, if the
>
> *temperate is > 90 for past 10 minutes ...
>
> pressure is > 90 for past 10 minutes ...*
>
>
>
> --
> View this message in context: http://apache-flink-mailing-
> list-archive.1008284.n3.nabble.com/Categorize-or-
> GroupBy-datastream-data-and-process-with-CEP-separately-tp15139p15148.html
> Sent from the Apache Flink Mailing List archive. mailing list archive at
> Nabble.com.
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Categorize or GroupBy datastream data and process with CEP separately

2017-01-03 Thread Jamie Grier
I think what you want here is to apply CEP processing on a KeyedStream -
see the last CEP Example here:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/libs/cep.html#examples


On Tue, Jan 3, 2017 at 3:30 AM, madhairsilence <harish.kum...@tcs.com>
wrote:

> Assume I have a datastream
>
> *x:1, y:2 , z:3 , x:7 , y:-1, z:0, z:3 , z:2, y:3 ,x: 2 ,y:6*
> How do I put x,y,z in their own bucket and apply my CEP rule on it.
>
> *x:1, x:7,x: 2
> y:2, y:-1, y:3 , y:6
> z:3, z:0 , z:3, z:2*
> Or to put it in other way. How do I split the stream in to these
> categories(one stream for each x,y,z). I would get 3 sub streams which has
> their own CEP processing.
>
> The challenge here is , the x,y,z are not pre-defined.So I cannot
> pre-create
> streams and assign using an if or switch statement.
>
>
>
> --
> View this message in context: http://apache-flink-mailing-
> list-archive.1008284.n3.nabble.com/Categorize-or-
> GroupBy-datastream-data-and-process-with-CEP-separately-tp15139.html
> Sent from the Apache Flink Mailing List archive. mailing list archive at
> Nabble.com.
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Dynamic Scaling

2017-01-02 Thread Jamie Grier
Hi Govind,

In Flink 1.2 (feature complete, undergoing test) you will be able to scale
your jobs/operators up and down at will; however, you'll have to build a
little tooling around it yourself and scale based on your own metrics.  You
should be able to integrate this with Docker Swarm or Amazon auto-scaling
groups but I haven't done it myself yet.

The way this will really work is the following sequence:

1) Detect that you want to scale up or down (this part is up to you)
2) Flink Job cancel with Savepoint -- this will shut down with a savepoint
in such a way that no messages will need to be re-processed.
3) Launch Flink job from savepoint with different parallelism.

That's it.  You should be able to script this such that the whole process
takes just a couple of seconds.  What I don't have for you right now is any
sort of DIRECT integration with Docker or Amazon for scaling.  You have to
trigger this procedure yourself based on metrics, etc.   Do you think this
will work for your use case?
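
Concretely, the scripted version of steps 2 and 3 could look roughly like this (assuming
the Flink 1.2 CLI flags for cancel-with-savepoint and resuming from a savepoint; paths,
job IDs, and the jar name are placeholders):

bin/flink cancel -s hdfs:///flink/savepoints <job-id>                  # step 2: cancel with savepoint
bin/flink run -s <savepoint-path> -p <new-parallelism> your-job.jar    # step 3: relaunch rescaled
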

On Fri, Dec 23, 2016 at 11:11 AM, Govindarajan Srinivasaraghavan <
govindragh...@gmail.com> wrote:

> Hi,
>
> We have a computation heavy streaming flink job which will be processing
> around 100 million message at peak time and around 1 million messages in
> non peak time. We need the capability to dynamically scale so that the
> computation operator can scale up and down during high or low work loads
> respectively without restarting the job in order to lower the machine
> costs.
>
> Is there an ETA on when the rescaling a single operator without restart
> feature will be released?
>
> Is it possible to auto scale one of the operators with docker swarm or
> Amazon ECS auto scaling based on kafka consumer lag or cpu consumption? If
> so can I get some documentation or steps to achieve this behaviour.
>
> Also is there any document on what are the tasks of a job manager apart
> from scheduling and reporting status?
>
> Since there is just one job manager we just wanted to check if there would
> be any potential scaling limitations as the processing capacity increases.
>
> Thanks
> Govind
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


[jira] [Created] (FLINK-4980) Include example source code in Flink binary distribution

2016-10-31 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-4980:
--

 Summary: Include example source code in Flink binary distribution
 Key: FLINK-4980
 URL: https://issues.apache.org/jira/browse/FLINK-4980
 Project: Flink
  Issue Type: Improvement
Reporter: Jamie Grier


I think we should include the Flink examples source code in the binary 
distribution of Flink.  This would allow people to download Flink and run 
examples (as now), but also play around with and modify the examples.

Right now they would have to actually get the Flink source distribution if they 
wanted the examples source -- which I think is onerous.





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] Timely function interface and timer params

2016-10-29 Thread Jamie Grier
Hi guys,

Good points, Gyula.  I think it would be much easier on a user if there
could be multiple timers in flight per key.  I prefer the second approach,
though, where a user associates some bit of metadata with the timer and we
pass it back to them in the onTimer() callback, otherwise they are forced
to maintain this state themselves.
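
Just to make the shape of that concrete, a purely hypothetical sketch (nothing like this
exists in Flink's API today): whatever payload is attached at registration time comes back
in onTimer(), so the function does not need extra state to tell its timers apart.

interface PayloadTimerService<P> {

    void registerEventTimeTimer(long time, P payload);

    void registerProcessingTimeTimer(long time, P payload);
}

interface PayloadTimerCallback<P> {

    /** Invoked when a timer fires, with the payload that was attached at registration. */
    void onTimer(long timestamp, P payload) throws Exception;
}
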

It looks to me like somehow exposing the namespaces, even if it's simpler
and just a string, is the way to go.

I'm really excited by this guys!  I think the TimelyFlatMap and
TimelyCoFlatMap are going to get a LOT of use.  This is gonna make a lot of
people happy.

-Jamie


On Fri, Oct 28, 2016 at 1:58 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Gyula,
> if you look at the internal API you'll notice that it is pretty much like
> your second proposal. Just for reference, the interface is roughly this:
>
> public interface InternalTimerService {
>   long currentProcessingTime();
>   long currentWatermark();
>   void registerProcessingTimeTimer(N namespace, long time);
>   void deleteProcessingTimeTimer(N namespace, long time);
>   void registerEventTimeTimer(N namespace, long time);
>   void deleteEventTimeTimer(N namespace, long time);
> }
>
> that namespace bit can be anything for which you can provide a
> TypeSerializer.
>
> IMHO, this goes back a bit to the discussion about adding a low level
> operator-like interface that allows pretty much anything a custom operator
> can do but with a defined, stable interface. The internal operator
> interface is somewhat in flux, still, so I wouldn't recommend people to use
> it directly.
>
> The only thing missing, really, from TimelyFlatMap is access to namespaces
> for state and timers. With that, you could implement even the
> WindowOperator as a TimelyFlatMap since I worked on abstracting everything
> that it uses away behind interfaces that any operator can use. The last
> piece, the generic timer API went in last, of course. :-)
>
> Cheers,
> Aljoscha
>
> On Fri, 28 Oct 2016 at 16:55 Gyula Fóra <gyf...@apache.org> wrote:
>
> > Hello,
> >
> > I was thinking about the methods provided by the timely functions and the
> > timerservice and I am wondering if it makes sense to change them a little
> > so they can cover a wider set of use case. Maybe I missed something
> > completely obvious so please shoot me down in that case :)
> >
> > Currently the user gets a TimerService to register timers that will in
> the
> > future call the onTimer method. It is not completely obvious to me how
> > would I implement a function that needs to trigger two types of callbacks
> > in the future. If I get only one onTimer method I should be able to pass
> in
> > some sort of parameter or flag so I can branch in my onTimer
> > implementation.
> >
> > As parameters are not supported I am left with states that are scoped to
> > the keys which is also pretty useless if I want to trigger different
> timed
> > actions for the same keys.
> >
> > I know this is quite tricky but I see some alternative options:
> >  - The register timer method returns a unique (per key) timer id, so we
> can
> > associate state with this id to fetch info about the timer registered.
> (We
> > could also remove timers with this id and maybe add methods to remove all
> > for the current key)
> >  - Allow the users to pass a custom parameter when registering the
> > callback, and the parameter would be passed to the onTimer method
> >  - Allow users to pass custom callback functions when registering the
> > timers, but this would mean we have to support some sort of context for
> > accessing the state (like the window context before)
> >  - We could go for an annotation based API like in beam but thats
> probably
> > not good to mix in the current ones
> >
> > I personally prefer the first one.
> >
> > What do you think?
> >
> > Regards,
> > Gyula
> >
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


[jira] [Created] (FLINK-4947) Make all configuration possible via flink-conf.yaml and CLI.

2016-10-27 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-4947:
--

 Summary: Make all configuration possible via flink-conf.yaml and 
CLI.
 Key: FLINK-4947
 URL: https://issues.apache.org/jira/browse/FLINK-4947
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Jamie Grier
 Fix For: 1.2.0


I think it's important to make all configuration possible via the 
flink-conf.yaml and the command line.

As an example:  To enable "externalizedCheckpoints" you must actually call the 
StreamExecutionEnvironment#enableExternalizedCheckpoints() method from your 
Flink program.

Another example of this would be configuring the RocksDB state backend.

I think it is important to make deployment flexible and easy to build tools 
around.  For example, the infrastructure teams that make these configuration 
decisions and provide tools for deploying Flink apps will be different from 
the teams deploying apps.  The team writing apps should not have to set all of 
this lower level configuration up in their programs.





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Flink: How to handle external app configuration changes in flink

2016-09-26 Thread Jamie Grier
Hi Govindarajan,

Typically the way people do this is to create a stream of configuration
changes and consume this like any other stream.  For the specific case of
filtering for example you may have a data stream and a stream of filters
that you want to run the data through.  The typically approach in the Flink
API would then be

val dataStream = env.addSource(dataSource).keyBy("userId")
val filterStream = env.addSource(filterSource).keyBy("userId")
val connectedStream = dataStream
  .connect(filterStream)
  .flatMap(yourFilterFunction)

You would maintain your filters as state in your filter function.  Notice
that in this example both streams are keyed the same way.

If it is not possible to distribute the configuration by key (it really
depends on your use case) you can instead "broadcast" that state so that
each instance of yourFilterFunction sees the same configuration messages
and will end up building the same state.  For example:

val dataStream = env.addSource(dataSource).keyBy("userId")
val filterStream = env.addSource(filterSource).broadcast()
val connectedStream = dataStream
  .connect(filterStream)
  .flatMap(yourFilterFunction)

I hope that helps.
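
For completeness, here is a hedged Java sketch of what `yourFilterFunction` could look
like for the keyed variant above. It uses plain Strings for both the records and the rules
(a rule is just a substring the record must contain) purely to keep the example
self-contained; your real types and matching logic would differ.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Input 1: data records, input 2: the latest filter rule. Both streams are keyed by
// userId as in the first snippet, so keyed ValueState is available here.
public class ConfigurableFilter extends RichCoFlatMapFunction<String, String, String> {

    private transient ValueState<String> currentRule;

    @Override
    public void open(Configuration parameters) {
        currentRule = getRuntimeContext().getState(
                new ValueStateDescriptor<>("filter-rule", String.class));
    }

    @Override
    public void flatMap1(String record, Collector<String> out) throws Exception {
        String rule = currentRule.value();
        if (rule == null || record.contains(rule)) {
            out.collect(record);   // no rule configured yet, or the record passes the rule
        }
    }

    @Override
    public void flatMap2(String rule, Collector<String> out) throws Exception {
        currentRule.update(rule);  // remember the newest configuration for this key
    }
}
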

-Jamie




On Mon, Sep 26, 2016 at 4:34 AM, Govindarajan Srinivasaraghavan <
govindragh...@gmail.com> wrote:

> Hi,
>
> My requirement is to stream millions of records in a day and it has huge
> dependency on external configuration parameters. For example, a user can go
> and change the required setting anytime in the web application and after
> the change is made, the streaming has to happen with the new application
> config parameters. These are app level configurations and we also have some
> dynamic exclude parameters which each data has to be passed through and
> filtered.
>
> I see that flink doesn’t have global state which is shared across all task
> managers and subtasks. Having a centralized cache is an option but for each
> parameter I would have to read it from cache which will increase the
> latency. Please advise on the better approach to handle these kind of
> scenarios and how other applications are handling it. Thanks.
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Questions on flink

2016-09-26 Thread Jamie Grier
Hi Govindarajan,

I've put some answers in-line below..

On Sat, Sep 24, 2016 at 7:32 PM, Govindarajan Srinivasaraghavan <
govindragh...@gmail.com> wrote:

> Hi,
>
> I'm working on apache flink for data streaming and I have few questions.
> Any help is greatly appreciated. Thanks.
>
> 1) Are there any restrictions on creating tumbling windows. For example,
> if I want to create a tumbling window per user id for 2 secs and let’s say
> if I have more than 10 million user id's would that be a problem. (I'm
> using keyBy user id and then creating a timeWindow for 2 secs)? How are
> these windows maintained internally in flink?
>

That should not be a problem in general.  An important question may be how
many unique keys will you see in two seconds.  This is more important than
your total key cardinality of 10 Million and probably a *much* smaller
number unless your input message rate is really high.
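
For example (Java sketch; the `events` stream, its userId field, and the reduce function
are placeholders for your own types):

events
    .keyBy("userId")
    .timeWindow(Time.seconds(2))
    .reduce(new MyReduceFunction());   // or apply(...) with a WindowFunction
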

>
> 2) I looked at rebalance for round robin partitioning. Let’s say I have a
> cluster set up and if I have a parallelism of 1 for source and if I do a
> rebalance, will my data be shuffled across machines to improve performance?
> If so is there a specific port using which the data is transferred to other
> nodes in the cluster?
>

Yes, rebalance() does a round-robin distribution of messages to other
machines in the cluster.  There is not a specific port used for each
TaskManager to communicate on but rather an available port is assigned at
runtime.  This is the default.  You can also set this to a specific port if
you have reason and a lot depends on how you will deploy -- via YARN or as
a standalone Flink cluster.


>
> 3) Are there any limitations on state maintenance? I'm planning to
> maintain some user id related data which could grow very large. I read
> about flink using rocks db to maintain the state. Just wanted to check if
> there are any limitations on how much data can be maintained?
>

Yes, there are limits.  The total data that can be maintained today is
determined by the fact that Flink has to periodically snapshot this data
and copy it to a persistent storage system such as HDFS whether you are
using RocksDB or not.  The aggregate bandwidth required to your storage
system (like HDFS) is roughly your total Flink state size divided by your Flink
checkpoint interval.


> 4) Also where is the state maintained if the amount of data is less? (I
> guess in JVM memory) If I have several machines on my cluster can every
> node get the current state version?
>

I'm not exactly sure what you're asking here.  All data is check-pointed to
a persistent store which must be accessible from each machine in the
cluster.


> 5) I need a way to send external configuration changes to flink. Lets say
> there is a new parameter that has to added or an external change which has
> to be updated inside flink's state, how can this be done?
>

The typical way to do this is to consume that configuration as a stream and
hold the configuration internally in the state of a particular user
function.


>
> Thanks
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Some thoughts about the lower-level Flink APIs

2016-08-15 Thread Jamie Grier
You lost me at lattice, Aljoscha ;)

I do think something like the more powerful N-way FlatMap w/ Timers
Aljoscha is describing here would probably solve most of the problem.
Often Flink's higher level primitives work well for people and that's
great.  It's just that I also spend a fair amount of time discussing with
people how to map what they know they want to do onto operations that
aren't a perfect fit and it sometimes liberates them when they realize they
can just implement it the way they want by dropping down a level.  They
usually don't go there themselves, though.

I mention teaching this "first" and then the higher layers I guess because
that's just a matter of teaching philosophy.  I think it's good to to see
the basic operations that are available first and then understand that the
other abstractions are built on top of that.  That way you're not afraid to
drop-down to basics when you know what you want to get done.

-Jamie


On Mon, Aug 15, 2016 at 2:11 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi All,
> I also thought about this recently. A good thing would be to add a good
> user-facing operator that behaves more or less like an enhanced FlatMap
> with multiple inputs, multiple outputs, state access and keyed timers. I'm
> a bit hesitant, though, since users rarely think about the implications
> that come with state updating and out-of-order events. If you don't
> implement a stateful operator correctly you have pretty much arbitrary
> results.
>
> The problem with out-of-order event arrival and state update is that the
> state basically has to monotonically transition "upwards" through a lattice
> for the computation to make sense. I know this sounds rather theoretical so
> I'll try to explain with an example. Say you have an operator that waits
> for timestamped elements A, B, C to arrive in timestamp order and then does
> some processing. The naive approach would be to have a small state machine
> that tracks what element you have seen so far. The state machine has three
> states: NOTHING, SEEN_A, SEEN_B and SEEN_C. The state machine is supposed
> to traverse these states linearly as the elements arrive. This doesn't
> work, however, when elements arrive in an order that does not match their
> timestamp order. What the user should do is to have a "Set" state that
> keeps track of the elements that it has seen. Once it has seen {A, B, C}
> the operator must check the timestamps and then do the processing, if
> required. The set of possible combinations of A, B, and C forms a lattice
> when combined with the "subset" operation. And traversal through these sets
> is monotonically "upwards" so it works regardless of the order that the
> elements arrive in. (I recently pointed this out on the Beam mailing list
> and Kenneth Knowles rightly pointed out that what I was describing was in
> fact a lattice.)
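
A tiny sketch of that "set state" version (plain Java, not operator code): progress is
tracked as the subset of {A, B, C} seen so far, which only grows, so arrival order cannot
matter; the timestamp check before doing the actual processing is omitted here.

import java.util.EnumSet;

public class SeenTracker {

    enum Element { A, B, C }

    private final EnumSet<Element> seen = EnumSet.noneOf(Element.class);

    /** Returns true exactly once: when the last of A, B, C has arrived, in any order. */
    public boolean onElement(Element e) {
        boolean wasComplete = seen.size() == Element.values().length;
        seen.add(e);
        return !wasComplete && seen.size() == Element.values().length;
    }
}
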
>
> I know this is a bit off-topic but I think it's very easy for users to
> write wrong operations when they are dealing with state. We should still
> have a good API for it, though. Just wanted to make people aware of this.
>
> Cheers,
> Aljoscha
>
> On Mon, 15 Aug 2016 at 08:18 Matthias J. Sax <mj...@apache.org> wrote:
>
> > It really depends on the skill level of the developer. Using low-level
> > API requires to think about many details (eg. state handling etc.) that
> > could be done wrong.
> >
> > As Flink gets a broader community, more people will use it who might not
> > have the required skill level to deal with low-level API. For more
> > trained uses, it is of course a powerful tool!
> >
> > I guess it boils down to the question, what type of developer Flink
> > targets, if low-level API should be offensive advertised or not. Also
> > keep in mind, that many people criticized Storm's low-level API as hard
> > to program etc.
> >
> >
> > -Matthias
> >
> > On 08/15/2016 07:46 AM, Gyula Fóra wrote:
> > > Hi Jamie,
> > >
> > > I agree that it is often much easier to work on the lower level APIs if
> > you
> > > know what you are doing.
> > >
> > > I think it would be nice to have very clean abstractions on that level
> so
> > > we could teach this to the users first but currently I think it's not easy
> > > enough to be a good starting point.
> > >
> > > The user needs to understand a lot about the system if they don't want to
> > > hurt other parts of the pipeline. For instance, working with the
> > > stream records, propagating watermarks, working with state internals.
> > >
> > > This all might be overwhelming at the first glance. But maybe we can
> slim
> >

Some thoughts about the lower-level Flink APIs

2016-08-13 Thread Jamie Grier
Hey all,

I've noticed a few times now when trying to help users implement particular
things in the Flink API that it can be complicated to map what they know
they are trying to do onto higher-level Flink concepts such as windowing or
Connect/CoFlatMap/ValueState, etc.

At some point it just becomes easier to think about writing a Flink
operator yourself that is integrated into the pipeline with a transform()
call.

It can just be easier to think at a more basic level.  For example I can
write an operator that can consume one or two input streams (should
probably be N), update state which is managed for me fault tolerantly, and
output elements or setup timers/triggers that give me callbacks from which
I can also update state or emit elements.

When you think at this level you realize you can program just about
anything you want.  You can create whatever fault-tolerant data structures
you want, and easily execute robust stateful computation over data streams
at scale.  This is the real technology and power of Flink IMO.

Also, at this level I don't have to think about the complexities of
windowing semantics, learn as much API, etc.  I can easily have some inputs
that are broadcast, others that are keyed, manage my own state in whatever
data structure makes sense, etc.  If I know exactly what I actually want to
do I can just do it with the full power of my chosen language, data
structures, etc.  I'm not "restricted" to trying to map everything onto
higher-level Flink constructs which is sometimes actually more complicated.

Programming at this level is actually fairly easy to do but people seem a
bit afraid of this level of the API.  They think of it as low-level or
custom hacking..

Anyway, I guess my thought is this..  Should we explain Flink to people at
this level *first*?  Show that you have nearly unlimited power and
flexibility to build what you want *and only then* from there explain the
higher level APIs they can use *if* those match their use cases well.

Would this better demonstrate to people the power of Flink and maybe
*liberate* them a bit from feeling they have to map their problem onto a
more complex set of higher level primitives?  I see people trying to
shoe-horn what they are really trying to do, which is simple to explain in
english, onto windows, triggers, CoFlatMaps, etc, and this get's
complicated sometimes.  It's like an impedance mismatch.  You could just
solve the problem very easily programmed in straight Java/Scala.

Anyway, it's very easy to drop down a level in the API and program whatever
you want but users don't seem to *perceive* it that way.

Just some thoughts...  Any feedback?  Have any of you had similar
experiences when working with newer Flink users or as a newer Flink user
yourself?  Can/should we do anything to make the *lower* level API more
accessible/visible to users?

-Jamie


[jira] [Created] (FLINK-4391) Provide support for asynchronous operations over streams

2016-08-13 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-4391:
--

 Summary: Provide support for asynchronous operations over streams
 Key: FLINK-4391
 URL: https://issues.apache.org/jira/browse/FLINK-4391
 Project: Flink
  Issue Type: New Feature
  Components: DataStream API
Reporter: Jamie Grier


Many Flink users need to do asynchronous processing driven by data from a 
DataStream.  The classic example would be joining against an external database 
in order to enrich a stream with extra information.

It would be nice to add general support for this type of operation in the Flink 
API.  Ideally this could simply take the form of a new operator that manages 
async operations, keeps so many of them in flight, and then emits results to 
downstream operators as the async operations complete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3680) Remove or improve (not set) text in the Job Plan UI

2016-03-29 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-3680:
--

 Summary: Remove or improve (not set) text in the Job Plan UI
 Key: FLINK-3680
 URL: https://issues.apache.org/jira/browse/FLINK-3680
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Reporter: Jamie Grier


When running streaming jobs the UI displays "(not set)" in a few 
different places.  This is not the case for batch jobs.

To illustrate I've included screen shots of the UI for the batch and streaming 
WordCount examples.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2016-03-29 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-3679:
--

 Summary: DeserializationSchema should handle zero or more outputs 
for every input
 Key: FLINK-3679
 URL: https://issues.apache.org/jira/browse/FLINK-3679
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Reporter: Jamie Grier


There are a couple of issues with the DeserializationSchema API that I think 
should be improved.  This request has come to me via an existing Flink user.

The main issue is simply that the API assumes that there is a one-to-one 
mapping between input and outputs.  In reality there are scenarios where one 
input message (say from Kafka) might actually map to zero or more logical 
elements in the pipeline.

Particularly important here is the case where you receive a message from a 
source (such as Kafka) and say the raw bytes don't deserialize properly.  Right 
now the only recourse is to throw IOException and therefore fail the job.  

This is definitely not good since bad data is a reality and failing the job is 
not the right option.  If the job fails we'll just end up replaying the bad 
data and the whole thing will start again.

Instead in this case it would be best if the user could just return the empty 
set.

The other case is where one input message should logically be multiple output 
messages.  This case is probably less important since there are other ways to 
do this but in general it might be good to make the 
DeserializationSchema.deserialize() method return a collection rather than a 
single element.

Maybe we need to support a DeserializationSchema variant that has semantics 
more like that of FlatMap.
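
To illustrate, a purely hypothetical sketch of what such a FlatMap-style schema could look
like (no such interface exists today; the names here are illustrative only). A corrupt
record simply produces zero elements instead of throwing and failing the job, and one
record could in principle produce several elements:

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

import org.apache.flink.util.Collector;

interface FlatDeserializationSchema<T> extends Serializable {
    void deserialize(byte[] message, Collector<T> out) throws IOException;
}

class TolerantStringSchema implements FlatDeserializationSchema<String> {

    @Override
    public void deserialize(byte[] message, Collector<String> out) {
        if (message == null || message.length == 0) {
            return;   // bad or empty record: emit nothing instead of failing the job
        }
        out.collect(new String(message, StandardCharsets.UTF_8));
    }
}
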







--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3627) Task stuck on lock in StreamSource when cancelling

2016-03-19 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-3627:
--

 Summary: Task stuck on lock in StreamSource when cancelling
 Key: FLINK-3627
 URL: https://issues.apache.org/jira/browse/FLINK-3627
 Project: Flink
  Issue Type: Bug
  Components: Core
Reporter: Jamie Grier


I've seen this occur a couple of times when the # of network buffers is set too 
low.  The job fails with an appropriate message indicating that the user 
should increase the # of network buffers.  However, some of the task threads 
then hang with a stack trace similar to the following.

2016-03-16 13:38:54,017 WARN  org.apache.flink.runtime.taskmanager.Task 
- Task 'Source: EventGenerator -> (Flat Map, blah -> Filter -> 
Projection -> Flat Map -> Timestamps/Watermarks -> Map) (46/144)' did not react 
to cancelling signal, but is stuck in method:
 
org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:317)
flink.benchmark.generator.LoadGeneratorSource.run(LoadGeneratorSource.java:38)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3617) NPE from CaseClassSerializer when dealing with null Option field

2016-03-15 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-3617:
--

 Summary: NPE from CaseClassSerializer when dealing with null 
Option field
 Key: FLINK-3617
 URL: https://issues.apache.org/jira/browse/FLINK-3617
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.0.0
Reporter: Jamie Grier


This error occurs when serializing a Scala case class with a field of Option[] 
type where the value is not Some or None, but null.

If this is not supported we should have a good error message.

java.lang.RuntimeException: ConsumerThread threw an exception: null
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:81)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
at 
org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:158)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
Caused by: java.lang.NullPointerException
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:107)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
at 
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
at 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:84)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
... 3 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: New user of Flink

2016-03-09 Thread Jamie Grier
Yes, in IntelliJ you just need to setup a JDK.  If you don't already have
one just download and install a recent JDK and then point IntelliJ to it.

On Wed, Mar 9, 2016 at 12:27 PM, janardhan shetty <janardhan...@gmail.com>
wrote:

> Hi Flink team,
>
> I am setting up my dev environment to start developing flink applications
> primarily on Java.
> Can you pls provide few clarifications:
>
> Fyi,
> First time use of Intelli J and Flink:
>
> Clarifications needed:
>
> 1. After importing quickstart as maven project in intelli J . It asks for
> SDK not defined
> Do we need to select JDK and point to local jdk install path?
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Introducing a PR template

2016-02-19 Thread Jamie Grier
+1

On Fri, Feb 19, 2016 at 9:30 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Martin,
>
> "mvn install" does include the goals "test" and "verify".
> In fact, "verify" is enough, because "install" does only copy the results
> into the local Maven repository (~/.m2/repository).
>
> So I think
>  - [ ] Tests pass (`mvn test`)
>  - [ ] Build passes (`mvn install`)
>  - [ ] Check style passes (`mvn verify`)
>
> can be condensed to
>   - [ ] Build passes (`mvn clean verify`)
>
> Otherwise, this looks good, IMO :-)
>
> Thanks, Fabian
>
> 2016-02-19 18:19 GMT+01:00 Martin Liesenberg <martin.liesenb...@gmail.com
> >:
>
> > Based on the recent discussion in the email thread 'Extending and
> improving
> > our "How to contribute" page', I propose to introduce the following
> > template for PRs
> >
> > 
> > Thanks for contributing to Apache Flink, before you open your PR please
> > kindly take into consideration the following check list.
> > Once you are sure, all items on the list can be checked, feel free to
> open
> > your PR. For more information please refer to the How To Contribute guide
> > linked above.
> >
> > ### General
> >   - [ ] Is there an associated JIRA issue
> >   - [ ] This PR addresses includes a single change
> >   - [ ] New functionality is covered by tests
> >   - [ ] Documentation is up to date
> >
> > ### Code health
> >  - [ ] Tests pass (`mvn test`)
> >  - [ ] Build passes (`mvn install`)
> >  - [ ] Check style passes (`mvn verify`)
> >  - [ ] JavaDoc for new `public` methods has been added
> > ---
> >
> >
> > The intended effects would be:
> > - reduce friction in the PR process created by basic oversights such as
> > checkstyle violations or missing tests
> > - provide a helping hand for new contributors
> >
> > I tried to condense the suggestion on the mailing list to make it not too
> > long and intimidating but at the same time cover the most important
> points.
> >
> > Looking forward to your input.
> > Best regards
> > martin
> >
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Hi all

2016-02-17 Thread Jamie Grier
Welcome, Josep!

On Wed, Feb 17, 2016 at 11:27 AM, Josep Rubio <jo...@datag.es> wrote:

> Hi All,
>
> I will introduce myself as I’m new in the list, my name is Josep Rubió and
> I’m from Barcelona.
>
> About Flink I just started with, I haven’t gone further than reading some
> documentation and compiling it.
> I’m new with this but I’d like to do a small contribution, I will send
> another email to the list explaining how I’ll try to help.
>
> Thanks!!
>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com