Re: [Question] How to use different filesystem between checkpoint data and user data sink

2019-12-18 Thread vino yang
Hi ouywl, *>>Thread.currentThread().getContextClassLoader();* What does this statement mean in your program? In addition, can you share your implementation of the customized file system plugin and the related exception? Best, Vino On Wed, Dec 18, 2019 at 4:59 PM ouywl wrote: > Hi all, > We have

Deploying Operator on concrete Task Manager

2019-12-18 Thread KristoffSC
Hi, I have a question regarding job/operator deployment on Task Managers. If I understand correctly, my job will be split into individual tasks, which will be "deployed and executed" on particular task slot/s of a Task Manager (depending on the parallelism level, of course). Let's imagine I have a Job

Re: [Question] How to use different filesystem between checkpoint data and user data sink

2019-12-18 Thread Yang Wang
You could try the new plugin mechanism. Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then put your filesystem-related jars in it. Different plugins are loaded by separate classloaders to avoid conflicts. Best, Yang On Wed, Dec 18, 2019 at 6:46 PM vino yang wrote: > Hi
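The directory layout Yang describes can be sketched in a few lines of Java. This is only an illustration under stated assumptions: a temporary directory stands in for $FLINK_HOME, the jar name `my-hdfs-filesystem.jar` is hypothetical, and the jar is an empty placeholder rather than a real filesystem implementation.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative helper only; not part of Flink.
final class PluginLayoutSketch {

    // Creates <flinkHome>/plugins/<pluginName>/ and puts a placeholder jar in it.
    // Each sub-directory of plugins/ holds the jars of exactly one plugin.
    static Path createPluginDir(Path flinkHome, String pluginName, String jarName)
            throws IOException {
        Path pluginDir = flinkHome.resolve("plugins").resolve(pluginName);
        Files.createDirectories(pluginDir);
        Path jar = pluginDir.resolve(jarName);
        if (!Files.exists(jar)) {
            Files.createFile(jar); // placeholder; a real setup copies the plugin jar here
        }
        return pluginDir;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: a temp directory plays the role of $FLINK_HOME.
        Path flinkHome = Files.createTempDirectory("flink-home");
        Path pluginDir = createPluginDir(flinkHome, "myhdfs", "my-hdfs-filesystem.jar");
        System.out.println("Plugin directory: " + pluginDir);
    }
}
```

The point of the layout is that the jars sit in their own sub-directory of plugins/ rather than in lib/, so they do not share a classloader with other plugins.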

Re: Deploying Operator on concrete Task Manager

2019-12-18 Thread Zhu Zhu
Hi KristoffSC, Flink does not support specifying the TM for tasks. So I think you need to launch a separate job to do the "AsyncCall + map" in the secured zone. Thanks, Zhu Zhu On Wed, Dec 18, 2019 at 8:04 PM KristoffSC wrote: > Hi, > I have a question regarding job/operator deployment on Task Managers. >

Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread Utopia
Hi, I want to get the last value stored in ValueState when processing an element in the Trigger. But as the log shows, sometimes I can get the value and sometimes not. There is only one key in my data (SensorReading). ValueState: class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {

Re: Deploying Operator on concrete Task Manager

2019-12-18 Thread KristoffSC
Hi, thanks for the reply. Just to clarify: I will need *a new Flink Cluster* (Job Manager and Task Manager) running in the secure zone, which will run the AsyncEnrich Job, right?

Re: Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread Utopia
Hi Vino, Thanks for your reply! All of my input data has the same key, so I think there is only one partition. Why can I only sometimes get the value stored in the ValueState before the update? > > > > before update value : 3 > > > > after update value: 4 What’s more, how can I store the

Re: Questions about taskmanager.memory.off-heap and taskmanager.memory.preallocate

2019-12-18 Thread Ethan Li
Thank you Vino for the information. Best, Ethan > On Dec 17, 2019, at 8:29 PM, vino yang wrote: > > Hi Ethan, > > Share two things: > > I have found "taskmanager.memory.preallocate" config option has been removed > in the master codebase. > After researching git history, I found the

[DISCUSS] Drop vendor specific repositories from pom.xml

2019-12-18 Thread Till Rohrmann
Hi everyone, following the discussion started by Seth [1] I would like to discuss dropping the vendor specific repositories from Flink's parent pom.xml. As building Flink against a vendor specific Hadoop version is no longer needed (as it simply needs to be added to the classpath) and documented,

Re: Restore metrics on broadcast state after restart

2019-12-18 Thread Yun Tang
Hi Gaël, you can try initializeState [1] to initialize your metric values from state when restoring from a checkpoint. context.getOperatorStateStore().getBroadcastState() gives you access to your restored broadcast state. [1]

Re: Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread Utopia
Hi Vino, Maybe it is due to the type of window. I used ProcessingTimeSessionWindows, and keyed state is scoped to both window and key. When the window changes, the ValueState is a different one. Best regards, Utopia On Dec 18, 2019 at 22:30 +0800, Utopia wrote: > Hi Vino, > > Thanks for your reply ! > >
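The scoping Utopia describes can be illustrated without Flink. Below is a minimal plain-Java sketch, assuming a hypothetical `WindowScopedState` class that keeps one value per (key, window) pair. It is not Flink's API; it only models why a value written under one session window is not visible once the same key falls into a different window.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of per-key, per-window state; not a Flink class.
final class WindowScopedState {

    // A window identified by its start and end timestamps (milliseconds).
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Window)) {
                return false;
            }
            Window w = (Window) o;
            return start == w.start && end == w.end;
        }

        @Override
        public int hashCode() {
            return Objects.hash(start, end);
        }
    }

    // Outer map: key -> (window -> stored value).
    private final Map<String, Map<Window, Integer>> store = new HashMap<>();

    // Returns null when nothing was stored for this exact (key, window) pair.
    Integer value(String key, Window window) {
        Map<Window, Integer> perWindow = store.get(key);
        return perWindow == null ? null : perWindow.get(window);
    }

    void update(String key, Window window, int value) {
        store.computeIfAbsent(key, k -> new HashMap<>()).put(window, value);
    }

    public static void main(String[] args) {
        WindowScopedState state = new WindowScopedState();
        Window first = new Window(0, 5_000);
        // The session grows, so the same key now belongs to a different window.
        Window extended = new Window(0, 8_000);

        state.update("sensor_1", first, 3);
        System.out.println(state.value("sensor_1", first));    // same window: value is visible
        System.out.println(state.value("sensor_1", extended)); // different window: nothing stored
    }
}
```

Under this model, a single key is not enough to guarantee that a previously stored value is found; the window has to match as well.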

Re: [Question] How to use different filesystem between checkpoint data and user data sink

2019-12-18 Thread Piotr Nowojski
Hi, As Yang Wang pointed out, you should use the new plugins mechanism. If it doesn’t work, first make sure that you are shipping/distributing the plugins jars correctly - the correct plugins directory structure both on the client machine. Next make sure that the cluster has the same correct

Re: [DISCUSS] Drop vendor specific repositories from pom.xml

2019-12-18 Thread Robert Metzger
I guess we are talking about this profile [1] in the pom.xml? +1 to remove. I'm not sure if we need to rush this for the 1.10 release. The profile is not doing us any harm at the moment. [1]https://github.com/apache/flink/blob/master/pom.xml#L1035 On Wed, Dec 18, 2019 at 4:51 PM Till Rohrmann

Operators resource requirements on K8s Flink session cluster

2019-12-18 Thread Michaël Melchiore
Hello, I plan to run topologies on a Flink session cluster on Kubernetes. In my topologies, operators will have varying resource requirements in terms of CPU and RAM. How can I make this information available from Flink to Kubernetes so the latter takes it into account to optimize its deployment

How to convert retract stream to dynamic table?

2019-12-18 Thread James Baker
Hi! I've been looking at Flink for the last few days and have very much appreciated the concept of Dynamic Tables; it solves a lot of my needs and handles a lot of the complex state tracking that is otherwise painful. I have a question about the composability of the system which the docs don't

Re: Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread vino yang
Hi Utopia, The behavior may be correct. First, the default value is null. It's the correct value. `ValueStateDescriptor` has multiple constructors, some of them can let you specify a default value. However, these constructors are deprecated. And the doc does not recommend them.[1] For the other

Re: MapState with List Type for values

2019-12-18 Thread Yun Tang
Hi Aaron, you cannot set up a map state whose value is a list state, but you can use a list as its value. However, I think that would also incur serialization/deserialization cost whenever you append to the list value. What is the KEY in your map state? If you could use emoji as your KEY, and you could act

Re: How to reprocess certain events in Flink?

2019-12-18 Thread Zhu Zhu
Hi Pooja, My main confusion is: if 2 events have the same transaction_id, how can we tell whether it is a wanted one for value updates or an unwanted duplicate? MapState with a TTL can be used for deduplication, assuming that a duplicated event would not happen too late after the
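The TTL-based deduplication idea can be sketched in plain Java. This is a minimal illustration, not Flink code: the class `TtlDeduplicator` is hypothetical, a `HashMap` stands in for keyed state, and timestamps are passed in explicitly instead of being read from a clock.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical stand-in for keyed state with a time-to-live; not a Flink class.
final class TtlDeduplicator {

    private final long ttlMillis;

    // transaction_id -> time at which the id was first seen.
    private final Map<String, Long> firstSeen = new HashMap<>();

    TtlDeduplicator(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns true if the event should be processed, false if it is a duplicate
    // that arrived within the TTL of the first occurrence.
    boolean accept(String transactionId, long nowMillis) {
        expire(nowMillis);
        if (firstSeen.containsKey(transactionId)) {
            return false;
        }
        firstSeen.put(transactionId, nowMillis);
        return true;
    }

    // Drops entries older than the TTL so the map does not grow without bound.
    private void expire(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = firstSeen.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() >= ttlMillis) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        TtlDeduplicator dedup = new TtlDeduplicator(60_000);
        System.out.println(dedup.accept("tx-1", 0));      // first occurrence
        System.out.println(dedup.accept("tx-1", 10_000)); // duplicate inside the TTL
        System.out.println(dedup.accept("tx-1", 70_000)); // TTL elapsed, treated as new
    }
}
```

The last call shows the limitation Zhu Zhu points at: a duplicate that arrives after the TTL has expired is indistinguishable from a new event, so the TTL has to be longer than the largest delay expected between an event and its duplicate.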

[Question] How to use different filesystem between checkpoint data and user data sink

2019-12-18 Thread ouywl
Hi all, We have implemented a filesystem plugin to sink data to hdfs1, while the YARN cluster that Flink runs on uses hdfs2. When the job runs, the jobmanager uses the hdfs1 configuration to create the filesystem, so the filesystem plugin conflicts with the Flink component. We

Apache Flink - Flink Metrics - How to distinguish b/w metrics for two job manager on the same host

2019-12-18 Thread M Singh
Hi: I am using AWS EMR with a Flink application, and two of the job managers are running on the same host. I am looking at the metrics documentation (Apache Flink 1.9 Documentation: Metrics) and see the following:

Re: MapState with List Type for values

2019-12-18 Thread Aaron Langford
So the suggestion as I read it is to have some kind of shared queue for all waiting records. This allows for use of ListState, and cheap appends. Then the map state counts how many of each record is queued. When I finally get a record that allows me to remove elements from the queue, I can

Re: MapState with List Type for values

2019-12-18 Thread Yun Tang
Hi Aaron, there is a runtime key, which is used by the keyBy operation, and a map-key in your map state. Generally speaking, the runtime key is not the same as the map-key. If you could store your emoji as the map-key in your state, no list state is necessary. The basic idea is a bit like join

Need guidance on a use case

2019-12-18 Thread Eva Eva
Hi Team, I'm trying Flink for the first time and encountered an issue that I would like to discuss and understand if there is a way to achieve my use case with Flink. *Use case:* I need to perform unbounded stream joins on multiple data streams by listening to different Kafka topics. I have a

Re: MiniCluster with ProcessingTimeTrigger

2019-12-18 Thread John Morrow
Thanks Biao! I tried slowing down the input stream by replacing the env.fromCollection() with a custom SourceFunction (below) which drip feeds the data a bit slower. By the way, in my real scenario the datasource for the pipeline will be a RabbitMQ source. I do get better results, but it

[DISCUSS] What parts of the Python API should we focus on next ?

2019-12-18 Thread jincheng sun
Hi folks, As release-1.10 is under feature freeze (the stateless Python UDF is already supported), it is time for us to plan the features of PyFlink for the next release. To make sure the features supported in PyFlink are the ones most in demand in the community, we'd like to get more people

Re: How to convert retract stream to dynamic table?

2019-12-18 Thread Kurt Young
Hi James, If I understand correctly, you can use `TableEnvironment#sqlQuery` to achieve what you want. You can pass the whole SQL statement in and get a `Table` back from the method. I believe this is the table you want, which is semantically equivalent to the stream you mentioned. For example,

Re: [Question] How to use different filesystem between checkpoint data and user data sink

2019-12-18 Thread ouywl
Hi Piotr Nowojski, I have moved my filesystem plugin to FLINK_HOME/plugins in Flink 1.9.1. The jobmanager doesn't start up, and it loads the filesystem plugin from my own plugin jar. The log is: “2019-12-19 10:58:32,394 WARN

Re: [DISCUSS] What parts of the Python API should we focus on next ?

2019-12-18 Thread jincheng sun
Also CC user-zh. Best, Jincheng On Thu, Dec 19, 2019 at 10:20 AM jincheng sun wrote: > Hi folks, > > As release-1.10 is under feature-freeze(The stateless Python UDF is > already supported), it is time for us to plan the features of PyFlink for > the next release. > > To make sure the features supported in

Re: Operators resource requirements on K8s Flink session cluster

2019-12-18 Thread Yang Wang
Hi Michaël, Glad to hear that you are going to run Flink workloads on Kubernetes. AFAIK, there are two ways to deploy. 1. Running a Flink standalone session/per-job cluster on K8s. You need to calculate how many taskmanagers you need and the resources per taskmanager. All the taskmanagers will be started by a

Re: Apache Flink - Flink Metrics - How to distinguish b/w metrics for two job manager on the same host

2019-12-18 Thread vino yang
Hi Mans, IMO, one job manager represents one Flink cluster, and one Flink cluster has its own set of Flink configuration, e.g. the metrics reporter. Some metrics reporters support tags, which you can set to distinguish different Flink clusters. [1] [1]:

Re: Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread vino yang
Hi Utopia, IMO, your analysis is correct. Best, Vino On Thu, Dec 19, 2019 at 12:44 AM Utopia wrote: > Hi Vino, > > Maybe it is due to the type of window. What I used is > ProcessingTimeSessionWindows, while keyedState is scoped to *window and > key*. Window changes so that the ValueState is different. > >

Rich Function Thread Safety

2019-12-18 Thread Aaron Langford
Hello Flink Community, I'm hoping to verify some understanding: If I have a function with managed state, I'm wondering if a checkpoint will ever be taken while a function is mutating state. I'll try to illustrate the situation I'm hoping to be safe from: Happy Path: t0 -> processFunction

Re: MiniCluster with ProcessingTimeTrigger

2019-12-18 Thread Biao Liu
Hi John, The critical issue of your test case is that it's a finite streaming job. The mini cluster or distributed cluster does not matter. When the job is finishing, there are some windows not triggered yet. The current behavior is dropping these windows. It's acceptable from the perspective of

Re: Rich Function Thread Safety

2019-12-18 Thread Zhu Zhu
Hi Aaron, It is thread safe since the state snapshot happens in the same thread as the user function. Thanks, Zhu Zhu On Thu, Dec 19, 2019 at 11:25 AM Aaron Langford wrote: > Hello Flink Community, > > I'm hoping to verify some understanding: > > If I have a function with managed state, I'm wondering if

Kafka table descriptor missing startFromTimestamp()

2019-12-18 Thread Steve Whelan
Examining the org.apache.flink.table.descriptors.Kafka class in Flink v1.8, it has the following startUpModes for consumers: .startFromEarliest() .startFromLatest() .startFromSpecificOffsets(...) However, it does not have a method to support starting from a Timestamp. The

Re: Apache Flink - Flink Metrics - How to distinguish b/w metrics for two job manager on the same host

2019-12-18 Thread Biao Liu
Hi Mans, That's indeed a problem. We have a plan to fix it. I think it could be included in 1.11. You could follow this issue [1] to check the progress. [1] https://issues.apache.org/jira/browse/FLINK-9543 Thanks, Biao /'bɪ.aʊ/ On Thu, 19 Dec 2019 at 14:51, vino yang wrote: > Hi Mans, > >

Re: Re: FLINK 1.9 + YARN + SessionWindows + large data volume + OOM after running for a while

2019-12-18 Thread Xintong Song
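- "The memory that the TaskManager allocates for sorting, hash tables, and caching intermediate results is located off the JVM heap": this applies to batch (DataSet / Blink SQL) jobs. It looks like you are running a streaming job, so setting taskmanager.memory.off-heap to true simply reduces the JVM heap size and leaves room for RocksDB. - There is a flink-examples directory containing some example jobs, but they mainly demonstrate API usage. There are no examples for deployment or resource tuning yet. - Also, the solution I described in my previous email is aimed at flink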

Re: flink sql: registering a confluent schema avro topic as a table

2019-12-18 Thread 陈帅
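Thanks for the reply. Given that there is a schema registry URL, why do subject and avroSchemaStr still need to be filled in? On Wed, Dec 18, 2019 at 10:30 AM 朱广彬 wrote: > Hi 陈帅, > > The community indeed does not currently support the Avro format of the Confluent schema registry. Internally we also rely on the schema registry to manage Avro > schemas, so we modified the flink-avro source code to support it. > > The main places involved are: > >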

Re: How to dynamically change window size and type in flink?

2019-12-18 Thread jingjing bai
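Currently, a job supports only one kind of window. Dynamic modification itself is probably a pseudo-requirement. If you only want quick and convenient development, I suggest developing with SQL, for example by extending the SQL client yourself. On Wed, Dec 18, 2019 at 2:12 PM LakeShen wrote: > Try a custom Trigger and define your business trigger logic inside the Trigger. > > On Sat, Dec 14, 2019 at 6:44 PM 陈帅 wrote: > > > Does flink currently support dynamically changing the window size and type? For example, first aggregating over 5-minute windows and then switching to 10-minute windows. > > >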

Re: How to dynamically change window size and type in flink?

2019-12-18 Thread 陈帅
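This kind of requirement does exist in practice. The simpler case is that the element itself carries the window size setting; the more complex case is implemented by connecting a control stream that controls the window size. On Wed, Dec 18, 2019 at 2:12 PM LakeShen wrote: > Try a custom Trigger and define your business trigger logic inside the Trigger. > > On Sat, Dec 14, 2019 at 6:44 PM 陈帅 wrote: > > > Does flink currently support dynamically changing the window size and type? For example, first aggregating over 5-minute windows and then switching to 10-minute windows. > > >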

Re: How to dynamically change window size and type in flink?

2019-12-18 Thread Utopia
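Isn't there Dynamic gap? Best regards, Utopia On Dec 18, 2019 at 22:34 +0800, jingjing bai wrote: > Currently, a job supports only one kind of window. > Dynamic modification itself is probably a pseudo-requirement. > If you only want quick and convenient development, I suggest developing with SQL, for example by extending the SQL client yourself. > > > On Wed, Dec 18, 2019 at 2:12 PM LakeShen wrote: > > > Try a custom Trigger and define your business trigger logic inside the Trigger. > > > > On Sat, Dec 14, 2019 at 6:44 PM 陈帅 wrote: > > >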

flink cross-cluster kerberos authentication problem

2019-12-18 Thread 李现
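Hi all, my flink cluster cannot pass kerberos authentication across clusters. Cluster 1: flink cluster, no kerberos authentication. Cluster 2: hadoop2.6.0 cluster, with kerberos authentication. Data goes from cluster 1 to HDFS on cluster 2 using bucketingsink, and it cannot pass kerberos authentication. With the configuration from the official documentation, authentication works when everything is in one cluster. I have no ideas at the moment and would be very grateful if someone with experience could give me some pointers.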

Re: Re: How to dynamically change window size and type in flink?

2019-12-18 Thread 王双利
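Is there an example? "The more complex case is implemented by connecting a control stream that controls the window size" 王双利 From: 陈帅 Sent: 2019-12-18 22:51 To: user-zh@flink.apache.org Subject: Re: How to dynamically change window size and type in flink? This kind of requirement does exist in practice. The simpler case is that the element itself carries the window size setting; the more complex case is implemented by connecting a control stream that controls the window size. On Wed, Dec 18, 2019 at 2:12 PM LakeShen wrote: > Try a custom Trigger and define your business trigger logic inside the Trigger. > > On Sat, Dec 14, 2019, 陈帅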

Re: (adding image link) Question about setting the Watermark directly versus setting it after flatmap

2019-12-18 Thread Dino Zhang
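Kafka's exactly-once guarantee is provided by saving the consumed offsets through the checkpoint mechanism and has nothing to do with event time. You only need to extract the event time and set the watermark before entering the time window. On Wed, Dec 18, 2019 at 4:12 PM 猫猫 <16770...@qq.com> wrote: > The image could not be pasted, so I put it on github. > https://github.com/maobuji/Blog/blob/master/image/changeEventTime.jpg > > > > >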

Re: jobmanager exception log

2019-12-18 Thread Dino Zhang
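This only indicates a timeout; for the specific cause you need to check the taskmanager log. On Mon, Dec 16, 2019 at 2:49 PM pengchenglin wrote: > Hi all, > > I saw this in the log of the standalone jobmanager node. The taskmanager on 153 went down, and I don't know whether the error below caused it. Does anyone know what the Error below means? > > 2019-12-15 17:15:21.999 [flink-metrics-379] ERROR akka.remote.Remoting >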

(adding image link) Question about setting the Watermark directly versus setting it after flatmap

2019-12-18 Thread 猫猫
The image could not be pasted, so I put it on github. https://github.com/maobuji/Blog/blob/master/image/changeEventTime.jpg ---- Original message ---- From: <16770...@qq.com> Sent: Dec 18, 2019 (Wed) 4:03 To: "user-zh"

Re: Re: FLINK 1.9 + YARN + SessionWindows + large data volume + OOM after running for a while

2019-12-18 Thread USERNAME
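@tonysong...@gmail.com Thanks for the reply. I looked at what the parameter means. taskmanager.memory.off-heap: if set to true, the memory that the TaskManager allocates for sorting, hash tables, and caching intermediate results is located off the JVM heap. For setups with large memory, this can improve the efficiency of operations performed on that memory (default: false). Memory used inside the JVM heap is limited by YARN, while memory outside the JVM heap is not limited by YARN; if so, that would indeed explain my current problem. I have already made the change and am testing it. Many thanks, tonysong...@gmail.com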

Re: Question about setting the Watermark directly versus setting it after flatmap

2019-12-18 Thread 猫猫
[Message text lost to character-encoding damage; the surviving terms are kafka, flatmap, and event-time.]

Re: flink cross-cluster kerberos authentication problem

2019-12-18 Thread Leslie Yuen
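I ran into the same problem. The eventual solution was to configure the flink-conf.yaml file directly in the cluster where the application runs; no extra authentication code is needed in the job code. What I tested successfully was consuming from and writing to Kafka with kerberos authentication. > On Dec 18, 2019, at 23:05, 李现 wrote: > > Hi all, > my flink cluster cannot pass kerberos authentication across clusters. > Cluster 1: flink cluster, no kerberos authentication > Cluster 2: hadoop2.6.0 cluster, with kerberos authentication >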

Re: flink cross-cluster kerberos authentication problem

2019-12-18 Thread 李现
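I have tried this approach. If the conf file is configured in flink on the application cluster, flink job initialization and checkpoint storage need to interact with the local hdfs, which also runs into problems: the client uses security while the server (the local hdfs) uses simple, so an exception is thrown as well. On Thu, Dec 19, 2019 at 08:35 Leslie Yuen wrote: > > I ran into the same problem. The eventual solution was to configure the flink-conf.yaml file directly in the cluster where the application runs; no extra authentication code is needed in the job code. What I tested successfully was consuming from and writing to Kafka with kerberos authentication. > > > On Dec 18, 2019, at 23:05, 李现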

Re: How to use flink sql to continuously query the number of people who logged in to the website in the past 30 minutes?

2019-12-18 Thread Yuan,Youjun
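Sorry, I didn't see this question earlier. Darwin-amd64 is the executable file format on Mac. Trust it and you can run it directly. -----Original Message----- From: 陈帅 Sent: Saturday, December 7, 2019 10:48 PM To: user-zh@flink.apache.org Subject: Re: How to use flink sql to continuously query the number of people who logged in to the website in the past 30 minutes? May I ask how the executable file creek generated by this platform is implemented? What format is the file downloaded for the Darwin-amd64 environment? On Sat, Dec 7, 2019 at 8:32 PM Yuan,Youjun wrote: >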

Re: flink cross-cluster kerberos authentication problem

2019-12-18 Thread 李现
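Did you configure the two settings keytab and principal? On Thu, Dec 19, 2019 at 08:35 Leslie Yuen wrote: > > I ran into the same problem. The eventual solution was to configure the flink-conf.yaml file directly in the cluster where the application runs; no extra authentication code is needed in the job code. What I tested successfully was consuming from and writing to Kafka with kerberos authentication. > > > On Dec 18, 2019, at 23:05, 李现 wrote: > > > > Hi all, > > my flink cluster cannot pass kerberos authentication across clusters. > > Cluster 1: flink cluster, no kerberos authentication > >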