Re: yarn ship from s3

2021-05-26 Thread Vijayendra Yadav
Thank you, Xintong. I will look out for these updates in the near future.

Regards,
Vijay

On Wed, May 26, 2021 at 6:40 PM Xintong Song  wrote:

> Hi Vijay,
>
> Currently, Flink only supports shipping files from the local machine where
> the job is submitted.
>
> There are tickets [1][2][3] tracking the effort to support shipping files from
> remote paths, e.g., http, hdfs, etc. Once that work is done, adding s3
> as an additional supported scheme should be straightforward.
>
> Unfortunately, these efforts are still in progress and have more or less
> stalled recently.
>
> Thank you~
>
> Xintong Song
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-20681
> [2] https://issues.apache.org/jira/browse/FLINK-20811
> [3] https://issues.apache.org/jira/browse/FLINK-20867
>
> On Thu, May 27, 2021 at 12:23 AM Vijayendra Yadav 
> wrote:
>
>> Hi Pohl,
>>
>> I tried to ship my property file. Example: *-yarn.ship-files
>> s3://applib/xx/xx/1.0-SNAPSHOT/application.properties  \*
>>
>>
>> *Error:*
>>
>> 6:21:37.163 [main] ERROR org.apache.flink.client.cli.CliFrontend -
>> Invalid command line arguments.
>> org.apache.flink.client.cli.CliArgsException: Could not build the program
>> from JAR file: JAR file does not exist: -yarn.ship-files
>> at
>> org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:244)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:223)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> at java.security.AccessController.doPrivileged(Native Method)
>> ~[?:1.8.0_292]
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> [?:1.8.0_292]
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
>> [hadoop-common-2.10.0-amzn-0.jar:?]
>> at
>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> [flink-dist_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>> [flink-dist_2.11-1.11.0.jar:1.11.0]
>> Caused by: java.io.FileNotFoundException: JAR file does not exist:
>> -yarn.ship-files
>> at
>> org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:740)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:717)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:242)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> ... 8 more
>> Could not build the program from JAR file: JAR file does not exist:
>> -yarn.ship-files
>>
>>
>> *Thanks,*
>>
>> *Vijay*
>>
>> On Tue, May 25, 2021 at 11:58 PM Matthias Pohl 
>> wrote:
>>
>>> Hi Vijay,
>>> have you tried yarn-ship-files [1] or yarn-ship-archives [2]? Maybe,
>>> that's what you're looking for...
>>>
>>> Best,
>>> Matthias
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#yarn-ship-files
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#yarn-ship-archives
>>>
>>> On Tue, May 25, 2021 at 5:56 PM Vijayendra Yadav 
>>> wrote:
>>>
 Hi Piotr,

 I have been doing the same process as you mentioned so far; now I am
 migrating the deployment process to AWS CDK and AWS Step Functions, kind
 of like a CI/CD process.
 I added a step that downloads the jar and configs (1, 2, 3 and 4) from S3
 using command-runner.jar (AWS Step); it loads them onto one of the master
 nodes (out of 3). In the next step, when I launch the Flink job, it cannot
 find the build because the job is launched on some other YARN node.

 I was hoping that, just like *Apache Spark*, where whatever files we provide
 in *--files* are shipped to YARN (S3 to the YARN working directory), Flink
 would also have a solution.

 Thanks,
 Vijay


 On Tue, May 25, 2021 at 12:50 AM Piotr Nowojski 
 wrote:

> Hi Vijay,
>
> I'm not sure if I understand your question correctly. You have jar and
> configs (1, 2, 3 and 4) on S3 and you want to start a Flink job using
> those? Can you simply download those things (whole directory containing
> those) to the machine that will be starting the Flink job?
>
> Best, Piotrek
>
> wt., 25 maj 2021 o 07:50 Vijayendra Yadav 
> napisał(a):
>
>> Hi Team,
>>
>> I am trying to find a way to ship files from AWS S3 for a Flink
>> streaming job; I am running on AWS EMR. What I need to ship is the
>> following:
>> 1) application jar
>> 2) application property file

Re: Time needed to read from Kafka source

2021-05-26 Thread B.B.
Hi,
I forgot to mention that we are using Flink 1.12.0. This is a job with only
the minimum components: reading from the source and printing it.
Profiling was my next step. Regarding memory, I didn't see any
bottlenecks.
I guess I will have to do some investigating in the metrics part of Flink.
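
One cheap way to test the deserialization path in isolation (along the lines of
testing independent components, as suggested in the reply below) is a standalone
micro-benchmark. This is only a rough sketch, not the job's actual code; MyClass is
the class from the job, and the file name and repeat count are assumptions:

```
import com.fasterxml.jackson.databind.ObjectMapper;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class DeserializationBenchmark {
    public static void main(String[] args) throws Exception {
        ObjectMapper objectMapper = new ObjectMapper();
        // One JSON event per line, exported from the Kafka topic (assumed file name).
        List<String> lines = Files.readAllLines(Paths.get("sample-events.json"));

        long blackhole = 0;
        long start = System.nanoTime();
        int n = 0;
        for (int i = 0; i < 100; i++) { // repeat to get a more stable number
            for (String line : lines) {
                MyClass event = objectMapper.readValue(
                        line.getBytes(StandardCharsets.UTF_8), MyClass.class);
                blackhole += event.hashCode(); // keep the result "used"
                n++;
            }
        }
        double seconds = (System.nanoTime() - start) / 1_000_000_000.0;
        System.out.printf("%d records in %.1fs => %.0f records/s (%d)%n",
                n, seconds, n / seconds, blackhole);
    }
}
```

If this alone is far above 700 records/second, the bottleneck is probably not the
ObjectMapper itself but something else in the pipeline.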

BR,
BB

On Tue, 25 May 2021 at 17:12, Piotr Nowojski  wrote:

> Hi,
>
> That's a throughput of 700 records/second, which should be well below the
> theoretical limits of any deserializer (from hundreds of thousands up to tens
> of millions of records/second per single operator), unless your records are
> huge or very complex.
>
> Long story short, I don't know of a magic bullet to help you solve your
> problem. As always you have two options, either optimise/speed up your
> code/job, or scale up.
>
> If you choose the former, think about Flink as just another Java
> application. Check metrics and resource usage, and understand what resource
> is the problem (cpu? memory? machine is swapping? io?). You might be able
> to guess what's your bottleneck (reading from kafka? deserialisation?
> something else? Flink itself?) by looking at some of the metrics
> (busyTimeMsPerSecond [1] or idleTimeMsPerSecond could help with that), or
> you can also simplify your job to bare minimum and test performance of
> independent components. Also you can always attach a code profiler and
> simply look at what's happening. First identify what's the source of the
> bottleneck and then try to understand what's causing it.
>
> Best,
> Piotrek
>
> [1] busyTimeMsPerSecond is available since Flink 1.13. Flink 1.13 also
> comes with nice tools to analyse bottlenecks in the WebUI (coloring nodes
> in the job graph based on busy/back pressured status and Flamegraph
> support)
>
> wt., 25 maj 2021 o 15:44 B.B.  napisał(a):
>
>> Hi,
>>
>> I am in the process of optimizing my job, which we currently think is too
>> slow.
>>
>> We are deploying the job on Kubernetes with 1 JobManager (1 GB RAM and 1
>> CPU) and 1 TaskManager (4 GB RAM and 2 CPUs), i.e. 2 task slots and a
>> parallelism of two.
>>
>> The main problem is one Kafka source with 3.8 million events that we
>> have to process.
>> As a test we made a simple job that connects to Kafka using a custom
>> implementation of KafkaDeserializationSchema. There we are using an
>> ObjectMapper that maps input values, e.g.
>>
>> *var event = objectMapper.readValue(consumerRecord.value(),
>> MyClass.class);*
>>
>> This is then validated with Hibernate Validator, and the output of this
>> source is printed to the console.
>>
>> The time needed for the job to consume all the events was one and a half
>> hours, which seems a bit long.
>> Is there a way we can speed up this process?
>>
>> Are more CPU cores or more memory the solution?
>> Should we switch to an Avro deserialization schema?
>>
>>
>>
>> --
Everybody wants to be a winner
Nobody wants to lose their game
Its insane for me
Its insane for you
Its insane


Re: Time needed to read from Kafka source

2021-05-26 Thread B.B.
Hi,
I forgot to mention that we are running Flink 1.12.0.

This is the main function (some parts of the code are abbreviated; this is
the main part). As you can see, the job was simplified to the minimum: just
reading from the source and printing.


[image: Screenshot 2021-05-26 at 08.05.53.png]


And this is deserializer:

[image: Screenshot 2021-05-26 at 07.49.17.png]
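
In case the screenshots do not render in the archive: a minimal sketch of what such a
deserializer typically looks like is below. It is only an illustration, not the actual
code; MyClass is the class from the job, the validation step is omitted, and the class
name is made up:

```
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class MyClassDeserializationSchema implements KafkaDeserializationSchema<MyClass> {

    // Jackson's ObjectMapper is comparatively expensive to create and is safe to reuse,
    // so it is created lazily once per schema instance rather than once per record.
    private transient ObjectMapper objectMapper;

    @Override
    public MyClass deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        if (objectMapper == null) {
            objectMapper = new ObjectMapper();
        }
        return objectMapper.readValue(record.value(), MyClass.class);
    }

    @Override
    public boolean isEndOfStream(MyClass nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyClass> getProducedType() {
        return TypeInformation.of(MyClass.class);
    }
}
```

If the real schema looks roughly like this, enabling object reuse as suggested below
(env.getConfig().enableObjectReuse()) is a cheap thing to try as well.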

BR,

BB


On Tue, 25 May 2021 at 17:51, Arvid Heise  wrote:

> Could you share your KafkaDeserializationSchema? We might be able to spot
> some optimization potential. You could also try out enableObjectReuse [1],
> which avoids copying data between tasks (not sure if you have any
> non-chained tasks).
>
> If you are on 1.13, you could check out the flamegraph to see where the
> bottleneck occurs. [2]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/execution/execution_configuration/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
>
> On Tue, May 25, 2021 at 5:12 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> That's a throughput of 700 records/second, which should be well below the
>> theoretical limits of any deserializer (from hundreds of thousands up to tens
>> of millions of records/second per single operator), unless your records are
>> huge or very complex.
>>
>> Long story short, I don't know of a magic bullet to help you solve your
>> problem. As always you have two options, either optimise/speed up your
>> code/job, or scale up.
>>
>> If you choose the former, think about Flink as just another Java
>> application. Check metrics and resource usage, and understand what resource
>> is the problem (cpu? memory? machine is swapping? io?). You might be able
>> to guess what's your bottleneck (reading from kafka? deserialisation?
>> something else? Flink itself?) by looking at some of the metrics
>> (busyTimeMsPerSecond [1] or idleTimeMsPerSecond could help with that), or
>> you can also simplify your job to bare minimum and test performance of
>> independent components. Also you can always attach a code profiler and
>> simply look at what's happening. First identify what's the source of the
>> bottleneck and then try to understand what's causing it.
>>
>> Best,
>> Piotrek
>>
>> [1] busyTimeMsPerSecond is available since Flink 1.13. Flink 1.13 also
>> comes with nice tools to analyse bottlenecks in the WebUI (coloring nodes
>> in the job graph based on busy/back pressured status and Flamegraph
>> support)
>>
>> wt., 25 maj 2021 o 15:44 B.B.  napisał(a):
>>
>>> Hi,
>>>
>>> I am in the process of optimizing my job, which we currently think is too
>>> slow.
>>>
>>> We are deploying the job on Kubernetes with 1 JobManager (1 GB RAM and 1
>>> CPU) and 1 TaskManager (4 GB RAM and 2 CPUs), i.e. 2 task slots and a
>>> parallelism of two.
>>>
>>> The main problem is one Kafka source with 3.8 million events that we
>>> have to process.
>>> As a test we made a simple job that connects to Kafka using a custom
>>> implementation of KafkaDeserializationSchema. There we are using an
>>> ObjectMapper that maps input values, e.g.
>>>
>>> *var event = objectMapper.readValue(consumerRecord.value(),
>>> MyClass.class);*
>>>
>>> This is then validated with Hibernate Validator, and the output of this
>>> source is printed to the console.
>>>
>>> The time needed for the job to consume all the events was one and a half
>>> hours, which seems a bit long.
>>> Is there a way we can speed up this process?
>>>
>>> Are more CPU cores or more memory the solution?
>>> Should we switch to an Avro deserialization schema?
>>>
>>>
>>>
>>> --
Everybody wants to be a winner
Nobody wants to lose their game
Its insane for me
Its insane for you
Its insane


multiple streams joining

2021-05-26 Thread Lian Jiang
Hi,

Imagine I have one class with 4 fields: ID, A, B, C. There are three
data sources providing data in the form of (ID, A), (ID, B), (ID, C)
respectively. I want to join these three data sources to get the final (ID, A,
B, C) without any window. For example, (ID, A) could come one month after
(ID, B). Such a join needs global state. There are two designs in my mind.

1. Stream connect with separate Kafka topics
streamA_B = DataSourceA connect DataSourceB
streamA_B_C = streamA_B connect DataSourceC

Each data source is ingested via a dedicated Kafka topic. This design does not seem
scalable because I need N stream connect operations for N+1 data
sources. Each stream connect needs to maintain global state. For example,
streamA_B needs a global state maintaining (ID, A, B) and streamA_B_C
needs another one maintaining (ID, A, B, C).

2. Shared Kafka topic
All data sources are ingested via a shared Kafka topic (using a union event
type or schema reference). Then one Flink job can handle all events from
these data sources by maintaining one global state. This design seems more
scalable than solution 1.

Which one is recommended? Is there a better way that I am missing? Any hints
are very much appreciated!
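
For reference, a rough sketch of the per-pair building block behind design 1, i.e. one
keyed connect of two of the sources with one piece of keyed state per side. The field
types are simplified to strings and the class names are made up; it is meant to
illustrate the state involved, not as a recommendation between the two designs:

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Joins (id, a) with (id, b) into (id, a, b) without a window: each side is remembered in
// keyed state until the other side arrives (a real job would add TTL or explicit cleanup).
public class AbJoin extends KeyedCoProcessFunction<
        String, Tuple2<String, String>, Tuple2<String, String>, Tuple3<String, String, String>> {

    private transient ValueState<String> aState;
    private transient ValueState<String> bState;

    @Override
    public void open(Configuration parameters) {
        aState = getRuntimeContext().getState(new ValueStateDescriptor<>("a", String.class));
        bState = getRuntimeContext().getState(new ValueStateDescriptor<>("b", String.class));
    }

    @Override
    public void processElement1(Tuple2<String, String> a, Context ctx,
                                Collector<Tuple3<String, String, String>> out) throws Exception {
        aState.update(a.f1);
        emitIfComplete(ctx.getCurrentKey(), out);
    }

    @Override
    public void processElement2(Tuple2<String, String> b, Context ctx,
                                Collector<Tuple3<String, String, String>> out) throws Exception {
        bState.update(b.f1);
        emitIfComplete(ctx.getCurrentKey(), out);
    }

    private void emitIfComplete(String id,
                                Collector<Tuple3<String, String, String>> out) throws Exception {
        if (aState.value() != null && bState.value() != null) {
            out.collect(Tuple3.of(id, aState.value(), bState.value()));
        }
    }
}
```

It would be wired up as streamA.keyBy(t -> t.f0).connect(streamB.keyBy(t -> t.f0)).process(new AbJoin()),
and design 1 stacks another such stage (and its global state) on top for every additional source.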


Re: How can stateful jobs be guaranteed to automatically recover from the most recent checkpoint when the cluster restarts?

2021-05-26 Thread LakeShen
Hi,
What exactly do you mean by "cluster restart"? Could you describe it in a bit more detail?

Best,
LakeShen

datayangl wrote on Wed, May 26, 2021 at 9:43 AM:

> The FixedDelayRestartStrategy restores from the most recent checkpoint by default;
> the other strategies are described on the website. If you are asking how this is
> implemented, it is better not to ask about implementation internals on the mailing
> list; you can google for related articles and FLIPs, or debug the source code directly.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Inquiry about restoring from a savepoint

2021-05-26 Thread LakeShen
Check whether your flink command is correct, then go to the Checkpoint page in the Flink
Web UI and see whether the job was restored from the savepoint (there is a restore path
shown there). After that, check which time characteristic your windows use.
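
For reference, one common cause of "No restore state for FlinkKafkaConsumer" is that the
operator cannot be matched to its state in the savepoint, for example because no stable
uid was assigned and the auto-generated operator ids changed between the job that took the
savepoint and the job being restored; in that case the consumer falls back to its normal
start offsets and in-flight window contents are lost. A minimal sketch of the same
pipeline shape with explicit uids follows; the topic names, properties and the reduce
stand-in for the real AggregationFunction are placeholders, and processing time is used
only to keep the example short:

```
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "restore-demo");

        env.addSource(new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props))
                .uid("kafka-source")              // stable id: consumer offsets can be restored
                .keyBy(value -> value)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(30)))
                .reduce((a, b) -> a)              // stand-in for the real AggregationFunction
                .uid("thirty-minute-aggregation") // stable id: window state can be restored
                .print();

        env.execute("uid-example");
    }
}
```

If the uids already match, the restore path shown on the Web UI Checkpoints page is worth
checking as well.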

Best,
LakeShen

王春浩 wrote on Thu, May 27, 2021 at 9:26 AM:

> Hi community,
>
> Flink version 1.7
>
> I am trying to restore a Flink job from a savepoint (or checkpoint). The job reads from
> Kafka -> performs a 30-minute window aggregation (just an AggregationFunction, acting
> like a counter) -> sinks to Kafka.
>
> I use RocksDB and have checkpointing enabled.
>
> Now I try to trigger a savepoint manually. The expected value of each aggregate is 30
> (1 record per minute). However, when I restore from the savepoint (flink run -d -s
> {savepoint url}), the aggregated value is not 30 (it is less than 30, depending on when
> I cancel the Flink job and restore). But when the job runs normally, it reaches 30.
>
> I don't know why some data seems to be lost.
>
> The log shows "No restore state for FlinkKafkaConsumer".
>
> 四川省成都市高新区天府三街199号 太平洋保险金融大厦A区11楼
> 11th Floor,Tower A,Pacific insurance finance Building, No.199 TianFu 3rd
> Street, GaoXin District, Chengdu, Sichuan Province
> Mobile +86 15817382279
> Email  wangchun...@navercorp.com
>
> NCloud
>
> -Original Message-
> From: "王春浩"
> To: ;
> Cc:
> Sent: 2021/5/26周三 17:03 (GMT+08:00)
> Subject: inquire about restore from savepoint
>
> Hi Community,
> ​
> Flink version 1.7
>
> I'm trying to make a Flink job restore from a savepoint (or checkpoint). What the
> job does is read from Kafka -> do a 30-minute window
> aggregation (just an AggregationFunction, acting like a counter) -> sink to Kafka.
> I use RocksDB and have checkpointing enabled.
> Now I try to trigger a savepoint manually. The expected value of each
> aggregated result is 30 (1 record per minute). But when I restore from a
> savepoint (flink run -d -s {savepoint's url}), the aggregated value is not
> 30 (less than 30, depending on when I cancel the Flink job and restore). But
> when the job runs normally, it gets 30.
> I don't know why some data seems to be lost.
> A log shows "No restore state for FlinkKafkaConsumer".
> ​
> ​
> ​
> 四川省成都市高新区天府三街199号 太平洋保险金融大厦A区11楼
> 11th Floor,Tower A,Pacific insurance finance Building, No.199 TianFu 3rd
> Street, GaoXin District, Chengdu, Sichuan Province
> Mobile +86 15817382279
> Email  wangchun...@navercorp.com
>
> NCloud
>


Re: How can I use a different user to run Flink

2021-05-26 Thread Jake
Hi igyu:

You can submit the job using these arguments, like this:

```
-m yarn-cluster \
-yqu root.realtime \
-ynm "test" \
-yjm 2g \
-ytm 2g \
-n \
-d \
-ys 1 \
-yD security.kerberos.login.principal=xxx...@x.com \
-yD security.kerberos.login.keytab=/tmp/xx.keytab \
...
```



> On May 27, 2021, at 08:34, igyu  wrote:
> 
> I use CDH 6.3.2
> flink-1.12.3
> 
> I enable Kerberos
> 
> I want to use different users with different keytabs, because I created many queues
> in YARN and different users use different queues.
> 
> 
> igyu



Re: yarn ship from s3

2021-05-26 Thread Xintong Song
Hi Vijay,

Currently, Flink only supports shipping files from the local machine where
the job is submitted.

There are tickets [1][2][3] tracking the effort to support shipping files from
remote paths, e.g., http, hdfs, etc. Once that work is done, adding s3
as an additional supported scheme should be straightforward.

Unfortunately, these efforts are still in progress and have more or less
stalled recently.

Thank you~

Xintong Song


[1] https://issues.apache.org/jira/browse/FLINK-20681
[2] https://issues.apache.org/jira/browse/FLINK-20811
[3] https://issues.apache.org/jira/browse/FLINK-20867

On Thu, May 27, 2021 at 12:23 AM Vijayendra Yadav 
wrote:

> Hi Pohl,
>
> I tried to ship my property file. Example: *-yarn.ship-files
> s3://applib/xx/xx/1.0-SNAPSHOT/application.properties  \*
>
>
> *Error:*
>
> 6:21:37.163 [main] ERROR org.apache.flink.client.cli.CliFrontend - Invalid
> command line arguments.
> org.apache.flink.client.cli.CliArgsException: Could not build the program
> from JAR file: JAR file does not exist: -yarn.ship-files
> at
> org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:244)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:223)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at java.security.AccessController.doPrivileged(Native Method)
> ~[?:1.8.0_292]
> at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_292]
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
> [hadoop-common-2.10.0-amzn-0.jar:?]
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> [flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> [flink-dist_2.11-1.11.0.jar:1.11.0]
> Caused by: java.io.FileNotFoundException: JAR file does not exist:
> -yarn.ship-files
> at
> org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:740)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:717)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:242)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> ... 8 more
> Could not build the program from JAR file: JAR file does not exist:
> -yarn.ship-files
>
>
> *Thanks,*
>
> *Vijay*
>
> On Tue, May 25, 2021 at 11:58 PM Matthias Pohl 
> wrote:
>
>> Hi Vijay,
>> have you tried yarn-ship-files [1] or yarn-ship-archives [2]? Maybe,
>> that's what you're looking for...
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#yarn-ship-files
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#yarn-ship-archives
>>
>> On Tue, May 25, 2021 at 5:56 PM Vijayendra Yadav 
>> wrote:
>>
>>> Hi Piotr,
>>>
>>> I have been doing the same process as you mentioned so far; now I am
>>> migrating the deployment process to AWS CDK and AWS Step Functions, kind
>>> of like a CI/CD process.
>>> I added a step that downloads the jar and configs (1, 2, 3 and 4) from S3 using
>>> command-runner.jar (AWS Step); it loads them onto one of the master nodes
>>> (out of 3). In the next step, when I launch the Flink job, it cannot find
>>> the build because the job is launched on some other YARN node.
>>>
>>> I was hoping that, just like *Apache Spark*, where whatever files we provide
>>> in *--files* are shipped to YARN (S3 to the YARN working directory), Flink
>>> would also have a solution.
>>>
>>> Thanks,
>>> Vijay
>>>
>>>
>>> On Tue, May 25, 2021 at 12:50 AM Piotr Nowojski 
>>> wrote:
>>>
 Hi Vijay,

 I'm not sure if I understand your question correctly. You have jar and
 configs (1, 2, 3 and 4) on S3 and you want to start a Flink job using
 those? Can you simply download those things (whole directory containing
 those) to the machine that will be starting the Flink job?

 Best, Piotrek

 wt., 25 maj 2021 o 07:50 Vijayendra Yadav 
 napisał(a):

> Hi Team,
>
> I am trying to find a way to ship files from AWS S3 for a Flink
> streaming job; I am running on AWS EMR. What I need to ship is the following:
> 1) application jar
> 2) application property file
> 3) custom flink-conf.yaml
> 4) log4j application specific
>
> Please let me know options.
>
> Thanks,
> Vijay






Inquiry about restoring from a savepoint

2021-05-26 Thread 王春浩
Hi community,

Flink version 1.7

I am trying to restore a Flink job from a savepoint (or checkpoint). The job reads from
Kafka -> performs a 30-minute window aggregation (just an AggregationFunction, acting like
a counter) -> sinks to Kafka.

I use RocksDB and have checkpointing enabled.

Now I try to trigger a savepoint manually. The expected value of each aggregate is 30
(1 record per minute). However, when I restore from the savepoint (flink run -d -s
{savepoint url}), the aggregated value is not 30 (it is less than 30, depending on when I
cancel the Flink job and restore). But when the job runs normally, it reaches 30.

I don't know why some data seems to be lost.

The log shows "No restore state for FlinkKafkaConsumer".
​
​​
​
四川省成都市高新区天府三街199号 太平洋保险金融大厦A区11楼
11th Floor,Tower A,Pacific insurance finance Building, No.199 TianFu 3rd 
Street, GaoXin District, Chengdu, Sichuan Province
Mobile +86 15817382279
Email  wangchun...@navercorp.com  

NCloud

-Original Message-
From: "王春浩"
To: ;
Cc:
Sent: 2021/5/26周三 17:03 (GMT+08:00)
Subject: inquire about restore from savepoint
 
Hi Community,
​
Flink version 1.7
I'm trying to make a Flink job restore from a savepoint (or checkpoint). What the
job does is read from Kafka -> do a 30-minute window aggregation (just an
AggregationFunction, acting like a counter) -> sink to Kafka.
I use RocksDB and have checkpointing enabled.
Now I try to trigger a savepoint manually. The expected value of each
aggregated result is 30 (1 record per minute). But when I restore from a
savepoint (flink run -d -s {savepoint's url}), the aggregated value is not
30 (less than 30, depending on when I cancel the Flink job and restore). But when
the job runs normally, it gets 30.
I don't know why some data seems to be lost.
A log shows "No restore state for FlinkKafkaConsumer".
​
​
​
四川省成都市高新区天府三街199号 太平洋保险金融大厦A区11楼
11th Floor,Tower A,Pacific insurance finance Building, No.199 TianFu 3rd 
Street, GaoXin District, Chengdu, Sichuan Province
Mobile +86 15817382279
Email  wangchun...@navercorp.com  

NCloud


How can I use a different user to run Flink

2021-05-26 Thread igyu
I use CDH 6.3.2
flink-1.12.3

I enable Kerberos

I want to use different users with different keytabs, because I created many queues
in YARN and different users use different queues.




igyu


Re: Jobmanager Crashes with Kubernetes HA When Restart Kubernetes Master Node

2021-05-26 Thread Jerome Li
Hi Yang,

Thanks for getting back to me.

By "restart master node", I mean running "kubectl get nodes" to find the nodes with the
master role, sshing into one of the master nodes as the ubuntu user, and then running
"sudo /sbin/reboot -f" to restart that master node.

It looks like the JobManager cancels the running job and logs the following after
that.

2021-05-26 18:28:37,997 [INFO] 
org.apache.flink.runtime.executiongraph.ExecutionGraph   - Discarding the 
results produced by task execution 34eb9f5009dc7cf07117e720e7d393de.

2021-05-26 18:28:37,999 [INFO] 
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore - Suspending

2021-05-26 18:28:37,999 [INFO] 
org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter - 
Shutting down.

2021-05-26 18:28:38,000 [INFO] 
org.apache.flink.runtime.executiongraph.ExecutionGraph   - Job 
74fc5c858e50f5efc91db9ee16c17a8c has been suspended.

2021-05-26 18:28:38,007 [INFO] 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Suspending 
SlotPool.

2021-05-26 18:28:38,007 [INFO] org.apache.flink.runtime.jobmaster.JobMaster 
- Close ResourceManager connection 
5bac86fb0b5c984ef429225b8de82cc0: JobManager is no longer the leader..

2021-05-26 18:28:38,019 [INFO] 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl  - JobManager 
runner for job hogger (74fc5c858e50f5efc91db9ee16c17a8c) was granted leadership 
with session id 14b9004a-3807-42e8-ac03-c0d77efe5611 at 
akka.tcp://flink@hoggerflink-jobmanager:6123/user/rpc/jobmanager_2.

2021-05-26 18:28:38,292 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage 
until processing is started.

2021-05-26 18:28:38,292 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage 
until processing is started.

2021-05-26 18:28:38,292 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage 
until processing is started.

2021-05-26 18:28:38,293 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage 
until processing is started.

2021-05-26 18:28:38,293 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage 
until processing is started.

2021-05-26 18:28:38,293 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage 
until processing is started.

2021-05-26 18:28:38,293 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage 
until processing is started.

2021-05-26 18:28:38,293 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.LocalFencedMessage 
until processing is started.

2021-05-26 18:28:38,293 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.LocalFencedMessage 
until processing is started.

2021-05-26 18:28:38,295 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.LocalFencedMessage 
until processing is started.

2021-05-26 18:28:38,295 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.LocalFencedMessage 
until processing is started.

2021-05-26 18:28:38,295 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message 

Re: KafkaSource metrics

2021-05-26 Thread Alexey Trenikhun
Found https://issues.apache.org/jira/browse/FLINK-22766


From: Alexey Trenikhun 
Sent: Tuesday, May 25, 2021 3:25 PM
To: Ardhani Narasimha ; 陳樺威 
; Flink User Mail List 
Subject: Re: KafkaSource metrics

Looks like when KafkaSource is used instead of FlinkKafkaConsumer, the metrics
listed below are not available. Bug? Work in progress?


Thanks,
Alexey

From: Ardhani Narasimha 
Sent: Monday, May 24, 2021 9:08 AM
To: 陳樺威 
Cc: user 
Subject: Re: KafkaSource metrics

Use below respectively

flink_taskmanager_job_task_operator_KafkaConsumer_bytes_consumed_rate - 
Consumer rate
flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max - Consumer lag
flink_taskmanager_job_task_operator_KafkaConsumer_commit_latency_max - commit 
latency

unsure if reactive mode makes any difference.
On Mon, May 24, 2021 at 7:44 PM 陳樺威 
mailto:oscar8492...@gmail.com>> wrote:
Hello,

Our team is trying out reactive mode and replacing FlinkKafkaConsumer with the
new KafkaSource.
But we can't find a list of the KafkaSource metrics. Does anyone have any idea? In
our case, we want to know the Kafka consume delay and consume rate.

Thanks,
Oscar



Error restarting job from Savepoint

2021-05-26 Thread Yashwant Ganti
Hello,

We are facing an error restarting a job from a savepoint. We believe it is
because one of the common classes used across all of our jobs was changed
but there was no *serialVersionUID* assigned to the class. The error we
are facing is:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.streamOperatorStateContext(
> StreamTaskStateInitializerImpl.java:254)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .initializeState(AbstractStreamOperator.java:272)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .initializeStateAndOpenOperators(OperatorChain.java:425)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .lambda$beforeInvoke$2(StreamTask.java:535)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
> .runThrowing(StreamTaskActionExecutor.java:50)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> StreamTask.java:525)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:565)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for SplittableDoFnOperator_60af72bbf6b3989cb3e849280faa23d8_
> (2/4) from any of the 1 provided restore options.
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .createAndRestore(BackendRestorerProcedure.java:160)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.keyedStatedBackend(
> StreamTaskStateInitializerImpl.java:345)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.streamOperatorStateContext(
> StreamTaskStateInitializerImpl.java:163)
> ... 9 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
> when trying to restore heap backend
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder
> .build(HeapKeyedStateBackendBuilder.java:115)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend
> .createKeyedStateBackend(FsStateBackend.java:559)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend
> .createKeyedStateBackend(FsStateBackend.java:101)
> at org.apache.flink.runtime.state.StateBackend
> .createKeyedStateBackend(StateBackend.java:181)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(
> StreamTaskStateInitializerImpl.java:328)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .createAndRestore(BackendRestorerProcedure.java:135)
> ... 11 more
> Caused by: java.io.InvalidClassException: com..**.***; local
> class incompatible: stream classdesc serialVersionUID = -
> 7317586767482317266, local class serialVersionUID = -8797204481428423223
> at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
> at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source
> )
> at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
> at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown
> Source)
> at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
> at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown
> Source)
> at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
> at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown
> Source)
> at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
> at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
> at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
> at org.apache.beam.sdk.coders.SerializableCoder.decode(
> SerializableCoder.java:194)
> at org.apache.beam.sdk.coders.SerializableCoder.decode(
> SerializableCoder.java:54)
> at org.apache.beam.sdk.io.
> Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(
> Read.java:669)
> at org.apache.beam.sdk.io.
> Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(
> Read.java:642)
> at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer
> .deserialize(CoderTypeSerializer.java:118)
> at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders
> .lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
> at org.apache.flink.runtime.state.
> KeyGroupPartitioner$PartitioningResultKeyGroupReader
> .readMappingsInKeyGroup(KeyGroupPartitioner.java:289)
> at org.apache.flink.runtime.state.heap.HeapRestoreOperation
> 
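
For reference, the usual way to avoid this kind of mismatch going forward is to pin the
serialVersionUID of Serializable classes that end up in state. To read the existing
savepoint again, the local class would have to declare the value recorded in the
savepoint (the "stream classdesc serialVersionUID" from the exception above), assuming
the class is otherwise compatible. A minimal sketch with a made-up class name and fields:

```
import java.io.Serializable;

public class SharedPayload implements Serializable {

    // Pinned explicitly; -7317586767482317266L is the value the exception reports for the
    // state already written, so declaring it lets the old savepoint deserialize again.
    private static final long serialVersionUID = -7317586767482317266L;

    private String id;
    private long timestamp;
}
```

Once the savepoint has been restored and a new one taken, the value can stay fixed even as
compatible fields are added later.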

Re: yarn ship from s3

2021-05-26 Thread Vijayendra Yadav
Hi Pohl,

I tried to ship my property file. Example: *-yarn.ship-files
s3://applib/xx/xx/1.0-SNAPSHOT/application.properties  \*


*Error:*

6:21:37.163 [main] ERROR org.apache.flink.client.cli.CliFrontend - Invalid
command line arguments.
org.apache.flink.client.cli.CliArgsException: Could not build the program
from JAR file: JAR file does not exist: -yarn.ship-files
at
org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:244)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:223)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at java.security.AccessController.doPrivileged(Native Method)
~[?:1.8.0_292]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_292]
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
[hadoop-common-2.10.0-amzn-0.jar:?]
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
[flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.io.FileNotFoundException: JAR file does not exist:
-yarn.ship-files
at
org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:740)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:717)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:242)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
... 8 more
Could not build the program from JAR file: JAR file does not exist:
-yarn.ship-files


*Thanks,*

*Vijay*

On Tue, May 25, 2021 at 11:58 PM Matthias Pohl 
wrote:

> Hi Vijay,
> have you tried yarn-ship-files [1] or yarn-ship-archives [2]? Maybe,
> that's what you're looking for...
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#yarn-ship-files
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#yarn-ship-archives
>
> On Tue, May 25, 2021 at 5:56 PM Vijayendra Yadav 
> wrote:
>
>> Hi Piotr,
>>
>> I have been doing the same process as you mentioned so far; now I am
>> migrating the deployment process to AWS CDK and AWS Step Functions, kind
>> of like a CI/CD process.
>> I added a step that downloads the jar and configs (1, 2, 3 and 4) from S3 using
>> command-runner.jar (AWS Step); it loads them onto one of the master nodes
>> (out of 3). In the next step, when I launch the Flink job, it cannot find
>> the build because the job is launched on some other YARN node.
>>
>> I was hoping that, just like *Apache Spark*, where whatever files we provide in
>> *--files* are shipped to YARN (S3 to the YARN working directory), Flink would
>> also have a solution.
>>
>> Thanks,
>> Vijay
>>
>>
>> On Tue, May 25, 2021 at 12:50 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi Vijay,
>>>
>>> I'm not sure if I understand your question correctly. You have jar and
>>> configs (1, 2, 3 and 4) on S3 and you want to start a Flink job using
>>> those? Can you simply download those things (whole directory containing
>>> those) to the machine that will be starting the Flink job?
>>>
>>> Best, Piotrek
>>>
>>> wt., 25 maj 2021 o 07:50 Vijayendra Yadav 
>>> napisał(a):
>>>
 Hi Team,

 I am trying to find a way to ship files from AWS S3 for a Flink
 streaming job; I am running on AWS EMR. What I need to ship is the following:
 1) application jar
 2) application property file
 3) custom flink-conf.yaml
 4) log4j application specific

 Please let me know options.

 Thanks,
 Vijay
>>>
>>>


Re: Customer operator in BATCH execution mode

2021-05-26 Thread 陳昌倬
On Wed, May 26, 2021 at 01:03:53PM +0200, Dawid Wysakowicz wrote:
> Hi,
> 
> No there is no API in the operator to know which mode it works in. We
> aim to have separate operators for both modes if required. You can check
> e.g. how we do it in KeyedBroadcastStateTransformationTranslator[1].

Thanks for the information. We will implement this according to Piotrek's
suggestion.

> 
> Yes, it should be possible to register a timer for Long.MAX_WATERMARK if
> you want to apply a transformation at the end of each key. You could
> also use the reduce operation (DataStream#keyBy#reduce) in BATCH mode.

According to [0], the timer's timestamp is irrelevant since timers will be triggered
at the end of input, right? If that is the case, we can use the same code
for both streaming and batch mode.

[0] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/


> 
> A side note, I don't fully get what you mean by "build state for our
> streaming application". Bear in mind though you cannot take a savepoint
> from a job running in the BATCH execution mode. Moreover it uses a
> different kind of StateBackend. Actually a dummy one, which just
> imitates a real state backend.

What we plan to do here is:

1. Load configuration from broadcast event (custom source backed by REST
   API).
2. Load historical events as batch mode input (From GCS).
3. Use timer to trigger output so that the following will happen:
   a. Serialize keyed states into JSON.
   b. Output to Kafka.
   c. The streaming application consumes the data from Kafka and updates its
      keyed states accordingly.

We hope that in this way, we can rebuild our states with almost the same
code in streaming.
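
A minimal sketch of step 3, following the behavior described in [0] (timers registered for
Long.MAX_VALUE only fire when the final MAX_WATERMARK arrives, i.e. at the end of each
key's input in BATCH mode and at the end of a bounded stream in STREAMING mode). The
key/value types, state, and output format are simplified placeholders for illustration:

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Accumulates per-key state and emits one record per key only after all input for that key
// has been processed.
public class EmitAtEndOfInput extends KeyedProcessFunction<String, Long, String> {

    private transient ValueState<Long> sum;

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<String> out) throws Exception {
        Long current = sum.value();
        sum.update(current == null ? value : current + value);
        // The concrete timestamp does not matter for the "end of input" semantics; Long.MAX_VALUE
        // is only reached by the final watermark, and re-registering the same timestamp is a no-op.
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Here the keyed state would be serialized (e.g. to JSON) and sent to Kafka.
        out.collect(ctx.getCurrentKey() + "=" + sum.value());
    }
}
```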


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Query related to Minimum scrape interval for Prometheus and fetching metrics of all vertices in a job through Flink Rest API

2021-05-26 Thread Ashutosh Uttam
Hi team,

I have two queries as mentioned below:

*Query1:*
I am using PrometheusReporter to expose metrics to Prometheus Server.
What should be the minimum recommended scrape interval to be defined on
Prometheus server?
Is there any interval at which Flink reports metrics?

*Query2:*
Is there any way I can fetch the metrics of all vertices (including
subtasks) of a job through a single Monitoring Rest API of Flink.

As of now, what I have tried is first finding the vertices and then querying
each individual vertex for metrics, as below:

*Step 1:* Finding the jobId (http://<host>:<port>/jobs)
*Step 2:* Finding the vertex ids (http://<host>:<port>/jobs/<jobid>)
*Step 3:* Finding the aggregated metrics (including parallelism) of a vertex
(http://<host>:<port>/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?get=<metric1>,<metric2>)


So likewise I have to invoke multiple REST calls, one per vertex id. Is
there an optimised way to get the metrics of all vertices?
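
If no single endpoint turns out to fit, the three steps above can at least be scripted so
that only the per-vertex metrics call is repeated. A rough sketch using the endpoints
listed above; the host/port, job id, and metric names are placeholders, and it assumes
Java 11's HttpClient plus Jackson for JSON parsing:

```
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JobMetricsFetcher {

    private static final HttpClient CLIENT = HttpClient.newHttpClient();
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        String base = "http://jobmanager-host:8081";                      // placeholder
        String jobId = args[0];                                           // job id from /jobs
        String metrics = "numRecordsInPerSecond,numRecordsOutPerSecond";  // placeholder names

        // One call per vertex, but the vertex list itself comes from a single request.
        JsonNode job = get(base + "/jobs/" + jobId);
        for (JsonNode vertex : job.get("vertices")) {
            String vertexId = vertex.get("id").asText();
            JsonNode aggregated = get(base + "/jobs/" + jobId + "/vertices/" + vertexId
                    + "/subtasks/metrics?get=" + metrics);
            System.out.println(vertex.get("name").asText() + " -> " + aggregated);
        }
    }

    private static JsonNode get(String url) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        HttpResponse<String> response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
        return MAPPER.readTree(response.body());
    }
}
```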


Thanks & Regards,
Ashutosh


Unsubscribe

2021-05-26 Thread swsgoog


Re: Unsubscribe

2021-05-26 Thread Roc Marshal
Hi 张斌: to unsubscribe, please send a message to user-zh-unsubscr...@flink.apache.org
and follow the prompts to complete the process. Best regards, flinker.
On 2021-05-26 17:05:59, "张斌" wrote:
>
>
>Unsubscribe
>| |
>张斌
>|
>|
>herobin1...@163.com
>|
>Signature customized by NetEase Mail Master
>


Re: Managing Jobs entirely with Flink Monitoring API

2021-05-26 Thread Piotr Nowojski
Glad to hear it!  Thanks for confirming that it works.

Piotrek

śr., 26 maj 2021 o 12:59 Barak Ben Nathan 
napisał(a):

>
>
> Hi Piotrek,
>
>
>
> This is exactly what I was searching for. Thanks!
>
>
>
> Barak
>
>
>
> *From:* Piotr Nowojski 
> *Sent:* Wednesday, May 26, 2021 9:59 AM
> *To:* Barak Ben Nathan 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Managing Jobs entirely with Flink Monitoring API
>
>
>
> *CAUTION*: external source
>
> Hi Barak,
>
>
>
> Before starting the JobManager I don't think there is any API running at
> all. If you want to be able to submit/stop multiple jobs to the same
> cluster session mode is indeed the way to go. But first you need to to
> start the cluster ( start-cluster.sh ) [1]
>
>
>
> Piotrek
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/
> 
>
>
>
> wt., 25 maj 2021 o 14:10 Barak Ben Nathan 
> napisał(a):
>
>
>
> I want to manage the execution of Flink Jobs programmatically through
> Flink Monitoring API.
>
>
>
> I.e. I want to run/delete jobs ONLY with the
>  POST /jars/:jarid/run
>  POST /jobs/:jobid/stop
> API commands.
>
>
>
> Now, it seems that the Session Mode may fits my needs: “Session Mode: one
> JobManager instance manages multiple jobs sharing the same cluster of
> TaskManagers” (
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/
> 
> )
>
> However, I couldn’t find a way to start the API server (i.e. a JobManager)
> that didn’t already include submitting a JAR file for a job execution.
>
> Any suggestions?
>
>


Re: Customer operator in BATCH execution mode

2021-05-26 Thread Dawid Wysakowicz
Hi,

No there is no API in the operator to know which mode it works in. We
aim to have separate operators for both modes if required. You can check
e.g. how we do it in KeyedBroadcastStateTransformationTranslator[1].

Yes, it should be possible to register a timer for Long.MAX_WATERMARK if
you want to apply a transformation at the end of each key. You could
also use the reduce operation (DataStream#keyBy#reduce) in BATCH mode.

A side note, I don't fully get what you mean by "build state for our
streaming application". Bear in mind though you cannot take a savepoint
from a job running in the BATCH execution mode. Moreover it uses a
different kind of StateBackend. Actually a dummy one, which just
imitates a real state backend.

Best,

Dawid


[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/KeyedBroadcastStateTransformationTranslator.java

On 25/05/2021 17:04, ChangZhuo Chen (陳昌倬) wrote:
> Hi,
>
> Currently, we want to use batch execution mode [0] and historical data
> to build state for our streaming application. Due to different between
> batch & streaming mode, we want to check current execution mode in
> custom operator. So our question is:
>
>
> * Is there any API for custom operator to know current execution mode
>   (batch or streaming)?
>
> * If we want to output after all elements of one specific key are
>   processed, can we just use timer since timer is triggered at the end
>   of input [0]?
>
>
> [0] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
>



OpenPGP_signature
Description: OpenPGP digital signature


Re: Job recovery issues with state restoration

2021-05-26 Thread Peter Westermann
/mnt/data is a local disk, so there shouldn’t be any additional latency. I’ll 
provide more information when/if this happens again.

Peter

From: Roman Khachatryan 
Date: Tuesday, May 25, 2021 at 6:54 PM
To: Peter Westermann 
Cc: user@flink.apache.org 
Subject: Re: Job recovery issues with state restoration
> I am not able to consistently reproduce this issue. It seems to only occur 
> when the failover happens at the wrong time. I have disabled task local 
> recovery and will report back if we see this again.

Thanks, please post any results here.

> The SST files are not the ones for task local recovery, those would be in a 
> different directory (we have configured io.tmp.dirs as /mnt/data/tmp).

Those files on /mnt could still be checked against the ones in
checkpoint directories (on S3/DFS), the size should match.

I'm also curious why do you place local recovery files on a remote FS?
(I assume /mnt/data/tmp is a remote FS or a persistent volume).
Currently, if a TM is lost (e.g. process dies) then those files can
not be used - and recovery will fallback to S3/DFS. So this probably
incurs some IO/latency unnecessarily.

Regards,
Roman

On Tue, May 25, 2021 at 2:16 PM Peter Westermann
 wrote:
>
> Hi Roman,
>
>
>
> I am not able to consistently reproduce this issue. It seems to only occur 
> when the failover happens at the wrong time. I have disabled task local 
> recovery and will report back if we see this again. We need incremental 
> checkpoints for our workload.
>
> The SST files are not the ones for task local recovery, those would be in a 
> different directory (we have configured io.tmp.dirs as /mnt/data/tmp).
>
>
>
> Thanks,
>
> Peter
>
>
>
>
>
> From: Roman Khachatryan 
> Date: Thursday, May 20, 2021 at 4:54 PM
> To: Peter Westermann 
> Cc: user@flink.apache.org 
> Subject: Re: Job recovery issues with state restoration
>
> Hi Peter,
>
> Do you experience this issue if running without local recovery or
> incremental checkpoints enabled?
> Or have you maybe compared local (on TM) and  remove (on DFS) SST files?
>
> Regards,
> Roman
>
> On Thu, May 20, 2021 at 5:54 PM Peter Westermann
>  wrote:
> >
> > Hello,
> >
> >
> >
> > I’ve reported issues around checkpoint recovery in case of a job failure 
> > due to zookeeper connection loss in the past. I am still seeing issues 
> > occasionally.
> >
> > This is for Flink 1.12.3 with zookeeper for HA, S3 as the state backend, 
> > incremental checkpoints, and task-local recovery enabled.
> >
> >
> >
> > Here’s what happened: A zookeeper instance was terminated as part of a 
> > deployment for our zookeeper service, this caused a new jobmanager leader 
> > election (so far so good). A leader was elected and the job was restarted 
> > from the latest checkpoint but never became healthy. The root exception and 
> > the logs show issues reading state:
> >
> > o.r.RocksDBException: Sst file size mismatch: 
> > /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003579.sst.
> >  Size recorded in manifest 36718, actual size 2570\
> > Sst file size mismatch: 
> > /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003573.sst.
> >  Size recorded in manifest 13756, actual size 1307\
> > Sst file size mismatch: 
> > /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003575.sst.
> >  Size recorded in manifest 16278, actual size 1138\
> > Sst file size mismatch: 
> > /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003576.sst.
> >  Size recorded in manifest 23108, actual size 1267\
> > Sst file size mismatch: 
> > /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003577.sst.
> >  Size recorded in manifest 148089, actual size 1293\
> > \
> > \\tat org.rocksdb.RocksDB.open(RocksDB.java)\
> > \\tat org.rocksdb.RocksDB.open(RocksDB.java:286)\
> > \\tat 
> > o.a.f.c.s.s.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:80)\
> > \\t... 22 common frames omitted\
> > Wrapped by: java.io.IOException: Error while opening RocksDB instance.\
> > \\tat 
> > o.a.f.c.s.s.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:92)\
> > \\tat 
> > o.a.f.c.s.s.r.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:145)\
> > \\tat 
> > 

Re: [VOTE] Release 1.13.1, release candidate #1

2021-05-26 Thread Matthias Pohl
Hi Dawid,
+1 (non-binding)

Thanks for driving this release. I checked the following things:
- downloaded and built the source code
- verified checksums
- double-checked diff of pom files between 1.13.0 and 1.13.1-rc1
- did a visual check of the release blog post
- started cluster and ran jobs (WindowJoin and WordCount); nothing
suspicious found in the logs
- manually verified that the issue addressed by FLINK-22866 is fixed

Best,
Matthias

On Tue, May 25, 2021 at 3:33 PM Dawid Wysakowicz 
wrote:

> Hi everyone,
> Please review and vote on the release candidate #1 for the version 1.13.1,
> as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release and binary convenience releases to be
> deployed to dist.apache.org [2], which are signed with the key with
> fingerprint 31D2DD10BFC15A2D [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "release-1.13.1-rc1" [5],
> * website pull request listing the new release and adding announcement
> blog post [6].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Best,
> Dawid
>
> [1]
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350058
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.1-rc1/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4]
> https://repository.apache.org/content/repositories/orgapacheflink-1422/
> [5] https://github.com/apache/flink/tree/release-1.13.1-rc1
> [6] https://github.com/apache/flink-web/pull/448
>


Unsubscribe

2021-05-26 Thread 张斌


Unsubscribe
| |
张斌
|
|
herobin1...@163.com
|
Signature customized by NetEase Mail Master



Unsubscribe

2021-05-26 Thread 402991848
Unsubscribe

Unsubscribe

2021-05-26 Thread wujing...@shantaijk.cn
Unsubscribe





Unsubscribe

2021-05-26 Thread chongwei.zhou

Unsubscribe

Field names containing a hyphen (-) when registering a table via createTemporaryView

2021-05-26 Thread Jun Zou
Hi all,
I am developing a job on Flink 1.11.2. For compatibility with internal legacy code, I
need to register the source manually as a table, invoked as:

> tableEnv.createTemporaryView(tableSource.getName, source, fields.toArray:
> _*)
>
Here tableEnv is a StreamTableEnvironment, source is a DataStream[Row] representing the
operator produced by the source connector, and fields are the Expressions converted from
the processed source table's field names; the conversion from field name to Expression
uses the *ExpressionParser.parseExpression* method.

Normally, registration succeeds.
However, when a field name contains a hyphen, e.g. a field in the source named
"X-Oem", ExpressionParser.parseExpression parses it as "minus(X, Model)" rather than
the expected "X-Oem", so the field names of the registered table no longer match the
ones used in the DML statements and an error is raised.

Is there any way to handle this situation?
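
One thing that might be worth trying (not verified end-to-end with hyphenated names):
building the field list with the expression DSL instead of parsing strings, so that the
name is taken verbatim rather than being interpreted as arithmetic. A small Java sketch of
such a helper; the field names are examples only:

```
import org.apache.flink.table.api.Expression;
import org.apache.flink.table.api.Expressions;

public final class FieldRefs {

    private FieldRefs() {}

    // Builds unresolved field references directly, so a name like "X-Oem" is kept as-is
    // instead of being parsed into minus(X, Oem) by ExpressionParser.parseExpression.
    public static Expression[] toFieldRefs(String... fieldNames) {
        Expression[] refs = new Expression[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
            refs[i] = Expressions.$(fieldNames[i]);
        }
        return refs;
    }
}
```

The resulting array could then be passed to createTemporaryView in place of the parsed
expressions.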


Re: Flink 1.12.4 docker image

2021-05-26 Thread Arvid Heise
Just FYI https://hub.docker.com/_/flink is updated now as well.

On Wed, May 26, 2021 at 9:57 AM Nikola Hrusov  wrote:

> Hello Arvid,
>
> Thank you for your fast response
>
> Regards
> ,
> Nikola
>
>
> On Tue, May 25, 2021 at 7:11 PM Arvid Heise  wrote:
>
>> Hi Nikola,
>>
>> https://hub.docker.com/r/apache/flink now contains the images. It takes
>> a few days until https://hub.docker.com/_/flink is updated though.
>>
>> Sorry for the hassle.
>>
>> Best,
>>
>> Arvid
>>
>> On Tue, May 25, 2021 at 3:08 PM Arvid Heise  wrote:
>>
>>> Hi Nikola,
>>>
>>> I'm looking into it. I might have missed a step during release.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Mon, May 24, 2021 at 3:47 PM Nikola Hrusov 
>>> wrote:
>>>
 Hello,

 I saw that flink 1.12.4 just got released. However I am struggling to
 find the docker image.

 I checked both:
 - https://hub.docker.com/_/flink
 - https://hub.docker.com/r/apache/flink

 but on both 1.12.4 is not available.

 Are there plans to publish it as a docker image?

 Regards
 ,
 Nikola




Re: avro.ComplexPayloadAvro

2021-05-26 Thread r pp
Thanks. I am curious why it is done this way; is it dynamic compilation?

Qishang wrote on Wed, May 26, 2021 at 1:57 PM:

> Hi.
>
> They are generated into `${project.basedir}/target/generated-sources/`
>
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml#L97
>
> r pp wrote on Tue, May 25, 2021 at 9:58 AM:
>
> > Hi all, a quick question:
> >
> >
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
> >
> > Under this class,
> >
> >
> >
> /flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
> > 下面两个类,在代码哪里?
> > import org.apache.flink.streaming.tests.avro.ComplexPayloadAvro;
> > import org.apache.flink.streaming.tests.avro.InnerPayLoadAvro;
> > --
> > Best,
> >   pp
> >
>


-- 
Best,
  pp


Re: Flink state inspection tool

2021-05-26 Thread Paul Lam
You can use the State Processor API [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/
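
For example, a minimal sketch of reading one keyed state of one operator out of a
checkpoint or savepoint so its contents can be inspected. The savepoint path, operator
uid, state name and key/value types below are placeholders and have to match what the job
actually registered (for a SQL job the operator uids are generated, which makes finding
them a bit harder):

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class StateInspector {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(
                env, "hdfs:///flink/checkpoints/savepoint-xxxx", new MemoryStateBackend());

        // Reads the value state named "my-state" of the operator with uid "my-operator-uid".
        DataSet<String> entries = savepoint.readKeyedState("my-operator-uid", new Reader());
        entries.print();
    }

    static class Reader extends KeyedStateReaderFunction<String, String> {

        private ValueState<String> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("my-state", String.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + state.value());
        }
    }
}
```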

Best,
Paul Lam

> On May 26, 2021 at 09:14, casel.chen wrote:
> 
> I have a real-time data synchronization job written in Flink SQL that consumes MySQL
> binlog CDC and writes to MongoDB; that is all, no lookups and no joins.
> The Checkpoint page shows the state is 17 MB and a checkpoint takes 2 s.
> I would like to know why the state is so large. Is there a state inspection tool to
> see what is actually stored in it?



Fwd: Getting error in pod template

2021-05-26 Thread Priyanka Manickam
-- Forwarded message -
From: Priyanka Manickam 
Date: Tue, 25 May 2021, 21:13
Subject: Fwd: Getting error in pod template
To: user , Yang Wang 




-- Forwarded message -
From: Priyanka Manickam 
Date: Tue, 25 May 2021, 21:11
Subject: Re: Getting error in pod template
To: Yang Wang 


Hi team,

Now I am able to run Flink with the pod template. Thanks for the input.


Problem 1:
I am not able to pull the jar from blob storage through the command
used in the pod template file.

Problem 2:
We are also trying to write the events from one topic to another topic,
with parallelism 8, task slots 8, -Djobmanager.memory.process.size=1g,
-Dtaskmanager.memory.process.size=2g,
-Dkubernetes.jobmanager.cpu=0.5,
-Dtaskmanager.cpu=2.

Kafka (Event Hubs) has 3 partitions, and we planned for 100,000 (1 lakh) messages per
second.

However, I was only able to get a throughput of 555 messages per second.

I have also tried increasing the parallelism; it does not help.

Could you please help me out here?

Thanks,
Priyanka Manickam.

On Fri, 14 May 2021, 21:00 Priyanka Manickam, 
wrote:

> Hi yang,
>
> I was using a pod template to send the logs to a particular repository.
>
> But while deploying I got an error saying "jobmanager-pod-template" is
> invalid: spec.containers(0).image: required value.
>
> And if I try to add an image for the flink-main-container, it gives an
> ImagePullBackOff error.
>
> Am I proceeding in the correct way? On the official Flink website,
> no image is added after the flink-main-container.
>
> Could you please help with this? I have also searched for demo videos on
> using the pod template with Flink native Kubernetes but could not find
> any. If you could share demo videos on the website, it would be
> very useful for everyone.
>
> Good year ahead..
>
> Thanks,
> Priyanka Manickam.
>
>
>


Re: Flink 1.12.4 docker image

2021-05-26 Thread Nikola Hrusov
Hello Arvid,

Thank you for your fast response.

Regards,
Nikola


On Tue, May 25, 2021 at 7:11 PM Arvid Heise  wrote:

> Hi Nikola,
>
> https://hub.docker.com/r/apache/flink now contains the images. It takes a
> few days until https://hub.docker.com/_/flink is updated though.
>
> Sorry for the hassle.
>
> Best,
>
> Arvid
>
> On Tue, May 25, 2021 at 3:08 PM Arvid Heise  wrote:
>
>> Hi Nikola,
>>
>> I'm looking into it. I might have missed a step during release.
>>
>> Best,
>>
>> Arvid
>>
>> On Mon, May 24, 2021 at 3:47 PM Nikola Hrusov  wrote:
>>
>>> Hello,
>>>
>>> I saw that Flink 1.12.4 just got released. However, I am struggling to
>>> find the docker image.
>>>
>>> I checked both:
>>> - https://hub.docker.com/_/flink
>>> - https://hub.docker.com/r/apache/flink
>>>
>>> but on both 1.12.4 is not available.
>>>
>>> Are there plans to publish it as a docker image?
>>>
>>> Regards,
>>> Nikola
>>>
>>>


Re: Flink 1.11.3 NoClassDefFoundError: Could not initialize class

2021-05-26 Thread Piotr Nowojski
Hi,

Maybe before deleting the pods, you could look inside them and inspect your
job's jar? What classes does it have inside it? The job's jar should be in
a local directory. Or maybe even first inspect the jar before submitting it?
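
For instance, a tiny sketch (the jar path is a placeholder) that lists the
classes packaged in the job jar, roughly the programmatic equivalent of
running jar tf on it:

import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class ListJarClasses {

    public static void main(String[] args) throws Exception {
        // Placeholder path: point it at the job jar inside the pod or on disk.
        try (JarFile jar = new JarFile("/opt/flink/usrlib/my-job.jar")) {
            jar.stream()
               .map(JarEntry::getName)
               .filter(name -> name.endsWith(".class"))
               .forEach(System.out::println);
        }
    }
}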

Best, Piotrek

wt., 25 maj 2021 o 17:40 Georgi Stoyanov  napisał(a):

> Hi Piotr, thank you for the fast reply.
>
>
>
> The job is restarting in the same Flink session and fails with that
> exception. When I delete the pods (we are using the Google FlinkCluster CRD,
> so I just kubectl delete FlinkCluster …) and the yaml is applied again, it works
> as expected. It looks to me like it is a jar problem, since I just noticed it
> started to fail with a class from an internal common library, not only from the
> jobs.
>
> java.lang.NoClassDefFoundError: Could not initialize class
> com.my.organization.core.cfg.PropertiesConfigurationClass
> at
> com.my.organization.core.CassandraSink$1.buildCluster(CassandraSink.java:162)
> at
> org.apache.flink.streaming.connectors.cassandra.ClusterBuilder.getCluster(ClusterBuilder.java:32)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:86)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:106)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
>
>
>
> *From:* Piotr Nowojski 
> *Sent:* Tuesday, May 25, 2021 6:18 PM
> *To:* Georgi Stoyanov 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink 1.11.3 NoClassDefFoundError: Could not initialize
> class
>
>
>
> Hi Georgi,
>
>
>
> I don't think it's a bug in Flink. It sounds like some problem with
> dependencies or jars in your job. Can you explain a bit more what you
> mean by:
>
>
>
> > that some of them are constantly restarting with the following
> exception. After restart, everything is working as expected
>
>
>
> constantly restarting, but after a restart everything is working?
>
>
>
> Best,
>
> Piotrek
>
>
>
> wt., 25 maj 2021 o 16:12 Georgi Stoyanov  napisał(a):
>
> Hi all,
>
>
> We have several Flink jobs running on k8s with Flink 1.11.3, and recently
> we noticed that some of them are constantly restarting with the following
> exception. After restart, everything is working as expected.
> Could this be a bug?
> 2021-05-25 17:04:42
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function.
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:275)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.init(OperatorChain.java:126)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: unexpected exception type
> at
> java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1750)
> at
> java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1280)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>  

Re: Managing Jobs entirely with Flink Monitoring API

2021-05-26 Thread Piotr Nowojski
Hi Barak,

Before starting the JobManager, I don't think there is any API running at
all. If you want to be able to submit/stop multiple jobs to the same
cluster, session mode is indeed the way to go. But first you need to
start the cluster (start-cluster.sh) [1].
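
For illustration, a rough sketch (placeholder host, jar id, and job id) of
driving a session cluster through exactly those two REST endpoints with
Java 11's HttpClient, assuming the jar was already uploaded via POST
/jars/upload:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestJobControl {

    // REST endpoint of the session cluster started with start-cluster.sh.
    private static final String BASE = "http://localhost:8081";

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String jarId = "placeholder-jar-id";   // returned by POST /jars/upload
        String jobId = "placeholder-job-id";   // returned by the run call or GET /jobs

        // Submit a job from the uploaded jar.
        HttpRequest run = HttpRequest.newBuilder()
                .uri(URI.create(BASE + "/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{\"parallelism\": 2}"))
                .build();
        System.out.println(client.send(run, HttpResponse.BodyHandlers.ofString()).body());

        // Stop a running job; add "targetDirectory" to the body if no default
        // savepoint directory is configured.
        HttpRequest stop = HttpRequest.newBuilder()
                .uri(URI.create(BASE + "/jobs/" + jobId + "/stop"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{\"drain\": false}"))
                .build();
        System.out.println(client.send(stop, HttpResponse.BodyHandlers.ofString()).body());
    }
}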

Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/

wt., 25 maj 2021 o 14:10 Barak Ben Nathan 
napisał(a):

>
>
> I want to manage the execution of Flink Jobs programmatically through
> Flink Monitoring API.
>
>
>
> I.e. I want to run/delete jobs ONLY with the
>  POST /jars/:jarid/run
>  POST /jobs/:jobid/stop
> API commands.
>
>
>
> Now, it seems that the Session Mode may fit my needs: “Session Mode: one
> JobManager instance manages multiple jobs sharing the same cluster of
> TaskManagers” (
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/)
>
> However, I couldn’t find a way to start the API server (i.e. a JobManager)
> that didn’t already include submitting a JAR file for a job execution.
>
> Any suggestions?
>


Re: yarn ship from s3

2021-05-26 Thread Matthias Pohl
Hi Vijay,
have you tried yarn-ship-files [1] or yarn-ship-archives [2]? Maybe, that's
what you're looking for...

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#yarn-ship-files
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#yarn-ship-archives

On Tue, May 25, 2021 at 5:56 PM Vijayendra Yadav 
wrote:

> Hi Piotr,
>
> I have been doing the same process as you mentioned so far, now I am
> migrating the deployment process using AWS CDK and AWS Step Functions, kind
> of like the CICD process.
> I added a download step of jar and configs (1, 2, 3 and 4) from S3 using
> command-runner.jar (AWS Step); it loaded them onto one of the master nodes
> (out of 3). In the next step, when I launched the Flink job, it could not find
> the build because the job is launched on some other YARN node.
>
> I was hoping that, just like *Apache Spark*, where whatever files we provide in
> *--files* are shipped to YARN (S3 to the YARN work directory), Flink would
> also have a solution.
>
> Thanks,
> Vijay
>
>
> On Tue, May 25, 2021 at 12:50 AM Piotr Nowojski 
> wrote:
>
>> Hi Vijay,
>>
>> I'm not sure if I understand your question correctly. You have jar and
>> configs (1, 2, 3 and 4) on S3 and you want to start a Flink job using
>> those? Can you simply download those things (whole directory containing
>> those) to the machine that will be starting the Flink job?
>>
>> Best, Piotrek
>>
>> wt., 25 maj 2021 o 07:50 Vijayendra Yadav 
>> napisał(a):
>>
>>> Hi Team,
>>>
>>> I am trying to find a way to ship files from aws s3 for a flink
>>> streaming job, I am running on AWS EMR. What i need to ship are following:
>>> 1) application jar
>>> 2) application property file
>>> 3) custom flink-conf.yaml
>>> 4) log4j application specific
>>>
>>> Please let me know options.
>>>
>>> Thanks,
>>> Vijay
>>
>>


Unsubscribe

2021-05-26 Thread wujing...@shantaijk.cn
Unsubscribe



wujing...@shantaijk.cn


Re: flink sql cdc parallelism question

2021-05-26 Thread Zorro
The mysql-cdc connector can only run with a parallelism of 1, mainly for these reasons:
1. The MySQL binlog is essentially a single file; consuming it with multiple parallel instances would have to avoid duplicate reads.
2. It is hard to guarantee ordering when consuming with multiple parallel instances.

The sink can be given a higher parallelism, but ordering is then not guaranteed. If records with the same primary key need to go to the same sink subtask, you can do a keyBy first and keep the keyBy parallelism equal to the sink parallelism; that way the data is essentially forwarded, although 100% ordering still cannot be guaranteed.

If ordering must be guaranteed, I would still recommend a sink parallelism of 1.
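
To make the keyBy-before-sink pattern concrete, a minimal DataStream-style
sketch of the same idea (the in-memory source stands in for the
parallelism-1 mysql-cdc source, and the print sink stands in for the real
MongoDB sink):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class CdcKeyBySketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the single-parallelism CDC source: (primaryKey, changeEvent).
        DataStream<Tuple2<Long, String>> changes = env
                .fromElements(Tuple2.of(1L, "insert"), Tuple2.of(1L, "update"), Tuple2.of(2L, "insert"))
                .setParallelism(1);

        changes
                .keyBy(new KeySelector<Tuple2<Long, String>, Long>() {
                    @Override
                    public Long getKey(Tuple2<Long, String> record) {
                        return record.f0; // route by primary key
                    }
                })
                // Records with the same key always land on the same sink subtask,
                // so the sink parallelism can be raised above 1.
                .addSink(new PrintSinkFunction<>())
                .setParallelism(4)
                .name("mongodb-sink");

        env.execute("cdc-keyby-sketch");
    }
}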



--
Sent from: http://apache-flink.147419.n8.nabble.com/