Re: Re: flink sql 1.11.1 could not insert hive orc record

2020-09-08, by Shuai Xia
The change itself is tiny, really just that one line of code; the main question is how you compile and package it.


--
From: 大罗
Sent: September 8, 2020 (Tuesday) 17:05
To: user-zh
Subject: Re: Re: flink sql 1.11.1 could not insert hive orc record

I think your answer points in the right direction for solving this problem.

Is there a guideline, or any similar reference, that I can follow to modify the ORC source code myself, build it, and use it?




Re: Re: flink sql 1.11.1 could not insert hive orc record

2020-09-08, by 大罗
I think your answer points in the right direction for solving this problem.

Is there a guideline, or any similar reference, that I can follow to modify the ORC source code myself, build it, and use it?




Re: flink sql 1.11.1 could not insert hive orc record

2020-09-08, by Shuai Xia
// Root cause: newer ORC versions stamp files with WriterVersion ORC_517,
// which the older Hive version cannot parse.
// Fix: implement your own OrcFile class and switch back to the old version:
static {
    CURRENT_WRITER = WriterVersion.HIVE_13083;
    memoryManager = null;
}
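
For context, a minimal, hypothetical sketch of the shadowing idea. In practice you would copy the full org.apache.orc.OrcFile class from the orc-core sources into your own project (so your copy is picked up ahead of the bundled one) and change only the initialization below; the stub here illustrates just that one edit and is not usable on its own:

package org.apache.orc;

// Hypothetical stub, NOT the full upstream class: everything except the
// static initializer must be carried over unchanged from the orc-core
// sources. The one functional edit is pinning the WriterVersion stamped
// into new files to one that Hive 2.1.1 can still parse.
public class OrcFile {

    public enum WriterVersion { HIVE_13083, ORC_517 /* ... */ }

    public static final WriterVersion CURRENT_WRITER;
    private static Object memoryManager;

    static {
        CURRENT_WRITER = WriterVersion.HIVE_13083;  // instead of ORC_517
        memoryManager = null;
    }
}

Whether the shadowed copy actually wins over the class inside the connector jar depends on classpath ordering and Flink's classloader configuration, so verify the produced files with a Hive 2.1.1 reader before relying on it.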


--
From: 大罗
Sent: September 8, 2020 (Tuesday) 16:55
To: user-zh
Subject: Re: flink sql 1.11.1 could not insert hive orc record

Hi, the hive orc table in my example is not a transactional table; the full createtab_stmt is in my message further down the thread.






Re: flink sql 1.11.1 could not insert hive orc record

2020-09-08, by Jingsong Li
Hi,

flink-sql-orc_2.11-1.11.0.jar and flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar
cannot currently coexist; they will conflict with each other. Could you try removing flink-sql-orc and see?
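
In practice that means keeping only one of the two jars under the Flink distribution's lib/ directory. A sketch of the intended layout (paths illustrative, assuming the hive connector bundle already provides the ORC classes):

flink-1.11.1/
  lib/
    flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar   <-- keep
    flink-sql-orc_2.11-1.11.0.jar                    <-- remove
    ...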

On Tue, Sep 8, 2020 at 4:55 PM 大罗  wrote:

> Hi, the hive orc table in my example is not a transactional table; the full
> createtab_stmt is in my message further down the thread.


-- 
Best, Jingsong Lee


Re: flink sql 1.11.1 could not insert hive orc record

2020-09-08, by Shuai Xia
The flink-orc module likely only supports newer versions; 2.1.1 cannot be supported. You can modify the ORC source code yourself.


--
From: 大罗
Sent: September 8, 2020 (Tuesday) 16:55
To: user-zh
Subject: Re: flink sql 1.11.1 could not insert hive orc record

Hi, the hive orc table in my example is not a transactional table; the full createtab_stmt is in my message further down the thread.






Re: flink sql 1.11.1 could not insert hive orc record

2020-09-08, by 大罗
Hi, the hive orc table in my example is not a transactional table, as shown below:

createtab_stmt  
CREATE TABLE `dest_orc`(
  `i` int)  
PARTITIONED BY (
  `ts` string)  
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'   
STORED AS INPUTFORMAT   
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' 
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  'hdfs://nameservice1/opt/user/hive/warehouse/dw.db/dest_orc'  
TBLPROPERTIES ( 
  'is_generic'='false', 
  'orc.compress'='SNAPPY',  
  'transient_lastDdlTime'='1599555226') 






Re: flink sql 1.11.1 could not insert hive orc record

2020-09-08, by taochanglian

Check whether your table is a transactional table; when creating the table in hive, add 'transactional' = 'false'.
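
For reference, a sketch of the suggested DDL: the table definition from the original message below, with the property from the advice above added (whether it is needed depends on your Hive defaults):

create table dest_orc (
  i int
) partitioned by (ts string)
stored as orc
TBLPROPERTIES(
  'orc.compress' = 'SNAPPY',
  'transactional' = 'false'
);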

On 2020/9/8 16:26, 大罗 wrote:

Hi, I am using the hive catalog feature of flink sql 1.11.1 to insert data into a hive orc table (the full message, including the stack trace, follows below).

flink sql 1.11.1 could not insert hive orc record

2020-09-08, by 大罗
Hi, I am using the hive catalog feature of flink sql 1.11.1 to insert data into a hive orc table:

The versions I am using are as follows:

Hadoop 3.0.0+cdh6.3.2
HDFS 3.0.0+cdh6.3.2
HBase 2.1.0+cdh6.3.2
Hive 2.1.1+cdh6.3.2
Flink 1.11.1

The hive orc table is defined as follows:
create table dest_orc (
 i int
) partitioned by (ts string)
stored as orc
TBLPROPERTIES(
 'orc.compress' = 'SNAPPY'
);

Inserting data in flink-sql:
Flink SQL> insert into dest_orc select 1,  '2020-09-08 10:11:00' ;
[INFO] Table update statement has been successfully submitted to the
cluster:
Job ID: a2c96bcaf23abc24de8e5405ec2bb7c6

The error is as follows:
2020-09-08 16:16:39
org.apache.flink.connectors.hive.FlinkHiveException:
org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create
Hive RecordWriter
at
org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:159)
at
org.apache.flink.connectors.hive.write.HiveBulkWriterFactory.create(HiveBulkWriterFactory.java:47)
at
org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:257)
at
org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:230)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
at
org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at StreamExecCalc$183.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
at
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException:
Failed to create Hive RecordWriter
at
org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:58)
at
org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:151)
... 25 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:55)
... 26 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
at
org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat.getOptions(OrcOutputFormat.java:161)
at
org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat.getHiveRecordWriter(OrcOutputFormat.java:189)
at
org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat.getHiveRecordWriter(OrcOutputFormat.java:67)
at
org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:284)