Re: flink 1.12 on yarn WebUI does not show logs

2021-06-02 Thread smq
I added some configuration; some programs have logs and some do not.





-- Original message --
From: JasonLee <17610775...@163.com
Sent: June 3, 2021 12:44
To: user-zh http://apache-flink.147419.n8.nabble.com/

Re: SourceFunction cannot run in Batch Mode

2021-06-02 Thread Ingo Bürk
Hi Oscar,

I think you'll find your answers in [1], have a look at Yun's response a
couple emails down. Basically, SourceFunction is the legacy source stack,
and ideally you'd instead implement your source using the FLIP-27 stack[2]
where you can directly define the boundedness, but he also mentioned a
workaround.
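
As a rough illustration of why the boundedness declaration matters, here is a toy Python model (not Flink's API; every name in it is hypothetical). It assumes, as this thread describes, that a legacy SourceFunction is always treated as unbounded, that a FLIP-27 source states its own boundedness, and that batch execution is only chosen when every source is bounded.

```python
from dataclasses import dataclass
from enum import Enum


class Boundedness(Enum):
    BOUNDED = "BOUNDED"
    CONTINUOUS_UNBOUNDED = "CONTINUOUS_UNBOUNDED"


@dataclass(frozen=True)
class SourceInfo:
    name: str
    boundedness: Boundedness


def legacy_source(name):
    # Assumption: a legacy SourceFunction has no way to declare boundedness,
    # so this model always treats it as unbounded.
    return SourceInfo(name, Boundedness.CONTINUOUS_UNBOUNDED)


def flip27_source(name, bounded):
    # Assumption: a FLIP-27 style source declares its boundedness itself.
    kind = Boundedness.BOUNDED if bounded else Boundedness.CONTINUOUS_UNBOUNDED
    return SourceInfo(name, kind)


def resolve_mode(sources):
    """Return "BATCH" only if every source is bounded, otherwise "STREAMING"."""
    all_bounded = all(s.boundedness is Boundedness.BOUNDED for s in sources)
    return "BATCH" if all_bounded else "STREAMING"


if __name__ == "__main__":
    print(resolve_mode([legacy_source("parquet-gcs")]))
    print(resolve_mode([flip27_source("parquet-gcs", bounded=True)]))
```

Under these assumptions a single legacy source is enough to keep the whole job in streaming mode, even when the data it reads is finite.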


Regards
Ingo

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Using-Kafka-as-bounded-source-with-DataStream-API-in-batch-mode-Flink-1-12-td40637.html
[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/sources/#the-data-source-api

On Thu, Jun 3, 2021 at 7:29 AM 陳樺威  wrote:

> Hi,
>
> Currently, we want to use batch execution mode [0] to consume historical
> data and rebuild states for our streaming application.
> The Flink app will be run on demand and shut down after all the files have
> been processed.
> We implemented a SourceFunction [1] to consume bounded parquet files from
> GCS. However, the function will be detected as Batch Mode.
>
> Our question is, how to implement a SourceFunction as a Bounded DataStream?
>
> Thanks!
> Oscar
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>
>
>
>


Re: SourceFunction cannot run in Batch Mode

2021-06-02 Thread 陳樺威
Sorry, there are some typos that may be misleading.

The SourceFunction will be detected as *Streaming Mode*.

On Thu, Jun 3, 2021 at 1:29 PM 陳樺威 wrote:

> Hi,
>
> Currently, we want to use batch execution mode [0] to consume historical
> data and rebuild states for our streaming application.
> The Flink app will be run on demand and shut down after all the files have
> been processed.
> We implemented a SourceFunction [1] to consume bounded parquet files from
> GCS. However, the function will be detected as Batch Mode.
>
> Our question is, how to implement a SourceFunction as a Bounded DataStream?
>
> Thanks!
> Oscar
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>
>
>
>


SourceFunction cannot run in Batch Mode

2021-06-02 Thread 陳樺威
Hi,

Currently, we want to use batch execution mode [0] to consume historical
data and rebuild states for our streaming application.
The Flink app will be run on demand and shut down after all the files have
been processed.
We implemented a SourceFunction [1] to consume bounded parquet files from GCS.
However, the function will be detected as Batch Mode.

Our question is, how to implement a SourceFunction as a Bounded DataStream?

Thanks!
Oscar

[0]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html


Re: flink 1.12 on yarn WebUI does not show logs

2021-06-02 Thread JasonLee
Hi,

Have you changed the default logging configuration file?



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink stream processing issue

2021-06-02 Thread Qihua Yang
Hi,

I have a question. We have two data streams that may contain duplicate
data. We are using a KeyedCoProcessFunction to process the stream data. I
defined the same KeySelector for both streams. Our Flink application has
multiple replicas. We want the same data to be processed by the same replica.
Can Flink ensure that?

Thanks,
Qihua


Corrupted unaligned checkpoints in Flink 1.11.1

2021-06-02 Thread Alexander Filipchik
Hi,

We are trying to figure out what happened with our Flink job. We use Flink
1.11.1 and run a job with unaligned checkpoints and the RocksDB backend. The
whole state is around 300 GB, judging by the size of the savepoints.

The job ran OK. At some point we tried to deploy new code, but we couldn't
take a savepoint because they were timing out. It looks like the timeouts
were caused by disk throttling (we use Google regional disks).
The new code was deployed using an externalized checkpoint, but it didn't
start, as the job was failing with:

Caused by: java.io.FileNotFoundException: Item not found: 'gs://../app/checkpoints/2834fa1c81dcf7c9578a8be9a371b0d1/shared/3477b236-fb4b-4a0d-be73-cb6fac62c007'. Note, it is possible that the live version is still available but the requested generation is deleted.
    at com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:45)
    at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:653)
    at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:277)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:78)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:620)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
    at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:120)
    at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:37)
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:126)
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
    at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
    at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
    at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1654)
    at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1871)
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:66)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:169)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:155)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
    ... 15 more

We tried to roll back the code and we tried different checkpoints, but all the
attempts failed with the same error. The job ID in the error is not from
the same checkpoint path; it looks like the restore logic
looks back at previous jobs, as all the checkpoints after
2834fa1c81dcf7c9578a8be9a371b0d1 fail to restore with the same error.
We looked at different checkpoints and found that some of them are missing
the metadata file and can't be used for restoration.
We also use ZK for HA, and we cleaned up the state there between
deployments to make sure the nonexistent file
is not coming from there.

We decided to drop the state as we have means to repopulate it, but it
would be great to get to the bottom of it. Any help will be appreciated.

Alex


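Re: The /checkpoint/shared/ folder of a Flink SQL job keeps growing although the source data has not surged; how can this be controlled?

2021-06-02 Thread yujianbo
OK, thank you very much. I will test this on a few jobs and see whether the
performance is acceptable!


Hi,

Files that are not referenced are not necessarily useless: they may be files
that the current pending checkpoint is still uploading. So you also need to
compare the modification timestamps of the files that are not in the
checkpoint meta; they may be newer than the timestamp of the completed
checkpoint you analyzed.

Overall, I do not think this is a bug; it is the space amplification problem
of an LSM-architecture DB. If you are very concerned about space
amplification, you can enable dynamic level [1] to bound it strictly, although
this may affect write amplification and read amplification and therefore cost
some performance.


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-compaction-level-use-dynamic-size

Best regards
唐云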



--
Sent from: http://apache-flink.147419.n8.nabble.com/

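Re: The /checkpoint/shared/ folder of a Flink SQL job keeps growing although the source data has not surged; how can this be controlled?

2021-06-02 Thread Yun Tang
Hi,

Files that are not referenced are not necessarily useless: they may be files
that the current pending checkpoint is still uploading. So you also need to
compare the modification timestamps of the files that are not in the
checkpoint meta; they may be newer than the timestamp of the completed
checkpoint you analyzed.

Overall, I do not think this is a bug; it is the space amplification problem
of an LSM-architecture DB. If you are very concerned about space
amplification, you can enable dynamic level [1] to bound it strictly, although
this may affect write amplification and read amplification and therefore cost
some performance.


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-compaction-level-use-dynamic-size

Best regards
唐云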


From: yujianbo <15205029...@163.com>
Sent: Wednesday, June 2, 2021 15:29
To: user-zh@flink.apache.org 
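Subject: Re: The /checkpoint/shared/ folder of a Flink SQL job keeps growing although the source data has not surged; how can this be controlled?

Hi,

What I have confirmed:

Using loadCheckpointMeta I was able to work out which sst files under /shared
are referenced by _metadata. The sst files in /shared that are not referenced,
i.e. old files that have not been deleted, make up only a small share, about
5%.

Configuration:
   idleStateRetention is indeed set to 3600 seconds, and 3 checkpoint
   directories are retained.
Current situation:
 Each checkpoint adds roughly 1-2G, so the size is not small. A checkpoint is
 taken every 5 minutes.
 The most recent single checkpoint has 23 sst files, with a file size of about
 65M.
 The /checkpoint/shared directory currently totals 49.4G, while a savepoint
 comes out at 6.3G.

So my question is: is there a large amount of redundant data sitting in the sst
files that are still referenced, and is there any parameter that can be tuned?
Or is it that single-point deletion on the JM cannot keep up?
Can the JM be scaled out? Isn't only one JM working even with HA? (I don't
quite understand this part.)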
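
The figures reported above can be put side by side with a little arithmetic. This is only a back-of-the-envelope sketch in Python: it assumes that 65M is the size of each sst file, that G and M are binary units, and that the savepoint size is a fair proxy for the live state, none of which the message states explicitly. The function names are made up for illustration.

```python
def space_amplification(on_disk_gb, logical_gb):
    """Ratio of bytes kept on disk to bytes of (approximate) live state."""
    return on_disk_gb / logical_gb


def increment_gb(num_files, file_mb):
    """Size of one checkpoint increment, assuming file_mb is a per-file size."""
    return num_files * file_mb / 1024


if __name__ == "__main__":
    # 49.4G in /checkpoint/shared versus a 6.3G savepoint.
    print(round(space_amplification(49.4, 6.3), 2))
    # 23 sst files of about 65M each in the latest checkpoint.
    print(round(increment_gb(23, 65), 2))
```

With these inputs the shared directory is a bit under 8 times the savepoint size, and 23 files of 65M add up to roughly 1.5G, which is consistent with the reported 1-2G per checkpoint. The savepoint and the incremental checkpoint use different formats and three checkpoints are retained, so the ratio should be read as an upper-bound hint rather than a measurement.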





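Hi,

First, to confirm: your idleStateRetention is 3600 seconds? Second, to check
whether all of the data is still needed, you can use
Checkpoints.loadCheckpointMeta [1] to load the _metadata file in each
checkpoint directory you retain, then compare it against the files currently
in the checkpoint directory to see whether a large number of old files have
not been deleted.

From your description and one piece of SQL code alone it is actually hard to
judge.
Possible causes include:

  1.  Too many files per checkpoint, so that single-point deletion on the JM
      cannot keep up
  2.  The overall checkpoint size is small and RocksDB compacts infrequently,
      so old files containing expired data are not deleted in time and remain
      in the checkpoint directory (retaining multiple checkpoints amplifies
      this problem as well)

So it is still best to analyze the meta files of the existing checkpoints
first and then compare them with the actual current situation.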
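
The comparison described above could look roughly like the following Python sketch. It does not call Flink: it assumes the set of file names referenced by the retained _metadata files has already been extracted on the JVM side (for example with Checkpoints.loadCheckpointMeta), and that the listing of /shared with modification times is available as a plain dictionary. The function name and the sample data are hypothetical. It also applies the caveat raised elsewhere in this thread that an unreferenced file newer than the latest completed checkpoint may belong to a pending checkpoint.

```python
def classify_shared_files(shared_files, referenced, completed_at):
    """Split unreferenced files into stale ones and possibly pending ones.

    shared_files: dict mapping file name -> modification time (epoch seconds)
    referenced:   set of file names referenced by retained _metadata files
    completed_at: completion time of the newest analyzed checkpoint
    """
    stale, maybe_pending = [], []
    for name, mtime in sorted(shared_files.items()):
        if name in referenced:
            continue  # still needed by a retained checkpoint
        if mtime > completed_at:
            # Newer than the analyzed checkpoint: may be an in-flight upload.
            maybe_pending.append(name)
        else:
            stale.append(name)
    return stale, maybe_pending


if __name__ == "__main__":
    listing = {"a.sst": 100, "b.sst": 150, "c.sst": 220, "d.sst": 90}
    stale, pending = classify_shared_files(listing, {"a.sst", "b.sst"}, 200)
    print("stale:", stale)
    print("possibly pending:", pending)
```

Only the files in the first list are candidates for "old files that were never deleted"; the second list should be re-checked after the next checkpoint completes.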

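Also, Flink JIRA is not really a good place to ask questions directly,
especially ones as unclear as this. A better approach is to discuss on the
mailing list first and create a detailed ticket once the cause has been
pinned down.

[1]
https://github.com/apache/flink/blob/b582991b8b2b8dadb89e71d5002c4a9cc2055e34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99

Best regards
唐云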

From: yujianbo <[hidden email]>
Sent: Tuesday, June 1, 2021 10:51
To: [hidden email] <[hidden email]>
Subject: Re: The /checkpoint/shared/ folder of a Flink SQL job keeps growing although the source data has not surged; how can this be controlled?

Is there no better way?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


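How can a Flink jar job read HDFS configuration files

2021-06-02 Thread 骆凡
As in the subject: a Flink jar job runs on one YARN cluster and needs to write
data to a different HDFS cluster.
It fails with:
Caused by: java.net.UnknownHostException: xxx-hdfs
Is there a way for the jar job to load the configuration files of that cluster?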



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Is the flink postgres jdbc catalog read-only?

2021-06-02 Thread ????
[Message text lost to a character-encoding error; only the fragments "postgresql", "mysql" and "1.11.1" survive.]


Is the flink postgres jdbc catalog read-only?

2021-06-02 Thread casel.chen
Is the flink postgres jdbc catalog read-only? Besides the Hive Catalog, which
catalogs are writable? When will the community have a MySQL JDBC Catalog?

Re: Parquet reading/writing

2021-06-02 Thread Taras Moisiuk
Thank you,

looks like shuffle() works



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Prometheus Reporter Enhancement

2021-06-02 Thread Mason Chen
Hi Chesnay,

I would like to take on https://issues.apache.org/jira/browse/FLINK-17495 
 as a contribution to OSS, 
if that’s alright with the team. We can discuss implementation details here or 
in the ticket, but I was thinking about modifying the ReporterScopedSettings to 
enable this generic tag support.

Best,
Mason

> On May 20, 2021, at 4:36 AM, Chesnay Schepler  wrote:
> 
> There is no plan to generally exclude label keys from the metric 
> identifier/logical scope. They ensure that the label set for a given 
> identifier/scope is unique, i.e., you can't have 2 metrics called 
> "numRecordsIn" with different label sets. Changing this would also break all 
> existing setups, so if anything it would have to be an opt-in feature.
> 
> What I envision more is for the user to have more control over the metric 
> identifier/logical scope via the scope formats. They are currently quite 
> limited by only controlling part of the final identifier, while the 
> logical scope isn't controllable at all.
> 
> Generally though, there's a fair bit of internal re-structuring that we'd 
> like to do before extending the metric system further, because we've been 
> tacking on more and more things since it was released in 1.3.0 (!!!) but 
> barely refactored things to properly fit together.
> 
> On 5/20/2021 12:58 AM, Mason Chen wrote:
>> Are there any plans to rework some of the metric name formulations 
>> (getMetricIdentifier or getLogicalScope)? Currently, the label keys and/or 
>> label values are concatenated in the metric name and the information is 
>> redundant and makes the metric names longer.
>> 
>> Would it make sense to remove the tag related information 
>> (getAllVariables())?
>> 
>>> On May 18, 2021, at 3:45 PM, Chesnay Schepler wrote:
>>> 
>>> There is already a ticket for this. Note that this functionality should be 
>>> implemented in a generic fashion to be usable for all reporters.
>>> 
>>> https://issues.apache.org/jira/browse/FLINK-17495 
>>> 
>>> 
>>> On 5/18/2021 8:16 PM, Andrew Otto wrote:
>>>> Sounds useful!
>>>> 
>>>> On Tue, May 18, 2021 at 2:02 PM Mason Chen wrote:
>>>>> Hi all,
>>>>> 
>>>>> Would people appreciate enhancements to the Prometheus reporter to include 
>>>>> extra labels via a configuration, as a contribution to Flink? I can see it 
>>>>> being useful for adding labels that are not job specific, but infra 
>>>>> specific.
>>>>> 
>>>>> The change would be nicely integrated with Flink's ConfigOptions and 
>>>>> unit tested.
>>>>> 
>>>>> Best,
>>>>> Mason
>>> 
>> 
> 



Flink kafka consumers stopped consuming messages

2021-06-02 Thread Ilya Karpov
Hi there,

today I observed strange behaviour in a Flink streaming application (Flink 
1.6.1, per-job cluster deployment, YARN):
3 task managers (2 slots each) are running, but only 1 slot is actually 
consuming messages from Kafka (v0.11.0.2); the others were idle 
(currentOutputWatermark was stuck and the numRecordsOutPerSecond metric was 0). 

So I started to investigate: 
- `kafka-run-class.sh kafka.tools.GetOffsetShell` showed that offsets for all 6 
topic partitions are constantly increasing.
- `kafka-consumer-groups.sh` listed only a single partition (the 4th). That 
makes me think that 5 Kafka consumers somehow lost their connection to the 
brokers.
- A LOT of messages "Committing offsets to Kafka takes longer than the 
checkpoint interval. Skipping commit of previous offsets because newer complete 
checkpoint offsets are available. This does not compromise Flink's checkpoint 
integrity." in each task manager instance.
- 5 of the 6 slots had not advanced currentOutputWatermark for about 3 days.
- no application errors/uncaught exceptions etc.
- no reconnections to Kafka.
- some network issues connected with HDFS (Slow waitForAckedSeqno).
- all Kafka networking settings are at their defaults (e.g. timeouts).

After a job restart all task managers started to consume messages (6 slots in 
total, and `kafka-consumer-groups.sh` showed that all 6 partitions are 
consumed).

Maybe someone has already experienced something similar?

Job topology is as follows (no window operations!):
```
val dataStream = env.addSource(kafkaSource).map(processor);

val terminalStream = AsyncDataStream
  .unorderedWait(dataStream, asyncFun, timeout, timeoutUnit)
  .process(sideOutputFun);

terminalStream
  .keyBy(selector)
  .process(keyProcFun)
  .addSink(kafkaSink_1);

terminalStream
  .getSideOutput("outputTag")
  .addSink(kafkaSink_2);
```

Re: avro-confluent supports authentication enabled schema registry

2021-06-02 Thread tao xiao
JIRA created https://issues.apache.org/jira/browse/FLINK-22858 but I cannot
assign it to myself. Can you pls assign it to me?

On Wed, Jun 2, 2021 at 11:00 PM Fabian Paul 
wrote:

> Hi Tao,
>
> I was browsing the code a bit and I think this is currently not supported,
> but it does not seem too difficult to implement. You would need to allow a
> map of configurations and finally pass it to [1].
>
> Can you create a ticket in our JIRA?
> Would you be willing to contribute this feature?
>
> Best,
> Fabian
>
>
> [1]
> https://github.com/apache/flink/blob/1db4e560d1b46fac27a18bce9556fec646f063d9/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProvider.java#L54
>
> On 2. Jun 2021, at 13:57, tao xiao  wrote:
>
> Hi Fabian,
>
> Unfortunately this will not work in our environment where we implement our
> own 
> io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider
> which does the login and supplies the JWT to authorization HTTP header. The
> only way it will work is to pass the schema registry
> config BEARER_AUTH_CREDENTIALS_SOURCE [1] to table format factory
>
> [1]
> https://github.com/confluentinc/schema-registry/blob/master/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.java#L85
>
> On Wed, Jun 2, 2021 at 5:27 PM Fabian Paul 
> wrote:
>
>> Hi Tao,
>>
>> Thanks for reaching out. Have you tried the following
>>
>>  'value.avro-confluent.schema-registry.url' = 
>> 'https://{SR_API_KEY}:{SR_API_SECRET}@psrc-lzvd0.ap-southeast-2.aws.confluent.cloud',
>>
>>
>>
>> It may be possible to provide basic HTTP authentication by adding your
>> username and password to the URL. There is already a similar ticket open
>> unfortunately without much progress. [1]
>> Please let me know if this works for  you otherwise we can try to find a
>> different solution.
>>
>> Best,
>> Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-22763
>>
>>
>> On 2. Jun 2021, at 10:58, tao xiao  wrote:
>>
>> Hi team,
>>
>> Confluent schema registry supports HTTP basic authentication[1] but I
>> don't find the corresponding configs in Flink documentation[2]. Is this
>> achievable in Flink avro-confluent?
>>
>>
>> [1]
>> https://docs.confluent.io/platform/current/confluent-security-plugins/schema-registry/install.html#authentication-mechanisms
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/#format-options
>> --
>> Regards,
>> Tao
>>
>>
>>
>
> --
> Regards,
> Tao
>
>
>

-- 
Regards,
Tao


Re: avro-confluent supports authentication enabled schema registry

2021-06-02 Thread Fabian Paul
Hi Tao,

I was browsing the code a bit and I think this is currently not supported, but 
it does not seem too difficult to implement. You would need to allow a map of 
configurations and finally pass it to [1].

Can you create a ticket in our JIRA?
Would you be willing to contribute this feature?

Best,
Fabian


[1] 
https://github.com/apache/flink/blob/1db4e560d1b46fac27a18bce9556fec646f063d9/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProvider.java#L54

> On 2. Jun 2021, at 13:57, tao xiao  wrote:
> 
> Hi Fabian,
> 
> Unfortunately this will not work in our environment where we implement our 
> own 
> io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider
>  which does the login and supplies the JWT to authorization HTTP header. The 
> only way it will work is to pass the schema registry config 
> BEARER_AUTH_CREDENTIALS_SOURCE [1] to table format factory
> 
> [1] 
> https://github.com/confluentinc/schema-registry/blob/master/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.java#L85
>  
> 
> On Wed, Jun 2, 2021 at 5:27 PM Fabian Paul wrote:
> Hi Tao,
> 
> Thanks for reaching out. Have you tried the following
> 
>  'value.avro-confluent.schema-registry.url' = 
> 'https://{SR_API_KEY}:{SR_API_SECRET}@psrc-lzvd0.ap-southeast-2.aws.confluent.cloud',
> 
> 
> It may be possible to provide basic HTTP authentication by adding your 
> username and password to the URL. There is already a similar ticket open 
> unfortunately without much progress. [1]
> Please let me know if this works for  you otherwise we can try to find a 
> different solution.
> 
> Best,
> Fabian
> 
> [1] https://issues.apache.org/jira/browse/FLINK-22763 
> 
> 
> 
>> On 2. Jun 2021, at 10:58, tao xiao wrote:
>> 
>> Hi team,
>> 
>> Confluent schema registry supports HTTP basic authentication[1] but I don't 
>> find the corresponding configs in Flink documentation[2]. Is this achievable 
>> in Flink avro-confluent?
>> 
>> 
>> [1]https://docs.confluent.io/platform/current/confluent-security-plugins/schema-registry/install.html#authentication-mechanisms
>>  
>> 
>> [2]https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/#format-options
>>  
>> --
>>  
>> Regards,
>> Tao
> 
> 
> 
> -- 
> Regards,
> Tao



Re: Parquet reading/writing

2021-06-02 Thread Taras Moisiuk
Hi Fabian,

Thank you for your answer. I've updated the Flink version to 1.12.4 but
unfortunately the problem still persists.

I'm running this job in local mode, so I only have the following log: 

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by
org.apache.flink.api.java.ClosureCleaner
(file:/Users/user/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/org/apache/flink/flink-core/1.12.4/flink-core-1.12.4.jar)
to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of
org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal
reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-06-02 17:13:00.713+0300  info [TypeExtractor] class
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit
does not contain a setter for field modificationTime 
2021-06-02 17:13:00.736+0300  info [TypeExtractor] Class class
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation on
"Data Types & Serialization" for details of the effect on performance. 
2021-06-02 17:13:01.136+0300  info [TaskExecutorResourceUtils] The
configuration option taskmanager.cpu.cores required for local execution is
not set, setting it to the maximal possible value. 
2021-06-02 17:13:01.136+0300  info [TaskExecutorResourceUtils] The
configuration option taskmanager.memory.task.heap.size required for local
execution is not set, setting it to the maximal possible value. 
2021-06-02 17:13:01.137+0300  info [TaskExecutorResourceUtils] The
configuration option taskmanager.memory.task.off-heap.size required for
local execution is not set, setting it to the maximal possible value. 
2021-06-02 17:13:01.137+0300  info [TaskExecutorResourceUtils] The
configuration option taskmanager.memory.network.min required for local
execution is not set, setting it to its default value 64 mb. 
2021-06-02 17:13:01.137+0300  info [TaskExecutorResourceUtils] The
configuration option taskmanager.memory.network.max required for local
execution is not set, setting it to its default value 64 mb. 
2021-06-02 17:13:01.137+0300  info [TaskExecutorResourceUtils] The
configuration option taskmanager.memory.managed.size required for local
execution is not set, setting it to its default value 128 mb. 
2021-06-02 17:13:01.147+0300  info [MiniCluster] Starting Flink Mini Cluster 
2021-06-02 17:13:01.149+0300  info [MiniCluster] Starting Metrics Registry 
2021-06-02 17:13:01.163+0300  info [MetricRegistryImpl] No metrics reporter
configured, no metrics will be exposed/reported. 
2021-06-02 17:13:01.164+0300  info [MiniCluster] Starting RPC Service(s) 
2021-06-02 17:13:01.177+0300  info [AkkaRpcServiceUtils] Trying to start
local actor system 
2021-06-02 17:13:01.468+0300  info [Slf4jLogger] Slf4jLogger started 
2021-06-02 17:13:01.621+0300  info [AkkaRpcServiceUtils] Actor system
started at akka://flink 
2021-06-02 17:13:01.638+0300  info [AkkaRpcServiceUtils] Trying to start
local actor system 
2021-06-02 17:13:01.651+0300  info [Slf4jLogger] Slf4jLogger started 
2021-06-02 17:13:01.683+0300  info [AkkaRpcServiceUtils] Actor system
started at akka://flink-metrics 
2021-06-02 17:13:01.699+0300  info [AkkaRpcService] Starting RPC endpoint
for org.apache.flink.runtime.metrics.dump.MetricQueryService at
akka://flink-metrics/user/rpc/MetricQueryService . 
2021-06-02 17:13:01.721+0300  info [MiniCluster] Starting high-availability
services 
2021-06-02 17:13:01.740+0300  info [BlobServer] Created BLOB server storage
directory
/var/folders/ms/18j3nxjd6rzcj9mmgly3n0rrq_g_vb/T/blobStore-880c269b-c4ef-475e-91a1-f542485ccee3
 
2021-06-02 17:13:01.749+0300  info [BlobServer] Started BLOB server at
0.0.0.0:56749 - max concurrent requests: 50 - max backlog: 1000 
2021-06-02 17:13:01.756+0300  info [PermanentBlobCache] Created BLOB cache
storage directory
/var/folders/ms/18j3nxjd6rzcj9mmgly3n0rrq_g_vb/T/blobStore-1c57d8bd-0a31-464f-8c74-62ce32f56f49
 
2021-06-02 17:13:01.757+0300  info [TransientBlobCache] Created BLOB cache
storage directory
/var/folders/ms/18j3nxjd6rzcj9mmgly3n0rrq_g_vb/T/blobStore-6f8d5c1b-1ee5-49bd-bbba-c66aa8d18a90
 
2021-06-02 17:13:01.757+0300  info [MiniCluster] Starting 1 TaskManger(s) 
2021-06-02 17:13:01.763+0300  info [TaskManagerRunner] Starting TaskManager
with ResourceID: 264008fd-b7b7-4639-a3d4-ae430793106a 
2021-06-02 17:13:01.784+0300  info [TaskManagerServices] Temporary file
directory '/var/folders/ms/18j3nxjd6rzcj9mmgly3n0rrq_g_vb/T': total 233 GB,
usable 18 GB (7.73% usable) 
2021-06-02 17:13:01.789+0300  info [FileChannelManagerImpl]
FileChannelManager uses directory
/var/folders/ms/18j3nxjd6rzcj9mmgly3n0rrq_g_vb/T/flink-io-dcec76d9-4e46-411d-ad09-4257383a9df6
for spill files. 
2021-06-02 

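Question about defining only a subset of fields in a Flink SQL job's table

2021-06-02 Thread casel.chen
I have a Flink SQL mysql-cdc job that consumes canal-json binlog data from
Kafka and writes it to downstream storage. I ran into a problem: fields are
added dynamically to the upstream source table, while my SQL table definition
is static, so my job fails whenever the upstream schema changes. In Flink SQL,
is it possible to define only the fields that are actually used for the target
data source? If so, how? At the moment an error like the one below is thrown.
It means that the table originally has 43 columns while my DDL defines only 4
of them. Which formats support defining only a subset of the fields?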


21/06/02 18:54:22 [Source: TableSourceScan(table=[[default_catalog, 
default_database, charge_log]], fields=[id, charge_id, trace_id, app_id]) -> 
Sink: Sink(table=[default_catalog.default_database.print_table], fields=[id, 
charge_id, trace_id, app_id]) (3/12)#0] WARN taskmanager.Task: Source: 
TableSourceScan(table=[[default_catalog, default_database, charge_log]], 
fields=[id, charge_id, trace_id, app_id]) -> Sink: 
Sink(table=[default_catalog.default_database.print_table], fields=[id, 
charge_id, trace_id, app_id]) (3/12)#0 (8810adcd7960cb22a6954c985ba49d0d) 
switched from RUNNING to FAILED.
 java.lang.IllegalArgumentException: Row arity: 43, but serializer arity: 4
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:124)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)

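How can I prevent a Flink SQL CDC job from consuming the binlog from the beginning again after a restart?

2021-06-02 Thread casel.chen
I have the Flink SQL CDC job below, with 'scan.startup.mode' =
'latest-offset' set. After the job was restarted, however, I found that it
consumed the binlog from the beginning again, which makes the downstream sink
database report duplicate key errors frequently. Is there any way to avoid
this?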


CREATE TABLE `mysql_source` (
`id` STRING,
`acct_id` STRING,
`acct_name` STRING,
`acct_type` STRING,
`acct_bal` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'mysql',
'password' = 'mysql',
'database-name' = 'test',
'scan.startup.mode' = 'latest-offset',
'table-name' = 'test',
'server-time-zone' = 'Asia/Shanghai'
);

Re: recover from savepoint

2021-06-02 Thread Piotr Nowojski
Hi,

I think there is no generic way. If this error indeed happened after
starting a second job from the same savepoint, or something like that, the
user can change the sink's operator UID.
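
To see why a different operator UID helps, consider this simplified Python model of transactional id generation. It is a sketch under assumptions, not Flink's implementation: the thread only says that the generator is initialized with the taskName and the operator UID, so the id layout, the pool size and the function name below are hypothetical.

```python
def transactional_ids(task_name, operator_uid, subtask_index,
                      pool_size=5, next_free=0):
    """Return the set of transactional ids one sink subtask would use.

    Assumption: ids are "<taskName>-<operatorUid>-<n>", where n comes from a
    per-subtask range of pool_size consecutive numbers.
    """
    prefix = f"{task_name}-{operator_uid}"
    start = next_free + subtask_index * pool_size
    return {f"{prefix}-{start + i}" for i in range(pool_size)}


if __name__ == "__main__":
    # Two jobs resumed from the same savepoint share task name and UID.
    job_a = transactional_ids("Sink: Sink", "uid-1", subtask_index=0)
    job_b = transactional_ids("Sink: Sink", "uid-1", subtask_index=0)
    # A third job whose sink was given a new operator UID.
    job_c = transactional_ids("Sink: Sink", "uid-2", subtask_index=0)
    print(sorted(job_a & job_b))
    print(job_a.isdisjoint(job_c))
```

In this model the first two jobs compete for exactly the same ids, so the broker would fence one of them, while the job with the new UID draws from a separate id space.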

If this is an issue of intentional recovery from an earlier
checkpoint/savepoint, maybe `FlinkKafkaProducer#setLogFailuresOnly` will be
helpful.

Best, Piotrek

On Tue, Jun 1, 2021 at 15:16 Till Rohrmann wrote:

> The error message says that we are trying to reuse a transaction id that is
> currently being used or has expired.
>
> I am not 100% sure how this can happen. My suspicion is that you have
> resumed a job multiple times from the same savepoint. Have you checked that
> there is no other job which has been resumed from the same savepoint and
> which is currently running or has run and completed checkpoints?
>
> @pnowojski @Becket Qin how does the transaction id generation ensure that
> we don't have a clash of transaction ids if we resume the same job multiple
> times from the same savepoint? From the code, I do see that we have a
> TransactionalIdsGenerator which is initialized with the taskName and the
> operator UID.
>
> fyi: @Arvid Heise 
>
> Cheers,
> Till
>
>
> On Mon, May 31, 2021 at 11:10 AM 周瑞  wrote:
>
> > Hi,
> >   When "sink.semantic = exactly-once", the following exception is
> > thrown when recovering from a savepoint
> >
> >public static final String KAFKA_TABLE_FORMAT =
> > "CREATE TABLE "+TABLE_NAME+" (\n" +
> > "  "+COLUMN_NAME+" STRING\n" +
> > ") WITH (\n" +
> > "   'connector' = 'kafka',\n" +
> > "   'topic' = '%s',\n" +
> > "   'properties.bootstrap.servers' = '%s',\n" +
> > "   'sink.semantic' = 'exactly-once',\n" +
> > "   'properties.transaction.timeout.ms' =
> > '90',\n" +
> > "   'sink.partitioner' =
> > 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
> > "   'format' = 'dbz-json'\n" +
> > ")\n";
> >   [] - Source: TableSourceScan(table=[[default_catalog, default_database, debezium_source]], fields=[data]) -> Sink: Sink(table=[default_catalog.default_database.KafkaTable], fields=[data]) (1/1)#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to FAILED with failure cause: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
> > at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
> > at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
> > at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> > at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
> > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
> > at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
> > at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
> > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
> > at java.lang.Thread.run(Thread.java:748)
> >
>


Re: Parquet reading/writing

2021-06-02 Thread Fabian Paul
Hi Taras,

At first glance this looks like a bug to me. Can you try the latest 1.12
version (1.12.4)?
If the bug persists, could you share the full JobManager and TaskManager
logs so we can debug this problem further?

Best,
Fabian

> On 2. Jun 2021, at 13:22, Taras Moisiuk  wrote:
> 
> Update: 
> 
> The job works correctly if I add an additional identity mapping step: 
> 
> env.createInput(parquetInputFormat)
>.map(record => record)
>.sinkTo(FileSink.forBulkFormat...)
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: pyflink kafka connector error: ByteArrayDeserializer

2021-06-02 Thread Dian Fu
You need to use the fat jar: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.1/flink-sql-connector-kafka_2.11-1.13.1.jar
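
A quick way to see why one jar fails with the NoClassDefFoundError reported in this thread while another does not is to look inside the jar for the missing class. The sketch below uses only the Python standard library; the helper name find_class_entries is hypothetical, and it matches on the class file name suffix because a fat jar may bundle the Kafka client under a relocated package (an assumption, not something confirmed in this thread).

```python
import zipfile


def find_class_entries(jar_path, simple_class_name):
    """Return all entries in the jar that are the class file for simple_class_name.

    Matching is done on the path suffix so that relocated (shaded) packages
    are found as well as the original package.
    """
    suffix = "/" + simple_class_name + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return [name for name in jar.namelist() if name.endswith(suffix)]
```

Calling find_class_entries on the jar configured in pipeline.jars with "ByteArrayDeserializer" and getting an empty list back would suggest that the jar does not bundle the Kafka client classes.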


> On Jun 2, 2021, at 2:43 PM, qianhuan <819687...@qq.com> wrote:
> 
> Versions:
> python 3.8
> apache-flink   1.13.1
> apache-flink-libraries 1.13.1
> 
> Code:
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
> 
> def log_processing():
>env = StreamExecutionEnvironment.get_execution_environment()
>env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
>t_env = StreamTableEnvironment.create(stream_execution_environment=env,
> environment_settings=env_settings)
>t_env.get_config().get_configuration().\
>set_string("pipeline.jars",
> "file:///root/flink/jars/flink-connector-kafka_2.11-1.13.1.jar")
> 
>kafka_source_ddl = f"""
>CREATE TABLE kafka_source_table(
>a VARCHAR
>) WITH (
>  'connector.type' = 'kafka',
>  'connector.version' = 'universal',
>  'connector.topic' = 'test5',
>  'connector.properties.bootstrap.servers' = 'localhost:9092',
>  'connector.properties.zookeeper.connect' = 'localhost:2181',
>  'connector.startup-mode' = 'latest-offset',
>  'format.type' = 'json'
>)
>"""
> 
>kafka_sink_ddl = f"""
>CREATE TABLE kafka_sink_table(
>b VARCHAR
>) WITH (
>  'connector.type' = 'kafka',
>  'connector.version' = 'universal',
>  'connector.topic' = 'test6',
>  'connector.properties.bootstrap.servers' = 'localhost:9092',
>  'connector.properties.zookeeper.connect' = 'localhost:2181',
>  'format.type' = 'json'
>)
>"""
> 
>t_env.execute_sql(kafka_source_ddl)
>t_env.execute_sql(kafka_sink_ddl)
>print("all_tables", t_env.list_tables())
>t_env.sql_query("SELECT a FROM kafka_source_table") \
>.execute_insert("kafka_sink_table").wait()
> 
> if __name__ == '__main__':
>log_processing()
> 
> Error:
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o65.executeInsert.
> : java.lang.NoClassDefFoundError:
> org/apache/kafka/common/serialization/ByteArrayDeserializer
>   at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
>   at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:223)
>   at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:154)
>   at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:139)
>   at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:108)
>   at
> org.apache.flink.streaming.connectors.kafka.KafkaTableSource.createKafkaConsumer(KafkaTableSource.java:106)
>   at
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getKafkaConsumer(KafkaTableSourceBase.java:293)
>   at
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:194)
>   at
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacyTableSourceScan.translateToPlanInternal(CommonExecLegacyTableSourceScan.java:94)
>   at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>   at
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
>   at
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToTransformation(CommonExecLegacySink.java:172)
>   at
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToPlanInternal(CommonExecLegacySink.java:112)
>   at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>   at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
>   at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69)
>   at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at
> 

Re: avro-confluent supports authentication enabled schema registry

2021-06-02 Thread tao xiao
Hi Fabian,

Unfortunately this will not work in our environment, where we implement our
own 
io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider,
which performs the login and supplies the JWT in the HTTP Authorization header. The
only way it will work is to pass the schema registry
config BEARER_AUTH_CREDENTIALS_SOURCE [1] to the table format factory.

[1]
https://github.com/confluentinc/schema-registry/blob/master/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.java#L85

On Wed, Jun 2, 2021 at 5:27 PM Fabian Paul 
wrote:

> Hi Tao,
>
> Thanks for reaching out. Have you tried the following
>
>  'value.avro-confluent.schema-registry.url' = 
> 'https://{SR_API_KEY}:{SR_API_SECRET}@psrc-lzvd0.ap-southeast-2.aws.confluent.cloud',
>
>
>
> It may be possible to provide basic HTTP authentication by adding your
> username and password to the URL. There is already a similar ticket open,
> unfortunately without much progress. [1]
> Please let me know if this works for you; otherwise we can try to find a
> different solution.
>
> Best,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-22763
>
>
> On 2. Jun 2021, at 10:58, tao xiao  wrote:
>
> Hi team,
>
> Confluent schema registry supports HTTP basic authentication[1] but I
> don't find the corresponding configs in Flink documentation[2]. Is this
> achievable in Flink avro-confluent?
>
>
> [1]
> https://docs.confluent.io/platform/current/confluent-security-plugins/schema-registry/install.html#authentication-mechanisms
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/#format-options
> --
> Regards,
> Tao
>
>
>

-- 
Regards,
Tao


Re: DSL for Flink CEP

2021-06-02 Thread Fabian Paul
Hi Dipanjan,

I am afraid no such efforts are planned for the foreseeable future, but if you
have a nice addition in mind, you can 
start a discussion in the community about this feature.

Best,
Fabian
> On 2. Jun 2021, at 12:10, Dipanjan Mazumder  wrote:
> 
> Hi Fabian,
> 
>  Understood, but is there any plan to grow Flink CEP and build a 
> friendly DSL around it?
> 
> Regards
> Dipanjan
> 
> On Wednesday, June 2, 2021, 03:22:46 PM GMT+5:30, Fabian Paul 
>  wrote:
> 
> 
> Hi Dipanjan,
> 
> Unfortunately, I have no experience with Siddhi, and I am not aware of any 
> official joint efforts between Flink and Siddhi.
> I can imagine that not all Siddhi CEP expressions are compatible with Flink’s 
> CEP. At the moment there is also no active
> development for Flink’s CEP. 
> 
> To better understand the caveats of the third-party 
> solution, you would have to reach out to its 
> maintainers directly.
> 
> Best,
> Fabian
> 
> 
>> On 2. Jun 2021, at 08:37, Dipanjan Mazumder wrote:
>> 
>> Hi ,
>> 
>>I am currently using Siddhi CEP with Flink, but the flink-siddhi library 
>> has limited support for Flink versions, so I will either need to fix the 
>> library or stay tied to a fixed version of Flink in order to use it.
>> 
>>  I am looking at Flink CEP as an option, and also came across a Flink CEP 
>> DSL library (https://github.com/phil3k3/flink-cep-dsl), but I am not sure 
>> how well it is accepted by the Flink community and developers. Also, does 
>> Flink CEP support the Siddhi CEP constructs, and is it as rich in that respect?
>> 
>> Please let me know, so that I can make a careful decision.
>> 
>> Regards
>> Dipanjan
> 



Re: Parquet reading/writing

2021-06-02 Thread Taras Moisiuk
Update: 

The job works correctly if I add an additional identity mapping step: 

 env.createInput(parquetInputFormat)
.map(record => record)
.sinkTo(FileSink.forBulkFormat...)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Parquet reading/writing

2021-06-02 Thread Taras Moisiuk
Hi,

I'm trying to read a Parquet file with the Flink 1.12.0 Scala API and save it as
another Parquet file.

It currently works correctly with ParquetRowInputFormat:

val inputPath: String = ...
val messageType: MessageType = ...

val parquetInputFormat = new ParquetRowInputFormat(new Path(inputPath),
messageType)
parquetInputFormat.setNestedFileEnumeration(true)

env.readFile(parquetInputFormat, inputPath)
.map(row => { /* mapping row to MyPOJO */ })
.sinkTo(FileSink.forBulkFormat...)



But when I replace the inputFormat:

val pojoTypeInfo =
Types.POJO(classOf[MyPOJO]).asInstanceOf[PojoTypeInfo[MyPOJO]]
val parquetInputFormat = new ParquetPojoInputFormat(new Path(inputPath),
messageType, pojoTypeInfo)
parquetInputFormat.setNestedFileEnumeration(true)

 env.createInput(parquetInputFormat)
.sinkTo(FileSink.forBulkFormat...)



The job always fails with this exception:

java.nio.channels.ClosedChannelException
at
java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:156)
at
java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:331)
at
org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
at
org.apache.flink.formats.parquet.PositionOutputStreamAdapter.getPos(PositionOutputStreamAdapter.java:50)
at
org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:349)
at
org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:171)
at
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
at
org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:310)
at
org.apache.flink.formats.parquet.ParquetBulkWriter.finish(ParquetBulkWriter.java:62)
at
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:60)
at
org.apache.flink.connector.file.sink.writer.FileWriterBucket.closePartFile(FileWriterBucket.java:276)
at
org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:202)
at
org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:202)
at
org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
at
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
at
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)

This exception is always thrown after a warning:
warn [ContinuousFileReaderOperator] not processing any records while closed


I would have assumed that the problem is in my file sink, but the same file sink
works with ParquetRowInputFormat.
Did I miss something?

Thank you!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: DSL for Flink CEP

2021-06-02 Thread Dipanjan Mazumder
Hi Fabian,

Understood, but is there any plan to grow Flink CEP and build a
friendly DSL around it?

Regards
Dipanjan

On Wednesday, June 2, 2021, 03:22:46 PM GMT+5:30, Fabian Paul wrote:

Hi Dipanjan,

Unfortunately, I have no experience with Siddhi, and I am not aware of any
official joint efforts between Flink and Siddhi. I can imagine that not all
Siddhi CEP expressions are compatible with Flink’s CEP. At the moment there is
also no active development for Flink’s CEP.

To better understand the caveats of the third-party solution, you would have
to reach out to its maintainers directly.

Best,
Fabian


On 2. Jun 2021, at 08:37, Dipanjan Mazumder wrote:

Hi,

I am currently using Siddhi CEP with Flink, but the flink-siddhi library
has limited support for Flink versions, so I will either need to fix the library
or stay tied to a fixed version of Flink in order to use it.

I am looking at Flink CEP as an option, and also came across a Flink CEP DSL
library (https://github.com/phil3k3/flink-cep-dsl), but I am not sure how well
it is accepted by the Flink community and developers. Also, does Flink CEP
support the Siddhi CEP constructs, and is it as rich in that respect?

Please let me know, so that I can make a careful decision.

Regards
Dipanjan

  

flink 1.12 on yarn: WebUI does not show logs

2021-06-02 Thread smq
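Hi all,

While testing version 1.12.1 I ran into a problem: after submitting a job,
neither the jobmanager nor the taskmanager logs show up in the WebUI. Checking
with F12, the logs request returns 404 with the response
errors: this file does not exist in Jobmanager log dir

Does this mean no log was created during startup? I am not sure what is causing it.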

Re: DSL for Flink CEP

2021-06-02 Thread Fabian Paul
Hi Dipanjan,

Unfortunately, I have no experience with Siddhi, and I am not aware of any 
official joint efforts between Flink and Siddhi.
I can imagine that not all Siddhi CEP expressions are compatible with Flink’s 
CEP. At the moment there is also no active
development for Flink’s CEP. 

To better understand the caveats of the third-party 
solution, you would have to reach out to its 
maintainers directly.

Best,
Fabian


> On 2. Jun 2021, at 08:37, Dipanjan Mazumder  wrote:
> 
> Hi ,
> 
>I am currently using Siddhi CEP with Flink, but the flink-siddhi library 
> has limited support for Flink versions, so I will either need to fix the 
> library or stay tied to a fixed version of Flink in order to use it.
> 
>  I am looking at Flink CEP as an option, and also came across a Flink CEP 
> DSL library (https://github.com/phil3k3/flink-cep-dsl), but I am not sure 
> how well it is accepted by the Flink community and developers. Also, does 
> Flink CEP support the Siddhi CEP constructs, and is it as rich in that respect?
> 
> Please let me know, so that I can make a careful decision.
> 
> Regards
> Dipanjan



Re: Re: Flink SQL 1.11.3 question

2021-06-02 Thread yinghua...@163.com
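My case is a bit different. Originally a single record looked like this, with one record per offset:
 {"name":"test1"}
But after NiFi collects the data, it is written to Kafka in the format below, where one offset holds several records (and the actual number of records per offset is not fixed):
 {"name":"test1"}
 {"name":"test2"}
 {"name":"test3"}
...

Thanks for your reply. I will use it as a reference and see how to handle this. Many thanks!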
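
For the layout described in this message, where one Kafka value holds a variable number of newline-separated JSON objects, the splitting step could look roughly like the sketch below. It is plain Python rather than a Flink UDF, the function name split_records is hypothetical, and it assumes the objects are separated by line breaks and encoded as UTF-8.

```python
import json


def split_records(payload: bytes):
    """Split one message value holding newline-separated JSON objects into dicts."""
    records = []
    for line in payload.decode("utf-8").splitlines():
        line = line.strip()
        if line:  # skip blank lines between or after records
            records.append(json.loads(line))
    return records
```

Each returned dict corresponds to one logical record, regardless of how many were packed into the same offset.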




yinghua...@163.com
 
From: WeiXubin
Sent: 2021-06-02 17:44
To: user-zh
Subject: Re: Flink SQL 1.11.3 question
I am not sure whether the scenario below matches the one you describe. Suppose a single collected JSON message has the format {"name":"test"}, and several JSON messages merged into one have the format
[{"name":"test"},{"name":"test2"}]. My Flink job is written in Flink SQL,
and I handle this case by writing a UDF (TableFunction) that splits the data into multiple rows, which are then written to the
sink.
 
Row row = new Row(arity);
collect(row);
 
For usage details, see: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/
 
Best, Weixubin
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Question about Flink SQL dimension tables

2021-06-02 Thread WeiXubin
Hi, could you please describe the problem in more detail? Thanks.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL 1.11.3 question

2021-06-02 Thread WeiXubin
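I am not sure whether the scenario below matches the one you describe. Suppose a single collected JSON message has the format {"name":"test"}, and several JSON messages merged into one have the format
[{"name":"test"},{"name":"test2"}]. My Flink job is written in Flink SQL,
and I handle this case by writing a UDF (TableFunction) that splits the data into multiple rows, which are then written to the
sink.

Row row = new Row(arity);
collect(row);

For usage details, see: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/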
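
The row-splitting logic that such a table function would run can be sketched in plain Python as a generator, where each yield plays the role of collect(row). This is only an illustration of the idea, not the actual UDF: the name explode_json_array and the single-column row built from the name field are assumptions based on the example messages above.

```python
import json


def explode_json_array(message: str):
    """Yield one single-column row per JSON object found in the message.

    Accepts either a JSON array of objects or a single JSON object.
    """
    parsed = json.loads(message)
    items = parsed if isinstance(parsed, list) else [parsed]
    for item in items:
        # One output row per element, analogous to collect(row) in the UDF.
        yield (item.get("name"),)
```

Handling the single-object case as well means the same function can be applied whether or not upstream merging has happened.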

Best,Weixubin



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Error with a custom Flink connector

2021-06-02 Thread chenlei677
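Hi, do you have the complete error message? The usual logic is to find the file first and then match the properties. Please post the full log so we can take a look.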



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: avro-confluent supports authentication enabled schema registry

2021-06-02 Thread Fabian Paul
Hi Tao,

Thanks for reaching out. Have you tried the following

 'value.avro-confluent.schema-registry.url' = 
'https://{SR_API_KEY}:{SR_API_SECRET}@psrc-lzvd0.ap-southeast-2.aws.confluent.cloud',
 


It may be possible to provide basic HTTP authentication by adding your username 
and password to the URL. There is already a similar ticket open, unfortunately 
without much progress. [1]
Please let me know if this works for you; otherwise we can try to find a 
different solution.
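
If the credentials are embedded in the URL as suggested, any reserved characters in the key or secret need to be percent-encoded first, otherwise the URL may be parsed incorrectly. A minimal sketch using only the Python standard library follows; the function name and the example host are hypothetical, and whether the avro-confluent format actually honours credentials in the URL is exactly the open question in this thread.

```python
from urllib.parse import quote


def registry_url_with_basic_auth(host: str, api_key: str, api_secret: str) -> str:
    """Build an https URL with percent-encoded user info for basic authentication."""
    user = quote(api_key, safe="")       # encode every reserved character
    password = quote(api_secret, safe="")
    return f"https://{user}:{password}@{host}"
```

The resulting string is what would go into the 'value.avro-confluent.schema-registry.url' option.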

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-22763


> On 2. Jun 2021, at 10:58, tao xiao  wrote:
> 
> Hi team,
> 
> Confluent schema registry supports HTTP basic authentication[1] but I don't 
> find the corresponding configs in Flink documentation[2]. Is this achievable 
> in Flink avro-confluent?
> 
> 
> [1]https://docs.confluent.io/platform/current/confluent-security-plugins/schema-registry/install.html#authentication-mechanisms
>  
> 
> [2]https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/#format-options
>  
> --
>  
> Regards,
> Tao



Re: Error with a custom Flink connector

2021-06-02 Thread MOBIN


Sorry, the path in the project is actually correct; I mistyped it when writing the email. What else could be causing this? Thanks
src/main/resources/META-INF/services

MOBIN


On Jun 2, 2021, at 17:05, Leonard Xu wrote:
The path is wrong.

On Jun 2, 2021, at 17:02, MOBIN <18814118...@163.com> wrote:

META-INF.services/org.apache.flink.table.factories.Factory

=>   META-INF/services/org.apache.flink.table.factories.Factory

Best regards,
Leonard

Re: Error with a custom Flink connector

2021-06-02 Thread Leonard Xu
The path is wrong.

> On Jun 2, 2021, at 17:02, MOBIN <18814118...@163.com> wrote:
> 
> META-INF.services/org.apache.flink.table.factories.Factory

=>   META-INF/services/org.apache.flink.table.factories.Factory
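
The difference between the two paths above is easy to miss, because a single directory literally named META-INF.services looks much like the nested META-INF/services. A small check along these lines can tell the two apart. It is a sketch under assumptions: the function name and return values are hypothetical, and resources_dir stands for the project's resources root, such as src/main/resources.

```python
from pathlib import Path

FACTORY_FILE = "org.apache.flink.table.factories.Factory"


def check_factory_registration(resources_dir):
    """Report how the service file is laid out under the resources directory.

    Returns "ok" for META-INF/services/<file>, "flat-directory" when only a
    single directory named "META-INF.services" holds the file, else "missing".
    """
    root = Path(resources_dir)
    if (root / "META-INF" / "services" / FACTORY_FILE).is_file():
        return "ok"
    if (root / "META-INF.services" / FACTORY_FILE).is_file():
        return "flat-directory"
    return "missing"
```

A result of "flat-directory" would match the mistake pointed out above.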

Best regards,
Leonard

Error with a custom Flink connector

2021-06-02 Thread MOBIN


A question: with my custom connector, running the demo directly in IDEA reports the following error:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' 
in
the classpath.


The project's resource directory does correctly include META-INF.services/org.apache.flink.table.factories.Factory, but it seems to have no effect.
The table-common dependency has also been added.
Thanks

MOBIN



avro-confluent supports authentication enabled schema registry

2021-06-02 Thread tao xiao
Hi team,

Confluent schema registry supports HTTP basic authentication[1] but I don't
find the corresponding configs in Flink documentation[2]. Is this
achievable in Flink avro-confluent?


[1]
https://docs.confluent.io/platform/current/confluent-security-plugins/schema-registry/install.html#authentication-mechanisms
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/#format-options
-- 
Regards,
Tao


Re: pyflink kafka connector error: ByteArrayDeserializer

2021-06-02 Thread qianhuan
Could this be a connector version issue? It ran fine on 1.12.2 before. Could someone help take a look?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.12, yarn-application mode: logs not visible in the Flink web UI

2021-06-02 Thread smq
May I ask how you solved this problem?





-- Original Message --
From: r pp http://apache-flink.147419.n8.nabble.com/



-- 
Best,
 pp

Re: Flink SQL: the /checkpoint/shared/ folder keeps growing although the source data has not surged; how can it be controlled?

2021-06-02 Thread yujianbo
Hi,

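What I have confirmed:

Using loadCheckpointMeta I analyzed which sst files under /shared are referenced by _metadata, and found that the unreferenced sst files in /shared (that is, old files that were not deleted) make up only a small share, about 5%.

Configuration:
   idleStateRetention is indeed set to 3600 seconds, and 3 checkpoint directories are retained.
Current situation:
 Each checkpoint adds roughly 1-2G incrementally, so the size is not small. A checkpoint is taken every 5 minutes.
 The most recent single checkpoint has 23 sst files, with file sizes of about 65M.
 The /checkpoint/shared directory currently totals 49.4G, while a savepoint ends up at 6.3G.

So I would like to ask: is there a large amount of redundant data inside the sst files that are still referenced, and is there any parameter to tune this? Or is it that single-point deletion by the JM cannot keep up?
Can the JM be scaled out? With HA, isn't it still only one JM doing the work? (I don't fully understand this part.)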





Hi,

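First, to confirm: is your idleStateRetention 3600 seconds? Second, to see whether all the data is still in use, you can use
Checkpoints.loadCheckpointMeta [1] to load the _metadata
file in the checkpoint directories you retain, and compare it with the files currently in the checkpoint directory to see whether there are many old files that have not been deleted.

From your description and a snippet of SQL alone it is hard to tell.
Possible causes include:

  1.  A single checkpoint has too many files, and single-point deletion by the JM cannot keep up.
  2.  The overall checkpoint size is small and RocksDB compaction runs infrequently, so old files containing expired data are not deleted in time and remain in the checkpoint directory (retaining multiple checkpoints amplifies this problem).

So it is best to first analyze the meta files of the existing checkpoints and then compare them with the actual situation.

Also, Flink JIRA is not really a good place to ask questions directly, especially ones as unclear as this. It is better to discuss on the mailing list first and create a detailed ticket once the cause has been pinned down.

[1]
https://github.com/apache/flink/blob/b582991b8b2b8dadb89e71d5002c4a9cc2055e34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99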
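
Once the set of file names referenced by the retained _metadata files is known, the comparison against the shared directory is a simple set difference. The sketch below shows that step only, in plain Python. It assumes the referenced names have already been extracted elsewhere (for example on the JVM side via Checkpoints.loadCheckpointMeta) and that the shared directory is reachable as a local path; the function name is hypothetical.

```python
from pathlib import Path


def unreferenced_shared_files(shared_dir, referenced_names):
    """Return (orphans, share): files not referenced by any retained checkpoint
    as (name, size) pairs, and their fraction of the directory's total bytes."""
    referenced = set(referenced_names)
    orphans = []
    total_bytes = 0
    for path in sorted(Path(shared_dir).iterdir()):
        if not path.is_file():
            continue
        size = path.stat().st_size
        total_bytes += size
        if path.name not in referenced:
            orphans.append((path.name, size))
    orphan_bytes = sum(size for _, size in orphans)
    share = orphan_bytes / total_bytes if total_bytes else 0.0
    return orphans, share
```

A small share would point away from undeleted old files and towards redundant data inside files that are still referenced.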

Best regards,
Tang Yun

From: yujianbo <[hidden email]>
Sent: Tuesday, June 1, 2021 10:51
To: [hidden email] <[hidden email]>
Subject: Re: Flink SQL: the /checkpoint/shared/ folder keeps growing although the source data has not surged; how can it be controlled?

Is there no better way?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.12, yarn-application mode: logs not visible in the Flink web UI

2021-06-02 Thread smq
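Do you mean the configuration in log4j.properties? We configured the format of the generated log files there, added on the installation node, but that should not be what the web UI displays. The odd thing is that other programs in our team work fine, while some do not show logs in the WebUI. We are currently upgrading from 1.10 to 1.12, and this only appears on 1.12.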





-- Original Message --
From: r pp http://apache-flink.147419.n8.nabble.com/



-- 
Best,
 pp

pyflink kafka connector error: ByteArrayDeserializer

2021-06-02 Thread qianhuan
Versions:
python 3.8
apache-flink   1.13.1
apache-flink-libraries 1.13.1

Code:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

def log_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                          environment_settings=env_settings)
    t_env.get_config().get_configuration().\
        set_string("pipeline.jars",
                   "file:///root/flink/jars/flink-connector-kafka_2.11-1.13.1.jar")

    kafka_source_ddl = f"""
    CREATE TABLE kafka_source_table(
        a VARCHAR
    ) WITH (
      'connector.type' = 'kafka',
      'connector.version' = 'universal',
      'connector.topic' = 'test5',
      'connector.properties.bootstrap.servers' = 'localhost:9092',
      'connector.properties.zookeeper.connect' = 'localhost:2181',
      'connector.startup-mode' = 'latest-offset',
      'format.type' = 'json'
    )
    """

    kafka_sink_ddl = f"""
    CREATE TABLE kafka_sink_table(
        b VARCHAR
    ) WITH (
      'connector.type' = 'kafka',
      'connector.version' = 'universal',
      'connector.topic' = 'test6',
      'connector.properties.bootstrap.servers' = 'localhost:9092',
      'connector.properties.zookeeper.connect' = 'localhost:2181',
      'format.type' = 'json'
    )
    """

    t_env.execute_sql(kafka_source_ddl)
    t_env.execute_sql(kafka_sink_ddl)
    print("all_tables", t_env.list_tables())
    t_env.sql_query("SELECT a FROM kafka_source_table") \
        .execute_insert("kafka_sink_table").wait()

if __name__ == '__main__':
    log_processing()

Error:
py4j.protocol.Py4JJavaError: An error occurred while calling
o65.executeInsert.
: java.lang.NoClassDefFoundError:
org/apache/kafka/common/serialization/ByteArrayDeserializer
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:223)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:154)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:139)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:108)
at
org.apache.flink.streaming.connectors.kafka.KafkaTableSource.createKafkaConsumer(KafkaTableSource.java:106)
at
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getKafkaConsumer(KafkaTableSourceBase.java:293)
at
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:194)
at
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacyTableSourceScan.translateToPlanInternal(CommonExecLegacyTableSourceScan.java:94)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
at
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToTransformation(CommonExecLegacySink.java:172)
at
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToPlanInternal(CommonExecLegacySink.java:112)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
at

Re: flink 1.12, yarn-application mode: logs not visible in the Flink web UI

2021-06-02 Thread r pp
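Hi~ Did you change the name of the log file?

On Wed, Jun 2, 2021 at 12:24 PM, smq <374060...@qq.com> wrote:

> Did you solve this? I have run into the same problem.
>
>
>
>
>
> -- Original Message --
> From: todd
> Sent: April 14, 2021 19:11
> To: user-zh
> Subject: Re: flink 1.12, yarn-application mode: logs not visible in the Flink web UI
>
>
>
>  On YARN there are only the .out and .error logs, but the log pane of the Flink web UI cannot display the log contents.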
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best,
  pp


DSL for Flink CEP

2021-06-02 Thread Dipanjan Mazumder
Hi,

I am currently using Siddhi CEP with Flink, but the flink-siddhi library
has limited support for Flink versions, so I will either need to fix the library
or stay tied to a fixed version of Flink in order to use it.

I am looking at Flink CEP as an option, and also came across a Flink CEP DSL
library (https://github.com/phil3k3/flink-cep-dsl), but I am not sure how well
it is accepted by the Flink community and developers. Also, does Flink CEP
support the Siddhi CEP constructs, and is it as rich in that respect?

Please let me know, so that I can make a careful decision.

Regards
Dipanjan

In flink sql cli mode, checkpoint settings in flink-conf.yaml do not take effect

2021-06-02 Thread guozhi mang
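Hi all, on Flink 1.13 I configured the checkpoint and savepoint parameters in the
Flink configuration file, but the settings did not take effect. My configuration file and logs are below.

*Configuration file*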
#==
# Fault tolerance and checkpointing
#==

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# .
#

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
execution.checkpointing.interval: 1
state.backend: filesystem
state.checkpoints.dir: file:///opt/xxx/flink-1.13.0/savepoint/checkpoints
state.savepoints.dir: file:///opt/xxx/flink-1.13.0/savepoint/savepoints
# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
state.backend.incremental: false

# The failover strategy, i.e., how the job computation recovers from task failures.
# Only restart tasks that may have been affected by the task failure, which typically includes
# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.

jobmanager.execution.failover-strategy: region

#==

*Server log:*

flink-root-sql-client-xxx03.log


2021-06-02 11:29:57,566 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: execution.checkpointing.interval, 1
2021-06-02 11:29:57,566 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: state.backend, filesystem
2021-06-02 11:29:57,566 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: state.checkpoints.dir,
file:///opt/xxx/flink-1.13.0/savepoint/checkpoints
2021-06-02 11:29:57,566 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: state.savepoints.dir,
file:///opt/xxx/flink-1.13.0/savepoint/savepoints
2021-06-02 11:29:57,566 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: state.backend.incremental, false
2021-06-02 11:29:57,566 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.execution.failover-strategy, region
2021-06-02 11:29:57,567 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: rest.bind-port, 8086
2021-06-02 11:29:57,596 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli
   [] - Found Yarn properties file under
/tmp/.yarn-properties-root.
2021-06-02 11:29:57,998 INFO
 org.apache.flink.table.client.config.entries.ExecutionEntry  [] - Property
'execution.restart-strategy.type' not specified. Using default value:
fallback
2021-06-02 11:29:58,028 INFO
 org.apache.flink.table.client.gateway.context.DefaultContext [] - Executor
config: {execution.savepoint.ignore-unclaimed-state=false,
execution.attached=true, yarn.application.id=xxx,
execution.shutdown-on-attached-exit=false,
pipeline.jars=[file:/opt/xxx/flink-1.13.0/opt/flink-sql-client_2.11-1.13.0.jar],
high-availability.cluster-id=application_1620482572059_3697,
pipeline.classpaths=[], execution.target=yarn-session,
$internal.deployment.config-dir=/opt/xxx/flink-1.13.0/conf}
2021-06-02 11:30:02,947 WARN
 org.apache.flink.yarn.configuration.YarnLogConfigUtil[] - The
configuration directory ('/opt/xxx/flink-1.13.0/conf') already contains a
LOG4J config file.If you want to use logback, then please delete or rename
the log configuration file.
2021-06-02 11:30:02,986 WARN  org.apache.flink.runtime.util.HadoopUtils
   [] - Could not find Hadoop configuration via any of the
supported methods (Flink configuration, environment variables).
2021-06-02 11:30:03,391 INFO  org.apache.flink.yarn.YarnClusterDescriptor
   [] - No path for the flink jar passed. Using the location of
class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-06-02 11:30:03,393 WARN  org.apache.flink.yarn.YarnClusterDescriptor
   [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR
environment variable is set.The Flink YARN Client needs one of these to be
set to properly load the Hadoop configuration for accessing YARN.
2021-06-02 11:30:03,445 INFO
 org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider [] -
Failing over to rm236
2021-06-02 11:30:03,502 INFO  org.apache.flink.yarn.YarnClusterDescriptor
   [] - Found Web Interface xxx:54194 of application 'xxx


*Logs observed through the web UI:*

2021-05-28 15:32:30,673 INFO