Re: Is RestClusterClient recommended?

2023-11-22 Thread Chesnay Schepler
Don't use the RestClusterClient; you can generate one from the openapi 
spec (see the docs).


On 16/11/2023 20:36, Adrian Alexandru Vasiliu wrote:

Hello,

For a programmatic use in Java of the Flink REST API, which of these 
options would be the best choice?


 1. Direct use via a REST client
 2. The RestClusterClient


 API

RestClusterClient is appealing, because it embeds a bunch of code that 
we wouldn't need to write and maintain.


But I also see reasons to stay away from it:

 1. Its javadoc
 
can
be found by searching the web (versioned per version of Flink /
flink-clients), but I didn't find a user documentation.
https://nightlies.apache.org/flink/flink-docs-stable/
 doesn't
seem to mention RestClusterClient.
 2. Its API isn't marked @Public nor @PublicEvolving.
3.
In 2019, Till Rohrmann (Flink PMC member) wrote: "Flink's cluster
REST API has been designed to work with any REST client. The
RestClusterClient which comes with the flink-clients module is
intended for internal usage."
https://stackoverflow.com/a/56127387/1723384

4.
If at a later time we would bring authentication to the Flink REST
API, say via nginx proxy side-car, RestClusterClient wouldn't know
how to deal with it.

Do I miss something?
Would the community nowadays recommend using RestClusterClient, at 
least in situations without authentication?


Thanks,
Adrian
Unless otherwise stated above:

Compagnie IBM France
Siège Social : 17, avenue de l'Europe, 92275 Bois-Colombes Cedex
RCS Nanterre 552 118 465
Forme Sociale : S.A.S.
Capital Social : 664 069 390,60 €
SIRET : 552 118 465 03644 - Code NAF 6203Z


Re: Metrics with labels

2023-10-17 Thread Chesnay Schepler

> I think this is a general issue with the Flink metrics.

Not quite. There are a few instance in Flink were code wasn't updated to 
encode metadata as additional labels, and the RocksDB metrics may be one 
of them.
Also for RocksDB, you could try setting 
"state.backend.rocksdb.metrics.column-family-as-variable: true" to 
resolve this particular problem.


> If I define a custom metric, it is not supported to use labels

You can do so via MetricGroup#addGroup(String key, String value).
See 
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/metrics/#user-variables


On 17/10/2023 14:31, Lars Skjærven wrote:

Hello,

We're experiencing difficulties in using Flink metrics in a generic 
way since various properties are included in the name of the metric 
itself. This makes it difficult to generate sensible (and general) 
dashboards (with aggregations).


One example is the metric for rocksdb estimated live data size 
(state.backend.rocksdb.metrics.estimate-live-data-size). the name 
appears as : 
flink_taskmanager_job_task_operator__state_rocksdb_estimate_live_data_size 
.


If, on the other hand, the state name was included as label, this 
would facilitate aggregation across states, i.e.:

flink_taskmanager_job_task_operator_state_rocksdb_estimate_live_data_size{state_descriptor="my_state_descriptor"}

I think this is a general issue with the Flink metrics. If I define a 
custom metric, it is not supported to use labels 
(https://prometheus.io/docs/practices/naming/#labels) in a dynamic way.


Thanks !

Lars



Re: Java 21 for flink

2023-07-10 Thread Chesnay Schepler

At this time, no.

On 08/07/2023 04:00, Prasanna kumar wrote:

Hi all,

Java 21 plans to support light weight thread called fiber based on 
Project LOOM which will increase the concurrency to great extent.


Is there any plan for flink to leverage it?

Thanks,
Prasanna.





Re: Changing vertex parallelism

2023-07-04 Thread Chesnay Schepler

On 03/07/2023 19:13, Nikolaos Paraskakis wrote:

Is there any way changing job vertex parallelism during runtime (downtime 
included)? For example, via REST API?


At this time, no.

1.18.0 will ship with FLIP-291 
, 
which allows you to change a jobs parallelism IF you are using the 
adaptive scheduler 
.


Re: Flink migration 1.15 to 1.17 - Os version change

2023-06-19 Thread Chesnay Schepler
The openjdk images are deprecated, which prevented us from releasing our 
docker images via some distribution channels.


https://issues.apache.org/jira/browse/FLINK-29137

When we switched to the Temurin images we didn't have to change a whole 
lot, so you might be able to reconstruct a openjdk-based image for Flink 
1.17.

https://github.com/apache/flink-docker/commit/3c259f46231b97202925a111a8205193c15bbf78

In theory you should be able to take the existing Dockerfile 
and 
change the base without having to change anything else.


On 19/06/2023 11:11, Arek Dankiewicz wrote:

Hello All,
We recently wanted to migrate flink docker image version from 1.15.1 
to 1.17.1 and encountered a problem with our infrastructure not being 
able to correctly handle an app built on Ubuntu 22+ due to an older 
version of the docker.
Unfortunately, this is out of our control, and we would like to change 
the os on which the flink 1.17 image is built from 
eclipse-temurin:11-jre-jammy to e.g. openjdk:11.


I would like to know whether the change to debian for version 1.17 is 
a disruptive change and what the OS change between 1.15 and 1.16 was 
caused by.


Kindest regards,
Arkadiusz




Re: [DISCUSS] Status of Statefun Project

2023-06-06 Thread Chesnay Schepler
If you were to fork it /and want to redistribute it/ then the short 
version is that


1. you have to adhere to the Apache licensing requirements
2. you have to make it clear that your fork does not belong to the
   Apache Flink project. (Trademarks and all that)

Neither should be significant hurdles (there should also be plenty of 
online resources regarding 1), and if you do this then you can freely 
share your fork with others.


I've also pinged Martijn to take a look at this thread.
To my knowledge the project hasn't decided anything yet.

On 27/05/2023 04:05, Galen Warren wrote:

Ok, I get it. No interest.

If this project is being abandoned, I guess I'll work with my own fork. Is
there anything I should consider here? Can I share it with other people who
use this project?

On Tue, May 16, 2023 at 10:50 AM Galen Warren
wrote:


Hi Martijn, since you opened this discussion thread, I'm curious what your
thoughts are in light of the responses? Thanks.

On Wed, Apr 19, 2023 at 1:21 PM Galen Warren
wrote:


I use Apache Flink for stream processing, and StateFun as a hand-off

point for the rest of the application.
It serves well as a bridge between a Flink Streaming job and
micro-services.


This is essentially how I use it as well, and I would also be sad to see
it sunsetted. It works well; I don't know that there is a lot of new
development required, but if there are no new Statefun releases, then
Statefun can only be used with older Flink versions.

On Tue, Apr 18, 2023 at 10:04 PM Marco Villalobos <
mvillalo...@kineteque.com> wrote:


I am currently using Stateful Functions in my application.

I use Apache Flink for stream processing, and StateFun as a hand-off
point for the rest of the application.
It serves well as a bridge between a Flink Streaming job and
micro-services.

I would be disappointed if StateFun was sunsetted.  Its a good idea.

If there is anything I can do to help, as a contributor perhaps, please
let me know.


On Apr 3, 2023, at 2:02 AM, Martijn Visser

wrote:

Hi everyone,

I want to open a discussion on the status of the Statefun Project [1]

in Apache Flink. As you might have noticed, there hasn't been much
development over the past months in the Statefun repository [2]. There is
currently a lack of active contributors and committers who are able to help
with the maintenance of the project.

In order to improve the situation, we need to solve the lack of

committers and the lack of contributors.

On the lack of committers:

1. Ideally, there are some of the current Flink committers who have

the bandwidth and can help with reviewing PRs and merging them.

2. If that's not an option, it could be a consideration that current

committers only approve and review PRs, that are approved by those who are
willing to contribute to Statefun and if the CI passes

On the lack of contributors:

3. Next to having this discussion on the Dev and User mailing list, we

can also create a blog with a call for new contributors on the Flink
project website, send out some tweets on the Flink / Statefun twitter
accounts, post messages on Slack etc. In that message, we would inform how
those that are interested in contributing can start and where they could
reach out for more information.

There's also option 4. where a group of interested people would split

Statefun from the Flink project and make it a separate top level project
under the Apache Flink umbrella (similar as recently has happened with
Flink Table Store, which has become Apache Paimon).

If we see no improvements in the coming period, we should consider

sunsetting Statefun and communicate that clearly to the users.

I'm looking forward to your thoughts.

Best regards,

Martijn

[1]https://nightlies.apache.org/flink/flink-statefun-docs-master/  <

https://nightlies.apache.org/flink/flink-statefun-docs-master/>

[2]https://github.com/apache/flink-statefun  <

https://github.com/apache/flink-statefun>



Re: StatsdMetricsReporter is emitting all metric types as gauges

2023-05-12 Thread Chesnay Schepler
nit: Whether the Flink counter abstraction is cumulative or not is 
irrelevant w.r.t. what we send to metric backends.


For example, for Datadog we do send count diffs instead of the current 
total count.


On 10/05/2023 09:33, Hang Ruan wrote:

Hi, Iris,

The Flink counter is cumulative. There are `inc` and `dec` methods in it.
As the value of the counter has been calculated in Flink, we do not 
need use the counter metric in statsd to calculate.


Best,
Hang

Iris Grace Endozo  于2023年5月10日周三 14:53写道:

Hey Hang,

Thanks for the prompt response. Does this mean the Flink counters
emitted via statsd are cumulative then? From the spec
https://github.com/b/statsd_spec#counters


> A counter is a gauge calculated at the server. Metrics sent by
the client increment or decrement the value of the gauge rather
than giving its current value.

This means that counters are not monotonic and work like deltas
that are aggregated on the server side.

Cheers, Iris.

--

*Iris Grace Endozo,* Senior Software Engineer
*M *+61 435 108 697
*E* iris.end...@gmail.com
On 10 May 2023 at 1:02 PM +1000, Hang Ruan
, wrote:

Hi, Iris,

The metrics have already be calculated  in Flink. So we only need
to report these metric as the gauges.
For example, the metric `metricA` is a Flink counter and is
increased from 1 to 2. The statsd gauge will be 2 now. If we
register it as a statsd counter, we will send 1 and 2 to the
statsd counter. The statsd counter will be 3, which is a wrong
result.

Best,
Hang

Iris Grace Endozo  于2023年5月9日周二
19:19写道:

Hey folks trying to troubleshoot why counter metrics are
appearing as gauges on my end. Is it expected that the
StatsdMetricsReporter is reporting gauges for counters as well?

Looking at this one:

https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java#L207:
the statsd specifications state that counters need to be
reported as :|c[|@] but it
seems it's defaulting to "%s:%s|g" in the above. Ref:
https://github.com/b/statsd_spec#counters

Wondering if anyone else has hit this issue or there's an
existing issue?

Cheers, Iris.

--

*Iris Grace Endozo,* Senior Software Engineer
*M *+61 435 108 697
*E* iris.end...@gmail.com



Re: [Discussion] - Release major Flink version to support JDK 17 (LTS)

2023-04-28 Thread Chesnay Schepler
We don't know yet. I wanted to run some more experiments to see if I 
cant get Scala 2.12.7 working on Java 17.


If that doesn't work, then it would also be an option to bump Scala in 
the Java 17 builds (breaking savepoint compatibility), and users should 
just only use the Java APIs.


The alternative to _that_ is doing this when we drop the Scala API.

On 28/04/2023 01:11, Thomas Weise wrote:
Is the intention to bump the Flink major version and only support Java 
17+? If so, can Scala not be upgraded at the same time?


Thanks,
Thomas


On Thu, Apr 27, 2023 at 4:53 PM Martijn Visser 
 wrote:


Scala 2.12.7 doesn't compile on Java 17, see
https://issues.apache.org/jira/browse/FLINK-25000.

On Thu, Apr 27, 2023 at 3:11 PM Jing Ge  wrote:

> Thanks Tamir for the information. According to the latest
comment of the
> task FLINK-24998, this bug should be gone while using the latest
JDK 17. I
> was wondering whether it means that there are no more issues to
stop us
> releasing a major Flink version to support Java 17? Did I miss
something?
>
> Best regards,
> Jing
>
> On Thu, Apr 27, 2023 at 8:18 AM Tamir Sagi

> wrote:
>
>> More details about the JDK bug here
>> https://bugs.openjdk.org/browse/JDK-8277529
>>
>> Related Jira ticket
>> https://issues.apache.org/jira/browse/FLINK-24998
>>
>> --
>> *From:* Jing Ge via user 
>> *Sent:* Monday, April 24, 2023 11:15 PM
>> *To:* Chesnay Schepler 
>> *Cc:* Piotr Nowojski ; Alexis
Sarda-Espinosa <
>> sarda.espin...@gmail.com>; Martijn Visser
;
>> d...@flink.apache.org ; user

>> *Subject:* Re: [Discussion] - Release major Flink version to
support JDK
>> 17 (LTS)
>>
>>
>> *EXTERNAL EMAIL*
>>
>>
>> Thanks Chesnay for working on this. Would you like to share
more info
>> about the JDK bug?
>>
>> Best regards,
>> Jing
>>
>> On Mon, Apr 24, 2023 at 11:39 AM Chesnay Schepler

>> wrote:
>>
>> As it turns out Kryo isn't a blocker; we ran into a JDK bug.
>>
>> On 31/03/2023 08:57, Chesnay Schepler wrote:
>>
>>
>>

https://github.com/EsotericSoftware/kryo/wiki/Migration-to-v5#migration-guide
>>
>> Kroy themselves state that v5 likely can't read v2 data.
>>
>> However, both versions can be on the classpath without
classpath as v5
>> offers a versioned artifact that includes the version in the
package.
>>
>> It probably wouldn't be difficult to migrate a savepoint to
Kryo v5,
>> purely from a read/write perspective.
>>
>> The bigger question is how we expose this new Kryo version in
the API. If
>> we stick to the versioned jar we need to either duplicate all
current
>> Kryo-related APIs or find a better way to integrate other
serialization
>> stacks.
>> On 30/03/2023 17:50, Piotr Nowojski wrote:
>>
>> Hey,
>>
>> > 1. The Flink community agrees that we upgrade Kryo to a later
version,
>> which means breaking all checkpoint/savepoint compatibility and
releasing a
>> Flink 2.0 with Java 17 support added and Java 8 and Flink Scala
API support
>> dropped. This is probably the quickest way, but would still
mean that we
>> expose Kryo in the Flink APIs, which is the main reason why we
haven't been
>> able to upgrade Kryo at all.
>>
>> This sounds pretty bad to me.
>>
>> Has anyone looked into what it would take to provide a smooth
migration
>> from Kryo2 -> Kryo5?
>>
>> Best,
>> Piotrek
>>
>> czw., 30 mar 2023 o 16:54 Alexis Sarda-Espinosa

>> napisał(a):
>>
>> Hi Martijn,
>>
>> just to be sure, if all state-related classes use a POJO
serializer, Kryo
>> will never come into play, right? Given FLINK-16686 [1], I
wonder how many
>> users actually have jobs with Kryo and RocksDB, but even if
there aren't
>> many, that still leaves those who don't use RocksDB for
>> checkpoints/savepoints.
>>
>> If Kryo were to stay in the Flink APIs in v1.X, is it
impossible to let
>> users choose between v2/v5 jars by separating them like log4j2
jars?
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-16686
>>
>> Regards,
 

Re: Can I setup standby taskmanagers while using reactive mode?

2023-04-26 Thread Chesnay Schepler
Reactive mode doesn't support standby taskmanagers. As you said it 
always uses all available resources in the cluster.


I can see it being useful though to not always scale to MAX but (MAX - 
some_offset).


I'd suggest to file a ticket.

On 26/04/2023 00:17, Wei Hou via user wrote:

Hi Flink community,

We are trying to use Flink’s reactive mode with Kubernetes HPA for autoscaling, 
however since the reactive mode will always use all available resources, it 
causes a problem when we need standby task managers for fast failure recover: 
The job will always use these extra standby task managers as active task 
manager to process data.

I wonder if you have any suggestion on this, should we avoid using Flink 
reactive mode together with standby task managers?

Best,
Wei






Re: [Discussion] - Release major Flink version to support JDK 17 (LTS)

2023-04-24 Thread Chesnay Schepler

As it turns out Kryo isn't a blocker; we ran into a JDK bug.

On 31/03/2023 08:57, Chesnay Schepler wrote:

https://github.com/EsotericSoftware/kryo/wiki/Migration-to-v5#migration-guide

Kroy themselves state that v5 likely can't read v2 data.

However, both versions can be on the classpath without classpath as v5 
offers a versioned artifact that includes the version in the package.


It probably wouldn't be difficult to migrate a savepoint to Kryo v5, 
purely from a read/write perspective.


The bigger question is how we expose this new Kryo version in the API. 
If we stick to the versioned jar we need to either duplicate all 
current Kryo-related APIs or find a better way to integrate other 
serialization stacks.


On 30/03/2023 17:50, Piotr Nowojski wrote:

Hey,

> 1. The Flink community agrees that we upgrade Kryo to a later 
version, which means breaking all checkpoint/savepoint compatibility 
and releasing a Flink 2.0 with Java 17 support added and Java 8 and 
Flink Scala API support dropped. This is probably the quickest way, 
but would still mean that we expose Kryo in the Flink APIs, which is 
the main reason why we haven't been able to upgrade Kryo at all.


This sounds pretty bad to me.

Has anyone looked into what it would take to provide a smooth 
migration from Kryo2 -> Kryo5?


Best,
Piotrek

czw., 30 mar 2023 o 16:54 Alexis Sarda-Espinosa 
 napisał(a):


Hi Martijn,

just to be sure, if all state-related classes use a POJO
serializer, Kryo will never come into play, right? Given
FLINK-16686 [1], I wonder how many users actually have jobs with
Kryo and RocksDB, but even if there aren't many, that still
leaves those who don't use RocksDB for checkpoints/savepoints.

If Kryo were to stay in the Flink APIs in v1.X, is it impossible
to let users choose between v2/v5 jars by separating them like
log4j2 jars?

[1] https://issues.apache.org/jira/browse/FLINK-16686

Regards,
Alexis.

Am Do., 30. März 2023 um 14:26 Uhr schrieb Martijn Visser
:

Hi all,

I also saw a thread on this topic from Clayton Wohl [1] on
this topic, which I'm including in this discussion thread to
avoid that it gets lost.

From my perspective, there's two main ways to get to Java 17:

1. The Flink community agrees that we upgrade Kryo to a later
version, which means breaking all checkpoint/savepoint
compatibility and releasing a Flink 2.0 with Java 17 support
added and Java 8 and Flink Scala API support dropped. This is
probably the quickest way, but would still mean that we
expose Kryo in the Flink APIs, which is the main reason why
we haven't been able to upgrade Kryo at all.
2. There's a contributor who makes a contribution that bumps
Kryo, but either a) automagically reads in all old
checkpoints/savepoints in using Kryo v2 and writes them to
new snapshots using Kryo v5 (like is mentioned in the Kryo
migration guide [2][3] or b) provides an offline tool that
allows users that are interested in migrating their snapshots
manually before starting from a newer version. That
potentially could prevent the need to introduce a new Flink
major version. In both scenarios, ideally the contributor
would also help with avoiding the exposure of Kryo so that we
will be in a better shape in the future.

It would be good to get the opinion of the community for
either of these two options, or potentially for another one
that I haven't mentioned. If it appears that there's an
overall agreement on the direction, I would propose that a
FLIP gets created which describes the entire process.

Looking forward to the thoughts of others, including the
Users (therefore including the User ML).

Best regards,

Martijn

[1]
https://lists.apache.org/thread/qcw8wy9dv8szxx9bh49nz7jnth22p1v2
[2]
https://lists.apache.org/thread/gv49jfkhmbshxdvzzozh017ntkst3sgq
[3] https://github.com/EsotericSoftware/kryo/wiki/Migration-to-v5

On Sun, Mar 19, 2023 at 8:16 AM Tamir Sagi
 wrote:

I agree, there are several options to mitigate the
migration from v2 to v5.
yet, Oracle roadmap is to end JDK 11 support in September
this year.




From: ConradJam 
Sent: Thursday, March 16, 2023 4:36 AM
To: d...@flink.apache.org 
Subject: Re: [Discussion] - Release major Flink version
to support JDK 17 (LTS)

EXTERNAL EMAIL



Thanks for your start this discuss


I have been tracking this problem for a long time, until
I saw a
conversation in ISSUSE a few days ago and learned that
the Kryo v

Re: [Discussion] - Take findify/flink-scala-api under Flink umbrella

2023-04-17 Thread Chesnay Schepler

> they require additional hop to serialize Scala objects

This doesn't necessarily mean that we need a Scala API, because a beefed 
up type extraction could also solve this.


> This single committer is now with us and ready to maintain it in open 
source. The best situation to be :-)


Have you considered maintaining the wrappers as part of flink-extended? 
https://github.com/flink-extended


On 17/04/2023 09:45, Alexey Novakov via user wrote:

Hi Günter, David,

Let me reply to you both in one email. First of all, thank you for 
engaging.


Günter:
- I fully agree that losing Scala API as officially supported in Flink 
would be very unfortunate. Future of Scala is interesting and will 
bring more benefits to Flink users.


Just to remind everyone, Flink Scala users can't only use Java API, 
they require additional hop to serialize Scala objects. This is one of 
the reasons why Flink still has Scala API (2.11) and why a few more 
3-rd party wrappers appeared to support newer versions of Scala when 
it became possible.


David:
Let me address your concerns.

1. It is indeed not a very active project. This is exactly the reason, 
I want to save https://github.com/findify/flink-scala-api from dying, 
because it is quite a good library to work with. Our Idea is to hit 
two targets: get a newer/official Scala API for Flink and do not let 
the 3rd-party (currently) library to sink. I use this library for 
daily work.
2. It works for Flink 1.15, support of Flink 1.16. requires just 
publishing a new version. I guess it is a one line change in the 
build.sbt file. Will see if more changes would be needed. I think the 
nature of changes will be similar like in StateFun, i.e. adopt to 
breaking changes of public methods and/or switch from deprecated 
methods to newer alternatives. Migrating further should not be a 
problem. Again, Scala API is supposed to be a thin wrapper on top of 
Java API, so that it is not labour-intensive
3. That single person left Findify (Roman) and they did not pay much 
attention to it. Actually, there is no other better alternative for 
Scala wrapper currently. This single committer is now with us and 
ready to maintain it in open source. The best situation to be :-)
4/5. Yes, same as #1. You can see some PRs in the queue from a Scala 
bot, but Findify does not merge them. The library is so small and 
covers most of the needs that additional changes are not yet 
identified/needed. I agree this could be a sign that few people are 
using it.


I have no idea which companies or users use this library. Is it 
really important to know? I just want to provide proper substitution 
to guarantee Scala is used further with Flink.
I know that the official Scala API was used or still used by world 
known enterprises.


Thank you for your suggestion. I have included dev ML in the original 
email. Let me try to come up with a more detailed plan.


Among maintainers you will get me, Roman (main dev of 
https://github.com/findify/flink-scala-api) and maybe Günter.


What is the downside or loss if we import this library into the Flink 
and in a few years nobody will use it? I guess we'll just depreciate it?
I just propose my free time to maintain that. As per Roman, required 
work to maintain the library is very simple.


Best regards,
Alexey

On Sun, Apr 16, 2023 at 11:46 AM David Morávek  wrote:

cc dev@f.a.o

On Sun, Apr 16, 2023 at 11:42 AM David Morávek 
wrote:

Hi Alexey,

I'm a bit skeptical because, looking at the project, I see a
couple of red flags:

- The project is inactive. The last release and commit are
both from the last May.
- The project has not been adapted for the last two Flink
versions, which signals a lack of users.
- All commits are by a single person, which could mean that
there is no community around the project.
- There was no external contribution (except the Scala bot).
- There is no fork of the project (except the Scala bot).

>  As I know, FIndify does not want or cannot maintain this
library.

Who are the users of the library? I'd assume Findify no longer
uses it if they're abandoning it.

> which would be similar to the StateFun

We're currently dealing with a lack of maintainers for
StateFun, so we should have a solid building ground around the
project to avoid the same issue.


I think there is value in having a modern Scala API, but we
should have a bigger plan to address the future of Flink Scala
APIs than importing an unmaintained library and calling it a
day. I suggest starting a thread on the dev ML and concluding
the overall plan first.

Best,
D.

On Sun, Apr 16, 2023 at 10:48 AM guenterh.lists
 wrote:

Hello Alexey

Thank you for your initiative and your suggestion!

I can only fully support 

Re: [Discussion] - Release major Flink version to support JDK 17 (LTS)

2023-03-31 Thread Chesnay Schepler

https://github.com/EsotericSoftware/kryo/wiki/Migration-to-v5#migration-guide

Kroy themselves state that v5 likely can't read v2 data.

However, both versions can be on the classpath without classpath as v5 
offers a versioned artifact that includes the version in the package.


It probably wouldn't be difficult to migrate a savepoint to Kryo v5, 
purely from a read/write perspective.


The bigger question is how we expose this new Kryo version in the API. 
If we stick to the versioned jar we need to either duplicate all current 
Kryo-related APIs or find a better way to integrate other serialization 
stacks.


On 30/03/2023 17:50, Piotr Nowojski wrote:

Hey,

> 1. The Flink community agrees that we upgrade Kryo to a later 
version, which means breaking all checkpoint/savepoint compatibility 
and releasing a Flink 2.0 with Java 17 support added and Java 8 and 
Flink Scala API support dropped. This is probably the quickest way, 
but would still mean that we expose Kryo in the Flink APIs, which is 
the main reason why we haven't been able to upgrade Kryo at all.


This sounds pretty bad to me.

Has anyone looked into what it would take to provide a smooth 
migration from Kryo2 -> Kryo5?


Best,
Piotrek

czw., 30 mar 2023 o 16:54 Alexis Sarda-Espinosa 
 napisał(a):


Hi Martijn,

just to be sure, if all state-related classes use a POJO
serializer, Kryo will never come into play, right? Given
FLINK-16686 [1], I wonder how many users actually have jobs with
Kryo and RocksDB, but even if there aren't many, that still leaves
those who don't use RocksDB for checkpoints/savepoints.

If Kryo were to stay in the Flink APIs in v1.X, is it impossible
to let users choose between v2/v5 jars by separating them like
log4j2 jars?

[1] https://issues.apache.org/jira/browse/FLINK-16686

Regards,
Alexis.

Am Do., 30. März 2023 um 14:26 Uhr schrieb Martijn Visser
:

Hi all,

I also saw a thread on this topic from Clayton Wohl [1] on
this topic, which I'm including in this discussion thread to
avoid that it gets lost.

From my perspective, there's two main ways to get to Java 17:

1. The Flink community agrees that we upgrade Kryo to a later
version, which means breaking all checkpoint/savepoint
compatibility and releasing a Flink 2.0 with Java 17 support
added and Java 8 and Flink Scala API support dropped. This is
probably the quickest way, but would still mean that we expose
Kryo in the Flink APIs, which is the main reason why we
haven't been able to upgrade Kryo at all.
2. There's a contributor who makes a contribution that bumps
Kryo, but either a) automagically reads in all old
checkpoints/savepoints in using Kryo v2 and writes them to new
snapshots using Kryo v5 (like is mentioned in the Kryo
migration guide [2][3] or b) provides an offline tool that
allows users that are interested in migrating their snapshots
manually before starting from a newer version. That
potentially could prevent the need to introduce a new Flink
major version. In both scenarios, ideally the contributor
would also help with avoiding the exposure of Kryo so that we
will be in a better shape in the future.

