Hello all,
想咨询下各位是否遇到这个问题:
环境:
hadoop 2.10.1
hive 2.3.6
flink 1.12-1.14(这些版本都测试了,都有这个问题)
依赖部署方式:
在环境变量中添加了 `HADOOP_CLASSPATH`
使用官方提供的flink-sql-connector-hive-2.3.6 包放置flink/lib 下
使用sql-client 进行任务提交sql 如下:
-- Kafka source table: JSON-encoded order events read from topic `topn_char1`,
-- starting from the latest offset.
CREATE TABLE source_kafka (
    order_id      INT,
    order_date    VARCHAR,
    customer_name VARCHAR,
    price         DECIMAL(10, 3),
    product_id    INT,
    order_status  BOOLEAN
) WITH (
    'connector' = 'kafka',
    'topic' = 'topn_char1',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);
-- Synthetic source with the same schema as source_kafka.
-- EXCLUDING OPTIONS drops the Kafka connector options from the LIKE clause,
-- so only the datagen connector option applies here.
CREATE TABLE datagen_source
WITH ('connector' = 'datagen')
LIKE source_kafka (EXCLUDING OPTIONS);
-- Register a Hive catalog backed by the local Hive 2.3.6 metastore config.
-- NOTE(review): this only REGISTERS the catalog; it does not switch to it.
-- Without a subsequent `USE CATALOG myhive_local`, later CREATE TABLE
-- statements still target the session's default in-memory catalog —
-- confirm which catalog/database `orders_local` actually lands in.
create catalog myhive_local with ('type' = 'hive', 'hive-conf-dir' =
'/Users/feng/hive-2.3.6/conf');
-- Switch to the Hive dialect so the partitioned-ORC DDL below is parsed as HiveQL.
SET 'table.sql-dialect'='hive';
-- Hive table partitioned on the string column `dt`, stored as ORC.
CREATE TABLE orders_local (
    order_id      INT,
    order_date    STRING,
    customer_name STRING,
    price         DECIMAL(10, 3),
    product_id    INT,
    order_status  BOOLEAN
)
PARTITIONED BY (dt STRING)
STORED AS ORC;
-- Back to the default dialect for the streaming INSERT.
SET 'table.sql-dialect'='default';
-- NOTE(review): the three-part name is catalog.database.table — confirm the
-- Hive database really is named `cataloghive`; `orders_local` above was created
-- in whatever catalog/database was current at that point (see catalog note).
INSERT INTO myhive_local.cataloghive.orders_local
/*+ OPTIONS(
    'sink.partition-commit.trigger' = 'process-time',
    'sink.partition-commit.policy.kind' = 'metastore,success-file',
    'sink.rolling-policy.file-size' = '128MB',
    'sink.rolling-policy.rollover-interval' = '1min',
    'sink.rolling-policy.check-interval' = '1min',
    'auto-compaction' = 'true',
    'compaction.file-size' = '128MB'
) */
-- Explicit column list instead of `SELECT *`: the sink has one extra column
-- (the partition column dt), so positional `*` expansion is fragile against
-- any schema change in datagen_source.
SELECT
    order_id,
    order_date,
    customer_name,
    price,
    product_id,
    order_status,
    -- static partition value; NOTE(review): day "00" is not a valid calendar
    -- date — fine as an opaque partition string, but verify it is intentional
    '2099-09-00' AS dt
FROM datagen_source;
这个任务直接提交至本地集群运行是没问题的,但是写表的方式是用mapred 的方式,比较慢,无法满足性能要求,根据官方文档
加了
set table.exec.hive.fallback-mapred-writer = false
(参见 <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/hive/hive_read_write/#table-exec-hive-fallback-mapred-writer>)
这个参数后,再提交就会报缺少 orc 包中方法的错误(NoSuchMethodError),不确定是依赖冲突还是依赖配置的问题
但是parquet 的表格式就可以支持,orc 的表格式就无法支持,试了flink 1.12以后的版本都是相同的报错
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.lang.NoSuchMethodError:
org.apache.orc.OrcFile$WriterOptions.getHadoopShims()Lorg/apache/orc/impl/HadoopShims;
at
org.apache.flink.orc.writer.PhysicalWriterImpl.<init>(PhysicalWriterImpl.java:103)
at
org.apache.flink.orc.writer.OrcBulkWriterFactory.create(OrcBulkWriterFactory.java:99)
at
org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory.create(FileSystemTableSink.java:630)
at
org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:75)
at
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:90)
at
org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNewInProgressFile(BulkBucketWriter.java:36)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:243)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:220)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:305)
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103)
at
org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.processElement(AbstractStreamingWriter.java:140)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at StreamExecCalc$21.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
at
org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:117)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
请教下,是否有人遇到过?