Re: Flink Job cluster in HA mode - recovery vs upgrade

2020-08-20 Thread Chesnay Schepler
This is incorrect; we do store the JobGraph in ZooKeeper. If you just delete the deployment the cluster will recover the previous JobGraph (assuming you aren't changing the ZooKeeper configuration). If you wish to update the job, then you should cancel it (along with creating a savepoint),

Re: ERROR : RocksDBStateBackend

2020-08-20 Thread Till Rohrmann
Here is a link to information on how to integrate Flink with Hadoop [1]. In the latest version you only need to point Flink to the Hadoop libraries via setting the HADOOP_CLASSPATH environment variable. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html

Re: Decompose failure recovery time

2020-08-20 Thread Zhinan Cheng
Hi Piotr, Thanks a lot for your help. Yes, I finally realize that I can only approximate the time for [1] and [3] and measure [2] by monitoring the uptime and downtime metrics provided by Flink. And now my problem is that I found the time in [2] can be up to 40s, I wonder why it takes so long to

Re: No space left on device exception

2020-08-20 Thread Piotr Nowojski
Hi, As far as I know when uploading a file to S3, the writer needs to first create some temporary files on the local disks. I would suggest double-checking all of the partitions on the local machine and monitoring available disk space continuously while the job is running. If you are just checking

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-20 Thread Kostas Kloudas
Hi all, Thanks for the comments! @Dawid: "execution.mode" can be a nice alternative and from a quick look it is not used currently by any configuration option. I will update the FLIP accordingly. @David: Given that having the option to allow timers to fire at the end of the job is already in

Re: ERROR : RocksDBStateBackend

2020-08-20 Thread Piotr Nowojski
Hi, It looks more like a dependency convergence issue - you have two conflicting versions of `org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest` on the class path. Or you built your jar with one version and trying to execute it with a different one. Till is it some kind of a known

Re: Watermark generation issues with File sources in Flink 1.11.1

2020-08-20 Thread Arti Pande
Hi Till, Thank you for your quick response. Both the AssignerWithPeriodicWatermarks and WatermarkStrategy I am using are very simple ones. *Code for AssignerWithPeriodicWatermarks:* public class CustomEventTimeWatermarkGenerator implements AssignerWithPeriodicWatermarks { private final
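
For reference, a minimal sketch of what such a periodic assigner typically looks like (the `MyEvent` type and its `getTimestamp()` accessor are assumptions, and the 5 s out-of-orderness bound is illustrative; note `AssignerWithPeriodicWatermarks` is deprecated in 1.11 in favor of `WatermarkStrategy`):

```java
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Sketch: bounded-out-of-orderness watermarks for a hypothetical MyEvent type.
public class CustomEventTimeWatermarkGenerator
        implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderMillis = 5_000L; // assumed bound
    private long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderMillis;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long ts = element.getTimestamp();
        currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
        return ts;
    }

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        // The watermark trails the largest timestamp seen so far.
        return new Watermark(currentMaxTimestamp - maxOutOfOrderMillis);
    }
}
```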

Re: ERROR : RocksDBStateBackend

2020-08-20 Thread Till Rohrmann
I agree with Piotr's analysis. It should not matter whether you are using RocksDBStateBackend or not. It seems as if you have a Hadoop dependency clash. Could you check which dependencies are on the class path? Cheers, Till On Thu, Aug 20, 2020 at 3:52 PM Piotr Nowojski wrote: > Hi, > > It

Re: Decompose failure recovery time

2020-08-20 Thread Piotr Nowojski
Hi Zhinan, It's hard to say, but my guess is that it takes that long for the tasks to respond to cancellation, which consists of a couple of steps. If a task is currently busy processing something, it has to respond to interruption (`java.lang.Thread#interrupt`). If it takes 30 seconds for a task to react

Re: Flink checkpoint recovery time

2020-08-20 Thread Zhinan Cheng
Hi Till, Thanks for the quick reply. Yes, the job actually restarts twice, the metric fullRestarts also indicates this, its value is 2. But the job indeed takes around 30s to switch from CANCELLING to RESTARTING in its first restart. I just wonder why it takes so long here? Also, even if I set the

How Flink distinguishes between late and in-time events?

2020-08-20 Thread Ori Popowski
In the documentation it states that: *[…], Flink keeps the state of windows until their allowed lateness expires. Once this happens, Flink removes the window and deletes its state, as also

Re: Flink checkpoint recovery time

2020-08-20 Thread Till Rohrmann
Hi Zhinan, the logs show that the cancellation does not take 30s. What happens is that the job gets restarted a couple of times. The problem seems to be that one TaskManager died permanently but it takes the heartbeat timeout (default 50s) until it is detected as dead. In the meantime the system

Re: ERROR : RocksDBStateBackend

2020-08-20 Thread Vijayendra Yadav
Hi Till/ Piotr, *My process was working with FsStateBackend but when I switched to RocksDBStateBackend I faced this problem. My class path is below.* *Related jar in classpath:* /usr/lib/hadoop-yarn/hadoop-yarn-api-2.8.5-amzn-6.jar:/usr/lib/hadoop-yarn/hadoop-yarn-api.jar: *Classpath:*

Re: No space left on device exception

2020-08-20 Thread Vishwas Siravara
Hi Piotr, Thank you for your suggestion. I will try that, are the temporary files created in the directory set in *io.tmp.dirs* in the flink-conf.yaml ? Would these files be the same size as checkpoints ? Thanks, Vishwas On Thu, Aug 20, 2020 at 8:35 AM Piotr Nowojski wrote: > Hi, > > As far

Re: How Flink distinguishes between late and in-time events?

2020-08-20 Thread Piotr Nowojski
Hi Ori, No. Flink does it differently. Operators that keep track of late events remember the latest watermark. If a new element arrives with an event time lower than the latest watermark, it is marked as a late event [1] Piotrek [1]
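
In pseudo-Java, the check Piotr describes amounts to something like the sketch below (names are illustrative, not the exact Flink internals; in the actual WindowOperator the comparison is done on the window's max timestamp and takes the configured allowed lateness into account):

```java
// Illustrative only. An element counts as late once the operator's current
// watermark has passed its event timestamp plus any allowed lateness.
boolean isLate(long elementTimestamp, long currentWatermark, long allowedLateness) {
    return elementTimestamp + allowedLateness <= currentWatermark;
}
```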

Re: Flink Job cluster in HA mode - recovery vs upgrade

2020-08-20 Thread Piotr Nowojski
Hi Alexey, I might be wrong (I don't know this side of Flink very well), but as far as I know JobGraph is never stored in the ZK. It's always recreated from the job's JAR. So you should be able to upgrade the job by replacing the JAR with a newer version, as long as the operator UIDs are the same

Re: Stream job with Event Timestamp (Backfill getting incorrect results)

2020-08-20 Thread Piotr Nowojski
Hi, It's hard for me to help you debug your code, but as long as: - you are using event time for processing records (in operators like `WindowOperator`) - you do not have late records - you are replaying the same records - your code is deterministic - you do not rely on the order of the records

Re: Flink Job cluster in HA mode - recovery vs upgrade

2020-08-20 Thread Piotr Nowojski
Thank you for the clarification Chesnay and sorry for the incorrect previous answer. Piotrek Thu, 20 Aug 2020 at 15:59, Chesnay Schepler wrote: > This is incorrect; we do store the JobGraph in ZooKeeper. If you just > delete the deployment the cluster will recover the previous JobGraph >

Re: Decompose failure recovery time

2020-08-20 Thread Zhinan Cheng
Hi Piotr, Thanks a lot. I will try your suggestion to see what happen. Regards, Zhinan On Fri, 21 Aug 2020 at 00:40, Piotr Nowojski wrote: > > Hi Zhinan, > > It's hard to say, but my guess it takes that long for the tasks to respond to > cancellation which consists of a couple of steps. If a

Re: No space left on device exception

2020-08-20 Thread Chesnay Schepler
That should work as well. On 20/08/2020 22:46, Vishwas Siravara wrote: Thank you Chesnay. Yes but I could change the staging directory by adding -Djava.io.tmpdir=/data/flink-1.7.2/tmp to *env.java.opts* in the flink-conf.yaml file. Do you see any problem with that? Best, Vishwas On Thu,

Re: No space left on device exception

2020-08-20 Thread Chesnay Schepler
Could you try adding this to your flink-conf.yaml? s3.staging-directory: /usr/mware/flink/tmp On 20/08/2020 20:50, Vishwas Siravara wrote: Hi Piotr, I did some analysis and realised that the temp files for s3 checkpoints are staged in /tmp although the *io.tmp.dirs* is set to a different

Re: No space left on device exception

2020-08-20 Thread Vishwas Siravara
Thank you Chesnay. Yes but I could change the staging directory by adding -Djava.io.tmpdir=/data/flink-1.7.2/tmp to *env.java.opts* in the flink-conf.yaml file. Do you see any problem with that? Best, Vishwas On Thu, Aug 20, 2020 at 2:01 PM Chesnay Schepler wrote: > Could you try adding this

Re: No space left on device exception

2020-08-20 Thread Vishwas Siravara
Hi Piotr, I did some analysis and realised that the temp files for s3 checkpoints are staged in /tmp although the *io.tmp.dirs* is set to a different directory. ls -lrth drwxr-xr-x. 2 was was 32 Aug 20 17:52 hsperfdata_was -rw---. 1 was was 505M Aug 20 18:45

Debezium Flink EMR

2020-08-20 Thread Rex Fenley
Hi, I'm trying to set up Flink with Debezium CDC Connector on AWS EMR, however, EMR only supports Flink 1.10.0, whereas Debezium Connector arrived in Flink 1.11.0, from looking at the documentation. https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html

Re: Table API Kafka Connector Sink Key Definition

2020-08-20 Thread Yuval Itzchakov
Hi Till, KafkaSerializationSchema is only pluggable for the DataStream API, not for the Table API. KafkaTableSink hard codes a KeyedSerializationSchema that uses a null key, and this behavior can't be overridden. I have to say I was quite surprised by this behavior, as publishing events to Kafka

Same kafka partition being consumed by multiple task managers.

2020-08-20 Thread Deshpande, Omkar
Hello, I am running a streaming Beam app with the Flink runner(java). * Beam 2.19 * Flink 1.9 Checkpoints and savepoints are configured to go to s3 and HA is enabled using Zookeeper. I was running the app with 3 task managers. I took a savepoint and started the app with 6 task

Re: How to access state in TimestampAssigner in Flink 1.11?

2020-08-20 Thread Till Rohrmann
Hi Theo, thanks for reaching out to the community. I am pulling in Aljoscha and Klou who have worked on the new WatermarkStrategy/WatermarkGenerator abstraction and might be able to help you with your problem. At the moment, it looks to me that there is no way to combine state with the new

Re: Table API Kafka Connector Sink Key Definition

2020-08-20 Thread Till Rohrmann
Hi Yuval, it looks as if the KafkaTableSink only supports writing out rows without a key. Pulling in Timo for verification. If you want to use a Kafka producer which writes the records out with a key, then please take a look at KafkaSerializationSchema. It supports this functionality. Cheers,

Re: Table API Kafka Connector Sink Key Definition

2020-08-20 Thread Dawid Wysakowicz
Hi Yuval, Unfortunately setting the key or timestamp (or other metadata) from the SQL API is not supported yet. There is an ongoing discussion to support it[1]. Right now your option would be to change the code of KafkaTableSink and write your own version of KafkaSerializationSchema as Till
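
For the DataStream route Till and Dawid mention, a minimal sketch of a `KafkaSerializationSchema` that sets a record key could look like this (the `Event` type and its `getId()`/`toJson()` accessors are assumptions):

```java
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: serialize a hypothetical Event, keyed by its id field.
public class KeyedEventSerializationSchema implements KafkaSerializationSchema<Event> {

    private final String topic;

    public KeyedEventSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Event element, @Nullable Long timestamp) {
        byte[] key = element.getId().getBytes(StandardCharsets.UTF_8);     // Kafka record key
        byte[] value = element.toJson().getBytes(StandardCharsets.UTF_8);  // record payload
        return new ProducerRecord<>(topic, key, value);
    }
}
```

Such a schema is then handed to the universal producer, e.g. `new FlinkKafkaProducer<>("topic", new KeyedEventSerializationSchema("topic"), props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)`.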

Re: Table API Kafka Connector Sink Key Definition

2020-08-20 Thread Till Rohrmann
This is indeed not optimal. Could you file a JIRA issue to add this functionality? Thanks a lot Yuval. Cheers, Till On Thu, Aug 20, 2020 at 9:47 AM Yuval Itzchakov wrote: > Hi Till, > KafkaSerializationSchema is only pluggable for the DataStream API, not for > the Table API. KafkaTableSink

The hive module in Flink 1.10 is missing functions such as plus and greaterThan

2020-08-20 Thread faaron zheng
Hi all, when using the Flink 1.10 sql-client I found that with the hive module loaded, some built-in functions from the core module, such as plus and greaterThan, are missing. As a result, the same SQL that succeeds under the core module fails under the hive module, for example when using row_number() over(). What is the reason for this?

Monitor the usage of keyed state

2020-08-20 Thread Mu Kong
Hi community, I have a Flink job running with RichMapFunction that uses keyed state. Although the TTL is enabled, I wonder if there is a way that I can monitor the memory usage of the keyed state. I'm using RocksDB as the state backend. Best regards, Mu

Flink checkpointing with Azure block storage

2020-08-20 Thread Boris Lublinsky
Is there a complete configuration example for this option somewhere?

Re: Flink checkpointing with Azure block storage

2020-08-20 Thread Boris Lublinsky
To test it, I created a flink-conf.yaml file and put it in the resources directory of my project. The file contains the following: #== # Fault tolerance and checkpointing

JSON to Parquet

2020-08-20 Thread Averell
Hello, I have a stream with each message is a JSON string with a quite complex schema (multiple fields, multiple nested layers), and I need to write that into parquet files after some slight modifications/enrichment. I wonder what options are available for me to do that. I'm thinking of JSON ->
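
One possible route, sketched under the assumption that the JSON is first parsed into a POJO (here a hypothetical `EnrichedEvent`, e.g. via Jackson in a MapFunction) and then written with the bulk Parquet writer from flink-parquet:

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Sketch: write a stream of (hypothetical) EnrichedEvent POJOs as Parquet,
// deriving the Avro schema by reflection. Bulk formats roll on every checkpoint.
public class ParquetSinkExample {
    public static void attachSink(DataStream<EnrichedEvent> events, String outputPath) {
        StreamingFileSink<EnrichedEvent> sink = StreamingFileSink
                .forBulkFormat(
                        new Path(outputPath),
                        ParquetAvroWriters.forReflectRecord(EnrichedEvent.class))
                .build();
        events.addSink(sink);
    }
}
```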

Re: Flink checkpointing with Azure block storage

2020-08-20 Thread Yun Tang
Hi Boris I think the official guide [1] should be enough to tell you how to configure it. However, I think your changes to flink-conf.yaml might not take effect as you have configured the state backend as 'filesystem' while logs still tell us that "No state backend has been configured, using

Stream job with Event Timestamp (Backfill getting incorrect results)

2020-08-20 Thread aj
I have a streaming job where I am doing a window operation on *"user_id"* and then doing some summarization based on time-based logic like: 1. end the session based on 30 minutes of user inactivity. 2. The End_trip event or cancellation event has arrived for the user. I am trying to rerun

Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-20 Thread Stephan Ewen
We have removed some public methods in the past, after a careful deprecation period, if they were not well working any more. The sentiment I got from users is that careful cleanup is in fact appreciated, otherwise things get confusing over time (the deprecated methods cause noise in the API).

Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-20 Thread Dawid Wysakowicz
Hey Till, You've got a good point here. Removing some of the methods would cause breaking the stability guarantees. I do understand if we decide not to remove them for that reason, let me explain though why I am thinking it might make sense to remove them already. First of all I am a bit afraid

Re: Watermark generation issues with File sources in Flink 1.11.1

2020-08-20 Thread Till Rohrmann
Hi Arti, thanks for sharing this feedback with us. The WatermarkStrategy has been introduced quite recently and might have some rough edges. I am pulling in Aljoscha and Klou who have worked on this feature and might be able to help you. For better understanding your problem, it would be great if

Re: Decompose failure recovery time

2020-08-20 Thread Piotr Nowojski
Hi, > I want to decompose the recovery time into different parts, say > (1) the time to detect the failure, > (2) the time to restart the job, > (3) and the time to restore the checkpointing. 1. Maybe I'm missing something, but as far as I can tell, Flink can not help you with that. Time to

Re: flink orc与hive2.1.1版本冲突怎么解决

2020-08-20 Thread abc15606
Flink pins its own ORC version and does not use Hive's, so after the data is written, Hive cannot read it. Sent from my iPhone > On Aug 21, 2020 at 12:15, Jingsong Li wrote: > > For writes to a Hive table, Flink SQL uses the ORC version matching your Hive version, so in theory the result is the same as writing ORC with Hive SQL. > Are you sure data written by this version of Hive can be read back? >> On Fri, Aug 21, 2020 at 10:17 AM wrote: >> >> The versions are Flink 1.11 >> Hive 2.1.1 >> flink

Re: How to resolve the ORC version conflict between Flink and Hive 2.1.1

2020-08-20 Thread Jingsong Li
The Flink filesystem connector, and flink-orc used from the DataStream API, bundle a fairly new ORC version, so older ORC readers cannot read the output. I suggest writing the ORC files through a Flink Hive table instead. On Fri, Aug 21, 2020 at 12:25 PM wrote: > Flink pins its own ORC version and does not use Hive's, so after the data is written, Hive cannot read it. > > Sent from my iPhone > > > On Aug 21, 2020 at 12:15, Jingsong Li wrote: > > > > For writes to a Hive table, flink

Re: How to resolve the ORC version conflict between Flink and Hive 2.1.1

2020-08-20 Thread Jingsong Li
Yes. On Fri, Aug 21, 2020 at 1:30 PM wrote: > What do you mean by the Flink Hive table approach? Hive streaming? > > Sent from my iPhone > > > On Aug 21, 2020 at 13:27, Jingsong Li wrote: > > > > The Flink filesystem connector, and flink-orc used from the DataStream API, > bundle a fairly new ORC version, so older ORC readers cannot read the output. > > > > I suggest writing the ORC files through a Flink Hive table > > > >> On Fri, Aug 21, 2020 at 12:25

Re: How to resolve the ORC version conflict between Flink and Hive 2.1.1

2020-08-20 Thread Jingsong Li
For writes to a Hive table, Flink SQL uses the ORC version matching your Hive version, so in theory the result is the same as writing ORC with Hive SQL. Are you sure data written by this version of Hive can be read back? On Fri, Aug 21, 2020 at 10:17 AM wrote: > The versions are Flink 1.11 > Hive 2.1.1 > After writing ORC with Flink SQL and creating an external table, the data cannot be read properly. How can this be resolved? > > > -- Best, Jingsong Lee

Re: Is it normal that the JDBC connector only writes about 200 KB per second to MySQL?

2020-08-20 Thread Benchao Li
That many rows per second is not exactly a low rate. If you want to go higher still, you can raise the sink.buffer-flush.max-rows option, whose default is 100. LittleFall <1578166...@qq.com> wrote on Thu, Aug 20, 2020 at 7:56 PM: > This is my code; it simply writes data from a datagen source to a jdbc sink. > > ```java > package main; > > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import

Re: Is it normal that the JDBC connector only writes about 200 KB per second to MySQL?

2020-08-20 Thread LittleFall
Thanks for your reply, it really helped. I found another problem: rewriteBatchedStatements=true must be in camelCase; I had used all lowercase before, so batched writes never took effect. -- Sent from: http://apache-flink.147419.n8.nabble.com/
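
Putting Benchao's tuning hint and the camelCase fix together, a sketch of what the sink DDL might look like (URL, table name, and credentials are placeholders; the 5000-row batch is illustrative):

```java
// tEnv is an existing TableEnvironment; all connection values below are placeholders.
tEnv.executeSql(
    "CREATE TABLE mysql_sink (" +
    "  id BIGINT," +
    "  msg STRING" +
    ") WITH (" +
    "  'connector' = 'jdbc'," +
    // camelCase matters: an all-lowercase rewritebatchedstatements is silently ignored
    "  'url' = 'jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true'," +
    "  'table-name' = 'sink_table'," +
    "  'username' = 'root'," +
    "  'password' = '***'," +
    // default is 100 rows; raise it for higher throughput
    "  'sink.buffer-flush.max-rows' = '5000'," +
    "  'sink.buffer-flush.interval' = '1s'" +
    ")");
```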

Re: Incremental checkpoints

2020-08-20 Thread Yun Tang
Hi, incremental checkpoints actually have no direct connection to the information on the web UI. Incremental checkpoint bookkeeping is reference-counted by the SharedStateRegistry [1] in the CheckpointCoordinator, while how many checkpoints to retain is managed by the CheckpointStore [2]. With 2 retained checkpoints, the sequence is: chk-1 completed --> register chk-1 in state registry --> add to checkpoint store chk-2 completed --> register chk-2 in state

Re: Question about state serialization

2020-08-20 Thread Yun Tang
Hi, if you want to store Strings in the list, i.e. each element of the list is a String, the state should be declared as ListState<String>, not ListState<List<String>>; the latter means a list whose every element is itself a list. ListState itself is not a Java collection, so there is no ArrayList vs. LinkedList distinction. Best, Yun Tang From: shizk233 Sent: Thursday, August 20, 2020 18:00 To:
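
A minimal sketch of the element-wise declaration Yun Tang describes (inside the open() of a rich function on a keyed stream):

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;

// The state is declared per element; Flink owns the list structure itself,
// so no ArrayList-vs-LinkedList choice ever enters the picture.
ListStateDescriptor<String> descriptor = new ListStateDescriptor<>("my-strings", Types.STRING);
ListState<String> myStrings = getRuntimeContext().getListState(descriptor);
myStrings.add("hello"); // appends a single String element
```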

Re: Is there a way to run Flink jobs directly on a YARN cluster (deploying only YARN, not Flink)?

2020-08-20 Thread 徐骁
This command uploads flink-dist to HDFS. 赵一旦 wrote on Wed, Aug 19, 2020 at 10:10 PM: > Not quite following; I've never deployed the YARN side myself. Once the YARN cluster is set up (say 5 machines), do those 5 machines really need no Flink dist package installed at all? > > Say I submit Flink jobs to the YARN cluster from an extra machine 6. My jar only contains the user code, so where does the Flink dist package used by the jobmanager/taskmanager processes running in the YARN containers come from? > > > 徐骁 wrote on Wed, Aug 19, 2020 at 7:58 PM: > > flink yarn has a

Re: Can the state data in a KeyedCoProcessFunction be subject to resource contention?

2020-08-20 Thread Benchao Li
Hi, neither question 1 nor question 2 involves any multithreading issue. The Flink runtime guarantees that these methods are executed serially on the same thread. shizk233 wrote on Thu, Aug 20, 2020 at 2:22 PM: > Hi all, > > A question: KeyedCoProcessFunction is special in having two inputs, with two corresponding processElement methods. > Question 1: > If both process methods modify the same MapState, can they contend for it? > Or are the two methods executed sequentially? > > Question 2: > Although there are different keys, the function has only one instance, so the MapState should also be a single instance; then for different keys >

Re: Re: Re: Re: Problem writing ORC files with hive streaming

2020-08-20 Thread Jingsong Li
This is a bug; it has been fixed and awaits release. On Fri, Aug 14, 2020 at 6:05 PM flink小猪 <18579099...@163.com> wrote: > Based on my tests in the IDE, when writing to a parquet table, even without the code you sent, the program keeps running and prints parquet-related logs every checkpoint interval; > but when writing to an orc table there is no log output at all, although the program keeps running. Also, I submitted the same job via the sql client: > the parquet table still works without any problem, while the orc table job restarts endlessly and reports errors. > >

Re: [bug] In Flink 1.11 the schema loses the physical information specified in DataType when using connectors

2020-08-20 Thread 赵 建云
Yes, the new DynamicTable is under development. Under the Table API, physical types cannot be bound, which caused some type-compatibility problems; there is now a solution. 赵建云, Aug 20, 2020. On Aug 20, 2020 at 2:27 PM, Jingsong Li <jingsongl...@gmail.com> wrote: For 1.11, just use the new source/sink interfaces. On Wed, Aug 19, 2020 at 12:43 AM 赵建云 <zhaojianyu...@outlook.com> wrote: Adding the image links: creating the connector

Re: A question about ORC files

2020-08-20 Thread abc15606
Found the cause: under hive on spark, Hive cannot read the ORC files. Sent from my iPhone > On Aug 20, 2020 at 14:50, Jingsong Li wrote: > > Could you paste the exception stack? > It is probably an ORC version issue; if you use the file system ORC writer, that is a fairly new version. > I suggest writing through a Hive table, so you can choose the version. > > Best, > Jingsong > >> On Thu, Aug 20, 2020 at 12:10 PM wrote: >> >> Using flink

Re: A question about ORC files

2020-08-20 Thread Jingsong Li
Could you paste the exception stack? It is probably an ORC version issue; if you use the file system ORC writer, that is a fairly new version. I suggest writing through a Hive table, so you can choose the version. Best, Jingsong On Thu, Aug 20, 2020 at 12:10 PM wrote: > After writing ORC files with Flink SQL, Flink can read them back, but neither Spark nor Hive can read them; Impala can. > > Sent from my iPhone -- Best, Jingsong Lee

Re: Is there a way to run Flink jobs directly on a YARN cluster (deploying only YARN, not Flink)?

2020-08-20 Thread caozhen
赵一旦 wrote > Not quite following; I've never deployed the YARN side myself. Once the YARN cluster is set up (say 5 machines), do those 5 machines really need no Flink dist package installed at all? > Say I submit Flink jobs to the YARN cluster from an extra machine 6. My jar only contains the user code, so where does the Flink dist package used by the jobmanager/taskmanager processes running in the YARN containers come from? > > > 徐骁 > ffxrqyzby@ > wrote on Wed, Aug 19, 2020 at 7:58 PM: > >> flink yarn has a job deployment mode, >> >>

Flink 1.10.1 on Yarn

2020-08-20 Thread xuhaiLong
Hi, a DataStream is converted to a Table and written to MySQL using `JDBCOutputFormat.buildJDBCOutputFormat()`; exception [1] occurs and the job fails over, starting at 2:58 and recurring hourly. This leads to exception [2]; metaspace is 256M, and my guess is that it is caused by overly frequent restarts with the same classloader. Hoping for answers: for exception [1], what causes it, and is there a suitable solution? Does Flink 1.10 have another connector for writing to MySQL? For exception [2], is my guessed cause correct? flink

Re: A question about Hive

2020-08-20 Thread Harold.Miao
Hi, the hive catalog only stores metadata. The metadata can be fetched as a Hive Table via the hive client and then read through table.getParameters(). As for the actual data, that depends on the storage system your metadata points to; you have to query the corresponding storage. Bruce wrote on Thu, Aug 20, 2020 at 7:52 PM: > hi, all. > > If Hive serves only for metadata management and the actual data is not stored on HDFS, can the data itself be read through the hivecatalog? > > > > > For example, if Hive stores the table metadata of MySQL and Oracle tables, can the hivecatalog read the actual table data? > > >

Re: Re: Is it normal that the JDBC connector only writes about 200 KB per second to MySQL?

2020-08-20 Thread 刘大龙
+1, yes. Being able to write that many rows per second into a single MySQL instance is already a lot. I once had a business requirement where I wrote directly with JDBC rather than the Flink jdbc connector, batching 5000 rows per batch, and the measured throughput was about the same; the bottleneck should be MySQL, not the connector. > -----Original message----- > From: "Benchao Li" > Sent: 2020-08-20 22:11:53 (Thursday) > To: user-zh > Cc: > Subject: Re: Is it normal that the JDBC connector only writes about 200 KB per second to MySQL? > >

Case-insensitive key support for Flink SQL Map-typed fields

2020-08-20 Thread zilong xiao
As the subject says: in our business we have run into Map-typed fields whose keys carry the same meaning but differ in case. For example, a Map field my_map might contain the keys aB, ab, and Ab. When reading values in SQL, is it possible to make the lookup case-insensitive, so that my_map['ab'] fetches the values for all matching keys?

Flink 1.11: TableEnvironment.executeSql("insert into ...") job question (title mangled in the archive)

2020-08-20 Thread Asahi Lee
(The question text is mangled in the archive; it concerns the job produced by an insert into statement.) EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build(); TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings); String sourceDDL = "CREATE TABLE datagen ( " +
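
For reference, in 1.11 `executeSql("insert into ...")` submits the job asynchronously and returns a `TableResult`; a minimal sketch of inspecting the submitted job (the table names are placeholders):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

// Sketch, assuming Flink 1.11: executeSql() returns as soon as the INSERT
// job is submitted; the TableResult exposes a JobClient for monitoring it.
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings);
TableResult result = bsTableEnv.executeSql("INSERT INTO sink_table SELECT * FROM datagen");
result.getJobClient().ifPresent(client -> System.out.println("Submitted job: " + client.getJobID()));
```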

A question about TaskManagers sharing a common JDBC connection pool

2020-08-20 Thread Bruce
A question: Flink on YARN allocated 2 taskManagers according to the parallelism, and we have a JDBC connection pool in a shared base project (max connections = 50); one operator's computation logic calls the base project's JDBC pool for business processing. So do the 2 taskManagers share those 50 max connections, or does each taskmanager get at most 50, i.e. 50 * 2 = 100 connections? Sent from my iPhone

Re: Can the actual table data be read if Hive serves only for metadata management?

2020-08-20 Thread Rui Li
The hive catalog is only responsible for managing metadata; the actual data reading is not done by the hive catalog. So which tables can be read depends on whether Flink has a corresponding connector. According to the docs, the jdbc connector still does not support Oracle. On Fri, Aug 21, 2020 at 11:16 AM Bruce wrote: > A question: > > > > > If our Flink platform brings in Hive only for metadata management and the actual data is not stored on HDFS, can the data itself be read through the hivecatalog? > >

Re: The hive module in Flink 1.10 is missing functions such as plus and greaterThan

2020-08-20 Thread faaron zheng
Thanks, turns out I was just using it the wrong way. On Aug 21, 2020 at 11:17, Rui Li wrote: Are you using only the hive module? The recommended approach is to load both the hive module and the core module; function resolution then searches each module in loading order. On Fri, Aug 21, 2020 at 11:06 AM faaron zheng wrote: > Hi all, when using the Flink 1.10 sql-client I found that with the hive module, some of the core module's > built-in

Re: Flink startup question

2020-08-20 Thread zilong xiao
-yt can probably only take a single directory; what pain point are you hitting? guaishushu1...@163.com wrote on Thu, Aug 20, 2020 at 8:40 PM: > Does anyone know whether Flink's -yt option really doesn't support multiple directories, and whether it can only upload .jar files to the cluster??? > > > > guaishushu1...@163.com >

Re: Flink 1.11 startup problem

2020-08-20 Thread Yang Wang
That error looks like a YARN NodeManager error. Does it happen on every startup, or was it a one-off? If it was a one-off, the error is probably because the YARN NM happened to restart exactly while Flink was calling stopContainer. Best, Yang 酷酷的浑蛋 wrote on Thu, Aug 20, 2020 at 10:59 AM: > > > After startup, Flink 1.11 reports this error and then the job dies by itself; there are no other errors, and no errors from my code >

Re: Question about state serialization

2020-08-20 Thread shizk233
Sorry, I didn't express myself clearly. ListState<List<String>> was just an example, not the actual state in my use case. In practice, I want to use a MapState to keep a series of special counter maps, i.e. MapState<Long, Map<String, Long>>, mainly to implement a pseudo-window whose key is the window start time. What I mainly want to know is: when declaring the type information in the MapStateDescriptor, should I declare the inner map as the concrete HashMap type rather than the Map interface type? Yun Tang wrote on Fri, Aug 21, 2020 at 12:13 AM: > Hi > > If you want to store Strings in the list, i.e. each element of the list is a String, the state should be declared as >
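
A sketch of one way to declare such a descriptor with explicit type information, using the Map interface type rather than a concrete HashMap (assuming the pseudo-window key is the window start timestamp):

```java
import java.util.Map;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;

// Types.MAP produces type info for the java.util.Map interface; its
// serializer handles any Map implementation and restores into a HashMap,
// so declaring the concrete HashMap type should not be necessary.
MapStateDescriptor<Long, Map<String, Long>> pseudoWindows =
        new MapStateDescriptor<>(
                "pseudo-windows",
                Types.LONG,                           // window start timestamp
                Types.MAP(Types.STRING, Types.LONG)); // counters per window
```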

Can the actual table data be read if Hive serves only for metadata management?

2020-08-20 Thread Bruce
A question: our Flink platform brings in Hive only for metadata management, and the actual data is not stored on HDFS; can the data itself be read through the hivecatalog? For example, if Hive stores the table metadata of Oracle's t_log table, can Flink read the actual t_log table data via the hivecatalog? Sent from my iPhone

Re: The hive module in Flink 1.10 is missing functions such as plus and greaterThan

2020-08-20 Thread Rui Li
Are you using only the hive module? The recommended approach is to load both the hive module and the core module; function resolution then searches each module in loading order. On Fri, Aug 21, 2020 at 11:06 AM faaron zheng wrote: > Hi all, when using the Flink 1.10 sql-client I found that with the hive module, some of the core module's > built-in functions such as plus and greaterThan are missing. As a result the same SQL succeeds under the core module but fails under the hive >

Re: TumblingProcessingTimeWindows triggers the aggregate function on every incoming message

2020-08-20 Thread yobdcdoll
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).*apply(...)*.addSink(new TemplateMySQLSink()); On Wed, Aug 19, 2020 at 6:27 PM wangl...@geekplus.com wrote: > > Receiving Kafka messages, doing keyBy on the field in the message that represents the tableName, then writing to the database > >

Re: Is it normal that the JDBC connector only writes about 200 KB per second to MySQL?

2020-08-20 Thread 引领
Hello, has your write throughput improved by now? I've run into this too and the write speed feels rather low. | | 引领 | | yrx73...@163.com | Signature customized by NetEase Mail Master On Aug 20, 2020 at 22:52, LittleFall <1578166...@qq.com> wrote: Thanks for your reply, it really helped. I found another problem: rewriteBatchedStatements=true must be in camelCase; I had used all lowercase before, so batched writes never took effect. -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Is it normal that the JDBC connector only writes about 200 KB per second to MySQL?

2020-08-20 Thread 赵一旦
Is the jdbc connector from 1.11? I previously had to roll my own wrapper, which got quite complex: batching + timeout + retry + metrics and so on. 引领 wrote on Fri, Aug 21, 2020 at 9:41 AM: > Hello, has your write throughput improved by now? I've run into this too and the write speed feels rather low. > > > | | > 引领 > | > | > yrx73...@163.com > | > Signature customized by NetEase Mail Master > > > On Aug 20, 2020 at 22:52, LittleFall <1578166...@qq.com> wrote: > Thanks for your reply, it really helped. > > I found another problem: > >

How to resolve the ORC version conflict between Flink and Hive 2.1.1

2020-08-20 Thread abc15606
The versions in use are Flink 1.11 and Hive 2.1.1. After writing to ORC with Flink SQL and creating an external table, the data cannot be read properly. How can this be resolved?

Re: Can the state data in a KeyedCoProcessFunction be subject to resource contention?

2020-08-20 Thread shizk233
Thanks for the answer. One more question: with a parallelism of 1, there should be only one MapState instance in an operator, so for the different keys in the data stream (not the hash keys inside the map), how is it guaranteed that each key only gets its own portion of the map state data? My understanding is that after keying the stream, each sub-stream should have only its own state, but viewed from the operator instance, there seems to be only 1 state instance. Benchao Li wrote on Thu, Aug 20, 2020 at 4:40 PM: > Hi, > > Neither question 1 nor question 2 involves any multithreading issue. The Flink runtime guarantees that these methods are executed serially on the same thread. > > shizk233 wrote on Thu, Aug 20, 2020 at 2:22 PM:

Question about state serialization

2020-08-20 Thread shizk233
Hi all, a question: state should be serialized/deserialized via the type information extracted through the StateDescriptor. If it is declared with an interface type, e.g. ListState<List<String>>, while what is actually stored is an ArrayList/LinkedList, does that adversely affect the type information extraction? As I understand it, ArrayList and LinkedList should have somewhat different byte layouts when serialized, yet both can be declared as List<String>. Any help from the experts out there appreciated!

Re: Can the state data in a KeyedCoProcessFunction be subject to resource contention?

2020-08-20 Thread Benchao Li
Different keys use the same state instance, but underneath, the state handles the per-key state separately; that is, the key is transparent to the user. For example, if you use a MapState, it is roughly as if each key had its own Map instance; different keys are independent of each other. shizk233 wrote on Thu, Aug 20, 2020 at 5:03 PM: > Thanks for the answer. > One more question: with a parallelism of 1, there should be only one MapState instance in an operator, > so for the different keys in the data stream (not the hash keys inside the map), how is it guaranteed that each key only gets its own portion of the map state data? > >
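
A minimal sketch illustrating the scoping Benchao describes: a single MapState field, yet every access inside processElement is automatically scoped to the key of the record currently being processed:

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// One MapState field, but the runtime scopes every access to the current
// record's key, so each stream key effectively sees its own private map.
public class CountPerKey extends KeyedProcessFunction<String, String, Long> {

    private transient MapState<String, Long> counters;

    @Override
    public void open(Configuration parameters) {
        counters = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("counters", Types.STRING, Types.LONG));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
        // Reads/writes below only touch the map scoped to ctx.getCurrentKey().
        Long current = counters.get(value);
        counters.put(value, current == null ? 1L : current + 1L);
        out.collect(counters.get(value));
    }
}
```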

Re: Can the state data in a KeyedCoProcessFunction be subject to resource contention?

2020-08-20 Thread shizk233
理解了!感谢! Benchao Li 于2020年8月20日周四 下午6:00写道: > 不同的key应该用的是同一个state的实例,但是state下面会处理不同的key对应的state,也就是key对于用户来说是透明的。 > 比如你用一个MapState,那就是约等于每个key都有一个Map实例,不同key之间是独立的。 > > shizk233 于2020年8月20日周四 下午5:03写道: > > > 谢谢大佬解答。 > > 想再问一下,在并发1的情况下,一个算子里的MapState应该只有一个实例, > > 那么数据流上的不同key(不是map里的hash

Re: Can the state data in a KeyedCoProcessFunction be subject to resource contention?

2020-08-20 Thread Cayden chen
Hi, you can take a look at this, it explains things in quite some detail: https://blog.csdn.net/yuchuanchen/article/details/105677408 ---- Original message ---- From: "user-zh"

Declaring a primary key in DDL reports a type mismatch

2020-08-20 Thread xiao cai
Hi: Flink version 1.11.0, connector kafka. When a field is declared as the primary key in the DDL, a type mismatch is reported; removing the primary key constraint makes it run normally. Setting shop_id to varchar not null does not help either. org.apache.flink.table.api.ValidationException: Type STRING NOT NULL of table field 'shop_id' does not match with the physical type STRING of the 'shop_id' field

Re: Can the state data in a KeyedCoProcessFunction be subject to resource contention?

2020-08-20 Thread shizk233
Thanks Cayden, I'll take a look. Cayden chen <1193216...@qq.com> wrote on Thu, Aug 20, 2020 at 4:58 PM: > hi, you can take a look at this, it explains things in quite some detail > https://blog.csdn.net/yuchuanchen/article/details/105677408 > > > ----Original message---- > From: > "user-zh" >

Re: Flink StreamingFileSink rolling policy

2020-08-20 Thread Jingsong Li
As long as you extend CheckpointRollingPolicy, you can implement shouldRollOnEvent and shouldRollOnProcessingTime however you like. On Wed, Aug 19, 2020 at 6:20 PM guoliang_wang1335 wrote: > When using Flink StreamingFileSink with the bulk Hadoop SequenceFile > format, can the rolling policy be customized? I'd like to roll based on file size, the longest time a file has gone without updates, and checkpoints. Can that be done by implementing the RollingPolicy interface? Thanks! > > Per the docs < >
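
A sketch of such a policy, assuming Flink 1.11's `CheckpointRollingPolicy` base class (which already forces a roll on every checkpoint); the size and inactivity thresholds are illustrative:

```java
import java.io.IOException;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;

// Sketch: roll on checkpoints (enforced by the base class), plus when a
// part file exceeds a max size or has been idle for too long.
public class SizeAndInactivityRollingPolicy<IN, BucketID>
        extends CheckpointRollingPolicy<IN, BucketID> {

    private static final long MAX_PART_SIZE = 128L * 1024L * 1024L;   // 128 MB, illustrative
    private static final long MAX_INACTIVITY_MS = 15L * 60L * 1000L;  // 15 min, illustrative

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element)
            throws IOException {
        return partFileState.getSize() >= MAX_PART_SIZE;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime)
            throws IOException {
        return currentTime - partFileState.getLastUpdateTime() >= MAX_INACTIVITY_MS;
    }
}
```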

Re: Re: Flink on YARN job startup: after a job restart the TaskManager seems to lose the jars from lib

2020-08-20 Thread xiao cai
Hi: Thanks for the reply; that is indeed one approach. Personally though, I feel that if, when the first container starts, the jars in the local lib were uploaded to HDFS, then subsequent failover containers could all fetch them from HDFS uniformly, and this problem should no longer occur. It seems the community optimized jar copying in 1.11; I am still reading up on that. Will share any progress. Best, xiao cai Original message From: 范超 To: user-zh@flink.apache.org Sent: Thu, Aug 20, 2020 09:11 Subject: Re: Flink on Yarn

Re: [bug] In Flink 1.11 the schema loses the physical information specified in DataType when using connectors

2020-08-20 Thread Jingsong Li
For 1.11, just use the new source/sink interfaces. On Wed, Aug 19, 2020 at 12:43 AM 赵建云 wrote: > Adding the image links > Creating the connector > http://image.zhaojianyun.com/mweb/bug1.png > Creating the sink in TableSourceSinkFactory > http://image.zhaojianyun.com/mweb/bug2.png > Runtime physical information of the TableSchema > http://image.zhaojianyun.com/mweb/bug3.png > > > > Aug 18, 2020

Re: Integrating Flink into CDH

2020-08-20 Thread Jingsong Li
What exactly is the error? On Tue, Aug 18, 2020 at 8:34 PM smq <374060...@qq.com> wrote: > > Hi all, I found a tool online for building parcels. Flink 1.9 packaged this way installs and runs fine through CM, but after installing 1.10 and 1.11 neither will start. Does anyone have experience with this they could share? Much appreciated! -- Best, Jingsong Lee

Re: Flink SQL lineage

2020-08-20 Thread Jingsong Li
It depends on why you want the lineage. On Wed, Aug 19, 2020 at 1:17 AM guaishushu1...@163.com < guaishushu1...@163.com> wrote: > Does anyone know: to build Flink SQL lineage, is it better to extract the relationships between tables from the SqlNode, or to extract the lineage from the Transformation operators? > > > > guaishushu1...@163.com > -- Best, Jingsong Lee

Can the state data in a KeyedCoProcessFunction be subject to resource contention?

2020-08-20 Thread shizk233
Hi all, a question: KeyedCoProcessFunction is special in having two inputs, with two corresponding processElement methods. Question 1: If both process methods modify the same MapState, can they contend for it? Or are the two methods executed sequentially? Question 2: Although there are different keys, the function has only one instance, so its MapState should also be a single instance; are the process calls for different keys then executed concurrently or sequentially, and do they compete for the MapState?

Re: Re: Flink StreamingFileSink rolling policy

2020-08-20 Thread guoliang_wang1335
I'll give it a try, thanks. On 2020-08-20 14:19:41, "Jingsong Li" wrote: > As long as you extend CheckpointRollingPolicy, you can implement shouldRollOnEvent and shouldRollOnProcessingTime however you like > > On Wed, Aug 19, 2020 at 6:20 PM guoliang_wang1335 > wrote: > >> When using Flink StreamingFileSink with the bulk Hadoop SequenceFile >>

Re: How to resolve the ORC version conflict between Flink and Hive 2.1.1

2020-08-20 Thread abc15606
What do you mean by the Flink Hive table approach? Hive streaming? Sent from my iPhone > On Aug 21, 2020 at 13:27, Jingsong Li wrote: > > The Flink filesystem connector, and flink-orc used from the DataStream API, bundle a fairly new ORC version, so older ORC readers cannot read the output. > > I suggest writing the ORC files through a Flink Hive table > >> On Fri, Aug 21, 2020 at 12:25 PM wrote: >> >> Flink pins its own ORC version and does not use Hive's, so after the data is written, Hive cannot read it. >>

Re: How to resolve the ORC version conflict between Flink and Hive 2.1.1

2020-08-20 Thread abc15606
Tried it; same result. In essence it also just writes files. Sent from my iPhone > On Aug 21, 2020 at 13:35, Jingsong Li wrote: > > Yes > >> On Fri, Aug 21, 2020 at 1:30 PM wrote: >> >> What do you mean by the Flink Hive table approach? Hive streaming? >> >> Sent from my iPhone >> On Aug 21, 2020 at 13:27, Jingsong Li wrote: >>> >>> The Flink filesystem connector, and flink-orc used from the DataStream API >>

A question about Hive

2020-08-20 Thread Bruce
hi, all. If Hive serves only for metadata management and the actual data is not stored on HDFS, can the data itself be read through the hivecatalog? For example, if Hive stores the table metadata of MySQL and Oracle tables, can the hivecatalog read the actual table data? Sent from my iPhone

Re: Flink 1.11.1 sql client streaming join produces unexpected rows containing null

2020-08-20 Thread LittleFall
This problem ultimately boils down to this question: how should multiple tables created in Flink SQL tell apart the canal-json records coming in from Kafka? And it has been resolved. Thanks for your help. -- Sent from: http://apache-flink.147419.n8.nabble.com/

Flink startup question

2020-08-20 Thread guaishushu1...@163.com
Does anyone know whether Flink's -yt option really doesn't support multiple directories, and whether it can only upload .jar files to the cluster??? guaishushu1...@163.com

Is it normal that the JDBC connector only writes about 200 KB per second to MySQL?

2020-08-20 Thread LittleFall
This is my code; it simply writes data from a datagen source to a jdbc sink. ```java package main; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.EnvironmentSettings; public class Main {