It would be good to get the opinion of the community for
either of these two options, or potentially for another one
that I haven't mentioned. If it appears that there's an
overall agreement on the direction, I would propose that a
FLIP gets created which describes the entire process.

Looking forward to the thoughts of others, including the Users
(therefore including the User ML).

Best regards,

Martijn

[1]
https://lists.apache.org/thread/qcw8wy9dv8szxx9bh49nz7jnth22p1v2
[2]
https://lists.apache.org/thread/gv49jfkhmbshxdvzzozh017ntkst3sgq
[3] https://github.com/EsotericSoftware/kryo/wiki/Migration-to-v5

On Sun, Mar 19, 2023 at 8:16 AM Tamir Sagi
 wrote:

I agree, there are several options to mitigate the
migration from v2 to v5.
yet, Oracle roadmap is to end JDK 11 support in September
this year.




From: ConradJam 
Sent: Thursday, March 16, 2023 4:36 AM
To: d...@flink.apache.org 
Subject: Re: [Discussion] - Release major Flink version to
support JDK 17 (LTS)

EXTERNAL EMAIL



Thanks for your start this discuss


I have been tracking this problem for a long time, until I
saw a
conversation in ISSUSE a few days ago and learned that the
Kryo version
problem will affect the JDK17 compilation of snapshots [1]
FLINK-24998 ,


Re: Disable the chain of the Sink operator

2023-02-16 Thread Chesnay Schepler
As far as I know that chain between committer and writer is also 
required for correctness.


On 16/02/2023 14:53, weijie guo wrote:

Hi wu,

I don't think it is a good choice to directly change the strategy of 
chain. Operator chain usually has better performance and resource 
utilization. If we directly change the chain policy between them, 
users can no longer chain them together, which is not a good starting 
point.


Best regards,

Weijie



wudi <676366...@qq.com> 于2023年2月16日周四 19:29写道:

Thank you for your reply.

Currently in the custom Sink Connector, the Flink task will
combine Writer and Committer into one thread, and the thread name
is similar: *[Sink: Writer -> Sink: Committer (1/1)#0]*.
In this way, when the *Committer.commit()* method is very slow, it
will block the*SinkWriter.write()* method to receive upstream data.

The client can use the *env.disableOperatorChaining() *method to
split the thread into two threads:*[Sink: Writer (1/1)#0] *and
*[Sink: Committer (1/1)#0]*. This Committer. The commit method
will not block the SinkWriter.write method.

If the chain policy can be disabled in the custom Sink Connector,
the client can be prevented from setting and disabling the chain.
Or is there a better way to make*Committer.commit()* not block
*SinkWriter.write()*?

Looking forward for your reply.
Thanks && Regards,
di.wu


2023年2月16日 下午6:54,Shammon FY  写道:

Hi

Do you mean how to disable `chain` in your custom sink
connector?  Can you
give an example of what you want?

On Wed, Feb 15, 2023 at 10:42 PM di wu <676366...@qq.com> wrote:


Hello

The current Sink operator will be split into two operations,
Writer and
Commiter. By default, they will be chained together and executed
on the
same thread.
So sometimes when the commiter is very slow, it will block the data
writer, causing back pressure.

At present, FlinkSQL can be solved by disabling the chain
globally, and
DataStream can partially disable the chain through the
disableChaining
method, but both of them need to be set by the user.

Can the strategy of the Chain be changed in the Custom Sink
Connector to
separate Writer and Commiter?

Thanks && Regards,
di.wu







Re: Disable the chain of the Sink operator

2023-02-16 Thread Chesnay Schepler
As far as I know that chain between committer and writer is also 
required for correctness.


On 16/02/2023 14:53, weijie guo wrote:

Hi wu,

I don't think it is a good choice to directly change the strategy of 
chain. Operator chain usually has better performance and resource 
utilization. If we directly change the chain policy between them, 
users can no longer chain them together, which is not a good starting 
point.


Best regards,

Weijie



wudi <676366...@qq.com> 于2023年2月16日周四 19:29写道:

Thank you for your reply.

Currently in the custom Sink Connector, the Flink task will
combine Writer and Committer into one thread, and the thread name
is similar: *[Sink: Writer -> Sink: Committer (1/1)#0]*.
In this way, when the *Committer.commit()* method is very slow, it
will block the*SinkWriter.write()* method to receive upstream data.

The client can use the *env.disableOperatorChaining() *method to
split the thread into two threads:*[Sink: Writer (1/1)#0] *and
*[Sink: Committer (1/1)#0]*. This Committer. The commit method
will not block the SinkWriter.write method.

If the chain policy can be disabled in the custom Sink Connector,
the client can be prevented from setting and disabling the chain.
Or is there a better way to make*Committer.commit()* not block
*SinkWriter.write()*?

Looking forward for your reply.
Thanks && Regards,
di.wu


2023年2月16日 下午6:54,Shammon FY  写道:

Hi

Do you mean how to disable `chain` in your custom sink
connector?  Can you
give an example of what you want?

On Wed, Feb 15, 2023 at 10:42 PM di wu <676366...@qq.com> wrote:


Hello

The current Sink operator will be split into two operations,
Writer and
Commiter. By default, they will be chained together and executed
on the
same thread.
So sometimes when the commiter is very slow, it will block the data
writer, causing back pressure.

At present, FlinkSQL can be solved by disabling the chain
globally, and
DataStream can partially disable the chain through the
disableChaining
method, but both of them need to be set by the user.

Can the strategy of the Chain be changed in the Custom Sink
Connector to
separate Writer and Commiter?

Thanks && Regards,
di.wu







Re: Kryo version 2.24.0 from 2015?

2023-02-09 Thread Chesnay Schepler
> Can't programmers just code up migration tools to the current version 
of Kryo or whatever serialization platform you choose?


Well yes, if someone writes a tool to implement a reasonable migration 
path than we may be able to upgrade Kryo.

Until that happens we are blocked on upgrading Kryo.

> Versions older than 3.x aren't supposed to compile nor run correctly 
under Java 11+.


In fact our Java 17 support is currently blocked by an issue that we 
suspect is related to Kryo.

https://issues.apache.org/jira/browse/FLINK-24998

> I'd presume you would make a tool to upgrade files with Kryo 
persisted state in savepoints and checkpoints


That doesn't cover everything and may not necessarily be a viable approach.

Kryo is exposed a fair bit in our APIs (mistakes of the past...) so 
users that have custom Serializers might also have to change things.

Upgrading Kryo is thus also an API breaking change.

As for viability, such an approach implies taking down the application 
on one Flink version, converting the state, and restarting the job on a 
newer Flink version.
This implies a certain amount of downtime for the application, which 
depending on the state size may just not be acceptable to a user.
Having to migrate the savepoint and upgrading Flink at the same time is 
also not ideal since it makes the effort more complicated; being able to 
run the job on the same Flink version with a different Kryo version 
would make things easier, but that'd mean we have to be able to run 2 
Kryo versions in parallel.


Something else to consider is when we already break everything to 
upgrade Kryo, then maybe things should be re-written such that upgrading 
Kryo isn't such a problem in the future; in essence reworking how Kryo 
is integrated into Flink.


That said, the v5 migration guide is quite interesting; specifically 
that Kryo offers a versioned jar.


On 09/02/2023 17:32, Clayton Wohl wrote:
What do you mean you are blocked? Can't programmers just code up 
migration tools to the current version of Kryo or whatever 
serialization platform you choose?


Can't you follow the Kryo migration guide that supports loading data 
serialized with Kryo v2 and reserializing with Kryo v5?

https://github.com/EsotericSoftware/kryo/wiki/Migration-to-v5

I'd presume you would make a tool to upgrade files with Kryo persisted 
state in savepoints and checkpoints, that would allow for users to 
register custom serializers. I also presume that new versions of Flink 
would politely refuse to start with old format state files and require 
the migration process to be completed.


Kryo v2 also pulls in objenesis v2.1 from 2013, before Java 8. 
Versions older than 3.x aren't supposed to compile nor run correctly 
under Java 11+.




On Thu, Feb 9, 2023 at 2:34 AM Chesnay Schepler  
wrote:


 > you can't reasonably stay on the 2015 version forever, refuse to
adopt any of the updates or fixes in the 8 years since then, and
reasonably expect things to continue to work well.

We are well aware that Kryo is a ticking time bomb.

 > Is there any possibility a future release of Flink can upgrade
to a
recent version of Kryo serialization?

Of course there is, but someone needs to figure out a way to do this
without breaking everything or providing a reasonable upgrade path,
which has been blocking us so far.

On 09/02/2023 07:34, Clayton Wohl wrote:
> I've noticed the latest Flink is using the Kryo serializer library
> version 2.24.0 which is back from 2015!
>
> The Kryo project is actively maintained, it's on version 5.4.0, so
> 2.24.0 is really quite ancient. I presume the concern is
maintaining
> compatibility with persisted savepoints. That's a valid concern,
but
> you can't reasonably stay on the 2015 version forever, refuse to
adopt
> any of the updates or fixes in the 8 years since then, and
reasonably
> expect things to continue to work well.
>
> Is there any possibility a future release of Flink can upgrade to a
> recent version of Kryo serialization?
>
>
>



Re: Kryo version 2.24.0 from 2015?

2023-02-09 Thread Chesnay Schepler
> you can't reasonably stay on the 2015 version forever, refuse to 
adopt any of the updates or fixes in the 8 years since then, and 
reasonably expect things to continue to work well.


We are well aware that Kryo is a ticking time bomb.

> Is there any possibility a future release of Flink can upgrade to a 
recent version of Kryo serialization?


Of course there is, but someone needs to figure out a way to do this 
without breaking everything or providing a reasonable upgrade path, 
which has been blocking us so far.


On 09/02/2023 07:34, Clayton Wohl wrote:
I've noticed the latest Flink is using the Kryo serializer library 
version 2.24.0 which is back from 2015!


The Kryo project is actively maintained, it's on version 5.4.0, so 
2.24.0 is really quite ancient. I presume the concern is maintaining 
compatibility with persisted savepoints. That's a valid concern, but 
you can't reasonably stay on the 2015 version forever, refuse to adopt 
any of the updates or fixes in the 8 years since then, and reasonably 
expect things to continue to work well.


Is there any possibility a future release of Flink can upgrade to a 
recent version of Kryo serialization?








Re: Docker image Flink 1.15.4

2023-01-26 Thread Chesnay Schepler

1.15.4 is not released yet.

On 26/01/2023 16:06, Peng Zhang wrote:

Hi,

We would like to use Flink 1.15.4 docker image. The latest seems 
1.15.3. Could you make a docker release Flink 1.1.5.4? Thanks!


There is a blocking bug 
https://issues.apache.org/jira/browse/FLINK-28695 
 in 1.5.3 and fixed 
in 1.15.4


BR,
Peng



Re: Problem with custom SerializationSchema in Flink 1.15

2023-01-24 Thread Chesnay Schepler
It's a known issue that various connectors/wrappers/etc did not respect 
the schema lifecycle.


This was fixed in 1.16.0 in 
https://issues.apache.org/jira/browse/FLINK-28807.


You will have to lazily initialize the mapper in the serialize() method 
for previous versions.


On 24/01/2023 11:52, Peter Schrott wrote:

Hi Flink-User!


I recently updated a Flink job from Flink version 1.13 to 1.15 
(managed by AWS). The Flink Job is written in Java.


I found out that the Kinesis Producer was deprecated in favour of 
Kinesis Streams Sink [1]. When upgrading to the new sink I stumbled 
upon a problem withe a custom Serialisation Schema. I am using a 
custom implementation of the Serialisation Schema to deserialize 
result POJOs to JSON using Jacksons Object Mapper. This Object Mapper 
is initialised and set up in the open() method of the Serialisation 
Schema. The problem is, that this open method is not call intially.


I have not found any but report or indications towards this issue. Is 
this known or am I just “holding it wrong” (aka missing something)?


I created a minimal reproducible on my GitHub repo: 
https://github.com/peterschrott/flink-sink-open



Best & Thanks,
Peter


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kinesis/#kinesis-producer




Re: Metrics exposed by flink containing long label values

2023-01-12 Thread Chesnay Schepler
See 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/metric_reporters/#scope-variables-excludes


On 06/01/2023 12:09, Surendra Lalwani via user wrote:

Hi Team,

We are exposing metrics by Flink to prometheus but we are seeing it 
contains various labels even including operator details. and 
task_name, do we have any way from which we can remove these extra / 
unnecessary labels?


Thanks and Regards ,
Surendra Lalwani



IMPORTANT NOTICE: This e-mail, including any attachments, may contain 
confidential information and is intended only for the addressee(s) 
named above. If you are not the intended recipient(s), you should not 
disseminate, distribute, or copy this e-mail. Please notify the sender 
by reply e-mail immediately if you have received this e-mail in error 
and permanently delete all copies of the original message from your 
system. E-mail transmission cannot be guaranteed to be secure as it 
could be intercepted, corrupted, lost, destroyed, arrive late or 
incomplete, or contain viruses. Company accepts no liability for any 
damage or loss of confidential information caused by this email or due 
to any virus transmitted by this email or otherwise. 




Re: Async IO & Retry: how to get error details

2023-01-12 Thread Chesnay Schepler
Retry logic and per-request timeouts should be setup within 
asyncInvoke() (with all error-handling being done via plain 
CompletableFuture logic), with timeout() sort of acting as a global 
timeout after which you want the job to fail (e.g., to guard against 
mistakes in the asyncInvoke() logic).


The reason you shouldn't use timeout() for retries is that it is only 
ever called once for an input element.


On 04/01/2023 13:40, Yoni Gibbs wrote:
I'm following the documentation here 
, 
getting async IO to retry on failure. What I want is for the async 
action to be attempted, say, three times, and then give up and 
continue processing further records. If it fails after three times, I 
want to sink the record to a DLQ. I believe the way I should do that 
is by overriding |timeout|​, and in there outputting the record to a 
side output, which I then sink to a DLQ of some sort. (Correct me if 
I'm wrong and there's a better way of doing this.)


The record in the DLQ should contain error information about what went 
wrong (e.g. the exceptions that occurred on the three failed 
attempts). How can I get access to this in the |timeout|​ function?


Thanks!




Re: Using filesystem plugin with MiniCluster

2023-01-12 Thread Chesnay Schepler

There is no good way in 1.15 IIRC.

Adding a dependency on flink-s3-fs-hadoop _can_ work, if you dont run 
into dependency conflicts.


Otherwise you have to create a plugin manager yourself, point it to some 
local directory via a system property (I think?), and then eagerly call 
FileSystem#initialize with the plugin manager.


On 04/01/2023 02:11, Yaroslav Tkachenko wrote:

Hey Ken,

I have flink-s3-fs-hadoop as a provided dependency in my project, and 
I've configured my IDE to include provided dependencies when starting 
applications. Works just fine.


On Tue, Jan 3, 2023 at 5:06 PM Ken Krugler 
 wrote:


Hi all,

With Flink 1.15.x, is there a way to use the S3 Presto plugin when
running code on the MiniCluster?

I can’t just add that jar as a dependency when testing, as I get:

java.lang.NoClassDefFoundError: Could not initialize class
com.facebook.presto.hive.s3.PrestoS3FileSystem
at

org.apache.flink.fs.s3presto.S3FileSystemFactory.createHadoopFileSystem(S3FileSystemFactory.java:88)
~[flink-s3-fs-presto-1.15.1.jar:1.15.1]
at

org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:126)
~[flink-s3-fs-presto-1.15.1.jar:1.15.1]

I assume that’s because of this warning in the Flink docs:


The s3 file systems (flink-s3-fs-presto and flink-s3-fs-hadoop)
can only be used as plugins as we already removed the
relocations. Placing them in libs/ will result in system failures.


In the latest Flink JavaDocs, there’s a way to specify the
PluginManager for the MiniClusterConfiguration, but I don’t see
that in Flink 1.15.x

So is there a workaround to allow me to run a test from inside of
my IDE, using the MiniCluster, that reads from S3?

Thanks,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch



Re: Flink reactive mode for application clusters on AWS EKS

2023-01-12 Thread Chesnay Schepler
The adaptive scheduler and reactive mode both already support 
application clusters since 1.13.


https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/elastic_scaling/

On 19/12/2022 10:17, Tamir Sagi wrote:

Hey,

We are running stream jobs on application clusters (v1.15.2) on AWS EKS.

I was reviewing the following pages on Flink confluence

  * Reactive mode [1]
  * Adaptive Scheduler [2]

I also encountered the following POC conducted by Robert Metzger 
(@rmetzger_ ) on 06 May 2021. [3]


my question is whether that feature will be supported in the future 
for application clusters or not.


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler

[3] https://flink.apache.org/2021/05/06/reactive-mode.html


Thanks,
Tamir.


Confidentiality: This communication and any attachments are intended 
for the above-named persons only and may be confidential and/or 
legally privileged. Any opinions expressed in this communication are 
not necessarily those of NICE Actimize. If this communication has come 
to you in error you must take no action based on it, nor must you copy 
or show it to anyone; please delete/destroy and inform the sender by 
e-mail immediately.

Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail 
and attachments are free from any virus, we advise that in keeping 
with good computing practice the recipient should ensure they are 
actually virus free.




[ANNOUNCE] Apache flink-connector-cassandra 3.0.0 released

2022-12-06 Thread Chesnay Schepler
|The Apache Flink community is very happy to announce the release of 
Apache flink-connector-cassandra 3.0.0. |
|Apache Flink® is an open-source stream processing framework ||for| 
|distributed, high-performing, always-available, and accurate data 
streaming applications.|

|The release is available ||for| |download at:|
|https:||//flink.apache.org/downloads.html|
|
|
This release marks the first time we have released this connector 
separately from the main Flink release.

Over time more connectors will be migrated to this release model.

This release is equivalent to the connector version released alongside 
Flink 1.16.0 and acts as a drop-in replacement. ||

|
|
|The full release notes are available in Jira:|
|https://issues.apache.org/jira/projects/FLINK/versions/12352593|
||
|We would like to thank all contributors of the Apache Flink community 
who made ||this| |release possible!|

||
|Regards,|
|chesnay|


Re: Query about flink job manager dashboard

2022-11-30 Thread Chesnay Schepler
There's no way to /disable/ the UI. (But you could cut out the 
javascript stuff from the flink-dist jar)


I'm curious why you'd want that though; since it works against the REST 
API it provides a strict subset of the REST API functionality.


On 30/11/2022 16:25, Berkay Polat wrote:

Hi Chesnay,

I have a similar question on this topic. Is there an option to disable 
the frontend altogether but still use REST APIs?


Thanks

On Wed, Nov 30, 2022 at 1:37 AM Chesnay Schepler  
wrote:


There's no way to disable the jar submission in the UI but have it
still work via the REST API.

On 30/11/2022 06:16, naga sudhakar wrote:

After disabling the cancel, submit flags facing issues with below
api calls.

1) /jars giving 404
2) /jars/upload
3) /jars/{jarid}/run

Is there any config changes needed to have these apis work?


On Mon, 28 Nov, 2022, 7:00 PM naga sudhakar,
 wrote:

Hi,
We are able to disable this cancela nd upload otpion in ui.
But this is having issues with endpoints for below.
Get call for /jars to list all uploaded jars and post call
/jars/{jarid}/run are giving 404 after disabling the two flags.
Is the process of uploading jars and running a jar with
specific id changes after this change?

Please suggest.

Thanks & Regards,
Nagasudhakar

On Thu, 24 Nov, 2022, 2:07 PM Martijn Visser,
 wrote:

Hi,

1) No, that's currently not possible.
2) You could consider disabling to disallow uploading new
JARs and/or cancelling jobs from the UI. See

https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-options-for-flink-web-ui

