Re: Checkpoint fail due to timeout

2021-03-11 Thread Roman Khachatryan
Hello, This can be caused by several reasons such as back-pressure, large snapshots or bugs. Could you please share: - the stats of the previous (successful) checkpoints - back-pressure metrics for sources - which Flink version do you use? Regards, Roman On Thu, Mar 11, 2021 at 7:03 AM Alexey

Re: Question about Reactive mode support

2021-03-11 Thread Robert Metzger
Hey Sonam, > I'm wondering whether it may be helpful to have a min and max parallelism, and the actual parallelism be determined by the scaling policy mentioned next? Yes, that's certainly possible. Thanks a lot for your input on the design of a scaling policy. Your input is very valuable

Re: clear() in a ProcessWindowFunction

2021-03-11 Thread Vishal Santoshi
Essentially, does this code leak state? private static class SessionIdProcessWindowFunction extends ProcessWindowFunction, KeyedSessionWithSessionID< KEY, VALUE>, KEY, TimeWindow> { private static final long serialVersionUID = 1L; private final static ValueStateDescriptor sessionId = new

clear() in a ProcessWindowFunction

2021-03-11 Thread Vishal Santoshi
Hello folks, The suggestion is to use windowState() for per-key, per-window state and to clear the state explicitly. Also it seems that getRuntime().getState() will return a globalWindow() where state is shared among windows with the same key. I desire of course to have state
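
The distinction drawn in this post can be modelled in plain Java. The sketch below is not Flink API: WindowStateModel and its methods are hypothetical names, and it only assumes what the post states, namely that per-window state is scoped to one key and one window and must be cleared explicitly, while global state is shared by all windows of the same key.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of per-window vs. per-key ("global") state; hypothetical, not Flink API. */
final class WindowStateModel {
    // Per-window state: one entry per (key, window) pair.
    private final Map<String, Long> windowState = new HashMap<>();
    // Global state: one entry per key, shared by every window of that key.
    private final Map<String, Long> globalState = new HashMap<>();

    private static String slot(String key, long windowStart) {
        return key + "@" + windowStart;
    }

    /** Models a window firing: both scopes record the firing. */
    void process(String key, long windowStart) {
        windowState.merge(slot(key, windowStart), 1L, Long::sum);
        globalState.merge(key, 1L, Long::sum);
    }

    /** Models an explicit clear for one window: only that window's entry is dropped. */
    void clear(String key, long windowStart) {
        windowState.remove(slot(key, windowStart));
    }

    int windowEntries() {
        return windowState.size();
    }

    long globalCount(String key) {
        return globalState.getOrDefault(key, 0L);
    }
}
```

In this model, skipping clear() leaves one entry behind per window, which is the kind of growth the question about leaking state is concerned with; the global entry survives regardless.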

Guidelines for setting task slots when running multiple jobs in a Flink cluster

2021-03-11 Thread Sush Bankapura
Hi, We have multiple jobs that need to be deployed to a Flink cluster. Parallelism varies between jobs and depends on the type of work being done, as do the memory requirements. All jobs currently use the same state backend. Since the workloads handled by each job are different, the scaling

Re: Flink Read S3 Intellij IDEA Error

2021-03-11 Thread sri hari kali charan Tummala
Let's close this issue, guys; please answer my questions. I am using Flink 1.8.1. Thanks Sri On Wed, 10 Mar 2021 at 13:25, sri hari kali charan Tummala < kali.tumm...@gmail.com> wrote: > Also I don't see ConfigConstants.ENV_FLINK_PLUGINS_DIR I only see > ConfigConstants.ENV_FLINK_LIB_DIR will

Re: Flink and Nomad ( from Hashicorp)

2021-03-11 Thread Vishal Santoshi
Makes total sense, Thanks, I'll check it out. On Wed, Mar 10, 2021 at 1:28 PM Till Rohrmann wrote: > Hi Vishal, > > There is no specific reason why Flink does not have a Nomad HA > implementation other than it has not been done yet. As long as Nomad > supports leader election, service recovery

Re: How to do code isolation if multiple jobs run on the same taskmanager process?

2021-03-11 Thread Lei Wang
Thanks Arvid, If too many jobs run in the same task manager JVM, will it cause too much metaspace memory occupation? Thanks, Lei On Thu, Mar 11, 2021 at 11:54 PM Arvid Heise wrote: > Hi Lei, > > each application has its own classloader as such each static constant > exists multiple times

Flink 1.11.2 FileSystem connector problem

2021-03-11 Thread 史 正超
on /warehouse/rt_ods/ed_cell_num_info/pt=20210311/.part-80f4307d-b910-42fa-8500-2c1226c5a879-0-57.inprogress.94c24d6c-e6f7-4387-b2e2-e667a44b23f6 (inode 2792145632): File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_-1421245761_79, pending creates: 2

Flink 1.11.2 Filesystem-connector problem

2021-03-11 Thread 史 正超
on /warehouse/rt_ods/ed_cell_num_info/pt=20210311/.part-80f4307d-b910-42fa-8500-2c1226c5a879-0-57.inprogress.94c24d6c-e6f7-4387-b2e2-e667a44b23f6 (inode 2792145632): File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_-1421245761_79, pending creates: 2

Inserting a null value fails in Flink SQL

2021-03-11 Thread Jimmy Zhang
How do I insert a null value in Flink SQL? Does anyone know? Currently, writing null directly in the VALUES clause of an INSERT statement raises an error in Zeppelin. Best, Jimmy

Re: Re: How to load a UDF from a remote jar in Flink SQL

2021-03-11 Thread Jeff Zhang
Zeppelin supports loading UDF jars; you can refer to the code below, although its architecture may differ from your existing one. https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2#8iONE https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala#L469 On Fri, Mar 12, 2021 at 9:42 AM, chenxyz wrote: >

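Re:Re: How to load a UDF from a remote jar in Flink SQL

2021-03-11 Thread chenxyz
This approach isn't feasible for now: Flink jobs are submitted through the company's platform system, and the only part I control is the code. At 2021-03-11 16:39:24, "silence" wrote: > Try adding it to the classpath with -C at startup > -- > Sent from: http://apache-flink.147419.n8.nabble.com/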

Re: How to debug checkpoint/savepoint stuck in Flink 1.12.2

2021-03-11 Thread 陳昌倬
On Thu, Mar 11, 2021 at 02:14:32PM +0100, Arvid Heise wrote: > Hi ChangZhuo, > > Did you upgrade to Flink 1.12.2 and change the settings at the time? If so, > could you maybe reset the settings to the old values on Flink 1.12.2 and > check if the job still gets stuck? Especially, turning off

Re: Evenly Spreading Out Source Tasks

2021-03-11 Thread Aeden Jameson
Hi Arvid, Thanks for responding. I did check the configuration tab of the job manager and the setting cluster.evenly-spread-out-slots: true is there. However I'm still observing unevenness in the distribution of source tasks. Perhaps this additional information could shed light. Version:

Re: User metrics outside tasks

2021-03-11 Thread Bob Tiernay
I too think it would be useful for the job manager to be able to send metrics easily. Sometimes additional compute responsibilities are placed in the job manager and having a convenient way to add telemetry data into a metrics stream would be very useful. -- Sent from:

Re: Application cluster - Best Practice

2021-03-11 Thread Tamir Sagi
Hey Till, You are right. I'm new to Flink, and I was looking for a Java way to deploy an application cluster. I first tried the standalone approach and changed to native (although the official documents specify that application mode is more suitable for production, they show only the CLI way).

Re: Question about Reactive mode support

2021-03-11 Thread Sonam Mandal
Hi Robert, Thanks for getting back to me. We are currently assessing Flink Standalone on Kubernetes and Native Flink on Kubernetes and haven't yet decided on which model we intend to use. We want to ensure that whichever model we choose, we'll be able to get the benefits of the new features

Questions with State Processor Api

2021-03-11 Thread Maminspapin
Hi folks, using the State Processor API, can I: 1. get the full state of a Flink application with the RocksDB backend in cluster mode (as I understand it, that means checkpoints or savepoints)? 2. update it? 3. get this state from another Flink application (another jar)? 4. query it with SQL (Table API & SQL) to get data I

Re: Filtering lines in parquet

2021-03-11 Thread Avi Levi
Hi Arvid, assuming that I have A0,B0,C0 parquet files with different schema and a common field *ID*, I want to write them to A1,B2,C3 files respectively. My problem is that in my code I do not want to know the full schema; I just want to filter using the ID field and write the unfiltered lines to the

Re: Application cluster - Best Practice

2021-03-11 Thread Till Rohrmann
What the Flink client does for you when starting an application mode cluster in native K8s mode is to generate the K8s job specification and to submit it to the K8s cluster. Hence, you can also do it yourself if you prefer to manage the K8s resources directly. Writing your own client should also

Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

2021-03-11 Thread Alexey Trenikhun
Hi Yang, The upgrade procedure uses POST /jobs/:jobid/savepoints with cancel-job=true, but perhaps I hit FLINK-21028. This leads to a question: if the normal take-savepoint-and-cancel-job via the API fails, what steps should be taken outside Flink to be able to resume from the savepoint with the new job version? Is

[Schema Evolution] Cannot restore from savepoint after deleting field from POJO

2021-03-11 Thread Alexis Sarda-Espinosa
Hi everyone, It seems I'm having either the same problem, or a problem similar to the one mentioned here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-when-restoring-from-savepoint-with-missing-state-amp-POJO-modification-td39945.html I have a POJO class that is

flink-kinesis: shards are distributed unevenly after setParallelism

2021-03-11 Thread mo jia
The default shard assigner is public static final KinesisShardAssigner DEFAULT_SHARD_ASSIGNER = (shard, subtasks) -> shard.hashCode(); If the number of shards is greater than the parallelism, the distribution easily becomes uneven. I'm thinking of the following approach, used in the main class: static ConcurrentHashMap map = new ConcurrentHashMap<>(); static AtomicInteger counter = new AtomicInteger(0); public static
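
The post is cut off before the custom assigner itself, so the following is only a guess at the idea, written in plain Java rather than against the connector. ShardAssigner is a hypothetical stand-in for KinesisShardAssigner, shard IDs are plain strings, and it is an assumption that the consumer reduces the returned value modulo the number of subtasks.

```java
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-Java sketch; ShardAssigner is a hypothetical stand-in, not the connector's interface. */
final class ShardAssignmentSketch {

    interface ShardAssigner {
        int assign(String shardId, int numSubtasks);
    }

    /** Hash-based assignment, modelled on the default quoted in the post. */
    static ShardAssigner hashBased() {
        return (shardId, numSubtasks) -> shardId.hashCode();
    }

    /** Round-robin assignment: each new shard takes the next counter value, remembered in a map. */
    static ShardAssigner roundRobin() {
        ConcurrentHashMap<String, Integer> assigned = new ConcurrentHashMap<>();
        AtomicInteger counter = new AtomicInteger(0);
        return (shardId, numSubtasks) ->
                assigned.computeIfAbsent(shardId, id -> counter.getAndIncrement());
    }

    /** Assumption: the raw value is reduced modulo the number of subtasks. */
    static int subtaskFor(ShardAssigner assigner, String shardId, int numSubtasks) {
        return Math.floorMod(assigner.assign(shardId, numSubtasks), numSubtasks);
    }

    /** Counts how many shards land on each subtask. */
    static int[] shardsPerSubtask(ShardAssigner assigner, String[] shardIds, int numSubtasks) {
        int[] counts = new int[numSubtasks];
        for (String shardId : shardIds) {
            counts[subtaskFor(assigner, shardId, numSubtasks)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] shardIds = new String[8];
        for (int i = 0; i < shardIds.length; i++) {
            shardIds[i] = "shard-" + i; // made-up shard IDs for illustration
        }
        System.out.println(Arrays.toString(shardsPerSubtask(hashBased(), shardIds, 4)));
        System.out.println(Arrays.toString(shardsPerSubtask(roundRobin(), shardIds, 4)));
    }
}
```

With 8 shards and 4 subtasks the round-robin variant places two shards on each subtask, while the hash-based spread depends on the hash values. One caveat: the map and counter here live in a single JVM. In a distributed job each subtask may hold its own copy and may see shards in a different order, so this sketch says nothing about whether the assignments would agree across subtasks.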

Re: How to do code isolation if multiple jobs run on the same taskmanager process?

2021-03-11 Thread Arvid Heise
Hi Lei, each application has its own classloader; as such, each static constant exists multiple times (1 per job). So there should be no interference. You could verify it by logging the value of the constant and seeing it for yourself. Best, Arvid On Thu, Mar 11, 2021 at 7:11 AM Lei Wang wrote: >

Re: Checkpointing Completed and then failed

2021-03-11 Thread Arvid Heise
Hi Abdullah, without specific logs, it's hard to diagnose what went wrong. Could you check in your taskmanager logs if any error occurred and add it? In Flink UI, you can also browse the latest exceptions and look at the checkpoint history. That may give you (and us) additional insights. On Thu,

Re: Problem when restoring from savepoint with missing state & POJO modification

2021-03-11 Thread Arvid Heise
Hi Alexis, could you open a new thread and post your exception? It sounds as if it should work, but it's not for some reason. Did you double check that the PojoSerializer is used? On Wed, Mar 10, 2021 at 10:27 PM sardaesp < alexis.sarda-espin...@microfocus.com> wrote: > I'm having the same

Re: mixing java libraries between 1.12.x and 1.11.x

2021-03-11 Thread Arvid Heise
Hi Jin, as Till already answered on the ticket: in general, there is no guarantee that stuff works in between different versions. Everything that builds on public APIs is guaranteed to be forward compatible. However, in this case, you want things to be backward-compatible, which is impossible to

Re: User metrics outside tasks

2021-03-11 Thread Alexey Trenikhun
A couple of use cases: I have a metric representing the job version; currently it is bound to a task, but I want to bind it to the job manager. Another example: I have dumps on OOM exceptions configured, and on start I want to check the content of the directory with dumps and, if it is not empty, increase

Re: Is there any complete code available for checkpointing

2021-03-11 Thread Abdullah bin Omar
Hi Arvid, Thank you for your reply. I get input using DataStream data = env.socketTextStream("localhost", 9090); It shows an error: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed Is there any particular rule to get

Re: Flink application has slightly data loss using Processing Time

2021-03-11 Thread Rainie Li
Thanks for the suggestion, Arvid. Currently my job is using producer.kafka.request.timeout.ms=9 I will try to increase to 12. Best regards Rainie On Thu, Mar 11, 2021 at 3:58 AM Arvid Heise wrote: > Hi Rainie, > > This looks like the record batching in Kafka producer timed out. At this

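Re: Flink savepoint migration problem

2021-03-11 Thread allanqinjy
建云, I also ran into a job failing to start from a savepoint before. It happened after we upgraded the Pulsar client from 2.2 to 2.5.2, when I started the job with -s. Because the job wasn't very important and I had other tasks at hand, I didn't follow up on it. Take a look at whether something was changed in the Pulsar source. allanqinjy allanqi...@163.com On 2021-03-11 at 22:43, Kezhu Wang wrote: Is it possible that Flink already deserializes when reading the savepoint file, and fails during that process? That is indeed the case; the checkpoint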

Re: Application cluster - Best Practice

2021-03-11 Thread Tamir Sagi
Hey Till, Thank you for responding. I've already read the links you sent, but they are not enough; they don't provide a good solution for production. Standalone-Kubernetes is not a good approach for production for 3 main reasons (in my opinion): * TMs are defined as deployment which

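Re: Flink savepoint migration problem

2021-03-11 Thread Kezhu Wang
> Is it possible that Flink already deserializes when reading the savepoint file, and fails during that process? That is indeed the case: the checkpoint also snapshots the serializer. I looked at the stack again, and the error should be occurring while deserializing `MessageId`. Could you check whether the Pulsar version changed as well? If so, did the fields of some `MessageId` implementation change between the two versions? I think you should use `MessageId.toByteArray`. On March 11, 2021 at 20:26:15, 赵 建云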

Re: KeyedProcessFunction

2021-03-11 Thread Maminspapin
I missed this in the documentation: A KeyedProcessFunction is always a RichFunction. Therefore, access to the RuntimeContext is always available and setup and teardown methods can be implemented. See RichFunction.open(org.apache.flink.configuration.Configuration) and RichFunction.close().

Re: Is there any complete code available for checkpointing

2021-03-11 Thread Arvid Heise
Hi Abdullah, You don't need to implement checkpointed functions for checkpointing to work - but you may lose state if you manage it manually. If you have enabled checkpointing, you should see it with any application that is running. Make sure that the checkpointing interval is small enough so

Re: Flink Non-Heap Memory Configuration

2021-03-11 Thread Arvid Heise
Hi Jan, 10 MB sounds very tight. How much memory are you giving your JVM? Are you loading big data structures in your user-defined functions? You can read about Flink's memory here [1]. You may need to lower memory fractions or set .max if you provide only little RAM to your JVM. I'm also

Re: Gradually increasing checkpoint size

2021-03-11 Thread Dawid Wysakowicz
Hey Dan, I think the logic should be correct. Mind that in the processElement we are using *relative*Upper/LowerBound, which are the inverted global bounds: relativeUpperBound = upperBound for left and -lowerBound for right relativeLowerBound = lowerBound for left and -upperBound for right
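
The inversion described here can be checked with a few lines of plain Java. This is a sketch, not the operator's code: the class and method names are hypothetical, and the matches() check assumes the bounds are applied as an inclusive range around the timestamp of the element being processed.

```java
/** Plain-Java sketch of the relative bounds quoted above; names are hypothetical. */
final class RelativeBoundsSketch {
    private final long lowerBound;
    private final long upperBound;

    RelativeBoundsSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    /** lowerBound for the left side, -upperBound for the right side. */
    long relativeLowerBound(boolean isLeft) {
        return isLeft ? lowerBound : -upperBound;
    }

    /** upperBound for the left side, -lowerBound for the right side. */
    long relativeUpperBound(boolean isLeft) {
        return isLeft ? upperBound : -lowerBound;
    }

    /**
     * Assumption: an element with timestamp ts on the given side pairs with an
     * element of the other side when otherTs lies in
     * [ts + relativeLowerBound, ts + relativeUpperBound], both ends inclusive.
     */
    boolean matches(boolean isLeft, long ts, long otherTs) {
        return otherTs >= ts + relativeLowerBound(isLeft)
                && otherTs <= ts + relativeUpperBound(isLeft);
    }

    public static void main(String[] args) {
        RelativeBoundsSketch bounds = new RelativeBoundsSketch(-2L, 5L);
        // The same pair is checked from both sides and gives the same answer.
        System.out.println(bounds.matches(true, 10L, 15L));
        System.out.println(bounds.matches(false, 15L, 10L));
    }
}
```

Because the right-side bounds are the negated, swapped left-side bounds, a pair accepted when the left element is processed is also accepted when the right element is processed, and a rejected pair is rejected from both sides.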

Re: Question about Reactive mode support

2021-03-11 Thread Robert Metzger
Hey Sonam, I'm very happy to hear that you are interested in reactive mode. Your understanding of the limitations for 1.13 is correct. Note that you can deploy standalone Flink on Kubernetes [1]. I'm actually currently preparing a demo for this [2]. We are certainly aware that support for active

Re: Evenly Spreading Out Source Tasks

2021-03-11 Thread Arvid Heise
Hi Aeden, the option that you mentioned should have actually caused your desired behavior. Can you double-check that it's set for the job (you can look at the config in the Flink UI to be 100% sure). Another option is to simply give all task managers 2 slots. In that way, the scheduler can only

Re: User metrics outside tasks

2021-03-11 Thread Arvid Heise
Hi Alexey, could you describe what you want to achieve? Most metrics are bound to a specific task (available in RuntimeContext). You can also access them in custom operators and state backends. Then you have some metrics bound to taskmanager and even java processes, but I don't see an easy way

Re: How to debug checkpoint/savepoint stuck in Flink 1.12.2

2021-03-11 Thread Arvid Heise
Hi ChangZhuo, Did you upgrade to Flink 1.12.2 and change the settings at the time? If so, could you maybe reset the settings to the old values on Flink 1.12.2 and check if the job still gets stuck? Especially, turning off unaligned checkpoints (UC) should clarify if it's a general issue in Flink

KeyedProcessFunction

2021-03-11 Thread Maminspapin
Hello, I'm learning the State Processor API: https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html There is an example on this page with StatefulFunctionWithTime extends KeyedProcessFunction. And here we can see the method open(), which we need to implement to initialize state. But

Re: Future of QueryableState

2021-03-11 Thread Arvid Heise
Hi Maciek, Thanks for reaching out. Only through these interactions, we know how important certain features are to users. Queryable State has some limitations and makes the whole system rather fragile. Most users that try it out are disappointed that there is actually no SQL support. If we could

Re: io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to io.netty.channel.MultithreadEventLoopGroup

2021-03-11 Thread Arvid Heise
Hi Hemant, Yes, this looks like an issue with different library versions. You probably have 3 solutions: * use the netty version of Flink * shade your netty into your jar with relocations * ditch jasync and just use jdbc with a custom thread pool (little overhead) On Wed, Mar 10, 2021 at 2:40 PM

Re: Filtering lines in parquet

2021-03-11 Thread Arvid Heise
Hi Avi, I'm not entirely sure I understand the question. Let's say you have source A, B, C all with different schema but all have an id. You could use the ParquetMapInputFormat that provides a map of the records and just use a map-lookup. However, I'm not sure how you want to write these records

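Re: Flink savepoint migration problem

2021-03-11 Thread 赵 建云
Hello, I rewrote the migration method following StatefulSinkWriterOperator and tested it, but still hit the error above. The error seems to occur before the source's initializeState. Is it possible that Flink already deserializes when reading the savepoint file, and fails during that process? Is there any way to help pinpoint it? Thanks~ On March 11, 2021, at 11:36 AM, Kezhu Wang <kez...@gmail.com> wrote: Is the new cluster using the updated pulsar connector? I looked at the pulsar-flink code, and this update is breaking for state. +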

Re: Flink application has slightly data loss using Processing Time

2021-03-11 Thread Arvid Heise
Hi Rainie, This looks like the record batching in Kafka producer timed out. At this point, the respective records are lost forever. You probably want to tweak your Kafka settings [1]. Usually, Flink should fail and restart at this point and recover without data loss. However, if the transactions

Re: How do I call an algorithm written in C++ in Flink?

2021-03-11 Thread Arvid Heise
Hi Suxi, to expand a bit on the answer of Yun: it depends on which kind of algorithm you have. If you want to apply your C++ function to each record, then you can go Yun's way and use a RichMapFunction to load your library and invoke it for each record in map. If you need more records, then

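Re: After reducing the number of slots per TM from 2 to 1, the job fails to start

2021-03-11 Thread Smile
I suggest checking the cluster's remaining memory to see whether there isn't enough memory for 140 TaskManagers. -- Sent from: http://apache-flink.147419.n8.nabble.com/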

uniqueness of name when constructing a StateDescriptor

2021-03-11 Thread Colletta, Edward
The documentation for ValueStateDescriptor documents the name parameter as - "name - The (unique) name for the state." What is the scope of the uniqueness? Unique within a RichFunction instance? Unique within a job? Unique within a session cluster? I ask because I have several jobs that use a

Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

2021-03-11 Thread Yang Wang
Hi Alexey, From your attached logs, it seems that the leader related config map is reused. Then the Flink application is recovered instead of submitting a new one. This is the root cause of it trying to recover from a wrong savepoint which is specified in your last submission. > So how to fix

Unsubscribe

2021-03-11 Thread Lyon
Unsubscribe -- Sent from my NetEase Mail mobile app

Re: Error Starting PyFlink in Kubernetes Session Cluster "Could Not Get Rest Endpoint"

2021-03-11 Thread Yang Wang
What I mean is you could create a K8s deployment using the Flink image like the following. After that, you could use "kubectl exec -it {pod_name} bash" to tunnel in and submit the Flink python application to the existing session cluster. apiVersion: apps/v1 kind: Deployment metadata: name:

Unsubscribe

2021-03-11 Thread hyangvv
Unsubscribe

Re:Re: Trigger and completed Checkpointing do not appeared

2021-03-11 Thread Smile@LETTers
Hi, In short, [1] means whether the job will trigger checkpoints, and [2] means which operators will take action when checkpoints are triggered. If you use ExampleCountSource, flink-streaming-java should be a dependency in pom.xml and classes such as ListState, ListStateDescriptor,

Jobmanager time out / long running batch job

2021-03-11 Thread Jan Oelschlegel
Hi, I'm using Flink 1.11.3 and running a batch job. In the log of the jobmanager I see that all operators switched from running to finished. And then there is a timeout of the jobmanager. And after some pause the overall status is switched from running to finished. Why is there a big gap in

Re: Upgrade calcite version

2021-03-11 Thread Jingsong Li
Hi, Yes, as Danny said, it is very hard work... A suggestion is that you can cherry-pick some bug fixes from the new Calcite version to your own internal Calcite branch, if you just want to fix some bugs. Best, Jingsong On Thu, Mar 11, 2021 at 2:28 PM Danny Chan wrote: > Hi Sheng ~ > > It is a

Re: Two SQL jobs submitted, but one of them does not take effect.

2021-03-11 Thread HunterXHunter
StatementSet inserts = tableEnv.createStatementSet(); inserts.addInsertSql("insert into xxx select * from xxx"); /* topic1 -> topic2 job */ inserts.addInsertSql("insert into xxx select * from xxx"); /* topic2 -> Postgres job */ inserts.execute(); -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: How to load a UDF from a remote jar in Flink SQL

2021-03-11 Thread 邓从宝
user-zh, please stop sending me emails. -- From: silence Sent: Thursday, March 11, 2021, 16:39 To: user-zh Subject: Re: How to load a UDF from a remote jar in Flink SQL Try adding it to the classpath with -C at startup -- Sent from: http://apache-flink.147419.n8.nabble.com/

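Flink: when records are passed between operators via a base-class type, can they be cast to the subtype inside an operator?

2021-03-11 Thread Lei Wang
In Flink, variables passed between operators need to be serialized. If the generic type of DataStream<> refers to a base class, do downstream operators retain the full subclass information, and can the records be cast? For example: DataStream stream = source.from(SubClass); stream.keyBy( ) { /* Can the code here check the type and cast? */ SubClass subObj = (SubClass) baseObj; } Thanks, 王磊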

Re: How to load a UDF from a remote jar in Flink SQL

2021-03-11 Thread silence
Try adding it to the classpath with -C at startup. -- Sent from: http://apache-flink.147419.n8.nabble.com/

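Question: after enabling state.backend.incremental, Checkpointed Data Size keeps growing

2021-03-11 Thread HunterXHunter
1: After enabling state.backend.incremental, Checkpointed Data Size keeps growing. I checkpoint every 10 minutes and it grows by 2M each time, reaching 400M after two days, but my actual state should only be about 20M (I only do a single window computation; a savepoint is also only 20M). TTL is already set. 2: After I turn off state.backend.incremental, each checkpoint is only around 20M and no longer grows. As I understand it: once state.backend.incremental is enabled, Checkpointed Data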