Re: Question about using timers on a non-keyed stream

2021-02-17 Thread yidan zhao
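No, that would make the data unbalanced, and the data has to be balanced. (1) balance, (2) batching, (3) timeout: these three are hard to achieve together with Flink's existing mechanisms. Of course, not every scenario needs all of them; with MySQL, for example, you also have to consider deadlocks. But for sinks that do not need to worry about locking, it does not matter whether records with the same key are routed together, so there is no need to rely on keyedStream; that guarantees 1 and 2, but not 3. Using keyedStream with a random key (very random) guarantees 1 and 3, but not 2 (because the key is too random, each key has too little data). Using keyedStream with a random key %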
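The batching and timeout requirements (2 and 3) discussed above can be sketched in plain Java, independent of Flink. This is only an illustration under assumptions: the class name `CountOrTimeoutBatcher`, the `onTimer` callback and the explicit `nowMillis` arguments are hypothetical and are not part of any Flink API. In a real job the timer callback would have to come from whatever timer mechanism is available, which is exactly the difficulty on a non-keyed stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: flushes a batch when it is full or when it has waited too long. */
public class CountOrTimeoutBatcher<T> {
    private final int maxSize;
    private final long timeoutMillis;
    private final Consumer<List<T>> flushTarget;
    private final List<T> buffer = new ArrayList<>();
    private long firstElementTime = -1L;

    public CountOrTimeoutBatcher(int maxSize, long timeoutMillis, Consumer<List<T>> flushTarget) {
        this.maxSize = maxSize;
        this.timeoutMillis = timeoutMillis;
        this.flushTarget = flushTarget;
    }

    /** Adds one record and flushes immediately once the batch is full. */
    public void add(T record, long nowMillis) {
        if (buffer.isEmpty()) {
            firstElementTime = nowMillis; // remember when this batch started
        }
        buffer.add(record);
        if (buffer.size() >= maxSize) {
            flush();
        }
    }

    /** Meant to be called periodically; flushes a partial batch that is older than the timeout. */
    public void onTimer(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - firstElementTime >= timeoutMillis) {
            flush();
        }
    }

    private void flush() {
        flushTarget.accept(new ArrayList<>(buffer)); // hand over a copy of the batch
        buffer.clear();
        firstElementTime = -1L;
    }
}
```

Passing the clock in as an argument keeps the sketch deterministic; balance (requirement 1) is outside its scope and depends on how records are distributed to the instances that own such a batcher.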

Re: Sharding of Operators

2021-02-17 Thread yidan zhao
Actually, we only need to ensure all records belonging to the same key will be forwarded to the same operator instance(i), and we do not need to guarantee that 'i' is the same with the 'i' in previous savepoints. When the job is restarted, the rule 'same key's record will be in together' is
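The point that only "same key, same instance" matters, and not which index 'i' that instance has, can be illustrated with a small routing sketch. To my understanding Flink maps each key to a key group and each key group to an operator index; the code below mirrors that two-step shape but uses a plain `hashCode` modulo as a stand-in hash, so the concrete numbers are assumptions and will not match what Flink computes. The class and method names are hypothetical.

```java
/** Hypothetical sketch of two-step key routing: key -> key group -> operator index. */
public class KeyGroupSketch {

    /** Stand-in hash (assumption): the key group depends only on the key and maxParallelism. */
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Key groups are split into contiguous ranges, one range per parallel instance. */
    public static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Routes a key to an operator index for the given parallelism. */
    public static int route(Object key, int maxParallelism, int parallelism) {
        return operatorIndexFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }
}
```

In this sketch the key group of a key never changes when the parallelism changes; only the assignment of key groups to instances does. That is why the index 'i' may differ after rescaling while all records of one key still end up together.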

?????? fink on yarn per job container ????

2021-02-17 Thread zhiyezou
Hi 1. rocksdb 2. state.backend.rocksdb.localdir??noneflinkNodeManager??LOCAL_DIRSflinkio.tmp.dirs

Re: FlinkKafka Consumer can't dynamic discover the partition update

2021-02-17 Thread 张云云
Thanks for the reply! This is how I configured it: Properties properties = new Properties(); properties.setProperty("bootstrap.servers", ConfigureManager.getString(CmConfigConstants.SOURCE_KAFKA_SERVERS, null, kafkaName)); properties.setProperty("group.id", ConfigureManager.getString(CmConfigConstants.SOURCE_KAFKA_GROUPID,

Re: Blob server related: file not found

2021-02-17 Thread Alex_gao
It is a permissions problem. -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Performance issues when RocksDB block cache is full

2021-02-17 Thread Yun Tang
Hi Yaroslav, Unfortunately, RocksDB does not have such a TTL block cache, and if you really only have very few active keys, the current LRU implementation should work well, as only the useful latest entries are inserted into the cache. What behavior do you see when the cache reaches its maximum? Have you ever

Re: Flink real-time statistics: results fluctuate up and down

2021-02-17 Thread Robin Zhang
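Hi, flink2021. First look at the business scenario and check whether the order data can actually decrease; if not, there is a problem in the logic or the code. Best, Robin flink2021 wrote > My data source is Kafka. > I aggregate order data and write the results to MySQL, and found that while data is backlogged the results fluctuate up and down. Has anyone run into a similar problem? Which settings should I adjust? (The data pipeline is a bit complex; using RocksDB for state > reported errors; no expiration time is set.) > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Sent from: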

Re: FlinkKafka Consumer can't dynamic discover the partition update

2021-02-17 Thread Robin Zhang
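Hi, 张云云 1. flink.partition-discovery.interval-millis is a Kafka configuration parameter; I am not sure whether you set it through kafkaProp. 2. Use the shell to check whether the topic's partitions were added successfully and whether data is being written to them. Best, Robin 张云云 wrote > When the job starts, a WARN log like the one below appears: > > WARN org.apache.kafka.clients.consumer.ConsumerConfig - The > configuration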
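As a minimal sketch of the "set it through the Kafka properties" suggestion: the discovery interval is placed in the same `Properties` object as the regular Kafka settings. The class name, method name and the sample values are hypothetical; only the three property keys come from the thread. To my understanding the connector reads this key itself, while the Kafka client does not know it, which would explain the "isn't a known config" warning, but treat that as an assumption to verify.

```java
import java.util.Properties;

/** Hypothetical helper that assembles consumer properties including partition discovery. */
public class KafkaDiscoveryProps {

    public static Properties build(String bootstrapServers, String groupId, long discoveryIntervalMillis) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Key quoted in the thread; the value is the discovery interval in milliseconds.
        props.setProperty("flink.partition-discovery.interval-millis",
                Long.toString(discoveryIntervalMillis));
        return props;
    }
}
```

The resulting object would then be handed to the consumer in place of a `Properties` instance that lacks the discovery key.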

Best practices around checkpoint intervals and sizes?

2021-02-17 Thread Dan Hill
Hi. I'm playing around with optimizing our checkpoint intervals and sizes. Are there any best practices around this? I have ~7 sequential joins and a few sinks. I'm curious what would result in the better throughput and latency trade offs. I'd assume less frequent checkpointing would

flink cdc: encountered "Heartbeat of TaskManager with id timed out"

2021-02-17 Thread william
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id eaffacbed6a9d6025a362df2738d5299 timed out. at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1193) ~[flink-dist_2.11-1.11.2.jar:1.11.2] at

flink cdc: encountered akka RemoteRpcInvocation problem

2021-02-17 Thread william
Error log: 2021-02-16 11:43:49,351 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@xx:45578] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@xx:45578]] Caused by:

Re: Adding proctime column to table api

2021-02-17 Thread Rex Fenley
Following from that, I'm not really sure why I need to provide a proctime timestamp. There should never be any late data with proctime, when a record arrives it should just be put into whatever the current window is. So why is there any requirement to specify a time column in this case? Thanks!
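The intuition that a record "should just be put into whatever the current window is" can be written down as simple arithmetic: with tumbling windows, the window is fully determined by the time at which the record is processed and the window size. The sketch below is plain Java with hypothetical names, assumes windows aligned to the epoch with no offset, and does not address why the Table API asks for an explicit time attribute.

```java
/** Hypothetical sketch: which tumbling window does a given processing time fall into? */
public class TumblingWindowSketch {

    /** Start (inclusive) of the window containing the timestamp, aligned to multiples of the size. */
    public static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** End (exclusive) of the window containing the timestamp. */
    public static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }
}
```

For a 5 second window, a record processed at 12,345 ms lands in the window covering 10,000 ms up to, but not including, 15,000 ms.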

Re: Blob server related: file not found

2021-02-17 Thread Alex_gao
I ran into the same problem; marking this thread. -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Adding proctime column to table api

2021-02-17 Thread Rex Fenley
Also, as an example, I've tried table.window(Tumble over 1.seconds on proctime() as $"w")... and it failed. On Wed, Feb 17, 2021 at 9:30 PM Rex Fenley wrote: > Hi, > > When using streaming api, if I want a tumbling window on proctime all I > have to do is the following: >

Adding proctime column to table api

2021-02-17 Thread Rex Fenley
Hi, When using streaming api, if I want a tumbling window on proctime all I have to do is the following: table.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))... I don't even need to explicitly create a proctime column. However, adding an intermediate tumbling window on proctime using

Re: Using INTERVAL parameters for UDTF

2021-02-17 Thread Patrick Angeles
NVM. Found the actual source on Calcite trunk. Looks like interval type (and a few others) are not yet supported. https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeUtil.java On Wed, Feb 17, 2021 at 8:11 PM Patrick Angeles wrote: > For some

FlinkKafka Consumer can't dynamic discover the partition update

2021-02-17 Thread 张云云
When the job starts, a WARN log like the one below appears: WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'flink.partition-discovery.interval-millis' was supplied but isn't a known config. And I tried to change the Kafka partition count with a command, partition number from 3 to 4

Re: Native K8S HA Session Cluster Issue 1.12.1

2021-02-17 Thread Yang Wang
I second Till's suggestion. You could also build your own flink-kubernetes jar from the source code of branch 1.12. After that, bundle the flink-kubernetes jar into the image under the /opt/flink/lib directory and push it to the docker repository. Some users ran into the same issue as you and have verified

Re: Flink docker in session cluster mode - is a local distribution needed?

2021-02-17 Thread Yang Wang
I am not aware of a simple way to use the Flink runtime jars within the docker images, other than "docker run/exec". So if we want to provide some easy commands to submit Flink jobs, I think they would also be wrappers around "docker run/exec". Best, Yang Till Rohrmann wrote on Wed, Feb 17, 2021

Re: How to use OSS as high-availability.storageDir for Flink HA on k8s?

2021-02-17 Thread Yang Wang
With the official community image flink:1.12.1, you need to configure the following parameters. The last two parameters enable the OSS plugin via environment variables. high-availability.storageDir: oss://flink/flink-ha fs.oss.endpoint: fs.oss.accessKeyId: fs.oss.accessKeySecret: containerized.master.env.ENABLE_BUILT_IN_PLUGINS: flink-oss-fs-hadoop-1.12.1.jar

Re: Understanding Job Manager Web UI in HA Mode

2021-02-17 Thread Yang Wang
I think you could also configure the same Persistent Volume for all the JobManagers and mount it to /path/of/job-jars in the Pod. After that, set the config option "web.upload.dir: /path/of/job-jars". This will make web submission work for multiple JobManagers. Best, Yang Till Rohrmann

Flink real-time statistics: results fluctuate up and down

2021-02-17 Thread flink2021
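My data source is Kafka. I aggregate order data and write the results to MySQL, and found that while data is backlogged the results fluctuate up and down. Has anyone run into a similar problem? Which settings should I adjust? (The data pipeline is a bit complex; using RocksDB for state reported errors; no expiration time is set.) -- Sent from: http://apache-flink.147419.n8.nabble.com/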

Flink real-time statistics: results fluctuate up and down

2021-02-17 Thread flink2021
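I submit a job from Zeppelin that counts our orders and writes the result to MySQL; the data source is Kafka. When data is backlogged in Kafka and I query the result table, the value is sometimes large and sometimes small across several consecutive queries: sometimes it is 1,000,000, and a second later it may be 500,000. What is the reason for this? PS: the Flink computation logic is a bit complex. -- Sent from: http://apache-flink.147419.n8.nabble.com/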

Re: Using INTERVAL parameters for UDTF

2021-02-17 Thread Patrick Angeles
For some reason I can't view the source so I don't have exact line numbers, but IntelliJ was kind enough to decompile this part (SqlTypeUtil.class) for me. This appears to be the exception I'm hitting. if (!isAtomic(type) && !isNull(type)) { > if (isCollection(type)) { >

Using INTERVAL parameters for UDTF

2021-02-17 Thread Patrick Angeles
Wondering if anyone has seen this before, and has any suggestions. I have a UDTF with the following signature: public void eval(LocalDateTime startTime, LocalDateTime endTime, Duration step) { According to the docs, this should be mapped from the following SQL snippet: ... LATERAL TABLE

Tag flink metrics to job name

2021-02-17 Thread bat man
Hello there, I am using prometheus to push metrics to prometheus and then use grafana for visualization. There are metrics like - flink_taskmanager_Status_JVM_CPU_Load, flink_taskmanager_Status_JVM_CPU_Load, flink_taskmanager_Status_JVM_CPU_Time etc which do not give job_name. It is tied to an

Kafka SQL Connector: dropping events if more partitions than source tasks

2021-02-17 Thread Jan Oelschlegel
Hi, I have a question regarding the FlinkSQL connector for Kafka. I have 3 Kafka partitions and 1 Kafka SQL source connector (Parallelism 1). The data within the Kafka partitions is sorted based on an event-time field, which is also my event-time in Flink. My Watermark is generated with a delay of
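One way such a setup can lose events is sketched below in plain Java. It compares a single watermark derived from the highest timestamp seen on any partition with a watermark taken as the minimum over per-partition maxima. This only illustrates the general mechanism when one task reads several sorted partitions at different paces; it makes no claim about how the Kafka SQL connector generates watermarks in any particular Flink version, and all names and values are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: one global watermark versus a per-partition minimum. */
public class WatermarkSketch {
    private final int partitionCount;
    private final long delayMillis;
    private long globalMax = Long.MIN_VALUE;
    private final Map<Integer, Long> maxPerPartition = new HashMap<>();

    public WatermarkSketch(int partitionCount, long delayMillis) {
        this.partitionCount = partitionCount;
        this.delayMillis = delayMillis;
    }

    /** Records the event time of one record read from the given partition. */
    public void observe(int partition, long eventTimeMillis) {
        globalMax = Math.max(globalMax, eventTimeMillis);
        maxPerPartition.merge(partition, eventTimeMillis, Math::max);
    }

    /** Watermark driven by the fastest partition: highest timestamp seen minus the delay. */
    public long globalWatermark() {
        return globalMax == Long.MIN_VALUE ? Long.MIN_VALUE : globalMax - delayMillis;
    }

    /** Watermark driven by the slowest partition; undefined until every partition was seen. */
    public long perPartitionWatermark() {
        if (maxPerPartition.size() < partitionCount) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long max : maxPerPartition.values()) {
            min = Math.min(min, max);
        }
        return min - delayMillis;
    }
}
```

If partition 1 has been read up to 50,000 ms while partition 0 is still at 10,000 ms, the global variant with a 1,000 ms delay already stands at 49,000 ms, so the next record from partition 0 would count as late, whereas the per-partition variant stays at 9,000 ms.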

Which type serializer is being used?

2021-02-17 Thread Sudharsan R
Hi, I would like to find out what types are being serialized with which serializer. Is there an easy way to get this information? We have the following situation: We have two types T1 and T2. The input to a window process function is an Either. Both T1 and T2 themselves are POJOs. We added a field

Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Salva Alcántara
Good to know Kezhu, many thanks again! -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How to report metric based on keyed state piece

2021-02-17 Thread Salva Alcántara
Awesome Piotr! -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How to report metric based on keyed state piece

2021-02-17 Thread Salva Alcántara
Many thanks Kezhu for pointing me on that direction! -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Piotr Nowojski
> Combination of `MultipleInputStreamOperator`, `BoundedMultiInput`, `InputSelectable`, FLIP-27 source and `ChainingStrategy.HEAD_WITH_SOURCES` > should achieve the goal without interfering with checkpoints, but the control side must not be bounded before FLIP-147 is delivered. H, but I think in

Sharding of Operators

2021-02-17 Thread Tripathi,Vikash
Hi there, I wanted to know how re-partitioning of keys per operator instance would happen when the current operator instances are scaled up or down and we are restarting our job from a previous savepoint which had a different number of parallel instances of the same operator. My main concern

Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Kezhu Wang
Piotr is right. So just ignore my words. It is the price of going deep down the rabbit hole:-). Best, Kezhu Wang On February 17, 2021 at 23:48:30, Piotr Nowojski (pnowoj...@apache.org) wrote: Note^2: InputSelectable is `@PublicEvolving` API, so it can be used. However as Timo pointed out, it

Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Piotr Nowojski
Note^2: InputSelectable is `@PublicEvolving` API, so it can be used. However as Timo pointed out, it would block the checkpointing. If I remember correctly there is a checkState that does not allow using `InputSelectable` with checkpointing enabled. Piotrek Wed, 17 Feb 2021 at 16:46 Kezhu Wang

Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Kezhu Wang
Hi all, Thanks Arvid and Timo for more candidates. I also think “buffering until control side ready” should be more canonical at the current stage of Flink. Timo has created FLINK-21392 for exposing a user-friendly data stream API to block one input temporarily. If one really wants to go deep down the

Re: How to report metric based on keyed state piece

2021-02-17 Thread Piotr Nowojski
Hi Salva, I'm not sure, but I think you can not access the state (especially the keyed state) from within the metric, as metrics are being evaluated outside of the keyed context, and also from another thread. Also things like `ValueState`/`MapState` are not exposing any size. So probably you

Re: How do i register a streaming table sink in 1.12?

2021-02-17 Thread Till Rohrmann
Great to hear that it is now working and thanks for letting the community know :-) On Wed, Feb 17, 2021 at 2:48 PM Clay Teeter wrote: > Yep, that was it! thanks! And to complete the thread, this is the working > revision. > > package com.maalka.flink.sinks > > import

Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Arvid Heise
Note that the question is also posted on SO [1]. [1] https://stackoverflow.com/questions/66236004/connectedstreams-paused-until-control-stream-ready/ On Wed, Feb 17, 2021 at 3:31 PM Timo Walther wrote: > Hi Kezhu, > > `InputSelectable` is currently not exposed in the DataStream API because >

Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Timo Walther
Hi Kezhu, `InputSelectable` is currently not exposed in the DataStream API because it might have side effects that need to be considered (e.g. do checkpoints still go through?). In any case, we don't have a good story for blocking a control stream yet. The best option is to buffer the other
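The buffering approach recommended in this thread can be sketched as follows: data records are held back until the first control message has arrived, then replayed and processed directly from that point on. This is a plain-Java illustration with hypothetical names (`BufferUntilReady`, `onControl`, `onData`); in an actual Flink job the buffer would presumably have to live in checkpointed state inside a two-input function, which this sketch does not attempt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical sketch: hold data records until the control side has delivered a value. */
public class BufferUntilReady<C, D> {
    private final BiConsumer<C, D> downstream;
    private final List<D> pending = new ArrayList<>();
    private C control; // stays null until the first control message arrives

    public BufferUntilReady(BiConsumer<C, D> downstream) {
        this.downstream = downstream;
    }

    /** Stores the latest control value and replays everything buffered so far. */
    public void onControl(C newControl) {
        control = newControl;
        for (D record : pending) {
            downstream.accept(control, record);
        }
        pending.clear();
    }

    /** Buffers the record while no control value is known, otherwise processes it right away. */
    public void onData(D record) {
        if (control == null) {
            pending.add(record);
        } else {
            downstream.accept(control, record);
        }
    }

    public int pendingCount() {
        return pending.size();
    }
}
```

The buffer is unbounded here; a real implementation would need a policy for what happens when the control side stays silent for a long time.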

Re: How do i register a streaming table sink in 1.12?

2021-02-17 Thread Clay Teeter
Yep, that was it! thanks! And to complete the thread, this is the working revision. package com.maalka.flink.sinks import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate} import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.functions.RuntimeContext import

Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Kezhu Wang
A combination of `BoundedMultiInput` and `InputSelectable` could help. You could see `org.apache.flink.table.runtime.operators.join.HashJoinOperator` for a usage example. The control topic have not to be bounded. There are maybe other approaches from later responses. I could not tell whether it

Re: How do i register a streaming table sink in 1.12?

2021-02-17 Thread Till Rohrmann
I am not 100% sure but maybe (_, _) => {} captures a reference to object TestSink which is not serializable. Maybe try to simply define a no op JdbcStatementBuilder and pass such an instance to JdbcSink.sink(). Cheers, Till On Wed, Feb 17, 2021 at 2:06 PM Clay Teeter wrote: > Ok, this is about
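The suspected cause, a lambda that drags a non-serializable enclosing object along, can be reproduced with the JDK alone. In the sketch below, `Builder` is a hypothetical stand-in for a serializable two-argument callback and is not the real `JdbcStatementBuilder` interface; `Holder` plays the role of the non-serializable enclosing object. The lambda reads an instance field and therefore captures `this`, while the separate no-op class captures nothing.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCheck {

    /** Hypothetical stand-in for a serializable two-argument callback. */
    public interface Builder<A, B> extends Serializable {
        void accept(A a, B b);
    }

    /** Deliberately NOT Serializable, like an enclosing object that holds the sink definition. */
    public static class Holder {
        private final String name;

        public Holder(String name) {
            this.name = name;
        }

        /** The lambda uses the instance field, so it captures the enclosing Holder. */
        public Builder<String, String> capturing() {
            return (a, b) -> {
                if (name.isEmpty()) {
                    throw new IllegalStateException("empty name");
                }
            };
        }
    }

    /** A no-op implementation without any captured state. */
    public static class NoOpBuilder implements Builder<String, String> {
        @Override
        public void accept(String a, String b) {
            // intentionally does nothing
        }
    }

    /** Returns true if Java serialization accepts the object, false if it fails. */
    public static boolean isSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

Calling `ClosureCheck.isSerializable` on both variants before wiring them into a job is a cheap way to find out which closure is the culprit.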

Re: How to report metric based on keyed state piece

2021-02-17 Thread Kezhu Wang
With an initial `y`, I think you could compute a new `y` on each new stream value. Upon recovering from a checkpoint, maybe `KeyedStateBackend.applyToAllKeys` could help you to rebuild an initial `y`. Best, Kezhu Wang On February 17, 2021 at 13:09:39, Salva Alcántara (salcantara...@gmail.com) wrote: I

Re: How do i register a streaming table sink in 1.12?

2021-02-17 Thread Clay Teeter
Ok, this is about as simple as I can get. package com.maalka.flink.sinks import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate} import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.functions.RuntimeContext import

Re: Flink docker in session cluster mode - is a local distribution needed?

2021-02-17 Thread Till Rohrmann
Yes, agreed. This could be better streamlined. If you wanna help with this, then feel free to open a JIRA issue for it. Cheers, Till On Wed, Feb 17, 2021 at 11:37 AM Manas Kale wrote: > Hi Till, > Oh I see... I managed to do what you said using a bunch of docker exec > commands. However, I

Re: Flink docker in session cluster mode - is a local distribution needed?

2021-02-17 Thread Manas Kale
Hi Till, Oh I see... I managed to do what you said using a bunch of docker exec commands. However, I think this solution is quite hacky and could be improved by providing some simple command to submit jobs using the Flink runtime within the docker images. I believe this will achieve full

How to use OSS as high-availability.storageDir for Flink HA on k8s?

2021-02-17 Thread casel.chen
As the subject says: in a k8s environment I do not want to use HDFS as high-availability.storageDir. Is there a way to use OSS directly? Checkpoints and savepoints already work with OSS.

Re: Joining and windowing multiple streams using DataStream API or Table API & SQL

2021-02-17 Thread Till Rohrmann
Hi Pieter, off the top of my head, I think the easiest way to solve this problem is to implement your own "window join" operation by first unioning all three streams and then applying a ProcessWindowFunction similar to allEvents.keyBy((KeySelector) value ->

Re: Re: DataStream problem

2021-02-17 Thread Dawid Wysakowicz
I am sure you can achieve that with a ProcessFunction[1] Best, Dawid [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/process_function.html#process-function On 16/02/2021 07:28, ?g???U?[ wrote: > Hi Dawid > > For example, if user 001 takes an

Re: Configure classes

2021-02-17 Thread Till Rohrmann
Hi Abhinav, out of the box Flink does not support what you are asking for. If you want to minimize the amount of Flink code to write, then I would recommend looking at Flink's SQL API [1]. For more advanced injection logic I think you have to write a bit of tooling on your own. [1]

Re: How do i register a streaming table sink in 1.12?

2021-02-17 Thread Till Rohrmann
Hi Clay, could you maybe share the source code of com.maalka.flink.sinks.MaalkaPostgresSink with us? It seems that this sink uses a lambda which is not serializable. Maybe it holds a reference to some non Serializable class as part of its closure. Cheers, Till On Tue, Feb 16, 2021 at 8:58 PM