<https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/*advanced-options-for-flink-web-ui__;Iw!!DCbAVzZNrAf4!B-eZxS_lrz683kg6TY5I6gI7lex3qiRrQTWoM_EbECDFr62njjFlxqyUFfLFS6GofOfFjNbK9HcXrwyq0w$>

Best regards,

Martijn

On Thu, Nov 24, 2022 at 3:44 AM naga sudhakar
 wrote:

Hi Team,
Greetings!!!
I am a software developer using apache flink and
deploying flink jobs using the same. I have two
queries about flink job manager dashboard. Can
you please help with below?

1) is it possible to add login mechanism for the
flink job manager dash board and have a role
based mechanism for viewing running jobs,
cancelling jobs, adding the jobs?
2) is it possible to disable to dash bord display
but use api to do the same operations using API?


Thanks,
Nagasudhakar.



--
*BERKAY POLAT*
Software Engineer SMTS | MuleSoft at Salesforce
Mobile: 443-710-7021

<https://smart.salesforce.com/sig/bpolat//us_mb/default/link.html>




Re: Query about flink job manager dashboard

2022-11-30 Thread Chesnay Schepler
There's no way to disable the jar submission in the UI but have it still 
work via the REST API.


On 30/11/2022 06:16, naga sudhakar wrote:
After disabling the cancel, submit flags facing issues with below api 
calls.


1) /jars giving 404
2) /jars/upload
3) /jars/{jarid}/run

Is there any config changes needed to have these apis work?


On Mon, 28 Nov, 2022, 7:00 PM naga sudhakar,  
wrote:


Hi,
We are able to disable this cancela nd upload otpion in ui.
But this is having issues with endpoints for below.
Get call for /jars to list all uploaded jars and post call
/jars/{jarid}/run are giving 404 after disabling the two flags.
Is the process of uploading jars and running a jar with specific
id changes after this change?

Please suggest.

Thanks & Regards,
Nagasudhakar

On Thu, 24 Nov, 2022, 2:07 PM Martijn Visser,
 wrote:

Hi,

1) No, that's currently not possible.
2) You could consider disabling to disallow uploading new JARs
and/or cancelling jobs from the UI. See

https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-options-for-flink-web-ui

Best regards,

Martijn

On Thu, Nov 24, 2022 at 3:44 AM naga sudhakar
 wrote:

Hi Team,
Greetings!!!
I am a software developer using apache flink and
deploying flink jobs using the same. I have two
queries about flink job manager dashboard. Can you
please help with below?

1) is it possible to add login mechanism for the flink
job manager dash board and have a role based mechanism
for viewing running jobs, cancelling jobs, adding the
jobs?
2) is it possible to disable to dash bord display but
use api to do the same operations using API?


Thanks,
Nagasudhakar.



[ACCOUNCE] Apache Flink Elasticsearch Connector 3.0.0 released

2022-11-10 Thread Chesnay Schepler
The Apache Flink community is very happy to announce the release of 
Apache Flink Elasticsearch Connector 3.0.0.


Apache Flink® is an open-source stream processing framework for 
distributed, high-performing, always-available, and accurate data 
streaming applications.


The release is available for download at:
https://flink.apache.org/downloads.html

This release marks the first time we have released a connector 
separately from the main Flink release.

Over time more connectors will be migrated to this release model.

This release is equivalent to the connector version released alongside 
Flink 1.16.0 and acts as a drop-in replacement.


The full release notes are available in Jira:
https://issues.apache.org/jira/projects/FLINK/versions/12352291

We would like to thank all contributors of the Apache Flink community 
who made this release possible!


Regards,
Chesnay


[ACCOUNCE] Apache Flink Elasticsearch Connector 3.0.0 released

2022-11-10 Thread Chesnay Schepler
The Apache Flink community is very happy to announce the release of 
Apache Flink Elasticsearch Connector 3.0.0.


Apache Flink® is an open-source stream processing framework for 
distributed, high-performing, always-available, and accurate data 
streaming applications.


The release is available for download at:
https://flink.apache.org/downloads.html

This release marks the first time we have released a connector 
separately from the main Flink release.

Over time more connectors will be migrated to this release model.

This release is equivalent to the connector version released alongside 
Flink 1.16.0 and acts as a drop-in replacement.


The full release notes are available in Jira:
https://issues.apache.org/jira/projects/FLINK/versions/12352291

We would like to thank all contributors of the Apache Flink community 
who made this release possible!


Regards,
Chesnay


Re: Kinesis Connector does not work

2022-11-08 Thread Chesnay Schepler
This is a general thing; see 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/configuration/connector/


The python documentation isn't particularly clear on how to use Java 
connectors. The easiest thing would be to use the "sql-*" connector jars 
I guess.


On 08/11/2022 11:49, Matt Fysh wrote:
Ok thanks, will give that a try. Is that something that should be 
added to the Kinesis connector docs page? There are existing 
instructions there for adding the flink-connector-kinesis jar as a 
dependency, but no instructions for adding commons-logging


Or if this is something more general, it might be something to talk 
about in the Python section of the docs because most Python users are 
not going to understand the interplay between Java classes.


On Tue, 8 Nov 2022 at 18:50, Chesnay Schepler  wrote:

Said dependency (on commons-logging) is not meant to be provided
by the
docker image, but bundled in your user-jar (along with the connector).

On 08/11/2022 02:14, Matt Fysh wrote:
> Hi, I'm following the kinesis connector instructions as documented
> here:
>

https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/kinesis/

>
>
> I'm also running Flink in standalone session mode using docker
compose
> and the Python images, as described in the Flink docs
(Deployment section)
>
> When I try to run a basic datastream.print() / env.execute()
example
> with a kinesis source, I get the following error. From my limited
> understanding of Java, it seems the Kinesis connector is using a
> shaded version of the AWS Java SDK, and that older version of
the SDK
> is trying to load a class that is no longer present in the 1.16.0
> Flink docker images. Is there a workaround for this? Thanks
>
> Caused by: java.lang.NoClassDefFoundError:
> org/apache/commons/logging/LogFactory
> at
>

org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration.(ClientConfiguration.java:47)
> at
>

org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getDefaultConfig(ClientConfigurationFactory.java:46)
> at
>

org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getConfig(ClientConfigurationFactory.java:36)
> at
>

org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.createKinesisClient(KinesisProxy.java:268)
> at
>

org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.(KinesisProxy.java:152)




Re: Kinesis Connector does not work

2022-11-08 Thread Chesnay Schepler
Said dependency (on commons-logging) is not meant to be provided by the 
docker image, but bundled in your user-jar (along with the connector).


On 08/11/2022 02:14, Matt Fysh wrote:
Hi, I'm following the kinesis connector instructions as documented 
here: 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/kinesis/ 



I'm also running Flink in standalone session mode using docker compose 
and the Python images, as described in the Flink docs (Deployment section)


When I try to run a basic datastream.print() / env.execute() example 
with a kinesis source, I get the following error. From my limited 
understanding of Java, it seems the Kinesis connector is using a 
shaded version of the AWS Java SDK, and that older version of the SDK 
is trying to load a class that is no longer present in the 1.16.0 
Flink docker images. Is there a workaround for this? Thanks


Caused by: java.lang.NoClassDefFoundError: 
org/apache/commons/logging/LogFactory
at 
org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration.(ClientConfiguration.java:47)
at 
org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getDefaultConfig(ClientConfigurationFactory.java:46)
at 
org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getConfig(ClientConfigurationFactory.java:36)
at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.createKinesisClient(KinesisProxy.java:268)
at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.(KinesisProxy.java:152)





Re: [Security] - Critical OpenSSL Vulnerability

2022-11-01 Thread Chesnay Schepler

We just push new images with the same tags.

On 01/11/2022 14:35, Matthias Pohl wrote:
The Docker image for Flink 1.12.7 uses an older base image which comes 
with openssl 1.1.1k. There was a previous post in the OpenSSL mailing 
list reporting a low vulnerability being fixed with 3.0.6 and 1.1.1r 
(both versions being explicitly mentioned) [1]. Therefore, I 
understand the post in a way that only 3.0.x would be affected and, as 
a consequence, Docker images below 1.13- would be fine.


I verified Mason's finding that only 1.14+ Docker images are affected. 
No entire release is necessary as far as I understand. Theoretically, 
we would only have to push newer Docker images to the registry. I'm 
not sure what the right approach is when it comes to versioning. I'm 
curious about Chesnay's opinion on that one (CC'd).


[1] 
https://mta.openssl.org/pipermail/openssl-announce/2022-October/000233.html


On Tue, Nov 1, 2022 at 7:06 AM Prasanna kumar 
 wrote:


Could we also get an emergency patch to 1.12 version as well ,
because upgrading flink to a newer version on production in a
short time would be high in effort and longer in duration as well .

Thanks,
Prasanna

On Tue, Nov 1, 2022 at 11:30 AM Prasanna kumar
 wrote:

If flink version 1.12 also affected ?

Thanks,
Prasanna.

On Tue, Nov 1, 2022 at 10:40 AM Mason Chen
 wrote:

Hi Tamir and Martjin,

We have also noticed this internally. So far, we have
found that the *latest* Flink Java 11/Scala 2.12 docker
images *1.14, 1.15, and 1.16* are affected, which all have
the *openssl 3.0.2 *dependency. It would be good to
discuss an emergency release when this patch comes out
tomorrow, as it is the highest priority level from their
severity rating.

Best,
Mason

On Mon, Oct 31, 2022 at 1:10 PM Martijn Visser
 wrote:

Hi Tamir,

That depends on a) if Flink is vulnerable and b) if
yes, how vulnerable that would be.

Best regards,

Martijn

Op ma 31 okt. 2022 om 19:22 schreef Tamir Sagi


Hey all,

Following that link

https://eu01.z.antigena.com/l/CjXA7qEmnn79gc24BA2Hb6K2OVR-yGlLfMyp4smo5aXj5Z6WC0dSiHCRPqjSz972DkRNssUoTbxKmp5Pi3IaaVB983yfLJ9MUZY9LYtnBMEKJP5DcQqmhR3SktltkbVG8b7nSRa84kWSnwNJFuXFLA2GrMLTVG7mXdy59-ykolsAWAVAJSDgRdWCv6xN0iczvQ


due to critical vulnerability , there will be an
important release of OpenSSl v3.0.7 tomorrow
November 1st.

Is there any plan to update Flink with the newest
version?

Thanks.
Tamir


Confidentiality: This communication and any
attachments are intended for the above-named
persons only and may be confidential and/or
legally privileged. Any opinions expressed in this
communication are not necessarily those of NICE
Actimize. If this communication has come to you in
error you must take no action based on it, nor
must you copy or show it to anyone; please
delete/destroy and inform the sender by e-mail
immediately.
Monitoring: NICE Actimize may monitor incoming and
outgoing e-mails.
Viruses: Although we have taken steps toward
ensuring that this e-mail and attachments are free
from any virus, we advise that in keeping with
good computing practice the recipient should
ensure they are actually virus free.

-- 
Martijn

https://twitter.com/MartijnVisser82

https://github.com/MartijnVisser




Re: flink-table-api-scala-bridge sources

2022-10-31 Thread Chesnay Schepler

Thanks for reporting the issue; I've filed a ticket (FLINK-29803).

On 25/10/2022 09:40, Clemens Valiente wrote:

Hi everyone

I noticed when going through the scala datastream/table api bridge in 
my IDE I cannot see the source of the code. I believe it is because 
the Sources are missing on maven:
https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala-bridge_2.12/1.15.2/ 

If you have a look at the -sources.jar you will see it doesn't 
actually contain any sources. It would be very helpful to have these 
sources published since they contain the API a lot of users will end 
up calling, like StreamTableEnvironment.toChangelogStream.

Thanks a lot
Clemens


By communicating with Grab Holdings Limited and/or its subsidiaries, 
associate companies and jointly controlled entities 
(collectively, “Grab”), you are deemed to have consented to the 
processing of your personal data as set out in the Privacy Notice 
which can be viewed at https://grab.com/privacy/ 



 This email contains confidential information that may be 
privileged and is only for the intended recipient(s). If you are not 
the intended recipient(s), please do not disseminate, distribute or 
copy this email. Please notify Grab immediately if you have received 
this by mistake and delete this email from your system. Email 
transmission may not be secure or error-free as any information could 
be intercepted, corrupted, lost, destroyed, delayed or incomplete, or 
contain viruses. Grab does not accept liability for any errors or 
omissions in this email that arise as a result of email transmission. 
All intellectual property rights in this email 
and any attachments shall remain vested in Grab, unless otherwise 
provided by law




Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-13 Thread Chesnay Schepler
Support for records has not been investigated yet. We're still at the 
stage of getting things to run at all on Java 17.


It _may_ be possible, it _may_ not be.

On 13/10/2022 07:39, Salva Alcántara wrote:

Hi Martijn,

Maybe a bit of an off-topic, but regarding Java 17 support, will it be 
possible to replace POJOs with Java records in existing applications?


In a project I maintain we use Lombok a lot, but with Java records we 
would probably stop using it (or significantly reduce its usage).


Will there be a way to promote existing POJOs (either written 
"manually" or using Lombok) to Java records without breaking 
serialization? (assuming that those POJOs are used as immutable 
values, e.g., setters are never used).


Regards,

Salva

On Wed, Oct 12, 2022 at 9:11 PM Martijn Visser 
 wrote:


Hi everyone,

Thanks again for all your feedback. It's very much appreciated.

My overall feeling is that people are not opposed to the FLIP.
There is demand for adding Java 17 support before dropping the
Scala APIs. Given that the proposal for actually dropping the
Scala APIs would only happen with a Flink 2.0 and Java 17 support
would either happen in a new minor version or a new major version
(I haven't seen a FLIP or discussion being opened adding Java 17
support, only on deprecating Java 8), Java 17 support would either
be there earlier (in a new minor version) or at the same time
(with Flink 2.0) when the Scala APIs would be dropped.

If there are no more discussion topics, I would move this FLIP to
a vote at the beginning of next week.

Best regards,

Martijn

On Sun, Oct 9, 2022 at 10:36 AM guenterh.lists
 wrote:

Hi Martijn

I do not maintain a large production application based on
Flink, so it would not be a problem for me to convert existing
implementations to whatever API.

I am working in the area of cultural heritage, which is mainly
about the processing of structured (meta)-data (scientific
libraries, archives and museums)
My impression: People without much background/experience with
Java implementations find it easier to get into the functional
mindset as supported in Scala. That's why I think it would be
very unfortunate if the use of Scala in Flink becomes more and
more limited or neglected.

I think using the Java API in Scala is a possible way also in
my environment.

In the last weeks I tried to port the examples from the "Flink
Course" of Daniel Ciorcilan (https://rockthejvm.com/p/flink -
he mainly offers Scala courses), which are exclusively based
on the native Scala API, to the Java API. This has worked
without any problems as far as I can see. So far I haven't
tried any examples based on the Table API or streaming
workflows in batch mode (which would be important for our
environment).

My main trouble: So far I don't know enough about the
limitations of using the Java API in a Scala implementation
and what that means. My current understanding: the limitation
is mainly in deriving the type information in generic APIs
with Scala types. For me it would be very significant and
helpful if there would be more information, descriptions and
examples about this topic.

So far unfortunately I had too little time to deal with a
wrapper like flink-scala-api
(https://github.com/findify/flink-scala-api ) and the current
alternative is probably going to be deprecated in the future
(https://github.com/ariskk/flink4s/issues/17#issuecomment-1125806808
)

Günter


On 04.10.22 13:58, Martijn Visser wrote:

Hi Marton,

You're making a good point, I originally wanted to include
already the User mailing list to get their feedback but
forgot to do so. I'll do some more outreach via other
channels as well.

@Users of Flink, I've made a proposal to deprecate and remove
Scala API support in a future version of Flink. Your feedback
on this topic is very much appreciated.

Regarding the large Scala codebase for Flink, a potential
alternative could be to have a wrapper for all Java APIs that
makes them available as Scala APIs. However, this still
requires Scala maintainers and I don't think that we
currently have those in our community. The easiest solution
for them would be to use the Java APIs directly. Yes it would
involve work, but we won't actually be able to remove the
Scala APIs until Flink 2.0 so there's still time for that :)

Best regards,

Martijn

On Tue, Oct 4, 2022 at 1:26 AM Márton Balassi
 wrote:

Hi Martjin,

Thanks for compiling the FLIP. I agree with the 

Re: Flink falls back on to kryo serializer for GenericTypes

2022-10-12 Thread Chesnay Schepler
There's no alternative to Kryo for generic types, apart from 
implementing your Flink serializer (but technically at that point the 
type is no longer treated as a generic type).


enableForAvro only forces Avro to be used for POJO types.

On 11/10/2022 09:29, Sucheth S wrote:

Hello,

How to avoid flink's kryo serializer for GenericTypes ? Kryo is having 
some performance issues.


Tried below but no luck.
env.getConfig().disableForceKryo();
env.getConfig().enableForceAvro();
Tried this - env.getConfig().disableGenericTypes();
getting - Generic types have been disabled in the ExecutionConfig and type 
org.apache.avro.generic.GenericRecord is treated as a generic type


Regards,
Sucheth Shivakumar
website: https://sucheths.com
mobile : +1(650)-576-8050
San Mateo, United States




Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-11 Thread Chesnay Schepler

Currently I think that would be a mistake.

Ultimately what we have here is the culmination of us never really 
considering how the numRecordsOut metric should behave for operators 
that emit data to other operators _and_ external systems. This goes 
beyond sinks.
This even applies to numRecordsIn, for cases where functions query/write 
data from/to the outside, (e.g., Async IO).


Having 2 separate metrics for that, 1 exclusively for internal data 
transfers, and 1 exclusively for external data transfers, is the only 
way to get a consistent metric definition in the long-run.

We can jump back-and-forth now or just commit to it.

I don't think we can really judge this based on FLIP-33. It was IIRC 
written before the two phase sinks were added, which heavily blurred the 
lines of what a sink even is. Because it definitely is _not_ the last 
operator in a chain anymore.


What I would suggest is to stick with what we got (although I despise 
the name numRecordsSend), and alias the numRecordsOut metric for all 
non-TwoPhaseCommittingSink.


On 11/10/2022 05:54, Qingsheng Ren wrote:

Thanks for the details Chesnay!

By “alias” I mean to respect the original definition made in FLIP-33 
for numRecordsOut, which is the number of records written to the 
external system, and keep numRecordsSend as the same value as 
numRecordsOut for compatibility.


I think keeping numRecordsOut for the output to the external system is 
more intuitive to end users because in most cases the metric of data 
flow output is more essential. I agree with you that a new metric is 
required, but considering compatibility and users’ intuition I prefer 
to keep the initial definition of numRecordsOut in FLIP-33 and name a 
new metric for sink writer’s output to downstream operators. This 
might be against consistency with metrics in other operators in Flink 
but maybe it’s acceptable to have the sink as a special case.


Best,
Qingsheng
On Oct 10, 2022, 19:13 +0800, Chesnay Schepler , 
wrote:

> I’m with Xintong’s idea to treat numXXXSend as an alias of numXXXOut

But that's not possible. If it were that simple there would have 
never been a need to introduce another metric in the first place.


It's a rather fundamental issue with how the new sinks work, in that 
they emit data to the external system (usually considered as 
"numRecordsOut" of sinks) while _also_ sending data to a downstream 
operator (usually considered as "numRecordsOut" of tasks).
The original issue was that the numRecordsOut of the sink counted 
both (which is completely wrong).


A new metric was always required; otherwise you inevitably end up 
breaking /some/ semantic.
Adding a new metric for what the sink writes to the external system 
is, for better or worse, more consistent with how these metrics 
usually work in Flink.


On 10/10/2022 12:45, Qingsheng Ren wrote:

Thanks everyone for joining the discussion!

> Do you have any idea what has happened in the process here?

The discussion in this PR [1] shows some details and could be 
helpful to understand the original motivation of the renaming. We do 
have a test case for guarding metrics but unfortunaly the case was 
also modified so the defense was broken.


I think the reason why both the developer and the reviewer forgot to 
trigger an discussion and gave a green pass on the change is that 
metrics are quite “trivial” to be noticed as public APIs. As 
mentioned by Martijn I couldn’t find a place noting that metrics are 
public APIs and should be treated carefully while contributing and 
reviewing.


IMHO three actions could be made to prevent this kind of changes in 
the future:


a. Add test case for metrics (which we already have in 
SinkMetricsITCase)
b. We emphasize that any public-interface breaking changes should be 
proposed by a FLIP or discussed in mailing list, and should be 
listed in the release note.
c. We remind contributors and reviewers about what should be 
considered as public API, and include metric names in it.


For b and c these two pages [2][3] might be proper places.

About the patch to revert this, it looks like we have a consensus on 
1.16. As of 1.15 I think it’s worthy to trigger a minor version. I 
didn’t see complaints about this for now so it should be OK to save 
the situation asap. I’m with Xintong’s idea to treat numXXXSend as 
an alias of numXXXOut considering there could possibly some users 
have already adapted their system to the new naming, and have 
another internal metric for reflecting number of outgoing 
committable batches (actually the numRecordsIn of sink committer 
operator should be carrying this info already).


[1] https://github.com/apache/flink/pull/18825
[2] https://flink.apache.org/contributing/contribute-code.html
[3] https://flink.apache.org/contributing/reviewing-prs.html

Best,
Qingsheng
On Oct 10, 2022, 17:40 +0800, Xintong Song , 
wrote:

+1 for reverting these changes in Flink 1.16.

For 1.15.3, can we make these metric

Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-10 Thread Chesnay Schepler

> I’m with Xintong’s idea to treat numXXXSend as an alias of numXXXOut

But that's not possible. If it were that simple there would have never 
been a need to introduce another metric in the first place.


It's a rather fundamental issue with how the new sinks work, in that 
they emit data to the external system (usually considered as 
"numRecordsOut" of sinks) while _also_ sending data to a downstream 
operator (usually considered as "numRecordsOut" of tasks).
The original issue was that the numRecordsOut of the sink counted both 
(which is completely wrong).


A new metric was always required; otherwise you inevitably end up 
breaking /some/ semantic.
Adding a new metric for what the sink writes to the external system is, 
for better or worse, more consistent with how these metrics usually work 
in Flink.


On 10/10/2022 12:45, Qingsheng Ren wrote:

Thanks everyone for joining the discussion!

> Do you have any idea what has happened in the process here?

The discussion in this PR [1] shows some details and could be helpful 
to understand the original motivation of the renaming. We do have a 
test case for guarding metrics but unfortunaly the case was also 
modified so the defense was broken.


I think the reason why both the developer and the reviewer forgot to 
trigger an discussion and gave a green pass on the change is that 
metrics are quite “trivial” to be noticed as public APIs. As mentioned 
by Martijn I couldn’t find a place noting that metrics are public APIs 
and should be treated carefully while contributing and reviewing.


IMHO three actions could be made to prevent this kind of changes in 
the future:


a. Add test case for metrics (which we already have in SinkMetricsITCase)
b. We emphasize that any public-interface breaking changes should be 
proposed by a FLIP or discussed in mailing list, and should be listed 
in the release note.
c. We remind contributors and reviewers about what should be 
considered as public API, and include metric names in it.


For b and c these two pages [2][3] might be proper places.

About the patch to revert this, it looks like we have a consensus on 
1.16. As of 1.15 I think it’s worthy to trigger a minor version. I 
didn’t see complaints about this for now so it should be OK to save 
the situation asap. I’m with Xintong’s idea to treat numXXXSend as an 
alias of numXXXOut considering there could possibly some users have 
already adapted their system to the new naming, and have another 
internal metric for reflecting number of outgoing committable batches 
(actually the numRecordsIn of sink committer operator should be 
carrying this info already).


[1] https://github.com/apache/flink/pull/18825
[2] https://flink.apache.org/contributing/contribute-code.html
[3] https://flink.apache.org/contributing/reviewing-prs.html

Best,
Qingsheng
On Oct 10, 2022, 17:40 +0800, Xintong Song , wrote:

+1 for reverting these changes in Flink 1.16.

For 1.15.3, can we make these metrics available via both names 
(numXXXOut and numXXXSend)? In this way we don't break it for those 
who already migrated to 1.15 and numXXXSend. That means we still need 
to change SinkWriterOperator to use another metric name in 1.15.3, 
which IIUC is internal to Flink sink.


I'm overall +1 to change numXXXOut back to its original semantics. 
AFAIK (from meetup / flink-forward questionaires), most users do not 
migrate to a new Flink release immediately, until the next 1-2 major 
releases are out.


Best,

Xintong



On Mon, Oct 10, 2022 at 5:26 PM Martijn Visser 
 wrote:


Hi Qingsheng,

Do you have any idea what has happened in the process here? Do we
know why
they were changed? I was under the impression that these metric
names were
newly introduced due to the new interfaces and because it still
depends on
each connector implementing these.

Sidenote: metric names are not mentioned in the FLIP process as a
public
API. Might make sense to have a separate follow-up to add that to
the list
(I do think we should list them there).

+1 for reverting this and make this change in Flink 1.16

I'm not in favour of releasing a Flink 1.15.3 with this change: I
think the
impact is too big for a patch version, especially given how long
Flink 1.15
is already out there.

Best regards,

Martijn

On Mon, Oct 10, 2022 at 11:13 AM Leonard Xu 
wrote:

> Thanks Qingsheng for starting this thread.
>
> +1 on reverting sink metric name and releasing 1.15.3 to fix this
> inconsistent behavior.
>
>
> Best,
> Leonard
>
>
>
>
>
> 2022年10月10日 下午3:06,Jark Wu  写道:
>
> Thanks for discovering this problem, Qingsheng!
>
> I'm also +1 for reverting the breaking changes.
>
> IIUC, currently, the behavior of "numXXXOut" metrics of the new
and old
> sink is inconsistent.
> We have to break one of them to have consistent behavior. Sink
V2 is an
 

Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-10 Thread Chesnay Schepler

On 10/10/2022 11:24, Martijn Visser wrote:
Sidenote: metric names are not mentioned in the FLIP process as a 
public API. Might make sense to have a separate follow-up to add that 
to the list (I do think we should list them there).


That's a general issue we have. There's a lot of things we _ usually_ 
treat as a public API without having written it down; including


 * config options (I mean _keys_, not ConfigOption members)
 * CLI
 * REST API
 * metric names
 * scripts in distribution bin/ directory


Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-05 Thread Chesnay Schepler
> It's possible that for the sake of the Scala API, we would 
occasionally require some changes in the Java API. As long as those 
changes are not detrimental to Java users, they should be considered.


That's exactly the model we're trying to get to. Don't fix 
scala-specific issues with scala code, but rather on the Java side as 
much as possible which could also benefit other JVM languages (e.g., 
Kotlin).


> A question regarding the Flink wrapper: would it be possible to keep 
it under the Flink project's umbrella? Or does it need to be a 
completely separate structure? I'm not aware of the full organizational 
implications of this, I'm afraid.


Technically it can be under the Flink umbrella, but then Flink would 
still be (at least for a while) be the bottleneck because we'd have to 
review any changes coming in.
That would only improve once several new committers were added to take 
care of this project.
(In the end you'd just split Flink and the Scala API _codebases_, but 
achieve little else)


> And if that is what it takes to move beyond Scala 2.12.7… This has 
been a big pain point for us.


I'm curious what target Scala versions people are currently interested in.
I would've expected that everyone wants to migrate to Scala 3, for which 
several wrapper projects around Flink already exist.


On 05/10/2022 12:35, Gaël Renoux wrote:

Hello everyone,

I've already answered a bit on Twitter, I'll develop my thoughts a bit 
here. For context, my company (DataDome) has a significant codebase on 
Scala Flink (around 110K LOC), having been using it since 2017. I 
myself am an enthusiastic Scala developer (I don't think I'd like 
moving back to Java)


Given that, I think separating the Scala support from Flink is 
actually a good move long term. We would then have a full-Java Flink, 
and a separate Scala wrapper. It would help a lot in solving the 
skills issue: Flink maintainers would no longer need to be fluent in 
Scala, and maintainers of the Scala wrapper would not need a deep 
knowledge of Flink's inner workings, just the API would be sufficient. 
And if that is what it takes to move beyond Scala 2.12.7… This has 
been a big pain point for us.


I'm not too worried about finding contributors for the Scala wrapper. 
Within my company, we have developed additional wrappers and extension 
methods (for parts where we felt the Flink Scala API was 
insufficient), and we've been looking at ways we could contribute 
back. What held us back was our lack of knowledge of the full Flink 
environment (we're only using the Scala Datastream API). I don't think 
we're the only ones in that situation. One major point, though, is 
that Flink developers would not be completely rid of us ;-) It's 
possible that for the sake of the Scala API, we would occasionally 
require some changes in the Java API. As long as those changes are not 
detrimental to Java users, they should be considered.


A question regarding the Flink wrapper: would it be possible to keep 
it under the Flink project's umbrella? Or does it need to be a 
completely separate structure? I'm not aware of the full 
organizational implications of this, I'm afraid.


Finally, the hard part would be the migration to the new version. My 
dream solution would be to have the existing Scala API be entirely 
converted into a Scala wrapper over the Java API. That way, migration 
would be pretty minimal: add a dependency, change the imports for the 
Scala API, and we're done. However, even starting from the existing 
flink4s project, that's still quite a lot of work. So, more 
realistically, I'd settle for at least a partial implementation. We 
would have some broken code that we could fix, but at the very least 
I'd like the basic DataStream functions (process, uid, name…) to be 
available.


Thanks for all the work that went into making Flink what it is!


Gaël Renoux - Lead R Engineer
E - gael.ren...@datadome.co
W - www.datadome.co 



On Wed, Oct 5, 2022 at 9:30 AM Maciek Próchniak  wrote:

Hi Martin,

Could you please remind what was the conclusion of discussion on
upgrading Scala to 2.12.15/16?
https://lists.apache.org/thread/hwksnsqyg7n3djymo7m1s7loymxxbc3t -
I couldn't find any follow-up vote?

If it's acceptable to break binary compatibility by such an
upgrade, then upgrading to JDK17 before 2.0 will be doable?


thanks,

maciek


On 04.10.2022 18:21, Martijn Visser wrote:

Hi Yaroslav,

Thanks for the feedback, that's much appreciated! Regarding Java
17 as a prerequisite, we would have to break compatibility
already since Scala 2.12.7 doesn't compile on Java 17 [1].

Given that we can only remove Scala APIs with the next major
Flink (2.0) version, would that still impact you a lot? I do
imagine that if we get to a Flink 2.0 version there would be more
breaking involved anyway. The biggest consequence of deprecating
support for Scala in Flink 1.x would 

Re: How to read flink state data without setting uid?

2022-09-22 Thread Chesnay Schepler
You will need to reload the savepoint with the original job and add uids 
to all operators (while also setting the uid hashes on all operators to 
properly restore the state).


On 22/09/2022 11:06, Chesnay Schepler wrote:

Currently the state processor API does not support that.

On 22/09/2022 11:02, BIGO wrote:
I didn't set the uid for my flink operator, is there any way to read 
the flink state data? State Processor API requires uid. Thanks.





Re: How to read flink state data without setting uid?

2022-09-22 Thread Chesnay Schepler

Currently the state processor API does not support that.

On 22/09/2022 11:02, BIGO wrote:
I didn't set the uid for my flink operator, is there any way to read 
the flink state data? State Processor API requires uid. Thanks.




Re: Fail to build Flink 1.15.1

2022-09-12 Thread Chesnay Schepler
I think you want to use --single-branch instead of --depth 1; the latter 
only overrides top-level files, leaving the remaining files on whatever 
version you checked out previously.


https://git-scm.com/docs/git-clone#Documentation/git-clone.txt---depthltdepthgt

On 10/09/2022 16:08, Jun Qin wrote:

|Thanks Chesnay, |
|I wanted to check out the tagged release 1.15.1. I did it in this way:|
|git clone --depth 1 --branch release-1.15.1 
g...@github.com:apache/flink.git|

||
|This seems cause the problem. With the same java/maven, I can build 
the branch 1.15. |




On Sep 9, 2022, at 11:58 PM, Chesnay Schepler  wrote:

hmm...we've only seen that error in older Flink version: 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/flinkdev/ide_setup/#compilation-fails-with-cannot-find-symbol-symbol-method-defineclass-location-class-sunmiscunsafe


Please double-check whether you actually checked out 1.15.1; I can't 
reference to sun.misc.Unsafe in the 1.15.1 version of the mentioned 
class: 
https://github.com/apache/flink/blob/release-1.15.1/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java


On 09/09/2022 22:01, Jun Qin wrote:

I have an issue when build a clean Flink 1.15.1 on MacOS with:
mvn clean install -DskipTests -Dfast

% echo $JAVA_HOME
/usr/local/Cellar/openjdk@11/11.0.16.1

% mvn -version
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: /usr/local/Cellar/maven/3.8.6/libexec
Java version: 11.0.16.1, vendor: Homebrew, runtime: 
/usr/local/Cellar/openjdk@11/11.0.16.1/libexec/openjdk.jdk/Contents/Home

Default locale: en_US, platform encoding: UTF-8
OS name: "mac os x", version: "12.5.1", arch: "x86_64", family: “mac"

% java -version
openjdk version "11.0.16.1" 2022-08-12
OpenJDK Runtime Environment Homebrew (build 11.0.16.1+0)
OpenJDK 64-Bit Server VM Homebrew (build 11.0.16.1+0, mixed mode)

It failed with:
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.1:compile 
(default-compile) on project flink-test-utils-junit: Compilation failure
[ERROR] 
/Workspace/flink-1.15.1/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[244,53] 
cannot find symbol
[ERROR] symbol:   method 
defineClass(java.lang.String,byte[],int,int,java.lang.ClassLoader,java.security.ProtectionDomain)

[ERROR] location: class sun.misc.Unsafe

I tried also with a downloaded maven 3.2.5 binary (maven 3.2 has 
been disabled in brew):

% ~/Downloads/apache-maven-3.2.5/bin/mvn -version
Apache Maven 3.2.5 (12a6b3acb947671f09b81f49094c53f426d8cea1; 
2014-12-14T18:29:23+01:00)

Maven home: /Users/jqin/Downloads/apache-maven-3.2.5
Java version: 11.0.16.1, vendor: Homebrew
Java home: 
/usr/local/Cellar/openjdk@11/11.0.16.1/libexec/openjdk.jdk/Contents/Home

Default locale: en_US, platform encoding: UTF-8
OS name: "mac os x", version: "12.5.1", arch: "x86_64", family: "mac"

it failed at the same place with the same error message.

Anything I did is wrong?

Jun







Re: New licensing for Akka

2022-09-09 Thread Chesnay Schepler

That is great to hear. I have updated the blog post accordingly.

On 09/09/2022 19:29, Matthias Pohl wrote:
Looks like there will be a bit of a grace period till Sep 2023 for 
vulnerability fixes in akka 2.6.x [1]


[1] https://discuss.lightbend.com/t/2-6-x-maintenance-proposal/9949

On Wed, Sep 7, 2022 at 4:30 PM Robin Cassan via user 
 wrote:


Thanks a lot for your answers, this is reassuring!

Cheers

Le mer. 7 sept. 2022 à 13:12, Chesnay Schepler
 a écrit :

Just to squash concerns, we will make sure this license change
will not
affect Flink users in any way.

On 07/09/2022 11:14, Robin Cassan via user wrote:
> Hi all!
> It seems Akka have announced a licensing change
>
https://www.lightbend.com/blog/why-we-are-changing-the-license-for-akka
> If I understand correctly, this could end-up increasing cost
a lot for
> companies using Flink in production. Do you know if the Flink
> developers have any initial reaction as to how this could be
handled
> (using a Fork? moving out of akka, even though it's probably
> incredibly complex?)? Are we right to assume that this
license applies
> when using akka through Flink?
>
> Thanks a lot!
> Robin




Re: Fail to build Flink 1.15.1

2022-09-09 Thread Chesnay Schepler
hmm...we've only seen that error in older Flink version: 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/flinkdev/ide_setup/#compilation-fails-with-cannot-find-symbol-symbol-method-defineclass-location-class-sunmiscunsafe


Please double-check whether you actually checked out 1.15.1; I can't 
reference to sun.misc.Unsafe in the 1.15.1 version of the mentioned 
class: 
https://github.com/apache/flink/blob/release-1.15.1/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java


On 09/09/2022 22:01, Jun Qin wrote:

I have an issue when build a clean Flink 1.15.1 on MacOS with:
mvn clean install -DskipTests -Dfast

% echo $JAVA_HOME
/usr/local/Cellar/openjdk@11/11.0.16.1

% mvn -version
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: /usr/local/Cellar/maven/3.8.6/libexec
Java version: 11.0.16.1, vendor: Homebrew, runtime: 
/usr/local/Cellar/openjdk@11/11.0.16.1/libexec/openjdk.jdk/Contents/Home

Default locale: en_US, platform encoding: UTF-8
OS name: "mac os x", version: "12.5.1", arch: "x86_64", family: “mac"

% java -version
openjdk version "11.0.16.1" 2022-08-12
OpenJDK Runtime Environment Homebrew (build 11.0.16.1+0)
OpenJDK 64-Bit Server VM Homebrew (build 11.0.16.1+0, mixed mode)

It failed with:
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.1:compile 
(default-compile) on project flink-test-utils-junit: Compilation failure
[ERROR] 
/Workspace/flink-1.15.1/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[244,53] 
cannot find symbol
[ERROR]   symbol: method 
defineClass(java.lang.String,byte[],int,int,java.lang.ClassLoader,java.security.ProtectionDomain)

[ERROR]   location: class sun.misc.Unsafe

I tried also with a downloaded maven 3.2.5 binary (maven 3.2 has been 
disabled in brew):

% ~/Downloads/apache-maven-3.2.5/bin/mvn -version
Apache Maven 3.2.5 (12a6b3acb947671f09b81f49094c53f426d8cea1; 
2014-12-14T18:29:23+01:00)

Maven home: /Users/jqin/Downloads/apache-maven-3.2.5
Java version: 11.0.16.1, vendor: Homebrew
Java home: 
/usr/local/Cellar/openjdk@11/11.0.16.1/libexec/openjdk.jdk/Contents/Home

Default locale: en_US, platform encoding: UTF-8
OS name: "mac os x", version: "12.5.1", arch: "x86_64", family: "mac"

it failed at the same place with the same error message.

Anything I did is wrong?

Jun




[NOTICE] Blog post regarding Akka's licensing change

2022-09-08 Thread Chesnay Schepler

Hello,

You may have heard about a recent change to the licensing of Akka.
We just published a blog-post regarding this change and what it means 
for Flink.


https://flink.apache.org/news/2022/09/08/akka-license-change.html

TL;DR:

Flink is not in any immediate danger and we will ensure that users are 
not affected by this change.
The licensing of Flink will not change; it will stay Apache-licensed and 
will only contain dependencies that are compatible with it.

We will not use Akka versions with the new license.

Regards,
Chesnay


Re: [NOTICE] Switch docker image base to Eclipse Temurin

2022-09-08 Thread Chesnay Schepler

At first glance this might happen when an older docker version is used:

https://github.com/adoptium/temurin-build/issues/2974

You may need to upgrade to Docker 20.10.5+.

On 08/09/2022 12:33, Sigalit Eliazov wrote:

Hi all,

We pulled the new image and we are facing an issue to start the job 
manager pod.
we are using version 1.14.5-java11 and the cluster is started using 
flink operator


the error is

[ERROR] Could not get JVM parameters and dynamic configurations properly.

[ERROR] Raw output from BashJavaUtils:

[0.011s][warning][os,thread] Failed to start thread "VM Thread" - 
pthread_create failed (EPERM) for attributes: stacksize: 1024k, 
guardsize: 4k, detached.


Error occurred during initialization of VM.


we have tried to change the jvm args of by setting -Xms256m -Xmx1g

but it did not help


any guidance will be appreciated

Thanks

Sigalit


On Mon, Sep 5, 2022 at 1:21 PM Chesnay Schepler  
wrote:


* September 7th

On 05/09/2022 11:27, Chesnay Schepler wrote:
> On Wednesday, September 9th, the Flink 1.14.5/1.15.2 Docker images
> will switch bases
>
> FROM openjdk:8/11-jar (Debian-based)
> TO eclipse-temurin:8/11-jre-jammy (Ubuntu-based)
>
> due to the deprecation of the OpenJDK images.
>
> Users that customized the images are advised to check for breaking
> changes.
>
> The source Dockerfile for the new images is available at
>

https://github.com/apache/flink-docker/tree/4794f9425513fb4c0b55ec1efd629e8eb7e5d8c5.

>




Re: Cassandra sink with Flink 1.15

2022-09-07 Thread Chesnay Schepler
Are you running into this in the IDE, or when submitting the job to a 
Flink cluster?


If it is the first, then you're probably affected by the Scala-free 
Flink efforts. Either add an explicit dependency on 
flink-streaming-scala or migrate to Flink tuples.


On 07/09/2022 14:17, Lars Skjærven wrote:

Hello,

When upgrading from 1.14 to 1.15 we bumped into a type issue when 
attempting to sink to Cassandra (scala 2.12.13). This was working 
nicely in 1.14. Any tip is highly appreciated.


Using a MapFunction() to generate the stream of tuples:

CassandraSink
 .addSink(
mystream.map(new ToTupleMapper)
  )...

Exception: No support for the type of the given DataStream: 
GenericType


Or with a lambda function:

CassandraSink
 .addSink(
    mystream.map((v: MyCaseClass => (v.key v.someLongValue))
  )...

Caused by: 
org.apache.flink.api.common.functions.InvalidTypesException: The 
generic type parameters of 'Tuple2' are missing. In many cases lambda 
methods don't provide enough information for automatic type extraction 
when Java generics are involved. An easy workaround is to use an 
(anonymous) class instead that implements the 
'org.apache.flink.api.common.functions.MapFunction' interface. 
Otherwise the type has to be specified explicitly using type information.






Re: Slow Tests in Flink 1.15

2022-09-07 Thread Chesnay Schepler
The test that gotten slow; how many test cases does it actually contain 
/ how many jobs does it actually run?

Are these tests using the table/sql API?

On 07/09/2022 14:15, Alexey Trenikhun wrote:
We are also observing extreme slow down (5+ minutes vs 15 seconds) in 
1 of 2 integration tests . Both tests use Kafka. The slow test 
uses org.apache.flink.runtime.minicluster.TestingMiniCluster, this 
test tests complete job, which consumes and produces Kafka messages. 
Not affected test extends org.apache.flink.test.util.AbstractTestBase 
which uses MiniClusterWithClientResource, this test is simpler 
and only produce Kafka messages.


Thanks,
Alexey

*From:* Matthias Pohl via user 
*Sent:* Tuesday, September 6, 2022 6:36 AM
*To:* David Jost 
*Cc:* user@flink.apache.org 
*Subject:* Re: Slow Tests in Flink 1.15
Hi David,
I guess, you're referring to [1]. But as Chesnay already pointed out 
in the previous thread: It would be helpful to get more insights into 
what exactly your tests are executing (logs, code, ...). That would 
help identifying the cause.

> Can you give us a more complete stacktrace so we can see what call in
> Flink is waiting for something?
>
> Does this happen to all of your tests?
> Can you provide us with an example that we can try ourselves? If not,
> can you describe the test structure (e.g., is it using a
> MiniClusterResource).

Matthias

[1] https://lists.apache.org/thread/yhhprwyf29kgypzzqdmjgft4qs25yyhk

On Mon, Sep 5, 2022 at 4:59 PM David Jost  wrote:

Hi,

we were going to upgrade our application from Flink 1.14.4 to
Flink 1.15.2, when we noticed, that all our job tests, using a
MiniClusterWithClientResource, are multiple times slower in 1.15
than before in 1.14. I, unfortunately, have not found mentions in
that regard in the changelog or documentation. The slowdown is
rather extreme I hope to find a solution to this. I saw it
mentioned once in the mailing list, but there was no (public)
outcome to it.

I would appreciate any help on this. Thank you in advance.

Best
 David



Re: New licensing for Akka

2022-09-07 Thread Chesnay Schepler
Just to squash concerns, we will make sure this license change will not 
affect Flink users in any way.


On 07/09/2022 11:14, Robin Cassan via user wrote:

Hi all!
It seems Akka have announced a licensing change 
https://www.lightbend.com/blog/why-we-are-changing-the-license-for-akka
If I understand correctly, this could end-up increasing cost a lot for 
companies using Flink in production. Do you know if the Flink 
developers have any initial reaction as to how this could be handled 
(using a Fork? moving out of akka, even though it's probably 
incredibly complex?)? Are we right to assume that this license applies 
when using akka through Flink?


Thanks a lot!
Robin





Re: New licensing for Akka

2022-09-07 Thread Chesnay Schepler

We'll have to look into it.

The license would apply to usages of Flink.
That said, I'm not sure if we'd even be allowed to use Akka under that 
license since it puts significant restrictions on the use of the software.
If that is the case, then it's either use a fork created by another 
party or switch to a different library.


On 07/09/2022 11:14, Robin Cassan via user wrote:

Hi all!
It seems Akka have announced a licensing change 
https://www.lightbend.com/blog/why-we-are-changing-the-license-for-akka
If I understand correctly, this could end-up increasing cost a lot for 
companies using Flink in production. Do you know if the Flink 
developers have any initial reaction as to how this could be handled 
(using a Fork? moving out of akka, even though it's probably 
incredibly complex?)? Are we right to assume that this license applies 
when using akka through Flink?


Thanks a lot!
Robin





Re: [NOTICE] Switch docker image base to Eclipse Temurin

2022-09-05 Thread Chesnay Schepler

* September 7th

On 05/09/2022 11:27, Chesnay Schepler wrote:
On Wednesday, September 9th, the Flink 1.14.5/1.15.2 Docker images 
will switch bases


FROM openjdk:8/11-jar (Debian-based)
TO eclipse-temurin:8/11-jre-jammy (Ubuntu-based)

due to the deprecation of the OpenJDK images.

Users that customized the images are advised to check for breaking 
changes.


The source Dockerfile for the new images is available at 
https://github.com/apache/flink-docker/tree/4794f9425513fb4c0b55ec1efd629e8eb7e5d8c5. 






[NOTICE] Switch docker image base to Eclipse Temurin

2022-09-05 Thread Chesnay Schepler
On Wednesday, September 9th, the Flink 1.14.5/1.15.2 Docker images will 
switch bases


FROM openjdk:8/11-jar (Debian-based)
TO eclipse-temurin:8/11-jre-jammy (Ubuntu-based)

due to the deprecation of the OpenJDK images.

Users that customized the images are advised to check for breaking changes.

The source Dockerfile for the new images is available at 
https://github.com/apache/flink-docker/tree/4794f9425513fb4c0b55ec1efd629e8eb7e5d8c5. 



Re: Exception when calculating throughputEMA in 1.14.3

2022-08-23 Thread Chesnay Schepler
Since 1.14.6 has not been released yet your best bet is to either 
disable the debloating feature, upgrade to 1.15, or build Flink yourself.


On 23/08/2022 08:48, Chesnay Schepler wrote:

https://issues.apache.org/jira/browse/FLINK-25454

On 23/08/2022 04:54, Liting Liu (litiliu) wrote:
Hi, we are using 1.14.3, but got "Time should be non negative" after 
the job has been running for days.
What should i do to get rid of this Exception? Do i have to disable 
the network-debloating feature?
Does it's caused by System.currentTimeMillis doesn't always return a 
value bigger than before?


java.lang.IllegalArgumentException: Time should be non negative
  at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
org.apache.flink.runtime.throughput.ThroughputEMA.calculateThroughput(ThroughputEMA.java:44) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:80) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:792) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$4(StreamTask.java:784) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]






Re: Exception when calculating throughputEMA in 1.14.3

2022-08-23 Thread Chesnay Schepler

https://issues.apache.org/jira/browse/FLINK-25454

On 23/08/2022 04:54, Liting Liu (litiliu) wrote:
Hi, we are using 1.14.3, but got "Time should be non negative" after 
the job has been running for days.
What should i do to get rid of this Exception? Do i have to disable 
the network-debloating feature?
Does it's caused by System.currentTimeMillis doesn't always return a 
value bigger than before?


java.lang.IllegalArgumentException: Time should be non negative
  at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
org.apache.flink.runtime.throughput.ThroughputEMA.calculateThroughput(ThroughputEMA.java:44) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:80) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:792) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$4(StreamTask.java:784) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]




Re: Flink Histogram not emitting sum while using Metrics Reporter

2022-08-18 Thread Chesnay Schepler
We currently do not compute sums for Histograms, and hence this isn't 
exported to Prometheus.
You'd need to use custom histogram (that actually computes sums) and a 
custom prometheus reporter (that extracts said sum if present) to 
implement this yourself.


On 18/08/2022 18:37, Sarang Vadali via user wrote:


Hi,


I am currently registering a Flink Histogram and using the Prometheus 
Metrics Reporter to send this metric to our Time Series Data Storage. 
When Prometheus grabs this metric and converts it to the "summary" 
type, there is no sum found (only the streaming quantiles and count). 
This is causing an issue when our metrics agent is attempting to 
capture the Flink Histogram/Prometheus Summary.


I was wondering if in a newer version (than 1.13.6) the histogram sum 
is computed by Flink and what that version would be? If not, is there 
any work around so that a Flink histogram can emit all 3 elements 
(quantiles, sum, and count) in Prometheus format?



SARANG VADALI
AMTS | Salesforce
Office: 925-216-1829
Mobile: 925-216-1829






Re: Metrics OOM java heap space

2022-08-15 Thread Chesnay Schepler
The granularity setting isn't relevant because it only matters when you 
enable latency metrics, but they are opt-in and the default config is used.


You can only enable/disable specific metrics in the upcoming 1.16.0.

@Yuriy: You said you had 270k Strings in the StreamConfig; is that 
accurate? How many StreamConfig instances are there anyhow? Asking since 
that is a strange number to have.
I wouldn't conclude that metrics are the problem; it could just be that 
you're already running close to the memory budged limit, and the 
additional memory requirements by metrics just ever so slightly push you 
over it.


On 14/08/2022 10:41, yu'an huang wrote:
You can follow the ticked 
https://issues.apache.org/jira/browse/FLINK-10243 as mentioned in that 
stack overflow question to set this parameter:


“metrics.latency.granularity": 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#metrics-latency-granularity



You only have 1.688gb for your TaskManager. I also suggest you to 
increate the memory configuration otherwise the test may still fail.





On 12 Aug 2022, at 10:52 PM, Yuriy Kutlunin 
 wrote:


Hello Yuan,

I don't override any default settings, docker-compose.yml:
services:
  jobmanager:
    image: flink:1.15.1-java11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:1.15.1-java11
    depends_on:
      - jobmanager
    command: taskmanager
    ports:
      - "8084:8084"
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter

        env.java.opts: -XX:+HeapDumpOnOutOfMemoryError
 From TaskManager log:
INFO  [] - Final TaskExecutor Memory configuration:
INFO  [] -   Total Process Memory:          1.688gb (1811939328 bytes)
INFO  [] -     Total Flink Memory:          1.250gb (1342177280 bytes)
INFO  [] -       Total JVM Heap Memory:     512.000mb (536870902 bytes)
INFO  [] -         Framework:               128.000mb (134217728 bytes)
INFO  [] -         Task:                    384.000mb (402653174 bytes)
INFO  [] -       Total Off-heap Memory:     768.000mb (805306378 bytes)
INFO  [] -         Managed:                 512.000mb (536870920 bytes)
INFO  [] -         Total JVM Direct Memory: 256.000mb (268435458 bytes)
INFO  [] -           Framework:             128.000mb (134217728 bytes)
INFO  [] -           Task:                  0 bytes
INFO  [] -           Network:               128.000mb (134217730 bytes)
INFO  [] -     JVM Metaspace:               256.000mb (268435456 bytes)
INFO  [] -     JVM Overhead:                192.000mb (201326592 bytes)

I would prefer not to configure memory (at this point), because 
memory consumption depends on job structure, so it always can exceed 
configured values.


My next guess is that the problem is not in metrics content, but in 
their number, which increases with the number of operators.
So the next question is if there is a way to exclude metric 
generation on operator level.

Found same question without correct answer on SOF:
https://stackoverflow.com/questions/54215245/apache-flink-limit-the-amount-of-metrics-exposed

On Fri, Aug 12, 2022 at 4:05 AM yu'an huang  wrote:
Hi Yuriy,

How do you set your TaskMananger Memory? I think 40MB is not 
significant high for Flink. And It’s normal to see memory increase if 
you have more parallelism or set another metrics on. You can try 
setting larger moratory for Flink as explained by following documents.


https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup/

Best
Yuan



On 12 Aug 2022, at 12:51 AM, Yuriy Kutlunin 
 wrote:


Hi all,

I'm running Flink Cluster in Session Mode via docker-compose as 
stated in docs:

https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#session-cluster-yml

After submitting a test job with many intermediate SQL operations 
(~500 select * from ...) and metrics turned on (JMX or Prometheus) I 
got OOM: java heap space on initialization stage.


Turning metrics off allows the job to get to the Running state.
Heap consumption also depends on parallelism - same job succeeds 
when submitted with parallelism 1 instead of 2.


There are Task Manager logs for 4 cases:
JMX parallelism 1 (succeeded)
JMX parallelism 2 (failed)
Prometheus parallelism 2 (failed)
No metrics parallelism 2 (succeeded)

Post OOM heap dump (JMX parallelism 2) shows 2 main consumption points:
1. Big value (40MB) for some task configuration
2. Many instances (~270k) of some heavy (20KB) value in StreamConfig

Seems like all these heavy values are related to weird task names, 
which includes all the operations:
Received task Source: source1 -> SourceConversion[2001] -> mapping1 
-> SourceConversion[2003] 

Re: Making Kafka source respect offset changed externally

2022-07-21 Thread Chesnay Schepler
This is somewhat implied in 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#consumer-offset-committing.


/> Note that Kafka source does //*NOT*//rely on committed offsets for 
fault tolerance. Committing offset is only for exposing the progress of 
consumer and consuming group for monitoring./

