Re: is there any detrimental side-effect if i set the max parallelism as 32768

2023-03-13 Thread Tony Wei
Hi Hangxiang, David,

Thank you for your replies. Your responses are very helpful.

Best regards,
Tony Wei

On Tue, Mar 14, 2023 at 12:12 PM, David Anderson wrote:

> I believe there is some noticeable overhead if you are using the
> heap-based state backend, but with RocksDB I think the difference is
> negligible.
>
> David
>
> On Tue, Mar 7, 2023 at 11:10 PM Hangxiang Yu  wrote:
> >
> > Hi, Tony.
> > "be detrimental to performance" means that some extra space overhead of
> the field of the key-group may influence performance.
> > As we know, Flink will write the key group as the prefix of the key to
> speed up rescaling.
> > So the format will be like: key group | key len | key | ..
> > You could check the relationship between max parallelism and bytes of
> key group as below:
> > --
> > max parallelism   bytes of key group
> >     128                 1
> >   32768                 2
> > --
> > So I think the cost will be very small if the real key length >> 2 bytes.
> >
> > On Wed, Mar 8, 2023 at 1:06 PM Tony Wei  wrote:
> >>
> >> Hi experts,
> >>
> >>> Setting the maximum parallelism to a very large value can be
> detrimental to performance because some state backends have to keep
> internal data structures that scale with the number of key-groups (which
> are the internal implementation mechanism for rescalable state).
> >>>
> >>> Changing the maximum parallelism explicitly when recovery from
> original job will lead to state incompatibility.
> >>
> >>
> >> I read the section above in the official Flink documentation [1], and I'm
> wondering about the details of this side effect.
> >>
> >> Suppose I have a Flink SQL job with large state and high parallelism,
> using RocksDB as my state backend.
> >> I would like to set the max parallelism to 32768 so that I don't have to
> worry about whether the max parallelism is divisible by the parallelism
> whenever I want to scale my job,
> >> because the number of key groups will then not differ much between
> subtasks.
> >>
> >> I'm wondering whether this is good practice, because the official
> documentation actually recommends against it.
> >> If possible, I would like to know the details of this side effect:
> which state backends have this issue, and why?
> >> Please give me some advice. Thanks in advance.
> >>
> >> Best regards,
> >> Tony Wei
> >>
> >> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#setting-the-maximum-parallelism
> >
> >
> >
> > --
> > Best,
> > Hangxiang.
>


Re: is there any detrimental side-effect if i set the max parallelism as 32768

2023-03-13 Thread David Anderson
I believe there is some noticeable overhead if you are using the
heap-based state backend, but with RocksDB I think the difference is
negligible.

David

On Tue, Mar 7, 2023 at 11:10 PM Hangxiang Yu  wrote:
>
> Hi, Tony.
> "be detrimental to performance" means that some extra space overhead of the 
> field of the key-group may influence performance.
> As we know, Flink will write the key group as the prefix of the key to speed 
> up rescaling.
> So the format will be like: key group | key len | key | ..
> You could check the relationship between max parallelism and bytes of key 
> group as below:
> --
> max parallelism   bytes of key group
>     128                 1
>   32768                 2
> --
> So I think the cost will be very small if the real key length >> 2 bytes.
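(To make the table above concrete, here is a minimal Java sketch of the prefix-size rule, simplified for illustration rather than copied from Flink's internals: one byte is enough while the number of key groups fits into 128, and two bytes cover everything up to the 32768 cap.)

    public class KeyGroupPrefixSize {
        // Simplified rule: Flink serializes the key group as a fixed-size
        // prefix of every state key; 1 byte suffices for up to 128 key
        // groups, 2 bytes for anything larger (max parallelism caps at 32768).
        static int requiredPrefixBytes(int maxParallelism) {
            return maxParallelism > 128 ? 2 : 1;
        }

        public static void main(String[] args) {
            for (int mp : new int[] {128, 4096, 32768}) {
                System.out.printf("max parallelism %5d -> %d byte(s) per key%n",
                        mp, requiredPrefixBytes(mp));
            }
        }
    }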
>
> On Wed, Mar 8, 2023 at 1:06 PM Tony Wei  wrote:
>>
>> Hi experts,
>>
>>> Setting the maximum parallelism to a very large value can be detrimental to 
>>> performance because some state backends have to keep internal data 
>>> structures that scale with the number of key-groups (which are the internal 
>>> implementation mechanism for rescalable state).
>>>
>>> Changing the maximum parallelism explicitly when recovery from original job 
>>> will lead to state incompatibility.
>>
>>
>> I read the section above in the official Flink documentation [1], and I'm
>> wondering about the details of this side effect.
>>
>> Suppose I have a Flink SQL job with large state and high parallelism, using
>> RocksDB as my state backend.
>> I would like to set the max parallelism to 32768 so that I don't have to
>> worry about whether the max parallelism is divisible by the parallelism
>> whenever I want to scale my job,
>> because the number of key groups will then not differ much between
>> subtasks.
>>
>> I'm wondering whether this is good practice, because the official
>> documentation actually recommends against it.
>> If possible, I would like to know the details of this side effect: which
>> state backends have this issue, and why?
>> Please give me some advice. Thanks in advance.
>>
>> Best regards,
>> Tony Wei
>>
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#setting-the-maximum-parallelism
>
>
>
> --
> Best,
> Hangxiang.


Re: Watermarks lagging behind events that generate them

2023-03-13 Thread David Anderson
Watermarks always follow the corresponding event(s). I'm not sure why
they were designed that way, but that is how they are implemented.
Windows maintain this contract by emitting all of their results before
forwarding the watermark that triggered the results.

David
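For reference, a minimal sketch of the per-event generator described in the question (MyEvent and its getTimestamp() are hypothetical stand-ins for the actual record type):

    import org.apache.flink.api.common.eventtime.Watermark;
    import org.apache.flink.api.common.eventtime.WatermarkGenerator;
    import org.apache.flink.api.common.eventtime.WatermarkOutput;

    public class PerEventWatermarks
            implements WatermarkGenerator<PerEventWatermarks.MyEvent> {

        // Hypothetical record type with an event-time field.
        public interface MyEvent {
            long getTimestamp();
        }

        @Override
        public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
            // One watermark per event; note that it is still forwarded
            // *after* the event that produced it.
            output.emitWatermark(new Watermark(event.getTimestamp()));
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // Intentionally empty: no periodic watermarks.
        }
    }

Because the watermark trails the event that produced it, a downstream function should tolerate currentWatermark() == Long.MIN_VALUE for the first record(s) it sees.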

On Mon, Mar 13, 2023 at 5:28 PM Shammon FY  wrote:
>
> Hi Alexis
>
> Do you use both an event-time watermark generator and the TimerService for
> processing time in your job? Maybe you can try using event-time watermarks
> first.
>
> Best,
> Shammon.FY
>
> On Sat, Mar 11, 2023 at 7:47 AM Alexis Sarda-Espinosa 
>  wrote:
>>
>> Hello,
>>
>> I recently ran into a weird issue with a streaming job in Flink 1.16.1. One 
>> of my functions (KeyedProcessFunction) has been using processing time 
>> timers. I now want to execute the same job based on a historical data dump, 
>> so I had to adjust the logic to use event time timers in that case (and did 
>> not use BATCH execution mode). Since my data has a timestamp field, I 
>> implemented a custom WatermarkGenerator that always emits a watermark with 
>> that timestamp in the onEvent callback, and does nothing in the 
>> onPeriodicEmit callback.
>>
>> My problem is that, sometimes, the very first time my function calls 
>> TimerService.currentWatermark, the value is Long.MIN_VALUE, which causes 
>> some false triggers when the first watermark actually arrives.
>>
>> I would have expected that, if WatermarkGenerator.onEvent emits a watermark, 
>> it would be sent before the corresponding event, but maybe this is not 
>> always the case?
>>
>> In case it's relevant, a brief overview of my job's topology:
>>
>> Source1 -> Broadcast
>>
>> Source2 ->
>>   keyBy ->
>>   connect(Broadcast) ->
>>   process ->
>>   filter ->
>>   assignTimestampsAndWatermarks -> // newly added for historical data
>>   keyBy ->
>>   process // function that uses timers
>>
>> Regards,
>> Alexis.


Re: flink k8s deployment startup error

2023-03-13 Thread Weihua Hu
_DIRTY.json

Look for the files ending with this suffix. Their content should be JSON; if a file is not valid JSON, the data is already corrupted and you can try deleting it.



Best,
Weihua
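As a rough illustration of that check, a sketch that scans a local copy of the JobResultStore directory for empty _DIRTY.json entries (an empty file is exactly what produces the "No content to map due to end-of-input" failure quoted below). The path is hypothetical; adapt it to wherever your HA storage is mounted or copied:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.stream.Stream;

    public class FindCorruptJobResults {
        public static void main(String[] args) throws IOException {
            Path haDir = Paths.get(args.length > 0 ? args[0] : "/mnt/flink-ha/job-result-store");
            try (Stream<Path> files = Files.walk(haDir)) {
                files.filter(p -> p.toString().endsWith("_DIRTY.json"))
                     .forEach(p -> {
                         try {
                             long size = Files.size(p);
                             // A zero-length entry cannot be valid JSON and is a
                             // candidate for deletion.
                             System.out.printf("%s (%d bytes)%s%n", p, size,
                                     size == 0 ? "  <-- corrupt" : "");
                         } catch (IOException e) {
                             System.err.println("Could not stat " + p + ": " + e);
                         }
                     });
            }
        }
    }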


On Tue, Mar 14, 2023 at 11:23 AM Jason_H  wrote:

> Hello,
> I found my HA directory. Could you advise how to determine which data is dirty and safe to delete? Is there any way to tell? Everything I see looks like system data.
>
>
> Jason_H
> hyb_he...@163.com
>  Original message 
> | From | Weihua Hu |
> | Date | 2023-03-14 10:39 |
> | To |  |
> | Subject | Re: flink k8s deployment startup error |
> Hi,
>
> From the exception, the Flink cluster found DirtyResults data on the HA path at startup, but the data is incomplete and can no longer be read.
> You can follow the documentation [1] to inspect the HA path and clean up the corrupted data.
>
> One more question: was a Flink cluster started with the same cluster-id before?
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#job-result-store-storage-path
>
> Best,
> Weihua
>
>
> On Tue, Mar 14, 2023 at 9:58 AM Jason_H  wrote:
>
> Hi everyone,
> A question: the Flink cluster I deployed on k8s fails to start with the error below. Has anyone run into this?
> java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults
> of globally-terminated jobs from JobResultStore
> [full stack trace omitted; identical to the trace in the original message]
>
>
> Jason_H
> hyb_he...@163.com
>


Re: flink k8s deployment startup error

2023-03-13 Thread Jason_H
Hello,
I found my HA directory. Could you advise how to determine which data is dirty and safe to delete? Is there any way to tell? Everything I see looks like system data.


Jason_H
hyb_he...@163.com
 Original message 
| From | Weihua Hu |
| Date | 2023-03-14 10:39 |
| To |  |
| Subject | Re: flink k8s deployment startup error |
Hi,

From the exception, the Flink cluster found DirtyResults data on the HA path at startup, but the data is incomplete and can no longer be read.
You can follow the documentation [1] to inspect the HA path and clean up the corrupted data.

One more question: was a Flink cluster started with the same cluster-id before?

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#job-result-store-storage-path

Best,
Weihua


On Tue, Mar 14, 2023 at 9:58 AM Jason_H  wrote:

Hi everyone,
A question: the Flink cluster I deployed on k8s fails to start with the error below. Has anyone run into this?
java.util.concurrent.CompletionException:
org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults
of globally-terminated jobs from JobResultStore
[full stack trace omitted; identical to the trace in the original message]


Jason_H
hyb_he...@163.com


Re: flink k8s deployment startup error

2023-03-13 Thread Jason_H
Hello,
Yes, it used to start normally. It suddenly failed, and after I simply restarted the pod it has kept reporting this error.


Jason_H
hyb_he...@163.com
 Original message 
| From | Weihua Hu |
| Date | 2023-03-14 10:39 |
| To |  |
| Subject | Re: flink k8s deployment startup error |
Hi,

From the exception, the Flink cluster found DirtyResults data on the HA path at startup, but the data is incomplete and can no longer be read.
You can follow the documentation [1] to inspect the HA path and clean up the corrupted data.

One more question: was a Flink cluster started with the same cluster-id before?

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#job-result-store-storage-path

Best,
Weihua


On Tue, Mar 14, 2023 at 9:58 AM Jason_H  wrote:

Hi everyone,
A question: the Flink cluster I deployed on k8s fails to start with the error below. Has anyone run into this?
java.util.concurrent.CompletionException:
org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults
of globally-terminated jobs from JobResultStore
[full stack trace omitted; identical to the trace in the original message]


Jason_H
hyb_he...@163.com


Re: flink k8s deployment startup error

2023-03-13 Thread Weihua Hu
Hi,

From the exception, the Flink cluster found DirtyResults data on the HA path at startup, but the data is incomplete and can no longer be read.
You can follow the documentation [1] to inspect the HA path and clean up the corrupted data.

One more question: was a Flink cluster started with the same cluster-id before?

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#job-result-store-storage-path

Best,
Weihua


On Tue, Mar 14, 2023 at 9:58 AM Jason_H  wrote:

> Hi everyone,
> A question: the Flink cluster I deployed on k8s fails to start with the error below. Has anyone run into this?
> java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults
> of globally-terminated jobs from JobResultStore
> [full stack trace omitted; identical to the trace in the original message]
>
>
> Jason_H
> hyb_he...@163.com


Re: Re: Re: Re: Re: flink on yarn unexpected power failure question

2023-03-13 Thread guanyq
I simulated the power failure yesterday.
The timestamps of the 10 HA files are staggered, one every 5 seconds.
Not all of the chk-xxx checkpoints are corrupted either; some can be used to start the job, which I also tried. The current situation is:
after the YARN cluster restarts from the power failure, Flink first loops through the 10 HA files to start the application; the HA files record the metadata of the corresponding checkpoints.
1. If all HA files are corrupted, the Flink application cannot be brought up even if the checkpoints themselves are intact.

What I want now is to ensure that at least one usable set of HA files and its corresponding checkpoint exists on HDFS. Currently a checkpoint is taken every 5 seconds and 10 are retained, yet corruption can still prevent startup; 5 s × 10 =
50 s, so I would also like to know how long a retention window is needed to guarantee one uncorrupted HA file/checkpoint pair.
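For reference, the cadence described above (one checkpoint every 5 seconds, 10 retained) corresponds roughly to the sketch below. Note that state.checkpoints.num-retained is normally a cluster-wide setting in flink-conf.yaml; passing it per job here is only for illustration. Retaining more checkpoints, or checkpointing less often, widens the time window that an uncorrupted HA/checkpoint pair must survive:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointCadenceSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Keep the 10 most recent completed checkpoints.
            conf.setString("state.checkpoints.num-retained", "10");

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
            env.enableCheckpointing(5_000L); // 5 s interval * 10 retained = 50 s window

            env.fromElements(1, 2, 3).print(); // placeholder job graph
            env.execute("checkpoint-cadence-sketch");
        }
    }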














On 2023-03-14 at 10:16:48, "Guojun Li" wrote:
>Hi
>
>Could you confirm whether the last modification times of these HA files are identical or staggered?
>
>Also, have you tried recovering from a specific chk-? Does that recover successfully?
>
>Best,
>Guojun
>
>On Fri, Mar 10, 2023 at 11:56 AM guanyq  wrote:
>
>> The flink HA path is /tmp/flink/ha/
>> The flink checkpoint path is /tmp/flink/checkpoint
>>
>>
>> I'm not sure yet whether it is the HA files that are corrupted or all of the checkpoints; I need to simulate this to verify.
>>
>>
>>
>>
>> It does try to recover from the 10 checkpoints; the log prints:
>> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
>> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
>> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
>> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
>> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
>> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
>> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
>> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
>> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
>> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
>> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.
>>
>>
>>
>> The detailed log is below (I've omitted the repeated later parts; the only difference between attempts is the /tmp/flink/ha/application_1678102326043_0007/completedCheckpointxx file used to start):
>> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Starting the SlotManager.
>> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 0 ms.
>> 2023-03-07 18:37:43,660 INFO org.apache.flink.runtime.util.ZooKeeperUtils - Initialized ZooKeeperCompletedCheckpointStore in '/checkpoints/3844b96b002601d932e66233dd46899c'.
>> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: File State Backend (checkpoints: 'hdfs:/tmp/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1)
>> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Configuring application-defined state backend with job/cluster config
>> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
>> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
>> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
>> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
>> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
>> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
>> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
>> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
>> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
>> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
>> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.
>> 2023-03-07 18:37:43,979 WARN org.apache.hadoop.hdfs.BlockReaderFactory - I/O error constructing remote block reader.
>> java.io.IOException: Got error, status message opReadBlock BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278 received exception
>> 

Re: Re: Re: Re: flink on yarn unexpected power failure question

2023-03-13 Thread Guojun Li
Hi

Could you confirm whether the last modification times of these HA files are identical or staggered?

Also, have you tried recovering from a specific chk-? Does that recover successfully?

Best,
Guojun

On Fri, Mar 10, 2023 at 11:56 AM guanyq  wrote:

> The flink HA path is /tmp/flink/ha/
> The flink checkpoint path is /tmp/flink/checkpoint
>
>
> I'm not sure yet whether it is the HA files that are corrupted or all of the checkpoints; I need to simulate this to verify.
>
>
>
>
> It does try to recover from the 10 checkpoints; the log prints:
> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.
>
>
>
> The detailed log is below (I've omitted the repeated later parts; the only difference between attempts is the /tmp/flink/ha/application_1678102326043_0007/completedCheckpointxx file used to start):
> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Starting the SlotManager.
> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 0 ms.
> 2023-03-07 18:37:43,660 INFO org.apache.flink.runtime.util.ZooKeeperUtils - Initialized ZooKeeperCompletedCheckpointStore in '/checkpoints/3844b96b002601d932e66233dd46899c'.
> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: File State Backend (checkpoints: 'hdfs:/tmp/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1)
> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Configuring application-defined state backend with job/cluster config
> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.
> 2023-03-07 18:37:43,979 WARN org.apache.hadoop.hdfs.BlockReaderFactory - I/O error constructing remote block reader.
> java.io.IOException: Got error, status message opReadBlock BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278 received exception org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException: The meta file length 0 is less than the expected length 7, for OP_READ_BLOCK, self=/192.168.200.23:45534, remote=/192.168.200.21:9866, for file /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a, for pool BP-1003103929-192.168.200.11-1668473836936 block 1301252639_227512278
> at
> 

flink k8s deployment startup error

2023-03-13 Thread Jason_H
Hi everyone,
A question: the Flink cluster I deployed on k8s fails to start with the error below. Has anyone run into this?
java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of 
globally-terminated jobs from JobResultStore
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown 
Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown 
Source) [?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
[?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve 
JobResults of globally-terminated jobs from JobResultStore
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:192)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184)
 ~[flink-dist-1.15.2.jar:1.15.2]
... 4 more
Caused by: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
 No content to map due to end-of-input
 at [Source: (org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream); line: 1, 
column: 0]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4688)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:190)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:190)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184)
 ~[flink-dist-1.15.2.jar:1.15.2]
... 4 more


Jason_H
hyb_he...@163.com

Re: Are the Table API Connectors production ready?

2023-03-13 Thread yuxia
The plan shows that the filter has been pushed down. But remember: although it
is pushed down, the filesystem table won't actually apply the filter, so it
will still scan all the files.

Best regards, 
Yuxia 
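To make that concrete, a sketch of the setup in question. The table name comes from the thread; the path and format are assumptions. The LIKE predicate shows up in the optimized plan as pushed down, but the source still enumerates and reads every file; the rows are only discarded afterwards in the Calc operator:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class FilePathFilterSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Hypothetical filesystem table exposing the file.path metadata column.
            tEnv.executeSql(
                    "CREATE TABLE MyUserTable ("
                  + "  score INT,"
                  + "  `file.path` STRING NOT NULL METADATA"
                  + ") WITH ("
                  + "  'connector' = 'filesystem',"
                  + "  'path' = 'file:///data/scores',"
                  + "  'format' = 'csv')");

            // Runs fine, but every file under the path is still scanned.
            tEnv.executeSql(
                    "SELECT score, `file.path` FROM MyUserTable "
                  + "WHERE `file.path` LIKE '%prefix_%'")
                .print();
        }
    }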


发件人: "Maryam Moafimadani"  
收件人: "Hang Ruan"  
抄送: "yuxia" , "ravi suryavanshi" 
, "Yaroslav Tkachenko" , 
"Shammon FY" , "User"  
发送时间: 星期一, 2023年 3 月 13日 下午 10:07:57 
主题: Re: Are the Table API Connectors production ready? 

Hi All, 
It's exciting to see file filtering in the plan for development. I am curious 
whether the following query on a filesystem connector would actually push down 
the filter on metadata `file.path`? 

Select score, `file.path` from MyUserTable WHERE `file.path` LIKE '%prefix_%' 

== Optimized Execution Plan == 
Calc(select=[score, file.path], where=[LIKE(file.path, '%2022070611284%')]) 
+- TableSourceScan(table=[[default_catalog, default_database, MyUserTable, 
filter=[LIKE(file.path, _UTF-16LE'%2022070611284%')]]], fields=[score, 
file.path]) 

Thanks, 
Maryam 

On Mon, Mar 13, 2023 at 8:55 AM Hang Ruan <ruanhang1...@gmail.com> wrote:



Hi, yuxia, 
I would like to help to complete this task. 

Best, 
Hang 

On Mon, Mar 13, 2023 at 09:32, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:


Yeah, you're right. We don't provide filtering of files with patterns, and
actually we already have a JIRA [1] for it.
I intended to do this in the past but didn't have much time. Anyone who is
interested can take it over. We're
happy to help review.

[1] https://issues.apache.org/jira/browse/FLINK-17398

Best regards, 
Yuxia 


发件人: "User" < [ mailto:user@flink.apache.org | user@flink.apache.org ] > 
收件人: "Yaroslav Tkachenko" < [ mailto:yaros...@goldsky.com | 
yaros...@goldsky.com ] >, "Shammon FY" < [ mailto:zjur...@gmail.com | 
zjur...@gmail.com ] > 
抄送: "User" < [ mailto:user@flink.apache.org | user@flink.apache.org ] > 
发送时间: 星期一, 2023年 3 月 13日 上午 12:36:46 
主题: Re: Are the Table API Connectors production ready? 

Thanks a lot, Yaroslav and Shammon. 
I want to use the Filesystem Connector. I tried it; it works well while the
job is running, but if the job is restarted, it processes all the files again.

I could not find a move or delete option for files that have already been
collected. Also, I could not find filtering using patterns.

Pattern matching is required because different kinds of files exist in the same folder.

Regards, 
Ravi 
On Friday, 10 March, 2023 at 05:47:27 am IST, Shammon FY <zjur...@gmail.com> wrote:


Hi Ravi 

Agree with Yaroslav and if you find any problems in use, you can create an 
issue in jira [ 
https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK | 
https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK ] . I have 
used kafka/jdbc/hive in production too, they work well. 

Best, 
Shammon 

On Fri, Mar 10, 2023 at 1:42 AM Yaroslav Tkachenko <yaros...@goldsky.com> wrote:


Hi Ravi, 

All of them should be production ready. I've personally used half of them in 
production. 

Do you have any specific concerns? 

On Thu, Mar 9, 2023 at 9:39 AM ravi_suryavanshi.yahoo.com via user <user@flink.apache.org> wrote:


Hi, 
Can anyone help me here? 

Thanks and regards, 
Ravi 

On Monday, 27 February, 2023 at 09:33:18 am IST, ravi_suryavanshi.yahoo.com via user <user@flink.apache.org> wrote:


Hi Team, 


In Flink 1.16.0, we would like to use some of the Table API Connectors for 
production. Kindly let me know if the below connectors are production ready or 
only for testing purposes. 

Name | Version | Source | Sink
Filesystem (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/filesystem/) | | Bounded and Unbounded Scan, Lookup | Streaming Sink, Batch Sink
Elasticsearch (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/elasticsearch/) | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink
Opensearch (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/opensearch/) | 1.x & 2.x | Not supported | Streaming Sink, Batch Sink
Apache Kafka (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kafka/) | 0.10+ | Unbounded Scan | Streaming Sink, Batch Sink
Amazon DynamoDB (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/dynamodb/) | | Not supported | Streaming Sink, Batch Sink
Amazon Kinesis Data Streams (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kinesis/) | | Unbounded Scan | Streaming Sink

Re: Are the Table API Connectors production ready?

2023-03-13 Thread yuxia
Thanks Hang for taking it. Assigned to you~ 

Best regards, 
Yuxia 


发件人: "Hang Ruan"  
收件人: "yuxia"  
抄送: "ravi suryavanshi" , "Yaroslav Tkachenko" 
, "Shammon FY" , "User" 
 
发送时间: 星期一, 2023年 3 月 13日 下午 8:54:49 
主题: Re: Are the Table API Connectors production ready? 

Hi, yuxia, 
I would like to help to complete this task. 

Best, 
Hang 

On Mon, Mar 13, 2023 at 09:32, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:



Yeah, you're right. We don't provide filtering of files with patterns, and
actually we already have a JIRA [1] for it.
I intended to do this in the past but didn't have much time. Anyone who is
interested can take it over. We're
happy to help review.

[1] https://issues.apache.org/jira/browse/FLINK-17398

Best regards, 
Yuxia 


发件人: "User" < [ mailto:user@flink.apache.org | user@flink.apache.org ] > 
收件人: "Yaroslav Tkachenko" < [ mailto:yaros...@goldsky.com | 
yaros...@goldsky.com ] >, "Shammon FY" < [ mailto:zjur...@gmail.com | 
zjur...@gmail.com ] > 
抄送: "User" < [ mailto:user@flink.apache.org | user@flink.apache.org ] > 
发送时间: 星期一, 2023年 3 月 13日 上午 12:36:46 
主题: Re: Are the Table API Connectors production ready? 

Thanks a lot, Yaroslav and Shammon. 
I want to use the Filesystem Connector. I tried it; it works well while the
job is running, but if the job is restarted, it processes all the files again.

I could not find a move or delete option for files that have already been
collected. Also, I could not find filtering using patterns.

Pattern matching is required because different kinds of files exist in the same folder.

Regards, 
Ravi 
On Friday, 10 March, 2023 at 05:47:27 am IST, Shammon FY <zjur...@gmail.com> wrote:


Hi Ravi 

Agree with Yaroslav, and if you find any problems in use, you can create an
issue in JIRA: https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK
I have used kafka/jdbc/hive in production too; they work well.

Best, 
Shammon 

On Fri, Mar 10, 2023 at 1:42 AM Yaroslav Tkachenko <yaros...@goldsky.com> wrote:


Hi Ravi, 

All of them should be production ready. I've personally used half of them in 
production. 

Do you have any specific concerns? 

On Thu, Mar 9, 2023 at 9:39 AM ravi_suryavanshi.yahoo.com via user <user@flink.apache.org> wrote:


Hi, 
Can anyone help me here? 

Thanks and regards, 
Ravi 

On Monday, 27 February, 2023 at 09:33:18 am IST, ravi_suryavanshi.yahoo.com via user <user@flink.apache.org> wrote:


Hi Team, 


In Flink 1.16.0, we would like to use some of the Table API Connectors for 
production. Kindly let me know if the below connectors are production ready or 
only for testing purposes. 

Name | Version | Source | Sink
Filesystem (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/filesystem/) | | Bounded and Unbounded Scan, Lookup | Streaming Sink, Batch Sink
Elasticsearch (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/elasticsearch/) | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink
Opensearch (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/opensearch/) | 1.x & 2.x | Not supported | Streaming Sink, Batch Sink
Apache Kafka (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kafka/) | 0.10+ | Unbounded Scan | Streaming Sink, Batch Sink
Amazon DynamoDB (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/dynamodb/) | | Not supported | Streaming Sink, Batch Sink
Amazon Kinesis Data Streams (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kinesis/) | | Unbounded Scan | Streaming Sink
Amazon Kinesis Data Firehose (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/firehose/) | | Not supported | Streaming Sink
JDBC (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/jdbc/) | | Bounded Scan, Lookup | Streaming Sink, Batch Sink
Apache HBase (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/hbase/) | 1.4.x & 2.2.x | Bounded Scan, Lookup | Streaming Sink, Batch Sink
Apache Hive (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/hive/overview/)

Thanks and regards 










Re: Watermarks lagging behind events that generate them

2023-03-13 Thread Shammon FY
Hi Alexis

Do you use both an event-time watermark generator and the TimerService for
processing time in your job? Maybe you can try using event-time watermarks
first.

Best,
Shammon.FY

On Sat, Mar 11, 2023 at 7:47 AM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hello,
>
> I recently ran into a weird issue with a streaming job in Flink 1.16.1.
> One of my functions (KeyedProcessFunction) has been using processing time
> timers. I now want to execute the same job based on a historical data dump,
> so I had to adjust the logic to use event time timers in that case (and did
> *not* use BATCH execution mode). Since my data has a timestamp field, I
> implemented a custom WatermarkGenerator that always emits a watermark with
> that timestamp in the onEvent callback, and does nothing in the
> onPeriodicEmit callback.
>
> My problem is that, sometimes, the very first time my function calls
> TimerService.currentWatermark, the value is Long.MIN_VALUE, which causes
> some false triggers when the first watermark actually arrives.
>
> I would have expected that, if WatermarkGenerator.onEvent emits a
> watermark, it would be sent before the corresponding event, but maybe this
> is not always the case?
>
> In case it's relevant, a brief overview of my job's topology:
>
> Source1 -> Broadcast
>
> Source2 ->
>   keyBy ->
>   connect(Broadcast) ->
>   process ->
>   filter ->
>   assignTimestampsAndWatermarks -> // newly added for historical data
>   keyBy ->
>   process // function that uses timers
>
> Regards,
> Alexis.
>


Re: flink avro schema upgrade changes: how to transition the job smoothly

2023-03-13 Thread Shammon FY
Hi

From the error, this looks like it is caused by a mismatch between the schema and the data. It seems the plain avro format currently cannot handle old and new data together after this kind of schema change.

Best,
Shammon.FY
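One common way out is to give every newly added field a default value and decode with a format that can resolve the writer schema per record (for example the schema-registry-based 'avro-confluent' format), so records written with the old schema can still be read against the new one. A sketch of such an evolved schema, with field names from the thread and types assumed for illustration:

    import org.apache.avro.Schema;

    public class EvolvedSchemaSketch {
        public static void main(String[] args) {
            // New optional field "c" with a default, so old {a, b} records
            // can be resolved against this reader schema whenever the writer
            // schema is known (e.g. via a schema registry).
            Schema evolved = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"MyRecord\",\"fields\":["
                  + "{\"name\":\"a\",\"type\":\"string\"},"
                  + "{\"name\":\"b\",\"type\":\"string\"},"
                  + "{\"name\":\"c\",\"type\":[\"null\",\"string\"],\"default\":null}]}");
            System.out.println(evolved.toString(true));
        }
    }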


On Fri, Mar 10, 2023 at 2:29 PM Peihui He  wrote:

> java.io.IOException: Failed to deserialize Avro record.
> at
>
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
> at
>
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
> at
>
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> at
>
> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:130)
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> at
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> at
>
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> Caused by: java.io.EOFException
> at org.apache.flink.avro.shaded.org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw(BinaryDecoder.java:851)
> at org.apache.flink.avro.shaded.org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:373)
> at org.apache.flink.avro.shaded.org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:290)
> at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at
>
> org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
> at
>
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
> ... 9 more
>
>
> As above.
> For example,
> the previous schema was
> {
> a,
> b
> }
>
> and it was later changed to
> {
> a,
> b,
> c
> }
>
> After the job was upgraded, Kafka contained both old and new data at the same time, and the error above appeared.
>
> On Fri, Feb 24, 2023 at 18:56, Shammon FY wrote:
>
> > Hi
> >
> > Could you paste the error so we can see the exact cause?
> >
> > Best,
> > Shammon
> >
> > On Fri, Feb 24, 2023 at 6:10 PM Peihui He  wrote:
> >
> > > Hi, all
> > >
> > > Has anyone run into this situation: a Flink job consumes data from Kafka
> > > using Avro, and later a new field is added to the schema. During a canary rollout, old and new data get mixed together, and the Flink consumer job crashes.
> > >
> > > How do you usually handle this?
> > >
> > > Best Wishes.
> > >
> >
>


Re: [EXT] Re: Kubernetes operator set container resources and limits

2023-03-13 Thread Evgeniy Lyutikov
Thanks, this looks like what I need. With a memory limit-factor of 1.5, the 3Gi request yields the 4608Mi limit (3072Mi × 1.5) below:


kubernetes.jobmanager.memory.limit-factor: '1.5'

resources:
  limits:
    cpu: "1"
    memory: 4608Mi
  requests:
    cpu: "1"
    memory: 3Gi




From: Jasmin Redzepovic
Sent: March 13, 2023, 19:31:07
To: Andrew Otto; Evgeniy Lyutikov
Cc: user@flink.apache.org
Subject: Re: [EXT] Re: Kubernetes operator set container resources and limits

Hi,

You can set the following properties in the flinkConfiguration inside your 
.yaml file:

  - kubernetes.jobmanager.cpu.limit-factor
  - kubernetes.jobmanager.memory.limit-factor
  - kubernetes.taskmanager.cpu.limit-factor
  - kubernetes.taskmanager.memory.limit-factor
Example:

  jobManager:
    replicas: 1
    resource:
      memory: "1000m"
      cpu: 0.2
  taskManager:
    resource:
      memory: "1000m"
      cpu: 0.3
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    kubernetes.jobmanager.cpu.limit-factor: "2"
    kubernetes.jobmanager.memory.limit-factor: "1.2"
    kubernetes.taskmanager.cpu.limit-factor: "2"
    kubernetes.taskmanager.memory.limit-factor: "1.5"

Limits for jobmanager would then be:

  - cpu: 0.4 (0.2 × 2)
  - memory: 1200m (1000m × 1.2)

Hope this helps,
Jasmin


From: Andrew Otto 
Date: Monday, 13 March 2023 at 11:51
To: Evgeniy Lyutikov 
Cc: user@flink.apache.org 
Subject: [EXT] Re: Kubernetes operator set container resources and limits

Hi,

> return to the same values from jobManager.resource FlinkDeployment manifest 
> parameter
I believe this is the correct way; using jobManager.resources and
taskManager.resources in the FlinkDeployment.

Is there a reason you can't change the resources values there?  I don't think 
you should need to do so with podTemplate.



On Mon, Mar 13, 2023 at 9:56 AM Evgeniy Lyutikov <eblyuti...@avito.ru> wrote:
Hi all
Is there any way to specify different values for resources and limits for a 
jobmanager container?
The problem is that sometimes kubernetes kills the jobmanager container because 
it exceeds the memory consumption.


Last State: Terminated
  Reason:   OOMKilled
  Exit Code:137
  Started:  Tue, 07 Mar 2023 18:06:01 +0700
  Finished: Fri, 10 Mar 2023 23:20:54 +0700

What I tried to do:
1. I added the 'jobmanager.memory.process.size' parameter to flinkConfiguration
with a value less than the resources allocated to the container, but after
launch the value of this parameter is set to the amount of memory allocated to
the container.
2. I tried to set resources and limits through the jobmanager pod template, but
for the running container the values again revert to the values from the
jobManager.resource parameter of the FlinkDeployment manifest.

Kubernetes operator 1.2.0 and Flink 1.14.4





Re: Are the Table API Connectors production ready?

2023-03-13 Thread Maryam Moafimadani via user
Hi All,
It's exciting to see file filtering in the plan for development. I am
curious whether the following query on a filesystem connector would
actually push down the filter on metadata `file.path`?

Select score, `file.path` from MyUserTable WHERE `file.path` LIKE '%prefix_%'

== Optimized Execution Plan ==
Calc(select=[score, file.path], where=[LIKE(file.path, '%2022070611284%')])
+- TableSourceScan(table=[[default_catalog, default_database, MyUserTable,
filter=[LIKE(file.path, _UTF-16LE'%2022070611284%')]]], fields=[score,
file.path])

Thanks,
Maryam

On Mon, Mar 13, 2023 at 8:55 AM Hang Ruan  wrote:

> Hi, yuxia,
> I would like to help to complete this task.
>
> Best,
> Hang
>
> On Mon, Mar 13, 2023 at 09:32, yuxia wrote:
>
>> Yeah, you're right. We don't provide filtering of files with patterns, and
>> actually we already have a JIRA [1] for it.
>> I intended to do this in the past but didn't have much time. Anyone
>> who is interested can take it over. We're
>> happy to help review.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-17398
>>
>> Best regards,
>> Yuxia
>>
>> --
>> *From: *"User"
>> *To: *"Yaroslav Tkachenko", "Shammon FY" <zjur...@gmail.com>
>> *Cc: *"User"
>> *Sent: *Monday, March 13, 2023, 12:36:46 AM
>> *Subject: *Re: Are the Table API Connectors production ready?
>>
>> Thanks a lot, Yaroslav and Shammon.
>> I want to use the Filesystem Connector. I tried it; it works well while it
>> is running, but if the job is restarted, it processes all the files again.
>>
>> I could not find a move or delete option for files that have already been
>> collected. Also, I could not find filtering using patterns.
>>
>> Pattern matching is required because different kinds of files exist in the same folder.
>>
>> Regards,
>> Ravi
>> On Friday, 10 March, 2023 at 05:47:27 am IST, Shammon FY <
>> zjur...@gmail.com> wrote:
>>
>>
>> Hi Ravi
>>
>> Agree with Yaroslav and if you find any problems in use, you can create
>> an issue in jira
>> https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK . I
>> have used kafka/jdbc/hive in production too, they work well.
>>
>> Best,
>> Shammon
>>
>> On Fri, Mar 10, 2023 at 1:42 AM Yaroslav Tkachenko 
>> wrote:
>>
>> Hi Ravi,
>>
>> All of them should be production ready. I've personally used half of them
>> in production.
>>
>> Do you have any specific concerns?
>>
>> On Thu, Mar 9, 2023 at 9:39 AM ravi_suryavanshi.yahoo.com via user <
>> user@flink.apache.org> wrote:
>>
>> Hi,
>> Can anyone help me here?
>>
>> Thanks and regards,
>> Ravi
>>
>> On Monday, 27 February, 2023 at 09:33:18 am IST,
>> ravi_suryavanshi.yahoo.com via user  wrote:
>>
>>
>> Hi Team,
>>
>>
>> In Flink 1.16.0, we would like to use some of the Table API Connectors
>> for production. Kindly let me know if the below connectors are production
>> ready or only for testing purposes.
>>
>> Name                           Version         Source                               Sink
>> Filesystem                                     Bounded and Unbounded Scan, Lookup   Streaming Sink, Batch Sink
>> Elasticsearch                  6.x & 7.x       Not supported                        Streaming Sink, Batch Sink
>> Opensearch                     1.x & 2.x       Not supported                        Streaming Sink, Batch Sink
>> Apache Kafka                   0.10+           Unbounded Scan                       Streaming Sink, Batch Sink
>> Amazon DynamoDB                                Not supported                        Streaming Sink, Batch Sink
>> Amazon Kinesis Data Streams                    Unbounded Scan                       Streaming Sink
>> Amazon Kinesis Data Firehose                   Not supported                        Streaming Sink
>> JDBC                                           Bounded Scan, Lookup                 Streaming Sink, Batch Sink
>> Apache HBase                   1.4.x & 2.2.x   Bounded Scan, Lookup                 Streaming Sink, Batch Sink
>> Apache Hive
>>
>> Thanks and regards
>>
>>
>>

-- 
Maryam Moafimadani
Senior Data Developer @Shopify 


Re: Kubernetes operator set container resources and limits

2023-03-13 Thread Evgeniy Lyutikov
I want to set different values for resources and limits.
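
For context, a sketch of the kind of pod template this refers to
(flink-main-container is the container the operator manages; the values are
illustrative). As reported in this thread, operator 1.2.0 overwrites these
with requests = limits derived from spec.jobManager.resource, so this alone
does not achieve the split:

    spec:
      jobManager:
        podTemplate:
          spec:
            containers:
              - name: flink-main-container
                resources:
                  requests:
                    memory: "2Gi"
                    cpu: "1"
                  limits:
                    memory: "3Gi"
                    cpu: "2"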



From: Andrew Otto
Sent: March 13, 2023, 17:50
To: Evgeniy Lyutikov
Cc: user@flink.apache.org
Subject: Re: Kubernetes operator set container resources and limits

Hi,

> return to the same values from jobManager.resource FlinkDeployment manifest
> parameter
I believe this is the correct way: using jobManager.resource and
taskManager.resource in the FlinkDeployment.

Is there a reason you can't change the resource values there? I don't think
you should need to do so with podTemplate.



On Mon, Mar 13, 2023 at 9:56 AM Evgeniy Lyutikov  wrote:

Hi all
Is there any way to specify different values for resources and limits for a 
jobmanager container?
The problem is that sometimes kubernetes kills the jobmanager container because
it exceeds its memory limit.


Last State: Terminated
  Reason:   OOMKilled
  Exit Code:137
  Started:  Tue, 07 Mar 2023 18:06:01 +0700
  Finished: Fri, 10 Mar 2023 23:20:54 +0700


What I tried to do:
1. added the 'jobmanager.memory.process.size' parameter to flinkConfiguration 
with a value less than the allocated resources for the container, but after 
launch, the value of this parameter is set to the amount of memory allocated to 
the container.
2. I tried to set resources and limits through the jobmanager pod template, but
for the running container, the values revert to those from the
jobManager.resource FlinkDeployment manifest parameter.

Kubernetes operator 1.2.0 and Flink 1.14.4






Re: Are the Table API Connectors production ready?

2023-03-13 Thread Hang Ruan
Hi, yuxia,
I would like to help to complete this task.

Best,
Hang

yuxia  wrote on Monday, March 13, 2023 at 09:32:

> Yeah, you're right. We don't support filtering files with patterns, and
> actually we already have a jira [1] for it.
> I intended to do this in the past, but didn't have much time. Anyone
> who is interested can take it over. We're
> happy to help review.
>
> [1] https://issues.apache.org/jira/browse/FLINK-17398
>
> Best regards,
> Yuxia
>
> --
> *From: *"User" 
> *To: *"Yaroslav Tkachenko" , "Shammon FY" <
> zjur...@gmail.com>
> *Cc: *"User" 
> *Date: *Monday, March 13, 2023, 12:36:46 AM
> *Subject: *Re: Are the Table API Connectors production ready?
>
> Thanks a lot, Yaroslav and Shammon.
> I want to use the Filesystem connector. I tried it: it works well while the
> job is running, but if the job is restarted it processes all the files again.
>
> I could not find a move or delete option for files after they have been
> collected. Also, I could not find filtering using patterns.
>
> Pattern matching is required because different files exist in the same folder.
>
> Regards,
> Ravi
> On Friday, 10 March, 2023 at 05:47:27 am IST, Shammon FY <
> zjur...@gmail.com> wrote:
>
>
> Hi Ravi
>
> Agree with Yaroslav and if you find any problems in use, you can create an
> issue in jira
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK . I have
> used kafka/jdbc/hive in production too, they work well.
>
> Best,
> Shammon
>
> On Fri, Mar 10, 2023 at 1:42 AM Yaroslav Tkachenko 
> wrote:
>
> Hi Ravi,
>
> All of them should be production ready. I've personally used half of them
> in production.
>
> Do you have any specific concerns?
>
> On Thu, Mar 9, 2023 at 9:39 AM ravi_suryavanshi.yahoo.com via user <
> user@flink.apache.org> wrote:
>
> Hi,
> Can anyone help me here?
>
> Thanks and regards,
> Ravi
>
> On Monday, 27 February, 2023 at 09:33:18 am IST,
> ravi_suryavanshi.yahoo.com via user  wrote:
>
>
> Hi Team,
>
>
> In Flink 1.16.0, we would like to use some of the Table API Connectors for
> production. Kindly let me know if the below connectors are production ready
> or only for testing purposes.
>
> Name                           Version         Source                               Sink
> Filesystem                                     Bounded and Unbounded Scan, Lookup   Streaming Sink, Batch Sink
> Elasticsearch                  6.x & 7.x       Not supported                        Streaming Sink, Batch Sink
> Opensearch                     1.x & 2.x       Not supported                        Streaming Sink, Batch Sink
> Apache Kafka                   0.10+           Unbounded Scan                       Streaming Sink, Batch Sink
> Amazon DynamoDB                                Not supported                        Streaming Sink, Batch Sink
> Amazon Kinesis Data Streams                    Unbounded Scan                       Streaming Sink
> Amazon Kinesis Data Firehose                   Not supported                        Streaming Sink
> JDBC                                           Bounded Scan, Lookup                 Streaming Sink, Batch Sink
> Apache HBase                   1.4.x & 2.2.x   Bounded Scan, Lookup                 Streaming Sink, Batch Sink
> Apache Hive
>
> Thanks and regards
>
>
>


Re: Flink-Sql watermarks question

2023-03-13 Thread 吴先生
OK, thanks. I'll keep an eye on it.

吴先生
15951914...@163.com

---- Original message ----
From: Shammon FY
Date: March 13, 2023, 18:49
To:
Subject: Re: Flink-Sql watermarks question

Hi

At the moment SQL can only declare watermarks in CREATE TABLE, but an
extension is being discussed in a FLIP that you can follow:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-296%3A+Extend+watermark-related+features+for+SQL

Best,
Shammon.FY

On Mon, Mar 13, 2023 at 6:29 PM 吴先生 <15951914...@163.com> wrote:

hi,
With Flink SQL 1.14, can I avoid declaring the watermark in CREATE TABLE? The
source data needs some cleansing before the watermark can be assigned.

吴先生
15951914...@163.com


Re: Conversion between GenericRowData and BinaryRowData

2023-03-13 Thread Benchao Li
Hi zilong,

There should be no built-in method to convert directly; if you need it, you have to read the fields out and write them back yourself according to the schema.

Also, FLINK-24403 [1] improved printing for complex types: you can simply cast them to STRING and print that.

[1] https://issues.apache.org/jira/browse/FLINK-24403
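
A minimal sketch of the read-and-rewrite approach Benchao describes, using the
public RowData.FieldGetter API (the class and method names here are ours, not
Flink's):

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;

    public final class RowDataUtil {

        // Copies any RowData implementation (e.g. BinaryRowData) into a
        // GenericRowData, field by field, following the row schema.
        public static GenericRowData toGenericRowData(RowData row, RowType rowType) {
            GenericRowData copy =
                    new GenericRowData(row.getRowKind(), rowType.getFieldCount());
            for (int i = 0; i < rowType.getFieldCount(); i++) {
                // A FieldGetter knows how to read field i for its logical type.
                RowData.FieldGetter getter =
                        RowData.createFieldGetter(rowType.getTypeAt(i), i);
                copy.setField(i, getter.getFieldOrNull(row));
            }
            return copy;
        }
    }

GenericRowData.toString() then prints the row field by field; if this sits on
a hot path, create the FieldGetters once per schema rather than once per row.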

zilong xiao  wrote on Monday, March 13, 2023 at 16:22:

> hi, benchao, is there any way to convert a BinaryRowData into a GenericRowData? Our business scenario needs
> toString on RowData, and BinaryRowData does not implement that method.
>
> Benchao Li  wrote on Friday, April 9, 2021 at 10:42:
>
> > GenericRowData and BinaryRowData are both concrete implementations of the RowData interface,
> > so you only need to program against RowData and must not assume which concrete implementation it uses.
> >
> > As for your question, BinaryRowData is constructed in many places when data is computed and converted between operators;
> > the typical case is serialization, which always serializes rows as BinaryRowData.
> >
> > Luna Wong  wrote on Thursday, April 8, 2021 at 7:36 PM:
> >
> > > I see that the Kafka connector source code produces GenericRowData, but by the JDBC
> > > sink the type has become BinaryRowData. What rule does the runtime use to convert GenericRowData into BinaryRowData?
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: Conversion between GenericRowData and BinaryRowData

2023-03-13 Thread Shammon FY
Hi

You could try reading the field data out of the BinaryRowData and then converting it to a string.

Best,
Shammon.FY

On Mon, Mar 13, 2023 at 4:21 PM zilong xiao  wrote:

> hi, benchao, is there any way to convert a BinaryRowData into a GenericRowData? Our business scenario needs
> toString on RowData, and BinaryRowData does not implement that method.
>
> Benchao Li  wrote on Friday, April 9, 2021 at 10:42:
>
> > GenericRowData and BinaryRowData are both concrete implementations of the RowData interface,
> > so you only need to program against RowData and must not assume which concrete implementation it uses.
> >
> > As for your question, BinaryRowData is constructed in many places when data is computed and converted between operators;
> > the typical case is serialization, which always serializes rows as BinaryRowData.
> >
> > Luna Wong  wrote on Thursday, April 8, 2021 at 7:36 PM:
> >
> > > I see that the Kafka connector source code produces GenericRowData, but by the JDBC
> > > sink the type has become BinaryRowData. What rule does the runtime use to convert GenericRowData into BinaryRowData?
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


Re: Kubernetes operator set container resources and limits

2023-03-13 Thread Andrew Otto
Hi,

> return to the same values from jobManager.resource
> FlinkDeployment manifest parameter
I believe this is the correct way: using jobManager.resource
and taskManager.resource in the FlinkDeployment.

Is there a reason you can't change the resource values there? I don't
think you should need to do so with podTemplate.



On Mon, Mar 13, 2023 at 9:56 AM Evgeniy Lyutikov 
wrote:

> Hi all
> Is there any way to specify different values for resources and limits for
> a jobmanager container?
> The problem is that sometimes kubernetes kills the jobmanager container
> because it exceeds its memory limit.
>
>
> Last State: Terminated
>   Reason:   OOMKilled
>   Exit Code:137
>   Started:  Tue, 07 Mar 2023 18:06:01 +0700
>   Finished: Fri, 10 Mar 2023 23:20:54 +0700
>
> What I tried to do:
> 1. added the 'jobmanager.memory.process.size' parameter to
> flinkConfiguration with a value less than the allocated resources for the
> container, but after launch, the value of this parameter is set to the
> amount of memory allocated to the container.
> 2. I tried to set resources and limits through the jobmanager pod template,
> but for the running container, the values revert to those from the
> jobManager.resource FlinkDeployment manifest parameter.
>
> Kubernetes operator 1.2.0 and Flink 1.14.4
>
>
>
>


Re: Flink-Sql watermarks question

2023-03-13 Thread Shammon FY
Hi

At the moment SQL can only declare watermarks in CREATE TABLE, but an extension is being discussed in a FLIP that you can follow:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-296%3A+Extend+watermark-related+features+for+SQL

Best,
Shammon.FY
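
Until that FLIP lands, one workaround is to cleanse in SQL first, then
round-trip through the DataStream API and declare the watermark on the way
back. The sketch below is an illustration built on assumptions (the table
raw_events and the columns ts and user_id are invented), not code from the
thread:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class CleanseThenWatermark {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Cleanse first; no watermark is declared on the source table.
            Table cleaned = tEnv.sqlQuery(
                    "SELECT ts, user_id FROM raw_events WHERE user_id IS NOT NULL");

            // Convert to a DataStream and declare the watermark when
            // converting back to a table.
            DataStream<Row> stream = tEnv.toDataStream(cleaned);
            Table withWatermark = tEnv.fromDataStream(
                    stream,
                    Schema.newBuilder()
                            .column("ts", "TIMESTAMP(3)")
                            .column("user_id", "STRING")
                            .watermark("ts", "ts - INTERVAL '5' SECOND")
                            .build());
            tEnv.createTemporaryView("cleaned_events", withWatermark);
        }
    }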

On Mon, Mar 13, 2023 at 6:29 PM 吴先生 <15951914...@163.com> wrote:

> hi,
> With Flink SQL 1.14, can I avoid declaring the watermark in CREATE TABLE? The source data needs some cleansing before the watermark can be assigned.
>
>
> 吴先生
> 15951914...@163.com


Flink-Sql watermarks question

2023-03-13 Thread 吴先生
hi,
With Flink SQL 1.14, can I avoid declaring the watermark in CREATE TABLE? The source data needs some cleansing before the watermark can be assigned.


吴先生
15951914...@163.com

Kubernetes operator set container resources and limits

2023-03-13 Thread Evgeniy Lyutikov
Hi all
Is there any way to specify different values for resources and limits for a 
jobmanager container?
The problem is that sometimes kubernetes kills the jobmanager container because
it exceeds its memory limit.


Last State: Terminated
  Reason:   OOMKilled
  Exit Code:137
  Started:  Tue, 07 Mar 2023 18:06:01 +0700
  Finished: Fri, 10 Mar 2023 23:20:54 +0700


What I tried to do:
1. added the 'jobmanager.memory.process.size' parameter to flinkConfiguration 
with a value less than the allocated resources for the container, but after 
launch, the value of this parameter is set to the amount of memory allocated to 
the container.
2. I tried to set resources and limits through the jobmanager pod template, but
for the running container, the values revert to those from the
jobManager.resource FlinkDeployment manifest parameter.

Kubernetes operator 1.2.0 and Flink 1.14.4
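
One mitigation that often helps with OOMKilled JobManagers (a sketch, not a
verified fix for this particular job) is to reserve more headroom for
non-Flink memory inside the container by raising the JVM overhead budget in
flinkConfiguration; the values below are illustrative:

    spec:
      flinkConfiguration:
        jobmanager.memory.jvm-overhead.fraction: "0.2"
        jobmanager.memory.jvm-overhead.max: "1g"
        jobmanager.memory.jvm-metaspace.size: "512m"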




"This message contains confidential information/commercial secret. If you are 
not the intended addressee of this message you may not copy, save, print or 
forward it to any third party and you are kindly requested to destroy this 
message and notify the sender thereof by email.
Данное сообщение содержит конфиденциальную информацию/информацию, являющуюся 
коммерческой тайной. Если Вы не являетесь надлежащим адресатом данного 
сообщения, Вы не вправе копировать, сохранять, печатать или пересылать его 
каким либо иным лицам. Просьба уничтожить данное сообщение и уведомить об этом 
отправителя электронным письмом."


Re: Re: Re: flink on yarn: question about YARN retrying to restart a Flink job

2023-03-13 Thread guanyq



Understood, thanks a lot.

On 2023-03-13 16:57:18, "Weihua Hu"  wrote:
>The image doesn't come through; you could upload it to an image host and paste the link on the mailing list.
>
>Restarting the AM on YARN is also governed by "yarn.application-attempt-failures-validity-interval" [1]:
>the application only exits once the configured number of failures is reached within that interval.
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#yarn-application-attempt-failures-validity-interval
>
>Best,
>Weihua
>
>
>On Mon, Mar 13, 2023 at 4:27 PM guanyq  wrote:
>
>> The image is in the attachment.
>> But in practice it exceeded 10 attempts.
>>
>>
>>
>>
>>
>>
>> On 2023-03-13 15:39:39, "Weihua Hu"  wrote:
>> >Hi,
>> >
>> >The image is not visible.
>> >
>> >With this configuration, YARN should only launch the JobManager 10 times.
>> >
>> >Best,
>> >Weihua
>> >
>> >
>> >On Mon, Mar 13, 2023 at 3:32 PM guanyq  wrote:
>> >
>> >> Flink 1.10, configured as follows:
>> >> yarn.application-attempts = 10  (YARN attempts to start the Flink job 10 times)
>> >> My understanding is that YARN will try 10 times to start the Flink job and fail if it cannot come up,
>> >> but on the YARN application page I saw 11 attempts, as in the image below.
>> >>
>> >> For appattempt_1678102326043_0006_000409, doesn't each sequence number represent one attempt?
>> >>
>>
>>


Re: Re: flink on yarn: question about YARN retrying to restart a Flink job

2023-03-13 Thread Weihua Hu
The image doesn't come through; you could upload it to an image host and paste
the link on the mailing list.

Restarting the AM on YARN is also governed by
"yarn.application-attempt-failures-validity-interval" [1]: the application only
exits once the configured number of failures is reached within that interval.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#yarn-application-attempt-failures-validity-interval

Best,
Weihua
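
Concretely, the two settings interact like this in flink-conf.yaml (the
interval shown is Flink's default of 10000 ms; values are illustrative):

    # YARN may restart the AM more than 10 times in total, because the
    # failure counter resets once failures fall outside the validity interval.
    yarn.application-attempts: 10
    yarn.application-attempt-failures-validity-interval: 10000   # milliseconds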


On Mon, Mar 13, 2023 at 4:27 PM guanyq  wrote:

> The image is in the attachment.
> But in practice it exceeded 10 attempts.
>
>
>
>
>
>
> On 2023-03-13 15:39:39, "Weihua Hu"  wrote:
> >Hi,
> >
> >The image is not visible.
> >
> >With this configuration, YARN should only launch the JobManager 10 times.
> >
> >Best,
> >Weihua
> >
> >
> >On Mon, Mar 13, 2023 at 3:32 PM guanyq  wrote:
> >
> >> Flink 1.10, configured as follows:
> >> yarn.application-attempts = 10  (YARN attempts to start the Flink job 10 times)
> >> My understanding is that YARN will try 10 times to start the Flink job and fail if it cannot come up,
> >> but on the YARN application page I saw 11 attempts, as in the image below.
> >>
> >> For appattempt_1678102326043_0006_000409, doesn't each sequence number represent one attempt?
> >>
>
>


Re: Re: flink on yarn: question about YARN retrying to restart a Flink job

2023-03-13 Thread guanyq
The image is in the attachment.
But in practice it exceeded 10 attempts.

On 2023-03-13 15:39:39, "Weihua Hu"  wrote:
>Hi,
>
>The image is not visible.
>
>With this configuration, YARN should only launch the JobManager 10 times.
>
>Best,
>Weihua
>
>
>On Mon, Mar 13, 2023 at 3:32 PM guanyq  wrote:
>
>> Flink 1.10, configured as follows:
>> yarn.application-attempts = 10  (YARN attempts to start the Flink job 10 times)
>> My understanding is that YARN will try 10 times to start the Flink job and fail if it cannot come up,
>> but on the YARN application page I saw 11 attempts, as in the image below.
>>
>> For appattempt_1678102326043_0006_000409, doesn't each sequence number represent one attempt?
>>


Re: Conversion between GenericRowData and BinaryRowData

2023-03-13 Thread zilong xiao
hi, benchao, is there any way to convert a BinaryRowData into a GenericRowData? Our business scenario needs
toString on RowData, and BinaryRowData does not implement that method.

Benchao Li  wrote on Friday, April 9, 2021 at 10:42:

> GenericRowData and BinaryRowData are both concrete implementations of the RowData interface,
> so you only need to program against RowData and must not assume which concrete implementation it uses.
>
> As for your question, BinaryRowData is constructed in many places when data is computed and converted between operators;
> the typical case is serialization, which always serializes rows as BinaryRowData.
>
> Luna Wong  wrote on Thursday, April 8, 2021 at 7:36 PM:
>
> > I see that the Kafka connector source code produces GenericRowData, but by the JDBC
> > sink the type has become BinaryRowData. What rule does the runtime use to convert GenericRowData into BinaryRowData?
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: flink on yarn: question about YARN retrying to restart a Flink job

2023-03-13 Thread Weihua Hu
Hi,

The image is not visible.

With this configuration, YARN should only launch the JobManager 10 times.

Best,
Weihua


On Mon, Mar 13, 2023 at 3:32 PM guanyq  wrote:

> Flink 1.10, configured as follows:
> yarn.application-attempts = 10  (YARN attempts to start the Flink job 10 times)
> My understanding is that YARN will try 10 times to start the Flink job and fail if it cannot come up,
> but on the YARN application page I saw 11 attempts, as in the image below.
>
> For appattempt_1678102326043_0006_000409, doesn't each sequence number represent one attempt?
>


flink on yarn: question about YARN retrying to restart a Flink job

2023-03-13 Thread guanyq
Flink 1.10, configured as follows:
yarn.application-attempts = 10  (YARN attempts to start the Flink job 10 times)
My understanding is that YARN will try 10 times to start the Flink job and fail if it cannot come up, but on the YARN application page I saw 11 attempts, as in the image below.
For appattempt_1678102326043_0006_000409, doesn't each sequence number represent one attempt?