Re: Re: Partially failed to add partitions to hive metadata when using StreamingFileSink

2020-09-07 Thread Rui Li
The streaming file committer prints a log line like this before committing a partition: LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier); The partition commit policy prints log lines like these after a partition has been committed successfully: LOG.info("Committed partition {} to metastore", partitionSpec); LOG.info("Committed partition {} with success file",

Re: Out-of-order data when consuming from kafka

2020-09-07 Thread Shuai Xia
You can keep the data of a given account in order by setting a key. -- From: Danny Chan Sent: Tuesday, September 8, 2020 11:12 To: user-zh Subject: Re: Out-of-order data when consuming from kafka -- Your source consumes single/multiple partitions and the data is still in order relative to each partition; order is only broken if there is a data shuffle between the source and downstream operators. Currently, one way to preserve order is to make the source

Re: Out-of-order data when consuming from kafka

2020-09-07 Thread Danny Chan
Your source consumes single/multiple partitions and the data is still in order relative to each partition; order is only broken if there is a data shuffle between the source and downstream operators. Currently, one way to preserve order is to keep the source parallelism the same as the downstream's. Best, Danny Chan On Sep 4, 2020 +0800 PM4:40, smq <374060...@qq.com> wrote: > Hi all > > We have run into an out-of-order processing problem. The business scenario is bank balance updates: the data source is kafka, there is an account-balance field, and we sink to kudu to update customer balances. >
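A minimal sketch in Java of the keying approach mentioned above; the AccountEvent type and its fields are hypothetical stand-ins for the balance-update records:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KeyedOrderingSketch {
        // Hypothetical record type: one balance update per account.
        public static class AccountEvent {
            public String accountId;
            public long balanceCents;
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            AccountEvent update = new AccountEvent();
            update.accountId = "acct-1";
            update.balanceCents = 100_00;
            DataStream<AccountEvent> events = env.fromElements(update); // stand-in for the kafka source

            // keyBy hashes on the account id, so every update for one account lands on the
            // same downstream subtask and keeps the order it had within its kafka partition.
            events.keyBy(event -> event.accountId).print();

            env.execute("per-account ordering sketch");
        }
    }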

Re: Re: Partially failed to add partitions to hive metadata when using StreamingFileSink

2020-09-07 Thread 夏帅
Judging from the logs you provided the second time, the problem seems to be with your namenode. -- From: MuChen <9329...@qq.com> Sent: Tuesday, September 8, 2020 10:56 To: user-zh@flink.apache.org 夏帅; user-zh Subject: Re: Re: Partially failed to add partitions to hive metadata when using StreamingFileSink -- At the time the checkpoint failed, there were also some info- and warn-level logs on the tm:

Re: Re: Partially failed to add partitions to hive metadata when using StreamingFileSink

2020-09-07 Thread MuChen
At the time the checkpoint failed, there were also some info- and warn-level logs on the tm: 2020-09-04 17:17:59,520 INFO org.apache.hadoop.io.retry.RetryInvocationHandler [] - Exception while invoking create of class ClientNamenodeProtocolTranslatorPB over uhadoop-op3raf-master2/10.42.52.202:8020 after 14 fail over

Re: Implementing flink high availability with k8s

2020-09-07 Thread Yang Wang
Currently the only HA setup supported by Flink on K8s (both Standalone on K8s and native K8s) is ZK + HDFS/S3/GFS/OSS, and the configuration is the same as for Standalone [1]. Since the JobManager runs as a deployment, it is automatically relaunched after a failure and then recovers from ZK and the external storage. The main options are: high-availability: zookeeper high-availability.zookeeper.quorum: localhost:2181 high-availability.storageDir:
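For reference, a minimal sketch of those options set programmatically; the values are placeholders, not a tested setup:

    import org.apache.flink.configuration.Configuration;

    public class HaConfigSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setString("high-availability", "zookeeper");
            conf.setString("high-availability.zookeeper.quorum", "zk-1:2181,zk-2:2181,zk-3:2181");
            // External storage for JobManager metadata; hdfs://, s3://, oss://, ... all work.
            conf.setString("high-availability.storageDir", "hdfs:///flink/recovery");
            System.out.println(conf);
        }
    }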

Re: how flink-sql connector kafka reads array json

2020-09-07 Thread Benchao Li
Hi, this is a known problem; there is already an issue [1] tracking it, expected to be usable in 1.12. [1] https://issues.apache.org/jira/browse/FLINK-18590 大罗 wrote on Tue, Sep 8, 2020 at 10:39 AM: > hi, everyone, I've run into a problem. > > The downstream system sends data as a json array, e.g. [{"name": "daluo", "age": 1}, {"name": "xiaoming", > "age": 2}]. I want to read this kind of data with 'connector.type' = 'kafka'; how should I write the sql below? > >
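Until that fix lands, one workaround (a sketch under assumed names, not the eventual SQL support) is to consume the topic as plain strings on the DataStream side and explode each JSON array with Jackson:

    import java.util.Properties;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.util.Collector;

    public class JsonArraySketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
            props.setProperty("group.id", "json-array-demo");

            env.addSource(new FlinkKafkaConsumer<>("mytopic", new SimpleStringSchema(), props))
               // Each kafka record is a JSON array; emit one string per array element.
               .flatMap((String record, Collector<String> out) -> {
                   ObjectMapper mapper = new ObjectMapper(); // created per call for simplicity
                   for (JsonNode element : mapper.readTree(record)) {
                       out.collect(element.toString());
                   }
               })
               .returns(Types.STRING)
               .print();

            env.execute("explode json array");
        }
    }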

Re: Partially failed to add partitions to hive metadata when using StreamingFileSink

2020-09-07 Thread 夏帅
Are these the only exception logs? Is there anything more detailed?

Re: flink-1.11 native k8s state on ceph storage issue

2020-09-07 Thread Yang Wang
What you actually need is an HA setup like PV + FileSystemHAService [1], but it is not supported yet, and how to implement it is still under discussion. Someone in the community had a similar need before; you can take a look at [1]. What is supported today is ZK + HDFS/S3/GFS/OSS style HA; K8s ConfigMap + HDFS/S3/GFS/OSS is also being considered [2]. [1]. https://lists.apache.org/thread.html/r36882b6c8286132b6fe6851e1c04cd876e9fa35aff9a5b22d181487d%40%3Cuser.flink.apache.org%3E [2].

how flink-sql connector kafka reads array json

2020-09-07 Thread 大罗
hi, everyone, I've run into a problem. The downstream system sends data as a json array, e.g. [{"name": "daluo", "age": 1}, {"name": "xiaoming", "age": 2}]. I want to read this kind of data with 'connector.type' = 'kafka'; how should I write the sql below? CREATE TABLE mykafka1 (name String, age Int) WITH ( 'connector.type' = 'kafka', 'format.type' = 'json', 'update-mode' = 'append' );

Re: Partially failed to add partitions to hive metadata when using StreamingFileSink

2020-09-07 Thread MuChen
---- From: "MuChen"

Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-07 Thread Husky Zeng
Hi Cristian, I don't know if it was deliberately designed to be like this, so I have submitted an issue and am waiting for somebody to respond. https://issues.apache.org/jira/browse/FLINK-19154 -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

[ANNOUNCE] Weekly Community Update 2020/36

2020-09-07 Thread Konstantin Knauf
Dear community, happy to share another community update for the past week. This time with the upcoming release of Flink 1.11.2, a proposal for more efficient aggregation for batch processing with the DataStream API, and the comeback of two FLIPs that have been abandoned for a bit. Flink

Should the StreamingFileSink mark the files "finished" when all bounded input sources are depleted?

2020-09-07 Thread Teunissen, F.G.J. (Fred)
Hi All, My flink-job is using bounded input sources and writes the results to a StreamingFileSink. When it has processed all the input, the job finishes and closes. But the output files are still named “-0-0..inprogress.”. I expected them to be named “-0-0.”. Did I forget some setting or

Re: FLINK DATASTREAM Processing Question

2020-09-07 Thread Vijayendra Yadav
Thank You Dawid. Sent from my iPhone > On Sep 7, 2020, at 9:03 AM, Dawid Wysakowicz wrote: >

Re: Should the StreamingFileSink mark the files "finished" when all bounded input sources are depleted?

2020-09-07 Thread Ken Krugler
Hi Fred, I think this is the current behavior (though it would be helpful to know which version of Flink you’re using). From an email conversation with Kostas in January of this year: > Hi Ken, Jingsong and Li, > > Sorry for the late reply. > > As Jingsong pointed out, upon calling close()
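For reference: part files move from .inprogress to finished when a checkpoint completes, so a bounded job that simply runs out of input can end before any checkpoint finalizes the last files. A sketch with checkpointing enabled, assuming Flink 1.11-era APIs and a placeholder path:

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    public class FileSinkSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Part files are finalized on checkpoint completion; a short bounded job
            // can still finish before the first checkpoint fires and leave .inprogress files.
            env.enableCheckpointing(10_000);

            StreamingFileSink<String> sink = StreamingFileSink
                    .forRowFormat(new Path("file:///tmp/out"), new SimpleStringEncoder<String>("UTF-8"))
                    .build();

            env.fromElements("a", "b", "c").addSink(sink); // stand-in bounded input
            env.execute("streaming file sink sketch");
        }
    }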

Re: FLINK DATASTREAM Processing Question

2020-09-07 Thread Dawid Wysakowicz
Hi, You can see the execution plan via StreamExecutionEnvironment#getExecutionPlan() and visualize it in [1]. You can also submit your job and check the execution plan in the Web UI. As for the question of which option is preferred, it is very subjective. As long as in option b) both maps are
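A minimal sketch of the first suggestion, dumping the JSON plan that the visualizer accepts:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class PlanSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2, 3).filter(i -> i > 1).print();
            // JSON description of the current job graph; paste it into the plan visualizer.
            System.out.println(env.getExecutionPlan());
        }
    }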

Flink alert after database lookUp

2020-09-07 Thread s_penakalap...@yahoo.com
Hi All, I am new to Flink, request your help!!! My scenario: 1> we receive Json messages at a very high frequency, like 10,000 messages/second. 2> we need to raise an Alert for a particular user if there is any breach of the threshold value for each attribute in the Json. 3> These threshold values
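A common pattern for this kind of requirement (a sketch under assumed names, not a full solution) is to broadcast the threshold rows read from the database and check each message against them in broadcast state:

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class ThresholdAlertSketch {
        // Hypothetical shape of an incoming json message.
        public static class Message {
            public String user;
            public String attribute;
            public double value;
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Message m = new Message();
            m.user = "u1";
            m.attribute = "amount";
            m.value = 2500.0;
            DataStream<Message> messages = env.fromElements(m); // stand-in for the json source

            // attribute -> threshold, periodically re-read from the database (stand-in here).
            DataStream<Tuple2<String, Double>> thresholdUpdates =
                    env.fromElements(Tuple2.of("amount", 1000.0));

            MapStateDescriptor<String, Double> descriptor =
                    new MapStateDescriptor<>("thresholds", Types.STRING, Types.DOUBLE);
            BroadcastStream<Tuple2<String, Double>> broadcast = thresholdUpdates.broadcast(descriptor);

            messages.connect(broadcast)
                    .process(new BroadcastProcessFunction<Message, Tuple2<String, Double>, String>() {
                        @Override
                        public void processElement(Message msg, ReadOnlyContext ctx,
                                                   Collector<String> out) throws Exception {
                            Double limit = ctx.getBroadcastState(descriptor).get(msg.attribute);
                            if (limit != null && msg.value > limit) {
                                out.collect("ALERT " + msg.user + ": " + msg.attribute + "=" + msg.value);
                            }
                        }

                        @Override
                        public void processBroadcastElement(Tuple2<String, Double> update, Context ctx,
                                                            Collector<String> out) throws Exception {
                            ctx.getBroadcastState(descriptor).put(update.f0, update.f1);
                        }
                    })
                    .print();

            env.execute("threshold alert sketch");
        }
    }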

Re: How to access state in TimestampAssigner in Flink 1.11?

2020-09-07 Thread Theo Diefenthal
Hi Aljoscha, We have a ProcessFunction which does some processing per kafka partition. It basically buffers the incoming data over 1 minute and throws out some events from the stream if within the minute another related event arrived. In order to buffer the data and store the events over 1

Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-07 Thread Cristian
That's an excellent question. I can't explain that. All I know is this: - the job was upgraded and resumed from a savepoint - After hours of working fine, it failed (like it shows in the logs) - the Metadata was cleaned up, again as shown in the logs - because I run this in Kubernetes, the

Re: In 1.11, after creating a view, the Table object fetched by table name has a temporary name

2020-09-07 Thread Asahi Lee
Hi! Fetching a datagen table via StreamTableEnvironment.from("") gives a Table whose name is a temporary one. The code: package org.apache.flink.playgrounds.spendreport; import

flink-1.11 native k8s state on ceph storage issue

2020-09-07 Thread chenkaibit
HI: I'd like to ask whether anyone has used ceph to store flink state and HA data (replacing HDFS) in native k8s mode. In my tests I found that flink on native k8s cannot mount cephfs; does supporting this require code changes? I found a related issue on the flink jira (https://issues.apache.org/jira/browse/FLINK-15649). How can cephfs be mounted in flink-1.11? And if cephfs cannot be mounted, which storage systems besides hdfs can be used? Any guidance would be appreciated.

Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-07 Thread Peihui He
Hi, yes, it is using hdfs. Jingsong Li wrote on Mon, Sep 7, 2020 at 11:16 AM: > Also, could it be related to using the local filesystem? Try switching to HDFS? > > On Mon, Sep 7, 2020 at 11:15 AM Jingsong Li > wrote: > > Hi, > > Could you take a jstack in the JobMaster to see where exactly it is stuck? > > On Sat, Sep 5, 2020 at 11:11 PM Peihui He wrote: > > > >> Hi, all > >> > >>

Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-07 Thread Peihui He
Hi, from the jstack analysis it should be stuck at the spot below. From the code it looks like it has to traverse all files under the specified path on hdfs; is that right? If there are many files, wouldn't that be very slow? "flink-akka.actor.default-dispatcher-30" #103 prio=5 os_prio=0 tid=0x7f6264001000 nid=0x4a93 in Object.wait() [0x7f62964f1000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native

Re: Re: Re: Re: Re: Re: pyflink-udf issue feedback

2020-09-07 Thread Xingbo Huang
Hi, your image didn't come through. json and csv are formats [1]; filesystem, datagen, print, kafka and so on are all connectors [2], used to read data from, or write data to, an external system. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/ [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/ Best,
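To make the distinction concrete, a sketch of a DDL wrapped in the Java Table API; the table, topic, and server are placeholders. 'connector' says where the rows live, 'format' says how each record's bytes are decoded:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ConnectorVsFormatSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            // connector = the external system (kafka); format = the per-record encoding (json).
            tEnv.executeSql(
                    "CREATE TABLE user_events (" +
                    "  name STRING," +
                    "  age INT" +
                    ") WITH (" +
                    "  'connector' = 'kafka'," +
                    "  'topic' = 'events'," +
                    "  'properties.bootstrap.servers' = 'localhost:9092'," +
                    "  'properties.group.id' = 'demo'," +
                    "  'format' = 'json'" +
                    ")");
        }
    }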

Re: Re: Re: Re: Re: Re: pyflink-udf issue feedback

2020-09-07 Thread Xingbo Huang
Hi, that nested json string cannot be handled with a Map (a Map requires all keys to share one type and all values to share one type); I think you need to represent it as a Row, with a type declaration like Row(String, Map(String, String)) (based on the {'table':'a','data':{'data1':'b','data2':'c'} data you gave). You can think of Row as our tuple-derived type, so you still have to convert the json string into a Row yourself; the way to do it is simple. For example, your data is the {'table':'a','data':{'data1':'b','data2':'c'} above

Flink stateful functions : compensating callback to invoked functions on a timeout

2020-09-07 Thread Mazen_Ezzeddine
Hi all, I am implementing a use case in Flink stateful functions. My specification is that, starting from a stateful function f, a business workflow runs (in other words, a group of stateful functions f1, f2, … fn is called either sequentially or in parallel or both). Stateful function f waits

Re: In 1.11, after creating a view, the Table object fetched by table name has a temporary name

2020-09-07 Thread Leonard Xu
>Table table1 = bsTableEnv.from("table1"); >System.out.println(table1); >// For the print above, I expected the table name to be table1, but it is a temporary name -- The tableName inside the Table object does not mean much; looking at the code, it is never assigned. My understanding is that a Table is always converted from a QueryOperation, and the queryOperation corresponds to a query (select id, total, 12 as col_1 from

Re: RocksDBStateBackend question

2020-09-07 Thread 刘建刚
State is stored directly in the rocksdb database. rocksdb first writes data into an in-memory buffer (which does not get very large) and flushes it to disk once the buffer is full. Compared with the filesystem statebackend, rocksdb is somewhat slower because of serialization and deserialization, but its advantage is that it can use the much larger disk space to hold much larger state. zilong xiao wrote on Mon, Sep 7, 2020 at 5:51 PM: > You can look at this doc: > >
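A sketch of enabling it explicitly with the Flink 1.11 API; the checkpoint path is a placeholder. The second constructor argument turns on incremental checkpoints, which helps once the disk-resident state grows large:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDbSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Working state lives in embedded RocksDB (memory buffer + disk);
            // completed checkpoints are written to the given path.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
            env.enableCheckpointing(60_000);
        }
    }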

Re: RocksDBStateBackend question

2020-09-07 Thread zilong xiao
You can look at this doc: https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend guaishushu1...@163.com wrote on Mon, Sep 7, 2020 at 5:47 PM: > About RocksDBStateBackend: > does it store state directly in the rocksdb database, or wait until memory is full before storing into RocksDB? If it stores directly into RocksDB, wouldn't that badly hurt processing speed? > > >

Implementing flink high availability with k8s

2020-09-07 Thread 旧城以西
Could anyone advise: how do I achieve flink HA through a k8s session? If someone has done it, please share the corresponding configuration files. According to the official docs, configuring HA requires modifying the masters file; when deploying via k8s, how do I modify the masters file?

RocksDBStateBackend question

2020-09-07 Thread guaishushu1...@163.com
About RocksDBStateBackend: does it store state directly in the rocksdb database, or wait until memory is full before storing into RocksDB? If it stores directly into RocksDB, wouldn't that badly hurt processing speed? guaishushu1...@163.com

Re: Duplicate rows when writing to mysql with flink-sql 1.11

2020-09-07 Thread Leonard Xu
Hi, the duplicates come from the flink and mysql primary keys being inconsistent. You can add a unique key constraint on the mysql table over the fields set as the flink primary key; that is equivalent to the pk. Best Leonard > On Sep 6, 2020, at 21:21, 凌天荣 <466792...@qq.com> wrote: > > With flink-sql 1.11, when inserting into a flink table whose connector is mysql, because the id in mysql is an auto-increment primary key, and in 1.11 upsert mode the primary key set at insert time cannot be specified as the mysql primary key id, only another field can be set as the flink >
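A sketch of the flink side in 1.11 DDL (table and column names invented): declare the flink primary key over the business fields with NOT ENFORCED, and add the matching unique key on the mysql side so upserts land on the same row:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class JdbcUpsertSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            // Flink-side pk over the business field, not mysql's auto-increment id.
            tEnv.executeSql(
                    "CREATE TABLE order_sink (" +
                    "  order_no STRING," +
                    "  total DECIMAL(10, 2)," +
                    "  PRIMARY KEY (order_no) NOT ENFORCED" +
                    ") WITH (" +
                    "  'connector' = 'jdbc'," +
                    "  'url' = 'jdbc:mysql://localhost:3306/mydb'," +
                    "  'table-name' = 'orders'" +
                    ")");
            // mysql side, run once: ALTER TABLE orders ADD UNIQUE KEY uk_order_no (order_no);
        }
    }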

flink-CDC Chinese garbled-text issue

2020-09-07 Thread Li,Qian(DXM,PB)
Hello: When I use the Flink CDC SQL CLI to write data to ES6, because the source database encoding is latin1, the Chinese data written into ES comes out garbled. Is there any way to solve this, such as performing an encoding conversion? Thanks! 李倩

Re: Re: Re: Re: Re: Re: pyflink-udf issue feedback

2020-09-07 Thread whh_960101
Hello, connectors like datagen and print in the figure look more like table names; before this, I had only heard of connectors of the json, csv, filesystem kind. What is the role of the connector when creating a table with connector DDL? On 2020-09-07 11:33:06, "Xingbo Huang" wrote: >Hi, >You want to read directly from a data source provided by python, right? That and the udf are two separate questions. Your udf is fine, except that the udf's return type is result_type, not result_types. >

Re: How to access state in TimestampAssigner in Flink 1.11?

2020-09-07 Thread Aljoscha Krettek
Hi, sorry for the inconvenience! I'm sure we can find a solution together. Why do you need to keep state in the Watermark Assigner? The Kafka source will by itself maintain the watermark per partition, so just specifying a WatermarkStrategy will already correctly compute the watermark per
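What that looks like with the 1.11 API; a fragment, assuming an already-built FlinkKafkaConsumer<Event> named consumer and an Event type carrying its own timestamp. The source then tracks the watermark per kafka partition by itself:

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    // consumer is an existing FlinkKafkaConsumer<Event> (assumed).
    consumer.assignTimestampsAndWatermarks(
            WatermarkStrategy
                    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    // recordTs is the kafka record timestamp; use the event's own instead.
                    .withTimestampAssigner((event, recordTs) -> event.getTimestampMillis()));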

Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-07 Thread Husky Zeng
I mean that checkpoints are usually dropped after the job is terminated by the user (except if explicitly configured as retained checkpoints). You could use "ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION" to keep your checkpoint when a failure occurs. When your zookeeper lost
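The setting referred to, as it appears in code with the Flink 1.11 API:

    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RetainedCheckpointSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000);
            // Keep externalized checkpoints when the job is cancelled,
            // so they can still be used to restore the job later.
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        }
    }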

flink1.9.3 on yarn job submission question

2020-09-07 Thread 宁吉浩
I submit jobs with bin/flink run -m yarn-cluster and ran into two problems: 1. The two jobs are visible in one flink cluster ui, even in the same ui as the earlier flink-session cluster (used for testing); is that normal? 2. I know the number of tms can be determined by setting the parallelism and slots; looking at the picture below, the two jobs together occupy 8 yarn containers. How should the cpu count be set? ps: with spark I could directly set the number of executor cores; now I cannot find a way. Surely one tm with 8 slots does not use just one cpu?

Re: Re: Re: Re: Re: Re: pyflink-udf issue feedback

2020-09-07 Thread whh_960101
Hello, let me state my problem in full: 1. First, I need to pass a pre-defined python dict as a udf input parameter; suppose the dict is dic = {1:'a',2:'b'}. How do I then write the inputs when defining the udf (two input parameters in total: one is the pre-defined dict dic, the other is a DataTypes.ARRAY(DataTypes.STRING()), i.e. re_list below)? My approach is: class fun(ScalarFunction): def __init__(self): self.dic = {1:'a',2:'b'} def eval(self,re_list): # use self.dic where dic is needed #..

Re: Wrong mailing list address

2020-09-07 Thread darion
Understood, thank you. -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11 streaming file sink to hdfs orc file format could be recognized by hive

2020-09-07 Thread Jingsong Li
Hi, the OrcBulkWriterFactory implemented in flink-orc goes somewhat "deep": it rewrites part of the ORC code, so version compatibility is hard to maintain. You could consider using Hive streaming writing instead; it uses the native hive orc writer [1] and can match the version you need. [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_streaming.html#streaming-writing Best, Jingsong On Mon, Sep 7, 2020 at 2:11
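For reference, a sketch of the hive streaming write from [1], wrapped in the Java Table API; catalog, database, and table names are placeholders, and the partition-commit properties in the comment are the 1.11 option names:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class HiveStreamingSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // Assumes a HiveCatalog named "myhive" was registered beforehand and that the
            // target table carries partition-commit properties such as:
            //   'sink.partition-commit.trigger' = 'partition-time',
            //   'sink.partition-commit.policy.kind' = 'metastore,success-file'
            tEnv.executeSql(
                    "INSERT INTO myhive.mydb.orc_table " +
                    "SELECT name, age, DATE_FORMAT(ts, 'yyyy-MM-dd') FROM kafka_source");
        }
    }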

flink 1.11 streaming file sink to hdfs orc file format could be recognized by hive

2020-09-07 Thread 大罗
Hi, everyone. My experiment scenario is to write kafka data directly into hive (orc format), following the idea of building a real-time data warehouse, but with flink sql it fails because of dependency problems. So I wondered whether I could write hdfs files with the streaming file sink and have hive recognize them. My experiment went as follows; the versions I use are: Hadoop 3.0.0+cdh6.3.2 HDFS 3.0.0+cdh6.3.2 HBase 2.1.0+cdh6.3.2 Hive 2.1.1+cdh6.3.2 Flink 1.11.1