/
/
and 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version/

/

/> Set |setStartFromGroupOffsets(true)| on the consumer so that we get 
read offsets from Kafka. This will only take effect when there is no 
read offset in Flink state, which is why the next step is very important./

/
/
/
/
Dynamic partition discovery shouldn't have an effect because you are not 
creating partitions/topics.

//

On 21/07/2022 12:14, Alexis Sarda-Espinosa wrote:

I would suggest updating the documentation to include that statement.

I imagine dynamic partition discovery has no effect on this?

Regards,
Alexis.

Am Do., 21. Juli 2022 um 10:03 Uhr schrieb Chesnay Schepler 
:


Flink only reads the offsets from Kafka when the job is initially
started from a clear slate.
Once checkpoints are involved it only relies on offsets stored in
the state.

On 20/07/2022 14:51, Alexis Sarda-Espinosa wrote:

Hello again,

I just performed a test
using OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST).
I did a few tests in the following order, and I noticed a few
weird things. Note that our job uses Processing Time windows, so
watermarks are irrelevant.

1. After the job had been running for a while, we manually moved
the consumer group's offset to 12 hours in the past [1] (without
restarting the job).
  - After this, the consumer simply stopped reading messages -
the consumer lag in Kafka stayed at around 150k (no new data arrived)

2. We restarted the job with a checkpoint.
  - The consumer lag in Kafka dropped down to 0, but no data was
emitted from the windows.

3. We stopped the job, moved the offset again, and restarted
Without any checkpoint/savepoint.
  - This time the consumer correctly processed the backlog and
emitted events from the windows.

This was done with Flink 1.15.0.

Is this expected? In other words, if there's a mismatch between
Flink's state's offset and Kafka's offset, will the job be unable
to run?



[1] The command to move the offset was:

kafka-consumer-groups.sh \
  --bootstrap-server ... \
  --topic our-topic \
  --group our-group \
  --command-config kafka-preprod.properties \
  --reset-offsets --to-datetime '2022-07-20T00:01:00.000' \
  --execute

Regards,
Alexis.

Am Do., 14. Juli 2022 um 22:56 Uhr schrieb Alexis Sarda-Espinosa
:

Hi Yaroslav,

The test I did was just using earliest, I'll test with
committed offset again, thanks.

Regards,
Alexis.

On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko,
 wrote:

Hi Alexis,

Do you use OffsetsInitializer.committedOffsets() to
specify your Kafka consumer offsets? In this case, it
should get the offsets from Kafka and not the state.

On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa
 wrote:

Hello,

Regarding the new Kafka source (configure with a
consumer group), I found out that if I manually
change the group's offset with Kafka's admin API
independently of Flink (while the job is running),
the Flink source will ignore that and reset it to
whatever it stored internally. Is there any way to
prevent this?

Regards,
Alexis.





Re: Job id in logs

2022-07-21 Thread Chesnay Schepler
No, that is not possible. There are too man shared components (many of 
which not being aware of jobs at all) for this to be feasible.


On 21/07/2022 10:49, Lior Liviev wrote:
Hello, is there a way to add job Id to logs to distinguish between 
different jobs?




Re: Making Kafka source respect offset changed externally

2022-07-21 Thread Chesnay Schepler
Flink only reads the offsets from Kafka when the job is initially 
started from a clear slate.

Once checkpoints are involved it only relies on offsets stored in the state.

On 20/07/2022 14:51, Alexis Sarda-Espinosa wrote:

Hello again,

I just performed a test 
using OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST). 
I did a few tests in the following order, and I noticed a few weird 
things. Note that our job uses Processing Time windows, so watermarks 
are irrelevant.


1. After the job had been running for a while, we manually moved the 
consumer group's offset to 12 hours in the past [1] (without 
restarting the job).
  - After this, the consumer simply stopped reading messages - the 
consumer lag in Kafka stayed at around 150k (no new data arrived)


2. We restarted the job with a checkpoint.
  - The consumer lag in Kafka dropped down to 0, but no data was 
emitted from the windows.


3. We stopped the job, moved the offset again, and restarted Without 
any checkpoint/savepoint.
  - This time the consumer correctly processed the backlog and emitted 
events from the windows.


This was done with Flink 1.15.0.

Is this expected? In other words, if there's a mismatch between 
Flink's state's offset and Kafka's offset, will the job be unable to run?




[1] The command to move the offset was:

kafka-consumer-groups.sh \
  --bootstrap-server ... \
  --topic our-topic \
  --group our-group \
  --command-config kafka-preprod.properties \
  --reset-offsets --to-datetime '2022-07-20T00:01:00.000' \
  --execute

Regards,
Alexis.

Am Do., 14. Juli 2022 um 22:56 Uhr schrieb Alexis Sarda-Espinosa 
:


Hi Yaroslav,

The test I did was just using earliest, I'll test with committed
offset again, thanks.

Regards,
Alexis.

On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko,
 wrote:

Hi Alexis,

Do you use OffsetsInitializer.committedOffsets() to specify
your Kafka consumer offsets? In this case, it should get the
offsets from Kafka and not the state.

On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa
 wrote:

Hello,

Regarding the new Kafka source (configure with a consumer
group), I found out that if I manually change the group's
offset with Kafka's admin API independently of Flink
(while the job is running), the Flink source will ignore
that and reset it to whatever it stored internally. Is
there any way to prevent this?

Regards,
Alexis.



Re: Did the semantics of Kafka's earliest offset change with the new source API?

2022-07-15 Thread Chesnay Schepler
I'm not sure about the previous behavior, but at the very least 
according to the documentation the behavior is identical.


1.12: 
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html#kafka-consumers-start-position-configuration


/|setStartFromEarliest()|/// //|setStartFromLatest()|//: Start from the 
earliest / latest record. Under these modes, committed offsets in Kafka 
will be ignored and not used as starting positions./


On 13/07/2022 18:53, Alexis Sarda-Espinosa wrote:

Hello,

