Flink-1.10-SQL TopN syntax issue

2020-05-08 Thread guaishushu1...@163.com
hi everyone, while using the TopN syntax to write data into mysql, I found that rownum must be set as the database table's primary key, otherwise an error is thrown. Has anyone else hit this? guaishushu1...@163.com
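For reference, a minimal sketch of the TopN-to-MySQL pattern being discussed, assuming Flink 1.10's Blink planner and an already-registered JDBC sink table (all table and column names here are made up):

    // imports: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment,
    // org.apache.flink.table.api.EnvironmentSettings, org.apache.flink.table.api.java.StreamTableEnvironment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
        env,
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
    // Top 3 per category; the TopN operator emits updates keyed by the rank,
    // which is why the thread reports that the MySQL table behind the sink
    // must declare rownum (or the partition key plus rownum) as its primary key.
    tableEnv.sqlUpdate(
        "INSERT INTO mysql_topn_sink " +
        "SELECT category, sales, rownum FROM (" +
        "  SELECT *, ROW_NUMBER() OVER (" +
        "    PARTITION BY category ORDER BY sales DESC) AS rownum " +
        "  FROM shop_sales) " +
        "WHERE rownum <= 3");
    tableEnv.execute("topn-to-mysql");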

Re:Re:Re: flink-1.10 checkpoint occasionally throws NullPointerException

2020-05-08 Thread chenkaibit
Hi: after adding some logging, I found that checkpointMetaData is NULL here: https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1421 The test program reads from kafka, runs a wordcount, and writes the result back to kafka. The checkpoint configuration is as follows: | Checkpointing Mode | Exactly Once | | Interval

Re: Cannot start native K8s

2020-05-08 Thread Dongwon Kim
Hi Yang, Oops, I forgot to copy /etc/kube/admin.conf to $HOME/.kube/config so that the current user account can access K8s. Now that I've copied it, kubernetes-session.sh works fine. Thanks very much! Best, Dongwon [flink@DAC-E04-W06 ~]$ kubernetes-session.sh 2020-05-09

Re: 可撤回流是否可以进行双流 Join?

2020-05-08 Thread Benchao Li
Do you mean a time interval join, or a regular two-stream join? A time interval join should not work; a regular two-stream join should be fine. LakeShen wrote on Sat, May 9, 2020 at 11:28 AM: > Hi all, > We recently have a business requirement for a two-stream join (interval join). If one of the two streams is a retract stream, or both streams are > retract streams, can they still be joined? > > What I know so far is that two append streams can definitely be joined, but if one of them is a retract stream, I don't know whether it can still
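For reference, a sketch of the distinction drawn above, with made-up table names: the time-range predicate is what makes a join a time interval join, and in Flink 1.10 that variant expects append-only inputs, while the unconstrained regular join can consume retract input at the cost of unbounded state:

    -- interval join: append-only inputs expected
    SELECT o.id, p.status
    FROM orders o JOIN payments p
      ON o.id = p.order_id
     AND p.pay_time BETWEEN o.order_time AND o.order_time + INTERVAL '1' HOUR;

    -- regular join: works on retract streams, but both sides are kept in state indefinitely
    SELECT o.id, p.status
    FROM orders o JOIN payments p ON o.id = p.order_id;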

Re: Cannot start native K8s

2020-05-08 Thread Yang Wang
Hi Dongwon Kim, Thanks a lot for your information. I will dig into this issue. I think the "UnknownHostException" is caused by incorrectly setting the Kubernetes ApiServer address. Maybe you are using "kubernetes.default.svc". However, it could not be accessed outside of the Kubernetes cluster.

Can a retract stream be used in a two-stream join?

2020-05-08 Thread LakeShen
Hi all, we recently have a business requirement for a two-stream join (interval join). If one of the two streams is a retract stream, or both streams are retract streams, can they still be joined? What I know so far is that two append streams can definitely be joined, but if one of them is a retract stream, I don't know whether the join still works. Looking forward to your reply. Best, LakeShen

Re: … webui … job

2020-05-08 Thread Zhonghan Tang
… | Zhonghan Tang | 13122260...@163.com | On May 9, 2020 at 11:20, (Jiacheng Jiang) <920334...@qq.com> wrote: hi all … flink web ui … job … web

… webui … job

2020-05-08 Thread … (Jiacheng Jiang)
hi all … flink web ui … job … web ui … job … savepoint … web … job …

Re: flink-1.10 on yarn log output problem

2020-05-08 Thread LakeShen
For Yarn logs, just take the job's Application ID and look at them in the Yarn web UI. Best, LakeShen guaishushu1...@163.com wrote on Fri, May 8, 2020 at 3:43 PM: > All the logs end up in the .err log file, and no logs show up in flink-web either. Does anyone know why? > > -- > guaishushu1...@163.com >
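Besides the web UI, the standard YARN CLI can fetch the aggregated logs of an application, assuming log aggregation is enabled (the application ID below is a placeholder):

    yarn logs -applicationId application_1588888888888_0001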

Re: A question about using HiveModule in flink 1.10.0

2020-05-08 Thread like
Hi, Jingsong Lee, thank you very much for your answer! For the non-regex case I replaced REGEXP_REPLACE with REGEXP, which works around the problem for now; later I'll fix it properly with the approach you suggested. Any further advice is welcome, thanks! On May 8, 2020 at 18:14, Jingsong Li wrote: Hi, the Flink built-in function should have been used here, but due to a bug [1] the HiveModule one is picked up. A simple workaround is to implement your own Module that extends HiveModule and overrides the getFunctionDefinition method to return empty for REGEXP_REPLACE.

Re: Cannot start native K8s

2020-05-08 Thread Dongwon Kim
Hello Yang, I'm using K8s v1.18.2 installed by Kubeadm on a cluster of 5 nodes (not a Minikube). Previously, as you pointed out, openjdk version "1.8.0_252" was installed. I bumped the java version up to openjdk 11.0.7 but got something different: [flink@DAC-E04-W06 bin]$ ./kubernetes-session.sh

Re: Cannot start native K8s

2020-05-08 Thread Yang Wang
Hi Dongwon Kim, Are you running Flink on a minikube or a real Kubernetes cluster? I just could not reproduce it in a real Kubernetes cluster with java 8u252. For minikube, I get the same exception as you. Best, Yang Yang Wang wrote on Wed, May 6, 2020 at 9:29 AM: > Hi Dongwon Kim, > > I think it is a known

Re: No Slots available exception in Apache Flink Job Manager while Scheduling

2020-05-08 Thread Xintong Song
Linking to the jira ticket, for the record: https://issues.apache.org/jira/browse/FLINK-17560 Thank you~ Xintong Song On Sat, May 9, 2020 at 2:14 AM Josson Paul wrote: > Set up > -- > Flink version 1.8.3 > > Zookeeper HA cluster > > 1 ResourceManager/Dispatcher (Same Node) > 1

Re: Flink on Kubernetes unable to Recover from failure

2020-05-08 Thread Yun Tang
Hi Morgan, Regarding "because the number of task slots has been reduced to zero": do you mean the total number of task slots was reduced to 0? And how many registered task managers could you see when this happened (you can click the "Task Managers" tab to view related information)? All containers running do

Re: Correctly implementing of SourceFunction.run()

2020-05-08 Thread Senthil Kumar
OK, thank you. Much appreciated. Yes, I don’t want the job to fail. The source has very little data that is being pumped into a Broadcast stream. From: Robert Metzger Date: Friday, May 8, 2020 at 9:51 AM To: Jingsong Li Cc: Senthil Kumar , "user@flink.apache.org" Subject: Re: Correctly

No Slots available exception in Apache Flink Job Manager while Scheduling

2020-05-08 Thread Josson Paul
Set up -- Flink version 1.8.3 Zookeeper HA cluster 1 ResourceManager/Dispatcher (Same Node) 1 TaskManager 4 pipelines running with various parallelisms Issue -- Occasionally, when the Job Manager gets restarted, we noticed that the pipelines are not all getting scheduled. The error that

Re: Correctly implementing of SourceFunction.run()

2020-05-08 Thread Robert Metzger
Hey Kumar, if you are swallowing any and all exceptions, your Flink job will not fail because of issues arising from your custom source. It might make sense to stop the source if you are catching an InterruptedException. Throwing exceptions out of the run method basically signals the Flink
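A minimal sketch of the pattern described above, assuming a hypothetical poll() call against the external system: cancel() flips a volatile flag, records are emitted under the checkpoint lock, and real failures are allowed to propagate out of run() so Flink can fail and restart the job instead of silently losing data:

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class PollingSource implements SourceFunction<String> {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                String record = poll(); // hypothetical fetch from the external system
                if (record != null) {
                    // emit under the checkpoint lock to stay consistent with snapshots
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(record);
                    }
                }
                Thread.sleep(1_000); // an InterruptedException here should end the source
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        private String poll() {
            return null; // placeholder for the actual client call
        }
    }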

Re: MongoDB sink;

2020-05-08 Thread Robert Metzger
I'm also not aware of a MongoDB sink in Flink. The code provided by Jingsong applies to the "SinkFunction" interface of Flink. That's a good starting point to implement a custom Sink. On Wed, May 6, 2020 at 9:46 AM Jingsong Li wrote: > Hi, > > My impression is that MongoDB's API is not
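A minimal sketch of such a custom sink, assuming the synchronous MongoDB Java driver (connection string, database, and collection names are placeholders; this gives at-least-once semantics only, no transactional guarantees):

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoCollection;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.bson.Document;

    public class MongoSink extends RichSinkFunction<Document> {

        private transient MongoClient client;
        private transient MongoCollection<Document> collection;

        @Override
        public void open(Configuration parameters) {
            // one client per parallel sink instance
            client = MongoClients.create("mongodb://localhost:27017");
            collection = client.getDatabase("mydb").getCollection("events");
        }

        @Override
        public void invoke(Document value, Context context) {
            collection.insertOne(value);
        }

        @Override
        public void close() {
            if (client != null) {
                client.close();
            }
        }
    }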

Re: Flink on Kubernetes unable to Recover from failure

2020-05-08 Thread Robert Metzger
Hey Morgan, Is it possible for you to provide us with the full logs of the JobManager and the affected TaskManager? This might give us a hint why the number of task slots is zero. Best, Robert On Tue, May 5, 2020 at 11:41 AM Morgan Geldenhuys < morgan.geldenh...@tu-berlin.de> wrote: > >

Re: flink-s3-fs-hadoop retry configuration

2020-05-08 Thread Robert Metzger
I validated my assumption. Putting s3.connection.maximum: 123456 into the flink-conf.yaml file results in the following DEBUG log output: 2020-05-08 16:20:47,461 DEBUG org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader [] - Adding Flink config entry for s3.connection.maximum as
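For reference, a flink-conf.yaml sketch of this mechanism: keys prefixed with "s3." are mirrored into the bundled Hadoop S3A filesystem as "fs.s3a.*" options (the values below are illustrative only):

    s3.connection.maximum: 200   # becomes fs.s3a.connection.maximum
    s3.attempts.maximum: 10      # becomes fs.s3a.attempts.maximum (S3A retry count)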

Re: flink-s3-fs-hadoop retry configuration

2020-05-08 Thread Robert Metzger
Hey Jeff, Which Flink version are you using? Have you tried configuring the S3 filesystem via Flink's config yaml? Afaik all config parameters prefixed with "s3." are mirrored into the Hadoop file system connector. On Mon, May 4, 2020 at 8:45 PM Jeff Henrikson wrote: > > 2) How can I tell

Re: What's the best practice to determine whether a job has finished or not?

2020-05-08 Thread Kurt Young
+dev Best, Kurt On Fri, May 8, 2020 at 3:35 PM Caizhi Weng wrote: > Hi Jeff, > > Thanks for the response. However I'm using executeAsync so that I can run > the job asynchronously and get a JobClient to monitor the job. JobListener > only works for the synchronous execute method. Is there another

Re: java.lang.IllegalStateException: The RPC connection is already closed

2020-05-08 Thread Robert Metzger
Are you able to reproduce the issue reliably? If so, it would be nice if you could test if the issue still happens with the 1.10.1 release candidate: https://dist.apache.org/repos/dist/dev/flink/flink-1.10.1-rc3/ On Mon, May 4, 2020 at 6:08 PM Manish G wrote: >

Re: Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

2020-05-08 Thread Seth Wiesman
Gordon is correct. Additionally, if you are using flink 1.10 you may be running into a known bug that has been resolved in 1.10.1 which will be released soon. Seth https://issues.apache.org/jira/browse/FLINK-16313 On Fri, May 8, 2020 at 5:19 AM Tzu-Li (Gordon) Tai wrote: > Hi, > > The last

Re: What is the RocksDB local directory in flink checkpointing?

2020-05-08 Thread Yun Tang
Hi LakeShen, You could refer to [1] and [2] to find the temporary directory in YARN; the related log line could be "Setting directories for temporary files to: " or "Overriding Fink's temporary file directories with those specified in the Flink config: " [1]

Re: Window processing in Stateful Functions

2020-05-08 Thread m@xi
Dear Igal, very insightful answer. Thanks. Igal Shilman wrote > An alternative approach would be to implement a *tumbling window* per vertex (a stateful function instance) by sending to itself a delayed > message [2]. When that specific delayed message arrives you would have to > purge the
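A sketch of that delayed-message tumbling window against the StateFun 2.0 Java SDK (the window length, trigger message type, and state layout below are all made up for illustration):

    import java.time.Duration;
    import org.apache.flink.statefun.sdk.Context;
    import org.apache.flink.statefun.sdk.StatefulFunction;
    import org.apache.flink.statefun.sdk.annotations.Persisted;
    import org.apache.flink.statefun.sdk.state.PersistedValue;

    public class WindowedVertexFn implements StatefulFunction {

        @Persisted
        private final PersistedValue<Long> count =
            PersistedValue.of("count", Long.class);

        @Override
        public void invoke(Context context, Object input) {
            if (input instanceof WindowTrigger) {
                // the delayed message arrived: the window is over; emit and purge
                count.clear();
                return;
            }
            long current = count.getOrDefault(0L);
            if (current == 0L) {
                // first element of a fresh window: schedule its end
                context.sendAfter(Duration.ofMinutes(1), context.self(), new WindowTrigger());
            }
            count.set(current + 1);
        }

        private static class WindowTrigger {}
    }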

Re: Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

2020-05-08 Thread Tzu-Li (Gordon) Tai
Hi, The last time I saw this error, was that there was a mismatch in the used flink-state-processor-api version and other core Flink dependencies. Could you confirm that? Also, are you seeing this assertion error consistently, or only occasionally? cc'ing Seth, maybe he has other clues on the

Re: A question about using HiveModule in flink 1.10.0

2020-05-08 Thread Jingsong Li
Hi, the Flink built-in function should have been used here, but due to a bug [1] the HiveModule one is picked up. A simple workaround is to implement your own Module that extends HiveModule and overrides the getFunctionDefinition method to return empty for REGEXP_REPLACE. [1] https://issues.apache.org/jira/browse/FLINK-15595 Best, Jingsong Lee On Fri, May 8, 2020 at 5:19 PM like wrote: > >
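A sketch of that workaround against the Flink 1.10 Module API (the class name is made up; "1.2.1" is just an example Hive version):

    import java.util.Optional;
    import org.apache.flink.table.functions.FunctionDefinition;
    import org.apache.flink.table.module.hive.HiveModule;

    public class MyHiveModule extends HiveModule {

        public MyHiveModule(String hiveVersion) {
            super(hiveVersion);
        }

        @Override
        public Optional<FunctionDefinition> getFunctionDefinition(String name) {
            // hide Hive's REGEXP_REPLACE so the planner falls back to
            // another module (e.g. Flink's built-in function)
            if ("regexp_replace".equalsIgnoreCase(name)) {
                return Optional.empty();
            }
            return super.getFunctionDefinition(name);
        }
    }

It would then be loaded via tableEnv.loadModule("myhive", new MyHiveModule("1.2.1")) in place of the plain HiveModule.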

flink1.10: po extends ParentPo … print … print parent …

2020-05-08 Thread …
… PO … env.addSource(consumer) .uid("source") .setParallelism(parameterTool.getInt("source_num", 1)) // json … value .map(value -> JSONObject.parseObject(value, IPEntiy.class)) .uid("map")

Re: A question about using HiveModule in flink 1.10.0

2020-05-08 Thread like
Initially I registered the hive catalog because we use a UDF defined in hive; later we also needed hive's built-in functions, so I loaded a HiveModule as well. The code is as follows: val hive = new HiveCatalog("hive", "default", "/etc/hive_config", "1.2.1") tableEnv.registerCatalog("hive", hive) tableEnv.useCatalog("hive") tableEnv.useDatabase("default") tableEnv.loadModule("myhive", new

Re: A question about using HiveModule in flink 1.10.0

2020-05-08 Thread Jingsong Li
Hi, how are you using HiveModule? Did you keep the CoreModule as well? Best, Jingsong Lee On Fri, May 8, 2020 at 4:14 PM like wrote: > Hi all, > I've run into some problems while using HiveModule. Before loading HiveModule, the REGEXP_REPLACE > function worked fine; after loading HiveModule it fails when I replace a character with the empty string: > REGEXP_REPLACE('abcd', 'a',

Re: Re: exception:beyond the 'PHYSICAL' memory limit

2020-05-08 Thread tiantingting5...@163.com
Thank you, the problem has been found: a memory leak caused by my own program. Every call to getProperties opens the resource as a stream without ever closing it, which causes the leak. public class LazyLoadConfig { public static Properties getProperties(String name) { InputStream in = LazyLoadConfig.class.getClassLoader().getResourceAsStream(name); Properties prop = new Properties();
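A sketch of a fix along those lines: close the stream with try-with-resources and cache the parsed Properties so repeated calls stop re-reading the resource:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.UncheckedIOException;
    import java.util.Properties;
    import java.util.concurrent.ConcurrentHashMap;

    public class LazyLoadConfig {

        private static final ConcurrentHashMap<String, Properties> CACHE =
            new ConcurrentHashMap<>();

        public static Properties getProperties(String name) {
            return CACHE.computeIfAbsent(name, n -> {
                Properties prop = new Properties();
                try (InputStream in =
                        LazyLoadConfig.class.getClassLoader().getResourceAsStream(n)) {
                    prop.load(in); // NB: throws NullPointerException if the resource is missing
                    return prop;
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }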

A question about using HiveModule in flink 1.10.0

2020-05-08 Thread like
Hi all, I've run into some problems while using HiveModule. Before loading HiveModule, the REGEXP_REPLACE function worked fine; after loading HiveModule it fails when I replace a character with the empty string: REGEXP_REPLACE('abcd', 'a', ''). Has anyone hit this problem? And can I choose between flink's and hive's built-in functions? The exception stack is as follows: org.apache.flink.client.program.ProgramInvocationException: The main method caused

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

2020-05-08 Thread Congxian Qiu
Hi, Currently it is hard to determine which files can be safely deleted from the shared folder; the ground truth is in the checkpoint metafile. I've created an issue [1] for such a feature. [1] https://issues.apache.org/jira/browse/FLINK-17571 Best, Congxian Trystan wrote on Fri, May 8, 2020 at 1:05 PM: > Aha,

Re: exception:beyond the 'PHYSICAL' memory limit

2020-05-08 Thread Xintong Song
You can also try the following to leave more room for native memory: 1. further reduce taskmanager.memory.task.heap.size, e.g. to 1.5g or 1g; 2. increase taskmanager.memory.jvm-overhead.max to avoid the conflict, e.g. to 4g first. Also, you said the problem occurs after the job has been running for a while: how long exactly? You could also set -ytm very large and watch how the container's memory usage grows; if it keeps growing without converging, there may be a memory leak. Thank you~ Xintong Song On Fri, May 8, 2020 at
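In flink-conf.yaml terms, the two adjustments would look like this (the sizes are the example values from the thread, not recommendations):

    taskmanager.memory.task.heap.size: 1g
    taskmanager.memory.jvm-overhead.max: 4g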

Re: exception:beyond the 'PHYSICAL' memory limit

2020-05-08 Thread Xintong Song
This error means Flink used more memory than the requested container size and was killed by yarn. Since the JVM did not report an OOM, it can only be native memory overuse. Looking at your configuration: the jvm overhead reserves 800MB+, and since (as you said) the streaming job uses no state, the 512 MB of managed memory is also free, so in total 1.3GB+ of native memory should be enough. The dump shows the configuration took effect and there are no other child processes, so in theory there should be no overuse. I'd suggest double-checking whether rocksdb is used, and whether anything else uses

flink-1.10 on yarn log output problem

2020-05-08 Thread guaishushu1...@163.com
All the logs end up in the .err log file, and no logs show up in flink-web either. Does anyone know why? guaishushu1...@163.com

Re: What's the best practice to determine whether a job has finished or not?

2020-05-08 Thread Caizhi Weng
Hi Jeff, Thanks for the response. However I'm using executeAsync so that I can run the job asynchronously and get a JobClient to monitor the job. JobListener only works for the synchronous execute method. Is there another way to achieve this? Jeff Zhang wrote on Fri, May 8, 2020 at 3:29 PM: > I use

Re: Questions about Streaming SQL Source/Sink in Append Mode/Upsert Mode

2020-05-08 Thread Luan Cooper
Perfect, thanks! On Fri, May 8, 2020 at 10:19 AM Jark Wu wrote: > Hi, > > After FLIP-95 and FLIP-105, the query above will be supported natively. > The core of FLIP-95 and FLIP-105 is to recognize the update/delete/insert messages in a binlog instead of treating them all as > append messages. > These features are expected to land in 1.11. > > Best, > Jark > > On Thu, 7 May 2020 at 21:34, oliver wrote: > > > Is there any other way > > > >

Re: What's the best practice to determine whether a job has finished or not?

2020-05-08 Thread Jeff Zhang
I use JobListener#onJobExecuted to be notified that the flink job is done. It is pretty reliable for me; the only exception is when the client process is down. BTW, the reason you see the ApplicationNotFound exception is that the yarn app is terminated, which means the flink cluster is shut down. While for
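A sketch of that approach against the Flink 1.10 API: the listener must be registered before execute(), and (as noted above) it only fires while the client process is alive:

    // inside main(), before env.execute(...); assumes imports of
    // org.apache.flink.core.execution.JobClient, org.apache.flink.core.execution.JobListener,
    // and org.apache.flink.api.common.JobExecutionResult
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.registerJobListener(new JobListener() {
        @Override
        public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
            // invoked once the job has been handed to the cluster
        }

        @Override
        public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
            if (throwable != null) {
                // job failed or was cancelled
            } else {
                // job finished; result carries accumulators and net runtime
            }
        }
    });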

Re: Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

2020-05-08 Thread luisfaamaral
No one? :) -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: flink how to access remote hdfs using namenode nameservice

2020-05-08 Thread Yang Wang
Do you mean to use the hdfs nameservice? You could find it with the config key "dfs.nameservices" in hdfs-site.xml. For example, hdfs://myhdfs/flink/recovery. Please keep in mind that you need to set the HADOOP_CONF_DIR environment variable beforehand. Best, Yang wangl...@geekplus.com.cn wrote on Thu, May 7, 2020
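For reference, a sketch of pointing Flink at the logical nameservice ("myhdfs" from the example above) in flink-conf.yaml, with HADOOP_CONF_DIR exported so the client can resolve the name from hdfs-site.xml (all paths are placeholders):

    # export HADOOP_CONF_DIR=/etc/hadoop/conf   (must contain hdfs-site.xml / core-site.xml)
    high-availability.storageDir: hdfs://myhdfs/flink/recovery
    state.checkpoints.dir: hdfs://myhdfs/flink/checkpoints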

What's the best practice to determine whether a job has finished or not?

2020-05-08 Thread Caizhi Weng
Hi dear Flink community, I would like to determine in my code whether a job has finished (whether successfully or exceptionally). I used to think that JobClient#getJobStatus was a good idea, but I found that it behaves quite differently under different execution environments. For example, under
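For context, a sketch of the executeAsync() route in Flink 1.10 (the job name is arbitrary); JobStatus#isGloballyTerminalState() is true for FINISHED, FAILED, and CANCELED, though as this thread notes, the behaviour of getJobStatus() can differ across deployment targets:

    // assumes imports of org.apache.flink.core.execution.JobClient and
    // org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // ... build the pipeline ...
    JobClient client = env.executeAsync("my-job");
    boolean finished = client.getJobStatus().get().isGloballyTerminalState();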