Please unsubscribe me

2021-07-16 Thread R Bhaaagi
On Sat, 17 Jul 2021 at 6:37 AM, JING ZHANG (Jira)  wrote:

> JING ZHANG created FLINK-23413:
> --
>
>  Summary: Port RelTimeIndicatorConverter from SCALA to JAVA
>  Key: FLINK-23413
>  URL: https://issues.apache.org/jira/browse/FLINK-23413
>  Project: Flink
>   Issue Type: Sub-task
>   Components: Table SQL / Planner
> Reporter: JING ZHANG
>
>
>
>
>
>
> --
> This message was sent by Atlassian Jira
> (v8.3.4#803005)
>


[jira] [Created] (FLINK-23415) Shade kryo library to reduce class conflict

2021-07-16 Thread Aitozi (Jira)
Aitozi created FLINK-23415:
--

 Summary: Shade kryo library to reduce class conflict
 Key: FLINK-23415
 URL: https://issues.apache.org/jira/browse/FLINK-23415
 Project: Flink
  Issue Type: Improvement
Reporter: Aitozi


Can we shade the Kryo used in Flink, like other common libraries in Flink 
[https://github.com/apache/flink-shaded] and as was done in 
https://issues.apache.org/jira/browse/HIVE-5915? Kryo is also a widely used 
library that is not compatible between some versions, which often causes 
user programs to conflict with Flink internals. cc [~chesnay]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23414) Split Match_ROWTIME return type converter into a separate class

2021-07-16 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23414:
--

 Summary: Split Match_ROWTIME return type converter into a separate class
 Key: FLINK-23414
 URL: https://issues.apache.org/jira/browse/FLINK-23414
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: JING ZHANG






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23413) Port RelTimeIndicatorConverter from SCALA to JAVA

2021-07-16 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23413:
--

 Summary: Port RelTimeIndicatorConverter from SCALA to JAVA
 Key: FLINK-23413
 URL: https://issues.apache.org/jira/browse/FLINK-23413
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: JING ZHANG






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23412) Improve sourceSink description

2021-07-16 Thread Mans Singh (Jira)
Mans Singh created FLINK-23412:
--

 Summary: Improve sourceSink description
 Key: FLINK-23412
 URL: https://issues.apache.org/jira/browse/FLINK-23412
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.13.1
Reporter: Mans Singh
 Fix For: 1.14.0


The table/sourcesink documentation indicates:

{quote} the sink can solely accept insert-only rows and write out bounded 
streams.{quote}

Perhaps this could be:

{quote} the sink can only accept insert-only rows and write out bounded 
streams.{quote}

Also, improve the full stack example bullet points.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Introduction email

2021-07-16 Thread Srinivasulu Punuru
Hi Flink Devs,

I am Srini; I work on the stream processing team at LinkedIn. LinkedIn is
taking a big bet on Apache Flink and migrating all the existing streaming
SQL apps to Flink. You might have seen mails from some of our team members
over the past few months. Thanks a lot for your support!

I just wanted to Say Hi to everyone before I take up some of the starter
Jiras and start contributing.

Thanks Again! Looking forward to collaboration :)

Here are some of the quick notes about our Flink scenarios.

   1. We will be using Flink SQL just for stream processing applications.
   2. Most of our current SQL apps are stateless, but stateful SQL
   capabilities are one of the reasons we are migrating to Flink. SQL state
   management is an area of interest.
   3. We also have customers asking for batch and streaming convergence, so
   SQL-based batch <-> streaming convergence or engine portability of SQL apps
   is an area of interest.
   4. We are initially on prem, but LinkedIn as a whole is betting on
   cloud. So taking advantage of some of the cloud capabilities like storage
   compute disaggregation and elastic compute (for auto-scaling) for Flink would
   be interesting.
   5. We also provide a managed streaming SQL service, i.e. we manage the
   SQL jobs for our developers. So reliability, operability and quick recovery
   are critical as well :).

Thanks,
Srini.


Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-16 Thread Yun Gao
Hi Till, Dawid

Many thanks for all the thoughts!

1) Regarding the event used to notify finish()

Logically there are indeed two purposes for introducing the new event: one is 
working around the limitations of the unaligned checkpoint, and the other is 
notifying finish(). As Dawid has pointed out, for the implementation we want 
to unify them into one event, which in the actual implementation is currently 
named EndOfUserRecordsEvent. Sorry for the inconsistency; I'll unify the 
terminology in the FLIP.

2) Regarding the lifecycle of the tasks & operators

Glad that we are in general on the same page; the FSM description indeed 
simplifies the understanding of the process a lot.

For the order of emitting MAX_WATERMARK and calling finish(), I made a 
mistake: as both of you have pointed out, finish() should be called after 
emitting MAX_WATERMARK. These should indeed be two atomic subprocesses in 
order: handle MAX_WATERMARK -> get to FINISHED, and we in fact cannot call 
finish() between aligning on MAX_WATERMARK and emitting MAX_WATERMARK.

A small detail regarding the relationship between emitting MAX_WATERMARK and 
finish(): although MAX_WATERMARK fires all the event-time triggers and 
finalizes structures like windows, users may sometimes still need to do some 
flushing in the finish() method for non-event-time-based data. For example, 
when a file sink uses a rolling policy based on size, on finish() it needs to 
flush and close the currently opened files.
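
To make the file-sink example concrete, here is a minimal sketch (hypothetical 
names, not Flink's actual sink API) of a size-based rolling writer whose 
finish() flushes and closes the currently open part file:

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch only: a size-based rolling file writer. The open part file is not
// bound to event time, so MAX_WATERMARK alone does not close it; finish()
// has to flush and close it explicitly at the end of input.
class RollingFileWriterSketch {
    private final long rollSizeBytes;
    private BufferedWriter current; // currently open part file, null if none
    private long bytesWritten;

    RollingFileWriterSketch(long rollSizeBytes) {
        this.rollSizeBytes = rollSizeBytes;
    }

    void write(Path dir, String record) throws IOException {
        if (current == null) {
            current = Files.newBufferedWriter(dir.resolve("part-" + System.nanoTime()));
            bytesWritten = 0;
        }
        current.write(record);
        bytesWritten += record.length();
        if (bytesWritten >= rollSizeBytes) { // the size-based rolling policy
            closeCurrentPart();
        }
    }

    // Called once all records were emitted (after MAX_WATERMARK was sent).
    void finish() throws IOException {
        closeCurrentPart();
    }

    private void closeCurrentPart() throws IOException {
        if (current != null) {
            current.flush();
            current.close();
            current = null;
        }
    }
}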

We also mentioned different methods to distinguish whether we need to emit 
MAX_WATERMARK and call finish(), like not emitting such events or emitting 
events with special flags. I think all these methods would achieve the 
target, and perhaps we could leave the choice of the detailed method to when 
we are implementing the code.

As for waiting for the next checkpoint / savepoint after FINISHED, I also 
agree that currently we could have the same process for both the final 
checkpoint and stop-with-savepoint cases, and we do not need to distinguish 
whether we are waiting for a checkpoint or a savepoint.

Best,
Yun



--
From:Till Rohrmann 
Send Time:2021 Jul. 16 (Fri.) 23:05
To:Yun Gao 
Cc:dev 
Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

Thanks for your responses Dawid and Yun,

I agree with you that the whole lifecycle section of the
StreamOperator/StreamTask/Task in the FLIP needs to be better fleshed out.
I have to admit that I am a bit confused by the new events you are
mentioning because I couldn't find them in the FLIP. I assume that
EndOfUserRecordsEvent has been introduced to work around the limitations of
unaligned checkpoints and that EndofInputEvent is something that does not
exist yet. Maybe properly writing things down and agreeing on the same
terminology could help our discussion.

I also agree with you that we need some kind of new event (one that is
conceptually bound to the StreamTask) that says that we have reached the
end of input. Let's name this event for the further discussion
EndOfInputEvent.

The way I've thought about the problem is the following: First let us
define two simple state machines for the StreamOperator and the StreamTask:

StreamOperator FSM: RUNNING -> FINISHED -> CLOSED

RUNNING: We process input records and produce output records.
RUNNING -> FINISHED: Happens upon calling StreamOperator.finish() or if the
source reaches the end of data (does not mean that we flush anything). This
also emits EndOfInputEvent to downstream operators
FINISHED: The operator must not output events.
FINISHED -> CLOSED: Happens upon calling StreamOperator.close() (close all
resources)

StreamTask FSM: RUNNING -> WAITING_FOR_FINAL_CHECKPOINT -> CLOSED

RUNNING: The StreamOperator is running.
RUNNING -> WAITING_FOR_FINAL_CHECKPOINT: Upon receiving EndOfInputEvent on
all inputs. This will also call StreamOperator.finish() and send
EndOfInputEvent to its downstream tasks. (Alternatively if the
EndOfInputEvent processing happens in the StreamOperator, then we can
listen to the state transition RUNNING -> FINISHED of the StreamOperator).
WAITING_FOR_FINAL_CHECKPOINT: As the name says we wait for the next
successful checkpoint to complete.
WAITING_FOR_FINAL_CHECKPOINT -> CLOSED: Once the next checkpoint completes
successfully (e.g. StreamTask.notifyCheckpointComplete is called).

So the first thing to note is that StreamOperator.finish() does not flush
anything but simply tells the StreamOperator that it should no longer
produce output. The basic idea would be that whenever we need to flush
our buffered data, then we send a MAX_WATERMARK before sending an
EndOfInputEvent. Of course, we could also integrate this property in the
EndOfInputEvent(boolean flush_buffered_data) but we already have the
watermark mechanism. Of course, we could also change
StreamOperator.finish() into .finish(boolean flush) and translate a 
MAX_WATERMARK into .finish(true).

Re: [DISCUSS] FLIP-179: Expose Standardized Operator Metrics

2021-07-16 Thread Steven Wu
To avoid confusion, can we either rename "SourceMetricGroup" to
"SplitReaderMetricGroup" or add "Reader" to the setter method names?

Yes, we should add the "unassigned/pending splits" enumerator metric. I
tried to publish those metrics for IcebergSourceEnumerator and ran into an
issue [1]. I don't want to distract the discussion with the jira ticket.

[1] https://issues.apache.org/jira/browse/FLINK-21000
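
As a concrete illustration of the consumer-lag semantics discussed below, a
reader could feed the pending-records gauge roughly like this (a sketch; the
metric-group interface is a stand-in for the FLIP's proposal, and all names
are hypothetical):

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Sketch: pending records == consumer lag for an event-log source like Kafka.
class PendingRecordsSketch {
    // Stand-in for the FLIP's metric group; not the real interface.
    interface SourceMetricGroupStandIn {
        void setPendingRecordsGauge(Supplier<Long> gauge);
    }

    private final AtomicLong logEndOffset = new AtomicLong();  // from broker metadata
    private final AtomicLong currentOffset = new AtomicLong(); // last polled record

    void register(SourceMetricGroupStandIn metrics) {
        metrics.setPendingRecordsGauge(() -> logEndOffset.get() - currentOffset.get());
    }
}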

On Thu, Jul 15, 2021 at 1:01 PM Arvid Heise  wrote:

> Hi Steven,
>
> The semantics are unchanged compared to FLIP-33 [1] but I see your point.
>
> In reality, pending records would be mostly for event storage systems
> (Kafka, Kinesis, ...). Here, we would report the consumer lag effectively.
> If consumer lag is more prominent, we could also rename it.
>
> For pending bytes, this is mostly related to file source or any kind of
> byte streams. At this point, we can only capture the assigned splits on
> reader levels. I don't think it makes sense to add the same metric to the
> enumerator as that might induce too much I/O on the job master. I could
> rather envision another metric that captures how many unassigned splits
> there are. In general, I think it would be a good idea to add another type
> of top-level metric group for SplitEnumerator called
> SplitEnumeratorMetricGroup in SplitEnumeratorContext. There we could add
> unassigned/pending splits metric. WDYT?
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>
> On Wed, Jul 14, 2021 at 9:00 AM Steven Wu  wrote:
>
> > I am trying to understand what those two metrics really capture
> >
> > > G setPendingBytesGauge(G pendingBytesGauge);
> >
> >-  use file source as an example, it captures the remaining bytes for
> >the current file split that the reader is processing? How would users
> >interpret or use this metric? enumerator keeps tracks of the
> >pending/unassigned splits, which is an indication of the size of the
> >backlog. that would be very useful
> >
> >
> > > G setPendingRecordsGauge(G pendingRecordsGauge);
> >
> >- In the Kafka source case, this is intended to capture the consumer
> lag
> >(log head offset from broker - current record offset)? that could be
> > used
> >to capture the size of the backlog
> >
> >
> >
> > On Tue, Jul 13, 2021 at 3:01 PM Arvid Heise  wrote:
> >
> > > Hi Becket,
> > >
> > > I believe 1+2 has been answered by Chesnay already. Just to add to 2:
> I'm
> > > not the biggest fan of reusing task metrics but that's what FLIP-33 and
> > > different folks suggested. I'd probably keep task I/O metrics only for
> > > internal things and add a new metric for external calls. Then, we could
> > > even allow users to track I/O in AsyncIO (which would currently be a
> > mess).
> > > However, with the current abstraction, it would be relatively easy to
> add
> > > separate metrics later.
> > >
> > > 3. As outlined in the JavaDoc and in the draft PR [1], it's up to the
> > user
> > > to implement it in a way that fetch time always corresponds to the
> latest
> > > polled record. For SourceReaderBase, I have added a new
> > > RecordsWithSplitIds#lastFetchTime (with default return value null) that
> > > sets the last fetch time automatically whenever the next batch is
> > selected.
> > > Tbh this metric is a bit more challenging to implement for
> > > non-SourceReaderBase sources but I have not found a better, thread-safe
> > > way. Of course, we could shift the complete calculation into user-land
> > but
> > > I'm not sure that this is easier.
> > > For your scenarios:
> > > - in A, you assume SourceReaderBase. In that case, we could eagerly
> > report
> > > the metric as sketched by you. It depends on the definition of "last
> > > processed record" in FLIP-33, whether this eager reporting is more
> > correct
> > > than the lazy reporting that I have proposed. The former case assumes
> > "last
> > > processed record" = last fetched record, while the latter case assumes
> > > "last processed record" = "last polled record". For the proposed
> > solution,
> > > the user would just need to implement
> RecordsWithSplitIds#lastFetchTime,
> > > which typically corresponds to the creation time of the
> > RecordsWithSplitIds
> > > instance.
> > > - B is not assuming SourceReaderBase.
> > > If it's SourceReaderBase, the same proposed solution works out of the
> > box:
> > > SourceOperator intercepts the emitted event time and uses the fetch
> time
> > of
> > > the current batch.
> > > If it's not SourceReaderBase, the user would need to attach the
> timestamp
> > > to the handover protocol if multi-threaded and set the
> lastFetchTimeGauge
> > > when a value in the handover protocol is selected (typically a batch).
> > > If it's a single threaded source, the user could directly set the
> current
> > > timestamp after fetching the records in a sync fashion.
> > > The bad case is if the user is fetching individual records (either sync
> > or
> > > async), th
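
(For reference, the lazy fetch-time reporting described above could be
sketched as follows; the types are illustrative stand-ins, not the real
RecordsWithSplitIds API:)

import java.util.Queue;

// Sketch: each fetched batch remembers its creation time; the reader reports
// the fetch time of the batch it is currently polling records from.
class FetchTimeSketch<T> {
    static final class Batch<E> {
        final Queue<E> records;
        final long fetchTimeMs; // set when the fetcher created the batch

        Batch(Queue<E> records) {
            this.records = records;
            this.fetchTimeMs = System.currentTimeMillis();
        }
    }

    private Batch<T> current;

    // Called whenever the next batch is selected for emission.
    void select(Batch<T> batch) {
        this.current = batch;
    }

    // Gauge value: fetch time of the batch the last polled record came from.
    long lastFetchTimeMs() {
        return current == null ? -1L : current.fetchTimeMs;
    }
}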

[DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-16 Thread Till Rohrmann
Hi everyone,

Since Flink 1.5 we have the same heartbeat timeout and interval default
values that are defined as heartbeat.timeout: 50s and heartbeat.interval:
10s. These values were mainly chosen to compensate for lengthy GC pauses
and blocking operations that were executed in the main threads of Flink's
components. Since then, there were quite some advancements wrt the JVM's
GCs and we also got rid of a lot of blocking calls that were executed in
the main thread. Moreover, a long heartbeat.timeout causes long recovery
times in case of a TaskManager loss because the system can only properly
recover after the dead TaskManager has been removed from the scheduler.
Hence, I wanted to propose to change the timeout and interval to:

heartbeat.timeout: 15s
heartbeat.interval: 3s

Since there is no perfect solution that fits all use cases, I would really
like to hear from you what you think about it and how you configure these
heartbeat options. Based on your experience we might actually come up with
better default values that allow us to be resilient but also to detect
failed components fast. FLIP-185 can be found here [1].

[1] https://cwiki.apache.org/confluence/x/GAoBCw

Cheers,
Till


Re: [DISCUSS] FLIP-171: Async Sink

2021-07-16 Thread Till Rohrmann
Sure, thanks for the pointers.

Cheers,
Till

On Fri, Jul 16, 2021 at 6:19 PM Hausmann, Steffen 
wrote:

> Hi Till,
>
> You are right, I’ve left out some implementation details, which have
> actually changed a couple of times as part of the ongoing discussion. You
> can find our current prototype here [1] and a sample implementation of the
> KPL free Kinesis sink here [2].
>
> I plan to update the FLIP. But I think it would make sense to wait
> until the implementation has stabilized enough before we update the FLIP to
> the final state.
>
> Does that make sense?
>
> Cheers, Steffen
>
> [1]
> https://github.com/sthm/flink/tree/flip-171-177/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink
> [2]
> https://github.com/sthm/flink/blob/flip-171-177/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
>
> From: Till Rohrmann 
> Date: Friday, 16. July 2021 at 18:10
> To: Piotr Nowojski 
> Cc: Steffen Hausmann , "dev@flink.apache.org" <
> dev@flink.apache.org>, Arvid Heise 
> Subject: RE: [EXTERNAL] [DISCUSS] FLIP-171: Async Sink
>
>
> CAUTION: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
> Hi Steffen,
>
> I've taken another look at the FLIP and I stumbled across a couple of
> inconsistencies. I think it is mainly because of the lacking code. For
> example, it is not fully clear to me based on the current FLIP how we
> ensure that there are no in-flight requests when
> AsyncSinkWriter.snapshotState is called. Also the concrete implementation
> of the AsyncSinkCommitter could be helpful for understanding how the
> AsyncSinkWriter works in the end. Do you plan to update the FLIP
> accordingly?
>
> Cheers,
> Till
>
> On Wed, Jun 30, 2021 at 8:36 AM Piotr Nowojski  > wrote:
> Thanks for addressing this issue :)
>
> Best, Piotrek
>
> Tue, 29 Jun 2021 at 17:58 Hausmann, Steffen  shau...@amazon.de>> wrote:
> Hey Piotr,
>
> I've just adapted the FLIP and changed the signature for the
> `submitRequestEntries` method:
>
> protected abstract void submitRequestEntries(List
> requestEntries, ResultFuture requestResult);
>
> In addition, we are likely to use an AtomicLong to track the number of
> outstanding requests, as you have proposed in 2b). I've already indicated
> this in the FLIP, but it's not fully fleshed out. But as you have said,
> that seems to be an implementation detail and the important part is the
> change of the `submitRequestEntries` signature.
>
> Thanks for your feedback!
>
> Cheers, Steffen
>
>
> On 25.06.21, 17:05, "Hausmann, Steffen"  wrote:
>
> CAUTION: This email originated from outside of the organization. Do
> not click links or open attachments unless you can confirm the sender and
> know the content is safe.
>
>
>
> Hi Piotr,
>
> I’m happy to take your guidance on this. I need to think through your
> proposals and I’ll follow-up on Monday with some more context so that we
> can close the discussion on these details. But for now, I’ll close the vote.
>
> Thanks, Steffen
>
> From: Piotr Nowojski mailto:pnowoj...@apache.org
> >>
> Date: Friday, 25. June 2021 at 14:48
> To: Till Rohrmann mailto:trohrm...@apache.org>>
> Cc: Steffen Hausmann mailto:shau...@amazon.de>>, "
> dev@flink.apache.org"  >, Arvid Heise  ar...@apache.org>>
> Subject: RE: [EXTERNAL] [DISCUSS] FLIP-171: Async Sink
>
>
> CAUTION: This email originated from outside of the organization. Do
> not click links or open attachments unless you can confirm the sender and
> know the content is safe.
>
>
> Hey,
>
> I've just synced with Arvid about a couple more remarks from my
> side and he shared my concerns.
>
> 1. I would very strongly recommend ditching `CompletableFuture `
> from the  `protected abstract CompletableFuture
> submitRequestEntries(List requestEntries);`  in favor of
> something like the
> `org.apache.flink.streaming.api.functions.async.ResultFuture` interface.
> `CompletableFuture` would partially make the threading model of the
> `AsyncSinkWriter` part of the public API and it would tie our hands.
> Regardless of how `CompletableFuture` is used, it imposes performance
> overhead because of the synchronisation/volatile inside of it. On the other
> hand, something like:
>
> protected abstract void submitRequestEntries(List
> requestEntries, ResultFuture requestResult);
>
> Would allow us to implement the threading model as we wish.
> `ResultFuture` could be backed via `CompletableFuture` underneath, but
> it could also be something more efficient.  I will explain what I have in
> mind in a second.
>
> 2. It looks to me that the proposed `AsyncSinkWriter` internals are not
> very efficient and maybe the threading model hasn't been thought through?
> Esp

Re: [DISCUSS] FLIP-171: Async Sink

2021-07-16 Thread Hausmann, Steffen
Hi Till,

You are right, I’ve left out some implementation details, which have actually 
changed a couple of times as part of the ongoing discussion. You can find our 
current prototype here [1] and a sample implementation of the KPL free Kinesis 
sink here [2].

I plan to update the FLIP. But I think it would make sense to wait until the 
implementation has stabilized enough before we update the FLIP to the final 
state.

Does that make sense?

Cheers, Steffen

[1] 
https://github.com/sthm/flink/tree/flip-171-177/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink
[2] 
https://github.com/sthm/flink/blob/flip-171-177/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java

From: Till Rohrmann 
Date: Friday, 16. July 2021 at 18:10
To: Piotr Nowojski 
Cc: Steffen Hausmann , "dev@flink.apache.org" 
, Arvid Heise 
Subject: RE: [EXTERNAL] [DISCUSS] FLIP-171: Async Sink


CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you can confirm the sender and know the 
content is safe.


Hi Steffen,

I've taken another look at the FLIP and I stumbled across a couple of 
inconsistencies. I think it is mainly because of the lacking code. For example, 
it is not fully clear to me based on the current FLIP how we ensure that there 
are no in-flight requests when AsyncSinkWriter.snapshotState is called. Also 
the concrete implementation of the AsyncSinkCommitter could be helpful for 
understanding how the AsyncSinkWriter works in the end. Do you plan to update 
the FLIP accordingly?

Cheers,
Till

On Wed, Jun 30, 2021 at 8:36 AM Piotr Nowojski 
mailto:pnowoj...@apache.org>> wrote:
Thanks for addressing this issue :)

Best, Piotrek

Tue, 29 Jun 2021 at 17:58 Hausmann, Steffen 
mailto:shau...@amazon.de>> wrote:
Hey Piotr,

I've just adapted the FLIP and changed the signature for the 
`submitRequestEntries` method:

protected abstract void submitRequestEntries(List 
requestEntries, ResultFuture requestResult);
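
(Note: the archive stripped the generic parameters from the signature above.
A plausible reconstruction, where the type parameter name RequestEntryT and
the ResultFuture parameterization are assumptions rather than verbatim from
this thread, would be:)

import java.util.List;

// Hypothetical reconstruction for illustration only; generics were lost in
// the archive, so the names below are assumed.
abstract class AsyncSinkWriterSketch<RequestEntryT> {

    interface ResultFuture<T> {
        // completion callback invoked from the async client library
        void complete(List<T> failedRequestEntries);
    }

    protected abstract void submitRequestEntries(
            List<RequestEntryT> requestEntries, ResultFuture<RequestEntryT> requestResult);
}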

In addition, we are likely to use an AtomicLong to track the number of 
outstanding requests, as you have proposed in 2b). I've already indicated this 
in the FLIP, but it's not fully fleshed out. But as you have said, that seems 
to be an implementation detail and the important part is the change of the 
`submitRequestEntries` signature.

Thanks for your feedback!

Cheers, Steffen


On 25.06.21, 17:05, "Hausmann, Steffen"  wrote:

CAUTION: This email originated from outside of the organization. Do not 
click links or open attachments unless you can confirm the sender and know the 
content is safe.



Hi Piotr,

I’m happy to take your guidance on this. I need to think through your 
proposals and I’ll follow-up on Monday with some more context so that we can 
close the discussion on these details. But for now, I’ll close the vote.

Thanks, Steffen

From: Piotr Nowojski mailto:pnowoj...@apache.org>>
Date: Friday, 25. June 2021 at 14:48
To: Till Rohrmann mailto:trohrm...@apache.org>>
Cc: Steffen Hausmann mailto:shau...@amazon.de>>, 
"dev@flink.apache.org" 
mailto:dev@flink.apache.org>>, Arvid Heise 
mailto:ar...@apache.org>>
Subject: RE: [EXTERNAL] [DISCUSS] FLIP-171: Async Sink


CAUTION: This email originated from outside of the organization. Do not 
click links or open attachments unless you can confirm the sender and know the 
content is safe.


Hey,

I've just synced with Arvid about a couple more remarks from my side and 
he shared my concerns.

1. I would very strongly recommend ditching `CompletableFuture ` from 
the  `protected abstract CompletableFuture 
submitRequestEntries(List requestEntries);`  in favor of 
something like the `org.apache.flink.streaming.api.functions.async.ResultFuture` 
interface. `CompletableFuture` would partially make the threading model of 
the `AsyncSinkWriter` part of the public API and it would tie our hands. 
Regardless of how `CompletableFuture` is used, it imposes performance overhead 
because of the synchronisation/volatile inside of it. On the other hand, something 
like:

protected abstract void submitRequestEntries(List 
requestEntries, ResultFuture requestResult);

Would allow us to implement the threading model as we wish. `ResultFuture` 
could be backed via `CompletableFuture` underneath, but it could also be 
something more efficient.  I will explain what I have in mind in a second.

2. It looks to me that the proposed `AsyncSinkWriter` internals are not very 
efficient and maybe the threading model hasn't been thought through? Especially 
the private fields:

private final BlockingDeque bufferedRequestEntries;
private BlockingDeque<CompletableFuture> inFlightRequests;

are a bit strange to me. Why do we need two separate thread safe 
collections? Why do we need a `BlockingDeque` of `CompletableFuture`s? If we 
are already using a 

Re: [DISCUSS] FLIP-171: Async Sink

2021-07-16 Thread Till Rohrmann
Hi Steffen,

I've taken another look at the FLIP and I stumbled across a couple of
inconsistencies. I think it is mainly because of the lacking code. For
example, it is not fully clear to me based on the current FLIP how we
ensure that there are no in-flight requests when
AsyncSinkWriter.snapshotState is called. Also the concrete implementation
of the AsyncSinkCommitter could be helpful for understanding how the
AsyncSinkWriter works in the end. Do you plan to update the FLIP
accordingly?

Cheers,
Till

On Wed, Jun 30, 2021 at 8:36 AM Piotr Nowojski  wrote:

> Thanks for addressing this issue :)
>
> Best, Piotrek
>
> Tue, 29 Jun 2021 at 17:58 Hausmann, Steffen  wrote:
>
>> Hey Piotr,
>>
>> I've just adapted the FLIP and changed the signature for the
>> `submitRequestEntries` method:
>>
>> protected abstract void submitRequestEntries(List
>> requestEntries, ResultFuture requestResult);
>>
>> In addition, we are likely to use an AtomicLong to track the number of
>> outstanding requests, as you have proposed in 2b). I've already indicated
>> this in the FLIP, but it's not fully fleshed out. But as you have said,
>> that seems to be an implementation detail and the important part is the
>> change of the `submitRequestEntries` signature.
>>
>> Thanks for your feedback!
>>
>> Cheers, Steffen
>>
>>
>> On 25.06.21, 17:05, "Hausmann, Steffen" 
>> wrote:
>>
>> CAUTION: This email originated from outside of the organization. Do
>> not click links or open attachments unless you can confirm the sender and
>> know the content is safe.
>>
>>
>>
>> Hi Piotr,
>>
>> I’m happy to take your guidance on this. I need to think through your
>> proposals and I’ll follow-up on Monday with some more context so that we
>> can close the discussion on these details. But for now, I’ll close the vote.
>>
>> Thanks, Steffen
>>
>> From: Piotr Nowojski 
>> Date: Friday, 25. June 2021 at 14:48
>> To: Till Rohrmann 
>> Cc: Steffen Hausmann , "dev@flink.apache.org" <
>> dev@flink.apache.org>, Arvid Heise 
>> Subject: RE: [EXTERNAL] [DISCUSS] FLIP-171: Async Sink
>>
>>
>> CAUTION: This email originated from outside of the organization. Do
>> not click links or open attachments unless you can confirm the sender and
>> know the content is safe.
>>
>>
>> Hey,
>>
>> I've just synced with Arvid about a couple more remarks from my
>> side and he shared my concerns.
>>
>> 1. I would very strongly recommend ditching `CompletableFuture `
>> from the  `protected abstract CompletableFuture
>> submitRequestEntries(List requestEntries);`  in favor of
>> something like the
>> `org.apache.flink.streaming.api.functions.async.ResultFuture` interface.
>> `CompletableFuture` would partially make the threading model of the
>> `AsyncSinkWriter` part of the public API and it would tie our hands.
>> Regardless of how `CompletableFuture` is used, it imposes performance
>> overhead because of the synchronisation/volatile inside of it. On the other
>> hand, something like:
>>
>> protected abstract void submitRequestEntries(List
>> requestEntries, ResultFuture requestResult);
>>
>> Would allow us to implement the threading model as we wish.
>> `ResultFuture` could be backed via `CompletableFuture` underneath, but
>> it could also be something more efficient.  I will explain what I have in
>> mind in a second.
>>
>> 2. It looks to me that the proposed `AsyncSinkWriter` internals are not
>> very efficient and maybe the threading model hasn't been thought through?
>> Especially the private fields:
>>
>> private final BlockingDeque bufferedRequestEntries;
>> private BlockingDeque<CompletableFuture> inFlightRequests;
>>
>> are a bit strange to me. Why do we need two separate thread safe
>> collections? Why do we need a `BlockingDeque` of `CompletableFuture`s?
>> If we are already using a fully synchronised collection, there should be no
>> need for another layer of thread safe `CompletableFuture`.
>>
>> As I understand, the threading model of the `AsyncSinkWriter` is very
>> similar to that of the `AsyncWaitOperator`, with very similar requirements
>> for inducing backpressure. How I would see it implemented is for example:
>>
>> a) Having a single lock, that would encompass the whole
>> `AsyncSinkWriter#flush()` method. `flush()` would be called from the task
>> thread (mailbox). To induce backpressure, `#flush()` would just call
>> `lock.wait()`. `ResultFuture#complete(...)` called from an async thread,
>> would also synchronize on the same lock, and mark some of the inflight
>> requests as completed and call `lock.notify()`.
>>
>> b) More efficient solution. On the hot path we would have for example
>> only an `AtomicLong numberOfInFlightRequests`. The task thread would be
>> bumping it, and `ResultFuture#complete()` would be decreasing it. If the task
>> thread, when bumping `numberOfInFlightRequests`, exceeds a threshold, it goes
>> to sleep/waits on a lock or some `CompletableFuture`. If
>> `ResultFuture#complete()` when d
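
(A sketch of option b) above: only the AtomicLong pattern is taken from the
discussion; the class and method names are hypothetical, and the Flink wiring
is omitted:)

import java.util.concurrent.atomic.AtomicLong;

class InFlightRequestLimiterSketch {
    private final long maxInFlight;
    private final AtomicLong inFlight = new AtomicLong();
    private final Object wakeup = new Object();

    InFlightRequestLimiterSketch(long maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    // Task (mailbox) thread: called before submitting a request; blocks while
    // the number of outstanding requests exceeds the threshold.
    void acquire() throws InterruptedException {
        if (inFlight.incrementAndGet() <= maxInFlight) {
            return; // hot path: a single atomic increment, no locking
        }
        synchronized (wakeup) {
            while (inFlight.get() > maxInFlight) {
                wakeup.wait();
            }
        }
    }

    // Async callback (the ResultFuture#complete analogue): decrements the
    // counter and wakes a potentially parked task thread.
    void release() {
        if (inFlight.decrementAndGet() == maxInFlight) {
            synchronized (wakeup) {
                wakeup.notifyAll();
            }
        }
    }
}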

[jira] [Created] (FLINK-23411) Expose Flink checkpoint details metrics

2021-07-16 Thread Jun Qin (Jira)
Jun Qin created FLINK-23411:
---

 Summary: Expose Flink checkpoint details metrics
 Key: FLINK-23411
 URL: https://issues.apache.org/jira/browse/FLINK-23411
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Affects Versions: 1.12.4, 1.13.1
Reporter: Jun Qin


The checkpoint metrics shown in the Flink Web UI, like the 
sync/async/alignment/start delay durations, are not exposed to the metrics 
system. This makes problem investigation harder when the Web UI is not enabled: 
those numbers cannot be obtained from the DEBUG logs either. I think we should 
see how we can expose these metrics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23410) Use a pool of KafkaProducers to commit Kafka Transactions

2021-07-16 Thread Jun Qin (Jira)
Jun Qin created FLINK-23410:
---

 Summary: Use a pool of KafkaProducers to commit Kafka Transactions
 Key: FLINK-23410
 URL: https://issues.apache.org/jira/browse/FLINK-23410
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.12.4, 1.13.1
Reporter: Jun Qin


Currently, {{FlinkKafkaProducer}} contains {{kafkaProducersPoolSize}} (it is 5 
by default). But {{kafkaProducersPoolSize}} is only used to calculate the next 
transactionalIds. There is actually no KafkaProducer pool in 
{{FlinkKafkaProducer}}. This means that, for every checkpoint, Flink creates a new 
KafkaProducer (and therefore a new thread) and gets a new producer id from Kafka 
before it can initialize/commit a transaction. When the checkpoint is complete 
and the transaction is committed, the thread is shut down. This is inefficient not 
only in terms of Flink's CPU usage (to shut down/recreate threads) but also in 
terms of the network communication to Kafka (to re-request the producer id from 
Kafka). This JIRA is opened to actually implement the KafkaProducer pool.
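
A minimal sketch of the pooling this issue asks for (generic and illustrative; 
not tied to the real {{KafkaProducer}} API):

{code}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Illustrative pool: producers (and their threads / Kafka-assigned producer
// ids) are reused across checkpoints instead of being recreated per transaction.
class ProducerPoolSketch<P> {
    private final Deque<P> idle = new ArrayDeque<>();
    private final Supplier<P> factory;
    private final int maxPoolSize;

    ProducerPoolSketch(Supplier<P> factory, int maxPoolSize) {
        this.factory = factory;
        this.maxPoolSize = maxPoolSize;
    }

    // Borrow a producer for the next transaction; create one only if none is idle.
    synchronized P borrow() {
        P pooled = idle.pollFirst();
        return pooled != null ? pooled : factory.get();
    }

    // Return the producer after the transaction commits; the caller should
    // close it instead if the pool is already full.
    synchronized boolean recycle(P producer) {
        if (idle.size() < maxPoolSize) {
            idle.addFirst(producer);
            return true;
        }
        return false;
    }
}
{code}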



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-16 Thread Till Rohrmann
Thanks for your responses Dawid and Yun,

I agree with you that the whole lifecycle section of the
StreamOperator/StreamTask/Task in the FLIP needs to be better fleshed out.
I have to admit that I am a bit confused by the new events you are
mentioning because I couldn't find them in the FLIP. I assume that
EndOfUserRecordsEvent has been introduced to work around the limitations of
unaligned checkpoints and that EndofInputEvent is something that does not
exist yet. Maybe properly writing things down and agreeing on the same
terminology could help our discussion.

I also agree with you that we need some kind of new event (one that is
conceptually bound to the StreamTask) that says that we have reached the
end of input. Let's name this event for the further discussion
EndOfInputEvent.

The way I've thought about the problem is the following: First let us
define two simple state machines for the StreamOperator and the StreamTask:

StreamOperator FSM: RUNNING -> FINISHED -> CLOSED

RUNNING: We process input records and produce output records.
RUNNING -> FINISHED: Happens upon calling StreamOperator.finish() or if the
source reaches the end of data (does not mean that we flush anything). This
also emits EndOfInputEvent to downstream operators
FINISHED: The operator must not output events.
FINISHED -> CLOSED: Happens upon calling StreamOperator.close() (close all
resources)

StreamTask FSM: RUNNING -> WAITING_FOR_FINAL_CHECKPOINT -> CLOSED

RUNNING: The StreamOperator is running.
RUNNING -> WAITING_FOR_FINAL_CHECKPOINT: Upon receiving EndOfInputEvent on
all inputs. This will also call StreamOperator.finish() and send
EndOfInputEvent to its downstream tasks. (Alternatively if the
EndOfInputEvent processing happens in the StreamOperator, then we can
listen to the state transition RUNNING -> FINISHED of the StreamOperator).
WAITING_FOR_FINAL_CHECKPOINT: As the name says we wait for the next
successful checkpoint to complete.
WAITING_FOR_FINAL_CHECKPOINT -> CLOSED: Once the next checkpoint completes
successfully (e.g. StreamTask.notifyCheckpointComplete is called).
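
As a compact restatement, the two state machines could be written down like
this (a sketch with ad-hoc names, not actual Flink classes):

import java.util.EnumMap;
import java.util.Map;

class LifecycleFsmSketch {

    enum OperatorState { RUNNING, FINISHED, CLOSED }
    enum TaskState { RUNNING, WAITING_FOR_FINAL_CHECKPOINT, CLOSED }

    // StreamOperator: RUNNING -> FINISHED on finish() / end of data (emits
    // EndOfInputEvent downstream); FINISHED -> CLOSED on close().
    static final Map<OperatorState, OperatorState> OPERATOR_NEXT =
            new EnumMap<>(Map.of(
                    OperatorState.RUNNING, OperatorState.FINISHED,
                    OperatorState.FINISHED, OperatorState.CLOSED));

    // StreamTask: RUNNING -> WAITING_FOR_FINAL_CHECKPOINT once EndOfInputEvent
    // was received on all inputs; -> CLOSED once the next checkpoint completes
    // (notifyCheckpointComplete).
    static final Map<TaskState, TaskState> TASK_NEXT =
            new EnumMap<>(Map.of(
                    TaskState.RUNNING, TaskState.WAITING_FOR_FINAL_CHECKPOINT,
                    TaskState.WAITING_FOR_FINAL_CHECKPOINT, TaskState.CLOSED));

    static <S> S advance(Map<S, S> fsm, S current) {
        S next = fsm.get(current);
        if (next == null) {
            throw new IllegalStateException(current + " is a terminal state");
        }
        return next;
    }
}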

So the first thing to note is that StreamOperator.finish() does not flush
anything but simply tells the StreamOperator that it should no longer
produce output. The basic idea would be that whenever we need to flush
our buffered data, then we send a MAX_WATERMARK before sending an
EndOfInputEvent. Of course, we could also integrate this property in the
EndOfInputEvent(boolean flush_buffered_data) but we already have the
watermark mechanism. Of course, we could also change
StreamOperator.finish() into .finish(boolean flush) and translate a
MAX_WATERMARK into .finish(true).

What would it look like if the source reaches its end?

Upon reaching the end, the source would send a MAX_WATERMARK followed by an
EndOfInputEvent and then go into the FINISHED state. This will cause the
StreamTask to go into the WAITING_FOR_FINAL_CHECKPOINT state. The
downstream tasks upon receiving MAX_WATERMARK and EndOfInputEvent on all
channels, would first flush all their data (because of MAX_WATERMARK), then
forward MAX_WATERMARK to its downstream operator, go into the FINISHED
state (because of EndOfInputEvent) and finally also send an EndOfInputEvent
to its downstream operators. The owning StreamTask would also go into the
WAITING_FOR_FINAL_CHECKPOINT state. And all StreamOperators and StreamTasks
would terminate after the next successful checkpoint. That would then
trigger the EndOfPartitionEvent that will terminate the TCP connections.

What would it look like if we trigger stop-with-savepoint w/ drain?

We would send our sources a signal that they should stop processing and
advance the time. This means that they emit a MAX_WATERMARK and then go
into the FINISHED state (meaning that they send an EndOfInputEvent). Then
everything is the same as with the first case only that we will trigger a
savepoint just after sending the stop processing signal.

What would it look like if we trigger stop-with-savepoint w/o drain?

We would send our sources a signal that they should stop processing. This
means that the sources go into FINISHED which means that we are sending an
EndOfInputEvent. Then everything is the same as with the first case only
that we will trigger a savepoint just after sending the stop processing
signal.

One thing I would like to emphasize here is that the
StreamOperators/StreamTasks don't have to know anything about a specific
savepoint/checkpoint. They simply follow their state machine that says that
you have to wait for a checkpoint if you want to stop. It just happens that
in the case of stop-with-savepoint the next checkpoint will be a savepoint.
That way we decouple the lifecycle management and the stop-with-savepoint
operations a bit better.

Cheers,
Till

On Fri, Jul 16, 2021 at 2:22 PM Yun Gao  wrote:

> Hi Till,
>
> > Ok, so the plan is that finish() will flush all pending events and then
> send the MAX_WATERMARK.
>
> > What I am missing is the connection b

Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-16 Thread Yun Gao
Hi Till,

> Ok, so the plan is that finish() will flush all pending events and then send 
> the MAX_WATERMARK.

> What I am missing is the connection between the lifecycle of the operator and 
> signals the StreamTask and Task might receive (and also their life cycles). 
> At the moment we do have the EndOfPartitionEvent that we send once the Task 
> reaches its end. Based on what signals will 
> StreamOperator.finish() be called? What kind of other signals for the 
> StreamTask do we have to introduce in order to support stop-with-savepoint 
> w/o --drain if we don't want to call finish()?

Currently the plan is to introduce a new event (like EndofInputEvent) to 
notify all the tasks from sources to 
sinks to first call finish() (or skip finish() if stop-with-savepoint); then the 
tasks would wait for the last checkpoint and finally emit EndOfPartition. 
We could 
add a flag to `EndofInputEvent` to indicate whether the finish() method would be 
called.

From the view of a single task, the life cycle of a source task would look like

1. Open & Initialize
2. Emits all the records
3. Call finish() for all the operators
4. Emit MAX_WATERMARK
5. Emit EndofInputEvent {finish = true} 
6. Wait for the checkpoint trigger, emit CheckpointBarrier and wait for the 
notification of the final checkpoint if needed.
7. Call close() for all the operators
8. Emit EndOfPartitionEvent


for a normal finish. For stop-with-savepoint [--drain], it would look like

1. Open & Initialize
2. Received stop-with-savepoint (--drain) request
3. Finish the task (e.g. interrupt the legacy source thread or suspend the 
mailbox for the new source).
4. Call finish() for all the operators.
5. Emit MAX_WATERMARK if --drain
6. Emit EndofInputEvent  {finish = is drain} 
7. Emit CheckpointBarrier for this savepoint and wait for the notification of 
the final savepoint if needed.
8. Call close() for all the operators
9. Emit EndOfPartitionEvent

Then the lifecycle of a non-source task would be

1. Open & Initialize
2. runMailboxLoop()
3. Received MAX_WATERMARK from all the channels. --> trigger all the timers. 
4. Received EndofInputEvent from all the input channels  --> Call finish() for 
all the operators.
5. Emit MAX_WATERMARK if --drain (this is natural with the watermark alignment 
mechanism)
6. Emit the received EndofInputEvent.
7. Wait for the checkpoint barrier aligned, emit CheckpointBarrier and wait for 
the notification of the final checkpoint / savepoint if needed.
8. Call close() for all the operators
9. Wait for received all EndOfPartitionEvent, release network resources and 
emit EndOfPartitionEvent
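
As a rough sketch of steps 4-6 of the non-source lifecycle above (hypothetical
names, not actual Flink internals), a task could track the event per channel
like this:

// Sketch: per-task tracking of the proposed EndofInputEvent, carrying the
// finish flag (true for end of data / stop-with-savepoint --drain, false for
// plain stop-with-savepoint).
class EndOfInputTrackerSketch {

    record EndOfInputEvent(boolean finish) {}

    private final int numInputChannels;
    private int channelsEnded;
    private boolean finishRequested;

    EndOfInputTrackerSketch(int numInputChannels) {
        this.numInputChannels = numInputChannels;
    }

    // Called when a channel delivers the event. Returns true exactly once,
    // when the last input channel has signalled end of input; the caller then
    // calls finish() on all operators iff shouldCallFinish(), and forwards
    // the event downstream.
    boolean onEndOfInput(EndOfInputEvent event) {
        channelsEnded++;
        finishRequested = event.finish();
        return channelsEnded == numInputChannels;
    }

    boolean shouldCallFinish() {
        return finishRequested;
    }
}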

> If finish() sends MAX_WATERMARK after flushing all pending events and 
> finish() itself is being triggered by some endOfInputEvent from the 
> StreamTask, then we might actually send MAX_WATERMARK multiple times.

I think we might not send MAX_WATERMARK multiple times, in that for non-source 
tasks, both MAX_WATERMARK and EndOfInputEvent would be processed only after 
they are received from every input channel. 
Then, after processing, the current task would emit MAX_WATERMARK and 
EndOfInputEvent again to the following tasks; thus both events would 
only be sent once for each task?

Best,
Yun




--
From:Till Rohrmann 
Send Time:2021 Jul. 16 (Fri.) 17:26
To:Yun Gao 
Cc:dev 
Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

Ok, so the plan is that finish() will flush all pending events and then send 
the MAX_WATERMARK.

What I am missing is the connection between the lifecycle of the operator and 
signals the StreamTask and Task might receive (and also their life cycles). At 
the moment we do have the EndOfPartitionEvent that we send once the Task 
reaches its end. Based on what signals will StreamOperator.finish() be called? 
What kind of other signals for the StreamTask do we have to introduce in order 
to support stop-with-savepoint w/o --drain if we don't want to call finish()?

If finish() sends MAX_WATERMARK after flushing all pending events and finish() 
itself is being triggered by some endOfInputEvent from the StreamTask, then we 
might actually send MAX_WATERMARK multiple times.

I think what I am lacking a bit is how the StreamOperator lifecycle will fit 
together with the lifecycle of its owner and the corresponding signals we have 
to send.

Cheers,
Till

On Fri, Jul 16, 2021 at 10:40 AM Yun Gao  wrote:

Hi Till,

Sorry that I did not see the reply when sending the last mail. I think as a 
whole we are on the same page on the following:
1. For normal stop-with-savepoint, we do not call finish() and do not emit 
MAX_WATERMARK
2. For normal finish / stop-with-savepoint --drain, we would call finish() and 
emit MAX_WATERMARK

> But then there is the question, how do we signal the operator that the next 
> checkpoint is supposed to stop the operator 
> (how will the operator's lifecycle look in this case)? Maybe we simpl

Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-16 Thread Dawid Wysakowicz
Hi all,

I must admit I think I agree with Till that the relation between
life-cycles of the (Stream)Task and the Operator is missing in the FLIP.
Moreover I am now a bit concerned that we missed the relation with
stopping with savepoint. I am still positive we can incorporate that in
the current efforts. At the same time I think the proposed Operator APIs
still make sense.

First I'd like to start from the Operator's point of view.

*endOfInput vs finish*

The way I see the relation between endOfInput/finish/MAX_WATERMARK/close
is that basically endOfInput() and finish() are the same for one-input
operators and the difference for multi input operators is that
endOfInput is called for each input separately and then once all inputs
finish the finish() is called. For simplicity I will from now on use
just the finish method and leave the endOfInput out of the discussion.

*w drain vs w/o drain*

I agree we have two cases of finishing based on the reason:

 1. we consumed all data (bounded source or stop-with-savepoint w drain)
 2. stop-with-savepoint w/o drain

I think we're at the same page that finish() should be called only in
the first case. I suggest we keep the current behaviour that the we emit
MAX_WATERMARK separately and then call finish(). For even-time based
scenarios this might be duplicated information, but it does not harm.
The order would be that we first emit MAX_WATERMARK and then call
finish(). Of course we would emit MAX_WATERMARK only in the first case.

Lastly the assumption should be that if there is finish() there will be
one extra snapshotState/notifyCheckpointComplete. For the w/o drain the
operator should not care when it is closed() or which savepoint was the
last one.

That's from the operator point of view.

Now I'd like to move on to the (Stream)Task point of view. I think here
lay the problems that we missed and Till rightly points those out.

*When should we call finish()*

In the current implementation we call finish() once we receive the
EndOfPartitionEvent which, as Till said, makes it impossible to bring
down the entire topology in a single savepoint. We can finish all
chained operators in a single Task, but not all Tasks in a DAG. It also makes
it a bit harder to distinguish whether we should call finish() then
or not. Point taken, and it is an important omission from our side, in my
opinion.

However, we introduced EndOfUserRecordsEvent which says there will be no
more records, but the channel is not closed and you can expect other
events such as e.g. CheckpointBarriers. The original idea behind this
event was to make unaligned checkpoints work well with checkpoints after
tasks finished. I think we could and should use that event to call the
finish() method, though. That way we could call finish() before the
producer task finishes. This would also solve the problem of savepoint
w/o drain, as this event would not be emitted in that case and we would
not call the finish().

Having that a StreamTask could finish if all following conditions are met:

  * in case of all data consumed
      o received EndOfUserRecordsEvent from all channels
      o called finish() method
      o checkpoint was triggered and its completion was acknowledged
        (notifyCheckpointComplete)
      o received EndOfPartitionEvent from all channels
  * stop-with-savepoint w/o drain
      o savepoint acknowledged
      o received EndOfPartitionEvent from all channels
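
Expressed as a single predicate, the conditions above would look roughly like
this (the boolean flags are hypothetical stand-ins for the StreamTask's
internal bookkeeping):

// Sketch only: the two ways a StreamTask may finish, as listed above.
class FinishConditionSketch {
    boolean endOfUserRecordsOnAllChannels;
    boolean finishCalled;
    boolean checkpointAfterFinishCompleted; // notifyCheckpointComplete received
    boolean endOfPartitionOnAllChannels;

    boolean stopWithSavepointNoDrain;
    boolean savepointAcknowledged;

    boolean canFinishTask() {
        if (stopWithSavepointNoDrain) {
            // no EndOfUserRecordsEvent and no finish() call in this case
            return savepointAcknowledged && endOfPartitionOnAllChannels;
        }
        // all data consumed (bounded source or stop-with-savepoint w drain)
        return endOfUserRecordsOnAllChannels
                && finishCalled
                && checkpointAfterFinishCompleted
                && endOfPartitionOnAllChannels;
    }
}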

We can distinguish that from the previous case, because we would receive
EndOfPartitionEvent from all channels without receiving
EndOfUserRecordsEvent before that.

Let's then see how this could look for end of data for two Tasks:

Task1 (Source) -> Task2

Task1:

(consume all data) -> call finish() -> send EndOfUserRecordsEvent ->
wait for checkpoint x trigger -> wait for checkpoint x complete + wait
for EndOfUserRecordsEvent acknowledged -> close / send EndOfPartitionEvent

Task2:

receive EndOfUserRecordsEvent -> send EndOfUserRecordsEvent -> wait for
checkpoint y barrier (it could happen that y=x) -> wait for checkpoint y
complete + wait for EndOfUserRecordsEvent acknowledged -> close

The savepoint w/ drain is very similar; it just replaces "consume all
data", and in this case y will always be x, because all sources will
finish with the same savepoint.

For savepoint w/o drain it's slightly different:

Task1:

savepoint w/o drain -> wait for savepoint complete -> close / send
EndOfPartitionEvent

Task 2:

EndOfPartition received + wait for savepoint complete -> close

Does that make sense?

Best,

Dawid

On 16/07/2021 11:26, Till Rohrmann wrote:
> Ok, so the plan is that finish() will flush all pending events and then
> send the MAX_WATERMARK.
>
> What I am missing is the connection between the lifecycle of the operator
> and signals the StreamTask and Task might receive (and also their life
> cycles). At the moment we do have the EndOfPartitionEvent that we send once
> the Task reaches its end. Based on what signals will
> StreamOpe

[jira] [Created] (FLINK-23409) CrossITCase fails with "NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout"

2021-07-16 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-23409:


 Summary: CrossITCase fails with "NoResourceAvailableException: 
Slot request bulk is not fulfillable! Could not allocate the required slot 
within slot request timeout"
 Key: FLINK-23409
 URL: https://issues.apache.org/jira/browse/FLINK-23409
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination, Table SQL / Planner
Affects Versions: 1.14.0
Reporter: Dawid Wysakowicz


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=20548&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=5360d54c-8d94-5d85-304e-a89267eb785a&l=10074

{code}
Jul 16 09:21:37 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
Jul 16 09:21:37 at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
Jul 16 09:21:37 at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
Jul 16 09:21:37 at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
Jul 16 09:21:37 at akka.actor.ActorCell.invoke(ActorCell.scala:561)
Jul 16 09:21:37 at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
Jul 16 09:21:37 at akka.dispatch.Mailbox.run(Mailbox.scala:225)
Jul 16 09:21:37 at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
Jul 16 09:21:37 ... 4 more
Jul 16 09:21:37 Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Slot request bulk is not fulfillable! Could not allocate the required slot 
within slot request timeout
Jul 16 09:21:37 at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
Jul 16 09:21:37 at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
Jul 16 09:21:37 at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
Jul 16 09:21:37 at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
Jul 16 09:21:37 ... 31 more
Jul 16 09:21:37 Caused by: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Slot request bulk is not fulfillable! Could not allocate the required slot 
within slot request timeout
Jul 16 09:21:37 at 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
Jul 16 09:21:37 ... 24 more
Jul 16 09:21:37 Caused by: java.util.concurrent.TimeoutException: Timeout has 
occurred: 30 ms
Jul 16 09:21:37 ... 25 more

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23408) Wait for a checkpoint completed after finishing a task

2021-07-16 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-23408:


 Summary: Wait for a checkpoint completed after finishing a task
 Key: FLINK-23408
 URL: https://issues.apache.org/jira/browse/FLINK-23408
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing, Runtime / Task
Reporter: Dawid Wysakowicz


Before finishing a task we should wait for a checkpoint issued after 
{{finish()}} to commit all pending transactions created from the {{finish()}} 
method.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23407) Mechanism to document enums used for ConfigOptions

2021-07-16 Thread Jira
Ingo Bürk created FLINK-23407:
-

 Summary: Mechanism to document enums used for ConfigOptions
 Key: FLINK-23407
 URL: https://issues.apache.org/jira/browse/FLINK-23407
 Project: Flink
  Issue Type: Sub-task
  Components: API / Core
Reporter: Ingo Bürk
Assignee: Ingo Bürk






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[VOTE] FLIP-184: Refine ShuffleMaster lifecycle management for pluggable shuffle service framework

2021-07-16 Thread Yingjie Cao
Hi all,

I'd like to start a vote on FLIP-184 [1], which was
discussed in [2] [3]. The vote will be open for at least 72 hours,
until July 21, unless there is an objection.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-184%3A+Refine+ShuffleMaster+lifecycle+management+for+pluggable+shuffle+service+framework
[2]
https://lists.apache.org/thread.html/radbbabfcfb6bec305ddf7aeefb983232f96b18ba013f0ae2ee500288%40%3Cdev.flink.apache.org%3E
[3]
https://lists.apache.org/thread.html/r93e3a72506f3e7ffd3c1ab860b5d1a21f8a47b059f2f2fdd05ca1d46%40%3Cdev.flink.apache.org%3E


Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-16 Thread Till Rohrmann
Ok, so the plan is that finish() will flush all pending events and then
send the MAX_WATERMARK.

What I am missing is the connection between the lifecycle of the operator
and signals the StreamTask and Task might receive (and also their life
cycles). At the moment we do have the EndOfPartitionEvent that we send once
the Task reaches its end. Based on what signals will
StreamOperator.finish() be called? What kind of other signals for the
StreamTask do we have to introduce in order to support stop-with-savepoint
w/o --drain if we don't want to call finish()?

If finish() sends MAX_WATERMARK after flushing all pending events and
finish() itself is being triggered by some endOfInputEvent from the
StreamTask, then we might actually send MAX_WATERMARK multiple times.

I think what I am lacking a bit is how the StreamOperator lifecycle will
fit together with the lifecycle of its owner and the corresponding signals
we have to send.

Cheers,
Till


On Fri, Jul 16, 2021 at 10:40 AM Yun Gao  wrote:

> Hi Till,
>
> Sorry that I did not see the reply when sending the last mail. I think as a
> whole we are on the same page on the following:
> 1. For normal stop-with-savepoint, we do not call finish() and do not emit
> MAX_WATERMARK
> 2. For normal finish / stop-with-savepoint --drain, we would call finish()
> and emit MAX_WATERMARK
>
> > But then there is the question, how do we signal the operator that the
> next checkpoint is supposed to stop the operator
> > (how will the operator's lifecycle look in this case)? Maybe we simply
> don't tell the operator and handle this situation on the
> > StreamTask level.
>
> Logically I think in this case the UDF does not seem to need to know that the
> next checkpoint is supposed to stop the operator, since the final
> checkpoint in this case has no difference from the ordinary checkpoints.
>
> > So I guess the question is will finish() advance the time to the end or
> is this a separate mechanism (e.g. explicit watermarks).
>
> I tend to have an explicit MAX_WATERMARK since it makes watermark
> processing unified with the normal cases and makes the meaning of
> each event explicit. But this might be a personal preference and both
> methods would work.
>
> Again, very sorry for not making the whole thing clear in the FLIP. If
> there are no other concerns, I'll update the FLIP with the above conclusions
> to make this part precise.
>
>
> Best,
> Yun
>
> --
> From:Till Rohrmann 
> Send Time:2021 Jul. 16 (Fri.) 16:00
> To:dev 
> Cc:Yun Gao 
> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
> Finished
>
> I think we should try to sort this out because it might affect how and
> when finish() will be called (or in general how the operator lifecycle
> looks like).
>
> To give an example let's take a look at the stop-with-savepoint w/ and w/o
> --drain:
>
> 1) stop-with-savepoint w/o --drain: Conceptually what we would like to do
> is to stop processing w/o completing any windows or waiting for the
> AsyncOperator to finish its operations. All these unfinished operations
> should become part of the final checkpoint so that we can resume from it at
> a later point. Depending on what finish() does (flush unfinished windows or
> not), this method must or must not be called. Assuming that finish()
> flushes unfinished windows/waits for uncompleted async operations, we
> clearly shouldn't call it. But then there is the question, how do we
> signal the operator that the next checkpoint is supposed to stop the
> operator (how will the operator's lifecycle look in this case)? Maybe we
> simply don't tell the operator and handle this situation on the StreamTask
> level. If finish() does not flush unfinished windows, then it shouldn't be
> a problem.
>
> 2) stop-with-savepoint w/ --drain: Here we want to complete all pending
> operations and flush out all results because we don't intend to resume the
> job. Conceptually, we tell the system that we have reached MAX_WATERMARK.
> If finish() is defined so that it implicitly advances the watermark to
> MAX_WATERMARK, then there is no problem. If finish() does not have this
> semantic, then we need to send the MAX_WATERMARK before sending the
> endOfInput event to a downstream task. In fact, stop-with-savepoint /w
> --drain shouldn't be a lot different from a bounded source that reaches its
> end. It would also send MAX_WATERMARK and then signal the endOfInput event
> (note that endOfInput is decoupled from the event time here).
>
> So I guess the question is will finish() advance the time to the end or is
> this a separate mechanism (e.g. explicit watermarks).
>
> Concerning how to handle processing time, I am a bit unsure tbh. I can see
> arguments for completing processing time windows/firing processing time
> timers when calling stop-with-savepoint w/ --drain. On the other hand, I
> could also see that people want to define actions based on the wall clock
> time that are independent of the stream state and, thus, would want to
> ignore them if the Flink application is stopped before reaching this time.
>
> Cheers,
> Till

Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-16 Thread Yun Gao
Hi Till,

Sorry, I did not see the reply when sending the last mail. I think as a
whole we are on the same page on the following:
1. For normal stop-with-savepoint, we do not call finish() and do not emit 
MAX_WATERMARK
2. For normal finish / stop-with-savepoint --drain, we would call finish() and 
emit MAX_WATERMARK
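
To make these two paths concrete, here is a minimal, self-contained Java sketch of the decision a task could apply at end of data, assuming (as discussed) that finish() is the call that flushes buffered results. The Operator interface, StopMode enum and endOfData() driver are illustrative stand-ins for this thread, not Flink's actual classes:

// Illustrative model of the two agreed rules -- not Flink's real API.
public class StopModeSketch {

    enum StopMode { STOP_WITH_SAVEPOINT, STOP_WITH_SAVEPOINT_DRAIN, NORMAL_FINISH }

    interface Operator {
        void processWatermark(long timestamp);
        void finish(); // assumed here to flush buffered results
    }

    static final long MAX_WATERMARK = Long.MAX_VALUE;

    static void endOfData(Operator op, StopMode mode) {
        if (mode == StopMode.STOP_WITH_SAVEPOINT) {
            // 1. Plain stop-with-savepoint: no MAX_WATERMARK, no finish().
            //    Unfinished windows/async operations go into the final
            //    checkpoint and are resumed after restart.
            return;
        }
        // 2. Normal finish / stop-with-savepoint --drain: advance event
        //    time to the end, then flush everything out.
        op.processWatermark(MAX_WATERMARK);
        op.finish();
    }

    public static void main(String[] args) {
        Operator op = new Operator() {
            public void processWatermark(long ts) { System.out.println("watermark " + ts); }
            public void finish() { System.out.println("finish(): flushing buffered results"); }
        };
        endOfData(op, StopMode.STOP_WITH_SAVEPOINT_DRAIN);
    }
}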

> But then there is the question, how do we signal the operator that the next 
> checkpoint is supposed to stop the operator 
> (how will the operator's lifecycle look in this case)? Maybe we simply don't 
> tell the operator and handle this situation on the 
> StreamTask level.

Logically I think in this case the UDF does not need to know that the next
checkpoint is supposed to stop the operator, since the final
checkpoint in this case is no different from ordinary checkpoints.

> So I guess the question is will finish() advance the time to the end or is 
> this a separate mechanism (e.g. explicit watermarks).

I tend to prefer an explicit MAX_WATERMARK since it unifies watermark
processing with the normal cases and makes the meaning of each event
explicit. But this might be a personal preference and both methods
would work.

Sorry for not making this clear in the FLIP. If there are
no other concerns I'll update the FLIP with the above conclusions
to make this part precise.


Best,
Yun


--
From:Till Rohrmann 
Send Time:2021 Jul. 16 (Fri.) 16:00
To:dev 
Cc:Yun Gao 
Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

I think we should try to sort this out because it might affect how and when 
finish() will be called (or in general how the operator lifecycle looks like).

To give an example let's take a look at the stop-with-savepoint w/ and w/o 
--drain:

1) stop-with-savepoint w/o --drain: Conceptually what we would like to do is to 
stop processing w/o completing any windows or waiting for the AsyncOperator to 
finish its operations. All these unfinished operations should become part of 
the final checkpoint so that we can resume from it at a later point. Depending 
on what finish() does (flush unfinished windows or not), this method must or 
must not be called. Assuming that finish() flushes unfinished windows/waits for 
uncompleted async operations, we clearly shouldn't call it. But then there is 
the question, how do we signal the operator that the next checkpoint is 
supposed to stop the operator (how will the operator's lifecycle look in this 
case)? Maybe we simply don't tell the operator and handle this situation on the 
StreamTask level. If finish() does not flush unfinished windows, then it 
shouldn't be a problem.

2) stop-with-savepoint w/ --drain: Here we want to complete all pending 
operations and flush out all results because we don't intend to resume the job. 
Conceptually, we tell the system that we have reached MAX_WATERMARK. If 
finish() is defined so that it implicitly advances the watermark to 
MAX_WATERMARK, then there is no problem. If finish() does not have this 
semantic, then we need to send the MAX_WATERMARK before sending the endOfInput 
event to a downstream task. In fact, stop-with-savepoint w/ --drain shouldn't 
be a lot different from a bounded source that reaches its end. It would also 
send MAX_WATERMARK and then signal the endOfInput event (note that endOfInput 
is decoupled from the event time here).

So I guess the question is will finish() advance the time to the end or is this 
a separate mechanism (e.g. explicit watermarks).

Concerning how to handle processing time, I am a bit unsure tbh. I can see 
arguments for completing processing time windows/firing processing time timers 
when calling stop-with-savepoint w/ --drain. On the other hand, I could also 
see that people want to define actions based on the wall clock time that are 
independent of the stream state and, thus, would want to ignore them if the 
Flink application is stopped before reaching this time.

Cheers,
Till
On Fri, Jul 16, 2021 at 7:48 AM Piotr Nowojski  wrote:
Hi Till,

 > 1) Does endOfInput entail sending of the MAX_WATERMARK?
 >
 > 2) StreamOperator.finish says to flush all buffered events. Would a
 > WindowOperator close all windows and emit the results upon calling
 > finish, for example?

 1) currently they are independent but parallel mechanisms. With event time,
 they are basically the same.
 2) it probably should for the sake of processing time windows.

 Here you are touching the bit of the current design that I like the least.
 We basically have now three different ways of conveying very similar things:
 a) sending `MAX_WATERMARK`, used by event time WindowOperator (what about
 processing time?)
 b) endInput(), used for example by AsyncWaitOperator to flush its internal
 state
 c) finish(), used for example by ContinuousFileReaderOperator

 It's a bit messy and I'm not sure if this should be straightened out? Each
 one of those has a little bit different semantic/meaning, but at the same
 time they are very similar. For single input operators `endInput()` and
 `finish()` are actually the very same thing.

 Piotrek

Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-16 Thread Yun Gao
Hi Till, Piotr

Many thanks for the comments!

> 1) Does endOfInput entail sending of the MAX_WATERMARK?

I also agree with Piotr that currently they are independent mechanisms, and 
they are basically the same
for the event time. 

For more details, first there are some differences among the three scenarios
regarding finishing:
for normal finish and stop-with-savepoint --drain, the job would not be
expected to be restarted,
while for stop-with-savepoint the job would be expected to restart later.

Then for finish / stop-with-savepoint --drain, currently Flink would emit
MAX_WATERMARK before the
EndOfPartition. Besides, as we have discussed before [1], endOfInput / finish()
should also only be called
for finish / stop-with-savepoint --drain. Thus currently they always occur at
the same time. After the change,
we could emit MAX_WATERMARK before the endOfInput event for the finish /
stop-with-savepoint --drain cases.

> 2) StreamOperator.finish says to flush all buffered events. Would a
> WindowOperator close all windows and emit the results upon calling
> finish, for example?

As discussed above for stop-with-savepoint, we would always keep the windows
as they are, and restore them after restart.
Then for finish / stop-with-savepoint --drain, I think it depends
on the Triggers. For
event-time triggers / processing-time triggers, it would be reasonable to flush
all the windows since logically
the time would always elapse and the windows would always get triggered in a
logical future. But for triggers
like CountTrigger, no matter how much time passes logically, the windows would
not trigger, thus we may not
flush these windows. If there are such requirements, we may provide additional
triggers, as sketched below.
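
As an illustration of that distinction, here is a small self-contained Java sketch; the DrainableTrigger interface with a shouldFlushOnDrain() hook is purely hypothetical, a simplified guess at what such an additional trigger API could look like:

// Illustration only: a hypothetical drain-aware hook on triggers.
interface DrainableTrigger {
    /** Should the window fire when the job is drained (time advances to the end)? */
    boolean shouldFlushOnDrain();
}

class EventTimeTriggerSketch implements DrainableTrigger {
    // Logically, time always elapses, so the window fires in a logical future.
    public boolean shouldFlushOnDrain() { return true; }
}

class CountTriggerSketch implements DrainableTrigger {
    private final long required;
    private final long seen;
    CountTriggerSketch(long required, long seen) { this.required = required; this.seen = seen; }
    // Elapsed time alone never completes the count, so only fire if the
    // count is already reached.
    public boolean shouldFlushOnDrain() { return seen >= required; }
}

public class DrainTriggerSketch {
    public static void main(String[] args) {
        System.out.println(new EventTimeTriggerSketch().shouldFlushOnDrain());   // true
        System.out.println(new CountTriggerSketch(100, 42).shouldFlushOnDrain()); // false
    }
}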

> It's a bit messy and I'm not sure if this should be straightened out? Each 
> one of those has a little bit different semantic/meaning, 
> but at the same time they are very similar. For single input operators 
> `endInput()` and `finish()` are actually the very same thing. 

Currently MAX_WATERMARK / endInput / finish indeed always happen at the same 
time, and for single input operators `endInput()` and `finish()` 
are indeed the same thing. During the last discussion we mentioned this
issue, and at that time we thought that we might deprecate `endInput()`
in the future; then we would only have endInput(int input) and finish().

Best,
Yun


[1] https://issues.apache.org/jira/browse/FLINK-21132



--
From:Piotr Nowojski 
Send Time:2021 Jul. 16 (Fri.) 13:48
To:dev 
Cc:Yun Gao 
Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

Hi Till,

> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>
> 2) StreamOperator.finish says to flush all buffered events. Would a
> WindowOperator close all windows and emit the results upon calling
> finish, for example?

1) currently they are independent but parallel mechanisms. With event time, 
they are basically the same.
2) it probably should for the sake of processing time windows.

Here you are touching the bit of the current design that I like the least. We 
basically have now three different ways of conveying very similar things:
a) sending `MAX_WATERMARK`, used by event time WindowOperator (what about 
processing time?)
b) endInput(), used for example by AsyncWaitOperator to flush its internal 
state
c) finish(), used for example by ContinuousFileReaderOperator 

It's a bit messy and I'm not sure if this should be straightened out? Each one 
of those has a little bit different semantic/meaning, but at the same time they 
are very similar. For single input operators `endInput()` and `finish()` are 
actually the very same thing. 

Piotrek
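
To see how the three signals above relate for a single-input operator, here is a toy, self-contained Java model (the class and method names are illustrative stand-ins for this discussion, not Flink's actual operator API); the point is that once event time is advanced by a), the signals b) and c) collapse into the same call for one input:

// Toy model of the three end-of-stream signals -- illustration only.
public class EndOfStreamSketch {

    static final long MAX_WATERMARK = Long.MAX_VALUE;

    /** A single-input operator, in the spirit of AsyncWaitOperator. */
    static class SingleInputOperator {
        void processWatermark(long ts) {
            if (ts == MAX_WATERMARK) System.out.println("a) event time advanced to the end");
        }
        void endInput() {
            // b) no more records will arrive: flush internal state.
            System.out.println("b) endInput: flushing internal state");
        }
        void finish() {
            // c) with a single input there is nothing else to wait for,
            //    so this is effectively the same call as endInput().
            endInput();
        }
    }

    public static void main(String[] args) {
        SingleInputOperator op = new SingleInputOperator();
        op.processWatermark(MAX_WATERMARK); // a)
        op.finish();                        // b) and c) coincide
    }
}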
Thu, 15 Jul 2021 at 16:47 Till Rohrmann wrote:
Thanks for updating the FLIP. Based on the new section about
 stop-with-savepoint [--drain] I got two other questions:

 1) Does endOfInput entail sending of the MAX_WATERMARK?

 2) StreamOperator.finish says to flush all buffered events. Would a
 WindowOperator close all windows and emit the results upon calling
 finish, for example?

 Cheers,
 Till

 On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann  wrote:

 > Thanks a lot for your answers and clarifications Yun.
 >
 > 1+2) Agreed, this can be a future improvement if this becomes a problem.
 >
 > 3) Great, this will help a lot with understanding the FLIP.
 >
 > Cheers,
 > Till
 >
 > On Wed, Jul 14, 2021 at 5:41 PM Yun Gao 
 > wrote:
 >
 >> Hi Till,
 >>
 >> Very thanks for the review and comments!
 >>
 >> 1) First, I think we could in fact do the computation outside
 >> of the main thread;
 >> the current implementation is mainly due to the computation being in
 >> general fast and we
 >> initially wanted to have a simplified first version.
 >>
 >> The main requirement here is to have a constant view of the state of the
 >> tasks, otherwise
 >> for example if we have A -> B, if A is running when we check if we need
 >> to trigger A,

Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-16 Thread Till Rohrmann
I think we should try to sort this out because it might affect how and when
finish() will be called (or in general how the operator lifecycle looks
like).

To give an example let's take a look at the stop-with-savepoint w/ and w/o
--drain:

1) stop-with-savepoint w/o --drain: Conceptually what we would like to do
is to stop processing w/o completing any windows or waiting for the
AsyncOperator to finish its operations. All these unfinished operations
should become part of the final checkpoint so that we can resume from it at
a later point. Depending on what finish() does (flush unfinished windows or
not), this method must or must not be called. Assuming that finish()
flushes unfinished windows/waits for uncompleted async operations, we
clearly shouldn't call it. But then there is the question, how do we
signal the operator that the next checkpoint is supposed to stop the
operator (how will the operator's lifecycle look in this case)? Maybe we
simply don't tell the operator and handle this situation on the StreamTask
level. If finish() does not flush unfinished windows, then it shouldn't be
a problem.

2) stop-with-savepoint w/ --drain: Here we want to complete all pending
operations and flush out all results because we don't intend to resume the
job. Conceptually, we tell the system that we have reached MAX_WATERMARK.
If finish() is defined so that it implicitly advances the watermark to
MAX_WATERMARK, then there is no problem. If finish() does not have this
semantic, then we need to send the MAX_WATERMARK before sending the
endOfInput event to a downstream task. In fact, stop-with-savepoint w/
--drain shouldn't be a lot different from a bounded source that reaches its
end. It would also send MAX_WATERMARK and then signal the endOfInput event
(note that endOfInput is decoupled from the event time here).
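
Put as a sequence, this is a sketch of the signals a draining task would emit downstream, under the assumption that finish() itself does not advance event time (the signal names here are illustrative, not Flink's wire events):

import java.util.List;

// Illustration only: the order of signals sent downstream when draining,
// assuming finish() itself does not advance event time.
public class DrainSequenceSketch {
    public static void main(String[] args) {
        List<String> signals = List.of(
                "record(...)",                // last regular data
                "watermark(Long.MAX_VALUE)",  // MAX_WATERMARK: event time ends
                "endOfInput",                 // decoupled from event time
                "endOfPartition");            // channel is released
        signals.forEach(System.out::println);
    }
}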

So I guess the question is will finish() advance the time to the end or is
this a separate mechanism (e.g. explicit watermarks).

Concerning how to handle processing time, I am a bit unsure tbh. I can see
arguments for completing processing time windows/firing processing time
timers when calling stop-with-savepoint w/ --drain. On the other hand, I
could also see that people want to define actions based on the wall clock
time that are independent of the stream state and, thus, would want to
ignore them if the Flink application is stopped before reaching this time.

Cheers,
Till

On Fri, Jul 16, 2021 at 7:48 AM Piotr Nowojski  wrote:

> Hi Till,
>
> > 1) Does endOfInput entail sending of the MAX_WATERMARK?
> >
> > 2) StreamOperator.finish says to flush all buffered events. Would a
> > WindowOperator close all windows and emit the results upon calling
> > finish, for example?
>
> 1) currently they are independent but parallel mechanisms. With event time,
> they are basically the same.
> 2) it probably should for the sake of processing time windows.
>
> Here you are touching the bit of the current design that I like the least.
> We basically have now three different ways of conveying very similar
> things:
> a) sending `MAX_WATERMARK`, used by event time WindowOperator (what about
> processing time?)
> b) endInput(), used for example by AsyncWaitOperator to flush its internal
> state
> c) finish(), used for example by ContinuousFileReaderOperator
>
> It's a bit messy and I'm not sure if this should be straightened out? Each
> one of those has a little bit different semantic/meaning, but at the same
> time they are very similar. For single input operators `endInput()` and
> `finish()` are actually the very same thing.
>
> Piotrek
>
> Thu, 15 Jul 2021 at 16:47 Till Rohrmann wrote:
>
> > Thanks for updating the FLIP. Based on the new section about
> > stop-with-savepoint [--drain] I got two other questions:
> >
> > 1) Does endOfInput entail sending of the MAX_WATERMARK?
> >
> > 2) StreamOperator.finish says to flush all buffered events. Would a
> > WindowOperator close all windows and emit the results upon calling
> > finish, for example?
> >
> > Cheers,
> > Till
> >
> > On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann 
> > wrote:
> >
> > > Thanks a lot for your answers and clarifications Yun.
> > >
> > > 1+2) Agreed, this can be a future improvement if this becomes a
> problem.
> > >
> > > 3) Great, this will help a lot with understanding the FLIP.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Jul 14, 2021 at 5:41 PM Yun Gao 
> > > wrote:
> > >
> > >> Hi Till,
> > >>
> > >> Very thanks for the review and comments!
> > >>
> > >> 1) First, I think we could in fact do the computation outside
> > >> of the main thread;
> > >> the current implementation is mainly due to the computation being in
> > >> general fast and we
> > >> initially wanted to have a simplified first version.
> > >>
> > >> The main requirement here is to have a constant view of the state of the
> > >> tasks, otherwise
> > >> for example if we have A -> B, if A is running when we check if we need
> > >> to trigger A, we will

Re: [DISCUSS] FLIP-183: Dynamic buffer size adjustment

2021-07-16 Thread Till Rohrmann
I think this is a good idea. +1 for this approach. Are you gonna update the
FLIP accordingly?

Cheers,
Till

On Thu, Jul 15, 2021 at 9:33 PM Steven Wu  wrote:

> I really like the new idea.
>
> On Thu, Jul 15, 2021 at 11:51 AM Piotr Nowojski 
> wrote:
>
> > Hi Till,
> >
> > > I assume that buffer sizes are only
> > > changed for newly assigned buffers/credits, right? Otherwise, the data
> > > could already be on the wire and then it wouldn't fit on the receiver side.
> > > Or do we have a back channel mechanism to tell the sender that a part of a
> > > buffer needs to be resent once more capacity is available?
> >
> > Initially our implementation proposal was to implement the first option.
> > Buffer size would be attached to a credit message, so the receiver would
> > first need to allocate a buffer with the updated size, send the credit
> > upstream, and the sender would be allowed to only send as much data as fits
> > in the credit. So there would be no way and no problem with changing buffer
> > sizes while something is "on the wire".
> >
> > However Anton suggested an even simpler idea to me today. There is actually
> > no problem with receivers supporting all buffer sizes up to the maximum
> > allowed size (the currently configured memory segment size). Thus the new
> > buffer size can be treated by the sender as a recommendation. We can
> > announce a new buffer size, and the sender will start capping newly
> > requested buffers to that size, but we can still send already filled
> > buffers in chunks of any size, as long as it's below the max memory segment
> > size. In this way we can leave any already filled buffers on the sender
> > side untouched and we do not need to partition/slice them before sending
> > them down, making at least the initial version even simpler. This way we
> > also do not need to differentiate between credits of different sizes. We
> > just announce a single value, the "recommended/requested buffer size".
> >
> > Piotrek
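
A minimal self-contained Java sketch of the recommendation protocol described above; the AddCredit record carrying a recommended size and the helper methods are assumptions for illustration, not Flink's actual network stack classes:

// Illustrative sketch of the recommended-buffer-size idea -- not Flink code.
public class BufferSizeSketch {

    static final int MAX_SEGMENT_SIZE = 32 * 1024; // configured memory segment size

    /** Hypothetical credit message extended with the receiver's recommendation. */
    record AddCredit(int numCredits, int recommendedBufferSize) {}

    static int recommendedSize = MAX_SEGMENT_SIZE;

    static void onCredit(AddCredit credit) {
        // The recommendation only affects buffers requested from now on.
        recommendedSize = Math.min(credit.recommendedBufferSize(), MAX_SEGMENT_SIZE);
    }

    /** Newly requested buffers are capped at the recommendation ... */
    static int newBufferSize() {
        return recommendedSize;
    }

    /** ... but an already filled buffer may still be sent as-is, because
     *  receivers support every size up to MAX_SEGMENT_SIZE. */
    static boolean maySend(int filledBufferSize) {
        return filledBufferSize <= MAX_SEGMENT_SIZE;
    }

    public static void main(String[] args) {
        onCredit(new AddCredit(1, 16 * 1024));
        System.out.println(newBufferSize());    // 16384
        System.out.println(maySend(32 * 1024)); // true: old buffer is still legal
    }
}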
> >
> > Thu, 15 Jul 2021 at 17:27 Till Rohrmann wrote:
> >
> > > Hi everyone,
> > >
> > > Thanks a lot for creating this FLIP Anton and Piotr. I think it looks like
> > > a very promising solution for speeding up our checkpoints and being able
> > > to create them more reliably.
> > >
> > > Following up on Steven's question: I assume that buffer sizes are only
> > > changed for newly assigned buffers/credits, right? Otherwise, the data
> > > could already be on the wire and then it wouldn't fit on the receiver side.
> > > Or do we have a back channel mechanism to tell the sender that a part of a
> > > buffer needs to be resent once more capacity is available?
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Jul 14, 2021 at 11:16 AM Piotr Nowojski wrote:
> > >
> > > > Hi Steven,
> > > >
> > > > As downstream/upstream nodes are decoupled, if downstream nodes adjust
> > > > their buffer size first, there will be a lag until this updated buffer
> > > > size information reaches the upstream node. It is a problem, but it has a
> > > > quite simple solution that we described in the FLIP document:
> > > >
> > > > > Sending the buffer of the right size.
> > > > > It is not enough to know just the number of available buffers (credits)
> > > > > for the downstream because the size of these buffers can be different.
> > > > > So we are proposing to resolve this problem in the following way: If the
> > > > > downstream buffer size is changed then the upstream should send
> > > > > the buffer of a size not greater than the new one, regardless of how big
> > > > > the current buffer on the upstream is. (pollBuffer should receive
> > > > > parameters like bufferSize and return a buffer not greater than it)
> > > >
> > > > So apart from adding buffer size information to the `AddCredit` message,
> > > > we will need to support a case where an upstream subpartition has already
> > > > produced a buffer with an older size (for example 32KB), while the next
> > > > credit arrives with an allowance for a smaller size (16KB). In that case,
> > > > we are only allowed to send a portion of the data from this buffer that
> > > > fits into the new updated buffer size, and keep announcing the remaining
> > > > part as available backlog.
> > > >
> > > > Best,
> > > > Piotrek
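
For the 32KB-buffer-meets-16KB-credit case described above, here is a small self-contained Java sketch using plain ByteBuffer as a stand-in for Flink's network buffers (illustration only): a slice that fits the credit is sent, and the remainder stays announced as backlog:

import java.nio.ByteBuffer;

// Illustration only: slicing an already filled buffer to fit a smaller credit.
public class BufferSliceSketch {
    public static void main(String[] args) {
        ByteBuffer filled = ByteBuffer.allocate(32 * 1024); // produced at the old size
        filled.position(filled.capacity());
        filled.flip(); // ready for reading: 32 KB of data

        int creditSize = 16 * 1024; // the credit only allows the new, smaller size

        // Send only the portion that fits into the credit ...
        filled.limit(creditSize);
        ByteBuffer toSend = filled.slice(); // 16 KB view over the first half

        // ... and keep announcing the remainder as available backlog.
        filled.limit(filled.capacity());
        filled.position(creditSize);
        System.out.println("sent " + toSend.remaining()
                + " bytes, backlog " + filled.remaining() + " bytes");
    }
}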
> > > >
> > > >
> > > > Wed, 14 Jul 2021 at 08:33 Steven Wu wrote:
> > > >
> > > > >- The subtask observes the changes in the throughput and changes the
> > > > >buffer size during the whole life period of the task.
> > > > >- The subtask sends buffer size and number of available buffers to the
> > > > >upstream to the corresponding subpartition.
> > > > >- Upstream changes the buffer size corresponding to the received
> > > > >information.
> > > > >- Upstream sends the data and number of filled buffers to the downstream