I have a job running with Flink 1.15.0 that consumes from Kafka with 
the new KafkaSource API, setting a group ID explicitly and specifying 
OffsetsInitializer.earliest() as a starting offset. Today I restarted 
the job ignoring both savepoint and checkpoint, and the consumer 
started reading from the first available message in the broker (from 
24 hours ago), i.e. it completely ignored the offsets that were 
committed to Kafka. If I 
use OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST) 
instead, the problem seems to go away.


With the previous FlinkKafkaConsumer, using earliest didn't cause any 
such issues. Was this changed in the aforementioned way on purpose?


Regards,
Alexis.



Re: Unaligned checkpoint waiting in 'start delay' with AsyncDataStream

2022-07-13 Thread Chesnay Schepler

Ah OK I could reproduce the problem.

Seems to be tied to the capacity of the async operator; if you half that 
the start delay is doubled.
It looks like classic back-pressure delaying checkpoints, which kinda 
makes sense,
if you ignore that unaligned checkpoints are enabled which are supposed 
to prevent that from happening.


I think it'd be best to create a ticket; either something isn't behaving 
as it should or the documentation is incomplete.


On 12/07/2022 20:43, Nathan Sharp wrote:

I have not found a solution yet, but some points:
  - A co-worker has reproduced this issue on their own box using the recipe 
given below
  - I have tried using rocksdb state backend, which did not help
  - I have tried adding additional TaskWorkers, which did not help
  - I have checked the TaskWorker stats and nothing seems awry. No memory 
consumption, for example. Nothing obvious in the stack traces
  - If I change the code to be sequential instead of async, checkpoints work 
fine
  - The log file merely shows the checkpoint being triggered, then it being 
completed 47 seconds later. No additional information is logged.
  - See the attached image for the UI representation, which shows that the delay is under 
the "Start Time" column.

  Chesnay, how was your Flink cluster configured when it worked for you? Are 
you able to reproduce it using my docker-compose file?

Thanks again!
   Nathan

-Original Message-
From: Nathan Sharp
Sent: Monday, July 4, 2022 10:00 AM
To: 'Chesnay Schepler' ; user@flink.apache.org
Subject: RE: Unaligned checkpoint waiting in 'start delay' with AsyncDataStream

Thank you for trying it out! Hopefully, there is just some setting that needs 
to be changed.

I have an Ubuntu VM where I created a single node Docker swarm. Then I used the 
following command to run Flink 1.15.0 using the docker-compose.yml file in the 
repository:

docker stack up -c docker-compose.yml flink

Then I used Flink's web UI to upload the .jar file and run it with default 
settings.

   Nathan







Re: Unaligned checkpoint waiting in 'start delay' with AsyncDataStream

2022-07-04 Thread Chesnay Schepler
I ran your code in the IDE and it worked just fine; checkpoints are 
being completed and results are printed to the console.


Can you expand on how you run the job?

On 02/07/2022 00:26, Nathan Sharp wrote:

I am attempting to use unaligned checkpointing with AsyncDataStream, but the checkpoints 
sit in "start delay" until the job finishes. I am brand new to Flink, so it is 
entirely reasonable to assume the problem is with my code.

I published my test code at 
https://github.com/phxnsharp/AsyncDataStreamCheckpointReproduction

Searching the web seems to indicate the most common issue is asyncInvoke 
blocking. I added a test to make sure that that is not true.

Any help would be greatly appreciated!

  Thanks,
   Nathan





Re: How to mock new DataSource/Sink

2022-07-04 Thread Chesnay Schepler

It is indeed not easy to mock sources/sink with the new interfaces.

There is an effort to make this easier for sources in the future 
(FLIP-238 
).


For the time being I'd stick with the old APIs for mock sources/sinks.

On 04/07/2022 10:23, David Jost wrote:

Hi,

we are currently looking at replacing our sinks and sources with the respective 
counterparts using the 'new' data source/sink API (mainly Kafka). What holds us 
back is that we are not sure how to test the pipeline with mocked 
sources/sinks. Up till now, we somewhat followed the 'Testing docs'[0] and 
created a simple SinkFunction, as well as a ParallelSourceFunction, where we 
could get data in and out at our leisure. They could be easily plugged into the 
pipeline for the tests. But with the new API, it seems way too cumbersome to go 
such an approach, as there is a lot of overhead in creating a sink or source on 
your own (now).

I would love to know, what the intended or recommended way is here. I know, 
that I can still use the old API, but that a) feels wrong, and b) requires us 
to expose the DataStream, which is not necessary in the current setup.

I appreciate any ideas or even examples on this.

Thank you in advance.

Best
   David


[0]:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/testing/#testing-flink-jobs




Re: Sporadic issues with savepoint status lookup in Flink 1.15

2022-06-17 Thread Chesnay Schepler
We did several changes to the savepoint rest API backend, where 
something may have snuck in.
The odd thing is that you only see the issue for stop-with-savepoint, 
which are internally handled the same way as savepoints.


On 16/06/2022 17:57, Peter Westermann wrote:


We run a standalone Flink cluster in session mode (but we usually only 
run one job per cluster; session mode just fits better with our 
deployment workflow than application mode).


We trigger hourly savepoints and also use savepoints to stop a job and 
then restart with a new version of the jar.


I haven’t seen any issue with the hourly savepoints (without stopping 
the job).  For these, I can see messages such as Evicted result with 
trigger id 30f9457373eba7b9de1bdeaf591a6956 because its TTL of 300s 
has expired.


~5 minutes after savepoint completion.

When the stop-with-savepoint status lookup fails with Exception 
occurred in REST handler: There is no savepoint operation with 
triggerId=cee5054245598efb42245b3046a6ae75


I still see Evicted result with trigger id 
cee5054245598efb42245b3046a6ae75because its TTL of 300s has expired.~5 
minutes after savepoint completion.


The documentation 
<https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/rest_api/#api> 
for Flink 1.15 mentions a new feature:


/For (stop-with-)savepoint operations you can control this 
//triggerId// by setting it in the body of the request that triggers 
the operation. This allow you to safely* retry such operations without 
triggering multiple savepoints./


Could this have anything to do with the error I am seeing?

Peter Westermann

Analytics Software Architect

cidimage001.jpg@01D78D4C.C00AC080

peter.westerm...@genesys.com <mailto:peter.westerm...@genesys.com>

cidimage001.jpg@01D78D4C.C00AC080

cidimage002.jpg@01D78D4C.C00AC080 <http://www.genesys.com/>

*From: *Chesnay Schepler 
*Date: *Thursday, June 16, 2022 at 11:32 AM
*To: *Peter Westermann , 
user@flink.apache.org 

*Subject: *Re: Sporadic issues with savepoint status lookup in Flink 1.15

* EXTERNAL EMAIL - Please use caution with links and attachments *



ok that shouldn't happen. I couldn't find anything wrong in the code 
so far; will continue trying to reproduce it.


If this happens, does it persist indefinitely for a particular 
triggerId, or does it reappear later on again?


Are you only ever triggering a single savepoint for a given job?

Are you using session or application clusters?

On 16/06/2022 16:59, Peter Westermann wrote:

If it happens it happens immediately. Once we receive the
triggerId from */jobs/:jobid/stop *or*/jobs/:jobid/savepoints* we
poll */jobs/:jobid/savepoints/:triggerid *every second until the
status is no longer IN_PROGRESS.

Peter Westermann

Analytics Software Architect

peter.westerm...@genesys.com <mailto:peter.westerm...@genesys.com>

<http://www.genesys.com/>

*From: *Chesnay Schepler 
<mailto:ches...@apache.org>
*Date: *Thursday, June 16, 2022 at 10:55 AM
*To: *Peter Westermann 
<mailto:no.westerm...@genesys.com>, user@flink.apache.org
 <mailto:user@flink.apache.org>
*Subject: *Re: Sporadic issues with savepoint status lookup in
Flink 1.15

* EXTERNAL EMAIL - Please use caution with links and attachments *



There is an expected case where this might happen:

if too much time has elapsed since the savepoint was completed
(default 5 minutes; controlled by rest.async.store-duration)

Did this happen earlier than that?

On 16/06/2022 15:53, Peter Westermann wrote:

We recently upgraded one of our Flink clusters to version
1.15.0 and are now seeing sporadic issues when stopping a job
with a savepoint via the REST API. This happens for
*/jobs/:jobid/savepoints *and*/jobs/:jobid/stop*:

The job finishes with a savepoint but the triggerId returned
from the REST API seems to be invalid. Any lookups via
*/jobs/:jobid/savepoints/:triggerid* fail with a 404 and the
following error:

org.apache.flink.runtime.rest.handler.RestHandlerException:
There is no savepoint operation with
triggerId=cee5054245598efb42245b3046a6ae75 for job
0995a9461f0178294ea71c9accbe750c

Peter Westermann

Analytics Software Architect

cidimage001.jpg@01D78D4C.C00AC080

peter.westerm...@genesys.com <mailto:peter.westerm...@genesys.com>

cidimage001.jpg@01D78D4C.C00AC080

cidimage002.jpg@01D78D4C.C00AC080 <http://www.genesys.com/>



Re: Sporadic issues with savepoint status lookup in Flink 1.15

2022-06-16 Thread Chesnay Schepler
ok that shouldn't happen. I couldn't find anything wrong in the code so 
far; will continue trying to reproduce it.


If this happens, does it persist indefinitely for a particular 
triggerId, or does it reappear later on again?

Are you only ever triggering a single savepoint for a given job?

Are you using session or application clusters?

On 16/06/2022 16:59, Peter Westermann wrote:


If it happens it happens immediately. Once we receive the triggerId 
from */jobs/:jobid/stop *or*/jobs/:jobid/savepoints* we poll 
*/jobs/:jobid/savepoints/:triggerid *every second until the status is 
no longer IN_PROGRESS.


Peter Westermann

Analytics Software Architect

cidimage001.jpg@01D78D4C.C00AC080

peter.westerm...@genesys.com <mailto:peter.westerm...@genesys.com>

cidimage001.jpg@01D78D4C.C00AC080

cidimage002.jpg@01D78D4C.C00AC080 <http://www.genesys.com/>

*From: *Chesnay Schepler 
*Date: *Thursday, June 16, 2022 at 10:55 AM
*To: *Peter Westermann , 
user@flink.apache.org 

*Subject: *Re: Sporadic issues with savepoint status lookup in Flink 1.15

* EXTERNAL EMAIL - Please use caution with links and attachments *



There is an expected case where this might happen:

if too much time has elapsed since the savepoint was completed 
(default 5 minutes; controlled by rest.async.store-duration)


Did this happen earlier than that?

On 16/06/2022 15:53, Peter Westermann wrote:

We recently upgraded one of our Flink clusters to version 1.15.0
and are now seeing sporadic issues when stopping a job with a
savepoint via the REST API. This happens for
*/jobs/:jobid/savepoints *and*/jobs/:jobid/stop*:

The job finishes with a savepoint but the triggerId returned from
the REST API seems to be invalid. Any lookups via
*/jobs/:jobid/savepoints/:triggerid* fail with a 404 and the
following error:

org.apache.flink.runtime.rest.handler.RestHandlerException: There
is no savepoint operation with
triggerId=cee5054245598efb42245b3046a6ae75 for job
0995a9461f0178294ea71c9accbe750c

Peter Westermann

Analytics Software Architect

cidimage001.jpg@01D78D4C.C00AC080

peter.westerm...@genesys.com <mailto:peter.westerm...@genesys.com>

cidimage001.jpg@01D78D4C.C00AC080

cidimage002.jpg@01D78D4C.C00AC080 <http://www.genesys.com/>



Re: Sporadic issues with savepoint status lookup in Flink 1.15

2022-06-16 Thread Chesnay Schepler

Are there any log messages from the CompletedOperationCache in the logs?

On 16/06/2022 16:54, Chesnay Schepler wrote:

There is an expected case where this might happen:
if too much time has elapsed since the savepoint was completed 
(default 5 minutes; controlled by rest.async.store-duration)


Did this happen earlier than that?

On 16/06/2022 15:53, Peter Westermann wrote:


We recently upgraded one of our Flink clusters to version 1.15.0 and 
are now seeing sporadic issues when stopping a job with a savepoint 
via the REST API. This happens for */jobs/:jobid/savepoints 
*and*/jobs/:jobid/stop*:


The job finishes with a savepoint but the triggerId returned from the 
REST API seems to be invalid. Any lookups via 
*/jobs/:jobid/savepoints/:triggerid* fail with a 404 and the 
following error:


org.apache.flink.runtime.rest.handler.RestHandlerException: There is 
no savepoint operation with 
triggerId=cee5054245598efb42245b3046a6ae75 for job 
0995a9461f0178294ea71c9accbe750c


Peter Westermann

Analytics Software Architect

cidimage001.jpg@01D78D4C.C00AC080

peter.westerm...@genesys.com <mailto:peter.westerm...@genesys.com>

cidimage001.jpg@01D78D4C.C00AC080

cidimage002.jpg@01D78D4C.C00AC080 <http://www.genesys.com/>





Re: Sporadic issues with savepoint status lookup in Flink 1.15

2022-06-16 Thread Chesnay Schepler

There is an expected case where this might happen:
if too much time has elapsed since the savepoint was completed (default 
5 minutes; controlled by rest.async.store-duration)


Did this happen earlier than that?

On 16/06/2022 15:53, Peter Westermann wrote:


We recently upgraded one of our Flink clusters to version 1.15.0 and 
are now seeing sporadic issues when stopping a job with a savepoint 
via the REST API. This happens for */jobs/:jobid/savepoints 
*and*/jobs/:jobid/stop*:


The job finishes with a savepoint but the triggerId returned from the 
REST API seems to be invalid. Any lookups via 
*/jobs/:jobid/savepoints/:triggerid* fail with a 404 and the following 
error:


org.apache.flink.runtime.rest.handler.RestHandlerException: There is 
no savepoint operation with triggerId=cee5054245598efb42245b3046a6ae75 
for job 0995a9461f0178294ea71c9accbe750c


Peter Westermann

Analytics Software Architect

cidimage001.jpg@01D78D4C.C00AC080

peter.westerm...@genesys.com 

cidimage001.jpg@01D78D4C.C00AC080

cidimage002.jpg@01D78D4C.C00AC080 



Re: Flink Shaded dependencies and extending Flink APIs

2022-06-13 Thread Chesnay Schepler

Can we find a more robust way to support this?

Both flink-shaded, any relocation pattern and 
JsonRowDataSerializationSchema are Flink internals that users shouldn't 
use/rely on.


On 13/06/2022 12:26, Qingsheng Ren wrote:

Hi Andrew,

This is indeed a tricky case since Flink doesn't provide non-shaded
JAR for flink-json. One hacky solution in my mind is like:

1. Create a module let's say "wikimedia-event-utilities-shaded" that
relocates Jackson in the same way and uses the same Jackson version as
flink-shaded-jackson
2. Deploy the module to a local or remote Maven repository
3. Let your custom format depend on the
"wikimedia-event-utilities-shaded" module, then all Jackson
dependencies are relocated in the same way.

Another solution is that you can serialize then deserialize the
"different" ObjectNode to do the conversion but this sacrifices the
performance.

Hope this could be helpful!

Best regards,

Qingsheng

On Thu, Jun 9, 2022 at 8:29 PM Andrew Otto  wrote:

Hi all,

I'm working on an integration project trying to write some library code that 
will allow us at the Wikimedia Foundation to use Flink with our 'Event 
Platform'.  Specifically, I'm trying to write a reusable step near the end of a 
pipeline that will ensure our JSON events satisfy some criteria before 
producing them to Kafka.  Details here.

I'm experimenting with writing my own custom format to do this.  But all I 
really need to do is override JsonRowDataSerializationSchema's serialize method 
and augment and validate the ObjectNode before it is serialized to byte[].

I'm running into an issue where the ObjectNode that is used by Flink here is 
the shaded one: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode,
 whereas the WMF code I want to use to augment the ObjectNode is using a 
regular non shaded one.  I can't pass the shaded ObjectNode instance to a 
function that takes a non shaded one, and I can't cast the shaded ObjectNode to 
non shaded either.

My Q is: is there a way to extend Flink APIs that use shaded dependencies?  I suppose I 
could copy/paste the whole of the "json" format code that I need into my 
project and just make it my own, but this feels quite obnoxious.

Thank you!
-Andrew Otto
  Wikimedia Foundation






Re: Needed help with skipping savepoint state (unsure how to set --allowNonRestoredState in Docker)

2022-06-07 Thread Chesnay Schepler
You are on the right path with using the --allowNonRestoredState flag; 
we'll just have to find the right place to put it w.r.t. your setup.


Which docker images are you using (flink/statefun/something custom), and 
how do you submit the job?


On 03/06/2022 01:17, Bhavani Balasubramanyam wrote:

Hi,

I am Bhavani, a Software Engineer at Reddit. I'm trying to upgrade the
Flink version in my application from 3.0.0 to version 3.2.0, and in the
process I see the below error, where the  the operator has been removed,
and the checkpoint is unable to recover:


- Jun 2 14:32:03
snooron-worker-perspective-flink-staging-statefun-master-5f8hzd master
ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal
error occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job
 failed. at

org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:913)
~[flink-dist_2.12-1.14.3.jar:1.14.3] at

org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:473)
~[flink-dist_2.12-1.14.3.jar:1.14.3] at

org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:450)
~[flink-dist_2.12-1.14.3.jar:1.14.3] at

org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:427)
~[flink-dist_2.12-1.14.3.jar:1.14.3] at
java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?] at
java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
~[?:?] at java.util.concurrent.CompletableFuture$Completion.run(Unknown
Source) ~[?:?] at

org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
~[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at

org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
~[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at

org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
~[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at

org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
~[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at

org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
~[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at

org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
~[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at
scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at
akka.actor.Actor.aroundReceive(Actor.scala:537)
[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at
akka.actor.Actor.aroundReceive$(Actor.scala:535)
[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at
akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at
akka.actor.ActorCell.invoke(ActorCell.scala:548)
[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at
akka.dispatch.Mailbox.run(Mailbox.scala:231)
[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at
akka.dispatch.Mailbox.exec(Mailbox.scala:243)
[flink-rpc-akka_0833457a-e96a-488d-b0fd-a4aafb20352d.jar:1.14.3] at
java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?] at

Re: Unable to retrieve savepoint status from non-leader/standby in HA with Flink 1.15

2022-06-07 Thread Chesnay Schepler

I think your analysis is correct; I'll file a ticket.

On 03/06/2022 15:28, Nick Birnberg wrote:

Hello everyone!

Our current setup has us running Flink on Kubernetes in HA mode 
(Zookeeper) with multiple JobManagers. This appears to be a regression 
from 1.14.


We can use the flink CLI to communicate with the REST API to reproduce 
this. We directly target a standby JobManager (by using `kubectl 
port-forward $STANDBY_JM 8081`. And then run `flink savepoint -m 
localhost:8081 $JOB_ID`. This command triggers the savepoint  via the 
REST API and polls for it using the triggerId.


Relevant stack trace:

org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.rest.handler.RestHandlerException: Internal 
server error while retrieving status of savepoint operation with 
triggerId=10e6bb05749f572cf4ee5eee9b4959c7 for job 
488f4846310e2763dd1c338d7d7f55bb.
at 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers.createInternalServerError(SavepointHandlers.java:352)
at 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers.access$000(SavepointHandlers.java:115)
at 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler.lambda$null$0(SavepointHandlers.java:311)

...
Caused by: 
org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed 
to serialize the result for RPC call : getTriggeredSavepointStatus.
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:405)

...
Caused by: java.io.NotSerializableException: 
org.apache.flink.runtime.rest.handler.async.OperationResult
at 
java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at 
java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:388)

... 30 more
]
at 
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:532)


The savepoint itself is successful and this is not a problem if we 
target the leader JobManager. This seems similar to 
https://issues.apache.org/jira/browse/FLINK-26779 and I would think 
that the solution would be to 
have org.apache.flink.runtime.rest.handler.async.OperationResult 
implement Serializable, but I wanted a quick sanity check to make sure 
this is reproducible outside of our environment before moving forward.


Thank you!





Re: Slow tests on Flink 1.15

2022-06-07 Thread Chesnay Schepler
Can you give us a more complete stacktrace so we can see what call in 
Flink is waiting for something?


Does this happen to all of your tests?
Can you provide us with an example that we can try ourselves? If not, 
can you describe the test structure (e.g., is it using a 
MiniClusterResource).


On 02/06/2022 11:48, Lasse Nedergaard wrote:

Hi.

Just tried to upgrade from 1.14.2 to 1.15.0. It went well and our jobs runs as 
expected.

We have a number of test, testing the entire job so we mock input and output 
and start our job with mocked data. After upgrading a simple test now takes 
minutes where it before was less than a minute.
If I run a test in debug data are processed right a way but the job are stuck 
in park method in the idk.internal.misc.unsafe object I Java 11 and it’s called 
from StreamExecutionEnviroment.execute (job client mini cluster).

Any idea why this happens and what I’m missing?

Med venlig hilsen / Best regards
Lasse Nedergaard





Re: SV: How to enable statefun metrics for the Slf4j reporter

2022-05-23 Thread Chesnay Schepler

Just to double-check, you are checking the taskmanager logs, correct?

On 23/05/2022 15:24, Christopher Gustafson wrote:


Yes, Flink metrics are showing up as usual, but none of the ones that 
are listed in the StateFun documentation.



*Från:* Chesnay Schepler 
*Skickat:* den 23 maj 2022 14:29:15
*Till:* Christopher Gustafson; user@flink.apache.org
*Ämne:* Re: How to enable statefun metrics for the Slf4j reporter
You shouldn't have to do more than that.

Flink metrics are showing up as expected? Including metrics from tasks?

On 23/05/2022 14:03, Christopher Gustafson wrote:


Hi!


I am trying to enable the StateFun metrics in the documentation to be 
logged using the Slf4j reporter but I cannot figure out how to do it, 
and the documentation is pretty vague if you are not familiar with 
the Flink metrics beforehand. Could someone show me how to enable it, 
i.e what entries need to be added to my conf/flink.conf file for 
example? Currently, I am enabling the reporter with the following 
lines in my Flink config:


metrics.reporter.slf4j.factory.class: 
org.apache.flink.metrics.slf4j.Slf4jReporterFactory
metrics.reporter.slf4j.interval: 1 SECONDS
But I cannot see any of the StateFun specific metrics in the logs.


Best Regards,

Christopher Gustafson





Re: flink sql api, exception when setting "table.exec.state.ttl"

2022-05-23 Thread Chesnay Schepler

You're probably mixing Flink versions.

From the stack trace we can see that Flink classes are being loaded 
from 2 different jars 
(rocketmq-flink-1.0.0-SNAPSHOT.jar/flink-dist_2.12-1.13.5.jar); I'd 
suggest to resolve that first and see if the error persists.


On 23/05/2022 14:32, 李诗君 wrote:

flink version: 1.13.5

java code:

StreamExecutionEnvironment env 
=StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings 
settings =EnvironmentSettings.newInstance()
 .useBlinkPlanner()
 .inStreamingMode()
 .build(); StreamTableEnvironmenttableEnv = 
StreamTableEnvironment.create(env, 
settings);env.enableCheckpointing(6);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6); 
env.getCheckpointConfig().setCheckpointTimeout(6); 
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2); 
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); 
env.getCheckpointConfig().enableExternalizedCheckpoints(
 CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // 
env.getCheckpointConfig().setCheckpointStorage("hdfs://test-wh-hadoop-1:9000/flink-checkpoints"); 
env.setStateBackend(new RocksDBStateBackend("hdfs://test-wh-hadoop-1:9000/flink-checkpoints", true)); tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); Configuration configuration =tableEnv.getConfig().getConfiguration(); // 
configuration.setString("table.exec.resource.default-parallelism","16"); 
configuration.setString("table.exec.state.ttl","720");


and when I submit this job , I got this:

Sink: 
Sink(table=[default_catalog.default_database.rts_board_trans_compute], 
fields=[mchnt_id, time_hour, channel, trans_count, trans_amount, 
average_amount]) (1/1) (f8649f8434775cbda10bcedce96c9ae3) switched 
from INITIALIZING to FAILED on container_1647420330066_0473_01_02 
@ test-wh-hadoop-1 (dataPort=38604).
java.lang.UnsatisfiedLinkError: 
org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J
at 
org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(Native 
Method) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
at 
org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:13) 
~[flink-dist_2.12-1.13.5.jar:2.2.0]
at 
org.rocksdb.FlinkCompactionFilter$ConfigHolder.(FlinkCompactionFilter.java:107) 
~[flink-dist_2.12-1.13.5.jar:2.2.0]
at 
org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.(FlinkCompactionFilter.java:133) 
~[flink-dist_2.12-1.13.5.jar:2.2.0]
at 
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84) 
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74) 
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:167) 
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:144) 
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643) 
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837) 
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:208) 
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:143) 
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:130) 
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70) 
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301) 
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352) 
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115) 
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) 
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
 

Re: How to enable statefun metrics for the Slf4j reporter

2022-05-23 Thread Chesnay Schepler

You shouldn't have to do more than that.

Flink metrics are showing up as expected? Including metrics from tasks?

On 23/05/2022 14:03, Christopher Gustafson wrote:


Hi!


I am trying to enable the StateFun metrics in the documentation to be 
logged using the Slf4j reporter but I cannot figure out how to do it, 
and the documentation is pretty vague if you are not familiar with the 
Flink metrics beforehand. Could someone show me how to enable it, i.e 
what entries need to be added to my conf/flink.conf file for example? 
Currently, I am enabling the reporter with the following lines in my 
Flink config:


metrics.reporter.slf4j.factory.class: 
org.apache.flink.metrics.slf4j.Slf4jReporterFactory
metrics.reporter.slf4j.interval: 1 SECONDS
But I cannot see any of the StateFun specific metrics in the logs.


Best Regards,

