How embarrassing...
I had two projects open and was editing the wrong one. I can now successfully query the data from Hive. Thanks for the community's enthusiastic replies, @Jingsong Li, @夏帅.
I've watched Jingsong's video from the group several times over the past two days. Sincere thanks!
Two follow-up questions:
Question 1:
The kafka_table I created is visible neither from Hive nor from the Flink SQL client. Also, every rerun of the program fails unless I drop hive_table first (once hive_table1 is dropped, it runs), yet I never need to drop kafka_table between runs. So is the kafka_table I created a temporary table, with only hive_table being stored in the metastore?
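My current understanding (please correct me if I'm wrong; the catalog name below is just a placeholder) is that only tables created while a HiveCatalog is the active catalog get persisted to the metastore, while tables created in the built-in default_catalog live in memory and disappear with the session:

```sql
-- Sketch only, assuming HiveCatalog semantics; 'myhive' is a placeholder name.

-- Persisted in the Hive metastore: survives restarts, so a second
-- CREATE fails unless the table is dropped first.
USE CATALOG myhive;
CREATE TABLE hive_table (user_id STRING, age INT);

-- Held in the in-memory default catalog: discarded when the session
-- ends, so it can be re-created on every run and is invisible to Hive.
USE CATALOG default_catalog;
CREATE TABLE kafka_table (uid STRING, age INT) WITH ('connector' = '...');
```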
Question 2:
A helpful community member just replied that, instead of HiveCatalog, the filesystem connector can also be used to create a Hive table. I tried it and got an error:
java.util.concurrent.CompletionException:
org.apache.flink.client.deployment.application.ApplicationExecutionException:
Could not execute application.
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
~[?:1.8.0_161]
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
~[?:1.8.0_161]
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
~[?:1.8.0_161]
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
~[?:1.8.0_161]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
~[?:1.8.0_161]
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
~[?:1.8.0_161]
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_161]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[?:1.8.0_161]
at
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
[qile-data-flow-1.0.jar:?]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
[qile-data-flow-1.0.jar:?]
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
[qile-data-flow-1.0.jar:?]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[qile-data-flow-1.0.jar:?]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[qile-data-flow-1.0.jar:?]
at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[qile-data-flow-1.0.jar:?]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[qile-data-flow-1.0.jar:?]
Caused by:
org.apache.flink.client.deployment.application.ApplicationExecutionException:
Could not execute application.
... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: Unable to create a sink for writing table
'default_catalog.default_database.hive_table1'.
Table options are:
'connector'='filesystem'
'hive.storage.file-format'='parquet'
'is_generic'='false'
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
'sink.partition-commit.delay'='0s'
'sink.partition-commit.policy.kind'='metastore,success-file'
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
~[flink-clients_2.11-1.11.0.jar:1.11.0]
... 10 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a
sink for writing table 'default_catalog.default_database.hive_table1'.
Table options are:
'connector'='filesystem'
'hive.storage.file-format'='parquet'
'is_generic'='false'
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
'sink.partition-commit.delay'='0s'
'sink.partition-commit.policy.kind'='metastore,success-file'
at
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
~[qile-data-flow-1.0.jar:?]
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
~[qile-data-flow-1.0.jar:?]
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
~[qile-data-flow-1.0.jar:?]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
~[qile-data-flow-1.0.jar:?]
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
~[qile-data-flow-1.0.jar:?]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
~[qile-data-flow-1.0.jar:?]
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
~[qile-data-flow-1.0.jar:?]
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
~[qile-data-flow-1.0.jar:?]
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68)
~[qile-data-flow-1.0.jar:?]
at
cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala)
~[qile-data-flow-1.0.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_161]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_161]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_161]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_161]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
~[flink-clients_2.11-1.11.0.jar:1.11.0]
... 10 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a
connector using option ''connector'='filesystem''.
at
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
~[qile-data-flow-1.0.jar:?]
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
~[qile-data-flow-1.0.jar:?]
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
~[qile-data-flow-1.0.jar:?]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
~[qile-data-flow-1.0.jar:?]
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
~[qile-data-flow-1.0.jar:?]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
~[qile-data-flow-1.0.jar:?]
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
~[qile-data-flow-1.0.jar:?]
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
~[qile-data-flow-1.0.jar:?]
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68)
~[qile-data-flow-1.0.jar:?]
at
cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala)
~[qile-data-flow-1.0.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_161]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_161]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_161]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_161]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
~[flink-clients_2.11-1.11.0.jar:1.11.0]
... 10 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any
factory for identifier 'filesystem' that implements
'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
Available factory identifiers are:
blackhole
hbase-1.4
jdbc
kafka
print
at
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
~[qile-data-flow-1.0.jar:?]
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
~[qile-data-flow-1.0.jar:?]
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
~[qile-data-flow-1.0.jar:?]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
~[qile-data-flow-1.0.jar:?]
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
~[qile-data-flow-1.0.jar:?]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
~[qile-data-flow-1.0.jar:?]
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
~[qile-data-flow-1.0.jar:?]
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
~[qile-data-flow-1.0.jar:?]
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68)
~[qile-data-flow-1.0.jar:?]
at
cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala)
~[qile-data-flow-1.0.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_161]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_161]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_161]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_161]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
~[flink-clients_2.11-1.11.0.jar:1.11.0]
... 10 more
query:
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)
val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)
streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
"""
|
|
|CREATE TABLE hive_table (
| user_id STRING,
| age INT
|) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
| 'connector'='filesystem',
| 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
| 'sink.partition-commit.delay'='0s',
| 'sink.partition-commit.policy.kind'='metastore,success-file'
|)
|
|""".stripMargin)
streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
streamTableEnv.executeSql(
"""
|
|CREATE TABLE kafka_table (
| uid VARCHAR,
| -- uid BIGINT,
| sex VARCHAR,
| age INT,
| created_time TIMESTAMP(3),
| WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
|) WITH (
| 'connector.type' = 'kafka',
| 'connector.version' = 'universal',
| 'connector.topic' = 'user',
| -- 'connector.topic' = 'user_long',
| 'connector.startup-mode' = 'latest-offset',
| 'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
| 'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
| 'connector.properties.group.id' = 'user_flink',
| 'format.type' = 'json',
| 'format.derive-schema' = 'true'
|)
|""".stripMargin)
streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
"""
|
|INSERT INTO hive_table
|SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH')
|FROM kafka_table
|
|""".stripMargin)
streamTableEnv.executeSql(
"""
|
|SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='18'
|
|""".stripMargin)
.print()
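For comparison, this is the variant I plan to try next: a pure filesystem-connector table declared in the default dialect rather than through Hive-dialect TBLPROPERTIES. This is only a sketch based on my reading of the docs; the table name and path are placeholders, and I dropped the 'metastore' commit policy since a plain filesystem table has no metastore to commit to:

```sql
-- Default SQL dialect (not SqlDialect.HIVE); name and 'path' are placeholders.
CREATE TABLE fs_table (
  user_id STRING,
  age INT,
  dt STRING,
  hr STRING
) PARTITIONED BY (dt, hr) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs:///tmp/fs_table',
  'format' = 'parquet',
  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
  'sink.partition-commit.delay' = '0s',
  'sink.partition-commit.policy.kind' = 'success-file'
)
```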
On 2020-07-13 17:52:54, "Jingsong Li" <[email protected]> wrote:
>Could you paste the complete program again?
>
>Best,
>Jingsong
>
>On Mon, Jul 13, 2020 at 5:46 PM Zhou Zach <[email protected]> wrote:
>
>> Hi,
>>
>>
>> I have now changed it to:
>> 'sink.partition-commit.delay'='0s'
>>
>>
>> More than 20 checkpoints have completed and 20-odd files have been produced on HDFS,
>> but querying the Hive table still returns no data.
>>
>>
>> On 2020-07-13 17:23:34, "夏帅" <[email protected]> wrote:
>>
>> Hi,
>> you have set SINK_PARTITION_COMMIT_DELAY
>> to one hour.
>>
>>
>> ------------------------------------------------------------------
>> From: Zhou Zach <[email protected]>
>> Date: Monday, July 13, 2020, 17:09
>> To: user-zh <[email protected]>
>> Subject: Re:Re: Re: Table options do not contain an option key 'connector' for
>> discovering a connector.
>>
>>
>> Checkpointing is enabled:
>> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>
>> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> streamExecutionEnv.enableCheckpointing(5 * 1000,
>> CheckpointingMode.EXACTLY_ONCE)
>> streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)
>>
>>
>>
>>
>> Interval 5s, timeout 10s. But after waiting more than two minutes, a dozen or so files had been written to HDFS, and Hive still returned no data.
>>
>> On 2020-07-13 16:52:16, "Jingsong Li" <[email protected]> wrote:
>> >Checkpointing is enabled, right? What is the delay set to?
>> >
>> >Partitions are added after checkpoint completion plus the delay.
>> >
>> >Best,
>> >Jingsong
>> >
>> >On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach <[email protected]> wrote:
>> >
>> >> Hi,
>> >> Following your hint, I added the HiveCatalog and data is now written to HDFS successfully. But why does querying the Hive table directly through Hue show no data? Do I have to manually add partitions to the Hive table? I currently set
>> >> 'sink.partition-commit.policy.kind'='metastore'
>> >>
>> >> At 2020-07-13 15:01:28, "Jingsong Li" <[email protected]> wrote:
>> >> >Hi,
>> >> >
>> >> >Are you using a HiveCatalog? Hive tables and the Hive dialect must be used together with a HiveCatalog.
>> >> >
>> >> >Otherwise you can only use the filesystem connector; if filesystem also fails, please paste the error message.
>> >> >
>> >> >Best,
>> >> >Jingsong
>> >> >
>> >> >On Mon, Jul 13, 2020 at 2:58 PM Zhou Zach <[email protected]> wrote:
>> >> >
>> >> >> What should the connector be set to for a Flink 1.11 Hive sink table? I tried setting
>> >> >>
>> >>
>> WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1
>> >> >> h','sink.partition-commit.policy.kind'='success-file');
>> >> >> and it also failed.
>> >> >> query:
>> >> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>> >> >> streamTableEnv.executeSql(
>> >> >> """
>> >> >> |
>> >> >> |
>> >> >> |CREATE TABLE hive_table (
>> >> >> | user_id STRING,
>> >> >> | age INT
>> >> >> |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
>> >> >> TBLPROPERTIES (
>> >> >> | 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>> >> >> | 'sink.partition-commit.trigger'='partition-time',
>> >> >> | 'sink.partition-commit.delay'='1 h',
>> >> >> | 'sink.partition-commit.policy.kind'='metastore,success-file'
>> >> >> |)
>> >> >> |
>> >> >> |""".stripMargin)
>> >> >>
>> >> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
>> >> >> streamTableEnv.executeSql(
>> >> >> """
>> >> >> |
>> >> >> |CREATE TABLE kafka_table (
>> >> >> | uid VARCHAR,
>> >> >> | -- uid BIGINT,
>> >> >> | sex VARCHAR,
>> >> >> | age INT,
>> >> >> | created_time TIMESTAMP(3),
>> >> >> | WATERMARK FOR created_time as created_time - INTERVAL '3'
>> >> SECOND
>> >> >> |) WITH (
>> >> >> | 'connector.type' = 'kafka',
>> >> >> | 'connector.version' = 'universal',
>> >> >> | 'connector.topic' = 'user',
>> >> >> | -- 'connector.topic' = 'user_long',
>> >> >> | 'connector.startup-mode' = 'latest-offset',
>> >> >> | 'connector.properties.zookeeper.connect' =
>> >> >> 'cdh1:2181,cdh2:2181,cdh3:2181',
>> >> >> | 'connector.properties.bootstrap.servers' =
>> >> >> 'cdh1:9092,cdh2:9092,cdh3:9092',
>> >> >> | 'connector.properties.group.id' = 'user_flink',
>> >> >> | 'format.type' = 'json',
>> >> >> | 'format.derive-schema' = 'true'
>> >> >> |)
>> >> >> |""".stripMargin)
>> >> >>
>> >> >>
>> >> >>
>> >> >> streamTableEnv.executeSql(
>> >> >> """
>> >> >> |
>> >> >> |INSERT INTO hive_table
>> >> >> |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'),
>> >> >> DATE_FORMAT(created_time, 'HH')
>> >> >> |FROM kafka_table
>> >> >> |
>> >> >> |""".stripMargin)
>> >> >>
>> >> >> streamTableEnv.executeSql(
>> >> >> """
>> >> >> |
>> >> >> |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13'
>> >> >> |
>> >> >> |""".stripMargin)
>> >> >> .print()
>> >> >> Error stack:
>> >> >> Exception in thread "main"
>> >> org.apache.flink.table.api.ValidationException:
>> >> >> Unable to create a sink for writing table
>> >> >> 'default_catalog.default_database.hive_table'.
>> >> >>
>> >> >> Table options are:
>> >> >>
>> >> >> 'hive.storage.file-format'='parquet'
>> >> >> 'is_generic'='false'
>> >> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
>> >> >> 'sink.partition-commit.delay'='1 h'
>> >> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
>> >> >> 'sink.partition-commit.trigger'='partition-time'
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >> >> at
>> >> >>
>> >>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> >> >> at
>> >> >>
>> >>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> >> >> at
>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> >> >> at
>> >> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> >> >> at
>> >> >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> >> >> at
>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> >> >> at
>> >> >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> >> >> at
>> >> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>> >> >> at
>> >> >> org.rabbit.sql.FromKafkaSinkHive$.main(FromKafkaSinkHive.scala:65)
>> >> >> at
>> >> org.rabbit.sql.FromKafkaSinkHive.main(FromKafkaSinkHive.scala)
>> >> >> Caused by: org.apache.flink.table.api.ValidationException: Table
>> options
>> >> >> do not contain an option key 'connector' for discovering a connector.
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
>> >> >> ... 19 more
>> >> >>
>> >> >>
>> >> >
>> >> >--
>> >> >Best, Jingsong Lee
>> >>
>> >
>> >
>> >--
>> >Best, Jingsong Lee
>>
>
>
>--
>Best, Jingsong Lee