Re: 如何 Flink 通过不同的hive_catalog,访问离线和实时两套集群的hive?
你好, 这个看起来是Hadoop的报错,连接失败的这个节点是NameNode么?创建HiveCatalog时可以指定Hadoop的配置文件的,要保证两个HiveCatalog读到的Hadoop配置是正确的。 另外使用多个HiveCatalog需要保证hive版本一致,不同版本的hive没办法一起用 On Fri, Aug 27, 2021 at 3:01 PM Jim Chen wrote: > Hi > > 集群上根本没有这个端口,也不知道这个端口是干嘛用的,为啥要连这个。这个是实时集群上的端口 > > Caizhi Weng 于2021年8月27日周五 下午2:33写道: > > > Hi! > > > > 看起来是 Flink 集群不能访问到 > wx12-dsjrt-master001/xx.xx.xx.xx:8050,检查一下网络以及这个端口的状态看看? > > > > Jim Chen 于2021年8月27日周五 下午1:59写道: > > > > > Hi, All > > > > > > > > > 我是使用的flink1.13.1版本,我们有2套Hadoop集群,离线和实时集群。现在实时集群上的任务,想通过hive_catalog访问离线集群。 > > > 按照官网例子,我分别配置离线和实时的hive-conf-dir的不同路径,发现一只报错。如: > > > 2021-08-27 13:50:22,902 INFO org.apache.hadoop.ipc.Client > > > [] - Retrying connect to server: > > > wx12-dsjrt-master001/xx.xx.xx.xx:8050. Already tried 6 time(s); retry > > > policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=50, > > sleepTime=1000 > > > MILLISECONDS)。 > > > 大家在生产中是如何解决这种问题的呢?非常感谢! > > > > > > -- Best regards! Rui Li
Re: flink 1.13.1版本,使用hive方言,执行insert overwirite语句,插入数据为空时,没有将表中原数据清空
你好, 这个可以去开个jira跟踪一下 On Tue, Aug 17, 2021 at 2:47 PM Asahi Lee <978466...@qq.com.invalid> wrote: > hi! > > 我使用如下sql,我select查询的数据为0行记录时,运行结束后,插入表的原数据没有被清空;而我在hive客户端执行时,表是被清空的! > INSERT OVERWRITE target_table SELECT * from source_table where id > 10; -- Best regards! Rui Li
Re: Flink HIve 文件压缩报错
目前没有办法让作业继续跑,只能重跑了 这里有几个不同的问题: 1. 文件是如何被删掉的,两种可能:被Flink误删(需要在Flink这边修复)、或者被Flink之外的进程删掉 2. 文件丢失意味着exactly once语义无法保证了,这种情况是否可以接受 3. 如果可以接受丢失数据,如何让Flink作业可以继续跑(需要Flink的改进) On Wed, Aug 11, 2021 at 7:52 PM 周瑞 wrote: > 您好: > 这个文件确实不存在了,这种情况目前怎样设置可以让作业继续跑 > > > -- Original ------ > From: "Rui Li" Date: Wed, Aug 11, 2021 07:49 PM > To: "user-zh" > Subject: Re: Flink HIve 文件压缩报错 > > > > 这个文件是确实不在了么?是不是被别的进程删掉了呢,可以通过hdfs的audit log来判断一下。 > > 目前flink这边写文件的exactly > once语义是依赖HDFS的一致性保证的,如果之前写到HDFS的数据丢掉了就会破坏这个语义了(不过我们可以考虑在这种情况下让作业能继续跑)。 > > On Tue, Aug 10, 2021 at 7:45 PM 周瑞 > > 您好:Flink > > > 写入Hive的时候,在压缩文件的时候有个待压缩的文件丢失了,导致Flink程序一直在不断重启,请问文件丢失是什么原因导致的,这种情况怎么能够让Flink程序正常启动 > > 2021-08-10 19:34:19 java.io.UncheckedIOException: > > java.io.FileNotFoundException: File does not exist: > > > hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3 > > at > > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163) > > at > > > org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:38) > > at > > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:173) > > at > java.util.HashMap.forEach(HashMap.java:1288) > at > > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:169) > > at > > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:151) > > at > > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:141) > > at > > > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) > > at org.apache.flink.streaming.runtime.io > .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) > > at > org.apache.flink.streaming.runtime.io > .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) > > at org.apache.flink.streaming. > runtime.io > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) > > at > > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) > > at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) > > at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > at > > java.lang.Thread.run(Thread.java:748) Caused by: > > java.io.FileNotFoundException: File does not exist: > > > hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3 > > at > > > org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1583) > > at > > > org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576) > > at > > > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > > at > > > org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1591) > > at > > > org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85) > > at > > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:161) > > ... 19 more > > > > -- > Best regards! > Rui Li -- Best regards! Rui Li
Re: Flink HIve 文件压缩报错
这个文件是确实不在了么?是不是被别的进程删掉了呢,可以通过hdfs的audit log来判断一下。 目前flink这边写文件的exactly once语义是依赖HDFS的一致性保证的,如果之前写到HDFS的数据丢掉了就会破坏这个语义了(不过我们可以考虑在这种情况下让作业能继续跑)。 On Tue, Aug 10, 2021 at 7:45 PM 周瑞 wrote: > 您好:Flink > 写入Hive的时候,在压缩文件的时候有个待压缩的文件丢失了,导致Flink程序一直在不断重启,请问文件丢失是什么原因导致的,这种情况怎么能够让Flink程序正常启动 > 2021-08-10 19:34:19 java.io.UncheckedIOException: > java.io.FileNotFoundException: File does not exist: > hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3 > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163) >at > org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:38) > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:173) >at java.util.HashMap.forEach(HashMap.java:1288) at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:169) > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:151) > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:141) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) >at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at > java.lang.Thread.run(Thread.java:748) Caused by: > java.io.FileNotFoundException: File does not exist: > hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3 >at > org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1583) > at > org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1591) > at > org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85) > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:161) >... 19 more -- Best regards! Rui Li
Re: Re: filesystem connector不支持跨subtask合并小文件
可以把这个参数调大点试试呢,调成远大于单个文件的size On Thu, Aug 5, 2021 at 1:43 PM lixin58...@163.com wrote: > 你好, > 生成的三个文件挺小的,不到2kb,1k多一点,配这个是为了合并后比2k大 > > -- > lixin58...@163.com > > > *发件人:* Rui Li > *发送时间:* 2021-08-05 11:42 > *收件人:* user-zh > *主题:* Re: filesystem connector不支持跨subtask合并小文件 > 你好, > > 看到你的compaction.file-size配置成了2kb,这个是希望合并以后的文件的target size只有2kb么 > > On Wed, Aug 4, 2021 at 5:39 PM lixin58...@163.com > wrote: > > > 你好, > > 在使用filesystem > > > connector过程中,开启了compaction,使用parquet列式文件,指定3个并行度,但发现无论如何也触发不了合并,因为列式文件是checkpoint触发时才会滚动,这样同一checkpoint内会产生与并行度相同的文件,按说此时文件数已经大于1了,为什么不合并呢? > > > > create table fs_parquet_compact > > (userid bigint, name string, part string) > > PARTITIONED BY (part) > > with( > > 'connector' = 'filesystem', > > 'path' = 'hdfs:///data/fs_parquet_compact', > > 'format' = 'parquet', > > 'auto-compaction' = 'true', > > 'compaction.file-size' = '2kb', > > 'sink.rolling-policy.file-size' = '500b', > > 'sink.rolling-policy.rollover-interval' = '800s', > > 'sink.rolling-policy.check-interval' = '60s' > > ); > > > > > > > > lixin58...@163.com > > > > > -- > Best regards! > Rui Li > > -- Best regards! Rui Li
Re: filesystem connector不支持跨subtask合并小文件
你好, 看到你的compaction.file-size配置成了2kb,这个是希望合并以后的文件的target size只有2kb么 On Wed, Aug 4, 2021 at 5:39 PM lixin58...@163.com wrote: > 你好, > 在使用filesystem > connector过程中,开启了compaction,使用parquet列式文件,指定3个并行度,但发现无论如何也触发不了合并,因为列式文件是checkpoint触发时才会滚动,这样同一checkpoint内会产生与并行度相同的文件,按说此时文件数已经大于1了,为什么不合并呢? > > create table fs_parquet_compact > (userid bigint, name string, part string) > PARTITIONED BY (part) > with( > 'connector' = 'filesystem', > 'path' = 'hdfs:///data/fs_parquet_compact', > 'format' = 'parquet', > 'auto-compaction' = 'true', > 'compaction.file-size' = '2kb', > 'sink.rolling-policy.file-size' = '500b', > 'sink.rolling-policy.rollover-interval' = '800s', > 'sink.rolling-policy.check-interval' = '60s' > ); > > > > lixin58...@163.com > -- Best regards! Rui Li
Re: flink 1.13.1 使用hive方言,执行hive sql解析报错
ing, > > `equip_name` string, > > `enqueue_date` string, > > `shi_total_len` double, > > `shi_type_width` double, > > `shi_type_depth` double, > > `moonpool` string, > > `maxwindvelocity` string, > > `maxwaveheight` string, > > `airgap` string, > > `maxopewaterdepth` string, > > `drilldepthcap` string, > > `drillvl` string, > > `drillwater` string, > > `potablewater` string) > > ROW FORMAT SERDE > > 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' > > WITH SERDEPROPERTIES ( > > 'field.delim'=',', > > 'serialization.format'=',') > > STORED AS INPUTFORMAT > > 'org.apache.hadoop.hive.ql.io > .orc.OrcInputFormat' > > OUTPUTFORMAT > > 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' > > LOCATION > > > > > 'hdfs://hadoop02:8020/user/hive/warehouse/cosldatacenter.db/dw_riginfoparam' > > TBLPROPERTIES ( > > 'COLUMN_STATS_ACCURATE'='false', > > 'numFiles'='1', > > 'numRows'='-1', > > 'rawDataSize'='-1', > > 'totalSize'='1564', > > 'transient_lastDdlTime'='1627353556') > > > > > > > > > > > > > > > > -- 原始邮件 -- > > 发件人: > > > "user-zh" > > > < > > lirui.fu...@gmail.com>; > > 发送时间: 2021年7月30日(星期五) 中午11:18 > > 收件人: "user-zh" > > > 主题: Re: flink 1.13.1 使用hive方言,执行hive sql解析报错 > > > > > > > > 你好, > > > > 能不能把你insert语句中使用到的表的DDL发一下?贴一下show create table的结果就可以了。 > > > > On Thu, Jul 29, 2021 at 9:03 PM Asahi Lee <978466...@qq.com.invalid > > > > wrote: > > > > > hi! > > > 我验证了,不是else的问题,下面的sql也报同样的问题?Invalid table alias or column > reference > > 'u' > > > ,我的sql里面没有'u'的名称! > > > CREATE CATALOG `tempo_df_hive_default_catalog` WITH( > > > &nbsp; &nbsp; 'type' = 'hive', > > > &nbsp; &nbsp; 'default-database' = 'default' > > > ); > > > USE CATALOG tempo_df_hive_default_catalog; > > > CREATE TABLE IF NOT EXISTS `default`.`tempo_blackhole_table` > ( > > > &nbsp; &nbsp;f0 INT > > > ); > > > insert into cosldatacenter.dw_riginfoparam > > > select&nbsp; > > > c.LARGE_EQUIP_ID, > > > c.EQUIP_CODE, > > > c.EQUIP_NAME, > > > c.ENQUEUE_DATE, > > > c.SHI_TOTAL_LEN, > > > c.SHI_TYPE_WIDTH, > > > c.SHI_TYPE_DEPTH, > > > case when b.param_cn = '月池尺寸' then a.param_value else null > end as > > Moonpool, > > > case when b.param_cn = '最大风速' then a.param_value else null > end as > > > MaxWindvelocity, > > > case when b.param_cn = '最大波浪高度' then a.param_value else null > end as > > > MaxWaveheight, > > > case when b.param_cn = '气隙' then a.param_value else null end > as > > Airgap, > > > case when b.param_cn = '设计最大作业水深' then a.param_value else > null end as > > > MaxOpeWaterdepth, > > > case when b.param_cn = '额定钻井深度' then a.param_value else null > end as > > > DrilldepthCap, > > > case when b.param_cn = '钻井可变载荷' then a.param_value else null > end as > > > DrillVL, > > > case when b.param_cn = '钻井水' then a.param_value else null > end as > > > DrillWater, > > > case when b.param_cn = '生活水' then a.param_value else null > end as > > > PotableWater > > > from cosldatacenter.ods_emp_maindata_iadc_paramvalue > a&nbsp; > > > inner join cosldatacenter.ods_emp_maindata_iadc_paramdef b on > > a.param_id = > > > b.param_id > > > inner join cosldatacenter.ods_emp_md_large_equip c on > > > a.SUBJECT_ID=c.LARGE_EQUIP_ID; > > > INSERT INTO `default`.`tempo_blackhole_table` SELECT 1 ; > > > > > > > > > > > > > > > > > > org.apache.hadoop.hive.ql.parse.SemanticException: Line > 2:178 Invalid > > > table alias or column reference 'u': (possible column names > are: > > > a.paramvalue_id, a.platform_id, a.equipment_id, a.param_id, > > a.param_value, > > > a.remark, a.create_time, a.creator, a.update_time, > a.update_person, > > > a.record_flag, a.subject_id, a.output_unit, a.show_seq, > b.param_id, > > > b.iadc_id, b.param_code, b.param_en, b.param_cn, > b.output_standard, > > > b.output_unit, b.param_type, b.param_value, b.remark, > b.create_time, > > > b.creator, b.update_time, b.update_person, b.record_flag, > > c.large_equip_id, > > > c.equip_name, c.equip_type, c.equip_function, c.equip_board, > > c.ship_yard, > > > c.manufacturer_date, c.enqueue_date, c.dockrepair_date, > c.scrap_date, > > > c.enqueue_mode, c.work_for_org, c.work_in_org, c.old_age, > > c.create_time, > > > c.creator, c.update_time, c.update_person, c.record_flag, > > c.data_timestamp, > > > c.work_unit_id, c.work_status, c.work_location, c.work_area, > > c.equip_code, > > > c.shi_main_power, c.shi_total_len, c.shi_type_width, > c.shi_type_depth, > > > c.shi_design_draft, c.shi_total_tonnage, c.shi_load_tonnage, > c.remark, > > > c.unit_classification1, c.unit_classification2) > > > > > > > > > > > > > > > > --&nbsp;原始邮件&nbsp;-- > > > 发件人: > > > > > > "user-zh" > > > > > > < > > > xbjt...@gmail.com&gt;; > > > 发送时间:&nbsp;2021年7月29日(星期四) 下午3:32 > > > 收件人:&nbsp;"user-zh" &gt;; > > > > > > 主题:&nbsp;Re: flink 1.13.1 使用hive方言,执行hive sql解析报错 > > > > > > > > > > > > 看起来是sql语法报错,这里面的ELSE呢? > > > > > > 祝好, > > > Leonard > > > > > > > > > &gt; 在 2021年7月27日,20:04,Asahi Lee > <978466...@qq.com.INVALID&gt; > > 写道: > > > &gt; > > > &gt; CASE > > > > > > &gt; > > WHEN mipd.`param_cn` = '月池尺寸' THEN > > > > > > &gt; > > mipv.`param_value`&amp;nbsp; > > > &gt; END AS `Moonpool` > > > > > > > > -- > > Best regards! > > Rui Li > > > > -- > Best regards! > Rui Li -- Best regards! Rui Li
Re: flink 1.13.1 使用hive方言,执行hive sql解析报错
MMENT '', > `param_en` string COMMENT '', > `param_cn` string COMMENT '', > `output_standard` string COMMENT '', > `output_unit` string COMMENT '', > `param_type` string COMMENT '', > `param_value` string COMMENT '', > `remark` string COMMENT '', > `create_time` string COMMENT '', > `creator` string COMMENT '', > `update_time` string COMMENT '', > `update_person` string COMMENT '', > `record_flag` double COMMENT '') > COMMENT '' > ROW FORMAT SERDE > 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' > WITH SERDEPROPERTIES ( > 'field.delim'=',', > 'serialization.format'=',') > STORED AS INPUTFORMAT > 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' > OUTPUTFORMAT > 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' > LOCATION > > 'hdfs://hadoop02:8020/user/hive/warehouse/cosldatacenter.db/ods_emp_maindata_iadc_paramdef' > TBLPROPERTIES ( > 'COLUMN_STATS_ACCURATE'='false', > 'last_modified_by'='root', > 'last_modified_time'='1621834335', > 'numFiles'='0', > 'numRows'='-1', > 'rawDataSize'='-1', > 'totalSize'='0', > 'transient_lastDdlTime'='1621834335') > > > > CREATE TABLE `cosldatacenter.dw_riginfoparam`( > `large_equip_id` string, > `equip_code` string, > `equip_name` string, > `enqueue_date` string, > `shi_total_len` double, > `shi_type_width` double, > `shi_type_depth` double, > `moonpool` string, > `maxwindvelocity` string, > `maxwaveheight` string, > `airgap` string, > `maxopewaterdepth` string, > `drilldepthcap` string, > `drillvl` string, > `drillwater` string, > `potablewater` string) > ROW FORMAT SERDE > 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' > WITH SERDEPROPERTIES ( > 'field.delim'=',', > 'serialization.format'=',') > STORED AS INPUTFORMAT > 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' > OUTPUTFORMAT > 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' > LOCATION > > 'hdfs://hadoop02:8020/user/hive/warehouse/cosldatacenter.db/dw_riginfoparam' > TBLPROPERTIES ( > 'COLUMN_STATS_ACCURATE'='false', > 'numFiles'='1', > 'numRows'='-1', > 'rawDataSize'='-1', > 'totalSize'='1564', > 'transient_lastDdlTime'='1627353556') > > > > > > > > -- 原始邮件 -- > 发件人: > "user-zh" > < > lirui.fu...@gmail.com>; > 发送时间: 2021年7月30日(星期五) 中午11:18 > 收件人: "user-zh" > 主题: Re: flink 1.13.1 使用hive方言,执行hive sql解析报错 > > > > 你好, > > 能不能把你insert语句中使用到的表的DDL发一下?贴一下show create table的结果就可以了。 > > On Thu, Jul 29, 2021 at 9:03 PM Asahi Lee <978466...@qq.com.invalid> > wrote: > > > hi! > > 我验证了,不是else的问题,下面的sql也报同样的问题?Invalid table alias or column reference > 'u' > > ,我的sql里面没有'u'的名称! > > CREATE CATALOG `tempo_df_hive_default_catalog` WITH( > > 'type' = 'hive', > > 'default-database' = 'default' > > ); > > USE CATALOG tempo_df_hive_default_catalog; > > CREATE TABLE IF NOT EXISTS `default`.`tempo_blackhole_table` ( > > f0 INT > > ); > > insert into cosldatacenter.dw_riginfoparam > > select > > c.LARGE_EQUIP_ID, > > c.EQUIP_CODE, > > c.EQUIP_NAME, > > c.ENQUEUE_DATE, > > c.SHI_TOTAL_LEN, > > c.SHI_TYPE_WIDTH, > > c.SHI_TYPE_DEPTH, > > case when b.param_cn = '月池尺寸' then a.param_value else null end as > Moonpool, > > case when b.param_cn = '最大风速' then a.param_value else null end as > > MaxWindvelocity, > > case when b.param_cn = '最大波浪高度' then a.param_value else null end as > > MaxWaveheight, > > case when b.param_cn = '气隙' then a.param_value else null end as > Airgap, > > case when b.param_cn = '设计最大作业水深' then a.param_value else null end as > > MaxOpeWaterdepth, > > case when b.param_cn =
Re: flink 1.13.1 使用hive方言,执行hive sql解析报错
你好, 能不能把你insert语句中使用到的表的DDL发一下?贴一下show create table的结果就可以了。 On Thu, Jul 29, 2021 at 9:03 PM Asahi Lee <978466...@qq.com.invalid> wrote: > hi! > 我验证了,不是else的问题,下面的sql也报同样的问题?Invalid table alias or column reference 'u' > ,我的sql里面没有'u'的名称! > CREATE CATALOG `tempo_df_hive_default_catalog` WITH( > 'type' = 'hive', > 'default-database' = 'default' > ); > USE CATALOG tempo_df_hive_default_catalog; > CREATE TABLE IF NOT EXISTS `default`.`tempo_blackhole_table` ( > f0 INT > ); > insert into cosldatacenter.dw_riginfoparam > select > c.LARGE_EQUIP_ID, > c.EQUIP_CODE, > c.EQUIP_NAME, > c.ENQUEUE_DATE, > c.SHI_TOTAL_LEN, > c.SHI_TYPE_WIDTH, > c.SHI_TYPE_DEPTH, > case when b.param_cn = '月池尺寸' then a.param_value else null end as Moonpool, > case when b.param_cn = '最大风速' then a.param_value else null end as > MaxWindvelocity, > case when b.param_cn = '最大波浪高度' then a.param_value else null end as > MaxWaveheight, > case when b.param_cn = '气隙' then a.param_value else null end as Airgap, > case when b.param_cn = '设计最大作业水深' then a.param_value else null end as > MaxOpeWaterdepth, > case when b.param_cn = '额定钻井深度' then a.param_value else null end as > DrilldepthCap, > case when b.param_cn = '钻井可变载荷' then a.param_value else null end as > DrillVL, > case when b.param_cn = '钻井水' then a.param_value else null end as > DrillWater, > case when b.param_cn = '生活水' then a.param_value else null end as > PotableWater > from cosldatacenter.ods_emp_maindata_iadc_paramvalue a > inner join cosldatacenter.ods_emp_maindata_iadc_paramdef b on a.param_id = > b.param_id > inner join cosldatacenter.ods_emp_md_large_equip c on > a.SUBJECT_ID=c.LARGE_EQUIP_ID; > INSERT INTO `default`.`tempo_blackhole_table` SELECT 1 ; > > > > > > org.apache.hadoop.hive.ql.parse.SemanticException: Line 2:178 Invalid > table alias or column reference 'u': (possible column names are: > a.paramvalue_id, a.platform_id, a.equipment_id, a.param_id, a.param_value, > a.remark, a.create_time, a.creator, a.update_time, a.update_person, > a.record_flag, a.subject_id, a.output_unit, a.show_seq, b.param_id, > b.iadc_id, b.param_code, b.param_en, b.param_cn, b.output_standard, > b.output_unit, b.param_type, b.param_value, b.remark, b.create_time, > b.creator, b.update_time, b.update_person, b.record_flag, c.large_equip_id, > c.equip_name, c.equip_type, c.equip_function, c.equip_board, c.ship_yard, > c.manufacturer_date, c.enqueue_date, c.dockrepair_date, c.scrap_date, > c.enqueue_mode, c.work_for_org, c.work_in_org, c.old_age, c.create_time, > c.creator, c.update_time, c.update_person, c.record_flag, c.data_timestamp, > c.work_unit_id, c.work_status, c.work_location, c.work_area, c.equip_code, > c.shi_main_power, c.shi_total_len, c.shi_type_width, c.shi_type_depth, > c.shi_design_draft, c.shi_total_tonnage, c.shi_load_tonnage, c.remark, > c.unit_classification1, c.unit_classification2) > > > > > -- 原始邮件 -- > 发件人: > "user-zh" > < > xbjt...@gmail.com>; > 发送时间: 2021年7月29日(星期四) 下午3:32 > 收件人: "user-zh" > 主题: Re: flink 1.13.1 使用hive方言,执行hive sql解析报错 > > > > 看起来是sql语法报错,这里面的ELSE呢? > > 祝好, > Leonard > > > > 在 2021年7月27日,20:04,Asahi Lee <978466...@qq.com.INVALID> 写道: > > > > CASE > >WHEN mipd.`param_cn` = '月池尺寸' THEN > >mipv.`param_value` > >END AS `Moonpool` -- Best regards! Rui Li
Re: flink 1.13.1 sql hive is_generic = false 建表成功后,没有列信息
1.13不再使用is_generic来标记是不是hive表了 (改成了'connector'='hive') ,文档需要更新一下。不过还是建议DDL操作hive元数据时用hive dialect。 On Mon, Jul 26, 2021 at 5:00 PM Asahi Lee <978466...@qq.com.invalid> wrote: > 我使用flink 1.12.0版本功能是好的 > > > > > -- 原始邮件 -- > 发件人: > "user-zh" > < > lirui.fu...@gmail.com>; > 发送时间: 2021年7月26日(星期一) 下午4:43 > 收件人: "user-zh" > 主题: Re: flink 1.13.1 sql hive is_generic = false 建表成功后,没有列信息 > > > > 你好, > > 是否能尝试一下用hive dialect建表呢? > > On Mon, Jul 26, 2021 at 2:44 PM Asahi Lee <978466...@qq.com.invalid> > wrote: > > > hi! > > 我使用flink 1.13.1版本,通过sql创建hive表,程序正常运行,表创建成功,但是没有列信息;我的程序如下: > > 我连接的是hive 2.3.6版本,使用flink-sql-connector-hive-2.3.6依赖包。 > > > > > > package com.meritdata.cloud.flink.test; > > > > > > import org.apache.flink.table.api.EnvironmentSettings; > > import org.apache.flink.table.api.TableEnvironment; > > > > > > public class Test { > > > > > > public static void main(String[] args) { > > > > > > EnvironmentSettings > bbSettings = > > > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > > TableEnvironment > tableEnvironment = > > TableEnvironment.create(bbSettings); > > > > > > > tableEnvironment.executeSql("create catalog > > myhive with (" + > > > " 'type' = 'hive', > > " + > > > " > > 'default-database' = 'default', " + > > > ")"); > > > > > > > tableEnvironment.executeSql("use catalog > > myhive"); > > > tableEnvironment.executeSql("create table if > > not exists q1 " + > > > "( id string ) " + > > > "with > > ('is_generic' = 'false')"); > > > > > > /** > > * > hive上表创建成功,没有列, 信息如下 > > * desc > formatted q1; > > * > > * > col_name > > data_type > > > comment > > * > > * > > * Table > Parameters: > > * > flink.is_generic false > > * > flink.schema.0.data-type > > VARCHAR(2147483647) > > * > flink.schema.0.name id > > * > transient_lastDdTime 1627279802 > > * > > */ > > > > > > } > > > > > > > > > > } > > > > -- > Best regards! > Rui Li -- Best regards! Rui Li
Re: flink 1.13.1 sql hive is_generic = false 建表成功后,没有列信息
你好, 是否能尝试一下用hive dialect建表呢? On Mon, Jul 26, 2021 at 2:44 PM Asahi Lee <978466...@qq.com.invalid> wrote: > hi! > 我使用flink 1.13.1版本,通过sql创建hive表,程序正常运行,表创建成功,但是没有列信息;我的程序如下: > 我连接的是hive 2.3.6版本,使用flink-sql-connector-hive-2.3.6依赖包。 > > > package com.meritdata.cloud.flink.test; > > > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.TableEnvironment; > > > public class Test { > > > public static void main(String[] args) { > > > EnvironmentSettings bbSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment tableEnvironment = > TableEnvironment.create(bbSettings); > > > tableEnvironment.executeSql("create catalog > myhive with (" + > " 'type' = 'hive', > " + > " > 'default-database' = 'default', " + > ")"); > > > tableEnvironment.executeSql("use catalog > myhive"); > tableEnvironment.executeSql("create table if > not exists q1 " + > "( id string ) " + > "with > ('is_generic' = 'false')"); > > > /** > * hive上表创建成功,没有列, 信息如下 > * desc formatted q1; > * > * col_name > data_type > comment > * > * > * Table Parameters: > * flink.is_generic false > * flink.schema.0.data-type > VARCHAR(2147483647) > * flink.schema.0.name id > * transient_lastDdTime 1627279802 > * > */ > > > } > > > > > } -- Best regards! Rui Li
Re: 应用初始化HiveCatalog出错 "URI is not hierarchical"
ionHandler.process(AbstractProtocol.java:868) > at org.apache.tomcat.util.net > .NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1590) > at org.apache.tomcat.util.net > .SocketProcessorBase.run(SocketProcessorBase.java:49) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at > org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.ExceptionInInitializerError > at > org.apache.flink.table.catalog.hive.HiveCatalog.createHiveConf(HiveCatalog.java:230) > at > org.apache.flink.table.catalog.hive.HiveCatalog.(HiveCatalog.java:169) > at > org.apache.flink.table.catalog.hive.HiveCatalog.(HiveCatalog.java:160) > at > org.apache.flink.table.catalog.hive.HiveCatalog.(HiveCatalog.java:152) > at > com.huifu.kunpeng.service.SchemaServiceImpl.init(SchemaServiceImpl.java:41) > at > com.huifu.kunpeng.controller.SchemaController.init(SchemaController.java:53) > at > com.huifu.kunpeng.controller.SchemaController$$FastClassBySpringCGLIB$$e12cdb43.invoke() > at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) > at > org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771) > at > org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) > at > org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) > at > org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:88) > at > com.huifu.kunpeng.aop.role.AuthenticationAspect.doAround(AuthenticationAspect.java:36) > at sun.reflect.GeneratedMethodAccessor284.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:644) > at > org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633) > at > org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70) > at > org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175) > at > org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) > at > org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:95) > at > org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) > at > org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) > at > org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691) > at > com.huifu.kunpeng.controller.SchemaController$$EnhancerBySpringCGLIB$$86ad9ca0.init() > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190) > at > org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) > at > org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105) > at > org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878) > at > org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:792) > at > org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) > at > org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040) > ... 48 more > Caused by: java.lang.IllegalArgumentException: URI is not hierarchical > at java.io.File.(File.java:420) > at org.apache.hadoop.hive.conf.HiveConf.findConfigFile(HiveConf.java:176) > at org.apache.hadoop.hive.conf.HiveConf.(HiveConf.java:145) > ... 85 more -- Best regards! Rui Li
Re: 请问flink 什么时候支持读写ACID的hive表
你好, Flink暂时没有计划支持hive的ACID表。目前hive connector的代码无法保证ACID语义,所以即使你去掉“Reading or writing ACID table %s is not supported”这个检查也达不到预期的效果。 是否考虑将ACID表迁移到数据湖中呢,比如iceberg有相应的迁移工具[1]。 [1] https://iceberg.apache.org/spark-procedures/#table-migration On Wed, May 19, 2021 at 1:16 PM youngysh wrote: > hi > > > 我们使用 flink.1.12 读取 ACID hive table 时报错(Reading or writing ACID table > %s is not supported),我们尝试修改源码放开这个限制也会出现后续的一些错误如(cast转换 BytesColumnVector 为 > LongColumnVector 出错)。 > 背景:目前我们生产想采用 flink 做 ETL 等数据迁移工作,对应的hive都是hive 3.0左右的版本或者hive > 2.3.6的版本,默认都是ACID的表,而且数据量很大,现在使用flink做数据迁移,如果flink只支持读取非ACID标的话,我们需要全部重建hive的表是很费力的。 > 请问一下flink什么版本有计划支持读取 ACID的hive table?或者,目前有无办法解决我这样的问题? -- Best regards! Rui Li
Re: Fwd: flink1.12.2 CLI连接hive出现异常
感觉是提交job时遇到了问题。可以看看本地SQL client的日志有没有更详细的信息。另外可以试试用yarn session模式提交。 On Fri, Apr 30, 2021 at 5:15 PM 张锴 wrote: > 我没有指定,就是执行了一条查询语句,自己生成的,但是好像没彻底成功,所以日志没说哪里的问题 > > Rui Li 于2021年4月30日周五 下午4:51写道: > > > 你好, > > > > 看错误信息是找不到application_1605840182730_29292,这个是你提交任务的时候指定的么? > > > > On Thu, Apr 29, 2021 at 1:35 PM 张锴 wrote: > > > > > 我这里生产的hive没有配置Kerberos认证 > > > > > > 张锴 于2021年4月29日周四 上午10:05写道: > > > > > > > 官网有说吗,你在哪里找到的呢 > > > > > > > > guoyb <861277...@qq.com> 于2021年4月28日周三 上午10:56写道: > > > > > > > >> 我的也有这种问题,没解决,kerberos认证的hive导致的。 > > > >> > > > >> > > > >> > > > >> ---原始邮件--- > > > >> 发件人: "张锴" > > >> 发送时间: 2021年4月28日(周三) 上午10:41 > > > >> 收件人: "user-zh" > > >> 主题: Fwd: flink1.12.2 CLI连接hive出现异常 > > > >> > > > >> > > > >> -- Forwarded message - > > > >> 发件人: 张锴 > > >> Date: 2021年4月27日周二 下午1:59 > > > >> Subject: flink1.12.2 CLI连接hive出现异常 > > > >> To: > > >> > > > >> > > > >> *使用flink1.12.2 > > CLI连接hive时,可以查看到catalog,也能看到指定的catalog下的tables,但是执行select > > > >> 语句时就出现异常。* > > > >> [ERROR] Could not execute SQL statement. Reason: > > > >> org.apache.hadoop.ipc.RemoteException: Application with id > > > >> 'application_1605840182730_29292' doesn't exist in RM. Please check > > that > > > >> the job submission was suc > > > >> at > > > >> > > > >> > > > > > > org.apache.hadoop.yarn.server.resourcemanager.ClientRMService.getApplicationReport(ClientRMService.java:382) > > > >> at > > > >> > > > >> > > > > > > org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl.getApplicationReport(ApplicationClientProtocolPBServiceImpl.java: > > > >> at > > > >> > > > >> > > > > > > org.apache.hadoop.yarn.proto.ApplicationClientProtocol$ApplicationClientProtocolService$2.callBlockingMethod(ApplicationClientProtocol.java:561) > > > >> at > > > >> > > > >> > > > > > > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:524) > > > >> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1025) > > > >> at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:876) > > > >> at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:822) > > > >> at java.security.AccessController.doPrivileged(Native Method) > > > >> at javax.security.auth.Subject.doAs(Subject.java:422) > > > >> at > > > >> > > > >> > > > > > > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730) > > > >> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2682) > > > >> > > > >> *使用yarn logs -applicationId application_1605840182730_29292 > > > >> 查看日志时,并没有给出具体的错误,以下是打出的日志。日志几乎看不出啥问题。* > > > >> INFO client.RMProxy: Connecting to ResourceManager at > > > >> hadoop01.xxx.xxx.xxx/xx.xx.x.xx:8050 > > > >> Unable to get ApplicationState. Attempting to fetch logs directly > from > > > the > > > >> filesystem. > > > >> Can not find the appOwner. Please specify the correct appOwner > > > >> Could not locate application logs for > application_1605840182730_29292 > > > >> > > > >> 这个如何排查呢,有遇到类似的问题的小伙伴吗 > > > > > > > > > > > > > > > > > -- > > Best regards! > > Rui Li > > > -- Best regards! Rui Li
Re: Fwd: flink1.12.2 CLI连接hive出现异常
你好, 看错误信息是找不到application_1605840182730_29292,这个是你提交任务的时候指定的么? On Thu, Apr 29, 2021 at 1:35 PM 张锴 wrote: > 我这里生产的hive没有配置Kerberos认证 > > 张锴 于2021年4月29日周四 上午10:05写道: > > > 官网有说吗,你在哪里找到的呢 > > > > guoyb <861277...@qq.com> 于2021年4月28日周三 上午10:56写道: > > > >> 我的也有这种问题,没解决,kerberos认证的hive导致的。 > >> > >> > >> > >> ---原始邮件--- > >> 发件人: "张锴" >> 发送时间: 2021年4月28日(周三) 上午10:41 > >> 收件人: "user-zh" >> 主题: Fwd: flink1.12.2 CLI连接hive出现异常 > >> > >> > >> -- Forwarded message - > >> 发件人: 张锴 >> Date: 2021年4月27日周二 下午1:59 > >> Subject: flink1.12.2 CLI连接hive出现异常 > >> To: >> > >> > >> *使用flink1.12.2 CLI连接hive时,可以查看到catalog,也能看到指定的catalog下的tables,但是执行select > >> 语句时就出现异常。* > >> [ERROR] Could not execute SQL statement. Reason: > >> org.apache.hadoop.ipc.RemoteException: Application with id > >> 'application_1605840182730_29292' doesn't exist in RM. Please check that > >> the job submission was suc > >> at > >> > >> > org.apache.hadoop.yarn.server.resourcemanager.ClientRMService.getApplicationReport(ClientRMService.java:382) > >> at > >> > >> > org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl.getApplicationReport(ApplicationClientProtocolPBServiceImpl.java: > >> at > >> > >> > org.apache.hadoop.yarn.proto.ApplicationClientProtocol$ApplicationClientProtocolService$2.callBlockingMethod(ApplicationClientProtocol.java:561) > >> at > >> > >> > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:524) > >> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1025) > >> at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:876) > >> at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:822) > >> at java.security.AccessController.doPrivileged(Native Method) > >> at javax.security.auth.Subject.doAs(Subject.java:422) > >> at > >> > >> > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730) > >> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2682) > >> > >> *使用yarn logs -applicationId application_1605840182730_29292 > >> 查看日志时,并没有给出具体的错误,以下是打出的日志。日志几乎看不出啥问题。* > >> INFO client.RMProxy: Connecting to ResourceManager at > >> hadoop01.xxx.xxx.xxx/xx.xx.x.xx:8050 > >> Unable to get ApplicationState. Attempting to fetch logs directly from > the > >> filesystem. > >> Can not find the appOwner. Please specify the correct appOwner > >> Could not locate application logs for application_1605840182730_29292 > >> > >> 这个如何排查呢,有遇到类似的问题的小伙伴吗 > > > > > -- Best regards! Rui Li
Re: flink 1.12.2 sql-cli 写入Hive报错 is_generic
可以发一下具体的SQL语句么(包括DDL和insert)? On Wed, Apr 21, 2021 at 5:46 PM HunterXHunter <1356469...@qq.com> wrote: > 在ddl的时候设置了 watermark。在任务页面查看watermark的时候一直没有更新watermark > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: flink 1.12.2 sql-cli 写入Hive报错 is_generic
用partition-time的话是用watermark与分区字段的timestamp对比来触发提交的,因此还需要你的source有watermark。 On Fri, Apr 16, 2021 at 9:32 AM HunterXHunter <1356469...@qq.com> wrote: > 但是用process-time是有数据的,目前用partition-time一直没成功写出过数据 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: 提交flink-sql 出现无法部署到yarn集群
可以按照log里提示的container ID去找对应的container日志来看看 On Wed, Apr 14, 2021 at 8:00 PM 张锴 wrote: > 在用flink-sql的方式连接hive时 ,出现以下错误: > The program finished with the following exception: > > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error: Could not deploy Yarn job cluster. > at > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366) > at > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219) > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) > at > > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) > at > org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) > at > > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1688) > at > > org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) > Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: > Could not deploy Yarn job cluster. > at > > org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481) > at > > org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81) > at > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905) > at > > org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135) > at > > org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) > at > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782) > at > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1765) > at com.erwan.flinksql.FlinkConnectHive.main(FlinkConnectHive.java:49) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349) > ... 11 more > Caused by: > org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The > YARN application unexpectedly switched to state FAILED during deployment. > Diagnostics from YARN: Application application_1618298202025_0017 failed 1 > times (global limit =2; local limit is =1) due to AM Container for > appattempt_1618298202025_0017_01 exited with exitCode: 2 > Failing this attempt.Diagnostics: [2021-04-14 19:04:02.506]Exception from > container-launch. > Container id: container_e13_1618298202025_0017_01_01 > Exit code: 2。 > > 由于错误原因不明显,不好排查,也不确定是到底是哪里的问题,请问有什么办法能够定位问题。 > -- Best regards! Rui Li
Re: flink sql 客户端连接hive 3.1.0出现connection refused异常
你好, 可以看一下SQL client的日志,里面应该有更详细的堆栈信息。 On Wed, Apr 14, 2021 at 10:42 AM 张锴 wrote: > flink版本1.12.2 ,在交互式界面执行flink sql 连接hive操作时,出现连接拒绝异常,内容如下: > Flink SQL> use catalog myhive; > > Flink SQL> show tables; > dim_kcl_customer_source_1h_all > mytest > > Flink SQL> select * from dim_kcl_customer_source_1h_all limit 10; > 2021-04-14 10:22:24,451 WARN org.apache.hadoop.hive.conf.HiveConf > [] - HiveConf of name hive.hook.proto.base-directory does > not exist > 2021-04-14 10:22:24,452 WARN org.apache.hadoop.hive.conf.HiveConf > [] - HiveConf of name hive.strict.managed.tables does not > exist > 2021-04-14 10:22:24,452 WARN org.apache.hadoop.hive.conf.HiveConf > [] - HiveConf of name hive.stats.fetch.partition.stats does > not exist > 2021-04-14 10:22:24,452 WARN org.apache.hadoop.hive.conf.HiveConf > [] - HiveConf of name hive.heapsize does not exist > 2021-04-14 10:22:24,466 INFO > org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Trying > to connect to metastore with URI thrift://test-hadoop002:9083 > 2021-04-14 10:22:24,467 INFO > org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Opened a > connection to metastore, current connections: 2 > 2021-04-14 10:22:24,468 INFO > org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - > Connected to metastore. > 2021-04-14 10:22:24,468 INFO > org.apache.hadoop.hive.metastore.RetryingMetaStoreClient [] - > RetryingMetaStoreClient proxy=class > org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=hadoop > (auth:SIMPLE) retries=24 delay=5 lifetime=0 > 2021-04-14 10:22:24,609 INFO > org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Closed a > connection to metastore, current connections: 1 > 2021-04-14 10:22:25,057 WARN > org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory [] - The > short-circuit local reads feature cannot be used because libhadoop cannot > be loaded. > 2021-04-14 10:22:25,235 INFO org.apache.hadoop.mapred.FileInputFormat > [] - Total input files to process : 1 > [ERROR] > *Could not execute SQL statement. Reason:java.net.ConnectException: > Connection refused* > > *表能查出来,执行sql语句时遇到拒绝连接,在hive里面执行同样的sql能查到数据,请问一下这块出现的问题是什么原因导致的呢。该如何排查* > -- Best regards! Rui Li
Re: 求问Hive DDL TBLPROPERTIES不生效
1. watermark的问题需要检查一下source,比如watermark是如何定义的、是不是source没数据导致watermark不前进等。 2. 小文件合并的功能Hive跟FileSystem connector都是支持的,可以参考这个文档配置一下试试: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#file-compaction On Fri, Apr 9, 2021 at 1:04 PM HunterXHunter <1356469...@qq.com> wrote: > 你好, > 1:我设置的时候就是 使用的 partition-time 同时 > 设定checkpoint间隔为60s。但是我发现watermark一直没有生成或者更新,导致我的数据一直无法commit。想知道 > 为什么watermark无法生成。当时使用process-time是没问题的。 > 2:因为写hive的话会有小文件的问题。所以我使用file sink来设置合并文件和控制文件大小。但是写文件是无法写hive > metastore。所以hive查不出数据。 > > 想知道有什么方法解决hive小文件问题,难道只能T+1做小文件合并吗。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best regards! Rui Li
Re: 求问Hive DDL TBLPROPERTIES不生效
你好, sink.partition-commit.trigger设置成process-time的话是不按照分区值来提取timestamp的,设置成partition-time才可以,但请注意partition-time需要通过watermark来触发。更详细的信息可以参考文档: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#sink-partition-commit-trigger 另外如果怀疑是语法问题导致参数没有正确设置的话,也可以在hive里describe一下这张表验证一下。 On Thu, Apr 8, 2021 at 6:07 PM eriendeng wrote: > SET table.sql-dialect=hive; > CREATE TABLE hive_catalog.test_db.test_insert_tb ( > id BIGINT, > type INT, > user_id BIGINT, > title STRING, > ts TIMESTAMP > ) PARTITIONED BY (add_date STRING, add_hour STRING) STORED AS orc > TBLPROPERTIES ( > 'sink.partition-commit.trigger' = 'process-time', > 'sink.partition-commit.delay' = '0s', > 'sink.partition-commit.policy.kind' = 'metastore,success-file', > 'partition.time-extractor.kind' = 'custom', > 'partition.time-extractor.timestamp-pattern' = '$add_date $add_hour', > 'partition.time-extractor.class' = '.YMDPartTimeExtractor' > ); > > > 如上在hive-dialect里建了一张表,启动任务后从kafka读然后写入,貌似TBLPROPERTIES里的参数一个都没生效,数据能从hdfs上看到,但是hive没有partition,分区也没有按照我class写的进行(我的class需要把分区弄成 > MMdd, HH这种日期格式,同时修复+8时区),TM > JM也没有任何exception,请问有见过这种情况的吗?是不是某个参数没搞对所以都没生效 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: flink 1.12.0 FileSystem 读取 hdfs 文件
你好, tableEnv.executeSql对DQL和DML是异步执行的,要等作业结束可以用tableEnv.executeSql.await On Wed, Apr 7, 2021 at 3:55 PM Ink足迹 wrote: > Flink 1.12.0中 Filesystem 读取 hdfs > orc文件,但是数据没有打印出来,各位大佬这是什么原因导致的?EnvironmentSettings settings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment tableEnv = TableEnvironment.create(settings); > String createSql = "CREATE TABLE test_jds_table (" + > " dc__key Int," + > " name String," + > " c4 String," + > " dc__time String" + > ") WITH (" + > " 'connector' = 'filesystem'," + > " 'path' = > 'hdfs://HDFS41645/usr/hive/warehouse/tmp.db/test_jds_table_d839cd09f5cc44dba8f45eee0d282ee3'," > + > " 'format' = 'orc'" + > ")"; > String createPrintTableSql = "CREATE TABLE print_table WITH ('connector' = > 'print') LIKE test_jds_table (EXCLUDING ALL)"; > String insertSql = "insert into print_table select * from > `test_jds_table`"; > tableEnv.executeSql(createSql); > tableEnv.executeSql(createPrintTableSql); > tableEnv.executeSql(insertSql); -- Best regards! Rui Li
Re: flink 1.12.2 sql-cli 写入Hive报错 is_generic
你好, 我用你提供的这个DDL没有复现这个问题,有更详细的操作步骤么?另外如果kafka表是通过create table like创建的话有个已知问题: https://issues.apache.org/jira/browse/FLINK-21660 On Thu, Apr 1, 2021 at 4:08 PM HunterXHunter <1356469...@qq.com> wrote: > 当配置好HiveCatalog后, > SQL-Cli 也可以查到hive库表信息 > 创建kafka表: > > create table test.test_kafka( > word VARCHAR > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'xx', > 'scan.startup.mode' = 'latest-offset', > 'properties.bootstrap.servers' = 'xx', > 'properties.group.id' = 'test', > 'format' = 'json', > 'json.ignore-parse-errors' = 'true' > ); > 在 Hive里面可以查到改表 > hive > DESCRIBE FORMATTED test_kafka >... > is_generic true >. > > 但是我在 Flink SQL > select * from test.test_kafka; > 报错: > org.apache.flink.table.api.ValidationException: Unsupported options found > for connector 'kafka'. > Unsupported options: > is_generic > Supported options: > connector > format > json.fail-on-missing-field > json.ignore-parse-errors > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: 【flink sql-client 读写 Kerberos认证的hive】
那应该就是跟https://issues.apache.org/jira/browse/FLINK-20913 有关了,这个issue是1.12.2修复的,可以升级一下试试。 On Mon, Mar 8, 2021 at 2:15 PM guoyb <861277...@qq.com> wrote: > 您好! > hive.metastore.sasl.enabled 是true > > > 启动sql client的时候,可以正常读取到认证信息,并读取metastore的表名。 > > > 读和写,认证就失败了。 > > > > ---原始邮件--- > 发件人: "Rui Li" 发送时间: 2021年3月8日(周一) 中午12:12 > 收件人: "user-zh" 主题: Re: 【flink sql-client 读写 Kerberos认证的hive】 > > > Hi, > > > 从你发的stacktrace来看,走到了set_ugi方法说明client认为server没有开启kerberos。确认一下你HiveCatalog这边指定的hive-site.xml是否配置正确呢,像hive.metastore.sasl.enabled是不是设置成true了? > > On Sun, Mar 7, 2021 at 5:49 PM 861277...@qq.com <861277...@qq.com> > wrote: > > > 环境: > > flink1.12.1 > > hive2.1.0 > > CDH6.2.0 > > > > > > 【问题描述】 > > 在没开启Kerberos认证时,可以正常读写hive表 > > > > 开启Kerberos认证后, > > 启动时可以正常读取到hive metastore的元数据信息,读写不了表。 > > > > > > 【sql-client.sh embedded】 > > Flink SQL> show tables; > > dimension_table > > dimension_table1 > > test > > > > > > Flink SQL> select * from test; > > [ERROR] Could not execute SQL statement. Reason: > > org.apache.flink.connectors.hive.FlinkHiveException: Failed to > collect all > > partitions from hive metaStore > > > > > > 【完整日志 > > > /opt/cloudera/parcels/FLINK-1.12.1-BIN-SCALA_2.11/lib/flink/log/flink-root-sql-client-cdh6.com.log】 > > > > 2021-03-07 10:29:18.776 [main] INFO > org.apache.flink.runtime.net.ConnectionUtils > > - Trying to connect to localhost/127.0.0.1:6123 > > 2021-03-07 10:29:18.777 [main] INFO > org.apache.flink.runtime.net.ConnectionUtils > > - Failed to connect from address 'cdh6.com/192.168.31.10': Connection > > refused (Connection refused) > > 2021-03-07 10:29:18.778 [main] INFO > org.apache.flink.runtime.net.ConnectionUtils > > - Failed to connect from address '/127.0.0.1': Connection refused > > (Connection refused) > > 2021-03-07 10:29:18.778 [main] INFO > org.apache.flink.runtime.net.ConnectionUtils > > - Failed to connect from address > '/fe80:0:0:0:20c:29ff:fea1:6d6b%ens33': > > Network is unreachable (connect failed) > > 2021-03-07 10:29:18.778 [main] INFO > org.apache.flink.runtime.net.ConnectionUtils > > - Failed to connect from address '/192.168.31.10': Connection refused > > (Connection refused) > > 2021-03-07 10:29:18.779 [main] INFO > org.apache.flink.runtime.net.ConnectionUtils > > - Failed to connect from address '/0:0:0:0:0:0:0:1%lo': Network is > > unreachable (connect failed) > > 2021-03-07 10:29:18.779 [main] INFO > org.apache.flink.runtime.net.ConnectionUtils > > - Failed to connect from address '/127.0.0.1': Connection refused > > (Connection refused) > > 2021-03-07 10:29:18.779 [main] INFO > org.apache.flink.runtime.net.ConnectionUtils > > - Failed to connect from address > '/fe80:0:0:0:20c:29ff:fea1:6d6b%ens33': > > Network is unreachable (connect failed) > > 2021-03-07 10:29:18.779 [main] INFO > org.apache.flink.runtime.net.ConnectionUtils > > - Failed to connect from address '/192.168.31.10': Connection refused > > (Connection refused) > > 2021-03-07 10:29:18.780 [main] INFO > org.apache.flink.runtime.net.ConnectionUtils > > - Failed to connect from address '/0:0:0:0:0:0:0:1%lo': Network is > > unreachable (connect failed) > > 2021-03-07 10:29:18.780 [main] INFO > org.apache.flink.runtime.net.ConnectionUtils > > - Failed to connect from address '/127.0.0.1': Connection refused > > (Connection refused) > > 2021-03-07 10:29:18.780 [main] INFO > org.apache.flink.runtime.net.ConnectionUtils > > - Could not connect. Waiting for 1600 msecs before next attempt > > 2021-03-07 10:29:20.381 [main] INFO > org.apache.flink.runtime.net.ConnectionUtils > > - Trying to connect to localhost/127.0.0.1:6123 > > 2021-03-07 10:29:20.381 [main] INFO > org.apache.flink.runtime.net.ConnectionUtils > > - Failed to connect from address 'cdh6.com/192.168.31.10': Connection > > refused (Connection refused) > > 2021-03-07 10:29:20.382 [main] INFO > org.apache.flink.runtime.net.ConnectionUtils > > - Failed to connect from address '/127
Re: 【flink sql-client 读写 Kerberos认证的hive】
也可能是需要这个fix:https://issues.apache.org/jira/browse/FLINK-20913 On Mon, Mar 8, 2021 at 12:11 PM Rui Li wrote: > Hi, > > > 从你发的stacktrace来看,走到了set_ugi方法说明client认为server没有开启kerberos。确认一下你HiveCatalog这边指定的hive-site.xml是否配置正确呢,像hive.metastore.sasl.enabled是不是设置成true了? > > On Sun, Mar 7, 2021 at 5:49 PM 861277...@qq.com <861277...@qq.com> wrote: > >> 环境: >> flink1.12.1 >> hive2.1.0 >> CDH6.2.0 >> >> >> 【问题描述】 >> 在没开启Kerberos认证时,可以正常读写hive表 >> >> 开启Kerberos认证后, >> 启动时可以正常读取到hive metastore的元数据信息,读写不了表。 >> >> >> 【sql-client.sh embedded】 >> Flink SQL> show tables; >> dimension_table >> dimension_table1 >> test >> >> >> Flink SQL> select * from test; >> [ERROR] Could not execute SQL statement. Reason: >> org.apache.flink.connectors.hive.FlinkHiveException: Failed to collect >> all partitions from hive metaStore >> >> >> 【完整日志 >> /opt/cloudera/parcels/FLINK-1.12.1-BIN-SCALA_2.11/lib/flink/log/flink-root-sql-client-cdh6.com.log】 >> >> 2021-03-07 10:29:18.776 [main] INFO >> org.apache.flink.runtime.net.ConnectionUtils >> - Trying to connect to localhost/127.0.0.1:6123 >> 2021-03-07 10:29:18.777 [main] INFO >> org.apache.flink.runtime.net.ConnectionUtils >> - Failed to connect from address 'cdh6.com/192.168.31.10': Connection >> refused (Connection refused) >> 2021-03-07 10:29:18.778 [main] INFO >> org.apache.flink.runtime.net.ConnectionUtils >> - Failed to connect from address '/127.0.0.1': Connection refused >> (Connection refused) >> 2021-03-07 10:29:18.778 [main] INFO >> org.apache.flink.runtime.net.ConnectionUtils >> - Failed to connect from address '/fe80:0:0:0:20c:29ff:fea1:6d6b%ens33': >> Network is unreachable (connect failed) >> 2021-03-07 10:29:18.778 [main] INFO >> org.apache.flink.runtime.net.ConnectionUtils >> - Failed to connect from address '/192.168.31.10': Connection refused >> (Connection refused) >> 2021-03-07 10:29:18.779 [main] INFO >> org.apache.flink.runtime.net.ConnectionUtils >> - Failed to connect from address '/0:0:0:0:0:0:0:1%lo': Network is >> unreachable (connect failed) >> 2021-03-07 10:29:18.779 [main] INFO >> org.apache.flink.runtime.net.ConnectionUtils >> - Failed to connect from address '/127.0.0.1': Connection refused >> (Connection refused) >> 2021-03-07 10:29:18.779 [main] INFO >> org.apache.flink.runtime.net.ConnectionUtils >> - Failed to connect from address '/fe80:0:0:0:20c:29ff:fea1:6d6b%ens33': >> Network is unreachable (connect failed) >> 2021-03-07 10:29:18.779 [main] INFO >> org.apache.flink.runtime.net.ConnectionUtils >> - Failed to connect from address '/192.168.31.10': Connection refused >> (Connection refused) >> 2021-03-07 10:29:18.780 [main] INFO >> org.apache.flink.runtime.net.ConnectionUtils >> - Failed to connect from address '/0:0:0:0:0:0:0:1%lo': Network is >> unreachable (connect failed) >> 2021-03-07 10:29:18.780 [main] INFO >> org.apache.flink.runtime.net.ConnectionUtils >> - Failed to connect from address '/127.0.0.1': Connection refused >> (Connection refused) >> 2021-03-07 10:29:18.780 [main] INFO >> org.apache.flink.runtime.net.ConnectionUtils >> - Could not connect. Waiting for 1600 msecs before next attempt >> 2021-03-07 10:29:20.381 [main] INFO >> org.apache.flink.runtime.net.ConnectionUtils >> - Trying to connect to localhost/127.0.0.1:6123 >> 2021-03-07 10:29:20.381 [main] INFO >> org.apache.flink.runtime.net.ConnectionUtils >> - Failed to connect from address 'cdh6.com/192.168.31.10': Connection >> refused (Connection refused) >> 2021-03-07 10:29:20.382 [main] INFO >> org.apache.flink.runtime.net.ConnectionUtils >> - Failed to connect from address '/127.0.0.1': Connection refused >> (Connection refused) >> 2021-03-07 10:29:20.383 [main] INFO >> org.apache.flink.runtime.net.ConnectionUtils >> - Failed to connect from address '/fe80:0:0:0:20c:29ff:fea1:6d6b%ens33': >> Network is unreachable (connect failed) >> 2021-03-07 10:29:20.383 [main] INFO >> org.apache.flink.runtime.net.ConnectionUtils >> - Failed to connect from address '/192.168.31.10': Connection refused >> (Connection refused) >> 2021-03-07 10:29:20.383 [main] INFO >> org.apache.flink.runtime.net.ConnectionUtils >> - Failed to connect from address '/0:0:0:0:0:0:0:1%lo': Net
Re: 【flink sql-client 读写 Kerberos认证的hive】
slateToPlan$1.apply(StreamPlanner.scala:65) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.Iterator$class.foreach(Iterator.scala:891) > at > scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at > scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1321) > at > org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.getPipeline(StreamTableEnvironmentImpl.java:328) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$createPipeline$1(ExecutionContext.java:287) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:256) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.createPipeline(ExecutionContext.java:282) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:542) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:374) > at > org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:648) > at > org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:323) > at > java.util.Optional.ifPresent(Optional.java:159) > at > org.apache.flink.table.client.cli.CliClient.open(CliClient.java:214) > at > org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:144) > at > org.apache.flink.table.client.SqlClient.start(SqlClient.java:115) > at > org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) > 2021-03-07 10:29:22.302 [main] INFO hive.metastore - Connected > to metastore. > > > > 861277...@qq.com > -- Best regards! Rui Li
Re: flinksql集成hive权限管理
你好, 目前hive connector还没有支持ranger,只支持HMS端基于storage的权限控制。 On Thu, Feb 25, 2021 at 8:49 PM 阿华田 wrote: > > 目前在做基于flinksql搭建实时计算平台,使用hive的catalog管理flinksql的元数据。hive本身使用Ranger进行权限管理。但是flinksql使用hive的catalog管理元数据,用户的权限Ranger好像无法管理。各位大佬知道有什么比较好的方式解决吗? > > > | | > 阿华田 > | > | > a15733178...@163.com > | > 签名由网易邮箱大师定制 > > -- Best regards! Rui Li
Re: 通过普通ddl来读写hive
这个取决于你们自己的元数据管理系统了,Flink这边实现Catalog的各个接口对接你们的系统就行。比如在Catalog::createTable的实现里可以增加鉴权机制,判断是否允许用户建表之类的。 On Wed, Feb 24, 2021 at 11:14 AM silence wrote: > 那用自定义的catalog怎么定义hive表来读写hive呢 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: 通过普通ddl来读写hive
Hello, 因为hive本身是通过metastore来管理元数据的,所以通过HiveCatalog对接metastore里的元数据是比较自然的用法。Flink引入Catalog接口的初衷也是为了能方便的对接外部系统的元数据。如果你们用的是自己开发的元数据管理平台,也可以考虑实现自定义的Catalog来对接。 我觉得用in-memory catalog维护hive元数据有点像手动对metastore的元数据做一次snapshot。虽然避免了用户直接访问底层元数据,但使用起来并不方便,比如想要读一张分区表的话需要手动把每个分区的信息添加到in-memory catalog里。 所以如果是出于元数据安全的考虑,更好的做法应该是把catalog接入到已有鉴权机制的控制中。 On Tue, Feb 23, 2021 at 7:17 PM silence wrote: > 我理解各个公司都会有自己的元数据管理平台,hive表的创建修改都需要经过严格的权限控制在平台上进行操作,包括调度任务、实时写入任务、数据血缘等。 > 我个人觉得理想的方式是单个flink > sql的所有的connector通过自维护的元数据进行生成,不需要引入hivecatalog,使用默认的MemoryCatalog即可。 > 总结一下就是不希望引入HiveCatalog来进行hive表的读写 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best regards! Rui Li
Re: 通过普通ddl来读写hive
Hi, 尝试回答一下你提的这几个问题。 1. 不希望用户直接在metastore中建表的话,那我理解用户所能创建的hive表就只有临时表了。目前HiveCatalog还不支持临时hive表,不过社区已经有计划做了,顺利的话可以在1.13中实现。我想了解一下抛开flink不谈,你们在hive中是如何解决这个问题的呢?也是只允许用户创建临时表么?还是说通过某种权限控制的机制来限制哪些用户可以建表? 2. 针对hive metastore里已有的表,通过flink读写数据不需要修改table property。除非是希望修改表自身的属性(比如format、SerDe等等),这方面跟hive中的使用习惯是一致的。 3. 不用hive方言创建hive表可以尝试添加'is_generic'='false'参数,但前提也是要创建在HiveCatalog里。另外这种方式所能表达的语义很有限,基本只能创建简单的文本表。 4. 这个问题跟#1比较类似,也是可以通过临时表达到这个效果。 On Tue, Feb 23, 2021 at 5:58 PM silence wrote: > 你好 > 感谢回复 > 主要有以下几点原因: > 1、直接使用hive catalog进行hive表的创建修改风险太高,更希望在平台层限制hive表的创建和修改 > > 2、connector的配置是保存在hive表的DBPROPERTIES里的,这是否就意味着想通过flink往现有hive表里写数据需要先通过alter语句修改hive表的属性配置,这里不希望对用户直接暴露alter > hive的能力 > 3、使用普通的ddl可以与现有connector的定义统一风格,不需要来回切换方言 > 4、可以不用将配置信息持久化,通过GenericInMemoryCatalog使用即可 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best regards! Rui Li
Re: 通过普通ddl来读写hive
你好, 请问一下不想用HiveCatalog来读写hive表的原因是什么呢?是不希望将hive表持久化(类似临时表的效果),或者是不希望维护一个metastore server? On Tue, Feb 23, 2021 at 2:57 PM silence wrote: > 问一下社区有没有计划支持普通的ddl(不用hive的catalog)来进行读写hive表吗 > 现在不支持是有什么考虑吗 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best regards! Rui Li
Re: Re: Flink SQL 写入Hive问题请教
是的,hive表必须存在HiveCatalog里才能正常读写 On Tue, Feb 23, 2021 at 10:14 AM yinghua...@163.com wrote: > > Flink的版本是1.11.3,目前我们所有表的catalog类型都是GenericInMemoryCatalog,是不是Hive表要用HiveCatalog才行? > > > > yinghua...@163.com > > 发件人: Rui Li > 发送时间: 2021-02-23 10:05 > 收件人: user-zh > 主题: Re: Re: Flink SQL 写入Hive问题请教 > 你好, > > 用的flink是什么版本呢?另外这张hive表是创建在HiveCatalog里的么? > > On Tue, Feb 23, 2021 at 9:01 AM 邮件帮助中心 wrote: > > > 我增加调试日志后,发现执行DDL语句创建hive表时,设置了dialect 为hive,现在报错根据堆栈信息是在执行DML语句insert > > into时创建Hive表时提示没有连接器的配置 > > Table options are: 'is_generic'='false' > > 'partition.time-extractor.timestamp-pattern'='$dt $hr' > > 'sink.partition-commit.delay'='0S' > > 'sink.partition-commit.policy.kind'='metastore,success-file' > > 'sink.partition-commit.trigger'='partition-time' at > > > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) > > at > > > org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) > > at > > > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) > > at > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) > > at > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) > > at > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > at > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) at > > scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at > > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at > > scala.collection.AbstractIterable.foreach(Iterable.scala:54) at > > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > > scala.collection.AbstractTraversable.map(Traversable.scala:104) at > > > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) > > at > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270) > > at > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701) > > at > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:789) > > at > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:691) > > at > > > com.cgws.ccp.flink.sql.submit.SqlSubmit.callInsertInto(SqlSubmit.java:242) > > at > com.cgws.ccp.flink.sql.submit.SqlSubmit.callCommand(SqlSubmit.java:201) > > at com.cgws.ccp.flink.sql.submit.SqlSubmit.run(SqlSubmit.java:126) at > > com.cgws.ccp.flink.sql.submit.SqlSubmit.main(SqlSubmit.java:84) at > > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at > > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) at > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > > ... 11 more Caused by: org.apache.flink.table.api.ValidationException: > > Table options do not contain an option key 'connector' for discovering a > > connector. at > > > org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321) > > at > > > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) > > ... 37 more > > > > > > 假如在执行DML语句时设置Hive方言,那么Kafka的表不是Hive语法,这个该怎么处理? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 在 2021-02-22 17:12:55,"eriendeng" 写道: > > >你这没有把dialect set成hive吧,走到了else分支。default > > >dialect是需要指定connector的,参考文档的kafka到hive代码 > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/hive_read_write.html#writing > > > > > > > > > > > >-- > > >Sent from: http://apache-flink.147419.n8.nabble.com/ > > > > > -- > Best regards! > Rui Li > -- Best regards! Rui Li
Re: Re: Flink SQL 写入Hive问题请教
你好, 用的flink是什么版本呢?另外这张hive表是创建在HiveCatalog里的么? On Tue, Feb 23, 2021 at 9:01 AM 邮件帮助中心 wrote: > 我增加调试日志后,发现执行DDL语句创建hive表时,设置了dialect 为hive,现在报错根据堆栈信息是在执行DML语句insert > into时创建Hive表时提示没有连接器的配置 > Table options are: 'is_generic'='false' > 'partition.time-extractor.timestamp-pattern'='$dt $hr' > 'sink.partition-commit.delay'='0S' > 'sink.partition-commit.policy.kind'='metastore,success-file' > 'sink.partition-commit.trigger'='partition-time' at > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) > at > org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at > scala.collection.AbstractIterable.foreach(Iterable.scala:54) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.AbstractTraversable.map(Traversable.scala:104) at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:789) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:691) > at > com.cgws.ccp.flink.sql.submit.SqlSubmit.callInsertInto(SqlSubmit.java:242) > at com.cgws.ccp.flink.sql.submit.SqlSubmit.callCommand(SqlSubmit.java:201) > at com.cgws.ccp.flink.sql.submit.SqlSubmit.run(SqlSubmit.java:126) at > com.cgws.ccp.flink.sql.submit.SqlSubmit.main(SqlSubmit.java:84) at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > ... 11 more Caused by: org.apache.flink.table.api.ValidationException: > Table options do not contain an option key 'connector' for discovering a > connector. at > org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321) > at > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) > ... 37 more > > > 假如在执行DML语句时设置Hive方言,那么Kafka的表不是Hive语法,这个该怎么处理? > > > > > > > > > > > > > > > > > > 在 2021-02-22 17:12:55,"eriendeng" 写道: > >你这没有把dialect set成hive吧,走到了else分支。default > >dialect是需要指定connector的,参考文档的kafka到hive代码 > > > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/hive_read_write.html#writing > > > > > > > >-- > >Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: Flink SQL temporal table join with Hive 报错
Hi, 那join的语句是怎么写的呢? On Mon, Feb 8, 2021 at 2:45 PM macia kk wrote: > 图就是哪个报错 > > 建表语句如下,表示公共表,我也没有改的权限. > > CREATE EXTERNAL TABLE `exchange_rate`(`grass_region` string COMMENT > 'country', `currency` string COMMENT 'currency', `exchange_rate` > decimal(25,10) COMMENT 'exchange rate') > PARTITIONED BY (`grass_date` date COMMENT 'partition key, -MM-dd') > ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io > .parquet.serde.ParquetHiveSerDe' > WITH SERDEPROPERTIES ( > 'serialization.format' = '1' > ) > > > Rui Li 于2021年2月8日周一 下午2:17写道: > > > 你好,图挂了,可以贴一下hive建表的DDL和join的语句是怎么写的么? > > > > On Mon, Feb 8, 2021 at 10:33 AM macia kk wrote: > > > > > Currently the join key in Temporal Table Join can not be empty. > > > > > > 我的 Hive 表 join DDL 没有设置 is not null ,但是都是有值的,还是会报这个错 > > > > > > [image: image.png] > > > > > > > > > -- > > Best regards! > > Rui Li > > > -- Best regards! Rui Li
Re: Flink SQL temporal table join with Hive 报错
你好,图挂了,可以贴一下hive建表的DDL和join的语句是怎么写的么? On Mon, Feb 8, 2021 at 10:33 AM macia kk wrote: > Currently the join key in Temporal Table Join can not be empty. > > 我的 Hive 表 join DDL 没有设置 is not null ,但是都是有值的,还是会报这个错 > > [image: image.png] > -- Best regards! Rui Li
Re: Flink sql 1.12写入hive报metastore失败
pache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65) > at > > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167) > at > > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) > at > > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676) > at > > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767) > at > > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666) > at > cn.wesure.stream.flink.sql.FlinkSqlEngine.process(FlinkSqlEngine.java:129) > at > cn.wesure.stream.flink.FlinkSqlEngineApp.main(FlinkSqlEngineApp.java:26) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:343) > at > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:213) > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) > at > > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:816) > at > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:248) > at > org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1058) > at > > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730) > at > > org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136) > > 1.12版本的lib目录如下: > -rw-r--r-- 1 dauser dauser 6322351 Jan 26 17:51 > flink-connector-hive_2.12-1.12.1.jar > -rw-r--r-- 1 dauser dauser 91745 Jan 10 08:26 flink-csv-1.12.1.jar > -rw-r--r-- 1 dauser dauser 105273329 Jan 10 08:29 > flink-dist_2.12-1.12.1.jar > -rw-r--r-- 1 dauser dauser137005 Jan 10 08:25 flink-json-1.12.1.jar > -rw-r--r-- 1 dauser dauser 7709741 Jul 29 15:33 > flink-shaded-zookeeper-3.4.14.jar > -rw-r--r-- 1 dauser dauser 34748023 Jan 10 08:28 > flink-table_2.12-1.12.1.jar > -rw-r--r-- 1 dauser dauser 3653 Jan 10 08:28 > flink-table-blink_2.12-1.12.1.jar > -rw-rw-r-- 1 dauser dauser 40603464 Jan 26 11:43 hive-exec-3.1.0.jar > -rw-rw-r-- 1 dauser dauser313702 Jan 26 17:43 libfb303-0.9.3.jar > -rw-r--r-- 1 dauser dauser290339 Jan 26 11:41 logback-classic-1.2.3.jar > -rw-r--r-- 1 dauser dauser471901 Jan 26 11:41 logback-core-1.2.3.jar > > 已配置kerberos认证,在1.11.1版本中能认证并提交成功到yarn上执行。 > 请大佬帮忙看下 > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
Hi, 估计是Hadoop跟hive的guava版本冲突,Hadoop-3.3依赖的版本是27 [1],hive-3.1.2依赖的版本是19 [2]。另外请注意hive-3.1.2依赖的Hadoop版本是3.1.0 [3],一般不建议runtime的Hadoop版本高于hive依赖的版本。 解决方案一是在hive-exec里对guava做relocation,这个需要自己手动给hive-exec重新打包。 另一个办法是降低Hadoop版本,这里不一定需要降低集群的Hadoop版本,而是仅仅降低flink和hive这边用到的Hadoop版本,相当于用老的Hadoop client去访问新的Hadoop server,这个小版本的兼容性一般来说是没问题的。 [1] https://issues.apache.org/jira/browse/HADOOP-16210 [2] https://github.com/apache/hive/blob/rel/release-3.1.2/pom.xml#L147 [3] https://github.com/apache/hive/blob/rel/release-3.1.2/pom.xml#L150 On Mon, Jan 25, 2021 at 2:12 PM yujianbo <15205029...@163.com> wrote: > 请教一下大佬后来如何解决,我的hadoop和hive版本跟您一致。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: FlinkSQL1.12查询hive表很快finished;No more splits available
Hi, 通过-l参数指定flink-sql-connector-hive-1.2.2 jar包是可以的,这个包里有hive-1.2.2的依赖(包含hive-common、hive-exec等),hive相关的依赖仅需要这一个包,不用额外添加orc或者Parquet的依赖了。 关于添加hive-site.xml,建议的方式是通过HiveCatalog的参数来指定 [1]。 从你之前发的stacktrace上来看,确实可能存在hive-common冲突,比如发生异常的ValidReadTxnList.readFromString(ValidReadTxnList.java:142),在hive-1.2.2版本中是没有这一行的 [2]。所以可能是你的classpath中有另外一个版本的hive-common。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/#connecting-to-hive [2] https://github.com/apache/hive/blob/rel/release-1.2.2/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java On Mon, Jan 25, 2021 at 1:44 PM 赵一旦 wrote: > 基于这个回答,还有另一个英文email中有个人说我的hive-common和hive-exec不一致的问题。 > > 我分析下来,我本身没依赖任何hive-common、hive-exec。唯一引入可能是flink-sql-connector-hive-1.2.2_2.11_1.12.0中的,我看了pom,其中orc的依赖部分是去除了hive的依赖的。然后我还单独引了一个flink-sql-orc的包。刚刚分析了下,本社flink-sql-orc这个包是为了引入orc相关依赖,而这个包中是没有去除orc的传递依赖hive的。我目前尝试了个方案,居然OK了,拿出来大家分析下能确定原因最好,不能就算了,反正问题估计是可以解决了我。 > > > 解决方式是直接不再依赖flink-sql-orc那个包。因为本身我按照官方文档加了flink-sql-connector-hive的包,同时我分析了这个包内已经shade了orc的包。因为并不需要单独搞一个flink-sql-orc的包。刚刚初步试了下,没问题。还没继续多实验。 > > > 赵一旦 于2021年1月25日周一 下午12:59写道: > > > 我hive版本应该是1.2.1,我看spark部分依赖的1.2.1的包。 > > 此外,关于hive配置。此处我需要问下,flink集群需要有hive的依赖嘛是?我flink集群本身没加任何hive依赖。 > > > > > 只是在flink的sql-client启动的时候通过-l参数指定了部分包,这个包是基于flink官网文档给的那个flink-sql-connector-hive-1.2.2。 > > > > > 此外,在flink-client端的conf中(即classpath中)加了hive-site.xml配置,内部也仅指定了最基础的一些关于metastore的连接信息。 > > > > Rui Li 于2021年1月25日周一 上午11:32写道: > > > >> 你好, > >> > >> 关于分区字段的filter,flink与hive的隐式类型转换规则不同,建议在写where条件时按照分区字段的类型来指定常量。 > >> 关于读ORC的异常,请问你的hive版本是多少呢?另外hive配置中是否指定过hive.txn.valid.txns参数? > >> > >> On Sun, Jan 24, 2021 at 6:45 AM 赵一旦 wrote: > >> > >> > 补充(1)FlinkSQL的查询,对于分区字符串字段貌似必须加'',不加就查询不到?如上hour=02这种直接导致no more > split。 > >> > 其次(2)去除这部分问题后,目前可以发现有split了,但是报了orc相关的错误。并且提交SQL会导致JM直接失败。JM日志如下: > >> > > >> > 2021-01-24 04:41:24,952 ERROR > >> > org.apache.flink.runtime.util.FatalExitExceptionHandler [] - > >> > FATAL: Thread 'flink-akka.actor.default-dispatcher-2' produced an > >> > uncaught exception. Stopping the process... > >> > > >> > java.util.concurrent.CompletionException: > >> > org.apache.flink.util.FlinkRuntimeException: Failed to start the > >> > operator coordinators > >> > at > >> > > >> > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) > >> > ~[?:1.8.0_251] > >> > at > >> > > >> > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) > >> > ~[?:1.8.0_251] > >> > at > >> > > >> > java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:722) > >> > ~[?:1.8.0_251] > >> > at > >> > > >> > java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731) > >> > ~[?:1.8.0_251] > >> > at > >> > > >> > java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023) > >> > ~[?:1.8.0_251] > >> > at > >> > > >> > org.apache.flink.runtime.jobmaster.JobMaster.resetAndStartScheduler(JobMaster.java:935) > >> > ~[flink-dists-extended_2.11-1.12.0.jar:?] > >> > at > >> > > >> > org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:801) > >> > ~[flink-dists-extended_2.11-1.12.0.jar:?] > >> > at > >> > > >> > org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:357) > >> > ~[flink-dists-extended_2.11-1.12.0.jar:?] > >> > at > >> > > >> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:383) > >> > ~[flink-dists-extended_2.11-1.12.0.jar:?] > >> > at > >> > > >> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) > >> > ~[flink-dists-extended_2.11-1.12.0.jar:?] > >> > at > >> > > >> > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88) > >> > ~[flink-dists-extended_2.11-1.12.0.jar:?] > >> > at > >> > > >> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) > >> > ~[flink-dists-extended_2.11-1.12.0.jar:?] > >>
Re: FlinkSQL1.12查询hive表很快finished;No more splits available
2.0.jar:?] > at > org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:567) > ~[flink-dists-extended_2.11-1.12.0.jar:?] > at > org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:944) > ~[flink-dists-extended_2.11-1.12.0.jar:?] > at > java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) > ~[?:1.8.0_251] > ... 27 more > Caused by: java.lang.ArrayIndexOutOfBoundsException: 1 > at > org.apache.hadoop.hive.common.ValidReadTxnList.readFromString(ValidReadTxnList.java:142) > ~[?:?] > at > org.apache.hadoop.hive.common.ValidReadTxnList.(ValidReadTxnList.java:57) > ~[?:?] > at org.apache.hadoop.hive.ql.io > .orc.OrcInputFormat$Context.(OrcInputFormat.java:421) > ~[?:?] > at org.apache.hadoop.hive.ql.io > .orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:983) > ~[?:?] > at org.apache.hadoop.hive.ql.io > .orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048) > ~[?:?] > at > org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:86) > ~[?:?] > at > org.apache.flink.connectors.hive.HiveSourceFileEnumerator.enumerateSplits(HiveSourceFileEnumerator.java:57) > ~[?:?] > at > org.apache.flink.connector.file.src.AbstractFileSource.createEnumerator(AbstractFileSource.java:140) > ~[flink-table_2.11-1.12.0.jar:1.12.0] > at > org.apache.flink.connectors.hive.HiveSource.createEnumerator(HiveSource.java:115) > ~[?:?] > at > org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:119) > ~[flink-dists-extended_2.11-1.12.0.jar:?] > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:308) > ~[flink-dists-extended_2.11-1.12.0.jar:?] > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:72) > ~[flink-dists-extended_2.11-1.12.0.jar:?] > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:182) > ~[flink-dists-extended_2.11-1.12.0.jar:?] > at > org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1094) > ~[flink-dists-extended_2.11-1.12.0.jar:?] > at > org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:567) > ~[flink-dists-extended_2.11-1.12.0.jar:?] > at > org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:944) > ~[flink-dists-extended_2.11-1.12.0.jar:?] > at > java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) > ~[?:1.8.0_251] > ... 27 more > 2021-01-24 04:41:24,963 INFO org.apache.flink.runtime.blob.BlobServer > [] - Stopped BLOB server at 0.0.0.0:13146 > -- Best regards! Rui Li
Re: flink 写hive decimal类型报错
你好,有设置过table.exec.hive.fallback-mapred-writer参数么?可以把它设置成true再试试。 On Wed, Jan 20, 2021 at 4:39 PM kandy.wang wrote: > java.lang.NoSuchMethodError: > org.apache.hadoop.hive.serde2.io.HiveDecimalWritable.serialize64(I)J > > at > org.apache.orc.impl.ColumnStatisticsImpl$Decimal64StatisticsImpl.updateDecimal(ColumnStatisticsImpl.java:1010) > > at > org.apache.orc.impl.writer.DecimalTreeWriter.writeBatch(DecimalTreeWriter.java:99) > > at > org.apache.orc.impl.writer.DecimalTreeWriter.writeBatch(DecimalTreeWriter.java:159) > > at > org.apache.orc.impl.writer.StructTreeWriter.writeRootBatch(StructTreeWriter.java:56) > > at org.apache.orc.impl.WriterImpl.addRowBatch(WriterImpl.java:557) > > at > org.apache.flink.orc.writer.OrcBulkWriter.addElement(OrcBulkWriter.java:58) > > at > org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:589) > > at > org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:585) > > at > org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48) > > at > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:209) > > at > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290) > > at > org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) > > at > org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.processElement(AbstractStreamingWriter.java:140) > > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) > > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) > > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) > > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > > at > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) > > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) > > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) > > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) > > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > > at StreamExecCalc$154.processElement(Unknown Source) > > 用的是flink-sql-connector-hive-2.3.6_2.11-1.12-SNAPSHOT.jar,公司的Hive也是这个版本,可能是什么原因导致? -- Best regards! Rui Li
Re: 回复:sql-client配置hive启动不了
Hi, 用table api的话可以设置flink的security参数来指定principal和keytab [1]。 SQL client的模式试一下启动前手动做kinit行不行吧 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#auth-with-external-systems On Fri, Jan 8, 2021 at 10:06 AM amenhub wrote: > 啊?确实是带kerberos的hms,那请问有其他的解决办法吗 > > > > > 发件人: 叶贤勋 > 发送时间: 2021-01-08 10:03 > 收件人: user-zh@flink.apache.org > 主题: 回复:sql-client配置hive启动不了 > HMS是不是带kerberos认证的? > 目前社区hive connector不支持访问Kerberos的HMS > > > > > 在2021年01月7日 18:39,amenhub 写道: > 在sql-client-defaults.yaml配置hive catalog信息后,无法通过sql-client.sh embedded启动sql > client > > 报错信息:https://imgchr.com/i/smQrlj > > flink 版本,1.12 > hive 版本,3.1.0 > > > > -- Best regards! Rui Li
Re: Flink SQL>查询的hive表数据全部为NULL
Hello, Flink和Hive版本是什么呢?ORC表的数据是否有过schema evolution? On Mon, Jan 4, 2021 at 9:19 AM Jacob <17691150...@163.com> wrote: > Dear All, > > Flink SQL>select * from table1; > > > 在Flink客户端查询hive数据,结果每个字段全为NULL,但数据条数是对的,select > count是正确的,查具体数据就是NULL,不知何故?同样的查询在hive客户端是可以查到数据的 > > > hive表时orc文件load的数据。 > > > > - > Thanks! > Jacob > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best regards! Rui Li
Re: Flink 操作hive 一些疑问
不太确定是不是能整合到一个job里,你可以试试看用StatementSet能否实现,比如添加两条INSERT语句,一条是写入hive,一条是从hive里查询数据把结果写到其他的表。 On Thu, Dec 24, 2020 at 4:35 PM Jacob <17691150...@163.com> wrote: > Hi, > > 谢谢回复 > > 对,也可以这样理解,总体分为两部分,先处理流消息,每隔15min写进hive表。然后再做mapreduce处理上步15min的数据。 > > 目前的现状是: > 第一步用flink处理,第二步是一个定时job去处理上一步的数据。 > > 改善计划: > > 想整合这两步,都使用flin处理,flink新版本对hive有支持,就不用再使用MapReduce了,现在就是不知道怎样平滑地在同一个Job中执行。 > > > > > - > Thanks! > Jacob > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best regards! Rui Li
Re: Flink catalog+hive问题
Hi, 你贴的是HDFS的权限控制,那应该就是基于storage的了。可以在HMS端开启验证,这样HiveCatalog去连接HMS的时候会生效。开启方式参考官网: https://cwiki.apache.org/confluence/display/Hive/Storage+Based+Authorization+in+the+Metastore+Server On Thu, Dec 24, 2020 at 2:14 PM 19916726683 <19916726...@163.com> wrote: > 可以参考下这个 > > https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsPermissionsGuide.html > 贴的代码是org.apache.hadoop.hive.io.HdfsUtils 的setFullFileStatus 方法 > Original Message > Sender:Rui lilirui.fu...@gmail.com > Recipient:user-zhuser...@flink.apache.org > Date:Thursday, Dec 24, 2020 11:33 > Subject:Re: Flink catalog+hive问题 > > > Hello, > 你贴的图看不到了。可以贴一下参考的官网链接。hive至少支持三种不同的authorization模式,flink目前对接hive时只有用storage > based authorization会生效。 On Thu, Dec 24, 2020 at 10:51 AM 19916726683 > 19916726...@163.com wrote: hive的官网有介绍ACL,如何继承权限关系。源码在Hive- HDFSUtils类中 > 核心代码应该是上面的这点。 Original Message *Sender:* Rui lilirui.fu...@gmail.com > *Recipient:* user-zhuser...@flink.apache.org *Date:* Wednesday, Dec 23, > 2020 19:41 *Subject:* Re: Flink catalog+hive问题 > hive的ACL用的是哪种呢?目前flink没有专门做ACL的对接,只有HMS端storage based authorization [1] > 会生效 [1] > https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Authorization#LanguageManualAuthorization-1StorageBasedAuthorizationintheMetastoreServer > On Wed, Dec 23, 2020 at 4:34 PM 19916726683 19916726...@163.com wrote: > spark是可以通过配置来确定是用hive的acl还是用自己的acl,不清楚flink是不是也是这种模式 Original > Message Sender:guaishushu1103@163.comguaishushu1...@163.com > Recipient:user-zhuser...@flink.apache.org Date:Wednesday, Dec 23, > 2020 15:53 Subject:Flink catalog+hive问题 在用flink > catalog+hive做元数据持久化的时候,发现hive的ACL权限没有起作用,麻烦问下知道的大佬,flink是会直接跳过hive的ACL权限吗? > guaishushu1...@163.com -- Best regards! Rui Li -- Best regards! > Rui Li -- Best regards! Rui Li
Re: Flink catalog+hive问题
Hello, 你贴的图看不到了。可以贴一下参考的官网链接。hive至少支持三种不同的authorization模式,flink目前对接hive时只有用storage based authorization会生效。 On Thu, Dec 24, 2020 at 10:51 AM 19916726683 <19916726...@163.com> wrote: > hive的官网有介绍ACL,如何继承权限关系。源码在Hive-> HDFSUtils类中 核心代码应该是上面的这点。 > > Original Message > *Sender:* Rui Li > *Recipient:* user-zh > *Date:* Wednesday, Dec 23, 2020 19:41 > *Subject:* Re: Flink catalog+hive问题 > > hive的ACL用的是哪种呢?目前flink没有专门做ACL的对接,只有HMS端storage based authorization [1] 会生效 > > [1]https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Authorization#LanguageManualAuthorization-1StorageBasedAuthorizationintheMetastoreServer > > On Wed, Dec 23, 2020 at 4:34 PM 19916726683 <19916726...@163.com> wrote: > > > spark是可以通过配置来确定是用hive的acl还是用自己的acl,不清楚flink是不是也是这种模式 > > > > > > Original Message > > Sender:guaishushu1103@163.comguaishushu1...@163.com > > Recipient:user-zhuser...@flink.apache.org > > Date:Wednesday, Dec 23, 2020 15:53 > > Subject:Flink catalog+hive问题 > > > > > > 在用flink > > catalog+hive做元数据持久化的时候,发现hive的ACL权限没有起作用,麻烦问下知道的大佬,flink是会直接跳过hive的ACL权限吗? > > guaishushu1...@163.com > > > > -- > Best regards! > Rui Li > > -- Best regards! Rui Li
Re: Flink 操作hive 一些疑问
Hi, 是说写数据的是一个流作业,读数据的是一个批作业么? On Tue, Dec 22, 2020 at 5:51 PM Jacob <17691150...@163.com> wrote: > Dear all, > > 我目前有个Flink job,执行完所以业务逻辑后生成了一些业务数据,然后将这些数据以ORC格式写到hdfs上,并调用hive api > 将orc文件load到Hive表,至此flink job的工作结束。 > > 后面,其他Java定时程序做Mapreduce,对上一步写进hive的数据进行后续操作。 > > 现在升级了Flink版本,Flink可以直接操作hive,不再依赖于Mapreduce。 > > 但这样一来,是不是需要两个flink job ,一个用来生成业务数据,一个用来操作hive 来处理这些业务数据 > > 因为两个job的执行环境不一样,如果不操作hive,是这样的操作环境 > > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > env.execute("my job"); > 如果操作hive,就需要构造这样的操作的环境 > > > EnvironmentSettings settings = > EnvironmentSettings.newInstance().inBatchMode().build(); > TableEnvironment tableEnv = TableEnvironment.create(settings); > .. > tableEnv.executeSql(hql); > > 有没有什么通用的方案,让这两个job合二为一呢?我想要的效果时,当生成完业务数据后,直接操作hive,取代mapreduce的工作。 > > > > - > Thanks! > Jacob > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best regards! Rui Li
Re: Re: flink1.11.2写hive分区表,hive识别不到分区
流数据写hive分区表是需要额外的参数配置的。Flink 1.11里具体的参数可以参考这两个文档: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html#streaming-writing https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#streaming-sink On Wed, Dec 23, 2020 at 9:17 AM kingdomad wrote: > 分区用的是记录中的字段,没有用到processing time或者event time去生成分区。 > 发现只要给hive的表加上以下这三个属性就可以马上提交分区到metastore了。 > 'sink.partition-commit.trigger'='process-time' > 'sink.partition-commit.delay'='0s' > 'sink.partition-commit.policy.kind'='metastore,success-file' > > > > > > > > > > > > > > -- > > kingdomad > > > > > > > > 在 2020-12-21 23:27:49,"赵一旦" 写道: > >即使不是flink写入,其他方式写入也需要这样做的哈。 > > > >r pp 于2020年12月21日周一 下午9:28写道: > > > >> 程序中,创建表后,执行命令。 > >> > >> kingdomad 于2020年12月21日周一 下午4:55写道: > >> > >> > > >> > flink1.11.2写hive3.12的分区表,flink新创建的分区数据hive无法识别,在hdfs上能看到写入了文件,但是hive读取不了分区。 > >> > 需要执行msck repair table修复分区表后,hive才能读取到数据。 > >> > 求助大佬,要如何解决。 > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > -- > >> > > >> > kingdomad > >> > > >> > > >> > -- Best regards! Rui Li
Re: flink 1.11.2 创建hive表的问题
只是日志里有异常信息还是说DDL会执行失败呢?另外可以贴一下日志里的异常堆栈,看看是哪里打出来的。 On Tue, Dec 22, 2020 at 2:41 PM 曹武 <14701319...@163.com> wrote: > 大佬好,我在使用create table if not > exists创建hive表时,对于已存在的hive表,在hive的日志中会抛出AlreadyExistsException(message:Table > bm_tsk_001 already exists异常,查看源码发现if not > exists貌似只是用于判断捕获异常后是否抛出,对于这个问题有建议的解决方案嘛? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best regards! Rui Li
Re: Flink catalog+hive问题
hive的ACL用的是哪种呢?目前flink没有专门做ACL的对接,只有HMS端storage based authorization [1] 会生效 [1] https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Authorization#LanguageManualAuthorization-1StorageBasedAuthorizationintheMetastoreServer On Wed, Dec 23, 2020 at 4:34 PM 19916726683 <19916726...@163.com> wrote: > spark是可以通过配置来确定是用hive的acl还是用自己的acl,不清楚flink是不是也是这种模式 > > > Original Message > Sender:guaishushu1103@163.comguaishushu1...@163.com > Recipient:user-zhuser...@flink.apache.org > Date:Wednesday, Dec 23, 2020 15:53 > Subject:Flink catalog+hive问题 > > > 在用flink > catalog+hive做元数据持久化的时候,发现hive的ACL权限没有起作用,麻烦问下知道的大佬,flink是会直接跳过hive的ACL权限吗? > guaishushu1...@163.com -- Best regards! Rui Li
Re: flink1.11.2写hive分区表,hive识别不到分区
具体是怎么写hive的呢? On Mon, Dec 21, 2020 at 11:28 PM 赵一旦 wrote: > 即使不是flink写入,其他方式写入也需要这样做的哈。 > > r pp 于2020年12月21日周一 下午9:28写道: > > > 程序中,创建表后,执行命令。 > > > > kingdomad 于2020年12月21日周一 下午4:55写道: > > > > > > > > flink1.11.2写hive3.12的分区表,flink新创建的分区数据hive无法识别,在hdfs上能看到写入了文件,但是hive读取不了分区。 > > > 需要执行msck repair table修复分区表后,hive才能读取到数据。 > > > 求助大佬,要如何解决。 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > kingdomad > > > > > > > > > -- Best regards! Rui Li
Re: 关于flink-sql 元数据问题
Hi, 调用tableEnv.executeSql("create table .")以后表就已经创建了,不需要再调用tableEnv.execute。execute方法已经deprecate,建议统一使用executeSql哈 On Fri, Dec 11, 2020 at 7:23 PM JasonLee <17610775...@163.com> wrote: > hi > Flink SQL 建的表支持用 hive 的 catalog 来管理元数据,是否可以满足你的需求 ? > > > > - > Best Wishes > JasonLee > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: 回复: 生产hive sql 迁移flink 11 引擎,碰到的问题
Hi, 我会尝试下让HiveModule支持低版本的hive: https://issues.apache.org/jira/browse/FLINK-20563 On Mon, Dec 7, 2020 at 5:01 PM 莫失莫忘 <565094...@qq.com> wrote: > 找到配置module的地方了。在 sql-client-defaults.yaml 配置 module 就可以了。但是我的hive是 Hive > 1.1.0-cdh5.13.1 版本的。不支持listBuiltInFunctions()。报错 Listing built in functions > are not supported until Hive 1.2.0。所以还是无法使用 hive的内置函数 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: flink 1.11.2写hive 2.1.1 orc 遇到的问题
Hello, 我试了一下用batch和streaming的方式写hive-2.1.1的orc表,batch模式没发现啥问题。在streaming模式下,如果把table.exec.hive.fallback-mapred-writer设为false时,会有依赖冲突,这是个已知问题,把这个参数设为true(默认值)应该可以避免。 另外我这边Hadoop的环境是2.7的,你的Hadoop是3.x么? On Fri, Dec 4, 2020 at 9:27 PM Rui Li wrote: > Hi, > > 现在CDC的数据是没办法直接对接hive的,目前流式数据写hive只能是insert-only的。 > > On Fri, Dec 4, 2020 at 10:56 AM yang xu <316481...@qq.com> wrote: > >> Hi >> 如果不支持ACID,那如果监听binlog日志的更新和删除操作需要另外写任务来处理么,如何才能做到真的批流统一 >> >> >> >> -- >> Sent from: http://apache-flink.147419.n8.nabble.com/ > > > > -- > Best regards! > Rui Li > -- Best regards! Rui Li
Re: 生产hive sql 迁移flink 11 引擎,碰到的问题
Hi, 目前加载HiveModule可以使用(大部分)hive内置函数,也能解决调用内置函数时的类型转换问题。不过更全面的语法兼容还需要等FLIP-152实现了才能支持,欢迎关注。 On Fri, Dec 4, 2020 at 8:44 PM Jark Wu wrote: > Hi, > > Flink SQL 1.11 暂时还不兼容 Hive SQL 语法。这个功能的设计,最近才在社区中讨论,预计1.13中支持。可以关注下这个 > design 的讨论: > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-152-Hive-Query-Syntax-Compatibility-td46928.html > > > Best, > Jark > > On Fri, 4 Dec 2020 at 11:45, 莫失莫忘 wrote: > > > 最近尝试把一个生产 hive sql 任务,执行引擎切换成 flink 1.11.2 ,发现flink 11 对hive > > SQL的支持有下列问题1、不支持 双引号 表示字符串 > > 2、不支持 != 表示不等运算 > > 3、不支持 类型隐式转换 > > 4、不支持 split 函数 > > 5、hive 不区分大小写,flink区分大小写 > > 6、join右表 不支持是一个子查询(Calcite bug > > https://issues.apache.org/jira/browse/CALCITE-2152) > > 7、不支持 create table table1 as select * from pokes; 中的 as > > > > > > > > 暂时只测到这些问题。总体感觉flink11 对 hive SQL的语句支持还不够,无法把已有离线 hive sql 任务直接 切换到flink > 引擎。 > -- Best regards! Rui Li
Re: flink 1.11.2写hive 2.1.1 orc 遇到的问题
Hi, 现在CDC的数据是没办法直接对接hive的,目前流式数据写hive只能是insert-only的。 On Fri, Dec 4, 2020 at 10:56 AM yang xu <316481...@qq.com> wrote: > Hi > 如果不支持ACID,那如果监听binlog日志的更新和删除操作需要另外写任务来处理么,如何才能做到真的批流统一 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best regards! Rui Li
Re: flink 1.11.2写hive 2.1.1 orc 遇到的问题
Hi, 我会找个hive 2.1.1的环境来复现一下这个问题。不过首先要说明的是,目前flink不支持hive的ACID表,即使你这个例子的数据写成功了也不满足ACID的语义,在hive那边可能也读不了。 On Thu, Dec 3, 2020 at 5:23 PM yang xu <316481...@qq.com> wrote: > Hi Rui Li > lib 下包如下: > flink-csv-1.11.2.jar > flink-dist_2.11-1.11.2.jar > flink-json-1.11.2.jar > flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar > flink-shaded-zookeeper-3.4.14.jar > flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar > flink-table_2.11-1.11.2.jar > flink-table-api-java-bridge_2.11-1.11.2.jar > flink-table-blink_2.11-1.11.2.jar > flink-table-planner-blink_2.11-1.11.2.jar > log4j-1.2-api-2.12.1.jar > log4j-api-2.12.1.jar > log4j-core-2.12.1.jar > log4j-slf4j-impl-2.12.1.jar > > 写hive的语句就是简单的insert: > insert into hive_t1 SELECT address FROM users > > 另外建表语句如下: > create table hive_t1(address string) > clustered by (address) into 8 buckets > stored as orc TBLPROPERTIES ('transactional'='true','orc.compress' = > 'SNAPPY'); > > 非常感谢你的解答! > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: 为什么要关闭calcite的隐式转换功能
Hi, 我理解可能是calcite隐式类型转换功能还比较新,暂时没有启用。不过即使开启 了跟hive的隐式转换逻辑也不一定完全一致,比如某些hive允许的转换calcite可能不允许。目前社区也在做hive语法兼容的工作,这个功能有了以后迁移hive任务会更容易。 On Wed, Dec 2, 2020 at 6:43 PM tangshiwei wrote: > 目前flink sql,flink hive > sql都不支持隐式转换功能。我们在调试的时候发现其实calcite本身是支持的。但是flink这边强制关闭了。而hive本身是支持隐式转换的。这导致我们的hive任务无法迁移到flink上执行。请问关闭的原因是什么?如果我们这边开启会带来什么问题吗? -- Best regards! Rui Li
Re: flink 1.11.2写hive 2.1.1 orc 遇到的问题
Hi, 你的flink lib下都添加了哪些依赖呢,另外出问题的SQL是怎么写的? On Thu, Dec 3, 2020 at 4:15 PM yang xu <316481...@qq.com> wrote: > flink 版本1.11.2 > hive 版本2.1.1 基于cdh 6.2.1 > 写普通表或parquet没问题,写orc报如下错误: > < > http://apache-flink.147419.n8.nabble.com/file/t1150/flink_hive%E5%8C%85%E5%86%B2%E7%AA%81.png> > > > 也看到其它邮件列表说修改: > flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar > OrcFile: > WriterVersion CURRENT_WRITER = WriterVersion.HIVE_13083 > 重新编译即可,但是这样尝试之后还是报同样的错误,是Hive必须升级到3.x版本么? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: flink使用hive udf函数
Hi, 这是一个已知问题 [1][2],新版本中我们只是简单的把这几个函数在hive module里禁掉了 [3],建议先用flink的函数来绕一下。 [1] https://issues.apache.org/jira/browse/FLINK-16688 [2] https://issues.apache.org/jira/browse/FLINK-16618 [3] https://issues.apache.org/jira/browse/FLINK-18995 On Tue, Nov 24, 2020 at 11:54 AM 酷酷的浑蛋 wrote: > Flink-1.11.1, hive-2.2.0 > 在使用current_timestamp或者current_date函数时会报 > Caused by: java.lang.NullPointerException > at > org.apache.hadoop.hive.ql.udf.generic.GenericUDFCurrentTimestamp.initialize(GenericUDFCurrentTimestamp.java:51) > at > org.apache.hadoop.hive.ql.udf.generic.GenericUDF.initializeAndFoldConstants(GenericUDF.java:141) > > > > -- Best regards! Rui Li
Re: 关于Catalog的建议
Hi, FlinkSQL允许一个Session使用多个Catalog,所以Catalog的选择不是互斥的,可以混用。 关于你的两个问题: 1. 我理解JDBC Catalog主要是为了方便用户查询JDBC的表,目前的实现应该基本是个只读的Catalog [1],文档也许是可以说的更明确一些。 2. 我觉得要实现一个完整的、生产可用的元数据管理系统都不会太“简单”,能读写schema只是最基础的要求,是否支持并发访问、如何支持HA、如何保证元数据安全都是需要考虑的问题。而hive metastore已经有比较多的人在用了,所以借助它来持久化元数据是个性价比比较高的选择。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#postgres-database-as-a-catalog On Mon, Nov 23, 2020 at 7:41 PM 赵一旦 wrote: > 目前Flink提供memory、jdbc、hive这3种catalog。 > 感觉实际使用中,可以使用如下几种方案。 > > (1)选择memory catalog,然后每次sql都带上自己的相关DDL。 > (2)选择某种catalog,支持“持久化”DDL定义,然后具体sql就不需要带上自己相关的DDL了。 > > 方案1和方案2各有优缺点。 > 方案1的优点: > 比如sql1和sql2都只针kafka topic的部分时间段范围,这种情况某个kafka > > topic就不方便写死DDL(持久化),而应该每个SQL自带一个定义。(当然,使用方案2也是可以基于options的覆盖方式简化sql1和sql2自带DDL定义的语句的) > 方案1的缺点: > 很明显,不支持“持久化”本身就是缺点,这也是方案2的优点。 > > -然后,我的问题来了。 > > 在Flink文档中,HiveCatalog写了其作用是作为flink表元数据,同时也是作为读取hive表元数据的接口。而在JdbcCatalog中没写其支持的表类型(Connect类型)。 > 问题1(如上)没针对每个catalog写清楚其支持的connector类型,即表类型。 > > > 问题2:能否提供一个更简单方便的支持持久化,且支持所有connector类型的catalog的实现。“简单”指的是比如通过Mysql/PostgreSQL什么的,再或者直接json文件作为存储都可以。“持久化”即可以持久化。 > > > 当然,考虑到hive这种元数据使用其他存储可能需要额外复杂的转化,我感觉至少应该搞个相对通用的catalog,比如支持(mysql表,kafka表(kafka元数据很简单,用mysql啥的肯定能存储吧),...)。 > -- Best regards! Rui Li
Re: Flink 读取Hive,报错 :Could not read the user code wrapper: invalid type code: 9C。
eam.java:422) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) > at > org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) > at > org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288) > ... 23 more > > End of exception on server side>] > at > org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390) > at > org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374) > at > java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) > at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) > ... 4 more -- Best regards! Rui Li
Re: flink-1.11 写 hive 报错
at > > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) > at > > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) > at > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) > at > > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > at > > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > at > > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) > Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: > Failed to create Hive RecordWriter > at > > org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:58) > at > > org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:151) > ... 40 more > Caused by: java.lang.reflect.InvocationTargetException > at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown Source) > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > > org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:55) > ... 41 more > Caused by: java.lang.NoSuchFieldError: IGNORE_CLIENT_LOCALITY > at > org.apache.hadoop.hdfs.DFSOutputStream.(DFSOutputStream.java:204) > at > org.apache.hadoop.hdfs.DFSOutputStream.(DFSOutputStream.java:247) > at > > org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:313) > at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1182) > at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1161) > at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1099) > at > > org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:464) > at > > org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:461) > at > > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > > org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:475) > at > > org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:402) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1103) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1083) > at > > org.apache.parquet.hadoop.ParquetFileWriter.(ParquetFileWriter.java:218) > at > > org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:312) > at > > org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:288) > at > org.apache.hadoop.hive.ql.io > .parquet.write.ParquetRecordWriterWrapper.(ParquetRecordWriterWrapper.java:67) > at > org.apache.hadoop.hive.ql.io > .parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:126) > at > org.apache.hadoop.hive.ql.io > .parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:115) > at > org.apache.hadoop.hive.ql.io > .HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:284) > ... 45 more > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: flink mysql cdc + hive streaming疑问
.mode' = 'earliest-offset',\n" + > >>>>> " 'properties.bootstrap.servers' = > >>>>> 'localhost:9092',\n" + > >>>>> " 'format' = 'changelog-json'\n" + > >>>>> ")"); > >>>>> > >>>>> tableEnv.executeSql("INSERT INTO kafka.team \n" + > >>>>> "SELECT team_id, team_name, create_time, update_time > >>>>> \n" + > >>>>> "FROM cdc.team"); > >>>>> > >>>>> // 定义带op字段的stream > >>>>> Properties properties = new Properties(); > >>>>> properties.setProperty("bootstrap.servers", > "localhost:9092"); > >>>>> properties.setProperty("group.id", "test"); > >>>>> > >>>>> FlinkKafkaConsumerBase consumer = new > >>>>> FlinkKafkaConsumer<>( > >>>>> "team", > >>>>> new SimpleStringSchema(), > >>>>> properties > >>>>> ).setStartFromEarliest(); > >>>>> > >>>>> DataStream ds = streamEnv.addSource(consumer); > >>>>> > >>>>> String[] fieldNames = {"team_id", "team_name", "create_time", > >>>>> "update_time", "op"}; > >>>>> TypeInformation[] types = {Types.INT, Types.STRING, > >>>>> Types.STRING, Types.STRING, Types.STRING}; > >>>>> DataStream ds2 = ds.map(str -> { > >>>>> JSONObject jsonObject = JSON.parseObject(str); > >>>>> String op = jsonObject.getString("op"); > >>>>> JSONObject data = jsonObject.getJSONObject("data"); > >>>>> int arity = fieldNames.length; > >>>>> Row row = new Row(arity); > >>>>> row.setField(0, data.get("team_id")); > >>>>> row.setField(1, data.get("team_name")); > >>>>> row.setField(2, data.get("create_time")); > >>>>> row.setField(3, data.get("update_time")); > >>>>> String operation = getOperation(op); > >>>>> row.setField(4, operation); > >>>>> > >>>>> return row; > >>>>> }, new RowTypeInfo(types, fieldNames)); > >>>>> > >>>>> tableEnv.registerDataStream("merged_team", ds2); > >>>>> > >>>>> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > >>>>> > >>>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods"); > >>>>> tableEnv.executeSql("DROP TABLE IF EXISTS ods.team"); > >>>>> > >>>>> tableEnv.executeSql("CREATE TABLE ods.team (\n" + > >>>>> " team_id INT,\n" + > >>>>> " team_name STRING,\n" + > >>>>> " create_time STRING,\n" + > >>>>> " update_time STRING,\n" + > >>>>> " op STRING\n" + > >>>>> //") PARTITIONED BY (\n" + > >>>>> //"ts_date STRING,\n" + > >>>>> //"ts_hour STRING,\n" + > >>>>> //"ts_minute STRING\n" + > >>>>> ") STORED AS PARQUET TBLPROPERTIES (\n" + > >>>>> " 'sink.partition-commit.trigger' = > >>>>> 'partition-time',\n" + > >>>>> " 'sink.partition-commit.delay' = '1 min',\n" + > >>>>> " 'sink.partition-commit.policy.kind' = > >>>>> 'metastore,success-file',\n" + > >>>>> " 'partition.time-extractor.timestamp-pattern' = > >>>>> '$ts_date $ts_hour:$ts_minute:00'\n" + > >>>>> ")"); > >>>>> > >>>>> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > >>>>> tableEnv.executeSql("INSERT INTO ods.team \n" + > >>>>> "SELECT team_id, team_name, create_time, update_time, > >>>>> op \n" + > >>>>> //" DATE_FORMAT(TO_TIMESTAMP(create_time, '-MM-dd > >>>>> HH:mm:ss'), 'MMdd') as ts_date, \n" + > >>>>> //" DATE_FORMAT(TO_TIMESTAMP(create_time, '-MM-dd > >>>>> HH:mm:ss'), 'HH') as ts_hour, \n" + > >>>>> //" DATE_FORMAT(TO_TIMESTAMP(create_time, '-MM-dd > >>>>> HH:mm:ss'), 'mm') as ts_minute \n" + > >>>>> "FROM merged_team"); > >>>>> tableEnv.execute("MysqlCDC2Hive2"); > >>>>> > >>>>> streamEnv.execute(""); > >>>>> } > >>>>> > >>>>> private static String getOperation(String op) { > >>>>> String operation = "INSERT"; > >>>>> for (RowKind rk : RowKind.values()) { > >>>>> if (rk.shortString().equals(op)) { > >>>>> switch (rk) { > >>>>> case UPDATE_BEFORE: > >>>>> case UPDATE_AFTER: > >>>>> operation = "UPDATE"; > >>>>> break; > >>>>> case DELETE: > >>>>> operation = "DELETE"; > >>>>> break; > >>>>> case INSERT: > >>>>> default: > >>>>> operation = "INSERT"; > >>>>> break; > >>>>> } > >>>>> break; > >>>>> } > >>>>> } > >>>>> return operation; > >>>>> } > >>>>> } > >>>>> > >>>>> Jark Wu 于2020年10月31日周六 下午1:45写道: > >>>>> > >>>>>> 1. 是的。目前 Hive不支持直接消费 changlog ,这个主要原因是 hive 对 cdc 的支持不是很好。即使是 hive > >>>>>> ACID/transaction 功能,由于其与其他计算引擎集成的不好,也鲜有人用。 > >>>>>> > >>>>>> 2. cdc -> kafka -> hive streaming 的方案是可行的,不过 kafka -> hive streaming > >>>>>> 相当于原始数据同步,到 hive 中仍然是 cdc logs 内容,并没有实时合并,需要用户自己写 query 在 hive > >>>>>> 中进行合并。merge过程可以参考这篇文章[1]。 > >>>>>> > >>>>>> 3. 你可以 ts + INTERVAL '8' HOUR > >>>>>> > >>>>>> PS: 在1.12中,我们计划让 hive 也能直接写 changelog 数据,这样的话 cdc 可以直接 -> hive > >>>>>> streaming,不需要中间的 kafka。 不过到了 hive 中后,仍然需要另外写 query 将数据做实时merge。 > >>>>>> > >>>>>> Best, > >>>>>> Jark > >>>>>> > >>>>>> On Sat, 31 Oct 2020 at 13:26, 罗显宴 <15927482...@163.com> wrote: > >>>>>> > >>>>>>> hive3可以hive2不可以,换了kafka也没用吧,hive3之前一般都不支持数据仓库的更改。不知道回答的对不对,欢迎指正。 > >>>>>>> > >>>>>>> > >>>>>>> | | > >>>>>>> 罗显宴 > >>>>>>> | > >>>>>>> | > >>>>>>> 邮箱:15927482...@163.com > >>>>>>> | > >>>>>>> > >>>>>>> 签名由 网易邮箱大师 定制 > >>>>>>> > >>>>>>> 在2020年10月31日 12:06,陈帅 写道: > >>>>>>> 我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛 > >>>>>>> > >>>>>>> Exception in thread "main" > org.apache.flink.table.api.TableException: > >>>>>>> AppendStreamTableSink doesn't support consuming update and delete > >>>>>>> changes > >>>>>>> which is produced by node TableSourceScan(table=[[hive_catalog, > cdc, > >>>>>>> team]], fields=[team_id, team_name, create_time, update_time]) > >>>>>>> > >>>>>>> 我的问题: > >>>>>>> 1. 是不是因为hive2不支持delete/update,如果换hive 3能否支持呢? > >>>>>>> 2. 如果要支持这种场景是不是中间需要加一层kafka介质(通过 changelog-json 格式),即cdc -> > >>>>>>> kafka,然后kafka > >>>>>>> -> hive streaming? 谢谢! > >>>>>>> 3. DATE_FORMAT函数出来的时间是UTC的,怎么转成GMT+8,只能通过UDF么? > >>>>>>> > >>>>>>> sql语句如下 > >>>>>>> > >>>>>>> CREATE DATABASE IF NOT EXISTS cdc > >>>>>>> > >>>>>>> DROP TABLE IF EXISTS cdc.team > >>>>>>> > >>>>>>> CREATE TABLE team( > >>>>>>>team_id BIGINT, > >>>>>>>team_name STRING, > >>>>>>>create_time TIMESTAMP, > >>>>>>>update_time TIMESTAMP, > >>>>>>> proctime as proctime() > >>>>>>> ) WITH ( > >>>>>>> 'connector' = 'mysql-cdc', > >>>>>>> 'hostname' = 'localhost', > >>>>>>> 'port' = '3306', > >>>>>>> 'username' = 'root', > >>>>>>> 'password' = 'root', > >>>>>>> 'database-name' = 'test', > >>>>>>> 'table-name' = 'team' > >>>>>>> ) > >>>>>>> > >>>>>>> CREATE DATABASE IF NOT EXISTS ods > >>>>>>> > >>>>>>> DROP TABLE IF EXISTS ods.team > >>>>>>> > >>>>>>> CREATE TABLE ods.team ( > >>>>>>> team_id BIGINT, > >>>>>>> team_name STRING, > >>>>>>> create_time TIMESTAMP, > >>>>>>> update_time TIMESTAMP, > >>>>>>> ) PARTITIONED BY ( > >>>>>>> ts_date STRING, > >>>>>>> ts_hour STRING, > >>>>>>> ts_minute STRING, > >>>>>>> ) STORED AS PARQUET TBLPROPERTIES ( > >>>>>>> 'sink.partition-commit.trigger' = 'partition-time', > >>>>>>> 'sink.partition-commit.delay' = '1 min', > >>>>>>> 'sink.partition-commit.policy.kind' = 'metastore,success-file', > >>>>>>> 'partition.time-extractor.timestamp-pattern' = '$ts_date > >>>>>>> $ts_hour:$ts_minute:00' > >>>>>>> ) > >>>>>>> > >>>>>>> INSERT INTO ods.team > >>>>>>> SELECT team_id, team_name, create_time, update_time, > >>>>>>> my_date_format(create_time,'-MM-dd', 'Asia/Shanghai'), > >>>>>>> my_date_format(create_time,'HH', 'Asia/Shanghai'), > >>>>>>> my_date_format(create_time,'mm', 'Asia/Shanghai') > >>>>>>> FROM cdc.team > >>>>>>> > >>>>>> > -- Best regards! Rui Li
Re: sql-client 连接hive报错 TTransportException
gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:264) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:624) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:183) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:136) > at > org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859) > ... 3 more > Caused by: org.apache.thrift.transport.TTransportException > at > org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) > at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86) > at > org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429) > at > org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318) > at > org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219) > at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77) > at > org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_database(ThriftHiveMetastore.java:1135) > at > org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_database(ThriftHiveMetastore.java:1122) > at > org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getDatabase(HiveMetaStoreClient.java:1511) > at > org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getDatabase(HiveMetaStoreClient.java:1506) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208) > at com.sun.proxy.$Proxy28.getDatabase(Unknown Source) > at > org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.getDatabase(HiveMetastoreClientWrapper.java:107) > at > org.apache.flink.table.catalog.hive.HiveCatalog.databaseExists(HiveCatalog.java:330) > ... 15 more > > > 谢谢! > > > > -- Best regards! Rui Li
Re: flink1.11实时写入hive,写入速度很慢,checkpoint为60秒,并行度为1
Hi, 实时写hive有一个已知的性能问题:https://issues.apache.org/jira/browse/FLINK-19121 建议打一个这个patch再试试。 On Tue, Sep 29, 2020 at 7:12 PM Jun Zhang <825875...@qq.com> wrote: > 你的kafka的分区数是多少,把flink的并行度加大到kafka的分区数。 > > > > Best Jun > > > -- 原始邮件 -- > 发件人: me 发送时间: 2020年9月29日 19:08 > 收件人: user-zh 主题: 回复:flink1.11实时写入hive,写入速度很慢,checkpoint为60秒,并行度为1 > > > > flink1.11实时写入hive,写入速度很慢,checkpoint为60秒,并行度为1 > tableEnv.executeSql("insert into dwd_security_log select * from " + table) > > 实际写入hive之后,查看hdfs上写入的文件为19M,这是60秒内写入hive的,flink流式写入hive通过checkpotin来把数据刷入hive中。 > > > 请问大家只有有什么提升写入速度的参数或者方式吗? -- Best regards! Rui Li
Re: [flink-1.11] 读kafka写hive,IDEA运行成功,打成jar包,提交到yarn运行报错
Hi, 可以检查一下是不是缺少了kafka connector的依赖,还有一种可能是SPI的service文件被覆盖了,这种情况的话可以用maven shade plugin的ServicesResourceTransformer来合并不同jar里的service文件 On Thu, Sep 24, 2020 at 7:17 PM nashcen <2415370...@qq.com> wrote: > > 代码在IDEA运行成功,打成jar包,提交到yarn运行报错。一开始以为是少包,后来把所有依赖包都打了进来,全局dependency.scope设为compile,依然报错。 > > 启动命令: > nohup \ > $FLINK_HOME/bin/flink run \ > --class > > com.athub.dcpoints.scala.connector.table.hive.OdsDcpointsProdKafkaFlinkHiveApp > \ > --target yarn-per-job \ > --jobmanager yarn-cluster \ > --yarnjobManagerMemory 1024m \ > --yarntaskManagerMemory 4096m \ > --parallelism 4 \ > /bigdata/athub/deploy/kafka-flink-hive-1.0.jar \ > >/dev/null 2>/bigdata/athub/deploy/kafka-flink-hive-err.log & > > > 报错日志: > > The program finished with the following exception: > > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error: Unable to create a source for reading table > 'hive_catalog.dc_ods.ods_dcpoints_prod_kafka_source'. > > Table options are: > > 'connector'='kafka' > 'format'='json' > 'is_generic'='true' > 'json.fail-on-missing-field'='false' > 'json.ignore-parse-errors'='true' > > 'properties.bootstrap.servers'='prod-bigdata-03:9092,prod-bigdata-04:9092,prod-bigdata-05:9092,prod-bigdata-06:9092' > 'scan.startup.mode'='earliest-offset' > 'topic'='ods_dcpoints_prod' > at > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) > at > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) > at > > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) > at > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) > at > > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) > at > > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) > at > > org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) > Caused by: org.apache.flink.table.api.ValidationException: Unable to create > a source for reading table > 'hive_catalog.dc_ods.ods_dcpoints_prod_kafka_source'. > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: [flink-1.11] 读 kafka 写 hive,partition-time 抽取值不准确
不好意思,我本地试了一下flink的FROM_UNIXTIME也是用的系统时区。你贴一下hive表的DDL和INSERT语句,我去试一下吧 On Fri, Sep 25, 2020 at 1:58 PM Rui Li wrote: > 这应该是时区差异导致的,flink的FROM_UNIXTIME用的是UTC时区,hive的FROM_UNIXTIME用的是系统时区。 > > On Thu, Sep 24, 2020 at 4:16 PM nashcen <2415370...@qq.com> wrote: > >> Kafka 表 定义如下: >> CREATE TABLE `dc_ods`.`ods_dcpoints_prod_kafka_source` ( >> `assetSpecId` STRING, >> `dcnum` STRING, >> `monitorType` STRING, >> `tagNo` STRING, >> `value` STRING, >> * `updateTime` BIGINT, >> `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(`updateTime` / >> 1000,'-MM-dd >> HH:mm:ss')),* >> WATERMARK FOR `eventTime` AS `eventTime` - INTERVAL '5' SECOND >> ) WITH ( >> 'connector' = 'kafka', >> 'topic' = 'ods_dcpoints_prod', >> 'properties.bootstrap.servers' = 'prod-bigdata-03:9092', >> 'scan.startup.mode' = 'earliest-offset', >> 'format' = 'json', >> 'json.fail-on-missing-field' = 'false', >> 'json.ignore-parse-errors' = 'true' >> ) >> >> 从已经插入到 hive的数据来看,有一条update_time=1600218000292,对应event_time的值,为 >> 2020-09-16 01:00:00.0 >> <http://apache-flink.147419.n8.nabble.com/file/t817/1600935127%281%29.png> >> >> >> 但是从 hive 查询 FROM_UNIXTIME(cast(1600218000292/1000 as int),'-MM-dd >> HH:mm:ss') 的值,为 >> 2020-09-16 09:00:00 >> <http://apache-flink.147419.n8.nabble.com/file/t817/1600935330%281%29.png> >> >> >> 两者对不上,这是什么原因? >> >> >> >> >> >> >> -- >> Sent from: http://apache-flink.147419.n8.nabble.com/ >> > > > -- > Best regards! > Rui Li > -- Best regards! Rui Li
Re: [flink-1.11] 读 kafka 写 hive,partition-time 抽取值不准确
这应该是时区差异导致的,flink的FROM_UNIXTIME用的是UTC时区,hive的FROM_UNIXTIME用的是系统时区。 On Thu, Sep 24, 2020 at 4:16 PM nashcen <2415370...@qq.com> wrote: > Kafka 表 定义如下: > CREATE TABLE `dc_ods`.`ods_dcpoints_prod_kafka_source` ( > `assetSpecId` STRING, > `dcnum` STRING, > `monitorType` STRING, > `tagNo` STRING, > `value` STRING, > * `updateTime` BIGINT, > `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(`updateTime` / 1000,'-MM-dd > HH:mm:ss')),* > WATERMARK FOR `eventTime` AS `eventTime` - INTERVAL '5' SECOND > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'ods_dcpoints_prod', > 'properties.bootstrap.servers' = 'prod-bigdata-03:9092', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'json', > 'json.fail-on-missing-field' = 'false', > 'json.ignore-parse-errors' = 'true' > ) > > 从已经插入到 hive的数据来看,有一条update_time=1600218000292,对应event_time的值,为 > 2020-09-16 01:00:00.0 > <http://apache-flink.147419.n8.nabble.com/file/t817/1600935127%281%29.png> > > > 但是从 hive 查询 FROM_UNIXTIME(cast(1600218000292/1000 as int),'-MM-dd > HH:mm:ss') 的值,为 > 2020-09-16 09:00:00 > <http://apache-flink.147419.n8.nabble.com/file/t817/1600935330%281%29.png> > > > 两者对不上,这是什么原因? > > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: Flink-1.11 sql-client yaml 配置问题
你好,这个感觉是缺少hive connector的依赖,lib下面添加了哪些jar呢? On Thu, Sep 24, 2020 at 11:00 AM nashcen <2415370...@qq.com> wrote: > 准备通过 命令行工具 $FLINK_HOME/bin/sql-client.sh embedded > 登录 Flink SQL 客户端 去连接 Hive, > > > 我在 Flink-SQL 的配置文件 sql-client-defaults.yaml 里, > 加入了以下参数 > catalogs: > - name: myhive > type: hive > hive-conf-dir: > /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive/conf > default-database: dc_stg > > 启动报错,以下是报错信息: > > Reading default environment from: > > file:/bigdata/athub/app/bigdata/flink/flink-1.11.1/conf/sql-client-defaults.yaml > No session environment specified. > > > Exception in thread "main" > org.apache.flink.table.client.SqlClientException: > Unexpected exception. This is a bug. Please consider filing an issue. > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213) > Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: > Could not create execution context. > at > > org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:870) > at > > org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227) > at > org.apache.flink.table.client.SqlClient.start(SqlClient.java:108) > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: > Could > not find a suitable table factory for > 'org.apache.flink.table.factories.CatalogFactory' in > the classpath. > > Reason: Required context properties mismatch. > > The following properties are requested: > default-database=dc_stg > > hive-conf-dir=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive/conf > type=hive > > The following factories have been considered: > org.apache.flink.table.catalog.GenericInMemoryCatalogFactory > at > > org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) > at > > org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) > at > > org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) > at > > org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113) > at > > org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:377) > at > > org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$5(ExecutionContext.java:626) > at java.util.HashMap.forEach(HashMap.java:1289) > at > > org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:625) > at > > org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:264) > at > > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:624) > at > > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523) > at > > org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:183) > at > > org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:136) > at > > org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859) > ... 3 more > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: kafka增加字段,hive表如何处理
Hi, 直接给hive表增加字段遇到的具体问题是什么呢?把stacktrace贴一下吧。 On Wed, Sep 23, 2020 at 6:50 PM china_tao wrote: > flink1.11.1,flink sql,已经实现flink sql > 读取kafka,存储到hive。现在的问题是,kafka源增加字段了,flink > sql中的hive如何修改。直接在hive中增加字段的话,每次启动,会报 hive表已经存在,如果drop table if > exists的话,历史数据就会丢。请问大家是如何处理的,谢谢。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best regards! Rui Li
Re: 任务提交中使用了hive的udf时的疑问
Hi, 这种模式目前还不支持,因为现在没有动态添加jar的功能,所以需要事先把依赖的jar准备好。 On Mon, Sep 21, 2020 at 9:47 AM Husky Zeng <568793...@qq.com> wrote: > Hi all, > > 在提交的flink任务中使用了hive的udf时,任务的运行会依赖一些udf相关的jar包,配置文件等资源。 > > > 在我们的生产环境中,这些udf相关的jar包,配置文件都由hive的metaStore统一管理着,因此,flink在同hive交互时,是可以拿到这些文件的远程存储路径的(hdfs路径)。 > > > 我们希望flink任务在提交时能够只提交这些从hive中得到的文件路径,而不必传输这些文件(在flink外围去做这样一个查询hive然后下载文件的事情,在生产环境中相当于多了一个步骤,带来了不必要的风险,因此希望能够在flink任务运行时自动完成这些事情)。在这样的方案里,flink任务会在运行时根据路径从hdfs下载相关jar包和配置文件。 > > 从代码里可以看到 ,FunctionInfo 这个对象里已经有了resources的相关路径。但是看上去并没有合理的利用它。 > > > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java#L80 > > 请问你们有没有什么办法,可以在提交任务时,不提交udf相关的资源文件?或者对于这样的方案,有没有开发计划? > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best regards! Rui Li
Re: Flink-1.11.1 读写 Hive 问题
Hi, 打印数组可以用Arrays.toString方法。获取hive表失败的堆栈是完整的么,感觉底下应该还有root cause? On Fri, Sep 18, 2020 at 3:32 PM nashcen <2415370...@qq.com> wrote: > Hello > > 各位,经过3天的踩坑,我已经能够从IDEA,用Flink访问Hive,但是如何查看Hive里的数据库、表、以及表里的数据,并把它们打印出来,目前还不太清楚。 > 以下是Hive中的表, > <http://apache-flink.147419.n8.nabble.com/file/t817/1600410319%281%29.jpg> > > Idea中查询出来的库与表信息,不完整 > <http://apache-flink.147419.n8.nabble.com/file/t817/1600410478.jpg> > Idea中查询表中数据,报错 > <http://apache-flink.147419.n8.nabble.com/file/t817/1600414228%281%29.jpg> > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: flink hive批量作业报FileNotFoundException
Hello, 作业的逻辑大概是啥样的,我去试试 On Thu, Sep 17, 2020 at 10:00 PM godfrey he wrote: > cc @Rui Li > > 李佳宸 于2020年9月14日周一 下午5:11写道: > >> 大家好~我执行batch table的作业写入hive时,会出现FileNotFoundException的错误。找不到.staging文件 >> 版本是1.11.1 >> Caused by: java.io.FileNotFoundException: File >> >> hdfs://gykjcluster/user/hive/warehouse/etl_et_flink_sink.db/ods_et_es_financialestimate/.staging_1600070419144 >> does not exist. >> at >> >> org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1053) >> ~[hadoop-client-api-3.1.3.jar:?] >> at >> >> org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131) >> ~[hadoop-client-api-3.1.3.jar:?] >> at >> >> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1113) >> ~[hadoop-client-api-3.1.3.jar:?] >> at >> >> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1110) >> ~[hadoop-client-api-3.1.3.jar:?] >> at >> >> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) >> ~[hadoop-client-api-3.1.3.jar:?] >> at >> >> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1120) >> ~[hadoop-client-api-3.1.3.jar:?] >> at >> >> org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157) >> ~[flink-sql-connector-hive-3.1.2_2.11-1.11.0.jar:1.11.0] >> at >> >> org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140) >> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] >> at >> >> org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98) >> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] >> at >> >> org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95) >> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] >> at >> >> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:132) >> ~[flink-dist_2.11-1.11.1.jar:1.11.1] >> at >> >> org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1286) >> ~[flink-dist_2.11-1.11.1.jar:1.11.1] >> >> 在standalone模式下没有这个问题,on yarn 的per job模式下部分job就会出现这个问题 >> > -- Best regards! Rui Li
Re: Flink sql权限
Hi, 目前使用hive catalog时只支持Metastore端的storage based authorization。 On Tue, Sep 15, 2020 at 4:16 PM Leonard Xu wrote: > Hi, > 据我所知,目前是不支持的,社区没找到对应的issue, 这块短期内应该没计划整,CC了熟悉这块的小伙伴。 > > 祝好 > Leonard Xu > > > 在 2020年9月11日,14:33,163 写道: > > > > > > 请问,flink sql支持元数据的权限校验吗?例如使用hive catalog时,支持hive的权限检查?如果目前不支持,未来是否会考虑? > > > > -- Best regards! Rui Li
Re: flink 1.11.1 版本执行HiveCatalog遇到问题质询
} > > *protected* MyHiveCatalog(String catalogName, String defaultDatabase, > HiveConf hiveConf, String hiveVersion, > > *boolean* allowEmbedded) { > > *super*(catalogName, defaultDatabase, hiveConf, hiveVersion, allowEmbedded > ); > > // *TODO* Auto-generated constructor stub > > } > > *private* *static* HiveConf createHiveConf(@Nullable HiveConf hiveConf) { > > //LOG.info("Setting hive conf dir as {}", hiveConfDir); > > > // try { > > // HiveConf.setHiveSiteLocation( > > // hiveConfDir == null ? > > // null : Paths.get(hiveConfDir, "hive-site.xml").toUri().toURL()); > > // } catch (MalformedURLException e) { > > // throw new CatalogException( > > // String.format("Failed to get hive-site.xml from %s", hiveConfDir), e); > > // } > > > // create HiveConf from hadoop configuration > > Configuration hadoopConf = HadoopUtils.*getHadoopConfiguration*(*new* > org.apache.flink.configuration.Configuration()); > > > // Add mapred-site.xml. We need to read configurations like compression > codec. > > *for* (String possibleHadoopConfPath : HadoopUtils. > *possibleHadoopConfPaths*(*new* > org.apache.flink.configuration.Configuration())) { > > File mapredSite = *new* File(*new* File(possibleHadoopConfPath), > "mapred-site.xml"); > > *if* (mapredSite.exists()) { > > hadoopConf.addResource(*new* Path(mapredSite.getAbsolutePath())); > > *break*; > > } > > } > > HiveConf conf = *new* HiveConf(hadoopConf, HiveConf.*class*); > > conf.addResource(hiveConf); > > *return* conf; > > } > > > } > > ** > Thanks & Best Regards! > > 杉欣集团-技术研究院 云平台 > 钟保罗 > > 上海浦东新区东方路3261号振华广场B座23楼(杉欣集团) > email: zhongbao...@shxgroup.net > 手机: 18157855633 > > > > 原始邮件 > *发件人:* taochanglian > *收件人:* user-zh; zhongbaoluo< > zhongbao...@shxgroup.net> > *发送时间:* 2020年9月8日(周二) 16:51 > *主题:* Re: flink 1.11.1 版本执行HiveCatalog遇到问题质询 > > 贴一下代码 > 在 2020/9/8 14:09, zhongbaoluo 写道: > > 据插入数据执行失败,也没有找到异常。 yarn > > > -- Best regards! Rui Li
Re: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败
另外也list一下没有提交的分区目录吧,看看里面的文件是什么状态 On Tue, Sep 8, 2020 at 9:19 PM Rui Li wrote: > 作业有发生failover么?还是说作业能成功结束但是某些partition始终没提交? > > On Tue, Sep 8, 2020 at 5:20 PM MuChen <9329...@qq.com> wrote: > >> hi, Rui Li: >> 如你所说,的确有类似日志,但是只有成功增加的分区的日志,没有失败分区的日志: >> 2020-09-04 17:17:10,548 INFO org.apache.flink.streaming.api.operators. >> AbstractStreamOperator [] - Partition {dt=2020-08-22, hour=18} of table >> `hive_catalog`.`rt_dwd`.`dwd_music_copyright_test` is ready to be committed >> 2020-09-04 17:17:10,716 INFO org.apache.flink.table.filesystem. >> MetastoreCommitPolicy [] - Committed partition {dt=2020-08-22, hour=18} >> to metastore >> 2020-09-04 17:17:10,720 INFO org.apache.flink.table.filesystem. >> SuccessFileCommitPolicy [] - Committed partition {dt=2020-08-22, hour=18} >> with success file >> 2020-09-04 17:17:19,652 INFO org.apache.flink.streaming.api.operators. >> AbstractStreamOperator [] - Partition {dt=2020-08-22, hour=19} of table >> `hive_catalog`.`rt_dwd`.`dwd_music_copyright_test` is ready to be committed >> 2020-09-04 17:17:19,820 INFO org.apache.flink.table.filesystem. >> MetastoreCommitPolicy [] - Committed partition {dt=2020-08-22, hour=19} >> to metastore >> 2020-09-04 17:17:19,824 INFO org.apache.flink.table.filesystem. >> SuccessFileCommitPolicy [] - Committed partition {dt=2020-08-22, hour=19} >> with success file >> >> 写hdfs的日志是都有的: >> 2020-09-04 17:16:04,100 INFO org.apache.hadoop.hive.ql.io.parquet.write. >> ParquetRecordWriterWrapper [] - creating real writer to write at hdfs:// >> Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020- >> 08-22/hour=07/.part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1140 >> .inprogress.1631ac6c-a07c-4ad7-86ff-cf0d4375d1de >> 2020-09-04 17:16:04,126 INFO org.apache.hadoop.hive.ql.io.parquet.write. >> ParquetRecordWriterWrapper [] - creating real writer to write at hdfs:// >> Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020- >> 08-22/hour=19/.part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1142 >> .inprogress.2700eded-5ed0-4794-8ee9-21721c0c2ffd >> >> -- 原始邮件 -- >> *发件人:* "Rui Li" ; >> *发送时间:* 2020年9月8日(星期二) 中午12:09 >> *收件人:* "user-zh";"夏帅"; >> *抄送:* "MuChen"<9329...@qq.com>; >> *主题:* Re: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败 >> >> streaming file committer在提交分区之前会打印这样的日志: >> >> LOG.info("Partition {} of table {} is ready to be committed", partSpec, >> tableIdentifier); >> >> partition commit policy会在成功提交分区以后打印这样的日志: >> >> LOG.info("Committed partition {} to metastore", partitionSpec); >> >> LOG.info("Committed partition {} with success file", >> context.partitionSpec()); >> >> 可以检查一下这样的日志,看是不是卡在什么地方了 >> >> On Tue, Sep 8, 2020 at 11:02 AM 夏帅 wrote: >> >>> 就第二次提供的日志看,好像是你的namenode出现的问题 >>> >>> >>> -- >>> 发件人:MuChen <9329...@qq.com> >>> 发送时间:2020年9月8日(星期二) 10:56 >>> 收件人:user-zh@flink.apache.org 夏帅 ; user-zh < >>> user-zh@flink.apache.org> >>> 主 题:回复: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败 >>> >>> 在checkpoint失败的时间,tm上还有一些info和warn级别的日志: >>> 2020-09-04 17:17:59,520 INFO >>> org.apache.hadoop.io.retry.RetryInvocationHandler [] - Exception while >>> invoking create of class ClientNamenodeProtocolTranslatorPB over >>> uhadoop-op3raf-master2/10.42.52.202:8020 after 14 fail over attempts. >>> Trying to fail over immediately. >>> java.io.IOException: java.lang.InterruptedException >>> at org.apache.hadoop.ipc.Client.call(Client.java:1449) >>> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] >>> at org.apache.hadoop.ipc.Client.call(Client.java:1401) >>> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] >>> at >>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) >>> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] >>> at com.sun.proxy.$Proxy26.create(Unknown Source) ~[?:?] >>> at >>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:295) >>> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] >>> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) >>> ~[?:?] >>> at >>> sun.reflect.DelegatingMethodAcce
Re: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败
作业有发生failover么?还是说作业能成功结束但是某些partition始终没提交? On Tue, Sep 8, 2020 at 5:20 PM MuChen <9329...@qq.com> wrote: > hi, Rui Li: > 如你所说,的确有类似日志,但是只有成功增加的分区的日志,没有失败分区的日志: > 2020-09-04 17:17:10,548 INFO org.apache.flink.streaming.api.operators. > AbstractStreamOperator [] - Partition {dt=2020-08-22, hour=18} of table > `hive_catalog`.`rt_dwd`.`dwd_music_copyright_test` is ready to be committed > 2020-09-04 17:17:10,716 INFO org.apache.flink.table.filesystem. > MetastoreCommitPolicy [] - Committed partition {dt=2020-08-22, hour=18} > to metastore > 2020-09-04 17:17:10,720 INFO org.apache.flink.table.filesystem. > SuccessFileCommitPolicy [] - Committed partition {dt=2020-08-22, hour=18} > with success file > 2020-09-04 17:17:19,652 INFO org.apache.flink.streaming.api.operators. > AbstractStreamOperator [] - Partition {dt=2020-08-22, hour=19} of table > `hive_catalog`.`rt_dwd`.`dwd_music_copyright_test` is ready to be committed > 2020-09-04 17:17:19,820 INFO org.apache.flink.table.filesystem. > MetastoreCommitPolicy [] - Committed partition {dt=2020-08-22, hour=19} > to metastore > 2020-09-04 17:17:19,824 INFO org.apache.flink.table.filesystem. > SuccessFileCommitPolicy [] - Committed partition {dt=2020-08-22, hour=19} > with success file > > 写hdfs的日志是都有的: > 2020-09-04 17:16:04,100 INFO org.apache.hadoop.hive.ql.io.parquet.write. > ParquetRecordWriterWrapper [] - creating real writer to write at hdfs:// > Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08 > -22/hour=07/.part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1140.inprogress. > 1631ac6c-a07c-4ad7-86ff-cf0d4375d1de > 2020-09-04 17:16:04,126 INFO org.apache.hadoop.hive.ql.io.parquet.write. > ParquetRecordWriterWrapper [] - creating real writer to write at hdfs:// > Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08 > -22/hour=19/.part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1142.inprogress. > 2700eded-5ed0-4794-8ee9-21721c0c2ffd > > -- 原始邮件 -- > *发件人:* "Rui Li" ; > *发送时间:* 2020年9月8日(星期二) 中午12:09 > *收件人:* "user-zh";"夏帅"; > *抄送:* "MuChen"<9329...@qq.com>; > *主题:* Re: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败 > > streaming file committer在提交分区之前会打印这样的日志: > > LOG.info("Partition {} of table {} is ready to be committed", partSpec, > tableIdentifier); > > partition commit policy会在成功提交分区以后打印这样的日志: > > LOG.info("Committed partition {} to metastore", partitionSpec); > > LOG.info("Committed partition {} with success file", context.partitionSpec()); > > 可以检查一下这样的日志,看是不是卡在什么地方了 > > On Tue, Sep 8, 2020 at 11:02 AM 夏帅 wrote: > >> 就第二次提供的日志看,好像是你的namenode出现的问题 >> >> >> -- >> 发件人:MuChen <9329...@qq.com> >> 发送时间:2020年9月8日(星期二) 10:56 >> 收件人:user-zh@flink.apache.org 夏帅 ; user-zh < >> user-zh@flink.apache.org> >> 主 题:回复: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败 >> >> 在checkpoint失败的时间,tm上还有一些info和warn级别的日志: >> 2020-09-04 17:17:59,520 INFO >> org.apache.hadoop.io.retry.RetryInvocationHandler [] - Exception while >> invoking create of class ClientNamenodeProtocolTranslatorPB over >> uhadoop-op3raf-master2/10.42.52.202:8020 after 14 fail over attempts. >> Trying to fail over immediately. >> java.io.IOException: java.lang.InterruptedException >> at org.apache.hadoop.ipc.Client.call(Client.java:1449) >> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] >> at org.apache.hadoop.ipc.Client.call(Client.java:1401) >> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] >> at >> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) >> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] >> at com.sun.proxy.$Proxy26.create(Unknown Source) ~[?:?] >> at >> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:295) >> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] >> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) ~[?:?] >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> ~[?:1.8.0_144] >> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_144] >> at >> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) >> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] >> at >> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) >> ~[music_copyright-1.
Re: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败
pache.hadoop.ipc.Client$Connection.setupConnection(Client.java:609) > ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] > at > org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:707) > ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] > at > org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:370) > ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] > at org.apache.hadoop.ipc.Client.getConnection(Client.java:1523) > ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] > at org.apache.hadoop.ipc.Client.call(Client.java:1440) > ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] > ... 38 more > > 补充:程序多次执行,均会出现部分分区创建失败的情况,而且每次失败的分区是不同的 > > > -- 原始邮件 -- > 发件人: "user-zh@flink.apache.org 夏帅" ; > 发送时间: 2020年9月8日(星期二) 上午10:47 > 收件人: "user-zh";"MuChen"<9329...@qq.com>; > 主题: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败 > > 异常日志只有这些么?有没有详细点的 -- Best regards! Rui Li
Re: flink-sql-gateway hive ddl 语句parse报错 godfrey he 能帮忙看下吗?
SelectedKeys(NioEventLoop.java:549) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171] > Caused by: com.ververica.flink.table.gateway.utils.SqlGatewayException: > Failed to parse statement. > at > > com.ververica.flink.table.gateway.rest.session.Session.runStatement(Session.java:102) > ~[flink-sql-gateway-0.2-SNAPSHOT.jar:?] > at > > com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81) > ~[flink-sql-gateway-0.2-SNAPSHOT.jar:?] > ... 43 more > Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered > "STRING" at line 1, column 88. > Was expecting one of: > ")" ... > "," ... > > at > > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416) > ~[flink-table_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201) > ~[flink-table_2.11-1.11.1.jar:1.11.1] > at > org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148) > ~[flink-table_2.11-1.11.1.jar:1.11.1] > at > org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:203) > ~[flink-table_2.11-1.11.1.jar:1.11.1] > at > > com.ververica.flink.table.gateway.operation.SqlCommandParser.parseStmt(SqlCommandParser.java:115) > ~[flink-sql-gateway-0.2-SNAPSHOT.jar:?] > at > > com.ververica.flink.table.gateway.operation.SqlCommandParser.parse(SqlCommandParser.java:103) > ~[flink-sql-gateway-0.2-SNAPSHOT.jar:?] > at > > com.ververica.flink.table.gateway.rest.session.Session.runStatement(Session.java:93) > ~[flink-sql-gateway-0.2-SNAPSHOT.jar:?] > at > > com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81) > ~[flink-sql-gateway-0.2-SNAPSHOT.jar:?] > ... 43 more > Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered > "STRING" at line 1, column 88. > Was expecting one of: > ")" ... > "," ... > > at > > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36086) > ~[flink-table_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35900) > ~[flink-table_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21398) > ~[flink-table_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5292) > ~[flink-table_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6269) > ~[flink-table_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:19047) > ~[flink-table_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3308) > ~[flink-table_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtList(FlinkSqlParserImpl.java:2775) > ~[flink-table_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtList(FlinkSqlParserImpl.java:252) > ~[flink-table_2.11-1.11.1.jar:1.11.1] > at > org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:201) > ~[flink-table_2.11-1.11.1.jar:1.11.1] > at > > com.ververica.flink.table.gateway.operation.SqlCommandParser.parseStmt(SqlCommandParser.java:115) > ~[flink-sql-gateway-0.2-SNAPSHOT.jar:?] > at > > com.ververica.flink.table.gateway.operation.SqlCommandParser.parse(SqlCommandParser.java:103) > ~[flink-sql-gateway-0.2-SNAPSHOT.jar:?] > at > > com.ververica.flink.table.gateway.rest.session.Session.runStatement(Session.java:93) > ~[flink-sql-gateway-0.2-SNAPSHOT.jar:?] > at > > com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81) > ~[flink-sql-gateway-0.2-SNAPSHOT.jar:?] > ... 43 more > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: 请教 hive streaming 报错
可以试试不把hive-exec和PB 3.5.1的jar放到lib下面,而是通过命令行参数的方式指定这两个依赖? On Wed, Sep 2, 2020 at 5:52 PM liangck wrote: > 最后加了好多jar包到 flink/lib > > 下,任务跑起来了。但是hive-exec中依赖的protobuf版本是2.5.0而且打进了jar包里,和其他任务里依赖的protobuf版本3.5.1不兼容。。请问下大佬们有什么好办法吗? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best regards! Rui Li
Re: flink-1.11.1 Table API /SQL 无法写入hive orc表
没错,flink-sql-connector-hive-2.2.0可以支持Hive 2.0.0 ~ 2.2.0. 具体每个flink-sql-connector-hive可以支持的hive版本请参考官网 [1]。 关于TypeDescription.fromString的异常可能是遇到了已知问题 [2][3]。可以用最新1.11分支的代码试试看能不能解决。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#dependencies [2] https://issues.apache.org/jira/browse/FLINK-18659 [3] https://issues.apache.org/jira/browse/FLINK-18942 On Tue, Sep 1, 2020 at 10:01 AM amen...@163.com wrote: > hi Jian Wang, > > 根据我的理解,在flink > lib目录下导入官方的flink-sql-connector-hive-2.2.0_2.11-1.11.1.jar是指hive[2.0.0-2.2.0]版本都可以使用此依赖。 > > > 关于你的问题我曾经遇到过,hive也是2.1.1,我的demo参考[1]可以运行成功,而不需要额外导入flink-sql-connector-hive-2.2.0_2.11-1.11.1.jar, > 只需要把[1]中的依赖改成provided并把其jar包导入flink/lib即可。 > > 希望能帮到你, > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#program-maven > > best, > amenhub > > > 发件人: Jian Wang > 发送时间: 2020-08-31 21:55 > 收件人: user-zh > 主题: flink-1.11.1 Table API /SQL 无法写入hive orc表 > Hi all, > > 我基于flink 1.11 + hadoop 3.0.0 + hive 2.1.1 , flink on yarn模式,在streaming > job上的Table API上执行flink sql实时写入hive表。 > > 根据文档 > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/ > < > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/> > 去配置,现在遇到flink和hive的依赖问题。 > > > 在项目内的pom上,所有相关hive相关依赖都是provided,在flink > lib下放进flink-sql-connector-hive-2.2.0_2.11-1.11.1.jar, > 提交任务的时候,会出现hive-exec.jar的冲突导致java.lang.NoClassDefFoundError: Could not > initialize class org.apache.hadoop.hive.ql.io.orc.OrcInputFormat > (因为我的hive是2.1.1版本,flink没有提供flink-sql-connector-hive的2.1.1版本,所以我用的和它最近的2.2.0)。 > > > 我又尝试了根据我的hive版本2.1.1, > 去根据flink源码,把hive-exec改成2.1.1去手动打包flink-sql-connector-hive-2.1.1_2.11-1.11.1.jar放到flink > lib下, > 但是发现flink-sql-connector-hive里面的orc-core-1.4.3和hive-exec-2.1.1冲突,java.lang.NoSuchMethodError: > org.apache.orc.TypeDescription.fromString(Ljava/lang/String;)Lorg/apache/orc/TypeDescription; > > 我看到在flink提供的flink-sql-connector-hive中确实没有2.1.1的版本,是这个版本无法和flink兼容吗?或 > 有flink 1.11和hive 2.1.1集成成功的范例参考吗? > > 谢谢 > > > 王剑 > -- Best regards! Rui Li
Re: 请教 hive streaming 报错
Hi, 怀疑是类加载的问题的话可以尝试把所有依赖的jar都放到lib下面试试,保证这些依赖是同一个classloader加载的 On Tue, Sep 1, 2020 at 9:42 AM liangck wrote: > 遇到同样的问题,请问解决了吗。我是flink-connector-hive和hive-exec打进jar包里提交的。但是 > > flink-connector-hive里有个org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder类,引用了streaming-java包里的org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl。估计是因为类加载器不同导致无法引用报错。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: flink1.11 流式读取hive怎么设置 process_time 和event_time?
Hi, 这个场景目前还是不支持的。定义watermark需要在DDL里做,hive表本身没有这个概念,所以DDL里定义不了。以后也许可以通过额外的参数来指定watermark。 On Sun, Aug 30, 2020 at 10:16 PM me wrote: > 如果是直接连接的hive catalog呢,是hive中已存在的表,直接去流式的连接读取? > 您那有什么可解决的想法吗? > > > 原始邮件 > 发件人: Zou Dan > 收件人: user-zh > 发送时间: 2020年8月30日(周日) 21:55 > 主题: Re: flink1.11 流式读取hive怎么设置 process_time 和event_time? > > > Event time 是通过 DDL 中 watermark 语句设置的,具体可以参考文档 [1] [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table > <"> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table> > Best, Dan Zou > 2020年8月30日 下午9:42,me 写道: > > flink1.11 > 可以使用在使用select语句时,显式的指定是流式读取,流式的读出出来之后如果想使用实时计算中的特性窗口函数然后指定时间语义 > 事件时间和处理时间,但是flink sql需要显示的定义数据中的时间字段才能识别为 event_time,求问这个怎么去设置。 -- Best regards! Rui Li
Re: 一个main方法启动2个yarn job问题
作业代码是怎么写的啊?按说写SQL的话不需要执行Env.execute On Fri, Aug 28, 2020 at 9:41 AM air23 wrote: > 你好。我有一个接kafka 写入tidb的任务 为什么会启动2个yarn任务去运行呢? > 我是先用datastream 接入kafka。然后转成table sql写入到tidb > 2个job name 一个叫Env.execute配置的名字 > 一个是叫insert 写入tidb的sql语句名字 > > -- Best regards! Rui Li
Re: flink-sql 1.11整合hbase的查询性能问题
Hello, 目前hive connector没有支持hive的storage handler表 [1],也就是说通过STORED BY定义的表现在是不支持的。普通的external表是支持的。 [1] https://cwiki.apache.org/confluence/display/Hive/StorageHandlers#StorageHandlers-DDL On Fri, Aug 28, 2020 at 2:43 PM Leonard Xu wrote: > Hi > > > 另外,HbaseTableSource 有没有计划什么时候支持 SupportsFilterPushDown. > 我搜了下社区还没相关的issue,如果是强需求你可以去社区建个issue[1],让社区支持下 > 第二个异常栈,如果确认”org.apache.hive:hive-hbase-handler:2.1.1” 已经加载,我感觉是个bug, cc > Rui Li 确认下 > > 祝好 > Leonard > [1] https://issues.apache.org/jira/projects/FLINK/summary < > https://issues.apache.org/jira/projects/FLINK/summary> > > > > 关于"select * from hive_hbase_t1"的异常日志如下。 > > > > > > Flink SQL> select * from hive_hbase_t1; > > 2020-08-28 13:20:19,985 WARN org.apache.hadoop.hive.conf.HiveConf > > > [] - HiveConf of name hive.vectorized.use.checked.expressions does not > exist > > 2020-08-28 13:20:19,985 WARN org.apache.hadoop.hive.conf.HiveConf > > > [] - HiveConf of name hive.strict.checks.no.partition.filter does not > exist > > 2020-08-28 13:20:19,985 WARN org.apache.hadoop.hive.conf.HiveConf > > > [] - HiveConf of name hive.strict.checks.orderby.no.limit does not exist > > 2020-08-28 13:20:19,985 WARN org.apache.hadoop.hive.conf.HiveConf > > > [] - HiveConf of name hive.vectorized.input.format.excludes does not > exist > > 2020-08-28 13:20:19,986 WARN org.apache.hadoop.hive.conf.HiveConf > > > [] - HiveConf of name hive.strict.checks.bucketing does not exist > > [ERROR] Could not execute SQL statement. Reason: > > org.apache.flink.runtime.rest.util.RestClientException: [Internal server > > error., > org.apache.flink.runtime.client.JobSubmissionException: Failed to submit > > job. > > at > > > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344) > > at > > > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) > > at > > > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) > > at > > > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) > > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > > at > > > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) > > at > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > > at > > > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > > at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > > at > > > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could > not > > instantiate JobManager. > > at > > > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398) > > at > > > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) > > ... 6 more > > Caused by: org.apache.flink.runtime.JobException: Creating the input > splits > > caused an error: Unable to instantiate the hadoop input format > > at > > > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:272) > > at > > > org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:814) > > at > > > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228) > > at > > > org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269) > > at > > > org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242) > > at > > > org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:229) > > at > > > org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:119) > > at > > > org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103) > > at > > > org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284) > > at > org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:272) > > at > > > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) > >
Re: tidb Binlog 整库同步到 hive
是的,我觉得需要自己实现一个sink,检测到新增的表时需要通过catalog去hive里创建一下。有点像hive的dynamic partitioning,只不过写的是多张表而不是多个partition。 On Fri, Aug 28, 2020 at 2:08 PM Qishang wrote: > Hi Rui Li. > > > 我理解也是如果多路输出需要能动态适配新表的话只能用DataStream自己实现了 > 这个实现有啥思路,能稍微详细说一下嘛? 是不是需要自己开发一个 Sink 来适配? > > Rui Li 于2020年8月28日周五 下午1:47写道: > > > Hi, > > > > 我理解也是如果多路输出需要能动态适配新表的话只能用DataStream自己实现了,具体写表的时候可以试试看能不能复用hive > > connector里现有的sink。 > > > > On Fri, Aug 28, 2020 at 12:15 PM Leonard Xu wrote: > > > > > Hi > > > > > > > 多路输出是个动态的,因为是整库的binlog ,不确定库中是否有新增表,作业要触发hive里面去创建表的动作。hive catalog > > > create > > > > table 是否可以在运行中来调用吗? > > > > 程序启动之后生成的jobgraph运行中是不是不可以调整的?如果可以的话,是否有类似的案例可以参考。 > > > > > > 用dataStream是会更灵活些,思路也差不多,在运行中可以调用的建表动作的,但是运行的拓扑是不可以动态调整的,不管DataStream > 还是 > > > SQL 的拓扑。 > > > > > > 祝好 > > > Leonard > > > > > > > > > > -- > > Best regards! > > Rui Li > > > -- Best regards! Rui Li
Re: tidb Binlog 整库同步到 hive
Hi, 我理解也是如果多路输出需要能动态适配新表的话只能用DataStream自己实现了,具体写表的时候可以试试看能不能复用hive connector里现有的sink。 On Fri, Aug 28, 2020 at 12:15 PM Leonard Xu wrote: > Hi > > > 多路输出是个动态的,因为是整库的binlog ,不确定库中是否有新增表,作业要触发hive里面去创建表的动作。hive catalog > create > > table 是否可以在运行中来调用吗? > > 程序启动之后生成的jobgraph运行中是不是不可以调整的?如果可以的话,是否有类似的案例可以参考。 > > 用dataStream是会更灵活些,思路也差不多,在运行中可以调用的建表动作的,但是运行的拓扑是不可以动态调整的,不管DataStream 还是 > SQL 的拓扑。 > > 祝好 > Leonard > > -- Best regards! Rui Li
Re: flink stream sink hive
Hello, 贴一下完整的stacktrace看看吧。流式写orc表有个已知问题 [1],可能是跟这个有关。 [1] https://issues.apache.org/jira/browse/FLINK-18659 On Thu, Aug 27, 2020 at 9:16 PM Congxian Qiu wrote: > Hi >从异常看,可能是类冲突了,或许是有两个版本的 `org.apache.orc.TypeDescription` 依赖,可以排除或者 shade > 一下相关的 class > Best, > Congxian > > > liya...@huimin100.cn 于2020年8月27日周四 下午8:18写道: > > > Exception in thread "main" java.lang.NoSuchMethodError: > > > org.apache.orc.TypeDescription.fromString(Ljava/lang/String;)Lorg/apache/orc/TypeDescription; > > > > -- > > liya...@huimin100.cn > > > > > > *发件人:* liya...@huimin100.cn > > *发送时间:* 2020-08-27 19:09 > > *收件人:* user-zh > > *主题:* flink stream sink hive > > flink1.11.1 往hive2.1.1 的orc表写数据报的异常,在网上查不到,只能来这里了,麻烦大佬们帮我看看 > > -- > > liya...@huimin100.cn > > > > > -- Best regards! Rui Li
Re: [ANNOUNCE] New PMC member: Dian Fu
Congratulations Dian! On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei wrote: > Congrats! > > On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang wrote: > >> Congratulations Dian! >> >> Best, >> Xingbo >> >> jincheng sun 于2020年8月27日周四 下午5:24写道: >> >>> Hi all, >>> >>> On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now >>> part of the Apache Flink Project Management Committee (PMC). >>> >>> Dian Fu has been very active on PyFlink component, working on various >>> important features, such as the Python UDF and Pandas integration, and >>> keeps checking and voting for our releases, and also has successfully >>> produced two releases(1.9.3&1.11.1) as RM, currently working as RM to push >>> forward the release of Flink 1.12. >>> >>> Please join me in congratulating Dian Fu for becoming a Flink PMC Member! >>> >>> Best, >>> Jincheng(on behalf of the Flink PMC) >>> >> -- Best regards! Rui Li
Re: 请教一下flink链接hive的权限控制
Hi, 不好意思,我查了一下hive文档发现SQL standard authorization是在CLI/HS2端做的,那HiveCatalog目前没办法支持这种模式。HMS端可以用storage based authorization,也就是你说的通过HDFS的ACL来控制权限。这种模式对外表和内表都是有效的,但管理起来一般比较繁琐,需要人工去设置路径的ACL。 On Wed, Aug 26, 2020 at 11:08 AM faaron zheng wrote: > Hi Rui,感谢你的分享。我简单试了一下开启SQL Standard > Authorization,没什么效果,不知道是我用的不对还是我们hive被定制过。此外,我发现在使用kerberos的情况下,可以通过hdfs的路径来控制权限,不过这种情况主要对外表比较有效。 > 在2020年08月25日 21:34,Rui Li 写道: Hi, > Authentication的话支持kerberos,应该正常做kinit就可以了。或者可以设置flink > security相关的参数,如security.kerberos.login.keytab和security.kerberos.login.principal。具体可以参考: > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#auth-with-external-systems > Authorization目前HiveCatalog这边没有做。如果你的HMS启用了authorization(比如hive自身的SQL > standard authorization),那验证的动作应该发生在HMS端,对HiveCatalog也是生效的。 On Tue, Aug 25, > 2020 at 4:48 PM xiaoyan hua wrote: > > 我们当前用的是kerberos认证,需要额外配置什么么? xiaoyan hua 邮箱:xiaoyanhua...@gmail.com 签名由 > > 网易邮箱大师 定制 在2020年08月25日 15:54,faaron zheng 写道: Hi all, 我在使用flink > > sql-client链接hive metastore的时候,发现好像没有做任何权限控制,可以访问所有的表?这一块是没做么?有什么计划么? -- > Best regards! Rui Li -- Best regards! Rui Li
Re: 请教一下flink链接hive的权限控制
Hi, Authentication的话支持kerberos,应该正常做kinit就可以了。或者可以设置flink security相关的参数,如security.kerberos.login.keytab和security.kerberos.login.principal。具体可以参考: https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#auth-with-external-systems Authorization目前HiveCatalog这边没有做。如果你的HMS启用了authorization(比如hive自身的SQL standard authorization),那验证的动作应该发生在HMS端,对HiveCatalog也是生效的。 On Tue, Aug 25, 2020 at 4:48 PM xiaoyan hua wrote: > 我们当前用的是kerberos认证,需要额外配置什么么? xiaoyan hua 邮箱:xiaoyanhua...@gmail.com 签名由 > 网易邮箱大师 定制 在2020年08月25日 15:54,faaron zheng 写道: Hi all, 我在使用flink > sql-client链接hive metastore的时候,发现好像没有做任何权限控制,可以访问所有的表?这一块是没做么?有什么计划么? -- Best regards! Rui Li
Re: hive-exec依赖导致hadoop冲突问题
Hi, hive-exec本身并不包含Hadoop,如果是因为maven的传递依赖引入的话可以在打包时去掉。运行时使用的Hadoop版本可以用你集群Hadoop版本,而不是hive本身依赖的Hadoop版本。另外对于Flink 1.11也可以考虑使用官方提供的flink-sql-connector-hive Uber jar,这个jar包含所有hive的依赖(Hadoop的依赖还是需要另外添加)。更详细的信息建议参考文档 [1][2]。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#dependencies [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html#providing-hadoop-classes On Mon, Aug 24, 2020 at 9:05 PM amen...@163.com wrote: > > 补充一下,当我移除hive-exec等程序中的hadoop依赖时,任务依旧异常,所以也许是我哪个地方没有到位,觉得依赖冲突是因为在测试hive集成之前,我提交过到yarn执行并无异常,所以排查思路来到了hive这里, > 现在看来,可能是另外某个原因导致的,贴一点点异常栈如下: > > Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: > Could not deploy Yarn job cluster. > at > org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431) > at > org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812) > at > org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128) > at > org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699) > ... 19 more > Caused by: java.lang.ClassCastException: > org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto > cannot be cast to org.apache.hadoop.hbase.shaded.com.google.protobuf.Message > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116) > at com.sun.proxy.$Proxy63.getClusterNodes(Unknown Source) > at > org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:311) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359) > at com.sun.proxy.$Proxy64.getClusterNodes(Unknown Source) > at > org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:618) > at > org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43) > at > org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:280) > at > org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:480) > at > org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:424) > ... 24 more > > best, > amenhub > > 发件人: amen...@163.com > 发送时间: 2020-08-24 20:40 > 收件人: user-zh > 主题: hive-exec依赖导致hadoop冲突问题 > hi, everyone > > 组件版本:flink-1.11.1,hive-2.1.1 > > 问题描述: > 使用Table > API调用executeSql()方法编写kafka2mysql实时程序demo,在未导入hive-exec依赖时,打包提交到yarn集群,正常运行; > > 当测试HiveCatalog及读写Hive Table时,Standalone Cluster运行无异常,在flink端正常读写hive > table(不会发生hadoop依赖冲突); > > 但当提交到yarn时发生hadoop冲突,通过IDEA查看程序依赖得知,当引入hive-exec依赖时,会自动的带入hadoop和hdfs相关的版本为2.6.1的依赖包,从而导致和yarn集群(hadoop-3.0.0-cdh-6.2.0)的hadoop等依赖包冲突; > > > 请问社区有碰到这种情况吗?doc中建议没有官方指定的hive包时选择自有版本下载hive-exec依赖,这种情况下却隐式的引入了非集群版本的hadoop依赖,势必会造成冲突,这是我这边哪里设置的不到位吗? > > best, > amenhub > -- Best regards! Rui Li
Re: 请教 hive streaming 报错
hive相关的依赖是怎么添加的啊?这两个类的package名字是一样的,按说可以访问。不确定是不是因为通过不同的classloader加载导致的。 On Mon, Aug 24, 2020 at 2:17 PM McClone wrote: > 版本为:Flink 1.11.0 > > > 2020-08-24 13:33:03,019 ERROR > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Unhandled > exception. > > java.lang.IllegalAccessError: tried to access class > org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl > from class > org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder > > at > org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder.(HadoopPathBasedBulkFormatBuilder.java:70) > ~[?:?] > > at > org.apache.flink.connectors.hive.HiveTableSink.consumeDataStream(HiveTableSink.java:197) > ~[?:?] > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:114) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] -- Best regards! Rui Li
Re: flink 1.11.1 与HDP3.0.1中的hive集成,查询不出hive表数据
Hello, HDP里的hive版本是多少啊?另外你要查的表是啥样的呢(describe formatted看一下)? On Mon, Aug 24, 2020 at 3:02 AM 黄蓉 wrote: > 各位好: > > > 我使用的环境是HDP3.0.1的沙盒,flink是最新版本的1.11.1,从官网直接下载的编译好的jar包。我想测试flink与hive的集成,包括查询hive表的数据、写入数据到hive表等操作。目前我遇到问题就是通过flink > > sql client查询不出表数据,并且也不报错。但是该表在hive中查询是有记录的。其余的show tables,show > database等语句都可以正常显示。 > > 配置的hadoop环境变量如下: > export HADOOP_CONF_DIR="/etc/hadoop/conf" > export HADOOP_HOME="/usr/hdp/3.0.1.0-187/hadoop" > export > > HADOOP_CLASSPATH="/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:/usr/hdp/current/hadoop-mapreduce-client/*:/usr/hdp/current/hadoop-mapreduce-client/lib/*" > > sql-client配置文件如下: > tables: [] > functions: [] > catalogs: > - name: myhive > type: hive > hive-conf-dir: /opt/hive-conf > execution: >planner: blink >type: batch >result-mode: table >max-table-result-rows: 100 >parallelism: 3 >max-parallelism: 128 >min-idle-state-retention: 0 >max-idle-state-retention: 0 >current-catalog: myhive >current-database: default >restart-strategy: > type: fallback > deployment: >response-timeout: 5000 >gateway-address: "" >gateway-port: 0 > > > 请问出现这种情况是不是官网的flink包与hdp3.0.1不兼容?我需要自己重新编译flink吗? > > Jessie > jessie...@gmail.com > > -- Best regards! Rui Li
Re: flink orc与hive2.1.1版本冲突怎么解决
是说写数据用的是flink的FileSystem connector,然后读数据是用hive自己去读的?具体错误的stacktrace贴一下看看呢 On Fri, Aug 21, 2020 at 3:05 PM wrote: > flink table > sql把mysql的表数据写到hdfs的路径上,存成orc。hive创建外部表,查询报错。最根本原因是hive2.1是把orc的相关类打包一起的,包名举例,org.apache.hive.orc.,而且新版的是org.apache.orc.。 > > 发自我的iPhone > > > 在 2020年8月21日,14:37,Rui Li 写道: > > > > Hi, > > > > 能不能具体描述一下你的作业是怎么写的?比如怎么创建的hive表,如何把数据写进去之类的。我们可以试试能不能重现你的问题 > > > >> On Fri, Aug 21, 2020 at 1:41 PM wrote: > >> > >> 试过了,一样的,本质也是通过写文件。 > >> > >> 发自我的iPhone > >> > >>>> 在 2020年8月21日,13:35,Jingsong Li 写道: > >>> > >>> 是的 > >>> > >>>> On Fri, Aug 21, 2020 at 1:30 PM wrote: > >>>> > >>>> flink hive表的方式是什么意思?hive streaming吗? > >>>> > >>>> 发自我的iPhone > >>>> > >>>>>> 在 2020年8月21日,13:27,Jingsong Li 写道: > >>>>> > >>>>> Flink filesystem connector 或者 DataStream用flink-orc > >>>> 的版本是比较新的版本,所以老版本的ORC读不了。 > >>>>> > >>>>> 建议你用Flink hive表的方式来写orc > >>>>> > >>>>>> On Fri, Aug 21, 2020 at 12:25 PM wrote: > >>>>>> > >>>>>> Flink是指定orc版本的,并没有用hive的。所以写进去之后,hive读不出来。 > >>>>>> > >>>>>> 发自我的iPhone > >>>>>> > >>>>>>>> 在 2020年8月21日,12:15,Jingsong Li 写道: > >>>>>>> > >>>>>>> 如果是hive table的写,flink sql是使用hive对应版本的orc的,所以理论上效果和hive > sql写orc是一样的。 > >>>>>>> 确定这个版本hive写出的数据可以被读取吗? > >>>>>>> > >>>>>>>> On Fri, Aug 21, 2020 at 10:17 AM wrote: > >>>>>>>> > >>>>>>>> 使用版本是flink 1.11 > >>>>>>>> Hive 2.1.1 > >>>>>>>> flink sql写到orc后,创建外部表发现无法正常读取,这个怎么解决,各位大佬? > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>>> -- > >>>>>>> Best, Jingsong Lee > >>>>>> > >>>>> > >>>>> > >>>>> -- > >>>>> Best, Jingsong Lee > >>>> > >>>> > >>> > >>> -- > >>> Best, Jingsong Lee > >> > >> > > > > -- > > Best regards! > > Rui Li > > -- Best regards! Rui Li
Re: flink orc与hive2.1.1版本冲突怎么解决
Hi, 能不能具体描述一下你的作业是怎么写的?比如怎么创建的hive表,如何把数据写进去之类的。我们可以试试能不能重现你的问题 On Fri, Aug 21, 2020 at 1:41 PM wrote: > 试过了,一样的,本质也是通过写文件。 > > 发自我的iPhone > > > 在 2020年8月21日,13:35,Jingsong Li 写道: > > > > 是的 > > > >> On Fri, Aug 21, 2020 at 1:30 PM wrote: > >> > >> flink hive表的方式是什么意思?hive streaming吗? > >> > >> 发自我的iPhone > >> > >>>> 在 2020年8月21日,13:27,Jingsong Li 写道: > >>> > >>> Flink filesystem connector 或者 DataStream用flink-orc > >> 的版本是比较新的版本,所以老版本的ORC读不了。 > >>> > >>> 建议你用Flink hive表的方式来写orc > >>> > >>>> On Fri, Aug 21, 2020 at 12:25 PM wrote: > >>>> > >>>> Flink是指定orc版本的,并没有用hive的。所以写进去之后,hive读不出来。 > >>>> > >>>> 发自我的iPhone > >>>> > >>>>>> 在 2020年8月21日,12:15,Jingsong Li 写道: > >>>>> > >>>>> 如果是hive table的写,flink sql是使用hive对应版本的orc的,所以理论上效果和hive sql写orc是一样的。 > >>>>> 确定这个版本hive写出的数据可以被读取吗? > >>>>> > >>>>>> On Fri, Aug 21, 2020 at 10:17 AM wrote: > >>>>>> > >>>>>> 使用版本是flink 1.11 > >>>>>> Hive 2.1.1 > >>>>>> flink sql写到orc后,创建外部表发现无法正常读取,这个怎么解决,各位大佬? > >>>>>> > >>>>>> > >>>>>> > >>>>> > >>>>> -- > >>>>> Best, Jingsong Lee > >>>> > >>> > >>> > >>> -- > >>> Best, Jingsong Lee > >> > >> > > > > -- > > Best, Jingsong Lee > > -- Best regards! Rui Li
Re: hive只作为元数据管理可以读到具体的表数据吗?
hive catalog只负责管理元数据,具体读数据不是hive catalog来做的哈。所以能读什么样的表取决于flink是不是有对应的connector。文档上看jdbc connector还是不支持Oracle的。 On Fri, Aug 21, 2020 at 11:16 AM Bruce wrote: > 请教大佬: > > > > > flink平台引入hive仅作为元数据管理,具体数据不存储在hdfs上,通过hivecatalog可以读取到具体的数据吗? > > 比如hive里面存储了Oracle的t_log表元数据信息,flink可以用hivecatalog读取到t_log具体的表数据吗? > > > > > 发自我的iPhone -- Best regards! Rui Li
Re: flink1.10中hive module 没有plus,greaterThan等函数
是只用了hive module么?建议的方式是同时加载hive module和core module,解析函数的时候会根据加载的顺序去每个module里查找。 On Fri, Aug 21, 2020 at 11:06 AM faaron zheng wrote: > Hi all, 我在使用flink1.10的sql-client时候发现使用hive module时会缺少某些core module > 中的build-in function比如plus,greaterThan。这会导致同样的sql core module可以执行成功,hive > module却会报错,比如在使用row_number() over()时候。这是什么原因? -- Best regards! Rui Li
Re: Flink1.11 hive catalog使用
我试了一下确实是有问题,我后面跟进一下。暂时可以用in-memory catalog来避免一下这个问题。 On Wed, Aug 19, 2020 at 3:55 PM Dream-底限 wrote: > hi > 先设置hiveCatalog及默认数据库,然后use hivecatalog > 接下来建表的时候create temporary table test(...),使用的时候:select ... from test > 这个时候就报错说hivecatalog的默认数据库内无这张表 > > Rui Li 于2020年8月19日周三 下午3:49写道: > > > 是怎么用的啊,我去试试 > > > > On Wed, Aug 19, 2020 at 11:19 AM Dream-底限 wrote: > > > > > hi > > > 我先前用这种方式创建的表,在sql中直接使用的时候提示hivecatalog内无这张表,请问这张表使用的时候要怎么引用哪 > > > >或者用create temporary table的方式应该也可以。 > > > > > > Rui Li 于2020年8月19日周三 上午11:11写道: > > > > > > > 可以把kafka的表保存在内置的in-memory > > > catalog里,名字应该是default_catalog,这样就不会持久化了。或者用create > > > > temporary table的方式应该也可以。 > > > > > > > > On Wed, Aug 19, 2020 at 10:53 AM Dream-底限 > wrote: > > > > > > > > > hi > > > > > 我这面在使用hivecatalog将kafka数据落地到hive,但现在有一个问题是,我不想保存kafka source > > > > > table元数据(默认自动保存),通过创建视图或临时表的方式启动flink任务,这时候直接通过kafka source > > > > > table表名直接引用报错,提示hive catalog内没这张表,这种情况我改怎么引用我的kafka未持久化表哪 > > > > > > > > > > > > > > > > > -- > > > > Best regards! > > > > Rui Li > > > > > > > > > > > > > -- > > Best regards! > > Rui Li > > > -- Best regards! Rui Li
Re: Flink1.11 hive catalog使用
是怎么用的啊,我去试试 On Wed, Aug 19, 2020 at 11:19 AM Dream-底限 wrote: > hi > 我先前用这种方式创建的表,在sql中直接使用的时候提示hivecatalog内无这张表,请问这张表使用的时候要怎么引用哪 > >或者用create temporary table的方式应该也可以。 > > Rui Li 于2020年8月19日周三 上午11:11写道: > > > 可以把kafka的表保存在内置的in-memory > catalog里,名字应该是default_catalog,这样就不会持久化了。或者用create > > temporary table的方式应该也可以。 > > > > On Wed, Aug 19, 2020 at 10:53 AM Dream-底限 wrote: > > > > > hi > > > 我这面在使用hivecatalog将kafka数据落地到hive,但现在有一个问题是,我不想保存kafka source > > > table元数据(默认自动保存),通过创建视图或临时表的方式启动flink任务,这时候直接通过kafka source > > > table表名直接引用报错,提示hive catalog内没这张表,这种情况我改怎么引用我的kafka未持久化表哪 > > > > > > > > > -- > > Best regards! > > Rui Li > > > -- Best regards! Rui Li