Christopher Gustafson



Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-04 Thread Chesnay Schepler
Ah that's unfortunate. Yeah the feature freeze was quite a bit earlier 
than I remembered :(


On 04/05/2022 15:31, Peter Schrott wrote:

Hi Chesnay,

Thanks again for the hints.

Unfortunately the metrics filtering feature is not part of 1.15.0. It 
seems to be part of 1.16.0: 
https://issues.apache.org/jira/browse/FLINK-21585
I was already wondering why I could not find the feature in the docs 
you linked.


> Disabling the kafka metrics _should_ work
Setting `'properties.register.consumer.metrics' = 'false',` and 
'properties.register.producer.metrics' = 'false',` in the SQL table 
options for source / sink works. Remaining metrics are exposed on 9200.
The thing is I wanted to investigate in the consumer behavior in the 
first place :D That`s how I came across the bug.


Anyways, big thanks for your greate support!


On Wed, May 4, 2022 at 1:53 PM Chesnay Schepler  
wrote:


Disabling the kafka metrics _should_ work.

Alternatively you could use the new generic feature to filter metrics:


https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/metric_reporters/#filter-excludes

metrics.reporter..filter.excludes:
*KafkaProducer*;*KafkaConsumer*

This should disable all kafka metrics. (You could also drill down
and exclude specific problematic metrics; see the docs.)

On 04/05/2022 13:36, Peter Schrott wrote:

Allright! Thanks!

I tried to dig a bit deeper and see if there is any workaround
for that problem. I tried to switch off reporting the Kafka
metrics, but I was not quite successful. I am using the table api
Kafka connector.

Do you have any suggestions on how to overcome this?

Could you also provide the ticket number after creation?

Thanks, Peter

On Wed, May 4, 2022 at 1:22 PM Chesnay Schepler
 wrote:

Yes, that looks like a new bug in 1.15.
The migration to the new non-deprecated Kafka API in the
KafkaMetricMutableWrapper was done incorrectly.

This should affect every job that uses the new kafka connector.

Thank you for debugging the issue!

I will create a ticket.

On 04/05/2022 12:24, Peter Schrott wrote:

As the stracktrace says, class cast exception occurs here:

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapper.java#L37

I found the following metrics to be affected (might be more):
MetricName [name=version, group=app-info, description=Metric
indicating version, tags={client-id=producer-3}]
-> value: "6.2.2-ccs" (String)

MetricName [name=start-time-ms, group=app-info,
description=Metric indicating start-time-ms,
tags={client-id=producer-3}]
-> value: 1651654724987 (Long)

MetricName [name=commit-id, group=app-info,
description=Metric indicating commit-id,
tags={client-id=producer-3}]
-> value: "2ceb5dc7891720b7" (String)

Problematic code part seems to be introduced with "Bump
Kafka version to 2.8":

https://github.com/apache/flink/commit/b367407d08b6dd69a52886a1c6232a9d8ee2ec0a#diff-bb47c4c2d77fd57da49a6cf5227d43ba352c2ea916776bdae92a7436dea50068

Is this a potential bug introduced in 1.15.0?

Best, Peter

On Wed, May 4, 2022 at 9:58 AM Peter Schrott
 wrote:

Sorry for the spamming!

Just after jumping into the debug-session I noticed that
there are indeed exceptions thrown when fetching the
metrics on port 9200:

13657 INFO   [ScalaTest-run]com.sun.net.httpserver   - HttpServer 
created http0.0.0.0/0.0.0.0:9200 13658 INFO   
[ScalaTest-run]com.sun.net.httpserver   - context created: /
13658 INFO   [ScalaTest-run]com.sun.net.httpserver   - context 
created: /metrics
13659 INFO   [ScalaTest-run]o.a.f.m.p.PrometheusReporter   - 
Started PrometheusReporter HTTP server on port9200.
13745 DEBUG  [prometheus-http-1-1]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
14028 DEBUG  [prometheus-http-1-2]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
14998 DEBUG  [prometheus-http-1-3]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
15580 DEBUG  [prometheus-http-1-4]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
16022 DEBUG  [prometheus-http-1-5]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
16458 DEBUG  [prometheus-http-1-1]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
16885 DEBUG  [prometheus-http-1-2]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
17381 DEBUG  [prometheus-http-1-3]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
17809 DEBUG  [

Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-04 Thread Chesnay Schepler

Disabling the kafka metrics _should_ work.

Alternatively you could use the new generic feature to filter metrics:

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/metric_reporters/#filter-excludes

metrics.reporter..filter.excludes: 
*KafkaProducer*;*KafkaConsumer*


This should disable all kafka metrics. (You could also drill down and 
exclude specific problematic metrics; see the docs.)


On 04/05/2022 13:36, Peter Schrott wrote:

Allright! Thanks!

I tried to dig a bit deeper and see if there is any workaround for 
that problem. I tried to switch off reporting the Kafka metrics, but I 
was not quite successful. I am using the table api Kafka connector.


Do you have any suggestions on how to overcome this?

Could you also provide the ticket number after creation?

Thanks, Peter

On Wed, May 4, 2022 at 1:22 PM Chesnay Schepler  
wrote:


Yes, that looks like a new bug in 1.15.
The migration to the new non-deprecated Kafka API in the
KafkaMetricMutableWrapper was done incorrectly.

This should affect every job that uses the new kafka connector.

Thank you for debugging the issue!

I will create a ticket.

On 04/05/2022 12:24, Peter Schrott wrote:

As the stracktrace says, class cast exception occurs here:

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapper.java#L37

I found the following metrics to be affected (might be more):
MetricName [name=version, group=app-info, description=Metric
indicating version, tags={client-id=producer-3}]
-> value: "6.2.2-ccs" (String)

MetricName [name=start-time-ms, group=app-info,
description=Metric indicating start-time-ms,
tags={client-id=producer-3}]
-> value: 1651654724987 (Long)

MetricName [name=commit-id, group=app-info, description=Metric
indicating commit-id, tags={client-id=producer-3}]
-> value: "2ceb5dc7891720b7" (String)

Problematic code part seems to be introduced with "Bump Kafka
version to 2.8":

https://github.com/apache/flink/commit/b367407d08b6dd69a52886a1c6232a9d8ee2ec0a#diff-bb47c4c2d77fd57da49a6cf5227d43ba352c2ea916776bdae92a7436dea50068

Is this a potential bug introduced in 1.15.0?

Best, Peter

On Wed, May 4, 2022 at 9:58 AM Peter Schrott
 wrote:

Sorry for the spamming!

Just after jumping into the debug-session I noticed that
there are indeed exceptions thrown when fetching the metrics
on port 9200:

13657 INFO   [ScalaTest-run]com.sun.net.httpserver   - HttpServer 
created http0.0.0.0/0.0.0.0:9200 13658 INFO   
[ScalaTest-run]com.sun.net.httpserver   - context created: /
13658 INFO   [ScalaTest-run]com.sun.net.httpserver   - context created: 
/metrics
13659 INFO   [ScalaTest-run]o.a.f.m.p.PrometheusReporter   - Started 
PrometheusReporter HTTP server on port9200.
13745 DEBUG  [prometheus-http-1-1]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
14028 DEBUG  [prometheus-http-1-2]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
14998 DEBUG  [prometheus-http-1-3]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
15580 DEBUG  [prometheus-http-1-4]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
16022 DEBUG  [prometheus-http-1-5]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
16458 DEBUG  [prometheus-http-1-1]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
16885 DEBUG  [prometheus-http-1-2]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
17381 DEBUG  [prometheus-http-1-3]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
17809 DEBUG  [prometheus-http-1-4]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
18259 DEBUG  [prometheus-http-1-5]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
18695 DEBUG  [prometheus-http-1-1]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
19159 DEBUG  [prometheus-http-1-2]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
19758 DEBUG  [prometheus-http-1-3]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
20112 DEBUG  [prometheus-http-1-4]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
20544 DEBUG  [prometheus-http-1-5]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
20989 DEBUG  [prometheus-http-1-1]com.sun.net.httpserver   - GET / 
HTTP/1.1  [200   OK] ()
21419 DEBUG  [prometheus-http-1-2]o.a.f.m.p.PrometheusReporter   - 
Invalid type for 
Gaugeorg.apache.flink.runtime.checkpoint.CheckpointStatsTracker$LatestCompletedCheckpointExternalPathGauge@3fae55e7:java.lang.String,
 only number types and booleans are supported by this reporter.
21421 DEBUG  [prometheus-http-1-2]com.sun.net.ht

Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-04 Thread Chesnay Schepler

https://issues.apache.org/jira/browse/FLINK-27487

On 04/05/2022 13:22, Chesnay Schepler wrote:

Yes, that looks like a new bug in 1.15.
The migration to the new non-deprecated Kafka API in the 
KafkaMetricMutableWrapper was done incorrectly.


This should affect every job that uses the new kafka connector.

Thank you for debugging the issue!

I will create a ticket.

On 04/05/2022 12:24, Peter Schrott wrote:

As the stracktrace says, class cast exception occurs here:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapper.java#L37

I found the following metrics to be affected (might be more):
MetricName [name=version, group=app-info, description=Metric 
indicating version, tags={client-id=producer-3}]

-> value: "6.2.2-ccs" (String)

MetricName [name=start-time-ms, group=app-info, description=Metric 
indicating start-time-ms, tags={client-id=producer-3}]

-> value: 1651654724987 (Long)

MetricName [name=commit-id, group=app-info, description=Metric 
indicating commit-id, tags={client-id=producer-3}]

-> value: "2ceb5dc7891720b7" (String)

Problematic code part seems to be introduced with "Bump Kafka version 
to 2.8":

https://github.com/apache/flink/commit/b367407d08b6dd69a52886a1c6232a9d8ee2ec0a#diff-bb47c4c2d77fd57da49a6cf5227d43ba352c2ea916776bdae92a7436dea50068

Is this a potential bug introduced in 1.15.0?

Best, Peter

On Wed, May 4, 2022 at 9:58 AM Peter Schrott  
wrote:


Sorry for the spamming!

Just after jumping into the debug-session I noticed that there
are indeed exceptions thrown when fetching the metrics on port 9200:

13657 INFO   [ScalaTest-run]com.sun.net.httpserver   - HttpServer created 
http0.0.0.0/0.0.0.0:9200 13658 INFO   [ScalaTest-run]com.sun.net.httpserver   - 
context created: /
13658 INFO   [ScalaTest-run]com.sun.net.httpserver   - context created: 
/metrics
13659 INFO   [ScalaTest-run]o.a.f.m.p.PrometheusReporter   - Started 
PrometheusReporter HTTP server on port9200.
13745 DEBUG  [prometheus-http-1-1]com.sun.net.httpserver   - GET / HTTP/1.1 
 [200   OK] ()
14028 DEBUG  [prometheus-http-1-2]com.sun.net.httpserver   - GET / HTTP/1.1 
 [200   OK] ()
14998 DEBUG  [prometheus-http-1-3]com.sun.net.httpserver   - GET / HTTP/1.1 
 [200   OK] ()
15580 DEBUG  [prometheus-http-1-4]com.sun.net.httpserver   - GET / HTTP/1.1 
 [200   OK] ()
16022 DEBUG  [prometheus-http-1-5]com.sun.net.httpserver   - GET / HTTP/1.1 
 [200   OK] ()
16458 DEBUG  [prometheus-http-1-1]com.sun.net.httpserver   - GET / HTTP/1.1 
 [200   OK] ()
16885 DEBUG  [prometheus-http-1-2]com.sun.net.httpserver   - GET / HTTP/1.1 
 [200   OK] ()
17381 DEBUG  [prometheus-http-1-3]com.sun.net.httpserver   - GET / HTTP/1.1 
 [200   OK] ()
17809 DEBUG  [prometheus-http-1-4]com.sun.net.httpserver   - GET / HTTP/1.1 
 [200   OK] ()
18259 DEBUG  [prometheus-http-1-5]com.sun.net.httpserver   - GET / HTTP/1.1 
 [200   OK] ()
18695 DEBUG  [prometheus-http-1-1]com.sun.net.httpserver   - GET / HTTP/1.1 
 [200   OK] ()
19159 DEBUG  [prometheus-http-1-2]com.sun.net.httpserver   - GET / HTTP/1.1 
 [200   OK] ()
19758 DEBUG  [prometheus-http-1-3]com.sun.net.httpserver   - GET / HTTP/1.1 
 [200   OK] ()
20112 DEBUG  [prometheus-http-1-4]com.sun.net.httpserver   - GET / HTTP/1.1 
 [200   OK] ()
20544 DEBUG  [prometheus-http-1-5]com.sun.net.httpserver   - GET / HTTP/1.1 
 [200   OK] ()
20989 DEBUG  [prometheus-http-1-1]com.sun.net.httpserver   - GET / HTTP/1.1 
 [200   OK] ()
21419 DEBUG  [prometheus-http-1-2]o.a.f.m.p.PrometheusReporter   - Invalid 
type for 
Gaugeorg.apache.flink.runtime.checkpoint.CheckpointStatsTracker$LatestCompletedCheckpointExternalPathGauge@3fae55e7:java.lang.String,
 only number types and booleans are supported by this reporter.
21421 DEBUG  [prometheus-http-1-2]com.sun.net.httpserver   - GET / HTTP/1.1 
 [200   OK] ()
21847 DEBUG  [prometheus-http-1-3]o.a.f.m.p.PrometheusReporter   - Invalid 
type for 
Gaugeorg.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics$$Lambda$4076/388206242@78846648:java.lang.String,
 only number types and booleans are supported by this reporter.
21851 DEBUG  [prometheus-http-1-3]com.sun.net.httpserver   
-ServerImpl.Exchange  (2)
java.lang.ClassCastException:java.lang.Long  cannot be cast 
tojava.lang.Double at

org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:37)
at

org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:27)
at

org.apache.flink.metrics.prometheus.AbstractPrometheusReporter$2.get(AbstractPrometheusReporter.java:262)
at io.prometheus.client.Gauge.collect(Gauge.java:317) at

Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-04 Thread Chesnay Schepler
io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:223)
at

io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:144)
at
io.prometheus.client.exporter.common.TextFormat.write004(TextFormat.java:22)
at

io.prometheus.client.exporter.HTTPServer$HTTPMetricHandler.handle(HTTPServer.java:60)
at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79) at
sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83) at
com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82) at

sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:675)
at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79) at
sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:647) at

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) 21851  TRACE 
[prometheus-http-1-3]com.sun.net.httpserver   - Closing 
connection:java.nio.channels.SocketChannel[connected local=/127.0.0.1:9200  
remote=/127.0.0.1:50508]


For my defence: This jul - slf4j - logback setup is really nasty :O

Best, Peter



On Wed, May 4, 2022 at 9:47 AM Peter Schrott
 wrote:

Hi Chesnay,

Thanks for that support! Just for compilation: Running the
"Problem-Job" locally as test in Intellij (as Chesney
suggested above) reproduces the described problem:

➜  ~ curl localhost:9200
curl: (52) Empty reply from server  


Doing the same with other jobs metrics are available on
localhost:9200.

One other thing I noticed yesterday in the cluster is that
job/task specific metrics are available for a very short time
after the job is started (for around a few seconds). E.g:

# HELP flink_taskmanager_job_task_backPressuredTimeMsPerSecond 
backPressuredTimeMsPerSecond (scope: taskmanager_job_task)

After all tasks are "green" in the webui, the "empty reply
from server" is back.

1)
I changed the prometheus config in my cluster, but as you
saied, it does not have any impact.

2)
For the logging in a test scenario, I also had to add the
following lines in my test class:

SLF4JBridgeHandler.removeHandlersForRootLogger()
SLF4JBridgeHandler.install()

(source:
https://www.slf4j.org/api/org/slf4j/bridge/SLF4JBridgeHandler.html)
 As well as resetting log levels for jul in my logback.xml:


true 

This infos just for completeness, if someone else stumbles upon.

I set the following loggers to lvl TRACE:

  
 

When running the job in a local test as suggested above I get
the following log messages:

12701 INFO   [ScalaTest-run]com.sun.net.httpserver   - HttpServer 
created http0.0.0.0/0.0.0.0:9200 12703 INFO   
[ScalaTest-run]com.sun.net.httpserver   - context created: /
12703 INFO   [ScalaTest-run]com.sun.net.httpserver   - context created: 
/metrics
12704 INFO   [ScalaTest-run]o.a.f.m.p.PrometheusReporter   - Started 
PrometheusReporter HTTP server on port9200.


3)
I have not tried to reproduce in a local cluster yet, as the
issue is also reproducible in the test environment. But thanks
for the hint - could be very helpful!

 __

From the observations it does not seem like there is a problem
with the http server itself. I am just making assumptions: It
feels like there is a problem with reading and providing the
metrics. As the issue reproducible in the local setup I have
the comfy option to debug in Intellij now - I'll spend my day
with this if no other hints or ideas arise.

    Thanks & Best, Peter

On Tue, May 3, 2022 at 4:01 PM Chesnay Schepler
 wrote:

> I noticed that my config of the PrometheusReporter is
different here. I have: `metrics.reporter.prom.class:
org.apache.flink.metrics.prometheus.PrometheusReporter`. I
will investigate if this is a problem.

That's not a problem.

> Which trace logs are interesting?

The logging config I provided should highlight the
relevant bits (com.sun.net.httpserver).
At least in my local tests this is where any interesting
things were logged.
Note that this part of the code uses java.util.logging,
not slf4j/log4j.

> When running a local flink (start-cluster.sh), I do not
have a certain url/port to access the taskmanager, right?

If you configure a port range it should b

Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-03 Thread Chesnay Schepler
> I noticed that my config of the PrometheusReporter is different here. 
I have: `metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter`. I will 
investigate if this is a problem.


That's not a problem.

> Which trace logs are interesting?

The logging config I provided should highlight the relevant bits 
(com.sun.net.httpserver).
At least in my local tests this is where any interesting things were 
logged.

Note that this part of the code uses java.util.logging, not slf4j/log4j.

> When running a local flink (start-cluster.sh), I do not have a 
certain url/port to access the taskmanager, right?


If you configure a port range it should be as simple as curl 
localhost:.

You can find the used port in the taskmanager logs.
Or just try the first N ports in the range ;)

On 03/05/2022 14:11, Peter Schrott wrote:

Hi Chesnay,

Thanks for the code snipped. Which trace logs are interesting? Of 
"org.apache.flink.metrics.prometheus.PrometheusReporter"?
I could also add this logger settings in the environment where the 
problem is present.


Other than that, I am not sure how to reproduce this issue in a local 
setup. In the cluster where the metrics are missing I am navigating to 
the certain taskmanager and try to access the metrics via the 
configured prometheus port. When running a local flink 
(start-cluster.sh), I do not have a certain url/port to access the 
taskmanager, right?


I noticed that my config of the PrometheusReporter is different here. 
I have: `metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter`. I will 
investigate if this is a problem.


Unfortunately I can not provide my job at the moment. It 
contains business logic and it is tightly coupled with our Kafka 
systems. I will check the option of creating a sample job to reproduce 
the problem.


Best, Peter

On Tue, May 3, 2022 at 12:48 PM Chesnay Schepler  
wrote:


You'd help me out greatly if you could provide me with a sample
job that runs into the issue.

So far I wasn't able to reproduce the issue,
but it should be clear that there is some given 3 separate reports,
although it is strange that so far it was only reported for
Prometheus.

If one of you is able to reproduce the issue within a Test and is
feeling adventurous,
then you might be able to get more information by forwarding the
java.util.logging
to SLF4J. Below is some code to get you started.

DebuggingTest.java:

class DebuggingTest {

 static {
 LogManager.getLogManager().getLogger("").setLevel(Level.FINEST);
 SLF4JBridgeHandler.removeHandlersForRootLogger();
 SLF4JBridgeHandler.install();
 miniClusterExtension =
 new MiniClusterExtension(
 new MiniClusterResourceConfiguration.Builder()
 .setConfiguration(getConfiguration())
 .setNumberSlotsPerTaskManager(1)
 .build());
 }

 @RegisterExtension private static final MiniClusterExtension 
miniClusterExtension;

 private static Configuration getConfiguration() {
 final Configuration configuration = new Configuration();

 configuration.setString(
 "metrics.reporter.prom.factory.class", 
PrometheusReporterFactory.class.getName());
 configuration.setString("metrics.reporter.prom.port", "9200-9300");

 return configuration;
 }

 @Test
 void runJob() throws Exception {
 
 }
}


pom.xml:


org.slf4j
jul-to-slf4j
1.7.32

log4j2-test.properties:

rootLogger.level = off
rootLogger.appenderRef.test.ref = TestLogger

logger.http.name  <http://logger.http.name>  = com.sun.net.httpserver
logger.http.level = trace

appender.testlogger.name  <http://appender.testlogger.name>  = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n

On 03/05/2022 10:41, ChangZhuo Chen (陳昌倬) wrote:

On Tue, May 03, 2022 at 10:32:03AM +0200, Peter Schrott wrote:

Hi!

I also discovered problems with the PrometheusReporter on Flink 1.15.0,
coming from 1.14.4. I already consulted the mailing list:
https://lists.apache.org/thread/m8ohrfkrq1tqgq7lowr9p226z3yc0fgc
I have not found the underlying problem or a solution to it.

Actually, after re-checking, I see the same log WARNINGS as
ChangZhou described.

As I described, it seems to be an issue with my job. If no job, or an
example job runs on the taskmanager the basic metrics work just fine. Maybe
ChangZhou can confirm th

Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-03 Thread Chesnay Schepler
You'd help me out greatly if you could provide me with a sample job that 
runs into the issue.


So far I wasn't able to reproduce the issue,
but it should be clear that there is some given 3 separate reports,
although it is strange that so far it was only reported for Prometheus.

If one of you is able to reproduce the issue within a Test and is 
feeling adventurous,
then you might be able to get more information by forwarding the 
java.util.logging

to SLF4J. Below is some code to get you started.

DebuggingTest.java:

class DebuggingTest {

static {
LogManager.getLogManager().getLogger("").setLevel(Level.FINEST);
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
miniClusterExtension =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfiguration())
.setNumberSlotsPerTaskManager(1)
.build());
}

@RegisterExtension private static final MiniClusterExtension 
miniClusterExtension;

private static Configuration getConfiguration() {
final Configuration configuration = new Configuration();

configuration.setString(
"metrics.reporter.prom.factory.class", 
PrometheusReporterFactory.class.getName());
configuration.setString("metrics.reporter.prom.port", "9200-9300");

return configuration;
}

@Test
void runJob() throws Exception {

}
}


pom.xml:


   org.slf4j
   jul-to-slf4j
   1.7.32

log4j2-test.properties:

rootLogger.level = off
rootLogger.appenderRef.test.ref = TestLogger

logger.http.name = com.sun.net.httpserver
logger.http.level = trace

appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n

On 03/05/2022 10:41, ChangZhuo Chen (陳昌倬) wrote:

On Tue, May 03, 2022 at 10:32:03AM +0200, Peter Schrott wrote:

Hi!

I also discovered problems with the PrometheusReporter on Flink 1.15.0,
coming from 1.14.4. I already consulted the mailing list:
https://lists.apache.org/thread/m8ohrfkrq1tqgq7lowr9p226z3yc0fgc
I have not found the underlying problem or a solution to it.

Actually, after re-checking, I see the same log WARNINGS as
ChangZhou described.

As I described, it seems to be an issue with my job. If no job, or an
example job runs on the taskmanager the basic metrics work just fine. Maybe
ChangZhou can confirm this?

@ChangZhou what's your job setup? I am running a streaming SQL job, but
also using data streams API to create the streaming environment and from
that the table environment and finally using a StatementSet to execute
multiple SQL statements in one job.


We are running a streaming application with low level API with
Kubernetes operator FlinkDeployment.




Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-03 Thread Chesnay Schepler

Is there any warning in the logs containing "Error while handling metric"?

On 03/05/2022 10:18, ChangZhuo Chen (陳昌倬) wrote:

On Tue, May 03, 2022 at 01:00:42AM -0700, Mason Chen wrote:

Hi ChangZhou,

The warning log indicates that the metric was previously defined and so the
runtime is handling the "duplicate" metric by ignoring it. This is
typically a benign message unless you rely on this metric. Is it possible
that you are using the same task name for different tasks? It would be
defined by the `.name(...)` API in your job graph instantiation.

Can you clarify what it means that your endpoint isn't working--some
metrics missing, endpoint is timing out, etc.? Also, can you confirm from
logs that the PrometheusReporter was created properly?

Endpoint isn't working means we got empty reply from Prometheus
endpoint. The following is our testing for taskmanager Prometheus
endpoint.

 curl localhost:9249
 curl: (52) Empty reply from server

We have the following log in taskmanager, so PrometheusReporter was
created properly.

 2022-05-03 01:48:16,678 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: metrics.reporter.prom.class, 
org.apache.flink.metrics.prometheus.PrometheusReporter
 ...
 2022-05-03 01:48:23,665 INFO  
org.apache.flink.metrics.prometheus.PrometheusReporter   [] - Started 
PrometheusReporter HTTP server on port 9249.
 2022-05-03 01:48:23,669 INFO  
org.apache.flink.runtime.metrics.MetricRegistryImpl  [] - Reporting 
metrics for reporter prom of type 
org.apache.flink.metrics.prometheus.PrometheusReporter.






Re: How to debug Metaspace exception?

2022-05-02 Thread Chesnay Schepler

There are cases where user-code is run on the JobManager.
I'm not sure whether though that applies to the JDBC sources.

On 02/05/2022 15:45, John Smith wrote:

Why do the JDBC jars need to be on the job manager node though?

On Mon, May 2, 2022 at 9:36 AM Chesnay Schepler  
wrote:


yes.
But if you can ensure that the driver isn't bundled by any
user-jar you can also skip the pattern configuration step.

The pattern looks correct formatting-wise; you could try whether
com.microsoft.sqlserver.jdbc. is enough to solve the issue.

On 02/05/2022 14:41, John Smith wrote:

Oh, so I should copy the jars to the lib folder and
set classloader.parent-first-patterns.additional:
"org.apache.ignite.;com.microsoft.sqlserver.jdbc." to both the
task managers and job managers?

Also is my pattern correct?
"org.apache.ignite.;com.microsoft.sqlserver.jdbc."

Just to be sure I'm running a standalone cluster using zookeeper.
So I have 3 zookeepers, 3 job managers and 3 task managers.


On Mon, May 2, 2022 at 2:57 AM Chesnay Schepler
 wrote:

And you do should make sure that it is set for both processes!

On 02/05/2022 08:43, Chesnay Schepler wrote:

The setting itself isn't taskmanager specific; it applies to
both the job- and taskmanager process.

On 02/05/2022 05:29, John Smith wrote:

Also just to be sure this is a Task Manager setting right?

On Thu, Apr 28, 2022 at 11:13 AM John Smith
 wrote:

I assume you will take action on your side to track and
fix the doc? :)

On Thu, Apr 28, 2022 at 11:12 AM John Smith
 wrote:

Ok so to summarize...

- Build my job jar and have the JDBC driver as a
compile only dependency and copy the JDBC driver to
flink lib folder.

Or

- Build my job jar and include JDBC driver in the
shadow, plus copy the JDBC driver in the flink lib
folder, plus  make an entry in config for
|classloader.parent-first-patterns-additional|

<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>


On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler
 wrote:

I think what I meant was "either add it to
/lib, or [if it is already in /lib but also
bundled in the jar] add it to the parent-first
patterns."

        On 28/04/2022 15:56, Chesnay Schepler wrote:

Pretty sure, even though I seemingly
documented it incorrectly :)

On 28/04/2022 15:49, John Smith wrote:

You sure?

 *

/JDBC/: JDBC drivers leak references
outside the user code classloader. To
ensure that these classes are only loaded
once you should either add the driver
jars to Flink’s |lib/| folder, or add the
driver classes to the list of
parent-first loaded class via
|classloader.parent-first-patterns-additional|

<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>.

It says either or


        On Wed, Apr 27, 2022 at 3:44 AM Chesnay
