CI build for a pull request is not triggered automatically

2022-05-02 Thread mefor sy
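Hi all, this is my first Flink PR: https://github.com/apache/flink/pull/19617. After I fixed
the build failure, the CI build was not triggered automatically, and commenting
"@flinkbot run azure" did not trigger it either. How should I handle this?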


Re: Table API filter: Scala expression syntax reports a type mismatch

2022-05-02 Thread Arthur Li
This is resolved now; it was caused by an implicit conversion issue.

> On Apr 30, 2022, at 22:47, Arthur Li wrote:
> 
> Hi all,
> 
> When I use the Scala expression syntax with filter, the compiler says an Expression is required but a String was found, even though this syntax is the one recommended in the source documentation.
> 
> My code:
> sensorTab
>   // Expression style, e.g. tab.select($"key", $"value".avg + " The average" as "average")
>   .select($"id", $"temperature")
>   // Expression style
>   //  .filter($("id").isEqual("sensor_1"))
>   // With this form the types do not match:
>   // Required Expression, Found String
>   .filter($"id" === "sensor_1") // error
>   .toDataStream
>   .print("select sensor_1")
> 
> 
> Source documentation:
> 
> Filters out elements that don't pass the filter predicate. Similar to a SQL 
> WHERE clause.
> Example:
>  
>  tab.filter($("name").isEqual("Fred"));
>  
> Scala Example:
>  
>  tab.filter($"name" === "Fred")
> 
>  Table filter(Expression predicate);
> 



Re: flink operator sometimes cannot start jobmanager after upgrading

2022-05-02 Thread Yang Wang
I am afraid we do not handle the scenario where the JobManager deployment is
deleted externally.

Best,
Yang

On Mon, May 2, 2022 at 16:52, Őrhidi Mátyás wrote:

> I filed a Jira for tracking this issue:
> https://issues.apache.org/jira/browse/FLINK-27468
>
> On Mon, May 2, 2022 at 10:31 AM Őrhidi Mátyás 
> wrote:
>
>> This can be reproduced simply by deleting the Kubernetes deployment. The
>> operator cannot recover from this state automatically; defining a
>> restartNonce on the deployment should recover the state.
>>
>> Regards,
>> Matyas
>>
>> On Mon, May 2, 2022 at 10:00 AM Márton Balassi 
>> wrote:
>>
>>> Hi ChangZhuo,
>>>
>>> Thanks for reporting this, I think I have just run into this myself too.
>>> Will try to reproduce it, but I do not fully comprehend it yet. If anyone
>>> has a way to reproduce it, that is more than welcome. :-)
>>>
>>> On Fri, Apr 29, 2022 at 12:16 PM ChangZhuo Chen (陳昌倬) 
>>> wrote:
>>>
 Hi,

 We found that the Flink operator [0] sometimes cannot start the jobmanager after
 upgrading a FlinkDeployment. We need to recreate the FlinkDeployment to fix
 the problem. Has anyone else seen this issue?

 The following is a redacted log from the Flink operator. After the status becomes
 MISSING, it stays in MISSING status for at least 15 minutes.


 2022-04-29 09:41:15,141 o.a.f.c.d.a.c.ApplicationClusterDeployer
 [INFO ][namespace/flink-deployment-name] Submitting application in
 'Application Mode'.
 2022-04-29 09:41:15,145 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO
 ][namespace/flink-deployment-name] The derived from fraction jvm overhead
 memory (2.400gb (2576980416 bytes)) is greater than its max value
 1024.000mb (1073741824 bytes), max value will be used instead
 2022-04-29 09:41:15,146 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO
 ][namespace/flink-deployment-name] The derived from fraction jvm overhead
 memory (5.200gb (5583457568 bytes)) is greater than its max value
 1024.000mb (1073741824 bytes), max value will be used instead
 2022-04-29 09:41:15,146 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO
 ][namespace/flink-deployment-name] The derived from fraction network memory
 (5.050gb (5422396292 bytes)) is greater than its max value 4.000gb
 (4294967296 bytes), max value will be used instead
 2022-04-29 09:41:15,237 o.a.f.k.u.KubernetesUtils  [INFO
 ][namespace/flink-deployment-name] Kubernetes deployment requires a fixed
 port. Configuration high-availability.jobmanager.port will be set to 6123
 2022-04-29 09:41:15,508 o.a.f.k.KubernetesClusterDescriptor [WARN
 ][namespace/flink-deployment-name] Please note that Flink client
 operations(e.g. cancel, list, stop, savepoint, etc.) won't work from
 outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type'
 has been set to ClusterIP.
 2022-04-29 09:41:15,508 o.a.f.k.KubernetesClusterDescriptor [INFO
 ][namespace/flink-deployment-name] Create flink application cluster
 flink-deployment-name successfully, JobManager Web Interface:
 http://flink-deployment-name.namespace:8081
 2022-04-29 09:41:15,510 o.a.f.k.o.s.FlinkService   [INFO
 ][namespace/flink-deployment-name] Application cluster successfully 
 deployed
 2022-04-29 09:41:15,583 o.a.f.k.o.c.FlinkDeploymentController [INFO
 ][namespace/flink-deployment-name] Reconciliation successfully completed
 2022-04-29 09:41:15,684 o.a.f.k.o.c.FlinkDeploymentController [INFO
 ][namespace/flink-deployment-name] Starting reconciliation
 2022-04-29 09:41:15,686 o.a.f.k.o.o.JobObserver[INFO
 ][namespace/flink-deployment-name] Observing JobManager deployment.
 Previous status: DEPLOYING
 2022-04-29 09:41:15,792 o.a.f.k.o.o.JobObserver[INFO
 ][namespace/flink-deployment-name] JobManager is being deployed
 2022-04-29 09:41:15,792 o.a.f.k.o.c.FlinkDeploymentController [INFO
 ][namespace/flink-deployment-name] Reconciliation successfully completed
 2022-04-29 09:41:20,795 o.a.f.k.o.c.FlinkDeploymentController [INFO
 ][namespace/flink-deployment-name] Starting reconciliation
 2022-04-29 09:41:20,797 o.a.f.k.o.o.JobObserver[INFO
 ][namespace/flink-deployment-name] Observing JobManager deployment.
 Previous status: DEPLOYING
 2022-04-29 09:41:20,896 o.a.f.k.o.o.JobObserver[INFO
 ][namespace/flink-deployment-name] JobManager is being deployed
 2022-04-29 09:41:20,897 o.a.f.k.o.c.FlinkDeploymentController [INFO
 ][namespace/flink-deployment-name] Reconciliation successfully completed
 2022-04-29 09:41:25,899 o.a.f.k.o.c.FlinkDeploymentController [INFO
 ][namespace/flink-deployment-name] Starting reconciliation
 2022-04-29 09:41:25,901 o.a.f.k.o.o.JobObserver[INFO
 ][namespace/flink-deployment-name] Observing JobManager deployment.
 Previous status: DEPLOYING
 

Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-02 Thread 陳昌倬
Hi,

We found that the taskmanager Prometheus endpoint does not work after
upgrading from 1.14.3 to 1.15.0. The jobmanager Prometheus endpoint is okay
in 1.15.0, so we think the problem is not in the image we used. Any idea how
to fix this problem?


Also, we found the following log in the taskmanager, but not in the jobmanager. Not
sure if it is related to this issue.

2022-05-03 01:48:32,839 WARN  org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInLocal'. Metric will not be reported.[10.210.47.134, taskmanager, , , , 8, Shuffle, Netty, Input]
2022-05-03 01:48:32,839 WARN  org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInLocalPerSecond'. Metric will not be reported.[10.210.47.134, taskmanager, , , , 8, Shuffle, Netty, Input]
...


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B




Migrating Flink apps across cloud with state

2022-05-02 Thread Hemanga Borah
Hello,
 We are attempting to port our Flink applications from one cloud provider
to another.

 These Flink applications consume data from Kafka topics and output to
various destinations (Kafka or databases). The applications have states
stored in them. Some of these stored states are aggregations, for example,
at times we store hours (or days) worth of data to aggregate over time.
Some other applications have cached information for data enrichment, for
example, we store data in Flink state for days, so that we can join them
with newly arrived data. The input topics hold a large volume of data,
and reprocessing it from the beginning of each topic would be
expensive.

 As such, we want to retain the state of the application when we move to a
different cloud provider so that we can retain the aggregations and cache,
and do not have to start from the beginning of the input topics.

 We are replicating the Kafka topics using MirrorMaker 2. This is our
procedure:

   - Replicate the input topics of each Flink application from source cloud
   to destination cloud.
   - Take a savepoint of the Flink application on the source cloud provider.
   - Start the Flink application on the destination cloud provider using
   the savepoint from the source cloud provider.


However, this does not work as intended, because the offsets of the mirrored
topics on the new cloud provider differ from those of the source topics (a
consequence of how MirrorMaker works). The offsets stored in the Flink
savepoint therefore do not match the new topics, and Flink cannot map them
during startup.
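
The mismatch described above can be illustrated with a toy model. This is only a sketch in plain Python, not MirrorMaker or Kafka code: the `mirror` helper and the record layout are made up for illustration, and it assumes (as one possible cause) that the source topic has already expired its oldest records while the mirrored topic numbers its records from 0.

```python
# Toy model: a topic is a list of (offset, payload) pairs.
def mirror(source_records):
    """Copy payloads to a fresh topic; the destination assigns its own offsets from 0."""
    return [(new_offset, payload)
            for new_offset, (_, payload) in enumerate(source_records)]


# Assumption for illustration: retention already removed offsets 0..999 at the source.
source = [(1000 + i, f"event-{i}") for i in range(5)]
destination = mirror(source)

# Offset as it would be recorded in a savepoint taken on the source side.
savepoint_offset = 1003

source_by_offset = dict(source)
destination_by_offset = dict(destination)

# The offset is meaningful on the source topic ...
print(source_by_offset[savepoint_offset])
# ... but points at nothing on the mirrored topic, where the same payload has offset 3.
print(destination_by_offset.get(savepoint_offset))
```

The same payload exists on both sides, but only a mapping between the two offset spaces (or a lookup by timestamp) would let a consumer find it again on the destination topic.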

Has anyone tried to move clouds while retaining the Flink state?

Thanks,
Hemanga


RE: RocksDB's state size discrepancy with what's seen with state processor API

2022-05-02 Thread Alexis Sarda-Espinosa
Ok

Regards,
Alexis.

From: Peter Brucia 
Sent: Friday, 22 April 2022 15:31
To: Alexis Sarda-Espinosa 
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

No



Re: How should I call external HTTP services with PyFlink?

2022-05-02 Thread Francis Conroy
Hi Dhavan,

We have looked at using PyFlink for data stream enrichment and found the
performance lacking compared to the Java counterpart. One option for you
might be to use StateFun for the enrichment stages. We've also changed our
model for enrichment: we're pushing the enrichment data into the pipeline
instead of pulling it, but this won't work in a lot of situations.
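
To make the "push instead of pull" idea concrete, here is a minimal sketch in plain Python. It is not PyFlink code: the `PushedEnrichment` class and its method names are hypothetical, and the dictionary stands in for whatever keyed state the real pipeline would use.

```python
class PushedEnrichment:
    """Toy stand-in for an operator that keeps pushed reference data per key."""

    def __init__(self):
        self._reference = {}  # key -> latest reference record

    def on_reference(self, key, record):
        # Reference data arrives as its own stream and is stored locally.
        self._reference[key] = record

    def on_event(self, event):
        # No HTTP call on the hot path: enrich from local state only.
        enriched = dict(event)
        enriched["enrichment"] = self._reference.get(event["key"])
        return enriched


op = PushedEnrichment()
op.on_reference("sensor_1", {"site": "A"})
print(op.on_event({"key": "sensor_1", "value": 21.5}))
print(op.on_event({"key": "sensor_2", "value": 19.0}))  # no reference data yet
```

The second call shows the limitation mentioned above: an event whose reference data has not been pushed yet cannot be enriched, which is one reason this model does not fit every situation.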

Hope that gives you some ideas.

On Mon, 2 May 2022 at 22:54, Dhavan Vaidya 
wrote:

> Hello!
>
> I want to make HTTP(S) calls to enrich data streams. The HTTP services are
> running on our VPC, so the delay is limited, but sometimes these services
> end up calling third party APIs, and latencies become high.
>
> From documentation (
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/overview/)
> it seems PyFlink does not support "asyncio operator" like Java does (
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio/).
> Am I missing something? How should this be approached?
>
> Thanks!
>
> --
> Dhavan
>



Re: Re: How to define event time and Watermark for intermediary joined table in FlinkSQL?

2022-05-02 Thread liuxiangcao
Thanks for the jira link.

Actually my comment in the initial email "In Java DataStream API, you can
easily do so within flink topology without having to create a separate
kafka topic: "  is incorrect.

I took a closer look and realized that the Flink Java DataStream API also does
not support redefining the TimestampAssigner on a JoinedStreams.
It will simply use the event timestamps and watermarks from the input
streams.
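
As Shengkai notes further down the thread, the watermark of the join operator is the minimum of the watermarks of its inputs. A minimal sketch of that rule in plain Python follows; the `TwoInputWatermark` class is hypothetical and only models the bookkeeping, it is not Flink code.

```python
class TwoInputWatermark:
    """Tracks the watermark of each input and exposes the combined (minimum) one."""

    def __init__(self):
        self.left = float("-inf")
        self.right = float("-inf")
        self.current = float("-inf")

    def on_watermark(self, side, value):
        # Watermarks never move backwards on a single input.
        if side == "left":
            self.left = max(self.left, value)
        elif side == "right":
            self.right = max(self.right, value)
        else:
            raise ValueError(f"unknown input side: {side!r}")
        combined = min(self.left, self.right)
        if combined > self.current:
            self.current = combined
            return combined  # the combined watermark advanced
        return None  # nothing new to emit downstream


wm = TwoInputWatermark()
print(wm.on_watermark("left", 10))   # right input has not reported yet
print(wm.on_watermark("right", 7))   # combined watermark is the slower input
print(wm.on_watermark("right", 12))  # now the left input is the slower one
```

Because the output watermark is derived only from the inputs, there is no hook in this model for a timestamp computed from both sides of the join, which matches the limitation described above.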

On Fri, Apr 29, 2022 at 7:35 AM Xuyang  wrote:

> I think it's not a good idea to define a watermark on a view, because
> currently a view is only a piece of SQL query text in Flink, and a query
> should not contain a watermark definition. You can see the discussion here:
> https://issues.apache.org/jira/browse/FLINK-22804
> Maybe you can open a new Jira issue to discuss the behavior you expect.
>
> At 2022-04-29 13:30:34, "liuxiangcao" wrote:
>
> Hi Shengkai,
>
> Thank you for the reply.
>
> The UDF getEventTimeInNS uses timestamps of both streamA and streamB to
> calculate the true event time for streamB events.
>
> For illustration purposes, we can consider it to be like this:
>
> public Long eval(
> Long baseTimeStampFromA,
> Long timestampA,
> Long timestampB) {
>   return baseTimeStampFromA + timestampB - timestampA;
> }
>
> Basically I need to redefine the event timestamp and watermark for the
> output stream of a join operator.
>
> You are right. Ideally I hope FlinkSQL can support defining a watermark on
> a view.  Do you know if this was discussed in the Flink community before?
> Wondering whether this may be supported in future.
>
> On Thu, Apr 21, 2022 at 2:44 AM Shengkai Fang  wrote:
>
>> Hi,
>>
>> The watermark of the join operator is the minimum of the watermark of the
>> input streams.
>>
>> ```
>> JoinOperator.watermark = min(left.watermark, right.watermark);
>> ```
>>
>> I think it's enough for most cases.  Could you share more details about
>> the logic in the UDF getEventTimeInNS?
>>
>> I think a better solution than the intermediate table would be to
>> define the watermark on the VIEW. But Flink doesn't support that now.
>>
>> Best,
>> Shengkai
>>
>>
>>
>>
>> On Sat, Apr 16, 2022 at 03:07, liuxiangcao wrote:
>>
>>> Hi Flink community,
>>>
>>> *Here is the context: *
>>> Theoretically, I would like to write following query but it won't work
>>> since we can only define the WATERMARK in a table DDL:
>>>
>>> INSERT into tableC
>>> select tableA.field1,
>>>  SUM(1) as `count`,
>>>  time_ltz AS getEventTimeInNS(tableA.timestamp, tableB.timestamp),
>>>  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
>>> from tableA join tableB
>>> on tableA.joinCol == tableB.joinCol
>>> group by TUMBLE(time_ltz, INTERVAL '30' SECOND), tableA.field1
>>> (note: getEventTimeInNS is a UDF that calculates event time using 
>>> tableA.timestamp and tableB.timestamp)
>>>
>>>
>>> so I have to define an intermediary table to store the results of the
>>> join, define the event time and watermark in its table DDL, and then
>>> perform the tumbling window aggregation on the intermediary table:
>>>
>>> CREATE TABLE IntermediaryTable (
>>>   field1,
>>>   `eventTimestampInNanoseconds`  BIGINT,
>>>   time_ltz AS TO_TIMESTAMP_LTZ(eventTimestampInNanoseconds/100, 3),
>>>   WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'IntermediaryTable',
>>>   'properties.bootstrap.servers' = 'xx',
>>>   'properties.group.id' = 'contextevent-streaming-sql',
>>>   'format' = 'avro'
>>> );
>>>
>>> INSERT INTO IntermediaryTable
>>> select tableA.field1,
>>>   tableB.field2,
>>>   getEventTimeInNS(tableA.timestamp, tableB.timestamp)
>>> from tableA join tableB
>>> on tableA.joinCol == tableB.joinCol;
>>>
>>> Then, I can perform tumbling window aggregation on the IntermediaryTable:
>>>
>>> INSERT INTO countTable
>>> (select event.field1,
>>>   SUM(1) as `count`
>>>  from IntermediaryTable event
>>>  GROUP BY
>>>   TUMBLE(event.time_ltz, INTERVAL '30' SECOND),
>>>   event.field1
>>> );
>>>
>>>
>>> This is not convenient because the IntermediaryTable writes to another
>>> kafka topic that is only used by the tumbling window aggregation. When I
>>> try to group the two INSERT INTO statements within "BEGIN STATEMENT SET;
>>> END;", it will fail complaining the topic does not exist. I either have to
>>> first create this kafka topic beforehand, or run a separate job to INSERT
>>> INTO IntermediaryTable.
>>>
>>> In Java DataStream API, you can easily do so within flink topology
>>> without having to create a separate kafka topic:
>>>
>>> final DataStream joinedStream =
>>>  StreamA.join(StreamB)
>>>  .where()
>>>  .equalTo()
>>>  .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
>>>  

Re: How to debug Metaspace exception?

2022-05-02 Thread John Smith
Ok, I don't think I'm running user code on the job manager. Basically, I'm
running a standalone cluster.

3 zookeepers
3 job managers
3 task managers.

I submit my jobs via the UI.

But just in case, I'll copy the config over to the job managers.



On Mon, May 2, 2022 at 11:00 AM Chesnay Schepler  wrote:

> There are cases where user-code is run on the JobManager.
> I'm not sure, though, whether that applies to the JDBC sources.
>
> On 02/05/2022 15:45, John Smith wrote:
>
> Why do the JDBC jars need to be on the job manager node though?
>
> On Mon, May 2, 2022 at 9:36 AM Chesnay Schepler 
> wrote:
>
>> yes.
>> But if you can ensure that the driver isn't bundled by any user-jar you
>> can also skip the pattern configuration step.
>>
>> The pattern looks correct formatting-wise; you could try whether
>> com.microsoft.sqlserver.jdbc. is enough to solve the issue.
>>
>> On 02/05/2022 14:41, John Smith wrote:
>>
>> Oh, so I should copy the jars to the lib folder and
>> set classloader.parent-first-patterns.additional:
>> "org.apache.ignite.;com.microsoft.sqlserver.jdbc." to both the task
>> managers and job managers?
>>
>> Also is my pattern correct?
>> "org.apache.ignite.;com.microsoft.sqlserver.jdbc."
>>
>> Just to be sure I'm running a standalone cluster using zookeeper. So I
>> have 3 zookeepers, 3 job managers and 3 task managers.
>>
>>
>> On Mon, May 2, 2022 at 2:57 AM Chesnay Schepler 
>> wrote:
>>
>>> And you should make sure that it is set for both processes!
>>>
>>> On 02/05/2022 08:43, Chesnay Schepler wrote:
>>>
>>> The setting itself isn't taskmanager specific; it applies to both the
>>> job- and taskmanager process.
>>>
>>> On 02/05/2022 05:29, John Smith wrote:
>>>
>>> Also just to be sure this is a Task Manager setting right?
>>>
>>> On Thu, Apr 28, 2022 at 11:13 AM John Smith 
>>> wrote:
>>>
 I assume you will take action on your side to track and fix the doc? :)

 On Thu, Apr 28, 2022 at 11:12 AM John Smith 
 wrote:

> Ok so to summarize...
>
> - Build my job jar and have the JDBC driver as a compile only
> dependency and copy the JDBC driver to flink lib folder.
>
> Or
>
> - Build my job jar and include JDBC driver in the shadow, plus copy
> the JDBC driver in the flink lib folder, plus  make an entry in config for
> classloader.parent-first-patterns-additional
> 
>
>
> On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler 
> wrote:
>
>> I think what I meant was "either add it to /lib, or [if it is already
>> in /lib but also bundled in the jar] add it to the parent-first 
>> patterns."
>>
>> On 28/04/2022 15:56, Chesnay Schepler wrote:
>>
>> Pretty sure, even though I seemingly documented it incorrectly :)
>>
>> On 28/04/2022 15:49, John Smith wrote:
>>
>> You sure?
>>
>>-
>>
>>*JDBC*: JDBC drivers leak references outside the user code
>>classloader. To ensure that these classes are only loaded once you 
>> should
>>either add the driver jars to Flink’s lib/ folder, or add the
>>driver classes to the list of parent-first loaded class via
>>classloader.parent-first-patterns-additional
>>
>> 
>>.
>>
>>It says either or
>>
>>
>> On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler 
>> wrote:
>>
>>> You're misinterpreting the docs.
>>>
>>> The parent/child-first classloading controls where Flink looks for a
>>> class *first*, specifically whether we first load from /lib or the
>>> user-jar.
>>> It does not allow you to load something from the user-jar in the
>>> parent classloader. That's just not how it works.
>>>
>>> It must be in /lib.
>>>
>>> On 27/04/2022 04:59, John Smith wrote:
>>>
>>> Hi Chesnay as per the docs...
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
>>>
>>> You can either put the jars in task manager lib folder or use
>>> classloader.parent-first-patterns-additional
>>> 
>>>
>>> I prefer the latter like this: the dependency stays with the
>>> user-jar and not on the task manager.
>>>
>>> On Tue, Apr 26, 2022 at 9:52 PM John Smith 
>>> wrote:
>>>
 Ok so I should put the Apache ignite and my Microsoft drivers in
 the lib folders of my task managers?

 And then in my job jar only include them as compile time
 dependencies?


 On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler <

Re: How to debug Metaspace exception?

2022-05-02 Thread Chesnay Schepler

There are cases where user-code is run on the JobManager.
I'm not sure, though, whether that applies to the JDBC sources.

On 02/05/2022 15:45, John Smith wrote:

Why do the JDBC jars need to be on the job manager node though?

On Mon, May 2, 2022 at 9:36 AM Chesnay Schepler  
wrote:


yes.
But if you can ensure that the driver isn't bundled by any
user-jar you can also skip the pattern configuration step.

The pattern looks correct formatting-wise; you could try whether
com.microsoft.sqlserver.jdbc. is enough to solve the issue.

On 02/05/2022 14:41, John Smith wrote:

Oh, so I should copy the jars to the lib folder and
set classloader.parent-first-patterns.additional:
"org.apache.ignite.;com.microsoft.sqlserver.jdbc." to both the
task managers and job managers?

Also is my pattern correct?
"org.apache.ignite.;com.microsoft.sqlserver.jdbc."

Just to be sure I'm running a standalone cluster using zookeeper.
So I have 3 zookeepers, 3 job managers and 3 task managers.


On Mon, May 2, 2022 at 2:57 AM Chesnay Schepler
 wrote:

And you should make sure that it is set for both processes!

On 02/05/2022 08:43, Chesnay Schepler wrote:

The setting itself isn't taskmanager specific; it applies to
both the job- and taskmanager process.

On 02/05/2022 05:29, John Smith wrote:

Also just to be sure this is a Task Manager setting right?

On Thu, Apr 28, 2022 at 11:13 AM John Smith
 wrote:

I assume you will take action on your side to track and
fix the doc? :)

On Thu, Apr 28, 2022 at 11:12 AM John Smith
 wrote:

Ok so to summarize...

- Build my job jar and have the JDBC driver as a
compile only dependency and copy the JDBC driver to
flink lib folder.

Or

- Build my job jar and include JDBC driver in the
shadow, plus copy the JDBC driver in the flink lib
folder, plus  make an entry in config for
|classloader.parent-first-patterns-additional|




On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler
 wrote:

I think what I meant was "either add it to
/lib, or [if it is already in /lib but also
bundled in the jar] add it to the parent-first
patterns."

On 28/04/2022 15:56, Chesnay Schepler wrote:

Pretty sure, even though I seemingly
documented it incorrectly :)

On 28/04/2022 15:49, John Smith wrote:

You sure?

 *

/JDBC/: JDBC drivers leak references
outside the user code classloader. To
ensure that these classes are only loaded
once you should either add the driver
jars to Flink’s |lib/| folder, or add the
driver classes to the list of
parent-first loaded class via
|classloader.parent-first-patterns-additional|

.

It says either or


On Wed, Apr 27, 2022 at 3:44 AM Chesnay
Schepler  wrote:

You're misinterpreting the docs.

The parent/child-first classloading
controls where Flink looks for a class
/first/, specifically whether we first
load from /lib or the user-jar.
It does not allow you to load something
from the user-jar in the parent
classloader. That's just not how it works.

It must be in /lib.

On 27/04/2022 04:59, John Smith wrote:

Hi Chesnay as per the docs...

https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/

You can either put the jars in task
manager lib folder or use
|classloader.parent-first-patterns-additional|



I prefer the latter like this: the
dependency stays with the user-jar and
not on the task manager.


trigger once (batch job with streaming semantics)

2022-05-02 Thread Georg Heiler
Hi,

Spark
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
offers a variety of triggers.

In particular, it also has the "once" mode:

*One-time micro-batch* The query will execute *only one* micro-batch to
process all the available data and then stop on its own. This is useful in
scenarios you want to periodically spin up a cluster, process everything
that is available since the last period, and then shutdown the cluster. In
some case, this may lead to significant cost savings.

Does Flink have a similar capability?

Best,
Georg



Flink-SQL returning duplicate rows for some records

2022-05-02 Thread Joost Molenaar
Hello all,

I'm trying to use Flink-SQL to monitor a Kafka topic that's populated by
Debezium, which is in turn monitoring a MS-SQL CDC table. For some reason,
Flink-SQL shows a new row when I update the boolean field, but updates the
row in place when I update the text field, and I'm not understanding why
this happens. My ultimate goal is to use Flink-SQL to do a join on records
that come from both sides of a 1:N relation in the foreign database, to
expose a more ready to consume JSON object to downstream consumers.

The source table is defined like this in MS-SQL:

CREATE TABLE todo_list (
    id int IDENTITY NOT NULL,
    done bit NOT NULL DEFAULT 0,
    name varchar(MAX) NOT NULL,
    CONSTRAINT PK_todo_list PRIMARY KEY (id)
);

This is the configuration I'm sending to Debezium. Note that I'm not
including the JSON schema in either keys or values:

{
  "name": "todo-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.server.name": "mssql",
    "database.hostname": "10.88.10.1",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "...",
    "database.dbname": "todo",
    "database.history.kafka.bootstrap.servers": "10.88.10.10:9092",
    "database.history.kafka.topic": "schema-changes.todo",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false
  }
}

So Debezium is publishing events to Kafka with keys like this:

{"id":3}

And values like this (whitespace added for readability). This event updates the
value of the 'name' field:

{
  "before": {
    "id": 3,
    "done": false,
    "name": "test"
  },
  "after": {
    "id": 3,
    "done": false,
    "name": "test2"
  },
  "source": {
    "version": "1.9.0.Final",
    "connector": "sqlserver",
    "name": "mssql",
    "ts_ms": 1651497653043,
    "snapshot": "false",
    "db": "todo",
    "sequence": null,
    "schema": "dbo",
    "table": "todo_list",
    "change_lsn": "0025:0d58:0002",
    "commit_lsn": "0025:0d58:0003",
    "event_serial_no": 2
  },
  "op": "u",
  "ts_ms": 1651497654127,
  "transaction": null
}

(I verified this using a Python script that follows the relevant Kafka topic.)
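
For reference, here is a minimal sketch of how such an envelope maps to changelog rows, written in plain Python. It follows my understanding of how Flink's debezium-json format interprets the "op", "before" and "after" fields (an update becomes an update-before row plus an update-after row); the `to_changelog` helper is hypothetical and is not part of Flink or Debezium.

```python
import json


def to_changelog(event_json):
    """Turn one Debezium envelope into a list of (row kind, row) pairs."""
    event = json.loads(event_json)
    op = event["op"]
    before = event.get("before")
    after = event.get("after")
    if op in ("c", "r"):   # create, or snapshot read
        return [("+I", after)]
    if op == "u":          # update: retract the old image, emit the new one
        return [("-U", before), ("+U", after)]
    if op == "d":          # delete
        return [("-D", before)]
    raise ValueError(f"unsupported op: {op!r}")


sample = json.dumps({
    "before": {"id": 3, "done": False, "name": "test"},
    "after": {"id": 3, "done": False, "name": "test2"},
    "op": "u",
})

for kind, row in to_changelog(sample):
    print(kind, row)
```

In this model the old row only disappears downstream if the "before" image retracted by the update matches the row that was emitted earlier, so comparing the "before" and "after" images for both kinds of update may be a useful check.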

Next, I'm trying to follow this CDC stream in Flink by adding the Kafka connector
for Flink SQL, defining a source table and starting a job in the Flink-SQL CLI:

ADD JAR '/opt/flink/opt/flink-sql-connector-kafka_2.11-1.14.4.jar';

CREATE TABLE todo_list (
    k_id BIGINT,
    done BOOLEAN,
    name STRING
)
WITH (
    'connector'='kafka',
    'topic'='mssql.dbo.todo_list',
    'properties.bootstrap.servers'='10.88.10.10:9092',
    'properties.group.id'='flinksql-todo-list',
    'scan.startup.mode'='earliest-offset',
    'key.format'='json',
    'key.fields-prefix'='k_',
    'key.fields'='k_id',
    'value.format'='debezium-json',
    'value.debezium-json.schema-include'='false',
    'value.fields-include'='EXCEPT_KEY'
);

SELECT * FROM todo_list;

Now, when I perform a query like this in the MS-SQL database:

UPDATE todo_list SET name='test2' WHERE id=3;

I see that the Flink-SQL client updates the row with id=3 to have the new
value "test2" for the 'name' field, as I was expecting. However, when I
update the 'done' field to a different value, Flink-SQL seems to leave
the old row with values (3, False, 'test2') intact, and shows a new row with
values (3, True, 'test2').

I tried to append a `PRIMARY KEY (k_id) NOT ENFORCED` line between the first
parentheses in the CREATE TABLE statement, but this seems to make no
difference, except when running `DESCRIBE todo_list` in Flink-SQL.

I have no idea why the boolean field would cause different behavior than the
text field. Am I missing some piece of configuration, or are my expectations
wrong?


Regards,
Joost Molenaar


Re: How to debug Metaspace exception?

2022-05-02 Thread John Smith
Why do the JDBC jars need to be on the job manager node though?

On Mon, May 2, 2022 at 9:36 AM Chesnay Schepler  wrote:

> yes.
> But if you can ensure that the driver isn't bundled by any user-jar you
> can also skip the pattern configuration step.
>
> The pattern looks correct formatting-wise; you could try whether
> com.microsoft.sqlserver.jdbc. is enough to solve the issue.
>
> On 02/05/2022 14:41, John Smith wrote:
>
> Oh, so I should copy the jars to the lib folder and
> set classloader.parent-first-patterns.additional:
> "org.apache.ignite.;com.microsoft.sqlserver.jdbc." to both the task
> managers and job managers?
>
> Also is my pattern correct?
> "org.apache.ignite.;com.microsoft.sqlserver.jdbc."
>
> Just to be sure I'm running a standalone cluster using zookeeper. So I
> have 3 zookeepers, 3 job managers and 3 task managers.
>
>
> On Mon, May 2, 2022 at 2:57 AM Chesnay Schepler 
> wrote:
>
>> And you should make sure that it is set for both processes!
>>
>> On 02/05/2022 08:43, Chesnay Schepler wrote:
>>
>> The setting itself isn't taskmanager specific; it applies to both the
>> job- and taskmanager process.
>>
>> On 02/05/2022 05:29, John Smith wrote:
>>
>> Also just to be sure this is a Task Manager setting right?
>>
>> On Thu, Apr 28, 2022 at 11:13 AM John Smith 
>> wrote:
>>
>>> I assume you will take action on your side to track and fix the doc? :)
>>>
>>> On Thu, Apr 28, 2022 at 11:12 AM John Smith 
>>> wrote:
>>>
 Ok so to summarize...

 - Build my job jar and have the JDBC driver as a compile only
 dependency and copy the JDBC driver to flink lib folder.

 Or

 - Build my job jar and include JDBC driver in the shadow, plus copy the
 JDBC driver in the flink lib folder, plus  make an entry in config for
 classloader.parent-first-patterns-additional
 


 On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler 
 wrote:

> I think what I meant was "either add it to /lib, or [if it is already
> in /lib but also bundled in the jar] add it to the parent-first patterns."
>
> On 28/04/2022 15:56, Chesnay Schepler wrote:
>
> Pretty sure, even though I seemingly documented it incorrectly :)
>
> On 28/04/2022 15:49, John Smith wrote:
>
> You sure?
>
>-
>
>*JDBC*: JDBC drivers leak references outside the user code
>classloader. To ensure that these classes are only loaded once you 
> should
>either add the driver jars to Flink’s lib/ folder, or add the
>driver classes to the list of parent-first loaded class via
>classloader.parent-first-patterns-additional
>
> 
>.
>
>It says either or
>
>
> On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler 
> wrote:
>
>> You're misinterpreting the docs.
>>
>> The parent/child-first classloading controls where Flink looks for a
>> class *first*, specifically whether we first load from /lib or the
>> user-jar.
>> It does not allow you to load something from the user-jar in the
>> parent classloader. That's just not how it works.
>>
>> It must be in /lib.
>>
>> On 27/04/2022 04:59, John Smith wrote:
>>
>> Hi Chesnay as per the docs...
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
>>
>> You can either put the jars in task manager lib folder or use
>> classloader.parent-first-patterns-additional
>> 
>>
>> I prefer the latter like this: the dependency stays with the user-jar
>> and not on the task manager.
>>
>> On Tue, Apr 26, 2022 at 9:52 PM John Smith 
>> wrote:
>>
>>> Ok so I should put the Apache ignite and my Microsoft drivers in the
>>> lib folders of my task managers?
>>>
>>> And then in my job jar only include them as compile time
>>> dependencies?
>>>
>>>
>>> On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler <
>>> ches...@apache.org> wrote:
>>>
 JDBC drivers are well-known for leaking classloaders unfortunately.

 You have correctly identified your alternatives.

 You must put the jdbc driver into /lib instead. Setting only the
 parent-first pattern shouldn't affect anything.
 That is only relevant if something is in both in /lib and the
 user-jar, telling Flink to prioritize what is in lib.



 On 26/04/2022 15:35, John Smith wrote:

 So I put classloader.parent-first-patterns.additional:

Pyflink -> Redshift/S3/Firehose

2022-05-02 Thread Zain Haider Nemati
Hi,
I am working on writing a Flink processor which has to send transformed
data to Redshift/S3.
I could not find any PyFlink documentation on how to send data to
Firehose, S3, or Redshift. I would appreciate some help here.


Re: How to debug Metaspace exception?

2022-05-02 Thread Chesnay Schepler

Yes.
But if you can ensure that the driver isn't bundled in any user-jar, you
can also skip the pattern configuration step.


The pattern looks correctly formatted; you could check whether
com.microsoft.sqlserver.jdbc. alone is enough to solve the issue.
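
For illustration, the pattern value can be read as a semicolon-separated list of class-name prefixes. The following is a minimal Python sketch of that matching logic, assuming plain prefix matching; the function names are hypothetical and this is not Flink's actual implementation.

```python
def parse_patterns(value: str) -> list[str]:
    """Split a semicolon-separated pattern string into non-empty prefixes."""
    return [p.strip() for p in value.split(";") if p.strip()]


def is_parent_first(class_name: str, patterns: list[str]) -> bool:
    """Return True if the class name starts with any configured prefix."""
    return any(class_name.startswith(p) for p in patterns)


# The pattern string discussed in this thread.
patterns = parse_patterns("org.apache.ignite.;com.microsoft.sqlserver.jdbc.")

# Classes under a configured prefix would be looked up parent-first,
# i.e. from /lib before the user-jar (assumption of this sketch).
print(is_parent_first("com.microsoft.sqlserver.jdbc.SQLServerDriver", patterns))
print(is_parent_first("org.apache.ignite.IgniteJdbcThinDriver", patterns))

# A class outside the prefixes keeps the default lookup order.
print(is_parent_first("com.example.MyJob", patterns))
```

The trailing dot in each prefix keeps the match limited to that package and its sub-packages, so a package that merely shares the leading characters is not matched by accident.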


On 02/05/2022 14:41, John Smith wrote:
Oh, so I should copy the jars to the lib folder and 
set classloader.parent-first-patterns.additional: 
"org.apache.ignite.;com.microsoft.sqlserver.jdbc." to both the task 
managers and job managers?


Also is my pattern correct? 
"org.apache.ignite.;com.microsoft.sqlserver.jdbc."


Just to be clear, I'm running a standalone cluster using ZooKeeper, so I
have 3 ZooKeeper nodes, 3 job managers and 3 task managers.



On Mon, May 2, 2022 at 2:57 AM Chesnay Schepler  
wrote:


And you should make sure that it is set for both processes!

On 02/05/2022 08:43, Chesnay Schepler wrote:

The setting itself isn't taskmanager specific; it applies to both
the job- and taskmanager process.

On 02/05/2022 05:29, John Smith wrote:

Also just to be sure this is a Task Manager setting right?

On Thu, Apr 28, 2022 at 11:13 AM John Smith
 wrote:

I assume you will take action on your side to track and fix
the doc? :)

On Thu, Apr 28, 2022 at 11:12 AM John Smith
 wrote:

Ok so to summarize...

- Build my job jar and have the JDBC driver as a compile
only dependency and copy the JDBC driver to flink lib
folder.

Or

- Build my job jar and include JDBC driver in the
shadow, plus copy the JDBC driver in the flink lib
folder, plus  make an entry in config for
|classloader.parent-first-patterns.additional|




On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler
 wrote:

I think what I meant was "either add it to /lib, or
[if it is already in /lib but also bundled in the
jar] add it to the parent-first patterns."

On 28/04/2022 15:56, Chesnay Schepler wrote:

Pretty sure, even though I seemingly documented it
incorrectly :)

On 28/04/2022 15:49, John Smith wrote:

You sure?

* /JDBC/: JDBC drivers leak references outside
the user code classloader. To ensure that
these classes are only loaded once you should
either add the driver jars to Flink’s
|lib/| folder, or add the driver classes to
the list of parent-first loaded classes via
|classloader.parent-first-patterns.additional|.

It says either or


On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler
 wrote:

You're misinterpreting the docs.

The parent/child-first classloading controls
where Flink looks for a class /first/,
specifically whether we first load from /lib
or the user-jar.
It does not allow you to load something from
the user-jar in the parent classloader. That's
just not how it works.

It must be in /lib.

On 27/04/2022 04:59, John Smith wrote:

Hi Chesnay as per the docs...

https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/

You can either put the jars in task manager
lib folder or use
|classloader.parent-first-patterns.additional|



I prefer the latter like this: the
dependency stays with the user-jar and not on
the task manager.

On Tue, Apr 26, 2022 at 9:52 PM John Smith
 wrote:

Ok so I should put the Apache ignite and
my Microsoft drivers in the lib folders
of my task managers?

And then in my job jar only include them
as compile time dependencies?


On Tue, Apr 26, 2022 at 10:42 AM Chesnay
Schepler  wrote:

JDBC drivers are well-known for
leaking classloaders unfortunately.


How should I call external HTTP services with PyFlink?

2022-05-02 Thread Dhavan Vaidya
Hello!

I want to make HTTP(S) calls to enrich data streams. The HTTP services are
running in our VPC, so the delay is limited, but sometimes these services
end up calling third-party APIs, and latencies become high.

From the documentation (
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/overview/)
it seems PyFlink does not support an "Async I/O" operator like the Java API does (
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio/).
Am I missing something? How should this be approached?

Thanks!

--
Dhavan


Re: How to debug Metaspace exception?

2022-05-02 Thread John Smith
Oh, so I should copy the jars to the lib folder and
set classloader.parent-first-patterns.additional:
"org.apache.ignite.;com.microsoft.sqlserver.jdbc." to both the task
managers and job managers?

Also is my pattern correct?
"org.apache.ignite.;com.microsoft.sqlserver.jdbc."

Just to be clear, I'm running a standalone cluster using ZooKeeper, so I have
3 ZooKeeper nodes, 3 job managers and 3 task managers.


On Mon, May 2, 2022 at 2:57 AM Chesnay Schepler  wrote:

> And you should make sure that it is set for both processes!
>
> On 02/05/2022 08:43, Chesnay Schepler wrote:
>
> The setting itself isn't taskmanager specific; it applies to both the job-
> and taskmanager process.
>
> On 02/05/2022 05:29, John Smith wrote:
>
> Also just to be sure this is a Task Manager setting right?
>
> On Thu, Apr 28, 2022 at 11:13 AM John Smith 
> wrote:
>
>> I assume you will take action on your side to track and fix the doc? :)
>>
>> On Thu, Apr 28, 2022 at 11:12 AM John Smith 
>> wrote:
>>
>>> Ok so to summarize...
>>>
>>> - Build my job jar and have the JDBC driver as a compile only
>>> dependency and copy the JDBC driver to flink lib folder.
>>>
>>> Or
>>>
>>> - Build my job jar and include JDBC driver in the shadow, plus copy the
>>> JDBC driver in the flink lib folder, plus  make an entry in config for
>>> classloader.parent-first-patterns.additional
>>> 
>>>
>>>
>>> On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler 
>>> wrote:
>>>
>>>> I think what I meant was "either add it to /lib, or [if it is already
>>>> in /lib but also bundled in the jar] add it to the parent-first patterns."
>>>>
>>>> On 28/04/2022 15:56, Chesnay Schepler wrote:
>>>>
>>>> Pretty sure, even though I seemingly documented it incorrectly :)
>>>>
>>>> On 28/04/2022 15:49, John Smith wrote:
>>>>
>>>> You sure?
>>>>
>>>> - *JDBC*: JDBC drivers leak references outside the user code
>>>>   classloader. To ensure that these classes are only loaded once you should
>>>>   either add the driver jars to Flink’s lib/ folder, or add the
>>>>   driver classes to the list of parent-first loaded classes via
>>>>   classloader.parent-first-patterns.additional.
>>>>
>>>> It says either or
>>>>
>>>>
>>>> On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler
>>>> wrote:

> You're misinterpreting the docs.
>
> The parent/child-first classloading controls where Flink looks for a
> class *first*, specifically whether we first load from /lib or the
> user-jar.
> It does not allow you to load something from the user-jar in the
> parent classloader. That's just not how it works.
>
> It must be in /lib.
>
> On 27/04/2022 04:59, John Smith wrote:
>
> Hi Chesnay as per the docs...
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
>
> You can either put the jars in task manager lib folder or use
> classloader.parent-first-patterns.additional
> 
>
> I prefer the latter like this: the dependency stays with the user-jar
> and not on the task manager.
>
> On Tue, Apr 26, 2022 at 9:52 PM John Smith 
> wrote:
>
>> Ok so I should put the Apache ignite and my Microsoft drivers in the
>> lib folders of my task managers?
>>
>> And then in my job jar only include them as compile time
>> dependencies?
>>
>>
>> On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler 
>> wrote:
>>
>>> JDBC drivers are well-known for leaking classloaders unfortunately.
>>>
>>> You have correctly identified your alternatives.
>>>
>>> You must put the jdbc driver into /lib instead. Setting only the
>>> parent-first pattern shouldn't affect anything.
>>> That is only relevant if something is in both /lib and the
>>> user-jar, telling Flink to prioritize what is in lib.
>>>
>>>
>>>
>>> On 26/04/2022 15:35, John Smith wrote:
>>>
>>> So I put classloader.parent-first-patterns.additional:
>>> "org.apache.ignite." in the task config and so far I don't think I'm
>>> getting "java.lang.OutOfMemoryError: Metaspace" any more.
>>>
>>> Or it's too early to tell.
>>>
>>> Though now, the task managers are shutting down due to some
>>> other failures.
>>>
>>> So maybe because tasks were failing and reloading often the task
>>> manager was running out of Metaspace. But now maybe it's just
>>> cleanly shutting down.
>>>
>>> On Wed, Apr 20, 2022 at 11:35 AM John Smith 
>>> wrote:
>>>
 Or I can put in the config 

Re: flink operator sometimes cannot start jobmanager after upgrading

2022-05-02 Thread Őrhidi Mátyás
I filed a Jira for tracking this issue:
https://issues.apache.org/jira/browse/FLINK-27468

On Mon, May 2, 2022 at 10:31 AM Őrhidi Mátyás 
wrote:

> This can be reproduced simply by deleting the Kubernetes deployment. The
> operator cannot recover from this state automatically, but defining a
> restartNonce on the deployment should recover the state.
>
> Regards,
> Matyas
>
> On Mon, May 2, 2022 at 10:00 AM Márton Balassi 
> wrote:
>
>> Hi ChangZhuo,
>>
>> Thanks for reporting this; I think I have just run into this myself too.
>> I will try to reproduce it, but I do not fully comprehend it yet. If anyone
>> has a way to reproduce it, that is more than welcome. :-)
>>
>> On Fri, Apr 29, 2022 at 12:16 PM ChangZhuo Chen (陳昌倬) 
>> wrote:
>>
>>> Hi,
>>>
>>> We found that the Flink operator [0] sometimes cannot start the JobManager
>>> after upgrading a FlinkDeployment. We need to recreate the FlinkDeployment
>>> to fix the problem. Has anyone else seen this issue?
>>>
>>> The following is a redacted log from the Flink operator. After the status
>>> becomes MISSING, it stays in MISSING status for at least 15 minutes.
>>>
>>>
>>> 2022-04-29 09:41:15,141 o.a.f.c.d.a.c.ApplicationClusterDeployer
>>> [INFO ][namespace/flink-deployment-name] Submitting application in
>>> 'Application Mode'.
>>> 2022-04-29 09:41:15,145 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO
>>> ][namespace/flink-deployment-name] The derived from fraction jvm overhead
>>> memory (2.400gb (2576980416 bytes)) is greater than its max value
>>> 1024.000mb (1073741824 bytes), max value will be used instead
>>> 2022-04-29 09:41:15,146 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO
>>> ][namespace/flink-deployment-name] The derived from fraction jvm overhead
>>> memory (5.200gb (5583457568 bytes)) is greater than its max value
>>> 1024.000mb (1073741824 bytes), max value will be used instead
>>> 2022-04-29 09:41:15,146 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO
>>> ][namespace/flink-deployment-name] The derived from fraction network memory
>>> (5.050gb (5422396292 bytes)) is greater than its max value 4.000gb
>>> (4294967296 bytes), max value will be used instead
>>> 2022-04-29 09:41:15,237 o.a.f.k.u.KubernetesUtils  [INFO
>>> ][namespace/flink-deployment-name] Kubernetes deployment requires a fixed
>>> port. Configuration high-availability.jobmanager.port will be set to 6123
>>> 2022-04-29 09:41:15,508 o.a.f.k.KubernetesClusterDescriptor [WARN
>>> ][namespace/flink-deployment-name] Please note that Flink client
>>> operations(e.g. cancel, list, stop, savepoint, etc.) won't work from
>>> outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type'
>>> has been set to ClusterIP.
>>> 2022-04-29 09:41:15,508 o.a.f.k.KubernetesClusterDescriptor [INFO
>>> ][namespace/flink-deployment-name] Create flink application cluster
>>> flink-deployment-name successfully, JobManager Web Interface:
>>> http://flink-deployment-name.namespace:8081
>>> 2022-04-29 09:41:15,510 o.a.f.k.o.s.FlinkService   [INFO
>>> ][namespace/flink-deployment-name] Application cluster successfully deployed
>>> 2022-04-29 09:41:15,583 o.a.f.k.o.c.FlinkDeploymentController [INFO
>>> ][namespace/flink-deployment-name] Reconciliation successfully completed
>>> 2022-04-29 09:41:15,684 o.a.f.k.o.c.FlinkDeploymentController [INFO
>>> ][namespace/flink-deployment-name] Starting reconciliation
>>> 2022-04-29 09:41:15,686 o.a.f.k.o.o.JobObserver[INFO
>>> ][namespace/flink-deployment-name] Observing JobManager deployment.
>>> Previous status: DEPLOYING
>>> 2022-04-29 09:41:15,792 o.a.f.k.o.o.JobObserver[INFO
>>> ][namespace/flink-deployment-name] JobManager is being deployed
>>> 2022-04-29 09:41:15,792 o.a.f.k.o.c.FlinkDeploymentController [INFO
>>> ][namespace/flink-deployment-name] Reconciliation successfully completed
>>> 2022-04-29 09:41:20,795 o.a.f.k.o.c.FlinkDeploymentController [INFO
>>> ][namespace/flink-deployment-name] Starting reconciliation
>>> 2022-04-29 09:41:20,797 o.a.f.k.o.o.JobObserver[INFO
>>> ][namespace/flink-deployment-name] Observing JobManager deployment.
>>> Previous status: DEPLOYING
>>> 2022-04-29 09:41:20,896 o.a.f.k.o.o.JobObserver[INFO
>>> ][namespace/flink-deployment-name] JobManager is being deployed
>>> 2022-04-29 09:41:20,897 o.a.f.k.o.c.FlinkDeploymentController [INFO
>>> ][namespace/flink-deployment-name] Reconciliation successfully completed
>>> 2022-04-29 09:41:25,899 o.a.f.k.o.c.FlinkDeploymentController [INFO
>>> ][namespace/flink-deployment-name] Starting reconciliation
>>> 2022-04-29 09:41:25,901 o.a.f.k.o.o.JobObserver[INFO
>>> ][namespace/flink-deployment-name] Observing JobManager deployment.
>>> Previous status: DEPLOYING
>>> 2022-04-29 09:41:25,997 o.a.f.k.o.o.JobObserver[INFO
>>> ][namespace/flink-deployment-name] JobManager is being deployed
>>> 2022-04-29 09:41:25,998 o.a.f.k.o.c.FlinkDeploymentController [INFO
>>> ][namespace/flink-deployment-name] 

Re: flink operator sometimes cannot start jobmanager after upgrading

2022-05-02 Thread Őrhidi Mátyás
This can be reproduced simply by deleting the Kubernetes deployment. The
operator cannot recover from this state automatically, but defining a
restartNonce on the deployment should recover the state.
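
As a rough illustration of that workaround, the sketch below builds a JSON merge patch that changes the nonce. It assumes the field lives under spec.restartNonce of the FlinkDeployment resource and that any changed value triggers a restart; the helper name and the resource name in the comment are placeholders.

```python
import json
import time


def restart_nonce_patch(nonce=None):
    """Build a JSON merge patch that sets spec.restartNonce.

    If no nonce is given, the current epoch seconds are used so the value
    is likely to differ from the previous one (an assumption of this sketch).
    """
    if nonce is None:
        nonce = int(time.time())
    return json.dumps({"spec": {"restartNonce": nonce}})


# The resulting string could then be applied with something like:
#   kubectl patch flinkdeployment <name> --type merge -p '<patch>'
print(restart_nonce_patch(2))
```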

Regards,
Matyas

On Mon, May 2, 2022 at 10:00 AM Márton Balassi 
wrote:

> Hi ChangZhuo,
>
> Thanks for reporting this; I think I have just run into this myself too.
> I will try to reproduce it, but I do not fully comprehend it yet. If anyone
> has a way to reproduce it, that is more than welcome. :-)
>
> On Fri, Apr 29, 2022 at 12:16 PM ChangZhuo Chen (陳昌倬) 
> wrote:
>
>> Hi,
>>
>> We found that the Flink operator [0] sometimes cannot start the JobManager
>> after upgrading a FlinkDeployment. We need to recreate the FlinkDeployment
>> to fix the problem. Has anyone else seen this issue?
>>
>> The following is a redacted log from the Flink operator. After the status
>> becomes MISSING, it stays in MISSING status for at least 15 minutes.
>>
>>
>> 2022-04-29 09:41:15,141 o.a.f.c.d.a.c.ApplicationClusterDeployer
>> [INFO ][namespace/flink-deployment-name] Submitting application in
>> 'Application Mode'.
>> 2022-04-29 09:41:15,145 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO
>> ][namespace/flink-deployment-name] The derived from fraction jvm overhead
>> memory (2.400gb (2576980416 bytes)) is greater than its max value
>> 1024.000mb (1073741824 bytes), max value will be used instead
>> 2022-04-29 09:41:15,146 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO
>> ][namespace/flink-deployment-name] The derived from fraction jvm overhead
>> memory (5.200gb (5583457568 bytes)) is greater than its max value
>> 1024.000mb (1073741824 bytes), max value will be used instead
>> 2022-04-29 09:41:15,146 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO
>> ][namespace/flink-deployment-name] The derived from fraction network memory
>> (5.050gb (5422396292 bytes)) is greater than its max value 4.000gb
>> (4294967296 bytes), max value will be used instead
>> 2022-04-29 09:41:15,237 o.a.f.k.u.KubernetesUtils  [INFO
>> ][namespace/flink-deployment-name] Kubernetes deployment requires a fixed
>> port. Configuration high-availability.jobmanager.port will be set to 6123
>> 2022-04-29 09:41:15,508 o.a.f.k.KubernetesClusterDescriptor [WARN
>> ][namespace/flink-deployment-name] Please note that Flink client
>> operations(e.g. cancel, list, stop, savepoint, etc.) won't work from
>> outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type'
>> has been set to ClusterIP.
>> 2022-04-29 09:41:15,508 o.a.f.k.KubernetesClusterDescriptor [INFO
>> ][namespace/flink-deployment-name] Create flink application cluster
>> flink-deployment-name successfully, JobManager Web Interface:
>> http://flink-deployment-name.namespace:8081
>> 2022-04-29 09:41:15,510 o.a.f.k.o.s.FlinkService   [INFO
>> ][namespace/flink-deployment-name] Application cluster successfully deployed
>> 2022-04-29 09:41:15,583 o.a.f.k.o.c.FlinkDeploymentController [INFO
>> ][namespace/flink-deployment-name] Reconciliation successfully completed
>> 2022-04-29 09:41:15,684 o.a.f.k.o.c.FlinkDeploymentController [INFO
>> ][namespace/flink-deployment-name] Starting reconciliation
>> 2022-04-29 09:41:15,686 o.a.f.k.o.o.JobObserver[INFO
>> ][namespace/flink-deployment-name] Observing JobManager deployment.
>> Previous status: DEPLOYING
>> 2022-04-29 09:41:15,792 o.a.f.k.o.o.JobObserver[INFO
>> ][namespace/flink-deployment-name] JobManager is being deployed
>> 2022-04-29 09:41:15,792 o.a.f.k.o.c.FlinkDeploymentController [INFO
>> ][namespace/flink-deployment-name] Reconciliation successfully completed
>> 2022-04-29 09:41:20,795 o.a.f.k.o.c.FlinkDeploymentController [INFO
>> ][namespace/flink-deployment-name] Starting reconciliation
>> 2022-04-29 09:41:20,797 o.a.f.k.o.o.JobObserver[INFO
>> ][namespace/flink-deployment-name] Observing JobManager deployment.
>> Previous status: DEPLOYING
>> 2022-04-29 09:41:20,896 o.a.f.k.o.o.JobObserver[INFO
>> ][namespace/flink-deployment-name] JobManager is being deployed
>> 2022-04-29 09:41:20,897 o.a.f.k.o.c.FlinkDeploymentController [INFO
>> ][namespace/flink-deployment-name] Reconciliation successfully completed
>> 2022-04-29 09:41:25,899 o.a.f.k.o.c.FlinkDeploymentController [INFO
>> ][namespace/flink-deployment-name] Starting reconciliation
>> 2022-04-29 09:41:25,901 o.a.f.k.o.o.JobObserver[INFO
>> ][namespace/flink-deployment-name] Observing JobManager deployment.
>> Previous status: DEPLOYING
>> 2022-04-29 09:41:25,997 o.a.f.k.o.o.JobObserver[INFO
>> ][namespace/flink-deployment-name] JobManager is being deployed
>> 2022-04-29 09:41:25,998 o.a.f.k.o.c.FlinkDeploymentController [INFO
>> ][namespace/flink-deployment-name] Reconciliation successfully completed
>> 2022-04-29 09:41:29,518 o.a.f.k.o.c.FlinkDeploymentController [INFO
>> ][namespace/flink-deployment-name] Starting reconciliation
>> 2022-04-29 09:41:29,520 o.a.f.k.o.o.JobObserver[INFO
>> 

Re: flink operator sometimes cannot start jobmanager after upgrading

2022-05-02 Thread Márton Balassi
Hi ChangZhuo,

Thanks for reporting this; I think I have just run into this myself too.
I will try to reproduce it, but I do not fully comprehend it yet. If anyone
has a way to reproduce it, that is more than welcome. :-)

On Fri, Apr 29, 2022 at 12:16 PM ChangZhuo Chen (陳昌倬) 
wrote:

> Hi,
>
> We found that the Flink operator [0] sometimes cannot start the JobManager
> after upgrading a FlinkDeployment. We need to recreate the FlinkDeployment
> to fix the problem. Has anyone else seen this issue?
>
> The following is a redacted log from the Flink operator. After the status
> becomes MISSING, it stays in MISSING status for at least 15 minutes.
>
>
> 2022-04-29 09:41:15,141 o.a.f.c.d.a.c.ApplicationClusterDeployer [INFO
> ][namespace/flink-deployment-name] Submitting application in 'Application
> Mode'.
> 2022-04-29 09:41:15,145 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO
> ][namespace/flink-deployment-name] The derived from fraction jvm overhead
> memory (2.400gb (2576980416 bytes)) is greater than its max value
> 1024.000mb (1073741824 bytes), max value will be used instead
> 2022-04-29 09:41:15,146 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO
> ][namespace/flink-deployment-name] The derived from fraction jvm overhead
> memory (5.200gb (5583457568 bytes)) is greater than its max value
> 1024.000mb (1073741824 bytes), max value will be used instead
> 2022-04-29 09:41:15,146 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO
> ][namespace/flink-deployment-name] The derived from fraction network memory
> (5.050gb (5422396292 bytes)) is greater than its max value 4.000gb
> (4294967296 bytes), max value will be used instead
> 2022-04-29 09:41:15,237 o.a.f.k.u.KubernetesUtils  [INFO
> ][namespace/flink-deployment-name] Kubernetes deployment requires a fixed
> port. Configuration high-availability.jobmanager.port will be set to 6123
> 2022-04-29 09:41:15,508 o.a.f.k.KubernetesClusterDescriptor [WARN
> ][namespace/flink-deployment-name] Please note that Flink client
> operations(e.g. cancel, list, stop, savepoint, etc.) won't work from
> outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type'
> has been set to ClusterIP.
> 2022-04-29 09:41:15,508 o.a.f.k.KubernetesClusterDescriptor [INFO
> ][namespace/flink-deployment-name] Create flink application cluster
> flink-deployment-name successfully, JobManager Web Interface:
> http://flink-deployment-name.namespace:8081
> 2022-04-29 09:41:15,510 o.a.f.k.o.s.FlinkService   [INFO
> ][namespace/flink-deployment-name] Application cluster successfully deployed
> 2022-04-29 09:41:15,583 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][namespace/flink-deployment-name] Reconciliation successfully completed
> 2022-04-29 09:41:15,684 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][namespace/flink-deployment-name] Starting reconciliation
> 2022-04-29 09:41:15,686 o.a.f.k.o.o.JobObserver[INFO
> ][namespace/flink-deployment-name] Observing JobManager deployment.
> Previous status: DEPLOYING
> 2022-04-29 09:41:15,792 o.a.f.k.o.o.JobObserver[INFO
> ][namespace/flink-deployment-name] JobManager is being deployed
> 2022-04-29 09:41:15,792 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][namespace/flink-deployment-name] Reconciliation successfully completed
> 2022-04-29 09:41:20,795 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][namespace/flink-deployment-name] Starting reconciliation
> 2022-04-29 09:41:20,797 o.a.f.k.o.o.JobObserver[INFO
> ][namespace/flink-deployment-name] Observing JobManager deployment.
> Previous status: DEPLOYING
> 2022-04-29 09:41:20,896 o.a.f.k.o.o.JobObserver[INFO
> ][namespace/flink-deployment-name] JobManager is being deployed
> 2022-04-29 09:41:20,897 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][namespace/flink-deployment-name] Reconciliation successfully completed
> 2022-04-29 09:41:25,899 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][namespace/flink-deployment-name] Starting reconciliation
> 2022-04-29 09:41:25,901 o.a.f.k.o.o.JobObserver[INFO
> ][namespace/flink-deployment-name] Observing JobManager deployment.
> Previous status: DEPLOYING
> 2022-04-29 09:41:25,997 o.a.f.k.o.o.JobObserver[INFO
> ][namespace/flink-deployment-name] JobManager is being deployed
> 2022-04-29 09:41:25,998 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][namespace/flink-deployment-name] Reconciliation successfully completed
> 2022-04-29 09:41:29,518 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][namespace/flink-deployment-name] Starting reconciliation
> 2022-04-29 09:41:29,520 o.a.f.k.o.o.JobObserver[INFO
> ][namespace/flink-deployment-name] Observing JobManager deployment.
> Previous status: DEPLOYING
> 2022-04-29 09:41:30,631 o.a.f.k.o.o.JobObserver[INFO
> ][namespace/flink-deployment-name] JobManager is being deployed
> 2022-04-29 09:41:30,631 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][namespace/flink-deployment-name] Reconciliation 

Re: How to debug Metaspace exception?

2022-05-02 Thread Chesnay Schepler

And you should make sure that it is set for both processes!

On 02/05/2022 08:43, Chesnay Schepler wrote:
The setting itself isn't taskmanager specific; it applies to both the 
job- and taskmanager process.


On 02/05/2022 05:29, John Smith wrote:

Also just to be sure this is a Task Manager setting right?

On Thu, Apr 28, 2022 at 11:13 AM John Smith  
wrote:


I assume you will take action on your side to track and fix the
doc? :)

On Thu, Apr 28, 2022 at 11:12 AM John Smith
 wrote:

Ok so to summarize...

- Build my job jar and have the JDBC driver as a compile only
dependency and copy the JDBC driver to flink lib folder.

Or

- Build my job jar and include JDBC driver in the shadow,
plus copy the JDBC driver in the flink lib folder, plus  make
an entry in config for
|classloader.parent-first-patterns.additional|




On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler
 wrote:

I think what I meant was "either add it to /lib, or [if
it is already in /lib but also bundled in the jar] add it
to the parent-first patterns."

On 28/04/2022 15:56, Chesnay Schepler wrote:

Pretty sure, even though I seemingly documented it
incorrectly :)

On 28/04/2022 15:49, John Smith wrote:

You sure?

* /JDBC/: JDBC drivers leak references outside the
user code classloader. To ensure that these classes
are only loaded once you should either add the
driver jars to Flink’s |lib/| folder, or add the
driver classes to the list of parent-first loaded
classes via
|classloader.parent-first-patterns.additional|.

It says either or


On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler
 wrote:

You're misinterpreting the docs.

The parent/child-first classloading controls where
Flink looks for a class /first/, specifically
whether we first load from /lib or the user-jar.
It does not allow you to load something from the
user-jar in the parent classloader. That's just not
how it works.

It must be in /lib.

On 27/04/2022 04:59, John Smith wrote:

Hi Chesnay as per the docs...

https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/

You can either put the jars in task manager lib
folder or use
|classloader.parent-first-patterns.additional|



I prefer the latter like this: the
dependency stays with the user-jar and not on the
task manager.

On Tue, Apr 26, 2022 at 9:52 PM John Smith
 wrote:

Ok so I should put the Apache ignite and my
Microsoft drivers in the lib folders of my
task managers?

And then in my job jar only include them as
compile time dependencies?


On Tue, Apr 26, 2022 at 10:42 AM Chesnay
Schepler  wrote:

JDBC drivers are well-known for leaking
classloaders unfortunately.

You have correctly identified your
alternatives.

You must put the jdbc driver into /lib
instead. Setting only the parent-first
pattern shouldn't affect anything.
That is only relevant if something is in
both /lib and the user-jar, telling
Flink to prioritize what is in lib.



On 26/04/2022 15:35, John Smith wrote:

So I
put classloader.parent-first-patterns.additional:
"org.apache.ignite." in the task config
and so far I don't think I'm getting
"java.lang.OutOfMemoryError: Metaspace"
any more.

Or it's too early to tell.

Though now, the task managers are
shutting down due to some other failures.

So maybe because tasks were 

Re: How to debug Metaspace exception?

2022-05-02 Thread Chesnay Schepler
The setting itself isn't taskmanager specific; it applies to both the 
job- and taskmanager process.


On 02/05/2022 05:29, John Smith wrote:

Also just to be sure this is a Task Manager setting right?

On Thu, Apr 28, 2022 at 11:13 AM John Smith  
wrote:


I assume you will take action on your side to track and fix the
doc? :)

On Thu, Apr 28, 2022 at 11:12 AM John Smith
 wrote:

Ok so to summarize...

- Build my job jar and have the JDBC driver as a compile only
dependency and copy the JDBC driver to flink lib folder.

Or

- Build my job jar and include JDBC driver in the shadow, plus
copy the JDBC driver in the flink lib folder, plus  make an
entry in config for
|classloader.parent-first-patterns.additional|




On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler
 wrote:

I think what I meant was "either add it to /lib, or [if it
is already in /lib but also bundled in the jar] add it to
the parent-first patterns."

On 28/04/2022 15:56, Chesnay Schepler wrote:

Pretty sure, even though I seemingly documented it
incorrectly :)

On 28/04/2022 15:49, John Smith wrote:

You sure?

* /JDBC/: JDBC drivers leak references outside the
user code classloader. To ensure that these classes
are only loaded once you should either add the
driver jars to Flink’s |lib/| folder, or add the
driver classes to the list of parent-first loaded
classes via
|classloader.parent-first-patterns.additional|.

It says either or


On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler
 wrote:

You're misinterpreting the docs.

The parent/child-first classloading controls where
Flink looks for a class /first/, specifically
whether we first load from /lib or the user-jar.
It does not allow you to load something from the
user-jar in the parent classloader. That's just not
how it works.

It must be in /lib.

On 27/04/2022 04:59, John Smith wrote:

Hi Chesnay as per the docs...

https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/

You can either put the jars in task manager lib
folder or use
|classloader.parent-first-patterns.additional|



I prefer the latter like this: the dependency stays
with the user-jar and not on the task manager.

On Tue, Apr 26, 2022 at 9:52 PM John Smith
 wrote:

Ok so I should put the Apache ignite and my
Microsoft drivers in the lib folders of my task
managers?

And then in my job jar only include them as
compile time dependencies?


On Tue, Apr 26, 2022 at 10:42 AM Chesnay
Schepler  wrote:

JDBC drivers are well-known for leaking
classloaders unfortunately.

You have correctly identified your
alternatives.

You must put the jdbc driver into /lib
instead. Setting only the parent-first
pattern shouldn't affect anything.
That is only relevant if something is in
both /lib and the user-jar, telling
Flink to prioritize what is in lib.



On 26/04/2022 15:35, John Smith wrote:

So I
put classloader.parent-first-patterns.additional:
"org.apache.ignite." in the task config
and so far I don't think I'm getting
"java.lang.OutOfMemoryError: Metaspace"
any more.

Or it's too early to tell.

Though now, the task managers are shutting
down due to some other failures.

So maybe because tasks were failing and
reloading often the task manager was
running out of Metaspace. But

About job execution

2022-05-02 Thread Ww J
Hello,

I read some articles online about how Flink executes jobs and have some
questions. When the dispatcher receives a job, it starts a
JobManager. After the job finishes, is that JobManager shut down? And
is the TaskManager also shut down after the job finishes,
or is it shared among all the tasks?

Thanks.

Jack