Re: flink-checkpoint issue

2024-01-10 Thread ouywl
I recall that older Flink versions had a bug that could mistakenly delete a checkpoint. The version you are running is quite old; upgrading to a newer release should help.


The following is the content of the forwarded email
From: "Mr. Wu" <15951914...@163.com>
To: user-zh
Date: 2024-01-10 17:54:42
Subject: flink-checkpoint issue

Flink version: 1.12
Checkpoint storage: HDFS
Symptom: checkpoint N failed for some reason, which caused the job to restart, and the restart also failed. The chk-N path does not exist in HDFS, yet an empty chk-(N+1) path shows up. Why does that happen?
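Not part of the original exchange, but related to the symptom above: since Flink 1.9 a job can be configured to tolerate a number of failed checkpoints instead of restarting as soon as one fails. A minimal sketch for Flink 1.12 (the pipeline itself is just a placeholder):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TolerantCheckpointSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000);                      // checkpoint every 60 s
            CheckpointConfig cc = env.getCheckpointConfig();
            cc.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            // Allow a few checkpoint failures before the job itself is failed and restarted.
            cc.setTolerableCheckpointFailureNumber(3);

            env.fromElements(1, 2, 3).print();                    // placeholder pipeline
            env.execute("tolerant-checkpoint-sketch");
        }
    }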




Re: CUMULATE window state too large causes checkpoint timeouts

2024-01-04 Thread ouywl
Hi Jiaotong,
My suggestions:
1. Use high-throughput SSDs for local storage.
2. Increase taskmanager.memory.managed.size and make sure the RocksDB memtable memory grows with it, to reduce how much RocksDB flushes to disk.
3. If there is a materialized sink operator, turn it off to reduce the state size.
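A minimal sketch of how suggestions 1 and 2 can be applied on the job side with the RocksDB state backend in Flink 1.14 (the SSD path is a placeholder; taskmanager.memory.managed.size itself is a TaskManager-level option set in flink-conf.yaml, not in code):

    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.contrib.streaming.state.PredefinedOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDbTuningSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000);

            // Incremental RocksDB checkpoints, as the poster already uses.
            EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
            // Suggestion 1: keep RocksDB working files on a fast SSD and use SSD-tuned options.
            backend.setDbStoragePath("/ssd1/flink-rocksdb");      // placeholder path
            backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
            env.setStateBackend(backend);

            // Suggestion 2 (more managed memory so memtables flush less often) is configured
            // at the TaskManager level, e.g. taskmanager.memory.managed.size: 4g in flink-conf.yaml.

            env.fromElements(1, 2, 3).print();                    // placeholder for the real job
            env.execute("rocksdb-tuning-sketch");
        }
    }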


The following is the content of the forwarded email
From: "jiaot...@mail.jj.cn"
To: user-zh
Date: 2024-01-05 09:41:01
Subject: CUMULATE window state too large causes checkpoint timeouts

Hi All,
I use a CUMULATE(STEP => INTERVAL '1' MINUTES, SIZE => INTERVAL '1' DAYS) cumulative window, which keeps a lot of data in state. Even with incremental RocksDB checkpoints enabled, checkpoints still time out after the job has been running for a while, and the job fails. How should such a large-window, large-state job be optimized? Many thanks.
Note: Flink version 1.14.0
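For context, the cumulating-window query described in the question has roughly this shape in Flink 1.14 SQL (table and column names are invented; only the 1-minute step and 1-day size come from the question):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class CumulateWindowSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Assumes a table "clicks" with a watermarked time attribute "ts" has been registered.
            Table result = tEnv.sqlQuery(
                "SELECT window_start, window_end, COUNT(*) AS cnt "
                    + "FROM TABLE(CUMULATE(TABLE clicks, DESCRIPTOR(ts), "
                    + "INTERVAL '1' MINUTE, INTERVAL '1' DAY)) "
                    + "GROUP BY window_start, window_end");
            result.execute().print();
        }
    }

Results are emitted every minute, but the underlying day-long window state is only cleaned up once the full 1-day size has elapsed, which is one reason the state keeps growing.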



(DISCUSS) flink cli needs to load '--classpath' files

2020-03-06 Thread ouywl






Hi all,
    When I start a Flink cluster in session mode (with JM/TM running) and then submit a job like 'bin/flink run --jobmanager "ip:8081" --classpath file://... a.jar', the job fails even though the jar exists on every JM/TM machine and on the machine running 'bin/flink':

/opt/flink/bin/flink run --jobmanager ip:8081 --class com.netease.java.TopSpeedWindowing --parallelism 1 --detached /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar --classpath file:///opt/flink/job/fastjson-1.2.66.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON
	at com.netease.java.TopSpeedWindowing.main(TopSpeedWindowing.java:98)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)

As I read the code, the Flink CLI does not load the --classpath jars, so this looks like a bug in the CLI. Do you agree?


  



Best,
Ouywl



 






Re: Flink Web UI displays nothing in k8s when using ingress

2020-03-03 Thread ouywl






Hi Lake,
    OK. Could you show the jobmanager pod logs? Can you confirm the JM pods are running fine? Also try kube-proxy or a NodePort service; can you see the Web UI that way?






  



Best,
Ouywl



 


On 03/4/2020 14:08,LakeShen wrote: 


Hi community,
    We plan to move all our Flink jobs to a Kubernetes cluster, and for each job we want to be able to open its Web UI. We first created a Kubernetes Service exposing port 8081 of the jobmanager, then put an ingress controller in front of it so the UI is reachable from outside. However, the Flink Web UI loads without its images and other information. What can I do to get the full Web UI to display? Thanks for your reply.






When I use flink 1.9.1 and produce data to Kafka 1.1.1, the streamTask checkpoint fails.

2020-01-09 Thread ouywl
...ointDueToTaskFailure(CheckpointCoordinator.java:1443)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1353)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:722)
	at org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:573)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)









  

    

Best,
Ouywl



 






Re: [Question] How to use different filesystem between checkpoint data and user data sink

2019-12-23 Thread ouywl
:438)
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:359)
	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:116)
	... 13 more
Caused by: java.net.UnknownHostException: slothTest
	... 30 more






  










ouywl
ou...@139.com

 


On 12/19/2019 11:06,ouywl wrote: 







Hi Piotr Nowojski,
    I have moved my filesystem plugin to FLINK_HOME/plugins in Flink 1.9.1. The jobmanager does not start up, and it loads the filesystem factory from my own plugin jar. The log is:

2019-12-19 10:58:32,394 WARN  org.apache.flink.configuration.Configuration  - Config uses deprecated configuration key 'high-availability.zookeeper.storageDir' instead of proper key 'high-availability.storageDir'
2019-12-19 10:58:32,398 INFO  com.filesystem.plugin.FileSystemFactoryEnhance  - trying to get hadoopEnv, hadoopPath = /conf/hadoop_conf
2019-12-19 10:58:32,434 WARN  org.apache.hadoop.conf.Configuration  - /tmp/mammut-core-site.xml:an attempt to override final parameter: fs.defaultFS;  Ignoring.
2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration  - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.data.dir;  Ignoring.
2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration  - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.failed.volumes.tolerated;  Ignoring.
2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration  - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.namenode.name.dir;  Ignoring.
2019-12-19 10:58:32,878 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shutting YarnJobClusterEntrypoint down with application status FAILED. Diagnostics java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:119)
	at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:120)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:292)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:257)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:202)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:164)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:163)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501)
	at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)






  










    ouywl




ou...@139.com

On 12/19/2019 00:01,Piotr Nowojski wrote: 


Hi,
As Yang Wang pointed out, you should use the new plugins mechanism. If it doesn’t work, first make sure that you are shipping/distributing the plugin jars correctly, with the correct plugins directory structure on the client machine. Next make sure that the cluster has the same correct setup. This is especially true for the standalone/cluster execution modes. For yarn, mesos and docker the plugins dir should be shipped to the cluster by Flink itself; however, plugins support in yarn is currently semi broken …

Re: [Question] How to use different filesystem between checkpoint data and user data sink

2019-12-18 Thread ouywl







Hi Piotr Nowojski,
    I have moved my filesystem plugin to FLINK_HOME/plugins in Flink 1.9.1. The jobmanager does not start up, and it loads the filesystem factory from my own plugin jar. The log is:

2019-12-19 10:58:32,394 WARN  org.apache.flink.configuration.Configuration  - Config uses deprecated configuration key 'high-availability.zookeeper.storageDir' instead of proper key 'high-availability.storageDir'
2019-12-19 10:58:32,398 INFO  com.filesystem.plugin.FileSystemFactoryEnhance  - trying to get hadoopEnv, hadoopPath = /conf/hadoop_conf
2019-12-19 10:58:32,434 WARN  org.apache.hadoop.conf.Configuration  - /tmp/mammut-core-site.xml:an attempt to override final parameter: fs.defaultFS;  Ignoring.
2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration  - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.data.dir;  Ignoring.
2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration  - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.failed.volumes.tolerated;  Ignoring.
2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration  - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.namenode.name.dir;  Ignoring.
2019-12-19 10:58:32,878 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shutting YarnJobClusterEntrypoint down with application status FAILED. Diagnostics java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:119)
	at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:120)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:292)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:257)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:202)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:164)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:163)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501)
	at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)






  










    ouywl




ou...@139.com

On 12/19/2019 00:01,Piotr Nowojski wrote: 


Hi,
As Yang Wang pointed out, you should use the new plugins mechanism. If it doesn’t work, first make sure that you are shipping/distributing the plugin jars correctly, with the correct plugins directory structure on the client machine. Next make sure that the cluster has the same correct setup. This is especially true for the standalone/cluster execution modes. For yarn, mesos and docker the plugins dir should be shipped to the cluster by Flink itself; however, plugins support in yarn is currently semi broken [1]. This is already fixed, but waiting to be released in 1.9.2 and 1.10. If it still doesn’t work, check in the TaskManager logs which plugins/file systems are being loaded during startup. If none are, that's the problem.
Piotrek
[1] https://issues.apache.org/jira/browse/FLINK-14382

On 18 Dec 2019, at 12:40, Yang Wang <danrtsey...@gmail.com> wrote:
You could give the new plugin mechanism a try. Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then put your filesystem-related jars in it. Different plugins are loaded by separate classloaders to avoid conflicts.
Best,
Yang

vino yang <yanghua1...@gmail.com> wrote on Wed, Dec 18, 2019 at 18:46:
Hi ouywl,
>> Thread.currentThread().getContextClassLoader();
What does this statement mean in your program? In addition, can you share your implementation of the customized file system plugin and the related exception?
Best,
Vino

ouywl <ou...@139.com> wrote on Wed, Dec 18, 2019 at 16:5

[Question] How to use different filesystem between checkpoint data and user data sink

2019-12-18 Thread ouywl







Hi all,
    We have implemented a filesystem plugin to sink data to hdfs1, while the YARN cluster that runs Flink uses hdfs2. When the job runs, the jobmanager uses the conf of hdfs1 to create its filesystem, and the filesystem plugin conflicts with the Flink components.
    Our implementation steps:
      1. 'FileSystemEnhance' implements "FileSystem".
      2. 'FileSystemFactoryEnhance' implements "FileSystemFactory" and adds Kerberos auth in "FileSystemFactoryEnhance".
      3. Add a service entry: create a file META-INF/services/org.apache.flink.core.fs.FileSystemFactory which contains the class name of "FileSystemFactoryEnhance.class".

The job main class is:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60 * 1000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getConfig().enableSysoutLogging();

    Properties props = new Properties();
    props.put("bootstrap.servers", SERVERS);
    props.put("group.id", GROUPID);
    props.put("enable.auto.commit", "true");
    // props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "3");
    props.put("auto.offset.reset", "latest");
    props.put("key.deserializer", org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    FlinkKafkaConsumer010 consumer011 = new FlinkKafkaConsumer010("zyf_test_2", new SimpleStringSchema(), props);
    DataStream source = env.addSource(consumer011).setParallelism(1);
    source.print();
    Thread.currentThread().getContextClassLoader();

    StreamingFileSink sink = StreamingFileSink
        .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"), new SimpleStringEncoder<>("UTF-8"))
        .build();
    source.addSink(sink);
    env.execute();
}

When we start the job, creating the jobmanager filesystem fails; the log shows that the jobmanager uses the "FileSystemFactoryEnhance" filesystem and conflicts. Following https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems, how can we avoid using "Thread.currentThread().getContextClassLoader()"?
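A skeleton of the factory described in steps 1-3 might look like the following (a sketch only: apart from the FileSystemFactoryEnhance name taken from the message, the scheme and all wiring are assumptions, and the Kerberos/hdfs1 logic from step 2 is omitted):

    import java.io.IOException;
    import java.net.URI;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.FileSystemFactory;

    public class FileSystemFactoryEnhance implements FileSystemFactory {

        @Override
        public void configure(Configuration config) {
            // Read the hdfs1 / Kerberos settings mentioned in step 2 from the Flink configuration.
        }

        @Override
        public String getScheme() {
            // Hypothetical scheme; using a name distinct from the built-in "hdfs" avoids this
            // factory being picked up for the cluster's own hdfs2 paths.
            return "hdfs-enhance";
        }

        @Override
        public FileSystem create(URI fsUri) throws IOException {
            // Return the custom FileSystem implementation (FileSystemEnhance) bound to hdfs1.
            throw new UnsupportedOperationException("sketch only");
        }
    }

The factory is registered through META-INF/services/org.apache.flink.core.fs.FileSystemFactory as in step 3 and, per the replies in this thread, the jar should live under FLINK_HOME/plugins/<some-dir>/ rather than on the main classpath.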






  





    




ouywl




ou...@139.com
 






Re:flink 1.9 conflict jackson version

2019-12-16 Thread ouywl






Hi Bu,
    I think you can use the maven-shade-plugin to resolve your problem; it seems flink-client conflicts with your own jar?






  










ouywl
ou...@139.com

 


On 12/17/2019 08:10,Fanbin Bu wrote: 


Hi,
After I upgraded to Flink 1.9, I got the following error message on EMR; it works locally in IntelliJ. I'm explicitly declaring the dependency as

implementation 'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.1'

and I have

implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.11.595'

java.lang.NoSuchMethodError: com.fasterxml.jackson.databind.ObjectMapper.enable([Lcom/fasterxml/jackson/core/JsonParser$Feature;)Lcom/fasterxml/jackson/databind/ObjectMapper;
	at com.amazonaws.partitions.PartitionsLoader.<init>(PartitionsLoader.java:54)
	at com.amazonaws.regions.RegionMetadataFactory.create(RegionMetadataFactory.java:30)
	at com.amazonaws.regions.RegionUtils.initialize(RegionUtils.java:65)
	at com.amazonaws.regions.RegionUtils.getRegionMetadata(RegionUtils.java:53)
	at com.amazonaws.regions.RegionUtils.getRegion(RegionUtils.java:107)
	at com.amazonaws.client.builder.AwsClientBuilder.getRegionObject(AwsClientBuilder.java:256)
	at com.amazonaws.client.builder.AwsClientBuilder.setRegion(AwsClientBuilder.java:460)
	at com.amazonaws.client.builder.AwsClientBuilder.configureMutableProperties(AwsClientBuilder.java:424)
	at com.amazonaws.client.builder.AwsAsyncClientBuilder.build(AwsAsyncClientBuilder.java:80)
	at com.coinbase.util.KmsClient$.getSnowflakeUsernamePassword(KmsClient.scala:21)
	at com.coinbase.ml.RunFlinkJob$.runBatch(RunFlinkJob.scala:94)
	at com.coinbase.ml.RunFlinkJob$.runFlinkJob(RunFlinkJob.scala:38)
	at com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint$.main(CmdLineParser.scala:76)
	at com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint.main(CmdLineParser.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)






Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-12 Thread ouywl






@Li Peng
    I found your problem. Your start command uses the "start-foreground" argument, which runs

exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}"

and in flink-console.sh the logging is set up as

log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")

so log4j.properties is not used; you need log4j-console.properties and logback-console.xml instead.






  









    
ouywl
ou...@139.com
 


On 12/12/2019 15:35,ouywl wrote: 






Hi Yang,
    Could you give more details: the log4j.properties content and the k8s YAML? Are you using the Dockerfile from flink-container? When I tested with the default per-job YAML in flink-container, logs only showed up in the docker output and not in /opt/flink/log.






  









    
ouywl
ou...@139.com

 


On 12/12/2019 13:47,Yang Wang wrote: 


Hi Peng,
What I mean is to use `docker exec` into the running pod and `ps` to get the real command that is running for the jobmanager. Have you checked that the /opt/flink/conf/log4j.properties is right? I have tested standalone per-job on my kubernetes cluster, and the logs show up as expected.
Best,
Yang

Li Peng <li.p...@doordash.com> wrote on Thu, Dec 12, 2019 at 02:59:
Hey Yang, here are the commands:
"/opt/flink/bin/taskmanager.sh","start-foreground","-Djobmanager.rpc.address={{ .Chart.Name }}-job-manager","-Dtaskmanager.numberOfTaskSlots=1"
"/opt/flink/bin/standalone-job.sh","start-foreground","-Djobmanager.rpc.address={{ .Chart.Name }}-job-manager","-Dparallelism.default={{ .Values.task.replicaCount }}"
Yes, it's very curious that I don't see any logs actually written to /opt/flink/log.

On Tue, Dec 10, 2019 at 11:17 PM Yang Wang <danrtsey...@gmail.com> wrote:
Could you find the logs under /opt/flink/log/jobmanager.log? If not, please share the commands the JobManager and TaskManager are using. If the command is correct and the log4j under /opt/flink/conf is as expected, it is curious why we could not get the logs.
Best,
Yang

Li Peng <li.p...@doordash.com> wrote on Wed, Dec 11, 2019 at 13:24:
Ah I see. I think the Flink app is reading files from /opt/flink/conf correctly as it is, since changes I make to flink-conf are picked up as expected; it's just the log4j properties that are either not being used, or don't apply to stdout or whatever source k8s uses for its logs. Given that the pods don't seem to have logs written to file anywhere, contrary to the properties, I'm inclined to say it's the former and that the log4j properties just aren't being picked up. Still have no idea why though.

On Tue, Dec 10, 2019 at 6:56 PM Yun Tang <myas...@live.com> wrote:







Sure, /opt/flink/conf is mounted as a volume from the configmap.
 
Best
Yun Tang
 

From: Li Peng <li.p...@doordash.com>
Date: Wednesday, December 11, 2019 at 9:37 AM
To: Yang Wang <danrtsey...@gmail.com>
Cc: vino yang <yanghua1...@gmail.com>, user <user@flink.apache.org>
Subject: Re: Flink on Kubernetes seems to ignore log4j.properties


 


1. Hey Yun, I'm calling /opt/flink/bin/standalone-job.sh and /opt/flink/bin/taskmanager.sh on my job and task managers respectively. It's based on the setup described here: http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/ . I haven't tried the configmap approach yet, does it also replace the conf files in /opt/flink/conf?

2. Hey Vino, here's a sample of the kubernetes: https://pastebin.com/fqJrgjZu  I didn't change any patterns from the default, so the string patterns should look the same, but as you can see it's full of info checkpoint logs that I originally was trying to suppress. Based on my log4j.properties, the level shoul

Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-11 Thread ouywl






Hi Yang,
    Could you give more details: the log4j.properties content and the k8s YAML? Are you using the Dockerfile from flink-container? When I tested with the default per-job YAML in flink-container, logs only showed up in the docker output and not in /opt/flink/log.






  










ouywl
ou...@139.com

 


On 12/12/2019 13:47,Yang Wang wrote: 


Hi Peng,
What I mean is to use `docker exec` into the running pod and `ps` to get the real command that is running for the jobmanager. Have you checked that the /opt/flink/conf/log4j.properties is right? I have tested standalone per-job on my kubernetes cluster, and the logs show up as expected.
Best,
Yang

Li Peng <li.p...@doordash.com> wrote on Thu, Dec 12, 2019 at 02:59:
Hey Yang, here are the commands:
"/opt/flink/bin/taskmanager.sh","start-foreground","-Djobmanager.rpc.address={{ .Chart.Name }}-job-manager","-Dtaskmanager.numberOfTaskSlots=1"
"/opt/flink/bin/standalone-job.sh","start-foreground","-Djobmanager.rpc.address={{ .Chart.Name }}-job-manager","-Dparallelism.default={{ .Values.task.replicaCount }}"
Yes, it's very curious that I don't see any logs actually written to /opt/flink/log.

On Tue, Dec 10, 2019 at 11:17 PM Yang Wang <danrtsey...@gmail.com> wrote:
Could you find the logs under /opt/flink/log/jobmanager.log? If not, please share the commands the JobManager and TaskManager are using. If the command is correct and the log4j under /opt/flink/conf is as expected, it is curious why we could not get the logs.
Best,
Yang

Li Peng <li.p...@doordash.com> wrote on Wed, Dec 11, 2019 at 13:24:
Ah I see. I think the Flink app is reading files from /opt/flink/conf correctly as it is, since changes I make to flink-conf are picked up as expected; it's just the log4j properties that are either not being used, or don't apply to stdout or whatever source k8s uses for its logs. Given that the pods don't seem to have logs written to file anywhere, contrary to the properties, I'm inclined to say it's the former and that the log4j properties just aren't being picked up. Still have no idea why though.

On Tue, Dec 10, 2019 at 6:56 PM Yun Tang <myas...@live.com> wrote:







Sure, /opt/flink/conf is mounted as a volume from the configmap.
 
Best
Yun Tang
 

From: Li Peng <li.p...@doordash.com>
Date: Wednesday, December 11, 2019 at 9:37 AM
To: Yang Wang <danrtsey...@gmail.com>
Cc: vino yang <yanghua1...@gmail.com>, user <user@flink.apache.org>
Subject: Re: Flink on Kubernetes seems to ignore log4j.properties


 


1. Hey Yun, I'm calling /opt/flink/bin/standalone-job.sh and /opt/flink/bin/taskmanager.sh on my job and task managers respectively. It's based on the setup described here: http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/ . I haven't tried the configmap approach yet, does it also replace the conf files in /opt/flink/conf?

2. Hey Vino, here's a sample of the kubernetes: https://pastebin.com/fqJrgjZu  I didn't change any patterns from the default, so the string patterns should look the same, but as you can see it's full of info checkpoint logs that I originally was trying to suppress. Based on my log4j.properties, the level should be set to WARN. I couldn't actually find any .out files on the pod, this is from the kubectl logs command. I also didn't see any files in /opt/flink/log, which I thought my log4j was specified to do, hence me thinking that the properties weren't actually being consumed. I also have the same properties in my src/main/resources folder.

3. Hey Yang, yes this is a standalone session cluster. I did specify in the docker file to copy the log4j.properties to the /opt/flink/conf folder on the image, and I confirmed that the properties are correct when I bash'd into the pod and viewed them manually.


 


Incidentally, I also tried passing the -Dlog4j.configuration argument to the programs, and it doesn't work either. And based on what I'm reading on jira, that option is not really supported anymore?

 


Thanks for your responses, folks!


Li



 


On Mon, Dec 9, 2019 at 7:10 PM Yang Wang <danrtsey...@gmail.com> wrote:



Hi Li Peng, 

 


You are running a standalone session cluster or per-job cluster on kubernetes, right?
If so, I think you need to check your log4j.properties in the image, not the local one. The log is stored to /opt/flink/log/jobmanag

Re: [DISCUSS] Support configure remote flink jar

2019-11-19 Thread ouywl







I have implemented this feature in our environment, using a docker 'Init Container' to fetch the jar file from a URL. It seems like a good idea.






  










ouywl
ou...@139.com

 


On 11/19/2019 12:11,Thomas Weise wrote: 


There is a related use case (not specific to HDFS) that I came across: it would be nice if the jar upload endpoint could accept the URL of a jar file as an alternative to the jar file itself. Such a URL could point to an artifactory or a distributed file system.
Thomas

On Mon, Nov 18, 2019 at 7:40 PM Yang Wang <danrtsey...@gmail.com> wrote:
Hi tison,

Thanks for starting this discussion.
* For a user-customized flink-dist jar, this is a useful feature, since it avoids uploading the flink-dist jar every time. Especially in production environments, it can accelerate the submission process.
* For the standard flink-dist jar, FLINK-13938[1] could solve the problem: upload an official flink release binary to distributed storage (hdfs) first, and then every submission can benefit from it. Users could also upload a customized flink-dist jar to accelerate their submissions.

If the flink-dist jar can be specified as a remote path, maybe the user jar deserves the same treatment.

[1]. https://issues.apache.org/jira/browse/FLINK-13938

tison <wander4...@gmail.com> wrote on Tue, Nov 19, 2019 at 11:17:

> Hi forks,
>
> Recently, our customers asked for a feature to configure a remote flink jar.
> I'd like to reach out to you to see whether or not it is a general need.
>
> At the moment Flink only supports configuring a local file as the flink jar via the `-yj`
> option. If we pass an HDFS path, due to implementation details it fails with an
> IllegalArgumentException. In the story where we support configuring a remote flink jar,
> this limitation is eliminated. We also make use of YARN locality to reduce the uploading
> overhead, instead asking YARN to localize the jar when the AM container is started.
>
> Besides, it possibly has overlap with FLINK-13938. I'd like to put the
> discussion on our
> mailing list first.
>
> Are you looking forward to such a feature?
>
> @Yang Wang: this feature is different from that we discussed offline, it
> only focuses on flink jar, not
> all ship files.
>
> Best,
> tison.
>







Elasticsearch6UpsertTableSink: how to trigger an ES delete

2019-10-16 Thread ouywl











Hi,
    When I use Elasticsearch6UpsertTableSink, it seems to implement deletes, as in this code:

@Override
public void process(Tuple2<Boolean, Row> element, RuntimeContext ctx, RequestIndexer indexer) {
    if (element.f0) {
        processUpsert(element.f1, indexer);
    } else {
        processDelete(element.f1, indexer);
    }
}

I don't know under which condition element.f0 == false is triggered so that processDelete() is called to delete from the ES index.
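For reference (not from the original thread): the Boolean flag is the add/retract flag of the changelog stream that the planner feeds into the sink. An upsert sink such as Elasticsearch6UpsertTableSink gets element.f0 == false, and thus calls processDelete(), when the row for a key has to be removed, for instance because the upstream query retracted it. The same flag can be observed directly on a retract stream; a minimal sketch with the Table API of that era (around Flink 1.9; table, column and class names are invented):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class RetractStreamSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Assumes a table "clicks(user_id, url)" has been registered with tEnv.
            // A grouped aggregation without a window keeps updating its results, so the
            // resulting stream carries retractions as well as additions.
            Table counts = tEnv.sqlQuery(
                "SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id");

            // f0 == true  -> add message; f0 == false -> retraction of a previously emitted
            // row (the condition an upsert sink maps to a delete request).
            DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(counts, Row.class);
            retractStream.print();

            env.execute("retract-stream-sketch");
        }
    }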






ouywl
ou...@139.com