贴一下代码
在 2020/9/8 14:09, zhongbaoluo 写道:
据插入数据执行失败,也没有找到异常。 yarn
看看你的表是不是事务表,hive建表的时候加上 'transactional' = 'false'
在 2020/9/8 16:26, 大罗 写道:
Hi,我使用flink sql 1.11.1 的hive catalog特性往hive orc表插入数据:
我所使用的版本如下:
Hadoop 3.0.0+cdh6.3.2
HDFS 3.0.0+cdh6.3.2
HBase 2.1.0+cdh6.3.2
Hive 2.1.1+cdh6.3.2
Flink 1.11.1
定义hive orc表如下:
create table dest_orc (
i int
)
业务端根据用户ID
hash发送到kafka,保证每一个用户的所有操作在kafka的同一个partition内,并且在发送端保证操作有序。
至于flink消费kafka后,乱序,不太可能,或者说可能性极小,毕竟都是按照offset来消费。
在 2020/9/4 18:59, Xiao Xu 写道:
两个方法
1. kafka 里面可以 keyby, partition 里面都是有序的, 所以每个用户处理都是有序的
2. 就是你说的在 flink 里面做乱序处理
宁吉浩 于2020年9月4日周五 下午5:56写道:
确实是这样,比如有你多个partition,但是只有1个partition里面有数据,wm就不会执行计算。需要保证多个parititon数据都有数据。
举个例子,我这里在做测试的时候,1个topic10个partition,由于测试环境,按照key
hash,数据只进入1个partition,就不会触发计算。后来让这个key轮询到10个parition都有数据,就可以触发计算了。
在 2020/9/4 13:14, Benchao Li 写道:
如果你有多个partition,并且有一些partition没有数据,这个时候的确是会存在这个问题。
为了保证exactly-once,flink自己通过barrier来实现checkpoint,包括barrier的传递等等,所以flink在kafkaconsumer的基础之上,封装了一层语义保障。
在 2020/9/4 10:34, Shuiqiang Chen 写道:
Hi,
为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source 算子维护当前算子所消费的 partition 消费
offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint 恢复中从上次 commit
org.apache.flink.streaming.api.windowing.windows.TimeWindow
getWindowStartWithOffset??17-182020-09-01
18:00:00.0
2020-09-01 18:00:00.0
??2020-09-01
应该是。通过源码可以知道flink-json目前支持2种内置json格式的解析,一个是canal,一个是debezium。
具体可参考:
org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema 和
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema
在 2020/8/24 17:27, dixingxin...@163.com 写道:
Hi all:
Flink1.11
: "taochanglian"
To: user-zh@flink.apache.org
Sent: 8/24/2020 5:28:56 AM
Subject: Re: flink 1.11.1 与HDP3.0.1中的hive集成,查询不出hive表数据
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/
中的 flink-sql-connector-hive-3.1.2 下载了么,放到lib里面了么?
在 2020/8/24 3:01, 黄蓉 写道:
各位好:
我使用的环境是H
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/
中的 flink-sql-connector-hive-3.1.2 下载了么,放到lib里面了么?
在 2020/8/24 3:01, 黄蓉 写道:
各位好:
我使用的环境是HDP3.0.1的沙盒,flink是最新版本的1.11.1,从官网直接下载的编译好的jar包。我想测试flink与hive的集成,包括查询hive表的数据、写入数据到hive表等操作。目前我遇到问题就是通过flink
sql
你kafka里面的是json么?format是json么?
String resultCreateTableSql = createKafkaSourceSQL +" WITH ( " +" 'connector' = 'kafka' ," +" 'topic' = '" + kafkaTopic +"'," +" 'properties.bootstrap.servers' = '" +
kafkaBootstrapServers +"'," +" 'properties.group.id' = '" + kafkaGroupId +"'," +" 'format' = '" +
我这个是mysql的,直接run,没问题
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String createMysqlTableSQL ="CREATE TABLE mysqlTable1 (\n" +
" id INT,\n" +
" username
11 matches
Mail list logo