[jira] [Created] (FLINK-22270) Python test pipeline no output for 900 seconds

2021-04-13 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-22270:
-

 Summary: Python test pipeline no output for 900 seconds
 Key: FLINK-22270
 URL: https://issues.apache.org/jira/browse/FLINK-22270
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.12.2
Reporter: Guowei Ma


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16481&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=455fddbf-5921-5b71-25ac-92992ad80b28&l=18771



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22269) JobMasterStopWithSavepointITCase.throwingExceptionOnCallbackWithRestartsShouldSimplyRestartInSuspend failed.

2021-04-13 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-22269:
-

 Summary: 
JobMasterStopWithSavepointITCase.throwingExceptionOnCallbackWithRestartsShouldSimplyRestartInSuspend
 failed.
 Key: FLINK-22269
 URL: https://issues.apache.org/jira/browse/FLINK-22269
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Guowei Ma


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16451&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=5360d54c-8d94-5d85-304e-a89267eb785a&l=9197


{code:java}
java.lang.AssertionError
at org.junit.Assert.fail(Assert.java:86)
at org.junit.Assert.assertTrue(Assert.java:41)
at org.junit.Assert.assertTrue(Assert.java:52)
at 
org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase.throwingExceptionOnCallbackWithRestartsHelper(JobMasterStopWithSavepointITCase.java:195)
at 
org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase.throwingExceptionOnCallbackWithRestartsShouldSimplyRestartInSuspend(JobMasterStopWithSavepointITCase.java:161)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)

{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22268) JobMasterStopWithSavepointITCase.testRestartCheckpointCoordinatorIfStopWithSavepointFails fails because of "Not all required tasks are currently running."

2021-04-13 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-22268:
-

 Summary: 
JobMasterStopWithSavepointITCase.testRestartCheckpointCoordinatorIfStopWithSavepointFails
 fails because of "Not all required tasks are currently running."
 Key: FLINK-22268
 URL: https://issues.apache.org/jira/browse/FLINK-22268
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Guowei Ma


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16451&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=f508e270-48d6-5f1e-3138-42a17e0714f0&l=3899




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22267) Savepoint an application with an upsert-kafka source, then restart the application from the savepoint; the state is not recovered.

2021-04-13 Thread Carl (Jira)
Carl created FLINK-22267:


 Summary: Savepoint an application with an upsert-kafka source, then 
restart the application from the savepoint; the state is not recovered.
 Key: FLINK-22267
 URL: https://issues.apache.org/jira/browse/FLINK-22267
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.12.2
Reporter: Carl
 Attachments: image-2021-04-14-11-17-00-207.png

My operation is as follows:

1. Take a savepoint of an application whose source is upsert-kafka

2. Delete the upsert-kafka topic data

3. Restart the application from the savepoint

4. The log shows that the offset has been restored, but the state has not been 
restored

What I want to confirm: when restarting from the savepoint, does an application 
with an upsert-kafka source not restore its state from the savepoint, but instead 
rebuild the state from the earliest Kafka offset?

If so, why is the offset reset:

!image-2021-04-14-11-17-00-207.png!
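For context, a minimal, self-contained sketch of the kind of setup the steps above assume. The table name, topic, fields, formats and the downstream query are placeholders for illustration and are not taken from the reporter's job:

{code:java}
// Illustrative sketch only; names, topic and formats are placeholders.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaSourceSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // upsert-kafka source: records with the same primary key are interpreted as updates/deletes
        tEnv.executeSql(
                "CREATE TABLE upsert_source (\n"
                        + "  user_id STRING,\n"
                        + "  cnt BIGINT,\n"
                        + "  PRIMARY KEY (user_id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'upsert-kafka',\n"
                        + "  'topic' = 'my_topic',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'key.format' = 'json',\n"
                        + "  'value.format' = 'json'\n"
                        + ")");

        // Downstream stateful query; this is the state one would expect a savepoint to restore.
        tEnv.executeSql("SELECT COUNT(*) AS total FROM upsert_source").print();
    }
}
{code}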



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22266) JobMasterStopWithSavepointITCase.throwingExceptionOnCallbackWithoutRestartsHelper fails

2021-04-13 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-22266:
-

 Summary: 
JobMasterStopWithSavepointITCase.throwingExceptionOnCallbackWithoutRestartsHelper
 fails
 Key: FLINK-22266
 URL: https://issues.apache.org/jira/browse/FLINK-22266
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Guowei Ma


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16451&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=f508e270-48d6-5f1e-3138-42a17e0714f0&l=3884


{code:java}
[ERROR] 
throwingExceptionOnCallbackWithNoRestartsShouldFailTheTerminate(org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase)
  Time elapsed: 0.154 s  <<< FAILURE!
java.lang.AssertionError
at org.junit.Assert.fail(Assert.java:86)
at org.junit.Assert.assertTrue(Assert.java:41)
at org.junit.Assert.assertTrue(Assert.java:52)
at 
org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase.throwingExceptionOnCallbackWithoutRestartsHelper(JobMasterStopWithSavepointITCase.java:154)
at 
org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase.throwingExceptionOnCallbackWithNoRestartsShouldFailTheTerminate(JobMasterStopWithSavepointITCase.java:138)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)

{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22265) Abnormal document display

2021-04-13 Thread BoYi Zhang (Jira)
BoYi Zhang created FLINK-22265:
--

 Summary: Abnormal document display
 Key: FLINK-22265
 URL: https://issues.apache.org/jira/browse/FLINK-22265
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Reporter: BoYi Zhang
 Attachments: image-2021-04-14-11-00-24-999.png, 
image-2021-04-14-11-01-47-226.png

 

As shown in the figure, the documentation is displayed incorrectly:

 

!image-2021-04-14-11-00-24-999.png!

 

After the fix:

 

!image-2021-04-14-11-01-47-226.png!




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


subscribed

2021-04-13 Thread boyi
Hello!


Please feel free to contact me anytime if you have any questions. I look forward to your reply. Best wishes!


2021-04-14

[jira] [Created] (FLINK-22264) Fix misleading statement about per-job mode support for Kubernetes in Concept/Flink Architecture page

2021-04-13 Thread Fuyao Li (Jira)
Fuyao Li created FLINK-22264:


 Summary: Fix misleading statement about per-job mode support for 
Kubernetes in Concept/Flink Architecture page
 Key: FLINK-22264
 URL: https://issues.apache.org/jira/browse/FLINK-22264
 Project: Flink
  Issue Type: Task
  Components: Documentation
Affects Versions: 1.12.2
Reporter: Fuyao Li
 Fix For: 1.13.0


I noticed a conflict in the document for per-job mode support for Kubernetes.

In the doc here [1], it mentions

in a Flink Job Cluster, the available cluster manager (like YARN or Kubernetes) 
is used to spin up a cluster for each submitted job and this cluster is 
available to that job only.

This implies that per-job mode is supported on Kubernetes.

 

However, the docs [2] and [3] clearly point out that per-job mode is not 
supported on Kubernetes.

 

To avoid this misunderstanding, I think we should fix the statement on the 
concepts page. I had a discussion with Yang Wang on the flink-user mailing list 
earlier. (The link is not yet available in the archive.)

 

[1] 
[https://ci.apache.org/projects/flink/flink-docs-master/docs/concepts/flink-architecture/#flink-job-cluster]

[2] 
[https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#per-job-cluster-mode]

[3] 
[https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Add jobId and JobName variable to JobManager metrics in per-job deployment mode

2021-04-13 Thread Lu Niu
Hi, Flink dev

Could you share your thoughts about
https://issues.apache.org/jira/browse/FLINK-22164 ?

context:
We expose all Flink metrics to an external system for monitoring and
alerting. However, JobManager metrics only carry a single scope variable,
which is not enough to attribute them to one job when jobs are deployed on YARN. If
a Flink job runs in per-job mode, which guarantees one job per cluster, we can
add jobId and JobName to the JobManager metrics.
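
For context, a hedged sketch of what this could look like in flink-conf.yaml. The first entry is the current default JobManager scope (only the host variable is available for it today); the second entry is purely hypothetical and only illustrates the proposed addition:

{code}
# current default: the JobManager scope can only use the host variable
metrics.scope.jm: <host>.jobmanager

# hypothetical sketch of the proposal for per-job deployments
# (<job_id> / <job_name> are NOT available for this scope today)
metrics.scope.jm: <host>.<job_name>.<job_id>.jobmanager
{code}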


Best
Lu


Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

2021-04-13 Thread Stephan Ewen
Thanks all for this discussion. Looks like there are lots of ideas and
folks that are eager to do things, so let's see how we can get this moving.

My take on this is the following:

There will probably not be one Hybrid source, but possibly multiple ones,
because of different strategies/requirements.
- One may be very simple, with switching points known up-front. Would
be good to have this in a very simple implementation.
- There may be one where the switch is dynamic and the readers need to
report back where they left off.
- There may be one that switches back and forth multiple times during a
job, for example Kafka going to DFS whenever it falls behind retention, in
order to catch up again.

This also seems hard to "design on paper"; I expect there are nuances in a
production setup that affect some details of the design. So I'd feel most
comfortable in adding a variant of the hybrid source to Flink that has been
used already in a real use case (not necessarily in production, but maybe
in a testing/staging environment, so it seems to meet all requirements).
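
To make the second variant above (a dynamic switch where readers report back where they left off) a bit more concrete, here is a rough sketch of the hand-off it implies. All names are hypothetical; none of these types exist in Flink or in the FLIP-150 proposal, they only illustrate the idea:

{code:java}
// Hypothetical sketch only -- not an existing Flink or FLIP-150 API.
// The finishing source reports where it left off, and a user-supplied converter
// turns that into the start position of the next source in the chain.
interface EndPositionProvider<EndT> {
    /** E.g. the last processed file or the maximum event timestamp read so far. */
    EndT snapshotEndPosition();
}

interface StartPositionSetter<StartT> {
    /** E.g. Kafka offsets or a start timestamp derived from the previous source. */
    void applyStartPosition(StartT start);
}

@FunctionalInterface
interface PositionConverter<EndT, StartT> {
    /** User-supplied mapping between the two sources' position types. */
    StartT convert(EndT endOfPreviousSource);
}
{code}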


What do you think about the following approach?
  - If there is a tested PoC, let's try to get it contributed to Flink
without trying to make it much more general.
  - When we see similar but a bit different requirements for another hybrid
source, then let's try to evolve the contributed one.
  - If we see new requirements that are so different that they don't fit
well with the existing hybrid source, then let us look at building a second
hybrid source for those requirements.

We need to make connector contributions easier in general, and I think
it is not a bad thing to end up with different approaches and see how these
play out against each other when being used by users. For example switching
with known boundaries, dynamic switching, back-and-forth-switching, etc.
(I know some committers are planning to do some work on making
connector contributions easier, with standardized testing frameworks,
decoupled CI, etc.)

Best,
Stephan


On Thu, Mar 25, 2021 at 4:41 AM Thomas Weise  wrote:

> Hi,
>
> As mentioned in my previous email, I had been working on a prototype for
> the hybrid source.
>
> You can find it at https://github.com/tweise/flink/pull/1
>
> It contains:
> * Switching with configurable chain of sources
> * Fixed or dynamic start positions
> * Test with MockSource and FileSource
>
> The purpose of the above PR is to gather feedback and help drive consensus
> on the FLIP.
>
> * How to support a dynamic start position within the source chain?
>
> Relevant in those (few?) cases where start positions are not known upfront.
> You can find an example of what that might look like in the tests:
>
>
> https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
>
> https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132
>
> When switching, the enumerator of the previous source needs to
> supply information about consumed splits that allows to set the start
> position for the next source. That could be something like the last
> processed file, timestamp, etc. (Currently StaticFileSplitEnumerator
> doesn't track finished splits.)
>
> See previous discussion regarding start/end position. The prototype shows
> the use of checkpoint state with converter function.
>
> * Should readers be deployed dynamically?
>
> The prototype assumes a static source chain that is fixed at job submission
> time. Conceivably there could be use cases that require more flexibility.
> Such as switching one KafkaSource for another. A step in that direction
> would be to deploy the actual readers dynamically, at the time of switching
> source.
>
> Looking forward to feedback and suggestions for next steps!
>
> Thomas
>
> On Sun, Mar 14, 2021 at 11:17 AM Thomas Weise  wrote:
>
> > Hi Nicholas,
> >
> > Thanks for the reply. I had implemented a small PoC. It switches a
> > configurable sequence of sources with predefined bounds. I'm using the
> > unmodified MockSource for illustration. It does not require a
> "Switchable"
> > interface. I looked at the code you shared and the delegation and
> signaling
> > works quite similar. That's a good validation.
> >
> > Hi Kezhu,
> >
> > Thanks for bringing the more detailed discussion regarding the start/end
> > position. I think in most cases the start and end positions will be known
> > when the job is submitted. If we take a File -> Kafka source chain as
> > example, there would most likely be a timestamp at which we want to
> > transition from files to reading from Kafka. So we would either set the
> > start position for Kafka based on that timestamp or provide the offsets
> > directly. (Note that I'm skipping a few related nuances here. In order to
> > achieve an exact switch without duplication or gap, we may also need some
> > overlap and filtering, but that's a separate issue.)
> >
> > The point 

Re: [DISCUSS] Backport FLIP-27 Kafka source connector fixes with API change to release-1.12.

2021-04-13 Thread Stephan Ewen
Hi all!

Generally, avoiding API changes in Bug fix versions is the right thing, in
my opinion.

But this case is a bit special, because we are changing something that
never worked properly in the first place.
So we are not breaking a "running thing" here, but making it usable.

So +1 from my side to backport these changes, I think we make more users
happy than angry with this.

Best,
Stephan


On Thu, Apr 8, 2021 at 11:35 AM Becket Qin  wrote:

> Hi Arvid,
>
> There are interface changes to the Kafka source, and there is a backwards
> compatible change in the base source implementation. Therefore technically
> speaking, users might be able to run the Kafka source in 1.13 with a 1.12
> Flink job. However, it could be tricky because there might be some
> dependent jar conflicts between 1.12 and 1.13. So this solution seems a
> little fragile.
>
> I'd second Till's question if there is an issue for users that start with
> > the current Kafka source (+bugfixes) to later upgrade to 1.13 Kafka
> source
> > with API changes.
>
>
> Just to clarify, the bug fixes themselves include API changes, they are not
> separable. So we basically have three options here:
>
> 1. Do not backport fixes in 1.12. So users have to upgrade to 1.13 in order
> to use the new Kafka source.
> 2. Write some completely different fixes for release 1.12 and ask users to
> migrate to the new API when they upgrade to 1.13
> 3. Backport the fix with API changes to 1.12. So users don't need to handle
> interface change when they upgrade to 1.13+.
>
> Personally I think option 3 here is better because it does not really
> introduce any trouble to the users. The downside is that we do need to
> change the API of Kafka source in 1.12. Given that the changed API won't be
> useful without these bug fixes, changing the API seems to be doing more
> good than bad here.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Thu, Apr 8, 2021 at 2:39 PM Arvid Heise  wrote:
>
> > Hi Becket,
> >
> > did you need to change anything to the source interface itself? Wouldn't
> it
> > be possible for users to simply use the 1.13 connector with their Flink
> > 1.12 deployment?
> >
> > I think the late-upgrade argument can be made for any feature, but I also
> > see that the Kafka connector is of high interest.
> >
> > I'd second Till's question if there is an issue for users that start with
> > the current Kafka source (+bugfixes) to later upgrade to 1.13 Kafka
> source
> > with API changes.
> >
> > Best,
> >
> > Arvid
> >
> > On Thu, Apr 8, 2021 at 2:54 AM Becket Qin  wrote:
> >
> > > Thanks for the comment, Till and Thomas.
> > >
> > > As far as I know there are some users who have just upgraded their
> Flink
> > > version from 1.8 / 1.9 to Flink 1.12 and might not upgrade the version
> > in 6
> > > months or more. There are also some organizations that have the
> strategy
> > of
> > > not running the latest version of a project, but only the second latest
> > > version with bug fixes. So those users may still benefit from the
> > backport.
> > > However, arguably the old Kafka source is there anyways in 1.12, so
> they
> > > should not be blocked on having the new source.
> > >
> > > I am leaning towards backporting the fixes mainly because this way we
> > might
> > > have more users migrating to the new Source and provide feedback. It
> will
> > > take some time for the users to pick up 1.13, especially for the users
> > > running Flink at large scale. So backporting the fixes to 1.12 would
> help
> > > get the new source to be used sooner.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Thu, Apr 8, 2021 at 12:40 AM Thomas Weise  wrote:
> > >
> > > > Hi,
> > > >
> > > > Thanks for fixing the new KafkaSource issues.
> > > >
> > > > I'm interested in using these fixes with 1.12 for experimental
> > purposes.
> > > >
> > > > +1 for backporting. 1.12 is the current stable release and users who
> > > would
> > > > like to try the FLIP-27 sources are likely to use that release.
> > > >
> > > > Thomas
> > > >
> > > > On Wed, Apr 7, 2021 at 2:50 AM Till Rohrmann 
> > > wrote:
> > > >
> > > > > Hi Becket,
> > > > >
> > > > > If I remember correctly, then we deliberately not documented the
> > Kafka
> > > > > connector in the 1.12 release. Hence, from this point there should
> be
> > > no
> > > > > need to backport any fixes because users are not aware of this
> > feature.
> > > > >
> > > > > On the other hand this also means that we should be able to break
> > > > anything
> > > > > we want to. Consequently, backporting these fixes should be
> possible.
> > > > >
> > > > > The question would probably be whether we want to ship new features
> > > with
> > > > a
> > > > > bug fix release. Do we know of any users who want to use the new
> > Kafka
> > > > > source, are using the 1.12 version and cannot upgrade to 1.13 once
> it
> > > is
> > > > > released? If this is the case, then this could be an argument for
> > > > shipping
> > > > > this feature with 

Re: Automatic backpressure detection

2021-04-13 Thread Lu Niu
Cool. Thanks!

Best
Lu

On Mon, Apr 12, 2021 at 11:27 PM Piotr Nowojski 
wrote:

> Hi,
>
> Yes. Back-pressure from AsyncOperator should be correctly reported via
> isBackPressured, backPressuredMsPerSecond metrics and by extension in the
> WebUI from 1.13.
>
> Piotrek
>
> On Mon, Apr 12, 2021 at 23:17, Lu Niu wrote:
>
> > Hi, Piotr
> >
> > Thanks for your detailed reply! It is mentioned here we cannot observe
> > backpressure generated from  AsyncOperator in Flink UI in 1.9.1. Is it
> > fixed in the latest version? Thank you!
> >
> >
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Async-Function-Not-Generating-Backpressure-td26766.html
> >
> > Best
> > Lu
> >
> > On Tue, Apr 6, 2021 at 11:14 PM Piotr Nowojski 
> > wrote:
> >
> > > Hi,
> > >
> > > Yes, you can use `isBackPressured` to monitor a task's back-pressure.
> > > However keep in mind:
> > > a) You are going to miss some nice way to visualize this information,
> > which
> > > is present in 1.13's WebUI.
> > > b) `isBackPressured` is a sampling based metric. If your job has
> varying
> > > load, for example all windows firing at the same processing time, every
> > > couple of seconds, causing intermittent back-pressure, this metric will
> > > show it randomly as `true` or `false`.
> > > c) `isBackPressured` is slightly less accurate compared to
> > > `backPressuredTimeMsPerSecond`. There are some corner cases when for a
> > > brief amount of time it can return `true`, while a task is still
> running,
> > > while the time based metrics work in a different much more accurate
> way.
> > >
> > > About back porting the patches, if you want to create a custom Flink
> > build
> > > it should be do-able. There will be some conflicts for sure, so you
> will
> > > need to understand Flink's code.
> > >
> > > Best,
> > > Piotrek
> > >
> > > On Wed, Apr 7, 2021 at 02:32, Lu Niu wrote:
> > >
> > > > Hi, Piotr
> > > >
> > > > Thanks for replying!
> > > >
> > > > We don't have a plan to upgrade to 1.13 in short term. We are using
> > flink
> > > > 1.11 and I notice there is a metric called isBackpressured. Is that
> > > enough
> > > > to solve 1? If not, would backporting patches regarding
> > > > backPressuredTimeMsPerSecond, busyTimeMsPerSecond and
> > idleTimeMsPerSecond
> > > > work? And do you have an estimate of how difficult it is?
> > > >
> > > >
> > > > Best
> > > > Lu
> > > >
> > > >
> > > >
> > > > On Tue, Apr 6, 2021 at 12:18 AM Piotr Nowojski  >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Lately we overhauled the backpressure detection [1] and a
> screenshot
> > > > > preview of those efforts is attached here [2]. I encourage you to
> > check
> > > > the
> > > > > 1.13 RC0 build and how the current mechanism works for you [3]. To
> > > > support
> > > > > those WebUI changes we have added a couple of new metrics:
> > > > > backPressuredTimeMsPerSecond, busyTimeMsPerSecond and
> > > > idleTimeMsPerSecond.
> > > > >
> > > > > 1. I believe that solves 1.
> > > > > 2. This still requires a bit of manual investigation. Once you
> locate
> > > > > backpressuring task, you can check the detail subtask stats to
> check
> > if
> > > > all
> > > > > parallel instances are uniformly backpressured/busy or not. If you
> > > would
> > > > > like to add a hint "it looks like you have a data skew in Task XYZ
> ",
> > > > that
> > > > > I believe could be added to the WebUI.
> > > > > 3. The tricky part is how to display this kind of information.
> > > Currently
> > > > I
> > > > > would recommend just export/report
> > > > > backPressuredTimeMsPerSecond, busyTimeMsPerSecond and
> > > idleTimeMsPerSecond
> > > > > metrics for every task to an external system and  display them for
> > > > example
> > > > > in Graphana.
> > > > >
> > > > > The blog post you are referencing is quite outdated, especially
> with
> > > > those
> > > > > new changes from 1.13. I'm hoping to write a new one pretty soon.
> > > > >
> > > > > Piotrek
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-14712
> > > > > [2]
> > > > >
> > > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/browse/FLINK-14814?focusedCommentId=17256926&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17256926
> > > > > [3]
> > > > >
> > > > >
> > > >
> > >
> >
> http://mail-archives.apache.org/mod_mbox/flink-user/202104.mbox/%3c1d2412ce-d4d0-ed50-6181-1b610e16d...@apache.org%3E
> > > > >
> > > > > On Mon, Apr 5, 2021 at 23:20, Lu Niu wrote:
> > > > >
> > > > > > Hi, Flink dev
> > > > > >
> > > > > > Lately, we want to develop some tools to:
> > > > > > 1. show backpressure operator without manual operation
> > > > > > 2. Provide suggestions to mitigate back pressure after checking
> > data
> > > > > skew,
> > > > > > external service RPC etc.
> > > > > > 3. Show back pressure history
> > > > > >
> > > > > > Could anyone share their experience with such tooling?
> > > > > > Also, I notice backpressure monitoring and detection is 

[jira] [Created] (FLINK-22263) Using the TIMESTAMPADD function with a partition value causes a problem when pushing the partition into the TableSource

2021-04-13 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-22263:
-

 Summary: Using the TIMESTAMPADD function with a partition value causes a 
problem when pushing the partition into the TableSource
 Key: FLINK-22263
 URL: https://issues.apache.org/jira/browse/FLINK-22263
 Project: Flink
  Issue Type: Bug
Reporter: hehuiyuan


SQL (table api):
{code:java}
CREATE CATALOG myhive
WITH (
'type' = 'hive',
'default-database' = 'hhy'
);

INSERT INTO  default_catalog.default_database.table_sink select * from  
myhive.hhy.tmp_flink_test where dt=CAST(TIMESTAMPADD(DAY, -1, CURRENT_DATE) as 
varchar);

{code}
 

Error log:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Data 
type 'INTERVAL SECOND(3) NOT NULL' with conversion class 'java.time.Duration' 
does not support a value literal of class 'java.math.BigDecimal'.Exception in 
thread "main" org.apache.flink.table.api.ValidationException: Data type 
'INTERVAL SECOND(3) NOT NULL' with conversion class 'java.time.Duration' does 
not support a value literal of class 'java.math.BigDecimal'. at 
org.apache.flink.table.expressions.ValueLiteralExpression.validateValueDataType(ValueLiteralExpression.java:286)
 at 
org.apache.flink.table.expressions.ValueLiteralExpression.(ValueLiteralExpression.java:79)
 at 
org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral(ApiExpressionUtils.java:251)
 at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitLiteral(RexNodeExtractor.scala:432)
 at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitLiteral(RexNodeExtractor.scala:340)
 at org.apache.calcite.rex.RexLiteral.accept(RexLiteral.java:1173) at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
 at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:439)
 at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:340)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
 at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:439)
 at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:340)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
 at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:439)
 at 

[jira] [Created] (FLINK-22262) Flink on Kubernetes ConfigMaps are created without OwnerReference

2021-04-13 Thread Andrea Peruffo (Jira)
Andrea Peruffo created FLINK-22262:
--

 Summary: Flink on Kubernetes ConfigMaps are created without 
OwnerReference
 Key: FLINK-22262
 URL: https://issues.apache.org/jira/browse/FLINK-22262
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.13.0
Reporter: Andrea Peruffo


According to the documentation:
[https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#manual-resource-cleanup]

The ConfigMaps created along with the Flink deployment are supposed to have an 
OwnerReference pointing to the Deployment itself. Unfortunately, this doesn't 
happen, and it causes all sorts of issues when the classpath and the jars of the 
job are updated.
For example:
Without manually removing the ConfigMap of the Job, I cannot update the jars of 
the Job.

Can you please give guidance on whether there are additional caveats to manually 
removing the ConfigMap? Is there any other workaround that can be used?

Thanks in advance.

Example ConfigMap:

{code:yaml}
apiVersion: v1
data:
  address: akka.tcp://flink@10.0.2.13:6123/user/rpc/jobmanager_2
  checkpointID-049: rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAABOEtzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAUC9tbnQvZmxpbmsvc3RvcmFnZS9rc2hhL3RheGktcmlkZS1mYXJlLXByb2Nlc3Nvci9jb21wbGV0ZWRDaGVja3BvaW50MDQ0YTc2OWRkNDgxeA==
  counter: "50"
  sessionId: 0c2b69ee-6b41-48d3-b7fd-1bf2eda94f0f
kind: ConfigMap
metadata:
  annotations:
    control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"0f25a2cc-e212-46b0-8ba9-faac0732a316","leaseDuration":15.0,"acquireTime":"2021-04-13T14:30:51.439000Z","renewTime":"2021-04-13T14:39:32.011000Z","leaderTransitions":105}'
  creationTimestamp: "2021-04-13T14:30:51Z"
  labels:
    app: taxi-ride-fare-processor
    configmap-type: high-availability
    type: flink-native-kubernetes
  name: taxi-ride-fare-processor--jobmanager-leader
  namespace: taxi-ride-fare
  resourceVersion: "64100"
  selfLink: /api/v1/namespaces/taxi-ride-fare/configmaps/taxi-ride-fare-processor--jobmanager-leader
  uid: 9f912495-382a-45de-a789-fd5ad2a2459d
{code}
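
For reference, a hedged sketch of the ownerReferences stanza one would expect on such a ConfigMap based on the description above. The name and uid are placeholders, and the controller/blockOwnerDeletion flags are assumptions, not necessarily what Flink would set:

{code:yaml}
metadata:
  ownerReferences:
    - apiVersion: apps/v1
      kind: Deployment
      name: taxi-ride-fare-processor   # the JobManager Deployment (placeholder)
      uid: <deployment-uid>            # placeholder
      controller: false                # assumption
      blockOwnerDeletion: false        # assumption
{code}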



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22261) Python StreamingModeDataStreamTests fails on Azure

2021-04-13 Thread Jark Wu (Jira)
Jark Wu created FLINK-22261:
---

 Summary: Python StreamingModeDataStreamTests fails on Azure
 Key: FLINK-22261
 URL: https://issues.apache.org/jira/browse/FLINK-22261
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.13.0
Reporter: Jark Wu
 Fix For: 1.13.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16443&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=8d78fe4f-d658-5c70-12f8-4921589024c3

{code}
2021-04-13T11:49:32.1640428Z === FAILURES 
===
2021-04-13T11:49:32.1641478Z _ 
StreamingModeDataStreamTests.test_keyed_process_function_with_state __
2021-04-13T11:49:32.1641744Z 
2021-04-13T11:49:32.1642074Z self = 

2021-04-13T11:49:32.1642359Z 
2021-04-13T11:49:32.1642606Z def 
test_keyed_process_function_with_state(self):
2021-04-13T11:49:32.1644412Z self.env.set_parallelism(1)
2021-04-13T11:49:32.1644941Z 
self.env.get_config().set_auto_watermark_interval(2000)
2021-04-13T11:49:32.1645447Z 
self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
2021-04-13T11:49:32.1647182Z data_stream = 
self.env.from_collection([(1, 'hi', '1603708211000'),
2021-04-13T11:49:32.1648276Z 
(2, 'hello', '1603708224000'),
2021-04-13T11:49:32.1661775Z 
(3, 'hi', '1603708226000'),
2021-04-13T11:49:32.1663379Z 
(4, 'hello', '1603708289000'),
2021-04-13T11:49:32.1665197Z 
(5, 'hi', '1603708291000'),
2021-04-13T11:49:32.1666200Z 
(6, 'hello', '1603708293000')],
2021-04-13T11:49:32.1666827Z
type_info=Types.ROW([Types.INT(), Types.STRING(),
2021-04-13T11:49:32.1667449Z
 Types.STRING()]))
2021-04-13T11:49:32.1667830Z 
2021-04-13T11:49:32.1668351Z class 
MyTimestampAssigner(TimestampAssigner):
2021-04-13T11:49:32.1668755Z 
2021-04-13T11:49:32.1669783Z def extract_timestamp(self, value, 
record_timestamp) -> int:
2021-04-13T11:49:32.1670386Z return int(value[2])
2021-04-13T11:49:32.1670672Z 
2021-04-13T11:49:32.1671063Z class 
MyProcessFunction(KeyedProcessFunction):
2021-04-13T11:49:32.1671434Z 
2021-04-13T11:49:32.1671727Z def __init__(self):
2021-04-13T11:49:32.1672090Z self.value_state = None
2021-04-13T11:49:32.1685812Z self.list_state = None
2021-04-13T11:49:32.1686276Z self.map_state = None
2021-04-13T11:49:32.1686609Z 
2021-04-13T11:49:32.1687039Z def open(self, runtime_context: 
RuntimeContext):
2021-04-13T11:49:32.1688350Z value_state_descriptor = 
ValueStateDescriptor('value_state', Types.INT())
2021-04-13T11:49:32.1688953Z self.value_state = 
runtime_context.get_state(value_state_descriptor)
2021-04-13T11:49:32.1689892Z list_state_descriptor = 
ListStateDescriptor('list_state', Types.INT())
2021-04-13T11:49:32.1690492Z self.list_state = 
runtime_context.get_list_state(list_state_descriptor)
2021-04-13T11:49:32.1691407Z map_state_descriptor = 
MapStateDescriptor('map_state', Types.INT(), Types.STRING())
2021-04-13T11:49:32.1692052Z self.map_state = 
runtime_context.get_map_state(map_state_descriptor)
2021-04-13T11:49:32.1692481Z 
2021-04-13T11:49:32.1693134Z def process_element(self, value, ctx):
2021-04-13T11:49:32.1693632Z current_value = 
self.value_state.value()
2021-04-13T11:49:32.1694106Z self.value_state.update(value[0])
2021-04-13T11:49:32.1694573Z current_list = [_ for _ in 
self.list_state.get()]
2021-04-13T11:49:32.1695051Z self.list_state.add(value[0])
2021-04-13T11:49:32.1695445Z map_entries_string = []
2021-04-13T11:49:32.1695902Z for k, v in self.map_state.items():
2021-04-13T11:49:32.1696822Z 
map_entries_string.append(str(k) + ': ' + str(v))
2021-04-13T11:49:32.1697700Z map_entries_string = '{' + ', 
'.join(map_entries_string) + '}'
2021-04-13T11:49:32.1698483Z self.map_state.put(value[0], 
value[1])
2021-04-13T11:49:32.1698941Z current_key = ctx.get_current_key()
2021-04-13T11:49:32.1699840Z yield "current key: {}, current 
value state: {}, current list state: {}, " \
2021-04-13T11:49:32.1700593Z   "current map state: {}, 
current value: {}".format(str(current_key),
2021-04-13T11:49:32.1701275Z 

[jira] [Created] (FLINK-22260) Source schema in CREATE TABLE LIKE statements is not inferred correctly

2021-04-13 Thread Ingo Bürk (Jira)
Ingo Bürk created FLINK-22260:
-

 Summary: Source schema in CREATE TABLE LIKE statements is not 
inferred correctly
 Key: FLINK-22260
 URL: https://issues.apache.org/jira/browse/FLINK-22260
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.13.0
Reporter: Ingo Bürk


When using a LIKE statement such as in the following (assume some_sink and 
some_source to be two tables with the same schema)
{code:java}
CREATE TEMPORARY TABLE b LIKE some_sink;
INSERT INTO b SELECT * FROM some_source;
{code}
the source schema for the INSERT operation is not actually inferred correctly, 
causing the entire query to fail:
{quote}org.apache.flink.table.api.ValidationException: Column types of query 
result and sink for registered table 'default.default.b' do not match.
Cause: Different number of columns.

Query schema: [name: STRING, ts: TIMESTAMP(3) *ROWTIME*]
Sink schema:  []
{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22259) UnalignedCheckpointITCase fails with "Value too large for header, this indicates that the test is running too long"

2021-04-13 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-22259:


 Summary: UnalignedCheckpointITCase fails with "Value too large for 
header, this indicates that the test is running too long"
 Key: FLINK-22259
 URL: https://issues.apache.org/jira/browse/FLINK-22259
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.13.0
Reporter: Dawid Wysakowicz
 Fix For: 1.13.0


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16419&view=logs&j=34f41360-6c0d-54d3-11a1-0292a2def1d9&t=2d56e022-1ace-542f-bf1a-b37dd63243f2&l=9672]

 
{code:java}
2021-04-13T07:37:31.9388503Z [ERROR] execute[pipeline with remote channels, p = 
1, timeout = 0](org.apache.flink.test.checkpointing.UnalignedCheckpointITCase)  
Time elapsed: 1,420.285 s  <<< ERROR!
2021-04-13T07:37:31.9395135Z 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
2021-04-13T07:37:31.9395717Zat 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
2021-04-13T07:37:31.9396274Zat 
org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase.execute(UnalignedCheckpointTestBase.java:168)
2021-04-13T07:37:31.9396866Zat 
org.apache.flink.test.checkpointing.UnalignedCheckpointITCase.execute(UnalignedCheckpointITCase.java:274)
2021-04-13T07:37:31.9397318Zat 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2021-04-13T07:37:31.9397723Zat 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2021-04-13T07:37:31.9398312Zat 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2021-04-13T07:37:31.9398724Zat 
java.lang.reflect.Method.invoke(Method.java:498)
2021-04-13T07:37:31.9401916Zat 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
2021-04-13T07:37:31.9402764Zat 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2021-04-13T07:37:31.9403756Zat 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
2021-04-13T07:37:31.9404222Zat 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2021-04-13T07:37:31.9404624Zat 
org.junit.rules.Verifier$1.evaluate(Verifier.java:35)
2021-04-13T07:37:31.9405008Zat 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
2021-04-13T07:37:31.9405449Zat 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
2021-04-13T07:37:31.9405855Zat 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
2021-04-13T07:37:31.9406362Zat 
org.junit.rules.RunRules.evaluate(RunRules.java:20)
2021-04-13T07:37:31.9406774Zat 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
2021-04-13T07:37:31.9407512Zat 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
2021-04-13T07:37:31.9408202Zat 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
2021-04-13T07:37:31.9408655Zat 
org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
2021-04-13T07:37:31.9409083Zat 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
2021-04-13T07:37:31.9409521Zat 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
2021-04-13T07:37:31.9410114Zat 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
2021-04-13T07:37:31.9410775Zat 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
2021-04-13T07:37:31.9411398Zat 
org.junit.runners.ParentRunner.run(ParentRunner.java:363)
2021-04-13T07:37:31.9411914Zat 
org.junit.runners.Suite.runChild(Suite.java:128)
2021-04-13T07:37:31.9412292Zat 
org.junit.runners.Suite.runChild(Suite.java:27)
2021-04-13T07:37:31.9412670Zat 
org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
2021-04-13T07:37:31.9413097Zat 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
2021-04-13T07:37:31.9413538Zat 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
2021-04-13T07:37:31.9413964Zat 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
2021-04-13T07:37:31.9414405Zat 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
2021-04-13T07:37:31.9414834Zat 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
2021-04-13T07:37:31.9415263Zat 
org.junit.rules.RunRules.evaluate(RunRules.java:20)
2021-04-13T07:37:31.9415661Zat 
org.junit.runners.ParentRunner.run(ParentRunner.java:363)
2021-04-13T07:37:31.9416099Zat 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
2021-04-13T07:37:31.9416773Zat 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
2021-04-13T07:37:31.9417404Zat 

Re: [DISCUSS] Flink should rely on flink-shaded SNAPSHOT artifacts

2021-04-13 Thread Chesnay Schepler
@Robert We can work around the snapshot limit issue fairly easily; this 
limit is imposed per version, so if we modify the version to include the 
commit hash this limit does not apply. This should also make it easier 
to work with from the Flink side because a commit hash is easier to 
copy than figuring out which timestamp you need.


@Till Yes, the release process involves too many manual steps, and I 
don't want to spend time on it unless I'm sure we don't need another 
one. Many of these cannot be automated AFAIK, like the book-keeping at 
reporter.apache.org, preparing and handling the voting threads, 
preparing the flink-web PR, managing the snapshot repository, and so on.


As for removing the SNAPSHOT version, we can enable an enforcer rule to 
forbid snapshot dependencies. It's a good safeguard in any case. We can 
furthermore automate the removal, and do it as part of the release 
branch creation.


On 4/13/2021 10:47 AM, Till Rohrmann wrote:
Thanks for creating this proposal Chesnay. I do understand the problem 
you want to fix.


What I am wondering is why we don't release flink-shaded more often. 
Does the release process cause too much overhead? If this is the case, 
then we could look into what is causing the overhead and whether we 
can improve the situation. Concerning the noise, I personally don't 
see it as a problem.


My main concern is that it can easily slip our minds to change the 
flink-shaded SNAPSHOT version to a non SNAPSHOT version and that it 
introduces another manual step. If we forget to change the version, 
then the Flink release does not build against a stable set of 
dependencies. Moreover, I also second Robert's concern that a single 
commit to flink-shaded can then break downstream projects (Flink in 
this case) if we rely on the SNAPSHOT builds. Having to scan poms for 
some references sounds like an indicator that this might not be the 
most straight forward approach.


Cheers,
Till

On Tue, Apr 13, 2021 at 9:26 AM Robert Metzger wrote:


Thanks a lot for your responses.

I didn't know that you can explicitly refer to the timestamped
snapshots of
the artifacts. The limitation to the last 2 snapshots means that a
push to
flink-shaded can break our main CI? This sounds very fragile to
me, given
that the setup itself is probably a bit uncommon and difficult to
understand.

Maybe we should add an automated check to flink-shaded that warns
if a PR
would break Flink's CI? (by checking out flink and scanning the
poms for
references to a timestamp-to-be-deleted)
Or we ask Infra to keep more than two snapshots for flink-shaded?



On Mon, Apr 12, 2021 at 4:41 PM Chesnay Schepler wrote:

> a) yes.
> b) maven by default adds a timestamp to snapshot artifacts that
we can
> use. The apache repository retains the last 2 snapshots, so we
do need
> to keep things in sync a fair bit, but there are rarely commits
made in
> flink-shaded that I don't think this will be a problem.
> c) a-SNAPSHOT-uniquesuffix => a.0
>
> On 4/12/2021 3:07 PM, Robert Metzger wrote:
> > Thanks a lot for your proposal, I'm generally open to the idea
> >
> > I have a few questions:
> > a) Does this mean that we are changing flink-shaded to deploy
snapshot
> > artifacts to Apache's snapshot maven repository, and change
Flink's
> parent
> > pom to point to this snapshot repo?
> > b) How do you plan to generate the unique SNAPSHOT version on
CI? Will we
> > increment the version on every push to flink-shaded:master ?
> > c) How do the unique SNAPSHOT versions relate to the final release
> versions?
> >
> >
> >
> >
> > On Mon, Apr 12, 2021 at 1:48 PM Konstantin Knauf wrote:
> >
> >> Sounds good. +1
> >>
> >> On Mon, Apr 12, 2021 at 1:23 PM Chesnay Schepler wrote:
> >>
> >>> Hello all,
> >>>
> >>> I would like to propose a change in how the Flink master
interacts with
> >>> Flink-shaded.
> >>>
> >>> TL;DR: Release snapshot artifacts for flink-shaded, and have
the Flink
> >>> master rely on specific snapshot versions for earlier
dependency bumps.
> >>>
> >>>
> >>> As a project we have come to the general conclusion that
dependencies
> >>> should be bumped as early in the release cycle as possible.
This both
> >>> prevents cases where some undefined amount of work is still
waiting for
us when we want to release the next version (working against
the goal
> of
> >>> always being in a releasable state), and it gives us more
time to
> >>> evaluate the stability and performance of system. Finally it
gives us
> >>> ample time to look for alternatives if an issue is found.

[jira] [Created] (FLINK-22258) Adaptive Scheduler: Show history of rescales in the Web UI

2021-04-13 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-22258:
--

 Summary: Adaptive Scheduler: Show history of rescales in the Web UI
 Key: FLINK-22258
 URL: https://issues.apache.org/jira/browse/FLINK-22258
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Runtime / Web Frontend
Reporter: Robert Metzger


As a user, I would like to see the history of rescale events in the web UI 
(number of slots used, task parallelisms)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22257) Clarify Flink ConfigOptions Usage

2021-04-13 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-22257:
---

 Summary: Clarify Flink ConfigOptions Usage
 Key: FLINK-22257
 URL: https://issues.apache.org/jira/browse/FLINK-22257
 Project: Flink
  Issue Type: Improvement
Reporter: Fabian Paul


For users, it is hard to determine which ConfigOptions are relevant for the 
different stages of a Flink application.

This ranges from the translation of the user program all the way to its execution on the 
cluster. In particular, it should be clarified which options can be configured through which 
of the different channels (a short code-based sketch follows the list):
 * Cluster configuration (i.e. flink-conf.yaml)
 * Application configuration, code-based
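
As a hedged illustration of the second channel, a minimal sketch of code-based application configuration (the checkpoint interval is just an example option; the same key can also be set cluster-wide in flink-conf.yaml as execution.checkpointing.interval):

{code:java}
// Minimal sketch: setting an option programmatically instead of via flink-conf.yaml.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class CodeBasedConfigSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Applied per application, in code; the same option exists as a cluster-level setting.
        conf.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(30));

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromElements(1, 2, 3).print();
        env.execute("config-options-sketch");
    }
}
{code}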

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22256) Persist checkpoint type information

2021-04-13 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-22256:
---

 Summary: Persist checkpoint type information
 Key: FLINK-22256
 URL: https://issues.apache.org/jira/browse/FLINK-22256
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Fabian Paul


As a user, it is difficult to determine in retrospect what kind of checkpoint 
(e.g. incremental, unaligned, ...) was performed when looking only at the 
persisted checkpoint metadata.

The only way is to look into the execution configuration of the job, which 
might not be available anymore and can be scattered across the application code 
and the cluster configuration.

It would be highly beneficial if such information were part of the 
persisted metadata, so that these external pointers do not have to be tracked.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22255) AdaptiveScheduler improvements

2021-04-13 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-22255:
-

 Summary: AdaptiveScheduler improvements
 Key: FLINK-22255
 URL: https://issues.apache.org/jira/browse/FLINK-22255
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Till Rohrmann
 Fix For: 1.14.0


This ticket collects the improvements for the {{AdaptiveScheduler}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22254) Only trigger scale up if the resources have stabilized

2021-04-13 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-22254:
-

 Summary: Only trigger scale up if the resources have stabilized
 Key: FLINK-22254
 URL: https://issues.apache.org/jira/browse/FLINK-22254
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Till Rohrmann
 Fix For: 1.14.0


In order to improve the scale up behaviour of the {{AdaptiveScheduler}} we 
should only trigger a scale up operation if the resource set has stabilized. 
That way it should be possible to immediately restart the job after its 
cancellation if the resource set has not changed in the meantime.

One way to implement this behaviour is to only notify the {{State}} about new 
resources after they have stabilized.
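
A minimal sketch of that idea (all names are hypothetical; this is not the actual AdaptiveScheduler code): new resources are only propagated to the {{State}} once the resource set has stopped changing for a configurable stabilization timeout.

{code:java}
// Hypothetical sketch, not actual Flink code.
import java.time.Duration;
import java.time.Instant;

class ResourceStabilizationGate {
    private final Duration stabilizationTimeout;
    private int lastSeenSlotCount = -1;
    private Instant lastChange = Instant.EPOCH;

    ResourceStabilizationGate(Duration stabilizationTimeout) {
        this.stabilizationTimeout = stabilizationTimeout;
    }

    /** Called whenever the set of available slots is (re-)evaluated. */
    boolean shouldNotifyStateOfNewResources(int currentSlotCount, Instant now) {
        if (currentSlotCount != lastSeenSlotCount) {
            // The resource set changed: restart the stabilization window.
            lastSeenSlotCount = currentSlotCount;
            lastChange = now;
            return false;
        }
        // Only trigger a scale-up once the set has been stable for the whole timeout.
        return Duration.between(lastChange, now).compareTo(stabilizationTimeout) >= 0;
    }
}
{code}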



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Flink should rely on flink-shaded SNAPSHOT artifacts

2021-04-13 Thread Till Rohrmann
Thanks for creating this proposal Chesnay. I do understand the problem you
want to fix.

What I am wondering is why we don't release flink-shaded more often. Does
the release process cause too much overhead? If this is the case, then we
could look into what is causing the overhead and whether we can improve the
situation. Concerning the noise, I personally don't see it as a problem.

My main concern is that it can easily slip our minds to change the
flink-shaded SNAPSHOT version to a non SNAPSHOT version and that it
introduces another manual step. If we forget to change the version, then
the Flink release does not build against a stable set of dependencies.
Moreover, I also second Robert's concern that a single commit to
flink-shaded can then break downstream projects (Flink in this case) if we
rely on the SNAPSHOT builds. Having to scan poms for some references sounds
like an indicator that this might not be the most straight forward approach.

Cheers,
Till

On Tue, Apr 13, 2021 at 9:26 AM Robert Metzger  wrote:

> Thanks a lot for your responses.
>
> I didn't know that you can explicitly refer to the timestamped snapshots of
> the artifacts. The limitation to the last 2 snapshots means that a push to
> flink-shaded can break our main CI? This sounds very fragile to me, given
> that the setup itself is probably a bit uncommon and difficult to
> understand.
>
> Maybe we should add an automated check to flink-shaded that warns if a PR
> would break Flink's CI? (by checking out flink and scanning the poms for
> references to a timestamp-to-be-deleted)
> Or we ask Infra to keep more than two snapshots for flink-shaded?
>
>
>
> On Mon, Apr 12, 2021 at 4:41 PM Chesnay Schepler 
> wrote:
>
> > a) yes.
> > b) maven by default adds a timestamp to snapshot artifacts that we can
> > use. The apache repository retains the last 2 snapshots, so we do need
> > to keep things in sync a fair bit, but there are rarely commits made in
> > flink-shaded that I don't think this will be a problem.
> > c) a-SNAPSHOT-uniquesuffix => a.0
> >
> > On 4/12/2021 3:07 PM, Robert Metzger wrote:
> > > Thanks a lot for your proposal, I'm generally open to the idea
> > >
> > > I have a few questions:
> > > a) Does this mean that we are changing flink-shaded to deploy snapshot
> > > artifacts to Apache's snapshot maven repository, and change Flink's
> > parent
> > > pom to point to this snapshot repo?
> > > b) How do you plan to generate the unique SNAPSHOT version on CI? Will
> we
> > > increment the version on every push to flink-shaded:master ?
> > > c) How do the unique SNAPSHOT versions relate to the final release
> > versions?
> > >
> > >
> > >
> > >
> > > On Mon, Apr 12, 2021 at 1:48 PM Konstantin Knauf 
> > wrote:
> > >
> > >> Sounds good. +1
> > >>
> > >> On Mon, Apr 12, 2021 at 1:23 PM Chesnay Schepler 
> > >> wrote:
> > >>
> > >>> Hello all,
> > >>>
> > >>> I would like to propose a change in how the Flink master interacts
> with
> > >>> Flink-shaded.
> > >>>
> > >>> TL;DR: Release snapshot artifacts for flink-shaded, and have the
> Flink
> > >>> master rely on specific snapshot versions for earlier dependency
> bumps.
> > >>>
> > >>>
> > >>> As a project we have come to the general conclusion that dependencies
> > >>> should be bumped as early in the release cycle as possible. This both
> > >>> prevents cases where some undefined amount of work is still waiting
> for
> > >>> us when we want to release the next version (working against the goal
> > of
> > >>> always being in a releasable state), and it gives us more time to
> > >>> evaluate the stability and performance of system. Finally it gives us
> > >>> ample time to look for alternatives if an issue is found.
> > >>>
> > >>> Currently, this conclusion is at odds with how we handle
> flink-shaded.
> > >>> Flink has always relied on flink-shaded artifacts that went through a
> > >>> proper release cycle. However, since we want to create as few
> releases
> > >>> as possible due to the overhead/noise/etc., flink-shaded releases are
> > >>> typically relegated to the end of the release cycle.
> > >>> This is particularly troublesome since flink-shaded dependencies are
> > >>> used in the core of Flink, and hence usage of them cannot be avoided.
> > >>>
> > >>> As a compromise between these 2 goals I propose the following:
> > >>> - we deploy SNAPSHOT artifacts for flink-shaded for every change made
> > >>> - every deployed artifact has a unique version, that is automatically
> > >>> set via maven (=> no overhead on our side)
> > >>> - once such an artifact is released we update the Flink dependency to
> > >>> point to this _specific_ flink-shaded snapshot artifact
> > >>>   - to be clear, this is a manual step, which implies that things
> > >>> cannot break all of a sudden because something was pushed to
> > flink-shaded
> > >>> - once the Flink release cycle ends, we publish a proper flink-shaded
> > >>> release, and change the Flink dependency in the release branch
> > 

Re: handle SUSPENDED in ZooKeeperLeaderRetrievalService

2021-04-13 Thread Till Rohrmann
Hi Chenqin,

The current rationale behind assuming a leadership loss when seeing a
SUSPENDED connection is to assume the worst and to be on the safe side.

Yang Wang is correct. FLINK-10052 [1] has the goal to make the behaviour
configurable. Unfortunately, the community did not have enough time to
complete this feature.

[1] https://issues.apache.org/jira/browse/FLINK-10052

Cheers,
Till

On Tue, Apr 13, 2021 at 8:25 AM Yang Wang  wrote:

> This might be related to FLINK-10052 [1].
> Unfortunately, we do not have any progress on this ticket.
>
> cc @Till Rohrmann 
>
> Best,
> Yang
>
> On Tue, Apr 13, 2021 at 7:31 AM chenqin  wrote:
>
>> Hi there,
>>
>> We observed several jobs running on 1.11 restart due to job leader loss.
>> Digging deeper, the issue seems related to the SUSPENDED state handler in
>> ZooKeeperLeaderRetrievalService.
>>
>> AFAIK, the suspended state is expected when ZK is not certain whether the
>> leader is still alive. It can be followed by RECONNECT or LOST. In the
>> current implementation [1], we treat the suspended state the same as the
>> lost state and actively shut down the job. This poses a stability issue on
>> large HA setups.
>>
>> My question is: can we get some insight into this decision, and could we
>> add a tunable configuration for users to decide how long they can tolerate
>> such an uncertain suspended state in their jobs.
>>
>> Thanks,
>> Chen
>>
>> [1]
>>
>> https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java#L201
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>>
>
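As a rough illustration of the tunable behaviour asked for above, here is a
minimal sketch built directly on Curator's ConnectionStateListener: it only
revokes leadership if the connection stays SUSPENDED for longer than a
configurable grace period. The class name, the grace-period wiring, and the
revocation callback are hypothetical and are not part of Flink's actual
ZooKeeperLeaderRetrievalService.

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;

/**
 * Hypothetical listener: only treat SUSPENDED as a leadership loss if the
 * connection does not recover within a configurable grace period.
 */
public class GracePeriodConnectionListener implements ConnectionStateListener {

    private final long gracePeriodMs;               // would come from a (hypothetical) config option
    private final Runnable revokeLeadershipAction;  // what happens today immediately on SUSPENDED
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    private ScheduledFuture<?> pendingRevocation;

    public GracePeriodConnectionListener(long gracePeriodMs, Runnable revokeLeadershipAction) {
        this.gracePeriodMs = gracePeriodMs;
        this.revokeLeadershipAction = revokeLeadershipAction;
    }

    @Override
    public synchronized void stateChanged(CuratorFramework client, ConnectionState newState) {
        switch (newState) {
            case SUSPENDED:
                // Do not revoke immediately; wait for RECONNECTED or LOST.
                pendingRevocation =
                        timer.schedule(revokeLeadershipAction, gracePeriodMs, TimeUnit.MILLISECONDS);
                break;
            case RECONNECTED:
                // The connection came back in time; cancel the pending revocation.
                if (pendingRevocation != null) {
                    pendingRevocation.cancel(false);
                    pendingRevocation = null;
                }
                break;
            case LOST:
                // The session is definitely gone; revoke right away.
                if (pendingRevocation != null) {
                    pendingRevocation.cancel(false);
                    pendingRevocation = null;
                }
                revokeLeadershipAction.run();
                break;
            default:
                // CONNECTED / READ_ONLY: nothing to do here.
                break;
        }
    }
}
{code}

Whether such a delay is safe depends on how long a stale leader may keep
acting before it is fenced, which is exactly the trade-off tracked in
FLINK-10052.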


[jira] [Created] (FLINK-22253) Update backpressure monitoring documentation

2021-04-13 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-22253:


 Summary: Update backpressure monitoring documentation
 Key: FLINK-22253
 URL: https://issues.apache.org/jira/browse/FLINK-22253
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Runtime / Network
Affects Versions: 1.13.0
Reporter: Dawid Wysakowicz


We should update the page: 
[https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/back_pressure/]

with the new backpressure monitoring mechanism.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Flink should rely on flink-shaded SNAPSHOT artifacts

2021-04-13 Thread Robert Metzger
Thanks a lot for your responses.

I didn't know that you can explicitly refer to the timestamped snapshots of
the artifacts. The limitation to the last 2 snapshots means that a push to
flink-shaded can break our main CI? This sounds very fragile to me, given
that the setup itself is probably a bit uncommon and difficult to
understand.

Maybe we should add an automated check to flink-shaded that warns if a PR
would break Flink's CI? (by checking out flink and scanning the poms for
references to a timestamp-to-be-deleted)
Or we ask Infra to keep more than two snapshots for flink-shaded?



On Mon, Apr 12, 2021 at 4:41 PM Chesnay Schepler  wrote:

> a) yes.
> b) maven by default adds a timestamp to snapshot artifacts that we can
> use. The apache repository retains the last 2 snapshots, so we do need
> to keep things in sync a fair bit, but commits to flink-shaded are rare
> enough that I don't think this will be a problem.
> c) a-SNAPSHOT-uniquesuffix => a.0
>
> On 4/12/2021 3:07 PM, Robert Metzger wrote:
> > Thanks a lot for your proposal, I'm generally open to the idea
> >
> > I have a few questions:
> > a) Does this mean that we are changing flink-shaded to deploy snapshot
> > artifacts to Apache's snapshot maven repository, and change Flink's
> parent
> > pom to point to this snapshot repo?
> > b) How do you plan to generate the unique SNAPSHOT version on CI? Will we
> > increment the version on every push to flink-shaded:master ?
> > c) How do the unique SNAPSHOT versions relate to the final release
> versions?
> >
> >
> >
> >
> > On Mon, Apr 12, 2021 at 1:48 PM Konstantin Knauf 
> wrote:
> >
> >> Sounds good. +1
> >>
> >> On Mon, Apr 12, 2021 at 1:23 PM Chesnay Schepler 
> >> wrote:
> >>
> >>> Hello all,
> >>>
> >>> I would like to propose a change in how the Flink master interacts with
> >>> Flink-shaded.
> >>>
> >>> TL;DR: Release snapshot artifacts for flink-shaded, and have the Flink
> >>> master rely on specific snapshot versions for earlier dependency bumps.
> >>>
> >>>
> >>> As a project we have come to the general conclusion that dependencies
> >>> should be bumped as early in the release cycle as possible. This both
> >>> prevents cases where some undefined amount of work is still waiting for
> >>> us when we want to release the next version (working against the goal of
> >>> always being in a releasable state), and it gives us more time to
> >>> evaluate the stability and performance of the system. Finally it gives us
> >>> ample time to look for alternatives if an issue is found.
> >>>
> >>> Currently, this conclusion is at odds with how we handle flink-shaded.
> >>> Flink has always relied on flink-shaded artifacts that went through a
> >>> proper release cycle. However, since we want to create as few releases
> >>> as possible due to the overhead/noise/etc., flink-shaded releases are
> >>> typically relegated to the end of the release cycle.
> >>> This is particularly troublesome since flink-shaded dependencies are
> >>> used in the core of Flink, and hence usage of them cannot be avoided.
> >>>
> >>> As a compromise between these 2 goals I propose the following:
> >>> - we deploy SNAPSHOT artifacts for flink-shaded for every change made
> >>> - every deployed artifact has a unique version, that is automatically
> >>> set via maven (=> no overhead on our side)
> >>> - once such an artifact is released we update the Flink dependency to
> >>> point to this _specific_ flink-shaded snapshot artifact
> >>>   - to be clear, this is a manual step, which implies that things
> >>> cannot break all of a sudden because something was pushed to
> flink-shaded
> >>> - once the Flink release cycle ends, we publish a proper flink-shaded
> >>> release, and change the Flink dependency in the release branch
> >> accordingly
> >>> This should give us the best of both worlds: We have as few releases as
> >>> necessary (at most 1 per Flink release cycle), but can update the
> >>> dependencies in Flink as soon as possible.
> >>> Furthermore, this can also be considered a test run for how multiple
> >>> repos with the same release cycle could be developed in sync with each
> >>> other.
> >>>
> >>> Let me know what you think.
> >>>
> >>> Regards,
> >>>
> >>> Chesnay
> >>>
> >>>
> >> --
> >>
> >> Konstantin Knauf
> >>
> >> https://twitter.com/snntrable
> >>
> >> https://github.com/knaufk
> >>
>
>
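To make the "point to this _specific_ flink-shaded snapshot artifact" step
concrete, here is a sketch of what the pin could look like in Flink's parent
pom. The flink.shaded.version property exists in Flink's pom, but the
timestamped value below is only an illustration of Maven's unique-snapshot
format (version-yyyyMMdd.HHmmss-buildNumber) and not a real deployment, and
the repository URL should be checked against the ASF snapshot repository
settings.

{code:xml}
<!-- Hypothetical pin of flink-shaded to one specific timestamped snapshot. -->
<properties>
  <!-- 14.0-20210412.143512-2 is an illustrative value, not a real artifact. -->
  <flink.shaded.version>14.0-20210412.143512-2</flink.shaded.version>
</properties>

<repositories>
  <repository>
    <id>apache.snapshots</id>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <releases>
      <enabled>false</enabled>
    </releases>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>
{code}

When the Flink release cycle ends, the property would simply be switched to
the released version (e.g. 14.0), matching Chesnay's
a-SNAPSHOT-uniquesuffix => a.0 mapping.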


[jira] [Created] (FLINK-22252) Backquotes are not rendered correctly in config option descriptions

2021-04-13 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-22252:
--

 Summary: Backquotes are not rendered correctly in config option 
descriptions
 Key: FLINK-22252
 URL: https://issues.apache.org/jira/browse/FLINK-22252
 Project: Flink
  Issue Type: Task
  Components: Documentation, Runtime / Configuration
Affects Versions: 1.13.0
Reporter: Robert Metzger
 Fix For: 1.13.0


This is how the config options are rendered in the 1.13 docs:
{code}
`none`, `off`, `disable`: No restart strategy.
`fixeddelay`, `fixed-delay`: Fixed delay restart strategy. More details can be 
found here.
`failurerate`, `failure-rate`: Failure rate restart strategy. More details can 
be found here.
{code}.
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#state-backend

Here's the rendering in the old docs.
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#restart-strategy



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Automatic backpressure detection

2021-04-13 Thread Piotr Nowojski
Hi,

Yes. Back-pressure from AsyncOperator should be correctly reported via the
isBackPressured and backPressuredTimeMsPerSecond metrics, and by extension in
the WebUI, from 1.13.

Piotrek

On Mon, Apr 12, 2021 at 11:17 PM Lu Niu  wrote:

> Hi, Piotr
>
> Thanks for your detailed reply! It is mentioned here that we cannot observe
> backpressure generated from AsyncOperator in the Flink UI in 1.9.1. Is it
> fixed in the latest version? Thank you!
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Async-Function-Not-Generating-Backpressure-td26766.html
>
> Best
> Lu
>
> On Tue, Apr 6, 2021 at 11:14 PM Piotr Nowojski 
> wrote:
>
> > Hi,
> >
> > Yes, you can use `isBackPressured` to monitor a task's back-pressure.
> > However keep in mind:
> > a) You are going to miss some nice way to visualize this information,
> which
> > is present in 1.13's WebUI.
> > b) `isBackPressured` is a sampling based metric. If your job has varying
> > load, for example all windows firing at the same processing time, every
> > couple of seconds, causing intermittent back-pressure, this metric will
> > show it randomly as `true` or `false`.
> > c) `isBackPressured` is slightly less accurate compared to
> > `backPressuredTimeMsPerSecond`. There are some corner cases when, for a
> > brief amount of time, it can return `true` while a task is still running,
> > whereas the time-based metrics work in a different, much more accurate way.
> >
> > About back porting the patches, if you want to create a custom Flink
> build
> > it should be do-able. There will be some conflicts for sure, so you will
> > need to understand Flink's code.
> >
> > Best,
> > Piotrek
> >
> > On Wed, Apr 7, 2021 at 2:32 AM Lu Niu  wrote:
> >
> > > Hi, Piotr
> > >
> > > Thanks for replying!
> > >
> > > We don't have a plan to upgrade to 1.13 in short term. We are using
> flink
> > > 1.11 and I notice there is a metric called isBackpressured. Is that
> > enough
> > > to solve 1? If not, would backporting patches regarding
> > > backPressuredTimeMsPerSecond, busyTimeMsPerSecond and
> idleTimeMsPerSecond
> > > work? And do you have an estimate of how difficult it is?
> > >
> > >
> > > Best
> > > Lu
> > >
> > >
> > >
> > > On Tue, Apr 6, 2021 at 12:18 AM Piotr Nowojski 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Lately we overhauled the backpressure detection [1] and a screenshot
> > > > preview of those efforts is attached here [2]. I encourage you to
> check
> > > the
> > > > 1.13 RC0 build and how the current mechanism works for you [3]. To
> > > support
> > > > those WebUI changes we have added a couple of new metrics:
> > > > backPressuredTimeMsPerSecond, busyTimeMsPerSecond and
> > > idleTimeMsPerSecond.
> > > >
> > > > 1. I believe that solves 1.
> > > > 2. This still requires a bit of manual investigation. Once you locate
> > > > the backpressuring task, you can check the detailed subtask stats to
> > > > check if all parallel instances are uniformly backpressured/busy or
> > > > not. If you would like a hint such as "it looks like you have a data
> > > > skew in Task XYZ", that I believe could be added to the WebUI.
> > > > 3. The tricky part is how to display this kind of information. Currently
> > > > I would recommend just exporting/reporting the
> > > > backPressuredTimeMsPerSecond, busyTimeMsPerSecond and
> > > > idleTimeMsPerSecond metrics for every task to an external system and
> > > > displaying them, for example, in Grafana.
> > > >
> > > > The blog post you are referencing is quite outdated, especially with
> > > those
> > > > new changes from 1.13. I'm hoping to write a new one pretty soon.
> > > >
> > > > Piotrek
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-14712
> > > > [2]
> > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/browse/FLINK-14814?focusedCommentId=17256926=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17256926
> > > > [3]
> > > >
> > > >
> > >
> >
> http://mail-archives.apache.org/mod_mbox/flink-user/202104.mbox/%3c1d2412ce-d4d0-ed50-6181-1b610e16d...@apache.org%3E
> > > >
> > > > On Mon, Apr 5, 2021 at 11:20 PM Lu Niu  wrote:
> > > >
> > > > > Hi, Flink dev
> > > > >
> > > > > Lately, we want to develop some tools to:
> > > > > 1. show backpressure operator without manual operation
> > > > > 2. Provide suggestions to mitigate back pressure after checking
> data
> > > > skew,
> > > > > external service RPC etc.
> > > > > 3. Show back pressure history
> > > > >
> > > > > Could anyone share their experience with such tooling?
> > > > > Also, I notice backpressure monitoring and detection is mentioned
> > > across
> > > > > multiple places. Could someone help to explain how these connect to
> > > each
> > > > > other? Maybe some of them are outdated? Thanks!
> > > > >
> > > > > 1. The official doc introduces monitoring back pressure through web
> > UI.
> > > > >
> > > > >
> > > >
> > >
> >
> 
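As a starting point for the export/compare approach described above, here is a
small sketch that polls the aggregated per-subtask back-pressure metric through
the JobManager REST API and flags a possible skew. The host, job and vertex
IDs, the exact endpoint path, and the shape of the JSON response are
assumptions to verify against the REST API documentation for your Flink
version.

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BackpressureSkewCheck {

    public static void main(String[] args) throws Exception {
        // All of these are placeholders for your own cluster and job.
        String restBase = "http://localhost:8081";
        String jobId = "<job-id>";
        String vertexId = "<vertex-id>";

        // Aggregated subtask metrics endpoint (verify the path against the REST API docs).
        String url = restBase + "/jobs/" + jobId + "/vertices/" + vertexId
                + "/subtasks/metrics?get=backPressuredTimeMsPerSecond";

        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                HttpRequest.newBuilder(URI.create(url)).GET().build(),
                HttpResponse.BodyHandlers.ofString());

        String body = response.body();
        System.out.println("Raw response: " + body);

        // Crude extraction of the aggregated min/max values; a real tool would use a JSON library.
        double min = extract(body, "min");
        double max = extract(body, "max");

        // A large min/max gap suggests that only some subtasks are back-pressured, i.e. a skew.
        if (max > 0 && max - min > 500) {
            System.out.println("Back-pressure looks skewed across subtasks (min=" + min + ", max=" + max + ")");
        } else {
            System.out.println("Back-pressure looks roughly uniform (min=" + min + ", max=" + max + ")");
        }
    }

    private static double extract(String json, String field) {
        Matcher m = Pattern.compile("\"" + field + "\"\\s*:\\s*\"?([0-9.E-]+)\"?").matcher(json);
        return m.find() ? Double.parseDouble(m.group(1)) : 0.0;
    }
}
{code}

backPressuredTimeMsPerSecond is reported in milliseconds of back-pressure per
second (0-1000), so a large gap between the per-subtask min and max usually
points at a data skew rather than a uniformly slow downstream operator.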

Re: handle SUSPENDED in ZooKeeperLeaderRetrievalService

2021-04-13 Thread Yang Wang
This might be related to FLINK-10052 [1].
Unfortunately, we do not have any progress on this ticket.

cc @Till Rohrmann 

Best,
Yang

On Tue, Apr 13, 2021 at 7:31 AM chenqin  wrote:

> Hi there,
>
> We observed several jobs running on 1.11 restart due to job leader loss.
> Digging deeper, the issue seems related to the SUSPENDED state handler in
> ZooKeeperLeaderRetrievalService.
>
> AFAIK, the suspended state is expected when ZK is not certain whether the
> leader is still alive. It can be followed by RECONNECT or LOST. In the
> current implementation [1], we treat the suspended state the same as the
> lost state and actively shut down the job. This poses a stability issue on
> large HA setups.
>
> My question is: can we get some insight into this decision, and could we
> add a tunable configuration for users to decide how long they can tolerate
> such an uncertain suspended state in their jobs.
>
> Thanks,
> Chen
>
> [1]
>
> https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java#L201
>
>
>
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>


[jira] [Created] (FLINK-22251) PrometheusReporterEndToEndITCase.testReporter fail because Dispatcher REST endpoint did not start in time.

2021-04-13 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-22251:
-

 Summary: PrometheusReporterEndToEndITCase.testReporter fail 
because Dispatcher REST endpoint did not start in time.
 Key: FLINK-22251
 URL: https://issues.apache.org/jira/browse/FLINK-22251
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.12.2
Reporter: Guowei Ma


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16363=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529=26526


{code:java}


Apr 12 12:48:58 [ERROR] Tests run: 6, Failures: 1, Errors: 0, Skipped: 0, Time 
elapsed: 114.355 s <<< FAILURE! - in 
org.apache.flink.metrics.prometheus.tests.PrometheusReporterEndToEndITCase
Apr 12 12:48:58 [ERROR] testReporter[0: Jar in 'lib', instantiated via 
reflection](org.apache.flink.metrics.prometheus.tests.PrometheusReporterEndToEndITCase)
  Time elapsed: 55.061 s  <<< FAILURE!
Apr 12 12:48:58 java.lang.AssertionError: Dispatcher REST endpoint did not 
start in time.
Apr 12 12:48:58 at 
org.apache.flink.tests.util.flink.FlinkDistribution.startFlinkCluster(FlinkDistribution.java:148)
Apr 12 12:48:58 at 
org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource.startCluster(LocalStandaloneFlinkResource.java:133)
Apr 12 12:48:58 at 
org.apache.flink.metrics.prometheus.tests.PrometheusReporterEndToEndITCase.testReporter(PrometheusReporterEndToEndITCase.java:231)
Apr 12 12:48:58 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
Apr 12 12:48:58 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Apr 12 12:48:58 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Apr 12 12:48:58 at java.lang.reflect.Method.invoke(Method.java:498)
Apr 12 12:48:58 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
Apr 12 12:48:58 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
Apr 12 12:48:58 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
Apr 12 12:48:58 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
Apr 12 12:48:58 at 
org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
Apr 12 12:48:58 at 
org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
Apr 12 12:48:58 at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
Apr 12 12:48:58 at 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
Apr 12 12:48:58 at org.junit.rules.RunRules.evaluate(RunRules.java:20)
Apr 12 12:48:58 at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
Apr 12 12:48:58 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
Apr 12 12:48:58 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
Apr 12 12:48:58 at 
org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
Apr 12 12:48:58 at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
Apr 12 12:48:58 at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
Apr 12 12:48:58 at 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
Apr 12 12:48:58 at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
Apr 12 12:48:58 at 
org.junit.runners.ParentRunner.run(ParentRunner.java:363)


{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)