Schepler  wrote:

You're misinterpreting the docs.

The parent/child-first classloading
controls where Flink looks for a class
/first/, specifically whether we first
load from /lib or the user-jar.
It does not allow you to load something
from the user-jar in the parent
classloader. That's just not how it works.

It must be in /lib.

On 27/04/2022 04:59, John Smith wrote:

Hi Chesnay as per the docs...

https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/

You can either put the jars in task
manager lib folder or use
|classloader.parent-first-patterns-additional|

<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>

I prefer the latter like this: the
dependency stays with the user-jar and

Re: How to debug Metaspace exception?

2022-05-02 Thread Chesnay Schepler

yes.
But if you can ensure that the driver isn't bundled by any user-jar you 
can also skip the pattern configuration step.


The pattern looks correct formatting-wise; you could try whether 
com.microsoft.sqlserver.jdbc. is enough to solve the issue.


On 02/05/2022 14:41, John Smith wrote:
Oh, so I should copy the jars to the lib folder and 
set classloader.parent-first-patterns.additional: 
"org.apache.ignite.;com.microsoft.sqlserver.jdbc." to both the task 
managers and job managers?


Also is my pattern correct? 
"org.apache.ignite.;com.microsoft.sqlserver.jdbc."


Just to be sure I'm running a standalone cluster using zookeeper. So I 
have 3 zookeepers, 3 job managers and 3 task managers.



On Mon, May 2, 2022 at 2:57 AM Chesnay Schepler  
wrote:


And you do should make sure that it is set for both processes!

On 02/05/2022 08:43, Chesnay Schepler wrote:

The setting itself isn't taskmanager specific; it applies to both
the job- and taskmanager process.

On 02/05/2022 05:29, John Smith wrote:

Also just to be sure this is a Task Manager setting right?

On Thu, Apr 28, 2022 at 11:13 AM John Smith
 wrote:

I assume you will take action on your side to track and fix
the doc? :)

On Thu, Apr 28, 2022 at 11:12 AM John Smith
 wrote:

Ok so to summarize...

- Build my job jar and have the JDBC driver as a compile
only dependency and copy the JDBC driver to flink lib
folder.

Or

- Build my job jar and include JDBC driver in the
shadow, plus copy the JDBC driver in the flink lib
folder, plus  make an entry in config for
|classloader.parent-first-patterns-additional|

<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>


On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler
 wrote:

I think what I meant was "either add it to /lib, or
[if it is already in /lib but also bundled in the
jar] add it to the parent-first patterns."

        On 28/04/2022 15:56, Chesnay Schepler wrote:

Pretty sure, even though I seemingly documented it
incorrectly :)

On 28/04/2022 15:49, John Smith wrote:

You sure?

 *

/JDBC/: JDBC drivers leak references outside
the user code classloader. To ensure that
these classes are only loaded once you should
either add the driver jars to Flink’s
|lib/| folder, or add the driver classes to
the list of parent-first loaded class via
|classloader.parent-first-patterns-additional|

<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>.

It says either or


        On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler
 wrote:

You're misinterpreting the docs.

The parent/child-first classloading controls
where Flink looks for a class /first/,
specifically whether we first load from /lib
or the user-jar.
It does not allow you to load something from
the user-jar in the parent classloader. That's
just not how it works.

It must be in /lib.

On 27/04/2022 04:59, John Smith wrote:

Hi Chesnay as per the docs...

https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/

You can either put the jars in task manager
lib folder or use
|classloader.parent-first-patterns-additional|

<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>

I prefer the latter like this: the
dependency stays with the user-jar and not on
the task manager.

On Tue, Apr 26, 2022 at 9:52 PM John Smith
 wrote:

Ok so I should put the Apache ignite and
my Microsoft drivers in the lib folders
of my task managers?

And then in my job jar only include them
as compile time dependencies?


    On Tue, Apr 26, 2022 at 10:42 AM Chesnay
Schepler  wrote:

JDBC drivers are well-known for
  

Re: How to debug Metaspace exception?

2022-05-02 Thread Chesnay Schepler

And you do should make sure that it is set for both processes!

On 02/05/2022 08:43, Chesnay Schepler wrote:
The setting itself isn't taskmanager specific; it applies to both the 
job- and taskmanager process.


On 02/05/2022 05:29, John Smith wrote:

Also just to be sure this is a Task Manager setting right?

On Thu, Apr 28, 2022 at 11:13 AM John Smith  
wrote:


I assume you will take action on your side to track and fix the
doc? :)

On Thu, Apr 28, 2022 at 11:12 AM John Smith
 wrote:

Ok so to summarize...

- Build my job jar and have the JDBC driver as a compile only
dependency and copy the JDBC driver to flink lib folder.

Or

- Build my job jar and include JDBC driver in the shadow,
plus copy the JDBC driver in the flink lib folder, plus  make
an entry in config for
|classloader.parent-first-patterns-additional|

<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>


On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler
 wrote:

I think what I meant was "either add it to /lib, or [if
it is already in /lib but also bundled in the jar] add it
to the parent-first patterns."

On 28/04/2022 15:56, Chesnay Schepler wrote:

Pretty sure, even though I seemingly documented it
incorrectly :)

On 28/04/2022 15:49, John Smith wrote:

You sure?

 *

/JDBC/: JDBC drivers leak references outside the
user code classloader. To ensure that these classes
are only loaded once you should either add the
driver jars to Flink’s |lib/| folder, or add the
driver classes to the list of parent-first loaded
class via
|classloader.parent-first-patterns-additional|

<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>.

It says either or


On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler
 wrote:

You're misinterpreting the docs.

The parent/child-first classloading controls where
Flink looks for a class /first/, specifically
whether we first load from /lib or the user-jar.
It does not allow you to load something from the
user-jar in the parent classloader. That's just not
how it works.

It must be in /lib.

On 27/04/2022 04:59, John Smith wrote:

Hi Chesnay as per the docs...

https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/

You can either put the jars in task manager lib
folder or use
|classloader.parent-first-patterns-additional|

<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>

I prefer the latter like this: the
dependency stays with the user-jar and not on the
task manager.

On Tue, Apr 26, 2022 at 9:52 PM John Smith
 wrote:

Ok so I should put the Apache ignite and my
Microsoft drivers in the lib folders of my
task managers?

And then in my job jar only include them as
compile time dependencies?


On Tue, Apr 26, 2022 at 10:42 AM Chesnay
Schepler  wrote:

JDBC drivers are well-known for leaking
classloaders unfortunately.

You have correctly identified your
alternatives.

You must put the jdbc driver into /lib
instead. Setting only the parent-first
pattern shouldn't affect anything.
That is only relevant if something is in
both in /lib and the user-jar, telling
Flink to prioritize what is in lib.



On 26/04/2022 15:35, John Smith wrote:

So I
put classloader.parent-first-patterns.additional:
"org.apache.ignite." in the task config
and so far I don't think I'm getting
"java.lang.OutOfMemoryError: Metaspace"
any more.

Or it's too early to tell.

Though now, the task managers are
shutting down due to some other failures.

   

Re: How to debug Metaspace exception?

2022-05-02 Thread Chesnay Schepler
The setting itself isn't taskmanager specific; it applies to both the 
job- and taskmanager process.


On 02/05/2022 05:29, John Smith wrote:

Also just to be sure this is a Task Manager setting right?

On Thu, Apr 28, 2022 at 11:13 AM John Smith  
wrote:


I assume you will take action on your side to track and fix the
doc? :)

On Thu, Apr 28, 2022 at 11:12 AM John Smith
 wrote:

Ok so to summarize...

- Build my job jar and have the JDBC driver as a compile only
dependency and copy the JDBC driver to flink lib folder.

Or

- Build my job jar and include JDBC driver in the shadow, plus
copy the JDBC driver in the flink lib folder, plus  make an
entry in config for
|classloader.parent-first-patterns-additional|

<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>


On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler
 wrote:

I think what I meant was "either add it to /lib, or [if it
is already in /lib but also bundled in the jar] add it to
the parent-first patterns."

On 28/04/2022 15:56, Chesnay Schepler wrote:

Pretty sure, even though I seemingly documented it
incorrectly :)

On 28/04/2022 15:49, John Smith wrote:

You sure?

 *

/JDBC/: JDBC drivers leak references outside the
user code classloader. To ensure that these classes
are only loaded once you should either add the
driver jars to Flink’s |lib/| folder, or add the
driver classes to the list of parent-first loaded
class via
|classloader.parent-first-patterns-additional|

<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>.

It says either or


On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler
 wrote:

You're misinterpreting the docs.

The parent/child-first classloading controls where
Flink looks for a class /first/, specifically
whether we first load from /lib or the user-jar.
It does not allow you to load something from the
user-jar in the parent classloader. That's just not
how it works.

It must be in /lib.

On 27/04/2022 04:59, John Smith wrote:

Hi Chesnay as per the docs...

https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/

You can either put the jars in task manager lib
folder or use
|classloader.parent-first-patterns-additional|

<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>

I prefer the latter like this: the dependency stays
with the user-jar and not on the task manager.

On Tue, Apr 26, 2022 at 9:52 PM John Smith
 wrote:

Ok so I should put the Apache ignite and my
Microsoft drivers in the lib folders of my task
managers?

And then in my job jar only include them as
compile time dependencies?


On Tue, Apr 26, 2022 at 10:42 AM Chesnay
Schepler  wrote:

JDBC drivers are well-known for leaking
classloaders unfortunately.

You have correctly identified your
alternatives.

You must put the jdbc driver into /lib
instead. Setting only the parent-first
pattern shouldn't affect anything.
That is only relevant if something is in
both in /lib and the user-jar, telling
Flink to prioritize what is in lib.



On 26/04/2022 15:35, John Smith wrote:

So I
put classloader.parent-first-patterns.additional:
"org.apache.ignite." in the task config
and so far I don't think I'm getting
"java.lang.OutOfMemoryError: Metaspace"
any more.

Or it's too early to tell.

Though now, the task managers are shutting
down due to some other failures.

So maybe because tasks were failing and
reloading often the task manager was

Re: How to debug Metaspace exception?

2022-04-28 Thread Chesnay Schepler
I think what I meant was "either add it to /lib, or [if it is already in 
/lib but also bundled in the jar] add it to the parent-first patterns."


On 28/04/2022 15:56, Chesnay Schepler wrote:

Pretty sure, even though I seemingly documented it incorrectly :)

On 28/04/2022 15:49, John Smith wrote:

You sure?

 *

/JDBC/: JDBC drivers leak references outside the user code
classloader. To ensure that these classes are only loaded once
you should either add the driver jars to Flink’s |lib/| folder,
or add the driver classes to the list of parent-first loaded
class via |classloader.parent-first-patterns-additional|

<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>.

It says either or


On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler  
wrote:


You're misinterpreting the docs.

The parent/child-first classloading controls where Flink looks
for a class /first/, specifically whether we first load from /lib
or the user-jar.
It does not allow you to load something from the user-jar in the
parent classloader. That's just not how it works.

It must be in /lib.

On 27/04/2022 04:59, John Smith wrote:

Hi Chesnay as per the docs...

https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/

You can either put the jars in task manager lib folder or use
|classloader.parent-first-patterns-additional|

<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>

I prefer the latter like this: the dependency stays with the
user-jar and not on the task manager.

On Tue, Apr 26, 2022 at 9:52 PM John Smith
 wrote:

Ok so I should put the Apache ignite and my Microsoft
drivers in the lib folders of my task managers?

And then in my job jar only include them as compile time
dependencies?


On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler
 wrote:

JDBC drivers are well-known for leaking classloaders
unfortunately.

You have correctly identified your alternatives.

You must put the jdbc driver into /lib instead. Setting
only the parent-first pattern shouldn't affect anything.
That is only relevant if something is in both in /lib
and the user-jar, telling Flink to prioritize what is in
lib.



On 26/04/2022 15:35, John Smith wrote:

So I put classloader.parent-first-patterns.additional:
"org.apache.ignite." in the task config and so far I
don't think I'm getting "java.lang.OutOfMemoryError:
Metaspace" any more.

Or it's too early to tell.

Though now, the task managers are shutting down due to
some other failures.

So maybe because tasks were failing and reloading often
the task manager was running out of Metspace. But now
maybe it's just cleanly shutting down.

On Wed, Apr 20, 2022 at 11:35 AM John Smith
 wrote:

Or I can put in the config to treat
org.apache.ignite. classes as first class?

On Tue, Apr 19, 2022 at 10:18 PM John Smith
 wrote:

Ok, so I loaded the dump into Eclipse Mat and
followed:

https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

- On the Histogram, I got over 30 entries for:
ChildFirstClassLoader
- Then I clicked on one of them "Merge Shortest
Path..." and picked "Exclude all
phantom/weak/soft references"
- Which then gave me: SqlDriverManager > Apache
Ignite JdbcThin Driver

So i'm guessing anything JDBC based. I should
copy into the task manager libs folder and my
jobs make the dependencies as compile only?

On Tue, Apr 19, 2022 at 12:18 PM Yaroslav
Tkachenko  wrote:

Also

https://shopify.engineering/optimizing-apache-flink-applications-tips
might be helpful (has a section on
profiling, as well as classloading).

    On Tue, Apr 19, 2022 at 4:35 AM Chesnay
Schepler  wrote:

We have a very rough "guide" in the
wiki (it's just the specific steps I
took to debug another leak):

https://cwiki.apache.org/confluence/display/FLINK/Debugging+C

Re: How to debug Metaspace exception?

2022-04-28 Thread Chesnay Schepler

Pretty sure, even though I seemingly documented it incorrectly :)

On 28/04/2022 15:49, John Smith wrote:

You sure?

 *

/JDBC/: JDBC drivers leak references outside the user code
classloader. To ensure that these classes are only loaded once you
should either add the driver jars to Flink’s |lib/| folder, or add
the driver classes to the list of parent-first loaded class via
|classloader.parent-first-patterns-additional|

<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>.

It says either or


On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler  
wrote:


You're misinterpreting the docs.

The parent/child-first classloading controls where Flink looks for
a class /first/, specifically whether we first load from /lib or
the user-jar.
It does not allow you to load something from the user-jar in the
parent classloader. That's just not how it works.

It must be in /lib.

On 27/04/2022 04:59, John Smith wrote:

Hi Chesnay as per the docs...

https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/

You can either put the jars in task manager lib folder or use
|classloader.parent-first-patterns-additional|

<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>

I prefer the latter like this: the dependency stays with the
user-jar and not on the task manager.

On Tue, Apr 26, 2022 at 9:52 PM John Smith
 wrote:

Ok so I should put the Apache ignite and my Microsoft drivers
in the lib folders of my task managers?

And then in my job jar only include them as compile time
dependencies?


On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler
 wrote:

JDBC drivers are well-known for leaking classloaders
unfortunately.

You have correctly identified your alternatives.

You must put the jdbc driver into /lib instead. Setting
only the parent-first pattern shouldn't affect anything.
That is only relevant if something is in both in /lib and
the user-jar, telling Flink to prioritize what is in lib.



On 26/04/2022 15:35, John Smith wrote:

So I put classloader.parent-first-patterns.additional:
"org.apache.ignite." in the task config and so far I
don't think I'm getting "java.lang.OutOfMemoryError:
Metaspace" any more.

Or it's too early to tell.

Though now, the task managers are shutting down due to
some other failures.

So maybe because tasks were failing and reloading often
the task manager was running out of Metspace. But now
maybe it's just cleanly shutting down.

On Wed, Apr 20, 2022 at 11:35 AM John Smith
 wrote:

Or I can put in the config to treat
org.apache.ignite. classes as first class?

On Tue, Apr 19, 2022 at 10:18 PM John Smith
 wrote:

Ok, so I loaded the dump into Eclipse Mat and
followed:

https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

- On the Histogram, I got over 30 entries for:
ChildFirstClassLoader
- Then I clicked on one of them "Merge Shortest
Path..." and picked "Exclude all
phantom/weak/soft references"
- Which then gave me: SqlDriverManager > Apache
Ignite JdbcThin Driver

So i'm guessing anything JDBC based. I should
copy into the task manager libs folder and my
jobs make the dependencies as compile only?

On Tue, Apr 19, 2022 at 12:18 PM Yaroslav
Tkachenko  wrote:

Also

https://shopify.engineering/optimizing-apache-flink-applications-tips
might be helpful (has a section on
profiling, as well as classloading).

    On Tue, Apr 19, 2022 at 4:35 AM Chesnay
Schepler  wrote:

We have a very rough "guide" in the wiki
(it's just the specific steps I took to
debug another leak):

https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

On 19/04/2022 12:01, huweihua wrote:

Hi, John

Sorry for the late reply. You can use
 

Re: How to debug Metaspace exception?

2022-04-27 Thread Chesnay Schepler

You're misinterpreting the docs.

The parent/child-first classloading controls where Flink looks for a 
class /first/, specifically whether we first load from /lib or the user-jar.
It does not allow you to load something from the user-jar in the parent 
classloader. That's just not how it works.


It must be in /lib.

On 27/04/2022 04:59, John Smith wrote:
Hi Chesnay as per the docs... 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/


You can either put the jars in task manager lib folder or use 
|classloader.parent-first-patterns-additional| 
<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>


I prefer the latter like this: the dependency stays with the user-jar 
and not on the task manager.


On Tue, Apr 26, 2022 at 9:52 PM John Smith  wrote:

Ok so I should put the Apache ignite and my Microsoft drivers in
the lib folders of my task managers?

And then in my job jar only include them as compile time
dependencies?


On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler
 wrote:

JDBC drivers are well-known for leaking classloaders
unfortunately.

You have correctly identified your alternatives.

You must put the jdbc driver into /lib instead. Setting only
the parent-first pattern shouldn't affect anything.
That is only relevant if something is in both in /lib and the
user-jar, telling Flink to prioritize what is in lib.



On 26/04/2022 15:35, John Smith wrote:

So I put classloader.parent-first-patterns.additional:
"org.apache.ignite." in the task config and so far I don't
think I'm getting "java.lang.OutOfMemoryError: Metaspace" any
more.

Or it's too early to tell.

Though now, the task managers are shutting down due to some
other failures.

So maybe because tasks were failing and reloading often the
task manager was running out of Metspace. But now maybe it's
just cleanly shutting down.

On Wed, Apr 20, 2022 at 11:35 AM John Smith
 wrote:

Or I can put in the config to treat org.apache.ignite.
classes as first class?

On Tue, Apr 19, 2022 at 10:18 PM John Smith
 wrote:

Ok, so I loaded the dump into Eclipse Mat and
followed:

https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

- On the Histogram, I got over 30 entries for:
ChildFirstClassLoader
- Then I clicked on one of them "Merge Shortest
Path..." and picked "Exclude all phantom/weak/soft
references"
- Which then gave me: SqlDriverManager > Apache
Ignite JdbcThin Driver

So i'm guessing anything JDBC based. I should copy
into the task manager libs folder and my jobs make
the dependencies as compile only?

On Tue, Apr 19, 2022 at 12:18 PM Yaroslav Tkachenko
 wrote:

Also

https://shopify.engineering/optimizing-apache-flink-applications-tips
might be helpful (has a section on profiling, as
well as classloading).

        On Tue, Apr 19, 2022 at 4:35 AM Chesnay Schepler
 wrote:

We have a very rough "guide" in the wiki
(it's just the specific steps I took to debug
another leak):

https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

On 19/04/2022 12:01, huweihua wrote:

Hi, John

Sorry for the late reply. You can use MAT[1]
to analyze the dump file. Check whether have
too many loaded classes.

[1] https://www.eclipse.org/mat/


2022年4月18日 下午9:55,John Smith
 写道:

Hi, can anyone help with this? I never
looked at a dump file before.

On Thu, Apr 14, 2022 at 11:59 AM John Smith
 wrote:

Hi, so I have a dump file. What do I
look for?

On Thu, Mar 31, 2022 at 3:28 PM John
Smith  wrote:

Ok so if there's a leak, if I
manually stop the job and restart
it from the UI multiple times, I
won't see the issue because because
   

Re: How to debug Metaspace exception?

2022-04-26 Thread Chesnay Schepler

JDBC drivers are well-known for leaking classloaders unfortunately.

You have correctly identified your alternatives.

You must put the jdbc driver into /lib instead. Setting only the 
parent-first pattern shouldn't affect anything.
That is only relevant if something is in both in /lib and the user-jar, 
telling Flink to prioritize what is in lib.




On 26/04/2022 15:35, John Smith wrote:
So I put classloader.parent-first-patterns.additional: 
"org.apache.ignite." in the task config and so far I don't think I'm 
getting "java.lang.OutOfMemoryError: Metaspace" any more.


Or it's too early to tell.

Though now, the task managers are shutting down due to some 
other failures.


So maybe because tasks were failing and reloading often the task 
manager was running out of Metspace. But now maybe it's just 
cleanly shutting down.


On Wed, Apr 20, 2022 at 11:35 AM John Smith  
wrote:


Or I can put in the config to treat org.apache.ignite. classes as
first class?

On Tue, Apr 19, 2022 at 10:18 PM John Smith
 wrote:

Ok, so I loaded the dump into Eclipse Mat and followed:

https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

- On the Histogram, I got over 30 entries for:
ChildFirstClassLoader
- Then I clicked on one of them "Merge Shortest Path..." and
picked "Exclude all phantom/weak/soft references"
- Which then gave me: SqlDriverManager > Apache Ignite
JdbcThin Driver

So i'm guessing anything JDBC based. I should copy into the
task manager libs folder and my jobs make the dependencies as
compile only?

On Tue, Apr 19, 2022 at 12:18 PM Yaroslav Tkachenko
 wrote:

Also

https://shopify.engineering/optimizing-apache-flink-applications-tips
might be helpful (has a section on profiling, as well as
classloading).

        On Tue, Apr 19, 2022 at 4:35 AM Chesnay Schepler
 wrote:

We have a very rough "guide" in the wiki (it's just
the specific steps I took to debug another leak):

https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

On 19/04/2022 12:01, huweihua wrote:

Hi, John

Sorry for the late reply. You can use MAT[1] to
analyze the dump file. Check whether have too many
loaded classes.

[1] https://www.eclipse.org/mat/


2022年4月18日 下午9:55,John Smith
 写道:

Hi, can anyone help with this? I never looked at a
dump file before.

On Thu, Apr 14, 2022 at 11:59 AM John Smith
 wrote:

Hi, so I have a dump file. What do I look for?

On Thu, Mar 31, 2022 at 3:28 PM John Smith
 wrote:

Ok so if there's a leak, if I manually stop
the job and restart it from the UI multiple
times, I won't see the issue because because
the classes are unloaded correctly?


On Thu, Mar 31, 2022 at 9:20 AM huweihua
 wrote:


The difference is that manually
canceling the job stops the JobMaster,
but automatic failover keeps the
JobMaster running. But looking on
TaskManager, it doesn't make much difference



2022年3月31日 上午4:01,John Smith
 写道:

Also if I manually cancel and restart
the same job over and over is it the
same as if flink was restarting a job
due to failure?

I.e: When I click "Cancel Job" on the
UI is the job completely unloaded vs
when the job scheduler restarts a job
because if whatever reason?

Lile this I'll stop and restart the job
a few times or maybe I can trick my job
to fail and have the scheduler restart
it. Ok let me think about this...

On Wed, Mar 30, 2022 at 10:24 AM 胡伟华
 wrote:


So if I run the same jobs in my
dev env will I still be able to
see the similar dump?

I think running the same job in dev
should be reprodu

Re: Problems with PrometheusReporter

2022-04-21 Thread Chesnay Schepler
Please check the logs for warnings. It could be that a metric registered 
by a job is throwing exceptions.


On 20/04/2022 18:45, Peter Schrott wrote:

Hi kuweiha,

Just to confirm, you tried with 1.15 - none of the rcs are working for me?

This port is definitely free as it was already used on the same hosts 
with Flink 1.14.4. And as I said, when no job is running on the 
taskmanager it actually reports metrics on that certain port - I only 
get the "empty response" when a job is running on the taskmanager I am 
querying. Did you also run a job and could you access metrics like 
flink_taskmanager_job_*?


The logs only tell me that everything is working fine:
2022-04-20 13:46:39,597 INFO  [main] 
o.a.f.r.metrics.MetricRegistryImpl:? - Reporting metrics for reporter 
prom of type org.apache.flink.metrics.prometheus.PrometheusReporter.

and
2022-04-20 12:12:26,394 INFO  [main] o.a.f.m.p.PrometheusReporter:? - 
Started PrometheusReporter HTTP server on port 


Best & thanks,
Peter


On Wed, Apr 20, 2022 at 6:30 PM huweihua  wrote:

Hi, Peter
I have not been able to reproduce this problem.

From your description, it is possible that the specified port 
has been listened by other processes, and PrometheusReporter
failed to start.
You can confirm it from taskmanager.log, or check if port  of
the host is being listened by the TaskManager process.



2022年4月20日 下午10:48,Peter Schrott  写道:

Hi Flink-Users,

After upgrading to Flink 1.15 (rc3) (coming from 1.14) I noticed
that there is a problem with the metrics exposed through the
PrometheusReporter.

It is configured as followed in the flink-config.yml:
metrics.reporters: prom
metrics.reporter.prom.class:
org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 

My cluster is running in standalone mode with 2 taskmanagers and
2 jobmanagers.

More specifically:

On the taskmanger that runs a job I get curl: (52) Empty reply
from server when I call curl localhost:. I was looking for
the metrics in the namespace flink_taskmanager_job_*, which are
only - and obviously - exposed on the taskmanager running a job.

On the other taskmanger that runs no job I get a response with a
couple of metrics of the namespace flink_taskmanager_Status- as
expected.

When configuring the JMXReporterFactory for too. I find the
desired and all other metrics via VisualVM on that
taskmanager running the job. Also in the Flink web ui, in the
"Jobs -> Overview -> Metrics" part I can select and visualize
metrics like flink_taskmanager_job_task_busyTimeMsPerSecond.

Does someone have any idea what's going on here? maybe even
confirm my findings?

Best & thanks,
Peter





Re: How to debug Metaspace exception?

2022-04-19 Thread Chesnay Schepler
We have a very rough "guide" in the wiki (it's just the specific steps I 
took to debug another leak):

https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

On 19/04/2022 12:01, huweihua wrote:

Hi, John

Sorry for the late reply. You can use MAT[1] to analyze the dump file. 
Check whether have too many loaded classes.


[1] https://www.eclipse.org/mat/


2022年4月18日 下午9:55,John Smith  写道:

Hi, can anyone help with this? I never looked at a dump file before.

On Thu, Apr 14, 2022 at 11:59 AM John Smith  
wrote:


Hi, so I have a dump file. What do I look for?

On Thu, Mar 31, 2022 at 3:28 PM John Smith
 wrote:

Ok so if there's a leak, if I manually stop the job and
restart it from the UI multiple times, I won't see the issue
because because the classes are unloaded correctly?


