Re: [ANNOUNCE] Apache Flink 1.18.0 released

2023-10-26 Thread Yangze Guo
Great work! Congratulations to everyone involved!

Best,
Yangze Guo

On Fri, Oct 27, 2023 at 10:23 AM Qingsheng Ren  wrote:
>
> Congratulations and big THANK YOU to everyone helping with this release!
>
> Best,
> Qingsheng
>
> On Fri, Oct 27, 2023 at 10:18 AM Benchao Li  wrote:
>>
>> Great work, thanks everyone involved!
>>
>> On Fri, Oct 27, 2023 at 10:16 AM Rui Fan <1996fan...@gmail.com> wrote:
>> >
>> > Thanks for the great work!
>> >
>> > Best,
>> > Rui
>> >
>> > On Fri, Oct 27, 2023 at 10:03 AM Paul Lam  wrote:
>> >
>> > > Finally! Thanks to all!
>> > >
>> > > Best,
>> > > Paul Lam
>> > >
>> > > > On Oct 27, 2023, at 03:58, Alexander Fedulov wrote:
>> > > >
>> > > > Great work, thanks everyone!
>> > > >
>> > > > Best,
>> > > > Alexander
>> > > >
>> > > > On Thu, 26 Oct 2023 at 21:15, Martijn Visser wrote:
>> > > >
>> > > >> Thank you all who have contributed!
>> > > >>
>> > > >> On Thu, 26 Oct 2023 at 18:41, Feng Jin wrote:
>> > > >>
>> > > >>> Thanks for the great work! Congratulations
>> > > >>>
>> > > >>>
>> > > >>> Best,
>> > > >>> Feng Jin
>> > > >>>
>> > > >>> On Fri, Oct 27, 2023 at 12:36 AM Leonard Xu wrote:
>> > > >>>
>> > > >>>> Congratulations, Well done!
>> > > >>>>
>> > > >>>> Best,
>> > > >>>> Leonard
>> > > >>>>
>> > > >>>> On Fri, Oct 27, 2023 at 12:23 AM Lincoln Lee wrote:
>> > > >>>>
>> > > >>>>> Thanks for the great work! Congrats all!
>> > > >>>>>
>> > > >>>>> Best,
>> > > >>>>> Lincoln Lee
>> > > >>>>>
>> > > >>>>>
>> > > >>>>> On Fri, Oct 27, 2023 at 12:16 AM Jing Ge wrote:
>> > > >>>>>
>> > > >>>>>> The Apache Flink community is very happy to announce the release of
>> > > >>>>>> Apache Flink 1.18.0, which is the first release for the Apache Flink
>> > > >>>>>> 1.18 series.
>> > > >>>>>>
>> > > >>>>>> Apache Flink® is an open-source unified stream and batch data
>> > > >>>>>> processing framework for distributed, high-performing,
>> > > >>>>>> always-available, and accurate data applications.
>> > > >>>>>>
>> > > >>>>>> The release is available for download at:
>> > > >>>>>> https://flink.apache.org/downloads.html
>> > > >>>>>>
>> > > >>>>>> Please check out the release blog post for an overview of the
>> > > >>>>>> improvements for this release:
>> > > >>>>>> https://flink.apache.org/2023/10/24/announcing-the-release-of-apache-flink-1.18/
>> > > >>>>>>
>> > > >>>>>> The full release notes are available in Jira:
>> > > >>>>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352885
>> > > >>>>>>
>> > > >>>>>> We would like to thank all contributors of the Apache Flink community
>> > > >>>>>> who made this release possible!
>> > > >>>>>>
>> > > >>>>>> Best regards,
>> > > >>>>>> Konstantin, Qingsheng, Sergey, and Jing
>> > > >>>>>>
>> > > >>>>>
>> > > >>>>
>> > > >>>
>> > > >>
>> > >
>> > >
>>
>>
>>
>> --
>>
>> Best,
>> Benchao Li


Re: Query on class annotation like @Experimental

2023-05-03 Thread Yangze Guo
Hi, Anuj,

- Classes annotated with @Internal are not public API and thus might
change across any two releases.
- Classes annotated with @Experimental are for experimental use. They
can be changed across any two releases as well.
- Classes annotated with @PublicEvolving are intended for public use.
They are stable across patch releases (1.17.0 and 1.17.1), but can be
changed across minor releases (1.17.0 and 1.18.0).
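
To make the difference concrete, here is a small sketch of the three rules above in plain Java. The enum and method names are made up for illustration and are not part of Flink; the sketch only encodes the bullets above and assumes version strings of the form major.minor.patch.

```java
import java.util.Arrays;

// Hypothetical helper, not Flink API: encodes the stability rules listed above.
final class StabilityRules {
    enum Level { INTERNAL, EXPERIMENTAL, PUBLIC_EVOLVING }

    /** Parses "major.minor.patch" into three integers. */
    static int[] parse(String version) {
        return Arrays.stream(version.split("\\.")).mapToInt(Integer::parseInt).toArray();
    }

    /** Returns true if a class with the given level may change between the two releases. */
    static boolean mayChangeBetween(Level level, String from, String to) {
        int[] a = parse(from);
        int[] b = parse(to);
        if (level == Level.PUBLIC_EVOLVING) {
            // Stable across patch releases of the same minor line, may change otherwise.
            boolean sameMinorLine = a[0] == b[0] && a[1] == b[1];
            return !sameMinorLine;
        }
        // INTERNAL and EXPERIMENTAL: may change across any two different releases.
        return !Arrays.equals(a, b);
    }

    public static void main(String[] args) {
        System.out.println(mayChangeBetween(Level.PUBLIC_EVOLVING, "1.17.0", "1.17.1"));
        System.out.println(mayChangeBetween(Level.PUBLIC_EVOLVING, "1.17.0", "1.18.0"));
        System.out.println(mayChangeBetween(Level.EXPERIMENTAL, "1.17.0", "1.17.1"));
    }
}
```

Read this way, an @Experimental class such as AvroParquetWriters may need code changes even on a patch upgrade, while a @PublicEvolving one such as BulkWriter should only need attention on a minor upgrade.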

You can refer to [1] for more details.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-196%3A+Source+API+stability+guarantees

Best,
Yangze Guo

On Wed, May 3, 2023 at 12:08 PM Anuj Jain  wrote:
>
> Hi Community,
> I saw some flink classes annotated with
> @Experimental
> @PublicEvolving
> @Internal
>
> What do these annotations mean? Can I use these classes in production?
> How the class APIs would evolve in future. Can they break backward 
> compatibility in terms of API declaration or implementation, in minor or 
> major Flink releases.
>
> More specifically, I am trying to use AvroParquet reader/writers with File 
> source and sink operators and i saw some classes with these annotations like
> AvroParquetWriters (@Experimental)
> ParquetWriterFactory (@PublicEvolving  implements BulkWriter.Factory)
> BulkWriter (@PublicEvolving)
> AvroParquetReaders (@Experimental)
> AvroParquetRecordFormat (implements StreamFormat (@PublicEvolving))
> StreamFormatAdapter (@Internal)
> StreamFormat (@PublicEvolving)
>
> Are they safe to use ?
>
> Appreciate your feedback.
>
> Regards
> Anuj


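Re: Dynamic resource allocation in Flink

2021-11-14 Thread Yangze Guo
You can try Reactive Mode [1]. Scaling up or down still requires external intervention.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/elastic_scaling/

Best,
Yangze Guo

On Mon, Nov 15, 2021 at 3:41 PM 疾鹰击皓月 <1764232...@qq.com.invalid> wrote:
>
> Hello,
>
> The data traffic of our Flink project differs enormously between day and
> night. If the parallelism is set low, the job cannot cope with the daytime
> traffic peak, but if we allocate more resources, they are wasted at night.
>
> Does Flink have a dynamic resource allocation scheme that can adjust the
> resource allocation without stopping the application, for example by
> allocating different resources at different times (whether by adding or
> removing containers, by increasing the resources of each container, or by
> some other possible approach)?
>
> Thank you very much.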


Re: How to specify slot task sharing group for a task manager?

2021-11-12 Thread Yangze Guo
AFAIK, it is not under discussion now.

Best,
Yangze Guo

On Fri, Nov 12, 2021 at 9:16 PM Morten Gunnar Bjørner Lindeberg
 wrote:
>
> Hi again
>
> Ok, thanks then I understand:) Do you know if there is a plan to support this 
> with elastic scaling in a later release?
>
> Regards
> Morten
>
> On 12 Nov 2021, at 14:02, Chesnay Schepler  wrote:
>
> The fine-grained resource management currently does not support elastic 
> scaling.
>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/finegrained_resource/#limitations
>
> No support for the Elastic Scaling. The elastic scaling only supports slot 
> requests without specified-resource at the moment.
>
> On 12/11/2021 12:57, Morten Gunnar Bjørner Lindeberg wrote:
>
> Hi again
>
> Thanks for the reply!
>
> Not sure I understood the part with active ResourceManager. But if I 
> understood you right, running Flink in standalone mode enables setting 
> specific values for the different TaskManagers in the cluster.
>
> Following the documentation linked by Chesnay,  I have now registered a 
> custom resource (called ‘close') to one of the task managers in my standalone 
> cluster (added in flink-conf.yaml only for that particular task manager). It 
> seems the task manager has been assigned with that resource:
>   "id": "10.0.0.5:36501-f787a2",
>   "path": "akka.tcp://flink@10.0.0.5:36501/user/rpc/taskmanager_0",
>   "dataPort": 45583,
>   "jmxPort": -1,
>   "timeSinceLastHeartbeat": 1636710281811,
>   "slotsNumber": 1,
>   "freeSlots": 0,
>   "totalResource": {
> "cpuCores": 1,
> "taskHeapMemory": 383,
> "taskOffHeapMemory": 0,
> "managedMemory": 512,
> "networkMemory": 128,
> "extendedResources": {
>   "close": 1
> }
>
>
> On the application side, I registered a task sharing group for this specific 
> resource
>
> .setExternalResource("close", 1.0)
>
> I then added one of the operators to the group.
>
> .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka 
> Source").slotSharingGroup(ssg)
>
>
>
> However, it seems parallelism is still 4
> "id": "cbc357ccb763df2852fee8c4fc7d55f2",
>   "name": "Source: Kafka Source -> Flat Map",
>   "parallelism": 4,
>
> .. I can see the task running on all four nodes in the cluster. It seems to 
> me that the reactive mode scaling does not respect the resource demand for 
> the operator?
>
> Regards
> Morten
>
>
> On 11 Nov 2021, at 17:15, Yangze Guo  wrote:
>
> Hi, Morten,
>
> Sorry for the belated reply. With the doc provided by Chesnay, you can
> start the TaskManager with GPU. However, currently with active
> ResourceManager, the resource profiles of TaskManager are all the
> same, which means all of your TaskManager will have at least one GPU.
> If you only want some of the TaskManagers to have GPU, you may set up
> a standalone cluster and then manually start a TaskManager with GPU
> and let it register to the cluster.
>
> Best,
> Yangze Guo
>
> On Thu, Nov 11, 2021 at 9:57 PM Chesnay Schepler  wrote:
>
>
> The external resource documentation should contain instructions to do so.
>
> On 10/11/2021 09:18, Morten Gunnar Bjørner Lindeberg wrote:
>
> Hi :)
>
> I am trying the fine-grained resource management feature in Flink 1.14, 
> hoping it can enable assigning certain operators to certain TaskManagers.
>
> The sample code in 
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/finegrained_resource/
>  -shows how to define the group and its resource (e.g. using 
> setExternalResource-method), but I do not see any option to "assign" a 
> TaskManager worker instance with the capabilities of this "external resource”.
>
> Following the GPU-based example in the documentation, how can I ensure that 
> Flink "knows" which task manager actually has the required GPU? Is there some 
> configuration option I am missing / missing in the documentation?
>
> Have a nice day!
> Morten Lindeberg
> PostDoc Informatics, University of Oslo
> mglin...@ifi.uio.no
>
>
>
>
>


Re: How to specify slot task sharing group for a task manager?

2021-11-11 Thread Yangze Guo
Hi, Morten,

Sorry for the belated reply. With the doc provided by Chesnay, you can
start the TaskManager with GPU. However, currently with active
ResourceManager, the resource profiles of TaskManager are all the
same, which means all of your TaskManager will have at least one GPU.
If you only want some of the TaskManagers to have GPU, you may set up
a standalone cluster and then manually start a TaskManager with GPU
and let it register to the cluster.

Best,
Yangze Guo

On Thu, Nov 11, 2021 at 9:57 PM Chesnay Schepler  wrote:
>
> The external resource documentation should contain instructions to do so.
>
> On 10/11/2021 09:18, Morten Gunnar Bjørner Lindeberg wrote:
>
> Hi :)
>
> I am trying the fine-grained resource management feature in Flink 1.14, 
> hoping it can enable assigning certain operators to certain TaskManagers.
>
> The sample code in 
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/finegrained_resource/
>  -shows how to define the group and its resource (e.g. using 
> setExternalResource-method), but I do not see any option to "assign" a 
> TaskManager worker instance with the capabilities of this "external resource”.
>
> Following the GPU-based example in the documentation, how can I ensure that 
> Flink "knows" which task manager actually has the required GPU? Is there some 
> configuration option I am missing / missing in the documentation?
>
> Have a nice day!
> Morten Lindeberg
> PostDoc Informatics, University of Oslo
> mglin...@ifi.uio.no
>
>


Re: NoResourceAvailableException on taskmanager(s)

2021-11-04 Thread Yangze Guo
Hi, Deniz.

The exception implies that there are not enough slots in your
standalone cluster. You need to increase either
`taskmanager.numberOfTaskSlots` or `numberOfTaskManagers`.
You can search the JobManager log for "Received resource requirements from
job", which indicates how many slots your job needs.
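
As a rough sketch of the arithmetic, using the numbers from the config quoted below: the class and method names are made up for illustration and are not Flink API, and the sketch assumes that all operators share the default slot sharing group, so the job needs as many slots as its highest parallelism.

```java
// Hypothetical helper, not Flink API: compares required slots with offered slots.
final class SlotCheck {
    /** Total slots offered by a standalone cluster when every TaskManager is registered. */
    static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    /** Slots still missing for the job; 0 means the requirement can be met. */
    static int missingSlots(int requiredSlots, int taskManagers, int slotsPerTaskManager) {
        return Math.max(0, requiredSlots - totalSlots(taskManagers, slotsPerTaskManager));
    }

    public static void main(String[] args) {
        int required = 8; // parallelism: 8 in the quoted config
        // numberOfTaskManagers: 2, taskmanager.numberOfTaskSlots: 4
        System.out.println(missingSlots(required, 2, 4)); // both TaskManagers registered
        System.out.println(missingSlots(required, 1, 4)); // only one TaskManager registered
    }
}
```

With both TaskManagers registered the requirement is met exactly, with no spare slot, so under these assumptions losing a single TaskManager already leaves the job four slots short.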

Best,
Yangze Guo

On Thu, Nov 4, 2021 at 5:58 PM Deniz Koçak  wrote:
>
> Hi,
>
> We have been running our job on flink image
> 1.13.2-stream1-scala_2.12-java11. It's a standalone deployment on a
> Kubernetes cluster (EKS on AWS which uses EC2 nodes as hosts and also
> depends on a auto-scaler to adjust the resources cluster wide). After
> a few mins. (5-20) we see the exception below on taskmanager(s). The
> job quite busy so we see backpressure on some tasks, though wasn't
> expecting such a problem under heavy load (we are ok with slow
> processing and backlog). Neither restarting the task or increasing the
> resources solved the issue. We always get the the exception below
> after a period of time which makes the job unstable.
>
> ---
> java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
> at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
> at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
> at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
> at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
> at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
> at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.failRequest(DeclarativeSlotPoolBridge.java:535)
> 
> Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
> ---
>
> We tried different configs. in terms of cpu/mem allocated for the task
> managers in Flink configuration. We tried more cpu & mem. after
> realized the problem though none of the increases actually solved the
> problem. Part of the config we have is below.
>
> taskmanager.numberOfTaskSlots: '4'
>   kubernetes:
> pods:
>   affinity: null
>   annotations:
> prometheus.io.port: '9249'
> prometheus.io.scrape: 'true'
>   labels: {}
>   nodeSelector: {}
>   securityContext: null
>   logging:
> log4jLoggers:
>   '': INFO
> loggingProfile: default
>   numberOfTaskManagers: 2
>   parallelism: 8
>   resources:
> jobmanager:
>   cpu: 2
>   memory: 2G
> taskmanager:
>   cpu: 2
>   memory: 8G
>
>
> Please find the attached the configuration file we use at the moment.
>
> Thanks,


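Re: Flink 1.13.1: running a batch application via yarn-application to move 100 million rows from a MySQL source to Hive needs 16G+ of TaskManager memory

2021-11-04 Thread Yangze Guo
What was the reason for the failure? Do you have the error stack trace and logs?

Best,
Yangze Guo

On Thu, Nov 4, 2021 at 4:01 PM Asahi Lee <978466...@qq.com.invalid> wrote:
>
> hi!
> Using Flink SQL, I am transferring 100 million rows from MySQL into a Hive
> database, running in yarn-application mode. Even with 16G of memory
> configured, the execution failed!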


Re: [ANNOUNCE] Apache Flink 1.13.3 released

2021-10-22 Thread Yangze Guo
Thanks to Chesnay, Martijn, and everyone involved!

Best,
Yangze Guo

On Fri, Oct 22, 2021 at 4:25 PM Yun Tang  wrote:
>
> Thanks for Chesnay & Martijn and everyone who made this release happen.
>
> Best
> Yun Tang
> 
> From: JING ZHANG 
> Sent: Friday, October 22, 2021 10:17
> To: dev 
> Cc: Martijn Visser ; Jingsong Li 
> ; Chesnay Schepler ; user 
> 
> Subject: Re: [ANNOUNCE] Apache Flink 1.13.3 released
>
> Thank Chesnay, Martijn and every contributor for making this happen!
>
>
> On Fri, Oct 22, 2021 at 12:15 AM Thomas Weise wrote:
>
> Thanks for making the release happen!
>
> On Thu, Oct 21, 2021 at 5:54 AM Leonard Xu  wrote:
> >
> > Thanks to Chesnay & Martijn and everyone who made this release happen.
> >
> >
> > > On Oct 21, 2021, at 20:08, Martijn Visser wrote:
> > >
> > > Thank you Chesnay, Leonard and all contributors!
> > >
> > > > On Thu, 21 Oct 2021 at 13:40, Jingsong Li <jingsongl...@gmail.com> wrote:
> > > Thanks, Chesnay & Martijn
> > >
> > > 1.13.3 really solves many problems.
> > >
> > > Best,
> > > Jingsong
> > >
> > > > On Thu, Oct 21, 2021 at 6:46 PM Konstantin Knauf <kna...@apache.org> wrote:
> > > >
> > > > Thank you, Chesnay & Martijn, for managing this release!
> > > >
> > > > > On Thu, Oct 21, 2021 at 10:29 AM Chesnay Schepler <ches...@apache.org> wrote:
> > > >
> > > > > The Apache Flink community is very happy to announce the release of
> > > > > Apache Flink 1.13.3, which is the third bugfix release for the Apache
> > > > > Flink 1.13 series.
> > > > >
> > > > > Apache Flink® is an open-source stream processing framework for
> > > > > distributed, high-performing, always-available, and accurate data
> > > > > streaming applications.
> > > > >
> > > > > The release is available for download at:
> > > > > > https://flink.apache.org/downloads.html
> > > > > >
> > > > > > Please check out the release blog post for an overview of the
> > > > > > improvements for this bugfix release:
> > > > > > https://flink.apache.org/news/2021/10/19/release-1.13.3.html
> > > > > >
> > > > > > The full release notes are available in Jira:
> > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350329
> > > > >
> > > > > We would like to thank all contributors of the Apache Flink community
> > > > > who made this release possible!
> > > > >
> > > > > Regards,
> > > > > Chesnay
> > > > >
> > > > >
> > > >
> > > > --
> > > >
> > > > Konstantin Knauf
> > > >
> > > > > https://twitter.com/snntrable
> > > > >
> > > > > https://github.com/knaufk
> > >
> > >
> > >
> > > --
> > > Best, Jingsong Lee
> >


Re: High availability data clean up

2021-10-21 Thread Yangze Guo
For application mode, when the job finishes normally or is canceled,
the ConfigMaps will be cleaned up.
For session mode, when you stop the session through [1], the
ConfigMaps will be cleaned up.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#stop-a-running-session-cluster

Best,
Yangze Guo

On Thu, Oct 21, 2021 at 6:37 AM Weiqing Yang  wrote:
>
>
> Hi,
>
> Per the doc, `kubernetes.jobmanager.owner.reference` can be used to set up 
> the owners of the job manager Deployment. If the owner is deleted, then the 
> job manager and its related pods will be deleted. How about the HA related 
> ConfigMaps? Are they also deleted when deleting the owner of the job manager 
> Deployment? Per the wiki here, the HA data will be retained when deleting 
> jobmanager Deployment. If we want to delete these HA related configMaps as 
> well when deleting the job manager, what is the suggested way to do that?
>
> Thanks,
> weiqing
>


Re: Programmatically configuring S3 settings

2021-10-17 Thread Yangze Guo
Hi, Pavel.

From my understanding of the doc[1], you need to set it in
flink-conf.yaml instead of your job.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins

Best,
Yangze Guo

On Sat, Oct 16, 2021 at 5:46 AM Pavel Penkov  wrote:
>
> Apparently Flink 1.14.0 doesn't correctly translate S3 options when they are 
> set programmatically. I'm creating a local environment like this to connect 
> to local MinIO instance:
>
>   val flinkConf = new Configuration()
>   flinkConf.setString("s3.endpoint", "http://127.0.0.1:9000")
>   flinkConf.setString("s3.aws.credentials.provider", 
> "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
>
>   val env = 
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(flinkConf)
>
> Then StreamingFileSink fails with a huge stack trace with most relevant 
> messages being Caused by: 
> org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials 
> provided by SimpleAWSCredentialsProvider 
> EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : 
> com.amazonaws.SdkClientException: Failed to connect to service endpoint:  
> which means that Hadoop tried to enumerate all of the credential providers 
> instead of using the one set in configuration. What am I doing wrong?


Re: Yarn job not exit when flink job exit

2021-10-11 Thread Yangze Guo
Hi, Jake

In Flink 1.14, we recommend using "-t yarn-per-job"[1] for starting
per-job cluster. Regarding your issue, I could not reproduce it with
the Wordcount example. However, I think this is not the right way for
Flink's SQL client, which might be the root cause of your issue. Would
you like to take a look at [2]?

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#per-job-cluster-mode
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/

Best,
Yangze Guo

On Tue, Oct 12, 2021 at 10:52 AM Caizhi Weng  wrote:
>
> Hi!
>
> yarn-cluster is the mode for a yarn session cluster, which means the cluster 
> will remain even after the job is finished. If you want to finish the Flink 
> job as well as the yarn job, use yarn-per-job mode instead.
>
> On Sat, Oct 9, 2021 at 5:53 PM Jake wrote:
>>
>> Hi
>>
>> When I submit a job in yarn-cluster mode, the Flink job finishes but the
>> YARN job does not exit. What should I do?
>>
>> Submit command:
>>
>> /opt/app/flink/flink-1.14.0/bin/flink run -m yarn-cluster 
>> ./flink-sql-client.jar --file dev.sql


Re: [ANNOUNCE] Apache Flink 1.14.0 released

2021-09-29 Thread Yangze Guo
Thanks, Xintong, Joe, Dawid for the great work, thanks to everyone involved!

Best,
Yangze Guo

On Thu, Sep 30, 2021 at 12:02 AM Rion Williams  wrote:
>
> Great news all! Looking forward to it!
>
> > On Sep 29, 2021, at 10:43 AM, Theo Diefenthal 
> >  wrote:
> >
> > 
> > Awesome, thanks for the release.
> >
> > - Original Message -
> > From: "Dawid Wysakowicz"
> > To: "dev", "user", annou...@apache.org
> > Sent: Wednesday, September 29, 2021 15:59:47
> > Subject: [ANNOUNCE] Apache Flink 1.14.0 released
> >
> > The Apache Flink community is very happy to announce the release of
> > Apache Flink 1.14.0.
> >
> > Apache Flink® is an open-source stream processing framework for
> > distributed, high-performing, always-available, and accurate data
> > streaming applications.
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Please check out the release blog post for an overview of the
> > improvements for this bugfix release:
> > https://flink.apache.org/news/2021/09/29/release-1.14.0.html
> >
> > The full release notes are available in Jira:
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349614
> >
> > We would like to thank all contributors of the Apache Flink community
> > who made this release possible!
> >
> > Regards,
> > Xintong, Joe, Dawid


Re: Flink run different jars

2021-09-29 Thread Yangze Guo
You need to edit the conf/workers. Example of the config[1] and the process[2].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-and-stopping-a-cluster
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-a-standalone-cluster-session-mode

Best,
Yangze Guo

On Wed, Sep 29, 2021 at 1:02 PM Qihua Yang  wrote:
>
> Hi Yangze,
>
> Thanks a lot for your reply. References are very helpful!
> Another quick question. Reference 1 can start a standalone cluster (session 
> Mode). That cluster has a jobManager. I can submit job to run. How about 
> taskManger? Do I need to manually start multiple taskManagers?
> Is there a complete example to show the process?
>
> Thanks,
> Qihua
>
>
> On Tue, Sep 28, 2021 at 7:02 PM Yangze Guo  wrote:
>>
>> Hi, Qihua
>>
>> IIUC, what you want might be a standalone cluster[1] or session 
>> cluster[2][3].
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#session-mode
>> [3] 
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode
>>
>> Best,
>> Yangze Guo
>>
>> On Wed, Sep 29, 2021 at 5:57 AM Qihua Yang  wrote:
>> >
>> > Hi,
>> >
>> > Is that possible to run a flink app without a job? What I am trying to do 
>> > is I build multiple jars. And switch jar to run different jobs.
>> > I am not sure if flink supports this mode. I saw rest API can upload jar, 
>> > cancel job and run a jar.
>> > Right now I can upload a jar to flink. But when I cancel a job, flink will 
>> > restart automatically. I checked log. It show below logs. Can anyone help 
>> > me out?
>> >
>> > Caused by: 
>> > org.apache.flink.client.deployment.application.UnsuccessfulExecutionException:
>> >  Application Status: CANCELED
>> > at 
>> > org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:73)
>> > ... 41 common frames omitted
>> > Caused by: org.apache.flink.runtime.client.JobCancellationException: Job 
>> > was cancelled.
>> > at 
>> > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:149)
>> > at 
>> > org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:64)
>> > ... 41 common frames omitted
>> >
>> > Thanks!


Re: Flink run different jars

2021-09-28 Thread Yangze Guo
Hi, Qihua

IIUC, what you want might be a standalone cluster[1] or session cluster[2][3].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#session-mode
[3] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode

Best,
Yangze Guo

On Wed, Sep 29, 2021 at 5:57 AM Qihua Yang  wrote:
>
> Hi,
>
> Is that possible to run a flink app without a job? What I am trying to do is 
> I build multiple jars. And switch jar to run different jobs.
> I am not sure if flink supports this mode. I saw rest API can upload jar, 
> cancel job and run a jar.
> Right now I can upload a jar to flink. But when I cancel a job, flink will 
> restart automatically. I checked log. It show below logs. Can anyone help me 
> out?
>
> Caused by: 
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException:
>  Application Status: CANCELED
> at 
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:73)
> ... 41 common frames omitted
> Caused by: org.apache.flink.runtime.client.JobCancellationException: Job was 
> cancelled.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:149)
> at 
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:64)
> ... 41 common frames omitted
>
> Thanks!


Re: S3 access permission error

2021-09-22 Thread Yangze Guo
I'm not an expert on S3. If it is not a credential issue, have you
gone through the checklist in this doc [1]?

[1] 
https://aws.amazon.com/premiumsupport/knowledge-center/emr-s3-403-access-denied/?nc1=h_ls

Best,
Yangze Guo

On Wed, Sep 22, 2021 at 3:39 PM Dhiru  wrote:
>
>
> Not sure @yangze ...  but other services which are deployed in same places we 
> are able to access s3 bucket, the link which you share are recommended way, 
> if we have access to s3 then we should not pass credentials ?
>
> On Wednesday, September 22, 2021, 02:59:05 AM EDT, Yangze Guo 
>  wrote:
>
>
> You might need to configure the access credential. [1]
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
>
> Best,
> Yangze Guo
>
> On Wed, Sep 22, 2021 at 2:17 PM Dhiru  wrote:
> >
> >
> > i see org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2326) plugin 
> > is not able to create folder , not sure if I need to change something
> > Whereas when We are trying to pass from the local laptop and passing  aws 
> > credentails its able to create a folder and running as expected
> > On Wednesday, September 22, 2021, 01:39:04 AM EDT, Dhiru 
> >  wrote:
> >
> >
> > flink image I have added both s3 plugin
> > FROM flink:1.11.3-scala_2.12-java11
> > RUN mkdir ./plugins/flink-s3-fs-presto
> > RUN cp ./opt/flink-s3-fs-presto-1.11.3.jar  ./plugins/flink-s3-fs-presto/
> > RUN mkdir ./plugins/flink-s3-fs-hadoop
> > RUN cp ./opt/flink-s3-fs-hadoop-1.11.3.jar  ./plugins/flink-s3-fs-hadoop/
> >
> > some part of flink-conf.yaml  ( I tried with both s3a and s3  )
> > # REQUIRED: set storage location for job metadata in remote storage
> > state.backend: filesystem
> > state.backend.fs.checkpointdir: s3a://msc-actigraph-test-bucket/flink-checkpointing/checkpoints
> > state.checkpoints.dir: s3a://msc-actigraph-test-bucket/flink-checkpointing/externalized-checkpoints
> > state.savepoints.dir: s3a://msc-actigraph-test-bucket/flink-checkpointing/savepoints
> > high-availability.storageDir: s3a://msc-actigraph-test-bucket/flink-checkpointing/storagedir
> > s3.path.style.access: true
> >
> > org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
> >   at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)
> >   at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
> >   at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
> >   at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
> >   at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
> >   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> >   at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
> >   at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
> >   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> >   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> >   at java.base/java.lang.Thread.run(Unknown Source)
> > Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
> >   at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
> >   at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
> >   ... 7 more
> > Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
> >   at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)
> >   at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
> >   at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
> >   ... 7 more
> > Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'DeduplicationJob'.
> >   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> >   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>

Re: S3 access permission error

2021-09-22 Thread Yangze Guo
You might need to configure the access credentials. [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials

Best,
Yangze Guo

On Wed, Sep 22, 2021 at 2:17 PM Dhiru  wrote:
>
>
> I see org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2326); the plugin is 
> not able to create the folder, and I am not sure if I need to change something.
> Whereas when we run from a local laptop and pass AWS 
> credentials, it is able to create the folder and runs as expected.
> On Wednesday, September 22, 2021, 01:39:04 AM EDT, Dhiru 
>  wrote:
>
>
> In the Flink image I have added both S3 plugins:
> FROM flink:1.11.3-scala_2.12-java11
> RUN mkdir ./plugins/flink-s3-fs-presto
> RUN cp ./opt/flink-s3-fs-presto-1.11.3.jar  ./plugins/flink-s3-fs-presto/
> RUN mkdir ./plugins/flink-s3-fs-hadoop
> RUN cp ./opt/flink-s3-fs-hadoop-1.11.3.jar  ./plugins/flink-s3-fs-hadoop/
>
> Part of flink-conf.yaml (I tried with both s3a and s3):
> # REQUIRED: set storage location for job metadata in remote storage
>  state.backend: filesystem
>  state.backend.fs.checkpointdir: s3a://msc-actigraph-test-bucket/flink-checkpointing/checkpoints
>  state.checkpoints.dir: s3a://msc-actigraph-test-bucket/flink-checkpointing/externalized-checkpoints
>  state.savepoints.dir: s3a://msc-actigraph-test-bucket/flink-checkpointing/savepoints
>  high-availability.storageDir: s3a://msc-actigraph-test-bucket/flink-checkpointing/storagedir
>  s3.path.style.access: true
>
> org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
>   at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)
>   at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
>   at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
>   at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
>   at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
>   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>   at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>   at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>   at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
>   at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
>   at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
>   ... 7 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
>   at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)
>   at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
>   at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
>   ... 7 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'DeduplicationJob'.
>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>   at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>   at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
>   ... 9 more
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'DeduplicationJob'.
>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1829)
>   at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
>   at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
>   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)
>   at io.epiphanous.flinkrunner.flink.BaseFlinkJob.ru

Re: Invalid flink-config keeps going and ignores bad config values

2021-09-17 Thread Yangze Guo
AFAIK there is not. Flink will just skip the invalid lines.
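
Since Flink only warns and moves on, one workaround is to check the file yourself before the cluster starts. Below is a minimal sketch, assuming the flat `key: value` layout that the warning in the quoted log implies. It approximates the splitting rule and is not Flink's actual parser; the function names `find_invalid_lines` and `check_config` are made up for illustration.

```python
def find_invalid_lines(text):
    """Return (line_number, raw_line) pairs that do not look like 'key: value'."""
    invalid = []
    for number, raw in enumerate(text.splitlines(), start=1):
        # Drop trailing comments and surrounding whitespace.
        line = raw.split("#", 1)[0].strip()
        if not line:
            continue  # blank or comment-only line
        key, separator, value = line.partition(": ")
        if not separator or not key.strip() or not value.strip():
            invalid.append((number, raw))
    return invalid


def check_config(text):
    """Raise ValueError if any line would (presumably) be skipped by Flink."""
    invalid = find_invalid_lines(text)
    if invalid:
        details = "; ".join(f"line {n}: {raw!r}" for n, raw in invalid)
        raise ValueError(f"invalid config lines: {details}")
```

Calling `check_config(open("/opt/flink/conf/flink-conf.yaml").read())` from a container entrypoint, before launching Flink, would make a malformed line such as `taskmanager.memory.network.fraction=0.2` fail the startup instead of being skipped.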

Best,
Yangze Guo

On Sat, Sep 18, 2021 at 7:00 AM Dan Hill  wrote:
>
> Hi.  I noticed my flink-conf.yaml had an error in it.  I assumed a bad 
> config would stop Flink from running (to catch errors earlier).  Is there a 
> way I can enable a strict parsing mode so that any Flink parsing issue causes 
> Flink to fail?  I don't see one when looking at the code.
>
> 2021-09-17 22:45:18,893 WARN  org.apache.flink.configuration.GlobalConfiguration   [] - Error while trying to split key and value in configuration file /opt/flink/conf/flink-conf.yaml:29: "taskmanager.memory.network.fraction=0.2"
>
> 2021-09-17 22:45:18,894 WARN  org.apache.flink.configuration.GlobalConfiguration   [] - Error while trying to split key and value in configuration file /opt/flink/conf/flink-conf.yaml:30: "taskmanager.memory.network.max=2g"


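Re: Flink on native Kubernetes: large gap between a job's actual CPU usage and its requested CPU

2021-09-16 Thread Yangze Guo
Possibly the TM starts slowly because it has too little CPU and exceeds its
registration timeout. You can try increasing
resourcemanager.taskmanager-registration.timeout.

Best,
Yangze Guo

On Fri, Sep 17, 2021 at 9:36 AM casel.chen  wrote:
>
> We run real-time Flink jobs on Kubernetes and found that the CPU a job actually uses is far below what it requests, but after lowering the request the job fails to start. Is this an isolated case or normal behavior?
> For example, one of our jobs requested 0.5 CPU but actually used only about 0.09; after changing the request to 0.2 CPU, the job could not start.
> The whole k8s cluster currently has 96 CPUs, of which 86 are requested, while actual usage is only about 7.5 CPUs. That is a very large gap; is there any way to address it?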


Re: Tracking Total Metrics Reported

2021-09-15 Thread Yangze Guo
Hi, Mason

AFAIK the JM does not report the total number of metrics it has. Maybe
you can count the metrics of each entity through the REST API [1]?

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#rest-api-integration
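
A rough sketch of that per-entity counting, assuming the JobManager REST endpoint is reachable at a base URL such as http://localhost:8081 (an assumption, adjust for your setup). It uses the `/jobmanager/metrics`, `/taskmanagers` and `/taskmanagers/<id>/metrics` endpoints, which list metric names; as far as I know the listing does not say whether a metric is a counter, gauge, meter, or histogram, so it cannot give the per-type totals asked for. The helper names are made up, and job/vertex metrics would need the analogous `/jobs/...` endpoints.

```python
import json
from urllib.request import urlopen


def fetch_json(base_url, path):
    """GET base_url + path and decode the JSON body."""
    with urlopen(base_url.rstrip("/") + path) as response:
        return json.load(response)


def count_metrics(base_url, fetch=fetch_json):
    """Count the metric names listed for the JobManager and each TaskManager."""
    counts = {"jobmanager": len(fetch(base_url, "/jobmanager/metrics"))}
    for tm in fetch(base_url, "/taskmanagers")["taskmanagers"]:
        tm_id = tm["id"]
        listed = fetch(base_url, f"/taskmanagers/{tm_id}/metrics")
        counts[f"taskmanager:{tm_id}"] = len(listed)
    # The right-hand side is evaluated before "total" is added to the dict.
    counts["total"] = sum(counts.values())
    return counts
```

The `fetch` parameter is injectable only so the counting logic can be exercised without a running cluster.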

Best,
Yangze Guo

On Thu, Sep 16, 2021 at 9:30 AM Mason Chen  wrote:
>
> For it to be most useful, the user should be able to obtain the total number 
> of counters, gauges, meters, and histograms, separately.
>
> On Wed, Sep 15, 2021 at 6:23 PM Mason Chen  wrote:
>>
>> Hi all,
>>
>> Does Flink have any sort of feature to track the total number of metrics 
>> reported by the Flink job? Ideally, the total would be reported by the job 
>> manager. Even if there is a log that exposes this information, that would be 
>> helpful!
>>
>> Best,
>> Mason


Re: Streaming SQL support for redis streaming connector

2021-09-14 Thread Yangze Guo
> What about the bahir streaming connectors? Are they considered canonical? 
> Would they be merged in to the main project at some point? Iiuc we can use 
> table API etc to write data to redis using that right?

I think it can still be used with Flink 1.12 and 1.13. However, it has
not been updated for 5 months and is not implemented on top of the new
interfaces introduced in FLIP-95. AFAIK, the community does not plan
to merge it.

> What more will be required for us to use SQL? (as in specify the connector in 
> the WITH clause as redis)

To use the Bahir connector with SQL, you need to build the project, move
the uber jar to the lib directory of your Flink distribution, and fill in
the necessary properties [1][2].

[1] 
https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
[2] 
https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java


Best,
Yangze Guo

On Wed, Sep 15, 2021 at 11:34 AM Osada Paranaliyanage
 wrote:
>
> Hi David,
>
>
>
>   What about the bahir streaming connectors? Are they considered canonical? 
> Would they be merged in to the main project at some point? Iiuc we can use 
> table API etc to write data to redis using that right? What more will be 
> required for us to use SQL? (as in specify the connector in the WITH clause 
> as redis)
>
>
>
> Thanks,
>
> Osada.
>
>
>
> From: David Morávek 
> Sent: Tuesday, September 14, 2021 7:53 PM
> To: Osada Paranaliyanage 
> Cc: user@flink.apache.org
> Subject: Re: Streaming SQL support for redis streaming connector
>
>
>
>
> Hi Osada,
>
>
>
> in theory building a Redis table from "CDC stream" should definitely be 
> doable. Unfortunately Flink currently doesn't have any official Redis Sink 
> for the Table API and there is currently no on-going effort for adding it, so 
> it would need to be implemented first. The resulting syntax would be pretty 
> much the same to what's outlined in the mentioned example.
>
>
>
> Best,
>
> D.
>
>
>
> On Tue, Sep 14, 2021 at 2:57 PM Osada Paranaliyanage 
>  wrote:
>
> Hi All, We are looking to use flink to build a materialized view of a 
> relation db and a document db using cdc streams. For this purpose we would 
> like to use redis for hosting the materialized view. Can we do this in 
> streaming SQL? We have worked through 
> https://github.com/ververica/flink-sql-CDC and can see how this will work 
> with ES as a sink. But can we use redis as the sink? Where do we find the 
> syntax for that?
>
>
>
> Thanks,
>
> Osada.
>
>
>
>
>
> 
>
>
>
> This e-mail is confidential. It may also be legally privileged. If you are 
> not the intended recipient or have received it in error, please delete it and 
> all copies from your system and notify the sender immediately by return 
> e-mail. Any unauthorized reading, reproducing, printing or further 
> dissemination of this e-mail or its contents is strictly prohibited and may 
> be unlawful. Internet communications cannot be guaranteed to be timely, 
> secure, error or virus-free. The sender does not accept liability for any 
> errors or omissions.
>


Re: Job manager crash

2021-09-05 Thread Yangze Guo
Hi,

The root cause is not the "java.lang.NoClassDefFoundError". The job was
running but could not update the ConfigMap
"myJob--jobmanager-leader", and it
seems it was eventually disconnected from the API server. Is there
another job with the same cluster ID (myJob)?

I will also pull in Yang Wang.

Best,
Yangze Guo

On Mon, Sep 6, 2021 at 10:10 AM Caizhi Weng  wrote:
>
> Hi!
>
> There is a message saying "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/hdfs/HdfsConfiguration" in your log file. Are you accessing 
> HDFS in your job? If yes, it seems that your Flink distribution or your 
> cluster is lacking the Hadoop classes. Please make sure that the Hadoop 
> jars are in the lib directory of Flink, or that your cluster has set the 
> HADOOP_CLASSPATH environment variable.
>
> On Sat, Sep 4, 2021 at 12:15 AM mejri houssem  wrote:
>>
>>
>> Hello,
>>
>> I have been facing a JM crash lately. I am deploying a Flink application cluster on 
>> Kubernetes.
>>
>> When I install my chart using Helm everything works fine, but after some time 
>> the JM starts to crash
>>
>> and then it eventually gets deleted after 5 restarts.
>>
>> Flink version: 1.12.5 (upgraded recently from 1.12.2)
>> HA mode: k8s
>>
>> The full JM log is in the attached file.


Re: Flink taskmanager in crash loop

2021-08-17 Thread Yangze Guo
> 2021-08-16 15:58:13.986 [Cancellation Watchdog for Source: MASKED] ERROR 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - Fatal error 
> occurred while executing the TaskManager. Shutting it down...
> org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully 
> within 180 + seconds.
It seems the Task 'MASKED' could not be terminated within the timeout. I
think this is the root cause of the TaskManager's termination. We
need to find out why Task 'MASKED' was canceled. Can you provide some
logs related to it? Maybe you can search for "CANCELING" in the JM and TM
logs.
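
If the logs are large, a small script can narrow the search. This is only a sketch: the function name `find_cancel_lines` is made up, and the optional task-name filter assumes the task name appears on the same log line as the keyword.

```python
def find_cancel_lines(log_lines, task_name=None, keyword="CANCELING"):
    """Return (line_number, line) pairs that mention the keyword.

    If task_name is given, keep only lines that also mention that task.
    """
    hits = []
    for number, line in enumerate(log_lines, start=1):
        if keyword not in line:
            continue
        if task_name is not None and task_name not in line:
            continue
        hits.append((number, line.rstrip("\n")))
    return hits
```

For example, `find_cancel_lines(open("jobmanager.log"), task_name="MASKED")` would list the state transitions into CANCELING for that task, and the lines just before the first hit usually show what triggered the cancellation.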

Best,
Yangze Guo

On Wed, Aug 18, 2021 at 1:20 AM Abhishek Rai  wrote:
>
> Before this message, the log contains the following:
>
> 2021-08-12 23:02:58.015 [Canceler/Interrupts for Source: MASKED]) 
> (1/1)#29103' did not react to cancelling signal for 30 seconds, but is stuck 
> in method:
>  java.base@11.0.11/jdk.internal.misc.Unsafe.park(Native Method)
> java.base@11.0.11/java.util.concurrent.locks.LockSupport.parkNanos(Unknown 
> Source)
> java.base@11.0.11/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown
>  Source)
> app//org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
> app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:341)
> app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
> app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
> app//org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
> app//org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
> app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
> app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> java.base@11.0.11/java.lang.Thread.run(Unknown Source)
>
> On Tue, Aug 17, 2021 at 9:22 AM Abhishek Rai  wrote:
>>
>> Thanks Yangze, indeed, I see the following in the log about 10s before the 
>> final crash (masked some sensitive data using `MASKED`):
>>
>> 2021-08-16 15:58:13.985 [Canceler/Interrupts for Source: MASKED] WARN 
>> org.apache.flink.runtime.taskmanager.Task  - Task 'MASKED' did not react to 
>> cancelling signal for 30 seconds, but is stuck in method:
>>  java.base@11.0.11/jdk.internal.misc.Unsafe.park(Native Method)
>> java.base@11.0.11/java.util.concurrent.locks.LockSupport.park(Unknown Source)
>> java.base@11.0.11/java.util.concurrent.CompletableFuture$Signaller.block(Unknown
>>  Source)
>> java.base@11.0.11/java.util.concurrent.ForkJoinPool.managedBlock(Unknown 
>> Source)
>> java.base@11.0.11/java.util.concurrent.CompletableFuture.waitingGet(Unknown 
>> Source)
>> java.base@11.0.11/java.util.concurrent.CompletableFuture.join(Unknown Source)
>> app//org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:705)
>> app//org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
>> app//org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:637)
>> app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
>> app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>> java.base@11.0.11/java.lang.Thread.run(Unknown Source)
>>
>> 2021-08-16 15:58:13.986 [Cancellation Watchdog for Source: MASKED] ERROR 
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - Fatal error 
>> occurred while executing the TaskManager. Shutting it down...
>> org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully 
>> within 180 + seconds.
>>   at 
>> org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1718)
>>   at java.base/java.lang.Thread.run(Unknown Source)
>>
>>
>>
>> On Mon, Aug 16, 2021 at 7:05 PM Yangze Guo  wrote:
>>>
>>> Hi, Abhishek,
>>>
>>> Do you see something like "Fatal error occurred while executing the
>>> TaskManager" in your log or would you like to provide the whole task
>>> manager log?
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> On Tue, Aug 17, 2021 at 5:17 AM Abhishek Rai  wrote:
>>> >
>>> > Hello,
>>> >
>>> > In our production environment, running Flink 1.13 (Scala 2.11), where 
>>> > Flink has been working without issues with a dozen or so jobs running for 
>>> > a while, Flink taskmanager started crash looping with a period of ~4 
>

Re: redis sink from flink

2021-08-16 Thread Yangze Guo
Hi, Jin

IIUC, the DataStream connector `RedisSink` can still be used. However,
the Table API connector `RedisTableSink` might not work (at least in
the future) because it is implemented based on the deprecated Table
connector abstraction. You can still give it a try, though.

Best,
Yangze Guo

On Tue, Aug 17, 2021 at 9:15 AM Jin Yi  wrote:
>
> is apache bahir still a thing?  it hasn't been touched for months (since 
> redis 2.8.5).
>
> as such, looking at the current flink connector docs, it's no longer pointing 
> to anything from the bahir project.  looking around in either the flink or 
> bahir newsgroups doesn't turn up anything regarding bahir's EOL.
>
> is the best bet for a flink to redis sink something i roll on my own 
> (inclined to go this route w/ buffered writes)?  or should i try going 
> through via kafka and using confluent's kafka redis connector (flink => kafka 
> => redis)?


Re: Flink taskmanager in crash loop

2021-08-16 Thread Yangze Guo
Hi, Abhishek,

Do you see something like "Fatal error occurred while executing the
TaskManager" in your log? If not, could you provide the whole
TaskManager log?

Best,
Yangze Guo

On Tue, Aug 17, 2021 at 5:17 AM Abhishek Rai  wrote:
>
> Hello,
>
> In our production environment, running Flink 1.13 (Scala 2.11), where Flink 
> has been working without issues with a dozen or so jobs running for a while, 
> Flink taskmanager started crash looping with a period of ~4 minutes per 
> crash.  The stack trace is not very informative, therefore reaching out for 
> help, see below.
>
> The only other thing that's unusual is that due to what might be a product 
> issue (custom job code running on Flink), some or all of our tasks are also 
> in a crash loop.  Still, I wasn't expecting taskmanager itself to die.  Does 
> taskmanager have some built in feature to crash if all/most tasks are 
> crashing?
>
> 2021-08-16 15:58:23.984 [main] ERROR 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - Terminating 
> TaskManagerRunner with exit code 1.
> org.apache.flink.util.FlinkException: Unexpected failure during runtime of 
> TaskManagerRunner.
>   at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:382)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$3(TaskManagerRunner.java:413)
>   at java.base/java.security.AccessController.doPrivileged(Native Method)
>   at java.base/javax.security.auth.Subject.doAs(Unknown Source)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>   at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:413)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:396)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:354)
> Caused by: java.util.concurrent.TimeoutException: null
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
>   at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
>   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown 
> Source)
>   at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>   at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>  Source)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
> Source)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source)
>   at java.base/java.lang.Thread.run(Unknown Source)
> 2021-08-16 15:58:23.986 [TaskExecutorLocalStateStoresManager shutdown hook] 
> INFO  o.a.flink.runtime.state.TaskExecutorLocalStateStoresManager  - Shutting 
> down TaskExecutorLocalStateStoresManager.
>
>
> Thanks very much!
>
> Abhishek


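Re: Question about the number of TaskManagers

2021-08-05 Thread Yangze Guo
On YARN, TaskManagers are now allocated on demand.

Best,
Yangze Guo

On Fri, Aug 6, 2021 at 10:31 AM 上官  wrote:
>
> In 1.13, -yn no longer seems to work when submitting in YARN mode. How do I specify the number of containers (TaskManagers) now?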


Re: Obtain JobManager Web Interface URL

2021-08-02 Thread Yangze Guo
From my understanding, what you want is actually a management system
for Flink jobs. I think it might be good to submit the job (with `flink
run`) and retrieve the WebUI address in another process.
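
As a sketch of what that separate process could do once it knows the REST address (how it learns the address is left open here), it can call `GET /jobs/<jobid>` and flatten the per-operator fields into rows for a database. The field names below (`start-time`, `end-time`, `duration`, `read-records`, and so on) are the ones I recall from the job details response; please verify them against your Flink version. The function name `summarize_vertices` is made up.

```python
def summarize_vertices(job_details):
    """Flatten the 'vertices' of a /jobs/<jobid> response into plain rows."""
    rows = []
    for vertex in job_details.get("vertices", []):
        metrics = vertex.get("metrics", {})
        rows.append({
            "name": vertex.get("name"),
            "start_time": vertex.get("start-time"),
            "end_time": vertex.get("end-time"),
            "duration_ms": vertex.get("duration"),
            "read_records": metrics.get("read-records"),
            "write_records": metrics.get("write-records"),
            "read_bytes": metrics.get("read-bytes"),
            "write_bytes": metrics.get("write-bytes"),
        })
    return rows
```

Missing fields come back as `None` rather than raising, so a response from a slightly different version degrades gracefully.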

Best,
Yangze Guo

On Mon, Aug 2, 2021 at 10:39 PM Hailu, Andreas [Engineering]
 wrote:
>
> Hi Yangze, sure!
>
> After a submitted Flink app is complete, our client app polls the RESTful 
> interface to pull job metrics -- operator start/end times, duration, records 
> + bytes read/written etc... All of these metrics are published to a 
> database for analytical purposes, again both programmatic and ad-hoc.
>
> There was no clear exposure of ClusterClient, so we had originally worked 
> around this by extending the CliFrontend class with a bit of a façade class 
> that grabbed the ClusterClient from the executeProgram() method:
>
> @Override
> protected void executeProgram(PackagedProgram program, ClusterClient client, int parallelism)
>         throws ProgramMissingJobException, ProgramInvocationException {
>     logAndSysout("Starting execution of program");
>     // Used elsewhere in the application
>     System.setProperty(JOB_MANAGER_WEB_INTERFACE_PROPERTY, client.getWebInterfaceURL());
>     ...
> }
>
> These metrics prove immensely valuable as they help us optimize performance, 
> diagnose issues, as well as predict resource requirements for applications.
>
> // ah
>
> -Original Message-
> From: Yangze Guo 
> Sent: Sunday, August 1, 2021 10:38 PM
> To: Hailu, Andreas [Engineering] 
> Cc: user@flink.apache.org
> Subject: Re: Obtain JobManager Web Interface URL
>
> AFAIK, the ClusterClient should not be exposed through the public API.
> Would you like to explain your use case and why you need to get the web UI 
> programmatically?
>
> Best,
> Yangze Guo
>
> On Fri, Jul 30, 2021 at 9:54 PM Hailu, Andreas [Engineering] 
>  wrote:
> >
> > Hello Yangze, thanks for responding.
> >
> > I'm attempting to perform this programmatically on YARN, so looking at a 
> > log just won't do :) What's the appropriate way to get an instance of a 
> > ClusterClient? Do you know of any examples I can look at?
> >
> > // ah
> >
> > -Original Message-
> > From: Yangze Guo 
> > Sent: Thursday, July 29, 2021 11:17 PM
> > To: Hailu, Andreas [Engineering] 
> > Cc: user@flink.apache.org
> > Subject: Re: Obtain JobManager Web Interface URL
> >
> > Hi, Hailu
> >
> > AFAIK, the ClusterClient#getWebInterfaceURL has been available since 1.10.
> >
> > Regarding the JobManager web interface, it will be printed in the logs when 
> > starting a native Kubernetes or YARN cluster. In standalone mode, you 
> > configure it yourself [1].
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-and-stopping-a-cluster
> >
> > Best,
> > Yangze Guo
> >
> > On Fri, Jul 30, 2021 at 1:41 AM Hailu, Andreas [Engineering] 
> >  wrote:
> > >
> > > Hi team,
> > >
> > >
> > >
> > > Is there a method available to obtain the JobManager’s REST url? We 
> > > originally overloaded CliFrontend#executeProgram and nabbed it from the 
> > > ClusterClient#getWebInterfaceUrl method, but it seems this method’s 
> > > signature has been changed and no longer available as of 1.10.0.
> > >
> > >
> > >
> > > Best,
> > >
> > > Andreas
> > >
> > >
> > >
> > >
> > > 
> > >
> > > Your Personal Data: We may collect and process information about you
> > > that may be subject to data protection laws. For more information
> > > about how we use and disclose your personal data, how we protect
> > > your information, our legal basis to use your information, your
> > > rights and who you can contact, please refer to:
> > > http://www.gs.com/privacy-notices


Re: Obtain JobManager Web Interface URL

2021-08-01 Thread Yangze Guo
AFAIK, the ClusterClient should not be exposed through the public API.
Would you like to explain your use case and why you need to get the
web UI programmatically?

Best,
Yangze Guo

On Fri, Jul 30, 2021 at 9:54 PM Hailu, Andreas [Engineering]
 wrote:
>
> Hello Yangze, thanks for responding.
>
> I'm attempting to perform this programmatically on YARN, so looking at a log 
> just won't do :) What's the appropriate way to get an instance of a 
> ClusterClient? Do you know of any examples I can look at?
>
> // ah
>
> -Original Message-
> From: Yangze Guo 
> Sent: Thursday, July 29, 2021 11:17 PM
> To: Hailu, Andreas [Engineering] 
> Cc: user@flink.apache.org
> Subject: Re: Obtain JobManager Web Interface URL
>
> Hi, Hailu
>
> AFAIK, the ClusterClient#getWebInterfaceURL has been available since 1.10.
>
> Regarding the JobManager web interface, it will be printed in the logs when 
> starting a native Kubernetes or YARN cluster. In standalone mode, you 
> configure it yourself [1].
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-and-stopping-a-cluster
>
> Best,
> Yangze Guo
>
> On Fri, Jul 30, 2021 at 1:41 AM Hailu, Andreas [Engineering] 
>  wrote:
> >
> > Hi team,
> >
> >
> >
> > Is there a method available to obtain the JobManager’s REST url? We 
> > originally overloaded CliFrontend#executeProgram and nabbed it from the 
> > ClusterClient#getWebInterfaceUrl method, but it seems this method’s 
> > signature has been changed and no longer available as of 1.10.0.
> >
> >
> >
> > Best,
> >
> > Andreas
> >
> >
> >
> >
> > 
> >
> > Your Personal Data: We may collect and process information about you
> > that may be subject to data protection laws. For more information
> > about how we use and disclose your personal data, how we protect your
> > information, our legal basis to use your information, your rights and
> > who you can contact, please refer to:
> > http://www.gs.com/privacy-notices


Re: Obtain JobManager Web Interface URL

2021-07-29 Thread Yangze Guo
Hi, Hailu

AFAIK, the ClusterClient#getWebInterfaceURL has been available since 1.10.

Regarding the JobManager web interface, it will be printed in the logs
when starting a native Kubernetes or YARN cluster. In standalone mode,
you configure it yourself [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-and-stopping-a-cluster

Best,
Yangze Guo

On Fri, Jul 30, 2021 at 1:41 AM Hailu, Andreas [Engineering]
 wrote:
>
> Hi team,
>
>
>
> Is there a method available to obtain the JobManager’s REST url? We 
> originally overloaded CliFrontend#executeProgram and nabbed it from the 
> ClusterClient#getWebInterfaceUrl method, but it seems this method’s signature 
> has been changed and no longer available as of 1.10.0.
>
>
>
> Best,
>
> Andreas
>
>
>
>
> 
>


Re: TaskManager crash after cancelling a job

2021-07-28 Thread Yangze Guo
In your case, the entry point is the `cleanUpInvoke` function called
by `StreamTask#invoke`.

@ro...@apache.org Could you take another look at this?

Best,
Yangze Guo

On Thu, Jul 29, 2021 at 2:29 AM Ivan Yang  wrote:
>
> Hi Yangze,
>
> I deployed 1.13.1 and the same problem exists. It seems that the cancel logic 
> has changed since 1.11.0 (which we had been running for almost a year). In 
> 1.11.0, during cancellation, we saw some subtasks stay in the cancelling 
> state for some time, but eventually the job was cancelled and no task 
> managers were lost, so we could start the job again right away. In the new 
> version 1.13.x, it kills the task managers on which those stuck subtasks 
> were running, and it then takes another 4-5 minutes for the task managers to 
> rejoin. Can you point me to the code that manages the job cancellation 
> routine? I want to understand the logic there.
>
> Thanks,
> Ivan
>
> > On Jul 26, 2021, at 7:22 PM, Yangze Guo  wrote:
> >
> > Hi, Ivan
> >
> > My gut feeling is that it is related to FLINK-22535. Could @Yun Gao
> > take another look? If that is the case, you can upgrade to 1.13.1.
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Jul 27, 2021 at 9:41 AM Ivan Yang  wrote:
> >>
> >> Dear Flink experts,
> >>
> >> We recently ran into an issue during job cancellation after upgrading to 
> >> 1.13. After we issue a cancel (from the Flink console or flink cancel 
> >> {jobid}), a few subtasks get stuck in the cancelling state. Once it gets 
> >> into that situation, the behavior is consistent: those “cancelling” tasks 
> >> never become canceled. After 3 minutes, the job stops and, as a result, a 
> >> number of task managers are lost. It takes about another 5 minutes for 
> >> those lost task managers to rejoin the job manager. Then we can restart the 
> >> job from the previous checkpoint. We found the following exception on the 
> >> hanging (cancelling) task manager.
> >> ==
> >> sun.misc.Unsafe.park(Native Method)
> >> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> >> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> >> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> >> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> >> java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:705)
> >> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:637)
> >> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
> >> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> >> java.lang.Thread.run(Thread.java:748)
> >> ===
> >>
> >> Here is some background information about our job and setup.
> >> 1) The job is relatively large: we have 500+ parallelism and 2000+ 
> >> subtasks. It mainly reads from a Kinesis stream, performs some 
> >> transformations, and fans out to multiple output S3 buckets. It’s a 
> >> stateless ETL job.
> >> 2) The same code and setup running on smaller environments don’t seem to 
> >> have this cancel failure problem.
> >> 3) We have been using Flink 1.11.0 for the same job and have never seen 
> >> this cancel failure or the task managers being killed.
> >> 4) When upgrading to 1.13, we also added Kubernetes HA (ZooKeeper-less). 
> >> Previously we did not use HA.
> >>
> >> Cancelling and restarting from the previous checkpoint is our regular 
> >> procedure to support daily operations. This 10-minute TM restart cycle 
> >> basically slows down our throughput. I am trying to understand what leads 
> >> to this situation, hoping that some configuration change will smooth things 
> >> out. Any suggestion to shorten the wait is also welcome: it appears that 
> >> some timeout on the TaskManager and JobManager can be adjusted to speed it 
> >> up. But we really want to avoid getting stuck in cancellation if we can.
> >>
> >> Thank you, hoping to get some insight here.
> >>
> >> Ivan
>


Re: TaskManager crash after cancelling a job

2021-07-26 Thread Yangze Guo
Hi, Ivan

My gut feeling is that it is related to FLINK-22535. Could @Yun Gao
take another look? If that is the case, you can upgrade to 1.13.1.

Best,
Yangze Guo

On Tue, Jul 27, 2021 at 9:41 AM Ivan Yang  wrote:
>
> Dear Flink experts,
>
> We recently ran into an issue during job cancellation after upgrading to 
> 1.13. After we issue a cancel (from the Flink console or flink cancel 
> {jobid}), a few subtasks get stuck in the cancelling state. Once it gets into 
> that situation, the behavior is consistent: those “cancelling” tasks never 
> become canceled. After 3 minutes, the job stops and, as a result, a number of 
> task managers are lost. It takes about another 5 minutes for those lost task 
> managers to rejoin the job manager. Then we can restart the job from the 
> previous checkpoint. We found the following exception on the hanging 
> (cancelling) task manager.
> ==
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:705)
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:637)
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> java.lang.Thread.run(Thread.java:748)
> ===
>
> Here is some background information about our job and setup.
> 1) The job is relatively large: we have 500+ parallelism and 2000+ subtasks. 
> It mainly reads from a Kinesis stream, performs some transformations, and 
> fans out to multiple output S3 buckets. It’s a stateless ETL job.
> 2) The same code and setup running on smaller environments don’t seem to have 
> this cancel failure problem.
> 3) We have been using Flink 1.11.0 for the same job and have never seen this 
> cancel failure or the task managers being killed.
> 4) When upgrading to 1.13, we also added Kubernetes HA (ZooKeeper-less). 
> Previously we did not use HA.
>
> Cancelling and restarting from the previous checkpoint is our regular 
> procedure to support daily operations. This 10-minute TM restart cycle 
> basically slows down our throughput. I am trying to understand what leads to 
> this situation, hoping that some configuration change will smooth things out. 
> Any suggestion to shorten the wait is also welcome: it appears that some 
> timeout on the TaskManager and JobManager can be adjusted to speed it up. But 
> we really want to avoid getting stuck in cancellation if we can.
>
> Thank you, hoping to get some insight here.
>
> Ivan


Re: kerberos token expire

2021-07-05 Thread Yangze Guo
The ticket cache expires at the end of its lifetime. Since you provide
a keytab, you can try setting security.kerberos.login.use-ticket-cache
to false.
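
For illustration, the change would look roughly like this in
flink-conf.yaml. The keytab path and principal are copied from the
configuration quoted below; only the first line differs. Treat it as a
sketch, not a verified setup:

```yaml
# Do not take credentials from the ticket cache, which expires after its lifetime.
security.kerberos.login.use-ticket-cache: false
# Log in from the keytab instead (values taken from the original post).
security.kerberos.login.keytab: /var/kerberos/krb5/user/infa.keytab
security.kerberos.login.principal: infa
```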

Best,
Yangze Guo

On Tue, Jul 6, 2021 at 10:02 AM 谢扬成  wrote:
>
> Hi,
>
> I process data with Flink 1.12.2. The source reads from Kafka and, after 
> some processing logic, the job writes to HDFS in Parquet format. The Hadoop 
> cluster has Kerberos authentication enabled.
> flink-conf.yaml looks like this:
> security.kerberos.login.use-ticket-cache: true
> security.kerberos.login.keytab: /var/kerberos/krb5/user/infa.keytab
> security.kerberos.login.principal: infa
> The ticket expires after 24 hours and the renew config is set to false.
>
> The Flink job runs normally and correctly, but after 24 hours it throws an 
> exception about an expired token or something similar. The exception stack is 
> below:
> 2021-06-21 07:57:29,124 WARN  org.apache.hadoop.ipc.Client
>  [] - Exception encountered while connecting to the server : 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
> 2021-06-21 07:57:29,125 INFO  com.citics.flink.parquet.sink.ParquetBulkSink   
>  [] - activeNameNode->null
> 2021-06-21 07:57:29,127 WARN  org.apache.hadoop.ipc.Client
>  [] - Exception encountered while connecting to the server : 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
> 2021-06-21 07:57:29,128 INFO  com.citics.flink.parquet.sink.ParquetBulkSink   
>  [] - 
> realDataPath->hdfs://nameservice1/warehouse/tablespace/external/hive/edm_realtime.db/rtdw_dwd_hma_records_part
> 2021-06-21 07:57:29,138 WARN  org.apache.hadoop.ipc.Client
>  [] - Exception encountered while connecting to the server : 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
> 2021-06-21 07:57:29,139 INFO  com.citics.flink.parquet.sink.ParquetBulkSink   
>  [] - activeNameNode: null
> 2021-06-21 07:57:29,140 WARN  org.apache.hadoop.ipc.Client
>  [] - Exception encountered while connecting to the server : 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
> 2021-06-21 07:57:29,141 WARN  org.apache.hadoop.ipc.Client
>  [] - Exception encountered while connecting to the server : 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
> 2021-06-21 07:57:29,142 WARN  org.apache.flink.runtime.taskmanager.Task   
>  [] - Source: Custom Source -> Map -> Sink: Parquet Sink 
> (1/1)#155020 (42bffc050e7ed72ca555c7cd92505404) switched from RUNNING to 
> FAILED.
> java.io.IOException: Committing during recovery failed: Could not access 
> status of source file.
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commitAfterRecovery(HadoopRecoverableFsDataOutputStream.java:281)
>  ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedPendingFile.commitAfterRecovery(OutputStreamBasedPartFileWriter.java:218)
>  ~[flink-table-blink_2.11-1.12.2.jar:1.12.2]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160)
>  ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127)
>  ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:466)
>  ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67)
>  ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:192)
>  ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:179)
>  ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at 
> org.apache.flink.streaming.api.functions.sink.files

Re: Yarn doesn't deploy multple TMs; -yn option missing in newer versions

2021-07-05 Thread Yangze Guo
Hi, Marzi,

> But only one Task manager spins up and even when I submit a second job which 
> requires more resources, The job is stuck in SCHEDULED state and no 
> additional TMs get automatically assigned.
Could you check whether there are enough resources in your YARN
cluster? The JM log would also help to figure out the root cause.

> Also I have tried to specify the number of TM when starting the yarn session 
> like below, but the session starts with only 1 TM.
The "-n" or "-yn" option has been dropped in Flink 1.12. FLINK-15959 [1]
might serve as a replacement, but work on it has not started yet. As a
workaround for testing purposes, you can submit a warm-up job (e.g.
WordCount with the required parallelism) and increase
"slotmanager.taskmanager-timeout" so that the TMs do not time out
quickly.

[1] http://issues.apache.org/jira/browse/FLINK-15959
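
A sketch of the timeout part of that workaround in flink-conf.yaml. The
value is only an example; it is in milliseconds and, as far as I know,
the default is 30000. I believe newer documentation lists this option
as resourcemanager.taskmanager-timeout, with the name below kept as a
deprecated alias, so please check the configuration page for your
version:

```yaml
# Keep idle TaskManagers around for 10 minutes instead of releasing them quickly.
# 600000 ms is an illustrative value, not a recommendation.
slotmanager.taskmanager-timeout: 600000
```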

Best,
Yangze Guo

On Tue, Jul 6, 2021 at 8:41 AM Marzi K  wrote:
>
> Hi All,
>
> I am exploring running Flink 1.12.0 on yarn and so far I have been able to 
> start a yarn session and submit a job. But only one Task manager spins up and 
> even when I submit a second job which requires more resources, The job is 
> stuck in SCHEDULED state and no additional TMs get automatically assigned.
> Any pointers as why this is the case?
> Also I have tried to specify the number of TM when starting the yarn session 
> like below, but the session starts with only 1 TM.
> ./bin/yarn-session.sh --queue  -n 4 -jm 1024 -tm 4096
>
> The older Flink version had the -yn option in the below command but it’s 
> missing from newer versions and I don’t see any replacement for it:
> ./bin/flink run -m yarn-cluster -p10 -yn 5 -yjm 1024 -ytm 4069 -yqu 
>  examples/batch/wordCount.jar
>
> Would appreciate any pointers, as this is preventing me from testing HA 
> performance on YARN.
>
> Best,
> Marzi


Re: specify number of TM; how stream app use state of batch app; orc / parquet file format have different impact on tpcds performance benchmark.

2021-07-01 Thread Yangze Guo
> 1.  how to specify the number of TaskManager?
> In batch mode, I tried to use (Max Parallelism / (cores per tm)), but it 
> does not work. Number of TaskManager is much larger than (Max Parallelism / 
> cores per tm).

It is not the cores per TM, but the number of slots per TM. Please refer
to taskmanager.numberOfTaskSlots [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#taskmanager-numberoftaskslots
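
As a worked example with made-up numbers (the parallelism and slot
count below are assumptions for illustration, not taken from your job):
with default slot sharing, a job needs roughly as many slots as its
highest operator parallelism, so the expected TaskManager count is
about ceil(parallelism / slots per TM).

```yaml
# Assumed job parallelism: 500
# Slots offered by each TaskManager:
taskmanager.numberOfTaskSlots: 4
# Expected TaskManagers: ceil(500 / 4) = 125
# With 1 slot per TM, the same job would need about 500 TMs.
```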


Best,
Yangze Guo


On Thu, Jul 1, 2021 at 3:57 PM vtygoss  wrote:
>
> Hi,
>
>
> I have some questions.
>
>
> 1.  How do I specify the number of TaskManagers?
>  In batch mode, I tried to use (Max Parallelism / (cores per tm)), but it 
> does not work. The number of TaskManagers is much larger than (Max 
> Parallelism / cores per tm).
>
> 2.  In my scenario, there is a lot of cumulative data plus streaming 
> incremental data. Is there a way to compute the result over the cumulative 
> data, save the state, and then continue computing on the incremental data 
> using that state?
>
> 3.  In the Flink 3 TB TPC-DS benchmark, I found a strange problem: the ORC / 
> Parquet file format has a significant impact on performance. Am I doing 
> something wrong?
>
>  TPC-DS query1, table: store_returns, num records: 833,763,236, bytes: 
> 80GB+.  Flink task parallelism=500
>
> - using ORC+SNAPPY, it took 10 seconds to read.   picture below
>
> - using PARQUET+SNAPPY, it took 5 min 32 seconds to read.  picture below
>
>
>
>
> There is no special Parquet configuration in 
> $FLINK_HOME/conf/hive-site.xml; hive-site.xml is attached.
>
>
>
> ```
>
> [hive-site.xml]
>
>parquet.memory.pool.ratio=0.5
>
>hive.parquet.timestamp.skip.conversion=true
>
> ```
>
>
> It would be a pleasure to get some suggestions from you, thank you very much!
>
> Best Regards!


Re: FW: Hadoop3 with Flink

2021-06-28 Thread Yangze Guo
Sorry for the belated reply. In 1.12, you just need to make sure that
the HADOOP_CLASSPATH environment variable is set up. For more details,
please refer to [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/

Best,
Yangze Guo

On Mon, Jun 28, 2021 at 5:11 PM V N, Suchithra (Nokia - IN/Bangalore)
 wrote:
>
> Hi,
>
> Can anyone please share inputs on this?
>
>
>
> Regards,
>
> Suchithra
>
>
>
> From: V N, Suchithra (Nokia - IN/Bangalore)
> Sent: Thursday, June 24, 2021 2:35 PM
> To: user@flink.apache.org
> Subject: Hadoop3 with Flink
>
>
>
> Hello,
>
>
>
> We are using Apache flink 1.12.3 and planning to use Hadoop 3 version. Could 
> you please suggest how to use Hadoop 3 with flink distribution.
>
>
>
> Regards,
>
> Suchithra
>
>
>
>


Re: How to set state.backend.rocksdb.latency-track-enabled

2021-06-18 Thread Yangze Guo
Hi, Chen-Che,

IIUC, "state.backend.rocksdb.latency-track-enabled" is just a
rejected alternative that was mistakenly written into the release
notes. Please refer to [1] instead.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backends-latency-tracking-options
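
Going by the options listed on that page, enabling the tracking would
look roughly like this in flink-conf.yaml. I am writing the key names
from memory of the 1.13 documentation, so please verify them against
[1] before relying on them:

```yaml
# Enable latency tracking for keyed state access (off by default).
state.backend.latency-track.keyed-state-enabled: true
# Sample one access out of every N to keep the overhead low
# (100 is, I believe, the default).
state.backend.latency-track.sample-interval: 100
```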

Best,
Yangze Guo

On Fri, Jun 18, 2021 at 3:39 PM Chen-Che Huang  wrote:
>
> Hi,
>
> The 1.13 release note 
> (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) mentions that 
> we can set state.backend.rocksdb.latency-track-enabled to obtain some RocksDB 
> metrics with a marginal impact. However, I couldn't find 
> state.backend.rocksdb.latency-track-enabled in 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/.
>  Based on this PR (https://github.com/apache/flink/pull/15091), it seems that 
> state.backend.rocksdb.latency-track-enabled is related to 
> state.backend.latency-track.keyed-state-enable? Or which option should I set 
> to get these metrics with a marginal impact? Thanks.
>
> Best wishes,
> Chen-Che Huang


Re: after upgrade flink1.12 to flink1.13.1, flink web-ui's taskmanager detail page error

2021-06-17 Thread Yangze Guo
Thanks for the report, Yidan.

It will be fixed in FLINK-23024, hopefully in time for 1.13.2.

Best,
Yangze Guo

On Fri, Jun 18, 2021 at 10:00 AM yidan zhao  wrote:
>
>  Yeah, I also think it is a bug.
>
> On Thu, Jun 17, 2021 at 10:13 PM Arvid Heise  wrote:
> >
> > Hi Yidan,
> >
> > could you check if the bucket exists and is accessible? It seems that this 
> > directory cannot be created: 
> > bos://flink-bucket/flink/ha/opera_upd_FlinkTestJob3/blob.
> >
> > The second issue looks like a bug. I will create a ticket.
> >
> > On Wed, Jun 16, 2021 at 5:21 AM yidan zhao  wrote:
> >>
> >> Does anyone have an idea? Here is another exception stack.
> >>
> >>
> >> Unhandled exception.
> >> org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to serialize the result for RPC call : requestTaskManagerDetailsInfo.
> >>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:404) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$0(AkkaRpcActor.java:360) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_251]
> >>     at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848) ~[?:1.8.0_251]
> >>     at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168) ~[?:1.8.0_251]
> >>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:352) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:319) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1]
> >>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]
> >> Caused by: java.io.NotSerializableException: org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots
> >>     at java.io.ObjectOutputStream.writeOb

Re: Elasticsearch sink connector timeout

2021-06-06 Thread Yangze Guo
Hi, Kai,

I think the exception should be thrown from
RetryRejectedExecutionFailureHandler as you configure the
'failure-handler' to 'retry-rejected'. It will retry the action that
failed with EsRejectedExecutionException and throw all other failures.

AFAIK, there is no way to configure the connection/socket timeout in
the Elasticsearch SQL connector. However, if the root cause is network
jitter, you may increase sink.bulk-flush.backoff.delay and
sink.bulk-flush.backoff.max-retries.


Best,
Yangze Guo

On Sat, Jun 5, 2021 at 2:28 PM Kai Fu  wrote:
>
> After some investigation of the task manager's log, it looks like the 
> exception was raised from the RetryRejectedExecutionFailureHandler path. The 
> related logs are shown below; I am not sure why this happens.
>
>
> 5978 2021-06-05 05:31:31,529 INFO 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler
>  [] - Bulk request 1033 has been cancelled.
> 5979 java.lang.InterruptedException: null
> 5980 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
>  ~[?:1.8.0_272]
> 5981 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  ~[?:1.8.0_272]
> 5982 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) 
> ~[?:1.8.0_272]
> 5983 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1]
> 5984 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5985 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5986 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13. 1]
> 5987 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5988 at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5989 at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 5990 at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 5991 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 5992 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5993 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5994 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5995 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5996 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) 
> [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5997 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
> [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5998 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
> 5999 2021-06-05 05:31:31,530 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler
>  [] - Failed Elasticsearch item request: null
> 6000 java.lang.InterruptedException: null
> 6001 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
>  ~[?:1.8.0_272]
> 6002 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  ~[?:1.8.0_272]
> 6003 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) 
> ~[?:1.8.0_272]
> 6004 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1]
> 6005 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.e

Re: Flink app performance test framework

2021-06-06 Thread Yangze Guo
Hi, Luck,

I may not fully understand your requirements. If you just want to test
the performance of typical streaming jobs on Flink, you can refer to
Nexmark [1]. If you care about performance regressions of your specific
production jobs, I am not aware of such a framework.

[1] https://github.com/nexmark/nexmark


Best,
Yangze Guo

On Sun, Jun 6, 2021 at 7:35 AM luck li  wrote:
>
> Hi flink community,
>
> Is there any test framework that we can use to test Flink job performance?
> We would like to automate the process for regression tests during Flink 
> version upgrades and job performance tests when rolling out new changes to prod.
>
> Any suggestions would be appreciated!
>
> Thank you
> Best regards
> Luck


Re: [ANNOUNCE] Apache Flink 1.13.1 released

2021-05-31 Thread Yangze Guo
Thanks, Dawid for the great work, thanks to everyone involved.

Best,
Yangze Guo

On Mon, May 31, 2021 at 4:14 PM Youngwoo Kim (김영우)  wrote:
>
> Got it.
> Thanks Dawid for the clarification.
>
> - Youngwoo
>
> On Mon, May 31, 2021 at 4:50 PM Dawid Wysakowicz  
> wrote:
>>
>> Hi Youngwoo,
>>
>> Usually we publish the docker images a day after the general release, so
>> that the artifacts are properly distributed across Apache mirrors. You
>> should be able to download the docker images from apache/flink now. It
>> may take a few extra days to have the images published as the official
>> image, as it depends on the maintainers of docker hub.
>>
>> Best,
>>
>> Dawid
>>
>> On 31/05/2021 08:01, Youngwoo Kim (김영우) wrote:
>> > Great work! Thank you Dawid and all of the contributors.
>> > I'm eager to adopt the new release, however can't find docker images for
>> > that from https://hub.docker.com/_/flink
>> >
>> > Hope it'll be available soon.
>> >
>> > Thanks,
>> > Youngwoo
>> >
>> >
>> > On Sat, May 29, 2021 at 1:49 AM Dawid Wysakowicz 
>> > wrote:
>> >
>> >> The Apache Flink community is very happy to announce the release of Apache
>> >> Flink 1.13.1, which is the first bugfix release for the Apache Flink 1.13
>> >> series.
>> >>
>> >> Apache Flink® is an open-source stream processing framework for
>> >> distributed, high-performing, always-available, and accurate data 
>> >> streaming
>> >> applications.
>> >>
>> >> The release is available for download at:
>> >> https://flink.apache.org/downloads.html
>> >>
>> >> Please check out the release blog post for an overview of the improvements
>> >> for this bugfix release:
>> >> https://flink.apache.org/news/2021/05/28/release-1.13.1.html
>> >>
>> >> The full release notes are available in Jira:
>> >>
>> >> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350058
>> >>
>> >> We would like to thank all contributors of the Apache Flink community who
>> >> made this release possible!
>> >>
>> >> Regards,
>> >> Dawid Wysakowicz
>> >>
>>


Re: Heartbeat Timeout

2021-05-27 Thread Yangze Guo
Hi, Robert,

To mitigate this issue, you can increase "heartbeat.interval" and
"heartbeat.timeout". However, I think we should first figure out the
root cause. Could you provide the log of
10.42.0.49:6122-e26293?
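
For reference, a sketch of those two settings in flink-conf.yaml. Both
are in milliseconds; as far as I know the defaults are 10000 and 50000,
and the larger values below are only illustrative:

```yaml
# How often heartbeat requests are sent (ms).
heartbeat.interval: 20000
# How long to wait for a heartbeat before the peer is considered lost (ms).
heartbeat.timeout: 100000
```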

Best,
Yangze Guo

On Thu, May 27, 2021 at 10:44 PM Robert Cullen  wrote:
>
> I have a job that fails after about 1 hour due to a TaskManager timeout. How 
> can I prevent this from happening?
>
> 2021-05-27 10:24:21
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
> at 
> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
> at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
> at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
> at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
> at 
> org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:783)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
> at 
> org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
> at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
> at 
> java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
> at 
> java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
> at 
> org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:385)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:361)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:497)
> at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1295)
> at 
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundRece

Re: DataStream API in Batch Mode job is timing out, please advise on how to adjust the parameters.

2021-05-25 Thread Yangze Guo
Hi, Marco,

The root cause is the NoResourceAvailableException. Could you provide the
following information?
- How many slots does each TM have?
- Your job's topology. It would also be good to share the job manager log.

Best,
Yangze Guo

On Tue, May 25, 2021 at 12:10 PM Marco Villalobos
 wrote:
>
> I am running with one job manager and three task managers.
>
> Each task manager is receiving at most 8 GB of data, but the job is timing 
> out.
>
> What parameters must I adjust?
>
> Sink: back fill db sink) (15/32) (50626268d1f0d4c0833c5fa548863abd) switched 
> from SCHEDULED to FAILED on [unassigned resource].
> java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Slot request bulk is not fulfillable! Could not allocate the required slot 
> within slot request timeout
> at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>  ~[?:1.8.0_282]
> at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>  ~[?:1.8.0_282]
> at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607) 
> ~[?:1.8.0_282]
> at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>  ~[?:1.8.0_282]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>  ~[?:1.8.0_282]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>  ~[?:1.8.0_282]
> at 
> org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:223)
>  ~[feature-LUM-3882-toledo--850a6747.jar:?]
> at 
> org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:168)
>  ~[feature-LUM-3882-toledo--850a6747.jar:?]
> at 
> org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86)
>  ~[feature-LUM-3882-toledo--850a6747.jar:?]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66)
>  ~[feature-LUM-3882-toledo--850a6747.jar:?]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91)
>  ~[feature-LUM-3882-toledo--850a6747.jar:?]
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> ~[?:1.8.0_282]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_282]
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
>  ~[feature-LUM-3882-toledo--850a6747.jar:?]
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
>  ~[feature-LUM-3882-toledo--850a6747.jar:?]
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>  ~[feature-LUM-3882-toledo--850a6747.jar:?]
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
>  ~[feature-LUM-3882-toledo--850a6747.jar:?]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
> [feature-LUM-3882-toledo--850a6747.jar:?]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
> [feature-LUM-3882-toledo--850a6747.jar:?]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
> [feature-LUM-3882-toledo--850a6747.jar:?]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
> [feature-LUM-3882-toledo--850a6747.jar:?]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
> [feature-LUM-3882-toledo--850a6747.jar:?]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
> [feature-LUM-3882-toledo--850a6747.jar:?]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
> [feature-LUM-3882-toledo--850a6747.jar:?]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
> [feature-LUM-3882-toledo--850a6747.jar:?]
> at akka.actor.Actor.aroundReceive(Actor.scala:517) 
> [feature-LUM-3882-toledo--850a6747.jar:?]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515) 
> [feature-LUM-3882-toledo--850a6747.jar:?]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
> [feature-LUM-3882-toledo--850a6747.jar:?]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> [feature-LUM-3882-toledo--850a6747.jar:?]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> [feature-LUM-3882-toledo--850a6747.jar:?]
> at 

Re: ES sink never receive error code

2021-05-24 Thread Yangze Guo
Jacky is right. It's a known issue and will be fixed in FLINK-21511.

Best,
Yangze Guo

On Tue, May 25, 2021 at 8:40 AM Jacky Yin 殷传旺  wrote:
>
> If you are using es connector 6.*, there is actually a deadlock bug when 
> backoff is enabled. The 'retry' and 'flush' operations share one thread pool 
> that has only one thread. Sometimes the one holding the thread tries to get 
> the semaphore, which is held by the one that is trying to get the thread. 
> Therefore, please upgrade to connector 7.*.
>
> 
> From: Qihua Yang 
> Sent: May 24, 2021 23:17
> To: Yangze Guo 
> Cc: ro...@apache.org ; user 
> Subject: Re: ES sink never receive error code
>
> Got it! thanks for helping.
>
> On Thu, May 20, 2021 at 7:15 PM Yangze Guo  wrote:
>
> > So, ES BulkProcessor retried after bulk request was partially rejected. And 
> > eventually that request was sent successfully? That is why failure handler 
> > was not called?
>
> If the bulk request fails after the max number of retries
> (bulk.flush.backoff.retries), the failure handler will still be
> called.
>
>
> Best,
> Yangze Guo
>
> On Fri, May 21, 2021 at 5:53 AM Qihua Yang  wrote:
> >
> > Thank you for the reply!
> > Yes, we did config bulk.flush.backoff.enable.
> > So, ES BulkProcessor retried after bulk request was partially rejected. And 
> > eventually that request was sent successfully? That is why failure handler 
> > was not called?
> >
> > Thanks,
> > Qihua
> >
> > On Thu, May 20, 2021 at 2:22 PM Roman Khachatryan  wrote:
> >>
> >> Hi,
> >>
> >> Have you tried to change bulk.flush.backoff.enable?
> >> According to the docs [1], the underlying ES BulkProcessor will retry
> >> (by default), so the provided failure handler might not be called.
> >>
> >> [1]
> >> https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor
> >>
> >> Regards,
> >> Roman
> >>
> >> On Thu, May 20, 2021 at 10:08 PM Qihua Yang  wrote:
> >> >
> >> > Hello,
> >> > We are using flink-connector-elasticsearch6_2.11 to ingest stream data 
> >> > to ES using bulk requests. From the ES metrics, we observed some bulk 
> >> > thread pool rejections. We contacted the AWS team; their explanation is 
> >> > that part of the bulk request was rejected. The response body should 
> >> > include a status for each item. For a bulk thread pool rejection, the 
> >> > error code is 429.
> >> > Our Flink app overrides FailureHandler to process error cases.
> >> > I checked the Flink code; it has an AfterBulk() method to handle item errors. 
> >> > FailureHandler() never received any 429 error.
> >> > Is this a Flink issue, or do we need to configure something to make it work?
> >> > Thanks,
> >> >
> >> > Qihua


Re: Issues with forwarding environment variables

2021-05-20 Thread Yangze Guo
Hi, Milind

Could you share the skeleton of your job code? If you implement a
custom function, like Tokenizer in the WordCount example, its class
members are initialized on the client side and then serialized and
shipped to the task manager. As a result, neither the system envs nor
the system properties of the TaskManager will be used.

If that is the case, you can initialize the `serviceName` field in the
map/flatMap or open function. Then it will read the TM's envs or
properties instead.
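
To illustrate the difference, here is a minimal sketch in plain Java (no Flink dependency). It assumes Java serialization as a stand-in for how the function is shipped, and it reads a system property instead of an environment variable, because environment variables cannot be changed inside a running JVM. The class names and the open() method are hypothetical; open() only mimics the role of the open function mentioned above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class ClientVsTaskManagerDemo {

    // Hypothetical stand-in for a user function; this is not a Flink class.
    static final class ServiceNameFunction implements Serializable {
        private static final long serialVersionUID = 1L;

        // Evaluated where the object is constructed (the client), then shipped as-is.
        private final String eagerName = System.getProperty("SERVICE_NAME");

        // Not serialized; filled in by open() in the process that runs the function.
        private transient String lazyName;

        // Mimics an open function: called after the object has been deserialized.
        void open() {
            lazyName = System.getProperty("SERVICE_NAME");
        }

        String eagerName() { return eagerName; }
        String lazyName() { return lazyName; }
    }

    // Serializes and deserializes the function, like shipping it to another process.
    static ServiceNameFunction ship(ServiceNameFunction fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ServiceNameFunction) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    // Returns {value seen by the field initializer, value seen by open()}.
    static String[] run() {
        System.setProperty("SERVICE_NAME", "client-value"); // simulated client side
        ServiceNameFunction onClient = new ServiceNameFunction();

        System.setProperty("SERVICE_NAME", "tm-value");     // simulated TaskManager side
        ServiceNameFunction onTaskManager = ship(onClient);
        onTaskManager.open();

        return new String[] {onTaskManager.eagerName(), onTaskManager.lazyName()};
    }

    public static void main(String[] args) {
        String[] names = run();
        System.out.println("field initializer saw: " + names[0]);
        System.out.println("open() saw:            " + names[1]);
    }
}
```

In this sketch the field initializer keeps the value from the simulated client, while open() picks up the value of the process that actually runs the function.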

Best,
Yangze Guo


On Fri, May 21, 2021 at 5:40 AM Milind Vaidya  wrote:
>
> This is java code. I have a flink job running and it is trying to fetch this 
> variable at run time itself. I see the properties getting reflected in the 
> logs as already mentioned but not visible from the code.
>
> On Thu, May 20, 2021 at 1:53 PM Roman Khachatryan  wrote:
>>
>> > private String serviceName = System.getenv("SERVICE_NAME");
>> Is it a scala object? If so, it can be initialized before any
>> properties are set.
>> What happens if the variable/property is read later at run time?
>>
>> Regards,
>> Roman
>>
>> On Thu, May 20, 2021 at 10:41 PM Milind Vaidya  wrote:
>> >
>> > here are the entries from taskmanager logs
>> >
>> > 2021-05-20 13:34:13,739 INFO 
>> > org.apache.flink.configuration.GlobalConfiguration - Loading configuration 
>> > property: env.java.opts.taskmanager, 
>> > "-DSERVICE_NAME=hello-test,-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
>> > 2021-05-20 13:34:13,740 INFO 
>> > org.apache.flink.configuration.GlobalConfiguration - Loading configuration 
>> > property: jobmanager.execution.failover-strategy, region
>> > 2021-05-20 13:34:13,742 INFO 
>> > org.apache.flink.configuration.GlobalConfiguration - Loading configuration 
>> > property: containerized.taskmanager.env.SERVICE_NAME, "hello-test"
>> > 2021-05-20 13:34:13,743 INFO 
>> > org.apache.flink.configuration.GlobalConfiguration - Loading configuration 
>> > property: containerized.master.env.SERVICE_NAME, "hello-test"
>> >
>> > But the error still persists
>> >
>> >
>> > On Thu, May 20, 2021 at 1:20 PM Roman Khachatryan  wrote:
>> >>
>> >> Thanks, it should work. I've created a ticket to track the issue [1].
>> >> Could you please specify Flink and Yarn versions you are using?
>> >>
>> >> You can also use properties (which don't depend on Yarn integration),
>> >> for example like this:
>> >> In flink-conf.yaml: env.java.opts.taskmanager: -DSERVICE_NAME=...
>> >> In the application: System.getProperty("SERVICE_NAME");
>> >>
>> >> Regards,
>> >> Roman
>> >>
>> >> On Thu, May 20, 2021 at 9:50 PM Milind Vaidya  wrote:
>> >> >
>> >> >
>> >> > Hi Roman,
>> >> >
>> >> > I have added following lines to conf/flink-conf.yaml
>> >> >
>> >> > containerized.taskmanager.env.SERVICE_NAME: "test_service_name"
>> >> > containerized.master.env.SERVICE_NAME: "test_service_name"
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > On Thu, May 20, 2021 at 12:30 PM Roman Khachatryan  
>> >> > wrote:
>> >> >>
>> >> >> Hi,
>> >> >>
>> >> >> Could you please share the relevant parts of your flink-conf.yaml?
>> >> >>
>> >> >> Regards,
>> >> >> Roman
>> >> >>
>> >> >> On Thu, May 20, 2021 at 9:13 PM Milind Vaidya  
>> >> >> wrote:
>> >> >> >
>> >> >> > Hi
>> >> >> >
>> >> >> > Need to forward a few env variables to Job and Task manager.
>> >> >> > I am running jobs in Yarn cluster
>> >> >> > I was referring to this : Forwarding
>> >> >> >
>> >> >> > I also found Stack Overflow
>> >> >> >
>> >> >> > I was able to configure and see the variables in Flink Dashboard
>> >> >> >
>> >> >> > But the task manager logs still say
>> >> >> >
>> >> >> > `The system environment variable SERVICE_NAME is missing` as an 
>> >> >> > exception message.
>> >> >> >
>> >> >> > The code trying to fetch it is as follows
>> >> >> >
>> >> >> > private String serviceName = System.getenv("SERVICE_NAME");
>> >> >> >
>> >> >> > Is the fetched variable not the same as the one that was set? How do I 
>> >> >> > set / fetch environment variables in such a case?
>> >> >> >


Re: ES sink never receive error code

2021-05-20 Thread Yangze Guo
> So, ES BulkProcessor retried after bulk request was partially rejected. And 
> eventually that request was sent successfully? That is why failure handler 
> was not called?

If the bulk request fails after the max number of retries
(bulk.flush.backoff.retries), the failure handler will still be
called.
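
As a rough illustration of that behavior, here is a simplified model in plain Java. It is only a sketch of "retry up to a maximum, then call the failure handler"; it is not the Elasticsearch BulkProcessor or the Flink connector code, and the class and method names are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Illustrative only: a simplified model, not the real BulkProcessor logic.
class RetryThenFail {

    /**
     * Tries the request once, then retries up to maxRetries more times.
     * The failure handler is invoked only if every attempt fails.
     * Returns the number of attempts that were made.
     */
    static int send(BooleanSupplier request, int maxRetries, Consumer<String> failureHandler) {
        int attempts = 0;
        while (attempts <= maxRetries) {
            attempts++;
            if (request.getAsBoolean()) {
                // Success: the handler never sees the earlier rejections.
                return attempts;
            }
            // A real client would wait here according to its backoff policy.
        }
        failureHandler.accept("request failed after " + maxRetries + " retries");
        return attempts;
    }

    public static void main(String[] args) {
        List<String> failures = new ArrayList<>();

        // Rejected twice, accepted on the third attempt: handler is not called.
        int[] calls = {0};
        int attempts = send(() -> ++calls[0] >= 3, 5, failures::add);
        System.out.println("attempts=" + attempts + ", failures=" + failures);

        // Always rejected: handler is called once the retries are used up.
        attempts = send(() -> false, 2, failures::add);
        System.out.println("attempts=" + attempts + ", failures=" + failures);
    }
}
```

The first case corresponds to what Qihua observed: the request is rejected at first but eventually succeeds, so the handler stays silent.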


Best,
Yangze Guo

On Fri, May 21, 2021 at 5:53 AM Qihua Yang  wrote:
>
> Thank you for the reply!
> Yes, we did config bulk.flush.backoff.enable.
> So, ES BulkProcessor retried after bulk request was partially rejected. And 
> eventually that request was sent successfully? That is why failure handler 
> was not called?
>
> Thanks,
> Qihua
>
> On Thu, May 20, 2021 at 2:22 PM Roman Khachatryan  wrote:
>>
>> Hi,
>>
>> Have you tried to change bulk.flush.backoff.enable?
>> According to the docs [1], the underlying ES BulkProcessor will retry
>> (by default), so the provided failure handler might not be called.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor
>>
>> Regards,
>> Roman
>>
>> On Thu, May 20, 2021 at 10:08 PM Qihua Yang  wrote:
>> >
>> > Hello,
>> > We are using flink-connector-elasticsearch6_2.11 to ingest stream data to 
>> > ES using bulk requests. From the ES metrics, we observed some bulk thread 
>> > pool rejections. We contacted the AWS team; their explanation is that part 
>> > of the bulk request was rejected. The response body should include a status 
>> > for each item. For a bulk thread pool rejection, the error code is 429.
>> > Our Flink app overrides FailureHandler to process error cases.
>> > I checked the Flink code; it has an AfterBulk() method to handle item errors. 
>> > FailureHandler() never received any 429 error.
>> > Is this a Flink issue, or do we need to configure something to make it work?
>> > Thanks,
>> >
>> > Qihua


Re: Root Exception can not be shown on Web UI in Flink 1.13.0

2021-05-12 Thread Yangze Guo
Hi, it seems to be related to FLINK-22276, so I'm looping in Matthias
to take a look.

@Matthias My gut feeling is that not every execution that has a
failureInfo has been deployed?

Best,
Yangze Guo

On Wed, May 12, 2021 at 10:12 PM Gary Wu  wrote:
>
> Hi,
>
> We have upgraded our Flink applications to 1.13.0, but we found that the Root 
> Exception cannot be shown on the Web UI; an internal server error message is 
> shown instead. After opening the browser's developer console and tracing the 
> message, we found that there is an exception in the jobmanager:
>
> 2021-05-12 13:30:45,589 ERROR 
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler [] - Unhandled 
> exception.
> java.lang.IllegalArgumentException: The location must not be null for a 
> non-global failure.
> at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) 
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.assertLocalExceptionInfo(JobExceptionsHandler.java:218)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createRootExceptionInfo(JobExceptionsHandler.java:191)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) 
> ~[?:?]
> at java.util.stream.SliceOps$1$1.accept(SliceOps.java:199) ~[?:?]
> at 
> java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1632) 
> ~[?:?]
> at 
> java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
>  ~[?:?]
> at 
> java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
>  ~[?:?]
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488) 
> ~[?:?]
> at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) 
> ~[?:?]
> at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) 
> ~[?:?]
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) 
> ~[?:?]
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) 
> ~[?:?]
> at 
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createJobExceptionHistory(JobExceptionsHandler.java:169)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createJobExceptionsInfo(JobExceptionsHandler.java:154)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.handleRequest(JobExceptionsHandler.java:101)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.handleRequest(JobExceptionsHandler.java:63)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$0(AbstractExecutionGraphHandler.java:87)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
>  [?:?]
> at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>  [?:?]
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
> at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>  [?:?]
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  [?:?]
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  [?:?]
> at java.lang.Thread.run(Thread.java:834) [?:?]
>
> We would like to check whether any configuration change is needed for 
> the application. Thanks!
>
> Regards,
> -Gary


Re: Customized Metric Reporter can not be found by Flink

2021-05-11 Thread Yangze Guo
Hi, Fan

Flink loads custom reporters through the service loader mechanism [1].
Did you add the service file in the "resources/META-INF/services" directory?

[1] https://docs.oracle.com/javase/9/docs/api/java/util/ServiceLoader.html
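
For reference, a minimal sketch of how java.util.ServiceLoader behaves, in plain Java. The interface and implementation below are hypothetical stand-ins, not Flink's reporter types. The point is that an implementation on the classpath is only discovered if a file under META-INF/services, named after the interface's binary name, lists the implementation's binary name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

class ReporterLookup {

    // Hypothetical plugin interface, standing in for a reporter factory type.
    public interface ReporterFactory {
        String name();
    }

    // An implementation that is on the classpath but not registered in any
    // META-INF/services file.
    public static final class DiagnosticsFactory implements ReporterFactory {
        @Override
        public String name() {
            return "diagnostics";
        }
    }

    // Returns the names of all factories that ServiceLoader can discover.
    static List<String> discover() {
        List<String> names = new ArrayList<>();
        for (ReporterFactory factory : ServiceLoader.load(ReporterFactory.class)) {
            names.add(factory.name());
        }
        return names;
    }

    public static void main(String[] args) {
        // Without a matching service file, DiagnosticsFactory is not found
        // even though the class itself is loadable.
        System.out.println("discovered factories: " + discover());
    }
}
```

Run as-is, with no service file on the classpath, discover() finds nothing. That is consistent with a factory missing from the "Available factories" list in the warning.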

Best,
Yangze Guo

On Wed, May 12, 2021 at 7:53 AM Fan Xie  wrote:
>
> Hi Flink Community,
>
> Recently I implemented a customized metric reporter (named 
> DiagnosticsMessageReporter) to report Flink metrics to a Kafka topic. I built 
> this reporter into a jar file and copied it to 
> /opt/flink/plugins/DiagnosticsMessageReporter/DiagnosticsMessageReporter.jar 
> in both the job manager's and the task managers' containers. Later on I found 
> the following logs, which indicate that the metric reporter cannot be loaded:
>
> 2021-05-11 23:08:31,523 WARN  org.apache.flink.runtime.metrics.ReporterSetup  
>  [] - The reporter factory 
> (org.apache.flink.metrics.reporter.DiagnosticsMessageReporterFactory) could 
> not be found for reporter DiagnosticsMessageReporter. Available factories: 
> [org.apache.flink.metrics.datadog.DatadogHttpReporterFactory, 
> org.apache.flink.metrics.slf4j.Slf4jReporterFactory, 
> org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory, 
> org.apache.flink.metrics.graphite.GraphiteReporterFactory, 
> org.apache.flink.metrics.statsd.StatsDReporterFactory, 
> org.apache.flink.metrics.prometheus.PrometheusReporterFactory, 
> org.apache.flink.metrics.jmx.JMXReporterFactory, 
> org.apache.flink.metrics.influxdb.InfluxdbReporterFactory].
> 2021-05-11 23:21:55,698 INFO  
> org.apache.flink.runtime.metrics.MetricRegistryImpl  [] - No metrics 
> reporter configured, no metrics will be exposed/reported.
>
> The Flink configs I used are as follows:
>
> #DiagnosticsMessageReporter configs
> metrics.reporters: DiagnosticsMessageReporter
> metrics.reporter.DiagnosticsMessageReporter.factory.class: 
> org.apache.flink.metrics.reporter.DiagnosticsMessageReporterFactory
> metrics.reporter.DiagnosticsMessageReporter.bootstrap.servers: kafka:9092
> metrics.reporter.DiagnosticsMessageReporter.topic: flink-metrics
> metrics.reporter.DiagnosticsMessageReporter.keyBy: task_attempt_id
> metrics.reporter.DiagnosticsMessageReporter.interval: 1 SECONDS
>
> Does anyone have any idea about what happened here? Am I missing some of the 
> steps to load the customized reporter as a plugin? I would really appreciate 
> it if someone could take a look at this!
>
> Best,
> Fan
>


Re: Session mode on Kubernetes and # of TMs

2021-05-11 Thread Yangze Guo
Hi, Youngwoo

In a K8S session, the number of TMs depends on how many slots your job
needs and the number of slots per task manager (config key:
taskmanager.numberOfTaskSlots). In this case,

# of TM  = Ceil(total slots needed / taskmanager.numberOfTaskSlots)

How many slots your job needs depends on its topology and parallelism.
For streaming SQL, the whole job graph is placed in one slot by
default. So, the number of slots is equal to the parallelism you set.
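
The formula above can be written out as a small Java helper. This is just the arithmetic, with made-up class and method names; it does not query a cluster or read any Flink configuration.

```java
class TaskManagerCount {

    // Ceil(totalSlotsNeeded / slotsPerTaskManager) using integer arithmetic.
    // slotsPerTaskManager corresponds to taskmanager.numberOfTaskSlots.
    static int required(int totalSlotsNeeded, int slotsPerTaskManager) {
        if (totalSlotsNeeded < 0 || slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slot counts must be positive");
        }
        return (totalSlotsNeeded + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // A streaming SQL job with parallelism 5 needs 5 slots in total.
        System.out.println(required(5, 1)); // one slot per TM
        System.out.println(required(5, 2)); // two slots per TM
        System.out.println(required(8, 4)); // four slots per TM
    }
}
```

For example, a job that needs 5 slots gets 5 TMs with one slot each, but only 3 TMs when each TM offers two slots.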

> Is it possible to run pre-spawned TMs for session mode? I'm looking for a way 
> to scale the computing resources. i.e., # of TM for the jobs.

I might not fully understand your problem. Do you mean starting TMs
before submitting the job? If that is the case,
- You can try the standalone k8s mode. [1]
- You can warm up the session by submitting some dummy jobs yourself and
then submit your job before those TMs reach their idle timeout.
- In FLINK-15959, we will introduce a minimum number of slots for the
cluster. With this feature, you can configure how many TMs are needed
before submitting the jobs.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/

Best,
Yangze Guo

On Tue, May 11, 2021 at 12:24 PM Youngwoo Kim (김영우)  wrote:
>
> Hi,
>
> I have deployed a cluster with session mode on kubernetes and I can see one 
> deployment, services and one JM. I'm trying to run a SQL query through sql 
> client. for instance, 'INSERT INTO ... SELECT ...;'
>
> When I run the query in cli, the Flink session is spinning up a TM for the 
> query and then the query is running in a job.
>
> Now, I'm curious. How does Flink calculate the number of TMs for the query? 
> and also, Is it possible to run pre-spawned TMs for session mode? I'm looking 
> for a way to scale the computing resources. i.e., # of TM for the jobs.
>
> Thanks,
> Youngwoo


Re: Question

2021-05-09 Thread Yangze Guo
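Hi, according to the log, the application has been submitted to YARN, but
the AM has not been scheduled. Could you check the status of application
application_1620481460888_0003 in the YARN UI? Does it report any errors?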

Best,
Yangze Guo

On Mon, May 10, 2021 at 9:41 AM wyinj...@126.com  wrote:
>
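> Hello, I ran into a problem when using Flink on YARN and am looking for help. 
> After setting up the YARN cluster, I ran the following flink command:
> ./bin/flink run -t yarn-per-job --detached ./examples/batch/WordCount.jar
> However, no result was printed. The log is as follows: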
> Executing WordCount example with default input data set.
> Use --input to specify file input.
> Printing result to stdout. Use --output to specify output path.
> 2021-05-08 21:53:46,561 WARN  
> org.apache.flink.yarn.configuration.YarnLogConfigUtil[] - The 
> configuration directory ('/mnt/flink-1.12.3/conf') already contains a LOG4J 
> config file.If you want to use logback, then please delete or rename the log 
> configuration file.
> 2021-05-08 21:53:46,611 INFO  
> org.apache.hadoop.yarn.client.DefaultNoHARMFailoverProxyProvider [] - 
> Connecting to ResourceManager at node01/172.19.0.132:8032
> 2021-05-08 21:53:46,925 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>  [] - No path for the flink jar passed. Using the location of 
> class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2021-05-08 21:53:47,159 INFO  org.apache.hadoop.conf.Configuration
>  [] - resource-types.xml not found
> 2021-05-08 21:53:47,160 INFO  
> org.apache.hadoop.yarn.util.resource.ResourceUtils   [] - Unable to 
> find 'resource-types.xml'.
> 2021-05-08 21:53:47,224 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>  [] - The configured JobManager memory is 1600 MB. YARN will 
> allocate 2048 MB to make up an integer multiple of its minimum allocation 
> memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The 
> extra 448 MB may not be used by Flink.
> 2021-05-08 21:53:47,224 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>  [] - The configured TaskManager memory is 1728 MB. YARN will 
> allocate 2048 MB to make up an integer multiple of its minimum allocation 
> memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The 
> extra 320 MB may not be used by Flink.
> 2021-05-08 21:53:47,225 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>  [] - Cluster specification: 
> ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, 
> slotsPerTaskManager=1}
> 2021-05-08 21:53:53,360 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>  [] - Submitting application master application_1620481460888_0003
> 2021-05-08 21:53:53,425 INFO  
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl[] - Submitted 
> application application_1620481460888_0003
> 2021-05-08 21:53:53,425 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>  [] - Waiting for the cluster to be allocated
> 2021-05-08 21:53:53,427 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>  [] - Deploying cluster, current state ACCEPTED
> 2021-05-08 21:54:53,548 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>  [] - Deployment took more than 60 seconds. Please check if the 
> requested resources are available in the YARN cluster
> 2021-05-08 21:54:53,825 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>  [] - Deployment took more than 60 seconds. Please check if the 
> requested resources are available in the YARN cluster
> 2021-05-08 21:54:54,077 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>  [] - Deployment took more than 60 seconds. Please check if the 
> requested resources are available in the YARN cluster
>
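> The messages printed by the code itself appear as expected, but the result is not printed.
> Flink version: 1.12.3, Hadoop version: 3.3.0
> Looking forward to your reply.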
>
>
>
> wyinj...@126.com


Re: How to increase the number of task managers?

2021-05-07 Thread Yangze Guo
Hi,

> I wonder if I can tune the number of task managers? Is there a corresponding 
> config?

With the K8S [1] or Yarn [2] resource provider, the task managers are
allocated on demand. So, their number depends on the max parallelism and
the slot sharing group topology of your job.
In standalone mode, you need to configure "conf/workers" in your
flink distribution to decide the number of task managers [3].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/
[3] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-and-stopping-a-cluster

Best,
Yangze Guo


On Fri, May 7, 2021 at 7:34 PM Tamir Sagi  wrote:
>
> Hey
>
> num of TMs = parallelism / num of slots
>
> parallelism.default is another config you should consider.
>
> Read also
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/parallel/
>
>
> 
> From: Yik San Chan 
> Sent: Friday, May 7, 2021 1:56 PM
> To: user 
> Subject: How to increase the number of task managers?
>
> Hi community,
>
> According to the 
> [docs](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/):
>
> > taskmanager.numberOfTaskSlots: The number of slots that a TaskManager 
> > offers (default: 1). Each slot can take one task or pipeline. Having 
> > multiple slots in a TaskManager can help amortize certain constant 
> > overheads (of the JVM, application libraries, or network connections) 
> > across parallel tasks or pipelines. See the Task Slots and Resources 
> > concepts section for details.
>
> > Running more smaller TaskManagers with one slot each is a good starting 
> > point and leads to the best isolation between tasks. Dedicating the same 
> > resources to fewer larger TaskManagers with more slots can help to increase 
> > resource utilization, at the cost of weaker isolation between the tasks 
> > (more tasks share the same JVM).
>
> We're able to tune slot count by setting taskmanager.numberOfTaskSlots, that 
> may help parallelize my task.
>
> I wonder if I can tune the number of task managers? Is there a corresponding 
> config?
>
> Best,
> Yik San


Re: Enabling Checkpointing using FsStatebackend

2021-05-07 Thread Yangze Guo
Hi,

I don't think checkpointing is the root cause of your job failure.
As the log shows, the job failed because of a Kafka authorization
issue: "Caused by:
org.apache.kafka.common.errors.TransactionalIdAuthorizationException:
Transactional Id authorization failed."
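
In a long trace like the one below, the entry that matters is usually the last "Caused by:". A minimal sketch in plain Java of walking the cause chain down to that entry; the class name RootCause and the two sample exceptions are made up for illustration and only stand in for the JobException and the Kafka error:

```java
class RootCause {
    /** Follows getCause() links until the innermost throwable is reached. */
    static Throwable rootCauseOf(Throwable t) {
        Throwable current = t;
        // Keep descending while there is a deeper cause.
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-ins for the outer job failure and the wrapped authorization error.
        Exception inner = new IllegalStateException("Transactional Id authorization failed.");
        Exception outer = new RuntimeException("Recovery is suppressed by restart strategy", inner);
        System.out.println(rootCauseOf(outer).getMessage());
    }
}
```

The same idea applies when reading the log by eye: skip the scheduler and Akka frames and start from the innermost cause.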

Best,
Yangze Guo

On Fri, May 7, 2021 at 11:29 PM sudhansu jena
 wrote:
>
> Hi Team,
>
> We have recently enabled checkpointing using FsStateBackend, with an S3 
> bucket as the persistent storage, but since enabling it we are running 
> into issues while submitting the job to the cluster.
>
> Can you please let us know if we are missing anything?
>
>
> Below is the code sample for enabling checkpointing.
>
> env.setStateBackend(new FsStateBackend("s3://flinkcheckpointing/fhirmapper"));
> env.enableCheckpointing(1000);
>
>
>
> Below are the logs for the issue.
>
>
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=10, 
> backoffTimeMS=15000)
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
> at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
> at jdk.internal.reflect.GeneratedMethodAccessor366.invoke(Unknown Source)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
> Source)
> at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 
> org.apache.kafka.common.errors.TransactionalIdAuthorizationException: 
> Transactional Id authorization failed.
>
>
> Thanks,
> Sudhansu
>
>


Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-07 Thread Yangze Guo
Thanks, Dawid & Guowei for the great work, thanks to everyone involved.

Best,
Yangze Guo

On Thu, May 6, 2021 at 5:51 PM Rui Li  wrote:
>
> Thanks to Dawid and Guowei for the great work!
>
> On Thu, May 6, 2021 at 4:48 PM Zhu Zhu  wrote:
>>
>> Thanks Dawid and Guowei for being the release managers! And thanks everyone 
>> who has made this release possible!
>>
>> Thanks,
>> Zhu
>>
>> On Thu, May 6, 2021 at 2:30 PM Yun Tang wrote:
>>>
>>> Thanks to Dawid and Guowei for the great work, and thanks to everyone 
>>> involved in this release.
>>>
>>> Best
>>> Yun Tang
>>> 
>>> From: Xintong Song 
>>> Sent: Thursday, May 6, 2021 12:08
>>> To: user ; dev 
>>> Subject: Re: [ANNOUNCE] Apache Flink 1.13.0 released
>>>
>>> Thanks Dawid & Guowei as the release managers, and everyone who has
>>> contributed to this release.
>>>
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Thu, May 6, 2021 at 9:51 AM Leonard Xu  wrote:
>>>
>>> > Thanks Dawid & Guowei for the great work, thanks everyone involved.
>>> >
>>> > Best,
>>> > Leonard
>>> >
>>> > On May 5, 2021, at 17:12, Theo Diefenthal wrote:
>>> >
>>> > Thanks for managing the release. +1. I like the focus on improving
>>> > operations with this version.
>>> >
>>> > --
>>> > *From: *"Matthias Pohl" 
>>> > *To: *"Etienne Chauchot" 
>>> > *CC: *"dev" , "Dawid Wysakowicz" <
>>> > dwysakow...@apache.org>, "user" ,
>>> > annou...@apache.org
>>> > *Sent: *Tuesday, May 4, 2021 21:53:31
>>> > *Subject: *Re: [ANNOUNCE] Apache Flink 1.13.0 released
>>> >
>>> > Yes, thanks for managing the release, Dawid & Guowei! +1
>>> >
>>> > On Tue, May 4, 2021 at 4:20 PM Etienne Chauchot 
>>> > wrote:
>>> >
>>> >> Congrats to everyone involved !
>>> >>
>>> >> Best
>>> >>
>>> >> Etienne
>>> >> On 03/05/2021 15:38, Dawid Wysakowicz wrote:
>>> >>
>>> >> The Apache Flink community is very happy to announce the release of
>>> >> Apache Flink 1.13.0.
>>> >>
>>> >> Apache Flink® is an open-source stream processing framework for
>>> >> distributed, high-performing, always-available, and accurate data 
>>> >> streaming
>>> >> applications.
>>> >>
>>> >> The release is available for download at:
>>> >> https://flink.apache.org/downloads.html
>>> >>
>>> >> Please check out the release blog post for an overview of the
>>> >> improvements in this release:
>>> >> https://flink.apache.org/news/2021/05/03/release-1.13.0.html
>>> >>
>>> >> The full release notes are available in Jira:
>>> >>
>>> >> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349287
>>> >>
>>> >> We would like to thank all contributors of the Apache Flink community who
>>> >> made this release possible!
>>> >>
>>> >> Regards,
>>> >> Guowei & Dawid
>>> >>
>>> >>
>>> >
>>> >
>
>
>
> --
> Best regards!
> Rui Li


Re: Deployment/Memory Configuration/Scalability

2021-04-26 Thread Yangze Guo
Hi, Radoslav,

> 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> 2. Is it possible to have high availability with Per-Job mode? Or maybe I 
> should go with session mode and make sure that my flink cluster is running a 
> single job?

Yes, HA can be achieved in per-job mode with ZooKeeper[1]. Looking at
your configuration, you also need to enable checkpointing[2], which is
triggered automatically and lets the job resume after a failure, by
setting execution.checkpointing.interval.
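
As a rough sketch, the additions to flink-conf.yaml could look like the fragment below. The interval, the quorum address, and the storage path are placeholder values made up for illustration, not recommendations:

```yaml
# Trigger a checkpoint automatically once per minute (placeholder interval).
execution.checkpointing.interval: 1min

# ZooKeeper-based HA; the quorum hosts and storageDir are placeholders.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: s3://bucket/ha
```

Checkpoints would still be written to the state.checkpoints.dir that is already present in your settings.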

> 3. Let's assume that savepoints should be triggered only before job 
> update/deployment. How can I trigger a savepoint if my job is already 
> consuming more than 80% of the allowed memory per pod in k8s? My observations 
> show that k8s kills task managers (which are running as pods) and I need to 
> retry it a couple of times.

I think that with checkpointing enabled you no longer need to trigger a
savepoint manually under a specific condition, since checkpoints are
triggered periodically.

> 4. Should I consider upgrading to version 1.12.3?
> 5. Should I consider switching off state.backend.rocksdb.memory.managed 
> property even in version 1.12.3?

I'm not an expert on the state backend, but it seems the fix for that
issue is only applied to the docker image. So I guess you can build a
custom image yourself if you do not want to upgrade. However, if you
are using the Native K8S mode[3] and there is no compatibility issue,
I think it would be good to upgrade because there are also lots of
improvements[4] in 1.12.

> 6. How do I decide when the job parallelism should be increased? Are there 
> some metrics which can lead me to a clue that the parallelism should be 
> increased?

As there are 6 Kafka sources in your job, I think the parallelism
should first be aligned with the number of topic partitions. For
metrics, you could look at the backpressure of tasks and
numRecordsOutPerSecond[5].
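
To illustrate why the partition count bounds the useful source parallelism, here is a small sketch in plain Java. It assumes each partition is read by exactly one source subtask and that partitions are spread evenly; the class name, method names, and the partition count of 4 are hypothetical and not part of any Flink API:

```java
class SourceParallelismSketch {
    /** Subtasks left without any partition when parallelism exceeds the partition count. */
    static int idleSubtasks(int partitions, int parallelism) {
        return Math.max(0, parallelism - partitions);
    }

    /** Largest number of partitions a single subtask reads under an even spread. */
    static int maxPartitionsPerSubtask(int partitions, int parallelism) {
        // Ceiling division without floating point.
        return (partitions + parallelism - 1) / parallelism;
    }

    public static void main(String[] args) {
        int partitions = 4; // hypothetical partition count of one topic
        for (int parallelism : new int[] {2, 4, 5}) {
            System.out.println("parallelism=" + parallelism
                    + " idle=" + idleSubtasks(partitions, parallelism)
                    + " maxPartitionsPerSubtask=" + maxPartitionsPerSubtask(partitions, parallelism));
        }
    }
}
```

Under these assumptions, going beyond the partition count only adds idle source subtasks, so backpressure on the downstream operators is the better signal for scaling further.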

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/zookeeper_ha/
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
[4] https://issues.apache.org/jira/browse/FLINK-17709
[5] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html#io

Best,
Yangze Guo

On Mon, Apr 26, 2021 at 4:14 PM Radoslav Smilyanov
 wrote:
>
> Hi all,
>
> I am having multiple questions regarding Flink :) Let me give you some 
> background of what I have done so far.
>
> Description
> I am using Flink 1.11.2. My job is doing data enrichment. Data is consumed 
> from 6 different kafka topics and it is joined via multiple 
> CoProcessFunctions. On a daily basis the job is handling ~20 millions events 
> from the source kafka topics.
>
> Configuration
> These are the settings I am using:
>
> jobmanager.memory.process.size: 4096m
> jobmanager.memory.off-heap.size: 512m
> taskmanager.memory.process.size: 12000m
> taskmanager.memory.task.off-heap.size: 512m
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 5
> taskmanager.rpc.port: 6122
> jobmanager.execution.failover-strategy: region
> state.backend: rocksdb
> state.backend.incremental: true
> state.backend.rocksdb.localdir: /opt/flink/rocksdb
> state.backend.rocksdb.memory.managed: true
> state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
> state.backend.rocksdb.block.cache-size: 64mb
> state.checkpoints.dir: s3://bucket/checkpoints
> state.savepoints.dir: s3://bucket/savepoints
> s3.access-key: AWS_ACCESS_KEY_ID
> s3.secret-key: AWS_SECRET_ACCESS_KEY
> s3.endpoint: http://
> s3.path.style.access: true
> s3.entropy.key: _entropy_
> s3.entropy.length: 8
> presto.s3.socket-timeout: 10m
> client.timeout: 60min
>
> Deployment setup
> Flink is deployed in k8s with Per-Job mode having 1 job manager and 5 task 
> managers. I have a daily cron job which triggers savepoint in order to have a 
> fresh copy of the whole state.
>
> Problems with the existing setup
> 1. I observe that savepoints are causing Flink to consume more than the 
> allowed memory. I observe the behavior described in this stackoverflow post 
> (which seems to be solved in 1.12.X if I am getting it right).
> 2. I cannot achieve high availability with Per-Job mode and thus I ended up 
> having a regular savepoint on a daily basis.
>
> Questions
> 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> 2. Is it possible to have high availability with Per-Job mode? Or maybe I 
> should go with session mode and make sure that my flink cluster is running a 
> single job?
> 3. Let's assume that savepoints should be triggered only before job 
> update/deployment. How can I trigger 

Re: when should `FlinkYarnSessionCli` be included for parsing CLI arguments?

2021-04-26 Thread Yangze Guo
If the GenericCLI is selected, then the execution.target should have
been overwritten to "yarn-application" in GenericCLI#toConfiguration.
It is odd that GenericCLI#isActive returns false, as the
execution.target is defined in both flink-conf and the command line.

Best,
Yangze Guo

On Mon, Apr 26, 2021 at 5:14 PM Till Rohrmann  wrote:
>
> I think you are right that the `GenericCLI` should be the first choice. Off 
> the top of my head I do not remember why FlinkYarnSessionCli is still used. 
> Maybe it is in order to support some Yarn specific cli option parsing. I 
> assume it is either an oversight or some parsing has not been completely 
> migrated to the GenericCLI.
>
> Cheers,
> Till
>
> On Mon, Apr 26, 2021 at 11:07 AM Yangze Guo  wrote:
>>
>> Hi, Till,
>>
>> I agree that we need to resolve the issue by overriding the
>> configuration before selecting the CustomCommandLines. However, IIUC,
>> after FLINK-15852 the GenericCLI should always be the first choice.
>> Could you help me to understand why the FlinkYarnSessionCli can be
>> activated?
>>
>>
>> Best,
>> Yangze Guo
>>
>> On Mon, Apr 26, 2021 at 4:48 PM Till Rohrmann  wrote:
>> >
>> > Hi Tony,
>> >
>> > I think you are right that Flink's cli does not behave very consistently at 
>> > the moment. Case 2. should definitely work because `-t yarn-application` 
>> > should overwrite what is defined in the Flink configuration. The problem 
>> > seems to be that we don't resolve the configuration wrt the specified 
>> > command line options before calling into `CustomCommandLine.isActive`. If 
>> > we parsed first the command line configuration options which can overwrite 
>> > flink-conf.yaml options and then replaced them, then the custom command 
>> > lines (assuming that they use the Configuration as the ground truth) 
>> > should behave consistently.
>> >
>> > For your questions:
>> >
>> > 1. I am not 100% sure. I think the FlinkYarnSessionCli wasn't used on 
>> > purpose when introducing the yarn application mode.
>> > 2. See answer 1.
>> >
>> > I think it is a good idea to extend the description of the config option 
>> > `execution.target`. Do you want to create a ticket and a PR for it?
>> >
>> > Cheers,
>> > Till
>> >
>> > On Mon, Apr 26, 2021 at 8:37 AM Yangze Guo  wrote:
>> >>
>> >> Hi, Tony.
>> >>
>> >> Which version of flink-dist are you using? AFAIK, this issue should be
>> >> addressed in FLINK-15852[1]. Could you share the client log of case 2
>> >> (with the log level set to DEBUG, if possible)?
>> >>
>> >> [1] https://issues.apache.org/jira/browse/FLINK-15852
>> >>
>> >> Best,
>> >> Yangze Guo
>> >>
>> >> On Sun, Apr 25, 2021 at 11:33 AM Tony Wei  wrote:
>> >> >
>> >> > Hi Experts,
>> >> >
>> >> > I recently tried to run yarn-application mode on my yarn cluster, and I 
>> >> > had a problem related to configuring `execution.target`.
>> >> > After reading the source code and doing some experiments, I found that 
>> >> > there should be some room of improvement for `FlinkYarnSessionCli` or 
>> >> > `AbstractYarnCli`.
>> >> >
>> >> > My experiments are:
>> >> >
>> >> > setting `execution.target: yarn-application` in flink-conf.yaml and run 
>> >> > `flink run-application -t yarn-application`: run job successfully.
>> >> >
>> >> > `FlinkYarnSessionCli` is not active
>> >> > `GenericCLI` is active
>> >> >
>> >> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run 
>> >> > `flink run-application -t yarn-application`: run job failed
>> >> >
>> >> > failed due to `ClusterDeploymentException` [1]
>> >> > `FlinkYarnSessionCli` is active
>> >> >
>> >> > setting `execution.target: yarn-application` in flink-conf.yaml and run 
>> >> > `flink run -t yarn-per-job`: run job successfully.
>> >> >
>> >> > `FlinkYarnSessionCli` is not active
>> >> > `GenericCLI` is active
>> >> >
>> >> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run 
>> >> > `flink run -t yarn-per-job`: run job successfully.
>> >> >
>> >> > `FlinkYarnSession

Re: when should `FlinkYarnSessionCli` be included for parsing CLI arguments?

2021-04-26 Thread Yangze Guo
Hi, Till,

I agree that we need to resolve the issue by overriding the
configuration before selecting the CustomCommandLines. However, IIUC,
after FLINK-15852 the GenericCLI should always be the first choice.
Could you help me to understand why the FlinkYarnSessionCli can be
activated?
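
For reference, a toy model of "resolve the configuration first, then ask isActive". It is plain Java with made-up names (TargetResolutionSketch, effectiveConfig, yarnSessionLikeIsActive); the activation rule is simplified from Tony's description and is not the actual Flink code:

```java
import java.util.HashMap;
import java.util.Map;

class TargetResolutionSketch {
    static final String TARGET_KEY = "execution.target";

    /** Copies the file configuration and lets a -t/--target value override it. */
    static Map<String, String> effectiveConfig(Map<String, String> fileConfig, String targetOption) {
        Map<String, String> merged = new HashMap<>(fileConfig);
        if (targetOption != null) {
            merged.put(TARGET_KEY, targetOption);
        }
        return merged;
    }

    /** Simplified rule: active only for the yarn-session and yarn-per-job targets. */
    static boolean yarnSessionLikeIsActive(Map<String, String> config) {
        String target = config.get(TARGET_KEY);
        return "yarn-session".equals(target) || "yarn-per-job".equals(target);
    }

    public static void main(String[] args) {
        Map<String, String> fileConfig = new HashMap<>();
        fileConfig.put(TARGET_KEY, "yarn-per-job"); // value from flink-conf.yaml (case 2)
        String targetOption = "yarn-application";   // value from `-t yarn-application`

        // Asked before the override is applied: the file value decides.
        System.out.println(yarnSessionLikeIsActive(fileConfig));
        // Asked after the override is applied: the command line value decides.
        System.out.println(yarnSessionLikeIsActive(effectiveConfig(fileConfig, targetOption)));
    }
}
```

Run as written, the first check reports true and the second false, which is the gap between the observed behaviour in case 2 and what the `--target` documentation promises.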


Best,
Yangze Guo

On Mon, Apr 26, 2021 at 4:48 PM Till Rohrmann  wrote:
>
> Hi Tony,
>
> I think you are right that Flink's cli does not behave very consistently at 
> the moment. Case 2. should definitely work because `-t yarn-application` 
> should overwrite what is defined in the Flink configuration. The problem 
> seems to be that we don't resolve the configuration wrt the specified command 
> line options before calling into `CustomCommandLine.isActive`. If we parsed 
> first the command line configuration options which can overwrite 
> flink-conf.yaml options and then replaced them, then the custom command lines 
> (assuming that they use the Configuration as the ground truth) should behave 
> consistently.
>
> For your questions:
>
> 1. I am not 100% sure. I think the FlinkYarnSessionCli wasn't used on purpose 
> when introducing the yarn application mode.
> 2. See answer 1.
>
> I think it is a good idea to extend the description of the config option 
> `execution.target`. Do you want to create a ticket and a PR for it?
>
> Cheers,
> Till
>
> On Mon, Apr 26, 2021 at 8:37 AM Yangze Guo  wrote:
>>
>> Hi, Tony.
>>
>> Which version of flink-dist are you using? AFAIK, this issue should be
>> addressed in FLINK-15852[1]. Could you share the client log of case 2
>> (with the log level set to DEBUG, if possible)?
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-15852
>>
>> Best,
>> Yangze Guo
>>
>> On Sun, Apr 25, 2021 at 11:33 AM Tony Wei  wrote:
>> >
>> > Hi Experts,
>> >
>> > I recently tried to run yarn-application mode on my yarn cluster, and I 
>> > had a problem related to configuring `execution.target`.
>> > After reading the source code and doing some experiments, I found that 
>> > there should be some room of improvement for `FlinkYarnSessionCli` or 
>> > `AbstractYarnCli`.
>> >
>> > My experiments are:
>> >
>> > setting `execution.target: yarn-application` in flink-conf.yaml and run 
>> > `flink run-application -t yarn-application`: run job successfully.
>> >
>> > `FlinkYarnSessionCli` is not active
>> > `GenericCLI` is active
>> >
>> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run `flink 
>> > run-application -t yarn-application`: run job failed
>> >
>> > failed due to `ClusterDeploymentException` [1]
>> > `FlinkYarnSessionCli` is active
>> >
>> > setting `execution.target: yarn-application` in flink-conf.yaml and run 
>> > `flink run -t yarn-per-job`: run job successfully.
>> >
>> > `FlinkYarnSessionCli` is not active
>> > `GenericCLI` is active
>> >
>> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run `flink 
>> > run -t yarn-per-job`: run job successfully.
>> >
>> > `FlinkYarnSessionCli` is active
>> >
>> > From `AbstractYarnCli#isActive` [2] and `FlinkYarnSessionCli#isActive` 
>> > [3], `FlinkYarnSessionCli` will be active when `execution.target` is 
>> > specified with `yarn-per-job` or `yarn-session`.
>> >
>> > According to the flink official document [4], I thought the 2nd experiment 
>> > should also work well, but it didn't.
>> >>
>> >> The --target will overwrite the execution.target specified in the 
>> >> config/flink-config.yaml.
>> >
>> >
>> > The root cause is that `FlinkYarnSessionCli` only overwrite the 
>> > `execution.target` with `yarn-session` or `yarn-per-job` [5], but no 
>> > `yarn-application`.
>> > So, my question is
>> >
>> > should we use `FlinkYarnSessionCli` in case 2?
>> > if we should, how we can improve `FlinkYarnSessionCli` so that we can 
>> > overwrite `execution.target` via `--target`?
>> >
>> > and one more improvement, the config description for `execution.target` 
>> > [6] should include `yarn-application` as well.
>> >
>> > [1] 
>> > https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L439-L447
>> > [2] 
>> > https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/AbstractYarnCli.java#L54-L66
>> > [3] 
>> > https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L373-L377
>> > [4] 
>> > https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#selecting-deployment-targets
>> > [5] 
>> > https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L397-L413
>> > [6] 
>> > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java#L41-L46
>> >
>> > best regards,
>> >


Re: Kubernetes Setup - JM as job vs JM as deployment

2021-04-26 Thread Yangze Guo
Hi, Gil

IIUC, you want to deploy the Flink cluster yourself using YAML files
and want to know whether the JM should be deployed as a Job[1] or a
Deployment. If that is the case, as Matthias mentioned, Flink provides
two ways to integrate with K8S [2][3]; in [3] the JM will be deployed
as a Deployment.

[1] https://kubernetes.io/docs/concepts/workloads/controllers/job/
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html

Best,
Yangze Guo

On Thu, Apr 22, 2021 at 10:46 PM Matthias Pohl  wrote:
>
> Hi Gil,
> I'm not sure whether I understand you correctly. What do you mean by 
> deploying the job manager as "job" or "deployment"? Are you referring to the 
> different deployment modes, Flink offers [1]? These would be independent of 
> Kubernetes. Or do you wonder what the differences are between the Flink on 
> Kubernetes (native) [2] vs Flink on Kubernetes (standalone using YAML files)?
>
> Best,
> Matthias
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/#deployment-modes
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
> [3] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html
>
> On Wed, Apr 21, 2021 at 11:19 PM Gil Amsalem  wrote:
>>
>> Hi,
>>
>> I found that there are 2 different approaches to set up Flink on Kubernetes.
>> 1. Deploy job manager as Job.
>> 2. Deploy job manager as Deployment.
>>
>> What is the recommended way? What are the benefits of each?
>>
>> Thanks,
>> Gil Amsalem


Re: when should `FlinkYarnSessionCli` be included for parsing CLI arguments?

2021-04-26 Thread Yangze Guo
Hi, Tony.

Which version of flink-dist are you using? AFAIK, this issue should be
addressed in FLINK-15852[1]. Could you share the client log of case 2
(with the log level set to DEBUG, if possible)?

[1] https://issues.apache.org/jira/browse/FLINK-15852

Best,
Yangze Guo

On Sun, Apr 25, 2021 at 11:33 AM Tony Wei  wrote:
>
> Hi Experts,
>
> I recently tried to run yarn-application mode on my yarn cluster, and I had a 
> problem related to configuring `execution.target`.
> After reading the source code and doing some experiments, I found that there 
> should be some room of improvement for `FlinkYarnSessionCli` or 
> `AbstractYarnCli`.
>
> My experiments are:
>
> setting `execution.target: yarn-application` in flink-conf.yaml and run 
> `flink run-application -t yarn-application`: run job successfully.
>
> `FlinkYarnSessionCli` is not active
> `GenericCLI` is active
>
> setting `execution.target: yarn-per-job` in flink-conf.yaml and run `flink 
> run-application -t yarn-application`: run job failed
>
> failed due to `ClusterDeploymentException` [1]
> `FlinkYarnSessionCli` is active
>
> setting `execution.target: yarn-application` in flink-conf.yaml and run 
> `flink run -t yarn-per-job`: run job successfully.
>
> `FlinkYarnSessionCli` is not active
> `GenericCLI` is active
>
> setting `execution.target: yarn-per-job` in flink-conf.yaml and run `flink 
> run -t yarn-per-job`: run job successfully.
>
> `FlinkYarnSessionCli` is active
>
> From `AbstractYarnCli#isActive` [2] and `FlinkYarnSessionCli#isActive` [3], 
> `FlinkYarnSessionCli` will be active when `execution.target` is specified 
> with `yarn-per-job` or `yarn-session`.
>
> According to the flink official document [4], I thought the 2nd experiment 
> should also work well, but it didn't.
>>
>> The --target will overwrite the execution.target specified in the 
>> config/flink-config.yaml.
>
>
> The root cause is that `FlinkYarnSessionCli` only overwrites the 
> `execution.target` with `yarn-session` or `yarn-per-job` [5], but not with 
> `yarn-application`.
> So, my questions are:
>
> Should we use `FlinkYarnSessionCli` in case 2?
> If we should, how can we improve `FlinkYarnSessionCli` so that we can 
> overwrite `execution.target` via `--target`?
>
> and one more improvement, the config description for `execution.target` [6] 
> should include `yarn-application` as well.
>
> [1] 
> https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L439-L447
> [2] 
> https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/AbstractYarnCli.java#L54-L66
> [3] 
> https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L373-L377
> [4] 
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#selecting-deployment-targets
> [5] 
> https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L397-L413
> [6] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java#L41-L46
>
> best regards,
>


Re: Receiving context information through JobListener interface

2021-04-25 Thread Yangze Guo
It seems that the JobListener interface does not expose such
information. Maybe you can set the RuleId as the jobName (or as a
suffix of the jobName) of the application; then you can get the
mapping from jobId to jobName (RuleId) through /jobs/overview[1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/rest_api/#jobs-overview
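
A minimal sketch of the naming convention in plain Java. The class RuleJobNames, the "#rule=" separator, and the sample names are assumptions for illustration; the sketch also assumes the RuleId itself never contains the separator:

```java
import java.util.Optional;

class RuleJobNames {
    // Hypothetical marker placed between the readable job name and the RuleId.
    static final String SEPARATOR = "#rule=";

    /** Builds the job name to pass when submitting the job. */
    static String jobNameFor(String baseName, String ruleId) {
        return baseName + SEPARATOR + ruleId;
    }

    /** Recovers the RuleId from a job name, or empty if the name carries none. */
    static Optional<String> ruleIdOf(String jobName) {
        int idx = jobName.lastIndexOf(SEPARATOR);
        if (idx < 0) {
            return Optional.empty();
        }
        return Optional.of(jobName.substring(idx + SEPARATOR.length()));
    }

    public static void main(String[] args) {
        String name = jobNameFor("enrichment", "rule-42");
        System.out.println(name);
        System.out.println(ruleIdOf(name).orElse("<no rule id>"));
    }
}
```

The monitoring side would apply ruleIdOf to each job name returned by /jobs/overview and use the result as the Kafka partition key.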

Best,
Yangze Guo

On Sun, Apr 25, 2021 at 4:17 PM Barak Ben Nathan
 wrote:
>
> Hi all,
>
> I am building an application that launches Flink jobs and monitors them.
>
> I want to use the JobListener interface to output job events to a Kafka topic.
>
> The problem:
>
> In the application we have RuleId, i.e. the business logic identifier for the 
> job, and there is JobId, which is the internal identifier generated by Flink.
>
> I need the events emitted to Kafka to be partitioned by *RuleId*.
>
> Is there a way to pass this kind of information to Flink and get it through 
> the JobListener interface?
>
> Thanks,
>
> Barak


Re: flink1.12.1: sinking data to ES7 hits an Invalid lambda deserialization problem

2021-04-15 Thread Yangze Guo
You can refer to [1]. If it is the same problem, change the dependency to flink-connector-elasticsearch.

[1] https://issues.apache.org/jira/browse/FLINK-18857

Best,
Yangze Guo

On Fri, Apr 16, 2021 at 10:43 AM Yangze Guo  wrote:
>
> Could you share the full error stack trace or the logs?
>
> Best,
> Yangze Guo
>
> On Fri, Apr 16, 2021 at 9:33 AM william <712677...@qq.com> wrote:
> >
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.12.1: sinking data to ES7 hits an Invalid lambda deserialization problem

2021-04-15 Thread Yangze Guo
Could you share the full error stack trace or the logs?

Best,
Yangze Guo

On Fri, Apr 16, 2021 at 9:33 AM william <712677...@qq.com> wrote:
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Does it support gpu coding in flink?

2021-04-12 Thread Yangze Guo
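Hi, there is currently an MNIST inference demo[1] that uses the GPU, but it does not use TensorFlow.
The flink-ai-extended project has an example[2] of training MNIST with TensorFlow, but I am not sure whether it can run directly on the GPU build of TF. I will involve Becket and Wei to confirm.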

[1] https://github.com/KarmaGYZ/flink-mnist
[2] 
https://github.com/alibaba/flink-ai-extended/tree/master/deep-learning-on-flink/flink-ml-examples/src/main/java/com/alibaba/flink/ml/examples/tensorflow/mnist

Best,
Yangze Guo


On Mon, Apr 12, 2021 at 2:35 PM 张颖  wrote:
>
> Hi, I am running a TF inference task on my cluster, but I find it takes a long 
> time to get a response, because it is a BERT model and I run it on CPU 
> machines. My company has a GPU k8s cluster, and I read the document 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/advanced/external_resources/
>
> Could you give me a demo, including TF inference on GPU and training on GPU?
>
> I use Alink in some of my tasks; is there a demo for Alink on GPU?
>
>
> This is part of the answer:
> https://issues.apache.org/jira/browse/FLINK-22205


Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-08 Thread Yangze Guo
IIUC, your program will eventually create 100 ChildFirstClassLoaders in
a TM. But they should always be garbage-collected once the job has
finished. So, as Arvid said, you'd better check who is referencing
those ChildFirstClassLoaders.
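
One cheap first check, before taking a heap dump, is to list which class loaders the live threads still hold as their context class loader. A minimal sketch in plain Java; the class name ThreadClassLoaderReport is made up, the code only reports on the JVM it runs in (so it would have to run inside the TM to be useful), and a context class loader is only one of several ways a thread can pin a class loader:

```java
import java.util.Map;
import java.util.TreeMap;

class ThreadClassLoaderReport {
    /** Counts live threads per class name of their context class loader. */
    static Map<String, Integer> countByContextClassLoader() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            ClassLoader cl = t.getContextClassLoader();
            // Threads without a context class loader are grouped under "<none>".
            String key = (cl == null) ? "<none>" : cl.getClass().getName();
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        countByContextClassLoader()
                .forEach((loader, n) -> System.out.println(n + " thread(s) -> " + loader));
    }
}
```

If threads from finished jobs keep showing up under ChildFirstClassLoader, a heap dump with the path to GC roots would then tell which library started them.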


Best,
Yangze Guo

On Thu, Apr 8, 2021 at 5:43 PM 太平洋 <495635...@qq.com> wrote:
>
> My application program looks like this. Does this structure have a problem?
>
> public class StreamingJob {
>     public static void main(String[] args) throws Exception {
>         int i = 0;
>         while (i < 100) {
>             try {
>                 StreamExecutionEnvironment env =
>                         StreamExecutionEnvironment.getExecutionEnvironment();
>                 env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>                 env.setParallelism(Parallelism);
>
>                 EnvironmentSettings bsSettings =
>                         EnvironmentSettings.newInstance().useBlinkPlanner()
>                                 .inStreamingMode().build();
>                 StreamTableEnvironment bsTableEnv =
>                         StreamTableEnvironment.create(env, bsSettings);
>
>                 bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
>                 Table t = bsTableEnv.sqlQuery(query);
>
>                 DataStream points = bsTableEnv.toAppendStream(t, DataPoint.class);
>
>                 DataStream weightPoints = points.map();
>
>                 DataStream predictPoints = weightPoints.keyBy()
>                         .reduce().map();
>
>                 // side output
>                 final OutputTag outPutPredict = new OutputTag("predict") {
>                 };
>
>                 SingleOutputStreamOperator mainDataStream = predictPoints
>                         .process();
>
>                 DataStream exStream =
>                         mainDataStream.getSideOutput(outPutPredict);
>
>                 // write data to clickhouse
>                 String insertIntoCKSql = "xxx";
>                 mainDataStream.addSink(JdbcSink.sink(insertIntoCKSql, new CkSinkBuilder(),
>                         new JdbcExecutionOptions.Builder().withBatchSize(CkBatchSize).build(),
>                         new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
>                                 .withDriverName(CkDriverName)
>                                 .withUrl(CkUrl).withUsername(CkUser).withPassword(CkPassword).build()));
>
>                 // write data to kafka
>                 FlinkKafkaProducer producer = new FlinkKafkaProducer<>();
>                 exStream.map().addSink(producer);
>
>                 env.execute("Prediction Program");
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>             i++;
>             Thread.sleep(window * 1000);
>         }
>     }
> }
>
>
>
> -- Original Message --
> From: "Arvid Heise" ;
> Sent: Thursday, April 8, 2021, 2:33 PM
> To: "Yangze Guo";
> Cc: 
> "太平洋"<495635...@qq.com>;"user";"guowei.mgw";"renqschn";
> Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>
> Hi,
>
> ChildFirstClassLoader are created (more or less) by application jar and 
> seeing so many looks like a classloader leak to me. I'd expect you to see a 
> new ChildFirstClassLoader popping up with each new job submission.
>
> Can you check who is referencing the ChildFirstClassLoader transitively? 
> Usually, it's some thread that is lingering around because some third party 
> library is leaking threads etc.
>
> OneInputStreamTask is legit and just indicates that you have a job running 
> with 4 slots on that TM. It should not hold any dedicated metaspace memory.
>
> On Thu, Apr 8, 2021 at 4:52 AM Yangze Guo  wrote:
>>
>> I went through the JM & TM logs but could not find any valuable clue.
>> The exception is actually thrown by kafka-producer-network-thread.
>> Maybe @Qingsheng could also take a look?
>>
>>
>> Best,
>> Yangze Guo
>>
>> On Thu, Apr 8, 2021 at 10:39 AM 太平洋 <495635...@qq.com> wrote:
>> >
>> > I have configured it to 512M, but the problem still exists. Now the memory size is 
>> > still 256M.
>> > The TM and JM logs are attached.
>> >
>> > Look forward to your reply.
>> >
>> > -- Original Message --
>> > From: "Yangze Guo" ;
>> > Sent: Tuesday, April 6, 2021, 6:35 PM
>> > To: "太平洋"<495635...@qq.com>;
>> > Cc: "user";"guowei.mgw";
>> > Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>> >
>> > > I have tried this method, but the problem still exists.
>> > How much memory did you configure for it?
>> >
>> > > is 21 instances of "org.apache.flink.util.ChildFirstClassLoader" normal
>> > Not quite sure about it. AFAIK, each job will have a classloader.
>> > Multiple tasks of the same job in the same TM will share the same
>> > classloader. The classloader will be removed if there are no more tasks
>> > running on the TM. A classloader without references will eventually be
>> > cleaned up by GC. Could you share JM and TM logs for further analysis?
>

Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-07 Thread Yangze Guo
I went through the JM & TM logs but could not find any valuable clue.
The exception is actually thrown by kafka-producer-network-thread.
Maybe @Qingsheng could also take a look?


Best,
Yangze Guo

On Thu, Apr 8, 2021 at 10:39 AM 太平洋 <495635...@qq.com> wrote:
>
> I have configured it to 512M, but the problem still exists. Now the memory size is 
> still 256M.
> The TM and JM logs are attached.
>
> Look forward to your reply.
>
> -- Original Message --
> From: "Yangze Guo" ;
> Sent: Tuesday, April 6, 2021, 6:35 PM
> To: "太平洋"<495635...@qq.com>;
> Cc: "user";"guowei.mgw";
> Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>
> > I have tried this method, but the problem still exists.
> How much memory did you configure for it?
>
> > is 21 instances of "org.apache.flink.util.ChildFirstClassLoader" normal
> Not quite sure about it. AFAIK, each job will have a classloader.
> Multiple tasks of the same job in the same TM will share the same
> classloader. The classloader will be removed if there are no more tasks
> running on the TM. A classloader without references will eventually be
> cleaned up by GC. Could you share JM and TM logs for further analysis?
> I'll also involve @Guowei Ma in this thread.
>
>
> Best,
> Yangze Guo
>
> On Tue, Apr 6, 2021 at 6:05 PM 太平洋 <495635...@qq.com> wrote:
> >
> > I have tried this method, but the problem still exists.
> > From the heap dump analysis, is 21 instances of 
> > "org.apache.flink.util.ChildFirstClassLoader" normal?
> >
> >
> > -- Original Message --
> > From: "Yangze Guo" ;
> > Sent: Tuesday, April 6, 2021, 4:32 PM
> > To: "太平洋"<495635...@qq.com>;
> > Cc: "user";
> > Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
> >
> > I think you can try to increase the JVM metaspace option for
> > TaskManagers through taskmanager.memory.jvm-metaspace.size. [1]
> >
> > [1] 
> > https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-metaspace
> >
> > Best,
> > Yangze Guo
> >
> >
> >
> > On Tue, Apr 6, 2021 at 4:22 PM 太平洋 <495635...@qq.com> wrote:
> > >
> > > Batch job:
> > > Reads data from S3 via SQL, applies some operators, and writes the data to 
> > > ClickHouse and Kafka.
> > > After some time, the TaskManager quits with OutOfMemoryError: Metaspace.
> > >
> > > env:
> > > Flink version: 1.12.2
> > > TaskManager slot count: 5
> > > deployment: standalone Kubernetes session mode
> > > dependencies:
> > >
> > > <dependency>
> > >   <groupId>org.apache.flink</groupId>
> > >   <artifactId>flink-connector-kafka_2.11</artifactId>
> > >   <version>${flink.version}</version>
> > > </dependency>
> > > <dependency>
> > >   <groupId>com.google.code.gson</groupId>
> > >   <artifactId>gson</artifactId>
> > >   <version>2.8.5</version>
> > > </dependency>
> > > <dependency>
> > >   <groupId>org.apache.flink</groupId>
> > >   <artifactId>flink-connector-jdbc_2.11</artifactId>
> > >   <version>${flink.version}</version>
> > > </dependency>
> > > <dependency>
> > >   <groupId>ru.yandex.clickhouse</groupId>
> > >   <artifactId>clickhouse-jdbc</artifactId>
> > >   <version>0.3.0</version>
> > > </dependency>
> > > <dependency>
> > >   <groupId>org.apache.flink</groupId>
> > >   <artifactId>flink-parquet_2.11</artifactId>
> > >   <version>${flink.version}</version>
> > > </dependency>
> > > <dependency>
> > >   <groupId>org.apache.flink</groupId>
> > >   <artifactId>flink-json</artifactId>
> > >   <version>${flink.version}</version>
> > > </dependency>
> > >
> > >
> > > heap dump1:
> > >
> > > Leak Suspects
> > >
> > > System Overview
> > >
> > >  Leaks
> > >
> > >  Overview
> > >
> > >
> > >   Problem Suspect 1
> > >
> > > 21 instances of "org.apache.flink.util.ChildFirstClassLoader", loaded by 
> > > "sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0" occupy 29,656,880 
> > > (41.16%) bytes.
> > >
> > > Biggest instances:
> > >
> > > org.apache.flink.util.Child

Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-06 Thread Yangze Guo
> I have tried this method, but the problem still exists.
How much memory did you configure for it?

> is 21 instances of "org.apache.flink.util.ChildFirstClassLoader" normal
Not quite sure about it. AFAIK, each job will have a classloader.
Multiple tasks of the same job in the same TM will share the same
classloader. The classloader will be removed if there are no more tasks
running on the TM. A classloader without references will eventually be
cleaned up by GC. Could you share JM and TM logs for further analysis?
I'll also involve @Guowei Ma in this thread.
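
One common way for an old classloader to stay referenced is a lingering thread whose context classloader still points at it. As a rough sketch (plain JDK calls only; the class name ClassLoaderThreadReport is made up for illustration, and the code would have to run inside the TaskManager JVM, e.g. from a user function, to say anything about it), the live threads can be grouped by their context classloader:

```java
import java.util.Map;
import java.util.TreeMap;

public class ClassLoaderThreadReport {

    /** Counts live threads per context classloader instance (class name plus identity hash). */
    static Map<String, Integer> threadsPerContextClassLoader() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread thread : Thread.getAllStackTraces().keySet()) {
            ClassLoader loader = thread.getContextClassLoader();
            String key = loader == null
                    ? "<no context classloader>"
                    : loader.getClass().getName() + "@"
                            + Integer.toHexString(System.identityHashCode(loader));
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // One line per classloader instance, with the number of threads that reference it.
        threadsPerContextClassLoader()
                .forEach((loader, n) -> System.out.println(n + " thread(s) -> " + loader));
    }
}
```

If many distinct ChildFirstClassLoader entries show up while only one job is running, the threads attached to the older ones would be the first place to look. This only covers threads; a heap dump's path to GC roots remains the more complete check.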


Best,
Yangze Guo

On Tue, Apr 6, 2021 at 6:05 PM 太平洋 <495635...@qq.com> wrote:
>
> I have tried this method, but the problem still exists.
> From the heap dump analysis, is 21 instances of 
> "org.apache.flink.util.ChildFirstClassLoader" normal?
>
>
> -- Original Message --
> From: "Yangze Guo" ;
> Sent: Tuesday, April 6, 2021, 4:32 PM
> To: "太平洋"<495635...@qq.com>;
> Cc: "user";
> Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>
> I think you can try to increase the JVM metaspace option for
> TaskManagers through taskmanager.memory.jvm-metaspace.size. [1]
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-metaspace
>
> Best,
> Yangze Guo
>
>
>
> On Tue, Apr 6, 2021 at 4:22 PM 太平洋 <495635...@qq.com> wrote:
> >
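> > Batch job:
> > Reads data from S3 via SQL, applies some operators, and writes the data to 
> > ClickHouse and Kafka.
> > After some time, the TaskManager quits with OutOfMemoryError: Metaspace.
> >
> > env:
> > Flink version: 1.12.2
> > TaskManager slot count: 5
> > deployment: standalone Kubernetes session mode
> > dependencies:
> >
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-connector-kafka_2.11</artifactId>
> >   <version>${flink.version}</version>
> > </dependency>
> > <dependency>
> >   <groupId>com.google.code.gson</groupId>
> >   <artifactId>gson</artifactId>
> >   <version>2.8.5</version>
> > </dependency>
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-connector-jdbc_2.11</artifactId>
> >   <version>${flink.version}</version>
> > </dependency>
> > <dependency>
> >   <groupId>ru.yandex.clickhouse</groupId>
> >   <artifactId>clickhouse-jdbc</artifactId>
> >   <version>0.3.0</version>
> > </dependency>
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-parquet_2.11</artifactId>
> >   <version>${flink.version}</version>
> > </dependency>
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-json</artifactId>
> >   <version>${flink.version}</version>
> > </dependency>
> >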
> >
> > heap dump1:
> >
> > Leak Suspects
> >
> > System Overview
> >
> >  Leaks
> >
> >  Overview
> >
> >
> >   Problem Suspect 1
> >
> > 21 instances of "org.apache.flink.util.ChildFirstClassLoader", loaded by 
> > "sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0" occupy 29,656,880 (41.16%) 
> > bytes.
> >
> > Biggest instances:
> >
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73ca2a1e8 - 1,474,760 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73d2af820 - 1,474,168 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73cdcaa10 - 1,474,160 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73cf6aab0 - 1,474,160 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73dd8 - 1,474,160 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73d2bb108 - 1,474,128 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73de202e0 - 1,474,120 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73dadc778 - 1,474,112 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73d5f70e8 - 1,474,064 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73d93aa38 - 1,474,064 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73e179638 - 1,474,064 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73dc80418 - 1,474,056 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73dfcda60 - 1,474,056 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73e4bcd38 - 1,474,056 
> > (2.05%) bytes.
> > org.a

Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-06 Thread Yangze Guo
I think you can try to increase the JVM metaspace option for
TaskManagers through taskmanager.memory.jvm-metaspace.size. [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-metaspace
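
To check whether a changed metaspace setting has actually reached the JVM, the Metaspace memory pool can be read through the standard JMX beans. The following is only a sketch: the class name MetaspaceProbe is made up, the pool name "Metaspace" is what HotSpot JVMs report (other JVMs may name it differently), and the code would have to run inside the TaskManager process to reflect its limit.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

public class MetaspaceProbe {

    /** Returns the memory pool named "Metaspace", or null if this JVM does not expose one. */
    static MemoryPoolMXBean findMetaspacePool() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if ("Metaspace".equals(pool.getName())) {
                return pool;
            }
        }
        return null;
    }

    /** Formats used and maximum Metaspace; a negative maximum means no limit is set. */
    static String describe(MemoryPoolMXBean pool) {
        if (pool == null) {
            return "Metaspace pool not found on this JVM";
        }
        MemoryUsage usage = pool.getUsage();
        long mib = 1024L * 1024L;
        String max = usage.getMax() < 0 ? "unlimited" : (usage.getMax() / mib) + " MiB";
        return "Metaspace used=" + (usage.getUsed() / mib) + " MiB, max=" + max;
    }

    public static void main(String[] args) {
        System.out.println(describe(findMetaspacePool()));
    }
}
```

If the reported maximum does not match the configured value, the setting is probably not being picked up by the process that runs the job.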

Best,
Yangze Guo



On Tue, Apr 6, 2021 at 4:22 PM 太平洋 <495635...@qq.com> wrote:
>
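> Batch job:
> Reads data from S3 via SQL, applies some operators, and writes the data to 
> ClickHouse and Kafka.
> After some time, the TaskManager quits with OutOfMemoryError: Metaspace.
>
> env:
> Flink version: 1.12.2
> TaskManager slot count: 5
> deployment: standalone Kubernetes session mode
> dependencies:
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-kafka_2.11</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>com.google.code.gson</groupId>
>   <artifactId>gson</artifactId>
>   <version>2.8.5</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-jdbc_2.11</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>ru.yandex.clickhouse</groupId>
>   <artifactId>clickhouse-jdbc</artifactId>
>   <version>0.3.0</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-parquet_2.11</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-json</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>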
>
> heap dump1:
>
> Leak Suspects
>
>
>   Problem Suspect 1
>
> 21 instances of "org.apache.flink.util.ChildFirstClassLoader", loaded by 
> "sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0" occupy 29,656,880 (41.16%) 
> bytes.
>
> Biggest instances:
>
> org.apache.flink.util.ChildFirstClassLoader @ 0x73ca2a1e8 - 1,474,760 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73d2af820 - 1,474,168 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73cdcaa10 - 1,474,160 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73cf6aab0 - 1,474,160 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73dd8 - 1,474,160 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73d2bb108 - 1,474,128 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73de202e0 - 1,474,120 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73dadc778 - 1,474,112 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73d5f70e8 - 1,474,064 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73d93aa38 - 1,474,064 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73e179638 - 1,474,064 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73dc80418 - 1,474,056 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73dfcda60 - 1,474,056 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73e4bcd38 - 1,474,056 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73d6006e8 - 1,474,032 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73c7d2ad8 - 1,461,944 (2.03%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73ca1bb98 - 1,460,752 (2.03%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73bf203f0 - 1,460,744 (2.03%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73e3284a8 - 1,445,232 (2.01%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73e65de00 - 1,445,232 (2.01%) 
> bytes.
>
>
>
> Keywords
> org.apache.flink.util.ChildFirstClassLoader
> sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0
>
>   Problem Suspect 2
>
> 34,407 instances of "org.apache.flink.core.memory.HybridMemorySegment", 
> loaded by "sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0" occupy 7,707,168 
> (10.70%) bytes.
>
> Keywords
> org.apache.flink.core.memory.HybridMemorySegment
> sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0
>
>
>
>
> heap dump2:
>
> Leak Suspects
>
>   Problem Suspect 1
>
> 21 instances of "org.apache.flink.util.ChildFirstClassLoader", loaded by 
> "sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0" occupy 26,061,408 (30.68%) 
> bytes.
>
> Biggest instances:
>
> org.apache.flink.util.ChildFirstClassLoader @ 0x73e9e9930 - 1,474,224 (1.74%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73edce0b8 - 1,474,224 (1.74%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73f1ad7d0 - 1,474,168 (1.74%) 
> bytes.
> org.apache.flink.util.C

Re: [BULK]Re: [SURVEY] Remove Mesos support

2021-03-28 Thread Yangze Guo
+1

Best,
Yangze Guo

On Mon, Mar 29, 2021 at 11:31 AM Xintong Song  wrote:
>
> +1
> It has been a matter of fact for a while that we no longer port new features 
> to the Mesos deployment.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Mar 26, 2021 at 10:37 PM Till Rohrmann  wrote:
>>
>> +1 for officially deprecating this component for the 1.13 release.
>>
>> Cheers,
>> Till
>>
>> On Thu, Mar 25, 2021 at 1:49 PM Konstantin Knauf  wrote:
>>>
>>> Hi Matthias,
>>>
>>> Thank you for following up on this. +1 to officially deprecate Mesos in the 
>>> code and documentation, too. It will be confusing for users if this 
>>> diverges from the roadmap.
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>> On Thu, Mar 25, 2021 at 12:23 PM Matthias Pohl  
>>> wrote:
>>>>
>>>> Hi everyone,
>>>> considering the upcoming release of Flink 1.13, I wanted to revive the
>>>> discussion about the Mesos support once more. Mesos is also already listed
>>>> as deprecated in Flink's overall roadmap [1]. Maybe, it's time to align the
>>>> documentation accordingly to make it more explicit?
>>>>
>>>> What do you think?
>>>>
>>>> Best,
>>>> Matthias
>>>>
>>>> [1] https://flink.apache.org/roadmap.html#feature-radar
>>>>
>>>> On Wed, Oct 28, 2020 at 9:40 AM Till Rohrmann  wrote:
>>>>
>>>> > Hi Oleksandr,
>>>> >
>>>> > yes you are right. The biggest problem is at the moment the lack of test
>>>> > coverage and thereby confidence to make changes. We have some e2e tests
>>>> > which you can find here [1]. These tests are, however, quite coarse 
>>>> > grained
>>>> > and are missing a lot of cases. One idea would be to add a Mesos e2e test
>>>> > based on Flink's end-to-end test framework [2]. I think what needs to be
>>>> > done there is to add a Mesos resource and a way to submit jobs to a Mesos
>>>> > cluster to write e2e tests.
>>>> >
>>>> > [1] https://github.com/apache/flink/tree/master/flink-jepsen
>>>> > [2]
>>>> > https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-end-to-end-tests-common
>>>> >
>>>> > Cheers,
>>>> > Till
>>>> >
>>>> > On Tue, Oct 27, 2020 at 12:29 PM Oleksandr Nitavskyi <
>>>> > o.nitavs...@criteo.com> wrote:
>>>> >
>>>> >> Hello Xintong,
>>>> >>
>>>> >> Thanks for the insights and support.
>>>> >>
>>>> >> I browsed the Mesos backlog and didn't identify anything critical
>>>> >> left there.
>>>> >>
>>>> >> I see that there were quite a lot of contributions to Flink Mesos
>>>> >> in recent versions:
>>>> >> https://github.com/apache/flink/commits/master/flink-mesos
>>>> >> We plan to validate the current Flink master (or release 1.12 branch) on
>>>> >> our Mesos setup. In case of any issues, we will try to propose changes.
>>>> >> My feeling is that our test results shouldn't affect the Flink 1.12
>>>> >> release cycle. And if any potential commits land in 1.12.1, that
>>>> >> should be totally fine.
>>>> >>
>>>> >> In the future, we would be glad to help you guys with any
>>>> >> maintenance-related questions. One of the highest priorities around this
>>>> >> component seems to be the development of the full e2e test.
>>>> >>
>>>> >> Kind Regards
>>>> >> Oleksandr Nitavskyi
>>>> >> 
>>>> >> From: Xintong Song 
>>>> >> Sent: Tuesday, October 27, 2020 7:14 AM
>>>> >> To: dev ; user 
>>>> >> Cc: Piyush Narang 
>>>> >> Subject: [BULK]Re: [SURVEY] Remove Mesos support
>>>> >>
>>>> >> Hi Piyush,
>>>> >>
>>>> >> Thanks a lot for sharing the information. It would be a great relief 
>>>> >> that
>>>> >> you are good with Flink on Mesos as is.
>>>> >>
>>>> >> 

Re: [ANNOUNCE] Apache Flink 1.12.1 released

2021-01-19 Thread Yangze Guo
Thanks Xintong for the great work!

Best,
Yangze Guo

On Tue, Jan 19, 2021 at 4:47 PM Till Rohrmann  wrote:
>
> Thanks a lot for driving this release Xintong. This was indeed a release with 
> some obstacles to overcome and you did it very well!
>
> Cheers,
> Till
>
> On Tue, Jan 19, 2021 at 5:59 AM Xingbo Huang  wrote:
>>
>> Thanks Xintong for the great work!
>>
>> Best,
>> Xingbo
>>
>> On Tue, Jan 19, 2021 at 12:51 PM Peter Huang wrote:
>>
>> > Thanks for the great effort to make this happen. It paves the way for us to
>> > use 1.12 soon.
>> >
>> > Best Regards
>> > Peter Huang
>> >
>> > On Mon, Jan 18, 2021 at 8:16 PM Yang Wang  wrote:
>> >
>> > > Thanks Xintong for the great work as our release manager!
>> > >
>> > >
>> > > Best,
>> > > Yang
>> > >
>> > > On Tue, Jan 19, 2021 at 11:53 AM Xintong Song wrote:
>> > >
>> > >> The Apache Flink community is very happy to announce the release of
>> > >> Apache Flink 1.12.1, which is the first bugfix release for the Apache
>> > Flink
>> > >> 1.12 series.
>> > >>
>> > >> Apache Flink® is an open-source stream processing framework for
>> > >> distributed, high-performing, always-available, and accurate data
>> > streaming
>> > >> applications.
>> > >>
>> > >> The release is available for download at:
>> > >> https://flink.apache.org/downloads.html
>> > >>
>> > >> Please check out the release blog post for an overview of the
>> > >> improvements for this bugfix release:
>> > >> https://flink.apache.org/news/2021/01/19/release-1.12.1.html
>> > >>
>> > >> The full release notes are available in Jira:
>> > >>
>> > >>
>> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349459
>> > >>
>> > >> We would like to thank all contributors of the Apache Flink community
>> > who
>> > >> made this release possible!
>> > >>
>> > >> Regards,
>> > >> Xintong
>> > >>
>> > >
>> >


Re: Memory parameters specified via the CLI have no effect when submitting a job in YARN Per-Job Cluster Mode

2021-01-17 Thread Yangze Guo
Is this a path on your local machine? The client needs to be able to find the JAR at this path.

Best,
Yangze Guo

On Mon, Jan 18, 2021 at 10:34 AM 刘海  wrote:
>
> Hello,
> I tried your suggestion
> and changed the submit command to: ./bin/flink run -d -t yarn-per-job -tm 1536 -jm 3072 -D 
> jobmanager.memory.process.size=1.5GB -D taskmanager.memory.process.size=3GB 
> -D heartbeat.timeout=180  
> /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar
>
>
> For the JAR I used an absolute path: /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar
>
>
> As a result, I got an exception saying the JAR could not be found:
> org.apache.flink.client.cli.CliArgsException: Could not get job jar and 
> dependencies from JAR file: JAR file does not exist: 1536
> at 
> org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:259)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) 
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971) 
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) 
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
> at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_181]
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>  [hadoop-common-3.0.0-cdh6.3.2.jar:?]
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  [flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) 
> [flink-dist_2.11-1.12.0.jar:1.12.0]
> Caused by: java.io.FileNotFoundException: JAR file does not exist: 1536
> at org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:793) 
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at 
> org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:256)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> ... 8 more
>
>
> 刘海
> liuha...@163.com
> Signature customized by NetEase Mail Master
> On Jan 18, 2021 at 10:12, Yangze Guo wrote:
> Hi, please use -D, -tm, and -jm; there is no need to add the y prefix.
>
> Best,
> Yangze Guo
>
>
>
> On Mon, Jan 18, 2021 at 9:19 AM 刘海  wrote:
>
>
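> 刘海
> liuha...@163.com
> Signature customized by NetEase Mail Master
> On Jan 18, 2021 at 09:15, 刘海 wrote:
>
> Hi all,
> I have a question. Here is my cluster setup:
> 1. I am currently using Flink 1.12;
> 2. A three-node Hadoop cluster built on CDH 6.3.2, using the YARN cluster bundled with CDH;
> 3. Flink deployment mode: Per-Job Cluster on YARN (three nodes, each with 48 cores and 64 GB of memory);
> 4. Below is the flink-conf.yaml for my three nodes; apart from jobmanager.rpc.address, the configuration is identical on all three Flink nodes: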
>
> #==
> # Common 通用设置选项
> #==
> jobmanager.rpc.address: cdh1
>
> # The RPC port where the JobManager is reachable.
> jobmanager.rpc.port: 6123
> # The total process memory size for the JobManager.
> #
> # Note this accounts for all memory usage within the JobManager process, 
> including JVM metaspace and other overhead.
> jobmanager.memory.process.size: 2048m
>
> # The total process memory size for the TaskManager.
> # Note this accounts for all memory usage within the TaskManager process, 
> including JVM metaspace and other overhead.
> taskmanager.memory.process.size: 6144m
>
> # To exclude JVM metaspace and overhead, please, use total Flink memory size 
> instead of 'taskmanager.memory.process.size'.
> # It is not recommended to set both 'taskmanager.memory.process.size' and 
> Flink memory.
> # taskmanager.memory.flink.size: 1280m
> # The number of task slots that each TaskManager offers. Each slot runs one 
> parallel pipeline.
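> # Number of slots offered by the TaskManager (default: 1). Each slot can run one task or pipeline. Having multiple slots in a TaskManager helps
> # amortize certain constant overheads (of the JVM, application libraries, or network connections) across parallel tasks or pipelines.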
> taskmanager.numberOfTaskSlots: 1
> # The parallelism used for programs that did not specify and other 
> parallelism.
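> # Default parallelism used when no parallelism is specified anywhere (default: 1)
> parallelism.default: 1
> # Add the following setting to specify the TaskManager address; for a single-machine deployment, use localhost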
> #taskmanager.host: 0.0.0.0
> # The default file system scheme and authority.
> # By default file paths without scheme are interpreted relative to the local
> # root file system 'file:///'. Use this to override the default and interpret
> # relative paths relative to a different file system,
> # for example 'hdfs://mynamenode:12345'
> #
> # fs.default-scheme
>
> #==
> # High Availability
> #==
> # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
> high-availability: zookeeper
> # The pat

Re: Monitor the Flink

2021-01-17 Thread Yangze Guo
Hi,

First of all, there’s no resource isolation atm between
operators/tasks within a slot, except for managed memory. So,
monitoring of individual tasks might be meaningless.

Regarding TM/JM level cpu/memory metrics, you can refer to [1] and
[2]. Regarding the traffic between tasks, you can refer to [3].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html#cpu
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html#memory
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html#default-shuffle-service
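
Besides a metrics reporter, these TM-level metrics can also be fetched from the JobManager's REST endpoint. The sketch below only builds the request URL; it assumes the /taskmanagers/<id>/metrics?get=... endpoint and the metric names Status.JVM.CPU.Load and Status.JVM.Memory.Heap.Used as listed in the metrics documentation, while the host, port, and TaskManager id are placeholders to be replaced with real values (ids can be listed via /taskmanagers).

```java
public class TaskManagerMetricsUrl {

    /** Builds the REST URL for querying the given metrics of one TaskManager. */
    static String metricsUrl(String restBase, String taskManagerId, String... metricNames) {
        // Drop a trailing slash so the path is not doubled.
        String base = restBase.endsWith("/")
                ? restBase.substring(0, restBase.length() - 1)
                : restBase;
        return base + "/taskmanagers/" + taskManagerId
                + "/metrics?get=" + String.join(",", metricNames);
    }

    public static void main(String[] args) {
        // "localhost:8081" and "tm-1" are placeholders, not values from this thread.
        System.out.println(metricsUrl(
                "http://localhost:8081", "tm-1",
                "Status.JVM.CPU.Load", "Status.JVM.Memory.Heap.Used"));
    }
}
```

The resulting URL can be requested with any HTTP client; the response is a JSON list of id/value pairs.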

Best,
Yangze Guo

On Sun, Jan 17, 2021 at 6:43 PM penguin.  wrote:
>
> Hello,
>
>
> In the Flink cluster,
>
> How to monitor each taskslot of taskmanager? For example, the CPU and memory 
> usage of each slot and the traffic between slots.
>
> What is the way to get the traffic between nodes?
>
> thank you very much!
>
>
> penguin
>
>
>
>


Re: Number of parallel connections for Elasticsearch Connector

2021-01-17 Thread Yangze Guo
Hi, Rex.

> How many connections does the ES connector use to write to Elasticsearch?
I think the number is equal to your parallelism. Each subtask of an
Elasticsearch sink will have its own separate Bulk Processor as both
the Client and the Bulk Processor are class private[1]. The subtasks
will be placed into different slots and have their own Elasticsearch
sink instance.

[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L204
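
The reasoning can be illustrated with a toy model in plain Java. None of the types below are Flink or Elasticsearch classes; FakeClient and SinkSubtask are made-up stand-ins that only mirror the "private client per sink instance" structure, so the count shows sink-side clients, not necessarily TCP connections.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class PerSubtaskClientModel {

    /** Stand-in for a client plus bulk processor pair; not a real Elasticsearch class. */
    static final class FakeClient {}

    /** Stand-in for one parallel sink instance that privately owns its client. */
    static final class SinkSubtask {
        private final FakeClient client = new FakeClient();

        FakeClient client() {
            return client;
        }
    }

    /** Creates one sink instance per subtask and counts the distinct client objects. */
    static int distinctClients(int parallelism) {
        Set<FakeClient> clients = Collections.newSetFromMap(new IdentityHashMap<>());
        for (int subtask = 0; subtask < parallelism; subtask++) {
            clients.add(new SinkSubtask().client());
        }
        return clients.size();
    }

    public static void main(String[] args) {
        System.out.println("clients for parallelism 4: " + distinctClients(4));
    }
}
```

Because nothing is shared between instances, the number of clients grows one-to-one with the parallelism of the sink.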

Best,
Yangze Guo

On Sun, Jan 17, 2021 at 11:33 AM Rex Fenley  wrote:
>
> I found the following, indicating that there is no concurrency for the 
> Elasticsearch Connector 
> https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L382
>
> Does each subtask of an Elasticsearch sink have its own separate Bulk 
> Processor to allow for parallel bulk writes?
>
> Thanks!
>
> On Sat, Jan 16, 2021 at 10:33 AM Rex Fenley  wrote:
>>
>> Hello,
>>
>> How many connections does the ES connector use to write to Elasticsearch? We 
>> have a single machine with 16 vCPUs and parallelism of 4 running our job, 
>> with -p 4 I'd expect there to be 4 parallel bulk request writers / 
>> connections to Elasticsearch. Is there a place in the code to confirm this?
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com |  BLOG  |  FOLLOW US  |  LIKE US
>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com |  BLOG  |  FOLLOW US  |  LIKE US


Re: Memory parameters specified via the CLI have no effect when submitting a job in YARN Per-Job Cluster Mode

2021-01-17 Thread Yangze Guo
Hi, please use -D, -tm, and -jm; there is no need to add the y prefix.

Best,
Yangze Guo



On Mon, Jan 18, 2021 at 9:19 AM 刘海  wrote:
>
>
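> 刘海
> liuha...@163.com
> Signature customized by NetEase Mail Master
> On Jan 18, 2021 at 09:15, 刘海 wrote:
>
> Hi all,
> I have a question. Here is my cluster setup:
> 1. I am currently using Flink 1.12;
> 2. A three-node Hadoop cluster built on CDH 6.3.2, using the YARN cluster bundled with CDH;
> 3. Flink deployment mode: Per-Job Cluster on YARN (three nodes, each with 48 cores and 64 GB of memory);
> 4. Below is the flink-conf.yaml for my three nodes; apart from jobmanager.rpc.address, the configuration is identical on all three Flink nodes: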
>
> #==
> # Common settings
> #==
> jobmanager.rpc.address: cdh1
>
> # The RPC port where the JobManager is reachable.
> jobmanager.rpc.port: 6123
> # The total process memory size for the JobManager.
> #
> # Note this accounts for all memory usage within the JobManager process, 
> including JVM metaspace and other overhead.
> jobmanager.memory.process.size: 2048m
>
> # The total process memory size for the TaskManager.
> # Note this accounts for all memory usage within the TaskManager process, 
> including JVM metaspace and other overhead.
> taskmanager.memory.process.size: 6144m
>
> # To exclude JVM metaspace and overhead, please, use total Flink memory size 
> instead of 'taskmanager.memory.process.size'.
> # It is not recommended to set both 'taskmanager.memory.process.size' and 
> Flink memory.
> # taskmanager.memory.flink.size: 1280m
> # The number of task slots that each TaskManager offers. Each slot runs one 
> parallel pipeline.
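> # Number of slots offered by the TaskManager (default: 1). Each slot can run one task or pipeline. Having multiple slots in a TaskManager helps
> # amortize certain constant overheads (of the JVM, application libraries, or network connections) across parallel tasks or pipelines.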
> taskmanager.numberOfTaskSlots: 1
> # The parallelism used for programs that did not specify and other 
> parallelism.
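> # Default parallelism used when no parallelism is specified anywhere (default: 1)
> parallelism.default: 1
> # Add the following setting to specify the TaskManager address; for a single-machine deployment, use localhost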
> #taskmanager.host: 0.0.0.0
> # The default file system scheme and authority.
> # By default file paths without scheme are interpreted relative to the local
> # root file system 'file:///'. Use this to override the default and interpret
> # relative paths relative to a different file system,
> # for example 'hdfs://mynamenode:12345'
> #
> # fs.default-scheme
>
> #==
> # High Availability
> #==
> # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
> high-availability: zookeeper
> # The path where metadata for master recovery is persisted. While ZooKeeper 
> stores
> # the small ground truth for checkpoint and leader election, this location 
> stores
> # the larger objects, like persisted dataflow graphs.
> # Must be a durable file system that is accessible from all nodes
> # (like HDFS, S3, Ceph, nfs, ...)
> high-availability.storageDir: hdfs:///flink/ha/
> # The list of ZooKeeper quorum peers that coordinate the high-availability
> # setup. This must be a list of the form:
> # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
> high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
> high-availability.zookeeper.client.acl: open
> high-availability.zookeeper.path.root: /flink
> #==
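> # Fault tolerance, checkpointing, and state backends
> #==
> state.backend: rocksdb
> # Whether the state backend should create incremental checkpoints, if possible (default: false). For an incremental checkpoint, only the diff from the previous checkpoint is stored,
> # rather than the complete checkpoint state. Once enabled, the state size shown in the web UI or fetched from the REST API only represents the delta checkpoint size,
> # not the full checkpoint size. Some state backends may not support incremental checkpoints and ignore this option.
> state.backend.incremental: true
> # Whether to enable local recovery for the state backend. Local recovery is disabled by default. It currently covers only keyed state backends; MemoryStateBackend does not currently support local recovery.
> state.backend.local-recovery: true
> # Size of the cache for data blocks in RocksDB, in bytes. RocksDB's default block cache size is '8MB'.
> state.backend.rocksdb.block.cache-size: 268435456
> # Determines the factory for the timer service state implementation. Options are HEAP (heap-based) or ROCKSDB (for a RocksDB-based implementation).
> state.backend.rocksdb.timer-service.factory: HEAP
> # Directory for checkpoints filesystem, when using any of the default bundled
> # state backends. The default directory for storing checkpoint data files and metadata in a Flink-supported file system.
> state.checkpoints.dir: hdfs:///flink/flink-checkpoints
> # Default target directory for savepoints, optional.
> # The default directory for savepoints. Used by state backends that write savepoints to file systems.
> state.savepoints.dir: hdfs:///flink/flink-savepoints
> # Maximum number of completed checkpoints to retain
> state.checkpoints.num-retained: 3
> # This option specifies how the job computation recovers from task failures. Accepted values are:
> # 'full': restart all tasks to recover the job.
> # 'region': restart all tasks that could be affected by the task failure. More details can be found here.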
> jobmanager.execution.failover-strategy: region
> #==
> # Advanc

Re: Main class logs in Yarn Mode

2021-01-12 Thread Yangze Guo
I think you can try the application mode[1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/deployment/#application-mode

Best,
Yangze Guo

On Tue, Jan 12, 2021 at 5:23 PM bat man  wrote:
>
> Thanks, Yangze Guo.
> Is there a way these can be redirected to the YARN logs?
>
> On Tue, 12 Jan 2021 at 2:35 PM, Yangze Guo  wrote:
>>
>> The main function of your WordCountExample is executed in your local
>> environment. So, the logs you are looking for ("Entering
>> application.") are located in your console output and the "log/"
>> directory of your Flink distribution.
>>
>> Best,
>> Yangze Guo
>>
>> On Tue, Jan 12, 2021 at 4:50 PM bat man  wrote:
>> >
>> > Hi,
>> >
>> > I am running a sample job as below -
>> >
>> > public class WordCountExample {
>> > static Logger logger = LoggerFactory.getLogger(WordCountExample.class);
>> >
>> > public static void main(String[] args) throws Exception {
>> > final ExecutionEnvironment env = 
>> > ExecutionEnvironment.getExecutionEnvironment();
>> >
>> > logger.info("Entering application.");
>> >
>> > DataSet<String> text = env.fromElements(
>> > "Who's there?",
>> > "I think I hear them. Stand, ho! Who's there?");
>> >
>> > List<Integer> elements = new ArrayList<>();
>> > elements.add(0);
>> >
>> >
>> > DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>> >
>> > DataSet<Tuple2<String, Integer>> wordCounts = text
>> > .flatMap(new LineSplitter())
>> > .withBroadcastSet(set, "set")
>> > .groupBy(0)
>> > .sum(1);
>> >
>> > wordCounts.print();
>> >
>> > logger.info("Processing done");
>> >
>> > //env.execute("wordcount job complete");
>> >
>> > }
>> >
>> > public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>> >
>> > static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);
>> >
>> > @Override
>> > public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>> > loggerLineSplitter.info("Logger in LineSplitter.flatMap");
>> > for (String word : line.split(" ")) {
>> > out.collect(new Tuple2<>(word, 1));
>> > }
>> > }
>> > }
>> >
>> > public static class TestClass implements Serializable {
>> > private static final long serialVersionUID = -2932037991574118651L;
>> >
>> > static Logger loggerTestClass = 
>> > LoggerFactory.getLogger("WordCountExample.TestClass");
>> >
>> > List<Integer> integerList;
>> > public TestClass(List<Integer> integerList){
>> > this.integerList=integerList;
>> > loggerTestClass.info("Logger in TestClass");
>> > }
>> >
>> >
>> > }
>> > }
>> >
>> > When run in IDE I can see the logs from main class i.e. statements like 
>> > below in console logs -
>> >
>> > 13:40:24.459 [main] INFO  com.flink.transform.WordCountExample - Entering 
>> > application.
>> > 13:40:24.486 [main] INFO  WordCountExample.TestClass - Logger in TestClass
>> >
>> >
>> > When run on Yarn with command - flink run -m yarn-cluster  -c 
>> > com.flink.transform.WordCountExample rt-1.0-jar-with-dependencies.jar
>> >
>> > I only see the flatmap logging statements like -
>> > INFO  com.flink.transform.WordCountExample$LineSplitter - Logger in 
>> > LineSplitter.flatMap
>> > INFO  com.flink.transform.WordCountExample$LineSplitter - Logger in 
>> > LineSplitter.flatMap
>> >
>> > I have checked the jobmanager and taskmanager logs from yarn in EMR.
>> >
>> > This is my log4j.properties from EMR cluster
>> >
>> > log4j.rootLogger=INFO,file,elastic
>> >
>> > # Config ES logging appender
>> > log4j.appender.elastic=com.letfy.log4j.appenders.ElasticSearchClientAppender
>> > log4j.appender.elastic.elasticHost=http://<>:9200
>> > log4j.appender.elastic.hostName=<>
>> > log4j.appender.elastic.applicationName=<>
>> >
>> > # more options (see github project for the full list)
>> > log4j.appender.elastic.elasticIndex=<>
>> > log4j.appender.elastic.elasticType=<>
>> >
>> > # Log all infos in the given file
>> > log4j.appender.file=org.apache.log4j.FileAppender
>> > log4j.appender.file.file=${log.file}
>> > log4j.appender.file.append=false
>> > log4j.appender.file.layout=org.apache.log4j.PatternLayout
>> > log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>> >
>> > # suppress the irrelevant (wrong) warnings from the netty channel handler
>> > log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file
>> >
>> >
>> > How can I access the main driver logs when running with YARN as the master?
>> >
>> > Thanks,
>> > Hemant
>> >
>> >
>> >
>> >


Re: Main class logs in Yarn Mode

2021-01-12 Thread Yangze Guo
The main function of your WordCountExample is executed in your local
environment. So, the logs you are looking for ("Entering
application.") are located in your console output and the "log/"
directory of your Flink distribution.

Best,
Yangze Guo

On Tue, Jan 12, 2021 at 4:50 PM bat man  wrote:
>
> Hi,
>
> I am running a sample job as below -
>
> public class WordCountExample {
> static Logger logger = LoggerFactory.getLogger(WordCountExample.class);
>
> public static void main(String[] args) throws Exception {
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
>
> logger.info("Entering application.");
>
> DataSet<String> text = env.fromElements(
> "Who's there?",
> "I think I hear them. Stand, ho! Who's there?");
>
> List<Integer> elements = new ArrayList<>();
> elements.add(0);
>
>
> DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>
> DataSet<Tuple2<String, Integer>> wordCounts = text
> .flatMap(new LineSplitter())
> .withBroadcastSet(set, "set")
> .groupBy(0)
> .sum(1);
>
> wordCounts.print();
>
> logger.info("Processing done");
>
> //env.execute("wordcount job complete");
>
> }
>
> public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>
> static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);
>
> @Override
> public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
> loggerLineSplitter.info("Logger in LineSplitter.flatMap");
> for (String word : line.split(" ")) {
> out.collect(new Tuple2<>(word, 1));
> }
> }
> }
>
> public static class TestClass implements Serializable {
> private static final long serialVersionUID = -2932037991574118651L;
>
> static Logger loggerTestClass = 
> LoggerFactory.getLogger("WordCountExample.TestClass");
>
> List<Integer> integerList;
> public TestClass(List<Integer> integerList){
> this.integerList=integerList;
> loggerTestClass.info("Logger in TestClass");
> }
>
>
> }
> }
>
> When run in IDE I can see the logs from main class i.e. statements like below 
> in console logs -
>
> 13:40:24.459 [main] INFO  com.flink.transform.WordCountExample - Entering 
> application.
> 13:40:24.486 [main] INFO  WordCountExample.TestClass - Logger in TestClass
>
>
> When run on Yarn with command - flink run -m yarn-cluster  -c 
> com.flink.transform.WordCountExample rt-1.0-jar-with-dependencies.jar
>
> I only see the flatmap logging statements like -
> INFO  com.flink.transform.WordCountExample$LineSplitter - Logger in 
> LineSplitter.flatMap
> INFO  com.flink.transform.WordCountExample$LineSplitter - Logger in 
> LineSplitter.flatMap
>
> I have checked the jobmanager and taskmanager logs from yarn in EMR.
>
> This is my log4j.properties from EMR cluster
>
> log4j.rootLogger=INFO,file,elastic
>
> # Config ES logging appender
> log4j.appender.elastic=com.letfy.log4j.appenders.ElasticSearchClientAppender
> log4j.appender.elastic.elasticHost=http://<>:9200
> log4j.appender.elastic.hostName=<>
> log4j.appender.elastic.applicationName=<>
>
> # more options (see github project for the full list)
> log4j.appender.elastic.elasticIndex=<>
> log4j.appender.elastic.elasticType=<>
>
> # Log all infos in the given file
> log4j.appender.file=org.apache.log4j.FileAppender
> log4j.appender.file.file=${log.file}
> log4j.appender.file.append=false
> log4j.appender.file.layout=org.apache.log4j.PatternLayout
> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>
> # suppress the irrelevant (wrong) warnings from the netty channel handler
> log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file
>
>
> How can I access the main driver logs when running with YARN as the master?
>
> Thanks,
> Hemant
>
>
>
>


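Re: Task manager memory usage issue

2020-12-17 Thread Yangze Guo
1. You can add JVM arguments via the env.java.opts.taskmanager option.
2. Currently the TM does not manage heap memory at slot granularity, so this
is not supported in session mode.

Best,
Yangze Guo

On Fri, Dec 18, 2020 at 9:22 AM guoliubi...@foxmail.com
 wrote:
>
> Hi,
> I am using Flink 1.12, running in standalone cluster mode.
> One job running on it consumes a lot of memory; it fills up the heap and
> then brings down the whole task manager.
> How can I add heap-dump-related arguments to the task manager's JVM?
> Also, is there an option to kill only the job that exhausts the heap instead
> of shutting down the whole task manager? Other jobs are running on this task
> manager, and they fail along with it.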
>
>
>
> guoliubi...@foxmail.com


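Re: flink sql 1.12 writing data to elasticsearch, deployment issue

2020-12-15 Thread Yangze Guo
You need to put flink-sql-connector-elasticsearch7_2.11-1.12.0.jar in there.

Best,
Yangze Guo

On Wed, Dec 16, 2020 at 11:34 AM cljb...@163.com  wrote:
>
> hi,
> With Flink SQL 1.12, writing data to Elasticsearch works when run locally,
> but after deploying to the server it reports the error below.
> I checked the jar I built and it does contain the relevant classes, and I
> have also put flink-connector-elasticsearch7_2.11-1.12.0.jar into Flink's lib.
> I adjusted the class loading and tried both child-first and parent-first;
> neither works.
> Has anyone run into a similar problem?
> Thanks!
>
> The error message is as follows: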
>
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Could not load service provider for table factories.
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
> at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1685)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
> Caused by: org.apache.flink.table.api.TableException: Could not load service 
> provider for table factories.
> at 
> org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:218)
> at 
> org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170)
> at 
> org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125)
> at 
> org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:263)
> at 
> org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:90)
> at com.searchrec.main.XfkEsIndex.main(XfkEsIndex.java:24)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
> ... 11 more
> Caused by: java.util.ServiceConfigurationError: 
> org.apache.flink.table.factories.TableFactory: Provider 
> org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSinkFactory
>  could not be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:232)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
> at java.util.Iterator.forEachRemaining(Iterator.java:116)
> at 
> org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
> ... 22 more
> Caused by: java.lang.NoClassDefFoundError: 
> org/elasticsearch/common/xcontent/XContentType
> at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase.<init>(ElasticsearchUpsertTableSinkFactoryBase.java:105)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
> ... 26 more
> Caused by: java.lang.ClassNotFoundException: 
> org.elasticsearch.common.xcontent.XContentType
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 33 more
>
>
>
>
> cljb...@163.com


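Re: flink1.11 datastream elasticsearch sink: writing to ES requires username/password authentication, is this currently supported?

2020-12-15 Thread Yangze Guo
Setting username and password is not yet supported in 1.11. These two options were added to the new ES connector in 1.12 [1].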

[1] https://issues.apache.org/jira/browse/FLINK-18361

Best,
Yangze Guo

On Wed, Dec 16, 2020 at 11:34 AM 李世钰  wrote:
>
> flink1.11 datastream elasticsearch sink: writing to ES requires
> username/password authentication. Is this currently supported?
> elasticsearch7.0
>
> --
>
> 李世钰
>
> Mail:m...@lishiyu.cn
>
> TEL:18801236165
>
> Motto: Make the people around you happy, and you will be surrounded by happiness!


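Re: Cannot write data read from Kafka to Redis using RedisSink

2020-12-06 Thread Yangze Guo
Most likely the network is not reachable. You could check the whitelist settings.

Best,
Yangze Guo

On Mon, Dec 7, 2020 at 10:28 AM Jark Wu  wrote:
>
> This is probably related to networking and deployment. I suggest asking
> Huawei Cloud technical support.
>
> On Sun, 6 Dec 2020 at 20:40, 赵一旦  wrote:
>
> > It cannot connect. Have you confirmed that your Huawei Cloud environment
> > can reach the Redis server?
> >
> > 追梦的废柴  wrote on Sun, Dec 6, 2020 at 8:35 PM:
> >
> > > Hi all,
> > > Good evening!
> > > My project team is currently evaluating the Flink framework. One metric
> > > requires reading data from Kafka and then storing the final result in Redis.
> > >
> > > We added the flink-redis connector to our pom file and followed the
> > > official RedisSink example. During local development, data is written
> > > correctly to Redis on one of our servers.
> > > However, after packaging the program as a jar, deploying it to the server
> > > (Huawei Cloud MRS), and submitting it to yarn with flink run, it always
> > > fails and cannot write to Redis. Does anyone know why?
> > > I have been stuck on this for two days with no progress at all. I would
> > > appreciate your help. Thank you!
> > > The error is as follows:
> > > redis.client.jedis.exceptions.JedisConnectionException:Could not get a
> > > resource from the pool at .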
> > >
> > >
> >


Re: taskmanager.cpu.cores 1.7976931348623157E308

2020-12-06 Thread Yangze Guo
My gut feeling is your "vmArgs" does not take effect.

Best,
Yangze Guo

On Mon, Dec 7, 2020 at 10:32 AM Yangze Guo  wrote:
>
> Hi, Rex,
>
> Can you share more logs for it? Did you see something like "The
> configuration option taskmanager.cpu.cores required for local
> execution is not set, setting it to" in your logs?
>
> Best,
> Yangze Guo
>
>
> On Sat, Dec 5, 2020 at 6:53 PM David Anderson  wrote:
> >
> > taskmanager.cpu.cores is intended for internal use only -- you aren't meant 
> > to set this option. What happens if you leave it alone?
> >
> > Regards,
> > David
> >
> >
> > On Sat, Dec 5, 2020 at 8:04 AM Rex Fenley  wrote:
> >>
> >> We're running this in a local environment so that may be contributing to 
> >> what we're seeing.
> >>
> >> On Fri, Dec 4, 2020 at 10:41 PM Rex Fenley  wrote:
> >>>
> >>> Hello,
> >>>
> >>> I'm tuning flink for parallelism right now and when I look at the 
> >>> JobManager I see
> >>> taskmanager.cpu.cores 1.7976931348623157E308
> >>> Which looks like the maximum double number.
> >>>
> >>> We have 8 cpu cores, so we figured we'd bump to 16 for hyper threading. 
> >>> We have 37 operators so we rounded up and set 40 task slots.
> >>>
> >>> Here is our configuration
> >>>
> >>> "vmArgs": "-Xmx16g -Xms16g -XX:MaxDirectMemorySize=1207959552 
> >>> -XX:MaxMetaspaceSize=268435456 -Dlog.file=/tmp/flink.log 
> >>> -Dtaskmanager.memory.framework.off-heap.size=134217728b 
> >>> -Dtaskmanager.memory.network.max=1073741824b 
> >>> -Dtaskmanager.memory.network.min=1073741824b 
> >>> -Dtaskmanager.memory.framework.heap.size=134217728b 
> >>> -Dtaskmanager.memory.managed.size=6335076856b 
> >>> -Dtaskmanager.memory.task.heap.size=8160437768b 
> >>> -Dtaskmanager.memory.task.off-heap.size=0b 
> >>> -Dtaskmanager.numberOfTaskSlots=40 -Dtaskmanager.cpu.cores=16.0"
> >>>
> >>> We then tried with -Dtaskmanager.cpu.cores=7.0 and still ended up with 
> >>> that very odd value for cpu cores.
> >>>
> >>> How do we correctly adjust this?
> >>>
> >>> Thanks!
> >>> --
> >>>
> >>> Rex Fenley  |  Software Engineer - Mobile and Backend
> >>>
> >>>
> >>> Remind.com |  BLOG  |  FOLLOW US  |  LIKE US
> >>
> >>
> >>
> >> --
> >>
> >> Rex Fenley  |  Software Engineer - Mobile and Backend
> >>
> >>
> >> Remind.com |  BLOG  |  FOLLOW US  |  LIKE US
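
The value reported in the thread above, 1.7976931348623157E308, is exactly
what Java prints for Double.MAX_VALUE, which supports the guess that it is
the maximum double. A minimal check using only the JDK (the class name is
made up for illustration):

```java
class MaxDoubleCheck {
    // Returns true if the given text parses to Double.MAX_VALUE.
    static boolean isMaxDouble(String text) {
        return Double.parseDouble(text) == Double.MAX_VALUE;
    }

    public static void main(String[] args) {
        // Prints the largest finite double, then whether the value
        // from the mail parses to that same constant.
        System.out.println(Double.MAX_VALUE);
        System.out.println(isMaxDouble("1.7976931348623157E308"));
    }
}
```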


Re: taskmanager.cpu.cores 1.7976931348623157E308

2020-12-06 Thread Yangze Guo
Hi, Rex,

Can you share more logs for it? Did you see something like "The
configuration option taskmanager.cpu.cores required for local
execution is not set, setting it to" in your logs?

Best,
Yangze Guo


On Sat, Dec 5, 2020 at 6:53 PM David Anderson  wrote:
>
> taskmanager.cpu.cores is intended for internal use only -- you aren't meant 
> to set this option. What happens if you leave it alone?
>
> Regards,
> David
>
>
> On Sat, Dec 5, 2020 at 8:04 AM Rex Fenley  wrote:
>>
>> We're running this in a local environment so that may be contributing to 
>> what we're seeing.
>>
>> On Fri, Dec 4, 2020 at 10:41 PM Rex Fenley  wrote:
>>>
>>> Hello,
>>>
>>> I'm tuning flink for parallelism right now and when I look at the 
>>> JobManager I see
>>> taskmanager.cpu.cores 1.7976931348623157E308
>>> Which looks like the maximum double number.
>>>
>>> We have 8 cpu cores, so we figured we'd bump to 16 for hyper threading. We 
>>> have 37 operators so we rounded up and set 40 task slots.
>>>
>>> Here is our configuration
>>>
>>> "vmArgs": "-Xmx16g -Xms16g -XX:MaxDirectMemorySize=1207959552 
>>> -XX:MaxMetaspaceSize=268435456 -Dlog.file=/tmp/flink.log 
>>> -Dtaskmanager.memory.framework.off-heap.size=134217728b 
>>> -Dtaskmanager.memory.network.max=1073741824b 
>>> -Dtaskmanager.memory.network.min=1073741824b 
>>> -Dtaskmanager.memory.framework.heap.size=134217728b 
>>> -Dtaskmanager.memory.managed.size=6335076856b 
>>> -Dtaskmanager.memory.task.heap.size=8160437768b 
>>> -Dtaskmanager.memory.task.off-heap.size=0b 
>>> -Dtaskmanager.numberOfTaskSlots=40 -Dtaskmanager.cpu.cores=16.0"
>>>
>>> We then tried with -Dtaskmanager.cpu.cores=7.0 and still ended up with that 
>>> very odd value for cpu cores.
>>>
>>> How do we correctly adjust this?
>>>
>>> Thanks!
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com |  BLOG  |  FOLLOW US  |  LIKE US
>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com |  BLOG  |  FOLLOW US  |  LIKE US
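
A quick arithmetic check on the vmArgs quoted above: the
-XX:MaxDirectMemorySize value equals the sum of the framework off-heap, task
off-heap, and network memory sizes. The sketch below only redoes that sum
with the numbers from the mail. The class and method names are illustrative,
and whether Flink derives the limit this way in every version is an
assumption here.

```java
class DirectMemoryCheck {
    // Sums the three components; all arguments are in bytes.
    static long directMemoryBytes(long frameworkOffHeap, long taskOffHeap, long network) {
        return frameworkOffHeap + taskOffHeap + network;
    }

    public static void main(String[] args) {
        // Values copied from the vmArgs in the thread:
        // framework.off-heap.size, task.off-heap.size, network.min/max.
        long total = directMemoryBytes(134217728L, 0L, 1073741824L);
        System.out.println(total + " bytes = " + (total / (1024 * 1024)) + " MiB");
    }
}
```

The result can be compared with the 1207959552 passed to
-XX:MaxDirectMemorySize in the same vmArgs string.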


Re: Flink on YARN: delegation token expired prevent job restart

2020-11-17 Thread Yangze Guo
Hi,

There is a login operation in
YarnEntrypointUtils.logYarnEnvironmentInformation without the keytab.
One suspicion is that Flink may access HDFS when it tries to build
the PackagedProgram.

Does this issue only happen in the application mode? If so, I would cc
@kkloudas.

Best,
Yangze Guo

On Tue, Nov 17, 2020 at 4:52 PM Yangze Guo  wrote:
>
> Hi,
>
> AFAIK, Flink does exclude the HDFS_DELEGATION_TOKEN in the
> HadoopModule when the user provides the keytab and principal. I'll try to
> do a deeper investigation to figure out whether there is any HDFS access
> before the HadoopModule is installed.
>
> Best,
> Yangze Guo
>
>
> On Tue, Nov 17, 2020 at 4:36 PM Kien Truong  wrote:
> >
> > Hi,
> >
> > Yes, I did. There're also logs about logging in using keytab successfully 
> > in both Job Manager and Task Manager.
> >
> > I found some YARN docs about token renewal on AM restart
> >
> >
> > > Therefore, to survive AM restart after token expiry, your AM has to get 
> > > the NMs to localize the keytab or make no HDFS accesses until (somehow) a 
> > > new token has been passed to them from a client.
> >
> > Maybe Flink did access HDFS with an expired token, before switching to use 
> > the localized keytab ?
> >
> > Regards,
> > Kien
> >
> >
> >
> > On 17 Nov 2020 at 15:14, Yangze Guo  wrote:
> >
> > Hi, Kien,
> >
> > Did you configure "security.kerberos.login.principal" and
> > "security.kerberos.login.keytab" together? If you only set the keytab,
> > it will not take effect.
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Nov 17, 2020 at 3:03 PM Kien Truong  wrote:
> > >
> > > Hi all,
> > >
> > > We are having an issue where the Flink Application Master is unable to
> > > automatically restart the Flink job after its delegation token has expired.
> > >
> > > We are using Flink 1.11 with YARN 3.1.1 in single job per yarn-cluster
> > > mode. We have also added a valid keytab configuration, and the taskmanagers
> > > are able to log in with keytabs correctly. However, it seems the YARN
> > > Application Master still uses delegation tokens instead of the keytab.
> > >
> > > Any idea how to resolve this would be much appreciated.
> > >
> > > Thanks
> > > Kien
> > >


Re: Flink on YARN: delegation token expired prevent job restart

2020-11-17 Thread Yangze Guo
Hi,

AFAIK, Flink does exclude the HDFS_DELEGATION_TOKEN in the
HadoopModule when the user provides the keytab and principal. I'll try to
do a deeper investigation to figure out whether there is any HDFS access
before the HadoopModule is installed.

Best,
Yangze Guo


On Tue, Nov 17, 2020 at 4:36 PM Kien Truong  wrote:
>
> Hi,
>
> Yes, I did. There're also logs about logging in using keytab successfully in 
> both Job Manager and Task Manager.
>
> I found some YARN docs about token renewal on AM restart
>
>
> > Therefore, to survive AM restart after token expiry, your AM has to get the 
> > NMs to localize the keytab or make no HDFS accesses until (somehow) a new 
> > token has been passed to them from a client.
>
> Maybe Flink did access HDFS with an expired token, before switching to use 
> the localized keytab ?
>
> Regards,
> Kien
>
>
>
> On 17 Nov 2020 at 15:14, Yangze Guo  wrote:
>
> Hi, Kien,
>
> Did you configure "security.kerberos.login.principal" and
> "security.kerberos.login.keytab" together? If you only set the keytab,
> it will not take effect.
>
> Best,
> Yangze Guo
>
> On Tue, Nov 17, 2020 at 3:03 PM Kien Truong  wrote:
> >
> > Hi all,
> >
> > We are having an issue where the Flink Application Master is unable to
> > automatically restart the Flink job after its delegation token has expired.
> >
> > We are using Flink 1.11 with YARN 3.1.1 in single job per yarn-cluster
> > mode. We have also added a valid keytab configuration, and the taskmanagers
> > are able to log in with keytabs correctly. However, it seems the YARN
> > Application Master still uses delegation tokens instead of the keytab.
> >
> > Any idea how to resolve this would be much appreciated.
> >
> > Thanks
> > Kien
> >


Re: Flink on YARN: delegation token expired prevent job restart

2020-11-17 Thread Yangze Guo
Hi, Kien,

Did you configure "security.kerberos.login.principal" and
"security.kerberos.login.keytab" together? If you only set the keytab,
it will not take effect.

Best,
Yangze Guo

On Tue, Nov 17, 2020 at 3:03 PM Kien Truong  wrote:
>
> Hi all,
>
> We are having an issue where the Flink Application Master is unable to 
> automatically restart the Flink job after its delegation token has expired.
>
> We are using Flink 1.11 with YARN 3.1.1 in single job per yarn-cluster mode. 
> We have also added a valid keytab configuration, and the taskmanagers are able 
> to log in with keytabs correctly. However, it seems the YARN Application Master 
> still uses delegation tokens instead of the keytab.
>
> Any idea how to resolve this would be much appreciated.
>
> Thanks
> Kien
>
>
>
>
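
The point made in the reply above, that the keytab only takes effect when
the principal is configured as well, can be sketched as a small check run
before submission. This is a hypothetical helper over a plain map of
settings, not Flink's own validation code; only the two option names come
from the thread, and the path and principal are placeholders.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical pre-submission check; not part of Flink.
class KerberosConfigCheck {
    static final String KEYTAB = "security.kerberos.login.keytab";
    static final String PRINCIPAL = "security.kerberos.login.principal";

    // True only when both options are present and non-blank.
    static boolean keytabLoginConfigured(Map<String, String> conf) {
        return isSet(conf.get(KEYTAB)) && isSet(conf.get(PRINCIPAL));
    }

    private static boolean isSet(String value) {
        return value != null && !value.trim().isEmpty();
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(KEYTAB, "/path/to/user.keytab");        // placeholder path
        System.out.println(keytabLoginConfigured(conf)); // keytab only
        conf.put(PRINCIPAL, "user@EXAMPLE.COM");         // placeholder principal
        System.out.println(keytabLoginConfigured(conf)); // keytab and principal
    }
}
```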


Re: Re: flink tm cpu cores setting

2020-11-10 Thread Yangze Guo
You are missing a "v"; it should be yarn.containers.vcores.

Best,
Yangze Guo

On Tue, Nov 10, 2020 at 3:43 PM zjfpla...@hotmail.com
 wrote:
>
> The JM logs contain
> Loading configuration property: yarn.containers.cores,4
>
>
>
> zjfpla...@hotmail.com
>
> From: zjfpla...@hotmail.com
> Sent: 2020-11-10 15:33
> To: user-zh
> Subject: Re: Re: flink tm cpu cores setting
> When it is set to 5, the TM cpu cores value is 4; when it is set to 4, the TM cpu cores value is still 4.
>
>
>
> zjfpla...@hotmail.com
>
> From: Yangze Guo
> Sent: 2020-11-09 10:19
> To: user-zh
> Subject: Re: Re: flink tm cpu cores setting
> How did you confirm that it has no effect? Could you share the JM logs?
> Also, whether this parameter actually takes effect depends on whether YARN's scheduler has CPU scheduling enabled.
>
> Best,
> Yangze Guo
>
> On Thu, Nov 5, 2020 at 1:50 PM zjfpla...@hotmail.com
>  wrote:
> >
> > I set this in flink-conf.yaml, but it had no effect.
> >
> >
> >
> > zjfpla...@hotmail.com
> >
> > From: JasonLee
> > Sent: 2020-11-05 13:49
> > To: user-zh
> > Subject: Re: flink tm cpu cores setting
> > hi, just set the yarn.containers.vcores parameter.
> >
> >
> >
> > -
> > Best Wishes
> > JasonLee
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/


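Re: Re: flink tm cpu cores setting

2020-11-08 Thread Yangze Guo
How did you confirm that it has no effect? Could you share the JM logs?
Also, whether this parameter actually takes effect depends on whether YARN's scheduler has CPU scheduling enabled.

Best,
Yangze Guo

On Thu, Nov 5, 2020 at 1:50 PM zjfpla...@hotmail.com
 wrote:
>
> I set this in flink-conf.yaml, but it had no effect.
>
>
>
> zjfpla...@hotmail.com
>
> From: JasonLee
> Sent: 2020-11-05 13:49
> To: user-zh
> Subject: Re: flink tm cpu cores setting
> hi, just set the yarn.containers.vcores parameter.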
>
>
>
> -
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


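Re: flink-1.11.2 job submitted to yarn stays in CREATED

2020-11-03 Thread Yangze Guo
Do you have more complete AM logs? We need to look at the resource requests on the RM side.

Best,
Yangze Guo

On Wed, Nov 4, 2020 at 11:45 AM 酷酷的浑蛋  wrote:
>
>
>
> Below is the error. It says there are no resources, but resources are sufficient. After I changed the version to 1.11.1, the job could run.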
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Could not allocate the required slot within slot request timeout. Please make 
> sure that the cluster has enough resources.
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
>  ~[1.jar:?]
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422)
>  ~[1.jar:?]
> at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[?:1.8.0_191]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:168)
>  ~[1.jar:?]
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[?:1.8.0_191]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:726)
>  ~[1.jar:?]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
>  ~[1.jar:?]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.lambda$new$0(SlotSharingManager.java:432)
>  ~[1.jar:?]
> at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[?:1.8.0_191]
> at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$forwardTo$21(FutureUtils.java:1132)
>  ~[1.jar:?]
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[?:1.8.0_191]
> at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1036)
>  ~[release-s
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
> at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_191]
> at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_191]
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) ~[?:1.8.0_191]
> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ~[?:1.8.0_191]
> ... 25 more
> Caused by: java.util.concurrent.TimeoutException
> ... 23 more
> On Nov 4, 2020 at 11:20, Guowei Ma wrote:
> hi,
> Have you looked at the AM logs? Do they report any exceptions?
>
> Best,
> Guowei
>
>
> On Wed, Nov 4, 2020 at 11:04 AM 酷酷的浑蛋  wrote:
>
>
> flink-1.11.2 jobs submitted to yarn stay in CREATED and never run; flink-1.11.1 has no such problem.
> Resources have already been allocated.
>
>


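Re: Kerberos issue in yarn deployment mode

2020-11-03 Thread Yangze Guo
Hello,

What specific operation does "authenticating the specified user via Kerberos
on the client" in your description refer to?

-yD security.kerberos.login.principal=xxx -yD security.kerberos.login.keytab=xxx
These two parameters enable the HadoopModule in Flink. This module uses
UserGroupInformation to handle Kerberos authentication. In a YARN deployment,
it also uploads the keytab to the containers in YARN for you.

You can take another look at the community documentation [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html

Best,
Yangze Guo

On Tue, Nov 3, 2020 at 4:17 PM amen...@163.com  wrote:
>
> hi everyone,
>
> Recently, when submitting jobs to a YARN queue in per-job mode with
> flink-1.11.1, I ran into a Kerberos authentication problem.
>
> Details: on the client, I authenticate the specified user via Kerberos and
> submit the Flink job to the YARN queue. Submission succeeds, but when YARN
> assigns the job to a node for execution, judging from the error message, it
> needs to operate on HDFS (to create the checkpoint and savepoint directories,
> since I use the FileSystem StateBackend) without having obtained permission to
> do so, and Kerberos blocks it as usual.
>
> From searching the community mailing list I learned that the -yD parameters
> avoid this problem. But in theory, after authenticating on the client and
> successfully submitting to YARN, shouldn't the permissions be shared between
> the submitting node and the executing nodes?
>
> Are -yD security.kerberos.login.principal=xxx -yD 
> security.kerberos.login.keytab=xxx used purely to solve this kind of problem?
> Please help me understand.
>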
> best,
> amenhub


Re: Increase in parallelism has very bad impact on performance

2020-11-02 Thread Yangze Guo
Hi, Sidney,

What is the data generation rate of your Kafka topic? Is it a lot
bigger than 6000?

Best,
Yangze Guo


On Tue, Nov 3, 2020 at 8:45 AM Sidney Feiner  wrote:
>
> Hey,
> I'm writing a Flink app that does some transformation on an event consumed 
> from Kafka, then creates time windows keyed by some field and applies an 
> aggregation to all those events.
> When I run it with parallelism 1, I get a throughput of around 1.6K events 
> per second (so also 1.6K events per slot). With parallelism 5, that goes down 
> to 1.2K events per slot, and when I increase the parallelism to 10, it drops 
> to 600 events per slot.
> This means that parallelism 5 and parallelism 10 give me the same total 
> throughput (1.2K x 5 = 600 x 10).
>
> I noticed that although I have 3 Task Managers, all the tasks run 
> on the same machine, causing its CPU to spike, and this is probably the 
> reason that the throughput decreases so dramatically. After increasing the 
> parallelism to 15, with tasks now running on 2/3 machines, the average 
> throughput per slot is still around 600.
>
> What could cause this dramatic decrease in performance?
>
> Extra info:
>
> Flink version 1.9.2
> Flink High Availability mode
> 3 task managers, 66 slots total
>
>
> Execution plan:
>
>
> Any help would be much appreciated
>
>
> Sidney Feiner / Data Platform Developer
> M: +972.528197720 / Skype: sidney.feiner.startapp
>
>


Re: flink1.11 elasticsearch connector

2020-10-29 Thread Yangze Guo
Setting username and password is not yet supported in 1.11. These two options were added to the new ES connector in 1.12 [1].

[1] https://issues.apache.org/jira/browse/FLINK-18361

Best,
Yangze Guo

On Thu, Oct 29, 2020 at 3:37 PM 赵帅  wrote:
>
> Elasticsearch 7.6 has account authentication enabled. How can I add account authentication with the flink1.11 elasticsearch connector SQL API?


Re: No pooled slot available and request to ResourceManager for new slot failed

2020-10-28 Thread Yangze Guo
Hi,

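What is the parallelism of your job? How many slots did it request in total?
If convenient, please send the JM log to help with troubleshooting.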

Best,
Yangze Guo

On Thu, Oct 29, 2020 at 10:07 AM marble.zh...@coinflex.com.INVALID
 wrote:
>
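> Hello everyone.
>
> There is only one job. The total memory of the JM and the TM is set to 3G each, with one taskmanager and 10 slots in total. Why is this error still reported?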
>
> Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> No pooled slot available and request to ResourceManager for new slot failed
> ... 27 more
>
> Any suggestions? Thanks.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: pyflink1.11.0: how to set a username and password in the connector when the elasticsearch host requires authentication

2020-10-22 Thread Yangze Guo
Setting username and password is not yet supported in 1.11. These two options were added to the new ES connector in 1.12 [1].

[1] https://issues.apache.org/jira/browse/FLINK-18361

Best,
Yangze Guo

On Thu, Oct 22, 2020 at 3:47 PM whh_960101  wrote:
>
> Hi everyone: if I want to sink data to an elasticsearch host that requires
> access credentials, how do I set the username and password in the
> elasticsearch connector? I followed the example format on the official
> website, but could not find any options for the username and password, and
> the program fails with "Unauthorized".
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html
>
> CREATE TABLE myUserTable (
>   user_id STRING,
>   user_name STRING,
>   uv BIGINT,
>   pv BIGINT,
>   PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'http://localhost:9200',
>   'index' = 'users'
> );
>
> Connector Options
>
> | Option | Required | Default | Type | Description |
> | connector | required | (none) | String | Specify what connector to use, valid values are: elasticsearch-6 (connect to Elasticsearch 6.x cluster), elasticsearch-7 (connect to Elasticsearch 7.x and later versions cluster). |
> | hosts | required | (none) | String | One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
> | index | required | (none) | String | Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts|-MM-dd}'). See the following Dynamic Index section for more details. |
> | document-type | required in 6.x | (none) | String | Elasticsearch document type. Not necessary anymore in elasticsearch-7. |
> | document-id.key-delimiter | optional | _ | String | Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3". |
> | failure-handler | optional | fail | String | Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are: fail (throws an exception if a request fails and thus causes a job failure), ignore (ignores failures and drops the request), retry_rejected (re-adds requests that have failed due to queue capacity saturation), custom class name (for failure handling with an ActionRequestFailureHandler subclass). |
> | sink.flush-on-checkpoint | optional | true | Boolean | Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests. |
> | sink.bulk-flush.max-actions | optional | 1000 | Integer | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it. |
> | sink.bulk-flush.max-size | optional | 2mb | MemorySize | Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it. |
> | sink.bulk-flush.interval | optional | 1s | Duration | The interval to flush buffered actions. Can be set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. |
> | sink.bulk-flush.backoff.strategy | optional | DISABLED | String | Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are: DISABLED (no retry performed, i.e. fail after the first request error), CONSTANT (wait for backoff delay between retries), EXPONENTIAL (initially wait for backoff delay and increase exponentially between retries). |
> | sink.bulk-flush.backoff.max-retries | optional | 8 | Integer | Maximum number of backoff retries. |
> | sink.bulk-flush.backoff.delay | optional | 50ms | Duration | Delay between each backoff attempt. For CONSTANT backoff, this is simply the delay between each retry. For EXPONENTIAL backoff, this is the initial base delay. |
> | connection.max-retry-timeout | optional | (none) | Duration | Maximum timeout between retries. |
> | connection.path-prefix | optional | (none) | String | Prefix string to be added to every REST communication, e.g., '/v1' |
> | format | optional | json | String | Elasticsearch connector supports to specify a format. The format must produce a valid json document. By default uses built-in 'json' format. Please refer to JSON Format page for more details. |
>


Re: flink slot resource isolation

2020-10-20 Thread Yangze Guo
Managed Memory is isolated per slot; Heap and Network memory are shared at the TM level.
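
As a rough illustration of the managed-memory part only, here is a sketch that assumes the TaskManager's managed memory is divided evenly among its slots. The sizes are made up and the helper name is hypothetical.

```python
def managed_memory_per_slot(managed_mb: int, num_slots: int) -> float:
    """Managed memory budget of a single slot, assuming an even split."""
    if num_slots <= 0:
        raise ValueError("num_slots must be positive")
    return managed_mb / num_slots


# Hypothetical TaskManager: 1024 MB of managed memory and 4 slots.
print(managed_memory_per_slot(1024, 4))  # 256.0
```

Heap and network memory have no such per-slot budget, so one slot's tasks can still affect the others there.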

Best,
Yangze Guo

On Wed, Oct 21, 2020 at 10:06 AM 赵一旦  wrote:
>
> With flink slot resource isolation, is memory really isolated? CPU certainly is not.


Re: Flink multiple task managers setup

2020-09-21 Thread Yangze Guo
Hi,

As the error message says, it could not find the flink-dist.jar in
"/cygdrive/d/Apacheflink/dist/apache-flink-1.9.3/deps/lib". Where is
your Flink distribution, and did you change its directory
structure?

Best,
Yangze Guo

On Mon, Sep 21, 2020 at 5:31 PM saksham sapra  wrote:
>
> HI,
>
>  i installed cygdrive and tried to run start-cluster.sh where zookeeper is up 
> and running and defined one job manager and one task manager,
> but getting this issue.
>
> $ start-cluster.sh start
> Starting HA cluster with 1 masters.
> -zTheFIND: Invalid switch
>  system cannot find the file specified.
> [ERROR] Flink distribution jar not found in 
> /cygdrive/d/Apacheflink/dist/apache-flink-1.9.3/deps/lib.
> File not found - 
> D:\Apacheflink\dist\apache-flink-1.9.3\deps/log/flink--standalo
> nesession-7-PLRENT-5LC73H2*
> Starting standalonesession daemon on host PLRENT-5LC73H2.
> -zThe system cannot FIND: Invalid switch
> find the file specified.
> [ERROR] Flink distribution jar not found in 
> /cygdrive/d/Apacheflink/dist/apache-flink-1.9.3/deps/lib.
> File not found - 
> D:\Apacheflink\dist\apache-flink-1.9.3\deps/log/flink--taskexec
> utor-7-PLRENT-5LC73H2*
> Starting taskexecutor daemon on host PLRENT-5LC73H2.


Re: Flink multiple task managers setup

2020-09-17 Thread Yangze Guo
Sorry, the community decided not to maintain it anymore; you could
take a look at [1].

[1] 
https://lists.apache.org/thread.html/r7693d0c06ac5ced9a34597c662bcf37b34ef8e799c32cc0edee373b2%40%3Cdev.flink.apache.org%3E

Best,
Yangze Guo

On Thu, Sep 17, 2020 at 5:21 PM saksham sapra  wrote:
>
> Thanks Yangze, So should i raise a JIRA Ticket for the same on the flink 
> community group?
>
> Thanks & Regards,
> Saksham
>
> On Thu, Sep 17, 2020 at 2:38 PM Yangze Guo  wrote:
>>
>> Hi,
>>
>> It seems you run it in Windows. In that case, only start-cluster.bat
>> could be used. However, this script could only start one TM[1] no
>> matter how you configure the slaves/workers.
>>
>> [1] 
>> https://github.com/apache/flink/blob/release-1.9/flink-dist/src/main/flink-bin/bin/start-cluster.bat
>>
>> Best,
>> Yangze Guo
>>
>> On Thu, Sep 17, 2020 at 4:53 PM saksham sapra  
>> wrote:
>> >
>> > HI Yangze,
>> >
>> > I tried to run start-cluster.sh and i can see in host , when flink tries 
>> > to run second task manager or executor, pop up or host gets closed.
>> > Please find attached logs for two command : start-cluster.sh and 
>> > start-cluster.bat.
>> >
>> > Thanks & Regards,
>> > Saksham
>> >
>> > On Thu, Sep 17, 2020 at 2:00 PM Yangze Guo  wrote:
>> >>
>> >> Hi,
>> >>
>> >> > I wasnt having "workers" file in conf/workers so i created one, but i 
>> >> > have "slaves" file in  conf/workers, so i edited both two localhost 
>> >> > like screenshot given below :
>> >> Yes, for 1.9.3, you need to edit the 'slaves' file.
>> >>
>> >> I think we need more information to figure out what happened.
>> >> - What is the output when you execute ./bin/start-cluster.sh, could
>> >> you see two "Starting taskexecutor daemon on host" lines?
>> >> - Could you see two flink-xxx-taskexecutor-xxx.log in $FLINK_DIST/log?
>> >> If so, could you share these two log files?
>> >>
>> >> Best,
>> >> Yangze Guo
>> >>
>> >>
>> >> On Thu, Sep 17, 2020 at 4:06 PM saksham sapra  
>> >> wrote:
>> >> >
>> >> > Hi Yangze,
>> >> >
>> >> > Thanks for replying, but i still have some questions.
>> >> > I wasnt having "workers" file in conf/workers so i created one, but i 
>> >> > have "slaves" file in  conf/workers, so i edited both two localhost 
>> >> > like screenshot given below :
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > and then again started flink , but i can see only one task manager
>> >> >
>> >> >
>> >> > Please find my config.yaml file attached.
>> >> >
>> >> >
>> >> > Thanks for helping.
>> >> >
>> >> > Thanks & Regards,
>> >> > Saksham Sapra
>> >> >
>> >> > On Thu, Sep 17, 2020 at 12:57 PM Yangze Guo  wrote:
>> >> >>
>> >> >> Hi,
>> >> >>
>> >> >> From my understanding, you want to set up a standalone cluster in your
>> >> >> local machine. If that is the case, you could simply edit the
>> >> >> $FLINK_DIST/conf/workers, in which each line represents a TM host. By
>> >> >> default, there is only one TM in localhost. In your case, you could
>> >> >> add a line 'localhost' to it. Then, execute the
>> >> >> $FLINK_DIST/bin/start-cluster.sh, you could see a standalone cluster
>> >> >> with two TM in your local machine.
>> >> >>
>> >> >> [1] 
>> >> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/cluster_setup.html#configuring-flink
>> >> >>
>> >> >> Best,
>> >> >> Yangze Guo
>> >> >>
>> >> >> On Thu, Sep 17, 2020 at 3:16 PM saksham sapra 
>> >> >>  wrote:
>> >> >> >
>> >> >> > Hi ,
>> >> >> >
>> >> >> > I am unable to set two task managers in my local machine and neither 
>> >> >> > any documentation provided for the same.
>> >> >> >
>> >> >> > I want to run a parallel job in two task managers using flink.
>> >> >> > kindly help me with the same, how can i set up in my local without 
>> >> >> > using any zookeeper or something.
>> >> >> >
>> >> >> >
>> >> >> > Thanks & Regards,
>> >> >> > Saksham Sapra
>> >> >> >
>> >> >> >


Re: Flink multiple task managers setup

2020-09-17 Thread Yangze Guo
Hi,

It seems you are running it on Windows. In that case, only start-cluster.bat
can be used. However, this script can only start one TM [1], no
matter how you configure the slaves/workers file.

[1] 
https://github.com/apache/flink/blob/release-1.9/flink-dist/src/main/flink-bin/bin/start-cluster.bat

Best,
Yangze Guo

On Thu, Sep 17, 2020 at 4:53 PM saksham sapra  wrote:
>
> HI Yangze,
>
> I tried to run start-cluster.sh and i can see in host , when flink tries to 
> run second task manager or executor, pop up or host gets closed.
> Please find attached logs for two command : start-cluster.sh and 
> start-cluster.bat.
>
> Thanks & Regards,
> Saksham
>
> On Thu, Sep 17, 2020 at 2:00 PM Yangze Guo  wrote:
>>
>> Hi,
>>
>> > I wasnt having "workers" file in conf/workers so i created one, but i have 
>> > "slaves" file in  conf/workers, so i edited both two localhost like 
>> > screenshot given below :
>> Yes, for 1.9.3, you need to edit the 'slaves' file.
>>
>> I think we need more information to figure out what happened.
>> - What is the output when you execute ./bin/start-cluster.sh, could
>> you see two "Starting taskexecutor daemon on host" lines?
>> - Could you see two flink-xxx-taskexecutor-xxx.log in $FLINK_DIST/log?
>> If so, could you share these two log files?
>>
>> Best,
>> Yangze Guo
>>
>>
>> On Thu, Sep 17, 2020 at 4:06 PM saksham sapra  
>> wrote:
>> >
>> > Hi Yangze,
>> >
>> > Thanks for replying, but i still have some questions.
>> > I wasnt having "workers" file in conf/workers so i created one, but i have 
>> > "slaves" file in  conf/workers, so i edited both two localhost like 
>> > screenshot given below :
>> >
>> >
>> >
>> >
>> >
>> >
>> > and then again started flink , but i can see only one task manager
>> >
>> >
>> > Please find my config.yaml file attached.
>> >
>> >
>> > Thanks for helping.
>> >
>> > Thanks & Regards,
>> > Saksham Sapra
>> >
>> > On Thu, Sep 17, 2020 at 12:57 PM Yangze Guo  wrote:
>> >>
>> >> Hi,
>> >>
>> >> From my understanding, you want to set up a standalone cluster in your
>> >> local machine. If that is the case, you could simply edit the
>> >> $FLINK_DIST/conf/workers, in which each line represents a TM host. By
>> >> default, there is only one TM in localhost. In your case, you could
>> >> add a line 'localhost' to it. Then, execute the
>> >> $FLINK_DIST/bin/start-cluster.sh, you could see a standalone cluster
>> >> with two TM in your local machine.
>> >>
>> >> [1] 
>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/cluster_setup.html#configuring-flink
>> >>
>> >> Best,
>> >> Yangze Guo
>> >>
>> >> On Thu, Sep 17, 2020 at 3:16 PM saksham sapra  
>> >> wrote:
>> >> >
>> >> > Hi ,
>> >> >
>> >> > I am unable to set two task managers in my local machine and neither 
>> >> > any documentation provided for the same.
>> >> >
>> >> > I want to run a parallel job in two task managers using flink.
>> >> > kindly help me with the same, how can i set up in my local without 
>> >> > using any zookeeper or something.
>> >> >
>> >> >
>> >> > Thanks & Regards,
>> >> > Saksham Sapra
>> >> >
>> >> >


Re: Flink multiple task managers setup

2020-09-17 Thread Yangze Guo
Hi,

> I wasnt having "workers" file in conf/workers so i created one, but i have 
> "slaves" file in  conf/workers, so i edited both two localhost like 
> screenshot given below :
Yes, for 1.9.3, you need to edit the 'slaves' file.

I think we need more information to figure out what happened.
- What is the output when you execute ./bin/start-cluster.sh? Can
you see two "Starting taskexecutor daemon on host" lines?
- Can you see two flink-xxx-taskexecutor-xxx.log files in $FLINK_DIST/log?
If so, could you share these two log files?

Best,
Yangze Guo


On Thu, Sep 17, 2020 at 4:06 PM saksham sapra  wrote:
>
> Hi Yangze,
>
> Thanks for replying, but i still have some questions.
> I wasnt having "workers" file in conf/workers so i created one, but i have 
> "slaves" file in  conf/workers, so i edited both two localhost like 
> screenshot given below :
>
>
>
>
>
>
> and then again started flink , but i can see only one task manager
>
>
> Please find my config.yaml file attached.
>
>
> Thanks for helping.
>
> Thanks & Regards,
> Saksham Sapra
>
> On Thu, Sep 17, 2020 at 12:57 PM Yangze Guo  wrote:
>>
>> Hi,
>>
>> From my understanding, you want to set up a standalone cluster in your
>> local machine. If that is the case, you could simply edit the
>> $FLINK_DIST/conf/workers, in which each line represents a TM host. By
>> default, there is only one TM in localhost. In your case, you could
>> add a line 'localhost' to it. Then, execute the
>> $FLINK_DIST/bin/start-cluster.sh, you could see a standalone cluster
>> with two TM in your local machine.
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/cluster_setup.html#configuring-flink
>>
>> Best,
>> Yangze Guo
>>
>> On Thu, Sep 17, 2020 at 3:16 PM saksham sapra  
>> wrote:
>> >
>> > Hi ,
>> >
>> > I am unable to set two task managers in my local machine and neither any 
>> > documentation provided for the same.
>> >
>> > I want to run a parallel job in two task managers using flink.
>> > kindly help me with the same, how can i set up in my local without using 
>> > any zookeeper or something.
>> >
>> >
>> > Thanks & Regards,
>> > Saksham Sapra
>> >
>> >


Re: Flink multiple task managers setup

2020-09-17 Thread Yangze Guo
Hi,

From my understanding, you want to set up a standalone cluster on your
local machine. If that is the case, you could simply edit
$FLINK_DIST/conf/workers, in which each line represents a TM host. By
default, there is only one TM on localhost. In your case, you could
add a line 'localhost' to it. Then execute
$FLINK_DIST/bin/start-cluster.sh, and you should see a standalone cluster
with two TMs on your local machine.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/cluster_setup.html#configuring-flink

Best,
Yangze Guo

On Thu, Sep 17, 2020 at 3:16 PM saksham sapra  wrote:
>
> Hi ,
>
> I am unable to set two task managers in my local machine and neither any 
> documentation provided for the same.
>
> I want to run a parallel job in two task managers using flink.
> kindly help me with the same, how can i set up in my local without using any 
> zookeeper or something.
>
>
> Thanks & Regards,
> Saksham Sapra
>
>


Re: Use of slot sharing groups causing workflow to hang

2020-09-09 Thread Yangze Guo
Hi, Ken

From the RM perspective, could you share the following logs:
- "Request slot with profile {} for job {} with allocation id {}.".
- "Requesting new slot [{}] and profile {} with allocation id {} from
resource manager."
This will help to figure out how many slots your job actually requests,
and probably also what the ExecutionGraph finally looks
like.


Best,
Yangze Guo

On Thu, Sep 10, 2020 at 10:47 AM Ken Krugler
 wrote:
>
> Hi Til,
>
> On Sep 3, 2020, at 12:31 AM, Till Rohrmann  wrote:
>
> Hi Ken,
>
> I believe that we don't have a lot if not any explicit logging about the slot 
> sharing group in the code. You can, however, learn indirectly about it by 
> looking at the required number of AllocatedSlots in the SlotPool. Also the 
> number of "multi task slot" which are created should vary because every group 
> of slot sharing tasks will create one of them. For learning about the 
> SlotPoolImpl's status, you can also take a look at SlotPoolImpl.printStatus.
>
> For the underlying problem, I believe that Yangze could be right. How many 
> resources do you have in your cluster?
>
>
> I've got a Flink MiniCluster with 12 slots. Even with only 6 pipelined
> operators, each with a parallelism of 1, it still hangs while starting. So
> I don't think that it's a resource issue.
>
> One odd thing I've noticed. I've got three streams that I union together.
> Two of the streams are in separate slot sharing groups, the third is not
> assigned to a group. But when I check the logs, I see three "Create multi
> task slot" entries. I'm wondering if unioning streams that are in different
> slot sharing groups creates a problem.
>
> Thanks,
>
> -- Ken
>
> On Thu, Sep 3, 2020 at 4:25 AM Yangze Guo  wrote:
>>
>> Hi,
>>
>> The failure of requesting slots usually because of the lack of
>> resources. If you put part of the workflow to a specific slot sharing
>> group, it may require more slots to run the workflow than before.
>> Could you share logs of the ResourceManager and SlotManager, I think
>> there are more clues in it.
>>
>> Best,
>> Yangze Guo
>>
>> On Thu, Sep 3, 2020 at 4:39 AM Ken Krugler  
>> wrote:
>> >
>> > Hi all,
>> >
>> > I’ve got a streaming workflow (using Flink 1.11.1) that runs fine locally 
>> > (via Eclipse), with a parallelism of either 3 or 6.
>> >
>> > If I set up part of the workflow to use a specific (not “default”) slot 
>> > sharing group with a parallelism of 3, and the remaining portions of the 
>> > workflow have a parallelism of either 1 or 2, then the workflow never 
>> > starts running, and eventually fails due to a slot request not being 
>> > fulfilled in time.
>> >
>> > So I’m wondering how best to debug this.
>> >
>> > I don’t see any information (even at DEBUG level) being logged about which 
>> > operators are in what slot sharing group, or which slots are assigned to 
>> > what groups.
>> >
>> > Thanks,
>> >
>> > — Ken
>> >
>> > PS - I’ve looked at https://issues.apache.org/jira/browse/FLINK-8712, and 
>> > tried the approach of setting # of slots in the config, but that didn’t 
>> > change anything. I see that issue is still open, so wondering what Til and 
>> > Konstantin have to say about it.
>> >
>> > --
>> > Ken Krugler
>> > http://www.scaleunlimited.com
>> > custom big data solutions & training
>> > Hadoop, Cascading, Cassandra & Solr
>> >
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>


Re: Difficulties with Minio state storage

2020-09-08 Thread Yangze Guo
Hi, Rex,

I've tried to use MinIO as state backend and everything seems to work well.
Just sharing my configuration:
```
s3.access-key:
s3.secret-key:
s3.endpoint: http://localhost:9000
s3.path.style.access: true
state.checkpoints.dir: s3://flink/checkpoints
```

I think the problem might have one of the following causes:
- MinIO is not configured correctly.
- Maybe you need to create a bucket for it first. In my case, I created
a bucket called "flink" first.
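
To show how the configuration above fits together, here is a minimal sketch of how an s3:// URI maps to a request URL under path-style access, where the first component after s3:// is the bucket and the endpoint is supplied separately. The helper name is hypothetical; the endpoint and bucket are the ones from my configuration.

```python
from urllib.parse import urlparse


def path_style_url(endpoint: str, s3_uri: str) -> str:
    """Build the path-style request URL: <endpoint>/<bucket>/<key>."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3":
        raise ValueError(f"not an s3:// URI: {s3_uri}")
    bucket = parsed.netloc          # first component after s3:// is the bucket
    key = parsed.path.lstrip("/")   # the rest is the object key
    return f"{endpoint.rstrip('/')}/{bucket}/{key}"


print(path_style_url("http://localhost:9000", "s3://flink/checkpoints"))
# http://localhost:9000/flink/checkpoints
```

So the host and port of the MinIO server belong in s3.endpoint, while the s3:// path starts with the bucket name.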

Best,
Yangze Guo

On Wed, Sep 9, 2020 at 9:33 AM Rex Fenley  wrote:
>
> Hello!
>
> I'm trying to test out Minio as state storage backend using docker-compose on 
> my local machine but keep running into errors that seem strange to me. Any 
> help would be much appreciated :)
>
> The problem:
> With the following environment:
>
> environment:
>   - |
>     FLINK_PROPERTIES=
>     jobmanager.rpc.address: flink-jobmanager
>     parallelism.default: 2
>     s3.access-key: 
>     s3.secret-key: 
>     s3.path.style.access: true
>
> And the following State Backend (with flink-jdbc-test_graph-minio_1 being the 
> container serving minio):
>
> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> bsEnv.setStateBackend(
>   new RocksDBStateBackend(
>     "s3://flink-jdbc-test_graph-minio_1/data/checkpoints:9000",
>     true
>   )
> )
>
> And submitting the flink job and saving from another docker container like so:
>
> flink run -m flink-jdbc-test_flink-jobmanager_1:8081 -c  
> .jar
>
> flink savepoint -m flink-jdbc-test_flink-jobmanager_1:8081  
> s3://flink-jdbc-test_graph-minio_1:9000/data/savepoints
>
> I end up with the following error:
>
> Caused by: 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
>  com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: 
> Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 
> A7E3BB7EEFB524FD; S3 Extended Request ID: 
> cJOtc6E3Kb+U5hgbkA+09Dd/ouDHBGL2ftb1pGHpIwFgd6tE461nkaDtjOj40zbWEpFAcMOEmbY=),
>  S3 Extended Request ID: 
> cJOtc6E3Kb+U5hgbkA+09Dd/ouDHBGL2ftb1pGHpIwFgd6tE461nkaDtjOj40zbWEpFAcMOEmbY= 
> (Path: 
> s3://flink-jdbc-test_graph-minio_1:9000/data/savepoints/savepoint-5c4090-5f90e0cdc603/_metadata)
> at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573)
> at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
> at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
> at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398)
> at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:356)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
> at 
> org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:141)
> at 
> org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
> at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:169)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109)
> at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:306)
> ... 10 more
> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request 
> (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request 
> ID: A7E3BB7EEFB524FD; S3 Extended Request ID: 
> cJOtc6E3Kb+U5hgbkA+09Dd/ouDHBGL2ftb1pGHpIwFgd6tE461nkaDtjOj40zbWEpFAcMOEmbY=)
> at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1799)
>
> If I add to the environment to include:
> ...
> s3.endpoint: s3://flink-jdbc-test_graph-minio_1:9000
> ...
>
> Then I end up with the following error just trying to submit the job:
> Caused by: java.lang.IllegalArgumentException: Endpoint does not contain a 
> valid host name: s3://flink-jdbc-test_graph-minio_1:9000
> at 
> com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:426)
> at 
> com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:318)
>
> Changing s3: to http: like so:
> s3.endpoint: http://flink-jdbc-test_graph-minio_1:9000
>
> Then I receive the same error as before whe

Re: Use of slot sharing groups causing workflow to hang

2020-09-02 Thread Yangze Guo
Hi,

Slot requests usually fail because of a lack of
resources. If you put part of the workflow into a specific slot sharing
group, the workflow may need more slots to run than before.
Could you share the logs of the ResourceManager and SlotManager? I think
there are more clues in them.
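
A back-of-the-envelope sketch of why that can happen, assuming the usual rule that each slot sharing group needs as many slots as its highest operator parallelism. The operator names and group assignments are hypothetical, only loosely modeled on the description below.

```python
from collections import defaultdict


def required_slots(operators):
    """operators: iterable of (name, slot_sharing_group, parallelism)."""
    max_parallelism = defaultdict(int)
    for _name, group, parallelism in operators:
        max_parallelism[group] = max(max_parallelism[group], parallelism)
    # Each group needs as many slots as its most parallel operator.
    return sum(max_parallelism.values())


# Everything in the "default" group: slots needed = highest parallelism.
all_default = [("source", "default", 1), ("enrich", "default", 3), ("sink", "default", 2)]
# Same job with one operator moved into its own group.
split = [("source", "default", 1), ("enrich", "custom", 3), ("sink", "default", 2)]

print(required_slots(all_default))  # 3
print(required_slots(split))        # 5
```

Under that assumption, moving one operator into its own group raises the requirement from 3 to 5 slots, which may exceed what the cluster offers.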

Best,
Yangze Guo

On Thu, Sep 3, 2020 at 4:39 AM Ken Krugler  wrote:
>
> Hi all,
>
> I’ve got a streaming workflow (using Flink 1.11.1) that runs fine locally 
> (via Eclipse), with a parallelism of either 3 or 6.
>
> If I set up part of the workflow to use a specific (not “default”) slot 
> sharing group with a parallelism of 3, and the remaining portions of the 
> workflow have a parallelism of either 1 or 2, then the workflow never starts 
> running, and eventually fails due to a slot request not being fulfilled in 
> time.
>
> So I’m wondering how best to debug this.
>
> I don’t see any information (even at DEBUG level) being logged about which 
> operators are in what slot sharing group, or which slots are assigned to what 
> groups.
>
> Thanks,
>
> — Ken
>
> PS - I’ve looked at https://issues.apache.org/jira/browse/FLINK-8712, and 
> tried the approach of setting # of slots in the config, but that didn’t 
> change anything. I see that issue is still open, so wondering what Til and 
> Konstantin have to say about it.
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>

