Re: StreamingFileSink question

2022-08-31 Thread David Anderson
If I remember correctly, there's a fix for this in Flink 1.14 (the feature
is disabled by default in 1.14, and enabled by default in 1.15). I believe
that execution.checkpointing.checkpoints-after-tasks-finish.enabled [1]
takes care of this.

With Flink 1.13 I believe you'll have to handle this yourself somehow.

Regards,
David

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#execution-checkpointing-checkpoints-after-tasks-finish-enabled
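A minimal flink-conf.yaml sketch of the configuration discussed above (option names are from the linked docs; the checkpoint interval value is only illustrative):

```yaml
# Enable periodic checkpointing (interval is an example value)
execution.checkpointing.interval: 30s

# On Flink 1.14 this must be enabled explicitly; on 1.15 it is the default.
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true
```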

On Wed, Aug 31, 2022 at 6:26 AM David Clutter wrote:

> I am using Flink 1.13.1 on AWS EMR 6.4.  I have an existing application
> using DataStream API that I would like to modify to write output to S3.  I
> am testing the StreamingFileSink with a bounded input.  I have enabled
> checkpointing.
>
> A couple questions:
> 1) When the program finishes, all the files remain .inprogress.  Is that
> "Important Note 2" in the documentation?
> Is there a solution to this other than renaming the files myself?  Renaming
> the files in S3 could be costly I think.
>
> 2) If I use a deprecated method such as DataStream.writeAsText() is that
> guaranteed to write *all* the records from the stream, as long as the job
> does not fail?  I understand checkpointing will not be effective here.
>
> Thanks,
> David
>


Re: Why this example does not save anything to file?

2022-08-31 Thread David Anderson
With the parallelism set to 2, you will get 2 files. More than 2, actually
-- e.g., with hourly buckets, you'll get 2 files per hour.

If the i/o bandwidth of a single instance is sufficient to handle your
expected throughput, then you can set the parallelism of the sink (or the
entire pipeline) to one. If it's not, then you'll be glad to have multiple
instances each handling a slice of the job and writing to independent
filesystems.
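For the Table/SQL filesystem sink, a single sink instance (and thus one file per bucket) can be requested per table via the 'sink.parallelism' option mentioned in the quoted message; a sketch based on the CREATE TABLE statement from this thread:

```sql
CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE)
WITH (
  'connector' = 'filesystem',
  'path' = 'file:///C:/temp/test4.txt',
  'format' = 'csv',
  'csv.field-delimiter' = ';',
  -- force a single sink instance, and therefore a single file per bucket
  'sink.parallelism' = '1'
);
```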

David

On Wed, Aug 31, 2022 at 8:44 AM  wrote:

>
> Doesn't it depend on 'sink.parallelism'?
> If I set 'sink.parallelism' = '2' I get two files; with 'sink.parallelism' =
> '1', just one file...
>
> But I think doing it that way reduces the number of tasks, so it will have
> a negative impact on performance :-(
>
>
>
> Sent: Tuesday, August 30, 2022 at 3:22 PM
> From: "Martijn Visser" 
> To: pod...@gmx.com, user@flink.apache.org
> Subject: Re: Why this example does not save anything to file?
>
> Hi Mike,
>
> I think that's caused by checkpointing not being enabled. If you enable
> it, the issue should be resolved.
>
> Best regards,
>
> Martijn
>
> On Wed, Aug 3, 2022 at 9:01 PM pod...@gmx.com wrote:
> Thank you very much, Martijn, for dedicating your time to helping me!
> I'm a noob in this subject - I took that example from somewhere on the
> Internet. I think the problem for guys like me is that Flink syntax changes
> quite significantly from version to version. So here it's not
> 'connector.type' but 'connector', etc.
>
> An additional problem was that there was no error saying something was
> wrong, and on top of that the 'select from' in the next lines displayed
> results from the table...
>
> Anyway, I was expecting a single file 'test5.txt' as a result, but got a
> file for each row.
>
> part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-0-file-0
> part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-1-file-0
> ...
>
>
> Can it be just one file?
> Best,
>
> Mike
>
>
> Sent: Wednesday, August 03, 2022 at 4:03 PM
> From: "Martijn Visser" martijnvis...@apache.org
> To: pod...@gmx.com
> Cc: user@flink.apache.org
> Subject: Re: Why this example does not save anything to file?
>
> I've verified your code locally and it doesn't work indeed, at least not
> with the latest Flink version (I've tested it with Flink 1.15). There are a
> couple of reasons for that:
>
> 1. You've mentioned in this thread that there's no problem with the
> 'csv.field-delimiter'. There actually is, because the default is a , and
> not a ; as documented at
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/#format-options
> 2. When adding this option, Flink wouldn't compile because the SQL
> statement uses options that are different than documented at
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/
> You have connector.type, connector.path and format.type listed. It should
> be connector, path and format.
>
> In the end, I used the following code and the expected result was properly
> written:
>
> tEnv.executeSql(
> "CREATE TABLE Table1 (column_name1 STRING, column_name2
> DOUBLE) WITH ('connector' = 'filesystem', 'path' =
> 'file:///C:/temp/test4.txt', 'format' = 'csv', 'csv.field-delimiter' =
> ';')");
>
> Best regards,
>
> Martijn
>
> On Tue, Aug 2, 2022 at 00:42, pod...@gmx.com wrote:
>
> No, I do not have it
>
>
>
> Sent: Monday, August 01, 2022 at 4:43 PM
> From: "Martijn Visser" martijnvis...@apache.org
> To: pod...@gmx.com
> Cc: user@flink.apache.org
> Subject: Re: Why this example does not save anything to file?
>
> That's Flink's fault-tolerance mechanism, see
> 

Re: Why this example does not save anything to file?

2022-08-31 Thread podunk


Doesn't it depend on 'sink.parallelism'?
If I set 'sink.parallelism' = '2' I get two files; with 'sink.parallelism' =
'1', just one file...

But I think doing it that way reduces the number of tasks, so it will have
a negative impact on performance :-(
 
 

Sent: Tuesday, August 30, 2022 at 3:22 PM
From: "Martijn Visser" 
To: pod...@gmx.com, user@flink.apache.org
Subject: Re: Why this example does not save anything to file?

Hi Mike,
 
I think that's caused by checkpointing not being enabled. If you enable
it, the issue should be resolved.
 
Best regards,
 
Martijn 

On Wed, Aug 3, 2022 at 9:01 PM pod...@gmx.com wrote:
Thank you very much, Martijn, for dedicating your time to helping me!
I'm a noob in this subject - I took that example from somewhere on the
Internet. I think the problem for guys like me is that Flink syntax changes
quite significantly from version to version. So here it's not
'connector.type' but 'connector', etc.

An additional problem was that there was no error saying something was
wrong, and on top of that the 'select from' in the next lines displayed
results from the table...

Anyway, I was expecting a single file 'test5.txt' as a result, but got a
file for each row.
 
part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-0-file-0
part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-1-file-0
...
 

Can it be just one file?
Best,
 
Mike
 

Sent: Wednesday, August 03, 2022 at 4:03 PM
From: "Martijn Visser" martijnvis...@apache.org
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Why this example does not save anything to file?

I've verified your code locally and it doesn't work indeed, at least not with 
the latest Flink version (I've tested it with Flink 1.15). There are a couple 
of reasons for that:
 
1. You've mentioned in this thread that there's no problem with the
'csv.field-delimiter'. There actually is, because the default is a , and
not a ; as documented at
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/#format-options
2. When adding this option, Flink wouldn't compile because the SQL statement
uses options that are different than documented at
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/
You have connector.type, connector.path and format.type listed. It should be
connector, path and format.
 
In the end, I used the following code and the expected result was properly
written:

        tEnv.executeSql(
                "CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) 
WITH ('connector' = 'filesystem', 'path' = 'file:///C:/temp/test4.txt', 
'format' = 'csv', 'csv.field-delimiter' = ';')");
 
Best regards,
 
Martijn 

On Tue, Aug 2, 2022 at 00:42, pod...@gmx.com wrote:

No, I do not have it
 
 

Sent: Monday, August 01, 2022 at 4:43 PM
From: "Martijn Visser" martijnvis...@apache.org
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Why this example does not save anything to file?

That's Flink's fault-tolerance mechanism, see
https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/checkpointing/
 

On Mon, Aug 1, 2022 at 16:37, pod...@gmx.com wrote:

What's that?
 
 

Sent: Monday, August 01, 2022 at 2:49 PM
From: "Martijn Visser" martijnvis...@apache.org
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Why this example does not save anything to file?

Do you have checkpointing enabled? 
 

On Sat, Jul 30, 2022 at 17:31, pod...@gmx.com wrote:

Thanks David, but there's no problem with that (probably ";" is the default
separator).
I can read the file and insert into "Table1" (I said that in

Re: Re: get NoSuchMethodError when using flink flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar

2022-08-31 Thread Xuyang
Hi, Liu.
It seems that you may be using other jars of your own, and they contain
commons-lang3 in a different version, which may cause a version conflict.
My suggestion is that you shade this dependency in your own jars or in
'flink-table-planner'; the latter may require you to compile Flink manually.
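Shading your own jars here means relocating the conflicting package, e.g. with the maven-shade-plugin. A hedged pom.xml sketch (the relocated package prefix "myshaded" is an arbitrary example to adapt):

```xml
<!-- In your job's pom.xml: relocate commons-lang3 so it cannot clash
     with the copy bundled in the Hive connector jar. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>org.apache.commons.lang3</pattern>
            <!-- "myshaded" prefix is only an example -->
            <shadedPattern>myshaded.org.apache.commons.lang3</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```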




--

Best!
Xuyang




At 2022-08-31 20:28:43, "yuxia" wrote:

How do you use `flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar`? Do you use the SQL
client? Do you put it in FLINK_HOME/lib?
If it's for the SQL client, I think you can remove the jar from FLINK_HOME/lib,
but add it in the Flink SQL client using `add jar
'flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar'`, and set
'org.apache.commons.' to parent-first [1].

But I think the better way is to relocate the class.
[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-default


Best regards,
Yuxia


From: "Liting Liu (litiliu)"
To: "User"
Sent: Wednesday, August 31, 2022, 5:14:35 PM
Subject: get NoSuchMethodError when using flink
flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar



Hi, I got a NoSuchMethodError when using
flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar.
Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue.
at 
org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoSuchMethodError: 
org.apache.commons.lang3.StringUtils.join([IC)Ljava/lang/String;
at 
org.apache.flink.table.planner.plan.utils.RankProcessStrategy$UpdateFastStrategy.toString(RankProcessStrategy.java:129)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at 
org.apache.flink.table.planner.plan.utils.RelDescriptionWriterImpl.explain(RelDescriptionWriterImpl.java:67)
at 
org.apache.flink.table.planner.plan.utils.RelDescriptionWriterImpl.done(RelDescriptionWriterImpl.java:96)
at 
org.apache.calcite.rel.AbstractRelNode.explain(AbstractRelNode.java:246)
at 
org.apache.flink.table.planner.plan.nodes.FlinkRelNode.getRelDetailedDescription(FlinkRelNode.scala:50)
at 
org.apache.flink.table.planner.plan.nodes.FlinkRelNode.getRelDetailedDescription$(FlinkRelNode.scala:46)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank.getRelDetailedDescription(StreamPhysicalRank.scala:41)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:701)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitRankStrategies$1(FlinkChangelogModeInferenceProgram.scala:738)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitRankStrategies$1$adapted(FlinkChangelogModeInferenceProgram.scala:730)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitRankStrategies(FlinkChangelogModeInferenceProgram.scala:730)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:489)
   
It seems there is an embedded StringUtils in
flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar, which conflicts with another
class.


What should I do?
Do I have to manually exclude StringUtils.class from
flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar?
   




StreamingFileSink question

2022-08-31 Thread David Clutter
I am using Flink 1.13.1 on AWS EMR 6.4.  I have an existing application
using DataStream API that I would like to modify to write output to S3.  I
am testing the StreamingFileSink with a bounded input.  I have enabled
checkpointing.

A couple questions:
1) When the program finishes, all the files remain .inprogress.  Is that
"Important Note 2" in the documentation?
Is there a solution to this other than renaming the files myself?  Renaming
the files in S3 could be costly I think.

2) If I use a deprecated method such as DataStream.writeAsText() is that
guaranteed to write *all* the records from the stream, as long as the job
does not fail?  I understand checkpointing will not be effective here.

Thanks,
David


Re: Question Regarding State Migrations in Ververica Platform

2022-08-31 Thread Rion Williams
+dev

> On Aug 30, 2022, at 11:20 AM, Rion Williams  wrote:
> 
> 
> Hi all,
> 
> I wasn't sure if this would be the best audience, if not, please advise if 
> you know of a better place to ask it. I figured that at least some folks here 
> either work for Ververica or might have used their platform.
> 
> tl;dr: I'm trying to migrate an existing stateful Flink job to run in 
> Ververica Platform (Community) and I'm noticing that it doesn't seem that all 
> of the state is being properly handed off (only _metadata).
> 
> I'm currently in the process of migrating an existing Flink job that is 
> running in Kubernetes on its own to run within the Ververica platform. The 
> issue here is that the job itself is stateful, so I want to ensure I can 
> migrate over that state so when the new job kicks off, it's a fairly seamless 
> transition.
> 
> Basically, what I've done up to this point is create a script as part of the 
> Ververica platform deployment that will:
> - Check for the existence of any of the known jobs that have been migrated.
> - If one is found, it will stop the job, taking a full savepoint, and store
>   the savepoint path within a configmap for that job used solely for
>   migration purposes.
> - If one is not found, it will assume the job has been migrated.
> - Create a Deployment for each of the new jobs, pointing to the appropriate
>   configuration, jars, etc.
> - Check for the presence of one of the previous migration configmaps and
>   issue a request to create a savepoint for that deployment.
> This involves using the Ververica REST API to grab the appropriate deployment 
> information and issuing a request to the Savepoints endpoint of the same REST 
> API to "add" the savepoint.
> I've confirmed the above "works" and indeed stops any legacy jobs, creates 
> the resources (i.e. configmaps) used for the migration, starts up the new job 
> within Ververica and I can see evidence within the UI that a savepoint was 
> "COPIED" for that deployment.
> 
> However, when comparing (in GCS) the previous savepoint for the old job and 
> the one now managed by Ververica for the job, I notice that the new one only 
> contains a single _metadata file:
> 
> 
> 
> Whereas the previous contained a metadata file and another related data file:
> 
> 
> This leads me to believe that the new job might not know about any items 
> previously stored in state, which could be problematic.
> 
> When reviewing over the documentation for "manually adding a savepoint" for 
> Ververica Platform 2.6, I noticed that the payload to the Savepoints endpoint 
> looked like the following, which was what I used:
> metadata:
>   deploymentId: ${deploymentId}
>   annotations:
>     com.dataartisans.appmanager.controller.deployment.spec.version: ${deploymentSpecVersion}
>   type: ${type} (used FULL in my case)
> spec:
>   savepointLocation: ${savepointLocation}
>   flinkSavepointId: ----
> status:
>   state: COMPLETED
> 
> The empty UUID was a bit concerning and I was curious if that might be the 
> reason my additional data files didn't come across from the savepoint as well 
> (I noticed in 2.7 this is an optional argument in the payload). I don't see 
> much more for any additional configuration that would otherwise specify to 
> pull everything including _metadata.
> 
> Any ideas or guidance would be helpful. 
> 
> Rion
> 
> 
> 
> 


Why isn't the taskmanager in Flink on k8s a ReplicaSet?

2022-08-31 Thread Jiacheng Jiang
A question for everyone:

In Flink on k8s, why is each taskmanager a standalone pod, instead of a
ReplicaSet whose replica count controls the number of taskmanagers?





Sent from Mail for Windows



Re: get NoSuchMethodError when using flink flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar

2022-08-31 Thread yuxia
How do you use `flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar`? Do you use the SQL
client? Do you put it in FLINK_HOME/lib?
If it's for the SQL client, I think you can remove the jar from FLINK_HOME/lib, but
add it in the Flink SQL client using `add jar
'flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar'`, and set
'org.apache.commons.' to parent-first [1].

But I think the better way is to relocate the class. 
[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-default
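The parent-first setting mentioned above can be sketched as a flink-conf.yaml fragment (the option name is taken from the linked docs; this appends to the default parent-first list rather than replacing it):

```yaml
# Load org.apache.commons. classes from the parent (Flink) classloader first,
# so the copy embedded in the connector jar is not picked up.
classloader.parent-first-patterns.additional: org.apache.commons.
```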
 

Best regards, 
Yuxia 


From: "Liting Liu (litiliu)"
To: "User"
Sent: Wednesday, August 31, 2022, 5:14:35 PM
Subject: get NoSuchMethodError when using flink
flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar

Hi, I got a NoSuchMethodError when using
flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar.
Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue. 
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201) 
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) 
Caused by: java.lang.NoSuchMethodError: 
org.apache.commons.lang3.StringUtils.join([IC)Ljava/lang/String; 
at 
org.apache.flink.table.planner.plan.utils.RankProcessStrategy$UpdateFastStrategy.toString(RankProcessStrategy.java:129)
 
at java.lang.String.valueOf(String.java:2994) 
at java.lang.StringBuilder.append(StringBuilder.java:136) 
at 
org.apache.flink.table.planner.plan.utils.RelDescriptionWriterImpl.explain(RelDescriptionWriterImpl.java:67)
 
at 
org.apache.flink.table.planner.plan.utils.RelDescriptionWriterImpl.done(RelDescriptionWriterImpl.java:96)
 
at org.apache.calcite.rel.AbstractRelNode.explain(AbstractRelNode.java:246) 
at 
org.apache.flink.table.planner.plan.nodes.FlinkRelNode.getRelDetailedDescription(FlinkRelNode.scala:50)
 
at 
org.apache.flink.table.planner.plan.nodes.FlinkRelNode.getRelDetailedDescription$(FlinkRelNode.scala:46)
 
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank.getRelDetailedDescription(StreamPhysicalRank.scala:41)
 
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:701)
 
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitRankStrategies$1(FlinkChangelogModeInferenceProgram.scala:738)
 
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitRankStrategies$1$adapted(FlinkChangelogModeInferenceProgram.scala:730)
 
at scala.collection.Iterator.foreach(Iterator.scala:937) 
at scala.collection.Iterator.foreach$(Iterator.scala:937) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) 
at scala.collection.IterableLike.foreach(IterableLike.scala:70) 
at scala.collection.IterableLike.foreach$(IterableLike.scala:69) 
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitRankStrategies(FlinkChangelogModeInferenceProgram.scala:730)
 
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:489)
 

It seems there is an embedded StringUtils in
flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar, which conflicts with another
class.

What should I do?
Do I have to manually exclude StringUtils.class from
flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar?




Re: [flink native k8s] With HA configured, the taskmanager pods keep restarting

2022-08-31 Thread Wu,Zhiheng
I can't find the TM logs, because the pod dies before the TM even starts up.
I'll check whether this is the cause; I indeed haven't added the
-Dkubernetes.taskmanager.service-account parameter.
Is the -Dkubernetes.taskmanager.service-account parameter added when starting
the session cluster with ./bin/kubernetes-session.sh?

On 2022/8/31 at 4:10 PM, "Yang Wang" wrote:

My guess is that you didn't set a service account for the TM, so the TM has no
permission to fetch the leader from the K8s ConfigMap and thus cannot register
with the RM and JM.

-Dkubernetes.taskmanager.service-account=wuzhiheng \


Best,
Yang

Xuyang wrote on Tue, Aug 30, 2022 at 23:22:

> Hi, can you paste the TM logs? From the WARN logs it looks like the TM never
> comes up.
> At 2022-08-30 03:45:43, "Wu,Zhiheng" wrote:
> >[Problem description]
> >After enabling the HA configuration, the taskmanager pods are stuck in a
> >create-stop-create loop and the job cannot start.
> >
> >1. Job configuration and startup process
> >
> >a) Modify the conf/flink.yaml configuration file to add the HA configuration
> >kubernetes.cluster-id: realtime-monitor
> >high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> >high-availability.storageDir:
> file:///opt/flink/checkpoint/recovery/monitor//
> (this is an NFS path, mounted into the pod via a PVC)
> >
> >b) First create a stateless deployment with the following command to set up a session cluster
> >
> >./bin/kubernetes-session.sh \
> >
> 
>-Dkubernetes.secrets=cdn-res-bd-keystore:/opt/flink/kafka/res/keystore/bd,cdn-res-bd-truststore:/opt/flink/kafka/res/truststore/bd,cdn-res-bj-keystore://opt/flink/kafka/res/keystore/bj,cdn-res-bj-truststore:/opt/flink/kafka/res/truststore/bj
> \
> >
> >-Dkubernetes.pod-template-file=./conf/pod-template.yaml \
> >
> >-Dkubernetes.cluster-id=realtime-monitor \
> >
> >-Dkubernetes.jobmanager.service-account=wuzhiheng \
> >
> >-Dkubernetes.namespace=monitor \
> >
> >-Dtaskmanager.numberOfTaskSlots=6 \
> >
> >-Dtaskmanager.memory.process.size=8192m \
> >
> >-Djobmanager.memory.process.size=2048m
> >
> >c) Finally, submit a jar job through the web UI; the jobmanager produces the following logs
> >
> >2022-08-29 23:49:04,150 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod
> realtime-monitor-taskmanager-1-13 is created.
> >
> >2022-08-29 23:49:04,152 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod
> realtime-monitor-taskmanager-1-12 is created.
> >
> >2022-08-29 23:49:04,161 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received
> new TaskManager pod: realtime-monitor-taskmanager-1-12
> >
> >2022-08-29 23:49:04,162 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requested worker realtime-monitor-taskmanager-1-12 with resource spec
> WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes),
> taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes),
> managedMemSize=0 bytes, numSlots=6}.
> >
> >2022-08-29 23:49:04,162 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received
> new TaskManager pod: realtime-monitor-taskmanager-1-13
> >
> >2022-08-29 23:49:04,162 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requested worker realtime-monitor-taskmanager-1-13 with resource spec
> WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes),
> taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes),
> managedMemSize=0 bytes, numSlots=6}.
> >
> >2022-08-29 23:49:07,176 WARN
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Reaching max start worker failure rate: 12 events detected in the recent
> interval, reaching the threshold 10.00.
> >
> >2022-08-29 23:49:07,176 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Will not retry creating worker in 3000 ms.
> >
> >2022-08-29 23:49:07,176 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker realtime-monitor-taskmanager-1-12 with resource spec
> WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes),
> taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes),
> managedMemSize=0 bytes, numSlots=6} was requested in current attempt and
> has not registered. Current pending count after removing: 1.
> >
> >2022-08-29 23:49:07,176 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker realtime-monitor-taskmanager-1-12 is terminated. Diagnostics: Pod
> terminated, container termination statuses:
> [flink-main-container(exitCode=1, reason=Error, message=null)], pod 
status:
> Failed(reason=null, message=null)
> >
> >2022-08-29 23:49:07,176 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requesting new worker with resource spec WorkerResourceSpec {cpuCores=6.0,
> taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes,
> numSlots=6}, current pending count: 2.
> >
> 

get NoSuchMethodError when using flink flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar

2022-08-31 Thread Liting Liu (litiliu)
Hi, I got a NoSuchMethodError when using
flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar.
Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue.
at 
org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoSuchMethodError: 
org.apache.commons.lang3.StringUtils.join([IC)Ljava/lang/String;
at 
org.apache.flink.table.planner.plan.utils.RankProcessStrategy$UpdateFastStrategy.toString(RankProcessStrategy.java:129)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at 
org.apache.flink.table.planner.plan.utils.RelDescriptionWriterImpl.explain(RelDescriptionWriterImpl.java:67)
at 
org.apache.flink.table.planner.plan.utils.RelDescriptionWriterImpl.done(RelDescriptionWriterImpl.java:96)
at 
org.apache.calcite.rel.AbstractRelNode.explain(AbstractRelNode.java:246)
at 
org.apache.flink.table.planner.plan.nodes.FlinkRelNode.getRelDetailedDescription(FlinkRelNode.scala:50)
at 
org.apache.flink.table.planner.plan.nodes.FlinkRelNode.getRelDetailedDescription$(FlinkRelNode.scala:46)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank.getRelDetailedDescription(StreamPhysicalRank.scala:41)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:701)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitRankStrategies$1(FlinkChangelogModeInferenceProgram.scala:738)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitRankStrategies$1$adapted(FlinkChangelogModeInferenceProgram.scala:730)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitRankStrategies(FlinkChangelogModeInferenceProgram.scala:730)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:489)

It seems there is an embedded StringUtils in
flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar, which conflicts with another
class.

What should I do?
Do I have to manually exclude StringUtils.class from
flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar?



Re: [flink native k8s] With HA configured, the taskmanager pods keep restarting

2022-08-31 Thread Yang Wang
My guess is that you didn't set a service account for the TM, so the TM has no
permission to fetch the leader from the K8s ConfigMap and thus cannot register
with the RM and JM.

-Dkubernetes.taskmanager.service-account=wuzhiheng \


Best,
Yang
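Applied to the session-start command quoted below, this suggestion amounts to one extra flag; a sketch (the service-account name simply mirrors the jobmanager one from the original command, and the other flags are abbreviated):

```shell
# Give the TM pods a service account too, so they can read the HA ConfigMaps
# (same account as the jobmanager here; adapt to your setup).
./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=realtime-monitor \
    -Dkubernetes.namespace=monitor \
    -Dkubernetes.jobmanager.service-account=wuzhiheng \
    -Dkubernetes.taskmanager.service-account=wuzhiheng
```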

Xuyang wrote on Tue, Aug 30, 2022 at 23:22:

> Hi, can you paste the TM logs? From the WARN logs it looks like the TM never
> comes up.
> At 2022-08-30 03:45:43, "Wu,Zhiheng" wrote:
> >[Problem description]
> >After enabling the HA configuration, the taskmanager pods are stuck in a
> >create-stop-create loop and the job cannot start.
> >
> >1. Job configuration and startup process
> >
> >a) Modify the conf/flink.yaml configuration file to add the HA configuration
> >kubernetes.cluster-id: realtime-monitor
> >high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> >high-availability.storageDir:
> file:///opt/flink/checkpoint/recovery/monitor//
> (this is an NFS path, mounted into the pod via a PVC)
> >
> >b) First create a stateless deployment with the following command to set up a session cluster
> >
> >./bin/kubernetes-session.sh \
> >
> >-Dkubernetes.secrets=cdn-res-bd-keystore:/opt/flink/kafka/res/keystore/bd,cdn-res-bd-truststore:/opt/flink/kafka/res/truststore/bd,cdn-res-bj-keystore://opt/flink/kafka/res/keystore/bj,cdn-res-bj-truststore:/opt/flink/kafka/res/truststore/bj
> \
> >
> >-Dkubernetes.pod-template-file=./conf/pod-template.yaml \
> >
> >-Dkubernetes.cluster-id=realtime-monitor \
> >
> >-Dkubernetes.jobmanager.service-account=wuzhiheng \
> >
> >-Dkubernetes.namespace=monitor \
> >
> >-Dtaskmanager.numberOfTaskSlots=6 \
> >
> >-Dtaskmanager.memory.process.size=8192m \
> >
> >-Djobmanager.memory.process.size=2048m
> >
> >c) Finally, submit a jar job through the web UI; the jobmanager produces the following logs
> >
> >2022-08-29 23:49:04,150 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod
> realtime-monitor-taskmanager-1-13 is created.
> >
> >2022-08-29 23:49:04,152 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod
> realtime-monitor-taskmanager-1-12 is created.
> >
> >2022-08-29 23:49:04,161 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received
> new TaskManager pod: realtime-monitor-taskmanager-1-12
> >
> >2022-08-29 23:49:04,162 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requested worker realtime-monitor-taskmanager-1-12 with resource spec
> WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes),
> taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes),
> managedMemSize=0 bytes, numSlots=6}.
> >
> >2022-08-29 23:49:04,162 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received
> new TaskManager pod: realtime-monitor-taskmanager-1-13
> >
> >2022-08-29 23:49:04,162 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requested worker realtime-monitor-taskmanager-1-13 with resource spec
> WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes),
> taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes),
> managedMemSize=0 bytes, numSlots=6}.
> >
> >2022-08-29 23:49:07,176 WARN
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Reaching max start worker failure rate: 12 events detected in the recent
> interval, reaching the threshold 10.00.
> >
> >2022-08-29 23:49:07,176 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Will not retry creating worker in 3000 ms.
> >
> >2022-08-29 23:49:07,176 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker realtime-monitor-taskmanager-1-12 with resource spec
> WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes),
> taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes),
> managedMemSize=0 bytes, numSlots=6} was requested in current attempt and
> has not registered. Current pending count after removing: 1.
> >
> >2022-08-29 23:49:07,176 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker realtime-monitor-taskmanager-1-12 is terminated. Diagnostics: Pod
> terminated, container termination statuses:
> [flink-main-container(exitCode=1, reason=Error, message=null)], pod status:
> Failed(reason=null, message=null)
> >
> >2022-08-29 23:49:07,176 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requesting new worker with resource spec WorkerResourceSpec {cpuCores=6.0,
> taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes,
> numSlots=6}, current pending count: 2.
> >
> >2022-08-29 23:49:07,514 WARN
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Reaching max start worker failure rate: 13 events detected in the recent
> interval, reaching the threshold 10.00.
> >
> >2022-08-29 23:49:07,514 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker realtime-monitor-taskmanager-1-13 with resource spec
> WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes),
> taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes),
> managedMemSize=0 bytes, numSlots=6} was requested in current attempt and
> has not registered.