On Thu, Mar 31, 2022 at 9:20 AM huweihua
 wrote:


The difference is that manually canceling the job stops
the JobMaster, but automatic failover keeps the JobMaster
running. But looking on TaskManager, it doesn't make much
difference



2022年3月31日 上午4:01,John Smith 
写道:

Also if I manually cancel and restart the same job over
and over is it the same as if flink was restarting a job
due to failure?

I.e: When I click "Cancel Job" on the UI is the job
completely unloaded vs when the job scheduler restarts a
job because if whatever reason?

Lile this I'll stop and restart the job a few times or
maybe I can trick my job to fail and have the scheduler
restart it. Ok let me think about this...

On Wed, Mar 30, 2022 at 10:24 AM 胡伟华
 wrote:


So if I run the same jobs in my dev env will I
still be able to see the similar dump?

I think running the same job in dev should be
reproducible, maybe you can have a try.


 If not I would have to wait at a low volume time
to do it on production. Aldo if I recall the dump
is as big as the JVM memory right so if I have 10GB
configed for the JVM the dump will be 10GB file?

Yes, JMAP will pause the JVM, the time of pause
depends on the size to dump. you can use "jmap
-dump:live" to dump only the reachable objects, this
will take a brief pause




2022年3月30日 下午9:47,John Smith
 写道:

I have 3 task managers (see config below). There is
total of 10 jobs with 25 slots being used.
The jobs are 100% ETL I.e; They load Json,
transform it and push it to JDBC, only 1 job of the
10 is pushing to Apache Ignite cluster.

FOR JMAP. I know that it will pause the task
manager. So if I run the same jobs in my dev env
will I still be able to see the similar dump? I I
assume so. If not I would have to wait at a low
volume time to do it on production. Aldo if I
recall the dump is as big as the JVM memory right
so if I have 10GB configed for the JVM the dump
will be 10GB file?


# Operating system has 16GB total.
env.ssh.opts: -l flink -oStrictHostKeyChecking=no

cluster.evenly-spread-out-slots: true

taskmanager.memory.flink.size: 10240m
taskmanager.memory.jvm-metaspace.size: 2048m
taskmanager.numberOfTaskSlots: 16
parallelism.default: 1

high-availability: zookeeper
high-availability.storageDir:
file:///mnt/flink/ha/flink_1_14/
high-availability.zookeeper.quorum: ...
high-availability.zookeeper.path.root: /flink_1_14
high-availability.cluster-id: /flink_1_14_cluster_0001

web.upload.dir: /mnt/flink/uploads/flink_1_14

state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir:
file:///mnt/flink/checkpoints/flink_1_14
state.savepoints.dir:
file:///mnt/flink/savepoints/flink_1_14

On Wed, Mar 30, 2022 at 2:16 AM 胡伟华
 wrote:

Hi, John

Could you tell us you application scenario? Is
it a flink session cluster with a lot of jobs?

Maybe you can try to dump the memory with jmap
and use tools such as MAT to analyze whether
there are abnormal classes and classloaders


> 2022年3月30日 上午6:09,John Smith
 写道:
 

Re: Broadcast state corrupted ?

2022-04-13 Thread Chesnay Schepler
Is the failing environment using Azure, or MinIO? Which Flink filesystem 
did you use?
Where there any errors in the job that took this savepoint? How was the 
cluster/job shut down?
Does this happen reliably in the 1 environment, or only once? (did you 
try to reproduce it?)


AFAIK sequences of AAA corresponding to NUL bytes.

I'm wondering if that would mean that part of the data wasn't written 
properly.

Currently my theory is that either:
a) some data wasn't flushed to the file (i.e., the savepoint was never 
completely on disk)
b) some user operation (like copying to another directory) corrupted the 
file

c) some hardware issue corrupted the file.

On 13/04/2022 16:50, Alexey Trenikhun wrote:
Any suggestions how to troubleshoot the issue? I still can reproduce 
the problem in environment A


Thanks,
Alexey

*From:* Alexey Trenikhun 
*Sent:* Tuesday, April 12, 2022 7:10:17 AM
*To:* Chesnay Schepler ; Flink User Mail List 


*Subject:* Re: Broadcast state corrupted ?
I’ve tried to restore job in environment A (where we observe problem) 
from savepoint taken in environment B - restored fine. So looks 
something in environment A corrupts savepoint.


*From:* Alexey Trenikhun 
*Sent:* Monday, April 11, 2022 7:10:51 AM
*To:* Chesnay Schepler ; Flink User Mail List 


*Subject:* Re: Broadcast state corrupted ?
I didn’t try same savepoint cross environments. Operator with 
broadcast state was added recently,  I rolled back all environments, 
created save points with old version, upgraded to version with 
broadcast state, all 4 were upgraded fine, took savepoints in each 
environment and tried to restore from them, 3 restored and 4th failed 
(same environment as original failure). Two environments are deployed 
in Azure AKS and using Azure Blob Storage, two other are local and use 
MinIO. Failure happens in one of local environments.


*From:* Chesnay Schepler 
*Sent:* Monday, April 11, 2022 2:28:48 AM
*To:* Alexey Trenikhun ; Flink User Mail List 


*Subject:* Re: Broadcast state corrupted ?
Am I understanding things correctly in that the same savepoint cannot 
be restored from in 1 environment, while it works fine in 3 others?

If so, are they all relying on the same file, or copies of the savepoint?

On 10/04/2022 22:39, Alexey Trenikhun wrote:

Hello,
We have KeyedBroadcastProcessFunction with broadcast 
state MapStateDescriptor, where 
PbCfgTenantDictionary is Protobuf type, for which we 
custom TypeInformation/TypeSerializer. In one of environment, we 
can't restore job from savepoint because seems state data is 
corrupted. I've added to logging to TypeSerializer :


public void serialize(T t, DataOutputView dataOutputView) throws 
IOException {

    final byte[] data = t.toByteArray();
    dataOutputView.writeInt(data.length);
    dataOutputView.write(data);
    if (PbCfgTenantDictionary.class.equals(prototype.getClass())) {
      LOG.info("serialize PbCfgTenantDictionary.size: {}", data.length);
      LOG.info("serialize PbCfgTenantDictionary.data: {}",
Base64.getEncoder().encodeToString(data));
    }
  }

 public T deserialize(DataInputView dataInputView) throws IOException {
    final int serializedSize = dataInputView.readInt();
    final com.google.protobuf.Parser parser = 
Unchecked.cast(prototype.getParserForType());

    final byte[] data = new byte[serializedSize];
    dataInputView.read(data);
    if (PbCfgTenantDictionary.class.equals(prototype.getClass())) {
      LOG.info("deserialize PbCfgTenantDictionary.size: {}", 
data.length);

      LOG.info("deserialize PbCfgTenantDictionary.data: {}",
Base64.getEncoder().encodeToString(data));
    }
    return parser.parseFrom(data);
  }

Both serialize and deserialize methods print same size 104048, but 
data is different, after 4980 base64 characters (3735 bytes) there 
are only A=

Strangely but the problem effects only 1 environment of 4 I've tried





Re: Discuss making KafkaSubscriber Public

2022-04-13 Thread Chesnay Schepler
Could you expand a bit on possible alternative implementations that 
require this interface to become public, opposed to providing more 
built-in ways to subscribe?


On 13/04/2022 11:26, Qingsheng Ren wrote:

Thanks for the proposal Mason! I think exposing `KafkaSubscriber` as public API 
is helpful for users to implement more complex subscription logics.

+1 (non-binding)

Cheers,

Qingsheng


On Apr 12, 2022, at 11:46, Mason Chen  wrote:

Hi Flink Devs,

I was looking to contribute to 
https://issues.apache.org/jira/browse/FLINK-24660, which is a ticket to track 
changing the KafkaSubscriber from Internal to PublicEvolving.

In the PR, it seems a few of us have agreement on making the subscriber 
pluggable in the KafkaSource, but I'd like to raise the question nevertheless. 
Furthermore, there is also interest from various Flink mailing threads and on 
the Jira ticket itself for the ticket, so I think the change would be 
beneficial to the users. There is already some feedback to make the contract of 
handling removed splits by the KafkaSource and subscriber clearer in the docs.

I have yet to address all the PR feedback, but does anyone have any concerns 
before I proceed further?

Best,
Mason





Re: Broadcast state corrupted ?

2022-04-11 Thread Chesnay Schepler
Am I understanding things correctly in that the same savepoint cannot be 
restored from in 1 environment, while it works fine in 3 others?

If so, are they all relying on the same file, or copies of the savepoint?

On 10/04/2022 22:39, Alexey Trenikhun wrote:

Hello,
We have KeyedBroadcastProcessFunction with broadcast 
state MapStateDescriptor, where 
PbCfgTenantDictionary is Protobuf type, for which we 
custom TypeInformation/TypeSerializer. In one of environment, we can't 
restore job from savepoint because seems state data is corrupted. I've 
added to logging to TypeSerializer :


public void serialize(T t, DataOutputView dataOutputView) throws 
IOException {

    final byte[] data = t.toByteArray();
    dataOutputView.writeInt(data.length);
    dataOutputView.write(data);
    if (PbCfgTenantDictionary.class.equals(prototype.getClass())) {
      LOG.info("serialize PbCfgTenantDictionary.size: {}", data.length);
      LOG.info("serialize PbCfgTenantDictionary.data: {}",
          Base64.getEncoder().encodeToString(data));
    }
  }

 public T deserialize(DataInputView dataInputView) throws IOException {
    final int serializedSize = dataInputView.readInt();
    final com.google.protobuf.Parser parser = 
Unchecked.cast(prototype.getParserForType());

    final byte[] data = new byte[serializedSize];
    dataInputView.read(data);
    if (PbCfgTenantDictionary.class.equals(prototype.getClass())) {
      LOG.info("deserialize PbCfgTenantDictionary.size: {}", data.length);
      LOG.info("deserialize PbCfgTenantDictionary.data: {}",
          Base64.getEncoder().encodeToString(data));
    }
    return parser.parseFrom(data);
  }

Both serialize and deserialize methods print same size 104048, but 
data is different, after 4980 base64 characters (3735 bytes) there are 
only A=

Strangely but the problem effects only 1 environment of 4 I've tried




Re: Unsubscribe

2022-04-11 Thread Chesnay Schepler

To unsubscribe you need to send a mail to user-unsubscr...@flink.apache.org.


Re: Unsubscribed

2022-04-11 Thread Chesnay Schepler

To unsubscribe you need to send a mail to user-unsubscr...@flink.apache.org.


Re: Issue with Flink UI for Flink 1.14.0

2022-03-18 Thread Chesnay Schepler

That issues tracked under https://issues.apache.org/jira/browse/FLINK-25904.
We don't yet know the cause.

On 18/03/2022 13:37, Peter Westermann wrote:


Just started testing Flink 1.14.4 since that fixes FLINK-25732 … and I 
am now running into another UI issue. On the jobmanager that is not 
currently the active jobmanager, the checkpoints tab is not working. 
It just displays “No Data“, when I use the corresponding API 
/jobs/{id}/checkpoints, I get the following error:


{"errors":["Internal server error.","side:\norg.apache.commons.math3.exception.NullArgumentException: input 
array\n\tat 
org.apache.commons.math3.util.MathArrays.verifyValues(MathArrays.java:1650)\n\tat 
org.apache.commons.math3.stat.descriptive.AbstractUnivariateStatistic.test(AbstractUnivariateStatistic.java:158)\n\tat 
org.apache.commons.math3.stat.descriptive.rank.Percentile.evaluate(Percentile.java:272)\n\tat 
org.apache.commons.math3.stat.descriptive.rank.Percentile.evaluate(Percentile.java:241)\n\tat 
org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogramStatistics$CommonMetricsSnapshot.getPercentile(DescriptiveStatisticsHistogramStatistics.java:158)\n\tat 
org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogramStatistics.getQuantile(DescriptiveStatisticsHistogramStatistics.java:52)\n\tat 
org.apache.flink.runtime.checkpoint.StatsSummarySnapshot.getQuantile(StatsSummarySnapshot.java:108)\n\tat 
org.apache.flink.runtime.rest.messages.checkpoints.StatsSummaryDto.valueOf(StatsSummaryDto.java:81)\n\tat 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.createCheckpointingStatistics(CheckpointingStatisticsHandler.java:129)\n\tat 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleRequest(CheckpointingStatisticsHandler.java:84)\n\tat 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleRequest(CheckpointingStatisticsHandler.java:58)\n\tat 
org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler.handleRequest(AbstractAccessExecutionGraphHandler.java:68)\n\tat 
org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$0(AbstractExecutionGraphHandler.java:87)\n\tat 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)\n\tat 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)\n\tat 
util.TokenAwareRunnable.run(TokenAwareRunnable.java:28)\n\tat 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\tat 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat 
java.base/java.lang.Thread.run(Thread.java:834)\n\nEnd of exception on 
server side>"]}


Peter Westermann

Analytics Software Architect

cidimage001.jpg@01D78D4C.C00AC080

peter.westerm...@genesys.com <mailto:peter.westerm...@genesys.com>

cidimage001.jpg@01D78D4C.C00AC080

cidimage002.jpg@01D78D4C.C00AC080 <http://www.genesys.com/>

*From: *Chesnay Schepler 
*Date: *Friday, January 21, 2022 at 3:28 AM
*To: *Peter Westermann , Dawid Wysakowicz 
, user@flink.apache.org 

*Subject: *Re: Issue with Flink UI for Flink 1.14.0

While FLINK-24550 was indeed fixed unfortunately a similar bug was 
also introduced (https://issues.apache.org/jira/browse/FLINK-25732).


On 20/01/2022 21:18, Peter Westermann wrote:

Just tried this again with Flink 1.14.3 since
https://issues.apache.org/jira/browse/FLINK-24550 is listed as
fixed. I am running into similar errors when calling the
/v1/jobs/overview endpoint (without any running jobs):

{"errors":["Internal server error.",""]}

Peter Westermann

Team Lead – Realtime Analytics

peter.westerm...@genesys.com <mailto:peter.westerm...@genesys.com>

<http://www.genesys.com/>

*From: *Dawid Wysakowicz 
<mailto:dwysakow...@apache.org>
*Date: *Thursday, October 14, 2021 at 10:00 AM
*To: *Peter Westermann 
<mailto:no.westerm...@genesys.com>, user@flink.apache.org
 <mailto:user@flink.apache.org>
*Subject: *Re: Issue with Flink UI for Flink 1.14.0

I am afraid it is a bug in flink 1.14. I created a ticket for it
FLINK-24550[1]. I believe we should pick it up soonish. Thanks for
reporting the issue!

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-24550

On 13/10/2021 20:32, Peter Westermann wrote:

Hello,

I just started testing Flink 1.14.0 and noticed some weird
behavior. This i

Re: scala shell not part of 1.14.4 download

2022-03-18 Thread Chesnay Schepler
The Scala Shell only works with Scala 2.11. You will need to use the 
Scala 2.11 Flink distribution.


On 18/03/2022 12:42, Georg Heiler wrote:

Hi,

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/repls/scala_shell/ 
mentions:


|bin/start-scala-shell.sh local |
|a script to start a scala REPL shell. |
|But the download for Flink 
https://www.apache.org/dyn/closer.lua/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz 
|

|Does not seem to include this script anymore. |
|Am I missing something? |
|How can I still start a scala repl? |
|Best, |
|Georg |




Re: Using another FileSystem configuration while creating a job

2022-03-10 Thread Chesnay Schepler
> if we want to use two sets of credentials, for example to access two 
different AWS buckets, that would not be feasible at the moment?


That is correct.

> As it seems that this limitation is quite an important one, is there 
a place where we can find this documented?


I don't think it is explicitly documented; likely because we assume that 
users configure the filesystem through the flink-conf.yaml (and only 
document it as such), which inherently prevents that.


> Would it mean that in order to achieve this, we would have to set up 
two clusters and publishing to a temporary medium? For example, using 
two clusters one configured to access CephFS, one for AWS S3 then 
publish that to Kafka (or use Kafka Connect)?


That would be one approach, yes.

> We are requesting that [it should be possible to configure 
credentials per job]


It is exceedingly unlikely for this to be implemented in the foreseeable 
future.



There are some workarounds though.
For S3 in particular you could capitalize on the fact that we have 2 
filesystem plugins (s3-fs-hadoop and s3-fs-presto), which you could use 
at the same time so long as you use different schemes ( s3a (hadoop) / 
s3p (presto) ) for the different buckets.
You could also generalize this by taking an existing filesystem plugin 
from Flink and adjusting the contained FileSystemFactory to use a 
different scheme and config keys. It's a bit annyoing, but it should 
work (now and in the future).
Essentially you'd pretend that there are N completely different 
filesystems, but they are actually all the same implementation just with 
different configurations.


On 10/03/2022 13:30, Gil De Grove wrote:

Hello Chesnay,

Thanks for the reply.

I wonder something based on your reply, if we want to use two sets of 
credentials, for example to access two different AWS buckets, that 
would not be feasible at the moment?
One example I have in mind would be to separate the credentials for 
accessing data vs storing metadata for a given cluster.
Another use case would be to create a Flink job that consume data 
stored on AWS S3 and/or on MinIO.
Would it mean that in order to achieve this, we would have to set up 
two clusters and publishing to a temporary medium? For example, using 
two clusters one configured to access CephFS, one for AWS S3 then 
publish that to Kafka (or use Kafka Connect)?


We are requesting that, as we would like to use the Hybrid source with 
a FileSystem and a Kafka consumer, and this limitation would probably 
make us rethink the architecture.
As it seems that this limitation is quite an important one, is there a 
place where we can find this documented? Maybe a FLIP? Or an entry in 
the Flink Documentation?


Thanks again for your help,
Gil


On Thu, 10 Mar 2022 at 10:57, Chesnay Schepler  wrote:

The FileSystem class is essentially one big singleton, with only 1
instance of each FileSystem implementation being loaded, shared
across all jobs.
For that reason we do not support job-specific FileSystem
configurations.
Note that we generally also don't really support configuring the
FileSystems at runtime. The entire codebase assumes that the
initialization happens when the process is started.

You'll need to run that job in a separate cluster.

Overall, this sounds like something that should run externally;
assert some precondition, then configure Flink appropriately, then
run the job.

On 08/03/2022 08:47, Gil De Grove wrote:

Hello everyone,

First of all, sorry for cross posting, I asked on SO, but David
Anderson suggested me to reach out to the community via the
mailing list. The link to the SO question is the following:

https://stackoverflow.com/questions/71381266/using-another-filesystem-configuration-while-creating-a-job

I'll post the answer on SO as soon as I have one :)

I post here the content of the question, so if anyone can help,
please let me know;


  Summary

We are currently facing an issue with the FileSystem abstraction
in Flink. We have a job that can dynamically connect to an S3
source (meaning it's defined at runtime). We discovered a bug in
our code, and it could be due to a wrong assumption on the way
the FileSystem works.


  Bug explanation

During the initialization of the job, (so in the job manager) we
manipulate the FS to check that some files exist in order to fail
gracefully before the job is executed. In our case, we need to
set dynamically the FS. It can be either HDFS, S3 on AWS or S3 on
MinIO. We want the FS configuration to be specific for the job,
and different from the cluster one (different access key,
different endpoint, etc.).

Here is an extract of the code we are using to do so:

|private void validateFileSystemAccess(Configuration
configuration) throws IOException { // Create a plugin manager
from the configuration P

Re: Customizing backpressure mechanism for RichParallelSourceFunction

2022-03-10 Thread Chesnay Schepler

It's not possible to send events to sources; data only flows in 1 direction.

On 03/03/2022 06:31, Le Xu wrote:

Hello!

I have a dataflow pipeline built using Flink's 
RichParallelSourceFunction as parallel sources. I'm wondering if there 
are any mechanisms that I could use to implement *ack-based* 
back-pressure mechanism by sending ACK messages from operators (within 
the runtime) to source functions to achieve user-level back-pressure 
mechanisms? I understand that Flink uses flow control to perform back 
pressure internally but I'd like to check whether it is possible at 
all to send any events from operator to sources.


Thanks in advance!

Le





Re: Using another FileSystem configuration while creating a job

2022-03-10 Thread Chesnay Schepler
The FileSystem class is essentially one big singleton, with only 1 
instance of each FileSystem implementation being loaded, shared across 
all jobs.

For that reason we do not support job-specific FileSystem configurations.
Note that we generally also don't really support configuring the 
FileSystems at runtime. The entire codebase assumes that the 
initialization happens when the process is started.


You'll need to run that job in a separate cluster.

Overall, this sounds like something that should run externally; assert 
some precondition, then configure Flink appropriately, then run the job.


On 08/03/2022 08:47, Gil De Grove wrote:

Hello everyone,

First of all, sorry for cross posting, I asked on SO, but David 
Anderson suggested me to reach out to the community via the mailing 
list. The link to the SO question is the following: 
https://stackoverflow.com/questions/71381266/using-another-filesystem-configuration-while-creating-a-job


I'll post the answer on SO as soon as I have one :)

I post here the content of the question, so if anyone can help, please 
let me know;



  Summary

We are currently facing an issue with the FileSystem abstraction in 
Flink. We have a job that can dynamically connect to an S3 source 
(meaning it's defined at runtime). We discovered a bug in our code, 
and it could be due to a wrong assumption on the way the FileSystem works.



  Bug explanation

During the initialization of the job, (so in the job manager) we 
manipulate the FS to check that some files exist in order to fail 
gracefully before the job is executed. In our case, we need to set 
dynamically the FS. It can be either HDFS, S3 on AWS or S3 on MinIO. 
We want the FS configuration to be specific for the job, and different 
from the cluster one (different access key, different endpoint, etc.).


Here is an extract of the code we are using to do so:

|private void validateFileSystemAccess(Configuration configuration) 
throws IOException { // Create a plugin manager from the configuration 
PluginManager pluginManager = 
PluginUtils.createPluginManagerFromRootFolder(configuration); // Init 
the FileSystem from the configuration 
FileSystem.initialize(configuration, pluginManager); // Validate the 
FileSystem: an exception is thrown if FS configuration is wrong Path 
archiverPath = new Path(this.archiverPath); 
archiverPath.getFileSystem().exists(new Path("/")); } |


After starting that specific kind of job, we notice that:

 1. the checkpointing does not work for this job, it throws a
credential error.
 2. the job manager cannot upload the artifacts needed by the history
server for all jobs already running of all kind (not only this
specific kind of job).

If we do not deploy that kind of job, the upload of artifacts and the 
checkpointing work as expected on the cluster.


We think that this issue might come from the |FileSystem.initialize()| 
that overrides the configuration for all the FileSystems. We think 
that because of this, the next call to |FileSystem.get()| returns the 
FileSystem we configured in |validateFileSystemAccess| instead of the 
cluster configured one.



  Questions

Could our hypothesis be correct? If so, how could we provide a 
specific configuration for the FileSystem without impacting the whole 
cluster?


Regards,
Gil



Re: Savepoint API challenged with large savepoints

2022-03-10 Thread Chesnay Schepler

That all sounds very interesting; I'd go ahead with creating tickets.

On 08/03/2022 13:43, Schwalbe Matthias wrote:


Dear Flink Team,

In the last weeks I was faced with a large savepoint (around 40GiB) 
that contained lots of obsolete data points and overwhelmed our 
infrastructure (i.e. failed to load/restart).


We could not afford to lose the state, hence I spent the time to 
transcode the savepoint into something smaller (ended up with 2.5 GiB).


During my efforts I encountered a couple of points that make savepoint 
API uneasy with larger savepoints, found simple solutions …


I would like to contribute my findings and ‘fixes’, however on my 
corporate infrastructure I cannot fork/build Flink locally nor PR the 
changes later on.


Before creating Jira tickets I wanted to quickly discuss the matter.

Findings:

  * /(We are currently on Flink 1.13 (RocksDB state backend) but all
findings apply as well to the latest version)/
  * WritableSavepoint.write(…) falls back to
JobManagerCheckpointStorage which restricts savepoint size to 5MiB
  o See relevant exception stack here [1]
  o This is because
SavepointTaskManagerRuntimeInfo.getConfiguration() always
returns empty Configuration, hence
  o Neither “state.checkpoint-storage” nor “state.checkpoints.dir”
are/can be configured
  o ‘fix’: provide
SavepointTaskManagerRuntimeInfo.getConfiguration() with a
meaningful implementation and set configuration in
SavepointEnvironment.getTaskManagerInfo()
  * When loading a state, MultiStateKeyIterator load and bufferes the
whole state in memory before it event processes a single data point
  o This is absolutely no problem for small state (hence the unit
tests work fine)
  o MultiStateKeyIterator ctor sets up a java Stream that iterates
all state descriptors and flattens all datapoints contained within
  o The java.util.stream.Stream#flatMap function causes the
buffering of the whole data set when enumerated later on
  o See call stack [2]
  + I our case this is 150e6 data points (> 1GiB just for the
pointers to the data, let alone the data itself ~30GiB)
  o I’m not aware of some instrumentation if Stream in order to
avoid the problem, hence
  o I coded an alternative implementation of MultiStateKeyIterator
that avoids using java Stream,
  o I can contribute our implementation
(MultiStateKeyIteratorNoStreams)
  * I found out that, at least when using LocalFileSystem on a windows
system, read I/O to load a savepoint is unbuffered,
  o See example stack [3]
  o i.e. in order to load only a long in a serializer, it needs to
go into kernel mode 8 times and load the 8 bytes one by one
  o I coded a BufferedFSDataInputStreamWrapper that allows to
opt-in buffered reads on any FileSystem implementation
  o In our setting savepoint load is now 30 times faster
  o I’ve once seen a Jira ticket as to improve savepoint load time
in general (lost the link unfortunately), maybe this approach
can help with it
  o not sure if HDFS has got the same problem
  o I can contribute my implementation

Looking forward to your comments

Matthias (Thias) Schwalbe

[1] exception stack:

8215140 [MapPartition (bb312595cb5ccc27fd3b2c729bbb9136) (2/4)#0] 
ERROR BatchTask - Error in task code:  MapPartition 
(bb312595cb5ccc27fd3b2c729bbb9136) (2/4)


java.util.concurrent.ExecutionException: java.io.IOException: Size of 
the state is larger than the maximum permitted memory-backed state. 
Size=180075318 , maxSize=5242880 . Consider using a different state 
backend, like the File System State backend.


    at java.util.concurrent.FutureTask.report(FutureTask.java:122)

    at java.util.concurrent.FutureTask.get(FutureTask.java:192)

    at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)


    at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:54)


    at 
org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:67)


    at 
org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStateBootstrapOperator.java:90)


    at 
org.apache.flink.state.api.output.BoundedStreamTask.processInput(BoundedStreamTask.java:107)


    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)


    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)


    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)


    at 
org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:80)


    at 

  1   2   3   4   5   6   7   8   9   10   >