[ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award

2023-07-03 Thread Xintong Song
Dear Community,

I'm pleased to share this good news with everyone. As some of you may have
already heard, Apache Flink has won the 2023 SIGMOD Systems Award [1].

"Apache Flink greatly expanded the use of stream data-processing." --
SIGMOD Awards Committee

SIGMOD is one of the most influential data management research conferences
in the world. The Systems Award is awarded to an individual or set of
individuals to recognize the development of a software or hardware system
whose technical contributions have had significant impact on the theory or
practice of large-scale data management systems. Winning this award
reflects academia's high recognition of Flink's technological advancement
and industry influence.

As an open-source project, Flink wouldn't have come this far without the
wide, active and supportive community behind it. Kudos to all of us who
helped make this happen, including the over 1,400 contributors and many
others who contributed in ways beyond code.

Best,

Xintong (on behalf of the Flink PMC)


[1] https://sigmod.org/2023-sigmod-systems-award/


Re: [DISCUSS] Issue tracking workflow

2022-10-24 Thread Xintong Song
I agree with you that option 1) would be the best for us. Let's keep hoping
for the best.

Option 4), as you said, comes at a price. At the moment, I don't have
thorough answers to your questions.

Just one quick response: I think there's a good chance that we can import
the current Jira tickets into GH. Jira supports exporting issues, with the
fields you specify, as CSV/XML/... files. With a brief Google search, I
found some tools that help bulk-create issues in GH. E.g.,
github-csv-tools [1] is described as supporting the import of issues with
title, body, labels, status and milestones from a CSV file. That's probably
good enough for us to search/filter the issues in GH, and a link to the
Jira ticket can be posted in the GH issue for the complete conversation
history and other details.
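
For illustration, here is a minimal sketch of how such a bulk import could
also be scripted directly against the GitHub REST API (the target
repository, the CSV-derived title/body and the jira-import label are
hypothetical placeholders; a real migration would additionally need CSV
parsing, pagination and rate-limit handling):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JiraToGithubImport {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // One POST per Jira ticket; title/body would come from the exported CSV rows.
        String json = "{\"title\":\"[FLINK-12345] Example ticket\","
                + "\"body\":\"Imported from Jira. Full history: "
                + "https://issues.apache.org/jira/browse/FLINK-12345\","
                + "\"labels\":[\"jira-import\"]}";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://api.github.com/repos/apache/flink/issues"))
                .header("Authorization", "Bearer " + System.getenv("GITHUB_TOKEN"))
                .header("Accept", "application/vnd.github+json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}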

Best,

Xintong


[1] https://github.com/gavinr/github-csv-tools



On Mon, Oct 24, 2022 at 3:58 PM Martijn Visser 
wrote:

> Hi Xintong,
>
> I'm also not in favour of option 2; I think that two systems will result
> in an administrative burden and a less efficient workflow. I'm also not in
> favour of option 3; I think that this will result in first-time
> users/contributors not filing their first bug report, user question or
> feature request.
>
> I'm still hoping for option 1 as long as the discussion with Infra is not
> finished.
>
> If we assume that option 1 won't be possible, then I think option 4 will
> be the best option left. I'm not necessarily in favour, because of a number
> of problems it will introduce:
>
> 1. I don't think importing current Jira tickets into Github Issues is a
> realistic option. So we would have two administrations. Before you create a
> new ticket, you should check if it exists both as a Jira ticket and as a
> Github Issue.
> 2. How would we deal with completing a PR? There must be one leading
> administration for the changelog generation (to avoid missing an item),
> which could then only be Github Issues. So would we require all PRs that
> are merged to have a corresponding Github Issue?
> 3. There's no longer one central administration, which is especially
> valuable to track all issues across projects like the different connectors,
> Flink ML, Table Store etc.
> 4. Our current CI labeling works on the Jira issues, not on the Github
> Issues labels.
>
> Best regards,
>
> Martijn
>
> On Mon, Oct 24, 2022 at 7:29 AM Xintong Song 
> wrote:
>
>> Hi devs and users,
>>
>> As many of you may have already noticed, Infra announced that they will
>> soon disable public Jira account signups [1]. That means, in order for
>> someone who is not yet a Jira user to open or comment on an issue, he/she
>> has to first reach out to a PMC member to create an account for him/her.
>> This raises the bar for new contributors and users to participate in
>> community interactions, making it necessary for us to consider whether (and
>> how) we should change our issue tracking workflows.
>>
>> I can see a few possible options.
>>
>> 1. Reaching out to Infra and trying to change their mind on this
>> decision. I’ve already been trying this [2], and so far the feedback seems
>> unoptimistic.
>> 2. Using both Jira (for development issues) & Github Issues (for
>> customer-facing issues), as Infra suggested.
>> 3. Staying with Jira only, so that new Jira users need to ask on the
>> mailing lists / Slack for account creation.
>> 4. Migrating to Github Issues completely.
>>
>> Personally, I’m leaning toward option 4).
>>
>> TBH, I don’t see any good reason for option 2). I’d expect using two
>> different issue tracking tools at the same time would be complex and
>> chaotic. Option 3) is probably more friendly to existing developers and
>> users, while being less friendly to newcomers. Option 4), on the contrary,
>> is more friendly to newcomers, at some migration cost, which might be
>> non-trivial but is paid once and for all.
>>
>> Github issues have been widely used by many open source projects,
>> including Kubernetes, Flink CDC, and Apache projects Iceberg and Airflow.
>> With a set of well-designed labels, we should be able to achieve most of
>> the Jira functions / features that we currently rely on. Moreover, it
>> better integrates the issue tracking and code contributing systems, and
>> would be easier to access (I believe there are more GH users than Jira /
>> mailing-list users).
>>
>> All in all, I’d suggest keeping an eye on Infra’s feedback on option 1),
>> while taking steps (investigation, workflow & label design) to prepare for
>> option 4).
>>
>> Looking forward to hearing what you think about this.
>>
>> Best,
>>
>> Xintong
>>
>>
>> [1] https://lists.apache.org/thread/jx9d7sp690ro660pjpttwtg209w3m39w
>>
>> [2] https://lists.apache.org/thread/fjjtk30dxf6fyoo4q3rmkhh028or40fw
>>
>>


[DISCUSS] Issue tracking workflow

2022-10-23 Thread Xintong Song
Hi devs and users,

As many of you may have already noticed, Infra announced that they will
soon disable public Jira account signups [1]. That means, in order for
someone who is not yet a Jira user to open or comment on an issue, he/she
has to first reach out to a PMC member to create an account for him/her.
This raises the bar for new contributors and users to participate in
community interactions, making it necessary for us to consider whether (and
how) we should change our issue tracking workflows.

I can see a few possible options.

1. Reaching out to Infra and trying to change their mind on this decision.
I’ve already been trying this [2], and so far the feedback seems
unoptimistic.
2. Using both Jira (for development issues) & Github Issues (for
customer-facing issues), as Infra suggested.
3. Staying with Jira only, so that new Jira users need to ask on the
mailing lists / Slack for account creation.
4. Migrating to Github Issues completely.

Personally, I’m leaning toward option 4).

TBH, I don’t see any good reason for option 2). I’d expect using two
different issue tracking tools at the same time would be complex and
chaotic. Option 3) is probably more friendly to existing developers and
users, while being less friendly to newcomers. Option 4), on the contrary,
is more friendly to newcomers, at some migration cost, which might be
non-trivial but is paid once and for all.

Github issues have been widely used by many open source projects, including
Kubernetes, Flink CDC, and Apache projects Iceberg and Airflow. With a set
of well-designed labels, we should be able to achieve most of the Jira
functions / features that we currently rely on. Moreover, it better
integrates the issue tracking and code contributing systems, and would be
easier to access (I believe there are more GH users than Jira /
mailing-list users).

All in all, I’d suggest keeping an eye on Infra’s feedback on option 1),
while taking steps (investigation, workflow & label design) to prepare for
option 4).

Looking forward to hearing what you think about this.

Best,

Xintong


[1] https://lists.apache.org/thread/jx9d7sp690ro660pjpttwtg209w3m39w

[2] https://lists.apache.org/thread/fjjtk30dxf6fyoo4q3rmkhh028or40fw


Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-13 Thread Xintong Song
hain anymore.
>>>>> > >
>>>>> > > What I would suggest is to stick with what we got (although I
>>>>> despise the name numRecordsSend), and alias the numRecordsOut metric for
>>>>> all non-TwoPhaseCommittingSink.
>>>>> > >
>>>>> > > On 11/10/2022 05:54, Qingsheng Ren wrote:
>>>>> > >
>>>>> > > Thanks for the details Chesnay!
>>>>> > >
>>>>> > > By “alias” I mean to respect the original definition made in
>>>>> FLIP-33 for numRecordsOut, which is the number of records written to the
>>>>> external system, and keep numRecordsSend as the same value as 
>>>>> numRecordsOut
>>>>> for compatibility.
>>>>> > >
>>>>> > > I think keeping numRecordsOut for the output to the external
>>>>> system is more intuitive to end users because in most cases the metric of
>>>>> data flow output is more essential. I agree with you that a new metric is
>>>>> required, but considering compatibility and users’ intuition I prefer to
>>>>> keep the initial definition of numRecordsOut in FLIP-33 and name a new
>>>>> metric for sink writer’s output to downstream operators. This might be
>>>>> against consistency with metrics in other operators in Flink but maybe 
>>>>> it’s
>>>>> acceptable to have the sink as a special case.
>>>>> > >
>>>>> > > Best,
>>>>> > > Qingsheng
>>>>> > > On Oct 10, 2022, 19:13 +0800, Chesnay Schepler ,
>>>>> wrote:
>>>>> > >
>>>>> > > > I’m with Xintong’s idea to treat numXXXSend as an alias of
>>>>> numXXXOut
>>>>> > >
>>>>> > > But that's not possible. If it were that simple there would have
>>>>> never been a need to introduce another metric in the first place.
>>>>> > >
>>>>> > > It's a rather fundamental issue with how the new sinks work, in
>>>>> that they emit data to the external system (usually considered as
>>>>> "numRecordsOut" of sinks) while _also_ sending data to a downstream
>>>>> operator (usually considered as "numRecordsOut" of tasks).
>>>>> > > The original issue was that the numRecordsOut of the sink counted
>>>>> both (which is completely wrong).
>>>>> > >
>>>>> > > A new metric was always required; otherwise you inevitably end up
>>>>> breaking some semantic.
>>>>> > > Adding a new metric for what the sink writes to the external
>>>>> system is, for better or worse, more consistent with how these metrics
>>>>> usually work in Flink.
>>>>> > >
>>>>> > > On 10/10/2022 12:45, Qingsheng Ren wrote:
>>>>> > >
>>>>> > > Thanks everyone for joining the discussion!
>>>>> > >
>>>>> > > > Do you have any idea what has happened in the process here?
>>>>> > >
>>>>> > > The discussion in this PR [1] shows some details and could be
>>>>> helpful to understand the original motivation of the renaming. We do have 
>>>>> a
>>>>> test case for guarding metrics, but unfortunately the case was also modified,
>>>>> so the defense was broken.
>>>>> > >
>>>>> > > I think the reason why both the developer and the reviewer forgot
>>>>> to trigger a discussion and gave a green pass on the change is that
>>>>> metrics seem too “trivial” to be noticed as public APIs. As mentioned by
>>>>> Martijn, I couldn’t find a place noting that metrics are public APIs and
>>>>> should be treated carefully while contributing and reviewing.
>>>>> > >
>>>>> > > IMHO three actions could be taken to prevent this kind of change
>>>>> in the future:
>>>>> > >
>>>>> > > a. Add test case for metrics (which we already have in
>>>>> SinkMetricsITCase)
>>>>> > > b. We emphasize that any public-interface breaking changes should
>>>>> be proposed by a FLIP or discussed in mailing list, and should be listed 
>>>>> in
>>>>> the release note.

Re: Is Flink 1.15.3 planned in foreseeable future?

2022-10-13 Thread Xintong Song
Actually, there is an ongoing discussion related to 1.15.3. The community
discovered a breaking change in 1.15.x and is discussing how to resolve it
right now [1]. There will very likely be a 1.15.3 release once this is
resolved.

Best,

Xintong


[1] https://lists.apache.org/thread/vxhty3q97s7pw2zn0jhkyd6sxwwodzbv

On Thu, Oct 13, 2022 at 3:51 PM Maciek Próchniak  wrote:

> Hello,
>
> I suppose that committers are heavily concentrated on 1.16, but are
> there plans to have 1.15.3 out?
>
> We've been affected by https://issues.apache.org/jira/browse/FLINK-28488
> and it's preventing us from using 1.15.x at this moment.
>
>
> thanks,
>
> maciek
>
>


Re: Job Manager getting restarted while restarting task manager

2022-10-12 Thread Xintong Song
I meant that your jobmanager also received a SIGTERM signal, and you would
need to figure out where it came from.

To be specific, this line of log:

> 2022-10-11 22:11:21,683 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - RECEIVED
> SIGNAL 15: SIGTERM. Shutting down as requested.
>

I believe this is from the jobmanager log, as `ClusterEntrypoint` is a
class used by jobmanager only.
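
In a Kubernetes deployment, for example (a hedged illustration; the pod
name is a placeholder), the pod's events and last container state usually
reveal whether it was an eviction, an OOM kill or a deliberate deletion:

kubectl describe pod <jobmanager-pod>          # check the Events section
kubectl get events --sort-by=.lastTimestamp    # evictions / kills cluster-wide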

Best,

Xintong



On Thu, Oct 13, 2022 at 9:06 AM yu'an huang  wrote:

> Hi,
>
> Which deployment mode do you use? What is the Flink version?
> I think killing TaskManagers won't make the JobMananger restart. You can
> provide the whole log as an attachment to investigate.
>
> On Wed, 12 Oct 2022 at 6:01 PM, Puneet Duggal 
> wrote:
>
>> Hi Xintong Song,
>>
>> Thanks for your immediate reply. Yes, I do restart the task manager via a
>> kill command and then a flink restart, because I have seen cases where a
>> simple flink restart does not pick up the latest configuration. But what I
>> am confused about is why killing the task manager process and then
>> restarting it causes the job manager to stop and restart.
>>
>> Regards,
>> Puneet
>>
>>
>> On 12-Oct-2022, at 7:33 AM, Xintong Song  wrote:
>>
>> The log shows that the jobmanager received a SIGTERM signal from
>> external. Depending on how you deploy Flink, that could be a 'kill '
>> command, or a kubernetes pod removal / eviction, etc. You may want to check
>> where the signal came from.
>>
>> Best,
>> Xintong
>>
>>
>>
>> On Wed, Oct 12, 2022 at 6:26 AM Puneet Duggal 
>> wrote:
>>
>>> Hi,
>>>
>>> I am facing an issue where restarting a task manager after adding some
>>> configuration changes, even though the task manager restarts successfully
>>> with the updated configuration, causes the leader job manager to restart
>>> as well. Pasting the leader job manager logs here:
>>>
>>>
>>> 2022-10-11 22:11:02,207 WARN  akka.remote.ReliableDeliverySupervisor
>>>[] - Association with remote system [
>>> akka.tcp://flink@:35376] has failed, address is now gated for
>>> [50] ms. Reason: [Disassociated]
>>> 2022-10-11 22:11:02,411 WARN
>>> akka.remote.transport.netty.NettyTransport   [] - Remote
>>> connection to [null] failed with java.net.ConnectException: Connection
>>> refused: /:35376
>>> 2022-10-11 22:11:02,413 WARN  akka.remote.ReliableDeliverySupervisor
>>>[] - Association with remote system [
>>> akka.tcp://flink@:35376] has failed, address is now gated for
>>> [50] ms. Reason: [Association failed with [
>>> akka.tcp://flink@:35376]] Caused by: [java.net.ConnectException:
>>> Connection refused: /:35376]
>>> 2022-10-11 22:11:02,682 WARN
>>> akka.remote.transport.netty.NettyTransport   [] - Remote
>>> connection to [null] failed with java.net.ConnectException: Connection
>>> refused: /:35376
>>> 2022-10-11 22:11:02,683 WARN  akka.remote.ReliableDeliverySupervisor
>>>[] - Association with remote system [
>>> akka.tcp://flink@:35376] has failed, address is now gated for
>>> [50] ms. Reason: [Association failed with [
>>> akka.tcp://flink@:35376]] Caused by: [java.net.ConnectException:
>>> Connection refused: /:35376]
>>> 2022-10-11 22:11:12,702 WARN
>>> akka.remote.transport.netty.NettyTransport   [] - Remote
>>> connection to [null] failed with java.net.ConnectException: Connection
>>> refused: /:35376
>>> 2022-10-11 22:11:12,703 WARN  akka.remote.ReliableDeliverySupervisor
>>>[] - Association with remote system [
>>> akka.tcp://flink@:35376] has failed, address is now gated for
>>> [50] ms. Reason: [Association failed with [
>>> akka.tcp://flink@:35376]] Caused by: [java.net.ConnectException:
>>> Connection refused: /:35376]
>>> 2022-10-11 22:11:21,683 INFO
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - RECEIVED
>>> SIGNAL 15: SIGTERM. Shutting down as requested.
>>> 2022-10-11 22:11:21,687 INFO  org.apache.flink.runtime.blob.BlobServer
>>>[] - Stopped BLOB server at 0.0.0.0:33887
>>>
>>>
>>> Regards,
>>> Puneet
>>>
>>>
>>>
>>


Re: Job Manager getting restarted while restarting task manager

2022-10-11 Thread Xintong Song
The log shows that the jobmanager received a SIGTERM signal from external.
Depending on how you deploy Flink, that could be a 'kill ' command, or
a kubernetes pod removal / eviction, etc. You may want to check where the
signal came from.

Best,

Xintong



On Wed, Oct 12, 2022 at 6:26 AM Puneet Duggal 
wrote:

> Hi,
>
> I am facing an issue where restarting a task manager after adding some
> configuration changes, even though the task manager restarts successfully
> with the updated configuration, causes the leader job manager to restart
> as well. Pasting the leader job manager logs here:
>
>
> 2022-10-11 22:11:02,207 WARN  akka.remote.ReliableDeliverySupervisor
>  [] - Association with remote system 
> [akka.tcp://flink@:35376]
> has failed, address is now gated for [50] ms. Reason: [Disassociated]
> 2022-10-11 22:11:02,411 WARN  akka.remote.transport.netty.NettyTransport
>  [] - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: /:35376
> 2022-10-11 22:11:02,413 WARN  akka.remote.ReliableDeliverySupervisor
>  [] - Association with remote system 
> [akka.tcp://flink@:35376]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@:35376]] Caused by:
> [java.net.ConnectException: Connection refused: /:35376]
> 2022-10-11 22:11:02,682 WARN  akka.remote.transport.netty.NettyTransport
>  [] - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: /:35376
> 2022-10-11 22:11:02,683 WARN  akka.remote.ReliableDeliverySupervisor
>  [] - Association with remote system 
> [akka.tcp://flink@:35376]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@:35376]] Caused by:
> [java.net.ConnectException: Connection refused: /:35376]
> 2022-10-11 22:11:12,702 WARN  akka.remote.transport.netty.NettyTransport
>  [] - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: /:35376
> 2022-10-11 22:11:12,703 WARN  akka.remote.ReliableDeliverySupervisor
>  [] - Association with remote system 
> [akka.tcp://flink@:35376]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@:35376]] Caused by:
> [java.net.ConnectException: Connection refused: /:35376]
> 2022-10-11 22:11:21,683 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - RECEIVED
> SIGNAL 15: SIGTERM. Shutting down as requested.
> 2022-10-11 22:11:21,687 INFO  org.apache.flink.runtime.blob.BlobServer
>  [] - Stopped BLOB server at 0.0.0.0:33887
>
>
> Regards,
> Puneet
>
>
>


Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-10 Thread Xintong Song
+1 for reverting these changes in Flink 1.16.

For 1.15.3, can we make these metrics available via both names (numXXXOut
and numXXXSend)? In this way, we don't break things for those who have
already migrated to 1.15 and numXXXSend. That means we still need to change
SinkWriterOperator to use another metric name in 1.15.3, which IIUC is
internal to the Flink sink.
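
For illustration, a minimal sketch of what such an alias could look like
(hedged: this is not the actual SinkWriterOperator code, just the general
mechanism of registering one counter under two names via Flink's
MetricGroup, so that reporters see identical values for both):

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;

public class AliasedSinkMetrics {
    // Register a single counter instance under both names, so that
    // numRecordsOut and numRecordsSend always report the same value.
    public static Counter registerAliased(MetricGroup group) {
        Counter records = new SimpleCounter();
        group.counter("numRecordsOut", records);
        group.counter("numRecordsSend", records);
        return records;
    }
}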

I'm overall +1 to changing numXXXOut back to its original semantics. AFAIK
(from meetup / flink-forward questionnaires), most users do not migrate to
a new Flink release immediately, but wait until the next 1-2 major releases
are out.

Best,

Xintong



On Mon, Oct 10, 2022 at 5:26 PM Martijn Visser 
wrote:

> Hi Qingsheng,
>
> Do you have any idea what has happened in the process here? Do we know why
> they were changed? I was under the impression that these metric names were
> newly introduced due to the new interfaces and because it still depends on
> each connector implementing these.
>
> Sidenote: metric names are not mentioned in the FLIP process as a public
> API. Might make sense to have a separate follow-up to add that to the list
> (I do think we should list them there).
>
> +1 for reverting this and make this change in Flink 1.16
>
> I'm not in favour of releasing a Flink 1.15.3 with this change: I think the
> impact is too big for a patch version, especially given how long Flink 1.15
> has already been out there.
>
> Best regards,
>
> Martijn
>
> On Mon, Oct 10, 2022 at 11:13 AM Leonard Xu  wrote:
>
> > Thanks Qingsheng for starting this thread.
> >
> > +1 on reverting sink metric name and releasing 1.15.3 to fix this
> > inconsistent behavior.
> >
> >
> > Best,
> > Leonard
> >
> >
> >
> >
> >
> > On Oct 10, 2022 at 3:06 PM, Jark Wu wrote:
> >
> > Thanks for discovering this problem, Qingsheng!
> >
> > I'm also +1 for reverting the breaking changes.
> >
> > IIUC, currently, the behavior of "numXXXOut" metrics of the new and old
> > sink is inconsistent.
> > We have to break one of them to have consistent behavior. Sink V2 is an
> > evolving API which is just introduced in 1.15.
> > I think it makes sense to break the unstable API instead of the stable
> API
> > which many connectors and users depend on.
> >
> > Best,
> > Jark
> >
> >
> >
> > On Mon, 10 Oct 2022 at 11:36, Jingsong Li 
> wrote:
> >
> >> Thanks for driving, Qingsheng.
> >>
> >> +1 for reverting sink metric name.
> >>
> >> We often forget that metric is also one of the important APIs.
> >>
> >> +1 for releasing 1.15.3 to fix this.
> >>
> >> Best,
> >> Jingsong
> >>
> >> On Sun, Oct 9, 2022 at 11:35 PM Becket Qin 
> wrote:
> >> >
> >> > Thanks for raising the discussion, Qingsheng,
> >> >
> >> > +1 on reverting the breaking changes.
> >> >
> >> > In addition, we might want to release a 1.15.3 to fix this and update
> >> the previous release docs with this known issue, so that users can
> upgrade
> >> to 1.15.3 when they hit it. It would also be good to add some backwards
> >> compatibility tests on metrics to avoid unintended breaking changes like
> >> this in the future.
> >> >
> >> > Thanks,
> >> >
> >> > Jiangjie (Becket) Qin
> >> >
> >> > On Sun, Oct 9, 2022 at 10:35 AM Qingsheng Ren 
> wrote:
> >> >>
> >> >> Hi devs and users,
> >> >>
> >> >> I’d like to start a discussion about reverting a breaking change
> about
> >> sink metrics made in 1.15 by FLINK-26126 [1] and FLINK-26492 [2].
> >> >>
> >> >> TL;DR
> >> >>
> >> >> All sink metrics with the name “numXXXOut” defined in FLIP-33 are replaced
> >> by “numXXXSend” in FLINK-26126 and FLINK-26492. Considering that metric names
> >> are public APIs, this is a breaking change for end users and not backward
> >> compatible. Also, unfortunately, this breaking change was not discussed on
> >> the mailing list before.
> >> >>
> >> >> Background
> >> >>
> >> >> As defined previously in FLIP-33 (the FLIP page has been changed so
> >> please refer to the old version [3] ), metric “numRecordsOut” is used
> for
> >> reporting the total number of output records since the sink started
> (number
> >> of records written to the external system), and similarly for
> >> “numRecordsOutPerSecond”, “numBytesOut”, “numBytesOutPerSecond” and
> >> “numRecordsOutError”. Most sinks are following this naming and
> definition.
> >> However, these metrics are ambiguous in the new Sink API as “numXXXOut”
> >> could be used by the output of SinkWriterOperator for reporting number
> of
> >> Committables delivered to SinkCommitterOperator. In order to resolve the
> >> conflict, FLINK-26126 and FLINK-26492 changed the names of these metrics
> >> to
> >> “numXXXSend”.
> >> >>
> >> >> Necessity of reverting this change
> >> >>
> >> >> - Metric names are actually public APIs, as end users need to
> >> configure
> >> metric collection and alerting systems with metric names. Users have to
> >> reset all configurations related to the affected metrics.
> >> >> - This could also affect custom and external sinks not maintained by
> >> Flink, which might have been implemented with numXXXOut metrics.
> >> >> - The number of records 

Re: Flink TaskManager memory configuration failed

2022-06-22 Thread Xintong Song
512mb is just too small for a TaskManager. You would need to either
increase it, or decrease the other memory components (which currently use
default values).

The 64mb Total Flink Memory comes from the 512mb Total Process Memory minus
192mb minimum JVM Overhead and 256mb default JVM Metaspace.
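
For example, a flink-conf.yaml along these lines (illustrative values only,
not tested recommendations; every component you don't set keeps its
default) would make the components add up within 512mb:

taskmanager.memory.process.size: 512mb
taskmanager.memory.jvm-overhead.min: 96mb
taskmanager.memory.jvm-metaspace.size: 128mb
taskmanager.memory.framework.heap.size: 64mb
taskmanager.memory.framework.off-heap.size: 64mb
taskmanager.memory.network.min: 32mb
taskmanager.memory.network.max: 32mb
taskmanager.memory.managed.size: 32mb

With these numbers, the remaining 96mb would be derived as task heap.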

Best,

Xintong



On Thu, Jun 23, 2022 at 1:08 PM yu'an huang  wrote:

> Hi John,
>
> May I know what is your Flink version you are trying?
>
>
>
> On Thu, 23 Jun 2022 at 3:43 AM, John Tipper 
> wrote:
>
>> Hi all,
>>
>> I'm wanting to run quite a number of PyFlink jobs on Kubernetes, where
>> the amount of state and number of events being processed is small and
>> therefore I'd like to use as little memory in my clusters as possible so I
>> can bin pack most efficiently. I'm running a Flink cluster per job. I'm
>> currently trying to see how small I can make the memory settings.
>>
>> I set taskmanager.memory.process.size: 512mb in my Flink config. The
>> container is being started with requested memory of 512Mi.
>>
>>  When my TaskManager starts up, I get an error message:
>>
>> IllegalConfigurationException: Sum of configured Framework Heap Memory
>> (128mb), Framework Off-Heap Memory (128mb), Task Off-Heap Memory (0
>> bytes), Managed Memory (25.6mb) and Network Memory (64mb) exceed configured
>> Total Flink Memory (64mb).
>>
>>
>> My understanding of the docs (
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/mem_setup_tm/)
>> is that I should just need to set taskmanager.memory.process.size. Where
>> is the 64mb figure coming from which makes up the Total Flink Memory? How
>> do I change this?
>>
>> Many thanks,
>>
>> John
>>
>


[ANNOUNCE] Welcome to join the Apache Flink community on Slack

2022-06-02 Thread Xintong Song
Hi everyone,

I'm very happy to announce that the Apache Flink community has created a
dedicated Slack workspace [1]. Welcome to join us on Slack.

## Join the Slack workspace

You can join the Slack workspace in either of the following two ways:
1. Click the invitation link posted on the project website [2].
2. Ask anyone who already joined the Slack workspace to invite you.

We recommend 2), if available. Due to Slack limitations, the invitation
link in 1) expires and needs manual updates after every 100 invites. If it
is expired, please reach out to the dev / user mailing lists.

## Community rules

When using the community Slack workspace, please follow these community
rules:
* *Be respectful* - This is the most important rule!
* All important decisions and conclusions *must be reflected back to the
mailing lists*. "If it didn’t happen on a mailing list, it didn’t happen."
- The Apache Mottos [3]
* Use *Slack threads* to keep parallel conversations from overwhelming a
channel.
* Please *do not direct message* people for troubleshooting, Jira assignment
and PR review. These should be picked up voluntarily.


## Maintenance


Committers can refer to this wiki page [4] for information needed for
maintaining the Slack workspace.


Thanks Jark, Martijn and Robert for helping set up the Slack workspace.


Best,

Xintong


[1] https://apache-flink.slack.com/

[2] https://flink.apache.org/community.html#slack

[3] http://theapacheway.com/on-list/

[4] https://cwiki.apache.org/confluence/display/FLINK/Slack+Management


Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-11 Thread Xintong Song
>
> To make some progress, maybe we decide on chat vs forum vs none and then
> go into a deeper discussion on the implementation. Or is there anything
> about Slack that would be a complete blocker for the implementation?
>

Sure, then I'd be +1 for chat. From my side, the initiative is more about
making communication more efficient, rather than making information easier
to find.

Thank you~

Xintong Song



On Wed, May 11, 2022 at 5:39 PM Konstantin Knauf  wrote:

> I don't think we can maintain two additional channels. Some people already
> have concerns about covering one additional channel.
>
> I think a forum provides a better user experience than a mailing list.
> Information is structured better, you can edit messages, and sign-up and
> search are easier.
>
> To make some progress, maybe we decide on chat vs forum vs none and then
> go into a deeper discussion on the implementation. Or is there anything
> about Slack that would be a complete blocker for the implementation?
>
>
>
> On Wed, May 11, 2022 at 07:35, Xintong Song <
> tonysong...@gmail.com> wrote:
>
>> I agree with Robert on reworking the "Community" and "Getting Help" pages
>> to emphasize how we position the mailing lists and Slack, and on revisiting
>> in 6-12 months.
>>
>> Concerning a dedicated Apache Flink Slack vs. the ASF Slack, I'm with
>> Konstantin. I'd expect it to make it easier to have more channels and keep
>> them organized, manage permissions for different roles, add bots, etc.
>>
>> IMO, having Slack is about improving the communication efficiency when
>> you are already in a discussion, and we expect such improvement would
>> motivate users to interact more with each other. From that perspective,
>> forums are not much better than mailing lists.
>>
>> I'm also open to forums as well, but not as an alternative to Slack. I
>> definitely see how forums help in keeping information organized and easy to
>> find. However, I'm a bit concerned about the maintenance overhead. I'm not
>> very familiar with Discourse or Reddit. My impression is that they are not
>> as easy to set up and maintain as Slack.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>> [1] https://asktug.com/
>>
>> On Tue, May 10, 2022 at 4:50 PM Konstantin Knauf 
>> wrote:
>>
>>> Thanks for starting this discussion again. I am pretty much with Timo
>>> here. Slack or Discourse as an alternative for the user community, and
>>> mailing list for the contributing, design discussion, etc. I definitely see
>>> the friction of joining a mailing list and understand if users are
>>> intimidated.
>>>
>>> I am leaning towards a forum aka Discourse over a chat aka Slack. This
>>> is about asking for help, finding information and thoughtful discussion
>>> more so than casual chatting, right? For this a forum, where it is easier
>>> to find and comment on older threads and topics just makes more sense to
>>> me. A well-done Discourse forum is much more inviting and vibrant than a
>>> mailing list. Just from a tool perspective, discourse would have the
>>> advantage of being Open Source and so we could probably self-host it on an
>>> ASF machine. [1]
>>>
>>> When it comes to Slack, I definitely see the benefit of a dedicated
>>> Apache Flink Slack compared to ASF Slack. For example, we could have more
>>> channels (e.g. look how many channels Airflow is using
>>> http://apache-airflow.slack-archives.org) and we could generally
>>> customize the experience more towards Apache Flink.  If we go for Slack,
>>> let's definitely try to archive it like Airflow did. If we do this, we
>>> don't necessarily need infinite message retention in Slack itself.
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>> [1] https://github.com/discourse/discourse
>>>
>>>
>>> On Tue, May 10, 2022 at 10:20, Timo Walther <
>>> twal...@apache.org> wrote:
>>>
>>>> I also think that a real-time channel is long overdue. The Flink
>>>> community in China has shown that such a platform can be useful for
>>>> improving the collaboration within the community. The DingTalk channel of
>>>> 10k+ users collectively helping each other is great to see. It could also
>>>> reduce the burden on committers of answering frequently asked questions.
>>>>
>>>> Personally, I'm a mailing list fan esp. when it comes to design
>>>> discussions. In my opinion, the dev@ mailing list should d

Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-10 Thread Xintong Song
I agree with Robert on reworking the "Community" and "Getting Help" pages
to emphasize how we position the mailing lists and Slack, and on revisiting
in 6-12 months.

Concerning a dedicated Apache Flink Slack vs. the ASF Slack, I'm with
Konstantin. I'd expect it to make it easier to have more channels and keep
them organized, manage permissions for different roles, add bots, etc.

IMO, having Slack is about improving the communication efficiency when you
are already in a discussion, and we expect such improvement would motivate
users to interact more with each other. From that perspective, forums are
not much better than mailing lists.

I'm also open to forums as well, but not as an alternative to Slack. I
definitely see how forums help in keeping information organized and easy to
find. However, I'm a bit concerned about the maintenance overhead. I'm not
very familiar with Discourse or Reddit. My impression is that they are not
as easy to set up and maintain as Slack.

Thank you~

Xintong Song


[1] https://asktug.com/

On Tue, May 10, 2022 at 4:50 PM Konstantin Knauf  wrote:

> Thanks for starting this discussion again. I am pretty much with Timo
> here. Slack or Discourse as an alternative for the user community, and
> mailing list for the contributing, design discussion, etc. I definitely see
> the friction of joining a mailing list and understand if users are
> intimidated.
>
> I am leaning towards a forum aka Discourse over a chat aka Slack. This is
> about asking for help, finding information and thoughtful discussion more
> so than casual chatting, right? For this a forum, where it is easier to
> find and comment on older threads and topics just makes more sense to me. A
> well-done Discourse forum is much more inviting and vibrant than a mailing
> list. Just from a tool perspective, discourse would have the advantage of
> being Open Source and so we could probably self-host it on an ASF machine.
> [1]
>
> When it comes to Slack, I definitely see the benefit of a dedicated Apache
> Flink Slack compared to ASF Slack. For example, we could have more channels
> (e.g. look how many channels Airflow is using
> http://apache-airflow.slack-archives.org) and we could generally
> customize the experience more towards Apache Flink.  If we go for Slack,
> let's definitely try to archive it like Airflow did. If we do this, we
> don't necessarily need infinite message retention in Slack itself.
>
> Cheers,
>
> Konstantin
>
> [1] https://github.com/discourse/discourse
>
>
> On Tue, May 10, 2022 at 10:20, Timo Walther wrote:
>
>> I also think that a real-time channel is long overdue. The Flink
>> community in China has shown that such a platform can be useful for
>> improving the collaboration within the community. The DingTalk channel of
>> 10k+ users collectively helping each other is great to see. It could also
>> reduce the burden on committers of answering frequently asked questions.
>>
>> Personally, I'm a mailing list fan esp. when it comes to design
>> discussions. In my opinion, the dev@ mailing list should definitely stay
>> where and how it is. However, I understand that users might not want to
>> subscribe to a mailing list for a single question and get their mailbox
>> filled with unrelated discussions afterwards. Esp. in a company setting it
>> might not be easy to setup a dedicated email address for mailing lists and
>> setting up rules is also not convenient.
>>
>> It would be great if we could use the ASF Slack. We should find an
>> official, accessible channel. I would be open for the right tool. It might
>> make sense to also look into Discourse or even Reddit? The latter would
>> definitely be easier to index by a search engine. Discourse is actually
>> made for modern real-time forums.
>>
>> Regards,
>> Timo
>>
>>
>> On 10.05.22 at 09:59, David Anderson wrote:
>>
>> Thank you @Xintong Song  for sharing the
>> experience of the Flink China community.
>>
>> I've become convinced we should give Slack a try, both for discussions
>> among the core developers, and as a place where the community can reach out
>> for help. I am in favor of using the ASF slack, as we will need a paid
>> instance for this to go well, and joining it is easy enough (took me about
>> 2 minutes). Thanks, Robert, for suggesting we go down this route.
>>
>> David
>>
>> On Tue, May 10, 2022 at 8:21 AM Robert Metzger 
>> wrote:
>>
>>> It seems that we'd have to use invite links on the Flink website for
>>> people to join our Slack (1)
>>> These links can be configured to have no time-expiration, but they will
>>> expire 

Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-07 Thread Xintong Song
Thanks all for the valuable feedback.

It seems most people are overall positive about using Slack for dev
discussions, as long as they are properly reflected back to the MLs.
- We definitely need a code of conduct that clearly specifies what people
should / should not do.
- Contributors pinging well-known reviewers / committers: I think that also
happens now on JIRA / Github. Personally, I'd understand a no-reply as a
"soft no". We may consider also putting that in the code of conduct.

Concerning using Slack for user Q&A, it seems the major concern is that we
may end up repeatedly answering the same questions from different users,
due to lack of capacity for archiving and searching historical
conversations. TBH, I don't have a good solution for the archivability and
searchability. I investigated some tools like Zapier [1], but none of them
seems suitable for us. However, I'd like to share 2 arguments.
- The purpose of Slack is to make communication more efficient. By
*efficient*, I mean saving time for both question askers and helpers with
instant messages, file transmissions, even voice / video calls, etc.
(Especially for cases where back and forth is needed, as David mentioned.)
It does not mean questions that do not get enough attention on MLs are now
guaranteed to be answered immediately. We can probably put that into the
code of conduct, and kindly guide users to first search and initiate
questions on MLs.
- I'd also like to share some experience from the Flink China community. We
have 3 DingTalk groups with 25k members in total (might be less, I didn't
deduplicate), posting hundreds of messages daily. What I'm really excited
about is that, there are way more interactions between users & users than
between users & developers. Users are helping each other, sharing
experiences, sending screenshots / log files / documentations and solving
problems together. We the developers seldom get pinged unless we proactively
join the conversations. The DingTalk groups are way more active compared
to the user-zh@ ML, which I'd attribute to the improvement of interaction
experiences. Admittedly, there are questions being repeatedly asked &
answered, but TBH I don't think that compares to the benefit of a
self-driven user community. I'd really love to see if we can bring such
success to the global English-speaking community.

Concerning Stack Overflow, it definitely deserves more attention from the
community. Thanks for the suggestion / reminder, Piotr & David. I think
Slack and Stack Overflow are probably not mutually exclusive.

Thank you~

Xintong Song


[1] https://zapier.com/



On Sat, May 7, 2022 at 9:50 AM Jingsong Li  wrote:

> Most of the open source communities I know have set up their slack
> channels, such as Apache Iceberg [1], Apache Druid [2], etc.
> So I think slack can be worth trying.
>
> David is right, there are some cases that need to communicate back and
> forth, slack communication will be more effective.
>
> But back to the question, ultimately it's about whether there are
> enough core developers willing to invest time in the slack, to
> discuss, to answer questions, to communicate.
> And whether there will be enough time to reply to the mailing list and
> stackoverflow after we put in the slack (which we need to do).
>
> [1] https://iceberg.apache.org/community/#slack
> [2] https://druid.apache.org/community/
>
> On Fri, May 6, 2022 at 10:06 PM David Anderson 
> wrote:
> >
> > I have mixed feelings about this.
> >
> > I have been rather visible on stack overflow, and as a result I get a
> lot of DMs asking for help. I enjoy helping, but want to do it on a
> platform where the responses can be searched and shared.
> >
> > It is currently the case that good questions on stack overflow
> frequently go unanswered because no one with the necessary expertise takes
> the time to respond. If the Flink community has the collective energy to do
> more user outreach, more involvement on stack overflow would be a good
> place to start. Adding slack as another way for users to request help from
> those who are already actively providing support on the existing
> communication channels might just lead to burnout.
> >
> > On the other hand, there are rather rare, but very interesting cases
> where considerable back and forth is needed to figure out what's going on.
> This can happen, for example, when the requirements are unusual, or when a
> difficult to diagnose bug is involved. In these circumstances, something
> like slack is much better suited than email or stack overflow.
> >
> > David
> >
> > On Fri, May 6, 2022 at 3:04 PM Becket Qin  wrote:
> >>
> >> Thanks for the proposal, Xintong.
> >>
> >> While I share the same concerns as those mentioned in the previous
> discussion

Fwd: [Discuss] Creating an Apache Flink slack workspace

2022-05-06 Thread Xintong Song
Thank you~

Xintong Song



-- Forwarded message -
From: Xintong Song 
Date: Fri, May 6, 2022 at 5:07 PM
Subject: Re: [Discuss] Creating an Apache Flink slack workspace
To: private 
Cc: Chesnay Schepler 


Hi Chesnay,

Correct me if I'm wrong, but I don't find that this was *repeatedly*
discussed on the ML. The only discussions I can find are [1] & [2], which
are from 4 years ago. On the other hand, I do find many users asking
whether Slack is supported [2][3][4]. Besides, I also found a recent
discussion thread from ComDev [5], where alternative communication channels
are being discussed. It seems to me the ASF is quite open to having such
additional channels, and they have worked well for many projects already.

I see two reasons for bringing up this discussion again:
1. There are indeed many things that have changed during the past 4 years.
We have more contributors, including committers and PMC members, and even
more users from various organizations and timezones. That also means more
discussions and Q&A are happening.
2. The proposal here is different from the previous discussion. Instead of
maintaining a channel for Flink in the ASF workspace, here we are proposing
to create a dedicated Apache Flink slack workspace. And instead of *moving*
the discussion to Slack, we are proposing to add a Slack Workspace as an
addition to the ML.

Below are the opinions that I found in your previous -1 [1]. IIUC, these
are all about using the ASF Slack workspace. If I overlooked anything,
please let me know.

> 1. According to INFRA-14292 <
> https://issues.apache.org/jira/browse/INFRA-14292> the ASF Slack isn't
> run by the ASF. This alone puts this service into rather questionable
> territory as it /looks/ like an official ASF service. If anyone can provide
> information to the contrary, please do so.

2. We already discuss things on the mailing lists, JIRA and GitHub. All of
> these are available to the public, whereas the slack channel requires an
> @apache mail address, i.e. you have to be a committer. This minimizes the
> target audience rather significantly. I would much rather prefer something
> that is also available to contributors.


I do agree this should be decided by the whole community. I'll forward this
to dev@ and user@ ML.

Thank you~

Xintong Song


[1] https://lists.apache.org/thread/gxwv49ssq82g06dbhy339x6rdxtlcv3d
[2] https://lists.apache.org/thread/kcym1sozkrtwxw1fjbnwk1nqrrlzolcc
[3] https://lists.apache.org/thread/7rmd3ov6sv3wwhflp97n4czz25hvmqm6
[4] https://lists.apache.org/thread/n5y1kzv50bkkbl3ys494dglyxl45bmts
[5] https://lists.apache.org/thread/fzwd3lj0x53hkq3od5ot0y719dn3kj1j

On Fri, May 6, 2022 at 3:05 PM Chesnay Schepler  wrote:

> This has been repeatedly discussed on the ML over the years and was
> rejected every time.
>
> I don't see that anything has changed that would invalidate the previously
> raised arguments against it, so I'm still -1 on it.
>
> This is also not something the PMC should decide anyway, but the project
> as a whole.
>
> On 06/05/2022 06:48, Jark Wu wrote:
>
> Thank Xintong, for starting this exciting topic.
>
> I think Slack would be an essential addition to the mailing list.
> I have talked with some Flink users, and they are surprised
> Flink doesn't have Slack yet, and they would love to use Slack.
> We can also see a trend that new open-source communities
> are using Slack as the community base camp.
>
> Slack is also helpful for brainstorming and asking people for opinions and
> use cases.
> I think Slack is not only another place for Q&A but also a connection to
> the Flink users.
> We can create more channels to make the community have more social
> attributes, for example,
>  - Share ideas, projects, integrations, articles, and presentations
> related to Flink in the #shows channel
>  - Flink releases, events in the #news channel
>
> Thus, I'm +1 to create an Apache Flink slack, and I can help set up the
> Flink slack and maintain it.
>
> Best,
> Jark
>
> On Fri, 6 May 2022 at 10:38, Xintong Song  wrote:
>
>> Hi all,
>>
>> I’d like to start a discussion on creating an Apache Flink slack
>> workspace.
>>
>> ## Motivation
>> Today many organizations choose to do real-time communication through
>> Slack. IMHO, we, Flink, as a technology for real-time computing, should
>> embrace a more real-time way of communication, especially for ad-hoc
>> questions and interactions. With more and more contributors from different
>> organizations joining this community, it would be good to provide a common
>> channel for such real-time communication. Therefore, I'd propose to create
>> an Apache Flink Slack workspace that is maintained by the Flink PMC.
>>
>> ## Benefits
>> - Easier to reach out to people. 

Re: Missing metrics in Flink v 1.15.0 rc-0

2022-04-06 Thread Xintong Song
Hi Peter,

Have you compared the DAG topologies in 1.15 / 1.14?

I think it's expected that "Records Received", "Bytes Sent" and "Records
Sent" are 0. These metrics trace the internal data exchanges between Flink
tasks. External data exchanges, i.e., sources reading / sinks writing data from
/ to external systems, are not counted. In your case, there's only 1
vertex in the DAG, thus no internal data exchanges.
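
As a hedged illustration of the chaining effect: with a simple source ->
map -> sink pipeline everything chains into a single task, so these metrics
stay 0; breaking the chain creates an internal exchange and makes them
non-zero:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // With chaining disabled, each operator runs as its own task, so the
        // records flowing between them show up as "Records Sent/Received".
        env.disableOperatorChaining();
        env.fromSequence(0, 1000)
                .map(x -> x * 2).returns(Types.LONG)
                .print();
        env.execute("chaining-demo");
    }
}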

Thank you~

Xintong Song



On Wed, Apr 6, 2022 at 11:21 PM Peter Schrott  wrote:

> Hi there,
>
> I just successfully upgraded our Flink cluster to 1.15.0 rc0 - also the
> corresponding job is running on this version. Looks great so far!
>
> In the Web UI I noticed some metrics are missing, especially "Records
> Received", "Bytes Sent" and "Records Sent". Those were shown in v 1.14.4.
> See attached screenshot.
>
> Other than that I noticed, when using
> org.apache.flink.metrics.prometheus.PrometheusReporter , the taskmanager
> on which the job is running does not report the metrics on the configured
> port. Rather it returns:
>
> ➜  ~ curl http://flink-taskmanager-xx:/
> curl: (52) Empty reply from server
>
> The other taskmanager reports metrics.
>
> The exporter is configured as followed:
>
> # Prometheus metrics
> metrics.reporters: prom
> metrics.reporter.prom.class: 
> org.apache.flink.metrics.prometheus.PrometheusReporter
> metrics.reporter.prom.port: xx
>
> Is this a known issue with flink 1.15 rc0?
>
> Best, Peter
>
> [image: missingmetricsinui.png]
>


Re: Question about Flink's ad-hoc direction

2022-03-24 Thread Xintong Song
Yes, there is such a plan.

Some relatively scattered optimizations around scheduling performance are
already underway in the community [1], and some more substantial features
are still in the making.

Thank you~

Xintong Song


[1] https://issues.apache.org/jira/browse/FLINK-25318

On Thu, Mar 24, 2022 at 1:54 PM LuNing Wang  wrote:

>
> Will Flink invest in the ad-hoc direction in the future? E.g., building
> Trino/Presto-style performance optimizations into Flink, so that batch,
> streaming, and OLAP/ad-hoc would only need a single engine.
>


Re: TM OOMKilled

2022-02-15 Thread Xintong Song
Thanks Alexey,

In my experience, common causes for TM OOMKill are:
1. RocksDB uses more memory than expected. Unfortunately, the memory hard
limit is not supported by RocksDB. Flink conservatively estimates RocksDB's
memory footprint and tunes its parameters accordingly, which is not 100%
safe.
2. The job (connectors, UDFs, and their dependencies) may need direct and
native memory. When native memory is needed, increasing task off-heap
memory may not be as helpful as increasing the JVM overhead.
3. There could also be memory leaks, leading to a continuously increasing
memory footprint. Based on your description that the OOM happens about
every 2 days, this is highly suspected.

For 1 & 2, increasing the JVM overhead would help. For 3, you may need to
investigate a heap/thread dump to find out where the leak comes from.

I'd suggest first increasing the JVM overhead and seeing if that fixes the
problem. If the problem is not fixed, but the job runs longer before the
OOM happens, then it's likely the 3rd case. Moreover, you can monitor how
the pod's memory footprint changes, if such metrics are available.
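
As a concrete starting point (a hedged sketch; the numbers are illustrative,
and you'd likely also need to raise taskmanager.memory.process.size and the
pod memory limit accordingly):

taskmanager.memory.jvm-overhead.min: 3gb
taskmanager.memory.jvm-overhead.max: 3gb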

Thank you~

Xintong Song



On Tue, Feb 15, 2022 at 11:56 PM Alexey Trenikhun  wrote:

> Hi Xintong,
> I've checked - `state.backend.rocksdb.memory.managed` is not explicitly
> configured, so as you wrote it should be true by default.
>
> Regarding task off-heap, I believe KafkaConsumer needed off-heap memory
> some time ago
>
> ------
> *From:* Xintong Song 
> *Sent:* Monday, February 14, 2022 10:06 PM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: TM OOMKilled
>
> Hi Alexey,
>
> You may want to double check if `state.backend.rocksdb.memory.managed` is
> configured to `true`. (This should be `true` by default.)
>
> Another question that may or may not be related. I noticed that you have
> configured 128MB task off-heap memory, which IIRC the default should be 0.
> Could you share what that is for?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Feb 15, 2022 at 12:10 PM Alexey Trenikhun  wrote:
>
> Hello,
> We use RocksDB, but there is no problem with Java heap, which is limited
> by 3.523gb, the problem with total container memory. The pod is killed
> not due OutOfMemoryError,  but because total container memory exceeds 10gb
>
> Thanks,
> Alexey
> --
> *From:* Caizhi Weng 
> *Sent:* Monday, February 14, 2022 6:42:05 PM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: TM OOMKilled
>
> Hi!
>
> Heap memory usage depends heavily on your job and your state backend.
> Which state backend are you using and if possible could you share your user
> code or explain what operations your job is doing?
>
Alexey Trenikhun wrote on Tue, Feb 15, 2022 at 05:17:
>
> Hello,
> We run Flink 1.13.5 job in app mode in Kubernetes, 1 JM and 1 TM, we also
> have Kubernetes cron job which takes savepoint every 2 hour (14 */2 * * *),
> once in a while (~1 per 2 days) the TM is OOMKilled; suspiciously it happens on
> even hours ~4 minutes after savepoint start (e.g. 12:18, 4:18) but I don't
> see failed save points, so I assume OOM happens right after savepoint
> taken. However OOMKilled doesn't happen on every save point, so maybe this
> is a random correlation.
> I've reserved 2G for JVM overhead, but somehow it is not enough ? Any
> known issues with memory and savepoints? Any suggestions how to
> troubleshoot this?
>
>  Final TaskExecutor Memory configuration:
>Total Process Memory:  10.000gb (10737418240 bytes)
>  Total Flink Memory:  7.547gb (8103395328 bytes)
>Total JVM Heap Memory: 3.523gb (3783262149 bytes)
>  Framework:   128.000mb (134217728 bytes)
>  Task:3.398gb (3649044421 bytes)
>Total Off-heap Memory: 4.023gb (4320133179 bytes)
>  Managed: 3.019gb (3241358179 bytes)
>  Total JVM Direct Memory: 1.005gb (1078775000 bytes)
>Framework: 128.000mb (134217728 bytes)
>Task:  128.000mb (134217728 bytes)
>Network:   772.800mb (810339544 bytes)
>  JVM Metaspace:   256.000mb (268435456 bytes)
>  JVM Overhead:2.203gb (2365587456 bytes)
>
> Thanks,
> Alexey
>
>


Re: TM OOMKilled

2022-02-14 Thread Xintong Song
Hi Alexey,

You may want to double check if `state.backend.rocksdb.memory.managed` is
configured to `true`. (This should be `true` by default.)

Another question that may or may not be related. I noticed that you have
configured 128MB task off-heap memory, which IIRC the default should be 0.
Could you share what that is for?

Thank you~

Xintong Song



On Tue, Feb 15, 2022 at 12:10 PM Alexey Trenikhun  wrote:

> Hello,
> We use RocksDB, but there is no problem with Java heap, which is limited
> by 3.523gb, the problem with total container memory. The pod is killed
> not due OutOfMemoryError,  but because total container memory exceeds 10gb
>
> Thanks,
> Alexey
> --
> *From:* Caizhi Weng 
> *Sent:* Monday, February 14, 2022 6:42:05 PM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: TM OOMKilled
>
> Hi!
>
> Heap memory usage depends heavily on your job and your state backend.
> Which state backend are you using and if possible could you share your user
> code or explain what operations your job is doing?
>
Alexey Trenikhun wrote on Tue, Feb 15, 2022 at 05:17:
>
> Hello,
> We run Flink 1.13.5 job in app mode in Kubernetes, 1 JM and 1 TM, we also
> have Kubernetes cron job which takes savepoint every 2 hour (14 */2 * * *),
> once in a while (~1 per 2 days) the TM is OOMKilled; suspiciously it happens on
> even hours ~4 minutes after savepoint start (e.g. 12:18, 4:18) but I don't
> see failed save points, so I assume OOM happens right after savepoint
> taken. However OOMKilled doesn't happen on every save point, so maybe this
> is a random correlation.
> I've reserved 2G for JVM overhead, but somehow it is not enough ? Any
> known issues with memory and savepoints? Any suggestions how to
> troubleshoot this?
>
>  Final TaskExecutor Memory configuration:
>Total Process Memory:  10.000gb (10737418240 bytes)
>  Total Flink Memory:  7.547gb (8103395328 bytes)
>Total JVM Heap Memory: 3.523gb (3783262149 bytes)
>  Framework:   128.000mb (134217728 bytes)
>  Task:3.398gb (3649044421 bytes)
>Total Off-heap Memory: 4.023gb (4320133179 bytes)
>  Managed: 3.019gb (3241358179 bytes)
>  Total JVM Direct Memory: 1.005gb (1078775000 bytes)
>Framework: 128.000mb (134217728 bytes)
>Task:  128.000mb (134217728 bytes)
>Network:   772.800mb (810339544 bytes)
>  JVM Metaspace:   256.000mb (268435456 bytes)
>  JVM Overhead:2.203gb (2365587456 bytes)
>
> Thanks,
> Alexey
>
>


Re: [DISCUSS] Future of Per-Job Mode

2022-01-24 Thread Xintong Song
Sorry for joining the discussion late.

I'm leaning towards deprecating the per-job mode soonish, and eventually
dropping it in the long-term.
- One less deployment mode makes it easier for users (especially newcomers)
to understand. Deprecating the per-job mode sends the signal that it is
legacy, not recommended, and in most cases users do not need to care about
it.
- For most (if not all) user demands that are satisfied by the per-job mode
but not by the application mode, AFAICS, they can be either worked around or
eventually addressed by the application mode, e.g., by making the application
mode support shipping local dependencies.
- I'm not sure about dropping the per-job mode soonish, as many users are
still working with it. We'd better not force these users to migrate to the
application mode when upgrading the Flink version.

Thank you~

Xintong Song



On Fri, Jan 21, 2022 at 4:30 PM Konstantin Knauf  wrote:

> Thanks Thomas & Biao for your feedback.
>
> Any additional opinions on how we should proceed with per-job mode? As you
> might have guessed, I am leaning towards proposing to deprecate per-job
> mode.
>
> On Thu, Jan 13, 2022 at 5:11 PM Thomas Weise  wrote:
>
>> Regarding session mode:
>>
>> ## Session Mode
>> * main() method executed in client
>>
>> Session mode also supports execution of the main method on Jobmanager
>> with submission through the REST API. That's how Flink k8s operators like
>> [1] work. It's actually an important capability because it allows for
>> allocation of the cluster resources prior to taking down the previous
>> job during upgrade when the goal is optimization for availability.
>>
>> Thanks,
>> Thomas
>>
>> [1] https://github.com/lyft/flinkk8soperator
>>
>> On Thu, Jan 13, 2022 at 12:32 AM Konstantin Knauf 
>> wrote:
>> >
>> > Hi everyone,
>> >
>> > I would like to discuss and understand if the benefits of having Per-Job
>> > Mode in Apache Flink outweigh its drawbacks.
>> >
>> >
>> > *# Background: Flink's Deployment Modes*
>> > Flink currently has three deployment modes. They differ in the following
>> > dimensions:
>> > * main() method executed on Jobmanager or Client
>> > * dependencies shipped by client or bundled with all nodes
>> > * number of jobs per cluster & relationship between job and cluster
>> > lifecycle
>> > * (supported resource providers)
>> >
>> > ## Application Mode
>> > * main() method executed on Jobmanager
>> > * dependencies already need to be available on all nodes
>> > * dedicated cluster for all jobs executed from the same main()-method
>> > (Note: applications with more than one job currently still have significant
>> > limitations, like missing high-availability). Technically, this is a session
>> > cluster dedicated to all jobs submitted from the same main() method.
>> > * supported by standalone, native kubernetes, YARN
>> >
>> > ## Session Mode
>> > * main() method executed in client
>> > * dependencies are distributed from and by the client to all nodes
>> > * cluster is shared by multiple jobs submitted from different clients,
>> > independent lifecycle
>> > * supported by standalone, Native Kubernetes, YARN
>> >
>> > ## Per-Job Mode
>> > * main() method executed in client
>> > * dependencies are distributed from and by the client to all nodes
>> > * dedicated cluster for a single job
>> > * supported by YARN only
>> >
>> >
>> > *# Reasons to Keep*
>> > * There are use cases where you might need the combination of a single job
>> > per cluster, but main() method execution in the client. This combination is
>> > only supported by per-job mode.
>> > * It currently exists. Existing users will need to migrate to either
>> > session or application mode.
>> >
>> >
>> > *# Reasons to Drop*
>> > * With Per-Job Mode and Application Mode we have two modes that for most
>> > users probably do the same thing. Specifically, for those users that don't
>> > care where the main() method is executed and want to submit a single job
>> > per cluster. Having two ways to do the same thing is confusing.
>> > * Per-Job Mode is only supported by YARN anyway. If we keep it, we should
>> > work towards support in Kubernetes and Standalone, too, to reduce special
>> > casing.
>> > * Dropping per-job mode would reduce complexity in the code and allow us to
>> > dedicate more resources to the other two deployment modes.

Re: Flink native k8s integration vs. operator

2022-01-13 Thread Xintong Song
Thanks for volunteering to drive this effort, Marton, Thomas and Gyula.

Looking forward to the public discussion. Please feel free to reach out if
there's anything you need from us.

Thank you~

Xintong Song



On Fri, Jan 14, 2022 at 8:27 AM Chenya Zhang 
wrote:

> Thanks Thomas, Gyula, and Marton for driving this effort! It would greatly
> ease the adoption of Apache Flink on Kubernetes and help to address the
> current operational pain points as mentioned. Look forward to the proposal
> and more discussions!
>
> Best,
> Chenya
>
> On Thu, Jan 13, 2022 at 12:15 PM Márton Balassi 
> wrote:
>
>> Hi All,
>>
>> I am pleased to see the level of enthusiasm and technical consideration
>> already emerging in this thread. I wholeheartedly support building an
>> operator and endorsing it via placing it under the Apache Flink umbrella
>> (as a separate repository) as the current lack of it is clearly becoming
>> an
>> adoption bottleneck for large scale Flink users. The next logical step is
>> to write a FLIP to agree on the technical details, so that we can put
>> forward the proposal to the Flink PMC for creating a new repository with a
>> clear purpose in mind. I volunteer to work with Thomas and Gyula on the
>> initial wording on the proposal which we will put up for public discussion
>> in the coming weeks.
>>
>> Best,
>> Marton
>>
>> On Thu, Jan 13, 2022 at 9:22 AM Konstantin Knauf 
>> wrote:
>>
>> > Hi Thomas,
>> >
>> > Yes, I was referring to a separate repository under Apache Flink.
>> >
>> > Cheers,
>> >
>> > Konstantin
>> >
>> > On Thu, Jan 13, 2022 at 6:19 AM Thomas Weise  wrote:
>> >
>> >> Hi everyone,
>> >>
>> >> Thanks for the feedback and discussion. A few additional thoughts:
>> >>
>> >> [Konstantin] > With respect to common lifecycle management operations:
>> >> these features are
>> >> > not available (within Apache Flink) for any of the other resource
>> >> providers
>> >> > (YARN, Standalone) either. From this perspective, I wouldn't consider
>> >> this
>> >> > a shortcoming of the Kubernetes integration.
>> >>
>> >> I think time and evolution of the ecosystem are factors to consider as
>> >> well. The state and usage of Flink was much different when YARN
>> >> integration was novel. Expectations are different today and the
>> >> lifecycle functionality provided by an operator may as well be
>> >> considered essential to support the concept of a Flink application on
>> >> k8s. After few years learning from operator experience outside of
>> >> Flink it might be a good time to fill the gap.
>> >>
>> >> [Konstantin] > I still believe that we should keep this focus on low
>> >> > level composable building blocks (like Jobs and Snapshots) in Apache
>> >> Flink
>> >> > to make it easy for everyone to build fitting higher level
>> abstractions
>> >> > like a FlinkApplication Custom Resource on top of it.
>> >>
>> >> I completely agree that it is important that the basic functions of
>> >> Flink are solid and continued focus is necessary. Thanks for sharing
>> >> the pointers, these are great improvements. At the same time,
>> >> ecosystem, contributor base and user spectrum are growing. There have
>> >> been significant additions in many areas of Flink including connectors
>> >> and higher level abstractions like statefun, SQL and Python. It's also
>> >> evident from additional repositories/subprojects that we have in Flink
>> >> today.
>> >>
>> >> [Konstantin] > Having said this, if others in the community have the
>> >> capacity to push and
>> >> > *maintain* a somewhat minimal "reference" Kubernetes Operator for
>> Apache
>> >> > Flink, I don't see any blockers. If or when this happens, I'd see
>> some
>> >> > clear benefits of using a separate repository (easier independent
>> >> > versioning and releases, different build system & tooling (go, I
>> >> assume)).
>> >>
>> >> Naturally different contributors to the project have different focus.
>> >> Let's find out if there is strong enough interest to take this on and
>> >> strong enough commitment to maintain. As I see it, there is a
>> >> tremendous amount of internal investment goi

Re: Flink native k8s integration vs. operator

2022-01-06 Thread Xintong Song
Hi folks,

Thanks for the discussion. I'd like to share my two cents on this topic.

Firstly, I'd like to clarify my understanding of the concepts "native k8s
integration" and "active resource management".
- Native k8s integration means Flink's master interacts with k8s' api
server directly. It acts like embedding an operator inside Flink's master,
which manages the resources (pod, deployment, configmap, etc.) and watches
/ reacts to related events.
- Active resource management means Flink can actively start / terminate
workers as needed. Its key characteristic is that the resources a Flink
deployment uses are decided by the job's execution plan, unlike the opposite
reactive mode (the resources available to the deployment decide the execution
plan) or the standalone mode (both execution plan and deployment resources
are predefined).

Currently, we have the yarn and native k8s deployments (and the recently
removed mesos deployment) in active mode, due to their ability to request /
release worker resources from the underlying cluster. And all the existing
operators, AFAIK, work with a Flink standalone deployment, where Flink
cannot request / release resources by itself.

From this perspective, I think a large part of the native k8s integration
advantages come from the active mode: being able to better understand the
job's resource requirements and adjust the deployment resource accordingly.
Both fine-grained resource management (customizing TM resources for
different tasks / operators) and adaptive batch scheduler (rescale the
deployment w.r.t. different stages) fall into this category.

I'm wondering if we can have an operator that also works with the active
mode. Instead of talking to the api server directly for adding / deleting
resources, Flink's active resource manager can talk to the operator (via
CR) about the resources the deployment needs, and let the operator
actually add / remove the resources. The operator should be able to work
with (active) or without (standalone) the information of deployment's
resource requirements. In this way, users are free to choose between active
and reactive (e.g., HPA) rescaling, while always benefiting from the
beyond-deployment lifecycle (upgrades, savepoint management, etc.) and
alignment with the K8s ecosystem (Flink client free, operating via kubectl,
etc.).
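To make the idea more concrete, here is a purely hypothetical CR sketch; the
apiVersion/kind and every field below are made up for illustration and do not
exist in any current operator:

    apiVersion: flink.example.org/v1     # hypothetical CRD group/version
    kind: FlinkDeployment                # hypothetical kind
    metadata:
      name: my-job
    spec:
      mode: active                       # operator honors resource requests from Flink's RM
      resourceRequests:                  # written by Flink's active resource manager via the CR
        taskManagers: 4
        taskManagerSpec:
          cpu: 2
          memory: 4096m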

Thank you~

Xintong Song



On Thu, Jan 6, 2022 at 1:06 PM Thomas Weise  wrote:

> Hi David,
>
> Thank you for the reply and context!
>
> As for workload types and where native integration might fit: I think
> that any k8s native solution that satisfies category 3) can also take
> care of 1) and 2) while the native integration by itself can't achieve
> that. Existence of [1] might serve as further indication.
>
> The k8s operator pattern would be an essential building block for a
> k8s native solution that is interoperable with k8s ecosystem tooling
> like kubectl, which is why [2] and subsequent derived art were
> created. Specifically the CRD allows us to directly express the
> concept of a Flink application consisting of job manager and task
> manager pods along with associated create/update/delete operations.
>
> Would it make sense to gauge interest to have such an operator as part
> of Flink? It appears so from discussions like [3]. I think such
> addition would significantly lower the barrier to adoption, since like
> you mentioned one cannot really run mission critical streaming
> workloads with just the Apache Flink release binaries alone. While it
> is great to have multiple k8s operators to choose from that are
> managed outside Flink, it is unfortunately also evident that today's
> hot operator turns into tomorrow's tech debt. I think such fate would
> be less likely within the project, when multiple parties can join
> forces and benefit from each other's contributions. There were similar
> considerations and discussions around Docker images in the past.
>
> Out of the features that you listed it is particularly the application
> upgrade that needs to be solved through an external process like
> operator. The good thing is that many folks have already thought hard
> about this and in existing implementations we see different strategies
> that have their merit and production mileage (certainly applies to
> [2]). We could combine the best of these ideas into a unified
> implementation as part of Flink itself as starting point.
>
> Cheers,
> Thomas
>
>
> [1] https://github.com/wangyang0918/flink-native-k8s-operator
> [2] https://github.com/lyft/flinkk8soperator
> [3] https://lists.apache.org/thread/fhcr5gj1txcr0fo4s821jkp6d4tk6080
>
>
> On Tue, Jan 4, 2022 at 4:04 AM David Morávek  wrote:
> >
> > Hi Thomas,
> >
> > AFAIK there are no specific plans in this direction with the native
> integration, but I'd 

Re: How to handle java.lang.OutOfMemoryError: Metaspace

2021-12-26 Thread Xintong Song
Hi John,

It sounds to me like you have a Flink standalone cluster deployed directly on
physical hosts. If that is the case, use `t.m.flink.size` instead of
`t.m.process.size`. The latter does not limit the overall memory
consumption of the processes, and is only used for calculating how much
non-JVM memory the process should leave aside in a containerized setup, which
does no good in a non-containerized setup.

When running into a Metaspace OOM, the standard solution is to increase
`t.m.jvm-metaspace.size`. If this is impractical due to physical
limitations, you may also try decreasing `taskmanager.numberOfTaskSlots`.
If you have multiple jobs submitted to a shared Flink cluster, decreasing
the number of slots in a task manager should also reduce the amount of
classes loaded by the JVM, thus requiring less metaspace.
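For illustration, the knobs mentioned above in flink-conf.yaml (the sizes are
placeholders to be adapted to the 8GB host, not recommendations):

    taskmanager.memory.flink.size: 5120m
    taskmanager.memory.jvm-metaspace.size: 1536m   # increase if Metaspace OOMs persist
    taskmanager.numberOfTaskSlots: 4               # fewer slots usually means fewer classes loaded per TM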

Thank you~

Xintong Song



On Mon, Dec 27, 2021 at 9:08 AM John Smith  wrote:

> Ok I tried taskmanager.memory.process.size: 7168m
>
> It's worse: the task manager can barely start before it throws
> java.lang.OutOfMemoryError: Metaspace
>
> I will try...
> taskmanager.memory.flink.size: 5120m
> taskmanager.memory.jvm-metaspace.size: 2048m
>
>
> On Sun, 26 Dec 2021 at 19:46, John Smith  wrote:
>
>> Hi running Flink 1.10
>>
>> I have
>>
>> taskmanager.memory.flink.size: 6144m
>> taskmanager.memory.jvm-metaspace.size: 1024m
>> taskmanager.numberOfTaskSlots: 8
>> parallelism.default: 1
>>
>> 1- The host has 8GB of physical RAM. Am I better off just configuring
>> "taskmanager.memory.process.size" as 7GB and letting Flink figure it out?
>> 2- Is there a way for me to calculate how much metaspace my jobs require
>> or are using?
>>
>> 2021-12-24 04:53:32,511 ERROR
>> org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL:
>> Thread 'flink-akka.actor.default-dispatcher-86' produced an uncaught
>> exception. Stopping the process...
>> java.lang.OutOfMemoryError: Metaspace
>>
>


Re: Why can't managed memory be given a minimum or maximum value?

2021-12-21 Thread Xintong Song
Network and JVM Overhead use min-max ranges because, if these two are too
small, they tend to cause failures, while making them too large doesn't help
performance much and just wastes resources.

In contrast, managed memory can sometimes be very small, and in some scenarios
can even be 0, and increasing managed memory usually helps performance, so the
design does not introduce a min-max configuration for it.
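For reference, the related options (illustrative values):

    taskmanager.memory.managed.fraction: 0.4   # default; managed size is derived as a fraction of total Flink memory
    taskmanager.memory.managed.size: 1024m     # if set explicitly, this fixed size takes precedence over the fraction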

Thank you~

Xintong Song



On Wed, Dec 22, 2021 at 10:53 AM johnjlong  wrote:

> Folks, why can't managed memory be given a minimum or maximum value?
> Or is the value computed from taskmanager.memory.managed.fraction already
> the maximum?
>
> johnjlong
> johnjl...@163.com


Re: [DISCUSS] Changing the minimal supported version of Hadoop

2021-12-21 Thread Xintong Song
Sorry to join the discussion late.

+1 for dropping support for Hadoop versions < 2.8 from my side.

TBH, wrapping the reflection-based logic with safeguards sounds a bit like
neither fish nor fowl to me. It weakens the major benefits that we look for
by dropping support for early versions.
- The codebase is simplified, but not significantly. We still have the
complexity of understanding which APIs may not exist in early versions.
- Without CI, we provide no guarantee that Flink will still work with early
Hadoop versions. Or otherwise we fail to simplify the CI.

I'd suggest saying we no longer support Hadoop versions < 2.8 at all. And
if that is not acceptable to our users, we may consider keeping the codebase
as is and waiting a bit longer.

WDYT?

Thank you~

Xintong Song


[1]
https://hadoop.apache.org/docs/r2.8.5/hadoop-project-dist/hadoop-common/Compatibility.html#Wire_compatibility

On Wed, Dec 22, 2021 at 12:52 AM David Morávek  wrote:

> CC user@f.a.o
>
> Is anyone aware of something that blocks us from doing the upgrade?
>
> D.
>
> On Tue, Dec 21, 2021 at 5:50 PM David Morávek 
> wrote:
>
>> Hi Martijn,
>>
>> from personal experience, most Hadoop users lag behind the release
>> lines by a lot, because upgrading a Hadoop cluster is not really a simple
>> task to achieve. I think for now we can stay a bit conservative; nothing
>> blocks us from using 2.8.5, as we don't use any "newer" APIs in the code.
>>
>> As for Till's concern, we can still wrap the reflection based logic, to
>> be skipped in case of "NoClassDefFound" instead of "ClassNotFound" as we do
>> now.
>>
>> D.
>>
>>
>> On Tue, Dec 14, 2021 at 5:23 PM Martijn Visser 
>> wrote:
>>
>>> Hi David,
>>>
>>> Thanks for bringing this up for discussion! Given that Hadoop 2.8 is
>>> considered EOL, shouldn't we bump the version to Hadoop 2.10? [1]
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> [1]
>>>
>>> https://cwiki.apache.org/confluence/display/HADOOP/Hadoop+Active+Release+Lines
>>>
>>> On Tue, 14 Dec 2021 at 10:28, Till Rohrmann 
>>> wrote:
>>>
>>> > Hi David,
>>> >
>>> > I think we haven't updated our Hadoop dependencies in a long time.
>>> Hence,
>>> > it is probably time to do so. So +1 for upgrading to the latest patch
>>> > release.
>>> >
>>> > If newer 2.x Hadoop versions are compatible with 2.y with x >= y, then
>>> I
>>> > don't see a problem with dropping support for pre-bundled Hadoop
>>> versions <
>>> > 2.8. This could indeed help us decrease our build matrix a bit and,
>>> thus,
>>> > saving some build time.
>>> >
>>> > Concerning simplifying our code base to get rid of reflection logic
>>> etc. we
>>> > still might have to add a safeguard for features that are not
>>> supported by
>>> > earlier versions. According to the docs
>>> >
>>> > > YARN applications that attempt to use new APIs (including new fields
>>> in
>>> > data structures) that have not yet been deployed to the cluster can
>>> expect
>>> > link exceptions
>>> >
>>> > we can see link exceptions. We could get around this by saying that
>>> Flink
>>> > no longer supports Hadoop < 2.8. But this should be checked with our
>>> users
>>> > on the user ML at least.
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Tue, Dec 14, 2021 at 9:25 AM David Morávek  wrote:
>>> >
>>> > > Hi,
>>> > >
>>> > > I'd like to start a discussion about upgrading a minimal Hadoop
>>> version
>>> > > that Flink supports.
>>> > >
>>> > > Even though the default value for `hadoop.version` property is set to
>>> > > 2.8.3, we're still ensuring both runtime and compile compatibility
>>> with
>>> > > Hadoop 2.4.x with the scheduled pipeline[1].
>>> > >
>>> > > Here is list of dates of the latest releases for each minor version
>>> up to
>>> > > 2.8.x
>>> > >
>>> > > - Hadoop 2.4.1: Last commit on 6/30/2014
>>> > > - Hadoop 2.5.2: Last commit on 11/15/2014
>>> > > - Hadoop 2.6.5: Last commit on 10/11/2016
>>> > > - Hadoop 2.7.7: Last commit on 7/18/2018
>>> > > - Hadoop 2.8.5: Last commit on 

Re: Direct buffer memory in job with hbase client

2021-12-15 Thread Xintong Song
Hi Anton,

You may want to try increasing the task off-heap memory, as your tasks are
using the HBase client, which needs off-heap (direct) memory. The default task
off-heap memory is 0 because most tasks do not use off-heap memory.

Unfortunately, I cannot advise on how much task off-heap memory your job
needs, which probably depends on your HBase client configurations.
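As a starting point only (the right value depends on the workload and the
HBase client settings), the option could be raised and then tuned gradually:

    taskmanager.memory.task.off-heap.size: 256m   # placeholder; increase further if direct-memory errors persist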

Thank you~

Xintong Song



On Wed, Dec 15, 2021 at 1:40 PM Anton  wrote:

> Hi, from time to time my job stops processing messages with the warn
> message listed below. I tried to increase jobmanager.memory.process.size and
> taskmanager.memory.process.size but it didn’t help.
>
> What else can I try? “Framework Off-heap” is 128mb now, as seen in the task
> manager dashboard, and Task Off-heap is 0b. The documentation says that “You
> should only change this value if you are sure that the Flink framework
> needs more memory.” And I’m not sure about it.
>
> Flink version is 1.13.2.
>
>
>
> 2021-11-29 14:06:53,659 WARN
> org.apache.hbase.thirdparty.io.netty.channel.DefaultChannelPipeline [] - An
> exceptionCaught() event was fired, and it reached at the tail of the
> pipeline. It usually means the last handler in the pipeline did not handle
> the exception.
>
> org.apache.hbase.thirdparty.io.netty.channel.ChannelPipelineException:
> org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler.handlerAdded()
> has thrown an exception; removed.
>
> at
> org.apache.hbase.thirdparty.io.netty.channel.DefaultChannelPipeline.callHandlerAdded0(DefaultChannelPipeline.java:624)
> [blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261caa9da2:?]
>
> at
> org.apache.hbase.thirdparty.io.netty.channel.DefaultChannelPipeline.addFirst(DefaultChannelPipeline.java:181)
> [blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261caa9da2:?]
>
> at
> org.apache.hbase.thirdparty.io.netty.channel.DefaultChannelPipeline.addFirst(DefaultChannelPipeline.java:358)
> [blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261caa9da2:?]
>
> at
> org.apache.hbase.thirdparty.io.netty.channel.DefaultChannelPipeline.addFirst(DefaultChannelPipeline.java:339)
> [blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261caa9da2:?]
>
> at
> org.apache.hadoop.hbase.ipc.NettyRpcConnection.saslNegotiate(NettyRpcConnection.java:215)
> [blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261caa9da2:?]
>
> at
> org.apache.hadoop.hbase.ipc.NettyRpcConnection.access$600(NettyRpcConnection.java:76)
> [blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261caa9da2:?]
>
> at
> org.apache.hadoop.hbase.ipc.NettyRpcConnection$2.operationComplete(NettyRpcConnection.java:289)
> [blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261caa9da2:?]
>
> at
> org.apache.hadoop.hbase.ipc.NettyRpcConnection$2.operationComplete(NettyRpcConnection.java:277)
> [blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261caa9da2:?]
>
> at
> org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> [blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261caa9da2:?]
>
> at
> org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
> [blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261caa9da2:?]
>
> at
> org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
> [blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261caa9da2:?]
>
> at
> org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> [blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261caa9da2:?]
>
> at
> org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> [blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261caa9da2:?]
>
> at
> org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
> [blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261caa9da2:?]
>
> at
> org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
> [blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261caa9da2:?]
>
> at
> org.apache.hbase.thirdparty.io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
> [blob_p-6eb282e9e614ab47d8c0b44

Re: High Availability on Kubernetes

2021-10-25 Thread Xintong Song
Without HA, your job can restore from the latest successful checkpoint only
if your jobmanager process / pod has not failed. If the jobmanager failed,
the new jobmanager brought up by Kubernetes will not be able to find the
latest successful checkpoint without HA. Jobmanager can fail due to not
only pod evictions, but also other problems (jvm out-of-memory, remote
storage connection downtime, etc.).
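For reference, a minimal sketch of the Kubernetes HA settings (the storage
path and cluster id are placeholders):

    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://my-bucket/flink-ha/   # placeholder path
    kubernetes.cluster-id: my-cluster                        # placeholder id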

Thank you~

Xintong Song



On Tue, Oct 26, 2021 at 7:39 AM Deshpande, Omkar 
wrote:

> Hello,
>
> We are running flink on Kubernetes(Standalone) in application cluster
> mode. The job manager is deployed as a deployment.
> We only deploy one instance/replica of job manager. So, the leader
> election service is not required.
> And we have set flink task execution retries to infinite.
>
> Do we still need a HA setup? We have tested our application without
> configuring the HA, and it seems to restore from checkpoints after failures.
> Does the flink job manager keep the information that it would otherwise
> store in HA system, in memory?
> If it does, then the only reason to configure HA is to achieve resiliency
> in case of pod evictions (caused by node failures, scheduling, etc.)?
>
> Thanks,
> Omkar
>


Re: How to view the Chinese documentation for 1.10

2021-10-08 Thread Xintong Song
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/

Thank you~

Xintong Song



On Sat, Oct 9, 2021 at 10:57 AM 杨浩  wrote:

> Our company uses Flink release-1.10. How can we view the Chinese
> documentation for that version?
>
>
> English docs: https://ci.apache.org/projects/flink/flink-docs-release-1.10/
> In Chinese, we can only see the latest: https://flink.apache.org/zh/flink-architecture.html
>
>


[ANNOUNCE] Release 1.14.0, release candidate #0

2021-08-29 Thread Xintong Song
Hi everyone,

The RC0 for Apache Flink 1.14.0 has been created. This is still a
preview-only release candidate to drive the current testing efforts and so
no official votes will take place. It has all the artifacts that we would
typically have for a release, except for the release note and the website
pull request for the release announcement.

The following contents are available for your review:
- the preview source release and binary convenience releases [1], which are
signed with the key with
fingerprint F8E419AA0B60C28879E876859DFF40967ABFC5A4 [2].
- all artifacts that would normally be deployed to the Maven Central
Repository [3].
- source code tag "release-1.14.0-rc0" [4]

Your help testing the release will be greatly appreciated!

Thank you~
David, Joe & Xintong

[1] https://dist.apache.org/repos/dist/dev/flink/flink-1.14.0-rc0/
[2] https://dist.apache.org/repos/dist/release/flink/KEYS
[3] https://repository.apache.org/content/repositories/orgapacheflink-1448
[4] https://github.com/apache/flink/releases/tag/release-1.14.0-rc0


Re: [ANNOUNCE] Apache Flink 1.13.2 released

2021-08-09 Thread Xintong Song
Thanks Yun and everyone~!

Thank you~

Xintong Song



On Mon, Aug 9, 2021 at 10:14 PM Till Rohrmann  wrote:

> Thanks Yun Tang for being our release manager and the great work! Also
> thanks a lot to everyone who contributed to this release.
>
> Cheers,
> Till
>
> On Mon, Aug 9, 2021 at 9:48 AM Yu Li  wrote:
>
>> Thanks Yun Tang for being our release manager and everyone else who made
>> the release possible!
>>
>> Best Regards,
>> Yu
>>
>>
>> On Fri, 6 Aug 2021 at 13:52, Yun Tang  wrote:
>>
>>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.13.2, which is the second bugfix release for the Apache
>>> Flink 1.13 series.
>>>
>>> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data streaming
>>> applications.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements for this bugfix release:
>>> https://flink.apache.org/news/2021/08/06/release-1.13.2.html
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350218==12315522
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>> Regards,
>>> Yun Tang
>>>
>>


Re: Memory usage UI

2021-07-01 Thread Xintong Song
Hi Sudharsan,

The non-heap max is decided by the JVM automatically and is not controlled by
Flink. Moreover, it doesn't mean Flink will use up to that much non-heap
memory.

These metrics are fetched directly from the JVM and do not correspond well with
Flink's memory configurations, which very often leads to confusion.

Since Flink-1.12, we have introduced a new web ui for the memory metrics,
where the legacy metrics are preserved only for backward compatibility and
are placed in an `Advanced` pane. I'd recommend ignoring them in 99% of the
cases.
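To illustrate where these legacy numbers come from, here is a minimal Java
sketch reading the same JVM-reported values (plain JDK API, not a Flink API):

    import java.lang.management.ManagementFactory;
    import java.lang.management.MemoryMXBean;

    public class JvmMemoryProbe {
        public static void main(String[] args) {
            MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
            // JVM-decided maximums; the non-heap max in particular is not a Flink
            // setting and may even be reported as -1 (undefined) on some JVMs.
            System.out.println("heap max:     " + bean.getHeapMemoryUsage().getMax());
            System.out.println("non-heap max: " + bean.getNonHeapMemoryUsage().getMax());
        }
    }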

Thank you~

Xintong Song



On Fri, Jul 2, 2021 at 2:34 AM Sudharsan R  wrote:

> Hi,
>
> On my Flink setup, I have taskmanager.memory.process.size set to 2536M. I
> expect all the memory components shown on the UI to add up to this number.
> However, I don't see this.
>
>
> I have flink managed memory: 811Mb
>
> JVM heap max: 886Mb
>
> JVM non-heap max: 744Mb
>
> Direct memory: 204Mb
>
>
> This adds up to 2645Mb. Am I adding up things I should not be?
>
> Also, how was JVM non-heap max derived to be 744Mb?
>
>
> Thanks
>
> Sudharsan
>
>
>


Re: Flink v1.12.2 Kubernetes Session Mode cannot mount log4j.properties in configMap

2021-06-21 Thread Xintong Song
Hi Chenyu,

First of all, there are two different ways of deploying Flink on Kubernetes.
- Standalone Kubernetes [1], which uses yaml files to deploy a Flink
Standalone cluster on Kubernetes.
- Native Kubernetes [2], where Flink's ResourceManager interacts with the
Kubernetes API server and allocates resources dynamically.

From what you've described, it seems to me you are using the standalone
Kubernetes deployment. The code you found is for the native Kubernetes
deployment, and should have no effect in your case.

Here are examples of how to mount flink-conf.yaml and log4j-console.properties
in a session cluster [3]. Please be aware that in the standalone Kubernetes
deployment, Flink looks for log4j-console.properties instead of
log4j.properties. By default, this will write the logs to stdout, so that
the logs can be viewed with the `kubectl logs` command.
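A minimal sketch of the relevant part of those resource definitions (resource
names follow the linked docs; adapt them to your setup):

    # in the container spec:
    volumeMounts:
      - name: flink-config-volume
        mountPath: /opt/flink/conf
    # in the pod spec:
    volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
            - key: flink-conf.yaml
              path: flink-conf.yaml
            - key: log4j-console.properties
              path: log4j-console.properties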

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#session-cluster-resource-definitions

On Sun, Jun 20, 2021 at 10:58 AM Chenyu Zheng  wrote:

> Hi contributors!
>
> I’m trying to set up Flink v1.12.2 in Kubernetes Session Mode, but I found
> that I cannot mount a log4j.properties from a ConfigMap into the jobmanager
> container. Is this expected behavior? Could you share some ways to
> mount log4j.properties into my container?
>
> My yaml:
>
>
>
> apiVersion: v1
> data:
>   flink-conf.yaml: |-
>     taskmanager.numberOfTaskSlots: 1
>     blob.server.port: 6124
>     kubernetes.rest-service.exposed.type: ClusterIP
>     kubernetes.jobmanager.cpu: 1.00
>     high-availability.storageDir: s3p://hulu-caposv2-flink-s3-bucket/session-cluster-test/ha-backup/
>     queryable-state.proxy.ports: 6125
>     kubernetes.service-account: stream-app
>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>     jobmanager.memory.process.size: 1024m
>     taskmanager.memory.process.size: 1024m
>     kubernetes.taskmanager.annotations: cluster-autoscaler.kubernetes.io/safe-to-evict:false
>     kubernetes.namespace: test123
>     restart-strategy: fixed-delay
>     restart-strategy.fixed-delay.attempts: 5
>     kubernetes.taskmanager.cpu: 1.00
>     state.backend: filesystem
>     parallelism.default: 4
>     kubernetes.container.image: cubox.prod.hulu.com/proxy/flink:1.12.2-scala_2.11-java8-stdout7
>     kubernetes.taskmanager.labels: capos_id:session-cluster-test,stream-component:jobmanager
>     state.checkpoints.dir: s3p://hulu-caposv2-flink-s3-bucket/session-cluster-test/checkpoints/
>     kubernetes.cluster-id: session-cluster-test
>     kubernetes.jobmanager.annotations: cluster-autoscaler.kubernetes.io/safe-to-evict:false
>     state.savepoints.dir: s3p://hulu-caposv2-flink-s3-bucket/session-cluster-test/savepoints/
>     restart-strategy.fixed-delay.delay: 15s
>     taskmanager.rpc.port: 6122
>     jobmanager.rpc.address: session-cluster-test-flink-jobmanager
>     kubernetes.jobmanager.labels: capos_id:session-cluster-test,stream-component:jobmanager
>     jobmanager.rpc.port: 6123
>   log4j.properties: |-
>     logger.kafka.name = org.apache.kafka
>     logger.hadoop.level = INFO
>     appender.rolling.type = RollingFile
>     appender.rolling.filePattern = ${sys:log.file}.%i
>     appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>     logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
>     rootLogger = INFO, rolling
>     logger.akka.name = akka
>     appender.rolling.strategy.type = DefaultRolloverStrategy
>     logger.akka.level = INFO
>     appender.rolling.append = false
>     logger.hadoop.name = org.apache.hadoop
>     appender.rolling.fileName = ${sys:log.file}
>     appender.rolling.poli

Re: Resource Planning

2021-06-15 Thread Xintong Song
Hi Thomas,

It would be helpful if you could provide the jobmanager/taskmanager logs, and
GC logs if possible.

Additionally, you may consider monitoring the CPU/memory related metrics
[1] to see if there's anything abnormal when the problem is observed.

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html



On Wed, Jun 16, 2021 at 8:11 AM Thomas Wang  wrote:

> Hi,
>
> I'm trying to see if we have given enough resources (i.e. CPU and
> memory) to each task node to perform a deduplication job. Currently, the
> job is not running very stably. What I have been observing is that after a
> couple of days of running, we will suddenly see backpressure happen on one
> arbitrary ec2 instance in the cluster, and when that happens, we will have
> to give up the current state and restart the job with an empty state. We
> can no longer take a savepoint as it would time out after 10 minutes, which is
> understandable.
>
> Additional Observations
>
> When the backpressure happens, we see an increase in our state read time
> (we are measuring it using a custom metric) from about 0.1 milliseconds to
> 40-60 milliseconds on that specific problematic ec2 instance. We tried to
> reboot that ec2 instance, so that the corresponding tasks would be assigned
> to a different ec2 instance, but the problem persists.
>
> However, I’m not sure if this read time increase is a symptom or the cause
> of the problem.
>
> Background about this deduplication job:
>
> We are making sessionization with deduplication on an event stream by a
> session key that is embedded in the event. The throughput of the input
> stream is around 50k records per second. The after-aggregation output is
> around 8k records per second.
>
> We are currently using RocksDb-backend state with SSD support and in the
> state, we are storing session keys with a TTL of 1 week. Based on the
> current throughput, this could become really huge. I assume RocksDB would
> flush to disk as needed, but please correct me if I am wrong.
>
> Information about the cluster:
>
> I'm running on an AWS EMR cluster with 12 ec2 instances (r5d.2xlarge). I'm
> using Flink 1.11.2 in Yarn session mode. Currently there is only 1 job
> running in the Yarn session.
>
> Questions:
>
> 1. Currently, I'm starting the yarn session w/ 7g memory on both the Task
> Manager and the Job Manager, so that each Yarn container could get 1
> CPU. Is this setting reasonable based on your experience?
>
> Here is the command I used to start the Yarn cluster:
>
> export HADOOP_CLASSPATH=`hadoop classpath` &&
> /usr/lib/flink/bin/yarn-session.sh -jm 7g -tm 7g --detached
>
> 2. Is there a scientific way to tell what's the right amount of resources
> I should give to an arbitrary job? Or is this a try-and-see kind of process?
>
> 3. Right now, I'm suspecting resources caused the job to run unstably, but
> I'm not quite sure. Any other potential causes here? How should I debug
> from here if resources are not the issue? Is there a way to detect memory
> leaks?
>
> Thanks in advance!
>
> Thomas
>
>


Re: Add control mode for flink

2021-06-08 Thread Xintong Song
>
> 2. There are two kinds of existing special elements: special stream
> records (e.g. watermarks) and events (e.g. checkpoint barriers). They all
> flow through the whole DAG, but events need to be acknowledged by
> downstream and can overtake records, while stream records cannot. So I’m
> wondering if we plan to unify the two approaches in the new control flow
> (as Xintong mentioned both in the previous mails)?
>

TBH, I don't really know yet. We feel that the control flow is a
non-trivial topic and it would be better to bring it up publicly as early
as possible, while the concrete plan is still on the way.

Personally, I'm leaning towards not touching the existing watermarks and
checkpoint barriers in the first step.
- I'd expect the control flow to be introduced as an experimental feature
that takes time to stabilize. It would be better if the existing
important features like checkpointing and watermarks stayed unaffected.
- Checkpoint barriers are a little different, as other control messages
somehow rely on it to achieve exactly once consistency. Without the
concrete design, I'm not entirely sure whether it can be properly modeled
as a special case of general control messages.
- Watermarks are probably similar to the other control messages. However,
they're already exposed to users as public APIs. If we want to migrate them to
the new control flow, we'd have to be very careful not to break compatibility.


Thank you~

Xintong Song



On Wed, Jun 9, 2021 at 11:30 AM Steven Wu  wrote:

> > producing control events from JobMaster is similar to triggering a
> savepoint.
>
> Paul, here is where I see the difference. Upon job or jobmanager recovery,
> we don't need to recover and replay the savepoint trigger signal.
>
> On Tue, Jun 8, 2021 at 8:20 PM Paul Lam  wrote:
>
>> +1 for this feature. Setting up a separate control stream is too much for
>> many use cases; it would be very helpful if users could leverage the built-in
>> control flow of Flink.
>>
>> My 2 cents:
>> 1. @Steven IMHO, producing control events from JobMaster is similar to
>> triggering a savepoint. The REST api is non-blocking, and users should poll
>> the results to confirm the operation has succeeded. If something goes wrong,
>> it’s the user’s responsibility to retry.
>> 2. There are two kinds of existing special elements: special stream
>> records (e.g. watermarks) and events (e.g. checkpoint barriers). They all
>> flow through the whole DAG, but events need to be acknowledged by
>> downstream and can overtake records, while stream records cannot. So I’m
>> wondering if we plan to unify the two approaches in the new control flow
>> (as Xintong mentioned both in the previous mails)?
>>
>> Best,
>> Paul Lam
>>
>> 2021年6月8日 14:08,Steven Wu  写道:
>>
>>
>> I can see the benefits of control flow. E.g., it might help the old (and
>> inactive) FLIP-17 side input. I would suggest that we add more details on
>> some of the potential use cases.
>>
>> Here is one mismatch with using control flow for dynamic config. Dynamic
>> config is typically targeted/loaded by one specific operator. Control flow
>> will propagate the dynamic config to all operators, which is not a problem
>> per se.
>>
>> Regarding using the REST API (to the jobmanager) for accepting control
>> signals from an external system, where are we going to persist/checkpoint
>> the signal? The jobmanager can die before the control signal is propagated
>> and checkpointed. Do we lose the control signal in this case?
>>
>>
>> On Mon, Jun 7, 2021 at 11:05 PM Xintong Song 
>> wrote:
>>
>>> +1 on separating the effort into two steps:
>>>
>>>1. Introduce a common control flow framework, with flexible
>>>interfaces for generating / reacting to control messages for various
>>>purposes.
>>>2. Features that leverage the control flow can be worked on
>>>concurrently
>>>
>>> Meanwhile, continuing to collect potential features that may leverage the
>>> control flow should be helpful. It provides good input for the control
>>> flow framework design, to make the framework common enough to cover the
>>> potential use cases.
>>>
>>> My suggestions on the next steps:
>>>
>>>1. Allow more time for opinions to be heard and potential use cases
>>>to be collected
>>>    2. Draft a FLIP with the scope of common control flow framework
>>>3. We probably need a poc implementation to make sure the framework
>>>covers at least the following scenarios
>>>   1. Produce control events from arbitrary operators
>>>   2. Produce control 

Re: Re: Add control mode for flink

2021-06-08 Thread Xintong Song
+1 on separating the effort into two steps:

   1. Introduce a common control flow framework, with flexible interfaces
   for generating / reacting to control messages for various purposes.
   2. Features that leverage the control flow can be worked on
   concurrently

Meanwhile, continuing to collect potential features that may leverage the
control flow should be helpful. It provides good input for the control
flow framework design, to make the framework common enough to cover the
potential use cases.

My suggestions on the next steps:

   1. Allow more time for opinions to be heard and potential use cases to
   be collected
   2. Draft a FLIP with the scope of common control flow framework
   3. We probably need a poc implementation to make sure the framework
   covers at least the following scenarios
  1. Produce control events from arbitrary operators
  2. Produce control events from JobMaster
  3. Consume control events from arbitrary operators downstream where
  the events are produced


Thank you~

Xintong Song



On Tue, Jun 8, 2021 at 1:37 PM Yun Gao  wrote:

> Many thanks to Jiangang for bringing this up, and many thanks for the
> discussion!
>
> I also agree with the summarization by Xintong and Jing that control flow
> seems to be a common building block for many functionalities, and a dynamic
> configuration framework is a representative application frequently
> required by users. Regarding the control flow, currently we are also
> considering the design of iteration for flink-ml, and as Xintong has
> pointed out, it also requires the control flow in cases like detecting
> global termination inside the iteration (in this case we need to broadcast
> an event through the iteration body to detect if there are still records
> residing in the iteration body). And regarding whether to implement the
> dynamic configuration framework, I also agree with Xintong that the
> consistency guarantee would be a point to consider; we might consider
> whether we need to ensure every operator could receive the dynamic
> configuration.
>
> Best,
> Yun
>
>
>
> --
> Sender: kai wang
> Date: 2021/06/08 11:52:12
> Recipient: JING ZHANG
> Cc: 刘建刚; Xintong Song [via Apache Flink User Mailing List archive.];
> user@flink.apache.org; dev
> Subject: Re: Add control mode for flink
>
>
>
> I'm big +1 for this feature.
>
>1. Limit the input QPS.
>2. Change the log level for debugging.
>
> In my team, the two examples above are needed.
>
> JING ZHANG wrote on Tue, Jun 8, 2021 at 11:18 AM:
>
>> Thanks Jiangang for bringing this up.
>> As mentioned in Jiangang's email, `dynamic configuration framework`
>> provides many useful functions in Kuaishou, because it can update job
>> behavior without relaunching the job. The functions are very popular in
>> Kuaishou, and we also see similar demands on the mailing list [1].
>>
>> I'm big +1 for this feature.
>>
>> Thanks Xintong and Yun for the deep thoughts about the issue. I like the
>> idea of introducing a control mode in Flink.
>> It takes the original issue a big step closer to its essence, and also
>> opens up the possibility of more features, as mentioned in
>> Xintong's and Jark's responses.
>> Based on the idea, there are at least two milestones to achieve the goals
>> which were proposed by Jiangang:
>> (1) Build a common control flow framework in Flink.
>>  It focuses on control flow propagation, and on how to integrate the
>> common control flow framework with existing mechanisms.
>> (2) Build a dynamic configuration framework which is exposed to users
>> directly.
>>  The dynamic configuration framework can be seen as an application on top
>> of the underlying control flow framework.
>>  It focuses on the public API which receives configuration update
>> requests from users. Besides, it is necessary to introduce an API
>> protection mechanism to avoid job performance degradation caused by too
>> many control events.
>>
>> I suggest splitting the whole design into two after we reach a consensus
>> on whether to introduce this feature, because these two sub-topics both
>> need careful design.
>>
>>
>> [
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-configuration-of-Flink-checkpoint-interval-td44059.html
>> ]
>>
>> Best regards,
>> JING ZHANG
>>
>> 刘建刚 wrote on Tue, Jun 8, 2021 at 10:01 AM:
>>
>>> Thanks Xintong Song for the detailed supplement. Since Flink is
>>> long-running, it is similar to many services, so interacting with it or
>>> controlling it is a common desire

Re: Add control mode for flink

2021-06-07 Thread Xintong Song
Thanks Jiangang for bringing this up, and Steven & Peter for the feedback.

I was part of the preliminary offline discussions before this proposal went
public. So maybe I can help clarify things a bit.

In short, although the phrase "control mode" might be a bit misleading, what
we truly want to do from my side is to make the concept of "control flow"
explicit and expose it to users.

## Background
Jiangang & his colleagues at Kuaishou maintain an internal version of
Flink. One of their custom features is allowing dynamically changing
operator behaviors via the REST APIs. He's willing to contribute this
feature to the community, and came to Yun Gao and me for suggestions. After
discussion, we feel that the underlying question to be answered is how do
we model the control flow in Flink. Dynamically controlling jobs via REST
API can be one of the features built on top of the control flow, and there
could be others.

## Control flow
Control flow refers to the communication channels for sending
events/signals to/between tasks/operators, that changes Flink's behavior in
a way that may or may not affect the computation logic. Typical control
events/signals Flink currently has are watermarks and checkpoint barriers.

In general, for modeling control flow, the following questions should be
considered.
1. Who (which component) is responsible for generating the control messages?
2. Who (which component) is responsible for reacting to the messages.
3. How do the messages propagate?
4. When it comes to affecting the computation logic, how should the
control flow work together with exactly-once consistency?

1) & 2) may vary depending on the use cases, while 3) & 4) probably share
many things in common. A unified control flow model would help deduplicate
the common logic, allowing us to focus on the use-case-specific parts.

E.g.,
- Watermarks: generated by source operators, handled by window operators.
- Checkpoint barrier: generated by the checkpoint coordinator, handled by
all tasks
- Dynamic controlling: generated by JobMaster (in reaction to the REST
command), handled by specific operators/UDFs
- Operator defined events: The following features are still in planning,
but may potentially benefit from the control flow model. (Please correct me
if I'm wrong, @Yun, @Jark)
  * Iteration: When a certain condition is met, we might want to signal
downstream operators with an event
  * Mini-batch assembling: Flink currently uses special watermarks for
indicating the end of each mini-batch, which makes it tricky to deal with
event time related computations.
  * Hive dimension table join: For periodically reloaded hive tables, it
would be helpful to have specific events signaling that a reloading is
finished.
  * Bootstrap dimension table join: This is similar to the previous one. In
cases where we want to fully load the dimension table before starting
joining the mainstream, it would be helpful to have an event signaling the
finishing of the bootstrap.

## Dynamic REST controlling
Back to the specific feature that Jiangang proposed, I personally think
it's quite convenient. Currently, to dynamically change the behavior of an
operator, we need to set up a separate source for the control events and
leverage broadcast state. Being able to send the events via REST APIs
definitely improves the usability.
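For context, here is a minimal sketch of that status-quo pattern, using the
existing broadcast state API (the socket sources and the "mode" key are made
up for illustration; this is not the proposed control flow):

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class BroadcastControlSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Main data stream, plus a separate stream carrying control messages.
            DataStream<String> data = env.socketTextStream("localhost", 9000);
            DataStream<String> control = env.socketTextStream("localhost", 9001);

            // Broadcast state holding the latest control value for all parallel instances.
            MapStateDescriptor<String, String> configDescriptor = new MapStateDescriptor<>(
                    "config", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
            BroadcastStream<String> broadcastControl = control.broadcast(configDescriptor);

            data.connect(broadcastControl)
                    .process(new BroadcastProcessFunction<String, String, String>() {
                        @Override
                        public void processElement(String value, ReadOnlyContext ctx,
                                                   Collector<String> out) throws Exception {
                            // React to the latest control value, e.g. a verbosity switch.
                            String mode = ctx.getBroadcastState(configDescriptor).get("mode");
                            out.collect(("verbose".equals(mode) ? "[verbose] " : "") + value);
                        }

                        @Override
                        public void processBroadcastElement(String value, Context ctx,
                                                            Collector<String> out) throws Exception {
                            // Store the control message so every parallel instance sees it.
                            ctx.getBroadcastState(configDescriptor).put("mode", value);
                        }
                    })
                    .print();

            env.execute("broadcast-control-sketch");
        }
    }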

Leveraging dynamic configuration frameworks is for sure one possible
approach. The reason we are in favor of introducing the control flow is
that:
- It benefits not only this specific dynamic controlling feature, but
potentially other future features as well.
- AFAICS, it's non-trivial to make a 3rd-party dynamic configuration
framework work together with Flink's consistency mechanism.

Thank you~

Xintong Song



On Mon, Jun 7, 2021 at 11:05 AM 刘建刚  wrote:

> Thank you for the reply. I have checked the post you mentioned. The
> dynamic config may be useful sometimes, but it is hard to keep data
> consistent in Flink; for example, what happens if the dynamic config takes
> effect during failover? Since dynamic config is something users want, maybe
> Flink can support it in some way.
>
> For the control mode, dynamic config is just one of the control modes. In
> the google doc, I have listed some other cases. For example, control events
> are generated in operators or external services. Besides users' dynamic
> config, the Flink system can support some common dynamic configuration, like
> QPS limits, checkpoint control, and so on.
>
> It needs good design to handle the control mode structure. Based on that,
> other control features can be added easily later, like changing the log
> level while the job is running. In the end, Flink will not just process
> data, but also interact with users and receive control events like a service.
>
> Steven Wu wrote on Fri, Jun 4, 2021 at 11:11 PM:
>
>> I am not sure if we should solve this problem in Flink. This is more like
>>

Re: In native k8s application mode, how can I know whether the job is failed or finished?

2021-06-03 Thread Xintong Song
There are two ways to access the status of a job after it is finished.

1. You can try the native k8s deployment in session mode. When jobs are
finished in this mode, TMs will be automatically released after a
short period of time, while the JM will not be terminated until you explicitly
shut down the session cluster. Thus, the status of historical jobs can be
accessed via the JM.

2. You can try setting up a history server [1], where information of
finished jobs can be archived.
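For reference, the minimal settings involved (the path is a placeholder; both
sides must point at the same directory):

    # written by the JobManager when a job finishes
    jobmanager.archive.fs.dir: s3://my-bucket/completed-jobs/
    # read by the separate history server process
    historyserver.archive.fs.dir: s3://my-bucket/completed-jobs/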

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/advanced/historyserver/

On Thu, Jun 3, 2021 at 2:46 PM 刘逍  wrote:

> Hi,
>
> We are currently using Flink 1.6 standalone mode, but the lack of
> isolation is a headache for us. At present, I am trying the application
> mode of Flink 1.13.0 on native K8s.
>
> I found that as soon as the job ends, whether it ends normally or
> abnormally, the jobmanager can no longer be accessed, so the "flink
> list" command cannot get the final state of the job.
>
> The K8s pod will also be deleted immediately; "kubectl get pod" can only see
> "running", "terminating", and then "not found".
>
> The Flink job needs to be managed by our internal scheduling system, so
> I need to find a way to let the scheduling system know whether the job
> ends normally or abnormally.
>
> Is there any way?
>


Re: yarn ship from s3

2021-05-26 Thread Xintong Song
Hi Vijay,

Currently, Flink only supports shipping files from the local machine where
the job is submitted.

There are tickets [1][2][3] tracking the efforts that shipping files from
remote paths, e.g., http, hdfs, etc. Once the efforts are done, adding s3
as an additional supported schema should be straightforward.

Unfortunately, these efforts are still in progress, and have more or less
stalled recently.
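In the meantime, a workaround sketch: download the files to the submitting
machine first, then ship them with the YARN CLI's ship option (paths are
placeholders; note that CLI options must come before the job JAR, which is
also why the bare "-yarn.ship-files" argument in the quoted error below was
parsed as a JAR path):

    mkdir -p /tmp/ship
    aws s3 cp s3://applib/xx/xx/1.0-SNAPSHOT/application.properties /tmp/ship/
    ./bin/flink run -m yarn-cluster -yt /tmp/ship ./my-job.jar   # -yt / --yarnship ships a local directory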

Thank you~

Xintong Song


[1] https://issues.apache.org/jira/browse/FLINK-20681
[2] https://issues.apache.org/jira/browse/FLINK-20811
[3] https://issues.apache.org/jira/browse/FLINK-20867

On Thu, May 27, 2021 at 12:23 AM Vijayendra Yadav 
wrote:

> Hi Pohl,
>
> I tried to ship my property file. Example: -yarn.ship-files
> s3://applib/xx/xx/1.0-SNAPSHOT/application.properties \
>
>
> Error:
>
> 6:21:37.163 [main] ERROR org.apache.flink.client.cli.CliFrontend - Invalid
> command line arguments.
> org.apache.flink.client.cli.CliArgsException: Could not build the program
> from JAR file: JAR file does not exist: -yarn.ship-files
> at
> org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:244)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:223)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at java.security.AccessController.doPrivileged(Native Method)
> ~[?:1.8.0_292]
> at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_292]
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
> [hadoop-common-2.10.0-amzn-0.jar:?]
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> [flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> [flink-dist_2.11-1.11.0.jar:1.11.0]
> Caused by: java.io.FileNotFoundException: JAR file does not exist:
> -yarn.ship-files
> at
> org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:740)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:717)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:242)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> ... 8 more
> Could not build the program from JAR file: JAR file does not exist:
> -yarn.ship-files
>
>
> *Thanks,*
>
> *Vijay*
>
> On Tue, May 25, 2021 at 11:58 PM Matthias Pohl 
> wrote:
>
>> Hi Vijay,
>> have you tried yarn-ship-files [1] or yarn-ship-archives [2]? Maybe,
>> that's what you're looking for...
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#yarn-ship-files
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#yarn-ship-archives
>>
>> On Tue, May 25, 2021 at 5:56 PM Vijayendra Yadav 
>> wrote:
>>
>>> Hi Piotr,
>>>
>>> I have been doing the same process as you mentioned so far; now I am
>>> migrating the deployment process using AWS CDK and AWS Step Functions, kind
>>> of like a CICD process.
>>> I added a download step for the jar and configs (1, 2, 3 and 4) from S3
>>> using command-runner.jar (AWS Step); it loaded them onto one of the master
>>> nodes (out of 3). In the next step, when I launched the Flink job, it would
>>> not find the build, because the job was launched on some other yarn node.
>>>
>>> I was hoping that, just like *Apache Spark*, where whatever files we provide
>>> in *--files* are shipped to yarn (S3 to the yarn work directory), Flink
>>> would also have a solution.
>>>
>>> Thanks,
>>> Vijay
>>>
>>>
>>> On Tue, May 25, 2021 at 12:50 AM Piotr Nowojski 
>>> wrote:
>>>
>>>> Hi Vijay,
>>>>
>>>> I'm not sure if I understand your question correctly. You have jar and
>>>> configs (1, 2, 3 and 4) on S3 and you want to start a Flink job using
>>>> those? Can you simply download those things (whole directory containing
>>>> those) to the machine that will be starting the Flink job?
>>>>
>>>> Best, Piotrek
>>>>
>>>> wt., 25 maj 2021 o 07:50 Vijayendra Yadav 
>>>> napisał(a):
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I am trying to find a way to ship files from aws s3 for a flink
>>>>> streaming job, I am running on AWS EMR. What i need to ship are following:
>>>>> 1) application jar
>>>>> 2) application property file
>>>>> 3) custom flink-conf.yaml
>>>>> 4) log4j application specific
>>>>>
>>>>> Please let me know options.
>>>>>
>>>>> Thanks,
>>>>> Vijay
>>>>
>>>>


Re: reactive mode and back pressure

2021-05-17 Thread Xintong Song
Yes, it does.
Internally, each re-scheduling is performed by stopping and resuming the job,
similar to a failover. Without checkpoints, the job would always restore
from the very beginning.

Thank you~

Xintong Song



On Mon, May 17, 2021 at 2:54 PM Alexey Trenikhun  wrote:

> Hi Xintong,
> Does reactive mode need checkpoint for re-scheduling ?
>
> Thanks,
> Alexey
>
> ------
> *From:* Xintong Song 
> *Sent:* Sunday, May 16, 2021 7:30:15 PM
> *To:* Flink User Mail List 
> *Subject:* Re: reactive mode and back pressure
>
> Hi Alexey,
>
> I don't think the new reactive mode makes any changes to the
> checkpoint/savepoint mechanism, at least not at the moment.
>
> However, you might want to take a look at the unaligned checkpoint [1].
> The unaligned checkpoint is designed to be tolerant with back pressure.
> AFAIK, this can work with both the default and the new reactive modes.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#unaligned-checkpoints
>
>
>
> On Fri, May 14, 2021 at 11:29 PM Alexey Trenikhun  wrote:
>
> Hello,
>
> Can the new reactive mode operate under back pressure? Old manual rescaling
> via taking a savepoint didn't work for a system under back pressure, since it
> was practically impossible to take a savepoint, so I am wondering whether the
> reactive mode is expected to be better in this regard?
>
> Thanks,
> Alexey
>
>


Re: The heartbeat of JobManager timed out

2021-05-16 Thread Xintong Song
Hi Alexey & Smile,

JM & RM are located in the same process, thus it's unlikely to be a network
issue. Such timeouts are usually caused by one of the two endpoints not
responding in a timely manner.

Some common causes:
- The process is under severe GC pressure. You can check the GC logs for
the pressure.
- Insufficient CPU resources. You may check the CPU workload of the physical
machine (standalone) or pod/container (K8s/Yarn).
- Busy RPC main thread. Even if there are sufficient CPU resources (multiple
cores), the processing capacity can be limited by the single RPC main thread.
This is usually observed for large-scale jobs (in terms of the number of
vertices and parallelism). In that case, we would have to increase the
heartbeat timeout, as sketched below.
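A minimal flink-conf.yaml sketch for that last case (the defaults are 10s
interval / 50s timeout; the timeout below is only an example value):

```
heartbeat.interval: 10000
heartbeat.timeout: 180000
```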

Thank you~

Xintong Song



On Mon, May 17, 2021 at 11:12 AM Smile  wrote:

> JM log shows this:
>
> INFO  org.apache.flink.yarn.YarnResourceManager - The
> heartbeat of JobManager with id 41e3ef1f248d24ddefdccd1887947106 timed out.
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: reactive mode and back pressure

2021-05-16 Thread Xintong Song
Hi Alexey,

I don't think the new reactive mode makes any changes to the
checkpoint/savepoint mechanism, at least not at the moment.

However, you might want to take a look at the unaligned checkpoint [1]. The
unaligned checkpoint is designed to be tolerant with back pressure. AFAIK,
this can work with both the default and the new reactive modes.

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#unaligned-checkpoints
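A minimal sketch of enabling unaligned checkpoints programmatically (the
checkpoint interval is an example value; requires Flink 1.11+):

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpointExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoint once a minute; pick an interval that fits your job
        env.enableCheckpointing(60_000);
        // checkpoint barriers may overtake in-flight data instead of
        // waiting for alignment under back pressure
        env.getCheckpointConfig().enableUnalignedCheckpoints();
    }
}
```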



On Fri, May 14, 2021 at 11:29 PM Alexey Trenikhun  wrote:

> Hello,
>
> Can the new reactive mode operate under back pressure? Old manual rescaling
> via taking a savepoint didn't work for a system under back pressure, since it
> was practically impossible to take a savepoint, so I am wondering whether the
> reactive mode is expected to be better in this regard?
>
> Thanks,
> Alexey
>


Re: How does JobManager terminate dangling task manager

2021-05-13 Thread Xintong Song
Hi narasimha,

For each TaskManager, there are two kinds of connections to the JobManager
process.
- One single connection to the ResourceManager, which allows RM to monitor
the slots' availability and assign them to Flink jobs.
- Connections to each JobMaster that the slots of this TM are assigned to.

Upon a JobMaster-TM disconnection, all tasks running on the TM that belong to
the corresponding job are failed immediately. Take the Kafka source as an
example: that is when the task stops consuming data from Kafka.
Upon an RM-TM disconnection, the TM kills itself if it cannot reconnect to the
RM within a certain time.
Since the JobMaster and RM are in the same process, when one of the two
connections breaks, the other usually also breaks. In cases where it does not,
an RM-TM disconnection does not fail the running tasks until the reconnection
times out.
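The reconnection window is configurable. A flink-conf.yaml sketch (the value
shown is the default, as far as I recall):

```
# how long a TaskManager keeps trying to register with the ResourceManager
# before terminating itself
taskmanager.registration.timeout: 5 min
```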

As for failover consistency, that is guaranteed by the checkpointing
mechanism. The new task does not resume from the exact position where the
old task stopped. Instead, it resumes from the last successful checkpoint.

Thank you~

Xintong Song



On Thu, May 13, 2021 at 5:38 PM Guowei Ma  wrote:

> Hi,
> In fact, not only will the JobManager (ResourceManager) kill a timed-out
> TaskManager; if the TaskManager finds that it cannot connect to the
> JobManager (ResourceManager), it will also exit by itself.
> You can look at the time period during which the HB timeout occurred and
> what happened in the log. Under normal circumstances, I would also look at
> what the GC situation was like at that time.
> Best,
> Guowei
>
>
> On Thu, May 13, 2021 at 11:06 AM narasimha  wrote:
>
>> Hi,
>>
>> Trying to understand how the JobManager kills a TaskManager that didn't
>> respond to heartbeats after a certain time.
>>
>> For example:
>>
>> If the network connection b/w the JobManager and a TaskManager is lost for
>> some reason, the JobManager will bring up another TaskManager after the
>> heartbeat timeout.
>> In such a case, how does the JobManager make sure all connections (like to
>> Kafka) from the lost TaskManager are cut down, and that the new one will
>> take over from a certain consistent point?
>>
>> Also want to learn ways to debug what caused the timeout; our job handles
>> roughly 5k records/s, not a heavy-traffic job.
>> --
>> A.Narasimha Swamy
>>
>


Re: Question regarding cpu limit config in Flink standalone mode

2021-05-06 Thread Xintong Song
Hi Fan,

For a Java application, you cannot specify how much CPU the process should
use. The JVM process will always try to use as much CPU time as it needs.
Limits can only come from the outside: hardware limits, OS scheduling,
cgroups, etc.

On Kubernetes, it is the pod's resource specification that decides how much
CPU a Flink JM/TM can use.
- For the standalone Kubernetes deployment, you can specify the pods'
resources in your yaml files, as sketched below.
- For the native Kubernetes deployment, TM pods are requested by Flink's
ResourceManager. Thus, the configuration option `kubernetes.taskmanager.cpu`
controls the CPU resources of the pods Flink requests from Kubernetes.
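For the standalone case, a sketch of the relevant excerpt of a TaskManager
pod spec (names and values are examples only):

```
containers:
  - name: taskmanager
    image: flink:1.13
    resources:
      requests:
        cpu: "2"
        memory: "4Gi"
      limits:
        cpu: "2"
        memory: "4Gi"
```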

Thank you~

Xintong Song



On Fri, May 7, 2021 at 10:35 AM Fan Xie  wrote:

> Hi Flink Community,
>
> Recently I have been working on an auto-scaling project that needs to
> dynamically adjust the CPU config of Flink standalone jobs. Our jobs will be
> running in *standalone* mode in a k8s cluster. After going through the
> configuration doc:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/,
> I can't find a config that can directly control the cpu of a standalone
> flink job. I can only see *kubernetes.taskmanager.cpu*, but looks like
> this config is only useful in native k8s mode. I also notice another
> config: *taskmanager.numberOfTaskSlots* that can control the cpu config
> in an indirect way. Is there any reason why we can't config the cpu for a
> standalone job directly?
>
> Thanks for answering my question.
>
> Best,
> Fan
>
>


Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-05 Thread Xintong Song
Thanks Dawid & Guowei as the release managers, and everyone who has
contributed to this release.


Thank you~

Xintong Song



On Thu, May 6, 2021 at 9:51 AM Leonard Xu  wrote:

> Thanks Dawid & Guowei for the great work, thanks everyone involved.
>
> Best,
> Leonard
>
> 在 2021年5月5日,17:12,Theo Diefenthal  写道:
>
> Thanks for managing the release. +1. I like the focus on improving
> operations with this version.
>
> --
> *Von: *"Matthias Pohl" 
> *An: *"Etienne Chauchot" 
> *CC: *"dev" , "Dawid Wysakowicz" <
> dwysakow...@apache.org>, "user" ,
> annou...@apache.org
> *Gesendet: *Dienstag, 4. Mai 2021 21:53:31
> *Betreff: *Re: [ANNOUNCE] Apache Flink 1.13.0 released
>
> Yes, thanks for managing the release, Dawid & Guowei! +1
>
> On Tue, May 4, 2021 at 4:20 PM Etienne Chauchot 
> wrote:
>
>> Congrats to everyone involved !
>>
>> Best
>>
>> Etienne
>> On 03/05/2021 15:38, Dawid Wysakowicz wrote:
>>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.13.0.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements in this release:
>> https://flink.apache.org/news/2021/05/03/release-1.13.0.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349287
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>> Guowei & Dawid
>>
>>
>
>


Re: [ANNOUNCE] Flink Jira Bot fully live (& Useful Filters to Work with the Bot)

2021-04-22 Thread Xintong Song
Thanks for driving this, Konstantin.
Great job~!

Thank you~

Xintong Song



On Thu, Apr 22, 2021 at 11:57 PM Matthias Pohl 
wrote:

> Thanks for setting this up, Konstantin. +1
>
> On Thu, Apr 22, 2021 at 11:16 AM Konstantin Knauf 
> wrote:
>
>> Hi everyone,
>>
>> all of the Jira Bot rules are live now. Particularly in the beginning the
>> Jira Bot will be very active, because the rules apply to a lot of old,
>> stale tickets. So, if you get a huge amount of emails from the Flink Jira
>> Bot right now, this will get better. In any case, the Flink Jira Bot (or
>> the rules that it implements) demand some changes to how we work with Jira.
>>
>> Here are a few points to make this transition easier for us:
>>
>> *1) Retrospective*
>>
>> In 3-4 weeks I would like to collect feedback. What is working well? What
>> is not working well or getting in your way? Is the bot moving us closer to
>> the goals mentioned in the initial email? Specifically, the
>> initial parameterization [1] of the bot was kind of an educated guess. I
>> will open a [DISCUSS]ion thread to collect feedback and proposals for
>> changes around that time.
>>
>> *2) Use Sub-Tasks*
>>
>> The bot will ask you for an update on assigned tickets after quite a
>> short time by Flink standards. If you are working on a ticket that takes
>> longer, consider splitting it up into Sub-Tasks. Activity on Sub-Tasks counts
>> as activity for the parent ticket, too. So, as long as any subtask is
>> moving along, you won't be nagged by the bot.
>>
>>
>> *3) Useful Filters*
>>
>> You've probably received a lot of emails already, in particular if you
>> are watching many tickets. Here are a few JIRA filters to find the tickets
>> that are probably most important to you and have been updated by the bot:
>>
>> Tickets that *you are assigned to*, which are "stale-assigned"
>>
>> https://issues.apache.org/jira/issues/?filter=12350499
>>
>> Tickets that *you reported*, which are stale in anyway:
>>
>> https://issues.apache.org/jira/issues/?filter=12350500
>>
>> If you are a maintainer of some components, you might find the following
>> filters useful (replace with your own components):
>>
>> *All tickets that are about to be closed*
>> project = FLINK AND component in ("Build System", "BuildSystem / Shaded",
>> "Deployment / Kubernetes", "Deployment / Mesos", "Deployment / YARN",
>> flink-docker, "Release System", "Runtime / Coordination", "Runtime /
>> Metrics", "Runtime / Queryable State", "Runtime / REST", Travis) AND
>> resolution = Unresolved AND labels in (stale-minor)
>>
>> *Bugs that are about to be deprioritized or closed*
>> project = FLINK AND type = BUG AND component in ("Build System",
>> "BuildSystem / Shaded", "Deployment / Kubernetes", "Deployment / Mesos",
>> "Deployment / YARN", flink-docker, "Release System", "Runtime /
>> Coordination", "Runtime / Metrics", "Runtime / Queryable State", "Runtime /
>> REST", Travis) AND resolution = Unresolved AND labels in (stale-major,
>> stale-blocker, stale-critical, stale-minor)
>>
>>
>> *Tickets that are stale-assigned, but already have a PR available*
>> project = FLINK AND component in ("Build System", "BuildSystem / Shaded",
>> "Deployment / Kubernetes", "Deployment / Mesos", "Deployment / YARN",
>> flink-docker, "Release System", "Runtime / Coordination", "Runtime /
>> Metrics", "Runtime / Queryable State", "Runtime / REST", Travis) AND
>> resolution = Unresolved AND labels in (stale-assigned) AND labels in
>> (pull-request-available)
>>
>> Cheers,
>>
>> Konstantin
>>
>> [1] https://github.com/apache/flink-jira-bot/blob/master/config.yaml
>>
>>
>> --
>>
>> Konstantin Knauf
>>
>> https://twitter.com/snntrable
>>
>> https://github.com/knaufk
>>
>


Re: Clarification about Flink's managed memory and metric monitoring

2021-04-13 Thread Xintong Song
These metrics should also be available via REST.

You can check the original design doc [1] for which metrics the UI is using.

Thank you~

Xintong Song


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-102%3A+Add+More+Metrics+to+TaskManager
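For example, the new memory metrics introduced by FLIP-102 can be queried like
this (a sketch; the TM id is the one from your earlier query, and the exact
metric names should be double-checked against your version's documentation):

```
curl "http://<jobmanager>:8081/taskmanagers/c3c960d79c1eb2341806bfa2b2d66328/metrics?get=Status.Flink.Memory.Managed.Used,Status.Flink.Memory.Managed.Total"
```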

On Tue, Apr 13, 2021 at 9:08 PM Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Hi Xintong,
>
>
>
> Thanks for the info. Is there any way to access these metrics outside of
> the UI? I suppose Flink’s reporters might provide them, but will they also
> be available through the REST interface (or another interface)?
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Xintong Song 
> *Sent:* Tuesday, 13 April 2021 14:30
> *To:* Alexis Sarda-Espinosa 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Clarification about Flink's managed memory and metric
> monitoring
>
>
>
> Hi Alexis,
>
>
>
> First of all, I strongly recommend not looking at the JVM metrics. These
> metrics are fetched directly from the JVM and do not correspond well to
> Flink's memory configurations. They were introduced a long time ago and are
> preserved mostly for compatibility. IMO, they bring more confusion than
> convenience. In Flink 1.12, there is a newly designed TM metrics page in
> the web UI, which clearly shows how the metrics correspond to Flink's
> memory configurations (if any).
>
>
>
> Concerning your questions.
>
> 1. Yes, increasing framework/task off-heap memory sizes should increase
> the direct memory capacity. Increasing the network memory size should also
> do that.
>
> 2. When 'state.backend.rocksdb.memory.managed' is true, RocksDB uses
> managed memory. Managed memory is not measured by any JVM metrics. It's not
> managed by the JVM, meaning that it's not limited by '-XX:MaxDirectMemorySize'
> and is not controlled by the garbage collectors.
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Tue, Apr 13, 2021 at 7:53 PM Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com> wrote:
>
> Hello,
>
>
>
> I have a Flink TM configured with taskmanager.memory.managed.size: 1372m.
> There is a streaming job using RocksDB for checkpoints, so I assume some of
> this memory will indeed be used.
>
>
>
> I was looking at the metrics exposed through the REST interface, and I
> queried some of them:
>
>
>
> /taskmanagers/c3c960d79c1eb2341806bfa2b2d66328/metrics?get=Status.JVM.Memory.Heap.Committed,Status.JVM.Memory.NonHeap.Committed,Status.JVM.Memory.Direct.MemoryUsed
> | jq
>
> [
>
>   {
>
> "id": "Status.JVM.Memory.Heap.Committed",
>
> "value": "1652031488"
>
>   },
>
>   {
>
> "id": "Status.JVM.Memory.NonHeap.Committed",
>
> "value": "234291200"
> <-- 223 MiB
>
>   },
>
>   {
>
> "id": "Status.JVM.Memory.Direct.MemoryUsed",
>
> "value": "375015427"
> <-- 358 MiB
>
>   },
>
>   {
>
> "id": "Status.JVM.Memory.Direct.TotalCapacity",
>
> "value": "375063552"
> <-- 358 MiB
>
>   }
>
> ]
>
>
>
> I presume direct memory is being used by Flink and its networking stack,
> as well as by the JVM itself. To be sure:
>
>
>
>1. Increasing "taskmanager.memory.framework.off-heap.size" or
>"taskmanager.memory.task.off-heap.size" should increase
>Status.JVM.Memory.Direct.TotalCapacity, right?
>2. I presume the native memory used by RocksDB cannot be tracked with
>these JVM metrics even if "state.backend.rocksdb.memory.managed" is true,
>right?
>
>
>
> Based on this question:
> https://stackoverflow.com/questions/30622818/off-heap-native-heap-direct-memory-and-native-memory,
> I imagine Flink/RocksDB either allocates memory completely independently of
> the JVM, or it uses unsafe. Since the documentation (
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html#managed-memory)
> states that "Managed memory is managed by Flink and is allocated as native
> memory (off-heap)", I thought this native memory might show up as part of
> direct memory tracking, but I guess it doesn’t.
>
>
>
> Regards,
>
> Alexis.
>
>
>
>


Re: Clarification about Flink's managed memory and metric monitoring

2021-04-13 Thread Xintong Song
Hi Alexis,

First of all, I strongly recommend not looking at the JVM metrics. These
metrics are fetched directly from the JVM and do not correspond well to
Flink's memory configurations. They were introduced a long time ago and are
preserved mostly for compatibility. IMO, they bring more confusion than
convenience. In Flink 1.12, there is a newly designed TM metrics page in
the web UI, which clearly shows how the metrics correspond to Flink's
memory configurations (if any).

Concerning your questions.
1. Yes, increasing framework/task off-heap memory sizes should increase the
direct memory capacity. Increasing the network memory size should also do
that.
2. When 'state.backend.rocksdb.memory.managed' is true, RocksDB uses
managed memory. Managed memory is not measured by any JVM metrics. It's not
managed by the JVM, meaning that it's not limited by '-XX:MaxDirectMemorySize'
and is not controlled by the garbage collectors.

Thank you~

Xintong Song



On Tue, Apr 13, 2021 at 7:53 PM Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Hello,
>
>
>
> I have a Flink TM configured with taskmanager.memory.managed.size: 1372m.
> There is a streaming job using RocksDB for checkpoints, so I assume some of
> this memory will indeed be used.
>
>
>
> I was looking at the metrics exposed through the REST interface, and I
> queried some of them:
>
>
>
> /taskmanagers/c3c960d79c1eb2341806bfa2b2d66328/metrics?get=Status.JVM.Memory.Heap.Committed,Status.JVM.Memory.NonHeap.Committed,Status.JVM.Memory.Direct.MemoryUsed
> | jq
>
> [
>
>   {
>
> "id": "Status.JVM.Memory.Heap.Committed",
>
> "value": "1652031488"
>
>   },
>
>   {
>
> "id": "Status.JVM.Memory.NonHeap.Committed",
>
> "value": "234291200"
> <-- 223 MiB
>
>   },
>
>   {
>
> "id": "Status.JVM.Memory.Direct.MemoryUsed",
>
> "value": "375015427"
> <-- 358 MiB
>
>   },
>
>   {
>
> "id": "Status.JVM.Memory.Direct.TotalCapacity",
>
> "value": "375063552"
> <-- 358 MiB
>
>   }
>
> ]
>
>
>
> I presume direct memory is being used by Flink and its networking stack,
> as well as by the JVM itself. To be sure:
>
>
>
>1. Increasing "taskmanager.memory.framework.off-heap.size" or
>"taskmanager.memory.task.off-heap.size" should increase
>Status.JVM.Memory.Direct.TotalCapacity, right?
>2. I presume the native memory used by RocksDB cannot be tracked with
>these JVM metrics even if "state.backend.rocksdb.memory.managed" is true,
>right?
>
>
>
> Based on this question:
> https://stackoverflow.com/questions/30622818/off-heap-native-heap-direct-memory-and-native-memory,
> I imagine Flink/RocksDB either allocates memory completely independently of
> the JVM, or it uses unsafe. Since the documentation (
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html#managed-memory)
> states that "Managed memory is managed by Flink and is allocated as native
> memory (off-heap)", I thought this native memory might show up as part of
> direct memory tracking, but I guess it doesn’t.
>
>
>
> Regards,
>
> Alexis.
>
>
>


Re: Re: Flink CPU and memory resource allocation

2021-04-12 Thread Xintong Song
The log in your screenshot also clearly shows the size of each memory
component. The heap is only one of them; all of the components together add
up to the 1728m you configured.

Adjusting the configuration can indeed let the TM use more memory. Whether
that improves performance depends on whether your job is bottlenecked on
memory. If the bottleneck is the CPU, I/O, or even the upstream data source,
blindly increasing the memory won't help much.
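For reference, a minimal flink-conf.yaml sketch of the change discussed here
(the 4 GB value is only the example from this thread):

```
# total memory of the TaskManager process; Flink derives the heap, managed,
# network, metaspace, and overhead sizes from this one value
taskmanager.memory.process.size: 4096m
```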

Thank you~

Xintong Song



On Mon, Apr 12, 2021 at 10:32 AM penguin.  wrote:

> Thanks! Since I use one machine as one TM, and the default
> taskmanager.memory.process.size
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#taskmanager-memory-process-size>
> in the Flink configuration file is 1728m, the log shows a heap of 512.
> If I increase the parameter taskmanager.memory.process.size
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#taskmanager-memory-process-size>
> to something larger, say 4 GB, will that improve the job's performance?
> The defaults are as follows:
> INFO [] - The derived from fraction jvm overhead memory (172.800mb (
> 181193935 bytes)) is less than its min value 192.000mb (201326592 bytes),
> min value will be used instead
> INFO [] - Final TaskExecutor Memory configuration:
> INFO [] - Total Process Memory: 1.688gb (1811939328 bytes)
> INFO [] - Total Flink Memory: 1.250gb (1342177280 bytes)
> INFO [] - Total JVM Heap Memory: 512.000mb (536870902 bytes)
> INFO [] - Framework: 128.000mb (134217728 bytes)
> INFO [] - Task: 384.000mb (402653174 bytes)
> INFO [] - Total Off-heap Memory: 768.000mb (805306378 bytes)
> INFO [] - Managed: 512.000mb (536870920 bytes)
> INFO [] - Total JVM Direct Memory: 256.000mb (268435458 bytes)
> INFO [] - Framework: 128.000mb (134217728 bytes)
> INFO [] - Task: 0 bytes
> INFO [] - Network: 128.000mb (134217730 bytes)
> INFO [] - JVM Metaspace: 256.000mb (268435456 bytes)
> INFO [] - JVM Overhead: 192.000mb (201326592 bytes)
>
> After changing it to 4 GB:
>
>
>
>
>
> Penguin.
>
>
>
>
>
>
>
> On 2021-04-12 10:04:32, "Xintong Song" wrote:
> >>
> >> For example, a node now has a 16-core CPU, 16 GB of memory, and 4 slots;
> >
> >
> >The "node" you refer to here should be the physical or virtual machine
> >where the Flink TM runs.
> >
> >You are mixing up several concepts here:
> >
> >- Node, TM, and slot are concepts at three different levels. 16c16g applies
> >to the node; the 4 slots apply to a TM. A node can run multiple TMs.
> >
> >- A TM's memory includes not only the heap but also other memory types, so
> >512M does not represent the TM's total memory size.
> >
> >- Whether a TM over-uses CPU and memory largely depends on your runtime
> >environment. From Flink's own side, the Heap, Direct, and Metaspace memory
> >types cannot be over-used, but part of the Native memory may be over-used,
> >and the CPU may also be over-used. However, K8s/Yarn environments usually
> >provide external resource limits, e.g., disallowing over-use or allowing
> >over-use only up to a certain ratio; this depends on the concrete
> >environment configuration.
> >
> >
> >You can take a look at the official documentation on the memory model and
> >configuration [1].
> >
> >
> >Thank you~
> >
> >Xintong Song
> >
> >
> >[1]
> >https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/memory/mem_setup.html
> >
> >On Sun, Apr 11, 2021 at 9:16 PM penguin.  wrote:
> >
> >> I learned that Flink's memory is isolated, while the CPU cannot be isolated;
> >> For example, a node now has a 16-core CPU, 16 GB of memory, and 4 slots;
> >> Through debugging and the logs, I found that each slot gets 1 CPU, so 4
> >> slots occupy 4 CPU cores, and the heap memory is 512M.
> >> In that case, are the other 12 CPU cores and all that memory simply unused
> >> and wasted?
> >>
> >> Looking forward to a reply, many thanks!
>
>
>
>
>


Re: Flink CPU and memory resource allocation

2021-04-11 Thread Xintong Song
>
> For example, a node now has a 16-core CPU, 16 GB of memory, and 4 slots;


The "node" you refer to here should be the physical or virtual machine where
the Flink TM runs.

You are mixing up several concepts here:

- Node, TM, and slot are concepts at three different levels. 16c16g applies to
the node; the 4 slots apply to a TM. A node can run multiple TMs.

- A TM's memory includes not only the heap but also other memory types, so
512M does not represent the TM's total memory size.

- Whether a TM over-uses CPU and memory largely depends on your runtime
environment. From Flink's own side, the Heap, Direct, and Metaspace memory
types cannot be over-used, but part of the Native memory may be over-used, and
the CPU may also be over-used. However, K8s/Yarn environments usually provide
external resource limits, e.g., disallowing over-use or allowing over-use only
up to a certain ratio; this depends on the concrete environment configuration.


You can take a look at the official documentation on the memory model and
configuration [1].


Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/memory/mem_setup.html

On Sun, Apr 11, 2021 at 9:16 PM penguin.  wrote:

> I learned that Flink's memory is isolated, while the CPU cannot be isolated;
> For example, a node now has a 16-core CPU, 16 GB of memory, and 4 slots;
> Through debugging and the logs, I found that each slot gets 1 CPU, so 4 slots
> occupy 4 CPU cores, and the heap memory is 512M.
> In that case, are the other 12 CPU cores and all that memory simply unused
> and wasted?
>
> Looking forward to a reply, many thanks!


Re: [BULK]Re: [SURVEY] Remove Mesos support

2021-03-28 Thread Xintong Song
+1
It has been a matter of fact for a while that we no longer port new
features to the Mesos deployment.

Thank you~

Xintong Song



On Fri, Mar 26, 2021 at 10:37 PM Till Rohrmann  wrote:

> +1 for officially deprecating this component for the 1.13 release.
>
> Cheers,
> Till
>
> On Thu, Mar 25, 2021 at 1:49 PM Konstantin Knauf 
> wrote:
>
>> Hi Matthias,
>>
>> Thank you for following up on this. +1 to officially deprecate Mesos in
>> the code and documentation, too. It will be confusing for users if this
>> diverges from the roadmap.
>>
>> Cheers,
>>
>> Konstantin
>>
>> On Thu, Mar 25, 2021 at 12:23 PM Matthias Pohl 
>> wrote:
>>
>>> Hi everyone,
>>> considering the upcoming release of Flink 1.13, I wanted to revive the
>>> discussion about the Mesos support once more. Mesos is also already
>>> listed
>>> as deprecated in Flink's overall roadmap [1]. Maybe, it's time to align
>>> the
>>> documentation accordingly to make it more explicit?
>>>
>>> What do you think?
>>>
>>> Best,
>>> Matthias
>>>
>>> [1] https://flink.apache.org/roadmap.html#feature-radar
>>>
>>> On Wed, Oct 28, 2020 at 9:40 AM Till Rohrmann 
>>> wrote:
>>>
>>> > Hi Oleksandr,
>>> >
>>> > yes you are right. The biggest problem is at the moment the lack of
>>> test
>>> > coverage and thereby confidence to make changes. We have some e2e tests
>>> > which you can find here [1]. These tests are, however, quite coarse
>>> grained
>>> > and are missing a lot of cases. One idea would be to add a Mesos e2e
>>> test
>>> > based on Flink's end-to-end test framework [2]. I think what needs to
>>> be
>>> > done there is to add a Mesos resource and a way to submit jobs to a
>>> Mesos
>>> > cluster to write e2e tests.
>>> >
>>> > [1] https://github.com/apache/flink/tree/master/flink-jepsen
>>> > [2]
>>> >
>>> https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-end-to-end-tests-common
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Tue, Oct 27, 2020 at 12:29 PM Oleksandr Nitavskyi <
>>> > o.nitavs...@criteo.com> wrote:
>>> >
>>> >> Hello Xintong,
>>> >>
>>> >> Thanks for the insights and support.
>>> >>
>>> >> I browsed the Mesos backlog and didn't identify anything critical
>>> >> that is left there.
>>> >>
>>> >> I see that there were quite a lot of contributions to the Flink
>>> >> Mesos
>>> >> in the recent version:
>>> >> https://github.com/apache/flink/commits/master/flink-mesos.
>>> >> We plan to validate the current Flink master (or release 1.12 branch) on
>>> >> our Mesos setup. In case of any issues, we will try to propose changes.
>>> >> My feeling is that our test results shouldn't affect the Flink 1.12
>>> >> release cycle. And if any potential commits land in 1.12.1, it
>>> >> should be totally fine.
>>> >>
>>> >> In the future, we would be glad to help you guys with any
>>> >> maintenance-related questions. One of the highest priorities around
>>> this
>>> >> component seems to be the development of the full e2e test.
>>> >>
>>> >> Kind Regards
>>> >> Oleksandr Nitavskyi
>>> >> 
>>> >> From: Xintong Song 
>>> >> Sent: Tuesday, October 27, 2020 7:14 AM
>>> >> To: dev ; user 
>>> >> Cc: Piyush Narang 
>>> >> Subject: [BULK]Re: [SURVEY] Remove Mesos support
>>> >>
>>> >> Hi Piyush,
>>> >>
>>> >> Thanks a lot for sharing the information. It would be a great relief
>>> that
>>> >> you are good with Flink on Mesos as is.
>>> >>
>>> >> As for the jira issues, I believe the most essential ones should have
>>> >> already been resolved. You may find some remaining open issues here
>>> [1],
>>> >> but not all of them are necessary if we decide to keep Flink on Mesos
>>> as is.
>>> >>
>>> >> At the moment and in the short future, I think helps are mostly
>>> needed on
&

Re: Flink deployed on YARN fails with: Maximum Memory: 8192 Requested: 10240MB. Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values

2021-03-21 Thread Xintong Song
The error message already explains it: your YARN cluster configuration allows
a maximum container size of 8 GB (8192 MB), while your Flink configuration
sets the TM size to 10 GB (10240 MB).
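Either reduce taskmanager.memory.process.size to fit within 8 GB, or raise
the YARN limits. A yarn-site.xml sketch (the values are examples only; both
properties must be at least as large as the requested container):

```
<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>12288</value>
</property>
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>12288</value>
</property>
```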

Thank you~

Xintong Song



On Sat, Mar 20, 2021 at 7:52 PM william <712677...@qq.com> wrote:

> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
> deploy Yarn session cluster
> at
>
> org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:425)
> at
>
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:606)
> at
>
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:860)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> at
>
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
>
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:860)
> Caused by:
> org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
> cluster does not have the requested resources for the TaskManagers
> available!
> Maximum Memory: 8192 Requested: 10240MB. Please check the
> 'yarn.scheduler.maximum-allocation-mb' and the
> 'yarn.nodemanager.resource.memory-mb' configuration values
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Evenly Spreading Out Source Tasks

2021-03-15 Thread Xintong Song
If all the tasks have the same parallelism of 36, your job should allocate
only 36 slots. The evenly-spread-out-slots option should help in your case.

Is it possible for you to share the complete jobmanager logs?


Thank you~

Xintong Song



On Tue, Mar 16, 2021 at 12:46 AM Aeden Jameson 
wrote:

> Hi Xintong,
>
> Thanks for replying.  Yes, you understood my scenario. Every task
> has the same parallelism since we're using FlinkSql unless there is a
> way to change the parallelism of the source task that I have missed.
> Your explanation of the setting makes sense and is what I ended up
> concluding. Assuming one can't change the parallelism of FlinkSQL
> tasks other than the sink-parallelism option I've concluded when using
> FlinkSQL that have to plan at the cluster level. e.g. Reduce the task
> slots, increase the partitions, reduce the TM's (possibily making them
> bigger) etc...
>
> Aeden
>
> On Sun, Mar 14, 2021 at 10:41 PM Xintong Song 
> wrote:
> >
> > Hi Aeden,
> >
> > IIUC, the topic being read has 36 partitions means that your source task
> has a parallelism of 36. What's the parallelism of other tasks? Is the job
> taking use of all the 72 (18 TMs * 4 slots/TM) slots?
> >
> > I'm afraid currently there's no good way to guarantee subtasks of a task
> are spread out evenly.
> >
> > The configuration option you mentioned makes sure slots are allocated
> from TMs evenly, it does not affect how tasks are distributed over the
> allocated slots.
> > E.g., say your job has two tasks A & B, with parallelism 36 & 54
> respectively. That means, with the default slot sharing strategy, your job
> needs 54 slots in total to be executed. With the configuration enabled, it
> is guaranteed that for each TM 3 slots are occupied. For B (parallelism
> 54), there's a subtask deployed in each slot, thus 3 subtasks on each TM.
> As for A, there're only 36 slots containing a subtask of it, and there's no
> guarantee which 36 out of the 54 contain it.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Mon, Mar 15, 2021 at 3:54 AM Chesnay Schepler 
> wrote:
> >>
> >> Is this a brand-new job, with the cluster having all 18 TMs at the time
> >> of submission? (or did you add more TMs while the job was running)
> >>
> >> On 3/12/2021 5:47 PM, Aeden Jameson wrote:
> >> > Hi Matthias,
> >> >
> >> > Yes, all the task managers have the same hardware/memory
> configuration.
> >> >
> >> > Aeden
> >> >
> >> > On Fri, Mar 12, 2021 at 3:25 AM Matthias Pohl 
> wrote:
> >> >> Hi Aeden,
> >> >> just to be sure: All task managers have the same hardware/memory
> configuration, haven't they? I'm not 100% sure whether this affects the
> slot selection in the end, but it looks like this parameter has also an
> influence on the slot matching strategy preferring slots with less
> utilization of resources [1].
> >> >>
> >> >> I'm gonna add Chesnay to the thread. He might have more insights
> here. @Chesnay are there any other things that might affect the slot
> selection when actually trying to evenly spread out the slots?
> >> >>
> >> >> Matthias
> >> >>
> >> >> [1]
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java#L141
> >> >>
> >> >> On Fri, Mar 12, 2021 at 12:58 AM Aeden Jameson <
> aeden.jame...@gmail.com> wrote:
> >> >>> Hi Arvid,
> >> >>>
> >> >>>Thanks for responding. I did check the configuration tab of the
> job
> >> >>> manager and the setting cluster.evenly-spread-out-slots: true is
> >> >>> there. However I'm still observing unevenness in the distribution of
> >> >>> source tasks. Perhaps this additional information could shed light.
> >> >>>
> >> >>> Version: 1.12.1
> >> >>> Deployment Mode: Application
> >> >>> Deployment Type: Standalone,  Docker on Kubernetes using the Lyft
> >> >>> Flink operator https://github.com/lyft/flinkk8soperator
> >> >>>
> >> >>> I did place the setting under the flinkConfig section,
> >> >>>
> >> >>> apiVersion: flink.k8s.io/v1beta1
> >> >>> 
> >> >>> spec:
> >> >>>flinkConfig:
> >> >>>  cluste

Re: Evenly Spreading Out Source Tasks

2021-03-14 Thread Xintong Song
Hi Aeden,

IIUC, the topic being read having 36 partitions means that your source task
has a parallelism of 36. What's the parallelism of the other tasks? Is the
job making use of all 72 (18 TMs * 4 slots/TM) slots?

I'm afraid currently there's no good way to guarantee subtasks of a task
are spread out evenly.

The configuration option you mentioned makes sure slots are allocated from
the TMs evenly; it does not affect how tasks are distributed over the
allocated slots.
E.g., say your job has two tasks A & B, with parallelism 36 & 54
respectively. That means, with the default slot sharing strategy, your job
needs 54 slots in total to be executed. With the configuration enabled, it
is guaranteed that 3 slots are occupied on each TM. For B (parallelism
54), there's a subtask deployed in each slot, thus 3 subtasks on each TM.
As for A, only 36 of the 54 slots contain one of its subtasks, and there's
no guarantee which 36 those are.

Thank you~

Xintong Song



On Mon, Mar 15, 2021 at 3:54 AM Chesnay Schepler  wrote:

> Is this a brand-new job, with the cluster having all 18 TMs at the time
> of submission? (or did you add more TMs while the job was running)
>
> On 3/12/2021 5:47 PM, Aeden Jameson wrote:
> > Hi Matthias,
> >
> > Yes, all the task managers have the same hardware/memory configuration.
> >
> > Aeden
> >
> > On Fri, Mar 12, 2021 at 3:25 AM Matthias Pohl 
> wrote:
> >> Hi Aeden,
> >> just to be sure: All task managers have the same hardware/memory
> configuration, haven't they? I'm not 100% sure whether this affects the
> slot selection in the end, but it looks like this parameter has also an
> influence on the slot matching strategy preferring slots with less
> utilization of resources [1].
> >>
> >> I'm gonna add Chesnay to the thread. He might have more insights here.
> @Chesnay are there any other things that might affect the slot selection
> when actually trying to evenly spread out the slots?
> >>
> >> Matthias
> >>
> >> [1]
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java#L141
> >>
> >> On Fri, Mar 12, 2021 at 12:58 AM Aeden Jameson 
> wrote:
> >>> Hi Arvid,
> >>>
> >>>Thanks for responding. I did check the configuration tab of the job
> >>> manager and the setting cluster.evenly-spread-out-slots: true is
> >>> there. However I'm still observing unevenness in the distribution of
> >>> source tasks. Perhaps this additional information could shed light.
> >>>
> >>> Version: 1.12.1
> >>> Deployment Mode: Application
> >>> Deployment Type: Standalone,  Docker on Kubernetes using the Lyft
> >>> Flink operator https://github.com/lyft/flinkk8soperator
> >>>
> >>> I did place the setting under the flinkConfig section,
> >>>
> >>> apiVersion: flink.k8s.io/v1beta1
> >>> 
> >>> spec:
> >>>flinkConfig:
> >>>  cluster.evenly-spread-out-slots: true
> >>>  high-availability: zookeeper
> >>>  ...
> >>>  state.backend: filesystem
> >>>  ...
> >>>jobManagerConfig:
> >>>  envConfig:
> >>>  
> >>>
> >>> Would you explain how the setting ends up evenly distributing active
> >>> kafka consumers? Is it a result of just assigning tasks toTM1, TM2,
> >>> TM3 ... TM18 in order and starting again. In my case I have 36
> >>> partitions and 18 nodes so after the second pass in assignment I would
> >>> end up with 2 subtasks in the consumer group on each TM. And then
> >>> subsequent passes result in inactive consumers.
> >>>
> >>>
> >>> Thank you,
> >>> Aeden
> >>>
> >>> On Thu, Mar 11, 2021 at 5:26 AM Arvid Heise  wrote:
> >>>> Hi Aeden,
> >>>>
> >>>> the option that you mentioned should have actually caused your
> desired behavior. Can you double-check that it's set for the job (you can
> look at the config in the Flink UI to be 100% sure).
> >>>>
> >>>> Another option is to simply give all task managers 2 slots. In that
> way, the scheduler can only evenly distribute.
> >>>>
> >>>> On Wed, Mar 10, 2021 at 7:21 PM Aeden Jameson <
> aeden.jame...@gmail.com> wrote:
> >>>>>  I have a cluster with 18 task managers 4 task slots each
> running a
> >>>>> job whose source/sink(s) are declared with FlinkSQL using the Kafka
> >>>>> connector. The topic being read has 36 partitions. The problem I'm
> >>>>> observing is that the subtasks for the sources are not evenly
> >>>>> distributed. For example, 1 task manager will have 4 active source
> >>>>> subtasks and other TM's none. Is there a way to force  each task
> >>>>> manager to have 2 active source subtasks.  I tried using the setting
> >>>>> cluster.evenly-spread-out-slots: true , but that didn't have the
> >>>>> desired effect.
> >>>>>
> >>>>> --
> >>>>> Thank you,
> >>>>> Aeden
>
>
>


Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

2021-03-07 Thread Xintong Song
Hi Hemant,
I don't see any problem in your settings. Any exceptions suggesting why TM
containers are not coming up?

Thank you~

Xintong Song



On Sat, Mar 6, 2021 at 3:53 PM bat man  wrote:

> Hi Xintong Song,
> I tried using the Java options to generate a heap dump, referring to the
> docs [1], in flink-conf.yaml; however, after adding this the task manager
> containers are not coming up. Note that I am using EMR. Am I doing anything
> wrong here?
>
> env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError
> -XX:HeapDumpPath=/tmp/dump.hprof"
>
> Thanks,
> Hemant
>
>
>
>
>
> On Fri, Mar 5, 2021 at 3:05 PM Xintong Song  wrote:
>
>> Hi Hemant,
>>
>> This exception generally suggests that JVM is running out of heap memory.
>> Per the official documentation [1], the amount of live data barely fits
>> into the Java heap having little free space for new allocations.
>>
>> You can try to increase the heap size following these guides [2].
>>
>> If a memory leak is suspected, to further understand where the memory is
>> consumed, you may need to dump the heap on OOMs and looking for unexpected
>> memory usages leveraging profiling tools.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>> [1]
>> https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/memleaks002.html
>>
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup.html
>>
>>
>>
>> On Fri, Mar 5, 2021 at 4:24 PM bat man  wrote:
>>
>>> Hi,
>>>
>>> Getting the below OOM but the job failed 4-5 times and recovered from
>>> there.
>>>
>>> j
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *ava.lang.Exception: java.lang.OutOfMemoryError: GC overhead limit
>>> exceededat
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>>>   at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>>>   at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>>   at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)Caused by:
>>> java.lang.OutOfMemoryError: GC overhead limit exceeded*
>>>
>>> Is there any way I can debug this. since the job after a few re-starts
>>> started running fine. what could be the reason behind this.
>>>
>>> Thanks,
>>> Hemant
>>>
>>


Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

2021-03-05 Thread Xintong Song
Hi Hemant,

This exception generally suggests that the JVM is running out of heap memory.
Per the official documentation [1], it means the amount of live data barely
fits into the Java heap, leaving little free space for new allocations.

You can try to increase the heap size following these guides [2].

If a memory leak is suspected, then to further understand where the memory is
consumed, you may need to dump the heap on OOMs and look for unexpected
memory usage with profiling tools.
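A minimal flink-conf.yaml sketch following the guides in [2] (the values are
examples only; usually you would set one of the two, not both):

```
# raise the total TaskManager process size and let Flink derive the heap ...
taskmanager.memory.process.size: 4096m
# ... or raise the task heap explicitly
taskmanager.memory.task.heap.size: 2048m
```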

Thank you~

Xintong Song


[1]
https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/memleaks002.html

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup.html



On Fri, Mar 5, 2021 at 4:24 PM bat man  wrote:

> Hi,
>
> Getting the below OOM but the job failed 4-5 times and recovered from
> there.
>
> j
>
>
>
>
>
>
>
> *ava.lang.Exception: java.lang.OutOfMemoryError: GC overhead limit
> exceededat
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>   at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)Caused by:
> java.lang.OutOfMemoryError: GC overhead limit exceeded*
>
> Is there any way I can debug this. since the job after a few re-starts
> started running fine. what could be the reason behind this.
>
> Thanks,
> Hemant
>


Re: Question about using yarn.containers.vcores

2021-03-04 Thread Xintong Song
Which Flink version are you using?
Is the deployment mode per-job or session?
Where exactly did you see that "the configured parameter has taken effect"?
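Regarding the CPU scheduling question: by default, YARN's capacity scheduler
takes only memory into account when allocating containers, so vcore requests
may effectively be ignored. Enabling CPU scheduling usually means switching
the resource calculator. A capacity-scheduler.xml sketch (assuming the
capacity scheduler is in use):

```
<property>
  <name>yarn.scheduler.capacity.resource-calculator</name>
  <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>
```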

Thank you~

Xintong Song



On Thu, Mar 4, 2021 at 4:35 PM 阿华田  wrote:

> I used -yD yarn.containers.vcores=4 to set the total number of CPU cores of
> the Flink job, and I can see the configured parameter has taken effect, but
> the number of cores actually requested still follows the 1:1 CPU-to-slot
> ratio. When using yarn.containers.vcores, do we also need to enable YARN's
> CPU scheduling?
>
> 阿华田
> a15733178...@163.com
>
>


Re: Scaling Higher than 10k Nodes

2021-03-04 Thread Xintong Song
Hi Joey,

Quick question: by *nodes*, do you mean Flink task manager processes, or
physical/virtual machines (like ECS, YARN NM)?

In our production, we run flink workloads on several Yarn/Kubernetes
clusters, where each cluster typically has 2k~5k machines. Most Flink
workloads are deployed in single-job mode, giving us thousands (sometimes
more than 10k) of flink instances concurrently running on each cluster. In
this way, the scale of each flink instance is usually not extremely large
(less than 1000 TMs), and we rely on the power of Yarn/Kubernetes to deal
with the large number of instances.

There are also cases where a single Flink job is extremely large. We had a
batch workload from last year's double-11 event, with 8k max per-stage
parallelism and up to 30k task managers running at the same time. At that
scale, we ran into problems with the single-point JobManager process, such
as tremendous memory consumption, a busy RPC main thread, etc. To make that
case work, we did many optimizations in our internal Flink version, which
we are trying to contribute to the community. See FLINK-21110 [1] for the
details.

Thank you~

Xintong Song


[1] https://issues.apache.org/jira/browse/FLINK-21110

On Thu, Mar 4, 2021 at 3:39 PM Piotr Nowojski  wrote:

> Hi Joey,
>
> Sorry for not responding to your question sooner. As you can imagine there
> are not many users running Flink at such scale. As far as I know, Alibaba
> is running the largest/one of the largest clusters, I'm asking for someone
> who is familiar with those deployments to take a look at this conversation.
> I hope someone will respond here soon :)
>
> Best,
> Piotrek
>
> pon., 1 mar 2021 o 14:43 Joey Tran  napisał(a):
>
>> Hi, I was looking at Apache Beam/Flink for some of our data processing
>> needs, but when reading about the resource managers
>> (YARN/mesos/Kubernetes), it seems like they all top out at around 10k
>> nodes. What are recommended solutions for scaling higher than this?
>>
>> Thanks in advance,
>> Joey
>>
>


Re: Flink problem

2021-02-19 Thread Xintong Song
What you're looking for might be a Session Window [1].
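A minimal sketch of such a session window, assuming a DataStream<Event> named
events (the Event type and its getUserId() accessor are assumptions;
event-time windows also require timestamps/watermarks, or use
ProcessingTimeSessionWindows instead):

```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// a session for a userId is closed after 10 minutes without new records
// for that key; the window result is emitted when the session closes
DataStream<Event> result = events
        .keyBy(Event::getUserId)
        .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
        // keep the latest record of the session
        .reduce((a, b) -> b);
```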

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#session-windows

On Fri, Feb 19, 2021 at 7:35 PM ゞ野蠻遊戲χ  wrote:

> hi all
>
>  For example, a user message A (userId: 001) is reported, and if no
> record is reported again by userId: 001 within 10 minutes, record A should
> be sent out. How can this be achieved in Flink?
>
> Thanks
> Jiazhi
>
> --
> 发自我的iPhone
>


Re: Memory usage increases on every job restart resulting in eventual OOMKill

2021-02-02 Thread Xintong Song
>
> How is the memory measured?

I meant: which Flink or K8s metric is collected? I'm asking because,
depending on which metric is used, the *container memory usage* can be
defined differently, e.g., whether mmap memory is included.

Also, could you share the effective memory configurations for the
taskmanagers? You should find something like the following at the
beginning of the taskmanager logs.

INFO  [] - Final TaskExecutor Memory configuration:
> INFO  [] -   Total Process Memory:  1.688gb (1811939328 bytes)
> INFO  [] - Total Flink Memory:  1.250gb (1342177280 bytes)
> INFO  [] -   Total JVM Heap Memory: 512.000mb (536870902 bytes)
> INFO  [] - Framework:   128.000mb (134217728 bytes)
> INFO  [] - Task:384.000mb (402653174 bytes)
> INFO  [] -   Total Off-heap Memory: 768.000mb (805306378 bytes)
> INFO  [] - Managed: 512.000mb (536870920 bytes)
> INFO  [] - Total JVM Direct Memory: 256.000mb (268435458 bytes)
> INFO  [] -   Framework: 128.000mb (134217728 bytes)
> INFO  [] -   Task:  0 bytes
> INFO  [] -   Network:   128.000mb (134217730 bytes)
> INFO  [] - JVM Metaspace:   256.000mb (268435456 bytes)
> INFO  [] - JVM Overhead:    192.000mb (201326592 bytes)


Thank you~

Xintong Song



On Tue, Feb 2, 2021 at 8:59 PM Randal Pitt  wrote:

> Hi Xintong Song,
>
> Correct, we are using standalone k8s. Task managers are deployed as a
> statefulset so have consistent pod names. We tried using native k8s (in
> fact
> I'd prefer to) but got persistent
> "io.fabric8.kubernetes.client.KubernetesClientException: too old resource
> version: 242214695 (242413759)" errors which resulted in jobs being
> restarted every 30-60 minutes.
>
> We are using Prometheus Node Exporter to capture memory usage. The graph
> shows the metric:
>
>
> sum(container_memory_usage_bytes{container_name="taskmanager",pod_name=~"$flink_task_manager"})
> by (pod_name)
>
> I've  attached the original
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2869/Screenshot_2021-02-02_at_11.png>
>
> so Nabble doesn't shrink it.
>
> Best regards,
>
> Randal.
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Memory usage increases on every job restart resulting in eventual OOMKill

2021-02-02 Thread Xintong Song
Hi Randal,
The image is too blurry to be read clearly.
I have a few questions.
- IIUC, you are using the standalone K8s deployment [1], not the native K8s
deployment [2]. Could you confirm that?
- How is the memory measured?

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html



On Tue, Feb 2, 2021 at 7:24 PM Randal Pitt  wrote:

> Hi,
>
> We're running Flink 1.11.3 on Kubernetes. We have a job with parallelism of
> 10 running on 10 task managers each with 1 task slot. The job has 4 time
> windows with 2 different keys, 2 windows have reducers and 2 are processed
> by window functions. State is stored in RocksDB.
>
> We've noticed when a pod is restarted (say if the node it was on is
> restarted) the job restarts and the memory usage of the remaining 9 pods
> increases by roughly 1GB over the next 1-2 hours then stays at that level.
> If another pod restarts the remaining 9 increase in memory usage again.
> Eventually one or more pods reach the 6GB limit and are OOMKilled, leading
> to the job restarting and memory usage increasing again.
>
> If left it can lead to the situation where an OOMKill directly leads to an
> OOMKill which directly leads to another. At this point it requires manual
> intervention to resolve.
>
> I think it's exceedingly likely the excessive memory usage is in RocksDB
> rather than Flink; my question is whether there's anything we can do about
> the increase in memory usage after a failure.
>
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2869/Screenshot_2021-02-02_at_11.png>
>
>
> Best regards,
>
> Randal.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: flink on yarn, the relationship between JobManager and ApplicationMaster

2021-02-02 Thread Xintong Song
Your previous understanding is correct. YARN's AM is Flink's JM.

The documentation description you saw is problematic. I checked the git
history: the content you excerpted was written in 2014 and describes the
on-YARN deployment mode from the early days of the project, which is long
outdated. That content has been removed in the latest 1.12 documentation.

Thank you~

Xintong Song



On Tue, Feb 2, 2021 at 6:43 PM lp <973182...@qq.com> wrote:

> Or put differently: I know that for MapReduce jobs the ApplicationMaster
> implementation is MRAppMaster. For flink on yarn, what is the corresponding
> ApplicationMaster implementation?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2021-01-31 Thread Xintong Song
Hi Colletta,

This error is kind of expected if the JobMaster / ResourceManager does not
maintain a stable connection to the ZooKeeper service, which may be caused
by network issues, GC pauses, or unstable ZK service availability.

By "similar issue", what I meant is that I'm not aware of any issue related
to the upgrading of the ZK version that may cause the leadership loss.
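For reference, the ZooKeeper client timeouts involved here map to these
flink-conf.yaml options (the values shown are the defaults, to the best of
my knowledge):

```
high-availability.zookeeper.client.connection-timeout: 15000
high-availability.zookeeper.client.session-timeout: 60000
```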

Thank you~

Xintong Song



On Sun, Jan 31, 2021 at 4:14 AM Colletta, Edward 
wrote:

> “but I'm not aware of any similar issue reported since the upgrading”
>
> For the record, we experienced this same error on Flink 1.11.2 this past
> week.
>
>
>
> *From:* Xintong Song 
> *Sent:* Friday, January 29, 2021 7:34 PM
> *To:* user 
> *Subject:* Re: Flink 1.11 job hit error "Job leader lost leadership" or
> "ResourceManager leader changed to new address null"
>
>
>
>
>
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Sat, Jan 30, 2021 at 8:27 AM Xintong Song 
> wrote:
>
> There's indeed a ZK version upgrading during 1.9 and 1.11, but I'm not
> aware of any similar issue reported since the upgrading.
>
> I would suggest the following:
>
> - Turn on the DEBUG log see if there's any valuable details
>
> - Maybe try asking in the Apache Zookeeper community, see if this is a
> known issue.
>
>
> Thank you~
> Xintong Song
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Sat, Jan 30, 2021 at 6:47 AM Lu Niu  wrote:
>
> Hi, Xintong
>
>
>
> Thanks for replying. Could it relate to the zk version? We are a platform
> team at Pinterest in the middle of migrating form 1.9.1 to 1.11. Both 1.9
> and 1.11 jobs talking to the same ZK for JM HA. This problem only surfaced
> in 1.11 jobs. That's why we think it is related to version upgrade.
>
>
>
> Best
>
> Lu
>
>
>
> On Thu, Jan 28, 2021 at 7:56 PM Xintong Song 
> wrote:
>
> The ZK client side uses 15s connection timeout and 60s session timeout
> in Flink. There's nothing similar to a heartbeat interval configured, which
> I assume is up to ZK's internal implementation. These things have not
> changed in Flink since at least 2017.
>
>
>
> If both ZK client and server complain about timeout, and there's no gc
> issue spotted, I would consider a network instability.
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Fri, Jan 29, 2021 at 3:15 AM Lu Niu  wrote:
>
> After checking the log I found the root cause is zk client timeout on TM:
>
> ```
>
> 2021-01-25 14:01:49,600 WARN
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
> session timed out, have not heard from server in 40020ms for sessionid
> 0x404f9ca531a5d6f
> 2021-01-25 14:01:49,610 INFO
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
> session timed out, have not heard from server in 40020ms for sessionid
> 0x404f9ca531a5d6f, closing socket connection and attempting reconnect
> 2021-01-25 14:01:49,711 INFO
> org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager
> - State change: SUSPENDED
> 2021-01-25 14:01:49,711 WARN
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
> ZooKeeper.
> 2021-01-25 14:01:49,712 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor - JobManager for job
> 27ac39342913d29baac4cde13062c4a4 with leader id
> b5af099c17a05fcf15e7bbfcb74e49ea lost leadership.
> 2021-01-25 14:01:49,712 WARN
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
> ZooKeeper.
> 2021-01-25 14:01:49,712 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager
> connection for job 27ac39342913d29baac4cde13062c4a4.
> 2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Sink:
> USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360)
> (d5b5887e639874cb70d7fef939b957b7).
> 2021-01-25 14:01:49,712 WARN org.apache.flink.runtime.taskmanager.Task -
> Sink: USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360)
> (d5b5887e639874cb70d7fef939b957b7) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkException: JobManager responsible for
> 27ac39342913d29baac4cde13062c4a4 lost the leadership.
>
> ```
>
>
>
> I checked the TM GC log; no GC issues. The ZooKeeper server log also shows
> the client timeout. How frequently does the zk client sync 

Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2021-01-29 Thread Xintong Song
Thank you~

Xintong Song



On Sat, Jan 30, 2021 at 8:27 AM Xintong Song  wrote:

> There was indeed a ZK version upgrade between 1.9 and 1.11, but I'm not
> aware of any similar issue reported since the upgrade.
> I would suggest the following:
> - Turn on DEBUG logging to see if there are any valuable details.
> - Maybe try asking in the Apache ZooKeeper community to see if this is a
> known issue.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Sat, Jan 30, 2021 at 6:47 AM Lu Niu  wrote:
>
>> Hi, Xintong
>>
>> Thanks for replying. Could it be related to the ZK version? We are a platform
>> team at Pinterest in the middle of migrating from 1.9.1 to 1.11. Both the 1.9
>> and 1.11 jobs talk to the same ZK for JM HA. This problem only surfaced
>> in the 1.11 jobs. That's why we think it is related to the version upgrade.
>>
>> Best
>> Lu
>>
>> On Thu, Jan 28, 2021 at 7:56 PM Xintong Song 
>> wrote:
>>
>>> The ZK client side uses a 15s connection timeout and a 60s session timeout
>>> in Flink. There's nothing similar to a heartbeat interval configured, which
>>> I assume is up to ZK's internal implementation. These things have not
>>> changed in Flink since at least 2017.
>>>
>>> If both the ZK client and server complain about timeouts, and no GC
>>> issue is spotted, I would suspect network instability.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Fri, Jan 29, 2021 at 3:15 AM Lu Niu  wrote:
>>>
>>>> After checking the log I found the root cause is zk client timeout on
>>>> TM:
>>>> ```
>>>> 2021-01-25 14:01:49,600 WARN
>>>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
>>>> session timed out, have not heard from server in 40020ms for sessionid
>>>> 0x404f9ca531a5d6f
>>>> 2021-01-25 14:01:49,610 INFO
>>>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
>>>> session timed out, have not heard from server in 40020ms for sessionid
>>>> 0x404f9ca531a5d6f, closing socket connection and attempting reconnect
>>>> 2021-01-25 14:01:49,711 INFO
>>>> org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager
>>>> - State change: SUSPENDED
>>>> 2021-01-25 14:01:49,711 WARN
>>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
>>>> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
>>>> ZooKeeper.
>>>> 2021-01-25 14:01:49,712 INFO
>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - JobManager for job
>>>> 27ac39342913d29baac4cde13062c4a4 with leader id
>>>> b5af099c17a05fcf15e7bbfcb74e49ea lost leadership.
>>>> 2021-01-25 14:01:49,712 WARN
>>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
>>>> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
>>>> ZooKeeper.
>>>> 2021-01-25 14:01:49,712 INFO
>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager
>>>> connection for job 27ac39342913d29baac4cde13062c4a4.
>>>> 2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskmanager.Task
>>>> - Attempting to fail task externally Sink:
>>>> USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360)
>>>> (d5b5887e639874cb70d7fef939b957b7).
>>>> 2021-01-25 14:01:49,712 WARN org.apache.flink.runtime.taskmanager.Task
>>>> - Sink: USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360)
>>>> (d5b5887e639874cb70d7fef939b957b7) switched from RUNNING to FAILED.
>>>> org.apache.flink.util.FlinkException: JobManager responsible for
>>>> 27ac39342913d29baac4cde13062c4a4 lost the leadership.
>>>> ```
>>>>
>>>> I checked the TM GC log; no GC issues. The ZooKeeper server log also shows
>>>> the client timeout. How frequently does the ZK client sync with the server
>>>> side in Flink? The log says the client didn't heartbeat to the server for
>>>> 40s. Any help? Thanks!
>>>>
>>>> Best
>>>> Lu
>>>>
>>>>
>>>> On Thu, Dec 17, 2020 at 6:10 PM Xintong Song 
>>>> wrote:
>>>>
>>>>> I'm not aware of any significant changes to the HA components between
>>>>> 1.9/1.11.
>>>>> Would you mind sharing the com

Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2021-01-29 Thread Xintong Song
There was indeed a ZK version upgrade between 1.9 and 1.11, but I'm not
aware of any similar issue reported since the upgrade.
I would suggest the following:
- Turn on DEBUG logging to see if there are any valuable details (see the
sketch below).
- Maybe try asking in the Apache ZooKeeper community to see if this is a
known issue.
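
For example, a minimal sketch of enabling DEBUG for the ZK-related classes,
assuming the log4j2-style `conf/log4j.properties` that ships with Flink 1.11+
(the logger names are taken from the shaded packages that appear in the logs
below):

```
# hypothetical additions to conf/log4j.properties
logger.zk.name = org.apache.flink.shaded.zookeeper3.org.apache.zookeeper
logger.zk.level = DEBUG
logger.curator.name = org.apache.flink.shaded.curator4.org.apache.curator
logger.curator.level = DEBUG
```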

Thank you~

Xintong Song



On Sat, Jan 30, 2021 at 6:47 AM Lu Niu  wrote:

> Hi, Xintong
>
> Thanks for replying. Could it be related to the ZK version? We are a platform
> team at Pinterest in the middle of migrating from 1.9.1 to 1.11. Both the 1.9
> and 1.11 jobs talk to the same ZK for JM HA. This problem only surfaced
> in the 1.11 jobs. That's why we think it is related to the version upgrade.
>
> Best
> Lu
>
> On Thu, Jan 28, 2021 at 7:56 PM Xintong Song 
> wrote:
>
>> The ZK client side uses a 15s connection timeout and a 60s session timeout
>> in Flink. There's nothing similar to a heartbeat interval configured, which
>> I assume is up to ZK's internal implementation. These things have not
>> changed in Flink since at least 2017.
>>
>> If both the ZK client and server complain about timeouts, and no GC
>> issue is spotted, I would suspect network instability.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Fri, Jan 29, 2021 at 3:15 AM Lu Niu  wrote:
>>
>>> After checking the log I found the root cause is zk client timeout on TM:
>>> ```
>>> 2021-01-25 14:01:49,600 WARN
>>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
>>> session timed out, have not heard from server in 40020ms for sessionid
>>> 0x404f9ca531a5d6f
>>> 2021-01-25 14:01:49,610 INFO
>>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
>>> session timed out, have not heard from server in 40020ms for sessionid
>>> 0x404f9ca531a5d6f, closing socket connection and attempting reconnect
>>> 2021-01-25 14:01:49,711 INFO
>>> org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager
>>> - State change: SUSPENDED
>>> 2021-01-25 14:01:49,711 WARN
>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
>>> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
>>> ZooKeeper.
>>> 2021-01-25 14:01:49,712 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - JobManager for job
>>> 27ac39342913d29baac4cde13062c4a4 with leader id
>>> b5af099c17a05fcf15e7bbfcb74e49ea lost leadership.
>>> 2021-01-25 14:01:49,712 WARN
>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
>>> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
>>> ZooKeeper.
>>> 2021-01-25 14:01:49,712 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager
>>> connection for job 27ac39342913d29baac4cde13062c4a4.
>>> 2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskmanager.Task -
>>> Attempting to fail task externally Sink:
>>> USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360)
>>> (d5b5887e639874cb70d7fef939b957b7).
>>> 2021-01-25 14:01:49,712 WARN org.apache.flink.runtime.taskmanager.Task -
>>> Sink: USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360)
>>> (d5b5887e639874cb70d7fef939b957b7) switched from RUNNING to FAILED.
>>> org.apache.flink.util.FlinkException: JobManager responsible for
>>> 27ac39342913d29baac4cde13062c4a4 lost the leadership.
>>> ```
>>>
>>> I checked the TM GC log; no GC issues. The ZooKeeper server log also shows
>>> the client timeout. How frequently does the ZK client sync with the server
>>> side in Flink? The log says the client didn't heartbeat to the server for
>>> 40s. Any help? Thanks!
>>>
>>> Best
>>> Lu
>>>
>>>
>>> On Thu, Dec 17, 2020 at 6:10 PM Xintong Song 
>>> wrote:
>>>
>>>> I'm not aware of any significant changes to the HA components between
>>>> 1.9/1.11.
>>>> Would you mind sharing the complete jobmanager/taskmanager logs?
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>>
>>>>
>>>> On Fri, Dec 18, 2020 at 8:53 AM Lu Niu  wrote:
>>>>
>>>>> Hi, Xintong
>>>>>
>>>>> Thanks for replying and your suggestion. I did check the ZK side but
>>>>> there is nothing interesting. The error message actually shows that only
>>>>> on

[ANNOUNCE] Apache Flink 1.10.3 released

2021-01-28 Thread Xintong Song
The Apache Flink community is very happy to announce the release of Apache
Flink 1.10.3, which is the third bugfix release for the Apache Flink 1.10
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2021/01/29/release-1.10.3.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348668

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Xintong Song


Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2021-01-28 Thread Xintong Song
The ZK client side uses a 15s connection timeout and a 60s session timeout
in Flink. There's nothing similar to a heartbeat interval configured, which
I assume is up to ZK's internal implementation. These things have not
changed in Flink since at least 2017.
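
For reference, these timeouts map to the following flink-conf.yaml options; a
sketch showing what I believe are the defaults (matching the 15s / 60s above):

```
high-availability.zookeeper.client.connection-timeout: 15000   # ms
high-availability.zookeeper.client.session-timeout: 60000      # ms
```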

If both the ZK client and server complain about timeouts, and no GC
issue is spotted, I would suspect network instability.

Thank you~

Xintong Song



On Fri, Jan 29, 2021 at 3:15 AM Lu Niu  wrote:

> After checking the log I found the root cause is zk client timeout on TM:
> ```
> 2021-01-25 14:01:49,600 WARN
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
> session timed out, have not heard from server in 40020ms for sessionid
> 0x404f9ca531a5d6f
> 2021-01-25 14:01:49,610 INFO
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
> session timed out, have not heard from server in 40020ms for sessionid
> 0x404f9ca531a5d6f, closing socket connection and attempting reconnect
> 2021-01-25 14:01:49,711 INFO
> org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager
> - State change: SUSPENDED
> 2021-01-25 14:01:49,711 WARN
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
> ZooKeeper.
> 2021-01-25 14:01:49,712 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor - JobManager for job
> 27ac39342913d29baac4cde13062c4a4 with leader id
> b5af099c17a05fcf15e7bbfcb74e49ea lost leadership.
> 2021-01-25 14:01:49,712 WARN
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
> ZooKeeper.
> 2021-01-25 14:01:49,712 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager
> connection for job 27ac39342913d29baac4cde13062c4a4.
> 2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Sink:
> USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360)
> (d5b5887e639874cb70d7fef939b957b7).
> 2021-01-25 14:01:49,712 WARN org.apache.flink.runtime.taskmanager.Task -
> Sink: USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360)
> (d5b5887e639874cb70d7fef939b957b7) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkException: JobManager responsible for
> 27ac39342913d29baac4cde13062c4a4 lost the leadership.
> ```
>
> I checked the TM GC log; no GC issues. The ZooKeeper server log also shows
> the client timeout. How frequently does the ZK client sync with the server
> side in Flink? The log says the client didn't heartbeat to the server for
> 40s. Any help? Thanks!
>
> Best
> Lu
>
>
> On Thu, Dec 17, 2020 at 6:10 PM Xintong Song 
> wrote:
>
>> I'm not aware of any significant changes to the HA components between
>> 1.9/1.11.
>> Would you mind sharing the complete jobmanager/taskmanager logs?
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Fri, Dec 18, 2020 at 8:53 AM Lu Niu  wrote:
>>
>>> Hi, Xintong
>>>
>>> Thanks for replying and your suggestion. I did check the ZK side but
>>> there is nothing interesting. The error message actually shows that only
>>> one TM thought JM lost leadership while others ran fine. Also, this
>>> happened only after we migrated from 1.9 to 1.11. Is it possible this is
>>> introduced by 1.11?
>>>
>>> Best
>>> Lu
>>>
>>> On Wed, Dec 16, 2020 at 5:56 PM Xintong Song 
>>> wrote:
>>>
>>>> Hi Lu,
>>>>
>>>> I assume you are using ZooKeeper as the HA service?
>>>>
>>>> A common cause of unexpected leadership loss is instability of the HA
>>>> service. E.g., if ZK does not receive a heartbeat from the Flink RM for a
>>>> certain period of time, it will revoke the leadership and notify the
>>>> other components. You can look into the ZooKeeper logs to check why the
>>>> RM's leadership was revoked.
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>>
>>>>
>>>> On Thu, Dec 17, 2020 at 8:42 AM Lu Niu  wrote:
>>>>
>>>>> Hi, Flink users
>>>>>
>>>>> Recently we migrated to flink 1.11 and see exceptions like:
>>>>> ```
>>>>> 2020-12-15 12:41:01,199 INFO
>>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
>>>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-event ->
>>>>> USER_MATERIALIZED_EVENT_SIGNAL-

Re: Flink 1.12 batch mode: words with a count of 1 are not printed by the word-count job

2021-01-28 Thread Xintong Song
You are probably using version 1.12.0. This is a known issue [1] and is fixed in 1.12.1.

Thank you~

Xintong Song


[1] https://issues.apache.org/jira/browse/FLINK-20764

On Thu, Jan 28, 2021 at 4:55 PM xhyan0427 <15527609...@163.com> wrote:

> Code:
> import org.apache.flink.api.common.RuntimeExecutionMode
> import org.apache.flink.streaming.api.scala._
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> env.setRuntimeMode(RuntimeExecutionMode.BATCH)  // run the DataStream API in batch mode
>
> // local test file
> val inputStream =
>   env.readTextFile(getClass.getResource("/hello.txt").getPath)
>
> // word count; problem: in batch mode, words whose sum is 1 are not printed
> val resultStream = inputStream
>   .flatMap(_.split(","))
>   .filter(_.nonEmpty)
>   .map((_, 1))
>   .keyBy(_._1)
>   .sum(1)
> resultStream.print()
> env.execute("word count")
>
> Contents of the test file:
> hello,flink
> hello,flink
> hello,hive
> hello,hive
> hello,hbase
> hello,hbase
> hello,scala
> hello,kafka
> hello,kafka
>
>
> Test result: hello/flink/hive/hbase/kafka have counts greater than 1 and are printed, but scala has a count of 1 and is not printed.
>
>
>
>


Re: pyflink 1.12.1 is missing Python 3.8 installation files

2021-01-23 Thread Xintong Song
It can also be downloaded from the official Apache distribution site.

https://dist.apache.org/repos/dist/release/flink/flink-1.12.1/python/


Thank you~

Xintong Song


On Sun, Jan 24, 2021 at 11:39 AM macdoor  wrote:

> Thanks! Sorry for not reading the docs carefully. Where can I now download a
> pre-built pyflink 1.12.1 for Python 3.8 on Linux? I don't quite trust a build
> I do myself.
>
>
>


Re: pyflink 1.12.1 is missing Python 3.8 installation files

2021-01-23 Thread Xintong Song
They are indeed incomplete at the moment. Flink's project storage space on PyPI
is full, so the Python 3.8 Linux package could not be uploaded. We have already
asked PyPI to increase the project space limit, but PyPI's review process is
slow and the increase has not been completed yet.

This is also mentioned in the 1.12.1 release notes [1].

>
>- The source and python 3.8 linux wheel packages for Apache Flink
>1.12.1 are temporary missing on PyPI
><https://pypi.org/project/apache-flink>, due to the project space
>limit. The request for increasing the space limit is currently under the
>PyPI review process. During this time, you can build the package
>manually
>
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/flinkDev/building.html#build-pyflink>
>  if
>needed.
>
>
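In the meantime, a rough sketch of the manual build described in the linked
docs (the exact commands may differ depending on your environment):

```
git clone https://github.com/apache/flink.git
cd flink && git checkout release-1.12.1
mvn clean install -DskipTests          # build Flink itself first
cd flink-python
python setup.py sdist bdist_wheel      # the wheel should land under flink-python/dist/
```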
Thank you~

Xintong Song


[1] https://flink.apache.org/news/2021/01/19/release-1.12.1.html



On Sun, Jan 24, 2021 at 9:12 AM macdoor  wrote:

> pyflink 1.12.1 cannot be installed on Linux with Python 3.8; the highest
> installable version is 1.12.0. Looking at the available installation files at
> https://pypi.org/project/apache-flink/#files, Python 3.8 only has one file:
> apache_flink-1.12.1-cp38-cp38-macosx_10_9_x86_64.whl.
>
> Whereas pyflink 1.12.0 has two installation files for Python 3.8:
> apache_flink-1.12.0-cp38-cp38-manylinux1_x86_64.whl and
> apache_flink-1.12.0-cp38-cp38-macosx_10_9_x86_64.whl.
>
> Are the pyflink 1.12.1 installation files incomplete?
>
>
>
>
>


Re: How does FlinkUserCodeClassLoader unload classes in a session-mode cluster

2021-01-21 Thread Xintong Song
In session mode on a standalone cluster, the job's main method should be executed on the client side.


Thank you~

Xintong Song



On Fri, Jan 22, 2021 at 9:52 AM Asahi Lee <978466...@qq.com> wrote:

> Hello!
>
> I am using flink-1.12.0 with a standalone cluster. In the main method of my
> Flink program, I use a URLClassLoader to load a jar from http://a.jar and load
> my custom classes through that class loader. Then I call the jar/run REST
> endpoint to execute an uber-jar I uploaded beforehand. When I start the job
> multiple times, my custom classes are not reloaded; how should I handle this?
> Meanwhile, when the classes in my uber-jar are updated, I find that they are
> reloaded. How is that implemented? Thanks!


Re: flink heartbeat timeout

2021-01-20 Thread Xintong Song
1. A 50s timeout should usually be sufficient. I suggest checking whether there
was network jitter in the environment when the timeout occurred, or whether a
long GC pause made the JM/TM process unresponsive.
2. Currently, Flink cluster configuration cannot be hot-updated without a restart.
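
For reference, a flink-conf.yaml sketch of the relevant options; the values
shown are, to my knowledge, the 1.11 defaults, and they only take effect after
a cluster restart:

```
heartbeat.interval: 10000   # ms, interval between heartbeats
heartbeat.timeout: 50000    # ms, the 50s mentioned above
```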

Thank you~

Xintong Song



On Thu, Jan 21, 2021 at 11:39 AM guoxb__...@sina.com 
wrote:

> Hi
>
> *Problem description:*
>
> I am running a streaming job with Flink. My program was started at 21:00 last
> night; it looked normal at the time and data was processed correctly. When I
> checked at 9:00 this morning, the job had been automatically restarted.
> Checking the logs, it reported the error below:
>
> Judging from the error, it was caused by the timeout. From what I found, the
> parameter heartbeat.timeout needs to be adjusted; the official documentation
> states the default is 50000 ms. But changing it would require restarting the
> Flink service, which is not allowed in our production environment.
>
> *Questions:*
> 1. The cause of this error is only a guess so far; the concrete problem has
> not been pinned down. I'd appreciate pointers from anyone with experience.
> Many thanks.
> 2. If I really do need to set heartbeat.timeout, how can I do that without
> restarting the Flink cluster? Many thanks.
> Note:
> My Flink version is 1.11.0
> --
> guoxb__...@sina.com
>


Re: Locating a Pyflink JVM Metaspace memory leak

2021-01-20 Thread Xintong Song
cc @Jark
This looks like a JDBC connector issue. Are you familiar with this part, or do you know who is?

Thank you~

Xintong Song



On Wed, Jan 20, 2021 at 8:07 PM YueKun  wrote:

> Hi, I'm not sure whether the image is visible. The analysis of the jmap dump is as follows: <
> http://apache-flink.147419.n8.nabble.com/file/t1276/WX20210120-191436.png>
>
>
>
>
>


Re: Locating a Pyflink JVM Metaspace memory leak

2021-01-20 Thread Xintong Song
Who creates the JDBC connection? Can you find the relevant call stack? Is it a Flink-provided connector or user code?

Thank you~

Xintong Song



On Wed, Jan 20, 2021 at 6:32 PM YueKun  wrote:

> For now it looks like the leak is caused by the MySQL JDBC driver, the same
> as the issue in
> http://apache-flink.147419.n8.nabble.com/1-11-1-OutOfMemoryError-Metaspace-td8367.html#a8399
> Is there any solution? Do I need to change the mysql-connector-java version? I am currently using 5.1.49.
>
>
>
>


Re: Locating a Pyflink JVM Metaspace memory leak

2021-01-19 Thread Xintong Song
Which version of Flink are you using? Flink has some known, already-fixed
metaspace leak issues [1] [2]; check whether they match your situation.

Besides, it can't be ruled out that this is related to your own code or the
dependencies you use. To pin down the concrete problem, use jstack / jmap to
check whether there are leftover threads / objects from previous jobs.
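
A sketch of the kind of inspection meant above; `<pid>` stands for the
TaskManager JVM process id and is purely illustrative:

```
jmap -clstats <pid>              # class loader statistics: loaders that should be gone
jmap -histo:live <pid> | head    # object histogram (forces a full GC first)
jstack <pid>                     # thread dump: look for threads left by finished jobs
```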

Thank you~

Xintong Song


[1] https://issues.apache.org/jira/browse/FLINK-16408
[2] https://issues.apache.org/jira/browse/FLINK-20333

On Tue, Jan 19, 2021 at 4:07 PM 岳坤  wrote:

> Hi, I have a JVM Metaspace OOM problem and would like to ask for help. I
> submit some batch jobs via pyflink; each job queries data from MySQL, writes
> the aggregated result to Kafka, and then finishes. But I found that with every
> job run the JVM Metaspace memory keeps growing, and even after the job
> finishes, the memory is not released. Could this be caused by a wrong Flink
> setting somewhere, or is it a code issue? JVM Met


Re: 1.12.0 startup exception in yarn per-job mode

2021-01-19 Thread Xintong Song
Check whether your job jar also bundles the Hadoop dependencies. In general,
Hadoop dependencies should be set to provided; if the job really needs a Hadoop
version different from the YARN cluster's, the dependencies need to be shaded.
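
For example, a sketch of marking Hadoop as provided in the job's pom.xml (the
artifact and version are illustrative; repeat for the other Hadoop artifacts
you depend on):

```
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.7.4</version>
  <scope>provided</scope>  <!-- keeps it out of the job jar -->
</dependency>
```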

Thank you~

Xintong Song



On Tue, Jan 19, 2021 at 3:31 PM guanyq  wrote:

> From the error it looks like a conflict with hadoop-common-2.7.4.jar, but I
> don't know how to resolve it. Please help.
> 2021-01-19 15:12:47,922 ERROR org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
> [] - Fatal error occurred in ResourceManager.
> org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException:
> Could not start the ResourceManager akka.tcp://flink@dn138.hadoop.unicom
> :45554/user/rpc/resourcemanager_0
> at
> org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:220)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:183)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:551)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:172)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> Caused by: java.lang.NoSuchMethodError:
> org.apache.hadoop.io.retry.RetryPolicies.retryForeverWithFixedSleep(JLjava/util/concurrent/TimeUnit;)Lorg/apache/hadoop/io/retry/RetryPolicy;
> at
> org.apache.hadoop.yarn.client.RMProxy.createRetryPolicy(RMProxy.java:280)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.RMProxy.createRetryPolicy(RMProxy.java:211)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.createRetriableProxy(RequestHedgingRMFailoverProxyProvider.java:95)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:77)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:190)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.RMProxy.newProxyInstance(RMProxy.java:120)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
> ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.serviceStart(AMRMClientImpl.java:186)
> ~[hadoop-yarn-client-2.7.3.2.6.0.3-8.jar:?]
> at
> org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
> ~[FlinkDataProcess.ja

[ANNOUNCE] Apache Flink 1.12.1 released

2021-01-18 Thread Xintong Song
The Apache Flink community is very happy to announce the release of Apache
Flink 1.12.1, which is the first bugfix release for the Apache Flink 1.12
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2021/01/19/release-1.12.1.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12349459

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Xintong


Re: Resource changed on src filesystem after upgrade

2021-01-18 Thread Xintong Song
Hi Mark,

Two quick questions that might help us understand what's going on.
- Does this error happen for every one of your DataSet jobs? For a problematic
job, does it happen for every container?
- What is the `jobs.jar`? Is it under `lib/` or `opt/` on your client-side
filesystem, or specified via `yarn.ship-files`, `yarn.ship-archives` or
`yarn.provided.lib.dirs` (sketched below)? This helps us locate the code path
that this file went through.
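
(For context, a hypothetical flink-conf.yaml sketch of the shipping options
mentioned above; the paths are made-up examples:)

```
yarn.ship-files: /opt/flink/usrlib/jobs.jar
yarn.provided.lib.dirs: hdfs:///flink/dist/lib
```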

Thank you~

Xintong Song



On Sun, Jan 17, 2021 at 10:32 PM Mark Davis  wrote:

> Hi all,
> I am upgrading my DataSet jobs from Flink 1.8 to 1.12.
> After the upgrade I started to receive the errors like this one:
>
> 14:12:57,441 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager  -
> Worker container_e120_1608377880203_0751_01_000112 is terminated.
> Diagnostics: Resource
> hdfs://bigdata/user/hadoop/.flink/application_1608377880203_0751/jobs.jar
> changed on src filesystem (expected 1610892446439, was 1610892446971
> java.io.IOException: Resource
> hdfs://bigdata/user/hadoop/.flink/application_1608377880203_0751/jobs.jar
> changed on src filesystem (expected 1610892446439, was 1610892446971
> at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:257)
> at
> org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
> at
> org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
> at
> org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
> at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:359)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:228)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:221)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:209)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> I understand it is somehow related to FLINK-12195, but this time it comes
> from the Hadoop code. I am running a very old version of the HDP platform
> v.2.6.5 so it might be the one to blame.
> But the code was working perfectly fine before the upgrade, so I am
> confused.
> Could you please advise.
>
> Thank you!
>   Mark
>


Re: Will the community fix earlier versions for the Apache Flink directory traversal vulnerability reported by Tencent Security?

2021-01-09 Thread Xintong Song
The community currently has no plan to fix earlier versions.

In general, the Flink community only maintains the two most recent versions.
That is, since the latest version is currently 1.12, bugfixes go into 1.11 and
1.12, and once 1.13 is released, 1.11 will no longer be maintained.

Thank you~

Xintong Song



On Fri, Jan 8, 2021 at 6:24 PM zhouyajun  wrote:

> Report link: https://s.tencent.com/research/bsafe/1215.html
>
>
>
>


Re: How does Flink handle short-lived keyed streams

2020-12-24 Thread Xintong Song
I believe what you are looking for is the State TTL [1][2].
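
As a minimal sketch of the keyed-state variant [1] (the descriptor name and the
TTL value are made-up examples):

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.hours(1))  // entries expire 1h after the last write
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

ValueStateDescriptor<String> descriptor =
        new ValueStateDescriptor<>("txn-status", String.class);
descriptor.enableTimeToLive(ttlConfig);  // attach TTL before acquiring the state
```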


Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl

[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/config.html#table-exec-state-ttl



On Wed, Dec 23, 2020 at 11:57 PM narasimha  wrote:

> Hi,
>
> Below is the use case.
>
> We have a stream of transaction events; the success/failure of a transaction
> can be determined from those events.
> We partition the stream by transaction id and apply CEP to determine the
> success/failure of a transaction.
> Each transaction's keyed stream is valid only until the final status is
> found, which can leave a large number of inactive keyed streams in the system.
>
> I know that Flink distributes the keyed stream to tasks based on key groups,
> but there will still be a large set of inactive keys.
>
> Does this have any side effects? If so, what has to be done to deal with
> humongous keyed streams?
>
> --
> A.Narasimha Swamy
>


[ANNOUNCE] Apache Flink 1.11.3 released

2020-12-18 Thread Xintong Song
The Apache Flink community is very happy to announce the release of Apache
Flink 1.11.3, which is the third bugfix release for the Apache Flink 1.11
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2020/12/18/release-1.11.3.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348761

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gordon & Xintong


Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2020-12-17 Thread Xintong Song
I'm not aware of any significant changes to the HA components between
1.9/1.11.
Would you mind sharing the complete jobmanager/taskmanager logs?

Thank you~

Xintong Song



On Fri, Dec 18, 2020 at 8:53 AM Lu Niu  wrote:

> Hi, Xintong
>
> Thanks for replying and your suggestion. I did check the ZK side but there
> is nothing interesting. The error message actually shows that only one TM
> thought the JM lost leadership while the others ran fine. Also, this happened only
> after we migrated from 1.9 to 1.11. Is it possible this is introduced by
> 1.11?
>
> Best
> Lu
>
> On Wed, Dec 16, 2020 at 5:56 PM Xintong Song 
> wrote:
>
>> Hi Lu,
>>
>> I assume you are using ZooKeeper as the HA service?
>>
>> A common cause of unexpected leadership loss is instability of the HA
>> service. E.g., if ZK does not receive a heartbeat from the Flink RM for a
>> certain period of time, it will revoke the leadership and notify the
>> other components. You can look into the ZooKeeper logs to check why the
>> RM's leadership was revoked.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Thu, Dec 17, 2020 at 8:42 AM Lu Niu  wrote:
>>
>>> Hi, Flink users
>>>
>>> Recently we migrated to flink 1.11 and see exceptions like:
>>> ```
>>> 2020-12-15 12:41:01,199 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-event ->
>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-event-as_nrtgtuple (21/60)
>>> (711d1d319691a4b80e30fe6ab7dfab5b) switched from RUNNING to FAILED on
>>> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@50abf386.
>>> java.lang.Exception: Job leader for job id
>>> 47b1531f79ffe3b86bc5910f6071e40c lost leadership.
>>> at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:1852)
>>> ~[nrtg-1.11_deploy.jar:?]
>>> at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_212-ga]
>>> at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:1851)
>>> ~[nrtg-1.11_deploy.jar:?]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
>>> ~[nrtg-1.11_deploy.jar:?]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
>>> ~[nrtg-1.11_deploy.jar:?]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>> ~[nrtg-1.11_deploy.jar:?]
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>>> [nrtg-1.11_deploy.jar:?]
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>>> [nrtg-1.11_deploy.jar:?]
>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>> [nrtg-1.11_deploy.jar:?]
>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>>> [nrtg-1.11_deploy.jar:?]
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>> [nrtg-1.11_deploy.jar:?]
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>> [nrtg-1.11_deploy.jar:?]
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>> [nrtg-1.11_deploy.jar:?]
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:539)
>>> [nrtg-1.11_deploy.jar:?]
>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
>>> [nrtg-1.11_deploy.jar:?]
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
>>> [nrtg-1.11_deploy.jar:?]
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:581)
>>> [nrtg-1.11_deploy.jar:?]
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
>>> [nrtg-1.11_deploy.jar:?]
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:229) [nrtg-1.11_deploy.jar:?]
>>> ```
>>>
>>> ```
>>> 2020-12-15 01:01:39,531 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
>>> USER_MATERIALIZED_EVENT_SIGNAL.user_context.SINK-stream_joiner ->
>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-SINK-SINKS.realpin (260/360)
>>> (0c1f4495088ec9452c597f46a88a2c8e) switched from RUNNING to FAILED on
>>> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2362b2fd.
>>> org.apache.flink.util.FlinkException: ResourceManager leader changed t

Re: flink 1.12 RocksDBStateBackend error

2020-12-17 Thread Xintong Song
https://issues.apache.org/jira/browse/FLINK-20646

Thank you~

Xintong Song



On Thu, Dec 17, 2020 at 11:40 PM zhisheng  wrote:

> hi, xintong
>
> Is there a corresponding issue ID?
>
> Xintong Song  于2020年12月17日周四 下午4:48写道:
>
> > It is indeed a 1.12.0 bug.
> > We should declare ManagedMemoryUseCase.STATE_BACKEND everywhere state is
> > used. A newly added ReduceTransformation misses this declaration, so any job
> > involving this operator breaks when using RocksDB.
> > I will file an issue right away; this may require pushing the community for
> > an urgent bugfix release.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Thu, Dec 17, 2020 at 11:05 AM HunterXHunter <1356469...@qq.com>
> wrote:
> >
> > > In 1.12, setting env.setStateBackend(new RocksDBStateBackend(checkpointPath,
> > > true)) results in the following error:
> > > Caused by: java.lang.IllegalArgumentException: The fraction of memory
> to
> > > allocate should not be 0. Please make sure that all types of managed
> > memory
> > > consumers contained in the job are configured with a non-negative
> weight
> > > via
> > > `taskmanager.memory.managed.consumer-weights`.
> > >
> > > But looking at the source code, this parameter has its default value.
> > > The root cause turned out to be that in
> > > StreamConfig#getManagedMemoryFractionOperatorUseCaseOfSlot the config is
> > > missing the key: managedMemFraction.STATE_BACKEND
> > > After setting
> > > config.setDouble("managedMemFraction.STATE_BACKEND", 0.7)
> > > the program works. Code here:
> > > https://paste.ubuntu.com/p/9WrBz3Xrc6/
> > >
> > >
> > >
> > >
> > >
> >
>


Re: flink 1.12 RocksDBStateBackend error

2020-12-17 Thread Xintong Song
It is indeed a 1.12.0 bug.
We should declare ManagedMemoryUseCase.STATE_BACKEND everywhere state is used.
A newly added ReduceTransformation misses this declaration, so any job
involving this operator breaks when using RocksDB.
I will file an issue right away; this may require pushing the community for an
urgent bugfix release.

Thank you~

Xintong Song



On Thu, Dec 17, 2020 at 11:05 AM HunterXHunter <1356469...@qq.com> wrote:

> In 1.12, setting env.setStateBackend(new RocksDBStateBackend(checkpointPath,
> true)) results in the following error:
> Caused by: java.lang.IllegalArgumentException: The fraction of memory to
> allocate should not be 0. Please make sure that all types of managed memory
> consumers contained in the job are configured with a non-negative weight
> via
> `taskmanager.memory.managed.consumer-weights`.
>
> But looking at the source code, this parameter has its default value.
> The root cause turned out to be that in
> StreamConfig#getManagedMemoryFractionOperatorUseCaseOfSlot the config is
> missing the key: managedMemFraction.STATE_BACKEND
> After setting
> config.setDouble("managedMemFraction.STATE_BACKEND", 0.7)
> the program works. Code here:
> https://paste.ubuntu.com/p/9WrBz3Xrc6/
>
>
>
>
>


Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2020-12-16 Thread Xintong Song
Hi Lu,

I assume you are using ZooKeeper as the HA service?

A common cause of unexpected leadership loss is instability of the HA
service. E.g., if ZK does not receive a heartbeat from the Flink RM for a
certain period of time, it will revoke the leadership and notify the
other components. You can look into the ZooKeeper logs to check why the RM's
leadership was revoked.

Thank you~

Xintong Song



On Thu, Dec 17, 2020 at 8:42 AM Lu Niu  wrote:

> Hi, Flink users
>
> Recently we migrated to flink 1.11 and see exceptions like:
> ```
> 2020-12-15 12:41:01,199 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
> USER_MATERIALIZED_EVENT_SIGNAL-user_context-event ->
> USER_MATERIALIZED_EVENT_SIGNAL-user_context-event-as_nrtgtuple (21/60)
> (711d1d319691a4b80e30fe6ab7dfab5b) switched from RUNNING to FAILED on
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@50abf386.
> java.lang.Exception: Job leader for job id
> 47b1531f79ffe3b86bc5910f6071e40c lost leadership.
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:1852)
> ~[nrtg-1.11_deploy.jar:?]
> at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_212-ga]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:1851)
> ~[nrtg-1.11_deploy.jar:?]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> ~[nrtg-1.11_deploy.jar:?]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> ~[nrtg-1.11_deploy.jar:?]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> ~[nrtg-1.11_deploy.jar:?]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> [nrtg-1.11_deploy.jar:?]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> [nrtg-1.11_deploy.jar:?]
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> [nrtg-1.11_deploy.jar:?]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> [nrtg-1.11_deploy.jar:?]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> [nrtg-1.11_deploy.jar:?]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [nrtg-1.11_deploy.jar:?]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [nrtg-1.11_deploy.jar:?]
> at akka.actor.Actor$class.aroundReceive(Actor.scala:539)
> [nrtg-1.11_deploy.jar:?]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
> [nrtg-1.11_deploy.jar:?]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
> [nrtg-1.11_deploy.jar:?]
> at akka.actor.ActorCell.invoke(ActorCell.scala:581)
> [nrtg-1.11_deploy.jar:?]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
> [nrtg-1.11_deploy.jar:?]
> at akka.dispatch.Mailbox.run(Mailbox.scala:229) [nrtg-1.11_deploy.jar:?]
> ```
>
> ```
> 2020-12-15 01:01:39,531 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
> USER_MATERIALIZED_EVENT_SIGNAL.user_context.SINK-stream_joiner ->
> USER_MATERIALIZED_EVENT_SIGNAL-user_context-SINK-SINKS.realpin (260/360)
> (0c1f4495088ec9452c597f46a88a2c8e) switched from RUNNING to FAILED on
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2362b2fd.
> org.apache.flink.util.FlinkException: ResourceManager leader changed to
> new address null
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093)
> ~[nrtg-1.11_deploy.jar:?]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173)
> ~[nrtg-1.11_deploy.jar:?]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816)
> ~[nrtg-1.11_deploy.jar:?]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> ~[nrtg-1.11_deploy.jar:?]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> ~[nrtg-1.11_deploy.jar:?]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> ~[nrtg-1.11_deploy.jar:?]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> [nrtg-1.11_deploy.jar:?]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> [nrtg-1.11_deploy.jar:?]
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> [nrtg-1.11_deploy.jar:?]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> [nrtg-1.11_deploy.jar:?]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> [nrtg-1.1

Re: Re: Direct memory overflow

2020-12-16 Thread Xintong Song
The environment information, including the JVM parameters, is printed at the beginning of the log file.

Thank you~

Xintong Song



On Wed, Dec 16, 2020 at 10:01 PM aven  wrote:

> Thanks for the reply; I will try these two parameters.
> One more question: is there a way to view Flink's memory configuration
> parameters at runtime? Or can they be printed in the log at startup?
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
>
> Best!
> Aven
>
>
>
>
>
> On 2020-12-16 19:22:19, "Xintong Song"  wrote:
> >You can use these two parameters:
> >
> >   - containerized.heap-cutoff-ratio
> >   - containerized.heap-cutoff-min
> >
> >The cutoff means reserving an extra part of the container memory that is not
> >used as Flink heap/network/managed memory. This part typically covers the
> >off-heap memory overhead of the JVM, user code, and third-party dependencies.
> >Flink also takes the cutoff into account when computing the
> >MaxDirectMemorySize parameter, so increasing the cutoff also relaxes the
> >direct memory limit.
> >
> >
> >Besides, the memory model of Flink 1.9 and earlier is rather messy; I suggest
> >upgrading to a newer version as soon as possible.
> >
> >
> >Thank you~
> >
> >Xintong Song
> >
> >
> >
> >On Wed, Dec 16, 2020 at 7:07 PM 巫旭阳  wrote:
> >
> >> The error message is as follows:
> >> Caused by: java.lang.OutOfMemoryError: Direct buffer memory at
> >> java.nio.Bits.reserveMemory(Bits.java:693)
> >>
> >>  at java.nio.DirectByteBuffer.&lt;init&gt;(DirectByteBuffer.java:123) at
> >> java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> >>
> >>  at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
> >>
> >>  at sun.nio.ch.IOUtil.read(IOUtil.java:195)
> >>
> >>  at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> >>
> >>  at
> >>
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
> >>
> >>  at
> >>
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
> >>
> >>  at
> >>
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
> >>
> >>  at
> >>
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
> >>
> >>  at
> >> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
> >>
> >>  at
> >>
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
> >>
> >>
> >>  at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
> >>
> >>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> >>
> >>  at
> >>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
> >>
> >>
> >>  at
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
> >>
> >>
> >>  at
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> >>
> >>
> >>
> >> Version
> >>   Flink: 1.9.1
> >>   kafka-client: 0.10.0.1
> >> Environment
> >>   on yarn
> >> JVM parameters
> >>   -Xms14336m
> >>   -Xmx14336m
> >>   -XX:MaxDirectMemorySize=6144m
> >> flink-conf.yml
> >>  using the default parameters
> >>  streaming job, not using RocksDB
> >>
> >> My initial suspicion is that Flink's off-heap memory usage is too large, so
> >> the kafka consumer cannot allocate direct memory, causing the OOM. But per
> >> the official docs, taskmanager.memory.fraction=0.7 should not take effect in
> >> my program, and taskmanager.network.memory.fraction=0.1.
> >>
> >>
> >> With this configuration, user code should have 6144m*0.9=5529m of direct
> >> memory available.
> >> My questions are:
> >> In my current environment, is there any Flink off-heap memory configuration
> >> I have overlooked, or off-heap memory that Flink needs that I am not aware
> >> of? Apart from throttling the kafka consumer, is there any other way to
> >> tune this?
> >>
> >>
> >> Best
> >> Aven
> >>
> >>
>


Re: Direct memory overflow

2020-12-16 Thread Xintong Song
You can use these two parameters:

   - containerized.heap-cutoff-ratio
   - containerized.heap-cutoff-min

The cutoff means reserving an extra part of the container memory that is not
used as Flink heap/network/managed memory. This part typically covers the
off-heap memory overhead of the JVM, user code, and third-party dependencies.
Flink also takes the cutoff into account when computing the MaxDirectMemorySize
parameter, so increasing the cutoff also relaxes the direct memory limit (see
the sketch below).
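
A flink-conf.yaml sketch; the increased values are illustrative, and to my
knowledge the 1.9 defaults are a ratio of 0.25 and a minimum of 600 MB:

```
containerized.heap-cutoff-ratio: 0.4   # fraction of container memory reserved as cutoff
containerized.heap-cutoff-min: 1024    # minimum cutoff in MB
```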


Besides, the memory model of Flink 1.9 and earlier is rather messy; I suggest upgrading to a newer version as soon as possible.


Thank you~

Xintong Song



On Wed, Dec 16, 2020 at 7:07 PM 巫旭阳  wrote:

> The error message is as follows:
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory at
> java.nio.Bits.reserveMemory(Bits.java:693)
>
>  at java.nio.DirectByteBuffer.&lt;init&gt;(DirectByteBuffer.java:123) at
> java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>
>  at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
>
>  at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>
>  at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>
>  at
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
>
>  at
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
>
>  at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>
>  at
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
>
>  at
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
>
>  at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
>
>
>  at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>
>  at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>
>
>  at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
>
>
>  at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>
>
>
> Version
>   Flink: 1.9.1
>   kafka-client: 0.10.0.1
> Environment
>   on yarn
> JVM parameters
>   -Xms14336m
>   -Xmx14336m
>   -XX:MaxDirectMemorySize=6144m
> flink-conf.yml
>  using the default parameters
>  streaming job, not using RocksDB
>
> My initial suspicion is that Flink's off-heap memory usage is too large, so
> the kafka consumer cannot allocate direct memory, causing the OOM. But per the
> official docs, taskmanager.memory.fraction=0.7 should not take effect in my
> program, and taskmanager.network.memory.fraction=0.1.
>
>
> With this configuration, user code should have 6144m*0.9=5529m of direct
> memory available.
> My questions are:
> In my current environment, is there any Flink off-heap memory configuration I
> have overlooked, or off-heap memory that Flink needs that I am not aware of?
> Apart from throttling the kafka consumer, is there any other way to tune this?
>
>
> Best
> Aven
>
>


Re: Flink 1.10.0 on yarn job submission failure

2020-12-15 Thread Xintong Song
It looks like YARN did not set the Hadoop classpath for the application. You
can log in to the machine and check the contents of launch_container.sh, to see
whether the container launch command contains the correct Hadoop classpath. Is
your YARN a customized version? Normally the open-source version sets the
Hadoop classpath for containers.

Versions before 1.10 worked because Flink shipped with a shaded Hadoop.
Starting from 1.10, Flink no longer bundles shaded Hadoop by default and
instead uses the Hadoop dependencies from the cluster environment. You can also
ship a shaded Hadoop yourself [1]; that should work as well.
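
For example, a sketch of the usual way to expose the cluster's Hadoop
dependencies to Flink 1.10+ before submitting (the job jar path is
illustrative):

```
export HADOOP_CLASSPATH=$(hadoop classpath)
./bin/flink run -m yarn-cluster /path/to/your-job.jar
```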

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html#adding-hadoop-to-lib

On Sat, Dec 12, 2020 at 8:05 PM Jacob <17691150...@163.com> wrote:

> Hello, what is the cause of this problem when submitting a job with Flink
> 1.10.0 on yarn? Is it a Hadoop jar dependency issue? The program runs on all
> versions below 1.10 but cannot be submitted on 1.10.0.
>
> Thanks!
> 
>
> [jacob@hadoop001 bin]$ ./yarn logs -applicationId
> application_1603495749855_57650
> 20/12/11 18:52:55 INFO client.RMProxy: Connecting to ResourceManager at
> localhost:8032
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
>
> [jar:file:/opt/app/hadoop_client/e11_backend/hadoop-2.6.0-cdh5.8.3/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
>
> [jar:file:/opt/app/hadoop-2.6.0-cdh5.8.3/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 20/12/11 18:52:57 WARN util.NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
>
>
> Container: container_1603495749855_57650_02_01 on localhost
>
> =
> LogType:jobmanager.err
> Log Upload Time:Fri Dec 11 18:49:21 -0800 2020
> LogLength:2368
> Log Contents:
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
>
> [jar:file:/data/hadoop/dn/sdc/yarn/nm/usercache/jacob/appcache/application_1603495749855_57650/filecache/11/datafeed-website-filter_flink-0.0.1-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
>
> [jar:file:/data/hadoop/dn/sde/yarn/nm/usercache/jacob/appcache/application_1603495749855_57650/filecache/17/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
>
> [jar:file:/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p0.2/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type
> [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
> Exception in thread "main" java.lang.NoSuchMethodError:
>
> org.apache.hadoop.conf.Configuration.addDeprecations([Lorg/apache/hadoop/conf/Configuration$DeprecationDelta;)V
> at
>
> org.apache.hadoop.mapreduce.util.ConfigUtil.addDeprecatedKeys(ConfigUtil.java:54)
> at
>
> org.apache.hadoop.mapreduce.util.ConfigUtil.loadResources(ConfigUtil.java:42)
> at org.apache.hadoop.mapred.JobConf.(JobConf.java:119)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
>
> org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1659)
> at
> org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:91)
> at
> org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
> at
>
> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
> at org.apache.hadoop.security.Groups.(Groups.java:55)
> at
>
> org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:182)
> at
>
> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:235)
> at
>
> org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:214)
> at
>
> org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:669)
> at
>
> org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:571)
> at
>
> org.apache.flink.yarn.entrypoint.YarnEntrypointUtils.logYarnEnvironmentInformation(YarnEntrypointUtils.java:136)
> at
>
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:109)
>
>
>
>


Re: flink 1.11.2 on yarn: available slots stay at 0, job cannot be submitted

2020-12-09 Thread Xintong Song
Could you share the jobmanager logs?
Also, check whether YARN has allocated the taskmanager containers; if so, fetch
the taskmanager logs via YARN.
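
For example (the application id here is illustrative):

```
yarn logs -applicationId application_1600000000000_0001
```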

Thank you~

Xintong Song



On Thu, Dec 10, 2020 at 9:55 AM Jacob <17691150...@163.com> wrote:

> <
> http://apache-flink.147419.n8.nabble.com/file/t1162/Screenshot_2020-12-09_153858.png>
>
>
>
> Launch command:
> ./bin/flink run-application -t yarn-application
> -Djobmanager.memory.process.size=2048m
> -Dtaskmanager.memory.process.size=2048m -Dyarn.application.name="Test Job"
> -c com.jacob.Main /opt/app/test.jar
>
> The Hadoop cluster has plenty of resources, but Flink cannot allocate slots for the job.
>
>
>
>


Re: Problem loading external files in the flink-yarn module

2020-12-06 Thread Xintong Song
Hi,

Do you mean `yarn.provided.lib.dirs`?
That part was indeed designed with only HDFS in mind; HTTP files were not
considered.

I just confirmed with a friend who knows Hadoop well that HTTP files should
also be supported at the YARN level, so it would be best if Flink could support
them too. Since the whole feature was designed without HTTP files in mind, I'm
not sure whether other problems will surface after the regex matching issue is
fixed. I opened FLINK-20505 to track the overall progress of supporting HTTP
files in provided lib.

https://issues.apache.org/jira/browse/FLINK-20505
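
(For context, a sketch of how this option is typically configured today with
HDFS paths; the paths and the semicolon-separated list form are illustrative:)

```
yarn.provided.lib.dirs: hdfs:///flink/flink-1.11/lib;hdfs:///flink/flink-1.11/plugins
```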

Thank you~

Xintong Song



On Mon, Dec 7, 2020 at 10:03 AM zhou chao  wrote:

> hi all, recently I ran into a small problem loading an external HTTP file
> with io.extra-file on 1.11.
>
> Because getting the status of an HTTP file via FileSystem.getFileStatus goes
> through HttpFileSystem's getFileStatus method, the returned FileStatus has a
> length of -1. After validation passes on the client side, a problem occurs in
> decodeYarnLocalResourceDescriptor.
> The exception is as follows:
> 2020-12-04 17:01:28.955 ERROR org.apache.flink.yarn.YarnResourceManager -
> Could not start TaskManager in container containerXX.
> org.apache.flink.util.FlinkException: Error to parse
> YarnLocalResourceDescriptor from YarnLocalResourceDescriptor{key=X.jar,
> path=https://XXX.jar, size=-1, modificationTime=0,
> visibility=APPLICATION}
> at org.apache.flink.yarn.YarnLocalResourceDescriptor.fromString(YarnLocalResourceDescriptor.java:99)
> at org.apache.flink.yarn.Utils.decodeYarnLocalResourceDescriptorListFromString(Utils.java:721)
> at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:626)
> at org.apache.flink.yarn.YarnResourceManager.getOrCreateContainerLaunchContext(YarnResourceManager.java:746)
> at org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:726)
> at org.apache.flink.yarn.YarnResourceManager.startTaskExecutorInContainer(YarnResourceManager.java:500)
> at org.apache.flink.yarn.YarnResourceManager.onContainersOfResourceAllocated(YarnResourceManager.java:455)
> at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:415)
>
> The problem is that the regex only matches digits in the size group:
>
> private static final Pattern LOCAL_RESOURCE_DESC_FORMAT =
> Pattern.compile("YarnLocalResourceDescriptor\\{" +
>"key=(\\S+), path=(\\S+), size=([\\d]+), modificationTime=([\\d]+),
> visibility=(\\S+), type=(\\S+)}");
>
> The minus sign cannot be matched, so loading an HTTP file currently requires mocking a FileStatus with a length greater than -1.
> May I ask: is the remote-file loading feature meant only for files on a remote HDFS, with HTTP files simply not considered?
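
As a rough illustration of the regex issue described above (a sketch only, not the actual patch tracked by FLINK-20505): relaxing the size group from `([\d]+)` to `(-?\d+)` lets a descriptor with size=-1 parse. The `type=FILE` field in the test string is an assumed placeholder, added only to satisfy the pattern:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class DescriptorParseSketch {
        // Same shape as LOCAL_RESOURCE_DESC_FORMAT above, but the size group
        // accepts an optional leading minus sign so that size=-1 matches.
        private static final Pattern FIXED_FORMAT =
                Pattern.compile("YarnLocalResourceDescriptor\\{"
                        + "key=(\\S+), path=(\\S+), size=(-?\\d+), modificationTime=(\\d+), "
                        + "visibility=(\\S+), type=(\\S+)}");

        public static void main(String[] args) {
            String desc = "YarnLocalResourceDescriptor{key=X.jar, path=https://XXX.jar, "
                    + "size=-1, modificationTime=0, visibility=APPLICATION, type=FILE}";
            Matcher m = FIXED_FORMAT.matcher(desc);
            // prints "parsed size=-1" with the relaxed size group
            System.out.println(m.matches() ? "parsed size=" + m.group(3) : "no match");
        }
    }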
>


Re: taskmanager.cpu.cores 1.7976931348623157E308

2020-12-06 Thread Xintong Song
FYI, I've opened FLINK-20503 for this.
https://issues.apache.org/jira/browse/FLINK-20503

Thank you~

Xintong Song



On Mon, Dec 7, 2020 at 11:10 AM Xintong Song  wrote:

> I forgot to mention that it is designed that task managers always have
> `Double#MAX_VALUE` cpu cores in local execution.
>
>
> And I think Yangze is right. The log "The configuration option
> taskmanager.cpu.cores required for local execution is not set, setting it
> to" can be misleading for users. Will fire an issue on that.
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
> On Mon, Dec 7, 2020 at 11:03 AM Xintong Song 
> wrote:
>
>> Hi Rex,
>>
>> We're running this in a local environment so that may be contributing to
>>> what we're seeing.
>>>
>> Just to double check on this. By `local environment`, you mean running
>> flink without setting up a standalone cluster or submitting it to a
>> K8s/Yarn cluster? (Typically executing from an IDE, running `flink run -t
>> local`, or running your own application that calls
>> `ExecutionEnvironment#execute`).
>> If yes, then this is kind of expected.
>>
>> A couple of things that might help you understand this.
>>
>>- Running on a local environment means setting up the Flink cluster
>>within the current process (the IDE process if executed from an IDE, the
>>flink client process if using `flink run -t local`, or your own 
>> application
>>process). That also means most of the resource configurations cannot take
>>effect, because the resources of the JVM are already determined. Please
>>refer to the memory configuration documents for options that still take
>>effect in local execution. [1][2]
>>- David is correct that `taskmanager.cpu.cores` is only intended for
>>internal usages. I assume you learnt about this configuration by reading
>>the source codes? If true, please also be aware that the JavaDoc
>>of `TaskManagerOption#CPU_CORES` says "DO NOT USE THIS CONFIG OPTION", and
>>it is also annotated with `ExcludeFromDocumentation` so that users do not
>>learn this option from the documents.
>>- Flink does not really control how many cpu cores it uses. However,
>>when running on an external resource management system (K8s, Yarn, Mesos),
>>it requires a certain amount of cpu resources for its containers/pods, and
>>allows the external system to control its cpu usage. You can use the
>>following configuration options to control how many cpu cores are 
>> requested
>>in such cases.
>>   - kubernetes.jobmanager.cpu
>>   - kubernetes.taskmanager.cpu
>>   - yarn.appmaster.vcores
>>   - yarn.containers.vcores
>>   - mesos.resourcemanager.tasks.cpus
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#local-execution
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_jobmanager.html#local-execution
>>
>>
>> On Mon, Dec 7, 2020 at 10:32 AM Yangze Guo  wrote:
>>
>>> Hi, Rex,
>>>
>>> Can you share more logs for it. Did you see something like "The
>>> configuration option taskmanager.cpu.cores required for local
>>> execution is not set, setting it to" in your logs?
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> Best,
>>> Yangze Guo
>>>
>>>
>>> On Sat, Dec 5, 2020 at 6:53 PM David Anderson 
>>> wrote:
>>> >
>>> > taskmanager.cpu.cores is intended for internal use only -- you aren't
>>> meant to set this option. What happens if you leave it alone?
>>> >
>>> > Regards,
>>> > David
>>> >
>>> >
>>> > On Sat, Dec 5, 2020 at 8:04 AM Rex Fenley  wrote:
>>> >>
>>> >> We're running this in a local environment so that may be contributing
>>> to what we're seeing.
>>> >>
>>> >> On Fri, Dec 4, 2020 at 10:41 PM Rex Fenley  wrote:
>>> >>>
>>> >>> Hello,
>>> >>>
>>> >>> I'm tuning flink for parallelism right now and when I look at the
>>> JobManager I see
>>> taskmanager.cpu.cores: 1.7976931348623157E308
>>> >>> Which looks like the maximum double number.
>>> >>>
>>> >>> We have 8 cpu cores, so we figured we'd bump to 16 for hyper
&

Re: taskmanager.cpu.cores 1.7976931348623157E308

2020-12-06 Thread Xintong Song
I forgot to mention that it is designed that task managers always have
`Double#MAX_VALUE` cpu cores in local execution.


And I think Yangze is right. The log "The configuration option
taskmanager.cpu.cores required for local execution is not set, setting it
to" can be misleading for users. Will fire an issue on that.


Thank you~

Xintong Song




On Mon, Dec 7, 2020 at 11:03 AM Xintong Song  wrote:

> Hi Rex,
>
> We're running this in a local environment so that may be contributing to
>> what we're seeing.
>>
> Just to double check on this. By `local environment`, you mean running
> flink without setting up a standalone cluster or submitting it to a
> K8s/Yarn cluster? (Typically executing from an IDE, running `flink run -t
> local`, or running your own application that calls
> `ExecutionEnvironment#execute`).
> If yes, then this is kind of expected.
>
> A couple of things that might help you understand this.
>
>- Running on a local environment means setting up the Flink cluster
>within the current process (the IDE process if executed from an IDE, the
>flink client process if using `flink run -t local`, or your own application
>process). That also means most of the resource configurations cannot take
>effect, because the resources of the JVM are already determined. Please
>refer to the memory configuration documents for options that still take
>effect in local execution. [1][2]
>- David is correct that `taskmanager.cpu.cores` is only intended for
>internal usages. I assume you learnt about this configuration by reading
>the source codes? If true, please also be aware that the JavaDoc
>of `TaskManagerOption#CPU_CORES` says "DO NOT USE THIS CONFIG OPTION", and
>it is also annotated with `ExcludeFromDocumentation` so that users do not
>learn this option from the documents.
>- Flink does not really control how many cpu cores it uses. However,
>when running on an external resource management system (K8s, Yarn, Mesos),
>it requires a certain amount of cpu resources for its containers/pods, and
>allows the external system to control its cpu usage. You can use the
>following configuration options to control how many cpu cores are requested
>in such cases.
>   - kubernetes.jobmanager.cpu
>   - kubernetes.taskmanager.cpu
>   - yarn.appmaster.vcores
>   - yarn.containers.vcores
>   - mesos.resourcemanager.tasks.cpus
>
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#local-execution
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_jobmanager.html#local-execution
>
>
> On Mon, Dec 7, 2020 at 10:32 AM Yangze Guo  wrote:
>
>> Hi, Rex,
>>
>> Can you share more logs for it. Did you see something like "The
>> configuration option taskmanager.cpu.cores required for local
>> execution is not set, setting it to" in your logs?
>>
>> Best,
>> Yangze Guo
>>
>> Best,
>> Yangze Guo
>>
>>
>> On Sat, Dec 5, 2020 at 6:53 PM David Anderson 
>> wrote:
>> >
>> > taskmanager.cpu.cores is intended for internal use only -- you aren't
>> meant to set this option. What happens if you leave it alone?
>> >
>> > Regards,
>> > David
>> >
>> >
>> > On Sat, Dec 5, 2020 at 8:04 AM Rex Fenley  wrote:
>> >>
>> >> We're running this in a local environment so that may be contributing
>> to what we're seeing.
>> >>
>> >> On Fri, Dec 4, 2020 at 10:41 PM Rex Fenley  wrote:
>> >>>
>> >>> Hello,
>> >>>
>> >>> I'm tuning flink for parallelism right now and when I look at the
>> JobManager I see
>> >>> taskmanager.cpu.cores: 1.7976931348623157E308
>> >>> Which looks like the maximum double number.
>> >>>
>> >>> We have 8 cpu cores, so we figured we'd bump to 16 for hyper
>> threading. We have 37 operators so we rounded up and set 40 task slots.
>> >>>
>> >>> Here is our configuration
>> >>>
>> >>> "vmArgs": "-Xmx16g -Xms16g -XX:MaxDirectMemorySize=1207959552
>> -XX:MaxMetaspaceSize=268435456 -Dlog.file=/tmp/flink.log
>> -Dtaskmanager.memory.framework.off-heap.size=134217728b
>> -Dtaskmanager.memory.network.max=1073741824b
>> -Dtaskmanager.memory.network.min=1073741824b
>> -Dtaskmanager.memory.framework.heap.size=134217728b
>> -Dtaskmanager.memory.managed.size=6335076856b
>> -Dtaskmanager.memory.task.heap.size=8160437768b
>> -Dtaskmanager.memory.task.off-heap.size=0b
>> -Dtaskmanager.numberOfTaskSlots=40 -Dtaskmanager.cpu.cores=16.0"
>> >>>
>> >>> We then tried with -Dtaskmanager.cpu.cores=7.0 and still ended up
>> with that very odd value for cpu cores.
>> >>>
>> >>> How do we correctly adjust this?
>> >>>
>> >>> Thanks!
>> >>> --
>> >>>
>> >>> Rex Fenley  |  Software Engineer - Mobile and Backend
>> >>>
>> >>>
>> >>
>> >>
>> >>
>> >> --
>> >>
>> >> Rex Fenley  |  Software Engineer - Mobile and Backend
>> >>
>> >>
>>
>


Re: taskmanager.cpu.cores 1.7976931348623157E308

2020-12-06 Thread Xintong Song
Hi Rex,

We're running this in a local environment so that may be contributing to
> what we're seeing.
>
Just to double check on this. By `local environment`, you mean running
flink without setting up a standalone cluster or submitting it to a
K8s/Yarn cluster? (Typically executing from an IDE, running `flink run -t
local`, or running your own application that calls
`ExecutionEnvironment#execute`).
If yes, then this is kind of expected.

A couple of things that might help you understand this.

   - Running on a local environment means setting up the Flink cluster
   within the current process (the IDE process if executed from an IDE, the
   flink client process if using `flink run -t local`, or your own application
   process). That also means most of the resource configurations cannot take
   effect, because the resources of the JVM are already determined. Please
   refer to the memory configuration documents for options that still take
   effect in local execution. [1][2]
   - David is correct that `taskmanager.cpu.cores` is only intended for
   internal usages. I assume you learnt about this configuration by reading
   the source codes? If true, please also be aware that the JavaDoc
   of `TaskManagerOption#CPU_CORES` says "DO NOT USE THIS CONFIG OPTION", and
   it is also annotated with `ExcludeFromDocumentation` so that users do not
   learn this option from the documents.
   - Flink does not really control how many cpu cores it uses. However,
   when running on an external resource management system (K8s, Yarn, Mesos),
   it requires a certain amount of cpu resources for its containers/pods, and
   allows the external system to control its cpu usage. You can use the
   following configuration options to control how many cpu cores are requested
   in such cases (see the sketch after this list).
  - kubernetes.jobmanager.cpu
  - kubernetes.taskmanager.cpu
  - yarn.appmaster.vcores
  - yarn.containers.vcores
  - mesos.resourcemanager.tasks.cpus
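
As a minimal illustration of the Yarn options listed above (placeholder values, not recommendations; assuming a yarn-application deployment), they can go in flink-conf.yaml or be passed as dynamic properties at submission:

    # flink-conf.yaml
    yarn.appmaster.vcores: 2
    yarn.containers.vcores: 4

    # or at submission time (class name and jar path are placeholders)
    ./bin/flink run-application -t yarn-application \
      -Dyarn.appmaster.vcores=2 -Dyarn.containers.vcores=4 \
      -c com.example.Main /path/to/job.jar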


Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#local-execution
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_jobmanager.html#local-execution


On Mon, Dec 7, 2020 at 10:32 AM Yangze Guo  wrote:

> Hi, Rex,
>
> Can you share more logs for it. Did you see something like "The
> configuration option taskmanager.cpu.cores required for local
> execution is not set, setting it to" in your logs?
>
> Best,
> Yangze Guo
>
> Best,
> Yangze Guo
>
>
> On Sat, Dec 5, 2020 at 6:53 PM David Anderson 
> wrote:
> >
> > taskmanager.cpu.cores is intended for internal use only -- you aren't
> meant to set this option. What happens if you leave it alone?
> >
> > Regards,
> > David
> >
> >
> > On Sat, Dec 5, 2020 at 8:04 AM Rex Fenley  wrote:
> >>
> >> We're running this in a local environment so that may be contributing
> to what we're seeing.
> >>
> >> On Fri, Dec 4, 2020 at 10:41 PM Rex Fenley  wrote:
> >>>
> >>> Hello,
> >>>
> >>> I'm tuning flink for parallelism right now and when I look at the
> JobManager I see
> >>> taskmanager.cpu.cores: 1.7976931348623157E308
> >>> Which looks like the maximum double number.
> >>>
> >>> We have 8 cpu cores, so we figured we'd bump to 16 for hyper
> threading. We have 37 operators so we rounded up and set 40 task slots.
> >>>
> >>> Here is our configuration
> >>>
> >>> "vmArgs": "-Xmx16g -Xms16g -XX:MaxDirectMemorySize=1207959552
> -XX:MaxMetaspaceSize=268435456 -Dlog.file=/tmp/flink.log
> -Dtaskmanager.memory.framework.off-heap.size=134217728b
> -Dtaskmanager.memory.network.max=1073741824b
> -Dtaskmanager.memory.network.min=1073741824b
> -Dtaskmanager.memory.framework.heap.size=134217728b
> -Dtaskmanager.memory.managed.size=6335076856b
> -Dtaskmanager.memory.task.heap.size=8160437768b
> -Dtaskmanager.memory.task.off-heap.size=0b
> -Dtaskmanager.numberOfTaskSlots=40 -Dtaskmanager.cpu.cores=16.0"
> >>>
> >>> We then tried with -Dtaskmanager.cpu.cores=7.0 and still ended up with
> that very odd value for cpu cores.
> >>>
> >>> How do we correctly adjust this?
> >>>
> >>> Thanks!
> >>> --
> >>>
> >>> Rex Fenley  |  Software Engineer - Mobile and Backend
> >>>
> >>>
> >>
> >>
> >>
> >> --
> >>
> >> Rex Fenley  |  Software Engineer - Mobile and Backend
> >>
> >>
>


Re: Re: flink-1.11.2 out-of-memory problem

2020-11-16 Thread Xintong Song
In theory there is no hard limit on how many slots a TM can be split into, but higher parallelism does not automatically mean better performance.
Increasing the parallelism increases the job's memory footprint. With too many slots on a TM you may see heavy GC pressure, insufficient network memory, OOMs, and so on. More slots on the same TM also means more running tasks, which puts extra pressure on the framework itself.
I'd suggest first watching the TM's CPU usage: only consider raising the parallelism if the job really is short on processing capacity (growing latency, backpressure) while the TM container's (multi-core) CPU utilization stays low.
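
For orientation, a minimal sketch of the knobs under discussion (placeholder values, not recommendations), set in flink-conf.yaml:

    taskmanager.numberOfTaskSlots: 5
    taskmanager.memory.process.size: 10g
    # default parallelism applied to submitted jobs
    parallelism.default: 5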

Thank you~

Xintong Song



On Tue, Nov 17, 2020 at 10:43 AM 史 正超  wrote:

> Thanks for the reply, Xintong. I've watched many of your videos. A quick follow-up question: is there a minimum limit on slot memory? With limited resources I'd like to split a taskmanager's memory into the smallest possible slots to maximize parallelism. Is there a reasonable range for this kind of splitting?
> For example, for 1 TM with 8G, is there a limit on how many slots it can be split into at minimum size?
> ____
> From: Xintong Song 
> Sent: November 17, 2020 1:53
> To: user-zh 
> Subject: Re: Re: flink-1.11.2 out-of-memory problem
>
> >
> > OK, thanks for the reply. Can the taskmanager.memory.task.off-heap.size option be set dynamically with the code below?
> >
> > streamTableEnv.getConfig().getConfiguration().setString(key, value);
> >
>
> No, that is a cluster-level configuration.
>
> It can be set in the flink-conf.yaml configuration file, or specified dynamically at submission time via -yD key=value.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Nov 17, 2020 at 9:31 AM Andrew <874269...@qq.com> wrote:
>
> > It should not be configurable that way. The
> > taskmanager.memory.task.off-heap.size option is a taskmanager startup
> > parameter, set via the configuration file;
> >
> > streamTableEnv.getConfig().getConfiguration().setString(key, value);
> > is a job runtime configuration!
> >
> >
> >
> > -- Original message --
> > From: "user-zh" <shizhengc...@outlook.com>
> > Sent: Monday, November 16, 2020, 7:14 PM
> > To: "user-zh@flink.apache.org"
> > Subject: Re: flink-1.11.2 out-of-memory problem
> >
> >
> >
> > OK, thanks for the reply. Can the taskmanager.memory.task.off-heap.size
> > option be set dynamically with the code below?
> >
> > streamTableEnv.getConfig().getConfiguration().setString(key, value);
> >
> > ____
> > From: Xintong Song  > Sent: November 16, 2020 10:59
> > To: user-zh  > Subject: Re: flink-1.11.2 out-of-memory problem
> >
> > Then there should be no memory leak; most likely the job simply needs more direct memory than is available.
> > As the error message suggests, try increasing `taskmanager.memory.task.off-heap.size`.
> > Increasing only the TM's total memory won't help; it does not increase the direct memory available to the job.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Mon, Nov 16, 2020 at 6:38 PM 史 正超  wrote:
> >
> >  flink-on-yarn, per-job mode. On restart the Kafka group.id stayed the
> >  same, so it should resume from the previous offsets, but the job will not start. I wonder whether a backlog that built up over time is the cause.
> >  
> >  From: Xintong Song  >  Sent: November 16, 2020 10:11
> >  To: user-zh  >  Subject: Re: flink-1.11.2 out-of-memory problem
> > 
> >  Which deployment mode is this? Standalone?
> >  After the job failed, were all the TMs restarted for the rerun, or were some of the previous TMs reused?
> > 
> >  Thank you~
> > 
> >  Xintong Song
> > 
> > 
> > 
> >  On Mon, Nov 16, 2020 at 5:53 PM 史 正超  > wrote:
> > 
> >   We use RocksDB, parallelism 5, 1 TM with 5 slots, and 10G of TM memory.
> >  
> >   Starting the job reports the error below. It had started successfully before, then hit an out-of-memory error after running for a while; now, resuming from the original offsets, it fails to start at all.
> >  
> >   2020-11-16 17:44:52
> >   java.lang.OutOfMemoryError: Direct buffer memory. The direct
> >  out-of-memory
> >   error has occurred. This can mean two things: either job(s)
> > require(s) a
> >   larger size of JVM direct memory or there is a direct memory
> > leak. The
> >   direct memory can be allocated by user code or some of its
> > dependencies.
> >  In
> >   this case 'taskmanager.memory.task.off-heap.size' configuration
> > option
> >   should be increased. Flink framework and its dependencies also
> > consume
> >  the
> >   direct memory, mostly for network communication. The most of
> > network
> >  memory
> >   is managed by Flink and should not result in out-of-memory
> > error. In
> >   certain special cases, in particular for jobs with high
> > parallelism, the
> >   framework may require more direct memory which is not managed
> by
> > Flink.
> >  In
> >   this case 'taskmanager.memory.framework.off-heap.size'
> > configuration
> >  option
> >   should be increased. If the error persists then there is
> > probably a
> >  direct
> >   memory leak in user code or some of its dependencies which has
> > to be
> >   investigated and fixed. The task executor has to be shutdown...
> >   at
> > java.nio.Bits.reserveMemory(Bits.java:658)
> >   at
> > java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> >   at
> > java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> >   at
> > sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
> >   at
> > sun.nio.ch.IOUtil.read(IOUtil.java:195)
> >   at
> > sun.nio.ch.SocketChannelImpl.read(

Re: Re: flink-1.11.2 out-of-memory problem

2020-11-16 Thread Xintong Song
>
> OK, thanks for the reply. Can the taskmanager.memory.task.off-heap.size option be set dynamically with the following code?
>
> streamTableEnv.getConfig().getConfiguration().setString(key, value);
>

No, that is a cluster-level configuration.

It can be set in the flink-conf.yaml configuration file, or specified dynamically at submission time via -yD key=value.
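
For example (a sketch; the memory value, class name, and jar path are placeholders), with the Yarn per-job CLI:

    ./bin/flink run -m yarn-cluster \
      -yD taskmanager.memory.task.off-heap.size=256mb \
      -c com.example.Main /path/to/job.jar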


Thank you~

Xintong Song



On Tue, Nov 17, 2020 at 9:31 AM Andrew <874269...@qq.com> wrote:

> It should not be configurable that way. The
> taskmanager.memory.task.off-heap.size option is a taskmanager startup parameter, set via the configuration file;
>
>
> streamTableEnv.getConfig().getConfiguration().setString(key, value);
> is a job runtime configuration!
>
>
>
> -- Original message --
> From: "user-zh" <shizhengc...@outlook.com>
> Sent: Monday, November 16, 2020, 7:14 PM
> To: "user-zh@flink.apache.org"
> Subject: Re: flink-1.11.2 out-of-memory problem
>
>
>
> OK, thanks for the reply. Can the taskmanager.memory.task.off-heap.size
> option be set dynamically with the code below?
>
> streamTableEnv.getConfig().getConfiguration().setString(key, value);
>
> 
> From: Xintong Song  Sent: November 16, 2020 10:59
> To: user-zh  Subject: Re: flink-1.11.2 out-of-memory problem
>
> Then there should be no memory leak; most likely the job simply needs more direct memory than is available.
> As the error message suggests, try increasing `taskmanager.memory.task.off-heap.size`.
> Increasing only the TM's total memory won't help; it does not increase the direct memory available to the job.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Nov 16, 2020 at 6:38 PM 史 正超 
>  flink-on-yarn, per-job mode. On restart the Kafka group.id stayed the
>  same, so it should resume from the previous offsets, but the job will not start. I wonder whether a backlog that built up over time is the cause.
>  
>  From: Xintong Song   Sent: November 16, 2020 10:11
>  To: user-zh   Subject: Re: flink-1.11.2 out-of-memory problem
> 
>  Which deployment mode is this? Standalone?
>  After the job failed, were all the TMs restarted for the rerun, or were some of the previous TMs reused?
> 
>  Thank you~
> 
>  Xintong Song
> 
> 
> 
>  On Mon, Nov 16, 2020 at 5:53 PM 史 正超  wrote:
> 
>   We use RocksDB, parallelism 5, 1 TM with 5 slots, and 10G of TM memory.
>  
>   Starting the job reports the error below. It had started successfully before, then hit an out-of-memory error after running for a while; now, resuming from the original offsets, it fails to start at all.
>  
>   2020-11-16 17:44:52
>   java.lang.OutOfMemoryError: Direct buffer memory. The direct
>  out-of-memory
>   error has occurred. This can mean two things: either job(s)
> require(s) a
>   larger size of JVM direct memory or there is a direct memory
> leak. The
>   direct memory can be allocated by user code or some of its
> dependencies.
>  In
>   this case 'taskmanager.memory.task.off-heap.size' configuration
> option
>   should be increased. Flink framework and its dependencies also
> consume
>  the
>   direct memory, mostly for network communication. The most of
> network
>  memory
>   is managed by Flink and should not result in out-of-memory
> error. In
>   certain special cases, in particular for jobs with high
> parallelism, the
>   framework may require more direct memory which is not managed by
> Flink.
>  In
>   this case 'taskmanager.memory.framework.off-heap.size'
> configuration
>  option
>   should be increased. If the error persists then there is
> probably a
>  direct
>   memory leak in user code or some of its dependencies which has
> to be
>   investigated and fixed. The task executor has to be shutdown...
>   at
> java.nio.Bits.reserveMemory(Bits.java:658)
>   at
> java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>   at
> java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>   at
> sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
>   at
> sun.nio.ch.IOUtil.read(IOUtil.java:195)
>   at
> sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>   at
>  
> 
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:109)
>   at
>  
> 
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:101)
>   at
>  
> 
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
>   at
>  
> 
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
>   at
>  
> 
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
>   at
>  
> 
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:381)
>   at
>  
> 
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:326)
>   at
>  
> 
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
>   at
>  
> 
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.interna

Re: flink-1.11.2 out-of-memory problem

2020-11-16 Thread Xintong Song
Which deployment mode is this? Standalone?
After the job failed, were all the TMs restarted for the rerun, or were some of the previous TMs reused?

Thank you~

Xintong Song



On Mon, Nov 16, 2020 at 5:53 PM 史 正超  wrote:

> We use RocksDB, parallelism 5, 1 TM with 5 slots, and 10G of TM memory.
> Starting the job reports the error below. It had started successfully before, then hit an out-of-memory error after running for a while; now, resuming from the original offsets, it fails to start at all.
>
> 2020-11-16 17:44:52
> java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory
> error has occurred. This can mean two things: either job(s) require(s) a
> larger size of JVM direct memory or there is a direct memory leak. The
> direct memory can be allocated by user code or some of its dependencies. In
> this case 'taskmanager.memory.task.off-heap.size' configuration option
> should be increased. Flink framework and its dependencies also consume the
> direct memory, mostly for network communication. The most of network memory
> is managed by Flink and should not result in out-of-memory error. In
> certain special cases, in particular for jobs with high parallelism, the
> framework may require more direct memory which is not managed by Flink. In
> this case 'taskmanager.memory.framework.off-heap.size' configuration option
> should be increased. If the error persists then there is probably a direct
> memory leak in user code or some of its dependencies which has to be
> investigated and fixed. The task executor has to be shutdown...
> at java.nio.Bits.reserveMemory(Bits.java:658)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
> at sun.nio.ch.IOUtil.read(IOUtil.java:195)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:109)
> at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:101)
> at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
> at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
> at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
> at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:381)
> at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:326)
> at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
> at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
> at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
> at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
> at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getRecordsFromKafka(KafkaConsumerThread.java:535)
> at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:264)
>
>
>


Re: Flink AutoScaling EMR

2020-11-15 Thread Xintong Song
>
> Is there a way to make the new yarn job only on the new hardware?

I think you can simply decommission the nodes from Yarn, so that new
containers will not be allocated from those nodes. You might also need a
large decommission timeout, after which all the remaining running containers
on the decommissioning node will be killed.
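
For illustration, a rough sketch with Yarn's graceful decommissioning (assumes a Hadoop version that supports it, 2.9+, and that yarn.resourcemanager.nodes.exclude-path already points at the exclude file; the host name and timeout are placeholders):

    # add the node to the resource manager's exclude file
    echo "ip-10-0-0-42.ec2.internal" >> /etc/hadoop/conf/yarn.exclude
    # refresh with graceful decommissioning and a generous timeout (in seconds)
    yarn rmadmin -refreshNodes -g 3600 -client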

Thank you~

Xintong Song



On Fri, Nov 13, 2020 at 2:57 PM Robert Metzger  wrote:

> Hi,
> it seems that YARN has a feature for targeting specific hardware:
> https://hadoop.apache.org/docs/r3.1.0/hadoop-yarn/hadoop-yarn-site/PlacementConstraints.html
> In any case, you'll need enough spare resources for some time to be able
> to run your job twice for this kind of "zero downtime handover"
>
> On Thu, Nov 12, 2020 at 6:10 PM Rex Fenley  wrote:
>
>> Awesome, thanks! Is there a way to make the new yarn job only on the new
>> hardware? Or would the two jobs have to run on intersecting hardware and
>> then would be switched on/off, which means we'll need a buffer of resources
>> for our orchestration?
>>
>> Also, good point on recovery. I'll spend some time looking into this.
>>
>> Thanks
>>
>>
>> On Wed, Nov 11, 2020 at 11:53 PM Robert Metzger 
>> wrote:
>>
>>> Hey Rex,
>>>
>>> the second approach (spinning up a standby job and then doing a
>>> handover) sounds more promising to implement, without rewriting half of the
>>> Flink codebase ;)
>>> What you need is a tool that orchestrates creating a savepoint, starting
>>> a second job from the savepoint and then communicating with a custom sink
>>> implementation that can be switched on/off in the two jobs.
>>> With that approach, you should have almost no downtime, just increased
>>> resource requirements during such a handover.
>>>
>>> What you need to consider as well is that this handover process only
>>> works for scheduled maintenance. If you have a system failure, you'll have
>>> downtime until the last checkpoint is restored.
>>> If you are trying to reduce the potential downtime overall, I would
>>> rather recommend optimizing the checkpoint restore time, as this can cover
>>> both scheduled maintenance and system failures.
>>>
>>> Best,
>>> Robert
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Nov 11, 2020 at 8:56 PM Rex Fenley  wrote:
>>>
>>>> Another thought, would it be possible to
>>>> * Spin up new core or task nodes.
>>>> * Run a new copy of the same job on these new nodes from a savepoint.
>>>> * Have the new job *not* write to the sink until the other job is torn
>>>> down?
>>>>
>>>> This would allow us to be eventually consistent and maintain writes
>>>> going through without downtime. As long as whatever is buffering the sink
>>>> doesn't run out of space it should work just fine.
>>>>
>>>> We're hoping to achieve consistency in less than 30s ideally.
>>>>
>>>> Again though, if we could get savepoints to restore in less than 30s
>>>> that would likely be sufficient for our purposes.
>>>>
>>>> Thanks!
>>>>
>>>> On Wed, Nov 11, 2020 at 11:42 AM Rex Fenley  wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'm trying to find a solution for auto scaling our Flink EMR cluster
>>>>> with 0 downtime using RocksDB as state storage and S3 backing store.
>>>>>
>>>>> My current thoughts are like so:
>>>>> * Scaling an Operator dynamically would require all keyed state to be
>>>>> available to the set of subtasks for that operator, therefore a set of
>>>>> subtasks must be reading to and writing from the same RocksDB. I.e. to
>>>>> scale in and out subtasks in that set, they need to read from the same
>>>>> Rocks.
>>>>> * Since subtasks can run on different core nodes, is it possible to
>>>>> have different core nodes read/write to the same RocksDB?
>>>>> * When's the safe point to scale in and out an operator? Only right
>>>>> after a checkpoint possibly?
>>>>>
>>>>> If the above is not possible then we'll have to use save points which
>>>>> means some downtime, therefore:
>>>>> * Scaling out during high traffic is arguably more important to react
>>>>> quickly to than scaling in during low traffic. Is it possible to add more
>>>>> core nodes to EMR without disturb
